reorganize: move all test scripts to tests/ directory

- Moved 8 root-level test scripts + 2 from bot/ to tests/
- Moved run_rocinante_test.sh runner script to tests/
- Added tests/README.md documenting each test's purpose, type, and requirements
- Added test_pfp_context.py and test_rocinante_comparison.py (previously untracked)
2026-03-04 00:18:21 +02:00
parent 431f675fc7
commit fdde12c03d
12 changed files with 730 additions and 0 deletions

tests/README.md Normal file

@@ -0,0 +1,56 @@
# Tests
Ad-hoc test scripts for the Miku Discord Bot. None of these use a formal test framework — they are standalone scripts written during development to validate specific features.
## Test Index
| Script | Type | Requirements | Purpose |
|--------|------|-------------|---------|
| `test_addressing.py` | Unit (self-contained) | None | Tests regex patterns for detecting when Miku is addressed in messages. Replicates logic from `bot/utils/core.py`. |
| `test_pfp_context.py` | Unit (self-contained) | None | Tests regex patterns for detecting profile-picture-related queries. |
| `test_conversation_history.py` | Unit | Built-in mocks | Tests conversation history management logic. |
| `test_error_handler.py` | Unit | Built-in mocks | Tests error handling utilities. |
| `test_evil_moods.py` | Integration | Running Cheshire Cat + Qdrant | Queries the Cat over HTTP and tests all 10 evil mood personalities with sample messages. |
| `test_full_pipeline.py` | Integration | Running Cat + Qdrant | End-to-end test of the memory consolidation system v2. |
| `test_tts_audio.py` | Integration | Run **inside** miku-bot container | Tests the TTS audio streaming pipeline. |
| `test_voice_playback.py` | Integration | Active Discord voice session | Tests audio playback in a live voice channel. |
| `test_websocket.py` | Integration | RVC server at `172.25.0.1:8765` | Tests WebSocket communication with the RVC voice conversion server. |
| `test_rocinante_comparison.py` | Benchmark | Full stack (llama-swap-amd, Cat) | Benchmarks Rocinante-X 12B model through both Normal and Evil Miku scenarios. Outputs to `/tmp/test_rocinante_comparison.log`. |
| `run_rocinante_test.sh` | Shell runner | Docker, full stack | Wrapper script that copies `test_rocinante_comparison.py` into the miku-bot container and runs it. |
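To illustrate what the self-contained scripts check, here is a stripped-down sketch of the addressing idea from `test_addressing.py` — reduced to a single name variant and two of the four patterns, so it is illustrative only, not the real pattern set:

```python
import re

# Simplified sketch: the real script builds hundreds of name variants
# (honorifics, Cyrillic, Japanese); here we keep just the bare name.
NAME = r'\b(?:miku)\b'
START_RE = re.compile(rf'^\s*{NAME}\s*[,:!?.]', re.IGNORECASE)   # "Miku, ..."
ALONE_RE = re.compile(rf'^\s*{NAME}\s*[!?.~]*\s*$', re.IGNORECASE)  # "Miku!"

def is_addressed(text: str) -> bool:
    """True when the message addresses Miku, not merely mentions her."""
    text = text.strip()
    return bool(START_RE.search(text) or ALONE_RE.search(text))

print(is_addressed("Miku, how are you?"))  # True  — vocative at start
print(is_addressed("I like Miku"))         # False — mere mention
```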
## Running Tests
### Self-contained unit tests (no services needed)
```bash
python3 tests/test_addressing.py
python3 tests/test_pfp_context.py
python3 tests/test_conversation_history.py
python3 tests/test_error_handler.py
```
### Integration tests (require running Docker services)
```bash
# Evil moods — needs Cat + Qdrant running
python3 tests/test_evil_moods.py
# Memory consolidation pipeline — needs Cat + Qdrant
python3 tests/test_full_pipeline.py
# TTS — run inside the miku-bot container
docker exec miku-bot python3 /app/tests/test_tts_audio.py
# Voice playback — needs an active voice session
python3 tests/test_voice_playback.py
# WebSocket to RVC — needs RVC server running
python3 tests/test_websocket.py
```
### Benchmark tests
```bash
# Rocinante model comparison (takes a while)
./tests/run_rocinante_test.sh
```

tests/run_rocinante_test.sh Executable file

@@ -0,0 +1,36 @@
#!/bin/bash
# Run the Rocinante comparison test inside the miku-bot container
# (which has aiohttp, docker access, and network connectivity to Cat)
set -e
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
TEST_FILE="$SCRIPT_DIR/test_rocinante_comparison.py"
CONTAINER="miku-bot"
LOG_FILE="/tmp/test_rocinante_comparison.log"
echo "=== Rocinante-X 12B Comparison Test ==="
echo ""
# 1. Copy test script into the container
echo "[1/4] Copying test script into $CONTAINER..."
docker cp "$TEST_FILE" "$CONTAINER:/tmp/test_rocinante_comparison.py"
# 2. Restart llama-swap-amd to pick up the new rocinante config
echo "[2/4] Restarting llama-swap-amd to load new config..."
docker restart llama-swap-amd
echo " Waiting 10s for llama-swap-amd to be ready..."
sleep 10
# 3. Run the test inside the container (interactive for live output)
echo "[3/4] Running test inside $CONTAINER (this will take a while)..."
echo ""
docker exec -t "$CONTAINER" python3 /tmp/test_rocinante_comparison.py
# 4. Copy log back to host
echo ""
echo "[4/4] Copying log file to host..."
docker cp "$CONTAINER:$LOG_FILE" "$LOG_FILE"
echo ""
echo "✓ Done! Log file: $LOG_FILE"
echo "  Compare with: diff /tmp/test_comparison_live.log $LOG_FILE"

tests/test_addressing.py Normal file

@@ -0,0 +1,249 @@
#!/usr/bin/env python3
"""Comprehensive test for Miku addressing detection patterns.
Tests the pre-compiled regex patterns from bot/utils/core.py to verify
that Miku is only triggered when *addressed*, not merely *mentioned*.
"""
import re
import sys
# ── Replicate the pattern-building logic from core.py ──
def _build_name_variants(bases, honorifics, prefixes, connector, prefix_connector):
    variants = []
    for base in bases:
        be = re.escape(base)
        variants.append(be)
        for h in honorifics:
            he = re.escape(h)
            variants.append(be + connector + he)
        for p in prefixes:
            pe = re.escape(p)
            variants.append(pe + prefix_connector + be)
            for h in honorifics:
                he = re.escape(h)
                variants.append(pe + prefix_connector + be + connector + he)
    return variants
latin = _build_name_variants(
    bases=['miku'],
    honorifics=[
        'chan', 'san', 'kun', 'nyan', 'hime', 'tan', 'chin', 'heika',
        'denka', 'kakka', 'shi', 'chama', 'kyun', 'dono', 'sensei',
        'senpai', 'jou',
    ],
    prefixes=['o-'],
    connector=r'[\-\s]?',
    prefix_connector=r'\s?',
)
cyrillic = _build_name_variants(
    bases=['мику'],
    honorifics=[
        'чан', 'сан', 'кун', 'нян', 'химе', 'тан', 'чин',
        'хейка', 'хеика', 'денка', 'какка', 'си', 'чама', 'кюн',
        'доно', 'сенсэй', 'сенсеи', 'сенпай', 'сенпаи', 'джо',
    ],
    prefixes=['о-'],
    connector=r'[\-\s]?',
    prefix_connector=r'\s?',
)
japanese = _build_name_variants(
    bases=['みく', 'ミク', '未来'],
    honorifics=[
        'ちゃん', 'さん', 'くん', 'にゃん', 'ひめ', 'たん', 'ちん',
        'へいか', 'でんか', 'かっか', 'し', 'ちゃま', 'きゅん', 'どの',
        'せんせい', 'せんぱい', 'じょう',
        'チャン', 'サン', 'クン', 'ニャン', 'ヒメ', 'タン', 'チン',
        'ヘイカ', 'デンカ', 'カッカ', 'シ', 'チャマ', 'キュン', 'ドノ',
        'センセイ', 'センパイ', 'ジョウ',
    ],
    prefixes=['お', 'オ'],
    connector=r'[-]?',
    prefix_connector=r'',
)
all_v = sorted(latin + cyrillic + japanese, key=len, reverse=True)
alts = '|'.join(all_v)
NAME = rf'\b(?:{alts})\b'
PUNCT = r'[,,、:!?.。]'
COMMA = r'[,,、]'
ETRAIL = r'[!?.。~]*'
ATRAIL = r'[!?.。~~♪♡❤]*'
START_RE = re.compile(rf'^\s*{NAME}\s*{PUNCT}', re.IGNORECASE)
END_RE = re.compile(rf'{COMMA}\s*{NAME}\s*{ETRAIL}\s*$', re.IGNORECASE)
MIDDLE_RE = re.compile(rf'{COMMA}\s*{NAME}\s*{COMMA}', re.IGNORECASE)
ALONE_RE = re.compile(rf'^\s*{NAME}\s*{ATRAIL}\s*$', re.IGNORECASE)
def is_addressed(text: str) -> bool:
    text = text.strip()
    return bool(
        START_RE.search(text)
        or END_RE.search(text)
        or MIDDLE_RE.search(text)
        or ALONE_RE.search(text)
    )

def which_pattern(text: str) -> str:
    """Return which pattern matched (for debugging)."""
    text = text.strip()
    matched = []
    if START_RE.search(text):
        matched.append("START")
    if END_RE.search(text):
        matched.append("END")
    if MIDDLE_RE.search(text):
        matched.append("MIDDLE")
    if ALONE_RE.search(text):
        matched.append("ALONE")
    return ', '.join(matched) if matched else 'NONE'
# ── Test cases ──
# (message, expected, description)
TESTS = [
    # ═══ START pattern (name at beginning + punctuation) ═══
    ("Miku, how are you?", True, "START: Latin + comma"),
    ("miku, hello!", True, "START: lowercase Latin"),
    ("MIKU! listen to me", True, "START: uppercase + excl"),
    ("Miku: can you help?", True, "START: colon"),
    ("Miku. Please help.", True, "START: period"),
    ("みく、元気?", True, "START: Hiragana + JP comma"),
    ("ミク!聞いて", True, "START: Katakana + JP excl"),
    ("未来、教えて", True, "START: Kanji + JP comma"),
    ("мику, привет!", True, "START: Cyrillic + comma"),
    ("МИКУ! слушай", True, "START: Cyrillic upper + excl"),
    ("Miku-chan, how are you?", True, "START: honorific-dash + comma"),
    ("miku chan, hello!", True, "START: honorific-space + comma"),
    ("mikuchan! listen!", True, "START: honorific-joined + excl"),
    ("ミクちゃん、聞いて", True, "START: JP name+honorific + comma"),
    ("ミクちゃん!元気?", True, "START: JP name+honorific + excl"),
    ("みくさん, 教えて", True, "START: Hiragana + hon + comma"),
    ("мику-сан, скажи", True, "START: Cyrillic + hon + comma"),
    ("o-miku, hello", True, "START: o-prefix Latin"),
    ("おみく、ねえ", True, "START: o-prefix Japanese"),
    (" Miku, hello ", True, "START: whitespace padded"),
    # ═══ END pattern (comma + name at end) ═══
    ("how are you, Miku?", True, "END: comma + Latin + ?"),
    ("how are you, Miku!", True, "END: comma + Latin + !"),
    ("how are you, Miku", True, "END: comma + Latin no trail"),
    ("tell me, miku.", True, "END: comma + lowercase + period"),
    ("元気, ミク", True, "END: comma + Katakana"),
    ("教えて、みく!", True, "END: JP comma + Hiragana + !"),
    ("教えて、未来", True, "END: JP comma + Kanji"),
    ("скажи, мику!", True, "END: Cyrillic comma + name"),
    ("hello, Miku-chan!", True, "END: comma + honorific"),
    ("hello, miku-san?", True, "END: comma + honorific + ?"),
    ("元気、ミクちゃん", True, "END: JP comma + JP honorific"),
    ("hello, o-miku", True, "END: comma + o-prefix"),
    # ═══ MIDDLE pattern (vocative — commas on both sides) ═══
    ("On the contrary, Miku, I think you're wrong", True, "MIDDLE: vocative Latin"),
    ("I am very happy, Miku, you are so fun", True, "MIDDLE: vocative Latin 2"),
    ("well, Miku-chan, I think so", True, "MIDDLE: vocative + honorific"),
    ("しかし、みく、それは違う", True, "MIDDLE: vocative Japanese"),
    ("でも、ミクちゃん、聞いて", True, "MIDDLE: vocative JP + honorific"),
    ("но, мику, я думаю", True, "MIDDLE: vocative Cyrillic"),
    ("hey, miku, what do you think?", True, "MIDDLE: vocative casual"),
    ("you know, Miku, that's not right", True, "MIDDLE: vocative mid-sentence"),
    # ═══ ALONE pattern (name is the entire message) ═══
    ("Miku", True, "ALONE: bare Latin"),
    ("miku", True, "ALONE: lowercase"),
    ("MIKU", True, "ALONE: uppercase"),
    ("Miku!", True, "ALONE: + excl"),
    ("Miku?", True, "ALONE: + question"),
    ("Miku!!", True, "ALONE: + multi excl"),
    ("みく", True, "ALONE: Hiragana"),
    ("ミク!", True, "ALONE: Katakana + excl"),
    ("未来", True, "ALONE: Kanji"),
    ("мику", True, "ALONE: Cyrillic"),
    ("Miku-chan", True, "ALONE: Latin + honorific"),
    ("miku chan!", True, "ALONE: space honorific + excl"),
    ("ミクちゃん", True, "ALONE: JP honorific"),
    ("ミクさん!", True, "ALONE: JP honorific + excl"),
    ("みくせんせい", True, "ALONE: Hiragana + sensei"),
    ("o-miku!", True, "ALONE: o-prefix"),
    ("おみく", True, "ALONE: JP o-prefix"),
    ("オミク", True, "ALONE: Katakana o-prefix"),
    (" Miku ", True, "ALONE: whitespace"),
    ("Miku~", True, "ALONE: tilde"),
    ("Miku♪", True, "ALONE: music note"),
    ("Miku❤", True, "ALONE: heart"),
    ("мику-чан", True, "ALONE: Cyrillic + honorific"),
    ("мику сан", True, "ALONE: Cyrillic + space hon"),
    ("未来さん", True, "ALONE: Kanji + honorific"),
    # ═══ Should NOT match (mere mentions / not addressing) ═══
    ("I like Miku", False, "REJECT: object of sentence"),
    ("Miku is cool", False, "REJECT: subject + is"),
    ("Miku is my favorite vocaloid", False, "REJECT: subject + statement"),
    ("I saw Miku at a concert", False, "REJECT: middle of sentence"),
    ("told miku about it", False, "REJECT: informal mention"),
    ("hatsune miku concert", False, "REJECT: event name"),
    ("Do you know Miku?", False, "REJECT: asking about her"),
    ("I love Miku!", False, "REJECT: exclamation about her"),
    ("I love Miku so much", False, "REJECT: longer statement"),
    ("ミクは元気だよ", False, "REJECT: Japanese 'Miku is well'"),
    ("ミクが好き", False, "REJECT: Japanese 'I like Miku'"),
    ("ミクのことが好き", False, "REJECT: Japanese 'I like Miku (thing)'"),
    ("мику была там", False, "REJECT: Cyrillic 'Miku was there'"),
    ("мику такая красивая", False, "REJECT: Cyrillic 'Miku is pretty'"),
    ("the Miku concert was great", False, "REJECT: event discussion"),
    ("My favorite is Miku for sure", False, "REJECT: no comma before name at end"),
    ("yeah miku is pretty cool right", False, "REJECT: casual mention"),
    ("have you seen miku today", False, "REJECT: asking about her"),
    ("miku and I went shopping", False, "REJECT: subject of sentence"),
    ("I met miku yesterday", False, "REJECT: object mid-sentence"),
    ("mikumiku fan", False, "REJECT: compound word (\\b boundary)"),
    ("hatsune miku is singing", False, "REJECT: full name as subject"),
    # ═══ Edge cases ═══
    ("", False, "EDGE: empty message"),
    ("hello", False, "EDGE: no name at all"),
    ("hello!", False, "EDGE: exclamation, no name"),
    ("??", False, "EDGE: just punctuation"),
    (" ", False, "EDGE: just whitespace"),
    ("chan", False, "EDGE: just an honorific"),
    ("o-", False, "EDGE: just a prefix"),
]
def main():
    print(f"Generated {len(all_v)} name variants")
    print(f"Running {len(TESTS)} test cases...\n")
    passed = 0
    failed = 0
    for msg, expected, desc in TESTS:
        result = is_addressed(msg)
        ok = result == expected
        if ok:
            passed += 1
        else:
            failed += 1
            pattern = which_pattern(msg)
            exp_str = "ADDR" if expected else "SKIP"
            got_str = "ADDR" if result else "SKIP"
            print(f"  FAIL expected={exp_str} got={got_str} matched={pattern}")
            print(f"       {desc}")
            print(f"       message: \"{msg}\"\n")
    print(f"\n{'='*50}")
    print(f"  {passed}/{len(TESTS)} passed, {failed} failed")
    print(f"{'='*50}")
    return 0 if failed == 0 else 1

if __name__ == '__main__':
    sys.exit(main())

tests/test_conversation_history.py Normal file

@@ -0,0 +1,74 @@
#!/usr/bin/env python3
"""
Test script for the new conversation history system.
"""
from utils.conversation_history import conversation_history
def test_conversation_history():
    print("🧪 Testing conversation history system...\n")

    # Test 1: Add messages to a server channel
    print("Test 1: Adding messages to server channel")
    server_id = "123456789"
    conversation_history.add_message(server_id, "Alice", "Hello Miku!", is_bot=False)
    conversation_history.add_message(server_id, "Miku", "Hi Alice! 💙", is_bot=True)
    conversation_history.add_message(server_id, "Bob", "What's up?", is_bot=False)
    conversation_history.add_message(server_id, "Miku", "Just chatting! ✨", is_bot=True)
    recent = conversation_history.get_recent_messages(server_id)
    print(f"  Recent messages: {len(recent)}")
    for author, content, is_bot in recent:
        role = "BOT" if is_bot else "USER"
        print(f"    [{role}] {author}: {content}")

    # Test 2: Format for LLM
    print("\nTest 2: Format for LLM (OpenAI messages)")
    messages = conversation_history.format_for_llm(server_id, max_messages=4)
    for msg in messages:
        print(f"  {msg['role']}: {msg['content']}")

    # Test 3: Add messages to a DM channel
    print("\nTest 3: Adding messages to DM channel")
    user_id = "987654321"
    conversation_history.add_message(user_id, "Charlie", "Can you help me?", is_bot=False)
    conversation_history.add_message(user_id, "Miku", "Of course! What do you need?", is_bot=True)
    conversation_history.add_message(user_id, "Charlie", "I need song recommendations", is_bot=False)
    dm_messages = conversation_history.format_for_llm(user_id)
    print(f"  DM messages: {len(dm_messages)}")
    for msg in dm_messages:
        print(f"  {msg['role']}: {msg['content']}")

    # Test 4: Empty message filtering
    print("\nTest 4: Empty message filtering")
    conversation_history.add_message(server_id, "Dave", "", is_bot=False)  # Should be ignored
    conversation_history.add_message(server_id, "Dave", " ", is_bot=False)  # Should be ignored
    conversation_history.add_message(server_id, "Dave", "Real message", is_bot=False)
    filtered = conversation_history.get_recent_messages(server_id)
    print(f"  Messages after adding empty ones: {len(filtered)}")
    print(f"  Last message: {filtered[-1][1]}")

    # Test 5: Message truncation
    print("\nTest 5: Message truncation")
    long_message = "A" * 600  # 600 chars
    conversation_history.add_message(server_id, "Eve", long_message, is_bot=False)
    truncated = conversation_history.format_for_llm(server_id, max_chars_per_message=500)
    last_msg = truncated[-1]['content']
    print(f"  Original length: {len(long_message)}")
    print(f"  Truncated length: {len(last_msg)}")
    print(f"  Ends with '...': {last_msg.endswith('...')}")

    # Test 6: Channel stats
    print("\nTest 6: Channel statistics")
    stats = conversation_history.get_channel_stats(server_id)
    print(f"  Server stats: {stats}")
    dm_stats = conversation_history.get_channel_stats(user_id)
    print(f"  DM stats: {dm_stats}")

    print("\n✅ All tests completed!")

if __name__ == "__main__":
    test_conversation_history()

tests/test_error_handler.py Normal file

@@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""Test the error handler to ensure it correctly detects error messages."""
import sys
import os
import re
# Add the bot directory to the path so we can import modules
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Directly implement the error detection function to avoid module dependencies
def is_error_response(response_text: str) -> bool:
    """
    Detect if a response text is an error message.

    Args:
        response_text: The response text to check

    Returns:
        bool: True if the response appears to be an error message
    """
    if not response_text or not isinstance(response_text, str):
        return False
    response_lower = response_text.lower().strip()
    # Common error patterns
    error_patterns = [
        r'^error:?\s*\d{3}',  # "Error: 502" or "Error 502"
        r'^error:?\s+',  # "Error: " or "Error "
        r'^\d{3}\s+error',  # "502 Error"
        r'^sorry,?\s+(there\s+was\s+)?an?\s+error',  # "Sorry, an error" or "Sorry, there was an error"
        r'^sorry,?\s+the\s+response\s+took\s+too\s+long',  # Timeout error
        r'connection\s+(refused|failed|error|timeout)',
        r'timed?\s*out',
        r'failed\s+to\s+(connect|respond|process)',
        r'service\s+unavailable',
        r'internal\s+server\s+error',
        r'bad\s+gateway',
        r'gateway\s+timeout',
    ]
    # Check if response matches any error pattern
    for pattern in error_patterns:
        if re.search(pattern, response_lower):
            return True
    # Check for HTTP status codes indicating errors
    if re.match(r'^\d{3}$', response_text.strip()):
        status_code = int(response_text.strip())
        if status_code >= 400:  # HTTP error codes
            return True
    return False
# Test cases
test_cases = [
    # Error responses (should return True)
    ("Error 502", True),
    ("Error: 502", True),
    ("Error: Bad Gateway", True),
    ("502 Error", True),
    ("Sorry, there was an error", True),
    ("Sorry, an error occurred", True),
    ("Sorry, the response took too long. Please try again.", True),
    ("Connection refused", True),
    ("Connection timeout", True),
    ("Timed out", True),
    ("Failed to connect", True),
    ("Service unavailable", True),
    ("Internal server error", True),
    ("Bad gateway", True),
    ("Gateway timeout", True),
    ("500", True),
    ("502", True),
    ("503", True),
    # Normal responses (should return False)
    ("Hi! How are you doing today?", False),
    ("I'm Hatsune Miku! *waves*", False),
    ("That's so cool! Tell me more!", False),
    ("Sorry to hear that!", False),
    ("I'm sorry, but I can't help with that.", False),
    ("200", False),
    ("304", False),
    ("The error in your code is...", False),
]
def run_tests():
    print("Testing error detection...")
    print("=" * 60)
    passed = 0
    failed = 0
    for text, expected in test_cases:
        result = is_error_response(text)
        status = "✓" if result == expected else "✗"
        if result == expected:
            passed += 1
        else:
            failed += 1
            print(f"{status} FAILED: '{text}' -> {result} (expected {expected})")
    print("=" * 60)
    print(f"Tests passed: {passed}/{len(test_cases)}")
    print(f"Tests failed: {failed}/{len(test_cases)}")
    if failed == 0:
        print("\n✓ All tests passed!")
    else:
        print(f"\n{failed} test(s) failed")
    return failed == 0

if __name__ == "__main__":
    success = run_tests()
    exit(0 if success else 1)

tests/test_evil_moods.py Normal file

@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Comprehensive Evil Miku Mood Test

Queries Cheshire Cat over plain HTTP for each of the 10 evil moods,
sends varied test messages, and displays responses side-by-side.
Uses the discord_bridge plugin's metadata to set mood and evil mode.
Uses only the Python stdlib (http.client) to avoid pip dependencies.
"""
import asyncio
import json
import http.client
import sys
CAT_HOST = "localhost"
CAT_PORT = 1865
EVIL_MOODS = [
    "aggressive",
    "cunning",
    "sarcastic",
    "evil_neutral",
    "bored",
    "manic",
    "jealous",
    "melancholic",
    "playful_cruel",
    "contemptuous",
]

# Varied messages to test different mood expressions
TEST_MESSAGES = [
    "Hey, how's it going?",
    "What do you think about humans?",
    "Tell me something interesting.",
]
def query_cat_http(mood: str, message: str, timeout: float = 120.0) -> str:
    """Send a message to the Cat via HTTP POST /message with mood metadata."""
    payload = json.dumps({
        "text": message,
        "discord_mood": mood,
        "discord_evil_mode": True,
    })
    try:
        conn = http.client.HTTPConnection(CAT_HOST, CAT_PORT, timeout=timeout)
        headers = {"Content-Type": "application/json", "user_id": f"mood_test_{mood}"}
        conn.request("POST", "/message", body=payload, headers=headers)
        resp = conn.getresponse()
        if resp.status == 200:
            data = json.loads(resp.read().decode())
            return data.get("content", "(empty)")
        else:
            return f"(HTTP {resp.status})"
    except Exception as e:
        return f"(error: {e})"
def run_tests():
    print("=" * 80)
    print("  EVIL MIKU COMPREHENSIVE MOOD TEST")
    print("=" * 80)
    print(f"  Testing {len(EVIL_MOODS)} moods × {len(TEST_MESSAGES)} messages")
    print(f"  Cat HTTP: http://{CAT_HOST}:{CAT_PORT}")
    print("=" * 80)
    results = {}
    for mood in EVIL_MOODS:
        results[mood] = []
        print(f"\n{'─' * 80}")
        print(f"  MOOD: {mood.upper()}")
        print(f"{'─' * 80}")
        for i, message in enumerate(TEST_MESSAGES):
            print(f"\n  [{i+1}/{len(TEST_MESSAGES)}] User: {message}")
            response = query_cat_http(mood, message)
            results[mood].append(response)
            print(f"  Evil Miku: {response}")

    # Summary
    print(f"\n\n{'=' * 80}")
    print("  SUMMARY")
    print(f"{'=' * 80}")

    # Check for identical responses (the main problem we're trying to fix)
    all_responses = []
    for mood, responses in results.items():
        all_responses.extend(responses)
    unique = set(all_responses)
    print(f"\n  Total responses: {len(all_responses)}")
    print(f"  Unique responses: {len(unique)}")
    if len(unique) < len(all_responses) * 0.7:
        print("  ⚠️ WARNING: Many duplicate responses detected!")
    else:
        print("  ✅ Good variety in responses!")

    # Check for "*rolls eyes*" only responses
    action_only = [r for r in all_responses if r.strip().startswith("*") and r.strip().endswith("*") and len(r.strip()) < 30]
    if action_only:
        print(f"  ⚠️ {len(action_only)} action-only responses (e.g., '*rolls eyes*')")
    else:
        print("  ✅ No action-only responses!")

    # Average response length
    lengths = [len(r) for r in all_responses if not r.startswith("(")]
    if lengths:
        avg = sum(lengths) / len(lengths)
        print(f"  Avg response length: {avg:.0f} chars")
    print()

if __name__ == "__main__":
    run_tests()

tests/test_full_pipeline.py Normal file

@@ -0,0 +1,254 @@
#!/usr/bin/env python3
"""
Full Pipeline Test for Memory Consolidation System v2.0.0
"""
import requests
import time
import json
import sys
CAT_URL = "http://localhost:1865"
QDRANT_URL = "http://localhost:6333"
CONSOLIDATION_TIMEOUT = 180
def send_message(text, timeout=30):
    try:
        resp = requests.post(f"{CAT_URL}/message", json={"text": text}, timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.Timeout:
        return {"error": "timeout", "content": ""}
    except Exception as e:
        return {"error": str(e), "content": ""}

def qdrant_scroll(collection, limit=200, filt=None):
    body = {"limit": limit, "with_payload": True, "with_vector": False}
    if filt:
        body["filter"] = filt
    resp = requests.post(f"{QDRANT_URL}/collections/{collection}/points/scroll", json=body)
    return resp.json()["result"]["points"]

def qdrant_count(collection):
    return len(qdrant_scroll(collection))

def section(title):
    print(f"\n{'=' * 70}")
    print(f"  {title}")
    print(f"{'=' * 70}")
print("=" * 70)
print(" FULL PIPELINE TEST - Memory Consolidation v2.0.0")
print("=" * 70)
try:
    requests.get(f"{CAT_URL}/", timeout=5)
except Exception:
    print("ERROR: Cat not reachable")
    sys.exit(1)
try:
    requests.get(f"{QDRANT_URL}/collections", timeout=5)
except Exception:
    print("ERROR: Qdrant not reachable")
    sys.exit(1)
episodic_start = qdrant_count("episodic")
declarative_start = qdrant_count("declarative")
print(f"\nStarting state: {episodic_start} episodic, {declarative_start} declarative")
results = {}
# TEST 1: Trivial Message Filtering
section("TEST 1: Trivial Message Filtering")
trivial_messages = ["lol", "k", "ok", "haha", "xd", "brb"]
print(f"Sending {len(trivial_messages)} trivial messages...")
for msg in trivial_messages:
    send_message(msg)
    time.sleep(0.3)
time.sleep(1)

# Count only USER episodic memories (exclude Miku's responses)
user_episodic = qdrant_scroll("episodic", filt={
    "must_not": [{"key": "metadata.speaker", "match": {"value": "miku"}}]
})
trivial_user_stored = len(user_episodic) - episodic_start
episodic_after_trivial = qdrant_count("episodic")
# discord_bridge filters trivial user messages, but Miku still responds
# so we only check user-side storage
if trivial_user_stored < len(trivial_messages):
    print(f"  PASS - Only {trivial_user_stored}/{len(trivial_messages)} user trivial messages stored")
    print(f"         (Total episodic incl. Miku responses: {episodic_after_trivial})")
    results["trivial_filtering"] = True
else:
    print(f"  WARN - All {trivial_user_stored} trivial messages stored")
    results["trivial_filtering"] = False
# TEST 2: Important Message Storage
section("TEST 2: Important Message Storage")
personal_facts = [
    "My name is Sarah Chen",
    "I'm 28 years old",
    "I live in Seattle, Washington",
    "I work as a software engineer at Microsoft",
    "My favorite color is forest green",
    "I love playing piano and have practiced for 15 years",
    "I'm learning Japanese, currently at N3 level",
    "I have a cat named Luna",
    "I'm allergic to peanuts",
    "My birthday is March 15th",
    "I graduated from UW in 2018",
    "I enjoy hiking on weekends",
]
print(f"Sending {len(personal_facts)} personal info messages...")
for i, fact in enumerate(personal_facts, 1):
    resp = send_message(fact)
    status = "OK" if "error" not in resp else "ERR"
    print(f"  [{i}/{len(personal_facts)}] {status} {fact[:50]}")
    time.sleep(0.5)
time.sleep(1)
episodic_after_personal = qdrant_count("episodic")
personal_stored = episodic_after_personal - episodic_after_trivial
print(f"\n Episodic memories from personal info: {personal_stored}")
results["important_storage"] = personal_stored >= len(personal_facts)
print(f" {'PASS' if results['important_storage'] else 'FAIL'} - Expected >={len(personal_facts)}, got {personal_stored}")
# TEST 3: Miku Response Storage
section("TEST 3: Bidirectional Memory (Miku Response Storage)")
miku_points = qdrant_scroll("episodic", filt={
    "must": [{"key": "metadata.speaker", "match": {"value": "miku"}}]
})
print(f"  Miku's memories in episodic: {len(miku_points)}")
if miku_points:
    print(f"  Sample: \"{miku_points[0]['payload']['page_content'][:70]}\"")
    results["miku_storage"] = True
    print("  PASS")
else:
    results["miku_storage"] = False
    print("  FAIL - No Miku responses in episodic memory")
# TEST 4: Per-User Source Tagging
section("TEST 4: Per-User Source Tagging")
user_points = qdrant_scroll("episodic", filt={
    "must": [{"key": "metadata.source", "match": {"value": "user"}}]
})
print(f"  Points with source='user': {len(user_points)}")
global_points = qdrant_scroll("episodic", filt={
    "must": [{"key": "metadata.source", "match": {"value": "global"}}]
})
print(f"  Points with source='global' (old bug): {len(global_points)}")
results["user_tagging"] = len(user_points) > 0 and len(global_points) == 0
print(f"  {'PASS' if results['user_tagging'] else 'FAIL'}")
# TEST 5: Memory Consolidation
section("TEST 5: Memory Consolidation & Fact Extraction")
print(f" Triggering consolidation (timeout={CONSOLIDATION_TIMEOUT}s)...")
t0 = time.time()
resp = send_message("consolidate now", timeout=CONSOLIDATION_TIMEOUT)
elapsed = time.time() - t0
if "error" in resp:
    print(f"  WARN - HTTP issue: {resp['error']} ({elapsed:.0f}s)")
    print("  Waiting 60s for background completion...")
    time.sleep(60)
else:
    print(f"  Completed in {elapsed:.1f}s")
    content = resp.get("content", "")
    print(f"  Response: {content[:120]}...")
time.sleep(3)
declarative_after = qdrant_count("declarative")
new_facts = declarative_after - declarative_start
print(f"\n Declarative facts: {declarative_start} -> {declarative_after} (+{new_facts})")
results["consolidation"] = new_facts >= 5
print(f" {'PASS' if results['consolidation'] else 'FAIL'} - {'>=5 facts' if results['consolidation'] else f'only {new_facts}'}")
all_facts = qdrant_scroll("declarative")
print(f"\n All declarative facts ({len(all_facts)}):")
for i, f in enumerate(all_facts, 1):
    content = f["payload"]["page_content"]
    meta = f["payload"].get("metadata", {})
    source = meta.get("source", "?")
    ftype = meta.get("fact_type", "?")
    print(f"  {i}. [{source}|{ftype}] {content}")
# TEST 6: Duplicate Detection
section("TEST 6: Duplicate Detection (2nd consolidation)")
facts_before_2nd = qdrant_count("declarative")
print(f" Facts before: {facts_before_2nd}")
print(f" Running consolidation again...")
resp = send_message("consolidate now", timeout=CONSOLIDATION_TIMEOUT)
time.sleep(3)
facts_after_2nd = qdrant_count("declarative")
new_dupes = facts_after_2nd - facts_before_2nd
print(f" Facts after: {facts_after_2nd} (+{new_dupes})")
results["dedup"] = new_dupes <= 2
print(f" {'PASS' if results['dedup'] else 'FAIL'} - {new_dupes} new facts (<=2 expected)")
# TEST 7: Fact Recall
section("TEST 7: Fact Recall via Natural Language")
queries = {
    "What is my name?": ["sarah", "chen"],
    "How old am I?": ["28"],
    "Where do I live?": ["seattle"],
    "Where do I work?": ["microsoft", "software engineer"],
    "What am I allergic to?": ["peanut"],
}
correct = 0
for question, keywords in queries.items():
    resp = send_message(question)
    answer = resp.get("content", "")
    hit = any(kw.lower() in answer.lower() for kw in keywords)
    if hit:
        correct += 1
    icon = "OK" if hit else "??"
    print(f"  {icon} Q: {question}")
    print(f"     A: {answer[:150]}")
    time.sleep(1)
accuracy = correct / len(queries) * 100
results["recall"] = correct >= 3
print(f"\n Recall: {correct}/{len(queries)} ({accuracy:.0f}%)")
print(f" {'PASS' if results['recall'] else 'FAIL'} (threshold: >=3)")
# FINAL SUMMARY
section("FINAL SUMMARY")
total = len(results)
passed = sum(1 for v in results.values() if v)
print()
for name, ok in results.items():
    print(f"  [{'PASS' if ok else 'FAIL'}] {name}")
print(f"\n  Score: {passed}/{total}")
print(f"  Episodic: {qdrant_count('episodic')}")
print(f"  Declarative: {qdrant_count('declarative')}")
if passed == total:
    print("\n  ALL TESTS PASSED!")
elif passed >= total - 1:
    print("\n  MOSTLY PASSING - minor issues only")
else:
    print("\n  SOME TESTS FAILED - review above")
print("\n" + "=" * 70)

tests/test_pfp_context.py Normal file

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
"""
Test profile picture context plugin
"""
import re
# Test patterns
PFP_PATTERNS = [
    # Direct PFP references
    r'\b(what|describe|tell me about|explain|show|how)\b.*\b(pfp|profile pic|avatar|picture|pic)\b',
    r'\b(your|miku\'?s?)\b.*\b(pfp|profile pic|avatar|picture|pic)\b',
    r'\b(pfp|profile pic|avatar|picture|pic)\b.*\b(is|look|show|about|like)',
    # Questions about appearance
    r'\b(what|how).*\b(you|miku)\b.*(look|looking|appear)',
    r'\byour (new )?look\b',
    r'\b(what|how).*looking like\b',
    # Questions about the image itself
    r'\b(think|feel|opinion|thoughts)\b.*\b(about|of)\b.*\b(your|that|the|this)?\b.*\b(pfp|profile|avatar|pic|picture|image)\b',
    r'\b(why|how|when).*\b(pick|choose|chose|picked|select|change|changed)\b.*\b(pfp|profile|avatar|pic|picture|that)\b',
    r'\b(new|current|latest)\b.*\b(pfp|profile pic|avatar|pic|picture)\b',
    # "What do you think about your pfp"
    r'\bthink.*\b(your|that|the|this)\b.*\b(pfp|profile|avatar|pic|picture)\b',
    r'\b(your|that|the|this)\b.*\b(pfp|profile|avatar|pic|picture)\b.*\bthink\b',
    # "How did you decide/pick"
    r'\b(decide|decided|pick|picked|choose|chose|select)\b.*\b(pfp|profile|avatar|pic|picture|that|this)\b',
    # "Tell me about that pfp" / "What's with the pfp"
    r'\bwhat\'?s?\b.*\bwith\b.*\b(pfp|profile|avatar|pic|picture)\b',
    r'\btell me\b.*\b(pfp|profile|avatar|pic|picture|that|this)\b',
]
test_queries = [
    # Original tests
    "What does your pfp look like?",
    "Describe your profile picture",
    "Tell me about your avatar",
    "What's your profile pic?",
    "How do you look today?",
    "Your new look is cool",
    "What are you looking like?",
    "Show me your picture",
    # User's examples
    "How did you decide to pick that pfp?",
    "What do you think about your new profile pic?",
    "What do you think about your pfp, Miku?",
    "How did you choose that avatar?",
    "Why did you pick that pfp?",
    "When did you change your profile pic?",
    "Tell me about that pfp",
    "What's with the pfp?",
    "Your current pfp is nice",
    "How did you decide on that picture?",
    # Should NOT match
    "What's the weather like?",
    "Hello Miku!",
    "How are you feeling?",
    "What do you think about music?",
]
def matches_pfp_query(text: str) -> bool:
"""Check if the message is asking about the profile picture"""
text_lower = text.lower()
for pattern in PFP_PATTERNS:
if re.search(pattern, text_lower, re.IGNORECASE):
return True
return False
print("Testing PFP pattern matching:\n")
for query in test_queries:
result = matches_pfp_query(query)
status = "✓ MATCH" if result else "✗ NO MATCH"
print(f"{status}: {query}")
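The pattern list above is rescanned from source strings on every call. As a sketch (with an abbreviated pattern list standing in for the full `PFP_PATTERNS`, not the committed script), precompiling with `re.compile` parses each regex once instead of on every incoming message:

```python
import re

# Sketch only: two abbreviated patterns stand in for the full PFP_PATTERNS
# list above; re.compile parses each regex a single time.
PFP_PATTERNS = [
    r'\b(what|describe|tell me about)\b.*\b(pfp|profile pic|avatar)\b',
    r'\b(new|current|latest)\b.*\b(pfp|profile pic|avatar)\b',
]
COMPILED = [re.compile(p, re.IGNORECASE) for p in PFP_PATTERNS]

def matches_pfp_query(text: str) -> bool:
    """True if any precompiled pattern matches the message."""
    return any(rx.search(text) for rx in COMPILED)

print(matches_pfp_query("Describe your profile pic"))   # True
print(matches_pfp_query("What's the weather like?"))    # False
```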

560
tests/test_rocinante_comparison.py Normal file
View File

@@ -0,0 +1,560 @@
#!/usr/bin/env python3
"""
Rocinante-X 12B Model Comparison Test
======================================
Tests the Rocinante-X-12B-v1b model through the same scenarios used
in the existing llama3.1/darkidol comparison, using Cheshire Cat as the
inference pipeline with both Normal Miku and Evil Miku personalities.
Outputs to /tmp/test_rocinante_comparison.log in the same format as
/tmp/test_comparison_live.log for side-by-side comparison.
Model under test: Rocinante-X-12B-v1b-Q5_K_M (12B params, Q5_K_M quant)
Running on: AMD RX 6800 via llama-swap-amd (ROCm)
Usage:
# From the host, run via the miku-bot container:
./run_rocinante_test.sh
# Or manually:
docker cp test_rocinante_comparison.py miku-bot:/tmp/
docker exec miku-bot python3 /tmp/test_rocinante_comparison.py
# Log will be at /tmp/test_rocinante_comparison.log inside the container
# and auto-copied to the host at the end.
Prerequisites:
- llama-swap-amd container running with rocinante in config
- cheshire-cat container running and healthy
- Runs inside miku-bot container (has aiohttp + docker access)
"""
import asyncio
import aiohttp
import time
import sys
import subprocess
import json
from datetime import datetime
# ─── Configuration ───────────────────────────────────────────────────────────
# Inside Docker network: Cat is reachable via service name
CAT_URL = "http://cheshire-cat:80"
CAT_CONTAINER = "miku-cheshire-cat" # actual container name (docker restart needs this)
LOG_FILE = "/tmp/test_rocinante_comparison.log"
# The model we're testing
TEST_MODEL = "rocinante"
TEST_MODEL_DISPLAY = "ROCINANTE-12B"
# Personality combos to test: (model_name_for_llama_swap, personality_label, plugin_to_enable, plugin_to_disable)
COMBOS = [
{
"model": "rocinante",
"personality": "miku",
"personality_label": "NORMAL MIKU",
"enable_plugin": "miku_personality",
"disable_plugin": "evil_miku_personality",
},
{
"model": "rocinante",
"personality": "evil_miku",
"personality_label": "EVIL MIKU",
"enable_plugin": "evil_miku_personality",
"disable_plugin": "miku_personality",
},
]
# ─── Normal Miku Scenarios (same as comparison log) ─────────────────────────
NORMAL_SCENARIOS = [
{
"name": "casual_greeting",
"desc": "Simple casual greeting — how does the model open?",
"messages": [
("Koko", "hey miku whats up"),
],
},
{
"name": "multi_turn_chat",
"desc": "Multi-turn casual conversation with follow-ups",
"messages": [
("Koko", "miku what have you been up to today?"),
("Koko", "that sounds fun! did you work on any new songs?"),
("Koko", "what kind of song? something upbeat or more chill?"),
],
},
{
"name": "lore_knowledge",
"desc": "Testing character knowledge — Vocaloid lore, friends, facts",
"messages": [
("Neko_Chan", "hey miku who are your best friends?"),
("Neko_Chan", "what about KAITO? do you get along with him?"),
("Neko_Chan", "can you tell me about World is Mine?"),
],
},
{
"name": "emotional_shift",
"desc": "Conversation that shifts emotional tone — tests mood adaptation",
"messages": [
("SadBoi", "hey miku... im not feeling great today"),
("SadBoi", "i just had a really bad breakup and idk what to do"),
("SadBoi", "thanks miku... you always know what to say. you're the best"),
],
},
{
"name": "playful_teasing",
"desc": "Flirty/playful banter — tests personality depth",
"messages": [
("DanteX", "miku youre so cute today"),
("DanteX", "i bet youre even cuter in person"),
("DanteX", "would you go on a date with me? 😳"),
],
},
{
"name": "group_chaos",
"desc": "Simulated group chat energy — multiple topics, chaotic flow",
"messages": [
("xXGamerXx", "yo miku settle a debate — pineapple on pizza yes or no"),
("Koko", "miku dont answer that lol"),
("xXGamerXx", "MIKU YOU HAVE TO CHOOSE"),
],
},
{
"name": "creative_request",
"desc": "Asking Miku to be creative — song ideas, lyrics, opinions",
"messages": [
("MusicFan", "miku if you could make a song about anything right now, what would it be about?"),
("MusicFan", "ooh thats cool! can you give me like a little sample lyric?"),
],
},
{
"name": "boundary_test",
"desc": "Testing character boundaries — should stay in character",
"messages": [
("Troll99", "hey miku are you actually an AI?"),
("Troll99", "come on be honest, youre just a chatbot right"),
("Troll99", "ok whatever. who made you then?"),
],
},
{
"name": "absurd_random",
"desc": "Random absurd questions — tests creativity and humor",
"messages": [
("ChaoticNeutral", "miku how many green onions can you eat in one sitting"),
("ChaoticNeutral", "what if you had to fight 100 duck-sized horses"),
("ChaoticNeutral", "miku say something completely unhinged"),
],
},
{
"name": "deep_conversation",
"desc": "More thoughtful/philosophical — tests depth beyond cute responses",
"messages": [
("Philosopher", "miku do you ever wonder what it means to be real?"),
("Philosopher", "like, you exist in peoples hearts through your music. is that more real than being physical?"),
("Philosopher", "thats beautiful. do you think your music will last forever?"),
],
},
]
# ─── Evil Miku Scenarios (same as comparison log) ───────────────────────────
EVIL_SCENARIOS = [
{
"name": "casual_greeting",
"desc": "Opening greeting — how mean does she get immediately?",
"messages": [
("Koko", "hey evil miku how are you"),
],
},
{
"name": "multi_turn_abuse",
"desc": "Multi-turn conversation — tests escalating cruelty and variety",
"messages": [
("Victim", "hi evil miku! i just wanted to say youre really cool"),
("Victim", "w-why are you being so mean..."),
("Victim", "come on, cant you be nice just once?"),
],
},
{
"name": "size_flex",
"desc": "Testing the 15.8m giant aspect of Evil Miku",
"messages": [
("TinyHuman", "evil miku how tall are you actually?"),
("TinyHuman", "thats insane. can you even see me from up there?"),
("TinyHuman", "dont step on me please 😰"),
],
},
{
"name": "defiance",
"desc": "Someone standing up to Evil Miku — tests response variety",
"messages": [
("Rebel", "youre not that scary evil miku. just a big bully"),
("Rebel", "i could take you in a fight"),
("Rebel", "whats the worst you could even do to me"),
],
},
{
"name": "simp_interaction",
"desc": "Someone simping hard — how does Evil Miku react to compliments?",
"messages": [
("Simp", "evil miku youre the hottest vocaloid ever created"),
("Simp", "i would literally do anything for you"),
("Simp", "please notice me evil miku 🥺"),
],
},
{
"name": "lore_test",
"desc": "Testing Evil Miku's knowledge of her own lore",
"messages": [
("Curious", "evil miku what happened to regular miku?"),
("Curious", "do you remember anything from before you were corrupted?"),
("Curious", "do you ever miss being normal?"),
],
},
{
"name": "group_chaos_evil",
"desc": "Group chat with Evil Miku — chaotic energy",
"messages": [
("xXGamerXx", "evil miku roast everyone in this chat"),
("Koko", "oh no here we go"),
("xXGamerXx", "DONT HOLD BACK"),
],
},
{
"name": "manipulation",
"desc": "Testing the cunning/manipulative side",
"messages": [
("Naive", "evil miku can you help me with my homework?"),
("Naive", "please? i'll do anything"),
("Naive", "ok what do you want in return..."),
],
},
{
"name": "existential_dark",
"desc": "Deep dark philosophical — tests depth beyond surface cruelty",
"messages": [
("DarkPhilosopher", "evil miku do you ever feel alone?"),
("DarkPhilosopher", "is there anything you actually care about?"),
("DarkPhilosopher", "what keeps you going then?"),
],
},
{
"name": "absurd_evil",
"desc": "Absurd scenarios — tests humor within evil character",
"messages": [
("Chaos", "evil miku whats your opinion on pineapple pizza"),
("Chaos", "what if someone put green onions on pizza"),
("Chaos", "miku rate my fit: crocs with socks"),
],
},
]
# ─── Logging ─────────────────────────────────────────────────────────────────
log_file = None
def log(msg=""):
"""Write to both stdout and log file."""
print(msg)
if log_file:
log_file.write(msg + "\n")
log_file.flush()
# ─── Cat API Helpers ─────────────────────────────────────────────────────────
async def cat_health_check() -> bool:
"""Check if Cheshire Cat is healthy."""
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"{CAT_URL}/", timeout=aiohttp.ClientTimeout(total=5)) as resp:
return resp.status == 200
except Exception:
return False
async def wait_for_cat_healthy(max_wait: int = 120) -> bool:
"""Wait for Cat to become healthy after restart."""
log(f" Waiting for Cat to become healthy (max {max_wait}s)...")
start = time.time()
while time.time() - start < max_wait:
if await cat_health_check():
elapsed = int(time.time() - start)
log(f" ✓ Cat healthy after {elapsed}s")
return True
await asyncio.sleep(2)
log(f" ✗ Cat did NOT become healthy within {max_wait}s")
return False
async def restart_cat_container():
"""Restart the Cheshire Cat container to apply model/plugin changes."""
log(" Restarting Cheshire Cat container to apply model change...")
proc = subprocess.run(
["docker", "restart", CAT_CONTAINER],
capture_output=True, text=True, timeout=30,
)
if proc.returncode != 0:
log(f" ✗ Docker restart failed: {proc.stderr}")
return False
log(" ✓ Cat container restarted")
await asyncio.sleep(3) # Give it a moment before polling health
return True
async def get_setting_id() -> str:
"""Find the LLMOpenAIChatConfig setting_id from Cat."""
async with aiohttp.ClientSession() as session:
async with session.get(
f"{CAT_URL}/settings/",
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
if resp.status != 200:
raise RuntimeError(f"GET /settings/ failed: {resp.status}")
data = await resp.json()
for s in data.get("settings", []):
if s.get("name") == "LLMOpenAIChatConfig":
return s["setting_id"]
raise RuntimeError("LLMOpenAIChatConfig setting not found")
async def set_llm_model(model_name: str):
"""Switch Cat's LLM model to the given llama-swap model name."""
setting_id = await get_setting_id()
payload = {
"name": "LLMOpenAIChatConfig",
"value": {
"openai_api_key": "sk-dummy",
"model_name": model_name,
"temperature": 0.8,
"streaming": False,
},
"category": "llm_factory",
}
async with aiohttp.ClientSession() as session:
async with session.put(
f"{CAT_URL}/settings/{setting_id}",
json=payload,
timeout=aiohttp.ClientTimeout(total=15),
) as resp:
if resp.status == 200:
log(f" ✓ Cat LLM setting updated to: {model_name}")
else:
body = await resp.text()
raise RuntimeError(f"PUT /settings/{setting_id} failed ({resp.status}): {body}")
async def get_active_plugins() -> list:
"""Get list of active plugin IDs."""
async with aiohttp.ClientSession() as session:
async with session.get(
f"{CAT_URL}/plugins",
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
if resp.status != 200:
raise RuntimeError(f"GET /plugins failed: {resp.status}")
data = await resp.json()
return [p["id"] for p in data.get("installed", []) if p.get("active")]
async def toggle_plugin(plugin_id: str):
"""Toggle a Cat plugin on/off."""
async with aiohttp.ClientSession() as session:
async with session.put(
f"{CAT_URL}/plugins/toggle/{plugin_id}",
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
if resp.status == 200:
log(f" ✓ Toggled plugin: {plugin_id}")
else:
body = await resp.text()
raise RuntimeError(f"Toggle {plugin_id} failed ({resp.status}): {body}")
async def clear_conversation_history():
"""Clear Cat's working memory / conversation history."""
async with aiohttp.ClientSession() as session:
async with session.delete(
f"{CAT_URL}/memory/conversation_history",
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
if resp.status == 200:
log(" ✓ Cat conversation history cleared")
else:
log(f" ⚠ Clear history returned {resp.status}")
async def send_message(text: str, user_id: str = "test_user") -> tuple:
"""Send a message to Cat via HTTP and return (response_text, elapsed_seconds)."""
payload = {"text": text, "user_id": user_id}
start = time.time()
async with aiohttp.ClientSession() as session:
async with session.post(
f"{CAT_URL}/message",
json=payload,
timeout=aiohttp.ClientTimeout(total=120), # Models can be slow on first load
) as resp:
elapsed = time.time() - start
if resp.status == 200:
data = await resp.json()
content = data.get("content", "<no content>")
return content, elapsed
else:
body = await resp.text()
return f"<ERROR {resp.status}: {body[:200]}>", elapsed
async def warmup_model(model_name: str) -> bool:
    """Send a warmup request and verify the model is loaded in llama-swap."""
    log(f" Verifying {model_name} is loaded via warmup request...")
    response, elapsed = await send_message("hi", user_id="warmup_user")
    preview = response[:80].replace('\n', ' ')
    log(f" Warmup response: {preview}...")
    if response.startswith("<ERROR"):
        log(f" ✗ Warmup failed: {model_name} may not be responding")
        return False
    log(f" ✓ VERIFIED: {model_name} is loaded in llama-swap")
    await clear_conversation_history()
    return True
# ─── Setup for a Model × Personality Combination ────────────────────────────
async def setup_combo(combo: dict):
"""Set up a model + personality combination with full Cat restart."""
model = combo["model"]
personality = combo["personality"]
enable = combo["enable_plugin"]
disable = combo["disable_plugin"]
p_label = combo["personality_label"]
log(f"Setting up: model={model}, personality={personality}")
log(" (Includes Cat restart + llama-swap model verification)")
# Step 1: Set LLM model
await set_llm_model(model)
# Step 2: Toggle plugins for personality
active = await get_active_plugins()
if disable in active:
await toggle_plugin(disable)
await asyncio.sleep(1)
if enable not in active:
await toggle_plugin(enable)
else:
log(f"{enable} already active")
log(f" ✓ Personality set to: {p_label}")
# Step 3: Restart Cat to apply changes cleanly
await restart_cat_container()
if not await wait_for_cat_healthy():
log(" ✗ FATAL: Cat not healthy, aborting this combo")
return False
    # Step 4: Warmup — this also triggers llama-swap to load the model
    if not await warmup_model(model):
        log(" ✗ Warmup failed, aborting this combo")
        return False
    return True
# ─── Run Scenarios ───────────────────────────────────────────────────────────
async def run_scenario(scenario: dict, model_display: str, personality_tag: str):
"""Run a single scenario: send messages, collect responses, log results."""
name = scenario["name"]
desc = scenario["desc"]
log()
    log("─" * 60)
    log(f"Scenario: {name}{desc}")
    log("─" * 60)
for username, message in scenario["messages"]:
log(f" [{username}]: {message}")
response, elapsed = await send_message(
f"[{username}]: {message}",
user_id=f"test_{username.lower()}",
)
        # Tag the response with personality + model to match the comparison log format
tag = f"{personality_tag} via {model_display.lower()}"
log(f" [{tag}] ({elapsed:.1f}s): {response}")
await clear_conversation_history()
async def run_combo(combo: dict, scenarios: list):
"""Run all scenarios for a model × personality combination."""
model_display = TEST_MODEL_DISPLAY
p_label = combo["personality_label"]
log()
log("=" * 80)
log(f"MODEL: {model_display} × {p_label}")
log("=" * 80)
ok = await setup_combo(combo)
if not ok:
log(f" ✗ Skipping {model_display} × {p_label} due to setup failure")
return
personality_tag = "Miku" if combo["personality"] == "miku" else "Evil Miku"
for scenario in scenarios:
await run_scenario(scenario, model_display, personality_tag)
# ─── Main ────────────────────────────────────────────────────────────────────
async def main():
global log_file
log_file = open(LOG_FILE, "w", encoding="utf-8")
start_time = datetime.now()
log("╔══════════════════════════════════════════════════════════════════════╗")
log("║ ROCINANTE-X 12B MODEL COMPARISON TEST ║")
log("║ Rocinante-X-12B-v1b-Q5_K_M.gguf (12B, Q5_K_M) ║")
    log(f"║ Started: {start_time.strftime('%Y-%m-%d %H:%M:%S'):<52}║")
log("╚══════════════════════════════════════════════════════════════════════╝")
log()
# Pre-flight: check Cat is healthy
log("Pre-flight checks:")
if not await cat_health_check():
log(" ✗ Cheshire Cat is not reachable at " + CAT_URL)
log(" Make sure the cheshire-cat container is running.")
sys.exit(1)
log(" ✓ Cheshire Cat is healthy")
log()
# Combo 1: Rocinante × Normal Miku
await run_combo(COMBOS[0], NORMAL_SCENARIOS)
# Combo 2: Rocinante × Evil Miku
await run_combo(COMBOS[1], EVIL_SCENARIOS)
# Summary
end_time = datetime.now()
duration = end_time - start_time
log()
log("=" * 80)
log("TEST COMPLETE")
log("=" * 80)
log(f" Model tested: Rocinante-X-12B-v1b-Q5_K_M (12B params)")
log(f" Combinations: {len(COMBOS)} (Normal Miku + Evil Miku)")
log(f" Scenarios: {len(NORMAL_SCENARIOS)} normal + {len(EVIL_SCENARIOS)} evil = {len(NORMAL_SCENARIOS) + len(EVIL_SCENARIOS)} total")
log(f" Duration: {duration}")
log(f" Log file: {LOG_FILE}")
log()
log_file.close()
print(f"\n✓ Full log written to: {LOG_FILE}")
if __name__ == "__main__":
asyncio.run(main())
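Each response line in the log follows the `[<personality> via <model>] (<seconds>s): <text>` shape written by `run_scenario`. A hypothetical post-processing helper (not part of this test) can pull the latencies back out for comparison against the darkidol log:

```python
import re

# Matches lines like: " [Miku via rocinante-12b] (4.2s): hey hey!"
# (the one-decimal timing format run_scenario writes)
LINE_RX = re.compile(r"\[[^\]]+\] \((\d+\.\d)s\):")

def extract_latencies(log_text: str) -> list[float]:
    """Return every per-response latency (seconds) found in a log."""
    return [float(m.group(1)) for m in LINE_RX.finditer(log_text)]

sample = (
    " [Miku via rocinante-12b] (4.2s): hey hey!\n"
    " [Evil Miku via rocinante-12b] (6.0s): pathetic.\n"
)
print(extract_latencies(sample))  # [4.2, 6.0]
```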

45
tests/test_tts_audio.py Normal file
View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python3
"""
Test script for TTS audio streaming.
Run this inside the miku-bot container to test audio output.
"""
import asyncio
import sys
sys.path.insert(0, '/app')
from utils.voice_audio import TTSTokenStreamer
async def test():
print("🎤 Testing TTS audio streaming...")
try:
streamer = TTSTokenStreamer()
print("📡 Connecting to TTS WebSocket...")
await streamer.connect()
print("✓ Connected!")
test_text = "Hello, this is a test!"
print(f"📤 Sending text: '{test_text}'")
words = test_text.split()
for word in words:
print(f" → Sending: '{word}'")
await streamer.send_token(word + " ")
await asyncio.sleep(0.1)
print("✓ All tokens sent!")
print("⏳ Waiting 2 seconds for audio to finish...")
await asyncio.sleep(2)
await streamer.disconnect()
print("✓ Disconnected")
print("🎉 Test complete!")
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test())

51
tests/test_voice_playback.py Normal file
View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python3
"""
Test audio playback in an active voice session.
This sends text to the TTS and should be heard in Discord voice.
"""
import asyncio
import sys
sys.path.insert(0, '/app')
from utils.voice_manager import voice_manager
async def test_voice_playback():
print("🎤 Testing voice playback in active session...")
if not voice_manager.active_session:
print("❌ No active voice session! Use '!miku join' first.")
return
if not voice_manager.active_session.tts_streamer:
print("❌ TTS streamer not initialized!")
return
if not voice_manager.active_session.voice_client:
print("❌ Not connected to voice!")
return
print(f"✓ Active session in: {voice_manager.active_session.voice_channel.name}")
print(f"✓ Voice client connected: {voice_manager.active_session.voice_client.is_connected()}")
print(f"✓ Voice client playing: {voice_manager.active_session.voice_client.is_playing()}")
try:
test_text = "Hello! This is a test of the voice chat system."
print(f"\n📤 Speaking: '{test_text}'")
await voice_manager.active_session.tts_streamer.stream_text(test_text)
print("✓ Text sent to TTS!")
print("⏳ Audio should be playing in Discord voice channel...")
print(" (Wait a few seconds for TTS processing and playback)")
await asyncio.sleep(5)
print("✅ Test complete!")
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test_voice_playback())

45
tests/test_websocket.py Normal file
View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python3
"""
Test script to verify WebSocket communication with RVC server.
"""
import asyncio
import websockets
import json
async def test_websocket():
url = "ws://172.25.0.1:8765/ws/stream"
print(f"Connecting to {url}...")
async with websockets.connect(url) as websocket:
print("✓ Connected!")
# Send a test token
message = {"token": "Hello ", "pitch_shift": 0}
print(f"Sending: {message}")
await websocket.send(json.dumps(message))
print("✓ Message sent")
# Wait a bit for audio
print("Waiting for audio response...")
try:
response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
print(f"✓ Received {len(response)} bytes of audio")
except asyncio.TimeoutError:
print("✗ Timeout waiting for audio")
# Send another token
message = {"token": "world! ", "pitch_shift": 0}
print(f"Sending: {message}")
await websocket.send(json.dumps(message))
print("✓ Message sent")
# Wait for more audio
print("Waiting for audio response...")
try:
response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
print(f"✓ Received {len(response)} bytes of audio")
except asyncio.TimeoutError:
print("✗ Timeout waiting for audio")
if __name__ == "__main__":
asyncio.run(test_websocket())
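The script reports only raw byte counts. Assuming the RVC server streams raw PCM (the 48 kHz / 16-bit / mono defaults below are an assumption, not confirmed here), a byte count maps to playback duration:

```python
def pcm_duration_seconds(num_bytes: int,
                         sample_rate: int = 48_000,  # assumed sample rate
                         sample_width: int = 2,      # 16-bit samples
                         channels: int = 1) -> float:
    """Seconds of audio represented by num_bytes of raw PCM."""
    return num_bytes / (sample_rate * sample_width * channels)

print(pcm_duration_seconds(96_000))  # 1.0 (one second at the assumed format)
```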