# eve / app.py (Chandima Prabhath)
# Enhance configuration to support function calling for image generation and
# text replies; improve help text for user commands. (commit 3ad83d3)
import os
import threading
import requests
import logging
import queue
import json
import time
import random
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse
from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm
# --- Configuration and Client Classes ---
class BotConfig:
GREEN_API_URL = os.getenv("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
BOT_GROUP_CHAT = "[email protected]"
BOT_JID = os.getenv("BOT_JID") # your bot's own WhatsApp JID
IMAGE_DIR = "/tmp/images"
AUDIO_DIR = "/tmp/audio"
DEFAULT_IMAGE_COUNT = 4
@classmethod
def validate(cls):
missing = [
name for name in (
"GREEN_API_URL",
"GREEN_API_TOKEN",
"GREEN_API_ID_INSTANCE",
"WEBHOOK_AUTH_TOKEN",
"BOT_JID",
) if not getattr(cls, name)
]
if missing:
raise ValueError(f"Missing env vars: {', '.join(missing)}")
class BotClient:
def __init__(self, cfg: BotConfig):
self.cfg = cfg
self.session = requests.Session()
logging.basicConfig(level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s")
def send(self, endpoint: str, payload: dict, files=None, retries=3):
url = (f"{self.cfg.GREEN_API_URL}/waInstance"
f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
f"{self.cfg.GREEN_API_TOKEN}")
for attempt in range(1, retries + 1):
try:
resp = self.session.post(
url,
json=payload if files is None else None,
data=None if files is None else payload,
files=files
)
resp.raise_for_status()
return resp.json()
except requests.RequestException as e:
logging.warning(f"Attempt {attempt}/{retries} failed for {endpoint}: {e}")
if attempt == retries:
logging.error(f"{endpoint} ultimately failed: {e}")
return {"error": str(e)}
def send_message(self, message_id: str, chat_id: str, text: str):
return self.send("sendMessage", {
"chatId": chat_id,
"message": text,
"quotedMessageId": message_id
})
def send_message_to(self, chat_id: str, text: str):
return self.send("sendMessage", {
"chatId": chat_id,
"message": text
})
def send_media(self, message_id: str, chat_id: str, file_path: str,
caption: str, media_type: str):
endpoint = "sendFileByUpload"
payload = {
"chatId": chat_id,
"caption": caption,
"quotedMessageId": message_id
}
with open(file_path, "rb") as f:
mime = "image/jpeg" if media_type == "image" else "audio/mpeg"
files = [("file", (os.path.basename(file_path), f, mime))]
return self.send(endpoint, payload, files=files)
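# BotClient.send() posts to Green API's instance-scoped endpoints. With the
# hypothetical values ID_INSTANCE=1101000001 and TOKEN=abc123, a text send
# would hit roughly:
#
#   POST {GREEN_API_URL}/waInstance1101000001/sendMessage/abc123
#   {"chatId": "<chat id>", "message": "hello", "quotedMessageId": "<id>"}
#
# Media goes through sendFileByUpload, with the payload sent as form fields and
# the file attached as multipart data (see send_media above).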
# Validate env
BotConfig.validate()
client = BotClient(BotConfig)
# --- Threading, Queues, Stores ---
task_queue = queue.Queue()
trivia_store = {}
polls = {}
last_message_time = time.time()
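# In-memory state (per process, not persisted):
#   trivia_store[chat_id] -> {"question": ..., "answer": ...} for the pending trivia round
#   polls[chat_id]        -> {"question", "options", "votes": {1: n, ...}, "voters": {sender: choice}}
#   last_message_time     -> timestamp of the last inbound webhook, used by inactivity_monitor()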
def inactivity_monitor():
global last_message_time
while True:
time.sleep(60)
if time.time() - last_message_time >= 300:
client.send_message_to(
BotConfig.BOT_GROUP_CHAT,
"⏰ I haven't heard from you in a while! I'm still here if you need anything."
)
last_message_time = time.time()
threading.Thread(target=inactivity_monitor, daemon=True).start()
executor = ThreadPoolExecutor(max_workers=4)
def worker():
while True:
task = task_queue.get()
try:
if task["type"] == "image":
_fn_generate_images(task["message_id"],
task["chat_id"],
task["prompt"],
task.get("num_images", 1))
elif task["type"] == "audio":
_fn_voice_reply(task["message_id"],
task["chat_id"],
task["prompt"])
except Exception as e:
logging.error(f"Worker error {task}: {e}")
finally:
task_queue.task_done()
for _ in range(4):
threading.Thread(target=worker, daemon=True).start()
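# Tasks handed to the workers are plain dicts; the shapes the worker() loop
# above understands are:
#   {"type": "image", "message_id": ..., "chat_id": ..., "prompt": ..., "num_images": <int, optional>}
#   {"type": "audio", "message_id": ..., "chat_id": ..., "prompt": ...}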
# --- Primitive “tool” functions ---
def _fn_summarize(message_id, chat_id, text):
summary = generate_llm(f"Summarize this text in one short paragraph:\n\n{text}")
client.send_message(message_id, chat_id, summary)
def _fn_translate(message_id, chat_id, lang, text):
resp = generate_llm(f"Translate the following into {lang}:\n\n{text}")
client.send_message(message_id, chat_id, resp)
def _fn_joke(message_id, chat_id):
try:
j = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json()
joke = f"{j['setup']}\n\n{j['punchline']}"
    except Exception:
        # The joke API failed or returned an unexpected shape; fall back to the LLM.
        joke = generate_llm("Tell me a short, funny joke.")
client.send_message(message_id, chat_id, joke)
def _fn_weather(message_id, chat_id, loc):
raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
report = generate_llm(
f"Convert this weather report into Celsius and craft a short, creative report:\n\n{raw}"
)
client.send_message(message_id, chat_id, report)
task_queue.put({
"type":"audio","message_id":message_id,"chat_id":chat_id,
"prompt":f"Speak only this weather report: {report}"
})
def _fn_weather_poem(message_id, chat_id, loc):
raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
poem = generate_llm(
f"Write a short, poetic weather summary in Celsius based on:\n\n{raw}"
)
client.send_message(message_id, chat_id, poem)
task_queue.put({
"type":"audio","message_id":message_id,"chat_id":chat_id,
"prompt":f"Speak only this poetic weather summary: {poem}"
})
def _fn_inspire(message_id, chat_id):
quote = generate_llm(f"Give me a short inspirational unique quote.")
client.send_message(message_id, chat_id, f"✨ {quote}")
def _fn_trivia(message_id, chat_id):
    raw = generate_llm(
        'Generate a unique trivia Q&A in JSON: {"question":"...","answer":"..."}'
    )
try:
        # Strip any Markdown code-fence characters the model may wrap around the JSON, then parse.
        obj = json.loads(raw.strip().strip("```json").strip("```"))
trivia_store[chat_id] = obj
client.send_message(
message_id, chat_id,
f"❓ {obj['question']}\nReply `/answer` or `/answer your guess`."
)
    except Exception:
client.send_message(message_id, chat_id, "Failed to generate trivia.")
def _fn_answer(message_id, chat_id, guess):
if chat_id not in trivia_store:
client.send_message(message_id, chat_id, "No active trivia. `/trivia` to start.")
return
qa = trivia_store.pop(chat_id)
if guess:
        verdict = generate_llm(
            f"Q: {qa['question']}\nCorrect answer: {qa['answer']}\nUser's answer: {guess}\n"
            "Is the user's answer correct? Reply briefly."
        )
client.send_message(message_id, chat_id, verdict)
else:
client.send_message(message_id, chat_id, f"💡 Answer: {qa['answer']}")
def _fn_meme(message_id, chat_id, txt):
client.send_message(message_id, chat_id, "🎨 Generating your meme...")
task_queue.put({"type":"image","message_id":message_id,
"chat_id":chat_id,"prompt":f"meme: {txt}"})
def _fn_poll(message_id, chat_id, question, options):
votes = {i+1:0 for i in range(len(options))}
polls[chat_id] = {"question":question,"options":options,"votes":votes,"voters":{}}
text = f"📊 *Poll:* {question}\n" + "\n".join(
f"{i+1}. {o}" for i,o in enumerate(options)
)
client.send_message(message_id, chat_id, text)
def _fn_poll_vote(message_id, chat_id, voter, choice):
poll = polls.get(chat_id)
if not poll or choice < 1 or choice > len(poll["options"]):
return
prev = poll["voters"].get(voter)
if prev:
poll["votes"][prev] -= 1
poll["votes"][choice] += 1
poll["voters"][voter] = choice
client.send_message(message_id, chat_id,
f"✅ Voted for {poll['options'][choice-1]}")
def _fn_poll_results(message_id, chat_id):
poll = polls.get(chat_id)
if not poll:
client.send_message(message_id, chat_id, "No active poll.")
return
text = f"📊 *Results:* {poll['question']}\n" + "\n".join(
f"{i}. {o}: {poll['votes'][i]}" for i,o in enumerate(poll["options"],1)
)
client.send_message(message_id, chat_id, text)
def _fn_poll_end(message_id, chat_id):
poll = polls.pop(chat_id, None)
if not poll:
client.send_message(message_id, chat_id, "No active poll.")
return
text = f"📊 *Final Results:* {poll['question']}\n" + "\n".join(
f"{i}. {o}: {poll['votes'][i]}" for i,o in enumerate(poll["options"],1)
)
client.send_message(message_id, chat_id, text)
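# generate_image() (imported from the local FLUX module) is assumed, from how it
# is unpacked below, to return (image object (unused here), saved file path,
# possibly-rewritten prompt, hosted URL).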
def _fn_generate_images(message_id, chat_id, prompt, count=1):
for i in range(1, count+1):
try:
img, path, ret_prompt, url = generate_image(
prompt, message_id, message_id, BotConfig.IMAGE_DIR
)
formatted = "\n\n".join(f"_{p.strip()}_"
for p in ret_prompt.split("\n\n") if p.strip())
caption = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
client.send_media(message_id, chat_id, path, caption, media_type="image")
os.remove(path)
except Exception as e:
logging.warning(f"Image {i}/{count} failed: {e}")
client.send_message(message_id, chat_id,
f"😢 Failed to generate image {i}/{count}.")
def _fn_voice_reply(message_id, chat_id, prompt):
result = generate_voice_reply(prompt,
model="openai-audio",
voice="coral",
audio_dir=BotConfig.AUDIO_DIR)
if result and result[0]:
audio_path, _ = result
client.send_media(message_id, chat_id, audio_path, "", media_type="audio")
os.remove(audio_path)
else:
# fallback to text
response = generate_llm(prompt)
client.send_message(message_id, chat_id, response)
# --- Intent router for fallback ---
FUNCTION_SCHEMA = {
"generate_image": {
"description": "Generate one or more images",
"params": ["prompt","count"]
},
"send_text": {
"description": "Send a plain text response",
"params": ["message"]
}
}
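# Note: FUNCTION_SCHEMA is advisory only. route_intent() below renders the
# descriptions into the system prompt; nothing validates the "params" lists
# against the JSON the model returns.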
def route_intent(user_input: str):
"""
Ask the LLM whether to call a function or just chat.
Expects a JSON response like:
{"action":"generate_image","prompt":"a sunset","count":2}
or
{"action":"send_text","message":"Here's my reply..."}
"""
sys_prompt = (
"You are Eve. You can either chat normally or call one of these functions:\n"
+ "\n".join(f"- {name}: {info['description']}"
for name,info in FUNCTION_SCHEMA.items())
+ "\n\nIf the user wants an image generated, return JSON with "
"\"action\":\"generate_image\",\"prompt\":\"...\",\"count\":<int>.\n"
"Otherwise return JSON with \"action\":\"send_text\",\"message\":\"...\".\n"
"Do NOT wrap your response in any extra text—only raw JSON."
)
raw = generate_llm(f"{sys_prompt}\nUser: {user_input}")
try:
return json.loads(raw)
    except json.JSONDecodeError:
        # Fallback: treat the entire raw response as a plain chat reply.
        return {"action": "send_text", "message": raw}
# --- FastAPI App & Webhook ---
app = FastAPI()
help_text = (
"🤖 *Hi, I'm Eve!* Commands:\n"
"• /help\n"
"• /summarize <text>\n"
"• /translate <lang>|<text>\n"
"• /joke\n"
"• /weather <loc>\n"
"• /weatherpoem <loc>\n"
"• /inspire\n"
"• /trivia / /answer\n"
"• /meme <text>\n"
"• /poll <Q>|<opt1>|… / /results / /endpoll\n"
"• /gen <prompt>|<count>\n"
"Otherwise I’ll chat or generate images for you!"
)
@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
global last_message_time
last_message_time = time.time()
# Auth
if request.headers.get("Authorization") != f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}":
raise HTTPException(403, "Unauthorized")
data = await request.json()
chat_id = data.get("senderData", {}).get("chatId")
if chat_id != BotConfig.BOT_GROUP_CHAT or data.get("typeWebhook") != "incomingMessageReceived":
return {"success": True}
md = data["messageData"]
mid = data["idMessage"]
tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
if not tmd:
return {"success": True}
body = tmd.get("textMessage", tmd.get("text", "")).strip()
ctx = tmd.get("contextInfo", {})
# 1) Quoted‑reply to bot
if md.get("typeMessage") == "quotedMessage":
ext = md["extendedTextMessageData"]
quoted = md["quotedMessage"]
if ext.get("participant") == BotConfig.BOT_JID:
user_reply = ext.get("text", "")
quoted_text = quoted.get("textMessage", "")
prompt = (
f"You asked: {quoted_text}\n"
f"User replied: {user_reply}\n"
"Provide a helpful follow‑up."
)
ans = generate_llm(prompt)
client.send_message(mid, chat_id, ans)
task_queue.put({
"type":"audio","message_id":mid,
"chat_id":chat_id,"prompt":ans
})
return {"success": True}
# 2) Mentions skip
if ctx.get("mentionedJidList"):
return {"success": True}
low = body.lower()
# 3) Slash‑commands
if low == "/help":
client.send_message(mid, chat_id, help_text); return {"success": True}
if low.startswith("/summarize "):
_fn_summarize(mid, chat_id, body[11:].strip()); return {"success": True}
if low.startswith("/translate "):
lang, txt = body[11:].split("|",1)
_fn_translate(mid, chat_id, lang.strip(), txt.strip()); return {"success": True}
if low == "/joke":
_fn_joke(mid, chat_id); return {"success": True}
if low.startswith("/weather "):
_fn_weather(mid, chat_id, body[9:].strip().replace(" ","+")); return {"success": True}
if low.startswith("/weatherpoem "):
_fn_weather_poem(mid, chat_id, body[13:].strip().replace(" ","+")); return {"success": True}
if low == "/inspire":
_fn_inspire(mid, chat_id); return {"success": True}
if low == "/trivia":
_fn_trivia(mid, chat_id); return {"success": True}
if low.startswith("/answer"):
_fn_answer(mid, chat_id, body[7:].strip()); return {"success": True}
if low.startswith("/meme "):
_fn_meme(mid, chat_id, body[6:].strip()); return {"success": True}
if low.startswith("/poll "):
parts = [p.strip() for p in body[6:].split("|")]
_fn_poll(mid, chat_id, parts[0], parts[1:]); return {"success": True}
if chat_id in polls and low.isdigit():
_fn_poll_vote(mid, chat_id,
data["senderData"]["sender"],
int(low)); return {"success": True}
if low == "/results":
_fn_poll_results(mid, chat_id); return {"success": True}
if low == "/endpoll":
_fn_poll_end(mid, chat_id); return {"success": True}
if low.startswith("/gen"):
parts = body[4:].split("|",1)
prompt = parts[0].strip()
        cnt = (int(parts[1].strip())
               if len(parts) > 1 and parts[1].strip().isdigit()
               else BotConfig.DEFAULT_IMAGE_COUNT)
client.send_message(mid, chat_id, f"✨ Generating {cnt} image(s)…")
task_queue.put({"type":"image","message_id":mid,"chat_id":chat_id,
"prompt":prompt,"num_images":cnt})
return {"success": True}
# 4) Fallback → function calling router
intent = route_intent(body)
act = intent.get("action")
if act == "generate_image":
pr = intent.get("prompt","")
ct = intent.get("count",1)
client.send_message(mid, chat_id, f"👍 Generating {ct} images for “{pr}”…")
task_queue.put({"type":"image","message_id":mid,"chat_id":chat_id,
"prompt":pr,"num_images":ct})
else:
# send_text or any unknown
msg = intent.get("message", "Sorry, I didn't understand.")
client.send_message(mid, chat_id, msg)
task_queue.put({"type":"audio","message_id":mid,"chat_id":chat_id,
"prompt":msg})
return {"success": True}
@app.get("/", response_class=PlainTextResponse)
def index():
return "Server is running!"
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)
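# Local smoke test (assumes the env vars above are set). The payload below is a
# trimmed-down guess at Green API's incomingMessageReceived shape, containing
# only the fields this webhook actually reads; senderData.chatId must match
# BotConfig.BOT_GROUP_CHAT or the handler returns early.
#
#   python app.py
#
#   curl -X POST http://localhost:7860/whatsapp \
#     -H "Authorization: Bearer $WEBHOOK_AUTH_TOKEN" \
#     -H "Content-Type: application/json" \
#     -d '{"typeWebhook":"incomingMessageReceived",
#          "idMessage":"TEST1",
#          "senderData":{"chatId":"<BOT_GROUP_CHAT>","sender":"<sender JID>"},
#          "messageData":{"typeMessage":"textMessage",
#                         "textMessageData":{"textMessage":"/help"}}}'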