# eve / app.py
# Author: Chandima Prabhath
# Commit 3b132e6: increase the conversation history limit to 20 messages; add
# summarize, translate, meme, and poll functionality with corresponding
# intents and command support in the configuration.
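"""
Eve: a WhatsApp group assistant served over FastAPI.

Authenticated Green API webhooks are recorded into a short per-user
conversation history and handled either as slash commands or by routing the
message through an LLM that returns a JSON "intent" (summarize, translate,
joke, weather, inspire, meme, polls, image generation, or plain text).
Image and voice generation are offloaded to background worker threads via a
task queue.
"""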
import os
import threading
import requests
import logging
import queue
import json
from typing import List, Literal, Optional, get_args
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel, Field, ValidationError
from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm, LLMBadRequestError # assume this exception is raised on 400
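# Project-specific helpers: FLUX (image generation), VoiceReply
# (text-to-speech replies), and polLLM (LLM text generation).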
# --- Logging Setup ---------------------------------------------------------
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logger = logging.getLogger("eve_bot")
logger.setLevel(LOG_LEVEL)
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] [%(message_id)s/%(sender)s] %(message)s"
)
handler.setFormatter(formatter)
class ContextFilter(logging.Filter):
def filter(self, record):
record.message_id = getattr(record, "message_id", "-")
record.sender = getattr(record, "sender", "-")
return True
handler.addFilter(ContextFilter())
logger.handlers = [handler]
# Thread‐local to carry context through helpers
_thread_ctx = threading.local()
def set_thread_context(chat_id, sender, message_id):
_thread_ctx.chat_id = chat_id
_thread_ctx.sender = sender
_thread_ctx.message_id = message_id
def get_thread_context():
return (
getattr(_thread_ctx, "chat_id", None),
getattr(_thread_ctx, "sender", None),
getattr(_thread_ctx, "message_id", None),
)
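# NOTE: the context above is populated per webhook request on the handling
# thread; helpers invoked from the background worker threads see an empty
# context and therefore skip history recording.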
# --- Conversation History -------------------------------------------------
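# Rolling per-(chat, sender) history, capped at the 20 most recent entries.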
history = defaultdict(lambda: deque(maxlen=20))
def record_user_message(chat_id, sender, message):
history[(chat_id, sender)].append(f"User: {message}")
def record_bot_message(chat_id, sender, message):
history[(chat_id, sender)].append(f"Assistant: {message}")
def get_history_text(chat_id, sender):
return "\n".join(history[(chat_id, sender)])
def clear_history(chat_id, sender):
history[(chat_id, sender)].clear()
# --- Bot Config & Client --------------------------------------------------
class BotConfig:
GREEN_API_URL = os.getenv("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
BOT_GROUP_CHAT = "[email protected]"
BOT_JID = os.getenv("BOT_JID")
IMAGE_DIR = "/tmp/images"
AUDIO_DIR = "/tmp/audio"
DEFAULT_IMAGE_COUNT = 4
@classmethod
def validate(cls):
missing = [n for n in (
"GREEN_API_URL","GREEN_API_TOKEN",
"GREEN_API_ID_INSTANCE","WEBHOOK_AUTH_TOKEN","BOT_JID"
) if not getattr(cls, n)]
if missing:
raise ValueError(f"Missing env vars: {', '.join(missing)}")
class BotClient:
def __init__(self, cfg: BotConfig):
self.cfg = cfg
self.session = requests.Session()
def send(self, endpoint, payload, files=None, retries=3):
url = (
f"{self.cfg.GREEN_API_URL}/waInstance"
f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
f"{self.cfg.GREEN_API_TOKEN}"
)
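        # Green API REST pattern: {base}/waInstance{instance_id}/{method}/{token}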
for i in range(1, retries+1):
try:
resp = self.session.post(
url,
json=payload if files is None else None,
data=None if files is None else payload,
files=files
)
resp.raise_for_status()
return resp.json()
except requests.RequestException as e:
logger.warning(f"{endpoint} attempt {i}/{retries} failed: {e}")
return {"error":"failed"}
def send_message(self, message_id, chat_id, text):
return self.send("sendMessage", {
"chatId": chat_id,
"message": text,
"quotedMessageId": message_id
})
def send_message_to(self, chat_id, text):
return self.send("sendMessage", {
"chatId": chat_id,
"message": text
})
def send_media(self, message_id, chat_id, file_path, caption, media_type):
endpoint = "sendFileByUpload"
payload = {
"chatId": chat_id,
"caption": caption,
"quotedMessageId": message_id
}
with open(file_path,"rb") as f:
mime = "image/jpeg" if media_type=="image" else "audio/mpeg"
files = [("file",(os.path.basename(file_path),f,mime))]
return self.send(endpoint, payload, files=files)
BotConfig.validate()
client = BotClient(BotConfig)
# --- Threading & Queues ---------------------------------------------------
task_queue = queue.Queue()
polls = {}
executor = ThreadPoolExecutor(max_workers=4)
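# NOTE: `executor` is not referenced elsewhere in this module; background work
# goes through `task_queue` and the worker threads started below.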
def worker():
while True:
task = task_queue.get()
try:
if task["type"] == "image":
_fn_generate_images(**task)
elif task["type"] == "audio":
_fn_voice_reply(**task)
except Exception as e:
logger.error(f"Worker error {task}: {e}")
finally:
task_queue.task_done()
for _ in range(4):
threading.Thread(target=worker, daemon=True).start()
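# Queue items are plain dicts: {"type": "image" | "audio", "message_id",
# "chat_id", "prompt", ...}; extra keys are absorbed by the handlers' **_.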
# --- Basic Tool Functions -------------------------------------------------
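# _fn_send_text replies with text, records it to history, and queues a spoken
# version of the same message; _fn_send_accept sends a quick acknowledgement
# without the voice follow-up.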
def _fn_send_text(mid, cid, message):
client.send_message(mid, cid, message)
chat_id, sender, _ = get_thread_context()
if chat_id and sender:
record_bot_message(chat_id, sender, message)
task_queue.put({
"type": "audio",
"message_id": mid,
"chat_id": cid,
"prompt": message
})
def _fn_send_accept(mid, cid, message):
client.send_message(mid, cid, message)
chat_id, sender, _ = get_thread_context()
if chat_id and sender:
record_bot_message(chat_id, sender, message)
def _fn_summarize(mid, cid, text):
summary = generate_llm(f"Summarize:\n\n{text}")
_fn_send_text(mid, cid, summary)
def _fn_translate(mid, cid, lang, text):
resp = generate_llm(f"Translate to {lang}:\n\n{text}")
_fn_send_text(mid, cid, resp)
def _fn_joke(mid, cid):
    try:
        j = requests.get(
            "https://official-joke-api.appspot.com/random_joke",
            timeout=5
        ).json()
        joke = f"{j['setup']}\n\n{j['punchline']}"
    except (requests.RequestException, ValueError, KeyError):
        # Fall back to the LLM if the joke API is unreachable or returns
        # something unexpected.
        joke = generate_llm("Tell me a short joke.")
    _fn_send_text(mid, cid, joke)
def _fn_weather(mid, cid, loc):
raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
report = generate_llm(f"Give a weather report in °C:\n\n{raw}")
_fn_send_text(mid, cid, report)
def _fn_inspire(mid, cid):
quote = generate_llm("Give me a unique, random short inspirational quote.")
_fn_send_text(mid, cid, f"✨ {quote}")
def _fn_meme(mid, cid, txt):
_fn_send_accept(mid, cid, "🎨 Generating meme…")
task_queue.put({
"type": "image",
"message_id": mid,
"chat_id": cid,
"prompt": f"meme: {txt}"
})
def _fn_poll_create(mid, cid, question, options):
votes = {i+1:0 for i in range(len(options))}
polls[cid] = {"question": question, "options": options, "votes": votes, "voters": {}}
text = f"📊 *Poll:* {question}\n" + "\n".join(f"{i+1}. {o}" for i,o in enumerate(options))
_fn_send_text(mid, cid, text)
def _fn_poll_vote(mid, cid, voter, choice):
poll = polls.get(cid)
if not poll or choice<1 or choice>len(poll["options"]):
return
prev = poll["voters"].get(voter)
if prev:
poll["votes"][prev] -= 1
poll["votes"][choice] += 1
poll["voters"][voter] = choice
_fn_send_text(mid, cid, f"✅ Voted for {poll['options'][choice-1]}")
def _fn_poll_results(mid, cid):
poll = polls.get(cid)
if not poll:
_fn_send_text(mid, cid, "No active poll.")
return
txt = f"📊 *Results:* {poll['question']}\n" + "\n".join(
f"{i}. {o}: {poll['votes'][i]}" for i,o in enumerate(poll["options"],1)
)
_fn_send_text(mid, cid, txt)
def _fn_poll_end(mid, cid):
poll = polls.pop(cid, None)
if not poll:
_fn_send_text(mid, cid, "No active poll.")
return
txt = f"📊 *Final Results:* {poll['question']}\n" + "\n".join(
f"{i}. {o}: {poll['votes'][i]}" for i,o in enumerate(poll["options"],1)
)
_fn_send_text(mid, cid, txt)
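# Poll state lives in the in-memory `polls` dict, keyed by chat id (one active
# poll per chat), and is lost on process restart.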
def _fn_generate_images(
message_id: str,
chat_id: str,
prompt: str,
count: int = 1,
width: Optional[int] = None,
height: Optional[int] = None,
**_
):
_fn_send_accept(message_id, chat_id, f"✨ Generating {count} image(s)…")
for i in range(1, count+1):
try:
img, path, ret_p, url = generate_image(
prompt, message_id, message_id, BotConfig.IMAGE_DIR,
width=width, height=height
)
formatted = "\n\n".join(f"_{p.strip()}_" for p in ret_p.split("\n\n") if p.strip())
cap = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
client.send_media(message_id, chat_id, path, cap, media_type="image")
os.remove(path)
except Exception as e:
logger.warning(f"Img {i}/{count} failed: {e}")
_fn_send_text(message_id, chat_id, f"😢 Failed to generate image {i}/{count}.")
def _fn_voice_reply(
message_id: str,
chat_id: str,
prompt: str,
**_
):
proc = (
f"Just say this exactly as written in a friendly, playful, "
f"happy and helpful but a little bit clumsy-cute way: {prompt}"
)
res = generate_voice_reply(proc, model="openai-audio", voice="coral", audio_dir=BotConfig.AUDIO_DIR)
if res and res[0]:
path, _ = res
client.send_media(message_id, chat_id, path, "", media_type="audio")
os.remove(path)
else:
_fn_send_text(message_id, chat_id, prompt)
# --- Pydantic Models for Function Calling --------------------------------
class BaseIntent(BaseModel):
action: str
class SummarizeIntent(BaseIntent):
action: Literal["summarize"]
text: str
class TranslateIntent(BaseIntent):
action: Literal["translate"]
lang: str
text: str
class JokeIntent(BaseIntent):
action: Literal["joke"]
class WeatherIntent(BaseIntent):
action: Literal["weather"]
location: str
class InspireIntent(BaseIntent):
action: Literal["inspire"]
class MemeIntent(BaseIntent):
action: Literal["meme"]
text: str
class PollCreateIntent(BaseIntent):
action: Literal["poll_create"]
question: str
options: List[str]
class PollVoteIntent(BaseIntent):
action: Literal["poll_vote"]
voter: str
choice: int
class PollResultsIntent(BaseIntent):
action: Literal["poll_results"]
class PollEndIntent(BaseIntent):
action: Literal["poll_end"]
class GenerateImageIntent(BaseIntent):
    action: Literal["generate_image"]
    prompt: str
    count: int = Field(default=1, ge=1)
    width: Optional[int] = None
    height: Optional[int] = None
class SendTextIntent(BaseIntent):
action: Literal["send_text"]
message: str
# list of all intent models
INTENT_MODELS = [
SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent,
InspireIntent, MemeIntent, PollCreateIntent, PollVoteIntent,
PollResultsIntent, PollEndIntent, GenerateImageIntent, SendTextIntent
]
ACTION_HANDLERS = {
"summarize": lambda mid,cid,**i: _fn_summarize(mid,cid,i["text"]),
"translate": lambda mid,cid,**i: _fn_translate(mid,cid,i["lang"],i["text"]),
"joke": lambda mid,cid,**i: _fn_joke(mid,cid),
"weather": lambda mid,cid,**i: _fn_weather(mid,cid,i["location"]),
"inspire": lambda mid,cid,**i: _fn_inspire(mid,cid),
"meme": lambda mid,cid,**i: _fn_meme(mid,cid,i["text"]),
"poll_create": lambda mid,cid,**i: _fn_poll_create(mid,cid,i["question"],i["options"]),
"poll_vote": lambda mid,cid,**i: _fn_poll_vote(mid,cid,i["voter"],i["choice"]),
"poll_results": lambda mid,cid,**i: _fn_poll_results(mid,cid),
"poll_end": lambda mid,cid,**i: _fn_poll_end(mid,cid),
"generate_image": _fn_generate_images,
"send_text": lambda mid,cid,**i: _fn_send_text(mid,cid,i["message"]),
}
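# Every handler takes (message_id, chat_id, **intent_fields); the validated
# intents above map one-to-one onto these actions.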
# --- Intent Routing with Fallback & History‐Reset on 400 -------------------
def route_intent(user_input: str, chat_id: str, sender: str):
history_text = get_history_text(chat_id, sender)
sys_prompt = (
"You are Eve. You can either chat or call one of these functions:\n"
"- summarize(text)\n"
"- translate(lang, text)\n"
"- joke()\n"
"- weather(location)\n"
"- inspire()\n"
"- meme(text)\n"
"- poll_create(question, options)\n"
"- poll_vote(voter, choice)\n"
"- poll_results()\n"
"- poll_end()\n"
"- generate_image(prompt, count, width, height)\n"
"- send_text(message)\n\n"
"Return only raw JSON matching one of these shapes. For example:\n"
" {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":3,\"width\":512,\"height\":512}\n"
"Otherwise, use send_text to reply with plain chat.\n"
)
prompt = f"{sys_prompt}\nConversation so far:\n{history_text}\n\nUser: {user_input}"
try:
raw = generate_llm(prompt)
except LLMBadRequestError:
# Clear history on HTTP 400 from the LLM
clear_history(chat_id, sender)
return SendTextIntent(action="send_text", message="Oops, I lost my train of thought—let’s start fresh!")
logger.debug(f"LLM raw response: {raw}")
# 1) Strict: try each Pydantic model
try:
parsed = json.loads(raw)
logger.debug(f"Parsed JSON: {parsed}")
    except json.JSONDecodeError:
        return SendTextIntent(action="send_text", message=raw)
    if not isinstance(parsed, dict):
        # Valid JSON but not an object (e.g. a bare string or list): treat it
        # as plain chat.
        return SendTextIntent(action="send_text", message=raw)
for M in INTENT_MODELS:
try:
intent = M.model_validate(parsed)
logger.debug(f"Matched intent model: {M.__name__} with data {parsed}")
return intent
except ValidationError:
continue
logger.warning("Strict parse failed for all models, falling back to lenient")
# 2) Lenient JSON get
action = parsed.get("action")
if action in ACTION_HANDLERS:
data = parsed
kwargs = {}
if action == "generate_image":
kwargs["prompt"] = data.get("prompt","")
kwargs["count"] = int(data.get("count", BotConfig.DEFAULT_IMAGE_COUNT))
kwargs["width"] = data.get("width")
kwargs["height"] = data.get("height")
elif action == "send_text":
kwargs["message"] = data.get("message","")
elif action == "translate":
kwargs["lang"] = data.get("lang","")
kwargs["text"] = data.get("text","")
elif action == "summarize":
kwargs["text"] = data.get("text","")
elif action == "weather":
kwargs["location"] = data.get("location","")
elif action == "meme":
kwargs["text"] = data.get("text","")
elif action == "poll_create":
kwargs["question"] = data.get("question","")
kwargs["options"] = data.get("options",[])
elif action == "poll_vote":
kwargs["voter"] = sender
kwargs["choice"] = int(data.get("choice",0))
        try:
            # Coerce the lenient kwargs into the matching Pydantic model so the
            # caller always receives a validated intent; each model is
            # identified by the single allowed value of its `action` Literal.
            model = next(
                m for m in INTENT_MODELS
                if get_args(m.model_fields["action"].annotation) == (action,)
            )
intent = model.model_validate({"action":action, **kwargs})
logger.debug(f"Leniently matched intent model: {model.__name__} with kwargs {kwargs}")
return intent
except Exception as e:
logger.error(f"Lenient parsing into Pydantic failed: {e}")
return SendTextIntent(action="send_text", message=raw)
return SendTextIntent(action="send_text", message=raw)
# --- FastAPI & Webhook ----------------------------------------------------
app = FastAPI()
help_text = (
"🤖 *Eve* commands:\n"
"• /help\n"
"• /summarize <text>\n"
"• /translate <lang>|<text>\n"
"• /joke\n"
"• /weather <loc>\n"
"• /inspire\n"
"• /meme <text>\n"
"• /poll <Q>|… / /results / /endpoll\n"
"• /gen <prompt>|<count>|<width>|<height>\n"
"Otherwise chat or reply to my message to invoke tools."
)
@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
data = await request.json()
if request.headers.get("Authorization") != f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}":
raise HTTPException(403, "Unauthorized")
    # Only handle incoming messages addressed to the configured group chat;
    # other webhook types may not carry senderData at all.
    if data.get("typeWebhook") != "incomingMessageReceived":
        return {"success": True}
    chat_id = data["senderData"]["chatId"]
    sender = data["senderData"]["sender"]
    mid = data["idMessage"]
    set_thread_context(chat_id, sender, mid)
    logger.debug(f"Received webhook for message {mid} from {sender}")
    if chat_id != BotConfig.BOT_GROUP_CHAT:
        return {"success": True}
md = data["messageData"]
tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
if not tmd:
return {"success": True}
body = (tmd.get("textMessage") or tmd.get("text","")).strip()
record_user_message(chat_id, sender, body)
logger.debug(f"User message: {body}")
low = body.lower()
# Slash commands...
if low == "/help":
_fn_send_text(mid, chat_id, help_text)
return {"success": True}
if low.startswith("/summarize "):
_fn_summarize(mid, chat_id, body[11:].strip())
return {"success": True}
if low.startswith("/translate "):
lang, txt = body[11:].split("|", 1)
_fn_translate(mid, chat_id, lang.strip(), txt.strip())
return {"success": True}
if low == "/joke":
_fn_joke(mid, chat_id)
return {"success": True}
if low.startswith("/weather "):
_fn_weather(mid, chat_id, body[9:].strip().replace(" ","+"))
return {"success": True}
if low == "/inspire":
_fn_inspire(mid, chat_id)
return {"success": True}
if low.startswith("/meme "):
_fn_meme(mid, chat_id, body[6:].strip())
return {"success": True}
if low.startswith("/poll "):
parts = [p.strip() for p in body[6:].split("|")]
_fn_poll_create(mid, chat_id, parts[0], parts[1:])
return {"success": True}
if chat_id in polls and low.isdigit():
_fn_poll_vote(mid, chat_id, sender, int(low))
return {"success": True}
if low == "/results":
_fn_poll_results(mid, chat_id)
return {"success": True}
if low == "/endpoll":
_fn_poll_end(mid, chat_id)
return {"success": True}
if low.startswith("/gen"):
parts = body[4:].split("|")
pr = parts[0].strip()
ct = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else BotConfig.DEFAULT_IMAGE_COUNT
width = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else None
height = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else None
_fn_send_accept(mid, chat_id, f"✨ Generating {ct} image(s)…")
task_queue.put({
"type": "image",
"message_id": mid,
"chat_id": chat_id,
"prompt": pr,
"count": ct,
"width": width,
"height": height
})
return {"success": True}
# Skip mentions
if tmd.get("contextInfo", {}).get("mentionedJidList"):
return {"success": True}
# Handle quoted replies to the bot
if md.get("typeMessage") == "quotedMessage":
ext = md["extendedTextMessageData"]
quoted = md["quotedMessage"]
if ext.get("participant") == BotConfig.BOT_JID:
effective = (
f"Quoted: {quoted.get('textMessage','')}\n"
f"User: {ext.get('text','')}"
)
else:
effective = body
else:
effective = body
# Route intent & dispatch
intent = route_intent(effective, chat_id, sender)
logger.debug(f"Final intent: {intent}")
handler = ACTION_HANDLERS.get(intent.action)
if handler:
kwargs = intent.model_dump(exclude={"action"})
logger.debug(f"Dispatching action '{intent.action}' with args {kwargs}")
handler(mid, chat_id, **kwargs)
else:
logger.warning(f"No handler for action '{intent.action}'")
_fn_send_text(mid, chat_id, "Sorry, I didn't understand that.")
return {"success": True}
@app.get("/", response_class=PlainTextResponse)
def index():
return "Server is running!"
if __name__ == "__main__":
client.send_message_to(
BotConfig.BOT_GROUP_CHAT,
"🌟 Eve is online! Type /help to see commands."
)
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)