# eve / app.py
# Chandima Prabhath
# Refactor intent handling to use Pydantic models for strict parsing; update GenerateImageIntent to inherit from BaseModel and improve route_intent function for better error handling and data extraction.
# Commit: db2f80b
# (File-viewer page chrome, not part of the program: raw / history / blame / 19.5 kB)
import os
import threading
import requests
import logging
import queue
import json
from typing import List, Optional, Union, Literal
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel, Field, ValidationError
from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm
# --- Logging Setup ---------------------------------------------------------
# Log level comes from the environment (default INFO), normalised to upper case.
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()

logger = logging.getLogger("eve_bot")
logger.setLevel(LOG_LEVEL)

# Every record is rendered with the message id and sender it relates to.
formatter = logging.Formatter(
    "%(asctime)s [%(levelname)s] [%(message_id)s/%(sender)s] %(message)s"
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
class ContextFilter(logging.Filter):
    """Ensure every log record has ``message_id`` and ``sender`` attributes.

    The handler's format string references both attributes, so each record
    must carry them. Values are resolved in this order:

    1. an attribute already present on the record (e.g. passed via ``extra``),
    2. the value stored on the module's thread-local context, if any,
    3. the placeholder ``"-"``.
    """

    def filter(self, record):
        """Fill in the context attributes on *record*; never drops a record."""
        # Looked up at call time: the thread-local object is created later in
        # the module than this class, and may be absent entirely.
        ctx = globals().get("_thread_ctx")
        for attr in ("message_id", "sender"):
            value = getattr(record, attr, None)
            if value is None:
                value = getattr(ctx, attr, None)
            setattr(record, attr, "-" if value is None else value)
        return True
# The filter supplies the message_id/sender attributes the formatter expects.
handler.addFilter(ContextFilter())
# Replace (rather than extend) the logger's handlers with the single handler above.
logger.handlers = [handler]
# Thread-local storage carrying the current chat id, sender and message id
# through helper functions without passing them explicitly.
_thread_ctx = threading.local()
def set_thread_context(chat_id, sender, message_id):
    """Store the given chat id, sender and message id on the calling thread."""
    fields = {"chat_id": chat_id, "sender": sender, "message_id": message_id}
    for name, value in fields.items():
        setattr(_thread_ctx, name, value)
def get_thread_context():
    """Return ``(chat_id, sender, message_id)`` for the calling thread.

    Any value that has not been set on this thread is returned as ``None``.
    """
    names = ("chat_id", "sender", "message_id")
    return tuple(getattr(_thread_ctx, name, None) for name in names)
# --- Conversation History -------------------------------------------------
# Per-(chat_id, sender) rolling transcript; each deque keeps at most the 20
# most recent entries.
history = defaultdict(lambda: deque(maxlen=20))
def record_user_message(chat_id, sender, message):
    """Append *message* to the (chat, sender) transcript, tagged as user text."""
    transcript = history[(chat_id, sender)]
    entry = f"User: {message}"
    transcript.append(entry)
def record_bot_message(chat_id, sender, message):
    """Append *message* to the (chat, sender) transcript, tagged as assistant text."""
    transcript = history[(chat_id, sender)]
    entry = f"Assistant: {message}"
    transcript.append(entry)
def get_history_text(chat_id, sender):
    """Return the (chat, sender) transcript as newline-separated text.

    Looking up an unseen key creates an empty transcript for it, because
    ``history`` is a ``defaultdict``.
    """
    entries = history[(chat_id, sender)]
    return "\n".join(entries)
# --- Bot Config & Client --------------------------------------------------
class BotConfig:
    """Static bot settings; environment-backed values are read when the class is defined."""

    GREEN_API_URL = os.environ.get("GREEN_API_URL")
    GREEN_API_MEDIA_URL = os.environ.get("GREEN_API_MEDIA_URL", "https://api.green-api.com")
    GREEN_API_TOKEN = os.environ.get("GREEN_API_TOKEN")
    GREEN_API_ID_INSTANCE = os.environ.get("GREEN_API_ID_INSTANCE")
    WEBHOOK_AUTH_TOKEN = os.environ.get("WEBHOOK_AUTH_TOKEN")
    # NOTE(review): this literal looks like an e-mail-obfuscation placeholder
    # rather than a real chat id -- confirm the intended group chat id.
    BOT_GROUP_CHAT = "[email protected]"
    BOT_JID = os.environ.get("BOT_JID")
    IMAGE_DIR = "/tmp/images"
    AUDIO_DIR = "/tmp/audio"
    DEFAULT_IMAGE_COUNT = 4

    @classmethod
    def validate(cls):
        """Raise ``ValueError`` listing every required setting that is unset or empty."""
        required = (
            "GREEN_API_URL",
            "GREEN_API_TOKEN",
            "GREEN_API_ID_INSTANCE",
            "WEBHOOK_AUTH_TOKEN",
            "BOT_JID",
        )
        missing = ", ".join(name for name in required if not getattr(cls, name))
        if missing:
            raise ValueError(f"Missing env vars: {missing}")
class BotClient:
    """HTTP client for the Green API endpoints used by the bot.

    All requests go through one ``requests.Session`` and are POSTed to
    ``{GREEN_API_URL}/waInstance{ID}/{endpoint}/{TOKEN}``.
    """

    def __init__(self, cfg: BotConfig):
        self.cfg = cfg
        self.session = requests.Session()

    @staticmethod
    def _upload_positions(files):
        """Return ``(file_object, offset)`` for each seekable upload in *files*.

        *files* may be ``None``, a dict, or a list of ``(field, spec)`` pairs
        where ``spec`` is a file object or a ``(name, fileobj, ...)`` tuple.
        """
        if not files:
            return []
        specs = files.values() if isinstance(files, dict) else [spec for _, spec in files]
        positions = []
        for spec in specs:
            fobj = spec[1] if isinstance(spec, (tuple, list)) else spec
            try:
                positions.append((fobj, fobj.tell()))
            except (AttributeError, OSError, ValueError):
                # Not a seekable file object (e.g. bytes/str content): nothing to rewind.
                continue
        return positions

    def send(self, endpoint, payload, files=None, retries=3, timeout=30):
        """POST *payload* to *endpoint*, retrying on request errors.

        Args:
            endpoint: API method name placed in the URL (e.g. ``"sendMessage"``).
            payload: Sent as JSON, or as form data when *files* is given.
            files: Optional multipart upload spec passed to ``requests``.
            retries: Maximum number of attempts.
            timeout: Per-request timeout in seconds, so a stalled connection
                cannot block the caller indefinitely.

        Returns:
            The decoded JSON response, or ``{"error": "failed"}`` once every
            attempt has raised a ``requests.RequestException``.
        """
        url = (
            f"{self.cfg.GREEN_API_URL}/waInstance"
            f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
            f"{self.cfg.GREEN_API_TOKEN}"
        )
        uploads = self._upload_positions(files)
        for attempt in range(1, retries + 1):
            if attempt > 1:
                # The previous attempt read the upload to EOF; without a
                # rewind the retry would send an empty file.
                for fobj, offset in uploads:
                    fobj.seek(offset)
            try:
                resp = self.session.post(
                    url,
                    json=payload if files is None else None,
                    data=None if files is None else payload,
                    files=files,
                    timeout=timeout,
                )
                resp.raise_for_status()
                return resp.json()
            except requests.RequestException as e:
                # NOTE(review): the exception text may include the request URL,
                # which embeds the API token -- consider redacting before logging.
                logger.warning(f"{endpoint} attempt {attempt}/{retries} failed: {e}")
        return {"error": "failed"}

    def send_message(self, message_id, chat_id, text):
        """Send *text* to *chat_id* as a reply quoting *message_id*."""
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text,
            "quotedMessageId": message_id
        })

    def send_message_to(self, chat_id, text):
        """Send *text* to *chat_id* without quoting any message."""
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text
        })

    def send_media(self, message_id, chat_id, file_path, caption, media_type):
        """Upload the file at *file_path* to *chat_id*, quoting *message_id*.

        The MIME type is ``image/jpeg`` when *media_type* is ``"image"`` and
        ``audio/mpeg`` for any other value.
        """
        endpoint = "sendFileByUpload"
        payload = {
            "chatId": chat_id,
            "caption": caption,
            "quotedMessageId": message_id
        }
        with open(file_path, "rb") as f:
            mime = "image/jpeg" if media_type == "image" else "audio/mpeg"
            files = [("file", (os.path.basename(file_path), f, mime))]
            return self.send(endpoint, payload, files=files)
# Fail at import time if any required environment variable is missing.
BotConfig.validate()
# Shared client; note the BotConfig class itself (not an instance) is passed.
client = BotClient(BotConfig)
# --- Threading & Queues ---------------------------------------------------
# Background jobs (dicts with a "type" key) consumed by the worker threads.
task_queue = queue.Queue()
# Active polls, keyed by chat id.
polls = {}
# NOTE(review): not referenced anywhere in the visible part of this file.
executor = ThreadPoolExecutor(max_workers=4)
def worker():
    """Consume jobs from ``task_queue`` forever.

    "image" jobs go to ``_fn_generate_images`` and "audio" jobs to
    ``_fn_voice_reply``, each receiving the whole job dict as keyword
    arguments. Other job types are ignored. Any exception is logged and the
    loop carries on; the queue item is always marked done.
    """
    while True:
        job = task_queue.get()
        try:
            kind = job["type"]
            if kind == "image":
                _fn_generate_images(**job)
            elif kind == "audio":
                _fn_voice_reply(**job)
        except Exception as err:
            logger.error(f"Worker error {job}: {err}")
        finally:
            task_queue.task_done()
# Start four daemon threads running worker(); being daemons, they do not
# keep the process alive on shutdown.
for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()
# --- Basic Tool Functions -------------------------------------------------
def _fn_send_text(mid, cid, message):
    """Reply with *message*, record it in the history, and queue a voice version.

    The history entry is written only when the calling thread has both a
    chat id and a sender in its thread context.
    """
    client.send_message(mid, cid, message)
    ctx_chat, ctx_sender, _unused = get_thread_context()
    if ctx_chat and ctx_sender:
        record_bot_message(ctx_chat, ctx_sender, message)
    audio_task = dict(type="audio", message_id=mid, chat_id=cid, prompt=message)
    task_queue.put(audio_task)
def _fn_send_accept(mid, cid, message):
    """Reply with *message* and record it in the history; no audio is queued."""
    client.send_message(mid, cid, message)
    context = get_thread_context()
    ctx_chat, ctx_sender = context[0], context[1]
    if not (ctx_chat and ctx_sender):
        return
    record_bot_message(ctx_chat, ctx_sender, message)
def _fn_summarize(mid, cid, text):
    """Ask the LLM to summarize *text* and send the result as a reply."""
    llm_prompt = f"Summarize:\n\n{text}"
    _fn_send_text(mid, cid, generate_llm(llm_prompt))
def _fn_translate(mid, cid, lang, text):
    """Ask the LLM to translate *text* into *lang* and send the result as a reply."""
    llm_prompt = f"Translate to {lang}:\n\n{text}"
    _fn_send_text(mid, cid, generate_llm(llm_prompt))
def _fn_joke(mid, cid):
    """Send a joke from the public joke API, falling back to the LLM.

    Any ordinary failure (network error, non-JSON body, missing keys) triggers
    the LLM fallback. ``KeyboardInterrupt`` and ``SystemExit`` are no longer
    swallowed, which the previous bare ``except:`` did.
    """
    try:
        j = requests.get(
            "https://official-joke-api.appspot.com/random_joke",
            timeout=5
        ).json()
        joke = f"{j['setup']}\n\n{j['punchline']}"
    except Exception as e:
        # Best-effort source: log why it failed, then let the LLM supply the joke.
        logger.warning("Joke API failed, falling back to LLM: %s", e)
        joke = generate_llm("Tell me a short joke.")
    _fn_send_text(mid, cid, joke)
def _fn_weather(mid, cid, loc):
    """Fetch a one-line weather string for *loc* and send an LLM-written report.

    *loc* comes from chat text or LLM output, so it is percent-encoded before
    being placed in the URL path. ``+`` (used by callers as a space
    separator) and ``,`` are kept as-is; characters such as ``?``, ``#`` and
    ``/`` can no longer change the request's path or query.
    """
    from urllib.parse import quote  # local import: not in the file-level imports

    safe_loc = quote(str(loc), safe="+,")
    raw = requests.get(f"http://sl.wttr.in/{safe_loc}?format=4", timeout=5).text
    report = generate_llm(f"Give a weather report in °C:\n\n{raw}")
    _fn_send_text(mid, cid, report)
def _fn_inspire(mid, cid):
    """Ask the LLM for a short inspirational quote and send it as a reply."""
    llm_quote = generate_llm("Give me a unique, random short inspirational quote.")
    reply = f"✨ {llm_quote}"
    _fn_send_text(mid, cid, reply)
def _fn_meme(mid, cid, txt):
    """Acknowledge the request, then queue an image job with a ``meme:`` prompt."""
    _fn_send_accept(mid, cid, "🎨 Generating meme…")
    image_task = {
        "type": "image",
        "message_id": mid,
        "chat_id": cid,
        "prompt": f"meme: {txt}",
    }
    task_queue.put(image_task)
def _fn_poll_create(mid, cid, question, options):
    """Create (or replace) the poll for chat *cid* and announce it.

    Options are numbered from 1; every option starts with zero votes.
    """
    tallies = dict.fromkeys(range(1, len(options) + 1), 0)
    polls[cid] = {
        "question": question,
        "options": options,
        "votes": tallies,
        "voters": {},
    }
    numbered = [f"{n}. {opt}" for n, opt in enumerate(options, start=1)]
    announcement = f"📊 *Poll:* {question}\n" + "\n".join(numbered)
    _fn_send_text(mid, cid, announcement)
def _fn_poll_vote(mid, cid, voter, choice):
    """Record *voter*'s 1-based *choice* in the chat's poll and confirm it.

    Does nothing when there is no poll or the choice is out of range. A
    voter's earlier vote, if any, is withdrawn before the new one is counted.
    """
    poll = polls.get(cid)
    if not poll:
        return
    if not (1 <= choice <= len(poll["options"])):
        return
    tallies = poll["votes"]
    ballots = poll["voters"]
    previous = ballots.get(voter)
    if previous:
        tallies[previous] -= 1
    tallies[choice] += 1
    ballots[voter] = choice
    picked = poll["options"][choice - 1]
    _fn_send_text(mid, cid, f"✅ Voted for {picked}")
def _fn_poll_results(mid, cid):
    """Send the current tallies of the chat's poll, or a notice if none exists."""
    poll = polls.get(cid)
    if poll:
        header = f"📊 *Results:* {poll['question']}\n"
        rows = [
            f"{n}. {opt}: {poll['votes'][n]}"
            for n, opt in enumerate(poll["options"], 1)
        ]
        _fn_send_text(mid, cid, header + "\n".join(rows))
    else:
        _fn_send_text(mid, cid, "No active poll.")
def _fn_poll_end(mid, cid):
    """Remove the chat's poll and send its final tallies, or a notice if none exists."""
    closed = polls.pop(cid, None)
    if closed:
        header = f"📊 *Final Results:* {closed['question']}\n"
        rows = [
            f"{n}. {opt}: {closed['votes'][n]}"
            for n, opt in enumerate(closed["options"], 1)
        ]
        _fn_send_text(mid, cid, header + "\n".join(rows))
    else:
        _fn_send_text(mid, cid, "No active poll.")
def _fn_generate_images(message_id: str, chat_id: str, prompt: str,
                        count: int = 1, width: Optional[int] = None,
                        height: Optional[int] = None, **_):
    """Generate *count* images for *prompt* and upload each to the chat.

    Each image is generated, captioned and sent independently; a failure on
    one image is logged, reported to the chat, and does not stop the rest.
    The generated file is removed whether or not the upload succeeded, and a
    cleanup failure is only logged (it is not reported as a failed image).

    Extra keyword arguments are accepted and ignored so a whole task dict can
    be splatted into this function.
    """
    for i in range(1, count + 1):
        path = None
        try:
            _img, path, ret_p, url = generate_image(
                prompt, message_id, message_id, BotConfig.IMAGE_DIR,
                width=width, height=height
            )
            formatted = "\n\n".join(f"_{p.strip()}_" for p in ret_p.split("\n\n") if p.strip())
            cap = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
            client.send_media(message_id, chat_id, path, cap, media_type="image")
        except Exception as e:
            logger.warning(f"Img {i}/{count} failed: {e}")
            _fn_send_text(message_id, chat_id, f"😢 Failed to generate image {i}/{count}.")
        finally:
            # Always clean up the temp file, even when the upload raised.
            if path:
                try:
                    os.remove(path)
                except OSError as e:
                    logger.warning("Could not remove image file %s: %s", path, e)
def _fn_voice_reply(message_id: str, chat_id: str, prompt: str, **_):
    """Send *prompt* as a generated voice message, or as plain text on failure.

    The text fallback is sent directly through the client. It must not go
    through ``_fn_send_text``: that helper queues another audio task, so a
    persistently failing voice backend would re-send the same text and
    re-queue itself without end.

    Extra keyword arguments are accepted and ignored so a whole task dict can
    be splatted into this function.
    """
    proc = (
        f"Just say this exactly as written in a flirty, friendly, playful, "
        f"happy and helpful but a little bit clumsy-cute way: {prompt}"
    )
    res = generate_voice_reply(proc, model="openai-audio", voice="coral", audio_dir=BotConfig.AUDIO_DIR)
    if res and res[0]:
        path, _meta = res
        try:
            client.send_media(message_id, chat_id, path, "", media_type="audio")
        finally:
            # Remove the generated audio even if the upload raised.
            os.remove(path)
    else:
        client.send_message(message_id, chat_id, prompt)
# --- Pydantic Models for Function Calling --------------------------------
class BaseIntent(BaseModel):
    """Base model for intents: holds the ``action`` name as a plain string."""
    action: str
class SummarizeIntent(BaseIntent):
    """Intent for ``action == "summarize"``: summarize ``text``."""
    action: Literal["summarize"]
    text: str
class TranslateIntent(BaseIntent):
    """Intent for ``action == "translate"``: translate ``text`` into ``lang``."""
    action: Literal["translate"]
    lang: str
    text: str
class JokeIntent(BaseIntent):
    """Intent for ``action == "joke"``; takes no further fields."""
    action: Literal["joke"]
class WeatherIntent(BaseIntent):
    """Intent for ``action == "weather"``: weather report for ``location``."""
    action: Literal["weather"]
    location: str
class InspireIntent(BaseIntent):
    """Intent for ``action == "inspire"``; takes no further fields."""
    action: Literal["inspire"]
class MemeIntent(BaseIntent):
    """Intent for ``action == "meme"``: generate a meme from ``text``."""
    action: Literal["meme"]
    text: str
class PollCreateIntent(BaseIntent):
    """Intent for ``action == "poll_create"``: a ``question`` and its ``options``."""
    action: Literal["poll_create"]
    question: str
    options: List[str]
class PollVoteIntent(BaseIntent):
    """Intent for ``action == "poll_vote"``: ``voter`` picks option number ``choice``."""
    action: Literal["poll_vote"]
    voter: str
    choice: int
class PollResultsIntent(BaseIntent):
    """Intent for ``action == "poll_results"``; takes no further fields."""
    action: Literal["poll_results"]
class PollEndIntent(BaseIntent):
    """Intent for ``action == "poll_end"``; takes no further fields."""
    action: Literal["poll_end"]
class GenerateImageIntent(BaseModel):
    """Intent for ``action == "generate_image"``.

    ``count`` defaults to 1 and must be at least 1. ``width`` and ``height``
    carry an explicit ``None`` default: Pydantic v1 implies that default for
    ``Optional`` fields, but Pydantic v2 treats an ``Optional`` annotation
    without a default as required, which would reject payloads that omit
    the size.
    """
    action: Literal["generate_image"]
    prompt: str
    count: int = Field(default=1, ge=1)
    width: Optional[int] = None
    height: Optional[int] = None
class SendTextIntent(BaseIntent):
    """Intent for ``action == "send_text"``: reply with the plain ``message``."""
    action: Literal["send_text"]
    message: str
# All intent models, in the order strict parsing tries them.
INTENT_MODELS = [
    SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent,
    InspireIntent, MemeIntent, PollCreateIntent, PollVoteIntent,
    PollResultsIntent, PollEndIntent, GenerateImageIntent, SendTextIntent
]
# Maps an action name to a callable invoked as
# handler(message_id, chat_id, **intent_fields). The lambdas pick the fields
# their tool function needs; "generate_image" receives the fields directly
# as keyword arguments.
ACTION_HANDLERS = {
    "summarize": lambda mid,cid,**i: _fn_summarize(mid,cid,i["text"]),
    "translate": lambda mid,cid,**i: _fn_translate(mid,cid,i["lang"],i["text"]),
    "joke": lambda mid,cid,**i: _fn_joke(mid,cid),
    "weather": lambda mid,cid,**i: _fn_weather(mid,cid,i["location"]),
    "inspire": lambda mid,cid,**i: _fn_inspire(mid,cid),
    "meme": lambda mid,cid,**i: _fn_meme(mid,cid,i["text"]),
    "poll_create": lambda mid,cid,**i: _fn_poll_create(mid,cid,i["question"],i["options"]),
    "poll_vote": lambda mid,cid,**i: _fn_poll_vote(mid,cid,i["voter"],i["choice"]),
    "poll_results": lambda mid,cid,**i: _fn_poll_results(mid,cid),
    "poll_end": lambda mid,cid,**i: _fn_poll_end(mid,cid),
    "generate_image": _fn_generate_images,
    "send_text": lambda mid,cid,**i: _fn_send_text(mid,cid,i["message"]),
}
# --- Intent Routing with Fallback ------------------------------------------
def route_intent(user_input: str, chat_id: str, sender: str):
    """Ask the LLM what to do with *user_input* and return a validated intent.

    The LLM is prompted with the tool list and the conversation history, and
    its reply is interpreted in three stages:

    1. If the reply is not a JSON object, it is treated as chat text.
    2. Strict: the object is validated against each model in
       ``INTENT_MODELS``; the first that accepts it is returned.
    3. Lenient: if the object names a known action, the expected fields are
       pulled out with defaults and coerced, then validated against that
       action's model.

    Anything that cannot be turned into an intent is returned as a
    ``SendTextIntent`` carrying the raw LLM reply.

    Args:
        user_input: The user's message text.
        chat_id: Chat whose history is included in the prompt.
        sender: Message sender; also used as the voter for lenient
            ``poll_vote`` intents.

    Returns:
        An instance of one of the intent models.
    """
    from typing import get_args  # local import: not in the file-level typing import

    history_text = get_history_text(chat_id, sender)
    sys_prompt = (
        "You are Eve. You can either chat or call one of these functions:\n"
        "- summarize(text)\n"
        "- translate(lang, text)\n"
        "- joke()\n"
        "- weather(location)\n"
        "- inspire()\n"
        "- meme(text)\n"
        "- poll_create(question, options)\n"
        "- poll_vote(voter, choice)\n"
        "- poll_results()\n"
        "- poll_end()\n"
        "- generate_image(prompt, count, width, height)\n"
        "- send_text(message)\n\n"
        "Return only raw JSON matching one of these shapes. For example:\n"
        "  {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":3,\"width\":512,\"height\":512}\n"
        "Otherwise, use send_text to reply with plain chat.\n"
    )
    prompt = f"{sys_prompt}\nConversation so far:\n{history_text}\n\nUser: {user_input}"
    raw = generate_llm(prompt)

    def as_text():
        """Fallback intent: relay the raw LLM reply as chat text."""
        return SendTextIntent(action="send_text", message=raw)

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return as_text()
    # Valid JSON that is not an object (list, string, number) cannot be an
    # intent; without this guard the lenient stage would fail on ``.get``.
    if not isinstance(parsed, dict):
        return as_text()

    # 1) Strict: try each Pydantic model
    for M in INTENT_MODELS:
        try:
            return M.parse_obj(parsed)
        except ValidationError:
            continue
    logger.warning("Strict parse failed for all models, falling back to lenient")

    # 2) Lenient JSON get
    action = parsed.get("action")
    if not isinstance(action, str) or action not in ACTION_HANDLERS:
        return as_text()

    # Map each action name to its model via the Literal[...] annotation on
    # ``action``. (The field's *default* is not the action name, so it
    # cannot be used to find the model.)
    model_by_action = {}
    for M in INTENT_MODELS:
        for name in get_args(M.__annotations__.get("action")):
            model_by_action[name] = M

    # String fields that default to "" when the LLM left them out.
    string_fields = {
        "send_text": ("message",),
        "translate": ("lang", "text"),
        "summarize": ("text",),
        "weather": ("location",),
        "meme": ("text",),
    }
    try:
        kwargs = {f: parsed.get(f, "") for f in string_fields.get(action, ())}
        if action == "generate_image":
            kwargs["prompt"] = parsed.get("prompt", "")
            kwargs["count"] = int(parsed.get("count", BotConfig.DEFAULT_IMAGE_COUNT))
            kwargs["width"] = parsed.get("width")
            kwargs["height"] = parsed.get("height")
        elif action == "poll_create":
            kwargs["question"] = parsed.get("question", "")
            kwargs["options"] = parsed.get("options", [])
        elif action == "poll_vote":
            kwargs["voter"] = sender
            kwargs["choice"] = int(parsed.get("choice", 0))
        # parse into Pydantic for uniformity
        return model_by_action[action].parse_obj({"action": action, **kwargs})
    except (KeyError, TypeError, ValueError, ValidationError):
        # Covers failed int() coercion, an action with no model, and
        # validation errors on the leniently built payload.
        return as_text()
# --- FastAPI & Webhook ----------------------------------------------------
# ASGI application; routes are registered on it by the decorators below.
app = FastAPI()
# Reply sent for the /help command.
help_text = (
    "🤖 *Eve* commands:\n"
    "• /help\n"
    "• /summarize <text>\n"
    "• /translate <lang>|<text>\n"
    "• /joke\n"
    "• /weather <loc>\n"
    "• /inspire\n"
    "• /meme <text>\n"
    "• /poll <Q>|… / /results / /endpoll\n"
    "• /gen <prompt>|<count>|<width>|<height>\n"
    "Otherwise chat or reply to my message to invoke tools."
)
@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
    """Handle an incoming webhook: run a slash command or route via the LLM.

    Flow:
      1. Reject requests whose ``Authorization`` header is not
         ``Bearer <WEBHOOK_AUTH_TOKEN>`` (403). This happens before the body
         is parsed, using a constant-time comparison.
      2. Ignore anything that is not an ``incomingMessageReceived`` text
         message from the configured group chat.
      3. Record the message, then dispatch explicit slash commands.
      4. Ignore messages that mention someone; otherwise ask ``route_intent``
         and run the matching handler.

    Returns ``{"success": True}`` for every accepted request.

    Raises:
        HTTPException: 403 on a bad token, 400 when the JSON body is not an
            object.
    """
    import hmac  # local import: not in the file-level imports

    ok = {"success": True}

    supplied = request.headers.get("Authorization") or ""
    expected = f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}"
    if not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(403, "Unauthorized")

    data = await request.json()
    if not isinstance(data, dict):
        raise HTTPException(400, "Invalid payload")

    # Use .get throughout: a payload missing these keys should be ignored by
    # the checks below rather than raise KeyError.
    sender_data = data.get("senderData") or {}
    chat_id = sender_data.get("chatId")
    sender = sender_data.get("sender")
    mid = data.get("idMessage")
    set_thread_context(chat_id, sender, mid)
    logger.debug("Received webhook")

    if chat_id != BotConfig.BOT_GROUP_CHAT or data.get("typeWebhook") != "incomingMessageReceived":
        return ok

    md = data.get("messageData") or {}
    tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
    if not tmd:
        return ok

    body = (tmd.get("textMessage") or tmd.get("text") or "").strip()
    record_user_message(chat_id, sender, body)
    low = body.lower()

    if low == "/help":
        _fn_send_text(mid, chat_id, help_text)
        return ok

    if low.startswith("/summarize "):
        _fn_summarize(mid, chat_id, body[11:].strip())
        return ok

    if low.startswith("/translate "):
        lang, sep, txt = body[11:].partition("|")
        if not sep:
            # No "|" separator: explain the syntax instead of failing.
            _fn_send_text(mid, chat_id, "Usage: /translate <lang>|<text>")
            return ok
        _fn_translate(mid, chat_id, lang.strip(), txt.strip())
        return ok

    if low == "/joke":
        _fn_joke(mid, chat_id)
        return ok

    if low.startswith("/weather "):
        _fn_weather(mid, chat_id, body[9:].strip().replace(" ", "+"))
        return ok

    if low == "/inspire":
        _fn_inspire(mid, chat_id)
        return ok

    if low.startswith("/meme "):
        _fn_meme(mid, chat_id, body[6:].strip())
        return ok

    if low.startswith("/poll "):
        parts = [p.strip() for p in body[6:].split("|")]
        _fn_poll_create(mid, chat_id, parts[0], parts[1:])
        return ok

    # A bare number while a poll is active counts as a vote. isdecimal()
    # (not isdigit()) guarantees int() will accept the string.
    if chat_id in polls and low.isdecimal():
        _fn_poll_vote(mid, chat_id, sender, int(low))
        return ok

    if low == "/results":
        _fn_poll_results(mid, chat_id)
        return ok

    if low == "/endpoll":
        _fn_poll_end(mid, chat_id)
        return ok

    if low.startswith("/gen"):
        # Strip each part so "cat | 2 | 512" parses the same as "cat|2|512".
        parts = [p.strip() for p in body[4:].split("|")]

        def int_part(idx, default):
            """Return parts[idx] as an int, or *default* if absent or non-numeric."""
            if len(parts) > idx and parts[idx].isdecimal():
                return int(parts[idx])
            return default

        pr = parts[0]
        ct = int_part(1, BotConfig.DEFAULT_IMAGE_COUNT)
        width = int_part(2, None)
        height = int_part(3, None)
        _fn_send_accept(mid, chat_id, f"✨ Generating {ct} image(s)…")
        task_queue.put({
            "type": "image",
            "message_id": mid,
            "chat_id": chat_id,
            "prompt": pr,
            "count": ct,
            "width": width,
            "height": height
        })
        return ok

    if tmd.get("contextInfo", {}).get("mentionedJidList"):
        return ok

    intent = route_intent(body, chat_id, sender)
    # Named action_handler so the module-level logging ``handler`` is not shadowed.
    action_handler = ACTION_HANDLERS.get(intent.action)
    if action_handler:
        action_handler(mid, chat_id, **intent.dict(exclude={"action"}))
    else:
        _fn_send_text(mid, chat_id, "Sorry, I didn't understand that.")
    return ok
@app.get("/", response_class=PlainTextResponse)
def index():
    """Return a fixed plain-text status string for ``GET /``."""
    return "Server is running!"
if __name__ == "__main__":
    # When run as a script: announce startup in the group chat, then serve
    # the app with uvicorn on all interfaces, port 7860.
    client.send_message_to(
        BotConfig.BOT_GROUP_CHAT,
        "🌟 Eve is online! Type /help to see commands."
    )
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)