338 lines
13 KiB
Python
338 lines
13 KiB
Python
"""缓存工具界面"""
|
||
|
||
import pathlib
|
||
|
||
from textual.app import ComposeResult
|
||
from textual.containers import Horizontal, ScrollableContainer, Container
|
||
from textual.screen import Screen
|
||
from textual.widgets import Button, Footer, Header, Label, ProgressBar, Static
|
||
from textual.worker import get_current_worker
|
||
|
||
from textual import events, on
|
||
|
||
import heurams.kernel.particles as pt
|
||
import heurams.services.hasher as hasher
|
||
from heurams.context import *
|
||
|
||
# 兼容性缓存路径:优先使用 paths.cache,否则使用 data/cache
|
||
paths = config_var.get()["global"]["paths"]
|
||
cache_dir = pathlib.Path(paths.get("cache", paths["data"] + "/cache")) / "voice"
|
||
|
||
|
||
def human_size(bytes_num: int) -> str:
|
||
"""将字节数格式化为人类可读的字符串"""
|
||
for unit in ["B", "KB", "MB", "GB", "TB"]:
|
||
if bytes_num < 1024.0:
|
||
return f"{bytes_num:.2f} {unit}"
|
||
bytes_num /= 1024.0 # type: ignore
|
||
return f"{bytes_num:.2f} PB"
|
||
|
||
|
||
class PrecachingScreen(Screen):
|
||
"""预缓存音频文件屏幕
|
||
|
||
缓存记忆单元音频文件, 全部(默认) 或部分记忆单元(可选参数传入)
|
||
|
||
Args:
|
||
nucleons (list): 可选列表, 仅包含 Nucleon 对象
|
||
desc (list): 可选字符串, 包含对此次调用的文字描述
|
||
"""
|
||
|
||
SUB_TITLE = "缓存管理器"
|
||
BINDINGS = [
|
||
("q", "go_back", "返回"),
|
||
]
|
||
|
||
def __init__(self, nucleons: list = [], desc: str = ""):
|
||
super().__init__(name=None, id=None, classes=None)
|
||
self.nucleons = nucleons
|
||
self.is_precaching = False
|
||
self.current_file = ""
|
||
self.current_item = ""
|
||
self.progress = 0
|
||
self.total = len(nucleons)
|
||
self.processed = 0
|
||
self.precache_worker = None
|
||
self.cancel_flag = 0
|
||
self.desc = desc
|
||
# 不再需要缓存配置,保留配置读取以兼容
|
||
self.cache_stats = {
|
||
"total_size": 0,
|
||
"file_count": 0,
|
||
"human_size": "0 B",
|
||
"cached_units": 0,
|
||
"total_units": 0,
|
||
"cache_rate": 0,
|
||
}
|
||
self._update_cache_stats()
|
||
|
||
def _get_total_units(self) -> int:
|
||
"""获取所有仓库的总单元数"""
|
||
from heurams.context import config_var
|
||
from heurams.kernel.repolib import Repo
|
||
|
||
repo_path = pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "repo"
|
||
repo_dirs = Repo.probe_valid_repos_in_dir(repo_path)
|
||
repos = map(Repo.from_repodir, repo_dirs)
|
||
total = 0
|
||
for repo in repos:
|
||
try:
|
||
total += len(repo.ident_index)
|
||
except:
|
||
continue
|
||
return total
|
||
|
||
@on(events.ScreenResume)
|
||
def post_active(self, event):
|
||
from heurams.interface import shim
|
||
|
||
shim.set_term_title(f"{self.app.TITLE} - {self.SUB_TITLE}")
|
||
|
||
def _update_cache_stats(self) -> None:
|
||
"""更新缓存统计信息"""
|
||
total_size = 0
|
||
file_count = 0
|
||
cached_units = 0
|
||
if cache_dir.exists():
|
||
for file in cache_dir.rglob("*"):
|
||
if file.is_file():
|
||
total_size += file.stat().st_size
|
||
file_count += 1
|
||
if file.suffix.lower() == ".wav":
|
||
cached_units += 1
|
||
total_units = self._get_total_units()
|
||
cache_rate = (cached_units / total_units * 100) if total_units > 0 else 0
|
||
|
||
self.cache_stats["total_size"] = total_size
|
||
self.cache_stats["file_count"] = file_count
|
||
self.cache_stats["human_size"] = human_size(total_size)
|
||
self.cache_stats["cached_units"] = cached_units
|
||
self.cache_stats["total_units"] = total_units
|
||
self.cache_stats["cache_rate"] = cache_rate
|
||
|
||
def compose(self) -> ComposeResult:
|
||
|
||
if config_var.get()['interface']['global']['show_header']:
|
||
yield Header(show_clock=config_var.get()['interface']['global']['clock_on_header'])
|
||
with ScrollableContainer(id="precache_container"):
|
||
yield Label("[b]音频预缓存[/b]", classes="title-label")
|
||
with Container():
|
||
yield Static(
|
||
f"缓存率: {self.cache_stats.get('cache_rate', 0):.1f}% (已缓存 {self.cache_stats.get('cached_units', 0)} / {self.cache_stats.get('total_units', 0)} 个单元)",
|
||
classes="cache-usage-text",
|
||
)
|
||
if self.nucleons:
|
||
yield Static(
|
||
f"目标单元归属: [b]{self.desc}[/b]", classes="target-info"
|
||
)
|
||
yield Static(
|
||
f"单元数量: {len(self.nucleons)}", classes="target-info"
|
||
)
|
||
else:
|
||
yield Static("目标: 所有单元", classes="target-info")
|
||
|
||
yield Static(id="status", classes="status-info")
|
||
yield Static(id="current_item", classes="current-item")
|
||
yield ProgressBar(total=100, show_eta=False, id="progress_bar")
|
||
with Horizontal(classes="button-group"):
|
||
if not self.is_precaching:
|
||
yield Button(
|
||
"开始预缓存", id="start_precache", variant="primary"
|
||
)
|
||
else:
|
||
yield Button(
|
||
"取消预缓存", id="cancel_precache", variant="error"
|
||
)
|
||
yield Button("清空缓存", id="clear_cache", variant="warning")
|
||
yield Button("返回", id="go_back", variant="default")
|
||
with Container(classes="cache-info"):
|
||
yield Static(f"缓存路径: {cache_dir}", classes="cache-path")
|
||
yield Static(
|
||
f"文件数: {self.cache_stats['file_count']}", classes="cache-count"
|
||
)
|
||
yield Static(
|
||
f"总大小: {self.cache_stats['human_size']}", classes="cache-size"
|
||
)
|
||
yield Button(
|
||
"刷新", id="refresh_cache_stats", variant="default", flat=True
|
||
)
|
||
yield Static("若您离开此界面, 未完成的缓存进程会自动停止.")
|
||
yield Static('缓存程序支持 "断点续传".')
|
||
|
||
yield Footer()
|
||
|
||
def on_mount(self):
|
||
"""挂载时初始化状态"""
|
||
self.update_status("就绪", "等待开始...")
|
||
self._update_cache_display()
|
||
|
||
def update_status(self, status, current_item="", progress=None):
|
||
"""更新状态显示"""
|
||
status_widget = self.query_one("#status", Static)
|
||
item_widget = self.query_one("#current_item", Static)
|
||
progress_bar = self.query_one("#progress_bar", ProgressBar)
|
||
|
||
status_widget.update(f"状态: {status}")
|
||
item_widget.update(f"当前项目: {current_item}" if current_item else "")
|
||
|
||
if progress is not None:
|
||
progress_bar.progress = progress
|
||
progress_bar.advance(0) # 刷新显示
|
||
|
||
def _update_cache_display(self) -> None:
|
||
"""更新缓存信息显示"""
|
||
# 更新统计信息
|
||
self._update_cache_stats()
|
||
# 更新缓存率进度条
|
||
# 更新缓存大小和文件数显示
|
||
cache_count_widget = self.query_one(".cache-count", Static)
|
||
cache_size_widget = self.query_one(".cache-size", Static)
|
||
cache_usage_text = self.query_one(".cache-usage-text", Static)
|
||
if cache_count_widget:
|
||
cache_count_widget.update(f"文件数: {self.cache_stats['file_count']}")
|
||
if cache_size_widget:
|
||
cache_size_widget.update(f"总大小: {self.cache_stats['human_size']}")
|
||
if cache_usage_text:
|
||
cache_usage_text.update(
|
||
f"缓存率: {self.cache_stats.get('cache_rate', 0):.1f}% "
|
||
f"(已缓存 {self.cache_stats.get('cached_units', 0)} / {self.cache_stats.get('total_units', 0)} 个单元)"
|
||
)
|
||
|
||
def precache_by_text(self, text: str):
|
||
"""预缓存单段文本的音频"""
|
||
|
||
cache_dir.mkdir(parents=True, exist_ok=True)
|
||
cache_file = cache_dir / f"{hasher.get_md5(text)}.wav"
|
||
if not cache_file.exists():
|
||
try:
|
||
from heurams.services.tts_service import convertor
|
||
|
||
convertor(text, cache_file)
|
||
return 1
|
||
except Exception as e:
|
||
print(f"预缓存失败 '{text}': {e}")
|
||
return 0
|
||
return 1
|
||
|
||
def precache_by_nucleon(self, nucleon: pt.Nucleon):
|
||
"""依据 Nucleon 缓存"""
|
||
ret = self.precache_by_text(nucleon["tts_text"])
|
||
return ret
|
||
|
||
def precache_by_list(self, nucleons: list):
|
||
"""依据 Nucleons 列表缓存"""
|
||
for idx, nucleon in enumerate(nucleons):
|
||
# print(f"PROC: {nucleon}")
|
||
worker = get_current_worker()
|
||
if worker and worker.is_cancelled: # 函数在worker中执行且已被取消
|
||
return False
|
||
text = nucleon["tts_text"]
|
||
# self.current_item = text[:30] + "..." if len(text) > 50 else text
|
||
# print(text)
|
||
self.processed += 1
|
||
# print(self.processed)
|
||
# print(self.total)
|
||
progress = int((self.processed / self.total) * 100) if self.total > 0 else 0
|
||
# print(progress)
|
||
self.update_status(f"正处理 ({idx + 1}/{len(nucleons)})", text, progress)
|
||
ret = self.precache_by_nucleon(nucleon)
|
||
if not ret:
|
||
self.update_status(
|
||
"出错",
|
||
f"处理失败, 跳过: {self.current_item}",
|
||
)
|
||
import time
|
||
|
||
time.sleep(1)
|
||
if self.cancel_flag:
|
||
worker.cancel()
|
||
self.cancel_flag = 0
|
||
return False
|
||
return True
|
||
|
||
def precache_by_nucleons(self):
|
||
# print("开始缓存")
|
||
ret = self.precache_by_list(self.nucleons)
|
||
# print(f"返回 {ret}")
|
||
return ret
|
||
|
||
def precache_all_files(self):
|
||
"""预缓存所有文件"""
|
||
from heurams.context import config_var
|
||
from heurams.kernel.repolib import Repo
|
||
|
||
repo_path = pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "repo"
|
||
repo_dirs = Repo.probe_valid_repos_in_dir(repo_path)
|
||
repos = map(Repo.from_repodir, repo_dirs)
|
||
|
||
# 计算总项目数
|
||
self.total = 0
|
||
nucleon_list = list()
|
||
for repo in repos:
|
||
try:
|
||
for i in repo.ident_index:
|
||
nucleon_list.append(
|
||
pt.Nucleon.from_data(
|
||
repo.nucleonic_data_lict.get_itemic_unit(i)
|
||
)
|
||
)
|
||
except:
|
||
continue
|
||
self.total = len(nucleon_list)
|
||
return self.precache_by_list(nucleon_list)
|
||
|
||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||
event.stop()
|
||
if event.button.id == "start_precache" and not self.is_precaching:
|
||
# 开始预缓存
|
||
if self.nucleons:
|
||
self.precache_worker = self.run_worker(
|
||
self.precache_by_nucleons,
|
||
thread=True,
|
||
exclusive=True,
|
||
exit_on_error=True,
|
||
)
|
||
else:
|
||
self.precache_worker = self.run_worker(
|
||
self.precache_all_files,
|
||
thread=True,
|
||
exclusive=True,
|
||
exit_on_error=True,
|
||
)
|
||
|
||
elif event.button.id == "cancel_precache" and self.is_precaching:
|
||
# 取消预缓存
|
||
if self.precache_worker:
|
||
self.precache_worker.cancel()
|
||
self.is_precaching = False
|
||
self.processed = 0
|
||
self.progress = 0
|
||
self.update_status("已取消", "预缓存操作被用户取消", 0)
|
||
|
||
elif event.button.id == "clear_cache":
|
||
# 清空缓存
|
||
try:
|
||
import shutil
|
||
|
||
|
||
shutil.rmtree(cache_dir, ignore_errors=True)
|
||
self.update_status("已清空", "音频缓存已清空", 0)
|
||
self._update_cache_display() # 更新缓存统计显示
|
||
except Exception as e:
|
||
self.update_status("错误", f"清空缓存失败: {e}")
|
||
self.cancel_flag = 1
|
||
self.processed = 0
|
||
self.progress = 0
|
||
|
||
elif event.button.id == "refresh_cache_stats":
|
||
# 刷新缓存统计信息
|
||
self._update_cache_display()
|
||
self.app.notify("缓存信息已刷新", severity="information")
|
||
elif event.button.id == "go_back":
|
||
self.action_go_back()
|
||
|
||
def action_go_back(self):
|
||
if self.is_precaching and self.precache_worker:
|
||
self.precache_worker.cancel()
|
||
self.app.pop_screen()
|