#!/usr/bin/env python3
"""
Direct SQL Script for Turso Database

This script uses the HTTP API directly to interact with the Turso database.
It's designed to work around issues with the libsql-experimental driver.

Usage:
    python direct_sql.py [create_test_user|list_users]

Environment (read via python-dotenv / os.environ):
    TURSO_DATABASE_URL  Database URL (libsql:// is rewritten to https://).
    TURSO_AUTH_TOKEN    Bearer token sent with every request.
"""

import base64
import json
import logging
import os
import sys
import time

import requests
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("direct-sql")

# Load environment variables
load_dotenv()

# Get Turso database connection details
DATABASE_URL = os.getenv("TURSO_DATABASE_URL")
AUTH_TOKEN = os.getenv("TURSO_AUTH_TOKEN")

if not DATABASE_URL or not AUTH_TOKEN:
    logger.error("Missing Turso credentials. TURSO_DATABASE_URL and TURSO_AUTH_TOKEN must be set.")
    sys.exit(1)

# Clean the auth token
AUTH_TOKEN = AUTH_TOKEN.strip()

# Convert URL from libsql:// to https:// (only the leading scheme is rewritten)
if DATABASE_URL.startswith("libsql://"):
    HTTP_URL = DATABASE_URL.replace("libsql://", "https://", 1)
else:
    HTTP_URL = DATABASE_URL

# Ensure the URL doesn't have a trailing slash
HTTP_URL = HTTP_URL.rstrip('/')

# Verify the URL format
if not HTTP_URL.startswith("https://"):
    logger.warning(f"HTTP URL does not start with https://: {HTTP_URL}")
    # Try to fix the URL
    if "://" not in HTTP_URL:
        HTTP_URL = f"https://{HTTP_URL}"
        logger.info(f"Added https:// prefix to URL: {HTTP_URL}")

logger.info(f"Using HTTP URL: {HTTP_URL}")

# Email address of the fixture account managed by create_test_user().
TEST_USER_EMAIL = "test@seamo.earth"

# Seconds to wait for the HTTP API before giving up on a request.
REQUEST_TIMEOUT_SECONDS = 10


def _encode_param(param):
    """Convert one Python value into the typed value object sent as a statement argument."""
    if param is None:
        return {"type": "null"}
    # bool is a subclass of int: str(True) would yield "True", which is not
    # a valid integer literal, so booleans are mapped to 1/0 explicitly.
    if isinstance(param, bool):
        return {"type": "integer", "value": str(int(param))}
    if isinstance(param, int):
        # Integers travel as strings so 64-bit values survive JSON.
        return {"type": "integer", "value": str(param)}
    if isinstance(param, float):
        # Floats are sent as JSON numbers, not strings.
        return {"type": "float", "value": param}
    if isinstance(param, (bytes, bytearray)):
        return {"type": "blob", "base64": base64.b64encode(param).decode("ascii")}
    return {"type": "text", "value": str(param)}


def _decode_value(cell):
    """Turn one result cell into a plain Python value.

    Typed cells (``{"type": ..., "value": ...}``) are unwrapped; integer
    cells are converted from their string form to ``int``.  Blob cells are
    returned as their base64 string so results stay JSON-serializable.
    Anything that is not a dict is returned unchanged.
    """
    if not isinstance(cell, dict):
        return cell
    kind = cell.get("type")
    if kind == "integer":
        raw = cell.get("value")
        try:
            return int(raw)
        except (TypeError, ValueError):
            return raw
    if kind == "blob":
        return cell.get("base64")
    # "null" cells carry no value key, so this yields None for them.
    return cell.get("value")


def _result_rows(result):
    """Return the rows of an execute_sql() result as lists of plain values.

    Accepts both the nested pipeline shape
    (``result["response"]["result"]["rows"]`` holding lists of typed cells)
    and the flat shape (``result["rows"]`` holding ``{"values": [...]}``).
    Returns an empty list when ``result`` is falsy or has no rows.
    """
    if not isinstance(result, dict):
        return []
    payload = result
    response = result.get("response")
    if isinstance(response, dict) and isinstance(response.get("result"), dict):
        payload = response["result"]
    decoded = []
    for row in payload.get("rows") or []:
        cells = row.get("values", []) if isinstance(row, dict) else row
        decoded.append([_decode_value(cell) for cell in cells])
    return decoded


def execute_sql(sql, params=None):
    """Execute one SQL statement through the v2/pipeline HTTP endpoint.

    Args:
        sql: SQL text, using ``?`` placeholders for positional parameters.
        params: Optional sequence of positional parameters.  ``None``,
            ``bool``, ``int``, ``float`` and ``bytes`` are sent with their
            native type; everything else is sent as text via ``str()``.

    Returns:
        The first entry of the response's ``results`` list, or ``None`` when
        the request fails, authentication is rejected, the body is not valid
        JSON, no result is returned, or the statement itself is reported as
        an error by the server.
    """
    stmt = {"sql": sql}
    if params:
        stmt["args"] = [_encode_param(param) for param in params]

    # One execute request followed by a close, so the connection is always
    # released at the end of the pipeline.
    data = {
        "requests": [
            {"type": "execute", "stmt": stmt},
            {"type": "close"},
        ]
    }

    pipeline_url = f"{HTTP_URL}/v2/pipeline"
    logger.info(f"Sending request to: {pipeline_url}")

    headers = {
        "Authorization": f"Bearer {AUTH_TOKEN}",
        "Content-Type": "application/json",
    }

    try:
        response = requests.post(
            pipeline_url,
            headers=headers,
            json=data,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        logger.info(f"Response status: {response.status_code}")

        # Check for auth errors specifically
        if response.status_code == 401:
            logger.error(f"Authentication error (401): {response.text}")
            return None

        # Raise for other errors
        response.raise_for_status()
        result = response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP request failed: {str(e)}")
        return None
    except ValueError as e:
        # Older requests versions raise a plain ValueError for invalid JSON.
        logger.error(f"Could not decode JSON response: {e}")
        return None

    results = result.get("results") if isinstance(result, dict) else None
    if not results:
        return None

    first = results[0]
    # A statement-level failure arrives with HTTP 200 and an "error" entry;
    # it must not be mistaken for a successful result.
    if isinstance(first, dict) and first.get("type") == "error":
        error = first.get("error")
        message = error.get("message", error) if isinstance(error, dict) else error
        logger.error(f"SQL execution failed: {message}")
        return None
    return first


def create_test_user():
    """Create a test user with email test@seamo.earth.

    Returns:
        A dict with ``success`` set to True plus ``user_id``, ``email`` and
        ``status`` (``"already_exists"`` or ``"created"``; ``created_at`` is
        included for newly created users), or a dict with ``success`` set to
        False and an ``error`` message.
    """
    # Generate a unique test ID
    test_id = f"test_{int(time.time())}"
    logger.info(f"[{test_id}] Starting test user creation")

    # First check if the test user already exists
    logger.info(f"[{test_id}] Checking if test user already exists")
    check_query = "SELECT id FROM users WHERE email = ?"
    existing = _result_rows(execute_sql(check_query, [TEST_USER_EMAIL]))
    if existing and existing[0]:
        user_id = existing[0][0]
        logger.info(f"[{test_id}] Test user already exists with ID: {user_id}")
        return {
            "success": True,
            "user_id": user_id,
            "email": TEST_USER_EMAIL,
            "status": "already_exists",
        }

    # Use a pre-hashed password
    logger.info(f"[{test_id}] Using pre-hashed password for test user")
    # This is a pre-hashed version of "TestPassword123!" using Argon2
    hashed_password = "$argon2id$v=19$m=65536,t=3,p=4$NElQRUZCWDRZSHpIWWRGSA$TYU8R7EfXGgEu9FWZGMX9AVwmMwpSKECCZMXgbzr6JE"

    # Insert the test user
    logger.info(f"[{test_id}] Inserting test user with email: {TEST_USER_EMAIL}")
    insert_query = "INSERT INTO users (email, hashed_password) VALUES (?, ?)"
    insert_result = execute_sql(insert_query, [TEST_USER_EMAIL, hashed_password])
    if not insert_result:
        logger.error(f"[{test_id}] Failed to insert test user")
        return {
            "success": False,
            "error": "Failed to insert test user",
        }
    logger.info(f"[{test_id}] Test user inserted successfully")

    # Every execute_sql() call closes its connection, so a follow-up
    # "SELECT last_insert_rowid()" would run on a different connection than
    # the INSERT.  The ID is read back by email instead.
    verify_query = "SELECT id, email, created_at FROM users WHERE email = ?"
    verified = _result_rows(execute_sql(verify_query, [TEST_USER_EMAIL]))
    if not verified or not verified[0]:
        logger.error(f"[{test_id}] Failed to verify test user after insert")
        return {
            "success": False,
            "error": "Failed to verify user after insert",
        }

    user_data = verified[0]
    user_id = user_data[0]
    logger.info(f"[{test_id}] Verified test user with ID: {user_id}")
    return {
        "success": True,
        "user_id": user_id,
        "email": TEST_USER_EMAIL,
        "created_at": user_data[2] if len(user_data) > 2 else None,
        "status": "created",
    }


def list_all_users():
    """List all users in the database.

    Returns:
        A list of dicts with ``id``, ``email`` and ``created_at`` keys; an
        empty list when there are no users or the query fails.
    """
    # Generate a unique test ID
    test_id = f"test_{int(time.time())}"
    logger.info(f"[{test_id}] Listing all users")

    query = "SELECT id, email, created_at FROM users"
    result = execute_sql(query, [])
    if result is None:
        # Keep the empty-list return for callers, but do not report a failed
        # query as an empty table.
        logger.error(f"[{test_id}] Failed to list users")
        return []

    rows = _result_rows(result)
    if not rows:
        logger.info(f"[{test_id}] No users found")
        return []

    users = [
        {"id": row[0], "email": row[1], "created_at": row[2]}
        for row in rows
    ]
    logger.info(f"[{test_id}] Found {len(users)} users")
    return users


def main():
    """Dispatch the command given on the command line and print its result as JSON."""
    if len(sys.argv) < 2:
        print("Usage: python direct_sql.py [create_test_user|list_users]")
        sys.exit(1)

    command = sys.argv[1]

    if command == "create_test_user":
        result = create_test_user()
        print(json.dumps(result, indent=2))
    elif command == "list_users":
        users = list_all_users()
        print(json.dumps(users, indent=2))
    else:
        print(f"Unknown command: {command}")
        sys.exit(1)


if __name__ == "__main__":
    main()