# eve / app.py
# Author: Chandima Prabhath
# Last change (commit 4beb4cc): "Add random seed to voice reply generation and
# enhance image sending function with customizable caption" — file size 13.7 kB.
import os
import threading
import requests
import logging
import queue
import re
import json
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse, JSONResponse
from FLUX import generate_image
from VoiceReply import generate_voice_reply
from llm import generate_llm
# Configure logging (DEBUG level: webhook payloads are logged verbatim below)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")
# Env vars — Green API credentials plus the shared webhook auth token.
# GREEN_API_MEDIA_URL has a default; the rest are mandatory (checked below).
GREEN_API_URL = os.getenv("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
# Scratch directories for generated media (ephemeral /tmp storage)
image_dir = "/tmp/images"
audio_dir = "/tmp/audio"
# Fail fast at startup if any mandatory credential is missing
if not all([GREEN_API_URL, GREEN_API_TOKEN, GREEN_API_ID_INSTANCE, WEBHOOK_AUTH_TOKEN]):
    raise ValueError("Environment variables are not set properly")
# Queues & in‑memory stores (process-local; lost on restart)
task_queue = queue.Queue()  # background jobs consumed by worker()
trivia_store = {} # chat_id → {"question":…, "answer":…}
polls = {} # chat_id → {"question":…, "options":[…], "votes":{1:0…}, "voters":{jid:opt}}
app = FastAPI()
# Background worker
def worker():
    """Daemon loop: pull queued jobs and dispatch them to the matching handler.

    Each job is a dict with keys ``type`` ("image" or "audio"),
    ``message_id``, ``chat_id`` and ``prompt``. Unknown types are ignored.
    Errors are logged and never kill the loop.
    """
    while True:
        task = task_queue.get()
        try:
            kind = task["type"]
            msg_id = task["message_id"]
            chat = task["chat_id"]
            if kind == "image":
                handle_image_generation(msg_id, chat, task["prompt"])
            elif kind == "audio":
                response_audio(msg_id, chat, task["prompt"])
        except Exception as e:
            logging.error(f"Error processing {task}: {e}")
        finally:
            # Always mark the job done so queue.join() (if ever used) can't hang.
            task_queue.task_done()

threading.Thread(target=worker, daemon=True).start()
# --- send helpers ---
def send_message(message_id, to_number, message, retries=3):
    """Send a quoted text reply through the Green API ``sendMessage`` endpoint.

    Args:
        message_id: id of the incoming message to quote in the reply.
        to_number: destination chat JID (user or ``@g.us`` group — the API
            accepts both forms unchanged).
        message: text body to send.
        retries: number of POST attempts before giving up.

    Returns:
        The decoded JSON response on success, or ``{"error": ...}`` after the
        final failed attempt.
    """
    # Fix: the original `to_number if to_number.endswith("@g.us") else to_number`
    # was a dead conditional — both branches were identical.
    chat_id = to_number
    url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}"
    payload = {"chatId": chat_id, "message": message, "quotedMessageId": message_id}
    last_error = None
    for attempt in range(retries):
        try:
            # timeout added so a hung connection can't stall the caller forever
            r = requests.post(url, json=payload, timeout=30)
            r.raise_for_status()
            return r.json()
        except requests.RequestException as e:
            last_error = e
    return {"error": str(last_error)}
def send_image(message_id, to_number, image_path, caption = "Here you go!", retries=3):
    """Upload an image via Green API ``sendFileByUpload`` and quote *message_id*.

    Args:
        message_id: id of the incoming message to quote.
        to_number: destination chat JID (user or group).
        image_path: local path of the JPEG to upload.
        caption: caption sent with the image.
        retries: number of POST attempts before giving up.

    Returns:
        Decoded JSON response on success, or ``{"error": ...}`` on failure
        (network error after all retries, or unreadable file).
    """
    chat_id = to_number  # dead @g.us conditional removed — both branches were identical
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {"chatId": chat_id, "caption": caption, "quotedMessageId": message_id}
    try:
        # Fix: original leaked the file handle (open() never closed) and reused
        # the exhausted stream on retries, so attempt 2+ uploaded an empty body.
        with open(image_path, "rb") as f:
            for attempt in range(retries):
                f.seek(0)  # rewind so every attempt uploads the full file
                files = [("file", ("image.jpg", f, "image/jpeg"))]
                try:
                    r = requests.post(url, data=payload, files=files, timeout=60)
                    r.raise_for_status()
                    return r.json()
                except requests.RequestException as e:
                    if attempt == retries - 1:
                        return {"error": str(e)}
    except OSError as e:
        return {"error": str(e)}
def send_audio(message_id, to_number, audio_path, retries=3):
    """Upload an MP3 voice reply via Green API ``sendFileByUpload``.

    Args:
        message_id: id of the incoming message to quote.
        to_number: destination chat JID (user or group).
        audio_path: local path of the MP3 file to upload.
        retries: number of POST attempts before giving up.

    Returns:
        Decoded JSON response on success, or ``{"error": ...}`` on failure
        (missing file, unreadable file, or network error after all retries).
    """
    logging.debug("send_audio")
    chat_id = to_number  # dead @g.us conditional removed — both branches were identical
    # Fix: original only logged a missing file, then fell through to open() and
    # failed anyway — return the error explicitly instead.
    if not os.path.exists(audio_path):
        logging.debug(f"Missing audio: {audio_path}")
        return {"error": f"audio file not found: {audio_path}"}
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {"chatId": chat_id, "caption": "Here is your voice reply!", "quotedMessageId": message_id}
    try:
        with open(audio_path, "rb") as f:
            for attempt in range(retries):
                f.seek(0)  # rewind: a retry must re-upload the full file, not an exhausted stream
                files = [("file", ("audio.mp3", f, "audio/mpeg"))]
                try:
                    r = requests.post(url, data=payload, files=files, timeout=60)
                    r.raise_for_status()
                    return r.json()
                except requests.RequestException as e:
                    if attempt == retries - 1:
                        return {"error": str(e)}
    except Exception as e:
        return {"error": str(e)}
# --- core response fns ---
def response_text(message_id, chat_id, prompt):
    """Answer *prompt* with LLM-generated text, quoting the original message.

    On any failure a generic error notice is sent instead; the exception is
    logged rather than silently discarded.
    """
    try:
        msg = generate_llm(prompt)
        send_message(message_id, chat_id, msg)
    # Fix: bare `except:` also trapped KeyboardInterrupt/SystemExit and hid the
    # cause — narrow to Exception and log it.
    except Exception:
        logging.exception("response_text failed for chat %s", chat_id)
        send_message(message_id, chat_id, "Error processing your request.")
def response_audio(message_id, chat_id, prompt):
    """Reply to *prompt* with generated speech; fall back to text when the
    voice pipeline produces nothing, and to an error message on exceptions.

    The temporary audio file is deleted after sending.
    """
    logging.debug("response_audio prompt=%s", prompt)
    try:
        result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=audio_dir)
        # Guard clause: no usable audio → answer with plain text instead.
        if not (result and result[0]):
            response_text(message_id, chat_id, prompt)
            return
        audio_path = result[0]
        send_audio(message_id, chat_id, audio_path)
        # Clean up the temp file once it has been uploaded.
        if os.path.exists(audio_path):
            os.remove(audio_path)
    except Exception as e:
        logging.debug("audio error: %s", e)
        send_message(message_id, chat_id, "Error generating audio. Try again later.")
def handle_image_generation(message_id, chat_id, prompt):
    """Generate an image for *prompt* and send it with an italicised caption.

    The returned prompt text is split into paragraphs, each wrapped in
    underscores (WhatsApp italics), then appended to the caption after the
    image URL. Failures are reported back to the chat.
    """
    try:
        # NOTE(review): message_id is passed twice here — looks intentional
        # upstream, but verify generate_image's signature.
        img, path, ret_prompt, url = generate_image(prompt, message_id, message_id, image_dir)
        if not img:
            send_message(message_id, chat_id, "Image generation failed.")
            return
        # Wrap each non-empty paragraph in underscores for WhatsApp italics.
        paragraphs = [p.strip() for p in ret_prompt.split("\n\n") if p.strip()]
        formatted_ret_prompt = "\n\n".join(f"_{p}_" for p in paragraphs)
        send_image(
            message_id,
            chat_id,
            path,
            caption=f"✨ Image ready: {url}\n>{chr(8203)} {formatted_ret_prompt}"
        )
    except Exception as e:
        logging.error("Error in handle_image_generation: %s", e)
        send_message(message_id, chat_id, "Error generating image.")
# --- webhook ---
@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
    """Green API webhook endpoint.

    Authenticates the request, extracts the text of an incoming WhatsApp
    message, then walks an ordered chain of slash-command handlers. Any
    message that matches no command is queued for a voice reply.

    NOTE: the checks are order-sensitive — e.g. the digit-only poll-vote
    check must stay where it is, between the commands above and ``/results``.
    Returns ``{"success": True}`` for every handled (or ignored) event.
    """
    # auth & parse — the caller must present the shared bearer token
    auth = request.headers.get("Authorization", "").strip()
    if auth != f"Bearer {WEBHOOK_AUTH_TOKEN}":
        raise HTTPException(403, "Unauthorized")
    try:
        data = await request.json()
    except:
        # malformed body → 400 rather than an unhandled 500
        return JSONResponse({"error": "Invalid JSON"}, status_code=400)
    # Only react to freshly received messages; ignore other webhook types.
    if data.get("typeWebhook") != "incomingMessageReceived":
        return {"success": True}
    logging.debug("recv: %s", data)
    sd = data["senderData"]
    chat = sd["chatId"]            # chat JID (user, or group ending @g.us)
    mid = data["idMessage"]        # quoted in every reply we send
    sender_jid = sd.get("sender")  # individual sender (used to dedupe poll votes)
    md = data.get("messageData", {})
    # drop any WhatsApp native quoted‐message event
    if md.get("typeMessage") == "quotedMessage" or "quotedMessage" in md:
        logging.debug("skip native quotedMessage")
        return {"success": True}
    # extract text + contextInfo (plain and "extended" text messages differ in shape)
    if "textMessageData" in md:
        body = md["textMessageData"].get("textMessage","").strip()
        ctx = md["textMessageData"].get("contextInfo",{})
    elif "extendedTextMessageData" in md:
        body = md["extendedTextMessageData"].get("text","").strip()
        ctx = md["extendedTextMessageData"].get("contextInfo",{})
    else:
        # non-text message (media, sticker, …) — nothing to do
        return {"success": True}
    # ignore native mentions & plain @123 patterns in group chats
    if ctx.get("mentionedJid") or ctx.get("mentionedJidList"):
        return {"success": True}
    if chat.endswith("@g.us") and re.search(r"@\d+", body):
        return {"success": True}
    # ——— NEW COMMANDS ———
    low = body.lower()
    # HELP — static command listing
    if low == "/help":
        help_text = (
            "🤖 *Commands*: \n"
            "/help\n"
            "/summarize <text>\n"
            "/translate <lang>|<text>\n"
            "/joke\n"
            "/weather <location>\n"
            "/inspire\n"
            "/trivia → new trivia\n"
            "/answer → reveal answer\n"
            "/meme <text>\n"
            "/poll <Q>|<opt1>|<opt2>|…\n"
            "/results\n"
            "/endpoll\n"
            "/imagine <prompt>\n"
            "Or just send any text and I’ll reply by voice!"
        )
        send_message(mid, chat, help_text)
        return {"success": True}
    # SUMMARIZE — LLM one-paragraph summary (synchronous; blocks the request)
    if low.startswith("/summarize "):
        txt = body[len("/summarize "):].strip()
        summary = generate_llm(f"Summarize this text in one short paragraph:\n\n{txt}")
        send_message(mid, chat, summary)
        return {"success": True}
    # TRANSLATE — "Language|Text" split on the first pipe only
    if low.startswith("/translate "):
        part = body[len("/translate "):]
        if "|" not in part:
            send_message(mid, chat, "Use `/translate Language|Text`")
        else:
            lang, txt = part.split("|",1)
            resp = generate_llm(f"Translate the following into {lang.strip()}:\n\n{txt.strip()}")
            send_message(mid, chat, resp)
        return {"success": True}
    # JOKE — external joke API with an LLM fallback on any failure
    if low == "/joke":
        try:
            joke = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json()
            send_message(mid, chat, f"{joke['setup']}\n\n{joke['punchline']}")
        except:
            send_message(mid, chat, generate_llm("Tell me a short, funny joke."))
        return {"success": True}
    # WEATHER — wttr.in one-line format; spaces become '+' in the URL path
    if low.startswith("/weather "):
        loc = body[len("/weather "):].strip().replace(" ", "+")
        try:
            w = requests.get(f"http://wttr.in/{loc}?format=3", timeout=5).text
            send_message(mid, chat, w)
        except:
            send_message(mid, chat, "Could not fetch weather.")
        return {"success": True}
    # INSPIRE — LLM-generated quote
    if low == "/inspire":
        quote = generate_llm("Give me a short inspirational quote.")
        send_message(mid, chat, f"✨ {quote}")
        return {"success": True}
    # TRIVIA — ask the LLM for JSON; stored per-chat until /answer is sent.
    # NOTE(review): relies on the LLM emitting strict JSON; json.loads failures
    # fall through to the error message.
    if low == "/trivia":
        raw = generate_llm(
            "Generate a trivia question and answer in JSON: "
            "{\"question\":\"...\",\"answer\":\"...\"}"
        )
        try:
            obj = json.loads(raw)
            trivia_store[chat] = obj
            send_message(mid, chat, f"❓ {obj['question']}\nReply `/answer` to see the answer.")
        except:
            send_message(mid, chat, "Failed to generate trivia.")
        return {"success": True}
    # ANSWER — reveal and clear the pending trivia for this chat
    if low == "/answer":
        if chat in trivia_store:
            ans = trivia_store.pop(chat)["answer"]
            send_message(mid, chat, f"💡 Answer: {ans}")
        else:
            send_message(mid, chat, "No active trivia. Send `/trivia`.")
        return {"success": True}
    # MEME — queued as an image job with a meme-template prompt
    if low.startswith("/meme "):
        txt = body[len("/meme "):].strip()
        send_message(mid, chat, "🎨 Generating your meme...")
        task_queue.put({
            "type": "image",
            "message_id": mid,
            "chat_id": chat,
            "prompt": f"meme template with text: {txt}"
        })
        return {"success": True}
    # POLL — "Question|Opt1|Opt2..."; one active poll per chat (overwrites any previous)
    if low.startswith("/poll "):
        parts = body[len("/poll "):].split("|")
        if len(parts) < 3:
            send_message(mid, chat, "Use `/poll Question|Option1|Option2[...]`")
        else:
            q = parts[0].strip()
            opts = [p.strip() for p in parts[1:]]
            votes = {i+1: 0 for i in range(len(opts))}  # 1-based option numbers
            polls[chat] = {"question": q, "options": opts, "votes": votes, "voters": {}}
            txt = f"📊 *Poll:* {q}\n" + "\n".join(
                f"{i+1}. {opt}" for i,opt in enumerate(opts)
            ) + "\n\nReply with the *option number* to vote."
            send_message(mid, chat, txt)
        return {"success": True}
    # VOTE in poll — any digit-only message while a poll is active counts as a
    # vote; a sender's previous vote is moved, not duplicated.
    if chat in polls and body.isdigit():
        n = int(body)
        p = polls[chat]
        if 1 <= n <= len(p["options"]):
            prev = p["voters"].get(sender_jid)
            if prev:
                p["votes"][prev] -= 1
            p["votes"][n] += 1
            p["voters"][sender_jid] = n
            send_message(mid, chat, f"✅ Vote recorded: {p['options'][n-1]}")
        return {"success": True}
    # POLL RESULTS — current tallies without closing the poll.
    # The [""] prefix + `i>0` filter makes enumerate yield 1-based indices
    # matching the 1-based vote keys.
    if low == "/results":
        if chat in polls:
            p = polls[chat]
            txt = f"📊 *Results:* {p['question']}\n" + "\n".join(
                f"{i}. {opt}: {p['votes'][i]}" for i,opt in enumerate([""]+p["options"]) if i>0
            )
            send_message(mid, chat, txt)
        else:
            send_message(mid, chat, "No active poll.")
        return {"success": True}
    # END POLL — final tallies, then the poll is removed
    if low == "/endpoll":
        if chat in polls:
            p = polls.pop(chat)
            txt = f"📊 *Final Results:* {p['question']}\n" + "\n".join(
                f"{i}. {opt}: {p['votes'][i]}" for i,opt in enumerate([""]+p["options"]) if i>0
            )
            send_message(mid, chat, txt)
        else:
            send_message(mid, chat, "No active poll.")
        return {"success": True}
    # IMAGINE (existing) — queue an image-generation job
    if low.startswith("/imagine"):
        prompt = body[len("/imagine"):].strip()
        if not prompt:
            send_message(mid, chat, "Use `/imagine <prompt>`")
        else:
            send_message(mid, chat, "✨ Generating image...")
            task_queue.put({
                "type": "image",
                "message_id": mid,
                "chat_id": chat,
                "prompt": prompt
            })
        return {"success": True}
    # fallback → voice reply for any other text
    task_queue.put({
        "type": "audio",
        "message_id": mid,
        "chat_id": chat,
        "prompt": body
    })
    return {"success": True}
@app.get("/", response_class=PlainTextResponse)
def index():
    """Liveness probe: confirm the service is reachable."""
    status_line = "Server is running!"
    return status_line
if __name__ == "__main__":
    # Dev/standalone entrypoint; port 7860 is the Hugging Face Spaces default.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)