eve / app.py
Chandima Prabhath
Refactor message handling to use _fn_send_text for consistent text and audio responses; update voice reply prompt for a playful tone.
1ef8ab5
raw
history blame
14.2 kB
import os
import threading
import requests
import logging
import queue
import json
import time
import random
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse
from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm
# --- Configuration and Client Classes ---
class BotConfig:
    """Static bot configuration, read from the environment when the class is defined.

    The class itself (not an instance) is used as the configuration object.
    """

    GREEN_API_URL = os.getenv("GREEN_API_URL")
    GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
    GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
    GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
    WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
    # The only chat the webhook reacts to. Overridable through the
    # BOT_GROUP_CHAT environment variable; the previous hard-coded value is
    # kept as the default so existing deployments behave the same.
    BOT_GROUP_CHAT = os.getenv("BOT_GROUP_CHAT", "[email protected]")
    BOT_JID = os.getenv("BOT_JID")
    IMAGE_DIR = "/tmp/images"
    AUDIO_DIR = "/tmp/audio"
    # Number of images produced by /gen when no count is given.
    DEFAULT_IMAGE_COUNT = 4

    # Settings that have no usable default and must be provided.
    _REQUIRED = (
        "GREEN_API_URL",
        "GREEN_API_TOKEN",
        "GREEN_API_ID_INSTANCE",
        "WEBHOOK_AUTH_TOKEN",
        "BOT_JID",
    )

    @classmethod
    def validate(cls):
        """Raise ``ValueError`` naming every required setting that is unset or empty."""
        missing = [n for n in cls._REQUIRED if not getattr(cls, n, None)]
        if missing:
            raise ValueError(f"Missing env vars: {', '.join(missing)}")
class BotClient:
    """Small HTTP client for the Green API endpoints used by the bot."""

    def __init__(self, cfg: "type[BotConfig]"):
        """Store the configuration class and open a reusable HTTP session.

        Note that this also configures the root logger at DEBUG level.
        """
        self.cfg = cfg
        self.session = requests.Session()
        logging.basicConfig(level=logging.DEBUG,
                            format="%(asctime)s [%(levelname)s] %(message)s")

    @staticmethod
    def _rewind_files(files):
        """Seek every file-like object in a ``files`` argument back to offset 0.

        Accepts the list-of-tuples form used by ``send_media`` as well as a
        mapping; entries without a ``seek`` method (bytes, str) are skipped.
        """
        if not files:
            return
        entries = files.items() if isinstance(files, dict) else files
        for _field, spec in entries:
            handle = spec[1] if isinstance(spec, (tuple, list)) else spec
            seek = getattr(handle, "seek", None)
            if callable(seek):
                try:
                    seek(0)
                except (OSError, ValueError) as e:
                    logging.warning(f"could not rewind upload: {e}")

    def send(self, endpoint, payload, files=None, retries=3, timeout=30):
        """POST ``payload`` to a Green API endpoint, retrying on request errors.

        Args:
            endpoint: API method name placed in the URL (e.g. ``"sendMessage"``).
            payload: Sent as JSON, or as form data when ``files`` is given.
            files: Optional multipart upload specification passed to the session.
            retries: Maximum number of attempts.
            timeout: Seconds to wait per attempt so a stalled connection
                cannot block the caller indefinitely.

        Returns:
            The decoded JSON response, or ``{"error": "failed"}`` when every
            attempt raised a ``requests.RequestException``.
        """
        url = (f"{self.cfg.GREEN_API_URL}/waInstance"
               f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
               f"{self.cfg.GREEN_API_TOKEN}")
        for i in range(1, retries + 1):
            if i > 1:
                # A failed attempt may already have consumed the upload stream;
                # without rewinding, the retry would send an empty file.
                self._rewind_files(files)
            try:
                resp = self.session.post(
                    url,
                    json=payload if files is None else None,
                    data=None if files is None else payload,
                    files=files,
                    timeout=timeout,
                )
                resp.raise_for_status()
                return resp.json()
            except requests.RequestException as e:
                logging.warning(f"{endpoint} attempt {i}/{retries} failed: {e}")
        return {"error": "failed"}

    def send_message(self, message_id, chat_id, text):
        """Send ``text`` to ``chat_id`` as a reply quoting ``message_id``."""
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text,
            "quotedMessageId": message_id
        })

    def send_message_to(self, chat_id, text):
        """Send ``text`` to ``chat_id`` without quoting any message."""
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text
        })

    def send_media(self, message_id, chat_id, file_path, caption, media_type):
        """Upload the file at ``file_path`` as a reply quoting ``message_id``.

        ``media_type`` equal to ``"image"`` is sent as ``image/jpeg``; any
        other value is sent as ``audio/mpeg``.
        """
        endpoint = "sendFileByUpload"
        payload = {
            "chatId": chat_id,
            "caption": caption,
            "quotedMessageId": message_id
        }
        mime = "image/jpeg" if media_type == "image" else "audio/mpeg"
        with open(file_path, "rb") as f:
            files = [("file", (os.path.basename(file_path), f, mime))]
            return self.send(endpoint, payload, files=files)
# Validate env
# Runs at import time: raises ValueError before anything else is set up when
# a required setting is missing.
BotConfig.validate()
# Shared API client; the configuration class itself is passed, not an instance.
client = BotClient(BotConfig)
# --- Threading & Queues ---
# Background jobs ("image" / "audio" task dicts) consumed by worker().
task_queue = queue.Queue()
# Active polls keyed by chat id; at most one poll per chat.
polls = {}
# NOTE(review): not referenced anywhere else in the visible part of this file.
executor = ThreadPoolExecutor(max_workers=4)
def worker():
    """Consume tasks from ``task_queue`` forever.

    "image" tasks are rendered with ``_fn_generate_images`` and "audio" tasks
    with ``_fn_voice_reply``; any other type is ignored. Errors are logged and
    the task is always marked done so the queue bookkeeping stays balanced.
    """
    while True:
        task = task_queue.get()
        try:
            kind = task["type"]
            if kind == "image":
                image_total = task.get("num_images", 1)
                _fn_generate_images(
                    task["message_id"], task["chat_id"], task["prompt"], image_total
                )
            elif kind == "audio":
                _fn_voice_reply(task["message_id"], task["chat_id"], task["prompt"])
        except Exception as e:
            logging.error(f"Worker error {task}: {e}")
        finally:
            task_queue.task_done()
# Start four daemon consumers; being daemons, they never keep the process alive.
for _ in range(4):
    _consumer = threading.Thread(daemon=True, target=worker)
    _consumer.start()
# --- Tool Functions ---
def _fn_summarize(mid, cid, text):
    """Ask the LLM to summarize ``text`` and reply with the result."""
    request = f"Summarize:\n\n{text}"
    _fn_send_text(mid, cid, generate_llm(request))
def _fn_translate(mid, cid, lang, text):
    """Ask the LLM to translate ``text`` into ``lang`` and reply with the result."""
    request = f"Translate to {lang}:\n\n{text}"
    _fn_send_text(mid, cid, generate_llm(request))
def _fn_joke(mid, cid):
    """Reply with a joke from the public joke API, or from the LLM if that fails."""
    try:
        resp = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5)
        resp.raise_for_status()
        j = resp.json()
        joke = f"{j['setup']}\n\n{j['punchline']}"
    except Exception as e:
        # Best effort: any failure (network, bad status, unexpected payload)
        # falls back to the LLM. ``Exception`` rather than a bare ``except`` so
        # KeyboardInterrupt / SystemExit are not swallowed.
        logging.warning(f"Joke API failed, using LLM fallback: {e}")
        joke = generate_llm("Tell me a short joke.")
    _fn_send_text(mid, cid, joke)
def _fn_weather(mid, cid, loc):
    """Fetch a one-line forecast for ``loc`` and reply with an LLM-written report.

    ``loc`` comes from chat text or from the LLM router, so it is escaped
    before being placed in the URL path. If the forecast cannot be fetched a
    short apology is sent instead of letting the error reach the caller.
    """
    from urllib.parse import quote

    # Spaces become "+" (same convention as the /weather command); everything
    # else that is not URL-safe, including "/" and "?", is percent-encoded.
    place = quote(str(loc).strip().replace(" ", "+"), safe="+,")
    try:
        resp = requests.get(f"http://sl.wttr.in/{place}?format=4", timeout=5)
        resp.raise_for_status()
    except requests.RequestException as e:
        logging.warning(f"Weather lookup for {loc!r} failed: {e}")
        _fn_send_text(mid, cid, "😢 Couldn't fetch the weather right now.")
        return
    r = generate_llm(f"Give a weather report in °C:\n\n{resp.text}")
    _fn_send_text(mid, cid, r)
def _fn_inspire(mid, cid):
    """Reply with an LLM-generated inspirational quote, prefixed with a sparkle."""
    quote_text = generate_llm("Give me a unique, random short inspirational quote.")
    reply = f"✨ {quote_text}"
    _fn_send_text(mid, cid, reply)
def _fn_meme(mid, cid, txt):
    """Acknowledge the request, then queue a single-image job for the meme."""
    client.send_message(mid, cid, "🎨 Generating meme…")
    meme_job = dict(type="image", message_id=mid, chat_id=cid, prompt=f"meme: {txt}")
    task_queue.put(meme_job)
def _fn_poll_create(mid, cid, question, options):
    """Start a poll in chat ``cid`` (replacing any existing one) and announce it.

    Options are numbered from 1; each starts with zero votes.
    """
    tally = dict.fromkeys(range(1, len(options) + 1), 0)
    polls[cid] = {
        "question": question,
        "options": options,
        "votes": tally,
        "voters": {},
    }
    numbered = [f"{number}. {label}" for number, label in enumerate(options, 1)]
    announcement = f"📊 *Poll:* {question}\n" + "\n".join(numbered)
    _fn_send_text(mid, cid, announcement)
def _fn_poll_vote(mid, cid, voter, choice):
    """Record ``voter``'s vote for option number ``choice`` in the chat's poll.

    ``choice`` may arrive as an int (digit message) or as whatever the LLM
    router produced (often a string), so it is coerced first. Votes that are
    not a valid option number, or that arrive when no poll is active, are
    ignored silently. A voter who votes again moves their vote.
    """
    poll = polls.get(cid)
    if not poll:
        return
    try:
        choice = int(choice)
    except (TypeError, ValueError):
        return
    if not 1 <= choice <= len(poll["options"]):
        return
    prev = poll["voters"].get(voter)
    if prev is not None:
        # Undo the earlier vote so each voter counts once.
        poll["votes"][prev] -= 1
    poll["votes"][choice] += 1
    poll["voters"][voter] = choice
    _fn_send_text(mid, cid, f"✅ Voted for {poll['options'][choice-1]}")
def _fn_poll_results(mid, cid):
    """Reply with the current tally of the chat's poll, leaving the poll open."""
    current = polls.get(cid)
    if current:
        heading = f"📊 *Results:* {current['question']}\n"
        rows = [
            f"{number}. {label}: {current['votes'][number]}"
            for number, label in enumerate(current["options"], 1)
        ]
        _fn_send_text(mid, cid, heading + "\n".join(rows))
    else:
        _fn_send_text(mid, cid, "No active poll.")
def _fn_poll_end(mid, cid):
    """Close the chat's poll (removing it) and reply with the final tally."""
    finished = polls.pop(cid, None)
    if finished:
        heading = f"📊 *Final Results:* {finished['question']}\n"
        rows = [
            f"{number}. {label}: {finished['votes'][number]}"
            for number, label in enumerate(finished["options"], 1)
        ]
        _fn_send_text(mid, cid, heading + "\n".join(rows))
    else:
        _fn_send_text(mid, cid, "No active poll.")
def _fn_generate_images(mid, cid, prompt, count=1):
    """Generate ``count`` images for ``prompt`` and send each as a captioned reply.

    Each image is handled independently: a failure is logged, reported to the
    chat, and the loop moves on. The temporary image file is removed whether
    or not sending succeeded.

    Args:
        mid: Id of the message being answered (also passed to ``generate_image``).
        cid: Chat to send to.
        prompt: Image prompt.
        count: Number of images; values that cannot be read as an int fall
            back to 1 (the LLM router may supply a string).
    """
    try:
        count = int(count)
    except (TypeError, ValueError):
        count = 1
    for i in range(1, count + 1):
        path = None
        try:
            _img, path, ret_p, url = generate_image(prompt, mid, mid, BotConfig.IMAGE_DIR)
            # Italicise each paragraph of the returned prompt text.
            formatted = "\n\n".join(f"_{p.strip()}_" for p in ret_p.split("\n\n") if p.strip())
            cap = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
            client.send_media(mid, cid, path, cap, media_type="image")
        except Exception as e:
            logging.warning(f"Img {i}/{count} failed: {e}")
            _fn_send_text(mid, cid, f"😢 Failed to generate image {i}/{count}.")
        finally:
            # Clean up even when sending raised, and never report a cleanup
            # problem to the chat as a generation failure.
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                except OSError as e:
                    logging.warning(f"Could not remove {path}: {e}")
def _fn_send_text(mid, cid, message):
    """Reply with ``message`` as text, then queue a spoken version of the same text."""
    client.send_message(mid, cid, message)
    voice_job = dict(type="audio", message_id=mid, chat_id=cid, prompt=message)
    task_queue.put(voice_job)
def _fn_voice_reply(mid, cid, prompt):
    """Synthesize ``prompt`` as speech and send it as an audio reply.

    Audio jobs are queued by ``_fn_send_text`` after the text has already been
    delivered. When synthesis fails this function therefore only logs: calling
    ``_fn_send_text`` again would resend the text and queue another audio job,
    repeating indefinitely while synthesis keeps failing.
    """
    processed = (
        f"Just say this exactly as written in a flirty, friendly, playful, "
        f"happy and helpful but a little bit clumsy-cute way: {prompt}"
    )
    res = generate_voice_reply(processed, model="openai-audio", voice="coral", audio_dir=BotConfig.AUDIO_DIR)
    if not (res and res[0]):
        logging.warning("Voice reply could not be generated; text reply was already sent.")
        return
    path, _ = res
    try:
        client.send_media(mid, cid, path, "", media_type="audio")
    finally:
        # Remove the temporary audio file even if sending raised.
        if os.path.exists(path):
            try:
                os.remove(path)
            except OSError as e:
                logging.warning(f"Could not remove {path}: {e}")
# --- Function schema & router ---
# Tools the LLM router may select: name -> description and parameter names.
# Insertion order is the order in which the tools are listed in the prompt.
FUNCTION_SCHEMA = {
    tool: {"description": summary, "params": list(arguments)}
    for tool, summary, arguments in (
        ("summarize", "Summarize text", ("text",)),
        ("translate", "Translate text", ("lang", "text")),
        ("joke", "Tell a joke", ()),
        ("weather", "Weather report", ("location",)),
        ("inspire", "Inspirational quote", ()),
        ("meme", "Generate meme", ("text",)),
        ("poll_create", "Create poll", ("question", "options")),
        ("poll_vote", "Vote poll", ("choice",)),
        ("poll_results", "Show poll results", ()),
        ("poll_end", "End poll", ()),
        ("generate_image", "Generate images", ("prompt", "count")),
        ("send_text", "Send plain text", ("message",)),
    )
}
def route_intent(user_input: str):
    """Ask the LLM which tool (if any) should handle ``user_input``.

    The prompt lists every entry of ``FUNCTION_SCHEMA`` together with its
    parameter names, so the model knows which JSON keys to produce.

    Returns:
        A dict. On success it is the JSON object the model returned (expected
        to contain ``"action"``). If the reply is not a JSON object, the raw
        reply is wrapped as ``{"action": "send_text", "message": raw}``.
    """
    tool_lines = "\n".join(
        f"- {n}: {f['description']} (parameters: {', '.join(f['params']) or 'none'})"
        for n, f in FUNCTION_SCHEMA.items()
    )
    sys_prompt = (
        "You are Eve. You can either chat or call one of these functions:\n"
        + tool_lines
        + "\n\nTo call a function, return JSON with \"action\":\"<name>\", plus its parameters.\n"
        "Otherwise return JSON with \"action\":\"send_text\",\"message\":\"...\".\n"
        "Return only raw JSON."
    )
    raw = generate_llm(f"{sys_prompt}\nUser: {user_input}")

    candidate = raw.strip() if isinstance(raw, str) else ""
    if candidate.startswith("```"):
        # Models often wrap JSON in a Markdown fence despite the instruction.
        candidate = candidate.strip("`").strip()
        if candidate[:4].lower() == "json":
            candidate = candidate[4:]
    try:
        parsed = json.loads(candidate)
    except (TypeError, ValueError):
        return {"action": "send_text", "message": raw}
    # Valid JSON that is not an object (a bare string, number, list) cannot
    # carry an action; treat it as plain chat text.
    if not isinstance(parsed, dict):
        return {"action": "send_text", "message": raw}
    return parsed
# --- FastAPI & Webhook ---
# ASGI application; the route decorators below register handlers on it and
# the __main__ block serves it with uvicorn.
app = FastAPI()
# Reply sent for the /help command: one line per entry, newline-separated.
help_text = "\n".join([
    "🤖 *Eve* commands:",
    "• /help",
    "• /summarize <text>",
    "• /translate <lang>|<text>",
    "• /joke",
    "• /weather <loc>",
    "• /inspire",
    "• /meme <text>",
    "• /poll <Q>|… / /results / /endpoll",
    "• /gen <prompt>|<count>",
    "Otherwise chat or reply to my message to invoke tools.",
])
def _positive_int(value, default):
    """Return ``value`` as an int >= 1, or ``default`` when it is not one."""
    try:
        number = int(value.strip()) if isinstance(value, str) else int(value)
    except (TypeError, ValueError):
        return default
    return number if number >= 1 else default


def _handle_slash_command(mid, chat_id, sender, body):
    """Run the slash command (or numeric poll vote) in ``body``, if it is one.

    Returns True when the message was handled here, False when it should go
    on to the LLM router.
    """
    low = body.lower()
    if low == "/help":
        _fn_send_text(mid, chat_id, help_text)
    elif low.startswith("/summarize "):
        _fn_summarize(mid, chat_id, body[11:].strip())
    elif low.startswith("/translate "):
        lang, sep, txt = body[11:].partition("|")
        if sep and lang.strip() and txt.strip():
            _fn_translate(mid, chat_id, lang.strip(), txt.strip())
        else:
            # Previously a missing "|" raised while unpacking.
            _fn_send_text(mid, chat_id, "Usage: /translate <lang>|<text>")
    elif low == "/joke":
        _fn_joke(mid, chat_id)
    elif low.startswith("/weather "):
        _fn_weather(mid, chat_id, body[9:].strip().replace(" ", "+"))
    elif low == "/inspire":
        _fn_inspire(mid, chat_id)
    elif low.startswith("/meme "):
        _fn_meme(mid, chat_id, body[6:].strip())
    elif low.startswith("/poll "):
        parts = [p.strip() for p in body[6:].split("|")]
        options = [p for p in parts[1:] if p]
        if parts[0] and len(options) >= 2:
            _fn_poll_create(mid, chat_id, parts[0], options)
        else:
            _fn_send_text(mid, chat_id, "Usage: /poll <question>|<option 1>|<option 2>|…")
    elif chat_id in polls and low.isdecimal():
        # isdecimal (not isdigit) so characters such as "²" cannot reach int().
        _fn_poll_vote(mid, chat_id, sender, int(low))
    elif low == "/results":
        _fn_poll_results(mid, chat_id)
    elif low == "/endpoll":
        _fn_poll_end(mid, chat_id)
    elif low == "/gen" or low.startswith("/gen "):
        # Require the exact command word so e.g. "/general ..." is not a /gen.
        prompt, _sep, raw_count = body[4:].partition("|")
        prompt = prompt.strip()
        if not prompt:
            _fn_send_text(mid, chat_id, "Usage: /gen <prompt>|<count>")
        else:
            ct = _positive_int(raw_count, BotConfig.DEFAULT_IMAGE_COUNT)
            client.send_message(mid, chat_id, f"✨ Generating {ct} images…")
            task_queue.put({"type": "image", "message_id": mid, "chat_id": chat_id,
                            "prompt": prompt, "num_images": ct})
    else:
        return False
    return True


def _dispatch_intent(mid, chat_id, sender, intent):
    """Execute the tool chosen by the LLM router, or fall back to a text reply."""
    fallback = "Sorry, I didn't get that."
    if not isinstance(intent, dict):
        _fn_send_text(mid, chat_id, fallback)
        return
    action = intent.get("action")

    # Keys each action cannot run without ("count" for images is optional).
    required = {
        "summarize": ("text",),
        "translate": ("lang", "text"),
        "weather": ("location",),
        "meme": ("text",),
        "poll_create": ("question", "options"),
        "poll_vote": ("choice",),
        "generate_image": ("prompt",),
        "send_text": ("message",),
    }

    def _queue_images():
        # Queue the job like /gen does instead of rendering inside the
        # request handler.
        count = _positive_int(intent.get("count", 1), 1)
        client.send_message(mid, chat_id, f"✨ Generating {count} images…")
        task_queue.put({"type": "image", "message_id": mid, "chat_id": chat_id,
                        "prompt": intent["prompt"], "num_images": count})

    dispatch = {
        "summarize": lambda: _fn_summarize(mid, chat_id, intent["text"]),
        "translate": lambda: _fn_translate(mid, chat_id, intent["lang"], intent["text"]),
        "joke": lambda: _fn_joke(mid, chat_id),
        "weather": lambda: _fn_weather(mid, chat_id, intent["location"]),
        "inspire": lambda: _fn_inspire(mid, chat_id),
        "meme": lambda: _fn_meme(mid, chat_id, intent["text"]),
        "poll_create": lambda: _fn_poll_create(mid, chat_id, intent["question"], intent["options"]),
        "poll_vote": lambda: _fn_poll_vote(mid, chat_id, sender, _positive_int(intent["choice"], 0)),
        "poll_results": lambda: _fn_poll_results(mid, chat_id),
        "poll_end": lambda: _fn_poll_end(mid, chat_id),
        "generate_image": _queue_images,
        "send_text": lambda: _fn_send_text(mid, chat_id, intent["message"]),
    }

    handler = dispatch.get(action) if isinstance(action, str) else None
    if handler is None:
        # Unknown or missing action: send whatever message the model gave.
        _fn_send_text(mid, chat_id, intent.get("message") or fallback)
        return
    missing = [k for k in required.get(action, ()) if k not in intent]
    bad_options = action == "poll_create" and not isinstance(intent.get("options"), list)
    if missing or bad_options:
        logging.warning(f"Intent {action!r} unusable (missing={missing}): {intent}")
        _fn_send_text(mid, chat_id, fallback)
        return
    handler()


@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
    """Handle a Green API webhook call.

    Only ``incomingMessageReceived`` events with text from the configured
    group chat are acted on; everything else is acknowledged and ignored.
    Slash commands and numeric poll votes are handled directly; other text is
    routed through the LLM, except messages that mention someone.

    Raises:
        HTTPException: 403 when the bearer token is wrong, 400 when the body
            is not valid JSON.
    """
    import hmac

    # Authenticate before reading the body; compare in constant time.
    expected = f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}"
    supplied = request.headers.get("Authorization") or ""
    if not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(403, "Unauthorized")
    try:
        data = await request.json()
    except ValueError:
        raise HTTPException(400, "Invalid JSON") from None

    handled = {"success": True}
    # Check the event type first: payloads of other types are not assumed to
    # carry senderData.
    if not isinstance(data, dict) or data.get("typeWebhook") != "incomingMessageReceived":
        return handled
    sender_data = data.get("senderData") or {}
    chat_id = sender_data.get("chatId")
    if chat_id != BotConfig.BOT_GROUP_CHAT:
        return handled
    sender = sender_data.get("sender")
    md = data.get("messageData") or {}
    mid = data.get("idMessage")
    tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
    if not mid or not tmd:
        return handled
    body = (tmd.get("textMessage") or tmd.get("text") or "").strip()
    ctx = tmd.get("contextInfo") or {}

    # Slash commands
    if _handle_slash_command(mid, chat_id, sender, body):
        return handled

    # Skip mentions
    if ctx.get("mentionedJidList"):
        return handled

    # Build effective text (include the quoted message when replying to the bot)
    effective = body
    if md.get("typeMessage") == "quotedMessage":
        ext = md.get("extendedTextMessageData") or {}
        quoted = md.get("quotedMessage") or {}
        if ext.get("participant") == BotConfig.BOT_JID:
            effective = f"Quoted: {quoted.get('textMessage','')}\nUser: {ext.get('text','')}"

    # NOTE(review): route_intent and the tool functions are synchronous calls
    # made from this coroutine, so they run on the event loop thread.
    _dispatch_intent(mid, chat_id, sender, route_intent(effective))
    return handled
@app.get("/", response_class=PlainTextResponse)
def index():
    """Plain-text status page served at the root path."""
    status_line = "Server is running!"
    return status_line
if __name__ == "__main__":
    # Announce start-up in the group chat, then serve the app on port 7860.
    startup_notice = "🌟 Eve is online! Type /help to see commands."
    client.send_message_to(BotConfig.BOT_GROUP_CHAT, startup_notice)

    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)