import os
import threading
import requests
import logging
import queue
import json
from typing import List, Optional, Union, Literal, get_args
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from pydantic import BaseModel, Field, ValidationError

from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm, LLMBadRequestError  # assume this exception is raised on 400

# --- Logging Setup ---------------------------------------------------------
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logger = logging.getLogger("eve_bot")
logger.setLevel(LOG_LEVEL)

handler = logging.StreamHandler()
formatter = logging.Formatter(
    "%(asctime)s [%(levelname)s] [%(message_id)s/%(sender)s] %(message)s"
)
handler.setFormatter(formatter)


class ContextFilter(logging.Filter):
    """Guarantee message_id/sender fields exist on every log record."""

    def filter(self, record):
        record.message_id = getattr(record, "message_id", "-")
        record.sender = getattr(record, "sender", "-")
        return True


handler.addFilter(ContextFilter())
logger.handlers = [handler]

# Thread-local to carry context through helpers
_thread_ctx = threading.local()


def set_thread_context(chat_id, sender, message_id):
    _thread_ctx.chat_id = chat_id
    _thread_ctx.sender = sender
    _thread_ctx.message_id = message_id


def get_thread_context():
    return (
        getattr(_thread_ctx, "chat_id", None),
        getattr(_thread_ctx, "sender", None),
        getattr(_thread_ctx, "message_id", None),
    )


# --- Conversation History ---------------------------------------------------
# Keep the last 20 turns per (chat_id, sender) pair.
history = defaultdict(lambda: deque(maxlen=20))


def record_user_message(chat_id, sender, message):
    history[(chat_id, sender)].append(f"User: {message}")


def record_bot_message(chat_id, sender, message):
    history[(chat_id, sender)].append(f"Assistant: {message}")


def get_history_text(chat_id, sender):
    return "\n".join(history[(chat_id, sender)])


def clear_history(chat_id, sender):
    history[(chat_id, sender)].clear()


# --- Bot Config & Client ----------------------------------------------------
class BotConfig:
    GREEN_API_URL = os.getenv("GREEN_API_URL")
    GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
    GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
    GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
    WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
    BOT_GROUP_CHAT = "120363312903494448@g.us"
    BOT_JID = os.getenv("BOT_JID")
    IMAGE_DIR = "/tmp/images"
    AUDIO_DIR = "/tmp/audio"
    DEFAULT_IMAGE_COUNT = 4

    @classmethod
    def validate(cls):
        missing = [
            n for n in (
                "GREEN_API_URL", "GREEN_API_TOKEN",
                "GREEN_API_ID_INSTANCE", "WEBHOOK_AUTH_TOKEN", "BOT_JID",
            ) if not getattr(cls, n)
        ]
        if missing:
            raise ValueError(f"Missing env vars: {', '.join(missing)}")


class BotClient:
    def __init__(self, cfg: BotConfig):
        self.cfg = cfg
        self.session = requests.Session()

    def send(self, endpoint, payload, files=None, retries=3):
        url = (
            f"{self.cfg.GREEN_API_URL}/waInstance"
            f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
            f"{self.cfg.GREEN_API_TOKEN}"
        )
        for i in range(1, retries + 1):
            try:
                resp = self.session.post(
                    url,
                    json=payload if files is None else None,
                    data=None if files is None else payload,
                    files=files,
                )
                resp.raise_for_status()
                return resp.json()
            except requests.RequestException as e:
                logger.warning(f"{endpoint} attempt {i}/{retries} failed: {e}")
        return {"error": "failed"}

    def send_message(self, message_id, chat_id, text):
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text,
            "quotedMessageId": message_id,
        })
text, "quotedMessageId": message_id }) def send_message_to(self, chat_id, text): return self.send("sendMessage", { "chatId": chat_id, "message": text }) def send_media(self, message_id, chat_id, file_path, caption, media_type): endpoint = "sendFileByUpload" payload = { "chatId": chat_id, "caption": caption, "quotedMessageId": message_id } with open(file_path,"rb") as f: mime = "image/jpeg" if media_type=="image" else "audio/mpeg" files = [("file",(os.path.basename(file_path),f,mime))] return self.send(endpoint, payload, files=files) BotConfig.validate() client = BotClient(BotConfig) # --- Threading & Queues --------------------------------------------------- task_queue = queue.Queue() executor = ThreadPoolExecutor(max_workers=4) def worker(): while True: task = task_queue.get() try: if task["type"] == "image": _fn_generate_images(**task) elif task["type"] == "audio": _fn_voice_reply(**task) except Exception as e: logger.error(f"Worker error {task}: {e}") finally: task_queue.task_done() for _ in range(4): threading.Thread(target=worker, daemon=True).start() # --- Basic Tool Functions ------------------------------------------------- def _fn_send_text(mid, cid, message): client.send_message(mid, cid, message) chat_id, sender, _ = get_thread_context() if chat_id and sender: record_bot_message(chat_id, sender, message) task_queue.put({ "type": "audio", "message_id": mid, "chat_id": cid, "prompt": message }) def _fn_send_accept(mid, cid, message): client.send_message(mid, cid, message) chat_id, sender, _ = get_thread_context() if chat_id and sender: record_bot_message(chat_id, sender, message) def _fn_summarize(mid, cid, text): summary = generate_llm(f"Summarize:\n\n{text}") _fn_send_text(mid, cid, summary) def _fn_translate(mid, cid, lang, text): resp = generate_llm(f"Translate to {lang}:\n\n{text}") _fn_send_text(mid, cid, resp) def _fn_joke(mid, cid): try: j = requests.get( "https://official-joke-api.appspot.com/random_joke", timeout=5 ).json() joke = f"{j['setup']}\n\n{j['punchline']}" except: joke = generate_llm("Tell me a short joke.") _fn_send_text(mid, cid, joke) def _fn_weather(mid, cid, loc): raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text report = generate_llm(f"Give a weather report in °C:\n\n{raw}") _fn_send_text(mid, cid, report) def _fn_inspire(mid, cid): quote = generate_llm("Give me a unique, random short inspirational quote.") _fn_send_text(mid, cid, f"✨ {quote}") def _fn_meme(mid, cid, txt): _fn_send_accept(mid, cid, "🎨 Generating meme…") task_queue.put({ "type": "image", "message_id": mid, "chat_id": cid, "prompt": f"meme: {txt}" }) def _fn_generate_images( message_id: str, chat_id: str, prompt: str, count: int = 1, width: Optional[int] = None, height: Optional[int] = None, **_ ): _fn_send_accept(message_id, chat_id, f"✨ Generating {count if count != 1 else 'a'} image{'s' if count != 1 else ''}...") for i in range(1, count+1): try: img, path, ret_p, url = generate_image( prompt, message_id, message_id, BotConfig.IMAGE_DIR, width=width, height=height ) formatted = "\n\n".join(f"_{p.strip()}_" for p in ret_p.split("\n\n") if p.strip()) cap = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}" client.send_media(message_id, chat_id, path, cap, media_type="image") os.remove(path) except Exception as e: if "Timed out" in str(e): logger.warning("Image generation timed out.") else: logger.warning(f"Img {i}/{count} failed: {e}") _fn_send_text(message_id, chat_id, f"😢 Failed to generate image {i}/{count}.") def _fn_voice_reply( message_id: str, chat_id: 


def _fn_voice_reply(message_id: str, chat_id: str, prompt: str, **_):
    """
    Try to generate an audio reply once. If it fails (e.g. a 400), just log
    and return; the text reply has already been sent, so there is no retry.
    """
    proc = (
        f"Just say this exactly as written in a friendly, playful, "
        f"happy and helpful but a little bit clumsy-cute way: {prompt}"
    )
    try:
        res = generate_voice_reply(
            proc,
            model="openai-audio",
            voice="coral",
            audio_dir=BotConfig.AUDIO_DIR,
        )
    except Exception as e:
        logger.warning(f"Audio generation failed ({e}); sending text only.")
        return
    if res and res[0]:
        path, _ = res
        client.send_media(message_id, chat_id, path, "", media_type="audio")
        os.remove(path)
    else:
        logger.warning("Audio reply failed")


# --- Pydantic Models for Function Calling -----------------------------------
class BaseIntent(BaseModel):
    action: str


class SummarizeIntent(BaseIntent):
    action: Literal["summarize"]
    text: str


class TranslateIntent(BaseIntent):
    action: Literal["translate"]
    lang: str
    text: str


class JokeIntent(BaseIntent):
    action: Literal["joke"]


class WeatherIntent(BaseIntent):
    action: Literal["weather"]
    location: str


class InspireIntent(BaseIntent):
    action: Literal["inspire"]


class MemeIntent(BaseIntent):
    action: Literal["meme"]
    text: str


class GenerateImageIntent(BaseModel):
    action: Literal["generate_image"]
    prompt: str
    count: int = Field(default=1, ge=1)
    width: Optional[int] = None
    height: Optional[int] = None


class SendTextIntent(BaseModel):
    action: Literal["send_text"]
    message: str


# list of all intent models
INTENT_MODELS = [
    SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent,
    InspireIntent, MemeIntent, GenerateImageIntent, SendTextIntent,
]

ACTION_HANDLERS = {
    "summarize":      lambda mid, cid, **i: _fn_summarize(mid, cid, i["text"]),
    "translate":      lambda mid, cid, **i: _fn_translate(mid, cid, i["lang"], i["text"]),
    "joke":           lambda mid, cid, **i: _fn_joke(mid, cid),
    "weather":        lambda mid, cid, **i: _fn_weather(mid, cid, i["location"]),
    "inspire":        lambda mid, cid, **i: _fn_inspire(mid, cid),
    "meme":           lambda mid, cid, **i: _fn_meme(mid, cid, i["text"]),
    "generate_image": _fn_generate_images,
    "send_text":      lambda mid, cid, **i: _fn_send_text(mid, cid, i["message"]),
}


# --- Intent Routing with Fallback & History Reset on 400 --------------------
def route_intent(user_input: str, chat_id: str, sender: str):
    history_text = get_history_text(chat_id, sender)
    sys_prompt = (
        "You never perform work yourself—you only invoke one of the available functions. "
        "When the user asks for something that matches a function signature, you must return "
        "exactly one JSON object matching that function’s parameters—and nothing else. "
        "Do not wrap it in markdown, do not add extra text, and do not show the JSON to the user. "
        "If the user’s request does not match any function, reply in plain text, "
        "and never mention JSON or internal logic.\n\n"
        "- summarize(text)\n"
        "- translate(lang, text)\n"
        "- joke()\n"
        "- weather(location)\n"
        "- inspire()\n"
        "- meme(text)\n"
        "- generate_image(prompt, count, width, height)\n"
        "- send_text(message)\n\n"
        "Return only raw JSON matching one of these shapes. For example:\n"
        "  {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":4,\"width\":1920,\"height\":1080}\n"
        "Another example:\n"
        "  {\"action\":\"send_text\",\"message\":\"Hello!\"}\n\n"
        "Otherwise, use send_text to reply with plain chat. Return exactly one JSON object "
        "for the current message only, not for previous conversation turns.\n"
        f"Conversation so far:\n{history_text}\n\n"
        f"Current message:\nUser: {user_input}"
    )
    try:
        raw = generate_llm(sys_prompt)
    except LLMBadRequestError:
        clear_history(chat_id, sender)
        return SendTextIntent(
            action="send_text",
            message="Oops, I lost my train of thought—let’s start fresh!",
        )
    logger.debug(f"LLM raw response: {raw}")

    # Strict path: parse the raw reply as JSON and validate against each model.
    try:
        parsed = json.loads(raw)
        logger.debug(f"Parsed JSON: {parsed}")
    except json.JSONDecodeError:
        return SendTextIntent(action="send_text", message=raw)

    for M in INTENT_MODELS:
        try:
            intent = M.model_validate(parsed)
            logger.debug(f"Matched intent model: {M.__name__} with data {parsed}")
            return intent
        except ValidationError:
            continue

    # Lenient fallback: pull only the fields we know about out of the parsed dict.
    logger.warning("Strict parse failed for all models, falling back to lenient")
    action = parsed.get("action") if isinstance(parsed, dict) else None
    if action in ACTION_HANDLERS:
        data = parsed
        kwargs = {}
        if action == "generate_image":
            kwargs["prompt"] = data.get("prompt", "")
            kwargs["count"] = int(data.get("count", BotConfig.DEFAULT_IMAGE_COUNT))
            kwargs["width"] = data.get("width")
            kwargs["height"] = data.get("height")
        elif action == "send_text":
            kwargs["message"] = data.get("message", "")
        elif action == "translate":
            kwargs["lang"] = data.get("lang", "")
            kwargs["text"] = data.get("text", "")
        elif action == "summarize":
            kwargs["text"] = data.get("text", "")
        elif action == "weather":
            kwargs["location"] = data.get("location", "")
        elif action == "meme":
            kwargs["text"] = data.get("text", "")
        try:
            # Find the model whose Literal action annotation matches this action.
            model = next(
                m for m in INTENT_MODELS
                if get_args(m.model_fields["action"].annotation) == (action,)
            )
            intent = model.model_validate({"action": action, **kwargs})
            logger.debug(
                f"Leniently matched intent model: {model.__name__} with kwargs {kwargs}"
            )
            return intent
        except Exception as e:
            logger.error(f"Lenient parsing into Pydantic failed: {e}")
            return SendTextIntent(action="send_text", message=raw)

    return SendTextIntent(action="send_text", message=raw)


# --- FastAPI & Webhook -------------------------------------------------------
app = FastAPI()

help_text = (
    "*🤖 Eve's Command Center:*\n\n"
    "🔹 `/help` - Show this help message\n"
    "🔹 `/summarize <text>` - Get a quick summary of the text\n"
    "🔹 `/translate <lang>|<text>` - Convert your text to the specified language\n"
    "🔹 `/joke` - Enjoy a light-hearted joke\n"
    "🔹 `/weather <location>` - Check the current weather for your location\n"
    "🔹 `/inspire` - Receive an uplifting inspirational quote\n"
    "🔹 `/meme <text>` - Generate a meme based on your text\n"
    "🔹 `/gen <prompt>|<count>|<width>|<height>` - Generate creative images\n\n"
    "Just type your message or command to start interacting with Eve!"
)


@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
    data = await request.json()
    logger.debug(f"Incoming webhook payload: {json.dumps(data)}")

    if request.headers.get("Authorization") != f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}":
        raise HTTPException(403, "Unauthorized")

    try:
        chat_id = data["senderData"]["chatId"]
        sender = data["senderData"]["sender"]
        mid = data["idMessage"]
    except KeyError:
        try:
            chat_id = data["chatId"]
            sender = data["sender"]
            mid = data["messageId"]
        except KeyError:
            logger.error("Cannot find chat_id/sender/message_id in payload")
            return {"success": False, "error": "bad payload"}

    set_thread_context(chat_id, sender, mid)
    logger.debug(f"Received webhook for message {mid} from {sender}")

    if chat_id != BotConfig.BOT_GROUP_CHAT or data.get("typeWebhook") != "incomingMessageReceived":
        return {"success": True}

    md = data.get("messageData", {})
    tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
    if not tmd:
        return {"success": True}

    body = (tmd.get("textMessage") or tmd.get("text", "")).strip()
    record_user_message(chat_id, sender, body)
    logger.debug(f"User message: {body}")

    low = body.lower()
    if low == "/help":
        _fn_send_text(mid, chat_id, help_text)
        return {"success": True}
    if low.startswith("/summarize "):
        _fn_summarize(mid, chat_id, body[11:].strip())
        return {"success": True}
    if low.startswith("/translate "):
        if "|" not in body:
            _fn_send_text(mid, chat_id, "Usage: /translate <lang>|<text>")
            return {"success": True}
        lang, txt = body[11:].split("|", 1)
        _fn_translate(mid, chat_id, lang.strip(), txt.strip())
        return {"success": True}
    if low == "/joke":
        _fn_joke(mid, chat_id)
        return {"success": True}
    if low.startswith("/weather "):
        _fn_weather(mid, chat_id, body[9:].strip().replace(" ", "+"))
        return {"success": True}
    if low == "/inspire":
        _fn_inspire(mid, chat_id)
        return {"success": True}
    if low.startswith("/meme "):
        _fn_meme(mid, chat_id, body[6:].strip())
        return {"success": True}
    if low.startswith("/gen"):
        parts = body[4:].split("|")
        pr = parts[0].strip()
        count = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else BotConfig.DEFAULT_IMAGE_COUNT
        width = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else None
        height = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else None
        _fn_send_accept(
            mid, chat_id,
            f"✨ Generating {count if count != 1 else 'a'} image{'s' if count != 1 else ''}..."
        )
        task_queue.put({
            "type": "image",
            "message_id": mid,
            "chat_id": chat_id,
            "prompt": pr,
            "count": count,
            "width": width,
            "height": height,
        })
        return {"success": True}

    # Ignore messages that mention other participants.
    if tmd.get("contextInfo", {}).get("mentionedJidList"):
        return {"success": True}

    # If the user replied to one of the bot's own messages, include the quoted
    # text so the LLM has that context.
    if md.get("typeMessage") == "quotedMessage":
        ext = md.get("extendedTextMessageData", {})
        quoted = md.get("quotedMessage", {})
        if ext.get("participant") == BotConfig.BOT_JID:
            effective = (
                f"Quoted: {quoted.get('textMessage', '')}\n"
                f"User: {ext.get('text', '')}"
            )
        else:
            effective = body
    else:
        effective = body

    intent = route_intent(effective, chat_id, sender)
    logger.debug(f"Final intent: {intent}")

    handler = ACTION_HANDLERS.get(intent.action)
    if handler:
        kwargs = intent.model_dump(exclude={"action"})
        logger.debug(f"Dispatching action '{intent.action}' with args {kwargs}")
        handler(mid, chat_id, **kwargs)
    else:
        logger.warning(f"No handler for action '{intent.action}'")
        _fn_send_text(mid, chat_id, "Sorry, I didn't understand that.")

    return {"success": True}


@app.get("/", response_class=PlainTextResponse)
def index():
    return "Server is running!"


@app.api_route("/health", methods=["GET", "HEAD"])
def health():
    # HEAD requests ignore the body per the HTTP spec, so FastAPI handles that automatically.
    return JSONResponse(content={"status": "ok"})


if __name__ == "__main__":
    client.send_message_to(
        BotConfig.BOT_GROUP_CHAT,
        "🌟 Eve is online! Type /help to see commands."
    )
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
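
# A minimal sketch for exercising the webhook locally (not part of the bot).
# The payload shape below mirrors the fields whatsapp_webhook reads
# (typeWebhook, idMessage, senderData.chatId/sender, messageData.textMessageData);
# the sender id and token are placeholders, and chatId must equal
# BotConfig.BOT_GROUP_CHAT for the message to be processed.
#
#   curl -X POST http://localhost:7860/whatsapp \
#     -H "Authorization: Bearer $WEBHOOK_AUTH_TOKEN" \
#     -H "Content-Type: application/json" \
#     -d '{
#           "typeWebhook": "incomingMessageReceived",
#           "idMessage": "TEST123",
#           "senderData": {"chatId": "120363312903494448@g.us", "sender": "1234567890@c.us"},
#           "messageData": {"typeMessage": "textMessage",
#                           "textMessageData": {"textMessage": "/help"}}
#         }'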