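"""WhatsApp group assistant ("Eve") built on the Green API and FastAPI.

Incoming group messages are either dispatched to slash commands (summaries,
translations, jokes, weather, trivia, memes, polls, image generation) or
handed to an LLM intent router that decides between a plain text reply and
queued image generation. Image and voice jobs run asynchronously on worker
threads fed by a task queue.
"""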
import os
import threading
import requests
import logging
import queue
import json
import time
import random
from concurrent.futures import ThreadPoolExecutor

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse

from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm
# --- Configuration and Client Classes ---

class BotConfig:
    GREEN_API_URL = os.getenv("GREEN_API_URL")
    GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
    GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
    GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
    WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
    BOT_GROUP_CHAT = "[email protected]"
    BOT_JID = os.getenv("BOT_JID")  # your bot's own WhatsApp JID
    IMAGE_DIR = "/tmp/images"
    AUDIO_DIR = "/tmp/audio"
    DEFAULT_IMAGE_COUNT = 4

    @classmethod
    def validate(cls):
        missing = [
            name for name in (
                "GREEN_API_URL",
                "GREEN_API_TOKEN",
                "GREEN_API_ID_INSTANCE",
                "WEBHOOK_AUTH_TOKEN",
                "BOT_JID",
            ) if not getattr(cls, name)
        ]
        if missing:
            raise ValueError(f"Missing env vars: {', '.join(missing)}")
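# Illustrative environment (placeholder values; real ones come from your
# Green API console and deployment secrets):
#   GREEN_API_URL=https://api.green-api.com
#   GREEN_API_ID_INSTANCE=<instance id>
#   GREEN_API_TOKEN=<api token>
#   WEBHOOK_AUTH_TOKEN=<shared secret sent as "Authorization: Bearer ...">
#   BOT_JID=<the bot's own WhatsApp JID>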
class BotClient:
    def __init__(self, cfg: BotConfig):
        self.cfg = cfg
        self.session = requests.Session()
        logging.basicConfig(level=logging.DEBUG,
                            format="%(asctime)s [%(levelname)s] %(message)s")

    def send(self, endpoint: str, payload: dict, files=None, retries=3):
        url = (f"{self.cfg.GREEN_API_URL}/waInstance"
               f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
               f"{self.cfg.GREEN_API_TOKEN}")
        for attempt in range(1, retries + 1):
            try:
                resp = self.session.post(
                    url,
                    json=payload if files is None else None,
                    data=None if files is None else payload,
                    files=files
                )
                resp.raise_for_status()
                return resp.json()
            except requests.RequestException as e:
                logging.warning(f"Attempt {attempt}/{retries} failed for {endpoint}: {e}")
                if attempt == retries:
                    logging.error(f"{endpoint} ultimately failed: {e}")
                    return {"error": str(e)}

    def send_message(self, message_id: str, chat_id: str, text: str):
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text,
            "quotedMessageId": message_id
        })

    def send_message_to(self, chat_id: str, text: str):
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text
        })

    def send_media(self, message_id: str, chat_id: str, file_path: str,
                   caption: str, media_type: str):
        endpoint = "sendFileByUpload"
        payload = {
            "chatId": chat_id,
            "caption": caption,
            "quotedMessageId": message_id
        }
        with open(file_path, "rb") as f:
            mime = "image/jpeg" if media_type == "image" else "audio/mpeg"
            files = [("file", (os.path.basename(file_path), f, mime))]
            return self.send(endpoint, payload, files=files)
# Validate env
BotConfig.validate()
client = BotClient(BotConfig)

# --- Threading, Queues, Stores ---
task_queue = queue.Queue()
trivia_store = {}
polls = {}
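# In-memory state (lost on restart):
#   trivia_store: chat_id -> {"question": ..., "answer": ...}
#   polls:        chat_id -> {"question", "options", "votes", "voters"}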
last_message_time = time.time()

def inactivity_monitor():
    global last_message_time
    while True:
        time.sleep(60)
        if time.time() - last_message_time >= 300:
            client.send_message_to(
                BotConfig.BOT_GROUP_CHAT,
                "⏰ I haven't heard from you in a while! I'm still here if you need anything."
            )
            last_message_time = time.time()

threading.Thread(target=inactivity_monitor, daemon=True).start()

executor = ThreadPoolExecutor(max_workers=4)  # note: unused in this file; work runs on the queue workers below

def worker():
    while True:
        task = task_queue.get()
        try:
            if task["type"] == "image":
                _fn_generate_images(task["message_id"],
                                    task["chat_id"],
                                    task["prompt"],
                                    task.get("num_images", 1))
            elif task["type"] == "audio":
                _fn_voice_reply(task["message_id"],
                                task["chat_id"],
                                task["prompt"])
        except Exception as e:
            logging.error(f"Worker error {task}: {e}")
        finally:
            task_queue.task_done()

for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()
# --- Primitive “tool” functions ---

def _fn_summarize(message_id, chat_id, text):
    summary = generate_llm(f"Summarize this text in one short paragraph:\n\n{text}")
    client.send_message(message_id, chat_id, summary)

def _fn_translate(message_id, chat_id, lang, text):
    resp = generate_llm(f"Translate the following into {lang}:\n\n{text}")
    client.send_message(message_id, chat_id, resp)

def _fn_joke(message_id, chat_id):
    try:
        j = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json()
        joke = f"{j['setup']}\n\n{j['punchline']}"
    except Exception:
        joke = generate_llm("Tell me a short, funny joke.")
    client.send_message(message_id, chat_id, joke)

def _fn_weather(message_id, chat_id, loc):
    raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
    report = generate_llm(
        f"Convert this weather report into Celsius and craft a short, creative report:\n\n{raw}"
    )
    client.send_message(message_id, chat_id, report)
    task_queue.put({
        "type": "audio", "message_id": message_id, "chat_id": chat_id,
        "prompt": f"Speak only this weather report: {report}"
    })

def _fn_weather_poem(message_id, chat_id, loc):
    raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
    poem = generate_llm(
        f"Write a short, poetic weather summary in Celsius based on:\n\n{raw}"
    )
    client.send_message(message_id, chat_id, poem)
    task_queue.put({
        "type": "audio", "message_id": message_id, "chat_id": chat_id,
        "prompt": f"Speak only this poetic weather summary: {poem}"
    })
def _fn_inspire(message_id, chat_id):
    quote = generate_llm("Give me a short inspirational unique quote.")
    client.send_message(message_id, chat_id, f"✨ {quote}")

def _fn_trivia(message_id, chat_id):
    raw = generate_llm(
        "Generate a unique trivia Q&A in JSON: {\"question\":\"...\",\"answer\":\"...\"}"
    )
    try:
        obj = json.loads(raw.strip().strip("```json").strip("```"))
        trivia_store[chat_id] = obj
        client.send_message(
            message_id, chat_id,
            f"❓ {obj['question']}\nReply `/answer` or `/answer your guess`."
        )
    except Exception:
        client.send_message(message_id, chat_id, "Failed to generate trivia.")

def _fn_answer(message_id, chat_id, guess):
    if chat_id not in trivia_store:
        client.send_message(message_id, chat_id, "No active trivia. `/trivia` to start.")
        return
    qa = trivia_store.pop(chat_id)
    if guess:
        verdict = generate_llm(
            f"Q: {qa['question']}\nCorrect: {qa['answer']}\nUser: {guess}\nCorrect?"
        )
        client.send_message(message_id, chat_id, verdict)
    else:
        client.send_message(message_id, chat_id, f"💡 Answer: {qa['answer']}")

def _fn_meme(message_id, chat_id, txt):
    client.send_message(message_id, chat_id, "🎨 Generating your meme...")
    task_queue.put({"type": "image", "message_id": message_id,
                    "chat_id": chat_id, "prompt": f"meme: {txt}"})

def _fn_poll(message_id, chat_id, question, options):
    votes = {i + 1: 0 for i in range(len(options))}
    polls[chat_id] = {"question": question, "options": options, "votes": votes, "voters": {}}
    text = f"📊 *Poll:* {question}\n" + "\n".join(
        f"{i+1}. {o}" for i, o in enumerate(options)
    )
    client.send_message(message_id, chat_id, text)

def _fn_poll_vote(message_id, chat_id, voter, choice):
    poll = polls.get(chat_id)
    if not poll or choice < 1 or choice > len(poll["options"]):
        return
    prev = poll["voters"].get(voter)
    if prev:
        poll["votes"][prev] -= 1
    poll["votes"][choice] += 1
    poll["voters"][voter] = choice
    client.send_message(message_id, chat_id,
                        f"✅ Voted for {poll['options'][choice-1]}")

def _fn_poll_results(message_id, chat_id):
    poll = polls.get(chat_id)
    if not poll:
        client.send_message(message_id, chat_id, "No active poll.")
        return
    text = f"📊 *Results:* {poll['question']}\n" + "\n".join(
        f"{i}. {o}: {poll['votes'][i]}" for i, o in enumerate(poll["options"], 1)
    )
    client.send_message(message_id, chat_id, text)

def _fn_poll_end(message_id, chat_id):
    poll = polls.pop(chat_id, None)
    if not poll:
        client.send_message(message_id, chat_id, "No active poll.")
        return
    text = f"📊 *Final Results:* {poll['question']}\n" + "\n".join(
        f"{i}. {o}: {poll['votes'][i]}" for i, o in enumerate(poll["options"], 1)
    )
    client.send_message(message_id, chat_id, text)
def _fn_generate_images(message_id, chat_id, prompt, count=1):
    for i in range(1, count + 1):
        try:
            img, path, ret_prompt, url = generate_image(
                prompt, message_id, message_id, BotConfig.IMAGE_DIR
            )
            formatted = "\n\n".join(f"_{p.strip()}_"
                                    for p in ret_prompt.split("\n\n") if p.strip())
            caption = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
            client.send_media(message_id, chat_id, path, caption, media_type="image")
            os.remove(path)
        except Exception as e:
            logging.warning(f"Image {i}/{count} failed: {e}")
            client.send_message(message_id, chat_id,
                                f"😢 Failed to generate image {i}/{count}.")

def _fn_voice_reply(message_id, chat_id, prompt):
    result = generate_voice_reply(prompt,
                                  model="openai-audio",
                                  voice="coral",
                                  audio_dir=BotConfig.AUDIO_DIR)
    if result and result[0]:
        audio_path, _ = result
        client.send_media(message_id, chat_id, audio_path, "", media_type="audio")
        os.remove(audio_path)
    else:
        # fallback to text
        response = generate_llm(prompt)
        client.send_message(message_id, chat_id, response)
# --- Intent router for fallback ---
FUNCTION_SCHEMA = {
    "generate_image": {
        "description": "Generate one or more images",
        "params": ["prompt", "count"]
    },
    "send_text": {
        "description": "Send a plain text response",
        "params": ["message"]
    }
}
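# Note: this schema is never sent as structured tool definitions; route_intent()
# below only interpolates the names and descriptions into its system prompt and
# parses the model's raw JSON reply.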
def route_intent(user_input: str):
    """
    Ask the LLM whether to call a function or just chat.
    Expects a JSON response like:
        {"action":"generate_image","prompt":"a sunset","count":2}
    or
        {"action":"send_text","message":"Here's my reply..."}
    """
    sys_prompt = (
        "You are Eve. You can either chat normally or call one of these functions:\n"
        + "\n".join(f"- {name}: {info['description']}"
                    for name, info in FUNCTION_SCHEMA.items())
        + "\n\nIf the user wants an image generated, return JSON with "
          "\"action\":\"generate_image\",\"prompt\":\"...\",\"count\":<int>.\n"
          "Otherwise return JSON with \"action\":\"send_text\",\"message\":\"...\".\n"
          "Do NOT wrap your response in any extra text—only raw JSON."
    )
    raw = generate_llm(f"{sys_prompt}\nUser: {user_input}")
    try:
        return json.loads(raw)
    except Exception:
        # fallback: treat the entire raw reply as a chat message
        return {"action": "send_text", "message": raw}
# --- FastAPI App & Webhook ---
app = FastAPI()

help_text = (
    "🤖 *Hi, I'm Eve!* Commands:\n"
    "• /help\n"
    "• /summarize <text>\n"
    "• /translate <lang>|<text>\n"
    "• /joke\n"
    "• /weather <loc>\n"
    "• /weatherpoem <loc>\n"
    "• /inspire\n"
    "• /trivia / /answer\n"
    "• /meme <text>\n"
    "• /poll <Q>|<opt1>|… / /results / /endpoll\n"
    "• /gen <prompt>|<count>\n"
    "Otherwise I’ll chat or generate images for you!"
)
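# Abridged sketch of the incoming webhook payload, reconstructed from the
# fields the handler below reads (the real Green API payload carries more):
# {
#   "typeWebhook": "incomingMessageReceived",
#   "idMessage": "...",
#   "senderData": {"chatId": "...@g.us", "sender": "...@c.us"},
#   "messageData": {
#     "typeMessage": "textMessage" or "quotedMessage",
#     "textMessageData": {"textMessage": "..."},
#     "extendedTextMessageData": {"text": "...", "participant": "...", "contextInfo": {...}},
#     "quotedMessage": {"textMessage": "..."}
#   }
# }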
@app.post("/webhook")  # route path assumed here; point the Green API webhook URL at it
async def whatsapp_webhook(request: Request):
    global last_message_time
    last_message_time = time.time()
    # Auth
    if request.headers.get("Authorization") != f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}":
        raise HTTPException(403, "Unauthorized")
    data = await request.json()
    chat_id = data.get("senderData", {}).get("chatId")
    if chat_id != BotConfig.BOT_GROUP_CHAT or data.get("typeWebhook") != "incomingMessageReceived":
        return {"success": True}
    md = data["messageData"]
    mid = data["idMessage"]
    tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
    if not tmd:
        return {"success": True}
    body = tmd.get("textMessage", tmd.get("text", "")).strip()
    ctx = tmd.get("contextInfo", {})

    # 1) Quoted reply to the bot
    if md.get("typeMessage") == "quotedMessage":
        ext = md["extendedTextMessageData"]
        quoted = md["quotedMessage"]
        if ext.get("participant") == BotConfig.BOT_JID:
            user_reply = ext.get("text", "")
            quoted_text = quoted.get("textMessage", "")
            prompt = (
                f"You asked: {quoted_text}\n"
                f"User replied: {user_reply}\n"
                "Provide a helpful follow-up."
            )
            ans = generate_llm(prompt)
            client.send_message(mid, chat_id, ans)
            task_queue.put({
                "type": "audio", "message_id": mid,
                "chat_id": chat_id, "prompt": ans
            })
        return {"success": True}

    # 2) Skip messages that mention other users
    if ctx.get("mentionedJidList"):
        return {"success": True}

    low = body.lower()

    # 3) Slash commands
    if low == "/help":
        client.send_message(mid, chat_id, help_text); return {"success": True}
    if low.startswith("/summarize "):
        _fn_summarize(mid, chat_id, body[11:].strip()); return {"success": True}
    if low.startswith("/translate "):
        if "|" not in body:
            client.send_message(mid, chat_id, "Usage: /translate <lang>|<text>"); return {"success": True}
        lang, txt = body[11:].split("|", 1)
        _fn_translate(mid, chat_id, lang.strip(), txt.strip()); return {"success": True}
    if low == "/joke":
        _fn_joke(mid, chat_id); return {"success": True}
    if low.startswith("/weather "):
        _fn_weather(mid, chat_id, body[9:].strip().replace(" ", "+")); return {"success": True}
    if low.startswith("/weatherpoem "):
        _fn_weather_poem(mid, chat_id, body[13:].strip().replace(" ", "+")); return {"success": True}
    if low == "/inspire":
        _fn_inspire(mid, chat_id); return {"success": True}
    if low == "/trivia":
        _fn_trivia(mid, chat_id); return {"success": True}
    if low.startswith("/answer"):
        _fn_answer(mid, chat_id, body[7:].strip()); return {"success": True}
    if low.startswith("/meme "):
        _fn_meme(mid, chat_id, body[6:].strip()); return {"success": True}
    if low.startswith("/poll "):
        parts = [p.strip() for p in body[6:].split("|")]
        _fn_poll(mid, chat_id, parts[0], parts[1:]); return {"success": True}
    if chat_id in polls and low.isdigit():
        _fn_poll_vote(mid, chat_id, data["senderData"]["sender"], int(low)); return {"success": True}
    if low == "/results":
        _fn_poll_results(mid, chat_id); return {"success": True}
    if low == "/endpoll":
        _fn_poll_end(mid, chat_id); return {"success": True}
    if low.startswith("/gen"):
        parts = body[4:].split("|", 1)
        prompt = parts[0].strip()
        cnt = int(parts[1].strip()) if len(parts) > 1 and parts[1].strip().isdigit() else BotConfig.DEFAULT_IMAGE_COUNT
        client.send_message(mid, chat_id, f"✨ Generating {cnt} image(s)…")
        task_queue.put({"type": "image", "message_id": mid, "chat_id": chat_id,
                        "prompt": prompt, "num_images": cnt})
        return {"success": True}

    # 4) Fallback → function-calling router
    intent = route_intent(body)
    act = intent.get("action")
    if act == "generate_image":
        pr = intent.get("prompt", "")
        ct = intent.get("count", 1)
        client.send_message(mid, chat_id, f"👍 Generating {ct} images for “{pr}”…")
        task_queue.put({"type": "image", "message_id": mid, "chat_id": chat_id,
                        "prompt": pr, "num_images": ct})
    else:
        # send_text or any unknown action
        msg = intent.get("message", "Sorry, I didn't understand.")
        client.send_message(mid, chat_id, msg)
        task_queue.put({"type": "audio", "message_id": mid, "chat_id": chat_id,
                        "prompt": msg})
    return {"success": True}
# Simple health-check route (decorator assumed; PlainTextResponse is otherwise unused)
@app.get("/", response_class=PlainTextResponse)
def index():
    return "Server is running!"

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)