feat: 改进粒子对象

This commit is contained in:
2026-01-01 06:36:27 +08:00
parent 8358e2532a
commit af63cceba9
19 changed files with 74 additions and 133 deletions

View File

@@ -3,7 +3,6 @@ import pathlib
import typing
from typing import TypedDict
import bidict
import toml
from heurams.context import config_var
@@ -20,13 +19,11 @@ class AtomRegister_runtime(TypedDict):
min_rate: int # 最低评分
new_activation: bool # 新激活
class AtomRegister(TypedDict):
nucleon: Nucleon
electron: Electron
runtime: AtomRegister_runtime
class Atom:
"""
统一处理一系列对象的所有信息与持久化:
@@ -50,61 +47,12 @@ class Atom:
"orbital": orbital_obj,
"runtime": dict(),
}
self.init_runtime()
if self.registry["electron"].is_activated() == 0:
self.registry["runtime"]["new_activation"] = True
def init_runtime(self):
self.registry['runtime'] = AtomRegister_runtime(**self.default_runtime)
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
# eval 环境设置
def eval_with_env(s: str):
default = config_var.get()["puzzles"]
nucleon = self.registry["nucleon"]
payload = nucleon # 兼容历史遗留问题
metadata = nucleon # 兼容历史遗留问题
eval_value = eval(s)
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
else:
ret = eval_value
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
try:
traverse(self.registry["nucleon"].payload, eval_with_env)
traverse(self.registry["nucleon"].metadata, eval_with_env)
traverse(self.registry["orbital"], eval_with_env)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning(ret)
logger.debug("EVAL 完成")
def minimize(self, rating):
"""效果等同于 self.registry['runtime']['min_rate'] = min(rating, self.registry['runtime']['min_rate'])
@@ -143,22 +91,9 @@ class Atom:
logger.debug("禁止总评分")
def __getitem__(self, key):
logger.debug("Atom.__getitem__: key='%s'", key)
if key in self.registry:
value = self.registry[key]
logger.debug("返回 value type: %s", type(value).__name__)
return value
logger.error("不支持的键: '%s'", key)
raise KeyError(f"不支持的键: {key}")
return self.registry[key]
def __setitem__(self, key, value):
logger.debug(
"Atom.__setitem__: key='%s', value type: %s", key, type(value).__name__
)
if key in self.registry:
self.registry[key] = value
logger.debug("'%s' 已设置", key)
else:
logger.error("不支持的键: '%s'", key)
raise KeyError(f"不支持的键: {key}")
if key == "ident":
raise AttributeError("应为只读")
self.registry[key] = value

View File

@@ -7,13 +7,6 @@ import heurams.kernel.algorithms as algolib
from copy import deepcopy
logger = get_logger(__name__)
class QueryType(TypedDict):
is_due: bool
is_activated: bool
rating: int
nextdate: int
lastdate: int
class Electron:
"""电子: 单算法支持的记忆数据包装"""
@@ -30,9 +23,8 @@ class Electron:
self.algodata = algodata
self.ident = ident
self.algo: algolib.BaseAlgorithm = algorithms[algo_name]
self.query = dict()
if self.algo.check_integrity(self.algodata):
if not self.algo.check_integrity(self.algodata):
self.algodata[self.algo.algo_name] = deepcopy(self.algo.defaults)
def activate(self):

View File

@@ -1,38 +1,36 @@
from heurams.services.logger import get_logger
from copy import deepcopy
from heurams.utils.evalizor import Evalizer
logger = get_logger(__name__)
class Nucleon:
"""原子核: 带有运行时隔离的只读材料元数据容器
"""原子核: 带有运行时隔离的模板化只读材料元数据容器
"""
def __init__(self, ident, payload, common):
self.ident = ident
self.payload = payload
self.common = common
self.rtlayer = dict() # 运行时层
env = {
"payload": payload
}
self.evalizer = Evalizer(environment=env)
self.data = self.evalizer(deepcopy((payload | common)))
def __getitem__(self, key):
if key == "ident":
return self.ident
merged = self.rtlayer | self.payload | self.common
return merged[key]
return self.data[key]
def __setitem__(self, key, value):
if key == "ident":
raise AttributeError("ident 应为只读")
else:
self.rtlayer[key] = value
raise AttributeError("应为只读")
def __delitem__(self, key):
raise AttributeError("Nucleon 包含的数据被设计为无法删除")
raise AttributeError("应为只读")
def __iter__(self):
merged = self.rtlayer | self.payload | self.common
return iter(merged)
return iter(self.data)
def __contains__(self, key):
return key in (self.rtlayer | self.payload | self.common)
return key in (self.data)
def get(self, key, default=None):
if key in self:
@@ -40,12 +38,10 @@ class Nucleon:
return default
def __len__(self):
return len(self.rtlayer | self.payload | self.common)
return len(self.data)
def __repr__(self):
return f"""RUNTIME:{repr(self.rtlayer)}
PAYLOAD:{repr(self.payload)}
COMMON:{repr(self.common)}"""
return repr(self.data)
@staticmethod
def create_on_nucleonic_data(nucleonic_data: tuple):