diff --git a/bot/api.py b/bot/api.py
index a47a232..a8f2dbd 100644
--- a/bot/api.py
+++ b/bot/api.py
@@ -869,16 +869,25 @@ async def trigger_autonomous_general(guild_id: int = None):
return {"status": "error", "message": "Bot not ready"}
@app.post("/autonomous/engage")
-async def trigger_autonomous_engage_user(guild_id: int = None, user_id: str = None, engagement_type: str = None):
+async def trigger_autonomous_engage_user(
+ guild_id: int = None,
+ user_id: str = None,
+ engagement_type: str = None,
+ manual_trigger: str = "false"
+):
# If guild_id is provided, send autonomous engagement only to that server
# If no guild_id, send to all servers (legacy behavior)
# user_id: Optional specific user to engage (Discord user ID as string)
# engagement_type: Optional type - 'activity', 'general', 'status', or None for random
+ # manual_trigger: If True (as string), bypass the "recently engaged" check (for web UI manual triggers)
+ # Convert manual_trigger string to boolean
+ manual_trigger_bool = manual_trigger.lower() in ('true', '1', 'yes')
+
if globals.client and globals.client.loop and globals.client.loop.is_running():
if guild_id is not None:
# Send to specific server only
from utils.autonomous import miku_engage_random_user_for_server
- globals.client.loop.create_task(miku_engage_random_user_for_server(guild_id, user_id=user_id, engagement_type=engagement_type))
+ globals.client.loop.create_task(miku_engage_random_user_for_server(guild_id, user_id=user_id, engagement_type=engagement_type, manual_trigger=manual_trigger_bool))
# Build detailed message
msg_parts = [f"Autonomous user engagement queued for server {guild_id}"]
@@ -886,38 +895,49 @@ async def trigger_autonomous_engage_user(guild_id: int = None, user_id: str = No
msg_parts.append(f"targeting user {user_id}")
if engagement_type:
msg_parts.append(f"with {engagement_type} engagement")
+ if manual_trigger_bool:
+ msg_parts.append("(manual trigger - bypassing cooldown)")
return {"status": "ok", "message": " ".join(msg_parts)}
else:
# Send to all servers (legacy behavior)
from utils.autonomous import miku_engage_random_user
- globals.client.loop.create_task(miku_engage_random_user(user_id=user_id, engagement_type=engagement_type))
+ globals.client.loop.create_task(miku_engage_random_user(user_id=user_id, engagement_type=engagement_type, manual_trigger=manual_trigger_bool))
msg_parts = ["Autonomous user engagement queued for all servers"]
if user_id:
msg_parts.append(f"targeting user {user_id}")
if engagement_type:
msg_parts.append(f"with {engagement_type} engagement")
+ if manual_trigger_bool:
+ msg_parts.append("(manual trigger - bypassing cooldown)")
return {"status": "ok", "message": " ".join(msg_parts)}
else:
return {"status": "error", "message": "Bot not ready"}
@app.post("/autonomous/tweet")
-async def trigger_autonomous_tweet(guild_id: int = None):
+async def trigger_autonomous_tweet(guild_id: int = None, tweet_url: str = None):
# If guild_id is provided, send tweet only to that server
# If no guild_id, send to all servers (legacy behavior)
+ # If tweet_url is provided, share that specific tweet; otherwise fetch one
if globals.client and globals.client.loop and globals.client.loop.is_running():
if guild_id is not None:
# Send to specific server only
from utils.autonomous import share_miku_tweet_for_server
- globals.client.loop.create_task(share_miku_tweet_for_server(guild_id))
- return {"status": "ok", "message": f"Autonomous tweet sharing queued for server {guild_id}"}
+ globals.client.loop.create_task(share_miku_tweet_for_server(guild_id, tweet_url=tweet_url))
+ msg = f"Autonomous tweet sharing queued for server {guild_id}"
+ if tweet_url:
+ msg += f" with URL {tweet_url}"
+ return {"status": "ok", "message": msg}
else:
# Send to all servers (legacy behavior)
from utils.autonomous import share_miku_tweet
- globals.client.loop.create_task(share_miku_tweet())
- return {"status": "ok", "message": "Autonomous tweet sharing queued for all servers"}
+ globals.client.loop.create_task(share_miku_tweet(tweet_url=tweet_url))
+ msg = "Autonomous tweet sharing queued for all servers"
+ if tweet_url:
+ msg += f" with URL {tweet_url}"
+ return {"status": "ok", "message": msg}
else:
return {"status": "error", "message": "Bot not ready"}
@@ -1538,11 +1558,26 @@ async def trigger_autonomous_general_for_server(guild_id: int):
return {"status": "error", "message": f"Failed to trigger autonomous message: {e}"}
@app.post("/servers/{guild_id}/autonomous/engage")
-async def trigger_autonomous_engage_for_server(guild_id: int, user_id: str = None, engagement_type: str = None):
- """Trigger autonomous user engagement for a specific server"""
+async def trigger_autonomous_engage_for_server(
+ guild_id: int,
+ user_id: str = None,
+ engagement_type: str = None,
+ manual_trigger: str = "false"
+):
+ """Trigger autonomous user engagement for a specific server
+
+ Args:
+ guild_id: The server ID to engage in
+ user_id: Optional specific user to engage (Discord user ID as string)
+ engagement_type: Optional type - 'activity', 'general', 'status', or None for random
+ manual_trigger: If True (as string), bypass the "recently engaged" check (for web UI manual triggers)
+ """
+ # Convert manual_trigger string to boolean
+ manual_trigger_bool = manual_trigger.lower() in ('true', '1', 'yes')
+
from utils.autonomous import miku_engage_random_user_for_server
try:
- await miku_engage_random_user_for_server(guild_id, user_id=user_id, engagement_type=engagement_type)
+ await miku_engage_random_user_for_server(guild_id, user_id=user_id, engagement_type=engagement_type, manual_trigger=manual_trigger_bool)
# Build detailed message
msg_parts = [f"Autonomous user engagement triggered for server {guild_id}"]
@@ -1550,6 +1585,8 @@ async def trigger_autonomous_engage_for_server(guild_id: int, user_id: str = Non
msg_parts.append(f"targeting user {user_id}")
if engagement_type:
msg_parts.append(f"with {engagement_type} engagement")
+ if manual_trigger_bool:
+ msg_parts.append("(manual trigger - bypassing cooldown)")
return {"status": "ok", "message": " ".join(msg_parts)}
except Exception as e:
diff --git a/bot/static/index.html b/bot/static/index.html
index d8978f3..b68aad0 100644
--- a/bot/static/index.html
+++ b/bot/static/index.html
@@ -807,7 +807,17 @@
-
+
+
+
+
+
+
+
+
+
+
+
@@ -3051,6 +3061,9 @@ async function triggerEngageUser() {
params.append('engagement_type', engageType);
}
+ // Add manual_trigger flag to bypass cooldown checks
+ params.append('manual_trigger', 'true');
+
if (params.toString()) {
endpoint += `?${params.toString()}`;
}
@@ -3075,6 +3088,76 @@ async function triggerEngageUser() {
}
}
+// Toggle Share Tweet Submenu
+function toggleTweetSubmenu() {
+ const submenu = document.getElementById('tweet-submenu');
+ if (submenu.style.display === 'none') {
+ submenu.style.display = 'block';
+ } else {
+ submenu.style.display = 'none';
+ }
+}
+
+// Trigger Share Tweet with optional URL
+async function triggerShareTweet() {
+ const selectedServer = document.getElementById('server-select').value;
+ const tweetUrl = document.getElementById('tweet-url').value.trim();
+
+ // Validate URL if provided
+ if (tweetUrl) {
+ const validDomains = ['x.com', 'twitter.com', 'fxtwitter.com'];
+ let isValid = false;
+
+ try {
+ const urlObj = new URL(tweetUrl);
+ const hostname = urlObj.hostname.toLowerCase();
+ isValid = validDomains.some(domain => hostname === domain || hostname.endsWith('.' + domain));
+ } catch (e) {
+ // Invalid URL format
+ }
+
+ if (!isValid) {
+ showNotification('Invalid tweet URL. Must be from x.com, twitter.com, or fxtwitter.com', 'error');
+ return;
+ }
+ }
+
+ try {
+ let endpoint = '/autonomous/tweet';
+ const params = new URLSearchParams();
+
+ // Add guild_id if a specific server is selected
+ if (selectedServer !== 'all') {
+ params.append('guild_id', selectedServer);
+ }
+
+ // Add tweet_url if specified
+ if (tweetUrl) {
+ params.append('tweet_url', tweetUrl);
+ }
+
+ if (params.toString()) {
+ endpoint += `?${params.toString()}`;
+ }
+
+ const response = await fetch(endpoint, {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' }
+ });
+
+ const result = await response.json();
+
+ if (response.ok) {
+ showNotification(result.message || 'Tweet share triggered successfully');
+ } else {
+ throw new Error(result.message || 'Failed to trigger tweet share');
+ }
+ } catch (error) {
+ console.error('Failed to trigger tweet share:', error);
+ showNotification(error.message || 'Failed to trigger tweet share', 'error');
+ }
+}
+
// Profile Picture Management
async function changeProfilePicture() {
const selectedServer = document.getElementById('server-select').value;
diff --git a/bot/utils/autonomous_v1_legacy.py b/bot/utils/autonomous_v1_legacy.py
index 1170964..81f6021 100644
--- a/bot/utils/autonomous_v1_legacy.py
+++ b/bot/utils/autonomous_v1_legacy.py
@@ -4,6 +4,7 @@ import random
import time
import json
import os
+import re
from datetime import datetime
import discord
from discord import Status
@@ -27,6 +28,107 @@ from utils.logger import get_logger
logger = get_logger('autonomous')
+
+async def fetch_tweet_by_url(tweet_url: str):
+ """Fetch a specific tweet by its URL using twscrape.
+
+ Args:
+ tweet_url: URL of the tweet to fetch (x.com, twitter.com, or fxtwitter.com)
+
+ Returns:
+ Dictionary with tweet data or None if fetch fails
+ """
+ try:
+ # Extract tweet ID from URL
+ # Handle various URL formats:
+ # https://twitter.com/username/status/1234567890
+ # https://x.com/username/status/1234567890
+ # https://fxtwitter.com/username/status/1234567890
+ match = re.search(r'/status/(\d+)', tweet_url)
+ if not match:
+ logger.error(f"Could not extract tweet ID from URL: {tweet_url}")
+ return None
+
+ tweet_id = int(match.group(1))
+
+ from twscrape import API
+
+ # Load cookies from JSON file
+ from pathlib import Path
+ COOKIE_PATH = Path(__file__).parent / "x.com.cookies.json"
+
+ if not COOKIE_PATH.exists():
+ logger.error(f"Cookie file not found: {COOKIE_PATH}")
+ return None
+
+ import json
+ with open(COOKIE_PATH, "r", encoding="utf-8") as f:
+ cookie_list = json.load(f)
+ cookie_header = "; ".join(f"{c['name']}={c['value']}" for c in cookie_list)
+
+ api = API()
+ await api.pool.add_account(
+ username="HSankyuu39",
+ password="x",
+ email="x",
+ email_password="x",
+ cookies=cookie_header
+ )
+ await api.pool.login_all()
+
+ # Fetch the specific tweet using search (same approach as figurine_notifier.py)
+ from twscrape import gather
+ logger.debug(f"Searching for tweet with ID {tweet_id}")
+ search_results = await gather(api.search(f"{tweet_id}", limit=1))
+ logger.debug(f"Search returned {len(search_results)} results")
+
+ # Check if we found the tweet
+ tweet = None
+ for search_tweet in search_results:
+ if str(search_tweet.id) == str(tweet_id):
+ tweet = search_tweet
+ logger.debug(f"Found matching tweet with ID {tweet.id}")
+ break
+
+ if not tweet and search_results:
+ # If no exact match but we have results, use the first one
+ tweet = search_results[0]
+ logger.debug(f"Using first search result with ID {tweet.id}")
+
+ if not tweet:
+ logger.error(f"Failed to fetch tweet ID {tweet_id}")
+ return None
+
+ # Extract media URLs if present
+ media_urls = []
+ if hasattr(tweet, 'media') and tweet.media:
+ if hasattr(tweet.media, 'photos'):
+ for photo in tweet.media.photos:
+ if hasattr(photo, 'url'):
+ media_url = photo.url
+ if '?' in media_url:
+ media_url = media_url.split('?')[0]
+ media_url += '?name=large'
+ media_urls.append(media_url)
+
+ # Extract username and build URL
+ username = tweet.user.username
+ tweet_url = f"https://twitter.com/{username}/status/{tweet.id}"
+
+ result = {
+ "username": username,
+ "text": tweet.rawContent if hasattr(tweet, 'rawContent') else "",
+ "url": tweet_url,
+ "media": media_urls if media_urls else []
+ }
+
+ logger.info(f"Successfully fetched tweet {tweet_id} from @{username}")
+ return result
+
+ except Exception as e:
+ logger.error(f"Error fetching tweet by URL {tweet_url}: {e}")
+ return None
+
# Server-specific memory storage
_server_autonomous_messages = {} # guild_id -> rotating buffer of last general messages
_server_user_engagements = {} # guild_id -> user_id -> timestamp
@@ -138,13 +240,14 @@ async def miku_say_something_general_for_server(guild_id: int):
except Exception as e:
logger.error(f"Failed to send autonomous message: {e}")
-async def miku_engage_random_user_for_server(guild_id: int, user_id: str = None, engagement_type: str = None):
+async def miku_engage_random_user_for_server(guild_id: int, user_id: str = None, engagement_type: str = None, manual_trigger: bool = False):
"""Miku engages a random user in a specific server
Args:
guild_id: The server ID
user_id: Optional specific user ID to engage (as string). If None, picks random user
engagement_type: Optional engagement style - 'activity', 'general', 'status', or None for auto-detect
+ manual_trigger: If True, bypass cooldown checks (for web UI manual triggers)
"""
server_config = server_manager.get_server_config(guild_id)
if not server_config:
@@ -198,10 +301,15 @@ async def miku_engage_random_user_for_server(guild_id: int, user_id: str = None,
now = time.time()
last_time = _server_user_engagements[guild_id].get(target.id, 0)
- if now - last_time < 43200: # 12 hours in seconds
+
+ # Skip cooldown check if this is a manual trigger from web UI
+ if not manual_trigger and now - last_time < 43200: # 12 hours in seconds
logger.info(f"Recently engaged {target.display_name} in server {guild_id}, switching to general message.")
await miku_say_something_general_for_server(guild_id)
return
+
+ if manual_trigger:
+ logger.info(f"Manual trigger - bypassing cooldown for {target.display_name} in server {guild_id}")
activity_name = None
if target.activities:
@@ -393,26 +501,40 @@ async def miku_detect_and_join_conversation_for_server(guild_id: int, force: boo
except Exception as e:
logger.error(f"Failed to interject in conversation: {e}")
-async def share_miku_tweet_for_server(guild_id: int):
- """Share a Miku tweet in a specific server"""
+async def share_miku_tweet_for_server(guild_id: int, tweet_url: str = None):
+ """Share a Miku tweet in a specific server
+
+ Args:
+ guild_id: The server ID to share the tweet to
+ tweet_url: Optional URL of a specific tweet to share. If None, fetches a random tweet.
+ """
server_config = server_manager.get_server_config(guild_id)
if not server_config:
logger.warning(f"No config found for server {guild_id}")
return
channel = globals.client.get_channel(server_config.autonomous_channel_id)
- tweets = await fetch_miku_tweets(limit=5)
- if not tweets:
- logger.warning(f"No good tweets found for server {guild_id}")
- return
+
+ # If a specific tweet URL is provided, fetch that tweet
+ if tweet_url:
+ tweet = await fetch_tweet_by_url(tweet_url)
+ if not tweet:
+ logger.error(f"Failed to fetch tweet from URL: {tweet_url}")
+ return
+ else:
+ # Fetch random tweets as usual
+ tweets = await fetch_miku_tweets(limit=5)
+ if not tweets:
+ logger.warning(f"No good tweets found for server {guild_id}")
+ return
- fresh_tweets = [t for t in tweets if t["url"] not in LAST_SENT_TWEETS]
+ fresh_tweets = [t for t in tweets if t["url"] not in LAST_SENT_TWEETS]
- if not fresh_tweets:
- logger.warning(f"All fetched tweets were recently sent in server {guild_id}. Reusing tweets.")
- fresh_tweets = tweets
+ if not fresh_tweets:
+ logger.warning(f"All fetched tweets were recently sent in server {guild_id}. Reusing tweets.")
+ fresh_tweets = tweets
- tweet = random.choice(fresh_tweets)
+ tweet = random.choice(fresh_tweets)
LAST_SENT_TWEETS.append(tweet["url"])
if len(LAST_SENT_TWEETS) > 50:
@@ -506,15 +628,16 @@ async def miku_say_something_general():
for guild_id in server_manager.servers:
await miku_say_something_general_for_server(guild_id)
-async def miku_engage_random_user(user_id: str = None, engagement_type: str = None):
+async def miku_engage_random_user(user_id: str = None, engagement_type: str = None, manual_trigger: bool = False):
"""Legacy function - now runs for all servers
Args:
user_id: Optional specific user ID to engage
engagement_type: Optional engagement style
+ manual_trigger: If True, bypass cooldown checks (for web UI manual triggers)
"""
for guild_id in server_manager.servers:
- await miku_engage_random_user_for_server(guild_id, user_id=user_id, engagement_type=engagement_type)
+ await miku_engage_random_user_for_server(guild_id, user_id=user_id, engagement_type=engagement_type, manual_trigger=manual_trigger)
async def miku_detect_and_join_conversation(force: bool = False):
"""Legacy function - now runs for all servers
@@ -525,10 +648,14 @@ async def miku_detect_and_join_conversation(force: bool = False):
for guild_id in server_manager.servers:
await miku_detect_and_join_conversation_for_server(guild_id, force=force)
-async def share_miku_tweet():
- """Legacy function - now runs for all servers"""
+async def share_miku_tweet(tweet_url: str = None):
+ """Legacy function - now runs for all servers
+
+ Args:
+ tweet_url: Optional URL of a specific tweet to share. If None, fetches a random tweet.
+ """
for guild_id in server_manager.servers:
- await share_miku_tweet_for_server(guild_id)
+ await share_miku_tweet_for_server(guild_id, tweet_url=tweet_url)
async def handle_custom_prompt(user_prompt: str):
"""Legacy function - now runs for all servers"""