File size: 5,143 Bytes
618430a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# Ultroid - UserBot
# Copyright (C) 2021-2025 TeamUltroid
#
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ >
# Please read the GNU Affero General Public License in
# <https://www.github.com/TeamUltroid/Ultroid/blob/main/LICENSE/>.
# https://github.com/xditya/TeleBot/blob/master/telebot/plugins/mybot/pmbot/incoming.py
# --------------------------------------- Imports -------------------------------------------- #
import os
from telethon.errors.rpcerrorlist import UserNotParticipantError
from telethon.tl.custom import Button
from telethon.tl.functions.channels import GetFullChannelRequest
from telethon.tl.functions.messages import GetFullChatRequest
from telethon.tl.types import Channel, Chat
from telethon.utils import get_display_name
from pyUltroid.dB.base import KeyManager
from pyUltroid.dB.botchat_db import *
from pyUltroid.fns.helper import inline_mention
from . import *
# List-typed store under the "BOTBLS" key (presumably database-backed — confirm
# in pyUltroid.dB.base). The handlers below use it as the ban list: checked with
# .contains(), modified with .add() / .remove().
botb = KeyManager("BOTBLS", cast=list)
# Value of the "PMBOT_FSUB" key, read once at import time. on_new_mssg iterates
# it as the chats a sender must be a participant of before being relayed.
FSUB = udB.get_key("PMBOT_FSUB")
# In-memory map of chat -> exported invite link, filled lazily by on_new_mssg.
CACHE = {}
# --------------------------------------- Incoming -------------------------------------------- #
@asst_cmd(
    load=AST_PLUGINS,
    incoming=True,
    func=lambda e: e.is_private and not botb.contains(e.sender_id),
)
async def on_new_mssg(event):
    """Relay a private message sent to the assistant bot on to the owner.

    Registered for private incoming messages whose sender is not in ``botb``.
    Messages starting with ``/`` and messages from the owner are ignored.

    When ``FSUB`` is set, the sender must be a participant of every listed
    chat. For each chat they are missing from, a URL button pointing at the
    chat's public username or exported invite link is collected; if at least
    one button could be built, the ``pmbot_1`` prompt is sent with those
    buttons and the message is NOT forwarded.

    Otherwise the message is forwarded to ``OWNER_ID`` and the forwarded
    message id is recorded together with the sender id via ``add_stuff``.
    """
    who = event.sender_id
    # Commands and the owner's own messages are never relayed.
    if event.text.startswith("/") or who == OWNER_ID:
        return
    if FSUB:
        prompt = ""
        buttons = []
        for chat in FSUB:
            try:
                await event.client.get_permissions(chat, event.sender_id)
            except UserNotParticipantError:
                if not prompt:
                    prompt += get_string("pmbot_1")
                try:
                    link = ""
                    entity = await event.client.get_entity(chat)
                    if hasattr(entity, "username") and entity.username:
                        link = f"t.me/{entity.username}"
                    elif CACHE.get(chat):
                        link = CACHE[chat]
                    else:
                        # No public username: fall back to the exported invite
                        # link and remember it for subsequent messages.
                        if isinstance(entity, Channel):
                            full = await event.client(GetFullChannelRequest(chat))
                        elif isinstance(entity, Chat):
                            full = await event.client(GetFullChatRequest(chat))
                        else:
                            # Neither a channel nor a group: the original
                            # behaviour is to drop the message silently.
                            return
                        if full.full_chat.exported_invite:
                            CACHE[chat] = full.full_chat.exported_invite.link
                            link = CACHE[chat]
                    if link:
                        buttons.append(Button.url(get_display_name(entity), link))
                    else:
                        # A URL button with an empty URL makes the whole reply
                        # fail, so skip chats we cannot link to.
                        LOGS.warning(
                            f"PmBot Force Sub: no username or invite link for {chat}"
                        )
                except Exception as er:
                    LOGS.exception(f"Error On PmBot Force Sub!\n - {chat} \n{er}")
        # Fail open: only block the relay when there is something to show.
        if prompt and buttons:
            return await event.reply(prompt, buttons=buttons)
    forwarded = await event.forward_to(OWNER_ID)
    if event.fwd_from:
        # The incoming message is itself a forward; tell the owner who actually
        # sent it (presumably because the forward header credits someone else).
        await forwarded.reply(
            f"From {inline_mention(event.sender)} [`{event.sender_id}`]"
        )
    # Record forwarded-message id -> sender (presumably what get_who() reads back).
    add_stuff(forwarded.id, who)
# --------------------------------------- Outgoing -------------------------------------------- #
@asst_cmd(
    load=AST_PLUGINS,
    from_users=[OWNER_ID],
    incoming=True,
    func=lambda e: e.is_private and e.is_reply,
)
async def on_out_mssg(event):
    """Handle the owner's replies in the assistant bot's private chat.

    The replied-to message id is resolved to a user id with ``get_who``.

    * ``/who`` - reply with the user's display name, id, mention and
      profile photo (if any); failures are reported back as ``**ERROR : **``.
    * any other text starting with ``/`` - ignored.
    * anything else - the owner's message is sent to that user, provided
      ``get_who`` returned a truthy id.
    """
    replied_id = event.reply_to_msg_id
    to_user = get_who(replied_id)
    if event.text.startswith("/who"):
        photo_path = None
        try:
            user = await asst.get_entity(to_user)
            photo_path = await event.client.download_profile_photo(user.id)
            await event.reply(
                f"• **Name :** {get_display_name(user)}\n• **ID :** `{user.id}`\n• **Link :** {inline_mention(user)}",
                file=photo_path,
            )
            return
        # Exception, not BaseException: task cancellation and interpreter
        # shutdown signals must not be swallowed and turned into a chat reply.
        except Exception as er:
            return await event.reply(f"**ERROR : **{str(er)}")
        finally:
            # Remove the downloaded photo even when sending the reply failed.
            if photo_path:
                os.remove(photo_path)
    elif event.text.startswith("/"):
        return
    if to_user:
        await asst.send_message(to_user, event.message)
# --------------------------------------- Ban/Unban -------------------------------------------- #
@asst_cmd(
    pattern="ban",
    load=AST_PLUGINS,
    from_users=[OWNER_ID],
    func=lambda x: x.is_private,
)
async def banhammer(event):
    """Add the user behind the replied-to message to the ``botb`` ban list.

    Owner-only, private chat. Replies with ``pmbot_2`` when the command is
    not a reply (or the reply cannot be resolved to a user) and with
    ``pmbot_3`` when the user is already in the list. On success the owner
    gets a ``#BAN`` confirmation and the user is sent ``pmbot_4``.
    """
    if not event.is_reply:
        return await event.reply(get_string("pmbot_2"))
    target = get_who(event.reply_to_msg_id)
    if not target:
        # get_who() can come back empty (on_out_mssg guards for that too);
        # never add a null entry to the list or message a null peer.
        # NOTE(review): reuses the "not a reply" prompt - confirm wording fits.
        return await event.reply(get_string("pmbot_2"))
    if botb.contains(target):
        return await event.reply(get_string("pmbot_3"))
    botb.add(target)
    await event.reply(f"#BAN\nUser : {target}")
    await asst.send_message(target, get_string("pmbot_4"))
@asst_cmd(
    pattern="unban",
    load=AST_PLUGINS,
    from_users=[OWNER_ID],
    func=lambda x: x.is_private,
)
async def unbanhammer(event):
    """Remove the user behind the replied-to message from the ``botb`` ban list.

    Owner-only, private chat. Replies with ``pmbot_5`` when the command is
    not a reply (or the reply cannot be resolved to a user) and with
    ``pmbot_6`` when the user is not in the list. On success the owner gets
    an ``#UNBAN`` confirmation and the user is sent ``pmbot_7``.
    """
    if not event.is_reply:
        return await event.reply(get_string("pmbot_5"))
    target = get_who(event.reply_to_msg_id)
    if not target:
        # get_who() can come back empty (on_out_mssg guards for that too);
        # without this the code below would try to message a null peer.
        # NOTE(review): reuses the "not a reply" prompt - confirm wording fits.
        return await event.reply(get_string("pmbot_5"))
    if not botb.contains(target):
        return await event.reply(get_string("pmbot_6"))
    botb.remove(target)
    await event.reply(f"#UNBAN\nUser : {target}")
    await asst.send_message(target, get_string("pmbot_7"))
# --------------------------------------- END -------------------------------------------- #
|