feat: 增加 ZMQ 调试服务器并完善设置功能

This commit is contained in:
2026-04-20 06:37:46 +08:00
parent 41af2ada45
commit d16ec03da6
11 changed files with 289 additions and 58 deletions

View File

@@ -1,6 +1,9 @@
from heurams.interface import *
from heurams.context import config_var
from heurams.services.logger import get_logger
import threading
import zmq
import pickle
logger = get_logger(__name__)
@@ -20,9 +23,43 @@ def environment_check():
print(f"找到 {i}")
logger.debug("环境检查完成")
def start_debug_server(app):
logger = get_logger("zmq_debug")
context = zmq.Context()
socket = context.socket(zmq.REP)
port = config_var.get()['global'].get('zmq_debug_port', 5555)
socket.bind(f"tcp://*:{port}")
logger.info(f"ZMQ Debug server started on port {port}")
first = 1
while True:
msg = socket.recv()
code = pickle.loads(msg)
namespace = {"app": app, "logger": logger, "config_var": config_var}
if first:
app.title += ' [调试已连接]'
first = 0
try:
# 先尝试 eval
result = eval(code, namespace)
socket.send(pickle.dumps(f"成功: {result}"))
except SyntaxError:
# 再尝试 exec
try:
exec(code, namespace)
socket.send(pickle.dumps(f"成功: 执行完成"))
except Exception as e:
socket.send(pickle.dumps(f"错误: {e}"))
except Exception as e:
socket.send(pickle.dumps(f"错误: {e}"))
def main():
environment_check()
app = HeurAMSApp()
if config_var.get()['global'].get('zmq_debug', False):
threading.Thread(target=start_debug_server, args=(app,), daemon=True).start()
app.run(inline=False)
if __name__ == "__main__":

View File

@@ -75,17 +75,17 @@ class SettingScreen(Screen):
a = self._get_subcfg(f"{parent_epath}.{i}")
if a:
lst.append(Collapsible(*a, title=i + f'\n{parent.get(f"_{i}_desc", "")}'))
elif f'_{i}_candidate' in parent:
elif f'_{i}_candidate' in parent: # 选择框模式
if isinstance(parent[f'_{i}_candidate'], dict):
lst.append(Horizontal(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Select((f"{j} ({k})", j) for j, k in parent[f'_{i}_candidate'].items()),
Select(((f"{j} ({k})", j) for j, k in parent[f'_{i}_candidate'].items()), prompt=f'{parent.get(f"{i}", "")}', id=domize(f"{parent_epath}.{i}")),
classes='container'
))
elif isinstance(parent[f'_{i}_candidate'], list):
lst.append(Horizontal(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Select((j, j) for j in parent[f'_{i}_candidate']),
Select(((j, j) for j in parent[f'_{i}_candidate']), prompt=f'{parent.get(f"{i}", "")}', id=domize(f"{parent_epath}.{i}")),
classes='container'
))
else:
@@ -102,7 +102,7 @@ class SettingScreen(Screen):
elif isinstance(parent[i], bool):
lst.append(Horizontal(
Label(i + f'\n{parent.get(f"_{i}_desc", "")}'),
Switch(value=str(parent[i]), id=domize(f"{parent_epath}.{i}")),
Switch(value=parent[i], id=domize(f"{parent_epath}.{i}")),
classes='container'))
elif isinstance(parent[i], int):
lst.append(Horizontal(
@@ -122,6 +122,7 @@ class SettingScreen(Screen):
def action_go_back(self) -> None:
"""返回上一屏幕"""
config_var.get().persist()
self.app.pop_screen()
def action_quit_app(self) -> None:
@@ -132,12 +133,27 @@ class SettingScreen(Screen):
"""打开导航器"""
self.app.push_screen(NavigatorScreen())
def on_button_pressed(self, event: Button.Pressed) -> None:
logger.debug(f"event.button.id: {event.button.id}")
"""处理按钮点击事件"""
if str(event.button.id) == 'apply':
pass
if str(event.button.id) == 'openfolder':
pass
if str(event.button.id) == 'cancel':
pass
def on_input_changed(self, event: Input.Changed) -> None:
widget_id = event.input.id
if not widget_id:
return
eepath = undomize(widget_id)
value = event.value
epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value))
def on_switch_changed(self, event: Switch.Changed) -> None:
widget_id = event.switch.id
if not widget_id:
return
eepath = undomize(widget_id)
value = event.value
epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value))
def on_select_changed(self, event: Select.Changed) -> None:
widget_id = event.select.id
if not widget_id:
return
eepath = undomize(widget_id)
value = event.value
epath(config_var.get(), eepath, enable_modify=True, new_value=type(epath(config_var.get(), eepath))(value))

View File

@@ -9,17 +9,38 @@ from heurams.services.logger import get_logger
from heurams.services.exceptions import WTFException
# 我们的流程是: 找到文件名: 返回文件名里头的数据; 找不到: 继续查索引; 所以 self.data 除了存本级各种索引球用没得
# 递归就是这么吊
logger = get_logger(__name__)
class ConfigDict(UserDict): # 舒服了
_instances = {} # 必须使用单例模式, 不然有严重的多实例导致的配置无法持久化问题
def __new__(cls, config_path: pathlib.Path, dict=None):
if dict:
raise WTFException("不要放默认值...")
# 规范化路径, 免得单例存在"别名"
path_key = config_path.resolve()
if path_key in cls._instances:
return cls._instances[path_key]
instance = super().__new__(cls)
cls._instances[path_key] = instance
return instance
def __init__(self, config_path: pathlib.Path, dict = None): # 需要自己把自己提起来
# 避免重复初始化
if hasattr(self, '_initialized'):
return
self._initialized = True
if dict:
raise WTFException("不要放默认值...")
super().__init__(dict)
self.logger = get_logger(__name__)
logger = get_logger(__name__)
self.path = config_path
self.is_dir = self.path.is_dir()
if self.is_dir:
self.update_index() # 狗儿要唱狗儿歌
self.update_index()
else:
with open(self.path, 'r+') as f: #TODO: 给这个做缓存
try:
@@ -35,11 +56,6 @@ class ConfigDict(UserDict): # 舒服了
return ConfigDict(value)
return value
def converter(self, s):
if (self.path / s).exists:
return self.path / s
return self.path / s+'.toml'
def __contains__(self, key):
return super().__contains__(key)
@@ -67,22 +83,16 @@ class ConfigDict(UserDict): # 舒服了
if i.suffix == '.toml':
self.data[i.stem] = i
else:
self.logger.log(f"配置目录中有无效的文件 {i.stem}") # what's up bro
logger.debug(f"配置目录中有无效的文件 {i.stem}") # what's up bro
def persist(self):
if self.is_dir:
raise WTFException("不准这样浪费性能...")
for i in self.data.keys():
j = self[i]
if isinstance(j, ConfigDict):
j.persist()
logger.debug("完成配置持久化")
return
with open(self.path, 'w+') as f:
toml.dump(self.data, f)
def __del__(self):
if not self.is_dir:
self.persist() # 不准循环引用, 懂了吧
@staticmethod
def titleize(objt):
if isinstance(objt, pathlib.Path):
return objt.stem
else:
return objt
toml.dump(self.data, f)

View File

@@ -2,35 +2,59 @@ from heurams.services.config import ConfigDict
from heurams.services.logger import get_logger
logger = get_logger(__name__)
def epath(dct, path: str = '', default=None, parents=False):
def epath(dct, path: str = '', default=None, parents=False, enable_modify=False, new_value=None):
if not path:
return dct
path = path.rstrip('.')
path = path.lstrip('.')
target = dct
for i in path.split('.'):
keys = path.split('.')
logger.debug(f"处理 EPATH {path}, {new_value}")
for idx, i in enumerate(keys):
is_last = (idx == len(keys) - 1)
# 处理字典键
logger.debug(f'处理 {i}, {(isinstance(target, dict) or isinstance(target, ConfigDict))} {i in target}')
if (isinstance(target, dict) or isinstance(target, ConfigDict)) and i in target:
target = target[i]
# 处理列表索引
elif i.startswith('[') and i.endswith(']') and isinstance(target, (list, tuple)):
idx = int(i[1:-1])
if 0 <= idx < len(target):
target = target[idx]
elif parents:
while len(target) <= idx:
target.append(None)
target[idx] = {}
target = target[idx]
if is_last and enable_modify:
# 最后一次循环执行修改
if (isinstance(target, dict) or isinstance(target, ConfigDict)):
target[i] = new_value
return new_value
elif i.startswith('[') and i.endswith(']') and isinstance(target, (list, tuple)):
idx_num = int(i[1:-1])
if 0 <= idx_num < len(target):
target[idx_num] = new_value
return new_value
elif parents:
while len(target) <= idx_num:
target.append(None)
target[idx_num] = new_value
return new_value
else:
return default
else:
return default
elif parents:
target[i] = {}
target = target[i]
else:
return default
if (isinstance(target, dict) or isinstance(target, ConfigDict)) and i in target:
target = target[i]
elif i.startswith('[') and i.endswith(']') and isinstance(target, (list, tuple)):
idx_num = int(i[1:-1])
if 0 <= idx_num < len(target):
target = target[idx_num]
elif parents:
while len(target) <= idx_num:
target.append(None)
target[idx_num] = {}
target = target[idx_num]
else:
return default
elif parents:
target[i] = {}
target = target[i]
else:
return default
return target

View File

@@ -0,0 +1,55 @@
import zmq
import pickle
import readline
import sys
class DebugClient:
def __init__(self, port=5555):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(f"tcp://localhost:{port}")
print(f"已连接到调试服务器 (端口 {port})")
print("输入Python代码并按回车执行, 输入 'exit' 退出")
print("可用变量: app, logger")
print("-" * 50)
def execute(self, code):
"""执行代码并返回结果"""
try:
self.socket.send(pickle.dumps(code))
response = pickle.loads(self.socket.recv())
return response
except Exception as e:
return f"连接错误: {e}"
def repl(self):
"""交互式REPL循环"""
self.execute('print("test")')
while True:
try:
# 获取用户输入
code = input(">>> ").strip()
if not code:
continue
if code.lower() in ['exit', 'quit']:
print("退出调试客户端")
break
# 执行代码
result = self.execute(code)
print(f"结果: {result}\n")
except KeyboardInterrupt:
print("\n退出调试客户端")
break
except EOFError:
break
if __name__ == "__main__":
# 从命令行参数获取端口
port = int(sys.argv[1]) if len(sys.argv) > 1 else 5555
client = DebugClient(port)
client.repl()