2 Commits

Author SHA1 Message Date
pluv 6a3fa75e13 style: 规范化部分模块注释 2026-06-04 08:08:20 +08:00
pluv b31c045aa5 docs: 修改文档 2026-05-19 00:15:08 +08:00
36 changed files with 594 additions and 104 deletions
+52 -26
View File
@@ -8,19 +8,17 @@
[详细介绍](INTRODUCTION.md) [屏幕截图](SCREENSHOTS.md)
<p align="left">
<a href="https://github.com/pluvium27/HeurAMS" target="_blank" rel="noopener noreferrer"><img src="https://img.shields.io/badge/GitHub-fafafa?style=for-the-badge&logo=github&logoColor=181717" alt="GitHub" /></a>
<a href="https://invent.kde.org/pluv/HeurAMS" target="_blank" rel="noopener noreferrer"><img src="https://img.shields.io/badge/KDE_Invent-1D99F3?style=for-the-badge&logo=kde&logoColor=white" alt="KDE Invent" /></a>
<a href="https://gitee.com/pluv/HeurAMS" target="_blank" rel="noopener noreferrer"><img src="https://img.shields.io/badge/Gitee-C71D23?style=for-the-badge&logo=gitee&logoColor=white" alt="Gitee" /></a>
<a href="https://git.pluv27.top/pluv/HeurAMS" target="_blank" rel="noopener noreferrer"><img src="https://img.shields.io/badge/git.pluv27.top-609926?style=for-the-badge&logo=gitea&logoColor=white" alt="git.pluv27.top" /></a>
<a href="https://github.com/pluvium27/HeurAMS" target="_blank" rel="noopener noreferrer"><img src="https://img.shields.io/badge/GitHub-fafafa?style=for-the-badge&logo=github&logoColor=181717" alt="GitHub" /></a><a href="https://invent.kde.org/pluv/HeurAMS" target="_blank" rel="noopener noreferrer"><img src="https://img.shields.io/badge/KDE_Invent-1D99F3?style=for-the-badge&logo=kde&logoColor=white" alt="KDE Invent" /></a><a href="https://gitee.com/pluv/HeurAMS" target="_blank" rel="noopener noreferrer"><img src="https://img.shields.io/badge/Gitee-C71D23?style=for-the-badge&logo=gitee&logoColor=white" alt="Gitee" /></a><a href="https://git.pluv27.top/pluv/HeurAMS" target="_blank" rel="noopener noreferrer"><img src="https://img.shields.io/badge/git.pluv27.top-609926?style=for-the-badge&logo=gitea&logoColor=white" alt="git.pluv27.top" /></a>
</p>
## 快速开始
### 从包管理器安装
### 安装软件
潜进 (包名是 `heurams`) 处于早期开发考虑, 尚未上架 PyPI, 但您可以用 pip 支持的 git 协议安装稳定版和开发版本, 这要求您的电脑上安装了 python 环境 (建议 3.12.13 及之后版本).
#### 从包管理器安装
#### 面向用户的安装
潜进 (包名是 `heurams`) 处于早期开发考虑, 尚未上架 PyPI.
但可以用 pip 从仓库安装稳定版和开发版本, 这要求设备上安装了 python 环境 (建议 3.12.13 及之后版本).
从稳定的 `master` 分支安装, 并安装适用于用户体验的可选依赖(推荐):
@@ -34,35 +32,63 @@ pip install --upgrade 'heurams[basic] @ https://git.pluv27.top/pluv/HeurAMS/arch
pip install --force-reinstall --no-deps 'heurams[basic] @ https://git.pluv27.top/pluv/HeurAMS/archive/dev.zip'
```
安装适用于一般计算机的通用音频模块(基于 playsound3):\
安装适用于一般计算机的通用音频模块 (基于 playsound3):\
(此项不适用于 termux 环境, termux 的音频支持是内建的)
```
pip install --upgrade 'heurams[audio-playsound] @ https://git.pluv27.top/pluv/HeurAMS/archive/master.zip'
```
#### 面向开发者的安装
> 您也可以从 `refactor/...` 等特定分支安装以测试某项更改
> [!CAUTION]
> 对于部分 Linux 发行版和 Android Termux 用户:\
> 您需要先行安装 `cmake` 和 `libzmq` 才能正确安装项目的 `zmq` 依赖.\
> 例如在 termux 上先运行 `pkg install cmake clang libzmq`.\
> 项目功能本身不依赖它, 但需要该依赖用于启动可选的调试服务器.
[依赖分组说明](INTRODUCTION.md#包依赖组说明)
`dev` 分支进行基于 git 的可编辑安装, 并安装全部可选依赖(推荐):
#### 从源码安装
```
pip install --force-reinstall --no-deps 'heurams[all] @ https://git.pluv27.top/pluv/HeurAMS/archive/dev.zip'
我们提供原生 python 和 uv 两种源码安装方式.\
详见[贡献指南 - 设置开发环境](CONTRIBUTING.md#设置开发环境).
### 使用软件
在终端中运行 `heurams`, 您会看到一系列帮助信息, 例如:
```plain
~ $ heurams
Usage: heurams [OPTIONS] COMMAND [ARGS]...
HeurAMS 0.5.1 - 启发式辅助记忆调度器
Options:
-v, --version Show the version and exit.
-h, --help Show this message and exit.
Commands:
help 显示此帮助信息
tui 启动内置基本用户界面 (TUI)
version 输出版本信息
```
> 您也可以从 `refactor/...` 等特定分支安装
可以通过键入 `heurams tui` 启动基本用户界面, 例如:
[依赖组说明](INTRODUCTION.md#包依赖组说明)
```plain
~ $ heurams tui
欢迎使用基本用户界面!
加载配置与上下文... 已完成! (耗时: 33ms)
加载用户界面框架... 已完成! (耗时: 169ms)
加载用户界面布局... 已完成! (耗时: 165ms)
组件目录: <软件包所在目录>
工作目录: <运行目录, 将在此目录下建立 ./data 文件夹>
前置工作共计耗时: 142ms
### 从源码安装
(此时您的终端将转为呈现美观的 TUI 基本用户界面)
```
我们提供原生 python 和 uv 两种安装方式.\
详见[贡献指南](CONTRIBUTING.md).
通过键入 `heurams -v` 查看版本:
```
~ $ heurams -v
HeurAMS 0.5.1 stable (fulcrum/支点), Linux
```
## 常见问题 (FAQ)
@@ -96,19 +122,19 @@ HeurAMS 项目标识如下, 文件(位图和矢量图)位于 `./src/heurams/asse
### 项目本身
本项目基于 AGPL-3.0 许可证开放源代码, 并有一个豁免本机 API 调用的附加条款, 较标准 AGPL-3.0 更松.
本项目基于 AGPL-3.0 许可证开放源代码, 并有一个豁免本机 API 调用的附加条款, 较标准 AGPL-3.0 更松.
详见根目录下 [LICENSE](LICENSE) 文件.
### 第三方代码
项目在 `src/heurams/vendor/` 目录下嵌入或在其他位置接使用了以下第三方代码(可能有修改):
项目在 `src/heurams/vendor/` 目录下嵌入或在其他位置接使用了以下第三方代码或其衍生作品 (可能有修改):
#### SM.js (slaypni)
#### SM.js
- 上游版本: commit `6e3bb4a` (2015年2月4日上游已停止维护)
- 引用方式: 将 coffeescript 重写为 python 并间接引用, 数学原理一致; 并对重写后代码进行逻辑, 性能与标准化 API 改进
- 位置: `src/heurams/kernel/algorithms/sm15m*.py`
- 位置: `src/heurams/kernel/algorithms/sm15m.py`
- 原项目: [SM.js](https://github.com/slaypni/SM-15)
- 原版权: Copyright (c) 2014 Kazuaki Tanida
- 原许可证: MIT License
+1
View File
@@ -4,6 +4,7 @@
- Textual 基本用户界面 (heurams.interface): 基于 Python Textual 框架构建的程序库内置跨平台 TUI 界面, 支持触屏、鼠标、键盘多操作模式, 是当前开箱即用的默认前端.
- KiriMemo (org.kde.kirimemo): 基于 KDE Kirigami 框架的现代跨平台前端, 使用 C++ 和 QML 构建, 通过 `PyOtherSide` 直接复用 Python 内核, 为多种平台提供原生体验 (尚未稳定).
<!--- ArkMemo (top.pluv27.arkmemo): 基于 ArkUI 的现代移动设备前端, 使用 ArkTS 构建, 通过 API 调用 Python 内核, 为 Android, HarmonyOS, iOS 平台提供原生体验 (尚未稳定)-->
欢迎为现有前端贡献代码, 或开发您自己的前端.\
详见[贡献指南](CONTRIBUTING.md#%E6%96%B0%E7%9A%84%E7%94%A8%E6%88%B7%E7%95%8C%E9%9D%A2%E5%89%8D%E7%AB%AF).
+1 -1
View File
@@ -68,8 +68,8 @@ API 版本代号: `{version.codename.capitalize()}`
感谢以下人士与团体, 他们的算法与理论构成了此软件现有算法的基石:
- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SM-2 算法与 SM-15 算法理论
- [Jarrett Ye](https://github.com/L-M-Sherlock): FSRS 算法与间隔重复理论文献参考
- [Kazuaki Tanida](https://github.com/slaypni): SM-15 算法的 CoffeeScript 逆向实现
- [Thoughts Memo](https://www.zhihu.com/people/L.M.Sherlock): 间隔重复文献参考
- [Open Spaced Repetition](https://github.com/open-spaced-repetition): FSRS 算法底层实现
# 运行环境信息
+2 -2
View File
@@ -128,7 +128,7 @@ class FavoriteManagerScreen(Screen):
return ListItem(container)
def _get_repo_info(self, repo_path: str, fav: FavoriteItem) -> Optional[dict]:
"""获取仓库信息标题、原子内容预览"""
"""获取仓库信息 (标题、原子内容预览) """
try:
data_repo = Path(config_var.get()["global"]["paths"]["data"]) / "repo"
repo_dir = data_repo / repo_path
@@ -200,7 +200,7 @@ class FavoriteManagerScreen(Screen):
# 重新组合
if not self.favorites:
container.mount(Label("暂无收藏", classes="empty-label"))
container.mount(Static("使用 * 键在记忆界面中添加收藏"))
container.mount(Static("使用 * 键在记忆界面中添加收藏. "))
else:
container.mount(
Label(f"{len(self.favorites)} 个收藏项", classes="count-label")
+3 -3
View File
@@ -240,7 +240,7 @@ class MemScreen(Screen):
self.rating = 3
def _get_repo_rel_path(self) -> str:
"""获取仓库相对路径相对于 data/repo"""
"""获取仓库相对路径 (相对于 data/repo) """
if self.repo is None:
return ""
# self.repo.source 是 Path 对象, 指向仓库目录
@@ -250,7 +250,7 @@ class MemScreen(Screen):
rel_path = repo_full_path.relative_to(data_repo_path)
return str(rel_path)
except ValueError:
# 如果不在 data/repo 下, 则返回完整路径字符串形式
# 如果不在 data/repo 下, 则返回完整路径 (字符串形式)
return str(repo_full_path)
def _is_current_atom_favorited(self) -> bool:
@@ -273,7 +273,7 @@ class MemScreen(Screen):
else:
favorite_manager.add(repo_path, ident)
self.app.notify(f"已收藏:{ident}", severity="information")
# 更新显示如果需要
# 更新显示 (如果需要)
self.update_display()
def action_block_prompt(self):
+2 -2
View File
@@ -78,7 +78,7 @@ class PrecachingScreen(Screen):
for repo in repos:
try:
total += len(repo.ident_index)
except:
except (AttributeError, TypeError):
continue
return total
@@ -278,7 +278,7 @@ class PrecachingScreen(Screen):
repo.nucleonic_data_lict.get_itemic_unit(i)
)
)
except:
except (KeyError, TypeError, AttributeError):
continue
self.total = len(nucleon_list)
return self.precache_by_list(nucleon_list)
+2 -2
View File
@@ -190,7 +190,7 @@ class SyncScreen(Screen):
self.run_worker(self.perform_sync, thread=True)
def perform_sync(self):
"""执行同步任务在后台线程中运行"""
"""执行同步任务 (在后台线程中运行) """
worker = get_current_worker()
try:
@@ -234,7 +234,7 @@ class SyncScreen(Screen):
is_error=True,
)
# 同步 orbital 目录如果存在
# 同步 orbital 目录 (如果存在)
orbital_dir = pathlib.Path(paths.get("orbital_dir", "./data/orbital"))
if orbital_dir.exists():
self.log_message(f"同步 orbital 目录: {orbital_dir}")
@@ -1,3 +1,8 @@
"""算法模块
自动发现并加载所有算法实现, 提供统一的算法注册表.
"""
import importlib
import pkgutil
from pathlib import Path
+58 -5
View File
@@ -9,6 +9,17 @@ _registry: dict[str, type["BaseAlgorithm"]] = {}
class BaseAlgorithm:
"""间隔重复算法基类
定义所有调度算法必须实现的接口. 子类通过继承此类并设置 algo_name
自动注册到全局算法注册表.
Attributes:
algo_name: 算法的唯一标识名称, 用于注册和查找
desc: 算法的简短描述
defaults: 算法数据字典的默认值模板
"""
algo_name = "BaseAlgorithm"
desc = "算法基类"
@@ -18,6 +29,11 @@ class BaseAlgorithm:
@classmethod
def get_registry(cls) -> dict[str, type["BaseAlgorithm"]]:
"""获取所有已注册算法的字典
Returns:
键为 algo_name, 值为算法类的字典
"""
return dict(_registry)
class AlgodataDict(TypedDict):
@@ -43,7 +59,15 @@ class BaseAlgorithm:
def revisor(
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
) -> None:
"""迭代记忆数据"""
"""迭代记忆数据
根据用户反馈更新算法状态, 计算下一次复习时间.
Args:
algodata: 算法数据字典, 包含该算法的所有状态参数
feedback: 用户反馈评分 (0-5), -1 表示跳过更新
is_new_activation: 是否为首次激活, 首次激活时重置部分参数
"""
logger.debug(
"BaseAlgorithm.revisor 被调用, algodata keys: %s, feedback: %d, is_new_activation: %s",
list(algodata.keys()) if algodata else [],
@@ -53,7 +77,14 @@ class BaseAlgorithm:
@classmethod
def is_due(cls, algodata) -> int:
"""是否应该复习"""
"""判断是否应该复习
Args:
algodata: 算法数据字典
Returns:
1 表示应该复习, 0 表示不需要
"""
logger.debug(
"BaseAlgorithm.is_due 被调用, algodata keys: %s",
list(algodata.keys()) if algodata else [],
@@ -62,7 +93,14 @@ class BaseAlgorithm:
@classmethod
def get_rating(cls, algodata) -> str:
"""获取评分信息"""
"""获取当前记忆状态的评分信息
Args:
algodata: 算法数据字典
Returns:
评分的字符串表示, 如 efactor 值
"""
logger.debug(
"BaseAlgorithm.rate 被调用, algodata keys: %s",
list(algodata.keys()) if algodata else [],
@@ -71,7 +109,14 @@ class BaseAlgorithm:
@classmethod
def nextdate(cls, algodata) -> int:
"""获取下一次记忆时间戳"""
"""获取下一次复习的时间戳
Args:
algodata: 算法数据字典
Returns:
下次复习的日期戳 (天数), -1 表示无计划
"""
logger.debug(
"BaseAlgorithm.nextdate 被调用, algodata keys: %s",
list(algodata.keys()) if algodata else [],
@@ -80,8 +125,16 @@ class BaseAlgorithm:
@classmethod
def check_integrity(cls, algodata):
"""校验算法数据完整性
Args:
algodata: 算法数据字典
Returns:
1 表示数据完整, 0 表示数据缺失或格式错误
"""
try:
cls.AlgodataDict(**algodata[cls.algo_name])
return 1
except:
except (KeyError, TypeError, ValueError):
return 0
+7 -7
View File
@@ -61,7 +61,7 @@ def _feedback_to_rating(feedback: int) -> Rating:
def _datetime_to_daystamp(dt: datetime) -> int:
"""将 datetime 转换为天数戳从 1970-01-01"""
"""将 datetime 转换为天数戳 (从 1970-01-01) """
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
delta = dt - epoch
return delta.days
@@ -81,7 +81,7 @@ class FSRSAlgorithm(BaseAlgorithm):
# FSRS 特有字段
fsrs_state: int # State 枚举值: 1=Learning, 2=Review, 3=Relearning
fsrs_step: int # 当前学习步进索引, -1 表示 None (Review 状态)
fsrs_stability: float # 稳定性(秒), 0.0 表示尚未计算
fsrs_stability: float # 稳定性 (秒) , 0.0 表示尚未计算
fsrs_difficulty: float # 难度 [1.0, 10.0], 0.0 表示尚未计算
# 标准 BaseAlgorithm 兼容字段
real_rept: int
@@ -116,11 +116,11 @@ class FSRSAlgorithm(BaseAlgorithm):
# State: int → IntEnum
card.state = data.get("fsrs_state", 1)
# Step: -1 表示 NoneReview 状态下的 card.step 为 None
# Step: -1 表示 None (Review 状态下的 card.step 为 None)
step = data.get("fsrs_step", -1)
card.step = None if step == -1 else step
# Stability: 0.0 表示尚未计算新卡片
# Stability: 0.0 表示尚未计算 (新卡片)
stability = data.get("fsrs_stability", 0.0)
card.stability = None if stability == 0.0 else stability
@@ -170,11 +170,11 @@ class FSRSAlgorithm(BaseAlgorithm):
):
"""FSRS 算法迭代决策机制实现
将 feedback (0-5) 映射为 FSRS Rating 后交由 py-fsrs 调度器处理
将 feedback (0-5) 映射为 FSRS Rating 后交由 py-fsrs 调度器处理.
Args:
feedback (int): 0-5 的记忆保留率量化参数
is_new_activation: 是否为全新激活重置为初始状态
is_new_activation: 是否为全新激活 (重置为初始状态)
"""
logger.debug(
"FSRS.revisor 开始, feedback: %d, is_new_activation: %s",
@@ -201,7 +201,7 @@ class FSRSAlgorithm(BaseAlgorithm):
cls._card_to_algodata(card, algodata)
# real_rept: 总复习次数
algodata[cls.algo_name]["real_rept"] += 1
# rept: 成功回忆次数feedback ≥ 3 视为成功
# rept: 成功回忆次数 (feedback ≥ 3 视为成功)
if feedback >= 3:
algodata[cls.algo_name]["rept"] += 1
+42 -6
View File
@@ -9,6 +9,16 @@ logger = get_logger(__name__)
class NSP0Algorithm(BaseAlgorithm):
"""NSP-0 非间隔重复调度器
快速筛选用算法, 对低分项目保持每日复习, 高分项目标记为已掌握.
适用于需要快速过滤大量材料的场景.
Attributes:
algo_name: "NSP-0"
desc: 快速筛选用非间隔重复调度器
"""
algo_name = "NSP-0"
desc = "快速筛选用非间隔重复调度器"
@@ -38,11 +48,13 @@ class NSP0Algorithm(BaseAlgorithm):
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
):
"""NSP-0 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
低分 (feedback<=3) 设置间隔为 1 天, 高分标记为已掌握 (间隔无限).
Args:
quality (int): 记忆保留率量化参数
algodata: 算法数据字典
feedback: 记忆保留率量化参数 (0-5), -1 表示跳过
is_new_activation: 是否为首次激活
"""
logger.debug(
"NSP0.revisor 开始, feedback: %d, is_new_activation: %s",
@@ -73,6 +85,14 @@ class NSP0Algorithm(BaseAlgorithm):
@classmethod
def is_due(cls, algodata):
"""判断是否应该复习
Args:
algodata: 算法数据字典
Returns:
True 表示到期, False 表示未到期
"""
result = algodata[cls.algo_name]["next_date"] <= timer.get_daystamp()
logger.debug(
"NSP0.is_due: next_date=%d, current_daystamp=%d, result=%s",
@@ -84,12 +104,28 @@ class NSP0Algorithm(BaseAlgorithm):
@classmethod
def get_rating(cls, algodata):
efactor = algodata[cls.algo_name]["efactor"]
logger.debug("NSP0.rate: efactor=%f", efactor)
return str(efactor)
"""获取当前 important 标记作为评分信息
Args:
algodata: 算法数据字典
Returns:
important 值的字符串表示
"""
important = algodata[cls.algo_name]["important"]
logger.debug("NSP0.rate: important=%d", important)
return str(important)
@classmethod
def nextdate(cls, algodata) -> int:
"""获取下一次复习日期
Args:
algodata: 算法数据字典
Returns:
下次复习的天数戳
"""
next_date = algodata[cls.algo_name]["next_date"]
logger.debug("NSP0.nextdate: %d", next_date)
return next_date
+3 -3
View File
@@ -643,13 +643,13 @@ class SM15MAlgorithm(BaseAlgorithm):
lapse: int
repetition: int
of_val: float # O-Factor
optimum_interval_days: int # 最优间隔(天)
optimum_interval_days: int # 最优间隔 (天)
afs: list # A-Factor 历史
af: float # 当前 A-Factor
# 毫秒精度子日排程
# 毫秒精度 (子日排程)
last_date_ms: int
next_date_ms: int
# BaseAlgorithm 兼容天精度, 向后兼容
# BaseAlgorithm 兼容 (天精度, 向后兼容)
real_rept: int
rept: int
interval: int
+39 -3
View File
@@ -9,6 +9,16 @@ logger = get_logger(__name__)
class SM2Algorithm(BaseAlgorithm):
"""SuperMemo-2 算法实现
经典间隔重复算法, 基于 1987 年 Piotr Wozniak 设计的 SM-2.
通过维护 efactor (难度因子) 来调整复习间隔.
Attributes:
algo_name: "SM-2"
desc: SuperMemo2 (1987) 简单间隔重复调度器
"""
algo_name = "SM-2"
desc = "SuperMemo2 (1987) 简单间隔重复调度器"
@@ -38,11 +48,13 @@ class SM2Algorithm(BaseAlgorithm):
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
):
"""SM-2 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
根据 feedback (0-5) 更新 efactor 并计算下次复习间隔.
Args:
quality (int): 记忆保留率量化参数
algodata: 算法数据字典
feedback: 记忆保留率量化参数 (0-5), -1 表示跳过
is_new_activation: 是否为首次激活
"""
logger.debug(
"SM2.revisor 开始, feedback: %d, is_new_activation: %s",
@@ -107,6 +119,14 @@ class SM2Algorithm(BaseAlgorithm):
@classmethod
def is_due(cls, algodata):
"""判断是否应该复习
Args:
algodata: 算法数据字典
Returns:
True 表示到期, False 表示未到期
"""
result = algodata[cls.algo_name]["next_date"] <= timer.get_daystamp()
logger.debug(
"SM2.is_due: next_date=%d, current_daystamp=%d, result=%s",
@@ -118,12 +138,28 @@ class SM2Algorithm(BaseAlgorithm):
@classmethod
def get_rating(cls, algodata):
"""获取当前 efactor 作为评分信息
Args:
algodata: 算法数据字典
Returns:
efactor 值的字符串表示
"""
efactor = algodata[cls.algo_name]["efactor"]
logger.debug("SM2.rate: efactor=%f", efactor)
return str(efactor)
@classmethod
def nextdate(cls, algodata) -> int:
"""获取下一次复习日期
Args:
algodata: 算法数据字典
Returns:
下次复习的天数戳
"""
next_date = algodata[cls.algo_name]["next_date"]
logger.debug("SM2.nextdate: %d", next_date)
return next_date
+32 -2
View File
@@ -1,19 +1,41 @@
class Evalizer:
"""几乎无副作用的模板系统
接受环境信息并创建一个模板解析工具, 工具传入参数支持list, dict及其嵌套
副作用问题: 仅存在于 eval 函数
接受环境信息并创建一个模板解析工具, 递归遍历数据结构,
"eval:" 前缀的字符串执行表达式求值.
副作用问题: 仅存在于 eval 函数调用.
Attributes:
env: 模板求值时的环境变量字典
"""
# TODO: 弃用风险极高的 eval
# TODO: 异步/多线程执行避免堵塞
def __init__(self, environment: dict) -> None:
"""初始化模板解析器
Args:
environment: 模板求值时的环境变量字典
"""
self.env = environment
def __call__(self, anyobj):
"""调用入口, 等同于 travel"""
return self.travel(anyobj)
def travel(self, anyobj):
"""递归遍历数据结构并展开模板表达式
支持 list, dict, tuple 的嵌套结构.
字符串以 "eval:" 开头时执行表达式求值.
Args:
anyobj: 任意 Python 对象
Returns:
展开后的对象
"""
if isinstance(anyobj, list):
return list(map(self.travel, anyobj))
elif isinstance(anyobj, dict):
@@ -29,5 +51,13 @@ class Evalizer:
return anyobj
def eval_with_env(self, s: str):
"""在环境变量上下文中执行表达式
Args:
s: Python 表达式字符串
Returns:
表达式求值结果
"""
ret = eval(s, globals(), self.env)
return ret
+46 -1
View File
@@ -50,6 +50,11 @@ class Lict(MutableSequence):
self._list_dirty = False
def __getitem__(self, key):
"""按键或索引获取值
Args:
key: 字符串键 (字典访问) 或整数索引 (列表访问)
"""
if isinstance(key, str):
return self._dict[key]
else:
@@ -57,7 +62,12 @@ class Lict(MutableSequence):
return self._list[key]
def __setitem__(self, key, value):
"""传入键值对时等同于操作字典, 传入索引+元组时等用于替换某索引的列表值为新元组"""
"""按键或索引设置值
Args:
key: 字符串键 (字典设置) 或整数索引 (替换列表元组)
value: 新值, 索引访问时必须为 (key, value) 元组
"""
if isinstance(key, str):
self._dict[key] = value
self._list_dirty = True
@@ -81,14 +91,17 @@ class Lict(MutableSequence):
del self._dict[del_key]
def keys(self):
"""返回所有键"""
self._sync_if_needed()
return self._dict.keys()
def values(self):
"""返回所有值"""
self._sync_if_needed()
return self._dict.values()
def items(self):
"""返回所有键值对元组列表"""
self._sync_if_needed()
return self._list
@@ -105,6 +118,14 @@ class Lict(MutableSequence):
return item in self._list or item in self.keys() or item in self.values()
def append(self, item):
"""追加键值对元组
Args:
item: (key, value) 格式的元组
Raises:
NotImplementedError: item 不是二元组
"""
if item != (item[0], item[1]):
raise NotImplementedError
self._sync_if_needed() # 以防 forced_order
@@ -114,6 +135,14 @@ class Lict(MutableSequence):
self._sync_if_needed() # 以防 forced_order
def append_if_it_doesnt_exist_before(self, item: Any):
"""若键不存在则追加键值对
Args:
item: (key, value) 格式的元组
Raises:
NotImplementedError: item 不是二元组
"""
if item != (item[0], item[1]):
raise NotImplementedError
self._sync_if_needed()
@@ -162,9 +191,25 @@ class Lict(MutableSequence):
raise NotImplementedError
def get_itemic_unit(self, ident):
"""获取指定键的 (key, value) 元组
Args:
ident: 键名
Returns:
(key, value) 格式的元组
"""
return (ident, self._dict[ident])
def keys_equal_with(self, other):
"""比较两个 Lict 的键集合是否相同
Args:
other: 另一个 Lict 实例
Returns:
True 表示键集合相同
"""
self._sync_if_needed()
return self.key_equality(self, other)
+5
View File
@@ -1,3 +1,8 @@
"""粒子数据模型模块
定义记忆单元的核心数据结构: Nucleon (内容)Electron (状态)Atom (组装).
"""
from .atom import Atom
from .electron import Electron
from .nucleon import Nucleon
+2 -1
View File
@@ -70,7 +70,8 @@ class Electron:
try:
result = self.algo.get_rating(self.algodata)
return result
except:
except (KeyError, TypeError, AttributeError) as e:
logger.warning("获取评分失败 (ident=%s): %s", self.ident, e)
return 0
def nextdate(self) -> int:
+61 -2
View File
@@ -8,9 +8,27 @@ logger = get_logger(__name__)
class Nucleon:
"""原子核: 带有运行时隔离的模板化只读材料元数据容器"""
"""原子核: 带有运行时隔离的模板化只读材料元数据容器
封装记忆单元的内容数据, 通过 Evalizer 模板系统展开 payload common
中的动态表达式. 创建后数据不可修改.
Attributes:
ident: 记忆单元的唯一标识
data: 展开后的内容字典 (只读)
"""
def __init__(self, ident, payload, common):
"""初始化核子
合并 payload common, 通过 Evalizer 展开模板表达式.
展开失败时静默降级为原始数据.
Args:
ident: 记忆单元标识
payload: 记忆内容字典
common: 通用元数据字典
"""
self.ident = ident
try:
data_safe = deepcopy((payload | common))
@@ -36,31 +54,64 @@ class Nucleon:
self.data = payload | common
def __getitem__(self, key):
"""按字符串键获取数据
Args:
key: 字符串键名, "ident" 返回标识
Returns:
对应键的值
Raises:
AttributeError: 键类型不是字符串
"""
if isinstance(key, str):
if key == "ident":
return self.ident
return self.data[key]
else:
raise AttributeError
raise AttributeError(f"Nucleon 仅支持字符串键访问, 收到: {type(key).__name__}")
def __setitem__(self, key, value):
"""禁止修改 (只读容器)
Raises:
AttributeError: 始终抛出
"""
raise AttributeError("应为只读")
def __delitem__(self, key):
"""禁止删除 (只读容器)
Raises:
AttributeError: 始终抛出
"""
raise AttributeError("应为只读")
def __iter__(self):
"""迭代数据字典的键"""
return iter(self.data)
def __contains__(self, key):
"""检查键是否存在于数据中"""
return key in (self.data)
def get(self, key, default=None):
"""安全获取数据值
Args:
key: 键名
default: 键不存在时的默认值
Returns:
键对应的值或默认值
"""
if key in self:
return self[key]
return default
def __len__(self):
"""返回数据字典的长度"""
return len(self.data)
def __repr__(self):
@@ -71,6 +122,14 @@ class Nucleon:
@staticmethod
def from_data(nucleonic_data: tuple):
"""从元组数据创建核子
Args:
nucleonic_data: 格式为 (ident, (payload, common)) 的元组
Returns:
Nucleon 实例
"""
_data = nucleonic_data
payload = _data[1][0]
common = _data[1][1]
@@ -1,3 +1,8 @@
"""占位符模块
提供用于 UI 预览和测试的占位粒子对象, 避免空值错误.
"""
from .atom import Atom
from .electron import Electron
from .nucleon import Nucleon
@@ -21,6 +26,8 @@ orbital_placeholder = {
class NucleonPlaceholder(Nucleon):
"""核子占位符, 用于 UI 预览"""
def __init__(self):
super().__init__("__placeholder__", {}, {})
@@ -29,11 +36,15 @@ class NucleonPlaceholder(Nucleon):
class ElectronPlaceholder(Electron):
"""电子占位符, 用于 UI 预览"""
def __init__(self):
super().__init__("__placeholder__", {"": {"": ""}}, "")
class AtomPlaceholder(Atom):
"""原子占位符, 用于 UI 预览"""
def __init__(self):
super().__init__(
NucleonPlaceholder(), ElectronPlaceholder(), orbital_placeholder
+7
View File
@@ -1,3 +1,5 @@
"""猜测谜题模块 (预留)"""
from heurams.services.logger import get_logger
from .base import BasePuzzle
@@ -6,5 +8,10 @@ logger = get_logger(__name__)
class GuessPuzzle(BasePuzzle):
"""猜测型谜题 (预留实现)
要求用户猜测词义, 尚未完成实现.
"""
def __init__(self):
super().__init__()
+7 -4
View File
@@ -1,4 +1,4 @@
# mcq.py
"""识别谜题模块"""
from heurams.services.logger import get_logger
@@ -8,11 +8,14 @@ logger = get_logger(__name__)
class RecognitionPuzzle(BasePuzzle):
"""识别占位符"""
"""识别型谜题
展示内容供用户识别确认, 无需主动回忆.
常用于复习流程的最后阶段 (retronly 回溯模式).
"""
def __init__(self) -> None:
logger.debug("RecognitionPuzzle.__init__")
super().__init__()
def refresh(self):
logger.debug("RecognitionPuzzle.refresh(空实现)")
"""刷新谜题 (识别型无需生成内容)"""
+4 -1
View File
@@ -1,4 +1,7 @@
from heurams.services.logger import get_logger
"""反应器模块
基于三层嵌套状态机 (Router -> Procession -> Expander) 实现复习流程调度与排程.
"""
from .expander import Expander
from .router import Router
+38 -1
View File
@@ -14,7 +14,17 @@ logger = get_logger(__name__)
class Expander(Machine):
"""单原子调度展开器"""
"""单原子调度展开器
根据轨道策略 (orbital) 将单个原子展开为谜题序列.
包含 exammode (考试模式) retronly (回溯模式) 两个阶段.
Attributes:
atom: 关联的 Atom 实例
route: 当前路由阶段
puzzles_inf: 展开后的谜题信息列表
min_ratings: 每个谜题的最低评分记录
"""
def __init__(self, atom: pt.Atom, route=RouterState.RECOGNITION):
self.route = route
@@ -85,20 +95,47 @@ class Expander(Machine):
)
def get_puzzles_inf(self):
"""获取谜题信息列表
回溯模式下返回识别谜题, 否则返回展开的谜题列表.
Returns:
谜题信息字典列表
"""
if self.state == "retronly":
return [{"puzzle": puz.puzzles["recognition"], "alia": "Recognition"}]
return self.puzzles_inf
def get_current_puzzle_inf(self):
"""获取当前谜题信息
Returns:
当前谜题的信息字典
"""
if self.state == "retronly":
return {"puzzle": puz.puzzles["recognition"], "alia": "Recognition"}
return self.current_puzzle_inf
def report(self, rating):
"""报告当前谜题的评分
Args:
rating: 用户评分 (0-5)
"""
if self.puzzles_inf:
self.min_ratings[self.cursor] = min(rating, self.min_ratings[self.cursor])
def get_quality(self):
"""获取所有谜题的最低评分
仅在回溯模式 (retronly) 下可用.
Returns:
所有谜题评分的最小值
Raises:
IndexError: 非回溯模式下调用
"""
if self.puzzles_inf:
if self.is_state("retronly", self):
return reduce(lambda x, y: min(x, y), self.min_ratings)
+20
View File
@@ -100,20 +100,40 @@ class Procession(Machine):
return length
def process(self):
"""获取当前游标位置
Returns:
当前游标索引
"""
logger.debug("Procession.process: cursor=%d", self.cursor)
return self.cursor
def total_length(self):
"""获取队列总长度 (含已处理的原子)
Returns:
原子总数
"""
total = len(self.atoms)
logger.debug("Procession.total_length: %d", total)
return total
def is_empty(self):
"""判断队列是否为空
Returns:
True 表示队列为空
"""
empty = len(self.atoms) == 0
logger.debug("Procession.is_empty: %s", empty)
return empty
def get_expander(self):
"""获取当前原子的展开器
Returns:
Expander 实例
"""
return Expander(atom=self.current_atom, route=self.route) # type: ignore
def __repr__(self, style="pipe", ends="\n"):
+8
View File
@@ -113,6 +113,14 @@ class Router(Machine):
logger.debug("Router 进入 FINISHED 状态")
def current_procession(self):
"""获取当前未完成的队列
遍历所有队列, 返回第一个未完成的 Procession.
若全部完成则切换到 FINISHED 状态并返回占位队列.
Returns:
当前活跃的 Procession, 或占位 Procession (全部完成时)
"""
logger.debug("Router.current_procession 被调用")
for i in self.processions:
i: Procession
+11
View File
@@ -1,3 +1,8 @@
"""状态枚举定义
定义反应器三层状态机的所有状态值.
"""
from enum import Enum
from heurams.services.logger import get_logger
@@ -6,6 +11,8 @@ logger = get_logger(__name__)
class RouterState(Enum):
"""路由器状态: 全局复习阶段"""
UNSURE = "unsure"
QUICK_REVIEW = "quick_review"
RECOGNITION = "recognition"
@@ -14,11 +21,15 @@ class RouterState(Enum):
class ProcessionState(Enum):
"""队列状态: 单阶段进度"""
ACTIVE = "active"
FINISHED = "finished"
class ExpanderState(Enum):
"""展开器状态: 单原子调度模式"""
EXAMMODE = "exammode"
RETRONLY = "retronly"
+5
View File
@@ -1,3 +1,8 @@
"""仓库系统模块
提供记忆单元集的加载保存和管理功能.
"""
from .repo import Repo, RepoManifest
__all__ = ["Repo", "RepoManifest"]
+81 -11
View File
@@ -10,6 +10,15 @@ from heurams.kernel.auxiliary.lict import Lict
class RepoManifest(TypedDict):
"""仓库清单数据结构
Attributes:
title: 仓库标题
author: 作者名称
package: 包名标识
desc: 仓库描述
"""
title: str
author: str
package: str
@@ -17,8 +26,21 @@ class RepoManifest(TypedDict):
class Repo:
"""只维护仓库本身
上层 API 请访问此对象下粒子对象列表"""
"""记忆单元仓库
管理单个记忆单元集的所有数据, 包括内容 (payload)算法状态 (algodata)
复习策略 (schedule) 和元信息 (manifest/typedef).
上层 API 请访问此对象下的粒子对象列表 (nucleonic_data_lict ).
Attributes:
schedule: 复习策略字典 (轨道定义)
payload: 记忆内容 (Lict)
algodata: 算法状态数据 (Lict)
manifest: 仓库清单信息
typedef: 类型定义和谜题配置
source: 仓库来源目录路径
"""
file_mapping = {
"schedule": "schedule.toml",
@@ -67,12 +89,16 @@ class Repo:
}
try:
self.config.update(dict(config_var.get()["repo"][self.manifest["package"]]))
except:
except (KeyError, TypeError, ValueError):
pass
self._generate_particles_data()
def _generate_particles_data(self):
"""生成上层的粒子对象组和 API 交互, 会在 init 后自动调用"""
"""生成上层的粒子数据
payload 转换为 Nucleon 所需格式, 并为每个 ident 初始化
algodata 条目. 会在 __init__ 后自动调用.
"""
self.nucleonic_data_lict = Lict(
initlist=list(map(self._nucleonic_proc, self.payload))
)
@@ -113,7 +139,7 @@ class Repo:
with open(source / filename, "w") as f:
try:
dict_data = self.database[keyname].dicted_data
except:
except AttributeError:
dict_data = dict(self.database[keyname])
if filename.endswith("toml"):
toml.dump(dict_data, f)
@@ -128,7 +154,14 @@ class Repo:
@classmethod
def create_new_repo(cls, source=None):
"""创建新的空单元集"""
"""创建新的空单元集
Args:
source: 可选的仓库目录路径
Returns:
包含空数据的 Repo 实例
"""
default_database = {
"schedule": {},
"payload": Lict([]),
@@ -141,7 +174,20 @@ class Repo:
@classmethod
def from_repodir(cls, source: Path):
"""从目录创建单元集"""
"""从目录创建单元集
读取目录中的 TOML/JSON 文件并构建 Repo 实例.
Args:
source: 仓库目录路径
Returns:
Repo 实例
Raises:
FileNotFoundError: 目录缺少必要的文件
ValueError: 文件格式不支持
"""
database = {}
for keyname, filename in cls.file_mapping.items():
with open(source / filename, "r") as f:
@@ -163,23 +209,47 @@ class Repo:
@classmethod
def from_dict(cls, dictdata, source: Path | None = None):
"""从单一字典创建单元集"""
"""从单一字典创建单元集
Args:
dictdata: 包含 schedule/payload/algodata/manifest/typedef 的字典
source: 可选的仓库目录路径
Returns:
Repo 实例
"""
database = dictdata
database["source"] = source
return Repo(**database)
@classmethod
def check_repodir(cls, source: Path):
"""检测单元集目录合法性"""
"""检测单元集目录合法性
尝试从目录加载 Repo, 成功则视为合法.
Args:
source: 待检测的目录路径
Returns:
True 表示合法, False 表示不合法
"""
try:
cls.from_repodir(source)
return True
except:
except (FileNotFoundError, KeyError, ValueError, toml.TomlDecodeError):
return False
@classmethod
def probe_valid_repos_in_dir(cls, folder: Path):
"""返回一个合法的子目录 Path() 列表"""
"""扫描目录中的所有合法仓库子目录
Args:
folder: 待扫描的父目录路径
Returns:
合法仓库子目录的 Path 列表
"""
lst = list()
for i in folder.iterdir():
if i.is_dir():
+1 -1
View File
@@ -38,7 +38,7 @@ class BaseLLM:
return "BaseLLM 未实现具体功能"
async def chat_stream(self, messages: List[Dict[str, str]], **kwargs):
"""流式聊天可选实现
"""流式聊天 (可选实现)
Args:
messages: 消息列表
+1 -1
View File
@@ -22,7 +22,7 @@ class OpenAILLM(BaseLLM):
logger.debug("OpenAILLM 初始化完成: base_url=%s", self.base_url)
def _get_client(self):
"""获取 OpenAI 客户端延迟导入"""
"""获取 OpenAI 客户端 (延迟导入) """
if self._client is None:
try:
from openai import AsyncOpenAI
+4 -3
View File
@@ -5,7 +5,7 @@ from heurams.context import config_var
from pathlib import Path
import atexit
from heurams.services import timer
from heurams.services.exceptions import WTFException
from heurams.services.exceptions import AtticError
logger = get_logger(__name__)
@@ -34,7 +34,7 @@ class Attic:
self.ident = self.ident.replace("<DAYSTAMP>", str(timer.get_daystamp()))
self.ident = self.ident.replace("<TIMESTAMP>", str(timer.get_timestamp()))
if "<" in ident or ">" in ident:
raise WTFException
raise AtticError(f"Attic 标识 '{ident}' 中仍含有未替换的占位符")
# self.ident = get_md5(self.ident)
self.pklpath = atticdir / f"{self.ident}.pkl"
atexit.register(self.save)
@@ -43,7 +43,8 @@ class Attic:
try:
self.load()
return
except:
except (pkl.UnpicklingError, EOFError, ModuleNotFoundError, ImportError) as e:
logger.warning("Attic '%s' 加载失败, 将重建: %s", self.ident, e)
self.pklpath.unlink(missing_ok=True)
self.pklpath.touch(exist_ok=True)
+19 -10
View File
@@ -4,9 +4,9 @@ import toml
from collections import UserDict
from heurams.services.logger import get_logger
from heurams.services.exceptions import WTFException
from heurams.services.exceptions import ConfigError
# 我们的流程: 找到文件名: 返回文件名里头的数据; 找不到: 继续查索引; 所以 self.data 除了存本级各种索引球用没得
# 流程: 找到文件名: 返回文件名里头的数据; 找不到: 继续查索引; 所以 self.data 除了存本级各种索引无他用
logger = get_logger(__name__)
@@ -15,7 +15,7 @@ class ConfigDict(UserDict):
def __new__(cls, config_path: pathlib.Path, dict=None):
if dict:
raise WTFException("不要放默认值...")
raise ConfigError("ConfigDict 不接受默认字典参数")
# 规范化路径, 免得单例存在"别名"
path_key = config_path.resolve()
@@ -33,7 +33,7 @@ class ConfigDict(UserDict):
return
self._initialized = True
if dict:
raise WTFException("不要放默认值...")
raise ConfigError("ConfigDict 不接受默认字典参数")
super().__init__(dict)
logger = get_logger(__name__)
self.path = config_path
@@ -44,12 +44,14 @@ class ConfigDict(UserDict):
with open(self.path, "r+") as f: # TODO: 给这个做缓存
try:
self.data = toml.load(f)
except:
self._writable = True
except toml.TomlDecodeError as e:
logger.warning("配置文件解析失败: %s, 将以空配置运行", e)
self.data = {}
self.persist = lambda: False # 不修改错误的配置文件
self._writable = False
def __getitem__(self, key):
# 我们实现了先进的懒狗加载
# 实现懒加载
value = super().__getitem__(key)
if isinstance(value, pathlib.Path):
return ConfigDict(value)
@@ -59,10 +61,10 @@ class ConfigDict(UserDict):
return super().__contains__(key)
def __setitem__(self, key, value):
origvalue = super().__getitem__(key) # 所以不该访问不存在的对象
origvalue = super().__getitem__(key) # 所以不该访问不存在的对象
if isinstance(origvalue, ConfigDict):
if origvalue.path.is_dir():
raise WTFException("你怎么能变更目录配置的内容呢?!")
raise ConfigError("不允许变更目录配置的内容")
else:
# 对文件, 我们允许这种覆写存在
# 但是不准变类型
@@ -71,8 +73,12 @@ class ConfigDict(UserDict):
def update_index(
self,
): # 如果有人没事干在config里面创建指向config的符号链接 这玩意会崩溃 但是不要修复: 需要这个符号链接特性
): # 如果有人没事干在config里面创建指向config的符号链接 此函数会崩溃 但是不要修复: 需要这个符号链接特性并且真崩溃了绝对是用户故意的
for i in self.path.iterdir():
if i.name.startswith("."):
continue # 以防 OSX 与 dolphin 在目录产生隐藏废料
if i.name == "desktop.ini":
continue # 以防 Windows 产生废料
if i.name.startswith("_"):
if i.name == "_.toml" and not i.is_dir():
with open(self.path / "_.toml", "r+") as f:
@@ -87,6 +93,9 @@ class ConfigDict(UserDict):
logger.debug(f"配置目录中有无效的文件 {i.stem}") # what's up bro
def persist(self):
if not getattr(self, "_writable", True):
logger.warning("跳过写入: 配置文件解析失败, 避免覆盖损坏的文件")
return
if self.is_dir:
for i in self.data.keys():
j = self[i]
+9 -1
View File
@@ -1,2 +1,10 @@
class WTFException(Exception):
class HeurAMSError(Exception):
pass
class ConfigError(HeurAMSError):
pass
class AtticError(HeurAMSError):
pass
+1 -1
View File
@@ -139,7 +139,7 @@ class FavoriteManager:
return False
def get_all(self) -> List[FavoriteItem]:
"""获取所有收藏项按添加时间倒序"""
"""获取所有收藏项 (按添加时间倒序) """
return sorted(self._favorites, key=lambda x: x.added, reverse=True)
def get_by_repo(self, repo_path: str) -> List[FavoriteItem]:
+3 -3
View File
@@ -49,9 +49,9 @@ def daystamp_to_datetime(daystamp: int) -> datetime.datetime:
def datetime_to_daystamp(dt: datetime.datetime) -> int:
"""将 datetime 转换为日戳从 1970-01-01 起的天数
"""将 datetime 转换为日戳 (从 1970-01-01 起的天数)
接受带时区或 naive datetimenaive 视为 UTC
接受带时区或 naive datetime (naive 视为 UTC).
"""
epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
if dt.tzinfo is None:
@@ -61,5 +61,5 @@ def datetime_to_daystamp(dt: datetime.datetime) -> int:
def get_now_datetime() -> datetime.datetime:
"""获取当前时间的 UTC datetime遵守时间覆盖"""
"""获取当前时间的 UTC datetime (遵守时间覆盖) """
return datetime.datetime.fromtimestamp(get_timestamp(), tz=datetime.timezone.utc)
+1 -1
View File
@@ -128,7 +128,7 @@ def csv_to_toml(csv_path, toml_path=None, random_seed=None):
# 添加section标题
toml_content.append(f"[{ident}]")
# 添加所有其他列作为键值对排除ident列
# 添加所有其他列作为键值对 (排除ident列)
for key, value in row.items():
if key == "ident":
continue