#!/usr/bin/env python3 """ Test script to verify WebSocket communication with RVC server. """ import asyncio import websockets import json async def test_websocket(): url = "ws://172.25.0.1:8765/ws/stream" print(f"Connecting to {url}...") async with websockets.connect(url) as websocket: print("✓ Connected!") # Send a test token message = {"token": "Hello ", "pitch_shift": 0} print(f"Sending: {message}") await websocket.send(json.dumps(message)) print("✓ Message sent") # Wait a bit for audio print("Waiting for audio response...") try: response = await asyncio.wait_for(websocket.recv(), timeout=5.0) print(f"✓ Received {len(response)} bytes of audio") except asyncio.TimeoutError: print("✗ Timeout waiting for audio") # Send another token message = {"token": "world! ", "pitch_shift": 0} print(f"Sending: {message}") await websocket.send(json.dumps(message)) print("✓ Message sent") # Wait for more audio print("Waiting for audio response...") try: response = await asyncio.wait_for(websocket.recv(), timeout=5.0) print(f"✓ Received {len(response)} bytes of audio") except asyncio.TimeoutError: print("✗ Timeout waiting for audio") if __name__ == "__main__": asyncio.run(test_websocket())