# eve/app.py
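# FastAPI webhook service for "Eve", a WhatsApp bot driven through the Green API.
# Incoming messages are either handled as slash commands directly or routed through
# an LLM that returns a JSON "intent"; slow work (image and voice generation) is
# pushed onto a background task queue so the webhook can respond quickly.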
import os
import threading
import requests
import logging
import queue
import json
from typing import Optional, Literal, get_args
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from pydantic import BaseModel, Field, ValidationError
from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm, LLMBadRequestError # assume this exception is raised on 400
# --- Logging Setup ---------------------------------------------------------
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logger = logging.getLogger("eve_bot")
logger.setLevel(LOG_LEVEL)
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] [%(message_id)s/%(sender)s] %(message)s"
)
handler.setFormatter(formatter)
class ContextFilter(logging.Filter):
def filter(self, record):
record.message_id = getattr(record, "message_id", "-")
record.sender = getattr(record, "sender", "-")
return True
handler.addFilter(ContextFilter())
logger.handlers = [handler]
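# The format string above references %(message_id)s and %(sender)s, which are not
# standard LogRecord attributes; ContextFilter injects "-" defaults so formatting
# never fails for records emitted outside a webhook request.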
# Thread‐local to carry context through helpers
_thread_ctx = threading.local()
def set_thread_context(chat_id, sender, message_id):
_thread_ctx.chat_id = chat_id
_thread_ctx.sender = sender
_thread_ctx.message_id = message_id
def get_thread_context():
return (
getattr(_thread_ctx, "chat_id", None),
getattr(_thread_ctx, "sender", None),
getattr(_thread_ctx, "message_id", None),
)
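# Helpers such as _fn_send_text read this context so bot replies can be recorded
# against the right (chat_id, sender) history without passing those values
# explicitly through every call.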
# --- Conversation History -------------------------------------------------
history = defaultdict(lambda: deque(maxlen=20))
def record_user_message(chat_id, sender, message):
history[(chat_id, sender)].append(f"User: {message}")
def record_bot_message(chat_id, sender, message):
history[(chat_id, sender)].append(f"Assistant: {message}")
def get_history_text(chat_id, sender):
return "\n".join(history[(chat_id, sender)])
def clear_history(chat_id, sender):
history[(chat_id, sender)].clear()
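# Conversation history is kept per (chat_id, sender) pair, capped at the last 20
# entries; route_intent() feeds it into the LLM prompt, and clear_history() wipes it
# when generate_llm raises LLMBadRequestError (assumed to indicate a 400).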
# --- Bot Config & Client --------------------------------------------------
class BotConfig:
GREEN_API_URL = os.getenv("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
BOT_GROUP_CHAT = "[email protected]"
BOT_JID = os.getenv("BOT_JID")
IMAGE_DIR = "/tmp/images"
AUDIO_DIR = "/tmp/audio"
DEFAULT_IMAGE_COUNT = 4
@classmethod
def validate(cls):
missing = [n for n in (
"GREEN_API_URL","GREEN_API_TOKEN",
"GREEN_API_ID_INSTANCE","WEBHOOK_AUTH_TOKEN","BOT_JID"
) if not getattr(cls, n)]
if missing:
raise ValueError(f"Missing env vars: {', '.join(missing)}")
class BotClient:
def __init__(self, cfg: BotConfig):
self.cfg = cfg
self.session = requests.Session()
def send(self, endpoint, payload, files=None, retries=3):
url = (
f"{self.cfg.GREEN_API_URL}/waInstance"
f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
f"{self.cfg.GREEN_API_TOKEN}"
)
for i in range(1, retries+1):
try:
resp = self.session.post(
url,
json=payload if files is None else None,
data=None if files is None else payload,
files=files
)
resp.raise_for_status()
return resp.json()
except requests.RequestException as e:
logger.warning(f"{endpoint} attempt {i}/{retries} failed: {e}")
return {"error":"failed"}
def send_message(self, message_id, chat_id, text):
return self.send("sendMessage", {
"chatId": chat_id,
"message": text,
"quotedMessageId": message_id
})
def send_message_to(self, chat_id, text):
return self.send("sendMessage", {
"chatId": chat_id,
"message": text
})
def send_media(self, message_id, chat_id, file_path, caption, media_type):
endpoint = "sendFileByUpload"
payload = {
"chatId": chat_id,
"caption": caption,
"quotedMessageId": message_id
}
with open(file_path,"rb") as f:
mime = "image/jpeg" if media_type=="image" else "audio/mpeg"
files = [("file",(os.path.basename(file_path),f,mime))]
return self.send(endpoint, payload, files=files)
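# Fail fast at import time if required env vars are missing, then build a single
# shared client so every send reuses one requests.Session.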
BotConfig.validate()
client = BotClient(BotConfig)
# --- Threading & Queues ---------------------------------------------------
task_queue = queue.Queue()
executor = ThreadPoolExecutor(max_workers=4)
def worker():
while True:
task = task_queue.get()
try:
if task["type"] == "image":
_fn_generate_images(**task)
elif task["type"] == "audio":
_fn_voice_reply(**task)
except Exception as e:
logger.error(f"Worker error {task}: {e}")
finally:
task_queue.task_done()
for _ in range(4):
threading.Thread(target=worker, daemon=True).start()
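# Four daemon workers drain task_queue so image/audio generation never blocks the
# webhook handler; tasks are plain dicts whose "type" selects the handler and whose
# fields are passed straight through as keyword arguments (extras are absorbed by **_).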
# --- Basic Tool Functions -------------------------------------------------
def _fn_send_text(mid, cid, message):
client.send_message(mid, cid, message)
chat_id, sender, _ = get_thread_context()
if chat_id and sender:
record_bot_message(chat_id, sender, message)
task_queue.put({
"type": "audio",
"message_id": mid,
"chat_id": cid,
"prompt": message
})
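# Note: every text reply sent via _fn_send_text also queues a spoken version of the
# same message, whereas _fn_send_accept sends text only (used for progress notices).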
def _fn_send_accept(mid, cid, message):
client.send_message(mid, cid, message)
chat_id, sender, _ = get_thread_context()
if chat_id and sender:
record_bot_message(chat_id, sender, message)
def _fn_summarize(mid, cid, text):
summary = generate_llm(f"Summarize:\n\n{text}")
_fn_send_text(mid, cid, summary)
def _fn_translate(mid, cid, lang, text):
resp = generate_llm(f"Translate to {lang}:\n\n{text}")
_fn_send_text(mid, cid, resp)
def _fn_joke(mid, cid):
try:
j = requests.get(
"https://official-joke-api.appspot.com/random_joke",
timeout=5
).json()
joke = f"{j['setup']}\n\n{j['punchline']}"
    except Exception:  # network/JSON failure: fall back to an LLM-generated joke
joke = generate_llm("Tell me a short joke.")
_fn_send_text(mid, cid, joke)
def _fn_weather(mid, cid, loc):
raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
report = generate_llm(f"Give a weather report in °C:\n\n{raw}")
_fn_send_text(mid, cid, report)
def _fn_inspire(mid, cid):
quote = generate_llm("Give me a unique, random short inspirational quote.")
_fn_send_text(mid, cid, f"✨ {quote}")
def _fn_meme(mid, cid, txt):
_fn_send_accept(mid, cid, "🎨 Generating meme…")
task_queue.put({
"type": "image",
"message_id": mid,
"chat_id": cid,
"prompt": f"meme: {txt}"
})
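# _fn_generate_images runs either on a worker thread (for queued /gen and /meme tasks)
# or synchronously when the intent dispatcher handles a generate_image intent directly.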
def _fn_generate_images(
message_id: str,
chat_id: str,
prompt: str,
count: int = 1,
width: Optional[int] = None,
height: Optional[int] = None,
**_
):
_fn_send_accept(message_id, chat_id, f"✨ Generating {count if count != 1 else 'a'} image{'s' if count != 1 else ''}...")
for i in range(1, count+1):
try:
img, path, ret_p, url = generate_image(
prompt, message_id, message_id, BotConfig.IMAGE_DIR,
width=width, height=height
)
formatted = "\n\n".join(f"_{p.strip()}_" for p in ret_p.split("\n\n") if p.strip())
cap = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
client.send_media(message_id, chat_id, path, cap, media_type="image")
os.remove(path)
except Exception as e:
if "Timed out" in str(e):
logger.warning("Image generation timed out.")
else:
logger.warning(f"Img {i}/{count} failed: {e}")
_fn_send_text(message_id, chat_id, f"😢 Failed to generate image {i}/{count}.")
def _fn_voice_reply(
message_id: str,
chat_id: str,
prompt: str,
**_
):
"""
Try to generate an audio reply once. If it fails (e.g. a 400),
send the text fallback directly (no further retry).
"""
proc = (
f"Just say this exactly as written in a friendly, playful, "
f"happy and helpful but a little bit clumsy-cute way: {prompt}"
)
try:
res = generate_voice_reply(proc, model="openai-audio", voice="coral", audio_dir=BotConfig.AUDIO_DIR)
except Exception as e:
logger.warning(f"Audio generation failed ({e}); sending text only.")
return
if res and res[0]:
path, _ = res
client.send_media(message_id, chat_id, path, "", media_type="audio")
os.remove(path)
else:
logger.warning("Audio reply failed")
# --- Pydantic Models for Function Calling --------------------------------
class BaseIntent(BaseModel):
action: str
class SummarizeIntent(BaseIntent):
action: Literal["summarize"]
text: str
class TranslateIntent(BaseIntent):
action: Literal["translate"]
lang: str
text: str
class JokeIntent(BaseIntent):
action: Literal["joke"]
class WeatherIntent(BaseIntent):
action: Literal["weather"]
location: str
class InspireIntent(BaseIntent):
action: Literal["inspire"]
class MemeIntent(BaseIntent):
action: Literal["meme"]
text: str
class GenerateImageIntent(BaseModel):
action: Literal["generate_image"]
prompt: str
count: int = Field(default=1, ge=1)
    width: Optional[int] = None
    height: Optional[int] = None
class SendTextIntent(BaseModel):
action: Literal["send_text"]
message: str
# list of all intent models
INTENT_MODELS = [
SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent,
InspireIntent, MemeIntent, GenerateImageIntent, SendTextIntent
]
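# route_intent() validates the LLM's JSON against these models strictly and in order;
# if none match, it falls back to a lenient pass that extracts the expected fields by hand.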
ACTION_HANDLERS = {
"summarize": lambda mid,cid,**i: _fn_summarize(mid,cid,i["text"]),
"translate": lambda mid,cid,**i: _fn_translate(mid,cid,i["lang"],i["text"]),
"joke": lambda mid,cid,**i: _fn_joke(mid,cid),
"weather": lambda mid,cid,**i: _fn_weather(mid,cid,i["location"]),
"inspire": lambda mid,cid,**i: _fn_inspire(mid,cid),
"meme": lambda mid,cid,**i: _fn_meme(mid,cid,i["text"]),
"generate_image": _fn_generate_images,
"send_text": lambda mid,cid,**i: _fn_send_text(mid,cid,i["message"]),
}
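# Example: if the LLM returns {"action": "weather", "location": "Colombo"}, it is
# validated as a WeatherIntent and dispatched as
# ACTION_HANDLERS["weather"](mid, cid, location="Colombo"), which in turn calls
# _fn_weather(mid, cid, "Colombo"). (The location value here is purely illustrative.)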
# --- Intent Routing with Fallback & History‐Reset on 400 -------------------
def route_intent(user_input: str, chat_id: str, sender: str):
history_text = get_history_text(chat_id, sender)
    sys_prompt = (
        "You never perform work yourself; you only invoke one of the available functions. "
        "When the user asks for something that matches a function signature, you must return exactly one JSON object matching that function’s parameters, and nothing else. "
        "Do not wrap it in markdown, do not add extra text, and do not show the JSON to the user. "
        "If the user’s request does not match any function, reply in plain text, and never mention JSON or internal logic.\n\n"
"- summarize(text)\n"
"- translate(lang, text)\n"
"- joke()\n"
"- weather(location)\n"
"- inspire()\n"
"- meme(text)\n"
"- generate_image(prompt, count, width, height)\n"
"- send_text(message)\n\n"
"Return only raw JSON matching one of these shapes. For example:\n"
" {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":4,\"width\":1920,\"height\":1080}\n"
"Another Example:\n"
" {\"action\":\"send_text\",\"message\":\"Hello!\"}\n\n"
"Otherwise, use send_text to reply with plain chat and you should only return one json for the current message not for previous conversations.\n"
f"Conversation so far:\n{history_text}\n\n current message: User: {user_input}"
)
try:
raw = generate_llm(sys_prompt)
except LLMBadRequestError:
clear_history(chat_id, sender)
return SendTextIntent(action="send_text", message="Oops, I lost my train of thought—let’s start fresh!")
logger.debug(f"LLM raw response: {raw}")
try:
parsed = json.loads(raw)
logger.debug(f"Parsed JSON: {parsed}")
except json.JSONDecodeError:
return SendTextIntent(action="send_text", message=raw)
for M in INTENT_MODELS:
try:
intent = M.model_validate(parsed)
logger.debug(f"Matched intent model: {M.__name__} with data {parsed}")
return intent
except ValidationError:
continue
logger.warning("Strict parse failed for all models, falling back to lenient")
action = parsed.get("action")
if action in ACTION_HANDLERS:
data = parsed
kwargs = {}
if action == "generate_image":
kwargs["prompt"] = data.get("prompt","")
kwargs["count"] = int(data.get("count", BotConfig.DEFAULT_IMAGE_COUNT))
kwargs["width"] = data.get("width")
kwargs["height"] = data.get("height")
elif action == "send_text":
kwargs["message"] = data.get("message","")
elif action == "translate":
kwargs["lang"] = data.get("lang","")
kwargs["text"] = data.get("text","")
elif action == "summarize":
kwargs["text"] = data.get("text","")
elif action == "weather":
kwargs["location"] = data.get("location","")
elif action == "meme":
kwargs["text"] = data.get("text","")
try:
            model = next(
                m for m in INTENT_MODELS
                # match on the Literal value declared for each model's "action" field
                if action in get_args(m.model_fields["action"].annotation)
            )
intent = model.model_validate({"action":action, **kwargs})
logger.debug(f"Leniently matched intent model: {model.__name__} with kwargs {kwargs}")
return intent
except Exception as e:
logger.error(f"Lenient parsing into Pydantic failed: {e}")
return SendTextIntent(action="send_text", message=raw)
return SendTextIntent(action="send_text", message=raw)
# --- FastAPI & Webhook ----------------------------------------------------
app = FastAPI()
help_text = (
"*🤖 Eve's Command Center:*\n\n"
"🔹 `/help` - Show this help message\n"
"🔹 `/summarize <text>` - Get a quick summary of the text\n"
"🔹 `/translate <lang>|<text>` - Convert your text to the specified language\n"
"🔹 `/joke` - Enjoy a light-hearted joke\n"
"🔹 `/weather <location>` - Check the current weather for your location\n"
"🔹 `/inspire` - Receive an uplifting inspirational quote\n"
"🔹 `/meme <text>` - Generate a meme based on your text\n"
"🔹 `/gen <prompt>|<count>|<width>|<height>` - Generate creative images\n\n"
"Just type your message or command to start interacting with Eve!"
)
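# Webhook flow: check the bearer token, extract the chat/sender/message ids, handle
# slash commands directly, and route anything else through route_intent(). A minimal
# incoming payload, with its shape inferred from the parsing below (field values are
# illustrative only; chatId must equal BOT_GROUP_CHAT), looks roughly like:
#
#   {
#     "typeWebhook": "incomingMessageReceived",
#     "idMessage": "ABCD1234",
#     "senderData": {"chatId": "<group-jid>", "sender": "<user-jid>"},
#     "messageData": {
#       "typeMessage": "textMessage",
#       "textMessageData": {"textMessage": "/joke"}
#     }
#   }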
@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
data = await request.json()
logger.debug(f"Incoming webhook payload: {json.dumps(data)}")
if request.headers.get("Authorization") != f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}":
raise HTTPException(403, "Unauthorized")
try:
chat_id = data["senderData"]["chatId"]
sender = data["senderData"]["sender"]
mid = data["idMessage"]
except KeyError:
try:
chat_id = data["chatId"]
sender = data["sender"]
mid = data["messageId"]
except KeyError:
logger.error("Cannot find chat_id/sender/message_id in payload")
return {"success": False, "error": "bad payload"}
set_thread_context(chat_id, sender, mid)
logger.debug(f"Received webhook for message {mid} from {sender}")
if chat_id != BotConfig.BOT_GROUP_CHAT or data.get("typeWebhook") != "incomingMessageReceived":
return {"success": True}
md = data.get("messageData", {})
tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
if not tmd:
return {"success": True}
body = (tmd.get("textMessage") or tmd.get("text","")).strip()
record_user_message(chat_id, sender, body)
logger.debug(f"User message: {body}")
low = body.lower()
if low == "/help":
_fn_send_text(mid, chat_id, help_text)
return {"success": True}
if low.startswith("/summarize "):
_fn_summarize(mid, chat_id, body[11:].strip())
return {"success": True}
if low.startswith("/translate "):
        lang, _, txt = body[11:].partition("|")  # partition() tolerates a missing "|" (txt is then empty)
_fn_translate(mid, chat_id, lang.strip(), txt.strip())
return {"success": True}
if low == "/joke":
_fn_joke(mid, chat_id)
return {"success": True}
if low.startswith("/weather "):
_fn_weather(mid, chat_id, body[9:].strip().replace(" ","+"))
return {"success": True}
if low == "/inspire":
_fn_inspire(mid, chat_id)
return {"success": True}
if low.startswith("/meme "):
_fn_meme(mid, chat_id, body[6:].strip())
return {"success": True}
if low.startswith("/gen"):
parts = body[4:].split("|")
pr = parts[0].strip()
count = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else BotConfig.DEFAULT_IMAGE_COUNT
width = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else None
height = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else None
_fn_send_accept(mid, chat_id, f"✨ Generating {count if count != 1 else 'a'} image{'s' if count != 1 else ''}...")
task_queue.put({
"type": "image",
"message_id": mid,
"chat_id": chat_id,
"prompt": pr,
"count": count,
"width": width,
"height": height
})
return {"success": True}
if tmd.get("contextInfo", {}).get("mentionedJidList"):
return {"success": True}
if md.get("typeMessage") == "quotedMessage":
ext = md["extendedTextMessageData"]
quoted = md["quotedMessage"]
if ext.get("participant") == BotConfig.BOT_JID:
effective = (
f"Quoted: {quoted.get('textMessage','')}\n"
f"User: {ext.get('text','')}"
)
else:
effective = body
else:
effective = body
intent = route_intent(effective, chat_id, sender)
logger.debug(f"Final intent: {intent}")
handler = ACTION_HANDLERS.get(intent.action)
if handler:
kwargs = intent.model_dump(exclude={"action"})
logger.debug(f"Dispatching action '{intent.action}' with args {kwargs}")
handler(mid, chat_id, **kwargs)
else:
logger.warning(f"No handler for action '{intent.action}'")
_fn_send_text(mid, chat_id, "Sorry, I didn't understand that.")
return {"success": True}
@app.get("/", response_class=PlainTextResponse)
def index():
return "Server is running!"
@app.api_route("/health", methods=["GET", "HEAD"])
def health():
    # Responses to HEAD requests carry no body per the HTTP spec; FastAPI handles that automatically
return JSONResponse(content={"status": "ok"})
if __name__ == "__main__":
client.send_message_to(
BotConfig.BOT_GROUP_CHAT,
"🌟 Eve is online! Type /help to see commands."
)
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)