Spaces:
Running
Running
File size: 3,469 Bytes
546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a 546720a e4f5d4a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
import os
from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from database import engine, get_db
from models import Base, User
# Issue table creation for the models registered on Base.metadata against
# `engine`. This is a module-level statement, so it runs at import time.
Base.metadata.create_all(bind=engine)
# Signing key for JWTs: taken from the SECRET_KEY environment variable, with a
# literal fallback when the variable is unset.
# NOTE(review): the fallback value is committed in source, so any deployment
# that leaves SECRET_KEY unset signs tokens with a publicly known key --
# confirm the environment variable is always set outside local development.
SECRET_KEY = os.getenv("SECRET_KEY", "def6nQHONW99pOPyba9DShny6FB1CJJBigZault")
# Algorithm name handed to jwt.encode / jwt.decode by the token helpers.
ALGORITHM = "HS256"
# Token lifetimes. Not referenced in this part of the file -- presumably
# consumed by the login/refresh routes; verify against callers.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
# OAuth2 bearer scheme used as a dependency to pull the token from requests.
# The token URL must match the actual login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest
def register_user(username: str, password: str, db: Session):
    """Create and persist a new ``User`` whose password is stored hashed.

    Args:
        username: Requested username; rejected if a row already uses it.
        password: Plain-text password; only its hash is written.
        db: Session used for the lookup, insert, commit and refresh.

    Returns:
        The new ``User`` instance after it has been committed and refreshed.

    Raises:
        HTTPException: status 400 when ``username`` is already present.
    """
    duplicate = db.query(User).filter(User.username == username).first()
    if duplicate:
        raise HTTPException(status_code=400, detail="Username already taken")

    account = User(username=username, password=hash_password(password))
    db.add(account)
    db.commit()
    # Reload the row after commit before handing it back to the caller.
    db.refresh(account)
    return account
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``."""
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def authenticate_user(username: str, password: str, db: Session):
    """Look up ``username`` and validate ``password`` against the stored hash.

    Args:
        username: Username to search for.
        password: Plain-text password supplied by the caller.
        db: Session used for the lookup.

    Returns:
        The matching ``User`` when one exists and the password verifies,
        otherwise ``None``.
    """
    candidate = db.query(User).filter(User.username == username).first()
    # The password is only checked when a user row was actually found.
    if candidate and verify_password(password, candidate.password):
        return candidate
    return None
def create_token(data: dict, expires_delta: timedelta, secret_key: str) -> str:
    """Generate a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            not mutated.
        expires_delta: Token lifetime, measured from the current UTC time.
        secret_key: Key used to sign the token with the module's ALGORITHM.

    Returns:
        The encoded JWT.
    """
    to_encode = data.copy()
    # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated
    # since Python 3.12 and yields a naive datetime that is easy to misread
    # as local time. The resulting "exp" instant is the same.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode["exp"] = expire
    return jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)
def verify_token(token: str, secret_key: str) -> str:
    """Decode ``token`` with ``secret_key`` and return its ``sub`` claim.

    Args:
        token: Encoded JWT to validate.
        secret_key: Key the token is expected to be signed with.

    Returns:
        The value of the ``sub`` claim.

    Raises:
        HTTPException: status 401 with detail "Could not validate credentials"
            when decoding raises ``JWTError``, or with detail "Invalid token"
            when the payload has no ``sub`` claim.
    """
    try:
        claims = jwt.decode(token, secret_key, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    subject = claims.get("sub")
    if subject is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return subject
def verify_access_token(token: str = Depends(oauth2_scheme)) -> str:
    """Validate an access token signed with SECRET_KEY and return its ``sub``.

    Args:
        token: Encoded JWT; by default supplied through the ``oauth2_scheme``
            dependency.

    Returns:
        The value of the ``sub`` claim.

    Raises:
        HTTPException: status 401 with detail "Could not validate credentials"
            when decoding raises ``JWTError``, or with detail
            "Invalid access token" when the payload has no ``sub`` claim.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    subject = claims.get("sub")
    if subject is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid access token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return subject