import asyncio
import functools
import logging
from abc import ABC, abstractmethod
from typing import Tuple, Optional

logger = logging.getLogger(__name__)
class TTSBackend(ABC):
    """Abstract interface for text-to-speech backends.

    Every generate_* method returns a tuple of (audio_bytes, sample_rate),
    where audio_bytes is an encoded waveform and sample_rate is in Hz.
    """

    @abstractmethod
    async def generate_custom_voice(self, params: dict) -> Tuple[bytes, int]:
        """Generate speech with a preset/named speaker taken from `params`."""
        pass

    @abstractmethod
    async def generate_voice_design(self, params: dict) -> Tuple[bytes, int]:
        """Generate speech for a voice described by an instruction in `params`."""
        pass

    @abstractmethod
    async def generate_voice_clone(self, params: dict, ref_audio_bytes: bytes) -> Tuple[bytes, int]:
        """Generate speech cloning the voice contained in `ref_audio_bytes`."""
        pass

    @abstractmethod
    async def health_check(self) -> dict:
        """Return a status dict describing backend availability."""
        pass
class LocalTTSBackend(TTSBackend):
    """TTS backend that runs models on the local GPU via the shared ModelManager.

    All generate_* methods return (wav_bytes, sample_rate). Blocking model
    calls run in the default executor so the event loop stays responsive, and
    a per-instance lock serializes GPU work.
    """

    def __init__(self):
        # Populated by initialize(); the backend is unusable until then.
        self.model_manager = None
        # Serializes GPU inference to prevent VRAM contention and CUDA errors
        # when requests arrive concurrently.
        self._gpu_lock = asyncio.Lock()

    async def initialize(self):
        """Bind to the shared ModelManager singleton (import deferred to avoid cycles)."""
        from core.model_manager import ModelManager
        self.model_manager = await ModelManager.get_instance()

    async def _synthesize(self, model_name: str, method_name: str, call_kwargs: dict) -> Tuple[bytes, int]:
        """Shared generation path: load `model_name`, run `tts.<method_name>(**call_kwargs)`
        in a worker thread under the GPU lock, and encode the result as WAV bytes.

        Factored out of generate_custom_voice / generate_voice_design, which
        were near-duplicates of each other.
        """
        await self.model_manager.load_model(model_name)
        _, tts = await self.model_manager.get_current_model()

        loop = asyncio.get_running_loop()
        async with self._gpu_lock:
            result = await loop.run_in_executor(
                None, functools.partial(getattr(tts, method_name), **call_kwargs)
            )

        # Some model versions return (wavs, sample_rate); others return just
        # the waveform, assumed 24 kHz — TODO confirm against the model wrapper.
        wavs, sample_rate = result if isinstance(result, tuple) else (result, 24000)
        audio_data = wavs[0] if isinstance(wavs, list) else wavs
        return self._numpy_to_bytes(audio_data, sample_rate), sample_rate

    async def generate_custom_voice(self, params: dict) -> Tuple[bytes, int]:
        """Synthesize speech with a named speaker; `instruct` is optional."""
        return await self._synthesize("custom-voice", "generate_custom_voice", dict(
            text=params['text'],
            language=params['language'],
            speaker=params['speaker'],
            instruct=params.get('instruct', ''),
            max_new_tokens=params['max_new_tokens'],
            temperature=params['temperature'],
            top_k=params['top_k'],
            top_p=params['top_p'],
            repetition_penalty=params['repetition_penalty'],
        ))

    async def generate_voice_design(self, params: dict) -> Tuple[bytes, int]:
        """Synthesize speech for a described voice; `instruct` is required here."""
        return await self._synthesize("voice-design", "generate_voice_design", dict(
            text=params['text'],
            language=params['language'],
            instruct=params['instruct'],
            max_new_tokens=params['max_new_tokens'],
            temperature=params['temperature'],
            top_k=params['top_k'],
            top_p=params['top_p'],
            repetition_penalty=params['repetition_penalty'],
        ))

    async def generate_voice_clone(self, params: dict, ref_audio_bytes: bytes = None, x_vector=None) -> Tuple[bytes, int]:
        """Clone a voice from reference audio (or a precomputed x_vector) and speak `text`.

        Raises:
            ValueError: if neither ref_audio_bytes nor x_vector is provided.
        """
        from utils.audio import process_ref_audio
        import numpy as np

        await self.model_manager.load_model("base")
        _, tts = await self.model_manager.get_current_model()

        loop = asyncio.get_running_loop()
        async with self._gpu_lock:
            if x_vector is None:
                if ref_audio_bytes is None:
                    raise ValueError("Either ref_audio_bytes or x_vector must be provided")

                ref_audio_array, ref_sr = process_ref_audio(ref_audio_bytes)
                # Build the voice-clone prompt (speaker embedding) from the
                # reference audio in a worker thread.
                x_vector = await loop.run_in_executor(
                    None,
                    functools.partial(
                        tts.create_voice_clone_prompt,
                        ref_audio=(ref_audio_array, ref_sr),
                        ref_text=params.get('ref_text', ''),
                        x_vector_only_mode=False,
                    ),
                )

            wavs, sample_rate = await loop.run_in_executor(
                None,
                functools.partial(
                    tts.generate_voice_clone,
                    text=params['text'],
                    language=params['language'],
                    voice_clone_prompt=x_vector,
                    max_new_tokens=params['max_new_tokens'],
                    temperature=params['temperature'],
                    top_k=params['top_k'],
                    top_p=params['top_p'],
                    repetition_penalty=params['repetition_penalty'],
                ),
            )

        audio_data = wavs[0] if isinstance(wavs, list) else wavs
        if isinstance(audio_data, list):
            audio_data = np.array(audio_data)
        return self._numpy_to_bytes(audio_data, sample_rate), sample_rate

    async def health_check(self) -> dict:
        """Report backend availability and the currently loaded model name."""
        return {
            "available": self.model_manager is not None,
            "current_model": self.model_manager.current_model_name if self.model_manager else None
        }

    @staticmethod
    def _numpy_to_bytes(audio_array, sample_rate: int = 24000) -> bytes:
        """Encode a float waveform in [-1, 1] as 16-bit mono WAV bytes.

        BUG FIX: the actual sample rate is now written into the WAV header;
        previously 24000 Hz was hard-coded, so audio produced at any other
        rate played back at the wrong speed. The default preserves the old
        behavior for callers that pass no rate.
        """
        import numpy as np
        import io
        import wave

        if isinstance(audio_array, list):
            audio_array = np.array(audio_array)

        # Clamp to [-1, 1] before scaling so int16 conversion cannot wrap.
        audio_array = np.clip(audio_array, -1.0, 1.0)
        audio_int16 = (audio_array * 32767).astype(np.int16)

        buffer = io.BytesIO()
        with wave.open(buffer, 'wb') as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)  # 16-bit samples
            wav_file.setframerate(sample_rate)
            wav_file.writeframes(audio_int16.tobytes())
        return buffer.getvalue()
class IndexTTS2Backend:
    """Backend wrapper for IndexTTS2 voice cloning with optional emotion control.

    Not a TTSBackend subclass: it exposes a file-based `generate` entry point
    rather than the (bytes, sample_rate) interface.
    """

    # Class-level lock: serializes GPU inference across all instances.
    _gpu_lock = asyncio.Lock()

    # Per-emotion maximum raw weight. Intensity level 10 maps to these raw
    # values; level N scales linearly: N / 10 * max.
    EMO_LEVEL_MAX: dict[str, float] = {
        "开心": 0.75, "happy": 0.75,
        "愤怒": 0.08, "angry": 0.08,
        "悲伤": 0.90, "sad": 0.90,
        "恐惧": 0.10, "fear": 0.10,
        "厌恶": 0.50, "hate": 0.50,
        "低沉": 0.35, "low": 0.35,
        "惊讶": 0.35, "surprise": 0.35,
    }

    # Emotion keyword → vector index mapping.
    # Vector order: [happy, angry, sad, fear, hate, low, surprise, neutral]
    _EMO_KEYWORDS = [
        ['喜', '开心', '快乐', '高兴', '欢乐', '愉快', 'happy', '热情', '兴奋', '愉悦', '激动'],
        ['怒', '愤怒', '生气', '恼', 'angry', '气愤', '愤慨'],
        ['哀', '悲伤', '难过', '忧郁', '伤心', '悲', 'sad', '感慨', '沉重', '沉痛', '哭'],
        ['惧', '恐惧', '害怕', '恐', 'fear', '担心', '紧张'],
        ['厌恶', '厌', 'hate', '讨厌', '反感'],
        ['低落', '沮丧', '消沉', 'low', '抑郁', '颓废'],
        ['惊喜', '惊讶', '意外', 'surprise', '惊', '吃惊', '震惊'],
    ]

    @staticmethod
    def _emo_text_to_vector(emo_text: str) -> Optional[list]:
        """Parse an emotion description like "happy", "开心:0.4" or "happy+sad"
        into an 8-element weight vector, or None if no keyword matched.

        Tokens are '+'-separated; an optional ':<float>' suffix is an explicit
        weight. If any token carries an explicit weight, unweighted matches
        default to 0.5; otherwise a single match scores 0.8 and multiple
        matches 0.5 each, with "angry" capped at 0.2.
        """
        tokens = [t.strip() for t in emo_text.split('+') if t.strip()]
        matched = []
        for tok in tokens:
            if ':' in tok:
                name_part, w_str = tok.rsplit(':', 1)
                try:
                    weight: Optional[float] = float(w_str)
                except ValueError:
                    weight = None  # malformed weight: fall back to implicit scoring
            else:
                name_part = tok
                weight = None
            name_lower = name_part.lower().strip()
            for idx, words in enumerate(IndexTTS2Backend._EMO_KEYWORDS):
                for word in words:
                    if word in name_lower:
                        matched.append((idx, weight))
                        break  # at most one hit per emotion category per token
        if not matched:
            return None
        vec = [0.0] * 8
        has_explicit = any(w is not None for _, w in matched)
        if has_explicit:
            for idx, w in matched:
                vec[idx] = w if w is not None else 0.5
        else:
            score = 0.8 if len(matched) == 1 else 0.5
            # Angry (index 1) is capped at 0.2 — presumably because it is
            # easily over-expressed by the model; confirm against tuning notes.
            for idx, _ in matched:
                vec[idx] = 0.2 if idx == 1 else score
        return vec

    async def generate(
        self,
        text: str,
        spk_audio_prompt: str,
        output_path: str,
        emo_text: Optional[str] = None,
        emo_alpha: float = 0.6,
    ) -> bytes:
        """Synthesize `text` in the voice of `spk_audio_prompt`, write the audio
        to `output_path`, and return the file's bytes.

        Args:
            text: Text to speak.
            spk_audio_prompt: Path to the speaker reference audio.
            output_path: Where `tts.infer` writes the generated audio.
            emo_text: Optional emotion description (see _emo_text_to_vector).
            emo_alpha: Blend factor in (0, 1]; values > 1 are instead treated
                as a 1-10 intensity level and converted via EMO_LEVEL_MAX.
        """
        from core.model_manager import IndexTTS2ModelManager
        manager = await IndexTTS2ModelManager.get_instance()
        tts = await manager.get_model()
        loop = asyncio.get_running_loop()

        emo_vector = None
        if emo_text and len(emo_text.strip()) > 0:
            resolved_emo_text = emo_text
            resolved_emo_alpha = emo_alpha
            if emo_alpha is not None and emo_alpha > 1:
                # emo_alpha > 1 means "intensity level 1-10": convert it to an
                # explicit per-emotion weight and neutralize the alpha.
                level = min(10, max(1, round(emo_alpha)))
                name = emo_text.strip()
                max_val = self.EMO_LEVEL_MAX.get(name)
                if max_val is None:
                    # Fuzzy match: substring containment in either direction.
                    name_lower = name.lower()
                    for key, val in self.EMO_LEVEL_MAX.items():
                        if key in name_lower or name_lower in key:
                            max_val = val
                            break
                if max_val is None:
                    max_val = 0.20  # conservative fallback for unknown emotions
                weight = round(level / 10 * max_val, 4)
                resolved_emo_text = f"{name}:{weight}"
                resolved_emo_alpha = 1.0
            raw_vector = self._emo_text_to_vector(resolved_emo_text)
            if raw_vector is not None:
                emo_vector = [v * resolved_emo_alpha for v in raw_vector]
            logger.info(f"IndexTTS2 emo_text={repr(emo_text)} emo_alpha={emo_alpha} → resolved={repr(resolved_emo_text)} emo_vector={emo_vector}")

        async with IndexTTS2Backend._gpu_lock:
            await loop.run_in_executor(
                None,
                functools.partial(
                    tts.infer,
                    spk_audio_prompt=spk_audio_prompt,
                    text=text,
                    output_path=output_path,
                    emo_vector=emo_vector,
                    # Intensity is already folded into emo_vector above, so the
                    # model-side alpha is pinned to 1.0.
                    emo_alpha=1.0,
                )
            )
        with open(output_path, 'rb') as f:
            return f.read()
class TTSServiceFactory:
    """Factory handing out the process-wide TTS backend instance.

    `backend_type` and `user_api_key` are accepted for API compatibility but
    currently ignored: only the local backend is implemented here.
    """

    # Cached singleton; created and initialized on first use.
    _local_backend: Optional[LocalTTSBackend] = None

    @classmethod
    async def get_backend(cls, backend_type: str = None, user_api_key: Optional[str] = None) -> TTSBackend:
        """Return the shared LocalTTSBackend, initializing it on first call.

        BUG FIX: the backend is cached only after initialize() succeeds.
        Previously it was assigned to cls._local_backend before initialization,
        so a failed initialize() left a half-constructed backend cached and
        every subsequent call silently returned it uninitialized.
        """
        if cls._local_backend is None:
            backend = LocalTTSBackend()
            await backend.initialize()
            cls._local_backend = backend
        return cls._local_backend
|