# Hugging Face Space: Eve WhatsApp assistant (status: Running)
# Author: Chandima Prabhath
# Commit 1743c01: Refactor image handling in handle_image_generation to
# streamline image sending and improve error handling.
import os
import threading
import requests
import logging
import queue
import re
import json
import time
import random

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse, JSONResponse

# Project-local helpers: image generation, TTS, and LLM text generation.
from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm

# Configure logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")

# Env vars — Green API (WhatsApp gateway) credentials and webhook secret.
GREEN_API_URL = os.getenv("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
# Chat ID for system messages.
# NOTE(review): "[email protected]" looks like a scraper-redacted chat JID
# (likely something like "<number>@g.us") — confirm the real value upstream.
BOT_STATUS_CHAT = "[email protected]"  # Chat ID for system messages
image_dir = "/tmp/images"   # scratch dir for generated images
audio_dir = "/tmp/audio"    # scratch dir for generated voice replies

# Fail fast if any required credential is missing.
if not all([GREEN_API_URL, GREEN_API_TOKEN, GREEN_API_ID_INSTANCE, WEBHOOK_AUTH_TOKEN]):
    raise ValueError("Environment variables are not set properly")

# Queues & in-memory stores
task_queue = queue.Queue()  # background jobs for the worker thread
trivia_store = {}  # chat_id -> {"question": ..., "answer": ...}
polls = {}  # chat_id -> {"question": ..., "options": [...], "votes": {1: 0, ...}, "voters": {jid: opt}}

app = FastAPI()

# Global inactivity tracker: timestamp of the last inbound webhook.
last_message_time = time.time()
def inactivity_monitor():
    """Background loop: ping the status chat after 5 minutes of silence.

    Wakes once a minute; if no inbound message has been recorded for
    300 seconds, posts a reminder to BOT_STATUS_CHAT and resets the
    timer so the reminder does not repeat every minute thereafter.
    (The alarm-clock emoji was mojibake in the scraped copy and is
    restored here.)
    """
    global last_message_time
    while True:
        time.sleep(60)  # check every minute
        if time.time() - last_message_time >= 300:  # 5 minutes inactivity
            if BOT_STATUS_CHAT:
                reminder = "⏰ I haven't heard from you in a while! I'm still here if you need anything."
                # "inactivity" is a synthetic message id (nothing to quote).
                send_message("inactivity", BOT_STATUS_CHAT, reminder)
            # Reset even when BOT_STATUS_CHAT is unset, to avoid busy re-triggering.
            last_message_time = time.time()


threading.Thread(target=inactivity_monitor, daemon=True).start()
# --- Background Worker --- | |
def worker():
    """Drain task_queue forever, dispatching each job to its handler.

    Jobs are dicts with keys: "type" ("image" or "audio"),
    "message_id", "chat_id", and "prompt".
    """
    while True:
        job = task_queue.get()
        try:
            job_type = job["type"]
            message_id = job["message_id"]
            chat = job["chat_id"]
            if job_type == "image":
                handle_image_generation(message_id, chat, job["prompt"])
            elif job_type == "audio":
                response_audio(message_id, chat, job["prompt"])
        except Exception as exc:
            logging.error(f"Error processing {job}: {exc}")
        finally:
            # Always mark the job done so queue.join() can't deadlock.
            task_queue.task_done()


threading.Thread(target=worker, daemon=True).start()
# --- send helpers --- | |
def send_message_to_chat(to_number, message, retries=3):
    """Send a plain (non-quoted) text message via the Green API.

    Args:
        to_number: Destination chat JID (user or group).
        message:   Text body to send.
        retries:   Number of attempts before giving up.

    Returns:
        Parsed JSON response dict on success, or {"error": ...} after
        the final failed attempt.
    """
    # NOTE(review): the original conditional assigned to_number in both
    # branches, so the @g.us group check was a no-op; kept as a plain bind.
    chat_id = to_number
    url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}"
    payload = {"chatId": chat_id, "message": message}
    for attempt in range(retries):
        try:
            # Timeout added so a hung gateway cannot stall the caller forever.
            r = requests.post(url, json=payload, timeout=30)
            r.raise_for_status()
            return r.json()
        except requests.RequestException as e:
            if attempt == retries - 1:
                logging.error("send_message_to_chat failed: %s", str(e))
                return {"error": str(e)}
def send_message(message_id, to_number, message, retries=3):
    """Send a text message quoting `message_id` via the Green API.

    Args:
        message_id: Id of the message being replied to (quotedMessageId).
        to_number:  Destination chat JID (user or group).
        message:    Text body to send.
        retries:    Number of attempts before giving up.

    Returns:
        Parsed JSON response dict on success, or {"error": ...} after
        the final failed attempt.
    """
    # NOTE(review): the original conditional assigned to_number in both
    # branches, so the @g.us group check was a no-op; kept as a plain bind.
    chat_id = to_number
    url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}"
    payload = {"chatId": chat_id, "message": message, "quotedMessageId": message_id}
    for attempt in range(retries):
        try:
            # Timeout added so a hung gateway cannot stall the caller forever.
            r = requests.post(url, json=payload, timeout=30)
            r.raise_for_status()
            return r.json()
        except requests.RequestException as e:
            if attempt == retries - 1:
                # Log the final failure for parity with send_message_to_chat.
                logging.error("send_message failed: %s", str(e))
                return {"error": str(e)}
def send_image(message_id, to_number, image_path, caption="Here you go!", retries=3):
    """Upload an image file as a quoted reply via sendFileByUpload.

    The file is reopened on every attempt: the original opened it once
    outside the retry loop, so a retry re-posted an already-consumed
    stream (an empty upload) and the handle was never closed.

    Returns:
        Parsed JSON response dict on success, or {"error": ...} after
        the final failed attempt.
    """
    # NOTE(review): the original conditional assigned to_number in both
    # branches, so the @g.us group check was a no-op; kept as a plain bind.
    chat_id = to_number
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {"chatId": chat_id, "caption": caption, "quotedMessageId": message_id}
    for attempt in range(retries):
        try:
            with open(image_path, "rb") as fh:
                files = [("file", ("image.jpg", fh, "image/jpeg"))]
                r = requests.post(url, data=payload, files=files, timeout=60)
            r.raise_for_status()
            return r.json()
        except (requests.RequestException, OSError) as e:
            if attempt == retries - 1:
                return {"error": str(e)}
def send_audio(message_id, to_number, audio_path, retries=3):
    """Upload an MP3 voice reply as a quoted reply via sendFileByUpload.

    Fixes over the original: a missing file now returns an error dict
    immediately (previously it was only logged and then crashed into the
    generic handler), and the file is reopened per attempt so retries do
    not re-post an already-consumed stream.

    Returns:
        Parsed JSON response dict on success, or {"error": ...} on failure.
    """
    logging.debug("send_audio")
    # NOTE(review): the original conditional assigned to_number in both
    # branches, so the @g.us group check was a no-op; kept as a plain bind.
    chat_id = to_number
    if not os.path.exists(audio_path):
        logging.debug(f"Missing audio: {audio_path}")
        return {"error": f"Missing audio: {audio_path}"}
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {"chatId": chat_id, "caption": "Here is your voice reply!", "quotedMessageId": message_id}
    for attempt in range(retries):
        try:
            with open(audio_path, "rb") as fh:
                files = [("file", ("audio.mp3", fh, "audio/mpeg"))]
                r = requests.post(url, data=payload, files=files, timeout=60)
            r.raise_for_status()
            return r.json()
        except (requests.RequestException, OSError) as e:
            if attempt == retries - 1:
                return {"error": str(e)}
# --- core response functions --- | |
def response_text(message_id, chat_id, prompt):
    """Generate an LLM text reply for `prompt` and send it as a quoted message.

    On any failure the exception is logged (the original swallowed it
    silently) and a generic apology is sent instead.
    """
    try:
        msg = generate_llm(prompt)
        send_message(message_id, chat_id, msg)
    except Exception:
        logging.exception("response_text failed")
        send_message(message_id, chat_id, "Error processing your request.")
def response_audio(message_id, chat_id, prompt):
    """Generate and send a voice reply for `prompt`.

    Falls back to a plain text reply when no audio is produced. The
    temporary audio file is now removed in a `finally` block so it is
    cleaned up even if the upload raises (the original leaked it in
    that case).
    """
    logging.debug("response_audio prompt=%s", prompt)
    try:
        result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=audio_dir)
        if result and result[0]:
            audio_path, _ = result
            try:
                send_audio(message_id, chat_id, audio_path)
            finally:
                # Always delete the temp file, success or not.
                if os.path.exists(audio_path):
                    os.remove(audio_path)
        else:
            # No audio produced — answer with plain text instead.
            response_text(message_id, chat_id, prompt)
    except Exception as e:
        logging.debug("audio error: %s", e)
        send_message(message_id, chat_id, "Error generating audio. Try again later.")
def handle_image_generation(message_id, chat_id, prompt):
    """Generate an image for `prompt` and deliver it, retrying up to 4 times.

    Returns immediately after the first successful send. The original
    loop never exited, so a success re-generated and re-sent the image
    on every remaining iteration, and each failed attempt messaged the
    user; failure feedback is now sent once, after the final attempt.
    (The sparkle emoji in the caption was mojibake in the scraped copy
    and is restored.)
    """
    attempts = 4
    for attempt in range(1, attempts + 1):
        try:
            # NOTE(review): generate_image is given message_id twice —
            # presumably (prompt, request_id, message_id, dir); confirm
            # against the FLUX helper's signature.
            img, path, ret_prompt, url = generate_image(prompt, message_id, message_id, image_dir)
            if img:
                # Italicize each paragraph of the returned prompt for WhatsApp markdown.
                formatted_ret_prompt = "\n\n".join(
                    f"_{paragraph.strip()}_" for paragraph in ret_prompt.split("\n\n") if paragraph.strip()
                )
                send_image(
                    message_id,
                    chat_id,
                    path,
                    caption=f"✨ Image ready: {url}\n>{chr(8203)} {formatted_ret_prompt}"
                )
                return
            logging.warning("generate_image returned no image (attempt %d/%d)", attempt, attempts)
        except Exception as e:
            logging.error("Error in handle_image_generation (attempt %d/%d): %s", attempt, attempts, e)
    # All attempts exhausted — tell the user once.
    send_message(message_id, chat_id, "Error generating image.")
# --- Startup Message --- | |
def send_startup_message():
    """Announce bot availability in the status chat, logging any failure."""
    if not BOT_STATUS_CHAT:
        # Nowhere to announce — record that and bail out early.
        logging.warning("BOT_STATUS_CHAT is not set; startup message not sent.")
        return
    startup_msg = "π Hi! I'm Eve, your friendly AI assistant. I'm now live and ready to help with images, voice replies, and more!"
    resp = send_message_to_chat(BOT_STATUS_CHAT, startup_msg)
    if "error" in resp:
        logging.error("Startup message failed: %s", resp["error"])
# Command reference shown for /help and posted to the status chat on startup.
# NOTE(review): the scraped copy had mojibake bullet/dash characters ("β’",
# "β", "β¦"); restored best-effort to "•", "–", and "…" — confirm against
# the original source.
help_text = (
    "🤖 *Hi there, I'm Eve!* Here are the commands you can use:\n\n"
    "• */help* – _Show this help message._\n"
    "• */summarize <text>* – _Get a quick summary of your text._\n"
    "• */translate <language>|<text>* – _Translate text to your chosen language._\n"
    "• */joke* – _Enjoy a random, funny joke._\n"
    "• */weather <location>* – _Get the current weather for a location._\n"
    "• */inspire* – _Receive a short inspirational quote._\n"
    "• */trivia* – _Start a new trivia question._\n"
    "• */answer [your answer]* – _Reveal the trivia answer or check your answer if provided._\n"
    "• */meme <text>* – _Generate a fun meme image._\n"
    "• */poll <Question>|<Option1>|<Option2>|…* – _Create a poll._\n"
    "• */results* – _See current poll results._\n"
    "• */endpoll* – _End the poll and show final results._\n"
    "• */gen <prompt>* – _Generate an image from your prompt._\n\n"
    "Send any other text and I'll reply with a voice message. I'm here to help, so don't hesitate to ask!"
)
# --- Webhook --- | |
# NOTE(review): no route decorator survives in this copy of the file —
# presumably this handler was registered as something like
# @app.post("/webhook"); confirm against the deployed version.
async def whatsapp_webhook(request: Request):
    """Process one Green API webhook event.

    Flow: record activity for the inactivity monitor, verify the bearer
    token, parse the payload, filter out non-message / quoted / mention
    events, then dispatch on the leading slash-command. Any other text
    falls through to a queued voice reply. Returns {"success": True}
    for every handled (or deliberately ignored) event; raises 403 on
    bad auth and returns 400 on unparseable JSON.
    """
    # Every inbound request counts as activity.
    global last_message_time
    last_message_time = time.time()
    # Bearer-token check against the configured webhook secret.
    auth = request.headers.get("Authorization", "").strip()
    if auth != f"Bearer {WEBHOOK_AUTH_TOKEN}":
        raise HTTPException(403, "Unauthorized")
    try:
        data = await request.json()
        chat_id = data["senderData"]["chatId"]
        print(f"New message from chat ID: {chat_id}")
    except:
        return JSONResponse({"error": "Invalid JSON"}, status_code=400)
    # Ignore every webhook type except newly received messages.
    if data.get("typeWebhook") != "incomingMessageReceived":
        return {"success": True}
    logging.debug("recv: %s", data)
    sd = data["senderData"]
    chat = sd["chatId"]        # chat JID (user or group)
    mid = data["idMessage"]    # used as quotedMessageId in replies
    sender_jid = sd.get("sender")
    md = data.get("messageData", {})
    # Skip quoted messages so the bot does not answer reply-echoes.
    if md.get("typeMessage") == "quotedMessage" or "quotedMessage" in md:
        logging.debug("skip native quotedMessage")
        return {"success": True}
    # Pull the text body out of either the plain or extended text payload.
    if "textMessageData" in md:
        body = md["textMessageData"].get("textMessage", "").strip()
        ctx = md["textMessageData"].get("contextInfo", {})
    elif "extendedTextMessageData" in md:
        body = md["extendedTextMessageData"].get("text", "").strip()
        ctx = md["extendedTextMessageData"].get("contextInfo", {})
    else:
        return {"success": True}
    # Ignore messages that @-mention specific users...
    if ctx.get("mentionedJid") or ctx.get("mentionedJidList"):
        return {"success": True}
    # ...and group-chat texts containing raw @number mentions.
    if chat.endswith("@g.us") and re.search(r"@\d+", body):
        return {"success": True}
    low = body.lower()
    # --- New Commands ---
    if low == "/help":
        send_message(mid, chat, help_text)
        return {"success": True}
    if low.startswith("/summarize "):
        txt = body[len("/summarize "):].strip()
        summary = generate_llm(f"Summarize this text in one short paragraph:\n\n{txt}")
        send_message(mid, chat, summary)
        return {"success": True}
    if low.startswith("/translate "):
        part = body[len("/translate "):]
        if "|" not in part:
            send_message(mid, chat, "Please use `/translate <language>|<text>`")
        else:
            lang, txt = part.split("|", 1)
            resp = generate_llm(f"Translate the following into {lang.strip()}:\n\n{txt.strip()}")
            send_message(mid, chat, resp)
        return {"success": True}
    if low == "/joke":
        # Try the joke API first; fall back to the LLM on any failure.
        try:
            joke = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json()
            send_message(mid, chat, f"{joke['setup']}\n\n{joke['punchline']}")
        except:
            send_message(mid, chat, generate_llm("Tell me a short, funny joke."))
        return {"success": True}
    if low.startswith("/weather "):
        loc = body[len("/weather "):].strip().replace(" ", "+")
        try:
            w = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
            send_message(mid, chat, w)
        except:
            send_message(mid, chat, "Could not fetch weather.")
        return {"success": True}
    if low == "/inspire":
        quote = generate_llm("Give me a short inspirational quote.")
        send_message(mid, chat, f"β¨ {quote}")
        return {"success": True}
    # TRIVIA
    if low == "/trivia":
        # Random seed nudges the LLM away from repeating the same question.
        randomSeed = random.randint(0, 9999999)
        raw = generate_llm(
            (f"Generate a unique and random trivia question and answer based on this random seed {randomSeed} to make it unique in JSON format. "
             "The output should strictly follow this example format without extra text:\n\n"
             "{\"question\": \"What is the capital of France?\", \"answer\": \"Paris\"}")
        )
        def extract_json(text):
            # Strip an optional ```json fenced block around the LLM output.
            import re
            match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
            if match:
                return match.group(1)
            return text
        try:
            json_text = extract_json(raw)
            obj = json.loads(json_text)
            if "question" in obj and "answer" in obj:
                trivia_store[chat] = obj
                send_message(mid, chat, f"β {obj['question']}\nReply with `/answer` followed by your answer (if you want to check it) or just `/answer` to reveal the correct answer.")
            else:
                raise ValueError("Missing expected keys.")
        except Exception as e:
            logging.error("Trivia JSON parse error: %s, raw response: %s", e, raw)
            send_message(mid, chat, "Failed to generate trivia. Please try again.")
        return {"success": True}
    # ANSWER: Accept any message starting with /answer. If additional text is provided, check it.
    if low.startswith("/answer"):
        # Remove command and any extra spaces
        user_response = body[len("/answer"):].strip()
        if chat in trivia_store:
            correct_answer = trivia_store[chat]["answer"]
            question = trivia_store[chat]["question"]
            # If user provided an answer, evaluate it via LLM; otherwise, just reveal the answer.
            if user_response:
                eval_prompt = (
                    f"Question: {question}\n"
                    f"Correct Answer: {correct_answer}\n"
                    f"User Answer: {user_response}\n"
                    "Is the user's answer correct? Respond with 'Correct' if yes, or 'Incorrect' if not, and explain briefly."
                )
                verdict = generate_llm(eval_prompt)
                send_message(mid, chat, f"π‘ {verdict}")
            else:
                send_message(mid, chat, f"π‘ Answer: {correct_answer}")
            # One-shot: drop the stored question either way.
            trivia_store.pop(chat, None)
        else:
            send_message(mid, chat, "No active trivia. Send `/trivia` to start one.")
        return {"success": True}
    if low.startswith("/meme "):
        txt = body[len("/meme "):].strip()
        send_message(mid, chat, "π¨ Generating your meme...")
        # Meme generation reuses the image pipeline via the background worker.
        task_queue.put({
            "type": "image",
            "message_id": mid,
            "chat_id": chat,
            "prompt": f"meme template with text: {txt}"
        })
        return {"success": True}
    if low.startswith("/poll "):
        parts = body[len("/poll "):].split("|")
        if len(parts) < 3:
            send_message(mid, chat, "Please use `/poll Question|Option1|Option2|...`")
        else:
            q = parts[0].strip()
            opts = [p.strip() for p in parts[1:]]
            # Vote tallies are keyed by 1-based option number.
            votes = {i+1: 0 for i in range(len(opts))}
            polls[chat] = {"question": q, "options": opts, "votes": votes, "voters": {}}
            txt = f"π *Poll:* {q}\n" + "\n".join(
                f"{i+1}. {opt}" for i, opt in enumerate(opts)
            ) + "\n\nReply with the *option number* to vote."
            send_message(mid, chat, txt)
        return {"success": True}
    # A bare number in a chat with an active poll is treated as a vote.
    if chat in polls and body.isdigit():
        n = int(body)
        p = polls[chat]
        if 1 <= n <= len(p["options"]):
            # A repeat voter first has their previous vote retracted.
            prev = p["voters"].get(sender_jid)
            if prev:
                p["votes"][prev] -= 1
            p["votes"][n] += 1
            p["voters"][sender_jid] = n
            send_message(mid, chat, f"β Vote recorded: {p['options'][n-1]}")
        return {"success": True}
    if low == "/results":
        if chat in polls:
            p = polls[chat]
            # enumerate over [""] + options yields 1-based numbers; i == 0 is skipped.
            txt = f"π *Results:* {p['question']}\n" + "\n".join(
                f"{i}. {opt}: {p['votes'][i]}" for i, opt in enumerate([""] + p["options"]) if i > 0
            )
            send_message(mid, chat, txt)
        else:
            send_message(mid, chat, "No active poll.")
        return {"success": True}
    if low == "/endpoll":
        if chat in polls:
            # pop() removes the poll so no further votes are accepted.
            p = polls.pop(chat)
            txt = f"π *Final Results:* {p['question']}\n" + "\n".join(
                f"{i}. {opt}: {p['votes'][i]}" for i, opt in enumerate([""] + p["options"]) if i > 0
            )
            send_message(mid, chat, txt)
        else:
            send_message(mid, chat, "No active poll.")
        return {"success": True}
    if low.startswith("/gen"):
        prompt = body[len("/gen"):].strip()
        if not prompt:
            send_message(mid, chat, "Please use `/gen <prompt>` to generate an image.")
        else:
            send_message(mid, chat, "β¨ Your image is being generated. Please wait...")
            task_queue.put({
                "type": "image",
                "message_id": mid,
                "chat_id": chat,
                "prompt": prompt
            })
        return {"success": True}
    # Fallback: voice reply for any other text
    task_queue.put({
        "type": "audio",
        "message_id": mid,
        "chat_id": chat,
        "prompt": body
    })
    return {"success": True}
# NOTE(review): like the webhook handler, this appears to have lost its
# route decorator (presumably @app.get("/")) in this copy — confirm upstream.
def index():
    # Simple liveness/health-check response.
    return "Server is running!"
if __name__ == "__main__":
    # Announce availability in the status chat before starting the server.
    # The original redefined send_startup_message() here, shadowing the
    # module-level function of the same name; the startup logic is inlined
    # instead so there is a single source of truth.
    if BOT_STATUS_CHAT:
        startup_msg = "π Hi! I'm Eve, your friendly AI assistant. I'm now live and ready to help with images, voice replies, and more!"
        send_message_to_chat(BOT_STATUS_CHAT, startup_msg)
        # Also post the command reference so the chat has usage docs.
        send_message_to_chat(BOT_STATUS_CHAT, help_text)
    else:
        logging.warning("BOT_STATUS_CHAT is not set; startup message not sent.")
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)