File size: 7,880 Bytes
82bb0a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
#!/usr/bin/env python3
"""
Direct SQL Script for Turso Database

This script uses the HTTP API directly to interact with the Turso database.
It's designed to work around issues with the libsql-experimental driver.
"""

import os
import sys
import json
import requests
import logging
import time
from dotenv import load_dotenv

# Configure logging: every record carries a timestamp, the logger name and
# the level so the script's output can be correlated with server-side logs.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("direct-sql")

# Load environment variables
load_dotenv()

# Get Turso database connection details.
# Both values are stripped *before* validation so that a blank or
# whitespace-only variable is reported as missing, instead of slipping past
# the check and later producing a malformed URL or an empty bearer token.
DATABASE_URL = (os.getenv("TURSO_DATABASE_URL") or "").strip()
AUTH_TOKEN = (os.getenv("TURSO_AUTH_TOKEN") or "").strip()

if not DATABASE_URL or not AUTH_TOKEN:
    logger.error("Missing Turso credentials. TURSO_DATABASE_URL and TURSO_AUTH_TOKEN must be set.")
    sys.exit(1)

# Convert URL from libsql:// to https://.
# Only the leading scheme is swapped; str.replace() would also rewrite any
# later occurrence of "libsql://" inside the URL.
if DATABASE_URL.startswith("libsql://"):
    HTTP_URL = "https://" + DATABASE_URL[len("libsql://"):]
else:
    HTTP_URL = DATABASE_URL

# Ensure the URL doesn't have a trailing slash, because request paths are
# appended to it with a leading "/".
HTTP_URL = HTTP_URL.rstrip('/')

# Verify the URL format
if not HTTP_URL.startswith("https://"):
    logger.warning(f"HTTP URL does not start with https://: {HTTP_URL}")
    # Try to fix the URL: a bare host name gets the https scheme; a URL that
    # already names some other scheme is left as configured.
    if "://" not in HTTP_URL:
        HTTP_URL = f"https://{HTTP_URL}"
        logger.info(f"Added https:// prefix to URL: {HTTP_URL}")

logger.info(f"Using HTTP URL: {HTTP_URL}")

def _encode_sql_arg(param):
    """Encode one Python value as a typed argument object for the pipeline API."""
    if param is None:
        return {"type": "null"}
    if isinstance(param, bool):
        # bool is a subclass of int; str(True) would produce "True", which is
        # not an integer literal, so map it to 1/0 explicitly.
        return {"type": "integer", "value": str(int(param))}
    if isinstance(param, int):
        # Integers are transported as decimal strings.
        return {"type": "integer", "value": str(param)}
    if isinstance(param, float):
        # Floats are transported as JSON numbers, not strings.
        return {"type": "float", "value": param}
    return {"type": "text", "value": str(param)}


def _decode_sql_value(cell):
    """Turn one typed result cell ({"type": ..., "value": ...}) into a plain value."""
    if not isinstance(cell, dict):
        return cell
    kind = cell.get("type")
    if kind == "null":
        return None
    if kind == "integer":
        try:
            return int(cell.get("value"))
        except (TypeError, ValueError):
            # Leave an unparseable integer untouched rather than fail.
            return cell.get("value")
    if kind == "blob":
        # Blobs arrive base64-encoded; hand the encoded text through as-is.
        return cell.get("base64")
    return cell.get("value")


def _flatten_execute_result(entry):
    """Expose the nested statement result of a pipeline entry at top level.

    The returned dict keeps every key of *entry* and additionally carries the
    statement result's fields, with "rows" rewritten as a list of
    {"values": [plain values]} dicts.  An entry that already has a top-level
    "rows" key, or that has no nested result, is returned unchanged.
    """
    if "rows" in entry:
        return entry
    response = entry.get("response")
    inner = response.get("result") if isinstance(response, dict) else None
    if not isinstance(inner, dict):
        return entry

    flat = dict(entry)
    for key, value in inner.items():
        # Never overwrite the entry's own keys (e.g. "type").
        flat.setdefault(key, value)
    flat["rows"] = [
        row if isinstance(row, dict)
        else {"values": [_decode_sql_value(cell) for cell in row]}
        for row in (inner.get("rows") or [])
    ]
    return flat


def execute_sql(sql, params=None):
    """Execute SQL using the HTTP API directly.

    Sends a single statement to the ``/v2/pipeline`` endpoint, followed by a
    ``close`` request.

    Args:
        sql: The SQL text; positional placeholders are bound from ``params``.
        params: Optional iterable of positional arguments.  ``None`` is sent
            as null, ``bool``/``int`` as integer, ``float`` as float, and
            anything else as text via ``str()``.

    Returns:
        A dict describing the first pipeline result, or ``None`` when the
        request fails, authentication is rejected, the body is not JSON, the
        response holds no results, or the statement itself reported an error.
        When the entry nests a statement result, that result's fields are also
        available at top level, with ``rows`` as a list of
        ``{"values": [...]}`` dicts of plain Python values.
    """
    # Format the request according to the v2/pipeline specification
    stmt = {"sql": sql}
    if params:
        stmt["args"] = [_encode_sql_arg(param) for param in params]

    # Always close the connection at the end
    data = {"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]}

    # Use the v2/pipeline endpoint
    pipeline_url = f"{HTTP_URL}/v2/pipeline"
    logger.info(f"Sending request to: {pipeline_url}")

    headers = {
        "Authorization": f"Bearer {AUTH_TOKEN}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(pipeline_url, headers=headers, json=data, timeout=10)

        # Log response status
        logger.info(f"Response status: {response.status_code}")

        # Check for auth errors specifically
        if response.status_code == 401:
            logger.error(f"Authentication error (401): {response.text}")
            return None

        # Raise for other errors
        response.raise_for_status()

        # Parse the response
        result = response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP request failed: {str(e)}")
        return None
    except ValueError as e:
        # Older versions of requests raise a plain ValueError for a non-JSON body.
        logger.error(f"Response body is not valid JSON: {str(e)}")
        return None

    # Process the response
    results = result.get("results") if isinstance(result, dict) else None
    if not results:
        return None

    first = results[0]
    if not isinstance(first, dict):
        return first

    # A statement failure is reported inside an HTTP 200 response; it must not
    # be handed to callers as a truthy "success" value.
    if first.get("type") == "error":
        error = first.get("error")
        message = error.get("message", error) if isinstance(error, dict) else error
        logger.error(f"SQL execution failed: {message}")
        return None

    return _flatten_execute_result(first)

def _first_row_values(result):
    """Return the "values" list of the first row of *result*, or None if it has no rows."""
    if not result or "rows" not in result or len(result["rows"]) == 0:
        return None
    return result["rows"][0]["values"]


# NOTE(review): the default address below is reproduced verbatim from the
# original code; it looks like an obfuscated placeholder -- confirm the
# intended test address.
def create_test_user(email="[email protected]"):
    """Create the test user unless it already exists, and report the outcome.

    Args:
        email: Address of the test user to look up or insert.

    Returns:
        A dict with ``"success"``.  On success it also carries ``"user_id"``,
        ``"email"`` and ``"status"`` (``"already_exists"`` or ``"created"``;
        the latter additionally has ``"created_at"``).  On failure it carries
        ``"error"`` with a short description.
    """
    # Generate a unique test ID to tag every log line of this run
    test_id = f"test_{int(time.time())}"
    logger.info(f"[{test_id}] Starting test user creation")

    # First check if the test user already exists
    logger.info(f"[{test_id}] Checking if test user already exists")
    existing = _first_row_values(
        execute_sql("SELECT id FROM users WHERE email = ?", [email])
    )
    if existing is not None:
        user_id = existing[0]
        logger.info(f"[{test_id}] Test user already exists with ID: {user_id}")
        return {
            "success": True,
            "user_id": user_id,
            "email": email,
            "status": "already_exists"
        }

    # Use a pre-hashed password
    logger.info(f"[{test_id}] Using pre-hashed password for test user")
    # This is a pre-hashed version of "TestPassword123!" using Argon2
    hashed_password = "$argon2id$v=19$m=65536,t=3,p=4$NElQRUZCWDRZSHpIWWRGSA$TYU8R7EfXGgEu9FWZGMX9AVwmMwpSKECCZMXgbzr6JE"

    # Insert the test user
    logger.info(f"[{test_id}] Inserting test user with email: {email}")
    insert_result = execute_sql(
        "INSERT INTO users (email, hashed_password) VALUES (?, ?)",
        [email, hashed_password],
    )
    if not insert_result:
        logger.error(f"[{test_id}] Failed to insert test user")
        return {
            "success": False,
            "error": "Failed to insert test user"
        }
    logger.info(f"[{test_id}] Test user inserted successfully")

    # Verify the insert by reading the row back.  This is also where the new
    # ID comes from: last_insert_rowid() is scoped to a database connection,
    # and every execute_sql() call closes its connection, so asking for it in
    # a separate call cannot report the row inserted above.
    user_data = _first_row_values(
        execute_sql("SELECT id, email, created_at FROM users WHERE email = ?", [email])
    )
    if user_data is None:
        logger.error(f"[{test_id}] Failed to verify test user after insert")
        return {
            "success": False,
            "error": "Failed to verify user after insert"
        }

    user_id = user_data[0]
    logger.info(f"[{test_id}] Verified test user with ID: {user_id}")
    return {
        "success": True,
        "user_id": user_id,
        "email": email,
        "created_at": user_data[2] if len(user_data) > 2 else None,
        "status": "created"
    }

def list_all_users():
    """Fetch every row of the users table.

    Returns:
        A list of dicts with the keys "id", "email" and "created_at", one per
        row; an empty list when the query yields no rows or no result.
    """
    # Tag this run's log lines with a time-based ID
    run_tag = f"test_{int(time.time())}"
    logger.info(f"[{run_tag}] Listing all users")

    response = execute_sql("SELECT id, email, created_at FROM users", [])
    rows = response["rows"] if response and "rows" in response else []

    # Guard clause: nothing came back
    if len(rows) == 0:
        logger.info(f"[{run_tag}] No users found")
        return []

    found = [
        {
            "id": entry["values"][0],
            "email": entry["values"][1],
            "created_at": entry["values"][2]
        }
        for entry in rows
    ]
    logger.info(f"[{run_tag}] Found {len(found)} users")
    return found

def main():
    """Command-line entry point.

    Reads the command from the first argument, runs it and prints the outcome
    as indented JSON.  Exits with status 1 when the command is missing or not
    recognised.
    """
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: python direct_sql.py [create_test_user|list_users]")
        sys.exit(1)

    command = argv[1]

    # Reject anything that is not a known command before doing any work
    if command not in ("create_test_user", "list_users"):
        print(f"Unknown command: {command}")
        sys.exit(1)

    if command == "create_test_user":
        payload = create_test_user()
    else:
        payload = list_all_users()
    print(json.dumps(payload, indent=2))

if __name__ == "__main__":
    main()