feat: 完成部分界面重构

This commit is contained in:
2026-01-02 06:12:49 +08:00
parent 238013ac46
commit 64c9a5c1a7
26 changed files with 700 additions and 323 deletions

View File

@@ -20,7 +20,7 @@ from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF,
# 全局状态文件路径
_GLOBAL_STATE_FILE = os.path.expanduser(
pathlib.Path(config_var.get()["paths"]["global_dir"]) / "sm15m_global_state.json"
pathlib.Path(config_var.get()["paths"]["data"]) / 'global' / "sm15m_global_state.json"
)

View File

@@ -21,7 +21,7 @@ class Electron:
algo_name: 使用的算法模块标识
"""
if algo_name == "":
algo_name = "sm-2"
algo_name = "SM-2"
self.algodata = algodata
self.ident = ident
self.algo: algolib.BaseAlgorithm = algorithms[algo_name]
@@ -29,6 +29,11 @@ class Electron:
if not self.algo.check_integrity(self.algodata):
self.algodata[self.algo.algo_name] = deepcopy(self.algo.defaults)
def __repr__(self):
from pprint import pformat
s = pformat(self.algodata, indent=4)
return s
def activate(self):
"""激活此电子"""
self.algodata[self.algo.algo_name]["is_activated"] = 1

View File

@@ -41,7 +41,9 @@ class Nucleon:
return len(self.data)
def __repr__(self):
return repr(self.data)
from pprint import pformat
s = pformat(self.data, indent=4)
return s
@staticmethod
def create_on_nucleonic_data(nucleonic_data: tuple):

View File

@@ -1,12 +0,0 @@
from heurams.services.logger import get_logger
from .fission import Fission
from .phaser import Phaser
from .procession import Procession
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
__all__ = ["PhaserState", "ProcessionState", "Procession", "Fission", "Phaser"]
logger.debug("反应堆模块已加载")

View File

@@ -1,51 +1,24 @@
# 移相器类定义
from enum import Enum
from typing import Any, Sequence, Type
from transitions import Machine, State, Event, EventData
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
class Phaser(Machine):
def __init__(self, schedule: list):
state_words = ["init"] + schedule.copy()
state_objects = list()
for name in state_words:
state_objects.append(State(
name=name,
on_enter=["on_enter"]
))
Machine.__init__(self, states=state_objects, initial="init", send_event=True)
self.add_ordered_transitions(loop=False)
from .procession import Procession
from .states import PhaserState, ProcessionState
def on_enter(self, event_data: EventData):
print(event_data.transition.source, "->", event_data.transition.dest) # type: ignore
logger = get_logger(__name__)
class Phaser:
"""移相器: 全局调度阶段管理器"""
def __init__(self, atoms: list[pt.Atom]) -> None:
logger.debug("Phaser.__init__: 原子数量=%d", len(atoms))
new_atoms = list()
old_atoms = list()
self.state = PhaserState.UNSURE
for i in atoms:
if not i.registry["electron"].is_activated():
new_atoms.append(i)
else:
old_atoms.append(i)
logger.debug("新原子数量=%d, 旧原子数量=%d", len(new_atoms), len(old_atoms))
self.processions = list()
if len(old_atoms):
self.processions.append(
Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习")
)
logger.debug("创建初始复习 Procession")
if len(new_atoms):
self.processions.append(
Procession(new_atoms, PhaserState.RECOGNITION, "新记忆")
)
logger.debug("创建新记忆 Procession")
self.processions.append(Procession(atoms, PhaserState.FINAL_REVIEW, "总体复习"))
logger.debug("创建总体复习 Procession")
logger.debug("Phaser 初始化完成, processions 数量=%d", len(self.processions))
def current_procession(self):
logger.debug("Phaser.current_procession 被调用")
for i in self.processions:
i: Procession
if not i.state == ProcessionState.FINISHED:
self.state = i.phase
logger.debug("找到未完成的 Procession: phase=%s", i.phase)
return i
self.state = PhaserState.FINISHED
logger.debug("所有 Procession 已完成, 状态设置为 FINISHED")
return 0
if __name__ == "__main__":
p = Phaser(["a", "b"])
p.next_state()
p.next_state()
print(p.state)

View File

@@ -1,74 +1,24 @@
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from enum import Enum
from typing import Any, Sequence, Type
from transitions import Machine, State, Event, EventData
from .states import PhaserState, ProcessionState
class Phaser(Machine):
def __init__(self, schedule: list):
state_words = ["init"] + schedule.copy()
state_objects = list()
for name in state_words:
state_objects.append(State(
name=name,
on_enter=["on_enter"]
))
Machine.__init__(self, states=state_objects, initial="init", send_event=True)
self.add_ordered_transitions(loop=False)
logger = get_logger(__name__)
def on_enter(self, event_data: EventData):
print(event_data.transition.source, "->", event_data.transition.dest) # type: ignore
class Procession:
"""队列: 标识单次记忆流程"""
def __init__(self, atoms: list, phase: PhaserState, name: str = ""):
logger.debug(
"Procession.__init__: 原子数量=%d, phase=%s, name='%s'",
len(atoms),
phase.value,
name,
)
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0]
self.cursor = 0
self.name = name
self.phase = phase
self.state: ProcessionState = ProcessionState.RUNNING
logger.debug("Procession 初始化完成, 队列长度=%d", len(self.queue))
def forward(self, step=1):
logger.debug("Procession.forward: step=%d, 当前 cursor=%d", step, self.cursor)
self.cursor += step
if self.cursor == len(self.queue):
self.state = ProcessionState.FINISHED
logger.debug("Procession 已完成")
else:
self.state = ProcessionState.RUNNING
try:
logger.debug("cursor 更新为: %d", self.cursor)
self.current_atom = self.queue[self.cursor]
logger.debug("当前原子更新为: %s", self.current_atom.ident)
return 1 # 成功
except IndexError as e:
logger.debug("IndexError: %s", e)
self.state = ProcessionState.FINISHED
logger.debug("Procession 因索引错误而完成")
return 0
def append(self, atom=None):
if atom == None:
atom = self.current_atom
logger.debug("Procession.append: atom=%s", atom.ident if atom else "None")
if self.queue[len(self.queue) - 1] != atom or len(self) <= 1:
self.queue.append(atom)
logger.debug("原子已追加到队列, 新队列长度=%d", len(self.queue))
else:
logger.debug("原子未追加(重复或队列长度<=1)")
def __len__(self):
length = len(self.queue) - self.cursor
logger.debug("Procession.__len__: 剩余长度=%d", length)
return length
def process(self):
logger.debug("Procession.process: cursor=%d", self.cursor)
return self.cursor
def total_length(self):
total = len(self.queue)
logger.debug("Procession.total_length: %d", total)
return total
def is_empty(self):
empty = len(self.queue)
logger.debug("Procession.is_empty: %d", empty)
return empty
if __name__ == "__main__":
p = Phaser(["a", "b"])
p.next_state()
p.next_state()
print(p.state)

View File

@@ -0,0 +1,12 @@
from heurams.services.logger import get_logger
from .fission import Fission
from .phaser import Phaser
from .procession import Procession
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
__all__ = ["PhaserState", "ProcessionState", "Procession", "Fission", "Phaser"]
logger.debug("反应堆模块已加载")

View File

@@ -0,0 +1,51 @@
# 移相器类定义
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .procession import Procession
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
class Phaser:
"""全局调度阶段管理器"""
def __init__(self, atoms: list[pt.Atom]) -> None:
logger.debug("Phaser.__init__: 原子数量=%d", len(atoms))
new_atoms = list()
old_atoms = list()
self.state = PhaserState.UNSURE
for i in atoms:
if not i.registry["electron"].is_activated():
new_atoms.append(i)
else:
old_atoms.append(i)
logger.debug("新原子数量=%d, 旧原子数量=%d", len(new_atoms), len(old_atoms))
self.processions = list()
if len(old_atoms):
self.processions.append(
Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习")
)
logger.debug("创建初始复习 Procession")
if len(new_atoms):
self.processions.append(
Procession(new_atoms, PhaserState.RECOGNITION, "新记忆")
)
logger.debug("创建新记忆 Procession")
self.processions.append(Procession(atoms, PhaserState.FINAL_REVIEW, "总体复习"))
logger.debug("创建总体复习 Procession")
logger.debug("Phaser 初始化完成, processions 数量=%d", len(self.processions))
def current_procession(self):
logger.debug("Phaser.current_procession 被调用")
for i in self.processions:
i: Procession
if not i.state == ProcessionState.FINISHED:
self.state = i.phase
logger.debug("找到未完成的 Procession: phase=%s", i.phase)
return i
self.state = PhaserState.FINISHED
logger.debug("所有 Procession 已完成, 状态设置为 FINISHED")
return 0

View File

@@ -0,0 +1,74 @@
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
class Procession:
"""队列: 标识单次记忆流程"""
def __init__(self, atoms: list, phase: PhaserState, name: str = ""):
logger.debug(
"Procession.__init__: 原子数量=%d, phase=%s, name='%s'",
len(atoms),
phase.value,
name,
)
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0]
self.cursor = 0
self.name = name
self.phase = phase
self.state: ProcessionState = ProcessionState.RUNNING
logger.debug("Procession 初始化完成, 队列长度=%d", len(self.queue))
def forward(self, step=1):
logger.debug("Procession.forward: step=%d, 当前 cursor=%d", step, self.cursor)
self.cursor += step
if self.cursor == len(self.queue):
self.state = ProcessionState.FINISHED
logger.debug("Procession 已完成")
else:
self.state = ProcessionState.RUNNING
try:
logger.debug("cursor 更新为: %d", self.cursor)
self.current_atom = self.queue[self.cursor]
logger.debug("当前原子更新为: %s", self.current_atom.ident)
return 1 # 成功
except IndexError as e:
logger.debug("IndexError: %s", e)
self.state = ProcessionState.FINISHED
logger.debug("Procession 因索引错误而完成")
return 0
def append(self, atom=None):
if atom == None:
atom = self.current_atom
logger.debug("Procession.append: atom=%s", atom.ident if atom else "None")
if self.queue[len(self.queue) - 1] != atom or len(self) <= 1:
self.queue.append(atom)
logger.debug("原子已追加到队列, 新队列长度=%d", len(self.queue))
else:
logger.debug("原子未追加(重复或队列长度<=1)")
def __len__(self):
length = len(self.queue) - self.cursor
logger.debug("Procession.__len__: 剩余长度=%d", length)
return length
def process(self):
logger.debug("Procession.process: cursor=%d", self.cursor)
return self.cursor
def total_length(self):
total = len(self.queue)
logger.debug("Procession.total_length: %d", total)
return total
def is_empty(self):
empty = len(self.queue)
logger.debug("Procession.is_empty: %d", empty)
return empty

View File

@@ -1,6 +1,7 @@
import json
from functools import reduce
from pathlib import Path
from typing import TypedDict
import toml
@@ -8,6 +9,10 @@ import heurams.kernel.particles as pt
from ...utils.lict import Lict
class RepoManifest(TypedDict):
title: str
author: str
desc: str
class Repo:
file_mapping = {
@@ -38,7 +43,7 @@ class Repo:
source=None,
) -> None:
self.schedule: dict = schedule
self.manifest: dict = manifest
self.manifest: RepoManifest = manifest # type: ignore
self.typedef: dict = typedef
self.payload: Lict = payload
self.algodata: Lict = algodata
@@ -60,8 +65,11 @@ class Repo:
self._nucleonic_proc,
self.payload))
)
self.electronic_data_lict = self.algodata
self.orbitic_data = self.schedule
self.ident_index = self.nucleonic_data_lict.keys()
for i in self.ident_index:
self.algodata.append_new((i, {}))
self.electronic_data_lict = self.algodata
def _nucleonic_proc(self, unit):
ident = unit[0]
@@ -78,6 +86,11 @@ class Repo:
def __len__(self):
return len(self.payload)
def __repr__(self):
from pprint import pformat
s = pformat(self.database, indent=4)
return s
def persist_to_repodir(
self, save_list: list | None = None, source: Path | None = None
):
@@ -151,3 +164,12 @@ class Repo:
return 1
except:
return 0
@classmethod
def probe_vaild_repos_in_dir(cls, folder: Path):
lst = list()
for i in folder.iterdir():
if i.is_dir():
if cls.check_repodir(i):
lst.append(i)
return lst