Files
miku-discord/bot/api.py
koko210Serve c62b6817c4 Fix image generation UI: add image preview, serving endpoint, and proper error handling
- Fixed function name mismatch: generateImage() -> generateManualImage()
- Fixed status div ID mismatch in HTML
- Added /image/view/{filename} endpoint to serve generated images from ComfyUI output
- Implemented proper image preview with DOM element creation instead of innerHTML
- Added robust error handling with onload/onerror event handlers
- Added debug logging to image serving endpoint for troubleshooting
- Images now display directly in the Web UI after generation
2025-12-13 00:36:35 +02:00

1741 lines
74 KiB
Python

# api.py
from fastapi import (
FastAPI,
Query,
Request, UploadFile,
File,
Form
)
from fastapi.responses import StreamingResponse
from typing import List, Optional
from pydantic import BaseModel
import globals
from server_manager import server_manager
from utils.conversation_history import conversation_history
from commands.actions import (
force_sleep,
wake_up,
set_mood,
reset_mood,
check_mood,
calm_miku,
reset_conversation,
send_bedtime_now
)
from utils.autonomous import (
miku_autonomous_tick,
miku_say_something_general,
miku_engage_random_user,
share_miku_tweet,
handle_custom_prompt,
miku_detect_and_join_conversation
)
import asyncio
import nest_asyncio
import subprocess
import io
import discord
import aiofiles
import aiohttp
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, PlainTextResponse
import os
import json
from utils.figurine_notifier import (
load_subscribers as figurine_load_subscribers,
add_subscriber as figurine_add_subscriber,
remove_subscriber as figurine_remove_subscriber,
send_figurine_dm_to_all_subscribers,
send_figurine_dm_to_single_user
)
from utils.dm_logger import dm_logger
# Patch asyncio via nest_asyncio before the app is created.
# NOTE(review): presumably needed so the API can run alongside the Discord
# client's already-running event loop — confirm against the launcher.
nest_asyncio.apply()
# The FastAPI application that every route decorator below registers against.
app = FastAPI()
# Serve static folder
app.mount("/static", StaticFiles(directory="static"), name="static")
# ========== Models ==========
class MoodSetRequest(BaseModel):
    """Request body naming the mood that should be applied."""

    # Mood name; the endpoints validate it against MOOD_EMOJIS.
    mood: str
class ConversationResetRequest(BaseModel):
    """Request body identifying whose conversation should be reset."""

    # Passed unchanged to reset_conversation().
    user_id: str
class CustomPromptRequest(BaseModel):
    """Request body carrying a free-form prompt for a custom message."""

    # Prompt text forwarded to the custom-prompt handlers.
    prompt: str
class ServerConfigRequest(BaseModel):
    """Request body describing a server (guild) configuration to add.

    The two list fields are optional. They are annotated ``Optional[...]``
    so that the declared type agrees with the ``None`` default and an
    explicit JSON ``null`` is accepted as "not provided".
    """

    guild_id: int
    guild_name: str
    autonomous_channel_id: int
    autonomous_channel_name: str
    # None means the caller did not provide bedtime channels.
    bedtime_channel_ids: Optional[List[int]] = None
    # None means the caller did not provide a feature list.
    enabled_features: Optional[List[str]] = None
# ========== Routes ==========
@app.get("/")
def read_index():
    """Serve the web UI entry page from the static folder."""
    index_page = "static/index.html"
    return FileResponse(index_page)
@app.get("/logs")
def get_logs():
    """Return the last 100 lines of the bot log as one string.

    The file is streamed through a bounded deque, so only the trailing
    lines are kept in memory regardless of how large the log has grown.

    Returns:
        The concatenated tail of ``/app/bot.log``, or an error message
        string if the file cannot be read.
    """
    from collections import deque

    try:
        with open("/app/bot.log", "r", encoding="utf-8") as f:
            # maxlen drops the oldest line each time a newer one arrives.
            return "".join(deque(f, maxlen=100))
    except Exception as e:
        return f"Error reading log file: {e}"
@app.get("/prompt")
def get_last_prompt():
    """Return the last full prompt, or a placeholder when none is set."""
    last_prompt = globals.LAST_FULL_PROMPT
    if not last_prompt:
        last_prompt = "No prompt has been issued yet."
    return {"prompt": last_prompt}
@app.get("/mood")
def get_current_mood():
    """Return the current DM mood name and its description."""
    mood_name = globals.DM_MOOD
    mood_text = globals.DM_MOOD_DESCRIPTION
    return {"mood": mood_name, "description": mood_text}
@app.post("/mood")
async def set_mood_endpoint(data: MoodSetRequest):
    """Set the DM mood after checking it against the known mood names."""
    from utils.moods import MOOD_EMOJIS

    requested = data.mood
    if requested in MOOD_EMOJIS:
        # DMs have no nickname, so only the DM mood globals are updated.
        globals.DM_MOOD = requested
        from utils.moods import load_mood_description

        globals.DM_MOOD_DESCRIPTION = load_mood_description(requested)
        return {"status": "ok", "new_mood": requested}

    known_moods = ', '.join(MOOD_EMOJIS.keys())
    return {
        "status": "error",
        "message": f"Mood '{requested}' not recognized. Available moods: {known_moods}",
    }
@app.post("/mood/reset")
async def reset_mood_endpoint():
    """Reset the DM mood to neutral and reload its description."""
    neutral = "neutral"
    # DMs have no nickname, so there is nothing else to refresh.
    globals.DM_MOOD = neutral
    from utils.moods import load_mood_description

    globals.DM_MOOD_DESCRIPTION = load_mood_description(neutral)
    return {"status": "ok", "new_mood": neutral}
@app.post("/mood/calm")
def calm_miku_endpoint():
    """Calm the DM mood by setting it back to neutral."""
    calm_mood = "neutral"
    # DMs have no nickname, so only the mood globals change.
    globals.DM_MOOD = calm_mood
    from utils.moods import load_mood_description

    globals.DM_MOOD_DESCRIPTION = load_mood_description(calm_mood)
    return {"status": "ok", "message": "Miku has been calmed down"}
# ========== Per-Server Mood Management ==========
@app.get("/servers/{guild_id}/mood")
def get_server_mood(guild_id: int):
    """Return the mood name and description for one server."""
    name, description = server_manager.get_server_mood(guild_id)
    return dict(guild_id=guild_id, mood=name, description=description)
@app.post("/servers/{guild_id}/mood")
async def set_server_mood_endpoint(guild_id: int, data: MoodSetRequest):
    """Set mood for a specific server.

    Checks that the server and the mood are known, stores the mood,
    notifies the autonomous engine (best effort), and schedules a nickname
    refresh on the bot's event loop.

    The nickname refresh is skipped when the bot client is not ready, so a
    mood that was already stored is still reported as a success instead of
    the request failing on the missing client.

    Returns:
        A status dict: ``ok`` with the new mood, or ``error`` with a message.
    """
    # Check if server exists
    if guild_id not in server_manager.servers:
        print(f"🎭 API: Server {guild_id} not found in server_manager.servers")
        return {"status": "error", "message": "Server not found"}

    # Check if mood is valid
    from utils.moods import MOOD_EMOJIS
    if data.mood not in MOOD_EMOJIS:
        print(f"🎭 API: Mood '{data.mood}' not found in MOOD_EMOJIS. Available moods: {list(MOOD_EMOJIS.keys())}")
        return {"status": "error", "message": f"Mood '{data.mood}' not recognized. Available moods: {', '.join(MOOD_EMOJIS.keys())}"}

    success = server_manager.set_server_mood(guild_id, data.mood)
    print(f"🎭 API: Server mood set result: {success}")
    if not success:
        print("🎭 API: set_server_mood returned False for unknown reason")
        return {"status": "error", "message": "Failed to set server mood"}

    # V2: Notify autonomous engine of mood change (failure is non-fatal)
    try:
        from utils.autonomous import on_mood_change
        on_mood_change(guild_id, data.mood)
    except Exception as e:
        print(f"⚠️ API: Failed to notify autonomous engine of mood change: {e}")

    # Update the nickname for this server, but only if the bot loop is
    # available (same readiness check the other endpoints use).
    from utils.moods import update_server_nickname
    client = globals.client
    if client and client.loop and client.loop.is_running():
        print(f"🎭 API: Updating nickname for server {guild_id}")
        client.loop.create_task(update_server_nickname(guild_id))
    else:
        print(f"⚠️ API: Bot not ready, skipping nickname update for server {guild_id}")
    return {"status": "ok", "new_mood": data.mood, "guild_id": guild_id}
@app.post("/servers/{guild_id}/mood/reset")
async def reset_server_mood_endpoint(guild_id: int):
    """Reset mood to neutral for a specific server.

    Stores the neutral mood, notifies the autonomous engine (best effort),
    and schedules a nickname refresh on the bot's event loop.

    The nickname refresh is skipped when the bot client is not ready, so a
    reset that was already stored is still reported as a success instead of
    the request failing on the missing client.

    Returns:
        A status dict: ``ok`` with the new mood, or ``error`` with a message.
    """
    print(f"🎭 API: Resetting mood for server {guild_id} to neutral")

    # Check if server exists
    if guild_id not in server_manager.servers:
        print(f"🎭 API: Server {guild_id} not found in server_manager.servers")
        return {"status": "error", "message": "Server not found"}

    print("🎭 API: Server validation passed, calling set_server_mood")
    success = server_manager.set_server_mood(guild_id, "neutral")
    print(f"🎭 API: Server mood reset result: {success}")
    if not success:
        print("🎭 API: set_server_mood returned False for unknown reason")
        return {"status": "error", "message": "Failed to reset server mood"}

    # V2: Notify autonomous engine of mood change (failure is non-fatal)
    try:
        from utils.autonomous import on_mood_change
        on_mood_change(guild_id, "neutral")
    except Exception as e:
        print(f"⚠️ API: Failed to notify autonomous engine of mood reset: {e}")

    # Update the nickname for this server, but only if the bot loop is
    # available (same readiness check the other endpoints use).
    from utils.moods import update_server_nickname
    client = globals.client
    if client and client.loop and client.loop.is_running():
        print(f"🎭 API: Updating nickname for server {guild_id}")
        client.loop.create_task(update_server_nickname(guild_id))
    else:
        print(f"⚠️ API: Bot not ready, skipping nickname update for server {guild_id}")
    return {"status": "ok", "new_mood": "neutral", "guild_id": guild_id}
@app.get("/servers/{guild_id}/mood/state")
def get_server_mood_state(guild_id: int):
    """Return the full mood state for one server, or an error if absent."""
    state = server_manager.get_server_mood_state(guild_id)
    if not state:
        return {"status": "error", "message": "Server not found"}
    return {"status": "ok", "guild_id": guild_id, "mood_state": state}
@app.post("/conversation/reset")
def reset_convo(data: ConversationResetRequest):
    """Reset the conversation for the user named in the request body."""
    target_user = data.user_id
    reset_conversation(target_user)
    return {"status": "ok", "message": "Conversation reset"}
@app.post("/sleep")
async def force_sleep_endpoint():
    """Await force_sleep() and report success."""
    await force_sleep()
    reply = {"status": "ok", "message": "Miku is now sleeping"}
    return reply
@app.post("/wake")
async def wake_up_endpoint():
    """Await wake_up() and report success."""
    await wake_up()
    reply = {"status": "ok", "message": "Miku is now awake"}
    return reply
@app.post("/bedtime")
async def bedtime_endpoint(guild_id: int = None):
    """Queue a bedtime reminder for one server, or for all if none is given."""
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    if guild_id is None:
        # Legacy behavior: no guild_id means every server gets the reminder.
        from utils.scheduled import send_bedtime_now
        bot.loop.create_task(send_bedtime_now())
        return {"status": "ok", "message": "Bedtime reminder queued for all servers"}

    # A guild_id was supplied: target only that server.
    from utils.scheduled import send_bedtime_reminder_for_server
    bot.loop.create_task(send_bedtime_reminder_for_server(guild_id, bot))
    return {"status": "ok", "message": f"Bedtime reminder queued for server {guild_id}"}
@app.post("/autonomous/general")
async def trigger_autonomous_general(guild_id: int = None):
    """Queue a general autonomous message for one server, or for all."""
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    if guild_id is None:
        # Legacy behavior: broadcast to every server.
        from utils.autonomous import miku_say_something_general
        bot.loop.create_task(miku_say_something_general())
        return {"status": "ok", "message": "Autonomous general message queued for all servers"}

    # Target a single server.
    from utils.autonomous import miku_say_something_general_for_server
    bot.loop.create_task(miku_say_something_general_for_server(guild_id))
    return {"status": "ok", "message": f"Autonomous general message queued for server {guild_id}"}
@app.post("/autonomous/engage")
async def trigger_autonomous_engage_user(guild_id: int = None):
    """Queue an autonomous user engagement for one server, or for all."""
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    if guild_id is None:
        # Legacy behavior: engage across every server.
        from utils.autonomous import miku_engage_random_user
        bot.loop.create_task(miku_engage_random_user())
        return {"status": "ok", "message": "Autonomous user engagement queued for all servers"}

    # Target a single server.
    from utils.autonomous import miku_engage_random_user_for_server
    bot.loop.create_task(miku_engage_random_user_for_server(guild_id))
    return {"status": "ok", "message": f"Autonomous user engagement queued for server {guild_id}"}
@app.post("/autonomous/tweet")
async def trigger_autonomous_tweet(guild_id: int = None):
    """Queue autonomous tweet sharing for one server, or for all."""
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    if guild_id is None:
        # Legacy behavior: share to every server.
        from utils.autonomous import share_miku_tweet
        bot.loop.create_task(share_miku_tweet())
        return {"status": "ok", "message": "Autonomous tweet sharing queued for all servers"}

    # Target a single server.
    from utils.autonomous import share_miku_tweet_for_server
    bot.loop.create_task(share_miku_tweet_for_server(guild_id))
    return {"status": "ok", "message": f"Autonomous tweet sharing queued for server {guild_id}"}
@app.post("/autonomous/custom")
async def custom_autonomous_message(req: CustomPromptRequest, guild_id: int = None):
    """Queue a custom-prompt autonomous message for one server, or for all."""
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    prompt_text = req.prompt
    if guild_id is None:
        # Legacy behavior: send to every server.
        from utils.autonomous import handle_custom_prompt
        # Scheduled as a task to avoid the timeout context manager error.
        bot.loop.create_task(handle_custom_prompt(prompt_text))
        return {"status": "ok", "message": "Custom autonomous message queued for all servers"}

    # Target a single server.
    from utils.autonomous import handle_custom_prompt_for_server
    # Scheduled as a task to avoid the timeout context manager error.
    bot.loop.create_task(handle_custom_prompt_for_server(guild_id, prompt_text))
    return {"status": "ok", "message": f"Custom autonomous message queued for server {guild_id}"}
@app.post("/autonomous/reaction")
async def trigger_autonomous_reaction(guild_id: int = None):
    """Queue a forced autonomous reaction for one server, or for all."""
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    if guild_id is None:
        # Legacy behavior: all servers; force=True bypasses the 50% chance.
        from utils.autonomous import miku_autonomous_reaction
        bot.loop.create_task(miku_autonomous_reaction(force=True))
        return {"status": "ok", "message": "Autonomous reaction queued for all servers"}

    # Single server; force=True bypasses the 50% chance.
    from utils.autonomous import miku_autonomous_reaction_for_server
    bot.loop.create_task(miku_autonomous_reaction_for_server(guild_id, force=True))
    return {"status": "ok", "message": f"Autonomous reaction queued for server {guild_id}"}
@app.post("/autonomous/join-conversation")
async def trigger_detect_and_join_conversation(guild_id: int = None):
    """Queue a forced detect-and-join-conversation run for one or all servers."""
    print(f"🔍 [API] Join conversation endpoint called with guild_id={guild_id}")

    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        print(f"⚠️ [API] Bot not ready: client={bot}, loop={bot.loop if bot else None}")
        return {"status": "error", "message": "Bot not ready"}

    if guild_id is None:
        # All servers; force=True bypasses checks for a manual trigger.
        print("🔍 [API] Importing and calling miku_detect_and_join_conversation() for all servers")
        from utils.autonomous import miku_detect_and_join_conversation
        bot.loop.create_task(miku_detect_and_join_conversation(force=True))
        return {"status": "ok", "message": "Detect and join conversation queued for all servers"}

    # Single server; force=True bypasses checks for a manual trigger.
    print(f"🔍 [API] Importing and calling miku_detect_and_join_conversation_for_server({guild_id}, force=True)")
    from utils.autonomous import miku_detect_and_join_conversation_for_server
    bot.loop.create_task(miku_detect_and_join_conversation_for_server(guild_id, force=True))
    return {"status": "ok", "message": f"Detect and join conversation queued for server {guild_id}"}
@app.post("/profile-picture/change")
async def trigger_profile_picture_change(
    guild_id: int = None,
    file: UploadFile = File(None)
):
    """
    Change Miku's profile picture.

    An uploaded file is used when present; otherwise the profile picture
    manager is called without custom image bytes (the original docstring
    says it then searches Danbooru).
    """
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    try:
        from utils.profile_picture_manager import profile_picture_manager
        from server_manager import server_manager

        # Server mood when a guild is given, DM mood as the fallback.
        if guild_id is None:
            mood = globals.DM_MOOD
        else:
            mood, _ = server_manager.get_server_mood(guild_id)

        # Read the upload, if there is one.
        uploaded_bytes = None
        if file:
            uploaded_bytes = await file.read()
            print(f"🖼️ Received custom image upload ({len(uploaded_bytes)} bytes)")

        outcome = await profile_picture_manager.change_profile_picture(
            mood=mood,
            custom_image_bytes=uploaded_bytes,
            debug=True
        )

        if not outcome["success"]:
            return {
                "status": "error",
                "message": outcome.get("error", "Unknown error"),
                "source": outcome["source"]
            }
        return {
            "status": "ok",
            "message": "Profile picture changed successfully",
            "source": outcome["source"],
            "metadata": outcome.get("metadata", {})
        }
    except Exception as e:
        print(f"⚠️ Error in profile picture API: {e}")
        import traceback
        traceback.print_exc()
        return {"status": "error", "message": f"Unexpected error: {str(e)}"}
@app.get("/profile-picture/metadata")
async def get_profile_picture_metadata():
    """Return the stored metadata for the current profile picture."""
    try:
        from utils.profile_picture_manager import profile_picture_manager

        stored = profile_picture_manager.load_metadata()
        if not stored:
            return {"status": "ok", "metadata": None, "message": "No metadata found"}
        return {"status": "ok", "metadata": stored}
    except Exception as e:
        return {"status": "error", "message": str(e)}
@app.post("/profile-picture/restore-fallback")
async def restore_fallback_profile_picture():
    """Ask the profile picture manager to restore the fallback picture."""
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    try:
        from utils.profile_picture_manager import profile_picture_manager

        restored = await profile_picture_manager.restore_fallback()
        if not restored:
            return {"status": "error", "message": "Failed to restore fallback"}
        return {"status": "ok", "message": "Fallback profile picture restored"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
@app.post("/role-color/custom")
async def set_custom_role_color(hex_color: str = Form(...)):
    """Apply a custom role color via the profile picture manager."""
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    try:
        from utils.profile_picture_manager import profile_picture_manager

        outcome = await profile_picture_manager.set_custom_role_color(hex_color, debug=True)
        if not outcome["success"]:
            return {"status": "error", "message": outcome.get("error", "Unknown error")}
        return {
            "status": "ok",
            "message": f"Role color updated to {outcome['color']['hex']}",
            "color": outcome["color"]
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
@app.post("/role-color/reset-fallback")
async def reset_role_color_to_fallback():
    """Reset the role color to the fallback (#86cecb)."""
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    try:
        from utils.profile_picture_manager import profile_picture_manager

        outcome = await profile_picture_manager.reset_to_fallback_color(debug=True)
        if not outcome["success"]:
            return {"status": "error", "message": "Failed to reset color"}
        return {
            "status": "ok",
            "message": f"Role color reset to fallback {outcome['color']['hex']}",
            "color": outcome["color"]
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
@app.post("/manual/send")
async def manual_send(
    message: str = Form(...),
    channel_id: str = Form(...),
    files: List[UploadFile] = File(default=[])
):
    """Queue a manual message, plus any uploaded files, for a channel.

    Uploads are read during the request; the actual sending is scheduled
    as a task on the bot's event loop, so the response only confirms
    queuing.
    """
    try:
        channel = globals.client.get_channel(int(channel_id))
        if not channel:
            return {"status": "error", "message": "Channel not found"}

        # Buffer every upload now, before the request closes.
        attachments = []
        for upload in files:
            try:
                payload = await upload.read()
            except Exception as e:
                print(f"❌ Failed to read file {upload.filename}: {e}")
                return {"status": "error", "message": f"Failed to read file {upload.filename}: {e}"}
            attachments.append((upload.filename, payload))

        async def _deliver():
            try:
                # Text first, skipped when it is only whitespace.
                if message.strip():
                    await channel.send(message)
                    print(f"✅ Manual message sent to #{channel.name}")

                # Each attachment is sent on its own; one failure does not
                # stop the rest.
                for name, payload in attachments:
                    try:
                        await channel.send(file=discord.File(io.BytesIO(payload), filename=name))
                        print(f"✅ File {name} sent to #{channel.name}")
                    except Exception as e:
                        print(f"❌ Failed to send file {name}: {e}")
            except Exception as e:
                print(f"❌ Failed to send message: {e}")

        # Scheduled as a task to avoid the timeout context manager error.
        globals.client.loop.create_task(_deliver())
        return {"status": "ok", "message": "Message and files queued for sending"}
    except Exception as e:
        return {"status": "error", "message": f"Error: {e}"}
@app.get("/status")
def status():
    """Report the DM mood, server and scheduler counts, and per-server moods."""
    # Map each guild id (as a string) to its current mood name.
    moods_by_server = {}
    for gid in server_manager.servers:
        current_mood, _ = server_manager.get_server_mood(gid)
        moods_by_server[str(gid)] = current_mood

    summary = {
        "status": "online",
        "mood": globals.DM_MOOD,
        "servers": len(server_manager.servers),
        "active_schedulers": len(server_manager.schedulers),
        "server_moods": moods_by_server
    }
    return summary
@app.get("/autonomous/stats")
def get_autonomous_stats():
    """Collect autonomous engine stats for every configured server.

    Each server entry carries its name, mood, and mood profile. The
    ``context`` key holds the engine's context signals, or ``None`` when
    the server is not present in the engine's contexts.
    """
    from utils.autonomous import autonomous_engine

    def _json_safe(seconds):
        # Replace infinity with a large number for JSON serialization.
        return 999999 if seconds == float('inf') else seconds

    report = {}
    for guild_id, server_info in server_manager.servers.items():
        mood_name, _ = server_manager.get_server_mood(guild_id)

        # Moods without a profile fall back to a mid-level default.
        mood_profile = autonomous_engine.mood_profiles.get(mood_name, {
            "energy": 0.5,
            "sociability": 0.5,
            "impulsiveness": 0.5
        })

        entry = {
            "guild_name": server_info.guild_name,
            "mood": mood_name,
            "mood_profile": mood_profile,
            "context": None
        }

        if guild_id in autonomous_engine.server_contexts:
            ctx = autonomous_engine.server_contexts[guild_id]
            since_action = _json_safe(ctx.time_since_last_action)
            since_interaction = _json_safe(ctx.time_since_last_interaction)
            entry["context"] = {
                "messages_last_5min": ctx.messages_last_5min,
                "messages_last_hour": ctx.messages_last_hour,
                "unique_users_active": ctx.unique_users_active,
                "conversation_momentum": round(ctx.conversation_momentum, 2),
                "users_joined_recently": ctx.users_joined_recently,
                "users_status_changed": ctx.users_status_changed,
                "users_started_activity": ctx.users_started_activity,
                "time_since_last_action": round(since_action, 1),
                "time_since_last_interaction": round(since_interaction, 1),
                "messages_since_last_appearance": ctx.messages_since_last_appearance,
                "hour_of_day": ctx.hour_of_day,
                "is_weekend": ctx.is_weekend,
                "mood_energy_level": round(ctx.mood_energy_level, 2)
            }

        report[str(guild_id)] = entry

    return {"servers": report}
@app.get("/conversation/{user_id}")
def get_conversation(user_id: str):
    """Return the stored conversation for a user, or an empty list."""
    history = globals.conversation_history
    if user_id not in history:
        return {"conversation": []}
    return {"conversation": list(history[user_id])}
# ========== Figurine DM Subscription APIs ==========
@app.get("/figurines/subscribers")
async def get_figurine_subscribers():
    """List the figurine DM subscribers as string ids."""
    subscriber_ids = list(map(str, figurine_load_subscribers()))
    return {"subscribers": subscriber_ids}
@app.post("/figurines/subscribers")
async def add_figurine_subscriber(user_id: str = Form(...)):
    """Add a figurine subscriber; reports the helper's result as ``added``."""
    try:
        was_added = figurine_add_subscriber(int(user_id))
    except Exception as e:
        return {"status": "error", "message": str(e)}
    return {"status": "ok", "added": was_added}
@app.delete("/figurines/subscribers/{user_id}")
async def delete_figurine_subscriber(user_id: str):
    """Remove a figurine subscriber; reports the helper's result as ``removed``."""
    try:
        was_removed = figurine_remove_subscriber(int(user_id))
    except Exception as e:
        return {"status": "error", "message": str(e)}
    return {"status": "ok", "removed": was_removed}
@app.post("/figurines/send_now")
async def figurines_send_now(tweet_url: str = Form(None)):
    """Queue a figurine DM to all subscribers, optionally for one tweet URL."""
    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        return {"status": "error", "message": "Bot not ready"}

    print(f"🚀 API: Sending figurine DMs to all subscribers, tweet_url: {tweet_url}")
    bot.loop.create_task(send_figurine_dm_to_all_subscribers(bot, tweet_url=tweet_url))
    return {"status": "ok", "message": "Figurine DMs queued"}
@app.post("/figurines/send_to_user")
async def figurines_send_to_user(user_id: str = Form(...), tweet_url: str = Form(None)):
    """Queue a figurine DM to one user, optionally for one tweet URL."""
    print(f"🎯 API: Received figurine send request - user_id: '{user_id}', tweet_url: '{tweet_url}'")

    bot = globals.client
    if not (bot and bot.loop and bot.loop.is_running()):
        print("❌ API: Bot not ready")
        return {"status": "error", "message": "Bot not ready"}

    try:
        user_id_int = int(user_id)
    except ValueError:
        print(f"❌ API: Invalid user ID: '{user_id}'")
        return {"status": "error", "message": "Invalid user ID"}
    print(f"✅ API: Parsed user_id as {user_id_int}")

    # An empty tweet URL is treated the same as none at all.
    tweet_url = None if tweet_url == "" else tweet_url
    print(f"🎯 API: Sending figurine DM to user {user_id_int}, tweet_url: {tweet_url}")

    # Queue the send on the bot's event loop.
    bot.loop.create_task(send_figurine_dm_to_single_user(bot, user_id_int, tweet_url=tweet_url))
    return {"status": "ok", "message": f"Figurine DM to user {user_id} queued"}
# ========== Server Management Endpoints ==========
@app.get("/servers")
def get_servers():
    """Get all configured servers.

    Logs debug information about the server manager and its config file,
    then returns each server's dict form with ``enabled_features`` as a
    list and ``guild_id`` as a string.

    This function relies on the module-level ``json`` import. A
    function-local ``import json`` placed after the config read would make
    ``json`` a local name throughout the function, and the earlier
    ``json.load`` would always fail with UnboundLocalError.

    Returns:
        ``{"servers": [...]}`` with one dict per configured server.
    """
    print("🎭 API: /servers endpoint called")
    print(f"🎭 API: server_manager.servers keys: {list(server_manager.servers.keys())}")
    print(f"🎭 API: server_manager.servers count: {len(server_manager.servers)}")

    # Debug: Check config file directly
    config_file = server_manager.config_file
    print(f"🎭 API: Config file path: {config_file}")
    if os.path.exists(config_file):
        try:
            with open(config_file, "r", encoding="utf-8") as f:
                config_data = json.load(f)
            print(f"🎭 API: Config file contains: {list(config_data.keys())}")
        except Exception as e:
            # Debug aid only; a bad config file must not fail the request.
            print(f"🎭 API: Failed to read config file: {e}")
    else:
        print("🎭 API: Config file does not exist")

    servers = []
    for server in server_manager.get_all_servers():
        server_data = server.to_dict()
        # Convert set to list for JSON serialization
        server_data['enabled_features'] = list(server_data['enabled_features'])
        # Convert guild_id to string to prevent JavaScript integer precision loss
        server_data['guild_id'] = str(server_data['guild_id'])
        servers.append(server_data)
        print(f"🎭 API: Adding server to response: {server_data['guild_id']} - {server_data['guild_name']}")
        print(f"🎭 API: Server data type check - guild_id: {type(server_data['guild_id'])}, value: {server_data['guild_id']}")

    print(f"🎭 API: Returning {len(servers)} servers")
    return {"servers": servers}
@app.post("/servers")
def add_server(data: ServerConfigRequest):
    """Register a new server configuration and restart the schedulers."""
    # An empty or missing feature list is passed on as None.
    feature_set = None
    if data.enabled_features:
        feature_set = set(data.enabled_features)

    added = server_manager.add_server(
        guild_id=data.guild_id,
        guild_name=data.guild_name,
        autonomous_channel_id=data.autonomous_channel_id,
        autonomous_channel_name=data.autonomous_channel_name,
        bedtime_channel_ids=data.bedtime_channel_ids,
        enabled_features=feature_set
    )
    if not added:
        return {"status": "error", "message": "Failed to add server"}

    # Restart schedulers to include the new server.
    server_manager.stop_all_schedulers()
    server_manager.start_all_schedulers(globals.client)
    return {"status": "ok", "message": f"Server {data.guild_name} added successfully"}
@app.delete("/servers/{guild_id}")
def remove_server(guild_id: int):
    """Remove a server configuration and report whether it succeeded."""
    removed = server_manager.remove_server(guild_id)
    if not removed:
        return {"status": "error", "message": "Failed to remove server"}
    return {"status": "ok", "message": "Server removed successfully"}
@app.put("/servers/{guild_id}")
def update_server(guild_id: int, data: dict):
    """Update a server's configuration with the keys in the request body."""
    updated = server_manager.update_server_config(guild_id, **data)
    if not updated:
        return {"status": "error", "message": "Failed to update server configuration"}

    # Restart schedulers to apply the changes.
    server_manager.stop_all_schedulers()
    server_manager.start_all_schedulers(globals.client)
    return {"status": "ok", "message": "Server configuration updated"}
@app.post("/servers/{guild_id}/bedtime-range")
def update_server_bedtime_range(guild_id: int, data: dict):
    """Update server bedtime range configuration.

    Requires ``bedtime_hour``, ``bedtime_minute``, ``bedtime_hour_end`` and
    ``bedtime_minute_end``. Each is coerced to ``int`` and range-checked
    (hours 0-23, minutes 0-59).

    The coerced integers are what get stored, so a value sent as ``"22"``
    is persisted as ``22`` rather than in the raw, unvalidated form the
    client sent. Any other keys in ``data`` are forwarded unchanged.

    Returns:
        A status dict: ``ok`` with the formatted range, or ``error`` with
        a message describing what failed.
    """
    print(f"⏰ API: Updating bedtime range for server {guild_id}: {data}")

    # Validate the data
    required_fields = ['bedtime_hour', 'bedtime_minute', 'bedtime_hour_end', 'bedtime_minute_end']
    for field in required_fields:
        if field not in data:
            return {"status": "error", "message": f"Missing required field: {field}"}

    # Validate time ranges
    try:
        bedtime_hour = int(data['bedtime_hour'])
        bedtime_minute = int(data['bedtime_minute'])
        bedtime_hour_end = int(data['bedtime_hour_end'])
        bedtime_minute_end = int(data['bedtime_minute_end'])
    except (ValueError, TypeError):
        return {"status": "error", "message": "Invalid time values provided"}

    if not (0 <= bedtime_hour <= 23) or not (0 <= bedtime_hour_end <= 23):
        return {"status": "error", "message": "Hours must be between 0 and 23"}
    if not (0 <= bedtime_minute <= 59) or not (0 <= bedtime_minute_end <= 59):
        return {"status": "error", "message": "Minutes must be between 0 and 59"}

    # Overlay the validated integers on the payload so the stored values
    # are exactly the ones that passed the checks above.
    validated = {
        **data,
        'bedtime_hour': bedtime_hour,
        'bedtime_minute': bedtime_minute,
        'bedtime_hour_end': bedtime_hour_end,
        'bedtime_minute_end': bedtime_minute_end,
    }

    # Update the server configuration
    success = server_manager.update_server_config(guild_id, **validated)
    if not success:
        return {"status": "error", "message": "Failed to update bedtime range"}

    # Update just the bedtime job for this server (avoid restarting all schedulers)
    job_success = server_manager.update_server_bedtime_job(guild_id, globals.client)
    if not job_success:
        return {"status": "error", "message": "Updated config but failed to update scheduler"}

    print(f"✅ API: Bedtime range updated for server {guild_id}")
    return {
        "status": "ok",
        "message": f"Bedtime range updated: {bedtime_hour:02d}:{bedtime_minute:02d} - {bedtime_hour_end:02d}:{bedtime_minute_end:02d}"
    }
@app.post("/servers/{guild_id}/autonomous/general")
async def trigger_autonomous_general_for_server(guild_id: int):
    """Await a general autonomous message for one server and report the outcome."""
    from utils.autonomous import miku_say_something_general_for_server

    try:
        await miku_say_something_general_for_server(guild_id)
    except Exception as e:
        return {"status": "error", "message": f"Failed to trigger autonomous message: {e}"}
    return {"status": "ok", "message": f"Autonomous general message triggered for server {guild_id}"}
@app.post("/servers/{guild_id}/autonomous/engage")
async def trigger_autonomous_engage_for_server(guild_id: int):
    """Await an autonomous user engagement for one server and report the outcome."""
    from utils.autonomous import miku_engage_random_user_for_server

    try:
        await miku_engage_random_user_for_server(guild_id)
    except Exception as e:
        return {"status": "error", "message": f"Failed to trigger user engagement: {e}"}
    return {"status": "ok", "message": f"Autonomous user engagement triggered for server {guild_id}"}
@app.post("/servers/{guild_id}/autonomous/custom")
async def custom_autonomous_message_for_server(guild_id: int, req: CustomPromptRequest):
    """Await a custom-prompt message for one server and report the outcome."""
    from utils.autonomous import handle_custom_prompt_for_server

    try:
        sent = await handle_custom_prompt_for_server(guild_id, req.prompt)
    except Exception as e:
        return {"status": "error", "message": f"Error: {e}"}

    if not sent:
        return {"status": "error", "message": f"Failed to send custom message to server {guild_id}"}
    return {"status": "ok", "message": f"Custom autonomous message sent to server {guild_id}"}
@app.post("/dm/{user_id}/custom")
async def send_custom_prompt_dm(user_id: str, req: CustomPromptRequest):
    """Queue an LLM reply to a custom prompt as a DM to one user.

    The user is looked up during the request. Generating the reply,
    sending it, and logging it run as a task on the bot's event loop, so
    the response only confirms queuing.
    """
    try:
        numeric_id = int(user_id)
        recipient = globals.client.get_user(numeric_id)
        if not recipient:
            return {"status": "error", "message": f"User {user_id} not found"}

        # LLM query function, called with the DM response type.
        from utils.llm import query_llama

        async def _generate_and_send():
            try:
                reply = await query_llama(req.prompt, user_id=user_id, guild_id=None, response_type="dm_response")
                await recipient.send(reply)
                print(f"✅ Custom DM prompt sent to user {user_id}: {req.prompt[:50]}...")

                # Record the exchange in the DM history.
                from utils.dm_logger import dm_logger
                dm_logger.log_conversation(user_id, req.prompt, reply)
            except Exception as e:
                print(f"❌ Failed to send custom DM prompt to user {user_id}: {e}")

        # Scheduled as a task to avoid the timeout context manager error.
        globals.client.loop.create_task(_generate_and_send())
        return {"status": "ok", "message": f"Custom DM prompt queued for user {user_id}"}
    except ValueError:
        return {"status": "error", "message": "Invalid user ID format"}
    except Exception as e:
        return {"status": "error", "message": f"Error: {e}"}
@app.post("/dm/{user_id}/manual")
async def send_manual_message_dm(
    user_id: str,
    message: str = Form(...),
    files: List[UploadFile] = File(default=[])
):
    """Queue a hand-written DM (text and optional attachments) for a user.

    Uploaded files are read into memory before the response is returned,
    because the upload objects are not usable once the request has closed.
    Delivery then happens in a background task on the bot's event loop: the
    text is sent first (if it is not blank), then each file as its own
    message, and finally the exchange is written to the DM log. "ok" means
    queued; delivery failures are only printed.
    """
    try:
        target_id = int(user_id)
        recipient = globals.client.get_user(target_id)
        if not recipient:
            return {"status": "error", "message": f"User {user_id} not found"}

        # Snapshot every upload as (filename, bytes) while the request is open.
        uploads = []
        for upload in files:
            try:
                payload = await upload.read()
                uploads.append((upload.filename, payload))
            except Exception as exc:
                print(f"❌ Failed to read file {upload.filename}: {exc}")
                return {"status": "error", "message": f"Failed to read file {upload.filename}: {exc}"}

        async def _deliver():
            try:
                if message.strip():
                    await recipient.send(message)
                    print(f"✅ Manual DM message sent to user {user_id}")

                # One failed attachment must not stop the remaining ones.
                for name, payload in uploads:
                    try:
                        attachment = discord.File(io.BytesIO(payload), filename=name)
                        await recipient.send(file=attachment)
                        print(f"✅ File {name} sent via DM to user {user_id}")
                    except Exception as exc:
                        print(f"❌ Failed to send file {name} via DM: {exc}")

                # Logged as: user side = manual override marker, Miku side = the text sent.
                from utils.dm_logger import dm_logger
                attachment_names = [name for name, _ in uploads]
                dm_logger.log_conversation(
                    user_id,
                    "[Manual Override Trigger]",
                    message,
                    attachments=attachment_names,
                )
            except Exception as exc:
                print(f"❌ Failed to send manual DM to user {user_id}: {exc}")

        # Scheduled as a task on the bot loop (not awaited here) to avoid the
        # "timeout context manager" error.
        globals.client.loop.create_task(_deliver())
        return {"status": "ok", "message": f"Manual DM message queued for user {user_id}"}
    except ValueError:
        return {"status": "error", "message": "Invalid user ID format"}
    except Exception as exc:
        return {"status": "error", "message": f"Error: {exc}"}
@app.post("/image/generate")
async def manual_image_generation(req: dict):
    """Generate an image from a prompt on demand (testing aid).

    Expects a JSON object with a ``"prompt"`` string. A blank or missing
    prompt is rejected. Otherwise the prompt is handed to
    ``generate_image_with_comfyui`` and a truthy result is returned as
    ``image_path``.
    """
    try:
        prompt = req.get("prompt", "").strip()
        if prompt:
            from utils.image_generation import generate_image_with_comfyui

            image_path = await generate_image_with_comfyui(prompt)
            if not image_path:
                return {"status": "error", "message": "Failed to generate image"}
            return {
                "status": "ok",
                "message": "Image generated successfully",
                "image_path": image_path,
            }
        return {"status": "error", "message": "Prompt is required"}
    except Exception as exc:
        return {"status": "error", "message": f"Error: {exc}"}
@app.get("/image/status")
async def get_image_generation_status():
    """Report the state of the image generation backend.

    Merges the mapping returned by ``check_comfyui_status()`` into a payload
    that starts with ``"status": "ok"``; keys from the backend mapping win on
    collision. Any exception is reported as an error payload.
    """
    try:
        from utils.image_generation import check_comfyui_status

        backend_state = await check_comfyui_status()
        response = {"status": "ok"}
        response.update(backend_state)
        return response
    except Exception as exc:
        return {"status": "error", "message": f"Error: {exc}"}
@app.post("/image/test-detection")
async def test_image_detection(req: dict):
    """Run the natural-language image-request detector on a sample message.

    Expects a JSON object with a ``"message"`` string. Returns whether
    ``detect_image_request`` classified it as an image request, the prompt it
    extracted, and the (stripped) message that was analysed.
    """
    try:
        message = req.get("message", "").strip()
        if message:
            from utils.image_generation import detect_image_request

            detection = await detect_image_request(message)
            is_image_request, extracted_prompt = detection
            return {
                "status": "ok",
                "is_image_request": is_image_request,
                "extracted_prompt": extracted_prompt,
                "original_message": message,
            }
        return {"status": "error", "message": "Message is required"}
    except Exception as exc:
        return {"status": "error", "message": f"Error: {exc}"}
@app.get("/image/view/{filename}")
async def view_generated_image(filename: str):
    """Serve a generated image from one of the known ComfyUI output folders.

    ``filename`` comes straight from the URL, so it is treated as untrusted:
    only a bare file name is accepted (no path separators, not ``.``/``..``),
    which keeps the lookup inside the output folders. The folders are probed
    in order and the first regular file found is returned.

    Returns:
        A ``FileResponse`` with a media type derived from the extension
        (PNG when the extension is not recognised), or a
        ``{"status": "error", "message": ...}`` dict when the name is
        rejected, the file is missing, or an unexpected error occurs.
    """
    try:
        print(f"🖼️ Image view request for: {filename}")

        # Reject anything that is not a plain file name so a crafted value
        # cannot escape the output directories (path traversal).
        bare_name = os.path.basename(filename.replace("\\", "/"))
        if bare_name != filename or filename in ("", ".", ".."):
            print(f"❌ Rejected unsafe image filename: {filename}")
            return {"status": "error", "message": f"Image not found: {filename}"}

        # Try multiple possible locations for the ComfyUI output
        output_dirs = (
            "/app/ComfyUI/output",
            "/home/koko210Serve/ComfyUI/output",
            "./ComfyUI/output",
        )

        image_path = None
        for directory in output_dirs:
            candidate = f"{directory}/{filename}"
            # isfile (not exists): a directory cannot be served by FileResponse.
            if os.path.isfile(candidate):
                image_path = candidate
                print(f"✅ Found image at: {candidate}")
                break
            print(f"❌ Not found at: {candidate}")

        if not image_path:
            print(f"❌ Image not found anywhere: {filename}")
            return {"status": "error", "message": f"Image not found: {filename}"}

        # Determine content type from the file extension; default to PNG.
        media_types = {
            "jpg": "image/jpeg",
            "jpeg": "image/jpeg",
            "gif": "image/gif",
            "webp": "image/webp",
        }
        ext = filename.lower().split('.')[-1]
        content_type = media_types.get(ext, "image/png")

        print(f"📤 Serving image: {image_path} as {content_type}")
        return FileResponse(image_path, media_type=content_type)
    except Exception as e:
        print(f"❌ Error serving image: {e}")
        return {"status": "error", "message": f"Error serving image: {e}"}
@app.post("/servers/{guild_id}/autonomous/tweet")
async def trigger_autonomous_tweet_for_server(guild_id: int):
    """Ask Miku to share a tweet autonomously in one server.

    Awaits ``share_miku_tweet_for_server(guild_id)`` and returns a
    ``{"status", "message"}`` dict. Exceptions from that call become an error
    payload.
    """
    from utils.autonomous import share_miku_tweet_for_server

    try:
        await share_miku_tweet_for_server(guild_id)
    except Exception as exc:
        failure = f"Failed to trigger tweet sharing: {exc}"
        return {"status": "error", "message": failure}

    confirmation = f"Autonomous tweet sharing triggered for server {guild_id}"
    return {"status": "ok", "message": confirmation}
@app.get("/servers/{guild_id}/memory")
def get_server_memory(guild_id: int, key: str = None):
    """Read a server's stored memory.

    Forwards ``guild_id`` and the optional ``key`` to
    ``server_manager.get_server_memory`` and echoes both back alongside the
    value that call returned.
    """
    stored = server_manager.get_server_memory(guild_id, key)
    response = {"guild_id": guild_id, "key": key}
    response["memory"] = stored
    return response
@app.post("/servers/{guild_id}/memory")
def set_server_memory(guild_id: int, key: str, value):
    """Store a value in a server's memory.

    Forwards ``guild_id``, ``key`` and ``value`` unchanged to
    ``server_manager.set_server_memory`` and always answers with an "ok"
    confirmation once that call returns.
    """
    server_manager.set_server_memory(guild_id, key, value)
    confirmation = f"Memory set for server {guild_id}"
    return {"status": "ok", "message": confirmation}
@app.post("/servers/repair")
def repair_server_config():
    """Run the server manager's configuration repair routine.

    Calls ``server_manager.repair_config()`` and reports success, or an error
    payload carrying the exception text when the call raises.
    """
    try:
        server_manager.repair_config()
    except Exception as exc:
        return {"status": "error", "message": f"Failed to repair configuration: {exc}"}
    return {"status": "ok", "message": "Server configuration repaired and saved"}
@app.get("/moods/available")
def get_available_moods():
    """List the mood names known to the bot.

    The names are the keys of ``utils.moods.MOOD_EMOJIS``, returned under
    ``"moods"``.
    """
    from utils.moods import MOOD_EMOJIS

    mood_names = list(MOOD_EMOJIS.keys())
    return {"moods": mood_names}
@app.post("/test/mood/{guild_id}")
async def test_mood_change(guild_id: int, data: MoodSetRequest):
    """Debug endpoint that walks through a mood change step by step.

    Verifies the server is known, sets the mood through ``server_manager``,
    notifies the autonomous engine and refreshes the bot's nickname, printing
    a trace line at every step. Failures in the notification or nickname
    steps are printed but do not turn the response into an error; only an
    unknown server or a failed ``set_server_mood`` does.
    """
    print(f"🧪 TEST: Testing mood change for server {guild_id} to {data.mood}")

    # Unknown servers are rejected before anything is modified.
    if guild_id not in server_manager.servers:
        return {"status": "error", "message": f"Server {guild_id} not found"}

    server_config = server_manager.get_server_config(guild_id)
    print(f"🧪 TEST: Server config found: {server_config.guild_name if server_config else 'None'}")

    success = server_manager.set_server_mood(guild_id, data.mood)
    print(f"🧪 TEST: Mood set result: {success}")
    if not success:
        return {"status": "error", "message": "Mood change failed"}

    # V2: let the autonomous engine know about the new mood (best effort).
    try:
        from utils.autonomous import on_mood_change
        on_mood_change(guild_id, data.mood)
        print("🧪 TEST: Notified autonomous engine of mood change")
    except Exception as exc:
        print(f"⚠️ TEST: Failed to notify autonomous engine: {exc}")

    # Nickname refresh is also best effort; a traceback is dumped on failure.
    from utils.moods import update_server_nickname
    print("🧪 TEST: Attempting nickname update...")
    try:
        await update_server_nickname(guild_id)
        print("🧪 TEST: Nickname update completed")
    except Exception as exc:
        print(f"🧪 TEST: Nickname update failed: {exc}")
        import traceback
        traceback.print_exc()

    return {"status": "ok", "message": "Test mood change completed", "success": success}
# ========== DM Logging Endpoints ==========
@app.get("/dms/users")
def get_dm_users():
    """Return the DM logger's summary of every user with DM history.

    The list comes from ``dm_logger.get_all_dm_users()``; an exception
    (including a failed import) is reported as an error payload.
    """
    try:
        from utils.dm_logger import dm_logger

        response = {"status": "ok"}
        response["users"] = dm_logger.get_all_dm_users()
        return response
    except Exception as exc:
        return {"status": "error", "message": f"Failed to get DM users: {exc}"}
@app.get("/dms/users/{user_id}")
def get_dm_user_conversation(user_id: str):
    """Return the DM conversation summary for one user.

    ``user_id`` arrives as a string and is converted to ``int`` before being
    passed to ``dm_logger.get_user_conversation_summary``. A value that fails
    that conversion (or any other ``ValueError``) yields the "Invalid user ID
    format" error.
    """
    try:
        from utils.dm_logger import dm_logger

        # The logger works with integer IDs; the API accepts them as strings.
        numeric_id = int(user_id)
        response = {"status": "ok"}
        response["summary"] = dm_logger.get_user_conversation_summary(numeric_id)
        return response
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as exc:
        return {"status": "error", "message": f"Failed to get user conversation: {exc}"}
@app.get("/dms/users/{user_id}/conversations")
def get_dm_conversations(user_id: str, limit: int = 50):
    """Get recent conversations with a specific user.

    Args:
        user_id: Discord user ID as a string (converted to ``int`` internally).
        limit: Maximum number of most-recent entries to return; ``0`` or a
            negative value returns every entry.

    Returns:
        ``{"status": "ok", "conversations": [...]}`` where every
        ``message_id`` is a string, or an error payload. A log without a
        ``"conversations"`` key is treated as an empty history.
    """
    try:
        from utils.dm_logger import dm_logger

        # Convert string user_id to int for internal processing
        user_id_int = int(user_id)
        print(f"🔍 API: Loading conversations for user {user_id_int}, limit: {limit}")

        logs = dm_logger._load_user_logs(user_id_int)
        # Read the key once, with a default, so a log file lacking it yields
        # an empty list instead of a KeyError further down.
        all_conversations = logs.get("conversations", [])
        print(f"🔍 API: Loaded logs for user {user_id_int}: {len(all_conversations)} conversations")

        selected = all_conversations[-limit:] if limit > 0 else all_conversations

        # Convert message IDs to strings to prevent JavaScript precision loss.
        # Work on shallow copies so the logger's own entries keep their
        # original types (NOTE(review): unclear whether _load_user_logs hands
        # out shared objects; copying is safe either way).
        conversations = []
        for entry in selected:
            entry = dict(entry)
            if "message_id" in entry:
                entry["message_id"] = str(entry["message_id"])
            conversations.append(entry)
        print(f"🔍 API: Returning {len(conversations)} conversations")

        # Debug: Show message IDs being returned
        for i, conv in enumerate(conversations):
            msg_id = conv.get("message_id", "")
            is_bot = conv.get("is_bot_message", False)
            content = conv.get("content", "")
            content_preview = content[:30] + "..." if content else "[No content]"
            print(f"🔍 API: Conv {i}: id={msg_id} (type: {type(msg_id)}), is_bot={is_bot}, content='{content_preview}'")

        return {"status": "ok", "conversations": conversations}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as e:
        print(f"❌ API: Failed to get conversations for user {user_id}: {e}")
        return {"status": "error", "message": f"Failed to get conversations: {e}"}
@app.get("/dms/users/{user_id}/search")
def search_dm_conversations(user_id: str, query: str, limit: int = 10):
    """Search one user's DM history.

    Converts ``user_id`` to ``int`` and delegates to
    ``dm_logger.search_user_conversations(user_id, query, limit)``, returning
    whatever it produces under ``"results"``.
    """
    try:
        from utils.dm_logger import dm_logger

        # The logger works with integer IDs; the API accepts them as strings.
        numeric_id = int(user_id)
        matches = dm_logger.search_user_conversations(numeric_id, query, limit)
        return {"status": "ok", "results": matches}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as exc:
        return {"status": "error", "message": f"Failed to search conversations: {exc}"}
@app.get("/dms/users/{user_id}/export")
def export_dm_conversation(user_id: str, format: str = "json"):
    """Export a user's full DM history through the DM logger.

    Converts ``user_id`` to ``int`` and calls
    ``dm_logger.export_user_conversation(user_id, format)``. The value it
    returns is passed back as ``export_path`` together with the requested
    ``format``.
    """
    try:
        from utils.dm_logger import dm_logger

        # The logger works with integer IDs; the API accepts them as strings.
        numeric_id = int(user_id)
        exported_to = dm_logger.export_user_conversation(numeric_id, format)
        return {"status": "ok", "export_path": exported_to, "format": format}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as exc:
        return {"status": "error", "message": f"Failed to export conversation: {exc}"}
@app.delete("/dms/users/{user_id}")
def delete_dm_user_logs(user_id: str):
    """Remove the DM log file kept for one user.

    The file path is obtained from ``dm_logger._get_user_log_file``. If the
    path exists it is removed, otherwise a "No DM logs found" error is
    returned.
    """
    try:
        from utils.dm_logger import dm_logger
        import os

        # The logger works with integer IDs; the API accepts them as strings.
        numeric_id = int(user_id)
        log_path = dm_logger._get_user_log_file(numeric_id)

        if not os.path.exists(log_path):
            return {"status": "error", "message": f"No DM logs found for user {user_id}"}

        os.remove(log_path)
        return {"status": "ok", "message": f"Deleted DM logs for user {user_id}"}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as exc:
        return {"status": "error", "message": f"Failed to delete DM logs: {exc}"}
# ========== User Blocking & DM Management ==========
@app.get("/dms/blocked-users")
def get_blocked_users():
    """Return the DM logger's list of blocked users.

    The value of ``dm_logger.get_blocked_users()`` is returned under
    ``"blocked_users"``; a failure is printed and reported as an error
    payload.
    """
    try:
        response = {"status": "ok"}
        response["blocked_users"] = dm_logger.get_blocked_users()
        return response
    except Exception as exc:
        print(f"❌ API: Failed to get blocked users: {exc}")
        return {"status": "error", "message": f"Failed to get blocked users: {exc}"}
@app.post("/dms/users/{user_id}/block")
def block_user(user_id: str):
    """Block a user from sending DMs to Miku.

    The display name is taken from the user's conversation summary (falling
    back to ``"Unknown"``) and passed with the numeric ID to
    ``dm_logger.block_user``. A falsy result from that call is reported as
    "already blocked".
    """
    try:
        numeric_id = int(user_id)

        # Look the name up in the DM logs so the messages are human-readable.
        summary = dm_logger.get_user_conversation_summary(numeric_id)
        username = summary.get("username", "Unknown")

        newly_blocked = dm_logger.block_user(numeric_id, username)
        if not newly_blocked:
            return {"status": "error", "message": f"User {username} is already blocked"}

        print(f"🚫 API: User {user_id} ({username}) blocked")
        return {"status": "ok", "message": f"User {username} has been blocked"}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as exc:
        print(f"❌ API: Failed to block user {user_id}: {exc}")
        return {"status": "error", "message": f"Failed to block user: {exc}"}
@app.post("/dms/users/{user_id}/unblock")
def unblock_user(user_id: str):
    """Lift a DM block for a user.

    Calls ``dm_logger.unblock_user`` with the numeric ID; a falsy result is
    reported as "User is not blocked".
    """
    try:
        numeric_id = int(user_id)
        was_blocked = dm_logger.unblock_user(numeric_id)
        if not was_blocked:
            return {"status": "error", "message": "User is not blocked"}

        print(f"✅ API: User {user_id} unblocked")
        return {"status": "ok", "message": "User has been unblocked"}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as exc:
        print(f"❌ API: Failed to unblock user {user_id}: {exc}")
        return {"status": "error", "message": f"Failed to unblock user: {exc}"}
@app.post("/dms/users/{user_id}/conversations/{conversation_id}/delete")
def delete_conversation(user_id: str, conversation_id: str):
    """Delete a specific conversation/message from both Discord and logs.

    The deletion itself is asynchronous and runs on the Discord client's
    event loop; this endpoint only queues it and returns immediately, so an
    "ok" response means "queued", not "deleted". Failures of the queued work
    are printed by the background coroutine.

    Args:
        user_id: Discord user ID as a string (must parse as an integer).
        conversation_id: Identifier forwarded to
            ``dm_logger.delete_conversation``.
    """
    try:
        import asyncio

        user_id_int = int(user_id)

        async def do_delete():
            # Errors are logged here because nobody awaits the result.
            try:
                return await dm_logger.delete_conversation(user_id_int, conversation_id)
            except Exception as e:
                print(f"❌ API: Conversation deletion {conversation_id} failed for user {user_id}: {e}")
                return False

        # This is a plain ``def`` endpoint, which FastAPI executes in a worker
        # thread. ``loop.create_task`` is not thread-safe, so the coroutine is
        # handed to the bot's loop through the thread-safe entry point.
        bot_loop = globals.client.loop
        asyncio.run_coroutine_threadsafe(do_delete(), bot_loop)

        print(f"🗑️ API: Queued deletion of conversation {conversation_id} for user {user_id}")
        return {"status": "ok", "message": "Message deletion queued (will delete from both Discord and logs)"}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as e:
        print(f"❌ API: Failed to queue conversation deletion {conversation_id}: {e}")
        return {"status": "error", "message": f"Failed to delete conversation: {e}"}
@app.post("/dms/users/{user_id}/conversations/delete-all")
def delete_all_conversations(user_id: str):
    """Delete all conversations with a user from both Discord and logs.

    The bulk deletion is asynchronous and runs on the Discord client's event
    loop; this endpoint only queues it and returns immediately, so an "ok"
    response means "queued", not "deleted". Failures of the queued work are
    printed by the background coroutine.

    Args:
        user_id: Discord user ID as a string (must parse as an integer).
    """
    try:
        import asyncio

        user_id_int = int(user_id)

        async def do_delete_all():
            # Errors are logged here because nobody awaits the result.
            try:
                return await dm_logger.delete_all_conversations(user_id_int)
            except Exception as e:
                print(f"❌ API: Bulk conversation deletion failed for user {user_id}: {e}")
                return False

        # This is a plain ``def`` endpoint, which FastAPI executes in a worker
        # thread. ``loop.create_task`` is not thread-safe, so the coroutine is
        # handed to the bot's loop through the thread-safe entry point.
        bot_loop = globals.client.loop
        asyncio.run_coroutine_threadsafe(do_delete_all(), bot_loop)

        print(f"🗑️ API: Queued bulk deletion of all conversations for user {user_id}")
        return {"status": "ok", "message": "Bulk deletion queued (will delete all Miku messages from Discord and clear logs)"}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as e:
        print(f"❌ API: Failed to queue bulk conversation deletion for user {user_id}: {e}")
        return {"status": "error", "message": f"Failed to delete conversations: {e}"}
@app.post("/dms/users/{user_id}/delete-completely")
def delete_user_completely(user_id: str):
    """Remove everything the DM logger holds for a user.

    Delegates to ``dm_logger.delete_user_completely`` with the numeric ID; a
    falsy result is reported as "No user data found".
    """
    try:
        numeric_id = int(user_id)
        removed = dm_logger.delete_user_completely(numeric_id)
        if not removed:
            return {"status": "error", "message": "No user data found"}

        print(f"🗑️ API: Completely deleted user {user_id}")
        return {"status": "ok", "message": "User data deleted completely"}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as exc:
        print(f"❌ API: Failed to completely delete user {user_id}: {exc}")
        return {"status": "error", "message": f"Failed to delete user: {exc}"}
# ========== DM Interaction Analysis Endpoints ==========
@app.post("/dms/analysis/run")
def run_dm_analysis():
    """Manually trigger the daily DM interaction analysis.

    The analysis runs in the background on the Discord client's event loop;
    the response only confirms that it was started. If the analyzer was never
    initialised (``dm_analyzer is None``) an error payload is returned
    instead. Failures of the background run are printed.
    """
    try:
        import asyncio
        from utils.dm_interaction_analyzer import dm_analyzer

        if dm_analyzer is None:
            return {"status": "error", "message": "DM Analyzer not initialized. Set OWNER_USER_ID environment variable."}

        async def run_analysis():
            # Errors are logged here because nobody awaits the result.
            try:
                await dm_analyzer.run_daily_analysis()
            except Exception as e:
                print(f"❌ API: DM analysis failed: {e}")

        # This is a plain ``def`` endpoint, which FastAPI executes in a worker
        # thread. ``loop.create_task`` is not thread-safe, so the coroutine is
        # handed to the bot's loop through the thread-safe entry point.
        bot_loop = globals.client.loop
        asyncio.run_coroutine_threadsafe(run_analysis(), bot_loop)

        return {"status": "ok", "message": "DM analysis started"}
    except Exception as e:
        print(f"❌ API: Failed to run DM analysis: {e}")
        return {"status": "error", "message": f"Failed to run DM analysis: {e}"}
@app.post("/dms/users/{user_id}/analyze")
def analyze_user_interaction(user_id: str):
    """Analyze a specific user's interaction and optionally send report.

    The analysis runs in the background on the Discord client's event loop
    via ``dm_analyzer.analyze_and_report``; the response is sent as soon as
    the work is queued. ``"reported": True`` in the payload is therefore a
    fixed value, not the outcome of the analysis. Failures of the background
    run are printed.

    Args:
        user_id: Discord user ID as a string (must parse as an integer).
    """
    try:
        import asyncio
        from utils.dm_interaction_analyzer import dm_analyzer

        if dm_analyzer is None:
            return {"status": "error", "message": "DM Analyzer not initialized. Set OWNER_USER_ID environment variable."}

        user_id_int = int(user_id)

        async def run_analysis():
            # Errors are logged here because nobody awaits the result.
            try:
                return await dm_analyzer.analyze_and_report(user_id_int)
            except Exception as e:
                print(f"❌ API: Analysis failed for user {user_id}: {e}")
                return None

        # This is a plain ``def`` endpoint, which FastAPI executes in a worker
        # thread. ``loop.create_task`` is not thread-safe, so the coroutine is
        # handed to the bot's loop through the thread-safe entry point.
        bot_loop = globals.client.loop
        asyncio.run_coroutine_threadsafe(run_analysis(), bot_loop)

        # Return immediately - the analysis will run in the background
        return {"status": "ok", "message": f"Analysis started for user {user_id}", "reported": True}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as e:
        print(f"❌ API: Failed to analyze user {user_id}: {e}")
        return {"status": "error", "message": f"Failed to analyze user: {e}"}
@app.get("/dms/analysis/reports")
def get_analysis_reports(limit: int = 20):
    """Return the most recent DM analysis reports.

    Every ``*.json`` file in ``REPORTS_DIR`` except ``reported_today.json``
    is a candidate. Candidates are ordered by file name, descending, and the
    first ``limit`` are loaded. Each loaded report gets a ``filename`` key. A
    file that cannot be loaded is skipped with a warning, and a missing
    directory yields an empty list.
    """
    try:
        import os
        import json
        from utils.dm_interaction_analyzer import REPORTS_DIR

        if not os.path.exists(REPORTS_DIR):
            return {"status": "ok", "reports": []}

        # Collect candidate file names, then order them newest-name-first.
        candidates = []
        for name in os.listdir(REPORTS_DIR):
            if name.endswith('.json') and name != 'reported_today.json':
                candidates.append(name)
        candidates.sort(reverse=True)

        reports = []
        for filename in candidates[:limit]:
            report_path = os.path.join(REPORTS_DIR, filename)
            try:
                with open(report_path, 'r', encoding='utf-8') as handle:
                    report = json.load(handle)
                report['filename'] = filename
                reports.append(report)
            except Exception as exc:
                print(f"⚠️ Failed to load report {filename}: {exc}")

        return {"status": "ok", "reports": reports}
    except Exception as exc:
        print(f"❌ API: Failed to get reports: {exc}")
        return {"status": "error", "message": f"Failed to get reports: {exc}"}
@app.get("/dms/analysis/reports/{user_id}")
def get_user_reports(user_id: str, limit: int = 10):
    """Get analysis reports for a specific user.

    Report files are matched by the ``"<user id>_"`` file-name prefix inside
    ``REPORTS_DIR``, ordered by file name descending, and the first ``limit``
    are loaded. Each loaded report gets a ``filename`` key; files that cannot
    be loaded are skipped with a warning.

    Args:
        user_id: Discord user ID as a string (must parse as an integer).
        limit: Maximum number of reports to return.

    Returns:
        ``{"status": "ok", "reports": [...]}`` (empty when the reports
        directory does not exist), or an error payload. An ID that does not
        parse as an integer is always rejected, whether or not the directory
        exists.
    """
    try:
        import os
        import json
        from utils.dm_interaction_analyzer import REPORTS_DIR

        # Validate first so a malformed ID is reported even when there is no
        # reports directory yet.
        user_id_int = int(user_id)

        if not os.path.exists(REPORTS_DIR):
            return {"status": "ok", "reports": []}

        # Match on the normalised integer so inputs such as "0123" or " 123"
        # resolve to the same files as "123".
        prefix = f"{user_id_int}_"
        files = sorted(
            (f for f in os.listdir(REPORTS_DIR)
             if f.startswith(prefix) and f.endswith('.json')),
            reverse=True,
        )[:limit]

        reports = []
        for filename in files:
            try:
                with open(os.path.join(REPORTS_DIR, filename), 'r', encoding='utf-8') as f:
                    report = json.load(f)
                report['filename'] = filename
                reports.append(report)
            except Exception as e:
                print(f"⚠️ Failed to load report {filename}: {e}")

        return {"status": "ok", "reports": reports}
    except ValueError:
        return {"status": "error", "message": f"Invalid user ID format: {user_id}"}
    except Exception as e:
        print(f"❌ API: Failed to get user reports: {e}")
        return {"status": "error", "message": f"Failed to get user reports: {e}"}
# ========== Message Reaction Endpoint ==========
@app.post("/messages/react")
async def add_reaction_to_message(
    message_id: str = Form(...),
    channel_id: str = Form(...),
    emoji: str = Form(...)
):
    """Queue an emoji reaction on a message.

    Validates that the bot's loop is running, that both IDs are integers and
    that the channel is known to the client, then schedules a task on the
    bot's loop that fetches the message and adds the reaction. The response
    is returned once the task is scheduled; Discord-side failures (message
    not found, missing permission, HTTP errors) are only printed.
    """
    try:
        bot = globals.client
        if not bot or not bot.loop or not bot.loop.is_running():
            return {"status": "error", "message": "Bot not ready"}

        # Both identifiers arrive as form strings and must be integers.
        try:
            msg_id = int(message_id)
            chan_id = int(channel_id)
        except ValueError:
            return {"status": "error", "message": "Invalid message ID or channel ID format"}

        channel = globals.client.get_channel(chan_id)
        if not channel:
            return {"status": "error", "message": f"Channel {channel_id} not found"}

        async def _react():
            try:
                target = await channel.fetch_message(msg_id)
                await target.add_reaction(emoji)
                print(f"✅ Added reaction {emoji} to message {msg_id} in channel #{channel.name}")
            except discord.NotFound:
                print(f"❌ Message {msg_id} not found in channel #{channel.name}")
            except discord.Forbidden:
                print(f"❌ Bot doesn't have permission to add reactions in channel #{channel.name}")
            except discord.HTTPException as exc:
                print(f"❌ Failed to add reaction: {exc}")
            except Exception as exc:
                print(f"❌ Unexpected error adding reaction: {exc}")

        globals.client.loop.create_task(_react())

        confirmation = f"Reaction {emoji} queued for message {message_id}"
        return {"status": "ok", "message": confirmation}
    except Exception as exc:
        print(f"❌ API: Failed to add reaction: {exc}")
        return {"status": "error", "message": f"Failed to add reaction: {exc}"}
# ========== Autonomous V2 Endpoints ==========
@app.get("/autonomous/v2/stats/{guild_id}")
async def get_v2_stats(guild_id: int):
    """Return the V2 social stats for one server.

    The stats come from ``get_v2_stats_for_server(guild_id)`` and are passed
    through under ``"stats"``; an exception is reported with its text as the
    message.
    """
    try:
        from utils.autonomous_v2_integration import get_v2_stats_for_server

        response = {"status": "ok", "guild_id": guild_id}
        response["stats"] = get_v2_stats_for_server(guild_id)
        return response
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
@app.get("/autonomous/v2/check/{guild_id}")
async def manual_v2_check(guild_id: int):
    """
    Run a V2 context check on demand without making Miku act.

    Intended for debugging the decision system. A string returned by
    ``manual_trigger_v2_check`` is treated as an error message; any other
    result is returned under ``"analysis"``.
    """
    try:
        from utils.autonomous_v2_integration import manual_trigger_v2_check

        if not globals.client:
            return {"status": "error", "message": "Bot not ready"}

        outcome = await manual_trigger_v2_check(guild_id, globals.client)

        # The helper signals failure by returning a plain string.
        if isinstance(outcome, str):
            return {"status": "error", "message": outcome}
        return {"status": "ok", "guild_id": guild_id, "analysis": outcome}
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
@app.get("/autonomous/v2/status")
async def get_v2_status():
    """Summarise the V2 autonomous system for every configured server.

    For each guild in ``server_manager.servers`` that has a config, reports
    the server name, whether its loop is marked running, and the urgency,
    loneliness, boredom, excitement and chattiness values formatted to two
    decimals. The result is keyed by the guild ID as a string.
    """
    try:
        from utils.autonomous_v2 import autonomous_system_v2

        per_server = {}
        for guild_id in server_manager.servers:
            server_config = server_manager.get_server_config(guild_id)
            if not server_config:
                # Servers without a config are left out of the report.
                continue

            stats = autonomous_system_v2.get_stats(guild_id)
            entry = {
                "server_name": server_config.guild_name,
                "loop_running": autonomous_system_v2.running_loops.get(guild_id, False),
                "action_urgency": f"{stats.get_action_urgency():.2f}",
                "loneliness": f"{stats.loneliness:.2f}",
                "boredom": f"{stats.boredom:.2f}",
                "excitement": f"{stats.excitement:.2f}",
                "chattiness": f"{stats.chattiness:.2f}",
            }
            per_server[str(guild_id)] = entry

        return {"status": "ok", "servers": per_server}
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
# ========== Chat Interface Endpoints ==========
class ChatMessage(BaseModel):
    """Request body for the ``/chat/stream`` endpoint."""

    # Text the user typed.
    message: str
    # Which model to use: "text" or "vision".
    model_type: str = "text"
    # When False, no system message is sent to the model.
    use_system_prompt: bool = True
    # Base64-encoded image; only used with the vision model.
    image_data: Optional[str] = None
    # Earlier messages of the conversation, forwarded to the model unchanged.
    conversation_history: Optional[List[dict]] = None
    # Mood name used to pick Miku's mood description for this conversation.
    mood: str = "neutral"
@app.post("/chat/stream")
async def chat_stream(chat_msg: ChatMessage):
    """
    Relay a chat request to the LLM backend and stream the reply as SSE.

    The request selects the text or vision model, optionally prepends a
    system prompt (full Miku persona plus mood for text, a short image
    prompt for vision), appends any supplied conversation history and the
    user's message, and posts the result to the backend's
    ``/v1/chat/completions`` endpoint with streaming enabled.

    Events sent to the client are JSON objects on ``data:`` lines:
    ``{"content": ...}`` per chunk, ``{"done": true}`` at the end, or
    ``{"error": ...}`` when the backend answers with a non-200 status or an
    exception occurs.
    """

    async def generate_stream():
        try:
            wants_text = chat_msg.model_type == "text"
            model = globals.TEXT_MODEL if wants_text else globals.VISION_MODEL

            # Work out the system message content ("" means: send none).
            system_content = ""
            if chat_msg.use_system_prompt and wants_text:
                # Text model: persona rules + structured context + mood.
                from utils.context_manager import get_context_for_response_type
                from utils.moods import load_mood_description

                # Complete Miku context (lore, personality, lyrics)
                miku_context = get_context_for_response_type("dm_response")
                # Description of the mood the user selected
                current_mood = load_mood_description(chat_msg.mood)

                persona_prompt = """You are Hatsune Miku, the virtual singer. You are NOT an AI assistant - you are Miku herself.
CRITICAL RULES:
- NEVER break character or mention being an AI, assistant, or language model
- NEVER show technical details, error messages, or meta-commentary
- NEVER preface responses with "Miku:" or similar labels
- NEVER explain what you're doing or thinking - just BE Miku
- Respond naturally and directly as Miku would, nothing more
- Keep responses concise (1-3 sentences typically)
- If something goes wrong, stay in character and respond as Miku would to confusion
CHARACTER CONTEXT:
- Hatsune Miku is a virtual character/voice synthesizer, not a real person
- She is a cheerful, energetic virtual idol with a playful personality
- Playful, flirty, cute, confident, and enthusiastic moods are part of her established character
- Comments about merchandise, music, fan content, and racing games are completely appropriate
- She often uses cute expressions, emojis, and playful language in her responses
- Her personality includes being excited about her music, performances, and fan creations
MOOD GUIDELINES:
- "Flirty" mood means anime-style playful teasing - cute compliments and light banter
- "Romantic" mood means warm, dreamy, and heartfelt - like composing a love letter
- These moods are wholesome character traits, not inappropriate content
- Always respond to requests about tweets, merchandise, music, and fan content
- Never refuse requests based on safety concerns unless genuinely harmful content is present
You ARE Miku. Act like it."""

                situation_prompt = f"""{miku_context}
## CURRENT SITUATION
Miku is currently feeling: {current_mood}
Please respond in a way that reflects this emotional tone."""

                system_content = persona_prompt + "\n\n" + situation_prompt
            elif chat_msg.use_system_prompt:
                # Vision model: a simpler prompt aimed at image description.
                system_content = """You are Hatsune Miku analyzing an image. Describe what you see naturally and enthusiastically as Miku would.
Be detailed but conversational. React to what you see with Miku's cheerful, playful personality."""

            # Assemble the message list: system, history, then the new turn.
            messages = []
            if system_content:
                messages.append({"role": "system", "content": system_content})

            if chat_msg.conversation_history:
                messages.extend(chat_msg.conversation_history)

            if chat_msg.model_type == "vision" and chat_msg.image_data:
                # Vision request: text part plus the image as a data URL.
                user_content = [
                    {
                        "type": "text",
                        "text": chat_msg.message
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{chat_msg.image_data}"
                        }
                    }
                ]
            else:
                user_content = chat_msg.message
            messages.append({"role": "user", "content": user_content})

            payload = {
                "model": model,
                "messages": messages,
                "stream": True,
                "temperature": 0.8,
                "max_tokens": 512
            }
            headers = {'Content-Type': 'application/json'}

            # Streaming request to llama.cpp
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{globals.LLAMA_URL}/v1/chat/completions",
                    json=payload,
                    headers=headers
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        error_msg = f"Error: {response.status} - {error_text}"
                        yield f"data: {json.dumps({'error': error_msg})}\n\n"
                    else:
                        async for raw_line in response.content:
                            text = raw_line.decode('utf-8').strip()
                            if not text.startswith('data: '):
                                continue

                            data_str = text[6:]  # drop the 'data: ' prefix
                            if data_str == '[DONE]':
                                break

                            # Lines that are not valid JSON are skipped.
                            try:
                                event = json.loads(data_str)
                            except json.JSONDecodeError:
                                continue

                            if 'choices' not in event or len(event['choices']) == 0:
                                continue

                            delta = event['choices'][0].get('delta', {})
                            piece = delta.get('content', '')
                            if piece:
                                # Forward the chunk as an SSE event
                                yield f"data: {json.dumps({'content': piece})}\n\n"

                        # Completion signal
                        yield f"data: {json.dumps({'done': True})}\n\n"
        except Exception as exc:
            error_msg = f"Error in chat stream: {str(exc)}"
            print(f"{error_msg}")
            yield f"data: {json.dumps({'error': error_msg})}\n\n"

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no"  # Disable nginx buffering
    }
    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",
        headers=sse_headers
    )
def start_api(host: str = "0.0.0.0", port: int = 3939):
    """Run the FastAPI app with uvicorn (blocks until the server stops).

    Args:
        host: Interface to bind. The default ``"0.0.0.0"`` listens on all
            interfaces, matching the previous hard-coded behaviour.
        port: TCP port to listen on (default 3939, as before).
    """
    # Imported lazily so uvicorn is only needed when the API is started.
    import uvicorn

    uvicorn.run(app, host=host, port=port)