feat: add PFP album/gallery system with batch upload, cropping, and disk management
- Backend: album storage in memory/profile_pictures/album/{uuid}/ with
original.png, cropped.png, and metadata.json per entry
- add_to_album/add_batch_to_album with efficient resource management
(vision model + face detector kept alive across batch)
- set_album_entry_as_current auto-archives current PFP before replacing
- manual/auto crop album entries without applying to Discord
- Disk usage tracking, single & bulk delete
- API: full CRUD endpoints under /profile-picture/album/*
- Frontend: collapsible album grid in tab11 with thumbnail cards,
multi-select checkboxes for bulk delete, detail panel with crop
interface (Cropper.js), description editor, set-as-current action
This commit is contained in:
@@ -13,6 +13,8 @@ Supports both static images and animated GIFs:
|
||||
|
||||
import os
|
||||
import io
|
||||
import uuid
|
||||
import shutil
|
||||
import aiohttp
|
||||
import asyncio
|
||||
from PIL import Image, ImageDraw
|
||||
@@ -39,6 +41,7 @@ class ProfilePictureManager:
|
||||
CURRENT_PATH = "memory/profile_pictures/current.png"
|
||||
ORIGINAL_PATH = "memory/profile_pictures/original.png"
|
||||
METADATA_PATH = "memory/profile_pictures/metadata.json"
|
||||
ALBUM_DIR = "memory/profile_pictures/album"
|
||||
|
||||
# Face detection API endpoint
|
||||
FACE_DETECTOR_API = "http://anime-face-detector:6078/detect"
|
||||
@@ -63,6 +66,7 @@ class ProfilePictureManager:
|
||||
def _ensure_directories(self):
|
||||
"""Ensure profile picture directory exists"""
|
||||
os.makedirs(self.PROFILE_PIC_DIR, exist_ok=True)
|
||||
os.makedirs(self.ALBUM_DIR, exist_ok=True)
|
||||
|
||||
async def initialize(self):
|
||||
"""Initialize the profile picture manager (check API availability)"""
|
||||
@@ -1753,6 +1757,615 @@ Respond in JSON format:
|
||||
|
||||
return result
|
||||
|
||||
# =========================================================================
|
||||
# Album / Gallery System
|
||||
# =========================================================================
|
||||
|
||||
def _album_entry_dir(self, entry_id: str) -> str:
|
||||
"""Return the directory path for an album entry."""
|
||||
return os.path.join(self.ALBUM_DIR, entry_id)
|
||||
|
||||
def _save_album_metadata(self, entry_id: str, metadata: Dict):
|
||||
"""Save metadata for an album entry."""
|
||||
entry_dir = self._album_entry_dir(entry_id)
|
||||
os.makedirs(entry_dir, exist_ok=True)
|
||||
meta_path = os.path.join(entry_dir, "metadata.json")
|
||||
with open(meta_path, 'w') as f:
|
||||
json.dump(metadata, f, indent=2)
|
||||
|
||||
def _load_album_metadata(self, entry_id: str) -> Optional[Dict]:
|
||||
"""Load metadata for an album entry."""
|
||||
meta_path = os.path.join(self._album_entry_dir(entry_id), "metadata.json")
|
||||
try:
|
||||
if os.path.exists(meta_path):
|
||||
with open(meta_path, 'r') as f:
|
||||
return json.load(f)
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading album metadata for {entry_id}: {e}")
|
||||
return None
|
||||
|
||||
async def _save_current_to_album(self, debug: bool = False) -> Optional[str]:
|
||||
"""
|
||||
Archive the current PFP (original + cropped + metadata) into the album.
|
||||
|
||||
Returns:
|
||||
The new album entry ID, or None if nothing to archive.
|
||||
"""
|
||||
if not os.path.exists(self.ORIGINAL_PATH) and not os.path.exists(self.CURRENT_PATH):
|
||||
if debug:
|
||||
logger.info("No current PFP to archive")
|
||||
return None
|
||||
|
||||
entry_id = str(uuid.uuid4())
|
||||
entry_dir = self._album_entry_dir(entry_id)
|
||||
os.makedirs(entry_dir, exist_ok=True)
|
||||
|
||||
# Copy images
|
||||
if os.path.exists(self.ORIGINAL_PATH):
|
||||
shutil.copy2(self.ORIGINAL_PATH, os.path.join(entry_dir, "original.png"))
|
||||
if os.path.exists(self.CURRENT_PATH):
|
||||
shutil.copy2(self.CURRENT_PATH, os.path.join(entry_dir, "cropped.png"))
|
||||
|
||||
# Build metadata from current
|
||||
current_meta = self.load_metadata() or {}
|
||||
description = self.get_current_description() or ""
|
||||
|
||||
album_meta = {
|
||||
"id": entry_id,
|
||||
"added_at": datetime.now().isoformat(),
|
||||
"source": current_meta.get("source", "unknown"),
|
||||
"description": description,
|
||||
"dominant_color": current_meta.get("dominant_color"),
|
||||
"original_width": current_meta.get("original_width"),
|
||||
"original_height": current_meta.get("original_height"),
|
||||
"crop_region": current_meta.get("crop_region"),
|
||||
"was_current": True,
|
||||
"archived_at": datetime.now().isoformat(),
|
||||
}
|
||||
# Carry over Danbooru metadata if present
|
||||
for key in ("danbooru_post_id", "danbooru_tags", "danbooru_artist",
|
||||
"danbooru_rating", "danbooru_score", "danbooru_file_url"):
|
||||
if key in current_meta:
|
||||
album_meta[key] = current_meta[key]
|
||||
|
||||
self._save_album_metadata(entry_id, album_meta)
|
||||
|
||||
if debug:
|
||||
logger.info(f"Archived current PFP to album entry {entry_id}")
|
||||
return entry_id
|
||||
|
||||
async def add_to_album(
|
||||
self,
|
||||
image_bytes: bytes,
|
||||
source: str = "custom_upload",
|
||||
extra_metadata: Optional[Dict] = None,
|
||||
generate_description: bool = True,
|
||||
auto_crop: bool = True,
|
||||
debug: bool = False,
|
||||
_batch_mode: bool = False,
|
||||
) -> Dict:
|
||||
"""
|
||||
Add an image to the album gallery.
|
||||
|
||||
Args:
|
||||
image_bytes: Raw image bytes
|
||||
source: Source label (e.g. "custom_upload", "danbooru")
|
||||
extra_metadata: Additional metadata to store (Danbooru info, etc.)
|
||||
generate_description: Whether to generate a vision-model description
|
||||
auto_crop: Whether to auto-crop (face detection → saliency)
|
||||
debug: Enable debug output
|
||||
_batch_mode: Internal flag — if True, skip starting/stopping face detector
|
||||
(caller manages detector lifecycle)
|
||||
|
||||
Returns:
|
||||
Dict with success, entry_id, metadata
|
||||
"""
|
||||
result = {"success": False, "error": None, "entry_id": None, "metadata": {}}
|
||||
|
||||
try:
|
||||
image = Image.open(io.BytesIO(image_bytes))
|
||||
if image.mode not in ("RGB", "RGBA"):
|
||||
image = image.convert("RGB")
|
||||
|
||||
entry_id = str(uuid.uuid4())
|
||||
entry_dir = self._album_entry_dir(entry_id)
|
||||
os.makedirs(entry_dir, exist_ok=True)
|
||||
|
||||
# Save original
|
||||
original_path = os.path.join(entry_dir, "original.png")
|
||||
with open(original_path, 'wb') as f:
|
||||
f.write(image_bytes)
|
||||
|
||||
album_meta: Dict = {
|
||||
"id": entry_id,
|
||||
"added_at": datetime.now().isoformat(),
|
||||
"source": source,
|
||||
"original_width": image.size[0],
|
||||
"original_height": image.size[1],
|
||||
}
|
||||
if extra_metadata:
|
||||
album_meta.update(extra_metadata)
|
||||
|
||||
# Generate description
|
||||
if generate_description:
|
||||
if debug:
|
||||
logger.info(f"[Album {entry_id[:8]}] Generating description...")
|
||||
description = await self._generate_image_description(image_bytes, debug=debug)
|
||||
if description:
|
||||
album_meta["description"] = description
|
||||
if debug:
|
||||
logger.info(f"[Album {entry_id[:8]}] Description: {description[:80]}...")
|
||||
|
||||
# Auto-crop
|
||||
if auto_crop:
|
||||
target_size = 512
|
||||
w, h = image.size
|
||||
if w <= target_size and h <= target_size:
|
||||
cropped_image = image
|
||||
else:
|
||||
if _batch_mode:
|
||||
# In batch mode the face detector is already running
|
||||
cropped_image = await self._intelligent_crop(image, image_bytes, target_size=target_size, debug=debug)
|
||||
else:
|
||||
cropped_image = await self._intelligent_crop(image, image_bytes, target_size=target_size, debug=debug)
|
||||
|
||||
if cropped_image:
|
||||
buf = io.BytesIO()
|
||||
cropped_image.save(buf, format='PNG')
|
||||
cropped_bytes = buf.getvalue()
|
||||
cropped_path = os.path.join(entry_dir, "cropped.png")
|
||||
with open(cropped_path, 'wb') as f:
|
||||
f.write(cropped_bytes)
|
||||
|
||||
# Extract dominant color from cropped
|
||||
dominant_color = self._extract_dominant_color(cropped_image, debug=debug)
|
||||
if dominant_color:
|
||||
album_meta["dominant_color"] = {
|
||||
"rgb": dominant_color,
|
||||
"hex": "#{:02x}{:02x}{:02x}".format(*dominant_color)
|
||||
}
|
||||
else:
|
||||
if debug:
|
||||
logger.warning(f"[Album {entry_id[:8]}] Auto-crop failed, using original resized")
|
||||
# Fallback — resize original to square
|
||||
cropped_image = image.copy()
|
||||
cropped_image.thumbnail((target_size, target_size), Image.Resampling.LANCZOS)
|
||||
buf = io.BytesIO()
|
||||
cropped_image.save(buf, format='PNG')
|
||||
with open(os.path.join(entry_dir, "cropped.png"), 'wb') as f:
|
||||
f.write(buf.getvalue())
|
||||
|
||||
self._save_album_metadata(entry_id, album_meta)
|
||||
|
||||
result["success"] = True
|
||||
result["entry_id"] = entry_id
|
||||
result["metadata"] = album_meta
|
||||
if debug:
|
||||
logger.info(f"[Album] Added entry {entry_id}")
|
||||
|
||||
except Exception as e:
|
||||
result["error"] = f"Error adding to album: {e}"
|
||||
logger.error(f"Error in add_to_album: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
return result
|
||||
|
||||
async def add_batch_to_album(
|
||||
self,
|
||||
images: List[Dict],
|
||||
debug: bool = False,
|
||||
) -> Dict:
|
||||
"""
|
||||
Batch-add multiple images to the album efficiently.
|
||||
Keeps the vision model loaded and the face detector running across all images.
|
||||
|
||||
Args:
|
||||
images: List of dicts, each with 'bytes' (raw image bytes) and optional 'source', 'extra_metadata'
|
||||
debug: Enable debug output
|
||||
|
||||
Returns:
|
||||
Dict with results list and summary counts
|
||||
"""
|
||||
results = []
|
||||
success_count = 0
|
||||
fail_count = 0
|
||||
face_detector_started = False
|
||||
|
||||
try:
|
||||
# Pre-start face detector for the whole batch
|
||||
await self._ensure_vram_available(debug=debug)
|
||||
if await self._start_face_detector(debug=debug):
|
||||
face_detector_started = True
|
||||
if debug:
|
||||
logger.info(f"[Album Batch] Face detector started for {len(images)} images")
|
||||
|
||||
for i, img_info in enumerate(images):
|
||||
if debug:
|
||||
logger.info(f"[Album Batch] Processing image {i+1}/{len(images)}...")
|
||||
|
||||
entry_result = await self.add_to_album(
|
||||
image_bytes=img_info["bytes"],
|
||||
source=img_info.get("source", "custom_upload"),
|
||||
extra_metadata=img_info.get("extra_metadata"),
|
||||
generate_description=True,
|
||||
auto_crop=True,
|
||||
debug=debug,
|
||||
_batch_mode=True,
|
||||
)
|
||||
results.append(entry_result)
|
||||
if entry_result["success"]:
|
||||
success_count += 1
|
||||
else:
|
||||
fail_count += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in batch album add: {e}")
|
||||
finally:
|
||||
# Clean up face detector after entire batch
|
||||
if face_detector_started:
|
||||
await self._stop_face_detector(debug=debug)
|
||||
|
||||
return {
|
||||
"success": fail_count == 0,
|
||||
"total": len(images),
|
||||
"succeeded": success_count,
|
||||
"failed": fail_count,
|
||||
"results": results,
|
||||
}
|
||||
|
||||
def get_album_entries(self) -> List[Dict]:
|
||||
"""
|
||||
List all album entries with their metadata, sorted newest-first.
|
||||
|
||||
Returns:
|
||||
List of metadata dicts
|
||||
"""
|
||||
entries = []
|
||||
if not os.path.exists(self.ALBUM_DIR):
|
||||
return entries
|
||||
|
||||
for name in os.listdir(self.ALBUM_DIR):
|
||||
entry_dir = os.path.join(self.ALBUM_DIR, name)
|
||||
if not os.path.isdir(entry_dir):
|
||||
continue
|
||||
meta = self._load_album_metadata(name)
|
||||
if meta:
|
||||
# Add convenience flags
|
||||
meta["has_original"] = os.path.exists(os.path.join(entry_dir, "original.png"))
|
||||
meta["has_cropped"] = os.path.exists(os.path.join(entry_dir, "cropped.png"))
|
||||
entries.append(meta)
|
||||
|
||||
# Sort newest first
|
||||
entries.sort(key=lambda e: e.get("added_at", ""), reverse=True)
|
||||
return entries
|
||||
|
||||
def get_album_entry(self, entry_id: str) -> Optional[Dict]:
|
||||
"""Get metadata for a single album entry."""
|
||||
entry_dir = self._album_entry_dir(entry_id)
|
||||
if not os.path.isdir(entry_dir):
|
||||
return None
|
||||
meta = self._load_album_metadata(entry_id)
|
||||
if meta:
|
||||
meta["has_original"] = os.path.exists(os.path.join(entry_dir, "original.png"))
|
||||
meta["has_cropped"] = os.path.exists(os.path.join(entry_dir, "cropped.png"))
|
||||
return meta
|
||||
|
||||
def get_album_image_path(self, entry_id: str, image_type: str = "cropped") -> Optional[str]:
|
||||
"""
|
||||
Get the file path for an album entry's image.
|
||||
|
||||
Args:
|
||||
entry_id: Album entry UUID
|
||||
image_type: 'original' or 'cropped'
|
||||
|
||||
Returns:
|
||||
Absolute path string or None if not found
|
||||
"""
|
||||
filename = "original.png" if image_type == "original" else "cropped.png"
|
||||
path = os.path.join(self._album_entry_dir(entry_id), filename)
|
||||
return path if os.path.exists(path) else None
|
||||
|
||||
async def set_album_entry_as_current(self, entry_id: str, archive_current: bool = True, debug: bool = False) -> Dict:
|
||||
"""
|
||||
Set an album entry as the current Discord profile picture.
|
||||
Optionally archives the current PFP into the album first.
|
||||
|
||||
Args:
|
||||
entry_id: Album entry UUID
|
||||
archive_current: Whether to archive current PFP before replacing
|
||||
debug: Enable debug output
|
||||
|
||||
Returns:
|
||||
Dict with success status
|
||||
"""
|
||||
result = {"success": False, "error": None, "archived_entry_id": None}
|
||||
|
||||
try:
|
||||
entry_dir = self._album_entry_dir(entry_id)
|
||||
meta = self._load_album_metadata(entry_id)
|
||||
if not meta:
|
||||
result["error"] = f"Album entry {entry_id} not found"
|
||||
return result
|
||||
|
||||
# Find the best image to use — prefer cropped, fall back to original
|
||||
cropped_path = os.path.join(entry_dir, "cropped.png")
|
||||
original_path = os.path.join(entry_dir, "original.png")
|
||||
|
||||
if not os.path.exists(cropped_path) and not os.path.exists(original_path):
|
||||
result["error"] = "Album entry has no images"
|
||||
return result
|
||||
|
||||
# Archive current PFP first
|
||||
if archive_current and (os.path.exists(self.ORIGINAL_PATH) or os.path.exists(self.CURRENT_PATH)):
|
||||
archived_id = await self._save_current_to_album(debug=debug)
|
||||
result["archived_entry_id"] = archived_id
|
||||
|
||||
# Copy album entry images to current slots
|
||||
if os.path.exists(original_path):
|
||||
shutil.copy2(original_path, self.ORIGINAL_PATH)
|
||||
if os.path.exists(cropped_path):
|
||||
shutil.copy2(cropped_path, self.CURRENT_PATH)
|
||||
elif os.path.exists(original_path):
|
||||
# No cropped version — copy original as current too
|
||||
shutil.copy2(original_path, self.CURRENT_PATH)
|
||||
|
||||
# Read the avatar bytes for Discord
|
||||
avatar_path = self.CURRENT_PATH if os.path.exists(self.CURRENT_PATH) else self.ORIGINAL_PATH
|
||||
with open(avatar_path, 'rb') as f:
|
||||
avatar_bytes = f.read()
|
||||
|
||||
# Update Discord avatar
|
||||
if globals.client and globals.client.user:
|
||||
try:
|
||||
if globals.client.loop and globals.client.loop.is_running():
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
globals.client.user.edit(avatar=avatar_bytes),
|
||||
globals.client.loop
|
||||
)
|
||||
future.result(timeout=10)
|
||||
else:
|
||||
await globals.client.user.edit(avatar=avatar_bytes)
|
||||
|
||||
result["success"] = True
|
||||
logger.info(f"Set album entry {entry_id} as current PFP")
|
||||
except discord.HTTPException as e:
|
||||
result["error"] = f"Discord API error: {e}"
|
||||
return result
|
||||
except Exception as e:
|
||||
result["error"] = f"Unexpected error updating avatar: {e}"
|
||||
return result
|
||||
else:
|
||||
result["error"] = "Bot client not ready"
|
||||
return result
|
||||
|
||||
# Update current metadata
|
||||
new_meta = {
|
||||
"source": meta.get("source", "album"),
|
||||
"changed_at": datetime.now().isoformat(),
|
||||
"album_entry_id": entry_id,
|
||||
"original_width": meta.get("original_width"),
|
||||
"original_height": meta.get("original_height"),
|
||||
"dominant_color": meta.get("dominant_color"),
|
||||
"crop_region": meta.get("crop_region"),
|
||||
}
|
||||
self._save_metadata(new_meta)
|
||||
|
||||
# Update description
|
||||
description = meta.get("description", "")
|
||||
if description:
|
||||
desc_path = os.path.join(self.PROFILE_PIC_DIR, "current_description.txt")
|
||||
with open(desc_path, 'w', encoding='utf-8') as f:
|
||||
f.write(description)
|
||||
|
||||
# Update role colors
|
||||
dom_color = meta.get("dominant_color")
|
||||
if dom_color and "rgb" in dom_color:
|
||||
color_tuple = tuple(dom_color["rgb"])
|
||||
await self._update_role_colors(color_tuple, debug=debug)
|
||||
|
||||
# Update bipolar webhooks
|
||||
if globals.BIPOLAR_MODE:
|
||||
try:
|
||||
from utils.bipolar_mode import update_webhook_avatars
|
||||
await update_webhook_avatars(globals.client)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to update bipolar webhook avatars: {e}")
|
||||
|
||||
except Exception as e:
|
||||
result["error"] = f"Error setting album entry as current: {e}"
|
||||
logger.error(f"Error in set_album_entry_as_current: {e}")
|
||||
|
||||
return result
|
||||
|
||||
def delete_album_entry(self, entry_id: str) -> bool:
|
||||
"""Delete a single album entry and its files."""
|
||||
entry_dir = self._album_entry_dir(entry_id)
|
||||
if os.path.isdir(entry_dir):
|
||||
shutil.rmtree(entry_dir)
|
||||
logger.info(f"Deleted album entry {entry_id}")
|
||||
return True
|
||||
return False
|
||||
|
||||
def delete_album_entries(self, entry_ids: List[str]) -> Dict:
|
||||
"""Bulk delete multiple album entries."""
|
||||
deleted = 0
|
||||
failed = 0
|
||||
for eid in entry_ids:
|
||||
if self.delete_album_entry(eid):
|
||||
deleted += 1
|
||||
else:
|
||||
failed += 1
|
||||
return {"deleted": deleted, "failed": failed, "total": len(entry_ids)}
|
||||
|
||||
def get_album_disk_usage(self) -> Dict:
|
||||
"""Calculate total disk usage of the album directory."""
|
||||
total_bytes = 0
|
||||
entry_count = 0
|
||||
if os.path.exists(self.ALBUM_DIR):
|
||||
for name in os.listdir(self.ALBUM_DIR):
|
||||
entry_dir = os.path.join(self.ALBUM_DIR, name)
|
||||
if not os.path.isdir(entry_dir):
|
||||
continue
|
||||
entry_count += 1
|
||||
for fname in os.listdir(entry_dir):
|
||||
fpath = os.path.join(entry_dir, fname)
|
||||
if os.path.isfile(fpath):
|
||||
total_bytes += os.path.getsize(fpath)
|
||||
|
||||
# Human-readable
|
||||
if total_bytes < 1024:
|
||||
human = f"{total_bytes} B"
|
||||
elif total_bytes < 1024 * 1024:
|
||||
human = f"{total_bytes / 1024:.1f} KB"
|
||||
elif total_bytes < 1024 * 1024 * 1024:
|
||||
human = f"{total_bytes / (1024*1024):.1f} MB"
|
||||
else:
|
||||
human = f"{total_bytes / (1024*1024*1024):.2f} GB"
|
||||
|
||||
return {
|
||||
"total_bytes": total_bytes,
|
||||
"human_readable": human,
|
||||
"entry_count": entry_count,
|
||||
}
|
||||
|
||||
async def update_album_entry_description(self, entry_id: str, description: str, debug: bool = False) -> Dict:
|
||||
"""Update the description of an album entry."""
|
||||
result = {"success": False, "error": None}
|
||||
meta = self._load_album_metadata(entry_id)
|
||||
if not meta:
|
||||
result["error"] = f"Album entry {entry_id} not found"
|
||||
return result
|
||||
meta["description"] = description
|
||||
self._save_album_metadata(entry_id, meta)
|
||||
result["success"] = True
|
||||
if debug:
|
||||
logger.info(f"Updated description for album entry {entry_id}")
|
||||
return result
|
||||
|
||||
async def manual_crop_album_entry(
|
||||
self, entry_id: str, x: int, y: int, width: int, height: int,
|
||||
target_size: int = 512, debug: bool = False
|
||||
) -> Dict:
|
||||
"""
|
||||
Manually crop an album entry's original image.
|
||||
Does NOT apply to Discord — only updates the album entry's cropped.png.
|
||||
"""
|
||||
result = {"success": False, "error": None, "metadata": {}}
|
||||
|
||||
entry_dir = self._album_entry_dir(entry_id)
|
||||
original_path = os.path.join(entry_dir, "original.png")
|
||||
|
||||
if not os.path.exists(original_path):
|
||||
result["error"] = f"No original image for album entry {entry_id}"
|
||||
return result
|
||||
|
||||
try:
|
||||
image = Image.open(original_path)
|
||||
img_w, img_h = image.size
|
||||
|
||||
if x < 0 or y < 0:
|
||||
result["error"] = f"Crop coordinates must be non-negative"
|
||||
return result
|
||||
if x + width > img_w or y + height > img_h:
|
||||
result["error"] = f"Crop region exceeds image bounds ({img_w}x{img_h})"
|
||||
return result
|
||||
if width < 64 or height < 64:
|
||||
result["error"] = f"Crop region too small (min 64x64)"
|
||||
return result
|
||||
|
||||
cropped = image.crop((x, y, x + width, y + height))
|
||||
cropped = cropped.resize((target_size, target_size), Image.Resampling.LANCZOS)
|
||||
|
||||
buf = io.BytesIO()
|
||||
cropped.save(buf, format='PNG')
|
||||
cropped_bytes = buf.getvalue()
|
||||
|
||||
with open(os.path.join(entry_dir, "cropped.png"), 'wb') as f:
|
||||
f.write(cropped_bytes)
|
||||
|
||||
# Update metadata
|
||||
meta = self._load_album_metadata(entry_id) or {}
|
||||
meta["crop_region"] = {"x": x, "y": y, "width": width, "height": height}
|
||||
|
||||
# Re-extract dominant color
|
||||
dominant_color = self._extract_dominant_color(cropped, debug=debug)
|
||||
if dominant_color:
|
||||
meta["dominant_color"] = {
|
||||
"rgb": dominant_color,
|
||||
"hex": "#{:02x}{:02x}{:02x}".format(*dominant_color)
|
||||
}
|
||||
self._save_album_metadata(entry_id, meta)
|
||||
|
||||
result["success"] = True
|
||||
result["metadata"] = meta
|
||||
if debug:
|
||||
logger.info(f"Manual crop applied to album entry {entry_id}")
|
||||
|
||||
except Exception as e:
|
||||
result["error"] = f"Error cropping album entry: {e}"
|
||||
logger.error(f"Error in manual_crop_album_entry: {e}")
|
||||
|
||||
return result
|
||||
|
||||
async def auto_crop_album_entry(self, entry_id: str, debug: bool = False) -> Dict:
|
||||
"""
|
||||
Auto-crop an album entry's original image (face detection → saliency).
|
||||
Does NOT apply to Discord — only updates the album entry's cropped.png.
|
||||
"""
|
||||
result = {"success": False, "error": None, "metadata": {}}
|
||||
|
||||
entry_dir = self._album_entry_dir(entry_id)
|
||||
original_path = os.path.join(entry_dir, "original.png")
|
||||
|
||||
if not os.path.exists(original_path):
|
||||
result["error"] = f"No original image for album entry {entry_id}"
|
||||
return result
|
||||
|
||||
try:
|
||||
with open(original_path, 'rb') as f:
|
||||
image_bytes = f.read()
|
||||
image = Image.open(io.BytesIO(image_bytes))
|
||||
|
||||
target_size = 512
|
||||
w, h = image.size
|
||||
if w <= target_size and h <= target_size:
|
||||
cropped_image = image
|
||||
else:
|
||||
cropped_image = await self._intelligent_crop(image, image_bytes, target_size=target_size, debug=debug)
|
||||
|
||||
if not cropped_image:
|
||||
result["error"] = "Auto-crop failed"
|
||||
return result
|
||||
|
||||
buf = io.BytesIO()
|
||||
cropped_image.save(buf, format='PNG')
|
||||
with open(os.path.join(entry_dir, "cropped.png"), 'wb') as f:
|
||||
f.write(buf.getvalue())
|
||||
|
||||
# Update metadata
|
||||
meta = self._load_album_metadata(entry_id) or {}
|
||||
meta.pop("crop_region", None) # auto-crop doesn't have user-defined region
|
||||
|
||||
dominant_color = self._extract_dominant_color(cropped_image, debug=debug)
|
||||
if dominant_color:
|
||||
meta["dominant_color"] = {
|
||||
"rgb": dominant_color,
|
||||
"hex": "#{:02x}{:02x}{:02x}".format(*dominant_color)
|
||||
}
|
||||
self._save_album_metadata(entry_id, meta)
|
||||
|
||||
result["success"] = True
|
||||
result["metadata"] = meta
|
||||
if debug:
|
||||
logger.info(f"Auto-crop applied to album entry {entry_id}")
|
||||
|
||||
except Exception as e:
|
||||
result["error"] = f"Error auto-cropping album entry: {e}"
|
||||
logger.error(f"Error in auto_crop_album_entry: {e}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# Global instance
|
||||
profile_picture_manager = ProfilePictureManager()
|
||||
|
||||
Reference in New Issue
Block a user