akn-dev / akn /utils /gmail_verifed.py
randydev's picture
fix revert back and update
21bc372
import random
import string
from config import *
from akn.utils.database import *
from datetime import datetime, timedelta
# Body template for the OTP email. The `{otp}` placeholder is filled in with
# str.format(otp=...) by email_send_otp before the message is sent.
TEST_VIA_EMAIL_TEXT = """
Your login code: <spoiler>{otp}</spoiler> Never give this code to anyone,
even if they say it is from **AkenoAi**
"""
def generate_otp(length: int = 6) -> str:
    """Return a random numeric one-time password of ``length`` digits.

    The digits are drawn from the operating system's CSPRNG via ``secrets``
    rather than the ``random`` module, whose Mersenne Twister output is
    predictable and unsuitable for authentication codes.

    Args:
        length: Number of digits to generate. Zero or a negative value
            yields an empty string.

    Returns:
        A string consisting only of the characters ``0``-``9``.
    """
    # Imported locally so the function does not rely on a module-level import.
    import secrets

    return ''.join(secrets.choice(string.digits) for _ in range(length))
async def store_otp(email: str, otp: str):
    """Save ``otp`` for ``email`` with an expiry five minutes from now.

    The record is written with ``update_one(..., upsert=True)`` keyed on the
    email, so an existing record for that address is overwritten rather than
    duplicated. The expiry is a naive UTC timestamp from
    ``datetime.utcnow()``; ``verify_otp_in_db`` compares against the same
    kind of value.

    Args:
        email: Address the code was issued for; used as the lookup key.
        otp: The one-time password to store.
    """
    lifetime = timedelta(minutes=5)
    record = dict(
        email=email,
        otp=otp,
        expires_at=datetime.utcnow() + lifetime,
    )
    await otp_collection.update_one({"email": email}, {"$set": record}, upsert=True)
async def verify_otp_in_db(email: str, otp: str):
    """Check ``otp`` against the stored record for ``email``.

    Args:
        email: Address whose stored record is looked up.
        otp: Code supplied by the caller.

    Returns:
        ``True`` when a record exists, its stored code equals ``otp`` and its
        ``expires_at`` is still later than the current naive UTC time;
        ``False`` otherwise.

    Note:
        The record is only read here. It is not removed or altered after a
        successful match.
    """
    stored = await otp_collection.find_one({"email": email})
    # No record, or the code does not match: reject without checking expiry.
    if not stored or stored["otp"] != otp:
        return False
    return stored["expires_at"] > datetime.utcnow()
async def email_send_otp(email: str):
    """Generate, store and email a one-time password to ``email``.

    Args:
        email: Recipient address.

    Returns:
        A dict with a single ``"message"`` key. The value reports that the
        address equals ``ME_GMAIL`` (nothing is stored or sent), that the
        OTP was sent, or the text of the exception raised while sending.
    """
    code = generate_otp()

    # The address matching ME_GMAIL is rejected before anything is stored.
    if email == str(ME_GMAIL):
        return {"message": "can't be blacklisted"}

    # The code is stored before the send attempt, so it stays in the
    # database even if sending fails below.
    await store_otp(email, code)

    try:
        body = TEST_VIA_EMAIL_TEXT.format(otp=code)
        await send_check_otp_email(receiver_email=email, text=body)
    except Exception as e:
        return {"message": f"Error: {e}"}
    else:
        return {"message": "OTP sent to check your email"}
async def email_verify_otp(email: str, otp: str):
    """Verify ``otp`` for ``email`` and report the outcome as a message dict.

    Args:
        email: Address the code was issued for.
        otp: Code supplied by the caller.

    Returns:
        ``{"message": "OTP verified successfully!"}`` when
        ``verify_otp_in_db`` returns a truthy result, otherwise
        ``{"message": "Invalid or expired OTP"}``.
    """
    verified = await verify_otp_in_db(email, otp)
    outcome = "OTP verified successfully!" if verified else "Invalid or expired OTP"
    return {"message": outcome}