diff --git a/cat-plugins/discord_bridge/discord_bridge.py b/cat-plugins/discord_bridge/discord_bridge.py new file mode 100644 index 0000000..dcd8a41 --- /dev/null +++ b/cat-plugins/discord_bridge/discord_bridge.py @@ -0,0 +1,109 @@ +""" +Discord Bridge Plugin for Cheshire Cat + +This plugin enriches Cat's memory system with Discord context: +- Unified user identity across all servers and DMs +- Guild/channel metadata for context tracking +- Minimal filtering before storage (only skip obvious junk) +- Marks memories as unconsolidated for nightly processing + +Phase 1 Implementation +""" + +from cat.mad_hatter.decorators import hook +from datetime import datetime +import re + + +@hook(priority=100) +def before_cat_reads_message(user_message_json: dict, cat) -> dict: + """ + Enrich incoming message with Discord metadata. + This runs BEFORE the message is processed. + """ + # Extract Discord context from working memory or metadata + # These will be set by the Discord bot when calling the Cat API + guild_id = cat.working_memory.get('guild_id') + channel_id = cat.working_memory.get('channel_id') + + # Add to message metadata for later use + if 'metadata' not in user_message_json: + user_message_json['metadata'] = {} + + user_message_json['metadata']['guild_id'] = guild_id or 'dm' + user_message_json['metadata']['channel_id'] = channel_id + user_message_json['metadata']['timestamp'] = datetime.now().isoformat() + + return user_message_json + + +@hook(priority=100) +def before_cat_stores_episodic_memory(doc, cat): + """ + Filter and enrich memories before storage. 
+ + Phase 1: Minimal filtering + - Skip only obvious junk (1-2 char messages, pure reactions) + - Store everything else temporarily + - Mark as unconsolidated for nightly processing + """ + message = doc.page_content.strip() + + # Skip only the most trivial messages + skip_patterns = [ + r'^\w{1,2}$', # 1-2 character messages: "k", "ok" + r'^(lol|lmao|haha|hehe|xd|rofl)$', # Pure reactions + r'^:[\w_]+:$', # Discord emoji only: ":smile:" + ] + + for pattern in skip_patterns: + if re.match(pattern, message.lower()): + print(f"๐Ÿ—‘๏ธ [Discord Bridge] Skipping trivial message: {message}") + return None # Don't store at all + + # Add Discord metadata to memory + doc.metadata['consolidated'] = False # Needs nightly processing + doc.metadata['stored_at'] = datetime.now().isoformat() + + # Get Discord context from working memory + guild_id = cat.working_memory.get('guild_id') + channel_id = cat.working_memory.get('channel_id') + + doc.metadata['guild_id'] = guild_id or 'dm' + doc.metadata['channel_id'] = channel_id + doc.metadata['source'] = cat.user_id # CRITICAL: Cat filters episodic by source=user_id! + doc.metadata['discord_source'] = 'discord' # Keep original value as separate field + + print(f"๐Ÿ’พ [Discord Bridge] Storing memory (unconsolidated): {message[:50]}...") + print(f" User: {cat.user_id}, Guild: {doc.metadata['guild_id']}, Channel: {channel_id}") + + return doc + + +@hook(priority=50) +def after_cat_recalls_memories(cat): + """ + Log memory recall for debugging. + Access recalled memories via cat.working_memory. 
+ """ + import sys + sys.stderr.write("๐Ÿง  [Discord Bridge] after_cat_recalls_memories HOOK CALLED!\n") + sys.stderr.flush() + + # Get recalled memories from working memory + episodic_memories = cat.working_memory.get('episodic_memories', []) + declarative_memories = cat.working_memory.get('declarative_memories', []) + + if episodic_memories: + print(f"๐Ÿง  [Discord Bridge] Recalled {len(episodic_memories)} episodic memories for user {cat.user_id}") + # Show which guilds the memories are from + guilds = set() + for doc, score in episodic_memories: + guild = doc.metadata.get('guild_id', 'unknown') + guilds.add(guild) + print(f" From guilds: {', '.join(guilds)}") + + +# Plugin metadata +__version__ = "1.0.0" +__description__ = "Discord bridge with unified user identity and sleep consolidation support" diff --git a/cat-plugins/memory_consolidation/memory_consolidation.py b/cat-plugins/memory_consolidation/memory_consolidation.py index 9a20cf6..e863f53 100644 --- a/cat-plugins/memory_consolidation/memory_consolidation.py +++ b/cat-plugins/memory_consolidation/memory_consolidation.py @@ -5,25 +5,32 @@ Phase 2: Sleep Consolidation Implementation Implements human-like memory consolidation: 1. During the day: Store almost everything temporarily -2. At night (3 AM): Analyze conversations, keep important, delete trivial -3. Extract facts for declarative memory +2. On demand (or scheduled): Analyze conversations, keep important, delete trivial +3. Extract facts for declarative memory (per-user) This mimics how human brains consolidate memories during REM sleep. 
""" -from cat.mad_hatter.decorators import hook, plugin, tool -from cat.mad_hatter.decorators import CatHook -from datetime import datetime, timedelta +from cat.mad_hatter.decorators import hook, tool +from datetime import datetime import json -import asyncio import os from typing import List, Dict, Any -print("๐ŸŒ™ [Consolidation Plugin] Loading...") +print("\U0001f319 [Consolidation Plugin] Loading...") +# Shared trivial patterns +# Used by both real-time filtering (discord_bridge) and batch consolidation. +# Keep this in sync with discord_bridge's skip_patterns. +TRIVIAL_PATTERNS = frozenset([ + 'lol', 'k', 'ok', 'okay', 'haha', 'lmao', 'xd', 'rofl', 'lmfao', + 'brb', 'gtg', 'afk', 'ttyl', 'lmk', 'idk', 'tbh', 'imo', 'imho', + 'omg', 'wtf', 'fyi', 'btw', 'nvm', 'jk', 'ikr', 'smh', + 'hehe', 'heh', 'gg', 'wp', 'gz', 'gj', 'ty', 'thx', 'np', 'yw', + 'nice', 'cool', 'neat', 'wow', 'yep', 'nope', 'yeah', 'nah', +]) - -# Store consolidation state +# Consolidation state consolidation_state = { 'last_run': None, 'is_running': False, @@ -36,453 +43,105 @@ consolidation_state = { } -async def consolidate_user_memories(user_id: str, memories: List[Any], cat) -> Dict[str, Any]: - """ - Analyze all of a user's conversations from the day in ONE LLM call. - - This is the core intelligence - Miku sees patterns, themes, relationship evolution. - """ - - # Build conversation timeline - timeline = [] - for mem in sorted(memories, key=lambda m: m.metadata.get('stored_at', '')): - timeline.append({ - 'time': mem.metadata.get('stored_at', ''), - 'guild': mem.metadata.get('guild_id', 'unknown'), - 'channel': mem.metadata.get('channel_id', 'unknown'), - 'content': mem.page_content[:200] # Truncate for context window - }) - - # Build consolidation prompt - consolidation_prompt = f"""You are Miku, reviewing your conversations with user {user_id} from today. -Look at the full timeline and decide what's worth remembering long-term. 
- -Timeline of {len(timeline)} conversations: -{json.dumps(timeline, indent=2)} - -Analyze holistically: -1. What did you learn about this person today? -2. Any recurring themes or important moments? -3. How did your relationship with them evolve? -4. Which conversations were meaningful vs casual chitchat? - -For EACH conversation (by index), decide: -- keep: true/false (should this go to long-term memory?) -- importance: 1-10 (10 = life-changing event, 1 = forget immediately) -- categories: list of ["personal", "preference", "emotional", "event", "relationship"] -- insights: What did you learn? (for declarative memory) -- summary: One sentence for future retrieval - -Respond with VALID JSON (no extra text): -{{ - "day_summary": "One sentence about this person based on today", - "relationship_change": "How your relationship evolved (if at all)", - "conversations": [ - {{ - "index": 0, - "keep": true, - "importance": 8, - "categories": ["personal", "emotional"], - "insights": "User struggles with anxiety, needs support", - "summary": "User opened up about their anxiety" - }}, - {{ - "index": 1, - "keep": false, - "importance": 2, - "categories": [], - "insights": null, - "summary": "Just casual greeting" - }} - ], - "new_facts": [ - "User has anxiety", - "User trusts Miku enough to open up" - ] -}} -""" - - try: - # Call LLM for analysis - print(f"๐ŸŒ™ [Consolidation] Analyzing {len(memories)} memories for {user_id}...") - - # Use the Cat's LLM - from cat.looking_glass.cheshire_cat import CheshireCat - response = cat.llm(consolidation_prompt) - - # Parse JSON response - # Remove markdown code blocks if present - response = response.strip() - if response.startswith('```'): - response = response.split('```')[1] - if response.startswith('json'): - response = response[4:] - - analysis = json.loads(response) - - return analysis - - except json.JSONDecodeError as e: - print(f"โŒ [Consolidation] Failed to parse LLM response: {e}") - print(f" Response: 
{response[:200]}...") - # Default: keep everything if parsing fails - return { - "day_summary": "Unable to analyze", - "relationship_change": "Unknown", - "conversations": [ - {"index": i, "keep": True, "importance": 5, "categories": [], "insights": None, "summary": "Kept by default"} - for i in range(len(memories)) - ], - "new_facts": [] - } - except Exception as e: - print(f"โŒ [Consolidation] Error during analysis: {e}") - return { - "day_summary": "Error during analysis", - "relationship_change": "Unknown", - "conversations": [ - {"index": i, "keep": True, "importance": 5, "categories": [], "insights": None, "summary": "Kept by default"} - for i in range(len(memories)) - ], - "new_facts": [] - } - - -async def run_consolidation(cat): - """ - Main consolidation task. - Run at 3 AM or on-demand via admin endpoint. - """ - - if consolidation_state['is_running']: - print("โš ๏ธ [Consolidation] Already running, skipping...") - return - - try: - consolidation_state['is_running'] = True - print(f"๐ŸŒ™ [Consolidation] Starting memory consolidation at {datetime.now()}") - - # Get episodic memory collection - print("๐Ÿ“Š [Consolidation] Fetching unconsolidated memories...") - - episodic_memory = cat.memory.vectors.episodic - - # Get all points from episodic memory - # Qdrant API: scroll through all points - try: - from qdrant_client.models import Filter, FieldCondition, MatchValue - - # Query for unconsolidated memories - # Filter by consolidated=False - filter_condition = Filter( - must=[ - FieldCondition( - key="metadata.consolidated", - match=MatchValue(value=False) - ) - ] - ) - - # Get all unconsolidated memories - results = episodic_memory.client.scroll( - collection_name=episodic_memory.collection_name, - scroll_filter=filter_condition, - limit=1000, # Max per batch - with_payload=True, - with_vectors=False - ) - - memories = results[0] if results else [] - - print(f"๐Ÿ“Š [Consolidation] Found {len(memories)} unconsolidated memories") - - if len(memories) == 0: 
- print("โœจ [Consolidation] No memories to consolidate!") - return - - # Group by user_id - memories_by_user = {} - for point in memories: - # Extract user_id from metadata or ID - user_id = point.payload.get('metadata', {}).get('user_id', 'unknown') - if user_id == 'unknown': - # Try to extract from ID format - continue - - if user_id not in memories_by_user: - memories_by_user[user_id] = [] - - memories_by_user[user_id].append(point) - - print(f"๐Ÿ“Š [Consolidation] Processing {len(memories_by_user)} users") - - # Process each user - total_kept = 0 - total_deleted = 0 - total_processed = 0 - - for user_id, user_memories in memories_by_user.items(): - print(f"\n๐Ÿ‘ค [Consolidation] Processing user: {user_id} ({len(user_memories)} memories)") - - # Simulate consolidation for now - # In Phase 2 complete, this will call consolidate_user_memories() - for memory in user_memories: - total_processed += 1 - - # Simple heuristic for testing - content = memory.payload.get('page_content', '') - - # Delete if very short or common reactions - if len(content.strip()) <= 2 or content.lower().strip() in ['lol', 'k', 'ok', 'okay', 'haha']: - print(f" ๐Ÿ—‘๏ธ Deleting: {content[:50]}") - # Delete from Qdrant - episodic_memory.client.delete( - collection_name=episodic_memory.collection_name, - points_selector=[memory.id] - ) - total_deleted += 1 - else: - print(f" ๐Ÿ’พ Keeping: {content[:50]}") - # Mark as consolidated - payload = memory.payload - if 'metadata' not in payload: - payload['metadata'] = {} - payload['metadata']['consolidated'] = True - payload['metadata']['importance'] = 5 # Default importance - - # Update in Qdrant - episodic_memory.client.set_payload( - collection_name=episodic_memory.collection_name, - payload=payload, - points=[memory.id] - ) - total_kept += 1 - - consolidation_state['stats']['total_processed'] = total_processed - consolidation_state['stats']['kept'] = total_kept - consolidation_state['stats']['deleted'] = total_deleted - 
consolidation_state['last_run'] = datetime.now() - - print(f"\nโœจ [Consolidation] Complete! Stats:") - print(f" Processed: {total_processed}") - print(f" Kept: {total_kept}") - print(f" Deleted: {total_deleted}") - print(f" Facts learned: {consolidation_state['stats']['facts_learned']}") - - except Exception as e: - print(f"โŒ [Consolidation] Error querying memories: {e}") - import traceback - traceback.print_exc() - - except Exception as e: - print(f"โŒ [Consolidation] Error: {e}") - import traceback - traceback.print_exc() - finally: - consolidation_state['is_running'] = False - +# =================================================================== +# HOOKS +# =================================================================== @hook(priority=50) def after_cat_bootstrap(cat): - """ - Run after Cat starts up. - Schedule nightly consolidation task. - """ - print("๐ŸŒ™ [Memory Consolidation] Plugin loaded") - print(" Scheduling nightly consolidation for 3:00 AM") - - # TODO: Implement scheduler (APScheduler or similar) - # For now, just log that we're ready - - return None + """Run after Cat starts up.""" + print("\U0001f319 [Memory Consolidation] Plugin loaded") + print(" Manual consolidation available via 'consolidate now' command") - - -# NOTE: before_cat_sends_message is defined below (line ~438) with merged logic - - -@hook(priority=10) -def before_cat_recalls_memories(cat): - """ - Retrieve declarative facts BEFORE Cat recalls episodic memories. - This ensures facts are available when building the prompt. - Note: This hook may not execute in all Cat versions - kept for compatibility. - """ - pass # Declarative search now happens in agent_prompt_prefix - - -@hook(priority=45) -def after_cat_recalls_memories(cat): - """ - Hook placeholder for after memory recall. - Currently unused but kept for future enhancements. - """ - pass - - - - -# Manual trigger via agent_prompt_prefix hook @hook(priority=10) def agent_prompt_prefix(prefix, cat): """ - 1. 
Search and inject declarative facts into the prompt - 2. Handle admin commands like 'consolidate now' + Runs AFTER miku_personality (priority 100) sets the base prompt. + 1. Search and inject declarative facts into the prompt. + 2. Handle 'consolidate now' command. """ - # PART 1: Search for declarative facts and inject into prompt + user_message_json = cat.working_memory.get('user_message_json', {}) + user_text = user_message_json.get('text', '').strip() + + # PART 1: Inject declarative facts try: - user_message_json = cat.working_memory.get('user_message_json', {}) - user_text = user_message_json.get('text', '').strip() - - if user_text: - # Search declarative memory + if user_text and user_text.lower() not in ('consolidate', 'consolidate now', '/consolidate'): declarative_memory = cat.memory.vectors.declarative embedding = cat.embedder.embed_query(user_text) - + results = declarative_memory.recall_memories_from_embedding( embedding=embedding, - metadata=None, + metadata={"source": cat.user_id}, k=5 ) - + if results: - high_confidence_facts = [] - for item in results: - doc = item[0] - score = item[1] - if score > 0.5: # Only reasonably relevant facts - high_confidence_facts.append(doc.page_content) - + high_confidence_facts = [ + item[0].page_content + for item in results + if item[1] > 0.5 + ] + if high_confidence_facts: - facts_text = "\n\n## ๐Ÿ“ Personal Facts About the User:\n" + facts_text = "\n\n## Personal Facts About the User:\n" for fact in high_confidence_facts: facts_text += f"- {fact}\n" facts_text += "\n(Use these facts when answering the user's question)\n" prefix += facts_text - print(f"โœ… [Declarative] Injected {len(high_confidence_facts)} facts into prompt") - + print(f"[Declarative] Injected {len(high_confidence_facts)} facts into prompt") + except Exception as e: - print(f"โŒ [Declarative] Error: {e}") - + print(f"[Declarative] Error: {e}") + # PART 2: Handle consolidation command - user_message = cat.working_memory.get('user_message_json', 
{}) - user_text = user_message.get('text', '').lower().strip() - - if user_text in ['consolidate', 'consolidate now', '/consolidate']: - print("๐Ÿ”ง [Consolidation] Manual trigger command received!") - - # Run consolidation synchronously - import asyncio - try: - # Try to get the current event loop - loop = asyncio.get_event_loop() - if loop.is_running(): - # We're in an async context, schedule as task - print("๐Ÿ”„ [Consolidation] Scheduling async task...") - # Run synchronously using run_until_complete won't work here - # Instead, we'll use the manual non-async version - result = trigger_consolidation_sync(cat) - else: - # Not in async context, safe to run_until_complete - result = loop.run_until_complete(run_consolidation(cat)) - except RuntimeError: - # Fallback to sync version - result = trigger_consolidation_sync(cat) - - # Store the result in working memory so it can be used by other hooks + if user_text.lower() in ('consolidate', 'consolidate now', '/consolidate'): + print("[Consolidation] Manual trigger command received!") + trigger_consolidation_sync(cat) + stats = consolidation_state['stats'] cat.working_memory['consolidation_triggered'] = True cat.working_memory['consolidation_stats'] = stats - + return prefix -print("โœ… [Consolidation Plugin] agent_prompt_prefix hook registered") - -# Intercept the response to replace with consolidation stats @hook(priority=10) def before_cat_sends_message(message, cat): """ - 1. Inject declarative facts into response context - 2. Replace response if consolidation was triggered + 1. Replace response with consolidation stats if consolidation was triggered. + 2. Store Miku's response in episodic memory (bidirectional memory). 
""" - import sys - sys.stderr.write("\n๏ฟฝ [before_cat_sends_message] Hook executing...\n") - sys.stderr.flush() - - # PART 1: Inject declarative facts - try: - user_message_json = cat.working_memory.get('user_message_json', {}) - user_text = user_message_json.get('text', '') - - if user_text and not cat.working_memory.get('consolidation_triggered', False): - # Search declarative memory for relevant facts - declarative_memory = cat.memory.vectors.declarative - embedding = cat.embedder.embed_query(user_text) - - results = declarative_memory.recall_memories_from_embedding( - embedding=embedding, - metadata=None, - k=5 - ) - - if results: - sys.stderr.write(f"๐Ÿ’ก [Declarative] Found {len(results)} facts!\n") - # Results format: [(doc, score, vector, id), ...] - ignore vector and id - high_confidence_facts = [] - for item in results: - doc = item[0] - score = item[1] - if score > 0.5: # Only reasonably relevant facts - sys.stderr.write(f" - [{score:.2f}] {doc.page_content}\n") - high_confidence_facts.append(doc.page_content) - - # Store facts in working memory so agent_prompt_prefix can use them - if high_confidence_facts: - cat.working_memory['declarative_facts'] = high_confidence_facts - sys.stderr.write(f"โœ… [Declarative] Stored {len(high_confidence_facts)} facts in working memory\n") - - sys.stderr.flush() - - except Exception as e: - sys.stderr.write(f"โŒ [Declarative] Error: {e}\n") - sys.stderr.flush() - - # PART 2: Handle consolidation response replacement + + # PART 1: Consolidation response replacement if cat.working_memory.get('consolidation_triggered', False): - print("๐Ÿ“ [Consolidation] Replacing message with stats") + print("[Consolidation] Replacing message with stats") stats = cat.working_memory.get('consolidation_stats', {}) - output_str = (f"๐ŸŒ™ **Memory Consolidation Complete!**\n\n" - f"๐Ÿ“Š **Stats:**\n" - f"- Total processed: {stats.get('total_processed', 0)}\n" - f"- Kept: {stats.get('kept', 0)}\n" - f"- Deleted: {stats.get('deleted', 
0)}\n" - f"- Facts learned: {stats.get('facts_learned', 0)}\n") - - # Clear the flag + output_str = ( + f"\U0001f319 **Memory Consolidation Complete!**\n\n" + f"**Stats:**\n" + f"- Total processed: {stats.get('total_processed', 0)}\n" + f"- Kept: {stats.get('kept', 0)}\n" + f"- Deleted: {stats.get('deleted', 0)}\n" + f"- Facts learned: {stats.get('facts_learned', 0)}\n" + ) cat.working_memory['consolidation_triggered'] = False - - # Modify the message content + if hasattr(message, 'content'): message.content = output_str else: message['content'] = output_str - - # PART 3: Store Miku's response in memory + + # PART 2: Store Miku's response in episodic memory try: - # Get Miku's response text if hasattr(message, 'content'): miku_response = message.content elif isinstance(message, dict): miku_response = message.get('content', '') else: miku_response = str(message) - - if miku_response and len(miku_response) > 3: - from datetime import datetime - - # Prepare metadata + + if miku_response and len(miku_response.strip()) > 3: metadata = { 'source': cat.user_id, 'when': datetime.now().timestamp(), @@ -492,42 +151,40 @@ def before_cat_sends_message(message, cat): 'guild_id': cat.working_memory.get('guild_id', 'dm'), 'channel_id': cat.working_memory.get('channel_id'), } - - # Embed the response + response_text = f"[Miku]: {miku_response}" vector = cat.embedder.embed_query(response_text) - - # Store in episodic memory + cat.memory.vectors.episodic.add_point( content=response_text, vector=vector, metadata=metadata ) - - print(f"๐Ÿ’ฌ [Miku Memory] Stored response: {miku_response[:50]}...") - + print(f"[Miku Memory] Stored response: {miku_response[:50]}...") + except Exception as e: - print(f"โŒ [Miku Memory] Error: {e}") - + print(f"[Miku Memory] Error storing response: {e}") + return message -print("โœ… [Consolidation Plugin] before_cat_sends_message hook registered") +# =================================================================== +# CONSOLIDATION ENGINE +# 
=================================================================== def trigger_consolidation_sync(cat): """ - Synchronous version of consolidation for use in hooks. + Synchronous consolidation for use in hooks. + Processes ALL unconsolidated memories across all users. """ from qdrant_client import QdrantClient - - print("๐ŸŒ™ [Consolidation] Starting synchronous consolidation...") - - # Connect to Qdrant + + print("[Consolidation] Starting synchronous consolidation...") + qdrant_host = os.getenv('QDRANT_HOST', 'localhost') qdrant_port = int(os.getenv('QDRANT_PORT', 6333)) - client = QdrantClient(host=qdrant_host, port=qdrant_port) - + # Query all unconsolidated memories result = client.scroll( collection_name='episodic', @@ -540,139 +197,109 @@ def trigger_consolidation_sync(cat): with_payload=True, with_vectors=False ) - + memories = result[0] - print(f"๐Ÿ“Š [Consolidation] Found {len(memories)} unconsolidated memories") - + print(f"[Consolidation] Found {len(memories)} unconsolidated memories") + if not memories: consolidation_state['stats'] = { - 'total_processed': 0, - 'kept': 0, - 'deleted': 0, - 'facts_learned': 0 + 'total_processed': 0, 'kept': 0, 'deleted': 0, 'facts_learned': 0 } return - - #Apply heuristic-based consolidation + + # Classify memories to_delete = [] to_mark_consolidated = [] - user_messages_for_facts = [] # Track USER messages separately for fact extraction - + # Group user messages by source (user_id) for per-user fact extraction + user_messages_by_source = {} + for point in memories: content = point.payload.get('page_content', '').strip() content_lower = content.lower() metadata = point.payload.get('metadata', {}) - - # Check if this is a Miku message + is_miku_message = ( - metadata.get('speaker') == 'miku' or - content.startswith('[Miku]:') + metadata.get('speaker') == 'miku' + or content.startswith('[Miku]:') ) - - # Trivial patterns (expanded list) - trivial_patterns = [ - 'lol', 'k', 'ok', 'okay', 'haha', 'lmao', 'xd', 'rofl', 
'lmfao', - 'brb', 'gtg', 'afk', 'ttyl', 'lmk', 'idk', 'tbh', 'imo', 'imho', - 'omg', 'wtf', 'fyi', 'btw', 'nvm', 'jk', 'ikr', 'smh', - 'hehe', 'heh', 'gg', 'wp', 'gz', 'gj', 'ty', 'thx', 'np', 'yw', - 'nice', 'cool', 'neat', 'wow', 'yep', 'nope', 'yeah', 'nah' - ] - - is_trivial = False - - # Check if it matches trivial patterns - if len(content_lower) <= 3 and content_lower in trivial_patterns: - is_trivial = True - elif content_lower in trivial_patterns: - is_trivial = True - + + # Check if trivial + is_trivial = content_lower in TRIVIAL_PATTERNS + if is_trivial: to_delete.append(point.id) else: to_mark_consolidated.append(point.id) - # Only add USER messages for fact extraction (not Miku's responses) + # Only user messages go to fact extraction, grouped by user if not is_miku_message: - user_messages_for_facts.append(point.id) - + source = metadata.get('source', 'unknown') + if source not in user_messages_by_source: + user_messages_by_source[source] = [] + user_messages_by_source[source].append(point.id) + # Delete trivial memories if to_delete: - client.delete( - collection_name='episodic', - points_selector=to_delete - ) - print(f"๐Ÿ—‘๏ธ [Consolidation] Deleted {len(to_delete)} trivial memories") - - # Mark important memories as consolidated + client.delete(collection_name='episodic', points_selector=to_delete) + print(f"[Consolidation] Deleted {len(to_delete)} trivial memories") + + # Mark kept memories as consolidated if to_mark_consolidated: for point_id in to_mark_consolidated: - # Get the point - point = client.retrieve( - collection_name='episodic', - ids=[point_id] - )[0] - - # Update metadata - payload = point.payload - if 'metadata' not in payload: - payload['metadata'] = {} - payload['metadata']['consolidated'] = True - - # Update the point client.set_payload( collection_name='episodic', - payload=payload, + payload={"metadata.consolidated": True}, points=[point_id] ) - - print(f"โœ… [Consolidation] Marked {len(to_mark_consolidated)} memories as 
consolidated") - - # Update stats - facts_extracted = 0 - - # Extract declarative facts from USER messages only (not Miku's responses) - print(f"๐Ÿ” [Consolidation] Extracting declarative facts from {len(user_messages_for_facts)} user messages...") - facts_extracted = extract_and_store_facts(client, user_messages_for_facts, cat) - print(f"๐Ÿ“ [Consolidation] Extracted and stored {facts_extracted} declarative facts") - + print(f"[Consolidation] Marked {len(to_mark_consolidated)} memories as consolidated") + + # Extract facts per user + total_facts = 0 + for source_user_id, memory_ids in user_messages_by_source.items(): + print(f"[Consolidation] Extracting facts for user '{source_user_id}' from {len(memory_ids)} messages...") + facts = extract_and_store_facts(client, memory_ids, cat, source_user_id) + total_facts += facts + print(f"[Consolidation] Extracted {facts} facts for user '{source_user_id}'") + consolidation_state['stats'] = { 'total_processed': len(memories), 'kept': len(to_mark_consolidated), 'deleted': len(to_delete), - 'facts_learned': facts_extracted + 'facts_learned': total_facts } - - print("โœ… [Consolidation] Synchronous consolidation complete!") + + print("[Consolidation] Synchronous consolidation complete!") return True -def extract_and_store_facts(client, memory_ids, cat): - """Extract declarative facts from memories using LLM and store them.""" +# =================================================================== +# FACT EXTRACTION +# =================================================================== + +def extract_and_store_facts(client, memory_ids, cat, user_id): + """ + Extract declarative facts from user memories using LLM and store them. + Facts are scoped to the specific user_id. + Uses Cat's embedder to ensure vector compatibility. + Deduplicates against existing facts before storing. 
+ """ import uuid - from sentence_transformers import SentenceTransformer - + if not memory_ids: return 0 - - # Get memories + memories = client.retrieve(collection_name='episodic', ids=memory_ids) - - # Initialize embedder - embedder = SentenceTransformer('BAAI/bge-large-en-v1.5') - facts_stored = 0 - - # Process memories in batches to avoid overwhelming the LLM + + # Process in batches of 5 batch_size = 5 for i in range(0, len(memories), batch_size): - batch = memories[i:i+batch_size] - - # Combine batch messages for LLM analysis + batch = memories[i:i + batch_size] + conversation_context = "\n".join([ f"- {mem.payload.get('page_content', '')}" for mem in batch ]) - - # Use LLM to extract facts + extraction_prompt = f"""Analyze these user messages and extract ONLY factual personal information. User messages: @@ -687,37 +314,35 @@ Extract facts in this exact format (one per line): - The user's favorite color is [color] - The user enjoys [hobby/activity] - The user prefers [preference] +- The user's birthday is [date] +- The user graduated from [school/university] IMPORTANT: -- Only include facts that are CLEARLY stated +- Only include facts that are CLEARLY stated in the messages - Use the EXACT format shown above - If no facts found, respond with: "No facts found" - Do not include greetings, questions, or opinions +- Do not invent or assume facts not explicitly stated """ - + try: - # Call LLM response = cat.llm(extraction_prompt) - - print(f"๐Ÿค– [LLM Extract] Response:\n{response[:200]}...") - - # Parse LLM response for facts + print(f"[LLM Extract] Response:\n{response[:200]}...") + lines = response.strip().split('\n') for line in lines: line = line.strip() - - # Skip empty lines, headers, or "no facts" responses + if not line or line.lower().startswith(('no facts', '#', 'user messages:', '```')): continue - - # Extract facts that start with "- The user" + if line.startswith('- The user'): fact_text = line[2:].strip() # Remove "- " prefix - - # Determine fact 
type from the sentence structure + + # Determine fact type fact_type = 'general' fact_value = fact_text - + if "'s name is" in fact_text: fact_type = 'name' fact_value = fact_text.split("'s name is")[-1].strip() @@ -742,13 +367,22 @@ IMPORTANT: elif "prefers" in fact_text: fact_type = 'preference' fact_value = fact_text.split("prefers")[-1].strip() - - # Generate embedding for the fact - fact_embedding = embedder.encode(fact_text).tolist() - - # Store in declarative collection + elif "'s birthday is" in fact_text: + fact_type = 'birthday' + fact_value = fact_text.split("'s birthday is")[-1].strip() + elif "graduated from" in fact_text: + fact_type = 'education' + fact_value = fact_text.split("graduated from")[-1].strip() + + # Duplicate detection + if _is_duplicate_fact(client, cat, fact_text, fact_type, user_id): + print(f"[Fact Skip] Duplicate: {fact_text}") + continue + + # Store fact using Cat's embedder + fact_embedding = cat.embedder.embed_query(fact_text) point_id = str(uuid.uuid4()) - + client.upsert( collection_name='declarative', points=[{ @@ -757,71 +391,88 @@ IMPORTANT: 'payload': { 'page_content': fact_text, 'metadata': { - 'source': 'memory_consolidation', - 'when': batch[0].payload.get('metadata', {}).get('when', 0), + 'source': user_id, + 'when': datetime.now().timestamp(), 'fact_type': fact_type, 'fact_value': fact_value, - 'user_id': 'global' } } }] ) - + facts_stored += 1 - print(f"โœ… [Fact Stored] {fact_text}") - + print(f"[Fact Stored] [{user_id}] {fact_text}") + except Exception as e: - print(f"โŒ [LLM Extract] Error: {e}") + print(f"[LLM Extract] Error: {e}") import traceback traceback.print_exc() - + return facts_stored -def trigger_consolidation_manual(cat): +def _is_duplicate_fact(client, cat, fact_text, fact_type, user_id): """ - Manually trigger consolidation for testing. - Can be called via admin API or command. + Check if a similar fact already exists for this user. + Uses vector similarity to detect semantic duplicates. 
""" - print("๐Ÿ”ง [Consolidation] Manual trigger received") - - # Run consolidation - import asyncio try: - # Create event loop if needed - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - loop.run_until_complete(run_consolidation(cat)) - - return consolidation_state + fact_embedding = cat.embedder.embed_query(fact_text) + + # Search existing facts for this user with same fact_type + results = client.search( + collection_name='declarative', + query_vector=fact_embedding, + query_filter={ + "must": [ + {"key": "metadata.source", "match": {"value": user_id}}, + {"key": "metadata.fact_type", "match": {"value": fact_type}}, + ] + }, + limit=1, + score_threshold=0.85 # High threshold = very similar + ) + + if results: + existing = results[0].payload.get('page_content', '') + print(f"[Dedup] Found similar existing fact: '{existing}' (score: {results[0].score:.2f})") + return True + + return False + + except Exception as e: + print(f"[Dedup] Error checking duplicates: {e}") + return False # On error, allow storing -# Plugin metadata -__version__ = "1.0.0" -__description__ = "Sleep consolidation - analyze memories nightly, keep important, delete trivial" +# =================================================================== +# TOOL (for Cat's tool system) +# =================================================================== -print("โœ… [Consolidation Plugin] after_cat_recalls_memories hook registered") - - -# Tool for manual consolidation trigger @tool(return_direct=True) def consolidate_memories(tool_input, cat): - """Use this tool to consolidate memories. This will analyze all recent memories, delete trivial ones, and extract important facts. 
Input is always an empty string.""" - - print("๐Ÿ”ง [Consolidation] Tool called!") - - # Run consolidation synchronously - result = trigger_consolidation_sync(cat) - - # Return stats - stats = consolidation_state['stats'] - return (f"๐ŸŒ™ **Memory Consolidation Complete!**\n\n" - f"๐Ÿ“Š **Stats:**\n" - f"- Total processed: {stats['total_processed']}\n" - f"- Kept: {stats['kept']}\n" - f"- Deleted: {stats['deleted']}\n" - f"- Facts learned: {stats['facts_learned']}\n") + """Use this tool to consolidate memories. This will analyze all recent memories, + delete trivial ones, and extract important facts. Input is always an empty string.""" + print("[Consolidation] Tool called!") + trigger_consolidation_sync(cat) + + stats = consolidation_state['stats'] + return ( + f"\U0001f319 **Memory Consolidation Complete!**\n\n" + f"**Stats:**\n" + f"- Total processed: {stats['total_processed']}\n" + f"- Kept: {stats['kept']}\n" + f"- Deleted: {stats['deleted']}\n" + f"- Facts learned: {stats['facts_learned']}\n" + ) + + +# =================================================================== +# PLUGIN METADATA +# =================================================================== + +__version__ = "2.0.0" +__description__ = "Sleep consolidation - analyze memories, keep important, delete trivial, extract per-user facts" + +print("[Consolidation Plugin] All hooks registered") diff --git a/cat-plugins/memory_consolidation/requirements.txt b/cat-plugins/memory_consolidation/requirements.txt deleted file mode 100644 index f998fbc..0000000 --- a/cat-plugins/memory_consolidation/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -sentence-transformers>=2.2.0 diff --git a/cheshire-cat/cat/log.py b/cheshire-cat/cat/log.py new file mode 100644 index 0000000..262c6f6 --- /dev/null +++ b/cheshire-cat/cat/log.py @@ -0,0 +1,246 @@ +"""The log engine.""" + +import logging +import sys +import inspect +import traceback +import json +from itertools import takewhile +from pprint import pformat +from loguru 
from loguru import logger

from cat.env import get_env


def get_log_level():
    """Return the global LOG level (the `CCAT_LOG_LEVEL` setting)."""
    return get_env("CCAT_LOG_LEVEL")


class CatLogEngine:
    """The log engine.

    Engine to filter the logs in the terminal according to the level of severity.

    Attributes
    ----------
    LOG_LEVEL : str
        Level of logging set in the `.env` file.

    Notes
    -----
    The logging level set in the `.env` file will print all the logs from that level to above.
    Available levels are:

        - `DEBUG`
        - `INFO`
        - `WARNING`
        - `ERROR`
        - `CRITICAL`

    Default to `INFO`.

    """

    def __init__(self):
        self.LOG_LEVEL = get_log_level()
        self.default_log()

        # workaround for pdfminer logging
        # https://github.com/pdfminer/pdfminer.six/issues/347
        logging.getLogger("pdfminer").setLevel(logging.WARNING)

    def show_log_level(self, record):
        """Allows to show stuff in the log based on the global setting.

        Parameters
        ----------
        record : dict
            Log record; its ``"level"`` entry is compared with `LOG_LEVEL`.

        Returns
        -------
        bool
            True when the record's level is at or above `LOG_LEVEL`.

        """
        return record["level"].no >= logger.level(self.LOG_LEVEL).no

    @staticmethod
    def _patch_extras(record):
        """Provide defaults for extra fields so third-party loggers don't
        crash the custom format string (e.g. fastembed deprecation warnings)."""
        extra = record["extra"]
        extra.setdefault("original_name", "(third-party)")
        extra.setdefault("original_class", "")
        extra.setdefault("original_caller", "")
        extra.setdefault("original_line", 0)

    def default_log(self):
        """Set the same debug level to all the project dependencies.

        Removes the existing loguru handlers, installs `_patch_extras` as
        patcher and adds a single stdout sink filtered by `show_log_level`.

        Returns
        -------
        The value returned by ``logger.add`` for the new sink.
        """

        time_fmt = "[{time:YYYY-MM-DD HH:mm:ss.SSS}]"
        level_fmt = "{level: <6}"
        origin_fmt = "{extra[original_name]}.{extra[original_class]}.{extra[original_caller]}::{extra[original_line]}"
        message_fmt = "{message}"
        log_format = f"{time_fmt} {level_fmt} {origin_fmt} \n{message_fmt}"

        logger.remove()
        logger.configure(patcher=self._patch_extras)
        # NOTE(review): the format above carries no color markup, so
        # colorize=True presumably has no visible effect - confirm.
        if self.LOG_LEVEL == "DEBUG":
            return logger.add(
                sys.stdout,
                colorize=True,
                format=log_format,
                backtrace=True,
                diagnose=True,
                filter=self.show_log_level
            )
        else:
            return logger.add(
                sys.stdout,
                colorize=True,
                format=log_format,
                filter=self.show_log_level,
                level=self.LOG_LEVEL
            )

    def get_caller_info(self, skip=3):
        """Get the name of a caller in the format module.class.method.

        Copied from: https://gist.github.com/techtonik/2151727

        Parameters
        ----------
        skip : int
            Specifies how many levels of stack to skip while getting caller name.

        Returns
        -------
        package : str
            Caller package.
        module : str
            Caller module.
        klass : str
            Caller classname if one, otherwise an empty string.
        caller : str
            Caller function or method; None at module top level.
        line : int
            The line of the call.


        Notes
        -----
        skip=1 means "who calls me",
        skip=2 "who calls my caller" etc.

        When the skipped levels exceed the stack height a tuple of
        placeholders ``("", "", "", None, 0)`` is returned, so callers that
        unpack five values (as `log` does) keep working.
        """
        # context=0: only the frame objects are used, so there is no need to
        # read source lines for every frame on each log call.
        stack = inspect.stack(0)
        parentframe = None
        try:
            if len(stack) < skip + 1:
                return "", "", "", None, 0
            parentframe = stack[skip][0]

            # module and packagename.
            module_info = inspect.getmodule(parentframe)
            if module_info:
                mod_name = module_info.__name__
            else:
                # getmodule() can fail to resolve a frame; fall back to the
                # frame's own __name__ instead of leaving names unbound.
                mod_name = parentframe.f_globals.get("__name__", "")
            package, _, module = mod_name.partition(".")

            # class name.
            klass = ""
            if "self" in parentframe.f_locals:
                klass = parentframe.f_locals["self"].__class__.__name__

            # method or function name.
            # The reviewed text compared against "", which never matches a
            # code name; "<module>" is the name of top-level code.
            caller = None
            if parentframe.f_code.co_name != "<module>":  # top level usually
                caller = parentframe.f_code.co_name

            # call line.
            line = parentframe.f_lineno

            return package, module, klass, caller, line
        finally:
            # Remove references to frames
            # See: https://docs.python.org/3/library/inspect.html#the-interpreter-stack
            del parentframe
            del stack

    def __call__(self, msg, level="DEBUG"):
        """Alias of self.log()"""
        self.log(msg, level)

    def debug(self, msg):
        """Logs a DEBUG message"""
        self.log(msg, level="DEBUG")

    def info(self, msg):
        """Logs an INFO message"""
        self.log(msg, level="INFO")

    def warning(self, msg):
        """Logs a WARNING message"""
        self.log(msg, level="WARNING")

    def error(self, msg):
        """Logs an ERROR message"""
        self.log(msg, level="ERROR")

    def critical(self, msg):
        """Logs a CRITICAL message"""
        self.log(msg, level="CRITICAL")

    def log(self, msg, level="DEBUG"):
        """Log a message

        Parameters
        ----------
        msg :
            Message to be logged. dict, list and str values are rendered with
            ``json.dumps``; anything else (or anything JSON cannot encode)
            is rendered with ``pformat``.
        level : str
            Logging level."""

        (package, module, klass, caller, line) = self.get_caller_info()

        custom_logger = logger.bind(
            original_name=f"{package}.{module}",
            original_line=line,
            original_class=klass,
            original_caller=caller,
        )

        # prettify
        if isinstance(msg, (dict, list, str)):  # TODO: should be recursive
            try:
                msg = json.dumps(msg, indent=4)
            except Exception:
                # not JSON-serializable: still pretty-print it
                msg = pformat(msg)
        else:
            msg = pformat(msg)

        # actual log
        custom_logger.log(level, msg)

    def welcome(self):
        """Welcome message in the terminal."""
        # Any non-empty value enables https, except the usual spellings of
        # "off" (a literal "false" used to be treated as secure).
        secure_setting = get_env("CCAT_CORE_USE_SECURE_PROTOCOLS")
        disabled_values = ("", "0", "false", "no", "off")
        if str(secure_setting or "").strip().lower() in disabled_values:
            secure = ''
        else:
            secure = 's'

        cat_host = get_env("CCAT_CORE_HOST")
        cat_port = get_env("CCAT_CORE_PORT")
        cat_address = f'http{secure}://{cat_host}:{cat_port}'

        with open("cat/welcome.txt", 'r') as f:
            print(f.read())

        print('\n=============== ^._.^ ===============\n')
        print(f'Cat REST API: {cat_address}/docs')
        print(f'Cat PUBLIC: {cat_address}/public')
        print(f'Cat ADMIN: {cat_address}/admin\n')
        print('======================================')


# logger instance
log = CatLogEngine()
a/cheshire-cat/docker-compose.test.yml b/cheshire-cat/docker-compose.test.yml new file mode 100644 index 0000000..9916f00 --- /dev/null +++ b/cheshire-cat/docker-compose.test.yml @@ -0,0 +1,60 @@ +services: + cheshire-cat-core: + image: ghcr.io/cheshire-cat-ai/core:1.6.2 + container_name: miku_cheshire_cat_test + depends_on: + - cheshire-cat-vector-memory + environment: + PYTHONUNBUFFERED: "1" + WATCHFILES_FORCE_POLLING: "true" + CORE_HOST: ${CORE_HOST:-localhost} + CORE_PORT: ${CORE_PORT:-1865} + QDRANT_HOST: ${QDRANT_HOST:-cheshire-cat-vector-memory} + QDRANT_PORT: ${QDRANT_PORT:-6333} + CORE_USE_SECURE_PROTOCOLS: ${CORE_USE_SECURE_PROTOCOLS:-false} + API_KEY: ${API_KEY:-} + LOG_LEVEL: ${LOG_LEVEL:-INFO} + DEBUG: ${DEBUG:-true} + SAVE_MEMORY_SNAPSHOTS: ${SAVE_MEMORY_SNAPSHOTS:-false} + OPENAI_API_BASE: "http://host.docker.internal:8091/v1" + ports: + - "${CORE_PORT:-1865}:80" + # Allow connection to host services (llama-swap) + extra_hosts: + - "host.docker.internal:host-gateway" + volumes: + - ./cat/static:/app/cat/static + - ./cat/plugins:/app/cat/plugins + - ./cat/data:/app/cat/data + - ./cat/log.py:/app/cat/log.py # Patched: fix loguru KeyError for third-party libs + restart: unless-stopped + networks: + - miku-test-network + - miku-discord_default # Connect to existing miku bot network + + cheshire-cat-vector-memory: + image: qdrant/qdrant:v1.9.1 + container_name: miku_qdrant_test + environment: + LOG_LEVEL: ${LOG_LEVEL:-INFO} + ports: + - "6333:6333" # Expose for debugging + ulimits: + nofile: + soft: 65536 + hard: 65536 + volumes: + - ./cat/long_term_memory/vector:/qdrant/storage + restart: unless-stopped + networks: + - miku-test-network + +networks: + miku-test-network: + driver: bridge + # Connect to main miku-discord network to access llama-swap + default: + external: true + name: miku-discord_default + miku-discord_default: + external: true # Connect to your existing bot's network diff --git a/test_full_pipeline.py b/test_full_pipeline.py index 
#!/usr/bin/env python3
"""
Full Pipeline Test for Memory Consolidation System v2.0.0

Sends messages to a running Cat instance, inspects the Qdrant collections
over REST and prints a PASS/FAIL line per check. The process exits with
status 0 when every check passes and 1 otherwise (or when Cat/Qdrant cannot
be reached).
"""

import requests
import time
import json
import sys

CAT_URL = "http://localhost:1865"
QDRANT_URL = "http://localhost:6333"
CONSOLIDATION_TIMEOUT = 180
QDRANT_TIMEOUT = 10

# Matches every episodic point that is not tagged as one of Miku's replies.
USER_ONLY_FILTER = {
    "must_not": [{"key": "metadata.speaker", "match": {"value": "miku"}}]
}


def send_message(text, timeout=30):
    """POST `text` to the Cat and return the decoded JSON reply.

    On timeout, HTTP/connection error or undecodable body a dict with an
    "error" key and empty "content" is returned instead of raising.
    """
    try:
        resp = requests.post(f"{CAT_URL}/message", json={"text": text}, timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.Timeout:
        return {"error": "timeout", "content": ""}
    except (requests.exceptions.RequestException, ValueError) as e:
        return {"error": str(e), "content": ""}


def qdrant_scroll(collection, limit=200, filt=None):
    """Return up to `limit` points (with payload) of `collection`, optionally filtered."""
    body = {"limit": limit, "with_payload": True, "with_vector": False}
    if filt:
        body["filter"] = filt
    resp = requests.post(
        f"{QDRANT_URL}/collections/{collection}/points/scroll",
        json=body,
        timeout=QDRANT_TIMEOUT,
    )
    resp.raise_for_status()
    return resp.json()["result"]["points"]


def qdrant_count(collection, filt=None):
    """Return the exact number of points in `collection`, optionally filtered.

    Uses the count endpoint rather than the length of a scroll page, which
    is capped at the scroll limit and makes before/after deltas wrong once
    a collection grows past it.
    """
    body = {"exact": True}
    if filt:
        body["filter"] = filt
    resp = requests.post(
        f"{QDRANT_URL}/collections/{collection}/points/count",
        json=body,
        timeout=QDRANT_TIMEOUT,
    )
    resp.raise_for_status()
    return resp.json()["result"]["count"]


def section(title):
    """Print a section banner."""
    print(f"\n{'=' * 70}")
    print(f" {title}")
    print(f"{'=' * 70}")


def run_consolidation():
    """Trigger consolidation through the chat endpoint and wait for it to settle.

    When the request fails (typically a timeout) the run may still be going
    on server side, so wait before the caller inspects the collections.
    """
    t0 = time.time()
    resp = send_message("consolidate now", timeout=CONSOLIDATION_TIMEOUT)
    elapsed = time.time() - t0

    if "error" in resp:
        print(f" WARN - HTTP issue: {resp['error']} ({elapsed:.0f}s)")
        print(" Waiting 60s for background completion...")
        time.sleep(60)
    else:
        print(f" Completed in {elapsed:.1f}s")
        content = resp.get("content", "")
        print(f" Response: {content[:120]}...")

    time.sleep(3)
    return resp


def main():
    """Run all pipeline checks; return True when every check passed."""
    print("=" * 70)
    print(" FULL PIPELINE TEST - Memory Consolidation v2.0.0")
    print("=" * 70)

    for service, url in (("Cat", f"{CAT_URL}/"), ("Qdrant", f"{QDRANT_URL}/collections")):
        try:
            requests.get(url, timeout=5)
        except requests.exceptions.RequestException:
            print(f"ERROR: {service} not reachable")
            sys.exit(1)

    episodic_start = qdrant_count("episodic")
    declarative_start = qdrant_count("declarative")
    # Baseline taken with the same filter as the later user-only counts, so
    # pre-existing Miku replies do not skew the delta.
    user_episodic_start = qdrant_count("episodic", USER_ONLY_FILTER)
    print(f"\nStarting state: {episodic_start} episodic, {declarative_start} declarative")

    results = {}

    # TEST 1: Trivial Message Filtering
    section("TEST 1: Trivial Message Filtering")

    trivial_messages = ["lol", "k", "ok", "haha", "xd", "brb"]
    print(f"Sending {len(trivial_messages)} trivial messages...")
    for msg in trivial_messages:
        send_message(msg)
        time.sleep(0.3)

    time.sleep(1)
    # Count only USER episodic memories (exclude Miku's responses)
    user_after_trivial = qdrant_count("episodic", USER_ONLY_FILTER)
    trivial_user_stored = user_after_trivial - user_episodic_start
    episodic_after_trivial = qdrant_count("episodic")

    # discord_bridge filters trivial user messages, but Miku still responds
    # so we only check user-side storage
    if trivial_user_stored < len(trivial_messages):
        print(f" PASS - Only {trivial_user_stored}/{len(trivial_messages)} user trivial messages stored")
        print(f" (Total episodic incl. Miku responses: {episodic_after_trivial})")
        results["trivial_filtering"] = True
    else:
        print(f" WARN - All {trivial_user_stored} trivial messages stored")
        results["trivial_filtering"] = False

    # TEST 2: Important Message Storage
    section("TEST 2: Important Message Storage")

    personal_facts = [
        "My name is Sarah Chen",
        "I'm 28 years old",
        "I live in Seattle, Washington",
        "I work as a software engineer at Microsoft",
        "My favorite color is forest green",
        "I love playing piano and have practiced for 15 years",
        "I'm learning Japanese, currently at N3 level",
        "I have a cat named Luna",
        "I'm allergic to peanuts",
        "My birthday is March 15th",
        "I graduated from UW in 2018",
        "I enjoy hiking on weekends",
    ]

    print(f"Sending {len(personal_facts)} personal info messages...")
    for i, fact in enumerate(personal_facts, 1):
        resp = send_message(fact)
        status = "OK" if "error" not in resp else "ERR"
        print(f" [{i}/{len(personal_facts)}] {status} {fact[:50]}")
        time.sleep(0.5)

    time.sleep(1)
    # User-only delta: Miku's stored replies must not count as stored user messages.
    personal_stored = qdrant_count("episodic", USER_ONLY_FILTER) - user_after_trivial
    print(f"\n Episodic memories from personal info: {personal_stored}")
    results["important_storage"] = personal_stored >= len(personal_facts)
    print(f" {'PASS' if results['important_storage'] else 'FAIL'} - Expected >={len(personal_facts)}, got {personal_stored}")

    # TEST 3: Miku Response Storage
    section("TEST 3: Bidirectional Memory (Miku Response Storage)")

    miku_points = qdrant_scroll("episodic", filt={
        "must": [{"key": "metadata.speaker", "match": {"value": "miku"}}]
    })
    print(f" Miku's memories in episodic: {len(miku_points)}")
    if miku_points:
        print(f" Sample: \"{miku_points[0]['payload']['page_content'][:70]}\"")
        results["miku_storage"] = True
        print(" PASS")
    else:
        results["miku_storage"] = False
        print(" FAIL - No Miku responses in episodic memory")

    # TEST 4: Per-User Source Tagging
    section("TEST 4: Per-User Source Tagging")

    user_points = qdrant_scroll("episodic", filt={
        "must": [{"key": "metadata.source", "match": {"value": "user"}}]
    })
    print(f" Points with source='user': {len(user_points)}")

    global_points = qdrant_scroll("episodic", filt={
        "must": [{"key": "metadata.source", "match": {"value": "global"}}]
    })
    print(f" Points with source='global' (old bug): {len(global_points)}")

    results["user_tagging"] = len(user_points) > 0 and len(global_points) == 0
    print(f" {'PASS' if results['user_tagging'] else 'FAIL'}")

    # TEST 5: Memory Consolidation
    section("TEST 5: Memory Consolidation & Fact Extraction")

    print(f" Triggering consolidation (timeout={CONSOLIDATION_TIMEOUT}s)...")
    run_consolidation()

    declarative_after = qdrant_count("declarative")
    new_facts = declarative_after - declarative_start
    print(f"\n Declarative facts: {declarative_start} -> {declarative_after} (+{new_facts})")

    results["consolidation"] = new_facts >= 5
    print(f" {'PASS' if results['consolidation'] else 'FAIL'} - {'>=5 facts' if results['consolidation'] else f'only {new_facts}'}")

    all_facts = qdrant_scroll("declarative")
    print(f"\n All declarative facts ({len(all_facts)}):")
    for i, point in enumerate(all_facts, 1):
        content = point["payload"]["page_content"]
        meta = point["payload"].get("metadata") or {}
        source = meta.get("source", "?")
        ftype = meta.get("fact_type", "?")
        print(f" {i}. [{source}|{ftype}] {content}")

    # TEST 6: Duplicate Detection
    section("TEST 6: Duplicate Detection (2nd consolidation)")

    facts_before_2nd = qdrant_count("declarative")
    print(f" Facts before: {facts_before_2nd}")
    print(f" Running consolidation again...")

    # Same error handling as the first run: a timed-out request would
    # otherwise make "no new facts" look like working dedup.
    run_consolidation()

    facts_after_2nd = qdrant_count("declarative")
    new_dupes = facts_after_2nd - facts_before_2nd
    print(f" Facts after: {facts_after_2nd} (+{new_dupes})")

    results["dedup"] = new_dupes <= 2
    print(f" {'PASS' if results['dedup'] else 'FAIL'} - {new_dupes} new facts (<=2 expected)")

    # TEST 7: Fact Recall
    section("TEST 7: Fact Recall via Natural Language")

    queries = {
        "What is my name?": ["sarah", "chen"],
        "How old am I?": ["28"],
        "Where do I live?": ["seattle"],
        "Where do I work?": ["microsoft", "software engineer"],
        "What am I allergic to?": ["peanut"],
    }

    correct = 0
    for question, keywords in queries.items():
        resp = send_message(question)
        answer = resp.get("content", "")
        hit = any(kw.lower() in answer.lower() for kw in keywords)
        if hit:
            correct += 1
        icon = "OK" if hit else "??"
        print(f" {icon} Q: {question}")
        print(f" A: {answer[:150]}")
        time.sleep(1)

    accuracy = correct / len(queries) * 100
    results["recall"] = correct >= 3
    print(f"\n Recall: {correct}/{len(queries)} ({accuracy:.0f}%)")
    print(f" {'PASS' if results['recall'] else 'FAIL'} (threshold: >=3)")

    # FINAL SUMMARY
    section("FINAL SUMMARY")

    total = len(results)
    passed = sum(1 for v in results.values() if v)
    print()
    for name, ok in results.items():
        print(f" [{'PASS' if ok else 'FAIL'}] {name}")

    print(f"\n Score: {passed}/{total}")
    print(f" Episodic: {qdrant_count('episodic')}")
    print(f" Declarative: {qdrant_count('declarative')}")

    if passed == total:
        print("\n ALL TESTS PASSED!")
    elif passed >= total - 1:
        print("\n MOSTLY PASSING - minor issues only")
    else:
        print("\n SOME TESTS FAILED - review above")

    print("\n" + "=" * 70)
    return passed == total


# Guarded so that importing this module (e.g. pytest collecting test_*.py)
# does not send messages or exit the interpreter.
if __name__ == "__main__":
    sys.exit(0 if main() else 1)