Spaces:
Running
Running
""" | |
Database utility module for HTTP API operations with Turso database. | |
This module provides functions for interacting with the Turso database using the HTTP API. | |
""" | |
import os | |
import logging | |
import time | |
import requests as req | |
from typing import List, Dict, Any, Optional, Tuple | |
# Configure logging | |
logger = logging.getLogger("auth-server") | |
def get_http_url() -> Tuple[str, str]: | |
""" | |
Get the HTTP URL and auth token for the Turso database. | |
Returns: | |
Tuple[str, str]: The HTTP URL and auth token. | |
""" | |
# Extract the database URL and auth token | |
db_url = os.getenv("TURSO_DATABASE_URL", "") | |
auth_token = os.getenv("TURSO_AUTH_TOKEN", "") | |
# Convert URL from libsql:// to https:// | |
if db_url.startswith("libsql://"): | |
http_url = db_url.replace("libsql://", "https://") | |
else: | |
http_url = db_url | |
# Ensure the URL doesn't have a trailing slash | |
http_url = http_url.rstrip('/') | |
return http_url, auth_token | |
def get_headers(auth_token: str) -> Dict[str, str]: | |
""" | |
Get the headers for the HTTP request. | |
Args: | |
auth_token (str): The authentication token. | |
Returns: | |
Dict[str, str]: The headers for the HTTP request. | |
""" | |
return { | |
"Authorization": f"Bearer {auth_token}", | |
"Content-Type": "application/json" | |
} | |
def execute_query(sql: str, params: List[Dict[str, Any]] = None, operation_id: str = None) -> Dict[str, Any]: | |
""" | |
Execute a SQL query using the HTTP API. | |
Args: | |
sql (str): The SQL query to execute. | |
params (List[Dict[str, Any]], optional): The parameters for the query. Defaults to None. | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
Dict[str, Any]: The response from the database. | |
""" | |
if operation_id is None: | |
operation_id = f"query_{int(time.time())}" | |
# Only log at debug level for routine operations | |
logger.debug(f"[{operation_id}] Executing query: {sql}") | |
http_url, auth_token = get_http_url() | |
headers = get_headers(auth_token) | |
# Prepare the query | |
query = { | |
"requests": [ | |
{ | |
"type": "execute", | |
"stmt": { | |
"sql": sql, | |
"args": params or [] | |
} | |
}, | |
{"type": "close"} | |
] | |
} | |
# Send the request | |
try: | |
response = req.post(f"{http_url}/v2/pipeline", headers=headers, json=query) | |
response.raise_for_status() | |
result = response.json() | |
logger.debug(f"[{operation_id}] Query executed successfully") | |
return result | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error executing query: {str(e)}") | |
raise | |
def execute_pipeline(pipeline_requests: List[Dict[str, Any]], operation_id: str = None) -> Dict[str, Any]: | |
""" | |
Execute a pipeline of SQL queries using the HTTP API. | |
Args: | |
pipeline_requests (List[Dict[str, Any]]): The requests to execute. | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
Dict[str, Any]: The response from the database. | |
""" | |
if operation_id is None: | |
operation_id = f"pipeline_{int(time.time())}" | |
logger.debug(f"[{operation_id}] Executing pipeline with {len(pipeline_requests)} requests") | |
http_url, auth_token = get_http_url() | |
headers = get_headers(auth_token) | |
# Prepare the pipeline | |
pipeline = { | |
"requests": pipeline_requests + [{"type": "close"}] | |
} | |
# Send the request | |
try: | |
response = req.post(f"{http_url}/v2/pipeline", headers=headers, json=pipeline) | |
response.raise_for_status() | |
result = response.json() | |
logger.debug(f"[{operation_id}] Pipeline executed successfully") | |
return result | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error executing pipeline: {str(e)}") | |
raise | |
def insert_record(table: str, data: Dict[str, Any], operation_id: str = None) -> Optional[int]: | |
""" | |
Insert a record into a table. | |
Args: | |
table (str): The table to insert into. | |
data (Dict[str, Any]): The data to insert. | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
Optional[int]: The ID of the inserted record, or None if the insert failed. | |
""" | |
if operation_id is None: | |
operation_id = f"insert_{int(time.time())}" | |
logger.debug(f"[{operation_id}] Inserting record into {table}") | |
# Prepare the SQL and parameters | |
columns = list(data.keys()) | |
placeholders = ["?"] * len(columns) | |
sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({', '.join(placeholders)})" | |
params = [] | |
for key, value in data.items(): | |
if isinstance(value, str): | |
params.append({"type": "text", "value": value}) | |
elif isinstance(value, int): | |
params.append({"type": "integer", "value": str(value)}) | |
elif isinstance(value, float): | |
params.append({"type": "float", "value": str(value)}) | |
elif value is None: | |
params.append({"type": "null", "value": None}) | |
else: | |
params.append({"type": "text", "value": str(value)}) | |
# Execute the query directly instead of using pipeline | |
try: | |
# Direct query execution is more reliable | |
result = execute_query(sql, params, operation_id) | |
# Check for errors | |
if "results" in result and len(result["results"]) > 0: | |
if result["results"][0]["type"] == "error": | |
error_msg = result["results"][0]["error"]["message"] | |
logger.error(f"[{operation_id}] Insert error: {error_msg}") | |
return None | |
# Try to get the last inserted ID with a separate query | |
try: | |
id_result = execute_query("SELECT last_insert_rowid()", operation_id=f"{operation_id}_get_id") | |
last_id = None | |
if "results" in id_result and len(id_result["results"]) > 0: | |
if id_result["results"][0]["type"] == "ok": | |
response = id_result["results"][0]["response"] | |
if "result" in response and "rows" in response["result"] and len(response["result"]["rows"]) > 0: | |
row = response["result"]["rows"][0] | |
# Get the value safely | |
if len(row) > 0: | |
cell = row[0] | |
if isinstance(cell, dict) and "value" in cell: | |
try: | |
last_id = int(cell["value"]) | |
logger.info(f"[{operation_id}] Record inserted with ID: {last_id}") | |
return last_id | |
except (ValueError, TypeError) as e: | |
logger.warning(f"[{operation_id}] Error converting ID to integer: {str(e)}") | |
# Try to return the value as is | |
return cell["value"] | |
else: | |
logger.warning(f"[{operation_id}] Unexpected cell format: {cell}") | |
else: | |
logger.warning(f"[{operation_id}] Empty row in last_insert_rowid result") | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error getting last insert ID: {str(e)}") | |
# If we can't get the ID, try to find it by other means | |
logger.warning(f"[{operation_id}] Insert succeeded but couldn't get ID") | |
return None | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error inserting record: {str(e)}") | |
return None | |
def update_record(table: str, data: Dict[str, Any], condition: str, condition_params: List[Dict[str, Any]], operation_id: str = None) -> bool: | |
""" | |
Update a record in a table. | |
Args: | |
table (str): The table to update. | |
data (Dict[str, Any]): The data to update. | |
condition (str): The condition for the update (e.g., "id = ?"). | |
condition_params (List[Dict[str, Any]]): The parameters for the condition. | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
bool: True if the update succeeded, False otherwise. | |
""" | |
if operation_id is None: | |
operation_id = f"update_{int(time.time())}" | |
logger.debug(f"[{operation_id}] Updating record in {table}") | |
# Prepare the SQL and parameters | |
set_clauses = [f"{key} = ?" for key in data.keys()] | |
sql = f"UPDATE {table} SET {', '.join(set_clauses)} WHERE {condition}" | |
params = [] | |
for key, value in data.items(): | |
if isinstance(value, str): | |
params.append({"type": "text", "value": value}) | |
elif isinstance(value, int): | |
params.append({"type": "integer", "value": str(value)}) | |
elif isinstance(value, float): | |
params.append({"type": "float", "value": str(value)}) | |
elif value is None: | |
params.append({"type": "null", "value": None}) | |
else: | |
params.append({"type": "text", "value": str(value)}) | |
# Add condition parameters | |
params.extend(condition_params) | |
try: | |
result = execute_query(sql, params, operation_id) | |
# Check for errors | |
if "results" in result and len(result["results"]) > 0: | |
if result["results"][0]["type"] == "error": | |
error_msg = result["results"][0]["error"]["message"] | |
logger.error(f"[{operation_id}] Update error: {error_msg}") | |
return False | |
# Check affected rows | |
affected_rows = result["results"][0]["response"]["result"]["affected_row_count"] | |
logger.debug(f"[{operation_id}] Updated {affected_rows} rows") | |
return affected_rows > 0 | |
return False | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error updating record: {str(e)}") | |
return False | |
def delete_record(table: str, condition: str, condition_params: List[Dict[str, Any]], operation_id: str = None) -> bool: | |
""" | |
Delete a record from a table. | |
Args: | |
table (str): The table to delete from. | |
condition (str): The condition for the delete (e.g., "id = ?"). | |
condition_params (List[Dict[str, Any]]): The parameters for the condition. | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
bool: True if the delete succeeded, False otherwise. | |
""" | |
if operation_id is None: | |
operation_id = f"delete_{int(time.time())}" | |
logger.info(f"[{operation_id}] Deleting record from {table}") | |
# Prepare the SQL | |
sql = f"DELETE FROM {table} WHERE {condition}" | |
try: | |
result = execute_query(sql, condition_params, operation_id) | |
# Check for errors | |
if "results" in result and len(result["results"]) > 0: | |
if result["results"][0]["type"] == "error": | |
error_msg = result["results"][0]["error"]["message"] | |
logger.error(f"[{operation_id}] Delete error: {error_msg}") | |
return False | |
# Check affected rows | |
affected_rows = result["results"][0]["response"]["result"]["affected_row_count"] | |
logger.info(f"[{operation_id}] Deleted {affected_rows} rows") | |
return affected_rows > 0 | |
return False | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error deleting record: {str(e)}") | |
return False | |
def select_records(table: str, columns: List[str] = None, condition: str = None, | |
condition_params: List[Dict[str, Any]] = None, limit: int = None, | |
offset: int = None, order_by: str = None, operation_id: str = None) -> List[Dict[str, Any]]: | |
""" | |
Select records from a table. | |
Args: | |
table (str): The table to select from. | |
columns (List[str], optional): The columns to select. Defaults to None (all columns). | |
condition (str, optional): The condition for the select. Defaults to None. | |
condition_params (List[Dict[str, Any]], optional): The parameters for the condition. Defaults to None. | |
limit (int, optional): The maximum number of records to return. Defaults to None. | |
offset (int, optional): The number of records to skip. Defaults to None. | |
order_by (str, optional): The order by clause. Defaults to None. | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
List[Dict[str, Any]]: The selected records. | |
""" | |
if operation_id is None: | |
operation_id = f"select_{int(time.time())}" | |
logger.debug(f"[{operation_id}] Selecting records from {table}") | |
# Prepare the SQL | |
cols = "*" if not columns else ", ".join(columns) | |
sql = f"SELECT {cols} FROM {table}" | |
if condition: | |
sql += f" WHERE {condition}" | |
if order_by: | |
sql += f" ORDER BY {order_by}" | |
if limit: | |
sql += f" LIMIT {limit}" | |
if offset: | |
sql += f" OFFSET {offset}" | |
try: | |
result = execute_query(sql, condition_params, operation_id) | |
# Check for errors | |
if "results" in result and len(result["results"]) > 0: | |
if result["results"][0]["type"] == "error": | |
error_msg = result["results"][0]["error"]["message"] | |
logger.error(f"[{operation_id}] Select error: {error_msg}") | |
return [] | |
# Extract the records | |
response = result["results"][0]["response"] | |
if "result" in response and "rows" in response["result"]: | |
rows = response["result"]["rows"] | |
cols = response["result"]["cols"] | |
# Convert rows to dictionaries | |
records = [] | |
for row in rows: | |
record = {} | |
for i, col in enumerate(cols): | |
try: | |
# Get column name safely | |
col_name = col.get("name", f"column_{i}") | |
# Get cell data safely | |
cell = row[i] if i < len(row) else {} | |
# Get value and type safely | |
value = cell.get("value") if isinstance(cell, dict) else cell | |
cell_type = cell.get("type") if isinstance(cell, dict) else None | |
# Convert value based on type | |
if cell_type == "integer" and value is not None: | |
try: | |
value = int(value) | |
except (ValueError, TypeError): | |
value = 0 | |
elif cell_type == "float" and value is not None: | |
try: | |
value = float(value) | |
except (ValueError, TypeError): | |
value = 0.0 | |
elif cell_type == "null": | |
value = None | |
# Store the value | |
record[col_name] = value | |
except Exception as e: | |
# Handle any errors | |
logger.warning(f"[{operation_id}] Error processing column {i}: {str(e)}") | |
record[f"column_{i}"] = None | |
records.append(record) | |
logger.debug(f"[{operation_id}] Selected {len(records)} records") | |
return records | |
logger.debug(f"[{operation_id}] No records found") | |
return [] | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error selecting records: {str(e)}") | |
return [] | |
def get_record_by_id(table: str, id: int, columns: List[str] = None, operation_id: str = None) -> Optional[Dict[str, Any]]: | |
""" | |
Get a record by ID. | |
Args: | |
table (str): The table to select from. | |
id (int): The ID of the record. | |
columns (List[str], optional): The columns to select. Defaults to None (all columns). | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
Optional[Dict[str, Any]]: The record, or None if not found. | |
""" | |
if operation_id is None: | |
operation_id = f"get_by_id_{int(time.time())}" | |
try: | |
condition = "id = ?" | |
condition_params = [{"type": "integer", "value": str(id)}] | |
records = select_records(table, columns, condition, condition_params, limit=1, operation_id=operation_id) | |
if records and len(records) > 0: | |
logger.debug(f"[{operation_id}] Found record by ID {id}: {records[0]}") | |
return records[0] | |
logger.warning(f"[{operation_id}] No record found with ID {id} in table {table}") | |
return None | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error getting record by ID {id}: {str(e)}") | |
# Try a direct query as a fallback | |
try: | |
logger.info(f"[{operation_id}] Trying direct query as fallback") | |
cols = "*" if not columns else ", ".join(columns) | |
sql = f"SELECT {cols} FROM {table} WHERE id = ? LIMIT 1" | |
result = execute_query(sql, [{"type": "integer", "value": str(id)}], operation_id=f"{operation_id}_fallback") | |
if "results" in result and len(result["results"]) > 0 and result["results"][0]["type"] == "ok": | |
response = result["results"][0]["response"] | |
if "result" in response and "rows" in response["result"] and len(response["result"]["rows"]) > 0: | |
row = response["result"]["rows"][0] | |
cols = response["result"]["cols"] | |
record = {} | |
for i, col in enumerate(cols): | |
try: | |
# Get column name safely | |
col_name = col.get("name", f"column_{i}") | |
# Get cell data safely | |
cell = row[i] if i < len(row) else {} | |
# Get value and type safely | |
value = cell.get("value") if isinstance(cell, dict) else cell | |
cell_type = cell.get("type") if isinstance(cell, dict) else None | |
# Convert value based on type | |
if cell_type == "integer" and value is not None: | |
try: | |
value = int(value) | |
except (ValueError, TypeError): | |
value = 0 | |
elif cell_type == "float" and value is not None: | |
try: | |
value = float(value) | |
except (ValueError, TypeError): | |
value = 0.0 | |
elif cell_type == "null": | |
value = None | |
# Store the value | |
record[col_name] = value | |
except Exception as e: | |
# Handle any errors | |
logger.warning(f"[{operation_id}] Error processing column {i}: {str(e)}") | |
record[f"column_{i}"] = None | |
logger.info(f"[{operation_id}] Found record by ID {id} using fallback: {record}") | |
return record | |
logger.warning(f"[{operation_id}] No record found with ID {id} using fallback") | |
return None | |
except Exception as e2: | |
logger.error(f"[{operation_id}] Error in fallback query: {str(e2)}") | |
return None | |
def count_records(table: str, condition: str = None, condition_params: List[Dict[str, Any]] = None, operation_id: str = None) -> int: | |
""" | |
Count records in a table. | |
Args: | |
table (str): The table to count records in. | |
condition (str, optional): The condition for the count. Defaults to None. | |
condition_params (List[Dict[str, Any]], optional): The parameters for the condition. Defaults to None. | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
int: The number of records. | |
""" | |
if operation_id is None: | |
operation_id = f"count_{int(time.time())}" | |
logger.info(f"[{operation_id}] Counting records in {table}") | |
# Prepare the SQL | |
sql = f"SELECT COUNT(*) FROM {table}" | |
if condition: | |
sql += f" WHERE {condition}" | |
try: | |
result = execute_query(sql, condition_params, operation_id) | |
# Check for errors | |
if "results" in result and len(result["results"]) > 0: | |
if result["results"][0]["type"] == "error": | |
error_msg = result["results"][0]["error"]["message"] | |
logger.error(f"[{operation_id}] Count error: {error_msg}") | |
return 0 | |
# Extract the count | |
response = result["results"][0]["response"] | |
if "result" in response and "rows" in response["result"] and response["result"]["rows"]: | |
try: | |
row = response["result"]["rows"][0] | |
# Get the value safely | |
if len(row) > 0: | |
cell = row[0] | |
if isinstance(cell, dict) and "value" in cell: | |
try: | |
count = int(cell["value"]) | |
logger.info(f"[{operation_id}] Counted {count} records") | |
return count | |
except (ValueError, TypeError) as e: | |
logger.warning(f"[{operation_id}] Error converting count to integer: {str(e)}") | |
return 0 | |
else: | |
logger.warning(f"[{operation_id}] Unexpected cell format in count: {cell}") | |
return 0 | |
else: | |
logger.warning(f"[{operation_id}] Empty row in count result") | |
return 0 | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error extracting count: {str(e)}") | |
return 0 | |
logger.warning(f"[{operation_id}] Count failed") | |
return 0 | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error counting records: {str(e)}") | |
return 0 | |
def record_exists(table: str, condition: str, condition_params: List[Dict[str, Any]], operation_id: str = None) -> bool: | |
""" | |
Check if a record exists in a table. | |
Args: | |
table (str): The table to check. | |
condition (str): The condition for the check. | |
condition_params (List[Dict[str, Any]]): The parameters for the condition. | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
bool: True if the record exists, False otherwise. | |
""" | |
count = count_records(table, condition, condition_params, operation_id) | |
return count > 0 | |
def create_table_if_not_exists(table: str, schema: str, operation_id: str = None) -> bool: | |
""" | |
Create a table if it doesn't exist. | |
Args: | |
table (str): The table to create. | |
schema (str): The schema for the table. | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
bool: True if the table was created or already exists, False otherwise. | |
""" | |
if operation_id is None: | |
operation_id = f"create_table_{int(time.time())}" | |
logger.info(f"[{operation_id}] Creating table {table} if not exists") | |
# Prepare the SQL | |
sql = f"CREATE TABLE IF NOT EXISTS {table} ({schema})" | |
try: | |
result = execute_query(sql, operation_id=operation_id) | |
# Check for errors | |
if "results" in result and len(result["results"]) > 0: | |
if result["results"][0]["type"] == "error": | |
error_msg = result["results"][0]["error"]["message"] | |
logger.error(f"[{operation_id}] Create table error: {error_msg}") | |
return False | |
logger.info(f"[{operation_id}] Table {table} created or already exists") | |
return True | |
return False | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error creating table: {str(e)}") | |
return False | |
def execute_transaction(queries: List[Tuple[str, List[Dict[str, Any]]]], operation_id: str = None) -> bool: | |
""" | |
Execute a transaction with multiple queries. | |
Args: | |
queries (List[Tuple[str, List[Dict[str, Any]]]]): The queries to execute. | |
operation_id (str, optional): A unique identifier for the operation. Defaults to None. | |
Returns: | |
bool: True if the transaction succeeded, False otherwise. | |
""" | |
if operation_id is None: | |
operation_id = f"transaction_{int(time.time())}" | |
logger.info(f"[{operation_id}] Executing transaction with {len(queries)} queries") | |
# Prepare the pipeline | |
requests = [{"type": "execute", "stmt": {"sql": "BEGIN"}}] | |
for sql, params in queries: | |
requests.append({ | |
"type": "execute", | |
"stmt": { | |
"sql": sql, | |
"args": params or [] | |
} | |
}) | |
requests.append({"type": "execute", "stmt": {"sql": "COMMIT"}}) | |
try: | |
result = execute_pipeline(requests, operation_id) | |
# Check for errors | |
for i, res in enumerate(result.get("results", [])): | |
if res.get("type") == "error": | |
error_msg = res.get("error", {}).get("message", "Unknown error") | |
logger.error(f"[{operation_id}] Transaction error in query {i}: {error_msg}") | |
# Try to rollback | |
try: | |
execute_query("ROLLBACK", operation_id=f"{operation_id}_rollback") | |
except: | |
pass | |
return False | |
logger.info(f"[{operation_id}] Transaction executed successfully") | |
return True | |
except Exception as e: | |
logger.error(f"[{operation_id}] Error executing transaction: {str(e)}") | |
# Try to rollback | |
try: | |
execute_query("ROLLBACK", operation_id=f"{operation_id}_rollback") | |
except: | |
pass | |
return False | |