feat: Implement batch cancellation for audiobook processing with enhanced frontend progress display.
This commit is contained in:
@@ -14,6 +14,17 @@ from db.models import AudiobookProject, AudiobookCharacter, User
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Cancellation events for batch operations, keyed by project_id
|
||||
_cancel_events: dict[int, asyncio.Event] = {}
|
||||
|
||||
|
||||
def cancel_batch(project_id: int) -> None:
|
||||
"""Signal cancellation for any running batch operation on this project."""
|
||||
ev = _cancel_events.get(project_id)
|
||||
if ev:
|
||||
ev.set()
|
||||
logger.info(f"cancel_batch: project={project_id} cancellation signalled")
|
||||
|
||||
|
||||
def _get_llm_service(user: User) -> LLMService:
|
||||
from core.security import decrypt_api_key
|
||||
@@ -570,12 +581,19 @@ async def parse_all_chapters(project_id: int, user: User, db: Session) -> None:
|
||||
if not pending:
|
||||
return
|
||||
|
||||
cancel_ev = asyncio.Event()
|
||||
_cancel_events[project_id] = cancel_ev
|
||||
|
||||
max_concurrent = settings.AUDIOBOOK_PARSE_CONCURRENCY
|
||||
semaphore = asyncio.Semaphore(max_concurrent)
|
||||
logger.info(f"parse_all_chapters: project={project_id}, {len(pending)} chapters, concurrency={max_concurrent}")
|
||||
|
||||
async def parse_with_limit(chapter):
|
||||
if cancel_ev.is_set():
|
||||
return
|
||||
async with semaphore:
|
||||
if cancel_ev.is_set():
|
||||
return
|
||||
task_db = SessionLocal()
|
||||
try:
|
||||
db_user = crud.get_user_by_id(task_db, user.id)
|
||||
@@ -586,7 +604,8 @@ async def parse_all_chapters(project_id: int, user: User, db: Session) -> None:
|
||||
task_db.close()
|
||||
|
||||
await asyncio.gather(*[parse_with_limit(ch) for ch in pending])
|
||||
logger.info(f"parse_all_chapters: project={project_id} complete")
|
||||
_cancel_events.pop(project_id, None)
|
||||
logger.info(f"parse_all_chapters: project={project_id} {'cancelled' if cancel_ev.is_set() else 'complete'}")
|
||||
|
||||
|
||||
async def generate_all_chapters(project_id: int, user: User, db: Session) -> None:
|
||||
@@ -598,6 +617,11 @@ async def generate_all_chapters(project_id: int, user: User, db: Session) -> Non
|
||||
if not ready:
|
||||
return
|
||||
|
||||
cancel_ev = _cancel_events.get(project_id)
|
||||
if not cancel_ev:
|
||||
cancel_ev = asyncio.Event()
|
||||
_cancel_events[project_id] = cancel_ev
|
||||
|
||||
crud.update_audiobook_project_status(db, project_id, "generating")
|
||||
|
||||
max_concurrent = settings.AUDIOBOOK_GENERATE_CONCURRENCY
|
||||
@@ -605,7 +629,11 @@ async def generate_all_chapters(project_id: int, user: User, db: Session) -> Non
|
||||
logger.info(f"generate_all_chapters: project={project_id}, {len(ready)} chapters, concurrency={max_concurrent}")
|
||||
|
||||
async def generate_with_limit(chapter):
|
||||
if cancel_ev.is_set():
|
||||
return
|
||||
async with semaphore:
|
||||
if cancel_ev.is_set():
|
||||
return
|
||||
task_db = SessionLocal()
|
||||
try:
|
||||
db_user = crud.get_user_by_id(task_db, user.id)
|
||||
@@ -629,7 +657,8 @@ async def generate_all_chapters(project_id: int, user: User, db: Session) -> Non
|
||||
finally:
|
||||
final_db.close()
|
||||
|
||||
logger.info(f"generate_all_chapters: project={project_id} complete")
|
||||
_cancel_events.pop(project_id, None)
|
||||
logger.info(f"generate_all_chapters: project={project_id} {'cancelled' if cancel_ev.is_set() else 'complete'}")
|
||||
|
||||
|
||||
async def process_all(project_id: int, user: User, db: Session) -> None:
|
||||
|
||||
Reference in New Issue
Block a user