"""
Database utility module for HTTP API operations with Turso database.

This module provides functions for interacting with the Turso database using
the HTTP API (the ``/v2/pipeline`` endpoint).

Values travel over the wire as typed dictionaries such as
``{"type": "integer", "value": "42"}``. Use :func:`encode_value` to build them
from Python objects (for example when assembling ``condition_params``) and
:func:`decode_value` to turn a result cell back into a Python object.

NOTE: table names, column names and the ``condition``, ``order_by`` and
``schema`` arguments are interpolated into the SQL text unchanged, because SQL
identifiers cannot be bound as parameters. They must come from trusted code,
never from user input. User-supplied *values* belong in the parameter lists.
"""

import base64
import logging
import os
import time
from typing import Any, Dict, List, Optional, Tuple

import requests as req

# Configure logging
logger = logging.getLogger("auth-server")

# Seconds to wait on the HTTP request when TURSO_HTTP_TIMEOUT is unset or
# invalid. Without a timeout, requests waits indefinitely on a stalled server.
DEFAULT_TIMEOUT_SECONDS = 30.0


def get_http_url() -> Tuple[str, str]:
    """
    Get the HTTP URL and auth token for the Turso database.

    Reads ``TURSO_DATABASE_URL`` and ``TURSO_AUTH_TOKEN`` from the environment
    on every call; both default to an empty string when unset.

    Returns:
        Tuple[str, str]: The HTTP URL (``libsql://`` scheme rewritten to
        ``https://``, trailing slashes removed) and the auth token.
    """
    db_url = os.getenv("TURSO_DATABASE_URL", "")
    auth_token = os.getenv("TURSO_AUTH_TOKEN", "")

    # Convert URL from libsql:// to https:// (scheme prefix only)
    scheme = "libsql://"
    if db_url.startswith(scheme):
        http_url = "https://" + db_url[len(scheme):]
    else:
        http_url = db_url

    # Ensure the URL doesn't have a trailing slash
    return http_url.rstrip("/"), auth_token


def get_headers(auth_token: str) -> Dict[str, str]:
    """
    Get the headers for the HTTP request.

    Args:
        auth_token (str): The authentication token.

    Returns:
        Dict[str, str]: The headers for the HTTP request.
    """
    return {
        "Authorization": f"Bearer {auth_token}",
        "Content-Type": "application/json",
    }


def encode_value(value: Any) -> Dict[str, Any]:
    """
    Convert a Python value into the typed dictionary the HTTP API expects.

    Args:
        value (Any): The value to encode.

    Returns:
        Dict[str, Any]: ``null`` for None, ``integer`` (decimal string) for
        bool and int, ``float`` (JSON number) for float, ``blob`` (base64) for
        bytes-like values, and ``text`` for str. Any other object is sent as
        the text of ``str(value)``.
    """
    if value is None:
        return {"type": "null"}
    # bool is a subclass of int: check it first so True/False are stored as
    # 1/0 instead of the invalid integer strings "True"/"False".
    if isinstance(value, bool):
        return {"type": "integer", "value": "1" if value else "0"}
    if isinstance(value, int):
        return {"type": "integer", "value": str(value)}
    if isinstance(value, float):
        return {"type": "float", "value": value}
    if isinstance(value, (bytes, bytearray, memoryview)):
        encoded = base64.b64encode(bytes(value)).decode("ascii")
        return {"type": "blob", "base64": encoded}
    if isinstance(value, str):
        return {"type": "text", "value": value}
    return {"type": "text", "value": str(value)}


def decode_value(cell: Any) -> Any:
    """
    Convert a result cell from the HTTP API into a Python value.

    Args:
        cell (Any): A typed dictionary from a result row. Anything that is not
            a dictionary is returned unchanged.

    Returns:
        Any: None for ``null`` cells, int for ``integer``, float for
        ``float``, bytes for ``blob``, and the raw ``value`` otherwise. An
        integer or float that cannot be parsed becomes ``0`` / ``0.0``.

    Raises:
        ValueError: If a blob cell does not contain valid base64.
    """
    if not isinstance(cell, dict):
        return cell

    cell_type = cell.get("type")
    if cell_type == "null":
        return None
    if cell_type == "blob":
        raw = cell.get("base64")
        if raw is None:
            return None
        # Restore padding in case the server sent unpadded base64.
        return base64.b64decode(raw + "=" * (-len(raw) % 4))

    value = cell.get("value")
    if value is None:
        return None
    if cell_type == "integer":
        try:
            return int(value)
        except (ValueError, TypeError):
            return 0
    if cell_type == "float":
        try:
            return float(value)
        except (ValueError, TypeError):
            return 0.0
    return value


def _new_operation_id(prefix: str) -> str:
    """Build a default operation id such as ``query_1700000000`` for log correlation."""
    return f"{prefix}_{int(time.time())}"


def _request_timeout() -> float:
    """Return the HTTP timeout in seconds (TURSO_HTTP_TIMEOUT, else the default)."""
    try:
        timeout = float(os.getenv("TURSO_HTTP_TIMEOUT", ""))
    except ValueError:
        return DEFAULT_TIMEOUT_SECONDS
    if 0 < timeout < float("inf"):
        return timeout
    return DEFAULT_TIMEOUT_SECONDS


def _post_pipeline(pipeline_requests: List[Dict[str, Any]], operation_id: str, what: str) -> Dict[str, Any]:
    """POST the requests plus a trailing ``close`` to /v2/pipeline and return the JSON body."""
    http_url, auth_token = get_http_url()
    payload = {"requests": list(pipeline_requests) + [{"type": "close"}]}

    try:
        response = req.post(
            f"{http_url}/v2/pipeline",
            headers=get_headers(auth_token),
            json=payload,
            timeout=_request_timeout(),
        )
        response.raise_for_status()
        result = response.json()
    except Exception as exc:
        # Log with the operation id for correlation, then let the caller decide.
        logger.error("[%s] Error executing %s: %s", operation_id, what, exc)
        raise

    logger.debug("[%s] %s executed successfully", operation_id, what.capitalize())
    return result


def _statement_result(result: Dict[str, Any], operation_id: str, label: str) -> Optional[Dict[str, Any]]:
    """Return the statement result of the first pipeline entry, or None if it is missing or an error."""
    results = result.get("results") or []
    if not results:
        return None

    first = results[0]
    if first.get("type") == "error":
        error_msg = first.get("error", {}).get("message", "Unknown error")
        logger.error("[%s] %s error: %s", operation_id, label, error_msg)
        return None
    return first.get("response", {}).get("result", {})


def _row_to_record(cols: List[Dict[str, Any]], row: List[Any], operation_id: str) -> Dict[str, Any]:
    """Zip one result row with its column descriptors into a ``{column_name: value}`` dict."""
    record: Dict[str, Any] = {}
    for index, col in enumerate(cols):
        try:
            # Unnamed columns get a positional name.
            name = col.get("name") or f"column_{index}"
            cell = row[index] if index < len(row) else {}
            record[name] = decode_value(cell)
        except (AttributeError, TypeError, ValueError) as exc:
            logger.warning("[%s] Error processing column %s: %s", operation_id, index, exc)
            record[f"column_{index}"] = None
    return record


def execute_query(sql: str, params: Optional[List[Dict[str, Any]]] = None,
                  operation_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Execute a SQL query using the HTTP API.

    Args:
        sql (str): The SQL query to execute.
        params (List[Dict[str, Any]], optional): The positional parameters as
            typed dictionaries (see :func:`encode_value`). Defaults to None.
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None (one is generated).

    Returns:
        Dict[str, Any]: The response from the database.

    Raises:
        Exception: Whatever the HTTP call or JSON decoding raised (logged
            first). SQL errors do not raise: they are reported inside the
            returned ``results`` list.
    """
    if operation_id is None:
        operation_id = _new_operation_id("query")

    # Only log at debug level for routine operations
    logger.debug("[%s] Executing query: %s", operation_id, sql)

    statement = {"type": "execute", "stmt": {"sql": sql, "args": params or []}}
    return _post_pipeline([statement], operation_id, "query")


def execute_pipeline(pipeline_requests: List[Dict[str, Any]],
                     operation_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Execute a pipeline of SQL queries using the HTTP API.

    A ``close`` request is appended automatically. The requests are
    independent: a failing request does not stop the ones after it.

    Args:
        pipeline_requests (List[Dict[str, Any]]): The requests to execute.
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None (one is generated).

    Returns:
        Dict[str, Any]: The response from the database.

    Raises:
        Exception: Whatever the HTTP call or JSON decoding raised (logged first).
    """
    if operation_id is None:
        operation_id = _new_operation_id("pipeline")

    logger.debug("[%s] Executing pipeline with %s requests", operation_id, len(pipeline_requests))
    return _post_pipeline(pipeline_requests, operation_id, "pipeline")


def insert_record(table: str, data: Dict[str, Any], operation_id: Optional[str] = None) -> Optional[int]:
    """
    Insert a record into a table.

    Args:
        table (str): The table to insert into.
        data (Dict[str, Any]): Column name to value. An empty mapping inserts
            a row of default values.
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None (one is generated).

    Returns:
        Optional[int]: The ID of the inserted record, or None if the insert
        failed or the server did not report an ID.
    """
    if operation_id is None:
        operation_id = _new_operation_id("insert")

    logger.debug("[%s] Inserting record into %s", operation_id, table)

    # Prepare the SQL and parameters
    columns = list(data)
    if columns:
        placeholders = ", ".join("?" for _ in columns)
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    else:
        sql = f"INSERT INTO {table} DEFAULT VALUES"
    params = [encode_value(value) for value in data.values()]

    try:
        result = execute_query(sql, params, operation_id)
        stmt_result = _statement_result(result, operation_id, "Insert")
        if stmt_result is None:
            return None

        # The rowid is reported by the INSERT itself. Asking for
        # last_insert_rowid() in a second request would run on a different
        # stream and therefore could not see this insert.
        raw_id = stmt_result.get("last_insert_rowid")
        if raw_id is None:
            logger.warning("[%s] Insert succeeded but couldn't get ID", operation_id)
            return None

        try:
            last_id = int(raw_id)
        except (ValueError, TypeError) as exc:
            logger.warning("[%s] Error converting ID to integer: %s", operation_id, exc)
            return None

        logger.info("[%s] Record inserted with ID: %s", operation_id, last_id)
        return last_id
    except Exception as exc:
        logger.error("[%s] Error inserting record: %s", operation_id, exc)
        return None


def update_record(table: str, data: Dict[str, Any], condition: str,
                  condition_params: List[Dict[str, Any]], operation_id: Optional[str] = None) -> bool:
    """
    Update a record in a table.

    Args:
        table (str): The table to update.
        data (Dict[str, Any]): Column name to new value.
        condition (str): The condition for the update (e.g., "id = ?").
        condition_params (List[Dict[str, Any]]): The parameters for the
            condition, as typed dictionaries (see :func:`encode_value`).
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None (one is generated).

    Returns:
        bool: True if at least one row was updated, False otherwise
        (including on any error or when ``data`` is empty).
    """
    if operation_id is None:
        operation_id = _new_operation_id("update")

    logger.debug("[%s] Updating record in %s", operation_id, table)

    if not data:
        # "UPDATE t SET  WHERE ..." is not valid SQL; nothing to do.
        logger.warning("[%s] No columns to update in %s", operation_id, table)
        return False

    # Prepare the SQL and parameters: SET values first, then the condition's.
    set_clause = ", ".join(f"{column} = ?" for column in data)
    sql = f"UPDATE {table} SET {set_clause} WHERE {condition}"
    params = [encode_value(value) for value in data.values()]
    params.extend(condition_params or [])

    try:
        result = execute_query(sql, params, operation_id)
        stmt_result = _statement_result(result, operation_id, "Update")
        if stmt_result is None:
            return False

        affected_rows = stmt_result["affected_row_count"]
        logger.debug("[%s] Updated %s rows", operation_id, affected_rows)
        return affected_rows > 0
    except Exception as exc:
        logger.error("[%s] Error updating record: %s", operation_id, exc)
        return False


def delete_record(table: str, condition: str, condition_params: List[Dict[str, Any]],
                  operation_id: Optional[str] = None) -> bool:
    """
    Delete a record from a table.

    Args:
        table (str): The table to delete from.
        condition (str): The condition for the delete (e.g., "id = ?").
        condition_params (List[Dict[str, Any]]): The parameters for the
            condition, as typed dictionaries (see :func:`encode_value`).
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None (one is generated).

    Returns:
        bool: True if at least one row was deleted, False otherwise
        (including on any error).
    """
    if operation_id is None:
        operation_id = _new_operation_id("delete")

    logger.info("[%s] Deleting record from %s", operation_id, table)

    sql = f"DELETE FROM {table} WHERE {condition}"

    try:
        result = execute_query(sql, condition_params, operation_id)
        stmt_result = _statement_result(result, operation_id, "Delete")
        if stmt_result is None:
            return False

        affected_rows = stmt_result["affected_row_count"]
        logger.info("[%s] Deleted %s rows", operation_id, affected_rows)
        return affected_rows > 0
    except Exception as exc:
        logger.error("[%s] Error deleting record: %s", operation_id, exc)
        return False


def select_records(table: str, columns: Optional[List[str]] = None, condition: Optional[str] = None,
                   condition_params: Optional[List[Dict[str, Any]]] = None, limit: Optional[int] = None,
                   offset: Optional[int] = None, order_by: Optional[str] = None,
                   operation_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Select records from a table.

    Args:
        table (str): The table to select from.
        columns (List[str], optional): The columns to select. Defaults to None (all columns).
        condition (str, optional): The condition for the select. Defaults to None.
        condition_params (List[Dict[str, Any]], optional): The parameters for
            the condition, as typed dictionaries. Defaults to None.
        limit (int, optional): The maximum number of records to return. A
            falsy value (None or 0) means no limit. Defaults to None.
        offset (int, optional): The number of records to skip. Defaults to None.
        order_by (str, optional): The order by clause. Defaults to None.
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None (one is generated).

    Returns:
        List[Dict[str, Any]]: The selected records, one dict per row keyed by
        column name. Empty on any error.
    """
    if operation_id is None:
        operation_id = _new_operation_id("select")

    logger.debug("[%s] Selecting records from %s", operation_id, table)

    try:
        # Prepare the SQL
        column_list = ", ".join(columns) if columns else "*"
        sql = f"SELECT {column_list} FROM {table}"
        if condition:
            sql += f" WHERE {condition}"
        if order_by:
            sql += f" ORDER BY {order_by}"
        # int() keeps anything but a number out of the SQL text.
        if limit:
            sql += f" LIMIT {int(limit)}"
        elif offset:
            # SQLite only accepts OFFSET after LIMIT; -1 means "no limit".
            sql += " LIMIT -1"
        if offset:
            sql += f" OFFSET {int(offset)}"

        result = execute_query(sql, condition_params, operation_id)
        stmt_result = _statement_result(result, operation_id, "Select")
        if stmt_result is None:
            return []

        rows = stmt_result.get("rows")
        if rows is None:
            logger.debug("[%s] No records found", operation_id)
            return []

        result_cols = stmt_result["cols"]
        records = [_row_to_record(result_cols, row, operation_id) for row in rows]
        logger.debug("[%s] Selected %s records", operation_id, len(records))
        return records
    except Exception as exc:
        logger.error("[%s] Error selecting records: %s", operation_id, exc)
        return []


def get_record_by_id(table: str, id: int, columns: Optional[List[str]] = None,
                     operation_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Get a record by ID.

    Args:
        table (str): The table to select from; it must have an ``id`` column.
        id (int): The ID of the record.
        columns (List[str], optional): The columns to select. Defaults to None (all columns).
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None (one is generated).

    Returns:
        Optional[Dict[str, Any]]: The record, or None if not found or on error.
    """
    if operation_id is None:
        operation_id = _new_operation_id("get_by_id")

    # select_records reports failures as an empty list instead of raising, so
    # no separate fallback query is needed here.
    condition_params = [{"type": "integer", "value": str(id)}]
    records = select_records(table, columns, "id = ?", condition_params,
                             limit=1, operation_id=operation_id)

    if records:
        # Row contents are deliberately kept out of the log.
        logger.debug("[%s] Found record by ID %s", operation_id, id)
        return records[0]

    logger.warning("[%s] No record found with ID %s in table %s", operation_id, id, table)
    return None


def count_records(table: str, condition: Optional[str] = None,
                  condition_params: Optional[List[Dict[str, Any]]] = None,
                  operation_id: Optional[str] = None) -> int:
    """
    Count records in a table.

    Args:
        table (str): The table to count records in.
        condition (str, optional): The condition for the count. Defaults to None.
        condition_params (List[Dict[str, Any]], optional): The parameters for
            the condition, as typed dictionaries. Defaults to None.
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None (one is generated).

    Returns:
        int: The number of records. 0 is also returned on any error, so a
        zero result cannot be distinguished from a failure.
    """
    if operation_id is None:
        operation_id = _new_operation_id("count")

    logger.info("[%s] Counting records in %s", operation_id, table)

    sql = f"SELECT COUNT(*) FROM {table}"
    if condition:
        sql += f" WHERE {condition}"

    try:
        result = execute_query(sql, condition_params, operation_id)
        stmt_result = _statement_result(result, operation_id, "Count")
        if stmt_result is None:
            return 0

        rows = stmt_result.get("rows")
        if not rows:
            logger.warning("[%s] Count failed", operation_id)
            return 0

        row = rows[0]
        if not row:
            logger.warning("[%s] Empty row in count result", operation_id)
            return 0

        cell = row[0]
        if not (isinstance(cell, dict) and "value" in cell):
            logger.warning("[%s] Unexpected cell format in count: %s", operation_id, cell)
            return 0

        try:
            count = int(cell["value"])
        except (ValueError, TypeError) as exc:
            logger.warning("[%s] Error converting count to integer: %s", operation_id, exc)
            return 0

        logger.info("[%s] Counted %s records", operation_id, count)
        return count
    except Exception as exc:
        logger.error("[%s] Error counting records: %s", operation_id, exc)
        return 0


def record_exists(table: str, condition: str, condition_params: List[Dict[str, Any]],
                  operation_id: Optional[str] = None) -> bool:
    """
    Check if a record exists in a table.

    Args:
        table (str): The table to check.
        condition (str): The condition for the check.
        condition_params (List[Dict[str, Any]]): The parameters for the condition.
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None.

    Returns:
        bool: True if the record exists, False otherwise (including when the
        underlying count fails).
    """
    return count_records(table, condition, condition_params, operation_id) > 0


def create_table_if_not_exists(table: str, schema: str, operation_id: Optional[str] = None) -> bool:
    """
    Create a table if it doesn't exist.

    Args:
        table (str): The table to create.
        schema (str): The column definitions placed between the parentheses.
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None (one is generated).

    Returns:
        bool: True if the table was created or already exists, False otherwise.
    """
    if operation_id is None:
        operation_id = _new_operation_id("create_table")

    logger.info("[%s] Creating table %s if not exists", operation_id, table)

    sql = f"CREATE TABLE IF NOT EXISTS {table} ({schema})"

    try:
        result = execute_query(sql, operation_id=operation_id)
        if _statement_result(result, operation_id, "Create table") is None:
            return False

        logger.info("[%s] Table %s created or already exists", operation_id, table)
        return True
    except Exception as exc:
        logger.error("[%s] Error creating table: %s", operation_id, exc)
        return False


def execute_transaction(queries: List[Tuple[str, List[Dict[str, Any]]]],
                        operation_id: Optional[str] = None) -> bool:
    """
    Execute a transaction with multiple queries.

    The statements are sent as one conditional batch: every step runs only if
    the previous one succeeded, COMMIT runs only if all statements succeeded,
    and ROLLBACK runs whenever COMMIT did not succeed. Plain pipeline requests
    are independent of each other, so sending BEGIN/.../COMMIT that way would
    still run COMMIT after a failed statement and persist a partial change.

    Args:
        queries (List[Tuple[str, List[Dict[str, Any]]]]): The queries to
            execute, each as ``(sql, params)``; ``params`` may be None.
        operation_id (str, optional): An identifier used to correlate log
            lines. Defaults to None (one is generated).

    Returns:
        bool: True if the transaction was committed, False otherwise.
    """
    if operation_id is None:
        operation_id = _new_operation_id("transaction")

    logger.info("[%s] Executing transaction with %s queries", operation_id, len(queries))

    # Step 0 is BEGIN; step i (i >= 1) is the i-th query.
    steps: List[Dict[str, Any]] = [{"stmt": {"sql": "BEGIN"}}]
    for sql, params in queries:
        steps.append({
            "condition": {"type": "ok", "step": len(steps) - 1},
            "stmt": {"sql": sql, "args": params or []},
        })

    commit_step = len(steps)
    steps.append({
        "condition": {"type": "ok", "step": commit_step - 1},
        "stmt": {"sql": "COMMIT"},
    })
    # Runs on the same stream as the transaction, so it can actually undo it.
    steps.append({
        "condition": {"type": "not", "cond": {"type": "ok", "step": commit_step}},
        "stmt": {"sql": "ROLLBACK"},
    })

    try:
        result = execute_pipeline([{"type": "batch", "batch": {"steps": steps}}], operation_id)

        results = result.get("results") or []
        if not results:
            logger.error("[%s] Transaction returned no results", operation_id)
            return False

        first = results[0]
        if first.get("type") != "ok":
            error_msg = first.get("error", {}).get("message", "Unknown error")
            logger.error("[%s] Transaction error: %s", operation_id, error_msg)
            return False

        batch_result = first.get("response", {}).get("result", {})
        step_results = batch_result.get("step_results") or []
        step_errors = batch_result.get("step_errors") or []

        # Report the first failing step; errors of the trailing ROLLBACK step
        # (e.g. when BEGIN itself failed) are not interesting.
        for index, error in enumerate(step_errors[:commit_step + 1]):
            if error:
                error_msg = error.get("message", "Unknown error")
                logger.error("[%s] Transaction error in query %s: %s", operation_id, index, error_msg)
                return False

        committed = commit_step < len(step_results) and step_results[commit_step] is not None
        if not committed:
            logger.error("[%s] Transaction was not committed", operation_id)
            return False

        logger.info("[%s] Transaction executed successfully", operation_id)
        return True
    except Exception as exc:
        # The stream is closed with the request, which discards any open
        # transaction on the server side.
        logger.error("[%s] Error executing transaction: %s", operation_id, exc)
        return False