import asyncio
import io
import json
import logging
import math
import os
import random
import re
import string
import tempfile
import time

import requests
from pyrogram import Client, filters
from pyrogram.types import *
from pyrogram.enums import *
from pyrogram.errors import *
from PIL import Image, ImageDraw, ImageFont, ImageFilter


# Root logger at INFO; this module logs through its own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pending CAPTCHA challenges, keyed by Telegram user id.
captcha_texts = {}
# CAPTCHA mode ('math' or 'text') selected per user id via /settingmode.
captcha_modes = {}

# User ids that /settingmode ignores and that get banned after verifying.
NOT_ALLOWED_NON_PROGRAMMER = [5575183435, 948247711]


def _load_captcha_font(size):
    """Return Arial at ``size`` points, or Pillow's built-in font if it cannot be loaded."""
    try:
        return ImageFont.truetype("arial.ttf", size)
    except OSError:
        return ImageFont.load_default()


def _text_extent(draw, text, font):
    """Return the (width, height) of the box needed to draw ``text`` from the origin."""
    if hasattr(draw, "textbbox"):
        # The right/bottom edges (not right-left / bottom-top) are used because
        # the text is later drawn at (0, 0) on a layer of exactly this size.
        _, _, right, bottom = draw.textbbox((0, 0), text, font=font)
        return max(1, math.ceil(right)), max(1, math.ceil(bottom))
    # Older Pillow without textbbox(); textsize() itself was removed in Pillow 10.
    width, height = draw.textsize(text, font=font)
    return max(1, int(width)), max(1, int(height))


def _render_captcha_image(
    captcha_text,
    choices,
    *,
    size,
    noise_points,
    font_size,
    text_y_offset,
    file_prefix,
):
    """Draw a noisy CAPTCHA image and save it as a uniquely named PNG.

    Args:
        captcha_text: Challenge text drawn rotated in the middle of the image.
        choices: Answer options drawn as a numbered row near the bottom.
        size: ``(width, height)`` of the canvas in pixels.
        noise_points: Number of random speckles drawn on the background.
        font_size: Point size used for the challenge text.
        text_y_offset: Pixels added to the vertically centred text position.
        file_prefix: File-name prefix of the saved image.

    Returns:
        Path of the PNG file that was written.
    """
    width, height = size
    background = tuple(random.randint(200, 255) for _ in range(3))
    img = Image.new('RGB', (width, height), color=background)
    draw = ImageDraw.Draw(img)

    # Speckle noise; randrange() keeps every point inside the canvas.
    for _ in range(noise_points):
        position = (random.randrange(width), random.randrange(height))
        noise_color = tuple(random.randint(150, 200) for _ in range(3))
        draw.point(position, fill=noise_color)

    # The challenge goes on its own transparent layer so it can be rotated
    # before being pasted (using itself as the mask) onto the background.
    font = _load_captcha_font(font_size)
    text_width, text_height = _text_extent(draw, captcha_text, font)
    text_layer = Image.new('RGBA', (text_width, text_height), (255, 255, 255, 0))
    ImageDraw.Draw(text_layer).text((0, 0), captcha_text, font=font, fill=(0, 0, 0))
    rotated_text = text_layer.rotate(random.randint(-25, 25), expand=1)
    text_x = (width - text_width) / 2
    text_y = (height - text_height) / 2 + text_y_offset
    img.paste(rotated_text, (int(text_x), int(text_y)), rotated_text)

    # Numbered answer options along the bottom edge.
    choice_font = _load_captcha_font(30)
    for idx, choice in enumerate(choices):
        choice_text = f"{idx + 1}. {choice}"
        _, choice_height = _text_extent(draw, choice_text, choice_font)
        choice_position = (50 + idx * 80, height - choice_height - 20)
        draw.text(choice_position, choice_text, font=choice_font, fill=(0, 0, 0))

    # Random dark lines across the whole image.
    for _ in range(5):
        start = (random.randint(0, width), random.randint(0, height))
        end = (random.randint(0, width), random.randint(0, height))
        line_color = tuple(random.randint(0, 150) for _ in range(3))
        draw.line([start, end], fill=line_color, width=2)

    img = img.filter(ImageFilter.BLUR)

    # A unique temp file: the challenge text may contain characters that are
    # invalid in file names ('*', '?') and two users can get the same text.
    fd, img_path = tempfile.mkstemp(prefix=file_prefix, suffix=".png")
    os.close(fd)
    try:
        img.save(img_path)
    except Exception:
        os.remove(img_path)
        raise
    return img_path


def generate_math_captcha():
    """Create an arithmetic CAPTCHA and render it to an image file.

    Two integers in 1..20 are combined with ``+``, ``-`` or ``*``.

    Returns:
        ``(captcha_text, img_path, choices, correct_answer)`` where
        ``captcha_text`` looks like ``"3 + 4 = ?"``, ``img_path`` is the saved
        PNG (the caller deletes it), ``choices`` is a shuffled list of three
        distinct answer strings and ``correct_answer`` is the integer result.
    """
    operation = random.choice(['+', '-', '*'])
    num1 = random.randint(1, 20)
    num2 = random.randint(1, 20)

    if operation == '+':
        correct_answer = num1 + num2
    elif operation == '-':
        correct_answer = num1 - num2
    else:
        correct_answer = num1 * num2
    captcha_text = f"{num1} {operation} {num2} = ?"

    # Two distinct decoys close to the real answer.
    choices = [str(correct_answer)]
    while len(choices) < 3:
        wrong_answer = str(correct_answer + random.choice([-5, -3, -2, 2, 3, 5]))
        if wrong_answer not in choices:
            choices.append(wrong_answer)
    random.shuffle(choices)

    img_path = _render_captcha_image(
        captcha_text,
        choices,
        size=(250, 100),
        noise_points=1000,
        font_size=40,
        text_y_offset=-20,
        file_prefix="captcha_math_",
    )
    return captcha_text, img_path, choices, correct_answer


def generate_text_captcha():
    """Create a 5-character text CAPTCHA and render it to an image file.

    Characters are drawn from uppercase ASCII letters and digits.

    Returns:
        ``(captcha_text, img_path, choices, captcha_text)``: the challenge,
        the saved PNG path (the caller deletes it), a shuffled list of three
        distinct candidate strings, and the correct answer (the challenge
        itself).
    """
    letters = string.ascii_uppercase + string.digits

    def random_code():
        return ''.join(random.choice(letters) for _ in range(5))

    captcha_text = random_code()
    choices = [captcha_text]
    while len(choices) < 3:
        wrong_choice = random_code()
        if wrong_choice not in choices:
            choices.append(wrong_choice)
    random.shuffle(choices)

    img_path = _render_captcha_image(
        captcha_text,
        choices,
        size=(200, 80),
        noise_points=500,
        font_size=45,
        text_y_offset=0,
        file_prefix="captcha_text_",
    )
    return captcha_text, img_path, choices, captcha_text


def generate_captcha(user_id, mode='math'):
    """Build a CAPTCHA of the requested kind.

    ``user_id`` is accepted but not used. ``mode == 'math'`` selects the
    arithmetic generator; any other value selects the text generator.
    Returns whatever the selected generator returns.
    """
    generator = generate_math_captcha if mode == 'math' else generate_text_captcha
    return generator()


def thanks_hacker_by_randydev():
    """Download the image shown after a successful verification.

    Returns:
        The relative path ``"hacker.png"`` the image was written to.

    Raises:
        requests.RequestException: On a network failure, a timeout or an
            HTTP error status.
    """
    url = "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcR0u8UqJ2JDhfmPOCb_zAHjUQG2NYMjTwLbkq_sQhCQOxX8hn66YbaGFvLL&s=10"
    image_hacker = "hacker.png"

    # Without a timeout, requests waits indefinitely on a stalled server.
    response = requests.get(url, timeout=10)
    # Do not save an HTTP error page as if it were the image.
    response.raise_for_status()

    with open(image_hacker, "wb") as f:
        f.write(response.content)
    return image_hacker


def failed_hacker_by_randydev():
    """Download the image shown after a failed verification.

    Returns:
        The relative path ``"failed_hacker.png"`` the image was written to.

    Raises:
        requests.RequestException: On a network failure, a timeout or an
            HTTP error status.
    """
    url = "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTM0vb4s59H9F1-_7FyELiLU04e8bCHy7o6KQV2mG3DFRLnzP547KjckKG2&s=10"
    image_hacker = "failed_hacker.png"

    # Without a timeout, requests waits indefinitely on a stalled server.
    response = requests.get(url, timeout=10)
    # Do not save an HTTP error page as if it were the image.
    response.raise_for_status()

    with open(image_hacker, "wb") as f:
        f.write(response.content)
    return image_hacker


async def remove_captcha_after_timeout(client, user_id, delay=300):
    """Expire a user's pending CAPTCHA after ``delay`` seconds.

    If the user still has an entry in ``captcha_texts`` once the delay has
    passed, the entry is removed, the join request is declined and the user
    is notified. Does nothing if the entry is already gone.

    Args:
        client: Pyrogram client used for the Telegram calls.
        user_id: Telegram id of the user whose challenge should expire.
        delay: Seconds to wait before expiring the challenge.
    """
    await asyncio.sleep(delay)

    # Pop first so the entry is discarded even if a Telegram call below fails.
    captcha_data = captcha_texts.pop(user_id, None)
    if captcha_data is None:
        return

    try:
        await client.decline_chat_join_request(
            chat_id=captcha_data['chat_id'],
            user_id=user_id
        )
        await client.send_message(user_id, "β° Your CAPTCHA verification has expired. Please try again.")
    except Exception:
        # This runs as a background task: nothing awaits it, so an escaping
        # exception would only surface as an unretrieved-task warning.
        logger.exception(f"Failed to expire CAPTCHA for user {user_id}.")

    logger.info(f"CAPTCHA for user {user_id} has expired and been removed.")


@Client.on_message(filters.command("settingmode") & filters.private)
async def setting_mode(client: Client, message: Message):
    """Answer /settingmode in a private chat with the CAPTCHA mode picker.

    Users listed in ``NOT_ALLOWED_NON_PROGRAMMER`` get no reply.
    """
    user_id = message.from_user.id
    if user_id in NOT_ALLOWED_NON_PROGRAMMER:
        return

    # One button per row: the two modes, then cancel.
    mode_rows = [
        [InlineKeyboardButton("π’ Mathematics", callback_data="mode_math")],
        [InlineKeyboardButton("π€ Text", callback_data="mode_text")],
        [InlineKeyboardButton("β Cancel", callback_data="cancel_mode")],
    ]
    await message.reply_text(
        "π Select the CAPTCHA mode you want:",
        reply_markup=InlineKeyboardMarkup(mode_rows),
    )


@Client.on_callback_query(filters.regex("^mode_"))
async def mode_callback(client: Client, cb: CallbackQuery):
    """Store the CAPTCHA mode chosen from the /settingmode keyboard.

    Callback data has the form ``mode_<name>`` where ``<name>`` is ``math``
    or ``text``. The choice is saved in ``captcha_modes`` under the id of the
    user who pressed the button, and the message is updated to confirm it.
    """
    user_id = cb.from_user.id

    data = cb.data.split("_")
    if len(data) != 2:
        await cb.answer("β Invalid selection.", show_alert=True)
        return

    mode = data[1]
    if mode not in ['math', 'text']:
        await cb.answer("β Invalid mode.", show_alert=True)
        return

    captcha_modes[user_id] = mode

    # NOTE(review): these labels look mis-encoded (mojibake); they are kept
    # as found because the intended emoji cannot be confirmed from here.
    keyboard = InlineKeyboardMarkup(
        [
            [InlineKeyboardButton("π’ Mathematics", callback_data="mode_math")],
            [InlineKeyboardButton("π€ Text", callback_data="mode_text")],
            [InlineKeyboardButton("β Cancel", callback_data="cancel_mode")]
        ]
    )
    mode_label = 'Mathematics' if mode == 'math' else 'Text'
    try:
        await cb.edit_message_text(
            f"✅ CAPTCHA mode has been set to **{mode_label}**.",
            reply_markup=keyboard
        )
    except MessageNotModified:
        # Pressing the already-active mode yields identical text and
        # keyboard, which Telegram rejects; the stored mode is still right.
        pass

    await cb.answer("The mode has been changed.", show_alert=False)
    logger.info(f"User {user_id} set CAPTCHA mode to {mode}.")


@Client.on_callback_query(filters.regex("^cancel_mode$"))
async def cancel_mode_callback(client: Client, cb: CallbackQuery):
    """Replace the mode picker with a cancellation notice and acknowledge the tap."""
    notice = "β CAPTCHA mode setting has been canceled."
    await cb.edit_message_text(notice)
    await cb.answer("Cancellation successful.", show_alert=False)
    logger.info(f"User {cb.from_user.id} canceled CAPTCHA mode setting.")


# Strong references to running expiry tasks. The event loop only keeps weak
# references, so an otherwise unreferenced task may be garbage-collected
# before it finishes.
_captcha_timeout_tasks = set()


@Client.on_chat_join_request(filters.chat("KillerXSupport"))
async def join_request(client: Client, event: ChatJoinRequest):
    """Challenge a user who asked to join the chat with a CAPTCHA.

    The bot must be an administrator of the chat. For supergroups it posts a
    notice in the chat, sends the CAPTCHA image with answer buttons to the
    user privately, records the pending challenge in ``captcha_texts`` and
    schedules its expiry. Other chat types are ignored.
    """
    chat_id = event.chat.id
    user = event.from_user

    member = await client.get_chat_member(chat_id, "me")
    if member.status != ChatMemberStatus.ADMINISTRATOR:
        return await client.send_message(chat_id, text="I am not an administrator in this group.")

    try:
        chat_link = await client.export_chat_invite_link(chat_id)
    except ChatAdminRequired:
        await client.send_message(chat_id, text="I need to be an administrator to perform this action.")
        return

    # Decide before generating anything, so that no image file or pending
    # entry is left behind for chat types that are never served.
    if event.chat.type != ChatType.SUPERGROUP:
        return

    mode = captcha_modes.get(user.id, "text")
    captcha_text, img_path, choices, correct_answer = generate_captcha(user.id, mode)
    captcha_texts[user.id] = {
        "captcha_text": captcha_text,
        "correct_answer": correct_answer,
        "chat_id": chat_id,
        "chat_link": chat_link,
        "first_name": user.first_name
    }

    # One answer per row, then a refresh/cancel row.
    buttons = [
        [InlineKeyboardButton(choice, callback_data=f"verify_{user.id}_{choice}")]
        for choice in choices
    ]
    buttons.append([
        InlineKeyboardButton("π Refresh CAPTCHA", callback_data="refresh_captcha"),
        InlineKeyboardButton("β Cancel", callback_data="cancel_captcha")
    ])
    keyboard = InlineKeyboardMarkup(buttons)

    try:
        await client.send_message(
            chat_id,
            text=f" π¦ Verify that you {user.first_name} are human!",
            reply_markup=create_button_userinfo(user.id, client.me.username)
        )
        await client.send_photo(
            user.id,
            photo=img_path,
            caption=f"βοΈ **Verify that you are human!**\n\nβ Please select the correct CAPTCHA text shown in the image below.",
            reply_markup=keyboard
        )
    except Exception as e:
        # The challenge never reached the user: drop the entry, since no
        # expiry task will be scheduled to clean it up.
        captcha_texts.pop(user.id, None)
        logger.error(str(e))
        await client.send_message(chat_id, text=str(e))
    else:
        task = asyncio.create_task(remove_captcha_after_timeout(client, user.id))
        _captcha_timeout_tasks.add(task)
        task.add_done_callback(_captcha_timeout_tasks.discard)
    finally:
        # The image is only needed for the upload; remove it on every path.
        if os.path.exists(img_path):
            os.remove(img_path)


@Client.on_callback_query(filters.regex("^verify_"))
async def verify_captcha_callback(client: Client, cb: CallbackQuery):
    """Check the answer a user picked for their pending CAPTCHA.

    Callback data has the form ``verify_<user_id>_<choice>``. Only the user
    the challenge belongs to may answer. A correct answer approves the join
    request (users in ``NOT_ALLOWED_NON_PROGRAMMER`` are banned instead); a
    wrong answer declines it. In both cases the pending entry is removed.
    """
    data = cb.data.split("_")
    if len(data) != 3:
        await cb.answer("β Format data tidak valid.", show_alert=True)
        return

    _, user_id_str, user_choice = data
    try:
        user_id = int(user_id_str)
    except ValueError:
        await cb.answer("β Invalid user ID.", show_alert=True)
        return

    if cb.from_user.id != user_id:
        await cb.answer("β You have no right to do this.", show_alert=True)
        logger.warning(f"User {cb.from_user.id} mencoba memverifikasi CAPTCHA milik user {user_id}.")
        return

    captcha_data = captcha_texts.get(user_id)
    if captcha_data is None:
        await cb.answer("βοΈ No active CAPTCHA verification.", show_alert=True)
        logger.warning(f"User {user_id} mencoba memverifikasi CAPTCHA tanpa aktif.")
        return

    correct_answer = captcha_data["correct_answer"]
    chat_id = captcha_data["chat_id"]
    chat_link = captcha_data["chat_link"]
    first_name = captcha_data["first_name"]

    try:
        if str(user_choice) == str(correct_answer):
            # Only the image for this outcome is downloaded.
            # NOTE(review): the download is a blocking HTTP call inside an
            # async handler; consider caching the file.
            hacker_image = thanks_hacker_by_randydev()
            await cb.edit_message_media(
                media=InputMediaPhoto(
                    hacker_image,
                    caption="✅ CAPTCHA verification successful!"
                ),
                reply_markup=create_button_join_group(chat_link)
            )
            logger.info(f"User {user_id} berhasil memverifikasi CAPTCHA.")

            if user_id in NOT_ALLOWED_NON_PROGRAMMER:
                await client.ban_chat_member(chat_id=chat_id, user_id=user_id)
                # The challenge is finished for this user as well.
                captcha_texts.pop(user_id, None)
                return

            await client.approve_chat_join_request(
                chat_id=chat_id,
                user_id=user_id
            )
            await client.send_message(chat_id, f"Thank you for joining {first_name}")
        else:
            failed_image = failed_hacker_by_randydev()
            await cb.edit_message_media(
                media=InputMediaPhoto(
                    failed_image,
                    caption="β **CAPTCHA verification failed.**\n\nPlease try again."
                )
            )
            await client.decline_chat_join_request(
                chat_id=chat_id,
                user_id=user_id
            )
            await client.send_message(chat_id, f"Failed to join {first_name}")
            logger.info(f"User {user_id} gagal memverifikasi CAPTCHA.")

        # pop() rather than del: the expiry task may have removed the entry
        # while this handler was awaiting Telegram.
        captcha_texts.pop(user_id, None)
    except Exception as e:
        logger.exception(f"Error while verifying CAPTCHA for user {user_id}.")
        await cb.answer(f"Error CAPTCHA: {e}", show_alert=True)


@Client.on_callback_query(filters.regex("^refresh_captcha$"))
async def refresh_captcha_callback(client: Client, cb: CallbackQuery):
    """Replace the user's pending CAPTCHA with a freshly generated one.

    The chat id, invite link and first name of the existing entry are kept;
    only the challenge text and answer change. The message's image and answer
    buttons are updated to match. Users with no pending challenge just get an
    alert.
    """
    user_id = cb.from_user.id

    # Read the old entry BEFORE replacing it: its chat details are reused.
    previous = captcha_texts.get(user_id)
    if previous is None:
        await cb.answer("No active CAPTCHA verification.", show_alert=True)
        logger.warning(f"User {user_id} attempted to refresh CAPTCHA without active verification.")
        return

    # Same fallback as the join-request handler, so a refresh does not
    # switch the CAPTCHA type for users who never chose a mode.
    mode = captcha_modes.get(user_id, "text")
    captcha_text, img_path, choices, correct_answer = generate_captcha(user_id, mode)
    captcha_texts[user_id] = {
        "captcha_text": captcha_text,
        "correct_answer": correct_answer,
        "chat_id": previous["chat_id"],
        "chat_link": previous["chat_link"],
        "first_name": previous["first_name"]
    }
    logger.info(f"Generated new {mode} CAPTCHA for user {user_id}: {captcha_text}")

    buttons = [
        [InlineKeyboardButton(choice, callback_data=f"verify_{user_id}_{choice}")]
        for choice in choices
    ]
    buttons.append([
        InlineKeyboardButton("π Refresh CAPTCHA", callback_data="refresh_captcha"),
        InlineKeyboardButton("β Cancel", callback_data="cancel_captcha")
    ])
    keyboard = InlineKeyboardMarkup(buttons)

    try:
        await cb.edit_message_media(
            media=InputMediaPhoto(img_path),
            reply_markup=keyboard
        )
    except Exception as e:
        logger.error(f"Error refreshing CAPTCHA for user {user_id}: {e}")
        await cb.answer("β Failed to update CAPTCHA.", show_alert=True)
    else:
        # Report success only when the message really was updated.
        logger.info(f"Updated CAPTCHA image for user {user_id}.")
        await cb.answer("π CAPTCHA has been updated!", show_alert=False)
    finally:
        # The image is only needed for the upload; remove it on every path.
        if os.path.exists(img_path):
            os.remove(img_path)


@Client.on_callback_query(filters.regex("^cancel_captcha$"))
async def cancel_captcha_callback(client: Client, cb: CallbackQuery):
    """Drop the caller's pending CAPTCHA and mark the message as canceled.

    Users with no pending challenge only get an alert.
    """
    user_id = cb.from_user.id

    # Guard clause: nothing to cancel.
    if user_id not in captcha_texts:
        await cb.answer("βοΈ No active CAPTCHA verification found.", show_alert=True)
        logger.warning(f"User {user_id} attempted to cancel CAPTCHA without active verification.")
        return

    captcha_texts.pop(user_id)
    logger.info(f"User {user_id} has canceled CAPTCHA verification.")
    await cb.edit_message_caption(
        caption="β **CAPTCHA verification has been canceled.**\n\nIf you want to try again.",
    )
    await cb.answer("CAPTCHA verification canceled.", show_alert=False)


@Client.on_callback_query(filters.regex("^close$"))
async def close_final(client: Client, cb: CallbackQuery):
    """Delete the message whose "close" button was pressed, then acknowledge the tap."""
    pressed_message = cb.message
    await pressed_message.delete()
    await cb.answer()


def create_button_join_group(chat_link):
    """Return a two-row keyboard: a URL button to ``chat_link`` and a close button."""
    join_row = [InlineKeyboardButton("ποΈ Join chat", url=chat_link)]
    close_row = [InlineKeyboardButton("π Close", callback_data="close")]
    return InlineKeyboardMarkup([join_row, close_row])


def create_button_userinfo(user_id, username):
    """Return a two-row keyboard: a user button for ``user_id`` and a link to ``t.me/<username>``."""
    user_row = [InlineKeyboardButton("π€ Chmod +W $USER", user_id=user_id)]
    bot_row = [InlineKeyboardButton("π Check human Bot", url=f"https://t.me/{username}")]
    return InlineKeyboardMarkup([user_row, bot_row])