askveracity / utils /api_utils.py
ankanghosh's picture
Upload 21 files
6d11371 verified
"""
API utilities for the Fake News Detector application.
This module provides utilities for handling API calls, rate limiting,
error handling, and exponential backoff for retrying failed requests.
"""
import time
import functools
import random
import logging
import requests
from datetime import datetime, timedelta
from collections import deque
from config import RATE_LIMITS, ERROR_BACKOFF
logger = logging.getLogger("misinformation_detector")
class RateLimiter:
"""
Rate limiter for API calls with support for different APIs.
This class implements a token bucket algorithm for rate limiting,
with support for different rate limits for different APIs.
It also provides exponential backoff for error handling.
"""
def __init__(self):
"""Initialize the rate limiter with configuration from settings."""
# Store rate limits for different APIs
self.limits = {}
# Initialize limits from config
for api_name, limit_info in RATE_LIMITS.items():
self.limits[api_name] = {
"requests": limit_info["requests"],
"period": limit_info["period"],
"timestamps": deque()
}
# Error backoff settings
self.max_retries = ERROR_BACKOFF["max_retries"]
self.initial_backoff = ERROR_BACKOFF["initial_backoff"]
self.backoff_factor = ERROR_BACKOFF["backoff_factor"]
def check_and_update(self, api_name):
"""
Check if request is allowed and update timestamps.
Args:
api_name (str): Name of the API to check
Returns:
tuple: (allowed, wait_time)
- allowed (bool): Whether the request is allowed
- wait_time (float): Time to wait if not allowed
"""
if api_name not in self.limits:
return True, 0 # Unknown API, allow by default
now = datetime.now()
limit_info = self.limits[api_name]
# Remove timestamps older than the period
cutoff = now - timedelta(seconds=limit_info["period"])
while limit_info["timestamps"] and limit_info["timestamps"][0] < cutoff:
limit_info["timestamps"].popleft()
# Check if we're at the rate limit
if len(limit_info["timestamps"]) >= limit_info["requests"]:
# Calculate wait time until oldest timestamp expires
wait_time = (limit_info["timestamps"][0] + timedelta(seconds=limit_info["period"]) - now).total_seconds()
return False, max(0, wait_time)
# Add current timestamp and allow request
limit_info["timestamps"].append(now)
return True, 0
def wait_if_needed(self, api_name):
"""
Wait if rate limit is reached.
Args:
api_name (str): Name of the API to check
Returns:
bool: True if waited, False otherwise
"""
allowed, wait_time = self.check_and_update(api_name)
if not allowed:
logger.info(f"Rate limit reached for {api_name}. Waiting {wait_time:.2f} seconds...")
time.sleep(wait_time + 0.1) # Add a small buffer
return True
return False
def get_backoff_time(self, attempt):
"""
Calculate exponential backoff time with jitter.
Args:
attempt (int): Current attempt number (0-based)
Returns:
float: Backoff time in seconds
"""
backoff = self.initial_backoff * (self.backoff_factor ** attempt)
# Add jitter to prevent thundering herd problem
jitter = random.uniform(0, 0.1 * backoff)
return backoff + jitter
# Create rate limiter instance
rate_limiter = RateLimiter()
# API Error Handler decorator
def api_error_handler(api_name):
"""
Decorator for API calls with error handling and rate limiting.
This decorator handles rate limiting, retries with exponential
backoff, and error handling for API calls.
Args:
api_name (str): Name of the API being called
Returns:
callable: Decorated function
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
# Apply rate limiting - make sure rate_limiter exists and has the method
if hasattr(rate_limiter, 'wait_if_needed'):
rate_limiter.wait_if_needed(api_name)
# Track retries
for attempt in range(rate_limiter.max_retries):
try:
return func(*args, **kwargs)
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code if hasattr(e, 'response') else 0
# Handle specific HTTP errors
if status_code == 429: # Too Many Requests
logger.warning(f"{api_name} rate limit exceeded (429). Attempt {attempt+1}/{rate_limiter.max_retries}")
# Get retry-after header or use exponential backoff
retry_after = e.response.headers.get('Retry-After')
if retry_after and retry_after.isdigit():
wait_time = int(retry_after)
else:
wait_time = rate_limiter.get_backoff_time(attempt)
logger.info(f"Waiting {wait_time} seconds before retry...")
time.sleep(wait_time)
elif status_code >= 500: # Server errors
logger.warning(f"{api_name} server error ({status_code}). Attempt {attempt+1}/{rate_limiter.max_retries}")
time.sleep(rate_limiter.get_backoff_time(attempt))
elif status_code == 403: # Forbidden - likely API key issue
logger.error(f"{api_name} access forbidden (403). Check API key.")
return None # Don't retry on auth errors
elif status_code == 404: # Not Found
logger.warning(f"{api_name} resource not found (404).")
return None # Don't retry on resource not found
else:
logger.error(f"{api_name} HTTP error: {e}")
if attempt < rate_limiter.max_retries - 1:
wait_time = rate_limiter.get_backoff_time(attempt)
logger.info(f"Waiting {wait_time} seconds before retry...")
time.sleep(wait_time)
else:
return None
except requests.exceptions.ConnectionError as e:
logger.error(f"{api_name} connection error: {e}")
if attempt < rate_limiter.max_retries - 1:
wait_time = rate_limiter.get_backoff_time(attempt)
logger.info(f"Waiting {wait_time} seconds before retry...")
time.sleep(wait_time)
else:
return None
except requests.exceptions.Timeout as e:
logger.error(f"{api_name} timeout error: {e}")
if attempt < rate_limiter.max_retries - 1:
wait_time = rate_limiter.get_backoff_time(attempt)
logger.info(f"Waiting {wait_time} seconds before retry...")
time.sleep(wait_time)
else:
return None
except Exception as e:
logger.error(f"{api_name} unexpected error: {str(e)}")
if attempt < rate_limiter.max_retries - 1:
wait_time = rate_limiter.get_backoff_time(attempt)
logger.info(f"Waiting {wait_time} seconds before retry...")
time.sleep(wait_time)
else:
return None
# If we've exhausted all retries
logger.error(f"{api_name} call failed after {rate_limiter.max_retries} attempts")
return None
except Exception as e:
# Catch any unexpected errors in the decorator itself
logger.error(f"{api_name} decorator error: {str(e)}")
return None
return wrapper
return decorator
def safe_json_parse(response, api_name):
"""
Safely parse JSON response with error handling.
Args:
response (requests.Response): Response object to parse
api_name (str): Name of the API for logging
Returns:
dict: Parsed JSON or empty dict on error
"""
try:
return response.json()
except ValueError as e:
logger.error(f"Error parsing {api_name} JSON response: {e}")
logger.debug(f"Response content: {response.text[:500]}...")
return {}