import requests
import json
import time
import logging
from datetime import datetime
import pandas as pd
from typing import Dict, List, Optional, Union, Any


class ArbiscanClient:
    """
    Client to interact with the Arbiscan API for fetching on-chain data from Arbitrum
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.arbiscan.io/api"
        self.rate_limit_delay = 0.2  # Delay between API calls to avoid rate limiting (200ms)

        # Add caching to improve performance
        self._transaction_cache = {}
        self._last_api_call_time = 0

        # Configure debug logging - set to True for verbose output, False for minimal output
        self.verbose_debug = False

    def _make_request(self, params: Dict[str, str]) -> Dict[str, Any]:
        """
        Make a request to the Arbiscan API with rate limiting
        """
        params["apikey"] = self.api_key

        # Implement rate limiting
        current_time = time.time()
        time_since_last_call = current_time - self._last_api_call_time
        if time_since_last_call < self.rate_limit_delay:
            time.sleep(self.rate_limit_delay - time_since_last_call)
        self._last_api_call_time = time.time()

        try:
            # Log the request details, but only in verbose mode
            if self.verbose_debug:
                debug_params = params.copy()
                debug_params.pop("apikey", None)
                logging.debug(f"API Request: {self.base_url}")
                logging.debug(f"Params: {json.dumps(debug_params, indent=2)}")

            # A timeout is set so the Timeout handler below can actually fire
            response = requests.get(self.base_url, params=params, timeout=30)

            # Log response status and URL only in verbose mode
            if self.verbose_debug:
                logging.debug(f"Response Status: {response.status_code}")
                logging.debug(f"Full URL: {response.url.replace(self.api_key, 'API_KEY_REDACTED')}")

            response.raise_for_status()

            # Parse the JSON response
            json_data = response.json()

            # Log the response structure, but only in verbose mode
            if self.verbose_debug:
                result_str = str(json_data.get('result', ''))
                result_preview = result_str[:100] + '...' if len(result_str) > 100 else result_str
                logging.debug(f"Response Status: {json_data.get('status')}")
                logging.debug(f"Response Message: {json_data.get('message', 'No message')}")
                logging.debug(f"Result Preview: {result_preview}")

            # Check for API-level errors in the response
            status = json_data.get('status')
            message = json_data.get('message', 'No message')
            if status == '0' and message != 'No transactions found':
                logging.warning(f"API Error: {message}")

            return json_data
        except requests.exceptions.HTTPError as e:
            logging.error(f"HTTP Error in API Request: {e.response.status_code}")
            raise
        except requests.exceptions.ConnectionError as e:
            logging.error(f"Connection Error in API Request: {str(e)}")
            raise
        except requests.exceptions.Timeout as e:
            logging.error(f"Timeout in API Request: {str(e)}")
            raise
        except requests.exceptions.RequestException as e:
            logging.error(f"API Request failed: {str(e)}")
            logging.error(f"URL: {self.base_url}")
            logging.error(f"Method: {params.get('module')}/{params.get('action')}")
            return {"status": "0", "message": f"Error: {str(e)}", "result": []}
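    # Arithmetic note on the rate limiting above: a 0.2 s minimum gap between
    # calls caps throughput at 1 / 0.2 = 5 requests per second.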
    def get_eth_balance(self, address: str) -> float:
        """
        Get the ETH balance of an address

        Args:
            address: Wallet address

        Returns:
            ETH balance as a float
        """
        params = {
            "module": "account",
            "action": "balance",
            "address": address,
            "tag": "latest"
        }

        result = self._make_request(params)

        if result.get("status") == "1":
            # Convert wei to ETH
            wei_balance = int(result.get("result", "0"))
            eth_balance = wei_balance / 10**18
            return eth_balance
        else:
            return 0.0

    def get_token_balance(self, address: str, token_address: str) -> float:
        """
        Get the token balance of an address for a specific token

        Args:
            address: Wallet address
            token_address: Token contract address

        Returns:
            Token balance as a float
        """
        params = {
            "module": "account",
            "action": "tokenbalance",
            "address": address,
            "contractaddress": token_address,
            "tag": "latest"
        }

        result = self._make_request(params)

        if result.get("status") == "1":
            # Get token decimals and convert to proper amount
            decimals = self.get_token_decimals(token_address)
            raw_balance = int(result.get("result", "0"))
            token_balance = raw_balance / 10**decimals
            return token_balance
        else:
            return 0.0

    def get_token_decimals(self, token_address: str) -> int:
        """
        Get the number of decimals for a token

        Args:
            token_address: Token contract address

        Returns:
            Number of decimals (default: 18)
        """
        params = {
            "module": "token",
            "action": "getToken",
            "contractaddress": token_address
        }

        result = self._make_request(params)

        if result.get("status") == "1":
            token_info = result.get("result", {})
            return int(token_info.get("divisor", "18"))
        else:
            # Default to 18 decimals (most ERC-20 tokens)
            return 18
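    # Worked example of the conversions above (values are illustrative, not from
    # the original module): a raw balance of 1_500_000_000_000_000_000 wei is
    # 1_500_000_000_000_000_000 / 10**18 = 1.5 ETH, and a raw balance of
    # 2_500_000 for a 6-decimal token is 2_500_000 / 10**6 = 2.5 tokens.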
    def get_token_transfers(self,
                            address: str,
                            contract_address: Optional[str] = None,
                            start_block: int = 0,
                            end_block: int = 99999999,
                            page: int = 1,
                            offset: int = 100,
                            sort: str = "desc") -> List[Dict[str, Any]]:
        """
        Get token transfers for an address

        Args:
            address: Wallet address
            contract_address: Optional token contract address to filter by
            start_block: Starting block number
            end_block: Ending block number
            page: Page number
            offset: Number of results per page
            sort: Sort order ("asc" or "desc")

        Returns:
            List of token transfers
        """
        params = {
            "module": "account",
            "action": "tokentx",
            "address": address,
            "startblock": str(start_block),
            "endblock": str(end_block),
            "page": str(page),
            "offset": str(offset),
            "sort": sort
        }

        # Add contract address if specified
        if contract_address:
            params["contractaddress"] = contract_address

        result = self._make_request(params)

        if result.get("status") == "1":
            return result.get("result", [])
        else:
            message = result.get("message", "Unknown error")
            if "No transactions found" in message:
                return []
            else:
                logging.warning(f"Error fetching token transfers: {message}")
                return []

    def fetch_all_token_transfers(self,
                                  address: str,
                                  contract_address: Optional[str] = None,
                                  start_block: int = 0,
                                  end_block: int = 99999999,
                                  max_pages: int = 10) -> List[Dict[str, Any]]:
        """
        Fetch all token transfers for an address, paginating through results

        Args:
            address: Wallet address
            contract_address: Optional token contract address to filter by
            start_block: Starting block number
            end_block: Ending block number
            max_pages: Maximum number of pages to fetch

        Returns:
            List of all token transfers
        """
        all_transfers = []
        offset = 100  # Results per page (API limit)

        for page in range(1, max_pages + 1):
            try:
                transfers = self.get_token_transfers(
                    address=address,
                    contract_address=contract_address,
                    start_block=start_block,
                    end_block=end_block,
                    page=page,
                    offset=offset
                )

                # No more transfers, break the loop
                if not transfers:
                    break

                all_transfers.extend(transfers)

                # If we got fewer results than the offset, we've reached the end
                if len(transfers) < offset:
                    break
            except Exception as e:
                logging.error(f"Error fetching page {page} of token transfers: {str(e)}")
                break

        return all_transfers
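    # Note on the pagination above (illustrative arithmetic, not from the
    # original module): with offset=100 and max_pages=10, one call to
    # fetch_all_token_transfers returns at most 10 * 100 = 1,000 transfers per
    # address; raise max_pages to reach deeper history.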
    def fetch_whale_transactions(self,
                                 addresses: List[str],
                                 token_address: Optional[str] = None,
                                 min_token_amount: Optional[float] = None,
                                 min_usd_value: Optional[float] = None,
                                 start_block: int = 0,
                                 end_block: int = 99999999,
                                 max_pages: int = 10) -> pd.DataFrame:
        """
        Fetch whale transactions for a list of addresses

        Args:
            addresses: List of wallet addresses
            token_address: Optional token contract address to filter by
            min_token_amount: Minimum token amount to be considered a whale transaction
            min_usd_value: Minimum USD value to be considered a whale transaction
            start_block: Starting block number
            end_block: Ending block number
            max_pages: Maximum number of pages to fetch per address (default: 10)

        Returns:
            DataFrame of whale transactions
        """
        try:
            # Create a cache key based on parameters
            cache_key = (f"{','.join(addresses)}_{token_address}_{min_token_amount}_"
                         f"{min_usd_value}_{start_block}_{end_block}_{max_pages}")

            # Check if we have cached results
            if cache_key in self._transaction_cache:
                logging.info(f"Using cached transactions for {len(addresses)} addresses")
                return self._transaction_cache[cache_key]

            all_transfers = []

            logging.info(f"Fetching whale transactions for {len(addresses)} addresses")
            logging.info(f"Token address filter: {token_address if token_address else 'None'}")
            logging.info(f"Min token amount: {min_token_amount}")
            logging.info(f"Min USD value: {min_usd_value}")

            for i, address in enumerate(addresses):
                try:
                    logging.info(f"Processing address {i+1}/{len(addresses)}: {address}")

                    # Create address-specific cache key
                    addr_cache_key = f"{address}_{token_address}_{start_block}_{end_block}_{max_pages}"

                    # Check if we have cached results for this specific address
                    if addr_cache_key in self._transaction_cache:
                        transfers = self._transaction_cache[addr_cache_key]
                        logging.info(f"Using cached {len(transfers)} transfers for address {address}")
                    else:
                        transfers = self.fetch_all_token_transfers(
                            address=address,
                            contract_address=token_address,
                            start_block=start_block,
                            end_block=end_block,
                            max_pages=max_pages
                        )
                        logging.info(f"Found {len(transfers)} transfers for address {address}")

                        # Cache the results for this address
                        self._transaction_cache[addr_cache_key] = transfers

                    all_transfers.extend(transfers)
                except Exception as e:
                    logging.error(f"Failed to fetch transactions for address {address}: {str(e)}")
                    continue

            logging.info(f"Total transfers found: {len(all_transfers)}")

            if not all_transfers:
                logging.warning("No whale transactions found for the specified addresses")
                return pd.DataFrame()

            # Convert to DataFrame
            logging.info("Converting transfers to DataFrame")
            df = pd.DataFrame(all_transfers)

            # Log the column names
            logging.info(f"DataFrame created with {len(df)} rows and {len(df.columns)} columns")
            logging.info(f"Columns: {', '.join(df.columns[:5])}...")

            # Apply token amount filter if specified
            if min_token_amount is not None:
                logging.info(f"Applying min token amount filter: {min_token_amount}")
                # Scale the raw integer value by the token's decimals, then filter
                df['tokenAmount'] = df['value'].astype(float) / (10 ** df['tokenDecimal'].astype(int))
                df = df[df['tokenAmount'] >= min_token_amount]
                logging.info(f"After token amount filtering: {len(df)}/{len(all_transfers)} rows remain")

            # Apply USD value filter if specified (this would require price data)
            if min_usd_value is not None and 'tokenAmount' in df.columns:
                logging.info("USD value filtering is not implemented yet")
                # This would require token price data, which we don't have yet
                # df = df[df['usd_value'] >= min_usd_value]

            # Convert timestamp to datetime
            if 'timeStamp' in df.columns:
                logging.info("Converting timestamp to datetime")
                try:
                    df['timeStamp'] = pd.to_datetime(df['timeStamp'].astype(float), unit='s')
                except Exception as e:
                    logging.error(f"Error converting timestamp: {str(e)}")

            logging.info(f"Final DataFrame has {len(df)} rows")

            # Cache the final result
            self._transaction_cache[cache_key] = df
            return df
        except Exception as e:
            logging.error(f"Error fetching whale transactions: {str(e)}")
            return pd.DataFrame()

    def get_internal_transactions(self,
                                  address: str,
                                  start_block: int = 0,
                                  end_block: int = 99999999,
                                  page: int = 1,
                                  offset: int = 100,
                                  sort: str = "desc") -> List[Dict[str, Any]]:
        """
        Get internal transactions for an address

        Args:
            address: Wallet address
            start_block: Starting block number
            end_block: Ending block number
            page: Page number
            offset: Number of results per page
            sort: Sort order ("asc" or "desc")

        Returns:
            List of internal transactions
        """
        params = {
            "module": "account",
            "action": "txlistinternal",
            "address": address,
            "startblock": str(start_block),
            "endblock": str(end_block),
            "page": str(page),
            "offset": str(offset),
            "sort": sort
        }

        result = self._make_request(params)

        if result.get("status") == "1":
            return result.get("result", [])
        else:
            message = result.get("message", "Unknown error")
            if "No transactions found" in message:
                return []
            else:
                logging.warning(f"Error fetching internal transactions: {message}")
                return []
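
# Illustrative sketch (not part of the original module): how the ArbiscanClient
# methods above compose into a whale scan. The API key and watched addresses
# below are hypothetical placeholders.
def _example_whale_scan() -> pd.DataFrame:
    client = ArbiscanClient(api_key="YOUR_ARBISCAN_KEY")  # hypothetical key
    watched = [
        "0x0000000000000000000000000000000000000001",  # placeholder address
        "0x0000000000000000000000000000000000000002",  # placeholder address
    ]
    # Pull ERC-20 transfers for each address and keep moves of 10,000+ tokens.
    return client.fetch_whale_transactions(
        addresses=watched,
        min_token_amount=10_000,
        max_pages=5,
    )
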
class GeminiClient:
    """
    Client to interact with the Gemini API for fetching token prices
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.gemini.com/v1"

        # Add caching to avoid repetitive API calls
        self._price_cache = {}

        # Track API errors to avoid flooding logs
        self._error_count = {}
        self._last_api_call = 0  # For rate limiting

    def get_current_price(self, symbol: str) -> Optional[float]:
        """
        Get the current price of a token

        Args:
            symbol: Token symbol (e.g., "ETHUSD")

        Returns:
            Current price as a float or None if not found
        """
        try:
            url = f"{self.base_url}/pubticker/{symbol}"
            response = requests.get(url, timeout=10)
            response.raise_for_status()

            data = response.json()
            return float(data.get("last", 0))
        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching price from Gemini API: {e}")
            return None

    def get_historical_prices(self, symbol: str, start_time: datetime, end_time: datetime) -> Optional[pd.DataFrame]:
        """
        Get historical prices for a token within a time range

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            start_time: Start datetime
            end_time: End datetime

        Returns:
            DataFrame of historical prices with timestamps
        """
        # Implement simple rate limiting
        current_time = time.time()
        if current_time - self._last_api_call < 0.05:  # 50ms minimum between calls
            time.sleep(0.05)
        self._last_api_call = current_time

        # Create a cache key based on the parameters
        cache_key = f"{symbol}_{int(start_time.timestamp())}_{int(end_time.timestamp())}"

        # Check if we already have this data cached
        if cache_key in self._price_cache:
            return self._price_cache[cache_key]

        # Defined outside the try block so the exception handlers can use it
        error_key = f"error_{symbol}"

        try:
            # Convert datetime to milliseconds
            start_ms = int(start_time.timestamp() * 1000)
            end_ms = int(end_time.timestamp() * 1000)

            url = f"{self.base_url}/trades/{symbol}"
            params = {
                "limit_trades": 500,
                "timestamp": start_ms
            }

            # Check if we've seen too many errors for this symbol
            if self._error_count.get(error_key, 0) > 10:
                # If we've already had too many errors for this symbol, don't try again
                return None

            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()

            trades = response.json()

            # Reset error count on success
            self._error_count[error_key] = 0

            # Filter trades within the time range
            filtered_trades = [
                trade for trade in trades
                if start_ms <= trade.get("timestampms", 0) <= end_ms
            ]

            if not filtered_trades:
                # Cache negative result to avoid future lookups
                self._price_cache[cache_key] = None
                return None

            # Convert to DataFrame
            df = pd.DataFrame(filtered_trades)

            # Convert timestamp to datetime
            df['timestamp'] = pd.to_datetime(df['timestampms'], unit='ms')

            # Select and rename columns
            result_df = df[['timestamp', 'price', 'amount']].copy()
            result_df.columns = ['Timestamp', 'Price', 'Amount']

            # Convert price to float
            result_df['Price'] = result_df['Price'].astype(float)

            # Cache the result
            self._price_cache[cache_key] = result_df
            return result_df
        except requests.exceptions.HTTPError as e:
            # Handle HTTP errors more efficiently
            self._error_count[error_key] = self._error_count.get(error_key, 0) + 1

            # Only log the first few occurrences of each error
            if self._error_count[error_key] <= 3:
                logging.warning(f"HTTP error fetching price for {symbol}: {e.response.status_code}")
            return None
        except Exception as e:
            # For other errors, use a similar approach
            self._error_count[error_key] = self._error_count.get(error_key, 0) + 1
            if self._error_count[error_key] <= 3:
                logging.error(f"Error fetching prices for {symbol}: {str(e)}")
            return None

    def get_price_at_time(self, symbol: str, timestamp: datetime) -> Optional[float]:
        """
        Get the approximate price of a token at a specific time

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            timestamp: Target datetime

        Returns:
            Price at the specified time as a float or None if not found
        """
        # Look for prices 5 minutes before and after the target time
        start_time = timestamp - pd.Timedelta(minutes=5)
        end_time = timestamp + pd.Timedelta(minutes=5)

        prices_df = self.get_historical_prices(symbol, start_time, end_time)

        if prices_df is None or prices_df.empty:
            return None

        # Find the closest price
        prices_df['time_diff'] = abs(prices_df['Timestamp'] - timestamp)
        closest_price = prices_df.loc[prices_df['time_diff'].idxmin(), 'Price']
        return closest_price
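    # Note on the windowing in get_historical_prices above: at most 500 trades
    # are fetched starting from start_ms and then filtered down to those with
    # timestampms inside [start_ms, end_ms], so in a very active market the
    # 500-trade cap may not cover the whole requested window.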
    def get_price_impact(self,
                         symbol: str,
                         transaction_time: datetime,
                         lookback_minutes: int = 5,
                         lookahead_minutes: int = 5) -> Dict[str, Any]:
        """
        Analyze the price impact before and after a transaction

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            transaction_time: Transaction datetime
            lookback_minutes: Minutes to look back before the transaction
            lookahead_minutes: Minutes to look ahead after the transaction

        Returns:
            Dictionary with price impact metrics
        """
        start_time = transaction_time - pd.Timedelta(minutes=lookback_minutes)
        end_time = transaction_time + pd.Timedelta(minutes=lookahead_minutes)

        prices_df = self.get_historical_prices(symbol, start_time, end_time)

        if prices_df is None or prices_df.empty:
            return {
                "pre_price": None,
                "post_price": None,
                "impact_pct": None,
                "prices_df": None
            }

        # Find pre and post transaction prices
        pre_prices = prices_df[prices_df['Timestamp'] < transaction_time]
        post_prices = prices_df[prices_df['Timestamp'] >= transaction_time]

        pre_price = pre_prices['Price'].iloc[-1] if not pre_prices.empty else None
        post_price = post_prices['Price'].iloc[0] if not post_prices.empty else None

        # Calculate impact percentage
        impact_pct = None
        if pre_price is not None and post_price is not None:
            impact_pct = ((post_price - pre_price) / pre_price) * 100

        return {
            "pre_price": pre_price,
            "post_price": post_price,
            "impact_pct": impact_pct,
            "prices_df": prices_df
        }
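    # Worked example of the impact formula above (illustrative numbers, not from
    # the original module): pre_price = 2000.0 and post_price = 2010.0 give
    # impact_pct = ((2010.0 - 2000.0) / 2000.0) * 100 = 0.5, i.e. a 0.5% move
    # in the first trade after the transaction.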
    def fetch_historical_prices(self, token_symbol: str, timestamp) -> Dict[str, Any]:
        """Fetch historical price data for a token at a specific timestamp

        Args:
            token_symbol: Token symbol (e.g., "ETH")
            timestamp: Timestamp (can be int, float, datetime, or pandas Timestamp)

        Returns:
            Dictionary with price data
        """
        # Convert timestamp to an integer epoch value if it's not one already
        timestamp_value = 0
        try:
            # Handle different timestamp types
            if isinstance(timestamp, (int, float)):
                timestamp_value = int(timestamp)
            elif isinstance(timestamp, pd.Timestamp):
                timestamp_value = int(timestamp.timestamp())
            elif isinstance(timestamp, datetime):
                timestamp_value = int(timestamp.timestamp())
            elif isinstance(timestamp, str):
                # Try to parse string as timestamp
                dt = pd.to_datetime(timestamp)
                timestamp_value = int(dt.timestamp())
            else:
                # Default to current time if invalid type
                logging.warning(f"Invalid timestamp type: {type(timestamp)}, using current time")
                timestamp_value = int(time.time())
        except Exception as e:
            logging.warning(f"Error converting timestamp {timestamp}: {str(e)}, using current time")
            timestamp_value = int(time.time())

        # Check cache first
        cache_key = f"{token_symbol}_{timestamp_value}"
        if cache_key in self._price_cache:
            return self._price_cache[cache_key]

        # Implement rate limiting
        current_time = time.time()
        if current_time - self._last_api_call < 0.05:  # 50ms minimum between calls
            time.sleep(0.05)
        self._last_api_call = current_time

        # Check error count for this symbol
        error_key = f"error_{token_symbol}"
        if self._error_count.get(error_key, 0) > 10:
            # Too many errors, return a failure without retrying
            return {
                'symbol': token_symbol,
                'timestamp': timestamp_value,
                'price': None,
                'status': 'error',
                'error': 'Too many previous errors'
            }

        try:
            url = f"{self.base_url}/trades/{token_symbol}USD"
            params = {
                'limit_trades': 500,
                'timestamp': timestamp_value * 1000  # Convert to milliseconds
            }

            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()

            data = response.json()

            # Reset error count on success
            self._error_count[error_key] = 0

            # Calculate average price from recent trades
            if data:
                prices = [float(trade['price']) for trade in data]
                avg_price = sum(prices) / len(prices)
                result = {
                    'symbol': token_symbol,
                    'timestamp': timestamp_value,
                    'price': avg_price,
                    'status': 'success'
                }
                # Cache success
                self._price_cache[cache_key] = result
                return result
            else:
                result = {
                    'symbol': token_symbol,
                    'timestamp': timestamp_value,
                    'price': None,
                    'status': 'no_data'
                }
                # Cache no data
                self._price_cache[cache_key] = result
                return result
        except requests.exceptions.HTTPError as e:
            # Handle HTTP errors efficiently
            self._error_count[error_key] = self._error_count.get(error_key, 0) + 1

            # Only log first few occurrences
            if self._error_count[error_key] <= 3:
                logging.warning(f"HTTP error fetching price for {token_symbol}: {e.response.status_code}")
            elif self._error_count[error_key] == 10:
                logging.warning(f"Suppressing further logs for {token_symbol} errors")

            result = {
                'symbol': token_symbol,
                'timestamp': timestamp_value,  # normalized value, matching the other branches
                'price': None,
                'status': 'error',
                'error': f"HTTP {e.response.status_code}"
            }
            self._price_cache[cache_key] = result
            return result
        except Exception as e:
            # For other errors
            self._error_count[error_key] = self._error_count.get(error_key, 0) + 1
            if self._error_count[error_key] <= 3:
                logging.error(f"Error fetching prices for {token_symbol}: {str(e)}")

            result = {
                'symbol': token_symbol,
                'timestamp': timestamp_value,
                'price': None,
                'status': 'error',
                'error': str(e)
            }
            self._price_cache[cache_key] = result
            return result
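
# Illustrative usage sketch (not in the original module): wiring both clients
# together. The API keys and wallet address are hypothetical placeholders, and
# the network calls run only when this file is executed directly.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    arbiscan = ArbiscanClient(api_key="YOUR_ARBISCAN_KEY")  # hypothetical key
    gemini = GeminiClient(api_key="YOUR_GEMINI_KEY")        # hypothetical key

    wallet = "0x0000000000000000000000000000000000000001"   # placeholder address
    print(f"ETH balance: {arbiscan.get_eth_balance(wallet):.4f}")

    # Current ETH/USD price from Gemini's public ticker endpoint.
    print(f"ETH/USD now: {gemini.get_current_price('ETHUSD')}")

    # Approximate average trade price around one hour ago.
    snapshot = gemini.fetch_historical_prices("ETH", time.time() - 3600)
    print(f"ETH an hour ago: {snapshot}")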