770 lines
22 KiB
Python
770 lines
22 KiB
Python
import json
|
|
from typing import Optional, List, Dict, Any
|
|
from datetime import datetime
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session
|
|
|
|
from db.models import User, Job, VoiceCache, SystemSettings, VoiceDesign, AudiobookProject, AudiobookChapter, AudiobookCharacter, AudiobookSegment, UsageLog
|
|
|
|
def get_user_by_username(db: Session, username: str) -> Optional[User]:
    """Return the first user whose ``username`` column equals ``username``, or None."""
    matches = db.query(User).filter(User.username == username)
    return matches.first()
|
|
|
|
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Return the first user whose ``email`` column equals ``email``, or None."""
    matches = db.query(User).filter(User.email == email)
    return matches.first()
|
|
|
|
def count_users(db: Session) -> int:
    """Return how many User rows the session's query reports."""
    all_users = db.query(User)
    return all_users.count()
|
|
|
|
def create_user(db: Session, username: str, email: str, hashed_password: str) -> User:
    """Insert a new user, commit, and return the refreshed row.

    ``hashed_password`` is stored exactly as given; no hashing happens here.
    """
    new_user = User(username=username, email=email, hashed_password=hashed_password)
    db.add(new_user)
    db.commit()
    # Reload the instance after the commit before handing it back.
    db.refresh(new_user)
    return new_user
|
|
|
|
def create_user_by_admin(
    db: Session,
    username: str,
    email: str,
    hashed_password: str,
    is_superuser: bool = False,
    can_use_local_model: bool = False,
    can_use_nsfw: bool = False
) -> User:
    """Insert a user with explicit privilege flags and return the refreshed row.

    The three boolean flags default to False and are written to the new row
    as passed. ``hashed_password`` is stored as given.
    """
    attributes = {
        "username": username,
        "email": email,
        "hashed_password": hashed_password,
        "is_superuser": is_superuser,
        "can_use_local_model": can_use_local_model,
        "can_use_nsfw": can_use_nsfw,
    }
    account = User(**attributes)
    db.add(account)
    db.commit()
    db.refresh(account)
    return account
|
|
|
|
def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
    """Return the user whose ``id`` equals ``user_id``, or None when absent."""
    by_id = db.query(User).filter(User.id == user_id)
    return by_id.first()
|
|
|
|
def list_users(db: Session, skip: int = 0, limit: int = 100) -> tuple[List[User], int]:
    """Return one page of users (newest ``created_at`` first) plus the total count.

    Returns:
        ``(users, total)`` where ``total`` counts all users, not just the page.
    """
    total_count = db.query(User).count()
    newest_first = db.query(User).order_by(User.created_at.desc())
    page = newest_first.offset(skip).limit(limit).all()
    return page, total_count
|
|
|
|
def update_user(
    db: Session,
    user_id: int,
    username: Optional[str] = None,
    email: Optional[str] = None,
    hashed_password: Optional[str] = None,
    is_active: Optional[bool] = None,
    is_superuser: Optional[bool] = None,
    can_use_local_model: Optional[bool] = None,
    can_use_nsfw: Optional[bool] = None
) -> Optional[User]:
    """Apply every non-None argument to the user identified by ``user_id``.

    Arguments left as None keep the stored value. ``updated_at`` is stamped
    with ``datetime.utcnow()`` on every call, even when nothing else changes.

    Returns:
        The refreshed user, or None when no user has that id.
    """
    user = get_user_by_id(db, user_id)
    if not user:
        return None

    # Same assignment order as the parameter list; None means "leave as is".
    requested = {
        "username": username,
        "email": email,
        "hashed_password": hashed_password,
        "is_active": is_active,
        "is_superuser": is_superuser,
        "can_use_local_model": can_use_local_model,
        "can_use_nsfw": can_use_nsfw,
    }
    for field_name, new_value in requested.items():
        if new_value is not None:
            setattr(user, field_name, new_value)

    user.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(user)
    return user
|
|
|
|
def delete_user(db: Session, user_id: int) -> bool:
    """Delete the user with ``user_id`` and commit.

    Returns:
        True when a row was deleted, False when no such user exists.
    """
    target = get_user_by_id(db, user_id)
    if target:
        db.delete(target)
        db.commit()
        return True
    return False
|
|
|
|
def change_user_password(
    db: Session,
    user_id: int,
    new_hashed_password: str
) -> Optional[User]:
    """Replace the stored password hash of the given user.

    ``new_hashed_password`` is written as given; ``updated_at`` is set to
    ``datetime.utcnow()``.

    Returns:
        The refreshed user, or None when no user has that id.
    """
    account = get_user_by_id(db, user_id)
    if account:
        account.hashed_password = new_hashed_password
        account.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(account)
        return account
    return None
|
|
|
|
def create_job(db: Session, user_id: int, job_type: str, input_data: Dict[str, Any]) -> Job:
    """Insert a job in the ``"pending"`` state and return the refreshed row.

    ``input_data`` is serialised with ``json.dumps`` and stored as text.
    """
    serialized_input = json.dumps(input_data)
    new_job = Job(
        user_id=user_id,
        job_type=job_type,
        input_data=serialized_input,
        status="pending",
    )
    db.add(new_job)
    db.commit()
    db.refresh(new_job)
    return new_job
|
|
|
|
def get_job(db: Session, job_id: int, user_id: int) -> Optional[Job]:
    """Return the job with ``job_id`` only if it belongs to ``user_id``, else None."""
    owned_job = db.query(Job).filter(Job.id == job_id, Job.user_id == user_id)
    return owned_job.first()
|
|
|
|
def list_jobs(
    db: Session,
    user_id: int,
    skip: int = 0,
    limit: int = 100,
    status: Optional[str] = None
) -> List[Job]:
    """Return one page of a user's jobs, newest ``created_at`` first.

    A truthy ``status`` restricts the result to jobs in that status; None or
    an empty string applies no status filter.
    """
    conditions = [Job.user_id == user_id]
    if status:
        conditions.append(Job.status == status)
    newest_first = db.query(Job).filter(*conditions).order_by(Job.created_at.desc())
    return newest_first.offset(skip).limit(limit).all()
|
|
|
|
def update_job_status(
    db: Session,
    job_id: int,
    user_id: int,
    status: str,
    output_path: Optional[str] = None,
    error_message: Optional[str] = None
) -> Optional[Job]:
    """Set the status of a user's job, optionally recording output or error.

    ``output_path`` and ``error_message`` are only written when truthy, so an
    empty string leaves the stored value untouched. ``completed_at`` is set to
    ``datetime.utcnow()`` when the new status is ``"completed"`` or ``"failed"``.

    Returns:
        The refreshed job, or None when the job is not found for that user.
    """
    record = get_job(db, job_id, user_id)
    if not record:
        return None

    record.status = status
    if output_path:
        record.output_path = output_path
    if error_message:
        record.error_message = error_message

    reached_terminal_state = status in ("completed", "failed")
    if reached_terminal_state:
        record.completed_at = datetime.utcnow()

    db.commit()
    db.refresh(record)
    return record
|
|
|
|
def delete_job(db: Session, job_id: int, user_id: int) -> bool:
    """Delete a job owned by ``user_id`` and commit.

    Returns:
        True when a row was deleted, False when the job was not found for
        that user.
    """
    owned_job = get_job(db, job_id, user_id)
    if owned_job:
        db.delete(owned_job)
        db.commit()
        return True
    return False
|
|
|
|
def create_cache_entry(
    db: Session,
    user_id: int,
    ref_audio_hash: str,
    cache_path: str,
    meta_data: Optional[Dict[str, Any]] = None
) -> VoiceCache:
    """Insert a voice-cache row and return it refreshed.

    A truthy ``meta_data`` is stored as its ``json.dumps`` text; None or an
    empty dict is stored as None.
    """
    if meta_data:
        serialized_meta = json.dumps(meta_data)
    else:
        serialized_meta = None

    entry = VoiceCache(
        user_id=user_id,
        ref_audio_hash=ref_audio_hash,
        cache_path=cache_path,
        meta_data=serialized_meta,
    )
    db.add(entry)
    db.commit()
    db.refresh(entry)
    return entry
|
|
|
|
def get_cache_entry(db: Session, user_id: int, ref_audio_hash: str) -> Optional[VoiceCache]:
    """Look up a user's cache entry by audio hash and record the access.

    On a hit, ``last_accessed`` is set to ``datetime.utcnow()``,
    ``access_count`` is incremented, and the change is committed — so this
    "getter" writes to the database.

    Returns:
        The refreshed cache row, or None when no entry matches.
    """
    entry = db.query(VoiceCache).filter(
        VoiceCache.user_id == user_id,
        VoiceCache.ref_audio_hash == ref_audio_hash
    ).first()
    if entry is None:
        return None

    entry.last_accessed = datetime.utcnow()
    # A NULL counter (possible if the column has no default — not visible
    # from this module) is treated as 0 instead of raising TypeError on
    # ``None + 1``.
    entry.access_count = (entry.access_count or 0) + 1
    db.commit()
    db.refresh(entry)
    return entry
|
|
|
|
def list_cache_entries(
    db: Session,
    user_id: int,
    skip: int = 0,
    limit: int = 100
) -> List[VoiceCache]:
    """Return one page of a user's cache entries, most recently accessed first."""
    owned = db.query(VoiceCache).filter(VoiceCache.user_id == user_id)
    recent_first = owned.order_by(VoiceCache.last_accessed.desc())
    return recent_first.offset(skip).limit(limit).all()
|
|
|
|
def delete_cache_entry(db: Session, cache_id: int, user_id: int) -> bool:
    """Delete the cache row with ``cache_id`` if it belongs to ``user_id``.

    Returns:
        True when a row was deleted and committed, False when none matched.
    """
    entry = (
        db.query(VoiceCache)
        .filter(VoiceCache.id == cache_id, VoiceCache.user_id == user_id)
        .first()
    )
    if entry:
        db.delete(entry)
        db.commit()
        return True
    return False
|
|
|
|
def get_user_preferences(db: Session, user_id: int) -> dict:
    """Return a copy of the user's stored preferences, or the defaults.

    When the user is missing or has no (truthy) preferences, the defaults
    ``{"default_backend": "local", "onboarding_completed": False}`` are
    returned. A stored ``default_backend`` of ``"aliyun"`` is reported as
    ``"local"`` in the returned copy; the stored value is not modified.
    """
    user = get_user_by_id(db, user_id)
    if user and user.user_preferences:
        preferences = dict(user.user_preferences)
        if preferences.get("default_backend") == "aliyun":
            preferences["default_backend"] = "local"
        return preferences
    return {"default_backend": "local", "onboarding_completed": False}
|
|
|
|
def update_user_preferences(db: Session, user_id: int, preferences: dict) -> Optional[User]:
    """Overwrite the user's preferences with ``preferences`` (no merging).

    ``updated_at`` is set to ``datetime.utcnow()``.

    Returns:
        The refreshed user, or None when no user has that id.
    """
    account = get_user_by_id(db, user_id)
    if account:
        account.user_preferences = preferences
        account.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(account)
        return account
    return None
|
|
|
|
def get_system_setting(db: Session, key: str) -> Optional[dict]:
    """Return the ``value`` of the system setting stored under ``key``, or None.

    NOTE: a function with this same name is defined again further down in
    this module; the later definition is the one bound after import.
    """
    row = db.query(SystemSettings).filter(SystemSettings.key == key).first()
    return row.value if row else None
|
|
|
|
def update_system_setting(db: Session, key: str, value: dict) -> SystemSettings:
    """Create or overwrite the system setting stored under ``key``.

    In both cases ``updated_at`` is set to ``datetime.utcnow()``.

    Returns:
        The refreshed settings row.
    """
    row = db.query(SystemSettings).filter(SystemSettings.key == key).first()
    if not row:
        # No row for this key yet: insert one.
        row = SystemSettings(key=key, value=value, updated_at=datetime.utcnow())
        db.add(row)
    else:
        row.value = value
        row.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(row)
    return row
|
|
|
|
def can_user_use_local_model(user: User) -> bool:
    """Report whether ``user`` may use the local model.

    Always returns True; the ``user`` argument is not inspected.
    """
    return True
|
|
|
|
def can_user_use_nsfw(user: User) -> bool:
    """Report whether ``user`` may use NSFW features.

    Access is granted when either ``user.is_superuser`` or
    ``user.can_use_nsfw`` is truthy.

    Returns:
        A real ``bool``. A bare ``a or b`` would hand back whichever operand
        decided the result, e.g. None when both flags are unset.
    """
    return bool(user.is_superuser or user.can_use_nsfw)
|
|
|
|
def create_voice_design(
    db: Session,
    user_id: int,
    name: str,
    instruct: str,
    meta_data: Optional[Dict[str, Any]] = None,
    preview_text: Optional[str] = None,
    voice_cache_id: Optional[int] = None,
    ref_audio_path: Optional[str] = None,
    ref_text: Optional[str] = None,
) -> VoiceDesign:
    """Insert a voice design for ``user_id`` and return the refreshed row.

    ``meta_data`` is passed to the model unchanged (no JSON serialisation
    here). ``created_at`` and ``last_used`` are each set from
    ``datetime.utcnow()``.
    """
    attributes = {
        "user_id": user_id,
        "name": name,
        "instruct": instruct,
        "meta_data": meta_data,
        "preview_text": preview_text,
        "voice_cache_id": voice_cache_id,
        "ref_audio_path": ref_audio_path,
        "ref_text": ref_text,
        "created_at": datetime.utcnow(),
        "last_used": datetime.utcnow(),
    }
    new_design = VoiceDesign(**attributes)
    db.add(new_design)
    db.commit()
    db.refresh(new_design)
    return new_design
|
|
|
|
def get_voice_design(db: Session, design_id: int, user_id: int) -> Optional[VoiceDesign]:
    """Return the active voice design ``design_id`` owned by ``user_id``, or None.

    Designs whose ``is_active`` flag is not true are never returned.
    """
    criteria = (
        VoiceDesign.id == design_id,
        VoiceDesign.user_id == user_id,
        VoiceDesign.is_active == True,  # noqa: E712 - SQL expression, not a Python comparison
    )
    return db.query(VoiceDesign).filter(*criteria).first()
|
|
|
|
def list_voice_designs(
    db: Session,
    user_id: int,
    backend_type: Optional[str] = None,
    skip: int = 0,
    limit: int = 100
) -> List[VoiceDesign]:
    """Return one page of a user's active voice designs, most recently used first.

    ``backend_type`` is accepted but not used by the query.
    """
    active_designs = db.query(VoiceDesign).filter(
        VoiceDesign.user_id == user_id,
        VoiceDesign.is_active == True,  # noqa: E712 - SQL expression
    )
    recent_first = active_designs.order_by(VoiceDesign.last_used.desc())
    return recent_first.offset(skip).limit(limit).all()
|
|
|
|
def count_voice_designs(
    db: Session,
    user_id: int,
    backend_type: Optional[str] = None
) -> int:
    """Return how many active voice designs ``user_id`` has.

    ``backend_type`` is accepted but not used by the query.
    """
    active_designs = db.query(VoiceDesign).filter(
        VoiceDesign.user_id == user_id,
        VoiceDesign.is_active == True,  # noqa: E712 - SQL expression
    )
    return active_designs.count()
|
|
|
|
def delete_voice_design(db: Session, design_id: int, user_id: int) -> bool:
    """Delete a user's voice design row and commit.

    The lookup goes through ``get_voice_design``, so only active designs
    owned by ``user_id`` can be deleted.

    Returns:
        True when a row was deleted, False when none was found.
    """
    target = get_voice_design(db, design_id, user_id)
    if target:
        db.delete(target)
        db.commit()
        return True
    return False
|
|
|
|
def update_voice_design_usage(db: Session, design_id: int, user_id: int) -> Optional[VoiceDesign]:
    """Record one use of a voice design.

    Sets ``last_used`` to ``datetime.utcnow()``, increments ``use_count``,
    and commits.

    Returns:
        The refreshed design, or None when ``get_voice_design`` finds nothing
        for this id and user.
    """
    design = get_voice_design(db, design_id, user_id)
    if design is None:
        return None

    design.last_used = datetime.utcnow()
    # A NULL counter (possible if the column has no default — not visible
    # from this module) is treated as 0 instead of raising TypeError on
    # ``None + 1``.
    design.use_count = (design.use_count or 0) + 1
    db.commit()
    db.refresh(design)
    return design
|
|
|
|
|
|
def update_user_llm_config(
    db: Session,
    user_id: int,
    llm_api_key: Optional[str] = None,
    llm_base_url: Optional[str] = None,
    llm_model: Optional[str] = None,
    clear: bool = False
) -> Optional[User]:
    """Update or wipe the per-user LLM connection settings.

    With ``clear=True`` all three fields are set to None and the other
    arguments are ignored. Otherwise each non-None argument overwrites its
    field. ``updated_at`` is set to ``datetime.utcnow()`` either way.

    Returns:
        The refreshed user, or None when no user has that id.
    """
    user = get_user_by_id(db, user_id)
    if not user:
        return None

    if clear:
        user.llm_api_key = user.llm_base_url = user.llm_model = None
    else:
        supplied = (
            ("llm_api_key", llm_api_key),
            ("llm_base_url", llm_base_url),
            ("llm_model", llm_model),
        )
        for field_name, new_value in supplied:
            if new_value is not None:
                setattr(user, field_name, new_value)

    user.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(user)
    return user
|
|
|
|
|
|
def get_system_setting(db: Session, key: str):
    """Return the ``value`` of the system setting stored under ``key``, or None.

    NOTE: this redefines a function of the same name declared earlier in the
    module; being the later definition, this is the one bound after import.
    """
    row = db.query(SystemSettings).filter(SystemSettings.key == key).first()
    if not row:
        return None
    return row.value
|
|
|
|
|
|
def set_system_setting(db: Session, key: str, value) -> SystemSettings:
    """Create or overwrite the system setting stored under ``key``.

    An existing row gets the new value and ``updated_at`` set to
    ``datetime.utcnow()``. A new row is created with only ``key`` and
    ``value``; ``updated_at`` is not set explicitly in that case.

    Returns:
        The refreshed settings row.
    """
    row = db.query(SystemSettings).filter(SystemSettings.key == key).first()
    if not row:
        row = SystemSettings(key=key, value=value)
        db.add(row)
    else:
        row.value = value
        row.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(row)
    return row
|
|
|
|
|
|
def delete_system_setting(db: Session, key: str) -> bool:
    """Delete the system setting stored under ``key`` and commit.

    Returns:
        True when a row was deleted, False when no row has that key.
    """
    row = db.query(SystemSettings).filter(SystemSettings.key == key).first()
    if not row:
        return False
    db.delete(row)
    db.commit()
    return True
|
|
|
|
|
|
def create_audiobook_project(
    db: Session,
    user_id: int,
    title: str,
    source_type: str,
    source_text: Optional[str] = None,
    source_path: Optional[str] = None,
    llm_model: Optional[str] = None,
    script_config: Optional[Dict[str, Any]] = None,
) -> AudiobookProject:
    """Insert an audiobook project in the ``"pending"`` state and return it refreshed.

    All arguments are stored on the new row as passed.
    """
    attributes = {
        "user_id": user_id,
        "title": title,
        "source_type": source_type,
        "source_text": source_text,
        "source_path": source_path,
        "llm_model": llm_model,
        "script_config": script_config,
        "status": "pending",
    }
    new_project = AudiobookProject(**attributes)
    db.add(new_project)
    db.commit()
    db.refresh(new_project)
    return new_project
|
|
|
|
|
|
def get_audiobook_project(db: Session, project_id: int, user_id: int) -> Optional[AudiobookProject]:
    """Return project ``project_id`` only if it belongs to ``user_id``, else None."""
    owned_project = db.query(AudiobookProject).filter(
        AudiobookProject.id == project_id,
        AudiobookProject.user_id == user_id,
    )
    return owned_project.first()
|
|
|
|
|
|
def list_audiobook_projects(db: Session, user_id: int, skip: int = 0, limit: int = 50) -> List[AudiobookProject]:
    """Return one page of a user's audiobook projects, newest ``created_at`` first."""
    owned = db.query(AudiobookProject).filter(AudiobookProject.user_id == user_id)
    newest_first = owned.order_by(AudiobookProject.created_at.desc())
    return newest_first.offset(skip).limit(limit).all()
|
|
|
|
|
|
def update_audiobook_project_status(
    db: Session,
    project_id: int,
    status: str,
    error_message: Optional[str] = None
) -> Optional[AudiobookProject]:
    """Set a project's status and, when given, its error message.

    The project is looked up by id alone (no owner check here).
    ``error_message`` is written whenever it is not None, so an empty string
    overwrites the stored message. ``updated_at`` is set to
    ``datetime.utcnow()``.

    Returns:
        The refreshed project, or None when no project has that id.
    """
    found = db.query(AudiobookProject).filter(AudiobookProject.id == project_id).first()
    if found:
        found.status = status
        if error_message is not None:
            found.error_message = error_message
        found.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(found)
        return found
    return None
|
|
|
|
|
|
def delete_audiobook_project(db: Session, project_id: int, user_id: int) -> bool:
    """Delete a project owned by ``user_id`` and commit.

    Returns:
        True when a row was deleted, False when the project was not found
        for that user.
    """
    owned_project = get_audiobook_project(db, project_id, user_id)
    if owned_project:
        db.delete(owned_project)
        db.commit()
        return True
    return False
|
|
|
|
|
|
def create_audiobook_chapter(
    db: Session,
    project_id: int,
    chapter_index: int,
    source_text: str,
    title: Optional[str] = None,
) -> AudiobookChapter:
    """Insert a chapter in the ``"pending"`` state and return it refreshed."""
    attributes = {
        "project_id": project_id,
        "chapter_index": chapter_index,
        "source_text": source_text,
        "title": title,
        "status": "pending",
    }
    new_chapter = AudiobookChapter(**attributes)
    db.add(new_chapter)
    db.commit()
    db.refresh(new_chapter)
    return new_chapter
|
|
|
|
|
|
def get_audiobook_chapter(db: Session, chapter_id: int) -> Optional[AudiobookChapter]:
    """Return the chapter with id ``chapter_id``, or None (no owner check here)."""
    by_id = db.query(AudiobookChapter).filter(AudiobookChapter.id == chapter_id)
    return by_id.first()
|
|
|
|
|
|
def get_audiobook_chapter_by_index(db: Session, project_id: int, chapter_index: int) -> Optional[AudiobookChapter]:
    """Return the first chapter of ``project_id`` with the given ``chapter_index``, or None."""
    criteria = (
        AudiobookChapter.project_id == project_id,
        AudiobookChapter.chapter_index == chapter_index,
    )
    return db.query(AudiobookChapter).filter(*criteria).first()
|
|
|
|
|
|
def list_audiobook_chapters(db: Session, project_id: int) -> List[AudiobookChapter]:
    """Return all chapters of ``project_id`` ordered by ``chapter_index`` ascending."""
    in_project = db.query(AudiobookChapter).filter(AudiobookChapter.project_id == project_id)
    return in_project.order_by(AudiobookChapter.chapter_index).all()
|
|
|
|
|
|
def update_audiobook_chapter_status(
    db: Session,
    chapter_id: int,
    status: str,
    error_message: Optional[str] = None,
) -> Optional[AudiobookChapter]:
    """Set a chapter's status and, when given, its error message.

    ``error_message`` is written whenever it is not None (an empty string
    overwrites the stored message).

    Returns:
        The refreshed chapter, or None when no chapter has that id.
    """
    found = db.query(AudiobookChapter).filter(AudiobookChapter.id == chapter_id).first()
    if found:
        found.status = status
        if error_message is not None:
            found.error_message = error_message
        db.commit()
        db.refresh(found)
        return found
    return None
|
|
|
|
|
|
def delete_audiobook_chapters(db: Session, project_id: int) -> None:
    """Bulk-delete every chapter of ``project_id`` with one query, then commit."""
    project_chapters = db.query(AudiobookChapter).filter(
        AudiobookChapter.project_id == project_id
    )
    project_chapters.delete()
    db.commit()
|
|
|
|
|
|
def delete_audiobook_segments_for_chapter(db: Session, project_id: int, chapter_index: int) -> None:
    """Bulk-delete the segments of one chapter of a project, then commit.

    Segments are matched on ``project_id`` and ``chapter_index``.
    """
    chapter_segments = db.query(AudiobookSegment).filter(
        AudiobookSegment.project_id == project_id,
        AudiobookSegment.chapter_index == chapter_index,
    )
    chapter_segments.delete()
    db.commit()
|
|
|
|
|
|
def create_audiobook_character(
    db: Session,
    project_id: int,
    name: str,
    gender: Optional[str] = None,
    description: Optional[str] = None,
    instruct: Optional[str] = None,
    voice_design_id: Optional[int] = None,
) -> AudiobookCharacter:
    """Insert a character for ``project_id`` and return the refreshed row.

    All arguments are stored on the new row as passed.
    """
    attributes = {
        "project_id": project_id,
        "name": name,
        "gender": gender,
        "description": description,
        "instruct": instruct,
        "voice_design_id": voice_design_id,
    }
    new_character = AudiobookCharacter(**attributes)
    db.add(new_character)
    db.commit()
    db.refresh(new_character)
    return new_character
|
|
|
|
|
|
def get_audiobook_character(db: Session, char_id: int) -> Optional[AudiobookCharacter]:
    """Return the character with id ``char_id``, or None (no owner check here)."""
    by_id = db.query(AudiobookCharacter).filter(AudiobookCharacter.id == char_id)
    return by_id.first()
|
|
|
|
|
|
def list_audiobook_characters(db: Session, project_id: int) -> List[AudiobookCharacter]:
    """Return all characters of ``project_id``; no explicit ordering is applied."""
    in_project = db.query(AudiobookCharacter).filter(
        AudiobookCharacter.project_id == project_id
    )
    return in_project.all()
|
|
|
|
|
|
def update_audiobook_character_voice(
    db: Session,
    char_id: int,
    voice_design_id: int
) -> Optional[AudiobookCharacter]:
    """Set the ``voice_design_id`` of a character and commit.

    The value is written unconditionally, whatever was passed.

    Returns:
        The refreshed character, or None when no character has that id.
    """
    character = db.query(AudiobookCharacter).filter(AudiobookCharacter.id == char_id).first()
    if character:
        character.voice_design_id = voice_design_id
        db.commit()
        db.refresh(character)
        return character
    return None
|
|
|
|
|
|
def update_audiobook_character(
    db: Session,
    char_id: int,
    name: Optional[str] = None,
    gender: Optional[str] = None,
    description: Optional[str] = None,
    instruct: Optional[str] = None,
    voice_design_id: Optional[int] = None,
) -> Optional[AudiobookCharacter]:
    """Apply every non-None argument to the character identified by ``char_id``.

    None means "leave unchanged", so this function cannot reset a field to
    None.

    Returns:
        The refreshed character, or None when no character has that id.
    """
    character = db.query(AudiobookCharacter).filter(AudiobookCharacter.id == char_id).first()
    if not character:
        return None

    requested = {
        "name": name,
        "gender": gender,
        "description": description,
        "instruct": instruct,
        "voice_design_id": voice_design_id,
    }
    for field_name, new_value in requested.items():
        if new_value is not None:
            setattr(character, field_name, new_value)

    db.commit()
    db.refresh(character)
    return character
|
|
|
|
|
|
def create_audiobook_segment(
    db: Session,
    project_id: int,
    character_id: int,
    text: str,
    chapter_index: int = 0,
    segment_index: int = 0,
    emo_text: Optional[str] = None,
    emo_alpha: Optional[float] = None,
) -> AudiobookSegment:
    """Insert a segment in the ``"pending"`` state and return it refreshed.

    A falsy ``emo_text`` (None or an empty string) is stored as None.
    """
    # Collapse empty emotion text to None before it reaches the row.
    normalized_emo_text = emo_text or None
    new_segment = AudiobookSegment(
        project_id=project_id,
        character_id=character_id,
        text=text,
        chapter_index=chapter_index,
        segment_index=segment_index,
        emo_text=normalized_emo_text,
        emo_alpha=emo_alpha,
        status="pending",
    )
    db.add(new_segment)
    db.commit()
    db.refresh(new_segment)
    return new_segment
|
|
|
|
|
|
def list_audiobook_segments(
    db: Session,
    project_id: int,
    chapter_index: Optional[int] = None
) -> List[AudiobookSegment]:
    """Return a project's segments ordered by chapter index, then segment index.

    When ``chapter_index`` is not None (0 included) only that chapter's
    segments are returned.
    """
    conditions = [AudiobookSegment.project_id == project_id]
    if chapter_index is not None:
        conditions.append(AudiobookSegment.chapter_index == chapter_index)
    selected = db.query(AudiobookSegment).filter(*conditions)
    return selected.order_by(
        AudiobookSegment.chapter_index,
        AudiobookSegment.segment_index,
    ).all()
|
|
|
|
|
|
def update_audiobook_segment_status(
    db: Session,
    segment_id: int,
    status: str,
    audio_path: Optional[str] = None
) -> Optional[AudiobookSegment]:
    """Set a segment's status and, when given, its audio path.

    ``audio_path`` is written whenever it is not None.

    Returns:
        The refreshed segment, or None when no segment has that id.
    """
    segment = db.query(AudiobookSegment).filter(AudiobookSegment.id == segment_id).first()
    if segment:
        segment.status = status
        if audio_path is not None:
            segment.audio_path = audio_path
        db.commit()
        db.refresh(segment)
        return segment
    return None
|
|
|
|
|
|
def update_audiobook_segment(
    db: Session,
    segment_id: int,
    text: str,
    emo_text: Optional[str],
    emo_alpha: Optional[float],
) -> Optional[AudiobookSegment]:
    """Overwrite a segment's text and emotion settings and reset its output.

    All three content fields are written unconditionally; a falsy
    ``emo_text`` is stored as None. The segment's status goes back to
    ``"pending"`` and its ``audio_path`` is cleared.

    Returns:
        The refreshed segment, or None when no segment has that id.
    """
    segment = db.query(AudiobookSegment).filter(AudiobookSegment.id == segment_id).first()
    if segment:
        segment.text = text
        segment.emo_text = emo_text or None
        segment.emo_alpha = emo_alpha
        # Reset the status and drop the stored audio path.
        segment.status = "pending"
        segment.audio_path = None
        db.commit()
        db.refresh(segment)
        return segment
    return None
|
|
|
|
|
|
def delete_audiobook_segments(db: Session, project_id: int) -> None:
    """Bulk-delete every segment of ``project_id`` with one query, then commit."""
    project_segments = db.query(AudiobookSegment).filter(
        AudiobookSegment.project_id == project_id
    )
    project_segments.delete()
    db.commit()
|
|
|
|
|
|
def delete_audiobook_characters(db: Session, project_id: int) -> None:
    """Bulk-delete every character of ``project_id`` with one query, then commit."""
    project_characters = db.query(AudiobookCharacter).filter(
        AudiobookCharacter.project_id == project_id
    )
    project_characters.delete()
    db.commit()
|
|
|
|
|
|
def create_usage_log(
    db: Session,
    user_id: int,
    prompt_tokens: int,
    completion_tokens: int,
    model: Optional[str] = None,
    context: Optional[str] = None,
) -> UsageLog:
    """Insert one usage-log row and commit.

    Unlike the other ``create_*`` helpers in this module, the row is not
    refreshed after the commit before being returned.
    """
    attributes = {
        "user_id": user_id,
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "model": model,
        "context": context,
    }
    entry = UsageLog(**attributes)
    db.add(entry)
    db.commit()
    return entry
|
|
|
|
|
|
def get_usage_stats(
    db: Session,
    date_from: Optional[datetime] = None,
    date_to: Optional[datetime] = None,
) -> List[Dict]:
    """Aggregate per-user LLM token usage and completed-job statistics.

    Two grouped queries are run: token sums from ``UsageLog`` per user, and
    counts of ``"completed"`` jobs per user and ``backend_type``. Both are
    optionally limited to rows whose ``created_at`` lies within
    ``[date_from, date_to]`` (inclusive).

    ``char_count`` is the summed ``length`` of ``Job.input_data`` — the
    stored input payload, not necessarily the synthesized text alone.

    Returns:
        One dict per user id that appears in either query, sorted by user id,
        with keys ``user_id``, ``username``, ``email``, ``llm_prompt_tokens``,
        ``llm_completion_tokens`` and ``tts_backends``. Users that no longer
        have a ``User`` row get the placeholder name ``user_<id>`` and an
        empty email.
    """
    token_query = db.query(
        UsageLog.user_id,
        func.sum(UsageLog.prompt_tokens).label("prompt_tokens"),
        func.sum(UsageLog.completion_tokens).label("completion_tokens"),
    ).group_by(UsageLog.user_id)
    if date_from:
        token_query = token_query.filter(UsageLog.created_at >= date_from)
    if date_to:
        token_query = token_query.filter(UsageLog.created_at <= date_to)

    # Sums come back as None when there is nothing to add up; report 0.
    tokens_by_user: Dict[int, Dict] = {}
    for row in token_query.all():
        tokens_by_user[row.user_id] = {
            "prompt_tokens": row.prompt_tokens or 0,
            "completion_tokens": row.completion_tokens or 0,
        }

    job_query = db.query(
        Job.user_id,
        Job.backend_type,
        func.count(Job.id).label("job_count"),
        func.sum(func.coalesce(func.length(Job.input_data), 0)).label("char_count"),
    ).filter(Job.status == "completed").group_by(Job.user_id, Job.backend_type)
    if date_from:
        job_query = job_query.filter(Job.created_at >= date_from)
    if date_to:
        job_query = job_query.filter(Job.created_at <= date_to)

    backends_by_user: Dict[int, List] = {}
    for row in job_query.all():
        if row.user_id not in backends_by_user:
            backends_by_user[row.user_id] = []
        backends_by_user[row.user_id].append({
            "backend_type": row.backend_type,
            "job_count": row.job_count,
            "char_count": row.char_count or 0,
        })

    all_user_ids = set(tokens_by_user.keys()) | set(backends_by_user.keys())
    if all_user_ids:
        known_users = db.query(User).filter(User.id.in_(all_user_ids)).all()
    else:
        known_users = []
    identity_by_id = {}
    for account in known_users:
        identity_by_id[account.id] = {"username": account.username, "email": account.email}

    def summarize(uid):
        # Fall back to placeholders for ids without a User row or token data.
        identity = identity_by_id.get(uid, {"username": f"user_{uid}", "email": ""})
        tokens = tokens_by_user.get(uid, {"prompt_tokens": 0, "completion_tokens": 0})
        return {
            "user_id": uid,
            "username": identity["username"],
            "email": identity["email"],
            "llm_prompt_tokens": tokens["prompt_tokens"],
            "llm_completion_tokens": tokens["completion_tokens"],
            "tts_backends": backends_by_user.get(uid, []),
        }

    return [summarize(uid) for uid in sorted(all_user_ids)]
|