feat: Add batch processing for audiobook chapters including parse, generate, and combined process actions.

This commit is contained in:
2026-03-11 14:08:09 +08:00
parent cd73871c64
commit a0047d5c29
10 changed files with 296 additions and 34 deletions

View File

@@ -281,6 +281,8 @@ async def parse_one_chapter(project_id: int, chapter_id: int, user: User, db) ->
ps.append_line(key, f"{len(chunks)}\n")
seg_counter = 0
failed_chunks = 0
last_error = ""
for i, chunk in enumerate(chunks):
ps.append_line(key, f"{i + 1}/{len(chunks)}")
ps.append_line(key, "")
@@ -293,6 +295,8 @@ async def parse_one_chapter(project_id: int, chapter_id: int, user: User, db) ->
except Exception as e:
logger.warning(f"Chapter {chapter_id} chunk {i} failed: {e}")
ps.append_line(key, f"\n[回退] {e}")
failed_chunks += 1
last_error = str(e)
narrator = char_map.get("narrator")
if narrator:
crud.create_audiobook_segment(
@@ -319,8 +323,18 @@ async def parse_one_chapter(project_id: int, chapter_id: int, user: User, db) ->
ps.append_line(key, f"\n{chunk_count}")
ps.append_line(key, f"\n[完成] 共 {seg_counter}")
crud.update_audiobook_chapter_status(db, chapter_id, "ready")
if failed_chunks == len(chunks):
# All chunks failed — mark chapter as error, remove fallback segments
crud.delete_audiobook_segments_for_chapter(db, project_id, chapter.chapter_index)
error_msg = f"所有 {len(chunks)} 个块均解析失败: {last_error}"
ps.append_line(key, f"\n[错误] {error_msg}")
crud.update_audiobook_chapter_status(db, chapter_id, "error", error_message=error_msg)
elif failed_chunks > 0:
ps.append_line(key, f"\n[完成] 共 {seg_counter} 段({failed_chunks}/{len(chunks)} 块回退到旁白)")
crud.update_audiobook_chapter_status(db, chapter_id, "ready")
else:
ps.append_line(key, f"\n[完成] 共 {seg_counter}")
crud.update_audiobook_chapter_status(db, chapter_id, "ready")
ps.mark_done(key)
logger.info(f"Chapter {chapter_id} parsed: {seg_counter} segments")
@@ -545,3 +559,92 @@ def merge_audio_files(audio_paths: list[str], output_path: str) -> None:
if combined:
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
combined.export(output_path, format="wav")
async def parse_all_chapters(project_id: int, user: User, db: Session) -> None:
    """Re-parse every parseable chapter of a project, bounded by a semaphore.

    Chapters whose status is "pending", "error", or "ready" are all
    (re)parsed. Each task runs in its own short-lived DB session, with
    concurrency capped by ``settings.AUDIOBOOK_PARSE_CONCURRENCY``.
    Per-chapter failures are logged and do not abort the remaining tasks.
    """
    from core.database import SessionLocal

    pending = [
        ch
        for ch in crud.list_audiobook_chapters(db, project_id)
        if ch.status in ("pending", "error", "ready")
    ]
    if not pending:
        return

    max_concurrent = settings.AUDIOBOOK_PARSE_CONCURRENCY
    gate = asyncio.Semaphore(max_concurrent)
    logger.info(f"parse_all_chapters: project={project_id}, {len(pending)} chapters, concurrency={max_concurrent}")

    async def _bounded_parse(chapter):
        async with gate:
            # One session per task: SQLAlchemy sessions are not safe to
            # share across concurrently running coroutines.
            session = SessionLocal()
            try:
                fresh_user = crud.get_user_by_id(session, user.id)
                await parse_one_chapter(project_id, chapter.id, fresh_user, session)
            except Exception as e:
                logger.error(f"parse_all_chapters: chapter {chapter.id} failed: {e}", exc_info=True)
            finally:
                session.close()

    await asyncio.gather(*(_bounded_parse(ch) for ch in pending))
    logger.info(f"parse_all_chapters: project={project_id} complete")
async def generate_all_chapters(project_id: int, user: User, db: Session) -> None:
    """Concurrently generate audio for all "ready" chapters of a project.

    The project is flipped to "generating" up front; once every per-chapter
    task has finished, it is marked "done" if all segments rendered, else
    "ready". Concurrency is capped by
    ``settings.AUDIOBOOK_GENERATE_CONCURRENCY``.

    Args:
        project_id: Target audiobook project.
        user: Requesting user; re-fetched inside each task's own session.
        db: Session used only for the initial listing / status flip.
    """
    from core.database import SessionLocal

    chapters = crud.list_audiobook_chapters(db, project_id)
    ready = [ch for ch in chapters if ch.status == "ready"]
    if not ready:
        return
    crud.update_audiobook_project_status(db, project_id, "generating")

    max_concurrent = settings.AUDIOBOOK_GENERATE_CONCURRENCY
    semaphore = asyncio.Semaphore(max_concurrent)
    logger.info(f"generate_all_chapters: project={project_id}, {len(ready)} chapters, concurrency={max_concurrent}")

    async def generate_with_limit(chapter):
        async with semaphore:
            # One session per task: sessions must not be shared across
            # concurrently running coroutines.
            task_db = SessionLocal()
            try:
                db_user = crud.get_user_by_id(task_db, user.id)
                await generate_project(project_id, db_user, task_db, chapter_index=chapter.chapter_index)
            except Exception as e:
                logger.error(f"generate_all_chapters: chapter {chapter.chapter_index} failed: {e}", exc_info=True)
            finally:
                task_db.close()

    try:
        await asyncio.gather(*[generate_with_limit(ch) for ch in ready])
    finally:
        # Always settle the final project status. Without this finally,
        # an unexpected failure escaping gather() (e.g. task cancellation)
        # would leave the project stuck in "generating" forever.
        final_db = SessionLocal()
        try:
            all_segs = crud.list_audiobook_segments(final_db, project_id)
            # An empty segment list counts as "not done" — nothing rendered.
            all_done = bool(all_segs) and all(s.status == "done" for s in all_segs)
            if all_done:
                crud.update_audiobook_project_status(final_db, project_id, "done")
            else:
                crud.update_audiobook_project_status(final_db, project_id, "ready")
        finally:
            final_db.close()
    logger.info(f"generate_all_chapters: project={project_id} complete")
async def process_all(project_id: int, user: User, db: Session) -> None:
    """Run the full pipeline: parse all pending chapters, then render audio.

    Phase 2 deliberately opens a fresh session so that chapter statuses
    updated during phase 1 are re-read from the database rather than served
    stale from the caller session's identity map.
    """
    from core.database import SessionLocal

    # Phase 1: concurrent parsing of all pending chapters.
    await parse_all_chapters(project_id, user, db)

    # Phase 2: reload chapters in a new session and generate all ready ones.
    fresh_db = SessionLocal()
    try:
        await generate_all_chapters(project_id, user, fresh_db)
    finally:
        fresh_db.close()
    logger.info(f"process_all: project={project_id} complete")