# utils/image_handling.py
import base64
import io
import os
import re
import subprocess
import tempfile

import aiohttp
from PIL import Image

import globals
from utils.logger import get_logger

logger = get_logger('vision')


# No need for switch_model anymore - llama-swap handles this automatically
def _extract_vision_question(prompt: str):
    """
    Strip Discord mentions and bot-name triggers from the user's message to
    produce a clean question suitable for passing directly to the vision model.
    Returns the cleaned question string, or None if nothing meaningful remains
    (e.g. the message was just "@Miku" or "miku," with no actual question).
    """
    if not prompt:
        return None
    # Remove Discord user/role mentions: <@123456789>, <@!123456789>, <@&123456789>
    text = re.sub(r'<@[!&]?\d+>', '', prompt).strip()
    # Strip common bot-name invocation prefixes at the very start (case-insensitive),
    # e.g. "miku,", "hey miku,", "miku!", "Miku: "
    text = re.sub(r'^(?:hey\s+)?miku[,!:\s]+', '', text, flags=re.IGNORECASE).strip()
    # Drop any residual leading punctuation/whitespace
    text = text.lstrip(',.!? ')
    return text if text else None
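
# Illustrative behaviour (hypothetical inputs):
#     _extract_vision_question("<@123456789> hey miku, what breed is this dog?")
#     # -> "what breed is this dog?"
#     _extract_vision_question("<@!123456789> Miku!")
#     # -> None (nothing meaningful remains after stripping the trigger)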


async def download_and_encode_image(url):
    """Download and encode an image to base64."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status != 200:
                return None
            img_bytes = await resp.read()
            return base64.b64encode(img_bytes).decode('utf-8')


async def download_and_encode_media(url):
    """Download and encode any media file (image, video, GIF) to base64."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status != 200:
                return None
            media_bytes = await resp.read()
            return base64.b64encode(media_bytes).decode('utf-8')
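
# Usage sketch (hypothetical URL; both helpers are functionally identical and
# kept separate only for call-site clarity):
#     b64 = await download_and_encode_image("https://example.com/photo.jpg")
#     if b64 is None:
#         ...  # non-200 response; nothing to analyze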


async def extract_tenor_gif_url(tenor_url):
    """
    Extract the actual GIF URL from a Tenor link.
    Tenor URLs look like: https://tenor.com/view/...
    We need to get the actual GIF file URL from the page or API.
    """
    try:
        # Try to extract GIF ID from URL
        # Tenor URLs: https://tenor.com/view/name-name-12345678 or https://tenor.com/12345678.gif
        match = re.search(r'tenor\.com/view/[^/]+-(\d+)', tenor_url)
        if not match:
            match = re.search(r'tenor\.com/(\d+)\.gif', tenor_url)
        if not match:
            logger.warning(f"Could not extract Tenor GIF ID from: {tenor_url}")
            return None
        gif_id = match.group(1)
        # Tenor's direct media URL format (this works without an API key).
        # Try the media CDN URL directly.
        media_url = f"https://media.tenor.com/images/{gif_id}/tenor.gif"
        # Verify the URL works
        async with aiohttp.ClientSession() as session:
            async with session.head(media_url) as resp:
                if resp.status == 200:
                    logger.debug(f"Found Tenor GIF: {media_url}")
                    return media_url
        # If that didn't work, try alternative formats
        for fmt in ['tenor.gif', 'raw']:
            alt_url = f"https://media.tenor.com/{gif_id}/{fmt}"
            async with aiohttp.ClientSession() as session:
                async with session.head(alt_url) as resp:
                    if resp.status == 200:
                        logger.debug(f"Found Tenor GIF (alternative): {alt_url}")
                        return alt_url
        logger.warning(f"Could not find working Tenor media URL for ID: {gif_id}")
        return None
    except Exception as e:
        logger.error(f"Error extracting Tenor GIF URL: {e}")
        return None
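
# The ID extraction this relies on, sketched with a hypothetical link:
#     re.search(r'tenor\.com/view/[^/]+-(\d+)',
#               "https://tenor.com/view/miku-dance-27633280").group(1)
#     # -> '27633280'
# The candidate media URLs are then probed with HEAD requests before use.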


async def convert_gif_to_mp4(gif_bytes):
    """
    Convert a GIF to MP4 using ffmpeg for better compatibility with video processing.
    Returns the MP4 bytes, or None on failure.
    """
    try:
        # Write GIF to temp file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.gif') as temp_gif:
            temp_gif.write(gif_bytes)
            temp_gif_path = temp_gif.name
        # Output MP4 path
        temp_mp4_path = temp_gif_path.replace('.gif', '.mp4')
        try:
            # Convert GIF to MP4 with ffmpeg:
            # -movflags faststart makes it streamable
            # -pix_fmt yuv420p ensures compatibility
            # -vf scale makes sure dimensions are even (required for yuv420p)
            ffmpeg_cmd = [
                'ffmpeg', '-i', temp_gif_path,
                '-movflags', 'faststart',
                '-pix_fmt', 'yuv420p',
                '-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
                '-y',
                temp_mp4_path
            ]
            subprocess.run(ffmpeg_cmd, capture_output=True, check=True)
            # Read the MP4 file
            with open(temp_mp4_path, 'rb') as f:
                mp4_bytes = f.read()
            logger.info(f"Converted GIF to MP4 ({len(gif_bytes)} bytes → {len(mp4_bytes)} bytes)")
            return mp4_bytes
        finally:
            # Clean up temp files
            if os.path.exists(temp_gif_path):
                os.remove(temp_gif_path)
            if os.path.exists(temp_mp4_path):
                os.remove(temp_mp4_path)
    except subprocess.CalledProcessError as e:
        logger.error(f"ffmpeg error converting GIF to MP4: {e.stderr.decode()}")
        return None
    except Exception as e:
        logger.error(f"Error converting GIF to MP4: {e}", exc_info=True)
        return None
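
# Usage sketch (assumes ffmpeg is on PATH, as the function itself does;
# "reaction.gif" is a hypothetical local file):
#     with open("reaction.gif", "rb") as fh:
#         mp4_bytes = await convert_gif_to_mp4(fh.read())
#     if mp4_bytes is not None:
#         frames = await extract_video_frames(mp4_bytes)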


async def extract_video_frames(video_bytes, num_frames=4):
    """
    Extract frames from a video or GIF for analysis.
    Returns a list of base64-encoded frames, or None on failure.
    """
    try:
        # Try GIF first with PIL
        try:
            gif = Image.open(io.BytesIO(video_bytes))
            if hasattr(gif, 'n_frames'):
                frames = []
                # Calculate step to get evenly distributed frames
                total_frames = gif.n_frames
                step = max(1, total_frames // num_frames)
                for i in range(0, total_frames, step):
                    if len(frames) >= num_frames:
                        break
                    gif.seek(i)
                    frame = gif.convert('RGB')
                    # Convert to base64
                    buffer = io.BytesIO()
                    frame.save(buffer, format='JPEG')
                    frame_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
                    frames.append(frame_b64)
                if frames:
                    return frames
        except Exception as e:
            logger.debug(f"Not a GIF, trying video extraction: {e}")
        # For video files (MP4, WebM, etc.), use ffmpeg.
        # Write video bytes to temp file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_video:
            temp_video.write(video_bytes)
            temp_video_path = temp_video.name
        try:
            # Get video duration first
            probe_cmd = [
                'ffprobe', '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'default=noprint_wrappers=1:nokey=1',
                temp_video_path
            ]
            result = subprocess.run(probe_cmd, capture_output=True, text=True)
            duration = float(result.stdout.strip())
            # Calculate timestamps for evenly distributed frames
            timestamps = [duration * i / num_frames for i in range(num_frames)]
            frames = []
            for i, timestamp in enumerate(timestamps):
                # Extract frame at timestamp; derive the output path from the
                # unique temp video path so concurrent calls don't collide.
                output_path = f"{temp_video_path}_frame_{i}.jpg"
                ffmpeg_cmd = [
                    'ffmpeg', '-ss', str(timestamp),
                    '-i', temp_video_path,
                    '-vframes', '1',
                    '-q:v', '2',
                    '-y',
                    output_path
                ]
                subprocess.run(ffmpeg_cmd, capture_output=True, check=True)
                # Read and encode the frame
                with open(output_path, 'rb') as f:
                    frame_bytes = f.read()
                frame_b64 = base64.b64encode(frame_bytes).decode('utf-8')
                frames.append(frame_b64)
                # Clean up frame file
                os.remove(output_path)
            return frames
        finally:
            # Clean up temp video file
            os.remove(temp_video_path)
    except Exception as e:
        logger.error(f"Error extracting frames: {e}", exc_info=True)
        return None
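
# Sampling sketch (hypothetical numbers): for a 10-second clip and num_frames=4,
# the ffmpeg path seeks to t = 0.0 s, 2.5 s, 5.0 s and 7.5 s, i.e.
#     timestamps = [10.0 * i / 4 for i in range(4)]  # -> [0.0, 2.5, 5.0, 7.5]
# so the final sample lands at (n-1)/n of the runtime, never at the very end.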


async def analyze_image_with_vision(base64_img, user_prompt=None):
    """
    Analyze an image using llama.cpp multimodal capabilities.
    Uses the OpenAI-compatible chat completions API with image_url content parts.
    Always uses the NVIDIA GPU for the vision model.
    If user_prompt is provided (and contains a meaningful question after stripping
    mentions/triggers), that question is sent to the vision model instead of the
    generic "Describe this image in detail." prompt.
    """
    from utils.llm import get_vision_gpu_url, check_vision_endpoint_health
    # Check if vision endpoint is healthy before attempting request
    is_healthy, error = await check_vision_endpoint_health()
    if not is_healthy:
        logger.warning(f"Vision endpoint unhealthy: {error}")
        return f"Vision service currently unavailable: {error}"
    question = _extract_vision_question(user_prompt)
    vision_prompt_text = question if question else "Describe this image in detail."
    logger.info(f"Vision prompt for image: {vision_prompt_text!r}")
    payload = {
        "model": globals.VISION_MODEL,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": vision_prompt_text
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_img}"
                        }
                    }
                ]
            }
        ],
        "stream": False,
        "max_tokens": 800
    }
    headers = {"Content-Type": "application/json"}
    async with aiohttp.ClientSession() as session:
        try:
            vision_url = get_vision_gpu_url()
            logger.info(f"Sending vision request to {vision_url} using model: {globals.VISION_MODEL}")
            async with session.post(f"{vision_url}/v1/chat/completions", json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as response:
                if response.status == 200:
                    data = await response.json()
                    result = data.get("choices", [{}])[0].get("message", {}).get("content", "No description.")
                    logger.info("Vision analysis completed successfully")
                    return result
                else:
                    error_text = await response.text()
                    logger.error(f"Vision API error: {response.status} - {error_text} (endpoint: {vision_url})")
                    return f"Error analyzing image: {response.status}"
        except Exception as e:
            logger.error(f"Error in analyze_image_with_vision: {e}", exc_info=True)
            return f"Error analyzing image: {str(e)}"


async def analyze_video_with_vision(video_frames, media_type="video", user_prompt=None):
    """
    Analyze a video or GIF by analyzing multiple frames.
    video_frames: list of base64-encoded frames
    media_type: "video", "gif", or "tenor_gif" to customize the analysis prompt
    user_prompt: optional raw user message; the vision model will be asked to answer
        the specific question instead of giving a generic description.
    """
    from utils.llm import get_vision_gpu_url, check_vision_endpoint_health
    # Check if vision endpoint is healthy before attempting request
    is_healthy, error = await check_vision_endpoint_health()
    if not is_healthy:
        logger.warning(f"Vision endpoint unhealthy: {error}")
        return f"Vision service currently unavailable: {error}"
    # Customize prompt based on media type, overridden by user question if present
    question = _extract_vision_question(user_prompt)
    if question:
        prompt_text = question
        logger.info(f"Vision prompt for {media_type}: {prompt_text!r}")
    elif media_type == "gif":
        prompt_text = "Describe what's happening in this GIF animation. Analyze the sequence of frames and describe the action, motion, and any repeating patterns."
    elif media_type == "tenor_gif":
        prompt_text = "Describe what's happening in this animated GIF. Analyze the sequence of frames and describe the action, emotion, or reaction being shown."
    else:  # video
        prompt_text = "Describe what's happening in this video. Analyze the sequence of frames and describe the action or motion."
    # Build content with multiple images
    content = [
        {
            "type": "text",
            "text": prompt_text
        }
    ]
    # Add each frame as an image
    for frame in video_frames:
        content.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{frame}"
            }
        })
    payload = {
        "model": globals.VISION_MODEL,
        "messages": [
            {
                "role": "user",
                "content": content
            }
        ],
        "stream": False,
        "max_tokens": 1000
    }
    headers = {"Content-Type": "application/json"}
    async with aiohttp.ClientSession() as session:
        try:
            vision_url = get_vision_gpu_url()
            logger.info(f"Sending video analysis request to {vision_url} using model: {globals.VISION_MODEL} (media_type: {media_type}, frames: {len(video_frames)})")
            async with session.post(f"{vision_url}/v1/chat/completions", json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=120)) as response:
                if response.status == 200:
                    data = await response.json()
                    result = data.get("choices", [{}])[0].get("message", {}).get("content", "No description.")
                    logger.info("Video analysis completed successfully")
                    return result
                else:
                    error_text = await response.text()
                    logger.error(f"Vision API error: {response.status} - {error_text} (endpoint: {vision_url})")
                    return f"Error analyzing video: {response.status}"
        except Exception as e:
            logger.error(f"Error in analyze_video_with_vision: {e}", exc_info=True)
            return f"Error analyzing video: {str(e)}"


async def rephrase_as_miku(vision_output, user_prompt, guild_id=None, user_id=None, author_name=None, media_type="image"):
    """
    Rephrase the vision model's image analysis as Miku would respond to it.
    Routes through the Cheshire Cat pipeline for memory-augmented responses,
    falling back to a direct query_llama() call if the Cat is unavailable.

    Args:
        vision_output: Description from vision model
        user_prompt: User's original message
        guild_id: Guild ID for server context (None for DMs)
        user_id: User ID for conversation history
        author_name: Display name of the user
        media_type: Type of media ("image", "video", "gif", or "tenor_gif")
    """
    from utils.llm import query_llama
    # Format the user's message to include vision context with media type.
    # This will be saved to history automatically by query_llama.
    if media_type == "gif":
        media_prefix = "Looking at a GIF"
    elif media_type == "tenor_gif":
        media_prefix = "Looking at a Tenor GIF"
    elif media_type == "video":
        media_prefix = "Looking at a video"
    else:  # image
        media_prefix = "Looking at an image"
    if user_prompt:
        # Include media type, vision description, and user's text
        formatted_prompt = f"[{media_prefix}: {vision_output}] {user_prompt}"
    else:
        # If no text, just the vision description with media type
        formatted_prompt = f"[{media_prefix}: {vision_output}]"
    # Use the standard LLM query with appropriate response type
    response_type = "dm_response" if guild_id is None else "server_response"
    # Use the actual user_id for history tracking; fall back to "image_analysis" for backward compatibility
    history_user_id = user_id if user_id else "image_analysis"
    # Determine current mood for Cat pipeline
    current_mood = globals.DM_MOOD
    if guild_id:
        try:
            from server_manager import server_manager
            sc = server_manager.get_server_config(guild_id)
            if sc:
                current_mood = sc.current_mood_name
        except Exception:
            pass
    # Phase 3: Try Cheshire Cat pipeline first (memory-augmented response).
    # This allows image interactions to be stored in episodic memory and
    # benefit from declarative memory recall, just like text messages.
    response = None
    if globals.USE_CHESHIRE_CAT:
        try:
            from utils.cat_client import cat_adapter
            cat_result = await cat_adapter.query(
                text=formatted_prompt,
                user_id=history_user_id,
                guild_id=str(guild_id) if guild_id else None,
                author_name=author_name,
                mood=current_mood,
                response_type=response_type,
                media_type=media_type,
            )
            if cat_result:
                response, cat_full_prompt = cat_result
                effective_mood = current_mood
                if globals.EVIL_MODE:
                    effective_mood = f"EVIL:{getattr(globals, 'EVIL_DM_MOOD', 'evil_neutral')}"
                logger.info(f"🐱 Cat {media_type} response for {author_name} (mood: {effective_mood})")
                # Track Cat interaction for Web UI Last Prompt view
                import datetime
                globals.LAST_CAT_INTERACTION = {
                    "full_prompt": cat_full_prompt,
                    "response": response[:500] if response else "",
                    "user": author_name or history_user_id,
                    "mood": effective_mood,
                    "timestamp": datetime.datetime.now().isoformat(),
                }
        except Exception as e:
            logger.warning(f"🐱 Cat {media_type} pipeline error, falling back to query_llama: {e}")
            response = None
    # Fall back to a direct LLM query if the Cat didn't respond
    if not response:
        response = await query_llama(
            formatted_prompt,
            user_id=history_user_id,
            guild_id=guild_id,
            response_type=response_type,
            author_name=author_name,
            media_type=media_type  # Pass media type to Miku's LLM
        )
    return response
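
# Sketch of the prompt handed to the downstream LLM (values are illustrative):
#     vision_output = "A teal-haired girl holding a leek."
#     user_prompt = "who is this?"
#     # formatted_prompt becomes:
#     # "[Looking at an image: A teal-haired girl holding a leek.] who is this?"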


# Backward compatibility aliases
analyze_image_with_qwen = analyze_image_with_vision


async def extract_embed_content(embed):
    """
    Extract text and media content from a Discord embed.
    Returns a dictionary with:
    - 'text': combined text from title, description, fields
    - 'images': list of image URLs
    - 'videos': list of video URLs
    - 'has_content': boolean indicating if there's any content
    """
    content = {
        'text': '',
        'images': [],
        'videos': [],
        'has_content': False
    }
    text_parts = []
    # Extract text content
    if embed.title:
        text_parts.append(f"**{embed.title}**")
    if embed.description:
        text_parts.append(embed.description)
    if embed.author and embed.author.name:
        text_parts.append(f"Author: {embed.author.name}")
    if embed.fields:
        for field in embed.fields:
            text_parts.append(f"**{field.name}**: {field.value}")
    if embed.footer and embed.footer.text:
        text_parts.append(f"_{embed.footer.text}_")
    # Combine text
    content['text'] = '\n\n'.join(text_parts)
    # Extract image URLs
    if embed.image and embed.image.url:
        content['images'].append(embed.image.url)
    if embed.thumbnail and embed.thumbnail.url:
        content['images'].append(embed.thumbnail.url)
    # Extract video URLs
    if embed.video and embed.video.url:
        content['videos'].append(embed.video.url)
    # Check if we have any content
    content['has_content'] = bool(content['text'] or content['images'] or content['videos'])
    return content
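
# Shape of the returned dict for a hypothetical embed with a title, one field,
# and a thumbnail:
#     {
#         'text': "**Song of the day**\n\n**Artist**: Hatsune Miku",
#         'images': ['https://example.com/thumb.png'],
#         'videos': [],
#         'has_content': True,
#     }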