Files
miku-discord/bot/utils/image_handling.py
koko210Serve d5b9964ce7 Fix vision pipeline: route images through Cat, pass user question to vision model
- Fix silent None return in analyze_image_with_vision exception handler
- Add None/empty guards after vision analysis in bot.py (image, video, GIF, Tenor)
- Route all image/video/GIF responses through Cheshire Cat pipeline (was
  calling query_llama directly), enabling episodic memory storage for media
  interactions and correct Last Prompt display in Web UI
- Add media_type parameter to cat_adapter.query() and forward as
  discord_media_type in WebSocket payload
- Update discord_bridge plugin to read media_type from payload and inject
  MEDIA NOTE into system prefix in before_agent_starts hook
- Add _extract_vision_question() helper to strip Discord mentions and bot-name
  triggers from user message; pass cleaned question to vision model so specific
  questions (e.g. 'what is the person wearing?') go directly to the vision model
  instead of the generic 'Describe this image in detail.' fallback
- Pass user_prompt to all analyze_image_with_qwen / analyze_video_with_vision
  call sites in bot.py (image, video, GIF, Tenor, embed paths)
- Fix autonomous reaction loops skipping messages that @mention the bot or have
  media attachments in DMs, preventing duplicate vision model calls for images
  already being processed by the main message handler
- Increase vision max_tokens: images 300->800, video/GIF 400->1000 (no VRAM
  impact; KV cache is pre-allocated at model load time)
2026-03-05 21:59:27 +02:00

559 lines
21 KiB
Python

# utils/image_handling.py
import aiohttp
import base64
import io
import tempfile
import os
import subprocess
from PIL import Image
import re
import globals
from utils.logger import get_logger
logger = get_logger('vision')
# No need for switch_model anymore - llama-swap handles this automatically
def _extract_vision_question(prompt: str):
"""
Strip Discord mentions and bot-name triggers from the user's message to
produce a clean question suitable for passing directly to the vision model.
Returns the cleaned question string, or None if nothing meaningful remains
(e.g. the message was just "@Miku" or "miku," with no actual question).
"""
if not prompt:
return None
# Remove Discord user/role mentions: <@123456789>, <@!123456789>
text = re.sub(r'<@[!&]?\d+>', '', prompt).strip()
# Strip common bot-name invocation prefixes at the very start (case-insensitive)
# e.g. "miku,", "hey miku,", "miku!", "Miku: "
text = re.sub(r'^(?:hey\s+)?miku[,!:\s]+', '', text, flags=re.IGNORECASE).strip()
# Drop any residual leading punctuation/whitespace
text = text.lstrip(',.!? ')
return text if text else None
async def download_and_encode_image(url):
    """Download and encode an image to base64.

    Returns the base64 string, or None if the HTTP request does not
    succeed with status 200. Images need no special handling, so this
    simply delegates to the generic media downloader instead of
    duplicating its body.
    """
    return await download_and_encode_media(url)
async def download_and_encode_media(url):
    """Download any media file (image, video, GIF) and return it base64-encoded.

    Returns None when the server responds with a non-200 status.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status == 200:
                raw = await resp.read()
                return base64.b64encode(raw).decode('utf-8')
            return None
async def extract_tenor_gif_url(tenor_url):
    """
    Extract the actual GIF URL from a Tenor link.

    Tenor page URLs look like https://tenor.com/view/name-name-12345678 or
    https://tenor.com/12345678.gif; the numeric ID is pulled out and Tenor's
    media CDN is probed for a direct file URL (works without an API key).

    Returns the first candidate URL that answers HTTP 200 to a HEAD request,
    or None if no ID can be extracted / no candidate works.
    """
    try:
        # Try both known URL shapes for the numeric GIF ID.
        match = (re.search(r'tenor\.com/view/[^/]+-(\d+)', tenor_url)
                 or re.search(r'tenor\.com/(\d+)\.gif', tenor_url))
        if not match:
            logger.warning(f"Could not extract Tenor GIF ID from: {tenor_url}")
            return None
        gif_id = match.group(1)

        # Candidate direct-media URLs, most common CDN format first.
        candidates = [f"https://media.tenor.com/images/{gif_id}/tenor.gif"]
        candidates += [f"https://media.tenor.com/{gif_id}/{fmt}" for fmt in ['tenor.gif', 'raw']]

        # Reuse a single HTTP session for all probes instead of opening a
        # fresh connection pool per candidate URL.
        async with aiohttp.ClientSession() as session:
            for candidate in candidates:
                async with session.head(candidate) as resp:
                    if resp.status == 200:
                        logger.debug(f"Found Tenor GIF: {candidate}")
                        return candidate

        logger.warning(f"Could not find working Tenor media URL for ID: {gif_id}")
        return None
    except Exception as e:
        logger.error(f"Error extracting Tenor GIF URL: {e}")
        return None
async def convert_gif_to_mp4(gif_bytes):
    """
    Convert a GIF to MP4 using ffmpeg for better compatibility with video processing.

    Returns the MP4 bytes, or None if the conversion fails.
    """
    try:
        # Write GIF to a temp file so ffmpeg can read it from disk.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.gif') as temp_gif:
            temp_gif.write(gif_bytes)
            temp_gif_path = temp_gif.name
        # Swap only the trailing extension; str.replace('.gif', '.mp4')
        # could also match a '.gif' occurring earlier in the path.
        temp_mp4_path = os.path.splitext(temp_gif_path)[0] + '.mp4'
        try:
            # Convert GIF to MP4 with ffmpeg:
            # -movflags faststart makes it streamable
            # -pix_fmt yuv420p ensures compatibility
            # -vf scale makes sure dimensions are even (required for yuv420p)
            ffmpeg_cmd = [
                'ffmpeg', '-i', temp_gif_path,
                '-movflags', 'faststart',
                '-pix_fmt', 'yuv420p',
                '-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
                '-y',
                temp_mp4_path
            ]
            # check=True raises CalledProcessError on a non-zero exit,
            # which the handler below turns into a logged None return.
            subprocess.run(ffmpeg_cmd, capture_output=True, check=True)
            # Read the converted MP4 back into memory.
            with open(temp_mp4_path, 'rb') as f:
                mp4_bytes = f.read()
            logger.info(f"Converted GIF to MP4 ({len(gif_bytes)} bytes → {len(mp4_bytes)} bytes)")
            return mp4_bytes
        finally:
            # Clean up temp files regardless of success or failure.
            if os.path.exists(temp_gif_path):
                os.remove(temp_gif_path)
            if os.path.exists(temp_mp4_path):
                os.remove(temp_mp4_path)
    except subprocess.CalledProcessError as e:
        # stderr was captured above, so it is available for the log message.
        logger.error(f"ffmpeg error converting GIF to MP4: {e.stderr.decode()}")
        return None
    except Exception as e:
        logger.error(f"Error converting GIF to MP4: {e}")
        import traceback
        traceback.print_exc()
        return None
async def extract_video_frames(video_bytes, num_frames=4):
    """
    Extract frames from a video or GIF for analysis.

    Tries PIL first (animated GIFs), then falls back to ffmpeg for real
    video containers (MP4, WebM, ...). Frames are sampled evenly across
    the clip.

    Returns a list of base64-encoded JPEG frames, or None on failure.
    """
    try:
        # Try GIF first with PIL.
        try:
            gif = Image.open(io.BytesIO(video_bytes))
            if hasattr(gif, 'n_frames'):
                frames = []
                # Step through the animation to get evenly distributed frames.
                total_frames = gif.n_frames
                step = max(1, total_frames // num_frames)
                for i in range(0, total_frames, step):
                    if len(frames) >= num_frames:
                        break
                    gif.seek(i)
                    frame = gif.convert('RGB')
                    # Re-encode the frame as JPEG, then base64.
                    buffer = io.BytesIO()
                    frame.save(buffer, format='JPEG')
                    frames.append(base64.b64encode(buffer.getvalue()).decode('utf-8'))
                if frames:
                    return frames
        except Exception as e:
            logger.debug(f"Not a GIF, trying video extraction: {e}")

        # For video files (MP4, WebM, etc.), use ffmpeg.
        # Write video bytes to a temp file so ffprobe/ffmpeg can read them.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_video:
            temp_video.write(video_bytes)
            temp_video_path = temp_video.name
        try:
            # Get video duration first.
            probe_cmd = [
                'ffprobe', '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'default=noprint_wrappers=1:nokey=1',
                temp_video_path
            ]
            result = subprocess.run(probe_cmd, capture_output=True, text=True)
            # float() raises if ffprobe produced no duration; the outer
            # except turns that into a logged None return.
            duration = float(result.stdout.strip())
            # Timestamps for evenly distributed frames.
            timestamps = [duration * i / num_frames for i in range(num_frames)]
            frames = []
            # Use a private temp directory for frame files so concurrent
            # extractions don't clobber each other (the previous fixed
            # /tmp/frame_N.jpg paths raced between simultaneous calls).
            with tempfile.TemporaryDirectory() as frame_dir:
                for i, timestamp in enumerate(timestamps):
                    output_path = os.path.join(frame_dir, f"frame_{i}.jpg")
                    ffmpeg_cmd = [
                        'ffmpeg', '-ss', str(timestamp),
                        '-i', temp_video_path,
                        '-vframes', '1',
                        '-q:v', '2',
                        '-y',
                        output_path
                    ]
                    subprocess.run(ffmpeg_cmd, capture_output=True, check=True)
                    # Read and encode the extracted frame.
                    with open(output_path, 'rb') as f:
                        frames.append(base64.b64encode(f.read()).decode('utf-8'))
            return frames
        finally:
            # Clean up the temp video file.
            os.remove(temp_video_path)
    except Exception as e:
        logger.error(f"Error extracting frames: {e}")
        import traceback
        traceback.print_exc()
        return None
async def analyze_image_with_vision(base64_img, user_prompt=None, max_tokens=800):
    """
    Analyze an image using llama.cpp multimodal capabilities.

    Uses the OpenAI-compatible chat completions API with an image_url
    content part. Always uses the NVIDIA GPU vision endpoint.

    Args:
        base64_img: base64-encoded image data (sent as a JPEG data URL).
        user_prompt: optional raw user message; if it contains a meaningful
            question after stripping mentions/bot-name triggers, that
            question is sent to the vision model instead of the generic
            "Describe this image in detail." prompt.
        max_tokens: generation cap for the vision model (default 800).

    Returns the model's description, or a human-readable error string when
    the endpoint is unhealthy or the request fails (never None, so callers
    can always embed the result in a reply).
    """
    from utils.llm import get_vision_gpu_url, check_vision_endpoint_health

    # Check if vision endpoint is healthy before attempting request.
    is_healthy, error = await check_vision_endpoint_health()
    if not is_healthy:
        logger.warning(f"Vision endpoint unhealthy: {error}")
        return f"Vision service currently unavailable: {error}"

    question = _extract_vision_question(user_prompt)
    vision_prompt_text = question if question else "Describe this image in detail."
    logger.info(f"Vision prompt for image: {vision_prompt_text!r}")

    payload = {
        "model": globals.VISION_MODEL,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": vision_prompt_text
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_img}"
                        }
                    }
                ]
            }
        ],
        "stream": False,
        "max_tokens": max_tokens
    }
    headers = {"Content-Type": "application/json"}

    async with aiohttp.ClientSession() as session:
        try:
            vision_url = get_vision_gpu_url()
            logger.info(f"Sending vision request to {vision_url} using model: {globals.VISION_MODEL}")
            async with session.post(f"{vision_url}/v1/chat/completions", json=payload, headers=headers,
                                    timeout=aiohttp.ClientTimeout(total=60)) as response:
                if response.status == 200:
                    data = await response.json()
                    # Guard against an empty choices list instead of letting
                    # the IndexError surface as a generic exception message.
                    choices = data.get("choices") or [{}]
                    result = choices[0].get("message", {}).get("content", "No description.")
                    logger.info(f"Vision analysis completed successfully")
                    return result
                else:
                    error_text = await response.text()
                    logger.error(f"Vision API error: {response.status} - {error_text} (endpoint: {vision_url})")
                    return f"Error analyzing image: {response.status}"
        except Exception as e:
            logger.error(f"Error in analyze_image_with_vision: {e}", exc_info=True)
            return f"Error analyzing image: {str(e)}"
async def analyze_video_with_vision(video_frames, media_type="video", user_prompt=None, max_tokens=1000):
    """
    Analyze a video or GIF by sending multiple frames to the vision model.

    Args:
        video_frames: list of base64-encoded frames.
        media_type: "video", "gif", or "tenor_gif" to customize the analysis prompt.
        user_prompt: optional raw user message; if it contains a meaningful
            question after stripping mentions/triggers, the model is asked
            that specific question instead of giving a generic description.
        max_tokens: generation cap for the vision model (default 1000).

    Returns the model's description, or a human-readable error string when
    the endpoint is unhealthy or the request fails.
    """
    from utils.llm import get_vision_gpu_url, check_vision_endpoint_health

    # Check if vision endpoint is healthy before attempting request.
    is_healthy, error = await check_vision_endpoint_health()
    if not is_healthy:
        logger.warning(f"Vision endpoint unhealthy: {error}")
        return f"Vision service currently unavailable: {error}"

    # Customize prompt based on media type, overridden by user question if present.
    question = _extract_vision_question(user_prompt)
    if question:
        prompt_text = question
        logger.info(f"Vision prompt for {media_type}: {prompt_text!r}")
    elif media_type == "gif":
        prompt_text = "Describe what's happening in this GIF animation. Analyze the sequence of frames and describe the action, motion, and any repeating patterns."
    elif media_type == "tenor_gif":
        prompt_text = "Describe what's happening in this animated GIF. Analyze the sequence of frames and describe the action, emotion, or reaction being shown."
    else:  # video
        prompt_text = "Describe what's happening in this video. Analyze the sequence of frames and describe the action or motion."

    # Build a single user message containing the prompt plus every frame.
    content = [
        {
            "type": "text",
            "text": prompt_text
        }
    ]
    for frame in video_frames:
        content.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{frame}"
            }
        })

    payload = {
        "model": globals.VISION_MODEL,
        "messages": [
            {
                "role": "user",
                "content": content
            }
        ],
        "stream": False,
        "max_tokens": max_tokens
    }
    headers = {"Content-Type": "application/json"}

    async with aiohttp.ClientSession() as session:
        try:
            vision_url = get_vision_gpu_url()
            logger.info(f"Sending video analysis request to {vision_url} using model: {globals.VISION_MODEL} (media_type: {media_type}, frames: {len(video_frames)})")
            async with session.post(f"{vision_url}/v1/chat/completions", json=payload, headers=headers,
                                    timeout=aiohttp.ClientTimeout(total=120)) as response:
                if response.status == 200:
                    data = await response.json()
                    # Guard against an empty choices list instead of letting
                    # the IndexError surface as a generic exception message.
                    choices = data.get("choices") or [{}]
                    result = choices[0].get("message", {}).get("content", "No description.")
                    logger.info(f"Video analysis completed successfully")
                    return result
                else:
                    error_text = await response.text()
                    logger.error(f"Vision API error: {response.status} - {error_text} (endpoint: {vision_url})")
                    return f"Error analyzing video: {response.status}"
        except Exception as e:
            logger.error(f"Error in analyze_video_with_vision: {e}", exc_info=True)
            return f"Error analyzing video: {str(e)}"
async def rephrase_as_miku(vision_output, user_prompt, guild_id=None, user_id=None, author_name=None, media_type="image"):
    """
    Rephrase vision model's image analysis as Miku would respond to it.

    Routes through Cheshire Cat pipeline for memory-augmented responses,
    falling back to direct query_llama() if Cat is unavailable or errors.

    Args:
        vision_output: Description from vision model
        user_prompt: User's original message
        guild_id: Guild ID for server context (None for DMs)
        user_id: User ID for conversation history
        author_name: Display name of the user
        media_type: Type of media ("image", "video", "gif", or "tenor_gif")

    Returns:
        The generated response text (from Cat or from query_llama).
    """
    from utils.llm import query_llama
    # Format the user's message to include vision context with media type.
    # This will be saved to history automatically by query_llama.
    if media_type == "gif":
        media_prefix = "Looking at a GIF"
    elif media_type == "tenor_gif":
        media_prefix = "Looking at a Tenor GIF"
    elif media_type == "video":
        media_prefix = "Looking at a video"
    else:  # image
        media_prefix = "Looking at an image"
    if user_prompt:
        # Include media type, vision description, and user's text
        formatted_prompt = f"[{media_prefix}: {vision_output}] {user_prompt}"
    else:
        # If no text, just the vision description with media type
        formatted_prompt = f"[{media_prefix}: {vision_output}]"
    # Use the standard LLM query with appropriate response type.
    # guild_id is None for DMs, so that selects the DM response style.
    response_type = "dm_response" if guild_id is None else "server_response"
    # Use the actual user_id for history tracking, fall back to "image_analysis"
    # for backward compatibility with older call sites that pass no user_id.
    history_user_id = user_id if user_id else "image_analysis"
    # Determine current mood for the Cat pipeline: per-server mood when a
    # guild config exists, otherwise the global DM mood. Failures here are
    # deliberately best-effort (mood just falls back to the DM default).
    current_mood = globals.DM_MOOD
    if guild_id:
        try:
            from server_manager import server_manager
            sc = server_manager.get_server_config(guild_id)
            if sc:
                current_mood = sc.current_mood_name
        except Exception:
            pass
    # Phase 3: Try Cheshire Cat pipeline first (memory-augmented response).
    # This allows image interactions to be stored in episodic memory and
    # benefit from declarative memory recall, just like text messages.
    response = None
    if globals.USE_CHESHIRE_CAT:
        try:
            from utils.cat_client import cat_adapter
            cat_result = await cat_adapter.query(
                text=formatted_prompt,
                user_id=history_user_id,
                guild_id=str(guild_id) if guild_id else None,
                author_name=author_name,
                mood=current_mood,
                response_type=response_type,
                media_type=media_type,
            )
            # cat_adapter.query returns a (response, full_prompt) pair,
            # or a falsy value when the Cat pipeline produced nothing.
            if cat_result:
                response, cat_full_prompt = cat_result
                effective_mood = current_mood
                if globals.EVIL_MODE:
                    effective_mood = f"EVIL:{getattr(globals, 'EVIL_DM_MOOD', 'evil_neutral')}"
                logger.info(f"🐱 Cat {media_type} response for {author_name} (mood: {effective_mood})")
                # Track Cat interaction for Web UI Last Prompt view
                import datetime
                globals.LAST_CAT_INTERACTION = {
                    "full_prompt": cat_full_prompt,
                    "response": response[:500] if response else "",
                    "user": author_name or history_user_id,
                    "mood": effective_mood,
                    "timestamp": datetime.datetime.now().isoformat(),
                }
        except Exception as e:
            # Any Cat failure falls through to the direct LLM path below.
            logger.warning(f"🐱 Cat {media_type} pipeline error, falling back to query_llama: {e}")
            response = None
    # Fallback to direct LLM query if Cat didn't respond (disabled, errored,
    # or returned an empty result).
    if not response:
        response = await query_llama(
            formatted_prompt,
            user_id=history_user_id,
            guild_id=guild_id,
            response_type=response_type,
            author_name=author_name,
            media_type=media_type  # Pass media type to Miku's LLM
        )
    return response
# Backward compatibility aliases: older call sites still import
# analyze_image_with_qwen; keep it pointing at the renamed vision entry
# point so they continue to work unchanged.
analyze_image_with_qwen = analyze_image_with_vision
async def extract_embed_content(embed):
    """
    Pull the text and media content out of a Discord embed.

    Returns a dictionary with:
    - 'text': combined text from title, description, author, fields, footer
    - 'images': list of image URLs (main image, then thumbnail)
    - 'videos': list of video URLs
    - 'has_content': boolean indicating if there's any content at all
    """
    # Gather every text fragment in display order.
    text_parts = []
    if embed.title:
        text_parts.append(f"**{embed.title}**")
    if embed.description:
        text_parts.append(embed.description)
    if embed.author and embed.author.name:
        text_parts.append(f"Author: {embed.author.name}")
    if embed.fields:
        for field in embed.fields:
            text_parts.append(f"**{field.name}**: {field.value}")
    if embed.footer and embed.footer.text:
        text_parts.append(f"_{embed.footer.text}_")

    # Collect media URLs.
    images = []
    if embed.image and embed.image.url:
        images.append(embed.image.url)
    if embed.thumbnail and embed.thumbnail.url:
        images.append(embed.thumbnail.url)

    videos = []
    if embed.video and embed.video.url:
        videos.append(embed.video.url)

    combined_text = '\n\n'.join(text_parts)
    return {
        'text': combined_text,
        'images': images,
        'videos': videos,
        'has_content': bool(combined_text or images or videos),
    }