""" Celery tasks. Some of them will be launched periodically from admin panel via django-celery-beat """ import time from typing import Union, List, Optional, Dict import telegram from dtb.celery import app from celery.utils.log import get_task_logger from tgbot.handlers.broadcast_message.utils import send_one_message, from_celery_entities_to_entities, \ from_celery_markup_to_markup logger = get_task_logger(__name__) @app.task(ignore_result=True) def broadcast_message( user_ids: List[Union[str, int]], text: str, entities: Optional[List[Dict]] = None, reply_markup: Optional[List[List[Dict]]] = None, sleep_between: float = 0.4, parse_mode=telegram.ParseMode.HTML, ) -> None: """ It's used to broadcast message to big amount of users """ logger.info(f"Going to send message: '{text}' to {len(user_ids)} users") entities_ = from_celery_entities_to_entities(entities) reply_markup_ = from_celery_markup_to_markup(reply_markup) for user_id in user_ids: try: send_one_message( user_id=user_id, text=text, entities=entities_, parse_mode=parse_mode, reply_markup=reply_markup_, ) logger.info(f"Broadcast message was sent to {user_id}") except Exception as e: logger.error(f"Failed to send message to {user_id}, reason: {e}") time.sleep(max(sleep_between, 0.1)) logger.info("Broadcast finished!")