From 279a78f7cebad87c6c20b346a414735091193a92 Mon Sep 17 00:00:00 2001 From: pluvium27 Date: Thu, 16 Apr 2026 13:20:12 +0800 Subject: [PATCH] =?UTF-8?q?style:=20=E7=A7=BB=E9=99=A4=E8=AE=BE=E8=AE=A1?= =?UTF-8?q?=E4=B8=8D=E5=BD=93=E7=9A=84=E9=83=A8=E5=88=86=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../data/config/config.toml | 0 src/heurams/context.py | 20 +- src/heurams/interface/__init__.py | 8 - src/heurams/interface/css/main.tcss | 29 -- src/heurams/interface/screens/about.py | 20 +- src/heurams/interface/screens/dashboard.py | 3 +- src/heurams/interface/screens/llmchat.py | 330 ------------------ src/heurams/interface/screens/navigator.py | 10 +- src/heurams/interface/screens/radio.py | 217 ------------ src/heurams/interface/screens/repocreator.py | 166 --------- src/heurams/interface/screens/repoeditor.py | 267 -------------- src/heurams/kernel/README.md | 3 +- src/heurams/kernel/particles/atom.py | 2 +- src/heurams/kernel/reactor/README.md | 54 ++- src/heurams/kernel/reactor/fission.py | 2 +- src/heurams/kernel/repolib/navi.py | 3 - src/heurams/services/audio_service.py | 2 +- src/heurams/services/config.py | 2 +- src/heurams/services/llm_service.py | 228 ------------ src/heurams/services/sync_service.py | 0 src/heurams/services/tts_service.py | 2 +- src/heurams/services/version.py | 2 +- 22 files changed, 87 insertions(+), 1283 deletions(-) rename src/heurams/interface/screens/configure.py => examples/data/config/config.toml (100%) delete mode 100644 src/heurams/interface/screens/llmchat.py delete mode 100644 src/heurams/interface/screens/radio.py delete mode 100644 src/heurams/interface/screens/repocreator.py delete mode 100644 src/heurams/interface/screens/repoeditor.py delete mode 100644 src/heurams/kernel/repolib/navi.py delete mode 100644 src/heurams/services/llm_service.py delete mode 100644 src/heurams/services/sync_service.py diff --git a/src/heurams/interface/screens/configure.py b/examples/data/config/config.toml similarity index 100% rename from src/heurams/interface/screens/configure.py rename to examples/data/config/config.toml diff --git a/src/heurams/context.py b/src/heurams/context.py index d4d4521..c4c7af8 100644 --- a/src/heurams/context.py +++ b/src/heurams/context.py @@ -9,29 +9,31 @@ from contextvars import ContextVar from heurams.services.config import ConfigFile from heurams.services.logger import get_logger -# 默认配置文件路径规定: 以包目录为准 -# 用户配置文件路径规定: 以运行目录为准 -# 数据文件路径规定: 以运行目录为准 +# 默认数据目录, 以包目录下的 data 为准 +# 用户数据目录, 以运行目录下的 data 为准 + +rootdir: pathlib.Path = pathlib.Path(__file__).parent +"""包目录路径, 也就是 heurams 目录.""" -rootdir = pathlib.Path(__file__).parent workdir = pathlib.Path.cwd() -#print(f"项目根目录: {rootdir}") -#print(f"工作目录: {workdir}") +"""工作目录路径.""" + logger = get_logger(__name__) -logger.debug(f"项目根目录: {rootdir}") +logger.debug(f"包目录: {rootdir}") logger.debug(f"工作目录: {workdir}") (workdir / "data" / "config").mkdir(parents=True, exist_ok=True) -config_var: ContextVar[ConfigFile] = ContextVar( +config_var: ContextVar[ConfigFile].get = ContextVar( "config_var", default=ConfigFile(workdir / "data" / "config" / "config.toml"), ) +"""配置对象的全局引用对象.""" class ConfigContext: """ 功能完备的上下文管理器 - 用于临时切换配置的作用域, 支持嵌套使用 + 用于临时切换配置引用对象的作用域, 支持嵌套使用 Example: >>> with ConfigContext(test_config): diff --git a/src/heurams/interface/__init__.py b/src/heurams/interface/__init__.py index 2bb3171..f02130e 100644 --- a/src/heurams/interface/__init__.py +++ b/src/heurams/interface/__init__.py @@ -18,12 +18,8 @@ print("加载用户界面布局... ", end="", flush=True) _start = perf_counter() from .screens.about import AboutScreen from .screens.dashboard import DashboardScreen -from .screens.llmchat import LLMChatScreen from .screens.navigator import NavigatorScreen from .screens.precache import PrecachingScreen -from .screens.radio import RadioScreen -from .screens.repocreator import RepoCreatorScreen -from .screens.repoeditor import RepoEditorScreen from .screens.synctool import SyncScreen _end = perf_counter() print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)") @@ -44,14 +40,10 @@ class HeurAMSApp(App): ] SCREENS = { "dashboard": DashboardScreen, - "repo_creator": RepoCreatorScreen, "precache_all": PrecachingScreen, "synctool": SyncScreen, "about": AboutScreen, "navigator": NavigatorScreen, - "radio": RadioScreen, - "repo_editor": RepoEditorScreen, - "llmchat": LLMChatScreen, # "config": ConfigScreen, } diff --git a/src/heurams/interface/css/main.tcss b/src/heurams/interface/css/main.tcss index 347966d..ac93bce 100644 --- a/src/heurams/interface/css/main.tcss +++ b/src/heurams/interface/css/main.tcss @@ -1,5 +1,3 @@ - - NavigatorScreen { align: center middle; } @@ -32,33 +30,6 @@ NavigatorScreen { background: $surface; } -/* LLM 聊天界面样式 */ -LLMChatScreen { - background: $surface; -} - -#chat-container { - height: 100%; - padding: 1; -} - -#toolbar { - height: 3; - margin-bottom: 1; - align: center middle; -} - -#toolbar Button { - margin: 0 1; -} - -#chat-log { - height: 1fr; - border: solid $primary; - padding: 1; - background: $surface; -} - #dashboardtop { height: 4 } diff --git a/src/heurams/interface/screens/about.py b/src/heurams/interface/screens/about.py index beff979..f7d6b19 100644 --- a/src/heurams/interface/screens/about.py +++ b/src/heurams/interface/screens/about.py @@ -35,24 +35,26 @@ class AboutScreen(Screen): about_text = f""" # 关于 "潜进" -版本 {version.ver} {version.stage.capitalize()} - -开发代号: {version.codename.capitalize()} {version.codename_cn} +主程序库版本: `{version.ver}-python` +用户界面分支: `Textual TUI (基本用户界面)` +用户界面版本: `{version.ver}` +API 版本代号: `{version.codename.capitalize()}` -一个基于启发式算法的辅助记忆调度器, 旨在帮助用户更高效地进行记忆工作与学习规划. +一个基于启发式算法与认知科学理论的辅助记忆调度器, 旨在帮助用户更高效地进行记忆工作与学习规划. 以 AGPL-3.0 开放源代码, 这直接意味着任何个体直接基于此代码对外或内部提供的应用和服务, 无论本地或网络, 必须向所有用户公开完整修改后的源代码, 且继续沿用 AGPL-3.0 协议. -您可在项目主页 https://ams.imwangzhiyu.xyz 获取用户指南, 开发文档与软件更新. +您正使用的 TUI 用户界面是 python 版本程序库自带的基本用户界面, 作为基本的全功能前端实现与程序库测试, 如果您想去除它, 请移除程序库根目录中的 interface 文件夹. -如果您觉得这个软件有用, 可以给它添加一个星标 :) +您可在项目主页 https://ams.pluv27.top 获取用户指南, 开发文档与软件更新. -> 此软件, 以及它作为一个"程序库"是自由且免费的, 但是开发工作必须投入大量精力 -> 即使您不是软件开发人员, 我们也欢迎您加入 HeurAMS 的队伍! +如果您觉得这个软件有用, 可以在它的源代码仓库给它添加一个星标 :) + +> 潜进(HeurAMS), 以及它作为一个"程序库"是自由且免费的, 但是开发工作必须投入大量精力. > 您可以加入各种语言的翻译团队来翻译软件的界面, 您还可以制作图像、主题、音效, 或者改进软件配套的文档…… > 不管您来自何方, 我们都欢迎您加入社区并做出贡献. > 我们的共同目标是为人人带来高品质的辅助记忆 & 学习软件. -> 您的慷慨支持, 我们必当涌泉相报. +> 您的慷慨支持, 我们必当涌泉相报. 开发人员列表: diff --git a/src/heurams/interface/screens/dashboard.py b/src/heurams/interface/screens/dashboard.py index cf8b9ce..3827214 100644 --- a/src/heurams/interface/screens/dashboard.py +++ b/src/heurams/interface/screens/dashboard.py @@ -22,7 +22,6 @@ from heurams.services.logger import get_logger from .about import AboutScreen from .navigator import NavigatorScreen from .preparation import PreparationScreen -from .radio import RadioScreen logger = get_logger(__name__) @@ -90,7 +89,7 @@ class DashboardScreen(Screen): is_due = 0 unit_sum = len(repo) activated_sum = 0 - nextdate = 0x3F3F3F3F + nextdate = float('inf') for i in repo.ident_index: nucleon = pt.Nucleon.create_on_nucleonic_data( nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i) diff --git a/src/heurams/interface/screens/llmchat.py b/src/heurams/interface/screens/llmchat.py deleted file mode 100644 index bd54e04..0000000 --- a/src/heurams/interface/screens/llmchat.py +++ /dev/null @@ -1,330 +0,0 @@ -from pathlib import Path -from typing import Optional - -from textual.app import ComposeResult -from textual.containers import Container, Horizontal -from textual.screen import Screen -from textual.widgets import Button, Footer, Header, Input, Label, RichLog, Static - -from heurams.context import * -from heurams.services.llm_service import ChatSession, get_chat_manager -from heurams.services.logger import get_logger - -logger = get_logger(__name__) - - -class LLMChatScreen(Screen): - """LLM 聊天屏幕""" - - SUB_TITLE = "语言模型集成" - BINDINGS = [ - ("q", "go_back", "返回"), - ("ctrl+s", "save_session", "保存会话"), - ("ctrl+l", "load_session", "加载会话"), - ("ctrl+n", "new_session", "新建会话"), - ("ctrl+c", "clear_history", "清空历史"), - ("escape", "focus_input", "聚焦输入"), - ] - - def __init__( - self, - session_id: Optional[str] = None, - name: str | None = None, - id: str | None = None, - classes: str | None = None, - ) -> None: - super().__init__(name, id, classes) - self.session_id = session_id - self.chat_manager = get_chat_manager() - self.current_session: Optional[ChatSession] = None - self.is_streaming = False - - def compose(self) -> ComposeResult: - """组合界面组件""" - yield Header(show_clock=True) - - with Container(id="chat-container"): - # 顶部工具栏 - with Horizontal(id="toolbar"): - yield Button("新建会话", id="new-session", variant="primary") - yield Button("保存会话", id="save-session", variant="default") - yield Button("加载会话", id="load-session", variant="default") - yield Button("清空历史", id="clear-history", variant="default") - yield Button("设置系统提示", id="set-system-prompt", variant="default") - yield Static(" | ", classes="separator") - yield Label("当前会话:", classes="label") - yield Static(id="current-session-label", classes="session-label") - - # 聊天记录显示区域 - yield RichLog( - id="chat-log", - wrap=True, - highlight=True, - markup=True, - classes="chat-log", - ) - - # 输入区域 - with Horizontal(id="input-container"): - yield Input( - id="message-input", - placeholder="输入消息... (按 Ctrl+Enter 发送, Esc 聚焦)", - classes="message-input", - ) - yield Button( - "发送", id="send-button", variant="primary", classes="send-button" - ) - - # 状态栏 - yield Static(id="status-bar", classes="status-bar") - - yield Footer() - - def on_mount(self) -> None: - """挂载组件时初始化""" - # 获取或创建会话 - self.current_session = self.chat_manager.get_session(self.session_id) - if self.current_session is None: - self.notify("无法创建 LLM 会话,请检查配置", severity="error") - return - - # 更新会话标签 - self.query_one("#current-session-label", Static).update( - f"{self.current_session.session_id}" - ) - - # 加载历史消息到聊天记录 - self._display_history() - - # 聚焦输入框 - self.query_one("#message-input", Input).focus() - - # 检查配置 - self._check_config() - - def _check_config(self): - """检查 LLM 配置""" - config = config_var.get() - provider_name = config["services"]["llm"] - provider_config = config["providers"]["llm"][provider_name] - - if provider_name == "openai": - if not provider_config.get("key") and not provider_config.get("url"): - self.notify( - "未配置 OpenAI API key 或 URL,请在 config.toml 中配置 [providers.llm.openai]", - severity="warning", - ) - - def _display_history(self): - """显示当前会话的历史消息""" - if not self.current_session: - return - - chat_log = self.query_one("#chat-log", RichLog) - chat_log.clear() - - for msg in self.current_session.get_history(): - role = msg["role"] - content = msg["content"] - - if role == "user": - chat_log.write(f"[bold cyan]你:[/bold cyan] {content}") - elif role == "assistant": - chat_log.write(f"[bold green]AI:[/bold green] {content}") - elif role == "system": - # 系统消息不显示在聊天记录中 - pass - - def _add_message_to_log(self, role: str, content: str): - """添加消息到聊天记录显示""" - chat_log = self.query_one("#chat-log", RichLog) - if role == "user": - chat_log.write(f"[bold cyan]你:[/bold cyan] {content}") - elif role == "assistant": - chat_log.write(f"[bold green]AI:[/bold green] {content}") - chat_log.scroll_end() - - async def on_input_submitted(self, event: Input.Submitted): - """处理输入提交""" - if event.input.id == "message-input": - await self._send_message() - - async def on_button_pressed(self, event: Button.Pressed): - """处理按钮点击""" - button_id = event.button.id - - if button_id == "send-button": - await self._send_message() - elif button_id == "new-session": - self.action_new_session() - elif button_id == "save-session": - self.action_save_session() - elif button_id == "load-session": - self.action_load_session() - elif button_id == "clear-history": - self.action_clear_history() - elif button_id == "set-system-prompt": - self.action_set_system_prompt() - - async def _send_message(self): - """发送当前输入的消息""" - if not self.current_session or self.is_streaming: - return - - input_widget = self.query_one("#message-input", Input) - message = input_widget.value.strip() - - if not message: - return - - # 清空输入框 - input_widget.value = "" - - # 显示用户消息 - self._add_message_to_log("user", message) - - # 禁用输入和按钮 - self._set_input_state(disabled=True) - self.is_streaming = True - - # 更新状态 - self.query_one("#status-bar", Static).update("AI 正在思考...") - - try: - # 发送消息并获取响应 - response = await self.current_session.send_message(message) - - # 显示AI响应 - self._add_message_to_log("assistant", response) - - except Exception as e: - error_msg = f"请求失败: {str(e)}" - logger.error(error_msg) - self._add_message_to_log("assistant", f"[red]{error_msg}[/red]") - self.notify(error_msg, severity="error") - - finally: - # 恢复输入和按钮 - self._set_input_state(disabled=False) - self.is_streaming = False - self.query_one("#status-bar", Static).update("就绪") - input_widget.focus() - - def _set_input_state(self, disabled: bool): - """设置输入控件状态""" - self.query_one("#message-input", Input).disabled = disabled - self.query_one("#send-button", Button).disabled = disabled - - async def action_save_session(self): - """保存当前会话到文件""" - if not self.current_session: - self.notify("无当前会话", severity="error") - return - - # 默认保存到 data/chat_sessions/ 目录 - save_dir = Path(config_var.get()["paths"]["data"]) / "chat_sessions" - save_dir.mkdir(exist_ok=True, parents=True) - - file_path = save_dir / f"{self.current_session.session_id}.json" - self.current_session.save_to_file(file_path) - - self.notify(f"会话已保存到 {file_path}", severity="information") - - async def action_load_session(self): - """从文件加载会话""" - # 简化实现:加载默认目录下的第一个会话文件 - save_dir = Path(config_var.get()["paths"]["data"]) / "chat_sessions" - if not save_dir.exists(): - self.notify(f"目录不存在: {save_dir}", severity="error") - return - - session_files = list(save_dir.glob("*.json")) - if not session_files: - self.notify("未找到会话文件", severity="error") - return - - # 使用第一个文件(在实际应用中可以让用户选择) - file_path = session_files[0] - - try: - # 获取 LLM 提供者 - provider_name = config_var.get()["services"]["llm"] - provider_config = config_var.get()["providers"]["llm"][provider_name] - from heurams.providers.llm import providers as prov - - llm_provider = prov[provider_name](provider_config) - - # 加载会话 - self.current_session = ChatSession.load_from_file(file_path, llm_provider) - - # 更新聊天管理器 - self.chat_manager.sessions[self.current_session.session_id] = ( - self.current_session - ) - - # 更新UI - self.query_one("#current-session-label", Static).update( - f"{self.current_session.session_id}" - ) - self._display_history() - - self.notify(f"已加载会话: {file_path.name}", severity="information") - - except Exception as e: - logger.error("加载会话失败: %s", e) - self.notify(f"加载失败: {str(e)}", severity="error") - - async def action_new_session(self): - """创建新会话""" - # 简单实现:使用时间戳作为会话ID - import time - - new_session_id = f"session_{int(time.time())}" - - self.current_session = self.chat_manager.get_session(new_session_id) - - # 更新UI - self.query_one("#current-session-label", Static).update( - f"{self.current_session.session_id}" - ) - self._display_history() - - self.notify(f"已创建新会话: {new_session_id}", severity="information") - self.query_one("#message-input", Input).focus() - - async def action_clear_history(self): - """清空当前会话历史""" - if not self.current_session: - return - - self.current_session.clear_history() - self._display_history() - self.notify("历史已清空", severity="information") - - async def action_set_system_prompt(self): - """设置系统提示词""" - if not self.current_session: - return - - # 使用输入框获取新提示词 - input_widget = self.query_one("#message-input", Input) - current_value = input_widget.value - - # 临时修改输入框提示 - input_widget.placeholder = "输入系统提示词... (按 Enter 确认, Esc 取消)" - input_widget.value = self.current_session.system_prompt - - # 等待用户输入 - self.notify("请输入系统提示词,按 Enter 确认", severity="information") - - # 实际应用中需要更复杂的交互,这里简化处理 - # 用户手动输入后按 Enter 会触发 on_input_submitted - # 这里我们只修改占位符,实际系统提示词设置需要额外界面 - - def action_focus_input(self): - """聚焦到输入框""" - self.query_one("#message-input", Input).focus() - - def action_go_back(self): - """返回上级屏幕""" - self.app.pop_screen() diff --git a/src/heurams/interface/screens/navigator.py b/src/heurams/interface/screens/navigator.py index cedb038..cd4907c 100644 --- a/src/heurams/interface/screens/navigator.py +++ b/src/heurams/interface/screens/navigator.py @@ -24,15 +24,13 @@ class NavigatorScreen(ModalScreen): SCREENS = [ ("仪表盘", "dashboard"), - ("电台", "radio"), - ("语言模型集成", "llmchat"), # ("创建仓库", "repo_creator"), ("缓存管理器", "precache_all"), - ("收藏夹管理器", FavoriteManagerScreen), - ("配置设置", "config"), + ("收藏夹", FavoriteManagerScreen), + # ("配置设置", "config"), + # ("调试日志", "logviewer"), + ("同步工具", "synctool"), ("关于此软件", "about"), - ("调试日志", "logviewer"), - # ("同步工具", "synctool"), # ("仓库编辑器", "repo_editor"), ] diff --git a/src/heurams/interface/screens/radio.py b/src/heurams/interface/screens/radio.py deleted file mode 100644 index 6d0b610..0000000 --- a/src/heurams/interface/screens/radio.py +++ /dev/null @@ -1,217 +0,0 @@ -"""用于筛选当日记忆的条目 以音频形式重放""" - -""" "前进电台" 界面""" -import os -from pathlib import Path -from typing import List, Optional - -from textual.app import ComposeResult -from textual.containers import Container, ScrollableContainer -from textual.reactive import reactive -from textual.screen import Screen -from textual.widgets import Button, Footer, Header, Label, Static - -import heurams.kernel.particles as pt -from heurams.kernel.repolib import Repo -from heurams.context import config_var -from heurams.services.audio_service import play_by_path -from heurams.services.hasher import get_md5 -from heurams.services.logger import get_logger -from heurams.services.tts_service import convertor - -logger = get_logger(__name__) - - -class RadioScreen(Screen): - SUB_TITLE = "电台" - - BINDINGS = [ - ("q", "go_back", "返回"), - ("space", "toggle_play", "播放/暂停"), - ] - - # 当前播放的原子索引 - current_index = reactive(0) - # 播放状态: 'stopped', 'playing', 'paused' - play_state = reactive("stopped") - - def __init__( - self, - name: str | None = None, - id: str | None = None, - classes: str | None = None, - ) -> None: - super().__init__(name, id, classes) - self._organizer() - - def _organizer(self): - repodirs = Repo.probe_valid_repos_in_dir(Path(config_var.get()['paths']['data']) / 'repo') - repos = list(map(lambda repodir: Repo.create_from_repodir(repodir), repodirs)) - for repo in repos: - last_modify = 0.0 - for i in repo.ident_index: - e = pt.Electron.create_on_electonic_data( - electronic_data=repo.electronic_data_lict.get_itemic_unit(i) - ) - last_modify = max(last_modify, e.las()) - - - def compose(self) -> ComposeResult: - yield Header(show_clock=True) - with Container(id="main"): - yield Label("[b]前进电台[/b]", classes="title") - yield Static(f"共 {len(self.atoms)} 条当日记忆", id="status") - with Container(id="controls"): - yield Button("播放", id="play", variant="success") - yield Button("暂停", id="pause", variant="primary") - yield Button("上一首", id="prev", variant="default") - yield Button("下一首", id="next", variant="default") - yield Button("停止", id="stop", variant="error") - yield ScrollableContainer(id="playlist") - yield Footer() - - def on_mount(self) -> None: - """挂载后更新播放列表显示""" - self._update_playlist() - - def _filter_due_atoms(self) -> List[pt.Atom]: - """筛选当日需要复习的原子(已激活且到期)""" - atoms = [] - for ident in self.repo.ident_index: - n = pt.Nucleon.create_on_nucleonic_data( - nucleonic_data=self.repo.nucleonic_data_lict.get_itemic_unit(ident) - ) - e = pt.Electron.create_on_electonic_data( - electronic_data=self.repo.electronic_data_lict.get_itemic_unit(ident) - ) - a = pt.Atom(n, e, self.repo.orbitic_data) - # 仅选择已激活且到期的原子 - if ( - a.registry["electron"].is_activated() - and a.registry["electron"].is_due() - ): - atoms.append(a) - return atoms - - def _update_playlist(self) -> None: - """更新播放列表显示""" - container = self.query_one("#playlist") - container.remove_children() - for idx, atom in enumerate(self.atoms): - content = atom.registry["nucleon"].get("content", "无内容") - prefix = "▶ " if idx == self.current_index else " " - widget = Static(f"{prefix}{idx+1}. {content[:50]}...") - widget.set_class(idx == self.current_index, "current") - container.mount(widget) - - def _get_audio_path(self, atom: pt.Atom) -> Path: - """返回音频文件路径,若不存在则生成""" - tts_text = atom.registry["nucleon"].get("tts_text", "") - if not tts_text: - tts_text = atom.registry["nucleon"].get("content", "") - voice_dir = Path(config_var.get()["paths"]["data"]) / "cache" / "voice" - voice_dir.mkdir(parents=True, exist_ok=True) - path = voice_dir / f"{get_md5(tts_text)}.wav" - if not path.exists(): - convertor(tts_text, path) - return path - - async def _play_atom(self, idx: int) -> None: - """播放指定索引的原子(异步)""" - if idx < 0 or idx >= len(self.atoms): - return - atom = self.atoms[idx] - try: - path = self._get_audio_path(atom) - self._current_path = path - # 在后台线程中播放,避免阻塞UI - await self.run_worker( - lambda: play_by_path(path), exclusive=True, thread=True - ) - except Exception as e: - logger.error("播放失败: %s", e) - - def _stop_playback(self) -> None: - """停止当前播放""" - if self._play_task and not self._play_task.done(): - self._play_task.cancel() - self._play_task = None - self._current_path = None - self.play_state = "stopped" - - async def _play_current(self) -> None: - """播放当前索引的原子""" - self._stop_playback() - self.play_state = "playing" - self._play_task = asyncio.create_task(self._play_atom(self.current_index)) - try: - await self._play_task - except asyncio.CancelledError: - pass - finally: - if self.play_state == "playing": - self.play_state = "stopped" - - # 按钮事件处理 - def on_button_pressed(self, event: Button.Pressed) -> None: - button_id = event.button.id - if button_id == "play": - self.action_toggle_play() - elif button_id == "pause": - self.action_pause() - elif button_id == "prev": - self.action_prev() - elif button_id == "next": - self.action_next() - elif button_id == "stop": - self.action_stop() - - # 键盘动作 - def action_toggle_play(self) -> None: - if self.play_state == "playing": - self.action_pause() - else: - self.action_play() - - def action_play(self) -> None: - if self.play_state != "playing": - if self.play_state == "paused": - # 恢复播放(目前暂停功能简单实现为停止) - self.play_state = "playing" - else: - asyncio.create_task(self._play_current()) - - def action_pause(self) -> None: - if self.play_state == "playing": - self._stop_playback() - self.play_state = "paused" - - def action_stop(self) -> None: - self._stop_playback() - self.play_state = "stopped" - - def action_next(self) -> None: - if self.current_index < len(self.atoms) - 1: - self.current_index += 1 - self._update_playlist() - if self.play_state == "playing": - asyncio.create_task(self._play_current()) - - def action_prev(self) -> None: - if self.current_index > 0: - self.current_index -= 1 - self._update_playlist() - if self.play_state == "playing": - asyncio.create_task(self._play_current()) - - def action_go_back(self) -> None: - self._stop_playback() - self.app.pop_screen() - - # 响应式更新 - def watch_current_index(self, old: int, new: int) -> None: - self._update_playlist() - - def watch_play_state(self, old: str, new: str) -> None: - # 更新按钮状态(可在此添加样式变化) - pass diff --git a/src/heurams/interface/screens/repocreator.py b/src/heurams/interface/screens/repocreator.py deleted file mode 100644 index d60a9ec..0000000 --- a/src/heurams/interface/screens/repocreator.py +++ /dev/null @@ -1,166 +0,0 @@ -"""仓库创建向导界面""" - -from pathlib import Path - -import toml -from textual.app import ComposeResult -from textual.containers import ScrollableContainer -from textual.screen import Screen -from textual.widgets import Button, Footer, Header, Input, Label, Markdown, Select - -from heurams.context import config_var -from heurams.services.version import ver - - -class RepoCreatorScreen(Screen): - BINDINGS = [("q", "go_back", "返回")] - SUB_TITLE = "仓库创建向导" - - def __init__(self) -> None: - super().__init__(name=None, id=None, classes=None) - - def search_templates(self): - from pathlib import Path - - from heurams.context import config_var - - template_dir = Path(config_var.get()["paths"]["data"]) / "templates" - templates = list() - for i in template_dir.iterdir(): - if i.name.endswith(".toml"): - try: - import toml - - with open(i, "r") as f: - dic = toml.load(f) - desc = dic["__metadata__.attribution"]["desc"] - templates.append(desc + " (" + i.name + ")") - except Exception as e: - templates.append(f"无描述模板 ({i.name})") - print(e) - return templates - - def compose(self) -> ComposeResult: - yield Header(show_clock=True) - with ScrollableContainer(id="vice_container"): - yield Label(f"[b]空白单元集创建向导\n") - yield Markdown( - "> 提示: 你可能注意到当选中文本框时底栏和操作按键绑定将被覆盖 \n只需选中(使用鼠标或 Tab)选择框即可恢复底栏功能" - ) - yield Markdown("1. 键入单元集名称") - yield Input(placeholder="单元集名称", id="name_input") - yield Markdown( - "> 单元集名称不应与现有单元集重复. \n> 新的单元集文件将创建在 ./nucleon/你输入的名称.toml" - ) - yield Label(f"\n") - yield Markdown("2. 选择单元集模板") - LINES = self.search_templates() - """带有宏支持的空白单元集 ({ver}) -古诗词模板单元集 ({ver}) -英语词汇和短语模板单元集 ({ver}) -""" - yield Select.from_values(LINES, prompt="选择类型", id="template_select") - yield Markdown("> 新单元集的版本号将和主程序版本保持同步") - yield Label(f"\n") - yield Markdown("3. 输入常见附加元数据 (可选)") - yield Input(placeholder="作者", id="author_input") - yield Input(placeholder="内容描述", id="desc_input") - yield Button( - "新建空白单元集", - id="submit_button", - variant="primary", - classes="start-button", - ) - yield Footer() - - def on_mount(self): - self.query_one("#submit_button").focus() - - def action_go_back(self): - self.app.pop_screen() - - def action_quit_app(self): - self.app.exit() - - def on_button_pressed(self, event) -> None: - event.stop() - if event.button.id == "submit_button": - # 获取输入值 - name_input = self.query_one("#name_input") - template_select = self.query_one("#template_select") - author_input = self.query_one("#author_input") - desc_input = self.query_one("#desc_input") - - name = name_input.value.strip() # type: ignore - author = author_input.value.strip() # type: ignore - desc = desc_input.value.strip() # type: ignore - selected = template_select.value # type: ignore - - # 验证 - if not name: - self.notify("单元集名称不能为空", severity="error") - return - - # 获取配置路径 - config = config_var.get() - nucleon_dir = Path(config["paths"]["nucleon_dir"]) - template_dir = Path(config["paths"]["template_dir"]) - - # 检查文件是否已存在 - nucleon_path = nucleon_dir / f"{name}.toml" - if nucleon_path.exists(): - self.notify(f"单元集 '{name}' 已存在", severity="error") - return - - # 确定模板文件 - if selected is None: - self.notify("请选择一个模板", severity="error") - return - # selected 是描述字符串, 格式如 "描述 (filename.toml)" - # 提取文件名 - import re - - match = re.search(r"\(([^)]+)\)$", selected) - if not match: - self.notify("模板选择格式无效", severity="error") - return - template_filename = match.group(1) - template_path = template_dir / template_filename - if not template_path.exists(): - self.notify(f"模板文件不存在: {template_filename}", severity="error") - return - - # 加载模板 - try: - with open(template_path, "r", encoding="utf-8") as f: - template_data = toml.load(f) - except Exception as e: - self.notify(f"加载模板失败: {e}", severity="error") - return - - # 更新元数据 - metadata = template_data.get("__metadata__", {}) - attribution = metadata.get("attribution", {}) - if author: - attribution["author"] = author - if desc: - attribution["desc"] = desc - attribution["name"] = name - # 可选: 设置版本 - attribution["version"] = ver - metadata["attribution"] = attribution - template_data["__metadata__"] = metadata - - # 确保 nucleon_dir 存在 - nucleon_dir.mkdir(parents=True, exist_ok=True) - - # 写入新文件 - try: - with open(nucleon_path, "w", encoding="utf-8") as f: - toml.dump(template_data, f) - except Exception as e: - self.notify(f"保存单元集失败: {e}", severity="error") - return - - self.notify(f"单元集 '{name}' 创建成功") - self.app.pop_screen() diff --git a/src/heurams/interface/screens/repoeditor.py b/src/heurams/interface/screens/repoeditor.py deleted file mode 100644 index ec3f92c..0000000 --- a/src/heurams/interface/screens/repoeditor.py +++ /dev/null @@ -1,267 +0,0 @@ -"""仓库编辑器, 使用TextArea控件等实现仓库配置编辑""" - -import json -from pathlib import Path -from typing import Optional - -import toml -from textual.app import ComposeResult -from textual.containers import Container, Horizontal, ScrollableContainer, Vertical -from textual.reactive import reactive -from textual.screen import Screen -from textual.widgets import ( - Button, - Footer, - Header, - Label, - ListItem, - ListView, - Static, - TextArea, -) - -from heurams.context import config_var -from heurams.kernel.repolib import Repo -from heurams.services.logger import get_logger - -logger = get_logger(__name__) - - -class RepoEditorScreen(Screen): - """仓库编辑器屏幕""" - - SUB_TITLE = "仓库编辑器" - - BINDINGS = [ - ("q", "go_back", "返回"), - ("s", "save_file", "保存"), - ("r", "reload_file", "重载"), - ("d", "toggle_dark", ""), - ] - - # 当前选择的仓库路径 - selected_repo_path: reactive[Optional[Path]] = reactive(None) - # 当前选择的文件名 - selected_filename: reactive[Optional[str]] = reactive(None) - # 文件内容 - file_content: reactive[str] = reactive("") - - def __init__( - self, - repo: Optional[Repo] = None, - name: str | None = None, - id: str | None = None, - classes: str | None = None, - ) -> None: - super().__init__(name, id, classes) - self.repo = repo - self.repo_dir: Optional[Path] = None - self.file_list = [] - if repo is not None and repo.source is not None: - self.repo_dir = repo.source - self._load_file_list() - # selected_repo_path 将在 on_mount 中设置,避免触发watch时组件未就绪 - - def _load_file_list(self) -> None: - """加载仓库目录下的文件列表""" - if self.repo_dir is None: - return - self.file_list = [] - for fname in Repo.file_mapping.values(): - fpath = self.repo_dir / fname - if fpath.exists(): - self.file_list.append(fname) - # 也可能存在其他文件,但暂时只支持标准文件 - self.file_list.sort() - - def compose(self) -> ComposeResult: - """组合界面组件""" - yield Header(show_clock=True) - with Container(id="main_container"): - with Horizontal(id="top_panel"): - # 左侧: 仓库选择 - with Vertical(id="repo_selector", classes="panel"): - yield Label("仓库列表", classes="panel-title") - yield ListView( - *[ - ListItem(Label(repo_dir.name)) - for repo_dir in self._get_repo_dirs() - ], - id="repo_list", - classes="list-view", - ) - # 中间: 文件列表 - with Vertical(id="file_selector", classes="panel"): - yield Label("文件列表", classes="panel-title") - yield ListView( - *[ListItem(Label(fname)) for fname in self.file_list], - id="file_list", - classes="list-view", - ) - # 右侧: 编辑区域 - with Vertical(id="editor_panel", classes="panel"): - yield Label("编辑文件", classes="panel-title") - yield TextArea( - id="text_editor", - language="plaintext", - classes="text-editor", - ) - with Horizontal(id="button_bar"): - yield Button("保存", id="save_button", variant="primary") - yield Button("重载", id="reload_button", variant="default") - yield Button("返回", id="back_button", variant="error") - yield Footer() - - def _get_repo_dirs(self) -> list[Path]: - """获取data/repo/下所有有效仓库目录""" - repo_root = Path(config_var.get()["paths"]["data"]) / "repo" - repo_dirs = [] - if repo_root.exists(): - for entry in repo_root.iterdir(): - if entry.is_dir(): - # 检查是否存在 manifest.toml - if (entry / "manifest.toml").exists(): - repo_dirs.append(entry) - return repo_dirs - - def on_mount(self) -> None: - """挂载组件时初始化""" - # 如果已有仓库,设置 selected_repo_path 以触发watch(此时组件已就绪) - if self.repo_dir is not None: - self.selected_repo_path = self.repo_dir - # 焦点放在仓库列表 - self.query_one("#repo_list", ListView).focus() - - def watch_selected_repo_path( - self, old_path: Optional[Path], new_path: Optional[Path] - ) -> None: - """当选择的仓库路径变化时,加载文件列表""" - if new_path is None: - self.file_list = [] - self.selected_filename = None - self.file_content = "" - return - self.repo_dir = new_path - self._load_file_list() - # 如果组件已挂载,更新UI - if self.is_mounted: - file_list_view = self.query_one("#file_list", ListView) - file_list_view.clear() - for fname in self.file_list: - file_list_view.append(ListItem(Label(fname))) - # 清空编辑器 - self.query_one("#text_editor", TextArea).text = "" - self.selected_filename = None - - def watch_selected_filename( - self, old_name: Optional[str], new_name: Optional[str] - ) -> None: - """当选择的文件名变化时,加载文件内容""" - if new_name is None or self.repo_dir is None: - self.file_content = "" - return - file_path = self.repo_dir / new_name - if not file_path.exists(): - self.notify(f"文件不存在: {new_name}", severity="error") - return - try: - content = file_path.read_text(encoding="utf-8") - self.file_content = content - # 如果组件已挂载,更新编辑器 - if self.is_mounted: - editor = self.query_one("#text_editor", TextArea) - editor.text = content - # 根据文件后缀设置语言 - if new_name.endswith(".toml"): - editor.language = "toml" - elif new_name.endswith(".json"): - editor.language = "json" - else: - editor.language = "plaintext" - except Exception as e: - logger.error(f"读取文件失败: {e}") - self.notify(f"读取文件失败: {e}", severity="error") - - def watch_file_content(self, old_content: str, new_content: str) -> None: - """当文件内容变化时更新编辑器(仅当外部改变时)""" - # 目前不需要做任何事情,因为编辑器内容已绑定 - pass - - def on_list_view_selected(self, event) -> None: - """处理列表项选择事件""" - if not isinstance(event.item, ListItem): - return - list_id = event.list_view.id - selected_label = event.item.query_one(Label) - selected_text = str(selected_label.render()) - - if list_id == "repo_list": - # 用户选择了仓库 - repo_root = Path(config_var.get()["paths"]["data"]) / "repo" - selected_dir = repo_root / selected_text - if selected_dir.exists(): - self.selected_repo_path = selected_dir - elif list_id == "file_list": - # 用户选择了文件 - if self.repo_dir is None: - self.notify("请先选择仓库", severity="warning") - return - self.selected_filename = selected_text - - def on_button_pressed(self, event) -> None: - """处理按钮点击事件""" - event.stop() - if event.button.id == "save_button": - self.action_save_file() - elif event.button.id == "reload_button": - self.action_reload_file() - elif event.button.id == "back_button": - self.action_go_back() - - def action_save_file(self) -> None: - """保存当前编辑的文件""" - if self.repo_dir is None or self.selected_filename is None: - self.notify("未选择仓库或文件", severity="warning") - return - file_path = self.repo_dir / self.selected_filename - editor = self.query_one("#text_editor", TextArea) - new_content = editor.text - # 验证格式 - try: - if self.selected_filename.endswith(".toml"): - toml.loads(new_content) # 验证TOML - elif self.selected_filename.endswith(".json"): - json.loads(new_content) # 验证JSON - except Exception as e: - self.notify(f"格式错误: {e}", severity="error") - return - # 写入文件 - try: - file_path.write_text(new_content, encoding="utf-8") - self.notify("保存成功", severity="information") - except Exception as e: - logger.error(f"保存文件失败: {e}") - self.notify(f"保存文件失败: {e}", severity="error") - - def action_reload_file(self) -> None: - """重新加载当前文件(放弃修改)""" - if self.repo_dir is None or self.selected_filename is None: - self.notify("未选择仓库或文件", severity="warning") - return - file_path = self.repo_dir / self.selected_filename - try: - content = file_path.read_text(encoding="utf-8") - editor = self.query_one("#text_editor", TextArea) - editor.text = content - self.notify("已重载", severity="information") - except Exception as e: - logger.error(f"重载文件失败: {e}") - self.notify(f"重载文件失败: {e}", severity="error") - - def action_go_back(self) -> None: - """返回上一屏幕""" - self.app.pop_screen() - - def action_toggle_dark(self) -> None: - """切换暗色模式""" - self.app.dark = not self.app.dark diff --git a/src/heurams/kernel/README.md b/src/heurams/kernel/README.md index 37eab76..6eb5b8d 100644 --- a/src/heurams/kernel/README.md +++ b/src/heurams/kernel/README.md @@ -1,2 +1,3 @@ # Kernel - HeurAMS 核心 -记忆规划相关算法与数据结构, 可脱离业务层 \ No newline at end of file + +记忆规划相关算法与数据结构, 可脱离业务层 diff --git a/src/heurams/kernel/particles/atom.py b/src/heurams/kernel/particles/atom.py index 463add0..cc2baef 100644 --- a/src/heurams/kernel/particles/atom.py +++ b/src/heurams/kernel/particles/atom.py @@ -32,7 +32,7 @@ class Atom: default_runtime = { "locked": False, - "min_rate": 0x3F3F3F3F, + "min_rate": float('inf'), "new_activation": False, } diff --git a/src/heurams/kernel/reactor/README.md b/src/heurams/kernel/reactor/README.md index 8aaaaa3..c35073d 100644 --- a/src/heurams/kernel/reactor/README.md +++ b/src/heurams/kernel/reactor/README.md @@ -1,16 +1,24 @@ # Reactor - 记忆流程状态机模块 + Reactor 是 HeurAMS 的记忆流程状态机模块, 和界面 (interface) 的实现是解耦的, 以便后期与其他框架的适配. 得益于 Pickle, 状态机模块支持快照! + ## Phaser - 全局阶段控制器 + 在一次队列记忆流程中, Phaser 代表记忆流程本身. + ### 属性 + #### 状态属性 + 其有状态属性: + - unsure - 用于初始化 - *quick_review - 复习逾期的单元 - *recognition - 辨识新单元 - *final_review - 复习所有逾期的和新辨认的单元 -- finished - 表示完成 +- finished - 表示完成 + > 逾期的: 指 SM-2 算法间隔显示应该复习的单元 带 * 的属性表示实际的记忆阶段, 由 repo 中 schedule.toml 中 schedule 列表显式声明, 运行过程中可以选择性执行, "空的" Procession 会被直接跳过. @@ -18,106 +26,148 @@ Reactor 是 HeurAMS 的记忆流程状态机模块, 和界面 (interface) 的实 在初始化 Procession 时, 每个 Procession 被赋予一个不重复的状态属性 作为"阶段状态"属性, 以此标识 Procession 的阶段属性, 因为每个 Procession 管理一个阶段下的复习进程. 你可以用 state 属性获取 Phaser 的当前状态. + #### Procession 属性 + 储存一个顺序列表, 保存所有构造的 Procession. 顺序与 repo 中 schedule.toml 中 schedule 列表中的顺序完全相同 ### 初始化 + Phaser 接受一个存储 Atom 对象的列表, 作为组织记忆流程的材料 在内部, 根据是否激活将其分为 new_atoms 与 old_atoms. 因此, 如果你传入的列表中有算法上"无所事事"的 Atom, 流程会对其进行"加强复习" 由此创建 Procession. ### 直接输出呈现形式 + Phaser 的 __repr__ 定义了此对象"官方的显示"用作直观的调试. 其以 ascii 表格形式输出, 格式也符合 markdown 表格规范, 你可以直接复制到 markdown. 示例: + ```text | Type | State | Processions | Current Procession | |:-------|:--------|:-----------------------|:---------------------| | Phaser | unsure | ['新记忆', '总体复习'] | 新记忆 | ``` + | Type | State | Processions | Current Procession | |:-------|:--------|:-----------------------|:---------------------| | Phaser | unsure | ['新记忆', '总体复习'] | 新记忆 | ### 方法 + 作为一个 Transition Machine 对象的继承, 其拥有 Machine 对象拥有的所有方法. 除此之外, 它也拥有一些其他方法. + #### current_procession(self) + 用于查询当前的 Procession, 并且根据当前 Procession 更新自身状态. 返回一个 Procession 对象, 是当前阶段的 Procession. 内部运作是返回第一个状态不为 finished 的 Procession, 并将自身状态变更为 Procession 的"阶段状态"属性 若所有 Procession 都已完成, 将返回一个"阶段状态"为 finished 的 Procession 占位符对象(它不在 procession 属性中), 并更新自身状态为 finished. ## Procession - 阶段管理器 + ### 属性 + #### 状态属性 + 其有状态属性: + - active - 标识未完成, 初始化的默认属性 - finished - 完成了 + #### 其他属性 + - current_atom: 当前记忆原子的引用 - atoms: 队列中所有原子列表 - cursor: 指针, 是当前原子在 atoms 列表中的索引 - phase: "阶段属性" + > 注意区分 "Phaser" 和 "Phase", 其中 "Phase" 表示 "Phaser State". + - name_: 阶段的命名 - state: 当前状态属性 + ### 初始化 + 接受一个 atoms 列表与 phase_state (PhaserState Enum 类型)对象 + ### 直接输出呈现形式 + 同 Phaser, 但显示数据有所不同 与 Phaser 不同, Procession 显示队列会对过长的 atom.ident 进行缩略(末尾 `>` 符号) + ```text | Type | Name | State | Progress | Queue | Current Atom | |:-----------|:-------|:--------|:-----------|:-----------------------|:------------------------------| | Procession | 新记忆 | active | 1 / 2 | ['秦孝公>', '君臣固>'] | 秦孝公据崤函之固, 拥雍州之地, | ``` + | Type | Name | State | Progress | Queue | Current Atom | |:-----------|:-------|:--------|:-----------|:-----------------------|:------------------------------| | Procession | 新记忆 | active | 1 / 2 | ['秦孝公>', '君臣固>'] | 秦孝公据崤函之固, 拥雍州之地, | + ### 方法 + 作为一个 Transition Machine 对象的继承, 其拥有 Machine 对象拥有的所有方法. 除此之外, 它也拥有一些其他方法. + #### forward(self, step=1) + 移动 cursor 并依情况更新 current_atom 和状态属性 无论 Procession 是否处于完成状态, forward 操作都是可逆的, 你可以传入负数, 此时已完成的 Procession 会自动"重启". #### append(self, atom=None) + 追加(回忆失败的)原子(默认为当前原子, 传入 None 会自动转化为当前原子)到队列末端 如果这个原子已经处于队列末端, 不会重复追加, 除非队列只剩下这个原子还没完成(此时最多重复两个) #### process(self) + 返回 cursor 值 #### __len__(self) + 返回剩余原子量(而不是原子总量) 可以使用 len 函数调用 获取原子总量请用 len(obj.atoms), 或者 total_length(self) 方法 #### total_length(self) + 返回队列原子总量 #### is_empty(self) + 判断是否为空队列(传入原子列表对象是空列表的队列) #### get_fission(self) + 获取当前原子的 Fission 对象, 用于单原子调度展开 ## Fission - 单原子调度控制器 + ### 属性 + #### 状态属性 + - exammode: 测试模式(默认) - retronly: 仅回顾模式 + #### 其他属性 + - cursor - atom - current_puzzle - orbital_schedule - orbital_puzzles - puzzles + ### 初始化 + 接受 atom 对象和 phase 参数 + ### 方法 -#### get_puzzles(self) \ No newline at end of file + +#### get_puzzles(self) diff --git a/src/heurams/kernel/reactor/fission.py b/src/heurams/kernel/reactor/fission.py index cba67cd..98ef82d 100644 --- a/src/heurams/kernel/reactor/fission.py +++ b/src/heurams/kernel/reactor/fission.py @@ -74,7 +74,7 @@ class Fission(Machine): self.current_puzzle_inf = self.puzzles_inf[0] for i in range(len(self.puzzles_inf)): - self.min_ratings.append(0x3F3F3F3F) + self.min_ratings.append(float('inf')) Machine.__init__( self, diff --git a/src/heurams/kernel/repolib/navi.py b/src/heurams/kernel/repolib/navi.py deleted file mode 100644 index bbd2449..0000000 --- a/src/heurams/kernel/repolib/navi.py +++ /dev/null @@ -1,3 +0,0 @@ -class Navi: - def __init__(self, init) -> None: - pass diff --git a/src/heurams/services/audio_service.py b/src/heurams/services/audio_service.py index fe801ac..7693156 100644 --- a/src/heurams/services/audio_service.py +++ b/src/heurams/services/audio_service.py @@ -1,4 +1,4 @@ -# 音频服务 +"""音频服务""" from typing import Callable from heurams.context import config_var diff --git a/src/heurams/services/config.py b/src/heurams/services/config.py index c71cc6b..e0c6b12 100644 --- a/src/heurams/services/config.py +++ b/src/heurams/services/config.py @@ -1,4 +1,4 @@ -# 配置文件服务 +"""配置文件服务""" import pathlib import typing diff --git a/src/heurams/services/llm_service.py b/src/heurams/services/llm_service.py deleted file mode 100644 index 2fd730e..0000000 --- a/src/heurams/services/llm_service.py +++ /dev/null @@ -1,228 +0,0 @@ -"""LLM 聊天服务""" - -import asyncio -import json -from pathlib import Path -from typing import Any, Dict, List, Optional - -from heurams.context import config_var -from heurams.providers.llm import providers as prov -from heurams.services.logger import get_logger - -logger = get_logger(__name__) - - -class ChatSession: - """聊天会话,管理单个对话的历史和参数""" - - def __init__( - self, session_id: str, llm_provider, system_prompt: str = "", **default_params - ): - """初始化聊天会话 - - Args: - session_id: 会话唯一标识符 - llm_provider: LLM 提供者实例 - system_prompt: 系统提示词 - **default_params: 默认参数(temperature, max_tokens, model 等) - """ - self.session_id = session_id - self.llm_provider = llm_provider - self.system_prompt = system_prompt - self.default_params = default_params - - # 消息历史 - self.messages: List[Dict[str, str]] = [] - if system_prompt: - self.messages.append({"role": "system", "content": system_prompt}) - - logger.debug("创建聊天会话: id=%s", session_id) - - def add_message(self, role: str, content: str): - """添加消息到历史""" - self.messages.append({"role": role, "content": content}) - logger.debug( - "会话 %s 添加消息: role=%s, length=%d", self.session_id, role, len(content) - ) - - def clear_history(self): - """清空消息历史(保留系统提示)""" - self.messages = [] - if self.system_prompt: - self.messages.append({"role": "system", "content": self.system_prompt}) - logger.debug("会话 %s 清空历史", self.session_id) - - def set_system_prompt(self, prompt: str): - """设置系统提示词""" - self.system_prompt = prompt - # 更新消息历史中的系统消息 - if self.messages and self.messages[0]["role"] == "system": - self.messages[0]["content"] = prompt - elif prompt: - self.messages.insert(0, {"role": "system", "content": prompt}) - logger.debug("会话 %s 设置系统提示: length=%d", self.session_id, len(prompt)) - - async def send_message(self, message: str, **override_params) -> str: - """发送消息并获取响应 - - Args: - message: 用户消息内容 - **override_params: 覆盖默认参数 - - Returns: - 模型响应内容 - """ - # 添加用户消息 - self.add_message("user", message) - - # 合并参数 - params = {**self.default_params, **override_params} - - # 发送请求 - logger.debug("会话 %s 发送消息: length=%d", self.session_id, len(message)) - response = await self.llm_provider.chat(self.messages, **params) - - # 添加助手响应 - self.add_message("assistant", response) - - return response - - async def send_message_stream(self, message: str, **override_params): - """流式发送消息 - - Args: - message: 用户消息内容 - **override_params: 覆盖默认参数 - - Yields: - 流式响应的文本块 - """ - # 添加用户消息 - self.add_message("user", message) - - # 合并参数 - params = {**self.default_params, **override_params} - - # 发送流式请求 - logger.debug("会话 %s 发送流式消息: length=%d", self.session_id, len(message)) - - full_response = "" - async for chunk in self.llm_provider.chat_stream(self.messages, **params): - yield chunk - full_response += chunk - - # 添加完整的助手响应到历史 - self.add_message("assistant", full_response) - - def get_history(self) -> List[Dict[str, str]]: - """获取消息历史(不包括系统消息)""" - # 返回用户和助手的消息,可选排除系统消息 - return [msg for msg in self.messages if msg["role"] != "system"] - - def save_to_file(self, file_path: Path): - """保存会话到文件""" - data = { - "session_id": self.session_id, - "system_prompt": self.system_prompt, - "default_params": self.default_params, - "messages": self.messages, - } - with open(file_path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - logger.debug("会话 %s 保存到: %s", self.session_id, file_path) - - @classmethod - def load_from_file(cls, file_path: Path, llm_provider): - """从文件加载会话""" - with open(file_path, "r", encoding="utf-8") as f: - data = json.load(f) - - session = cls( - session_id=data["session_id"], - llm_provider=llm_provider, - system_prompt=data.get("system_prompt", ""), - **data.get("default_params", {}) - ) - session.messages = data["messages"] - logger.debug("从文件加载会话: %s", file_path) - return session - - -class ChatManager: - """聊天管理器,管理多个会话""" - - def __init__(self): - self.sessions: Dict[str, ChatSession] = {} - self.default_session_id = "default" - logger.debug("聊天管理器初始化完成") - - def get_session( - self, - session_id: Optional[str] = None, - create_if_missing: bool = True, - **session_params - ) -> Optional[ChatSession]: - """获取或创建聊天会话 - - Args: - session_id: 会话标识符,None 则使用默认会话 - create_if_missing: 如果会话不存在则创建 - **session_params: 传递给 ChatSession 的参数 - - Returns: - 聊天会话实例,如果不存在且不创建则返回 None - """ - if session_id is None: - session_id = self.default_session_id - - if session_id in self.sessions: - return self.sessions[session_id] - - if create_if_missing: - # 获取 LLM 提供者 - provider_name = config_var.get()["services"]["llm"] - provider_config = config_var.get()["providers"]["llm"][provider_name] - llm_provider = prov[provider_name](provider_config) - - session = ChatSession( - session_id=session_id, llm_provider=llm_provider, **session_params - ) - self.sessions[session_id] = session - logger.debug("创建新会话: id=%s", session_id) - return session - - return None - - def delete_session(self, session_id: str): - """删除会话""" - if session_id in self.sessions: - del self.sessions[session_id] - logger.debug("删除会话: id=%s", session_id) - - def list_sessions(self) -> List[str]: - """列出所有会话ID""" - return list(self.sessions.keys()) - - -# 全局聊天管理器实例 -_chat_manager: Optional[ChatManager] = None - - -def get_chat_manager() -> ChatManager: - """获取全局聊天管理器实例""" - global _chat_manager - if _chat_manager is None: - _chat_manager = ChatManager() - logger.debug("创建全局聊天管理器") - return _chat_manager - - -def create_chat_session( - session_id: Optional[str] = None, **session_params -) -> ChatSession: - """创建或获取聊天会话(便捷函数)""" - manager = get_chat_manager() - return manager.get_session(session_id, True, **session_params) - - -logger.debug("LLM 服务初始化完成") diff --git a/src/heurams/services/sync_service.py b/src/heurams/services/sync_service.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/heurams/services/tts_service.py b/src/heurams/services/tts_service.py index 81b4c70..0c582f5 100644 --- a/src/heurams/services/tts_service.py +++ b/src/heurams/services/tts_service.py @@ -9,5 +9,5 @@ logger = get_logger(__name__) convertor: Callable = prov[config_var.get()["services"]["tts"]].convert logger.debug( - "TTS服务初始化完成, 使用 provider: %s", config_var.get()["services"]["tts"] + "TTS 服务初始化完成, 使用 provider: %s", config_var.get()["services"]["tts"] ) diff --git a/src/heurams/services/version.py b/src/heurams/services/version.py index a8135ba..0a30daf 100644 --- a/src/heurams/services/version.py +++ b/src/heurams/services/version.py @@ -5,7 +5,7 @@ logger = get_logger(__name__) ver = "0.5.0" stage = "prototype" -codename = "fulcrom" +codename = "fulcrum" codename_cn = "支点" logger.info("HeurAMS 版本: %s (%s), 阶段: %s", ver, codename, stage)