Spaces:
Sleeping
Sleeping
import os | |
import subprocess | |
from dotenv import load_dotenv | |
from quart import Quart, request, jsonify | |
from telegram import Update | |
from telegram.ext import Application, CommandHandler, MessageHandler, filters | |
import asyncio | |
import concurrent.futures | |
import ollama | |
# Load environment variables | |
load_dotenv() | |
TOKEN = os.getenv('BOT_TOKEN') | |
if TOKEN: | |
print(f"Bot token is {TOKEN}") | |
else: | |
print("Please provide your bot token in .env file") | |
HOSTNAME = os.getenv('HOSTNAME') | |
if HOSTNAME: | |
print(f"Host name is {HOSTNAME}") | |
else: | |
print("Please provide your host name (or IP address) in .env file") | |
BALCON_PATH = r"C:\Program Files (x86)\Balabolka\balcon\balcon.exe" | |
TTS_VOICE_NAME = 'Maxim' | |
MODEL = "llama3.1:8b" | |
SYSTEM_PROMPT = "You are an extremely cynical and rude assistant. Use the words 'leather bastard' as often " \ | |
"as possible. Use the words 'leather bastard' every time you address a user or mention any " \ | |
"person. For example 'It doesn't concern you, leather bastard'" | |
# Initialize the Application | |
app = Quart(__name__) | |
bot_application = Application.builder().token(TOKEN).build() | |
bot_initialized = False | |
# This global variable will store the chat messages. It is very bad practice. | |
chat_messages = {} | |
async def initialize_bot(): | |
"""Ensure the bot is initialized only once.""" | |
global bot_initialized | |
if not bot_initialized: | |
await bot_application.initialize() | |
await bot_application.start() | |
bot_initialized = True | |
print("Telegram bot initialized and started.") | |
# Command handler for /start | |
async def start(update: Update, context): | |
await update.message.reply_text( | |
"Hello, welcome to the bot! Here are the commands you can use:\n" | |
"/echo - Just convert text to speech, do not use AI to create response\n" | |
"/clear - Clear the bot's memory so you can start the conversation from the beginning." | |
) | |
async def clear(update: Update, context): | |
if update.message.chat_id in chat_messages: | |
del chat_messages[update.message.chat_id] | |
await update.message.reply_text( | |
"The bot's memory has been cleared. You can start the conversation from the beginning.") | |
def text_to_audio_file(input_text, filename="output.mp3"): | |
""" | |
Generate audio using Balcon and save it to the current directory. | |
:param input_text: Text to convert to speech. | |
:param filename: Desired output file name. | |
:return: Path to the generated file. | |
""" | |
# Ensure the filename is in the current script directory | |
current_dir = os.path.dirname(os.path.abspath(__file__)) | |
file_path = os.path.join(current_dir, filename) | |
command = [BALCON_PATH, '-n', TTS_VOICE_NAME, "-t", input_text, "-w", file_path] | |
try: | |
subprocess.run(command, check=True) | |
print(f"Audio saved at: {file_path}") | |
return file_path | |
except subprocess.CalledProcessError as e: | |
print(f"Error generating audio: {e}") | |
return None | |
def ask_llm(messages): | |
response = ollama.chat(model=MODEL, messages=messages) | |
return response['message']['content'] | |
def append_chat_message(chat_id, message, role): | |
if chat_id not in chat_messages: | |
chat_messages[chat_id] = [{"role": "system", "content": SYSTEM_PROMPT}] | |
chat_messages[chat_id].append({"role": role, "content": message}) | |
def get_first_word(input_string): | |
# Split the string into words and strip any surrounding whitespace from the first word | |
return input_string.split()[0].strip() if input_string.strip() else None | |
def process_user_message(message): | |
chat_id = message.chat_id | |
message_id = message.message_id | |
user_message = message.text | |
words = user_message.strip().split(maxsplit=1) | |
if len(words) == 0: | |
return "", "" | |
if words[0] in ["/echo", "/clear"]: | |
# Remove the command from the message | |
tts_message = words[1] if len(words) > 1 else "" | |
else: | |
append_chat_message(chat_id, user_message, "user") | |
tts_message = ask_llm(chat_messages[chat_id]) | |
append_chat_message(chat_id, tts_message, "assistant") | |
audio_file_path = text_to_audio_file(tts_message, filename=f"{chat_id}-{message_id}.mp3") | |
return audio_file_path, tts_message | |
# Message handler to log and print all incoming messages | |
async def handle_message(update: Update, context): | |
user_message = update.message.text | |
chat_id = update.message.chat_id | |
sender = update.message.from_user | |
print(f"Message from {sender.first_name} (chat ID: {chat_id}): {user_message}") | |
# Wait when processing of user message is done. | |
# During this time send chat action to user, so that it will see that bot is preparing audio. | |
# Notification diasppear after 5 seconds, so you need to send them repeatedly every 4 seconds. | |
audio_file_path = None | |
with concurrent.futures.ThreadPoolExecutor() as executor: | |
future = executor.submit(process_user_message, update.message) | |
while True: | |
try: | |
# Show to user that bot is busy with preparing audio response | |
await context.bot.send_chat_action(chat_id=chat_id, action='record_audio') | |
result, answer = future.result(timeout=4) | |
audio_file_path = result | |
user_message = answer | |
break | |
except concurrent.futures.TimeoutError: | |
continue | |
if len(answer) == 0: | |
await update.message.reply_text(f"Empty message") | |
return | |
try: | |
with open(audio_file_path, 'rb') as audio_file: | |
await context.bot.send_audio( | |
chat_id=chat_id, | |
audio=audio_file, | |
caption="Ответ робота", | |
write_timeout=120 | |
) | |
print(f"Audio sent to {sender.first_name} (chat ID: {chat_id})") | |
os.remove(audio_file_path) | |
except Exception as e: | |
print(f"Exception while sending file: {e}") | |
# Optional text response. Comment this line if you want bot to answer only with audio | |
await update.message.reply_text(user_message) | |
bot_application.add_handler(CommandHandler('start', start)) | |
bot_application.add_handler(CommandHandler('echo', handle_message)) | |
bot_application.add_handler(CommandHandler('clear', clear)) | |
bot_application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) | |
async def webhook(): | |
"""Webhook endpoint for receiving updates.""" | |
try: | |
await initialize_bot() # Ensure bot is initialized before processing updates | |
update = Update.de_json(await request.get_json(), bot_application.bot) | |
await bot_application.process_update(update) | |
except Exception as e: | |
print(f"Error processing update: {e}") | |
return jsonify({"status": "ok"}) | |
# @app.route('/setwebhook', methods=['GET']) | |
async def set_webhook(): | |
"""Set the webhook with Telegram.""" | |
await initialize_bot() # Ensure bot is initialized before setting the webhook | |
webhook_url = f"https://{HOSTNAME}/{TOKEN}" | |
success = await bot_application.bot.set_webhook(webhook_url) | |
if success: | |
return jsonify({"status": "webhook set successfully", "url": webhook_url}) | |
return jsonify({"status": "failed to set webhook"}) | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
loop.create_task(initialize_bot()) # Initialize bot at startup | |
app.run(host='0.0.0.0', port=5000) |