Spaces:
Running
Running
import os
import warnings
from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from database import engine, get_db
from models import Base, User
# Emit CREATE statements for the tables registered on Base.metadata.
# NOTE: this runs at import time, so importing this module touches the database.
Base.metadata.create_all(bind=engine)

# Key used to sign and verify JWTs, read from the SECRET_KEY environment variable.
# SECURITY: the fallback below is committed to source, so anyone who can read this
# file can forge valid tokens for a deployment that relies on it. It is kept only
# so existing setups without the variable keep starting; a warning makes the
# insecure configuration visible instead of silent.
_FALLBACK_SECRET_KEY = "def6nQHONW99pOPyba9DShny6FB1CJJBigZault"
SECRET_KEY = os.getenv("SECRET_KEY")
if SECRET_KEY is None:
    warnings.warn(
        "SECRET_KEY environment variable is not set; falling back to the "
        "hard-coded default key, which is not safe outside local development.",
        RuntimeWarning,
    )
    SECRET_KEY = _FALLBACK_SECRET_KEY

# Algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"

# Token lifetimes. These constants are not referenced in this part of the file;
# presumably callers turn them into the timedelta passed to create_token.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# OAuth2 scheme (ensure the token URL matches the actual login endpoint).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

# Password hashing context.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest
def register_user(username: str, password: str, db: Session):
    """Create and persist a new user, rejecting usernames that already exist.

    Args:
        username: Requested username; compared against ``User.username``.
        password: Plain-text password; only its hash is stored on the user.
        db: Session used for the lookup and for the insert.

    Returns:
        The new ``User`` instance after it has been committed and refreshed.

    Raises:
        HTTPException: Status 400 when a user with this username already exists.
    """
    same_name_query = db.query(User).filter(User.username == username)
    if same_name_query.first():
        raise HTTPException(status_code=400, detail="Username already taken")

    account = User(username=username, password=hash_password(password))
    db.add(account)
    db.commit()
    # Reload the row so the returned object reflects what the database stored.
    db.refresh(account)
    return account
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password`` per ``pwd_context``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def authenticate_user(username: str, password: str, db: Session):
    """Look up a user by username and check the supplied password.

    Args:
        username: Username to look up via ``User.username``.
        password: Plain-text password to check against the stored hash.
        db: Session used for the lookup.

    Returns:
        The matching ``User`` when the password verifies, otherwise ``None``
        (both for an unknown username and for a wrong password).
    """
    user = db.query(User).filter(User.username == username).first()
    if not user:
        # Do the same amount of hashing work as a real check so response time
        # does not reveal whether the username exists.
        pwd_context.dummy_verify()
        return None
    if not verify_password(password, user.password):
        return None
    return user
def create_token(data: dict, expires_delta: timedelta, secret_key: str) -> str:
    """Generate a signed JWT carrying ``data`` plus an expiry claim.

    Args:
        data: Claims to embed in the token; the dict is not modified.
        expires_delta: How long from now the token stays valid.
        secret_key: Key used to sign the token.

    Returns:
        The encoded JWT, signed with the module-level ``ALGORITHM``.
    """
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misread as local time.
    expires_at = datetime.now(timezone.utc) + expires_delta
    # Build a new dict so the caller's ``data`` is left untouched; an "exp"
    # key already present in ``data`` is overridden, as before.
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, secret_key, algorithm=ALGORITHM)
def verify_token(token: str, secret_key: str) -> str:
    """Decode a JWT with ``secret_key`` and return its ``sub`` claim.

    Args:
        token: Encoded JWT to check.
        secret_key: Key the token is expected to be signed with.

    Returns:
        The value of the token's ``sub`` claim (the username).

    Raises:
        HTTPException: Status 401 when decoding fails with ``JWTError`` or the
            payload has no ``sub`` claim.
    """
    try:
        claims = jwt.decode(token, secret_key, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    subject: str = claims.get("sub")
    if subject is None:
        # The token decoded correctly but does not say who it belongs to.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return subject
def verify_access_token(token: str = Depends(oauth2_scheme)) -> str:
    """Decode an access token with the module ``SECRET_KEY`` and return its ``sub`` claim.

    Args:
        token: Encoded JWT; by default supplied through ``Depends(oauth2_scheme)``.

    Returns:
        The value of the token's ``sub`` claim (the username).

    Raises:
        HTTPException: Status 401 when decoding fails with ``JWTError`` or the
            payload has no ``sub`` claim.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    subject: str = claims.get("sub")
    if subject is not None:
        return subject
    # The token decoded correctly but carries no subject to identify the user.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid access token",
        headers={"WWW-Authenticate": "Bearer"},
    )