perf: 脏标记优化 Lict 对象操作复杂度

This commit is contained in:
2026-04-20 18:43:27 +08:00
parent 417588acea
commit 2a47cad2d5
7 changed files with 267 additions and 103 deletions

View File

@@ -1,7 +1,7 @@
zmq_debug = true zmq_debug = true
_zmq_debug_desc = "[调试] ZMQ 调试服务器, 这会在 zmq_debug_port 上打开一个服务器\n调试工具可远程在 HeurAMS 内执行任意 python 代码, 无必要请关闭" _zmq_debug_desc = "[调试] ZeroMQ 调试服务器, 这会在 zmq_debug_port 上打开一个服务器\n调试工具可远程在 HeurAMS 内执行任意 python 代码, 无必要请关闭"
zmq_debug_port = 5555 zmq_debug_port = 5555
_zmq_debug_port_desc = "[调试] ZMQ 调试服务器端口" _zmq_debug_port_desc = "[调试] ZeroMQ 调试服务器端口"
enable_built_in_interface = false enable_built_in_interface = false
_enable_built_in_interface_desc = "启用内置基本用户界面\n(当且仅当 HeurAMS 作为程序库时禁用, 以跳过用户界面逻辑)" _enable_built_in_interface_desc = "启用内置基本用户界面\n(当且仅当 HeurAMS 作为程序库时禁用, 以跳过用户界面逻辑)"
_paths_desc = "用户数据路径定义" _paths_desc = "用户数据路径定义"

View File

@@ -6,7 +6,7 @@ auto_pass = false
_auto_pass_desc = "[调试] 自动通过测试模式" _auto_pass_desc = "[调试] 自动通过测试模式"
scheduled_num = 420 scheduled_num = 420
_scheduled_num_desc = "默认记忆单元数量(可被单元集设置覆盖)" _scheduled_num_desc = "默认记忆单元数量(可被单元集设置覆盖)"
algorithm = "NSP-0" algorithm = "SM-2"
_algorithm_desc = "默认记忆调度算法(可被单元集设置覆盖)" _algorithm_desc = "默认记忆调度算法(可被单元集设置覆盖)"
[_algorithm_candidate] [_algorithm_candidate]

View File

@@ -105,20 +105,21 @@ class DashboardScreen(Screen):
repo.progress = { repo.progress = {
"total": repo.data_length, "total": repo.data_length,
"touched": 0, "touched": 0,
"have_activated_ever": 0,
} }
initial_time = float("inf") initial_time = float("inf")
for i in range(repo.data_length): for i in range(repo.data_length):
e = pt.Electron.from_data(electronic_data=repo.electronic_data_lict[i], algo_name=repo.config['algorithm']) e = pt.Electron.from_data(electronic_data=repo.electronic_data_lict[i], algo_name=repo.config['algorithm'])
n = pt.Nucleon.from_data(repo.nucleonic_data_lict[i]) n = pt.Nucleon.from_data(repo.nucleonic_data_lict[i])
if e.is_activated(): if e.is_activated():
repo.progress["have_activated_ever"] = 1
repo.progress["touched"] += 1 repo.progress["touched"] += 1
repo.nearest_review_time = min(repo.nearest_review_time, e.nextdate()) repo.nearest_review_time = min(repo.nearest_review_time, e.nextdate())
# initial_time = min(initial_time, e.) # initial_time = min(initial_time, e.)
repo.need_review = timer.get_daystamp() >= repo.nearest_review_time repo.need_review = timer.get_daystamp() >= repo.nearest_review_time
repo.prompt = f"""{repo.manifest['title']} ({repo.config['algorithm']}) repo.prompt = f"""{repo.manifest['title']} ({repo.config['algorithm']})
进度: {repo.progress['touched']}/{repo.progress['total']} ({round(repo.progress['touched']/repo.progress['total']*100, 1)}%) [d]进度: {repo.progress['touched']}/{repo.progress['total']} ({round(repo.progress['touched']/repo.progress['total']*100, 1)}%)[/d]
{'需要' if repo.need_review else "无需操作"} [d]{'需要' if repo.need_review else ("暂未开始" if not repo.progress['have_activated_ever'] else '无需操作')}[/d]"""
"""
def on_mount(self) -> None: def on_mount(self) -> None:
"""挂载组件时初始化""" """挂载组件时初始化"""

View File

@@ -1,124 +1,139 @@
from collections import UserList from collections.abc import MutableSequence
from typing import Any, Iterator from typing import Any, Iterator, Optional
class Lict(MutableSequence):
class Lict(UserList): # TODO: 优化同步(惰性同步), 当前性能为 O(n) def __init__(self, initlist: Optional[list] = None,
""" "列典" 对象 initdict: Optional[dict] = None,
forced_order: bool = False):
同时兼容字典和列表大多数 API, 两边数据同步的容器 self._dict = {}
列表数据是 dictobj.items() 的格式 self._list = []
支持根据字典或列表初始化 self._dict_dirty = False
限制要求: self._list_dirty = False
- 键名一定唯一, 且仅能为字符串
- 值一定是引用对象
- 不使用并发
- 不在乎列表顺序语义(严格按键名字符序排列)和列表索引查找, 因此外部的 sort, index 等功能不可用
- append 的元组中, 表示键名的元素不能重复, 否则会导致覆盖行为
只有在 Python 3.7+ 中, forced_order 行为才能被取消.
"""
def __init__(
self,
initlist: list | None = None,
initdict: dict | None = None,
forced_order=False,
):
self.dicted_data = {}
if initdict != None:
initlist = list(initdict.items())
super().__init__(initlist=initlist)
self.forced_order = forced_order self.forced_order = forced_order
self._sync_based_on_list()
if self.forced_order:
self.data.sort()
def _sync_based_on_dict(self): if initdict: # initdict 更优先
self.data = list(self.dicted_data.items()) self._dict = initdict.copy()
if self.forced_order: self._list_dirty = True
self.data.sort() elif initlist:
self._list = initlist.copy()
self._dict_dirty = True
def _sync_based_on_list(self): self._sync_if_needed()
self.dicted_data = {}
for i in self.data:
self.dicted_data[i[0]] = i[1]
def __iter__(self) -> Iterator: def _sync_if_needed(self):
return self.data.__iter__() """只在需要时同步"""
if self._dict_dirty:
self._dict = {k: v for k, v in self._list}
if self.forced_order:
self._list.sort()
self._dict_dirty = False
def __getitem__(self, i): if self._list_dirty:
if isinstance(i, str): self._list = list(self._dict.items())
return self.dicted_data[i] if self.forced_order:
self._list.sort()
self._list_dirty = False
def __getitem__(self, key):
if isinstance(key, str):
return self._dict[key]
else: else:
return super().__getitem__(i) self._sync_if_needed()
return self._list[key]
def get_itemic_unit(self, ident): def __setitem__(self, key, value):
return (ident, self.dicted_data[ident]) """传入键值对时等同于操作字典, 传入索引+元组时等用于替换某索引的列表值为新元组"""
if isinstance(key, str):
def __setitem__(self, i, item): self._dict[key] = value
if isinstance(i, str): self._list_dirty = True
self.dicted_data[i] = item
self._sync_based_on_dict()
else: else:
if item != (item[0], item[1]): if value != (value[0], value[1]):
raise NotImplementedError raise NotImplementedError
super().__setitem__(i, item) self._sync_if_needed()
self._sync_based_on_list() old_key = self._list[key][0]
self._dict.pop(old_key)
self._list[key] = value
self._dict[value[0]] = value[1] # 避免全量同步
def __delitem__(self, i): def __delitem__(self, key):
if isinstance(i, str): if isinstance(key, str): # 字符串键
del self.dicted_data[i] del self._dict[key]
self._sync_based_on_dict() self._list_dirty = True
else: else: # 数字索引
super().__delitem__(i) self._sync_if_needed()
self._sync_based_on_list() del_key = self._list[key][0]
del self._list[key]
del self._dict[del_key]
def keys(self):
self._sync_if_needed()
return self._dict.keys()
def values(self):
self._sync_if_needed()
return self._dict.values()
def items(self):
self._sync_if_needed()
return self._list
def __len__(self):
self._sync_if_needed()
return len(self._list)
def __iter__(self):
self._sync_if_needed()
return iter(self._list)
def __contains__(self, item): def __contains__(self, item):
return item in self.data or item in self.keys() or item in self.values() self._sync_if_needed()
return item in self._list or item in self.keys() or item in self.values()
def append(self, item: Any) -> None: def append(self, item):
if item != (item[0], item[1]): if item != (item[0], item[1]):
raise NotImplementedError raise NotImplementedError
super().append(item) self._sync_if_needed() # 以防 forced_order
self._sync_based_on_list() key, value = item
if self.forced_order: self._dict[key] = value
self.data.sort() self._list.append(item) # 两端都已同步
self._sync_if_needed() # 以防 forced_order
def append_new(self, item: Any): def append_if_it_donesnt_exist_before(self, item: Any):
if item != (item[0], item[1]): if item != (item[0], item[1]):
raise NotImplementedError raise NotImplementedError
self._sync_if_needed()
if item[0] not in self: if item[0] not in self:
super().append(item) super().append(item)
self._sync_based_on_list() self._sync_if_needed()
if self.forced_order:
self.data.sort()
def insert(self, i: int, item: Any) -> None: def insert(self, i: int, item: Any) -> None:
if item != (item[0], item[1]): # 确保 item 是遵从限制的元组 if item != (item[0], item[1]):
raise NotImplementedError raise NotImplementedError
super().insert(i, item) self._sync_if_needed()
self._sync_based_on_list() self._list.insert(i, item)
if self.forced_order: self._dict_dirty = True
self.data.sort()
def pop(self, i: int = -1) -> Any: def pop(self, i: int = -1) -> Any:
res = super().pop(i) self._sync_if_needed()
self._sync_based_on_list() res = self._list.pop(i)
self._dict_dirty = True
return res return res
def remove(self, item: Any) -> None: def remove(self, item: Any) -> None:
if isinstance(item, str): if isinstance(item, str):
item = (item, self.dicted_data[item]) item = (item, self._dict[item])
if item != (item[0], item[1]): if item != (item[0], item[1]):
raise NotImplementedError raise NotImplementedError
super().remove(item) self._sync_if_needed()
self._sync_based_on_list() self._list.remove(item)
if self.forced_order: self._dict_dirty = True
self.data.sort()
def clear(self) -> None: def clear(self) -> None:
super().clear() self._dict.clear()
self._sync_based_on_list() self._list.clear()
self._dict_dirty = False
self._list_dirty = False
def index(self): def index(self):
raise NotImplementedError raise NotImplementedError
@@ -133,15 +148,22 @@ class Lict(UserList): # TODO: 优化同步(惰性同步), 当前性能为 O(n)
raise NotImplementedError raise NotImplementedError
def keys(self): def keys(self):
return self.dicted_data.keys() self._sync_if_needed()
return self._dict.keys()
def values(self): def values(self):
return self.dicted_data.values() self._sync_if_needed()
return self._dict.values()
def items(self): def items(self):
return self.data self._sync_if_needed()
return self._list
def get_itemic_unit(self, ident):
return (ident, self._dict[ident])
def keys_equal_with(self, other): def keys_equal_with(self, other):
self._sync_if_needed()
return self.key_equality(self, other) return self.key_equality(self, other)
@classmethod @classmethod

View File

@@ -0,0 +1,140 @@
from collections import UserList
from typing import Any, Iterator
class Lict(UserList): # TODO: 优化同步(惰性同步), 当前性能为 O(n)
""" "列典" 对象
同时兼容字典和列表大多数 API, 两边数据同步的容器
列表数据是 dictobj.items() 的格式
支持根据字典或列表初始化
限制要求:
- 键名一定唯一, 且仅能为字符串
- 值一定是引用对象
- 不使用并发
- 不在乎列表顺序语义(严格按键名字符序排列)和列表索引查找, 因此外部的 sort, index 等功能不可用
- append 的元组中, 表示键名的元素不能重复, 否则会导致覆盖行为
"""
def __init__(
self,
initlist: list | None = None,
initdict: dict | None = None,
):
self.dicted_data = {}
if initdict != None:
initlist = list(initdict.items())
super().__init__(initlist=initlist)
self._sync_based_on_list()
self.data.sort()
def _sync_based_on_dict(self):
self.data = list(self.dicted_data.items())
self.data.sort()
def _sync_based_on_list(self):
self.dicted_data = {}
for i in self.data:
self.dicted_data[i[0]] = i[1]
def __iter__(self) -> Iterator:
return self.data.__iter__()
def __getitem__(self, i):
if isinstance(i, str):
return self.dicted_data[i]
else:
return super().__getitem__(i)
def get_itemic_unit(self, ident):
return (ident, self.dicted_data[ident])
def __setitem__(self, i, item):
if isinstance(i, str):
self.dicted_data[i] = item
self._sync_based_on_dict()
else:
if item != (item[0], item[1]):
raise NotImplementedError
super().__setitem__(i, item)
self._sync_based_on_list()
def __delitem__(self, i):
if isinstance(i, str):
del self.dicted_data[i]
self._sync_based_on_dict()
else:
super().__delitem__(i)
self._sync_based_on_list()
def __contains__(self, item):
return item in self.data or item in self.keys() or item in self.values()
def append(self, item: Any) -> None:
if item != (item[0], item[1]):
raise NotImplementedError
super().append(item)
self._sync_based_on_list()
self.data.sort()
def append_new(self, item: Any):
if item != (item[0], item[1]):
raise NotImplementedError
if item[0] not in self:
super().append(item)
self._sync_based_on_list()
self.data.sort()
def insert(self, i: int, item: Any) -> None:
if item != (item[0], item[1]): # 确保 item 是遵从限制的元组
raise NotImplementedError
super().insert(i, item)
self._sync_based_on_list()
self.data.sort()
def pop(self, i: int = -1) -> Any:
res = super().pop(i)
self._sync_based_on_list()
return res
def remove(self, item: Any) -> None:
if isinstance(item, str):
item = (item, self.dicted_data[item])
if item != (item[0], item[1]):
raise NotImplementedError
super().remove(item)
self._sync_based_on_list()
self.data.sort()
def clear(self) -> None:
super().clear()
self._sync_based_on_list()
def index(self):
raise NotImplementedError
def extend(self):
raise NotImplementedError
def sort(self):
raise NotImplementedError
def reverse(self):
raise NotImplementedError
def keys(self):
return self.dicted_data.keys()
def values(self):
return self.dicted_data.values()
def items(self):
return self.data
def keys_equal_with(self, other):
return self.key_equality(self, other)
@classmethod
def key_equality(cls, a, b):
return a.keys() == b.keys()

View File

@@ -5,6 +5,7 @@ import heurams.kernel.algorithms as algolib
import heurams.services.timer as timer import heurams.services.timer as timer
from heurams.kernel.algorithms import algorithms from heurams.kernel.algorithms import algorithms
from heurams.services.logger import get_logger from heurams.services.logger import get_logger
from heurams.context import config_var
logger = get_logger(__name__) logger = get_logger(__name__)

View File

@@ -82,7 +82,7 @@ class Repo:
self.data_length = len(self.nucleonic_data_lict) self.data_length = len(self.nucleonic_data_lict)
self.ident_index = self.nucleonic_data_lict.keys() self.ident_index = self.nucleonic_data_lict.keys()
for i in self.ident_index: for i in self.ident_index:
self.algodata.append_new((i, {})) self.algodata.append_if_it_donesnt_exist_before((i, {}))
self.electronic_data_lict = self.algodata self.electronic_data_lict = self.algodata
def _nucleonic_proc(self, unit): def _nucleonic_proc(self, unit):