# Cognee Long-Term Memory Integration Plan

## Executive Summary

**Goal**: Add long-term memory capabilities to Miku using Cognee while keeping the existing fast, JSON-based short-term system.

**Strategy**: Hybrid two-tier memory architecture

- **Tier 1 (Hot)**: Current system - 8 messages in-memory, JSON configs (0-5ms latency)
- **Tier 2 (Cold)**: Cognee - Long-term knowledge graph + vectors (50-200ms latency)

**Result**: Best of both worlds - fast responses with deep memory when needed.

---

## Architecture Overview

```
┌───────────────────────────────────────────────────────┐
│                     Discord Event                      │
│             (Message, Reaction, Presence)              │
└───────────────────────────┬───────────────────────────┘
                            │
                            ▼
              ┌─────────────────────────────┐
              │  Short-Term Memory (Fast)   │
              │  - Last 8 messages          │
              │  - Current mood             │
              │  - Active context           │
              │  Latency: ~2-5ms            │
              └─────────────┬───────────────┘
                            │
                            ▼
                   ┌────────────────┐
                   │  LLM Response  │
                   └────────┬───────┘
                            │
        ┌───────────────────┴───────────────────┐
        │                                       │
        ▼                                       ▼
┌────────────────┐                    ┌─────────────────┐
│ Send to Discord│                    │ Background Job  │
└────────────────┘                    │ Async Ingestion │
                                      │ to Cognee       │
                                      │ Latency: N/A    │
                                      │ (non-blocking)  │
                                      └────────┬────────┘
                                               │
                                               ▼
                                   ┌──────────────────────┐
                                   │  Long-Term Memory    │
                                   │  (Cognee)            │
                                   │  - Knowledge graph   │
                                   │  - User preferences  │
                                   │  - Entity relations  │
                                   │  - Historical facts  │
                                   │  Query: 50-200ms     │
                                   └──────────────────────┘
```

---

## Performance Analysis

### Current System Baseline

```python
# Short-term memory (in-memory)
conversation_history.add_message(...)      # ~0.1ms
messages = conversation_history.format()   # ~2ms

# JSON config read/write: ~1-3ms
# Total per response: ~5-10ms
```

### Cognee Overhead (Estimated)

#### 1. **Write Operations (Background - Non-blocking)**

```python
# These run asynchronously AFTER Discord message is sent
await cognee.add(message_text)   # 20-50ms
await cognee.cognify()           # 100-500ms (graph processing)
```

**Impact on user**: ✅ NONE - Happens in background

#### 2. **Read Operations (When querying long-term memory)**

```python
# Only triggered when deep memory is needed
results = await cognee.search(query)  # 50-200ms
```

**Impact on user**: ⚠️ Adds 50-200ms to response time (only when used)

### Mitigation Strategies

#### Strategy 1: Intelligent Query Decision (Recommended)

```python
def should_query_long_term_memory(user_prompt: str, context: dict) -> bool:
    """
    Decide if we need deep memory BEFORE querying Cognee.
    Fast heuristic checks (< 1ms).
    """
    # Triggers for long-term memory:
    triggers = [
        "remember when", "you said", "last week", "last month",
        "you told me", "what did i say about", "do you recall",
        "preference", "favorite",
    ]

    prompt_lower = user_prompt.lower()

    # 1. Explicit memory queries
    if any(trigger in prompt_lower for trigger in triggers):
        return True

    # 2. Conversation just started - deep search won't add much
    if context.get('messages_in_history', 0) < 3:
        return False  # Not enough history to need deep search

    # 3. Question about user preferences
    if '?' in user_prompt and any(word in prompt_lower for word in ['like', 'prefer', 'think']):
        return True

    return False
```

#### Strategy 2: Parallel Processing

```python
async def query_with_hybrid_memory(prompt, user_id, guild_id):
    """Query both memory tiers in parallel when needed."""
    # Always get short-term (fast)
    short_term = conversation_history.format_for_llm(channel_id)

    # Decide if we need long-term
    if should_query_long_term_memory(prompt, context):
        # Query both in parallel
        long_term_task = asyncio.create_task(cognee.search(prompt))
        # Don't wait - continue with short-term

        # Only await long-term if it's ready quickly
        try:
            long_term = await asyncio.wait_for(long_term_task, timeout=0.15)  # 150ms max
        except asyncio.TimeoutError:
            long_term = None  # Fallback - proceed without deep memory
    else:
        long_term = None

    # Combine contexts
    combined_context = merge_contexts(short_term, long_term)
    return await llm_query(combined_context)
```
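Strategy 2 calls a `merge_contexts` helper that isn't defined anywhere in this plan. A minimal sketch of what it could look like, assuming `short_term` is an OpenAI-style message list and `long_term` is a plain-text snippet from Cognee (both assumptions, mirroring the injection pattern used in Integration Point 2 below):

```python
def merge_contexts(short_term: list, long_term) -> list:
    """Combine both memory tiers into one LLM message list.

    Sketch only: prepends long-term memory as a system message,
    matching the injection pattern in query_llama() below.
    """
    if not long_term:
        return short_term  # Nothing from Cognee - short-term only

    memory_msg = {
        "role": "system",
        "content": f"[Long-term memory context]: {long_term}"
    }
    return [memory_msg] + short_term
```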
#### Strategy 3: Caching Layer

```python
from datetime import datetime, timedelta

# Cache frequent queries for 5 minutes
_cognee_cache = {}
_cache_ttl = timedelta(minutes=5)

async def cached_cognee_search(query: str):
    """Cache Cognee results to avoid repeated queries."""
    cache_key = query.lower().strip()
    now = datetime.now()

    if cache_key in _cognee_cache:
        result, timestamp = _cognee_cache[cache_key]
        if now - timestamp < _cache_ttl:
            print(f"🎯 Cache hit for: {query[:50]}...")
            return result

    # Cache miss - query Cognee
    result = await cognee.search(query)
    _cognee_cache[cache_key] = (result, now)
    return result
```

#### Strategy 4: Tiered Response Times

```python
# Set different response strategies based on context
RESPONSE_MODES = {
    "instant": {
        "use_long_term": False,
        "max_latency": 100,  # ms
        "contexts": ["reactions", "quick_replies"]
    },
    "normal": {
        "use_long_term": "conditional",  # Only if triggers match
        "max_latency": 300,  # ms
        "contexts": ["server_messages", "dm_casual"]
    },
    "deep": {
        "use_long_term": True,
        "max_latency": 1000,  # ms
        "contexts": ["dm_deep_conversation", "user_questions"]
    }
}
```
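The `RESPONSE_MODES` table implies a lookup step that isn't shown. A hypothetical `resolve_response_mode` helper (the name and fallback behavior are assumptions) illustrating how a handler could pick its mode:

```python
def resolve_response_mode(context_name: str) -> dict:
    """Map an interaction context (e.g. 'reactions') to a response mode.

    Hypothetical helper over the RESPONSE_MODES table above;
    unknown contexts fall back to 'normal'.
    """
    for mode in RESPONSE_MODES.values():
        if context_name in mode["contexts"]:
            return mode
    return RESPONSE_MODES["normal"]
```

With this in place, a reaction handler resolves to `instant` and skips Cognee entirely, while a deep DM conversation opts into the 1000ms budget.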
---

## Integration Points

### 1. Message Ingestion (Background - Non-blocking)

**Location**: `bot/bot.py` - `on_message` event

```python
@globals.client.event
async def on_message(message):
    # ... existing message handling ...

    # After Miku responds, ingest to Cognee (non-blocking)
    asyncio.create_task(ingest_to_cognee(
        message=message,
        response=miku_response,
        guild_id=message.guild.id if message.guild else None
    ))
    # Continue immediately - don't wait
```

**Implementation**: New file `bot/utils/cognee_integration.py`

```python
async def ingest_to_cognee(message, response, guild_id):
    """
    Background task to add conversation to long-term memory.
    Non-blocking - runs after Discord message is sent.
    """
    try:
        # Build rich context document
        doc = {
            "timestamp": datetime.now().isoformat(),
            "user_id": str(message.author.id),
            "user_name": message.author.display_name,
            "guild_id": str(guild_id) if guild_id else None,
            "message": message.content,
            "miku_response": response,
            "mood": get_current_mood(guild_id),
        }

        # Add to Cognee (async)
        await cognee.add([
            f"User {doc['user_name']} said: {doc['message']}",
            f"Miku responded: {doc['miku_response']}"
        ])

        # Process into knowledge graph
        await cognee.cognify()

        print(f"✅ Ingested to Cognee: {message.id}")
    except Exception as e:
        print(f"⚠️ Cognee ingestion failed (non-critical): {e}")
```

### 2. Query Enhancement (Conditional)

**Location**: `bot/utils/llm.py` - `query_llama` function

```python
async def query_llama(user_prompt, user_id, guild_id=None, ...):
    # Get short-term context (always)
    short_term = conversation_history.format_for_llm(channel_id, max_messages=8)

    # Check if we need long-term memory
    long_term_context = None
    if should_query_long_term_memory(user_prompt, {"guild_id": guild_id}):
        try:
            # Query Cognee with timeout
            long_term_context = await asyncio.wait_for(
                cognee_integration.search_long_term_memory(user_prompt, user_id, guild_id),
                timeout=0.15  # 150ms max
            )
        except asyncio.TimeoutError:
            print("⏱️ Long-term memory query timeout - proceeding without")
        except Exception as e:
            print(f"⚠️ Long-term memory error: {e}")

    # Build messages for LLM
    messages = short_term  # Always use short-term

    # Inject long-term context if available
    if long_term_context:
        messages.insert(0, {
            "role": "system",
            "content": f"[Long-term memory context]: {long_term_context}"
        })

    # ... rest of existing LLM query code ...
```

### 3. Autonomous Actions Integration

**Location**: `bot/utils/autonomous.py`

```python
async def autonomous_tick_v2(guild_id: int):
    """Enhanced with long-term memory awareness."""
    # Get decision from autonomous engine (existing fast logic)
    action_type = autonomous_engine.should_take_action(guild_id)
    if action_type is None:
        return

    # ENHANCEMENT: Check if action should use long-term context
    context = {}
    if action_type in ["engage_user", "join_conversation"]:
        # Get recent server activity from Cognee
        try:
            context["recent_topics"] = await asyncio.wait_for(
                cognee_integration.get_recent_topics(guild_id, hours=24),
                timeout=0.1  # 100ms max - this is background
            )
        except asyncio.TimeoutError:
            pass  # Proceed without - autonomous actions are best-effort

    # Execute action with enhanced context
    if action_type == "engage_user":
        await miku_engage_random_user_for_server(guild_id, context=context)

    # ... rest of existing action execution ...
```
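The tick above calls `cognee_integration.get_recent_topics`, which isn't defined yet. A minimal sketch, reusing the bare `cognee.search(query)` call shape used throughout this plan (via the Strategy 3 cache); the topic-extraction step is a placeholder assumption:

```python
async def get_recent_topics(guild_id: int, hours: int = 24) -> list:
    """Ask long-term memory what a server has been talking about.

    Sketch only: phrases the request as a natural-language query and
    trims whatever comes back to a few short topic strings.
    """
    query = f"Main topics discussed in guild {guild_id} over the last {hours} hours"
    results = await cached_cognee_search(query)

    # Assumption: results is an iterable of text-like items
    topics = [str(r).strip()[:80] for r in (results or [])]
    return topics[:5]  # Keep the context payload small
```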
### 4. User Preference Tracking

**New Feature**: Learn user preferences over time

```python
# bot/utils/cognee_integration.py
import re

async def extract_and_store_preferences(message, response):
    """
    Extract user preferences from conversations and store in Cognee.
    Runs in background - doesn't block responses.
    """
    # Simple heuristic extraction (can be enhanced with LLM later)
    preferences = extract_preferences_simple(message.content)

    if preferences:
        for pref in preferences:
            await cognee.add([{
                "type": "user_preference",
                "user_id": str(message.author.id),
                "preference": pref["category"],
                "value": pref["value"],
                "context": message.content[:200],
                "timestamp": datetime.now().isoformat()
            }])

def extract_preferences_simple(text: str) -> list:
    """Fast pattern matching for common preferences."""
    prefs = []
    text_lower = text.lower()

    # Pattern: "I love/like/prefer X"
    match = re.search(r"i (love|like|prefer) ([\w\s]{2,40})", text_lower)
    if match:
        prefs.append({"category": match.group(1), "value": match.group(2).strip()})

    # Pattern: "my favorite X is Y"
    match = re.search(r"my favorite ([\w\s]{2,30}) is ([\w\s]{2,40})", text_lower)
    if match:
        prefs.append({
            "category": f"favorite {match.group(1).strip()}",
            "value": match.group(2).strip()
        })

    return prefs
```

---

## Docker Compose Integration

### Add Cognee Services

```yaml
# Add to docker-compose.yml
cognee-db:
  image: postgres:15-alpine
  container_name: cognee-db
  environment:
    - POSTGRES_USER=cognee
    - POSTGRES_PASSWORD=cognee_pass
    - POSTGRES_DB=cognee
  volumes:
    - cognee_postgres_data:/var/lib/postgresql/data
  restart: unless-stopped
  profiles:
    - cognee  # Optional profile - enable with --profile cognee

cognee-neo4j:
  image: neo4j:5-community
  container_name: cognee-neo4j
  environment:
    - NEO4J_AUTH=neo4j/cognee_pass
    - NEO4J_PLUGINS=["apoc"]
  ports:
    - "7474:7474"  # Neo4j Browser (optional)
    - "7687:7687"  # Bolt protocol
  volumes:
    - cognee_neo4j_data:/data
  restart: unless-stopped
  profiles:
    - cognee

volumes:
  cognee_postgres_data:
  cognee_neo4j_data:
```

### Update Miku Bot Service

```yaml
miku-bot:
  # ... existing config ...
  environment:
    # ... existing env vars ...
    - COGNEE_ENABLED=true
    - COGNEE_DB_URL=postgresql://cognee:cognee_pass@cognee-db:5432/cognee
    - COGNEE_NEO4J_URL=bolt://cognee-neo4j:7687
    - COGNEE_NEO4J_USER=neo4j
    - COGNEE_NEO4J_PASSWORD=cognee_pass
  depends_on:
    - llama-swap
    - cognee-db
    - cognee-neo4j
```
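Phase 1 below calls for "basic initialization and health checks", and the Risk Mitigation section assumes a `cognee_available()` guard. A minimal sketch of that wiring, assuming the `COGNEE_ENABLED` flag from the compose file above and the bare `cognee.search(query)` call shape used throughout this plan:

```python
# bot/utils/cognee_integration.py
import asyncio
import os

import cognee

COGNEE_ENABLED = os.getenv("COGNEE_ENABLED", "false").lower() == "true"
_cognee_healthy = False

async def init_cognee() -> bool:
    """Probe Cognee once at startup and record the result.

    Sketch only: a trivial search doubles as a connectivity check
    against the PostgreSQL/Neo4j backends.
    """
    global _cognee_healthy
    if not COGNEE_ENABLED:
        return False
    try:
        await asyncio.wait_for(cognee.search("health check"), timeout=5.0)
        _cognee_healthy = True
    except Exception as e:
        print(f"⚠️ Cognee unreachable - long-term memory disabled: {e}")
        _cognee_healthy = False
    return _cognee_healthy

def cognee_available() -> bool:
    """Guard used by the graceful-degradation path in Risk Mitigation."""
    return COGNEE_ENABLED and _cognee_healthy
```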
---

## Performance Benchmarks (Estimated)

### Without Cognee (Current)

```
User message → Discord event
  → Short-term lookup (5ms)
  → LLM query (2000ms)
  → Response

Total: ~2005ms (LLM dominates)
```

### With Cognee (Instant Mode - No long-term query)

```
User message → Discord event
  → Short-term lookup (5ms)
  → LLM query (2000ms)
  → Response
Background: Cognee ingestion (150ms) - non-blocking

Total: ~2005ms (no change - ingestion is background)
```

### With Cognee (Deep Memory Mode - User asks about past)

```
User message → Discord event
  → Short-term (5ms) + Long-term query (150ms)
  → LLM query (2000ms)
  → Response

Total: ~2155ms (+150ms overhead, but only when explicitly needed)
```

### Autonomous Actions (Background)

```
Autonomous tick → Decision (5ms)
  → Get topics from Cognee (100ms)
  → Generate message (2000ms)
  → Post

Total: ~2105ms (+100ms, but autonomous actions are already async)
```

---

## Feature Enhancements Enabled by Cognee

### 1. User Memory

```python
# User asks: "What's my favorite anime?"
# Cognee searches: All messages from user mentioning "favorite" + "anime"
# Returns: "You mentioned loving Steins;Gate in a conversation 3 weeks ago"
```

### 2. Topic Trends

```python
# Autonomous action: Join conversation
# Cognee query: "What topics have been trending in this server this week?"
# Returns: ["gaming", "anime recommendations", "music production"]
# Miku: "I've noticed you all have been talking about anime a lot lately! Any good recommendations?"
```

### 3. Relationship Tracking

```python
# Knowledge graph tracks:
#   User A → likes → "cats"
#   User B → dislikes → "cats"
#   User A → friends_with → User B
# When Miku talks to both: Avoids cat topics to prevent friction
```

### 4. Event Recall

```python
# User: "Remember when we talked about that concert?"
# Cognee searches: Conversations with this user + keyword "concert"
# Returns: "Yes! You were excited about the Miku Expo in Los Angeles in July!"
```

### 5. Mood Pattern Analysis

```python
# Query Cognee: "When does this server get most active?"
# Returns: "Evenings between 7-10 PM, discussions about gaming"
# Autonomous engine: Schedule more engagement during peak times
```

---

## Implementation Phases

### Phase 1: Foundation (Week 1)
- [ ] Add Cognee to `requirements.txt`
- [ ] Create `bot/utils/cognee_integration.py`
- [ ] Set up Docker services (PostgreSQL, Neo4j)
- [ ] Basic initialization and health checks
- [ ] Test ingestion in background (non-blocking)

### Phase 2: Basic Integration (Week 2)
- [ ] Add background ingestion to `on_message`
- [ ] Implement `should_query_long_term_memory()` heuristics
- [ ] Add conditional long-term queries to `query_llama()`
- [ ] Add caching layer
- [ ] Monitor latency impact

### Phase 3: Advanced Features (Week 3)
- [ ] User preference extraction
- [ ] Topic trend analysis for autonomous actions
- [ ] Relationship tracking between users
- [ ] Event recall capabilities

### Phase 4: Optimization (Week 4)
- [ ] Fine-tune timeout thresholds
- [ ] Implement smart caching strategies
- [ ] Add Cognee query statistics to dashboard
- [ ] Performance benchmarking and tuning

---

## Configuration Management

### Keep JSON Files (Hot Config)

```python
# These remain JSON for instant access:
- servers_config.json         # Current mood, sleep state, settings
- autonomous_context.json     # Real-time autonomous state
- blocked_users.json          # Security/moderation
- figurine_subscribers.json   # Active subscriptions

# Reason: Need instant read/write, changed frequently
```

### Migrate to Cognee (Historical Data)

```python
# These can move to Cognee over time:
- Full DM history (dms/*.json)  → Cognee knowledge graph
- Profile picture metadata      → Cognee (searchable by mood)
- Reaction logs                 → Cognee (analyze patterns)

# Reason: Historical, queried infrequently, benefit from graph relationships
```

### Hybrid Approach

```json
// servers_config.json - Keep recent data
{
  "guild_id": 123,
  "current_mood": "bubbly",
  "is_sleeping": false,
  "recent_topics": ["cached", "from", "cognee"]  // Cache Cognee query results
}
```

---

## Monitoring & Observability

### Add Performance Tracking

```python
# bot/utils/cognee_integration.py
import asyncio
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class CogneeMetrics:
    """Track Cognee performance."""
    total_queries: int = 0
    cache_hits: int = 0
    cache_misses: int = 0
    avg_query_time: float = 0.0
    timeouts: int = 0
    errors: int = 0
    background_ingestions: int = 0

cognee_metrics = CogneeMetrics()

async def search_long_term_memory(query: str, user_id: str, guild_id: Optional[int]) -> str:
    """Search with metrics tracking."""
    start = time.time()
    cognee_metrics.total_queries += 1

    try:
        result = await cached_cognee_search(query)
        elapsed = time.time() - start
        # Running average over all queries so far
        cognee_metrics.avg_query_time = (
            (cognee_metrics.avg_query_time * (cognee_metrics.total_queries - 1) + elapsed)
            / cognee_metrics.total_queries
        )
        return result
    except asyncio.TimeoutError:
        cognee_metrics.timeouts += 1
        raise
    except Exception:
        cognee_metrics.errors += 1
        raise
```

### Dashboard Integration

Add to `bot/api.py`:

```python
@app.get("/cognee/metrics")
def get_cognee_metrics():
    """Get Cognee performance metrics."""
    from utils.cognee_integration import cognee_metrics
    return {
        "enabled": globals.COGNEE_ENABLED,
        "total_queries": cognee_metrics.total_queries,
        "cache_hit_rate": (
            cognee_metrics.cache_hits / cognee_metrics.total_queries
            if cognee_metrics.total_queries > 0 else 0
        ),
        "avg_query_time_ms": cognee_metrics.avg_query_time * 1000,
        "timeouts": cognee_metrics.timeouts,
        "errors": cognee_metrics.errors,
        "background_ingestions": cognee_metrics.background_ingestions
    }
```
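Note that `cache_hits`, `cache_misses`, and `background_ingestions` are exposed by the endpoint but never incremented in the snippets above; the increments are assumed to live at the call sites. A hypothetical sketch of that wiring:

```python
# Hypothetical wiring - increments belong at the existing call sites:
#   in cached_cognee_search (Strategy 3):
#     cache hit branch:  cognee_metrics.cache_hits += 1
#     cache miss branch: cognee_metrics.cache_misses += 1

async def ingest_with_metrics(message, response, guild_id):
    """Wrap ingest_to_cognee (Integration Point 1) so the
    background_ingestions counter actually advances."""
    await ingest_to_cognee(message, response, guild_id)
    cognee_metrics.background_ingestions += 1
```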
"background_ingestions": cognee_metrics.background_ingestions } ``` --- ## Risk Mitigation ### Risk 1: Cognee Service Failure **Mitigation**: Graceful degradation ```python if not cognee_available(): # Fall back to short-term memory only # Bot continues functioning normally return short_term_context_only ``` ### Risk 2: Increased Latency **Mitigation**: Aggressive timeouts + caching ```python MAX_COGNEE_QUERY_TIME = 150 # ms # If timeout, proceed without long-term context ``` ### Risk 3: Storage Growth **Mitigation**: Data retention policies ```python # Auto-cleanup old data from Cognee # Keep: Last 90 days of conversations # Archive: Older data to cold storage ``` ### Risk 4: Context Pollution **Mitigation**: Relevance scoring ```python # Only inject Cognee results if confidence > 0.7 if cognee_result.score < 0.7: # Too irrelevant - don't add to context pass ``` --- ## Cost-Benefit Analysis ### Benefits ✅ **Deep Memory**: Recall conversations from weeks/months ago ✅ **User Preferences**: Remember what users like/dislike ✅ **Smarter Autonomous**: Context-aware engagement ✅ **Relationship Graph**: Understand user dynamics ✅ **No User Impact**: Background ingestion, conditional queries ✅ **Scalable**: Handles unlimited conversation history ### Costs ⚠️ **Complexity**: +2 services (PostgreSQL, Neo4j) ⚠️ **Storage**: ~100MB-1GB per month (depending on activity) ⚠️ **Latency**: +50-150ms when querying (conditional) ⚠️ **Memory**: +500MB RAM for Neo4j, +200MB for PostgreSQL ⚠️ **Maintenance**: Additional service to monitor ### Verdict ✅ **Worth it if**: - Your servers have active, long-running conversations - Users want Miku to remember personal details - You want smarter autonomous behavior based on trends ❌ **Skip it if**: - Conversations are mostly one-off interactions - Current 8-message context is sufficient - Hardware resources are limited --- ## Quick Start Commands ### 1. Enable Cognee ```bash # Start with Cognee services docker-compose --profile cognee up -d # Check Cognee health docker-compose logs cognee-neo4j docker-compose logs cognee-db ``` ### 2. Test Integration ```python # In Discord, test long-term memory: User: "Remember that I love cats" Miku: "Got it! I'll remember that you love cats! 🐱" # Later... User: "What do I love?" Miku: "You told me you love cats! 🐱" ``` ### 3. Monitor Performance ```bash # Check metrics via API curl http://localhost:3939/cognee/metrics # View Cognee dashboard (optional) # Open browser: http://localhost:7474 (Neo4j Browser) ``` --- ## Conclusion **Recommended Approach**: Implement Phase 1-2 first, then evaluate based on real usage patterns. **Expected Latency Impact**: - 95% of messages: **0ms** (background ingestion only) - 5% of messages: **+50-150ms** (when long-term memory explicitly needed) **Key Success Factors**: 1. ✅ Keep JSON configs for hot data 2. ✅ Background ingestion (non-blocking) 3. ✅ Conditional long-term queries only 4. ✅ Aggressive timeouts (150ms max) 5. ✅ Caching layer for repeated queries 6. ✅ Graceful degradation on failure This hybrid approach gives you deep memory capabilities without sacrificing the snappy response times users expect from Discord bots.