Spaces:
Running
Running
Chandima Prabhath
Refactor message handling to use _fn_send_text for consistent text and audio responses; update voice reply prompt for a playful tone.
1ef8ab5
import os | |
import threading | |
import requests | |
import logging | |
import queue | |
import json | |
import time | |
import random | |
from concurrent.futures import ThreadPoolExecutor | |
from fastapi import FastAPI, Request, HTTPException | |
from fastapi.responses import PlainTextResponse | |
from FLUX import generate_image | |
from VoiceReply import generate_voice_reply | |
from polLLM import generate_llm | |
# --- Configuration and Client Classes --- | |
class BotConfig:
    """Static bot configuration, read from environment variables at import time."""

    GREEN_API_URL = os.getenv("GREEN_API_URL")
    GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
    GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
    GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
    WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
    # NOTE(review): this literal looks like an e-mail-obfuscation placeholder rather
    # than a real chat id -- confirm the intended group chat id.
    BOT_GROUP_CHAT = "[email protected]"
    BOT_JID = os.getenv("BOT_JID")
    IMAGE_DIR = "/tmp/images"
    AUDIO_DIR = "/tmp/audio"
    DEFAULT_IMAGE_COUNT = 4

    @classmethod
    def validate(cls):
        """Raise ``ValueError`` naming every required setting that is unset or empty.

        Declared as a classmethod because the module calls ``BotConfig.validate()``
        on the class itself, without creating an instance.
        """
        missing = [n for n in (
            "GREEN_API_URL", "GREEN_API_TOKEN",
            "GREEN_API_ID_INSTANCE", "WEBHOOK_AUTH_TOKEN", "BOT_JID"
        ) if not getattr(cls, n)]
        if missing:
            raise ValueError(f"Missing env vars: {', '.join(missing)}")
class BotClient:
    """Small HTTP client for the Green API endpoints this bot uses."""

    def __init__(self, cfg: "BotConfig"):
        """Store the configuration, open a reusable HTTP session and set up logging."""
        self.cfg = cfg
        self.session = requests.Session()
        logging.basicConfig(level=logging.DEBUG,
                            format="%(asctime)s [%(levelname)s] %(message)s")

    def send(self, endpoint, payload, files=None, retries=3, timeout=60):
        """POST ``payload`` to ``endpoint`` and return the decoded JSON response.

        Without ``files`` the payload is sent as a JSON body; with ``files`` it is
        sent as form fields next to the upload. The request is attempted up to
        ``retries`` times, and ``{"error": "failed"}`` is returned when every
        attempt raised a ``requests.RequestException``.

        ``timeout`` (seconds) bounds each attempt so that an unresponsive server
        cannot block the calling thread indefinitely.

        Upload contents should be ``bytes``: a file object is consumed by the
        first attempt and would be sent empty on a retry.
        """
        url = (f"{self.cfg.GREEN_API_URL}/waInstance"
               f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
               f"{self.cfg.GREEN_API_TOKEN}")
        for i in range(1, retries + 1):
            try:
                resp = self.session.post(
                    url,
                    json=payload if files is None else None,
                    data=None if files is None else payload,
                    files=files,
                    timeout=timeout,
                )
                resp.raise_for_status()
                return resp.json()
            except requests.RequestException as e:
                logging.warning(f"{endpoint} attempt {i}/{retries} failed: {e}")
        return {"error": "failed"}

    def send_message(self, message_id, chat_id, text):
        """Send ``text`` to ``chat_id`` as a reply quoting ``message_id``."""
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text,
            "quotedMessageId": message_id
        })

    def send_message_to(self, chat_id, text):
        """Send ``text`` to ``chat_id`` without quoting any message."""
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text
        })

    def send_media(self, message_id, chat_id, file_path, caption, media_type):
        """Upload the file at ``file_path`` to ``chat_id`` quoting ``message_id``.

        ``media_type == "image"`` is sent as ``image/jpeg``; any other value is
        sent as ``audio/mpeg``.
        """
        endpoint = "sendFileByUpload"
        payload = {
            "chatId": chat_id,
            "caption": caption,
            "quotedMessageId": message_id
        }
        mime = "image/jpeg" if media_type == "image" else "audio/mpeg"
        # Read the file up front: bytes can be re-sent on every retry, whereas an
        # open handle would already be at EOF after the first failed attempt.
        with open(file_path, "rb") as f:
            content = f.read()
        files = [("file", (os.path.basename(file_path), content, mime))]
        return self.send(endpoint, payload, files=files)
# Fail fast on missing configuration before building the API client.
BotConfig.validate()
client = BotClient(BotConfig)

# --- Threading & Queues ---
# Pending background jobs (dicts with a "type" key) consumed by worker().
task_queue = queue.Queue()
# Active polls, keyed by chat id.
polls = {}
executor = ThreadPoolExecutor(max_workers=4)
def worker():
    """Process jobs from ``task_queue`` forever.

    "image" jobs go to ``_fn_generate_images`` and "audio" jobs to
    ``_fn_voice_reply``; other types are ignored. Any exception is logged and
    the job is always marked done, so one bad job cannot kill the thread.
    """
    while True:
        task = task_queue.get()
        try:
            kind = task["type"]
            if kind == "image":
                _fn_generate_images(
                    task["message_id"],
                    task["chat_id"],
                    task["prompt"],
                    task.get("num_images", 1),
                )
            elif kind == "audio":
                _fn_voice_reply(task["message_id"], task["chat_id"], task["prompt"])
        except Exception as err:
            logging.error(f"Worker error {task}: {err}")
        finally:
            task_queue.task_done()
# Start four daemon threads that consume task_queue; being daemons, they do not
# keep the process alive on shutdown.
for _ in range(4):
    consumer = threading.Thread(target=worker, daemon=True)
    consumer.start()
# --- Tool Functions --- | |
def _fn_summarize(mid, cid, text):
    """Ask the LLM to summarize ``text`` and reply in chat ``cid`` quoting ``mid``."""
    prompt = f"Summarize:\n\n{text}"
    _fn_send_text(mid, cid, generate_llm(prompt))
def _fn_translate(mid, cid, lang, text):
    """Ask the LLM to translate ``text`` into ``lang`` and reply in chat ``cid``."""
    prompt = f"Translate to {lang}:\n\n{text}"
    _fn_send_text(mid, cid, generate_llm(prompt))
def _fn_joke(mid, cid):
    """Reply with a joke from the public joke API, or an LLM joke if that fails."""
    try:
        j = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json()
        joke = f"{j['setup']}\n\n{j['punchline']}"
    except Exception:
        # Best-effort: any network, decoding or payload-shape problem falls back to
        # the LLM. Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate, and the log records why the API path failed.
        logging.warning("Joke API unavailable; falling back to LLM", exc_info=True)
        joke = generate_llm("Tell me a short joke.")
    _fn_send_text(mid, cid, joke)
def _fn_weather(mid, cid, loc):
    """Fetch a one-line report for ``loc`` from wttr.in and reply with an LLM rewrite in °C.

    ``loc`` comes from chat input (directly or via the LLM router), so it is
    percent-encoded before being placed in the URL path. "+" is kept because the
    slash-command caller already uses it in place of spaces.
    """
    from urllib.parse import quote

    # Encode characters such as "/", "?" and "#" so the location cannot alter the
    # request path or query string.
    safe_loc = quote(str(loc), safe="+,")
    raw = requests.get(f"http://sl.wttr.in/{safe_loc}?format=4", timeout=5).text
    r = generate_llm(f"Give a weather report in °C:\n\n{raw}")
    _fn_send_text(mid, cid, r)
def _fn_inspire(mid, cid):
    """Reply in chat ``cid`` with an LLM-generated inspirational quote."""
    quote_text = generate_llm("Give me a unique, random short inspirational quote.")
    reply = f"✨ {quote_text}"
    _fn_send_text(mid, cid, reply)
def _fn_meme(mid, cid, txt):
    """Acknowledge the request, then queue an image job with a ``meme:`` prompt."""
    client.send_message(mid, cid, "🎨 Generating meme…")
    job = {
        "type": "image",
        "message_id": mid,
        "chat_id": cid,
        "prompt": f"meme: {txt}",
    }
    task_queue.put(job)
def _fn_poll_create(mid, cid, question, options):
    """Start a poll in chat ``cid``, replacing any existing one, and announce it.

    Options are numbered from 1, and the vote tally is keyed by those numbers.
    """
    tally = dict.fromkeys(range(1, len(options) + 1), 0)
    polls[cid] = {
        "question": question,
        "options": options,
        "votes": tally,
        "voters": {},
    }
    numbered = [f"{number}. {label}" for number, label in enumerate(options, start=1)]
    announcement = f"📊 *Poll:* {question}\n" + "\n".join(numbered)
    _fn_send_text(mid, cid, announcement)
def _fn_poll_vote(mid, cid, voter, choice):
    """Record ``voter``'s vote for option number ``choice`` in the poll of chat ``cid``.

    Does nothing when the chat has no poll or ``choice`` is not a valid option
    number. A voter who votes again has the previous vote moved to the new option.
    """
    poll = polls.get(cid)
    if not poll:
        return
    # The LLM router may deliver the choice as a string (e.g. "2"); normalise it
    # so the range check below cannot raise TypeError.
    try:
        choice = int(choice)
    except (TypeError, ValueError):
        return
    if not 1 <= choice <= len(poll["options"]):
        return
    prev = poll["voters"].get(voter)
    if prev is not None:
        poll["votes"][prev] -= 1
    poll["votes"][choice] += 1
    poll["voters"][voter] = choice
    _fn_send_text(mid, cid, f"✅ Voted for {poll['options'][choice-1]}")
def _fn_poll_results(mid, cid):
    """Reply with the current tally of the poll in chat ``cid``, if there is one."""
    poll = polls.get(cid)
    if not poll:
        _fn_send_text(mid, cid, "No active poll.")
        return
    tally = poll["votes"]
    rows = [
        f"{number}. {label}: {tally[number]}"
        for number, label in enumerate(poll["options"], 1)
    ]
    report = f"📊 *Results:* {poll['question']}\n" + "\n".join(rows)
    _fn_send_text(mid, cid, report)
def _fn_poll_end(mid, cid):
    """Remove the poll of chat ``cid`` and reply with its final tally."""
    poll = polls.pop(cid, None)
    if not poll:
        _fn_send_text(mid, cid, "No active poll.")
        return
    tally = poll["votes"]
    rows = [
        f"{number}. {label}: {tally[number]}"
        for number, label in enumerate(poll["options"], 1)
    ]
    report = f"📊 *Final Results:* {poll['question']}\n" + "\n".join(rows)
    _fn_send_text(mid, cid, report)
def _fn_generate_images(mid, cid, prompt, count=1):
    """Generate ``count`` images for ``prompt`` and send each to chat ``cid``.

    Each image is generated, captioned with its URL and the prompt text returned
    by ``generate_image``, uploaded as a reply to ``mid``, and its local file is
    then removed. A failure on one image is reported in the chat and does not
    stop the remaining ones.
    """
    # The LLM router may pass the count as a string; an unusable value falls back
    # to a single image instead of raising before the loop starts.
    try:
        count = int(count)
    except (TypeError, ValueError):
        count = 1
    for i in range(1, count + 1):
        path = None
        try:
            _img, path, ret_p, url = generate_image(prompt, mid, mid, BotConfig.IMAGE_DIR)
            formatted = "\n\n".join(f"_{p.strip()}_" for p in ret_p.split("\n\n") if p.strip())
            # chr(8203) is a zero-width space placed after the ">" marker.
            cap = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
            client.send_media(mid, cid, path, cap, media_type="image")
        except Exception as e:
            logging.warning(f"Img {i}/{count} failed: {e}")
            _fn_send_text(mid, cid, f"😢 Failed to generate image {i}/{count}.")
        finally:
            # Remove the temp file even when captioning or sending failed, and do
            # not report an image as failed just because cleanup did.
            if path:
                try:
                    os.remove(path)
                except OSError as e:
                    logging.warning(f"Could not remove {path}: {e}")
def _fn_send_text(mid, cid, message):
    """Send ``message`` as a text reply, then queue the same content as a voice reply."""
    client.send_message(mid, cid, message)
    voice_job = dict(type="audio", message_id=mid, chat_id=cid, prompt=message)
    task_queue.put(voice_job)
def _fn_voice_reply(mid, cid, prompt):
    """Speak ``prompt`` via the voice backend and send the audio to chat ``cid``.

    If no audio file is produced, the prompt is sent once as plain text instead.
    """
    processed = (
        f"Just say this exactly as written in a flirty, friendly, playful, "
        f"happy and helpful but a little bit clumsy-cute way: {prompt}"
    )
    res = generate_voice_reply(processed, model="openai-audio", voice="coral", audio_dir=BotConfig.AUDIO_DIR)
    if res and res[0]:
        path, _ = res
        try:
            client.send_media(mid, cid, path, "", media_type="audio")
        finally:
            # Always remove the generated file, even if the upload raised.
            os.remove(path)
    else:
        # Text-only fallback. Going through _fn_send_text here would queue another
        # audio job for the same text, so a failing voice backend would resend the
        # message in an endless loop.
        logging.warning("Voice generation failed; sending text only")
        client.send_message(mid, cid, prompt)
# --- Function schema & router --- | |
# Tools the LLM router may call: name -> description (shown to the model) and the
# parameter names expected in the returned JSON. Order is preserved in the prompt.
FUNCTION_SCHEMA = {
    "summarize": {
        "description": "Summarize text",
        "params": ["text"],
    },
    "translate": {
        "description": "Translate text",
        "params": ["lang", "text"],
    },
    "joke": {
        "description": "Tell a joke",
        "params": [],
    },
    "weather": {
        "description": "Weather report",
        "params": ["location"],
    },
    "inspire": {
        "description": "Inspirational quote",
        "params": [],
    },
    "meme": {
        "description": "Generate meme",
        "params": ["text"],
    },
    "poll_create": {
        "description": "Create poll",
        "params": ["question", "options"],
    },
    "poll_vote": {
        "description": "Vote poll",
        "params": ["choice"],
    },
    "poll_results": {
        "description": "Show poll results",
        "params": [],
    },
    "poll_end": {
        "description": "End poll",
        "params": [],
    },
    "generate_image": {
        "description": "Generate images",
        "params": ["prompt", "count"],
    },
    "send_text": {
        "description": "Send plain text",
        "params": ["message"],
    },
}
def route_intent(user_input: str):
    """Ask the LLM which tool to run for ``user_input`` and return its decision.

    Always returns a dict with an ``"action"`` key. When the model's reply is not
    a JSON object, the raw reply is wrapped as a ``send_text`` action so it is
    delivered to the user unchanged.
    """
    sys_prompt = (
        "You are Eve. You can either chat or call one of these functions:\n"
        + "\n".join(f"- {n}: {f['description']}" for n, f in FUNCTION_SCHEMA.items())
        + "\n\nTo call a function, return JSON with \"action\":\"<name>\", plus its parameters.\n"
        "Otherwise return JSON with \"action\":\"send_text\",\"message\":\"...\".\n"
        "Return only raw JSON."
    )
    raw = generate_llm(f"{sys_prompt}\nUser: {user_input}")
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        # ValueError covers malformed JSON; TypeError covers a non-string reply.
        parsed = None
    # Valid JSON that is not an object (a bare string, number or list) cannot carry
    # an action, so it is treated as plain chat text too.
    if isinstance(parsed, dict):
        return parsed
    return {"action": "send_text", "message": raw}
# --- FastAPI & Webhook --- | |
# ASGI application object served by uvicorn in the __main__ guard.
app = FastAPI()
# Reply sent for the /help command: one line per supported command.
help_text = "\n".join((
    "🤖 *Eve* commands:",
    "• /help",
    "• /summarize <text>",
    "• /translate <lang>|<text>",
    "• /joke",
    "• /weather <loc>",
    "• /inspire",
    "• /meme <text>",
    "• /poll <Q>|… / /results / /endpoll",
    "• /gen <prompt>|<count>",
    "Otherwise chat or reply to my message to invoke tools.",
))
# NOTE(review): no route decorator was present on this handler in the reviewed copy,
# which leaves the endpoint unreachable. The "/whatsapp" path is an assumption --
# confirm it matches the webhook URL configured for the Green API instance.
@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
    """Handle an incoming webhook call and run the matching bot command or tool.

    Requires an ``Authorization: Bearer <WEBHOOK_AUTH_TOKEN>`` header and raises
    ``HTTPException(403)`` otherwise. Only ``incomingMessageReceived`` events from
    ``BotConfig.BOT_GROUP_CHAT`` that carry text are processed; everything else is
    acknowledged and ignored. Slash commands are handled directly, and any other
    text is routed through the LLM to one of the tool functions.

    Always returns ``{"success": True}`` for authenticated requests.

    NOTE(review): the tool functions perform blocking network calls inside this
    ``async`` handler; consider moving them off the event loop.
    """
    # Authenticate before parsing, so unauthenticated callers get a 403 and never
    # cause the body to be read.
    if request.headers.get("Authorization") != f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}":
        raise HTTPException(403, "Unauthorized")
    data = await request.json()
    ok = {"success": True}

    # Check the event type first and read nested fields defensively, so a payload
    # without "senderData" is ignored rather than raising KeyError.
    if not isinstance(data, dict) or data.get("typeWebhook") != "incomingMessageReceived":
        return ok
    sender = data.get("senderData") or {}
    chat_id = sender.get("chatId")
    if chat_id != BotConfig.BOT_GROUP_CHAT:
        return ok

    md = data.get("messageData") or {}
    mid = data.get("idMessage")
    tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
    if not tmd:
        return ok
    body = (tmd.get("textMessage") or tmd.get("text") or "").strip()
    ctx = tmd.get("contextInfo") or {}

    # Slash commands
    low = body.lower()
    if low == "/help":
        _fn_send_text(mid, chat_id, help_text)
        return ok
    if low.startswith("/summarize "):
        _fn_summarize(mid, chat_id, body[11:].strip())
        return ok
    if low.startswith("/translate "):
        lang, sep, txt = body[11:].partition("|")
        if not sep:
            # Without the "|" separator there is no text to translate; answer with
            # a usage hint instead of failing on tuple unpacking.
            client.send_message(mid, chat_id, "Usage: /translate <lang>|<text>")
            return ok
        _fn_translate(mid, chat_id, lang.strip(), txt.strip())
        return ok
    if low == "/joke":
        _fn_joke(mid, chat_id)
        return ok
    if low.startswith("/weather "):
        _fn_weather(mid, chat_id, body[9:].strip().replace(" ", "+"))
        return ok
    if low == "/inspire":
        _fn_inspire(mid, chat_id)
        return ok
    if low.startswith("/meme "):
        _fn_meme(mid, chat_id, body[6:].strip())
        return ok
    if low.startswith("/poll "):
        parts = [p.strip() for p in body[6:].split("|")]
        _fn_poll_create(mid, chat_id, parts[0], parts[1:])
        return ok
    # A bare number while a poll is active counts as a vote. isdecimal() only
    # accepts characters that int() can parse.
    if chat_id in polls and low.isdecimal():
        _fn_poll_vote(mid, chat_id, sender.get("sender"), int(low))
        return ok
    if low == "/results":
        _fn_poll_results(mid, chat_id)
        return ok
    if low == "/endpoll":
        _fn_poll_end(mid, chat_id)
        return ok
    if low.startswith("/gen"):
        parts = body[4:].split("|", 1)
        pr = parts[0].strip()
        # Strip the count so "/gen cat | 2" is honoured rather than silently
        # falling back to the default.
        raw_ct = parts[1].strip() if len(parts) > 1 else ""
        ct = int(raw_ct) if raw_ct.isdecimal() else BotConfig.DEFAULT_IMAGE_COUNT
        client.send_message(mid, chat_id, f"✨ Generating {ct} images…")
        task_queue.put({"type": "image", "message_id": mid, "chat_id": chat_id, "prompt": pr, "num_images": ct})
        return ok

    # Skip mentions
    if ctx.get("mentionedJidList"):
        return ok

    # Build effective text (include the quoted message when replying to the bot)
    effective = body
    if md.get("typeMessage") == "quotedMessage":
        ext = md.get("extendedTextMessageData") or {}
        quoted = md.get("quotedMessage") or {}
        if ext.get("participant") == BotConfig.BOT_JID:
            effective = f"Quoted: {quoted.get('textMessage','')}\nUser: {ext.get('text','')}"

    # Route intent across all tools
    intent = route_intent(effective)
    if not isinstance(intent, dict):
        # Guard against a router reply that is not a JSON object.
        intent = {"action": "send_text", "message": str(intent)}
    action = intent.get("action")
    dispatch = {
        "summarize": lambda: _fn_summarize(mid, chat_id, intent["text"]),
        "translate": lambda: _fn_translate(mid, chat_id, intent["lang"], intent["text"]),
        "joke": lambda: _fn_joke(mid, chat_id),
        "weather": lambda: _fn_weather(mid, chat_id, intent["location"]),
        "inspire": lambda: _fn_inspire(mid, chat_id),
        "meme": lambda: _fn_meme(mid, chat_id, intent["text"]),
        "poll_create": lambda: _fn_poll_create(mid, chat_id, intent["question"], intent["options"]),
        "poll_vote": lambda: _fn_poll_vote(mid, chat_id, sender.get("sender"), intent["choice"]),
        "poll_results": lambda: _fn_poll_results(mid, chat_id),
        "poll_end": lambda: _fn_poll_end(mid, chat_id),
        "generate_image": lambda: _fn_generate_images(mid, chat_id, intent["prompt"], intent.get("count", 1)),
        "send_text": lambda: _fn_send_text(mid, chat_id, intent["message"]),
    }
    handler = dispatch.get(action)
    if handler is not None:
        # The model may omit parameters. Check the ones the handler indexes
        # ("count" is optional) so a sloppy reply degrades to the text fallback
        # instead of raising KeyError.
        needed = [p for p in FUNCTION_SCHEMA.get(action, {}).get("params", ()) if p != "count"]
        if all(p in intent for p in needed):
            handler()
            return ok
        logging.warning("Intent %r is missing required parameters: %r", action, intent)
    # fallback to send_text (which also queues voice)
    _fn_send_text(mid, chat_id, intent.get("message") or "Sorry, I didn't get that.")
    return ok
# NOTE(review): no route decorator was present on this handler in the reviewed copy,
# so it was never reachable. Serving it at "/" as plain text is an assumption based
# on the otherwise unused PlainTextResponse import -- confirm.
@app.get("/", response_class=PlainTextResponse)
def index():
    """Return a fixed liveness message."""
    return "Server is running!"
if __name__ == "__main__":
    # Announce start-up in the bot's group chat, then serve the app.
    greeting = "🌟 Eve is online! Type /help to see commands."
    client.send_message_to(BotConfig.BOT_GROUP_CHAT, greeting)

    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)