ai_agents_sustainable / app /error_handler.py
Chamin09's picture
Update app/error_handler.py
f1777d2 verified
# app/error_handler.py
import logging
import traceback
import time
from typing import Dict, List, Optional, Tuple, Union, Any, Callable
from datetime import datetime
import functools
import json
class ErrorHandler:
    """Track component errors, dispatch fallbacks and manage circuit breakers.

    Every piece of state is keyed by ``"<component_name>:<operation_name>"``:

    * ``error_registry``   -- error id -> recorded details of one error.
    * ``fallback_registry`` -- key -> callable invoked as ``fallback(context)``.
    * ``error_counts``     -- key -> number of errors seen since the last reset.
    * ``circuit_breakers`` -- key -> dict with ``state`` (``"closed"``,
      ``"open"`` or ``"half-open"``), ``failure_threshold``, ``reset_timeout``
      (seconds), ``last_failure_time`` (ISO string or ``None``) and
      ``failure_count``.
    """

    # Defaults used for a circuit breaker that has not been stored yet.
    DEFAULT_FAILURE_THRESHOLD = 5   # errors before the circuit opens
    DEFAULT_RESET_TIMEOUT = 300     # seconds an open circuit waits before half-open

    def __init__(self, metrics_calculator=None):
        """Initialize the ErrorHandler with optional metrics calculator."""
        self.logger = logging.getLogger(__name__)
        self.metrics_calculator = metrics_calculator
        # Error registry
        self.error_registry = {}
        # Fallback registry
        self.fallback_registry = {}
        # Error count tracking
        self.error_counts = {}
        # Circuit breaker states
        self.circuit_breakers = {}
        # Monotonic sequence number that keeps error ids unique even when the
        # same error is reported several times within one second.
        self._error_sequence = 0

    def register_fallback(self, component_name: str, operation_name: str,
                          fallback_function: Callable) -> None:
        """
        Register a fallback function for a specific component and operation.

        The function is later called with a single argument: the ``context``
        that was passed to :meth:`handle_error`. Registering again for the
        same component/operation replaces the previous fallback.
        """
        key = f"{component_name}:{operation_name}"
        self.fallback_registry[key] = fallback_function
        self.logger.info("Registered fallback for %s", key)

    def handle_error(self, component_name: str, operation_name: str, error: Exception,
                     context: Optional[Dict[str, Any]] = None) -> Tuple[bool, Any]:
        """
        Handle an error from a component.

        Records the error, updates the error count, trips the circuit breaker
        when the threshold is reached and finally tries the registered
        fallback.

        Returns (handled, result) where handled is a boolean indicating if the
        error was handled, and result is the result from the fallback function
        if available. ``(False, None)`` is returned when no fallback exists or
        the fallback itself raised.
        """
        key = f"{component_name}:{operation_name}"

        # A per-instance sequence number guarantees a distinct registry entry
        # for every reported error.
        self._error_sequence += 1
        error_id = f"error_{int(time.time())}_{self._error_sequence}"

        # Log the error
        self.logger.error("Error in %s: %s", key, error)

        # Record error details. The traceback is taken from the exception
        # object itself so it is correct even when this method is called
        # outside of an ``except`` block.
        self.error_registry[error_id] = {
            "component": component_name,
            "operation": operation_name,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "traceback": "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            ),
            "timestamp": datetime.now().isoformat(),
            "context": context or {},
        }

        # Update error counts
        self.error_counts[key] = self.error_counts.get(key, 0) + 1

        # Check if we need to trip the circuit breaker
        if self._should_trip_circuit_breaker(key):
            self._trip_circuit_breaker(key)

        # Check if circuit is open (preventing further calls)
        if self._is_circuit_open(key):
            self.logger.warning("Circuit breaker open for %s, using fallback", key)
            return self._use_fallback(key, context)

        # _use_fallback returns (False, None) when nothing is registered.
        return self._use_fallback(key, context)

    def _use_fallback(self, key: str, context: Optional[Dict[str, Any]]) -> Tuple[bool, Any]:
        """Use a registered fallback function.

        Returns ``(True, result)`` when the fallback ran, ``(False, None)``
        when none is registered for ``key`` or the fallback raised.
        """
        fallback_func = self.fallback_registry.get(key)
        if fallback_func is None:
            return False, None

        try:
            result = fallback_func(context)
        except Exception as e:
            # Deliberately broad: a failing fallback must not mask the
            # original error handling flow; it is reported as "not handled".
            self.logger.error("Fallback for %s also failed: %s", key, e)
            return False, None

        self.logger.info("Used fallback for %s", key)
        # TODO: Add fallback usage to metrics (self.metrics_calculator)
        return True, result

    def _get_circuit(self, key: str) -> Dict[str, Any]:
        """Return the stored circuit for ``key`` or a fresh closed one.

        A freshly created circuit is NOT stored; callers that mutate it are
        responsible for writing it back to ``self.circuit_breakers``.
        """
        circuit = self.circuit_breakers.get(key)
        if circuit is None:
            circuit = {
                "state": "closed",  # closed, open, half-open
                "failure_threshold": self.DEFAULT_FAILURE_THRESHOLD,
                "reset_timeout": self.DEFAULT_RESET_TIMEOUT,
                "last_failure_time": None,
                "failure_count": 0,
            }
        return circuit

    def _should_trip_circuit_breaker(self, key: str) -> bool:
        """Determine if we should trip the circuit breaker for a component."""
        circuit = self._get_circuit(key)

        # If already open, don't need to trip
        if circuit["state"] == "open":
            return False

        # Check if we've hit the threshold
        return self.error_counts.get(key, 0) >= circuit["failure_threshold"]

    def _trip_circuit_breaker(self, key: str) -> None:
        """Trip the circuit breaker for a component."""
        circuit = self._get_circuit(key)

        # Update circuit state
        circuit["state"] = "open"
        circuit["last_failure_time"] = datetime.now().isoformat()
        circuit["failure_count"] = self.error_counts.get(key, 0)

        # Store updated circuit
        self.circuit_breakers[key] = circuit
        self.logger.warning("Circuit breaker tripped for %s", key)

    def _is_circuit_open(self, key: str) -> bool:
        """Check if the circuit breaker is open for a component.

        Side effect: an open circuit whose ``reset_timeout`` has elapsed is
        switched to ``"half-open"`` and reported as not open.
        """
        circuit = self.circuit_breakers.get(key)
        if circuit is None:
            return False

        # Closed and half-open circuits both let calls through.
        if circuit["state"] != "open":
            return False

        # Circuit is open: check if we've waited long enough to try again
        last_failure_time = datetime.fromisoformat(circuit["last_failure_time"])
        seconds_since_failure = (datetime.now() - last_failure_time).total_seconds()
        if seconds_since_failure >= circuit["reset_timeout"]:
            # Time to try again, set to half-open
            circuit["state"] = "half-open"
            self.logger.info("Circuit breaker for %s switched to half-open state", key)
            return False

        # Still in timeout period
        return True

    def reset_circuit_breaker(self, key: str) -> bool:
        """
        Manually reset a circuit breaker.

        ``key`` is the ``"<component_name>:<operation_name>"`` string.
        Returns True if the circuit was reset, False if it wasn't found.
        """
        circuit = self.circuit_breakers.get(key)
        if circuit is None:
            return False

        # Reset circuit to closed state
        circuit["state"] = "closed"
        circuit["failure_count"] = 0

        # Also reset error count
        self.error_counts[key] = 0

        self.logger.info("Circuit breaker for %s manually reset", key)
        return True

    def record_success(self, component_name: str, operation_name: str) -> None:
        """
        Record a successful operation, which may reset circuit breakers.

        Only a half-open circuit is affected: it is closed and its error
        count is cleared, mirroring :meth:`reset_circuit_breaker`. Without
        clearing the count the next single failure would immediately re-trip
        the circuit because the count is still at the threshold.
        """
        key = f"{component_name}:{operation_name}"
        circuit = self.circuit_breakers.get(key)

        # If the circuit is half-open and we get a success, close it
        if circuit is not None and circuit["state"] == "half-open":
            circuit["state"] = "closed"
            circuit["failure_count"] = 0
            self.error_counts[key] = 0
            self.logger.info(
                "Circuit breaker for %s closed after successful operation", key
            )

    def get_error_report(self) -> Dict[str, Any]:
        """Generate a report of errors and circuit breaker states.

        The returned dict is a snapshot: mutating it does not affect the
        handler's internal state.
        """
        # Last 10 errors in insertion order
        recent = list(self.error_registry.items())[-10:]
        return {
            "total_errors": sum(self.error_counts.values()),
            "error_counts_by_component": dict(self.error_counts),
            "circuit_breaker_states": {
                key: circuit["state"]
                for key, circuit in self.circuit_breakers.items()
            },
            "recent_errors": {
                error_id: {
                    "component": details["component"],
                    "operation": details["operation"],
                    "error_type": details["error_type"],
                    "timestamp": details["timestamp"],
                }
                for error_id, details in recent
            },
        }
# Decorator for error handling
def with_error_handling(component_name: str, operation_name: str, error_handler=None):
    """
    Decorator to add error handling to functions.

    The handler is resolved on every call: the explicit ``error_handler``
    argument wins; otherwise the ``error_handler`` attribute of the first
    positional argument (typically ``self``) is used. When neither yields a
    handler the wrapped function is called without any error handling.

    With a handler, a successful call is reported through
    ``handler.record_success(component_name, operation_name)``. If the call
    raises, ``handler.handle_error(...)`` receives the exception together with
    a context dict holding the stringified ``args`` and ``kwargs``; the
    fallback result is returned when the error was handled, otherwise the
    original exception is re-raised.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Resolve under a new name: assigning to ``error_handler`` here
            # would make it a local and hide the decorator argument.
            handler = error_handler
            if handler is None and args:
                handler = getattr(args[0], "error_handler", None)

            if handler is None:
                # No error handler, just execute the function
                return func(*args, **kwargs)

            try:
                # Execute the function
                result = func(*args, **kwargs)
            except Exception as e:
                # Create context from args and kwargs
                context = {
                    "args": [str(arg) for arg in args],
                    "kwargs": {k: str(v) for k, v in kwargs.items()},
                }
                # Handle the error
                handled, fallback_result = handler.handle_error(
                    component_name, operation_name, e, context)
                if handled:
                    return fallback_result
                # Re-raise the exception if not handled
                raise
            else:
                # Record success outside the ``try`` so a bookkeeping failure
                # is not mistaken for a failure of the wrapped function.
                handler.record_success(component_name, operation_name)
                return result
        return wrapper
    return decorator