Files
Canto/canto-backend/core/tts_service.py
bdim404 2fa9c1fcb6 refactor: rename backend/frontend dirs and remove NovelWriter submodule
- Rename qwen3-tts-backend → canto-backend
- Rename qwen3-tts-frontend → canto-frontend
- Remove NovelWriter embedded repo

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 18:03:29 +08:00

287 lines
11 KiB
Python

import asyncio
import functools
import logging
from abc import ABC, abstractmethod
from typing import Tuple, Optional
logger = logging.getLogger(__name__)
class TTSBackend(ABC):
@abstractmethod
async def generate_custom_voice(self, params: dict) -> Tuple[bytes, int]:
pass
@abstractmethod
async def generate_voice_design(self, params: dict) -> Tuple[bytes, int]:
pass
@abstractmethod
async def generate_voice_clone(self, params: dict, ref_audio_bytes: bytes) -> Tuple[bytes, int]:
pass
@abstractmethod
async def health_check(self) -> dict:
pass
class LocalTTSBackend(TTSBackend):
def __init__(self):
self.model_manager = None
# Add a lock to prevent concurrent VRAM contention and CUDA errors on local GPU models
self._gpu_lock = asyncio.Lock()
async def initialize(self):
from core.model_manager import ModelManager
self.model_manager = await ModelManager.get_instance()
async def generate_custom_voice(self, params: dict) -> Tuple[bytes, int]:
await self.model_manager.load_model("custom-voice")
_, tts = await self.model_manager.get_current_model()
loop = asyncio.get_event_loop()
async with self._gpu_lock:
result = await loop.run_in_executor(
None,
functools.partial(
tts.generate_custom_voice,
text=params['text'],
language=params['language'],
speaker=params['speaker'],
instruct=params.get('instruct', ''),
max_new_tokens=params['max_new_tokens'],
temperature=params['temperature'],
top_k=params['top_k'],
top_p=params['top_p'],
repetition_penalty=params['repetition_penalty'],
)
)
import numpy as np
wavs, sample_rate = result if isinstance(result, tuple) else (result, 24000)
audio_data = wavs[0] if isinstance(wavs, list) else wavs
return self._numpy_to_bytes(audio_data), sample_rate
async def generate_voice_design(self, params: dict) -> Tuple[bytes, int]:
await self.model_manager.load_model("voice-design")
_, tts = await self.model_manager.get_current_model()
loop = asyncio.get_event_loop()
async with self._gpu_lock:
result = await loop.run_in_executor(
None,
functools.partial(
tts.generate_voice_design,
text=params['text'],
language=params['language'],
instruct=params['instruct'],
max_new_tokens=params['max_new_tokens'],
temperature=params['temperature'],
top_k=params['top_k'],
top_p=params['top_p'],
repetition_penalty=params['repetition_penalty'],
)
)
import numpy as np
wavs, sample_rate = result if isinstance(result, tuple) else (result, 24000)
audio_data = wavs[0] if isinstance(wavs, list) else wavs
return self._numpy_to_bytes(audio_data), sample_rate
async def generate_voice_clone(self, params: dict, ref_audio_bytes: bytes = None, x_vector=None) -> Tuple[bytes, int]:
from utils.audio import process_ref_audio
await self.model_manager.load_model("base")
_, tts = await self.model_manager.get_current_model()
loop = asyncio.get_event_loop()
async with self._gpu_lock:
if x_vector is None:
if ref_audio_bytes is None:
raise ValueError("Either ref_audio_bytes or x_vector must be provided")
ref_audio_array, ref_sr = process_ref_audio(ref_audio_bytes)
x_vector = await loop.run_in_executor(
None,
functools.partial(
tts.create_voice_clone_prompt,
ref_audio=(ref_audio_array, ref_sr),
ref_text=params.get('ref_text', ''),
x_vector_only_mode=False,
)
)
wavs, sample_rate = await loop.run_in_executor(
None,
functools.partial(
tts.generate_voice_clone,
text=params['text'],
language=params['language'],
voice_clone_prompt=x_vector,
max_new_tokens=params['max_new_tokens'],
temperature=params['temperature'],
top_k=params['top_k'],
top_p=params['top_p'],
repetition_penalty=params['repetition_penalty'],
)
)
import numpy as np
audio_data = wavs[0] if isinstance(wavs, list) else wavs
if isinstance(audio_data, list):
audio_data = np.array(audio_data)
return self._numpy_to_bytes(audio_data), sample_rate
async def health_check(self) -> dict:
return {
"available": self.model_manager is not None,
"current_model": self.model_manager.current_model_name if self.model_manager else None
}
@staticmethod
def _numpy_to_bytes(audio_array) -> bytes:
import numpy as np
import io
import wave
if isinstance(audio_array, list):
audio_array = np.array(audio_array)
audio_array = np.clip(audio_array, -1.0, 1.0)
audio_int16 = (audio_array * 32767).astype(np.int16)
buffer = io.BytesIO()
with wave.open(buffer, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(24000)
wav_file.writeframes(audio_int16.tobytes())
buffer.seek(0)
return buffer.read()
class IndexTTS2Backend:
_gpu_lock = asyncio.Lock()
# Level 10 = these raw weights. Scale linearly: level N → N/10 * max
EMO_LEVEL_MAX: dict[str, float] = {
"开心": 0.75, "happy": 0.75,
"愤怒": 0.08, "angry": 0.08,
"悲伤": 0.90, "sad": 0.90,
"恐惧": 0.10, "fear": 0.10,
"厌恶": 0.50, "hate": 0.50,
"低沉": 0.35, "low": 0.35,
"惊讶": 0.35, "surprise": 0.35,
}
# Emotion keyword → index mapping
# Order: [happy, angry, sad, fear, hate, low, surprise, neutral]
_EMO_KEYWORDS = [
['', '开心', '快乐', '高兴', '欢乐', '愉快', 'happy', '热情', '兴奋', '愉悦', '激动'],
['', '愤怒', '生气', '', 'angry', '气愤', '愤慨'],
['', '悲伤', '难过', '忧郁', '伤心', '', 'sad', '感慨', '沉重', '沉痛', ''],
['', '恐惧', '害怕', '', 'fear', '担心', '紧张'],
['厌恶', '', 'hate', '讨厌', '反感'],
['低落', '沮丧', '消沉', 'low', '抑郁', '颓废'],
['惊喜', '惊讶', '意外', 'surprise', '', '吃惊', '震惊'],
]
@staticmethod
def _emo_text_to_vector(emo_text: str) -> Optional[list]:
tokens = [t.strip() for t in emo_text.split('+') if t.strip()]
matched = []
for tok in tokens:
if ':' in tok:
name_part, w_str = tok.rsplit(':', 1)
try:
weight: Optional[float] = float(w_str)
except ValueError:
weight = None
else:
name_part = tok
weight = None
name_lower = name_part.lower().strip()
for idx, words in enumerate(IndexTTS2Backend._EMO_KEYWORDS):
for word in words:
if word in name_lower:
matched.append((idx, weight))
break
if not matched:
return None
vec = [0.0] * 8
has_explicit = any(w is not None for _, w in matched)
if has_explicit:
for idx, w in matched:
vec[idx] = w if w is not None else 0.5
else:
score = 0.8 if len(matched) == 1 else 0.5
for idx, _ in matched:
vec[idx] = 0.2 if idx == 1 else score
return vec
async def generate(
self,
text: str,
spk_audio_prompt: str,
output_path: str,
emo_text: str = None,
emo_alpha: float = 0.6,
) -> bytes:
from core.model_manager import IndexTTS2ModelManager
manager = await IndexTTS2ModelManager.get_instance()
tts = await manager.get_model()
loop = asyncio.get_event_loop()
emo_vector = None
if emo_text and len(emo_text.strip()) > 0:
resolved_emo_text = emo_text
resolved_emo_alpha = emo_alpha
if emo_alpha is not None and emo_alpha > 1:
level = min(10, max(1, round(emo_alpha)))
name = emo_text.strip()
max_val = self.EMO_LEVEL_MAX.get(name)
if max_val is None:
name_lower = name.lower()
for key, val in self.EMO_LEVEL_MAX.items():
if key in name_lower or name_lower in key:
max_val = val
break
if max_val is None:
max_val = 0.20
weight = round(level / 10 * max_val, 4)
resolved_emo_text = f"{name}:{weight}"
resolved_emo_alpha = 1.0
raw_vector = self._emo_text_to_vector(resolved_emo_text)
if raw_vector is not None:
emo_vector = [v * resolved_emo_alpha for v in raw_vector]
logger.info(f"IndexTTS2 emo_text={repr(emo_text)} emo_alpha={emo_alpha} → resolved={repr(resolved_emo_text)} emo_vector={emo_vector}")
async with IndexTTS2Backend._gpu_lock:
await loop.run_in_executor(
None,
functools.partial(
tts.infer,
spk_audio_prompt=spk_audio_prompt,
text=text,
output_path=output_path,
emo_vector=emo_vector,
emo_alpha=1.0,
)
)
with open(output_path, 'rb') as f:
return f.read()
class TTSServiceFactory:
_local_backend: Optional[LocalTTSBackend] = None
@classmethod
async def get_backend(cls, backend_type: str = None, user_api_key: Optional[str] = None) -> TTSBackend:
if cls._local_backend is None:
cls._local_backend = LocalTTSBackend()
await cls._local_backend.initialize()
return cls._local_backend