Spaces:
Sleeping
Sleeping
# app/error_handler.py | |
import logging | |
import traceback | |
import time | |
from typing import Dict, List, Optional, Tuple, Union, Any, Callable | |
from datetime import datetime | |
import functools | |
import json | |
class ErrorHandler:
    """Record component errors, run fallbacks and manage circuit breakers.

    All per-operation state is keyed by ``"<component_name>:<operation_name>"``.

    Attributes are plain dictionaries:

    * ``error_registry``   -- error id -> recorded error details.
    * ``fallback_registry`` -- key -> fallback callable taking the context.
    * ``error_counts``     -- key -> errors counted since the last reset.
    * ``circuit_breakers`` -- key -> circuit state dict (created on first trip).
    """

    def __init__(self, metrics_calculator=None):
        """Initialize the ErrorHandler with optional metrics calculator."""
        self.logger = logging.getLogger(__name__)
        self.metrics_calculator = metrics_calculator
        # Error registry
        self.error_registry = {}
        # Fallback registry
        self.fallback_registry = {}
        # Error count tracking
        self.error_counts = {}
        # Circuit breaker states
        self.circuit_breakers = {}
        # Monotonic counter that keeps error ids unique even when identical
        # errors are reported within the same second.
        self._error_sequence = 0

    @staticmethod
    def _default_circuit() -> Dict[str, Any]:
        """Return a fresh circuit description in its initial (closed) state."""
        return {
            "state": "closed",  # closed, open, half-open
            "failure_threshold": 5,  # Number of failures before opening
            "reset_timeout": 300,  # Seconds before trying again (5 minutes)
            "last_failure_time": None,
            "failure_count": 0,
        }

    @staticmethod
    def _format_traceback(error: Any) -> str:
        """Render the traceback that belongs to ``error`` itself.

        ``traceback.format_exc()`` only describes the exception currently
        being handled, so it produces ``"NoneType: None"`` when the handler
        is invoked outside an ``except`` block. Formatting the passed
        exception works in both situations.
        """
        if isinstance(error, BaseException):
            return "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            )
        # Not a real exception object: fall back to the active exception.
        return traceback.format_exc()

    def register_fallback(self, component_name: str, operation_name: str,
                          fallback_function: Callable) -> None:
        """
        Register a fallback function for a specific component and operation.

        The fallback is called with the error context (a dict, or None when
        the caller supplied no context) and its return value becomes the
        result reported by ``handle_error``. A later registration for the
        same component/operation replaces the earlier one.
        """
        key = f"{component_name}:{operation_name}"
        self.fallback_registry[key] = fallback_function
        self.logger.info("Registered fallback for %s", key)

    def handle_error(self, component_name: str, operation_name: str, error: Exception,
                     context: Optional[Dict[str, Any]] = None) -> Tuple[bool, Any]:
        """
        Handle an error from a component.

        The error is logged, stored in ``error_registry``, counted, and the
        circuit breaker for the operation is tripped once the count reaches
        its failure threshold. Finally the registered fallback (if any) runs.

        Returns (handled, result) where handled is a boolean indicating if the error was handled,
        and result is the result from the fallback function if available.
        """
        key = f"{component_name}:{operation_name}"
        # The sequence suffix prevents same-second, same-message errors from
        # overwriting each other in the registry.
        self._error_sequence += 1
        error_id = (
            f"error_{int(time.time())}_{hash(str(error)) % 10000}"
            f"_{self._error_sequence}"
        )

        # Log the error
        self.logger.error("Error in %s: %s", key, error)

        # Record error details
        self.error_registry[error_id] = {
            "component": component_name,
            "operation": operation_name,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "traceback": self._format_traceback(error),
            "timestamp": datetime.now().isoformat(),
            "context": context or {},
        }

        # Update error counts
        self.error_counts[key] = self.error_counts.get(key, 0) + 1

        # Check if we need to trip the circuit breaker
        if self._should_trip_circuit_breaker(key):
            self._trip_circuit_breaker(key)

        # Check if circuit is open (preventing further calls)
        if self._is_circuit_open(key):
            self.logger.warning("Circuit breaker open for %s, using fallback", key)

        # _use_fallback reports (False, None) itself when nothing is registered.
        return self._use_fallback(key, context)

    def _use_fallback(self, key: str, context: Optional[Dict[str, Any]]) -> Tuple[bool, Any]:
        """Use a registered fallback function.

        Returns (True, fallback_result) on success and (False, None) when no
        fallback is registered for ``key`` or the fallback itself raises.
        """
        fallback_func = self.fallback_registry.get(key)
        if fallback_func is None:
            return False, None
        try:
            result = fallback_func(context)
        except Exception as e:
            # Deliberate best-effort: a broken fallback must not mask the
            # original error, so report "not handled" instead of raising.
            self.logger.error("Fallback for %s also failed: %s", key, e)
            return False, None
        self.logger.info("Used fallback for %s", key)
        # TODO: Add fallback usage to metrics (when metrics_calculator is set)
        return True, result

    def _should_trip_circuit_breaker(self, key: str) -> bool:
        """Determine if we should trip the circuit breaker for a component."""
        circuit = self.circuit_breakers.get(key) or self._default_circuit()
        # If already open, don't need to trip
        if circuit["state"] == "open":
            return False
        # Check if we've hit the threshold
        return self.error_counts.get(key, 0) >= circuit["failure_threshold"]

    def _trip_circuit_breaker(self, key: str) -> None:
        """Trip the circuit breaker for a component."""
        circuit = self.circuit_breakers.get(key) or self._default_circuit()
        # Update circuit state
        circuit["state"] = "open"
        circuit["last_failure_time"] = datetime.now().isoformat()
        circuit["failure_count"] = self.error_counts.get(key, 0)
        # Store updated circuit
        self.circuit_breakers[key] = circuit
        self.logger.warning("Circuit breaker tripped for %s", key)

    def _is_circuit_open(self, key: str) -> bool:
        """Check if the circuit breaker is open for a component.

        An open circuit whose ``reset_timeout`` has elapsed is moved to the
        half-open state as a side effect and reported as not open.
        """
        circuit = self.circuit_breakers.get(key)
        # Unknown, closed and half-open circuits all allow the call.
        if circuit is None or circuit["state"] != "open":
            return False

        # Check if we've waited long enough to try again
        last_failure_time = datetime.fromisoformat(circuit["last_failure_time"])
        seconds_since_failure = (datetime.now() - last_failure_time).total_seconds()
        if seconds_since_failure >= circuit["reset_timeout"]:
            # Time to try again, set to half-open
            circuit["state"] = "half-open"
            self.logger.info(
                "Circuit breaker for %s switched to half-open state", key
            )
            return False
        # Still in timeout period
        return True

    def _close_circuit(self, key: str) -> None:
        """Close the circuit for ``key`` and clear its failure counters."""
        circuit = self.circuit_breakers[key]
        circuit["state"] = "closed"
        circuit["failure_count"] = 0
        # Without clearing the count, the very next error would still be at
        # or above the threshold and would re-trip the breaker immediately.
        self.error_counts[key] = 0

    def reset_circuit_breaker(self, key: str) -> bool:
        """
        Manually reset a circuit breaker.

        ``key`` has the form ``"<component_name>:<operation_name>"``.
        Returns True if the circuit was reset, False if it wasn't found.
        """
        if key not in self.circuit_breakers:
            return False
        self._close_circuit(key)
        self.logger.info("Circuit breaker for %s manually reset", key)
        return True

    def record_success(self, component_name: str, operation_name: str) -> None:
        """
        Record a successful operation, which may reset circuit breakers.

        Only a half-open circuit is affected: it is closed and its error
        count is cleared, matching what ``reset_circuit_breaker`` does.
        """
        key = f"{component_name}:{operation_name}"
        circuit = self.circuit_breakers.get(key)
        # If the circuit is half-open and we get a success, close it
        if circuit is not None and circuit["state"] == "half-open":
            self._close_circuit(key)
            self.logger.info(
                "Circuit breaker for %s closed after successful operation", key
            )

    def get_error_report(self) -> Dict[str, Any]:
        """Generate a report of errors and circuit breaker states.

        The report holds the total error count, a copy of the per-key
        counts, the state of every known circuit and a summary of the ten
        most recently recorded errors.
        """
        recent = list(self.error_registry.items())[-10:]  # Last 10 errors
        return {
            "total_errors": sum(self.error_counts.values()),
            # Copy so callers cannot mutate the handler's internal counters.
            "error_counts_by_component": dict(self.error_counts),
            "circuit_breaker_states": {
                key: circuit["state"]
                for key, circuit in self.circuit_breakers.items()
            },
            "recent_errors": {
                error_id: {
                    "component": details["component"],
                    "operation": details["operation"],
                    "error_type": details["error_type"],
                    "timestamp": details["timestamp"],
                }
                for error_id, details in recent
            },
        }
# Decorator for error handling
def with_error_handling(component_name: str, operation_name: str, error_handler=None):
    """
    Decorator to add error handling to functions.

    The handler is resolved on every call: an explicitly passed
    ``error_handler`` is used when given; otherwise the ``error_handler``
    attribute of the first positional argument (typically ``self``) is used.
    When neither yields a handler the wrapped function runs unguarded.

    On success the handler's ``record_success`` is called. On an exception
    the handler's ``handle_error`` receives the stringified call arguments
    as context; its fallback result is returned if the error was handled,
    otherwise the original exception is re-raised.
    """
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            # A distinct local name is required: assigning to ``error_handler``
            # here would shadow the decorator argument and always discard it.
            handler = error_handler
            if handler is None and args and hasattr(args[0], '__dict__'):
                handler = getattr(args[0], 'error_handler', None)

            if handler is None:
                # No error handler, just execute the function
                return func(*args, **kwargs)

            try:
                # Execute the function
                result = func(*args, **kwargs)
            except Exception as e:
                # Create context from args and kwargs
                context = {
                    "args": [str(arg) for arg in args],
                    "kwargs": {k: str(v) for k, v in kwargs.items()}
                }
                # Handle the error
                handled, fallback_result = handler.handle_error(
                    component_name, operation_name, e, context)
                if handled:
                    return fallback_result
                # Re-raise the exception if not handled
                raise
            # Record success outside the try block so a bookkeeping failure
            # is not mistaken for a failure of the wrapped function.
            handler.record_success(component_name, operation_name)
            return result
        return wrapper
    return decorator