# indextts2-api / app.py
# Last commit: "Update app.py" (e84f64a, verified) by ataberkkilavuzcu
# File size: 6.84 kB
import base64
import os
import sys
import tempfile
import uuid
from io import StringIO
from pathlib import Path
from typing import Optional
import requests
import torch
import torchaudio
from torchaudio.transforms import Resample
from fastapi import BackgroundTasks, Body, FastAPI, Header, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel, Field, HttpUrl
# Optional API key read from the environment; None when the variable is unset.
SPACE_API_KEY = os.environ.get("SPACE_API_KEY")

# Hugging Face token: the first truthy value among the three variable names.
HF_TOKEN = (
    os.environ.get("HUGGING_FACE_HUB_TOKEN")
    or os.environ.get("HUGGINGFACEHUB_API_TOKEN")
    or os.environ.get("HF_TOKEN")
)

# Model identifier handed to TTS(...) when the model is loaded.
MODEL_NAME = "tts_models/multilingual/multi-dataset/xtts_v2"
# Upper bound on the length of the request's `text` field.
MAX_TEXT_LENGTH = 1000
# Language used when a request does not name one.
DEFAULT_LANGUAGE = "en"

# Run on the GPU when torch reports CUDA support, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
# Export the token under both variable names before the TTS import further down.
if HF_TOKEN:
    os.environ.update(HUGGING_FACE_HUB_TOKEN=HF_TOKEN, HF_TOKEN=HF_TOKEN)
    # Additionally log in through huggingface_hub when that package is available.
    try:
        from huggingface_hub import login

        login(token=HF_TOKEN, add_to_git_credential=False)
    except ImportError:
        # huggingface_hub is optional here; a missing package is not an error.
        pass
# Replace stdin with a canned "y" so the TTS Terms of Service prompt is
# answered automatically; the interactive prompt otherwise causes an
# EOFError in containers.
_original_stdin = sys.stdin
sys.stdin = StringIO("y\n")  # auto-accept TOS
from TTS.api import TTS

try:
    tts_model = TTS(MODEL_NAME, gpu=(DEVICE == "cuda"), progress_bar=False)
except Exception as exc:  # pragma: no cover
    # An EOF message means the prompt was still hit; add a pointer to the token setup.
    hint = (
        " Hint: set HUGGING_FACE_HUB_TOKEN to a Hugging Face token that has accepted the XTTS v2 license."
        if "EOF when reading a line" in str(exc)
        else ""
    )
    raise RuntimeError(f"Failed to load XTTS v2 model: {exc}.{hint}") from exc
finally:
    # Put the real stdin back whether or not the model loaded.
    sys.stdin = _original_stdin
# FastAPI application instance that the route decorators below attach to.
app = FastAPI(
    title="xtts-v2-api",
    version="1.0.0",
)
# Request body accepted by the /generate endpoint.
#   text        - text to synthesize, 1 to MAX_TEXT_LENGTH characters (required)
#   speaker_wav - reference speaker audio as a URL or base64 string (required)
#   language    - language code; falls back to DEFAULT_LANGUAGE when omitted
class GenerateRequest(BaseModel):
    text: str = Field(default=..., min_length=1, max_length=MAX_TEXT_LENGTH)
    speaker_wav: str = Field(default=..., description="HTTPS URL or base64-encoded audio")
    language: Optional[str] = Field(default=DEFAULT_LANGUAGE, description="ISO code, default en")
def _require_api_key(x_api_key: Optional[str]):
    """Reject the request unless it carries the configured API key.

    Authentication is disabled when ``SPACE_API_KEY`` is unset or empty; in
    that case every request is accepted.

    Args:
        x_api_key: Key supplied by the caller (the endpoints pass the value of
            their ``x_api_key`` header parameter), or ``None`` when absent.

    Raises:
        HTTPException: 401 when a key is configured and the supplied value is
            missing or different.
    """
    if not SPACE_API_KEY:
        return

    # Function-scope import: hmac is not part of the file's import block.
    import hmac

    # Compare as bytes so non-ASCII header values cannot raise TypeError.
    supplied = (x_api_key or "").encode("utf-8")
    expected = SPACE_API_KEY.encode("utf-8")
    # Constant-time comparison: a plain `!=` returns at the first differing
    # character, which lets a caller probe the key through response timing.
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Unauthorized")
def _write_temp_audio_from_url(url: HttpUrl) -> str:
    """Download the speaker audio at *url* into a new temporary file.

    The file is created with ``delete=False``, so the caller owns it and is
    responsible for removing it. Its suffix is taken from the URL path and
    defaults to ``.wav`` when the path has none.

    Args:
        url: Location of the audio to fetch.

    Returns:
        Path of the temporary file holding the downloaded bytes.

    Raises:
        HTTPException: 400 when the request fails (connection error, timeout,
            interrupted transfer) or the server answers with a status >= 400.
    """
    # NOTE(review): the URL is caller-supplied and fetched as-is, with no host
    # allow-list and no cap on the download size; consider restricting both.
    tmp_path: Optional[str] = None
    try:
        # The context manager closes the streamed response, releasing the
        # connection even when the body is not read to the end.
        with requests.get(str(url), stream=True, timeout=30) as response:
            if response.status_code >= 400:
                raise HTTPException(
                    status_code=400,
                    detail=f"Could not fetch speaker audio: {response.status_code}",
                )
            # `url.path` may be None for a URL without a path component.
            suffix = Path(url.path or "").suffix or ".wav"
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                tmp_path = tmp.name
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        tmp.write(chunk)
    except Exception as exc:
        # Never leave a partially written file behind.
        if tmp_path is not None:
            Path(tmp_path).unlink(missing_ok=True)
        if isinstance(exc, requests.RequestException):
            # Network-level failures are the caller's bad input, not a server fault.
            raise HTTPException(
                status_code=400,
                detail=f"Could not fetch speaker audio: {type(exc).__name__}",
            ) from exc
        raise
    return tmp_path
def _write_temp_audio_from_base64(payload: str) -> str:
    """Decode base64 speaker audio into a new temporary ``.wav`` file.

    Accepts plain base64, base64 containing whitespace or line breaks, and
    data URIs of the form ``data:<mime>;base64,<data>``. The file is created
    with ``delete=False``, so the caller is responsible for removing it.

    Args:
        payload: The base64 text (optionally wrapped in a data URI).

    Returns:
        Path of the temporary file holding the decoded bytes.

    Raises:
        HTTPException: 400 when the payload is not valid base64 or decodes to
            nothing.
    """
    encoded = payload.strip()
    # Drop a data-URI header so only the base64 body is decoded.
    if encoded.startswith("data:"):
        _, _, encoded = encoded.partition(",")
    # Remove embedded whitespace so line-wrapped input survives strict validation.
    encoded = "".join(encoded.split())
    try:
        # validate=True rejects stray characters instead of silently dropping
        # them and decoding whatever is left.
        raw = base64.b64decode(encoded, validate=True)
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        raise HTTPException(status_code=400, detail="Invalid base64 speaker_wav") from exc
    if not raw:
        # An empty file would only fail later, during audio loading.
        raise HTTPException(status_code=400, detail="Invalid base64 speaker_wav")
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
        tmp.write(raw)
    return tmp.name
def _temp_speaker_file(speaker_wav: str) -> str:
    """Materialize the request's speaker audio as a temporary file.

    A value starting with ``http://`` or ``https://`` (any letter case,
    surrounding whitespace ignored) is downloaded; anything else is treated
    as base64-encoded audio.

    Args:
        speaker_wav: URL or base64 text taken from the request body.

    Returns:
        Path of the temporary file; the caller must delete it.

    Raises:
        HTTPException: 400 when the URL is malformed, plus whatever the
            download and base64 helpers raise.
    """
    candidate = speaker_wav.strip()
    # Only the first few characters matter; avoid lower-casing a large payload.
    if candidate[:8].lower().startswith(("http://", "https://")):
        try:
            parsed = HttpUrl(candidate)
        except ValueError as exc:
            # pydantic's validation errors derive from ValueError; report a
            # malformed URL as a client error rather than a 500.
            raise HTTPException(status_code=400, detail="Invalid speaker_wav URL") from exc
        return _write_temp_audio_from_url(parsed)
    return _write_temp_audio_from_base64(speaker_wav)
def _preprocess_audio_wav(path: str, target_sr: int = 24000, target_peak: float = 0.98) -> str:
    """
    Clean up an audio file in place and return its path.

    Steps, in order:
    - average over dimension 0 of the loaded tensor when it holds more than
      one entry (down-mix to a single channel)
    - resample to target_sr when the file's sample rate differs
    - scale the signal down when its absolute peak exceeds target_peak;
      quieter audio is never amplified
    The result is saved back to ``path`` with 16 bits per sample.
    """
    waveform, sample_rate = torchaudio.load(path)

    # Collapse multi-channel audio to one channel.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    # Bring the audio to the requested sample rate.
    if sample_rate != target_sr:
        waveform = Resample(orig_freq=sample_rate, new_freq=target_sr)(waveform)
        sample_rate = target_sr

    # Largest absolute sample; an empty tensor counts as silence.
    if waveform.numel():
        peak = waveform.abs().max().item()
    else:
        peak = 0.0
    if peak > 0:
        # The gain is capped at 1.0, so this only ever attenuates.
        gain = min(target_peak / peak, 1.0)
        waveform = waveform * gain

    # Write over the input so no additional temp file is created.
    torchaudio.save(path, waveform, sample_rate, bits_per_sample=16)
    return path
# Health probe: checks the API key, then reports the model name and the device in use.
@app.post("/health")
def health(x_api_key: Optional[str] = Header(default=None)):
    _require_api_key(x_api_key)
    status = {"status": "ok", "model": "xtts_v2", "device": DEVICE}
    return status
def _cleanup_files(*files: Optional[str]):
    """Delete temporary files on a best-effort basis.

    Intended to run as a background task after the response has been sent,
    and from error paths. Never raises for a file that cannot be removed.

    Args:
        *files: Paths to delete; ``None`` and empty strings are skipped, and
            paths that no longer exist are ignored.
    """
    # Function-scope import: logging is not part of the file's import block.
    import logging

    for file_path in files:
        if not file_path:
            continue
        try:
            # missing_ok already covers "file is gone"; a separate exists()
            # check would only add a race between the check and the unlink.
            Path(file_path).unlink(missing_ok=True)
        except OSError as exc:
            # Cleanup stays best-effort, but the failure is recorded rather
            # than silently discarded.
            logging.getLogger(__name__).warning(
                "Could not remove temporary file %s: %s", file_path, exc
            )
@app.post("/generate")
def generate(
    payload: GenerateRequest = Body(...),
    background_tasks: BackgroundTasks = BackgroundTasks(),
    x_api_key: Optional[str] = Header(default=None),
):
    """Synthesize speech for the request text using the supplied speaker audio.

    The speaker audio is written to a temporary file and preprocessed, the
    model renders the text to a temporary WAV file, and that file is
    post-processed and returned. Both temporary files are removed by a
    background task after the response has been sent, or immediately when
    the request fails.

    Returns:
        A ``FileResponse`` with the generated WAV on success, or a
        ``JSONResponse`` with status 500 and an ``error`` message when
        generation fails unexpectedly.

    Raises:
        HTTPException: 401 for a bad API key; any HTTPException raised while
            obtaining the speaker audio is passed through unchanged.
    """
    # NOTE(review): the BackgroundTasks() default is a single shared instance;
    # this relies on the framework supplying a per-request object. Confirm
    # before calling this function directly.
    _require_api_key(x_api_key)
    speaker_file = None
    output_file = None
    try:
        speaker_file = _temp_speaker_file(payload.speaker_wav)
        speaker_file = _preprocess_audio_wav(speaker_file)
        output_file = os.path.join(tempfile.gettempdir(), f"xtts-{uuid.uuid4()}.wav")
        tts_model.tts_to_file(
            text=payload.text,
            file_path=output_file,
            speaker_wav=speaker_file,
            language=payload.language or DEFAULT_LANGUAGE,
            split_sentences=True,
        )
        # Verify the output before post-processing: the post-process step
        # loads the file, so checking afterwards could never report this error.
        if not Path(output_file).exists():
            raise RuntimeError(f"TTS generation failed: output file was not created at {output_file}")
        # Light post-process to avoid end-of-file artifacts
        output_file = _preprocess_audio_wav(output_file)
        # Schedule cleanup after the response has been sent
        background_tasks.add_task(_cleanup_files, speaker_file, output_file)
        return FileResponse(output_file, media_type="audio/wav", filename="output.wav")
    except HTTPException:
        # Remove whatever was created, then let the HTTP error propagate as-is.
        _cleanup_files(speaker_file, output_file)
        raise
    except Exception as exc:  # pragma: no cover
        # Same cleanup for unexpected failures, reported as a JSON 500.
        _cleanup_files(speaker_file, output_file)
        return JSONResponse(status_code=500, content={"error": str(exc)})
# Index route: returns the service name and the list of endpoint paths.
@app.get("/")
def root():
    endpoints = ["/health", "/generate"]
    return {"name": "xtts-v2-api", "endpoints": endpoints}