refactor: 对配置处理器和配置结构进行重构

This commit is contained in:
2026-04-20 01:44:43 +08:00
parent a8cd774123
commit d1a1fa193f
36 changed files with 1097 additions and 917 deletions

View File

@@ -1,32 +1,31 @@
# [调试] 将更改保存到文件
persist_to_file = 1 persist_to_file = 1
# [调试] 覆写时间, 设为 -1 以禁用
daystamp_override = -1 daystamp_override = -1
timestamp_override = -1 timestamp_override = -1
# [调试] 一键通过
quick_pass = true quick_pass = true
auto_pass = false
# [调试] 自动化测试模式(仅用于测试完整性) scheduled_num = 420
auto_pass = true timezone_offset = 28800
# 对于每个项目的默认新记忆原子数量
scheduled_num = 100
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
timezone_offset = +28800 # 中国标准时间 (UTC+8)
[interface]
[interface.memorizor]
autovoice = true # 自动语音播放, 仅限于 recognition 组件
[algorithm] [algorithm]
# default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS, FAST-0 default = "FAST-0"
default = 'FAST-0'
[puzzles] # 谜题默认配置 [paths]
data = "./data"
cache = "./data/cache"
config = "./data/config"
global = "./data/global"
repo = "./data/repo"
[services]
audio = "playsound"
tts = "edgetts"
llm = "openai"
sync = "webdav"
[sync]
[interface.memorizor]
autovoice = true
[puzzles.mcq] [puzzles.mcq]
max_riddles_num = 2 max_riddles_num = 2
@@ -34,30 +33,16 @@ max_riddles_num = 2
[puzzles.cloze] [puzzles.cloze]
min_denominator = 3 min_denominator = 3
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径 [providers.tts.edgetts]
data = "./data" voice = "zh-CN-XiaoxiaoNeural"
cache = "./data/cache"
config = "./data/config"
global = "./data/global"
repo = "./data/repo"
[services] # 定义服务到提供者的映射
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai
sync = "webdav" # 可选项: 留空, webdav
[providers.tts.edgetts] # EdgeTTS 设置 [providers.llm.openai]
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
url = "" url = ""
key = "" key = ""
[providers.sync.webdav] # WebDAV 同步设置 [providers.sync.webdav]
url = "" url = ""
username = "" username = ""
password = "" password = ""
remote_path = "/heurams/" remote_path = "/heurams/"
verify_ssl = true verify_ssl = true
[sync]

7
data/config/global.toml Normal file
View File

@@ -0,0 +1,7 @@
enable_built_in_interface = true
[paths]
data = "./data"
cache = "./data/cache"
config = "./data/config"
repo = "./data/repo"

View File

@@ -0,0 +1,6 @@
persist_to_file = true
quick_pass = true
auto_pass = false
scheduled_num = 420
algorithm = "NSP-0"
_algorithm_candidate = [ "SM-2", "SM-15M", "FSRS", "NSP-0", "None",]

View File

@@ -0,0 +1 @@
min_denominator = 3

View File

@@ -0,0 +1 @@
max_riddles_num = 2

View File

@@ -0,0 +1 @@
autovoice = true

View File

@@ -0,0 +1 @@
autovoice = true

View File

@@ -0,0 +1,24 @@
voice = "zh-CN-XiaoxiaoNeural"
[_voice_candidate]
zh-CN-XiaoxiaoNeural = "晓晓: 中文温柔女声"
zh-CN-XiaoyiNeural = "晓伊: 中文甜美女声"
zh-CN-XiaochenNeural = "晓辰: 中文知性女声"
zh-CN-XiaohanNeural = "晓涵: 中文优雅女声"
zh-CN-XiaomengNeural = "晓梦: 中文梦幻女声"
zh-CN-XiaomoNeural = "晓墨: 中文文艺女声"
zh-CN-XiaoqiuNeural = "晓秋: 中文成熟女声"
zh-CN-XiaoruiNeural = "晓睿: 中文智慧女声"
zh-CN-XiaoshuangNeural = "晓双: 中文活泼女声"
zh-CN-XiaoxuanNeural = "晓萱: 中文清新女声"
zh-CN-XiaoyanNeural = "晓颜: 中文柔美女声"
zh-CN-XiaoyouNeural = "晓悠: 中文悠扬女声"
zh-CN-XiaozhenNeural = "晓甄: 中文端庄女声"
zh-CN-YunxiNeural = "云希: 中文清朗男声"
zh-CN-YunyangNeural = "云扬: 中文阳光男声"
zh-CN-YunjianNeural = "云健: 中文稳重男声"
zh-CN-YunfengNeural = "云枫: 中文磁性男声"
zh-CN-YunhaoNeural = "云皓: 中文豪迈男声"
zh-CN-YunxiaNeural = "云夏: 中文热情男声"
zh-CN-YunyeNeural = "云野: 中文野性男声"
zh-CN-YunzeNeural = "云泽: 中文深沉男声"

View File

@@ -0,0 +1,8 @@
provider = "playsound"
[_provider_candidate]
playsound = "python 跨平台音频系统"
termux = "Android Termux 音频系统"
mpg123 = "通用音频系统, 依赖系统 mpg123"
pulseaudio = "高级音频路由系统"
none = "不使用音频"

View File

@@ -0,0 +1,5 @@
provider = "openai"
[_provider_candidate]
openai = "OpenAI 风格 API, 同时支持与其相容的模型服务 (如 deepseek)"
none = "不使用语言大模型"

View File

@@ -0,0 +1,6 @@
provider = "webdav"
[_provider_candidate]
webdav = "WebDAV 兼容网络文件系统 (包括 webdavs)"
official = "官方同步服务器"
none = "不使用同步服务器"

View File

@@ -0,0 +1,3 @@
daystamp_override = -1
timestamp_override = -1
timezone_offset = 28800

View File

@@ -0,0 +1,6 @@
provider = "edgetts"
[_provider_candidate]
edgetts = "微软神经网络语音合成, 依赖微软网络服务"
espeak = "低保真度本地语音合成"
none = "不使用文本转语音"

File diff suppressed because it is too large Load Diff

View File

@@ -6,7 +6,7 @@
import pathlib import pathlib
from contextvars import ContextVar from contextvars import ContextVar
from heurams.services.config import ConfigFile from heurams.services.config import ConfigDict
from heurams.services.logger import get_logger from heurams.services.logger import get_logger
# 默认数据目录, 以包目录下的 data 为准 # 默认数据目录, 以包目录下的 data 为准
@@ -24,9 +24,9 @@ logger.debug(f"工作目录: {workdir}")
(workdir / "data" / "config").mkdir(parents=True, exist_ok=True) (workdir / "data" / "config").mkdir(parents=True, exist_ok=True)
config_var: ContextVar[ConfigFile].get = ContextVar( config_var: ContextVar[ConfigDict].get = ContextVar(
"config_var", "config_var",
default=ConfigFile(workdir / "data" / "config" / "config.toml"), default=ConfigDict(workdir / "data" / "config"),
) )
"""配置对象的全局引用对象.""" """配置对象的全局引用对象."""
@@ -41,7 +41,7 @@ class ConfigContext:
>>> get_daystamp() # 恢复原配置 >>> get_daystamp() # 恢复原配置
""" """
def __init__(self, config_provider: ConfigFile): def __init__(self, config_provider: ConfigDict):
self.config_provider = config_provider self.config_provider = config_provider
self._token = None self._token = None

View File

@@ -59,3 +59,15 @@ class HeurAMSApp(App):
def action_do_nothing(self): def action_do_nothing(self):
self.refresh() self.refresh()
# 移除烦人的 "rich traceback"
# Textual 官方不会管这破事, 写 Rich 写入脑了导致的
# 不知道哪来的自信改标准库的 traceback
# https://github.com/Textualize/textual/discussions/6255
def _fatal_error(self):
self._close_messages_no_wait()
raise self._exception
def panic(self, *args):
self._close_messages_no_wait()
raise self._exception

View File

@@ -10,7 +10,7 @@ def environment_check():
logger.debug("检查环境路径") logger.debug("检查环境路径")
subdir = ["cache/voice", "repo", "global", "config"] subdir = ["cache/voice", "repo", "global", "config"]
for i in subdir: for i in subdir:
i = Path(config_var.get()["paths"]["data"]) / i i = Path(config_var.get()["global"]["paths"]["data"]) / i
if not i.exists(): if not i.exists():
logger.info("创建目录: %s", i) logger.info("创建目录: %s", i)
print(f"创建 {i}") print(f"创建 {i}")

View File

@@ -58,8 +58,8 @@ class DashboardScreen(Screen):
Label( Label(
f"当前 UNIX 日时间戳: {timer.get_daystamp()}" f"当前 UNIX 日时间戳: {timer.get_daystamp()}"
), ),
Label(f"应用时区修正: UTC+{config_var.get()['timezone_offset'] / 3600}"), Label(f"应用时区修正: UTC+{config_var.get()['services']['timer']['timezone_offset'] / 3600}"),
Label(f"全局算法设置: {config_var.get()['algorithm']['default']}: {algorithms[config_var.get()['algorithm']['default']].desc}"), Label(f"全局算法设置: {config_var.get()['interface']['global']['algorithm']}: {algorithms[config_var.get()['interface']['global']['algorithm']].desc}"),
classes="column infview", classes="column infview",
), ),
Vertical( Vertical(
@@ -78,7 +78,7 @@ class DashboardScreen(Screen):
def _load_data(self): def _load_data(self):
self.repo_dirs = Repo.probe_valid_repos_in_dir( self.repo_dirs = Repo.probe_valid_repos_in_dir(
Path(config_var.get()["paths"]["data"]) / "repo" Path(config_var.get()['global']["paths"]["data"]) / "repo"
) )
for repo_dir in self.repo_dirs: for repo_dir in self.repo_dirs:
repo = Repo.create_from_repodir(repo_dir) repo = Repo.create_from_repodir(repo_dir)

View File

@@ -115,7 +115,7 @@ class FavoriteManagerScreen(Screen):
def _get_repo_info(self, repo_path: str, fav: FavoriteItem) -> Optional[dict]: def _get_repo_info(self, repo_path: str, fav: FavoriteItem) -> Optional[dict]:
"""获取仓库信息(标题、原子内容预览)""" """获取仓库信息(标题、原子内容预览)"""
try: try:
data_repo = Path(config_var.get()["paths"]["data"]) / "repo" data_repo = Path(config_var.get()['global']["paths"]["data"]) / "repo"
repo_dir = data_repo / repo_path repo_dir = data_repo / repo_path
if not repo_dir.exists(): if not repo_dir.exists():
logger.warning("仓库目录不存在: %s", repo_dir) logger.warning("仓库目录不存在: %s", repo_dir)

View File

@@ -38,7 +38,7 @@ class MemScreen(Screen):
("0,1,2,3", "app.push_screen('about')", ""), ("0,1,2,3", "app.push_screen('about')", ""),
] ]
if config_var.get()["quick_pass"]: if config_var.get()['interface']['global']["quick_pass"]:
BINDINGS.append(("k", "quick_pass", "正确应答")) BINDINGS.append(("k", "quick_pass", "正确应答"))
BINDINGS.append(("f", "quick_fail", "错误应答")) BINDINGS.append(("f", "quick_fail", "错误应答"))
rating = reactive(-1) rating = reactive(-1)
@@ -93,7 +93,7 @@ class MemScreen(Screen):
if self.repo is not None: if self.repo is not None:
fav_status = "已收藏" if self._is_current_atom_favorited() else "未收藏" fav_status = "已收藏" if self._is_current_atom_favorited() else "未收藏"
s += f"收藏: {fav_status}\n" s += f"收藏: {fav_status}\n"
if config_var.get().get("debug_topline", 0): '''if config_var.get().get("debug_topline", 0):
try: try:
alia = self.fission.get_current_puzzle_inf()["alia"] # type: ignore alia = self.fission.get_current_puzzle_inf()["alia"] # type: ignore
s += f"谜题: {alia}\n" s += f"谜题: {alia}\n"
@@ -113,7 +113,7 @@ class MemScreen(Screen):
stat = self.fission.__repr__("simple", "") stat = self.fission.__repr__("simple", "")
s += f"{stat}\n" s += f"{stat}\n"
except Exception as e: except Exception as e:
s = str(e) s = str(e)'''
s += f"进度: {self.procession.process() + 1}/{self.procession.total_length()}" s += f"进度: {self.procession.process() + 1}/{self.procession.total_length()}"
return s return s
@@ -139,9 +139,9 @@ class MemScreen(Screen):
i.remove() i.remove()
from heurams.interface.widgets.finished import Finished from heurams.interface.widgets.finished import Finished
if config_var.get().get("persist_to_file", 0): if config_var.get()['interface']['global']["persist_to_file"]:
self.save_func() self.save_func()
container.mount(Finished(is_saved=config_var.get().get("persist_to_file", 0))) container.mount(Finished(is_saved=['interface']['global']["persist_to_file"]))
def on_button_pressed(self, event): def on_button_pressed(self, event):
event.stop() event.stop()
@@ -156,7 +156,7 @@ class MemScreen(Screen):
from heurams.services.audio_service import play_by_path from heurams.services.audio_service import play_by_path
from heurams.services.hasher import get_md5 from heurams.services.hasher import get_md5
path = Path(config_var.get()["paths"]["data"]) / "cache" / "voice" path = Path(config_var.get()['global']["paths"]["data"]) / "cache" / "voice"
path = path / f"{get_md5(self.atom.registry['nucleon']["tts_text"])}.wav" path = path / f"{get_md5(self.atom.registry['nucleon']["tts_text"])}.wav"
if path.exists(): if path.exists():
play_by_path(path) play_by_path(path)
@@ -226,7 +226,7 @@ class MemScreen(Screen):
return "" return ""
# self.repo.source 是 Path 对象,指向仓库目录 # self.repo.source 是 Path 对象,指向仓库目录
repo_full_path = self.repo.source repo_full_path = self.repo.source
data_repo_path = Path(config_var.get()["paths"]["data"]) / "repo" data_repo_path = Path(config_var.get()['global']["paths"]["data"]) / "repo"
try: try:
rel_path = repo_full_path.relative_to(data_repo_path) rel_path = repo_full_path.relative_to(data_repo_path)
return str(rel_path) return str(rel_path)

View File

@@ -13,7 +13,7 @@ import heurams.services.hasher as hasher
from heurams.context import * from heurams.context import *
# 兼容性缓存路径:优先使用 paths.cache否则使用 data/cache # 兼容性缓存路径:优先使用 paths.cache否则使用 data/cache
paths = config_var.get()["paths"] paths = config_var.get()['global']["paths"]
cache_dir = pathlib.Path(paths.get("cache", paths["data"] + "/cache")) / "voice" cache_dir = pathlib.Path(paths.get("cache", paths["data"] + "/cache")) / "voice"
@@ -61,7 +61,7 @@ class PrecachingScreen(Screen):
"""获取所有仓库的总单元数""" """获取所有仓库的总单元数"""
from heurams.context import config_var from heurams.context import config_var
from heurams.kernel.repolib import Repo from heurams.kernel.repolib import Repo
repo_path = pathlib.Path(config_var.get()["paths"]["data"]) / "repo" repo_path = pathlib.Path(config_var.get()['global']["paths"]["data"]) / "repo"
repo_dirs = Repo.probe_valid_repos_in_dir(repo_path) repo_dirs = Repo.probe_valid_repos_in_dir(repo_path)
repos = map(Repo.create_from_repodir, repo_dirs) repos = map(Repo.create_from_repodir, repo_dirs)
total = 0 total = 0
@@ -230,7 +230,7 @@ class PrecachingScreen(Screen):
from heurams.context import config_var, rootdir, workdir from heurams.context import config_var, rootdir, workdir
from heurams.kernel.repolib import Repo from heurams.kernel.repolib import Repo
repo_path = pathlib.Path(config_var.get()["paths"]["data"]) / "repo" repo_path = pathlib.Path(config_var.get()['global']["paths"]["data"]) / "repo"
repo_dirs = Repo.probe_valid_repos_in_dir(repo_path) repo_dirs = Repo.probe_valid_repos_in_dir(repo_path)
repos = map(Repo.create_from_repodir, repo_dirs) repos = map(Repo.create_from_repodir, repo_dirs)

View File

@@ -28,7 +28,7 @@ class PreparationScreen(Screen):
("0,1,2,3", "app.push_screen('about')", ""), ("0,1,2,3", "app.push_screen('about')", ""),
] ]
scheduled_num = reactive(config_var.get()["scheduled_num"]) scheduled_num = reactive(config_var.get()['interface']['global']["scheduled_num"])
def __init__(self, repo: Repo, repostat: dict) -> None: def __init__(self, repo: Repo, repostat: dict) -> None:
super().__init__(name=None, id=None, classes=None) super().__init__(name=None, id=None, classes=None)
@@ -41,7 +41,7 @@ class PreparationScreen(Screen):
with ScrollableContainer(id="vice_container"): with ScrollableContainer(id="vice_container"):
yield Label(f"准备就绪: [b]{self.repostat['title']}[/b]\n") yield Label(f"准备就绪: [b]{self.repostat['title']}[/b]\n")
yield Label( yield Label(
f"仓库路径: {config_var.get()['paths']['data']}/repo/[b]{self.repostat['dirname']}[/b]" f"仓库路径: {config_var.get()['global']['paths']['data']}/repo/[b]{self.repostat['dirname']}[/b]"
) )
yield Label(f"\n单元数量: {len(self.repo)}\n") yield Label(f"\n单元数量: {len(self.repo)}\n")
yield Label(f"最小记忆分组: {self.scheduled_num}\n", id="schnum_label") yield Label(f"最小记忆分组: {self.scheduled_num}\n", id="schnum_label")
@@ -130,7 +130,7 @@ class PreparationScreen(Screen):
def launch(repo, app, scheduled_num): def launch(repo, app, scheduled_num):
if scheduled_num == -1: if scheduled_num == -1:
scheduled_num = config_var.get()["scheduled_num"] scheduled_num = config_var.get()['interface']['global']["scheduled_num"]
atoms = list() atoms = list()
for i in repo.ident_index: for i in repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data( n = pt.Nucleon.create_on_nucleonic_data(

View File

@@ -51,7 +51,7 @@ class Recognition(BasePuzzleWidget):
def compose(self): def compose(self):
from heurams.context import config_var from heurams.context import config_var
autovoice = config_var.get()["interface"]["memorizor"]["autovoice"] autovoice = config_var.get()["interface"]["widgets"]["autovoice"]
if autovoice: if autovoice:
self.screen.action_play_voice() # type: ignore self.screen.action_play_voice() # type: ignore
cfg: RecognitionConfig = self.atom.registry["nucleon"]["puzzles"][self.alia] cfg: RecognitionConfig = self.atom.registry["nucleon"]["puzzles"][self.alia]

View File

@@ -1,18 +1,18 @@
from .base import BaseAlgorithm from .base import BaseAlgorithm
from .sm2 import SM2Algorithm from .sm2 import SM2Algorithm
from .sm15m import SM15MAlgorithm from .sm15m import SM15MAlgorithm
from .fast0 import FAST0Algorithm from .nsp0 import NSP0Algorithm
__all__ = [ __all__ = [
"SM2Algorithm", "SM2Algorithm",
"BaseAlgorithm", "BaseAlgorithm",
"SM15MAlgorithm", "SM15MAlgorithm",
"FAST0Algorithm", "NSP0Algorithm",
] ]
algorithms = { algorithms = {
"SM-2": SM2Algorithm, "SM-2": SM2Algorithm,
"FAST-0": FAST0Algorithm, "NSP-0": NSP0Algorithm,
"SM-15M": SM15MAlgorithm, "SM-15M": SM15MAlgorithm,
"Base": BaseAlgorithm, "Base": BaseAlgorithm,
} }

View File

@@ -8,8 +8,8 @@ from .base import BaseAlgorithm
logger = get_logger(__name__) logger = get_logger(__name__)
class FAST0Algorithm(BaseAlgorithm): class NSP0Algorithm(BaseAlgorithm):
algo_name = "FAST-0" algo_name = "NSP-0"
desc = '快速筛选用特殊调度器' desc = '快速筛选用特殊调度器'
class AlgodataDict(TypedDict): class AlgodataDict(TypedDict):
real_rept: int real_rept: int
@@ -36,7 +36,7 @@ class FAST0Algorithm(BaseAlgorithm):
def revisor( def revisor(
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
): ):
"""FAST-0 算法迭代决策机制实现 """NSP-0 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔 根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估 quality 由主程序评估
@@ -44,7 +44,7 @@ class FAST0Algorithm(BaseAlgorithm):
quality (int): 记忆保留率量化参数 quality (int): 记忆保留率量化参数
""" """
logger.debug( logger.debug(
"FAST0.revisor 开始, feedback: %d, is_new_activation: %s", "NSP0.revisor 开始, feedback: %d, is_new_activation: %s",
feedback, feedback,
is_new_activation, is_new_activation,
) )
@@ -71,7 +71,7 @@ class FAST0Algorithm(BaseAlgorithm):
def is_due(cls, algodata): def is_due(cls, algodata):
result = algodata[cls.algo_name]["next_date"] <= timer.get_daystamp() result = algodata[cls.algo_name]["next_date"] <= timer.get_daystamp()
logger.debug( logger.debug(
"FAST0.is_due: next_date=%d, current_daystamp=%d, result=%s", "NSP0.is_due: next_date=%d, current_daystamp=%d, result=%s",
algodata[cls.algo_name]["next_date"], algodata[cls.algo_name]["next_date"],
timer.get_daystamp(), timer.get_daystamp(),
result, result,
@@ -81,11 +81,11 @@ class FAST0Algorithm(BaseAlgorithm):
@classmethod @classmethod
def get_rating(cls, algodata): def get_rating(cls, algodata):
efactor = algodata[cls.algo_name]["efactor"] efactor = algodata[cls.algo_name]["efactor"]
logger.debug("FAST0.rate: efactor=%f", efactor) logger.debug("NSP0.rate: efactor=%f", efactor)
return str(efactor) return str(efactor)
@classmethod @classmethod
def nextdate(cls, algodata) -> int: def nextdate(cls, algodata) -> int:
next_date = algodata[cls.algo_name]["next_date"] next_date = algodata[cls.algo_name]["next_date"]
logger.debug("FAST0.nextdate: %d", next_date) logger.debug("NSP0.nextdate: %d", next_date)
return next_date return next_date

View File

@@ -27,7 +27,7 @@ from heurams.kernel.algorithms.sm15m_calc import (
# 全局状态文件路径 # 全局状态文件路径
_GLOBAL_STATE_FILE = os.path.expanduser( _GLOBAL_STATE_FILE = os.path.expanduser(
pathlib.Path(config_var.get()["paths"]["data"]) pathlib.Path(config_var.get()['global']["paths"]["data"])
/ "global" / "global"
/ "sm15m_global_state.json" / "sm15m_global_state.json"
) )

View File

@@ -19,14 +19,14 @@ class Nucleon:
data_safe['puzzles'] = {} data_safe['puzzles'] = {}
env = { env = {
"payload": data_safe, "payload": data_safe,
"default": config_var.get()["puzzles"], "default": config_var.get()['interface']["puzzles"],
"nucleon": data_safe, "nucleon": data_safe,
} }
self.evalizer = Evalizer(environment=env) self.evalizer = Evalizer(environment=env)
data_safe = self.evalizer(deepcopy(data_safe)) data_safe = self.evalizer(deepcopy(data_safe))
env = { env = {
"payload": data_safe, "payload": data_safe,
"default": config_var.get()["puzzles"], "default": config_var.get()['interface']["puzzles"],
"nucleon": data_safe, "nucleon": data_safe,
} }
self.evalizer = Evalizer(environment=env) self.evalizer = Evalizer(environment=env)

View File

@@ -7,7 +7,7 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
play_by_path: Callable = prov[config_var.get()["services"]["audio"]].play_by_path play_by_path: Callable = prov[config_var.get()["services"]["audio"]['provider']].play_by_path
logger.debug( logger.debug(
"音频服务初始化完成, 使用 Provider: %s", config_var.get()["services"]["audio"] "音频服务初始化完成, 使用 Provider: %s", config_var.get()["services"]["audio"]['provider']
) )

View File

@@ -0,0 +1,127 @@
"""配置文件服务"""
import pathlib
import typing
import toml
from heurams.services.logger import get_logger
default_config = {
"persist_to_file": 1, # 将更改保存到文件
"daystamp_override": -1, # 覆写时间, 设为 -1 以禁用
"timestamp_override": -1, # 覆写时间, 设为 -1 以禁用
"quick_pass": 1, # 启用用于测试的快速通过
"scheduled_num": 8, # 对于每个项目的默认新记忆原子数量
"timezone_offset": 28800, # UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
# 28800 是中国标准时间 (UTC+8)
"interface": {
"memorizor": {
"autovoice": True # 自动语音播放, 仅限于 recognition 组件
}
},
"algorithm": {
"default": "SM-2" # 主要算法
# 可选项: SM-2, SM-15M, FSRS
},
"puzzles": { # 谜题默认配置
"mcq": {
"max_riddles_num": 2
},
"cloze": {
"min_denominator": 3
}
},
"paths": { # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
"data": "./data"
},
"services": { # 定义服务到提供者的映射
"audio": "playsound", # 可选项: playsound(通用), termux(仅用于 Android Termux), mpg123(TODO)
"tts": "edgetts", # 可选项: edgetts
"llm": "openai", # 可选项: openai
"sync": "webdav" # 可选项: 留空, webdav
},
"providers": {
"tts": {
"edgetts": { # EdgeTTS 设置
"voice": "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
}
},
"llm": {
"openai": { # 与 OpenAI 相容的语言模型接口服务设置
"url": "",
"key": ""
}
},
"sync": {
"webdav": { # WebDAV 同步设置
"url": "",
"username": "",
"password": "",
"remote_path": "/heurams/",
"verify_ssl": True
}
}
},
}
class ConfigFile:
def __init__(self, path: pathlib.Path):
self.logger = get_logger(__name__)
self.path = path
self.data = dict()
if not self.path.exists():
self.path.touch()
self.logger.debug("创建配置文件: %s", self.path)
self.data = default_config
self.valid_configfile = 1
# 考虑到可能临时编辑格式错误, 所以不覆写格式错误的配置文件, 而是提示损坏并使用默认配置
self._load()
def _load(self):
"""从文件加载配置数据"""
with open(self.path, "r") as f:
try:
self.data = toml.load(f)
self.logger.debug("配置文件加载成功: %s", self.path)
except toml.TomlDecodeError as e:
print(f"{e}")
self.logger.error("TOML解析错误: %s", e)
self.data = default_config
self.valid_configfile = 0
def modify(self, key: str, value: typing.Any):
"""修改配置值并保存"""
self.data[key] = value
self.logger.debug("修改配置项: %s = %s", key, value)
self.save()
def save(self, path: typing.Union[str, pathlib.Path] = ""):
"""保存配置到文件"""
if self.valid_configfile:
save_path = pathlib.Path(path) if path else self.path
with open(save_path, "w") as f:
toml.dump(self.data, f)
self.logger.debug("配置文件已保存: %s", save_path)
else:
pass
def get(self, key: str, default: typing.Any = None) -> typing.Any:
"""获取配置值, 如果不存在返回默认值"""
return self.data.get(key, default)
def __getitem__(self, key: str) -> typing.Any:
return self.data[key]
def __setitem__(self, key: str, value: typing.Any):
self.data[key] = value
self.save()
def __contains__(self, key: str) -> bool:
"""支持 in 语法"""
return key in self.data

View File

@@ -1,127 +1,80 @@
"""配置文件服务"""
import pathlib import pathlib
import typing import typing
import toml import toml
from collections import UserDict
import atexit
from heurams.services.logger import get_logger from heurams.services.logger import get_logger
from heurams.services.exceptions import WTFException
default_config = { # 我们的流程是: 找到文件名: 返回文件名里头的数据; 找不到: 继续查索引; 所以 self.data 除了存本级各种索引球用没得
"persist_to_file": 1, # 将更改保存到文件 # 递归就是这么吊
"daystamp_override": -1, # 覆写时间, 设为 -1 以禁用 class ConfigDict(UserDict): # 舒服了
"timestamp_override": -1, # 覆写时间, 设为 -1 以禁用 def __init__(self, config_path: pathlib.Path, dict = None): # 需要自己把自己提起来
"quick_pass": 1, # 启用用于测试的快速通过 if dict:
"scheduled_num": 8, # 对于每个项目的默认新记忆原子数量 raise WTFException("不要放默认值...")
"timezone_offset": 28800, # UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒 super().__init__(dict)
# 28800 是中国标准时间 (UTC+8)
"interface": {
"memorizor": {
"autovoice": True # 自动语音播放, 仅限于 recognition 组件
}
},
"algorithm": {
"default": "SM-2" # 主要算法
# 可选项: SM-2, SM-15M, FSRS
},
"puzzles": { # 谜题默认配置
"mcq": {
"max_riddles_num": 2
},
"cloze": {
"min_denominator": 3
}
},
"paths": { # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
"data": "./data"
},
"services": { # 定义服务到提供者的映射
"audio": "playsound", # 可选项: playsound(通用), termux(仅用于 Android Termux), mpg123(TODO)
"tts": "edgetts", # 可选项: edgetts
"llm": "openai", # 可选项: openai
"sync": "webdav" # 可选项: 留空, webdav
},
"providers": {
"tts": {
"edgetts": { # EdgeTTS 设置
"voice": "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
}
},
"llm": {
"openai": { # 与 OpenAI 相容的语言模型接口服务设置
"url": "",
"key": ""
}
},
"sync": {
"webdav": { # WebDAV 同步设置
"url": "",
"username": "",
"password": "",
"remote_path": "/heurams/",
"verify_ssl": True
}
}
},
}
class ConfigFile:
def __init__(self, path: pathlib.Path):
self.logger = get_logger(__name__) self.logger = get_logger(__name__)
self.path = path self.path = config_path
self.data = dict() self.is_dir = self.path.is_dir()
if not self.path.exists(): if self.is_dir:
self.path.touch() self.update_index() # 狗儿要唱狗儿歌
self.logger.debug("创建配置文件: %s", self.path)
self.data = default_config
self.valid_configfile = 1
# 考虑到可能临时编辑格式错误, 所以不覆写格式错误的配置文件, 而是提示损坏并使用默认配置
self._load()
def _load(self):
"""从文件加载配置数据"""
with open(self.path, "r") as f:
try:
self.data = toml.load(f)
self.logger.debug("配置文件加载成功: %s", self.path)
except toml.TomlDecodeError as e:
print(f"{e}")
self.logger.error("TOML解析错误: %s", e)
self.data = default_config
self.valid_configfile = 0
def modify(self, key: str, value: typing.Any):
"""修改配置值并保存"""
self.data[key] = value
self.logger.debug("修改配置项: %s = %s", key, value)
self.save()
def save(self, path: typing.Union[str, pathlib.Path] = ""):
"""保存配置到文件"""
if self.valid_configfile:
save_path = pathlib.Path(path) if path else self.path
with open(save_path, "w") as f:
toml.dump(self.data, f)
self.logger.debug("配置文件已保存: %s", save_path)
else: else:
pass with open(self.path, 'r+') as f: #TODO: 给这个做缓存
try:
self.data = toml.load(f)
except:
self.data = {}
self.persist = lambda: False # 不修改错误的配置文件
def __getitem__(self, key):
# 我们实现了先进的懒狗加载
value = super().__getitem__(key)
if isinstance(value, pathlib.Path):
return ConfigDict(value)
return value
def get(self, key: str, default: typing.Any = None) -> typing.Any: def converter(self, s):
"""获取配置值, 如果不存在返回默认值""" if (self.path / s).exists:
return self.data.get(key, default) return self.path / s
return self.path / s+'.toml'
def __getitem__(self, key: str) -> typing.Any: def __contains__(self, key):
return self.data[key] if isinstance(key, str):
key = self.converter(key)
return super().__contains__(key)
def __setitem__(self, key: str, value: typing.Any): def __setitem__(self, key, value):
self.data[key] = value if isinstance(key, str):
self.save() key = self.converter(key)
origvalue = super().__getitem__(key) # 所以你不该访问不存在的对象
if isinstance(origvalue, ConfigDict):
if origvalue.path.is_dir():
raise WTFException("你怎么能变更目录配置的内容呢?!")
else:
# 对文件, 我们允许这种覆写存在
# 但是不准变类型
origvalue.data = value
super().__setitem__(key, value)
def __contains__(self, key: str) -> bool: def update_index(self): # 如果有人没事干在config里面创建指向config的符号链接 这玩意会崩溃 但是不要修复: 需要这个符号链接特性
"""支持 in 语法""" for i in self.path.iterdir():
return key in self.data if i.is_dir():
self.data[i.name] = i
else:
if i.suffix == '.toml':
self.data[i.stem] = i
else:
self.logger.log(f"配置目录中有无效的文件 {i.stem}") # what's up bro
def persist(self):
if self.is_dir:
raise WTFException("不准这样浪费性能...")
with open(self.path, 'w+') as f:
toml.dump(self.data, f)
def __del__(self):
if not self.is_dir:
self.persist() # 不准循环引用, 懂了吧

View File

@@ -0,0 +1,30 @@
def epath(dct, path: str = '', default=None, parents=False):
if not path:
return dct
path = path.rstrip('/')
target = dct
for i in path.split('/'):
# 处理字典键
if isinstance(target, dict) and i in target:
target = target[i]
# 处理列表索引
elif i.startswith('[') and i.endswith(']') and isinstance(target, (list, tuple)):
idx = int(i[1:-1])
if 0 <= idx < len(target):
target = target[idx]
elif parents:
while len(target) <= idx:
target.append(None)
target[idx] = {}
target = target[idx]
else:
return default
elif parents:
target[i] = {}
target = target[i]
else:
return default
return target

View File

@@ -0,0 +1,4 @@
from heurams.services.logger import get_logger
class WTFException(Exception):
pass

View File

@@ -63,7 +63,7 @@ class FavoriteManager:
def _get_file_path(self) -> Path: def _get_file_path(self) -> Path:
"""获取收藏文件路径""" """获取收藏文件路径"""
config_path = Path(config_var.get()["paths"]["data"]) config_path = Path(config_var.get()['global']["paths"]["data"])
fav_path = config_path / "global" / "favorites.json" fav_path = config_path / "global" / "favorites.json"
fav_path.parent.mkdir(parents=True, exist_ok=True) fav_path.parent.mkdir(parents=True, exist_ok=True)
return fav_path return fav_path

View File

@@ -9,12 +9,12 @@ logger = get_logger(__name__)
def get_daystamp() -> int: def get_daystamp() -> int:
"""获取当前日戳(以天为单位的整数时间戳)""" """获取当前日戳(以天为单位的整数时间戳)"""
time_override = config_var.get().get("daystamp_override", -1) time_override = config_var.get()['services']["timer"]["daystamp_override"]
if time_override != -1: if time_override != -1:
logger.debug("使用覆盖的日戳: %d", time_override) logger.debug("使用覆盖的日戳: %d", time_override)
return int(time_override) return int(time_override)
result = int((time.time() + config_var.get().get("timezone_offset")) // (24 * 3600)) result = int((time.time() + config_var.get()['services']["timer"]["timezone_offset"]) // (24 * 3600))
logger.debug("计算日戳: %d", result) logger.debug("计算日戳: %d", result)
return result return result
@@ -22,7 +22,7 @@ def get_daystamp() -> int:
def get_timestamp() -> float: def get_timestamp() -> float:
"""获取 UNIX 时间戳""" """获取 UNIX 时间戳"""
# 搞这个类的原因是要支持可复现操作 # 搞这个类的原因是要支持可复现操作
time_override = config_var.get().get("timestamp_override", -1) time_override = config_var.get()['services']["timer"]["timestamp_override"]
if time_override != -1: if time_override != -1:
logger.debug("使用覆盖的时间戳: %f", time_override) logger.debug("使用覆盖的时间戳: %f", time_override)
return float(time_override) return float(time_override)

View File

@@ -7,7 +7,7 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
convertor: Callable = prov[config_var.get()["services"]["tts"]].convert convertor: Callable = prov[config_var.get()['services']["tts"]["provider"]].convert
logger.debug( logger.debug(
"TTS 服务初始化完成, 使用 provider: %s", config_var.get()["services"]["tts"] "TTS 服务初始化完成, 使用 provider: %s", config_var.get()['services']["tts"]["provider"]
) )