Files
miku-discord/bot/utils/voice_manager.py

383 lines
15 KiB
Python

# voice_manager.py
"""
Voice session manager for Miku Discord bot.
Handles Discord voice channel connections, resource locking, and feature blocking during voice sessions.
During a voice session:
- GPU switches to AMD for text inference only
- Vision model is blocked (keeps GTX 1660 for TTS)
- Image generation is blocked
- Bipolar mode interactions are disabled
- Profile picture switching is locked
- Autonomous engine is paused
- Scheduled events are paused
- Text channels are paused (messages queued)
"""
import asyncio
import json
import os
from typing import Optional
import discord
import globals
from utils.logger import get_logger
logger = get_logger('voice_manager')
class VoiceSessionManager:
"""
Singleton manager for voice chat sessions.
Ensures only one voice session active at a time and manages all resource locks.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.active_session: Optional['VoiceSession'] = None
self.session_lock = asyncio.Lock()
self._initialized = True
logger.info("VoiceSessionManager initialized")
async def start_session(self, guild_id: int, voice_channel: discord.VoiceChannel, text_channel: discord.TextChannel):
"""
Start a voice session with full resource locking.
Args:
guild_id: Discord guild ID
voice_channel: Voice channel to join
text_channel: Text channel for voice prompts
Raises:
Exception: If session already active or resources can't be locked
"""
async with self.session_lock:
if self.active_session:
raise Exception("Voice session already active")
logger.info(f"Starting voice session in {voice_channel.name} (guild {guild_id})")
try:
# 1. Switch to AMD GPU for text inference
await self._switch_to_amd_gpu()
# 2. Block vision model loading
await self._block_vision_model()
# 3. Disable image generation (ComfyUI)
await self._disable_image_generation()
# 4. Pause text channel inference (queue messages)
await self._pause_text_channels()
# 5. Disable bipolar mode interactions (Miku/Evil Miku arguments)
await self._disable_bipolar_mode()
# 6. Disable profile picture switching
await self._disable_profile_picture_switching()
# 7. Pause autonomous engine
await self._pause_autonomous_engine()
# 8. Pause scheduled events
await self._pause_scheduled_events()
# 9. Pause figurine notifier
await self._pause_figurine_notifier()
# 10. Create voice session
self.active_session = VoiceSession(guild_id, voice_channel, text_channel)
# 11. Connect to Discord voice channel
try:
voice_client = await voice_channel.connect()
self.active_session.voice_client = voice_client
self.active_session.active = True
logger.info(f"✓ Connected to voice channel: {voice_channel.name}")
except Exception as e:
logger.error(f"Failed to connect to voice channel: {e}", exc_info=True)
raise
logger.info(f"✓ Voice session started successfully")
except Exception as e:
logger.error(f"Failed to start voice session: {e}", exc_info=True)
# Cleanup on failure
await self._cleanup_failed_start()
raise
async def end_session(self):
"""
End voice session and release all resources.
"""
async with self.session_lock:
if not self.active_session:
logger.warning("No active voice session to end")
return
logger.info("Ending voice session...")
try:
# 1. Disconnect from voice channel
if self.active_session.voice_client:
try:
await self.active_session.voice_client.disconnect()
logger.info("✓ Disconnected from voice channel")
except Exception as e:
logger.error(f"Error disconnecting from voice: {e}")
# 2. Resume text channel inference
await self._resume_text_channels()
# 3. Unblock vision model
await self._unblock_vision_model()
# 4. Re-enable image generation
await self._enable_image_generation()
# 5. Re-enable bipolar mode interactions
await self._enable_bipolar_mode()
# 6. Re-enable profile picture switching
await self._enable_profile_picture_switching()
# 7. Resume autonomous engine
await self._resume_autonomous_engine()
# 8. Resume scheduled events
await self._resume_scheduled_events()
# 9. Resume figurine notifier
await self._resume_figurine_notifier()
# 10. Clear active session
self.active_session = None
logger.info("✓ Voice session ended successfully, all resources released")
except Exception as e:
logger.error(f"Error during session cleanup: {e}", exc_info=True)
# Force clear session even on error
self.active_session = None
raise
# ==================== Resource Locking Methods ====================
async def _switch_to_amd_gpu(self):
"""Switch text inference to AMD GPU (RX 6800)"""
try:
gpu_state_file = os.path.join("memory", "gpu_state.json")
os.makedirs("memory", exist_ok=True)
with open(gpu_state_file, "w") as f:
json.dump({"current_gpu": "amd", "reason": "voice_session"}, f)
logger.info("✓ Switched to AMD GPU for text inference")
except Exception as e:
logger.error(f"Failed to switch GPU: {e}")
raise
async def _block_vision_model(self):
"""Prevent vision model from loading during voice session"""
globals.VISION_MODEL_BLOCKED = True
logger.info("✓ Vision model blocked")
async def _unblock_vision_model(self):
"""Allow vision model to load after voice session"""
globals.VISION_MODEL_BLOCKED = False
logger.info("✓ Vision model unblocked")
async def _disable_image_generation(self):
"""Block ComfyUI image generation during voice session"""
globals.IMAGE_GENERATION_BLOCKED = True
globals.IMAGE_GENERATION_BLOCK_MESSAGE = (
"🎤 I can't draw right now, I'm talking in voice chat! "
"Ask me again after I leave the voice channel."
)
logger.info("✓ Image generation disabled")
async def _enable_image_generation(self):
"""Re-enable image generation after voice session"""
globals.IMAGE_GENERATION_BLOCKED = False
globals.IMAGE_GENERATION_BLOCK_MESSAGE = None
logger.info("✓ Image generation re-enabled")
async def _pause_text_channels(self):
"""Queue text messages instead of processing during voice session"""
globals.VOICE_SESSION_ACTIVE = True
globals.TEXT_MESSAGE_QUEUE = []
logger.info("✓ Text channels paused (messages will be queued)")
async def _resume_text_channels(self):
"""Process queued messages after voice session"""
globals.VOICE_SESSION_ACTIVE = False
queued_count = len(globals.TEXT_MESSAGE_QUEUE)
if queued_count > 0:
logger.info(f"Resuming text channels, {queued_count} messages queued")
# TODO: Process queue in Phase 2 (need message handler integration)
# For now, just clear the queue
globals.TEXT_MESSAGE_QUEUE = []
logger.warning(f"Discarded {queued_count} queued messages (queue processing not yet implemented)")
else:
logger.info("✓ Text channels resumed (no queued messages)")
async def _disable_bipolar_mode(self):
"""Prevent Miku/Evil Miku arguments during voice session"""
try:
from utils.bipolar_mode import pause_bipolar_interactions
pause_bipolar_interactions()
logger.info("✓ Bipolar mode interactions disabled")
except ImportError:
logger.warning("bipolar_mode module not found, skipping")
except AttributeError:
logger.warning("pause_bipolar_interactions not implemented yet, skipping")
async def _enable_bipolar_mode(self):
"""Re-enable Miku/Evil Miku arguments after voice session"""
try:
from utils.bipolar_mode import resume_bipolar_interactions
resume_bipolar_interactions()
logger.info("✓ Bipolar mode interactions re-enabled")
except ImportError:
logger.warning("bipolar_mode module not found, skipping")
except AttributeError:
logger.warning("resume_bipolar_interactions not implemented yet, skipping")
async def _disable_profile_picture_switching(self):
"""Lock profile picture during voice session"""
try:
from utils.profile_picture_manager import profile_picture_manager
if hasattr(profile_picture_manager, 'lock_switching'):
profile_picture_manager.lock_switching()
logger.info("✓ Profile picture switching disabled")
else:
logger.warning("profile_picture_manager.lock_switching not implemented yet, skipping")
except ImportError:
logger.warning("profile_picture_manager module not found, skipping")
async def _enable_profile_picture_switching(self):
"""Unlock profile picture after voice session"""
try:
from utils.profile_picture_manager import profile_picture_manager
if hasattr(profile_picture_manager, 'unlock_switching'):
profile_picture_manager.unlock_switching()
logger.info("✓ Profile picture switching re-enabled")
else:
logger.warning("profile_picture_manager.unlock_switching not implemented yet, skipping")
except ImportError:
logger.warning("profile_picture_manager module not found, skipping")
async def _pause_autonomous_engine(self):
"""Pause autonomous message generation during voice session"""
try:
from utils.autonomous import pause_autonomous_system
pause_autonomous_system()
logger.info("✓ Autonomous engine paused")
except ImportError:
logger.warning("autonomous module not found, skipping")
except AttributeError:
logger.warning("pause_autonomous_system not implemented yet, skipping")
async def _resume_autonomous_engine(self):
"""Resume autonomous message generation after voice session"""
try:
from utils.autonomous import resume_autonomous_system
resume_autonomous_system()
logger.info("✓ Autonomous engine resumed")
except ImportError:
logger.warning("autonomous module not found, skipping")
except AttributeError:
logger.warning("resume_autonomous_system not implemented yet, skipping")
async def _pause_scheduled_events(self):
"""Pause all scheduled jobs during voice session"""
try:
globals.scheduler.pause()
logger.info("✓ Scheduled events paused")
except Exception as e:
logger.error(f"Failed to pause scheduler: {e}")
async def _resume_scheduled_events(self):
"""Resume scheduled jobs after voice session"""
try:
globals.scheduler.resume()
logger.info("✓ Scheduled events resumed")
except Exception as e:
logger.error(f"Failed to resume scheduler: {e}")
async def _pause_figurine_notifier(self):
"""Pause figurine notifications during voice session"""
try:
# Assuming figurine notifier is a scheduled job
globals.scheduler.pause_job('figurine_notifier')
logger.info("✓ Figurine notifier paused")
except Exception as e:
# Job might not exist, that's okay
logger.debug(f"Could not pause figurine notifier (may not exist): {e}")
async def _resume_figurine_notifier(self):
"""Resume figurine notifications after voice session"""
try:
globals.scheduler.resume_job('figurine_notifier')
logger.info("✓ Figurine notifier resumed")
except Exception as e:
# Job might not exist, that's okay
logger.debug(f"Could not resume figurine notifier (may not exist): {e}")
async def _cleanup_failed_start(self):
"""Cleanup resources if session start fails"""
logger.warning("Cleaning up after failed session start...")
try:
# Disconnect from voice if connected
if self.active_session and self.active_session.voice_client:
try:
await self.active_session.voice_client.disconnect()
except:
pass
await self._unblock_vision_model()
await self._enable_image_generation()
await self._resume_text_channels()
await self._enable_bipolar_mode()
await self._enable_profile_picture_switching()
await self._resume_autonomous_engine()
await self._resume_scheduled_events()
await self._resume_figurine_notifier()
# Clear the session
self.active_session = None
except Exception as e:
logger.error(f"Error during cleanup: {e}")
class VoiceSession:
"""
Represents an active voice chat session.
Phase 1: Basic structure only, voice connection in Phase 2.
"""
def __init__(self, guild_id: int, voice_channel: discord.VoiceChannel, text_channel: discord.TextChannel):
self.guild_id = guild_id
self.voice_channel = voice_channel
self.text_channel = text_channel
self.voice_client: Optional[discord.VoiceClient] = None
self.active = False
logger.info(f"VoiceSession created for {voice_channel.name} in guild {guild_id}")
# Phase 2: Implement voice connection, audio streaming, TTS integration
# Global singleton instance
voice_manager = VoiceSessionManager()