From 4ca9c65bead4e02771fe1d93a45f28ca4cdfea50 Mon Sep 17 00:00:00 2001 From: pluvium27 Date: Mon, 20 Apr 2026 16:30:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BB=A3=E7=A0=81=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=E5=8C=96,=20=E6=94=B9=E8=BF=9B=E4=BB=AA=E8=A1=A8=E7=9B=98,=20?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=A4=9ACSS=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data/config/global.toml | 6 +- data/config/interface/global.toml | 2 +- .../{高考必背古诗文-0.toml => cngk-0.toml} | 0 .../repo/{高考必背古诗文.toml => cngk.toml} | 0 data/repo/cngk-t/manifest.toml | 1 + examples/simplemem.py | 8 +- src/heurams/__main__.py | 4 +- src/heurams/context.py | 1 + src/heurams/interface/__init__.py | 9 +- src/heurams/interface/__main__.py | 18 +- .../interface/css/screens/dashboard.tcss | 18 ++ src/heurams/interface/screens/about.py | 40 ++-- src/heurams/interface/screens/dashboard.py | 168 +++++++++-------- src/heurams/interface/screens/favmgr.py | 8 +- src/heurams/interface/screens/memoqueue.py | 16 +- src/heurams/interface/screens/navigator.py | 6 +- src/heurams/interface/screens/precache.py | 58 ++++-- src/heurams/interface/screens/preparation.py | 49 +++-- src/heurams/interface/screens/setting.py | 172 +++++++++++++----- .../interface/widgets/base_puzzle_widget.py | 4 +- src/heurams/interface/widgets/cloze_puzzle.py | 2 +- src/heurams/interface/widgets/mcq_puzzle.py | 4 +- src/heurams/interface/widgets/placeholder.py | 4 +- src/heurams/interface/widgets/recognition.py | 4 +- src/heurams/kernel/algorithms/base.py | 3 +- src/heurams/kernel/algorithms/nsp0.py | 11 +- src/heurams/kernel/algorithms/sm15m.py | 2 +- src/heurams/kernel/algorithms/sm2.py | 3 +- src/heurams/kernel/particles/atom.py | 2 +- src/heurams/kernel/particles/electron.py | 9 +- src/heurams/kernel/particles/nucleon.py | 16 +- src/heurams/kernel/reactor/fission.py | 2 +- src/heurams/kernel/repolib/repo.py | 37 ++-- src/heurams/services/audio_service.py | 8 +- src/heurams/services/config.py | 41 +++-- src/heurams/services/epath.py | 50 +++-- src/heurams/services/exceptions.py | 3 +- src/heurams/services/favorite_service.py | 2 +- src/heurams/services/textproc.py | 6 +- src/heurams/services/timer.py | 9 +- src/heurams/services/tts_service.py | 5 +- src/heurams/tools/csv2payload.py | 69 +++---- src/heurams/tools/zmqclient.py | 20 +- 43 files changed, 551 insertions(+), 349 deletions(-) rename data/config/repo/{高考必背古诗文-0.toml => cngk-0.toml} (100%) rename data/config/repo/{高考必背古诗文.toml => cngk.toml} (100%) create mode 100644 src/heurams/interface/css/screens/dashboard.tcss diff --git a/data/config/global.toml b/data/config/global.toml index 47d5999..c4ef4c2 100644 --- a/data/config/global.toml +++ b/data/config/global.toml @@ -1,7 +1,7 @@ -zmq_debug = false -_zmq_debug_desc = "[调试] ZMQ 调试服务器, 这会在 zmq_debug_port 上打开调试服务器\n可作为 HeurAMS 执行任意 python 代码, 如无必要请关闭" +zmq_debug = true +_zmq_debug_desc = "[调试] ZMQ 调试服务器, 这会在 zmq_debug_port 上打开一个服务器\n调试工具可远程在 HeurAMS 内执行任意 python 代码, 无必要请关闭" zmq_debug_port = 5555 -_zmq_debug_port_desc = "ZMQ 调试服务器端口" +_zmq_debug_port_desc = "[调试] ZMQ 调试服务器端口" enable_built_in_interface = false _enable_built_in_interface_desc = "启用内置基本用户界面\n(当且仅当 HeurAMS 作为程序库时禁用, 以跳过用户界面逻辑)" _paths_desc = "用户数据路径定义" diff --git a/data/config/interface/global.toml b/data/config/interface/global.toml index d9d800b..e6e60fd 100644 --- a/data/config/interface/global.toml +++ b/data/config/interface/global.toml @@ -4,7 +4,7 @@ quick_pass = true _quick_pass_desc = "[调试] 启用快速应答功能(跳过测验)" auto_pass = false _auto_pass_desc = "[调试] 自动通过测试模式" -scheduled_num = "420" +scheduled_num = 420 _scheduled_num_desc = "默认记忆单元数量(可被单元集设置覆盖)" algorithm = "NSP-0" _algorithm_desc = "默认记忆调度算法(可被单元集设置覆盖)" diff --git a/data/config/repo/高考必背古诗文-0.toml b/data/config/repo/cngk-0.toml similarity index 100% rename from data/config/repo/高考必背古诗文-0.toml rename to data/config/repo/cngk-0.toml diff --git a/data/config/repo/高考必背古诗文.toml b/data/config/repo/cngk.toml similarity index 100% rename from data/config/repo/高考必背古诗文.toml rename to data/config/repo/cngk.toml diff --git a/data/repo/cngk-t/manifest.toml b/data/repo/cngk-t/manifest.toml index 4a5e9c3..95a8a4a 100644 --- a/data/repo/cngk-t/manifest.toml +++ b/data/repo/cngk-t/manifest.toml @@ -1,3 +1,4 @@ title = "高考必背古诗文-0" +package = "cngk-0" author = "__heurams__" desc = "高考古诗文 60 篇" diff --git a/examples/simplemem.py b/examples/simplemem.py index 55c0a9f..44d4483 100644 --- a/examples/simplemem.py +++ b/examples/simplemem.py @@ -5,14 +5,12 @@ import heurams.kernel.particles as pt import heurams.kernel.repolib as repolib from heurams.services.textproc import truncate -repo = repolib.Repo.create_from_repodir(Path("./test_repo")) +repo = repolib.Repo.from_repodir(Path("./test_repo")) alist = list() print(repo.ident_index) for i in repo.ident_index: - n = pt.Nucleon.create_on_nucleonic_data( - nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i) - ) - e = pt.Electron.create_on_electonic_data( + n = pt.Nucleon.from_data(nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i)) + e = pt.Electron.from_data( electronic_data=repo.electronic_data_lict.get_itemic_unit(i) ) print(n) diff --git a/src/heurams/__main__.py b/src/heurams/__main__.py index 81065af..942f389 100644 --- a/src/heurams/__main__.py +++ b/src/heurams/__main__.py @@ -1,5 +1,6 @@ import heurams.services.version as ver + # __main__.py def main(): prompt = f"""HeurAMS {ver.ver} 已经被成功地安装在系统中. @@ -17,5 +18,6 @@ python 代指您使用的解释器, 在某些发行版中可能是 python3, 而 注意: 一个常见的误区是, 执行 interface 下的 __main__.py 运行基本用户界面, 这会导致 Python 上下文环境异常, 请不要这样做.""" print(prompt) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/heurams/context.py b/src/heurams/context.py index bd30f6a..b0e008d 100644 --- a/src/heurams/context.py +++ b/src/heurams/context.py @@ -30,6 +30,7 @@ config_var: ContextVar[ConfigDict].get = ContextVar( ) """配置对象的全局引用对象.""" + class ConfigContext: """ 功能完备的上下文管理器 diff --git a/src/heurams/interface/__init__.py b/src/heurams/interface/__init__.py index 8454e4f..b01807a 100644 --- a/src/heurams/interface/__init__.py +++ b/src/heurams/interface/__init__.py @@ -1,9 +1,11 @@ from time import sleep, perf_counter + print("欢迎使用基本用户界面!") print("加载配置与上下文... ", end="", flush=True) _start1 = perf_counter() _start = perf_counter() from heurams.context import * + _end = perf_counter() print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)") @@ -11,6 +13,7 @@ print("加载用户界面框架... ", end="", flush=True) _start = perf_counter() from textual.app import App from textual.widgets import Button + _end = perf_counter() print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)") @@ -22,6 +25,7 @@ from .screens.navigator import NavigatorScreen from .screens.precache import PrecachingScreen from .screens.setting import SettingScreen from .screens.synctool import SyncScreen + _end = perf_counter() print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)") @@ -29,9 +33,12 @@ print(f"组件目录: {rootdir}") print(f"工作目录: {workdir}") _end1 = perf_counter() print(f"前置工作共计耗时: {round(1000 * (_end1 - _start1))}ms") + + class HeurAMSApp(App): TITLE = "潜进" CSS_PATH = "css/main.tcss" + css_dir = pathlib.Path("css").resolve() SUB_TITLE = "启发式辅助记忆调度器" BINDINGS = [ ("q", "go_back", "退出"), @@ -72,4 +79,4 @@ class HeurAMSApp(App): def panic(self, *args): self._close_messages_no_wait() - raise self._exception \ No newline at end of file + raise self._exception diff --git a/src/heurams/interface/__main__.py b/src/heurams/interface/__main__.py index 65f8454..5dbacab 100644 --- a/src/heurams/interface/__main__.py +++ b/src/heurams/interface/__main__.py @@ -7,6 +7,7 @@ import pickle logger = get_logger(__name__) + def environment_check(): from pathlib import Path @@ -23,11 +24,12 @@ def environment_check(): print(f"找到 {i}") logger.debug("环境检查完成") + def start_debug_server(app): logger = get_logger("zmq_debug") context = zmq.Context() socket = context.socket(zmq.REP) - port = config_var.get()['global'].get('zmq_debug_port', 5555) + port = config_var.get()["global"].get("zmq_debug_port", 5555) socket.bind(f"tcp://*:{port}") logger.info(f"ZMQ Debug server started on port {port}") first = 1 @@ -36,7 +38,7 @@ def start_debug_server(app): code = pickle.loads(msg) namespace = {"app": app, "logger": logger, "config_var": config_var} if first: - app.title += ' [调试已连接]' + app.title += " [调试已连接]" first = 0 try: # 先尝试 eval @@ -52,15 +54,17 @@ def start_debug_server(app): except Exception as e: socket.send(pickle.dumps(f"错误: {e}")) + def main(): environment_check() - + app = HeurAMSApp() - - if config_var.get()['global'].get('zmq_debug', False): + + if config_var.get()["global"].get("zmq_debug", False): threading.Thread(target=start_debug_server, args=(app,), daemon=True).start() - + app.run(inline=False) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/heurams/interface/css/screens/dashboard.tcss b/src/heurams/interface/css/screens/dashboard.tcss new file mode 100644 index 0000000..05ae942 --- /dev/null +++ b/src/heurams/interface/css/screens/dashboard.tcss @@ -0,0 +1,18 @@ +.repo-list { + +} + +#header { + height: 4; +} + +.repo-list-item { + layout: grid; + grid-size: 2; + height: 3; +} + +.repo-list-item-shortcut { + dock: right; + offset: -5% 0 +} diff --git a/src/heurams/interface/screens/about.py b/src/heurams/interface/screens/about.py index f7d6b19..77f635b 100644 --- a/src/heurams/interface/screens/about.py +++ b/src/heurams/interface/screens/about.py @@ -23,7 +23,7 @@ class AboutScreen(Screen): yield Header(show_clock=True) with ScrollableContainer(id="about_container"): yield Label("[b]关于与版本信息[/b]") - + # 获取系统信息 textual_version = self._get_textual_version() terminal_info = self._get_terminal_info() @@ -31,7 +31,7 @@ class AboutScreen(Screen): os_version = self._get_os_version() disk_usage = self._get_disk_usage() memory_info = self._get_memory_info() - + about_text = f""" # 关于 "潜进" @@ -95,36 +95,39 @@ Textual 框架版本: {textual_version} event.stop() if event.button.id == "back_button": self.action_go_back() - + def _get_textual_version(self) -> str: """获取 Textual 框架版本""" try: import textual + return textual.__version__ - except (ImportError, AttributeError): + except ImportError, AttributeError: return "未知" - + def _get_terminal_info(self) -> str: """获取终端模拟器信息""" terminal = shutil.which("terminal") if terminal: return terminal # 尝试从环境变量获取 - terminal_env = os.environ.get('TERM_PROGRAM') or os.environ.get('TERM') + terminal_env = os.environ.get("TERM_PROGRAM") or os.environ.get("TERM") return terminal_env or "未知" - + def _get_python_version(self) -> str: """获取 Python 解释器版本""" return platform.python_version() - + def _get_os_version(self) -> str: """获取操作系统版本""" try: if platform.system() == "Darwin": # macOS import subprocess - result = subprocess.run(['sw_vers', '-productVersion'], - capture_output=True, text=True) + + result = subprocess.run( + ["sw_vers", "-productVersion"], capture_output=True, text=True + ) return f"macOS {result.stdout.strip()}" elif platform.system() == "Windows": # Windows @@ -133,30 +136,31 @@ Textual 框架版本: {textual_version} # Linux - 尝试获取发行版信息 try: import distro + return f"{distro.name()} {distro.version()}" - except (ImportError, AttributeError): + except ImportError, AttributeError: return platform.platform() else: return platform.platform() except Exception: return platform.platform() - + def _get_disk_usage(self) -> str: """获取磁盘使用情况""" try: - usage = psutil.disk_usage('/') - free_gb = usage.free / (1024 ** 3) - total_gb = usage.total / (1024 ** 3) + usage = psutil.disk_usage("/") + free_gb = usage.free / (1024**3) + total_gb = usage.total / (1024**3) percent_free = (free_gb / total_gb) * 100 return f"{free_gb:.1f} GB ({percent_free:.1f}%)" except Exception: return "未知" - + def _get_memory_info(self) -> str: """获取内存信息""" try: memory = psutil.virtual_memory() - total_gb = memory.total / (1024 ** 3) + total_gb = memory.total / (1024**3) return f"{total_gb:.1f} GB" except Exception: - return "未知" \ No newline at end of file + return "未知" diff --git a/src/heurams/interface/screens/dashboard.py b/src/heurams/interface/screens/dashboard.py index de85d2c..1fce3ee 100644 --- a/src/heurams/interface/screens/dashboard.py +++ b/src/heurams/interface/screens/dashboard.py @@ -35,6 +35,8 @@ class DashboardScreen(Screen): ("q", "go_back", "返回"), ] + CSS_PATH = Path(__file__).parent.parent / 'css' / "screens" / "dashboard.tcss" + def __init__( self, name: str | None = None, @@ -42,97 +44,91 @@ class DashboardScreen(Screen): classes: str | None = None, ) -> None: super().__init__(name, id, classes) - self.repostat = {} - self.title2dirname = {} - self.title2repo = {} - self.dirname2repo = {} - self._load_data() + self.repolink = {} def compose(self) -> ComposeResult: """组合界面组件""" + self._load_data() yield Header(show_clock=True) with ScrollableContainer(): - yield Horizontal( + yield Horizontal( # 顶部的状态 Vertical( Label(f'欢迎使用 "潜进" 版本 {version.ver}'), + Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"), Label( - f"当前 UNIX 日时间戳: {timer.get_daystamp()}" + f"应用时区修正: UTC+{config_var.get()['services']['timer']['timezone_offset'] / 3600}" ), - Label(f"应用时区修正: UTC+{config_var.get()['services']['timer']['timezone_offset'] / 3600}"), - Label(f"全局算法设置: {config_var.get()['interface']['global']['algorithm']}: {algorithms[config_var.get()['interface']['global']['algorithm']].desc}"), - classes="column infview", + Label( + f"默认算法设置: {config_var.get()['interface']['global']['algorithm']}" + ), + classes="left", ), Vertical( - Label(f"已加载 {len(self.repostat)} 个单元集", classes='dataview'), - Label(f"共计 {reduce(lambda x, y: x + y, map(lambda x: x.get('unit_sum'), self.repostat.values()))} 个单元", classes='dataview'), - Label(f"已激活 {reduce(lambda x, y: x + y, map(lambda x: x.get('activated_sum'), self.repostat.values()))} 个单元", classes='dataview'), + Label(f"已加载 {len(self.repos)} 个单元集"), + Label( + f"共计 {reduce(lambda x, y: x + y, map(lambda x: x.progress['total'], self.repos))} 个单元" + ), + Label( + f"已激活 {reduce(lambda x, y: x + y, map(lambda x: x.progress['touched'], self.repos))} 个单元" + ), Label(f""), - classes="column dataview", + classes="right", ), - id="dashboardtop" + id="header", ) - yield ListView(id="repo-list", classes="repo-list-view") - yield Label(f'"潜进" 启发式辅助记忆调度器 版本 {version.ver} {version.stage.capitalize()}') + yield ListView(id="repo_list", classes="repo-list") # 单元集选择 + + yield Label( + f'"潜进" 启发式辅助记忆调度器 版本 {version.ver} {version.stage.capitalize()}' + ) # 版本信息 yield Footer() def _load_data(self): - self.repo_dirs = Repo.probe_valid_repos_in_dir( - Path(config_var.get()['global']["paths"]["data"]) / "repo" + repo_dirs = Repo.probe_valid_repos_in_dir( + Path(config_var.get()["global"]["paths"]["repo"]) ) - for repo_dir in self.repo_dirs: - repo = Repo.create_from_repodir(repo_dir) + self.repos = list(map(Repo.from_repodir, repo_dirs)) + for repo in self.repos: self._analyse_repo(repo) def _analyse_repo(self, repo: Repo): - dirname = repo.source.name # type: ignore - title = repo.manifest["title"] - is_due = 0 - unit_sum = len(repo) - activated_sum = 0 - nextdate = float('inf') - for i in repo.ident_index: - nucleon = pt.Nucleon.create_on_nucleonic_data( - nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i) - ) - electron = pt.Electron.create_on_electonic_data( - electronic_data=repo.electronic_data_lict.get_itemic_unit(i), - algo_name=config_var.get()['repo'][repo.manifest['title']]['algorithm'] - ) - if electron.is_activated(): - activated_sum += 1 - if electron.is_due(): - is_due = 1 - nextdate = min(nextdate, electron.nextdate()) - is_unfinished = unit_sum > activated_sum - if is_unfinished: - nextdate = min(nextdate, timer.get_daystamp()) - need_to_study = is_due or is_unfinished - prompt = f"{title}\0\n 进度: {activated_sum}/{unit_sum} ({round(activated_sum/unit_sum*100)}%)\n {'需要学习' if need_to_study else '无需操作'}" - stat = { - "is_due": is_due, - "unit_sum": unit_sum, - "title": title, - "activated_sum": activated_sum, - "nextdate": nextdate, - "is_unfinished": is_unfinished, - "need_to_study": need_to_study, - "prompt": prompt, - "dirname": dirname, + # need_review: 需要/不需要学习 + # nearest_review_time: 最近下次学习时间 + # progress: 进度 + # algotype: 算法类型 + ## initial_time: 起始时间 + # package: 包名 + # prompt: 最终呈现信息 + repo.package = repo.manifest["package"] + repo.nearest_review_time = float("inf") + repo.progress = { + "total": repo.data_length, + "touched": 0, } - self.repostat[dirname] = stat - self.title2dirname[title] = dirname - self.title2repo[title] = repo - self.dirname2repo[dirname] = repo + initial_time = float("inf") + for i in range(repo.data_length): + e = pt.Electron.from_data(repo.electronic_data_lict[i]) + n = pt.Nucleon.from_data(repo.nucleonic_data_lict[i]) + if e.is_activated(): + repo.algotype = e.algoname + repo.progress["touched"] += 1 + repo.nearest_review_time = min(repo.nearest_review_time, e.nextdate()) + # initial_time = min(initial_time, e.) + repo.need_review = timer.get_daystamp() >= repo.nearest_review_time + repo.prompt = f"""{repo.manifest['title']} ({repo.algotype}) + 进度: {repo.progress['touched']}/{repo.progress['total']} ({round(repo.progress['touched']/repo.progress['total']*100, 1)}%) + {'需要学习' if repo.need_review else "无需操作"} + """ def on_mount(self) -> None: """挂载组件时初始化""" - repo_list_widget = self.query_one("#repo-list", ListView) + repo_list_widget = self.query_one("#repo_list", ListView) # 按下次复习时间排序 repodirs = sorted( - self.repo_dirs, - key=lambda f: self.repostat[f.name]["nextdate"], + self.repos, + key=lambda r: r.nearest_review_time, reverse=True, ) repotitles = map(lambda f: self.repostat[f.name]["title"], repodirs) @@ -141,43 +137,44 @@ class DashboardScreen(Screen): repo_list_widget.append( ListItem( Static( - "在 ./data/repo/ 中未找到任何仓库.\n" + f"在 {config_var.get()['global']['paths']['repo']} 中未找到任何仓库.\n" "请导入仓库后重启应用, 或者新建空的仓库." - ) + ), + id="not-found", ) ) repo_list_widget.disabled = True return - for repotitle in repotitles: - prompt = self.repostat[self.title2dirname[repotitle]]["prompt"] - list_item = ListItem(Label(prompt), Button(f"开始学习", flat=True, variant="primary", classes="repo_listitem_btn", id=f"launch_{self.repostat[self.title2dirname[repotitle]]['dirname']}"), classes="repo_listitem") + for r in self.repos: + self.repolink[str(id(r))] = r # 用于规避 ctype id 对象还原 + list_item = ListItem( + Label(r.prompt), + Button( + f"开始学习", + flat=True, + variant="primary", + id=f"slaunch_repo_{id(r)}", + classes="repo-list-item-shortcut", + ), + classes="repo-list-item", + id=f"launch_repo_{id(r)}", + ) repo_list_widget.append(list_item) - # if not self.stay_enabled[repodir]: - # list_item.disabled = True - def on_list_view_selected(self, event) -> None: """处理列表项选择事件""" if not isinstance(event.item, ListItem): return - selected_label = event.item.query_one(Label) - label_text = str(selected_label.render()) - - if "未找到任何仓库" in label_text: + if "not-found" == event.item.id: return - # 提取文件名 - selected_repotitle = label_text.partition("\0")[0].replace("*", "") - selected_repo = self.title2repo[label_text.partition("\0")[0].replace("*", "")] + # 还原对象 + selected_repo = self.repolink[event.item.id.lstrip("launch_repo_")] # 跳转到准备屏幕 - self.app.push_screen( - PreparationScreen( - selected_repo, self.repostat[self.title2dirname[selected_repotitle]] - ) - ) + self.app.push_screen(PreparationScreen(selected_repo)) def action_quit_app(self) -> None: """退出应用程序""" @@ -188,9 +185,10 @@ class DashboardScreen(Screen): self.app.push_screen(NavigatorScreen()) def on_button_pressed(self, event: Button.Pressed) -> None: - logger.debug(f"event.button.id: {event.button.id}") """处理按钮点击事件""" - if str(event.button.id).startswith("launch_"): # type: ignore + logger.debug(f"event.button.id: {event.button.id}") + if event.button.id.startswith("slaunch_repo_"): # type: ignore from .preparation import launch - launch(repo=self.dirname2repo[event.button.id[7:]], app=self.app, scheduled_num=-1) # type: ignore + + launch(repo=self.repolink[event.button.id.lstrip("slaunch_repo_")], app=self.app, scheduled_num=-1) # type: ignore # TODO: 这样启动的记忆实例的状态机无法绑定到 PreparationScreen 中 diff --git a/src/heurams/interface/screens/favmgr.py b/src/heurams/interface/screens/favmgr.py index 4635cd4..01bf024 100644 --- a/src/heurams/interface/screens/favmgr.py +++ b/src/heurams/interface/screens/favmgr.py @@ -68,7 +68,7 @@ class FavoriteManagerScreen(Screen): if self.favorites: list_view = self.query_one("#favorites-list") for fav in self.favorites: - list_view.append(self._create_favorite_item(fav)) # type: ignore + list_view.append(self._create_favorite_item(fav)) # type: ignore def _encode_favorite_key(self, repo_path: str, ident: str) -> str: """编码仓库路径和标识符为安全的按钮 ID 部分""" @@ -115,12 +115,12 @@ class FavoriteManagerScreen(Screen): def _get_repo_info(self, repo_path: str, fav: FavoriteItem) -> Optional[dict]: """获取仓库信息(标题、原子内容预览)""" try: - data_repo = Path(config_var.get()['global']["paths"]["data"]) / "repo" + data_repo = Path(config_var.get()["global"]["paths"]["data"]) / "repo" repo_dir = data_repo / repo_path if not repo_dir.exists(): logger.warning("仓库目录不存在: %s", repo_dir) return None - repo = Repo.create_from_repodir(repo_dir) + repo = Repo.from_repodir(repo_dir) # 获取原子内容预览 content_preview = "" payload = repo.payload @@ -201,4 +201,4 @@ class FavoriteManagerScreen(Screen): def action_toggle_dark(self) -> None: """切换暗黑模式""" - self.app.dark = not self.app.dark # type: ignore + self.app.dark = not self.app.dark # type: ignore diff --git a/src/heurams/interface/screens/memoqueue.py b/src/heurams/interface/screens/memoqueue.py index 1d7c877..68782d7 100644 --- a/src/heurams/interface/screens/memoqueue.py +++ b/src/heurams/interface/screens/memoqueue.py @@ -38,7 +38,7 @@ class MemScreen(Screen): ("0,1,2,3", "app.push_screen('about')", ""), ] - if config_var.get()['interface']['global']["quick_pass"]: + if config_var.get()["interface"]["global"]["quick_pass"]: BINDINGS.append(("k", "quick_pass", "正确应答")) BINDINGS.append(("f", "quick_fail", "错误应答")) rating = reactive(-1) @@ -70,7 +70,6 @@ class MemScreen(Screen): """更新状态机""" self.procession: Procession = self.phaser.current_procession() # type: ignore self.atom: pt.Atom = self.procession.current_atom # type: ignore - def on_mount(self): self.fission = self.procession.get_fission() @@ -93,7 +92,7 @@ class MemScreen(Screen): if self.repo is not None: fav_status = "已收藏" if self._is_current_atom_favorited() else "未收藏" s += f"收藏: {fav_status}\n" - '''if config_var.get().get("debug_topline", 0): + """if config_var.get().get("debug_topline", 0): try: alia = self.fission.get_current_puzzle_inf()["alia"] # type: ignore s += f"谜题: {alia}\n" @@ -113,7 +112,7 @@ class MemScreen(Screen): stat = self.fission.__repr__("simple", "") s += f"{stat}\n" except Exception as e: - s = str(e)''' + s = str(e)""" s += f"进度: {self.procession.process() + 1}/{self.procession.total_length()}" return s @@ -139,9 +138,9 @@ class MemScreen(Screen): i.remove() from heurams.interface.widgets.finished import Finished - if config_var.get()['interface']['global']["persist_to_file"]: + if config_var.get()["interface"]["global"]["persist_to_file"]: self.save_func() - container.mount(Finished(is_saved=['interface']['global']["persist_to_file"])) + container.mount(Finished(is_saved=["interface"]["global"]["persist_to_file"])) def on_button_pressed(self, event): event.stop() @@ -156,7 +155,7 @@ class MemScreen(Screen): from heurams.services.audio_service import play_by_path from heurams.services.hasher import get_md5 - path = Path(config_var.get()['global']["paths"]["data"]) / "cache" / "voice" + path = Path(config_var.get()["global"]["paths"]["data"]) / "cache" / "voice" path = path / f"{get_md5(self.atom.registry['nucleon']["tts_text"])}.wav" if path.exists(): play_by_path(path) @@ -177,7 +176,6 @@ class MemScreen(Screen): self.forward(new_rating) self.rating = -1 - def forward(self, rating): self.update_state() allow_forward = 1 if rating >= 4 else 0 @@ -226,7 +224,7 @@ class MemScreen(Screen): return "" # self.repo.source 是 Path 对象,指向仓库目录 repo_full_path = self.repo.source - data_repo_path = Path(config_var.get()['global']["paths"]["data"]) / "repo" + data_repo_path = Path(config_var.get()["global"]["paths"]["data"]) / "repo" try: rel_path = repo_full_path.relative_to(data_repo_path) return str(rel_path) diff --git a/src/heurams/interface/screens/navigator.py b/src/heurams/interface/screens/navigator.py index b8e3d90..724ce7e 100644 --- a/src/heurams/interface/screens/navigator.py +++ b/src/heurams/interface/screens/navigator.py @@ -53,7 +53,11 @@ class NavigatorScreen(ModalScreen): ) yield Static("按下回车以完成切换\n所有会话将被保存") yield Button( - "关闭 (n)", id="close_button", variant="primary", classes="close-button", flat=True + "关闭 (n)", + id="close_button", + variant="primary", + classes="close-button", + flat=True, ) def on_mount(self) -> None: diff --git a/src/heurams/interface/screens/precache.py b/src/heurams/interface/screens/precache.py index 4b3d055..e775c74 100644 --- a/src/heurams/interface/screens/precache.py +++ b/src/heurams/interface/screens/precache.py @@ -13,16 +13,16 @@ import heurams.services.hasher as hasher from heurams.context import * # 兼容性缓存路径:优先使用 paths.cache,否则使用 data/cache -paths = config_var.get()['global']["paths"] +paths = config_var.get()["global"]["paths"] cache_dir = pathlib.Path(paths.get("cache", paths["data"] + "/cache")) / "voice" def format_size(bytes_num: int) -> str: """将字节数格式化为人类可读的字符串""" - for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + for unit in ["B", "KB", "MB", "GB", "TB"]: if bytes_num < 1024.0: return f"{bytes_num:.2f} {unit}" - bytes_num /= 1024.0 # type: ignore + bytes_num /= 1024.0 # type: ignore return f"{bytes_num:.2f} PB" @@ -54,16 +54,24 @@ class PrecachingScreen(Screen): self.cancel_flag = 0 self.desc = desc # 不再需要缓存配置,保留配置读取以兼容 - self.cache_stats = {"total_size": 0, "file_count": 0, "human_size": "0 B", "cached_units": 0, "total_units": 0, "cache_rate": 0} + self.cache_stats = { + "total_size": 0, + "file_count": 0, + "human_size": "0 B", + "cached_units": 0, + "total_units": 0, + "cache_rate": 0, + } self._update_cache_stats() def _get_total_units(self) -> int: """获取所有仓库的总单元数""" from heurams.context import config_var from heurams.kernel.repolib import Repo - repo_path = pathlib.Path(config_var.get()['global']["paths"]["data"]) / "repo" + + repo_path = pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "repo" repo_dirs = Repo.probe_valid_repos_in_dir(repo_path) - repos = map(Repo.create_from_repodir, repo_dirs) + repos = map(Repo.from_repodir, repo_dirs) total = 0 for repo in repos: try: @@ -86,7 +94,7 @@ class PrecachingScreen(Screen): cached_units += 1 total_units = self._get_total_units() cache_rate = (cached_units / total_units * 100) if total_units > 0 else 0 - + self.cache_stats["total_size"] = total_size self.cache_stats["file_count"] = file_count self.cache_stats["human_size"] = format_size(total_size) @@ -101,11 +109,15 @@ class PrecachingScreen(Screen): with Container(): yield Static( f"缓存率: {self.cache_stats.get('cache_rate', 0):.1f}% (已缓存 {self.cache_stats.get('cached_units', 0)} / {self.cache_stats.get('total_units', 0)} 个单元)", - classes="cache-usage-text" + classes="cache-usage-text", ) if self.nucleons: - yield Static(f"目标单元归属: [b]{self.desc}[/b]", classes="target-info") - yield Static(f"单元数量: {len(self.nucleons)}", classes="target-info") + yield Static( + f"目标单元归属: [b]{self.desc}[/b]", classes="target-info" + ) + yield Static( + f"单元数量: {len(self.nucleons)}", classes="target-info" + ) else: yield Static("目标: 所有单元", classes="target-info") @@ -114,16 +126,26 @@ class PrecachingScreen(Screen): yield ProgressBar(total=100, show_eta=False, id="progress_bar") with Horizontal(classes="button-group"): if not self.is_precaching: - yield Button("开始预缓存", id="start_precache", variant="primary") + yield Button( + "开始预缓存", id="start_precache", variant="primary" + ) else: - yield Button("取消预缓存", id="cancel_precache", variant="error") + yield Button( + "取消预缓存", id="cancel_precache", variant="error" + ) yield Button("清空缓存", id="clear_cache", variant="warning") yield Button("返回", id="go_back", variant="default") with Container(classes="cache-info"): yield Static(f"缓存路径: {cache_dir}", classes="cache-path") - yield Static(f"文件数: {self.cache_stats['file_count']}", classes="cache-count") - yield Static(f"总大小: {self.cache_stats['human_size']}", classes="cache-size") - yield Button("刷新", id="refresh_cache_stats", variant="default", flat=True) + yield Static( + f"文件数: {self.cache_stats['file_count']}", classes="cache-count" + ) + yield Static( + f"总大小: {self.cache_stats['human_size']}", classes="cache-size" + ) + yield Button( + "刷新", id="refresh_cache_stats", variant="default", flat=True + ) yield Static("若您离开此界面, 未完成的缓存进程会自动停止.") yield Static('缓存程序支持 "断点续传".') @@ -230,9 +252,9 @@ class PrecachingScreen(Screen): from heurams.context import config_var, rootdir, workdir from heurams.kernel.repolib import Repo - repo_path = pathlib.Path(config_var.get()['global']["paths"]["data"]) / "repo" + repo_path = pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "repo" repo_dirs = Repo.probe_valid_repos_in_dir(repo_path) - repos = map(Repo.create_from_repodir, repo_dirs) + repos = map(Repo.from_repodir, repo_dirs) # 计算总项目数 self.total = 0 @@ -241,7 +263,7 @@ class PrecachingScreen(Screen): try: for i in repo.ident_index: nucleon_list.append( - pt.Nucleon.create_on_nucleonic_data( + pt.Nucleon.from_data( repo.nucleonic_data_lict.get_itemic_unit(i) ) ) diff --git a/src/heurams/interface/screens/preparation.py b/src/heurams/interface/screens/preparation.py index 3a151a8..b9de967 100644 --- a/src/heurams/interface/screens/preparation.py +++ b/src/heurams/interface/screens/preparation.py @@ -5,7 +5,16 @@ from textual.containers import ScrollableContainer from textual.reactive import reactive from textual.screen import Screen from textual.widget import Widget -from textual.widgets import Button, Footer, Header, Label, Markdown, Static, Rule, Sparkline +from textual.widgets import ( + Button, + Footer, + Header, + Label, + Markdown, + Static, + Rule, + Sparkline, +) import heurams.kernel.particles as pt import heurams.services.hasher as hasher @@ -28,20 +37,19 @@ class PreparationScreen(Screen): ("0,1,2,3", "app.push_screen('about')", ""), ] - scheduled_num = reactive(config_var.get()['interface']['global']["scheduled_num"]) + scheduled_num = reactive(config_var.get()["interface"]["global"]["scheduled_num"]) - def __init__(self, repo: Repo, repostat: dict) -> None: + def __init__(self, repo: Repo) -> None: super().__init__(name=None, id=None, classes=None) self.repo = repo - self.repostat = repostat self.load_data() def compose(self) -> ComposeResult: yield Header(show_clock=True) with ScrollableContainer(id="vice_container"): - yield Label(f"准备就绪: [b]{self.repostat['title']}[/b]\n") + yield Label(f"准备就绪: [b]{self.repo.manifest['title']}[/b]\n") yield Label( - f"仓库路径: {config_var.get()['global']['paths']['data']}/repo/[b]{self.repostat['dirname']}[/b]" + f"[b]仓库路径: {self.repo.source}[/b]" ) yield Label(f"\n单元数量: {len(self.repo)}\n") yield Label(f"最小记忆分组: {self.scheduled_num}\n", id="schnum_label") @@ -62,12 +70,12 @@ class PreparationScreen(Screen): yield Static() yield Sparkline(self.spark_line_arr, summary_function=max) yield Rule() - #yield Static(str(self.spark_line_arr)) + # yield Static(str(self.spark_line_arr)) yield Static(f"单元状态预览:\n") for i in self.content.splitlines(): yield Static(i, classes="full") yield Footer() - + # def watch_scheduled_num(self, old_scheduled_num, new_scheduled_num): # logger.debug("响应", old_scheduled_num, "->", new_scheduled_num) # try: @@ -80,19 +88,21 @@ class PreparationScreen(Screen): content = "" spark_line_arr = [] for i in self.repo.ident_index: - n = pt.Nucleon.create_on_nucleonic_data( + n = pt.Nucleon.from_data( nucleonic_data=self.repo.nucleonic_data_lict.get_itemic_unit(i) ) - e = pt.Electron.create_on_electonic_data(electronic_data=self.repo.electronic_data_lict.get_itemic_unit(i)) + e = pt.Electron.from_data( + electronic_data=self.repo.electronic_data_lict.get_itemic_unit(i) + ) statstr = "" if e.is_activated(): - statstr = '[#00ff00]A[/]' + statstr = "[#00ff00]A[/]" if e.is_due(): - statstr = '[#ffff00]R[/]' - #statstr += ('[dim]' + str(e.rept(real_rept=True)).zfill(2)+'[/]') + statstr = "[#ffff00]R[/]" + # statstr += ('[dim]' + str(e.rept(real_rept=True)).zfill(2)+'[/]') else: - statstr = '[#ff0000]U[/]' + statstr = "[#ff0000]U[/]" spark_line_arr.append(e.rept(real_rept=True)) content += f" {statstr} {n['content'].replace('/', '')} \n" self.content = content @@ -107,9 +117,7 @@ class PreparationScreen(Screen): lst = list() for i in self.repo.ident_index: lst.append( - pt.Nucleon.create_on_nucleonic_data( - self.repo.nucleonic_data_lict.get_itemic_unit(i) - ) + pt.Nucleon.from_data(self.repo.nucleonic_data_lict.get_itemic_unit(i)) ) precache_screen = PrecachingScreen( nucleons=lst, desc=self.repo.manifest["title"] @@ -128,15 +136,16 @@ class PreparationScreen(Screen): elif event.button.id == "precache_button": self.action_precache() + def launch(repo, app, scheduled_num): if scheduled_num == -1: - scheduled_num = config_var.get()['interface']['global']["scheduled_num"] + scheduled_num = config_var.get()["interface"]["global"]["scheduled_num"] atoms = list() for i in repo.ident_index: - n = pt.Nucleon.create_on_nucleonic_data( + n = pt.Nucleon.from_data( nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i) ) - e = pt.Electron.create_on_electonic_data( + e = pt.Electron.from_data( electronic_data=repo.electronic_data_lict.get_itemic_unit(i) ) a = pt.Atom(n, e, repo.orbitic_data) diff --git a/src/heurams/interface/screens/setting.py b/src/heurams/interface/screens/setting.py index 04c9f60..9976d74 100644 --- a/src/heurams/interface/screens/setting.py +++ b/src/heurams/interface/screens/setting.py @@ -8,7 +8,19 @@ import os from textual.app import ComposeResult from textual.containers import ScrollableContainer, Container, Horizontal, Vertical from textual.screen import Screen -from textual.widgets import Button, Footer, Header, Label, ListItem, ListView, Static, Collapsible, Input, Switch, Select +from textual.widgets import ( + Button, + Footer, + Header, + Label, + ListItem, + ListView, + Static, + Collapsible, + Input, + Switch, + Select, +) from textual.layouts import horizontal import heurams.kernel.particles as pt @@ -45,13 +57,15 @@ class SettingScreen(Screen): """组合界面组件""" yield Header(show_clock=True) with ScrollableContainer(): - yield Label('[b]设置页面[/b]') + yield Label("[b]设置页面[/b]") for i in config_var.get(): - if i.startswith('_'): + if i.startswith("_"): continue - a = self._get_subcfg(f'{i}') + a = self._get_subcfg(f"{i}") if a: - yield Collapsible(*a, title=i + f'\n{config_var.get().get(f"_{i}_desc", "")}') + yield Collapsible( + *a, title=i + f'\n{config_var.get().get(f"_{i}_desc", "")}' + ) yield Footer() def _get_subcfg(self, parent_epath: str): @@ -60,61 +74,115 @@ class SettingScreen(Screen): if parent.is_dir: lst = list() for i in parent: - if i.startswith('_'): + if i.startswith("_"): continue a = self._get_subcfg(f"{parent_epath}.{i}") if a: - lst.append(Collapsible(*a, title=i + f'\n{parent.get(f"_{i}_desc", "")}')) + lst.append( + Collapsible( + *a, title=i + f'\n{parent.get(f"_{i}_desc", "")}' + ) + ) return lst - if isinstance(parent, dict) or (isinstance(parent, ConfigDict) and not parent.is_dir): + if isinstance(parent, dict) or ( + isinstance(parent, ConfigDict) and not parent.is_dir + ): lst = list() for i in parent: - if i.startswith('_'): + if i.startswith("_"): continue if isinstance(parent[i], dict): a = self._get_subcfg(f"{parent_epath}.{i}") if a: - lst.append(Collapsible(*a, title=i + f'\n{parent.get(f"_{i}_desc", "")}')) - elif f'_{i}_candidate' in parent: # 选择框模式 - if isinstance(parent[f'_{i}_candidate'], dict): - lst.append(Horizontal( - Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), - Select(((f"{j} ({k})", j) for j, k in parent[f'_{i}_candidate'].items()), prompt=f'{parent.get(f"{i}", "")}', id=domize(f"{parent_epath}.{i}")), - classes='container' - )) - elif isinstance(parent[f'_{i}_candidate'], list): - lst.append(Horizontal( - Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), - Select(((j, j) for j in parent[f'_{i}_candidate']), prompt=f'{parent.get(f"{i}", "")}', id=domize(f"{parent_epath}.{i}")), - classes='container' - )) + lst.append( + Collapsible( + *a, title=i + f'\n{parent.get(f"_{i}_desc", "")}' + ) + ) + elif f"_{i}_candidate" in parent: # 选择框模式 + if isinstance(parent[f"_{i}_candidate"], dict): + lst.append( + Horizontal( + Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), + Select( + ( + (f"{j} ({k})", j) + for j, k in parent[f"_{i}_candidate"].items() + ), + prompt=f'{parent.get(f"{i}", "")}', + id=domize(f"{parent_epath}.{i}"), + ), + classes="container", + ) + ) + elif isinstance(parent[f"_{i}_candidate"], list): + lst.append( + Horizontal( + Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), + Select( + ((j, j) for j in parent[f"_{i}_candidate"]), + prompt=f'{parent.get(f"{i}", "")}', + id=domize(f"{parent_epath}.{i}"), + ), + classes="container", + ) + ) else: if isinstance(parent[i], float): - lst.append(Horizontal( - Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), - Input(value=str(parent[i]), placeholder='要求一个浮点数', type='number', id=domize(f"{parent_epath}.{i}")), - classes='container')) + lst.append( + Horizontal( + Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), + Input( + value=str(parent[i]), + placeholder="要求一个浮点数", + type="number", + id=domize(f"{parent_epath}.{i}"), + ), + classes="container", + ) + ) elif isinstance(parent[i], str): - lst.append(Horizontal( - Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), - Input(value=parent[i], placeholder='要求一个字符串', type='text', id=domize(f"{parent_epath}.{i}")), - classes='container')) + lst.append( + Horizontal( + Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), + Input( + value=parent[i], + placeholder="要求一个字符串", + type="text", + id=domize(f"{parent_epath}.{i}"), + ), + classes="container", + ) + ) elif isinstance(parent[i], bool): - lst.append(Horizontal( - Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), - Switch(value=parent[i], id=domize(f"{parent_epath}.{i}")), - classes='container')) + lst.append( + Horizontal( + Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), + Switch( + value=parent[i], id=domize(f"{parent_epath}.{i}") + ), + classes="container", + ) + ) elif isinstance(parent[i], int): - lst.append(Horizontal( - Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), - Input(value=str(parent[i]), placeholder='要求一个整数', type='integer', id=domize(f"{parent_epath}.{i}")), - classes='container')) + lst.append( + Horizontal( + Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), + Input( + value=str(parent[i]), + placeholder="要求一个整数", + type="integer", + id=domize(f"{parent_epath}.{i}"), + ), + classes="container", + ) + ) elif isinstance(parent[i], list): pass else: - lst.append(Label('未知类型')) + lst.append(Label("未知类型")) return lst - return [Label('无子项')] + return [Label("无子项")] def on_mount(self) -> None: """挂载组件时初始化""" @@ -133,14 +201,18 @@ class SettingScreen(Screen): """打开导航器""" self.app.push_screen(NavigatorScreen()) - def on_input_changed(self, event: Input.Changed) -> None: widget_id = event.input.id if not widget_id: return eepath = undomize(widget_id) value = event.value - epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value)) + epath( + config_var.get(), + eepath, + enable_modify=True, + new_value=type(epath(config_var.get(), eepath))(value), + ) def on_switch_changed(self, event: Switch.Changed) -> None: widget_id = event.switch.id @@ -148,7 +220,12 @@ class SettingScreen(Screen): return eepath = undomize(widget_id) value = event.value - epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value)) + epath( + config_var.get(), + eepath, + enable_modify=True, + new_value=type(epath(config_var.get(), eepath))(value), + ) def on_select_changed(self, event: Select.Changed) -> None: widget_id = event.select.id @@ -156,4 +233,9 @@ class SettingScreen(Screen): return eepath = undomize(widget_id) value = event.value - epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value)) \ No newline at end of file + epath( + config_var.get(), + eepath, + enable_modify=True, + new_value=type(epath(config_var.get(), eepath))(value), + ) diff --git a/src/heurams/interface/widgets/base_puzzle_widget.py b/src/heurams/interface/widgets/base_puzzle_widget.py index e0eba81..46fd537 100644 --- a/src/heurams/interface/widgets/base_puzzle_widget.py +++ b/src/heurams/interface/widgets/base_puzzle_widget.py @@ -15,7 +15,7 @@ class BasePuzzleWidget(Widget): id: str | None = None, classes: str | None = None, disabled: bool = False, - markup: bool = True + markup: bool = True, ) -> None: super().__init__( *children, @@ -23,7 +23,7 @@ class BasePuzzleWidget(Widget): id=id, classes=classes, disabled=disabled, - markup=markup + markup=markup, ) self.atom = atom diff --git a/src/heurams/interface/widgets/cloze_puzzle.py b/src/heurams/interface/widgets/cloze_puzzle.py index 3a9a980..12a9e13 100644 --- a/src/heurams/interface/widgets/cloze_puzzle.py +++ b/src/heurams/interface/widgets/cloze_puzzle.py @@ -83,7 +83,7 @@ class ClozePuzzle(BasePuzzleWidget): if lst: lastone = lst[-1] for i in lst[:-1]: - s += (i + ' ') + s += i + " " s += f" `{lastone}`" return s diff --git a/src/heurams/interface/widgets/mcq_puzzle.py b/src/heurams/interface/widgets/mcq_puzzle.py index d724a15..400a601 100644 --- a/src/heurams/interface/widgets/mcq_puzzle.py +++ b/src/heurams/interface/widgets/mcq_puzzle.py @@ -55,8 +55,8 @@ class MCQPuzzle(BasePuzzleWidget): def _load(self): cfg = self.atom.registry["nucleon"]["puzzles"][self.alia] - if cfg['mapping'] == {}: - self.screen.rating = 5 # type: ignore + if cfg["mapping"] == {}: + self.screen.rating = 5 # type: ignore self.puzzle = pz.MCQPuzzle( cfg["mapping"], cfg["jammer"], int(cfg["max_riddles_num"]), cfg["prefix"] ) diff --git a/src/heurams/interface/widgets/placeholder.py b/src/heurams/interface/widgets/placeholder.py index e5e7b70..9814e9e 100644 --- a/src/heurams/interface/widgets/placeholder.py +++ b/src/heurams/interface/widgets/placeholder.py @@ -11,7 +11,7 @@ class Placeholder(Widget): id: str | None = None, classes: str | None = None, disabled: bool = False, - markup: bool = True + markup: bool = True, ) -> None: super().__init__( *children, @@ -19,7 +19,7 @@ class Placeholder(Widget): id=id, classes=classes, disabled=disabled, - markup=markup + markup=markup, ) def compose(self): diff --git a/src/heurams/interface/widgets/recognition.py b/src/heurams/interface/widgets/recognition.py index 0162519..9504e5d 100644 --- a/src/heurams/interface/widgets/recognition.py +++ b/src/heurams/interface/widgets/recognition.py @@ -67,7 +67,7 @@ class Recognition(BasePuzzleWidget): f";{delim}": ";", f":{delim}": ":", } - + primary = cfg["primary"] with Center(): @@ -88,7 +88,7 @@ class Recognition(BasePuzzleWidget): for item in cfg["secondary"]: if isinstance(item, list): for j in item: - yield Markdown(f"### 笔记: {j}") #TODO ANNOTATION + yield Markdown(f"### 笔记: {j}") # TODO ANNOTATION continue if isinstance(item, Dict): total = "" diff --git a/src/heurams/kernel/algorithms/base.py b/src/heurams/kernel/algorithms/base.py index c89839d..b71d6c9 100644 --- a/src/heurams/kernel/algorithms/base.py +++ b/src/heurams/kernel/algorithms/base.py @@ -8,7 +8,8 @@ logger = get_logger(__name__) class BaseAlgorithm: algo_name = "BaseAlgorithm" - desc = '算法基类' + desc = "算法基类" + class AlgodataDict(TypedDict): real_rept: int rept: int diff --git a/src/heurams/kernel/algorithms/nsp0.py b/src/heurams/kernel/algorithms/nsp0.py index 752a833..45557d2 100644 --- a/src/heurams/kernel/algorithms/nsp0.py +++ b/src/heurams/kernel/algorithms/nsp0.py @@ -10,7 +10,8 @@ logger = get_logger(__name__) class NSP0Algorithm(BaseAlgorithm): algo_name = "NSP-0" - desc = '快速筛选用特殊调度器' + desc = "快速筛选用特殊调度器" + class AlgodataDict(TypedDict): real_rept: int rept: int @@ -23,7 +24,7 @@ class NSP0Algorithm(BaseAlgorithm): defaults = { "real_rept": 0, - 'important': 0, + "important": 0, "rept": 0, "interval": 0, "last_date": 0, @@ -52,8 +53,10 @@ class NSP0Algorithm(BaseAlgorithm): if feedback == -1: logger.debug("feedback 为 -1, 跳过更新") return - algodata[cls.algo_name]["interval"] = (1 if feedback <= 3 else float('inf')) - algodata[cls.algo_name]["important"] = (1 if feedback <= 3 else algodata[cls.algo_name]["important"]) + algodata[cls.algo_name]["interval"] = 1 if feedback <= 3 else float("inf") + algodata[cls.algo_name]["important"] = ( + 1 if feedback <= 3 else algodata[cls.algo_name]["important"] + ) algodata[cls.algo_name]["last_date"] = timer.get_daystamp() algodata[cls.algo_name]["next_date"] = ( timer.get_daystamp() + algodata[cls.algo_name]["interval"] diff --git a/src/heurams/kernel/algorithms/sm15m.py b/src/heurams/kernel/algorithms/sm15m.py index 83fa9e8..6f84996 100644 --- a/src/heurams/kernel/algorithms/sm15m.py +++ b/src/heurams/kernel/algorithms/sm15m.py @@ -27,7 +27,7 @@ from heurams.kernel.algorithms.sm15m_calc import ( # 全局状态文件路径 _GLOBAL_STATE_FILE = os.path.expanduser( - pathlib.Path(config_var.get()['global']["paths"]["data"]) + pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "global" / "sm15m_global_state.json" ) diff --git a/src/heurams/kernel/algorithms/sm2.py b/src/heurams/kernel/algorithms/sm2.py index aa49538..bb8d187 100644 --- a/src/heurams/kernel/algorithms/sm2.py +++ b/src/heurams/kernel/algorithms/sm2.py @@ -10,7 +10,8 @@ logger = get_logger(__name__) class SM2Algorithm(BaseAlgorithm): algo_name = "SM-2" - desc = '经典间隔重复算法' + desc = "经典间隔重复算法" + class AlgodataDict(TypedDict): efactor: float real_rept: int diff --git a/src/heurams/kernel/particles/atom.py b/src/heurams/kernel/particles/atom.py index cc2baef..c572755 100644 --- a/src/heurams/kernel/particles/atom.py +++ b/src/heurams/kernel/particles/atom.py @@ -32,7 +32,7 @@ class Atom: default_runtime = { "locked": False, - "min_rate": float('inf'), + "min_rate": float("inf"), "new_activation": False, } diff --git a/src/heurams/kernel/particles/electron.py b/src/heurams/kernel/particles/electron.py index 8f4901c..0bfeb7d 100644 --- a/src/heurams/kernel/particles/electron.py +++ b/src/heurams/kernel/particles/electron.py @@ -24,6 +24,7 @@ class Electron: algo_name = "SM-2" self.algodata = algodata self.ident = ident + self.algoname = algo_name self.algo: algolib.BaseAlgorithm = algorithms[algo_name] if not self.algo.check_integrity(self.algodata): @@ -53,10 +54,10 @@ class Electron: result = self.algo.is_due(self.algodata) return result and self.is_activated() - def rept(self, real_rept = False): + def rept(self, real_rept=False): if real_rept: - return self.algodata[self.algo.algo_name]['real_rept'] - return self.algodata[self.algo.algo_name]['rept'] + return self.algodata[self.algo.algo_name]["real_rept"] + return self.algodata[self.algo.algo_name]["rept"] def is_activated(self): result = self.algodata[self.algo.algo_name]["is_activated"] @@ -112,7 +113,7 @@ class Electron: return len(self.algodata[self.algo.algo_name]) @staticmethod - def create_on_electonic_data(electronic_data: tuple, algo_name: str = ""): + def from_data(electronic_data: tuple, algo_name: str = ""): _data = electronic_data ident = _data[0] algodata = _data[1] diff --git a/src/heurams/kernel/particles/nucleon.py b/src/heurams/kernel/particles/nucleon.py index 17affdb..8435644 100644 --- a/src/heurams/kernel/particles/nucleon.py +++ b/src/heurams/kernel/particles/nucleon.py @@ -15,26 +15,26 @@ class Nucleon: self.ident = ident try: data_safe = deepcopy((payload | common)) - data_puz = deepcopy(data_safe['puzzles']) - data_safe['puzzles'] = {} + data_puz = deepcopy(data_safe["puzzles"]) + data_safe["puzzles"] = {} env = { "payload": data_safe, - "default": config_var.get()['interface']["puzzles"], + "default": config_var.get()["interface"]["puzzles"], "nucleon": data_safe, } self.evalizer = Evalizer(environment=env) data_safe = self.evalizer(deepcopy(data_safe)) env = { "payload": data_safe, - "default": config_var.get()['interface']["puzzles"], + "default": config_var.get()["interface"]["puzzles"], "nucleon": data_safe, } self.evalizer = Evalizer(environment=env) data_puz = self.evalizer(deepcopy(data_puz)) - data_safe['puzzles'] = data_puz # type: ignore - self.data: dict = data_safe # type: ignore + data_safe["puzzles"] = data_puz # type: ignore + self.data: dict = data_safe # type: ignore except Exception: - self.data = (payload | common) + self.data = payload | common def __getitem__(self, key): if isinstance(key, str): @@ -71,7 +71,7 @@ class Nucleon: return s @staticmethod - def create_on_nucleonic_data(nucleonic_data: tuple): + def from_data(nucleonic_data: tuple): _data = nucleonic_data payload = _data[1][0] common = _data[1][1] diff --git a/src/heurams/kernel/reactor/fission.py b/src/heurams/kernel/reactor/fission.py index cf6cd8b..4d1d100 100644 --- a/src/heurams/kernel/reactor/fission.py +++ b/src/heurams/kernel/reactor/fission.py @@ -75,7 +75,7 @@ class Fission(Machine): self.current_puzzle_inf = self.puzzles_inf[0] for i in range(len(self.puzzles_inf)): - self.min_ratings.append(float('inf')) + self.min_ratings.append(float("inf")) Machine.__init__( self, diff --git a/src/heurams/kernel/repolib/repo.py b/src/heurams/kernel/repolib/repo.py index b863a34..b8cd9a5 100644 --- a/src/heurams/kernel/repolib/repo.py +++ b/src/heurams/kernel/repolib/repo.py @@ -13,10 +13,14 @@ from heurams.kernel.auxiliary.lict import Lict class RepoManifest(TypedDict): title: str author: str + package: str desc: str class Repo: + """只维护仓库本身 + 上层 API 请访问此对象下粒子对象列表""" + file_mapping = { "schedule": "schedule.toml", "payload": "payload.toml", @@ -58,14 +62,15 @@ class Repo: "algodata": self.algodata, "source": self.source, } - self.generate_particles_data() - - def generate_particles_data(self): + self._generate_particles_data() + def _generate_particles_data(self): + """生成上层的粒子对象组和 API 交互, 会在 init 后自动调用""" self.nucleonic_data_lict = Lict( initlist=list(map(self._nucleonic_proc, self.payload)) ) self.orbitic_data = self.schedule + self.data_length = len(self.nucleonic_data_lict) self.ident_index = self.nucleonic_data_lict.keys() for i in self.ident_index: self.algodata.append_new((i, {})) @@ -76,13 +81,6 @@ class Repo: common = self.typedef["common"] return (ident, (unit[1], common)) - @staticmethod - def _merge(value): - def inner(x): - return (x, value) - - return inner - def __len__(self): return len(self.payload) @@ -95,6 +93,7 @@ class Repo: def persist_to_repodir( self, save_list: list | None = None, source: Path | None = None ): + """保存单元集数据到目录""" if save_list == None: save_list = self.default_save_list if self.source != None and source == None: @@ -116,11 +115,13 @@ class Repo: else: raise ValueError(f"不支持的文件类型: {filename}") - def export_to_single_dict(self): + def export_to_dict(self): + """导出至单个字典""" return self.database @classmethod def create_new_repo(cls, source=None): + """创建新的空单元集""" default_database = { "schedule": {}, "payload": Lict([]), @@ -132,7 +133,8 @@ class Repo: return Repo(**default_database) @classmethod - def create_from_repodir(cls, source: Path): + def from_repodir(cls, source: Path): + """从目录创建单元集""" database = {} for keyname, filename in cls.file_mapping.items(): with open(source / filename, "r") as f: @@ -153,21 +155,24 @@ class Repo: return Repo(**database) @classmethod - def create_from_single_dict(cls, dictdata, source: Path | None = None): + def from_dict(cls, dictdata, source: Path | None = None): + """从单一字典创建单元集""" database = dictdata database["source"] = source return Repo(**database) @classmethod def check_repodir(cls, source: Path): + """检测单元集目录合法性""" try: - cls.create_from_repodir(source) - return 1 + cls.from_repodir(source) + return True except: - return 0 + return False @classmethod def probe_valid_repos_in_dir(cls, folder: Path): + """返回一个合法的子目录 Path() 列表""" lst = list() for i in folder.iterdir(): if i.is_dir(): diff --git a/src/heurams/services/audio_service.py b/src/heurams/services/audio_service.py index 4eefd11..c662d10 100644 --- a/src/heurams/services/audio_service.py +++ b/src/heurams/services/audio_service.py @@ -1,4 +1,5 @@ """音频服务""" + from typing import Callable from heurams.context import config_var @@ -7,7 +8,10 @@ from heurams.services.logger import get_logger logger = get_logger(__name__) -play_by_path: Callable = prov[config_var.get()["services"]["audio"]['provider']].play_by_path +play_by_path: Callable = prov[ + config_var.get()["services"]["audio"]["provider"] +].play_by_path logger.debug( - "音频服务初始化完成, 使用 Provider: %s", config_var.get()["services"]["audio"]['provider'] + "音频服务初始化完成, 使用 Provider: %s", + config_var.get()["services"]["audio"]["provider"], ) diff --git a/src/heurams/services/config.py b/src/heurams/services/config.py index 9f6b2a5..d52355a 100644 --- a/src/heurams/services/config.py +++ b/src/heurams/services/config.py @@ -11,26 +11,27 @@ from heurams.services.exceptions import WTFException # 我们的流程是: 找到文件名: 返回文件名里头的数据; 找不到: 继续查索引; 所以 self.data 除了存本级各种索引球用没得 logger = get_logger(__name__) -class ConfigDict(UserDict): # 舒服了 - _instances = {} # 必须使用单例模式, 不然有严重的多实例导致的配置无法持久化问题 + +class ConfigDict(UserDict): # 舒服了 + _instances = {} # 必须使用单例模式, 不然有严重的多实例导致的配置无法持久化问题 def __new__(cls, config_path: pathlib.Path, dict=None): if dict: raise WTFException("不要放默认值...") - + # 规范化路径, 免得单例存在"别名" path_key = config_path.resolve() - + if path_key in cls._instances: return cls._instances[path_key] - + instance = super().__new__(cls) cls._instances[path_key] = instance return instance - def __init__(self, config_path: pathlib.Path, dict = None): # 需要自己把自己提起来 + def __init__(self, config_path: pathlib.Path, dict=None): # 需要自己把自己提起来 # 避免重复初始化 - if hasattr(self, '_initialized'): + if hasattr(self, "_initialized"): return self._initialized = True if dict: @@ -42,13 +43,13 @@ class ConfigDict(UserDict): # 舒服了 if self.is_dir: self.update_index() else: - with open(self.path, 'r+') as f: #TODO: 给这个做缓存 + with open(self.path, "r+") as f: # TODO: 给这个做缓存 try: self.data = toml.load(f) except: self.data = {} - self.persist = lambda: False # 不修改错误的配置文件 - + self.persist = lambda: False # 不修改错误的配置文件 + def __getitem__(self, key): # 我们实现了先进的懒狗加载 value = super().__getitem__(key) @@ -60,7 +61,7 @@ class ConfigDict(UserDict): # 舒服了 return super().__contains__(key) def __setitem__(self, key, value): - origvalue = super().__getitem__(key) # 所以你不该访问不存在的对象 + origvalue = super().__getitem__(key) # 所以你不该访问不存在的对象 if isinstance(origvalue, ConfigDict): if origvalue.path.is_dir(): raise WTFException("你怎么能变更目录配置的内容呢?!") @@ -70,20 +71,22 @@ class ConfigDict(UserDict): # 舒服了 origvalue.data = value super().__setitem__(key, value) - def update_index(self): # 如果有人没事干在config里面创建指向config的符号链接 这玩意会崩溃 但是不要修复: 需要这个符号链接特性 + def update_index( + self, + ): # 如果有人没事干在config里面创建指向config的符号链接 这玩意会崩溃 但是不要修复: 需要这个符号链接特性 for i in self.path.iterdir(): - if i.name.startswith('_'): - if i.name == '_.toml' and not i.is_dir(): - with open(self.path/'_.toml', 'r+') as f: + if i.name.startswith("_"): + if i.name == "_.toml" and not i.is_dir(): + with open(self.path / "_.toml", "r+") as f: self.data.update(dict(toml.load(f))) continue if i.is_dir(): self.data[i.name] = i else: - if i.suffix == '.toml': + if i.suffix == ".toml": self.data[i.stem] = i else: - logger.debug(f"配置目录中有无效的文件 {i.stem}") # what's up bro + logger.debug(f"配置目录中有无效的文件 {i.stem}") # what's up bro def persist(self): if self.is_dir: @@ -94,5 +97,5 @@ class ConfigDict(UserDict): # 舒服了 logger.debug("完成配置持久化") return - with open(self.path, 'w+') as f: - toml.dump(self.data, f) \ No newline at end of file + with open(self.path, "w+") as f: + toml.dump(self.data, f) diff --git a/src/heurams/services/epath.py b/src/heurams/services/epath.py index 91d0504..c9fa953 100644 --- a/src/heurams/services/epath.py +++ b/src/heurams/services/epath.py @@ -3,27 +3,41 @@ from heurams.services.logger import get_logger logger = get_logger(__name__) -def epath(dct, path: str = '', default=None, parents=False, enable_modify=False, new_value=None): + +def epath( + dct, + path: str = "", + default=None, + parents=False, + enable_modify=False, + new_value=None, +): if not path: return dct - - path = path.rstrip('.') - path = path.lstrip('.') + + path = path.rstrip(".") + path = path.lstrip(".") target = dct - keys = path.split('.') + keys = path.split(".") logger.debug(f"处理 EPATH {path}, {new_value}") for idx, i in enumerate(keys): - is_last = (idx == len(keys) - 1) - + is_last = idx == len(keys) - 1 + # 处理字典键 - logger.debug(f'处理 {i}, {(isinstance(target, dict) or isinstance(target, ConfigDict))} {i in target}') - + logger.debug( + f"处理 {i}, {(isinstance(target, dict) or isinstance(target, ConfigDict))} {i in target}" + ) + if is_last and enable_modify: # 最后一次循环执行修改 - if (isinstance(target, dict) or isinstance(target, ConfigDict)): + if isinstance(target, dict) or isinstance(target, ConfigDict): target[i] = new_value return new_value - elif i.startswith('[') and i.endswith(']') and isinstance(target, (list, tuple)): + elif ( + i.startswith("[") + and i.endswith("]") + and isinstance(target, (list, tuple)) + ): idx_num = int(i[1:-1]) if 0 <= idx_num < len(target): target[idx_num] = new_value @@ -38,9 +52,15 @@ def epath(dct, path: str = '', default=None, parents=False, enable_modify=False, else: return default else: - if (isinstance(target, dict) or isinstance(target, ConfigDict)) and i in target: + if ( + isinstance(target, dict) or isinstance(target, ConfigDict) + ) and i in target: target = target[i] - elif i.startswith('[') and i.endswith(']') and isinstance(target, (list, tuple)): + elif ( + i.startswith("[") + and i.endswith("]") + and isinstance(target, (list, tuple)) + ): idx_num = int(i[1:-1]) if 0 <= idx_num < len(target): target = target[idx_num] @@ -56,5 +76,5 @@ def epath(dct, path: str = '', default=None, parents=False, enable_modify=False, target = target[i] else: return default - - return target \ No newline at end of file + + return target diff --git a/src/heurams/services/exceptions.py b/src/heurams/services/exceptions.py index 3708473..741b86a 100644 --- a/src/heurams/services/exceptions.py +++ b/src/heurams/services/exceptions.py @@ -1,4 +1,5 @@ from heurams.services.logger import get_logger + class WTFException(Exception): - pass \ No newline at end of file + pass diff --git a/src/heurams/services/favorite_service.py b/src/heurams/services/favorite_service.py index e0939e4..b21e804 100644 --- a/src/heurams/services/favorite_service.py +++ b/src/heurams/services/favorite_service.py @@ -63,7 +63,7 @@ class FavoriteManager: def _get_file_path(self) -> Path: """获取收藏文件路径""" - config_path = Path(config_var.get()['global']["paths"]["data"]) + config_path = Path(config_var.get()["global"]["paths"]["data"]) fav_path = config_path / "global" / "favorites.json" fav_path.parent.mkdir(parents=True, exist_ok=True) return fav_path diff --git a/src/heurams/services/textproc.py b/src/heurams/services/textproc.py index 0df352a..dc9030e 100644 --- a/src/heurams/services/textproc.py +++ b/src/heurams/services/textproc.py @@ -3,8 +3,10 @@ def truncate(text): return text return text[:3] + ">" + def domize(text): - return text.replace('.', '--DOT--') + return text.replace(".", "--DOT--") + def undomize(text): - return text.replace('--DOT--', '.') \ No newline at end of file + return text.replace("--DOT--", ".") diff --git a/src/heurams/services/timer.py b/src/heurams/services/timer.py index ec52336..6081401 100644 --- a/src/heurams/services/timer.py +++ b/src/heurams/services/timer.py @@ -9,12 +9,15 @@ logger = get_logger(__name__) def get_daystamp() -> int: """获取当前日戳(以天为单位的整数时间戳)""" - time_override = config_var.get()['services']["timer"]["daystamp_override"] + time_override = config_var.get()["services"]["timer"]["daystamp_override"] if time_override != -1: logger.debug("使用覆盖的日戳: %d", time_override) return int(time_override) - result = int((time.time() + config_var.get()['services']["timer"]["timezone_offset"]) // (24 * 3600)) + result = int( + (time.time() + config_var.get()["services"]["timer"]["timezone_offset"]) + // (24 * 3600) + ) logger.debug("计算日戳: %d", result) return result @@ -22,7 +25,7 @@ def get_daystamp() -> int: def get_timestamp() -> float: """获取 UNIX 时间戳""" # 搞这个类的原因是要支持可复现操作 - time_override = config_var.get()['services']["timer"]["timestamp_override"] + time_override = config_var.get()["services"]["timer"]["timestamp_override"] if time_override != -1: logger.debug("使用覆盖的时间戳: %f", time_override) return float(time_override) diff --git a/src/heurams/services/tts_service.py b/src/heurams/services/tts_service.py index 71d4524..f9ed645 100644 --- a/src/heurams/services/tts_service.py +++ b/src/heurams/services/tts_service.py @@ -7,7 +7,8 @@ from heurams.services.logger import get_logger logger = get_logger(__name__) -convertor: Callable = prov[config_var.get()['services']["tts"]["provider"]].convert +convertor: Callable = prov[config_var.get()["services"]["tts"]["provider"]].convert logger.debug( - "TTS 服务初始化完成, 使用 provider: %s", config_var.get()['services']["tts"]["provider"] + "TTS 服务初始化完成, 使用 provider: %s", + config_var.get()["services"]["tts"]["provider"], ) diff --git a/src/heurams/tools/csv2payload.py b/src/heurams/tools/csv2payload.py index 4caa023..509b69c 100755 --- a/src/heurams/tools/csv2payload.py +++ b/src/heurams/tools/csv2payload.py @@ -23,7 +23,7 @@ ident, content, meaning, ... , "Woof", "狗发出的声音" ``` -转换后的 TOML: +转换后的 TOML: ```toml [Fox] content = "Fox" @@ -65,6 +65,7 @@ meaning = "狗发出的声音" - 如果 CSV 包含更多列,它们也会以相同方式转换为键值对 - 支持 `-r` 参数指定随机种子来打乱 section 顺序 """ + import csv import sys import os @@ -72,10 +73,11 @@ import random import argparse from pathlib import Path + def csv_to_toml(csv_path, toml_path=None, random_seed=None): """ 将CSV文件转换为TOML格式 - + Args: csv_path (str): 输入CSV文件路径 toml_path (str): 输出TOML文件路径,默认为相同目录下同名文件 @@ -86,92 +88,97 @@ def csv_to_toml(csv_path, toml_path=None, random_seed=None): if not csv_file.exists(): print(f"错误: CSV文件不存在 - {csv_path}") sys.exit(1) - + # 确定输出TOML文件路径 if toml_path is None: - toml_path = csv_file.with_suffix('.toml') + toml_path = csv_file.with_suffix(".toml") else: toml_path = Path(toml_path) - + # 读取CSV文件 try: - with open(csv_file, 'r', encoding='utf-8') as f: + with open(csv_file, "r", encoding="utf-8") as f: reader = csv.DictReader(f) rows = list(reader) except Exception as e: print(f"错误: 无法读取CSV文件 - {e}") sys.exit(1) - + # 检查CSV文件是否有数据 if not rows: print("错误: CSV文件为空或格式不正确") sys.exit(1) - + # 如果指定了随机种子,设置随机种子并打乱行顺序 if random_seed is not None: random.seed(random_seed) random.shuffle(rows) print(f"提示: 使用随机种子 {random_seed} 打乱了 section 顺序") - + # 生成TOML内容 toml_content = [] idx_counter = 1 - + for row in rows: # 处理ident列,为空时生成自动标识符 - ident = row.get('ident', '').strip() + ident = row.get("ident", "").strip() if not ident: ident = f"idx_{idx_counter}" idx_counter += 1 - + # 添加section标题 toml_content.append(f"[{ident}]") - + # 添加所有其他列作为键值对(排除ident列) for key, value in row.items(): - if key == 'ident': + if key == "ident": continue - + # 确保值存在且不为空 - if value is not None and str(value).strip() != '': + if value is not None and str(value).strip() != "": # 转义特殊字符并添加引号 escaped_value = str(value).replace('"', '\\"') toml_content.append(f'"{key}" = "{escaped_value}"') - + # section之间添加空行 toml_content.append("") - + # 写入TOML文件 try: - with open(toml_path, 'w', encoding='utf-8') as f: - f.write('\n'.join(toml_content).strip()) + with open(toml_path, "w", encoding="utf-8") as f: + f.write("\n".join(toml_content).strip()) print(f"成功: 已生成TOML文件 - {toml_path}") except Exception as e: print(f"错误: 无法写入TOML文件 - {e}") sys.exit(1) + def main(): """主函数""" parser = argparse.ArgumentParser( - description='将CSV文件转换为TOML格式,支持随机打乱section顺序', + description="将CSV文件转换为TOML格式,支持随机打乱section顺序", formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=''' + epilog=""" 示例: %(prog)s input.csv output.toml %(prog)s input.csv # 自动生成input.toml %(prog)s input.csv -r 42 # 使用种子42打乱顺序 %(prog)s input.csv -r 123 output.toml # 指定种子和输出路径 - ''' + """, ) - - parser.add_argument('csv_path', help='输入的CSV文件路径') - parser.add_argument('toml_path', nargs='?', help='输出的TOML文件路径,默认为CSV同名文件') - parser.add_argument('-r', '--random-seed', type=int, - help='随机种子,用于打乱TOML section的顺序') - + + parser.add_argument("csv_path", help="输入的CSV文件路径") + parser.add_argument( + "toml_path", nargs="?", help="输出的TOML文件路径,默认为CSV同名文件" + ) + parser.add_argument( + "-r", "--random-seed", type=int, help="随机种子,用于打乱TOML section的顺序" + ) + args = parser.parse_args() - + csv_to_toml(args.csv_path, args.toml_path, args.random_seed) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/heurams/tools/zmqclient.py b/src/heurams/tools/zmqclient.py index 1a7a481..cab1d5b 100644 --- a/src/heurams/tools/zmqclient.py +++ b/src/heurams/tools/zmqclient.py @@ -3,6 +3,7 @@ import pickle import readline import sys + class DebugClient: def __init__(self, port=5555): self.context = zmq.Context() @@ -12,7 +13,7 @@ class DebugClient: print("输入Python代码并按回车执行, 输入 'exit' 退出") print("可用变量: app, logger") print("-" * 50) - + def execute(self, code): """执行代码并返回结果""" try: @@ -21,7 +22,7 @@ class DebugClient: return response except Exception as e: return f"连接错误: {e}" - + def repl(self): """交互式REPL循环""" self.execute('print("test")') @@ -29,27 +30,28 @@ class DebugClient: try: # 获取用户输入 code = input(">>> ").strip() - + if not code: continue - - if code.lower() in ['exit', 'quit']: + + if code.lower() in ["exit", "quit"]: print("退出调试客户端") break - + # 执行代码 result = self.execute(code) print(f"结果: {result}\n") - + except KeyboardInterrupt: print("\n退出调试客户端") break except EOFError: break + if __name__ == "__main__": # 从命令行参数获取端口 port = int(sys.argv[1]) if len(sys.argv) > 1 else 5555 - + client = DebugClient(port) - client.repl() \ No newline at end of file + client.repl()