From 01b6f4633e9f162c109514ae188782e9c8fb67ee Mon Sep 17 00:00:00 2001 From: bdim404 Date: Tue, 10 Mar 2026 16:27:01 +0800 Subject: [PATCH] feat(audiobook): implement log streaming for project status updates and enhance progress tracking --- qwen3-tts-backend/api/audiobook.py | 42 +++++++++++- qwen3-tts-backend/core/audiobook_service.py | 50 ++++++++++++-- qwen3-tts-backend/core/llm_service.py | 67 ++++++++++++++++-- qwen3-tts-backend/core/progress_store.py | 38 +++++++++++ qwen3-tts-frontend/src/pages/Audiobook.tsx | 75 +++++++++++++++++++++ 5 files changed, 261 insertions(+), 11 deletions(-) create mode 100644 qwen3-tts-backend/core/progress_store.py diff --git a/qwen3-tts-backend/api/audiobook.py b/qwen3-tts-backend/api/audiobook.py index ea570b1..8887c13 100644 --- a/qwen3-tts-backend/api/audiobook.py +++ b/qwen3-tts-backend/api/audiobook.py @@ -1,10 +1,11 @@ import asyncio +import json import logging from pathlib import Path from typing import Optional from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, status -from fastapi.responses import FileResponse +from fastapi.responses import FileResponse, StreamingResponse from sqlalchemy.orm import Session from api.auth import get_current_user @@ -288,6 +289,45 @@ async def generate_project( return {"message": msg, "project_id": project_id, "chapter_index": chapter_index} +@router.get("/projects/{project_id}/logs") +async def stream_project_logs( + project_id: int, + current_user: User = Depends(get_current_user), +): + from core import progress_store as ps + + async def generator(): + sent_complete = -1 + last_streaming = "" + while True: + state = ps.get_snapshot(project_id) + lines = state["lines"] + n = len(lines) + + for i in range(sent_complete + 1, max(0, n - 1)): + yield f"data: {json.dumps({'index': i, 'line': lines[i]})}\n\n" + sent_complete = i + + if n > 0: + cur = lines[n - 1] + if cur != last_streaming or (sent_complete < n - 1): + yield f"data: {json.dumps({'index': n - 1, 'line': cur})}\n\n" + last_streaming = cur + sent_complete = max(sent_complete, n - 2) + + if state["done"]: + yield f"data: {json.dumps({'done': True})}\n\n" + break + + await asyncio.sleep(0.05) + + return StreamingResponse( + generator(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) + + @router.get("/projects/{project_id}/segments", response_model=list[AudiobookSegmentResponse]) async def get_segments( project_id: int, diff --git a/qwen3-tts-backend/core/audiobook_service.py b/qwen3-tts-backend/core/audiobook_service.py index 72f0629..a20990f 100644 --- a/qwen3-tts-backend/core/audiobook_service.py +++ b/qwen3-tts-backend/core/audiobook_service.py @@ -1,3 +1,4 @@ +import asyncio import logging import re from pathlib import Path @@ -7,6 +8,7 @@ from sqlalchemy.orm import Session from core.config import settings from core.llm_service import LLMService +from core import progress_store as ps from db import crud from db.models import AudiobookProject, AudiobookCharacter, User @@ -115,16 +117,20 @@ async def analyze_project(project_id: int, user: User, db: Session) -> None: if not project: return + ps.reset(project_id) try: crud.update_audiobook_project_status(db, project_id, "analyzing") + ps.append_line(project_id, f"[分析] 项目「{project.title}」开始角色分析") llm = _get_llm_service(user) if project.source_type == "epub" and project.source_path: + ps.append_line(project_id, "[解析] 正在提取 EPUB 章节内容...") epub_chapters = _extract_epub_chapters(project.source_path) if not epub_chapters: raise ValueError("No text content extracted from epub.") text = "\n\n".join(epub_chapters) + ps.append_line(project_id, f"[解析] 提取完成,共 {len(epub_chapters)} 章,{len(text)} 字") project.source_text = text db.commit() else: @@ -133,7 +139,13 @@ async def analyze_project(project_id: int, user: User, db: Session) -> None: if not text.strip(): raise ValueError("No text content found in project.") - characters_data = await llm.extract_characters(text) + ps.append_line(project_id, f"\n[LLM] 模型:{user.llm_model},正在分析角色...\n") + ps.append_line(project_id, "") + + def on_token(token: str) -> None: + ps.append_token(project_id, token) + + characters_data = await llm.extract_characters(text, on_token=on_token) has_narrator = any(c.get("name") == "narrator" for c in characters_data) if not has_narrator: @@ -143,6 +155,8 @@ async def analyze_project(project_id: int, user: User, db: Session) -> None: "instruct": "中性声音,语速平稳,叙述感强" }) + ps.append_line(project_id, f"\n\n[完成] 发现 {len(characters_data)} 个角色:{', '.join(c.get('name', '') for c in characters_data)}") + crud.delete_audiobook_segments(db, project_id) crud.delete_audiobook_characters(db, project_id) @@ -172,10 +186,13 @@ async def analyze_project(project_id: int, user: User, db: Session) -> None: ) crud.update_audiobook_project_status(db, project_id, "characters_ready") + ps.mark_done(project_id) logger.info(f"Project {project_id} character extraction complete: {len(characters_data)} characters") except Exception as e: logger.error(f"Analysis failed for project {project_id}: {e}", exc_info=True) + ps.append_line(project_id, f"\n[错误] {e}") + ps.mark_done(project_id) crud.update_audiobook_project_status(db, project_id, "error", error_message=str(e)) @@ -184,6 +201,7 @@ async def parse_chapters(project_id: int, user: User, db: Session) -> None: if not project: return + ps.reset(project_id) try: crud.update_audiobook_project_status(db, project_id, "parsing") @@ -205,19 +223,29 @@ async def parse_chapters(project_id: int, user: User, db: Session) -> None: else: chapters = _split_into_chapters(text) + non_empty = [(i, t) for i, t in enumerate(chapters) if t.strip()] + ps.append_line(project_id, f"[解析] 共 {len(non_empty)} 章,角色:{', '.join(character_names)}\n") + crud.delete_audiobook_segments(db, project_id) seg_counters: dict[int, int] = {} - for chapter_idx, chapter_text in enumerate(chapters): - if not chapter_text.strip(): - continue + for chapter_idx, chapter_text in non_empty: chunks = _chunk_chapter(chapter_text, max_chars=4000) logger.info(f"Chapter {chapter_idx}: {len(chapter_text)} chars → {len(chunks)} chunk(s)") - for chunk in chunks: + ps.append_line(project_id, f"[第 {chapter_idx + 1} 章] {len(chapter_text)} 字,{len(chunks)} 块") + + for chunk_i, chunk in enumerate(chunks): + ps.append_line(project_id, f" 块 {chunk_i + 1}/{len(chunks)} → ") + ps.append_line(project_id, "") + + def on_token(token: str) -> None: + ps.append_token(project_id, token) + try: - segments_data = await llm.parse_chapter_segments(chunk, character_names) + segments_data = await llm.parse_chapter_segments(chunk, character_names, on_token=on_token) except Exception as e: logger.warning(f"Chapter {chapter_idx} chunk LLM parse failed, fallback to narrator: {e}") + ps.append_line(project_id, f"\n [回退] LLM 失败,整块归属 narrator") narrator = char_map.get("narrator") if narrator: idx = seg_counters.get(chapter_idx, 0) @@ -231,6 +259,8 @@ async def parse_chapters(project_id: int, user: User, db: Session) -> None: ) seg_counters[chapter_idx] = idx + 1 continue + + chunk_seg_count = 0 for seg in segments_data: char_name = seg.get("character", "narrator") seg_text = seg.get("text", "").strip() @@ -249,12 +279,20 @@ async def parse_chapters(project_id: int, user: User, db: Session) -> None: segment_index=idx, ) seg_counters[chapter_idx] = idx + 1 + chunk_seg_count += 1 + ps.append_line(project_id, f"\n [完成] 解析出 {chunk_seg_count} 段") + + total_segs = sum(seg_counters.values()) + ps.append_line(project_id, f"\n[完成] 全部解析完毕,共 {total_segs} 段") crud.update_audiobook_project_status(db, project_id, "ready") + ps.mark_done(project_id) logger.info(f"Project {project_id} chapter parsing complete: {len(chapters)} chapters") except Exception as e: logger.error(f"Chapter parsing failed for project {project_id}: {e}", exc_info=True) + ps.append_line(project_id, f"\n[错误] {e}") + ps.mark_done(project_id) crud.update_audiobook_project_status(db, project_id, "error", error_message=str(e)) diff --git a/qwen3-tts-backend/core/llm_service.py b/qwen3-tts-backend/core/llm_service.py index 9f0d2e6..63194a4 100644 --- a/qwen3-tts-backend/core/llm_service.py +++ b/qwen3-tts-backend/core/llm_service.py @@ -13,6 +13,65 @@ class LLMService: self.api_key = api_key self.model = model + async def stream_chat(self, system_prompt: str, user_message: str, on_token=None) -> str: + url = f"{self.base_url}/chat/completions" + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + payload = { + "model": self.model, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_message}, + ], + "temperature": 0.3, + "stream": True, + } + full_text = "" + timeout = httpx.Timeout(connect=10.0, read=90.0, write=10.0, pool=5.0) + async with httpx.AsyncClient(timeout=timeout) as client: + async with client.stream("POST", url, json=payload, headers=headers) as resp: + if resp.status_code != 200: + body = await resp.aread() + logger.error(f"LLM streaming error {resp.status_code}: {body}") + resp.raise_for_status() + async for line in resp.aiter_lines(): + if not line.startswith("data: "): + continue + data = line[6:] + if data.strip() == "[DONE]": + break + try: + chunk = json.loads(data) + delta = chunk["choices"][0]["delta"].get("content", "") + if delta: + full_text += delta + if on_token: + on_token(delta) + except (json.JSONDecodeError, KeyError, IndexError): + continue + return full_text + + async def stream_chat_json(self, system_prompt: str, user_message: str, on_token=None): + raw = await self.stream_chat(system_prompt, user_message, on_token) + raw = raw.strip() + if not raw: + raise ValueError("LLM returned empty response") + if raw.startswith("```"): + lines = raw.split("\n") + inner = lines[1:] + if inner and inner[-1].strip().startswith("```"): + inner = inner[:-1] + raw = "\n".join(inner).strip() + if not raw: + raise ValueError("LLM returned empty JSON after stripping markdown") + try: + return json.loads(raw) + except json.JSONDecodeError: + logger.error(f"JSON parse failed. Raw (first 500): {raw[:500]}") + raise + async def chat(self, system_prompt: str, user_message: str) -> str: url = f"{self.base_url}/chat/completions" headers = { @@ -56,7 +115,7 @@ class LLMService: logger.error(f"JSON parse failed. Raw response (first 500 chars): {raw[:500]}") raise - async def extract_characters(self, text: str) -> list[Dict]: + async def extract_characters(self, text: str, on_token=None) -> list[Dict]: system_prompt = ( "你是一个专业的小说分析助手兼声音导演。请分析给定的小说文本,提取所有出现的角色(包括旁白narrator)。\n" "对每个角色,instruct字段必须是详细的声音导演说明,需覆盖以下六个维度,每个维度单独一句,用换行分隔:\n" @@ -70,10 +129,10 @@ class LLMService: '{"characters": [{"name": "narrator", "description": "第三人称叙述者", "instruct": "音色信息:...\\n身份背景:...\\n年龄设定:...\\n外貌特征:...\\n性格特质:...\\n叙事风格:..."}, ...]}' ) user_message = f"请分析以下小说文本并提取角色:\n\n{text[:30000]}" - result = await self.chat_json(system_prompt, user_message) + result = await self.stream_chat_json(system_prompt, user_message, on_token) return result.get("characters", []) - async def parse_chapter_segments(self, chapter_text: str, character_names: list[str]) -> list[Dict]: + async def parse_chapter_segments(self, chapter_text: str, character_names: list[str], on_token=None) -> list[Dict]: names_str = "、".join(character_names) system_prompt = ( "你是一个专业的有声书制作助手。请将给定的章节文本解析为对话片段列表。" @@ -83,7 +142,7 @@ class LLMService: '[{"character": "narrator", "text": "叙述文字"}, {"character": "角色名", "text": "对话内容"}, ...]' ) user_message = f"请解析以下章节文本:\n\n{chapter_text}" - result = await self.chat_json(system_prompt, user_message) + result = await self.stream_chat_json(system_prompt, user_message, on_token) if isinstance(result, list): return result return [] diff --git a/qwen3-tts-backend/core/progress_store.py b/qwen3-tts-backend/core/progress_store.py new file mode 100644 index 0000000..fa3aa7d --- /dev/null +++ b/qwen3-tts-backend/core/progress_store.py @@ -0,0 +1,38 @@ +from typing import Dict + +_store: Dict[int, dict] = {} + + +def _ensure(project_id: int) -> dict: + if project_id not in _store: + _store[project_id] = {"lines": [], "done": False} + return _store[project_id] + + +def reset(project_id: int) -> None: + _store[project_id] = {"lines": [], "done": False} + + +def append_line(project_id: int, text: str) -> None: + s = _ensure(project_id) + s["lines"].append(text) + + +def append_token(project_id: int, token: str) -> None: + s = _ensure(project_id) + if s["lines"]: + s["lines"][-1] += token + else: + s["lines"].append(token) + + +def mark_done(project_id: int) -> None: + s = _ensure(project_id) + s["done"] = True + + +def get_snapshot(project_id: int) -> dict: + s = _store.get(project_id) + if not s: + return {"lines": [], "done": True} + return {"lines": list(s["lines"]), "done": s["done"]} diff --git a/qwen3-tts-frontend/src/pages/Audiobook.tsx b/qwen3-tts-frontend/src/pages/Audiobook.tsx index 49e8478..13be56b 100644 --- a/qwen3-tts-frontend/src/pages/Audiobook.tsx +++ b/qwen3-tts-frontend/src/pages/Audiobook.tsx @@ -140,6 +140,77 @@ function SequentialPlayer({ ) } +function LogStream({ projectId, active }: { projectId: number; active: boolean }) { + const [lines, setLines] = useState([]) + const [done, setDone] = useState(false) + const bottomRef = useRef(null) + const activeRef = useRef(active) + activeRef.current = active + + useEffect(() => { + if (!active) return + setLines([]) + setDone(false) + + const token = localStorage.getItem('token') + const apiBase = (import.meta.env.VITE_API_URL as string) || '' + const controller = new AbortController() + + fetch(`${apiBase}/audiobook/projects/${projectId}/logs`, { + headers: { Authorization: `Bearer ${token}` }, + signal: controller.signal, + }).then(async res => { + const reader = res.body?.getReader() + if (!reader) return + const decoder = new TextDecoder() + let buffer = '' + while (true) { + const { done: streamDone, value } = await reader.read() + if (streamDone) break + buffer += decoder.decode(value, { stream: true }) + const parts = buffer.split('\n\n') + buffer = parts.pop() ?? '' + for (const part of parts) { + const line = part.trim() + if (!line.startsWith('data: ')) continue + try { + const msg = JSON.parse(line.slice(6)) + if (msg.done) { + setDone(true) + } else if (typeof msg.index === 'number') { + setLines(prev => { + const next = [...prev] + next[msg.index] = msg.line + return next + }) + } + } catch {} + } + } + }).catch(() => {}) + + return () => controller.abort() + }, [projectId, active]) + + useEffect(() => { + bottomRef.current?.scrollIntoView({ behavior: 'smooth' }) + }, [lines]) + + if (lines.length === 0) return null + + return ( +
+ {lines.map((line, i) => ( +
{line}
+ ))} + {!done && ( + + )} +
+
+ ) +} + function LLMConfigPanel({ onSaved }: { onSaved?: () => void }) { const [baseUrl, setBaseUrl] = useState('') const [apiKey, setApiKey] = useState('') @@ -460,6 +531,10 @@ function ProjectCard({ project, onRefresh }: { project: AudiobookProject; onRefr
)} + {['analyzing', 'parsing'].includes(status) && ( + + )} + {project.error_message && (
{project.error_message}
)}