style: 移除设计不当的部分模块
This commit is contained in:
@@ -35,24 +35,26 @@ class AboutScreen(Screen):
|
||||
about_text = f"""
|
||||
# 关于 "潜进"
|
||||
|
||||
版本 {version.ver} {version.stage.capitalize()}
|
||||
|
||||
开发代号: {version.codename.capitalize()} {version.codename_cn}
|
||||
主程序库版本: `{version.ver}-python`
|
||||
用户界面分支: `Textual TUI (基本用户界面)`
|
||||
用户界面版本: `{version.ver}`
|
||||
API 版本代号: `{version.codename.capitalize()}`
|
||||
|
||||
一个基于启发式算法的辅助记忆调度器, 旨在帮助用户更高效地进行记忆工作与学习规划.
|
||||
一个基于启发式算法与认知科学理论的辅助记忆调度器, 旨在帮助用户更高效地进行记忆工作与学习规划.
|
||||
|
||||
以 AGPL-3.0 开放源代码, 这直接意味着任何个体直接基于此代码对外或内部提供的应用和服务, 无论本地或网络, 必须向所有用户公开完整修改后的源代码, 且继续沿用 AGPL-3.0 协议.
|
||||
|
||||
您可在项目主页 https://ams.imwangzhiyu.xyz 获取用户指南, 开发文档与软件更新.
|
||||
您正使用的 TUI 用户界面是 python 版本程序库自带的基本用户界面, 作为基本的全功能前端实现与程序库测试, 如果您想去除它, 请移除程序库根目录中的 interface 文件夹.
|
||||
|
||||
如果您觉得这个软件有用, 可以给它添加一个星标 :)
|
||||
您可在项目主页 https://ams.pluv27.top 获取用户指南, 开发文档与软件更新.
|
||||
|
||||
> 此软件, 以及它作为一个"程序库"是自由且免费的, 但是开发工作必须投入大量精力
|
||||
> 即使您不是软件开发人员, 我们也欢迎您加入 HeurAMS 的队伍!
|
||||
如果您觉得这个软件有用, 可以在它的源代码仓库给它添加一个星标 :)
|
||||
|
||||
> 潜进(HeurAMS), 以及它作为一个"程序库"是自由且免费的, 但是开发工作必须投入大量精力.
|
||||
> 您可以加入各种语言的翻译团队来翻译软件的界面, 您还可以制作图像、主题、音效, 或者改进软件配套的文档……
|
||||
> 不管您来自何方, 我们都欢迎您加入社区并做出贡献.
|
||||
> 我们的共同目标是为人人带来高品质的辅助记忆 & 学习软件.
|
||||
> 您的慷慨支持, 我们必当涌泉相报.
|
||||
> 您的慷慨支持, 我们必当涌泉相报.
|
||||
|
||||
开发人员列表:
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ from heurams.services.logger import get_logger
|
||||
from .about import AboutScreen
|
||||
from .navigator import NavigatorScreen
|
||||
from .preparation import PreparationScreen
|
||||
from .radio import RadioScreen
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -90,7 +89,7 @@ class DashboardScreen(Screen):
|
||||
is_due = 0
|
||||
unit_sum = len(repo)
|
||||
activated_sum = 0
|
||||
nextdate = 0x3F3F3F3F
|
||||
nextdate = float('inf')
|
||||
for i in repo.ident_index:
|
||||
nucleon = pt.Nucleon.create_on_nucleonic_data(
|
||||
nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i)
|
||||
|
||||
@@ -1,330 +0,0 @@
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Container, Horizontal
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Button, Footer, Header, Input, Label, RichLog, Static
|
||||
|
||||
from heurams.context import *
|
||||
from heurams.services.llm_service import ChatSession, get_chat_manager
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class LLMChatScreen(Screen):
|
||||
"""LLM 聊天屏幕"""
|
||||
|
||||
SUB_TITLE = "语言模型集成"
|
||||
BINDINGS = [
|
||||
("q", "go_back", "返回"),
|
||||
("ctrl+s", "save_session", "保存会话"),
|
||||
("ctrl+l", "load_session", "加载会话"),
|
||||
("ctrl+n", "new_session", "新建会话"),
|
||||
("ctrl+c", "clear_history", "清空历史"),
|
||||
("escape", "focus_input", "聚焦输入"),
|
||||
]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
session_id: Optional[str] = None,
|
||||
name: str | None = None,
|
||||
id: str | None = None,
|
||||
classes: str | None = None,
|
||||
) -> None:
|
||||
super().__init__(name, id, classes)
|
||||
self.session_id = session_id
|
||||
self.chat_manager = get_chat_manager()
|
||||
self.current_session: Optional[ChatSession] = None
|
||||
self.is_streaming = False
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""组合界面组件"""
|
||||
yield Header(show_clock=True)
|
||||
|
||||
with Container(id="chat-container"):
|
||||
# 顶部工具栏
|
||||
with Horizontal(id="toolbar"):
|
||||
yield Button("新建会话", id="new-session", variant="primary")
|
||||
yield Button("保存会话", id="save-session", variant="default")
|
||||
yield Button("加载会话", id="load-session", variant="default")
|
||||
yield Button("清空历史", id="clear-history", variant="default")
|
||||
yield Button("设置系统提示", id="set-system-prompt", variant="default")
|
||||
yield Static(" | ", classes="separator")
|
||||
yield Label("当前会话:", classes="label")
|
||||
yield Static(id="current-session-label", classes="session-label")
|
||||
|
||||
# 聊天记录显示区域
|
||||
yield RichLog(
|
||||
id="chat-log",
|
||||
wrap=True,
|
||||
highlight=True,
|
||||
markup=True,
|
||||
classes="chat-log",
|
||||
)
|
||||
|
||||
# 输入区域
|
||||
with Horizontal(id="input-container"):
|
||||
yield Input(
|
||||
id="message-input",
|
||||
placeholder="输入消息... (按 Ctrl+Enter 发送, Esc 聚焦)",
|
||||
classes="message-input",
|
||||
)
|
||||
yield Button(
|
||||
"发送", id="send-button", variant="primary", classes="send-button"
|
||||
)
|
||||
|
||||
# 状态栏
|
||||
yield Static(id="status-bar", classes="status-bar")
|
||||
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""挂载组件时初始化"""
|
||||
# 获取或创建会话
|
||||
self.current_session = self.chat_manager.get_session(self.session_id)
|
||||
if self.current_session is None:
|
||||
self.notify("无法创建 LLM 会话,请检查配置", severity="error")
|
||||
return
|
||||
|
||||
# 更新会话标签
|
||||
self.query_one("#current-session-label", Static).update(
|
||||
f"{self.current_session.session_id}"
|
||||
)
|
||||
|
||||
# 加载历史消息到聊天记录
|
||||
self._display_history()
|
||||
|
||||
# 聚焦输入框
|
||||
self.query_one("#message-input", Input).focus()
|
||||
|
||||
# 检查配置
|
||||
self._check_config()
|
||||
|
||||
def _check_config(self):
|
||||
"""检查 LLM 配置"""
|
||||
config = config_var.get()
|
||||
provider_name = config["services"]["llm"]
|
||||
provider_config = config["providers"]["llm"][provider_name]
|
||||
|
||||
if provider_name == "openai":
|
||||
if not provider_config.get("key") and not provider_config.get("url"):
|
||||
self.notify(
|
||||
"未配置 OpenAI API key 或 URL,请在 config.toml 中配置 [providers.llm.openai]",
|
||||
severity="warning",
|
||||
)
|
||||
|
||||
def _display_history(self):
|
||||
"""显示当前会话的历史消息"""
|
||||
if not self.current_session:
|
||||
return
|
||||
|
||||
chat_log = self.query_one("#chat-log", RichLog)
|
||||
chat_log.clear()
|
||||
|
||||
for msg in self.current_session.get_history():
|
||||
role = msg["role"]
|
||||
content = msg["content"]
|
||||
|
||||
if role == "user":
|
||||
chat_log.write(f"[bold cyan]你:[/bold cyan] {content}")
|
||||
elif role == "assistant":
|
||||
chat_log.write(f"[bold green]AI:[/bold green] {content}")
|
||||
elif role == "system":
|
||||
# 系统消息不显示在聊天记录中
|
||||
pass
|
||||
|
||||
def _add_message_to_log(self, role: str, content: str):
|
||||
"""添加消息到聊天记录显示"""
|
||||
chat_log = self.query_one("#chat-log", RichLog)
|
||||
if role == "user":
|
||||
chat_log.write(f"[bold cyan]你:[/bold cyan] {content}")
|
||||
elif role == "assistant":
|
||||
chat_log.write(f"[bold green]AI:[/bold green] {content}")
|
||||
chat_log.scroll_end()
|
||||
|
||||
async def on_input_submitted(self, event: Input.Submitted):
|
||||
"""处理输入提交"""
|
||||
if event.input.id == "message-input":
|
||||
await self._send_message()
|
||||
|
||||
async def on_button_pressed(self, event: Button.Pressed):
|
||||
"""处理按钮点击"""
|
||||
button_id = event.button.id
|
||||
|
||||
if button_id == "send-button":
|
||||
await self._send_message()
|
||||
elif button_id == "new-session":
|
||||
self.action_new_session()
|
||||
elif button_id == "save-session":
|
||||
self.action_save_session()
|
||||
elif button_id == "load-session":
|
||||
self.action_load_session()
|
||||
elif button_id == "clear-history":
|
||||
self.action_clear_history()
|
||||
elif button_id == "set-system-prompt":
|
||||
self.action_set_system_prompt()
|
||||
|
||||
async def _send_message(self):
|
||||
"""发送当前输入的消息"""
|
||||
if not self.current_session or self.is_streaming:
|
||||
return
|
||||
|
||||
input_widget = self.query_one("#message-input", Input)
|
||||
message = input_widget.value.strip()
|
||||
|
||||
if not message:
|
||||
return
|
||||
|
||||
# 清空输入框
|
||||
input_widget.value = ""
|
||||
|
||||
# 显示用户消息
|
||||
self._add_message_to_log("user", message)
|
||||
|
||||
# 禁用输入和按钮
|
||||
self._set_input_state(disabled=True)
|
||||
self.is_streaming = True
|
||||
|
||||
# 更新状态
|
||||
self.query_one("#status-bar", Static).update("AI 正在思考...")
|
||||
|
||||
try:
|
||||
# 发送消息并获取响应
|
||||
response = await self.current_session.send_message(message)
|
||||
|
||||
# 显示AI响应
|
||||
self._add_message_to_log("assistant", response)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"请求失败: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
self._add_message_to_log("assistant", f"[red]{error_msg}[/red]")
|
||||
self.notify(error_msg, severity="error")
|
||||
|
||||
finally:
|
||||
# 恢复输入和按钮
|
||||
self._set_input_state(disabled=False)
|
||||
self.is_streaming = False
|
||||
self.query_one("#status-bar", Static).update("就绪")
|
||||
input_widget.focus()
|
||||
|
||||
def _set_input_state(self, disabled: bool):
|
||||
"""设置输入控件状态"""
|
||||
self.query_one("#message-input", Input).disabled = disabled
|
||||
self.query_one("#send-button", Button).disabled = disabled
|
||||
|
||||
async def action_save_session(self):
|
||||
"""保存当前会话到文件"""
|
||||
if not self.current_session:
|
||||
self.notify("无当前会话", severity="error")
|
||||
return
|
||||
|
||||
# 默认保存到 data/chat_sessions/ 目录
|
||||
save_dir = Path(config_var.get()["paths"]["data"]) / "chat_sessions"
|
||||
save_dir.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
file_path = save_dir / f"{self.current_session.session_id}.json"
|
||||
self.current_session.save_to_file(file_path)
|
||||
|
||||
self.notify(f"会话已保存到 {file_path}", severity="information")
|
||||
|
||||
async def action_load_session(self):
|
||||
"""从文件加载会话"""
|
||||
# 简化实现:加载默认目录下的第一个会话文件
|
||||
save_dir = Path(config_var.get()["paths"]["data"]) / "chat_sessions"
|
||||
if not save_dir.exists():
|
||||
self.notify(f"目录不存在: {save_dir}", severity="error")
|
||||
return
|
||||
|
||||
session_files = list(save_dir.glob("*.json"))
|
||||
if not session_files:
|
||||
self.notify("未找到会话文件", severity="error")
|
||||
return
|
||||
|
||||
# 使用第一个文件(在实际应用中可以让用户选择)
|
||||
file_path = session_files[0]
|
||||
|
||||
try:
|
||||
# 获取 LLM 提供者
|
||||
provider_name = config_var.get()["services"]["llm"]
|
||||
provider_config = config_var.get()["providers"]["llm"][provider_name]
|
||||
from heurams.providers.llm import providers as prov
|
||||
|
||||
llm_provider = prov[provider_name](provider_config)
|
||||
|
||||
# 加载会话
|
||||
self.current_session = ChatSession.load_from_file(file_path, llm_provider)
|
||||
|
||||
# 更新聊天管理器
|
||||
self.chat_manager.sessions[self.current_session.session_id] = (
|
||||
self.current_session
|
||||
)
|
||||
|
||||
# 更新UI
|
||||
self.query_one("#current-session-label", Static).update(
|
||||
f"{self.current_session.session_id}"
|
||||
)
|
||||
self._display_history()
|
||||
|
||||
self.notify(f"已加载会话: {file_path.name}", severity="information")
|
||||
|
||||
except Exception as e:
|
||||
logger.error("加载会话失败: %s", e)
|
||||
self.notify(f"加载失败: {str(e)}", severity="error")
|
||||
|
||||
async def action_new_session(self):
|
||||
"""创建新会话"""
|
||||
# 简单实现:使用时间戳作为会话ID
|
||||
import time
|
||||
|
||||
new_session_id = f"session_{int(time.time())}"
|
||||
|
||||
self.current_session = self.chat_manager.get_session(new_session_id)
|
||||
|
||||
# 更新UI
|
||||
self.query_one("#current-session-label", Static).update(
|
||||
f"{self.current_session.session_id}"
|
||||
)
|
||||
self._display_history()
|
||||
|
||||
self.notify(f"已创建新会话: {new_session_id}", severity="information")
|
||||
self.query_one("#message-input", Input).focus()
|
||||
|
||||
async def action_clear_history(self):
|
||||
"""清空当前会话历史"""
|
||||
if not self.current_session:
|
||||
return
|
||||
|
||||
self.current_session.clear_history()
|
||||
self._display_history()
|
||||
self.notify("历史已清空", severity="information")
|
||||
|
||||
async def action_set_system_prompt(self):
|
||||
"""设置系统提示词"""
|
||||
if not self.current_session:
|
||||
return
|
||||
|
||||
# 使用输入框获取新提示词
|
||||
input_widget = self.query_one("#message-input", Input)
|
||||
current_value = input_widget.value
|
||||
|
||||
# 临时修改输入框提示
|
||||
input_widget.placeholder = "输入系统提示词... (按 Enter 确认, Esc 取消)"
|
||||
input_widget.value = self.current_session.system_prompt
|
||||
|
||||
# 等待用户输入
|
||||
self.notify("请输入系统提示词,按 Enter 确认", severity="information")
|
||||
|
||||
# 实际应用中需要更复杂的交互,这里简化处理
|
||||
# 用户手动输入后按 Enter 会触发 on_input_submitted
|
||||
# 这里我们只修改占位符,实际系统提示词设置需要额外界面
|
||||
|
||||
def action_focus_input(self):
|
||||
"""聚焦到输入框"""
|
||||
self.query_one("#message-input", Input).focus()
|
||||
|
||||
def action_go_back(self):
|
||||
"""返回上级屏幕"""
|
||||
self.app.pop_screen()
|
||||
@@ -24,15 +24,13 @@ class NavigatorScreen(ModalScreen):
|
||||
|
||||
SCREENS = [
|
||||
("仪表盘", "dashboard"),
|
||||
("电台", "radio"),
|
||||
("语言模型集成", "llmchat"),
|
||||
# ("创建仓库", "repo_creator"),
|
||||
("缓存管理器", "precache_all"),
|
||||
("收藏夹管理器", FavoriteManagerScreen),
|
||||
("配置设置", "config"),
|
||||
("收藏夹", FavoriteManagerScreen),
|
||||
# ("配置设置", "config"),
|
||||
# ("调试日志", "logviewer"),
|
||||
("同步工具", "synctool"),
|
||||
("关于此软件", "about"),
|
||||
("调试日志", "logviewer"),
|
||||
# ("同步工具", "synctool"),
|
||||
# ("仓库编辑器", "repo_editor"),
|
||||
]
|
||||
|
||||
|
||||
@@ -1,217 +0,0 @@
|
||||
"""用于筛选当日记忆的条目 以音频形式重放"""
|
||||
|
||||
""" "前进电台" 界面"""
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Container, ScrollableContainer
|
||||
from textual.reactive import reactive
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Button, Footer, Header, Label, Static
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
from heurams.kernel.repolib import Repo
|
||||
from heurams.context import config_var
|
||||
from heurams.services.audio_service import play_by_path
|
||||
from heurams.services.hasher import get_md5
|
||||
from heurams.services.logger import get_logger
|
||||
from heurams.services.tts_service import convertor
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class RadioScreen(Screen):
|
||||
SUB_TITLE = "电台"
|
||||
|
||||
BINDINGS = [
|
||||
("q", "go_back", "返回"),
|
||||
("space", "toggle_play", "播放/暂停"),
|
||||
]
|
||||
|
||||
# 当前播放的原子索引
|
||||
current_index = reactive(0)
|
||||
# 播放状态: 'stopped', 'playing', 'paused'
|
||||
play_state = reactive("stopped")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str | None = None,
|
||||
id: str | None = None,
|
||||
classes: str | None = None,
|
||||
) -> None:
|
||||
super().__init__(name, id, classes)
|
||||
self._organizer()
|
||||
|
||||
def _organizer(self):
|
||||
repodirs = Repo.probe_valid_repos_in_dir(Path(config_var.get()['paths']['data']) / 'repo')
|
||||
repos = list(map(lambda repodir: Repo.create_from_repodir(repodir), repodirs))
|
||||
for repo in repos:
|
||||
last_modify = 0.0
|
||||
for i in repo.ident_index:
|
||||
e = pt.Electron.create_on_electonic_data(
|
||||
electronic_data=repo.electronic_data_lict.get_itemic_unit(i)
|
||||
)
|
||||
last_modify = max(last_modify, e.las())
|
||||
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=True)
|
||||
with Container(id="main"):
|
||||
yield Label("[b]前进电台[/b]", classes="title")
|
||||
yield Static(f"共 {len(self.atoms)} 条当日记忆", id="status")
|
||||
with Container(id="controls"):
|
||||
yield Button("播放", id="play", variant="success")
|
||||
yield Button("暂停", id="pause", variant="primary")
|
||||
yield Button("上一首", id="prev", variant="default")
|
||||
yield Button("下一首", id="next", variant="default")
|
||||
yield Button("停止", id="stop", variant="error")
|
||||
yield ScrollableContainer(id="playlist")
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""挂载后更新播放列表显示"""
|
||||
self._update_playlist()
|
||||
|
||||
def _filter_due_atoms(self) -> List[pt.Atom]:
|
||||
"""筛选当日需要复习的原子(已激活且到期)"""
|
||||
atoms = []
|
||||
for ident in self.repo.ident_index:
|
||||
n = pt.Nucleon.create_on_nucleonic_data(
|
||||
nucleonic_data=self.repo.nucleonic_data_lict.get_itemic_unit(ident)
|
||||
)
|
||||
e = pt.Electron.create_on_electonic_data(
|
||||
electronic_data=self.repo.electronic_data_lict.get_itemic_unit(ident)
|
||||
)
|
||||
a = pt.Atom(n, e, self.repo.orbitic_data)
|
||||
# 仅选择已激活且到期的原子
|
||||
if (
|
||||
a.registry["electron"].is_activated()
|
||||
and a.registry["electron"].is_due()
|
||||
):
|
||||
atoms.append(a)
|
||||
return atoms
|
||||
|
||||
def _update_playlist(self) -> None:
|
||||
"""更新播放列表显示"""
|
||||
container = self.query_one("#playlist")
|
||||
container.remove_children()
|
||||
for idx, atom in enumerate(self.atoms):
|
||||
content = atom.registry["nucleon"].get("content", "无内容")
|
||||
prefix = "▶ " if idx == self.current_index else " "
|
||||
widget = Static(f"{prefix}{idx+1}. {content[:50]}...")
|
||||
widget.set_class(idx == self.current_index, "current")
|
||||
container.mount(widget)
|
||||
|
||||
def _get_audio_path(self, atom: pt.Atom) -> Path:
|
||||
"""返回音频文件路径,若不存在则生成"""
|
||||
tts_text = atom.registry["nucleon"].get("tts_text", "")
|
||||
if not tts_text:
|
||||
tts_text = atom.registry["nucleon"].get("content", "")
|
||||
voice_dir = Path(config_var.get()["paths"]["data"]) / "cache" / "voice"
|
||||
voice_dir.mkdir(parents=True, exist_ok=True)
|
||||
path = voice_dir / f"{get_md5(tts_text)}.wav"
|
||||
if not path.exists():
|
||||
convertor(tts_text, path)
|
||||
return path
|
||||
|
||||
async def _play_atom(self, idx: int) -> None:
|
||||
"""播放指定索引的原子(异步)"""
|
||||
if idx < 0 or idx >= len(self.atoms):
|
||||
return
|
||||
atom = self.atoms[idx]
|
||||
try:
|
||||
path = self._get_audio_path(atom)
|
||||
self._current_path = path
|
||||
# 在后台线程中播放,避免阻塞UI
|
||||
await self.run_worker(
|
||||
lambda: play_by_path(path), exclusive=True, thread=True
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("播放失败: %s", e)
|
||||
|
||||
def _stop_playback(self) -> None:
|
||||
"""停止当前播放"""
|
||||
if self._play_task and not self._play_task.done():
|
||||
self._play_task.cancel()
|
||||
self._play_task = None
|
||||
self._current_path = None
|
||||
self.play_state = "stopped"
|
||||
|
||||
async def _play_current(self) -> None:
|
||||
"""播放当前索引的原子"""
|
||||
self._stop_playback()
|
||||
self.play_state = "playing"
|
||||
self._play_task = asyncio.create_task(self._play_atom(self.current_index))
|
||||
try:
|
||||
await self._play_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
if self.play_state == "playing":
|
||||
self.play_state = "stopped"
|
||||
|
||||
# 按钮事件处理
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
button_id = event.button.id
|
||||
if button_id == "play":
|
||||
self.action_toggle_play()
|
||||
elif button_id == "pause":
|
||||
self.action_pause()
|
||||
elif button_id == "prev":
|
||||
self.action_prev()
|
||||
elif button_id == "next":
|
||||
self.action_next()
|
||||
elif button_id == "stop":
|
||||
self.action_stop()
|
||||
|
||||
# 键盘动作
|
||||
def action_toggle_play(self) -> None:
|
||||
if self.play_state == "playing":
|
||||
self.action_pause()
|
||||
else:
|
||||
self.action_play()
|
||||
|
||||
def action_play(self) -> None:
|
||||
if self.play_state != "playing":
|
||||
if self.play_state == "paused":
|
||||
# 恢复播放(目前暂停功能简单实现为停止)
|
||||
self.play_state = "playing"
|
||||
else:
|
||||
asyncio.create_task(self._play_current())
|
||||
|
||||
def action_pause(self) -> None:
|
||||
if self.play_state == "playing":
|
||||
self._stop_playback()
|
||||
self.play_state = "paused"
|
||||
|
||||
def action_stop(self) -> None:
|
||||
self._stop_playback()
|
||||
self.play_state = "stopped"
|
||||
|
||||
def action_next(self) -> None:
|
||||
if self.current_index < len(self.atoms) - 1:
|
||||
self.current_index += 1
|
||||
self._update_playlist()
|
||||
if self.play_state == "playing":
|
||||
asyncio.create_task(self._play_current())
|
||||
|
||||
def action_prev(self) -> None:
|
||||
if self.current_index > 0:
|
||||
self.current_index -= 1
|
||||
self._update_playlist()
|
||||
if self.play_state == "playing":
|
||||
asyncio.create_task(self._play_current())
|
||||
|
||||
def action_go_back(self) -> None:
|
||||
self._stop_playback()
|
||||
self.app.pop_screen()
|
||||
|
||||
# 响应式更新
|
||||
def watch_current_index(self, old: int, new: int) -> None:
|
||||
self._update_playlist()
|
||||
|
||||
def watch_play_state(self, old: str, new: str) -> None:
|
||||
# 更新按钮状态(可在此添加样式变化)
|
||||
pass
|
||||
@@ -1,166 +0,0 @@
|
||||
"""仓库创建向导界面"""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import toml
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Button, Footer, Header, Input, Label, Markdown, Select
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.services.version import ver
|
||||
|
||||
|
||||
class RepoCreatorScreen(Screen):
|
||||
BINDINGS = [("q", "go_back", "返回")]
|
||||
SUB_TITLE = "仓库创建向导"
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(name=None, id=None, classes=None)
|
||||
|
||||
def search_templates(self):
|
||||
from pathlib import Path
|
||||
|
||||
from heurams.context import config_var
|
||||
|
||||
template_dir = Path(config_var.get()["paths"]["data"]) / "templates"
|
||||
templates = list()
|
||||
for i in template_dir.iterdir():
|
||||
if i.name.endswith(".toml"):
|
||||
try:
|
||||
import toml
|
||||
|
||||
with open(i, "r") as f:
|
||||
dic = toml.load(f)
|
||||
desc = dic["__metadata__.attribution"]["desc"]
|
||||
templates.append(desc + " (" + i.name + ")")
|
||||
except Exception as e:
|
||||
templates.append(f"无描述模板 ({i.name})")
|
||||
print(e)
|
||||
return templates
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=True)
|
||||
with ScrollableContainer(id="vice_container"):
|
||||
yield Label(f"[b]空白单元集创建向导\n")
|
||||
yield Markdown(
|
||||
"> 提示: 你可能注意到当选中文本框时底栏和操作按键绑定将被覆盖 \n只需选中(使用鼠标或 Tab)选择框即可恢复底栏功能"
|
||||
)
|
||||
yield Markdown("1. 键入单元集名称")
|
||||
yield Input(placeholder="单元集名称", id="name_input")
|
||||
yield Markdown(
|
||||
"> 单元集名称不应与现有单元集重复. \n> 新的单元集文件将创建在 ./nucleon/你输入的名称.toml"
|
||||
)
|
||||
yield Label(f"\n")
|
||||
yield Markdown("2. 选择单元集模板")
|
||||
LINES = self.search_templates()
|
||||
"""带有宏支持的空白单元集 ({ver})
|
||||
古诗词模板单元集 ({ver})
|
||||
英语词汇和短语模板单元集 ({ver})
|
||||
"""
|
||||
yield Select.from_values(LINES, prompt="选择类型", id="template_select")
|
||||
yield Markdown("> 新单元集的版本号将和主程序版本保持同步")
|
||||
yield Label(f"\n")
|
||||
yield Markdown("3. 输入常见附加元数据 (可选)")
|
||||
yield Input(placeholder="作者", id="author_input")
|
||||
yield Input(placeholder="内容描述", id="desc_input")
|
||||
yield Button(
|
||||
"新建空白单元集",
|
||||
id="submit_button",
|
||||
variant="primary",
|
||||
classes="start-button",
|
||||
)
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self):
|
||||
self.query_one("#submit_button").focus()
|
||||
|
||||
def action_go_back(self):
|
||||
self.app.pop_screen()
|
||||
|
||||
def action_quit_app(self):
|
||||
self.app.exit()
|
||||
|
||||
def on_button_pressed(self, event) -> None:
|
||||
event.stop()
|
||||
if event.button.id == "submit_button":
|
||||
# 获取输入值
|
||||
name_input = self.query_one("#name_input")
|
||||
template_select = self.query_one("#template_select")
|
||||
author_input = self.query_one("#author_input")
|
||||
desc_input = self.query_one("#desc_input")
|
||||
|
||||
name = name_input.value.strip() # type: ignore
|
||||
author = author_input.value.strip() # type: ignore
|
||||
desc = desc_input.value.strip() # type: ignore
|
||||
selected = template_select.value # type: ignore
|
||||
|
||||
# 验证
|
||||
if not name:
|
||||
self.notify("单元集名称不能为空", severity="error")
|
||||
return
|
||||
|
||||
# 获取配置路径
|
||||
config = config_var.get()
|
||||
nucleon_dir = Path(config["paths"]["nucleon_dir"])
|
||||
template_dir = Path(config["paths"]["template_dir"])
|
||||
|
||||
# 检查文件是否已存在
|
||||
nucleon_path = nucleon_dir / f"{name}.toml"
|
||||
if nucleon_path.exists():
|
||||
self.notify(f"单元集 '{name}' 已存在", severity="error")
|
||||
return
|
||||
|
||||
# 确定模板文件
|
||||
if selected is None:
|
||||
self.notify("请选择一个模板", severity="error")
|
||||
return
|
||||
# selected 是描述字符串, 格式如 "描述 (filename.toml)"
|
||||
# 提取文件名
|
||||
import re
|
||||
|
||||
match = re.search(r"\(([^)]+)\)$", selected)
|
||||
if not match:
|
||||
self.notify("模板选择格式无效", severity="error")
|
||||
return
|
||||
template_filename = match.group(1)
|
||||
template_path = template_dir / template_filename
|
||||
if not template_path.exists():
|
||||
self.notify(f"模板文件不存在: {template_filename}", severity="error")
|
||||
return
|
||||
|
||||
# 加载模板
|
||||
try:
|
||||
with open(template_path, "r", encoding="utf-8") as f:
|
||||
template_data = toml.load(f)
|
||||
except Exception as e:
|
||||
self.notify(f"加载模板失败: {e}", severity="error")
|
||||
return
|
||||
|
||||
# 更新元数据
|
||||
metadata = template_data.get("__metadata__", {})
|
||||
attribution = metadata.get("attribution", {})
|
||||
if author:
|
||||
attribution["author"] = author
|
||||
if desc:
|
||||
attribution["desc"] = desc
|
||||
attribution["name"] = name
|
||||
# 可选: 设置版本
|
||||
attribution["version"] = ver
|
||||
metadata["attribution"] = attribution
|
||||
template_data["__metadata__"] = metadata
|
||||
|
||||
# 确保 nucleon_dir 存在
|
||||
nucleon_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 写入新文件
|
||||
try:
|
||||
with open(nucleon_path, "w", encoding="utf-8") as f:
|
||||
toml.dump(template_data, f)
|
||||
except Exception as e:
|
||||
self.notify(f"保存单元集失败: {e}", severity="error")
|
||||
return
|
||||
|
||||
self.notify(f"单元集 '{name}' 创建成功")
|
||||
self.app.pop_screen()
|
||||
@@ -1,267 +0,0 @@
|
||||
"""仓库编辑器, 使用TextArea控件等实现仓库配置编辑"""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import toml
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Container, Horizontal, ScrollableContainer, Vertical
|
||||
from textual.reactive import reactive
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import (
|
||||
Button,
|
||||
Footer,
|
||||
Header,
|
||||
Label,
|
||||
ListItem,
|
||||
ListView,
|
||||
Static,
|
||||
TextArea,
|
||||
)
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.kernel.repolib import Repo
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class RepoEditorScreen(Screen):
|
||||
"""仓库编辑器屏幕"""
|
||||
|
||||
SUB_TITLE = "仓库编辑器"
|
||||
|
||||
BINDINGS = [
|
||||
("q", "go_back", "返回"),
|
||||
("s", "save_file", "保存"),
|
||||
("r", "reload_file", "重载"),
|
||||
("d", "toggle_dark", ""),
|
||||
]
|
||||
|
||||
# 当前选择的仓库路径
|
||||
selected_repo_path: reactive[Optional[Path]] = reactive(None)
|
||||
# 当前选择的文件名
|
||||
selected_filename: reactive[Optional[str]] = reactive(None)
|
||||
# 文件内容
|
||||
file_content: reactive[str] = reactive("")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
repo: Optional[Repo] = None,
|
||||
name: str | None = None,
|
||||
id: str | None = None,
|
||||
classes: str | None = None,
|
||||
) -> None:
|
||||
super().__init__(name, id, classes)
|
||||
self.repo = repo
|
||||
self.repo_dir: Optional[Path] = None
|
||||
self.file_list = []
|
||||
if repo is not None and repo.source is not None:
|
||||
self.repo_dir = repo.source
|
||||
self._load_file_list()
|
||||
# selected_repo_path 将在 on_mount 中设置,避免触发watch时组件未就绪
|
||||
|
||||
def _load_file_list(self) -> None:
|
||||
"""加载仓库目录下的文件列表"""
|
||||
if self.repo_dir is None:
|
||||
return
|
||||
self.file_list = []
|
||||
for fname in Repo.file_mapping.values():
|
||||
fpath = self.repo_dir / fname
|
||||
if fpath.exists():
|
||||
self.file_list.append(fname)
|
||||
# 也可能存在其他文件,但暂时只支持标准文件
|
||||
self.file_list.sort()
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""组合界面组件"""
|
||||
yield Header(show_clock=True)
|
||||
with Container(id="main_container"):
|
||||
with Horizontal(id="top_panel"):
|
||||
# 左侧: 仓库选择
|
||||
with Vertical(id="repo_selector", classes="panel"):
|
||||
yield Label("仓库列表", classes="panel-title")
|
||||
yield ListView(
|
||||
*[
|
||||
ListItem(Label(repo_dir.name))
|
||||
for repo_dir in self._get_repo_dirs()
|
||||
],
|
||||
id="repo_list",
|
||||
classes="list-view",
|
||||
)
|
||||
# 中间: 文件列表
|
||||
with Vertical(id="file_selector", classes="panel"):
|
||||
yield Label("文件列表", classes="panel-title")
|
||||
yield ListView(
|
||||
*[ListItem(Label(fname)) for fname in self.file_list],
|
||||
id="file_list",
|
||||
classes="list-view",
|
||||
)
|
||||
# 右侧: 编辑区域
|
||||
with Vertical(id="editor_panel", classes="panel"):
|
||||
yield Label("编辑文件", classes="panel-title")
|
||||
yield TextArea(
|
||||
id="text_editor",
|
||||
language="plaintext",
|
||||
classes="text-editor",
|
||||
)
|
||||
with Horizontal(id="button_bar"):
|
||||
yield Button("保存", id="save_button", variant="primary")
|
||||
yield Button("重载", id="reload_button", variant="default")
|
||||
yield Button("返回", id="back_button", variant="error")
|
||||
yield Footer()
|
||||
|
||||
def _get_repo_dirs(self) -> list[Path]:
|
||||
"""获取data/repo/下所有有效仓库目录"""
|
||||
repo_root = Path(config_var.get()["paths"]["data"]) / "repo"
|
||||
repo_dirs = []
|
||||
if repo_root.exists():
|
||||
for entry in repo_root.iterdir():
|
||||
if entry.is_dir():
|
||||
# 检查是否存在 manifest.toml
|
||||
if (entry / "manifest.toml").exists():
|
||||
repo_dirs.append(entry)
|
||||
return repo_dirs
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""挂载组件时初始化"""
|
||||
# 如果已有仓库,设置 selected_repo_path 以触发watch(此时组件已就绪)
|
||||
if self.repo_dir is not None:
|
||||
self.selected_repo_path = self.repo_dir
|
||||
# 焦点放在仓库列表
|
||||
self.query_one("#repo_list", ListView).focus()
|
||||
|
||||
def watch_selected_repo_path(
|
||||
self, old_path: Optional[Path], new_path: Optional[Path]
|
||||
) -> None:
|
||||
"""当选择的仓库路径变化时,加载文件列表"""
|
||||
if new_path is None:
|
||||
self.file_list = []
|
||||
self.selected_filename = None
|
||||
self.file_content = ""
|
||||
return
|
||||
self.repo_dir = new_path
|
||||
self._load_file_list()
|
||||
# 如果组件已挂载,更新UI
|
||||
if self.is_mounted:
|
||||
file_list_view = self.query_one("#file_list", ListView)
|
||||
file_list_view.clear()
|
||||
for fname in self.file_list:
|
||||
file_list_view.append(ListItem(Label(fname)))
|
||||
# 清空编辑器
|
||||
self.query_one("#text_editor", TextArea).text = ""
|
||||
self.selected_filename = None
|
||||
|
||||
def watch_selected_filename(
|
||||
self, old_name: Optional[str], new_name: Optional[str]
|
||||
) -> None:
|
||||
"""当选择的文件名变化时,加载文件内容"""
|
||||
if new_name is None or self.repo_dir is None:
|
||||
self.file_content = ""
|
||||
return
|
||||
file_path = self.repo_dir / new_name
|
||||
if not file_path.exists():
|
||||
self.notify(f"文件不存在: {new_name}", severity="error")
|
||||
return
|
||||
try:
|
||||
content = file_path.read_text(encoding="utf-8")
|
||||
self.file_content = content
|
||||
# 如果组件已挂载,更新编辑器
|
||||
if self.is_mounted:
|
||||
editor = self.query_one("#text_editor", TextArea)
|
||||
editor.text = content
|
||||
# 根据文件后缀设置语言
|
||||
if new_name.endswith(".toml"):
|
||||
editor.language = "toml"
|
||||
elif new_name.endswith(".json"):
|
||||
editor.language = "json"
|
||||
else:
|
||||
editor.language = "plaintext"
|
||||
except Exception as e:
|
||||
logger.error(f"读取文件失败: {e}")
|
||||
self.notify(f"读取文件失败: {e}", severity="error")
|
||||
|
||||
def watch_file_content(self, old_content: str, new_content: str) -> None:
|
||||
"""当文件内容变化时更新编辑器(仅当外部改变时)"""
|
||||
# 目前不需要做任何事情,因为编辑器内容已绑定
|
||||
pass
|
||||
|
||||
def on_list_view_selected(self, event) -> None:
|
||||
"""处理列表项选择事件"""
|
||||
if not isinstance(event.item, ListItem):
|
||||
return
|
||||
list_id = event.list_view.id
|
||||
selected_label = event.item.query_one(Label)
|
||||
selected_text = str(selected_label.render())
|
||||
|
||||
if list_id == "repo_list":
|
||||
# 用户选择了仓库
|
||||
repo_root = Path(config_var.get()["paths"]["data"]) / "repo"
|
||||
selected_dir = repo_root / selected_text
|
||||
if selected_dir.exists():
|
||||
self.selected_repo_path = selected_dir
|
||||
elif list_id == "file_list":
|
||||
# 用户选择了文件
|
||||
if self.repo_dir is None:
|
||||
self.notify("请先选择仓库", severity="warning")
|
||||
return
|
||||
self.selected_filename = selected_text
|
||||
|
||||
def on_button_pressed(self, event) -> None:
|
||||
"""处理按钮点击事件"""
|
||||
event.stop()
|
||||
if event.button.id == "save_button":
|
||||
self.action_save_file()
|
||||
elif event.button.id == "reload_button":
|
||||
self.action_reload_file()
|
||||
elif event.button.id == "back_button":
|
||||
self.action_go_back()
|
||||
|
||||
def action_save_file(self) -> None:
|
||||
"""保存当前编辑的文件"""
|
||||
if self.repo_dir is None or self.selected_filename is None:
|
||||
self.notify("未选择仓库或文件", severity="warning")
|
||||
return
|
||||
file_path = self.repo_dir / self.selected_filename
|
||||
editor = self.query_one("#text_editor", TextArea)
|
||||
new_content = editor.text
|
||||
# 验证格式
|
||||
try:
|
||||
if self.selected_filename.endswith(".toml"):
|
||||
toml.loads(new_content) # 验证TOML
|
||||
elif self.selected_filename.endswith(".json"):
|
||||
json.loads(new_content) # 验证JSON
|
||||
except Exception as e:
|
||||
self.notify(f"格式错误: {e}", severity="error")
|
||||
return
|
||||
# 写入文件
|
||||
try:
|
||||
file_path.write_text(new_content, encoding="utf-8")
|
||||
self.notify("保存成功", severity="information")
|
||||
except Exception as e:
|
||||
logger.error(f"保存文件失败: {e}")
|
||||
self.notify(f"保存文件失败: {e}", severity="error")
|
||||
|
||||
def action_reload_file(self) -> None:
|
||||
"""重新加载当前文件(放弃修改)"""
|
||||
if self.repo_dir is None or self.selected_filename is None:
|
||||
self.notify("未选择仓库或文件", severity="warning")
|
||||
return
|
||||
file_path = self.repo_dir / self.selected_filename
|
||||
try:
|
||||
content = file_path.read_text(encoding="utf-8")
|
||||
editor = self.query_one("#text_editor", TextArea)
|
||||
editor.text = content
|
||||
self.notify("已重载", severity="information")
|
||||
except Exception as e:
|
||||
logger.error(f"重载文件失败: {e}")
|
||||
self.notify(f"重载文件失败: {e}", severity="error")
|
||||
|
||||
def action_go_back(self) -> None:
|
||||
"""返回上一屏幕"""
|
||||
self.app.pop_screen()
|
||||
|
||||
def action_toggle_dark(self) -> None:
|
||||
"""切换暗色模式"""
|
||||
self.app.dark = not self.app.dark
|
||||
Reference in New Issue
Block a user