import os | |
import sys | |
import logging | |
import requests | |
import time | |
from fastapi import FastAPI, HTTPException, Depends, Request | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html | |
from fastapi.openapi.utils import get_openapi | |
from fastapi.staticfiles import StaticFiles | |
import uvicorn | |
from dotenv import load_dotenv | |
from passlib.context import CryptContext | |
from pydantic import BaseModel, EmailStr | |
from typing import Dict, Any, List, Optional | |
# Import the HTTP API database utility | |
from app.utils import db_http | |
# Configure logging | |
logging.basicConfig( | |
level=logging.WARNING, # Changed from INFO to WARNING to reduce verbosity | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[logging.StreamHandler(sys.stdout)] | |
) | |
logger = logging.getLogger("auth-server") | |
# Set specific loggers to different levels as needed | |
logging.getLogger("uvicorn").setLevel(logging.WARNING) | |
logging.getLogger("fastapi").setLevel(logging.WARNING) | |
# Import database libraries | |
try: | |
# First try the recommended libsql-experimental package | |
import libsql_experimental as libsql | |
logger.info("Successfully imported libsql-experimental package") | |
HAS_LIBSQL = True | |
LIBSQL_TYPE = "experimental" | |
except ImportError: | |
try: | |
# Then try the libsql-client package as fallback | |
import libsql_client | |
logger.info("Successfully imported libsql-client package") | |
HAS_LIBSQL = True | |
LIBSQL_TYPE = "client" | |
except ImportError: | |
logger.error("Failed to import any libsql package. Please install libsql-experimental==0.0.49") | |
logger.error("Falling back to HTTP API method for database access") | |
        # requests (already imported at the top of the module) is used for the HTTP API fallback
HAS_LIBSQL = False | |
LIBSQL_TYPE = "http" | |
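# Flags set by the import block above:
#   HAS_LIBSQL=True,  LIBSQL_TYPE="experimental" -> libsql_experimental is available
#   HAS_LIBSQL=True,  LIBSQL_TYPE="client"       -> libsql_client is available
#   HAS_LIBSQL=False, LIBSQL_TYPE="http"         -> no driver installed; fall back to the Turso HTTP API via requests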
# Load environment variables | |
load_dotenv() | |
# Create FastAPI app with detailed metadata | |
app = FastAPI( | |
title="Seamo Auth Server", | |
description=""" | |
# Seamo Authentication API | |
The Seamo Auth Server provides authentication and user management services for the Seamo platform. | |
## Features | |
* User registration and authentication | |
* Project management | |
* Journal management | |
* Access control | |
## Authentication | |
Most endpoints require authentication. Use the /api/auth/token endpoint to obtain access tokens. | |
""", | |
version="1.0.0", | |
contact={ | |
"name": "Seamo Team", | |
"url": "https://seamo.earth/contact", | |
"email": "[email protected]", | |
}, | |
license_info={ | |
"name": "MIT", | |
"url": "https://opensource.org/licenses/MIT", | |
}, | |
openapi_tags=[ | |
{ | |
"name": "Authentication", | |
"description": "Operations related to user authentication and token management", | |
}, | |
{ | |
"name": "Projects", | |
"description": "Operations related to project management", | |
}, | |
{ | |
"name": "Journals", | |
"description": "Operations related to journal management", | |
}, | |
{ | |
"name": "General", | |
"description": "General server information and health checks", | |
}, | |
], | |
docs_url=None, # Disable default docs | |
redoc_url=None # Disable default redoc | |
) | |
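# Note: the default /docs and /redoc routes are disabled above; custom Swagger UI and ReDoc
# handlers (custom_swagger_ui_html, redoc_html, root_swagger) are defined later in this module.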
# Configure CORS | |
origins = [ | |
os.getenv("FRONTEND_URL", "http://localhost:3000"), | |
"https://seamo.earth", | |
"https://seamoo.netlify.app", | |
"https://seamo-ai-ai-server.hf.space", | |
"https://seamo-ai-auth-server.hf.space", | |
"https://seamo-ai-scraper-server.hf.space", | |
"http://localhost:3000", # For local development | |
"http://localhost:8000", # Local AI server | |
"http://localhost:8001", # Local Auth server | |
"http://localhost:8002", # Local Scraper server | |
] | |
print(f"CORS origins configured: {origins}") | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=origins, | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
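# CORS is limited to the explicit origins listed above; credentials, all methods and all
# headers are allowed for those origins.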
# Database connection | |
async def startup_db_client(): | |
# Get environment variables | |
db_url = os.getenv("TURSO_DATABASE_URL") | |
auth_token = os.getenv("TURSO_AUTH_TOKEN") | |
if not db_url or not auth_token: | |
error_msg = "Missing Turso credentials. TURSO_DATABASE_URL and TURSO_AUTH_TOKEN must be set." | |
logger.error(error_msg) | |
raise Exception(error_msg) | |
    # Strip leading/trailing whitespace from the auth token
    clean_auth_token = auth_token.strip()
# Log connection details (without showing the full token) | |
token_preview = clean_auth_token[:10] + "..." if len(clean_auth_token) > 10 else "***" | |
logger.info(f"Connecting to database at URL: {db_url}") | |
logger.info(f"Using libsql type: {LIBSQL_TYPE}") | |
logger.info(f"Auth token preview: {token_preview}") | |
# Ensure the token is properly formatted | |
if not clean_auth_token.startswith("eyJ"): | |
logger.warning("Auth token does not appear to be in JWT format (should start with 'eyJ')") | |
# Try to extract the token if it's wrapped in quotes or has extra characters | |
if "eyJ" in clean_auth_token: | |
start_idx = clean_auth_token.find("eyJ") | |
# Find the end of the token (usually at a quote, space, or newline) | |
end_markers = ['"', "'", ' ', '\n', '\r'] | |
end_idx = len(clean_auth_token) | |
for marker in end_markers: | |
marker_idx = clean_auth_token.find(marker, start_idx) | |
if marker_idx > start_idx and marker_idx < end_idx: | |
end_idx = marker_idx | |
clean_auth_token = clean_auth_token[start_idx:end_idx] | |
logger.info(f"Extracted JWT token: {clean_auth_token[:10]}...") | |
# Verify the token has three parts (header.payload.signature) | |
parts = clean_auth_token.split('.') | |
if len(parts) != 3: | |
logger.warning(f"Auth token does not have the expected JWT format (3 parts separated by dots). Found {len(parts)} parts.") | |
else: | |
logger.info("Auth token has the expected JWT format") | |
# Function to create a super user if it doesn't exist using HTTP API | |
def create_super_user_http(): | |
""" | |
Create a super user with admin privileges if it doesn't exist using the HTTP API. | |
Uses the email [email protected] as specified. | |
""" | |
# Generate a unique identifier for this operation | |
operation_id = f"create_super_user_http_{int(time.time())}" | |
logger.info(f"[{operation_id}] Starting super user creation using HTTP API") | |
# Super user details | |
super_user_email = "[email protected]" | |
super_user_password = "TestPassword123!" # This is just a default password | |
try: | |
# Create password hashing context | |
pwd_context = CryptContext( | |
schemes=["argon2"], | |
argon2__time_cost=4, | |
argon2__memory_cost=102400, | |
argon2__parallelism=8, | |
argon2__salt_len=16 | |
) | |
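            # argon2 parameters above: 4 iterations (time_cost), 102400 KiB (~100 MiB) of memory,
            # 8 parallel lanes, and a 16-byte salt.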
# Hash the password | |
hashed_password = pwd_context.hash(super_user_password) | |
logger.info(f"[{operation_id}] Password hashed successfully") | |
# First, make sure the users table exists | |
logger.info(f"[{operation_id}] Creating users table if it doesn't exist") | |
db_http.create_table_if_not_exists( | |
"users", | |
""" | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
email TEXT NOT NULL UNIQUE, | |
hashed_password TEXT NOT NULL, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
last_login DATETIME, | |
is_admin INTEGER DEFAULT 0 | |
""", | |
operation_id=f"{operation_id}_create_table" | |
) | |
# Check if the super user already exists | |
logger.info(f"[{operation_id}] Checking if super user already exists") | |
users = db_http.select_records( | |
"users", | |
condition="email = ?", | |
condition_params=[{"type": "text", "value": super_user_email}], | |
limit=1, | |
operation_id=f"{operation_id}_check" | |
) | |
if users and len(users) > 0: | |
# User exists, check if they're an admin | |
user = users[0] | |
user_id = user.get("id") | |
is_admin = bool(user.get("is_admin", 0)) | |
logger.info(f"[{operation_id}] Super user already exists with ID: {user_id}, is_admin: {is_admin}") | |
# If user exists but is not admin, make them admin | |
if not is_admin: | |
logger.info(f"[{operation_id}] Updating user to have admin privileges") | |
update_result = db_http.update_record( | |
"users", | |
{"is_admin": 1}, | |
"id = ?", | |
[{"type": "integer", "value": str(user_id)}], | |
operation_id=f"{operation_id}_update" | |
) | |
if update_result: | |
logger.info(f"[{operation_id}] User updated to admin successfully") | |
else: | |
logger.warning(f"[{operation_id}] Failed to update user to admin") | |
return | |
# User doesn't exist, create it | |
logger.info(f"[{operation_id}] Super user does not exist, creating now") | |
# Insert the super user | |
user_data = { | |
"email": super_user_email, | |
"hashed_password": hashed_password, | |
"is_admin": 1 | |
} | |
user_id = db_http.insert_record("users", user_data, operation_id=f"{operation_id}_insert") | |
if user_id: | |
logger.info(f"[{operation_id}] Super user created successfully with ID: {user_id}") | |
# Verify the user was created | |
new_user = db_http.get_record_by_id("users", user_id, operation_id=f"{operation_id}_verify") | |
if new_user: | |
logger.info(f"[{operation_id}] Super user verified with ID: {user_id}") | |
return | |
else: | |
logger.warning(f"[{operation_id}] Super user not found by ID after creation") | |
# Try to find by email | |
users = db_http.select_records( | |
"users", | |
condition="email = ?", | |
condition_params=[{"type": "text", "value": super_user_email}], | |
limit=1, | |
operation_id=f"{operation_id}_verify_email" | |
) | |
if users and len(users) > 0: | |
logger.info(f"[{operation_id}] Super user found by email after creation") | |
return | |
# If we get here, we couldn't verify the user was created | |
# List all users for diagnostic purposes | |
all_users = db_http.select_records("users", operation_id=f"{operation_id}_list_all") | |
logger.warning(f"[{operation_id}] Super user was not found after creation. All users: {all_users}") | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error creating super user with HTTP API: {str(e)}") | |
# We'll continue even if super user creation fails | |
# This allows the server to start and function without a super user | |
logger.warning(f"[{operation_id}] Continuing server startup despite super user creation error") | |
# Function to create a super user if it doesn't exist (original method - kept for reference) | |
def create_super_user(): | |
""" | |
Create a super user with admin privileges if it doesn't exist. | |
Uses the email [email protected] as specified. | |
Uses the approach from turso_interactive_crud.py for reliable operation. | |
Note: This function is kept for reference but is not used anymore. | |
The create_super_user_http function is used instead. | |
""" | |
# This function is kept for reference but is not used anymore | |
pass | |
# Function to create database tables using HTTP API | |
def create_tables_http(): | |
""" | |
Create database tables if they don't exist using the HTTP API. | |
""" | |
operation_id = f"create_tables_http_{int(time.time())}" | |
logger.info(f"[{operation_id}] Creating database tables using HTTP API") | |
try: | |
# Import the HTTP API utility | |
from app.utils import db_http | |
# Create users table | |
logger.info(f"[{operation_id}] Creating users table") | |
db_http.create_table_if_not_exists( | |
"users", | |
""" | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
email TEXT NOT NULL UNIQUE, | |
hashed_password TEXT NOT NULL, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
last_login DATETIME, | |
            is_admin INTEGER DEFAULT 0
""", | |
operation_id=f"{operation_id}_users" | |
) | |
# Create projects table | |
logger.info(f"[{operation_id}] Creating projects table") | |
db_http.create_table_if_not_exists( | |
"projects", | |
""" | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
owner_id INTEGER NOT NULL, | |
title TEXT NOT NULL, | |
description TEXT, | |
geojson TEXT, | |
storage_bucket TEXT, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY (owner_id) REFERENCES users (id) | |
""", | |
operation_id=f"{operation_id}_projects" | |
) | |
# Create journals table | |
logger.info(f"[{operation_id}] Creating journals table") | |
db_http.create_table_if_not_exists( | |
"journals", | |
""" | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
project_id INTEGER NOT NULL, | |
title TEXT NOT NULL, | |
content TEXT, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY (project_id) REFERENCES projects (id) | |
""", | |
operation_id=f"{operation_id}_journals" | |
) | |
logger.info(f"[{operation_id}] All database tables created successfully") | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error creating database tables: {str(e)}") | |
# Initialize database connection | |
connected = False | |
# Method 1: Try with libsql-experimental | |
if HAS_LIBSQL and LIBSQL_TYPE == "experimental": | |
try: | |
logger.info("Connecting with libsql-experimental") | |
# Try multiple connection methods | |
# Method 1a: Try with auth_token parameter (works with version 0.0.49) | |
try: | |
logger.info("Trying connection with auth_token parameter") | |
# Log the libsql version | |
if hasattr(libsql, '__version__'): | |
logger.info(f"libsql-experimental version: {libsql.__version__}") | |
# Create the connection | |
app.db_conn = libsql.connect(db_url, auth_token=clean_auth_token) | |
# Test connection with a simple query | |
logger.info("Testing connection with SELECT 1") | |
result = app.db_conn.execute("SELECT 1").fetchone() | |
logger.info(f"Connection successful with auth_token parameter: {result}") | |
# Test connection with a more complex query | |
logger.info("Testing connection with CREATE TABLE IF NOT EXISTS") | |
app.db_conn.execute(""" | |
CREATE TABLE IF NOT EXISTS connection_test ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
test_value TEXT | |
) | |
""") | |
app.db_conn.commit() | |
# Test insert | |
logger.info("Testing connection with INSERT") | |
app.db_conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", ("test_value",)) | |
app.db_conn.commit() | |
# Test select | |
logger.info("Testing connection with SELECT from test table") | |
result = app.db_conn.execute("SELECT * FROM connection_test").fetchall() | |
logger.info(f"Test table contents: {result}") | |
connected = True | |
app.db_type = "libsql-experimental" | |
app.last_successful_connection_method = "auth_token" | |
logger.info("All connection tests passed successfully") | |
except Exception as e: | |
logger.warning(f"Connection with auth_token parameter failed: {str(e)}") | |
# Method 1b: Try with auth token in URL (works with other versions) | |
if not connected: | |
try: | |
logger.info("Trying connection with auth token in URL") | |
# Format the URL to include the auth token | |
if "?" in db_url: | |
connection_url = f"{db_url}&authToken={clean_auth_token}" | |
else: | |
connection_url = f"{db_url}?authToken={clean_auth_token}" | |
logger.info(f"Using connection URL: {connection_url}") | |
# Use the direct URL connection method with auth token in URL | |
app.db_conn = libsql.connect(connection_url) | |
app.last_successful_connection_url = connection_url | |
# Test connection with a simple query | |
logger.info("Testing connection with SELECT 1") | |
result = app.db_conn.execute("SELECT 1").fetchone() | |
logger.info(f"Connection successful with auth token in URL: {result}") | |
# Test connection with a more complex query | |
logger.info("Testing connection with CREATE TABLE IF NOT EXISTS") | |
app.db_conn.execute(""" | |
CREATE TABLE IF NOT EXISTS connection_test ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
test_value TEXT | |
) | |
""") | |
app.db_conn.commit() | |
# Test insert | |
logger.info("Testing connection with INSERT") | |
app.db_conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", ("test_value_url",)) | |
app.db_conn.commit() | |
# Test select | |
logger.info("Testing connection with SELECT from test table") | |
result = app.db_conn.execute("SELECT * FROM connection_test").fetchall() | |
logger.info(f"Test table contents: {result}") | |
connected = True | |
app.db_type = "libsql-experimental" | |
app.last_successful_connection_method = "url" | |
logger.info("All connection tests passed successfully") | |
except Exception as e: | |
logger.error(f"Connection with auth token in URL failed: {str(e)}") | |
except Exception as e: | |
logger.error(f"All libsql-experimental connection methods failed: {str(e)}") | |
# Method 2: Try with libsql-client | |
if not connected and HAS_LIBSQL and LIBSQL_TYPE == "client": | |
try: | |
logger.info("Connecting with libsql-client") | |
# Convert URL from libsql:// to https:// | |
if db_url.startswith("libsql://"): | |
http_url = db_url.replace("libsql://", "https://") | |
else: | |
http_url = db_url | |
logger.info(f"Using URL: {http_url}") | |
# Connect using the client | |
app.db_conn = libsql_client.create_client_sync( | |
url=http_url, | |
auth_token=clean_auth_token | |
) | |
# Test connection | |
result = app.db_conn.execute("SELECT 1").rows() | |
logger.info(f"Connection test successful: {result}") | |
connected = True | |
app.db_type = "libsql-client" | |
except Exception as e: | |
logger.error(f"libsql-client connection failed: {str(e)}") | |
# Method 3: Fallback to HTTP API | |
if not connected: | |
try: | |
logger.info("Falling back to HTTP API method") | |
# Convert URL from libsql:// to https:// | |
if db_url.startswith("libsql://"): | |
http_url = db_url.replace("libsql://", "https://") | |
else: | |
http_url = db_url | |
# Ensure the URL doesn't have a trailing slash | |
http_url = http_url.rstrip('/') | |
# Verify the URL format | |
if not http_url.startswith("https://"): | |
logger.warning(f"HTTP URL does not start with https://: {http_url}") | |
# Try to fix the URL | |
if "://" not in http_url: | |
http_url = f"https://{http_url}" | |
logger.info(f"Added https:// prefix to URL: {http_url}") | |
logger.info(f"Using HTTP URL: {http_url}") | |
# Create a simple HTTP API client class | |
class TursoHttpClient: | |
def __init__(self, url, auth_token): | |
self.url = url | |
self.auth_token = auth_token | |
self.headers = { | |
"Authorization": f"Bearer {auth_token}", | |
"Content-Type": "application/json" | |
} | |
# Add a property to track the last inserted ID | |
self.last_insert_id = None | |
def execute(self, query, params=None): | |
# Format the request according to the v2/pipeline specification | |
requests_data = [] | |
# Prepare the statement | |
stmt = {"sql": query} | |
# Add parameters if provided | |
if params: | |
# Convert parameters to the expected format | |
args = [] | |
for param in params: | |
if param is None: | |
args.append({"type": "null", "value": None}) | |
elif isinstance(param, int): | |
args.append({"type": "integer", "value": str(param)}) | |
elif isinstance(param, float): | |
args.append({"type": "float", "value": str(param)}) | |
else: | |
args.append({"type": "text", "value": str(param)}) | |
stmt["args"] = args | |
requests_data.append({"type": "execute", "stmt": stmt}) | |
# If this is an INSERT, add a query to get the last inserted ID | |
is_insert = query.strip().upper().startswith("INSERT") | |
if is_insert: | |
requests_data.append({ | |
"type": "execute", | |
"stmt": {"sql": "SELECT last_insert_rowid()"} | |
}) | |
# Always close the connection at the end | |
requests_data.append({"type": "close"}) | |
# Prepare the final request payload | |
data = {"requests": requests_data} | |
# Use the v2/pipeline endpoint | |
pipeline_url = f"{self.url}/v2/pipeline" | |
logger.info(f"Sending request to: {pipeline_url}") | |
logger.info(f"Headers: Authorization: Bearer {self.auth_token[:5]}... (truncated)") | |
try: | |
response = requests.post(pipeline_url, headers=self.headers, json=data, timeout=10) | |
# Log response status | |
logger.info(f"Response status: {response.status_code}") | |
# Check for auth errors specifically | |
if response.status_code == 401: | |
logger.error(f"Authentication error (401): {response.text}") | |
raise Exception(f"Authentication failed: {response.text}") | |
# Raise for other errors | |
response.raise_for_status() | |
# Parse the response | |
result = response.json() | |
except requests.exceptions.RequestException as e: | |
logger.error(f"HTTP request failed: {str(e)}") | |
raise | |
# Process the response | |
if "results" in result and len(result["results"]) > 0: | |
# If this was an INSERT, get the last inserted ID | |
if is_insert and len(result["results"]) > 1: | |
try: | |
last_id_result = result["results"][1] | |
if "rows" in last_id_result and len(last_id_result["rows"]) > 0: | |
self.last_insert_id = last_id_result["rows"][0]["values"][0] | |
logger.info(f"Last inserted ID: {self.last_insert_id}") | |
except Exception as e: | |
logger.warning(f"Failed to get last inserted ID: {str(e)}") | |
# Return a cursor-like object with the main result | |
cursor = TursoHttpCursor(result["results"][0]) | |
cursor.lastrowid = self.last_insert_id | |
return cursor | |
# Return an empty cursor | |
cursor = TursoHttpCursor(None) | |
cursor.lastrowid = self.last_insert_id | |
return cursor | |
def commit(self): | |
# HTTP API is stateless, no need to commit | |
logger.info("HTTP API commit called (no-op)") | |
pass | |
def close(self): | |
# HTTP API is stateless, no need to close | |
logger.info("HTTP API close called (no-op)") | |
pass | |
# Create a cursor-like class for HTTP API | |
class TursoHttpCursor: | |
def __init__(self, result): | |
self.result = result | |
self.lastrowid = None | |
def fetchone(self): | |
if self.result and "rows" in self.result and len(self.result["rows"]) > 0: | |
return self.result["rows"][0]["values"] | |
return None | |
def fetchall(self): | |
if self.result and "rows" in self.result: | |
return [row["values"] for row in self.result["rows"]] | |
return [] | |
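            # Illustrative example (mirroring execute() above) of the v2/pipeline payload sent for a
            # parameterised INSERT; the table and value names are placeholders:
            #   {"requests": [
            #     {"type": "execute", "stmt": {"sql": "INSERT INTO t (v) VALUES (?)",
            #                                  "args": [{"type": "text", "value": "x"}]}},
            #     {"type": "execute", "stmt": {"sql": "SELECT last_insert_rowid()"}},
            #     {"type": "close"}
            #   ]}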
# Create the HTTP API client | |
app.db_conn = TursoHttpClient(http_url, clean_auth_token) | |
# Test connection with a simple query | |
logger.info("Testing HTTP API connection with SELECT 1") | |
result = app.db_conn.execute("SELECT 1").fetchone() | |
logger.info(f"HTTP API connection test successful: {result}") | |
# Test connection with a more complex query | |
logger.info("Testing HTTP API connection with CREATE TABLE IF NOT EXISTS") | |
app.db_conn.execute(""" | |
CREATE TABLE IF NOT EXISTS connection_test ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
test_value TEXT | |
) | |
""") | |
app.db_conn.commit() | |
# Test insert | |
logger.info("Testing HTTP API connection with INSERT") | |
cursor = app.db_conn.execute("INSERT INTO connection_test (test_value) VALUES (?)", ("test_value_http",)) | |
app.db_conn.commit() | |
logger.info(f"HTTP API insert test - lastrowid: {cursor.lastrowid}") | |
# Test select | |
logger.info("Testing HTTP API connection with SELECT from test table") | |
result = app.db_conn.execute("SELECT * FROM connection_test").fetchall() | |
logger.info(f"HTTP API test table contents: {result}") | |
connected = True | |
app.db_type = "http-api" | |
logger.info("All HTTP API connection tests passed successfully") | |
except Exception as e: | |
logger.error(f"HTTP API connection failed: {str(e)}") | |
if not connected: | |
error_msg = "All database connection methods failed. Please check your credentials and try again." | |
logger.error(error_msg) | |
raise Exception(error_msg) | |
    # Create tables if they don't exist
    # Table creation below runs through a retry helper; if it still fails, we fall back to
    # creating the tables via the HTTP API
    logger.info("Creating database tables")
# Function to execute SQL with retry | |
def execute_with_retry(sql, max_retries=3): | |
for attempt in range(max_retries): | |
try: | |
# For each attempt, we'll create a fresh connection if needed | |
if attempt > 0: | |
logger.info(f"Retry attempt {attempt+1} for SQL execution") | |
# Test the connection first | |
try: | |
test_result = app.db_conn.execute("SELECT 1").fetchone() | |
logger.info(f"Connection test successful: {test_result}") | |
except Exception as conn_err: | |
logger.warning(f"Connection test failed, reconnecting: {str(conn_err)}") | |
# Reconnect using the same method that worked initially | |
if app.db_type == "libsql-experimental": | |
if hasattr(app, 'last_successful_connection_method') and app.last_successful_connection_method == 'auth_token': | |
logger.info("Reconnecting with auth_token parameter") | |
app.db_conn = libsql.connect(db_url, auth_token=clean_auth_token) | |
elif hasattr(app, 'last_successful_connection_url'): | |
logger.info(f"Reconnecting with URL: {app.last_successful_connection_url}") | |
app.db_conn = libsql.connect(app.last_successful_connection_url) | |
# Execute the SQL | |
app.db_conn.execute(sql) | |
app.db_conn.commit() | |
return True | |
except Exception as e: | |
logger.warning(f"SQL execution failed (attempt {attempt+1}): {str(e)}") | |
if attempt == max_retries - 1: | |
raise | |
                # Small delay before retry (time is imported at module level)
                time.sleep(1)
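    # execute_with_retry runs a statement up to max_retries times; before each retry it tests the
    # connection with SELECT 1 and reconnects using whichever method succeeded at startup.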
# Simplified table creation with fewer constraints | |
try: | |
# Create users table | |
users_table = """ | |
CREATE TABLE IF NOT EXISTS users ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
email TEXT NOT NULL UNIQUE, | |
hashed_password TEXT NOT NULL, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
last_login DATETIME, | |
is_admin INTEGER DEFAULT 0 | |
) | |
""" | |
execute_with_retry(users_table) | |
logger.info("Users table created successfully") | |
# Create projects table | |
projects_table = """ | |
CREATE TABLE IF NOT EXISTS projects ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
owner_id INTEGER NOT NULL, | |
title TEXT NOT NULL, | |
description TEXT, | |
geojson TEXT, | |
storage_bucket TEXT DEFAULT 'default', | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
) | |
""" | |
execute_with_retry(projects_table) | |
logger.info("Projects table created successfully") | |
# Create journals table | |
journals_table = """ | |
CREATE TABLE IF NOT EXISTS journals ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
project_id INTEGER NOT NULL, | |
title TEXT NOT NULL, | |
content TEXT, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
) | |
""" | |
execute_with_retry(journals_table) | |
logger.info("Journals table created successfully") | |
logger.info("All database tables created successfully") | |
# Create super user if it doesn't exist using HTTP API | |
create_super_user_http() | |
# Also create tables using HTTP API for better reliability | |
try: | |
logger.info("Creating tables using HTTP API for better reliability") | |
create_tables_http() | |
except Exception as e: | |
logger.error(f"Error creating tables with HTTP API: {str(e)}") | |
except Exception as e: | |
error_msg = f"Failed to create database tables: {str(e)}" | |
logger.error(error_msg) | |
# We'll continue even if table creation fails | |
# This allows the server to start and use existing tables if they exist | |
logger.warning("Continuing server startup despite table creation errors") | |
# Try to create tables using HTTP API as a fallback | |
try: | |
logger.info("Trying to create tables using HTTP API as a fallback") | |
create_tables_http() | |
except Exception as e: | |
logger.error(f"Error creating tables with HTTP API fallback: {str(e)}") | |
async def shutdown_db_client(): | |
logger.info("Shutting down database connection") | |
try: | |
# Close connection based on the type | |
if hasattr(app, 'db_type'): | |
if app.db_type == "libsql-experimental": | |
try: | |
app.db_conn.close() | |
logger.info("libsql-experimental connection closed successfully") | |
except Exception as e: | |
logger.warning(f"Error closing libsql-experimental connection: {str(e)}") | |
elif app.db_type == "libsql-client": | |
# libsql-client doesn't have a close method | |
logger.info("No close method needed for libsql-client") | |
elif app.db_type == "http-api": | |
# HTTP API is stateless, no need to close | |
logger.info("No close method needed for HTTP API") | |
else: | |
logger.warning(f"Unknown database type: {app.db_type}") | |
else: | |
logger.warning("No database connection to close") | |
except Exception as e: | |
logger.error(f"Error closing database connection: {str(e)}") | |
# Custom API documentation routes | |
async def custom_swagger_ui_html(): | |
return get_swagger_ui_html( | |
openapi_url=app.openapi_url, | |
title=f"{app.title} - Swagger UI", | |
oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url, | |
swagger_js_url="https://cdn.jsdelivr.net/npm/[email protected]/swagger-ui-bundle.js", | |
swagger_css_url="https://cdn.jsdelivr.net/npm/[email protected]/swagger-ui.css", | |
swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", | |
) | |
async def redoc_html(): | |
return get_redoc_html( | |
openapi_url=app.openapi_url, | |
title=f"{app.title} - ReDoc", | |
redoc_js_url="https://cdn.jsdelivr.net/npm/redoc@next/bundles/redoc.standalone.js", | |
redoc_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", | |
) | |
# Serve Swagger UI at root path | |
async def root_swagger(): | |
logger.info("Root endpoint accessed - serving Swagger UI") | |
return get_swagger_ui_html( | |
openapi_url=app.openapi_url, | |
title=f"{app.title} - API Documentation", | |
oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url, | |
swagger_js_url="https://cdn.jsdelivr.net/npm/[email protected]/swagger-ui-bundle.js", | |
swagger_css_url="https://cdn.jsdelivr.net/npm/[email protected]/swagger-ui.css", | |
swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", | |
) | |
# API information endpoint (moved from root) | |
async def api_info(): | |
""" | |
API information endpoint providing details about the Seamo Auth Server API. | |
Returns: | |
dict: Basic information about the API and links to documentation. | |
""" | |
logger.info("API info endpoint accessed") | |
return { | |
"message": "Welcome to Seamo Auth Server API", | |
"version": "1.0.0", | |
"documentation": { | |
"swagger_ui": "/", | |
"redoc": "/redoc", | |
"openapi_json": "/openapi.json" | |
}, | |
"endpoints": { | |
"health": "/health", | |
"auth": "/api/auth", | |
"projects": "/api/projects", | |
"journals": "/api/journals" | |
} | |
} | |
# Health check endpoint | |
async def health_check(): | |
""" | |
Health check endpoint to verify the server is running properly. | |
Returns: | |
dict: Status information about the server. | |
""" | |
logger.info("Health check endpoint accessed") | |
return { | |
"status": "healthy", | |
"version": "1.0.0", | |
"database_connected": hasattr(app, "db_conn"), | |
"database_type": getattr(app, "db_type", "unknown") | |
} | |
# Super user check/create endpoint | |
async def ensure_super_user(): | |
""" | |
Check if the super user exists and create it if it doesn't. | |
This endpoint can be used to manually trigger super user creation. | |
Returns: | |
dict: Information about the super user status. | |
""" | |
logger.info("Ensure super user endpoint accessed") | |
# First, let's test if we can insert and retrieve data from a simple test table | |
try: | |
# Create a simple test table | |
logger.info("Creating test table") | |
app.db_conn.execute(""" | |
CREATE TABLE IF NOT EXISTS super_user_test ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
test_value TEXT, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
) | |
""") | |
app.db_conn.commit() | |
# Insert a test value | |
test_value = f"test_{int(time.time())}" | |
logger.info(f"Inserting test value: {test_value}") | |
app.db_conn.execute("INSERT INTO super_user_test (test_value) VALUES (?)", (test_value,)) | |
app.db_conn.commit() | |
# Try to retrieve the test value | |
logger.info("Retrieving test value") | |
result = app.db_conn.execute("SELECT * FROM super_user_test WHERE test_value = ?", (test_value,)).fetchone() | |
if result: | |
logger.info(f"Test value retrieved successfully: {result}") | |
else: | |
logger.warning("Test value not found after insert") | |
# Get all values from the test table | |
all_test_values = app.db_conn.execute("SELECT * FROM super_user_test").fetchall() | |
logger.info(f"All test values: {all_test_values}") | |
except Exception as e: | |
logger.error(f"Error during test table operations: {str(e)}") | |
# Now let's check the users table schema | |
try: | |
logger.info("Checking users table schema") | |
table_info = app.db_conn.execute("PRAGMA table_info(users)").fetchall() | |
logger.info(f"Users table schema: {table_info}") | |
except Exception as e: | |
logger.error(f"Error checking users table schema: {str(e)}") | |
# Let's try a direct SQL approach for creating the super user | |
try: | |
logger.info("Trying direct SQL approach for super user creation") | |
# Check if super user exists | |
super_user_email = "[email protected]" | |
direct_check = app.db_conn.execute("SELECT COUNT(*) FROM users WHERE email = ?", (super_user_email,)).fetchone() | |
logger.info(f"Direct check result: {direct_check}") | |
if direct_check and direct_check[0] > 0: | |
logger.info("Super user already exists according to direct check") | |
else: | |
logger.info("Super user does not exist according to direct check, creating now") | |
# Create password hashing context | |
pwd_context = CryptContext( | |
schemes=["argon2"], | |
argon2__time_cost=4, | |
argon2__memory_cost=102400, | |
argon2__parallelism=8, | |
argon2__salt_len=16 | |
) | |
# Hash the password | |
super_user_password = "TestPassword123!" | |
hashed_password = pwd_context.hash(super_user_password) | |
# Try a direct insert with explicit SQL | |
direct_insert = """ | |
INSERT INTO users (email, hashed_password, is_admin) | |
VALUES (?, ?, 1) | |
""" | |
app.db_conn.execute(direct_insert, (super_user_email, hashed_password)) | |
app.db_conn.commit() | |
logger.info("Direct insert completed") | |
# Check if the insert worked | |
direct_verify = app.db_conn.execute("SELECT * FROM users WHERE email = ?", (super_user_email,)).fetchone() | |
logger.info(f"Direct verify result: {direct_verify}") | |
if direct_verify: | |
logger.info("Super user created successfully with direct SQL approach") | |
else: | |
logger.warning("Super user not found after direct SQL insert") | |
# Let's try to list all users | |
all_users = app.db_conn.execute("SELECT * FROM users").fetchall() | |
logger.info(f"All users after direct insert: {all_users}") | |
except Exception as e: | |
logger.error(f"Error during direct SQL approach: {str(e)}") | |
# Return diagnostic information | |
return { | |
"status": "diagnostic_complete", | |
"message": "Diagnostic tests completed. Check server logs for details.", | |
"email": "[email protected]", | |
"next_steps": [ | |
"Check server logs for detailed diagnostic information", | |
"Verify database permissions", | |
"Try accessing the /test-db endpoint for additional diagnostics" | |
] | |
} | |
# Database test endpoint | |
async def test_database(): | |
""" | |
Test database connectivity and operations. | |
This endpoint performs various database operations to diagnose issues. | |
Returns: | |
dict: Results of the database tests. | |
""" | |
logger.info("Database test endpoint accessed") | |
results = { | |
"tests": [], | |
"overall_status": "unknown" | |
} | |
# Test 1: Basic connection | |
try: | |
test_result = app.db_conn.execute("SELECT 1").fetchone() | |
results["tests"].append({ | |
"name": "basic_connection", | |
"status": "success", | |
"result": str(test_result) | |
}) | |
except Exception as e: | |
results["tests"].append({ | |
"name": "basic_connection", | |
"status": "failure", | |
"error": str(e) | |
}) | |
# Test 2: Create test table | |
try: | |
app.db_conn.execute(""" | |
CREATE TABLE IF NOT EXISTS db_test_table ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
test_value TEXT, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
) | |
""") | |
app.db_conn.commit() | |
results["tests"].append({ | |
"name": "create_test_table", | |
"status": "success" | |
}) | |
except Exception as e: | |
results["tests"].append({ | |
"name": "create_test_table", | |
"status": "failure", | |
"error": str(e) | |
}) | |
# Test 3: Insert into test table | |
test_id = int(time.time()) | |
test_value = f"test_value_{test_id}" | |
try: | |
app.db_conn.execute("INSERT INTO db_test_table (test_value) VALUES (?)", (test_value,)) | |
app.db_conn.commit() | |
results["tests"].append({ | |
"name": "insert_test_value", | |
"status": "success", | |
"value_inserted": test_value | |
}) | |
except Exception as e: | |
results["tests"].append({ | |
"name": "insert_test_value", | |
"status": "failure", | |
"error": str(e) | |
}) | |
# Test 4: Retrieve from test table | |
try: | |
result = app.db_conn.execute("SELECT * FROM db_test_table WHERE test_value = ?", (test_value,)).fetchone() | |
if result: | |
results["tests"].append({ | |
"name": "retrieve_test_value", | |
"status": "success", | |
"result": str(result) | |
}) | |
else: | |
results["tests"].append({ | |
"name": "retrieve_test_value", | |
"status": "failure", | |
"error": "Value not found after insert" | |
}) | |
except Exception as e: | |
results["tests"].append({ | |
"name": "retrieve_test_value", | |
"status": "failure", | |
"error": str(e) | |
}) | |
# Test 5: List all values in test table | |
try: | |
all_values = app.db_conn.execute("SELECT * FROM db_test_table").fetchall() | |
results["tests"].append({ | |
"name": "list_all_test_values", | |
"status": "success", | |
"count": len(all_values), | |
"values": str(all_values[-5:]) if all_values else "[]" # Show only the last 5 values | |
}) | |
except Exception as e: | |
results["tests"].append({ | |
"name": "list_all_test_values", | |
"status": "failure", | |
"error": str(e) | |
}) | |
# Test 6: Check users table | |
try: | |
users_count = app.db_conn.execute("SELECT COUNT(*) FROM users").fetchone() | |
results["tests"].append({ | |
"name": "check_users_table", | |
"status": "success", | |
"count": users_count[0] if users_count else 0 | |
}) | |
# If users exist, try to get one | |
if users_count and users_count[0] > 0: | |
try: | |
user = app.db_conn.execute("SELECT * FROM users LIMIT 1").fetchone() | |
results["tests"].append({ | |
"name": "retrieve_user", | |
"status": "success", | |
"user_id": user[0] if user else None | |
}) | |
except Exception as e: | |
results["tests"].append({ | |
"name": "retrieve_user", | |
"status": "failure", | |
"error": str(e) | |
}) | |
except Exception as e: | |
results["tests"].append({ | |
"name": "check_users_table", | |
"status": "failure", | |
"error": str(e) | |
}) | |
# Test 7: Try to insert a test user | |
test_user_email = f"test_user_{test_id}@example.com" | |
try: | |
# Create password hashing context | |
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") | |
hashed_password = pwd_context.hash("TestPassword123!") | |
# Insert test user | |
app.db_conn.execute( | |
"INSERT INTO users (email, hashed_password, is_admin) VALUES (?, ?, 0)", | |
(test_user_email, hashed_password) | |
) | |
app.db_conn.commit() | |
results["tests"].append({ | |
"name": "insert_test_user", | |
"status": "success", | |
"email": test_user_email | |
}) | |
# Try to retrieve the test user | |
try: | |
user = app.db_conn.execute("SELECT * FROM users WHERE email = ?", (test_user_email,)).fetchone() | |
if user: | |
results["tests"].append({ | |
"name": "retrieve_test_user", | |
"status": "success", | |
"user_id": user[0] if user else None | |
}) | |
else: | |
results["tests"].append({ | |
"name": "retrieve_test_user", | |
"status": "failure", | |
"error": "Test user not found after insert" | |
}) | |
except Exception as e: | |
results["tests"].append({ | |
"name": "retrieve_test_user", | |
"status": "failure", | |
"error": str(e) | |
}) | |
except Exception as e: | |
results["tests"].append({ | |
"name": "insert_test_user", | |
"status": "failure", | |
"error": str(e) | |
}) | |
# Calculate overall status | |
success_count = sum(1 for test in results["tests"] if test["status"] == "success") | |
total_count = len(results["tests"]) | |
if success_count == total_count: | |
results["overall_status"] = "success" | |
elif success_count > 0: | |
results["overall_status"] = "partial_success" | |
else: | |
results["overall_status"] = "failure" | |
results["success_rate"] = f"{success_count}/{total_count}" | |
return results | |
# Super user creation endpoint (direct HTTP API) | |
async def create_super_user_http_endpoint(): | |
""" | |
Create a super user with email [email protected] directly using the Turso HTTP API. | |
This endpoint is for testing purposes only. | |
Returns: | |
dict: Information about the super user creation. | |
""" | |
logger.info("Create super user HTTP endpoint accessed") | |
# Super user details | |
super_user_email = "[email protected]" | |
super_user_password = "TestPassword123!" | |
# Create password hashing context | |
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") | |
hashed_password = pwd_context.hash(super_user_password) | |
try: | |
# Extract the database URL and auth token | |
db_url = os.getenv("TURSO_DATABASE_URL", "") | |
auth_token = os.getenv("TURSO_AUTH_TOKEN", "") | |
# Convert URL from libsql:// to https:// | |
if db_url.startswith("libsql://"): | |
http_url = db_url.replace("libsql://", "https://") | |
else: | |
http_url = db_url | |
# Ensure the URL doesn't have a trailing slash | |
http_url = http_url.rstrip('/') | |
# Set up headers for the HTTP request | |
headers = { | |
"Authorization": f"Bearer {auth_token}", | |
"Content-Type": "application/json" | |
} | |
# First, check if the user already exists | |
logger.info(f"Checking if super user already exists using HTTP API") | |
check_query = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": "SELECT COUNT(*) FROM users WHERE email = ?", | |
"args": [ | |
{"type": "text", "value": super_user_email} | |
] | |
} | |
}, | |
{"type": "close"} | |
] | |
} | |
# Send the request to check if user exists | |
check_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=check_query) | |
check_response.raise_for_status() | |
check_result = check_response.json() | |
logger.info(f"Check response: {check_result}") | |
# Parse the result to see if user exists | |
user_exists = False | |
if "results" in check_result and len(check_result["results"]) > 0: | |
result = check_result["results"][0] | |
if "rows" in result and len(result["rows"]) > 0: | |
count = result["rows"][0]["values"][0] | |
user_exists = count > 0 | |
if user_exists: | |
logger.info(f"Super user already exists according to HTTP API") | |
return { | |
"status": "exists", | |
"message": "Super user already exists", | |
"email": super_user_email | |
} | |
# User doesn't exist, create it | |
logger.info(f"Creating super user using HTTP API") | |
# First, make sure the table exists | |
create_table_query = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": """ | |
CREATE TABLE IF NOT EXISTS users ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
email TEXT NOT NULL UNIQUE, | |
hashed_password TEXT NOT NULL, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
last_login DATETIME, | |
is_admin INTEGER DEFAULT 0 | |
) | |
""" | |
} | |
}, | |
{"type": "close"} | |
] | |
} | |
# Send the request to create the table | |
create_table_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=create_table_query) | |
create_table_response.raise_for_status() | |
logger.info(f"Table creation response: {create_table_response.status_code}") | |
# Insert the super user | |
insert_query = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": "INSERT INTO users (email, hashed_password, is_admin) VALUES (?, ?, 1)", | |
"args": [ | |
{"type": "text", "value": super_user_email}, | |
{"type": "text", "value": hashed_password} | |
] | |
} | |
}, | |
{ | |
"type": "execute", | |
"stmt": {"sql": "SELECT last_insert_rowid()"} | |
}, | |
{"type": "close"} | |
] | |
} | |
# Send the request to insert the user | |
insert_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=insert_query) | |
insert_response.raise_for_status() | |
insert_result = insert_response.json() | |
logger.info(f"Insert response: {insert_result}") | |
# Try to get the last inserted ID | |
last_id = None | |
if "results" in insert_result and len(insert_result["results"]) > 1: | |
result = insert_result["results"][1] | |
if "rows" in result and len(result["rows"]) > 0: | |
last_id = result["rows"][0]["values"][0] | |
logger.info(f"Last inserted ID: {last_id}") | |
# Verify the user was created | |
verify_query = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": "SELECT * FROM users WHERE email = ?", | |
"args": [ | |
{"type": "text", "value": super_user_email} | |
] | |
} | |
}, | |
{"type": "close"} | |
] | |
} | |
# Send the request to verify the user | |
verify_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=verify_query) | |
verify_response.raise_for_status() | |
verify_result = verify_response.json() | |
logger.info(f"Verify response: {verify_result}") | |
# Check if the user was found | |
user_found = False | |
user_id = None | |
if "results" in verify_result and len(verify_result["results"]) > 0: | |
result = verify_result["results"][0] | |
if "rows" in result and len(result["rows"]) > 0: | |
user_found = True | |
user_id = result["rows"][0]["values"][0] | |
logger.info(f"User found with ID: {user_id}") | |
if user_found: | |
return { | |
"status": "success", | |
"message": "Super user created successfully", | |
"user_id": user_id, | |
"email": super_user_email | |
} | |
else: | |
# List all users for diagnostic purposes | |
list_query = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": {"sql": "SELECT id, email FROM users"} | |
}, | |
{"type": "close"} | |
] | |
} | |
# Send the request to list all users | |
list_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=list_query) | |
list_response.raise_for_status() | |
list_result = list_response.json() | |
logger.info(f"List response: {list_result}") | |
# Extract the list of users | |
all_users = [] | |
if "results" in list_result and len(list_result["results"]) > 0: | |
result = list_result["results"][0] | |
if "rows" in result: | |
for row in result["rows"]: | |
all_users.append(row["values"]) | |
return { | |
"status": "error", | |
"message": "Super user was not found after creation", | |
"email": super_user_email, | |
"all_users": all_users | |
} | |
except Exception as e: | |
logger.error(f"Error creating super user with HTTP API: {str(e)}") | |
return { | |
"status": "error", | |
"message": f"Error creating super user: {str(e)}", | |
"email": super_user_email | |
} | |
# Super user creation endpoint (direct SQL) | |
async def create_super_user_endpoint(): | |
""" | |
Create a super user with email [email protected] directly using SQL. | |
This endpoint is for testing purposes only. | |
Returns: | |
dict: Information about the super user creation. | |
""" | |
logger.info("Create super user endpoint accessed") | |
# Super user details | |
super_user_email = "[email protected]" | |
super_user_password = "TestPassword123!" | |
# Create password hashing context | |
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") | |
hashed_password = pwd_context.hash(super_user_password) | |
try: | |
# Check if super user already exists - using COUNT(*) for more reliable results | |
logger.info("Checking if super user already exists") | |
count_result = app.db_conn.execute("SELECT COUNT(*) FROM users WHERE email = ?", (super_user_email,)).fetchone() | |
user_exists = count_result and count_result[0] > 0 | |
if user_exists: | |
# Get user details | |
existing_user = app.db_conn.execute("SELECT id, is_admin FROM users WHERE email = ?", (super_user_email,)).fetchone() | |
if existing_user: | |
user_id = existing_user[0] | |
is_admin = existing_user[1] if len(existing_user) > 1 else 0 | |
logger.info(f"Super user already exists with ID: {user_id}, is_admin: {is_admin}") | |
# If user exists but is not admin, make them admin | |
if not is_admin: | |
logger.info("Updating user to have admin privileges") | |
app.db_conn.execute("UPDATE users SET is_admin = 1 WHERE email = ?", (super_user_email,)) | |
app.db_conn.commit() | |
logger.info("User updated to admin successfully") | |
return { | |
"status": "updated", | |
"message": f"Super user already existed with ID {user_id} and was updated to have admin privileges", | |
"user_id": user_id, | |
"email": super_user_email | |
} | |
return { | |
"status": "exists", | |
"message": f"Super user already exists with ID {user_id} and has admin privileges", | |
"user_id": user_id, | |
"email": super_user_email | |
} | |
else: | |
logger.warning("User count says user exists but couldn't retrieve details") | |
# Insert the super user - using the approach from turso_interactive_crud.py | |
logger.info(f"Inserting super user with email: {super_user_email}") | |
# First, make sure the table exists | |
app.db_conn.execute(""" | |
CREATE TABLE IF NOT EXISTS users ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
email TEXT NOT NULL UNIQUE, | |
hashed_password TEXT NOT NULL, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
last_login DATETIME, | |
is_admin INTEGER DEFAULT 0 | |
) | |
""") | |
app.db_conn.commit() | |
# Insert the user | |
app.db_conn.execute( | |
"INSERT INTO users (email, hashed_password, is_admin) VALUES (?, ?, 1)", | |
(super_user_email, hashed_password) | |
) | |
app.db_conn.commit() | |
logger.info("Super user insert committed") | |
# Wait a moment to ensure the insert is processed | |
time.sleep(0.5) | |
# Get the user ID by querying for the email we just inserted | |
# This is more reliable than using lastrowid which might not work correctly with Turso | |
logger.info("Getting user ID by email") | |
id_result = app.db_conn.execute("SELECT id FROM users WHERE email = ?", (super_user_email,)).fetchone() | |
if id_result: | |
user_id = id_result[0] | |
logger.info(f"Found user ID: {user_id}") | |
# Get the full user record | |
logger.info("Getting full user record") | |
user = app.db_conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() | |
if user: | |
logger.info(f"Super user retrieved successfully: {user}") | |
return { | |
"status": "success", | |
"message": "Super user created and retrieved successfully", | |
"user_id": user_id, | |
"email": super_user_email | |
} | |
else: | |
logger.warning(f"Found user ID {user_id} but couldn't retrieve full record") | |
else: | |
logger.warning("Could not find user ID after insert") | |
# If we get here, we couldn't verify the user was created properly | |
# Try to list all users as a diagnostic step | |
logger.info("Listing all users for diagnostic purposes") | |
all_users = app.db_conn.execute("SELECT id, email FROM users").fetchall() | |
logger.info(f"All users: {all_users}") | |
# Try one more approach - check if the user exists now | |
count_after = app.db_conn.execute("SELECT COUNT(*) FROM users WHERE email = ?", (super_user_email,)).fetchone() | |
exists_after = count_after and count_after[0] > 0 | |
if exists_after: | |
logger.info("User exists according to COUNT(*) after insert") | |
return { | |
"status": "likely_success", | |
"message": "Super user appears to have been created (confirmed by COUNT query)", | |
"email": super_user_email, | |
"all_users": str(all_users) | |
} | |
else: | |
logger.warning("User does not exist according to COUNT(*) after insert") | |
return { | |
"status": "partial_success", | |
"message": "Super user insert operation completed but user could not be verified", | |
"email": super_user_email, | |
"all_users": str(all_users) | |
} | |
except Exception as e: | |
logger.error(f"Error creating super user: {str(e)}") | |
return { | |
"status": "error", | |
"message": f"Error creating super user: {str(e)}", | |
"email": super_user_email | |
} | |
# Test user creation endpoint (direct SQL) | |
async def create_test_user(): | |
""" | |
Create a test user directly using SQL. | |
This endpoint is for testing purposes only. | |
Returns: | |
dict: Information about the test user creation. | |
""" | |
logger.info("Create test user endpoint accessed") | |
# Generate a unique email | |
test_id = int(time.time()) | |
test_email = f"test_user_{test_id}@example.com" | |
test_password = "TestPassword123!" | |
# Create password hashing context | |
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") | |
hashed_password = pwd_context.hash(test_password) | |
try: | |
# Insert the test user | |
logger.info(f"Inserting test user with email: {test_email}") | |
app.db_conn.execute( | |
"INSERT INTO users (email, hashed_password, is_admin) VALUES (?, ?, 0)", | |
(test_email, hashed_password) | |
) | |
app.db_conn.commit() | |
logger.info("Test user inserted successfully") | |
# Try to retrieve the test user | |
logger.info("Retrieving test user") | |
user = app.db_conn.execute("SELECT * FROM users WHERE email = ?", (test_email,)).fetchone() | |
if user: | |
logger.info(f"Test user retrieved successfully: {user}") | |
return { | |
"status": "success", | |
"message": "Test user created and retrieved successfully", | |
"user_id": user[0] if user else None, | |
"email": test_email | |
} | |
else: | |
logger.warning("Test user not found after insert") | |
# Try to list all users | |
all_users = app.db_conn.execute("SELECT id, email FROM users").fetchall() | |
logger.info(f"All users: {all_users}") | |
return { | |
"status": "partial_success", | |
"message": "Test user created but could not be retrieved", | |
"email": test_email, | |
"all_users": str(all_users) | |
} | |
except Exception as e: | |
logger.error(f"Error creating test user: {str(e)}") | |
return { | |
"status": "error", | |
"message": f"Error creating test user: {str(e)}", | |
"email": test_email | |
} | |
# Direct HTTP API endpoint for user registration | |
async def direct_register(request: Request): | |
""" | |
Register a new user using direct HTTP API. | |
This is a workaround for issues with the libsql-experimental driver. | |
""" | |
class UserCreate(BaseModel): | |
email: EmailStr | |
password: str | |
# Generate a unique request ID | |
request_id = f"direct_reg_{int(time.time())}" | |
logger.info(f"[{request_id}] Starting direct registration") | |
try: | |
# Parse the request body | |
body = await request.json() | |
user = UserCreate(**body) | |
logger.info(f"[{request_id}] Registering user with email: {user.email}") | |
# Get Turso database connection details | |
db_url = os.getenv("TURSO_DATABASE_URL") | |
auth_token = os.getenv("TURSO_AUTH_TOKEN") | |
if not db_url or not auth_token: | |
logger.error(f"[{request_id}] Missing Turso credentials") | |
return { | |
"success": False, | |
"error": "Missing Turso credentials" | |
} | |
# Clean the auth token | |
auth_token = auth_token.strip() | |
# Convert URL from libsql:// to https:// | |
if db_url.startswith("libsql://"): | |
http_url = db_url.replace("libsql://", "https://") | |
else: | |
http_url = db_url | |
# Ensure the URL doesn't have a trailing slash | |
http_url = http_url.rstrip('/') | |
# Verify the URL format | |
if not http_url.startswith("https://"): | |
if "://" not in http_url: | |
http_url = f"https://{http_url}" | |
logger.info(f"[{request_id}] Using HTTP URL: {http_url}") | |
# Check if user already exists | |
logger.info(f"[{request_id}] Checking if user already exists") | |
check_query = "SELECT id FROM users WHERE email = ?" | |
# Format the request for checking if user exists | |
check_data = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": check_query, | |
"args": [ | |
{"type": "text", "value": user.email} | |
] | |
} | |
}, | |
{"type": "close"} | |
] | |
} | |
# Send the request | |
headers = { | |
"Authorization": f"Bearer {auth_token}", | |
"Content-Type": "application/json" | |
} | |
pipeline_url = f"{http_url}/v2/pipeline" | |
check_response = requests.post(pipeline_url, headers=headers, json=check_data, timeout=10) | |
check_response.raise_for_status() | |
check_result = check_response.json() | |
if "results" in check_result and len(check_result["results"]) > 0: | |
result = check_result["results"][0] | |
if "rows" in result and len(result["rows"]) > 0: | |
logger.warning(f"[{request_id}] User with email {user.email} already exists") | |
return { | |
"success": False, | |
"error": "Email already registered" | |
} | |
# Hash the password | |
logger.info(f"[{request_id}] Hashing password") | |
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") | |
hashed_password = pwd_context.hash(user.password) | |
logger.info(f"[{request_id}] Password hashed successfully") | |
# Insert the user | |
logger.info(f"[{request_id}] Inserting user") | |
insert_query = "INSERT INTO users (email, hashed_password) VALUES (?, ?)" | |
# Format the request for inserting the user | |
insert_data = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": insert_query, | |
"args": [ | |
{"type": "text", "value": user.email}, | |
{"type": "text", "value": hashed_password} | |
] | |
} | |
}, | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": "SELECT last_insert_rowid()" | |
} | |
}, | |
{"type": "close"} | |
] | |
} | |
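        # Both statements in insert_data run in a single pipeline call, which the
        # Hrana-over-HTTP protocol executes on one connection, so the
        # SELECT last_insert_rowid() in the second statement should observe the row
        # inserted by the first.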
# Send the request | |
insert_response = requests.post(pipeline_url, headers=headers, json=insert_data, timeout=10) | |
insert_response.raise_for_status() | |
insert_result = insert_response.json() | |
# Get the user ID | |
user_id = None | |
if "results" in insert_result and len(insert_result["results"]) > 1: | |
id_result = insert_result["results"][1] | |
if "rows" in id_result and len(id_result["rows"]) > 0: | |
user_id = id_result["rows"][0]["values"][0] | |
logger.info(f"[{request_id}] User inserted with ID: {user_id}") | |
# Verify the insert | |
logger.info(f"[{request_id}] Verifying user was created") | |
verify_query = "SELECT id, email, created_at FROM users WHERE email = ?" | |
# Format the request for verifying the user | |
verify_data = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": verify_query, | |
"args": [ | |
{"type": "text", "value": user.email} | |
] | |
} | |
}, | |
{"type": "close"} | |
] | |
} | |
# Send the request | |
verify_response = requests.post(pipeline_url, headers=headers, json=verify_data, timeout=10) | |
verify_response.raise_for_status() | |
verify_result = verify_response.json() | |
if "results" in verify_result and len(verify_result["results"]) > 0: | |
result = verify_result["results"][0] | |
if "rows" in result and len(result["rows"]) > 0: | |
user_data = result["rows"][0]["values"] | |
user_id = user_data[0] | |
logger.info(f"[{request_id}] Verified user with ID: {user_id}") | |
# Generate access token | |
from datetime import timedelta | |
from app.api.routes.auth_router import create_access_token, create_refresh_token | |
access_token_expires = timedelta(minutes=30) | |
refresh_token_expires = timedelta(days=7) | |
access_token = create_access_token( | |
data={"sub": user.email}, | |
expires_delta=access_token_expires | |
) | |
refresh_token = create_refresh_token( | |
data={"sub": user.email}, | |
expires_delta=refresh_token_expires | |
) | |
return { | |
"success": True, | |
"id": user_id, | |
"email": user.email, | |
"access_token": access_token, | |
"refresh_token": refresh_token, | |
"token_type": "bearer" | |
} | |
else: | |
logger.error(f"[{request_id}] Failed to verify user after insert") | |
return { | |
"success": False, | |
"error": "Failed to verify user after insert" | |
} | |
else: | |
logger.error(f"[{request_id}] Failed to verify user after insert") | |
return { | |
"success": False, | |
"error": "Failed to verify user after insert" | |
} | |
except Exception as e: | |
logger.error(f"[{request_id}] Error during direct registration: {str(e)}") | |
return { | |
"success": False, | |
"error": str(e) | |
} | |
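# Depending on the Turso / Hrana-over-HTTP version, row data in a /v2/pipeline
# response may be nested under results[i]["response"]["result"], with each cell
# encoded as a typed object like {"type": "text", "value": "..."}. The helper
# below is a minimal sketch, assuming that shape (and tolerating a flatter one),
# for pulling plain Python values out of a pipeline response; the name
# _pipeline_row_values is an illustration, not part of any library API.
def _pipeline_row_values(pipeline_result: Dict[str, Any], index: int = 0) -> List[List[Any]]:
    """Extract row values from one statement result in a /v2/pipeline response (sketch)."""
    results = pipeline_result.get("results", [])
    if index >= len(results):
        return []
    entry = results[index]
    # Hrana v2 nests execute results under response.result; fall back to the entry
    # itself for flatter payload shapes.
    result = entry.get("response", {}).get("result", entry)
    values = []
    for row in result.get("rows", []):
        # A row may be a list of cells or a dict with a "values" key; a cell may be
        # a typed object ({"type": ..., "value": ...}) or a bare value.
        cells = row.get("values", row) if isinstance(row, dict) else row
        values.append([cell.get("value") if isinstance(cell, dict) else cell for cell in cells])
    return values
# Example (hypothetical values): _pipeline_row_values(verify_result, 0) might return
# [["42", "[email protected]", "2024-01-01 00:00:00"]] for the verification query above.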
# Import and include routers | |
from app.api.routes import auth_router, projects_router, journals_router | |
app.include_router(auth_router.router, prefix="/api/auth", tags=["Authentication"]) | |
app.include_router(projects_router.router, prefix="/api/projects", tags=["Projects"]) | |
app.include_router(journals_router.router, prefix="/api/journals", tags=["Journals"]) | |
# Fix users table endpoint | |
async def fix_users_table(): | |
""" | |
    Fix the users table by recreating it without the CHECK constraint on hashed_password.
    SQLite's ALTER TABLE cannot drop a constraint, so the table is rebuilt instead:
    copy the rows into a temporary table, drop the original, and rename the copy back to users.
    This endpoint is for fixing database issues.
Returns: | |
dict: Information about the operation. | |
""" | |
logger.info("Fix users table endpoint accessed") | |
operation_id = f"fix_users_table_{int(time.time())}" | |
try: | |
# Import the HTTP API utility | |
from app.utils import db_http | |
        # Step 1: No explicit existence check is done here; the statements below use
        # IF NOT EXISTS / IF EXISTS and the data copy is wrapped in try/except,
        # so a missing users table is tolerated
        logger.info(f"[{operation_id}] Starting users table rebuild")
# Step 2: Create a temporary table without the constraint | |
logger.info(f"[{operation_id}] Creating temporary table") | |
db_http.execute_query( | |
""" | |
CREATE TABLE IF NOT EXISTS users_temp ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
email TEXT NOT NULL UNIQUE, | |
hashed_password TEXT NOT NULL, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
last_login DATETIME, | |
is_admin INTEGER DEFAULT 0 | |
) | |
""", | |
operation_id=f"{operation_id}_create_temp" | |
) | |
# Step 3: Copy data from users to users_temp if users exists | |
logger.info(f"[{operation_id}] Copying data to temporary table") | |
try: | |
db_http.execute_query( | |
""" | |
INSERT INTO users_temp (id, email, hashed_password, created_at, last_login, is_admin) | |
SELECT id, email, hashed_password, created_at, last_login, is_admin FROM users | |
""", | |
operation_id=f"{operation_id}_copy_data" | |
) | |
logger.info(f"[{operation_id}] Data copied successfully") | |
except Exception as e: | |
logger.warning(f"[{operation_id}] Error copying data: {str(e)}") | |
# This is expected if the users table doesn't exist or is empty | |
# Step 4: Drop the original users table | |
logger.info(f"[{operation_id}] Dropping original users table") | |
db_http.execute_query( | |
"DROP TABLE IF EXISTS users", | |
operation_id=f"{operation_id}_drop_users" | |
) | |
# Step 5: Rename users_temp to users | |
logger.info(f"[{operation_id}] Renaming temporary table to users") | |
db_http.execute_query( | |
"ALTER TABLE users_temp RENAME TO users", | |
operation_id=f"{operation_id}_rename_table" | |
) | |
# Step 6: Create indexes | |
logger.info(f"[{operation_id}] Creating indexes") | |
db_http.execute_query( | |
"CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)", | |
operation_id=f"{operation_id}_create_index" | |
) | |
logger.info(f"[{operation_id}] Users table fixed successfully") | |
return { | |
"success": True, | |
"message": "Users table fixed successfully" | |
} | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error fixing users table: {str(e)}") | |
return { | |
"success": False, | |
"error": str(e) | |
} | |
# Test database with HTTP API endpoint | |
async def test_db_http(): | |
""" | |
Test the database connection and operations using the HTTP API. | |
This endpoint is for testing purposes only. | |
Returns: | |
dict: Information about the database tests. | |
""" | |
logger.info("Test database HTTP API endpoint accessed") | |
results = { | |
"tests": [], | |
"success_count": 0, | |
"total_count": 0 | |
} | |
try: | |
# Extract the database URL and auth token | |
db_url = os.getenv("TURSO_DATABASE_URL", "") | |
auth_token = os.getenv("TURSO_AUTH_TOKEN", "") | |
# Convert URL from libsql:// to https:// | |
if db_url.startswith("libsql://"): | |
http_url = db_url.replace("libsql://", "https://") | |
else: | |
http_url = db_url | |
# Ensure the URL doesn't have a trailing slash | |
http_url = http_url.rstrip('/') | |
# Set up headers for the HTTP request | |
headers = { | |
"Authorization": f"Bearer {auth_token}", | |
"Content-Type": "application/json" | |
} | |
# Test 1: Basic connection | |
basic_query = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": {"sql": "SELECT 1"} | |
}, | |
{"type": "close"} | |
] | |
} | |
        basic_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=basic_query, timeout=10)
basic_response.raise_for_status() | |
basic_result = basic_response.json() | |
results["tests"].append({ | |
"name": "Basic connection (HTTP API)", | |
"success": True, | |
"result": str(basic_result) | |
}) | |
results["success_count"] += 1 | |
results["total_count"] += 1 | |
# Test 2: Create a test table | |
create_table_query = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": """ | |
CREATE TABLE IF NOT EXISTS test_table_http ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
value TEXT NOT NULL | |
) | |
""" | |
} | |
}, | |
{"type": "close"} | |
] | |
} | |
        create_table_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=create_table_query, timeout=10)
create_table_response.raise_for_status() | |
results["tests"].append({ | |
"name": "Create test table (HTTP API)", | |
"success": True | |
}) | |
results["success_count"] += 1 | |
results["total_count"] += 1 | |
# Test 3: Insert into test table | |
insert_query = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": "INSERT INTO test_table_http (value) VALUES (?)", | |
"args": [ | |
{"type": "text", "value": "test_value_http"} | |
] | |
} | |
}, | |
{"type": "close"} | |
] | |
} | |
        insert_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=insert_query, timeout=10)
insert_response.raise_for_status() | |
results["tests"].append({ | |
"name": "Insert into test table (HTTP API)", | |
"success": True | |
}) | |
results["success_count"] += 1 | |
results["total_count"] += 1 | |
# Test 4: Select from test table | |
select_query = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": {"sql": "SELECT * FROM test_table_http"} | |
}, | |
{"type": "close"} | |
] | |
} | |
        select_response = requests.post(f"{http_url}/v2/pipeline", headers=headers, json=select_query, timeout=10)
select_response.raise_for_status() | |
select_result = select_response.json() | |
results["tests"].append({ | |
"name": "Select from test table (HTTP API)", | |
"success": True, | |
"result": str(select_result) | |
}) | |
results["success_count"] += 1 | |
results["total_count"] += 1 | |
except Exception as e: | |
results["error"] = str(e) | |
# Add success rate | |
results["success_rate"] = f"{results['success_count']}/{results['total_count']}" | |
return results | |
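# Test database with the HTTP API utility module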
async def test_db_http_util(): | |
""" | |
Test the database connection and operations using the HTTP API utility. | |
This endpoint is for testing purposes only. | |
Returns: | |
dict: Information about the database tests. | |
""" | |
logger.info("Test database HTTP API utility endpoint accessed") | |
try: | |
# Import the HTTP API utility | |
from app.utils import db_http | |
operation_id = f"test_db_http_util_{int(time.time())}" | |
logger.info(f"[{operation_id}] Starting HTTP API utility test") | |
results = { | |
"connection_type": "http-api-util", | |
"operations": [] | |
} | |
# Test 1: Simple SELECT | |
logger.info(f"[{operation_id}] Test 1: Simple SELECT") | |
result = db_http.execute_query("SELECT 1 as test", operation_id=f"{operation_id}_1") | |
results["operations"].append({ | |
"name": "Simple SELECT", | |
"success": True, | |
"result": str(result) | |
}) | |
# Test 2: Create a temporary table | |
logger.info(f"[{operation_id}] Test 2: Create a temporary table") | |
db_http.create_table_if_not_exists( | |
"test_table_http_util", | |
""" | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
value TEXT NOT NULL, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
""", | |
operation_id=f"{operation_id}_2" | |
) | |
results["operations"].append({ | |
"name": "Create temporary table", | |
"success": True | |
}) | |
# Test 3: Insert into the temporary table | |
logger.info(f"[{operation_id}] Test 3: Insert into the temporary table") | |
test_value = f"test_value_http_util_{int(time.time())}" | |
insert_id = db_http.insert_record( | |
"test_table_http_util", | |
{"value": test_value}, | |
operation_id=f"{operation_id}_3" | |
) | |
results["operations"].append({ | |
"name": "Insert into temporary table", | |
"success": insert_id is not None, | |
"insert_id": insert_id | |
}) | |
# Test 4: Select from the temporary table | |
logger.info(f"[{operation_id}] Test 4: Select from the temporary table") | |
records = db_http.select_records( | |
"test_table_http_util", | |
condition="value = ?", | |
condition_params=[{"type": "text", "value": test_value}], | |
limit=1, | |
operation_id=f"{operation_id}_4" | |
) | |
results["operations"].append({ | |
"name": "Select from temporary table", | |
"success": len(records) > 0, | |
"result": str(records[0]) if records else None | |
}) | |
# Test 5: Update the record | |
logger.info(f"[{operation_id}] Test 5: Update the record") | |
updated_value = f"{test_value}_updated" | |
success = db_http.update_record( | |
"test_table_http_util", | |
{"value": updated_value}, | |
"id = ?", | |
[{"type": "integer", "value": str(insert_id)}], | |
operation_id=f"{operation_id}_5" | |
) | |
results["operations"].append({ | |
"name": "Update record", | |
"success": success | |
}) | |
# Test 6: Verify the update | |
logger.info(f"[{operation_id}] Test 6: Verify the update") | |
updated_records = db_http.select_records( | |
"test_table_http_util", | |
condition="id = ?", | |
condition_params=[{"type": "integer", "value": str(insert_id)}], | |
limit=1, | |
operation_id=f"{operation_id}_6" | |
) | |
results["operations"].append({ | |
"name": "Verify update", | |
"success": len(updated_records) > 0 and updated_records[0].get("value") == updated_value, | |
"result": str(updated_records[0]) if updated_records else None | |
}) | |
# Test 7: Count records | |
logger.info(f"[{operation_id}] Test 7: Count records") | |
count = db_http.count_records("test_table_http_util", operation_id=f"{operation_id}_7") | |
results["operations"].append({ | |
"name": "Count records", | |
"success": count > 0, | |
"count": count | |
}) | |
# Test 8: List all records | |
logger.info(f"[{operation_id}] Test 8: List all records") | |
all_records = db_http.select_records( | |
"test_table_http_util", | |
limit=10, | |
operation_id=f"{operation_id}_8" | |
) | |
results["operations"].append({ | |
"name": "List all records", | |
"success": True, | |
"count": len(all_records), | |
"records": all_records | |
}) | |
# Test 9: Create a test user | |
logger.info(f"[{operation_id}] Test 9: Create a test user") | |
# Create password hashing context | |
from app.api.routes.auth_router import get_password_hash | |
test_user_email = f"test_user_http_util_{int(time.time())}@example.com" | |
test_user_password = "TestPassword123!" | |
hashed_password = get_password_hash(test_user_password) | |
user_id = db_http.insert_record( | |
"users", | |
{ | |
"email": test_user_email, | |
"hashed_password": hashed_password, | |
"is_admin": 0 | |
}, | |
operation_id=f"{operation_id}_9" | |
) | |
results["operations"].append({ | |
"name": "Create test user", | |
"success": user_id is not None, | |
"user_id": user_id, | |
"email": test_user_email | |
}) | |
# Test 10: Verify the user was created | |
logger.info(f"[{operation_id}] Test 10: Verify the user was created") | |
users = db_http.select_records( | |
"users", | |
condition="email = ?", | |
condition_params=[{"type": "text", "value": test_user_email}], | |
limit=1, | |
operation_id=f"{operation_id}_10" | |
) | |
results["operations"].append({ | |
"name": "Verify test user", | |
"success": len(users) > 0, | |
"result": str(users[0]) if users else None | |
}) | |
logger.info(f"[{operation_id}] HTTP API utility test completed successfully") | |
return results | |
except Exception as e: | |
logger.error(f"Error during HTTP API utility test: {str(e)}") | |
return { | |
"error": str(e), | |
"operations": [] | |
} | |
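# The helper coroutines above (direct_register, fix_users_table, test_db_http,
# test_db_http_util) appear here without route decorators. If they need to be
# reachable over HTTP, one option is to register them explicitly; the paths below
# are assumptions, not established routes, and anything that rewrites the users
# table should sit behind admin-only protection:
#
#     app.add_api_route("/api/debug/fix-users-table", fix_users_table, methods=["POST"], tags=["General"])
#     app.add_api_route("/api/debug/test-db-http", test_db_http, methods=["GET"], tags=["General"])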
if __name__ == "__main__": | |
port = int(os.getenv("PORT", 7860)) | |
logger.info(f"Starting server on port {port}") | |
uvicorn.run("main:app", host="0.0.0.0", port=port) | |