Files
HeurAMS/src/heurams/interface/screens/precache.py
2026-04-23 00:13:57 +08:00

339 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""缓存工具界面"""
import pathlib
from textual.app import ComposeResult
from textual.containers import Horizontal, ScrollableContainer, Container
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, ProgressBar, Static
from textual.worker import get_current_worker
from textual import events, on
import heurams.kernel.particles as pt
import heurams.services.hasher as hasher
from heurams.context import *
# 兼容性缓存路径:优先使用 paths.cache否则使用 data/cache
paths = config_var.get()["global"]["paths"]
cache_dir = pathlib.Path(paths.get("cache", paths["data"] + "/cache")) / "voice"
def human_size(bytes_num: int) -> str:
"""将字节数格式化为人类可读的字符串"""
for unit in ["B", "KB", "MB", "GB", "TB"]:
if bytes_num < 1024.0:
return f"{bytes_num:.2f} {unit}"
bytes_num /= 1024.0 # type: ignore
return f"{bytes_num:.2f} PB"
class PrecachingScreen(Screen):
"""预缓存音频文件屏幕
缓存记忆单元音频文件, 全部(默认) 或部分记忆单元(可选参数传入)
Args:
nucleons (list): 可选列表, 仅包含 Nucleon 对象
desc (list): 可选字符串, 包含对此次调用的文字描述
"""
SUB_TITLE = "缓存管理器"
BINDINGS = [
("q", "go_back", "返回"),
]
def __init__(self, nucleons: list = [], desc: str = ""):
super().__init__(name=None, id=None, classes=None)
self.nucleons = nucleons
self.is_precaching = False
self.current_file = ""
self.current_item = ""
self.progress = 0
self.total = len(nucleons)
self.processed = 0
self.precache_worker = None
self.cancel_flag = 0
self.desc = desc
# 不再需要缓存配置,保留配置读取以兼容
self.cache_stats = {
"total_size": 0,
"file_count": 0,
"human_size": "0 B",
"cached_units": 0,
"total_units": 0,
"cache_rate": 0,
}
self._update_cache_stats()
def _get_total_units(self) -> int:
"""获取所有仓库的总单元数"""
from heurams.context import config_var
from heurams.kernel.repolib import Repo
repo_path = pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "repo"
repo_dirs = Repo.probe_valid_repos_in_dir(repo_path)
repos = map(Repo.from_repodir, repo_dirs)
total = 0
for repo in repos:
try:
total += len(repo.ident_index)
except:
continue
return total
@on(events.ScreenResume)
def post_active(self, event):
from heurams.interface import shim
shim.set_term_title(f"{self.app.TITLE} - {self.SUB_TITLE}")
def _update_cache_stats(self) -> None:
"""更新缓存统计信息"""
total_size = 0
file_count = 0
cached_units = 0
if cache_dir.exists():
for file in cache_dir.rglob("*"):
if file.is_file():
total_size += file.stat().st_size
file_count += 1
if file.suffix.lower() == ".wav":
cached_units += 1
total_units = self._get_total_units()
cache_rate = (cached_units / total_units * 100) if total_units > 0 else 0
self.cache_stats["total_size"] = total_size
self.cache_stats["file_count"] = file_count
self.cache_stats["human_size"] = human_size(total_size)
self.cache_stats["cached_units"] = cached_units
self.cache_stats["total_units"] = total_units
self.cache_stats["cache_rate"] = cache_rate
def compose(self) -> ComposeResult:
if config_var.get()["interface"]["global"]["show_header"]:
yield Header(
show_clock=config_var.get()["interface"]["global"]["clock_on_header"]
)
with ScrollableContainer(id="precache_container"):
yield Label("[b]音频预缓存[/b]", classes="title-label")
with Container():
yield Static(
f"缓存率: {self.cache_stats.get('cache_rate', 0):.1f}% (已缓存 {self.cache_stats.get('cached_units', 0)} / {self.cache_stats.get('total_units', 0)} 个单元)",
classes="cache-usage-text",
)
if self.nucleons:
yield Static(
f"目标单元归属: [b]{self.desc}[/b]", classes="target-info"
)
yield Static(
f"单元数量: {len(self.nucleons)}", classes="target-info"
)
else:
yield Static("目标: 所有单元", classes="target-info")
yield Static(id="status", classes="status-info")
yield Static(id="current_item", classes="current-item")
yield ProgressBar(total=100, show_eta=False, id="progress_bar")
with Horizontal(classes="button-group"):
if not self.is_precaching:
yield Button(
"开始预缓存", id="start_precache", variant="primary"
)
else:
yield Button(
"取消预缓存", id="cancel_precache", variant="error"
)
yield Button("清空缓存", id="clear_cache", variant="warning")
yield Button("返回", id="go_back", variant="default")
with Container(classes="cache-info"):
yield Static(f"缓存路径: {cache_dir}", classes="cache-path")
yield Static(
f"文件数: {self.cache_stats['file_count']}", classes="cache-count"
)
yield Static(
f"总大小: {self.cache_stats['human_size']}", classes="cache-size"
)
yield Button(
"刷新", id="refresh_cache_stats", variant="default", flat=True
)
yield Static("若您离开此界面, 未完成的缓存进程会自动停止.")
yield Static('缓存程序支持 "断点续传".')
yield Footer()
def on_mount(self):
"""挂载时初始化状态"""
self.update_status("就绪", "等待开始...")
self._update_cache_display()
def update_status(self, status, current_item="", progress=None):
"""更新状态显示"""
status_widget = self.query_one("#status", Static)
item_widget = self.query_one("#current_item", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
status_widget.update(f"状态: {status}")
item_widget.update(f"当前项目: {current_item}" if current_item else "")
if progress is not None:
progress_bar.progress = progress
progress_bar.advance(0) # 刷新显示
def _update_cache_display(self) -> None:
"""更新缓存信息显示"""
# 更新统计信息
self._update_cache_stats()
# 更新缓存率进度条
# 更新缓存大小和文件数显示
cache_count_widget = self.query_one(".cache-count", Static)
cache_size_widget = self.query_one(".cache-size", Static)
cache_usage_text = self.query_one(".cache-usage-text", Static)
if cache_count_widget:
cache_count_widget.update(f"文件数: {self.cache_stats['file_count']}")
if cache_size_widget:
cache_size_widget.update(f"总大小: {self.cache_stats['human_size']}")
if cache_usage_text:
cache_usage_text.update(
f"缓存率: {self.cache_stats.get('cache_rate', 0):.1f}% "
f"(已缓存 {self.cache_stats.get('cached_units', 0)} / {self.cache_stats.get('total_units', 0)} 个单元)"
)
def precache_by_text(self, text: str):
"""预缓存单段文本的音频"""
cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = cache_dir / f"{hasher.get_md5(text)}.wav"
if not cache_file.exists():
try:
from heurams.services.tts_service import convertor
convertor(text, cache_file)
return 1
except Exception as e:
print(f"预缓存失败 '{text}': {e}")
return 0
return 1
def precache_by_nucleon(self, nucleon: pt.Nucleon):
"""依据 Nucleon 缓存"""
ret = self.precache_by_text(nucleon["tts_text"])
return ret
def precache_by_list(self, nucleons: list):
"""依据 Nucleons 列表缓存"""
for idx, nucleon in enumerate(nucleons):
# print(f"PROC: {nucleon}")
worker = get_current_worker()
if worker and worker.is_cancelled: # 函数在worker中执行且已被取消
return False
text = nucleon["tts_text"]
# self.current_item = text[:30] + "..." if len(text) > 50 else text
# print(text)
self.processed += 1
# print(self.processed)
# print(self.total)
progress = int((self.processed / self.total) * 100) if self.total > 0 else 0
# print(progress)
self.update_status(f"正处理 ({idx + 1}/{len(nucleons)})", text, progress)
ret = self.precache_by_nucleon(nucleon)
if not ret:
self.update_status(
"出错",
f"处理失败, 跳过: {self.current_item}",
)
import time
time.sleep(1)
if self.cancel_flag:
worker.cancel()
self.cancel_flag = 0
return False
return True
def precache_by_nucleons(self):
# print("开始缓存")
ret = self.precache_by_list(self.nucleons)
# print(f"返回 {ret}")
return ret
def precache_all_files(self):
"""预缓存所有文件"""
from heurams.context import config_var
from heurams.kernel.repolib import Repo
repo_path = pathlib.Path(config_var.get()["global"]["paths"]["data"]) / "repo"
repo_dirs = Repo.probe_valid_repos_in_dir(repo_path)
repos = map(Repo.from_repodir, repo_dirs)
# 计算总项目数
self.total = 0
nucleon_list = list()
for repo in repos:
try:
for i in repo.ident_index:
nucleon_list.append(
pt.Nucleon.from_data(
repo.nucleonic_data_lict.get_itemic_unit(i)
)
)
except:
continue
self.total = len(nucleon_list)
return self.precache_by_list(nucleon_list)
def on_button_pressed(self, event: Button.Pressed) -> None:
event.stop()
if event.button.id == "start_precache" and not self.is_precaching:
# 开始预缓存
if self.nucleons:
self.precache_worker = self.run_worker(
self.precache_by_nucleons,
thread=True,
exclusive=True,
exit_on_error=True,
)
else:
self.precache_worker = self.run_worker(
self.precache_all_files,
thread=True,
exclusive=True,
exit_on_error=True,
)
elif event.button.id == "cancel_precache" and self.is_precaching:
# 取消预缓存
if self.precache_worker:
self.precache_worker.cancel()
self.is_precaching = False
self.processed = 0
self.progress = 0
self.update_status("已取消", "预缓存操作被用户取消", 0)
elif event.button.id == "clear_cache":
# 清空缓存
try:
import shutil
shutil.rmtree(cache_dir, ignore_errors=True)
self.update_status("已清空", "音频缓存已清空", 0)
self._update_cache_display() # 更新缓存统计显示
except Exception as e:
self.update_status("错误", f"清空缓存失败: {e}")
self.cancel_flag = 1
self.processed = 0
self.progress = 0
elif event.button.id == "refresh_cache_stats":
# 刷新缓存统计信息
self._update_cache_display()
self.app.notify("缓存信息已刷新", severity="information")
elif event.button.id == "go_back":
self.action_go_back()
def action_go_back(self):
if self.is_precaching and self.precache_worker:
self.precache_worker.cancel()
self.app.pop_screen()