|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
import re |
|
|
|
from telethon.errors.rpcerrorlist import ( |
|
ChannelPrivateError, |
|
ChatWriteForbiddenError, |
|
MediaCaptionTooLongError, |
|
MediaEmptyError, |
|
MessageTooLongError, |
|
PeerIdInvalidError, |
|
UserNotParticipantError, |
|
) |
|
from telethon.tl.types import MessageEntityMention, MessageEntityMentionName, User |
|
from telethon.utils import get_display_name |
|
|
|
from pyUltroid.dB.botchat_db import tag_add, who_tag |
|
|
|
from . import ( |
|
LOG_CHANNEL, |
|
LOGS, |
|
Button, |
|
asst, |
|
callback, |
|
events, |
|
get_string, |
|
inline_mention, |
|
udB, |
|
ultroid_bot, |
|
) |
|
|
|
CACHE_SPAM = {} |
|
TAG_EDITS = {} |
|
|
|
|
|
@ultroid_bot.on( |
|
events.NewMessage( |
|
incoming=True, |
|
func=lambda e: (e.mentioned), |
|
), |
|
) |
|
async def all_messages_catcher(e): |
|
x = await e.get_sender() |
|
if isinstance(x, User) and (x.bot or x.verified): |
|
return |
|
if not udB.get_key("TAG_LOG"): |
|
return |
|
NEEDTOLOG = udB.get_key("TAG_LOG") |
|
if e.chat_id == NEEDTOLOG: |
|
return |
|
buttons = await parse_buttons(e) |
|
try: |
|
sent = await asst.send_message(NEEDTOLOG, e.message, buttons=buttons) |
|
if TAG_EDITS.get(e.chat_id): |
|
TAG_EDITS[e.chat_id].update({e.id: {"id": sent.id, "msg": e}}) |
|
else: |
|
TAG_EDITS.update({e.chat_id: {e.id: {"id": sent.id, "msg": e}}}) |
|
tag_add(sent.id, e.chat_id, e.id) |
|
except MediaEmptyError as er: |
|
LOGS.debug(f"handling {er}.") |
|
try: |
|
msg = await asst.get_messages(e.chat_id, ids=e.id) |
|
sent = await asst.send_message(NEEDTOLOG, msg, buttons=buttons) |
|
if TAG_EDITS.get(e.chat_id): |
|
TAG_EDITS[e.chat_id].update({e.id: {"id": sent.id, "msg": e}}) |
|
else: |
|
TAG_EDITS.update({e.chat_id: {e.id: {"id": sent.id, "msg": e}}}) |
|
tag_add(sent.id, e.chat_id, e.id) |
|
except Exception as me: |
|
if not isinstance(me, (PeerIdInvalidError, ValueError)): |
|
LOGS.exception(me) |
|
if e.photo or e.sticker or e.gif: |
|
try: |
|
media = await e.download_media() |
|
sent = await asst.send_message( |
|
NEEDTOLOG, e.message.text, file=media, buttons=buttons |
|
) |
|
if TAG_EDITS.get(e.chat_id): |
|
TAG_EDITS[e.chat_id].update({e.id: {"id": sent.id, "msg": e}}) |
|
else: |
|
TAG_EDITS.update({e.chat_id: {e.id: {"id": sent.id, "msg": e}}}) |
|
return os.remove(media) |
|
except Exception as er: |
|
LOGS.exception(er) |
|
await asst.send_message(NEEDTOLOG, get_string("com_4"), buttons=buttons) |
|
except (PeerIdInvalidError, ValueError) as er: |
|
LOGS.exception(er) |
|
try: |
|
CACHE_SPAM[NEEDTOLOG] |
|
except KeyError: |
|
await asst.send_message( |
|
udB.get_key("LOG_CHANNEL"), get_string("userlogs_1") |
|
) |
|
CACHE_SPAM.update({NEEDTOLOG: True}) |
|
except ChatWriteForbiddenError: |
|
try: |
|
await asst.get_permissions(NEEDTOLOG, "me") |
|
MSG = get_string("userlogs_4") |
|
except UserNotParticipantError: |
|
MSG = get_string("userlogs_2") |
|
try: |
|
CACHE_SPAM[NEEDTOLOG] |
|
except KeyError: |
|
await asst.send_message(LOG_CHANNEL, MSG) |
|
CACHE_SPAM.update({NEEDTOLOG: True}) |
|
except Exception as er: |
|
LOGS.exception(er) |
|
|
|
|
|
if udB.get_key("TAG_LOG"): |
|
|
|
@ultroid_bot.on(events.MessageEdited(func=lambda x: not x.out)) |
|
async def upd_edits(event): |
|
x = event.sender |
|
if isinstance(x, User) and (x.bot or x.verified): |
|
return |
|
if event.chat_id not in TAG_EDITS: |
|
if event.sender_id == udB.get_key("TAG_LOG"): |
|
return |
|
if event.is_private: |
|
return |
|
if entities := event.get_entities_text(): |
|
is_self = False |
|
username = event.client.me.username |
|
if username: |
|
username = username.lower() |
|
for ent, text in entities: |
|
if isinstance(ent, MessageEntityMention): |
|
is_self = text[1:].lower() == username |
|
elif isinstance(ent, MessageEntityMentionName): |
|
is_self = ent.user_id == event.client.me.id |
|
if is_self: |
|
text = f"**#Edited & #Mentioned**\n\n{event.text}" |
|
try: |
|
sent = await asst.send_message( |
|
udB.get_key("TAG_LOG"), |
|
text, |
|
buttons=await parse_buttons(event), |
|
) |
|
except Exception as er: |
|
return LOGS.exception(er) |
|
if TAG_EDITS.get(event.chat_id): |
|
TAG_EDITS[event.chat_id].update({event.id: {"id": sent.id}}) |
|
else: |
|
TAG_EDITS.update({event.chat_id: {event.id: {"id": sent.id}}}) |
|
return |
|
d_ = TAG_EDITS[event.chat_id] |
|
if not d_.get(event.id): |
|
return |
|
d_ = d_[event.id] |
|
if d_["msg"].text == event.text: |
|
return |
|
msg = None |
|
if d_.get("count"): |
|
d_["count"] += 1 |
|
else: |
|
msg = True |
|
d_.update({"count": 1}) |
|
if d_["count"] > 10: |
|
return |
|
try: |
|
MSG = await asst.get_messages(udB.get_key("TAG_LOG"), ids=d_["id"]) |
|
except Exception as er: |
|
return LOGS.exception(er) |
|
TEXT = MSG.text |
|
if msg: |
|
TEXT += "\n\n🖋 **Later Edited to !**" |
|
strf = event.edit_date.strftime("%H:%M:%S") |
|
if "\n" not in event.text: |
|
TEXT += f"\n• `{strf}` : {event.text}" |
|
else: |
|
TEXT += f"\n• `{strf}` :\n-> {event.text}" |
|
if d_["count"] == 10: |
|
TEXT += "\n\n• __Only the first 10 Edits are shown.__" |
|
try: |
|
msg = await MSG.edit(TEXT, buttons=await parse_buttons(event)) |
|
d_["msg"] = msg |
|
except (MessageTooLongError, MediaCaptionTooLongError): |
|
del TAG_EDITS[event.chat_id][event.id] |
|
except Exception as er: |
|
LOGS.exception(er) |
|
|
|
@ultroid_bot.on( |
|
events.NewMessage( |
|
outgoing=True, |
|
chats=[udB.get_key("TAG_LOG")], |
|
func=lambda e: e.reply_to, |
|
) |
|
) |
|
async def idk(e): |
|
id = e.reply_to_msg_id |
|
chat, msg = who_tag(id) |
|
if chat and msg: |
|
try: |
|
await ultroid_bot.send_message(chat, e.message, reply_to=msg) |
|
except BaseException as er: |
|
LOGS.exception(er) |
|
|
|
|
|
|
|
|
|
|
|
async def when_added_or_joined(event): |
|
user = await event.get_user() |
|
chat = await event.get_chat() |
|
if not (user and user.is_self): |
|
return |
|
if getattr(chat, "username", None): |
|
chat = f"[{chat.title}](https://t.me/{chat.username}/{event.action_message.id})" |
|
else: |
|
chat = f"[{chat.title}](https://t.me/c/{chat.id}/{event.action_message.id})" |
|
key = "bot" if event.client._bot else "user" |
|
buttons = Button.inline( |
|
get_string("userlogs_3"), data=f"leave_ch_{event.chat_id}|{key}" |
|
) |
|
if event.user_added: |
|
tmp = event.added_by |
|
text = f"#ADD_LOG\n\n{inline_mention(tmp)} just added {inline_mention(user)} to {chat}." |
|
elif event.from_request: |
|
text = f"#APPROVAL_LOG\n\n{inline_mention(user)} just got Chat Join Approval to {chat}." |
|
else: |
|
text = f"#JOIN_LOG\n\n{inline_mention(user)} just joined {chat}." |
|
await asst.send_message(udB.get_key("LOG_CHANNEL"), text, buttons=buttons) |
|
|
|
|
|
asst.add_event_handler( |
|
when_added_or_joined, events.ChatAction(func=lambda x: x.user_added) |
|
) |
|
ultroid_bot.add_event_handler( |
|
when_added_or_joined, |
|
events.ChatAction(func=lambda x: x.user_added or x.user_joined), |
|
) |
|
_client = {"bot": asst, "user": ultroid_bot} |
|
|
|
|
|
@callback( |
|
re.compile( |
|
"leave_ch_(.*)", |
|
), |
|
from_users=[ultroid_bot.uid], |
|
) |
|
async def leave_ch_at(event): |
|
cht = event.data_match.group(1).decode("UTF-8") |
|
ch_id, client = cht.split("|") |
|
try: |
|
client = _client[client] |
|
except KeyError: |
|
return |
|
try: |
|
name = (await client.get_entity(int(ch_id))).title |
|
await client.delete_dialog(int(ch_id)) |
|
except UserNotParticipantError: |
|
pass |
|
except ChannelPrivateError: |
|
return await event.edit( |
|
"`[CANT_ACCESS_CHAT]` `Maybe already left or got banned.`" |
|
) |
|
except Exception as er: |
|
LOGS.exception(er) |
|
return await event.answer(str(er)) |
|
await event.edit(get_string("userlogs_5").format(name)) |
|
|
|
|
|
@callback("do_nothing") |
|
async def _(event): |
|
await event.answer() |
|
|
|
|
|
async def parse_buttons(event): |
|
y, x = event.chat, event.sender |
|
where_n, who_n = get_display_name(y), get_display_name(x) |
|
where_l = event.message_link |
|
buttons = [[Button.url(where_n, where_l)]] |
|
if isinstance(x, User) and x.username: |
|
try: |
|
buttons.append( |
|
[Button.mention(who_n, await asst.get_input_entity(x.username))] |
|
) |
|
except Exception as er: |
|
LOGS.exception(er) |
|
buttons.append([Button.url(who_n, f"t.me/{x.username}")]) |
|
elif getattr(x, "username"): |
|
buttons.append([Button.url(who_n, f"t.me/{x.username}")]) |
|
else: |
|
buttons.append([Button.url(who_n, where_l)]) |
|
replied = await event.get_reply_message() |
|
if replied and replied.out: |
|
button = Button.url("Replied to", replied.message_link) |
|
if len(who_n) > 7: |
|
buttons.append([button]) |
|
else: |
|
buttons[-1].append(button) |
|
return buttons |
|
|