|
""" |
|
Celery tasks. Some of them will be launched periodically from the admin panel via django-celery-beat.
|
""" |
|
|
|
import time |
|
from typing import Union, List, Optional, Dict |
|
|
|
import telegram |
|
|
|
from dtb.celery import app |
|
from celery.utils.log import get_task_logger |
|
from tgbot.handlers.broadcast_message.utils import send_one_message, from_celery_entities_to_entities, \ |
|
from_celery_markup_to_markup |
|
|
|
logger = get_task_logger(__name__) |
|
|
|
|
|
@app.task(ignore_result=True)
def broadcast_message(
    user_ids: List[Union[str, int]],
    text: str,
    entities: Optional[List[Dict]] = None,
    reply_markup: Optional[List[List[Dict]]] = None,
    sleep_between: float = 0.4,
    parse_mode=telegram.ParseMode.HTML,
) -> None:
    """Send the same message to every user in ``user_ids``, one by one.

    ``entities`` and ``reply_markup`` are converted once, before the loop,
    with ``from_celery_entities_to_entities`` and
    ``from_celery_markup_to_markup``. Each recipient is then attempted
    independently: an exception raised while sending to one user is logged
    and the loop moves on to the next user.

    Args:
        user_ids: Recipients; each id is passed to ``send_one_message``.
        text: Message text.
        entities: Value handed to ``from_celery_entities_to_entities``
            (``None`` by default).
        reply_markup: Value handed to ``from_celery_markup_to_markup``
            (``None`` by default).
        sleep_between: Seconds to sleep after each send attempt; values
            below 0.1 are raised to 0.1.
        parse_mode: Forwarded unchanged to ``send_one_message``.
    """
    logger.info(f"Going to send message: '{text}' to {len(user_ids)} users")

    # Everything except the recipient is identical for all sends, so the
    # conversions run once and the result is reused for every user.
    shared_kwargs = {
        "text": text,
        "entities": from_celery_entities_to_entities(entities),
        "parse_mode": parse_mode,
        "reply_markup": from_celery_markup_to_markup(reply_markup),
    }

    for user_id in user_ids:
        try:
            send_one_message(user_id=user_id, **shared_kwargs)
            logger.info(f"Broadcast message was sent to {user_id}")
        except Exception as e:
            # Best effort: one failing recipient must not abort the broadcast.
            logger.error(f"Failed to send message to {user_id}, reason: {e}")

        # Throttle after every attempt (success or failure), never sleeping
        # less than 0.1 s regardless of the requested value.
        pause = max(sleep_between, 0.1)
        time.sleep(pause)

    logger.info("Broadcast finished!")
|
|
|
|
|
|