Files
miku-discord/soprano_to_rvc/soprano_rvc_api.py

1868 lines
77 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Soprano + RVC Streaming API
Integrated pipeline: Soprano TTS RVC Voice Conversion HTTP Stream
Closely follows gui_v1.py logic for buffer management and processing
"""
import os
import sys
import json
import struct
import asyncio
import threading
import time
import uuid
import logging
from queue import Queue, Empty
from typing import Optional, Dict
from dataclasses import dataclass
# Apply torch.load patch for PyTorch 2.5+ compatibility with fairseq models
try:
import rvc_torch_patch
except ImportError:
pass # Patch not available (bare metal setup)
# Add soprano and RVC to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'soprano'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'Retrieval-based-Voice-Conversion-WebUI'))
import numpy as np
import torch
import torch.nn.functional as F
import torchaudio.transforms as tat
import zmq # For Soprano server communication
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
# Soprano runs in separate process, don't import it here
# from soprano import SopranoTTS
from infer.lib import rtrvc as rvc_for_realtime
from configs.config import Config as RVCConfig
from tools.torchgate import TorchGate
from multiprocessing import Queue as MPQueue, cpu_count
# Setup logging
logging.basicConfig(
    level=logging.DEBUG,  # Enable DEBUG to see RVC internal errors
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = FastAPI(title="Soprano + RVC Streaming API")

# Add CORS middleware to allow WebSocket connections from any origin
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global state (populated by the FastAPI startup event below)
pipeline: Optional['SopranoRVCPipeline'] = None
pipeline_ready = False  # Flag to indicate warmup complete
@dataclass
class PipelineConfig:
    """Configuration matching gui_v1.py parameters.

    Loaded from soprano_rvc_config.json at startup; field names must match
    the JSON keys exactly (the dict is splatted into this constructor).
    """
    pth_path: str    # RVC voice model (.pth); path relative to this script's dir
    index_path: str  # FAISS feature index for the model
    pitch: int = 0          # pitch offset passed straight to rvc_for_realtime.RVC
    formant: float = 0.0    # formant offset passed straight to rvc_for_realtime.RVC
    index_rate: float = 0.3     # feature-index blend ratio
    rms_mix_rate: float = 0.25  # <1 enables RMS envelope mixing in process_through_rvc
    block_time: float = 0.20  # 0.20s blocks = 9600 samples, reduces CPU overhead
    crossfade_time: float = 0.05  # Proportional to block_time
    extra_time: float = 1.8  # Reduced from 2.5s (minimum safe value per ChatGPT)
    n_cpu: int = 4  # worker count handed to RVC (used by multiprocess f0 methods)
    f0method: str = "rmvpe"  # CPU-bound but stable; GPU methods need kernel compilation
    soprano_sample_rate: int = 32000  # rate of audio returned by the Soprano server
    rvc_sample_rate: int = 48000     # rate of the streamed output
    use_pv: bool = False             # phase-vocoder crossfade flag (standard fade when False)
    I_noise_reduce: bool = False     # input noise gate
    O_noise_reduce: bool = False     # output noise gate
class TextRequest(BaseModel):
    """Request body for the /api/speak endpoints: the text to synthesize."""
    text: str
def write_wav_header(sample_rate=48000, channels=1, bits_per_sample=16):
    """Build a 44-byte PCM WAV header for an endless stream.

    Both the RIFF chunk size and the data chunk size are written as
    0xFFFFFFFF because the total stream length is unknown up front;
    players treat this as "read until EOF".

    Args:
        sample_rate: Samples per second per channel.
        channels: Number of interleaved channels.
        bits_per_sample: Sample width in bits.

    Returns:
        bytes: The complete RIFF/WAVE header.
    """
    bytes_per_frame = channels * bits_per_sample // 8
    bytes_per_second = sample_rate * bytes_per_frame
    parts = [
        b'RIFF',
        struct.pack('<I', 0xFFFFFFFF),   # RIFF chunk size: unknown (streaming)
        b'WAVE',
        b'fmt ',
        struct.pack('<I', 16),           # fmt chunk size for plain PCM
        struct.pack('<H', 1),            # audio format tag: 1 = PCM
        struct.pack('<H', channels),
        struct.pack('<I', sample_rate),
        struct.pack('<I', bytes_per_second),
        struct.pack('<H', bytes_per_frame),
        struct.pack('<H', bits_per_sample),
        b'data',
        struct.pack('<I', 0xFFFFFFFF),   # data chunk size: unknown (streaming)
    ]
    return b''.join(parts)
class SopranoRVCPipeline:
    """
    Integrated Soprano -> RVC pipeline.

    Text jobs are sent to an external Soprano TTS server over ZMQ; the
    returned 32 kHz audio is run through a realtime RVC voice-conversion
    model and broadcast as 16-bit PCM to per-client queues. Buffer
    management closely follows the RVC WebUI's gui_v1.py logic.
    """
    def __init__(self, config: PipelineConfig):
        self.config = config
        # Change to RVC directory for config loading (RVCConfig resolves
        # relative paths against the CWD)
        rvc_dir = os.path.join(os.path.dirname(__file__), 'Retrieval-based-Voice-Conversion-WebUI')
        original_dir = os.getcwd()
        os.chdir(rvc_dir)
        try:
            self.rvc_config = RVCConfig()
            self.device = self.rvc_config.device
        finally:
            os.chdir(original_dir)
        logger.info(f"Using device: {self.device}")
        logger.info(f"Half precision: {self.rvc_config.is_half}")
        # Queues for multiprocessing (harvest f0 method)
        self.inp_q = MPQueue()
        self.opt_q = MPQueue()
        # Client management
        self.text_queue = Queue(maxsize=50)        # pending synthesis jobs
        self.client_queues: Dict[str, Queue] = {}  # client_id -> PCM output queue
        self.client_queues_lock = threading.Lock()
        self.running = False
        # Async pipeline: Queue between Soprano generator and RVC processor
        self.soprano_chunk_queue = Queue(maxsize=100)  # Large buffer - allow Soprano to run at full speed
        self.current_job_id = None
        self.soprano_done = threading.Event()  # Signal when Soprano finishes generating
        # Diagnostic logging flag
        self._log_gpu_state = os.environ.get('LOG_GPU_STATE', '0') == '1'
        # ZMQ connection to Soprano server (running on GTX 1660)
        self.zmq_context = zmq.Context()
        self.soprano_socket = self.zmq_context.socket(zmq.REQ)
        soprano_server = os.environ.get('SOPRANO_SERVER', 'tcp://localhost:5555')
        self.soprano_socket.connect(soprano_server)
        logger.info(f"✓ Connected to Soprano server at {soprano_server}")
        # Store paths for later use (process_through_rvc chdirs into rvc_base_dir)
        self.original_dir = os.getcwd()
        self.rvc_base_dir = os.path.join(os.path.dirname(__file__), "Retrieval-based-Voice-Conversion-WebUI")
        # Initialize models
        # Soprano runs in separate process (GTX 1660)
        # self._initialize_soprano()  # Removed - using ZMQ instead
        self._initialize_rvc()
        self._initialize_buffers()
        logger.info("Pipeline initialization complete!")
def _call_soprano_server(self, text):
"""Call Soprano server via ZMQ and get audio"""
import uuid
job_id = str(uuid.uuid4())
request = {
'job_id': job_id,
'text': text
}
logger.debug(f"[Soprano] Sending request to server: {text[:50]}...")
self.soprano_socket.send_json(request)
logger.debug(f"[Soprano] Waiting for response...")
response = self.soprano_socket.recv_json()
if 'error' in response:
raise Exception(f"Soprano server error: {response['error']}")
# Convert back to numpy array
audio = np.array(response['audio'], dtype=np.float32)
logger.debug(f"[Soprano] Received {len(audio)} samples ({response['audio_duration']:.2f}s)")
return audio
    def _initialize_soprano(self):
        """Initialize an in-process Soprano TTS model.

        NOTE(review): dead code — the `SopranoTTS` import at the top of the
        file is commented out (Soprano now runs in a separate process reached
        via ZMQ), so calling this would raise NameError. Kept for reference.
        """
        logger.info("Loading Soprano TTS model...")
        # Force Soprano to use the same device as RVC
        device_str = 'cuda' if 'cuda' in str(self.device) else str(self.device)
        self.soprano = SopranoTTS(
            backend='lmdeploy',  # Using lmdeploy backend (proven faster)
            device=device_str,
            cache_size_mb=1000,  # Maxing out cache (plenty of VRAM available)
            decoder_batch_size=4  # Higher batching to maximize throughput
        )
        logger.info(f"✓ Soprano TTS loaded on {self.soprano.device} using {self.soprano.backend} backend")
    def _initialize_rvc(self):
        """Initialize RVC model - follows gui_v1.py start_vc().

        Temporarily chdirs into the RVC repo so its relative asset paths
        (assets/hubert/hubert_base.pt etc.) resolve; always restores the
        original working directory.

        Raises:
            FileNotFoundError: If the HuBERT checkpoint is missing.
            RuntimeError: If RVC's silent init left model/net_g unset.
        """
        logger.info("Loading RVC model...")
        logger.info(f" Model: {self.config.pth_path}")
        logger.info(f" Index: {self.config.index_path}")
        # Enable debug logging for RVC module
        rvc_logger = logging.getLogger('infer.lib.rtrvc')
        rvc_logger.setLevel(logging.DEBUG)
        torch.cuda.empty_cache()
        # Change to RVC directory so relative paths work (assets/hubert/hubert_base.pt)
        original_dir = os.getcwd()
        rvc_base_dir = os.path.join(os.path.dirname(__file__), "Retrieval-based-Voice-Conversion-WebUI")
        os.chdir(rvc_base_dir)
        logger.info(f" Working directory: {os.getcwd()}")
        try:
            # Check if HuBERT exists before handing paths to RVC
            hubert_path = os.path.join(os.getcwd(), "assets/hubert/hubert_base.pt")
            if not os.path.exists(hubert_path):
                raise FileNotFoundError(f"HuBERT model not found at: {hubert_path}")
            logger.info(f" HuBERT path verified: {hubert_path}")
            # Convert paths to be relative to RVC directory
            # Config paths are relative to soprano_to_rvc dir, need to make them relative to RVC dir
            pth_path_abs = os.path.join(original_dir, self.config.pth_path)
            index_path_abs = os.path.join(original_dir, self.config.index_path)
            # Make them relative to RVC directory
            pth_path_rel = os.path.relpath(pth_path_abs, rvc_base_dir)
            index_path_rel = os.path.relpath(index_path_abs, rvc_base_dir)
            logger.info(f" Adjusted model path: {pth_path_rel}")
            logger.info(f" Adjusted index path: {index_path_rel}")
            self.rvc = rvc_for_realtime.RVC(
                self.config.pitch,
                self.config.formant,
                pth_path_rel,
                index_path_rel,
                self.config.index_rate,
                self.config.n_cpu,
                self.inp_q,
                self.opt_q,
                self.rvc_config,
                None
            )
            # Verify critical attributes were set
            if not hasattr(self.rvc, 'model') or self.rvc.model is None:
                # The RVC __init__ catches exceptions silently, so model might not be set
                raise RuntimeError(
                    "RVC model attribute not set - HuBERT model failed to load. "
                    "Check that assets/hubert/hubert_base.pt is accessible and valid."
                )
            if not hasattr(self.rvc, 'net_g') or self.rvc.net_g is None:
                raise RuntimeError("RVC net_g attribute not set - Synthesizer failed to load")
            if not hasattr(self.rvc, 'version'):
                self.rvc.version = 'v2'  # Default to v2 for modern models
                logger.info(" Version attribute not set by RVC, defaulting to v2")
            # Get target sample rate
            rvc_sr = getattr(self.rvc, 'tgt_sr', self.config.rvc_sample_rate)
            logger.info(f"✓ RVC model loaded (version: {self.rvc.version}, target SR: {rvc_sr}Hz)")
            # Apply torch optimizations for inference performance
            logger.info("Applying torch inference optimizations...")
            torch.set_grad_enabled(False)  # Disable gradient computation (inference only)
            torch.backends.cudnn.benchmark = True  # Auto-tune convolution algorithms
            torch.backends.cuda.matmul.allow_tf32 = True  # Allow TF32 for matrix ops
            torch.backends.cudnn.allow_tf32 = True  # Allow TF32 for cuDNN ops
            # Ensure RVC models are in FP16 if half precision is enabled
            if self.rvc_config.is_half:
                logger.info("Ensuring RVC models are in FP16...")
                self.rvc.model = self.rvc.model.half()
                self.rvc.net_g = self.rvc.net_g.half()
            logger.info("✓ Torch optimizations applied")
        except Exception as e:
            logger.error(f"Failed to initialize RVC: {e}")
            logger.error(f"Current directory: {os.getcwd()}")
            logger.error(f"Files in assets/hubert: {os.listdir('assets/hubert') if os.path.exists('assets/hubert') else 'DIR NOT FOUND'}")
            raise
        finally:
            # Always restore original directory
            os.chdir(original_dir)
    def _initialize_buffers(self):
        """
        Initialize all buffers, windows, and resamplers.
        Closely follows gui_v1.py buffer setup logic.

        Must run after _initialize_rvc() (reads self.rvc.tgt_sr).
        """
        logger.info("Initializing buffers and processing pipeline...")
        # Sample rates
        self.soprano_sr = self.config.soprano_sample_rate  # 32000 Hz
        self.target_sr = self.config.rvc_sample_rate  # 48000 Hz (output)
        # Frame quantum: all block sizes are rounded to multiples of this
        # (target_sr/100 = 10 ms, 480 samples at 48 kHz)
        self.zc = self.target_sr // 100  # 480 samples
        # Block frames (following gui_v1.py logic)
        self.block_frame = (
            int(np.round(self.config.block_time * self.target_sr / self.zc)) * self.zc
        )
        # Same block expressed at 16 kHz: 160 samples per zc frame
        self.block_frame_16k = 160 * self.block_frame // self.zc
        # Crossfade frames
        self.crossfade_frame = (
            int(np.round(self.config.crossfade_time * self.target_sr / self.zc)) * self.zc
        )
        self.sola_buffer_frame = min(self.crossfade_frame, 4 * self.zc)
        self.sola_search_frame = self.zc
        # Extra frames for processing (leading context given to RVC, skipped in output)
        self.extra_frame = (
            int(np.round(self.config.extra_time * self.target_sr / self.zc)) * self.zc
        )
        logger.info(f" Block frame: {self.block_frame} samples ({self.config.block_time}s)")
        logger.info(f" Crossfade: {self.crossfade_frame} samples ({self.config.crossfade_time}s)")
        logger.info(f" Extra time: {self.extra_frame} samples ({self.config.extra_time}s)")
        # Input buffers (following gui_v1.py); slid left one block at a time
        self.input_wav = torch.zeros(
            self.extra_frame + self.crossfade_frame + self.sola_search_frame + self.block_frame,
            device=self.device,
            dtype=torch.float32,
        )
        self.input_wav_denoise = self.input_wav.clone()
        # 16 kHz mirror of input_wav (160 samples per zc frame); fed to rvc.infer
        self.input_wav_res = torch.zeros(
            160 * self.input_wav.shape[0] // self.zc,
            device=self.device,
            dtype=torch.float32,
        )
        # RMS buffer for threshold detection
        self.rms_buffer = np.zeros(4 * self.zc, dtype='float32')
        # SOLA buffers
        self.sola_buffer = torch.zeros(
            self.sola_buffer_frame, device=self.device, dtype=torch.float32
        )
        self.nr_buffer = self.sola_buffer.clone()
        self.output_buffer = self.input_wav.clone()
        # Processing parameters, expressed in zc-frame units for rvc.infer
        self.skip_head = self.extra_frame // self.zc
        self.return_length = (
            self.block_frame + self.sola_buffer_frame + self.sola_search_frame
        ) // self.zc
        # Fade windows for SOLA crossfading (raised-sine; in/out sum to 1)
        self.fade_in_window = (
            torch.sin(
                0.5 * np.pi * torch.linspace(
                    0.0, 1.0, steps=self.sola_buffer_frame,
                    device=self.device, dtype=torch.float32
                )
            ) ** 2
        )
        self.fade_out_window = 1 - self.fade_in_window
        # Resamplers
        # Soprano (32kHz) → Target (48kHz) for buffer accumulation
        self.resampler_soprano_to_48k = tat.Resample(
            orig_freq=self.soprano_sr,
            new_freq=self.target_sr,
            dtype=torch.float32
        ).to(self.device)
        # Target (48kHz) → RVC input (16kHz)
        self.resampler_48k_to_16k = tat.Resample(
            orig_freq=self.target_sr,
            new_freq=16000,
            dtype=torch.float32
        ).to(self.device)
        # Soprano (32kHz) → RVC input (16kHz) — used by process_soprano_chunk's
        # dual-resampling path (NOTE: earlier comment called this deprecated,
        # but it is actively used)
        self.resampler_soprano_to_16k = tat.Resample(
            orig_freq=self.soprano_sr,
            new_freq=16000,
            dtype=torch.float32
        ).to(self.device)
        # RVC output (tgt_sr) → Target output (48kHz); None when rates already match
        self.rvc_tgt_sr = getattr(self.rvc, 'tgt_sr', self.target_sr)
        if self.rvc_tgt_sr != self.target_sr:
            self.resampler_rvc_to_target = tat.Resample(
                orig_freq=self.rvc_tgt_sr,
                new_freq=self.target_sr,
                dtype=torch.float32
            ).to(self.device)
        else:
            self.resampler_rvc_to_target = None
        # Noise gate (if enabled)
        if self.config.I_noise_reduce or self.config.O_noise_reduce:
            self.tg = TorchGate(
                sr=self.target_sr,
                n_fft=4 * self.zc,
                prop_decrease=0.9
            ).to(self.device)
        logger.info("✓ Buffers initialized")
    def process_soprano_chunk(self, soprano_chunk: torch.Tensor) -> tuple:
        """
        Resample one Soprano chunk (32 kHz mono) into both rates the pipeline needs.

        Args:
            soprano_chunk: 1-D float audio at 32 kHz (numpy array or tensor).

        Returns:
            Tuple (chunk_48k, chunk_16k) of 1-D float32 tensors on self.device:
            48 kHz for RMS mixing / output buffers, 16 kHz for RVC inference.
        """
        # Convert to tensor if needed
        if not torch.is_tensor(soprano_chunk):
            soprano_chunk = torch.from_numpy(soprano_chunk)
        # Ensure chunk is on the correct device
        soprano_chunk = soprano_chunk.to(self.device)
        # Dual resampling strategy: resample from 32kHz to both 48kHz and 16kHz in parallel
        # This avoids the 48kHz→16kHz step while keeping both buffers we need:
        # - 48kHz for RMS mixing and final output buffers
        # - 16kHz for RVC inference
        # (unsqueeze to (1, 1, N) for the resampler, then take the row back out)
        chunk_48k = self.resampler_soprano_to_48k(soprano_chunk.unsqueeze(0).unsqueeze(0))[0, 0]
        chunk_16k = self.resampler_soprano_to_16k(soprano_chunk.unsqueeze(0).unsqueeze(0))[0, 0]
        # Return both for parallel buffer management
        return chunk_48k, chunk_16k
    def accumulate_and_process_block(self, chunk_48k: torch.Tensor, chunk_16k: torch.Tensor) -> bool:
        """
        Accumulate samples in both 48kHz and 16kHz domains and slide buffers when we have enough.

        Args:
            chunk_48k: Audio chunk at 48kHz (for RMS mixing buffer); may be empty.
            chunk_16k: Audio chunk at 16kHz (for RVC inference buffer); may be empty.

        Returns:
            True if a full block was accumulated and slid into input_wav /
            input_wav_res, i.e. the caller should run process_through_rvc().
            Callers loop on this until it returns False, passing empty
            tensors after the first iteration.
        """
        # Accumulation buffers for both sample rates (created lazily on first call)
        if not hasattr(self, 'accumulation_buffer_48k'):
            self.accumulation_buffer_48k = torch.tensor([], device=self.device, dtype=torch.float32)
        if not hasattr(self, 'accumulation_buffer_16k'):
            self.accumulation_buffer_16k = torch.tensor([], device=self.device, dtype=torch.float32)
        self.accumulation_buffer_48k = torch.cat([self.accumulation_buffer_48k, chunk_48k])
        self.accumulation_buffer_16k = torch.cat([self.accumulation_buffer_16k, chunk_16k])
        # Check if we have accumulated enough samples for a block (at 48kHz rate).
        # A block is block_frame samples at 48 kHz (block_time seconds).
        if self.accumulation_buffer_48k.shape[0] >= self.block_frame:
            # Take exactly block_frame samples from 48kHz buffer
            chunk_size = self.block_frame
            block_data_48k = self.accumulation_buffer_48k[:chunk_size]
            self.accumulation_buffer_48k = self.accumulation_buffer_48k[chunk_size:]
            # Take corresponding samples from 16kHz buffer (same time duration)
            block_data_16k = self.accumulation_buffer_16k[:self.block_frame_16k]
            self.accumulation_buffer_16k = self.accumulation_buffer_16k[self.block_frame_16k:]
            # Slide the 48kHz input buffer left by one block (for RMS mixing)
            self.input_wav[:-chunk_size] = self.input_wav[chunk_size:].clone()
            self.input_wav[-chunk_size:] = block_data_48k
            # Slide the 16kHz RVC input buffer (for inference)
            self.input_wav_res[:-self.block_frame_16k] = self.input_wav_res[self.block_frame_16k:].clone()
            self.input_wav_res[-self.block_frame_16k:] = block_data_16k
            return True  # Ready to process through RVC
        return False  # Need more samples
def flush_buffers(self) -> list:
"""
Flush any remaining audio in accumulation buffers by padding to block size.
Returns list of RVC output chunks for any remaining audio.
Call this at the end of synthesis to ensure no audio is cut off.
"""
output_chunks = []
if not hasattr(self, 'accumulation_buffer_48k') or not hasattr(self, 'accumulation_buffer_16k'):
return output_chunks
# Check if there's remaining audio in the buffers
remaining_48k = self.accumulation_buffer_48k.shape[0]
remaining_16k = self.accumulation_buffer_16k.shape[0]
if remaining_48k > 0:
logger.debug(f"[Flush] Processing {remaining_48k} remaining samples @ 48kHz")
# Pad to block size with silence
pad_size_48k = self.block_frame - remaining_48k
pad_size_16k = self.block_frame_16k - remaining_16k
if pad_size_48k > 0:
padding_48k = torch.zeros(pad_size_48k, device=self.device, dtype=torch.float32)
padding_16k = torch.zeros(pad_size_16k, device=self.device, dtype=torch.float32)
chunk_48k = torch.cat([self.accumulation_buffer_48k, padding_48k])
chunk_16k = torch.cat([self.accumulation_buffer_16k, padding_16k])
else:
chunk_48k = self.accumulation_buffer_48k[:self.block_frame]
chunk_16k = self.accumulation_buffer_16k[:self.block_frame_16k]
# Process the final block
if self.accumulate_and_process_block(chunk_48k, chunk_16k):
rvc_output = self.process_through_rvc()
# Trim the padding from output (only keep the actual audio)
# Calculate how many output samples correspond to input samples
output_samples = int(remaining_48k) # Same rate (48kHz)
if output_samples > 0 and output_samples < len(rvc_output):
rvc_output = rvc_output[:output_samples]
output_chunks.append(rvc_output)
logger.debug(f"[Flush] Generated {len(rvc_output)} output samples")
# Clear the buffers
self.accumulation_buffer_48k = torch.tensor([], device=self.device, dtype=torch.float32)
self.accumulation_buffer_16k = torch.tensor([], device=self.device, dtype=torch.float32)
return output_chunks
    def process_through_rvc(self) -> Optional[torch.Tensor]:
        """
        Run the current input buffers through RVC and post-process the result.

        Reads self.input_wav_res (16 kHz) for inference, then applies optional
        output noise reduction, RMS envelope mixing against self.input_wav
        (48 kHz), and SOLA crossfading against the previous block's tail.

        Returns:
            1-D tensor of block_frame samples at target_sr, ready for streaming.
        """
        # GPU state logging (for diagnostic purposes)
        if hasattr(self, '_log_gpu_state') and self._log_gpu_state:
            mem_before = torch.cuda.memory_allocated() / 1024**3 if torch.cuda.is_available() else 0
            reserved_before = torch.cuda.memory_reserved() / 1024**3 if torch.cuda.is_available() else 0
        # Change to RVC directory for inference (needs to access assets/rmvpe/rmvpe.pt etc)
        current_dir = os.getcwd()
        os.chdir(self.rvc_base_dir)
        try:
            # RVC inference
            rvc_start = time.time()
            infer_wav = self.rvc.infer(
                self.input_wav_res,
                self.block_frame_16k,
                self.skip_head,
                self.return_length,
                self.config.f0method,
            )
            rvc_duration = time.time() - rvc_start
            # Log RVC inference time if slow
            if rvc_duration > 0.200:  # >200ms is slow
                logger.warning(f"[RVC] Slow inference: {rvc_duration*1000:.1f}ms")
            if hasattr(self, '_log_gpu_state') and self._log_gpu_state:
                mem_after = torch.cuda.memory_allocated() / 1024**3 if torch.cuda.is_available() else 0
                reserved_after = torch.cuda.memory_reserved() / 1024**3 if torch.cuda.is_available() else 0
                logger.warning(f"[RVC] GPU mem: {mem_before:.2f}GB -> {mem_after:.2f}GB (reserved: {reserved_before:.2f} -> {reserved_after:.2f})")
        finally:
            # Restore original directory
            os.chdir(current_dir)
        # Convert to tensor if needed
        if not torch.is_tensor(infer_wav):
            infer_wav = torch.from_numpy(infer_wav).to(self.device)
        # Resample RVC output to target SR if needed
        if self.resampler_rvc_to_target is not None:
            infer_wav = self.resampler_rvc_to_target(infer_wav)
        # Output noise reduction (optional): gate against a sliding output history
        if self.config.O_noise_reduce:
            self.output_buffer[:-self.block_frame] = self.output_buffer[self.block_frame:].clone()
            self.output_buffer[-self.block_frame:] = infer_wav[-self.block_frame:]
            infer_wav = self.tg(infer_wav.unsqueeze(0), self.output_buffer.unsqueeze(0)).squeeze(0)
        # Volume envelope mixing (RMS matching): pull the output loudness toward
        # the input loudness, weighted by (1 - rms_mix_rate)
        if self.config.rms_mix_rate < 1:
            if self.config.I_noise_reduce:
                input_wav = self.input_wav_denoise[self.extra_frame:]
            else:
                input_wav = self.input_wav[self.extra_frame:]
            # Calculate RMS for input and output
            import librosa
            rms1 = librosa.feature.rms(
                y=input_wav[:infer_wav.shape[0]].cpu().numpy(),
                frame_length=4 * self.zc,
                hop_length=self.zc,
            )
            rms1 = torch.from_numpy(rms1).to(self.device)
            rms1 = F.interpolate(
                rms1.unsqueeze(0),
                size=infer_wav.shape[0] + 1,
                mode='linear',
                align_corners=True,
            )[0, 0, :-1]
            rms2 = librosa.feature.rms(
                y=infer_wav[:].cpu().numpy(),
                frame_length=4 * self.zc,
                hop_length=self.zc,
            )
            rms2 = torch.from_numpy(rms2).to(self.device)
            rms2 = F.interpolate(
                rms2.unsqueeze(0),
                size=infer_wav.shape[0] + 1,
                mode='linear',
                align_corners=True,
            )[0, 0, :-1]
            # Floor rms2 so the ratio can't blow up on near-silent output
            rms2 = torch.max(rms2, torch.zeros_like(rms2) + 1e-3)
            infer_wav *= torch.pow(rms1 / rms2, torch.tensor(1 - self.config.rms_mix_rate))
        # SOLA algorithm for seamless crossfading (from gui_v1.py): find the
        # offset in the search window where the new block best aligns with the
        # saved tail of the previous block (normalized cross-correlation)
        conv_input = infer_wav[None, None, :self.sola_buffer_frame + self.sola_search_frame]
        cor_nom = F.conv1d(conv_input, self.sola_buffer[None, None, :])
        cor_den = torch.sqrt(
            F.conv1d(
                conv_input ** 2,
                torch.ones(1, 1, self.sola_buffer_frame, device=self.device)
            ) + 1e-8
        )
        sola_offset = torch.argmax(cor_nom[0, 0] / cor_den[0, 0])
        infer_wav = infer_wav[sola_offset:]
        # Apply crossfade
        if not self.config.use_pv:  # Standard fade (phase vocoder disabled)
            infer_wav[:self.sola_buffer_frame] *= self.fade_in_window
            infer_wav[:self.sola_buffer_frame] += self.sola_buffer * self.fade_out_window
        # Update SOLA buffer for next iteration (tail just past the output block)
        self.sola_buffer[:] = infer_wav[self.block_frame:self.block_frame + self.sola_buffer_frame]
        # Return the block ready for output
        return infer_wav[:self.block_frame]
    def soprano_generator_worker(self):
        """
        Thread 1: Continuously generates Soprano chunks and puts them in queue.
        Runs in parallel with RVC processor - NO BLOCKING!

        Loops while self.running: pops a text job, fetches the full audio from
        the Soprano server, then feeds it into soprano_chunk_queue in ~0.5 s
        pieces, timing how long each put blocks (queue back-pressure metric).
        """
        logger.info("Soprano generator worker started")
        while self.running:
            try:
                # Get text job from queue (short timeout so the running flag is re-checked)
                try:
                    job = self.text_queue.get(timeout=0.1)
                    job_id = job['job_id']
                    text = job['text']
                    self.current_job_id = job_id
                    logger.info(f"[Soprano] Processing job {job_id[:8]}: {text[:50]}...")
                    start_time = time.time()
                    # Clear the done flag
                    self.soprano_done.clear()
                    # Get audio from Soprano server (GTX 1660) via ZMQ
                    soprano_audio = self._call_soprano_server(text)
                    # Convert to tensor and split into chunks for queue
                    soprano_audio_tensor = torch.from_numpy(soprano_audio).to(self.device).float()
                    # Split into chunks of ~0.5s each for queue processing
                    chunk_size = 16000  # 0.5s @ 32kHz
                    num_chunks = (len(soprano_audio_tensor) + chunk_size - 1) // chunk_size
                    chunk_count = 0
                    chunk_times = []
                    queue_wait_times = []
                    for i in range(num_chunks):
                        chunk_count += 1
                        chunk_start = i * chunk_size
                        chunk_end = min((i + 1) * chunk_size, len(soprano_audio_tensor))
                        soprano_chunk = soprano_audio_tensor[chunk_start:chunk_end].cpu().numpy()
                        chunk_times.append(time.time())
                        # Put chunk in queue for RVC processor
                        # Measure if this blocks (queue full)
                        queue_put_start = time.time()
                        self.soprano_chunk_queue.put({
                            'job_id': job_id,
                            'chunk': soprano_chunk,
                            'chunk_num': chunk_count,
                            'timestamp': time.time()
                        })
                        queue_wait = time.time() - queue_put_start
                        queue_wait_times.append(queue_wait)
                        if queue_wait > 0.01:  # Log if queue put took >10ms (blocking)
                            logger.warning(f"[Soprano] Chunk {chunk_count} BLOCKED {queue_wait*1000:.1f}ms on queue.put() (queue full!)")
                        logger.debug(f"[Soprano] Queued chunk {chunk_count} for RVC (queue wait: {queue_wait*1000:.1f}ms)")
                    # Signal that Soprano is done generating for this job
                    self.soprano_done.set()
                    elapsed = time.time() - start_time
                    total_queue_wait = sum(queue_wait_times)
                    max_queue_wait = max(queue_wait_times) if queue_wait_times else 0
                    logger.info(f"[Soprano] Job {job_id[:8]} complete: {chunk_count} chunks in {elapsed:.2f}s")
                    logger.info(f"[Soprano] Total queue wait: {total_queue_wait:.2f}s ({total_queue_wait/elapsed*100:.1f}% of time)")
                    logger.info(f"[Soprano] Max queue wait: {max_queue_wait*1000:.1f}ms")
                    if total_queue_wait > 1.0:
                        logger.warning(f"[Soprano] ⚠️ BOTTLENECK: Spent {total_queue_wait:.2f}s blocked on queue! RVC too slow to drain.")
                except Empty:
                    continue
            except Exception as e:
                logger.error(f"[Soprano] Error: {e}", exc_info=True)
                # Set the done flag even on failure so the RVC worker doesn't
                # wait forever for a job that will never finish
                self.soprano_done.set()
    def rvc_processor_worker(self):
        """
        Thread 2: Continuously pulls Soprano chunks from queue and processes through RVC.
        Runs in parallel with Soprano generator - NO BLOCKING!

        For each chunk: dual-resample, drain full blocks through RVC, convert
        to 16-bit PCM and broadcast to every client queue. Per-job timing is
        summarized via _log_job_stats when the job changes or the queue drains.
        """
        logger.info("RVC processor worker started")
        # Timing diagnostics
        job_start_time = None
        current_job_id = None
        block_times = []        # wall time per block (RVC + broadcast)
        processing_times = []   # RVC inference time per block
        while self.running:
            try:
                # Get Soprano chunk from queue (with timeout to check running flag)
                try:
                    item = self.soprano_chunk_queue.get(timeout=0.1)
                    job_id = item['job_id']
                    soprano_chunk = item['chunk']
                    chunk_num = item['chunk_num']
                    chunk_timestamp = item['timestamp']
                    # Track job timing
                    if current_job_id != job_id:
                        # New job started
                        if current_job_id is not None and len(block_times) > 0:
                            # Log previous job stats
                            self._log_job_stats(current_job_id, job_start_time, block_times, processing_times)
                        # GPU state logging between jobs
                        if self._log_gpu_state and torch.cuda.is_available():
                            torch.cuda.synchronize()
                            mem_alloc = torch.cuda.memory_allocated() / 1024**3
                            mem_reserved = torch.cuda.memory_reserved() / 1024**3
                            logger.info(f"[GPU] Between jobs: {mem_alloc:.2f}GB allocated, {mem_reserved:.2f}GB reserved")
                        current_job_id = job_id
                        job_start_time = time.time()
                        block_times = []
                        processing_times = []
                        logger.info(f"[RVC] Starting job {job_id[:8]}")
                        # GPU state at job start
                        if self._log_gpu_state and torch.cuda.is_available():
                            mem_alloc = torch.cuda.memory_allocated() / 1024**3
                            mem_reserved = torch.cuda.memory_reserved() / 1024**3
                            logger.info(f"[GPU] Job start: {mem_alloc:.2f}GB allocated, {mem_reserved:.2f}GB reserved")
                    queue_wait = time.time() - chunk_timestamp
                    logger.debug(f"[RVC] Processing chunk {chunk_num} (queue wait: {queue_wait*1000:.1f}ms)")
                    # Process and accumulate chunks (dual resampling: 48kHz + 16kHz)
                    chunk_48k, chunk_16k = self.process_soprano_chunk(soprano_chunk)
                    # IMPORTANT: Drain accumulation buffer - one Soprano chunk may fill multiple blocks!
                    blocks_in_this_chunk = 0
                    while self.accumulate_and_process_block(chunk_48k, chunk_16k):
                        blocks_in_this_chunk += 1
                        block_start = time.time()
                        # Process through RVC now that we have a full block
                        try:
                            processing_start = time.time()
                            rvc_output = self.process_through_rvc()
                            processing_duration = time.time() - processing_start
                            processing_times.append(processing_duration)
                            # Convert to 16-bit PCM bytes and broadcast
                            pcm_data = (rvc_output.cpu().numpy() * 32767).clip(-32768, 32767).astype('int16').tobytes()
                            # Broadcast to all clients
                            with self.client_queues_lock:
                                dead_clients = []
                                for client_id, queue in list(self.client_queues.items()):
                                    try:
                                        queue.put(pcm_data, timeout=0.5)
                                    except Exception as e:
                                        logger.debug(f"Client {client_id[:8]} queue put failed: {e}")
                                        # A persistently-full queue means the client stopped reading
                                        if queue.full():
                                            dead_clients.append(client_id)
                                for client_id in dead_clients:
                                    del self.client_queues[client_id]
                                    logger.warning(f"Client {client_id[:8]} removed (queue full)")
                            block_times.append(time.time() - block_start)
                            logger.debug(f"[RVC] Block processed in {processing_duration*1000:.1f}ms")
                        except Exception as e:
                            logger.error(f"[RVC] Processing error: {e}", exc_info=True)
                        # After first block, give empty tensors for next iteration
                        chunk_48k = torch.tensor([], device=self.device, dtype=torch.float32)
                        chunk_16k = torch.tensor([], device=self.device, dtype=torch.float32)
                    logger.debug(f"[RVC] Chunk {chunk_num} produced {blocks_in_this_chunk} blocks")
                except Empty:
                    # Check if Soprano is done and queue is empty
                    if self.soprano_done.is_set() and self.soprano_chunk_queue.empty():
                        if current_job_id is not None and len(block_times) > 0:
                            # Log final job stats
                            self._log_job_stats(current_job_id, job_start_time, block_times, processing_times)
                            current_job_id = None
                    continue
            except Exception as e:
                logger.error(f"[RVC] Error: {e}", exc_info=True)
    def _log_job_stats(self, job_id, start_time, block_times, processing_times):
        """Log timing statistics for completed job.

        Args:
            job_id: UUID string of the finished job.
            start_time: time.time() captured when the job's first chunk arrived.
            block_times: per-block wall times (RVC + broadcast), seconds.
            processing_times: per-block RVC inference times, seconds.

        Writes a summary both to the logger and (appended) to
        /tmp/soprano_timing_debug.txt for quick inspection.
        """
        elapsed = time.time() - start_time
        # Each block corresponds to block_time seconds of output audio
        total_audio_duration = len(block_times) * self.config.block_time
        realtime_factor = total_audio_duration / elapsed if elapsed > 0 else 0
        avg_processing = sum(processing_times) / len(processing_times) if processing_times else 0
        logger.info(f"""
====== ASYNC PIPELINE PERFORMANCE ======
Job: {job_id[:8]}
Total elapsed: {elapsed:.2f}s
Blocks processed: {len(block_times)}
Audio duration: {total_audio_duration:.2f}s
Realtime factor: {realtime_factor:.2f}x
Avg RVC processing: {avg_processing*1000:.1f}ms per block
========================================
""")
        # Write to file for easy access
        with open('/tmp/soprano_timing_debug.txt', 'a') as f:
            f.write(f"\n=== ASYNC PIPELINE JOB {job_id[:8]} ===\n")
            f.write(f"Blocks: {len(block_times)}, Elapsed: {elapsed:.2f}s\n")
            f.write(f"Audio: {total_audio_duration:.2f}s, Realtime: {realtime_factor:.2f}x\n")
            f.write(f"Avg RVC: {avg_processing*1000:.1f}ms\n")
            f.write(f"==================\n\n")
    def broadcast_worker(self):
        """
        DEPRECATED: Old synchronous implementation.
        Now using soprano_generator_worker() + rvc_processor_worker() for parallel processing.
        Kept for reference but starts the new async workers instead, then acts
        as a supervisor loop that lives until stop() clears self.running.
        """
        logger.info("Starting async pipeline workers...")
        self.running = True
        # Start both workers in parallel (daemon threads die with the process)
        soprano_thread = threading.Thread(target=self.soprano_generator_worker, daemon=True, name="Soprano-Generator")
        rvc_thread = threading.Thread(target=self.rvc_processor_worker, daemon=True, name="RVC-Processor")
        soprano_thread.start()
        rvc_thread.start()
        logger.info("✓ Async pipeline workers started (Soprano + RVC running in parallel)")
        # Keep this supervisor thread alive until stop() is called
        while self.running:
            time.sleep(0.1)
        logger.info("Broadcast worker stopped")
def start(self):
"""Start the broadcast worker thread"""
worker = threading.Thread(target=self.broadcast_worker, daemon=True)
worker.start()
logger.info("Pipeline started")
def stop(self):
"""Stop the pipeline"""
self.running = False
logger.info("Pipeline stopped")
@app.on_event("startup")
async def startup_event():
    """Initialize pipeline on startup.

    Loads soprano_rvc_config.json, builds the pipeline (blocking model
    loads), starts its worker threads, then runs one warmup synthesis so
    the first real request isn't slow. Sets the global pipeline_ready flag
    even if warmup fails.
    """
    global pipeline, pipeline_ready
    logger.info("="*60)
    logger.info("Soprano + RVC Streaming API")
    logger.info("="*60)
    # Load configuration
    config_path = os.path.join(os.path.dirname(__file__), "soprano_rvc_config.json")
    if not os.path.exists(config_path):
        logger.error(f"Configuration file not found: {config_path}")
        raise FileNotFoundError(f"Config file missing: {config_path}")
    with open(config_path, 'r') as f:
        config_dict = json.load(f)
    config = PipelineConfig(**config_dict)
    logger.info("Initializing Soprano + RVC pipeline...")
    logger.info("This may take 15-20 seconds...")
    # Initialize pipeline (blocks until models loaded)
    pipeline = SopranoRVCPipeline(config)
    pipeline.start()
    logger.info("="*60)
    logger.info("✓ Pipeline ready! API accepting requests on port 8765")
    logger.info("="*60)
    # Warmup: Generate a test phrase to initialize all components
    logger.info("🔥 Warming up pipeline with test generation...")
    try:
        # Text sized to generate ~1-2 seconds of audio (roughly 16 blocks @ 0.2s each = 3.2s)
        warmup_text = "Hello, I'm Miku. Voice system initialized and ready."
        warmup_audio = pipeline._call_soprano_server(warmup_text)
        warmup_tensor = torch.from_numpy(warmup_audio).to(pipeline.device).float()
        # Process all chunks through the full pipeline to ensure everything is warmed up
        chunk_size = 3200  # 0.1s @ 32kHz
        blocks_generated = 0
        for i in range(0, len(warmup_tensor), chunk_size):
            chunk = warmup_tensor[i:min(i + chunk_size, len(warmup_tensor))].cpu().numpy()
            chunk_48k, chunk_16k = pipeline.process_soprano_chunk(chunk)
            # Keep draining blocks
            while pipeline.accumulate_and_process_block(chunk_48k, chunk_16k):
                _ = pipeline.process_through_rvc()
                blocks_generated += 1
                # After first block, subsequent chunks are empty
                chunk_48k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
                chunk_16k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
        # Flush any remaining audio
        flush_chunks = pipeline.flush_buffers()
        blocks_generated += len(flush_chunks)
        if blocks_generated > 0:
            logger.info(f"✅ Warmup complete! Generated {blocks_generated} audio blocks. Pipeline is hot and ready.")
        else:
            logger.warning("⚠ Warmup didn't generate blocks, but pipeline initialized.")
        pipeline_ready = True
    except Exception as e:
        logger.error(f"❌ Warmup failed: {e}", exc_info=True)
        logger.warning("Pipeline will still accept requests, but first generation may be slow.")
        pipeline_ready = True  # Allow connections anyway
@app.on_event("shutdown")
async def shutdown_event():
"""Cleanup on shutdown"""
global pipeline
if pipeline:
pipeline.stop()
logger.info("Pipeline shutdown complete")
@app.post("/api/speak")
async def speak(request: TextRequest):
"""Queue text for synthesis"""
if pipeline is None:
return {"error": "Pipeline not initialized"}
job_id = str(uuid.uuid4())
pipeline.text_queue.put({'job_id': job_id, 'text': request.text})
return {
"status": "queued",
"message": "Text queued for synthesis",
"job_id": job_id,
"queue_size": pipeline.text_queue.qsize()
}
@app.post("/api/speak_to_file")
async def speak_to_file(request: TextRequest):
"""Synthesize text and save directly to WAV file (for testing)"""
if pipeline is None:
return {"error": "Pipeline not initialized"}
import wave
import tempfile
import os
job_id = str(uuid.uuid4())
output_path = f"/tmp/soprano_rvc_{job_id[:8]}.wav"
try:
logger.info(f"File synthesis job {job_id[:8]}: {request.text[:50]}...")
# Collect all audio chunks
audio_chunks = []
stream = pipeline.soprano.infer_stream(request.text, chunk_size=10)
chunk_count = 0
soprano_chunk_count = 0
for soprano_chunk in stream:
soprano_chunk_count += 1
# Debug logging (can be commented out for production)
# logger.info(f" Soprano chunk {soprano_chunk_count}: shape={soprano_chunk.shape if hasattr(soprano_chunk, 'shape') else len(soprano_chunk)}")
# Process and accumulate chunks (dual resampling)
chunk_48k, chunk_16k = pipeline.process_soprano_chunk(soprano_chunk)
# logger.info(f" After dual resample: 48k={chunk_48k.shape}, 16k={chunk_16k.shape}")
# IMPORTANT: Drain the accumulation buffer - one Soprano chunk may fill multiple blocks!
# Keep processing blocks until we don't have enough samples left
while pipeline.accumulate_and_process_block(chunk_48k, chunk_16k):
chunk_count += 1
# logger.info(f" Block {chunk_count} ready - processing through RVC")
# logger.info(f" input_wav buffer: shape={pipeline.input_wav.shape}, mean={pipeline.input_wav.abs().mean():.6f}")
# Process through RVC
rvc_output = pipeline.process_through_rvc()
# logger.info(f" RVC output: shape={rvc_output.shape}, mean={rvc_output.abs().mean():.6f}, max={rvc_output.abs().max():.6f}")
audio_chunks.append(rvc_output.cpu().numpy())
# After first block, both chunks should be empty tensors for subsequent iterations
chunk_48k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
chunk_16k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
# CRITICAL: Flush any remaining audio in buffers
flush_chunks = pipeline.flush_buffers()
for rvc_output in flush_chunks:
audio_chunks.append(rvc_output.cpu().numpy())
# Concatenate all chunks
if audio_chunks:
import numpy as np
full_audio = np.concatenate(audio_chunks)
# Write to WAV file
with wave.open(output_path, 'wb') as wav_file:
wav_file.setnchannels(1) # Mono
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(pipeline.config.rvc_sample_rate)
# Convert to int16 PCM
pcm_data = (full_audio * 32767).clip(-32768, 32767).astype('int16')
wav_file.writeframes(pcm_data.tobytes())
logger.info(f"File synthesis complete: {output_path} ({chunk_count} chunks, {len(full_audio)/pipeline.config.rvc_sample_rate:.2f}s)")
return {
"status": "complete",
"output_file": output_path,
"chunks": chunk_count,
"duration_seconds": len(full_audio) / pipeline.config.rvc_sample_rate
}
else:
return {"error": "No audio generated"}
except Exception as e:
logger.error(f"File synthesis error: {e}", exc_info=True)
return {"error": str(e)}
@app.post("/api/speak_soprano_only")
async def speak_soprano_only(request: TextRequest):
"""Synthesize with Soprano only (no RVC) for comparison"""
if pipeline is None:
return {"error": "Pipeline not initialized"}
import wave
import numpy as np
job_id = str(uuid.uuid4())
output_path = f"/tmp/soprano_only_{job_id[:8]}.wav"
try:
logger.info(f"Soprano-only synthesis: {request.text[:50]}...")
# Generate with Soprano
audio = pipeline.soprano.infer(request.text)
# Write to WAV file at Soprano's native sample rate
with wave.open(output_path, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(pipeline.config.soprano_sample_rate) # 32kHz
# Convert to int16 PCM
if torch.is_tensor(audio):
audio = audio.cpu().numpy()
pcm_data = (audio * 32767).clip(-32768, 32767).astype('int16')
wav_file.writeframes(pcm_data.tobytes())
logger.info(f"Soprano-only complete: {output_path}")
return {
"status": "complete",
"output_file": output_path,
"sample_rate": pipeline.config.soprano_sample_rate,
"duration_seconds": len(audio) / pipeline.config.soprano_sample_rate
}
except Exception as e:
logger.error(f"Soprano-only synthesis error: {e}", exc_info=True)
return {"error": str(e)}
@app.post("/api/debug_pre_rvc")
async def debug_pre_rvc(request: TextRequest):
"""Debug endpoint: Save reconstructed audio BEFORE RVC processing"""
if pipeline is None:
return {"error": "Pipeline not initialized"}
import wave
import numpy as np
job_id = str(uuid.uuid4())
output_48k = f"/tmp/pre_rvc_48k_{job_id[:8]}.wav"
output_16k = f"/tmp/pre_rvc_16k_{job_id[:8]}.wav"
try:
logger.info(f"Pre-RVC debug: {request.text[:50]}...")
# Collect all input_wav states (what goes INTO RVC)
input_wav_chunks_48k = []
input_wav_chunks_16k = []
stream = pipeline.soprano.infer_stream(request.text, chunk_size=10)
chunk_count = 0
for soprano_chunk in stream:
chunk_48k, chunk_16k = pipeline.process_soprano_chunk(soprano_chunk)
while pipeline.accumulate_and_process_block(chunk_48k, chunk_16k):
chunk_count += 1
# SAVE the input_wav buffer BEFORE RVC processes it
# This is the reconstructed audio from accumulated Soprano chunks
# Take the relevant portion (last block_frame + extra)
pre_rvc_48k = pipeline.input_wav[pipeline.extra_frame:pipeline.extra_frame + pipeline.block_frame].clone()
# For 16kHz, just take the last block worth of samples from input_wav_res
pre_rvc_16k = pipeline.input_wav_res[-pipeline.block_frame_16k:].clone()
input_wav_chunks_48k.append(pre_rvc_48k.cpu().numpy())
input_wav_chunks_16k.append(pre_rvc_16k.cpu().numpy())
# Still process through RVC for timing
pipeline.process_through_rvc()
chunk_48k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
chunk_16k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
# Save 48kHz version (reconstructed audio before RVC)
if input_wav_chunks_48k:
full_audio_48k = np.concatenate(input_wav_chunks_48k)
with wave.open(output_48k, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(pipeline.config.rvc_sample_rate)
pcm_data = (full_audio_48k * 32767).clip(-32768, 32767).astype('int16')
wav_file.writeframes(pcm_data.tobytes())
# Save 16kHz version (what RVC actually receives)
if input_wav_chunks_16k:
full_audio_16k = np.concatenate(input_wav_chunks_16k)
with wave.open(output_16k, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(16000) # RVC input sample rate
pcm_data = (full_audio_16k * 32767).clip(-32768, 32767).astype('int16')
wav_file.writeframes(pcm_data.tobytes())
logger.info(f"Pre-RVC debug complete: {output_48k}, {output_16k}")
return {
"status": "complete",
"output_48k": output_48k,
"output_16k": output_16k,
"chunks": chunk_count,
"info": "48k is reconstructed audio, 16k is what RVC receives"
}
except Exception as e:
logger.error(f"Pre-RVC debug error: {e}", exc_info=True)
return {"error": str(e)}
@app.get("/api/stream/continuous")
async def stream_continuous():
"""Continuous audio stream endpoint"""
if pipeline is None:
return {"error": "Pipeline not initialized"}
client_id = str(uuid.uuid4())
# Smaller queue for tighter flow control (20s buffer = 80 blocks at 0.25s each)
with pipeline.client_queues_lock:
pipeline.client_queues[client_id] = Queue(maxsize=80)
logger.info(f"Client {client_id[:8]} connected")
async def generate():
try:
# Send WAV header
yield write_wav_header(
sample_rate=pipeline.config.rvc_sample_rate,
channels=1,
bits_per_sample=16
)
# Wait for initial buffer (help VLC establish timing)
# Wait for at least 1-2 blocks before starting stream
initial_buffer_blocks = 2
buffered_chunks = []
while len(buffered_chunks) < initial_buffer_blocks:
try:
chunk = await asyncio.get_event_loop().run_in_executor(
None,
lambda: pipeline.client_queues[client_id].get(timeout=5.0)
)
buffered_chunks.append(chunk)
except Exception:
# Timeout waiting for initial buffer - start anyway
break
# Send initial buffer
for chunk in buffered_chunks:
yield chunk
# Stream remaining audio chunks
while True:
try:
chunk = await asyncio.get_event_loop().run_in_executor(
None,
lambda: pipeline.client_queues[client_id].get(timeout=0.1)
)
yield chunk
except Empty:
await asyncio.sleep(0.01)
continue
except Exception as e:
logger.error(f"Client {client_id[:8]} stream error: {e}")
break
finally:
with pipeline.client_queues_lock:
if client_id in pipeline.client_queues:
del pipeline.client_queues[client_id]
logger.info(f"Client {client_id[:8]} disconnected")
return StreamingResponse(
generate(),
media_type="audio/wav",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
@app.get("/api/test_soprano_stream")
async def test_soprano_stream(text: str = "This is a test of raw Soprano TTS performance without RVC processing."):
"""Test Soprano TTS performance in isolation - streams raw Soprano output without RVC"""
if pipeline is None:
return {"error": "Pipeline not initialized"}
import time
import numpy as np
logger.info(f"Testing Soprano-only streaming: {text[:50]}...")
async def generate():
# Send WAV header (Soprano outputs at 32kHz)
yield write_wav_header(
sample_rate=32000,
channels=1,
bits_per_sample=16
)
# Track timing
start_time = time.time()
chunk_count = 0
total_samples = 0
# Stream directly from Soprano
stream = pipeline.soprano.infer_stream(text, chunk_size=10)
for soprano_chunk in stream:
chunk_count += 1
# Convert to int16 PCM
if not torch.is_tensor(soprano_chunk):
soprano_chunk = torch.from_numpy(soprano_chunk)
pcm_data = (soprano_chunk.cpu().numpy() * 32767).clip(-32768, 32767).astype('int16').tobytes()
total_samples += len(soprano_chunk)
yield pcm_data
# Log performance
elapsed = time.time() - start_time
audio_duration = total_samples / 32000 # Soprano is 32kHz
realtime_factor = audio_duration / elapsed if elapsed > 0 else 0
logger.info(f"Soprano-only test complete:")
logger.info(f" Chunks: {chunk_count}")
logger.info(f" Elapsed: {elapsed:.2f}s")
logger.info(f" Audio duration: {audio_duration:.2f}s")
logger.info(f" Realtime factor: {realtime_factor:.2f}x")
# Write to debug file
with open('/tmp/soprano_only_timing.txt', 'w') as f:
f.write(f"Soprano-only test results:\n")
f.write(f"Text: {text}\n")
f.write(f"Chunks: {chunk_count}\n")
f.write(f"Elapsed: {elapsed:.2f}s\n")
f.write(f"Audio duration: {audio_duration:.2f}s\n")
f.write(f"Realtime factor: {realtime_factor:.2f}x\n")
return StreamingResponse(
generate(),
media_type="audio/wav",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
@app.get("/api/test_rvc_only")
async def test_rvc_only(input_file: str = "/tmp/soprano_test_3.wav"):
"""Test RVC performance in isolation - processes pre-existing Soprano audio"""
if pipeline is None:
return {"error": "Pipeline not initialized"}
import time
import wave
import numpy as np
logger.info(f"Testing RVC-only performance with input: {input_file}")
# Verify input file exists
if not os.path.exists(input_file):
return {"error": f"Input file not found: {input_file}"}
async def generate():
# Send WAV header (RVC outputs at 48kHz)
yield write_wav_header(
sample_rate=48000,
channels=1,
bits_per_sample=16
)
# Load the input audio file (Soprano output at 32kHz)
with wave.open(input_file, 'rb') as wav:
sample_rate = wav.getframerate()
n_channels = wav.getnchannels()
audio_data = wav.readframes(wav.getnframes())
audio_np = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32767.0
logger.info(f"Loaded audio: {len(audio_np)} samples at {sample_rate}Hz")
# Convert to tensor and resample from 32kHz to 48kHz (input buffer) and 16kHz (RVC input)
audio_tensor = torch.from_numpy(audio_np).to(pipeline.device)
# Resample to 48kHz for input buffer (RMS mixing needs this)
audio_48k = pipeline.resampler_soprano_to_48k(audio_tensor.unsqueeze(0).unsqueeze(0))[0, 0]
# Resample to 16kHz for RVC inference
audio_16k = pipeline.resampler_soprano_to_16k(audio_tensor.unsqueeze(0).unsqueeze(0))[0, 0]
# Track timing
start_time = time.time()
block_count = 0
rvc_times = []
# Process through pipeline in blocks (simulating real streaming)
chunk_48k = audio_48k
chunk_16k = audio_16k
while pipeline.accumulate_and_process_block(chunk_48k, chunk_16k):
block_count += 1
# Process through RVC
rvc_start = time.time()
rvc_output = pipeline.process_through_rvc()
rvc_duration = time.time() - rvc_start
rvc_times.append(rvc_duration)
# Convert to PCM and yield
pcm_data = (rvc_output.cpu().numpy() * 32767).clip(-32768, 32767).astype('int16').tobytes()
yield pcm_data
# Give empty tensors for next iteration
chunk_48k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
chunk_16k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
# Calculate stats
elapsed = time.time() - start_time
audio_duration = block_count * pipeline.config.block_time # 0.16s per block
realtime_factor = audio_duration / elapsed if elapsed > 0 else 0
avg_rvc = sum(rvc_times) / len(rvc_times) if rvc_times else 0
logger.info(f"RVC-only test complete:")
logger.info(f" Blocks: {block_count}")
logger.info(f" Elapsed: {elapsed:.2f}s")
logger.info(f" Audio duration: {audio_duration:.2f}s")
logger.info(f" Realtime factor: {realtime_factor:.2f}x")
logger.info(f" Avg RVC time: {avg_rvc*1000:.1f}ms")
# Write to debug file
with open('/tmp/rvc_only_timing.txt', 'w') as f:
f.write(f"RVC-only test results:\n")
f.write(f"Input file: {input_file}\n")
f.write(f"Blocks: {block_count}\n")
f.write(f"Elapsed: {elapsed:.2f}s\n")
f.write(f"Audio duration: {audio_duration:.2f}s\n")
f.write(f"Realtime factor: {realtime_factor:.2f}x\n")
f.write(f"Avg RVC processing: {avg_rvc*1000:.1f}ms per block\n")
f.write(f"Total RVC time: {sum(rvc_times):.2f}s ({sum(rvc_times)/elapsed*100:.1f}%)\n")
return StreamingResponse(
generate(),
media_type="audio/wav",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
@app.get("/api/status")
async def status():
"""Get API status"""
if pipeline is None:
return {"status": "initializing", "ready": False}
with pipeline.client_queues_lock:
num_clients = len(pipeline.client_queues)
return {
"status": "running",
"ready": True,
"queue_size": pipeline.text_queue.qsize(),
"connected_clients": num_clients,
"config": {
"soprano_sr": pipeline.soprano_sr,
"target_sr": pipeline.target_sr,
"rvc_model_sr": pipeline.rvc_tgt_sr,
"block_time": pipeline.config.block_time,
"f0method": pipeline.config.f0method,
}
}
@app.get("/health")
async def health_check():
"""Health check endpoint for Docker healthcheck"""
if pipeline is None:
return {
"status": "unhealthy",
"error": "Pipeline not initialized"
}, 503
# Test Soprano server connectivity
soprano_connected = False
try:
# Quick connectivity test - don't actually synthesize
test_socket = pipeline.zmq_context.socket(zmq.REQ)
test_socket.setsockopt(zmq.RCVTIMEO, 2000) # 2s timeout
test_socket.connect(pipeline.soprano_socket.getsockopt(zmq.LAST_ENDPOINT).decode())
test_socket.close()
soprano_connected = True
except Exception as e:
logger.warning(f"Soprano health check failed: {e}")
soprano_connected = False
return {
"status": "healthy" if (soprano_connected and pipeline_ready) else "degraded",
"soprano_connected": soprano_connected,
"rvc_initialized": pipeline.rvc is not None,
"pipeline_ready": pipeline is not None,
"warmed_up": pipeline_ready
}
@app.post("/interrupt")
async def interrupt_synthesis():
"""
Interrupt current synthesis and flush all buffers.
Used when user speaks over Miku to cancel ongoing TTS playback.
Returns:
{"status": "interrupted", "flushed": true}
"""
global pipeline
if pipeline is None:
return {"status": "error", "message": "Pipeline not initialized"}, 503
try:
# Flush Soprano ZMQ socket (drain any pending audio chunks)
flushed_chunks = 0
while pipeline.soprano_socket.poll(timeout=0):
pipeline.soprano_socket.recv()
flushed_chunks += 1
# Clear RVC audio buffer (stop processing queued audio)
if hasattr(pipeline, 'rvc_audio_buffer'):
buffer_size = len(pipeline.rvc_audio_buffer)
pipeline.rvc_audio_buffer.clear()
logger.info(f"Interrupted: Flushed {flushed_chunks} ZMQ chunks, cleared {buffer_size} RVC buffer samples")
else:
logger.info(f"Interrupted: Flushed {flushed_chunks} ZMQ chunks")
return {
"status": "interrupted",
"flushed": True,
"zmq_chunks_flushed": flushed_chunks
}
except Exception as e:
logger.error(f"Interrupt failed: {e}", exc_info=True)
return {"status": "error", "message": str(e)}, 500
@app.websocket("/ws/stream")
async def websocket_stream(websocket: WebSocket):
"""
WebSocket endpoint for token-by-token streaming synthesis.
Protocol:
- Client sends: {"token": "text", "pitch_shift": 0, "flush": false}
- Server sends: binary audio data (PCM float32, 48kHz mono)
- Client sends: {"flush": true} to process remaining buffer
This enables real-time TTS as tokens arrive from LLM streaming.
"""
await websocket.accept()
logger.info("WebSocket client connected")
# Wait for warmup if not ready yet
if not pipeline_ready:
logger.warning("[WS] Pipeline not warmed up yet, waiting...")
await websocket.send_json({
"status": "warming_up",
"message": "Pipeline warming up, please wait..."
})
# Poll until ready (max 30 seconds)
for _ in range(60): # 60 * 0.5s = 30s
if pipeline_ready:
logger.info("[WS] Pipeline ready, proceeding with connection")
await websocket.send_json({
"status": "ready",
"message": "Pipeline ready!"
})
break
await asyncio.sleep(0.5)
else:
logger.error("[WS] Pipeline warmup timeout!")
await websocket.send_json({
"error": "Pipeline warmup timeout",
"message": "Pipeline failed to initialize in time"
})
await websocket.close()
return
# Buffer for accumulating tokens until sentence boundary
text_buffer = ""
config = {
"pitch_shift": 0,
"sample_rate": 48000
}
try:
while True:
# Receive token from client
logger.debug("[WS] Waiting for message...")
data = await websocket.receive_json()
logger.info(f"[WS] Received data: {data}")
# Update config if provided
if "pitch_shift" in data:
config["pitch_shift"] = data["pitch_shift"]
# Handle flush request (process whatever is in buffer)
if data.get("flush", False):
if text_buffer.strip():
logger.info(f"[WS] Flushing buffer: {text_buffer[:50]}...")
await _synthesize_and_send(websocket, text_buffer, config)
text_buffer = ""
continue
# Add token to buffer
token = data.get("token", "")
if not token:
continue
text_buffer += token
logger.debug(f"[WS] Token received: '{token}' | Buffer: {len(text_buffer)} chars")
# Check for sentence boundaries
# Synthesize when we hit punctuation or buffer gets too long
should_synthesize = False
# Sentence-ending punctuation
token_stripped = token.rstrip()
if token_stripped and token_stripped[-1] in ['.', '!', '?', '', '', '']:
should_synthesize = True
logger.info(f"[WS] Sentence boundary detected: {text_buffer[:50]}...")
# Comma/pause (optional - creates more natural pauses)
elif token_stripped and token_stripped[-1] in [',', ';', '', '']:
should_synthesize = True
logger.info(f"[WS] Pause boundary detected: {text_buffer[:50]}...")
# Buffer too long (prevent memory issues)
elif len(text_buffer) > 200:
should_synthesize = True
logger.info(f"[WS] Buffer limit reached: {text_buffer[:50]}...")
# Synthesize if needed
if should_synthesize and text_buffer.strip():
await _synthesize_and_send(websocket, text_buffer, config)
text_buffer = ""
except WebSocketDisconnect:
logger.info("WebSocket client disconnected")
# Don't try to send anything - client is already gone
except Exception as e:
logger.error(f"WebSocket error: {e}", exc_info=True)
try:
await websocket.close(code=1011, reason=str(e))
except:
pass
async def _synthesize_and_send(websocket: WebSocket, text: str, config: dict):
    """
    Helper function to synthesize text and stream audio chunks via WebSocket.
    Sends raw PCM float32 audio data at 48kHz mono.

    Args:
        websocket: Accepted WebSocket connection to stream audio to.
        text: Text to synthesize (typically one sentence or clause).
        config: Synthesis options dict from the caller.
            NOTE(review): `config` is never read in this function — pitch
            shift is presumably applied elsewhere; confirm.
    """
    if pipeline is None:
        await websocket.send_json({"error": "Pipeline not initialized"})
        return
    # Check if WebSocket is still connected
    if websocket.client_state.value != 1:  # 1 = CONNECTED
        logger.warning(f"[WS] Cannot send - WebSocket not connected (state: {websocket.client_state})")
        return
    try:
        # Performance tracking
        pipeline_start = time.time()
        rvc_times = []
        total_blocks = 0
        # Get audio from Soprano server via ZMQ (blocking, full utterance)
        soprano_start = time.time()
        soprano_audio = pipeline._call_soprano_server(text)
        soprano_time = time.time() - soprano_start
        # Convert to tensor
        soprano_audio_tensor = torch.from_numpy(soprano_audio).to(pipeline.device).float()
        # Split into chunks for streaming (0.1s chunks @ 32kHz)
        chunk_size = 3200  # 0.1s @ 32kHz
        num_chunks = (len(soprano_audio_tensor) + chunk_size - 1) // chunk_size
        for i in range(num_chunks):
            chunk_start = i * chunk_size
            chunk_end = min((i + 1) * chunk_size, len(soprano_audio_tensor))
            soprano_chunk = soprano_audio_tensor[chunk_start:chunk_end].cpu().numpy()
            # Process and accumulate chunks (dual resampling)
            chunk_48k, chunk_16k = pipeline.process_soprano_chunk(soprano_chunk)
            # Drain the accumulation buffer - one Soprano chunk may fill multiple blocks!
            while pipeline.accumulate_and_process_block(chunk_48k, chunk_16k):
                # Check connection state before sending
                if websocket.client_state.value != 1:
                    logger.warning("[WS] Client disconnected during synthesis, aborting")
                    return
                # Process through RVC, timing each block
                rvc_start = time.time()
                rvc_output = pipeline.process_through_rvc()
                rvc_time = time.time() - rvc_start
                rvc_times.append(rvc_time)
                total_blocks += 1
                # Convert to bytes and send
                # Format: float32 PCM, 48kHz, mono
                audio_bytes = rvc_output.cpu().numpy().astype(np.float32).tobytes()
                await websocket.send_bytes(audio_bytes)
                logger.debug(f"[WS] Sent audio chunk: {len(audio_bytes)} bytes ({len(rvc_output)} samples)")
                # After first block, both chunks should be empty tensors
                # (the chunk was accumulated on the first while-test call)
                chunk_48k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
                chunk_16k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
        # CRITICAL: Flush any remaining audio in buffers
        # Check connection one more time before flushing
        if websocket.client_state.value != 1:
            logger.warning("[WS] Client disconnected before flush, skipping")
            return
        flush_chunks = pipeline.flush_buffers()
        for rvc_output in flush_chunks:
            # Check connection state before each flush chunk
            if websocket.client_state.value != 1:
                logger.warning("[WS] Client disconnected during flush, aborting")
                return
            # Time the flush processing
            rvc_start = time.time()
            audio_bytes = rvc_output.cpu().numpy().astype(np.float32).tobytes()
            rvc_time = time.time() - rvc_start
            rvc_times.append(rvc_time)
            total_blocks += 1
            await websocket.send_bytes(audio_bytes)
            logger.debug(f"[WS] Sent final flush chunk: {len(audio_bytes)} bytes")
        # Calculate performance metrics
        pipeline_time = time.time() - pipeline_start
        audio_duration = len(soprano_audio) / pipeline.soprano_sr  # Duration in seconds
        realtime_factor = audio_duration / pipeline_time if pipeline_time > 0 else 0
        avg_rvc_time = sum(rvc_times) / len(rvc_times) if rvc_times else 0
        # Log performance summary
        logger.info(f"""
[WS] Job complete: {total_blocks} blocks in {pipeline_time:.2f}s
    Audio duration: {audio_duration:.2f}s
    Realtime factor: {realtime_factor:.2f}x
    Avg Soprano: {soprano_time*1000:.1f}ms
    Avg RVC: {avg_rvc_time*1000:.1f}ms
    Text: '{text[:50]}...'""")
    except RuntimeError as e:
        # Handle "Unexpected ASGI message" when client disconnects during send
        if "Unexpected ASGI message" in str(e) or "websocket.close" in str(e):
            logger.info(f"[WS] Client disconnected during synthesis: {e}")
        else:
            logger.error(f"RuntimeError during synthesis: {e}", exc_info=True)
    except Exception as e:
        logger.error(f"Synthesis error: {e}", exc_info=True)
        await websocket.send_json({"error": str(e)})
@app.get("/stream/wav")
async def stream_wav(text: str, pitch_shift: int = 0):
"""
HTTP streaming endpoint that serves audio as WAV (playable in VLC/browsers).
Usage:
vlc http://localhost:8765/stream/wav?text=Hello%20world!
curl http://localhost:8765/stream/wav?text=Test > output.wav
Returns: WAV file (PCM s16le, 48kHz, mono)
"""
if pipeline is None:
return {"error": "Pipeline not initialized"}
import io
import wave
async def generate_wav():
"""Generate WAV file on the fly as audio chunks are synthesized"""
# Create in-memory WAV file
buffer = io.BytesIO()
# Get audio from Soprano + RVC
soprano_audio = pipeline._call_soprano_server(text)
soprano_audio_tensor = torch.from_numpy(soprano_audio).to(pipeline.device).float()
# Collect all audio chunks
audio_chunks = []
# Process through pipeline
chunk_size = 3200 # 0.1s @ 32kHz
num_chunks = (len(soprano_audio_tensor) + chunk_size - 1) // chunk_size
for i in range(num_chunks):
chunk_start = i * chunk_size
chunk_end = min((i + 1) * chunk_size, len(soprano_audio_tensor))
soprano_chunk = soprano_audio_tensor[chunk_start:chunk_end].cpu().numpy()
chunk_48k, chunk_16k = pipeline.process_soprano_chunk(soprano_chunk)
while pipeline.accumulate_and_process_block(chunk_48k, chunk_16k):
rvc_output = pipeline.process_through_rvc()
audio_chunks.append(rvc_output.cpu().numpy())
chunk_48k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
chunk_16k = torch.tensor([], device=pipeline.device, dtype=torch.float32)
# CRITICAL: Flush any remaining audio in buffers
flush_chunks = pipeline.flush_buffers()
for rvc_output in flush_chunks:
audio_chunks.append(rvc_output.cpu().numpy())
# Concatenate all audio
if not audio_chunks:
yield b''
return
full_audio = np.concatenate(audio_chunks)
# Convert float32 to int16 PCM
audio_int16 = (full_audio * 32767).clip(-32768, 32767).astype(np.int16)
# Write WAV header and data
with wave.open(buffer, 'wb') as wav:
wav.setnchannels(1) # Mono
wav.setsampwidth(2) # 16-bit
wav.setframerate(48000) # 48kHz
wav.writeframes(audio_int16.tobytes())
# Send WAV data
yield buffer.getvalue()
return StreamingResponse(
generate_wav(),
media_type="audio/wav",
headers={
"Content-Disposition": f'inline; filename="tts.wav"',
"Cache-Control": "no-cache"
}
)
@app.get("/")
async def root():
"""Root endpoint"""
return {
"service": "Soprano + RVC Streaming API",
"version": "1.0",
"endpoints": {
"speak": "POST /api/speak - Queue text for synthesis",
"stream": "GET /api/stream/continuous - Continuous audio stream",
"stream_wav": "GET /stream/wav?text=... - HTTP streaming (VLC compatible)",
"websocket": "WS /ws/stream - Token-by-token streaming (recommended for Discord)",
"status": "GET /api/status - Pipeline status",
"health": "GET /health - Health check"
}
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8765)