from fastapi import APIRouter, HTTPException, Depends, Request, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from passlib.context import CryptContext from jose import JWTError, jwt from datetime import datetime, timedelta, timezone from typing import Optional, Dict, Any, List import os import time import logging import traceback from pydantic import BaseModel, EmailStr # Import the HTTP API database utility from app.utils import db_http # Configure logging logger = logging.getLogger("auth-server") router = APIRouter() # Password hashing # Using a fixed configuration to ensure consistent hash lengths pwd_context = CryptContext( schemes=["argon2"], argon2__time_cost=3, argon2__memory_cost=65536, argon2__parallelism=4, argon2__salt_len=16, argon2__hash_len=32 ) # JWT settings SECRET_KEY = os.getenv("JWT_SECRET", "supersecretkey") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 REFRESH_TOKEN_EXPIRE_DAYS = 7 # OAuth2 scheme oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login") # Models class UserCreate(BaseModel): email: EmailStr password: str class UserResponse(BaseModel): id: int email: str is_admin: bool created_at: str class Token(BaseModel): access_token: str refresh_token: str token_type: str class TokenData(BaseModel): user_id: Optional[Any] = None # Can be int or str # Helper functions def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(days=7) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) operation_id = f"token_validation_{int(time.time())}" logger.info(f"[{operation_id}] Validating token: {token[:10]}... (length: {len(token)})") logger.info(f"[{operation_id}] Using SECRET_KEY: {SECRET_KEY[:5]}... (length: {len(SECRET_KEY)})") try: logger.info(f"[{operation_id}] Attempting to decode token") payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) logger.info(f"[{operation_id}] Token decoded successfully. Payload: {payload}") user_id = payload.get("sub") logger.info(f"[{operation_id}] Extracted user_id from token: {user_id} (type: {type(user_id).__name__})") if user_id is None: logger.error(f"[{operation_id}] No user_id (sub) found in token payload") raise credentials_exception # Ensure user_id is an integer for database queries user_id_int = None # Check if user_id is a string that can be converted to int if isinstance(user_id, str): if user_id.isdigit(): user_id_int = int(user_id) logger.info(f"[{operation_id}] Converted string user_id to int: {user_id_int}") else: # Try to handle non-numeric strings (like email addresses) logger.warning(f"[{operation_id}] User ID is non-numeric string: {user_id}") # If it looks like an email, try to find the user by email if '@' in user_id: logger.info(f"[{operation_id}] User ID appears to be an email, will try to find user by email") # We'll handle this special case later when querying the database elif isinstance(user_id, int): user_id_int = user_id logger.info(f"[{operation_id}] User ID is already an integer: {user_id_int}") else: logger.warning(f"[{operation_id}] User ID has unexpected type: {type(user_id).__name__}") # Store both the original user_id and the integer version token_data = TokenData(user_id=user_id_int if user_id_int is not None else user_id) logger.info(f"[{operation_id}] Created TokenData with user_id: {token_data.user_id}") except JWTError as e: logger.error(f"[{operation_id}] JWT decode error: {str(e)}") raise credentials_exception # Use the HTTP API utility to get the user operation_id = f"get_user_{int(time.time())}" logger.info(f"[{operation_id}] Attempting to retrieve user with ID: {token_data.user_id}") try: # Determine if we're looking up by ID or email is_email_lookup = isinstance(token_data.user_id, str) and '@' in token_data.user_id user_id_for_query = token_data.user_id logger.info(f"[{operation_id}] Query type: {'Email lookup' if is_email_lookup else 'ID lookup'}") # First try with the app's db_conn if it exists and is not HTTP API if hasattr(request.app, "db_conn") and getattr(request.app, "db_type", "") != "http-api": logger.info(f"[{operation_id}] Using app.db_conn to get user") if is_email_lookup: query = "SELECT * FROM users WHERE email = ?" logger.info(f"[{operation_id}] Executing query: {query} with params: ({user_id_for_query},)") user = request.app.db_conn.execute(query, (user_id_for_query,)).fetchone() else: query = "SELECT * FROM users WHERE id = ?" logger.info(f"[{operation_id}] Executing query: {query} with params: ({user_id_for_query},)") user = request.app.db_conn.execute(query, (user_id_for_query,)).fetchone() if user is not None: logger.info(f"[{operation_id}] User found in database using db_conn: {user[0]}, {user[1]}") return user else: logger.warning(f"[{operation_id}] User not found in database using db_conn") # If that fails or is not available, use the HTTP API logger.info(f"[{operation_id}] Using HTTP API to get user") # Log database connection details http_url, auth_token = db_http.get_http_url() logger.info(f"[{operation_id}] Database URL: {http_url[:20]}... (length: {len(http_url)})") logger.info(f"[{operation_id}] Auth token available: {bool(auth_token)}") # Try to get all users first to debug try: all_users = db_http.select_records("users", limit=5, operation_id=f"{operation_id}_all_users") logger.info(f"[{operation_id}] Found {len(all_users)} users in database") for u in all_users: logger.info(f"[{operation_id}] User in DB: ID={u.get('id')}, Email={u.get('email')}") except Exception as e: logger.error(f"[{operation_id}] Error getting all users: {str(e)}") # Now try to get the specific user user = None if is_email_lookup: # Look up by email logger.info(f"[{operation_id}] Looking up user by email: {user_id_for_query}") try: users = db_http.select_records( "users", condition="email = ?", condition_params=[{"type": "text", "value": user_id_for_query}], limit=1, operation_id=operation_id ) if users and len(users) > 0: user = users[0] logger.info(f"[{operation_id}] User found by email: {user.get('id')}, {user.get('email')}") except Exception as e: logger.error(f"[{operation_id}] Error looking up user by email: {str(e)}") else: # Look up by ID logger.info(f"[{operation_id}] Looking up user by ID: {user_id_for_query}") try: user = db_http.get_record_by_id("users", user_id_for_query, operation_id=operation_id) except Exception as e: logger.error(f"[{operation_id}] Error looking up user by ID: {str(e)}") if user is None: logger.error(f"[{operation_id}] User not found with ID: {token_data.user_id}") # Try a direct query as a last resort try: logger.info(f"[{operation_id}] Trying direct query as last resort") result = db_http.execute_query( "SELECT * FROM users WHERE id = ? LIMIT 1", [{"type": "integer", "value": str(token_data.user_id)}], operation_id=f"{operation_id}_direct" ) logger.info(f"[{operation_id}] Direct query result: {result}") except Exception as e: logger.error(f"[{operation_id}] Direct query failed: {str(e)}") raise credentials_exception logger.info(f"[{operation_id}] User found with ID: {token_data.user_id}, Email: {user.get('email')}") # If user is a dict (from HTTP API), convert it to a tuple-like structure # to maintain compatibility with existing code if isinstance(user, dict): user_tuple = ( user.get("id"), user.get("email"), user.get("hashed_password"), user.get("created_at"), user.get("last_login"), user.get("is_admin", 0) ) logger.info(f"[{operation_id}] Converted user dict to tuple: {user_tuple[0]}, {user_tuple[1]}") return user_tuple return user except Exception as e: logger.error(f"[{operation_id}] Error getting user: {str(e)}") logger.error(f"[{operation_id}] Exception type: {type(e).__name__}") logger.error(f"[{operation_id}] Exception traceback: {traceback.format_exc()}") raise credentials_exception # Routes @router.post("/register", response_model=UserResponse) async def register(user: UserCreate): # Generate a unique identifier for this registration attempt registration_id = f"reg_{int(time.time())}" logger.info(f"[{registration_id}] Starting registration for email: {user.email}") try: # Step 1: Check if user already exists using HTTP API logger.info(f"[{registration_id}] Checking if user already exists using HTTP API") # Check if user exists user_exists = db_http.record_exists( "users", "email = ?", [{"type": "text", "value": user.email}], operation_id=registration_id ) if user_exists: logger.warning(f"[{registration_id}] User with email {user.email} already exists") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered" ) logger.info(f"[{registration_id}] User does not exist, proceeding with registration") # Step 2: Hash the password logger.info(f"[{registration_id}] Hashing password") try: # Use a fixed configuration to ensure consistent hash length hashed_password = get_password_hash(user.password) logger.info(f"[{registration_id}] Password hashed successfully, length: {len(hashed_password)}") # Check if we need to fix the database schema if len(hashed_password) != 97: logger.warning(f"[{registration_id}] Password hash length ({len(hashed_password)}) doesn't match expected length (97)") logger.warning(f"[{registration_id}] This might cause issues if there's a CHECK constraint in the database") # Try to fix the database schema by removing the constraint try: # First, check if the users table exists with a simple query db_http.execute_query( "SELECT name FROM sqlite_master WHERE type='table' AND name='users'", operation_id=f"{registration_id}_check_table" ) # Create a temporary table without the constraint db_http.execute_query( """ CREATE TABLE IF NOT EXISTS users_temp ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, hashed_password TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_login DATETIME, is_admin INTEGER DEFAULT 0 ) """, operation_id=f"{registration_id}_create_temp" ) # Copy data from users to users_temp if users exists try: db_http.execute_query( """ INSERT INTO users_temp (id, email, hashed_password, created_at, last_login, is_admin) SELECT id, email, hashed_password, created_at, last_login, is_admin FROM users """, operation_id=f"{registration_id}_copy_data" ) except Exception as e: logger.warning(f"[{registration_id}] Error copying data: {str(e)}") # Drop the original users table db_http.execute_query( "DROP TABLE IF EXISTS users", operation_id=f"{registration_id}_drop_users" ) # Rename users_temp to users db_http.execute_query( "ALTER TABLE users_temp RENAME TO users", operation_id=f"{registration_id}_rename_table" ) logger.info(f"[{registration_id}] Fixed users table schema") except Exception as e: logger.error(f"[{registration_id}] Error fixing users table: {str(e)}") # Continue with registration anyway except Exception as e: logger.error(f"[{registration_id}] Error hashing password: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error hashing password: {str(e)}" ) # Step 3: Insert the new user using HTTP API logger.info(f"[{registration_id}] Inserting new user using HTTP API") # Prepare user data user_data = { "email": user.email, "hashed_password": hashed_password, "is_admin": 0 # Default to non-admin } # Insert the user user_id = db_http.insert_record("users", user_data, operation_id=registration_id) if not user_id: logger.error(f"[{registration_id}] Failed to insert user") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create user account" ) logger.info(f"[{registration_id}] User inserted successfully with ID: {user_id}") # Step 4: Retrieve the full user record logger.info(f"[{registration_id}] Retrieving user record") # Create a default user object with what we know default_user = { "id": user_id or 0, "email": user.email, "is_admin": False, "created_at": datetime.now().isoformat() } # Try to get the full user record try: if user_id: new_user = db_http.get_record_by_id("users", user_id, operation_id=registration_id) if new_user: logger.info(f"[{registration_id}] Found user by ID: {user_id}") return { "id": new_user.get("id", user_id), "email": new_user.get("email", user.email), "is_admin": bool(new_user.get("is_admin", 0)), "created_at": str(new_user.get("created_at", datetime.now().isoformat())) } # If we couldn't get by ID or user_id is None, try by email logger.warning(f"[{registration_id}] User not found after insert, trying by email") try: # Try to get by email as fallback users = db_http.select_records( "users", condition="email = ?", condition_params=[{"type": "text", "value": user.email}], limit=1, operation_id=f"{registration_id}_email_lookup" ) if users and len(users) > 0: new_user = users[0] user_id = new_user.get("id") logger.info(f"[{registration_id}] Found user by email with ID: {user_id}") return { "id": new_user.get("id", user_id or 0), "email": new_user.get("email", user.email), "is_admin": bool(new_user.get("is_admin", 0)), "created_at": str(new_user.get("created_at", datetime.now().isoformat())) } else: logger.warning(f"[{registration_id}] User not found by email either") except Exception as e: logger.error(f"[{registration_id}] Error finding user by email: {str(e)}") # If all else fails, return the default user object logger.info(f"[{registration_id}] Returning default user object") return default_user except Exception as e: logger.error(f"[{registration_id}] Error retrieving user record: {str(e)}") # Return minimal response with what we know return default_user except Exception as e: logger.error(f"[{registration_id}] Registration failed: {str(e)}") if isinstance(e, HTTPException): raise raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Registration failed: {str(e)}" ) @router.post("/login", response_model=Token) async def login( form_data: OAuth2PasswordRequestForm = Depends() ): operation_id = f"login_{int(time.time())}" logger.info(f"[{operation_id}] Login attempt for email: {form_data.username}") logger.info(f"[{operation_id}] Password length: {len(form_data.password)}") try: # Log database connection details http_url, auth_token = db_http.get_http_url() logger.info(f"[{operation_id}] Database URL: {http_url[:20]}... (length: {len(http_url)})") logger.info(f"[{operation_id}] Auth token available: {bool(auth_token)}") # Find the user using HTTP API logger.info(f"[{operation_id}] Searching for user with email: {form_data.username}") users = db_http.select_records( "users", condition="email = ?", condition_params=[{"type": "text", "value": form_data.username}], limit=1, operation_id=operation_id ) logger.info(f"[{operation_id}] Query returned {len(users) if users else 0} users") if not users: logger.warning(f"[{operation_id}] User not found with email: {form_data.username}") # Try to list all users for debugging try: all_users = db_http.select_records("users", limit=5, operation_id=f"{operation_id}_all_users") logger.info(f"[{operation_id}] Found {len(all_users)} users in database") for u in all_users: logger.info(f"[{operation_id}] User in DB: ID={u.get('id')}, Email={u.get('email')}") except Exception as e: logger.error(f"[{operation_id}] Error getting all users: {str(e)}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password", headers={"WWW-Authenticate": "Bearer"}, ) user = users[0] logger.info(f"[{operation_id}] Found user: ID={user.get('id')}, Email={user.get('email')}") # Log password hash details (safely) hashed_pwd = user.get("hashed_password", "") logger.info(f"[{operation_id}] Stored password hash length: {len(hashed_pwd)}") logger.info(f"[{operation_id}] Stored password hash prefix: {hashed_pwd[:10]}...") # Verify password logger.info(f"[{operation_id}] Verifying password") password_valid = verify_password(form_data.password, hashed_pwd) logger.info(f"[{operation_id}] Password verification result: {password_valid}") if not password_valid: logger.warning(f"[{operation_id}] Invalid password for user: {form_data.username}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password", headers={"WWW-Authenticate": "Bearer"}, ) logger.info(f"[{operation_id}] Password verified for user ID: {user.get('id')}") # Update last login update_data = { "last_login": datetime.now().isoformat() } logger.info(f"[{operation_id}] Updating last login timestamp") update_success = db_http.update_record( "users", update_data, "id = ?", [{"type": "integer", "value": str(user.get("id"))}], operation_id=operation_id ) logger.info(f"[{operation_id}] Last login update result: {update_success}") # Create tokens logger.info(f"[{operation_id}] Creating access token with user ID: {user.get('id')}") # Ensure user ID is an integer and convert to string for JWT user_id = user.get("id") if isinstance(user_id, int): user_id_str = str(user_id) else: # If it's already a string or some other type, convert to string user_id_str = str(user_id) logger.info(f"[{operation_id}] User ID for token: {user_id_str} (type: {type(user_id_str).__name__})") access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user_id_str}, expires_delta=access_token_expires ) logger.info(f"[{operation_id}] Access token created, length: {len(access_token)}") logger.info(f"[{operation_id}] Creating refresh token") refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) refresh_token = create_refresh_token( data={"sub": user_id_str}, expires_delta=refresh_token_expires ) logger.info(f"[{operation_id}] Refresh token created, length: {len(refresh_token)}") # Verify token can be decoded (sanity check) try: logger.info(f"[{operation_id}] Verifying access token can be decoded") payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM]) logger.info(f"[{operation_id}] Token verification successful. Payload: {payload}") except Exception as e: logger.error(f"[{operation_id}] Token verification failed: {str(e)}") # Continue anyway since we just created the token logger.info(f"[{operation_id}] Login successful for user ID: {user.get('id')}") return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer" } except Exception as e: if isinstance(e, HTTPException): raise logger.error(f"[{operation_id}] Login failed: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Login failed: {str(e)}" ) @router.post("/refresh", response_model=Token) async def refresh_token(refresh_token_str: str): operation_id = f"refresh_{int(time.time())}" logger.info(f"[{operation_id}] Token refresh attempt") try: # Decode and validate the refresh token try: payload = jwt.decode(refresh_token_str, SECRET_KEY, algorithms=[ALGORITHM]) user_id: int = payload.get("sub") if user_id is None: logger.warning(f"[{operation_id}] Missing user ID in token payload") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token", headers={"WWW-Authenticate": "Bearer"}, ) except JWTError as e: logger.warning(f"[{operation_id}] JWT decode error: {str(e)}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token", headers={"WWW-Authenticate": "Bearer"}, ) logger.info(f"[{operation_id}] Token decoded successfully for user ID: {user_id}") # Check if user exists using HTTP API user = db_http.get_record_by_id("users", user_id, operation_id=operation_id) if not user: logger.warning(f"[{operation_id}] User not found with ID: {user_id}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", headers={"WWW-Authenticate": "Bearer"}, ) logger.info(f"[{operation_id}] User found with ID: {user_id}") # Create new tokens access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user_id}, expires_delta=access_token_expires ) refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) new_refresh_token = create_refresh_token( data={"sub": user_id}, expires_delta=refresh_token_expires ) logger.info(f"[{operation_id}] Token refresh successful for user ID: {user_id}") return { "access_token": access_token, "refresh_token": new_refresh_token, "token_type": "bearer" } except Exception as e: if isinstance(e, HTTPException): raise logger.error(f"[{operation_id}] Token refresh failed: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Token refresh failed: {str(e)}" ) @router.get("/me", response_model=UserResponse) async def get_current_user_info(current_user = Depends(get_current_user)): # Handle both tuple (from db_conn) and dict (from HTTP API) formats if isinstance(current_user, dict): return { "id": current_user.get("id"), "email": current_user.get("email"), "is_admin": bool(current_user.get("is_admin", 0)), "created_at": str(current_user.get("created_at", "")) } else: # Assume tuple format from the original implementation return { "id": current_user[0], "email": current_user[1], "is_admin": bool(current_user[5] if current_user[5] is not None else 0), "created_at": str(current_user[3] if current_user[3] is not None else "") } @router.get("/test-db") async def test_database(request: Request): """ Test endpoint to verify database operations using the original method. This is for debugging purposes only. """ import logging import time logger = logging.getLogger("auth-server") # Generate a unique test ID test_id = f"test_{int(time.time())}" logger.info(f"[{test_id}] Starting database test") results = { "connection_type": getattr(request.app, "db_type", "unknown"), "connection_object_type": type(request.app.db_conn).__name__, "operations": [] } if hasattr(request.app, "last_successful_connection_method"): results["connection_method"] = request.app.last_successful_connection_method try: # Test 1: Simple SELECT logger.info(f"[{test_id}] Test 1: Simple SELECT") test_query = "SELECT 1 as test" result = request.app.db_conn.execute(test_query).fetchone() results["operations"].append({ "name": "Simple SELECT", "success": result is not None, "result": str(result) if result is not None else None }) logger.info(f"[{test_id}] Test 1 result: {result}") # Test 2: Create a temporary table logger.info(f"[{test_id}] Test 2: Create a temporary table") create_temp_table = """ CREATE TABLE IF NOT EXISTS test_table ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """ request.app.db_conn.execute(create_temp_table) request.app.db_conn.commit() results["operations"].append({ "name": "Create temporary table", "success": True }) logger.info(f"[{test_id}] Test 2 completed successfully") # Test 3: Insert into the temporary table logger.info(f"[{test_id}] Test 3: Insert into the temporary table") test_name = f"test_user_{int(time.time())}" insert_query = "INSERT INTO test_table (name) VALUES (?)" cursor = request.app.db_conn.execute(insert_query, (test_name,)) request.app.db_conn.commit() # Check if lastrowid is available last_id = None try: last_id = cursor.lastrowid logger.info(f"[{test_id}] Got lastrowid: {last_id}") except Exception as e: logger.warning(f"[{test_id}] Could not get lastrowid: {str(e)}") # Try to get the ID using a query try: id_query = "SELECT id FROM test_table WHERE name = ? ORDER BY id DESC LIMIT 1" id_result = request.app.db_conn.execute(id_query, (test_name,)).fetchone() if id_result: last_id = id_result[0] logger.info(f"[{test_id}] Got ID from query: {last_id}") except Exception as e2: logger.error(f"[{test_id}] Error getting ID from query: {str(e2)}") results["operations"].append({ "name": "Insert into temporary table", "success": True, "last_id": last_id }) logger.info(f"[{test_id}] Test 3 completed successfully. Last ID: {last_id}") # Test 4: Select from the temporary table logger.info(f"[{test_id}] Test 4: Select from the temporary table") select_query = "SELECT * FROM test_table WHERE name = ?" result = request.app.db_conn.execute(select_query, (test_name,)).fetchone() results["operations"].append({ "name": "Select from temporary table", "success": result is not None, "result": str(result) if result is not None else None }) logger.info(f"[{test_id}] Test 4 result: {result}") # Test 5: Check if users table exists and has the expected structure logger.info(f"[{test_id}] Test 5: Check users table structure") try: table_info = request.app.db_conn.execute("PRAGMA table_info(users)").fetchall() results["operations"].append({ "name": "Check users table structure", "success": len(table_info) > 0, "columns": [col[1] for col in table_info] if table_info else [] }) logger.info(f"[{test_id}] Test 5 result: {table_info}") except Exception as e: logger.error(f"[{test_id}] Error checking users table structure: {str(e)}") results["operations"].append({ "name": "Check users table structure", "success": False, "error": str(e) }) # Test 6: Create a specific test user (test@seamo.earth) logger.info(f"[{test_id}] Test 6: Create specific test user (test@seamo.earth)") try: # First check if the test user already exists check_query = "SELECT id FROM users WHERE email = ?" existing_user = request.app.db_conn.execute(check_query, ("test@seamo.earth",)).fetchone() if existing_user: logger.info(f"[{test_id}] Test user already exists with ID: {existing_user[0]}") results["operations"].append({ "name": "Create specific test user", "success": True, "user_id": existing_user[0], "status": "already_exists" }) else: # Hash a test password test_password = "TestPassword123!" hashed_password = get_password_hash(test_password) # Insert the test user using the approach from turso_libsql_only.py logger.info(f"[{test_id}] Inserting test user with email: test@seamo.earth") # Use a transaction-like approach try: # Insert the user insert_query = "INSERT INTO users (email, hashed_password) VALUES (?, ?)" cursor = request.app.db_conn.execute(insert_query, ("test@seamo.earth", hashed_password)) # Commit immediately request.app.db_conn.commit() logger.info(f"[{test_id}] Committed test user insert") # Try to get the user ID user_id = None try: user_id = cursor.lastrowid logger.info(f"[{test_id}] Got test user lastrowid: {user_id}") except Exception as e: logger.warning(f"[{test_id}] Could not get test user lastrowid: {str(e)}") # Verify the insert with a separate query verify_query = "SELECT id, email, created_at FROM users WHERE email = ?" verify_result = request.app.db_conn.execute(verify_query, ("test@seamo.earth",)).fetchone() if verify_result: user_id = verify_result[0] logger.info(f"[{test_id}] Verified test user with ID: {user_id}") results["operations"].append({ "name": "Create specific test user", "success": True, "user_id": user_id, "user_data": { "id": verify_result[0], "email": verify_result[1], "created_at": verify_result[2] if len(verify_result) > 2 else None } }) else: logger.error(f"[{test_id}] Failed to verify test user after insert") results["operations"].append({ "name": "Create specific test user", "success": False, "error": "Failed to verify user after insert" }) except Exception as e: logger.error(f"[{test_id}] Error during test user insert: {str(e)}") results["operations"].append({ "name": "Create specific test user", "success": False, "error": str(e) }) except Exception as e: logger.error(f"[{test_id}] Error creating specific test user: {str(e)}") results["operations"].append({ "name": "Create specific test user", "success": False, "error": str(e) }) # Test 7: List all users logger.info(f"[{test_id}] Test 7: List all users") try: all_users = request.app.db_conn.execute("SELECT id, email FROM users").fetchall() results["operations"].append({ "name": "List all users", "success": True, "count": len(all_users) if all_users else 0, "users": [{"id": user[0], "email": user[1]} for user in all_users] if all_users else [] }) logger.info(f"[{test_id}] Test 7 result: {all_users}") except Exception as e: logger.error(f"[{test_id}] Error listing all users: {str(e)}") results["operations"].append({ "name": "List all users", "success": False, "error": str(e) }) logger.info(f"[{test_id}] Database test completed successfully") return results except Exception as e: logger.error(f"[{test_id}] Database test failed: {str(e)}") results["operations"].append({ "name": "Error during tests", "success": False, "error": str(e) }) return results @router.get("/test-db-http") async def test_database_http(): """ Test endpoint to verify database operations using the HTTP API method. This is for debugging purposes only. """ operation_id = f"test_db_http_{int(time.time())}" logger.info(f"[{operation_id}] Starting HTTP API database test") results = { "connection_type": "http-api", "operations": [] } try: # Test 1: Simple SELECT logger.info(f"[{operation_id}] Test 1: Simple SELECT") result = db_http.execute_query("SELECT 1 as test", operation_id=f"{operation_id}_1") results["operations"].append({ "name": "Simple SELECT", "success": True, "result": str(result) }) logger.info(f"[{operation_id}] Test 1 result: {result}") # Test 2: Create a temporary table logger.info(f"[{operation_id}] Test 2: Create a temporary table") db_http.create_table_if_not_exists( "test_table_http", """ id INTEGER PRIMARY KEY AUTOINCREMENT, value TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP """, operation_id=f"{operation_id}_2" ) results["operations"].append({ "name": "Create temporary table", "success": True }) logger.info(f"[{operation_id}] Test 2 completed successfully") # Test 3: Insert into the temporary table logger.info(f"[{operation_id}] Test 3: Insert into the temporary table") test_value = f"test_value_http_{int(time.time())}" insert_id = db_http.insert_record( "test_table_http", {"value": test_value}, operation_id=f"{operation_id}_3" ) results["operations"].append({ "name": "Insert into temporary table", "success": insert_id is not None, "insert_id": insert_id }) logger.info(f"[{operation_id}] Test 3 completed successfully. Insert ID: {insert_id}") # Test 4: Select from the temporary table logger.info(f"[{operation_id}] Test 4: Select from the temporary table") records = db_http.select_records( "test_table_http", condition="value = ?", condition_params=[{"type": "text", "value": test_value}], limit=1, operation_id=f"{operation_id}_4" ) results["operations"].append({ "name": "Select from temporary table", "success": len(records) > 0, "result": str(records[0]) if records else None }) logger.info(f"[{operation_id}] Test 4 result: {records}") # Test 5: Update the record logger.info(f"[{operation_id}] Test 5: Update the record") updated_value = f"{test_value}_updated" success = db_http.update_record( "test_table_http", {"value": updated_value}, "id = ?", [{"type": "integer", "value": str(insert_id)}], operation_id=f"{operation_id}_5" ) results["operations"].append({ "name": "Update record", "success": success }) logger.info(f"[{operation_id}] Test 5 completed successfully") # Test 6: Verify the update logger.info(f"[{operation_id}] Test 6: Verify the update") updated_records = db_http.select_records( "test_table_http", condition="id = ?", condition_params=[{"type": "integer", "value": str(insert_id)}], limit=1, operation_id=f"{operation_id}_6" ) results["operations"].append({ "name": "Verify update", "success": len(updated_records) > 0 and updated_records[0].get("value") == updated_value, "result": str(updated_records[0]) if updated_records else None }) logger.info(f"[{operation_id}] Test 6 result: {updated_records}") # Test 7: Count records logger.info(f"[{operation_id}] Test 7: Count records") count = db_http.count_records("test_table_http", operation_id=f"{operation_id}_7") results["operations"].append({ "name": "Count records", "success": count > 0, "count": count }) logger.info(f"[{operation_id}] Test 7 result: {count}") # Test 8: List all records logger.info(f"[{operation_id}] Test 8: List all records") all_records = db_http.select_records( "test_table_http", limit=10, operation_id=f"{operation_id}_8" ) results["operations"].append({ "name": "List all records", "success": True, "count": len(all_records), "records": all_records }) logger.info(f"[{operation_id}] Test 8 result: {len(all_records)} records") logger.info(f"[{operation_id}] HTTP API database test completed successfully") return results except Exception as e: logger.error(f"[{operation_id}] HTTP API database test failed: {str(e)}") results["operations"].append({ "name": "Error during tests", "success": False, "error": str(e) }) return results