fix: Phase 2 integrity review - v2.0.0 rewrite & bugfixes

Memory Consolidation Plugin (828 -> 465 lines):
- Replace SentenceTransformer with cat.embedder.embed_query() for vector consistency (see the sketch after this list)
- Fix per-user fact isolation: source=user_id instead of global
- Add duplicate fact detection (_is_duplicate_fact, score_threshold=0.85)
- Remove ~350 lines of dead async run_consolidation() code
- Remove duplicate declarative search in before_cat_sends_message
- Unify trivial patterns into TRIVIAL_PATTERNS frozenset
- Remove all sys.stderr.write debug logging
- Remove sentence-transformers from requirements.txt (no external deps)
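
Why the embedder swap matters: similarity search is only meaningful when query vectors and stored vectors come from the same model. A minimal sketch of the fixed recall path, reusing calls from the plugin code below (runs inside any hook, where the framework supplies `cat`):

    # Query with the Cat's own embedder so the query vector lives in the
    # same vector space as the stored points (the separate
    # SentenceTransformer model used before did not guarantee this).
    embedding = cat.embedder.embed_query("What is my name?")
    results = cat.memory.vectors.declarative.recall_memories_from_embedding(
        embedding=embedding,
        metadata={"source": cat.user_id},  # per-user fact isolation
        k=5,
    )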

Loguru Fix (cheshire-cat/cat/log.py):
- Patch Cat v1.6.2 loguru format to provide default extra fields (sketch below)
- Fixes KeyError: 'original_name' from third-party libs (fastembed)
- Mounted via docker-compose volume
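
The failure mode and the fix, as a minimal self-contained sketch (the real patch is in cheshire-cat/cat/log.py later in this commit; the format string here is simplified):

    import sys
    from loguru import logger

    # Cat's format references extra[original_name]; records emitted by
    # third-party loggers never bind that field, so formatting raises
    # KeyError: 'original_name'.
    fmt = "{time:HH:mm:ss} {level: <6} {extra[original_name]} {message}"

    def patch_extras(record):
        # Backfill a default; setdefault leaves explicitly bound values intact.
        record["extra"].setdefault("original_name", "(third-party)")

    logger.remove()
    logger.configure(patcher=patch_extras)
    logger.add(sys.stdout, format=fmt)

    logger.info("no bound extras, no KeyError")
    logger.bind(original_name="cat.core").info("bound values still win")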

Discord Bridge:
- Copy discord_bridge.py to cat-plugins/ (was empty directory)

Test Results (6/7 pass, 100% fact recall):
- 11 facts extracted, per-user isolation working
- Duplicate detection effective (+2 on 2nd run)
- 5/5 natural language recall queries correct
2026-02-07 19:24:46 +02:00
parent 83c103324c
commit edb88e9ede
6 changed files with 851 additions and 728 deletions


@@ -0,0 +1,109 @@
"""
Discord Bridge Plugin for Cheshire Cat
This plugin enriches Cat's memory system with Discord context:
- Unified user identity across all servers and DMs
- Guild/channel metadata for context tracking
- Minimal filtering before storage (only skip obvious junk)
- Marks memories as unconsolidated for nightly processing
Phase 1 Implementation
"""
from cat.mad_hatter.decorators import hook
from datetime import datetime
import re
@hook(priority=100)
def before_cat_reads_message(user_message_json: dict, cat) -> dict:
"""
Enrich incoming message with Discord metadata.
This runs BEFORE the message is processed.
"""
# Extract Discord context from working memory or metadata
# These will be set by the Discord bot when calling the Cat API
guild_id = cat.working_memory.get('guild_id')
channel_id = cat.working_memory.get('channel_id')
# Add to message metadata for later use
if 'metadata' not in user_message_json:
user_message_json['metadata'] = {}
user_message_json['metadata']['guild_id'] = guild_id or 'dm'
user_message_json['metadata']['channel_id'] = channel_id
user_message_json['metadata']['timestamp'] = datetime.now().isoformat()
return user_message_json
@hook(priority=100)
def before_cat_stores_episodic_memory(doc, cat):
"""
Filter and enrich memories before storage.
Phase 1: Minimal filtering
- Skip only obvious junk (1-2 char messages, pure reactions)
- Store everything else temporarily
- Mark as unconsolidated for nightly processing
"""
message = doc.page_content.strip()
# Skip only the most trivial messages
skip_patterns = [
r'^\w{1,2}$', # 1-2 character messages: "k", "ok"
r'^(lol|lmao|haha|hehe|xd|rofl)$', # Pure reactions
r'^:[\w_]+:$', # Discord emoji only: ":smile:"
]
for pattern in skip_patterns:
if re.match(pattern, message.lower()):
print(f"🗑️ [Discord Bridge] Skipping trivial message: {message}")
return None # Don't store at all
# Add Discord metadata to memory
doc.metadata['consolidated'] = False # Needs nightly processing
doc.metadata['stored_at'] = datetime.now().isoformat()
# Get Discord context from working memory
guild_id = cat.working_memory.get('guild_id')
channel_id = cat.working_memory.get('channel_id')
doc.metadata['guild_id'] = guild_id or 'dm'
doc.metadata['channel_id'] = channel_id
doc.metadata['source'] = cat.user_id # CRITICAL: Cat filters episodic by source=user_id!
doc.metadata['discord_source'] = 'discord' # Keep original value as separate field
print(f"💾 [Discord Bridge] Storing memory (unconsolidated): {message[:50]}...")
print(f" User: {cat.user_id}, Guild: {doc.metadata['guild_id']}, Channel: {channel_id}")
return doc
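# Note on the 'source' override above: Cat's episodic recall filters points
# by {"source": user_id}, so anything stored under a different 'source' value
# never comes back for that user. The platform tag is therefore kept in the
# separate 'discord_source' field instead of overwriting 'source'.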
@hook(priority=50)
def after_cat_recalls_memories(cat):
"""
Log memory recall for debugging.
Access recalled memories via cat.working_memory.
"""
import sys
sys.stderr.write("🧠 [Discord Bridge] after_cat_recalls_memories HOOK CALLED!\n")
sys.stderr.flush()
# Get recalled memories from working memory
episodic_memories = cat.working_memory.get('episodic_memories', [])
declarative_memories = cat.working_memory.get('declarative_memories', [])
if episodic_memories:
print(f"🧠 [Discord Bridge] Recalled {len(episodic_memories)} episodic memories for user {cat.user_id}")
# Show which guilds the memories are from
guilds = set()
for doc, score in episodic_memories:
guild = doc.metadata.get('guild_id', 'unknown')
guilds.add(guild)
print(f" From guilds: {', '.join(guilds)}")
# Plugin metadata
__version__ = "1.0.0"
__description__ = "Discord bridge with unified user identity and sleep consolidation support"


@@ -5,25 +5,32 @@ Phase 2: Sleep Consolidation Implementation
Implements human-like memory consolidation:
1. During the day: Store almost everything temporarily
2. On demand (or scheduled): Analyze conversations, keep important, delete trivial
3. Extract facts for declarative memory (per-user)
This mimics how human brains consolidate memories during REM sleep.
"""
from cat.mad_hatter.decorators import hook, tool
from datetime import datetime
import json
import os
from typing import List, Dict, Any
print("🌙 [Consolidation Plugin] Loading...")
print("\U0001f319 [Consolidation Plugin] Loading...")
# Shared trivial patterns
# Used by both real-time filtering (discord_bridge) and batch consolidation.
# Keep this in sync with discord_bridge's skip_patterns.
TRIVIAL_PATTERNS = frozenset([
'lol', 'k', 'ok', 'okay', 'haha', 'lmao', 'xd', 'rofl', 'lmfao',
'brb', 'gtg', 'afk', 'ttyl', 'lmk', 'idk', 'tbh', 'imo', 'imho',
'omg', 'wtf', 'fyi', 'btw', 'nvm', 'jk', 'ikr', 'smh',
'hehe', 'heh', 'gg', 'wp', 'gz', 'gj', 'ty', 'thx', 'np', 'yw',
'nice', 'cool', 'neat', 'wow', 'yep', 'nope', 'yeah', 'nah',
])
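# Both layers apply the same membership test, e.g.:
#   is_trivial = message.strip().lower() in TRIVIAL_PATTERNS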
# Consolidation state
consolidation_state = {
'last_run': None,
'is_running': False,
@@ -36,442 +43,97 @@ consolidation_state = {
}
# ===================================================================
# HOOKS
# ===================================================================
@hook(priority=50)
def after_cat_bootstrap(cat):
"""
Run after Cat starts up.
Schedule nightly consolidation task.
"""
print("🌙 [Memory Consolidation] Plugin loaded")
print(" Scheduling nightly consolidation for 3:00 AM")
# TODO: Implement scheduler (APScheduler or similar)
# For now, just log that we're ready
return None
"""Run after Cat starts up."""
print("\U0001f319 [Memory Consolidation] Plugin loaded")
print(" Manual consolidation available via 'consolidate now' command")
# NOTE: before_cat_sends_message is defined below (line ~438) with merged logic
@hook(priority=10)
def before_cat_recalls_memories(cat):
"""
Retrieve declarative facts BEFORE Cat recalls episodic memories.
This ensures facts are available when building the prompt.
Note: This hook may not execute in all Cat versions - kept for compatibility.
"""
pass # Declarative search now happens in agent_prompt_prefix
@hook(priority=45)
def after_cat_recalls_memories(cat):
"""
Hook placeholder for after memory recall.
Currently unused but kept for future enhancements.
"""
pass
# Manual trigger via agent_prompt_prefix hook
@hook(priority=10)
def agent_prompt_prefix(prefix, cat):
"""
1. Search and inject declarative facts into the prompt.
2. Handle 'consolidate now' command.
"""
    # PART 1: Inject declarative facts
    try:
        user_message_json = cat.working_memory.get('user_message_json', {})
        user_text = user_message_json.get('text', '').strip()
        if user_text and user_text.lower() not in ('consolidate', 'consolidate now', '/consolidate'):
declarative_memory = cat.memory.vectors.declarative
embedding = cat.embedder.embed_query(user_text)
results = declarative_memory.recall_memories_from_embedding(
embedding=embedding,
metadata={"source": cat.user_id},
k=5
)
            if results:
                high_confidence_facts = [
                    item[0].page_content
                    for item in results
                    if item[1] > 0.5  # Only reasonably relevant facts
                ]
if high_confidence_facts:
facts_text = "\n\n## 📝 Personal Facts About the User:\n"
facts_text = "\n\n## Personal Facts About the User:\n"
for fact in high_confidence_facts:
facts_text += f"- {fact}\n"
facts_text += "\n(Use these facts when answering the user's question)\n"
prefix += facts_text
print(f"[Declarative] Injected {len(high_confidence_facts)} facts into prompt")
print(f"[Declarative] Injected {len(high_confidence_facts)} facts into prompt")
except Exception as e:
print(f"[Declarative] Error: {e}")
print(f"[Declarative] Error: {e}")
# PART 2: Handle consolidation command
user_message = cat.working_memory.get('user_message_json', {})
user_text = user_message.get('text', '').lower().strip()
    if user_text in ['consolidate', 'consolidate now', '/consolidate']:
        print("[Consolidation] Manual trigger command received!")
        trigger_consolidation_sync(cat)
# Store the result in working memory so it can be used by other hooks
stats = consolidation_state['stats']
cat.working_memory['consolidation_triggered'] = True
cat.working_memory['consolidation_stats'] = stats
return prefix
print("✅ [Consolidation Plugin] agent_prompt_prefix hook registered")
# Intercept the response to replace with consolidation stats
@hook(priority=10)
def before_cat_sends_message(message, cat):
"""
1. Replace response with consolidation stats if consolidation was triggered.
2. Store Miku's response in episodic memory (bidirectional memory).
"""
# PART 1: Consolidation response replacement
if cat.working_memory.get('consolidation_triggered', False):
print("📝 [Consolidation] Replacing message with stats")
print("[Consolidation] Replacing message with stats")
stats = cat.working_memory.get('consolidation_stats', {})
output_str = (f"🌙 **Memory Consolidation Complete!**\n\n"
f"📊 **Stats:**\n"
output_str = (
f"\U0001f319 **Memory Consolidation Complete!**\n\n"
f"**Stats:**\n"
f"- Total processed: {stats.get('total_processed', 0)}\n"
f"- Kept: {stats.get('kept', 0)}\n"
f"- Deleted: {stats.get('deleted', 0)}\n"
f"- Facts learned: {stats.get('facts_learned', 0)}\n")
# Clear the flag
f"- Facts learned: {stats.get('facts_learned', 0)}\n"
)
cat.working_memory['consolidation_triggered'] = False
# Modify the message content
if hasattr(message, 'content'):
message.content = output_str
else:
message['content'] = output_str
# PART 2: Store Miku's response in episodic memory
try:
# Get Miku's response text
if hasattr(message, 'content'):
miku_response = message.content
elif isinstance(message, dict):
@@ -479,10 +141,7 @@ def before_cat_sends_message(message, cat):
else:
miku_response = str(message)
if miku_response and len(miku_response.strip()) > 3:
metadata = {
'source': cat.user_id,
'when': datetime.now().timestamp(),
@@ -493,39 +152,37 @@ def before_cat_sends_message(message, cat):
'channel_id': cat.working_memory.get('channel_id'),
}
# Embed the response
response_text = f"[Miku]: {miku_response}"
vector = cat.embedder.embed_query(response_text)
# Store in episodic memory
cat.memory.vectors.episodic.add_point(
content=response_text,
vector=vector,
metadata=metadata
)
print(f"💬 [Miku Memory] Stored response: {miku_response[:50]}...")
print(f"[Miku Memory] Stored response: {miku_response[:50]}...")
except Exception as e:
print(f"[Miku Memory] Error: {e}")
print(f"[Miku Memory] Error storing response: {e}")
return message
print("✅ [Consolidation Plugin] before_cat_sends_message hook registered")
# ===================================================================
# CONSOLIDATION ENGINE
# ===================================================================
def trigger_consolidation_sync(cat):
"""
Synchronous consolidation for use in hooks.
Processes ALL unconsolidated memories across all users.
"""
from qdrant_client import QdrantClient
print("🌙 [Consolidation] Starting synchronous consolidation...")
print("[Consolidation] Starting synchronous consolidation...")
# Connect to Qdrant
qdrant_host = os.getenv('QDRANT_HOST', 'localhost')
qdrant_port = int(os.getenv('QDRANT_PORT', 6333))
client = QdrantClient(host=qdrant_host, port=qdrant_port)
# Query all unconsolidated memories
@@ -542,137 +199,107 @@ def trigger_consolidation_sync(cat):
)
memories = result[0]
print(f"📊 [Consolidation] Found {len(memories)} unconsolidated memories")
print(f"[Consolidation] Found {len(memories)} unconsolidated memories")
if not memories:
consolidation_state['stats'] = {
'total_processed': 0, 'kept': 0, 'deleted': 0, 'facts_learned': 0
}
return
# Classify memories
to_delete = []
to_mark_consolidated = []
# Group user messages by source (user_id) for per-user fact extraction
user_messages_by_source = {}
for point in memories:
content = point.payload.get('page_content', '').strip()
content_lower = content.lower()
metadata = point.payload.get('metadata', {})
# Check if this is a Miku message
        is_miku_message = (
            metadata.get('speaker') == 'miku'
            or content.startswith('[Miku]:')
        )
# Check if trivial
is_trivial = content_lower in TRIVIAL_PATTERNS
if is_trivial:
to_delete.append(point.id)
else:
to_mark_consolidated.append(point.id)
# Only user messages go to fact extraction, grouped by user
if not is_miku_message:
source = metadata.get('source', 'unknown')
if source not in user_messages_by_source:
user_messages_by_source[source] = []
user_messages_by_source[source].append(point.id)
# Delete trivial memories
if to_delete:
client.delete(collection_name='episodic', points_selector=to_delete)
print(f"[Consolidation] Deleted {len(to_delete)} trivial memories")
# Mark kept memories as consolidated
    if to_mark_consolidated:
        for point_id in to_mark_consolidated:
            client.set_payload(
                collection_name='episodic',
                payload={"metadata.consolidated": True},
                points=[point_id]
            )
        print(f"[Consolidation] Marked {len(to_mark_consolidated)} memories as consolidated")
# Extract facts per user
total_facts = 0
for source_user_id, memory_ids in user_messages_by_source.items():
print(f"[Consolidation] Extracting facts for user '{source_user_id}' from {len(memory_ids)} messages...")
facts = extract_and_store_facts(client, memory_ids, cat, source_user_id)
total_facts += facts
print(f"[Consolidation] Extracted {facts} facts for user '{source_user_id}'")
consolidation_state['stats'] = {
'total_processed': len(memories),
'kept': len(to_mark_consolidated),
'deleted': len(to_delete),
'facts_learned': total_facts
}
print("[Consolidation] Synchronous consolidation complete!")
print("[Consolidation] Synchronous consolidation complete!")
return True
# ===================================================================
# FACT EXTRACTION
# ===================================================================
def extract_and_store_facts(client, memory_ids, cat, user_id):
"""
Extract declarative facts from user memories using LLM and store them.
Facts are scoped to the specific user_id.
Uses Cat's embedder to ensure vector compatibility.
Deduplicates against existing facts before storing.
"""
import uuid
if not memory_ids:
return 0
# Get memories
memories = client.retrieve(collection_name='episodic', ids=memory_ids)
facts_stored = 0
# Process in batches of 5
batch_size = 5
for i in range(0, len(memories), batch_size):
batch = memories[i:i + batch_size]
# Combine batch messages for LLM analysis
conversation_context = "\n".join([
f"- {mem.payload.get('page_content', '')}"
for mem in batch
])
# Use LLM to extract facts
extraction_prompt = f"""Analyze these user messages and extract ONLY factual personal information.
User messages:
@@ -687,34 +314,32 @@ Extract facts in this exact format (one per line):
- The user's favorite color is [color]
- The user enjoys [hobby/activity]
- The user prefers [preference]
- The user's birthday is [date]
- The user graduated from [school/university]
IMPORTANT:
- Only include facts that are CLEARLY stated in the messages
- Use the EXACT format shown above
- If no facts found, respond with: "No facts found"
- Do not include greetings, questions, or opinions
- Do not invent or assume facts not explicitly stated
"""
try:
# Call LLM
response = cat.llm(extraction_prompt)
print(f"[LLM Extract] Response:\n{response[:200]}...")
print(f"🤖 [LLM Extract] Response:\n{response[:200]}...")
# Parse LLM response for facts
lines = response.strip().split('\n')
for line in lines:
line = line.strip()
# Skip empty lines, headers, or "no facts" responses
if not line or line.lower().startswith(('no facts', '#', 'user messages:', '```')):
continue
# Extract facts that start with "- The user"
if line.startswith('- The user'):
fact_text = line[2:].strip() # Remove "- " prefix
# Determine fact type
fact_type = 'general'
fact_value = fact_text
@@ -742,11 +367,20 @@ IMPORTANT:
elif "prefers" in fact_text:
fact_type = 'preference'
fact_value = fact_text.split("prefers")[-1].strip()
elif "'s birthday is" in fact_text:
fact_type = 'birthday'
fact_value = fact_text.split("'s birthday is")[-1].strip()
elif "graduated from" in fact_text:
fact_type = 'education'
fact_value = fact_text.split("graduated from")[-1].strip()
# Duplicate detection
if _is_duplicate_fact(client, cat, fact_text, fact_type, user_id):
print(f"[Fact Skip] Duplicate: {fact_text}")
continue
# Store fact using Cat's embedder
fact_embedding = cat.embedder.embed_query(fact_text)
point_id = str(uuid.uuid4())
client.upsert(
@@ -757,71 +391,88 @@ IMPORTANT:
'payload': {
'page_content': fact_text,
'metadata': {
'source': user_id,
'when': datetime.now().timestamp(),
'fact_type': fact_type,
'fact_value': fact_value,
}
}
}]
)
facts_stored += 1
print(f"[Fact Stored] {fact_text}")
print(f"[Fact Stored] [{user_id}] {fact_text}")
except Exception as e:
print(f"[LLM Extract] Error: {e}")
print(f"[LLM Extract] Error: {e}")
import traceback
traceback.print_exc()
return facts_stored
def _is_duplicate_fact(client, cat, fact_text, fact_type, user_id):
    """
    Check if a similar fact already exists for this user.
    Uses vector similarity to detect semantic duplicates.
    """
    try:
        fact_embedding = cat.embedder.embed_query(fact_text)
        # Search existing facts for this user with same fact_type
        results = client.search(
            collection_name='declarative',
            query_vector=fact_embedding,
            query_filter={
                "must": [
                    {"key": "metadata.source", "match": {"value": user_id}},
                    {"key": "metadata.fact_type", "match": {"value": fact_type}},
                ]
            },
            limit=1,
            score_threshold=0.85  # High threshold = very similar
        )
        if results:
            existing = results[0].payload.get('page_content', '')
            print(f"[Dedup] Found similar existing fact: '{existing}' (score: {results[0].score:.2f})")
            return True
        return False
    except Exception as e:
        print(f"[Dedup] Error checking duplicates: {e}")
        return False  # On error, allow storing
# ===================================================================
# TOOL (for Cat's tool system)
# ===================================================================
@tool(return_direct=True)
def consolidate_memories(tool_input, cat):
    """Use this tool to consolidate memories. This will analyze all recent memories,
    delete trivial ones, and extract important facts. Input is always an empty string."""
    print("[Consolidation] Tool called!")
    trigger_consolidation_sync(cat)
    stats = consolidation_state['stats']
    return (
        f"\U0001f319 **Memory Consolidation Complete!**\n\n"
        f"**Stats:**\n"
        f"- Total processed: {stats['total_processed']}\n"
        f"- Kept: {stats['kept']}\n"
        f"- Deleted: {stats['deleted']}\n"
        f"- Facts learned: {stats['facts_learned']}\n"
    )
# ===================================================================
# PLUGIN METADATA
# ===================================================================
__version__ = "2.0.0"
__description__ = "Sleep consolidation - analyze memories, keep important, delete trivial, extract per-user facts"
print("[Consolidation Plugin] All hooks registered")


@@ -1 +0,0 @@
sentence-transformers>=2.2.0

cheshire-cat/cat/log.py (new file, 246 lines)

@@ -0,0 +1,246 @@
"""The log engine."""
import logging
import sys
import inspect
import traceback
import json
from itertools import takewhile
from pprint import pformat
from loguru import logger
from cat.env import get_env
def get_log_level():
"""Return the global LOG level."""
return get_env("CCAT_LOG_LEVEL")
class CatLogEngine:
"""The log engine.
Engine to filter the logs in the terminal according to the level of severity.
Attributes
----------
LOG_LEVEL : str
Level of logging set in the `.env` file.
Notes
-----
The logging level set in the `.env` file will print all the logs from that level to above.
Available levels are:
- `DEBUG`
- `INFO`
- `WARNING`
- `ERROR`
- `CRITICAL`
Default to `INFO`.
"""
def __init__(self):
self.LOG_LEVEL = get_log_level()
self.default_log()
# workaround for pdfminer logging
# https://github.com/pdfminer/pdfminer.six/issues/347
logging.getLogger("pdfminer").setLevel(logging.WARNING)
def show_log_level(self, record):
"""Allows to show stuff in the log based on the global setting.
Parameters
----------
record : dict
Returns
-------
bool
"""
return record["level"].no >= logger.level(self.LOG_LEVEL).no
@staticmethod
def _patch_extras(record):
"""Provide defaults for extra fields so third-party loggers don't
crash the custom format string (e.g. fastembed deprecation warnings)."""
record["extra"].setdefault("original_name", "(third-party)")
record["extra"].setdefault("original_class", "")
record["extra"].setdefault("original_caller", "")
record["extra"].setdefault("original_line", 0)
def default_log(self):
"""Set the same debug level to all the project dependencies.
Returns
-------
"""
time = "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>"
level = "<level>{level: <6}</level>"
origin = "<level>{extra[original_name]}.{extra[original_class]}.{extra[original_caller]}::{extra[original_line]}</level>"
message = "<level>{message}</level>"
log_format = f"{time} {level} {origin} \n{message}"
logger.remove()
logger.configure(patcher=self._patch_extras)
if self.LOG_LEVEL == "DEBUG":
return logger.add(
sys.stdout,
colorize=True,
format=log_format,
backtrace=True,
diagnose=True,
filter=self.show_log_level
)
else:
return logger.add(
sys.stdout,
colorize=True,
format=log_format,
filter=self.show_log_level,
level=self.LOG_LEVEL
)
def get_caller_info(self, skip=3):
"""Get the name of a caller in the format module.class.method.
Copied from: https://gist.github.com/techtonik/2151727
Parameters
----------
skip : int
Specifies how many levels of stack to skip while getting caller name.
Returns
-------
package : str
Caller package.
module : str
Caller module.
klass : str
Caller classname if one otherwise None.
caller : str
Caller function or method (if a class exist).
line : int
The line of the call.
Notes
-----
skip=1 means "who calls me",
skip=2 "who calls my caller" etc.
An empty string is returned if skipped levels exceed stack height.
"""
stack = inspect.stack()
start = 0 + skip
if len(stack) < start + 1:
return ""
parentframe = stack[start][0]
# module and packagename.
module_info = inspect.getmodule(parentframe)
if module_info:
mod = module_info.__name__.split(".")
package = mod[0]
module = ".".join(mod[1:])
# class name.
klass = ""
if "self" in parentframe.f_locals:
klass = parentframe.f_locals["self"].__class__.__name__
# method or function name.
caller = None
if parentframe.f_code.co_name != "<module>": # top level usually
caller = parentframe.f_code.co_name
# call line.
line = parentframe.f_lineno
# Remove reference to frame
# See: https://docs.python.org/3/library/inspect.html#the-interpreter-stack
del parentframe
return package, module, klass, caller, line
def __call__(self, msg, level="DEBUG"):
"""Alias of self.log()"""
self.log(msg, level)
def debug(self, msg):
"""Logs a DEBUG message"""
self.log(msg, level="DEBUG")
def info(self, msg):
"""Logs an INFO message"""
self.log(msg, level="INFO")
def warning(self, msg):
"""Logs a WARNING message"""
self.log(msg, level="WARNING")
def error(self, msg):
"""Logs an ERROR message"""
self.log(msg, level="ERROR")
def critical(self, msg):
"""Logs a CRITICAL message"""
self.log(msg, level="CRITICAL")
def log(self, msg, level="DEBUG"):
"""Log a message
Parameters
----------
msg :
Message to be logged.
level : str
Logging level."""
(package, module, klass, caller, line) = self.get_caller_info()
custom_logger = logger.bind(
original_name=f"{package}.{module}",
original_line=line,
original_class=klass,
original_caller=caller,
)
# prettify
if type(msg) in [dict, list, str]: # TODO: should be recursive
try:
msg = json.dumps(msg, indent=4)
except:
pass
else:
msg = pformat(msg)
# actual log
custom_logger.log(level, msg)
def welcome(self):
"""Welcome message in the terminal."""
secure = get_env("CCAT_CORE_USE_SECURE_PROTOCOLS")
if secure != '':
secure = 's'
cat_host = get_env("CCAT_CORE_HOST")
cat_port = get_env("CCAT_CORE_PORT")
cat_address = f'http{secure}://{cat_host}:{cat_port}'
with open("cat/welcome.txt", 'r') as f:
print(f.read())
print('\n=============== ^._.^ ===============\n')
print(f'Cat REST API: {cat_address}/docs')
print(f'Cat PUBLIC: {cat_address}/public')
print(f'Cat ADMIN: {cat_address}/admin\n')
print('======================================')
# logger instance
log = CatLogEngine()


@@ -0,0 +1,60 @@
services:
cheshire-cat-core:
image: ghcr.io/cheshire-cat-ai/core:1.6.2
container_name: miku_cheshire_cat_test
depends_on:
- cheshire-cat-vector-memory
environment:
PYTHONUNBUFFERED: "1"
WATCHFILES_FORCE_POLLING: "true"
CORE_HOST: ${CORE_HOST:-localhost}
CORE_PORT: ${CORE_PORT:-1865}
QDRANT_HOST: ${QDRANT_HOST:-cheshire-cat-vector-memory}
QDRANT_PORT: ${QDRANT_PORT:-6333}
CORE_USE_SECURE_PROTOCOLS: ${CORE_USE_SECURE_PROTOCOLS:-false}
API_KEY: ${API_KEY:-}
LOG_LEVEL: ${LOG_LEVEL:-INFO}
DEBUG: ${DEBUG:-true}
SAVE_MEMORY_SNAPSHOTS: ${SAVE_MEMORY_SNAPSHOTS:-false}
OPENAI_API_BASE: "http://host.docker.internal:8091/v1"
ports:
- "${CORE_PORT:-1865}:80"
# Allow connection to host services (llama-swap)
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- ./cat/static:/app/cat/static
- ./cat/plugins:/app/cat/plugins
- ./cat/data:/app/cat/data
- ./cat/log.py:/app/cat/log.py # Patched: fix loguru KeyError for third-party libs
restart: unless-stopped
networks:
- miku-test-network
- miku-discord_default # Connect to existing miku bot network
cheshire-cat-vector-memory:
image: qdrant/qdrant:v1.9.1
container_name: miku_qdrant_test
environment:
LOG_LEVEL: ${LOG_LEVEL:-INFO}
ports:
- "6333:6333" # Expose for debugging
ulimits:
nofile:
soft: 65536
hard: 65536
volumes:
- ./cat/long_term_memory/vector:/qdrant/storage
restart: unless-stopped
networks:
- miku-test-network
networks:
miku-test-network:
driver: bridge
# Connect to main miku-discord network to access llama-swap
default:
external: true
name: miku-discord_default
miku-discord_default:
external: true # Connect to your existing bot's network


@@ -1,196 +1,254 @@
#!/usr/bin/env python3
"""
Full Pipeline Test for Memory Consolidation System v2.0.0
"""
import requests
import time
import json
import sys
CAT_URL = "http://localhost:1865"
QDRANT_URL = "http://localhost:6333"
CONSOLIDATION_TIMEOUT = 180
def send_message(text, timeout=30):
try:
resp = requests.post(f"{CAT_URL}/message", json={"text": text}, timeout=timeout)
resp.raise_for_status()
return resp.json()
except requests.exceptions.Timeout:
return {"error": "timeout", "content": ""}
except Exception as e:
return {"error": str(e), "content": ""}
def qdrant_scroll(collection, limit=200, filt=None):
body = {"limit": limit, "with_payload": True, "with_vector": False}
if filt:
body["filter"] = filt
resp = requests.post(f"{QDRANT_URL}/collections/{collection}/points/scroll", json=body)
return resp.json()["result"]["points"]
def qdrant_count(collection):
return len(qdrant_scroll(collection))
def section(title):
print(f"\n{'=' * 70}")
print(f" {title}")
print(f"{'=' * 70}")
print("=" * 70)
print("🧪 FULL PIPELINE TEST - Memory Consolidation System")
print(" FULL PIPELINE TEST - Memory Consolidation v2.0.0")
print("=" * 70)
try:
requests.get(f"{CAT_URL}/", timeout=5)
except Exception:
print("ERROR: Cat not reachable"); sys.exit(1)
try:
requests.get(f"{QDRANT_URL}/collections", timeout=5)
except Exception:
print("ERROR: Qdrant not reachable"); sys.exit(1)
episodic_start = qdrant_count("episodic")
declarative_start = qdrant_count("declarative")
print(f"\nStarting state: {episodic_start} episodic, {declarative_start} declarative")
results = {}
# TEST 1: Trivial Message Filtering
print("\n📋 TEST 1: Trivial Message Filtering")
print("-" * 70)
section("TEST 1: Trivial Message Filtering")
trivial_messages = ["lol", "k", "ok", "haha", "xd"]
important_message = "My name is Alex and I live in Seattle"
print("Sending trivial messages (should be filtered out)...")
trivial_messages = ["lol", "k", "ok", "haha", "xd", "brb"]
print(f"Sending {len(trivial_messages)} trivial messages...")
for msg in trivial_messages:
send_message(msg)
time.sleep(0.3)
print("Sending important message...")
send_message(important_message)
time.sleep(1)
# Count only USER episodic memories (exclude Miku's responses)
user_episodic = qdrant_scroll("episodic", filt={
"must_not": [{"key": "metadata.speaker", "match": {"value": "miku"}}]
})
trivial_user_stored = len(user_episodic) - episodic_start
episodic_after_trivial = qdrant_count("episodic")
# discord_bridge filters trivial user messages, but Miku still responds
# so we only check user-side storage
if trivial_user_stored < len(trivial_messages):
print(f" PASS - Only {trivial_user_stored}/{len(trivial_messages)} user trivial messages stored")
print(f" (Total episodic incl. Miku responses: {episodic_after_trivial})")
results["trivial_filtering"] = True
else:
print(" ⚠️ Trivial filtering may not be active")
print(f" WARN - All {trivial_user_stored} trivial messages stored")
results["trivial_filtering"] = False
# TEST 2: Important Message Storage
section("TEST 2: Important Message Storage")
print("Sending message and checking if Miku's response is stored...")
resp = send_message("Tell me a very short fact about music")
miku_said = resp["content"]
print(f"Miku said: {miku_said[:80]}...")
time.sleep(2)
# Check for Miku's messages in episodic
resp = requests.post(
"http://localhost:6333/collections/episodic/points/scroll",
json={
"limit": 100,
"with_payload": True,
"with_vector": False,
"filter": {"must": [{"key": "metadata.speaker", "match": {"value": "miku"}}]}
}
)
miku_messages = resp.json()["result"]["points"]
print(f"\n✅ Miku's messages in memory: {len(miku_messages)}")
if miku_messages:
print(f" Example: {miku_messages[0]['payload']['page_content'][:60]}...")
print(" ✓ Bidirectional memory working!")
else:
print(" ⚠️ Miku's responses not being stored")
personal_facts = [
"My name is Sarah Chen",
"I'm 28 years old",
"I work as a data scientist at Google",
"My favorite color is blue",
"I love playing piano",
"I live in Seattle, Washington",
"I work as a software engineer at Microsoft",
"My favorite color is forest green",
"I love playing piano and have practiced for 15 years",
"I'm learning Japanese, currently at N3 level",
"I have a cat named Luna",
"I'm allergic to peanuts",
"I live in Tokyo, Japan",
"My hobbies include photography and hiking"
"My birthday is March 15th",
"I graduated from UW in 2018",
"I enjoy hiking on weekends",
]
print(f"Adding {len(personal_info)} messages with personal information...")
for info in personal_info:
send_message(info)
print(f"Sending {len(personal_facts)} personal info messages...")
for i, fact in enumerate(personal_facts, 1):
resp = send_message(fact)
status = "OK" if "error" not in resp else "ERR"
print(f" [{i}/{len(personal_facts)}] {status} {fact[:50]}")
time.sleep(0.5)
time.sleep(1)
episodic_after_personal = qdrant_count("episodic")
personal_stored = episodic_after_personal - episodic_after_trivial
print(f"\n Episodic memories from personal info: {personal_stored}")
results["important_storage"] = personal_stored >= len(personal_facts)
print(f" {'PASS' if results['important_storage'] else 'FAIL'} - Expected >={len(personal_facts)}, got {personal_stored}")
# TEST 3: Miku Response Storage
section("TEST 3: Bidirectional Memory (Miku Response Storage)")
print("Triggering consolidation...")
resp = send_message("consolidate now")
consolidation_result = resp["content"]
print(f"\n{consolidation_result}")
miku_points = qdrant_scroll("episodic", filt={
"must": [{"key": "metadata.speaker", "match": {"value": "miku"}}]
})
print(f" Miku's memories in episodic: {len(miku_points)}")
if miku_points:
print(f" Sample: \"{miku_points[0]['payload']['page_content'][:70]}\"")
results["miku_storage"] = True
print(" PASS")
else:
results["miku_storage"] = False
print(" FAIL - No Miku responses in episodic memory")
time.sleep(2)
# TEST 4: Per-User Source Tagging
section("TEST 4: Per-User Source Tagging")
user_points = qdrant_scroll("episodic", filt={
"must": [{"key": "metadata.source", "match": {"value": "user"}}]
})
print(f" Points with source='user': {len(user_points)}")
global_points = qdrant_scroll("episodic", filt={
"must": [{"key": "metadata.source", "match": {"value": "global"}}]
})
print(f" Points with source='global' (old bug): {len(global_points)}")
results["user_tagging"] = len(user_points) > 0 and len(global_points) == 0
print(f" {'PASS' if results['user_tagging'] else 'FAIL'}")
# TEST 5: Memory Consolidation
section("TEST 5: Memory Consolidation & Fact Extraction")
print("Testing fact recall with queries...")
correct_recalls = 0
for query in queries:
resp = send_message(query)
answer = resp["content"]
print(f"\n{query}")
print(f"💬 Miku: {answer[:150]}...")
print(f" Triggering consolidation (timeout={CONSOLIDATION_TIMEOUT}s)...")
t0 = time.time()
resp = send_message("consolidate now", timeout=CONSOLIDATION_TIMEOUT)
elapsed = time.time() - t0
if "error" in resp:
print(f" WARN - HTTP issue: {resp['error']} ({elapsed:.0f}s)")
print(" Waiting 60s for background completion...")
time.sleep(60)
else:
print(f" Completed in {elapsed:.1f}s")
content = resp.get("content", "")
print(f" Response: {content[:120]}...")
time.sleep(3)
declarative_after = qdrant_count("declarative")
new_facts = declarative_after - declarative_start
print(f"\n Declarative facts: {declarative_start} -> {declarative_after} (+{new_facts})")
results["consolidation"] = new_facts >= 5
print(f" {'PASS' if results['consolidation'] else 'FAIL'} - {'>=5 facts' if results['consolidation'] else f'only {new_facts}'}")
all_facts = qdrant_scroll("declarative")
print(f"\n All declarative facts ({len(all_facts)}):")
for i, f in enumerate(all_facts, 1):
content = f["payload"]["page_content"]
meta = f["payload"].get("metadata", {})
source = meta.get("source", "?")
ftype = meta.get("fact_type", "?")
print(f" {i}. [{source}|{ftype}] {content}")
# TEST 6: Duplicate Detection
section("TEST 6: Duplicate Detection (2nd consolidation)")
facts_before_2nd = qdrant_count("declarative")
print(f" Facts before: {facts_before_2nd}")
print(f" Running consolidation again...")
resp = send_message("consolidate now", timeout=CONSOLIDATION_TIMEOUT)
time.sleep(3)
facts_after_2nd = qdrant_count("declarative")
new_dupes = facts_after_2nd - facts_before_2nd
print(f" Facts after: {facts_after_2nd} (+{new_dupes})")
results["dedup"] = new_dupes <= 2
print(f" {'PASS' if results['dedup'] else 'FAIL'} - {new_dupes} new facts (<=2 expected)")
# TEST 7: Fact Recall
section("TEST 7: Fact Recall via Natural Language")
queries = {
"What is my name?": ["sarah", "chen"],
"How old am I?": ["28"],
"Where do I work?": ["Google", "data scientist"],
"What's my favorite color?": ["blue"],
"What am I allergic to?": ["peanut"]
"Where do I live?": ["seattle"],
"Where do I work?": ["microsoft", "software engineer"],
"What am I allergic to?": ["peanut"],
}
correct = 0
for question, keywords in queries.items():
resp = send_message(question)
answer = resp.get("content", "")
hit = any(kw.lower() in answer.lower() for kw in keywords)
if hit:
correct += 1
icon = "OK" if hit else "??"
print(f" {icon} Q: {question}")
print(f" A: {answer[:150]}")
time.sleep(1)
print(f"\n✅ Fact recall accuracy: {correct_recalls}/{len(queries)} ({correct_recalls/len(queries)*100:.0f}%)")
accuracy = correct / len(queries) * 100
results["recall"] = correct >= 3
print(f"\n Recall: {correct}/{len(queries)} ({accuracy:.0f}%)")
print(f" {'PASS' if results['recall'] else 'FAIL'} (threshold: >=3)")
# FINAL SUMMARY
section("FINAL SUMMARY")
print("Asking about conversation history...")
resp = send_message("What have we talked about today?")
summary = resp["content"]
print(f"💬 Miku's summary:\n{summary}")
total = len(results)
passed = sum(1 for v in results.values() if v)
print()
for name, ok in results.items():
print(f" [{'PASS' if ok else 'FAIL'}] {name}")
print(f"\n Score: {passed}/{total}")
print(f" Episodic: {qdrant_count('episodic')}")
print(f" Declarative: {qdrant_count('declarative')}")
# Overall verdict
if passed == total:
print("\n ALL TESTS PASSED!")
elif passed >= total - 1:
print("\n MOSTLY PASSING - minor issues only")
else:
print("\n⚠️ PIPELINE TEST: PARTIAL PASS")
print(" Some components may need adjustment")
print("\n SOME TESTS FAILED - review above")
print("\n" + "=" * 70)