|
import random |
|
import string |
|
from config import * |
|
from akn.utils.database import * |
|
from datetime import datetime, timedelta |
|
|
|
TEST_VIA_EMAIL_TEXT = """ |
|
Your login code: <spoiler>{otp}</spoiler> Never give this code to anyone, |
|
even if they say it is from **AkenoAi** |
|
""" |
|
|
|
def generate_otp(length=6): |
|
return ''.join(random.choices(string.digits, k=length)) |
|
|
|
async def store_otp(email: str, otp: str): |
|
expiration_time = datetime.utcnow() + timedelta(minutes=5) |
|
otp_data = { |
|
"email": email, |
|
"otp": otp, |
|
"expires_at": expiration_time |
|
} |
|
await otp_collection.update_one( |
|
{"email": email}, |
|
{"$set": otp_data}, |
|
upsert=True |
|
) |
|
|
|
async def verify_otp_in_db(email: str, otp: str): |
|
otp_record = await otp_collection.find_one({"email": email}) |
|
if otp_record and otp_record["otp"] == otp: |
|
if otp_record["expires_at"] > datetime.utcnow(): |
|
return True |
|
return False |
|
|
|
async def email_send_otp(email: str): |
|
otp = generate_otp() |
|
if email == str(ME_GMAIL): |
|
return {"message": f"can't be blacklisted"} |
|
await store_otp(email, otp) |
|
try: |
|
await send_check_otp_email( |
|
receiver_email=email, |
|
text=TEST_VIA_EMAIL_TEXT.format( |
|
otp=otp |
|
) |
|
) |
|
return {"message": f"OTP sent to check your email"} |
|
except Exception as e: |
|
return {"message": f"Error: {e}"} |
|
|
|
async def email_verify_otp(email: str, otp: str): |
|
is_valid = await verify_otp_in_db(email, otp) |
|
if is_valid: |
|
return {"message": "OTP verified successfully!"} |
|
else: |
|
return {"message": "Invalid or expired OTP"} |
|
|