"""Eve: a Telegram chat bot with LLM intent routing, polls, images and voice.

Slash commands are parsed directly in ``message_router``; any other text is
sent to the LLM, which answers with a JSON "intent" that is validated with
pydantic and dispatched through ``ACTION_HANDLERS``.
"""

import asyncio
import functools
import json
import logging
import os
import queue
import threading
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from typing import List, Literal, Optional

import requests
from pydantic import BaseModel, Field, ValidationError
from telegram import Bot, Message, Update
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

from FLUX import generate_image
from polLLM import LLMBadRequestError, generate_llm
from VoiceReply import generate_voice_reply

# --- Thread context --------------------------------------------------------
# Thread-local to carry context through helpers
_thread_ctx = threading.local()


def set_thread_context(chat_id, user_id, message_id):
    """Remember the chat/user/message currently being handled on this thread."""
    _thread_ctx.chat_id = chat_id
    _thread_ctx.user_id = user_id
    _thread_ctx.message_id = message_id


def get_thread_context():
    """Return ``(chat_id, user_id, message_id)``; each is None when unset."""
    return (
        getattr(_thread_ctx, "chat_id", None),
        getattr(_thread_ctx, "user_id", None),
        getattr(_thread_ctx, "message_id", None),
    )


# --- Logging Setup ---------------------------------------------------------
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logger = logging.getLogger("eve_bot")
logger.setLevel(LOG_LEVEL)

handler = logging.StreamHandler()
formatter = logging.Formatter(
    "%(asctime)s [%(levelname)s] [%(chat_id)s/%(user_id)s] %(message)s"
)
handler.setFormatter(formatter)


class ContextFilter(logging.Filter):
    """Guarantee every record has the chat_id/user_id the formatter expects.

    Values passed through ``extra=`` win; otherwise the current thread
    context is used; otherwise "-" so formatting never fails.
    """

    def filter(self, record):
        ctx_chat, ctx_user, _ = get_thread_context()
        for attr, fallback in (("chat_id", ctx_chat), ("user_id", ctx_user)):
            value = getattr(record, attr, None)
            if value is None:
                value = fallback if fallback is not None else "-"
            setattr(record, attr, value)
        return True


handler.addFilter(ContextFilter())
logger.handlers = [handler]

# --- Conversation History -------------------------------------------------
history = defaultdict(lambda: deque(maxlen=20))


def _history_key(chat_id, user_id):
    """Build the history key.

    Telegram handlers hold integer ids while ``route_intent`` receives them
    as strings; normalising to str makes both address the same deque.
    """
    return (str(chat_id), str(user_id))


def record_user_message(chat_id, user_id, message):
    """Append a user utterance to the (chat, user) rolling history."""
    history[_history_key(chat_id, user_id)].append(f"User: {message}")


def record_bot_message(chat_id, user_id, message):
    """Append a bot reply to the (chat, user) rolling history."""
    history[_history_key(chat_id, user_id)].append(f"Assistant: {message}")


def get_history_text(chat_id, user_id):
    """Return the stored history as newline-joined text (empty when none)."""
    return "\n".join(history[_history_key(chat_id, user_id)])


def clear_history(chat_id, user_id):
    """Forget everything recorded for the (chat, user) pair."""
    history[_history_key(chat_id, user_id)].clear()


# --- Config ---------------------------------------------------------------
class BotConfig:
    """Static bot configuration; the token is read from the environment."""

    TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
    IMAGE_DIR = "/tmp/images"
    AUDIO_DIR = "/tmp/audio"
    DEFAULT_IMAGE_COUNT = 4

    @classmethod
    def validate(cls):
        """Raise ValueError when the Telegram token is not configured."""
        if not cls.TELEGRAM_TOKEN:
            raise ValueError("Missing TELEGRAM_TOKEN")


BotConfig.validate()

# Telegram rejects media captions longer than this many characters.
MAX_CAPTION_LENGTH = 1024

# --- Threading & Queues ---------------------------------------------------
task_queue = queue.Queue()
polls = {}


def worker():
    """Consume ``task_queue`` forever, running image-generation tasks.

    Failures are logged and never kill the worker thread.
    """
    while True:
        task = task_queue.get()
        try:
            if task["type"] == "image":
                _fn_generate_images(**task)
        except Exception:
            # Top-level boundary of a daemon thread: log with traceback, go on.
            logger.exception("Worker error %s", task)
        finally:
            task_queue.task_done()


for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor and await its result.

    Keeps the bot's event loop responsive during HTTP/LLM/TTS calls.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, functools.partial(func, *args, **kwargs)
    )


# --- Core Handlers --------------------------------------------------------
async def _fn_send_text(mid: int, cid: int, text: str,
                        context: ContextTypes.DEFAULT_TYPE):
    """Reply with ``text``, record it in history and schedule a voice reply.

    Args:
        mid: id of the message being replied to.
        cid: id of the chat to send into.
        text: message body.
        context: python-telegram-bot callback context.
    """
    # Read the context before awaiting so a later update cannot replace it.
    _, user_id, _ = get_thread_context()
    msg: Message = await context.bot.send_message(
        chat_id=cid, text=text, reply_to_message_id=mid
    )
    record_bot_message(cid, user_id, text)
    # enqueue audio reply in async loop
    context.application.create_task(
        _fn_voice_reply_async(msg.message_id, cid, text, context)
    )


async def _fn_send_text_wrapper(mid, cid, message, context):
    """Intent adapter: the ``send_text`` intent names its field ``message``."""
    await _fn_send_text(mid, cid, message, context)


async def _fn_voice_reply_async(mid: int, cid: int, prompt: str,
                                context: ContextTypes.DEFAULT_TYPE):
    """Generate a spoken version of ``prompt`` and send it as audio.

    ``generate_voice_reply`` is assumed to return a 2-item sequence whose
    first element is the audio file path (falsy on failure), as implied by
    how its result is unpacked here.
    """
    proc = (
        f"Just say this exactly as written in a friendly, playful, "
        f"happy and helpful but a little bit clumsy-cute way: {prompt}"
    )
    res = await _run_blocking(
        generate_voice_reply,
        proc,
        model="openai-audio",
        voice="coral",
        audio_dir=BotConfig.AUDIO_DIR,
    )
    if not (res and res[0]):
        # The text was already delivered by _fn_send_text. Sending it again
        # here would schedule another voice reply and loop for as long as
        # voice generation keeps failing.
        logger.warning("Voice reply generation failed for message %s", mid)
        return
    path, _ = res
    try:
        with open(path, "rb") as f:
            await context.bot.send_audio(
                chat_id=cid, audio=f, reply_to_message_id=mid
            )
    finally:
        # Remove the temp file even when the upload fails.
        with suppress(OSError):
            os.remove(path)


async def _fn_summarize(mid, cid, text, context):
    """Ask the LLM to summarize ``text`` and reply with the result."""
    summary = await _run_blocking(generate_llm, f"Summarize:\n\n{text}")
    await _fn_send_text(mid, cid, summary, context)


async def _fn_translate(mid, cid, lang, text, context):
    """Ask the LLM to translate ``text`` into ``lang`` and reply."""
    resp = await _run_blocking(
        generate_llm, f"Translate to {lang}:\n\n{text}"
    )
    await _fn_send_text(mid, cid, resp, context)


async def _fn_joke(mid, cid, context):
    """Reply with a joke from the public joke API, or the LLM as fallback."""
    try:
        response = await _run_blocking(
            requests.get,
            "https://official-joke-api.appspot.com/random_joke",
            timeout=5,
        )
        j = response.json()
        joke = f"{j['setup']}\n\n{j['punchline']}"
    except (requests.RequestException, ValueError, KeyError, TypeError) as exc:
        # Network error, invalid JSON or unexpected payload shape.
        logger.warning("Joke API failed, falling back to LLM: %s", exc)
        joke = await _run_blocking(generate_llm, "Tell me a short joke.")
    await _fn_send_text(mid, cid, joke, context)


async def _fn_weather(mid, cid, loc, context):
    """Fetch a one-line wttr.in report for ``loc`` and have the LLM phrase it.

    ``loc`` is interpolated into the URL as-is; callers replace spaces
    with ``+`` first.
    """
    try:
        response = await _run_blocking(
            requests.get, f"http://sl.wttr.in/{loc}?format=4", timeout=5
        )
    except requests.RequestException as exc:
        logger.warning("Weather lookup failed for %r: %s", loc, exc)
        await _fn_send_text(
            mid, cid, "Sorry, I couldn't fetch the weather right now.", context
        )
        return
    report = await _run_blocking(
        generate_llm, f"Give a weather report in °C:\n\n{response.text}"
    )
    await _fn_send_text(mid, cid, report, context)


async def _fn_weather_wrapper(mid, cid, location, context):
    """Intent adapter: the ``weather`` intent names its field ``location``."""
    await _fn_weather(mid, cid, location.strip().replace(" ", "+"), context)


async def _fn_inspire(mid, cid, context):
    """Reply with an LLM-generated inspirational quote."""
    quote = await _run_blocking(
        generate_llm, "Give me a unique, random short inspirational quote."
    )
    await _fn_send_text(mid, cid, f"✨ {quote}", context)


async def _fn_meme(mid, cid, txt, context):
    """Acknowledge the request and queue a single meme image for ``txt``."""
    await context.bot.send_message(
        chat_id=cid, text="🎨 Generating meme…", reply_to_message_id=mid
    )
    task_queue.put({
        "type": "image",
        "message_id": mid,
        "chat_id": cid,
        "prompt": f"meme: {txt}",
    })


async def _fn_meme_wrapper(mid, cid, text, context):
    """Intent adapter: the ``meme`` intent names its field ``text``."""
    await _fn_meme(mid, cid, text, context)


async def _fn_poll_create(mid, cid, question, options, context):
    """Start a poll in chat ``cid``, replacing any poll already running there.

    Options are numbered from 1; votes are tallied per option number.
    """
    polls[cid] = {
        "question": question,
        "options": options,
        "votes": dict.fromkeys(range(1, len(options) + 1), 0),
        "voters": {},
    }
    text = f"📊 *Poll:* {question}\n" + "\n".join(
        f"{i}. {o}" for i, o in enumerate(options, 1)
    )
    await context.bot.send_message(
        chat_id=cid, text=text, reply_to_message_id=mid, parse_mode="Markdown"
    )
    record_bot_message(cid, get_thread_context()[1], text)


async def _fn_poll_vote(mid, cid, voter, choice, context):
    """Record ``voter``'s vote for option number ``choice`` (1-based).

    Silently ignored when the chat has no poll or the choice is out of
    range. A repeat vote moves the voter's earlier vote.
    """
    poll = polls.get(cid)
    if not poll or not 1 <= choice <= len(poll["options"]):
        return
    previous = poll["voters"].get(voter)
    if previous is not None:
        poll["votes"][previous] -= 1
    poll["votes"][choice] += 1
    poll["voters"][voter] = choice
    await _fn_send_text(
        mid, cid, f"✅ Voted for {poll['options'][choice - 1]}", context
    )


def _format_poll_tally(title, poll):
    """Render the poll question followed by one "n. option: votes" line each."""
    rows = "\n".join(
        f"{i}. {o}: {poll['votes'][i]}"
        for i, o in enumerate(poll["options"], 1)
    )
    return f"📊 *{title}:* {poll['question']}\n" + rows


async def _fn_poll_results(mid, cid, context):
    """Reply with the running tally of the chat's poll, if there is one."""
    poll = polls.get(cid)
    if not poll:
        await _fn_send_text(mid, cid, "No active poll.", context)
        return
    await _fn_send_text(mid, cid, _format_poll_tally("Results", poll), context)


async def _fn_poll_end(mid, cid, context):
    """Close the chat's poll and reply with the final tally."""
    poll = polls.pop(cid, None)
    if not poll:
        await _fn_send_text(mid, cid, "No active poll.", context)
        return
    await _fn_send_text(
        mid, cid, _format_poll_tally("Final Results", poll), context
    )


async def _generate_images_async(message_id, chat_id, prompt, count,
                                 width, height):
    """Generate ``count`` images and send each one as a reply.

    A failing image is reported to the chat and does not stop the rest.
    """
    # The context manager initialises the bot and shuts it down afterwards,
    # so each task releases its HTTP resources.
    async with Bot(BotConfig.TELEGRAM_TOKEN) as bot:
        await bot.send_message(
            chat_id=chat_id,
            text=f"✨ Generating {count} image(s)…",
            reply_to_message_id=message_id,
        )
        for i in range(1, count + 1):
            path = None
            try:
                # generate_image blocks, which is fine here: this event loop
                # belongs to the worker thread alone.
                _img, path, ret_p, url = generate_image(
                    prompt,
                    str(message_id),
                    str(message_id),
                    BotConfig.IMAGE_DIR,
                    width=width,
                    height=height,
                )
                caption = f"✨ Image {i}/{count}: {url}\n\n{ret_p}"
                with open(path, "rb") as f:
                    await bot.send_photo(
                        chat_id=chat_id,
                        photo=f,
                        # Over-long captions make Telegram reject the photo.
                        caption=caption[:MAX_CAPTION_LENGTH],
                        reply_to_message_id=message_id,
                    )
            except Exception as e:
                # Best effort per image: report and continue with the next.
                logger.warning(f"Img {i}/{count} failed: {e}")
                await bot.send_message(
                    chat_id=chat_id,
                    text=f"😢 Failed to generate image {i}/{count}.",
                    reply_to_message_id=message_id,
                )
            finally:
                if path:
                    with suppress(OSError):
                        os.remove(path)


def _fn_generate_images(message_id, chat_id, prompt, count=1,
                        width=None, height=None, **_):
    """
    Runs in a background thread. Spins up its own asyncio loop so we can
    `await` the bot’s send_message / send_photo coroutines.

    Extra keyword arguments (such as the queue task's ``type``) are ignored.
    """
    asyncio.run(
        _generate_images_async(message_id, chat_id, prompt, count, width, height)
    )


# --- Pydantic Models & Intent Routing ------------------------------------
class BaseIntent(BaseModel):
    """Common base: every intent carries the name of the action to run."""

    action: str


class SummarizeIntent(BaseIntent):
    action: Literal["summarize"]
    text: str


class TranslateIntent(BaseIntent):
    action: Literal["translate"]
    lang: str
    text: str


class JokeIntent(BaseIntent):
    action: Literal["joke"]


class WeatherIntent(BaseIntent):
    action: Literal["weather"]
    location: str


class InspireIntent(BaseIntent):
    action: Literal["inspire"]


class MemeIntent(BaseIntent):
    action: Literal["meme"]
    text: str


class PollCreateIntent(BaseIntent):
    action: Literal["poll_create"]
    question: str
    options: List[str]


class PollVoteIntent(BaseIntent):
    action: Literal["poll_vote"]
    voter: str
    choice: int


class PollResultsIntent(BaseIntent):
    action: Literal["poll_results"]


class PollEndIntent(BaseIntent):
    action: Literal["poll_end"]


class GenerateImageIntent(BaseIntent):
    action: Literal["generate_image"]
    prompt: str
    count: int = Field(default=1, ge=1)
    # Explicit None defaults: in pydantic v2 a bare Optional[...] field is
    # still required, which rejected image intents that omit the size.
    width: Optional[int] = None
    height: Optional[int] = None


class SendTextIntent(BaseIntent):
    action: Literal["send_text"]
    message: str


INTENT_MODELS = [
    SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent,
    InspireIntent, MemeIntent, PollCreateIntent, PollVoteIntent,
    PollResultsIntent, PollEndIntent, GenerateImageIntent, SendTextIntent
]

# Action name -> model, so route_intent picks the model directly.
INTENT_MODEL_BY_ACTION = {
    "summarize": SummarizeIntent,
    "translate": TranslateIntent,
    "joke": JokeIntent,
    "weather": WeatherIntent,
    "inspire": InspireIntent,
    "meme": MemeIntent,
    "poll_create": PollCreateIntent,
    "poll_vote": PollVoteIntent,
    "poll_results": PollResultsIntent,
    "poll_end": PollEndIntent,
    "generate_image": GenerateImageIntent,
    "send_text": SendTextIntent,
}


async def _fn_enqueue_image(mid, cid, prompt, count, width, height, context):
    """Queue an image-generation task for the background workers."""
    task_queue.put({
        "type": "image",
        "message_id": mid,
        "chat_id": cid,
        "prompt": prompt,
        "count": count,
        "width": width,
        "height": height,
    })


# Every handler is called as handler(mid, cid, **intent_fields, context=...),
# so its parameter names must match the intent model's field names.
ACTION_HANDLERS = {
    "summarize": _fn_summarize,
    "translate": _fn_translate,
    "joke": _fn_joke,
    "weather": _fn_weather_wrapper,
    "inspire": _fn_inspire,
    "meme": _fn_meme_wrapper,
    "poll_create": _fn_poll_create,
    "poll_vote": _fn_poll_vote,
    "poll_results": _fn_poll_results,
    "poll_end": _fn_poll_end,
    "generate_image": _fn_enqueue_image,
    "send_text": _fn_send_text_wrapper,
}


def _coerce_intent_kwargs(action, data, sender):
    """Build lenient field values for ``action`` from loosely-shaped ``data``.

    Fills defaults for missing keys and coerces numeric fields. May raise
    TypeError/ValueError when a numeric field cannot be converted.
    """
    if action == "generate_image":
        return {
            "prompt": data.get("prompt", ""),
            "count": int(data.get("count", BotConfig.DEFAULT_IMAGE_COUNT)),
            "width": data.get("width"),
            "height": data.get("height"),
        }
    if action == "send_text":
        return {"message": data.get("message", "")}
    if action == "translate":
        return {"lang": data.get("lang", ""), "text": data.get("text", "")}
    if action in ("summarize", "meme"):
        return {"text": data.get("text", "")}
    if action == "weather":
        return {"location": data.get("location", "")}
    if action == "poll_create":
        return {
            "question": data.get("question", ""),
            "options": data.get("options", []),
        }
    if action == "poll_vote":
        return {"voter": sender, "choice": int(data.get("choice", 0))}
    return {}


def route_intent(user_input: str, chat_id: str, sender: str):
    """Ask the LLM which action to run for ``user_input``.

    Returns a validated intent model. Anything that cannot be interpreted
    as a known intent comes back as a ``SendTextIntent`` carrying the raw
    LLM output. This function blocks on the LLM call.
    """
    history_text = get_history_text(chat_id, sender)
    sys_prompt = (
        "You never perform work yourself—you only invoke one of the available functions. "
        "When the user asks for something that matches a function signature, you must return exactly one JSON object matching that function’s parameters—and nothing else. "
        "Do not wrap it in markdown, do not add extra text, and do not show the JSON to the user. "
        "If the user’s request does not match any function, reply in plain text, and never mention JSON or internal logic.\n\n"
        "- summarize(text)\n"
        "- translate(lang, text)\n"
        "- joke()\n"
        "- weather(location)\n"
        "- inspire()\n"
        "- meme(text)\n"
        "- poll_create(question, options)\n"
        "- poll_vote(voter, choice)\n"
        "- poll_results()\n"
        "- poll_end()\n"
        "- generate_image(prompt, count, width, height)\n"
        "- send_text(message)\n\n"
        "Return only raw JSON matching one of these shapes. For example:\n"
        " {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":3,\"width\":512,\"height\":512}\n"
        "Otherwise, use send_text to reply with plain chat and you should only return one json for the current message not for previous conversations.\n"
        f"Conversation so far:\n{history_text}\n\n current message: User: {user_input}"
    )

    try:
        raw = generate_llm(sys_prompt)
    except LLMBadRequestError:
        clear_history(chat_id, sender)
        return SendTextIntent(action="send_text", message="Oops, let’s start fresh!")

    plain_reply = SendTextIntent(action="send_text", message=str(raw))

    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        # Not JSON (JSONDecodeError is a ValueError): treat as plain chat.
        return plain_reply

    # Valid JSON that is not an object (a number, a list, ...) has no action.
    if not isinstance(parsed, dict):
        return plain_reply
    action = parsed.get("action")
    model = INTENT_MODEL_BY_ACTION.get(action) if isinstance(action, str) else None
    if model is None:
        return plain_reply

    if action == "poll_vote":
        # Attribute the vote to the real sender, never to an LLM-supplied name.
        parsed = {**parsed, "voter": sender}

    try:
        return model.model_validate(parsed)
    except ValidationError:
        pass

    # Lenient second pass: fill defaults and coerce numeric fields.
    try:
        kwargs = _coerce_intent_kwargs(action, parsed, sender)
        return model.model_validate({"action": action, **kwargs})
    except (ValidationError, TypeError, ValueError):
        return plain_reply


# --- Telegram Handlers ----------------------------------------------------
def _int_or_none(value: str) -> Optional[int]:
    """Parse a decimal string (surrounding whitespace allowed), else None."""
    value = value.strip()
    return int(value) if value.isdecimal() else None


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start with a short greeting."""
    await update.effective_message.reply_text(
        "🌟 Eve is online! Type /help to see commands."
    )


async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help by listing the supported commands."""
    await update.effective_message.reply_markdown(
        "🤖 *Eve* commands:\n"
        "• /help\n"
        "• /summarize <text>\n"
        "• /translate <lang>|<text>\n"
        "• /joke\n"
        "• /weather <location>\n"
        "• /inspire\n"
        "• /meme <text>\n"
        "• /poll <question>|opt1|opt2 …\n"
        "• /results\n"
        "• /endpoll\n"
        "• /gen <prompt>|<count>|<width>|<height>\n"
        "Otherwise just chat with me."
    )


async def message_router(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Route an incoming text message.

    Known slash commands are handled directly, a bare number is a vote
    when the chat has a poll, and everything else goes through the LLM
    intent router.
    """
    msg: Optional[Message] = update.effective_message
    if msg is None:
        return
    chat_id = msg.chat.id
    # NOTE(review): messages without a sender (e.g. channel posts) are keyed
    # by the chat id here; confirm that is the desired behaviour.
    user_id = msg.from_user.id if msg.from_user else chat_id
    mid = msg.message_id
    raw_text = msg.text or ""

    set_thread_context(chat_id, user_id, mid)
    record_user_message(chat_id, user_id, raw_text)

    # Parse the stripped text so the fixed prefix offsets below line up.
    text = raw_text.strip()
    low = text.lower()

    if low == "/help":
        return await help_cmd(update, context)

    if low.startswith("/summarize "):
        return await _fn_summarize(mid, chat_id, text[11:].strip(), context)

    if low.startswith("/translate "):
        lang, sep, body = text[11:].partition("|")
        if not sep or not lang.strip() or not body.strip():
            return await msg.reply_text("Usage: /translate <lang>|<text>")
        return await _fn_translate(mid, chat_id, lang.strip(), body.strip(), context)

    if low == "/joke":
        return await _fn_joke(mid, chat_id, context)

    if low.startswith("/weather "):
        return await _fn_weather(
            mid, chat_id, text[9:].strip().replace(" ", "+"), context
        )

    if low == "/inspire":
        return await _fn_inspire(mid, chat_id, context)

    if low.startswith("/meme "):
        return await _fn_meme(mid, chat_id, text[6:].strip(), context)

    if low.startswith("/poll "):
        parts = [p.strip() for p in text[6:].split("|")]
        question, options = parts[0], [p for p in parts[1:] if p]
        if not question or len(options) < 2:
            return await msg.reply_text("Usage: /poll <question>|opt1|opt2 …")
        return await _fn_poll_create(mid, chat_id, question, options, context)

    # isdecimal() only accepts characters that int() can parse.
    if chat_id in polls and low.isdecimal():
        return await _fn_poll_vote(mid, chat_id, str(user_id), int(low), context)

    if low == "/results":
        return await _fn_poll_results(mid, chat_id, context)

    if low == "/endpoll":
        return await _fn_poll_end(mid, chat_id, context)

    if low == "/gen" or low.startswith("/gen "):
        parts = [p.strip() for p in text[4:].split("|")]
        prompt = parts[0]
        if not prompt:
            return await msg.reply_text(
                "Usage: /gen <prompt>|<count>|<width>|<height>"
            )
        count = _int_or_none(parts[1]) if len(parts) > 1 else None
        if not count or count < 1:
            count = BotConfig.DEFAULT_IMAGE_COUNT
        width = _int_or_none(parts[2]) if len(parts) > 2 else None
        height = _int_or_none(parts[3]) if len(parts) > 3 else None
        return await _fn_enqueue_image(
            mid, chat_id, prompt, count, width, height, context
        )

    # No command matched: let the LLM choose the action (blocking call).
    intent = await _run_blocking(
        route_intent, raw_text, str(chat_id), str(user_id)
    )
    intent_handler = ACTION_HANDLERS.get(intent.action)
    if intent_handler is None:
        logger.warning("No handler registered for action %r", intent.action)
        return
    kwargs = intent.model_dump(exclude={"action"})
    await intent_handler(mid, chat_id, **kwargs, context=context)


def main():
    """Build the Telegram application, register handlers and start polling."""
    app = ApplicationBuilder().token(BotConfig.TELEGRAM_TOKEN).build()
    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("help", help_cmd))
    # Do not exclude filters.COMMAND: message_router implements the other
    # slash commands itself. /start and /help are registered first, so
    # their CommandHandlers still take precedence.
    app.add_handler(MessageHandler(filters.TEXT, message_router))
    logger.info("Starting Telegram bot…")
    app.run_polling()


if __name__ == "__main__":
    main()