Spaces:
Running
Running
from fastapi import APIRouter, HTTPException, Depends, Request, status | |
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm | |
from passlib.context import CryptContext | |
from jose import JWTError, jwt | |
from datetime import datetime, timedelta, timezone | |
from typing import Optional, Dict, Any, List | |
import os | |
import time | |
import logging | |
import traceback | |
from pydantic import BaseModel, EmailStr | |
# Import the HTTP API database utility | |
from app.utils import db_http | |
# Configure logging | |
logger = logging.getLogger("auth-server") | |
router = APIRouter() | |
# Password hashing | |
# Using a fixed configuration to ensure consistent hash lengths | |
pwd_context = CryptContext( | |
schemes=["argon2"], | |
argon2__time_cost=3, | |
argon2__memory_cost=65536, | |
argon2__parallelism=4, | |
argon2__salt_len=16, | |
argon2__hash_len=32 | |
) | |
# JWT settings | |
SECRET_KEY = os.getenv("JWT_SECRET", "supersecretkey") | |
ALGORITHM = "HS256" | |
ACCESS_TOKEN_EXPIRE_MINUTES = 30 | |
REFRESH_TOKEN_EXPIRE_DAYS = 7 | |
# OAuth2 scheme | |
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login") | |
# Models | |
class UserCreate(BaseModel): | |
email: EmailStr | |
password: str | |
class UserResponse(BaseModel): | |
id: int | |
email: str | |
is_admin: bool | |
created_at: str | |
class Token(BaseModel): | |
access_token: str | |
refresh_token: str | |
token_type: str | |
class TokenData(BaseModel): | |
user_id: Optional[Any] = None # Can be int or str | |
# Helper functions | |
def verify_password(plain_password, hashed_password): | |
return pwd_context.verify(plain_password, hashed_password) | |
def get_password_hash(password): | |
return pwd_context.hash(password) | |
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): | |
to_encode = data.copy() | |
if expires_delta: | |
expire = datetime.now(timezone.utc) + expires_delta | |
else: | |
expire = datetime.now(timezone.utc) + timedelta(minutes=15) | |
to_encode.update({"exp": expire}) | |
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
return encoded_jwt | |
def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None): | |
to_encode = data.copy() | |
if expires_delta: | |
expire = datetime.now(timezone.utc) + expires_delta | |
else: | |
expire = datetime.now(timezone.utc) + timedelta(days=7) | |
to_encode.update({"exp": expire}) | |
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
return encoded_jwt | |
async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)): | |
credentials_exception = HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Could not validate credentials", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
operation_id = f"token_validation_{int(time.time())}" | |
logger.info(f"[{operation_id}] Validating token: {token[:10]}... (length: {len(token)})") | |
logger.info(f"[{operation_id}] Using SECRET_KEY: {SECRET_KEY[:5]}... (length: {len(SECRET_KEY)})") | |
try: | |
logger.info(f"[{operation_id}] Attempting to decode token") | |
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
logger.info(f"[{operation_id}] Token decoded successfully. Payload: {payload}") | |
user_id = payload.get("sub") | |
logger.info(f"[{operation_id}] Extracted user_id from token: {user_id} (type: {type(user_id).__name__})") | |
if user_id is None: | |
logger.error(f"[{operation_id}] No user_id (sub) found in token payload") | |
raise credentials_exception | |
# Ensure user_id is an integer for database queries | |
user_id_int = None | |
# Check if user_id is a string that can be converted to int | |
if isinstance(user_id, str): | |
if user_id.isdigit(): | |
user_id_int = int(user_id) | |
logger.info(f"[{operation_id}] Converted string user_id to int: {user_id_int}") | |
else: | |
# Try to handle non-numeric strings (like email addresses) | |
logger.warning(f"[{operation_id}] User ID is non-numeric string: {user_id}") | |
# If it looks like an email, try to find the user by email | |
if '@' in user_id: | |
logger.info(f"[{operation_id}] User ID appears to be an email, will try to find user by email") | |
# We'll handle this special case later when querying the database | |
elif isinstance(user_id, int): | |
user_id_int = user_id | |
logger.info(f"[{operation_id}] User ID is already an integer: {user_id_int}") | |
else: | |
logger.warning(f"[{operation_id}] User ID has unexpected type: {type(user_id).__name__}") | |
# Store both the original user_id and the integer version | |
token_data = TokenData(user_id=user_id_int if user_id_int is not None else user_id) | |
logger.info(f"[{operation_id}] Created TokenData with user_id: {token_data.user_id}") | |
except JWTError as e: | |
logger.error(f"[{operation_id}] JWT decode error: {str(e)}") | |
raise credentials_exception | |
# Use the HTTP API utility to get the user | |
operation_id = f"get_user_{int(time.time())}" | |
logger.info(f"[{operation_id}] Attempting to retrieve user with ID: {token_data.user_id}") | |
try: | |
# Determine if we're looking up by ID or email | |
is_email_lookup = isinstance(token_data.user_id, str) and '@' in token_data.user_id | |
user_id_for_query = token_data.user_id | |
logger.info(f"[{operation_id}] Query type: {'Email lookup' if is_email_lookup else 'ID lookup'}") | |
# First try with the app's db_conn if it exists and is not HTTP API | |
if hasattr(request.app, "db_conn") and getattr(request.app, "db_type", "") != "http-api": | |
logger.info(f"[{operation_id}] Using app.db_conn to get user") | |
if is_email_lookup: | |
query = "SELECT * FROM users WHERE email = ?" | |
logger.info(f"[{operation_id}] Executing query: {query} with params: ({user_id_for_query},)") | |
user = request.app.db_conn.execute(query, (user_id_for_query,)).fetchone() | |
else: | |
query = "SELECT * FROM users WHERE id = ?" | |
logger.info(f"[{operation_id}] Executing query: {query} with params: ({user_id_for_query},)") | |
user = request.app.db_conn.execute(query, (user_id_for_query,)).fetchone() | |
if user is not None: | |
logger.info(f"[{operation_id}] User found in database using db_conn: {user[0]}, {user[1]}") | |
return user | |
else: | |
logger.warning(f"[{operation_id}] User not found in database using db_conn") | |
# If that fails or is not available, use the HTTP API | |
logger.info(f"[{operation_id}] Using HTTP API to get user") | |
# Log database connection details | |
http_url, auth_token = db_http.get_http_url() | |
logger.info(f"[{operation_id}] Database URL: {http_url[:20]}... (length: {len(http_url)})") | |
logger.info(f"[{operation_id}] Auth token available: {bool(auth_token)}") | |
# Try to get all users first to debug | |
try: | |
all_users = db_http.select_records("users", limit=5, operation_id=f"{operation_id}_all_users") | |
logger.info(f"[{operation_id}] Found {len(all_users)} users in database") | |
for u in all_users: | |
logger.info(f"[{operation_id}] User in DB: ID={u.get('id')}, Email={u.get('email')}") | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error getting all users: {str(e)}") | |
# Now try to get the specific user | |
user = None | |
if is_email_lookup: | |
# Look up by email | |
logger.info(f"[{operation_id}] Looking up user by email: {user_id_for_query}") | |
try: | |
users = db_http.select_records( | |
"users", | |
condition="email = ?", | |
condition_params=[{"type": "text", "value": user_id_for_query}], | |
limit=1, | |
operation_id=operation_id | |
) | |
if users and len(users) > 0: | |
user = users[0] | |
logger.info(f"[{operation_id}] User found by email: {user.get('id')}, {user.get('email')}") | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error looking up user by email: {str(e)}") | |
else: | |
# Look up by ID | |
logger.info(f"[{operation_id}] Looking up user by ID: {user_id_for_query}") | |
try: | |
user = db_http.get_record_by_id("users", user_id_for_query, operation_id=operation_id) | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error looking up user by ID: {str(e)}") | |
if user is None: | |
logger.error(f"[{operation_id}] User not found with ID: {token_data.user_id}") | |
# Try a direct query as a last resort | |
try: | |
logger.info(f"[{operation_id}] Trying direct query as last resort") | |
result = db_http.execute_query( | |
"SELECT * FROM users WHERE id = ? LIMIT 1", | |
[{"type": "integer", "value": str(token_data.user_id)}], | |
operation_id=f"{operation_id}_direct" | |
) | |
logger.info(f"[{operation_id}] Direct query result: {result}") | |
except Exception as e: | |
logger.error(f"[{operation_id}] Direct query failed: {str(e)}") | |
raise credentials_exception | |
logger.info(f"[{operation_id}] User found with ID: {token_data.user_id}, Email: {user.get('email')}") | |
# If user is a dict (from HTTP API), convert it to a tuple-like structure | |
# to maintain compatibility with existing code | |
if isinstance(user, dict): | |
user_tuple = ( | |
user.get("id"), | |
user.get("email"), | |
user.get("hashed_password"), | |
user.get("created_at"), | |
user.get("last_login"), | |
user.get("is_admin", 0) | |
) | |
logger.info(f"[{operation_id}] Converted user dict to tuple: {user_tuple[0]}, {user_tuple[1]}") | |
return user_tuple | |
return user | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error getting user: {str(e)}") | |
logger.error(f"[{operation_id}] Exception type: {type(e).__name__}") | |
logger.error(f"[{operation_id}] Exception traceback: {traceback.format_exc()}") | |
raise credentials_exception | |
# Routes | |
async def register(user: UserCreate): | |
# Generate a unique identifier for this registration attempt | |
registration_id = f"reg_{int(time.time())}" | |
logger.info(f"[{registration_id}] Starting registration for email: {user.email}") | |
try: | |
# Step 1: Check if user already exists using HTTP API | |
logger.info(f"[{registration_id}] Checking if user already exists using HTTP API") | |
# Check if user exists | |
user_exists = db_http.record_exists( | |
"users", | |
"email = ?", | |
[{"type": "text", "value": user.email}], | |
operation_id=registration_id | |
) | |
if user_exists: | |
logger.warning(f"[{registration_id}] User with email {user.email} already exists") | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, | |
detail="Email already registered" | |
) | |
logger.info(f"[{registration_id}] User does not exist, proceeding with registration") | |
# Step 2: Hash the password | |
logger.info(f"[{registration_id}] Hashing password") | |
try: | |
# Use a fixed configuration to ensure consistent hash length | |
hashed_password = get_password_hash(user.password) | |
logger.info(f"[{registration_id}] Password hashed successfully, length: {len(hashed_password)}") | |
# Check if we need to fix the database schema | |
if len(hashed_password) != 97: | |
logger.warning(f"[{registration_id}] Password hash length ({len(hashed_password)}) doesn't match expected length (97)") | |
logger.warning(f"[{registration_id}] This might cause issues if there's a CHECK constraint in the database") | |
# Try to fix the database schema by removing the constraint | |
try: | |
# First, check if the users table exists with a simple query | |
db_http.execute_query( | |
"SELECT name FROM sqlite_master WHERE type='table' AND name='users'", | |
operation_id=f"{registration_id}_check_table" | |
) | |
# Create a temporary table without the constraint | |
db_http.execute_query( | |
""" | |
CREATE TABLE IF NOT EXISTS users_temp ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
email TEXT NOT NULL UNIQUE, | |
hashed_password TEXT NOT NULL, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
last_login DATETIME, | |
is_admin INTEGER DEFAULT 0 | |
) | |
""", | |
operation_id=f"{registration_id}_create_temp" | |
) | |
# Copy data from users to users_temp if users exists | |
try: | |
db_http.execute_query( | |
""" | |
INSERT INTO users_temp (id, email, hashed_password, created_at, last_login, is_admin) | |
SELECT id, email, hashed_password, created_at, last_login, is_admin FROM users | |
""", | |
operation_id=f"{registration_id}_copy_data" | |
) | |
except Exception as e: | |
logger.warning(f"[{registration_id}] Error copying data: {str(e)}") | |
# Drop the original users table | |
db_http.execute_query( | |
"DROP TABLE IF EXISTS users", | |
operation_id=f"{registration_id}_drop_users" | |
) | |
# Rename users_temp to users | |
db_http.execute_query( | |
"ALTER TABLE users_temp RENAME TO users", | |
operation_id=f"{registration_id}_rename_table" | |
) | |
logger.info(f"[{registration_id}] Fixed users table schema") | |
except Exception as e: | |
logger.error(f"[{registration_id}] Error fixing users table: {str(e)}") | |
# Continue with registration anyway | |
except Exception as e: | |
logger.error(f"[{registration_id}] Error hashing password: {str(e)}") | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"Error hashing password: {str(e)}" | |
) | |
# Step 3: Insert the new user using HTTP API | |
logger.info(f"[{registration_id}] Inserting new user using HTTP API") | |
# Prepare user data | |
user_data = { | |
"email": user.email, | |
"hashed_password": hashed_password, | |
"is_admin": 0 # Default to non-admin | |
} | |
# Insert the user | |
user_id = db_http.insert_record("users", user_data, operation_id=registration_id) | |
if not user_id: | |
logger.error(f"[{registration_id}] Failed to insert user") | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail="Failed to create user account" | |
) | |
logger.info(f"[{registration_id}] User inserted successfully with ID: {user_id}") | |
# Step 4: Retrieve the full user record | |
logger.info(f"[{registration_id}] Retrieving user record") | |
# Create a default user object with what we know | |
default_user = { | |
"id": user_id or 0, | |
"email": user.email, | |
"is_admin": False, | |
"created_at": datetime.now().isoformat() | |
} | |
# Try to get the full user record | |
try: | |
if user_id: | |
new_user = db_http.get_record_by_id("users", user_id, operation_id=registration_id) | |
if new_user: | |
logger.info(f"[{registration_id}] Found user by ID: {user_id}") | |
return { | |
"id": new_user.get("id", user_id), | |
"email": new_user.get("email", user.email), | |
"is_admin": bool(new_user.get("is_admin", 0)), | |
"created_at": str(new_user.get("created_at", datetime.now().isoformat())) | |
} | |
# If we couldn't get by ID or user_id is None, try by email | |
logger.warning(f"[{registration_id}] User not found after insert, trying by email") | |
try: | |
# Try to get by email as fallback | |
users = db_http.select_records( | |
"users", | |
condition="email = ?", | |
condition_params=[{"type": "text", "value": user.email}], | |
limit=1, | |
operation_id=f"{registration_id}_email_lookup" | |
) | |
if users and len(users) > 0: | |
new_user = users[0] | |
user_id = new_user.get("id") | |
logger.info(f"[{registration_id}] Found user by email with ID: {user_id}") | |
return { | |
"id": new_user.get("id", user_id or 0), | |
"email": new_user.get("email", user.email), | |
"is_admin": bool(new_user.get("is_admin", 0)), | |
"created_at": str(new_user.get("created_at", datetime.now().isoformat())) | |
} | |
else: | |
logger.warning(f"[{registration_id}] User not found by email either") | |
except Exception as e: | |
logger.error(f"[{registration_id}] Error finding user by email: {str(e)}") | |
# If all else fails, return the default user object | |
logger.info(f"[{registration_id}] Returning default user object") | |
return default_user | |
except Exception as e: | |
logger.error(f"[{registration_id}] Error retrieving user record: {str(e)}") | |
# Return minimal response with what we know | |
return default_user | |
except Exception as e: | |
logger.error(f"[{registration_id}] Registration failed: {str(e)}") | |
if isinstance(e, HTTPException): | |
raise | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"Registration failed: {str(e)}" | |
) | |
async def login( | |
form_data: OAuth2PasswordRequestForm = Depends() | |
): | |
operation_id = f"login_{int(time.time())}" | |
logger.info(f"[{operation_id}] Login attempt for email: {form_data.username}") | |
logger.info(f"[{operation_id}] Password length: {len(form_data.password)}") | |
try: | |
# Log database connection details | |
http_url, auth_token = db_http.get_http_url() | |
logger.info(f"[{operation_id}] Database URL: {http_url[:20]}... (length: {len(http_url)})") | |
logger.info(f"[{operation_id}] Auth token available: {bool(auth_token)}") | |
# Find the user using HTTP API | |
logger.info(f"[{operation_id}] Searching for user with email: {form_data.username}") | |
users = db_http.select_records( | |
"users", | |
condition="email = ?", | |
condition_params=[{"type": "text", "value": form_data.username}], | |
limit=1, | |
operation_id=operation_id | |
) | |
logger.info(f"[{operation_id}] Query returned {len(users) if users else 0} users") | |
if not users: | |
logger.warning(f"[{operation_id}] User not found with email: {form_data.username}") | |
# Try to list all users for debugging | |
try: | |
all_users = db_http.select_records("users", limit=5, operation_id=f"{operation_id}_all_users") | |
logger.info(f"[{operation_id}] Found {len(all_users)} users in database") | |
for u in all_users: | |
logger.info(f"[{operation_id}] User in DB: ID={u.get('id')}, Email={u.get('email')}") | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error getting all users: {str(e)}") | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Incorrect email or password", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
user = users[0] | |
logger.info(f"[{operation_id}] Found user: ID={user.get('id')}, Email={user.get('email')}") | |
# Log password hash details (safely) | |
hashed_pwd = user.get("hashed_password", "") | |
logger.info(f"[{operation_id}] Stored password hash length: {len(hashed_pwd)}") | |
logger.info(f"[{operation_id}] Stored password hash prefix: {hashed_pwd[:10]}...") | |
# Verify password | |
logger.info(f"[{operation_id}] Verifying password") | |
password_valid = verify_password(form_data.password, hashed_pwd) | |
logger.info(f"[{operation_id}] Password verification result: {password_valid}") | |
if not password_valid: | |
logger.warning(f"[{operation_id}] Invalid password for user: {form_data.username}") | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Incorrect email or password", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
logger.info(f"[{operation_id}] Password verified for user ID: {user.get('id')}") | |
# Update last login | |
update_data = { | |
"last_login": datetime.now().isoformat() | |
} | |
logger.info(f"[{operation_id}] Updating last login timestamp") | |
update_success = db_http.update_record( | |
"users", | |
update_data, | |
"id = ?", | |
[{"type": "integer", "value": str(user.get("id"))}], | |
operation_id=operation_id | |
) | |
logger.info(f"[{operation_id}] Last login update result: {update_success}") | |
# Create tokens | |
logger.info(f"[{operation_id}] Creating access token with user ID: {user.get('id')}") | |
# Ensure user ID is an integer and convert to string for JWT | |
user_id = user.get("id") | |
if isinstance(user_id, int): | |
user_id_str = str(user_id) | |
else: | |
# If it's already a string or some other type, convert to string | |
user_id_str = str(user_id) | |
logger.info(f"[{operation_id}] User ID for token: {user_id_str} (type: {type(user_id_str).__name__})") | |
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) | |
access_token = create_access_token( | |
data={"sub": user_id_str}, | |
expires_delta=access_token_expires | |
) | |
logger.info(f"[{operation_id}] Access token created, length: {len(access_token)}") | |
logger.info(f"[{operation_id}] Creating refresh token") | |
refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) | |
refresh_token = create_refresh_token( | |
data={"sub": user_id_str}, | |
expires_delta=refresh_token_expires | |
) | |
logger.info(f"[{operation_id}] Refresh token created, length: {len(refresh_token)}") | |
# Verify token can be decoded (sanity check) | |
try: | |
logger.info(f"[{operation_id}] Verifying access token can be decoded") | |
payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM]) | |
logger.info(f"[{operation_id}] Token verification successful. Payload: {payload}") | |
except Exception as e: | |
logger.error(f"[{operation_id}] Token verification failed: {str(e)}") | |
# Continue anyway since we just created the token | |
logger.info(f"[{operation_id}] Login successful for user ID: {user.get('id')}") | |
return { | |
"access_token": access_token, | |
"refresh_token": refresh_token, | |
"token_type": "bearer" | |
} | |
except Exception as e: | |
if isinstance(e, HTTPException): | |
raise | |
logger.error(f"[{operation_id}] Login failed: {str(e)}") | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"Login failed: {str(e)}" | |
) | |
async def refresh_token(refresh_token_str: str): | |
operation_id = f"refresh_{int(time.time())}" | |
logger.info(f"[{operation_id}] Token refresh attempt") | |
try: | |
# Decode and validate the refresh token | |
try: | |
payload = jwt.decode(refresh_token_str, SECRET_KEY, algorithms=[ALGORITHM]) | |
user_id: int = payload.get("sub") | |
if user_id is None: | |
logger.warning(f"[{operation_id}] Missing user ID in token payload") | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Invalid refresh token", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
except JWTError as e: | |
logger.warning(f"[{operation_id}] JWT decode error: {str(e)}") | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Invalid refresh token", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
logger.info(f"[{operation_id}] Token decoded successfully for user ID: {user_id}") | |
# Check if user exists using HTTP API | |
user = db_http.get_record_by_id("users", user_id, operation_id=operation_id) | |
if not user: | |
logger.warning(f"[{operation_id}] User not found with ID: {user_id}") | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="User not found", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
logger.info(f"[{operation_id}] User found with ID: {user_id}") | |
# Create new tokens | |
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) | |
access_token = create_access_token( | |
data={"sub": user_id}, | |
expires_delta=access_token_expires | |
) | |
refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) | |
new_refresh_token = create_refresh_token( | |
data={"sub": user_id}, | |
expires_delta=refresh_token_expires | |
) | |
logger.info(f"[{operation_id}] Token refresh successful for user ID: {user_id}") | |
return { | |
"access_token": access_token, | |
"refresh_token": new_refresh_token, | |
"token_type": "bearer" | |
} | |
except Exception as e: | |
if isinstance(e, HTTPException): | |
raise | |
logger.error(f"[{operation_id}] Token refresh failed: {str(e)}") | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"Token refresh failed: {str(e)}" | |
) | |
async def get_current_user_info(current_user = Depends(get_current_user)): | |
# Handle both tuple (from db_conn) and dict (from HTTP API) formats | |
if isinstance(current_user, dict): | |
return { | |
"id": current_user.get("id"), | |
"email": current_user.get("email"), | |
"is_admin": bool(current_user.get("is_admin", 0)), | |
"created_at": str(current_user.get("created_at", "")) | |
} | |
else: | |
# Assume tuple format from the original implementation | |
return { | |
"id": current_user[0], | |
"email": current_user[1], | |
"is_admin": bool(current_user[5] if current_user[5] is not None else 0), | |
"created_at": str(current_user[3] if current_user[3] is not None else "") | |
} | |
async def test_database(request: Request): | |
""" | |
Test endpoint to verify database operations using the original method. | |
This is for debugging purposes only. | |
""" | |
import logging | |
import time | |
logger = logging.getLogger("auth-server") | |
# Generate a unique test ID | |
test_id = f"test_{int(time.time())}" | |
logger.info(f"[{test_id}] Starting database test") | |
results = { | |
"connection_type": getattr(request.app, "db_type", "unknown"), | |
"connection_object_type": type(request.app.db_conn).__name__, | |
"operations": [] | |
} | |
if hasattr(request.app, "last_successful_connection_method"): | |
results["connection_method"] = request.app.last_successful_connection_method | |
try: | |
# Test 1: Simple SELECT | |
logger.info(f"[{test_id}] Test 1: Simple SELECT") | |
test_query = "SELECT 1 as test" | |
result = request.app.db_conn.execute(test_query).fetchone() | |
results["operations"].append({ | |
"name": "Simple SELECT", | |
"success": result is not None, | |
"result": str(result) if result is not None else None | |
}) | |
logger.info(f"[{test_id}] Test 1 result: {result}") | |
# Test 2: Create a temporary table | |
logger.info(f"[{test_id}] Test 2: Create a temporary table") | |
create_temp_table = """ | |
CREATE TABLE IF NOT EXISTS test_table ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
name TEXT NOT NULL, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
) | |
""" | |
request.app.db_conn.execute(create_temp_table) | |
request.app.db_conn.commit() | |
results["operations"].append({ | |
"name": "Create temporary table", | |
"success": True | |
}) | |
logger.info(f"[{test_id}] Test 2 completed successfully") | |
# Test 3: Insert into the temporary table | |
logger.info(f"[{test_id}] Test 3: Insert into the temporary table") | |
test_name = f"test_user_{int(time.time())}" | |
insert_query = "INSERT INTO test_table (name) VALUES (?)" | |
cursor = request.app.db_conn.execute(insert_query, (test_name,)) | |
request.app.db_conn.commit() | |
# Check if lastrowid is available | |
last_id = None | |
try: | |
last_id = cursor.lastrowid | |
logger.info(f"[{test_id}] Got lastrowid: {last_id}") | |
except Exception as e: | |
logger.warning(f"[{test_id}] Could not get lastrowid: {str(e)}") | |
# Try to get the ID using a query | |
try: | |
id_query = "SELECT id FROM test_table WHERE name = ? ORDER BY id DESC LIMIT 1" | |
id_result = request.app.db_conn.execute(id_query, (test_name,)).fetchone() | |
if id_result: | |
last_id = id_result[0] | |
logger.info(f"[{test_id}] Got ID from query: {last_id}") | |
except Exception as e2: | |
logger.error(f"[{test_id}] Error getting ID from query: {str(e2)}") | |
results["operations"].append({ | |
"name": "Insert into temporary table", | |
"success": True, | |
"last_id": last_id | |
}) | |
logger.info(f"[{test_id}] Test 3 completed successfully. Last ID: {last_id}") | |
# Test 4: Select from the temporary table | |
logger.info(f"[{test_id}] Test 4: Select from the temporary table") | |
select_query = "SELECT * FROM test_table WHERE name = ?" | |
result = request.app.db_conn.execute(select_query, (test_name,)).fetchone() | |
results["operations"].append({ | |
"name": "Select from temporary table", | |
"success": result is not None, | |
"result": str(result) if result is not None else None | |
}) | |
logger.info(f"[{test_id}] Test 4 result: {result}") | |
# Test 5: Check if users table exists and has the expected structure | |
logger.info(f"[{test_id}] Test 5: Check users table structure") | |
try: | |
table_info = request.app.db_conn.execute("PRAGMA table_info(users)").fetchall() | |
results["operations"].append({ | |
"name": "Check users table structure", | |
"success": len(table_info) > 0, | |
"columns": [col[1] for col in table_info] if table_info else [] | |
}) | |
logger.info(f"[{test_id}] Test 5 result: {table_info}") | |
except Exception as e: | |
logger.error(f"[{test_id}] Error checking users table structure: {str(e)}") | |
results["operations"].append({ | |
"name": "Check users table structure", | |
"success": False, | |
"error": str(e) | |
}) | |
# Test 6: Create a specific test user ([email protected]) | |
logger.info(f"[{test_id}] Test 6: Create specific test user ([email protected])") | |
try: | |
# First check if the test user already exists | |
check_query = "SELECT id FROM users WHERE email = ?" | |
existing_user = request.app.db_conn.execute(check_query, ("[email protected]",)).fetchone() | |
if existing_user: | |
logger.info(f"[{test_id}] Test user already exists with ID: {existing_user[0]}") | |
results["operations"].append({ | |
"name": "Create specific test user", | |
"success": True, | |
"user_id": existing_user[0], | |
"status": "already_exists" | |
}) | |
else: | |
# Hash a test password | |
test_password = "TestPassword123!" | |
hashed_password = get_password_hash(test_password) | |
# Insert the test user using the approach from turso_libsql_only.py | |
logger.info(f"[{test_id}] Inserting test user with email: [email protected]") | |
# Use a transaction-like approach | |
try: | |
# Insert the user | |
insert_query = "INSERT INTO users (email, hashed_password) VALUES (?, ?)" | |
cursor = request.app.db_conn.execute(insert_query, ("[email protected]", hashed_password)) | |
# Commit immediately | |
request.app.db_conn.commit() | |
logger.info(f"[{test_id}] Committed test user insert") | |
# Try to get the user ID | |
user_id = None | |
try: | |
user_id = cursor.lastrowid | |
logger.info(f"[{test_id}] Got test user lastrowid: {user_id}") | |
except Exception as e: | |
logger.warning(f"[{test_id}] Could not get test user lastrowid: {str(e)}") | |
# Verify the insert with a separate query | |
verify_query = "SELECT id, email, created_at FROM users WHERE email = ?" | |
verify_result = request.app.db_conn.execute(verify_query, ("[email protected]",)).fetchone() | |
if verify_result: | |
user_id = verify_result[0] | |
logger.info(f"[{test_id}] Verified test user with ID: {user_id}") | |
results["operations"].append({ | |
"name": "Create specific test user", | |
"success": True, | |
"user_id": user_id, | |
"user_data": { | |
"id": verify_result[0], | |
"email": verify_result[1], | |
"created_at": verify_result[2] if len(verify_result) > 2 else None | |
} | |
}) | |
else: | |
logger.error(f"[{test_id}] Failed to verify test user after insert") | |
results["operations"].append({ | |
"name": "Create specific test user", | |
"success": False, | |
"error": "Failed to verify user after insert" | |
}) | |
except Exception as e: | |
logger.error(f"[{test_id}] Error during test user insert: {str(e)}") | |
results["operations"].append({ | |
"name": "Create specific test user", | |
"success": False, | |
"error": str(e) | |
}) | |
except Exception as e: | |
logger.error(f"[{test_id}] Error creating specific test user: {str(e)}") | |
results["operations"].append({ | |
"name": "Create specific test user", | |
"success": False, | |
"error": str(e) | |
}) | |
# Test 7: List all users | |
logger.info(f"[{test_id}] Test 7: List all users") | |
try: | |
all_users = request.app.db_conn.execute("SELECT id, email FROM users").fetchall() | |
results["operations"].append({ | |
"name": "List all users", | |
"success": True, | |
"count": len(all_users) if all_users else 0, | |
"users": [{"id": user[0], "email": user[1]} for user in all_users] if all_users else [] | |
}) | |
logger.info(f"[{test_id}] Test 7 result: {all_users}") | |
except Exception as e: | |
logger.error(f"[{test_id}] Error listing all users: {str(e)}") | |
results["operations"].append({ | |
"name": "List all users", | |
"success": False, | |
"error": str(e) | |
}) | |
logger.info(f"[{test_id}] Database test completed successfully") | |
return results | |
except Exception as e: | |
logger.error(f"[{test_id}] Database test failed: {str(e)}") | |
results["operations"].append({ | |
"name": "Error during tests", | |
"success": False, | |
"error": str(e) | |
}) | |
return results | |
async def test_database_http(): | |
""" | |
Test endpoint to verify database operations using the HTTP API method. | |
This is for debugging purposes only. | |
""" | |
operation_id = f"test_db_http_{int(time.time())}" | |
logger.info(f"[{operation_id}] Starting HTTP API database test") | |
results = { | |
"connection_type": "http-api", | |
"operations": [] | |
} | |
try: | |
# Test 1: Simple SELECT | |
logger.info(f"[{operation_id}] Test 1: Simple SELECT") | |
result = db_http.execute_query("SELECT 1 as test", operation_id=f"{operation_id}_1") | |
results["operations"].append({ | |
"name": "Simple SELECT", | |
"success": True, | |
"result": str(result) | |
}) | |
logger.info(f"[{operation_id}] Test 1 result: {result}") | |
# Test 2: Create a temporary table | |
logger.info(f"[{operation_id}] Test 2: Create a temporary table") | |
db_http.create_table_if_not_exists( | |
"test_table_http", | |
""" | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
value TEXT NOT NULL, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
""", | |
operation_id=f"{operation_id}_2" | |
) | |
results["operations"].append({ | |
"name": "Create temporary table", | |
"success": True | |
}) | |
logger.info(f"[{operation_id}] Test 2 completed successfully") | |
# Test 3: Insert into the temporary table | |
logger.info(f"[{operation_id}] Test 3: Insert into the temporary table") | |
test_value = f"test_value_http_{int(time.time())}" | |
insert_id = db_http.insert_record( | |
"test_table_http", | |
{"value": test_value}, | |
operation_id=f"{operation_id}_3" | |
) | |
results["operations"].append({ | |
"name": "Insert into temporary table", | |
"success": insert_id is not None, | |
"insert_id": insert_id | |
}) | |
logger.info(f"[{operation_id}] Test 3 completed successfully. Insert ID: {insert_id}") | |
# Test 4: Select from the temporary table | |
logger.info(f"[{operation_id}] Test 4: Select from the temporary table") | |
records = db_http.select_records( | |
"test_table_http", | |
condition="value = ?", | |
condition_params=[{"type": "text", "value": test_value}], | |
limit=1, | |
operation_id=f"{operation_id}_4" | |
) | |
results["operations"].append({ | |
"name": "Select from temporary table", | |
"success": len(records) > 0, | |
"result": str(records[0]) if records else None | |
}) | |
logger.info(f"[{operation_id}] Test 4 result: {records}") | |
# Test 5: Update the record | |
logger.info(f"[{operation_id}] Test 5: Update the record") | |
updated_value = f"{test_value}_updated" | |
success = db_http.update_record( | |
"test_table_http", | |
{"value": updated_value}, | |
"id = ?", | |
[{"type": "integer", "value": str(insert_id)}], | |
operation_id=f"{operation_id}_5" | |
) | |
results["operations"].append({ | |
"name": "Update record", | |
"success": success | |
}) | |
logger.info(f"[{operation_id}] Test 5 completed successfully") | |
# Test 6: Verify the update | |
logger.info(f"[{operation_id}] Test 6: Verify the update") | |
updated_records = db_http.select_records( | |
"test_table_http", | |
condition="id = ?", | |
condition_params=[{"type": "integer", "value": str(insert_id)}], | |
limit=1, | |
operation_id=f"{operation_id}_6" | |
) | |
results["operations"].append({ | |
"name": "Verify update", | |
"success": len(updated_records) > 0 and updated_records[0].get("value") == updated_value, | |
"result": str(updated_records[0]) if updated_records else None | |
}) | |
logger.info(f"[{operation_id}] Test 6 result: {updated_records}") | |
# Test 7: Count records | |
logger.info(f"[{operation_id}] Test 7: Count records") | |
count = db_http.count_records("test_table_http", operation_id=f"{operation_id}_7") | |
results["operations"].append({ | |
"name": "Count records", | |
"success": count > 0, | |
"count": count | |
}) | |
logger.info(f"[{operation_id}] Test 7 result: {count}") | |
# Test 8: List all records | |
logger.info(f"[{operation_id}] Test 8: List all records") | |
all_records = db_http.select_records( | |
"test_table_http", | |
limit=10, | |
operation_id=f"{operation_id}_8" | |
) | |
results["operations"].append({ | |
"name": "List all records", | |
"success": True, | |
"count": len(all_records), | |
"records": all_records | |
}) | |
logger.info(f"[{operation_id}] Test 8 result: {len(all_records)} records") | |
logger.info(f"[{operation_id}] HTTP API database test completed successfully") | |
return results | |
except Exception as e: | |
logger.error(f"[{operation_id}] HTTP API database test failed: {str(e)}") | |
results["operations"].append({ | |
"name": "Error during tests", | |
"success": False, | |
"error": str(e) | |
}) | |
return results | |