Spaces:
Sleeping
Sleeping
File size: 11,925 Bytes
a419c1b 7ca39e8 a419c1b 7ca39e8 a419c1b 7ca39e8 a419c1b f1777d2 a419c1b 9173e80 a419c1b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# app/error_handler.py
import logging
import traceback
import time
from typing import Dict, List, Optional, Tuple, Union, Any, Callable
from datetime import datetime
import functools
import json
class ErrorHandler:
    """Record errors, run registered fallbacks and manage circuit breakers.

    All bookkeeping is keyed by ``"<component_name>:<operation_name>"``.
    Each key has an error counter and, once tripped, a circuit-breaker
    record whose ``state`` is one of ``"closed"``, ``"open"`` or
    ``"half-open"``.
    """

    def __init__(self, metrics_calculator=None, failure_threshold: int = 5,
                 reset_timeout: float = 300,
                 max_registry_size: Optional[int] = 1000):
        """Initialize the ErrorHandler.

        Args:
            metrics_calculator: Optional object stored for future metrics
                reporting; it is not called by this class yet.
            failure_threshold: Number of recorded errors for a key at which
                its circuit breaker is tripped.
            reset_timeout: Seconds an open circuit waits before switching to
                half-open.
            max_registry_size: Maximum number of error records kept in
                ``error_registry``; the oldest are evicted first. ``None``
                disables the limit.
        """
        self.logger = logging.getLogger(__name__)
        self.metrics_calculator = metrics_calculator
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.max_registry_size = max_registry_size
        # Error registry: error_id -> details dict (insertion ordered).
        self.error_registry = {}
        # Fallback registry: key -> callable taking the context.
        self.fallback_registry = {}
        # Error count tracking: key -> number of errors since the last reset.
        self.error_counts = {}
        # Circuit breaker states: key -> circuit record dict.
        self.circuit_breakers = {}
        # Monotonic counter that keeps error ids unique within this handler.
        self._error_sequence = 0

    def register_fallback(self, component_name: str, operation_name: str,
                          fallback_function: Callable) -> None:
        """
        Register a fallback function for a specific component and operation.

        The fallback is later called with the error context as its only
        argument. Registering again for the same pair replaces the previous
        fallback.
        """
        key = f"{component_name}:{operation_name}"
        self.fallback_registry[key] = fallback_function
        self.logger.info("Registered fallback for %s", key)

    def handle_error(self, component_name: str, operation_name: str,
                     error: Exception,
                     context: Optional[Dict[str, Any]] = None) -> Tuple[bool, Any]:
        """
        Handle an error from a component.

        Records the error, updates the error count, trips the circuit
        breaker when the threshold is reached and finally tries the
        registered fallback.

        Returns (handled, result) where handled is a boolean indicating if
        the error was handled, and result is the result from the fallback
        function if available.
        """
        key = f"{component_name}:{operation_name}"
        # The sequence suffix prevents identical errors raised within the
        # same second from overwriting each other in the registry.
        self._error_sequence += 1
        error_id = (
            f"error_{int(time.time())}_{hash(str(error)) % 10000}"
            f"_{self._error_sequence}"
        )

        # Log the error
        self.logger.error("Error in %s: %s", key, error)

        # Record error details
        self.error_registry[error_id] = {
            "component": component_name,
            "operation": operation_name,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "traceback": self._format_traceback(error),
            "timestamp": datetime.now().isoformat(),
            "context": context or {},
        }
        self._prune_error_registry()

        # Update error counts
        self.error_counts[key] = self.error_counts.get(key, 0) + 1

        # Check if we need to trip the circuit breaker
        if self._should_trip_circuit_breaker(key):
            self._trip_circuit_breaker(key)

        if self._is_circuit_open(key):
            self.logger.warning(
                "Circuit breaker open for %s, using fallback", key)

        # _use_fallback reports (False, None) when nothing is registered, so
        # the open-circuit and normal paths share the same call.
        return self._use_fallback(key, context)

    @staticmethod
    def _format_traceback(error: Any) -> str:
        """Return the traceback text for ``error``.

        The traceback attached to the exception object is used so the text
        is meaningful even when the caller is not inside an ``except`` block.
        """
        if isinstance(error, BaseException):
            return "".join(traceback.format_exception(
                type(error), error, error.__traceback__))
        # Not an exception object: fall back to the active exception, if any.
        return traceback.format_exc()

    def _prune_error_registry(self) -> None:
        """Evict the oldest error records beyond ``max_registry_size``."""
        limit = self.max_registry_size
        if limit is None:
            return
        limit = max(limit, 0)
        while len(self.error_registry) > limit:
            # Dicts preserve insertion order, so the first key is the oldest.
            del self.error_registry[next(iter(self.error_registry))]

    def _new_circuit(self) -> Dict[str, Any]:
        """Build a fresh closed circuit record from this handler's settings."""
        return {
            "state": "closed",  # closed, open, half-open
            "failure_threshold": self.failure_threshold,
            "reset_timeout": self.reset_timeout,
            "last_failure_time": None,
            "failure_count": 0,
        }

    def _use_fallback(self, key: str,
                      context: Optional[Dict[str, Any]]) -> Tuple[bool, Any]:
        """Use a registered fallback function.

        Returns (True, result) when a fallback exists and succeeds, and
        (False, None) when none is registered or the fallback itself raises.
        """
        if key not in self.fallback_registry:
            return False, None
        fallback_func = self.fallback_registry[key]
        try:
            result = fallback_func(context)
        except Exception as e:
            # A failing fallback must not mask the original error path.
            self.logger.error(
                "Fallback for %s also failed: %s", key, e, exc_info=True)
            return False, None
        self.logger.info("Used fallback for %s", key)
        # TODO: Add fallback usage to metrics when a metrics calculator is
        # available (self.metrics_calculator).
        return True, result

    def _should_trip_circuit_breaker(self, key: str) -> bool:
        """Determine if we should trip the circuit breaker for a component."""
        circuit = self.circuit_breakers.get(key) or self._new_circuit()
        # If already open, don't need to trip
        if circuit["state"] == "open":
            return False
        # Check if we've hit the threshold
        return self.error_counts.get(key, 0) >= circuit["failure_threshold"]

    def _trip_circuit_breaker(self, key: str) -> None:
        """Trip the circuit breaker for a component."""
        circuit = self.circuit_breakers.get(key) or self._new_circuit()
        circuit["state"] = "open"
        circuit["last_failure_time"] = datetime.now().isoformat()
        circuit["failure_count"] = self.error_counts.get(key, 0)
        self.circuit_breakers[key] = circuit
        self.logger.warning("Circuit breaker tripped for %s", key)

    def _is_circuit_open(self, key: str) -> bool:
        """Check if the circuit breaker is open for a component.

        Side effect: an open circuit whose reset timeout has elapsed is
        switched to half-open, and False is returned.
        """
        circuit = self.circuit_breakers.get(key)
        # Unknown, closed and half-open circuits all let the call through.
        if circuit is None or circuit["state"] != "open":
            return False

        last_failure_time = datetime.fromisoformat(circuit["last_failure_time"])
        seconds_since_failure = (datetime.now() - last_failure_time).total_seconds()
        if seconds_since_failure < circuit["reset_timeout"]:
            # Still in timeout period
            return True

        # Time to try again, set to half-open
        circuit["state"] = "half-open"
        self.logger.info(
            "Circuit breaker for %s switched to half-open state", key)
        return False

    def reset_circuit_breaker(self, key: str) -> bool:
        """
        Manually reset a circuit breaker.

        Returns True if the circuit was reset, False if it wasn't found.
        """
        circuit = self.circuit_breakers.get(key)
        if circuit is None:
            return False
        # Reset circuit to closed state
        circuit["state"] = "closed"
        circuit["failure_count"] = 0
        # Also reset error count
        self.error_counts[key] = 0
        self.logger.info("Circuit breaker for %s manually reset", key)
        return True

    def record_success(self, component_name: str, operation_name: str) -> None:
        """
        Record a successful operation, which may reset circuit breakers.

        Only a half-open circuit is affected: it is closed and its error
        count cleared.
        """
        key = f"{component_name}:{operation_name}"
        circuit = self.circuit_breakers.get(key)
        # If the circuit is half-open and we get a success, close it
        if circuit is not None and circuit["state"] == "half-open":
            circuit["state"] = "closed"
            circuit["failure_count"] = 0
            # Clear the count as reset_circuit_breaker does; otherwise the
            # next single error would immediately re-trip the breaker.
            self.error_counts[key] = 0
            self.logger.info(
                "Circuit breaker for %s closed after successful operation", key)

    def get_error_report(self) -> Dict[str, Any]:
        """Generate a report of errors and circuit breaker states.

        The returned structures are copies; mutating them does not affect
        the handler.
        """
        recent = list(self.error_registry.items())[-10:]  # Last 10 errors
        return {
            "total_errors": sum(self.error_counts.values()),
            "error_counts_by_component": dict(self.error_counts),
            "circuit_breaker_states": {
                key: circuit["state"]
                for key, circuit in self.circuit_breakers.items()
            },
            "recent_errors": {
                error_id: {
                    "component": details["component"],
                    "operation": details["operation"],
                    "error_type": details["error_type"],
                    "timestamp": details["timestamp"],
                }
                for error_id, details in recent
            },
        }
# Decorator for error handling
def with_error_handling(component_name: str, operation_name: str, error_handler=None):
    """
    Decorator to add error handling to functions.

    The handler used for a call is the explicit ``error_handler`` argument
    when one is given; otherwise the ``error_handler`` attribute of the
    first positional argument (typically ``self``) is used. When neither
    yields a handler the wrapped function runs undecorated.

    On success the handler's ``record_success`` is called. On an exception
    the handler's ``handle_error`` is called with a context built from the
    stringified arguments; if it reports the error as handled its fallback
    result is returned, otherwise the original exception is re-raised.

    Args:
        component_name: Component name passed to the handler.
        operation_name: Operation name passed to the handler.
        error_handler: Optional handler object providing ``record_success``
            and ``handle_error``.
    """
    # Keep the decorator argument under a separate name so the wrapper does
    # not shadow it with a local variable.
    explicit_handler = error_handler

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            handler = explicit_handler
            if handler is None and args:
                # Fall back to the handler carried by the bound instance.
                handler = getattr(args[0], "error_handler", None)

            if handler is None:
                # No error handler, just execute the function
                return func(*args, **kwargs)

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                # Create context from args and kwargs
                context = {
                    "args": [str(arg) for arg in args],
                    "kwargs": {k: str(v) for k, v in kwargs.items()},
                }
                handled, fallback_result = handler.handle_error(
                    component_name, operation_name, e, context)
                if handled:
                    return fallback_result
                # Re-raise the exception if not handled
                raise
            else:
                # Recorded outside the try so that a bookkeeping failure is
                # not mistaken for a failure of the wrapped function.
                handler.record_success(component_name, operation_name)
                return result

        return wrapper

    return decorator
|