feat(audiobook): implement log streaming for project status updates and enhance progress tracking

This commit is contained in:
2026-03-10 16:27:01 +08:00
parent 230274bbc3
commit 01b6f4633e
5 changed files with 261 additions and 11 deletions

View File

@@ -1,10 +1,11 @@
import asyncio
import json
import logging
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, status
from fastapi.responses import FileResponse
from fastapi.responses import FileResponse, StreamingResponse
from sqlalchemy.orm import Session
from api.auth import get_current_user
@@ -288,6 +289,45 @@ async def generate_project(
return {"message": msg, "project_id": project_id, "chapter_index": chapter_index}
@router.get("/projects/{project_id}/logs")
async def stream_project_logs(
    project_id: int,
    current_user: User = Depends(get_current_user),
):
    """Stream a project's progress log to the client as Server-Sent Events.

    Polls the in-memory progress store every 50 ms and emits one
    ``data: {"index": i, "line": ...}`` event per log line. The newest line
    is re-emitted each time its content changes so token-by-token LLM output
    renders live; clients must overwrite lines by ``index``. A final
    ``data: {"done": true}`` event terminates the stream.

    NOTE(review): ``current_user`` is used only for authentication — nothing
    here verifies the project belongs to the caller. Confirm whether
    cross-user log access is acceptable.
    """
    from core import progress_store as ps

    async def generator():
        sent_complete = -1   # highest line index known to be final and already sent
        last_streaming = ""  # content last sent for the still-growing tail line
        while True:
            state = ps.get_snapshot(project_id)
            lines = state["lines"]
            n = len(lines)
            # Every line except the newest is final: send any not yet emitted.
            for i in range(sent_complete + 1, max(0, n - 1)):
                yield f"data: {json.dumps({'index': i, 'line': lines[i]})}\n\n"
                sent_complete = i
            if n > 0:
                # The newest line may still be receiving tokens; re-send it
                # only when its content changed (or it was never sent).
                cur = lines[n - 1]
                if cur != last_streaming or (sent_complete < n - 1):
                    yield f"data: {json.dumps({'index': n - 1, 'line': cur})}\n\n"
                    last_streaming = cur
                    sent_complete = max(sent_complete, n - 2)
            if state["done"]:
                yield f"data: {json.dumps({'done': True})}\n\n"
                break
            await asyncio.sleep(0.05)

    return StreamingResponse(
        generator(),
        media_type="text/event-stream",
        # X-Accel-Buffering: no — stops nginx from buffering the event stream.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
@router.get("/projects/{project_id}/segments", response_model=list[AudiobookSegmentResponse])
async def get_segments(
project_id: int,

View File

@@ -1,3 +1,4 @@
import asyncio
import logging
import re
from pathlib import Path
@@ -7,6 +8,7 @@ from sqlalchemy.orm import Session
from core.config import settings
from core.llm_service import LLMService
from core import progress_store as ps
from db import crud
from db.models import AudiobookProject, AudiobookCharacter, User
@@ -115,16 +117,20 @@ async def analyze_project(project_id: int, user: User, db: Session) -> None:
if not project:
return
ps.reset(project_id)
try:
crud.update_audiobook_project_status(db, project_id, "analyzing")
ps.append_line(project_id, f"[分析] 项目「{project.title}」开始角色分析")
llm = _get_llm_service(user)
if project.source_type == "epub" and project.source_path:
ps.append_line(project_id, "[解析] 正在提取 EPUB 章节内容...")
epub_chapters = _extract_epub_chapters(project.source_path)
if not epub_chapters:
raise ValueError("No text content extracted from epub.")
text = "\n\n".join(epub_chapters)
ps.append_line(project_id, f"[解析] 提取完成,共 {len(epub_chapters)} 章,{len(text)}")
project.source_text = text
db.commit()
else:
@@ -133,7 +139,13 @@ async def analyze_project(project_id: int, user: User, db: Session) -> None:
if not text.strip():
raise ValueError("No text content found in project.")
characters_data = await llm.extract_characters(text)
ps.append_line(project_id, f"\n[LLM] 模型:{user.llm_model},正在分析角色...\n")
ps.append_line(project_id, "")
def on_token(token: str) -> None:
ps.append_token(project_id, token)
characters_data = await llm.extract_characters(text, on_token=on_token)
has_narrator = any(c.get("name") == "narrator" for c in characters_data)
if not has_narrator:
@@ -143,6 +155,8 @@ async def analyze_project(project_id: int, user: User, db: Session) -> None:
"instruct": "中性声音,语速平稳,叙述感强"
})
ps.append_line(project_id, f"\n\n[完成] 发现 {len(characters_data)} 个角色:{', '.join(c.get('name', '') for c in characters_data)}")
crud.delete_audiobook_segments(db, project_id)
crud.delete_audiobook_characters(db, project_id)
@@ -172,10 +186,13 @@ async def analyze_project(project_id: int, user: User, db: Session) -> None:
)
crud.update_audiobook_project_status(db, project_id, "characters_ready")
ps.mark_done(project_id)
logger.info(f"Project {project_id} character extraction complete: {len(characters_data)} characters")
except Exception as e:
logger.error(f"Analysis failed for project {project_id}: {e}", exc_info=True)
ps.append_line(project_id, f"\n[错误] {e}")
ps.mark_done(project_id)
crud.update_audiobook_project_status(db, project_id, "error", error_message=str(e))
@@ -184,6 +201,7 @@ async def parse_chapters(project_id: int, user: User, db: Session) -> None:
if not project:
return
ps.reset(project_id)
try:
crud.update_audiobook_project_status(db, project_id, "parsing")
@@ -205,19 +223,29 @@ async def parse_chapters(project_id: int, user: User, db: Session) -> None:
else:
chapters = _split_into_chapters(text)
non_empty = [(i, t) for i, t in enumerate(chapters) if t.strip()]
ps.append_line(project_id, f"[解析] 共 {len(non_empty)} 章,角色:{', '.join(character_names)}\n")
crud.delete_audiobook_segments(db, project_id)
seg_counters: dict[int, int] = {}
for chapter_idx, chapter_text in enumerate(chapters):
if not chapter_text.strip():
continue
for chapter_idx, chapter_text in non_empty:
chunks = _chunk_chapter(chapter_text, max_chars=4000)
logger.info(f"Chapter {chapter_idx}: {len(chapter_text)} chars → {len(chunks)} chunk(s)")
for chunk in chunks:
ps.append_line(project_id, f"[第 {chapter_idx + 1} 章] {len(chapter_text)} 字,{len(chunks)}")
for chunk_i, chunk in enumerate(chunks):
ps.append_line(project_id, f"{chunk_i + 1}/{len(chunks)}")
ps.append_line(project_id, "")
def on_token(token: str) -> None:
ps.append_token(project_id, token)
try:
segments_data = await llm.parse_chapter_segments(chunk, character_names)
segments_data = await llm.parse_chapter_segments(chunk, character_names, on_token=on_token)
except Exception as e:
logger.warning(f"Chapter {chapter_idx} chunk LLM parse failed, fallback to narrator: {e}")
ps.append_line(project_id, f"\n [回退] LLM 失败,整块归属 narrator")
narrator = char_map.get("narrator")
if narrator:
idx = seg_counters.get(chapter_idx, 0)
@@ -231,6 +259,8 @@ async def parse_chapters(project_id: int, user: User, db: Session) -> None:
)
seg_counters[chapter_idx] = idx + 1
continue
chunk_seg_count = 0
for seg in segments_data:
char_name = seg.get("character", "narrator")
seg_text = seg.get("text", "").strip()
@@ -249,12 +279,20 @@ async def parse_chapters(project_id: int, user: User, db: Session) -> None:
segment_index=idx,
)
seg_counters[chapter_idx] = idx + 1
chunk_seg_count += 1
ps.append_line(project_id, f"\n [完成] 解析出 {chunk_seg_count}")
total_segs = sum(seg_counters.values())
ps.append_line(project_id, f"\n[完成] 全部解析完毕,共 {total_segs}")
crud.update_audiobook_project_status(db, project_id, "ready")
ps.mark_done(project_id)
logger.info(f"Project {project_id} chapter parsing complete: {len(chapters)} chapters")
except Exception as e:
logger.error(f"Chapter parsing failed for project {project_id}: {e}", exc_info=True)
ps.append_line(project_id, f"\n[错误] {e}")
ps.mark_done(project_id)
crud.update_audiobook_project_status(db, project_id, "error", error_message=str(e))

View File

@@ -13,6 +13,65 @@ class LLMService:
self.api_key = api_key
self.model = model
    async def stream_chat(self, system_prompt: str, user_message: str, on_token=None) -> str:
        """Call the OpenAI-compatible chat-completions endpoint with streaming.

        Args:
            system_prompt: Content for the ``system`` role message.
            user_message: Content for the ``user`` role message.
            on_token: Optional callback invoked with each content delta as it
                arrives, enabling live progress display.

        Returns:
            The full concatenated response text.

        Raises:
            httpx.HTTPStatusError: on a non-200 response (body logged first).
        """
        url = f"{self.base_url}/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message},
            ],
            "temperature": 0.3,
            "stream": True,
        }
        full_text = ""
        # Generous read timeout: streamed chunks can be widely spaced while
        # the model thinks, but the connection itself should come up fast.
        timeout = httpx.Timeout(connect=10.0, read=90.0, write=10.0, pool=5.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream("POST", url, json=payload, headers=headers) as resp:
                if resp.status_code != 200:
                    # Read the error body for the log before raising.
                    body = await resp.aread()
                    logger.error(f"LLM streaming error {resp.status_code}: {body}")
                    resp.raise_for_status()
                async for line in resp.aiter_lines():
                    # SSE frames: "data: {...}" lines, terminated by "[DONE]".
                    if not line.startswith("data: "):
                        continue
                    data = line[6:]
                    if data.strip() == "[DONE]":
                        break
                    try:
                        chunk = json.loads(data)
                        delta = chunk["choices"][0]["delta"].get("content", "")
                        if delta:
                            full_text += delta
                            if on_token:
                                on_token(delta)
                    except (json.JSONDecodeError, KeyError, IndexError):
                        # Skip malformed/keep-alive frames rather than abort.
                        continue
        return full_text
async def stream_chat_json(self, system_prompt: str, user_message: str, on_token=None):
raw = await self.stream_chat(system_prompt, user_message, on_token)
raw = raw.strip()
if not raw:
raise ValueError("LLM returned empty response")
if raw.startswith("```"):
lines = raw.split("\n")
inner = lines[1:]
if inner and inner[-1].strip().startswith("```"):
inner = inner[:-1]
raw = "\n".join(inner).strip()
if not raw:
raise ValueError("LLM returned empty JSON after stripping markdown")
try:
return json.loads(raw)
except json.JSONDecodeError:
logger.error(f"JSON parse failed. Raw (first 500): {raw[:500]}")
raise
async def chat(self, system_prompt: str, user_message: str) -> str:
url = f"{self.base_url}/chat/completions"
headers = {
@@ -56,7 +115,7 @@ class LLMService:
logger.error(f"JSON parse failed. Raw response (first 500 chars): {raw[:500]}")
raise
async def extract_characters(self, text: str) -> list[Dict]:
async def extract_characters(self, text: str, on_token=None) -> list[Dict]:
system_prompt = (
"你是一个专业的小说分析助手兼声音导演。请分析给定的小说文本提取所有出现的角色包括旁白narrator\n"
"对每个角色instruct字段必须是详细的声音导演说明需覆盖以下六个维度每个维度单独一句用换行分隔\n"
@@ -70,10 +129,10 @@ class LLMService:
'{"characters": [{"name": "narrator", "description": "第三人称叙述者", "instruct": "音色信息:...\\n身份背景...\\n年龄设定...\\n外貌特征...\\n性格特质...\\n叙事风格..."}, ...]}'
)
user_message = f"请分析以下小说文本并提取角色:\n\n{text[:30000]}"
result = await self.chat_json(system_prompt, user_message)
result = await self.stream_chat_json(system_prompt, user_message, on_token)
return result.get("characters", [])
async def parse_chapter_segments(self, chapter_text: str, character_names: list[str]) -> list[Dict]:
async def parse_chapter_segments(self, chapter_text: str, character_names: list[str], on_token=None) -> list[Dict]:
names_str = "".join(character_names)
system_prompt = (
"你是一个专业的有声书制作助手。请将给定的章节文本解析为对话片段列表。"
@@ -83,7 +142,7 @@ class LLMService:
'[{"character": "narrator", "text": "叙述文字"}, {"character": "角色名", "text": "对话内容"}, ...]'
)
user_message = f"请解析以下章节文本:\n\n{chapter_text}"
result = await self.chat_json(system_prompt, user_message)
result = await self.stream_chat_json(system_prompt, user_message, on_token)
if isinstance(result, list):
return result
return []

View File

@@ -0,0 +1,38 @@
from typing import Dict

# In-memory, per-project progress buffers used to stream status logs to
# clients. Each entry holds the ordered log lines and a completion flag.
_store: Dict[int, dict] = {}


def _ensure(project_id: int) -> dict:
    """Return the state dict for *project_id*, creating it if absent."""
    return _store.setdefault(project_id, {"lines": [], "done": False})


def reset(project_id: int) -> None:
    """Discard any previous state and start a fresh, unfinished log."""
    _store[project_id] = {"lines": [], "done": False}


def append_line(project_id: int, text: str) -> None:
    """Append *text* as a new log line."""
    _ensure(project_id)["lines"].append(text)


def append_token(project_id: int, token: str) -> None:
    """Concatenate *token* onto the last line, starting one if the log is empty."""
    lines = _ensure(project_id)["lines"]
    if lines:
        lines[-1] = lines[-1] + token
    else:
        lines.append(token)


def mark_done(project_id: int) -> None:
    """Flag the project's log as finished."""
    _ensure(project_id)["done"] = True


def get_snapshot(project_id: int) -> dict:
    """Return a defensive copy of the current state.

    Unknown projects read as already done so a log stream for them
    terminates immediately instead of polling forever.
    """
    state = _store.get(project_id)
    if not state:
        return {"lines": [], "done": True}
    return {"lines": list(state["lines"]), "done": state["done"]}

View File

@@ -140,6 +140,77 @@ function SequentialPlayer({
)
}
// Live log console for a project that is analyzing/parsing.
// Fix: removed the dead `activeRef` (assigned on every render, never read).
function LogStream({ projectId, active }: { projectId: number; active: boolean }) {
  // Log lines addressed by server-sent index, so a still-streaming line can
  // be overwritten in place as tokens arrive.
  const [lines, setLines] = useState<string[]>([])
  const [done, setDone] = useState(false)
  const bottomRef = useRef<HTMLDivElement>(null)
  useEffect(() => {
    if (!active) return
    setLines([])
    setDone(false)
    const token = localStorage.getItem('token')
    const apiBase = (import.meta.env.VITE_API_URL as string) || ''
    const controller = new AbortController()
    // Consume the SSE endpoint manually via fetch so the Authorization
    // header can be sent (EventSource does not support custom headers).
    fetch(`${apiBase}/audiobook/projects/${projectId}/logs`, {
      headers: { Authorization: `Bearer ${token}` },
      signal: controller.signal,
    }).then(async res => {
      const reader = res.body?.getReader()
      if (!reader) return
      const decoder = new TextDecoder()
      let buffer = ''
      while (true) {
        const { done: streamDone, value } = await reader.read()
        if (streamDone) break
        buffer += decoder.decode(value, { stream: true })
        // SSE events are separated by a blank line; keep any trailing
        // partial event in the buffer for the next chunk.
        const parts = buffer.split('\n\n')
        buffer = parts.pop() ?? ''
        for (const part of parts) {
          const line = part.trim()
          if (!line.startsWith('data: ')) continue
          try {
            const msg = JSON.parse(line.slice(6))
            if (msg.done) {
              setDone(true)
            } else if (typeof msg.index === 'number') {
              setLines(prev => {
                const next = [...prev]
                next[msg.index] = msg.line
                return next
              })
            }
          } catch {
            // Malformed event: skip — the log is best-effort UI feedback.
          }
        }
      }
    }).catch(() => {}) // abort on unmount / network errors are expected
    return () => controller.abort()
  }, [projectId, active])
  useEffect(() => {
    // Keep the newest output in view as lines arrive.
    bottomRef.current?.scrollIntoView({ behavior: 'smooth' })
  }, [lines])
  if (lines.length === 0) return null
  return (
    <div className="rounded border border-green-900/40 bg-black/90 text-green-400 font-mono text-xs p-3 max-h-52 overflow-y-auto leading-relaxed">
      {lines.map((line, i) => (
        <div key={i} className="whitespace-pre-wrap">{line}</div>
      ))}
      {!done && (
        <span className="inline-block w-2 h-3 bg-green-400 animate-pulse ml-0.5 align-middle" />
      )}
      <div ref={bottomRef} />
    </div>
  )
}
function LLMConfigPanel({ onSaved }: { onSaved?: () => void }) {
const [baseUrl, setBaseUrl] = useState('')
const [apiKey, setApiKey] = useState('')
@@ -460,6 +531,10 @@ function ProjectCard({ project, onRefresh }: { project: AudiobookProject; onRefr
</div>
)}
{['analyzing', 'parsing'].includes(status) && (
<LogStream projectId={project.id} active={['analyzing', 'parsing'].includes(status)} />
)}
{project.error_message && (
<div className="text-xs text-destructive bg-destructive/10 rounded p-2">{project.error_message}</div>
)}