feat: 改进对象系统

This commit is contained in:
2026-01-01 20:18:18 +08:00
parent d25e398701
commit bd313db5f2
39 changed files with 682 additions and 180 deletions

View File

@@ -1,8 +1,8 @@
from heurams.services.logger import get_logger
from .base import BaseAlgorithm
from .sm2 import SM2Algorithm
from .sm15m import SM15MAlgorithm
from .base import BaseAlgorithm
logger = get_logger(__name__)

View File

@@ -75,4 +75,3 @@ class BaseAlgorithm:
return 1
except:
return 0

View File

@@ -6,6 +6,7 @@ from .base import BaseEvaluator
logger = get_logger(__name__)
class GuessEvaluator(BaseEvaluator):
def __init__(self):
super().__init__()
super().__init__()

View File

@@ -1,4 +1,4 @@
from .atom import Atom
from .electron import Electron
from .nucleon import Nucleon
from .orbital import Orbital
from .orbital import Orbital

View File

@@ -5,7 +5,6 @@ from typing import TypedDict
import toml
from heurams.context import config_var
from heurams.services.logger import get_logger
from .electron import Electron
@@ -19,11 +18,13 @@ class AtomRegister_runtime(TypedDict):
min_rate: int # 最低评分
new_activation: bool # 新激活
class AtomRegister(TypedDict):
nucleon: Nucleon
electron: Electron
runtime: AtomRegister_runtime
class Atom:
"""
统一处理一系列对象的所有信息与持久化:
@@ -39,10 +40,10 @@ class Atom:
"new_activation": False,
}
def __init__(self, nucleon_obj = None, electron_obj = None, orbital_obj = None):
self.ident = nucleon_obj["ident"] # type: ignore
def __init__(self, nucleon_obj=None, electron_obj=None, orbital_obj=None):
self.ident = nucleon_obj["ident"] # type: ignore
self.registry: AtomRegister = { # type: ignore
"ident": nucleon_obj["ident"], # type: ignore
"ident": nucleon_obj["ident"], # type: ignore
"nucleon": nucleon_obj,
"electron": electron_obj,
"orbital": orbital_obj,
@@ -53,7 +54,7 @@ class Atom:
self.registry["runtime"]["new_activation"] = True
def init_runtime(self):
self.registry['runtime'] = AtomRegister_runtime(**self.default_runtime)
self.registry["runtime"] = AtomRegister_runtime(**self.default_runtime)
def minimize(self, rating):
"""效果等同于 self.registry['runtime']['min_rate'] = min(rating, self.registry['runtime']['min_rate'])
@@ -97,4 +98,4 @@ class Atom:
def __setitem__(self, key, value):
if key == "ident":
raise AttributeError("应为只读")
self.registry[key] = value
self.registry[key] = value

View File

@@ -1,12 +1,14 @@
from copy import deepcopy
from typing import TypedDict
import heurams.kernel.algorithms as algolib
import heurams.services.timer as timer
from heurams.context import config_var
from heurams.kernel.algorithms import algorithms
from heurams.services.logger import get_logger
import heurams.kernel.algorithms as algolib
from copy import deepcopy
logger = get_logger(__name__)
class Electron:
"""电子: 单算法支持的记忆数据包装"""
@@ -89,11 +91,11 @@ class Electron:
def __len__(self):
"""仅返回当前算法的配置数量"""
return len(self.algodata[self.algo.algo_name])
@staticmethod
def create_on_electonic_data(electronic_data: tuple, algo_name: str = ""):
_data = electronic_data
ident = _data[0]
algodata = _data[1]
ident = ident
return Electron(ident, algodata, algo_name)
return Electron(ident, algodata, algo_name)

View File

@@ -1,17 +1,17 @@
from heurams.services.logger import get_logger
from copy import deepcopy
from heurams.services.logger import get_logger
from heurams.utils.evalizor import Evalizer
logger = get_logger(__name__)
class Nucleon:
"""原子核: 带有运行时隔离的模板化只读材料元数据容器
"""
"""原子核: 带有运行时隔离的模板化只读材料元数据容器"""
def __init__(self, ident, payload, common):
self.ident = ident
env = {
"payload": payload
}
env = {"payload": payload}
self.evalizer = Evalizer(environment=env)
self.data = self.evalizer(deepcopy((payload | common)))
@@ -19,10 +19,10 @@ class Nucleon:
if key == "ident":
return self.ident
return self.data[key]
def __setitem__(self, key, value):
raise AttributeError("应为只读")
def __delitem__(self, key):
raise AttributeError("应为只读")
@@ -31,12 +31,12 @@ class Nucleon:
def __contains__(self, key):
return key in (self.data)
def get(self, key, default=None):
if key in self:
return self[key]
return default
def __len__(self):
return len(self.data)
@@ -48,5 +48,5 @@ class Nucleon:
_data = nucleonic_data
payload = _data[1][0]
common = _data[1][1]
ident = _data[0] #TODO:实现eval
return Nucleon(ident, payload, common)
ident = _data[0] # TODO:实现eval
return Nucleon(ident, payload, common)

View File

@@ -1,14 +1,16 @@
from heurams.utils.lict import Lict
from heurams.utils.evalizor import Evalizer
from heurams.utils.lict import Lict
class Orbital():
class Orbital:
@classmethod
def create_orbital(cls, schedule: list, phase_def: dict):
phase_def = Lict(initdict=phase_def) # type: ignore
phase_def = Lict(initdict=phase_def) # type: ignore
orbital = Lict()
for i in schedule:
orbital[i] = Lict(phase_def[i])
return orbital
@classmethod
def create_orbital_on_orbitic_data(cls, orbitic_data):
return cls.create_orbital(orbitic_data["schedule"], orbitic_data["phases"])
return cls.create_orbital(orbitic_data["schedule"], orbitic_data["phases"])

View File

@@ -1,7 +1,7 @@
import random
import heurams.kernel.particles as pt
import heurams.kernel.evaluators as puz
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .states import PhaserState

View File

@@ -1,5 +1,5 @@
from ...utils.lict import Lict
def merge(x: Lict, y: Lict):
return Lict(list(x.values()) + list(y.values()))

View File

@@ -1,3 +1,3 @@
class Navi():
class Navi:
def __init__(self, init) -> None:
pass
pass

View File

@@ -1,11 +1,15 @@
import json
from functools import reduce
from pathlib import Path
import heurams.kernel.particles as pt
import toml
import json
import heurams.kernel.particles as pt
from ...utils.lict import Lict
class Repo():
class Repo:
file_mapping = {
"schedule": "schedule.toml",
"payload": "payload.toml",
@@ -24,13 +28,21 @@ class Repo():
default_save_list = ["algodata"]
def __init__(self, schedule: dict, payload: Lict, manifest: dict, typedef: dict, algodata: Lict, source = None) -> None:
def __init__(
self,
schedule: dict,
payload: Lict,
manifest: dict,
typedef: dict,
algodata: Lict,
source=None,
) -> None:
self.schedule: dict = schedule
self.manifest: dict = manifest
self.typedef: dict = typedef
self.payload: Lict = payload
self.algodata: Lict = algodata
self.source: Path | None = source # 若存在, 指向 repo 所在 dir
self.source: Path | None = source # 若存在, 指向 repo 所在 dir
self.database = {
"schedule": self.schedule,
"payload": self.payload,
@@ -42,35 +54,43 @@ class Repo():
self.generate_particles_data()
def generate_particles_data(self):
self.nucleonic_data_lict = Lict(initlist=list(map(self._attach(self.typedef), self.payload)))
self.nucleonic_data_lict = Lict(
initlist=list(map(
self._nucleonic_proc,
self.payload))
)
self.electronic_data_lict = self.algodata
self.orbitic_data = self.schedule
@staticmethod
def _attach(value):
def inner(x):
return (x, value)
return inner
def _nucleonic_proc(self, unit):
ident = unit[0]
common = self.typedef["common"]
return (ident, (unit[1], common))
@staticmethod
def _merge(value):
def inner(x):
return map(x, value)
return (x, value)
return inner
def __len__(self):
return len(self.payload)
def persist_to_repodir(self, save_list: list | None = None, source: Path | None= None):
def persist_to_repodir(
self, save_list: list | None = None, source: Path | None = None
):
if save_list == None:
save_list = self.default_save_list
if self.source != None and source == None:
source = self.source
if source == None:
raise FileNotFoundError("不存在仓库到文件的映射")
source.mkdir(parents=True, exist_ok=False)
for keyname in save_list:
filename = self.file_mapping[keyname]
with open(source / filename, 'w') as f:
with open(source / filename, "w") as f:
try:
dict_data = self.database[keyname].dicted_data
except:
@@ -81,22 +101,22 @@ class Repo():
json.dump(dict_data, f)
else:
raise ValueError(f"不支持的文件类型: {filename}")
def export_to_single_dict(self):
return self.database
@classmethod
def create_new_repo(cls, source = None):
def create_new_repo(cls, source=None):
default_database = {
"schedule": {},
"payload": Lict([]),
"algodata": Lict([]),
"manifest": {},
"typedef": {},
"source": source
"source": source,
}
return Repo(**default_database)
@classmethod
def create_from_repodir(cls, source: Path):
database = {}
@@ -117,17 +137,17 @@ class Repo():
raise ValueError(f"不支持的数据容器: {cls.type_mapping[keyname]}")
database["source"] = source
return Repo(**database)
@classmethod
def create_from_single_dict(cls, dictdata, source: Path | None = None):
database = dictdata
database["source"] = source
return Repo(**database)
@classmethod
def check_repodir(cls, source: Path):
try:
cls.create_from_repodir(source)
return 1
except:
return 0
return 0