style: 移除设计不当的部分模块

This commit is contained in:
2026-04-16 13:20:12 +08:00
parent f7c072dd0b
commit 215a8648c2
22 changed files with 87 additions and 1283 deletions

View File

@@ -9,29 +9,31 @@ from contextvars import ContextVar
from heurams.services.config import ConfigFile from heurams.services.config import ConfigFile
from heurams.services.logger import get_logger from heurams.services.logger import get_logger
# 默认配置文件路径规定: 以包目录为准 # 默认数据目录, 以包目录下的 data 为准
# 用户配置文件路径规定: 以运行目录为准 # 用户数据目录, 以运行目录下的 data 为准
# 数据文件路径规定: 以运行目录为准
rootdir: pathlib.Path = pathlib.Path(__file__).parent
"""包目录路径, 也就是 heurams 目录."""
rootdir = pathlib.Path(__file__).parent
workdir = pathlib.Path.cwd() workdir = pathlib.Path.cwd()
#print(f"项目根目录: {rootdir}") """工作目录路径."""
#print(f"工作目录: {workdir}")
logger = get_logger(__name__) logger = get_logger(__name__)
logger.debug(f"项目根目录: {rootdir}") logger.debug(f"目录: {rootdir}")
logger.debug(f"工作目录: {workdir}") logger.debug(f"工作目录: {workdir}")
(workdir / "data" / "config").mkdir(parents=True, exist_ok=True) (workdir / "data" / "config").mkdir(parents=True, exist_ok=True)
config_var: ContextVar[ConfigFile] = ContextVar( config_var: ContextVar[ConfigFile].get = ContextVar(
"config_var", "config_var",
default=ConfigFile(workdir / "data" / "config" / "config.toml"), default=ConfigFile(workdir / "data" / "config" / "config.toml"),
) )
"""配置对象的全局引用对象."""
class ConfigContext: class ConfigContext:
""" """
功能完备的上下文管理器 功能完备的上下文管理器
用于临时切换配置的作用域, 支持嵌套使用 用于临时切换配置引用对象的作用域, 支持嵌套使用
Example: Example:
>>> with ConfigContext(test_config): >>> with ConfigContext(test_config):

View File

@@ -18,12 +18,8 @@ print("加载用户界面布局... ", end="", flush=True)
_start = perf_counter() _start = perf_counter()
from .screens.about import AboutScreen from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen from .screens.dashboard import DashboardScreen
from .screens.llmchat import LLMChatScreen
from .screens.navigator import NavigatorScreen from .screens.navigator import NavigatorScreen
from .screens.precache import PrecachingScreen from .screens.precache import PrecachingScreen
from .screens.radio import RadioScreen
from .screens.repocreator import RepoCreatorScreen
from .screens.repoeditor import RepoEditorScreen
from .screens.synctool import SyncScreen from .screens.synctool import SyncScreen
_end = perf_counter() _end = perf_counter()
print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)") print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)")
@@ -44,14 +40,10 @@ class HeurAMSApp(App):
] ]
SCREENS = { SCREENS = {
"dashboard": DashboardScreen, "dashboard": DashboardScreen,
"repo_creator": RepoCreatorScreen,
"precache_all": PrecachingScreen, "precache_all": PrecachingScreen,
"synctool": SyncScreen, "synctool": SyncScreen,
"about": AboutScreen, "about": AboutScreen,
"navigator": NavigatorScreen, "navigator": NavigatorScreen,
"radio": RadioScreen,
"repo_editor": RepoEditorScreen,
"llmchat": LLMChatScreen,
# "config": ConfigScreen, # "config": ConfigScreen,
} }

View File

@@ -1,5 +1,3 @@
NavigatorScreen { NavigatorScreen {
align: center middle; align: center middle;
} }
@@ -32,33 +30,6 @@ NavigatorScreen {
background: $surface; background: $surface;
} }
/* LLM 聊天界面样式 */
LLMChatScreen {
background: $surface;
}
#chat-container {
height: 100%;
padding: 1;
}
#toolbar {
height: 3;
margin-bottom: 1;
align: center middle;
}
#toolbar Button {
margin: 0 1;
}
#chat-log {
height: 1fr;
border: solid $primary;
padding: 1;
background: $surface;
}
#dashboardtop { #dashboardtop {
height: 4 height: 4
} }

View File

@@ -35,20 +35,22 @@ class AboutScreen(Screen):
about_text = f""" about_text = f"""
# 关于 "潜进" # 关于 "潜进"
版本 {version.ver} {version.stage.capitalize()} 主程序库版本: `{version.ver}-python`
用户界面分支: `Textual TUI (基本用户界面)`
用户界面版本: `{version.ver}`
API 版本代号: `{version.codename.capitalize()}`
开发代号: {version.codename.capitalize()} {version.codename_cn} 一个基于启发式算法与认知科学理论的辅助记忆调度器, 旨在帮助用户更高效地进行记忆工作与学习规划.
一个基于启发式算法的辅助记忆调度器, 旨在帮助用户更高效地进行记忆工作与学习规划.
以 AGPL-3.0 开放源代码, 这直接意味着任何个体直接基于此代码对外或内部提供的应用和服务, 无论本地或网络, 必须向所有用户公开完整修改后的源代码, 且继续沿用 AGPL-3.0 协议. 以 AGPL-3.0 开放源代码, 这直接意味着任何个体直接基于此代码对外或内部提供的应用和服务, 无论本地或网络, 必须向所有用户公开完整修改后的源代码, 且继续沿用 AGPL-3.0 协议.
可在项目主页 https://ams.imwangzhiyu.xyz 获取用户指南, 开发文档与软件更新. 正使用的 TUI 用户界面是 python 版本程序库自带的基本用户界面, 作为基本的全功能前端实现与程序库测试, 如果您想去除它, 请移除程序库根目录中的 interface 文件夹.
如果您觉得这个软件有用, 可以给它添加一个星标 :) 您可在项目主页 https://ams.pluv27.top 获取用户指南, 开发文档与软件更新.
> 此软件, 以及它作为一个"程序库"是自由且免费的, 但是开发工作必须投入大量精力 如果您觉得这个软件有用, 可以在它的源代码仓库给它添加一个星标 :)
> 即使您不是软件开发人员, 我们也欢迎您加入 HeurAMS 的队伍!
> 潜进(HeurAMS), 以及它作为一个"程序库"是自由且免费的, 但是开发工作必须投入大量精力.
> 您可以加入各种语言的翻译团队来翻译软件的界面, 您还可以制作图像、主题、音效, 或者改进软件配套的文档…… > 您可以加入各种语言的翻译团队来翻译软件的界面, 您还可以制作图像、主题、音效, 或者改进软件配套的文档……
> 不管您来自何方, 我们都欢迎您加入社区并做出贡献. > 不管您来自何方, 我们都欢迎您加入社区并做出贡献.
> 我们的共同目标是为人人带来高品质的辅助记忆 & 学习软件. > 我们的共同目标是为人人带来高品质的辅助记忆 & 学习软件.

View File

@@ -22,7 +22,6 @@ from heurams.services.logger import get_logger
from .about import AboutScreen from .about import AboutScreen
from .navigator import NavigatorScreen from .navigator import NavigatorScreen
from .preparation import PreparationScreen from .preparation import PreparationScreen
from .radio import RadioScreen
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -90,7 +89,7 @@ class DashboardScreen(Screen):
is_due = 0 is_due = 0
unit_sum = len(repo) unit_sum = len(repo)
activated_sum = 0 activated_sum = 0
nextdate = 0x3F3F3F3F nextdate = float('inf')
for i in repo.ident_index: for i in repo.ident_index:
nucleon = pt.Nucleon.create_on_nucleonic_data( nucleon = pt.Nucleon.create_on_nucleonic_data(
nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i) nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i)

View File

@@ -1,330 +0,0 @@
from pathlib import Path
from typing import Optional
from textual.app import ComposeResult
from textual.containers import Container, Horizontal
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Input, Label, RichLog, Static
from heurams.context import *
from heurams.services.llm_service import ChatSession, get_chat_manager
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class LLMChatScreen(Screen):
"""LLM 聊天屏幕"""
SUB_TITLE = "语言模型集成"
BINDINGS = [
("q", "go_back", "返回"),
("ctrl+s", "save_session", "保存会话"),
("ctrl+l", "load_session", "加载会话"),
("ctrl+n", "new_session", "新建会话"),
("ctrl+c", "clear_history", "清空历史"),
("escape", "focus_input", "聚焦输入"),
]
def __init__(
self,
session_id: Optional[str] = None,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name, id, classes)
self.session_id = session_id
self.chat_manager = get_chat_manager()
self.current_session: Optional[ChatSession] = None
self.is_streaming = False
def compose(self) -> ComposeResult:
"""组合界面组件"""
yield Header(show_clock=True)
with Container(id="chat-container"):
# 顶部工具栏
with Horizontal(id="toolbar"):
yield Button("新建会话", id="new-session", variant="primary")
yield Button("保存会话", id="save-session", variant="default")
yield Button("加载会话", id="load-session", variant="default")
yield Button("清空历史", id="clear-history", variant="default")
yield Button("设置系统提示", id="set-system-prompt", variant="default")
yield Static(" | ", classes="separator")
yield Label("当前会话:", classes="label")
yield Static(id="current-session-label", classes="session-label")
# 聊天记录显示区域
yield RichLog(
id="chat-log",
wrap=True,
highlight=True,
markup=True,
classes="chat-log",
)
# 输入区域
with Horizontal(id="input-container"):
yield Input(
id="message-input",
placeholder="输入消息... (按 Ctrl+Enter 发送, Esc 聚焦)",
classes="message-input",
)
yield Button(
"发送", id="send-button", variant="primary", classes="send-button"
)
# 状态栏
yield Static(id="status-bar", classes="status-bar")
yield Footer()
def on_mount(self) -> None:
"""挂载组件时初始化"""
# 获取或创建会话
self.current_session = self.chat_manager.get_session(self.session_id)
if self.current_session is None:
self.notify("无法创建 LLM 会话,请检查配置", severity="error")
return
# 更新会话标签
self.query_one("#current-session-label", Static).update(
f"{self.current_session.session_id}"
)
# 加载历史消息到聊天记录
self._display_history()
# 聚焦输入框
self.query_one("#message-input", Input).focus()
# 检查配置
self._check_config()
def _check_config(self):
"""检查 LLM 配置"""
config = config_var.get()
provider_name = config["services"]["llm"]
provider_config = config["providers"]["llm"][provider_name]
if provider_name == "openai":
if not provider_config.get("key") and not provider_config.get("url"):
self.notify(
"未配置 OpenAI API key 或 URL请在 config.toml 中配置 [providers.llm.openai]",
severity="warning",
)
def _display_history(self):
"""显示当前会话的历史消息"""
if not self.current_session:
return
chat_log = self.query_one("#chat-log", RichLog)
chat_log.clear()
for msg in self.current_session.get_history():
role = msg["role"]
content = msg["content"]
if role == "user":
chat_log.write(f"[bold cyan]你:[/bold cyan] {content}")
elif role == "assistant":
chat_log.write(f"[bold green]AI:[/bold green] {content}")
elif role == "system":
# 系统消息不显示在聊天记录中
pass
def _add_message_to_log(self, role: str, content: str):
"""添加消息到聊天记录显示"""
chat_log = self.query_one("#chat-log", RichLog)
if role == "user":
chat_log.write(f"[bold cyan]你:[/bold cyan] {content}")
elif role == "assistant":
chat_log.write(f"[bold green]AI:[/bold green] {content}")
chat_log.scroll_end()
async def on_input_submitted(self, event: Input.Submitted):
"""处理输入提交"""
if event.input.id == "message-input":
await self._send_message()
async def on_button_pressed(self, event: Button.Pressed):
"""处理按钮点击"""
button_id = event.button.id
if button_id == "send-button":
await self._send_message()
elif button_id == "new-session":
self.action_new_session()
elif button_id == "save-session":
self.action_save_session()
elif button_id == "load-session":
self.action_load_session()
elif button_id == "clear-history":
self.action_clear_history()
elif button_id == "set-system-prompt":
self.action_set_system_prompt()
async def _send_message(self):
"""发送当前输入的消息"""
if not self.current_session or self.is_streaming:
return
input_widget = self.query_one("#message-input", Input)
message = input_widget.value.strip()
if not message:
return
# 清空输入框
input_widget.value = ""
# 显示用户消息
self._add_message_to_log("user", message)
# 禁用输入和按钮
self._set_input_state(disabled=True)
self.is_streaming = True
# 更新状态
self.query_one("#status-bar", Static).update("AI 正在思考...")
try:
# 发送消息并获取响应
response = await self.current_session.send_message(message)
# 显示AI响应
self._add_message_to_log("assistant", response)
except Exception as e:
error_msg = f"请求失败: {str(e)}"
logger.error(error_msg)
self._add_message_to_log("assistant", f"[red]{error_msg}[/red]")
self.notify(error_msg, severity="error")
finally:
# 恢复输入和按钮
self._set_input_state(disabled=False)
self.is_streaming = False
self.query_one("#status-bar", Static).update("就绪")
input_widget.focus()
def _set_input_state(self, disabled: bool):
"""设置输入控件状态"""
self.query_one("#message-input", Input).disabled = disabled
self.query_one("#send-button", Button).disabled = disabled
async def action_save_session(self):
"""保存当前会话到文件"""
if not self.current_session:
self.notify("无当前会话", severity="error")
return
# 默认保存到 data/chat_sessions/ 目录
save_dir = Path(config_var.get()["paths"]["data"]) / "chat_sessions"
save_dir.mkdir(exist_ok=True, parents=True)
file_path = save_dir / f"{self.current_session.session_id}.json"
self.current_session.save_to_file(file_path)
self.notify(f"会话已保存到 {file_path}", severity="information")
async def action_load_session(self):
"""从文件加载会话"""
# 简化实现:加载默认目录下的第一个会话文件
save_dir = Path(config_var.get()["paths"]["data"]) / "chat_sessions"
if not save_dir.exists():
self.notify(f"目录不存在: {save_dir}", severity="error")
return
session_files = list(save_dir.glob("*.json"))
if not session_files:
self.notify("未找到会话文件", severity="error")
return
# 使用第一个文件(在实际应用中可以让用户选择)
file_path = session_files[0]
try:
# 获取 LLM 提供者
provider_name = config_var.get()["services"]["llm"]
provider_config = config_var.get()["providers"]["llm"][provider_name]
from heurams.providers.llm import providers as prov
llm_provider = prov[provider_name](provider_config)
# 加载会话
self.current_session = ChatSession.load_from_file(file_path, llm_provider)
# 更新聊天管理器
self.chat_manager.sessions[self.current_session.session_id] = (
self.current_session
)
# 更新UI
self.query_one("#current-session-label", Static).update(
f"{self.current_session.session_id}"
)
self._display_history()
self.notify(f"已加载会话: {file_path.name}", severity="information")
except Exception as e:
logger.error("加载会话失败: %s", e)
self.notify(f"加载失败: {str(e)}", severity="error")
async def action_new_session(self):
"""创建新会话"""
# 简单实现使用时间戳作为会话ID
import time
new_session_id = f"session_{int(time.time())}"
self.current_session = self.chat_manager.get_session(new_session_id)
# 更新UI
self.query_one("#current-session-label", Static).update(
f"{self.current_session.session_id}"
)
self._display_history()
self.notify(f"已创建新会话: {new_session_id}", severity="information")
self.query_one("#message-input", Input).focus()
async def action_clear_history(self):
"""清空当前会话历史"""
if not self.current_session:
return
self.current_session.clear_history()
self._display_history()
self.notify("历史已清空", severity="information")
async def action_set_system_prompt(self):
"""设置系统提示词"""
if not self.current_session:
return
# 使用输入框获取新提示词
input_widget = self.query_one("#message-input", Input)
current_value = input_widget.value
# 临时修改输入框提示
input_widget.placeholder = "输入系统提示词... (按 Enter 确认, Esc 取消)"
input_widget.value = self.current_session.system_prompt
# 等待用户输入
self.notify("请输入系统提示词,按 Enter 确认", severity="information")
# 实际应用中需要更复杂的交互,这里简化处理
# 用户手动输入后按 Enter 会触发 on_input_submitted
# 这里我们只修改占位符,实际系统提示词设置需要额外界面
def action_focus_input(self):
"""聚焦到输入框"""
self.query_one("#message-input", Input).focus()
def action_go_back(self):
"""返回上级屏幕"""
self.app.pop_screen()

View File

@@ -24,15 +24,13 @@ class NavigatorScreen(ModalScreen):
SCREENS = [ SCREENS = [
("仪表盘", "dashboard"), ("仪表盘", "dashboard"),
("电台", "radio"),
("语言模型集成", "llmchat"),
# ("创建仓库", "repo_creator"), # ("创建仓库", "repo_creator"),
("缓存管理器", "precache_all"), ("缓存管理器", "precache_all"),
("收藏夹管理器", FavoriteManagerScreen), ("收藏夹", FavoriteManagerScreen),
("配置设置", "config"), # ("配置设置", "config"),
# ("调试日志", "logviewer"),
("同步工具", "synctool"),
("关于此软件", "about"), ("关于此软件", "about"),
("调试日志", "logviewer"),
# ("同步工具", "synctool"),
# ("仓库编辑器", "repo_editor"), # ("仓库编辑器", "repo_editor"),
] ]

View File

@@ -1,217 +0,0 @@
"""用于筛选当日记忆的条目 以音频形式重放"""
""" "前进电台" 界面"""
import os
from pathlib import Path
from typing import List, Optional
from textual.app import ComposeResult
from textual.containers import Container, ScrollableContainer
from textual.reactive import reactive
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, Static
import heurams.kernel.particles as pt
from heurams.kernel.repolib import Repo
from heurams.context import config_var
from heurams.services.audio_service import play_by_path
from heurams.services.hasher import get_md5
from heurams.services.logger import get_logger
from heurams.services.tts_service import convertor
logger = get_logger(__name__)
class RadioScreen(Screen):
SUB_TITLE = "电台"
BINDINGS = [
("q", "go_back", "返回"),
("space", "toggle_play", "播放/暂停"),
]
# 当前播放的原子索引
current_index = reactive(0)
# 播放状态: 'stopped', 'playing', 'paused'
play_state = reactive("stopped")
def __init__(
self,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name, id, classes)
self._organizer()
def _organizer(self):
repodirs = Repo.probe_valid_repos_in_dir(Path(config_var.get()['paths']['data']) / 'repo')
repos = list(map(lambda repodir: Repo.create_from_repodir(repodir), repodirs))
for repo in repos:
last_modify = 0.0
for i in repo.ident_index:
e = pt.Electron.create_on_electonic_data(
electronic_data=repo.electronic_data_lict.get_itemic_unit(i)
)
last_modify = max(last_modify, e.las())
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="main"):
yield Label("[b]前进电台[/b]", classes="title")
yield Static(f"{len(self.atoms)} 条当日记忆", id="status")
with Container(id="controls"):
yield Button("播放", id="play", variant="success")
yield Button("暂停", id="pause", variant="primary")
yield Button("上一首", id="prev", variant="default")
yield Button("下一首", id="next", variant="default")
yield Button("停止", id="stop", variant="error")
yield ScrollableContainer(id="playlist")
yield Footer()
def on_mount(self) -> None:
"""挂载后更新播放列表显示"""
self._update_playlist()
def _filter_due_atoms(self) -> List[pt.Atom]:
"""筛选当日需要复习的原子(已激活且到期)"""
atoms = []
for ident in self.repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data(
nucleonic_data=self.repo.nucleonic_data_lict.get_itemic_unit(ident)
)
e = pt.Electron.create_on_electonic_data(
electronic_data=self.repo.electronic_data_lict.get_itemic_unit(ident)
)
a = pt.Atom(n, e, self.repo.orbitic_data)
# 仅选择已激活且到期的原子
if (
a.registry["electron"].is_activated()
and a.registry["electron"].is_due()
):
atoms.append(a)
return atoms
def _update_playlist(self) -> None:
"""更新播放列表显示"""
container = self.query_one("#playlist")
container.remove_children()
for idx, atom in enumerate(self.atoms):
content = atom.registry["nucleon"].get("content", "无内容")
prefix = "" if idx == self.current_index else " "
widget = Static(f"{prefix}{idx+1}. {content[:50]}...")
widget.set_class(idx == self.current_index, "current")
container.mount(widget)
def _get_audio_path(self, atom: pt.Atom) -> Path:
"""返回音频文件路径,若不存在则生成"""
tts_text = atom.registry["nucleon"].get("tts_text", "")
if not tts_text:
tts_text = atom.registry["nucleon"].get("content", "")
voice_dir = Path(config_var.get()["paths"]["data"]) / "cache" / "voice"
voice_dir.mkdir(parents=True, exist_ok=True)
path = voice_dir / f"{get_md5(tts_text)}.wav"
if not path.exists():
convertor(tts_text, path)
return path
async def _play_atom(self, idx: int) -> None:
"""播放指定索引的原子(异步)"""
if idx < 0 or idx >= len(self.atoms):
return
atom = self.atoms[idx]
try:
path = self._get_audio_path(atom)
self._current_path = path
# 在后台线程中播放避免阻塞UI
await self.run_worker(
lambda: play_by_path(path), exclusive=True, thread=True
)
except Exception as e:
logger.error("播放失败: %s", e)
def _stop_playback(self) -> None:
"""停止当前播放"""
if self._play_task and not self._play_task.done():
self._play_task.cancel()
self._play_task = None
self._current_path = None
self.play_state = "stopped"
async def _play_current(self) -> None:
"""播放当前索引的原子"""
self._stop_playback()
self.play_state = "playing"
self._play_task = asyncio.create_task(self._play_atom(self.current_index))
try:
await self._play_task
except asyncio.CancelledError:
pass
finally:
if self.play_state == "playing":
self.play_state = "stopped"
# 按钮事件处理
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id
if button_id == "play":
self.action_toggle_play()
elif button_id == "pause":
self.action_pause()
elif button_id == "prev":
self.action_prev()
elif button_id == "next":
self.action_next()
elif button_id == "stop":
self.action_stop()
# 键盘动作
def action_toggle_play(self) -> None:
if self.play_state == "playing":
self.action_pause()
else:
self.action_play()
def action_play(self) -> None:
if self.play_state != "playing":
if self.play_state == "paused":
# 恢复播放(目前暂停功能简单实现为停止)
self.play_state = "playing"
else:
asyncio.create_task(self._play_current())
def action_pause(self) -> None:
if self.play_state == "playing":
self._stop_playback()
self.play_state = "paused"
def action_stop(self) -> None:
self._stop_playback()
self.play_state = "stopped"
def action_next(self) -> None:
if self.current_index < len(self.atoms) - 1:
self.current_index += 1
self._update_playlist()
if self.play_state == "playing":
asyncio.create_task(self._play_current())
def action_prev(self) -> None:
if self.current_index > 0:
self.current_index -= 1
self._update_playlist()
if self.play_state == "playing":
asyncio.create_task(self._play_current())
def action_go_back(self) -> None:
self._stop_playback()
self.app.pop_screen()
# 响应式更新
def watch_current_index(self, old: int, new: int) -> None:
self._update_playlist()
def watch_play_state(self, old: str, new: str) -> None:
# 更新按钮状态(可在此添加样式变化)
pass

View File

@@ -1,166 +0,0 @@
"""仓库创建向导界面"""
from pathlib import Path
import toml
from textual.app import ComposeResult
from textual.containers import ScrollableContainer
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Input, Label, Markdown, Select
from heurams.context import config_var
from heurams.services.version import ver
class RepoCreatorScreen(Screen):
BINDINGS = [("q", "go_back", "返回")]
SUB_TITLE = "仓库创建向导"
def __init__(self) -> None:
super().__init__(name=None, id=None, classes=None)
def search_templates(self):
from pathlib import Path
from heurams.context import config_var
template_dir = Path(config_var.get()["paths"]["data"]) / "templates"
templates = list()
for i in template_dir.iterdir():
if i.name.endswith(".toml"):
try:
import toml
with open(i, "r") as f:
dic = toml.load(f)
desc = dic["__metadata__.attribution"]["desc"]
templates.append(desc + " (" + i.name + ")")
except Exception as e:
templates.append(f"无描述模板 ({i.name})")
print(e)
return templates
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with ScrollableContainer(id="vice_container"):
yield Label(f"[b]空白单元集创建向导\n")
yield Markdown(
"> 提示: 你可能注意到当选中文本框时底栏和操作按键绑定将被覆盖 \n只需选中(使用鼠标或 Tab)选择框即可恢复底栏功能"
)
yield Markdown("1. 键入单元集名称")
yield Input(placeholder="单元集名称", id="name_input")
yield Markdown(
"> 单元集名称不应与现有单元集重复. \n> 新的单元集文件将创建在 ./nucleon/你输入的名称.toml"
)
yield Label(f"\n")
yield Markdown("2. 选择单元集模板")
LINES = self.search_templates()
"""带有宏支持的空白单元集 ({ver})
古诗词模板单元集 ({ver})
英语词汇和短语模板单元集 ({ver})
"""
yield Select.from_values(LINES, prompt="选择类型", id="template_select")
yield Markdown("> 新单元集的版本号将和主程序版本保持同步")
yield Label(f"\n")
yield Markdown("3. 输入常见附加元数据 (可选)")
yield Input(placeholder="作者", id="author_input")
yield Input(placeholder="内容描述", id="desc_input")
yield Button(
"新建空白单元集",
id="submit_button",
variant="primary",
classes="start-button",
)
yield Footer()
def on_mount(self):
self.query_one("#submit_button").focus()
def action_go_back(self):
self.app.pop_screen()
def action_quit_app(self):
self.app.exit()
def on_button_pressed(self, event) -> None:
event.stop()
if event.button.id == "submit_button":
# 获取输入值
name_input = self.query_one("#name_input")
template_select = self.query_one("#template_select")
author_input = self.query_one("#author_input")
desc_input = self.query_one("#desc_input")
name = name_input.value.strip() # type: ignore
author = author_input.value.strip() # type: ignore
desc = desc_input.value.strip() # type: ignore
selected = template_select.value # type: ignore
# 验证
if not name:
self.notify("单元集名称不能为空", severity="error")
return
# 获取配置路径
config = config_var.get()
nucleon_dir = Path(config["paths"]["nucleon_dir"])
template_dir = Path(config["paths"]["template_dir"])
# 检查文件是否已存在
nucleon_path = nucleon_dir / f"{name}.toml"
if nucleon_path.exists():
self.notify(f"单元集 '{name}' 已存在", severity="error")
return
# 确定模板文件
if selected is None:
self.notify("请选择一个模板", severity="error")
return
# selected 是描述字符串, 格式如 "描述 (filename.toml)"
# 提取文件名
import re
match = re.search(r"\(([^)]+)\)$", selected)
if not match:
self.notify("模板选择格式无效", severity="error")
return
template_filename = match.group(1)
template_path = template_dir / template_filename
if not template_path.exists():
self.notify(f"模板文件不存在: {template_filename}", severity="error")
return
# 加载模板
try:
with open(template_path, "r", encoding="utf-8") as f:
template_data = toml.load(f)
except Exception as e:
self.notify(f"加载模板失败: {e}", severity="error")
return
# 更新元数据
metadata = template_data.get("__metadata__", {})
attribution = metadata.get("attribution", {})
if author:
attribution["author"] = author
if desc:
attribution["desc"] = desc
attribution["name"] = name
# 可选: 设置版本
attribution["version"] = ver
metadata["attribution"] = attribution
template_data["__metadata__"] = metadata
# 确保 nucleon_dir 存在
nucleon_dir.mkdir(parents=True, exist_ok=True)
# 写入新文件
try:
with open(nucleon_path, "w", encoding="utf-8") as f:
toml.dump(template_data, f)
except Exception as e:
self.notify(f"保存单元集失败: {e}", severity="error")
return
self.notify(f"单元集 '{name}' 创建成功")
self.app.pop_screen()

View File

@@ -1,267 +0,0 @@
"""仓库编辑器, 使用TextArea控件等实现仓库配置编辑"""
import json
from pathlib import Path
from typing import Optional
import toml
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, ScrollableContainer, Vertical
from textual.reactive import reactive
from textual.screen import Screen
from textual.widgets import (
Button,
Footer,
Header,
Label,
ListItem,
ListView,
Static,
TextArea,
)
from heurams.context import config_var
from heurams.kernel.repolib import Repo
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class RepoEditorScreen(Screen):
"""仓库编辑器屏幕"""
SUB_TITLE = "仓库编辑器"
BINDINGS = [
("q", "go_back", "返回"),
("s", "save_file", "保存"),
("r", "reload_file", "重载"),
("d", "toggle_dark", ""),
]
# 当前选择的仓库路径
selected_repo_path: reactive[Optional[Path]] = reactive(None)
# 当前选择的文件名
selected_filename: reactive[Optional[str]] = reactive(None)
# 文件内容
file_content: reactive[str] = reactive("")
def __init__(
self,
repo: Optional[Repo] = None,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name, id, classes)
self.repo = repo
self.repo_dir: Optional[Path] = None
self.file_list = []
if repo is not None and repo.source is not None:
self.repo_dir = repo.source
self._load_file_list()
# selected_repo_path 将在 on_mount 中设置避免触发watch时组件未就绪
def _load_file_list(self) -> None:
"""加载仓库目录下的文件列表"""
if self.repo_dir is None:
return
self.file_list = []
for fname in Repo.file_mapping.values():
fpath = self.repo_dir / fname
if fpath.exists():
self.file_list.append(fname)
# 也可能存在其他文件,但暂时只支持标准文件
self.file_list.sort()
def compose(self) -> ComposeResult:
"""组合界面组件"""
yield Header(show_clock=True)
with Container(id="main_container"):
with Horizontal(id="top_panel"):
# 左侧: 仓库选择
with Vertical(id="repo_selector", classes="panel"):
yield Label("仓库列表", classes="panel-title")
yield ListView(
*[
ListItem(Label(repo_dir.name))
for repo_dir in self._get_repo_dirs()
],
id="repo_list",
classes="list-view",
)
# 中间: 文件列表
with Vertical(id="file_selector", classes="panel"):
yield Label("文件列表", classes="panel-title")
yield ListView(
*[ListItem(Label(fname)) for fname in self.file_list],
id="file_list",
classes="list-view",
)
# 右侧: 编辑区域
with Vertical(id="editor_panel", classes="panel"):
yield Label("编辑文件", classes="panel-title")
yield TextArea(
id="text_editor",
language="plaintext",
classes="text-editor",
)
with Horizontal(id="button_bar"):
yield Button("保存", id="save_button", variant="primary")
yield Button("重载", id="reload_button", variant="default")
yield Button("返回", id="back_button", variant="error")
yield Footer()
def _get_repo_dirs(self) -> list[Path]:
"""获取data/repo/下所有有效仓库目录"""
repo_root = Path(config_var.get()["paths"]["data"]) / "repo"
repo_dirs = []
if repo_root.exists():
for entry in repo_root.iterdir():
if entry.is_dir():
# 检查是否存在 manifest.toml
if (entry / "manifest.toml").exists():
repo_dirs.append(entry)
return repo_dirs
def on_mount(self) -> None:
"""挂载组件时初始化"""
# 如果已有仓库,设置 selected_repo_path 以触发watch此时组件已就绪
if self.repo_dir is not None:
self.selected_repo_path = self.repo_dir
# 焦点放在仓库列表
self.query_one("#repo_list", ListView).focus()
def watch_selected_repo_path(
self, old_path: Optional[Path], new_path: Optional[Path]
) -> None:
"""当选择的仓库路径变化时,加载文件列表"""
if new_path is None:
self.file_list = []
self.selected_filename = None
self.file_content = ""
return
self.repo_dir = new_path
self._load_file_list()
# 如果组件已挂载更新UI
if self.is_mounted:
file_list_view = self.query_one("#file_list", ListView)
file_list_view.clear()
for fname in self.file_list:
file_list_view.append(ListItem(Label(fname)))
# 清空编辑器
self.query_one("#text_editor", TextArea).text = ""
self.selected_filename = None
def watch_selected_filename(
self, old_name: Optional[str], new_name: Optional[str]
) -> None:
"""当选择的文件名变化时,加载文件内容"""
if new_name is None or self.repo_dir is None:
self.file_content = ""
return
file_path = self.repo_dir / new_name
if not file_path.exists():
self.notify(f"文件不存在: {new_name}", severity="error")
return
try:
content = file_path.read_text(encoding="utf-8")
self.file_content = content
# 如果组件已挂载,更新编辑器
if self.is_mounted:
editor = self.query_one("#text_editor", TextArea)
editor.text = content
# 根据文件后缀设置语言
if new_name.endswith(".toml"):
editor.language = "toml"
elif new_name.endswith(".json"):
editor.language = "json"
else:
editor.language = "plaintext"
except Exception as e:
logger.error(f"读取文件失败: {e}")
self.notify(f"读取文件失败: {e}", severity="error")
def watch_file_content(self, old_content: str, new_content: str) -> None:
"""当文件内容变化时更新编辑器(仅当外部改变时)"""
# 目前不需要做任何事情,因为编辑器内容已绑定
pass
def on_list_view_selected(self, event) -> None:
"""处理列表项选择事件"""
if not isinstance(event.item, ListItem):
return
list_id = event.list_view.id
selected_label = event.item.query_one(Label)
selected_text = str(selected_label.render())
if list_id == "repo_list":
# 用户选择了仓库
repo_root = Path(config_var.get()["paths"]["data"]) / "repo"
selected_dir = repo_root / selected_text
if selected_dir.exists():
self.selected_repo_path = selected_dir
elif list_id == "file_list":
# 用户选择了文件
if self.repo_dir is None:
self.notify("请先选择仓库", severity="warning")
return
self.selected_filename = selected_text
def on_button_pressed(self, event) -> None:
"""处理按钮点击事件"""
event.stop()
if event.button.id == "save_button":
self.action_save_file()
elif event.button.id == "reload_button":
self.action_reload_file()
elif event.button.id == "back_button":
self.action_go_back()
def action_save_file(self) -> None:
"""保存当前编辑的文件"""
if self.repo_dir is None or self.selected_filename is None:
self.notify("未选择仓库或文件", severity="warning")
return
file_path = self.repo_dir / self.selected_filename
editor = self.query_one("#text_editor", TextArea)
new_content = editor.text
# 验证格式
try:
if self.selected_filename.endswith(".toml"):
toml.loads(new_content) # 验证TOML
elif self.selected_filename.endswith(".json"):
json.loads(new_content) # 验证JSON
except Exception as e:
self.notify(f"格式错误: {e}", severity="error")
return
# 写入文件
try:
file_path.write_text(new_content, encoding="utf-8")
self.notify("保存成功", severity="information")
except Exception as e:
logger.error(f"保存文件失败: {e}")
self.notify(f"保存文件失败: {e}", severity="error")
def action_reload_file(self) -> None:
"""重新加载当前文件(放弃修改)"""
if self.repo_dir is None or self.selected_filename is None:
self.notify("未选择仓库或文件", severity="warning")
return
file_path = self.repo_dir / self.selected_filename
try:
content = file_path.read_text(encoding="utf-8")
editor = self.query_one("#text_editor", TextArea)
editor.text = content
self.notify("已重载", severity="information")
except Exception as e:
logger.error(f"重载文件失败: {e}")
self.notify(f"重载文件失败: {e}", severity="error")
def action_go_back(self) -> None:
"""返回上一屏幕"""
self.app.pop_screen()
def action_toggle_dark(self) -> None:
"""切换暗色模式"""
self.app.dark = not self.app.dark

View File

@@ -1,2 +1,3 @@
# Kernel - HeurAMS 核心 # Kernel - HeurAMS 核心
记忆规划相关算法与数据结构, 可脱离业务层 记忆规划相关算法与数据结构, 可脱离业务层

View File

@@ -32,7 +32,7 @@ class Atom:
default_runtime = { default_runtime = {
"locked": False, "locked": False,
"min_rate": 0x3F3F3F3F, "min_rate": float('inf'),
"new_activation": False, "new_activation": False,
} }

View File

@@ -1,16 +1,24 @@
# Reactor - 记忆流程状态机模块 # Reactor - 记忆流程状态机模块
Reactor 是 HeurAMS 的记忆流程状态机模块, 和界面 (interface) 的实现是解耦的, 以便后期与其他框架的适配. Reactor 是 HeurAMS 的记忆流程状态机模块, 和界面 (interface) 的实现是解耦的, 以便后期与其他框架的适配.
得益于 Pickle, 状态机模块支持快照! 得益于 Pickle, 状态机模块支持快照!
## Phaser - 全局阶段控制器 ## Phaser - 全局阶段控制器
在一次队列记忆流程中, Phaser 代表记忆流程本身. 在一次队列记忆流程中, Phaser 代表记忆流程本身.
### 属性 ### 属性
#### 状态属性 #### 状态属性
其有状态属性: 其有状态属性:
- unsure - 用于初始化 - unsure - 用于初始化
- *quick_review - 复习逾期的单元 - *quick_review - 复习逾期的单元
- *recognition - 辨识新单元 - *recognition - 辨识新单元
- *final_review - 复习所有逾期的和新辨认的单元 - *final_review - 复习所有逾期的和新辨认的单元
- finished - 表示完成 - finished - 表示完成
> 逾期的: 指 SM-2 算法间隔显示应该复习的单元 > 逾期的: 指 SM-2 算法间隔显示应该复习的单元
带 * 的属性表示实际的记忆阶段, 由 repo 中 schedule.toml 中 schedule 列表显式声明, 运行过程中可以选择性执行, "空的" Procession 会被直接跳过. 带 * 的属性表示实际的记忆阶段, 由 repo 中 schedule.toml 中 schedule 列表显式声明, 运行过程中可以选择性执行, "空的" Procession 会被直接跳过.
@@ -18,106 +26,148 @@ Reactor 是 HeurAMS 的记忆流程状态机模块, 和界面 (interface) 的实
在初始化 Procession 时, 每个 Procession 被赋予一个不重复的状态属性 作为"阶段状态"属性, 以此标识 Procession 的阶段属性, 因为每个 Procession 管理一个阶段下的复习进程. 在初始化 Procession 时, 每个 Procession 被赋予一个不重复的状态属性 作为"阶段状态"属性, 以此标识 Procession 的阶段属性, 因为每个 Procession 管理一个阶段下的复习进程.
你可以用 state 属性获取 Phaser 的当前状态. 你可以用 state 属性获取 Phaser 的当前状态.
#### Procession 属性 #### Procession 属性
储存一个顺序列表, 保存所有构造的 Procession. 储存一个顺序列表, 保存所有构造的 Procession.
顺序与 repo 中 schedule.toml 中 schedule 列表中的顺序完全相同 顺序与 repo 中 schedule.toml 中 schedule 列表中的顺序完全相同
### 初始化 ### 初始化
Phaser 接受一个存储 Atom 对象的列表, 作为组织记忆流程的材料 Phaser 接受一个存储 Atom 对象的列表, 作为组织记忆流程的材料
在内部, 根据是否激活将其分为 new_atoms 与 old_atoms. 在内部, 根据是否激活将其分为 new_atoms 与 old_atoms.
因此, 如果你传入的列表中有算法上"无所事事"的 Atom, 流程会对其进行"加强复习" 因此, 如果你传入的列表中有算法上"无所事事"的 Atom, 流程会对其进行"加强复习"
由此创建 Procession. 由此创建 Procession.
### 直接输出呈现形式 ### 直接输出呈现形式
Phaser 的 __repr__ 定义了此对象"官方的显示"用作直观的调试. Phaser 的 __repr__ 定义了此对象"官方的显示"用作直观的调试.
其以 ascii 表格形式输出, 格式也符合 markdown 表格规范, 你可以直接复制到 markdown. 其以 ascii 表格形式输出, 格式也符合 markdown 表格规范, 你可以直接复制到 markdown.
示例: 示例:
```text ```text
| Type | State | Processions | Current Procession | | Type | State | Processions | Current Procession |
|:-------|:--------|:-----------------------|:---------------------| |:-------|:--------|:-----------------------|:---------------------|
| Phaser | unsure | ['新记忆', '总体复习'] | 新记忆 | | Phaser | unsure | ['新记忆', '总体复习'] | 新记忆 |
``` ```
| Type | State | Processions | Current Procession | | Type | State | Processions | Current Procession |
|:-------|:--------|:-----------------------|:---------------------| |:-------|:--------|:-----------------------|:---------------------|
| Phaser | unsure | ['新记忆', '总体复习'] | 新记忆 | | Phaser | unsure | ['新记忆', '总体复习'] | 新记忆 |
### 方法 ### 方法
作为一个 Transition Machine 对象的继承, 其拥有 Machine 对象拥有的所有方法. 作为一个 Transition Machine 对象的继承, 其拥有 Machine 对象拥有的所有方法.
除此之外, 它也拥有一些其他方法. 除此之外, 它也拥有一些其他方法.
#### current_procession(self) #### current_procession(self)
用于查询当前的 Procession, 并且根据当前 Procession 更新自身状态. 用于查询当前的 Procession, 并且根据当前 Procession 更新自身状态.
返回一个 Procession 对象, 是当前阶段的 Procession. 返回一个 Procession 对象, 是当前阶段的 Procession.
内部运作是返回第一个状态不为 finished 的 Procession, 并将自身状态变更为 Procession 的"阶段状态"属性 内部运作是返回第一个状态不为 finished 的 Procession, 并将自身状态变更为 Procession 的"阶段状态"属性
若所有 Procession 都已完成, 将返回一个"阶段状态"为 finished 的 Procession 占位符对象(它不在 procession 属性中), 并更新自身状态为 finished. 若所有 Procession 都已完成, 将返回一个"阶段状态"为 finished 的 Procession 占位符对象(它不在 procession 属性中), 并更新自身状态为 finished.
## Procession - 阶段管理器 ## Procession - 阶段管理器
### 属性 ### 属性
#### 状态属性 #### 状态属性
其有状态属性: 其有状态属性:
- active - 标识未完成, 初始化的默认属性 - active - 标识未完成, 初始化的默认属性
- finished - 完成了 - finished - 完成了
#### 其他属性 #### 其他属性
- current_atom: 当前记忆原子的引用 - current_atom: 当前记忆原子的引用
- atoms: 队列中所有原子列表 - atoms: 队列中所有原子列表
- cursor: 指针, 是当前原子在 atoms 列表中的索引 - cursor: 指针, 是当前原子在 atoms 列表中的索引
- phase: "阶段属性" - phase: "阶段属性"
> 注意区分 "Phaser" 和 "Phase", 其中 "Phase" 表示 "Phaser State". > 注意区分 "Phaser" 和 "Phase", 其中 "Phase" 表示 "Phaser State".
- name_: 阶段的命名 - name_: 阶段的命名
- state: 当前状态属性 - state: 当前状态属性
### 初始化 ### 初始化
接受一个 atoms 列表与 phase_state (PhaserState Enum 类型)对象 接受一个 atoms 列表与 phase_state (PhaserState Enum 类型)对象
### 直接输出呈现形式 ### 直接输出呈现形式
同 Phaser, 但显示数据有所不同 同 Phaser, 但显示数据有所不同
与 Phaser 不同, Procession 显示队列会对过长的 atom.ident 进行缩略(末尾 `>` 符号) 与 Phaser 不同, Procession 显示队列会对过长的 atom.ident 进行缩略(末尾 `>` 符号)
```text ```text
| Type | Name | State | Progress | Queue | Current Atom | | Type | Name | State | Progress | Queue | Current Atom |
|:-----------|:-------|:--------|:-----------|:-----------------------|:------------------------------| |:-----------|:-------|:--------|:-----------|:-----------------------|:------------------------------|
| Procession | 新记忆 | active | 1 / 2 | ['秦孝公>', '君臣固>'] | 秦孝公据崤函之固, 拥雍州之地, | | Procession | 新记忆 | active | 1 / 2 | ['秦孝公>', '君臣固>'] | 秦孝公据崤函之固, 拥雍州之地, |
``` ```
| Type | Name | State | Progress | Queue | Current Atom | | Type | Name | State | Progress | Queue | Current Atom |
|:-----------|:-------|:--------|:-----------|:-----------------------|:------------------------------| |:-----------|:-------|:--------|:-----------|:-----------------------|:------------------------------|
| Procession | 新记忆 | active | 1 / 2 | ['秦孝公>', '君臣固>'] | 秦孝公据崤函之固, 拥雍州之地, | | Procession | 新记忆 | active | 1 / 2 | ['秦孝公>', '君臣固>'] | 秦孝公据崤函之固, 拥雍州之地, |
### 方法 ### 方法
作为一个 Transition Machine 对象的继承, 其拥有 Machine 对象拥有的所有方法. 作为一个 Transition Machine 对象的继承, 其拥有 Machine 对象拥有的所有方法.
除此之外, 它也拥有一些其他方法. 除此之外, 它也拥有一些其他方法.
#### forward(self, step=1) #### forward(self, step=1)
移动 cursor 并依情况更新 current_atom 和状态属性 移动 cursor 并依情况更新 current_atom 和状态属性
无论 Procession 是否处于完成状态, forward 操作都是可逆的, 你可以传入负数, 此时已完成的 Procession 会自动"重启". 无论 Procession 是否处于完成状态, forward 操作都是可逆的, 你可以传入负数, 此时已完成的 Procession 会自动"重启".
#### append(self, atom=None) #### append(self, atom=None)
追加(回忆失败的)原子(默认为当前原子, 传入 None 会自动转化为当前原子)到队列末端 追加(回忆失败的)原子(默认为当前原子, 传入 None 会自动转化为当前原子)到队列末端
如果这个原子已经处于队列末端, 不会重复追加, 除非队列只剩下这个原子还没完成(此时最多重复两个) 如果这个原子已经处于队列末端, 不会重复追加, 除非队列只剩下这个原子还没完成(此时最多重复两个)
#### process(self) #### process(self)
返回 cursor 值 返回 cursor 值
#### __len__(self) #### __len__(self)
返回剩余原子量(而不是原子总量) 返回剩余原子量(而不是原子总量)
可以使用 len 函数调用 可以使用 len 函数调用
获取原子总量请用 len(obj.atoms), 或者 total_length(self) 方法 获取原子总量请用 len(obj.atoms), 或者 total_length(self) 方法
#### total_length(self) #### total_length(self)
返回队列原子总量 返回队列原子总量
#### is_empty(self) #### is_empty(self)
判断是否为空队列(传入原子列表对象是空列表的队列) 判断是否为空队列(传入原子列表对象是空列表的队列)
#### get_fission(self) #### get_fission(self)
获取当前原子的 Fission 对象, 用于单原子调度展开 获取当前原子的 Fission 对象, 用于单原子调度展开
## Fission - 单原子调度控制器 ## Fission - 单原子调度控制器
### 属性 ### 属性
#### 状态属性 #### 状态属性
- exammode: 测试模式(默认) - exammode: 测试模式(默认)
- retronly: 仅回顾模式 - retronly: 仅回顾模式
#### 其他属性 #### 其他属性
- cursor - cursor
- atom - atom
- current_puzzle - current_puzzle
- orbital_schedule - orbital_schedule
- orbital_puzzles - orbital_puzzles
- puzzles - puzzles
### 初始化 ### 初始化
接受 atom 对象和 phase 参数 接受 atom 对象和 phase 参数
### 方法 ### 方法
#### get_puzzles(self) #### get_puzzles(self)

View File

@@ -74,7 +74,7 @@ class Fission(Machine):
self.current_puzzle_inf = self.puzzles_inf[0] self.current_puzzle_inf = self.puzzles_inf[0]
for i in range(len(self.puzzles_inf)): for i in range(len(self.puzzles_inf)):
self.min_ratings.append(0x3F3F3F3F) self.min_ratings.append(float('inf'))
Machine.__init__( Machine.__init__(
self, self,

View File

@@ -1,3 +0,0 @@
class Navi:
def __init__(self, init) -> None:
pass

View File

@@ -1,4 +1,4 @@
# 音频服务 """音频服务"""
from typing import Callable from typing import Callable
from heurams.context import config_var from heurams.context import config_var

View File

@@ -1,4 +1,4 @@
# 配置文件服务 """配置文件服务"""
import pathlib import pathlib
import typing import typing

View File

@@ -1,228 +0,0 @@
"""LLM 聊天服务"""
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from heurams.context import config_var
from heurams.providers.llm import providers as prov
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class ChatSession:
"""聊天会话,管理单个对话的历史和参数"""
def __init__(
self, session_id: str, llm_provider, system_prompt: str = "", **default_params
):
"""初始化聊天会话
Args:
session_id: 会话唯一标识符
llm_provider: LLM 提供者实例
system_prompt: 系统提示词
**default_params: 默认参数temperature, max_tokens, model 等)
"""
self.session_id = session_id
self.llm_provider = llm_provider
self.system_prompt = system_prompt
self.default_params = default_params
# 消息历史
self.messages: List[Dict[str, str]] = []
if system_prompt:
self.messages.append({"role": "system", "content": system_prompt})
logger.debug("创建聊天会话: id=%s", session_id)
def add_message(self, role: str, content: str):
"""添加消息到历史"""
self.messages.append({"role": role, "content": content})
logger.debug(
"会话 %s 添加消息: role=%s, length=%d", self.session_id, role, len(content)
)
def clear_history(self):
"""清空消息历史(保留系统提示)"""
self.messages = []
if self.system_prompt:
self.messages.append({"role": "system", "content": self.system_prompt})
logger.debug("会话 %s 清空历史", self.session_id)
def set_system_prompt(self, prompt: str):
"""设置系统提示词"""
self.system_prompt = prompt
# 更新消息历史中的系统消息
if self.messages and self.messages[0]["role"] == "system":
self.messages[0]["content"] = prompt
elif prompt:
self.messages.insert(0, {"role": "system", "content": prompt})
logger.debug("会话 %s 设置系统提示: length=%d", self.session_id, len(prompt))
async def send_message(self, message: str, **override_params) -> str:
"""发送消息并获取响应
Args:
message: 用户消息内容
**override_params: 覆盖默认参数
Returns:
模型响应内容
"""
# 添加用户消息
self.add_message("user", message)
# 合并参数
params = {**self.default_params, **override_params}
# 发送请求
logger.debug("会话 %s 发送消息: length=%d", self.session_id, len(message))
response = await self.llm_provider.chat(self.messages, **params)
# 添加助手响应
self.add_message("assistant", response)
return response
async def send_message_stream(self, message: str, **override_params):
"""流式发送消息
Args:
message: 用户消息内容
**override_params: 覆盖默认参数
Yields:
流式响应的文本块
"""
# 添加用户消息
self.add_message("user", message)
# 合并参数
params = {**self.default_params, **override_params}
# 发送流式请求
logger.debug("会话 %s 发送流式消息: length=%d", self.session_id, len(message))
full_response = ""
async for chunk in self.llm_provider.chat_stream(self.messages, **params):
yield chunk
full_response += chunk
# 添加完整的助手响应到历史
self.add_message("assistant", full_response)
def get_history(self) -> List[Dict[str, str]]:
"""获取消息历史(不包括系统消息)"""
# 返回用户和助手的消息,可选排除系统消息
return [msg for msg in self.messages if msg["role"] != "system"]
def save_to_file(self, file_path: Path):
"""保存会话到文件"""
data = {
"session_id": self.session_id,
"system_prompt": self.system_prompt,
"default_params": self.default_params,
"messages": self.messages,
}
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.debug("会话 %s 保存到: %s", self.session_id, file_path)
@classmethod
def load_from_file(cls, file_path: Path, llm_provider):
"""从文件加载会话"""
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
session = cls(
session_id=data["session_id"],
llm_provider=llm_provider,
system_prompt=data.get("system_prompt", ""),
**data.get("default_params", {})
)
session.messages = data["messages"]
logger.debug("从文件加载会话: %s", file_path)
return session
class ChatManager:
"""聊天管理器,管理多个会话"""
def __init__(self):
self.sessions: Dict[str, ChatSession] = {}
self.default_session_id = "default"
logger.debug("聊天管理器初始化完成")
def get_session(
self,
session_id: Optional[str] = None,
create_if_missing: bool = True,
**session_params
) -> Optional[ChatSession]:
"""获取或创建聊天会话
Args:
session_id: 会话标识符None 则使用默认会话
create_if_missing: 如果会话不存在则创建
**session_params: 传递给 ChatSession 的参数
Returns:
聊天会话实例,如果不存在且不创建则返回 None
"""
if session_id is None:
session_id = self.default_session_id
if session_id in self.sessions:
return self.sessions[session_id]
if create_if_missing:
# 获取 LLM 提供者
provider_name = config_var.get()["services"]["llm"]
provider_config = config_var.get()["providers"]["llm"][provider_name]
llm_provider = prov[provider_name](provider_config)
session = ChatSession(
session_id=session_id, llm_provider=llm_provider, **session_params
)
self.sessions[session_id] = session
logger.debug("创建新会话: id=%s", session_id)
return session
return None
def delete_session(self, session_id: str):
"""删除会话"""
if session_id in self.sessions:
del self.sessions[session_id]
logger.debug("删除会话: id=%s", session_id)
def list_sessions(self) -> List[str]:
"""列出所有会话ID"""
return list(self.sessions.keys())
# 全局聊天管理器实例
_chat_manager: Optional[ChatManager] = None
def get_chat_manager() -> ChatManager:
"""获取全局聊天管理器实例"""
global _chat_manager
if _chat_manager is None:
_chat_manager = ChatManager()
logger.debug("创建全局聊天管理器")
return _chat_manager
def create_chat_session(
session_id: Optional[str] = None, **session_params
) -> ChatSession:
"""创建或获取聊天会话(便捷函数)"""
manager = get_chat_manager()
return manager.get_session(session_id, True, **session_params)
logger.debug("LLM 服务初始化完成")

View File

@@ -5,7 +5,7 @@ logger = get_logger(__name__)
ver = "0.5.0" ver = "0.5.0"
stage = "prototype" stage = "prototype"
codename = "fulcrom" codename = "fulcrum"
codename_cn = "支点" codename_cn = "支点"
logger.info("HeurAMS 版本: %s (%s), 阶段: %s", ver, codename, stage) logger.info("HeurAMS 版本: %s (%s), 阶段: %s", ver, codename, stage)