Spaces:
Running
Running
Chandima Prabhath
Add random seed to voice reply generation and enhance image sending function with customizable caption
4beb4cc
import os | |
import threading | |
import requests | |
import logging | |
import queue | |
import re | |
import json | |
from fastapi import FastAPI, Request, HTTPException | |
from fastapi.responses import PlainTextResponse, JSONResponse | |
from FLUX import generate_image | |
from VoiceReply import generate_voice_reply | |
from llm import generate_llm | |
# Configure logging | |
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s") | |
# Env vars | |
GREEN_API_URL = os.getenv("GREEN_API_URL") | |
GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com") | |
GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN") | |
GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE") | |
WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN") | |
image_dir = "/tmp/images" | |
audio_dir = "/tmp/audio" | |
if not all([GREEN_API_URL, GREEN_API_TOKEN, GREEN_API_ID_INSTANCE, WEBHOOK_AUTH_TOKEN]): | |
raise ValueError("Environment variables are not set properly") | |
# Queues & inβmemory stores | |
task_queue = queue.Queue() | |
trivia_store = {} # chat_id β {"question":β¦, "answer":β¦} | |
polls = {} # chat_id β {"question":β¦, "options":[β¦], "votes":{1:0β¦}, "voters":{jid:opt}} | |
app = FastAPI() | |
# Background worker | |
def worker(): | |
while True: | |
task = task_queue.get() | |
try: | |
typ = task["type"] | |
mid = task["message_id"] | |
cid = task["chat_id"] | |
if typ == "image": | |
handle_image_generation(mid, cid, task["prompt"]) | |
elif typ == "audio": | |
response_audio(mid, cid, task["prompt"]) | |
except Exception as e: | |
logging.error(f"Error processing {task}: {e}") | |
finally: | |
task_queue.task_done() | |
threading.Thread(target=worker, daemon=True).start() | |
# --- send helpers --- | |
def send_message(message_id, to_number, message, retries=3): | |
chat_id = to_number if to_number.endswith("@g.us") else to_number | |
url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}" | |
payload = {"chatId": chat_id, "message": message, "quotedMessageId": message_id} | |
for i in range(retries): | |
try: | |
r = requests.post(url, json=payload) | |
r.raise_for_status() | |
return r.json() | |
except requests.RequestException as e: | |
if i == retries-1: | |
return {"error": str(e)} | |
def send_image(message_id, to_number, image_path, caption = "Here you go!", retries=3): | |
chat_id = to_number if to_number.endswith("@g.us") else to_number | |
url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}" | |
payload = {"chatId": chat_id, "caption": caption, "quotedMessageId": message_id} | |
files = [("file", ("image.jpg", open(image_path, "rb"), "image/jpeg"))] | |
for i in range(retries): | |
try: | |
r = requests.post(url, data=payload, files=files) | |
r.raise_for_status() | |
return r.json() | |
except requests.RequestException as e: | |
if i == retries-1: | |
return {"error": str(e)} | |
def send_audio(message_id, to_number, audio_path, retries=3): | |
logging.debug("send_audio") | |
chat_id = to_number if to_number.endswith("@g.us") else to_number | |
if not os.path.exists(audio_path): | |
logging.debug(f"Missing audio: {audio_path}") | |
url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}" | |
payload = {"chatId": chat_id, "caption": "Here is your voice reply!", "quotedMessageId": message_id} | |
try: | |
with open(audio_path, "rb") as f: | |
files = [("file", ("audio.mp3", f, "audio/mpeg"))] | |
for i in range(retries): | |
try: | |
r = requests.post(url, data=payload, files=files) | |
r.raise_for_status() | |
return r.json() | |
except requests.RequestException as e: | |
if i == retries-1: | |
return {"error": str(e)} | |
except Exception as e: | |
return {"error": str(e)} | |
# --- core response fns --- | |
def response_text(message_id, chat_id, prompt): | |
try: | |
msg = generate_llm(prompt) | |
send_message(message_id, chat_id, msg) | |
except: | |
send_message(message_id, chat_id, "Error processing your request.") | |
def response_audio(message_id, chat_id, prompt): | |
logging.debug("response_audio prompt=%s", prompt) | |
try: | |
result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=audio_dir) | |
if result and result[0]: | |
audio_path, _ = result | |
send_audio(message_id, chat_id, audio_path) | |
if os.path.exists(audio_path): | |
os.remove(audio_path) | |
else: | |
response_text(message_id, chat_id, prompt) | |
except Exception as e: | |
logging.debug("audio error: %s", e) | |
send_message(message_id, chat_id, "Error generating audio. Try again later.") | |
def handle_image_generation(message_id, chat_id, prompt): | |
try: | |
img, path, ret_prompt, url = generate_image(prompt, message_id, message_id, image_dir) | |
if img: | |
# Split the ret_prompt into paragraphs and wrap each in underscores for italics. | |
formatted_ret_prompt = "\n\n".join( | |
f"_{paragraph.strip()}_" for paragraph in ret_prompt.split("\n\n") if paragraph.strip() | |
) | |
send_image( | |
message_id, | |
chat_id, | |
path, | |
caption=f"β¨ Image ready: {url}\n>{chr(8203)} {formatted_ret_prompt}" | |
) | |
else: | |
send_message(message_id, chat_id, "Image generation failed.") | |
except Exception as e: | |
logging.error("Error in handle_image_generation: %s", e) | |
send_message(message_id, chat_id, "Error generating image.") | |
# --- webhook --- | |
async def whatsapp_webhook(request: Request): | |
# auth & parse | |
auth = request.headers.get("Authorization", "").strip() | |
if auth != f"Bearer {WEBHOOK_AUTH_TOKEN}": | |
raise HTTPException(403, "Unauthorized") | |
try: | |
data = await request.json() | |
except: | |
return JSONResponse({"error": "Invalid JSON"}, status_code=400) | |
if data.get("typeWebhook") != "incomingMessageReceived": | |
return {"success": True} | |
logging.debug("recv: %s", data) | |
sd = data["senderData"] | |
chat = sd["chatId"] | |
mid = data["idMessage"] | |
sender_jid = sd.get("sender") | |
md = data.get("messageData", {}) | |
# drop any WhatsApp native quotedβmessage event | |
if md.get("typeMessage") == "quotedMessage" or "quotedMessage" in md: | |
logging.debug("skip native quotedMessage") | |
return {"success": True} | |
# extract text + contextInfo | |
if "textMessageData" in md: | |
body = md["textMessageData"].get("textMessage","").strip() | |
ctx = md["textMessageData"].get("contextInfo",{}) | |
elif "extendedTextMessageData" in md: | |
body = md["extendedTextMessageData"].get("text","").strip() | |
ctx = md["extendedTextMessageData"].get("contextInfo",{}) | |
else: | |
return {"success": True} | |
# ignore native mentions & plain @123 | |
if ctx.get("mentionedJid") or ctx.get("mentionedJidList"): | |
return {"success": True} | |
if chat.endswith("@g.us") and re.search(r"@\d+", body): | |
return {"success": True} | |
# βββ NEW COMMANDS βββ | |
low = body.lower() | |
# HELP | |
if low == "/help": | |
help_text = ( | |
"π€ *Commands*: \n" | |
"/help\n" | |
"/summarize <text>\n" | |
"/translate <lang>|<text>\n" | |
"/joke\n" | |
"/weather <location>\n" | |
"/inspire\n" | |
"/trivia β new trivia\n" | |
"/answer β reveal answer\n" | |
"/meme <text>\n" | |
"/poll <Q>|<opt1>|<opt2>|β¦\n" | |
"/results\n" | |
"/endpoll\n" | |
"/imagine <prompt>\n" | |
"Or just send any text and Iβll reply by voice!" | |
) | |
send_message(mid, chat, help_text) | |
return {"success": True} | |
# SUMMARIZE | |
if low.startswith("/summarize "): | |
txt = body[len("/summarize "):].strip() | |
summary = generate_llm(f"Summarize this text in one short paragraph:\n\n{txt}") | |
send_message(mid, chat, summary) | |
return {"success": True} | |
# TRANSLATE | |
if low.startswith("/translate "): | |
part = body[len("/translate "):] | |
if "|" not in part: | |
send_message(mid, chat, "Use `/translate Language|Text`") | |
else: | |
lang, txt = part.split("|",1) | |
resp = generate_llm(f"Translate the following into {lang.strip()}:\n\n{txt.strip()}") | |
send_message(mid, chat, resp) | |
return {"success": True} | |
# JOKE | |
if low == "/joke": | |
try: | |
joke = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json() | |
send_message(mid, chat, f"{joke['setup']}\n\n{joke['punchline']}") | |
except: | |
send_message(mid, chat, generate_llm("Tell me a short, funny joke.")) | |
return {"success": True} | |
# WEATHER | |
if low.startswith("/weather "): | |
loc = body[len("/weather "):].strip().replace(" ", "+") | |
try: | |
w = requests.get(f"http://wttr.in/{loc}?format=3", timeout=5).text | |
send_message(mid, chat, w) | |
except: | |
send_message(mid, chat, "Could not fetch weather.") | |
return {"success": True} | |
# INSPIRE | |
if low == "/inspire": | |
quote = generate_llm("Give me a short inspirational quote.") | |
send_message(mid, chat, f"β¨ {quote}") | |
return {"success": True} | |
# TRIVIA | |
if low == "/trivia": | |
raw = generate_llm( | |
"Generate a trivia question and answer in JSON: " | |
"{\"question\":\"...\",\"answer\":\"...\"}" | |
) | |
try: | |
obj = json.loads(raw) | |
trivia_store[chat] = obj | |
send_message(mid, chat, f"β {obj['question']}\nReply `/answer` to see the answer.") | |
except: | |
send_message(mid, chat, "Failed to generate trivia.") | |
return {"success": True} | |
# ANSWER | |
if low == "/answer": | |
if chat in trivia_store: | |
ans = trivia_store.pop(chat)["answer"] | |
send_message(mid, chat, f"π‘ Answer: {ans}") | |
else: | |
send_message(mid, chat, "No active trivia. Send `/trivia`.") | |
return {"success": True} | |
# MEME | |
if low.startswith("/meme "): | |
txt = body[len("/meme "):].strip() | |
send_message(mid, chat, "π¨ Generating your meme...") | |
task_queue.put({ | |
"type": "image", | |
"message_id": mid, | |
"chat_id": chat, | |
"prompt": f"meme template with text: {txt}" | |
}) | |
return {"success": True} | |
# POLL | |
if low.startswith("/poll "): | |
parts = body[len("/poll "):].split("|") | |
if len(parts) < 3: | |
send_message(mid, chat, "Use `/poll Question|Option1|Option2[...]`") | |
else: | |
q = parts[0].strip() | |
opts = [p.strip() for p in parts[1:]] | |
votes = {i+1: 0 for i in range(len(opts))} | |
polls[chat] = {"question": q, "options": opts, "votes": votes, "voters": {}} | |
txt = f"π *Poll:* {q}\n" + "\n".join( | |
f"{i+1}. {opt}" for i,opt in enumerate(opts) | |
) + "\n\nReply with the *option number* to vote." | |
send_message(mid, chat, txt) | |
return {"success": True} | |
# VOTE in poll | |
if chat in polls and body.isdigit(): | |
n = int(body) | |
p = polls[chat] | |
if 1 <= n <= len(p["options"]): | |
prev = p["voters"].get(sender_jid) | |
if prev: | |
p["votes"][prev] -= 1 | |
p["votes"][n] += 1 | |
p["voters"][sender_jid] = n | |
send_message(mid, chat, f"β Vote recorded: {p['options'][n-1]}") | |
return {"success": True} | |
# POLL RESULTS | |
if low == "/results": | |
if chat in polls: | |
p = polls[chat] | |
txt = f"π *Results:* {p['question']}\n" + "\n".join( | |
f"{i}. {opt}: {p['votes'][i]}" for i,opt in enumerate([""]+p["options"]) if i>0 | |
) | |
send_message(mid, chat, txt) | |
else: | |
send_message(mid, chat, "No active poll.") | |
return {"success": True} | |
# END POLL | |
if low == "/endpoll": | |
if chat in polls: | |
p = polls.pop(chat) | |
txt = f"π *Final Results:* {p['question']}\n" + "\n".join( | |
f"{i}. {opt}: {p['votes'][i]}" for i,opt in enumerate([""]+p["options"]) if i>0 | |
) | |
send_message(mid, chat, txt) | |
else: | |
send_message(mid, chat, "No active poll.") | |
return {"success": True} | |
# IMAGINE (existing) | |
if low.startswith("/imagine"): | |
prompt = body[len("/imagine"):].strip() | |
if not prompt: | |
send_message(mid, chat, "Use `/imagine <prompt>`") | |
else: | |
send_message(mid, chat, "β¨ Generating image...") | |
task_queue.put({ | |
"type": "image", | |
"message_id": mid, | |
"chat_id": chat, | |
"prompt": prompt | |
}) | |
return {"success": True} | |
# fallback β voice reply | |
task_queue.put({ | |
"type": "audio", | |
"message_id": mid, | |
"chat_id": chat, | |
"prompt": body | |
}) | |
return {"success": True} | |
def index(): | |
return "Server is running!" | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=7860) | |