style: 代码格式化

This commit is contained in:
2026-04-23 00:13:57 +08:00
parent 1c05f42b61
commit 5b7b4ba443
23 changed files with 144 additions and 104 deletions

View File

@@ -20,7 +20,7 @@
- 此外, 算法模块是 "潜进" 内核 (heurams.kernel) 中的一等公民, 内核天然支持插拔各型算法
- 无需安装繁杂的插件即可分单元集完成算法快速切换与调优, 研究者可以方便地修改算法模块以便捷地进行研究与测试
- 内置 `SM-2` 简单间隔重复算法, 此算法亦用作 `Anki` 闪卡记忆软件的默认闪卡调度器
- 还内置 `NSP-0` 筛选用非间隔重复算法以便快速筛选记忆内容, `FSRS` 先进间隔重复算法作为效率更高的调度器, 与 `SM-15M` 复杂间隔重复算法(逆向工程)
- 还内置 `NSP-0` 筛选用非间隔重复算法以便快速筛选记忆内容, `FSRS` 先进间隔重复算法作为效率更高的调度器, 与 `SM-15M (移植自 sm.js 项目)` 复杂间隔重复算法(逆向工程)
- 算法模块可以标记记忆项目, 也可以动态规划每个记忆单元的记忆间隔时间表, 动态跟踪记忆反馈数据, 以优化长期记忆保留率与稳定性
- 得益于项目的模块化架构与单元集结构设计, 一个项目甚至可以与任意种算法共存并互通, 这对研究者及想探索/实验高效率方法的用户极其友好
@@ -35,7 +35,7 @@
- 软件内置多种谜题类型, 包括选择题 (MCQ), 填空题 (Cloze) 与识别题 (Recognition), 您可在同一单元应用多种, 或是选择启用
- 软件天然支持动态内容生成, 支持宏驱动的模板系统, 根据上下文乃至语言模型动态生成知识点的解析
- 在间隔重复研究尚被 SuperMemo 系列独占的时代, Wozniak 就早已表示 "如果不能理解知识, 就无需记忆它". 今天, 我们依然相信理解是记忆的基石
- 云同步与分享优化: 由于我们的记忆数据和单元集文件都是文本文件, 故可进行快速的增量同步而无需完整地上传所有文件, 并且设计天然支持分享内容的版本控制
- 云同步与分享优化: 由于我们的记忆数据和单元集文件都是文本文件, 故可进行快速的增量同步而无需完整地上传所有文件, 并且设计天然支持分享内容的版本控制, 如果您想分享单文件, 我们也支持 .zip/.tar.gz/.tar.xz 导入与导出
- 优越性能: 得益于现代的文件组织结构, 潜进能在保持高自由度的同时仅使用 python 就能达到敏捷且低占用的用户体验
### 实用用户界面

View File

@@ -65,7 +65,9 @@ class HeurAMSApp(App):
def action_go_back(self) -> None:
self.exit() # go_back 在最顶层是退出, Screen 会再次定义为返回, 键位都是 q, 免得不一样
def action_do_nothing(self) -> None: # 用来给没使用/禁用的快捷键占位, 因为 Binding 删除不了
def action_do_nothing(
self,
) -> None: # 用来给没使用/禁用的快捷键占位, 因为 Binding 删除不了
pass
# 移除烦人的 "rich traceback"

View File

@@ -22,6 +22,7 @@ class AboutScreen(Screen):
("z", "go_back", "关于"),
]
SUB_TITLE = "关于"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -33,8 +34,10 @@ class AboutScreen(Screen):
def compose(self) -> ComposeResult:
if config_var.get()['interface']['global']['show_header']:
yield Header(show_clock=config_var.get()['interface']['global']['clock_on_header'])
if config_var.get()["interface"]["global"]["show_header"]:
yield Header(
show_clock=config_var.get()["interface"]["global"]["clock_on_header"]
)
with ScrollableContainer(id="about_container"):
yield Label("[b]关于与版本信息[/b]")
@@ -115,7 +118,7 @@ Textual 框架版本: {textual_version}
import textual
return textual.__version__
except ImportError, AttributeError:
except (ImportError, AttributeError):
return "未知"
def _get_terminal_info(self) -> str:

View File

@@ -47,8 +47,10 @@ class DashboardScreen(Screen):
def compose(self) -> ComposeResult:
"""组合界面组件"""
if config_var.get()['interface']['global']['show_header']:
yield Header(show_clock=config_var.get()['interface']['global']['clock_on_header'])
if config_var.get()["interface"]["global"]["show_header"]:
yield Header(
show_clock=config_var.get()["interface"]["global"]["clock_on_header"]
)
with ScrollableContainer():
yield Horizontal( # 顶部的状态
Vertical(
@@ -76,14 +78,19 @@ class DashboardScreen(Screen):
)
yield ListView(id="repo_list", classes="repo-list") # 单元集选择
from heurams.services.attic import Attic
a = Attic('ana', {'totaltime': 0, 'openpuzzles': 0, 'puzzles_err': 0})
a = Attic("ana", {"totaltime": 0, "openpuzzles": 0, "puzzles_err": 0})
yield Label(f"版本 {version.ver} {version.stage.capitalize()}") # 版本信息
yield Label(f"{round(a.data['totaltime'], 2)} 秒内处理了 {a.data['openpuzzles']} 个谜题, 正确率{'无法求解' if not a.data['openpuzzles'] else ' ' + str(round(100 * (1 - a.data['puzzles_err']/a.data['openpuzzles']), 2)) + '%'}, 平均速度{'无法求解' if not a.data['totaltime'] else ' ' + str(round(a.data['openpuzzles']/a.data['totaltime'], 2)) + '个/s'}", id='analysis') # 版本信息
yield Label(
f"{round(a.data['totaltime'], 2)} 秒内处理了 {a.data['openpuzzles']} 个谜题, 正确率{'无法求解' if not a.data['openpuzzles'] else ' ' + str(round(100 * (1 - a.data['puzzles_err']/a.data['openpuzzles']), 2)) + '%'}, 平均速度{'无法求解' if not a.data['totaltime'] else ' ' + str(round(a.data['openpuzzles']/a.data['totaltime'], 2)) + '个/s'}",
id="analysis",
) # 版本信息
yield Footer()
@on(events.ScreenResume)
def post_active(self, event):
from heurams.interface import shim
shim.set_term_title(f"{self.app.TITLE} - {self.SUB_TITLE}")
# https://github.com/Textualize/textual/discussions/4268
# self.refresh(recompose=True) 此函数有问题且官方不管 而且性能低
@@ -112,10 +119,14 @@ class DashboardScreen(Screen):
}
repo.preview = {
"review": 0,
"new": repo.config["scheduled_num"], # TODO: 考虑之后在这里加点运算避免 SM-2 积压, 但现在需要的是直观!
"new": repo.config[
"scheduled_num"
], # TODO: 考虑之后在这里加点运算避免 SM-2 积压, 但现在需要的是直观!
}
initial_time = float("inf")
for i in range(repo.data_length): # TODO: 增加异步性能优化, 但是学习数据属实规模小...
for i in range(
repo.data_length
): # TODO: 增加异步性能优化, 但是学习数据属实规模小...
e = pt.Electron.from_data(
electronic_data=repo.electronic_data_lict[i],
algo_name=repo.config["algorithm"],
@@ -159,7 +170,7 @@ class DashboardScreen(Screen):
return
for r in self.repos:
self.repolink[str(r.manifest['package'])] = r # 用于规避 ctype id 对象还原
self.repolink[str(r.manifest["package"])] = r # 用于规避 ctype id 对象还原
# NOTE: 上一行不要使用 id(), id 可能被重用!
list_item = ListItem(
*[Label(line) for line in r.prompt.splitlines()],
@@ -202,5 +213,6 @@ class DashboardScreen(Screen):
logger.debug(f"event.button.id: {event.button.id}")
if event.button.id.startswith("slaunch_repo_"): # type: ignore
from .preparation import launch
launch(repo=self.repolink[event.button.id.removeprefix("slaunch_repo_")], app=self.app, scheduled_num=-1) # type: ignore
# TODO: 这样启动的记忆实例的状态机无法绑定到 PreparationScreen 中

View File

@@ -57,8 +57,10 @@ class FavoriteManagerScreen(Screen):
def compose(self) -> ComposeResult:
"""组合界面组件"""
if config_var.get()['interface']['global']['show_header']:
yield Header(show_clock=config_var.get()['interface']['global']['clock_on_header'])
if config_var.get()["interface"]["global"]["show_header"]:
yield Header(
show_clock=config_var.get()["interface"]["global"]["clock_on_header"]
)
with ScrollableContainer(id="favorites-container"):
if not self.favorites:
yield Label("暂无收藏", classes="empty-label")

View File

@@ -20,6 +20,7 @@ from .. import shim
logger = get_logger(__name__)
class MemScreen(Screen):
BINDINGS = [
("q", "go_back", "返回"),
@@ -59,14 +60,18 @@ class MemScreen(Screen):
@on(events.ScreenResume)
def post_active(self, event):
from heurams.interface import shim
shim.set_term_title(f"{self.app.TITLE} - {self.SUB_TITLE}")
def compose(self) -> ComposeResult:
from heurams.services.attic import Attic
a = Attic('ana', {'openqueue': 0})
a.data['openqueue'] += 1
if config_var.get()['interface']['global']['show_header']:
yield Header(show_clock=config_var.get()['interface']['global']['clock_on_header'])
a = Attic("ana", {"openqueue": 0})
a.data["openqueue"] += 1
if config_var.get()["interface"]["global"]["show_header"]:
yield Header(
show_clock=config_var.get()["interface"]["global"]["clock_on_header"]
)
with ScrollableContainer():
yield Label(self._get_progress_text(), id="head_stat")
yield ScrollableContainer(id="puzzle_container")
@@ -81,8 +86,9 @@ class MemScreen(Screen):
self.expander = self.procession.get_expander()
from heurams.services.attic import Attic
import time
a = Attic('ana', {'last': time.time()})
a.data['last'] = time.time()
a = Attic("ana", {"last": time.time()})
a.data["last"] = time.time()
self.mount_puzzle()
self.update_display()
@@ -122,8 +128,8 @@ class MemScreen(Screen):
def mount_finished_widget(self):
"""挂载已完成组件"""
a = Attic('ana', {'finished': 0})
a.data['finished'] += 1
a = Attic("ana", {"finished": 0})
a.data["finished"] += 1
container = self.query_one("#puzzle_container")
for i in container.children:
i.remove()
@@ -176,12 +182,14 @@ class MemScreen(Screen):
self.forward_atom(self.expander.get_quality())
self.update_state()
from heurams.services.attic import Attic
a = Attic('ana', {'openpuzzles': 0})
a = Attic('ana', {'totaltime': 0})
a.data['openpuzzles'] += 1
a = Attic("ana", {"openpuzzles": 0})
a = Attic("ana", {"totaltime": 0})
a.data["openpuzzles"] += 1
import time
a.data['totaltime'] += time.time() - a.data['last']
a.data['last'] = time.time()
a.data["totaltime"] += time.time() - a.data["last"]
a.data["last"] = time.time()
self.mount_puzzle()
self.update_display()
@@ -201,8 +209,8 @@ class MemScreen(Screen):
logger.debug(f"Quality: {quality}")
self.atom_reporter(quality)
if quality <= 3:
a = Attic('ana', {'puzzles_err': 0})
a.data['puzzles_err'] += 1
a = Attic("ana", {"puzzles_err": 0})
a.data["puzzles_err"] += 1
self.procession.append()
self.update_state() # 刷新状态
self.procession.forward(1)
@@ -261,7 +269,8 @@ class MemScreen(Screen):
def action_resume_mark(self):
from heurams.services.attic import Attic
import time
a = Attic('ana')
l = a.data['last']
a.data['last'] = time.time()
a = Attic("ana")
l = a.data["last"]
a.data["last"] = time.time()
self.app.notify(f"时间恢复已修正: {l} -> {a.data['last']}")

View File

@@ -1,4 +1,3 @@
from textual.app import ComposeResult
from textual.containers import Grid
from textual.screen import ModalScreen

View File

@@ -112,8 +112,10 @@ class PrecachingScreen(Screen):
def compose(self) -> ComposeResult:
if config_var.get()['interface']['global']['show_header']:
yield Header(show_clock=config_var.get()['interface']['global']['clock_on_header'])
if config_var.get()["interface"]["global"]["show_header"]:
yield Header(
show_clock=config_var.get()["interface"]["global"]["clock_on_header"]
)
with ScrollableContainer(id="precache_container"):
yield Label("[b]音频预缓存[/b]", classes="title-label")
with Container():
@@ -314,7 +316,6 @@ class PrecachingScreen(Screen):
try:
import shutil
shutil.rmtree(cache_dir, ignore_errors=True)
self.update_status("已清空", "音频缓存已清空", 0)
self._update_cache_display() # 更新缓存统计显示

View File

@@ -52,10 +52,13 @@ class PreparationScreen(Screen):
def compose(self) -> ComposeResult:
from heurams.services.attic import Attic
a = Attic('ana', {'openpre': 0})
a.data['openpre'] += 1
if config_var.get()['interface']['global']['show_header']:
yield Header(show_clock=config_var.get()['interface']['global']['clock_on_header'])
a = Attic("ana", {"openpre": 0})
a.data["openpre"] += 1
if config_var.get()["interface"]["global"]["show_header"]:
yield Header(
show_clock=config_var.get()["interface"]["global"]["clock_on_header"]
)
with ScrollableContainer(id="main_container"):
yield Markdown(
f"**准备就绪**: `{self.repo.manifest['title']}`\n", id="title"

View File

@@ -1,6 +1,5 @@
"""设置页面"""
from textual.app import ComposeResult
from textual.containers import ScrollableContainer, Horizontal
from textual.screen import Screen
@@ -47,12 +46,15 @@ class SettingScreen(Screen):
@on(events.ScreenResume)
def post_active(self, event):
from heurams.interface import shim
shim.set_term_title(f"{self.app.TITLE} - {self.SUB_TITLE}")
def compose(self) -> ComposeResult:
"""组合界面组件"""
if config_var.get()['interface']['global']['show_header']:
yield Header(show_clock=config_var.get()['interface']['global']['clock_on_header'])
if config_var.get()["interface"]["global"]["show_header"]:
yield Header(
show_clock=config_var.get()["interface"]["global"]["clock_on_header"]
)
with ScrollableContainer():
yield Label("[b]设置页面[/b]")
for i in config_var.get():
@@ -61,7 +63,8 @@ class SettingScreen(Screen):
a = self._get_subcfg(f"{i}")
if a:
yield Collapsible(
*a, title=i + f'\n[d]{config_var.get().get(f"_{i}_desc", "")}[/d]'
*a,
title=i + f'\n[d]{config_var.get().get(f"_{i}_desc", "")}[/d]',
)
yield Label(
"退出页面时, 所作的更改会立即保存, 但仍建议重启软件以确保新的配置得到应用",
@@ -113,7 +116,7 @@ class SettingScreen(Screen):
prompt=f'{parent.get(f"{i}", "")}',
id=domize(f"{parent_epath}.{i}"),
),
classes="setting-item"
classes="setting-item",
)
)
elif isinstance(parent[f"_{i}_candidate"], list):
@@ -125,7 +128,7 @@ class SettingScreen(Screen):
prompt=f'{parent.get(f"{i}", "")}',
id=domize(f"{parent_epath}.{i}"),
),
classes="setting-item"
classes="setting-item",
)
)
else:
@@ -139,7 +142,7 @@ class SettingScreen(Screen):
type="number",
id=domize(f"{parent_epath}.{i}"),
),
classes="setting-item"
classes="setting-item",
)
)
elif isinstance(parent[i], str):
@@ -152,7 +155,7 @@ class SettingScreen(Screen):
type="text",
id=domize(f"{parent_epath}.{i}"),
),
classes="setting-item"
classes="setting-item",
)
)
elif isinstance(parent[i], bool):
@@ -160,10 +163,11 @@ class SettingScreen(Screen):
Horizontal(
Label(i + f'\n[d]{parent.get(f"_{i}_desc", "")}[/d]'),
Switch(
value=parent[i], id=domize(f"{parent_epath}.{i}"),
value=parent[i],
id=domize(f"{parent_epath}.{i}"),
classes="setting-switch",
),
classes="setting-item"
classes="setting-item",
)
)
elif isinstance(parent[i], int):
@@ -176,7 +180,7 @@ class SettingScreen(Screen):
type="integer",
id=domize(f"{parent_epath}.{i}"),
),
classes="setting-item"
classes="setting-item",
)
)
elif isinstance(parent[i], list):

View File

@@ -34,8 +34,10 @@ class SyncScreen(Screen):
def compose(self) -> ComposeResult:
if config_var.get()['interface']['global']['show_header']:
yield Header(show_clock=config_var.get()['interface']['global']['clock_on_header'])
if config_var.get()["interface"]["global"]["show_header"]:
yield Header(
show_clock=config_var.get()["interface"]["global"]["clock_on_header"]
)
with ScrollableContainer(id="sync_container"):
# 标题和连接状态
yield Static("同步工具", classes="title")

View File

@@ -15,7 +15,7 @@ puzzle2widget = {
def set_term_title(title):
if not config_var.get()['interface']['global']['change_window_title']:
if not config_var.get()["interface"]["global"]["change_window_title"]:
return
system = platform.system()
if system == "Windows":

View File

@@ -79,14 +79,14 @@ class ClozePuzzle(BasePuzzleWidget):
self.hashmap[h] = i
btnid = f"sel000-{h}"
logger.debug(f"建立按钮 {btnid}")
self.btn_shortcuts[f'{c}'] = btnid
yield Button(f'[{c}] {i}', id=f"{btnid}")
self.btn_shortcuts[f"{c}"] = btnid
yield Button(f"[{c}] {i}", id=f"{btnid}")
s.focus()
yield Button("退格", id="delete")
self.btn_shortcuts[f'0'] = 'delete'
self.btn_shortcuts[f'backspace'] = 'delete'
self.btn_shortcuts[f'delete'] = 'delete'
self.btn_shortcuts[f"0"] = "delete"
self.btn_shortcuts[f"backspace"] = "delete"
self.btn_shortcuts[f"delete"] = "delete"
def listprint(self, lst):
s = ""
@@ -133,5 +133,5 @@ class ClozePuzzle(BasePuzzleWidget):
self.notify(event.key)
if event.key in self.btn_shortcuts:
btn_id = self.btn_shortcuts.get(event.key)
btn_id = '#' + btn_id
btn_id = "#" + btn_id
self.query_one(btn_id, Button).press()

View File

@@ -79,21 +79,21 @@ class MCQPuzzle(BasePuzzleWidget):
c = 0
with ScrollableContainer(id="btn-container") as s:
for i in current_options:
if i in [' ', '']:
if i in [" ", ""]:
continue
c += 1
h = str(hash(i))
self.hashmap[h] = i
btnid = f"sel{str(self.cursor).zfill(3)}-{h}"
logger.debug(f"建立按钮 {btnid}")
self.btn_shortcuts[f'{c}'] = f'{btnid}'
yield Button(f'[{c}] ' + i, id=f"{btnid}")
self.btn_shortcuts[f"{c}"] = f"{btnid}"
yield Button(f"[{c}] " + i, id=f"{btnid}")
s.focus()
yield Button("退格", id="delete")
self.btn_shortcuts['0'] = f'delete'
self.btn_shortcuts['delete'] = f'delete'
self.btn_shortcuts['backspace'] = f'delete'
self.btn_shortcuts["0"] = f"delete"
self.btn_shortcuts["delete"] = f"delete"
self.btn_shortcuts["backspace"] = f"delete"
def update_display(self, error=0):
# 更新预览标签
@@ -162,7 +162,7 @@ class MCQPuzzle(BasePuzzleWidget):
if current_question_index < len(self.puzzle.options):
current_options = self.puzzle.options[current_question_index]
for option in current_options:
if option in ['', ' ']:
if option in ["", " "]:
continue
c += 1
button_id = f"sel{str(self.cursor).zfill(3)}-{hash(option)}"
@@ -182,5 +182,5 @@ class MCQPuzzle(BasePuzzleWidget):
self.notify(event.key)
if event.key in self.btn_shortcuts:
btn_id = self.btn_shortcuts.get(event.key)
btn_id = '#' + btn_id
btn_id = "#" + btn_id
self.query_one(btn_id, Button).press()

View File

@@ -49,7 +49,7 @@ class Recognition(BasePuzzleWidget):
def compose(self):
from heurams.context import config_var
autovoice = config_var.get()["interface"]["widgets"]['recognition']["autovoice"]
autovoice = config_var.get()["interface"]["widgets"]["recognition"]["autovoice"]
if autovoice:
self.screen.action_play_voice() # type: ignore
cfg: RecognitionConfig = self.atom.registry["nucleon"]["puzzles"][self.alia]

View File

@@ -1,4 +1,3 @@
from .atom import Atom
from .electron import Electron
from .nucleon import Nucleon

View File

@@ -1,4 +1,3 @@
from heurams.services.logger import get_logger
from .base import BasePuzzle

View File

@@ -136,7 +136,6 @@ class Router(Machine):
def __repr__(self, style="pipe", ends="\n"):
from tabulate import tabulate as tabu
lst = [
{
"Type": "Router",

View File

@@ -6,31 +6,37 @@ from pathlib import Path
import atexit
from heurams.services import timer
from heurams.services.exceptions import WTFException
logger = get_logger(__name__)
def singleton(cls):
instances = {}
def get_instance(ident, default):
key = ident
if key not in instances:
instances[key] = cls(ident)
instances[key].patch_dict(default)
return instances[key]
return get_instance
atticdir = Path(config_var.get()['global']['paths']['misc']) / 'attics'
atticdir = Path(config_var.get()["global"]["paths"]["misc"]) / "attics"
atticdir.mkdir(parents=True, exist_ok=True)
@singleton
class Attic:
def __init__(self, ident, default:dict={}):
def __init__(self, ident, default: dict = {}):
self.ident = ident
self.ident = self.ident.replace('<DAYSTAMP>', str(timer.get_daystamp()))
self.ident = self.ident.replace('<TIMESTAMP>', str(timer.get_timestamp()))
if '<' in ident or '>' in ident:
self.ident = self.ident.replace("<DAYSTAMP>", str(timer.get_daystamp()))
self.ident = self.ident.replace("<TIMESTAMP>", str(timer.get_timestamp()))
if "<" in ident or ">" in ident:
raise WTFException
#self.ident = get_md5(self.ident)
self.pklpath = atticdir / f'{self.ident}.pkl'
# self.ident = get_md5(self.ident)
self.pklpath = atticdir / f"{self.ident}.pkl"
atexit.register(self.save)
self.data = default
if self.pklpath.exists():
@@ -45,9 +51,9 @@ class Attic:
self.data.update({k: v for k, v in dct.items() if k not in self.data})
def save(self):
with open(atticdir / f'{self.ident}.pkl', 'wb') as f:
with open(atticdir / f"{self.ident}.pkl", "wb") as f:
pkl.dump(self.data, f)
def load(self):
with open(atticdir / f'{self.ident}.pkl', 'rb') as f:
with open(atticdir / f"{self.ident}.pkl", "rb") as f:
self.data.update(dict(pkl.load(f)))

View File

@@ -1,4 +1,2 @@
class WTFException(Exception):
pass

View File

@@ -13,8 +13,8 @@ def get_md5(text):
def hash(text):
#logger.debug(f"计算MD5-时间复合哈希, 输入`{text}`")
#result = hashlib.md5(f"{text}{random.randint(0,1000)}".encode("utf-8")).hexdigest()
#logger.debug("哈希结果: %s...", result[:8])
#return result
# logger.debug(f"计算MD5-时间复合哈希, 输入`{text}`")
# result = hashlib.md5(f"{text}{random.randint(0,1000)}".encode("utf-8")).hexdigest()
# logger.debug("哈希结果: %s...", result[:8])
# return result
return get_md5(text)

View File

@@ -1,3 +1,5 @@
"""会话模块"""
class Session:
pass