#!/usr/bin/env python3 """ Direct consolidation test - call the consolidation function directly to validate the logic without relying on hooks. """ import requests import time import json CAT_URL = "http://localhost:1865" def get_unconsolidated_memories(): """Query Qdrant directly to see unconsolidated memories""" try: # Use Cat's admin API to query memory response = requests.get(f"{CAT_URL}/memory/collections") if response.status_code == 200: collections = response.json() print(f"āœ… Memory collections: {json.dumps(collections, indent=2)}") else: print(f"āŒ Failed to get collections: {response.status_code}") except Exception as e: print(f"āŒ Error querying memory: {e}") def trigger_consolidation_via_api(): """Try triggering consolidation via the message API""" print("\nšŸ”§ Attempting to trigger consolidation...") response = requests.post( f"{CAT_URL}/message", headers={"Content-Type": "application/json"}, json={ "text": "consolidate now", "user_id": "admin_test" } ) if response.status_code == 200: result = response.json() print(f"āœ… Response: {result.get('content', '')[:200]}") return True else: print(f"āŒ Failed: {response.status_code}") return False def check_memories_after_consolidation(): """Check if consolidation actually ran""" print("\nšŸ“Š Checking memory state...") # Send a query that should recall memories response = requests.post( f"{CAT_URL}/message", headers={"Content-Type": "application/json"}, json={ "text": "What do you know about me? Tell me everything you remember.", "user_id": "test_alice" } ) if response.status_code == 200: result = response.json() content = result.get('content', '') memory = result.get('why', {}).get('memory', {}) episodic = memory.get('episodic', []) print(f"\nšŸ¤– Miku's response:\n{content}\n") print(f"šŸ“ Episodic memories recalled: {len(episodic)}") # Check what memories exist for mem in episodic[:5]: print(f" - {mem['page_content'][:80]}...") return episodic else: print(f"āŒ Failed to query memories: {response.status_code}") return [] def main(): print("=" * 70) print("CONSOLIDATION DIRECT TEST") print("=" * 70) # Step 1: Check current memory state print("\nšŸ“‹ STEP 1: Check memory collections") get_unconsolidated_memories() # Step 2: Check memories before consolidation print("\nšŸ“‹ STEP 2: Query memories before consolidation") memories_before = check_memories_after_consolidation() print(f"\nšŸ“Š Memories BEFORE consolidation: {len(memories_before)}") # Step 3: Trigger consolidation print("\nšŸ“‹ STEP 3: Trigger consolidation") triggered = trigger_consolidation_via_api() if triggered: # Wait for consolidation to complete print("\nā³ Waiting 5 seconds for consolidation to process...") time.sleep(5) # Step 4: Check memories after consolidation print("\nšŸ“‹ STEP 4: Query memories after consolidation") memories_after = check_memories_after_consolidation() print(f"\nšŸ“Š Memories AFTER consolidation: {len(memories_after)}") # Compare print("\n" + "=" * 70) print("RESULTS:") print("=" * 70) print(f"Memories before: {len(memories_before)}") print(f"Memories after: {len(memories_after)}") print(f"Deleted: {len(memories_before) - len(memories_after)}") if len(memories_after) < len(memories_before): print("\nāœ… SUCCESS! Consolidation deleted some memories!") else: print("\nāš ļø No memories were deleted. Consolidation may not have run.") else: print("\nāŒ Failed to trigger consolidation") print("\n" + "=" * 70) if __name__ == "__main__": main()