#!/usr/bin/env python3
"""
Direct consolidation test - call the consolidation function directly
to validate the logic without relying on hooks.
"""
|
||
|
|
|
||
|
|
import requests
|
||
|
|
import time
|
||
|
|
import json
|
||
|
|
|
||
|
|
# Base URL of the Cat HTTP API; every request in this script is built from it.
CAT_URL = "http://localhost:1865"
|
||
|
|
|
||
|
|
|
||
|
|
def get_unconsolidated_memories(timeout=10):
    """Print the memory collections reported by the Cat API.

    Despite its name, this function does not talk to Qdrant itself and does
    not filter for unconsolidated entries: it sends a GET request to
    ``{CAT_URL}/memory/collections`` and pretty-prints the JSON reply.

    This is a best-effort diagnostic step, so transport errors, timeouts and
    undecodable bodies are reported on stdout instead of being raised.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` may block forever on a dead server.

    Returns:
        None. All results are written to stdout.
    """
    try:
        # Use Cat's admin API to query memory
        response = requests.get(
            f"{CAT_URL}/memory/collections",
            timeout=timeout,
        )
        if response.status_code == 200:
            collections = response.json()
            print(f"✅ Memory collections: {json.dumps(collections, indent=2)}")
        else:
            print(f"❌ Failed to get collections: {response.status_code}")
    except (requests.RequestException, ValueError) as e:
        # RequestException covers connection failures and timeouts;
        # ValueError covers a 200 reply whose body is not valid JSON.
        print(f"❌ Error querying memory: {e}")
|
||
|
|
|
||
|
|
|
||
|
|
def trigger_consolidation_via_api(timeout=120):
    """Ask the Cat to consolidate by posting a chat message.

    Sends ``"consolidate now"`` as user ``admin_test`` to
    ``{CAT_URL}/message`` and prints the first 200 characters of the reply's
    ``content`` field.

    Args:
        timeout: Seconds to wait for the HTTP reply before giving up. The
            default is generous because the endpoint may take a while to
            answer; without any timeout the script could hang forever.

    Returns:
        True if the server answered with HTTP 200 and a JSON body, False on
        any other status code, on a connection error or timeout, or when the
        body cannot be decoded as JSON.
    """
    print("\n🔧 Attempting to trigger consolidation...")

    try:
        response = requests.post(
            f"{CAT_URL}/message",
            headers={"Content-Type": "application/json"},
            json={
                "text": "consolidate now",
                "user_id": "admin_test",
            },
            timeout=timeout,
        )
    except requests.RequestException as e:
        # Report an unreachable or stalled server the same way as a bad
        # status code, so the caller can print its own failure summary.
        print(f"❌ Failed: {e}")
        return False

    if response.status_code != 200:
        print(f"❌ Failed: {response.status_code}")
        return False

    try:
        result = response.json()
    except ValueError as e:
        print(f"❌ Failed: {e}")
        return False

    # "content" may be present but null; fall back to an empty string so the
    # slice below cannot raise.
    content = result.get('content') or ''
    print(f"✅ Response: {content[:200]}")
    return True
|
||
|
|
|
||
|
|
|
||
|
|
def check_memories_after_consolidation(timeout=120):
    """Ask the Cat what it remembers and return the recalled episodic memories.

    Posts a recall-style question as user ``test_alice`` to
    ``{CAT_URL}/message``, prints the reply text, and prints up to five of the
    episodic memories listed under ``why.memory.episodic`` in the response.

    Despite the name, the function takes a snapshot of the current state, so
    it can be called both before and after consolidation and the two results
    compared.

    Args:
        timeout: Seconds to wait for the HTTP reply before giving up.

    Returns:
        The list of episodic memory entries from the response. An empty list
        is returned when the request fails, the status is not 200, the body
        is not valid JSON, or no episodic memories are present.
    """
    print("\n📊 Checking memory state...")

    try:
        # Send a query that should recall memories
        response = requests.post(
            f"{CAT_URL}/message",
            headers={"Content-Type": "application/json"},
            json={
                "text": "What do you know about me? Tell me everything you remember.",
                "user_id": "test_alice",
            },
            timeout=timeout,
        )
    except requests.RequestException as e:
        print(f"❌ Failed to query memories: {e}")
        return []

    if response.status_code != 200:
        print(f"❌ Failed to query memories: {response.status_code}")
        return []

    try:
        result = response.json()
    except ValueError as e:
        print(f"❌ Failed to query memories: {e}")
        return []

    content = result.get('content', '')
    # Any level of why -> memory -> episodic may be missing or null, so
    # normalise each step instead of chaining .get() on a possible None.
    memory = (result.get('why') or {}).get('memory') or {}
    episodic = memory.get('episodic') or []

    print(f"\n🤖 Miku's response:\n{content}\n")
    print(f"📝 Episodic memories recalled: {len(episodic)}")

    # Check what memories exist
    for mem in episodic[:5]:
        # Tolerate entries without text rather than aborting the whole report.
        snippet = (mem.get('page_content') or '')[:80]
        print(f" - {snippet}...")

    return episodic
|
||
|
|
|
||
|
|
|
||
|
|
def main():
    """Run the consolidation check end to end and print a report.

    The flow is: list memory collections, count the episodic memories recalled
    before consolidation, trigger consolidation, wait five seconds, count
    again, and report whether the number of recalled memories dropped.
    """
    rule = "=" * 70

    print(rule)
    print("CONSOLIDATION DIRECT TEST")
    print(rule)

    # Step 1: look at the current memory collections.
    print("\n📋 STEP 1: Check memory collections")
    get_unconsolidated_memories()

    # Step 2: snapshot of recalled memories before consolidation.
    print("\n📋 STEP 2: Query memories before consolidation")
    memories_before = check_memories_after_consolidation()
    before_count = len(memories_before)
    print(f"\n📊 Memories BEFORE consolidation: {before_count}")

    # Step 3: ask for consolidation.
    print("\n📋 STEP 3: Trigger consolidation")
    if not trigger_consolidation_via_api():
        print("\n❌ Failed to trigger consolidation")
    else:
        # Give the consolidation some time to finish before re-checking.
        print("\n⏳ Waiting 5 seconds for consolidation to process...")
        time.sleep(5)

        # Step 4: snapshot of recalled memories after consolidation.
        print("\n📋 STEP 4: Query memories after consolidation")
        memories_after = check_memories_after_consolidation()
        after_count = len(memories_after)
        print(f"\n📊 Memories AFTER consolidation: {after_count}")

        # Side-by-side comparison of the two snapshots.
        print("\n" + rule)
        print("RESULTS:")
        print(rule)
        print(f"Memories before: {before_count}")
        print(f"Memories after: {after_count}")
        print(f"Deleted: {before_count - after_count}")

        verdict = (
            "\n✅ SUCCESS! Consolidation deleted some memories!"
            if after_count < before_count
            else "\n⚠️ No memories were deleted. Consolidation may not have run."
        )
        print(verdict)

    print("\n" + rule)
|
||
|
|
|
||
|
|
|
||
|
|
# Run the test only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|