|
import requests |
|
import secrets |
|
import random |
|
import re |
|
import string |
|
import json |
|
import urllib |
|
import datetime |
|
import time |
|
from base64 import b64decode as m |
|
from random import choice |
|
from settings import languages |
|
from pyrogram import Client as ren |
|
from pyrogram import * |
|
from pyrogram import errors |
|
from pyrogram import Client, filters |
|
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message |
|
from pyrogram.errors import ChatAdminRequired, UserNotParticipant, ChatWriteForbidden |
|
from pyrogram.enums import * |
|
from pyrogram.enums.parse_mode import ParseMode |
|
from pyrogram.types import * |
|
from pyrogram.errors import ChatAdminRequired |
|
from pymongo import MongoClient |
|
from config import MONGO_URL |
|
|
|
# One MongoClient is created at import time and shared by every helper below.
client_mongo = MongoClient(MONGO_URL)

# "tiktokbot" holds the per-feature collections; "Akeno" holds the session store.
db = client_mongo["tiktokbot"]
db_akeno = client_mongo["Akeno"]
# NOTE(review): `collection` ("users") is queried by several different keys in this
# file ("user_id", "_id", "chat_ids", "promo_code"), so it holds mixed document shapes.
collection = db["users"]
collection_magic = db["magicbot"]
collection_gemini = db["geminibot"]
collection_youtube = db["youtubebot"]
collection_sessionbot = db["sessionbot"]
collection_captcha = db["captchabot"]
collection_alldlbot = db["alldlbot"]
collection_string = db_akeno["session"]
|
|
|
# Permission preset: sending messages/media/polls and inviting users are enabled;
# changing chat info, pinning and web-page previews are disabled.
# NOTE(review): presumably applied when a member is unmuted — confirm against callers.
unmute_permissions = ChatPermissions(
    can_send_messages=True,
    can_send_media_messages=True,
    can_send_polls=True,
    can_change_info=False,
    can_invite_users=True,
    can_pin_messages=False,
    can_add_web_page_previews=False
)
|
|
|
# Pool of status labels. The identifier is spelled "RAMDOM" in the source and is
# kept as-is because other modules may import it by that name.
RAMDOM_STATUS = [
    "civilian",
    "wanted",
    "undercover",
    "rogue_agent",
    "innocent",
    "fugitive",
    "covert_operator"
]
|
|
|
# Terms-of-service text for the userbot flow. It has two positional ``{}``
# placeholders (developer link, privacy-policy link) to be filled with str.format.
# The leading bullet was mis-encoded (UTF-8 bytes shown as Greek letters) and is restored.
CONFIRM_USERBOT = """
• <b>Terms of Service</b>
- By <b>[Ryuzaki AI Dev]({})</b> for userbot, you accept our [Privacy Policy]({}) and agree not to:
- You can get free unlimited userbot by RandyDev
"""
|
|
|
# Notice text for the magic-bot flow; contains no format placeholders.
CONFIRM_MAGIC_BOT = """
- You can get free unlimited magic bot by RandyDev
"""
|
|
|
# Terms-of-service text for API users. It has named placeholders ``{channel}``,
# ``{policy}`` and ``{tos}`` to be filled with str.format.
# The leading bullet was mis-encoded (UTF-8 bytes shown as Greek letters) and is restored.
CONFIRM_USERS = """
• <b>Terms of Service</b>
- By using <b>[API Akeno AI Dev]({channel})</b>, you accept our [Privacy Policy]({policy}) and agree not to:
- Use brute force attacks or exploit any security vulnerabilities.
- Misuse the API for illegal activities or harmful content.
- Share your API key with unauthorized users or third parties.
- Engage in any activity that may harm the service's stability or performance.
- Exceed the rate limits defined in your plan or attempt to bypass these limits.

- Failure to comply with these terms may result in immediate suspension of your API access without notice.
- For further details, please review our full [Terms of Service]({tos}) and [Privacy Policy]({policy}).

- If you have any questions, feel free to reach out via our support channel.
"""
|
|
|
# Advice text shown to users about new accounts and the coin game.
# The bullet on the last line was mis-encoded (UTF-8 bytes shown as Greek letters)
# and is restored.
KONTOLE_ASK = """
A new account must be used for a few days/weeks
and don't immediately join many channels/groups
have to interact with people

• You can use the game again to earn coins and reach 500000 coins to unlock the Pro version.
"""
|
|
|
# Payment acknowledgement (Indonesian). The check-mark emoji was mis-encoded and
# one of its bytes had split the sentence across two lines; both are repaired.
# NOTE(review): the symbol before "/start" is also mis-encoded, but its original
# emoji cannot be recovered from the visible text, so it is left untouched — confirm
# against the intended message.
PAYMENT_SUCCESS = """
PAYMENT SUPPORT

✅ Diterima, terima kasih! Pembayaran Anda akan diproses sesegera mungkin,
harap tunggu....

π /start
"""
|
|
|
# Purchase confirmation with two positional ``{}`` placeholders (number of months,
# expiry date). The check-mark and warning emoji were mis-encoded (and the first had
# split its line in two); both are repaired.
# NOTE(review): the symbol before "UltraPremium" is also mis-encoded, but its
# original emoji cannot be recovered from the visible text, so it is left untouched.
TIKTOKPREM_SUCCESS = """
✅ Thank you for purchasing TiktokUbot UltraPremium!

π€ UltraPremium - {} Months Expires on {}

⚠️ To ACTIVATE your Userbot send /createbot
"""
|
|
|
def get_datetime() -> str:
    """Return the current local time formatted as ``DD/MM/YYYY - HH:MM``."""
    now = datetime.datetime.now()
    return f"{now:%d/%m/%Y - %H:%M}"
|
|
|
def is_session(user_id: int) -> bool:
    """Return True when the session collection holds a document for ``user_id``."""
    record = collection_string.find_one({"user_id": user_id})
    return bool(record)
|
|
|
|
|
def update_session(
    user_id: int,
    api_id: int,
    api_hash: str,
    session: str
) -> None:
    """Store (or create) the session document of ``user_id``.

    Writes ``session``, ``api_id``, ``api_hash`` and the current timestamp
    (``date``) with an upsert.
    """
    fields = {
        "session": session,
        "api_id": api_id,
        "api_hash": api_hash,
        "date": get_datetime(),
    }
    collection_string.update_one({"user_id": user_id}, {"$set": fields}, upsert=True)
|
|
|
|
|
def rm_session(user_id: int) -> None:
    """Delete the session document belonging to ``user_id`` (at most one)."""
    selector = {"user_id": user_id}
    collection_string.delete_one(selector)
|
|
|
def get_session(user_id: int):
    """Return the session document for ``user_id``, or ``False`` when none exists.

    A single query is used: the previous check-then-fetch pair cost two round
    trips and could return ``None`` if the document was deleted between them.
    """
    data = collection_string.find_one({"user_id": user_id})
    # Keep the historical ``False`` sentinel for "no session".
    return data if data else False
|
|
|
def get_all_sessions() -> list:
    """Return every document of the session collection as a list."""
    return list(collection_string.find({}))
|
|
|
def generate_promo_code(length=8):
    """Generate a random promo code of ``length`` uppercase letters and digits.

    Promo codes are redeemable for tokens, so they are drawn from the
    cryptographically secure ``secrets`` generator rather than ``random``,
    whose output is predictable.
    """
    characters = string.ascii_uppercase + string.digits
    return "".join(secrets.choice(characters) for _ in range(length))
|
|
|
def create_promo_code(tokens, max_uses=1):
    """Create a promo-code document and return the generated code.

    The document stores the code, its ``tokens`` value, ``max_uses`` and a
    ``uses`` counter starting at 0.
    """
    code = generate_promo_code()
    document = {
        "promo_code": code,
        "tokens": tokens,
        "max_uses": max_uses,
        "uses": 0,
    }
    collection.insert_one(document)
    return code
|
|
|
def validate_promo_code(promo_code):
    """Return the promo document if it exists and has uses left, else ``None``.

    Missing ``uses`` / ``max_uses`` fields are treated as 0.
    """
    record = collection.find_one({"promo_code": promo_code})
    if not record:
        return None
    used = record.get("uses", 0)
    allowed = record.get("max_uses", 0)
    if not (used < allowed):
        return None
    return record
|
|
|
def apply_promo_code(user_id, promo_code):
    """Redeem ``promo_code`` for ``user_id``; return True on success, False otherwise.

    The "uses left" check and the ``uses`` increment happen in one atomic
    ``find_one_and_update``. Previously the code was validated first and
    incremented later, so concurrent redemptions could exceed ``max_uses``.
    Missing ``uses`` / ``max_uses`` fields count as 0, as before.
    """
    has_uses_left = {
        "$lt": [
            {"$ifNull": ["$uses", 0]},
            {"$ifNull": ["$max_uses", 0]},
        ]
    }
    # Returns the document as it was before the increment, or None when the
    # code is unknown or exhausted.
    promo = collection.find_one_and_update(
        {"promo_code": promo_code, "$expr": has_uses_left},
        {"$inc": {"uses": 1}},
    )
    if not promo:
        return False
    # As before, tokens are only credited to an existing user document (no upsert).
    collection.update_one({"user_id": user_id}, {"$inc": {"tokens": promo["tokens"]}})
    return True
|
|
|
|
|
def add_user_invite(user_id, referrer_id=None):
    """Insert a new user document with zero tokens/invites and the given referrer.

    NOTE(review): this always inserts; calling it twice for the same user
    creates two documents — confirm callers check ``get_user_invite`` first.
    """
    new_user = {
        "user_id": user_id,
        "tokens": 0,
        "invites": 0,
        "referrer_id": referrer_id,
    }
    collection.insert_one(new_user)
|
|
|
def get_user_invite(user_id):
    """Return the document stored for ``user_id`` or ``None`` if there is none."""
    selector = {"user_id": user_id}
    return collection.find_one(selector)
|
|
|
def generate_referral_link(user_id):
    """Build the bot deep link that carries ``user_id`` as the referrer."""
    return "https://t.me/randydev_bot?start=ref={}".format(user_id)
|
|
|
def get_user_tokens_gpt(user_id):
    """Return the ``tokens`` balance of ``user_id``; 0 if the user or field is missing."""
    record = collection.find_one({"user_id": user_id})
    return record.get("tokens", 0) if record else 0
|
|
|
def deduct_tokens_gpt(user_id, amount):
    """Subtract ``amount`` tokens from ``user_id`` when the balance covers it.

    Returns True if the balance was sufficient (and the decrement was issued),
    False otherwise.

    NOTE(review): the balance check and the decrement are two separate
    database operations, so concurrent calls can overdraw the balance.
    """
    balance = get_user_tokens_gpt(user_id)
    if balance < amount:
        return False
    collection.update_one({"user_id": user_id}, {"$inc": {"tokens": -amount}})
    return True
|
|
|
def add_tokens_gpt(user_id, amount):
    """Add ``amount`` to the ``tokens`` balance of ``user_id``, creating the document if needed."""
    increment = {"$inc": {"tokens": amount}}
    collection.update_one({"user_id": user_id}, increment, upsert=True)
|
|
|
def set_chat_upgrade(
    chat_id,
    chat_title,
    chat_link,
    user_id,
    date_joined
):
    """Upsert the upgrade record of a chat (document selected by ``chat_ids``).

    Stores ``user_id``, ``chat_title``, ``chat_link`` and ``date_joined`` and
    returns the ``update_one`` result.
    """
    return collection.update_one(
        {"chat_ids": chat_id},
        {
            "$set": {
                "user_id": user_id,
                "chat_title": chat_title,
                "chat_link": chat_link,
                "date_joined": date_joined,
            }
        },
        upsert=True,
    )
|
|
|
def set_model_upgrade(
    chat_id,
    user_id,
    model_name,
    chat_link,
):
    """Upsert ``user_id``, ``model_name`` and ``chat_link`` on the chat's document.

    The document is selected by ``chat_ids``; the ``update_one`` result is returned.
    """
    return collection.update_one(
        {"chat_ids": chat_id},
        {
            "$set": {
                "user_id": user_id,
                "model_name": model_name,
                "chat_link": chat_link,
            }
        },
        upsert=True,
    )
|
|
|
def set_model_name(chat_id, model_name):
    """Upsert the ``model_name`` field of the chat's document (selected by ``chat_ids``)."""
    return collection.update_one(
        {"chat_ids": chat_id},
        {"$set": {"model_name": model_name}},
        upsert=True,
    )
|
|
|
def get_model_name(chat_id):
    """Return the ``model_name`` stored for the chat, or ``None`` if absent."""
    record = collection.find_one({"chat_ids": chat_id})
    if not record:
        return None
    return record.get("model_name")
|
|
|
def set_system_prompt(chat_id, prompt):
    """Upsert the ``set_prompt`` field of the chat's document (selected by ``chat_ids``)."""
    return collection.update_one(
        {"chat_ids": chat_id},
        {"$set": {"set_prompt": prompt}},
        upsert=True,
    )
|
|
|
def get_system_prompt(chat_id):
    """Return the ``set_prompt`` value stored for the chat, or ``None`` if absent."""
    record = collection.find_one({"chat_ids": chat_id})
    if not record:
        return None
    return record.get("set_prompt")
|
|
|
def set_number_otp(user_id, otp_id, link):
    """Upsert ``otp_id`` and ``otp_link`` on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"otp_id": otp_id, "otp_link": link}},
        upsert=True,
    )
|
|
|
def get_numbers_otps(user_id):
    """Return ``(otp_id, otp_link)`` for ``user_id``.

    Returns a single ``None`` (not a tuple) when the user has no document.
    """
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    return record.get("otp_id"), record.get("otp_link")
|
|
|
def _clear_history_from_gemini(user_id):
    """Remove the ``gemini_chat`` field from the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$unset": {"gemini_chat": None}},
    )
|
|
|
def clear_model_all(chat_id):
    """Remove the ``model_name`` field from the chat's document (selected by ``chat_ids``)."""
    return collection.update_one(
        {"chat_ids": chat_id},
        {"$unset": {"model_name": None}},
    )
|
|
|
def unset_chat_upgrade(chat_id):
    """Remove the ``user_id`` field from the chat's document (selected by ``chat_ids``)."""
    return collection.update_one(
        {"chat_ids": chat_id},
        {"$unset": {"user_id": None}},
    )
|
|
|
def get_chat_upgrade(chat_id):
    """Return the ``user_id`` stored on the chat's document, or ``None`` if absent."""
    record = collection.find_one({"chat_ids": chat_id})
    if not record:
        return None
    return record.get("user_id")
|
|
|
def get_model_chat(chat_id):
    """Return ``(user_id, model_name)`` for the chat; ``(None, None)`` if it has no document."""
    record = collection.find_one({"chat_ids": chat_id})
    if not record:
        return None, None
    return record.get("user_id"), record.get("model_name")
|
|
|
# Token shape: 8-10 digits, a colon, then exactly 35 characters from [A-Za-z0-9_-].
bot_token_pattern = re.compile(r"^[0-9]{8,10}:[a-zA-Z0-9_-]{35}$")


def validate_bot_token(token):
    """Return True when ``token`` has the shape described by ``bot_token_pattern``.

    ``fullmatch`` is used because ``$`` with ``match`` also accepts a token
    followed by a trailing newline. Non-string input returns False instead of
    raising ``TypeError``.
    """
    if not isinstance(token, str):
        return False
    return bot_token_pattern.fullmatch(token) is not None
|
|
|
def picsart_background(image_path):
    """Upload ``image_path`` to the PicsArt remove-background endpoint and return the result URL.

    The image file is opened in a ``with`` block so the handle is closed even
    when the request fails (it was previously leaked).

    Raises:
        OSError: if ``image_path`` cannot be opened.
        Exception: ``"PicsArt API Error: ..."`` for any request/response failure,
            chained to the underlying error.
    """
    url = "https://api.picsart.io/tools/1.0/removebg"
    payload = {"format": "PNG", "output_type": "cutout"}
    # SECURITY(review): the API key is hard-coded in source; move it to configuration.
    headers = {"accept": "application/json", "x-picsart-api-key": "8oVJPBCHS7GqqzfY9HxqBqnbjudiWory"}
    with open(image_path, "rb") as image_file:
        files = {"image": ("brosur.png", image_file, "image/png")}
        try:
            with requests.post(url, headers=headers, data=payload, files=files) as response:
                response.raise_for_status()
                return response.json()["data"]["url"]
        except Exception as e:
            raise Exception(f"PicsArt API Error: {e}") from e
|
|
|
def picsart_ai_upscale(image_path):
    """Upload ``image_path`` to the PicsArt upscale endpoint (factor 4) and return the result URL.

    The image file is opened in a ``with`` block so the handle is closed even
    when the request fails (it was previously leaked).

    Raises:
        OSError: if ``image_path`` cannot be opened.
        Exception: ``"PicsArt API Error: ..."`` for any request/response failure,
            chained to the underlying error.
    """
    url = "https://api.picsart.io/tools/1.0/upscale"
    payload = {"format": "JPG", "upscale_factor": "4"}
    # SECURITY(review): the API key is hard-coded in source; move it to configuration.
    headers = {"accept": "application/json", "x-picsart-api-key": "8oVJPBCHS7GqqzfY9HxqBqnbjudiWory"}
    with open(image_path, "rb") as image_file:
        files = {"image": ("upscale.jpg", image_file, "image/jpg")}
        try:
            with requests.post(url, headers=headers, data=payload, files=files) as response:
                response.raise_for_status()
                return response.json()["data"]["url"]
        except Exception as e:
            raise Exception(f"PicsArt API Error: {e}") from e
|
|
|
def proupscale_ultra(image_path):
    """Upload ``image_path`` to the PicsArt ultra-upscale endpoint and return the result URL.

    The request uses factor 2 and mode ``auto``. The image file is opened in a
    ``with`` block so the handle is closed even when the request fails (it was
    previously leaked).

    Raises:
        OSError: if ``image_path`` cannot be opened.
        Exception: ``"PicsArt API Error: ..."`` for any request/response failure,
            chained to the underlying error.
    """
    url = "https://api.picsart.io/tools/1.0/upscale/ultra"
    payload = {"format": "JPG", "upscale_factor": "2", "mode": "auto"}
    # SECURITY(review): the API key is hard-coded in source; move it to configuration.
    headers = {"accept": "application/json", "x-picsart-api-key": "8oVJPBCHS7GqqzfY9HxqBqnbjudiWory"}
    with open(image_path, "rb") as image_file:
        files = {"image": ("upscale_ultra.jpg", image_file, "image/jpg")}
        try:
            with requests.post(url, headers=headers, data=payload, files=files) as response:
                response.raise_for_status()
                return response.json()["data"]["url"]
        except Exception as e:
            raise Exception(f"PicsArt API Error: {e}") from e
|
|
|
def read_button_this(text, link):
    """Return an inline keyboard containing one URL button labelled ``text``."""
    button = InlineKeyboardButton(text=text, url=link)
    return InlineKeyboardMarkup([[button]])
|
|
|
def set_expired_ryuzaki(user_id, expire_date):
    """Upsert the ``expire_ryuzaki`` field on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"expire_ryuzaki": expire_date}},
        upsert=True,
    )
|
|
|
def get_expired_ryuzaki(user_id):
    """Return the ``expire_ryuzaki`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("expire_ryuzaki") if record else None
|
|
|
def get_update_ryuzaki_api_key(user_id):
    """Return the ``ryuzaki_api_key`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("ryuzaki_api_key") if record else None
|
|
|
def generate_ryuzaki_api_key(length=32):
    """Return a random hex string built from ``length`` random bytes (``2 * length`` hex digits)."""
    return secrets.token_hex(length)
|
|
|
def generate_fedbans_api_key(length=10):
    """Return a random hex string built from ``length`` random bytes (``2 * length`` hex digits)."""
    return secrets.token_hex(length)
|
|
|
def new_fedbans_api_key(user_id: int = None, api_key: str = None):
    """Upsert the ``fedbans_api_key`` field on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"fedbans_api_key": api_key}},
        upsert=True,
    )
|
|
|
def get_fedbans_api_key(user_id):
    """Return the ``fedbans_api_key`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("fedbans_api_key") if record else None
|
|
|
def new_update_ryuzaki_api_key(user_id: int = None, api_key: str = None):
    """Upsert the ``ryuzaki_api_key`` field on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"ryuzaki_api_key": api_key}},
        upsert=True,
    )
|
|
|
def remove_ryuzaki_api_key(user_id):
    """Remove the ``ryuzaki_api_key`` field from the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$unset": {"ryuzaki_api_key": None}},
    )
|
|
|
def banned_by_google():
    """Read ``banned_by_google.txt`` and return its lines, stripped, as a list.

    Any read error is printed and an empty list is returned instead.
    """
    blacklist_path = "banned_by_google.txt"
    try:
        with open(blacklist_path, "r") as handle:
            return [entry.strip() for entry in handle]
    except FileNotFoundError:
        print(f"Error: File '{blacklist_path}' not found.")
    except Exception as e:
        print(f"Error reading file: {e}")
    return []
|
|
|
def new_update_add_group(user_id: int=None, group=None):
    """Upsert the ``ryuzaki_group`` field on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"ryuzaki_group": group}},
        upsert=True,
    )
|
|
|
def get_update_add_group(user_id):
    """Return the ``ryuzaki_group`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("ryuzaki_group") if record else None
|
|
|
def get_all_add_group():
    """Return every truthy ``ryuzaki_group`` value found across all documents."""
    groups = (document.get("ryuzaki_group") for document in collection.find({}))
    return [group for group in groups if group]
|
|
|
def new_sibyl_system_banned(user_id, name, reason, date_joined):
    """Upsert the Sibyl ban fields on the document of ``user_id``.

    Field names (including the ``is_banned_sibly`` spelling) are part of the
    stored schema and are kept as-is.
    """
    ban_fields = {
        "sibyl_ban": name,
        "reason_sibyl": reason,
        "is_banned_sibly": True,
        "date_joined_sib": date_joined,
        "sibyl_userid": user_id,
    }
    return collection.update_one({"user_id": user_id}, {"$set": ban_fields}, upsert=True)
|
|
|
def remove_sibyl_system_banned(user_id):
    """Remove all Sibyl ban fields from the document of ``user_id``.

    No upsert is performed: with ``upsert=True`` an "unban" of an unknown user
    created an empty stub document. This now matches the other ``remove_*``
    helpers in this module, none of which upsert.
    """
    ban_fields = {
        "sibyl_ban": None,
        "reason_sibyl": None,
        "is_banned_sibly": None,
        "date_joined_sib": None,
        "sibyl_userid": None,
    }
    return collection.update_one({"user_id": user_id}, {"$unset": ban_fields})
|
|
|
def get_sibyl_system_banned(user_id):
    """Return the Sibyl ban fields of ``user_id`` as a 5-tuple.

    The tuple is ``(sibyl_ban, reason_sibyl, is_banned_sibly, date_joined_sib,
    sibyl_userid)``; a single ``None`` is returned when the user has no document.
    """
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    field_names = (
        "sibyl_ban",
        "reason_sibyl",
        "is_banned_sibly",
        "date_joined_sib",
        "sibyl_userid",
    )
    return tuple(record.get(field) for field in field_names)
|
|
|
async def check_membership(channel_id, bot, msg):
    """Return True if the message sender is a non-banned member of ``channel_id``.

    Returns False when the sender is not a participant. When the sender is
    banned, a notice with a "Developer" button is sent to the message's chat
    and False is returned.

    NOTE(review): the first character of the notice text looks mis-encoded; it
    is reproduced exactly as found.
    """
    try:
        sender_id = msg.from_user.id if msg.from_user else 0
        sender = await bot.get_users(sender_id)
        member = await bot.get_chat_member(channel_id, sender_id)
        if member.status != ChatMemberStatus.BANNED:
            return True
        developer_button = InlineKeyboardButton(text="Developer", url="https://t.me/xtdevs")
        admin_support = InlineKeyboardMarkup([[developer_button]])
        mention = sender.mention if sender else ""
        await bot.send_message(
            msg.chat.id,
            text=f"β you {mention} have been blocked from the group support\n\nclick the button below to contact the group admin",
            reply_markup=admin_support,
        )
        return False
    except UserNotParticipant:
        return False
|
|
|
async def check_membership_cb(channel_id, client, cb, user_id):
    """Return True if ``user_id`` is a non-banned member of ``channel_id``.

    Returns False when the user is not a participant. When the user is banned,
    a notice with a "Developer" button is sent to the callback's chat and
    False is returned.

    NOTE(review): the first character of the notice text looks mis-encoded; it
    is reproduced exactly as found.
    """
    try:
        target = await client.get_users(user_id)
        member = await client.get_chat_member(channel_id, user_id)
        if member.status != ChatMemberStatus.BANNED:
            return True
        developer_button = InlineKeyboardButton(text="Developer", url="https://t.me/xtdevs")
        admin_support = InlineKeyboardMarkup([[developer_button]])
        mention = target.mention if target else ""
        await client.send_message(
            cb.message.chat.id,
            text=f"β you {mention} have been blocked from the group support\n\nclick the button below to contact the group admin",
            reply_markup=admin_support,
        )
        return False
    except UserNotParticipant:
        return False
|
|
|
async def check_banned(client: Client, user_id: int):
    """Return True when the user's id exceeds a randomly drawn threshold.

    NOTE(review): the threshold is a fresh random integer in
    [7000000000, 7999999999] on every call, so the result is not deterministic
    for ids inside that range — confirm this is the intended check.
    """
    user = await client.get_users(user_id)
    threshold = random.randint(7000000000, 7999999999)
    return user.id > threshold
|
|
|
def add_gbanned(user_id):
    """Set ``gbanned`` to the user's own id on an existing document (no upsert)."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"gbanned": user_id}},
    )
|
|
|
def get_is_banned(user_id):
    """Return the ``gbanned`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("gbanned") if record else None
|
|
|
def remove_gbanned(user_id):
    """Remove the ``gbanned`` and ``gbanned_chat_id`` fields from one document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$unset": {"gbanned": None, "gbanned_chat_id": None}},
    )
|
|
|
def checking_group_list(user_id) -> list:
    """Return the negative ``gbanned_chat_id`` values recorded for ``user_id``."""
    cursor = collection.find({"user_id": user_id, "gbanned_chat_id": {"$lt": 0}})
    return [entry["gbanned_chat_id"] for entry in cursor]
|
|
|
def add_checking_group_all(user_id, chat_id) -> bool:
    """Record ``chat_id`` for ``user_id`` unless already recorded.

    Returns True if a new document was inserted, False if one already existed.
    """
    if collection.find_one({"user_id": user_id, "gbanned_chat_id": chat_id}):
        return False
    collection.insert_one({"user_id": user_id, "gbanned_chat_id": chat_id})
    return True
|
|
|
def remove_group_list(user_id, chat_id_to_remove):
    """Return the user's recorded chat ids with one ``chat_id_to_remove`` removed.

    Only the returned list is changed; nothing is written to the database.
    If the chat id is not in the list the list is returned unchanged
    (previously this raised ``ValueError``).
    """
    chats_list = checking_group_list(user_id)
    if chat_id_to_remove in chats_list:
        # list.remove drops the first occurrence only, as before.
        chats_list.remove(chat_id_to_remove)
    return chats_list
|
|
|
def remove_group_db(user_id, chat_id):
    """Delete the document that records ``chat_id`` for ``user_id``; return the delete result."""
    selector = {"user_id": user_id, "gbanned_chat_id": chat_id}
    return collection.delete_one(selector)
|
|
|
def validate_emails(email):
    """Return True when ``email`` matches the module's (deliberately strict) address shape.

    Accepted shape: letters/digits/``-``/``_``, ``@``, letters/digits, ``.``,
    then 1-3 lowercase letters. The pattern is now a raw string (``\\.`` in a
    normal string is an invalid escape sequence) and ``fullmatch`` is used so
    a trailing newline is no longer accepted.

    NOTE(review): dots in the local part, sub-domains and TLDs longer than
    three letters are still rejected — confirm this is intended.
    """
    pattern = r"[a-zA-Z0-9-_]+@[a-zA-Z0-9]+\.[a-z]{1,3}"
    return re.fullmatch(pattern, email) is not None
|
|
|
def generate_api_key():
    """Return a random 32-character hex string (16 random bytes)."""
    key_bytes = 16
    return secrets.token_hex(key_bytes)
|
|
|
def new_generate_api_key_db(user_id, name, email, password, api_key):
    """Upsert ``name``, ``email``, ``password`` and ``api_key`` on the document of ``user_id``.

    NOTE(review): ``password`` is stored exactly as passed in; if callers pass
    a plaintext password it ends up unhashed in the database.
    """
    account = {
        "name": name,
        "email": email,
        "password": password,
        "api_key": api_key,
    }
    return collection.update_one({"user_id": user_id}, {"$set": account}, upsert=True)
|
|
|
def generate_api_key_remove(user_id):
    """Remove ``name``, ``email``, ``password`` and ``api_key`` from the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$unset": {"name": None, "email": None, "password": None, "api_key": None}},
    )
|
|
|
def get_generate_api_key(user_id):
    """Return ``[api_key, email, password]`` for ``user_id``, or ``None`` if no document exists."""
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    return [record.get(field) for field in ("api_key", "email", "password")]
|
|
|
def verify_id_card(user_id):
    """Upsert ``idcard`` (set to the user's own id) on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"idcard": user_id}},
        upsert=True,
    )
|
|
|
def verify_id_card_remove(user_id):
    """Remove the ``idcard`` field from the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$unset": {"idcard": None}},
    )
|
|
|
def get_verify_id_card(user_id):
    """Return the ``idcard`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("idcard") if record else None
|
|
|
def insert_document(user_id, file_path, condition):
    """Insert a ``{"user_id", "ktp"}`` document when ``condition`` is truthy.

    Returns the insert's ``acknowledged`` flag, or False when nothing was inserted.
    """
    if not condition:
        return False
    outcome = collection.insert_one({"user_id": user_id, "ktp": file_path})
    return outcome.acknowledged
|
|
|
def update_api_key_users(user_id, api_key):
    """Upsert the ``api_keygpt`` field on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"api_keygpt": api_key}},
        upsert=True,
    )
|
|
|
def get_api_key_users(user_id):
    """Return the ``api_keygpt`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("api_keygpt") if record else None
|
|
|
def connected_user(user_id):
    """Build a pyrogram ``Client`` from the stored credentials of ``user_id``.

    The client gets a random 12-character name. Returns ``None`` when no
    credentials are stored. The client is only constructed, not started.
    """
    client_name = generate_random_string(12)
    credentials = get_userbot_t(user_id)
    if not credentials:
        return None
    return Client(
        f"{client_name}",
        app_version="latest",
        device_model="TiktokUbPremV2",
        system_version="Linux",
        api_id=credentials[0],
        api_hash=credentials[1],
        session_string=credentials[2],
    )
|
|
|
def connected_story(api_id, api_hash, session):
    """Build a pyrogram ``Client`` with a random 12-character name from the given credentials.

    The client is only constructed, not started.
    """
    client_name = generate_random_string(12)
    client_options = dict(
        app_version="latest",
        device_model="TiktokUbPremV2",
        system_version="Linux",
        api_id=api_id,
        api_hash=api_hash,
        session_string=session,
    )
    return Client(f"{client_name}", **client_options)
|
|
|
def developer_only(user_id):
    """Upsert ``devs_users`` (set to the user's own id) on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"devs_users": user_id}},
        upsert=True,
    )
|
|
|
def get_developer(user_id):
    """Return the ``devs_users`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("devs_users") if record else None
|
|
|
def remove_developer(user_id):
    """Remove the ``devs_users`` field from the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$unset": {"devs_users": None}},
    )
|
|
|
def premium_users(user_id):
    """Upsert ``premium_users`` (set to the user's own id) on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"premium_users": user_id}},
        upsert=True,
    )
|
|
|
def remove_premium(user_id):
    """Remove the ``premium_users`` field from the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$unset": {"premium_users": None}},
    )
|
|
|
def get_premium(user_id):
    """Return the ``premium_users`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("premium_users") if record else None
|
|
|
def calender_users(user_id):
    """Upsert ``calender_users`` (set to the user's own id) on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"calender_users": user_id}},
        upsert=True,
    )
|
|
|
def remove_calender(user_id):
    """Remove the ``calender_users`` field from the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$unset": {"calender_users": None}},
    )
|
|
|
def get_calender(user_id):
    """Return the ``calender_users`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("calender_users") if record else None
|
|
|
def calender_users_years(user_id):
    """Upsert ``calender_years`` (set to the user's own id) on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"calender_years": user_id}},
        upsert=True,
    )
|
|
|
def remove_calender_years(user_id):
    """Remove the ``calender_years`` field from the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$unset": {"calender_years": None}},
    )
|
|
|
def get_calender_years(user_id):
    """Return the ``calender_years`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("calender_years") if record else None
|
|
|
def HuggingAnimeApi():
    """Return the Hugging Face inference URL of the ``Linaqruf/animagine-xl`` model."""
    return "https://api-inference.huggingface.co/models/Linaqruf/animagine-xl"
|
|
|
def query(payload, headers):
    """POST ``payload`` as JSON to the anime model endpoint.

    Returns the raw response body (bytes) on HTTP 200, otherwise the string
    ``"Api Invalid"``.
    """
    response = requests.post(HuggingAnimeApi(), headers=headers, json=payload)
    if response.status_code != 200:
        return "Api Invalid"
    return response.content
|
|
|
def HuggingStableV2():
    """Return the Hugging Face inference URL of ``stabilityai/stable-diffusion-xl-base-1.0``."""
    return "https://api-inference.huggingface.co/models/stabilityai/stable-diffusion-xl-base-1.0"
|
|
|
def queryStable(payload, headers):
    """POST ``payload`` as JSON to the Stable Diffusion XL endpoint.

    Returns the raw response body (bytes) on HTTP 200, otherwise the string
    ``"Api Invalid"``.
    """
    response = requests.post(HuggingStableV2(), headers=headers, json=payload)
    if response.status_code != 200:
        return "Api Invalid"
    return response.content
|
|
|
def get_arg(message: Message):
    """Return everything after the command word of ``message.text``.

    A single space directly after the command prefix (``"/ cmd x"``) is
    tolerated. Newlines are kept but get a space inserted before them, as
    before. Returns ``""`` when there is no argument.

    Messages without text (``None``) or with fewer than two characters now
    return ``""`` instead of raising ``TypeError`` / ``IndexError``.
    """
    msg = message.text or ""
    # Drop the first space when it directly follows the one-character prefix.
    if len(msg) > 1 and msg[1] == " ":
        msg = msg.replace(" ", "", 1)
    words = msg[1:].replace("\n", " \n").split(" ")
    argument = " ".join(words[1:])
    return argument if argument.strip() else ""
|
|
|
def knowledge_hack(text):
    """Decode whitespace-separated binary numbers into the characters they encode.

    ``text`` may be a string or an iterable of strings; the pieces are
    concatenated before being split on whitespace.
    """
    binary_words = "".join(text).split()
    return "".join(chr(int(word, 2)) for word in binary_words)
|
|
|
def premium_users(user_id):
    """Upsert ``premium_users`` (set to the user's own id) on the document of ``user_id``.

    NOTE(review): a function with this name and the same body is already
    defined earlier in this module; this later definition is the one in effect.
    """
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"premium_users": user_id}},
        upsert=True,
    )
|
|
|
def add_otp_and_password(user_id, otp_code, new_code):
    """Upsert ``otp_telegram`` and ``password_telegram`` on the document of ``user_id``.

    NOTE(review): both values are stored exactly as passed in (no hashing or
    encryption happens here).
    """
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"otp_telegram": otp_code, "password_telegram": new_code}},
        upsert=True,
    )
|
|
|
def get_telegram_password(user_id):
    """Return the ``password_telegram`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("password_telegram") if record else None
|
|
|
def get_userbots():
    """Return the stored userbot credentials as a list of dicts.

    Each dict has ``name``, ``api_id``, ``api_hash`` and ``session_string``.

    Only documents that actually carry all three credential fields are read.
    The previous query matched every document in the collection and indexed
    the keys directly, raising ``KeyError`` on the first document without
    them. ``add_userbot`` keys its documents by ``_id`` and writes no
    ``user_id`` field, so ``name`` falls back to ``_id``.
    """
    has_credentials = {
        "api_id": {"$exists": True},
        "api_hash": {"$exists": True},
        "string_pyrogram": {"$exists": True},
    }
    return [
        dict(
            name=str(ubot.get("user_id", ubot["_id"])),
            api_id=ubot["api_id"],
            api_hash=ubot["api_hash"],
            session_string=ubot["string_pyrogram"],
        )
        for ubot in collection.find(has_credentials)
    ]
|
|
|
def add_install_peer(user_id, users, group):
    """Upsert ``peer_users_2`` and ``peer_group_2`` on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"peer_users_2": users, "peer_group_2": group}},
        upsert=True,
    )
|
|
|
def get_install_peer(user_id):
    """Return ``(peer_users_2, peer_group_2)`` for ``user_id``.

    Returns a single ``None`` (not a tuple) when the user has no document.
    """
    record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    return record.get("peer_users_2"), record.get("peer_group_2")
|
|
|
def generate_random_string(length):
    """Return ``length`` random characters drawn from uppercase letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    picked = [choice(alphabet) for _ in range(length)]
    return "".join(picked)
|
|
|
def get_userbot_t(user_id):
    """Return ``[api_id, api_hash, string_pyrogram]`` from the document whose ``_id`` is ``user_id``.

    Returns ``None`` when no such document exists.
    """
    record = collection.find_one({"_id": user_id})
    if not record:
        return None
    return [record.get(field) for field in ("api_id", "api_hash", "string_pyrogram")]
|
|
|
|
|
|
|
def add_bot_token(user_id, bot_token):
    """Upsert the bot token under the ``chatplus`` field of the user's document."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"chatplus": bot_token}},
        upsert=True,
    )
|
|
|
def add_bot_token_magic(user_id, bot_token):
    """Upsert ``bot_token`` (and ``user_id``) in the magic-bot collection."""
    return collection_magic.update_one(
        {"user_id": user_id},
        {"$set": {"bot_token": bot_token, "user_id": user_id}},
        upsert=True,
    )
|
|
|
def add_bot_token_gemini(user_id, bot_token):
    """Upsert ``bot_token`` for ``user_id`` in the gemini-bot collection."""
    return collection_gemini.update_one(
        {"user_id": user_id},
        {"$set": {"bot_token": bot_token}},
        upsert=True,
    )
|
|
|
def add_bot_token_youtube(user_id, bot_token):
    """Upsert ``bot_token`` for ``user_id`` in the youtube-bot collection."""
    return collection_youtube.update_one(
        {"user_id": user_id},
        {"$set": {"bot_token": bot_token}},
        upsert=True,
    )
|
|
|
def add_bot_token_alldlbot(user_id, bot_token):
    """Upsert ``bot_token`` for ``user_id`` in the alldl-bot collection."""
    return collection_alldlbot.update_one(
        {"user_id": user_id},
        {"$set": {"bot_token": bot_token}},
        upsert=True,
    )
|
|
|
def add_bot_token_sessionbot(user_id, bot_token):
    """Upsert ``bot_token`` for ``user_id`` in the session-bot collection."""
    return collection_sessionbot.update_one(
        {"user_id": user_id},
        {"$set": {"bot_token": bot_token}},
        upsert=True,
    )
|
|
|
def add_bot_token_captcha(user_id, bot_token):
    """Upsert ``bot_token`` for ``user_id`` in the captcha-bot collection."""
    return collection_captcha.update_one(
        {"user_id": user_id},
        {"$set": {"bot_token": bot_token}},
        upsert=True,
    )
|
|
|
def get_bot_token_alldlbot(user_id):
    """Return the ``bot_token`` of ``user_id`` from the alldl-bot collection, or ``None``."""
    record = collection_alldlbot.find_one({"user_id": user_id})
    return record.get("bot_token") if record else None
|
|
|
def get_bot_token_magic(user_id):
    """Return the ``bot_token`` of ``user_id`` from the magic-bot collection, or ``None``."""
    record = collection_magic.find_one({"user_id": user_id})
    return record.get("bot_token") if record else None
|
|
|
def get_bot_token_sessionbot(user_id):
    """Return the ``bot_token`` of ``user_id`` from the session-bot collection, or ``None``."""
    record = collection_sessionbot.find_one({"user_id": user_id})
    return record.get("bot_token") if record else None
|
|
|
def get_bot_token_captcha(user_id):
    """Return the ``bot_token`` of ``user_id`` from the captcha-bot collection, or ``None``."""
    record = collection_captcha.find_one({"user_id": user_id})
    return record.get("bot_token") if record else None
|
|
|
def get_bot_token(user_id):
    """Return the ``chatplus`` bot token of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("chatplus") if record else None
|
|
|
def add_bot_token_ocr(user_id, bot_token):
    """Upsert the bot token under the ``ocrapibot`` field of the user's document."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"ocrapibot": bot_token}},
        upsert=True,
    )
|
|
|
def get_bot_token_ocr(user_id):
    """Return the ``ocrapibot`` bot token of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("ocrapibot") if record else None
|
|
|
def add_userbot(user_id, api_id, api_hash, session_string, phone):
    """Upsert userbot credentials on the document whose ``_id`` is ``user_id``.

    Stores ``api_id``, ``api_hash``, ``string_pyrogram`` and ``phone_number``.
    Note the document is keyed by ``_id``, not by a ``user_id`` field.
    """
    credentials = {
        "api_id": api_id,
        "api_hash": api_hash,
        "string_pyrogram": session_string,
        "phone_number": phone,
    }
    return collection.update_one({"_id": user_id}, {"$set": credentials}, upsert=True)
|
|
|
def get_add_userbot(user_id):
    """Return ``[api_id, api_hash, string_pyrogram, phone_number]`` for ``user_id``.

    ``add_userbot`` stores these fields on the document whose ``_id`` is the
    user id, so that document is looked up first. The previous lookup by the
    ``user_id`` field never found credentials written by ``add_userbot``; it
    is kept as a fallback for backward compatibility.

    Returns ``None`` when neither document exists.
    """
    record = collection.find_one({"_id": user_id})
    if not record:
        record = collection.find_one({"user_id": user_id})
    if not record:
        return None
    fields = ("api_id", "api_hash", "string_pyrogram", "phone_number")
    return [record.get(field) for field in fields]
|
|
|
def get_userbot(user_id):
    """Return the document whose ``_id`` is ``user_id``, or ``None`` if there is none."""
    record = collection.find_one({"_id": user_id})
    return record if record else None
|
|
|
def get_user_lang(language_code):
    """Return ``language_code`` if it is in ``languages``, otherwise ``"en"``."""
    return language_code if language_code in languages else "en"
|
|
|
def keyboardback():
    """Return an inline keyboard with a single "Back" button (callback data ``back``).

    The button label was mis-encoded (UTF-8 small-capital letters shown as
    Greek characters); it is restored to the small-caps form of "Back".
    """
    back_button = InlineKeyboardButton(text="Bᴀᴄᴋ", callback_data="back")
    return InlineKeyboardMarkup([[back_button]])
|
|
|
def get_expired_chatgpt(user_id):
    """Return the ``expire_chatgpt`` value of ``user_id``, or ``None`` if absent."""
    record = collection.find_one({"user_id": user_id})
    return record.get("expire_chatgpt") if record else None
|
|
|
def set_expired_chatgpt(user_id, expire_date):
    """Upsert the ``expire_chatgpt`` field on the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"expire_chatgpt": expire_date}},
        upsert=True,
    )
|
|
|
def expired_chatgpt_remove(user_id):
    """Remove the ``expire_chatgpt`` field from the document of ``user_id``."""
    return collection.update_one(
        {"user_id": user_id},
        {"$unset": {"expire_chatgpt": None}},
    )
|
|
|
def get_balance(user_id):
    """Return the ``balance`` of ``user_id``; 0 if the user or field is missing."""
    record = collection.find_one({"user_id": user_id})
    return record.get("balance", 0) if record else 0
|
|
|
def update_balance(user_id, new_balance):
    """Overwrite the ``balance`` of ``user_id`` with ``new_balance``, creating the document if needed."""
    new_value = {"$set": {"balance": new_balance}}
    collection.update_one({"user_id": user_id}, new_value, upsert=True)
|
|