File size: 5,208 Bytes
21bc372 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2020-2024 (c) Randy @xtdevs
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import asyncio
from asyncio import *
from random import *
from pyrogram import *
from pyrogram.types import *
from pyrogram.errors import *
from pyrogram import Client as ren
from akn import log_detailed_error
from akn.utils.handler import *
from akn.utils.logger import LOGS
from akn.utils.prefixprem import command
CLONE_STORAGE = {}
@Akeno(
~filters.scheduled
& command(["clone"])
& filters.me
& ~filters.forwarded
)
async def user_clone(client: Client, message: Message):
global CLONE_STORAGE
reply_messages = message.reply_to_message
if not reply_messages:
return await message.reply_text("Please reply to a message.")
if not reply_messages.from_user:
return await message.reply_text("Invalid user.")
user_id = reply_messages.from_user.id if reply_messages.from_user else "@" + reply_messages.from_user.username
try:
me_user = await client.get_chat("me")
me_user_two = await client.get_users("me")
user = await client.get_chat(user_id)
user_two = await client.get_users(user_id)
except Exception as e:
return await message.reply_text(f"Error: {e}")
me_bio = me_user.bio if me_user else ""
me_first_name = me_user.first_name if me_user else ""
me_last_name = me_user.last_name if me_user else ""
user_bio = user.bio if user else ""
user_first_name = user.first_name if user else ""
user_photo_file_id = user.photo.big_file_id if user.photo else None
me_photo_file_id = None
try:
async for file in client.get_chat_photos(client.me.id, limit=1):
me_photo_file_id = file.file_id if file else None
except RPCError as rpc_error:
return await message.reply_text(f"RPCError: {rpc_error}")
except Exception as e:
return await message.reply_text(f"Error: {e}")
try:
CLONE_STORAGE[me_user.id] = {
"first_name": me_first_name,
"last_name": me_last_name,
"profile_id": me_photo_file_id,
"bio_data": me_bio
}
set_profile = None
if user_photo_file_id:
set_profile = await client.download_media(user_photo_file_id)
if set_profile:
await client.set_profile_photo(photo=set_profile)
if user_first_name and user_bio:
await client.update_profile(first_name=user_first_name, last_name="", bio=user_bio)
await message.reply_text("Successfully steal and set to your account!")
if set_profile:
os.remove(set_profile)
except AboutTooLong:
return await message.reply_text("The provided about/bio text is too long")
except Exception as e:
await log_detailed_error(e, where=client.me.id, who=message.chat.title)
await message.reply_text(f"Error: {e}")
@Akeno(
~filters.scheduled
& command(["revert"])
& filters.me
& ~filters.forwarded
)
async def user_revert(client: Client, message: Message):
user = await client.get_users("me")
CLONE_STORAGE[user.id] = {
"first_name": CLONE_STORAGE[user.id]["first_name"],
"last_name": CLONE_STORAGE[user.id]["last_name"],
"profile_id": CLONE_STORAGE[user.id]["profile_id"],
"bio_data": CLONE_STORAGE[user.id]["bio_data"],
}
try:
clone_data = CLONE_STORAGE.get(user.id)
photos = [p async for p in client.get_chat_photos("me")]
if photos:
await client.delete_profile_photos(photos[0].file_id)
if clone_data["first_name"]:
await client.update_profile(
first_name=clone_data["first_name"],
last_name=clone_data["last_name"],
bio=clone_data["bio_data"]
)
else:
return await message.reply_text("User doesn't have a profile bio and last name")
await message.reply_text("Successfully reverted back to your account!")
del CLONE_STORAGE[user.id]
except RPCError as rpc_error:
await log_detailed_error(rpc_error, where=client.me.id, who=message.chat.title)
await message.reply_text(f"RPCError: {rpc_error}")
except Exception as e:
await log_detailed_error(e, where=client.me.id, who=message.chat.title)
await message.reply_text(f"Error: {e}")
module = modules_help.add_module("clone", __file__)
module.add_command("clone", "to user clone for telegram")
module.add_command("revert", "to user reverse for telegram") |