# app/error_handler.py
import functools
import json
import logging
import time
import traceback
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple, Union


class ErrorHandler:
    """Record component errors, dispatch fallbacks and manage circuit breakers.

    Every piece of state is keyed by ``"<component_name>:<operation_name>"``:

    * ``error_registry``   -- error_id -> details dict for each handled error
    * ``fallback_registry`` -- key -> fallback callable taking the context dict
    * ``error_counts``     -- key -> failures since the last reset
    * ``circuit_breakers`` -- key -> circuit dict (``state`` is one of
      ``"closed"``, ``"open"``, ``"half-open"``)
    """

    def __init__(self, metrics_calculator=None):
        """Initialize the ErrorHandler with optional metrics calculator."""
        self.logger = logging.getLogger(__name__)
        self.metrics_calculator = metrics_calculator

        # Error registry
        self.error_registry = {}
        # Fallback registry
        self.fallback_registry = {}
        # Error count tracking
        self.error_counts = {}
        # Circuit breaker states
        self.circuit_breakers = {}
        # Monotonic sequence appended to error ids so that two errors with the
        # same message in the same second do not overwrite each other.
        self._error_sequence = 0

    @staticmethod
    def _new_circuit() -> Dict[str, Any]:
        """Return a fresh circuit dict in the closed state with default limits."""
        return {
            "state": "closed",  # closed, open, half-open
            "failure_threshold": 5,  # Number of failures before opening
            "reset_timeout": 300,  # Seconds before trying again (5 minutes)
            "last_failure_time": None,
            "failure_count": 0,
        }

    def register_fallback(self, component_name: str, operation_name: str,
                          fallback_function: Callable) -> None:
        """Register a fallback function for a specific component and operation.

        The fallback is later called with a single argument: the ``context``
        that was passed to :meth:`handle_error` (which may be ``None``).
        Registering again for the same component/operation replaces the
        previous fallback.
        """
        key = f"{component_name}:{operation_name}"
        self.fallback_registry[key] = fallback_function
        self.logger.info(f"Registered fallback for {key}")

    def handle_error(self, component_name: str, operation_name: str,
                     error: Exception,
                     context: Optional[Dict[str, Any]] = None) -> Tuple[bool, Any]:
        """Handle an error from a component.

        Logs and records the error, updates the failure count, trips the
        circuit breaker when the threshold is reached, and finally tries the
        registered fallback (if any).

        Returns:
            ``(handled, result)`` where ``handled`` tells whether a fallback
            ran successfully and ``result`` is that fallback's return value
            (``None`` when not handled).
        """
        key = f"{component_name}:{operation_name}"

        self._error_sequence += 1
        error_id = (
            f"error_{int(time.time())}_{hash(str(error)) % 10000}"
            f"_{self._error_sequence}"
        )

        # Log the error
        self.logger.error(f"Error in {key}: {str(error)}")

        # Format the traceback from the exception object itself so the record
        # is correct even when this method is called outside an ``except``
        # block (``traceback.format_exc()`` would yield "NoneType: None").
        formatted_traceback = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )

        # Record error details
        self.error_registry[error_id] = {
            "component": component_name,
            "operation": operation_name,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "traceback": formatted_traceback,
            "timestamp": datetime.now().isoformat(),
            "context": context or {},
        }

        # Update error counts
        self.error_counts[key] = self.error_counts.get(key, 0) + 1

        # Check if we need to trip the circuit breaker
        if self._should_trip_circuit_breaker(key):
            self._trip_circuit_breaker(key)

        # Check if circuit is open (preventing further calls)
        if self._is_circuit_open(key):
            self.logger.warning(f"Circuit breaker open for {key}, using fallback")

        # _use_fallback returns (False, None) when no fallback is registered.
        return self._use_fallback(key, context)

    def _use_fallback(self, key: str,
                      context: Optional[Dict[str, Any]]) -> Tuple[bool, Any]:
        """Use a registered fallback function.

        Returns ``(True, result)`` when the fallback ran without raising, and
        ``(False, None)`` when none is registered or the fallback itself
        failed (the failure is logged, not propagated).
        """
        fallback_func = self.fallback_registry.get(key)
        if fallback_func is None:
            return False, None

        try:
            result = fallback_func(context)
        except Exception as e:
            # Deliberate best-effort: a broken fallback must not mask the
            # original error, so report "not handled" instead of raising.
            self.logger.error(f"Fallback for {key} also failed: {str(e)}")
            return False, None

        self.logger.info(f"Used fallback for {key}")

        # Track fallback usage if metrics calculator is available
        if self.metrics_calculator:
            # TODO: Add fallback usage to metrics
            pass

        return True, result

    def _should_trip_circuit_breaker(self, key: str) -> bool:
        """Determine if we should trip the circuit breaker for a component."""
        circuit = self.circuit_breakers.get(key) or self._new_circuit()

        # If already open, don't need to trip
        if circuit["state"] == "open":
            return False

        # A failure while probing in half-open state re-opens the circuit.
        if circuit["state"] == "half-open":
            return True

        # Check if we've hit the threshold
        return self.error_counts.get(key, 0) >= circuit["failure_threshold"]

    def _trip_circuit_breaker(self, key: str) -> None:
        """Trip the circuit breaker for a component.

        Creates the circuit with default limits if it does not exist yet,
        marks it open and stamps the current time and failure count on it.
        """
        circuit = self.circuit_breakers.get(key) or self._new_circuit()

        # Update circuit state
        circuit["state"] = "open"
        circuit["last_failure_time"] = datetime.now().isoformat()
        circuit["failure_count"] = self.error_counts.get(key, 0)

        # Store updated circuit
        self.circuit_breakers[key] = circuit

        self.logger.warning(f"Circuit breaker tripped for {key}")

    def _is_circuit_open(self, key: str) -> bool:
        """Check if the circuit breaker is open for a component.

        Side effect: an open circuit whose ``reset_timeout`` has elapsed is
        switched to ``"half-open"`` and reported as not open.
        """
        circuit = self.circuit_breakers.get(key)
        if circuit is None:
            return False

        # Closed and half-open circuits both let calls through; half-open
        # means we are testing whether the operation works again.
        if circuit["state"] != "open":
            return False

        last_failure_time = datetime.fromisoformat(circuit["last_failure_time"])
        seconds_since_failure = (datetime.now() - last_failure_time).total_seconds()

        if seconds_since_failure < circuit["reset_timeout"]:
            # Still in timeout period
            return True

        # Time to try again, set to half-open
        circuit["state"] = "half-open"
        self.logger.info(f"Circuit breaker for {key} switched to half-open state")
        return False

    def reset_circuit_breaker(self, key: str) -> bool:
        """Manually reset a circuit breaker.

        Returns True if the circuit was reset, False if it wasn't found.
        """
        circuit = self.circuit_breakers.get(key)
        if circuit is None:
            return False

        # Reset circuit to closed state
        circuit["state"] = "closed"
        circuit["failure_count"] = 0

        # Also reset error count
        self.error_counts[key] = 0

        self.logger.info(f"Circuit breaker for {key} manually reset")
        return True

    def record_success(self, component_name: str, operation_name: str) -> None:
        """Record a successful operation, which may reset circuit breakers.

        Only a half-open circuit is affected: it is closed and its failure
        count is cleared.
        """
        key = f"{component_name}:{operation_name}"

        circuit = self.circuit_breakers.get(key)
        if circuit is None or circuit["state"] != "half-open":
            return

        # If the circuit is half-open and we get a success, close it
        circuit["state"] = "closed"
        circuit["failure_count"] = 0

        # Clear the running count as reset_circuit_breaker does; otherwise it
        # stays at/above the threshold and the very next failure would
        # re-open the circuit that just recovered.
        self.error_counts[key] = 0

        self.logger.info(f"Circuit breaker for {key} closed after successful operation")

    def get_error_report(self) -> Dict[str, Any]:
        """Generate a report of errors and circuit breaker states.

        Returns a dict with ``total_errors``, ``error_counts_by_component``,
        ``circuit_breaker_states`` and ``recent_errors`` (a summary of the
        last 10 entries in ``error_registry``).
        """
        recent = list(self.error_registry.items())[-10:]  # Last 10 errors
        return {
            "total_errors": sum(self.error_counts.values()),
            # Copy so callers cannot mutate the handler's internal counters.
            "error_counts_by_component": dict(self.error_counts),
            "circuit_breaker_states": {
                key: circuit["state"]
                for key, circuit in self.circuit_breakers.items()
            },
            "recent_errors": {
                error_id: {
                    "component": details["component"],
                    "operation": details["operation"],
                    "error_type": details["error_type"],
                    "timestamp": details["timestamp"],
                }
                for error_id, details in recent
            },
        }


# Decorator for error handling
def with_error_handling(component_name: str, operation_name: str, error_handler=None):
    """Decorator to add error handling to functions.

    The handler is resolved on every call: the ``error_handler`` argument is
    used when given; otherwise the ``error_handler`` attribute of the first
    positional argument (typically ``self``) is used. When neither yields a
    handler the wrapped function runs unprotected.

    On success ``record_success`` is called on the handler. On an exception
    ``handle_error`` is called with a context of stringified args/kwargs;
    the fallback result is returned if the error was handled, otherwise the
    original exception is re-raised.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Use a distinct local name: assigning to ``error_handler`` here
            # would shadow (and discard) the decorator argument.
            handler = error_handler
            if handler is None and args:
                handler = getattr(args[0], "error_handler", None)

            if handler is None:
                # No error handler, just execute the function
                return func(*args, **kwargs)

            try:
                # Execute the function
                result = func(*args, **kwargs)
            except Exception as e:
                # Create context from args and kwargs
                context = {
                    "args": [str(arg) for arg in args],
                    "kwargs": {k: str(v) for k, v in kwargs.items()},
                }

                # Handle the error
                handled, fallback_result = handler.handle_error(
                    component_name, operation_name, e, context)

                if handled:
                    return fallback_result
                # Re-raise the exception if not handled
                raise
            else:
                # Record success
                handler.record_success(component_name, operation_name)
                return result

        return wrapper
    return decorator