auth-server / app /utils /db_http.py
kamau1's picture
Upload 12 files
ac8d4a4 verified
"""
Database utility module for HTTP API operations with Turso database.
This module provides functions for interacting with the Turso database using the HTTP API.
"""
import os
import logging
import time
import requests as req
from typing import List, Dict, Any, Optional, Tuple
# Module-level logger shared by every helper in this file. It is registered
# under the fixed name "auth-server" (not __name__).
logger = logging.getLogger("auth-server")
def get_http_url() -> Tuple[str, str]:
    """
    Get the HTTP URL and auth token for the Turso database.

    Reads ``TURSO_DATABASE_URL`` and ``TURSO_AUTH_TOKEN`` from the environment
    (each falls back to an empty string when unset). A leading ``libsql://``
    scheme is rewritten to ``https://`` and trailing slashes are removed.

    Returns:
        Tuple[str, str]: The HTTP URL and auth token.
    """
    db_url = os.getenv("TURSO_DATABASE_URL", "")
    auth_token = os.getenv("TURSO_AUTH_TOKEN", "")

    # Swap only the leading scheme. str.replace() would also rewrite any later
    # "libsql://" substring (e.g. inside a query string) and corrupt the URL.
    libsql_scheme = "libsql://"
    if db_url.startswith(libsql_scheme):
        http_url = "https://" + db_url[len(libsql_scheme):]
    else:
        http_url = db_url

    # Callers append "/v2/pipeline", so the base must not end with a slash.
    return http_url.rstrip("/"), auth_token
def get_headers(auth_token: str) -> Dict[str, str]:
    """
    Build the header mapping sent with every HTTP request.

    Args:
        auth_token (str): The authentication token, sent as a Bearer credential.

    Returns:
        Dict[str, str]: ``Authorization`` and ``Content-Type`` headers.
    """
    headers: Dict[str, str] = {}
    headers["Authorization"] = f"Bearer {auth_token}"
    headers["Content-Type"] = "application/json"
    return headers
def execute_query(sql: str, params: Optional[List[Dict[str, Any]]] = None,
                  operation_id: Optional[str] = None,
                  timeout: float = 30.0) -> Dict[str, Any]:
    """
    Execute a single SQL statement using the HTTP API.

    The statement is POSTed to ``{http_url}/v2/pipeline`` as one ``execute``
    request followed by a ``close`` request.

    Args:
        sql (str): The SQL query to execute.
        params (List[Dict[str, Any]], optional): The already-encoded
            parameters for the query. Defaults to None (no parameters).
        operation_id (str, optional): A unique identifier for the operation,
            used as a log prefix. Defaults to None (one is generated).
        timeout (float, optional): Seconds to wait for the server before
            giving up. Defaults to 30.0.

    Returns:
        Dict[str, Any]: The decoded JSON response from the database.

    Raises:
        Exception: Any error raised while sending the request, by
            ``raise_for_status()`` or while decoding the body is logged and
            re-raised unchanged.
    """
    if operation_id is None:
        operation_id = f"query_{int(time.time())}"

    # Only log at debug level for routine operations
    logger.debug(f"[{operation_id}] Executing query: {sql}")

    http_url, auth_token = get_http_url()
    headers = get_headers(auth_token)

    query = {
        "requests": [
            {
                "type": "execute",
                "stmt": {
                    "sql": sql,
                    "args": params or [],
                },
            },
            {"type": "close"},
        ]
    }

    try:
        # Without an explicit timeout, requests waits indefinitely on a
        # stalled connection and would hang the calling request handler.
        response = req.post(
            f"{http_url}/v2/pipeline",
            headers=headers,
            json=query,
            timeout=timeout,
        )
        response.raise_for_status()
        result = response.json()
        logger.debug(f"[{operation_id}] Query executed successfully")
        return result
    except Exception as e:
        logger.error(f"[{operation_id}] Error executing query: {str(e)}")
        raise
def execute_pipeline(pipeline_requests: List[Dict[str, Any]],
                     operation_id: Optional[str] = None,
                     timeout: float = 30.0) -> Dict[str, Any]:
    """
    Execute a pipeline of requests using the HTTP API.

    A ``close`` request is appended after the supplied requests and the whole
    list is POSTed to ``{http_url}/v2/pipeline``.

    Args:
        pipeline_requests (List[Dict[str, Any]]): The requests to execute.
        operation_id (str, optional): A unique identifier for the operation,
            used as a log prefix. Defaults to None (one is generated).
        timeout (float, optional): Seconds to wait for the server before
            giving up. Defaults to 30.0.

    Returns:
        Dict[str, Any]: The decoded JSON response from the database.

    Raises:
        Exception: Any error raised while sending the request, by
            ``raise_for_status()`` or while decoding the body is logged and
            re-raised unchanged.
    """
    if operation_id is None:
        operation_id = f"pipeline_{int(time.time())}"

    logger.debug(f"[{operation_id}] Executing pipeline with {len(pipeline_requests)} requests")

    http_url, auth_token = get_http_url()
    headers = get_headers(auth_token)

    # Unpacking (rather than list + list) accepts tuples and other sequences
    # too, and never mutates the caller's list.
    pipeline = {
        "requests": [*pipeline_requests, {"type": "close"}]
    }

    try:
        # Without an explicit timeout, requests waits indefinitely on a
        # stalled connection and would hang the calling request handler.
        response = req.post(
            f"{http_url}/v2/pipeline",
            headers=headers,
            json=pipeline,
            timeout=timeout,
        )
        response.raise_for_status()
        result = response.json()
        logger.debug(f"[{operation_id}] Pipeline executed successfully")
        return result
    except Exception as e:
        logger.error(f"[{operation_id}] Error executing pipeline: {str(e)}")
        raise
def insert_record(table: str, data: Dict[str, Any], operation_id: Optional[str] = None) -> Optional[int]:
    """
    Insert a record into a table.

    The table name and the keys of ``data`` are interpolated directly into the
    SQL text (identifiers cannot be bound as parameters), so they must come
    from trusted code. The values of ``data`` are sent as bound parameters.

    Args:
        table (str): The table to insert into.
        data (Dict[str, Any]): Column name -> value for the new row.
        operation_id (str, optional): A unique identifier for the operation,
            used as a log prefix. Defaults to None (one is generated).

    Returns:
        Optional[int]: The ID of the inserted record, or None if the insert
        failed or the ID could not be determined.
    """
    if operation_id is None:
        operation_id = f"insert_{int(time.time())}"

    logger.debug(f"[{operation_id}] Inserting record into {table}")

    def _to_arg(value: Any) -> Dict[str, Any]:
        # Encode one Python value as a typed statement argument.
        if value is None:
            return {"type": "null", "value": None}
        if isinstance(value, bool):
            # bool is a subclass of int; str(True) would send "True", which is
            # not a valid integer literal. Store booleans as 0/1 instead.
            return {"type": "integer", "value": str(int(value))}
        if isinstance(value, int):
            return {"type": "integer", "value": str(value)}
        if isinstance(value, float):
            # Floats travel as JSON numbers; only integers are string-encoded.
            return {"type": "float", "value": value}
        if isinstance(value, str):
            return {"type": "text", "value": value}
        return {"type": "text", "value": str(value)}

    columns = list(data.keys())
    placeholders = ", ".join(["?"] * len(columns))
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    params = [_to_arg(value) for value in data.values()]

    try:
        result = execute_query(sql, params, operation_id)

        results = result.get("results") or []
        first = results[0] if results else {}
        if first.get("type") == "error":
            error_msg = first.get("error", {}).get("message", "Unknown error")
            logger.error(f"[{operation_id}] Insert error: {error_msg}")
            return None

        # Preferred source: the INSERT's own result carries the row ID of the
        # row it just created, so no second round trip is needed and another
        # writer cannot interleave between the insert and the lookup.
        stmt_result = (first.get("response") or {}).get("result") or {}
        rowid = stmt_result.get("last_insert_rowid")
        if rowid is not None:
            try:
                last_id = int(rowid)
            except (ValueError, TypeError) as e:
                logger.warning(f"[{operation_id}] Error converting ID to integer: {str(e)}")
            else:
                logger.info(f"[{operation_id}] Record inserted with ID: {last_id}")
                return last_id

        # Fallback kept for backward compatibility when the response carries
        # no usable row ID.
        # NOTE(review): this runs as a separate HTTP request, so it presumably
        # does not share the INSERT's connection and its answer should be
        # treated as best-effort only - confirm against the server behaviour.
        try:
            id_result = execute_query("SELECT last_insert_rowid()", operation_id=f"{operation_id}_get_id")
            id_results = id_result.get("results") or []
            if id_results and id_results[0].get("type") == "ok":
                id_stmt = (id_results[0].get("response") or {}).get("result") or {}
                rows = id_stmt.get("rows") or []
                if rows:
                    row = rows[0]
                    if len(row) > 0:
                        cell = row[0]
                        if isinstance(cell, dict) and "value" in cell:
                            try:
                                last_id = int(cell["value"])
                            except (ValueError, TypeError) as e:
                                logger.warning(f"[{operation_id}] Error converting ID to integer: {str(e)}")
                                # Return the raw value rather than losing it
                                return cell["value"]
                            logger.info(f"[{operation_id}] Record inserted with ID: {last_id}")
                            return last_id
                        logger.warning(f"[{operation_id}] Unexpected cell format: {cell}")
                    else:
                        logger.warning(f"[{operation_id}] Empty row in last_insert_rowid result")
        except Exception as e:
            logger.error(f"[{operation_id}] Error getting last insert ID: {str(e)}")

        logger.warning(f"[{operation_id}] Insert succeeded but couldn't get ID")
        return None
    except Exception as e:
        logger.error(f"[{operation_id}] Error inserting record: {str(e)}")
        return None
def update_record(table: str, data: Dict[str, Any], condition: str,
                  condition_params: Optional[List[Dict[str, Any]]],
                  operation_id: Optional[str] = None) -> bool:
    """
    Update records in a table.

    The table name, the keys of ``data`` and ``condition`` are interpolated
    directly into the SQL text, so they must come from trusted code. The
    values of ``data`` and ``condition_params`` are sent as bound parameters.

    Args:
        table (str): The table to update.
        data (Dict[str, Any]): Column name -> new value.
        condition (str): The condition for the update (e.g., "id = ?").
        condition_params (List[Dict[str, Any]]): The already-encoded
            parameters for the condition; None is treated as no parameters.
        operation_id (str, optional): A unique identifier for the operation,
            used as a log prefix. Defaults to None (one is generated).

    Returns:
        bool: True if at least one row was updated, False otherwise
        (including on any error).
    """
    if operation_id is None:
        operation_id = f"update_{int(time.time())}"

    logger.debug(f"[{operation_id}] Updating record in {table}")

    if not data:
        # An empty SET clause is invalid SQL; fail fast without a round trip.
        logger.error(f"[{operation_id}] Update error: no columns to update")
        return False

    def _to_arg(value: Any) -> Dict[str, Any]:
        # Encode one Python value as a typed statement argument.
        if value is None:
            return {"type": "null", "value": None}
        if isinstance(value, bool):
            # bool is a subclass of int; str(True) would send "True", which is
            # not a valid integer literal. Store booleans as 0/1 instead.
            return {"type": "integer", "value": str(int(value))}
        if isinstance(value, int):
            return {"type": "integer", "value": str(value)}
        if isinstance(value, float):
            # Floats travel as JSON numbers; only integers are string-encoded.
            return {"type": "float", "value": value}
        if isinstance(value, str):
            return {"type": "text", "value": value}
        return {"type": "text", "value": str(value)}

    set_clauses = [f"{key} = ?" for key in data]
    sql = f"UPDATE {table} SET {', '.join(set_clauses)} WHERE {condition}"

    # SET values come first, then the WHERE values, matching placeholder order.
    params = [_to_arg(value) for value in data.values()]
    params.extend(condition_params or [])

    try:
        result = execute_query(sql, params, operation_id)

        if "results" in result and len(result["results"]) > 0:
            first = result["results"][0]
            if first["type"] == "error":
                error_msg = first["error"]["message"]
                logger.error(f"[{operation_id}] Update error: {error_msg}")
                return False

            affected_rows = first["response"]["result"]["affected_row_count"]
            logger.debug(f"[{operation_id}] Updated {affected_rows} rows")
            return affected_rows > 0

        return False
    except Exception as e:
        logger.error(f"[{operation_id}] Error updating record: {str(e)}")
        return False
def delete_record(table: str, condition: str, condition_params: List[Dict[str, Any]], operation_id: str = None) -> bool:
    """
    Remove the rows of a table that satisfy a condition.

    Args:
        table (str): The table to delete from.
        condition (str): The WHERE clause body (e.g., "id = ?").
        condition_params (List[Dict[str, Any]]): The parameters bound to the condition.
        operation_id (str, optional): Log prefix for this operation. Defaults to None,
            in which case one is derived from the current time.

    Returns:
        bool: True when the reported affected row count is greater than zero;
        False when nothing was deleted, the server reported an error, the
        response held no results, or an exception occurred.
    """
    tag = operation_id if operation_id is not None else f"delete_{int(time.time())}"
    logger.info(f"[{tag}] Deleting record from {table}")

    statement = f"DELETE FROM {table} WHERE {condition}"

    try:
        outcome = execute_query(statement, condition_params, tag)

        # Nothing to inspect: treat as "not deleted".
        if "results" not in outcome or len(outcome["results"]) == 0:
            return False

        head = outcome["results"][0]
        if head["type"] == "error":
            failure = head["error"]["message"]
            logger.error(f"[{tag}] Delete error: {failure}")
            return False

        removed = head["response"]["result"]["affected_row_count"]
        logger.info(f"[{tag}] Deleted {removed} rows")
        return removed > 0
    except Exception as exc:
        logger.error(f"[{tag}] Error deleting record: {str(exc)}")
        return False
def select_records(table: str, columns: Optional[List[str]] = None, condition: Optional[str] = None,
                   condition_params: Optional[List[Dict[str, Any]]] = None, limit: Optional[int] = None,
                   offset: Optional[int] = None, order_by: Optional[str] = None,
                   operation_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Select records from a table.

    The table name, column names, ``condition`` and ``order_by`` are
    interpolated directly into the SQL text, so they must come from trusted
    code. ``limit`` and ``offset`` are coerced to integers before being
    interpolated.

    Args:
        table (str): The table to select from.
        columns (List[str], optional): The columns to select. Defaults to None (all columns).
        condition (str, optional): The condition for the select. Defaults to None.
        condition_params (List[Dict[str, Any]], optional): The already-encoded
            parameters for the condition. Defaults to None.
        limit (int, optional): The maximum number of records to return;
            0 returns no rows. Defaults to None (no limit).
        offset (int, optional): The number of records to skip. Defaults to None.
        order_by (str, optional): The order by clause. Defaults to None.
        operation_id (str, optional): A unique identifier for the operation,
            used as a log prefix. Defaults to None (one is generated).

    Returns:
        List[Dict[str, Any]]: The selected records as column-name -> value
        dictionaries; an empty list when nothing matched or on any error.
    """
    if operation_id is None:
        operation_id = f"select_{int(time.time())}"

    logger.debug(f"[{operation_id}] Selecting records from {table}")

    try:
        cols = "*" if not columns else ", ".join(columns)
        sql = f"SELECT {cols} FROM {table}"
        if condition:
            sql += f" WHERE {condition}"
        if order_by:
            sql += f" ORDER BY {order_by}"
        if limit is not None:
            # "is not None" so that an explicit limit of 0 is honoured.
            sql += f" LIMIT {int(limit)}"
        elif offset:
            # SQLite only accepts OFFSET after LIMIT; -1 means "no limit".
            sql += " LIMIT -1"
        if offset:
            sql += f" OFFSET {int(offset)}"

        result = execute_query(sql, condition_params, operation_id)

        results = result.get("results") or []
        if not results:
            logger.debug(f"[{operation_id}] No records found")
            return []

        first = results[0]
        if first["type"] == "error":
            error_msg = first["error"]["message"]
            logger.error(f"[{operation_id}] Select error: {error_msg}")
            return []

        response = first["response"]
        if "result" not in response or "rows" not in response["result"]:
            logger.debug(f"[{operation_id}] No records found")
            return []

        rows = response["result"]["rows"]
        result_cols = response["result"]["cols"]

        records = []
        for row in rows:
            record = {}
            for i, col in enumerate(result_cols):
                try:
                    col_name = col.get("name", f"column_{i}")
                    # A row shorter than the column list yields empty cells.
                    cell = row[i] if i < len(row) else {}
                    if isinstance(cell, dict):
                        value = cell.get("value")
                        cell_type = cell.get("type")
                    else:
                        value = cell
                        cell_type = None

                    # Integers and floats are converted from the wire value;
                    # unparseable values fall back to zero.
                    if cell_type == "integer" and value is not None:
                        try:
                            value = int(value)
                        except (ValueError, TypeError):
                            value = 0
                    elif cell_type == "float" and value is not None:
                        try:
                            value = float(value)
                        except (ValueError, TypeError):
                            value = 0.0
                    elif cell_type == "null":
                        value = None

                    record[col_name] = value
                except Exception as e:
                    # Keep the row; record the broken column as None.
                    logger.warning(f"[{operation_id}] Error processing column {i}: {str(e)}")
                    record[f"column_{i}"] = None
            records.append(record)

        logger.debug(f"[{operation_id}] Selected {len(records)} records")
        return records
    except Exception as e:
        logger.error(f"[{operation_id}] Error selecting records: {str(e)}")
        return []
def get_record_by_id(table: str, id: int, columns: Optional[List[str]] = None,
                     operation_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Get a record by ID.

    Looks the row up through ``select_records`` and, if that call raises,
    retries once with a direct query. Record contents are deliberately not
    written to the log, because rows may hold sensitive fields.

    Args:
        table (str): The table to select from.
        id (int): The ID of the record (matched against the ``id`` column).
        columns (List[str], optional): The columns to select. Defaults to None (all columns).
        operation_id (str, optional): A unique identifier for the operation,
            used as a log prefix. Defaults to None (one is generated).

    Returns:
        Optional[Dict[str, Any]]: The record, or None if not found or on error.
    """
    if operation_id is None:
        operation_id = f"get_by_id_{int(time.time())}"

    def _decode_cell(cell: Any) -> Any:
        # Convert one result cell to a Python value based on its declared type.
        if not isinstance(cell, dict):
            return cell
        value = cell.get("value")
        cell_type = cell.get("type")
        if cell_type == "integer" and value is not None:
            try:
                return int(value)
            except (ValueError, TypeError):
                return 0
        if cell_type == "float" and value is not None:
            try:
                return float(value)
            except (ValueError, TypeError):
                return 0.0
        if cell_type == "null":
            return None
        return value

    id_param = [{"type": "integer", "value": str(id)}]

    try:
        records = select_records(table, columns, "id = ?", id_param, limit=1, operation_id=operation_id)
        if records:
            # Log only the fact of the hit, never the row itself.
            logger.debug(f"[{operation_id}] Found record by ID {id}")
            return records[0]

        logger.warning(f"[{operation_id}] No record found with ID {id} in table {table}")
        return None
    except Exception as e:
        logger.error(f"[{operation_id}] Error getting record by ID {id}: {str(e)}")

        # Try a direct query as a fallback
        try:
            logger.info(f"[{operation_id}] Trying direct query as fallback")
            col_list = "*" if not columns else ", ".join(columns)
            sql = f"SELECT {col_list} FROM {table} WHERE id = ? LIMIT 1"
            result = execute_query(sql, id_param, operation_id=f"{operation_id}_fallback")

            if "results" in result and len(result["results"]) > 0 and result["results"][0]["type"] == "ok":
                response = result["results"][0]["response"]
                if "result" in response and "rows" in response["result"] and len(response["result"]["rows"]) > 0:
                    row = response["result"]["rows"][0]
                    result_cols = response["result"]["cols"]

                    record = {}
                    for i, col in enumerate(result_cols):
                        try:
                            col_name = col.get("name", f"column_{i}")
                            # A row shorter than the column list yields empty cells.
                            cell = row[i] if i < len(row) else {}
                            record[col_name] = _decode_cell(cell)
                        except Exception as col_err:
                            # Keep the row; record the broken column as None.
                            logger.warning(f"[{operation_id}] Error processing column {i}: {str(col_err)}")
                            record[f"column_{i}"] = None

                    logger.info(f"[{operation_id}] Found record by ID {id} using fallback")
                    return record

            logger.warning(f"[{operation_id}] No record found with ID {id} using fallback")
            return None
        except Exception as e2:
            logger.error(f"[{operation_id}] Error in fallback query: {str(e2)}")
            return None
def count_records(table: str, condition: str = None, condition_params: List[Dict[str, Any]] = None, operation_id: str = None) -> int:
    """
    Return how many rows of a table match an optional condition.

    Args:
        table (str): The table to count records in.
        condition (str, optional): WHERE clause body. Defaults to None (count every row).
        condition_params (List[Dict[str, Any]], optional): The parameters bound to the condition. Defaults to None.
        operation_id (str, optional): Log prefix for this operation. Defaults to None,
            in which case one is derived from the current time.

    Returns:
        int: The number of records; 0 is also returned for every failure path.
    """
    tag = operation_id if operation_id is not None else f"count_{int(time.time())}"
    logger.info(f"[{tag}] Counting records in {table}")

    where_part = f" WHERE {condition}" if condition else ""
    statement = f"SELECT COUNT(*) FROM {table}" + where_part

    try:
        payload = execute_query(statement, condition_params, tag)

        if "results" in payload and len(payload["results"]) > 0:
            head = payload["results"][0]
            if head["type"] == "error":
                failure = head["error"]["message"]
                logger.error(f"[{tag}] Count error: {failure}")
                return 0

            body = head["response"]
            if "result" in body and "rows" in body["result"] and body["result"]["rows"]:
                try:
                    first_row = body["result"]["rows"][0]
                    if len(first_row) == 0:
                        logger.warning(f"[{tag}] Empty row in count result")
                        return 0

                    first_cell = first_row[0]
                    if not (isinstance(first_cell, dict) and "value" in first_cell):
                        logger.warning(f"[{tag}] Unexpected cell format in count: {first_cell}")
                        return 0

                    try:
                        total = int(first_cell["value"])
                        logger.info(f"[{tag}] Counted {total} records")
                        return total
                    except (ValueError, TypeError) as conv_err:
                        logger.warning(f"[{tag}] Error converting count to integer: {str(conv_err)}")
                        return 0
                except Exception as extract_err:
                    logger.error(f"[{tag}] Error extracting count: {str(extract_err)}")
                    return 0

        # Reached when the response had no results or no rows.
        logger.warning(f"[{tag}] Count failed")
        return 0
    except Exception as exc:
        logger.error(f"[{tag}] Error counting records: {str(exc)}")
        return 0
def record_exists(table: str, condition: str, condition_params: List[Dict[str, Any]], operation_id: str = None) -> bool:
    """
    Tell whether at least one row of a table matches a condition.

    Delegates to ``count_records`` and compares its result with zero.

    Args:
        table (str): The table to check.
        condition (str): The WHERE clause body for the check.
        condition_params (List[Dict[str, Any]]): The parameters bound to the condition.
        operation_id (str, optional): Log prefix for this operation. Defaults to None.

    Returns:
        bool: True if the count is greater than zero, False otherwise.
    """
    return count_records(table, condition, condition_params, operation_id) > 0
def create_table_if_not_exists(table: str, schema: str, operation_id: str = None) -> bool:
    """
    Issue ``CREATE TABLE IF NOT EXISTS`` for the given table.

    Args:
        table (str): The table to create.
        schema (str): The column definitions placed between the parentheses.
        operation_id (str, optional): Log prefix for this operation. Defaults to None,
            in which case one is derived from the current time.

    Returns:
        bool: True if the statement succeeded (table created or already
        present); False on a server error, an empty response or an exception.
    """
    tag = operation_id if operation_id is not None else f"create_table_{int(time.time())}"
    logger.info(f"[{tag}] Creating table {table} if not exists")

    statement = f"CREATE TABLE IF NOT EXISTS {table} ({schema})"

    try:
        outcome = execute_query(statement, operation_id=tag)

        # No results at all means we cannot confirm anything.
        if "results" not in outcome or len(outcome["results"]) == 0:
            return False

        head = outcome["results"][0]
        if head["type"] == "error":
            failure = head["error"]["message"]
            logger.error(f"[{tag}] Create table error: {failure}")
            return False

        logger.info(f"[{tag}] Table {table} created or already exists")
        return True
    except Exception as exc:
        logger.error(f"[{tag}] Error creating table: {str(exc)}")
        return False
def execute_transaction(queries: List[Tuple[str, List[Dict[str, Any]]]],
                        operation_id: Optional[str] = None) -> bool:
    """
    Execute several statements atomically.

    The statements are sent as one ``batch`` request whose steps are chained
    with conditions: each statement (and the final COMMIT) only runs if the
    previous step succeeded, and a ROLLBACK runs when the COMMIT did not
    succeed. Plain pipelined ``execute`` requests are not used because they
    run independently of one another, so a failing statement would not stop
    the later ones or the COMMIT.

    Args:
        queries (List[Tuple[str, List[Dict[str, Any]]]]): ``(sql, params)``
            pairs to execute in order; ``params`` may be None or empty.
        operation_id (str, optional): A unique identifier for the operation,
            used as a log prefix. Defaults to None (one is generated).

    Returns:
        bool: True if every statement ran and the COMMIT succeeded, False
        otherwise (including on any exception).
    """
    if operation_id is None:
        operation_id = f"transaction_{int(time.time())}"

    logger.info(f"[{operation_id}] Executing transaction with {len(queries)} queries")

    # Step 0 is BEGIN; every following step depends on the one before it.
    steps: List[Dict[str, Any]] = [{"stmt": {"sql": "BEGIN"}}]
    for sql, params in queries:
        steps.append({
            "condition": {"type": "ok", "step": len(steps) - 1},
            "stmt": {"sql": sql, "args": params or []},
        })

    commit_step = len(steps)
    steps.append({
        "condition": {"type": "ok", "step": commit_step - 1},
        "stmt": {"sql": "COMMIT"},
    })
    # Runs on the same connection as the transaction, which a ROLLBACK sent
    # in a separate HTTP request would not.
    steps.append({
        "condition": {"type": "not", "cond": {"type": "ok", "step": commit_step}},
        "stmt": {"sql": "ROLLBACK"},
    })

    try:
        result = execute_pipeline([{"type": "batch", "batch": {"steps": steps}}], operation_id)

        results = result.get("results") or []
        if not results:
            logger.error(f"[{operation_id}] Transaction error: empty response")
            return False

        first = results[0]
        if first.get("type") == "error":
            error_msg = first.get("error", {}).get("message", "Unknown error")
            logger.error(f"[{operation_id}] Transaction error: {error_msg}")
            return False

        batch_result = (first.get("response") or {}).get("result") or {}
        step_results = batch_result.get("step_results") or []
        step_errors = batch_result.get("step_errors") or []

        # Report statement failures. The trailing ROLLBACK step is excluded:
        # it is cleanup and may itself fail when BEGIN never succeeded.
        for i, step_error in enumerate(step_errors[:commit_step + 1]):
            if step_error:
                error_msg = step_error.get("message", "Unknown error")
                logger.error(f"[{operation_id}] Transaction error in query {i}: {error_msg}")

        # A skipped or failed step has no result entry.
        committed = commit_step < len(step_results) and step_results[commit_step] is not None
        if not committed:
            logger.error(f"[{operation_id}] Transaction was not committed")
            return False

        logger.info(f"[{operation_id}] Transaction executed successfully")
        return True
    except Exception as e:
        # No separate ROLLBACK is attempted here: it would be a new HTTP
        # request, unrelated to the connection that held the transaction.
        logger.error(f"[{operation_id}] Error executing transaction: {str(e)}")
        return False