auth-server / direct_sql.py
kamau1's picture
Upload 11 files
82bb0a3 verified
#!/usr/bin/env python3
"""
Direct SQL Script for Turso Database
This script uses the HTTP API directly to interact with the Turso database.
It's designed to work around issues with the libsql-experimental driver.
"""
import os
import sys
import json
import requests
import logging
import time
from dotenv import load_dotenv
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("direct-sql")
# Load environment variables
load_dotenv()
# Get Turso database connection details
DATABASE_URL = os.getenv("TURSO_DATABASE_URL")
AUTH_TOKEN = os.getenv("TURSO_AUTH_TOKEN")
if not DATABASE_URL or not AUTH_TOKEN:
logger.error("Missing Turso credentials. TURSO_DATABASE_URL and TURSO_AUTH_TOKEN must be set.")
sys.exit(1)
# Clean the auth token
AUTH_TOKEN = AUTH_TOKEN.strip()
# Convert URL from libsql:// to https://
if DATABASE_URL.startswith("libsql://"):
HTTP_URL = DATABASE_URL.replace("libsql://", "https://")
else:
HTTP_URL = DATABASE_URL
# Ensure the URL doesn't have a trailing slash
HTTP_URL = HTTP_URL.rstrip('/')
# Verify the URL format
if not HTTP_URL.startswith("https://"):
logger.warning(f"HTTP URL does not start with https://: {HTTP_URL}")
# Try to fix the URL
if "://" not in HTTP_URL:
HTTP_URL = f"https://{HTTP_URL}"
logger.info(f"Added https:// prefix to URL: {HTTP_URL}")
logger.info(f"Using HTTP URL: {HTTP_URL}")
def execute_sql(sql, params=None):
"""Execute SQL using the HTTP API directly."""
# Format the request according to the v2/pipeline specification
requests_data = []
# Prepare the statement
stmt = {"sql": sql}
# Add parameters if provided
if params:
# Convert parameters to the expected format
args = []
for param in params:
if param is None:
args.append({"type": "null", "value": None})
elif isinstance(param, int):
args.append({"type": "integer", "value": str(param)})
elif isinstance(param, float):
args.append({"type": "float", "value": str(param)})
else:
args.append({"type": "text", "value": str(param)})
stmt["args"] = args
requests_data.append({"type": "execute", "stmt": stmt})
# Always close the connection at the end
requests_data.append({"type": "close"})
# Prepare the final request payload
data = {"requests": requests_data}
# Use the v2/pipeline endpoint
pipeline_url = f"{HTTP_URL}/v2/pipeline"
logger.info(f"Sending request to: {pipeline_url}")
headers = {
"Authorization": f"Bearer {AUTH_TOKEN}",
"Content-Type": "application/json"
}
try:
response = requests.post(pipeline_url, headers=headers, json=data, timeout=10)
# Log response status
logger.info(f"Response status: {response.status_code}")
# Check for auth errors specifically
if response.status_code == 401:
logger.error(f"Authentication error (401): {response.text}")
return None
# Raise for other errors
response.raise_for_status()
# Parse the response
result = response.json()
# Process the response
if "results" in result and len(result["results"]) > 0:
return result["results"][0]
return None
except requests.exceptions.RequestException as e:
logger.error(f"HTTP request failed: {str(e)}")
return None
def create_test_user():
"""Create a test user with email [email protected]."""
# Generate a unique test ID
test_id = f"test_{int(time.time())}"
logger.info(f"[{test_id}] Starting test user creation")
# First check if the test user already exists
logger.info(f"[{test_id}] Checking if test user already exists")
check_query = "SELECT id FROM users WHERE email = ?"
check_result = execute_sql(check_query, ["[email protected]"])
if check_result and "rows" in check_result and len(check_result["rows"]) > 0:
user_id = check_result["rows"][0]["values"][0]
logger.info(f"[{test_id}] Test user already exists with ID: {user_id}")
return {
"success": True,
"user_id": user_id,
"email": "[email protected]",
"status": "already_exists"
}
# Use a pre-hashed password
logger.info(f"[{test_id}] Using pre-hashed password for test user")
# This is a pre-hashed version of "TestPassword123!" using Argon2
hashed_password = "$argon2id$v=19$m=65536,t=3,p=4$NElQRUZCWDRZSHpIWWRGSA$TYU8R7EfXGgEu9FWZGMX9AVwmMwpSKECCZMXgbzr6JE"
# Insert the test user
logger.info(f"[{test_id}] Inserting test user with email: [email protected]")
insert_query = "INSERT INTO users (email, hashed_password) VALUES (?, ?)"
insert_result = execute_sql(insert_query, ["[email protected]", hashed_password])
if insert_result:
logger.info(f"[{test_id}] Test user inserted successfully")
# Get the last inserted ID
id_query = "SELECT last_insert_rowid()"
id_result = execute_sql(id_query, [])
if id_result and "rows" in id_result and len(id_result["rows"]) > 0:
user_id = id_result["rows"][0]["values"][0]
logger.info(f"[{test_id}] Got user ID: {user_id}")
else:
logger.warning(f"[{test_id}] Could not get user ID")
user_id = None
# Verify the insert
verify_query = "SELECT id, email, created_at FROM users WHERE email = ?"
verify_result = execute_sql(verify_query, ["[email protected]"])
if verify_result and "rows" in verify_result and len(verify_result["rows"]) > 0:
user_data = verify_result["rows"][0]["values"]
user_id = user_data[0]
logger.info(f"[{test_id}] Verified test user with ID: {user_id}")
return {
"success": True,
"user_id": user_id,
"email": "[email protected]",
"created_at": user_data[2] if len(user_data) > 2 else None,
"status": "created"
}
else:
logger.error(f"[{test_id}] Failed to verify test user after insert")
return {
"success": False,
"error": "Failed to verify user after insert"
}
else:
logger.error(f"[{test_id}] Failed to insert test user")
return {
"success": False,
"error": "Failed to insert test user"
}
def list_all_users():
"""List all users in the database."""
# Generate a unique test ID
test_id = f"test_{int(time.time())}"
logger.info(f"[{test_id}] Listing all users")
query = "SELECT id, email, created_at FROM users"
result = execute_sql(query, [])
if result and "rows" in result and len(result["rows"]) > 0:
users = []
for row in result["rows"]:
users.append({
"id": row["values"][0],
"email": row["values"][1],
"created_at": row["values"][2]
})
logger.info(f"[{test_id}] Found {len(users)} users")
return users
else:
logger.info(f"[{test_id}] No users found")
return []
def main():
"""Main function."""
if len(sys.argv) < 2:
print("Usage: python direct_sql.py [create_test_user|list_users]")
sys.exit(1)
command = sys.argv[1]
if command == "create_test_user":
result = create_test_user()
print(json.dumps(result, indent=2))
elif command == "list_users":
users = list_all_users()
print(json.dumps(users, indent=2))
else:
print(f"Unknown command: {command}")
sys.exit(1)
if __name__ == "__main__":
main()