Files
miku-discord/soprano_to_rvc/benchmark_components.py
koko210Serve 8ca716029e add: absorb soprano_to_rvc as regular subdirectory
Voice conversion pipeline (Soprano TTS → RVC) with Docker support.
Previously tracked as bare gitlink; removed .git/ directories and
absorbed into main repo for unified tracking.

Includes: Soprano TTS, RVC WebUI integration, Docker configs,
WebSocket API, and benchmark scripts.
Updated .gitignore to exclude large model weights (*.pth, *.pt, *.onnx, *.index).
287 files (3.1GB of ML weights properly excluded via gitignore).
2026-03-04 00:24:53 +02:00

395 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Comprehensive benchmark suite to isolate Soprano vs RVC performance.
This script measures:
1. Soprano synthesis time (text -> audio)
2. RVC processing time (audio -> converted audio)
3. Full pipeline time (text -> converted audio)
4. Different text lengths
5. Statistical analysis across multiple runs
"""
import sys
import time
import os
import numpy as np
import zmq
import torch
import json
from pathlib import Path
from collections import defaultdict
import statistics
# Benchmark inputs: each case pairs a label with a sentence of increasing
# length/complexity, so per-component timing can be compared across sizes.
TEST_CASES = [
    ("tiny", "Hello!"),
    ("short", "Hello, this is a test."),
    ("medium", "The quick brown fox jumps over the lazy dog."),
    ("long", "Artificial intelligence is revolutionizing the way we interact with technology."),
    ("very_long", "Artificial intelligence and machine learning are fundamentally transforming our understanding of human-computer interaction, enabling unprecedented levels of automation and personalization across diverse industries."),
]

# How many repetitions of each test to run (for stable statistics).
RUNS_PER_TEST = 5
class PerformanceBenchmark:
    """Benchmark harness for the Soprano (TTS) + RVC (voice conversion) pipeline.

    Measures per-component and end-to-end latency for each entry in
    TEST_CASES, accumulating raw per-run timings in ``self.results`` for
    later statistical analysis via :meth:`print_statistics`.
    """

    def __init__(self):
        # results[label][metric] -> list of per-run measurements.
        self.results = defaultdict(lambda: defaultdict(list))

        # ZMQ REQ socket to the Soprano TTS server (30s receive timeout).
        self.zmq_context = zmq.Context()
        self.soprano_socket = self.zmq_context.socket(zmq.REQ)
        self.soprano_socket.connect("tcp://soprano:5555")
        self.soprano_socket.setsockopt(zmq.RCVTIMEO, 30000)

        # Import RVC components (assumes we're running in the RVC container).
        sys.path.insert(0, '/app/Retrieval-based-Voice-Conversion-WebUI')
        from infer.lib import rtrvc as rvc_for_realtime
        from configs.config import Config
        from multiprocessing import Queue as MPQueue
        self.Config = Config
        self.rvc_for_realtime = rvc_for_realtime
        self.MPQueue = MPQueue

        # RVC model itself is loaded lazily by initialize_rvc().
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.rvc = None
        self.rvc_config = None
        print(f"🔧 Initialized on device: {self.device}")
        print(f"🔧 CUDA available: {torch.cuda.is_available()}")

    def initialize_rvc(self):
        """Load the RVC model (callers guard with ``self.rvc is None``)."""
        print("\n📦 Loading RVC model...")
        start = time.time()
        # RVC resolves its weight/index asset paths relative to its own
        # directory, so temporarily chdir into it.
        rvc_dir = '/app/Retrieval-based-Voice-Conversion-WebUI'
        original_dir = os.getcwd()
        os.chdir(rvc_dir)
        try:
            self.rvc_config = self.Config()
            # Worker queues required by the realtime RVC implementation.
            inp_q = self.MPQueue()
            opt_q = self.MPQueue()
            self.rvc = self.rvc_for_realtime.RVC(
                key=0,  # pitch shift (semitones)
                formant=0,
                pth_path="assets/weights/MikuAI_e210_s6300.pth",
                index_path="assets/indices/added_IVF1811_Flat_nprobe_1_MikuAI_v2.index",
                index_rate=0.5,
                n_cpu=4,
                inp_q=inp_q,
                opt_q=opt_q,
                config=self.rvc_config,
                last_rvc=None,
            )
            # Inference-only optimizations.
            torch.set_grad_enabled(False)
            torch.backends.cudnn.benchmark = True
            if self.rvc_config.is_half:
                self.rvc.model = self.rvc.model.half()
                self.rvc.net_g = self.rvc.net_g.half()
        finally:
            # Always restore the working directory, even if loading fails.
            os.chdir(original_dir)
        elapsed = time.time() - start
        print(f"✅ RVC loaded in {elapsed:.2f}s")
        print(f" Device: {self.rvc_config.device}")
        print(f" Version: {getattr(self.rvc, 'version', 'unknown')}")
        print(f" Target SR: {getattr(self.rvc, 'tgt_sr', 48000)}Hz")

    def call_soprano(self, text: str) -> tuple:
        """Synthesize *text* via the Soprano server and measure round-trip time.

        Returns:
            (audio_array, elapsed_seconds, audio_duration_seconds, sample_rate)

        Raises:
            RuntimeError: if Soprano replies with an error payload.
        """
        import uuid
        start = time.time()
        # Tag the request with a unique job id.
        job_id = str(uuid.uuid4())
        self.soprano_socket.send_json({
            "job_id": job_id,
            "text": text
        })
        response = self.soprano_socket.recv_json()
        elapsed = time.time() - start
        if "error" in response:
            error_msg = response.get('error', 'Unknown error')
            raise RuntimeError(f"Soprano returned error: {error_msg}")
        # Soprano returns the samples as a plain list of floats.
        audio_array = np.array(response.get("audio"), dtype=np.float32)
        sample_rate = 32000  # Soprano outputs at 32kHz
        audio_duration = len(audio_array) / sample_rate
        return audio_array, elapsed, audio_duration, sample_rate

    def process_rvc(self, audio: np.ndarray, sample_rate: int) -> tuple:
        """Run *audio* through RVC voice conversion and measure time.

        Returns:
            (converted_audio, elapsed_seconds)
        """
        if self.rvc is None:
            self.initialize_rvc()
        start = time.time()
        # NOTE(review): argument order follows infer_pipeline's positional
        # signature — confirm against the RVC version shipped in the container.
        converted = self.rvc.infer_pipeline(
            audio,
            sample_rate,
            0,        # pitch_shift
            None,     # pitch_guidance (f0_file)
            "rmvpe",  # f0method
            "",       # file_index
            0.5,      # index_rate
            3,        # filter_radius
            48000,    # tgt_sr
            0,        # resample_sr
            0.25,     # rms_mix_rate
            "v2",     # version
            0.33,     # protect
            128,      # crepe_hop_length
        )
        elapsed = time.time() - start
        # infer_pipeline returns a tuple; index 1 holds the audio array.
        return converted[1], elapsed

    def benchmark_soprano_only(self, label: str, text: str):
        """Test 1: Soprano synthesis only."""
        print(f"\n 🎤 Testing Soprano: {label}")
        for run in range(RUNS_PER_TEST):
            try:
                audio, soprano_time, audio_duration, sample_rate = self.call_soprano(text)
                # Guard against a (theoretical) zero elapsed time once, then
                # reuse the value for both storage and display.
                rtf = audio_duration / soprano_time if soprano_time > 0 else 0
                self.results[label]["soprano_time"].append(soprano_time)
                self.results[label]["audio_duration"].append(audio_duration)
                self.results[label]["soprano_rtf"].append(rtf)
                self.results[label]["audio_length"].append(len(audio))
                print(f" Run {run+1}: {soprano_time*1000:.1f}ms -> {audio_duration:.2f}s audio (RTF: {rtf:.2f}x)")
                # Cache the first run's audio for the RVC-only benchmark.
                if run == 0:
                    self.results[label]["cached_audio"] = audio
                    self.results[label]["sample_rate"] = sample_rate
            except Exception as e:
                print(f" Run {run+1}: ERROR - {e}")

    def benchmark_rvc_only(self, label: str):
        """Test 2: RVC processing only (using cached Soprano output)."""
        if "cached_audio" not in self.results[label]:
            print(f"\n ⚠️ Skipping RVC test for {label} - no cached audio")
            return
        print(f"\n 🎙️ Testing RVC: {label}")
        audio = self.results[label]["cached_audio"]
        sample_rate = self.results[label]["sample_rate"]
        for run in range(RUNS_PER_TEST):
            try:
                converted, rvc_time = self.process_rvc(audio, sample_rate)
                audio_duration = len(audio) / sample_rate
                rtf = audio_duration / rvc_time if rvc_time > 0 else 0
                self.results[label]["rvc_time"].append(rvc_time)
                self.results[label]["rvc_rtf"].append(rtf)
                print(f" Run {run+1}: {rvc_time*1000:.1f}ms (RTF: {rtf:.2f}x)")
            except Exception as e:
                print(f" Run {run+1}: ERROR - {e}")

    def benchmark_full_pipeline(self, label: str, text: str):
        """Test 3: Full pipeline (Soprano + RVC)."""
        print(f"\n ⚡ Testing Full Pipeline: {label}")
        if self.rvc is None:
            self.initialize_rvc()
        for run in range(RUNS_PER_TEST):
            try:
                pipeline_start = time.time()
                # Step 1: Soprano (re-timed here so queueing gaps are included).
                soprano_start = time.time()
                audio, _, audio_duration, sample_rate = self.call_soprano(text)
                soprano_time = time.time() - soprano_start
                # Step 2: RVC
                rvc_start = time.time()
                converted, _ = self.process_rvc(audio, sample_rate)
                rvc_time = time.time() - rvc_start
                pipeline_time = time.time() - pipeline_start
                rtf = audio_duration / pipeline_time if pipeline_time > 0 else 0
                self.results[label]["pipeline_time"].append(pipeline_time)
                self.results[label]["pipeline_rtf"].append(rtf)
                self.results[label]["pipeline_soprano_time"].append(soprano_time)
                self.results[label]["pipeline_rvc_time"].append(rvc_time)
                print(f" Run {run+1}: {pipeline_time:.3f}s total (S: {soprano_time*1000:.1f}ms, R: {rvc_time*1000:.1f}ms, RTF: {rtf:.2f}x)")
            except Exception as e:
                print(f" Run {run+1}: ERROR - {e}")

    def _print_timing_stats(self, title: str, times: list, rtfs: list):
        """Print a mean/median/stdev/min/max/RTF block for one timing series."""
        print(f"\n {title}")
        print(f" ├─ Mean: {statistics.mean(times)*1000:.1f}ms")
        print(f" ├─ Median: {statistics.median(times)*1000:.1f}ms")
        # stdev needs at least two samples.
        if len(times) > 1:
            print(f" ├─ Std Dev: {statistics.stdev(times)*1000:.1f}ms")
        else:
            print(" ├─ Std Dev: N/A")
        print(f" ├─ Min: {min(times)*1000:.1f}ms")
        print(f" ├─ Max: {max(times)*1000:.1f}ms")
        print(f" └─ RTF: {statistics.mean(rtfs):.2f}x")

    def print_statistics(self):
        """Print comprehensive statistical analysis of all collected results."""
        print("\n" + "="*80)
        print(" "*25 + "PERFORMANCE ANALYSIS")
        print("="*80 + "\n")
        for label, text in TEST_CASES:
            if label not in self.results:
                continue
            data = self.results[label]
            # BUG FIX: the separator character was lost (''*80 printed an
            # empty string); use a dash rule consistent with the '=' banners.
            print(f"\n{'-'*80}")
            print(f"📊 {label.upper()}: \"{text[:60]}{'...' if len(text) > 60 else ''}\"")
            print('-'*80)
            if "audio_duration" in data and data["audio_duration"]:
                avg_duration = statistics.mean(data["audio_duration"])
                print(f"\n 🎵 Audio Duration: {avg_duration:.2f}s")
            # Per-component stats share one formatting helper.
            if "soprano_time" in data and data["soprano_time"]:
                self._print_timing_stats("🎤 SOPRANO (isolated):", data["soprano_time"], data["soprano_rtf"])
            if "rvc_time" in data and data["rvc_time"]:
                self._print_timing_stats("🎙️ RVC (isolated):", data["rvc_time"], data["rvc_rtf"])
            if "pipeline_time" in data and data["pipeline_time"]:
                self._print_timing_stats("⚡ FULL PIPELINE:", data["pipeline_time"], data["pipeline_rtf"])
            # Breakdown (guard non-empty: statistics.mean raises on []).
            soprano_times = data.get("pipeline_soprano_time")
            rvc_times = data.get("pipeline_rvc_time")
            if soprano_times and rvc_times:
                avg_soprano = statistics.mean(soprano_times)
                avg_rvc = statistics.mean(rvc_times)
                total = avg_soprano + avg_rvc
                print(f"\n 📈 PIPELINE BREAKDOWN:")
                print(f" ├─ Soprano: {avg_soprano*1000:.1f}ms ({avg_soprano/total*100:.1f}%)")
                print(f" └─ RVC: {avg_rvc*1000:.1f}ms ({avg_rvc/total*100:.1f}%)")
        # Summary across all tests.
        print("\n" + "="*80)
        print(" "*30 + "SUMMARY")
        print("="*80 + "\n")
        soprano_rtfs = []
        rvc_rtfs = []
        pipeline_rtfs = []
        for label, _ in TEST_CASES:
            if label in self.results:
                data = self.results[label]
                soprano_rtfs.extend(data.get("soprano_rtf", []))
                rvc_rtfs.extend(data.get("rvc_rtf", []))
                pipeline_rtfs.extend(data.get("pipeline_rtf", []))
        if soprano_rtfs:
            print(f" 🎤 Soprano Average RTF: {statistics.mean(soprano_rtfs):.2f}x")
        if rvc_rtfs:
            print(f" 🎙️ RVC Average RTF: {statistics.mean(rvc_rtfs):.2f}x")
        if pipeline_rtfs:
            print(f" ⚡ Pipeline Average RTF: {statistics.mean(pipeline_rtfs):.2f}x")
        # Bottleneck analysis: lower RTF means the component is slower.
        print(f"\n 💡 BOTTLENECK ANALYSIS:")
        if soprano_rtfs and rvc_rtfs:
            soprano_avg = statistics.mean(soprano_rtfs)
            rvc_avg = statistics.mean(rvc_rtfs)
            if soprano_avg < rvc_avg:
                slower = "Soprano"
                ratio = rvc_avg / soprano_avg
            else:
                slower = "RVC"
                ratio = soprano_avg / rvc_avg
            print(f" └─ {slower} is the bottleneck ({ratio:.2f}x slower)")
def main():
    """Run the comprehensive benchmark suite over all TEST_CASES."""
    print("\n" + "=" * 80)
    print(" " * 20 + "SOPRANO + RVC COMPONENT BENCHMARK")
    print("=" * 80)
    benchmark = PerformanceBenchmark()
    for label, text in TEST_CASES:
        # BUG FIX: the separator character was lost (''*80 printed an empty
        # string); use a dash rule consistent with the '=' banners.
        print(f"\n{'-' * 80}")
        print(f" Testing: {label.upper()}")
        print(f" Text: \"{text}\"")
        print(f" Runs per test: {RUNS_PER_TEST}")
        print('-' * 80)
        # Test 1: Soprano synthesis in isolation
        benchmark.benchmark_soprano_only(label, text)
        # Test 2: RVC conversion in isolation (reuses cached Soprano audio)
        benchmark.benchmark_rvc_only(label)
        # Test 3: end-to-end pipeline
        benchmark.benchmark_full_pipeline(label, text)
    # Final statistical report.
    benchmark.print_statistics()
    print("\n" + "=" * 80)
    print(" " * 30 + "BENCHMARK COMPLETE")
    print("=" * 80 + "\n")


if __name__ == "__main__":
    main()