import os import sys import logging import requests import time from fastapi import FastAPI, HTTPException, Depends, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html from fastapi.openapi.utils import get_openapi from fastapi.staticfiles import StaticFiles import uvicorn from dotenv import load_dotenv from passlib.context import CryptContext from pydantic import BaseModel, EmailStr from typing import Dict, Any, List, Optional # Import the HTTP API database utility from app.utils import db_http # Configure logging logging.basicConfig( level=logging.WARNING, # Changed from INFO to WARNING to reduce verbosity format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler(sys.stdout)] ) logger = logging.getLogger("auth-server") # Set specific loggers to different levels as needed logging.getLogger("uvicorn").setLevel(logging.WARNING) logging.getLogger("fastapi").setLevel(logging.WARNING) # Import database libraries try: # First try the recommended libsql-experimental package import libsql_experimental as libsql logger.info("Successfully imported libsql-experimental package") HAS_LIBSQL = True LIBSQL_TYPE = "experimental" except ImportError: try: # Then try the libsql-client package as fallback import libsql_client logger.info("Successfully imported libsql-client package") HAS_LIBSQL = True LIBSQL_TYPE = "client" except ImportError: logger.error("Failed to import any libsql package. Please install libsql-experimental==0.0.49") logger.error("Falling back to HTTP API method for database access") # We'll use requests for HTTP API fallback import requests HAS_LIBSQL = False LIBSQL_TYPE = "http" # Load environment variables load_dotenv() # Create FastAPI app with detailed metadata app = FastAPI( title="Seamo Auth Server", description=""" # Seamo Authentication API The Seamo Auth Server provides authentication and user management services for the Seamo platform. ## Features * User registration and authentication * Project management * Journal management * Access control ## Authentication Most endpoints require authentication. Use the /api/auth/token endpoint to obtain access tokens. """, version="1.0.0", contact={ "name": "Seamo Team", "url": "https://seamo.earth/contact", "email": "info@seamo.earth", }, license_info={ "name": "MIT", "url": "https://opensource.org/licenses/MIT", }, openapi_tags=[ { "name": "Authentication", "description": "Operations related to user authentication and token management", }, { "name": "Projects", "description": "Operations related to project management", }, { "name": "Journals", "description": "Operations related to journal management", }, { "name": "General", "description": "General server information and health checks", }, ], docs_url=None, # Disable default docs redoc_url=None # Disable default redoc ) # Configure CORS origins = [ os.getenv("FRONTEND_URL", "http://localhost:3000"), "https://seamo.earth", "https://seamoo.netlify.app", "https://seamo-ai-ai-server.hf.space", "https://seamo-ai-auth-server.hf.space", "https://seamo-ai-scraper-server.hf.space", "http://localhost:3000", # For local development "http://localhost:8000", # Local AI server "http://localhost:8001", # Local Auth server "http://localhost:8002", # Local Scraper server ] print(f"CORS origins configured: {origins}") app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Database connection @app.on_event("startup") async def startup_db_client(): # Get environment variables db_url = os.getenv("TURSO_DATABASE_URL") auth_token = os.getenv("TURSO_AUTH_TOKEN") if not db_url or not auth_token: error_msg = "Missing Turso credentials. TURSO_DATABASE_URL and TURSO_AUTH_TOKEN must be set." logger.error(error_msg) raise Exception(error_msg) # Clean the auth token to remove any problematic characters clean_auth_token = auth_token.strip() # Log connection details (without showing the full token) token_preview = clean_auth_token[:10] + "..." if len(clean_auth_token) > 10 else "***" logger.info(f"Connecting to database at URL: {db_url}") logger.info(f"Using libsql type: {LIBSQL_TYPE}") logger.info(f"Auth token preview: {token_preview}") # Ensure the token is properly formatted if not clean_auth_token.startswith("eyJ"): logger.warning("Auth token does not appear to be in JWT format (should start with 'eyJ')") # Try to extract the token if it's wrapped in quotes or has extra characters if "eyJ" in clean_auth_token: start_idx = clean_auth_token.find("eyJ") # Find the end of the token (usually at a quote, space, or newline) end_markers = ['"', "'", ' ', '\n', '\r'] end_idx = len(clean_auth_token) for marker in end_markers: marker_idx = clean_auth_token.find(marker, start_idx) if marker_idx > start_idx and marker_idx < end_idx: end_idx = marker_idx clean_auth_token = clean_auth_token[start_idx:end_idx] logger.info(f"Extracted JWT token: {clean_auth_token[:10]}...") # Verify the token has three parts (header.payload.signature) parts = clean_auth_token.split('.') if len(parts) != 3: logger.warning(f"Auth token does not have the expected JWT format (3 parts separated by dots). Found {len(parts)} parts.") else: logger.info("Auth token has the expected JWT format") # Function to create a super user if it doesn't exist using HTTP API def create_super_user_http(): """ Create a super user with admin privileges if it doesn't exist using the HTTP API. Uses the email test@seamo.earth as specified. """ # Generate a unique identifier for this operation operation_id = f"create_super_user_http_{int(time.time())}" logger.info(f"[{operation_id}] Starting super user creation using HTTP API") # Super user details super_user_email = "test@seamo.earth" super_user_password = "TestPassword123!" # This is just a default password try: # Create password hashing context pwd_context = CryptContext( schemes=["argon2"], argon2__time_cost=4, argon2__memory_cost=102400, argon2__parallelism=8, argon2__salt_len=16 ) # Hash the password hashed_password = pwd_context.hash(super_user_password) logger.info(f"[{operation_id}] Password hashed successfully") # First, make sure the users table exists logger.info(f"[{operation_id}] Creating users table if it doesn't exist") db_http.create_table_if_not_exists( "users", """ id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, hashed_password TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_login DATETIME, is_admin INTEGER DEFAULT 0 """, operation_id=f"{operation_id}_create_table" ) # Check if the super user already exists logger.info(f"[{operation_id}] Checking if super user already exists") users = db_http.select_records( "users", condition="email = ?", condition_params=[{"type": "text", "value": super_user_email}], limit=1, operation_id=f"{operation_id}_check" ) if users and len(users) > 0: # User exists, check if they're an admin user = users[0] user_id = user.get("id") is_admin = bool(user.get("is_admin", 0)) logger.info(f"[{operation_id}] Super user already exists with ID: {user_id}, is_admin: {is_admin}") # If user exists but is not admin, make them admin if not is_admin: logger.info(f"[{operation_id}] Updating user to have admin privileges") update_result = db_http.update_record( "users", {"is_admin": 1}, "id = ?", [{"type": "integer", "value": str(user_id)}], operation_id=f"{operation_id}_update" ) if update_result: logger.info(f"[{operation_id}] User updated to admin successfully") else: logger.warning(f"[{operation_id}] Failed to update user to admin") return # User doesn't exist, create it logger.info(f"[{operation_id}] Super user does not exist, creating now") # Insert the super user user_data = { "email": super_user_email, "hashed_password": hashed_password, "is_admin": 1 } user_id = db_http.insert_record("users", user_data, operation_id=f"{operation_id}_insert") if user_id: logger.info(f"[{operation_id}] Super user created successfully with ID: {user_id}") # Verify the user was created new_user = db_http.get_record_by_id("users", user_id, operation_id=f"{operation_id}_verify") if new_user: logger.info(f"[{operation_id}] Super user verified with ID: {user_id}") return else: logger.warning(f"[{operation_id}] Super user not found by ID after creation") # Try to find by email users = db_http.select_records( "users", condition="email = ?", condition_params=[{"type": "text", "value": super_user_email}], limit=1, operation_id=f"{operation_id}_verify_email" ) if users and len(users) > 0: logger.info(f"[{operation_id}] Super user found by email after creation") return # If we get here, we couldn't verify the user was created # List all users for diagnostic purposes all_users = db_http.select_records("users", operation_id=f"{operation_id}_list_all") logger.warning(f"[{operation_id}] Super user was not found after creation. All users: {all_users}") except Exception as e: logger.error(f"[{operation_id}] Error creating super user with HTTP API: {str(e)}") # We'll continue even if super user creation fails # This allows the server to start and function without a super user logger.warning(f"[{operation_id}] Continuing server startup despite super user creation error") # Function to create a super user if it doesn't exist (original method - kept for reference) def create_super_user(): """ Create a super user with admin privileges if it doesn't exist. Uses the email test@seamo.earth as specified. Uses the approach from turso_interactive_crud.py for reliable operation. Note: This function is kept for reference but is not used anymore. The create_super_user_http function is used instead. """ # This function is kept for reference but is not used anymore pass # Function to create database tables using HTTP API def create_tables_http(): """ Create database tables if they don't exist using the HTTP API. """ operation_id = f"create_tables_http_{int(time.time())}" logger.info(f"[{operation_id}] Creating database tables using HTTP API") try: # Import the HTTP API utility from app.utils import db_http # Create users table logger.info(f"[{operation_id}] Creating users table") db_http.create_table_if_not_exists( "users", """ id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, hashed_password TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_login DATETIME, is_admin BOOLEAN DEFAULT 0 """, operation_id=f"{operation_id}_users" ) # Create projects table logger.info(f"[{operation_id}] Creating projects table") db_http.create_table_if_not_exists( "projects", """ id INTEGER PRIMARY KEY AUTOINCREMENT, owner_id INTEGER NOT NULL, title TEXT NOT NULL, description TEXT, geojson TEXT, storage_bucket TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (owner_id) REFERENCES users (id) """, operation_id=f"{operation_id}_projects" ) # Create journals table logger.info(f"[{operation_id}] Creating journals table") db_http.create_table_if_not_exists( "journals", """ id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL, title TEXT NOT NULL, content TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (project_id) REFERENCES projects (id) """, operation_id=f"{operation_id}_journals" ) logger.info(f"[{operation_id}] All database tables created successfully") except Exception as e: logger.error(f"[{operation_id}] Error creating database tables: {str(e)}") # Initialize database connection connected = False # Method 1: Try with libsql-experimental if HAS_LIBSQL and LIBSQL_TYPE == "experimental": try: logger.info("Connecting with libsql-experimental") # Try multiple connection methods # Method 1a: Try with auth_token parameter (works with version 0.0.49) try: logger.info("Trying connection with auth_token parameter") # Log the libsql version if hasattr(libsql, '__version__'): logger.info(f"libsql-experimental version: {libsql.__version__}") # Create the connection app.db_conn = libsql.connect(db_url, auth_token=clean_auth_token) # Test connection with a simple query logger.info("Testing connection with SELECT 1") result = app.db_conn.execute("SELECT 1").fetchone() logger.info(f"Connection successful with auth_token parameter: {result}") # Test connection with a more complex query logger.info("Testing connection with CREATE TABLE IF NOT EXISTS") app.db_conn.execute(""" CREATE TABLE IF NOT EXISTS connection_test ( id INTEGER PRIMARY KEY AUTOINCREMENT, test_value TEXT ) """) app.db_conn.commit() # Test insert logger.info("Testing connection with INSERT") app.db_conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", ("test_value",)) app.db_conn.commit() # Test select logger.info("Testing connection with SELECT from test table") result = app.db_conn.execute("SELECT * FROM connection_test").fetchall() logger.info(f"Test table contents: {result}") connected = True app.db_type = "libsql-experimental" app.last_successful_connection_method = "auth_token" logger.info("All connection tests passed successfully") except Exception as e: logger.warning(f"Connection with auth_token parameter failed: {str(e)}") # Method 1b: Try with auth token in URL (works with other versions) if not connected: try: logger.info("Trying connection with auth token in URL") # Format the URL to include the auth token if "?" in db_url: connection_url = f"{db_url}&authToken={clean_auth_token}" else: connection_url = f"{db_url}?authToken={clean_auth_token}" logger.info(f"Using connection URL: {connection_url}") # Use the direct URL connection method with auth token in URL app.db_conn = libsql.connect(connection_url) app.last_successful_connection_url = connection_url # Test connection with a simple query logger.info("Testing connection with SELECT 1") result = app.db_conn.execute("SELECT 1").fetchone() logger.info(f"Connection successful with auth token in URL: {result}") # Test connection with a more complex query logger.info("Testing connection with CREATE TABLE IF NOT EXISTS") app.db_conn.execute(""" CREATE TABLE IF NOT EXISTS connection_test ( id INTEGER PRIMARY KEY AUTOINCREMENT, test_value TEXT ) """) app.db_conn.commit() # Test insert logger.info("Testing connection with INSERT") app.db_conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", ("test_value_url",)) app.db_conn.commit() # Test select logger.info("Testing connection with SELECT from test table") result = app.db_conn.execute("SELECT * FROM connection_test").fetchall() logger.info(f"Test table contents: {result}") connected = True app.db_type = "libsql-experimental" app.last_successful_connection_method = "url" logger.info("All connection tests passed successfully") except Exception as e: logger.error(f"Connection with auth token in URL failed: {str(e)}") except Exception as e: logger.error(f"All libsql-experimental connection methods failed: {str(e)}") # Method 2: Try with libsql-client if not connected and HAS_LIBSQL and LIBSQL_TYPE == "client": try: logger.info("Connecting with libsql-client") # Convert URL from libsql:// to https:// if db_url.startswith("libsql://"): http_url = db_url.replace("libsql://", "https://") else: http_url = db_url logger.info(f"Using URL: {http_url}") # Connect using the client app.db_conn = libsql_client.create_client_sync( url=http_url, auth_token=clean_auth_token ) # Test connection result = app.db_conn.execute("SELECT 1").rows() logger.info(f"Connection test successful: {result}") connected = True app.db_type = "libsql-client" except Exception as e: logger.error(f"libsql-client connection failed: {str(e)}") # Method 3: Fallback to HTTP API if not connected: try: logger.info("Falling back to HTTP API method") # Convert URL from libsql:// to https:// if db_url.startswith("libsql://"): http_url = db_url.replace("libsql://", "https://") else: http_url = db_url # Ensure the URL doesn't have a trailing slash http_url = http_url.rstrip('/') # Verify the URL format if not http_url.startswith("https://"): logger.warning(f"HTTP URL does not start with https://: {http_url}") # Try to fix the URL if "://" not in http_url: http_url = f"https://{http_url}" logger.info(f"Added https:// prefix to URL: {http_url}") logger.info(f"Using HTTP URL: {http_url}") # Create a simple HTTP API client class class TursoHttpClient: def __init__(self, url, auth_token): self.url = url self.auth_token = auth_token self.headers = { "Authorization": f"Bearer {auth_token}", "Content-Type": "application/json" } # Add a property to track the last inserted ID self.last_insert_id = None def execute(self, query, params=None): # Format the request according to the v2/pipeline specification requests_data = [] # Prepare the statement stmt = {"sql": query} # Add parameters if provided if params: # Convert parameters to the expected format args = [] for param in params: if param is None: args.append({"type": "null", "value": None}) elif isinstance(param, int): args.append({"type": "integer", "value": str(param)}) elif isinstance(param, float): args.append({"type": "float", "value": str(param)}) else: args.append({"type": "text", "value": str(param)}) stmt["args"] = args requests_data.append({"type": "execute", "stmt": stmt}) # If this is an INSERT, add a query to get the last inserted ID is_insert = query.strip().upper().startswith("INSERT") if is_insert: requests_data.append({ "type": "execute", "stmt": {"sql": "SELECT last_insert_rowid()"} }) # Always close the connection at the end requests_data.append({"type": "close"}) # Prepare the final request payload data = {"requests": requests_data} # Use the v2/pipeline endpoint pipeline_url = f"{self.url}/v2/pipeline" logger.info(f"Sending request to: {pipeline_url}") logger.info(f"Headers: Authorization: Bearer {self.auth_token[:5]}... (truncated)") try: response = requests.post(pipeline_url, headers=self.headers, json=data, timeout=10) # Log response status logger.info(f"Response status: {response.status_code}") # Check for auth errors specifically if response.status_code == 401: logger.error(f"Authentication error (401): {response.text}") raise Exception(f"Authentication failed: {response.text}") # Raise for other errors response.raise_for_status() # Parse the response result = response.json() except requests.exceptions.RequestException as e: logger.error(f"HTTP request failed: {str(e)}") raise # Process the response if "results" in result and len(result["results"]) > 0: # If this was an INSERT, get the last inserted ID if is_insert and len(result["results"]) > 1: try: last_id_result = result["results"][1] if "rows" in last_id_result and len(last_id_result["rows"]) > 0: self.last_insert_id = last_id_result["rows"][0]["values"][0] logger.info(f"Last inserted ID: {self.last_insert_id}") except Exception as e: logger.warning(f"Failed to get last inserted ID: {str(e)}") # Return a cursor-like object with the main result cursor = TursoHttpCursor(result["results"][0]) cursor.lastrowid = self.last_insert_id return cursor # Return an empty cursor cursor = TursoHttpCursor(None) cursor.lastrowid = self.last_insert_id return cursor def commit(self): # HTTP API is stateless, no need to commit logger.info("HTTP API commit called (no-op)") pass def close(self): # HTTP API is stateless, no need to close logger.info("HTTP API close called (no-op)") pass # Create a cursor-like class for HTTP API class TursoHttpCursor: def __init__(self, result): self.result = result self.lastrowid = None def fetchone(self): if self.result and "rows" in self.result and len(self.result["rows"]) > 0: return self.result["rows"][0]["values"] return None def fetchall(self): if self.result and "rows" in self.result: return [row["values"] for row in self.result["rows"]] return [] # Create the HTTP API client app.db_conn = TursoHttpClient(http_url, clean_auth_token) # Test connection with a simple query logger.info("Testing HTTP API connection with SELECT 1") result = app.db_conn.execute("SELECT 1").fetchone() logger.info(f"HTTP API connection test successful: {result}") # Test connection with a more complex query logger.info("Testing HTTP API connection with CREATE TABLE IF NOT EXISTS") app.db_conn.execute(""" CREATE TABLE IF NOT EXISTS connection_test ( id INTEGER PRIMARY KEY AUTOINCREMENT, test_value TEXT ) """) app.db_conn.commit() # Test insert logger.info("Testing HTTP API connection with INSERT") cursor = app.db_conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", ("test_value_http",)) app.db_conn.commit() logger.info(f"HTTP API insert test - lastrowid: {cursor.lastrowid}") # Test select logger.info("Testing HTTP API connection with SELECT from test table") result = app.db_conn.execute("SELECT * FROM connection_test").fetchall() logger.info(f"HTTP API test table contents: {result}") connected = True app.db_type = "http-api" logger.info("All HTTP API connection tests passed successfully") except Exception as e: logger.error(f"HTTP API connection failed: {str(e)}") if not connected: error_msg = "All database connection methods failed. Please check your credentials and try again." logger.error(error_msg) raise Exception(error_msg) # Create tables if they don't exist # We'll create each table in a separate try-except block to ensure that if one fails, others can still be created logger.info("Creating database tables") # Function to execute SQL with retry def execute_with_retry(sql, max_retries=3): for attempt in range(max_retries): try: # For each attempt, we'll create a fresh connection if needed if attempt > 0: logger.info(f"Retry attempt {attempt+1} for SQL execution") # Test the connection first try: test_result = app.db_conn.execute("SELECT 1").fetchone() logger.info(f"Connection test successful: {test_result}") except Exception as conn_err: logger.warning(f"Connection test failed, reconnecting: {str(conn_err)}") # Reconnect using the same method that worked initially if app.db_type == "libsql-experimental": if hasattr(app, 'last_successful_connection_method') and app.last_successful_connection_method == 'auth_token': logger.info("Reconnecting with auth_token parameter") app.db_conn = libsql.connect(db_url, auth_token=clean_auth_token) elif hasattr(app, 'last_successful_connection_url'): logger.info(f"Reconnecting with URL: {app.last_successful_connection_url}") app.db_conn = libsql.connect(app.last_successful_connection_url) # Execute the SQL app.db_conn.execute(sql) app.db_conn.commit() return True except Exception as e: logger.warning(f"SQL execution failed (attempt {attempt+1}): {str(e)}") if attempt == max_retries - 1: raise # Small delay before retry import time time.sleep(1) # Simplified table creation with fewer constraints try: # Create users table users_table = """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, hashed_password TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_login DATETIME, is_admin INTEGER DEFAULT 0 ) """ execute_with_retry(users_table) logger.info("Users table created successfully") # Create projects table projects_table = """ CREATE TABLE IF NOT EXISTS projects ( id INTEGER PRIMARY KEY AUTOINCREMENT, owner_id INTEGER NOT NULL, title TEXT NOT NULL, description TEXT, geojson TEXT, storage_bucket TEXT DEFAULT 'default', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """ execute_with_retry(projects_table) logger.info("Projects table created successfully") # Create journals table journals_table = """ CREATE TABLE IF NOT EXISTS journals ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL, title TEXT NOT NULL, content TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """ execute_with_retry(journals_table) logger.info("Journals table created successfully") logger.info("All database tables created successfully") # Create super user if it doesn't exist using HTTP API create_super_user_http() # Also create tables using HTTP API for better reliability try: logger.info("Creating tables using HTTP API for better reliability") create_tables_http() except Exception as e: logger.error(f"Error creating tables with HTTP API: {str(e)}") except Exception as e: error_msg = f"Failed to create database tables: {str(e)}" logger.error(error_msg) # We'll continue even if table creation fails # This allows the server to start and use existing tables if they exist logger.warning("Continuing server startup despite table creation errors") # Try to create tables using HTTP API as a fallback try: logger.info("Trying to create tables using HTTP API as a fallback") create_tables_http() except Exception as e: logger.error(f"Error creating tables with HTTP API fallback: {str(e)}") @app.on_event("shutdown") async def shutdown_db_client(): logger.info("Shutting down database connection") try: # Close connection based on the type if hasattr(app, 'db_type'): if app.db_type == "libsql-experimental": try: app.db_conn.close() logger.info("libsql-experimental connection closed successfully") except Exception as e: logger.warning(f"Error closing libsql-experimental connection: {str(e)}") elif app.db_type == "libsql-client": # libsql-client doesn't have a close method logger.info("No close method needed for libsql-client") elif app.db_type == "http-api": # HTTP API is stateless, no need to close logger.info("No close method needed for HTTP API") else: logger.warning(f"Unknown database type: {app.db_type}") else: logger.warning("No database connection to close") except Exception as e: logger.error(f"Error closing database connection: {str(e)}") # Custom API documentation routes @app.get("/docs", include_in_schema=False) async def custom_swagger_ui_html(): return get_swagger_ui_html( openapi_url=app.openapi_url, title=f"{app.title} - Swagger UI", oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url, swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9.0/swagger-ui-bundle.js", swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9.0/swagger-ui.css", swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", ) @app.get("/redoc", include_in_schema=False) async def redoc_html(): return get_redoc_html( openapi_url=app.openapi_url, title=f"{app.title} - ReDoc", redoc_js_url="https://cdn.jsdelivr.net/npm/redoc@next/bundles/redoc.standalone.js", redoc_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", ) # Serve Swagger UI at root path @app.get("/", include_in_schema=False) async def root_swagger(): logger.info("Root endpoint accessed - serving Swagger UI") return get_swagger_ui_html( openapi_url=app.openapi_url, title=f"{app.title} - API Documentation", oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url, swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9.0/swagger-ui-bundle.js", swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9.0/swagger-ui.css", swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", ) # API information endpoint (moved from root) @app.get("/api/info", tags=["General"]) async def api_info(): """ API information endpoint providing details about the Seamo Auth Server API. Returns: dict: Basic information about the API and links to documentation. """ logger.info("API info endpoint accessed") return { "message": "Welcome to Seamo Auth Server API", "version": "1.0.0", "documentation": { "swagger_ui": "/", "redoc": "/redoc", "openapi_json": "/openapi.json" }, "endpoints": { "health": "/health", "auth": "/api/auth", "projects": "/api/projects", "journals": "/api/journals" } } # Health check endpoint @app.get("/health", tags=["General"]) async def health_check(): """ Health check endpoint to verify the server is running properly. Returns: dict: Status information about the server. """ logger.info("Health check endpoint accessed") return { "status": "healthy", "version": "1.0.0", "database_connected": hasattr(app, "db_conn"), "database_type": getattr(app, "db_type", "unknown") } # Super user check/create endpoint @app.get("/ensure-super-user", tags=["General"]) async def ensure_super_user(): """ Check if the super user exists and create it if it doesn't. This endpoint can be used to manually trigger super user creation. Returns: dict: Information about the super user status. """ logger.info("Ensure super user endpoint accessed") # First, let's test if we can insert and retrieve data from a simple test table try: # Create a simple test table logger.info("Creating test table") app.db_conn.execute(""" CREATE TABLE IF NOT EXISTS super_user_test ( id INTEGER PRIMARY KEY AUTOINCREMENT, test_value TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) app.db_conn.commit() # Insert a test value test_value = f"test_{int(time.time())}" logger.info(f"Inserting test value: {test_value}") app.db_conn.execute("INSERT INTO super_user_test (test_value) VALUES (?)", (test_value,)) app.db_conn.commit() # Try to retrieve the test value logger.info("Retrieving test value") result = app.db_conn.execute("SELECT * FROM super_user_test WHERE test_value = ?", (test_value,)).fetchone() if result: logger.info(f"Test value retrieved successfully: {result}") else: logger.warning("Test value not found after insert") # Get all values from the test table all_test_values = app.db_conn.execute("SELECT * FROM super_user_test").fetchall() logger.info(f"All test values: {all_test_values}") except Exception as e: logger.error(f"Error during test table operations: {str(e)}") # Now let's check the users table schema try: logger.info("Checking users table schema") table_info = app.db_conn.execute("PRAGMA table_info(users)").fetchall() logger.info(f"Users table schema: {table_info}") except Exception as e: logger.error(f"Error checking users table schema: {str(e)}") # Let's try a direct SQL approach for creating the super user try: logger.info("Trying direct SQL approach for super user creation") # Check if super user exists super_user_email = "test@seamo.earth" direct_check = app.db_conn.execute("SELECT COUNT(*) FROM users WHERE email = ?", (super_user_email,)).fetchone() logger.info(f"Direct check result: {direct_check}") if direct_check and direct_check[0] > 0: logger.info("Super user already exists according to direct check") else: logger.info("Super user does not exist according to direct check, creating now") # Create password hashing context pwd_context = CryptContext( schemes=["argon2"], argon2__time_cost=4, argon2__memory_cost=102400, argon2__parallelism=8, argon2__salt_len=16 ) # Hash the password super_user_password = "TestPassword123!" hashed_password = pwd_context.hash(super_user_password) # Try a direct insert with explicit SQL direct_insert = """ INSERT INTO users (email, hashed_password, is_admin) VALUES (?, ?, 1) """ app.db_conn.execute(direct_insert, (super_user_email, hashed_password)) app.db_conn.commit() logger.info("Direct insert completed") # Check if the insert worked direct_verify = app.db_conn.execute("SELECT * FROM users WHERE email = ?", (super_user_email,)).fetchone() logger.info(f"Direct verify result: {direct_verify}") if direct_verify: logger.info("Super user created successfully with direct SQL approach") else: logger.warning("Super user not found after direct SQL insert") # Let's try to list all users all_users = app.db_conn.execute("SELECT * FROM users").fetchall() logger.info(f"All users after direct insert: {all_users}") except Exception as e: logger.error(f"Error during direct SQL approach: {str(e)}") # Return diagnostic information return { "status": "diagnostic_complete", "message": "Diagnostic tests completed. Check server logs for details.", "email": "test@seamo.earth", "next_steps": [ "Check server logs for detailed diagnostic information", "Verify database permissions", "Try accessing the /test-db endpoint for additional diagnostics" ] } # Database test endpoint @app.get("/test-db", tags=["General"]) async def test_database(): """ Test database connectivity and operations. This endpoint performs various database operations to diagnose issues. Returns: dict: Results of the database tests. """ logger.info("Database test endpoint accessed") results = { "tests": [], "overall_status": "unknown" } # Test 1: Basic connection try: test_result = app.db_conn.execute("SELECT 1").fetchone() results["tests"].append({ "name": "basic_connection", "status": "success", "result": str(test_result) }) except Exception as e: results["tests"].append({ "name": "basic_connection", "status": "failure", "error": str(e) }) # Test 2: Create test table try: app.db_conn.execute(""" CREATE TABLE IF NOT EXISTS db_test_table ( id INTEGER PRIMARY KEY AUTOINCREMENT, test_value TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) app.db_conn.commit() results["tests"].append({ "name": "create_test_table", "status": "success" }) except Exception as e: results["tests"].append({ "name": "create_test_table", "status": "failure", "error": str(e) }) # Test 3: Insert into test table test_id = int(time.time()) test_value = f"test_value_{test_id}" try: app.db_conn.execute("INSERT INTO db_test_table (test_value) VALUES (?)", (test_value,)) app.db_conn.commit() results["tests"].append({ "name": "insert_test_value", "status": "success", "value_inserted": test_value }) except Exception as e: results["tests"].append({ "name": "insert_test_value", "status": "failure", "error": str(e) }) # Test 4: Retrieve from test table try: result = app.db_conn.execute("SELECT * FROM db_test_table WHERE test_value = ?", (test_value,)).fetchone() if result: results["tests"].append({ "name": "retrieve_test_value", "status": "success", "result": str(result) }) else: results["tests"].append({ "name": "retrieve_test_value", "status": "failure", "error": "Value not found after insert" }) except Exception as e: results["tests"].append({ "name": "retrieve_test_value", "status": "failure", "error": str(e) }) # Test 5: List all values in test table try: all_values = app.db_conn.execute("SELECT * FROM db_test_table").fetchall() results["tests"].append({ "name": "list_all_test_values", "status": "success", "count": len(all_values), "values": str(all_values[-5:]) if all_values else "[]" # Show only the last 5 values }) except Exception as e: results["tests"].append({ "name": "list_all_test_values", "status": "failure", "error": str(e) }) # Test 6: Check users table try: users_count = app.db_conn.execute("SELECT COUNT(*) FROM users").fetchone() results["tests"].append({ "name": "check_users_table", "status": "success", "count": users_count[0] if users_count else 0 }) # If users exist, try to get one if users_count and users_count[0] > 0: try: user = app.db_conn.execute("SELECT * FROM users LIMIT 1").fetchone() results["tests"].append({ "name": "retrieve_user", "status": "success", "user_id": user[0] if user else None }) except Exception as e: results["tests"].append({ "name": "retrieve_user", "status": "failure", "error": str(e) }) except Exception as e: results["tests"].append({ "name": "check_users_table", "status": "failure", "error": str(e) }) # Test 7: Try to insert a test user test_user_email = f"test_user_{test_id}@example.com" try: # Create password hashing context pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") hashed_password = pwd_context.hash("TestPassword123!") # Insert test user app.db_conn.execute( "INSERT INTO users (email, hashed_password, is_admin) VALUES (?, ?, 0)", (test_user_email, hashed_password) ) app.db_conn.commit() results["tests"].append({ "name": "insert_test_user", "status": "success", "email": test_user_email }) # Try to retrieve the test user try: user = app.db_conn.execute("SELECT * FROM users WHERE email = ?", (test_user_email,)).fetchone() if user: results["tests"].append({ "name": "retrieve_test_user", "status": "success", "user_id": user[0] if user else None }) else: results["tests"].append({ "name": "retrieve_test_user", "status": "failure", "error": "Test user not found after insert" }) except Exception as e: results["tests"].append({ "name": "retrieve_test_user", "status": "failure", "error": str(e) }) except Exception as e: results["tests"].append({ "name": "insert_test_user", "status": "failure", "error": str(e) }) # Calculate overall status success_count = sum(1 for test in results["tests"] if test["status"] == "success") total_count = len(results["tests"]) if success_count == total_count: results["overall_status"] = "success" elif success_count > 0: results["overall_status"] = "partial_success" else: results["overall_status"] = "failure" results["success_rate"] = f"{success_count}/{total_count}" return results # Super user creation endpoint (direct HTTP API) @app.get("/create-super-user-http", tags=["General"]) async def create_super_user_http_endpoint(): """ Create a super user with email test@seamo.earth directly using the Turso HTTP API. This endpoint is for testing purposes only. Returns: dict: Information about the super user creation. """ logger.info("Create super user HTTP endpoint accessed") # Super user details super_user_email = "test@seamo.earth" super_user_password = "TestPassword123!" # Create password hashing context pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") hashed_password = pwd_context.hash(super_user_password) try: # Extract the database URL and auth token db_url = os.getenv("TURSO_DATABASE_URL", "") auth_token = os.getenv("TURSO_AUTH_TOKEN", "") # Convert URL from libsql:// to https:// if db_url.startswith("libsql://"): http_url = db_url.replace("libsql://", "https://") else: http_url = db_url # Ensure the URL doesn't have a trailing slash http_url = http_url.rstrip('/') # Set up headers for the HTTP request headers = { "Authorization": f"Bearer {auth_token}", "Content-Type": "application/json" } # First, check if the user already exists logger.info(f"Checking if super user already exists using HTTP API") check_query = { "requests": [ { "type": "execute", "stmt": { "sql": "SELECT COUNT(*) FROM users WHERE email = ?", "args": [ {"type": "text", "value": super_user_email} ] } }, {"type": "close"} ] } # Send the request to check if user exists check_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=check_query) check_response.raise_for_status() check_result = check_response.json() logger.info(f"Check response: {check_result}") # Parse the result to see if user exists user_exists = False if "results" in check_result and len(check_result["results"]) > 0: result = check_result["results"][0] if "rows" in result and len(result["rows"]) > 0: count = result["rows"][0]["values"][0] user_exists = count > 0 if user_exists: logger.info(f"Super user already exists according to HTTP API") return { "status": "exists", "message": "Super user already exists", "email": super_user_email } # User doesn't exist, create it logger.info(f"Creating super user using HTTP API") # First, make sure the table exists create_table_query = { "requests": [ { "type": "execute", "stmt": { "sql": """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, hashed_password TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_login DATETIME, is_admin INTEGER DEFAULT 0 ) """ } }, {"type": "close"} ] } # Send the request to create the table create_table_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=create_table_query) create_table_response.raise_for_status() logger.info(f"Table creation response: {create_table_response.status_code}") # Insert the super user insert_query = { "requests": [ { "type": "execute", "stmt": { "sql": "INSERT INTO users (email, hashed_password, is_admin) VALUES (?, ?, 1)", "args": [ {"type": "text", "value": super_user_email}, {"type": "text", "value": hashed_password} ] } }, { "type": "execute", "stmt": {"sql": "SELECT last_insert_rowid()"} }, {"type": "close"} ] } # Send the request to insert the user insert_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=insert_query) insert_response.raise_for_status() insert_result = insert_response.json() logger.info(f"Insert response: {insert_result}") # Try to get the last inserted ID last_id = None if "results" in insert_result and len(insert_result["results"]) > 1: result = insert_result["results"][1] if "rows" in result and len(result["rows"]) > 0: last_id = result["rows"][0]["values"][0] logger.info(f"Last inserted ID: {last_id}") # Verify the user was created verify_query = { "requests": [ { "type": "execute", "stmt": { "sql": "SELECT * FROM users WHERE email = ?", "args": [ {"type": "text", "value": super_user_email} ] } }, {"type": "close"} ] } # Send the request to verify the user verify_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=verify_query) verify_response.raise_for_status() verify_result = verify_response.json() logger.info(f"Verify response: {verify_result}") # Check if the user was found user_found = False user_id = None if "results" in verify_result and len(verify_result["results"]) > 0: result = verify_result["results"][0] if "rows" in result and len(result["rows"]) > 0: user_found = True user_id = result["rows"][0]["values"][0] logger.info(f"User found with ID: {user_id}") if user_found: return { "status": "success", "message": "Super user created successfully", "user_id": user_id, "email": super_user_email } else: # List all users for diagnostic purposes list_query = { "requests": [ { "type": "execute", "stmt": {"sql": "SELECT id, email FROM users"} }, {"type": "close"} ] } # Send the request to list all users list_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=list_query) list_response.raise_for_status() list_result = list_response.json() logger.info(f"List response: {list_result}") # Extract the list of users all_users = [] if "results" in list_result and len(list_result["results"]) > 0: result = list_result["results"][0] if "rows" in result: for row in result["rows"]: all_users.append(row["values"]) return { "status": "error", "message": "Super user was not found after creation", "email": super_user_email, "all_users": all_users } except Exception as e: logger.error(f"Error creating super user with HTTP API: {str(e)}") return { "status": "error", "message": f"Error creating super user: {str(e)}", "email": super_user_email } # Super user creation endpoint (direct SQL) @app.get("/create-super-user", tags=["General"]) async def create_super_user_endpoint(): """ Create a super user with email test@seamo.earth directly using SQL. This endpoint is for testing purposes only. Returns: dict: Information about the super user creation. """ logger.info("Create super user endpoint accessed") # Super user details super_user_email = "test@seamo.earth" super_user_password = "TestPassword123!" # Create password hashing context pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") hashed_password = pwd_context.hash(super_user_password) try: # Check if super user already exists - using COUNT(*) for more reliable results logger.info("Checking if super user already exists") count_result = app.db_conn.execute("SELECT COUNT(*) FROM users WHERE email = ?", (super_user_email,)).fetchone() user_exists = count_result and count_result[0] > 0 if user_exists: # Get user details existing_user = app.db_conn.execute("SELECT id, is_admin FROM users WHERE email = ?", (super_user_email,)).fetchone() if existing_user: user_id = existing_user[0] is_admin = existing_user[1] if len(existing_user) > 1 else 0 logger.info(f"Super user already exists with ID: {user_id}, is_admin: {is_admin}") # If user exists but is not admin, make them admin if not is_admin: logger.info("Updating user to have admin privileges") app.db_conn.execute("UPDATE users SET is_admin = 1 WHERE email = ?", (super_user_email,)) app.db_conn.commit() logger.info("User updated to admin successfully") return { "status": "updated", "message": f"Super user already existed with ID {user_id} and was updated to have admin privileges", "user_id": user_id, "email": super_user_email } return { "status": "exists", "message": f"Super user already exists with ID {user_id} and has admin privileges", "user_id": user_id, "email": super_user_email } else: logger.warning("User count says user exists but couldn't retrieve details") # Insert the super user - using the approach from turso_interactive_crud.py logger.info(f"Inserting super user with email: {super_user_email}") # First, make sure the table exists app.db_conn.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, hashed_password TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_login DATETIME, is_admin INTEGER DEFAULT 0 ) """) app.db_conn.commit() # Insert the user app.db_conn.execute( "INSERT INTO users (email, hashed_password, is_admin) VALUES (?, ?, 1)", (super_user_email, hashed_password) ) app.db_conn.commit() logger.info("Super user insert committed") # Wait a moment to ensure the insert is processed time.sleep(0.5) # Get the user ID by querying for the email we just inserted # This is more reliable than using lastrowid which might not work correctly with Turso logger.info("Getting user ID by email") id_result = app.db_conn.execute("SELECT id FROM users WHERE email = ?", (super_user_email,)).fetchone() if id_result: user_id = id_result[0] logger.info(f"Found user ID: {user_id}") # Get the full user record logger.info("Getting full user record") user = app.db_conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() if user: logger.info(f"Super user retrieved successfully: {user}") return { "status": "success", "message": "Super user created and retrieved successfully", "user_id": user_id, "email": super_user_email } else: logger.warning(f"Found user ID {user_id} but couldn't retrieve full record") else: logger.warning("Could not find user ID after insert") # If we get here, we couldn't verify the user was created properly # Try to list all users as a diagnostic step logger.info("Listing all users for diagnostic purposes") all_users = app.db_conn.execute("SELECT id, email FROM users").fetchall() logger.info(f"All users: {all_users}") # Try one more approach - check if the user exists now count_after = app.db_conn.execute("SELECT COUNT(*) FROM users WHERE email = ?", (super_user_email,)).fetchone() exists_after = count_after and count_after[0] > 0 if exists_after: logger.info("User exists according to COUNT(*) after insert") return { "status": "likely_success", "message": "Super user appears to have been created (confirmed by COUNT query)", "email": super_user_email, "all_users": str(all_users) } else: logger.warning("User does not exist according to COUNT(*) after insert") return { "status": "partial_success", "message": "Super user insert operation completed but user could not be verified", "email": super_user_email, "all_users": str(all_users) } except Exception as e: logger.error(f"Error creating super user: {str(e)}") return { "status": "error", "message": f"Error creating super user: {str(e)}", "email": super_user_email } # Test user creation endpoint (direct SQL) @app.get("/create-test-user", tags=["General"]) async def create_test_user(): """ Create a test user directly using SQL. This endpoint is for testing purposes only. Returns: dict: Information about the test user creation. """ logger.info("Create test user endpoint accessed") # Generate a unique email test_id = int(time.time()) test_email = f"test_user_{test_id}@example.com" test_password = "TestPassword123!" # Create password hashing context pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") hashed_password = pwd_context.hash(test_password) try: # Insert the test user logger.info(f"Inserting test user with email: {test_email}") app.db_conn.execute( "INSERT INTO users (email, hashed_password, is_admin) VALUES (?, ?, 0)", (test_email, hashed_password) ) app.db_conn.commit() logger.info("Test user inserted successfully") # Try to retrieve the test user logger.info("Retrieving test user") user = app.db_conn.execute("SELECT * FROM users WHERE email = ?", (test_email,)).fetchone() if user: logger.info(f"Test user retrieved successfully: {user}") return { "status": "success", "message": "Test user created and retrieved successfully", "user_id": user[0] if user else None, "email": test_email } else: logger.warning("Test user not found after insert") # Try to list all users all_users = app.db_conn.execute("SELECT id, email FROM users").fetchall() logger.info(f"All users: {all_users}") return { "status": "partial_success", "message": "Test user created but could not be retrieved", "email": test_email, "all_users": str(all_users) } except Exception as e: logger.error(f"Error creating test user: {str(e)}") return { "status": "error", "message": f"Error creating test user: {str(e)}", "email": test_email } # Direct HTTP API endpoint for user registration @app.post("/direct-register", tags=["Authentication"]) async def direct_register(request: Request): """ Register a new user using direct HTTP API. This is a workaround for issues with the libsql-experimental driver. """ class UserCreate(BaseModel): email: EmailStr password: str # Generate a unique request ID request_id = f"direct_reg_{int(time.time())}" logger.info(f"[{request_id}] Starting direct registration") try: # Parse the request body body = await request.json() user = UserCreate(**body) logger.info(f"[{request_id}] Registering user with email: {user.email}") # Get Turso database connection details db_url = os.getenv("TURSO_DATABASE_URL") auth_token = os.getenv("TURSO_AUTH_TOKEN") if not db_url or not auth_token: logger.error(f"[{request_id}] Missing Turso credentials") return { "success": False, "error": "Missing Turso credentials" } # Clean the auth token auth_token = auth_token.strip() # Convert URL from libsql:// to https:// if db_url.startswith("libsql://"): http_url = db_url.replace("libsql://", "https://") else: http_url = db_url # Ensure the URL doesn't have a trailing slash http_url = http_url.rstrip('/') # Verify the URL format if not http_url.startswith("https://"): if "://" not in http_url: http_url = f"https://{http_url}" logger.info(f"[{request_id}] Using HTTP URL: {http_url}") # Check if user already exists logger.info(f"[{request_id}] Checking if user already exists") check_query = "SELECT id FROM users WHERE email = ?" # Format the request for checking if user exists check_data = { "requests": [ { "type": "execute", "stmt": { "sql": check_query, "args": [ {"type": "text", "value": user.email} ] } }, {"type": "close"} ] } # Send the request headers = { "Authorization": f"Bearer {auth_token}", "Content-Type": "application/json" } pipeline_url = f"{http_url}/v2/pipeline" check_response = requests.post(pipeline_url, headers=headers, json=check_data, timeout=10) check_response.raise_for_status() check_result = check_response.json() if "results" in check_result and len(check_result["results"]) > 0: result = check_result["results"][0] if "rows" in result and len(result["rows"]) > 0: logger.warning(f"[{request_id}] User with email {user.email} already exists") return { "success": False, "error": "Email already registered" } # Hash the password logger.info(f"[{request_id}] Hashing password") pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") hashed_password = pwd_context.hash(user.password) logger.info(f"[{request_id}] Password hashed successfully") # Insert the user logger.info(f"[{request_id}] Inserting user") insert_query = "INSERT INTO users (email, hashed_password) VALUES (?, ?)" # Format the request for inserting the user insert_data = { "requests": [ { "type": "execute", "stmt": { "sql": insert_query, "args": [ {"type": "text", "value": user.email}, {"type": "text", "value": hashed_password} ] } }, { "type": "execute", "stmt": { "sql": "SELECT last_insert_rowid()" } }, {"type": "close"} ] } # Send the request insert_response = requests.post(pipeline_url, headers=headers, json=insert_data, timeout=10) insert_response.raise_for_status() insert_result = insert_response.json() # Get the user ID user_id = None if "results" in insert_result and len(insert_result["results"]) > 1: id_result = insert_result["results"][1] if "rows" in id_result and len(id_result["rows"]) > 0: user_id = id_result["rows"][0]["values"][0] logger.info(f"[{request_id}] User inserted with ID: {user_id}") # Verify the insert logger.info(f"[{request_id}] Verifying user was created") verify_query = "SELECT id, email, created_at FROM users WHERE email = ?" # Format the request for verifying the user verify_data = { "requests": [ { "type": "execute", "stmt": { "sql": verify_query, "args": [ {"type": "text", "value": user.email} ] } }, {"type": "close"} ] } # Send the request verify_response = requests.post(pipeline_url, headers=headers, json=verify_data, timeout=10) verify_response.raise_for_status() verify_result = verify_response.json() if "results" in verify_result and len(verify_result["results"]) > 0: result = verify_result["results"][0] if "rows" in result and len(result["rows"]) > 0: user_data = result["rows"][0]["values"] user_id = user_data[0] logger.info(f"[{request_id}] Verified user with ID: {user_id}") # Generate access token from datetime import timedelta from app.api.routes.auth_router import create_access_token, create_refresh_token access_token_expires = timedelta(minutes=30) refresh_token_expires = timedelta(days=7) access_token = create_access_token( data={"sub": user.email}, expires_delta=access_token_expires ) refresh_token = create_refresh_token( data={"sub": user.email}, expires_delta=refresh_token_expires ) return { "success": True, "id": user_id, "email": user.email, "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer" } else: logger.error(f"[{request_id}] Failed to verify user after insert") return { "success": False, "error": "Failed to verify user after insert" } else: logger.error(f"[{request_id}] Failed to verify user after insert") return { "success": False, "error": "Failed to verify user after insert" } except Exception as e: logger.error(f"[{request_id}] Error during direct registration: {str(e)}") return { "success": False, "error": str(e) } # Import and include routers from app.api.routes import auth_router, projects_router, journals_router app.include_router(auth_router.router, prefix="/api/auth", tags=["Authentication"]) app.include_router(projects_router.router, prefix="/api/projects", tags=["Projects"]) app.include_router(journals_router.router, prefix="/api/journals", tags=["Journals"]) # Fix users table endpoint @app.get("/fix-users-table", tags=["General"]) async def fix_users_table(): """ Fix the users table by recreating it without the CHECK constraint on hashed_password. This endpoint is for fixing database issues. Returns: dict: Information about the operation. """ logger.info("Fix users table endpoint accessed") operation_id = f"fix_users_table_{int(time.time())}" try: # Import the HTTP API utility from app.utils import db_http # Step 1: Check if users table exists logger.info(f"[{operation_id}] Checking if users table exists") # Step 2: Create a temporary table without the constraint logger.info(f"[{operation_id}] Creating temporary table") db_http.execute_query( """ CREATE TABLE IF NOT EXISTS users_temp ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, hashed_password TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_login DATETIME, is_admin INTEGER DEFAULT 0 ) """, operation_id=f"{operation_id}_create_temp" ) # Step 3: Copy data from users to users_temp if users exists logger.info(f"[{operation_id}] Copying data to temporary table") try: db_http.execute_query( """ INSERT INTO users_temp (id, email, hashed_password, created_at, last_login, is_admin) SELECT id, email, hashed_password, created_at, last_login, is_admin FROM users """, operation_id=f"{operation_id}_copy_data" ) logger.info(f"[{operation_id}] Data copied successfully") except Exception as e: logger.warning(f"[{operation_id}] Error copying data: {str(e)}") # This is expected if the users table doesn't exist or is empty # Step 4: Drop the original users table logger.info(f"[{operation_id}] Dropping original users table") db_http.execute_query( "DROP TABLE IF EXISTS users", operation_id=f"{operation_id}_drop_users" ) # Step 5: Rename users_temp to users logger.info(f"[{operation_id}] Renaming temporary table to users") db_http.execute_query( "ALTER TABLE users_temp RENAME TO users", operation_id=f"{operation_id}_rename_table" ) # Step 6: Create indexes logger.info(f"[{operation_id}] Creating indexes") db_http.execute_query( "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)", operation_id=f"{operation_id}_create_index" ) logger.info(f"[{operation_id}] Users table fixed successfully") return { "success": True, "message": "Users table fixed successfully" } except Exception as e: logger.error(f"[{operation_id}] Error fixing users table: {str(e)}") return { "success": False, "error": str(e) } # Test database with HTTP API endpoint @app.get("/test-db-http", tags=["General"]) async def test_db_http(): """ Test the database connection and operations using the HTTP API. This endpoint is for testing purposes only. Returns: dict: Information about the database tests. """ logger.info("Test database HTTP API endpoint accessed") results = { "tests": [], "success_count": 0, "total_count": 0 } try: # Extract the database URL and auth token db_url = os.getenv("TURSO_DATABASE_URL", "") auth_token = os.getenv("TURSO_AUTH_TOKEN", "") # Convert URL from libsql:// to https:// if db_url.startswith("libsql://"): http_url = db_url.replace("libsql://", "https://") else: http_url = db_url # Ensure the URL doesn't have a trailing slash http_url = http_url.rstrip('/') # Set up headers for the HTTP request headers = { "Authorization": f"Bearer {auth_token}", "Content-Type": "application/json" } # Test 1: Basic connection basic_query = { "requests": [ { "type": "execute", "stmt": {"sql": "SELECT 1"} }, {"type": "close"} ] } basic_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=basic_query) basic_response.raise_for_status() basic_result = basic_response.json() results["tests"].append({ "name": "Basic connection (HTTP API)", "success": True, "result": str(basic_result) }) results["success_count"] += 1 results["total_count"] += 1 # Test 2: Create a test table create_table_query = { "requests": [ { "type": "execute", "stmt": { "sql": """ CREATE TABLE IF NOT EXISTS test_table_http ( id INTEGER PRIMARY KEY AUTOINCREMENT, value TEXT NOT NULL ) """ } }, {"type": "close"} ] } create_table_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=create_table_query) create_table_response.raise_for_status() results["tests"].append({ "name": "Create test table (HTTP API)", "success": True }) results["success_count"] += 1 results["total_count"] += 1 # Test 3: Insert into test table insert_query = { "requests": [ { "type": "execute", "stmt": { "sql": "INSERT INTO test_table_http (value) VALUES (?)", "args": [ {"type": "text", "value": "test_value_http"} ] } }, {"type": "close"} ] } insert_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=insert_query) insert_response.raise_for_status() results["tests"].append({ "name": "Insert into test table (HTTP API)", "success": True }) results["success_count"] += 1 results["total_count"] += 1 # Test 4: Select from test table select_query = { "requests": [ { "type": "execute", "stmt": {"sql": "SELECT * FROM test_table_http"} }, {"type": "close"} ] } select_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=select_query) select_response.raise_for_status() select_result = select_response.json() results["tests"].append({ "name": "Select from test table (HTTP API)", "success": True, "result": str(select_result) }) results["success_count"] += 1 results["total_count"] += 1 except Exception as e: results["error"] = str(e) # Add success rate results["success_rate"] = f"{results['success_count']}/{results['total_count']}" return results @app.get("/test-db-http-util", tags=["General"]) async def test_db_http_util(): """ Test the database connection and operations using the HTTP API utility. This endpoint is for testing purposes only. Returns: dict: Information about the database tests. """ logger.info("Test database HTTP API utility endpoint accessed") try: # Import the HTTP API utility from app.utils import db_http operation_id = f"test_db_http_util_{int(time.time())}" logger.info(f"[{operation_id}] Starting HTTP API utility test") results = { "connection_type": "http-api-util", "operations": [] } # Test 1: Simple SELECT logger.info(f"[{operation_id}] Test 1: Simple SELECT") result = db_http.execute_query("SELECT 1 as test", operation_id=f"{operation_id}_1") results["operations"].append({ "name": "Simple SELECT", "success": True, "result": str(result) }) # Test 2: Create a temporary table logger.info(f"[{operation_id}] Test 2: Create a temporary table") db_http.create_table_if_not_exists( "test_table_http_util", """ id INTEGER PRIMARY KEY AUTOINCREMENT, value TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP """, operation_id=f"{operation_id}_2" ) results["operations"].append({ "name": "Create temporary table", "success": True }) # Test 3: Insert into the temporary table logger.info(f"[{operation_id}] Test 3: Insert into the temporary table") test_value = f"test_value_http_util_{int(time.time())}" insert_id = db_http.insert_record( "test_table_http_util", {"value": test_value}, operation_id=f"{operation_id}_3" ) results["operations"].append({ "name": "Insert into temporary table", "success": insert_id is not None, "insert_id": insert_id }) # Test 4: Select from the temporary table logger.info(f"[{operation_id}] Test 4: Select from the temporary table") records = db_http.select_records( "test_table_http_util", condition="value = ?", condition_params=[{"type": "text", "value": test_value}], limit=1, operation_id=f"{operation_id}_4" ) results["operations"].append({ "name": "Select from temporary table", "success": len(records) > 0, "result": str(records[0]) if records else None }) # Test 5: Update the record logger.info(f"[{operation_id}] Test 5: Update the record") updated_value = f"{test_value}_updated" success = db_http.update_record( "test_table_http_util", {"value": updated_value}, "id = ?", [{"type": "integer", "value": str(insert_id)}], operation_id=f"{operation_id}_5" ) results["operations"].append({ "name": "Update record", "success": success }) # Test 6: Verify the update logger.info(f"[{operation_id}] Test 6: Verify the update") updated_records = db_http.select_records( "test_table_http_util", condition="id = ?", condition_params=[{"type": "integer", "value": str(insert_id)}], limit=1, operation_id=f"{operation_id}_6" ) results["operations"].append({ "name": "Verify update", "success": len(updated_records) > 0 and updated_records[0].get("value") == updated_value, "result": str(updated_records[0]) if updated_records else None }) # Test 7: Count records logger.info(f"[{operation_id}] Test 7: Count records") count = db_http.count_records("test_table_http_util", operation_id=f"{operation_id}_7") results["operations"].append({ "name": "Count records", "success": count > 0, "count": count }) # Test 8: List all records logger.info(f"[{operation_id}] Test 8: List all records") all_records = db_http.select_records( "test_table_http_util", limit=10, operation_id=f"{operation_id}_8" ) results["operations"].append({ "name": "List all records", "success": True, "count": len(all_records), "records": all_records }) # Test 9: Create a test user logger.info(f"[{operation_id}] Test 9: Create a test user") # Create password hashing context from app.api.routes.auth_router import get_password_hash test_user_email = f"test_user_http_util_{int(time.time())}@example.com" test_user_password = "TestPassword123!" hashed_password = get_password_hash(test_user_password) user_id = db_http.insert_record( "users", { "email": test_user_email, "hashed_password": hashed_password, "is_admin": 0 }, operation_id=f"{operation_id}_9" ) results["operations"].append({ "name": "Create test user", "success": user_id is not None, "user_id": user_id, "email": test_user_email }) # Test 10: Verify the user was created logger.info(f"[{operation_id}] Test 10: Verify the user was created") users = db_http.select_records( "users", condition="email = ?", condition_params=[{"type": "text", "value": test_user_email}], limit=1, operation_id=f"{operation_id}_10" ) results["operations"].append({ "name": "Verify test user", "success": len(users) > 0, "result": str(users[0]) if users else None }) logger.info(f"[{operation_id}] HTTP API utility test completed successfully") return results except Exception as e: logger.error(f"Error during HTTP API utility test: {str(e)}") return { "error": str(e), "operations": [] } if __name__ == "__main__": port = int(os.getenv("PORT", 7860)) logger.info(f"Starting server on port {port}") uvicorn.run("main:app", host="0.0.0.0", port=port)