auth-server / main.py
kamau1's picture
Upload 10 files
20dd904 verified
raw
history blame
38 kB
import os
import sys
import logging
import requests
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from fastapi.openapi.utils import get_openapi
from fastapi.staticfiles import StaticFiles
import uvicorn
from dotenv import load_dotenv
# Configure logging: INFO and above, timestamped, written to stdout.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-wide logger used by every block in this file.
logger = logging.getLogger("auth-server")
# Import database libraries.
# Preference order: libsql-experimental, then libsql-client, then plain HTTP
# via `requests`. HAS_LIBSQL / LIBSQL_TYPE record which one is available.
try:
    # First try the recommended libsql-experimental package
    import libsql_experimental as libsql
except ImportError:
    try:
        # Then try the libsql-client package as fallback
        import libsql_client
    except ImportError:
        logger.error("Failed to import any libsql package. Please install libsql-experimental==0.0.49")
        logger.error("Falling back to HTTP API method for database access")
        # We'll use requests for HTTP API fallback
        import requests
        HAS_LIBSQL = False
        LIBSQL_TYPE = "http"
    else:
        logger.info("Successfully imported libsql-client package")
        HAS_LIBSQL = True
        LIBSQL_TYPE = "client"
else:
    logger.info("Successfully imported libsql-experimental package")
    HAS_LIBSQL = True
    LIBSQL_TYPE = "experimental"
# Load environment variables
load_dotenv()

# Markdown shown at the top of the generated API documentation.
_API_DESCRIPTION = """
# Seamo Authentication API
The Seamo Auth Server provides authentication and user management services for the Seamo platform.
## Features
* User registration and authentication
* Project management
* Journal management
* Access control
## Authentication
Most endpoints require authentication. Use the /api/auth/token endpoint to obtain access tokens.
"""

# (tag name, tag description) pairs, in the order they appear in the docs.
_TAG_DESCRIPTIONS = (
    ("Authentication", "Operations related to user authentication and token management"),
    ("Projects", "Operations related to project management"),
    ("Journals", "Operations related to journal management"),
    ("General", "General server information and health checks"),
)

# Create FastAPI app with detailed metadata.
# The built-in /docs and /redoc routes are switched off here; this file
# registers its own documentation routes further down.
app = FastAPI(
    title="Seamo Auth Server",
    description=_API_DESCRIPTION,
    version="1.0.0",
    contact={
        "name": "Seamo Team",
        "url": "https://seamo.earth/contact",
        # NOTE(review): this literal looks like an email-obfuscation artifact
        # rather than a real address - confirm the intended contact email.
        "email": "[email protected]",
    },
    license_info={
        "name": "MIT",
        "url": "https://opensource.org/licenses/MIT",
    },
    openapi_tags=[
        {"name": tag_name, "description": tag_description}
        for tag_name, tag_description in _TAG_DESCRIPTIONS
    ],
    docs_url=None,  # Disable default docs
    redoc_url=None,  # Disable default redoc
)
# Configure CORS
# Browsers send the Origin header without a trailing slash and the middleware
# compares origins as exact strings, so normalise the configured frontend URL.
_frontend_url = os.getenv("FRONTEND_URL", "http://localhost:3000").strip().rstrip("/")

# dict.fromkeys() removes duplicates (the FRONTEND_URL default repeats the
# local development entry) while keeping first-seen order; empty values are
# dropped so a blank FRONTEND_URL does not add a meaningless origin.
origins = list(dict.fromkeys(
    origin
    for origin in [
        _frontend_url,
        "https://seamo.earth",
        "https://seamoo.netlify.app",
        "https://seamo-ai-ai-server.hf.space",
        "https://seamo-ai-auth-server.hf.space",
        "https://seamo-ai-scraper-server.hf.space",
        "http://localhost:3000",  # For local development
        "http://localhost:8000",  # Local AI server
        "http://localhost:8001",  # Local Auth server
        "http://localhost:8002",  # Local Scraper server
    ]
    if origin
))
# Use the configured logger instead of print so the line carries the same
# timestamp/level format as the rest of the server output.
logger.info(f"CORS origins configured: {origins}")
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Database connection
def _decode_hrana_value(cell):
    """Convert one typed cell of a pipeline response row into a Python value.

    Cells in the `/v2/pipeline` response are objects such as
    ``{"type": "integer", "value": "1"}``. Anything that is not shaped like
    that is returned unchanged. Blob cells are returned as their base64 text.
    """
    if not isinstance(cell, dict) or "type" not in cell:
        return cell
    kind = cell["type"]
    if kind == "null":
        return None
    if kind == "integer":
        # Integers travel as strings to avoid precision loss in JSON.
        return int(cell["value"])
    if kind == "float":
        return float(cell["value"])
    return cell.get("value", cell.get("base64"))


def _row_values(row):
    """Return the list of column values for one response row.

    Accepts both the ``{"values": [...]}`` shape this client originally
    expected and the list-of-typed-cells shape of the pipeline API.
    """
    if isinstance(row, dict) and "values" in row:
        return row["values"]
    return [_decode_hrana_value(cell) for cell in row]


def _unwrap_pipeline_result(entry):
    """Extract the statement result from one entry of ``results``.

    Raises:
        Exception: if the entry reports ``"type": "error"``, so failed
            statements surface instead of looking like empty result sets.
    """
    if not isinstance(entry, dict):
        return entry
    if entry.get("type") == "error":
        detail = entry.get("error")
        message = detail.get("message", detail) if isinstance(detail, dict) else detail
        raise Exception(f"Database statement failed: {message}")
    response = entry.get("response")
    if isinstance(response, dict) and isinstance(response.get("result"), dict):
        return response["result"]
    # Already a bare result object (or a non-execute response such as close).
    return entry


# Create a cursor-like class for HTTP API
class TursoHttpCursor:
    """Minimal cursor-like view over one statement result of the HTTP API."""

    def __init__(self, result):
        self.result = result
        self.lastrowid = None

    def fetchone(self):
        """Return the first row as a list of values, or None when there is none."""
        if self.result and "rows" in self.result and len(self.result["rows"]) > 0:
            return _row_values(self.result["rows"][0])
        return None

    def fetchall(self):
        """Return every row as a list of value lists (empty when there are none)."""
        if self.result and "rows" in self.result:
            return [_row_values(row) for row in self.result["rows"]]
        return []


# Create a simple HTTP API client class
class TursoHttpClient:
    """Connection-like wrapper that runs SQL through the `/v2/pipeline` endpoint.

    Exposes ``execute``, ``commit`` and ``close`` so it can stand in for the
    libsql connection objects used elsewhere in this file.
    """

    def __init__(self, url, auth_token):
        self.url = url
        self.auth_token = auth_token
        self.headers = {
            "Authorization": f"Bearer {auth_token}",
            "Content-Type": "application/json"
        }
        # Add a property to track the last inserted ID
        self.last_insert_id = None

    @staticmethod
    def _encode_param(param):
        """Encode one positional SQL parameter as a typed pipeline argument."""
        if param is None:
            return {"type": "null", "value": None}
        if isinstance(param, bool):
            # bool is a subclass of int; str(True) would yield "True", which
            # is not a valid integer literal.
            return {"type": "integer", "value": str(int(param))}
        if isinstance(param, int):
            return {"type": "integer", "value": str(param)}
        if isinstance(param, float):
            # Float values are sent as JSON numbers, unlike integers.
            return {"type": "float", "value": param}
        return {"type": "text", "value": str(param)}

    def execute(self, query, params=None):
        """Run one SQL statement and return a TursoHttpCursor for its result.

        Args:
            query: SQL text, optionally with ``?`` placeholders.
            params: Optional sequence of positional parameter values.

        Raises:
            requests.exceptions.RequestException: on transport/HTTP errors.
            Exception: on a 401 response or when the statement itself fails.
        """
        # Format the request according to the v2/pipeline specification
        stmt = {"sql": query}
        if params:
            stmt["args"] = [self._encode_param(param) for param in params]
        requests_data = [{"type": "execute", "stmt": stmt}]
        # If this is an INSERT, add a query to get the last inserted ID
        is_insert = query.strip().upper().startswith("INSERT")
        if is_insert:
            requests_data.append({
                "type": "execute",
                "stmt": {"sql": "SELECT last_insert_rowid()"}
            })
        # Always close the connection at the end
        requests_data.append({"type": "close"})
        data = {"requests": requests_data}
        # Use the v2/pipeline endpoint
        pipeline_url = f"{self.url}/v2/pipeline"
        logger.info(f"Sending request to: {pipeline_url}")
        logger.info(f"Headers: Authorization: Bearer {self.auth_token[:5]}... (truncated)")
        try:
            response = requests.post(pipeline_url, headers=self.headers, json=data, timeout=10)
            logger.info(f"Response status: {response.status_code}")
            # Check for auth errors specifically
            if response.status_code == 401:
                logger.error(f"Authentication error (401): {response.text}")
                raise Exception(f"Authentication failed: {response.text}")
            # Raise for other errors
            response.raise_for_status()
            result = response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP request failed: {str(e)}")
            raise
        # Process the response
        if "results" in result and len(result["results"]) > 0:
            # Unwrap first so a failed main statement raises immediately.
            main_result = _unwrap_pipeline_result(result["results"][0])
            # If this was an INSERT, get the last inserted ID
            if is_insert and len(result["results"]) > 1:
                try:
                    last_id_result = _unwrap_pipeline_result(result["results"][1])
                    if "rows" in last_id_result and len(last_id_result["rows"]) > 0:
                        self.last_insert_id = _row_values(last_id_result["rows"][0])[0]
                        logger.info(f"Last inserted ID: {self.last_insert_id}")
                except Exception as e:
                    logger.warning(f"Failed to get last inserted ID: {str(e)}")
            # Return a cursor-like object with the main result
            cursor = TursoHttpCursor(main_result)
            cursor.lastrowid = self.last_insert_id
            return cursor
        # Return an empty cursor
        cursor = TursoHttpCursor(None)
        cursor.lastrowid = self.last_insert_id
        return cursor

    def commit(self):
        """No-op: each HTTP request is self-contained."""
        logger.info("HTTP API commit called (no-op)")

    def close(self):
        """No-op: there is no persistent connection to release."""
        logger.info("HTTP API close called (no-op)")


def _run_libsql_smoke_test(conn, success_label, test_value):
    """Exercise a libsql connection with SELECT, CREATE TABLE, INSERT and SELECT.

    Any exception propagates to the caller, which treats it as a failed
    connection attempt.
    NOTE(review): this inserts one row into `connection_test` on every
    startup and logs the whole table, so both grow without bound.
    """
    logger.info("Testing connection with SELECT 1")
    result = conn.execute("SELECT 1").fetchone()
    logger.info(f"Connection successful with {success_label}: {result}")
    logger.info("Testing connection with CREATE TABLE IF NOT EXISTS")
    conn.execute("""
    CREATE TABLE IF NOT EXISTS connection_test (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    test_value TEXT
    )
    """)
    conn.commit()
    logger.info("Testing connection with INSERT")
    conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", (test_value,))
    conn.commit()
    logger.info("Testing connection with SELECT from test table")
    result = conn.execute("SELECT * FROM connection_test").fetchall()
    logger.info(f"Test table contents: {result}")


@app.on_event("startup")
async def startup_db_client():
    """Connect to the Turso database and make sure the core tables exist.

    Reads TURSO_DATABASE_URL and TURSO_AUTH_TOKEN, then tries, in order:
    libsql-experimental (auth_token argument, then token in the URL),
    libsql-client, and finally the HTTP pipeline API. The first method that
    passes its smoke test is stored on ``app.db_conn`` / ``app.db_type``.

    Raises:
        Exception: if credentials are missing or every method fails.
            Table-creation failures are logged but do not abort startup.
    """
    import time  # local import: `time` is not imported at module level

    # Get environment variables
    db_url = os.getenv("TURSO_DATABASE_URL")
    auth_token = os.getenv("TURSO_AUTH_TOKEN")
    # Clean the auth token to remove any problematic characters; a
    # whitespace-only token counts as missing.
    clean_auth_token = (auth_token or "").strip()
    if not db_url or not clean_auth_token:
        error_msg = "Missing Turso credentials. TURSO_DATABASE_URL and TURSO_AUTH_TOKEN must be set."
        logger.error(error_msg)
        raise Exception(error_msg)
    # Log connection details (without showing the full token)
    token_preview = clean_auth_token[:10] + "..." if len(clean_auth_token) > 10 else "***"
    logger.info(f"Connecting to database at URL: {db_url}")
    logger.info(f"Using libsql type: {LIBSQL_TYPE}")
    logger.info(f"Auth token preview: {token_preview}")
    # Ensure the token is properly formatted
    if not clean_auth_token.startswith("eyJ"):
        logger.warning("Auth token does not appear to be in JWT format (should start with 'eyJ')")
        # Try to extract the token if it's wrapped in quotes or has extra characters
        if "eyJ" in clean_auth_token:
            start_idx = clean_auth_token.find("eyJ")
            # Find the end of the token (usually at a quote, space, or newline)
            end_markers = ['"', "'", ' ', '\n', '\r']
            end_idx = len(clean_auth_token)
            for marker in end_markers:
                marker_idx = clean_auth_token.find(marker, start_idx)
                if marker_idx > start_idx and marker_idx < end_idx:
                    end_idx = marker_idx
            clean_auth_token = clean_auth_token[start_idx:end_idx]
            logger.info(f"Extracted JWT token: {clean_auth_token[:10]}...")
    # Verify the token has three parts (header.payload.signature)
    parts = clean_auth_token.split('.')
    if len(parts) != 3:
        logger.warning(f"Auth token does not have the expected JWT format (3 parts separated by dots). Found {len(parts)} parts.")
    else:
        logger.info("Auth token has the expected JWT format")

    def mask_token(text):
        """Hide the auth token inside *text* so it never reaches the logs."""
        return text.replace(clean_auth_token, "***") if clean_auth_token else text

    # Initialize database connection
    connected = False
    # Method 1: Try with libsql-experimental
    if HAS_LIBSQL and LIBSQL_TYPE == "experimental":
        logger.info("Connecting with libsql-experimental")
        # Method 1a: Try with auth_token parameter (works with version 0.0.49)
        try:
            logger.info("Trying connection with auth_token parameter")
            if hasattr(libsql, '__version__'):
                logger.info(f"libsql-experimental version: {libsql.__version__}")
            app.db_conn = libsql.connect(db_url, auth_token=clean_auth_token)
            _run_libsql_smoke_test(app.db_conn, "auth_token parameter", "test_value")
            connected = True
            app.db_type = "libsql-experimental"
            app.last_successful_connection_method = "auth_token"
            logger.info("All connection tests passed successfully")
        except Exception as e:
            logger.warning(f"Connection with auth_token parameter failed: {str(e)}")
        # Method 1b: Try with auth token in URL (works with other versions)
        if not connected:
            try:
                logger.info("Trying connection with auth token in URL")
                separator = "&" if "?" in db_url else "?"
                connection_url = f"{db_url}{separator}authToken={clean_auth_token}"
                # The URL embeds the credential, so only a masked copy is logged.
                logger.info(f"Using connection URL: {mask_token(connection_url)}")
                app.db_conn = libsql.connect(connection_url)
                app.last_successful_connection_url = connection_url
                _run_libsql_smoke_test(app.db_conn, "auth token in URL", "test_value_url")
                connected = True
                app.db_type = "libsql-experimental"
                app.last_successful_connection_method = "url"
                logger.info("All connection tests passed successfully")
            except Exception as e:
                logger.error(f"Connection with auth token in URL failed: {str(e)}")
    # Method 2: Try with libsql-client
    if not connected and HAS_LIBSQL and LIBSQL_TYPE == "client":
        try:
            logger.info("Connecting with libsql-client")
            # Convert URL from libsql:// to https://
            if db_url.startswith("libsql://"):
                http_url = db_url.replace("libsql://", "https://")
            else:
                http_url = db_url
            logger.info(f"Using URL: {http_url}")
            app.db_conn = libsql_client.create_client_sync(
                url=http_url,
                auth_token=clean_auth_token
            )
            # Test connection. `rows` is read as an attribute and only called
            # if it turns out to be callable, so either API shape works.
            result = app.db_conn.execute("SELECT 1").rows
            if callable(result):
                result = result()
            logger.info(f"Connection test successful: {result}")
            connected = True
            app.db_type = "libsql-client"
        except Exception as e:
            logger.error(f"libsql-client connection failed: {str(e)}")
    # Method 3: Fallback to HTTP API
    if not connected:
        try:
            logger.info("Falling back to HTTP API method")
            # Convert URL from libsql:// to https://
            if db_url.startswith("libsql://"):
                http_url = db_url.replace("libsql://", "https://")
            else:
                http_url = db_url
            # Ensure the URL doesn't have a trailing slash
            http_url = http_url.rstrip('/')
            # Verify the URL format
            if not http_url.startswith("https://"):
                logger.warning(f"HTTP URL does not start with https://: {http_url}")
                # Try to fix the URL
                if "://" not in http_url:
                    http_url = f"https://{http_url}"
                    logger.info(f"Added https:// prefix to URL: {http_url}")
            logger.info(f"Using HTTP URL: {http_url}")
            # Create the HTTP API client
            app.db_conn = TursoHttpClient(http_url, clean_auth_token)
            logger.info("Testing HTTP API connection with SELECT 1")
            result = app.db_conn.execute("SELECT 1").fetchone()
            logger.info(f"HTTP API connection test successful: {result}")
            logger.info("Testing HTTP API connection with CREATE TABLE IF NOT EXISTS")
            app.db_conn.execute("""
            CREATE TABLE IF NOT EXISTS connection_test (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            test_value TEXT
            )
            """)
            app.db_conn.commit()
            logger.info("Testing HTTP API connection with INSERT")
            cursor = app.db_conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", ("test_value_http",))
            app.db_conn.commit()
            logger.info(f"HTTP API insert test - lastrowid: {cursor.lastrowid}")
            logger.info("Testing HTTP API connection with SELECT from test table")
            result = app.db_conn.execute("SELECT * FROM connection_test").fetchall()
            logger.info(f"HTTP API test table contents: {result}")
            connected = True
            app.db_type = "http-api"
            logger.info("All HTTP API connection tests passed successfully")
        except Exception as e:
            logger.error(f"HTTP API connection failed: {str(e)}")
    if not connected:
        error_msg = "All database connection methods failed. Please check your credentials and try again."
        logger.error(error_msg)
        raise Exception(error_msg)
    # Create tables if they don't exist
    logger.info("Creating database tables")

    # Function to execute SQL with retry
    def execute_with_retry(sql, max_retries=3):
        """Execute and commit *sql*, retrying up to *max_retries* times.

        Before each retry the connection is probed with SELECT 1 and, for
        libsql-experimental only, re-created with the method that worked at
        startup. The last failure is re-raised.
        """
        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    logger.info(f"Retry attempt {attempt+1} for SQL execution")
                    # Test the connection first
                    try:
                        test_result = app.db_conn.execute("SELECT 1").fetchone()
                        logger.info(f"Connection test successful: {test_result}")
                    except Exception as conn_err:
                        logger.warning(f"Connection test failed, reconnecting: {str(conn_err)}")
                        # Reconnect using the same method that worked initially
                        if app.db_type == "libsql-experimental":
                            if hasattr(app, 'last_successful_connection_method') and app.last_successful_connection_method == 'auth_token':
                                logger.info("Reconnecting with auth_token parameter")
                                app.db_conn = libsql.connect(db_url, auth_token=clean_auth_token)
                            elif hasattr(app, 'last_successful_connection_url'):
                                # Masked: the stored URL contains the token.
                                logger.info(f"Reconnecting with URL: {mask_token(app.last_successful_connection_url)}")
                                app.db_conn = libsql.connect(app.last_successful_connection_url)
                # Execute the SQL
                app.db_conn.execute(sql)
                app.db_conn.commit()
                return True
            except Exception as e:
                logger.warning(f"SQL execution failed (attempt {attempt+1}): {str(e)}")
                if attempt == max_retries - 1:
                    raise
                # Small delay before retry
                time.sleep(1)

    # Simplified table creation with fewer constraints
    try:
        # Create users table
        users_table = """
        CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        email TEXT NOT NULL UNIQUE,
        hashed_password TEXT NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        last_login DATETIME,
        is_admin INTEGER DEFAULT 0
        )
        """
        execute_with_retry(users_table)
        logger.info("Users table created successfully")
        # Create projects table
        projects_table = """
        CREATE TABLE IF NOT EXISTS projects (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        owner_id INTEGER NOT NULL,
        title TEXT NOT NULL,
        description TEXT,
        geojson TEXT,
        storage_bucket TEXT DEFAULT 'default',
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """
        execute_with_retry(projects_table)
        logger.info("Projects table created successfully")
        # Create journals table
        journals_table = """
        CREATE TABLE IF NOT EXISTS journals (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        project_id INTEGER NOT NULL,
        title TEXT NOT NULL,
        content TEXT,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """
        execute_with_retry(journals_table)
        logger.info("Journals table created successfully")
        logger.info("All database tables created successfully")
    except Exception as e:
        error_msg = f"Failed to create database tables: {str(e)}"
        logger.error(error_msg)
        # We'll continue even if table creation fails
        # This allows the server to start and use existing tables if they exist
        logger.warning("Continuing server startup despite table creation errors")
@app.on_event("shutdown")
async def shutdown_db_client():
    """Release the database connection chosen at startup, if there is one.

    Never raises: every failure is logged so shutdown can continue.
    """
    logger.info("Shutting down database connection")
    try:
        # Close connection based on the type
        if hasattr(app, 'db_type'):
            if app.db_type == "libsql-experimental":
                try:
                    app.db_conn.close()
                    logger.info("libsql-experimental connection closed successfully")
                except Exception as e:
                    logger.warning(f"Error closing libsql-experimental connection: {str(e)}")
            elif app.db_type == "libsql-client":
                # Close the client when it offers close(); otherwise keep the
                # previous behaviour of doing nothing.
                close = getattr(app.db_conn, "close", None)
                if callable(close):
                    try:
                        close()
                        logger.info("libsql-client connection closed successfully")
                    except Exception as e:
                        logger.warning(f"Error closing libsql-client connection: {str(e)}")
                else:
                    logger.info("No close method needed for libsql-client")
            elif app.db_type == "http-api":
                # HTTP API is stateless, no need to close
                logger.info("No close method needed for HTTP API")
            else:
                logger.warning(f"Unknown database type: {app.db_type}")
        else:
            logger.warning("No database connection to close")
    except Exception as e:
        logger.error(f"Error closing database connection: {str(e)}")
# Custom API documentation routes
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
    """Serve the Swagger UI page for this API at /docs.

    Returns:
        The HTML response produced by FastAPI's ``get_swagger_ui_html``.
    """
    return get_swagger_ui_html(
        openapi_url=app.openapi_url,
        title=f"{app.title} - Swagger UI",
        oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
        # The package spec in these two URLs had been replaced by an
        # email-obfuscation placeholder, which made the assets unloadable.
        # NOTE(review): the originally pinned version is unknown; `@5` tracks
        # the current 5.x release - pin an exact version if required.
        swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui-bundle.js",
        swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui.css",
        swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png",
    )
@app.get("/redoc", include_in_schema=False)
async def redoc_html():
    """Serve the ReDoc page for this API at /redoc."""
    page_options = {
        "openapi_url": app.openapi_url,
        "title": f"{app.title} - ReDoc",
        "redoc_js_url": "https://cdn.jsdelivr.net/npm/redoc@next/bundles/redoc.standalone.js",
        "redoc_favicon_url": "https://fastapi.tiangolo.com/img/favicon.png",
    }
    return get_redoc_html(**page_options)
# Serve Swagger UI at root path
@app.get("/", include_in_schema=False)
async def root_swagger():
    """Serve the Swagger UI page at the site root.

    Returns:
        The HTML response produced by FastAPI's ``get_swagger_ui_html``.
    """
    logger.info("Root endpoint accessed - serving Swagger UI")
    return get_swagger_ui_html(
        openapi_url=app.openapi_url,
        title=f"{app.title} - API Documentation",
        oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
        # The package spec in these two URLs had been replaced by an
        # email-obfuscation placeholder, which made the assets unloadable.
        # NOTE(review): the originally pinned version is unknown; `@5` tracks
        # the current 5.x release - pin an exact version if required.
        swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui-bundle.js",
        swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui.css",
        swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png",
    )
# API information endpoint (moved from root)
@app.get("/api/info", tags=["General"])
async def api_info():
    """
    API information endpoint providing details about the Seamo Auth Server API.
    Returns:
    dict: Basic information about the API and links to documentation.
    """
    logger.info("API info endpoint accessed")
    # Where the interactive and machine-readable docs are served.
    documentation_links = {
        "swagger_ui": "/",
        "redoc": "/redoc",
        "openapi_json": "/openapi.json"
    }
    # Entry points of the main route groups.
    endpoint_paths = {
        "health": "/health",
        "auth": "/api/auth",
        "projects": "/api/projects",
        "journals": "/api/journals"
    }
    return {
        "message": "Welcome to Seamo Auth Server API",
        "version": "1.0.0",
        "documentation": documentation_links,
        "endpoints": endpoint_paths
    }
# Health check endpoint
@app.get("/health", tags=["General"])
async def health_check():
    """
    Health check endpoint to verify the server is running properly.
    Returns:
    dict: Status information about the server.
    """
    logger.info("Health check endpoint accessed")
    # "connected" only means a connection object was stored on the app at
    # startup; no query is issued here.
    db_attached = hasattr(app, "db_conn")
    db_backend = getattr(app, "db_type", "unknown")
    return dict(
        status="healthy",
        version="1.0.0",
        database_connected=db_attached,
        database_type=db_backend,
    )
# Database test endpoint
# NOTE(review): this route has no authentication in this file and returns
# every user's id and email - confirm it is not exposed in production.
@app.get("/test-db", tags=["General"])
async def test_database():
    """
    Test endpoint to verify database operations.
    This is for debugging purposes only.
    """
    import time
    # Tag every log line of this run with a second-resolution ID
    test_id = f"test_{int(time.time())}"
    logger.info(f"[{test_id}] Starting database test")
    has_connection = hasattr(app, "db_conn")
    results = {
        "connection_type": getattr(app, "db_type", "unknown"),
        "connection_object_type": type(app.db_conn).__name__ if has_connection else "None",
        "operations": []
    }
    # Shorthand for adding one operation report to the response.
    record = results["operations"].append
    if hasattr(app, "last_successful_connection_method"):
        results["connection_method"] = app.last_successful_connection_method
    if not has_connection:
        logger.error(f"[{test_id}] Database connection not available")
        record({
            "name": "Database connection check",
            "success": False,
            "error": "Database connection not available"
        })
        return results
    try:
        # Test 1: Simple SELECT
        logger.info(f"[{test_id}] Test 1: Simple SELECT")
        row = app.db_conn.execute("SELECT 1 as test").fetchone()
        record({
            "name": "Simple SELECT",
            "success": row is not None,
            "result": None if row is None else str(row)
        })
        logger.info(f"[{test_id}] Test 1 result: {row}")

        # Test 2: Create a temporary table
        # (a regular table named test_table; it is not dropped afterwards)
        logger.info(f"[{test_id}] Test 2: Create a temporary table")
        app.db_conn.execute("""
        CREATE TABLE IF NOT EXISTS test_table (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """)
        app.db_conn.commit()
        record({
            "name": "Create temporary table",
            "success": True
        })
        logger.info(f"[{test_id}] Test 2 completed successfully")

        # Test 3: Insert into the temporary table
        logger.info(f"[{test_id}] Test 3: Insert into the temporary table")
        test_name = f"test_user_{int(time.time())}"
        cursor = app.db_conn.execute("INSERT INTO test_table (name) VALUES (?)", (test_name,))
        app.db_conn.commit()
        # Prefer the cursor's lastrowid; fall back to a lookup by name
        last_id = None
        try:
            last_id = cursor.lastrowid
            logger.info(f"[{test_id}] Got lastrowid: {last_id}")
        except Exception as e:
            logger.warning(f"[{test_id}] Could not get lastrowid: {str(e)}")
            try:
                id_row = app.db_conn.execute(
                    "SELECT id FROM test_table WHERE name = ? ORDER BY id DESC LIMIT 1",
                    (test_name,),
                ).fetchone()
                if id_row:
                    last_id = id_row[0]
                    logger.info(f"[{test_id}] Got ID from query: {last_id}")
            except Exception as e2:
                logger.error(f"[{test_id}] Error getting ID from query: {str(e2)}")
        record({
            "name": "Insert into temporary table",
            "success": True,
            "last_id": last_id
        })
        logger.info(f"[{test_id}] Test 3 completed successfully. Last ID: {last_id}")

        # Test 4: Select from the temporary table
        logger.info(f"[{test_id}] Test 4: Select from the temporary table")
        row = app.db_conn.execute("SELECT * FROM test_table WHERE name = ?", (test_name,)).fetchone()
        record({
            "name": "Select from temporary table",
            "success": row is not None,
            "result": None if row is None else str(row)
        })
        logger.info(f"[{test_id}] Test 4 result: {row}")

        # Test 5: Check if users table exists and has the expected structure
        logger.info(f"[{test_id}] Test 5: Check users table structure")
        try:
            table_info = app.db_conn.execute("PRAGMA table_info(users)").fetchall()
            column_names = [col[1] for col in table_info] if table_info else []
            record({
                "name": "Check users table structure",
                "success": len(table_info) > 0,
                "columns": column_names
            })
            logger.info(f"[{test_id}] Test 5 result: {table_info}")
        except Exception as e:
            logger.error(f"[{test_id}] Error checking users table structure: {str(e)}")
            record({
                "name": "Check users table structure",
                "success": False,
                "error": str(e)
            })

        # Test 6: List all users
        logger.info(f"[{test_id}] Test 6: List all users")
        try:
            all_users = app.db_conn.execute("SELECT id, email FROM users").fetchall()
            if all_users:
                user_count = len(all_users)
                user_list = [{"id": user[0], "email": user[1]} for user in all_users]
            else:
                user_count = 0
                user_list = []
            record({
                "name": "List all users",
                "success": True,
                "count": user_count,
                "users": user_list
            })
            logger.info(f"[{test_id}] Test 6 result: {all_users}")
        except Exception as e:
            logger.error(f"[{test_id}] Error listing all users: {str(e)}")
            record({
                "name": "List all users",
                "success": False,
                "error": str(e)
            })
        logger.info(f"[{test_id}] Database test completed successfully")
        return results
    except Exception as e:
        # Any failure in tests 1-4 ends the run; report what was collected.
        logger.error(f"[{test_id}] Database test failed: {str(e)}")
        record({
            "name": "Error during tests",
            "success": False,
            "error": str(e)
        })
        return results
# Test user creation endpoint (direct SQL)
# NOTE(review): an unauthenticated GET that creates an account with a fixed,
# documented password - confirm this route is disabled outside development.
@app.get("/create-test-user", tags=["General"])
async def create_test_user():
    """
    Create a test user with email [email protected].
    This is for testing purposes only.
    Returns:
    dict: Information about the created test user.
    """
    import time
    # NOTE(review): this literal looks like an email-obfuscation artifact
    # rather than a real address - confirm the intended test email.
    test_email = "[email protected]"
    # Tag every log line of this run with a second-resolution ID
    test_id = f"test_{int(time.time())}"
    logger.info(f"[{test_id}] Starting test user creation (direct SQL)")
    if not hasattr(app, "db_conn"):
        logger.error(f"[{test_id}] Database connection not available")
        return {
            "success": False,
            "error": "Database connection not available"
        }
    try:
        # Return early when the account is already there
        logger.info(f"[{test_id}] Checking if test user already exists")
        existing_user = app.db_conn.execute(
            "SELECT id FROM users WHERE email = ?", (test_email,)
        ).fetchone()
        if existing_user:
            logger.info(f"[{test_id}] Test user already exists with ID: {existing_user[0]}")
            return {
                "success": True,
                "user_id": existing_user[0],
                "email": test_email,
                "status": "already_exists"
            }
        # Use a pre-hashed password to avoid dependency on passlib
        logger.info(f"[{test_id}] Using pre-hashed password for test user")
        # This is a pre-hashed version of "TestPassword123!" using Argon2
        hashed_password = "$argon2id$v=19$m=65536,t=3,p=4$NElQRUZCWDRZSHpIWWRGSA$TYU8R7EfXGgEu9FWZGMX9AVwmMwpSKECCZMXgbzr6JE"
        # Insert the test user
        logger.info(f"[{test_id}] Inserting test user with email: {test_email}")
        cursor = app.db_conn.execute(
            "INSERT INTO users (email, hashed_password) VALUES (?, ?)",
            (test_email, hashed_password),
        )
        app.db_conn.commit()
        logger.info(f"[{test_id}] Committed test user insert")
        # Best-effort read of the new row ID from the cursor
        user_id = None
        try:
            user_id = cursor.lastrowid
            logger.info(f"[{test_id}] Got test user lastrowid: {user_id}")
        except Exception as e:
            logger.warning(f"[{test_id}] Could not get test user lastrowid: {str(e)}")
        # Read the row back; its ID is the one reported to the caller
        logger.info(f"[{test_id}] Verifying test user was created")
        verify_result = app.db_conn.execute(
            "SELECT id, email, created_at FROM users WHERE email = ?", (test_email,)
        ).fetchone()
        if not verify_result:
            logger.error(f"[{test_id}] Failed to verify test user after insert")
            return {
                "success": False,
                "error": "Failed to verify user after insert"
            }
        user_id = verify_result[0]
        logger.info(f"[{test_id}] Verified test user with ID: {user_id}")
        return {
            "success": True,
            "user_id": user_id,
            "email": test_email,
            "created_at": verify_result[2] if len(verify_result) > 2 else None,
            "status": "created"
        }
    except Exception as e:
        logger.error(f"[{test_id}] Error creating test user: {str(e)}")
        return {
            "success": False,
            "error": str(e)
        }
# Import and include routers
from app.api.routes import auth_router, projects_router, journals_router

# Register each route module under its URL prefix and documentation tag.
for _module, _prefix, _tag in (
    (auth_router, "/api/auth", "Authentication"),
    (projects_router, "/api/projects", "Projects"),
    (journals_router, "/api/journals", "Journals"),
):
    app.include_router(_module.router, prefix=_prefix, tags=[_tag])
del _module, _prefix, _tag
if __name__ == "__main__":
    # Listen on the port given by the PORT environment variable, or 7860.
    listen_port = int(os.getenv("PORT", 7860))
    logger.info(f"Starting server on port {listen_port}")
    uvicorn.run("main:app", host="0.0.0.0", port=listen_port)