Spaces:
Sleeping
Sleeping
import os | |
import sys | |
import logging | |
import requests | |
from fastapi import FastAPI, HTTPException, Depends, Request | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html | |
from fastapi.openapi.utils import get_openapi | |
from fastapi.staticfiles import StaticFiles | |
import uvicorn | |
from dotenv import load_dotenv | |
# Configure logging
# Records at INFO level and above are written to stdout using one shared format.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[_stdout_handler],
)

# Module-wide logger used by every function in this file.
logger = logging.getLogger("auth-server")
# Import database libraries
# Pick the database driver in order of preference:
#   1. libsql-experimental  -> LIBSQL_TYPE = "experimental"
#   2. libsql-client        -> LIBSQL_TYPE = "client"
#   3. plain HTTP requests  -> LIBSQL_TYPE = "http" (HAS_LIBSQL is False)
HAS_LIBSQL = True
try:
    import libsql_experimental as libsql
except ImportError:
    try:
        import libsql_client
    except ImportError:
        logger.error("Failed to import any libsql package. Please install libsql-experimental==0.0.49")
        logger.error("Falling back to HTTP API method for database access")
        # The HTTP fallback talks to the database with requests.
        import requests
        HAS_LIBSQL = False
        LIBSQL_TYPE = "http"
    else:
        logger.info("Successfully imported libsql-client package")
        LIBSQL_TYPE = "client"
else:
    logger.info("Successfully imported libsql-experimental package")
    LIBSQL_TYPE = "experimental"
# Load environment variables
# Runs before the os.getenv() lookups further down (FRONTEND_URL, TURSO_*, PORT)
# so values supplied through a .env file are visible to them.
load_dotenv()
# Create FastAPI app with detailed metadata
# Markdown text passed as the API description.
_API_DESCRIPTION = """
# Seamo Authentication API
The Seamo Auth Server provides authentication and user management services for the Seamo platform.
## Features
* User registration and authentication
* Project management
* Journal management
* Access control
## Authentication
Most endpoints require authentication. Use the /api/auth/token endpoint to obtain access tokens.
"""

# One entry per tag used when routers are registered.
_OPENAPI_TAGS = [
    {"name": tag_name, "description": tag_summary}
    for tag_name, tag_summary in (
        ("Authentication", "Operations related to user authentication and token management"),
        ("Projects", "Operations related to project management"),
        ("Journals", "Operations related to journal management"),
        ("General", "General server information and health checks"),
    )
]

# NOTE(review): "[email protected]" looks like an e-mail obfuscation placeholder
# rather than a real address - confirm the intended contact e-mail.
_CONTACT = {
    "name": "Seamo Team",
    "url": "https://seamo.earth/contact",
    "email": "[email protected]",
}

_LICENSE = {
    "name": "MIT",
    "url": "https://opensource.org/licenses/MIT",
}

# The built-in /docs and /redoc pages are switched off; this module defines its
# own documentation handlers further down.
app = FastAPI(
    title="Seamo Auth Server",
    description=_API_DESCRIPTION,
    version="1.0.0",
    contact=_CONTACT,
    license_info=_LICENSE,
    openapi_tags=_OPENAPI_TAGS,
    docs_url=None,
    redoc_url=None,
)
# Configure CORS
# Origins allowed to call this API from a browser. FRONTEND_URL lets a
# deployment add its own frontend origin without a code change.
_candidate_origins = [
    os.getenv("FRONTEND_URL", "http://localhost:3000"),
    "https://seamo.earth",
    "https://seamoo.netlify.app",
    "https://seamo-ai-ai-server.hf.space",
    "https://seamo-ai-auth-server.hf.space",
    "https://seamo-ai-scraper-server.hf.space",
    "http://localhost:3000",  # For local development
    "http://localhost:8000",  # Local AI server
    "http://localhost:8001",  # Local Auth server
    "http://localhost:8002",  # Local Scraper server
]
# dict.fromkeys drops repeated entries while keeping first-seen order (the
# FRONTEND_URL default duplicates the explicit localhost:3000 entry).
origins = list(dict.fromkeys(_candidate_origins))

# Use the module logger instead of print() so this line shares the format and
# destination of every other message emitted by this file.
logger.info("CORS origins configured: %s", origins)

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Database connection
# DDL for the scratch table used to prove that a new connection can create,
# write and read tables.
_CONNECTION_TEST_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS connection_test (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    test_value TEXT
)
"""

# Application tables created at startup, as (log label, DDL) pairs.
# Simplified table definitions with few constraints.
_SCHEMA_TABLES = (
    (
        "Users",
        """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            email TEXT NOT NULL UNIQUE,
            hashed_password TEXT NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            last_login DATETIME,
            is_admin INTEGER DEFAULT 0
        )
        """,
    ),
    (
        "Projects",
        """
        CREATE TABLE IF NOT EXISTS projects (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            owner_id INTEGER NOT NULL,
            title TEXT NOT NULL,
            description TEXT,
            geojson TEXT,
            storage_bucket TEXT DEFAULT 'default',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """,
    ),
    (
        "Journals",
        """
        CREATE TABLE IF NOT EXISTS journals (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            project_id INTEGER NOT NULL,
            title TEXT NOT NULL,
            content TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
        """,
    ),
)


class TursoHttpCursor:
    """Cursor-like view over one statement result returned by the HTTP API.

    Rows are accepted in two shapes: the pipeline wire format, where a row is
    a list of ``{"type": ..., "value": ...}`` cells, and the flat
    ``{"values": [...]}`` shape this module originally expected.
    """

    def __init__(self, result):
        self.result = result
        self.lastrowid = None

    @staticmethod
    def _decode_cell(cell):
        """Convert one wire-format cell into a plain Python value."""
        if not isinstance(cell, dict):
            return cell
        cell_type = cell.get("type")
        if cell_type == "null":
            return None
        if cell_type == "integer":
            # Integers are transmitted as decimal strings.
            return int(cell["value"])
        if cell_type == "float":
            return float(cell["value"])
        if cell_type == "blob":
            # Left base64-encoded; nothing in this module stores blobs.
            return cell.get("base64")
        return cell.get("value")

    @classmethod
    def _decode_row(cls, row):
        """Return the list of Python values held by *row*."""
        if isinstance(row, dict):
            return row["values"]
        return [cls._decode_cell(cell) for cell in row]

    def _rows(self):
        """Return the raw row list, or an empty list when there is none."""
        if isinstance(self.result, dict):
            return self.result.get("rows") or []
        return []

    def fetchone(self):
        """Return the first row as a list of values, or None if there are no rows."""
        rows = self._rows()
        return self._decode_row(rows[0]) if rows else None

    def fetchall(self):
        """Return every row as a list of value lists (empty when there are no rows)."""
        return [self._decode_row(row) for row in self._rows()]


class TursoHttpClient:
    """Small synchronous client for the database's ``/v2/pipeline`` HTTP endpoint.

    Provides the ``execute`` / ``commit`` / ``close`` subset of a connection
    object that the rest of this module relies on.
    """

    def __init__(self, url, auth_token):
        self.url = url
        self.auth_token = auth_token
        self.headers = {
            "Authorization": f"Bearer {auth_token}",
            "Content-Type": "application/json"
        }
        # Row id produced by the most recent INSERT issued through this client.
        self.last_insert_id = None

    @staticmethod
    def _encode_param(param):
        """Encode one positional SQL argument as a typed pipeline value."""
        if param is None:
            return {"type": "null"}
        if isinstance(param, bool):
            # bool is a subclass of int; str(True) would yield "True", which is
            # not a valid integer literal, so map it to 1/0 explicitly.
            return {"type": "integer", "value": str(int(param))}
        if isinstance(param, int):
            return {"type": "integer", "value": str(param)}
        if isinstance(param, float):
            # Floats are sent as JSON numbers, unlike integers.
            return {"type": "float", "value": param}
        return {"type": "text", "value": str(param)}

    @staticmethod
    def _statement_result(entry):
        """Return the statement result carried by one pipeline result entry.

        Raises:
            Exception: if the entry reports an error for its statement.
        """
        if not isinstance(entry, dict):
            return None
        if entry.get("type") == "error":
            error = entry.get("error") or {}
            message = error.get("message", error) if isinstance(error, dict) else error
            raise Exception(f"Database error: {message}")
        response = entry.get("response")
        if isinstance(response, dict):
            # Wire format: {"type": "ok", "response": {"type": "execute", "result": {...}}}
            return response.get("result")
        # Flat shape: the entry itself is the result.
        return entry

    def execute(self, query, params=None):
        """Run one SQL statement and return a TursoHttpCursor over its result.

        Args:
            query: SQL text, optionally containing ``?`` placeholders.
            params: optional sequence of positional arguments.

        Raises:
            Exception: on HTTP 401 or when the server reports a statement error.
            requests.exceptions.RequestException: on transport or HTTP errors.
        """
        stmt = {"sql": query}
        if params:
            stmt["args"] = [self._encode_param(param) for param in params]

        pipeline = [{"type": "execute", "stmt": stmt}]
        # For INSERTs, ask for the generated row id in the same pipeline.
        is_insert = query.strip().upper().startswith("INSERT")
        if is_insert:
            pipeline.append({
                "type": "execute",
                "stmt": {"sql": "SELECT last_insert_rowid()"}
            })
        # Always close the stream at the end of the pipeline.
        pipeline.append({"type": "close"})

        pipeline_url = f"{self.url}/v2/pipeline"
        logger.info(f"Sending request to: {pipeline_url}")
        logger.info(f"Headers: Authorization: Bearer {self.auth_token[:5]}... (truncated)")
        try:
            response = requests.post(
                pipeline_url,
                headers=self.headers,
                json={"requests": pipeline},
                timeout=10,
            )
            logger.info(f"Response status: {response.status_code}")
            # Report authentication failures with a dedicated message.
            if response.status_code == 401:
                logger.error(f"Authentication error (401): {response.text}")
                raise Exception(f"Authentication failed: {response.text}")
            response.raise_for_status()
            payload = response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP request failed: {str(e)}")
            raise

        entries = payload.get("results") if isinstance(payload, dict) else None
        if not entries:
            cursor = TursoHttpCursor(None)
            cursor.lastrowid = self.last_insert_id
            return cursor

        # Raises if the server rejected the statement, so SQL errors are not
        # mistaken for empty result sets.
        main_result = self._statement_result(entries[0])

        if is_insert and len(entries) > 1:
            try:
                id_row = TursoHttpCursor(self._statement_result(entries[1])).fetchone()
                if id_row:
                    self.last_insert_id = id_row[0]
                    logger.info(f"Last inserted ID: {self.last_insert_id}")
            except Exception as e:
                logger.warning(f"Failed to get last inserted ID: {str(e)}")

        cursor = TursoHttpCursor(main_result)
        cursor.lastrowid = self.last_insert_id
        return cursor

    def commit(self):
        """No-op: each execute() call is a self-contained request."""
        logger.info("HTTP API commit called (no-op)")

    def close(self):
        """No-op: there is no persistent connection to release."""
        logger.info("HTTP API close called (no-op)")


def _extract_jwt(token):
    """Return *token*, cut down to the embedded JWT when it carries extra characters.

    A token that does not start with ``eyJ`` but contains it is reduced to the
    substring from ``eyJ`` up to the first quote, space or line break. Format
    problems are logged only; nothing is raised.
    """
    if not token.startswith("eyJ"):
        logger.warning("Auth token does not appear to be in JWT format (should start with 'eyJ')")
        start_idx = token.find("eyJ")
        if start_idx != -1:
            end_idx = len(token)
            for marker in ('"', "'", " ", "\n", "\r"):
                marker_idx = token.find(marker, start_idx)
                if start_idx < marker_idx < end_idx:
                    end_idx = marker_idx
            token = token[start_idx:end_idx]
            logger.info(f"Extracted JWT token: {token[:10]}...")

    # A JWT has three dot-separated parts: header.payload.signature
    part_count = len(token.split("."))
    if part_count != 3:
        logger.warning(
            "Auth token does not have the expected JWT format "
            f"(3 parts separated by dots). Found {part_count} parts."
        )
    else:
        logger.info("Auth token has the expected JWT format")
    return token


def _to_https_url(db_url):
    """Return *db_url* with a leading ``libsql://`` scheme replaced by ``https://``."""
    scheme = "libsql://"
    if db_url.startswith(scheme):
        return "https://" + db_url[len(scheme):]
    return db_url


def _smoke_test_connection(conn, test_value, label=""):
    """Create, write, read and clean up the scratch table through *conn*.

    Args:
        conn: connection exposing ``execute`` and ``commit``.
        test_value: marker text written to (and afterwards removed from) the table.
        label: text inserted into log messages to name the backend.

    Returns:
        The cursor returned by the INSERT statement.
    """
    logger.info(f"Testing {label}connection with CREATE TABLE IF NOT EXISTS")
    conn.execute(_CONNECTION_TEST_TABLE_SQL)
    conn.commit()

    logger.info(f"Testing {label}connection with INSERT")
    insert_cursor = conn.execute(
        "INSERT INTO connection_test (test_value) VALUES (?)", (test_value,)
    )
    conn.commit()

    logger.info(f"Testing {label}connection with SELECT from test table")
    rows = conn.execute("SELECT * FROM connection_test").fetchall()
    if label:
        logger.info(f"{label}test table contents: {rows}")
    else:
        logger.info(f"Test table contents: {rows}")

    # Remove the probe rows so the scratch table (and the log line above) does
    # not grow by one row on every server start.
    conn.execute("DELETE FROM connection_test WHERE test_value = ?", (test_value,))
    conn.commit()
    return insert_cursor


def _connect_libsql_experimental(db_url, auth_token):
    """Try to connect with libsql-experimental; return True on success.

    Two styles are attempted in order: the ``auth_token`` keyword argument,
    then the token appended to the URL as ``authToken``. On success the
    connection and the method that worked are stored on ``app``.
    """
    logger.info("Connecting with libsql-experimental")
    if hasattr(libsql, '__version__'):
        logger.info(f"libsql-experimental version: {libsql.__version__}")

    # Method 1a: auth_token parameter (works with version 0.0.49)
    try:
        logger.info("Trying connection with auth_token parameter")
        app.db_conn = libsql.connect(db_url, auth_token=auth_token)
        logger.info("Testing connection with SELECT 1")
        result = app.db_conn.execute("SELECT 1").fetchone()
        logger.info(f"Connection successful with auth_token parameter: {result}")
        _smoke_test_connection(app.db_conn, "test_value")
        app.db_type = "libsql-experimental"
        app.last_successful_connection_method = "auth_token"
        logger.info("All connection tests passed successfully")
        return True
    except Exception as e:
        logger.warning(f"Connection with auth_token parameter failed: {str(e)}")

    # Method 1b: auth token in URL (works with other versions)
    try:
        logger.info("Trying connection with auth token in URL")
        separator = "&" if "?" in db_url else "?"
        connection_url = f"{db_url}{separator}authToken={auth_token}"
        # Never write the token itself to the logs.
        logger.info(f"Using connection URL: {db_url}{separator}authToken=<redacted>")
        app.db_conn = libsql.connect(connection_url)
        logger.info("Testing connection with SELECT 1")
        result = app.db_conn.execute("SELECT 1").fetchone()
        logger.info(f"Connection successful with auth token in URL: {result}")
        _smoke_test_connection(app.db_conn, "test_value_url")
        app.last_successful_connection_url = connection_url
        app.db_type = "libsql-experimental"
        app.last_successful_connection_method = "url"
        logger.info("All connection tests passed successfully")
        return True
    except Exception as e:
        logger.error(f"Connection with auth token in URL failed: {str(e)}")
    return False


def _connect_libsql_client(db_url, auth_token):
    """Try to connect with libsql-client; return True on success."""
    try:
        logger.info("Connecting with libsql-client")
        http_url = _to_https_url(db_url)
        logger.info(f"Using URL: {http_url}")
        app.db_conn = libsql_client.create_client_sync(
            url=http_url,
            auth_token=auth_token
        )
        result_set = app.db_conn.execute("SELECT 1")
        # Accept `rows` as either an attribute or a method, so the probe does
        # not fail merely because of how the result exposes its rows.
        rows = result_set.rows
        if callable(rows):
            rows = rows()
        logger.info(f"Connection test successful: {rows}")
        app.db_type = "libsql-client"
        return True
    except Exception as e:
        logger.error(f"libsql-client connection failed: {str(e)}")
        return False


def _connect_http_api(db_url, auth_token):
    """Try to connect through the plain HTTP API; return True on success."""
    try:
        logger.info("Falling back to HTTP API method")
        # No trailing slash: the client appends "/v2/pipeline" itself.
        http_url = _to_https_url(db_url).rstrip('/')
        if not http_url.startswith("https://"):
            logger.warning(f"HTTP URL does not start with https://: {http_url}")
            if "://" not in http_url:
                http_url = f"https://{http_url}"
                logger.info(f"Added https:// prefix to URL: {http_url}")
        logger.info(f"Using HTTP URL: {http_url}")

        app.db_conn = TursoHttpClient(http_url, auth_token)
        logger.info("Testing HTTP API connection with SELECT 1")
        result = app.db_conn.execute("SELECT 1").fetchone()
        logger.info(f"HTTP API connection test successful: {result}")
        insert_cursor = _smoke_test_connection(
            app.db_conn, "test_value_http", label="HTTP API "
        )
        logger.info(f"HTTP API insert test - lastrowid: {insert_cursor.lastrowid}")
        app.db_type = "http-api"
        logger.info("All HTTP API connection tests passed successfully")
        return True
    except Exception as e:
        logger.error(f"HTTP API connection failed: {str(e)}")
        return False


def _execute_with_retry(sql, db_url, auth_token, max_retries=3):
    """Execute and commit *sql*, retrying up to *max_retries* times.

    Before each retry the connection is probed with ``SELECT 1``; if the probe
    fails and the backend is libsql-experimental, a new connection is opened
    with the method that succeeded at startup. The last failure is re-raised.
    """
    import time

    for attempt in range(max_retries):
        try:
            if attempt > 0:
                logger.info(f"Retry attempt {attempt+1} for SQL execution")
                try:
                    test_result = app.db_conn.execute("SELECT 1").fetchone()
                    logger.info(f"Connection test successful: {test_result}")
                except Exception as conn_err:
                    logger.warning(f"Connection test failed, reconnecting: {str(conn_err)}")
                    if app.db_type == "libsql-experimental":
                        if getattr(app, 'last_successful_connection_method', None) == 'auth_token':
                            logger.info("Reconnecting with auth_token parameter")
                            app.db_conn = libsql.connect(db_url, auth_token=auth_token)
                        elif hasattr(app, 'last_successful_connection_url'):
                            # The stored URL embeds the auth token, so it is not logged.
                            logger.info("Reconnecting with the previously successful connection URL")
                            app.db_conn = libsql.connect(app.last_successful_connection_url)
            app.db_conn.execute(sql)
            app.db_conn.commit()
            return True
        except Exception as e:
            logger.warning(f"SQL execution failed (attempt {attempt+1}): {str(e)}")
            if attempt == max_retries - 1:
                raise
            # Small delay before retry
            time.sleep(1)
    return False


def _create_tables(db_url, auth_token):
    """Create the application tables, isolating failures per table.

    A table that cannot be created is logged and skipped so the remaining
    tables are still attempted and the server can start against whatever
    tables already exist.
    """
    logger.info("Creating database tables")
    all_created = True
    for label, ddl in _SCHEMA_TABLES:
        try:
            _execute_with_retry(ddl, db_url, auth_token)
            logger.info(f"{label} table created successfully")
        except Exception as e:
            all_created = False
            logger.error(f"Failed to create database tables: {str(e)}")
    if all_created:
        logger.info("All database tables created successfully")
    else:
        logger.warning("Continuing server startup despite table creation errors")


async def startup_db_client():
    """Open the database connection and make sure the application tables exist.

    Reads TURSO_DATABASE_URL and TURSO_AUTH_TOKEN from the environment, then
    tries the available backends in order (libsql-experimental, libsql-client,
    HTTP API) until one passes a create/insert/select smoke test. The working
    connection is stored on ``app.db_conn`` and its kind on ``app.db_type``.

    NOTE(review): no startup-hook registration is visible in this file
    excerpt - confirm how this coroutine gets invoked.

    Raises:
        RuntimeError: if the credentials are missing or every connection
            method fails.
    """
    db_url = os.getenv("TURSO_DATABASE_URL")
    auth_token = os.getenv("TURSO_AUTH_TOKEN")
    if not db_url or not auth_token:
        error_msg = "Missing Turso credentials. TURSO_DATABASE_URL and TURSO_AUTH_TOKEN must be set."
        logger.error(error_msg)
        raise RuntimeError(error_msg)

    # Strip surrounding whitespace, then log only a short prefix of the token.
    clean_auth_token = auth_token.strip()
    token_preview = clean_auth_token[:10] + "..." if len(clean_auth_token) > 10 else "***"
    logger.info(f"Connecting to database at URL: {db_url}")
    logger.info(f"Using libsql type: {LIBSQL_TYPE}")
    logger.info(f"Auth token preview: {token_preview}")
    clean_auth_token = _extract_jwt(clean_auth_token)

    connected = False
    # Method 1: libsql-experimental
    if HAS_LIBSQL and LIBSQL_TYPE == "experimental":
        connected = _connect_libsql_experimental(db_url, clean_auth_token)
    # Method 2: libsql-client
    if not connected and HAS_LIBSQL and LIBSQL_TYPE == "client":
        connected = _connect_libsql_client(db_url, clean_auth_token)
    # Method 3: HTTP API fallback
    if not connected:
        connected = _connect_http_api(db_url, clean_auth_token)

    if not connected:
        error_msg = "All database connection methods failed. Please check your credentials and try again."
        logger.error(error_msg)
        raise RuntimeError(error_msg)

    _create_tables(db_url, clean_auth_token)
async def shutdown_db_client():
    """Release the database connection recorded on ``app``, if there is one.

    Only the "libsql-experimental" backend is actually closed; for
    "libsql-client" and "http-api" a log line is written instead. Errors are
    logged and never propagated.
    """
    logger.info("Shutting down database connection")
    try:
        if not hasattr(app, 'db_type'):
            logger.warning("No database connection to close")
            return

        backend = app.db_type
        if backend == "libsql-experimental":
            try:
                app.db_conn.close()
                logger.info("libsql-experimental connection closed successfully")
            except Exception as close_error:
                logger.warning(f"Error closing libsql-experimental connection: {str(close_error)}")
            return

        # Backends for which nothing is closed here.
        # NOTE(review): the original comment says libsql-client has no close
        # method - confirm against the installed client version.
        nothing_to_close = {
            "libsql-client": "No close method needed for libsql-client",
            "http-api": "No close method needed for HTTP API",
        }
        if backend in nothing_to_close:
            logger.info(nothing_to_close[backend])
        else:
            logger.warning(f"Unknown database type: {backend}")
    except Exception as e:
        logger.error(f"Error closing database connection: {str(e)}")
# Custom API documentation routes
async def custom_swagger_ui_html():
    """Return the Swagger UI HTML page for this app's OpenAPI schema."""
    # NOTE(review): the "[email protected]" segment in the two asset URLs looks like
    # a mangled "<package>@<version>" specifier - confirm the intended URLs.
    page_options = {
        "openapi_url": app.openapi_url,
        "title": f"{app.title} - Swagger UI",
        "oauth2_redirect_url": app.swagger_ui_oauth2_redirect_url,
        "swagger_js_url": "https://cdn.jsdelivr.net/npm/[email protected]/swagger-ui-bundle.js",
        "swagger_css_url": "https://cdn.jsdelivr.net/npm/[email protected]/swagger-ui.css",
        "swagger_favicon_url": "https://fastapi.tiangolo.com/img/favicon.png",
    }
    return get_swagger_ui_html(**page_options)
async def redoc_html():
    """Return the ReDoc HTML page for this app's OpenAPI schema."""
    page_options = {
        "openapi_url": app.openapi_url,
        "title": f"{app.title} - ReDoc",
        "redoc_js_url": "https://cdn.jsdelivr.net/npm/redoc@next/bundles/redoc.standalone.js",
        "redoc_favicon_url": "https://fastapi.tiangolo.com/img/favicon.png",
    }
    return get_redoc_html(**page_options)
# Serve Swagger UI at root path
async def root_swagger():
    """Log the access and return the Swagger UI HTML page titled "API Documentation"."""
    logger.info("Root endpoint accessed - serving Swagger UI")
    # NOTE(review): the "[email protected]" segment in the two asset URLs looks like
    # a mangled "<package>@<version>" specifier - confirm the intended URLs.
    page_options = {
        "openapi_url": app.openapi_url,
        "title": f"{app.title} - API Documentation",
        "oauth2_redirect_url": app.swagger_ui_oauth2_redirect_url,
        "swagger_js_url": "https://cdn.jsdelivr.net/npm/[email protected]/swagger-ui-bundle.js",
        "swagger_css_url": "https://cdn.jsdelivr.net/npm/[email protected]/swagger-ui.css",
        "swagger_favicon_url": "https://fastapi.tiangolo.com/img/favicon.png",
    }
    return get_swagger_ui_html(**page_options)
# API information endpoint (moved from root)
async def api_info():
    """
    Describe the Seamo Auth Server API.

    Returns:
        dict: A welcome message, the API version, links to the documentation
        pages and the base paths of the main endpoint groups.
    """
    logger.info("API info endpoint accessed")

    documentation_links = {
        "swagger_ui": "/",
        "redoc": "/redoc",
        "openapi_json": "/openapi.json",
    }
    endpoint_paths = {
        "health": "/health",
        "auth": "/api/auth",
        "projects": "/api/projects",
        "journals": "/api/journals",
    }
    return {
        "message": "Welcome to Seamo Auth Server API",
        "version": "1.0.0",
        "documentation": documentation_links,
        "endpoints": endpoint_paths,
    }
# Health check endpoint
async def health_check():
    """
    Report that the server is up, together with basic database status.

    Returns:
        dict: ``status`` and ``version`` constants, whether ``app`` has a
        ``db_conn`` attribute, and the recorded ``db_type`` ("unknown" if unset).
    """
    logger.info("Health check endpoint accessed")

    report = {"status": "healthy", "version": "1.0.0"}
    # Only checks that a connection object was stored; no query is issued.
    report["database_connected"] = hasattr(app, "db_conn")
    report["database_type"] = getattr(app, "db_type", "unknown")
    return report
# Database test endpoint
async def test_database():
    """
    Run a fixed series of diagnostic statements against the stored connection.

    For debugging only. The steps are: a trivial SELECT, creating
    ``test_table``, inserting a row, reading it back, inspecting the ``users``
    table structure and listing all users.

    Returns:
        dict: Connection details plus an ``operations`` list with one entry
        per step (``name``, ``success`` and step-specific fields). Failures
        are reported in that list rather than raised.
    """
    import time

    # Identifier used to correlate the log lines of this run.
    test_id = f"test_{int(time.time())}"
    tag = f"[{test_id}]"
    logger.info(f"{tag} Starting database test")

    has_connection = hasattr(app, "db_conn")
    results = {
        "connection_type": getattr(app, "db_type", "unknown"),
        "connection_object_type": type(app.db_conn).__name__ if has_connection else "None",
        "operations": [],
    }
    if hasattr(app, "last_successful_connection_method"):
        results["connection_method"] = app.last_successful_connection_method

    def record(name, success, **details):
        """Append one step outcome to the report."""
        results["operations"].append({"name": name, "success": success, **details})

    if not has_connection:
        logger.error(f"{tag} Database connection not available")
        record("Database connection check", False, error="Database connection not available")
        return results

    try:
        # Test 1: Simple SELECT
        logger.info(f"{tag} Test 1: Simple SELECT")
        first_row = app.db_conn.execute("SELECT 1 as test").fetchone()
        record(
            "Simple SELECT",
            first_row is not None,
            result=str(first_row) if first_row is not None else None,
        )
        logger.info(f"{tag} Test 1 result: {first_row}")

        # Test 2: Create a temporary table
        logger.info(f"{tag} Test 2: Create a temporary table")
        app.db_conn.execute(
            """
            CREATE TABLE IF NOT EXISTS test_table (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
            """
        )
        app.db_conn.commit()
        record("Create temporary table", True)
        logger.info(f"{tag} Test 2 completed successfully")

        # Test 3: Insert into the temporary table
        logger.info(f"{tag} Test 3: Insert into the temporary table")
        test_name = f"test_user_{int(time.time())}"
        insert_cursor = app.db_conn.execute(
            "INSERT INTO test_table (name) VALUES (?)", (test_name,)
        )
        app.db_conn.commit()

        # Prefer the cursor's lastrowid; fall back to looking the row up.
        last_id = None
        try:
            last_id = insert_cursor.lastrowid
            logger.info(f"{tag} Got lastrowid: {last_id}")
        except Exception as rowid_error:
            logger.warning(f"{tag} Could not get lastrowid: {str(rowid_error)}")
            try:
                lookup_sql = "SELECT id FROM test_table WHERE name = ? ORDER BY id DESC LIMIT 1"
                id_row = app.db_conn.execute(lookup_sql, (test_name,)).fetchone()
                if id_row:
                    last_id = id_row[0]
                    logger.info(f"{tag} Got ID from query: {last_id}")
            except Exception as lookup_error:
                logger.error(f"{tag} Error getting ID from query: {str(lookup_error)}")
        record("Insert into temporary table", True, last_id=last_id)
        logger.info(f"{tag} Test 3 completed successfully. Last ID: {last_id}")

        # Test 4: Select from the temporary table
        logger.info(f"{tag} Test 4: Select from the temporary table")
        inserted_row = app.db_conn.execute(
            "SELECT * FROM test_table WHERE name = ?", (test_name,)
        ).fetchone()
        record(
            "Select from temporary table",
            inserted_row is not None,
            result=str(inserted_row) if inserted_row is not None else None,
        )
        logger.info(f"{tag} Test 4 result: {inserted_row}")

        # Test 5: Check if users table exists and has the expected structure
        logger.info(f"{tag} Test 5: Check users table structure")
        try:
            table_info = app.db_conn.execute("PRAGMA table_info(users)").fetchall()
            record(
                "Check users table structure",
                len(table_info) > 0,
                columns=[column[1] for column in table_info] if table_info else [],
            )
            logger.info(f"{tag} Test 5 result: {table_info}")
        except Exception as structure_error:
            logger.error(f"{tag} Error checking users table structure: {str(structure_error)}")
            record("Check users table structure", False, error=str(structure_error))

        # Test 6: List all users
        logger.info(f"{tag} Test 6: List all users")
        try:
            all_users = app.db_conn.execute("SELECT id, email FROM users").fetchall()
            record(
                "List all users",
                True,
                count=len(all_users) if all_users else 0,
                users=[{"id": row[0], "email": row[1]} for row in all_users] if all_users else [],
            )
            logger.info(f"{tag} Test 6 result: {all_users}")
        except Exception as listing_error:
            logger.error(f"{tag} Error listing all users: {str(listing_error)}")
            record("List all users", False, error=str(listing_error))

        logger.info(f"{tag} Database test completed successfully")
        return results
    except Exception as e:
        logger.error(f"{tag} Database test failed: {str(e)}")
        record("Error during tests", False, error=str(e))
        return results
# Test user creation endpoint (direct SQL)
async def create_test_user():
    """
    Insert a fixed test user directly with SQL, unless it already exists.

    For testing only. The password hash is a hard-coded constant so that no
    hashing library is needed here.

    Returns:
        dict: ``success`` plus either the user's id, email and status
        ("already_exists" or "created"), or an ``error`` message.
    """
    import time

    # Identifier used to correlate the log lines of this run.
    test_id = f"test_{int(time.time())}"
    tag = f"[{test_id}]"
    logger.info(f"{tag} Starting test user creation (direct SQL)")

    # NOTE(review): this literal looks like an e-mail obfuscation placeholder
    # rather than a real address - confirm the intended test e-mail.
    test_email = "[email protected]"

    if not hasattr(app, "db_conn"):
        logger.error(f"{tag} Database connection not available")
        return {"success": False, "error": "Database connection not available"}

    try:
        # Nothing to do if the user is already present.
        logger.info(f"{tag} Checking if test user already exists")
        existing_user = app.db_conn.execute(
            "SELECT id FROM users WHERE email = ?", (test_email,)
        ).fetchone()
        if existing_user:
            logger.info(f"{tag} Test user already exists with ID: {existing_user[0]}")
            return {
                "success": True,
                "user_id": existing_user[0],
                "email": test_email,
                "status": "already_exists",
            }

        # Use a pre-hashed password to avoid dependency on passlib
        logger.info(f"{tag} Using pre-hashed password for test user")
        # This is a pre-hashed version of "TestPassword123!" using Argon2
        hashed_password = "$argon2id$v=19$m=65536,t=3,p=4$NElQRUZCWDRZSHpIWWRGSA$TYU8R7EfXGgEu9FWZGMX9AVwmMwpSKECCZMXgbzr6JE"

        logger.info(f"{tag} Inserting test user with email: {test_email}")
        insert_cursor = app.db_conn.execute(
            "INSERT INTO users (email, hashed_password) VALUES (?, ?)",
            (test_email, hashed_password),
        )
        app.db_conn.commit()
        logger.info(f"{tag} Committed test user insert")

        # lastrowid is only logged; the id that is returned comes from the
        # verification query below.
        user_id = None
        try:
            user_id = insert_cursor.lastrowid
            logger.info(f"{tag} Got test user lastrowid: {user_id}")
        except Exception as rowid_error:
            logger.warning(f"{tag} Could not get test user lastrowid: {str(rowid_error)}")

        # Read the row back to confirm the insert.
        logger.info(f"{tag} Verifying test user was created")
        stored_user = app.db_conn.execute(
            "SELECT id, email, created_at FROM users WHERE email = ?", (test_email,)
        ).fetchone()
        if not stored_user:
            logger.error(f"{tag} Failed to verify test user after insert")
            return {"success": False, "error": "Failed to verify user after insert"}

        user_id = stored_user[0]
        logger.info(f"{tag} Verified test user with ID: {user_id}")
        return {
            "success": True,
            "user_id": user_id,
            "email": test_email,
            "created_at": stored_user[2] if len(stored_user) > 2 else None,
            "status": "created",
        }
    except Exception as e:
        logger.error(f"{tag} Error creating test user: {str(e)}")
        return {"success": False, "error": str(e)}
# Import and include routers
# NOTE(review): this import sits after `app` is created - presumably so the
# route modules can be imported without a circular dependency; confirm.
from app.api.routes import auth_router, projects_router, journals_router

# (router module, URL prefix, documentation tag) for each feature area.
for _router_module, _url_prefix, _doc_tag in (
    (auth_router, "/api/auth", "Authentication"),
    (projects_router, "/api/projects", "Projects"),
    (journals_router, "/api/journals", "Journals"),
):
    app.include_router(_router_module.router, prefix=_url_prefix, tags=[_doc_tag])
if __name__ == "__main__":
    # Listen on PORT when it is set in the environment, otherwise on 7860.
    port = int(os.environ.get("PORT", 7860))
    logger.info(f"Starting server on port {port}")
    # Bind to all interfaces; "main:app" is the import string handed to uvicorn.
    uvicorn.run("main:app", host="0.0.0.0", port=port)