azils3's picture
Upload 63 files
16bfc87 verified
from typing import Union, Optional, Dict, List
import telegram
from telegram import MessageEntity, InlineKeyboardButton, InlineKeyboardMarkup
from dtb.settings import TELEGRAM_TOKEN
from users.models import User
def from_celery_markup_to_markup(celery_markup: Optional[List[List[Dict]]]) -> Optional[InlineKeyboardMarkup]:
markup = None
if celery_markup:
markup = []
for row_of_buttons in celery_markup:
row = []
for button in row_of_buttons:
row.append(
InlineKeyboardButton(
text=button['text'],
callback_data=button.get('callback_data'),
url=button.get('url'),
)
)
markup.append(row)
markup = InlineKeyboardMarkup(markup)
return markup
def from_celery_entities_to_entities(celery_entities: Optional[List[Dict]] = None) -> Optional[List[MessageEntity]]:
entities = None
if celery_entities:
entities = [
MessageEntity(
type=entity['type'],
offset=entity['offset'],
length=entity['length'],
url=entity.get('url'),
language=entity.get('language'),
)
for entity in celery_entities
]
return entities
def send_one_message(
user_id: Union[str, int],
text: str,
parse_mode: Optional[str] = telegram.ParseMode.HTML,
reply_markup: Optional[List[List[Dict]]] = None,
reply_to_message_id: Optional[int] = None,
disable_web_page_preview: Optional[bool] = None,
entities: Optional[List[MessageEntity]] = None,
tg_token: str = TELEGRAM_TOKEN,
) -> bool:
bot = telegram.Bot(tg_token)
try:
m = bot.send_message(
chat_id=user_id,
text=text,
parse_mode=parse_mode,
reply_markup=reply_markup,
reply_to_message_id=reply_to_message_id,
disable_web_page_preview=disable_web_page_preview,
entities=entities,
)
except telegram.error.Unauthorized:
print(f"Can't send message to {user_id}. Reason: Bot was stopped.")
User.objects.filter(user_id=user_id).update(is_blocked_bot=True)
success = False
else:
success = True
User.objects.filter(user_id=user_id).update(is_blocked_bot=False)
return success