"""Authentication helpers: password hashing, user registration/login, and JWT handling."""

import os
from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from database import engine, get_db
from models import Base, User

# Create any tables declared on Base that do not exist yet (runs at import time).
Base.metadata.create_all(bind=engine)

# Load secrets from environment variables or fall back to a default.
# NOTE(review): the fallback below is a hard-coded signing key committed to the
# source. Anyone who can read this file can forge tokens whenever SECRET_KEY is
# not set in the environment; set SECRET_KEY in every real deployment.
SECRET_KEY = os.getenv("SECRET_KEY", "def6nQHONW99pOPyba9DShny6FB1CJJBigZault")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# OAuth2 scheme (ensure the token URL matches the actual login endpoint).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

# Password hashing context.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's ``pwd_context``."""
    return pwd_context.hash(password)


def register_user(username: str, password: str, db: Session):
    """Create and persist a new user with a hashed password.

    Args:
        username: Desired username; must not already exist in the ``User`` table.
        password: Plain-text password; only its hash is stored.
        db: SQLAlchemy session used for the lookup and the insert.

    Returns:
        The newly created ``User`` instance, refreshed from the database.

    Raises:
        HTTPException: 400 if a user with the same username already exists.
    """
    existing_user = db.query(User).filter(User.username == username).first()
    if existing_user:
        raise HTTPException(status_code=400, detail="Username already taken")

    new_user = User(username=username, password=hash_password(password))
    db.add(new_user)
    db.commit()
    # Reload the row so database-generated attributes are populated.
    db.refresh(new_user)
    return new_user


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether *plain_password* matches the stored *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def authenticate_user(username: str, password: str, db: Session):
    """Look up *username* and check *password* against the stored hash.

    Returns:
        The matching ``User`` when the credentials are valid, otherwise ``None``
        (both for an unknown username and for a wrong password).
    """
    user = db.query(User).filter(User.username == username).first()
    if not user or not verify_password(password, user.password):
        return None
    return user


def create_token(data: dict, expires_delta: timedelta, secret_key: str) -> str:
    """Generate a signed JWT containing *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token; the dict itself is not mutated.
        expires_delta: How long from now the token should remain valid.
        secret_key: Key used to sign the token with ``ALGORITHM``.

    Returns:
        The encoded JWT string.
    """
    to_encode = data.copy()
    # Timezone-aware "now": datetime.utcnow() is naive and deprecated since 3.12.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)


def _credentials_error(detail: str) -> HTTPException:
    """Build the 401 response used for every token validation failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def _decode_subject(token: str, secret_key: str, missing_sub_detail: str) -> str:
    """Decode *token* with *secret_key* and return its ``sub`` claim.

    Raises:
        HTTPException: 401 with "Could not validate credentials" when decoding
            fails, or 401 with *missing_sub_detail* when ``sub`` is absent.
    """
    # Keep the try body to the single call that can raise JWTError.
    try:
        payload = jwt.decode(token, secret_key, algorithms=[ALGORITHM])
    except JWTError as err:
        raise _credentials_error("Could not validate credentials") from err

    username = payload.get("sub")
    if username is None:
        raise _credentials_error(missing_sub_detail)
    return username


def verify_token(token: str, secret_key: str) -> str:
    """Verify a JWT signed with *secret_key* and extract the username.

    Returns:
        The value of the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded or has no ``sub`` claim.
    """
    return _decode_subject(token, secret_key, "Invalid token")


def verify_access_token(token: str = Depends(oauth2_scheme)) -> str:
    """Verify an access token signed with ``SECRET_KEY`` and return the username.

    The token defaults to the bearer token supplied by ``oauth2_scheme`` when
    this function is used as a FastAPI dependency.

    Raises:
        HTTPException: 401 if the token cannot be decoded or has no ``sub`` claim.
    """
    return _decode_subject(token, SECRET_KEY, "Invalid access token")