feat: 代码格式化, 改进仪表盘, 新增多CSS支持

This commit is contained in:
2026-04-20 16:30:04 +08:00
parent 8677e828c7
commit 4ca9c65bea
43 changed files with 551 additions and 349 deletions

View File

@@ -1,7 +1,7 @@
zmq_debug = false
_zmq_debug_desc = "[调试] ZMQ 调试服务器, 这会在 zmq_debug_port 上打开调试服务器\n可作为 HeurAMS 执行任意 python 代码, 无必要请关闭"
zmq_debug = true
_zmq_debug_desc = "[调试] ZMQ 调试服务器, 这会在 zmq_debug_port 上打开一个服务器\n调试工具可远程在 HeurAMS 执行任意 python 代码, 无必要请关闭"
zmq_debug_port = 5555
_zmq_debug_port_desc = "ZMQ 调试服务器端口"
_zmq_debug_port_desc = "[调试] ZMQ 调试服务器端口"
enable_built_in_interface = false
_enable_built_in_interface_desc = "启用内置基本用户界面\n(当且仅当 HeurAMS 作为程序库时禁用, 以跳过用户界面逻辑)"
_paths_desc = "用户数据路径定义"

View File

@@ -4,7 +4,7 @@ quick_pass = true
_quick_pass_desc = "[调试] 启用快速应答功能(跳过测验)"
auto_pass = false
_auto_pass_desc = "[调试] 自动通过测试模式"
scheduled_num = "420"
scheduled_num = 420
_scheduled_num_desc = "默认记忆单元数量(可被单元集设置覆盖)"
algorithm = "NSP-0"
_algorithm_desc = "默认记忆调度算法(可被单元集设置覆盖)"

View File

@@ -1,3 +1,4 @@
title = "高考必背古诗文-0"
package = "cngk-0"
author = "__heurams__"
desc = "高考古诗文 60 篇"

View File

@@ -5,14 +5,12 @@ import heurams.kernel.particles as pt
import heurams.kernel.repolib as repolib
from heurams.services.textproc import truncate
repo = repolib.Repo.create_from_repodir(Path("./test_repo"))
repo = repolib.Repo.from_repodir(Path("./test_repo"))
alist = list()
print(repo.ident_index)
for i in repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data(
nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i)
)
e = pt.Electron.create_on_electonic_data(
n = pt.Nucleon.from_data(nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i))
e = pt.Electron.from_data(
electronic_data=repo.electronic_data_lict.get_itemic_unit(i)
)
print(n)

View File

@@ -1,5 +1,6 @@
import heurams.services.version as ver
# __main__.py
def main():
prompt = f"""HeurAMS {ver.ver} 已经被成功地安装在系统中.
@@ -17,5 +18,6 @@ python 代指您使用的解释器, 在某些发行版中可能是 python3, 而
注意: 一个常见的误区是, 执行 interface 下的 __main__.py 运行基本用户界面, 这会导致 Python 上下文环境异常, 请不要这样做."""
print(prompt)
if __name__ == "__main__":
main()

View File

@@ -30,6 +30,7 @@ config_var: ContextVar[ConfigDict].get = ContextVar(
)
"""配置对象的全局引用对象."""
class ConfigContext:
"""
功能完备的上下文管理器

View File

@@ -1,9 +1,11 @@
from time import sleep, perf_counter
print("欢迎使用基本用户界面!")
print("加载配置与上下文... ", end="", flush=True)
_start1 = perf_counter()
_start = perf_counter()
from heurams.context import *
_end = perf_counter()
print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)")
@@ -11,6 +13,7 @@ print("加载用户界面框架... ", end="", flush=True)
_start = perf_counter()
from textual.app import App
from textual.widgets import Button
_end = perf_counter()
print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)")
@@ -22,6 +25,7 @@ from .screens.navigator import NavigatorScreen
from .screens.precache import PrecachingScreen
from .screens.setting import SettingScreen
from .screens.synctool import SyncScreen
_end = perf_counter()
print(f"已完成! (耗时: {round(1000 * (_end - _start))}ms)")
@@ -29,9 +33,12 @@ print(f"组件目录: {rootdir}")
print(f"工作目录: {workdir}")
_end1 = perf_counter()
print(f"前置工作共计耗时: {round(1000 * (_end1 - _start1))}ms")
class HeurAMSApp(App):
TITLE = "潜进"
CSS_PATH = "css/main.tcss"
css_dir = pathlib.Path("css").resolve()
SUB_TITLE = "启发式辅助记忆调度器"
BINDINGS = [
("q", "go_back", "退出"),

View File

@@ -7,6 +7,7 @@ import pickle
logger = get_logger(__name__)
def environment_check():
from pathlib import Path
@@ -23,11 +24,12 @@ def environment_check():
print(f"找到 {i}")
logger.debug("环境检查完成")
def start_debug_server(app):
logger = get_logger("zmq_debug")
context = zmq.Context()
socket = context.socket(zmq.REP)
port = config_var.get()['global'].get('zmq_debug_port', 5555)
port = config_var.get()["global"].get("zmq_debug_port", 5555)
socket.bind(f"tcp://*:{port}")
logger.info(f"ZMQ Debug server started on port {port}")
first = 1
@@ -36,7 +38,7 @@ def start_debug_server(app):
code = pickle.loads(msg)
namespace = {"app": app, "logger": logger, "config_var": config_var}
if first:
app.title += ' [调试已连接]'
app.title += " [调试已连接]"
first = 0
try:
# 先尝试 eval
@@ -52,15 +54,17 @@ def start_debug_server(app):
except Exception as e:
socket.send(pickle.dumps(f"错误: {e}"))
def main():
environment_check()
app = HeurAMSApp()
if config_var.get()['global'].get('zmq_debug', False):
if config_var.get()["global"].get("zmq_debug", False):
threading.Thread(target=start_debug_server, args=(app,), daemon=True).start()
app.run(inline=False)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,18 @@
.repo-list {
}
#header {
height: 4;
}
.repo-list-item {
layout: grid;
grid-size: 2;
height: 3;
}
.repo-list-item-shortcut {
dock: right;
offset: -5% 0
}

View File

@@ -100,8 +100,9 @@ Textual 框架版本: {textual_version}
"""获取 Textual 框架版本"""
try:
import textual
return textual.__version__
except (ImportError, AttributeError):
except ImportError, AttributeError:
return "未知"
def _get_terminal_info(self) -> str:
@@ -110,7 +111,7 @@ Textual 框架版本: {textual_version}
if terminal:
return terminal
# 尝试从环境变量获取
terminal_env = os.environ.get('TERM_PROGRAM') or os.environ.get('TERM')
terminal_env = os.environ.get("TERM_PROGRAM") or os.environ.get("TERM")
return terminal_env or "未知"
def _get_python_version(self) -> str:
@@ -123,8 +124,10 @@ Textual 框架版本: {textual_version}
if platform.system() == "Darwin":
# macOS
import subprocess
result = subprocess.run(['sw_vers', '-productVersion'],
capture_output=True, text=True)
result = subprocess.run(
["sw_vers", "-productVersion"], capture_output=True, text=True
)
return f"macOS {result.stdout.strip()}"
elif platform.system() == "Windows":
# Windows
@@ -133,8 +136,9 @@ Textual 框架版本: {textual_version}
# Linux - 尝试获取发行版信息
try:
import distro
return f"{distro.name()} {distro.version()}"
except (ImportError, AttributeError):
except ImportError, AttributeError:
return platform.platform()
else:
return platform.platform()
@@ -144,7 +148,7 @@ Textual 框架版本: {textual_version}
def _get_disk_usage(self) -> str:
"""获取磁盘使用情况"""
try:
usage = psutil.disk_usage('/')
usage = psutil.disk_usage("/")
free_gb = usage.free / (1024**3)
total_gb = usage.total / (1024**3)
percent_free = (free_gb / total_gb) * 100

View File

@@ -35,6 +35,8 @@ class DashboardScreen(Screen):
("q", "go_back", "返回"),
]
CSS_PATH = Path(__file__).parent.parent / 'css' / "screens" / "dashboard.tcss"
def __init__(
self,
name: str | None = None,
@@ -42,97 +44,91 @@ class DashboardScreen(Screen):
classes: str | None = None,
) -> None:
super().__init__(name, id, classes)
self.repostat = {}
self.title2dirname = {}
self.title2repo = {}
self.dirname2repo = {}
self._load_data()
self.repolink = {}
def compose(self) -> ComposeResult:
"""组合界面组件"""
self._load_data()
yield Header(show_clock=True)
with ScrollableContainer():
yield Horizontal(
yield Horizontal( # 顶部的状态
Vertical(
Label(f'欢迎使用 "潜进" 版本 {version.ver}'),
Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"),
Label(
f"当前 UNIX 日时间戳: {timer.get_daystamp()}"
f"应用时区修正: UTC+{config_var.get()['services']['timer']['timezone_offset'] / 3600}"
),
Label(f"应用时区修正: UTC+{config_var.get()['services']['timer']['timezone_offset'] / 3600}"),
Label(f"全局算法设置: {config_var.get()['interface']['global']['algorithm']}: {algorithms[config_var.get()['interface']['global']['algorithm']].desc}"),
classes="column infview",
Label(
f"默认算法设置: {config_var.get()['interface']['global']['algorithm']}"
),
classes="left",
),
Vertical(
Label(f"已加载 {len(self.repostat)} 个单元集", classes='dataview'),
Label(f"共计 {reduce(lambda x, y: x + y, map(lambda x: x.get('unit_sum'), self.repostat.values()))} 个单元", classes='dataview'),
Label(f"已激活 {reduce(lambda x, y: x + y, map(lambda x: x.get('activated_sum'), self.repostat.values()))} 个单元", classes='dataview'),
Label(f""),
classes="column dataview",
Label(f"已加载 {len(self.repos)} 个单元集"),
Label(
f"共计 {reduce(lambda x, y: x + y, map(lambda x: x.progress['total'], self.repos))} 个单元"
),
id="dashboardtop"
Label(
f"已激活 {reduce(lambda x, y: x + y, map(lambda x: x.progress['touched'], self.repos))} 个单元"
),
Label(f""),
classes="right",
),
id="header",
)
yield ListView(id="repo-list", classes="repo-list-view")
yield Label(f'"潜进" 启发式辅助记忆调度器 版本 {version.ver} {version.stage.capitalize()}')
yield ListView(id="repo_list", classes="repo-list") # 单元集选择
yield Label(
f'"潜进" 启发式辅助记忆调度器 版本 {version.ver} {version.stage.capitalize()}'
) # 版本信息
yield Footer()
def _load_data(self):
self.repo_dirs = Repo.probe_valid_repos_in_dir(
Path(config_var.get()['global']["paths"]["data"]) / "repo"
repo_dirs = Repo.probe_valid_repos_in_dir(
Path(config_var.get()["global"]["paths"]["repo"])
)
for repo_dir in self.repo_dirs:
repo = Repo.create_from_repodir(repo_dir)
self.repos = list(map(Repo.from_repodir, repo_dirs))
for repo in self.repos:
self._analyse_repo(repo)
def _analyse_repo(self, repo: Repo):
dirname = repo.source.name # type: ignore
title = repo.manifest["title"]
is_due = 0
unit_sum = len(repo)
activated_sum = 0
nextdate = float('inf')
for i in repo.ident_index:
nucleon = pt.Nucleon.create_on_nucleonic_data(
nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i)
)
electron = pt.Electron.create_on_electonic_data(
electronic_data=repo.electronic_data_lict.get_itemic_unit(i),
algo_name=config_var.get()['repo'][repo.manifest['title']]['algorithm']
)
if electron.is_activated():
activated_sum += 1
if electron.is_due():
is_due = 1
nextdate = min(nextdate, electron.nextdate())
is_unfinished = unit_sum > activated_sum
if is_unfinished:
nextdate = min(nextdate, timer.get_daystamp())
need_to_study = is_due or is_unfinished
prompt = f"{title}\0\n 进度: {activated_sum}/{unit_sum} ({round(activated_sum/unit_sum*100)}%)\n {'需要学习' if need_to_study else '无需操作'}"
stat = {
"is_due": is_due,
"unit_sum": unit_sum,
"title": title,
"activated_sum": activated_sum,
"nextdate": nextdate,
"is_unfinished": is_unfinished,
"need_to_study": need_to_study,
"prompt": prompt,
"dirname": dirname,
# need_review: 需要/不需要学习
# nearest_review_time: 最近下次学习时间
# progress: 进度
# algotype: 算法类型
## initial_time: 起始时间
# package: 包名
# prompt: 最终呈现信息
repo.package = repo.manifest["package"]
repo.nearest_review_time = float("inf")
repo.progress = {
"total": repo.data_length,
"touched": 0,
}
self.repostat[dirname] = stat
self.title2dirname[title] = dirname
self.title2repo[title] = repo
self.dirname2repo[dirname] = repo
initial_time = float("inf")
for i in range(repo.data_length):
e = pt.Electron.from_data(repo.electronic_data_lict[i])
n = pt.Nucleon.from_data(repo.nucleonic_data_lict[i])
if e.is_activated():
repo.algotype = e.algoname
repo.progress["touched"] += 1
repo.nearest_review_time = min(repo.nearest_review_time, e.nextdate())
# initial_time = min(initial_time, e.)
repo.need_review = timer.get_daystamp() >= repo.nearest_review_time
repo.prompt = f"""{repo.manifest['title']} ({repo.algotype})
进度: {repo.progress['touched']}/{repo.progress['total']} ({round(repo.progress['touched']/repo.progress['total']*100, 1)}%)
{'需要学习' if repo.need_review else "无需操作"}
"""
def on_mount(self) -> None:
"""挂载组件时初始化"""
repo_list_widget = self.query_one("#repo-list", ListView)
repo_list_widget = self.query_one("#repo_list", ListView)
# 按下次复习时间排序
repodirs = sorted(
self.repo_dirs,
key=lambda f: self.repostat[f.name]["nextdate"],
self.repos,
key=lambda r: r.nearest_review_time,
reverse=True,
)
repotitles = map(lambda f: self.repostat[f.name]["title"], repodirs)
@@ -141,43 +137,44 @@ class DashboardScreen(Screen):
repo_list_widget.append(
ListItem(
Static(
"./data/repo/ 中未找到任何仓库.\n"
f"{config_var.get()['global']['paths']['repo']} 中未找到任何仓库.\n"
"请导入仓库后重启应用, 或者新建空的仓库."
)
),
id="not-found",
)
)
repo_list_widget.disabled = True
return
for repotitle in repotitles:
prompt = self.repostat[self.title2dirname[repotitle]]["prompt"]
list_item = ListItem(Label(prompt), Button(f"开始学习", flat=True, variant="primary", classes="repo_listitem_btn", id=f"launch_{self.repostat[self.title2dirname[repotitle]]['dirname']}"), classes="repo_listitem")
for r in self.repos:
self.repolink[str(id(r))] = r # 用于规避 ctype id 对象还原
list_item = ListItem(
Label(r.prompt),
Button(
f"开始学习",
flat=True,
variant="primary",
id=f"slaunch_repo_{id(r)}",
classes="repo-list-item-shortcut",
),
classes="repo-list-item",
id=f"launch_repo_{id(r)}",
)
repo_list_widget.append(list_item)
# if not self.stay_enabled[repodir]:
# list_item.disabled = True
def on_list_view_selected(self, event) -> None:
"""处理列表项选择事件"""
if not isinstance(event.item, ListItem):
return
selected_label = event.item.query_one(Label)
label_text = str(selected_label.render())
if "未找到任何仓库" in label_text:
if "not-found" == event.item.id:
return
# 提取文件名
selected_repotitle = label_text.partition("\0")[0].replace("*", "")
selected_repo = self.title2repo[label_text.partition("\0")[0].replace("*", "")]
# 还原对象
selected_repo = self.repolink[event.item.id.lstrip("launch_repo_")]
# 跳转到准备屏幕
self.app.push_screen(
PreparationScreen(
selected_repo, self.repostat[self.title2dirname[selected_repotitle]]
)
)
self.app.push_screen(PreparationScreen(selected_repo))
def action_quit_app(self) -> None:
"""退出应用程序"""
@@ -188,9 +185,10 @@ class DashboardScreen(Screen):
self.app.push_screen(NavigatorScreen())
def on_button_pressed(self, event: Button.Pressed) -> None:
logger.debug(f"event.button.id: {event.button.id}")
"""处理按钮点击事件"""
if str(event.button.id).startswith("launch_"): # type: ignore
logger.debug(f"event.button.id: {event.button.id}")
if event.button.id.startswith("slaunch_repo_"): # type: ignore
from .preparation import launch
launch(repo=self.dirname2repo[event.button.id[7:]], app=self.app, scheduled_num=-1) # type: ignore
launch(repo=self.repolink[event.button.id.lstrip("slaunch_repo_")], app=self.app, scheduled_num=-1) # type: ignore
# TODO: 这样启动的记忆实例的状态机无法绑定到 PreparationScreen 中

View File

@@ -115,12 +115,12 @@ class FavoriteManagerScreen(Screen):
def _get_repo_info(self, repo_path: str, fav: FavoriteItem) -> Optional[dict]:
"""获取仓库信息(标题、原子内容预览)"""
try:
data_repo = Path(config_var.get()['global']["paths"]["data"]) / "repo"
data_repo = Path(config_var.get()["global"]["paths"]["data"]) / "repo"
repo_dir = data_repo / repo_path
if not repo_dir.exists():
logger.warning("仓库目录不存在: %s", repo_dir)
return None
repo = Repo.create_from_repodir(repo_dir)
repo = Repo.from_repodir(repo_dir)
# 获取原子内容预览
content_preview = ""
payload = repo.payload

View File

@@ -38,7 +38,7 @@ class MemScreen(Screen):
("0,1,2,3", "app.push_screen('about')", ""),
]
if config_var.get()['interface']['global']["quick_pass"]:
if config_var.get()["interface"]["global"]["quick_pass"]:
BINDINGS.append(("k", "quick_pass", "正确应答"))
BINDINGS.append(("f", "quick_fail", "错误应答"))
rating = reactive(-1)
@@ -71,7 +71,6 @@ class MemScreen(Screen):
self.procession: Procession = self.phaser.current_procession() # type: ignore
self.atom: pt.Atom = self.procession.current_atom # type: ignore
def on_mount(self):
self.fission = self.procession.get_fission()
self.mount_puzzle()
@@ -93,7 +92,7 @@ class MemScreen(Screen):
if self.repo is not None:
fav_status = "已收藏" if self._is_current_atom_favorited() else "未收藏"
s += f"收藏: {fav_status}\n"
'''if config_var.get().get("debug_topline", 0):
"""if config_var.get().get("debug_topline", 0):
try:
alia = self.fission.get_current_puzzle_inf()["alia"] # type: ignore
s += f"谜题: {alia}\n"
@@ -113,7 +112,7 @@ class MemScreen(Screen):
stat = self.fission.__repr__("simple", "")
s += f"{stat}\n"
except Exception as e:
s = str(e)'''
s = str(e)"""
s += f"进度: {self.procession.process() + 1}/{self.procession.total_length()}"
return s
@@ -139,9 +138,9 @@ class MemScreen(Screen):
i.remove()
from heurams.interface.widgets.finished import Finished
if config_var.get()['interface']['global']["persist_to_file"]:
if config_var.get()["interface"]["global"]["persist_to_file"]:
self.save_func()
container.mount(Finished(is_saved=['interface']['global']["persist_to_file"]))
container.mount(Finished(is_saved=["interface"]["global"]["persist_to_file"]))
def on_button_pressed(self, event):
event.stop()
@@ -156,7 +155,7 @@ class MemScreen(Screen):
from heurams.services.audio_service import play_by_path
from heurams.services.hasher import get_md5
path = Path(config_var.get()['global']["paths"]["data"]) / "cache" / "voice"
path = Path(config_var.get()["global"]["paths"]["data"]) / "cache" / "voice"
path = path / f"{get_md5(self.atom.registry['nucleon']["tts_text"])}.wav"
if path.exists():
play_by_path(path)
@@ -177,7 +176,6 @@ class MemScreen(Screen):
self.forward(new_rating)
self.rating = -1
def forward(self, rating):
self.update_state()
allow_forward = 1 if rating >= 4 else 0
@@ -226,7 +224,7 @@ class MemScreen(Screen):
return ""
# self.repo.source 是 Path 对象,指向仓库目录
repo_full_path = self.repo.source
data_repo_path = Path(config_var.get()['global']["paths"]["data"]) / "repo"
data_repo_path = Path(config_var.get()["global"]["paths"]["data"]) / "repo"
try:
rel_path = repo_full_path.relative_to(data_repo_path)
return str(rel_path)

View File

@@ -53,7 +53,11 @@ class NavigatorScreen(ModalScreen):
)
yield Static("按下回车以完成切换\n所有会话将被保存")
yield Button(
"关闭 (n)", id="close_button", variant="primary", classes="close-button", flat=True
"关闭 (n)",
id="close_button",
variant="primary",
classes="close-button",
flat=True,
)
def on_mount(self) -> None:

View File

@@ -13,13 +13,13 @@ import heurams.services.hasher as hasher
from heurams.context import *
# 兼容性缓存路径:优先使用 paths.cache否则使用 data/cache
paths = config_var.get()['global']["paths"]
paths = config_var.get()["global"]["paths"]
cache_dir = pathlib.Path(paths.get("cache", paths["data"] + "/cache")) / "voice"
def format_size(bytes_num: int) -> str:
"""将字节数格式化为人类可读的字符串"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
for unit in ["B", "KB", "MB", "GB", "TB"]:
if bytes_num < 1024.0:
return f"{bytes_num:.2f} {unit}"
bytes_num /= 1024.0 # type: ignore
@@ -54,16 +54,24 @@ class PrecachingScreen(Screen):
self.cancel_flag = 0
self.desc = desc
# 不再需要缓存配置,保留配置读取以兼容
self.cache_stats = {"total_size": 0, "file_count": 0, "human_size": "0 B", "cached_units": 0, "total_units": 0, "cache_rate": 0}
self.cache_stats = {
"total_size": 0,
"file_count": 0,
"human_size": "0 B",
"cached_units": 0,
"total_units": 0,
"cache_rate": 0,
}
self._update_cache_stats()
def _get_total_units(self) -> int:
"""获取所有仓库的总单元数"""
from heurams.context import config_var
from heurams.kernel.repolib import Repo
repo_path = pathlib.Path(config_var.get()['global']["paths"]["data"]) / "repo"
repo_path = pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "repo"
repo_dirs = Repo.probe_valid_repos_in_dir(repo_path)
repos = map(Repo.create_from_repodir, repo_dirs)
repos = map(Repo.from_repodir, repo_dirs)
total = 0
for repo in repos:
try:
@@ -101,11 +109,15 @@ class PrecachingScreen(Screen):
with Container():
yield Static(
f"缓存率: {self.cache_stats.get('cache_rate', 0):.1f}% (已缓存 {self.cache_stats.get('cached_units', 0)} / {self.cache_stats.get('total_units', 0)} 个单元)",
classes="cache-usage-text"
classes="cache-usage-text",
)
if self.nucleons:
yield Static(f"目标单元归属: [b]{self.desc}[/b]", classes="target-info")
yield Static(f"单元数量: {len(self.nucleons)}", classes="target-info")
yield Static(
f"目标单元归属: [b]{self.desc}[/b]", classes="target-info"
)
yield Static(
f"单元数量: {len(self.nucleons)}", classes="target-info"
)
else:
yield Static("目标: 所有单元", classes="target-info")
@@ -114,16 +126,26 @@ class PrecachingScreen(Screen):
yield ProgressBar(total=100, show_eta=False, id="progress_bar")
with Horizontal(classes="button-group"):
if not self.is_precaching:
yield Button("开始预缓存", id="start_precache", variant="primary")
yield Button(
"开始预缓存", id="start_precache", variant="primary"
)
else:
yield Button("取消预缓存", id="cancel_precache", variant="error")
yield Button(
"取消预缓存", id="cancel_precache", variant="error"
)
yield Button("清空缓存", id="clear_cache", variant="warning")
yield Button("返回", id="go_back", variant="default")
with Container(classes="cache-info"):
yield Static(f"缓存路径: {cache_dir}", classes="cache-path")
yield Static(f"文件数: {self.cache_stats['file_count']}", classes="cache-count")
yield Static(f"总大小: {self.cache_stats['human_size']}", classes="cache-size")
yield Button("刷新", id="refresh_cache_stats", variant="default", flat=True)
yield Static(
f"文件数: {self.cache_stats['file_count']}", classes="cache-count"
)
yield Static(
f"总大小: {self.cache_stats['human_size']}", classes="cache-size"
)
yield Button(
"刷新", id="refresh_cache_stats", variant="default", flat=True
)
yield Static("若您离开此界面, 未完成的缓存进程会自动停止.")
yield Static('缓存程序支持 "断点续传".')
@@ -230,9 +252,9 @@ class PrecachingScreen(Screen):
from heurams.context import config_var, rootdir, workdir
from heurams.kernel.repolib import Repo
repo_path = pathlib.Path(config_var.get()['global']["paths"]["data"]) / "repo"
repo_path = pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "repo"
repo_dirs = Repo.probe_valid_repos_in_dir(repo_path)
repos = map(Repo.create_from_repodir, repo_dirs)
repos = map(Repo.from_repodir, repo_dirs)
# 计算总项目数
self.total = 0
@@ -241,7 +263,7 @@ class PrecachingScreen(Screen):
try:
for i in repo.ident_index:
nucleon_list.append(
pt.Nucleon.create_on_nucleonic_data(
pt.Nucleon.from_data(
repo.nucleonic_data_lict.get_itemic_unit(i)
)
)

View File

@@ -5,7 +5,16 @@ from textual.containers import ScrollableContainer
from textual.reactive import reactive
from textual.screen import Screen
from textual.widget import Widget
from textual.widgets import Button, Footer, Header, Label, Markdown, Static, Rule, Sparkline
from textual.widgets import (
Button,
Footer,
Header,
Label,
Markdown,
Static,
Rule,
Sparkline,
)
import heurams.kernel.particles as pt
import heurams.services.hasher as hasher
@@ -28,20 +37,19 @@ class PreparationScreen(Screen):
("0,1,2,3", "app.push_screen('about')", ""),
]
scheduled_num = reactive(config_var.get()['interface']['global']["scheduled_num"])
scheduled_num = reactive(config_var.get()["interface"]["global"]["scheduled_num"])
def __init__(self, repo: Repo, repostat: dict) -> None:
def __init__(self, repo: Repo) -> None:
super().__init__(name=None, id=None, classes=None)
self.repo = repo
self.repostat = repostat
self.load_data()
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with ScrollableContainer(id="vice_container"):
yield Label(f"准备就绪: [b]{self.repostat['title']}[/b]\n")
yield Label(f"准备就绪: [b]{self.repo.manifest['title']}[/b]\n")
yield Label(
f"仓库路径: {config_var.get()['global']['paths']['data']}/repo/[b]{self.repostat['dirname']}[/b]"
f"[b]仓库路径: {self.repo.source}[/b]"
)
yield Label(f"\n单元数量: {len(self.repo)}\n")
yield Label(f"最小记忆分组: {self.scheduled_num}\n", id="schnum_label")
@@ -80,19 +88,21 @@ class PreparationScreen(Screen):
content = ""
spark_line_arr = []
for i in self.repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data(
n = pt.Nucleon.from_data(
nucleonic_data=self.repo.nucleonic_data_lict.get_itemic_unit(i)
)
e = pt.Electron.create_on_electonic_data(electronic_data=self.repo.electronic_data_lict.get_itemic_unit(i))
e = pt.Electron.from_data(
electronic_data=self.repo.electronic_data_lict.get_itemic_unit(i)
)
statstr = ""
if e.is_activated():
statstr = '[#00ff00]A[/]'
statstr = "[#00ff00]A[/]"
if e.is_due():
statstr = '[#ffff00]R[/]'
statstr = "[#ffff00]R[/]"
# statstr += ('[dim]' + str(e.rept(real_rept=True)).zfill(2)+'[/]')
else:
statstr = '[#ff0000]U[/]'
statstr = "[#ff0000]U[/]"
spark_line_arr.append(e.rept(real_rept=True))
content += f" {statstr} {n['content'].replace('/', '')} \n"
self.content = content
@@ -107,9 +117,7 @@ class PreparationScreen(Screen):
lst = list()
for i in self.repo.ident_index:
lst.append(
pt.Nucleon.create_on_nucleonic_data(
self.repo.nucleonic_data_lict.get_itemic_unit(i)
)
pt.Nucleon.from_data(self.repo.nucleonic_data_lict.get_itemic_unit(i))
)
precache_screen = PrecachingScreen(
nucleons=lst, desc=self.repo.manifest["title"]
@@ -128,15 +136,16 @@ class PreparationScreen(Screen):
elif event.button.id == "precache_button":
self.action_precache()
def launch(repo, app, scheduled_num):
if scheduled_num == -1:
scheduled_num = config_var.get()['interface']['global']["scheduled_num"]
scheduled_num = config_var.get()["interface"]["global"]["scheduled_num"]
atoms = list()
for i in repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data(
n = pt.Nucleon.from_data(
nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i)
)
e = pt.Electron.create_on_electonic_data(
e = pt.Electron.from_data(
electronic_data=repo.electronic_data_lict.get_itemic_unit(i)
)
a = pt.Atom(n, e, repo.orbitic_data)

View File

@@ -8,7 +8,19 @@ import os
from textual.app import ComposeResult
from textual.containers import ScrollableContainer, Container, Horizontal, Vertical
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, ListItem, ListView, Static, Collapsible, Input, Switch, Select
from textual.widgets import (
Button,
Footer,
Header,
Label,
ListItem,
ListView,
Static,
Collapsible,
Input,
Switch,
Select,
)
from textual.layouts import horizontal
import heurams.kernel.particles as pt
@@ -45,13 +57,15 @@ class SettingScreen(Screen):
"""组合界面组件"""
yield Header(show_clock=True)
with ScrollableContainer():
yield Label('[b]设置页面[/b]')
yield Label("[b]设置页面[/b]")
for i in config_var.get():
if i.startswith('_'):
if i.startswith("_"):
continue
a = self._get_subcfg(f'{i}')
a = self._get_subcfg(f"{i}")
if a:
yield Collapsible(*a, title=i + f'\n{config_var.get().get(f"_{i}_desc", "")}')
yield Collapsible(
*a, title=i + f'\n{config_var.get().get(f"_{i}_desc", "")}'
)
yield Footer()
def _get_subcfg(self, parent_epath: str):
@@ -60,61 +74,115 @@ class SettingScreen(Screen):
if parent.is_dir:
lst = list()
for i in parent:
if i.startswith('_'):
if i.startswith("_"):
continue
a = self._get_subcfg(f"{parent_epath}.{i}")
if a:
lst.append(Collapsible(*a, title=i + f'\n{parent.get(f"_{i}_desc", "")}'))
lst.append(
Collapsible(
*a, title=i + f'\n{parent.get(f"_{i}_desc", "")}'
)
)
return lst
if isinstance(parent, dict) or (isinstance(parent, ConfigDict) and not parent.is_dir):
if isinstance(parent, dict) or (
isinstance(parent, ConfigDict) and not parent.is_dir
):
lst = list()
for i in parent:
if i.startswith('_'):
if i.startswith("_"):
continue
if isinstance(parent[i], dict):
a = self._get_subcfg(f"{parent_epath}.{i}")
if a:
lst.append(Collapsible(*a, title=i + f'\n{parent.get(f"_{i}_desc", "")}'))
elif f'_{i}_candidate' in parent: # 选择框模式
if isinstance(parent[f'_{i}_candidate'], dict):
lst.append(Horizontal(
lst.append(
Collapsible(
*a, title=i + f'\n{parent.get(f"_{i}_desc", "")}'
)
)
elif f"_{i}_candidate" in parent: # 选择框模式
if isinstance(parent[f"_{i}_candidate"], dict):
lst.append(
Horizontal(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Select(((f"{j} ({k})", j) for j, k in parent[f'_{i}_candidate'].items()), prompt=f'{parent.get(f"{i}", "")}', id=domize(f"{parent_epath}.{i}")),
classes='container'
))
elif isinstance(parent[f'_{i}_candidate'], list):
lst.append(Horizontal(
Select(
(
(f"{j} ({k})", j)
for j, k in parent[f"_{i}_candidate"].items()
),
prompt=f'{parent.get(f"{i}", "")}',
id=domize(f"{parent_epath}.{i}"),
),
classes="container",
)
)
elif isinstance(parent[f"_{i}_candidate"], list):
lst.append(
Horizontal(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Select(((j, j) for j in parent[f'_{i}_candidate']), prompt=f'{parent.get(f"{i}", "")}', id=domize(f"{parent_epath}.{i}")),
classes='container'
))
Select(
((j, j) for j in parent[f"_{i}_candidate"]),
prompt=f'{parent.get(f"{i}", "")}',
id=domize(f"{parent_epath}.{i}"),
),
classes="container",
)
)
else:
if isinstance(parent[i], float):
lst.append(Horizontal(
lst.append(
Horizontal(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Input(value=str(parent[i]), placeholder='要求一个浮点数', type='number', id=domize(f"{parent_epath}.{i}")),
classes='container'))
Input(
value=str(parent[i]),
placeholder="要求一个浮点数",
type="number",
id=domize(f"{parent_epath}.{i}"),
),
classes="container",
)
)
elif isinstance(parent[i], str):
lst.append(Horizontal(
lst.append(
Horizontal(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Input(value=parent[i], placeholder='要求一个字符串', type='text', id=domize(f"{parent_epath}.{i}")),
classes='container'))
Input(
value=parent[i],
placeholder="要求一个字符串",
type="text",
id=domize(f"{parent_epath}.{i}"),
),
classes="container",
)
)
elif isinstance(parent[i], bool):
lst.append(Horizontal(
lst.append(
Horizontal(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Switch(value=parent[i], id=domize(f"{parent_epath}.{i}")),
classes='container'))
Switch(
value=parent[i], id=domize(f"{parent_epath}.{i}")
),
classes="container",
)
)
elif isinstance(parent[i], int):
lst.append(Horizontal(
lst.append(
Horizontal(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Input(value=str(parent[i]), placeholder='要求一个整数', type='integer', id=domize(f"{parent_epath}.{i}")),
classes='container'))
Input(
value=str(parent[i]),
placeholder="要求一个整数",
type="integer",
id=domize(f"{parent_epath}.{i}"),
),
classes="container",
)
)
elif isinstance(parent[i], list):
pass
else:
lst.append(Label('未知类型'))
lst.append(Label("未知类型"))
return lst
return [Label('无子项')]
return [Label("无子项")]
def on_mount(self) -> None:
"""挂载组件时初始化"""
@@ -133,14 +201,18 @@ class SettingScreen(Screen):
"""打开导航器"""
self.app.push_screen(NavigatorScreen())
def on_input_changed(self, event: Input.Changed) -> None:
widget_id = event.input.id
if not widget_id:
return
eepath = undomize(widget_id)
value = event.value
epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value))
epath(
config_var.get(),
eepath,
enable_modify=True,
new_value=type(epath(config_var.get(), eepath))(value),
)
def on_switch_changed(self, event: Switch.Changed) -> None:
widget_id = event.switch.id
@@ -148,7 +220,12 @@ class SettingScreen(Screen):
return
eepath = undomize(widget_id)
value = event.value
epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value))
epath(
config_var.get(),
eepath,
enable_modify=True,
new_value=type(epath(config_var.get(), eepath))(value),
)
def on_select_changed(self, event: Select.Changed) -> None:
widget_id = event.select.id
@@ -156,4 +233,9 @@ class SettingScreen(Screen):
return
eepath = undomize(widget_id)
value = event.value
epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value))
epath(
config_var.get(),
eepath,
enable_modify=True,
new_value=type(epath(config_var.get(), eepath))(value),
)

View File

@@ -15,7 +15,7 @@ class BasePuzzleWidget(Widget):
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True
markup: bool = True,
) -> None:
super().__init__(
*children,
@@ -23,7 +23,7 @@ class BasePuzzleWidget(Widget):
id=id,
classes=classes,
disabled=disabled,
markup=markup
markup=markup,
)
self.atom = atom

View File

@@ -83,7 +83,7 @@ class ClozePuzzle(BasePuzzleWidget):
if lst:
lastone = lst[-1]
for i in lst[:-1]:
s += (i + ' ')
s += i + " "
s += f" `{lastone}`"
return s

View File

@@ -55,7 +55,7 @@ class MCQPuzzle(BasePuzzleWidget):
def _load(self):
cfg = self.atom.registry["nucleon"]["puzzles"][self.alia]
if cfg['mapping'] == {}:
if cfg["mapping"] == {}:
self.screen.rating = 5 # type: ignore
self.puzzle = pz.MCQPuzzle(
cfg["mapping"], cfg["jammer"], int(cfg["max_riddles_num"]), cfg["prefix"]

View File

@@ -11,7 +11,7 @@ class Placeholder(Widget):
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True
markup: bool = True,
) -> None:
super().__init__(
*children,
@@ -19,7 +19,7 @@ class Placeholder(Widget):
id=id,
classes=classes,
disabled=disabled,
markup=markup
markup=markup,
)
def compose(self):

View File

@@ -8,7 +8,8 @@ logger = get_logger(__name__)
class BaseAlgorithm:
algo_name = "BaseAlgorithm"
desc = '算法基类'
desc = "算法基类"
class AlgodataDict(TypedDict):
real_rept: int
rept: int

View File

@@ -10,7 +10,8 @@ logger = get_logger(__name__)
class NSP0Algorithm(BaseAlgorithm):
algo_name = "NSP-0"
desc = '快速筛选用特殊调度器'
desc = "快速筛选用特殊调度器"
class AlgodataDict(TypedDict):
real_rept: int
rept: int
@@ -23,7 +24,7 @@ class NSP0Algorithm(BaseAlgorithm):
defaults = {
"real_rept": 0,
'important': 0,
"important": 0,
"rept": 0,
"interval": 0,
"last_date": 0,
@@ -52,8 +53,10 @@ class NSP0Algorithm(BaseAlgorithm):
if feedback == -1:
logger.debug("feedback 为 -1, 跳过更新")
return
algodata[cls.algo_name]["interval"] = (1 if feedback <= 3 else float('inf'))
algodata[cls.algo_name]["important"] = (1 if feedback <= 3 else algodata[cls.algo_name]["important"])
algodata[cls.algo_name]["interval"] = 1 if feedback <= 3 else float("inf")
algodata[cls.algo_name]["important"] = (
1 if feedback <= 3 else algodata[cls.algo_name]["important"]
)
algodata[cls.algo_name]["last_date"] = timer.get_daystamp()
algodata[cls.algo_name]["next_date"] = (
timer.get_daystamp() + algodata[cls.algo_name]["interval"]

View File

@@ -27,7 +27,7 @@ from heurams.kernel.algorithms.sm15m_calc import (
# 全局状态文件路径
_GLOBAL_STATE_FILE = os.path.expanduser(
pathlib.Path(config_var.get()['global']["paths"]["data"])
pathlib.Path(config_var.get()["global"]["paths"]["data"])
/ "global"
/ "sm15m_global_state.json"
)

View File

@@ -10,7 +10,8 @@ logger = get_logger(__name__)
class SM2Algorithm(BaseAlgorithm):
algo_name = "SM-2"
desc = '经典间隔重复算法'
desc = "经典间隔重复算法"
class AlgodataDict(TypedDict):
efactor: float
real_rept: int

View File

@@ -32,7 +32,7 @@ class Atom:
default_runtime = {
"locked": False,
"min_rate": float('inf'),
"min_rate": float("inf"),
"new_activation": False,
}

View File

@@ -24,6 +24,7 @@ class Electron:
algo_name = "SM-2"
self.algodata = algodata
self.ident = ident
self.algoname = algo_name
self.algo: algolib.BaseAlgorithm = algorithms[algo_name]
if not self.algo.check_integrity(self.algodata):
@@ -55,8 +56,8 @@ class Electron:
def rept(self, real_rept=False):
if real_rept:
return self.algodata[self.algo.algo_name]['real_rept']
return self.algodata[self.algo.algo_name]['rept']
return self.algodata[self.algo.algo_name]["real_rept"]
return self.algodata[self.algo.algo_name]["rept"]
def is_activated(self):
result = self.algodata[self.algo.algo_name]["is_activated"]
@@ -112,7 +113,7 @@ class Electron:
return len(self.algodata[self.algo.algo_name])
@staticmethod
def create_on_electonic_data(electronic_data: tuple, algo_name: str = ""):
def from_data(electronic_data: tuple, algo_name: str = ""):
_data = electronic_data
ident = _data[0]
algodata = _data[1]

View File

@@ -15,26 +15,26 @@ class Nucleon:
self.ident = ident
try:
data_safe = deepcopy((payload | common))
data_puz = deepcopy(data_safe['puzzles'])
data_safe['puzzles'] = {}
data_puz = deepcopy(data_safe["puzzles"])
data_safe["puzzles"] = {}
env = {
"payload": data_safe,
"default": config_var.get()['interface']["puzzles"],
"default": config_var.get()["interface"]["puzzles"],
"nucleon": data_safe,
}
self.evalizer = Evalizer(environment=env)
data_safe = self.evalizer(deepcopy(data_safe))
env = {
"payload": data_safe,
"default": config_var.get()['interface']["puzzles"],
"default": config_var.get()["interface"]["puzzles"],
"nucleon": data_safe,
}
self.evalizer = Evalizer(environment=env)
data_puz = self.evalizer(deepcopy(data_puz))
data_safe['puzzles'] = data_puz # type: ignore
data_safe["puzzles"] = data_puz # type: ignore
self.data: dict = data_safe # type: ignore
except Exception:
self.data = (payload | common)
self.data = payload | common
def __getitem__(self, key):
if isinstance(key, str):
@@ -71,7 +71,7 @@ class Nucleon:
return s
@staticmethod
def create_on_nucleonic_data(nucleonic_data: tuple):
def from_data(nucleonic_data: tuple):
_data = nucleonic_data
payload = _data[1][0]
common = _data[1][1]

View File

@@ -75,7 +75,7 @@ class Fission(Machine):
self.current_puzzle_inf = self.puzzles_inf[0]
for i in range(len(self.puzzles_inf)):
self.min_ratings.append(float('inf'))
self.min_ratings.append(float("inf"))
Machine.__init__(
self,

View File

@@ -13,10 +13,14 @@ from heurams.kernel.auxiliary.lict import Lict
class RepoManifest(TypedDict):
title: str
author: str
package: str
desc: str
class Repo:
"""只维护仓库本身
上层 API 请访问此对象下粒子对象列表"""
file_mapping = {
"schedule": "schedule.toml",
"payload": "payload.toml",
@@ -58,14 +62,15 @@ class Repo:
"algodata": self.algodata,
"source": self.source,
}
self.generate_particles_data()
def generate_particles_data(self):
self._generate_particles_data()
def _generate_particles_data(self):
"""生成上层的粒子对象组和 API 交互, 会在 init 后自动调用"""
self.nucleonic_data_lict = Lict(
initlist=list(map(self._nucleonic_proc, self.payload))
)
self.orbitic_data = self.schedule
self.data_length = len(self.nucleonic_data_lict)
self.ident_index = self.nucleonic_data_lict.keys()
for i in self.ident_index:
self.algodata.append_new((i, {}))
@@ -76,13 +81,6 @@ class Repo:
common = self.typedef["common"]
return (ident, (unit[1], common))
@staticmethod
def _merge(value):
def inner(x):
return (x, value)
return inner
def __len__(self):
return len(self.payload)
@@ -95,6 +93,7 @@ class Repo:
def persist_to_repodir(
self, save_list: list | None = None, source: Path | None = None
):
"""保存单元集数据到目录"""
if save_list == None:
save_list = self.default_save_list
if self.source != None and source == None:
@@ -116,11 +115,13 @@ class Repo:
else:
raise ValueError(f"不支持的文件类型: {filename}")
def export_to_single_dict(self):
def export_to_dict(self):
"""导出至单个字典"""
return self.database
@classmethod
def create_new_repo(cls, source=None):
"""创建新的空单元集"""
default_database = {
"schedule": {},
"payload": Lict([]),
@@ -132,7 +133,8 @@ class Repo:
return Repo(**default_database)
@classmethod
def create_from_repodir(cls, source: Path):
def from_repodir(cls, source: Path):
"""从目录创建单元集"""
database = {}
for keyname, filename in cls.file_mapping.items():
with open(source / filename, "r") as f:
@@ -153,21 +155,24 @@ class Repo:
return Repo(**database)
@classmethod
def create_from_single_dict(cls, dictdata, source: Path | None = None):
def from_dict(cls, dictdata, source: Path | None = None):
"""从单一字典创建单元集"""
database = dictdata
database["source"] = source
return Repo(**database)
@classmethod
def check_repodir(cls, source: Path):
"""检测单元集目录合法性"""
try:
cls.create_from_repodir(source)
return 1
cls.from_repodir(source)
return True
except:
return 0
return False
@classmethod
def probe_valid_repos_in_dir(cls, folder: Path):
"""返回一个合法的子目录 Path() 列表"""
lst = list()
for i in folder.iterdir():
if i.is_dir():

View File

@@ -1,4 +1,5 @@
"""音频服务"""
from typing import Callable
from heurams.context import config_var
@@ -7,7 +8,10 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__)
play_by_path: Callable = prov[config_var.get()["services"]["audio"]['provider']].play_by_path
play_by_path: Callable = prov[
config_var.get()["services"]["audio"]["provider"]
].play_by_path
logger.debug(
"音频服务初始化完成, 使用 Provider: %s", config_var.get()["services"]["audio"]['provider']
"音频服务初始化完成, 使用 Provider: %s",
config_var.get()["services"]["audio"]["provider"],
)

View File

@@ -11,6 +11,7 @@ from heurams.services.exceptions import WTFException
# 我们的流程是: 找到文件名: 返回文件名里头的数据; 找不到: 继续查索引; 所以 self.data 除了存本级各种索引球用没得
logger = get_logger(__name__)
class ConfigDict(UserDict): # 舒服了
_instances = {} # 必须使用单例模式, 不然有严重的多实例导致的配置无法持久化问题
@@ -30,7 +31,7 @@ class ConfigDict(UserDict): # 舒服了
def __init__(self, config_path: pathlib.Path, dict=None): # 需要自己把自己提起来
# 避免重复初始化
if hasattr(self, '_initialized'):
if hasattr(self, "_initialized"):
return
self._initialized = True
if dict:
@@ -42,7 +43,7 @@ class ConfigDict(UserDict): # 舒服了
if self.is_dir:
self.update_index()
else:
with open(self.path, 'r+') as f: #TODO: 给这个做缓存
with open(self.path, "r+") as f: # TODO: 给这个做缓存
try:
self.data = toml.load(f)
except:
@@ -70,17 +71,19 @@ class ConfigDict(UserDict): # 舒服了
origvalue.data = value
super().__setitem__(key, value)
def update_index(self): # 如果有人没事干在config里面创建指向config的符号链接 这玩意会崩溃 但是不要修复: 需要这个符号链接特性
def update_index(
self,
): # 如果有人没事干在config里面创建指向config的符号链接 这玩意会崩溃 但是不要修复: 需要这个符号链接特性
for i in self.path.iterdir():
if i.name.startswith('_'):
if i.name == '_.toml' and not i.is_dir():
with open(self.path/'_.toml', 'r+') as f:
if i.name.startswith("_"):
if i.name == "_.toml" and not i.is_dir():
with open(self.path / "_.toml", "r+") as f:
self.data.update(dict(toml.load(f)))
continue
if i.is_dir():
self.data[i.name] = i
else:
if i.suffix == '.toml':
if i.suffix == ".toml":
self.data[i.stem] = i
else:
logger.debug(f"配置目录中有无效的文件 {i.stem}") # what's up bro
@@ -94,5 +97,5 @@ class ConfigDict(UserDict): # 舒服了
logger.debug("完成配置持久化")
return
with open(self.path, 'w+') as f:
with open(self.path, "w+") as f:
toml.dump(self.data, f)

View File

@@ -3,27 +3,41 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__)
def epath(dct, path: str = '', default=None, parents=False, enable_modify=False, new_value=None):
def epath(
dct,
path: str = "",
default=None,
parents=False,
enable_modify=False,
new_value=None,
):
if not path:
return dct
path = path.rstrip('.')
path = path.lstrip('.')
path = path.rstrip(".")
path = path.lstrip(".")
target = dct
keys = path.split('.')
keys = path.split(".")
logger.debug(f"处理 EPATH {path}, {new_value}")
for idx, i in enumerate(keys):
is_last = (idx == len(keys) - 1)
is_last = idx == len(keys) - 1
# 处理字典键
logger.debug(f'处理 {i}, {(isinstance(target, dict) or isinstance(target, ConfigDict))} {i in target}')
logger.debug(
f"处理 {i}, {(isinstance(target, dict) or isinstance(target, ConfigDict))} {i in target}"
)
if is_last and enable_modify:
# 最后一次循环执行修改
if (isinstance(target, dict) or isinstance(target, ConfigDict)):
if isinstance(target, dict) or isinstance(target, ConfigDict):
target[i] = new_value
return new_value
elif i.startswith('[') and i.endswith(']') and isinstance(target, (list, tuple)):
elif (
i.startswith("[")
and i.endswith("]")
and isinstance(target, (list, tuple))
):
idx_num = int(i[1:-1])
if 0 <= idx_num < len(target):
target[idx_num] = new_value
@@ -38,9 +52,15 @@ def epath(dct, path: str = '', default=None, parents=False, enable_modify=False,
else:
return default
else:
if (isinstance(target, dict) or isinstance(target, ConfigDict)) and i in target:
if (
isinstance(target, dict) or isinstance(target, ConfigDict)
) and i in target:
target = target[i]
elif i.startswith('[') and i.endswith(']') and isinstance(target, (list, tuple)):
elif (
i.startswith("[")
and i.endswith("]")
and isinstance(target, (list, tuple))
):
idx_num = int(i[1:-1])
if 0 <= idx_num < len(target):
target = target[idx_num]

View File

@@ -1,4 +1,5 @@
from heurams.services.logger import get_logger
class WTFException(Exception):
pass

View File

@@ -63,7 +63,7 @@ class FavoriteManager:
def _get_file_path(self) -> Path:
"""获取收藏文件路径"""
config_path = Path(config_var.get()['global']["paths"]["data"])
config_path = Path(config_var.get()["global"]["paths"]["data"])
fav_path = config_path / "global" / "favorites.json"
fav_path.parent.mkdir(parents=True, exist_ok=True)
return fav_path

View File

@@ -3,8 +3,10 @@ def truncate(text):
return text
return text[:3] + ">"
def domize(text):
return text.replace('.', '--DOT--')
return text.replace(".", "--DOT--")
def undomize(text):
return text.replace('--DOT--', '.')
return text.replace("--DOT--", ".")

View File

@@ -9,12 +9,15 @@ logger = get_logger(__name__)
def get_daystamp() -> int:
"""获取当前日戳(以天为单位的整数时间戳)"""
time_override = config_var.get()['services']["timer"]["daystamp_override"]
time_override = config_var.get()["services"]["timer"]["daystamp_override"]
if time_override != -1:
logger.debug("使用覆盖的日戳: %d", time_override)
return int(time_override)
result = int((time.time() + config_var.get()['services']["timer"]["timezone_offset"]) // (24 * 3600))
result = int(
(time.time() + config_var.get()["services"]["timer"]["timezone_offset"])
// (24 * 3600)
)
logger.debug("计算日戳: %d", result)
return result
@@ -22,7 +25,7 @@ def get_daystamp() -> int:
def get_timestamp() -> float:
"""获取 UNIX 时间戳"""
# 搞这个类的原因是要支持可复现操作
time_override = config_var.get()['services']["timer"]["timestamp_override"]
time_override = config_var.get()["services"]["timer"]["timestamp_override"]
if time_override != -1:
logger.debug("使用覆盖的时间戳: %f", time_override)
return float(time_override)

View File

@@ -7,7 +7,8 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__)
convertor: Callable = prov[config_var.get()['services']["tts"]["provider"]].convert
convertor: Callable = prov[config_var.get()["services"]["tts"]["provider"]].convert
logger.debug(
"TTS 服务初始化完成, 使用 provider: %s", config_var.get()['services']["tts"]["provider"]
"TTS 服务初始化完成, 使用 provider: %s",
config_var.get()["services"]["tts"]["provider"],
)

View File

@@ -65,6 +65,7 @@ meaning = "狗发出的声音"
- 如果 CSV 包含更多列,它们也会以相同方式转换为键值对
- 支持 `-r` 参数指定随机种子来打乱 section 顺序
"""
import csv
import sys
import os
@@ -72,6 +73,7 @@ import random
import argparse
from pathlib import Path
def csv_to_toml(csv_path, toml_path=None, random_seed=None):
"""
将CSV文件转换为TOML格式
@@ -89,13 +91,13 @@ def csv_to_toml(csv_path, toml_path=None, random_seed=None):
# 确定输出TOML文件路径
if toml_path is None:
toml_path = csv_file.with_suffix('.toml')
toml_path = csv_file.with_suffix(".toml")
else:
toml_path = Path(toml_path)
# 读取CSV文件
try:
with open(csv_file, 'r', encoding='utf-8') as f:
with open(csv_file, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
rows = list(reader)
except Exception as e:
@@ -119,7 +121,7 @@ def csv_to_toml(csv_path, toml_path=None, random_seed=None):
for row in rows:
# 处理ident列为空时生成自动标识符
ident = row.get('ident', '').strip()
ident = row.get("ident", "").strip()
if not ident:
ident = f"idx_{idx_counter}"
idx_counter += 1
@@ -129,11 +131,11 @@ def csv_to_toml(csv_path, toml_path=None, random_seed=None):
# 添加所有其他列作为键值对排除ident列
for key, value in row.items():
if key == 'ident':
if key == "ident":
continue
# 确保值存在且不为空
if value is not None and str(value).strip() != '':
if value is not None and str(value).strip() != "":
# 转义特殊字符并添加引号
escaped_value = str(value).replace('"', '\\"')
toml_content.append(f'"{key}" = "{escaped_value}"')
@@ -143,35 +145,40 @@ def csv_to_toml(csv_path, toml_path=None, random_seed=None):
# 写入TOML文件
try:
with open(toml_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(toml_content).strip())
with open(toml_path, "w", encoding="utf-8") as f:
f.write("\n".join(toml_content).strip())
print(f"成功: 已生成TOML文件 - {toml_path}")
except Exception as e:
print(f"错误: 无法写入TOML文件 - {e}")
sys.exit(1)
def main():
"""主函数"""
parser = argparse.ArgumentParser(
description='将CSV文件转换为TOML格式支持随机打乱section顺序',
description="将CSV文件转换为TOML格式支持随机打乱section顺序",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
epilog="""
示例:
%(prog)s input.csv output.toml
%(prog)s input.csv # 自动生成input.toml
%(prog)s input.csv -r 42 # 使用种子42打乱顺序
%(prog)s input.csv -r 123 output.toml # 指定种子和输出路径
'''
""",
)
parser.add_argument('csv_path', help='输入的CSV文件路径')
parser.add_argument('toml_path', nargs='?', help='输出的TOML文件路径默认为CSV同名文件')
parser.add_argument('-r', '--random-seed', type=int,
help='随机种子用于打乱TOML section的顺序')
parser.add_argument("csv_path", help="输入的CSV文件路径")
parser.add_argument(
"toml_path", nargs="?", help="输出的TOML文件路径默认为CSV同名文件"
)
parser.add_argument(
"-r", "--random-seed", type=int, help="随机种子用于打乱TOML section的顺序"
)
args = parser.parse_args()
csv_to_toml(args.csv_path, args.toml_path, args.random_seed)
if __name__ == "__main__":
main()

View File

@@ -3,6 +3,7 @@ import pickle
import readline
import sys
class DebugClient:
def __init__(self, port=5555):
self.context = zmq.Context()
@@ -33,7 +34,7 @@ class DebugClient:
if not code:
continue
if code.lower() in ['exit', 'quit']:
if code.lower() in ["exit", "quit"]:
print("退出调试客户端")
break
@@ -47,6 +48,7 @@ class DebugClient:
except EOFError:
break
if __name__ == "__main__":
# 从命令行参数获取端口
port = int(sys.argv[1]) if len(sys.argv) > 1 else 5555