Spaces:
Running
Running
""" | |
API utilities for the Fake News Detector application. | |
This module provides utilities for handling API calls, rate limiting, | |
error handling, and exponential backoff for retrying failed requests. | |
""" | |
import time | |
import functools | |
import random | |
import logging | |
import requests | |
from datetime import datetime, timedelta | |
from collections import deque | |
from config import RATE_LIMITS, ERROR_BACKOFF | |
logger = logging.getLogger("misinformation_detector") | |
class RateLimiter: | |
""" | |
Rate limiter for API calls with support for different APIs. | |
This class implements a token bucket algorithm for rate limiting, | |
with support for different rate limits for different APIs. | |
It also provides exponential backoff for error handling. | |
""" | |
def __init__(self): | |
"""Initialize the rate limiter with configuration from settings.""" | |
# Store rate limits for different APIs | |
self.limits = {} | |
# Initialize limits from config | |
for api_name, limit_info in RATE_LIMITS.items(): | |
self.limits[api_name] = { | |
"requests": limit_info["requests"], | |
"period": limit_info["period"], | |
"timestamps": deque() | |
} | |
# Error backoff settings | |
self.max_retries = ERROR_BACKOFF["max_retries"] | |
self.initial_backoff = ERROR_BACKOFF["initial_backoff"] | |
self.backoff_factor = ERROR_BACKOFF["backoff_factor"] | |
def check_and_update(self, api_name): | |
""" | |
Check if request is allowed and update timestamps. | |
Args: | |
api_name (str): Name of the API to check | |
Returns: | |
tuple: (allowed, wait_time) | |
- allowed (bool): Whether the request is allowed | |
- wait_time (float): Time to wait if not allowed | |
""" | |
if api_name not in self.limits: | |
return True, 0 # Unknown API, allow by default | |
now = datetime.now() | |
limit_info = self.limits[api_name] | |
# Remove timestamps older than the period | |
cutoff = now - timedelta(seconds=limit_info["period"]) | |
while limit_info["timestamps"] and limit_info["timestamps"][0] < cutoff: | |
limit_info["timestamps"].popleft() | |
# Check if we're at the rate limit | |
if len(limit_info["timestamps"]) >= limit_info["requests"]: | |
# Calculate wait time until oldest timestamp expires | |
wait_time = (limit_info["timestamps"][0] + timedelta(seconds=limit_info["period"]) - now).total_seconds() | |
return False, max(0, wait_time) | |
# Add current timestamp and allow request | |
limit_info["timestamps"].append(now) | |
return True, 0 | |
def wait_if_needed(self, api_name): | |
""" | |
Wait if rate limit is reached. | |
Args: | |
api_name (str): Name of the API to check | |
Returns: | |
bool: True if waited, False otherwise | |
""" | |
allowed, wait_time = self.check_and_update(api_name) | |
if not allowed: | |
logger.info(f"Rate limit reached for {api_name}. Waiting {wait_time:.2f} seconds...") | |
time.sleep(wait_time + 0.1) # Add a small buffer | |
return True | |
return False | |
def get_backoff_time(self, attempt): | |
""" | |
Calculate exponential backoff time with jitter. | |
Args: | |
attempt (int): Current attempt number (0-based) | |
Returns: | |
float: Backoff time in seconds | |
""" | |
backoff = self.initial_backoff * (self.backoff_factor ** attempt) | |
# Add jitter to prevent thundering herd problem | |
jitter = random.uniform(0, 0.1 * backoff) | |
return backoff + jitter | |
# Create rate limiter instance | |
rate_limiter = RateLimiter() | |
# API Error Handler decorator | |
def api_error_handler(api_name): | |
""" | |
Decorator for API calls with error handling and rate limiting. | |
This decorator handles rate limiting, retries with exponential | |
backoff, and error handling for API calls. | |
Args: | |
api_name (str): Name of the API being called | |
Returns: | |
callable: Decorated function | |
""" | |
def decorator(func): | |
def wrapper(*args, **kwargs): | |
try: | |
# Apply rate limiting - make sure rate_limiter exists and has the method | |
if hasattr(rate_limiter, 'wait_if_needed'): | |
rate_limiter.wait_if_needed(api_name) | |
# Track retries | |
for attempt in range(rate_limiter.max_retries): | |
try: | |
return func(*args, **kwargs) | |
except requests.exceptions.HTTPError as e: | |
status_code = e.response.status_code if hasattr(e, 'response') else 0 | |
# Handle specific HTTP errors | |
if status_code == 429: # Too Many Requests | |
logger.warning(f"{api_name} rate limit exceeded (429). Attempt {attempt+1}/{rate_limiter.max_retries}") | |
# Get retry-after header or use exponential backoff | |
retry_after = e.response.headers.get('Retry-After') | |
if retry_after and retry_after.isdigit(): | |
wait_time = int(retry_after) | |
else: | |
wait_time = rate_limiter.get_backoff_time(attempt) | |
logger.info(f"Waiting {wait_time} seconds before retry...") | |
time.sleep(wait_time) | |
elif status_code >= 500: # Server errors | |
logger.warning(f"{api_name} server error ({status_code}). Attempt {attempt+1}/{rate_limiter.max_retries}") | |
time.sleep(rate_limiter.get_backoff_time(attempt)) | |
elif status_code == 403: # Forbidden - likely API key issue | |
logger.error(f"{api_name} access forbidden (403). Check API key.") | |
return None # Don't retry on auth errors | |
elif status_code == 404: # Not Found | |
logger.warning(f"{api_name} resource not found (404).") | |
return None # Don't retry on resource not found | |
else: | |
logger.error(f"{api_name} HTTP error: {e}") | |
if attempt < rate_limiter.max_retries - 1: | |
wait_time = rate_limiter.get_backoff_time(attempt) | |
logger.info(f"Waiting {wait_time} seconds before retry...") | |
time.sleep(wait_time) | |
else: | |
return None | |
except requests.exceptions.ConnectionError as e: | |
logger.error(f"{api_name} connection error: {e}") | |
if attempt < rate_limiter.max_retries - 1: | |
wait_time = rate_limiter.get_backoff_time(attempt) | |
logger.info(f"Waiting {wait_time} seconds before retry...") | |
time.sleep(wait_time) | |
else: | |
return None | |
except requests.exceptions.Timeout as e: | |
logger.error(f"{api_name} timeout error: {e}") | |
if attempt < rate_limiter.max_retries - 1: | |
wait_time = rate_limiter.get_backoff_time(attempt) | |
logger.info(f"Waiting {wait_time} seconds before retry...") | |
time.sleep(wait_time) | |
else: | |
return None | |
except Exception as e: | |
logger.error(f"{api_name} unexpected error: {str(e)}") | |
if attempt < rate_limiter.max_retries - 1: | |
wait_time = rate_limiter.get_backoff_time(attempt) | |
logger.info(f"Waiting {wait_time} seconds before retry...") | |
time.sleep(wait_time) | |
else: | |
return None | |
# If we've exhausted all retries | |
logger.error(f"{api_name} call failed after {rate_limiter.max_retries} attempts") | |
return None | |
except Exception as e: | |
# Catch any unexpected errors in the decorator itself | |
logger.error(f"{api_name} decorator error: {str(e)}") | |
return None | |
return wrapper | |
return decorator | |
def safe_json_parse(response, api_name): | |
""" | |
Safely parse JSON response with error handling. | |
Args: | |
response (requests.Response): Response object to parse | |
api_name (str): Name of the API for logging | |
Returns: | |
dict: Parsed JSON or empty dict on error | |
""" | |
try: | |
return response.json() | |
except ValueError as e: | |
logger.error(f"Error parsing {api_name} JSON response: {e}") | |
logger.debug(f"Response content: {response.text[:500]}...") | |
return {} |