Files
miku-discord/bot/utils/autonomous_v1_legacy.py
koko210Serve 615dd4a5ef fix(P3): 3 priority-3 fixes — timezone, decay rounding, rate limiter
#16  Timezone consistency — added TZ=Europe/Sofia to docker-compose.yml
     so datetime.now() returns local time inside the container. Removed
     the +3 hour hack from get_time_of_day(). All three time-of-day
     consumers (autonomous_v1_legacy, moods, autonomous_engine) now
     use the same correct local hour automatically.

#17  Decay truncation — replaced int() with round() in decay_events()
     so a counter of 1 survives one more 15-minute cycle instead of
     being immediately zeroed (round(0.841)=1 vs int(0.841)=0).

#20  Unpersisted rate limiter — _last_action_execution dict in
     autonomous.py is now seeded from the engine's persisted
     server_last_action on import, so restarts don't bypass the
     30-second cooldown.

Note: #18 (dead config fields) was a false positive — autonomous_interval_minutes
IS used by the scheduler. #19 deferred to bipolar mode rework.
2026-02-23 13:53:22 +02:00

1137 lines
48 KiB
Python

# autonomous_v1_legacy.py
import random
import time
import json
import os
import re
from datetime import datetime
import discord
from discord import Status
from discord import TextChannel
from difflib import SequenceMatcher
import globals
from server_manager import server_manager
from utils.llm import query_llama
from utils.moods import MOOD_EMOJIS
from utils.twitter_fetcher import fetch_miku_tweets
from utils.image_handling import (
analyze_image_with_qwen,
download_and_encode_image,
download_and_encode_media,
extract_video_frames,
analyze_video_with_vision,
convert_gif_to_mp4
)
from utils.sleep_responses import SLEEP_RESPONSES
from utils.logger import get_logger
logger = get_logger('autonomous')
async def fetch_tweet_by_url(tweet_url: str):
    """Look up one specific tweet through twscrape, given its URL.

    Args:
        tweet_url: Link to the tweet (x.com, twitter.com, or fxtwitter.com).

    Returns:
        A dict with the keys ``username``, ``text``, ``url`` and ``media``
        (a list of photo URLs), or None when the tweet could not be fetched.
        Any exception raised along the way is logged and turned into None.
    """
    try:
        # Every supported host uses the same ".../status/<digits>" path:
        #   https://twitter.com/username/status/1234567890
        #   https://x.com/username/status/1234567890
        #   https://fxtwitter.com/username/status/1234567890
        id_match = re.search(r'/status/(\d+)', tweet_url)
        if id_match is None:
            logger.error(f"Could not extract tweet ID from URL: {tweet_url}")
            return None
        tweet_id = int(id_match.group(1))

        from twscrape import API
        from pathlib import Path

        # Session cookies are read from a JSON file next to this module.
        cookie_file = Path(__file__).parent / "x.com.cookies.json"
        if not cookie_file.exists():
            logger.error(f"Cookie file not found: {cookie_file}")
            return None
        with open(cookie_file, "r", encoding="utf-8") as handle:
            cookie_entries = json.load(handle)
        cookie_pairs = [f"{entry['name']}={entry['value']}" for entry in cookie_entries]
        cookie_header = "; ".join(cookie_pairs)

        # NOTE(review): password/email are literal "x" placeholders; presumably
        # the cookie header is what actually authenticates - confirm.
        api = API()
        await api.pool.add_account(
            username="HSankyuu39",
            password="x",
            email="x",
            email_password="x",
            cookies=cookie_header,
        )
        await api.pool.login_all()

        # The tweet is located via a search for its numeric ID
        # (same approach as figurine_notifier.py).
        from twscrape import gather
        logger.debug(f"Searching for tweet with ID {tweet_id}")
        search_results = await gather(api.search(f"{tweet_id}", limit=1))
        logger.debug(f"Search returned {len(search_results)} results")

        # Prefer an exact ID match; otherwise settle for the first hit.
        wanted_id = str(tweet_id)
        tweet = next(
            (candidate for candidate in search_results if str(candidate.id) == wanted_id),
            None,
        )
        if tweet:
            logger.debug(f"Found matching tweet with ID {tweet.id}")
        elif search_results:
            tweet = search_results[0]
            logger.debug(f"Using first search result with ID {tweet.id}")

        if not tweet:
            logger.error(f"Failed to fetch tweet ID {tweet_id}")
            return None

        # Collect photo URLs, replacing any query string with "?name=large".
        media_urls = []
        media = getattr(tweet, 'media', None)
        if media and hasattr(media, 'photos'):
            for photo in media.photos:
                if not hasattr(photo, 'url'):
                    continue
                photo_url = photo.url
                if '?' in photo_url:
                    photo_url = photo_url.split('?')[0]
                media_urls.append(photo_url + '?name=large')

        # Rebuild a canonical twitter.com link from the fetched tweet itself.
        username = tweet.user.username
        tweet_url = f"https://twitter.com/{username}/status/{tweet.id}"
        result = {
            "username": username,
            "text": getattr(tweet, 'rawContent', ""),
            "url": tweet_url,
            "media": media_urls or [],
        }
        logger.info(f"Successfully fetched tweet {tweet_id} from @{username}")
        return result
    except Exception as e:
        logger.error(f"Error fetching tweet by URL {tweet_url}: {e}")
        return None
# Server-specific memory storage.
# Nothing in the visible code persists these three containers; they are
# plain in-process objects created at import time.
_server_autonomous_messages = {} # guild_id -> rotating buffer of last general messages
_server_user_engagements = {} # guild_id -> user_id -> timestamp
_reacted_message_ids = set() # Track messages we've already reacted to
# Maximum number of entries kept per guild in _server_autonomous_messages.
MAX_HISTORY = 10
# JSON file read by load_last_sent_tweets() and written by save_last_sent_tweets().
LAST_SENT_TWEETS_FILE = "memory/last_sent_tweets.json"
# URLs of tweets already shared; share_miku_tweet_for_server() caps it at 50 entries.
LAST_SENT_TWEETS = []
# JSON file used by load_autonomous_config() / save_autonomous_config().
AUTONOMOUS_CONFIG_FILE = "memory/autonomous_config.json"
def load_autonomous_config():
    """Read the autonomous configuration from disk.

    Returns:
        The parsed JSON content of ``AUTONOMOUS_CONFIG_FILE``, or an empty
        dict when that file does not exist.
    """
    if not os.path.exists(AUTONOMOUS_CONFIG_FILE):
        return {}
    with open(AUTONOMOUS_CONFIG_FILE, "r", encoding="utf-8") as config_file:
        loaded = json.load(config_file)
    return loaded
def save_autonomous_config(config):
    """Write ``config`` to ``AUTONOMOUS_CONFIG_FILE`` as indented JSON.

    Args:
        config: JSON-serialisable configuration object to store.
    """
    with open(AUTONOMOUS_CONFIG_FILE, "w", encoding="utf-8") as config_file:
        json.dump(config, config_file, indent=2)
def setup_autonomous_speaking():
    """Placeholder kept for compatibility; it only emits a debug log line.

    The actual scheduling is delegated to the server manager.
    """
    logger.debug("Autonomous Miku setup delegated to server manager!")
async def miku_autonomous_tick_for_server(guild_id: int, action_type="general", force=False, force_action=None):
    """Run one autonomous "tick" for a single server.

    Args:
        guild_id: The server ID.
        action_type: Accepted for compatibility. The value passed in is always
            replaced, either by ``force_action`` or by a random pick.
        force: If True, skip the 10% probability gate.
        force_action: If truthy, the action to run ("general", "engage_user";
            any other value shares a tweet). Otherwise one is chosen at random.
    """
    # Unless forced, act on only about one tick in ten.
    if not force and random.random() > 0.10:
        return

    if force_action:
        chosen_action = force_action
    else:
        chosen_action = random.choice(["general", "engage_user", "share_tweet"])

    handlers = {
        "general": miku_say_something_general_for_server,
        "engage_user": miku_engage_random_user_for_server,
    }
    # Anything that is not a known key falls through to tweet sharing.
    handler = handlers.get(chosen_action, share_miku_tweet_for_server)
    await handler(guild_id)
async def miku_say_something_general_for_server(guild_id: int):
    """Post a spontaneous, general message in a server's autonomous channel.

    The prompt depends on the current mood (the evil-mode mood when evil mode
    is active, otherwise the server's mood) and on the time of day. In normal
    mode, an "asleep" mood sends a canned sleep response instead of calling
    the LLM. Up to three generations are attempted to avoid repeating recent
    messages; if all three are too similar, the last one is still sent.

    LLM and send failures are logged rather than raised, matching the other
    autonomous actions in this module.

    Args:
        guild_id: The server ID.
    """
    server_config = server_manager.get_server_config(guild_id)
    if not server_config:
        logger.warning(f"No config found for server {guild_id}")
        return
    channel = globals.client.get_channel(server_config.autonomous_channel_id)
    if not channel:
        logger.warning(f"Autonomous channel not found for server {guild_id}")
        return

    # Evil mode has its own mood source and prompt builder.
    from utils.evil_mode import is_evil_mode, get_evil_general_prompt, get_current_evil_mood
    evil_mode = is_evil_mode()
    if evil_mode:
        mood, _ = get_current_evil_mood()
    else:
        mood = server_config.current_mood_name
    time_of_day = get_time_of_day()

    # Special handling for sleep state (only in non-evil mode).
    if mood == "asleep" and not evil_mode:
        await channel.send(random.choice(SLEEP_RESPONSES))
        return

    # Per-server rotating history of what was said recently.
    history = _server_autonomous_messages.setdefault(guild_id, [])
    if history:
        history_summary = "\n".join(f"- {msg}" for msg in history[-5:])
    else:
        history_summary = "None yet."

    if evil_mode:
        prompt = get_evil_general_prompt(mood, time_of_day, history_summary)
    else:
        prompt = (
            f"Miku is feeling {mood}. It's currently {time_of_day}. "
            f"Write a short, natural message that Miku might say out of the blue in a chat. "
            f"She might greet everyone, make a cute observation, ask a silly question, or say something funny. "
            f"Make sure it feels casual and spontaneous, like a real person might say.\n\n"
            f"Here are some things Miku recently said, do not repeat them or say anything too similar:\n{history_summary}"
        )

    try:
        for _attempt in range(3):  # retry up to 3 times if message is too similar
            # A stable user_id per guild enables conversation history and
            # prompt caching, rather than a fresh ID on every call.
            message = await query_llama(
                prompt,
                user_id=f"miku-autonomous-{guild_id}",
                guild_id=guild_id,
                response_type="autonomous_general",
            )
            if not is_too_similar(message, history):
                break
            logger.debug("Response was too similar to past messages, retrying...")

        await channel.send(message)
        history.append(message)
        if len(history) > MAX_HISTORY:
            history.pop(0)
        character_name = "Evil Miku" if evil_mode else "Miku"
        logger.info(f"{character_name} said something general in #{channel.name} (Server: {server_config.guild_name})")
    except Exception as e:
        logger.error(f"Failed to send autonomous message: {e}")
async def miku_engage_random_user_for_server(guild_id: int, user_id: str = None, engagement_type: str = None, manual_trigger: bool = False):
    """Have Miku mention one member of a server and start a conversation.

    Args:
        guild_id: The server ID.
        user_id: Optional ID (as a string) of the member to engage. When
            omitted, a random non-bot member who is online, idle or DND is
            picked.
        engagement_type: Optional style - 'activity', 'general', 'status',
            or None to auto-detect from the member's status and activity.
        manual_trigger: If True, skip the 12-hour per-user cooldown (used by
            web UI manual triggers). When the cooldown applies, a general
            message is posted instead.
    """
    server_config = server_manager.get_server_config(guild_id)
    if not server_config:
        logger.warning(f"No config found for server {guild_id}")
        return

    guild = globals.client.get_guild(guild_id)
    if not guild:
        logger.warning(f"Guild {guild_id} not found.")
        return

    channel = globals.client.get_channel(server_config.autonomous_channel_id)
    if not channel:
        logger.warning(f"Autonomous channel not found for server {guild_id}")
        return

    # Resolve the member to talk to.
    if user_id:
        try:
            target = guild.get_member(int(user_id))
            if not target:
                logger.warning(f"User {user_id} not found in server {guild_id}")
                return
            if target.bot:
                logger.warning(f"Cannot engage bot user {user_id}")
                return
            logger.info(f"Targeting specific user: {target.display_name} (ID: {user_id})")
        except ValueError:
            logger.warning(f"Invalid user ID: {user_id}")
            return
    else:
        reachable = {Status.online, Status.idle, Status.dnd}
        candidates = [
            member for member in guild.members
            if member.status in reachable and not member.bot
        ]
        if not candidates:
            logger.warning(f"No available members to talk to in server {guild_id}.")
            return
        target = random.choice(candidates)
        logger.info(f"Randomly selected user: {target.display_name}")

    time_of_day = get_time_of_day()

    # Per-server record of when each user was last engaged.
    engagements = _server_user_engagements.setdefault(guild_id, {})
    now = time.time()
    last_time = engagements.get(target.id, 0)

    # 43200 s = 12 hours; manual triggers bypass the cooldown.
    if not manual_trigger and now - last_time < 43200:
        logger.warning(f"[engage_user→general] Recently engaged {target.display_name} in server {guild_id}, falling back to general message (cooldown).")
        await miku_say_something_general_for_server(guild_id)
        return
    if manual_trigger:
        logger.info(f"Manual trigger - bypassing cooldown for {target.display_name} in server {guild_id}")

    # First activity that carries a non-empty name, if any.
    activity_name = next(
        (act.name for act in (target.activities or ()) if hasattr(act, 'name') and act.name),
        None,
    )

    is_invisible = target.status == Status.offline
    display_name = target.display_name

    # Mood comes from evil mode when active, otherwise from the server.
    from utils.evil_mode import is_evil_mode, get_evil_engage_user_prompt, get_evil_mood_emoji, get_current_evil_mood
    evil_mode = is_evil_mode()
    if evil_mode:
        mood, _ = get_current_evil_mood()
        emoji = get_evil_mood_emoji(mood)
    else:
        mood = server_config.current_mood_name
        emoji = MOOD_EMOJIS.get(mood, "")

    # Build the prompt for the requested engagement style.
    if evil_mode:
        prompt = get_evil_engage_user_prompt(mood, emoji, time_of_day, display_name,
                                             activity_name, is_invisible, engagement_type)
    else:
        prompt = f"Miku is feeling {mood} {emoji} during the {time_of_day}. "
        if engagement_type == 'activity':
            # Forced activity-based engagement.
            if activity_name:
                prompt += (
                    f"She notices {display_name} is currently playing or doing: {activity_name}. "
                    "Miku wants to comment on this activity and start a friendly conversation about it."
                )
            else:
                prompt += f"She wants to ask {display_name} what they're up to or what they like to do for fun."
        elif engagement_type == 'status':
            # Forced status-based engagement.
            prompt += f"She notices {display_name}'s current status is {target.status.name}. "
            if is_invisible:
                prompt += (
                    f"Miku suspects that {display_name} is being sneaky and invisible 👻. "
                    "She wants to playfully call them out in a fun, teasing, but still affectionate way."
                )
            else:
                prompt += "Miku wants to comment on their current status and start a conversation."
        elif engagement_type == 'general':
            # Forced general conversation.
            prompt += (
                f"Miku wants to casually start a conversation with {display_name}, "
                "maybe ask how they're doing, what they're up to, or talk about something random."
            )
        else:
            # Auto-detect (original behavior).
            prompt += f"She notices {display_name}'s current status is {target.status.name}. "
            if is_invisible:
                prompt += (
                    f"Miku suspects that {display_name} is being sneaky and invisible 👻. "
                    "She wants to playfully call them out in a fun, teasing, but still affectionate way. "
                )
            elif activity_name:
                prompt += (
                    f"They appear to be playing or doing: {activity_name}. "
                    "Miku wants to comment on this and start a friendly conversation."
                )
            else:
                prompt += "Miku wants to casually start a conversation with them, maybe ask how they're doing, what they're up to, or even talk about something random with them."
        prompt += "\nThe message should be short and reflect Miku's current mood."

    if engagement_type:
        logger.debug(f"Engagement type: {engagement_type}")

    try:
        # A stable user_id keeps conversation history for engagements.
        message = await query_llama(prompt, user_id=f"miku-engage-{guild_id}", guild_id=guild_id)
        await channel.send(f"{target.mention} {message}")
        engagements[target.id] = time.time()
        character_name = "Evil Miku" if evil_mode else "Miku"
        logger.info(f"{character_name} engaged {display_name} in server {server_config.guild_name}")
    except Exception as e:
        logger.error(f"Failed to engage user: {e}")
async def miku_detect_and_join_conversation_for_server(guild_id: int, force: bool = False):
    """Let Miku chime in on a conversation in a server's autonomous channel.

    In normal mode a reply is only produced when at least 5 non-bot messages
    from at least 2 distinct users were posted in the last 10 minutes.

    Args:
        guild_id: The server ID.
        force: If True, skip the activity checks and use the most recent
            non-bot messages regardless of age (for manual triggers).
    """
    logger.debug(f"[Join Conv] Called for server {guild_id} (force={force})")

    server_config = server_manager.get_server_config(guild_id)
    if not server_config:
        logger.warning(f"No config found for server {guild_id}")
        return

    channel = globals.client.get_channel(server_config.autonomous_channel_id)
    if not isinstance(channel, TextChannel):
        logger.warning(f"Autonomous channel is invalid or not found for server {guild_id}")
        return

    # Pull the last 20 messages; filtering happens below.
    try:
        messages = [entry async for entry in channel.history(limit=20)]
        logger.debug(f"[Join Conv] Fetched {len(messages)} messages from history")
    except Exception as e:
        logger.error(f"Failed to fetch channel history for server {guild_id}: {e}")
        return

    human_msgs = (entry for entry in messages if not entry.author.bot)
    if force:
        # Forced: no age limit, but at most 10 messages.
        recent_msgs = list(human_msgs)[:10]
        logger.debug(f"[Join Conv] Force mode: Using last {len(recent_msgs)} messages from users (no time limit)")
    else:
        # Normal: only messages younger than 600 s, compared in the
        # timestamp's own timezone.
        recent_msgs = [
            entry for entry in human_msgs
            if (datetime.now(entry.created_at.tzinfo) - entry.created_at).total_seconds() < 600
        ]
        logger.debug(f"[Join Conv] Found {len(recent_msgs)} recent messages from users (last 10 min)")

    user_ids = {entry.author.id for entry in recent_msgs}

    if force:
        logger.debug(f"[Join Conv] Force mode - bypassing activity checks")
        if len(recent_msgs) < 1:
            logger.warning(f"[Join Conv] No messages found in channel history")
            return
    elif len(recent_msgs) < 5 or len(user_ids) < 2:
        # Not enough activity.
        # Note: V1 had a redundant 50% coin flip here, removed since V2 engine already decided to act
        logger.debug(f"[Join Conv] Not enough activity: {len(recent_msgs)} messages, {len(user_ids)} users (need 5+ messages, 2+ users)")
        return

    # Up to 10 messages of context, reversed from the fetched order
    # (oldest to newest).
    transcript = [
        f"{entry.author.display_name}: {entry.content}"
        for entry in reversed(recent_msgs[:10])
    ]
    history_text = "\n".join(transcript)

    # Mood and prompt depend on whether evil mode is active.
    from utils.evil_mode import is_evil_mode, get_evil_conversation_join_prompt, get_evil_mood_emoji, get_current_evil_mood
    evil_mode = is_evil_mode()
    if evil_mode:
        mood, _ = get_current_evil_mood()
        emoji = get_evil_mood_emoji(mood)
        prompt = get_evil_conversation_join_prompt(mood, emoji, history_text)
    else:
        # Server-specific mood rather than a global one.
        mood = server_config.current_mood_name
        emoji = MOOD_EMOJIS.get(mood, "")
        prompt = (
            f"Miku is watching a conversation happen in the chat. Her current mood is {mood} {emoji}. "
            f"She wants to say something relevant, playful, or insightful based on what people are talking about.\n\n"
            f"Here's the conversation:\n{history_text}\n\n"
            f"Write a short reply that feels natural and adds to the discussion. It should reflect Miku's mood and personality."
        )

    try:
        # A stable user_id keeps conversation history for these replies.
        reply = await query_llama(prompt, user_id=f"miku-conversation-{guild_id}", guild_id=guild_id, response_type="conversation_join")
        await channel.send(reply)
        character_name = "Evil Miku" if evil_mode else "Miku"
        logger.info(f"{character_name} joined an ongoing conversation in server {server_config.guild_name}")
    except Exception as e:
        logger.error(f"Failed to interject in conversation: {e}")
async def share_miku_tweet_for_server(guild_id: int, tweet_url: str = None):
    """Share a Miku tweet, plus an LLM-written comment, in a server.

    Two messages are sent to the server's autonomous channel: the tweet link
    (rewritten to fxtwitter.com for better embeds) and Miku's comment. If the
    tweet has media, the first image is described by the vision model and the
    description is added to the prompt.

    Args:
        guild_id: The server ID to share the tweet to.
        tweet_url: Optional URL of a specific tweet to share. If None, a
            random tweet is picked from a fresh fetch, preferring ones that
            are not in ``LAST_SENT_TWEETS``.
    """
    server_config = server_manager.get_server_config(guild_id)
    if not server_config:
        logger.warning(f"No config found for server {guild_id}")
        return

    channel = globals.client.get_channel(server_config.autonomous_channel_id)
    # Bail out before any fetching or LLM work: without a channel the tweet
    # would be recorded as sent and then fail on channel.send().
    if not channel:
        logger.warning(f"Autonomous channel not found for server {guild_id}")
        return

    if tweet_url:
        # A specific tweet was requested.
        tweet = await fetch_tweet_by_url(tweet_url)
        if not tweet:
            logger.error(f"Failed to fetch tweet from URL: {tweet_url}")
            return
    else:
        # Fetch random tweets as usual.
        tweets = await fetch_miku_tweets(limit=5)
        if not tweets:
            logger.warning(f"No good tweets found for server {guild_id}")
            return
        fresh_tweets = [t for t in tweets if t["url"] not in LAST_SENT_TWEETS]
        if not fresh_tweets:
            logger.warning(f"All fetched tweets were recently sent in server {guild_id}. Reusing tweets.")
            fresh_tweets = tweets
        tweet = random.choice(fresh_tweets)

        # Remember what was shared (bounded to the 50 most recent URLs).
        LAST_SENT_TWEETS.append(tweet["url"])
        if len(LAST_SENT_TWEETS) > 50:
            LAST_SENT_TWEETS.pop(0)
        save_last_sent_tweets()

    # Check if evil mode is active.
    from utils.evil_mode import is_evil_mode, get_evil_tweet_prompt, get_evil_mood_emoji, get_current_evil_mood
    evil_mode = is_evil_mode()

    # Optionally analyze the first image if media exists.
    img_desc = None
    if tweet.get("media") and len(tweet["media"]) > 0:
        first_img_url = tweet["media"][0]
        base64_img = await download_and_encode_image(first_img_url)
        if base64_img:
            img_desc = await analyze_image_with_qwen(base64_img)

    # Prepare the prompt based on mode.
    if evil_mode:
        mood, _ = get_current_evil_mood()
        emoji = get_evil_mood_emoji(mood)
        base_prompt = get_evil_tweet_prompt(mood, emoji, tweet['username'], tweet['text'], img_desc)
    else:
        # Server-specific mood rather than a global one.
        mood = server_config.current_mood_name
        emoji = MOOD_EMOJIS.get(mood, "")
        base_prompt = f"Here's a tweet from @{tweet['username']}:\n\n{tweet['text']}\n\nComment on it in a fun Miku style! Miku's current mood is {mood} {emoji}. Make sure the comment reflects Miku's mood and personality."
        if img_desc:
            base_prompt += f"\n\nThe image looks like this: {img_desc}"

    miku_comment = await query_llama(base_prompt, user_id=f"autonomous-{guild_id}", guild_id=guild_id, response_type="autonomous_tweet")

    # Post to Discord (convert to fxtwitter for better embeds).
    fx_tweet_url = tweet['url'].replace("twitter.com", "fxtwitter.com").replace("x.com", "fxtwitter.com")
    await channel.send(f"{fx_tweet_url}")
    await channel.send(miku_comment)
async def handle_custom_prompt_for_server(guild_id: int, user_prompt: str):
    """Generate and post a message for a user-supplied instruction.

    Args:
        guild_id: The server ID.
        user_prompt: Free-form instruction that gets wrapped in Miku's mood
            and time-of-day context before being sent to the LLM.

    Returns:
        True if the message was generated and sent, False if the server has
        no config, the channel is missing, or generating/sending failed.
    """
    server_config = server_manager.get_server_config(guild_id)
    if not server_config:
        logger.warning(f"No config found for server {guild_id}")
        return False

    channel = globals.client.get_channel(server_config.autonomous_channel_id)
    if not channel:
        logger.warning(f"Autonomous channel not found for server {guild_id}")
        return False

    mood = server_config.current_mood_name
    emoji = MOOD_EMOJIS.get(mood, "")
    time_of_day = get_time_of_day()

    # Wrap the user's idea in Miku context.
    prompt_parts = [
        f"Miku is feeling {mood} {emoji} during the {time_of_day}. ",
        f"She has been instructed to: \"{user_prompt.strip()}\"\n\n",
        "Write a short, natural message as Miku that follows this instruction. ",
        "Make it feel spontaneous, emotionally in character, and aligned with her mood and personality. Decide if the time of day is relevant to this request or not and if it is not, do not mention it.",
    ]
    prompt = "".join(prompt_parts)

    try:
        # A stable user_id keeps conversation history for manual prompts.
        message = await query_llama(prompt, user_id=f"miku-manual-{guild_id}", guild_id=guild_id, response_type="autonomous_general")
        await channel.send(message)
        logger.info(f"Miku responded to custom prompt in server {server_config.guild_name}")

        # Record it in the server's rotating message history.
        history = _server_autonomous_messages.setdefault(guild_id, [])
        history.append(message)
        if len(history) > MAX_HISTORY:
            history.pop(0)
        return True
    except Exception as e:
        logger.error(f"Failed to send custom autonomous message: {e}")
        return False
# Legacy functions for backward compatibility - these now delegate to server-specific versions
async def miku_autonomous_tick(action_type="general", force=False, force_action=None):
    """Legacy entry point: run an autonomous tick for every known server.

    Args:
        action_type: Passed through to the per-server tick.
        force: Passed through to the per-server tick.
        force_action: Passed through to the per-server tick.
    """
    server_ids = server_manager.servers
    for server_id in server_ids:
        await miku_autonomous_tick_for_server(server_id, action_type, force, force_action)
async def miku_say_something_general():
    """Legacy entry point: post a general message in every known server."""
    server_ids = server_manager.servers
    for server_id in server_ids:
        await miku_say_something_general_for_server(server_id)
async def miku_engage_random_user(user_id: str = None, engagement_type: str = None, manual_trigger: bool = False):
    """Legacy entry point: engage a user in every known server.

    Args:
        user_id: Optional specific user ID to engage.
        engagement_type: Optional engagement style.
        manual_trigger: If True, bypass cooldown checks (for web UI manual triggers).
    """
    passthrough = {
        "user_id": user_id,
        "engagement_type": engagement_type,
        "manual_trigger": manual_trigger,
    }
    for server_id in server_manager.servers:
        await miku_engage_random_user_for_server(server_id, **passthrough)
async def miku_detect_and_join_conversation(force: bool = False):
    """Legacy entry point: try to join a conversation in every known server.

    Args:
        force: If True, bypass activity checks and random chance (for manual triggers).
    """
    server_ids = server_manager.servers
    for server_id in server_ids:
        await miku_detect_and_join_conversation_for_server(server_id, force=force)
async def share_miku_tweet(tweet_url: str = None):
    """Legacy entry point: share a tweet in every known server.

    Args:
        tweet_url: Optional URL of a specific tweet to share. If None, each
            server call fetches a random tweet.
    """
    server_ids = server_manager.servers
    for server_id in server_ids:
        await share_miku_tweet_for_server(server_id, tweet_url=tweet_url)
async def handle_custom_prompt(user_prompt: str):
    """Legacy entry point: run a custom prompt in every known server.

    Args:
        user_prompt: Instruction forwarded to each server's handler.

    Returns:
        True if at least one server handled the prompt successfully.
    """
    # Every server is processed, in order, before the outcomes are combined;
    # there is deliberately no short-circuit on the first success.
    outcomes = [
        await handle_custom_prompt_for_server(server_id, user_prompt)
        for server_id in server_manager.servers
    ]
    return any(outcomes)
def load_last_sent_tweets():
    """Reload ``LAST_SENT_TWEETS`` from ``LAST_SENT_TWEETS_FILE``.

    The module-level list is rebound to the parsed JSON content. It becomes
    an empty list when the file is missing or cannot be read/parsed (the
    failure is logged).
    """
    global LAST_SENT_TWEETS
    if not os.path.exists(LAST_SENT_TWEETS_FILE):
        LAST_SENT_TWEETS = []
        return
    try:
        with open(LAST_SENT_TWEETS_FILE, "r", encoding="utf-8") as tweets_file:
            LAST_SENT_TWEETS = json.load(tweets_file)
    except Exception as e:
        logger.error(f"Failed to load last sent tweets: {e}")
        LAST_SENT_TWEETS = []
def save_last_sent_tweets():
    """Write ``LAST_SENT_TWEETS`` to ``LAST_SENT_TWEETS_FILE`` as JSON.

    Failures are logged and otherwise ignored.
    """
    try:
        with open(LAST_SENT_TWEETS_FILE, "w", encoding="utf-8") as tweets_file:
            json.dump(LAST_SENT_TWEETS, tweets_file)
    except Exception as e:
        logger.error(f"Failed to save last sent tweets: {e}")
def get_time_of_day():
    """Return a prompt-ready label for the current local hour.

    Returns:
        "morning" for hours 5-11, "afternoon" for 12-17, "evening" for 18-21,
        and a longer "late night" phrase for every other hour.
    """
    current_hour = datetime.now().hour
    # (first hour inclusive, last hour exclusive, label)
    day_parts = (
        (5, 12, "morning"),
        (12, 18, "afternoon"),
        (18, 22, "evening"),
    )
    for start, stop, label in day_parts:
        if start <= current_hour < stop:
            return label
    return "late night. Miku wonders if anyone is still awake"
def is_too_similar(new_message, history, threshold=0.85):
    """Tell whether ``new_message`` nearly duplicates any earlier message.

    Args:
        new_message: Candidate message text.
        history: Iterable of earlier message texts.
        threshold: Similarity ratio that must be strictly exceeded.

    Returns:
        True if the case-insensitive ``SequenceMatcher`` ratio against any
        entry of ``history`` is greater than ``threshold``, else False.
    """
    return any(
        SequenceMatcher(None, new_message.lower(), previous.lower()).ratio() > threshold
        for previous in history
    )
# ========== Autonomous Reaction System ==========
# Mood-based emoji mappings for autonomous reactions.
# Maps a mood name to a list of candidate reaction emojis.
# NOTE(review): nothing in the visible part of this file reads this table -
# miku_autonomous_reaction_for_server() asks the LLM for the emoji instead.
# Confirm whether other modules still import it.
# NOTE(review): the "bubbly", "curious", "excited" and "neutral" lists each
# contain an empty string "", which is not a usable reaction emoji. It may be
# a character lost in an encoding round-trip - confirm against the original.
MOOD_REACTION_EMOJIS = {
"bubbly": ["", "🫧", "💙", "🌟", "💫", "🎀", "🌸"],
"sleepy": ["😴", "💤", "🌙", "😪", "🥱"],
"curious": ["👀", "🤔", "", "🔍", "💭"],
"shy": ["👉👈", "🙈", "😊", "💕", "☺️"],
"serious": ["🤨", "📝", "👔", "💼", "🎯"],
"excited": ["", "🎉", "😆", "🌟", "💫", "🎊", "🔥"],
"silly": ["🪿", "😜", "🤪", "😝", "🎭", "🎪"],
"melancholy": ["🍷", "🌧️", "💭", "🥀", "🌙"],
"flirty": ["🫦", "😏", "💋", "💕", "😘", "💖"],
"romantic": ["💌", "💖", "💕", "💝", "❤️", "🌹"],
"irritated": ["😒", "💢", "😤", "🙄", "😑"],
"angry": ["💢", "😠", "👿", "💥", "😡"],
"neutral": ["💙", "👍", "😊", "", "🎵"],
"asleep": [] # Don't react when asleep
}
async def _analyze_message_media(message):
    """Describe the first analyzable attachment (image, video or GIF).

    Attachments are tried in order; the first one that yields a description
    wins. An attachment that fails to download or analyze is skipped (errors
    are logged as warnings).

    Returns:
        A string such as "[Image: ...]", "[GIF: ...]" or "[video: ...]", or
        None if the message has no attachments or none could be described.
    """
    if not message.attachments:
        return None

    image_exts = (".jpg", ".jpeg", ".png", ".webp")
    motion_exts = (".gif", ".mp4", ".webm", ".mov")

    for attachment in message.attachments:
        try:
            filename = attachment.filename.lower()

            if filename.endswith(image_exts):
                # Still image: encode and hand to the vision model.
                logger.debug(f"  Analyzing image for reaction: {attachment.filename}")
                encoded_image = await download_and_encode_image(attachment.url)
                if encoded_image:
                    description = await analyze_image_with_qwen(encoded_image)
                    return f"[Image: {description}]"

            elif filename.endswith(motion_exts):
                # Video or GIF: sample frames and describe those.
                is_gif = filename.endswith('.gif')
                media_type = "GIF" if is_gif else "video"
                logger.debug(f"  Analyzing {media_type} for reaction: {attachment.filename}")

                encoded_media = await download_and_encode_media(attachment.url)
                if not encoded_media:
                    continue

                import base64
                media_bytes = base64.b64decode(encoded_media)

                # GIFs are converted to MP4 first; on failure the raw bytes are used.
                if is_gif:
                    converted = await convert_gif_to_mp4(media_bytes)
                    if converted:
                        media_bytes = converted

                frames = await extract_video_frames(media_bytes, num_frames=6)
                if frames:
                    vision_kind = "gif" if is_gif else "video"
                    description = await analyze_video_with_vision(frames, media_type=vision_kind)
                    return f"[{media_type}: {description}]"
        except Exception as e:
            logger.warning(f"  Error analyzing media for reaction: {e}")
            continue

    return None
async def miku_autonomous_reaction_for_server(guild_id: int, force_message=None, force=False):
    """Miku autonomously reacts to a recent message with an LLM-selected emoji.

    Without ``force_message``, a random message from the last 50 in the
    autonomous channel is chosen, skipping Miku's own messages, messages
    already reacted to, and messages older than 12 hours. The LLM reply is
    reduced to its first emoji character; "💙" is the fallback when no emoji
    is found or Discord rejects it as unknown.

    Errors are logged, not raised.

    Args:
        guild_id: The server ID.
        force_message: If provided, react to this specific message (for
            real-time reactions).
        force: If True, bypass the 50% probability check (for manual triggers).
    """
    # 50% chance to proceed (unless forced or given a specific message).
    if not force and force_message is None and random.random() > 0.5:
        logger.debug(f"Autonomous reaction skipped for server {guild_id} (50% chance)")
        return

    server_config = server_manager.get_server_config(guild_id)
    if not server_config:
        logger.warning(f"No config found for server {guild_id}")
        return
    server_name = server_config.guild_name

    # Don't react if asleep.
    if server_config.current_mood_name == "asleep" or server_config.is_sleeping:
        logger.info(f"[{server_name}] Miku is asleep, skipping autonomous reaction")
        return

    channel = globals.client.get_channel(server_config.autonomous_channel_id)
    if not channel:
        logger.warning(f"[{server_name}] Autonomous channel not found")
        return

    try:
        if force_message:
            target_message = force_message
            # Never react twice to the same message.
            if target_message.id in _reacted_message_ids:
                logger.debug(f"[{server_name}] Already reacted to message {target_message.id}, skipping")
                return
            logger.info(f"[{server_name}] Reacting to new message from {target_message.author.display_name}")
        else:
            # Collect candidates from the last 50 messages.
            messages = []
            async for message in channel.history(limit=50):
                # Skip the bot's own messages.
                if message.author == globals.client.user:
                    continue
                # Skip messages we've already reacted to.
                if message.id in _reacted_message_ids:
                    continue
                # Skip messages older than 12 hours. "Now" is taken in the
                # timestamp's own timezone so the difference is correct
                # whatever the container's local timezone is.
                created_at = message.created_at
                age = (datetime.now(created_at.tzinfo) - created_at).total_seconds()
                if age > 43200:  # 12 hours
                    continue
                messages.append(message)

            if not messages:
                logger.debug(f"[{server_name}] No recent unreacted messages to react to")
                return
            target_message = random.choice(messages)

        # Analyze any media in the message.
        logger.debug(f"[{server_name}] Analyzing message for reaction from {target_message.author.display_name}")
        media_description = await _analyze_message_media(target_message)

        # Build the text shown to the LLM (text capped at 200 chars, total at 400).
        message_content = target_message.content[:200]
        if media_description:
            message_content = f"{media_description} {message_content}".strip()
        message_content = message_content[:400]

        # Ask the LLM to select an appropriate emoji.
        prompt = (
            f"You are Miku, a playful virtual idol on Discord. Someone just posted: \"{message_content}\"\n\n"
            f"React with ONE emoji that captures your response! Be creative and expressive - don't just use 😊 or 👍. "
            f"Think about:\n"
            f"- What emotion does this make you feel? (use expressive emojis like 🤨, 😭, 🤯, 💀, etc.)\n"
            f"- Is it funny? (try 💀, 😂, 🤡, 🪿, etc.)\n"
            f"- Is it interesting? (try 👀, 🤔, 🧐, 😳, etc.)\n"
            f"- Is it relatable? (try 😔, 🥺, 😩, 🙃, etc.)\n"
            f"- Does it mention something specific? (match it with a relevant emoji like 🎮, 🍕, 🎸, etc.)\n\n"
            f"Be bold! Use uncommon emojis! Respond with ONLY the emoji character itself, no text."
        )
        llm_response = await query_llama(
            prompt,
            user_id=f"miku-reaction-{guild_id}",  # Use consistent user_id
            guild_id=guild_id,
            response_type="emoji_selection"
        )

        # Pull emoji characters straight out of the raw reply; surrounding
        # quotes, colons or text are ignored by the pattern, so no separate
        # clean-up pass is needed. These ranges cover the same characters as
        # the previous, partly overlapping, list.
        emoji_pattern = re.compile(
            "["
            "\U0001F300-\U0001F9FF"  # pictographs, emoticons, transport, supplemental
            "\U0001F1E0-\U0001F1FF"  # flags
            "\U00002600-\U000027BF"  # misc symbols and dingbats
            "\U0001FA70-\U0001FAFF"  # extended pictographs
            "\U00002300-\U000023FF"  # misc technical
            "]",
            flags=re.UNICODE,
        )
        found_emojis = emoji_pattern.findall(llm_response)
        if found_emojis:
            # Take only the FIRST emoji.
            emoji = found_emojis[0]
        else:
            logger.warning(f"[{server_name}] LLM response contained no emoji: '{llm_response[:50]}' - using fallback")
            emoji = "💙"

        # Final validation: Discord decides whether the emoji is usable.
        try:
            await target_message.add_reaction(emoji)
        except discord.HTTPException as e:
            if "Unknown Emoji" in str(e):
                logger.warning(f"[{server_name}] Invalid emoji from LLM: '{llm_response[:50]}' - using fallback")
                emoji = "💙"
                await target_message.add_reaction(emoji)
            else:
                raise

        # Track this message ID to prevent duplicate reactions.
        _reacted_message_ids.add(target_message.id)

        # Keep the tracking set bounded. Discord message IDs are snowflakes
        # that grow with time, so the 50 smallest IDs are the 50 oldest.
        if len(_reacted_message_ids) > 100:
            _reacted_message_ids.difference_update(sorted(_reacted_message_ids)[:50])

        logger.info(f"[{server_name}] Autonomous reaction: Added {emoji} to message from {target_message.author.display_name}")
    except discord.Forbidden:
        logger.error(f"[{server_name}] Missing permissions to add reactions")
    except discord.HTTPException as e:
        logger.error(f"[{server_name}] Failed to add reaction: {e}")
    except Exception as e:
        logger.error(f"[{server_name}] Error in autonomous reaction: {e}")
async def miku_autonomous_reaction(force=False):
    """Legacy function - run autonomous reactions for all servers.

    Awaits ``miku_autonomous_reaction_for_server`` once per guild ID found in
    ``server_manager.servers``, one server after another.

    Args:
        force: Forwarded unchanged to the per-server function. If True,
            bypass the 50% probability check (for manual triggers).
    """
    # Iterate over a snapshot: every per-server call awaits, and a server
    # being added or removed in the meantime would otherwise change the
    # collection underneath the running iterator.
    for guild_id in list(server_manager.servers):
        await miku_autonomous_reaction_for_server(guild_id, force=force)
async def miku_autonomous_reaction_for_dm(user_id: int, force_message=None):
    """Miku autonomously reacts to a DM message with an LLM-selected emoji.

    Without ``force_message`` the call proceeds only about half of the time
    and picks a random candidate from the last 50 messages of the user's DM
    channel. Candidates are messages not written by the bot, not already
    tracked in ``_reacted_message_ids`` and at most 12 hours old. With
    ``force_message`` the random skip is bypassed and that exact message is
    used (unless it was already reacted to).

    Failures are logged and swallowed; nothing is raised to the caller.

    Args:
        user_id: The Discord user ID
        force_message: If provided, react to this specific message (for real-time reactions)
    """
    # Local import: the module header only imports the ``datetime`` class.
    from datetime import timezone

    # 50% chance to proceed (unless forced with a specific message)
    if force_message is None and random.random() > 0.5:
        logger.debug(f"DM reaction skipped for user {user_id} (50% chance)")
        return

    # Get the user object
    try:
        user = await globals.client.fetch_user(user_id)
        if not user:
            logger.warning(f"Could not find user {user_id}")
            return

        dm_channel = user.dm_channel
        if not dm_channel:
            dm_channel = await user.create_dm()

        username = user.display_name
    except Exception as e:
        logger.error(f"Error fetching DM channel for user {user_id}: {e}")
        return

    try:
        # If a specific message was provided, use it
        if force_message:
            target_message = force_message
            # Check if we've already reacted to this message
            if target_message.id in _reacted_message_ids:
                logger.debug(f"[DM: {username}] Already reacted to message {target_message.id}, skipping")
                return
            logger.info(f"[DM: {username}] Reacting to new message")
        else:
            # Message timestamps are UTC. Compare them against an aware UTC
            # "now" instead of naive local time, which would inflate every
            # age by the local UTC offset and shrink the 12 hour window.
            now_utc = datetime.now(timezone.utc)

            # Fetch recent messages from DM (last 50 messages)
            messages = []
            async for message in dm_channel.history(limit=50):
                # Skip bot's own messages
                if message.author == globals.client.user:
                    continue
                # Skip messages we've already reacted to
                if message.id in _reacted_message_ids:
                    continue
                # Skip messages that are too old (more than 12 hours)
                created_at = message.created_at
                if created_at.tzinfo is None:
                    # Older discord.py versions return naive UTC datetimes.
                    created_at = created_at.replace(tzinfo=timezone.utc)
                age = (now_utc - created_at).total_seconds()
                if age > 43200:  # 12 hours
                    continue
                messages.append(message)

            if not messages:
                logger.debug(f"[DM: {username}] No recent unreacted messages to react to")
                return

            # Pick a random message from the recent ones
            target_message = random.choice(messages)

        # Analyze any media in the message
        logger.debug(f"[DM: {username}] Analyzing message for reaction")
        media_description = await _analyze_message_media(target_message)

        # Build message content with media description if present
        message_content = target_message.content[:200]  # Limit text context length
        if media_description:
            # If there's media, prepend the description
            message_content = f"{media_description} {message_content}".strip()
            # Limit total length
            message_content = message_content[:400]

        # Ask LLM to select an appropriate emoji
        prompt = (
            f"You are Miku, a playful virtual idol. Someone just sent you this DM: \"{message_content}\"\n\n"
            f"React with ONE emoji that captures your response! Be creative and expressive - don't just use 😊 or 👍. "
            f"Think about:\n"
            f"- What emotion does this make you feel? (use expressive emojis like 🤨, 😭, 🤯, 💀, etc.)\n"
            f"- Is it funny? (try 💀, 😂, 🤡, 🪿, etc.)\n"
            f"- Is it interesting? (try 👀, 🤔, 🧐, 😳, etc.)\n"
            f"- Is it relatable? (try 😔, 🥺, 😩, 🙃, etc.)\n"
            f"- Does it mention something specific? (match it with a relevant emoji like 🎮, 🍕, 🎸, etc.)\n\n"
            f"Be bold! Use uncommon emojis! Respond with ONLY the emoji character itself, no text."
        )
        original_response = await query_llama(
            prompt,
            user_id=f"miku-dm-reaction-{user_id}",  # Use consistent user_id per DM user
            guild_id=None,  # DM doesn't have guild
            response_type="emoji_selection"
        )

        # Extract emoji characters straight from the raw reply. Quotes, colons
        # and surrounding text never match the character class, so no separate
        # string cleanup is needed beforehand.
        emoji_pattern = re.compile("["
            u"\U0001F300-\U0001F9FF"  # Most emojis
            u"\U0001F600-\U0001F64F"  # emoticons
            u"\U0001F680-\U0001F6FF"  # transport & map symbols
            u"\U0001F1E0-\U0001F1FF"  # flags
            u"\U00002600-\U000027BF"  # misc symbols
            u"\U0001F900-\U0001F9FF"  # supplemental symbols
            u"\U00002700-\U000027BF"  # dingbats
            u"\U0001FA70-\U0001FAFF"  # extended pictographs
            u"\U00002300-\U000023FF"  # misc technical
            "]", flags=re.UNICODE)

        # Find all individual emojis
        emojis = emoji_pattern.findall(original_response)
        if emojis:
            # Take only the FIRST emoji
            emoji = emojis[0]
        else:
            # No emoji found in response, use fallback
            logger.warning(f"[DM: {username}] LLM response contained no emoji: '{original_response[:50]}' - using fallback")
            emoji = "💙"

        # Final validation: try adding the reaction
        try:
            await target_message.add_reaction(emoji)
        except discord.HTTPException as e:
            if "Unknown Emoji" in str(e):
                logger.warning(f"[DM: {username}] Invalid emoji from LLM: '{original_response[:50]}' - using fallback")
                emoji = "💙"
                await target_message.add_reaction(emoji)
            else:
                raise

        # Track this message ID to prevent duplicate reactions
        _reacted_message_ids.add(target_message.id)

        # Cleanup old message IDs (bounded size to prevent memory growth)
        if len(_reacted_message_ids) > 100:
            # Discord IDs are snowflakes whose numeric value grows with
            # creation time, so the smallest IDs belong to the oldest
            # messages. Slicing the unordered collection directly would evict
            # arbitrary entries. The ID recorded just above is always kept.
            stale_ids = sorted(
                msg_id for msg_id in _reacted_message_ids
                if msg_id != target_message.id
            )[:50]
            for msg_id in stale_ids:
                _reacted_message_ids.discard(msg_id)

        logger.info(f"[DM: {username}] Autonomous reaction: Added {emoji} to message")
    except discord.Forbidden:
        logger.error(f"[DM: {username}] Missing permissions to add reactions")
    except discord.HTTPException as e:
        logger.error(f"[DM: {username}] Failed to add reaction: {e}")
    except Exception as e:
        logger.error(f"[DM: {username}] Error in autonomous reaction: {e}")
async def miku_update_profile_picture_for_server(guild_id: int):
    """
    Try to refresh Miku's profile picture on behalf of one server.

    The update itself is delegated to ``update_profile_picture`` and uses the
    current mood name from the given server's config. Per the original note
    this is a global action (affects all servers) that is merely triggered by
    server context. When the update succeeds and the server's autonomous
    channel can be resolved, a randomly chosen announcement is posted there.

    Nothing is returned; errors raised during the update or the announcement
    are logged and swallowed.
    """
    from utils.profile_picture_manager import update_profile_picture, should_update_profile_picture

    # Bail out early while the update is not due yet
    if not should_update_profile_picture():
        logger.debug(f"[Server: {guild_id}] Profile picture not ready for update yet")
        return

    # The server config supplies the mood and the announcement channel
    config = server_manager.get_server_config(guild_id)
    if not config:
        logger.warning(f"No config found for server {guild_id}")
        return

    current_mood = config.current_mood_name
    logger.info(f"[Server: {guild_id}] Attempting profile picture update (mood: {current_mood})")

    try:
        updated = await update_profile_picture(globals.client, mood=current_mood)
        if not updated:
            logger.warning(f"[Server: {guild_id}] Profile picture update failed")
            return

        # Announce the change in the autonomous channel, if it resolves
        announce_channel = globals.client.get_channel(config.autonomous_channel_id)
        if not announce_channel:
            return

        announcement = random.choice([
            "*updates profile picture* ✨ What do you think? Does it suit me?",
            "I found a new look! *twirls* Do you like it? 💚",
            "*changes profile picture* Felt like switching things up today~ ✨",
            "New profile pic! I thought this one was really cute 💚",
            "*updates avatar* Time for a fresh look! ✨",
        ])
        await announce_channel.send(announcement)
        logger.info(f"[Server: {guild_id}] Profile picture updated and announced!")
    except Exception as err:
        logger.error(f"[Server: {guild_id}] Error updating profile picture: {err}")