auth-server / app /api /routes /auth_router.py
kamau1's picture
Upload 12 files
f3475eb verified
from fastapi import APIRouter, HTTPException, Depends, Request, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any, List
import os
import time
import logging
import traceback
from pydantic import BaseModel, EmailStr
# Import the HTTP API database utility
from app.utils import db_http
# Configure logging
logger = logging.getLogger("auth-server")
router = APIRouter()
# Password hashing
# Using a fixed configuration to ensure consistent hash lengths
pwd_context = CryptContext(
schemes=["argon2"],
argon2__time_cost=3,
argon2__memory_cost=65536,
argon2__parallelism=4,
argon2__salt_len=16,
argon2__hash_len=32
)
# JWT settings
SECRET_KEY = os.getenv("JWT_SECRET", "supersecretkey")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login")
# Models
class UserCreate(BaseModel):
email: EmailStr
password: str
class UserResponse(BaseModel):
id: int
email: str
is_admin: bool
created_at: str
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str
class TokenData(BaseModel):
user_id: Optional[Any] = None # Can be int or str
# Helper functions
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(days=7)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
operation_id = f"token_validation_{int(time.time())}"
logger.info(f"[{operation_id}] Validating token: {token[:10]}... (length: {len(token)})")
logger.info(f"[{operation_id}] Using SECRET_KEY: {SECRET_KEY[:5]}... (length: {len(SECRET_KEY)})")
try:
logger.info(f"[{operation_id}] Attempting to decode token")
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
logger.info(f"[{operation_id}] Token decoded successfully. Payload: {payload}")
user_id = payload.get("sub")
logger.info(f"[{operation_id}] Extracted user_id from token: {user_id} (type: {type(user_id).__name__})")
if user_id is None:
logger.error(f"[{operation_id}] No user_id (sub) found in token payload")
raise credentials_exception
# Ensure user_id is an integer for database queries
user_id_int = None
# Check if user_id is a string that can be converted to int
if isinstance(user_id, str):
if user_id.isdigit():
user_id_int = int(user_id)
logger.info(f"[{operation_id}] Converted string user_id to int: {user_id_int}")
else:
# Try to handle non-numeric strings (like email addresses)
logger.warning(f"[{operation_id}] User ID is non-numeric string: {user_id}")
# If it looks like an email, try to find the user by email
if '@' in user_id:
logger.info(f"[{operation_id}] User ID appears to be an email, will try to find user by email")
# We'll handle this special case later when querying the database
elif isinstance(user_id, int):
user_id_int = user_id
logger.info(f"[{operation_id}] User ID is already an integer: {user_id_int}")
else:
logger.warning(f"[{operation_id}] User ID has unexpected type: {type(user_id).__name__}")
# Store both the original user_id and the integer version
token_data = TokenData(user_id=user_id_int if user_id_int is not None else user_id)
logger.info(f"[{operation_id}] Created TokenData with user_id: {token_data.user_id}")
except JWTError as e:
logger.error(f"[{operation_id}] JWT decode error: {str(e)}")
raise credentials_exception
# Use the HTTP API utility to get the user
operation_id = f"get_user_{int(time.time())}"
logger.info(f"[{operation_id}] Attempting to retrieve user with ID: {token_data.user_id}")
try:
# Determine if we're looking up by ID or email
is_email_lookup = isinstance(token_data.user_id, str) and '@' in token_data.user_id
user_id_for_query = token_data.user_id
logger.info(f"[{operation_id}] Query type: {'Email lookup' if is_email_lookup else 'ID lookup'}")
# First try with the app's db_conn if it exists and is not HTTP API
if hasattr(request.app, "db_conn") and getattr(request.app, "db_type", "") != "http-api":
logger.info(f"[{operation_id}] Using app.db_conn to get user")
if is_email_lookup:
query = "SELECT * FROM users WHERE email = ?"
logger.info(f"[{operation_id}] Executing query: {query} with params: ({user_id_for_query},)")
user = request.app.db_conn.execute(query, (user_id_for_query,)).fetchone()
else:
query = "SELECT * FROM users WHERE id = ?"
logger.info(f"[{operation_id}] Executing query: {query} with params: ({user_id_for_query},)")
user = request.app.db_conn.execute(query, (user_id_for_query,)).fetchone()
if user is not None:
logger.info(f"[{operation_id}] User found in database using db_conn: {user[0]}, {user[1]}")
return user
else:
logger.warning(f"[{operation_id}] User not found in database using db_conn")
# If that fails or is not available, use the HTTP API
logger.info(f"[{operation_id}] Using HTTP API to get user")
# Log database connection details
http_url, auth_token = db_http.get_http_url()
logger.info(f"[{operation_id}] Database URL: {http_url[:20]}... (length: {len(http_url)})")
logger.info(f"[{operation_id}] Auth token available: {bool(auth_token)}")
# Try to get all users first to debug
try:
all_users = db_http.select_records("users", limit=5, operation_id=f"{operation_id}_all_users")
logger.info(f"[{operation_id}] Found {len(all_users)} users in database")
for u in all_users:
logger.info(f"[{operation_id}] User in DB: ID={u.get('id')}, Email={u.get('email')}")
except Exception as e:
logger.error(f"[{operation_id}] Error getting all users: {str(e)}")
# Now try to get the specific user
user = None
if is_email_lookup:
# Look up by email
logger.info(f"[{operation_id}] Looking up user by email: {user_id_for_query}")
try:
users = db_http.select_records(
"users",
condition="email = ?",
condition_params=[{"type": "text", "value": user_id_for_query}],
limit=1,
operation_id=operation_id
)
if users and len(users) > 0:
user = users[0]
logger.info(f"[{operation_id}] User found by email: {user.get('id')}, {user.get('email')}")
except Exception as e:
logger.error(f"[{operation_id}] Error looking up user by email: {str(e)}")
else:
# Look up by ID
logger.info(f"[{operation_id}] Looking up user by ID: {user_id_for_query}")
try:
user = db_http.get_record_by_id("users", user_id_for_query, operation_id=operation_id)
except Exception as e:
logger.error(f"[{operation_id}] Error looking up user by ID: {str(e)}")
if user is None:
logger.error(f"[{operation_id}] User not found with ID: {token_data.user_id}")
# Try a direct query as a last resort
try:
logger.info(f"[{operation_id}] Trying direct query as last resort")
result = db_http.execute_query(
"SELECT * FROM users WHERE id = ? LIMIT 1",
[{"type": "integer", "value": str(token_data.user_id)}],
operation_id=f"{operation_id}_direct"
)
logger.info(f"[{operation_id}] Direct query result: {result}")
except Exception as e:
logger.error(f"[{operation_id}] Direct query failed: {str(e)}")
raise credentials_exception
logger.info(f"[{operation_id}] User found with ID: {token_data.user_id}, Email: {user.get('email')}")
# If user is a dict (from HTTP API), convert it to a tuple-like structure
# to maintain compatibility with existing code
if isinstance(user, dict):
user_tuple = (
user.get("id"),
user.get("email"),
user.get("hashed_password"),
user.get("created_at"),
user.get("last_login"),
user.get("is_admin", 0)
)
logger.info(f"[{operation_id}] Converted user dict to tuple: {user_tuple[0]}, {user_tuple[1]}")
return user_tuple
return user
except Exception as e:
logger.error(f"[{operation_id}] Error getting user: {str(e)}")
logger.error(f"[{operation_id}] Exception type: {type(e).__name__}")
logger.error(f"[{operation_id}] Exception traceback: {traceback.format_exc()}")
raise credentials_exception
# Routes
@router.post("/register", response_model=UserResponse)
async def register(user: UserCreate):
# Generate a unique identifier for this registration attempt
registration_id = f"reg_{int(time.time())}"
logger.info(f"[{registration_id}] Starting registration for email: {user.email}")
try:
# Step 1: Check if user already exists using HTTP API
logger.info(f"[{registration_id}] Checking if user already exists using HTTP API")
# Check if user exists
user_exists = db_http.record_exists(
"users",
"email = ?",
[{"type": "text", "value": user.email}],
operation_id=registration_id
)
if user_exists:
logger.warning(f"[{registration_id}] User with email {user.email} already exists")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
logger.info(f"[{registration_id}] User does not exist, proceeding with registration")
# Step 2: Hash the password
logger.info(f"[{registration_id}] Hashing password")
try:
# Use a fixed configuration to ensure consistent hash length
hashed_password = get_password_hash(user.password)
logger.info(f"[{registration_id}] Password hashed successfully, length: {len(hashed_password)}")
# Check if we need to fix the database schema
if len(hashed_password) != 97:
logger.warning(f"[{registration_id}] Password hash length ({len(hashed_password)}) doesn't match expected length (97)")
logger.warning(f"[{registration_id}] This might cause issues if there's a CHECK constraint in the database")
# Try to fix the database schema by removing the constraint
try:
# First, check if the users table exists with a simple query
db_http.execute_query(
"SELECT name FROM sqlite_master WHERE type='table' AND name='users'",
operation_id=f"{registration_id}_check_table"
)
# Create a temporary table without the constraint
db_http.execute_query(
"""
CREATE TABLE IF NOT EXISTS users_temp (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL UNIQUE,
hashed_password TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_login DATETIME,
is_admin INTEGER DEFAULT 0
)
""",
operation_id=f"{registration_id}_create_temp"
)
# Copy data from users to users_temp if users exists
try:
db_http.execute_query(
"""
INSERT INTO users_temp (id, email, hashed_password, created_at, last_login, is_admin)
SELECT id, email, hashed_password, created_at, last_login, is_admin FROM users
""",
operation_id=f"{registration_id}_copy_data"
)
except Exception as e:
logger.warning(f"[{registration_id}] Error copying data: {str(e)}")
# Drop the original users table
db_http.execute_query(
"DROP TABLE IF EXISTS users",
operation_id=f"{registration_id}_drop_users"
)
# Rename users_temp to users
db_http.execute_query(
"ALTER TABLE users_temp RENAME TO users",
operation_id=f"{registration_id}_rename_table"
)
logger.info(f"[{registration_id}] Fixed users table schema")
except Exception as e:
logger.error(f"[{registration_id}] Error fixing users table: {str(e)}")
# Continue with registration anyway
except Exception as e:
logger.error(f"[{registration_id}] Error hashing password: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error hashing password: {str(e)}"
)
# Step 3: Insert the new user using HTTP API
logger.info(f"[{registration_id}] Inserting new user using HTTP API")
# Prepare user data
user_data = {
"email": user.email,
"hashed_password": hashed_password,
"is_admin": 0 # Default to non-admin
}
# Insert the user
user_id = db_http.insert_record("users", user_data, operation_id=registration_id)
if not user_id:
logger.error(f"[{registration_id}] Failed to insert user")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create user account"
)
logger.info(f"[{registration_id}] User inserted successfully with ID: {user_id}")
# Step 4: Retrieve the full user record
logger.info(f"[{registration_id}] Retrieving user record")
# Create a default user object with what we know
default_user = {
"id": user_id or 0,
"email": user.email,
"is_admin": False,
"created_at": datetime.now().isoformat()
}
# Try to get the full user record
try:
if user_id:
new_user = db_http.get_record_by_id("users", user_id, operation_id=registration_id)
if new_user:
logger.info(f"[{registration_id}] Found user by ID: {user_id}")
return {
"id": new_user.get("id", user_id),
"email": new_user.get("email", user.email),
"is_admin": bool(new_user.get("is_admin", 0)),
"created_at": str(new_user.get("created_at", datetime.now().isoformat()))
}
# If we couldn't get by ID or user_id is None, try by email
logger.warning(f"[{registration_id}] User not found after insert, trying by email")
try:
# Try to get by email as fallback
users = db_http.select_records(
"users",
condition="email = ?",
condition_params=[{"type": "text", "value": user.email}],
limit=1,
operation_id=f"{registration_id}_email_lookup"
)
if users and len(users) > 0:
new_user = users[0]
user_id = new_user.get("id")
logger.info(f"[{registration_id}] Found user by email with ID: {user_id}")
return {
"id": new_user.get("id", user_id or 0),
"email": new_user.get("email", user.email),
"is_admin": bool(new_user.get("is_admin", 0)),
"created_at": str(new_user.get("created_at", datetime.now().isoformat()))
}
else:
logger.warning(f"[{registration_id}] User not found by email either")
except Exception as e:
logger.error(f"[{registration_id}] Error finding user by email: {str(e)}")
# If all else fails, return the default user object
logger.info(f"[{registration_id}] Returning default user object")
return default_user
except Exception as e:
logger.error(f"[{registration_id}] Error retrieving user record: {str(e)}")
# Return minimal response with what we know
return default_user
except Exception as e:
logger.error(f"[{registration_id}] Registration failed: {str(e)}")
if isinstance(e, HTTPException):
raise
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Registration failed: {str(e)}"
)
@router.post("/login", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends()
):
operation_id = f"login_{int(time.time())}"
logger.info(f"[{operation_id}] Login attempt for email: {form_data.username}")
logger.info(f"[{operation_id}] Password length: {len(form_data.password)}")
try:
# Log database connection details
http_url, auth_token = db_http.get_http_url()
logger.info(f"[{operation_id}] Database URL: {http_url[:20]}... (length: {len(http_url)})")
logger.info(f"[{operation_id}] Auth token available: {bool(auth_token)}")
# Find the user using HTTP API
logger.info(f"[{operation_id}] Searching for user with email: {form_data.username}")
users = db_http.select_records(
"users",
condition="email = ?",
condition_params=[{"type": "text", "value": form_data.username}],
limit=1,
operation_id=operation_id
)
logger.info(f"[{operation_id}] Query returned {len(users) if users else 0} users")
if not users:
logger.warning(f"[{operation_id}] User not found with email: {form_data.username}")
# Try to list all users for debugging
try:
all_users = db_http.select_records("users", limit=5, operation_id=f"{operation_id}_all_users")
logger.info(f"[{operation_id}] Found {len(all_users)} users in database")
for u in all_users:
logger.info(f"[{operation_id}] User in DB: ID={u.get('id')}, Email={u.get('email')}")
except Exception as e:
logger.error(f"[{operation_id}] Error getting all users: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
user = users[0]
logger.info(f"[{operation_id}] Found user: ID={user.get('id')}, Email={user.get('email')}")
# Log password hash details (safely)
hashed_pwd = user.get("hashed_password", "")
logger.info(f"[{operation_id}] Stored password hash length: {len(hashed_pwd)}")
logger.info(f"[{operation_id}] Stored password hash prefix: {hashed_pwd[:10]}...")
# Verify password
logger.info(f"[{operation_id}] Verifying password")
password_valid = verify_password(form_data.password, hashed_pwd)
logger.info(f"[{operation_id}] Password verification result: {password_valid}")
if not password_valid:
logger.warning(f"[{operation_id}] Invalid password for user: {form_data.username}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
logger.info(f"[{operation_id}] Password verified for user ID: {user.get('id')}")
# Update last login
update_data = {
"last_login": datetime.now().isoformat()
}
logger.info(f"[{operation_id}] Updating last login timestamp")
update_success = db_http.update_record(
"users",
update_data,
"id = ?",
[{"type": "integer", "value": str(user.get("id"))}],
operation_id=operation_id
)
logger.info(f"[{operation_id}] Last login update result: {update_success}")
# Create tokens
logger.info(f"[{operation_id}] Creating access token with user ID: {user.get('id')}")
# Ensure user ID is an integer and convert to string for JWT
user_id = user.get("id")
if isinstance(user_id, int):
user_id_str = str(user_id)
else:
# If it's already a string or some other type, convert to string
user_id_str = str(user_id)
logger.info(f"[{operation_id}] User ID for token: {user_id_str} (type: {type(user_id_str).__name__})")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user_id_str},
expires_delta=access_token_expires
)
logger.info(f"[{operation_id}] Access token created, length: {len(access_token)}")
logger.info(f"[{operation_id}] Creating refresh token")
refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
refresh_token = create_refresh_token(
data={"sub": user_id_str},
expires_delta=refresh_token_expires
)
logger.info(f"[{operation_id}] Refresh token created, length: {len(refresh_token)}")
# Verify token can be decoded (sanity check)
try:
logger.info(f"[{operation_id}] Verifying access token can be decoded")
payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM])
logger.info(f"[{operation_id}] Token verification successful. Payload: {payload}")
except Exception as e:
logger.error(f"[{operation_id}] Token verification failed: {str(e)}")
# Continue anyway since we just created the token
logger.info(f"[{operation_id}] Login successful for user ID: {user.get('id')}")
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
except Exception as e:
if isinstance(e, HTTPException):
raise
logger.error(f"[{operation_id}] Login failed: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Login failed: {str(e)}"
)
@router.post("/refresh", response_model=Token)
async def refresh_token(refresh_token_str: str):
operation_id = f"refresh_{int(time.time())}"
logger.info(f"[{operation_id}] Token refresh attempt")
try:
# Decode and validate the refresh token
try:
payload = jwt.decode(refresh_token_str, SECRET_KEY, algorithms=[ALGORITHM])
user_id: int = payload.get("sub")
if user_id is None:
logger.warning(f"[{operation_id}] Missing user ID in token payload")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
headers={"WWW-Authenticate": "Bearer"},
)
except JWTError as e:
logger.warning(f"[{operation_id}] JWT decode error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
headers={"WWW-Authenticate": "Bearer"},
)
logger.info(f"[{operation_id}] Token decoded successfully for user ID: {user_id}")
# Check if user exists using HTTP API
user = db_http.get_record_by_id("users", user_id, operation_id=operation_id)
if not user:
logger.warning(f"[{operation_id}] User not found with ID: {user_id}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
logger.info(f"[{operation_id}] User found with ID: {user_id}")
# Create new tokens
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user_id},
expires_delta=access_token_expires
)
refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
new_refresh_token = create_refresh_token(
data={"sub": user_id},
expires_delta=refresh_token_expires
)
logger.info(f"[{operation_id}] Token refresh successful for user ID: {user_id}")
return {
"access_token": access_token,
"refresh_token": new_refresh_token,
"token_type": "bearer"
}
except Exception as e:
if isinstance(e, HTTPException):
raise
logger.error(f"[{operation_id}] Token refresh failed: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Token refresh failed: {str(e)}"
)
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(current_user = Depends(get_current_user)):
# Handle both tuple (from db_conn) and dict (from HTTP API) formats
if isinstance(current_user, dict):
return {
"id": current_user.get("id"),
"email": current_user.get("email"),
"is_admin": bool(current_user.get("is_admin", 0)),
"created_at": str(current_user.get("created_at", ""))
}
else:
# Assume tuple format from the original implementation
return {
"id": current_user[0],
"email": current_user[1],
"is_admin": bool(current_user[5] if current_user[5] is not None else 0),
"created_at": str(current_user[3] if current_user[3] is not None else "")
}
@router.get("/test-db")
async def test_database(request: Request):
"""
Test endpoint to verify database operations using the original method.
This is for debugging purposes only.
"""
import logging
import time
logger = logging.getLogger("auth-server")
# Generate a unique test ID
test_id = f"test_{int(time.time())}"
logger.info(f"[{test_id}] Starting database test")
results = {
"connection_type": getattr(request.app, "db_type", "unknown"),
"connection_object_type": type(request.app.db_conn).__name__,
"operations": []
}
if hasattr(request.app, "last_successful_connection_method"):
results["connection_method"] = request.app.last_successful_connection_method
try:
# Test 1: Simple SELECT
logger.info(f"[{test_id}] Test 1: Simple SELECT")
test_query = "SELECT 1 as test"
result = request.app.db_conn.execute(test_query).fetchone()
results["operations"].append({
"name": "Simple SELECT",
"success": result is not None,
"result": str(result) if result is not None else None
})
logger.info(f"[{test_id}] Test 1 result: {result}")
# Test 2: Create a temporary table
logger.info(f"[{test_id}] Test 2: Create a temporary table")
create_temp_table = """
CREATE TABLE IF NOT EXISTS test_table (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
request.app.db_conn.execute(create_temp_table)
request.app.db_conn.commit()
results["operations"].append({
"name": "Create temporary table",
"success": True
})
logger.info(f"[{test_id}] Test 2 completed successfully")
# Test 3: Insert into the temporary table
logger.info(f"[{test_id}] Test 3: Insert into the temporary table")
test_name = f"test_user_{int(time.time())}"
insert_query = "INSERT INTO test_table (name) VALUES (?)"
cursor = request.app.db_conn.execute(insert_query, (test_name,))
request.app.db_conn.commit()
# Check if lastrowid is available
last_id = None
try:
last_id = cursor.lastrowid
logger.info(f"[{test_id}] Got lastrowid: {last_id}")
except Exception as e:
logger.warning(f"[{test_id}] Could not get lastrowid: {str(e)}")
# Try to get the ID using a query
try:
id_query = "SELECT id FROM test_table WHERE name = ? ORDER BY id DESC LIMIT 1"
id_result = request.app.db_conn.execute(id_query, (test_name,)).fetchone()
if id_result:
last_id = id_result[0]
logger.info(f"[{test_id}] Got ID from query: {last_id}")
except Exception as e2:
logger.error(f"[{test_id}] Error getting ID from query: {str(e2)}")
results["operations"].append({
"name": "Insert into temporary table",
"success": True,
"last_id": last_id
})
logger.info(f"[{test_id}] Test 3 completed successfully. Last ID: {last_id}")
# Test 4: Select from the temporary table
logger.info(f"[{test_id}] Test 4: Select from the temporary table")
select_query = "SELECT * FROM test_table WHERE name = ?"
result = request.app.db_conn.execute(select_query, (test_name,)).fetchone()
results["operations"].append({
"name": "Select from temporary table",
"success": result is not None,
"result": str(result) if result is not None else None
})
logger.info(f"[{test_id}] Test 4 result: {result}")
# Test 5: Check if users table exists and has the expected structure
logger.info(f"[{test_id}] Test 5: Check users table structure")
try:
table_info = request.app.db_conn.execute("PRAGMA table_info(users)").fetchall()
results["operations"].append({
"name": "Check users table structure",
"success": len(table_info) > 0,
"columns": [col[1] for col in table_info] if table_info else []
})
logger.info(f"[{test_id}] Test 5 result: {table_info}")
except Exception as e:
logger.error(f"[{test_id}] Error checking users table structure: {str(e)}")
results["operations"].append({
"name": "Check users table structure",
"success": False,
"error": str(e)
})
# Test 6: Create a specific test user ([email protected])
logger.info(f"[{test_id}] Test 6: Create specific test user ([email protected])")
try:
# First check if the test user already exists
check_query = "SELECT id FROM users WHERE email = ?"
existing_user = request.app.db_conn.execute(check_query, ("[email protected]",)).fetchone()
if existing_user:
logger.info(f"[{test_id}] Test user already exists with ID: {existing_user[0]}")
results["operations"].append({
"name": "Create specific test user",
"success": True,
"user_id": existing_user[0],
"status": "already_exists"
})
else:
# Hash a test password
test_password = "TestPassword123!"
hashed_password = get_password_hash(test_password)
# Insert the test user using the approach from turso_libsql_only.py
logger.info(f"[{test_id}] Inserting test user with email: [email protected]")
# Use a transaction-like approach
try:
# Insert the user
insert_query = "INSERT INTO users (email, hashed_password) VALUES (?, ?)"
cursor = request.app.db_conn.execute(insert_query, ("[email protected]", hashed_password))
# Commit immediately
request.app.db_conn.commit()
logger.info(f"[{test_id}] Committed test user insert")
# Try to get the user ID
user_id = None
try:
user_id = cursor.lastrowid
logger.info(f"[{test_id}] Got test user lastrowid: {user_id}")
except Exception as e:
logger.warning(f"[{test_id}] Could not get test user lastrowid: {str(e)}")
# Verify the insert with a separate query
verify_query = "SELECT id, email, created_at FROM users WHERE email = ?"
verify_result = request.app.db_conn.execute(verify_query, ("[email protected]",)).fetchone()
if verify_result:
user_id = verify_result[0]
logger.info(f"[{test_id}] Verified test user with ID: {user_id}")
results["operations"].append({
"name": "Create specific test user",
"success": True,
"user_id": user_id,
"user_data": {
"id": verify_result[0],
"email": verify_result[1],
"created_at": verify_result[2] if len(verify_result) > 2 else None
}
})
else:
logger.error(f"[{test_id}] Failed to verify test user after insert")
results["operations"].append({
"name": "Create specific test user",
"success": False,
"error": "Failed to verify user after insert"
})
except Exception as e:
logger.error(f"[{test_id}] Error during test user insert: {str(e)}")
results["operations"].append({
"name": "Create specific test user",
"success": False,
"error": str(e)
})
except Exception as e:
logger.error(f"[{test_id}] Error creating specific test user: {str(e)}")
results["operations"].append({
"name": "Create specific test user",
"success": False,
"error": str(e)
})
# Test 7: List all users
logger.info(f"[{test_id}] Test 7: List all users")
try:
all_users = request.app.db_conn.execute("SELECT id, email FROM users").fetchall()
results["operations"].append({
"name": "List all users",
"success": True,
"count": len(all_users) if all_users else 0,
"users": [{"id": user[0], "email": user[1]} for user in all_users] if all_users else []
})
logger.info(f"[{test_id}] Test 7 result: {all_users}")
except Exception as e:
logger.error(f"[{test_id}] Error listing all users: {str(e)}")
results["operations"].append({
"name": "List all users",
"success": False,
"error": str(e)
})
logger.info(f"[{test_id}] Database test completed successfully")
return results
except Exception as e:
logger.error(f"[{test_id}] Database test failed: {str(e)}")
results["operations"].append({
"name": "Error during tests",
"success": False,
"error": str(e)
})
return results
@router.get("/test-db-http")
async def test_database_http():
"""
Test endpoint to verify database operations using the HTTP API method.
This is for debugging purposes only.
"""
operation_id = f"test_db_http_{int(time.time())}"
logger.info(f"[{operation_id}] Starting HTTP API database test")
results = {
"connection_type": "http-api",
"operations": []
}
try:
# Test 1: Simple SELECT
logger.info(f"[{operation_id}] Test 1: Simple SELECT")
result = db_http.execute_query("SELECT 1 as test", operation_id=f"{operation_id}_1")
results["operations"].append({
"name": "Simple SELECT",
"success": True,
"result": str(result)
})
logger.info(f"[{operation_id}] Test 1 result: {result}")
# Test 2: Create a temporary table
logger.info(f"[{operation_id}] Test 2: Create a temporary table")
db_http.create_table_if_not_exists(
"test_table_http",
"""
id INTEGER PRIMARY KEY AUTOINCREMENT,
value TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
""",
operation_id=f"{operation_id}_2"
)
results["operations"].append({
"name": "Create temporary table",
"success": True
})
logger.info(f"[{operation_id}] Test 2 completed successfully")
# Test 3: Insert into the temporary table
logger.info(f"[{operation_id}] Test 3: Insert into the temporary table")
test_value = f"test_value_http_{int(time.time())}"
insert_id = db_http.insert_record(
"test_table_http",
{"value": test_value},
operation_id=f"{operation_id}_3"
)
results["operations"].append({
"name": "Insert into temporary table",
"success": insert_id is not None,
"insert_id": insert_id
})
logger.info(f"[{operation_id}] Test 3 completed successfully. Insert ID: {insert_id}")
# Test 4: Select from the temporary table
logger.info(f"[{operation_id}] Test 4: Select from the temporary table")
records = db_http.select_records(
"test_table_http",
condition="value = ?",
condition_params=[{"type": "text", "value": test_value}],
limit=1,
operation_id=f"{operation_id}_4"
)
results["operations"].append({
"name": "Select from temporary table",
"success": len(records) > 0,
"result": str(records[0]) if records else None
})
logger.info(f"[{operation_id}] Test 4 result: {records}")
# Test 5: Update the record
logger.info(f"[{operation_id}] Test 5: Update the record")
updated_value = f"{test_value}_updated"
success = db_http.update_record(
"test_table_http",
{"value": updated_value},
"id = ?",
[{"type": "integer", "value": str(insert_id)}],
operation_id=f"{operation_id}_5"
)
results["operations"].append({
"name": "Update record",
"success": success
})
logger.info(f"[{operation_id}] Test 5 completed successfully")
# Test 6: Verify the update
logger.info(f"[{operation_id}] Test 6: Verify the update")
updated_records = db_http.select_records(
"test_table_http",
condition="id = ?",
condition_params=[{"type": "integer", "value": str(insert_id)}],
limit=1,
operation_id=f"{operation_id}_6"
)
results["operations"].append({
"name": "Verify update",
"success": len(updated_records) > 0 and updated_records[0].get("value") == updated_value,
"result": str(updated_records[0]) if updated_records else None
})
logger.info(f"[{operation_id}] Test 6 result: {updated_records}")
# Test 7: Count records
logger.info(f"[{operation_id}] Test 7: Count records")
count = db_http.count_records("test_table_http", operation_id=f"{operation_id}_7")
results["operations"].append({
"name": "Count records",
"success": count > 0,
"count": count
})
logger.info(f"[{operation_id}] Test 7 result: {count}")
# Test 8: List all records
logger.info(f"[{operation_id}] Test 8: List all records")
all_records = db_http.select_records(
"test_table_http",
limit=10,
operation_id=f"{operation_id}_8"
)
results["operations"].append({
"name": "List all records",
"success": True,
"count": len(all_records),
"records": all_records
})
logger.info(f"[{operation_id}] Test 8 result: {len(all_records)} records")
logger.info(f"[{operation_id}] HTTP API database test completed successfully")
return results
except Exception as e:
logger.error(f"[{operation_id}] HTTP API database test failed: {str(e)}")
results["operations"].append({
"name": "Error during tests",
"success": False,
"error": str(e)
})
return results