Spaces:
Running
Running
File size: 7,880 Bytes
82bb0a3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
#!/usr/bin/env python3
"""
Direct SQL Script for Turso Database
This script uses the HTTP API directly to interact with the Turso database.
It's designed to work around issues with the libsql-experimental driver.
"""
import os
import sys
import json
import requests
import logging
import time
from dotenv import load_dotenv
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("direct-sql")
# Load environment variables
load_dotenv()
# Get Turso database connection details
DATABASE_URL = os.getenv("TURSO_DATABASE_URL")
AUTH_TOKEN = os.getenv("TURSO_AUTH_TOKEN")
if not DATABASE_URL or not AUTH_TOKEN:
logger.error("Missing Turso credentials. TURSO_DATABASE_URL and TURSO_AUTH_TOKEN must be set.")
sys.exit(1)
# Clean the auth token
AUTH_TOKEN = AUTH_TOKEN.strip()
# Convert URL from libsql:// to https://
if DATABASE_URL.startswith("libsql://"):
HTTP_URL = DATABASE_URL.replace("libsql://", "https://")
else:
HTTP_URL = DATABASE_URL
# Ensure the URL doesn't have a trailing slash
HTTP_URL = HTTP_URL.rstrip('/')
# Verify the URL format
if not HTTP_URL.startswith("https://"):
logger.warning(f"HTTP URL does not start with https://: {HTTP_URL}")
# Try to fix the URL
if "://" not in HTTP_URL:
HTTP_URL = f"https://{HTTP_URL}"
logger.info(f"Added https:// prefix to URL: {HTTP_URL}")
logger.info(f"Using HTTP URL: {HTTP_URL}")
def execute_sql(sql, params=None):
"""Execute SQL using the HTTP API directly."""
# Format the request according to the v2/pipeline specification
requests_data = []
# Prepare the statement
stmt = {"sql": sql}
# Add parameters if provided
if params:
# Convert parameters to the expected format
args = []
for param in params:
if param is None:
args.append({"type": "null", "value": None})
elif isinstance(param, int):
args.append({"type": "integer", "value": str(param)})
elif isinstance(param, float):
args.append({"type": "float", "value": str(param)})
else:
args.append({"type": "text", "value": str(param)})
stmt["args"] = args
requests_data.append({"type": "execute", "stmt": stmt})
# Always close the connection at the end
requests_data.append({"type": "close"})
# Prepare the final request payload
data = {"requests": requests_data}
# Use the v2/pipeline endpoint
pipeline_url = f"{HTTP_URL}/v2/pipeline"
logger.info(f"Sending request to: {pipeline_url}")
headers = {
"Authorization": f"Bearer {AUTH_TOKEN}",
"Content-Type": "application/json"
}
try:
response = requests.post(pipeline_url, headers=headers, json=data, timeout=10)
# Log response status
logger.info(f"Response status: {response.status_code}")
# Check for auth errors specifically
if response.status_code == 401:
logger.error(f"Authentication error (401): {response.text}")
return None
# Raise for other errors
response.raise_for_status()
# Parse the response
result = response.json()
# Process the response
if "results" in result and len(result["results"]) > 0:
return result["results"][0]
return None
except requests.exceptions.RequestException as e:
logger.error(f"HTTP request failed: {str(e)}")
return None
def create_test_user():
"""Create a test user with email [email protected]."""
# Generate a unique test ID
test_id = f"test_{int(time.time())}"
logger.info(f"[{test_id}] Starting test user creation")
# First check if the test user already exists
logger.info(f"[{test_id}] Checking if test user already exists")
check_query = "SELECT id FROM users WHERE email = ?"
check_result = execute_sql(check_query, ["[email protected]"])
if check_result and "rows" in check_result and len(check_result["rows"]) > 0:
user_id = check_result["rows"][0]["values"][0]
logger.info(f"[{test_id}] Test user already exists with ID: {user_id}")
return {
"success": True,
"user_id": user_id,
"email": "[email protected]",
"status": "already_exists"
}
# Use a pre-hashed password
logger.info(f"[{test_id}] Using pre-hashed password for test user")
# This is a pre-hashed version of "TestPassword123!" using Argon2
hashed_password = "$argon2id$v=19$m=65536,t=3,p=4$NElQRUZCWDRZSHpIWWRGSA$TYU8R7EfXGgEu9FWZGMX9AVwmMwpSKECCZMXgbzr6JE"
# Insert the test user
logger.info(f"[{test_id}] Inserting test user with email: [email protected]")
insert_query = "INSERT INTO users (email, hashed_password) VALUES (?, ?)"
insert_result = execute_sql(insert_query, ["[email protected]", hashed_password])
if insert_result:
logger.info(f"[{test_id}] Test user inserted successfully")
# Get the last inserted ID
id_query = "SELECT last_insert_rowid()"
id_result = execute_sql(id_query, [])
if id_result and "rows" in id_result and len(id_result["rows"]) > 0:
user_id = id_result["rows"][0]["values"][0]
logger.info(f"[{test_id}] Got user ID: {user_id}")
else:
logger.warning(f"[{test_id}] Could not get user ID")
user_id = None
# Verify the insert
verify_query = "SELECT id, email, created_at FROM users WHERE email = ?"
verify_result = execute_sql(verify_query, ["[email protected]"])
if verify_result and "rows" in verify_result and len(verify_result["rows"]) > 0:
user_data = verify_result["rows"][0]["values"]
user_id = user_data[0]
logger.info(f"[{test_id}] Verified test user with ID: {user_id}")
return {
"success": True,
"user_id": user_id,
"email": "[email protected]",
"created_at": user_data[2] if len(user_data) > 2 else None,
"status": "created"
}
else:
logger.error(f"[{test_id}] Failed to verify test user after insert")
return {
"success": False,
"error": "Failed to verify user after insert"
}
else:
logger.error(f"[{test_id}] Failed to insert test user")
return {
"success": False,
"error": "Failed to insert test user"
}
def list_all_users():
"""List all users in the database."""
# Generate a unique test ID
test_id = f"test_{int(time.time())}"
logger.info(f"[{test_id}] Listing all users")
query = "SELECT id, email, created_at FROM users"
result = execute_sql(query, [])
if result and "rows" in result and len(result["rows"]) > 0:
users = []
for row in result["rows"]:
users.append({
"id": row["values"][0],
"email": row["values"][1],
"created_at": row["values"][2]
})
logger.info(f"[{test_id}] Found {len(users)} users")
return users
else:
logger.info(f"[{test_id}] No users found")
return []
def main():
"""Main function."""
if len(sys.argv) < 2:
print("Usage: python direct_sql.py [create_test_user|list_users]")
sys.exit(1)
command = sys.argv[1]
if command == "create_test_user":
result = create_test_user()
print(json.dumps(result, indent=2))
elif command == "list_users":
users = list_all_users()
print(json.dumps(users, indent=2))
else:
print(f"Unknown command: {command}")
sys.exit(1)
if __name__ == "__main__":
main()
|