Spaces:
Running
Running
#!/usr/bin/env python3 | |
""" | |
Direct SQL Script for Turso Database | |
This script uses the HTTP API directly to interact with the Turso database. | |
It's designed to work around issues with the libsql-experimental driver. | |
""" | |
import os | |
import sys | |
import json | |
import requests | |
import logging | |
import time | |
from dotenv import load_dotenv | |
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("direct-sql")

# Load environment variables
load_dotenv()

# Get Turso database connection details.
# Both values are stripped *before* they are validated, so a value that is
# only whitespace (e.g. a stray newline in a secrets file) is reported as
# missing instead of slipping through as an empty bearer token or a URL with
# trailing whitespace.
DATABASE_URL = (os.getenv("TURSO_DATABASE_URL") or "").strip()
AUTH_TOKEN = (os.getenv("TURSO_AUTH_TOKEN") or "").strip()

if not DATABASE_URL or not AUTH_TOKEN:
    logger.error("Missing Turso credentials. TURSO_DATABASE_URL and TURSO_AUTH_TOKEN must be set.")
    sys.exit(1)

# Convert URL from libsql:// to https://
# Only the leading scheme is rewritten; the remainder of the URL is kept as-is.
if DATABASE_URL.startswith("libsql://"):
    HTTP_URL = "https://" + DATABASE_URL[len("libsql://"):]
else:
    HTTP_URL = DATABASE_URL

# Ensure the URL doesn't have a trailing slash, because request paths are
# appended to it with a leading "/"
HTTP_URL = HTTP_URL.rstrip('/')

# Verify the URL format
if not HTTP_URL.startswith("https://"):
    logger.warning(f"HTTP URL does not start with https://: {HTTP_URL}")
    # Try to fix the URL: a bare host name gets an https:// prefix, while a URL
    # that already carries some other scheme is left untouched.
    if "://" not in HTTP_URL:
        HTTP_URL = f"https://{HTTP_URL}"
        logger.info(f"Added https:// prefix to URL: {HTTP_URL}")

logger.info(f"Using HTTP URL: {HTTP_URL}")
def _encode_arg(param):
    """Convert one Python value into a typed value object for the pipeline request."""
    if param is None:
        return {"type": "null"}
    if isinstance(param, bool):
        # bool is a subclass of int, and str(True) is "True", which is not a
        # valid integer literal. Send 1/0 instead.
        return {"type": "integer", "value": str(int(param))}
    if isinstance(param, int):
        # Integers are transmitted as decimal strings.
        return {"type": "integer", "value": str(param)}
    if isinstance(param, float):
        # Floats are transmitted as JSON numbers, not as strings.
        return {"type": "float", "value": param}
    # Everything else is stringified and sent as text.
    return {"type": "text", "value": str(param)}


def _decode_value(value):
    """Convert one typed value object from a response row into a plain Python value."""
    if not isinstance(value, dict):
        return value
    kind = value.get("type")
    raw = value.get("value")
    if kind == "null":
        return None
    if kind == "integer":
        try:
            return int(raw)
        except (TypeError, ValueError):
            return raw
    if kind in ("float", "text"):
        return raw
    # Unknown or binary kinds are handed back untouched.
    return value


def _normalize_result(entry):
    """Flatten one pipeline result entry into the shape the callers in this file read."""
    if not isinstance(entry, dict):
        return entry

    if entry.get("type") == "error":
        # A failed statement still arrives with HTTP 200; report it as a failure
        # instead of returning a truthy dict that looks like success.
        error = entry.get("error")
        message = error.get("message") if isinstance(error, dict) else error
        logger.error(f"SQL execution failed: {message}")
        return None

    response = entry.get("response")
    stmt_result = response.get("result") if isinstance(response, dict) else None
    if not isinstance(stmt_result, dict):
        return entry

    # Keep the original envelope keys (they win on any name clash) and surface
    # the statement result's fields at the top level.
    normalized = {**stmt_result, **entry}
    # Callers index rows as row["values"][i], so wrap each decoded row accordingly.
    normalized["rows"] = [
        {"values": [_decode_value(cell) for cell in row]}
        for row in (stmt_result.get("rows") or [])
    ]
    return normalized


def execute_sql(sql, params=None):
    """Execute a single SQL statement through the HTTP ``/v2/pipeline`` endpoint.

    Each call sends one ``execute`` request followed by a ``close`` request, so
    no connection state is carried over from one call to the next.

    Args:
        sql: The SQL text to run, using ``?`` placeholders for parameters.
        params: Optional sequence of positional parameters. ``None``, ``bool``,
            ``int`` and ``float`` are sent with their matching types; any other
            value is converted with ``str()`` and sent as text.

    Returns:
        A dict for the executed statement, or ``None`` when the request fails
        (network/HTTP error, 401, unparsable body), the statement itself
        reports an error, or the response contains no results. On success the
        dict contains the original ``type``/``response`` envelope keys plus the
        statement result fields; ``rows`` is a list of ``{"values": [...]}``
        dicts holding plain Python values.
    """
    # Prepare the statement
    stmt = {"sql": sql}

    # Add parameters if provided
    if params:
        stmt["args"] = [_encode_arg(param) for param in params]

    # Format the request according to the v2/pipeline specification and always
    # close the connection at the end
    data = {
        "requests": [
            {"type": "execute", "stmt": stmt},
            {"type": "close"},
        ]
    }

    # Use the v2/pipeline endpoint
    pipeline_url = f"{HTTP_URL}/v2/pipeline"
    logger.info(f"Sending request to: {pipeline_url}")

    headers = {
        "Authorization": f"Bearer {AUTH_TOKEN}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(pipeline_url, headers=headers, json=data, timeout=10)

        # Log response status
        logger.info(f"Response status: {response.status_code}")

        # Check for auth errors specifically
        if response.status_code == 401:
            logger.error(f"Authentication error (401): {response.text}")
            return None

        # Raise for other errors
        response.raise_for_status()

        # Parse the response
        payload = response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP request failed: {str(e)}")
        return None
    except ValueError as e:
        # Some versions of requests raise a plain ValueError for a non-JSON body.
        logger.error(f"Could not parse response as JSON: {str(e)}")
        return None

    # Process the response: only the first entry (the execute request) matters;
    # the second entry belongs to the close request.
    results = payload.get("results") if isinstance(payload, dict) else None
    if not results:
        return None
    return _normalize_result(results[0])
def create_test_user():
    """Create the test user, unless a user with the test email already exists.

    The function looks the email up first, inserts a row with a fixed
    pre-hashed password when it is absent, and then reads the row back to
    confirm the insert and obtain its id.

    Returns:
        A dict with ``"success": True`` plus ``user_id``, ``email`` and
        ``status`` (``"already_exists"`` or ``"created"``; ``created_at`` is
        included for a newly created user), or a dict with
        ``"success": False`` and an ``error`` message when the insert or the
        follow-up verification fails.
    """
    email = "[email protected]"

    # Generate a unique test ID (used only to correlate log lines)
    test_id = f"test_{int(time.time())}"
    logger.info(f"[{test_id}] Starting test user creation")

    # First check if the test user already exists
    logger.info(f"[{test_id}] Checking if test user already exists")
    check_result = execute_sql("SELECT id FROM users WHERE email = ?", [email])
    if check_result and "rows" in check_result and len(check_result["rows"]) > 0:
        user_id = check_result["rows"][0]["values"][0]
        logger.info(f"[{test_id}] Test user already exists with ID: {user_id}")
        return {
            "success": True,
            "user_id": user_id,
            "email": email,
            "status": "already_exists"
        }

    # Use a pre-hashed password
    logger.info(f"[{test_id}] Using pre-hashed password for test user")
    # This is a pre-hashed version of "TestPassword123!" using Argon2
    hashed_password = "$argon2id$v=19$m=65536,t=3,p=4$NElQRUZCWDRZSHpIWWRGSA$TYU8R7EfXGgEu9FWZGMX9AVwmMwpSKECCZMXgbzr6JE"

    # Insert the test user
    logger.info(f"[{test_id}] Inserting test user with email: {email}")
    insert_result = execute_sql(
        "INSERT INTO users (email, hashed_password) VALUES (?, ?)",
        [email, hashed_password],
    )

    # A missing result, or a result entry explicitly marked as an error, both
    # mean the row was not written.
    insert_failed = not insert_result or (
        isinstance(insert_result, dict) and insert_result.get("type") == "error"
    )
    if insert_failed:
        logger.error(f"[{test_id}] Failed to insert test user")
        return {
            "success": False,
            "error": "Failed to insert test user"
        }
    logger.info(f"[{test_id}] Test user inserted successfully")

    # No separate "SELECT last_insert_rowid()" is issued here: every
    # execute_sql call closes its connection when it finishes, and
    # last_insert_rowid() is tracked per connection, so a follow-up request
    # cannot see the id of the row inserted above. The id is read from the
    # verification query instead.
    verify_result = execute_sql(
        "SELECT id, email, created_at FROM users WHERE email = ?",
        [email],
    )
    if not (verify_result and "rows" in verify_result and len(verify_result["rows"]) > 0):
        logger.error(f"[{test_id}] Failed to verify test user after insert")
        return {
            "success": False,
            "error": "Failed to verify user after insert"
        }

    user_data = verify_result["rows"][0]["values"]
    user_id = user_data[0]
    logger.info(f"[{test_id}] Verified test user with ID: {user_id}")
    return {
        "success": True,
        "user_id": user_id,
        "email": email,
        "created_at": user_data[2] if len(user_data) > 2 else None,
        "status": "created"
    }
def list_all_users():
    """Return every row of the ``users`` table as a list of dicts.

    Each dict has the keys ``id``, ``email`` and ``created_at``. An empty list
    is returned when the query yields no rows or no usable result.
    """
    # Timestamp-based tag used to correlate the log lines of this call
    run_tag = f"test_{int(time.time())}"
    logger.info(f"[{run_tag}] Listing all users")

    outcome = execute_sql("SELECT id, email, created_at FROM users", [])

    # Guard clause: nothing usable came back
    if not outcome or "rows" not in outcome or len(outcome["rows"]) == 0:
        logger.info(f"[{run_tag}] No users found")
        return []

    found = [
        {
            "id": record["values"][0],
            "email": record["values"][1],
            "created_at": record["values"][2],
        }
        for record in outcome["rows"]
    ]
    logger.info(f"[{run_tag}] Found {len(found)} users")
    return found
def main():
    """Command-line entry point.

    Reads the command from the first command-line argument, runs the matching
    action and prints its result as indented JSON. Prints a message and exits
    with status 1 when the command is missing or unknown.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python direct_sql.py [create_test_user|list_users]")
        sys.exit(1)

    command = cli_args[0]
    if command not in ("create_test_user", "list_users"):
        print(f"Unknown command: {command}")
        sys.exit(1)

    # Exactly two commands remain at this point, so pick the matching action
    action = create_test_user if command == "create_test_user" else list_all_users
    print(json.dumps(action(), indent=2))


if __name__ == "__main__":
    main()