File size: 5,161 Bytes
13ba451
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import time
from typing import Dict
import jwt
import secrets
import logging
from fastapi import Depends, HTTPException
import base64
from datetime import datetime, timedelta
from repository import UserRepository, UserLoginRepository
import string, random

def check_token_is_valid(token):
    check = UserRepository.getEmailUserByAccessToken(token)
    if check is None:
        return False
    return True

def unique_string(byte: int = 8) -> str:
    return secrets.token_urlsafe(byte)
JWT_SECRET = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
JWT_ALGORITHM = "HS512"
SECRET_KEY= "8deadce9449770680910741063cd0a3fe0acb62a8978661f421bbcbb66dc41f1"

def token_response(token: str):
    return {
        "access_token": token
    }
def str_encode(string: str) -> str:
    return base64.b85encode(string.encode('ascii')).decode('ascii')

def get_token_payload(token: str, secret: str, algo: str):
    try:
        payload = jwt.decode(token, secret, algorithms=algo)
    except Exception as jwt_exec:
        logging.debug(f"JWT Error: {str(jwt_exec)}")
        payload = None
    return payload

from datetime import datetime
def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta):
    expire = datetime.now() + expiry
    payload.update({"exp": expire})
    return jwt.encode(payload, secret, algorithm=algo)

def str_decode(string: str) -> str:
    return base64.b85decode(string.encode('ascii')).decode('ascii')

def generate_random_string(length=12):
    characters = string.ascii_letters + string.digits
    random_string = ''.join(random.choice(characters) for i in range(length))
    return random_string

import pytz
from datetime import datetime
def signJWT(user_email: str) -> Dict[str, str]:
    rt_expires = timedelta(days=3)
    refresh_key = unique_string(100)
    access_key = unique_string(50)
    at_expires = timedelta(minutes=180)
    at_payload = {
        "sub": str_encode(str(user_email)),
        'a': access_key,
    }
    access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
    rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key}
    refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires)
    expires_in = at_expires.seconds
    vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh')
    current_time = datetime.now().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta(seconds=expires_in)
    formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ')
    existing_user = UserRepository.getUserByEmail(user_email)
    if existing_user is None:
        UserRepository.addUser(user_email, access_token, refresh_token, formatted_time)
    else:
        UserRepository.updateUserLogin(user_email, access_token, refresh_token, formatted_time)
    user_record = UserRepository.getUserByEmail(user_email)
    session_id = ""
    if user_record:
        session_id = generate_random_string()
        existing_userlogin = UserLoginRepository.getUserLogin(user_email)
        if existing_userlogin is None:
            UserLoginRepository.addUserLogin(user_email,session_id=session_id)
        else:
            UserLoginRepository.updateUserLogin(user_email, session_id)
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "expires_in": at_expires.seconds,
        "session_id": session_id
     }

def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, str]:
    access_key = unique_string(50)
    at_expires = timedelta(minutes=180)
    at_payload = {
        "sub": str_encode(str(user_email)),
        'a': access_key,
    }
    access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
    user_record = UserRepository.getUserByEmail(user_email)
    session_id = ""
    if user_record:
        email1 = user_record.email
    if email1:
        session_id = generate_random_string()
        existing_userlogin = UserLoginRepository.getUserLogin(user_email)
        if existing_userlogin is None:
            UserLoginRepository.addUserLogin(user_email,session_id=session_id)
        else:
            UserLoginRepository.updateUserLogin(user_email,session_id)
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "expires_in": at_expires.seconds,
        "session_id": session_id
    }

def decodeJWT(token: str) -> dict:
    try:
        decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        return decoded_token if decoded_token["exp"] >= time.time() else None
    except:
        return {}
    
def get_refresh_token(refresh_token, email):
    token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM)
    if not token_payload:
        raise HTTPException(status_code=403, detail="Invalid Request.")
    exp = token_payload.get('exp')
    if exp >= time.time() and token_payload:
       return returnAccessToken(email,refresh_token)
    elif not token_payload:
       return signJWT(email)