Spaces:
Running
Running
File size: 9,315 Bytes
6d11371 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
"""
API utilities for the Fake News Detector application.
This module provides utilities for handling API calls, rate limiting,
error handling, and exponential backoff for retrying failed requests.
"""
import time
import functools
import random
import logging
import requests
from datetime import datetime, timedelta
from collections import deque
from config import RATE_LIMITS, ERROR_BACKOFF
# Module-wide logger shared by every helper in this file; the name is the
# application-level logger key, not __name__, so all modules log to one channel.
logger = logging.getLogger("misinformation_detector")
class RateLimiter:
    """
    Sliding-window rate limiter for API calls, with per-API limits.

    Each configured API gets a deque of recent request timestamps. A request
    is allowed only while the number of timestamps inside the configured
    period stays below the configured request count. The class also supplies
    exponential-backoff delays (with jitter) for retry loops.
    """

    def __init__(self):
        """Build per-API request windows and backoff settings from config."""
        # One window per configured API: allowed request count, window
        # length in seconds, and a deque of recent request timestamps.
        self.limits = {
            name: {
                "requests": cfg["requests"],
                "period": cfg["period"],
                "timestamps": deque(),
            }
            for name, cfg in RATE_LIMITS.items()
        }
        # Retry/backoff knobs shared by all APIs
        self.max_retries = ERROR_BACKOFF["max_retries"]
        self.initial_backoff = ERROR_BACKOFF["initial_backoff"]
        self.backoff_factor = ERROR_BACKOFF["backoff_factor"]

    def check_and_update(self, api_name):
        """
        Record a request timestamp if the window allows another request.

        Args:
            api_name (str): Name of the API to check

        Returns:
            tuple: (allowed, wait_time)
                - allowed (bool): Whether the request is allowed
                - wait_time (float): Seconds to wait when not allowed
        """
        bucket = self.limits.get(api_name)
        if bucket is None:
            # APIs with no configured limit are never throttled
            return True, 0

        current = datetime.now()
        window_start = current - timedelta(seconds=bucket["period"])

        # Evict timestamps that have aged out of the window
        stamps = bucket["timestamps"]
        while stamps and stamps[0] < window_start:
            stamps.popleft()

        if len(stamps) >= bucket["requests"]:
            # Window is full: caller must wait until the oldest entry expires
            oldest_expiry = stamps[0] + timedelta(seconds=bucket["period"])
            delay = (oldest_expiry - current).total_seconds()
            return False, max(0, delay)

        # Room left in the window: record this request and allow it
        stamps.append(current)
        return True, 0

    def wait_if_needed(self, api_name):
        """
        Sleep until the rate limit for api_name permits a request.

        Args:
            api_name (str): Name of the API to check

        Returns:
            bool: True if we had to wait, False otherwise
        """
        permitted, delay = self.check_and_update(api_name)
        if permitted:
            return False
        logger.info(f"Rate limit reached for {api_name}. Waiting {delay:.2f} seconds...")
        # Small buffer so we don't re-hit the limit on a clock-edge race
        time.sleep(delay + 0.1)
        return True

    def get_backoff_time(self, attempt):
        """
        Compute an exponential backoff delay with random jitter.

        Args:
            attempt (int): Current attempt number (0-based)

        Returns:
            float: Seconds to wait before the next attempt
        """
        base = self.initial_backoff * (self.backoff_factor ** attempt)
        # Up to 10% jitter de-synchronizes concurrent retriers
        # (thundering-herd avoidance)
        return base + random.uniform(0, 0.1 * base)
# Module-level singleton shared by every function decorated with
# api_error_handler below, so all callers draw from the same windows.
rate_limiter = RateLimiter()
# API Error Handler decorator
def api_error_handler(api_name):
    """
    Decorator for API calls with error handling and rate limiting.

    This decorator applies rate limiting before each call, retries
    transient failures (HTTP 429, 5xx, connection errors, timeouts)
    with exponential backoff, and returns None for unrecoverable
    errors (403, 404) or when all retries are exhausted.

    Args:
        api_name (str): Name of the API being called

    Returns:
        callable: Decorated function
    """
    def decorator(func):
        def _backoff_and_continue(attempt, wait_time=None):
            """Sleep before the next retry; return False when no retries remain
            (so the caller stops instead of sleeping pointlessly)."""
            if attempt >= rate_limiter.max_retries - 1:
                return False
            if wait_time is None:
                wait_time = rate_limiter.get_backoff_time(attempt)
            logger.info(f"Waiting {wait_time} seconds before retry...")
            time.sleep(wait_time)
            return True

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                # Apply rate limiting - make sure rate_limiter exists and has the method
                if hasattr(rate_limiter, 'wait_if_needed'):
                    rate_limiter.wait_if_needed(api_name)

                # Track retries
                for attempt in range(rate_limiter.max_retries):
                    try:
                        return func(*args, **kwargs)
                    except requests.exceptions.HTTPError as e:
                        # HTTPError.response can be None (e.g. a manually raised
                        # error), so guard the attribute access, not just hasattr.
                        response = getattr(e, 'response', None)
                        status_code = response.status_code if response is not None else 0

                        # Handle specific HTTP errors
                        if status_code == 429:  # Too Many Requests
                            logger.warning(f"{api_name} rate limit exceeded (429). Attempt {attempt+1}/{rate_limiter.max_retries}")
                            # Honor Retry-After only when it is a plain number of
                            # seconds; HTTP-date forms fall back to exponential backoff.
                            retry_after = response.headers.get('Retry-After')
                            wait_time = int(retry_after) if retry_after and retry_after.isdigit() else None
                            if not _backoff_and_continue(attempt, wait_time):
                                break
                        elif status_code >= 500:  # Server errors
                            logger.warning(f"{api_name} server error ({status_code}). Attempt {attempt+1}/{rate_limiter.max_retries}")
                            if not _backoff_and_continue(attempt):
                                break
                        elif status_code == 403:  # Forbidden - likely API key issue
                            logger.error(f"{api_name} access forbidden (403). Check API key.")
                            return None  # Don't retry on auth errors
                        elif status_code == 404:  # Not Found
                            logger.warning(f"{api_name} resource not found (404).")
                            return None  # Don't retry on resource not found
                        else:
                            logger.error(f"{api_name} HTTP error: {e}")
                            if not _backoff_and_continue(attempt):
                                return None
                    except requests.exceptions.ConnectionError as e:
                        logger.error(f"{api_name} connection error: {e}")
                        if not _backoff_and_continue(attempt):
                            return None
                    except requests.exceptions.Timeout as e:
                        logger.error(f"{api_name} timeout error: {e}")
                        if not _backoff_and_continue(attempt):
                            return None
                    except Exception as e:
                        logger.error(f"{api_name} unexpected error: {str(e)}")
                        if not _backoff_and_continue(attempt):
                            return None

                # If we've exhausted all retries
                logger.error(f"{api_name} call failed after {rate_limiter.max_retries} attempts")
                return None
            except Exception as e:
                # Catch any unexpected errors in the decorator itself
                logger.error(f"{api_name} decorator error: {str(e)}")
                return None
        return wrapper
    return decorator
def safe_json_parse(response, api_name):
    """
    Decode a response body as JSON, returning {} instead of raising.

    Args:
        response (requests.Response): Response object whose body to decode
        api_name (str): Name of the API, used only in log messages

    Returns:
        dict: Parsed JSON payload, or an empty dict when decoding fails
    """
    try:
        payload = response.json()
    except ValueError as e:
        # Log the failure plus a truncated body sample for debugging
        logger.error(f"Error parsing {api_name} JSON response: {e}")
        logger.debug(f"Response content: {response.text[:500]}...")
        return {}
    return payload