import os import sys import logging import requests from fastapi import FastAPI, HTTPException, Depends, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html from fastapi.openapi.utils import get_openapi from fastapi.staticfiles import StaticFiles import uvicorn from dotenv import load_dotenv # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler(sys.stdout)] ) logger = logging.getLogger("auth-server") # Import database libraries try: # First try the recommended libsql-experimental package import libsql_experimental as libsql logger.info("Successfully imported libsql-experimental package") HAS_LIBSQL = True LIBSQL_TYPE = "experimental" except ImportError: try: # Then try the libsql-client package as fallback import libsql_client logger.info("Successfully imported libsql-client package") HAS_LIBSQL = True LIBSQL_TYPE = "client" except ImportError: logger.error("Failed to import any libsql package. Please install libsql-experimental==0.0.49") logger.error("Falling back to HTTP API method for database access") # We'll use requests for HTTP API fallback import requests HAS_LIBSQL = False LIBSQL_TYPE = "http" # Load environment variables load_dotenv() # Create FastAPI app with detailed metadata app = FastAPI( title="Seamo Auth Server", description=""" # Seamo Authentication API The Seamo Auth Server provides authentication and user management services for the Seamo platform. ## Features * User registration and authentication * Project management * Journal management * Access control ## Authentication Most endpoints require authentication. Use the /api/auth/token endpoint to obtain access tokens. """, version="1.0.0", contact={ "name": "Seamo Team", "url": "https://seamo.earth/contact", "email": "info@seamo.earth", }, license_info={ "name": "MIT", "url": "https://opensource.org/licenses/MIT", }, openapi_tags=[ { "name": "Authentication", "description": "Operations related to user authentication and token management", }, { "name": "Projects", "description": "Operations related to project management", }, { "name": "Journals", "description": "Operations related to journal management", }, { "name": "General", "description": "General server information and health checks", }, ], docs_url=None, # Disable default docs redoc_url=None # Disable default redoc ) # Configure CORS origins = [ os.getenv("FRONTEND_URL", "http://localhost:3000"), "https://seamo.earth", "https://seamoo.netlify.app", "https://seamo-ai-ai-server.hf.space", "https://seamo-ai-auth-server.hf.space", "https://seamo-ai-scraper-server.hf.space", "http://localhost:3000", # For local development "http://localhost:8000", # Local AI server "http://localhost:8001", # Local Auth server "http://localhost:8002", # Local Scraper server ] print(f"CORS origins configured: {origins}") app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Database connection @app.on_event("startup") async def startup_db_client(): # Get environment variables db_url = os.getenv("TURSO_DATABASE_URL") auth_token = os.getenv("TURSO_AUTH_TOKEN") if not db_url or not auth_token: error_msg = "Missing Turso credentials. TURSO_DATABASE_URL and TURSO_AUTH_TOKEN must be set." logger.error(error_msg) raise Exception(error_msg) # Clean the auth token to remove any problematic characters clean_auth_token = auth_token.strip() # Log connection details (without showing the full token) token_preview = clean_auth_token[:10] + "..." if len(clean_auth_token) > 10 else "***" logger.info(f"Connecting to database at URL: {db_url}") logger.info(f"Using libsql type: {LIBSQL_TYPE}") logger.info(f"Auth token preview: {token_preview}") # Ensure the token is properly formatted if not clean_auth_token.startswith("eyJ"): logger.warning("Auth token does not appear to be in JWT format (should start with 'eyJ')") # Try to extract the token if it's wrapped in quotes or has extra characters if "eyJ" in clean_auth_token: start_idx = clean_auth_token.find("eyJ") # Find the end of the token (usually at a quote, space, or newline) end_markers = ['"', "'", ' ', '\n', '\r'] end_idx = len(clean_auth_token) for marker in end_markers: marker_idx = clean_auth_token.find(marker, start_idx) if marker_idx > start_idx and marker_idx < end_idx: end_idx = marker_idx clean_auth_token = clean_auth_token[start_idx:end_idx] logger.info(f"Extracted JWT token: {clean_auth_token[:10]}...") # Verify the token has three parts (header.payload.signature) parts = clean_auth_token.split('.') if len(parts) != 3: logger.warning(f"Auth token does not have the expected JWT format (3 parts separated by dots). Found {len(parts)} parts.") else: logger.info("Auth token has the expected JWT format") # Initialize database connection connected = False # Method 1: Try with libsql-experimental if HAS_LIBSQL and LIBSQL_TYPE == "experimental": try: logger.info("Connecting with libsql-experimental") # Try multiple connection methods # Method 1a: Try with auth_token parameter (works with version 0.0.49) try: logger.info("Trying connection with auth_token parameter") # Log the libsql version if hasattr(libsql, '__version__'): logger.info(f"libsql-experimental version: {libsql.__version__}") # Create the connection app.db_conn = libsql.connect(db_url, auth_token=clean_auth_token) # Test connection with a simple query logger.info("Testing connection with SELECT 1") result = app.db_conn.execute("SELECT 1").fetchone() logger.info(f"Connection successful with auth_token parameter: {result}") # Test connection with a more complex query logger.info("Testing connection with CREATE TABLE IF NOT EXISTS") app.db_conn.execute(""" CREATE TABLE IF NOT EXISTS connection_test ( id INTEGER PRIMARY KEY AUTOINCREMENT, test_value TEXT ) """) app.db_conn.commit() # Test insert logger.info("Testing connection with INSERT") app.db_conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", ("test_value",)) app.db_conn.commit() # Test select logger.info("Testing connection with SELECT from test table") result = app.db_conn.execute("SELECT * FROM connection_test").fetchall() logger.info(f"Test table contents: {result}") connected = True app.db_type = "libsql-experimental" app.last_successful_connection_method = "auth_token" logger.info("All connection tests passed successfully") except Exception as e: logger.warning(f"Connection with auth_token parameter failed: {str(e)}") # Method 1b: Try with auth token in URL (works with other versions) if not connected: try: logger.info("Trying connection with auth token in URL") # Format the URL to include the auth token if "?" in db_url: connection_url = f"{db_url}&authToken={clean_auth_token}" else: connection_url = f"{db_url}?authToken={clean_auth_token}" logger.info(f"Using connection URL: {connection_url}") # Use the direct URL connection method with auth token in URL app.db_conn = libsql.connect(connection_url) app.last_successful_connection_url = connection_url # Test connection with a simple query logger.info("Testing connection with SELECT 1") result = app.db_conn.execute("SELECT 1").fetchone() logger.info(f"Connection successful with auth token in URL: {result}") # Test connection with a more complex query logger.info("Testing connection with CREATE TABLE IF NOT EXISTS") app.db_conn.execute(""" CREATE TABLE IF NOT EXISTS connection_test ( id INTEGER PRIMARY KEY AUTOINCREMENT, test_value TEXT ) """) app.db_conn.commit() # Test insert logger.info("Testing connection with INSERT") app.db_conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", ("test_value_url",)) app.db_conn.commit() # Test select logger.info("Testing connection with SELECT from test table") result = app.db_conn.execute("SELECT * FROM connection_test").fetchall() logger.info(f"Test table contents: {result}") connected = True app.db_type = "libsql-experimental" app.last_successful_connection_method = "url" logger.info("All connection tests passed successfully") except Exception as e: logger.error(f"Connection with auth token in URL failed: {str(e)}") except Exception as e: logger.error(f"All libsql-experimental connection methods failed: {str(e)}") # Method 2: Try with libsql-client if not connected and HAS_LIBSQL and LIBSQL_TYPE == "client": try: logger.info("Connecting with libsql-client") # Convert URL from libsql:// to https:// if db_url.startswith("libsql://"): http_url = db_url.replace("libsql://", "https://") else: http_url = db_url logger.info(f"Using URL: {http_url}") # Connect using the client app.db_conn = libsql_client.create_client_sync( url=http_url, auth_token=clean_auth_token ) # Test connection result = app.db_conn.execute("SELECT 1").rows() logger.info(f"Connection test successful: {result}") connected = True app.db_type = "libsql-client" except Exception as e: logger.error(f"libsql-client connection failed: {str(e)}") # Method 3: Fallback to HTTP API if not connected: try: logger.info("Falling back to HTTP API method") # Convert URL from libsql:// to https:// if db_url.startswith("libsql://"): http_url = db_url.replace("libsql://", "https://") else: http_url = db_url # Ensure the URL doesn't have a trailing slash http_url = http_url.rstrip('/') # Verify the URL format if not http_url.startswith("https://"): logger.warning(f"HTTP URL does not start with https://: {http_url}") # Try to fix the URL if "://" not in http_url: http_url = f"https://{http_url}" logger.info(f"Added https:// prefix to URL: {http_url}") logger.info(f"Using HTTP URL: {http_url}") # Create a simple HTTP API client class class TursoHttpClient: def __init__(self, url, auth_token): self.url = url self.auth_token = auth_token self.headers = { "Authorization": f"Bearer {auth_token}", "Content-Type": "application/json" } # Add a property to track the last inserted ID self.last_insert_id = None def execute(self, query, params=None): # Format the request according to the v2/pipeline specification requests_data = [] # Prepare the statement stmt = {"sql": query} # Add parameters if provided if params: # Convert parameters to the expected format args = [] for param in params: if param is None: args.append({"type": "null", "value": None}) elif isinstance(param, int): args.append({"type": "integer", "value": str(param)}) elif isinstance(param, float): args.append({"type": "float", "value": str(param)}) else: args.append({"type": "text", "value": str(param)}) stmt["args"] = args requests_data.append({"type": "execute", "stmt": stmt}) # If this is an INSERT, add a query to get the last inserted ID is_insert = query.strip().upper().startswith("INSERT") if is_insert: requests_data.append({ "type": "execute", "stmt": {"sql": "SELECT last_insert_rowid()"} }) # Always close the connection at the end requests_data.append({"type": "close"}) # Prepare the final request payload data = {"requests": requests_data} # Use the v2/pipeline endpoint pipeline_url = f"{self.url}/v2/pipeline" logger.info(f"Sending request to: {pipeline_url}") logger.info(f"Headers: Authorization: Bearer {self.auth_token[:5]}... (truncated)") try: response = requests.post(pipeline_url, headers=self.headers, json=data, timeout=10) # Log response status logger.info(f"Response status: {response.status_code}") # Check for auth errors specifically if response.status_code == 401: logger.error(f"Authentication error (401): {response.text}") raise Exception(f"Authentication failed: {response.text}") # Raise for other errors response.raise_for_status() # Parse the response result = response.json() except requests.exceptions.RequestException as e: logger.error(f"HTTP request failed: {str(e)}") raise # Process the response if "results" in result and len(result["results"]) > 0: # If this was an INSERT, get the last inserted ID if is_insert and len(result["results"]) > 1: try: last_id_result = result["results"][1] if "rows" in last_id_result and len(last_id_result["rows"]) > 0: self.last_insert_id = last_id_result["rows"][0]["values"][0] logger.info(f"Last inserted ID: {self.last_insert_id}") except Exception as e: logger.warning(f"Failed to get last inserted ID: {str(e)}") # Return a cursor-like object with the main result cursor = TursoHttpCursor(result["results"][0]) cursor.lastrowid = self.last_insert_id return cursor # Return an empty cursor cursor = TursoHttpCursor(None) cursor.lastrowid = self.last_insert_id return cursor def commit(self): # HTTP API is stateless, no need to commit logger.info("HTTP API commit called (no-op)") pass def close(self): # HTTP API is stateless, no need to close logger.info("HTTP API close called (no-op)") pass # Create a cursor-like class for HTTP API class TursoHttpCursor: def __init__(self, result): self.result = result self.lastrowid = None def fetchone(self): if self.result and "rows" in self.result and len(self.result["rows"]) > 0: return self.result["rows"][0]["values"] return None def fetchall(self): if self.result and "rows" in self.result: return [row["values"] for row in self.result["rows"]] return [] # Create the HTTP API client app.db_conn = TursoHttpClient(http_url, clean_auth_token) # Test connection with a simple query logger.info("Testing HTTP API connection with SELECT 1") result = app.db_conn.execute("SELECT 1").fetchone() logger.info(f"HTTP API connection test successful: {result}") # Test connection with a more complex query logger.info("Testing HTTP API connection with CREATE TABLE IF NOT EXISTS") app.db_conn.execute(""" CREATE TABLE IF NOT EXISTS connection_test ( id INTEGER PRIMARY KEY AUTOINCREMENT, test_value TEXT ) """) app.db_conn.commit() # Test insert logger.info("Testing HTTP API connection with INSERT") cursor = app.db_conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", ("test_value_http",)) app.db_conn.commit() logger.info(f"HTTP API insert test - lastrowid: {cursor.lastrowid}") # Test select logger.info("Testing HTTP API connection with SELECT from test table") result = app.db_conn.execute("SELECT * FROM connection_test").fetchall() logger.info(f"HTTP API test table contents: {result}") connected = True app.db_type = "http-api" logger.info("All HTTP API connection tests passed successfully") except Exception as e: logger.error(f"HTTP API connection failed: {str(e)}") if not connected: error_msg = "All database connection methods failed. Please check your credentials and try again." logger.error(error_msg) raise Exception(error_msg) # Create tables if they don't exist # We'll create each table in a separate try-except block to ensure that if one fails, others can still be created logger.info("Creating database tables") # Function to execute SQL with retry def execute_with_retry(sql, max_retries=3): for attempt in range(max_retries): try: # For each attempt, we'll create a fresh connection if needed if attempt > 0: logger.info(f"Retry attempt {attempt+1} for SQL execution") # Test the connection first try: test_result = app.db_conn.execute("SELECT 1").fetchone() logger.info(f"Connection test successful: {test_result}") except Exception as conn_err: logger.warning(f"Connection test failed, reconnecting: {str(conn_err)}") # Reconnect using the same method that worked initially if app.db_type == "libsql-experimental": if hasattr(app, 'last_successful_connection_method') and app.last_successful_connection_method == 'auth_token': logger.info("Reconnecting with auth_token parameter") app.db_conn = libsql.connect(db_url, auth_token=clean_auth_token) elif hasattr(app, 'last_successful_connection_url'): logger.info(f"Reconnecting with URL: {app.last_successful_connection_url}") app.db_conn = libsql.connect(app.last_successful_connection_url) # Execute the SQL app.db_conn.execute(sql) app.db_conn.commit() return True except Exception as e: logger.warning(f"SQL execution failed (attempt {attempt+1}): {str(e)}") if attempt == max_retries - 1: raise # Small delay before retry import time time.sleep(1) # Simplified table creation with fewer constraints try: # Create users table users_table = """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, hashed_password TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_login DATETIME, is_admin INTEGER DEFAULT 0 ) """ execute_with_retry(users_table) logger.info("Users table created successfully") # Create projects table projects_table = """ CREATE TABLE IF NOT EXISTS projects ( id INTEGER PRIMARY KEY AUTOINCREMENT, owner_id INTEGER NOT NULL, title TEXT NOT NULL, description TEXT, geojson TEXT, storage_bucket TEXT DEFAULT 'default', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """ execute_with_retry(projects_table) logger.info("Projects table created successfully") # Create journals table journals_table = """ CREATE TABLE IF NOT EXISTS journals ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL, title TEXT NOT NULL, content TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """ execute_with_retry(journals_table) logger.info("Journals table created successfully") logger.info("All database tables created successfully") except Exception as e: error_msg = f"Failed to create database tables: {str(e)}" logger.error(error_msg) # We'll continue even if table creation fails # This allows the server to start and use existing tables if they exist logger.warning("Continuing server startup despite table creation errors") @app.on_event("shutdown") async def shutdown_db_client(): logger.info("Shutting down database connection") try: # Close connection based on the type if hasattr(app, 'db_type'): if app.db_type == "libsql-experimental": try: app.db_conn.close() logger.info("libsql-experimental connection closed successfully") except Exception as e: logger.warning(f"Error closing libsql-experimental connection: {str(e)}") elif app.db_type == "libsql-client": # libsql-client doesn't have a close method logger.info("No close method needed for libsql-client") elif app.db_type == "http-api": # HTTP API is stateless, no need to close logger.info("No close method needed for HTTP API") else: logger.warning(f"Unknown database type: {app.db_type}") else: logger.warning("No database connection to close") except Exception as e: logger.error(f"Error closing database connection: {str(e)}") # Custom API documentation routes @app.get("/docs", include_in_schema=False) async def custom_swagger_ui_html(): return get_swagger_ui_html( openapi_url=app.openapi_url, title=f"{app.title} - Swagger UI", oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url, swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9.0/swagger-ui-bundle.js", swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9.0/swagger-ui.css", swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", ) @app.get("/redoc", include_in_schema=False) async def redoc_html(): return get_redoc_html( openapi_url=app.openapi_url, title=f"{app.title} - ReDoc", redoc_js_url="https://cdn.jsdelivr.net/npm/redoc@next/bundles/redoc.standalone.js", redoc_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", ) # Serve Swagger UI at root path @app.get("/", include_in_schema=False) async def root_swagger(): logger.info("Root endpoint accessed - serving Swagger UI") return get_swagger_ui_html( openapi_url=app.openapi_url, title=f"{app.title} - API Documentation", oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url, swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9.0/swagger-ui-bundle.js", swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9.0/swagger-ui.css", swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", ) # API information endpoint (moved from root) @app.get("/api/info", tags=["General"]) async def api_info(): """ API information endpoint providing details about the Seamo Auth Server API. Returns: dict: Basic information about the API and links to documentation. """ logger.info("API info endpoint accessed") return { "message": "Welcome to Seamo Auth Server API", "version": "1.0.0", "documentation": { "swagger_ui": "/", "redoc": "/redoc", "openapi_json": "/openapi.json" }, "endpoints": { "health": "/health", "auth": "/api/auth", "projects": "/api/projects", "journals": "/api/journals" } } # Health check endpoint @app.get("/health", tags=["General"]) async def health_check(): """ Health check endpoint to verify the server is running properly. Returns: dict: Status information about the server. """ logger.info("Health check endpoint accessed") return { "status": "healthy", "version": "1.0.0", "database_connected": hasattr(app, "db_conn"), "database_type": getattr(app, "db_type", "unknown") } # Database test endpoint @app.get("/test-db", tags=["General"]) async def test_database(): """ Test endpoint to verify database operations. This is for debugging purposes only. """ import time # Generate a unique test ID test_id = f"test_{int(time.time())}" logger.info(f"[{test_id}] Starting database test") results = { "connection_type": getattr(app, "db_type", "unknown"), "connection_object_type": type(app.db_conn).__name__ if hasattr(app, "db_conn") else "None", "operations": [] } if hasattr(app, "last_successful_connection_method"): results["connection_method"] = app.last_successful_connection_method if not hasattr(app, "db_conn"): logger.error(f"[{test_id}] Database connection not available") results["operations"].append({ "name": "Database connection check", "success": False, "error": "Database connection not available" }) return results try: # Test 1: Simple SELECT logger.info(f"[{test_id}] Test 1: Simple SELECT") test_query = "SELECT 1 as test" result = app.db_conn.execute(test_query).fetchone() results["operations"].append({ "name": "Simple SELECT", "success": result is not None, "result": str(result) if result is not None else None }) logger.info(f"[{test_id}] Test 1 result: {result}") # Test 2: Create a temporary table logger.info(f"[{test_id}] Test 2: Create a temporary table") create_temp_table = """ CREATE TABLE IF NOT EXISTS test_table ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """ app.db_conn.execute(create_temp_table) app.db_conn.commit() results["operations"].append({ "name": "Create temporary table", "success": True }) logger.info(f"[{test_id}] Test 2 completed successfully") # Test 3: Insert into the temporary table logger.info(f"[{test_id}] Test 3: Insert into the temporary table") test_name = f"test_user_{int(time.time())}" insert_query = "INSERT INTO test_table (name) VALUES (?)" cursor = app.db_conn.execute(insert_query, (test_name,)) app.db_conn.commit() # Check if lastrowid is available last_id = None try: last_id = cursor.lastrowid logger.info(f"[{test_id}] Got lastrowid: {last_id}") except Exception as e: logger.warning(f"[{test_id}] Could not get lastrowid: {str(e)}") # Try to get the ID using a query try: id_query = "SELECT id FROM test_table WHERE name = ? ORDER BY id DESC LIMIT 1" id_result = app.db_conn.execute(id_query, (test_name,)).fetchone() if id_result: last_id = id_result[0] logger.info(f"[{test_id}] Got ID from query: {last_id}") except Exception as e2: logger.error(f"[{test_id}] Error getting ID from query: {str(e2)}") results["operations"].append({ "name": "Insert into temporary table", "success": True, "last_id": last_id }) logger.info(f"[{test_id}] Test 3 completed successfully. Last ID: {last_id}") # Test 4: Select from the temporary table logger.info(f"[{test_id}] Test 4: Select from the temporary table") select_query = "SELECT * FROM test_table WHERE name = ?" result = app.db_conn.execute(select_query, (test_name,)).fetchone() results["operations"].append({ "name": "Select from temporary table", "success": result is not None, "result": str(result) if result is not None else None }) logger.info(f"[{test_id}] Test 4 result: {result}") # Test 5: Check if users table exists and has the expected structure logger.info(f"[{test_id}] Test 5: Check users table structure") try: table_info = app.db_conn.execute("PRAGMA table_info(users)").fetchall() results["operations"].append({ "name": "Check users table structure", "success": len(table_info) > 0, "columns": [col[1] for col in table_info] if table_info else [] }) logger.info(f"[{test_id}] Test 5 result: {table_info}") except Exception as e: logger.error(f"[{test_id}] Error checking users table structure: {str(e)}") results["operations"].append({ "name": "Check users table structure", "success": False, "error": str(e) }) # Test 6: List all users logger.info(f"[{test_id}] Test 6: List all users") try: all_users = app.db_conn.execute("SELECT id, email FROM users").fetchall() results["operations"].append({ "name": "List all users", "success": True, "count": len(all_users) if all_users else 0, "users": [{"id": user[0], "email": user[1]} for user in all_users] if all_users else [] }) logger.info(f"[{test_id}] Test 6 result: {all_users}") except Exception as e: logger.error(f"[{test_id}] Error listing all users: {str(e)}") results["operations"].append({ "name": "List all users", "success": False, "error": str(e) }) logger.info(f"[{test_id}] Database test completed successfully") return results except Exception as e: logger.error(f"[{test_id}] Database test failed: {str(e)}") results["operations"].append({ "name": "Error during tests", "success": False, "error": str(e) }) return results # Test user creation endpoint (direct SQL) @app.get("/create-test-user", tags=["General"]) async def create_test_user(): """ Create a test user with email test@seamo.earth. This is for testing purposes only. Returns: dict: Information about the created test user. """ import time # Generate a unique test ID test_id = f"test_{int(time.time())}" logger.info(f"[{test_id}] Starting test user creation (direct SQL)") if not hasattr(app, "db_conn"): logger.error(f"[{test_id}] Database connection not available") return { "success": False, "error": "Database connection not available" } try: # First check if the test user already exists logger.info(f"[{test_id}] Checking if test user already exists") check_query = "SELECT id FROM users WHERE email = ?" existing_user = app.db_conn.execute(check_query, ("test@seamo.earth",)).fetchone() if existing_user: logger.info(f"[{test_id}] Test user already exists with ID: {existing_user[0]}") return { "success": True, "user_id": existing_user[0], "email": "test@seamo.earth", "status": "already_exists" } # Use a pre-hashed password to avoid dependency on passlib logger.info(f"[{test_id}] Using pre-hashed password for test user") # This is a pre-hashed version of "TestPassword123!" using Argon2 hashed_password = "$argon2id$v=19$m=65536,t=3,p=4$NElQRUZCWDRZSHpIWWRGSA$TYU8R7EfXGgEu9FWZGMX9AVwmMwpSKECCZMXgbzr6JE" # Insert the test user logger.info(f"[{test_id}] Inserting test user with email: test@seamo.earth") insert_query = "INSERT INTO users (email, hashed_password) VALUES (?, ?)" cursor = app.db_conn.execute(insert_query, ("test@seamo.earth", hashed_password)) app.db_conn.commit() logger.info(f"[{test_id}] Committed test user insert") # Try to get the user ID user_id = None try: user_id = cursor.lastrowid logger.info(f"[{test_id}] Got test user lastrowid: {user_id}") except Exception as e: logger.warning(f"[{test_id}] Could not get test user lastrowid: {str(e)}") # Verify the insert with a separate query logger.info(f"[{test_id}] Verifying test user was created") verify_query = "SELECT id, email, created_at FROM users WHERE email = ?" verify_result = app.db_conn.execute(verify_query, ("test@seamo.earth",)).fetchone() if verify_result: user_id = verify_result[0] logger.info(f"[{test_id}] Verified test user with ID: {user_id}") return { "success": True, "user_id": user_id, "email": "test@seamo.earth", "created_at": verify_result[2] if len(verify_result) > 2 else None, "status": "created" } else: logger.error(f"[{test_id}] Failed to verify test user after insert") return { "success": False, "error": "Failed to verify user after insert" } except Exception as e: logger.error(f"[{test_id}] Error creating test user: {str(e)}") return { "success": False, "error": str(e) } # Import and include routers from app.api.routes import auth_router, projects_router, journals_router app.include_router(auth_router.router, prefix="/api/auth", tags=["Authentication"]) app.include_router(projects_router.router, prefix="/api/projects", tags=["Projects"]) app.include_router(journals_router.router, prefix="/api/journals", tags=["Journals"]) if __name__ == "__main__": port = int(os.getenv("PORT", 7860)) logger.info(f"Starting server on port {port}") uvicorn.run("main:app", host="0.0.0.0", port=port)