import os
import threading
import requests
import logging
import queue
import json
from typing import List, Optional, Literal, get_args
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel, Field, ValidationError

from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm, LLMBadRequestError  # assume this exception is raised on 400

# --- Logging Setup ---------------------------------------------------------

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

logger = logging.getLogger("eve_bot")
logger.setLevel(LOG_LEVEL)

handler = logging.StreamHandler()
formatter = logging.Formatter(
    "%(asctime)s [%(levelname)s] [%(message_id)s/%(sender)s] %(message)s"
)
handler.setFormatter(formatter)


class ContextFilter(logging.Filter):
    def filter(self, record):
        record.message_id = getattr(record, "message_id", "-")
        record.sender = getattr(record, "sender", "-")
        return True


handler.addFilter(ContextFilter())
logger.handlers = [handler]

# Thread-local to carry context through helpers
_thread_ctx = threading.local()


def set_thread_context(chat_id, sender, message_id):
    _thread_ctx.chat_id = chat_id
    _thread_ctx.sender = sender
    _thread_ctx.message_id = message_id


def get_thread_context():
    return (
        getattr(_thread_ctx, "chat_id", None),
        getattr(_thread_ctx, "sender", None),
        getattr(_thread_ctx, "message_id", None),
    )


# --- Conversation History ---------------------------------------------------

# Rolling window of the last 10 turns per (chat_id, sender).
history = defaultdict(lambda: deque(maxlen=10))


def record_user_message(chat_id, sender, message):
    history[(chat_id, sender)].append(f"User: {message}")


def record_bot_message(chat_id, sender, message):
    history[(chat_id, sender)].append(f"Assistant: {message}")


def get_history_text(chat_id, sender):
    return "\n".join(history[(chat_id, sender)])


def clear_history(chat_id, sender):
    history[(chat_id, sender)].clear()


# --- Bot Config & Client -----------------------------------------------------

class BotConfig:
    GREEN_API_URL = os.getenv("GREEN_API_URL")
    GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
    GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
    GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
    WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
    BOT_GROUP_CHAT = "120363312903494448@g.us"
    BOT_JID = os.getenv("BOT_JID")
    IMAGE_DIR = "/tmp/images"
    AUDIO_DIR = "/tmp/audio"
    DEFAULT_IMAGE_COUNT = 4

    @classmethod
    def validate(cls):
        missing = [n for n in (
            "GREEN_API_URL", "GREEN_API_TOKEN",
            "GREEN_API_ID_INSTANCE", "WEBHOOK_AUTH_TOKEN", "BOT_JID"
        ) if not getattr(cls, n)]
        if missing:
            raise ValueError(f"Missing env vars: {', '.join(missing)}")


class BotClient:
    def __init__(self, cfg: BotConfig):
        self.cfg = cfg
        self.session = requests.Session()

    def send(self, endpoint, payload, files=None, retries=3):
        # Green API convention: {base}/waInstance{idInstance}/{method}/{apiToken}
        url = (
            f"{self.cfg.GREEN_API_URL}/waInstance"
            f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
            f"{self.cfg.GREEN_API_TOKEN}"
        )
        for i in range(1, retries + 1):
            try:
                resp = self.session.post(
                    url,
                    json=payload if files is None else None,
                    data=None if files is None else payload,
                    files=files,
                )
                resp.raise_for_status()
                return resp.json()
            except requests.RequestException as e:
                logger.warning(f"{endpoint} attempt {i}/{retries} failed: {e}")
        return {"error": "failed"}

    def send_message(self, message_id, chat_id, text):
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text,
            "quotedMessageId": message_id
        })

    def send_message_to(self, chat_id, text):
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text
        })

    def send_media(self, message_id, chat_id, file_path, caption, media_type):
        endpoint = "sendFileByUpload"
        payload = {
            "chatId": chat_id,
            "caption": caption,
            "quotedMessageId": message_id
        }
        with open(file_path, "rb") as f:
            mime = "image/jpeg" if media_type == "image" else "audio/mpeg"
            files = [("file", (os.path.basename(file_path), f, mime))]
            return self.send(endpoint, payload, files=files)


BotConfig.validate()
client = BotClient(BotConfig)

# --- Threading & Queues ------------------------------------------------------

task_queue = queue.Queue()
polls = {}
executor = ThreadPoolExecutor(max_workers=4)


def worker():
    while True:
        task = task_queue.get()
        try:
            if task["type"] == "image":
                _fn_generate_images(**task)
            elif task["type"] == "audio":
                _fn_voice_reply(**task)
        except Exception as e:
            logger.error(f"Worker error {task}: {e}")
        finally:
            task_queue.task_done()


for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

# --- Basic Tool Functions ----------------------------------------------------

def _fn_send_text(mid, cid, message):
    client.send_message(mid, cid, message)
    chat_id, sender, _ = get_thread_context()
    if chat_id and sender:
        record_bot_message(chat_id, sender, message)
    # Also queue a spoken version of the same reply.
    task_queue.put({
        "type": "audio",
        "message_id": mid,
        "chat_id": cid,
        "prompt": message
    })


def _fn_send_accept(mid, cid, message):
    client.send_message(mid, cid, message)
    chat_id, sender, _ = get_thread_context()
    if chat_id and sender:
        record_bot_message(chat_id, sender, message)


def _fn_joke(mid, cid):
    try:
        j = requests.get(
            "https://official-joke-api.appspot.com/random_joke",
            timeout=5
        ).json()
        joke = f"{j['setup']}\n\n{j['punchline']}"
    except Exception:
        joke = generate_llm("Tell me a short joke.")
    _fn_send_text(mid, cid, joke)


def _fn_weather(mid, cid, loc):
    raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
    report = generate_llm(f"Give a weather report in °C:\n\n{raw}")
    _fn_send_text(mid, cid, report)


def _fn_inspire(mid, cid):
    quote = generate_llm("Give me a unique, random short inspirational quote.")
    _fn_send_text(mid, cid, f"✨ {quote}")


def _fn_generate_images(
    message_id: str,
    chat_id: str,
    prompt: str,
    count: int = 1,
    width: Optional[int] = None,
    height: Optional[int] = None,
    **_
):
    _fn_send_accept(message_id, chat_id, f"✨ Generating {count} image(s)…")
    for i in range(1, count + 1):
        try:
            img, path, ret_p, url = generate_image(
                prompt, message_id, message_id, BotConfig.IMAGE_DIR,
                width=width, height=height
            )
            formatted = "\n\n".join(
                f"_{p.strip()}_" for p in ret_p.split("\n\n") if p.strip()
            )
            cap = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
            client.send_media(message_id, chat_id, path, cap, media_type="image")
            os.remove(path)
        except Exception as e:
            logger.warning(f"Img {i}/{count} failed: {e}")
            _fn_send_text(message_id, chat_id, f"😢 Failed to generate image {i}/{count}.")


def _fn_voice_reply(
    message_id: str,
    chat_id: str,
    prompt: str,
    **_
):
    proc = (
        f"Just say this exactly as written in a friendly, playful, "
        f"happy and helpful but a little bit clumsy-cute way: {prompt}"
    )
    res = generate_voice_reply(
        proc, model="openai-audio", voice="coral", audio_dir=BotConfig.AUDIO_DIR
    )
    if res and res[0]:
        path, _ = res
        client.send_media(message_id, chat_id, path, "", media_type="audio")
        os.remove(path)
    else:
        _fn_send_text(message_id, chat_id, prompt)
# --- Pydantic Models for Function Calling ------------------------------------

class BaseIntent(BaseModel):
    action: str


class JokeIntent(BaseIntent):
    action: Literal["joke"]


class WeatherIntent(BaseIntent):
    action: Literal["weather"]
    location: str


class InspireIntent(BaseIntent):
    action: Literal["inspire"]


class GenerateImageIntent(BaseIntent):
    action: Literal["generate_image"]
    prompt: str
    count: int = Field(default=1, ge=1)
    width: Optional[int] = None
    height: Optional[int] = None


class SendTextIntent(BaseIntent):
    action: Literal["send_text"]
    message: str


INTENT_MODELS = [
    JokeIntent, WeatherIntent, InspireIntent,
    GenerateImageIntent, SendTextIntent
]

ACTION_HANDLERS = {
    "joke": lambda mid, cid, **i: _fn_joke(mid, cid),
    "weather": lambda mid, cid, **i: _fn_weather(mid, cid, i["location"]),
    "inspire": lambda mid, cid, **i: _fn_inspire(mid, cid),
    "generate_image": _fn_generate_images,
    "send_text": lambda mid, cid, **i: _fn_send_text(mid, cid, i["message"]),
}

# --- Intent Routing with 400-Retry & No Spam ----------------------------------

def route_intent(user_input: str, chat_id: str, sender: str):
    history_text = get_history_text(chat_id, sender)
    sys_prompt = (
        "You are Eve. You can either chat or call one of these functions:\n"
        "- joke()\n"
        "- weather(location)\n"
        "- inspire()\n"
        "- generate_image(prompt, count, width, height)\n"
        "- send_text(message)\n\n"
        "Return only raw JSON matching one of these shapes. For example:\n"
        "  {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":3,\"width\":512,\"height\":512}\n"
        "Otherwise, use send_text to reply with plain chat.\n"
    )
    prompt = f"{sys_prompt}\nConversation so far:\n{history_text}\n\nUser: {user_input}"

    # Try the primary LLM call.
    try:
        raw = generate_llm(prompt)
    except LLMBadRequestError:
        # On 400: clear history, retry once without context.
        clear_history(chat_id, sender)
        logger.warning("LLMBadRequestError: cleared history, retrying without context")
        prompt_retry = f"{sys_prompt}\nUser: {user_input}"
        try:
            raw = generate_llm(prompt_retry)
        except LLMBadRequestError:
            logger.error("LLMBadRequestError on retry, aborting intent routing")
            return SendTextIntent(
                action="send_text",
                message="Sorry, I'm having trouble. Please try again in a moment."
            )

    logger.debug(f"LLM raw response: {raw}")

    # 1) Strict: Pydantic validation
    try:
        parsed = json.loads(raw)
        logger.debug(f"Parsed JSON: {parsed}")
    except json.JSONDecodeError:
        return SendTextIntent(action="send_text", message=raw)

    if not isinstance(parsed, dict):
        return SendTextIntent(action="send_text", message=raw)

    for M in INTENT_MODELS:
        try:
            intent = M.model_validate(parsed)
            logger.debug(f"Matched intent model: {M.__name__} with data {parsed}")
            return intent
        except ValidationError:
            continue
    logger.warning("Strict parse failed for all models, falling back to lenient")

    # 2) Lenient JSON
    action = parsed.get("action")
    if action in ACTION_HANDLERS:
        data = parsed
        kwargs = {}
        if action == "generate_image":
            kwargs["prompt"] = data.get("prompt", "")
            kwargs["count"] = int(data.get("count", BotConfig.DEFAULT_IMAGE_COUNT))
            kwargs["width"] = data.get("width")
            kwargs["height"] = data.get("height")
        elif action == "send_text":
            kwargs["message"] = data.get("message", "")
        # ... add other parameter extractions if needed ...
        try:
            # Pick the model whose Literal "action" annotation matches this action.
            model = next(
                m for m in INTENT_MODELS
                if action in get_args(m.model_fields["action"].annotation)
            )
            intent = model.model_validate({"action": action, **kwargs})
            logger.debug(f"Leniently matched intent model: {model.__name__} with kwargs {kwargs}")
            return intent
        except Exception as e:
            logger.error(f"Lenient parsing into Pydantic failed: {e}")
            return SendTextIntent(action="send_text", message=raw)

    return SendTextIntent(action="send_text", message=raw)
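
# For orientation, a few illustrative route_intent outcomes (the raw LLM outputs
# shown here are hypothetical, not captured from a real run):
#   raw = '{"action": "weather", "location": "Berlin"}'
#       -> WeatherIntent(action="weather", location="Berlin")
#   raw = '{"action": "generate_image", "prompt": "a red fox", "count": 3}'
#       -> GenerateImageIntent(..., width=None, height=None)
#   raw = 'Sure! Here is a haiku...'  (not JSON)
#       -> SendTextIntent(action="send_text", message=raw)
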
# --- FastAPI & Webhook --------------------------------------------------------

app = FastAPI()

help_text = (
    "🤖 *Eve* commands:\n"
    "• /help\n"
    "• /joke\n"
    "• /weather <location>\n"
    "• /inspire\n"
    "• /gen <prompt>|<count>|<width>|<height>\n"
    "Otherwise chat or reply to my message to invoke tools."
)


@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
    data = await request.json()
    if request.headers.get("Authorization") != f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}":
        raise HTTPException(403, "Unauthorized")

    chat_id = data["senderData"]["chatId"]
    sender = data["senderData"]["sender"]
    mid = data["idMessage"]
    set_thread_context(chat_id, sender, mid)
    logger.debug(f"Received webhook for message {mid} from {sender}")

    if chat_id != BotConfig.BOT_GROUP_CHAT or data["typeWebhook"] != "incomingMessageReceived":
        return {"success": True}

    md = data["messageData"]
    tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
    if not tmd:
        return {"success": True}

    body = (tmd.get("textMessage") or tmd.get("text", "")).strip()
    record_user_message(chat_id, sender, body)
    logger.debug(f"User message: {body}")
    low = body.lower()

    # Slash commands...
    if low == "/help":
        _fn_send_text(mid, chat_id, help_text)
        return {"success": True}
    if low == "/joke":
        _fn_joke(mid, chat_id)
        return {"success": True}
    if low.startswith("/weather "):
        _fn_weather(mid, chat_id, body[9:].strip().replace(" ", "+"))
        return {"success": True}
    if low == "/inspire":
        _fn_inspire(mid, chat_id)
        return {"success": True}
    if low.startswith("/gen"):
        parts = body[4:].split("|")
        pr = parts[0].strip()
        ct = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else BotConfig.DEFAULT_IMAGE_COUNT
        width = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else None
        height = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else None
        _fn_send_accept(mid, chat_id, f"✨ Generating {ct} image(s)…")
        task_queue.put({
            "type": "image",
            "message_id": mid,
            "chat_id": chat_id,
            "prompt": pr,
            "count": ct,
            "width": width,
            "height": height
        })
        return {"success": True}

    # Skip mentions
    if tmd.get("contextInfo", {}).get("mentionedJidList"):
        return {"success": True}

    # Handle quoted replies to the bot
    if md.get("typeMessage") == "quotedMessage":
        ext = md["extendedTextMessageData"]
        quoted = md["quotedMessage"]
        if ext.get("participant") == BotConfig.BOT_JID:
            effective = (
                f"Quoted: {quoted.get('textMessage','')}\n"
                f"User: {ext.get('text','')}"
            )
        else:
            effective = body
    else:
        effective = body

    # Route intent & dispatch
    intent = route_intent(effective, chat_id, sender)
    logger.debug(f"Final intent: {intent}")
    handler = ACTION_HANDLERS.get(intent.action)
    if handler:
        kwargs = intent.model_dump(exclude={"action"})
        logger.debug(f"Dispatching action '{intent.action}' with args {kwargs}")
        handler(mid, chat_id, **kwargs)
    else:
        logger.warning(f"No handler for action '{intent.action}'")
        _fn_send_text(mid, chat_id, "Sorry, I didn't understand that.")

    return {"success": True}


@app.get("/", response_class=PlainTextResponse)
def index():
    return "Server is running!"
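
# Local smoke test for the webhook (illustrative only: the bearer token must match
# WEBHOOK_AUTH_TOKEN, the chatId must equal BOT_GROUP_CHAT, and the other ids are
# made up):
#   curl -X POST http://localhost:7860/whatsapp \
#     -H "Authorization: Bearer <WEBHOOK_AUTH_TOKEN>" \
#     -H "Content-Type: application/json" \
#     -d '{"typeWebhook": "incomingMessageReceived",
#          "idMessage": "ABC123",
#          "senderData": {"chatId": "120363312903494448@g.us",
#                         "sender": "491700000000@c.us"},
#          "messageData": {"typeMessage": "textMessage",
#                          "textMessageData": {"textMessage": "/help"}}}'
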
if __name__ == "__main__":
    client.send_message_to(
        BotConfig.BOT_GROUP_CHAT,
        "🌟 Eve is online! Type /help to see commands."
    )
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
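
# Environment expected at startup; BotConfig.validate() raises if a required
# variable is missing. Values shown are placeholders, not real credentials:
#   GREEN_API_URL          e.g. https://api.green-api.com
#   GREEN_API_ID_INSTANCE  e.g. 1101000001
#   GREEN_API_TOKEN        Green API instance token
#   WEBHOOK_AUTH_TOKEN     shared secret checked on POST /whatsapp
#   BOT_JID                the bot's own WhatsApp JID (used to detect quoted replies)
#   GREEN_API_MEDIA_URL    optional, defaults to https://api.green-api.com
#   LOG_LEVEL              optional, defaults to INFO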