miku-discord/test_full_pipeline.py
koko210Serve · edb88e9ede · fix: Phase 2 integrity review - v2.0.0 rewrite & bugfixes
Memory Consolidation Plugin (828 -> 465 lines):
- Replace SentenceTransformer with cat.embedder.embed_query() for vector consistency
- Fix per-user fact isolation: source=user_id instead of global
- Add duplicate fact detection (_is_duplicate_fact, score_threshold=0.85; sketch below)
- Remove ~350 lines of dead async run_consolidation() code
- Remove duplicate declarative search in before_cat_sends_message
- Unify trivial patterns into TRIVIAL_PATTERNS frozenset
- Remove all sys.stderr.write debug logging
- Remove sentence-transformers from requirements.txt (no external deps)
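
For orientation, a minimal sketch of the duplicate check. Only _is_duplicate_fact,
cat.embedder.embed_query() and the 0.85 threshold come from this commit; the direct
qdrant-client calls, the "declarative" collection name and the metadata.source filter
key mirror the test script below and are illustrative assumptions, not the plugin's
actual code:

    from qdrant_client import QdrantClient, models

    client = QdrantClient(url="http://localhost:6333")  # same Qdrant the test hits

    def _is_duplicate_fact(cat, fact_text: str, user_id: str,
                           score_threshold: float = 0.85) -> bool:
        """True if a near-identical fact is already stored for this user."""
        # Embed with the Cat's own embedder so the query vector shares the
        # space of the stored points (the reason SentenceTransformer was dropped).
        vector = cat.embedder.embed_query(fact_text)
        hits = client.search(
            collection_name="declarative",
            query_vector=vector,
            limit=1,
            score_threshold=score_threshold,  # skip matches below 0.85 similarity
            query_filter=models.Filter(must=[
                models.FieldCondition(key="metadata.source",
                                      match=models.MatchValue(value=user_id)),
            ]),
        )
        return len(hits) > 0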

Loguru Fix (cheshire-cat/cat/log.py):
- Patch Cat v1.6.2 loguru format to provide default extra fields (sketch below)
- Fixes KeyError: 'original_name' from third-party libs (fastembed)
- Mounted via docker-compose volume
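
In spirit, the patch gives loguru defaults for the extra fields its format string
references; a hedged sketch only (the mounted log.py may differ, and the format
string here is illustrative):

    import sys
    from loguru import logger

    # Records from libs that never bind 'original_name' (e.g. fastembed) raise
    # KeyError when the format references {extra[original_name]}; configure()
    # supplies a default value for every record.
    logger.configure(extra={"original_name": "n/a"})

    logger.remove()  # drop the stock handler before adding the patched format
    logger.add(sys.stderr,
               format="{time} | {level} | {extra[original_name]} | {message}")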

Discord Bridge:
- Copy discord_bridge.py to cat-plugins/ (was empty directory)

Test Results (6/7 pass, 100% fact recall):
- 11 facts extracted, per-user isolation working
- Duplicate detection effective (+2 on 2nd run)
- 5/5 natural language recall queries correct
2026-02-07 19:24:46 +02:00

255 lines · 8.2 KiB · Python

#!/usr/bin/env python3
"""
Full Pipeline Test for Memory Consolidation System v2.0.0
"""
import requests
import time
import json
import sys
CAT_URL = "http://localhost:1865"
QDRANT_URL = "http://localhost:6333"
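# Consolidation triggers an LLM fact-extraction pass (see TEST 5), so it gets a
# much longer timeout than an ordinary chat message.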
CONSOLIDATION_TIMEOUT = 180
def send_message(text, timeout=30):
    try:
        resp = requests.post(f"{CAT_URL}/message", json={"text": text}, timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.Timeout:
        return {"error": "timeout", "content": ""}
    except Exception as e:
        return {"error": str(e), "content": ""}

def qdrant_scroll(collection, limit=200, filt=None):
    body = {"limit": limit, "with_payload": True, "with_vector": False}
    if filt:
        body["filter"] = filt
    resp = requests.post(f"{QDRANT_URL}/collections/{collection}/points/scroll", json=body)
    return resp.json()["result"]["points"]

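# NOTE: counting via scroll() caps at the scroll limit (200 points), plenty at
# test scale; a production check would use Qdrant's count endpoint instead.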
def qdrant_count(collection):
    return len(qdrant_scroll(collection))

def section(title):
    print(f"\n{'=' * 70}")
    print(f" {title}")
    print(f"{'=' * 70}")

print("=" * 70)
print(" FULL PIPELINE TEST - Memory Consolidation v2.0.0")
print("=" * 70)

try:
    requests.get(f"{CAT_URL}/", timeout=5)
except Exception:
    print("ERROR: Cat not reachable")
    sys.exit(1)

try:
    requests.get(f"{QDRANT_URL}/collections", timeout=5)
except Exception:
    print("ERROR: Qdrant not reachable")
    sys.exit(1)
episodic_start = qdrant_count("episodic")
declarative_start = qdrant_count("declarative")
# Baseline of user-only episodic points, needed for TEST 1's delta below.
user_episodic_start = len(qdrant_scroll("episodic", filt={
    "must_not": [{"key": "metadata.speaker", "match": {"value": "miku"}}]
}))
print(f"\nStarting state: {episodic_start} episodic, {declarative_start} declarative")
results = {}
# TEST 1: Trivial Message Filtering
section("TEST 1: Trivial Message Filtering")
trivial_messages = ["lol", "k", "ok", "haha", "xd", "brb"]
print(f"Sending {len(trivial_messages)} trivial messages...")
for msg in trivial_messages:
    send_message(msg)
    time.sleep(0.3)
time.sleep(1)
# Count only USER episodic memories (exclude Miku's responses)
user_episodic = qdrant_scroll("episodic", filt={
    "must_not": [{"key": "metadata.speaker", "match": {"value": "miku"}}]
})
trivial_user_stored = len(user_episodic) - user_episodic_start
episodic_after_trivial = qdrant_count("episodic")
# discord_bridge filters trivial user messages, but Miku still responds
# so we only check user-side storage
if trivial_user_stored < len(trivial_messages):
    print(f" PASS - Only {trivial_user_stored}/{len(trivial_messages)} user trivial messages stored")
    print(f" (Total episodic incl. Miku responses: {episodic_after_trivial})")
    results["trivial_filtering"] = True
else:
    print(f" WARN - All {trivial_user_stored} trivial messages stored")
    results["trivial_filtering"] = False
# TEST 2: Important Message Storage
section("TEST 2: Important Message Storage")
personal_facts = [
    "My name is Sarah Chen",
    "I'm 28 years old",
    "I live in Seattle, Washington",
    "I work as a software engineer at Microsoft",
    "My favorite color is forest green",
    "I love playing piano and have practiced for 15 years",
    "I'm learning Japanese, currently at N3 level",
    "I have a cat named Luna",
    "I'm allergic to peanuts",
    "My birthday is March 15th",
    "I graduated from UW in 2018",
    "I enjoy hiking on weekends",
]
print(f"Sending {len(personal_facts)} personal info messages...")
for i, fact in enumerate(personal_facts, 1):
    resp = send_message(fact)
    status = "OK" if "error" not in resp else "ERR"
    print(f" [{i}/{len(personal_facts)}] {status} {fact[:50]}")
    time.sleep(0.5)
time.sleep(1)
episodic_after_personal = qdrant_count("episodic")
personal_stored = episodic_after_personal - episodic_after_trivial
print(f"\n Episodic memories from personal info: {personal_stored}")
results["important_storage"] = personal_stored >= len(personal_facts)
print(f" {'PASS' if results['important_storage'] else 'FAIL'} - Expected >={len(personal_facts)}, got {personal_stored}")
# TEST 3: Miku Response Storage
section("TEST 3: Bidirectional Memory (Miku Response Storage)")
miku_points = qdrant_scroll("episodic", filt={
    "must": [{"key": "metadata.speaker", "match": {"value": "miku"}}]
})
print(f" Miku's memories in episodic: {len(miku_points)}")
if miku_points:
    print(f" Sample: \"{miku_points[0]['payload']['page_content'][:70]}\"")
    results["miku_storage"] = True
    print(" PASS")
else:
    results["miku_storage"] = False
    print(" FAIL - No Miku responses in episodic memory")
# TEST 4: Per-User Source Tagging
section("TEST 4: Per-User Source Tagging")
user_points = qdrant_scroll("episodic", filt={
    "must": [{"key": "metadata.source", "match": {"value": "user"}}]
})
print(f" Points with source='user': {len(user_points)}")
global_points = qdrant_scroll("episodic", filt={
    "must": [{"key": "metadata.source", "match": {"value": "global"}}]
})
print(f" Points with source='global' (old bug): {len(global_points)}")
results["user_tagging"] = len(user_points) > 0 and len(global_points) == 0
print(f" {'PASS' if results['user_tagging'] else 'FAIL'}")
# TEST 5: Memory Consolidation
section("TEST 5: Memory Consolidation & Fact Extraction")
print(f" Triggering consolidation (timeout={CONSOLIDATION_TIMEOUT}s)...")
t0 = time.time()
resp = send_message("consolidate now", timeout=CONSOLIDATION_TIMEOUT)
elapsed = time.time() - t0
if "error" in resp:
print(f" WARN - HTTP issue: {resp['error']} ({elapsed:.0f}s)")
print(" Waiting 60s for background completion...")
time.sleep(60)
else:
print(f" Completed in {elapsed:.1f}s")
content = resp.get("content", "")
print(f" Response: {content[:120]}...")
time.sleep(3)
declarative_after = qdrant_count("declarative")
new_facts = declarative_after - declarative_start
print(f"\n Declarative facts: {declarative_start} -> {declarative_after} (+{new_facts})")
results["consolidation"] = new_facts >= 5
print(f" {'PASS' if results['consolidation'] else 'FAIL'} - {'>=5 facts' if results['consolidation'] else f'only {new_facts}'}")
all_facts = qdrant_scroll("declarative")
print(f"\n All declarative facts ({len(all_facts)}):")
for i, f in enumerate(all_facts, 1):
    content = f["payload"]["page_content"]
    meta = f["payload"].get("metadata", {})
    source = meta.get("source", "?")
    ftype = meta.get("fact_type", "?")
    print(f" {i}. [{source}|{ftype}] {content}")
# TEST 6: Duplicate Detection
section("TEST 6: Duplicate Detection (2nd consolidation)")
facts_before_2nd = qdrant_count("declarative")
print(f" Facts before: {facts_before_2nd}")
print(f" Running consolidation again...")
resp = send_message("consolidate now", timeout=CONSOLIDATION_TIMEOUT)
time.sleep(3)
facts_after_2nd = qdrant_count("declarative")
new_dupes = facts_after_2nd - facts_before_2nd
print(f" Facts after: {facts_after_2nd} (+{new_dupes})")
results["dedup"] = new_dupes <= 2
print(f" {'PASS' if results['dedup'] else 'FAIL'} - {new_dupes} new facts (<=2 expected)")
# TEST 7: Fact Recall
section("TEST 7: Fact Recall via Natural Language")
queries = {
    "What is my name?": ["sarah", "chen"],
    "How old am I?": ["28"],
    "Where do I live?": ["seattle"],
    "Where do I work?": ["microsoft", "software engineer"],
    "What am I allergic to?": ["peanut"],
}
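# A query counts as correct when any expected keyword appears in the answer
# (case-insensitive substring match).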
correct = 0
for question, keywords in queries.items():
    resp = send_message(question)
    answer = resp.get("content", "")
    hit = any(kw.lower() in answer.lower() for kw in keywords)
    if hit:
        correct += 1
    icon = "OK" if hit else "??"
    print(f" {icon} Q: {question}")
    print(f" A: {answer[:150]}")
    time.sleep(1)
accuracy = correct / len(queries) * 100
results["recall"] = correct >= 3
print(f"\n Recall: {correct}/{len(queries)} ({accuracy:.0f}%)")
print(f" {'PASS' if results['recall'] else 'FAIL'} (threshold: >=3)")
# FINAL SUMMARY
section("FINAL SUMMARY")
total = len(results)
passed = sum(1 for v in results.values() if v)
print()
for name, ok in results.items():
    print(f" [{'PASS' if ok else 'FAIL'}] {name}")
print(f"\n Score: {passed}/{total}")
print(f" Episodic: {qdrant_count('episodic')}")
print(f" Declarative: {qdrant_count('declarative')}")
if passed == total:
    print("\n ALL TESTS PASSED!")
elif passed >= total - 1:
    print("\n MOSTLY PASSING - minor issues only")
else:
    print("\n SOME TESTS FAILED - review above")
print("\n" + "=" * 70)