# utils/twitter_fetcher.py
"""Fetch Hatsune Miku tweets (with images) via twscrape using cookie auth."""
import asyncio
import json
from pathlib import Path
from typing import Dict, Any

# Apply twscrape fix BEFORE importing twscrape
from utils.twscrape_fix import apply_twscrape_fix
apply_twscrape_fix()

from twscrape import API, gather, Account
from playwright.async_api import async_playwright

from utils.logger import get_logger

logger = get_logger('media')

# Browser cookie export (JSON list of {name, value, ...}) stored next to this module.
COOKIE_PATH = Path(__file__).parent / "x.com.cookies.json"
async def extract_media_urls(page, tweet_url):
    """Visit a tweet page with Playwright and return its image URLs.

    Args:
        page: Playwright async Page (assumed already authenticated if needed).
        tweet_url: URL of the tweet to scrape.

    Returns:
        list[str]: Deduplicated pbs.twimg.com media URLs with the size
        parameter forced to ``name=large``; empty list on any error.
    """
    logger.debug(f"Visiting tweet page: {tweet_url}")
    try:
        # Reduced timeout to 10s to prevent hanging
        await page.goto(tweet_url, timeout=10000)
        await page.wait_for_timeout(1000)

        media_elements = await page.query_selector_all("img[src*='pbs.twimg.com/media']")
        urls = set()
        for element in media_elements:
            src = await element.get_attribute("src")
            if src:
                # Rebuild the query string keeping every param except `name`,
                # then force name=large. (The old `split("&name=")` approach
                # produced an invalid URL when `name` was the first query
                # param or missing entirely.)
                base, _, query = src.partition("?")
                params = [p for p in query.split("&") if p and not p.startswith("name=")]
                params.append("name=large")
                urls.add(base + "?" + "&".join(params))
        logger.debug(f"Found {len(urls)} media URLs on tweet: {tweet_url}")
        return list(urls)
    except Exception as e:
        logger.warning(f"Playwright error on {tweet_url}: {e}")
        return []
async def _make_cookie_api() -> API:
    """Build a twscrape API whose pool holds one cookie-authenticated account.

    Reads the browser cookie export from COOKIE_PATH and flattens it into a
    single ``Cookie`` header string. Password/email are placeholders because
    cookie auth bypasses the normal login flow.
    """
    with open(COOKIE_PATH, "r", encoding="utf-8") as f:
        cookie_list = json.load(f)
    cookie_header = "; ".join(f"{c['name']}={c['value']}" for c in cookie_list)

    api = API()
    await api.pool.add_account(
        username="HSankyuu39",
        password="x",  # placeholder (won't be used)
        email="x",  # optional
        email_password="x",  # optional
        cookies=cookie_header
    )
    await api.pool.login_all()
    return api


def _photo_urls(tweet) -> list:
    """Return large-size photo URLs attached to a twscrape tweet, or []."""
    if not hasattr(tweet, 'media') or not tweet.media:
        return []
    media_urls = []
    if hasattr(tweet.media, 'photos'):
        for photo in tweet.media.photos:
            if hasattr(photo, 'url'):
                # Drop any existing query string, then request the large variant.
                media_url = photo.url
                if '?' in media_url:
                    media_url = media_url.split('?')[0]
                media_url += '?name=large'
                media_urls.append(media_url)
    return media_urls


async def fetch_miku_tweets(limit=5):
    """
    Search for Miku tweets with images.

    OPTIMIZED: Uses twscrape's built-in media info instead of Playwright.

    Args:
        limit: Maximum number of tweets to request from the search API.

    Returns:
        list[dict]: One dict per tweet that carries photos, with keys
        ``username`` / ``text`` / ``url`` / ``media``.
    """
    api = await _make_cookie_api()

    logger.info(f"Searching for Miku tweets (limit={limit})...")
    query = 'Hatsune Miku OR 初音ミク has:images after:2025'
    tweets = await gather(api.search(query, limit=limit, kv={"product": "Top"}))
    logger.info(f"Found {len(tweets)} tweets from API, extracting media...")

    # Extract media directly from tweet objects (no browser needed!)
    results = []
    for tweet in tweets:
        try:
            media_urls = _photo_urls(tweet)
            if not media_urls:
                continue
            username = tweet.user.username
            tweet_url = f"https://twitter.com/{username}/status/{tweet.id}"
            results.append({
                "username": username,
                "text": tweet.rawContent,
                "url": tweet_url,
                "media": media_urls
            })
            logger.debug(f"Extracted {len(media_urls)} media URLs from @{username}'s tweet")
        except Exception as e:
            # Best-effort: a malformed tweet object should not abort the batch.
            logger.warning(f"Error extracting media from tweet: {e}")
            continue

    logger.info(f"Finished! Returning {len(results)} tweet(s) with media (no browser needed!)")
    return results
async def _search_latest(api: API, query: str, limit: int) -> list:
    """Run a twscrape search ordered by Latest; swallow failures into []."""
    try:
        # kv product "Latest" asks the search endpoint for reverse-chronological results.
        found = await gather(api.search(query, limit=limit, kv={"product": "Latest"}))
    except Exception as e:
        logger.error(f"Latest search failed for '{query}': {e}")
        return []
    return found
async def fetch_figurine_tweets_latest(limit_per_source: int = 10) -> list:
    """Collect image-bearing figurine tweets from three merch accounts.

    Runs a Latest-ordered search per hard-coded source (mecchaJP,
    GoodSmile_US, OtakuOwletMerch), keeps only tweets that carry photo
    media, and returns a unified list of dicts with keys
    ``username`` / ``text`` / ``url`` / ``media``.

    OPTIMIZED: Uses twscrape's built-in media info instead of Playwright
    browser scraping — much faster and cannot hang on page loads.
    """
    # Cookie-based auth: flatten the exported cookie JSON into one header string.
    with open(COOKIE_PATH, "r", encoding="utf-8") as f:
        exported = json.load(f)
    cookie_header = "; ".join(f"{c['name']}={c['value']}" for c in exported)

    api = API()
    await api.pool.add_account(
        username="HSankyuu39",
        password="x",
        email="x",
        email_password="x",
        cookies=cookie_header
    )
    await api.pool.login_all()

    queries = [
        "miku figure from:mecchaJP",
        "miku from:GoodSmile_US",
        "miku from:OtakuOwletMerch",
    ]

    logger.info("Searching figurine tweets by Latest across sources...")
    candidates = []
    for source_query in queries:
        candidates.extend(await _search_latest(api, source_query, limit_per_source))
    logger.info(f"Found {len(candidates)} candidate tweets from API")

    # Pull photo URLs straight off the tweet objects (no browser involved).
    results = []
    for tweet in candidates:
        try:
            media = getattr(tweet, 'media', None)
            if not media:
                continue

            photo_urls = []
            if hasattr(media, 'photos'):
                for photo in media.photos:
                    if not hasattr(photo, 'url'):
                        continue
                    # Strip any query string and ask for the large variant.
                    full = photo.url
                    base = full.split('?', 1)[0] if '?' in full else full
                    photo_urls.append(base + '?name=large')

            if not photo_urls:
                continue

            author = tweet.user.username
            link = f"https://twitter.com/{author}/status/{tweet.id}"
            results.append({
                "username": author,
                "text": tweet.rawContent,
                "url": link,
                "media": photo_urls
            })
            logger.debug(f"Extracted {len(photo_urls)} media URLs from @{author}'s tweet")
        except Exception as e:
            logger.warning(f"Error extracting media from tweet: {e}")
            continue

    logger.info(f"Figurine fetch finished. Returning {len(results)} tweet(s) with media (no browser needed!)")
    return results
# Note: fetch_tweet_by_url was removed - now using twscrape-based approach in figurine_notifier.py
# This avoids Playwright browser dependencies while maintaining functionality