Compare commits

..

2 Commits

44 changed files with 576 additions and 348 deletions

View File

@@ -1,7 +1,7 @@
zmq_debug = false zmq_debug = true
_zmq_debug_desc = "[调试] ZMQ 调试服务器, 这会在 zmq_debug_port 上打开调试服务器\n可作为 HeurAMS 执行任意 python 代码, 无必要请关闭" _zmq_debug_desc = "[调试] ZMQ 调试服务器, 这会在 zmq_debug_port 上打开一个服务器\n调试工具可远程在 HeurAMS 执行任意 python 代码, 无必要请关闭"
zmq_debug_port = 5555 zmq_debug_port = 5555
_zmq_debug_port_desc = "ZMQ 调试服务器端口" _zmq_debug_port_desc = "[调试] ZMQ 调试服务器端口"
enable_built_in_interface = false enable_built_in_interface = false
_enable_built_in_interface_desc = "启用内置基本用户界面\n(当且仅当 HeurAMS 作为程序库时禁用, 以跳过用户界面逻辑)" _enable_built_in_interface_desc = "启用内置基本用户界面\n(当且仅当 HeurAMS 作为程序库时禁用, 以跳过用户界面逻辑)"
_paths_desc = "用户数据路径定义" _paths_desc = "用户数据路径定义"

View File

@@ -4,7 +4,7 @@ quick_pass = true
_quick_pass_desc = "[调试] 启用快速应答功能(跳过测验)" _quick_pass_desc = "[调试] 启用快速应答功能(跳过测验)"
auto_pass = false auto_pass = false
_auto_pass_desc = "[调试] 自动通过测试模式" _auto_pass_desc = "[调试] 自动通过测试模式"
scheduled_num = "420" scheduled_num = 420
_scheduled_num_desc = "默认记忆单元数量(可被单元集设置覆盖)" _scheduled_num_desc = "默认记忆单元数量(可被单元集设置覆盖)"
algorithm = "NSP-0" algorithm = "NSP-0"
_algorithm_desc = "默认记忆调度算法(可被单元集设置覆盖)" _algorithm_desc = "默认记忆调度算法(可被单元集设置覆盖)"

2
data/config/repo/_.toml Normal file
View File

@@ -0,0 +1,2 @@
_cngk-t_desc = "高考必备古诗文"
_cngk_desc = "高考必备古诗文"

View File

@@ -0,0 +1,11 @@
algorithm = "NSP-0"
_algorithm_desc = "记忆调度算法"
scheduled_num = 420
_scheduled_num_desc = "单次记忆单元数量"
[_algorithm_candidate]
NSP-0 = "筛选用非间隔重复调度器"
none = "不设置默认调度器"
SM-2 = "第二代 SuperMemo 简单间隔重复调度器"
SM-15M = "第15代 SuperMemo 复杂间隔重复调度器 (不稳定且逆向工程)"
FSRS = "先进开放间隔重复调度器"

View File

@@ -0,0 +1,11 @@
algorithm = "SM-2"
_algorithm_desc = "记忆调度算法"
scheduled_num = 20
_scheduled_num_desc = "单次记忆单元数量"
[_algorithm_candidate]
NSP-0 = "筛选用非间隔重复调度器"
none = "不设置默认调度器"
SM-2 = "第二代 SuperMemo 简单间隔重复调度器"
SM-15M = "第15代 SuperMemo 复杂间隔重复调度器 (不稳定且逆向工程)"
FSRS = "先进开放间隔重复调度器"

View File

@@ -1,3 +1,4 @@
title = "高考必背古诗文-0" title = "高考必背古诗文-0"
package = "cngk-0"
author = "__heurams__" author = "__heurams__"
desc = "高考古诗文 60 篇" desc = "高考古诗文 60 篇"

View File

@@ -5,14 +5,12 @@ import heurams.kernel.particles as pt
import heurams.kernel.repolib as repolib import heurams.kernel.repolib as repolib
from heurams.services.textproc import truncate from heurams.services.textproc import truncate
repo = repolib.Repo.create_from_repodir(Path("./test_repo")) repo = repolib.Repo.from_repodir(Path("./test_repo"))
alist = list() alist = list()
print(repo.ident_index) print(repo.ident_index)
for i in repo.ident_index: for i in repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data( n = pt.Nucleon.from_data(nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i))
nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i) e = pt.Electron.from_data(
)
e = pt.Electron.create_on_electonic_data(
electronic_data=repo.electronic_data_lict.get_itemic_unit(i) electronic_data=repo.electronic_data_lict.get_itemic_unit(i)
) )
print(n) print(n)

View File

@@ -1,5 +1,6 @@
import heurams.services.version as ver import heurams.services.version as ver
# __main__.py # __main__.py
def main(): def main():
prompt = f"""HeurAMS {ver.ver} 已经被成功地安装在系统中. prompt = f"""HeurAMS {ver.ver} 已经被成功地安装在系统中.
@@ -17,5 +18,6 @@ python 代指您使用的解释器, 在某些发行版中可能是 python3, 而
注意: 一个常见的误区是, 执行 interface 下的 __main__.py 运行基本用户界面, 这会导致 Python 上下文环境异常, 请不要这样做.""" 注意: 一个常见的误区是, 执行 interface 下的 __main__.py 运行基本用户界面, 这会导致 Python 上下文环境异常, 请不要这样做."""
print(prompt) print(prompt)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -30,6 +30,7 @@ config_var: ContextVar[ConfigDict].get = ContextVar(
) )
"""配置对象的全局引用对象.""" """配置对象的全局引用对象."""
class ConfigContext: class ConfigContext:
""" """
功能完备的上下文管理器 功能完备的上下文管理器

View File

@@ -1,9 +1,11 @@
from time import sleep, perf_counter from time import sleep, perf_counter
print("欢迎使用基本用户界面!") print("欢迎使用基本用户界面!")
print("加载配置与上下文... ", end="", flush=True) print("加载配置与上下文... ", end="", flush=True)
_start1 = perf_counter() _start1 = perf_counter()
_start = perf_counter() _start = perf_counter()
from heurams.context import * from heurams.context import *
_end = perf_counter() _end = perf_counter()
print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)") print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)")
@@ -11,6 +13,7 @@ print("加载用户界面框架... ", end="", flush=True)
_start = perf_counter() _start = perf_counter()
from textual.app import App from textual.app import App
from textual.widgets import Button from textual.widgets import Button
_end = perf_counter() _end = perf_counter()
print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)") print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)")
@@ -22,6 +25,7 @@ from .screens.navigator import NavigatorScreen
from .screens.precache import PrecachingScreen from .screens.precache import PrecachingScreen
from .screens.setting import SettingScreen from .screens.setting import SettingScreen
from .screens.synctool import SyncScreen from .screens.synctool import SyncScreen
_end = perf_counter() _end = perf_counter()
print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)") print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)")
@@ -29,14 +33,18 @@ print(f"组件目录: {rootdir}")
print(f"工作目录: {workdir}") print(f"工作目录: {workdir}")
_end1 = perf_counter() _end1 = perf_counter()
print(f"前置工作共计耗时: {round(1000 * (_end1 - _start1))}ms") print(f"前置工作共计耗时: {round(1000 * (_end1 - _start1))}ms")
class HeurAMSApp(App): class HeurAMSApp(App):
TITLE = "潜进" TITLE = "潜进"
CSS_PATH = "css/main.tcss" CSS_PATH = "css/main.tcss"
css_dir = pathlib.Path("css").resolve()
SUB_TITLE = "启发式辅助记忆调度器" SUB_TITLE = "启发式辅助记忆调度器"
BINDINGS = [ BINDINGS = [
("q", "go_back", "退出"), ("q", "go_back", "退出"),
("d", "toggle_dark", "主题"), ("d", "toggle_dark", "主题"),
("n", "app.push_screen('navigator')", "导航"), ("n", "app.push_screen('navigator')", "导航"),
("s", "app.push_screen('setting')", "设置"),
("z", "app.push_screen('about')", "关于"), ("z", "app.push_screen('about')", "关于"),
] ]
SCREENS = { SCREENS = {
@@ -71,4 +79,4 @@ class HeurAMSApp(App):
def panic(self, *args): def panic(self, *args):
self._close_messages_no_wait() self._close_messages_no_wait()
raise self._exception raise self._exception

View File

@@ -7,6 +7,7 @@ import pickle
logger = get_logger(__name__) logger = get_logger(__name__)
def environment_check(): def environment_check():
from pathlib import Path from pathlib import Path
@@ -23,11 +24,12 @@ def environment_check():
print(f"找到 {i}") print(f"找到 {i}")
logger.debug("环境检查完成") logger.debug("环境检查完成")
def start_debug_server(app): def start_debug_server(app):
logger = get_logger("zmq_debug") logger = get_logger("zmq_debug")
context = zmq.Context() context = zmq.Context()
socket = context.socket(zmq.REP) socket = context.socket(zmq.REP)
port = config_var.get()['global'].get('zmq_debug_port', 5555) port = config_var.get()["global"].get("zmq_debug_port", 5555)
socket.bind(f"tcp://*:{port}") socket.bind(f"tcp://*:{port}")
logger.info(f"ZMQ Debug server started on port {port}") logger.info(f"ZMQ Debug server started on port {port}")
first = 1 first = 1
@@ -36,7 +38,7 @@ def start_debug_server(app):
code = pickle.loads(msg) code = pickle.loads(msg)
namespace = {"app": app, "logger": logger, "config_var": config_var} namespace = {"app": app, "logger": logger, "config_var": config_var}
if first: if first:
app.title += ' [调试已连接]' app.title += " [调试已连接]"
first = 0 first = 0
try: try:
# 先尝试 eval # 先尝试 eval
@@ -52,15 +54,17 @@ def start_debug_server(app):
except Exception as e: except Exception as e:
socket.send(pickle.dumps(f"错误: {e}")) socket.send(pickle.dumps(f"错误: {e}"))
def main(): def main():
environment_check() environment_check()
app = HeurAMSApp() app = HeurAMSApp()
if config_var.get()['global'].get('zmq_debug', False): if config_var.get()["global"].get("zmq_debug", False):
threading.Thread(target=start_debug_server, args=(app,), daemon=True).start() threading.Thread(target=start_debug_server, args=(app,), daemon=True).start()
app.run(inline=False) app.run(inline=False)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -0,0 +1,18 @@
.repo-list {
}
#header {
height: 4;
}
.repo-list-item {
layout: grid;
grid-size: 2;
height: 3;
}
.repo-list-item-shortcut {
dock: right;
offset: -5% 0
}

View File

@@ -23,7 +23,7 @@ class AboutScreen(Screen):
yield Header(show_clock=True) yield Header(show_clock=True)
with ScrollableContainer(id="about_container"): with ScrollableContainer(id="about_container"):
yield Label("[b]关于与版本信息[/b]") yield Label("[b]关于与版本信息[/b]")
# 获取系统信息 # 获取系统信息
textual_version = self._get_textual_version() textual_version = self._get_textual_version()
terminal_info = self._get_terminal_info() terminal_info = self._get_terminal_info()
@@ -31,7 +31,7 @@ class AboutScreen(Screen):
os_version = self._get_os_version() os_version = self._get_os_version()
disk_usage = self._get_disk_usage() disk_usage = self._get_disk_usage()
memory_info = self._get_memory_info() memory_info = self._get_memory_info()
about_text = f""" about_text = f"""
# 关于 "潜进" # 关于 "潜进"
@@ -95,36 +95,39 @@ Textual 框架版本: {textual_version}
event.stop() event.stop()
if event.button.id == "back_button": if event.button.id == "back_button":
self.action_go_back() self.action_go_back()
def _get_textual_version(self) -> str: def _get_textual_version(self) -> str:
"""获取 Textual 框架版本""" """获取 Textual 框架版本"""
try: try:
import textual import textual
return textual.__version__ return textual.__version__
except (ImportError, AttributeError): except ImportError, AttributeError:
return "未知" return "未知"
def _get_terminal_info(self) -> str: def _get_terminal_info(self) -> str:
"""获取终端模拟器信息""" """获取终端模拟器信息"""
terminal = shutil.which("terminal") terminal = shutil.which("terminal")
if terminal: if terminal:
return terminal return terminal
# 尝试从环境变量获取 # 尝试从环境变量获取
terminal_env = os.environ.get('TERM_PROGRAM') or os.environ.get('TERM') terminal_env = os.environ.get("TERM_PROGRAM") or os.environ.get("TERM")
return terminal_env or "未知" return terminal_env or "未知"
def _get_python_version(self) -> str: def _get_python_version(self) -> str:
"""获取 Python 解释器版本""" """获取 Python 解释器版本"""
return platform.python_version() return platform.python_version()
def _get_os_version(self) -> str: def _get_os_version(self) -> str:
"""获取操作系统版本""" """获取操作系统版本"""
try: try:
if platform.system() == "Darwin": if platform.system() == "Darwin":
# macOS # macOS
import subprocess import subprocess
result = subprocess.run(['sw_vers', '-productVersion'],
capture_output=True, text=True) result = subprocess.run(
["sw_vers", "-productVersion"], capture_output=True, text=True
)
return f"macOS {result.stdout.strip()}" return f"macOS {result.stdout.strip()}"
elif platform.system() == "Windows": elif platform.system() == "Windows":
# Windows # Windows
@@ -133,30 +136,31 @@ Textual 框架版本: {textual_version}
# Linux - 尝试获取发行版信息 # Linux - 尝试获取发行版信息
try: try:
import distro import distro
return f"{distro.name()} {distro.version()}" return f"{distro.name()} {distro.version()}"
except (ImportError, AttributeError): except ImportError, AttributeError:
return platform.platform() return platform.platform()
else: else:
return platform.platform() return platform.platform()
except Exception: except Exception:
return platform.platform() return platform.platform()
def _get_disk_usage(self) -> str: def _get_disk_usage(self) -> str:
"""获取磁盘使用情况""" """获取磁盘使用情况"""
try: try:
usage = psutil.disk_usage('/') usage = psutil.disk_usage("/")
free_gb = usage.free / (1024 ** 3) free_gb = usage.free / (1024**3)
total_gb = usage.total / (1024 ** 3) total_gb = usage.total / (1024**3)
percent_free = (free_gb / total_gb) * 100 percent_free = (free_gb / total_gb) * 100
return f"{free_gb:.1f} GB ({percent_free:.1f}%)" return f"{free_gb:.1f} GB ({percent_free:.1f}%)"
except Exception: except Exception:
return "未知" return "未知"
def _get_memory_info(self) -> str: def _get_memory_info(self) -> str:
"""获取内存信息""" """获取内存信息"""
try: try:
memory = psutil.virtual_memory() memory = psutil.virtual_memory()
total_gb = memory.total / (1024 ** 3) total_gb = memory.total / (1024**3)
return f"{total_gb:.1f} GB" return f"{total_gb:.1f} GB"
except Exception: except Exception:
return "未知" return "未知"

View File

@@ -35,6 +35,8 @@ class DashboardScreen(Screen):
("q", "go_back", "返回"), ("q", "go_back", "返回"),
] ]
CSS_PATH = Path(__file__).parent.parent / 'css' / "screens" / "dashboard.tcss"
def __init__( def __init__(
self, self,
name: str | None = None, name: str | None = None,
@@ -42,96 +44,91 @@ class DashboardScreen(Screen):
classes: str | None = None, classes: str | None = None,
) -> None: ) -> None:
super().__init__(name, id, classes) super().__init__(name, id, classes)
self.repostat = {} self.repolink = {}
self.title2dirname = {}
self.title2repo = {}
self.dirname2repo = {}
self._load_data()
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
"""组合界面组件""" """组合界面组件"""
self._load_data()
yield Header(show_clock=True) yield Header(show_clock=True)
with ScrollableContainer(): with ScrollableContainer():
yield Horizontal( yield Horizontal( # 顶部的状态
Vertical( Vertical(
Label(f'欢迎使用 "潜进" 版本 {version.ver}'), Label(f'欢迎使用 "潜进" 版本 {version.ver}'),
Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"),
Label( Label(
f"当前 UNIX 日时间戳: {timer.get_daystamp()}" f"应用时区修正: UTC+{config_var.get()['services']['timer']['timezone_offset'] / 3600}"
), ),
Label(f"应用时区修正: UTC+{config_var.get()['services']['timer']['timezone_offset'] / 3600}"), Label(
Label(f"全局算法设置: {config_var.get()['interface']['global']['algorithm']}: {algorithms[config_var.get()['interface']['global']['algorithm']].desc}"), f"默认算法设置: {config_var.get()['interface']['global']['algorithm']}"
classes="column infview", ),
classes="left",
), ),
Vertical( Vertical(
Label(f"已加载 {len(self.repostat)} 个单元集", classes='dataview'), Label(f"已加载 {len(self.repos)} 个单元集"),
Label(f"共计 {reduce(lambda x, y: x + y, map(lambda x: x.get('unit_sum'), self.repostat.values()))} 个单元", classes='dataview'), Label(
Label(f"已激活 {reduce(lambda x, y: x + y, map(lambda x: x.get('activated_sum'), self.repostat.values()))} 个单元", classes='dataview'), f"共计 {reduce(lambda x, y: x + y, map(lambda x: x.progress['total'], self.repos))} 个单元"
),
Label(
f"已激活 {reduce(lambda x, y: x + y, map(lambda x: x.progress['touched'], self.repos))} 个单元"
),
Label(f""), Label(f""),
classes="column dataview", classes="right",
), ),
id="dashboardtop" id="header",
) )
yield ListView(id="repo-list", classes="repo-list-view") yield ListView(id="repo_list", classes="repo-list") # 单元集选择
yield Label(f'"潜进" 启发式辅助记忆调度器 版本 {version.ver} {version.stage.capitalize()}')
yield Label(
f'"潜进" 启发式辅助记忆调度器 版本 {version.ver} {version.stage.capitalize()}'
) # 版本信息
yield Footer() yield Footer()
def _load_data(self): def _load_data(self):
self.repo_dirs = Repo.probe_valid_repos_in_dir( repo_dirs = Repo.probe_valid_repos_in_dir(
Path(config_var.get()['global']["paths"]["data"]) / "repo" Path(config_var.get()["global"]["paths"]["repo"])
) )
for repo_dir in self.repo_dirs: self.repos = list(map(Repo.from_repodir, repo_dirs))
repo = Repo.create_from_repodir(repo_dir) for repo in self.repos:
self._analyse_repo(repo) self._analyse_repo(repo)
def _analyse_repo(self, repo: Repo): def _analyse_repo(self, repo: Repo):
dirname = repo.source.name # type: ignore # need_review: 需要/不需要学习
title = repo.manifest["title"] # nearest_review_time: 最近下次学习时间
is_due = 0 # progress: 进度
unit_sum = len(repo) # algotype: 算法类型
activated_sum = 0 ## initial_time: 起始时间
nextdate = float('inf') # package: 包名
for i in repo.ident_index: # prompt: 最终呈现信息
nucleon = pt.Nucleon.create_on_nucleonic_data( repo.package = repo.manifest["package"]
nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i) repo.nearest_review_time = float("inf")
) repo.progress = {
electron = pt.Electron.create_on_electonic_data( "total": repo.data_length,
electronic_data=repo.electronic_data_lict.get_itemic_unit(i) "touched": 0,
)
if electron.is_activated():
activated_sum += 1
if electron.is_due():
is_due = 1
nextdate = min(nextdate, electron.nextdate())
is_unfinished = unit_sum > activated_sum
if is_unfinished:
nextdate = min(nextdate, timer.get_daystamp())
need_to_study = is_due or is_unfinished
prompt = f"{title}\0\n 进度: {activated_sum}/{unit_sum} ({round(activated_sum/unit_sum*100)}%)\n {'需要学习' if need_to_study else '无需操作'}"
stat = {
"is_due": is_due,
"unit_sum": unit_sum,
"title": title,
"activated_sum": activated_sum,
"nextdate": nextdate,
"is_unfinished": is_unfinished,
"need_to_study": need_to_study,
"prompt": prompt,
"dirname": dirname,
} }
self.repostat[dirname] = stat initial_time = float("inf")
self.title2dirname[title] = dirname for i in range(repo.data_length):
self.title2repo[title] = repo e = pt.Electron.from_data(repo.electronic_data_lict[i])
self.dirname2repo[dirname] = repo n = pt.Nucleon.from_data(repo.nucleonic_data_lict[i])
if e.is_activated():
repo.algotype = e.algoname
repo.progress["touched"] += 1
repo.nearest_review_time = min(repo.nearest_review_time, e.nextdate())
# initial_time = min(initial_time, e.)
repo.need_review = timer.get_daystamp() >= repo.nearest_review_time
repo.prompt = f"""{repo.manifest['title']} ({repo.algotype})
进度: {repo.progress['touched']}/{repo.progress['total']} ({round(repo.progress['touched']/repo.progress['total']*100, 1)}%)
{'需要学习' if repo.need_review else "无需操作"}
"""
def on_mount(self) -> None: def on_mount(self) -> None:
"""挂载组件时初始化""" """挂载组件时初始化"""
repo_list_widget = self.query_one("#repo-list", ListView) repo_list_widget = self.query_one("#repo_list", ListView)
# 按下次复习时间排序 # 按下次复习时间排序
repodirs = sorted( repodirs = sorted(
self.repo_dirs, self.repos,
key=lambda f: self.repostat[f.name]["nextdate"], key=lambda r: r.nearest_review_time,
reverse=True, reverse=True,
) )
repotitles = map(lambda f: self.repostat[f.name]["title"], repodirs) repotitles = map(lambda f: self.repostat[f.name]["title"], repodirs)
@@ -140,43 +137,44 @@ class DashboardScreen(Screen):
repo_list_widget.append( repo_list_widget.append(
ListItem( ListItem(
Static( Static(
"./data/repo/ 中未找到任何仓库.\n" f"{config_var.get()['global']['paths']['repo']} 中未找到任何仓库.\n"
"请导入仓库后重启应用, 或者新建空的仓库." "请导入仓库后重启应用, 或者新建空的仓库."
) ),
id="not-found",
) )
) )
repo_list_widget.disabled = True repo_list_widget.disabled = True
return return
for repotitle in repotitles: for r in self.repos:
prompt = self.repostat[self.title2dirname[repotitle]]["prompt"] self.repolink[str(id(r))] = r # 用于规避 ctype id 对象还原
list_item = ListItem(Label(prompt), Button(f"开始学习", flat=True, variant="primary", classes="repo_listitem_btn", id=f"launch_{self.repostat[self.title2dirname[repotitle]]['dirname']}"), classes="repo_listitem") list_item = ListItem(
Label(r.prompt),
Button(
f"开始学习",
flat=True,
variant="primary",
id=f"slaunch_repo_{id(r)}",
classes="repo-list-item-shortcut",
),
classes="repo-list-item",
id=f"launch_repo_{id(r)}",
)
repo_list_widget.append(list_item) repo_list_widget.append(list_item)
# if not self.stay_enabled[repodir]:
# list_item.disabled = True
def on_list_view_selected(self, event) -> None: def on_list_view_selected(self, event) -> None:
"""处理列表项选择事件""" """处理列表项选择事件"""
if not isinstance(event.item, ListItem): if not isinstance(event.item, ListItem):
return return
selected_label = event.item.query_one(Label) if "not-found" == event.item.id:
label_text = str(selected_label.render())
if "未找到任何仓库" in label_text:
return return
# 提取文件名 # 还原对象
selected_repotitle = label_text.partition("\0")[0].replace("*", "") selected_repo = self.repolink[event.item.id.lstrip("launch_repo_")]
selected_repo = self.title2repo[label_text.partition("\0")[0].replace("*", "")]
# 跳转到准备屏幕 # 跳转到准备屏幕
self.app.push_screen( self.app.push_screen(PreparationScreen(selected_repo))
PreparationScreen(
selected_repo, self.repostat[self.title2dirname[selected_repotitle]]
)
)
def action_quit_app(self) -> None: def action_quit_app(self) -> None:
"""退出应用程序""" """退出应用程序"""
@@ -187,9 +185,10 @@ class DashboardScreen(Screen):
self.app.push_screen(NavigatorScreen()) self.app.push_screen(NavigatorScreen())
def on_button_pressed(self, event: Button.Pressed) -> None: def on_button_pressed(self, event: Button.Pressed) -> None:
logger.debug(f"event.button.id: {event.button.id}")
"""处理按钮点击事件""" """处理按钮点击事件"""
if str(event.button.id).startswith("launch_"): # type: ignore logger.debug(f"event.button.id: {event.button.id}")
if event.button.id.startswith("slaunch_repo_"): # type: ignore
from .preparation import launch from .preparation import launch
launch(repo=self.dirname2repo[event.button.id[7:]], app=self.app, scheduled_num=-1) # type: ignore
launch(repo=self.repolink[event.button.id.lstrip("slaunch_repo_")], app=self.app, scheduled_num=-1) # type: ignore
# TODO: 这样启动的记忆实例的状态机无法绑定到 PreparationScreen 中 # TODO: 这样启动的记忆实例的状态机无法绑定到 PreparationScreen 中

View File

@@ -68,7 +68,7 @@ class FavoriteManagerScreen(Screen):
if self.favorites: if self.favorites:
list_view = self.query_one("#favorites-list") list_view = self.query_one("#favorites-list")
for fav in self.favorites: for fav in self.favorites:
list_view.append(self._create_favorite_item(fav)) # type: ignore list_view.append(self._create_favorite_item(fav)) # type: ignore
def _encode_favorite_key(self, repo_path: str, ident: str) -> str: def _encode_favorite_key(self, repo_path: str, ident: str) -> str:
"""编码仓库路径和标识符为安全的按钮 ID 部分""" """编码仓库路径和标识符为安全的按钮 ID 部分"""
@@ -115,12 +115,12 @@ class FavoriteManagerScreen(Screen):
def _get_repo_info(self, repo_path: str, fav: FavoriteItem) -> Optional[dict]: def _get_repo_info(self, repo_path: str, fav: FavoriteItem) -> Optional[dict]:
"""获取仓库信息(标题、原子内容预览)""" """获取仓库信息(标题、原子内容预览)"""
try: try:
data_repo = Path(config_var.get()['global']["paths"]["data"]) / "repo" data_repo = Path(config_var.get()["global"]["paths"]["data"]) / "repo"
repo_dir = data_repo / repo_path repo_dir = data_repo / repo_path
if not repo_dir.exists(): if not repo_dir.exists():
logger.warning("仓库目录不存在: %s", repo_dir) logger.warning("仓库目录不存在: %s", repo_dir)
return None return None
repo = Repo.create_from_repodir(repo_dir) repo = Repo.from_repodir(repo_dir)
# 获取原子内容预览 # 获取原子内容预览
content_preview = "" content_preview = ""
payload = repo.payload payload = repo.payload
@@ -201,4 +201,4 @@ class FavoriteManagerScreen(Screen):
def action_toggle_dark(self) -> None: def action_toggle_dark(self) -> None:
"""切换暗黑模式""" """切换暗黑模式"""
self.app.dark = not self.app.dark # type: ignore self.app.dark = not self.app.dark # type: ignore

View File

@@ -38,7 +38,7 @@ class MemScreen(Screen):
("0,1,2,3", "app.push_screen('about')", ""), ("0,1,2,3", "app.push_screen('about')", ""),
] ]
if config_var.get()['interface']['global']["quick_pass"]: if config_var.get()["interface"]["global"]["quick_pass"]:
BINDINGS.append(("k", "quick_pass", "正确应答")) BINDINGS.append(("k", "quick_pass", "正确应答"))
BINDINGS.append(("f", "quick_fail", "错误应答")) BINDINGS.append(("f", "quick_fail", "错误应答"))
rating = reactive(-1) rating = reactive(-1)
@@ -70,7 +70,6 @@ class MemScreen(Screen):
"""更新状态机""" """更新状态机"""
self.procession: Procession = self.phaser.current_procession() # type: ignore self.procession: Procession = self.phaser.current_procession() # type: ignore
self.atom: pt.Atom = self.procession.current_atom # type: ignore self.atom: pt.Atom = self.procession.current_atom # type: ignore
def on_mount(self): def on_mount(self):
self.fission = self.procession.get_fission() self.fission = self.procession.get_fission()
@@ -93,7 +92,7 @@ class MemScreen(Screen):
if self.repo is not None: if self.repo is not None:
fav_status = "已收藏" if self._is_current_atom_favorited() else "未收藏" fav_status = "已收藏" if self._is_current_atom_favorited() else "未收藏"
s += f"收藏: {fav_status}\n" s += f"收藏: {fav_status}\n"
'''if config_var.get().get("debug_topline", 0): """if config_var.get().get("debug_topline", 0):
try: try:
alia = self.fission.get_current_puzzle_inf()["alia"] # type: ignore alia = self.fission.get_current_puzzle_inf()["alia"] # type: ignore
s += f"谜题: {alia}\n" s += f"谜题: {alia}\n"
@@ -113,7 +112,7 @@ class MemScreen(Screen):
stat = self.fission.__repr__("simple", "") stat = self.fission.__repr__("simple", "")
s += f"{stat}\n" s += f"{stat}\n"
except Exception as e: except Exception as e:
s = str(e)''' s = str(e)"""
s += f"进度: {self.procession.process() + 1}/{self.procession.total_length()}" s += f"进度: {self.procession.process() + 1}/{self.procession.total_length()}"
return s return s
@@ -139,9 +138,9 @@ class MemScreen(Screen):
i.remove() i.remove()
from heurams.interface.widgets.finished import Finished from heurams.interface.widgets.finished import Finished
if config_var.get()['interface']['global']["persist_to_file"]: if config_var.get()["interface"]["global"]["persist_to_file"]:
self.save_func() self.save_func()
container.mount(Finished(is_saved=['interface']['global']["persist_to_file"])) container.mount(Finished(is_saved=["interface"]["global"]["persist_to_file"]))
def on_button_pressed(self, event): def on_button_pressed(self, event):
event.stop() event.stop()
@@ -156,7 +155,7 @@ class MemScreen(Screen):
from heurams.services.audio_service import play_by_path from heurams.services.audio_service import play_by_path
from heurams.services.hasher import get_md5 from heurams.services.hasher import get_md5
path = Path(config_var.get()['global']["paths"]["data"]) / "cache" / "voice" path = Path(config_var.get()["global"]["paths"]["data"]) / "cache" / "voice"
path = path / f"{get_md5(self.atom.registry['nucleon']["tts_text"])}.wav" path = path / f"{get_md5(self.atom.registry['nucleon']["tts_text"])}.wav"
if path.exists(): if path.exists():
play_by_path(path) play_by_path(path)
@@ -177,7 +176,6 @@ class MemScreen(Screen):
self.forward(new_rating) self.forward(new_rating)
self.rating = -1 self.rating = -1
def forward(self, rating): def forward(self, rating):
self.update_state() self.update_state()
allow_forward = 1 if rating >= 4 else 0 allow_forward = 1 if rating >= 4 else 0
@@ -226,7 +224,7 @@ class MemScreen(Screen):
return "" return ""
# self.repo.source 是 Path 对象,指向仓库目录 # self.repo.source 是 Path 对象,指向仓库目录
repo_full_path = self.repo.source repo_full_path = self.repo.source
data_repo_path = Path(config_var.get()['global']["paths"]["data"]) / "repo" data_repo_path = Path(config_var.get()["global"]["paths"]["data"]) / "repo"
try: try:
rel_path = repo_full_path.relative_to(data_repo_path) rel_path = repo_full_path.relative_to(data_repo_path)
return str(rel_path) return str(rel_path)

View File

@@ -53,7 +53,11 @@ class NavigatorScreen(ModalScreen):
) )
yield Static("按下回车以完成切换\n所有会话将被保存") yield Static("按下回车以完成切换\n所有会话将被保存")
yield Button( yield Button(
"关闭 (n)", id="close_button", variant="primary", classes="close-button", flat=True "关闭 (n)",
id="close_button",
variant="primary",
classes="close-button",
flat=True,
) )
def on_mount(self) -> None: def on_mount(self) -> None:

View File

@@ -13,16 +13,16 @@ import heurams.services.hasher as hasher
from heurams.context import * from heurams.context import *
# 兼容性缓存路径:优先使用 paths.cache否则使用 data/cache # 兼容性缓存路径:优先使用 paths.cache否则使用 data/cache
paths = config_var.get()['global']["paths"] paths = config_var.get()["global"]["paths"]
cache_dir = pathlib.Path(paths.get("cache", paths["data"] + "/cache")) / "voice" cache_dir = pathlib.Path(paths.get("cache", paths["data"] + "/cache")) / "voice"
def format_size(bytes_num: int) -> str: def format_size(bytes_num: int) -> str:
"""将字节数格式化为人类可读的字符串""" """将字节数格式化为人类可读的字符串"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']: for unit in ["B", "KB", "MB", "GB", "TB"]:
if bytes_num < 1024.0: if bytes_num < 1024.0:
return f"{bytes_num:.2f} {unit}" return f"{bytes_num:.2f} {unit}"
bytes_num /= 1024.0 # type: ignore bytes_num /= 1024.0 # type: ignore
return f"{bytes_num:.2f} PB" return f"{bytes_num:.2f} PB"
@@ -54,16 +54,24 @@ class PrecachingScreen(Screen):
self.cancel_flag = 0 self.cancel_flag = 0
self.desc = desc self.desc = desc
# 不再需要缓存配置,保留配置读取以兼容 # 不再需要缓存配置,保留配置读取以兼容
self.cache_stats = {"total_size": 0, "file_count": 0, "human_size": "0 B", "cached_units": 0, "total_units": 0, "cache_rate": 0} self.cache_stats = {
"total_size": 0,
"file_count": 0,
"human_size": "0 B",
"cached_units": 0,
"total_units": 0,
"cache_rate": 0,
}
self._update_cache_stats() self._update_cache_stats()
def _get_total_units(self) -> int: def _get_total_units(self) -> int:
"""获取所有仓库的总单元数""" """获取所有仓库的总单元数"""
from heurams.context import config_var from heurams.context import config_var
from heurams.kernel.repolib import Repo from heurams.kernel.repolib import Repo
repo_path = pathlib.Path(config_var.get()['global']["paths"]["data"]) / "repo"
repo_path = pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "repo"
repo_dirs = Repo.probe_valid_repos_in_dir(repo_path) repo_dirs = Repo.probe_valid_repos_in_dir(repo_path)
repos = map(Repo.create_from_repodir, repo_dirs) repos = map(Repo.from_repodir, repo_dirs)
total = 0 total = 0
for repo in repos: for repo in repos:
try: try:
@@ -86,7 +94,7 @@ class PrecachingScreen(Screen):
cached_units += 1 cached_units += 1
total_units = self._get_total_units() total_units = self._get_total_units()
cache_rate = (cached_units / total_units * 100) if total_units > 0 else 0 cache_rate = (cached_units / total_units * 100) if total_units > 0 else 0
self.cache_stats["total_size"] = total_size self.cache_stats["total_size"] = total_size
self.cache_stats["file_count"] = file_count self.cache_stats["file_count"] = file_count
self.cache_stats["human_size"] = format_size(total_size) self.cache_stats["human_size"] = format_size(total_size)
@@ -101,11 +109,15 @@ class PrecachingScreen(Screen):
with Container(): with Container():
yield Static( yield Static(
f"缓存率: {self.cache_stats.get('cache_rate', 0):.1f}% (已缓存 {self.cache_stats.get('cached_units', 0)} / {self.cache_stats.get('total_units', 0)} 个单元)", f"缓存率: {self.cache_stats.get('cache_rate', 0):.1f}% (已缓存 {self.cache_stats.get('cached_units', 0)} / {self.cache_stats.get('total_units', 0)} 个单元)",
classes="cache-usage-text" classes="cache-usage-text",
) )
if self.nucleons: if self.nucleons:
yield Static(f"目标单元归属: [b]{self.desc}[/b]", classes="target-info") yield Static(
yield Static(f"单元数量: {len(self.nucleons)}", classes="target-info") f"目标单元归属: [b]{self.desc}[/b]", classes="target-info"
)
yield Static(
f"单元数量: {len(self.nucleons)}", classes="target-info"
)
else: else:
yield Static("目标: 所有单元", classes="target-info") yield Static("目标: 所有单元", classes="target-info")
@@ -114,16 +126,26 @@ class PrecachingScreen(Screen):
yield ProgressBar(total=100, show_eta=False, id="progress_bar") yield ProgressBar(total=100, show_eta=False, id="progress_bar")
with Horizontal(classes="button-group"): with Horizontal(classes="button-group"):
if not self.is_precaching: if not self.is_precaching:
yield Button("开始预缓存", id="start_precache", variant="primary") yield Button(
"开始预缓存", id="start_precache", variant="primary"
)
else: else:
yield Button("取消预缓存", id="cancel_precache", variant="error") yield Button(
"取消预缓存", id="cancel_precache", variant="error"
)
yield Button("清空缓存", id="clear_cache", variant="warning") yield Button("清空缓存", id="clear_cache", variant="warning")
yield Button("返回", id="go_back", variant="default") yield Button("返回", id="go_back", variant="default")
with Container(classes="cache-info"): with Container(classes="cache-info"):
yield Static(f"缓存路径: {cache_dir}", classes="cache-path") yield Static(f"缓存路径: {cache_dir}", classes="cache-path")
yield Static(f"文件数: {self.cache_stats['file_count']}", classes="cache-count") yield Static(
yield Static(f"总大小: {self.cache_stats['human_size']}", classes="cache-size") f"文件数: {self.cache_stats['file_count']}", classes="cache-count"
yield Button("刷新", id="refresh_cache_stats", variant="default", flat=True) )
yield Static(
f"总大小: {self.cache_stats['human_size']}", classes="cache-size"
)
yield Button(
"刷新", id="refresh_cache_stats", variant="default", flat=True
)
yield Static("若您离开此界面, 未完成的缓存进程会自动停止.") yield Static("若您离开此界面, 未完成的缓存进程会自动停止.")
yield Static('缓存程序支持 "断点续传".') yield Static('缓存程序支持 "断点续传".')
@@ -230,9 +252,9 @@ class PrecachingScreen(Screen):
from heurams.context import config_var, rootdir, workdir from heurams.context import config_var, rootdir, workdir
from heurams.kernel.repolib import Repo from heurams.kernel.repolib import Repo
repo_path = pathlib.Path(config_var.get()['global']["paths"]["data"]) / "repo" repo_path = pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "repo"
repo_dirs = Repo.probe_valid_repos_in_dir(repo_path) repo_dirs = Repo.probe_valid_repos_in_dir(repo_path)
repos = map(Repo.create_from_repodir, repo_dirs) repos = map(Repo.from_repodir, repo_dirs)
# 计算总项目数 # 计算总项目数
self.total = 0 self.total = 0
@@ -241,7 +263,7 @@ class PrecachingScreen(Screen):
try: try:
for i in repo.ident_index: for i in repo.ident_index:
nucleon_list.append( nucleon_list.append(
pt.Nucleon.create_on_nucleonic_data( pt.Nucleon.from_data(
repo.nucleonic_data_lict.get_itemic_unit(i) repo.nucleonic_data_lict.get_itemic_unit(i)
) )
) )

View File

@@ -5,7 +5,16 @@ from textual.containers import ScrollableContainer
from textual.reactive import reactive from textual.reactive import reactive
from textual.screen import Screen from textual.screen import Screen
from textual.widget import Widget from textual.widget import Widget
from textual.widgets import Button, Footer, Header, Label, Markdown, Static, Rule, Sparkline from textual.widgets import (
Button,
Footer,
Header,
Label,
Markdown,
Static,
Rule,
Sparkline,
)
import heurams.kernel.particles as pt import heurams.kernel.particles as pt
import heurams.services.hasher as hasher import heurams.services.hasher as hasher
@@ -28,20 +37,19 @@ class PreparationScreen(Screen):
("0,1,2,3", "app.push_screen('about')", ""), ("0,1,2,3", "app.push_screen('about')", ""),
] ]
scheduled_num = reactive(config_var.get()['interface']['global']["scheduled_num"]) scheduled_num = reactive(config_var.get()["interface"]["global"]["scheduled_num"])
def __init__(self, repo: Repo, repostat: dict) -> None: def __init__(self, repo: Repo) -> None:
super().__init__(name=None, id=None, classes=None) super().__init__(name=None, id=None, classes=None)
self.repo = repo self.repo = repo
self.repostat = repostat
self.load_data() self.load_data()
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Header(show_clock=True) yield Header(show_clock=True)
with ScrollableContainer(id="vice_container"): with ScrollableContainer(id="vice_container"):
yield Label(f"准备就绪: [b]{self.repostat['title']}[/b]\n") yield Label(f"准备就绪: [b]{self.repo.manifest['title']}[/b]\n")
yield Label( yield Label(
f"仓库路径: {config_var.get()['global']['paths']['data']}/repo/[b]{self.repostat['dirname']}[/b]" f"[b]仓库路径: {self.repo.source}[/b]"
) )
yield Label(f"\n单元数量: {len(self.repo)}\n") yield Label(f"\n单元数量: {len(self.repo)}\n")
yield Label(f"最小记忆分组: {self.scheduled_num}\n", id="schnum_label") yield Label(f"最小记忆分组: {self.scheduled_num}\n", id="schnum_label")
@@ -62,12 +70,12 @@ class PreparationScreen(Screen):
yield Static() yield Static()
yield Sparkline(self.spark_line_arr, summary_function=max) yield Sparkline(self.spark_line_arr, summary_function=max)
yield Rule() yield Rule()
#yield Static(str(self.spark_line_arr)) # yield Static(str(self.spark_line_arr))
yield Static(f"单元状态预览:\n") yield Static(f"单元状态预览:\n")
for i in self.content.splitlines(): for i in self.content.splitlines():
yield Static(i, classes="full") yield Static(i, classes="full")
yield Footer() yield Footer()
# def watch_scheduled_num(self, old_scheduled_num, new_scheduled_num): # def watch_scheduled_num(self, old_scheduled_num, new_scheduled_num):
# logger.debug("响应", old_scheduled_num, "->", new_scheduled_num) # logger.debug("响应", old_scheduled_num, "->", new_scheduled_num)
# try: # try:
@@ -80,19 +88,21 @@ class PreparationScreen(Screen):
content = "" content = ""
spark_line_arr = [] spark_line_arr = []
for i in self.repo.ident_index: for i in self.repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data( n = pt.Nucleon.from_data(
nucleonic_data=self.repo.nucleonic_data_lict.get_itemic_unit(i) nucleonic_data=self.repo.nucleonic_data_lict.get_itemic_unit(i)
) )
e = pt.Electron.create_on_electonic_data(electronic_data=self.repo.electronic_data_lict.get_itemic_unit(i)) e = pt.Electron.from_data(
electronic_data=self.repo.electronic_data_lict.get_itemic_unit(i)
)
statstr = "" statstr = ""
if e.is_activated(): if e.is_activated():
statstr = '[#00ff00]A[/]' statstr = "[#00ff00]A[/]"
if e.is_due(): if e.is_due():
statstr = '[#ffff00]R[/]' statstr = "[#ffff00]R[/]"
#statstr += ('[dim]' + str(e.rept(real_rept=True)).zfill(2)+'[/]') # statstr += ('[dim]' + str(e.rept(real_rept=True)).zfill(2)+'[/]')
else: else:
statstr = '[#ff0000]U[/]' statstr = "[#ff0000]U[/]"
spark_line_arr.append(e.rept(real_rept=True)) spark_line_arr.append(e.rept(real_rept=True))
content += f" {statstr} {n['content'].replace('/', '')} \n" content += f" {statstr} {n['content'].replace('/', '')} \n"
self.content = content self.content = content
@@ -107,9 +117,7 @@ class PreparationScreen(Screen):
lst = list() lst = list()
for i in self.repo.ident_index: for i in self.repo.ident_index:
lst.append( lst.append(
pt.Nucleon.create_on_nucleonic_data( pt.Nucleon.from_data(self.repo.nucleonic_data_lict.get_itemic_unit(i))
self.repo.nucleonic_data_lict.get_itemic_unit(i)
)
) )
precache_screen = PrecachingScreen( precache_screen = PrecachingScreen(
nucleons=lst, desc=self.repo.manifest["title"] nucleons=lst, desc=self.repo.manifest["title"]
@@ -128,15 +136,16 @@ class PreparationScreen(Screen):
elif event.button.id == "precache_button": elif event.button.id == "precache_button":
self.action_precache() self.action_precache()
def launch(repo, app, scheduled_num): def launch(repo, app, scheduled_num):
if scheduled_num == -1: if scheduled_num == -1:
scheduled_num = config_var.get()['interface']['global']["scheduled_num"] scheduled_num = config_var.get()["interface"]["global"]["scheduled_num"]
atoms = list() atoms = list()
for i in repo.ident_index: for i in repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data( n = pt.Nucleon.from_data(
nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i) nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i)
) )
e = pt.Electron.create_on_electonic_data( e = pt.Electron.from_data(
electronic_data=repo.electronic_data_lict.get_itemic_unit(i) electronic_data=repo.electronic_data_lict.get_itemic_unit(i)
) )
a = pt.Atom(n, e, repo.orbitic_data) a = pt.Atom(n, e, repo.orbitic_data)

View File

@@ -8,7 +8,19 @@ import os
from textual.app import ComposeResult from textual.app import ComposeResult
from textual.containers import ScrollableContainer, Container, Horizontal, Vertical from textual.containers import ScrollableContainer, Container, Horizontal, Vertical
from textual.screen import Screen from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, ListItem, ListView, Static, Collapsible, Input, Switch, Select from textual.widgets import (
Button,
Footer,
Header,
Label,
ListItem,
ListView,
Static,
Collapsible,
Input,
Switch,
Select,
)
from textual.layouts import horizontal from textual.layouts import horizontal
import heurams.kernel.particles as pt import heurams.kernel.particles as pt
@@ -45,13 +57,15 @@ class SettingScreen(Screen):
"""组合界面组件""" """组合界面组件"""
yield Header(show_clock=True) yield Header(show_clock=True)
with ScrollableContainer(): with ScrollableContainer():
yield Label('[b]设置页面[/b]') yield Label("[b]设置页面[/b]")
for i in config_var.get(): for i in config_var.get():
if i.startswith('_'): if i.startswith("_"):
continue continue
a = self._get_subcfg(f'{i}') a = self._get_subcfg(f"{i}")
if a: if a:
yield Collapsible(*a, title=i + f'\n{config_var.get().get(f"_{i}_desc", "")}') yield Collapsible(
*a, title=i + f'\n{config_var.get().get(f"_{i}_desc", "")}'
)
yield Footer() yield Footer()
def _get_subcfg(self, parent_epath: str): def _get_subcfg(self, parent_epath: str):
@@ -60,61 +74,115 @@ class SettingScreen(Screen):
if parent.is_dir: if parent.is_dir:
lst = list() lst = list()
for i in parent: for i in parent:
if i.startswith('_'): if i.startswith("_"):
continue continue
a = self._get_subcfg(f"{parent_epath}.{i}") a = self._get_subcfg(f"{parent_epath}.{i}")
if a: if a:
lst.append(Collapsible(*a, title=i + f'\n{parent.get(f"_{i}_desc", "")}')) lst.append(
Collapsible(
*a, title=i + f'\n{parent.get(f"_{i}_desc", "")}'
)
)
return lst return lst
if isinstance(parent, dict) or (isinstance(parent, ConfigDict) and not parent.is_dir): if isinstance(parent, dict) or (
isinstance(parent, ConfigDict) and not parent.is_dir
):
lst = list() lst = list()
for i in parent: for i in parent:
if i.startswith('_'): if i.startswith("_"):
continue continue
if isinstance(parent[i], dict): if isinstance(parent[i], dict):
a = self._get_subcfg(f"{parent_epath}.{i}") a = self._get_subcfg(f"{parent_epath}.{i}")
if a: if a:
lst.append(Collapsible(*a, title=i + f'\n{parent.get(f"_{i}_desc", "")}')) lst.append(
elif f'_{i}_candidate' in parent: # 选择框模式 Collapsible(
if isinstance(parent[f'_{i}_candidate'], dict): *a, title=i + f'\n{parent.get(f"_{i}_desc", "")}'
lst.append(Horizontal( )
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), )
Select(((f"{j} ({k})", j) for j, k in parent[f'_{i}_candidate'].items()), prompt=f'{parent.get(f"{i}", "")}', id=domize(f"{parent_epath}.{i}")), elif f"_{i}_candidate" in parent: # 选择框模式
classes='container' if isinstance(parent[f"_{i}_candidate"], dict):
)) lst.append(
elif isinstance(parent[f'_{i}_candidate'], list): Horizontal(
lst.append(Horizontal( Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), Select(
Select(((j, j) for j in parent[f'_{i}_candidate']), prompt=f'{parent.get(f"{i}", "")}', id=domize(f"{parent_epath}.{i}")), (
classes='container' (f"{j} ({k})", j)
)) for j, k in parent[f"_{i}_candidate"].items()
),
prompt=f'{parent.get(f"{i}", "")}',
id=domize(f"{parent_epath}.{i}"),
),
classes="container",
)
)
elif isinstance(parent[f"_{i}_candidate"], list):
lst.append(
Horizontal(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Select(
((j, j) for j in parent[f"_{i}_candidate"]),
prompt=f'{parent.get(f"{i}", "")}',
id=domize(f"{parent_epath}.{i}"),
),
classes="container",
)
)
else: else:
if isinstance(parent[i], float): if isinstance(parent[i], float):
lst.append(Horizontal( lst.append(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), Horizontal(
Input(value=str(parent[i]), placeholder='要求一个浮点数', type='number', id=domize(f"{parent_epath}.{i}")), Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
classes='container')) Input(
value=str(parent[i]),
placeholder="要求一个浮点数",
type="number",
id=domize(f"{parent_epath}.{i}"),
),
classes="container",
)
)
elif isinstance(parent[i], str): elif isinstance(parent[i], str):
lst.append(Horizontal( lst.append(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), Horizontal(
Input(value=parent[i], placeholder='要求一个字符串', type='text', id=domize(f"{parent_epath}.{i}")), Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
classes='container')) Input(
value=parent[i],
placeholder="要求一个字符串",
type="text",
id=domize(f"{parent_epath}.{i}"),
),
classes="container",
)
)
elif isinstance(parent[i], bool): elif isinstance(parent[i], bool):
lst.append(Horizontal( lst.append(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), Horizontal(
Switch(value=parent[i], id=domize(f"{parent_epath}.{i}")), Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
classes='container')) Switch(
value=parent[i], id=domize(f"{parent_epath}.{i}")
),
classes="container",
)
)
elif isinstance(parent[i], int): elif isinstance(parent[i], int):
lst.append(Horizontal( lst.append(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'), Horizontal(
Input(value=str(parent[i]), placeholder='要求一个整数', type='integer', id=domize(f"{parent_epath}.{i}")), Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
classes='container')) Input(
value=str(parent[i]),
placeholder="要求一个整数",
type="integer",
id=domize(f"{parent_epath}.{i}"),
),
classes="container",
)
)
elif isinstance(parent[i], list): elif isinstance(parent[i], list):
pass pass
else: else:
lst.append(Label('未知类型')) lst.append(Label("未知类型"))
return lst return lst
return [Label('无子项')] return [Label("无子项")]
def on_mount(self) -> None: def on_mount(self) -> None:
"""挂载组件时初始化""" """挂载组件时初始化"""
@@ -133,14 +201,18 @@ class SettingScreen(Screen):
"""打开导航器""" """打开导航器"""
self.app.push_screen(NavigatorScreen()) self.app.push_screen(NavigatorScreen())
def on_input_changed(self, event: Input.Changed) -> None: def on_input_changed(self, event: Input.Changed) -> None:
widget_id = event.input.id widget_id = event.input.id
if not widget_id: if not widget_id:
return return
eepath = undomize(widget_id) eepath = undomize(widget_id)
value = event.value value = event.value
epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value)) epath(
config_var.get(),
eepath,
enable_modify=True,
new_value=type(epath(config_var.get(), eepath))(value),
)
def on_switch_changed(self, event: Switch.Changed) -> None: def on_switch_changed(self, event: Switch.Changed) -> None:
widget_id = event.switch.id widget_id = event.switch.id
@@ -148,7 +220,12 @@ class SettingScreen(Screen):
return return
eepath = undomize(widget_id) eepath = undomize(widget_id)
value = event.value value = event.value
epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value)) epath(
config_var.get(),
eepath,
enable_modify=True,
new_value=type(epath(config_var.get(), eepath))(value),
)
def on_select_changed(self, event: Select.Changed) -> None: def on_select_changed(self, event: Select.Changed) -> None:
widget_id = event.select.id widget_id = event.select.id
@@ -156,4 +233,9 @@ class SettingScreen(Screen):
return return
eepath = undomize(widget_id) eepath = undomize(widget_id)
value = event.value value = event.value
epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value)) epath(
config_var.get(),
eepath,
enable_modify=True,
new_value=type(epath(config_var.get(), eepath))(value),
)

View File

@@ -15,7 +15,7 @@ class BasePuzzleWidget(Widget):
id: str | None = None, id: str | None = None,
classes: str | None = None, classes: str | None = None,
disabled: bool = False, disabled: bool = False,
markup: bool = True markup: bool = True,
) -> None: ) -> None:
super().__init__( super().__init__(
*children, *children,
@@ -23,7 +23,7 @@ class BasePuzzleWidget(Widget):
id=id, id=id,
classes=classes, classes=classes,
disabled=disabled, disabled=disabled,
markup=markup markup=markup,
) )
self.atom = atom self.atom = atom

View File

@@ -83,7 +83,7 @@ class ClozePuzzle(BasePuzzleWidget):
if lst: if lst:
lastone = lst[-1] lastone = lst[-1]
for i in lst[:-1]: for i in lst[:-1]:
s += (i + ' ') s += i + " "
s += f" `{lastone}`" s += f" `{lastone}`"
return s return s

View File

@@ -55,8 +55,8 @@ class MCQPuzzle(BasePuzzleWidget):
def _load(self): def _load(self):
cfg = self.atom.registry["nucleon"]["puzzles"][self.alia] cfg = self.atom.registry["nucleon"]["puzzles"][self.alia]
if cfg['mapping'] == {}: if cfg["mapping"] == {}:
self.screen.rating = 5 # type: ignore self.screen.rating = 5 # type: ignore
self.puzzle = pz.MCQPuzzle( self.puzzle = pz.MCQPuzzle(
cfg["mapping"], cfg["jammer"], int(cfg["max_riddles_num"]), cfg["prefix"] cfg["mapping"], cfg["jammer"], int(cfg["max_riddles_num"]), cfg["prefix"]
) )

View File

@@ -11,7 +11,7 @@ class Placeholder(Widget):
id: str | None = None, id: str | None = None,
classes: str | None = None, classes: str | None = None,
disabled: bool = False, disabled: bool = False,
markup: bool = True markup: bool = True,
) -> None: ) -> None:
super().__init__( super().__init__(
*children, *children,
@@ -19,7 +19,7 @@ class Placeholder(Widget):
id=id, id=id,
classes=classes, classes=classes,
disabled=disabled, disabled=disabled,
markup=markup markup=markup,
) )
def compose(self): def compose(self):

View File

@@ -67,7 +67,7 @@ class Recognition(BasePuzzleWidget):
f";{delim}": ";", f";{delim}": ";",
f":{delim}": ":", f":{delim}": ":",
} }
primary = cfg["primary"] primary = cfg["primary"]
with Center(): with Center():
@@ -88,7 +88,7 @@ class Recognition(BasePuzzleWidget):
for item in cfg["secondary"]: for item in cfg["secondary"]:
if isinstance(item, list): if isinstance(item, list):
for j in item: for j in item:
yield Markdown(f"### 笔记: {j}") #TODO ANNOTATION yield Markdown(f"### 笔记: {j}") # TODO ANNOTATION
continue continue
if isinstance(item, Dict): if isinstance(item, Dict):
total = "" total = ""

View File

@@ -8,7 +8,8 @@ logger = get_logger(__name__)
class BaseAlgorithm: class BaseAlgorithm:
algo_name = "BaseAlgorithm" algo_name = "BaseAlgorithm"
desc = '算法基类' desc = "算法基类"
class AlgodataDict(TypedDict): class AlgodataDict(TypedDict):
real_rept: int real_rept: int
rept: int rept: int

View File

@@ -10,7 +10,8 @@ logger = get_logger(__name__)
class NSP0Algorithm(BaseAlgorithm): class NSP0Algorithm(BaseAlgorithm):
algo_name = "NSP-0" algo_name = "NSP-0"
desc = '快速筛选用特殊调度器' desc = "快速筛选用特殊调度器"
class AlgodataDict(TypedDict): class AlgodataDict(TypedDict):
real_rept: int real_rept: int
rept: int rept: int
@@ -23,7 +24,7 @@ class NSP0Algorithm(BaseAlgorithm):
defaults = { defaults = {
"real_rept": 0, "real_rept": 0,
'important': 0, "important": 0,
"rept": 0, "rept": 0,
"interval": 0, "interval": 0,
"last_date": 0, "last_date": 0,
@@ -52,8 +53,10 @@ class NSP0Algorithm(BaseAlgorithm):
if feedback == -1: if feedback == -1:
logger.debug("feedback 为 -1, 跳过更新") logger.debug("feedback 为 -1, 跳过更新")
return return
algodata[cls.algo_name]["interval"] = (1 if feedback <= 3 else float('inf')) algodata[cls.algo_name]["interval"] = 1 if feedback <= 3 else float("inf")
algodata[cls.algo_name]["important"] = (1 if feedback <= 3 else algodata[cls.algo_name]["important"]) algodata[cls.algo_name]["important"] = (
1 if feedback <= 3 else algodata[cls.algo_name]["important"]
)
algodata[cls.algo_name]["last_date"] = timer.get_daystamp() algodata[cls.algo_name]["last_date"] = timer.get_daystamp()
algodata[cls.algo_name]["next_date"] = ( algodata[cls.algo_name]["next_date"] = (
timer.get_daystamp() + algodata[cls.algo_name]["interval"] timer.get_daystamp() + algodata[cls.algo_name]["interval"]

View File

@@ -27,7 +27,7 @@ from heurams.kernel.algorithms.sm15m_calc import (
# 全局状态文件路径 # 全局状态文件路径
_GLOBAL_STATE_FILE = os.path.expanduser( _GLOBAL_STATE_FILE = os.path.expanduser(
pathlib.Path(config_var.get()['global']["paths"]["data"]) pathlib.Path(config_var.get()["global"]["paths"]["data"])
/ "global" / "global"
/ "sm15m_global_state.json" / "sm15m_global_state.json"
) )

View File

@@ -10,7 +10,8 @@ logger = get_logger(__name__)
class SM2Algorithm(BaseAlgorithm): class SM2Algorithm(BaseAlgorithm):
algo_name = "SM-2" algo_name = "SM-2"
desc = '经典间隔重复算法' desc = "经典间隔重复算法"
class AlgodataDict(TypedDict): class AlgodataDict(TypedDict):
efactor: float efactor: float
real_rept: int real_rept: int

View File

@@ -32,7 +32,7 @@ class Atom:
default_runtime = { default_runtime = {
"locked": False, "locked": False,
"min_rate": float('inf'), "min_rate": float("inf"),
"new_activation": False, "new_activation": False,
} }

View File

@@ -24,6 +24,7 @@ class Electron:
algo_name = "SM-2" algo_name = "SM-2"
self.algodata = algodata self.algodata = algodata
self.ident = ident self.ident = ident
self.algoname = algo_name
self.algo: algolib.BaseAlgorithm = algorithms[algo_name] self.algo: algolib.BaseAlgorithm = algorithms[algo_name]
if not self.algo.check_integrity(self.algodata): if not self.algo.check_integrity(self.algodata):
@@ -53,10 +54,10 @@ class Electron:
result = self.algo.is_due(self.algodata) result = self.algo.is_due(self.algodata)
return result and self.is_activated() return result and self.is_activated()
def rept(self, real_rept = False): def rept(self, real_rept=False):
if real_rept: if real_rept:
return self.algodata[self.algo.algo_name]['real_rept'] return self.algodata[self.algo.algo_name]["real_rept"]
return self.algodata[self.algo.algo_name]['rept'] return self.algodata[self.algo.algo_name]["rept"]
def is_activated(self): def is_activated(self):
result = self.algodata[self.algo.algo_name]["is_activated"] result = self.algodata[self.algo.algo_name]["is_activated"]
@@ -112,7 +113,7 @@ class Electron:
return len(self.algodata[self.algo.algo_name]) return len(self.algodata[self.algo.algo_name])
@staticmethod @staticmethod
def create_on_electonic_data(electronic_data: tuple, algo_name: str = ""): def from_data(electronic_data: tuple, algo_name: str = ""):
_data = electronic_data _data = electronic_data
ident = _data[0] ident = _data[0]
algodata = _data[1] algodata = _data[1]

View File

@@ -15,26 +15,26 @@ class Nucleon:
self.ident = ident self.ident = ident
try: try:
data_safe = deepcopy((payload | common)) data_safe = deepcopy((payload | common))
data_puz = deepcopy(data_safe['puzzles']) data_puz = deepcopy(data_safe["puzzles"])
data_safe['puzzles'] = {} data_safe["puzzles"] = {}
env = { env = {
"payload": data_safe, "payload": data_safe,
"default": config_var.get()['interface']["puzzles"], "default": config_var.get()["interface"]["puzzles"],
"nucleon": data_safe, "nucleon": data_safe,
} }
self.evalizer = Evalizer(environment=env) self.evalizer = Evalizer(environment=env)
data_safe = self.evalizer(deepcopy(data_safe)) data_safe = self.evalizer(deepcopy(data_safe))
env = { env = {
"payload": data_safe, "payload": data_safe,
"default": config_var.get()['interface']["puzzles"], "default": config_var.get()["interface"]["puzzles"],
"nucleon": data_safe, "nucleon": data_safe,
} }
self.evalizer = Evalizer(environment=env) self.evalizer = Evalizer(environment=env)
data_puz = self.evalizer(deepcopy(data_puz)) data_puz = self.evalizer(deepcopy(data_puz))
data_safe['puzzles'] = data_puz # type: ignore data_safe["puzzles"] = data_puz # type: ignore
self.data: dict = data_safe # type: ignore self.data: dict = data_safe # type: ignore
except Exception: except Exception:
self.data = (payload | common) self.data = payload | common
def __getitem__(self, key): def __getitem__(self, key):
if isinstance(key, str): if isinstance(key, str):
@@ -71,7 +71,7 @@ class Nucleon:
return s return s
@staticmethod @staticmethod
def create_on_nucleonic_data(nucleonic_data: tuple): def from_data(nucleonic_data: tuple):
_data = nucleonic_data _data = nucleonic_data
payload = _data[1][0] payload = _data[1][0]
common = _data[1][1] common = _data[1][1]

View File

@@ -75,7 +75,7 @@ class Fission(Machine):
self.current_puzzle_inf = self.puzzles_inf[0] self.current_puzzle_inf = self.puzzles_inf[0]
for i in range(len(self.puzzles_inf)): for i in range(len(self.puzzles_inf)):
self.min_ratings.append(float('inf')) self.min_ratings.append(float("inf"))
Machine.__init__( Machine.__init__(
self, self,

View File

@@ -13,10 +13,14 @@ from heurams.kernel.auxiliary.lict import Lict
class RepoManifest(TypedDict): class RepoManifest(TypedDict):
title: str title: str
author: str author: str
package: str
desc: str desc: str
class Repo: class Repo:
"""只维护仓库本身
上层 API 请访问此对象下粒子对象列表"""
file_mapping = { file_mapping = {
"schedule": "schedule.toml", "schedule": "schedule.toml",
"payload": "payload.toml", "payload": "payload.toml",
@@ -58,14 +62,15 @@ class Repo:
"algodata": self.algodata, "algodata": self.algodata,
"source": self.source, "source": self.source,
} }
self.generate_particles_data() self._generate_particles_data()
def generate_particles_data(self):
def _generate_particles_data(self):
"""生成上层的粒子对象组和 API 交互, 会在 init 后自动调用"""
self.nucleonic_data_lict = Lict( self.nucleonic_data_lict = Lict(
initlist=list(map(self._nucleonic_proc, self.payload)) initlist=list(map(self._nucleonic_proc, self.payload))
) )
self.orbitic_data = self.schedule self.orbitic_data = self.schedule
self.data_length = len(self.nucleonic_data_lict)
self.ident_index = self.nucleonic_data_lict.keys() self.ident_index = self.nucleonic_data_lict.keys()
for i in self.ident_index: for i in self.ident_index:
self.algodata.append_new((i, {})) self.algodata.append_new((i, {}))
@@ -76,13 +81,6 @@ class Repo:
common = self.typedef["common"] common = self.typedef["common"]
return (ident, (unit[1], common)) return (ident, (unit[1], common))
@staticmethod
def _merge(value):
def inner(x):
return (x, value)
return inner
def __len__(self): def __len__(self):
return len(self.payload) return len(self.payload)
@@ -95,6 +93,7 @@ class Repo:
def persist_to_repodir( def persist_to_repodir(
self, save_list: list | None = None, source: Path | None = None self, save_list: list | None = None, source: Path | None = None
): ):
"""保存单元集数据到目录"""
if save_list == None: if save_list == None:
save_list = self.default_save_list save_list = self.default_save_list
if self.source != None and source == None: if self.source != None and source == None:
@@ -116,11 +115,13 @@ class Repo:
else: else:
raise ValueError(f"不支持的文件类型: {filename}") raise ValueError(f"不支持的文件类型: {filename}")
def export_to_single_dict(self): def export_to_dict(self):
"""导出至单个字典"""
return self.database return self.database
@classmethod @classmethod
def create_new_repo(cls, source=None): def create_new_repo(cls, source=None):
"""创建新的空单元集"""
default_database = { default_database = {
"schedule": {}, "schedule": {},
"payload": Lict([]), "payload": Lict([]),
@@ -132,7 +133,8 @@ class Repo:
return Repo(**default_database) return Repo(**default_database)
@classmethod @classmethod
def create_from_repodir(cls, source: Path): def from_repodir(cls, source: Path):
"""从目录创建单元集"""
database = {} database = {}
for keyname, filename in cls.file_mapping.items(): for keyname, filename in cls.file_mapping.items():
with open(source / filename, "r") as f: with open(source / filename, "r") as f:
@@ -153,21 +155,24 @@ class Repo:
return Repo(**database) return Repo(**database)
@classmethod @classmethod
def create_from_single_dict(cls, dictdata, source: Path | None = None): def from_dict(cls, dictdata, source: Path | None = None):
"""从单一字典创建单元集"""
database = dictdata database = dictdata
database["source"] = source database["source"] = source
return Repo(**database) return Repo(**database)
@classmethod @classmethod
def check_repodir(cls, source: Path): def check_repodir(cls, source: Path):
"""检测单元集目录合法性"""
try: try:
cls.create_from_repodir(source) cls.from_repodir(source)
return 1 return True
except: except:
return 0 return False
@classmethod @classmethod
def probe_valid_repos_in_dir(cls, folder: Path): def probe_valid_repos_in_dir(cls, folder: Path):
"""返回一个合法的子目录 Path() 列表"""
lst = list() lst = list()
for i in folder.iterdir(): for i in folder.iterdir():
if i.is_dir(): if i.is_dir():

View File

@@ -1,4 +1,5 @@
"""音频服务""" """音频服务"""
from typing import Callable from typing import Callable
from heurams.context import config_var from heurams.context import config_var
@@ -7,7 +8,10 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
play_by_path: Callable = prov[config_var.get()["services"]["audio"]['provider']].play_by_path play_by_path: Callable = prov[
config_var.get()["services"]["audio"]["provider"]
].play_by_path
logger.debug( logger.debug(
"音频服务初始化完成, 使用 Provider: %s", config_var.get()["services"]["audio"]['provider'] "音频服务初始化完成, 使用 Provider: %s",
config_var.get()["services"]["audio"]["provider"],
) )

View File

@@ -11,26 +11,27 @@ from heurams.services.exceptions import WTFException
# 我们的流程是: 找到文件名: 返回文件名里头的数据; 找不到: 继续查索引; 所以 self.data 除了存本级各种索引球用没得 # 我们的流程是: 找到文件名: 返回文件名里头的数据; 找不到: 继续查索引; 所以 self.data 除了存本级各种索引球用没得
logger = get_logger(__name__) logger = get_logger(__name__)
class ConfigDict(UserDict): # 舒服了
_instances = {} # 必须使用单例模式, 不然有严重的多实例导致的配置无法持久化问题 class ConfigDict(UserDict): # 舒服了
_instances = {} # 必须使用单例模式, 不然有严重的多实例导致的配置无法持久化问题
def __new__(cls, config_path: pathlib.Path, dict=None): def __new__(cls, config_path: pathlib.Path, dict=None):
if dict: if dict:
raise WTFException("不要放默认值...") raise WTFException("不要放默认值...")
# 规范化路径, 免得单例存在"别名" # 规范化路径, 免得单例存在"别名"
path_key = config_path.resolve() path_key = config_path.resolve()
if path_key in cls._instances: if path_key in cls._instances:
return cls._instances[path_key] return cls._instances[path_key]
instance = super().__new__(cls) instance = super().__new__(cls)
cls._instances[path_key] = instance cls._instances[path_key] = instance
return instance return instance
def __init__(self, config_path: pathlib.Path, dict = None): # 需要自己把自己提起来 def __init__(self, config_path: pathlib.Path, dict=None): # 需要自己把自己提起来
# 避免重复初始化 # 避免重复初始化
if hasattr(self, '_initialized'): if hasattr(self, "_initialized"):
return return
self._initialized = True self._initialized = True
if dict: if dict:
@@ -42,13 +43,13 @@ class ConfigDict(UserDict): # 舒服了
if self.is_dir: if self.is_dir:
self.update_index() self.update_index()
else: else:
with open(self.path, 'r+') as f: #TODO: 给这个做缓存 with open(self.path, "r+") as f: # TODO: 给这个做缓存
try: try:
self.data = toml.load(f) self.data = toml.load(f)
except: except:
self.data = {} self.data = {}
self.persist = lambda: False # 不修改错误的配置文件 self.persist = lambda: False # 不修改错误的配置文件
def __getitem__(self, key): def __getitem__(self, key):
# 我们实现了先进的懒狗加载 # 我们实现了先进的懒狗加载
value = super().__getitem__(key) value = super().__getitem__(key)
@@ -60,7 +61,7 @@ class ConfigDict(UserDict): # 舒服了
return super().__contains__(key) return super().__contains__(key)
def __setitem__(self, key, value): def __setitem__(self, key, value):
origvalue = super().__getitem__(key) # 所以你不该访问不存在的对象 origvalue = super().__getitem__(key) # 所以你不该访问不存在的对象
if isinstance(origvalue, ConfigDict): if isinstance(origvalue, ConfigDict):
if origvalue.path.is_dir(): if origvalue.path.is_dir():
raise WTFException("你怎么能变更目录配置的内容呢?!") raise WTFException("你怎么能变更目录配置的内容呢?!")
@@ -70,20 +71,22 @@ class ConfigDict(UserDict): # 舒服了
origvalue.data = value origvalue.data = value
super().__setitem__(key, value) super().__setitem__(key, value)
def update_index(self): # 如果有人没事干在config里面创建指向config的符号链接 这玩意会崩溃 但是不要修复: 需要这个符号链接特性 def update_index(
self,
): # 如果有人没事干在config里面创建指向config的符号链接 这玩意会崩溃 但是不要修复: 需要这个符号链接特性
for i in self.path.iterdir(): for i in self.path.iterdir():
if i.name.startswith('_'): if i.name.startswith("_"):
if i.name == '_.toml' and not i.is_dir(): if i.name == "_.toml" and not i.is_dir():
with open(self.path/'_.toml', 'r+') as f: with open(self.path / "_.toml", "r+") as f:
self.data.update(dict(toml.load(f))) self.data.update(dict(toml.load(f)))
continue continue
if i.is_dir(): if i.is_dir():
self.data[i.name] = i self.data[i.name] = i
else: else:
if i.suffix == '.toml': if i.suffix == ".toml":
self.data[i.stem] = i self.data[i.stem] = i
else: else:
logger.debug(f"配置目录中有无效的文件 {i.stem}") # what's up bro logger.debug(f"配置目录中有无效的文件 {i.stem}") # what's up bro
def persist(self): def persist(self):
if self.is_dir: if self.is_dir:
@@ -94,5 +97,5 @@ class ConfigDict(UserDict): # 舒服了
logger.debug("完成配置持久化") logger.debug("完成配置持久化")
return return
with open(self.path, 'w+') as f: with open(self.path, "w+") as f:
toml.dump(self.data, f) toml.dump(self.data, f)

View File

@@ -3,27 +3,41 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
def epath(dct, path: str = '', default=None, parents=False, enable_modify=False, new_value=None):
def epath(
dct,
path: str = "",
default=None,
parents=False,
enable_modify=False,
new_value=None,
):
if not path: if not path:
return dct return dct
path = path.rstrip('.') path = path.rstrip(".")
path = path.lstrip('.') path = path.lstrip(".")
target = dct target = dct
keys = path.split('.') keys = path.split(".")
logger.debug(f"处理 EPATH {path}, {new_value}") logger.debug(f"处理 EPATH {path}, {new_value}")
for idx, i in enumerate(keys): for idx, i in enumerate(keys):
is_last = (idx == len(keys) - 1) is_last = idx == len(keys) - 1
# 处理字典键 # 处理字典键
logger.debug(f'处理 {i}, {(isinstance(target, dict) or isinstance(target, ConfigDict))} {i in target}') logger.debug(
f"处理 {i}, {(isinstance(target, dict) or isinstance(target, ConfigDict))} {i in target}"
)
if is_last and enable_modify: if is_last and enable_modify:
# 最后一次循环执行修改 # 最后一次循环执行修改
if (isinstance(target, dict) or isinstance(target, ConfigDict)): if isinstance(target, dict) or isinstance(target, ConfigDict):
target[i] = new_value target[i] = new_value
return new_value return new_value
elif i.startswith('[') and i.endswith(']') and isinstance(target, (list, tuple)): elif (
i.startswith("[")
and i.endswith("]")
and isinstance(target, (list, tuple))
):
idx_num = int(i[1:-1]) idx_num = int(i[1:-1])
if 0 <= idx_num < len(target): if 0 <= idx_num < len(target):
target[idx_num] = new_value target[idx_num] = new_value
@@ -38,9 +52,15 @@ def epath(dct, path: str = '', default=None, parents=False, enable_modify=False,
else: else:
return default return default
else: else:
if (isinstance(target, dict) or isinstance(target, ConfigDict)) and i in target: if (
isinstance(target, dict) or isinstance(target, ConfigDict)
) and i in target:
target = target[i] target = target[i]
elif i.startswith('[') and i.endswith(']') and isinstance(target, (list, tuple)): elif (
i.startswith("[")
and i.endswith("]")
and isinstance(target, (list, tuple))
):
idx_num = int(i[1:-1]) idx_num = int(i[1:-1])
if 0 <= idx_num < len(target): if 0 <= idx_num < len(target):
target = target[idx_num] target = target[idx_num]
@@ -56,5 +76,5 @@ def epath(dct, path: str = '', default=None, parents=False, enable_modify=False,
target = target[i] target = target[i]
else: else:
return default return default
return target return target

View File

@@ -1,4 +1,5 @@
from heurams.services.logger import get_logger from heurams.services.logger import get_logger
class WTFException(Exception): class WTFException(Exception):
pass pass

View File

@@ -63,7 +63,7 @@ class FavoriteManager:
def _get_file_path(self) -> Path: def _get_file_path(self) -> Path:
"""获取收藏文件路径""" """获取收藏文件路径"""
config_path = Path(config_var.get()['global']["paths"]["data"]) config_path = Path(config_var.get()["global"]["paths"]["data"])
fav_path = config_path / "global" / "favorites.json" fav_path = config_path / "global" / "favorites.json"
fav_path.parent.mkdir(parents=True, exist_ok=True) fav_path.parent.mkdir(parents=True, exist_ok=True)
return fav_path return fav_path

View File

@@ -3,8 +3,10 @@ def truncate(text):
return text return text
return text[:3] + ">" return text[:3] + ">"
def domize(text): def domize(text):
return text.replace('.', '--DOT--') return text.replace(".", "--DOT--")
def undomize(text): def undomize(text):
return text.replace('--DOT--', '.') return text.replace("--DOT--", ".")

View File

@@ -9,12 +9,15 @@ logger = get_logger(__name__)
def get_daystamp() -> int: def get_daystamp() -> int:
"""获取当前日戳(以天为单位的整数时间戳)""" """获取当前日戳(以天为单位的整数时间戳)"""
time_override = config_var.get()['services']["timer"]["daystamp_override"] time_override = config_var.get()["services"]["timer"]["daystamp_override"]
if time_override != -1: if time_override != -1:
logger.debug("使用覆盖的日戳: %d", time_override) logger.debug("使用覆盖的日戳: %d", time_override)
return int(time_override) return int(time_override)
result = int((time.time() + config_var.get()['services']["timer"]["timezone_offset"]) // (24 * 3600)) result = int(
(time.time() + config_var.get()["services"]["timer"]["timezone_offset"])
// (24 * 3600)
)
logger.debug("计算日戳: %d", result) logger.debug("计算日戳: %d", result)
return result return result
@@ -22,7 +25,7 @@ def get_daystamp() -> int:
def get_timestamp() -> float: def get_timestamp() -> float:
"""获取 UNIX 时间戳""" """获取 UNIX 时间戳"""
# 搞这个类的原因是要支持可复现操作 # 搞这个类的原因是要支持可复现操作
time_override = config_var.get()['services']["timer"]["timestamp_override"] time_override = config_var.get()["services"]["timer"]["timestamp_override"]
if time_override != -1: if time_override != -1:
logger.debug("使用覆盖的时间戳: %f", time_override) logger.debug("使用覆盖的时间戳: %f", time_override)
return float(time_override) return float(time_override)

View File

@@ -7,7 +7,8 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
convertor: Callable = prov[config_var.get()['services']["tts"]["provider"]].convert convertor: Callable = prov[config_var.get()["services"]["tts"]["provider"]].convert
logger.debug( logger.debug(
"TTS 服务初始化完成, 使用 provider: %s", config_var.get()['services']["tts"]["provider"] "TTS 服务初始化完成, 使用 provider: %s",
config_var.get()["services"]["tts"]["provider"],
) )

View File

@@ -23,7 +23,7 @@ ident, content, meaning, ...
, "Woof", "狗发出的声音" , "Woof", "狗发出的声音"
``` ```
转换后的 TOML: 转换后的 TOML:
```toml ```toml
[Fox] [Fox]
content = "Fox" content = "Fox"
@@ -65,6 +65,7 @@ meaning = "狗发出的声音"
- 如果 CSV 包含更多列,它们也会以相同方式转换为键值对 - 如果 CSV 包含更多列,它们也会以相同方式转换为键值对
- 支持 `-r` 参数指定随机种子来打乱 section 顺序 - 支持 `-r` 参数指定随机种子来打乱 section 顺序
""" """
import csv import csv
import sys import sys
import os import os
@@ -72,10 +73,11 @@ import random
import argparse import argparse
from pathlib import Path from pathlib import Path
def csv_to_toml(csv_path, toml_path=None, random_seed=None): def csv_to_toml(csv_path, toml_path=None, random_seed=None):
""" """
将CSV文件转换为TOML格式 将CSV文件转换为TOML格式
Args: Args:
csv_path (str): 输入CSV文件路径 csv_path (str): 输入CSV文件路径
toml_path (str): 输出TOML文件路径默认为相同目录下同名文件 toml_path (str): 输出TOML文件路径默认为相同目录下同名文件
@@ -86,92 +88,97 @@ def csv_to_toml(csv_path, toml_path=None, random_seed=None):
if not csv_file.exists(): if not csv_file.exists():
print(f"错误: CSV文件不存在 - {csv_path}") print(f"错误: CSV文件不存在 - {csv_path}")
sys.exit(1) sys.exit(1)
# 确定输出TOML文件路径 # 确定输出TOML文件路径
if toml_path is None: if toml_path is None:
toml_path = csv_file.with_suffix('.toml') toml_path = csv_file.with_suffix(".toml")
else: else:
toml_path = Path(toml_path) toml_path = Path(toml_path)
# 读取CSV文件 # 读取CSV文件
try: try:
with open(csv_file, 'r', encoding='utf-8') as f: with open(csv_file, "r", encoding="utf-8") as f:
reader = csv.DictReader(f) reader = csv.DictReader(f)
rows = list(reader) rows = list(reader)
except Exception as e: except Exception as e:
print(f"错误: 无法读取CSV文件 - {e}") print(f"错误: 无法读取CSV文件 - {e}")
sys.exit(1) sys.exit(1)
# 检查CSV文件是否有数据 # 检查CSV文件是否有数据
if not rows: if not rows:
print("错误: CSV文件为空或格式不正确") print("错误: CSV文件为空或格式不正确")
sys.exit(1) sys.exit(1)
# 如果指定了随机种子,设置随机种子并打乱行顺序 # 如果指定了随机种子,设置随机种子并打乱行顺序
if random_seed is not None: if random_seed is not None:
random.seed(random_seed) random.seed(random_seed)
random.shuffle(rows) random.shuffle(rows)
print(f"提示: 使用随机种子 {random_seed} 打乱了 section 顺序") print(f"提示: 使用随机种子 {random_seed} 打乱了 section 顺序")
# 生成TOML内容 # 生成TOML内容
toml_content = [] toml_content = []
idx_counter = 1 idx_counter = 1
for row in rows: for row in rows:
# 处理ident列为空时生成自动标识符 # 处理ident列为空时生成自动标识符
ident = row.get('ident', '').strip() ident = row.get("ident", "").strip()
if not ident: if not ident:
ident = f"idx_{idx_counter}" ident = f"idx_{idx_counter}"
idx_counter += 1 idx_counter += 1
# 添加section标题 # 添加section标题
toml_content.append(f"[{ident}]") toml_content.append(f"[{ident}]")
# 添加所有其他列作为键值对排除ident列 # 添加所有其他列作为键值对排除ident列
for key, value in row.items(): for key, value in row.items():
if key == 'ident': if key == "ident":
continue continue
# 确保值存在且不为空 # 确保值存在且不为空
if value is not None and str(value).strip() != '': if value is not None and str(value).strip() != "":
# 转义特殊字符并添加引号 # 转义特殊字符并添加引号
escaped_value = str(value).replace('"', '\\"') escaped_value = str(value).replace('"', '\\"')
toml_content.append(f'"{key}" = "{escaped_value}"') toml_content.append(f'"{key}" = "{escaped_value}"')
# section之间添加空行 # section之间添加空行
toml_content.append("") toml_content.append("")
# 写入TOML文件 # 写入TOML文件
try: try:
with open(toml_path, 'w', encoding='utf-8') as f: with open(toml_path, "w", encoding="utf-8") as f:
f.write('\n'.join(toml_content).strip()) f.write("\n".join(toml_content).strip())
print(f"成功: 已生成TOML文件 - {toml_path}") print(f"成功: 已生成TOML文件 - {toml_path}")
except Exception as e: except Exception as e:
print(f"错误: 无法写入TOML文件 - {e}") print(f"错误: 无法写入TOML文件 - {e}")
sys.exit(1) sys.exit(1)
def main(): def main():
"""主函数""" """主函数"""
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='将CSV文件转换为TOML格式支持随机打乱section顺序', description="将CSV文件转换为TOML格式支持随机打乱section顺序",
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=''' epilog="""
示例: 示例:
%(prog)s input.csv output.toml %(prog)s input.csv output.toml
%(prog)s input.csv # 自动生成input.toml %(prog)s input.csv # 自动生成input.toml
%(prog)s input.csv -r 42 # 使用种子42打乱顺序 %(prog)s input.csv -r 42 # 使用种子42打乱顺序
%(prog)s input.csv -r 123 output.toml # 指定种子和输出路径 %(prog)s input.csv -r 123 output.toml # 指定种子和输出路径
''' """,
) )
parser.add_argument('csv_path', help='输入的CSV文件路径') parser.add_argument("csv_path", help="输入的CSV文件路径")
parser.add_argument('toml_path', nargs='?', help='输出的TOML文件路径默认为CSV同名文件') parser.add_argument(
parser.add_argument('-r', '--random-seed', type=int, "toml_path", nargs="?", help="输出的TOML文件路径默认为CSV同名文件"
help='随机种子用于打乱TOML section的顺序') )
parser.add_argument(
"-r", "--random-seed", type=int, help="随机种子用于打乱TOML section的顺序"
)
args = parser.parse_args() args = parser.parse_args()
csv_to_toml(args.csv_path, args.toml_path, args.random_seed) csv_to_toml(args.csv_path, args.toml_path, args.random_seed)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -3,6 +3,7 @@ import pickle
import readline import readline
import sys import sys
class DebugClient: class DebugClient:
def __init__(self, port=5555): def __init__(self, port=5555):
self.context = zmq.Context() self.context = zmq.Context()
@@ -12,7 +13,7 @@ class DebugClient:
print("输入Python代码并按回车执行, 输入 'exit' 退出") print("输入Python代码并按回车执行, 输入 'exit' 退出")
print("可用变量: app, logger") print("可用变量: app, logger")
print("-" * 50) print("-" * 50)
def execute(self, code): def execute(self, code):
"""执行代码并返回结果""" """执行代码并返回结果"""
try: try:
@@ -21,7 +22,7 @@ class DebugClient:
return response return response
except Exception as e: except Exception as e:
return f"连接错误: {e}" return f"连接错误: {e}"
def repl(self): def repl(self):
"""交互式REPL循环""" """交互式REPL循环"""
self.execute('print("test")') self.execute('print("test")')
@@ -29,27 +30,28 @@ class DebugClient:
try: try:
# 获取用户输入 # 获取用户输入
code = input(">>> ").strip() code = input(">>> ").strip()
if not code: if not code:
continue continue
if code.lower() in ['exit', 'quit']: if code.lower() in ["exit", "quit"]:
print("退出调试客户端") print("退出调试客户端")
break break
# 执行代码 # 执行代码
result = self.execute(code) result = self.execute(code)
print(f"结果: {result}\n") print(f"结果: {result}\n")
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n退出调试客户端") print("\n退出调试客户端")
break break
except EOFError: except EOFError:
break break
if __name__ == "__main__": if __name__ == "__main__":
# 从命令行参数获取端口 # 从命令行参数获取端口
port = int(sys.argv[1]) if len(sys.argv) > 1 else 5555 port = int(sys.argv[1]) if len(sys.argv) > 1 else 5555
client = DebugClient(port) client = DebugClient(port)
client.repl() client.repl()