eve / telebot.py
Chandima Prabhath
Add health check endpoint and improve logging context; update requirements for telegram bot support
b9cdf7a
import os
import threading
import requests
import logging
import queue
import json
import asyncio
from typing import List, Optional, Literal
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from telegram import Update, Message, Bot
from telegram.ext import (
ApplicationBuilder,
ContextTypes,
CommandHandler,
MessageHandler,
filters,
)
from pydantic import BaseModel, Field, ValidationError
from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm, LLMBadRequestError
# --- Logging Setup ---------------------------------------------------------
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logger = logging.getLogger("eve_bot")
logger.setLevel(LOG_LEVEL)
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] [%(chat_id)s/%(user_id)s] %(message)s"
)
handler.setFormatter(formatter)
class ContextFilter(logging.Filter):
def filter(self, record):
record.chat_id = getattr(record, "chat_id", "-")
record.user_id = getattr(record, "user_id", "-")
return True
handler.addFilter(ContextFilter())
logger.handlers = [handler]
# Thread‐local to carry context through helpers
_thread_ctx = threading.local()
def set_thread_context(chat_id, user_id, message_id):
_thread_ctx.chat_id = chat_id
_thread_ctx.user_id = user_id
_thread_ctx.message_id = message_id
def get_thread_context():
return (
getattr(_thread_ctx, "chat_id", None),
getattr(_thread_ctx, "user_id", None),
getattr(_thread_ctx, "message_id", None),
)
# --- Conversation History -------------------------------------------------
history = defaultdict(lambda: deque(maxlen=20))
def record_user_message(chat_id, user_id, message):
history[(chat_id, user_id)].append(f"User: {message}")
def record_bot_message(chat_id, user_id, message):
history[(chat_id, user_id)].append(f"Assistant: {message}")
def get_history_text(chat_id, user_id):
return "\n".join(history[(chat_id, user_id)])
def clear_history(chat_id, user_id):
history[(chat_id, user_id)].clear()
# --- Config ---------------------------------------------------------------
class BotConfig:
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
IMAGE_DIR = "/tmp/images"
AUDIO_DIR = "/tmp/audio"
DEFAULT_IMAGE_COUNT = 4
@classmethod
def validate(cls):
if not cls.TELEGRAM_TOKEN:
raise ValueError("Missing TELEGRAM_TOKEN")
BotConfig.validate()
# --- Threading & Queues ---------------------------------------------------
task_queue = queue.Queue()
polls = {}
def worker():
while True:
task = task_queue.get()
try:
if task["type"] == "image":
_fn_generate_images(**task)
except Exception as e:
logger.error(f"Worker error {task}: {e}")
finally:
task_queue.task_done()
for _ in range(4):
threading.Thread(target=worker, daemon=True).start()
# --- Core Handlers --------------------------------------------------------
async def _fn_send_text(mid: int, cid: int, text: str, context: ContextTypes.DEFAULT_TYPE):
chat_id, user_id, _ = get_thread_context()
msg: Message = await context.bot.send_message(
chat_id=cid,
text=text,
reply_to_message_id=mid
)
record_bot_message(chat_id, user_id, text)
# enqueue audio reply in async loop
context.application.create_task(_fn_voice_reply_async(
msg.message_id, cid, text, context
))
async def _fn_send_text_wrapper(mid, cid, message, context):
await _fn_send_text(mid, cid, message, context)
async def _fn_voice_reply_async(mid: int, cid: int, prompt: str, context: ContextTypes.DEFAULT_TYPE):
proc = (
f"Just say this exactly as written in a friendly, playful, "
f"happy and helpful but a little bit clumsy-cute way: {prompt}"
)
res = generate_voice_reply(proc, model="openai-audio", voice="coral", audio_dir=BotConfig.AUDIO_DIR)
if res and res[0]:
path, _ = res
with open(path, "rb") as f:
await context.bot.send_audio(chat_id=cid, audio=f, reply_to_message_id=mid)
os.remove(path)
else:
await _fn_send_text(mid, cid, prompt, context)
async def _fn_summarize(mid, cid, text, context):
summary = generate_llm(f"Summarize:\n\n{text}")
await _fn_send_text(mid, cid, summary, context)
async def _fn_translate(mid, cid, lang, text, context):
resp = generate_llm(f"Translate to {lang}:\n\n{text}")
await _fn_send_text(mid, cid, resp, context)
async def _fn_joke(mid, cid, context):
try:
j = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json()
joke = f"{j['setup']}\n\n{j['punchline']}"
except:
joke = generate_llm("Tell me a short joke.")
await _fn_send_text(mid, cid, joke, context)
async def _fn_weather(mid, cid, loc, context):
raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
report = generate_llm(f"Give a weather report in °C:\n\n{raw}")
await _fn_send_text(mid, cid, report, context)
async def _fn_inspire(mid, cid, context):
quote = generate_llm("Give me a unique, random short inspirational quote.")
await _fn_send_text(mid, cid, f"✨ {quote}", context)
async def _fn_meme(mid, cid, txt, context):
await context.bot.send_message(chat_id=cid, text="🎨 Generating meme…", reply_to_message_id=mid)
task_queue.put({"type":"image","message_id":mid,"chat_id":cid,"prompt":f"meme: {txt}"})
async def _fn_poll_create(mid, cid, question, options, context):
votes = {i+1:0 for i in range(len(options))}
polls[cid] = {"question": question, "options": options, "votes": votes, "voters": {}}
text = f"📊 *Poll:* {question}\n" + "\n".join(f"{i+1}. {o}" for i,o in enumerate(options))
await context.bot.send_message(chat_id=cid, text=text, reply_to_message_id=mid, parse_mode="Markdown")
record_bot_message(cid, get_thread_context()[1], text)
async def _fn_poll_vote(mid, cid, voter, choice, context):
poll = polls.get(cid)
if not poll or choice<1 or choice>len(poll["options"]): return
prev = poll["voters"].get(voter)
if prev: poll["votes"][prev] -= 1
poll["votes"][choice] += 1
poll["voters"][voter] = choice
await _fn_send_text(mid, cid, f"✅ Voted for {poll['options'][choice-1]}", context)
async def _fn_poll_results(mid, cid, context):
poll = polls.get(cid)
if not poll:
await _fn_send_text(mid, cid, "No active poll.", context)
return
txt = f"📊 *Results:* {poll['question']}\n" + "\n".join(
f"{i}. {o}: {poll['votes'][i]}" for i,o in enumerate(poll["options"],1)
)
await _fn_send_text(mid, cid, txt, context)
async def _fn_poll_end(mid, cid, context):
poll = polls.pop(cid, None)
if not poll:
await _fn_send_text(mid, cid, "No active poll.", context)
return
txt = f"📊 *Final Results:* {poll['question']}\n" + "\n".join(
f"{i}. {o}: {poll['votes'][i]}" for i,o in enumerate(poll["options"],1)
)
await _fn_send_text(mid, cid, txt, context)
def _fn_generate_images(message_id, chat_id, prompt, count=1, width=None, height=None, **_):
"""
Runs in a background thread. Spins up its own asyncio loop
so we can `await` the bot’s send_message / send_photo coroutines.
"""
b = Bot(BotConfig.TELEGRAM_TOKEN)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
b.send_message(
chat_id=chat_id,
text=f"✨ Generating {count} image(s)…",
reply_to_message_id=message_id
)
)
for i in range(1, count+1):
try:
img, path, ret_p, url = generate_image(
prompt, str(message_id), str(message_id),
BotConfig.IMAGE_DIR, width=width, height=height
)
caption = f"✨ Image {i}/{count}: {url}\n\n{ret_p}"
with open(path, "rb") as f:
loop.run_until_complete(
b.send_photo(
chat_id=chat_id,
photo=f,
caption=caption,
reply_to_message_id=message_id
)
)
os.remove(path)
except Exception as e:
logger.warning(f"Img {i}/{count} failed: {e}")
loop.run_until_complete(
b.send_message(
chat_id=chat_id,
text=f"😢 Failed to generate image {i}/{count}.",
reply_to_message_id=message_id
)
)
finally:
loop.close()
# --- Pydantic Models & Intent Routing ------------------------------------
class BaseIntent(BaseModel):
action: str
class SummarizeIntent(BaseIntent):
action: Literal["summarize"]
text: str
class TranslateIntent(BaseIntent):
action: Literal["translate"]
lang: str
text: str
class JokeIntent(BaseIntent):
action: Literal["joke"]
class WeatherIntent(BaseIntent):
action: Literal["weather"]
location: str
class InspireIntent(BaseIntent):
action: Literal["inspire"]
class MemeIntent(BaseIntent):
action: Literal["meme"]
text: str
class PollCreateIntent(BaseIntent):
action: Literal["poll_create"]
question: str
options: List[str]
class PollVoteIntent(BaseIntent):
action: Literal["poll_vote"]
voter: str
choice: int
class PollResultsIntent(BaseIntent):
action: Literal["poll_results"]
class PollEndIntent(BaseIntent):
action: Literal["poll_end"]
class GenerateImageIntent(BaseModel):
action: Literal["generate_image"]
prompt: str
count: int = Field(default=1, ge=1)
width: Optional[int]
height: Optional[int]
class SendTextIntent(BaseIntent):
action: Literal["send_text"]
message: str
INTENT_MODELS = [
SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent,
InspireIntent, MemeIntent, PollCreateIntent, PollVoteIntent,
PollResultsIntent, PollEndIntent, GenerateImageIntent, SendTextIntent
]
async def _fn_enqueue_image(mid, cid, prompt, count, width, height, context):
task_queue.put({
"type":"image",
"message_id": mid,
"chat_id": cid,
"prompt": prompt,
"count": count,
"width": width,
"height": height
})
ACTION_HANDLERS = {
"summarize": _fn_summarize,
"translate": _fn_translate,
"joke": _fn_joke,
"weather": _fn_weather,
"inspire": _fn_inspire,
"meme": _fn_meme,
"poll_create": _fn_poll_create,
"poll_vote": _fn_poll_vote,
"poll_results": _fn_poll_results,
"poll_end": _fn_poll_end,
"generate_image": _fn_enqueue_image,
"send_text": _fn_send_text_wrapper,
}
def route_intent(user_input: str, chat_id: str, sender: str):
history_text = get_history_text(chat_id, sender)
sys_prompt = (
"You never perform work yourself—you only invoke one of the available functions. "
"When the user asks for something that matches a function signature, you must return exactly one JSON object matching that function’s parameters—and nothing else. "
"Do not wrap it in markdown, do not add extra text, and do not show the JSON to the user. "
"If the user’s request does not match any function, reply in plain text, and never mention JSON or internal logic.\n\n"
"- summarize(text)\n"
"- translate(lang, text)\n"
"- joke()\n"
"- weather(location)\n"
"- inspire()\n"
"- meme(text)\n"
"- poll_create(question, options)\n"
"- poll_vote(voter, choice)\n"
"- poll_results()\n"
"- poll_end()\n"
"- generate_image(prompt, count, width, height)\n"
"- send_text(message)\n\n"
"Return only raw JSON matching one of these shapes. For example:\n"
" {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":3,\"width\":512,\"height\":512}\n"
"Otherwise, use send_text to reply with plain chat and you should only return one json for the current message not for previous conversations.\n"
f"Conversation so far:\n{history_text}\n\n current message: User: {user_input}"
)
try:
raw = generate_llm(sys_prompt)
except LLMBadRequestError:
clear_history(chat_id, sender)
return SendTextIntent(action="send_text", message="Oops, let’s start fresh!")
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
return SendTextIntent(action="send_text", message=raw)
for M in INTENT_MODELS:
try:
return M.model_validate(parsed)
except ValidationError:
continue
action = parsed.get("action")
if action in ACTION_HANDLERS:
data = parsed
kwargs = {}
if action == "generate_image":
kwargs = {
"prompt": data.get("prompt",""),
"count": int(data.get("count", BotConfig.DEFAULT_IMAGE_COUNT)),
"width": data.get("width"),
"height": data.get("height"),
}
elif action == "send_text":
kwargs = {"message": data.get("message","")}
elif action == "translate":
kwargs = {"lang": data.get("lang",""), "text": data.get("text","")}
elif action == "summarize":
kwargs = {"text": data.get("text","")}
elif action == "weather":
kwargs = {"location": data.get("location","")}
elif action == "meme":
kwargs = {"text": data.get("text","")}
elif action == "poll_create":
kwargs = {"question": data.get("question",""), "options": data.get("options",[])}
elif action == "poll_vote":
kwargs = {"voter": sender, "choice": int(data.get("choice",0))}
try:
model = next(m for m in INTENT_MODELS if getattr(m, "__fields__", {}).get("action").default == action)
return model.model_validate({"action":action, **kwargs})
except Exception:
return SendTextIntent(action="send_text", message=raw)
return SendTextIntent(action="send_text", message=raw)
# --- Telegram Handlers ----------------------------------------------------
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("🌟 Eve is online! Type /help to see commands.")
async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_markdown(
"🤖 *Eve* commands:\n"
"• /help\n"
"• /summarize <text>\n"
"• /translate <lang>|<text>\n"
"• /joke\n"
"• /weather <loc>\n"
"• /inspire\n"
"• /meme <text>\n"
"• /poll <Q>|opt1|opt2 …\n"
"• /results\n"
"• /endpoll\n"
"• /gen <prompt>|<count>|<width>|<height>\n"
"Otherwise just chat with me."
)
async def message_router(update: Update, context: ContextTypes.DEFAULT_TYPE):
msg: Message = update.message
chat_id = msg.chat.id
user_id = msg.from_user.id
mid = msg.message_id
text = msg.text or ""
set_thread_context(chat_id, user_id, mid)
record_user_message(chat_id, user_id, text)
low = text.lower().strip()
if low == "/help":
return await help_cmd(update, context)
if low.startswith("/summarize "):
return await _fn_summarize(mid, chat_id, text[11:].strip(), context)
if low.startswith("/translate "):
lang, txt = text[11:].split("|",1)
return await _fn_translate(mid, chat_id, lang.strip(), txt.strip(), context)
if low == "/joke":
return await _fn_joke(mid, chat_id, context)
if low.startswith("/weather "):
return await _fn_weather(mid, chat_id, text[9:].strip().replace(" ","+"), context)
if low == "/inspire":
return await _fn_inspire(mid, chat_id, context)
if low.startswith("/meme "):
return await _fn_meme(mid, chat_id, text[6:].strip(), context)
if low.startswith("/poll "):
parts = [p.strip() for p in text[6:].split("|")]
return await _fn_poll_create(mid, chat_id, parts[0], parts[1:], context)
if chat_id in polls and low.isdigit():
return await _fn_poll_vote(mid, chat_id, str(user_id), int(low), context)
if low == "/results":
return await _fn_poll_results(mid, chat_id, context)
if low == "/endpoll":
return await _fn_poll_end(mid, chat_id, context)
if low.startswith("/gen"):
parts = text[4:].split("|")
pr = parts[0].strip()
ct = int(parts[1]) if len(parts)>1 and parts[1].isdigit() else BotConfig.DEFAULT_IMAGE_COUNT
width = int(parts[2]) if len(parts)>2 and parts[2].isdigit() else None
height = int(parts[3]) if len(parts)>3 and parts[3].isdigit() else None
task_queue.put({
"type":"image","message_id":mid,"chat_id":chat_id,
"prompt":pr,"count":ct,"width":width,"height":height
})
return
intent = route_intent(text, str(chat_id), str(user_id))
handler = ACTION_HANDLERS.get(intent.action)
kwargs = intent.model_dump(exclude={"action"})
await handler(mid, chat_id, **kwargs, context=context)
def main():
app = ApplicationBuilder().token(BotConfig.TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("help", help_cmd))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, message_router))
logger.info("Starting Telegram bot…")
app.run_polling()
if __name__ == "__main__":
main()