"""同步工具界面""" import pathlib import time from textual.app import ComposeResult from textual.containers import Horizontal, ScrollableContainer from textual.screen import Screen from textual.widgets import Button, Footer, Header, Label, ProgressBar, Static from textual.worker import get_current_worker import heurams.kernel.particles as pt import heurams.services.hasher as hasher from heurams.context import * class SyncScreen(Screen): BINDINGS = [("q", "go_back", "返回")] def __init__(self, nucleons: list = [], desc: str = ""): super().__init__(name=None, id=None, classes=None) self.sync_service = None self.is_syncing = False self.is_paused = False self.log_messages = [] self.max_log_lines = 50 def compose(self) -> ComposeResult: yield Header(show_clock=True) with ScrollableContainer(id="sync_container"): # 标题和连接状态 yield Static("同步工具", classes="title") yield Static("", id="status_label", classes="status") # 配置信息 yield Static(f"同步协议: {config_var.get()['services']['sync']}") yield Static("服务器配置:", classes="section_title") with Horizontal(classes="config_info"): yield Static("远程服务器:", classes="config_label") yield Static("", id="server_url", classes="config_value") with Horizontal(classes="config_info"): yield Static("远程路径:", classes="config_label") yield Static("", id="remote_path", classes="config_value") with Horizontal(classes="control_buttons"): yield Button("测试连接", id="test_connection", variant="primary") yield Button("开始同步", id="start_sync", variant="success") yield Button("暂停", id="pause_sync", variant="warning", disabled=True) yield Button("取消", id="cancel_sync", variant="error", disabled=True) yield Static("同步进度", classes="section_title") yield ProgressBar(id="progress_bar", show_percentage=True, total=100) yield Static("", id="progress_label", classes="progress_text") yield Static("同步日志", classes="section_title") yield Static("", id="log_output", classes="log_output") yield Footer() def on_mount(self): """挂载时初始化状态""" self.update_ui_from_config() self.log_message("同步工具已启动") def update_ui_from_config(self): """更新 UI 显示配置信息""" try: sync_cfg: dict = config_var.get()["providers"]["sync"]["webdav"] # 更新服务器 URL url = sync_cfg.get("url", "未配置") url_widget = self.query_one("#server_url") url_widget.update(url) # type: ignore # 更新远程路径 remote_path = sync_cfg.get("remote_path", "/") path_widget = self.query_one("#remote_path") path_widget.update(remote_path) # type: ignore # 更新状态标签 status_widget = self.query_one("#status_label") if self.sync_service and self.sync_service.client: status_widget.update("✅ 同步服务已就绪") # type: ignore status_widget.add_class("ready") else: status_widget.update("❌ 同步服务未配置或未启用") # type: ignore status_widget.add_class("error") except Exception as e: self.log_message(f"更新 UI 失败: {e}", is_error=True) def update_status(self, status, current_item="", progress=None): """更新状态显示""" try: status_widget = self.query_one("#status_label") status_widget.update(status) # type: ignore if progress is not None: progress_bar = self.query_one("#progress_bar") progress_bar.progress = progress # type: ignore progress_label = self.query_one("#progress_label") progress_label.update(f"{progress}% - {current_item}" if current_item else f"{progress}%") # type: ignore except Exception as e: self.log_message(f"更新状态失败: {e}", is_error=True) def log_message(self, message: str, is_error: bool = False): """添加日志消息并更新显示""" timestamp = time.strftime("%H:%M:%S") prefix = "[ERROR]" if is_error else "[INFO]" log_line = f"{timestamp} {prefix} {message}" self.log_messages.append(log_line) # 保持日志行数不超过最大值 if len(self.log_messages) > self.max_log_lines: self.log_messages = self.log_messages[-self.max_log_lines :] # 更新日志显示 try: log_widget = self.query_one("#log_output") log_widget.update("\n".join(self.log_messages)) # type: ignore except Exception: pass # 如果组件未就绪,忽略错误 def on_button_pressed(self, event: Button.Pressed) -> None: """处理按钮点击事件""" button_id = event.button.id if button_id == "test_connection": self.test_connection() elif button_id == "start_sync": self.start_sync() elif button_id == "pause_sync": self.pause_sync() elif button_id == "cancel_sync": self.cancel_sync() event.stop() def test_connection(self): """测试 WebDAV 服务器连接""" if not self.sync_service: self.log_message("同步服务未初始化,请检查配置", is_error=True) self.update_status("❌ 同步服务未初始化") return self.log_message("正在测试 WebDAV 连接...") self.update_status("正在测试连接...") try: success = self.sync_service.test_connection() if success: self.log_message("连接测试成功") self.update_status("✅ 连接正常") else: self.log_message("连接测试失败", is_error=True) self.update_status("❌ 连接失败") except Exception as e: self.log_message(f"连接测试异常: {e}", is_error=True) self.update_status("❌ 连接异常") def start_sync(self): """开始同步""" if not self.sync_service: self.log_message("同步服务未初始化,无法开始同步", is_error=True) return if self.is_syncing: self.log_message("同步已在进行中", is_error=True) return self.is_syncing = True self.is_paused = False self.update_button_states() self.log_message("开始同步数据...") self.update_status("正在同步...", progress=0) # 启动后台同步任务 self.run_worker(self.perform_sync, thread=True) def perform_sync(self): """执行同步任务(在后台线程中运行)""" worker = get_current_worker() try: # 获取需要同步的本地目录 from heurams.context import config_var config = config_var.get() paths = config.get("paths", {}) # 同步 nucleon 目录 nucleon_dir = pathlib.Path(paths.get("nucleon_dir", "./data/nucleon")) if nucleon_dir.exists(): self.log_message(f"同步 nucleon 目录: {nucleon_dir}") self.update_status(f"同步 nucleon 目录...", progress=10) result = self.sync_service.sync_directory(nucleon_dir) # type: ignore if result.get("success"): self.log_message( f"nucleon 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个" ) else: self.log_message( f"nucleon 同步失败: {result.get('error', '未知错误')}", is_error=True, ) # 同步 electron 目录 electron_dir = pathlib.Path(paths.get("electron_dir", "./data/electron")) if electron_dir.exists(): self.log_message(f"同步 electron 目录: {electron_dir}") self.update_status(f"同步 electron 目录...", progress=60) result = self.sync_service.sync_directory(electron_dir) # type: ignore if result.get("success"): self.log_message( f"electron 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个" ) else: self.log_message( f"electron 同步失败: {result.get('error', '未知错误')}", is_error=True, ) # 同步 orbital 目录(如果存在) orbital_dir = pathlib.Path(paths.get("orbital_dir", "./data/orbital")) if orbital_dir.exists(): self.log_message(f"同步 orbital 目录: {orbital_dir}") self.update_status(f"同步 orbital 目录...", progress=80) result = self.sync_service.sync_directory(orbital_dir) # type: ignore if result.get("success"): self.log_message( f"orbital 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个" ) else: self.log_message( f"orbital 同步失败: {result.get('error', '未知错误')}", is_error=True, ) # 同步完成 self.update_status("同步完成", progress=100) self.log_message("所有目录同步完成") except Exception as e: self.log_message(f"同步过程中发生错误: {e}", is_error=True) self.update_status("同步失败") finally: # 重置同步状态 self.is_syncing = False self.is_paused = False self.update_button_states() # type: ignore def pause_sync(self): """暂停同步""" if not self.is_syncing: return self.is_paused = not self.is_paused self.update_button_states() if self.is_paused: self.log_message("同步已暂停") self.update_status("同步已暂停") else: self.log_message("同步已恢复") self.update_status("正在同步...") def cancel_sync(self): """取消同步""" if not self.is_syncing: return self.is_syncing = False self.is_paused = False self.update_button_states() self.log_message("同步已取消") self.update_status("同步已取消") def update_button_states(self): """更新按钮状态""" try: start_button = self.query_one("#start_sync") pause_button = self.query_one("#pause_sync") cancel_button = self.query_one("#cancel_sync") if self.is_syncing: start_button.disabled = True pause_button.disabled = False cancel_button.disabled = False pause_button.label = "继续" if self.is_paused else "暂停" # type: ignore else: start_button.disabled = False pause_button.disabled = True cancel_button.disabled = True except Exception as e: self.log_message(f"更新按钮状态失败: {e}", is_error=True) def action_go_back(self): self.app.pop_screen() def action_quit_app(self): self.app.exit()