#!/usr/bin/env python3
"""Full Pipeline Test for Memory Consolidation System v2.0.0.

Exercises the whole memory pipeline end-to-end against a locally running
Cat instance and its Qdrant vector store: trivial-message filtering,
episodic storage, bidirectional (user + Miku) memory, per-user source
tagging, consolidation into declarative facts, duplicate detection, and
natural-language recall.  Requires both services to be reachable.
"""

import sys
import time

import requests

CAT_URL = "http://localhost:1865"
QDRANT_URL = "http://localhost:6333"
CONSOLIDATION_TIMEOUT = 180

# Payload filters used to separate the user's memories from Miku's.
MIKU_FILTER = {"must": [{"key": "metadata.speaker", "match": {"value": "miku"}}]}
USER_ONLY_FILTER = {"must_not": [{"key": "metadata.speaker", "match": {"value": "miku"}}]}


def send_message(text, timeout=30):
    """POST *text* to the Cat's /message endpoint and return the JSON reply.

    Never raises: on timeout or any other failure it returns a dict with an
    "error" key and an empty "content" so the test sequence keeps running.
    """
    try:
        resp = requests.post(f"{CAT_URL}/message", json={"text": text}, timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.Timeout:
        return {"error": "timeout", "content": ""}
    except Exception as e:  # broad by design: the harness must not crash mid-run
        return {"error": str(e), "content": ""}


def qdrant_scroll(collection, limit=200, filt=None):
    """Return up to *limit* points (payloads only, no vectors) from *collection*.

    *filt* is an optional Qdrant filter dict applied server-side.
    NOTE: results are capped at *limit*; use qdrant_count() for exact totals.
    """
    body = {"limit": limit, "with_payload": True, "with_vector": False}
    if filt:
        body["filter"] = filt
    resp = requests.post(f"{QDRANT_URL}/collections/{collection}/points/scroll", json=body)
    return resp.json()["result"]["points"]


def qdrant_count(collection, filt=None):
    """Return the exact number of points in *collection*, optionally filtered.

    FIX: the previous implementation counted len(scroll(limit=200)) and so
    silently capped every count at 200 points, corrupting all the pass/fail
    arithmetic below once a collection grew past that.  Qdrant's count API
    with exact=true has no such cap.  The new optional *filt* parameter is
    backward compatible and lets callers count a subset directly.
    """
    body = {"exact": True}
    if filt:
        body["filter"] = filt
    resp = requests.post(f"{QDRANT_URL}/collections/{collection}/points/count", json=body)
    return resp.json()["result"]["count"]


def section(title):
    """Print a visually separated section header for one test."""
    print(f"\n{'=' * 70}")
    print(f" {title}")
    print(f"{'=' * 70}")


def main():
    print("=" * 70)
    print(" FULL PIPELINE TEST - Memory Consolidation v2.0.0")
    print("=" * 70)

    # Fail fast if either backing service is unreachable.
    try:
        requests.get(f"{CAT_URL}/", timeout=5)
    except Exception:
        print("ERROR: Cat not reachable")
        sys.exit(1)
    try:
        requests.get(f"{QDRANT_URL}/collections", timeout=5)
    except Exception:
        print("ERROR: Qdrant not reachable")
        sys.exit(1)

    episodic_start = qdrant_count("episodic")
    declarative_start = qdrant_count("declarative")
    # FIX: the user-only delta computed in TEST 1 needs a user-only baseline.
    # The old code subtracted the UNFILTERED starting total from a filtered
    # count, which is wrong whenever Miku memories already existed at start.
    user_episodic_start = qdrant_count("episodic", USER_ONLY_FILTER)
    print(f"\nStarting state: {episodic_start} episodic, {declarative_start} declarative")

    results = {}

    # ------------------------------------------------------------------
    # TEST 1: Trivial Message Filtering
    # ------------------------------------------------------------------
    section("TEST 1: Trivial Message Filtering")
    trivial_messages = ["lol", "k", "ok", "haha", "xd", "brb"]
    print(f"Sending {len(trivial_messages)} trivial messages...")
    for msg in trivial_messages:
        send_message(msg)
        time.sleep(0.3)
    time.sleep(1)

    # Count only USER episodic memories (exclude Miku's responses).
    trivial_user_stored = qdrant_count("episodic", USER_ONLY_FILTER) - user_episodic_start
    episodic_after_trivial = qdrant_count("episodic")
    # discord_bridge filters trivial user messages, but Miku still responds,
    # so we only check user-side storage.
    if trivial_user_stored < len(trivial_messages):
        print(f"  PASS - Only {trivial_user_stored}/{len(trivial_messages)} user trivial messages stored")
        print(f"  (Total episodic incl. Miku responses: {episodic_after_trivial})")
        results["trivial_filtering"] = True
    else:
        print(f"  WARN - All {trivial_user_stored} trivial messages stored")
        results["trivial_filtering"] = False

    # ------------------------------------------------------------------
    # TEST 2: Important Message Storage
    # ------------------------------------------------------------------
    section("TEST 2: Important Message Storage")
    personal_facts = [
        "My name is Sarah Chen",
        "I'm 28 years old",
        "I live in Seattle, Washington",
        "I work as a software engineer at Microsoft",
        "My favorite color is forest green",
        "I love playing piano and have practiced for 15 years",
        "I'm learning Japanese, currently at N3 level",
        "I have a cat named Luna",
        "I'm allergic to peanuts",
        "My birthday is March 15th",
        "I graduated from UW in 2018",
        "I enjoy hiking on weekends",
    ]
    print(f"Sending {len(personal_facts)} personal info messages...")
    for i, fact in enumerate(personal_facts, 1):
        resp = send_message(fact)
        status = "OK" if "error" not in resp else "ERR"
        print(f"  [{i}/{len(personal_facts)}] {status} {fact[:50]}")
        time.sleep(0.5)
    time.sleep(1)

    episodic_after_personal = qdrant_count("episodic")
    personal_stored = episodic_after_personal - episodic_after_trivial
    print(f"\n  Episodic memories from personal info: {personal_stored}")
    results["important_storage"] = personal_stored >= len(personal_facts)
    print(f"  {'PASS' if results['important_storage'] else 'FAIL'} - Expected >={len(personal_facts)}, got {personal_stored}")

    # ------------------------------------------------------------------
    # TEST 3: Miku Response Storage (bidirectional memory)
    # ------------------------------------------------------------------
    section("TEST 3: Bidirectional Memory (Miku Response Storage)")
    # Scroll (not count) here because we also want a sample payload to print.
    miku_points = qdrant_scroll("episodic", filt=MIKU_FILTER)
    print(f"  Miku's memories in episodic: {len(miku_points)}")
    if miku_points:
        print(f"  Sample: \"{miku_points[0]['payload']['page_content'][:70]}\"")
        results["miku_storage"] = True
        print("  PASS")
    else:
        results["miku_storage"] = False
        print("  FAIL - No Miku responses in episodic memory")

    # ------------------------------------------------------------------
    # TEST 4: Per-User Source Tagging
    # ------------------------------------------------------------------
    section("TEST 4: Per-User Source Tagging")
    user_tagged = qdrant_count("episodic", {
        "must": [{"key": "metadata.source", "match": {"value": "user"}}]
    })
    print(f"  Points with source='user': {user_tagged}")
    global_tagged = qdrant_count("episodic", {
        "must": [{"key": "metadata.source", "match": {"value": "global"}}]
    })
    print(f"  Points with source='global' (old bug): {global_tagged}")
    results["user_tagging"] = user_tagged > 0 and global_tagged == 0
    print(f"  {'PASS' if results['user_tagging'] else 'FAIL'}")

    # ------------------------------------------------------------------
    # TEST 5: Memory Consolidation & Fact Extraction
    # ------------------------------------------------------------------
    section("TEST 5: Memory Consolidation & Fact Extraction")
    print(f"  Triggering consolidation (timeout={CONSOLIDATION_TIMEOUT}s)...")
    t0 = time.time()
    resp = send_message("consolidate now", timeout=CONSOLIDATION_TIMEOUT)
    elapsed = time.time() - t0
    if "error" in resp:
        # The HTTP call may time out while consolidation keeps running
        # server-side, so give it extra time before counting facts.
        print(f"  WARN - HTTP issue: {resp['error']} ({elapsed:.0f}s)")
        print("  Waiting 60s for background completion...")
        time.sleep(60)
    else:
        print(f"  Completed in {elapsed:.1f}s")
        content = resp.get("content", "")
        print(f"  Response: {content[:120]}...")
    time.sleep(3)

    declarative_after = qdrant_count("declarative")
    new_facts = declarative_after - declarative_start
    print(f"\n  Declarative facts: {declarative_start} -> {declarative_after} (+{new_facts})")
    results["consolidation"] = new_facts >= 5
    print(f"  {'PASS' if results['consolidation'] else 'FAIL'} - {'>=5 facts' if results['consolidation'] else f'only {new_facts}'}")

    # NOTE: listing is capped at the scroll limit (200); counts above are exact.
    all_facts = qdrant_scroll("declarative")
    print(f"\n  All declarative facts ({len(all_facts)}):")
    for i, f in enumerate(all_facts, 1):
        content = f["payload"]["page_content"]
        meta = f["payload"].get("metadata", {})
        source = meta.get("source", "?")
        ftype = meta.get("fact_type", "?")
        print(f"  {i}. [{source}|{ftype}] {content}")

    # ------------------------------------------------------------------
    # TEST 6: Duplicate Detection (second consolidation should add little)
    # ------------------------------------------------------------------
    section("TEST 6: Duplicate Detection (2nd consolidation)")
    facts_before_2nd = qdrant_count("declarative")
    print(f"  Facts before: {facts_before_2nd}")
    print("  Running consolidation again...")
    send_message("consolidate now", timeout=CONSOLIDATION_TIMEOUT)
    time.sleep(3)
    facts_after_2nd = qdrant_count("declarative")
    new_dupes = facts_after_2nd - facts_before_2nd
    print(f"  Facts after: {facts_after_2nd} (+{new_dupes})")
    results["dedup"] = new_dupes <= 2
    print(f"  {'PASS' if results['dedup'] else 'FAIL'} - {new_dupes} new facts (<=2 expected)")

    # ------------------------------------------------------------------
    # TEST 7: Fact Recall via Natural Language
    # ------------------------------------------------------------------
    section("TEST 7: Fact Recall via Natural Language")
    queries = {
        "What is my name?": ["sarah", "chen"],
        "How old am I?": ["28"],
        "Where do I live?": ["seattle"],
        "Where do I work?": ["microsoft", "software engineer"],
        "What am I allergic to?": ["peanut"],
    }
    correct = 0
    for question, keywords in queries.items():
        resp = send_message(question)
        answer = resp.get("content", "")
        # A question counts as answered if ANY expected keyword appears.
        hit = any(kw.lower() in answer.lower() for kw in keywords)
        if hit:
            correct += 1
        icon = "OK" if hit else "??"
        print(f"  {icon} Q: {question}")
        print(f"     A: {answer[:150]}")
        time.sleep(1)

    accuracy = correct / len(queries) * 100
    results["recall"] = correct >= 3
    print(f"\n  Recall: {correct}/{len(queries)} ({accuracy:.0f}%)")
    print(f"  {'PASS' if results['recall'] else 'FAIL'} (threshold: >=3)")

    # ------------------------------------------------------------------
    # FINAL SUMMARY
    # ------------------------------------------------------------------
    section("FINAL SUMMARY")
    total = len(results)
    passed = sum(1 for v in results.values() if v)
    print()
    for name, ok in results.items():
        print(f"  [{'PASS' if ok else 'FAIL'}] {name}")
    print(f"\n  Score: {passed}/{total}")
    print(f"  Episodic: {qdrant_count('episodic')}")
    print(f"  Declarative: {qdrant_count('declarative')}")
    if passed == total:
        print("\n  ALL TESTS PASSED!")
    elif passed >= total - 1:
        print("\n  MOSTLY PASSING - minor issues only")
    else:
        print("\n  SOME TESTS FAILED - review above")
    print("\n" + "=" * 70)


if __name__ == "__main__":
    main()