# NOTE: "Spaces: / Build error / Build error" was web-page header residue captured with this file; it is not part of the module.
import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import pandas as pd
import requests
class ArbiscanClient:
    """
    Client to interact with the Arbiscan API for fetching on-chain data from Arbitrum.

    All HTTP traffic goes through `_make_request`, which applies a minimum
    delay between calls and attaches the API key. Token-transfer results are
    memoised in an in-memory dict for the lifetime of the instance.
    """

    def __init__(self, api_key: str, timeout: float = 30.0):
        """
        Args:
            api_key: Arbiscan API key, sent as the `apikey` query parameter.
            timeout: Per-request timeout in seconds, so a stalled connection
                cannot block the caller indefinitely.
        """
        self.api_key = api_key
        self.base_url = "https://api.arbiscan.io/api"
        # Delay between API calls to avoid rate limiting (200ms)
        self.rate_limit_delay = 0.2
        self.timeout = timeout
        # Cache of per-address transfer lists and of final filtered DataFrames
        self._transaction_cache = {}
        self._last_api_call_time = 0
        # Set to True for verbose request/response logging at DEBUG level
        self.verbose_debug = False

    def _redact(self, text: str) -> str:
        """Return `text` with every occurrence of the API key masked."""
        # str.replace("") would insert the marker between every character.
        if not self.api_key:
            return text
        return text.replace(self.api_key, "API_KEY_REDACTED")

    def _make_request(self, params: Dict[str, str]) -> Dict[str, Any]:
        """
        Make a request to the Arbiscan API with rate limiting.

        Args:
            params: Query parameters (without the API key). The dict is not
                modified; the key is added to a private copy.

        Returns:
            The decoded JSON response. If the request fails with a
            `RequestException` other than HTTP/connection/timeout errors, or
            the body is not valid JSON, an error-shaped dict
            `{"status": "0", "message": "Error: ...", "result": []}` is returned.

        Raises:
            requests.exceptions.HTTPError: On a non-2xx HTTP status.
            requests.exceptions.ConnectionError: On connection failure.
            requests.exceptions.Timeout: When the request times out.
        """
        # Work on a copy so the caller's dict never ends up holding the key.
        request_params = dict(params)
        request_params["apikey"] = self.api_key

        # Rate limiting: sleep only for the part of the delay still outstanding.
        remaining = self.rate_limit_delay - (time.time() - self._last_api_call_time)
        if remaining > 0:
            time.sleep(remaining)
        self._last_api_call_time = time.time()

        try:
            if self.verbose_debug:
                debug_params = {k: v for k, v in request_params.items() if k != "apikey"}
                logging.debug("API Request: %s", self.base_url)
                logging.debug("Params: %s", json.dumps(debug_params, indent=2))

            response = requests.get(self.base_url, params=request_params, timeout=self.timeout)

            if self.verbose_debug:
                logging.debug("Response Status: %s", response.status_code)
                logging.debug("Full URL: %s", self._redact(response.url))

            response.raise_for_status()
            json_data = response.json()

            if self.verbose_debug:
                result_text = str(json_data.get('result', ''))
                result_preview = result_text[:100] + '...' if len(result_text) > 100 else result_text
                logging.debug("Response Status: %s", json_data.get('status'))
                logging.debug("Response Message: %s", json_data.get('message', 'No message'))
                logging.debug("Result Preview: %s", result_preview)

            # Status '0' with "No transactions found" is an empty result, not an error.
            status = json_data.get('status')
            message = json_data.get('message', 'No message')
            if status == '0' and message != 'No transactions found':
                logging.warning("API Error: %s", message)
            return json_data
        except requests.exceptions.HTTPError as e:
            logging.error("HTTP Error in API Request: %s",
                          getattr(e.response, "status_code", "unknown"))
            raise
        except requests.exceptions.ConnectionError as e:
            # The exception text can embed the full URL, including the API key.
            logging.error("Connection Error in API Request: %s", self._redact(str(e)))
            raise
        except requests.exceptions.Timeout as e:
            logging.error("Timeout in API Request: %s", self._redact(str(e)))
            raise
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            error_text = self._redact(str(e))
            logging.error("API Request failed: %s", error_text)
            logging.error("ERROR - URL: %s", self.base_url)
            logging.error("ERROR - Method: %s/%s", params.get('module'), params.get('action'))
            return {"status": "0", "message": f"Error: {error_text}", "result": []}

    def get_eth_balance(self, address: str) -> float:
        """
        Get the ETH balance of an address.

        Args:
            address: Wallet address

        Returns:
            ETH balance as a float; 0.0 when the API reports failure or the
            returned balance is not an integer string.
        """
        params = {
            "module": "account",
            "action": "balance",
            "address": address,
            "tag": "latest"
        }
        result = self._make_request(params)
        if result.get("status") != "1":
            return 0.0
        try:
            wei_balance = int(result.get("result", "0"))
        except (TypeError, ValueError):
            logging.warning("Unexpected balance payload for %s", address)
            return 0.0
        # Convert wei to ETH
        return wei_balance / 10**18

    def get_token_balance(self, address: str, token_address: str) -> float:
        """
        Get the token balance of an address for a specific token.

        Args:
            address: Wallet address
            token_address: Token contract address

        Returns:
            Token balance as a float, scaled by the token's decimals; 0.0 when
            the API reports failure or the balance is not an integer string.
        """
        params = {
            "module": "account",
            "action": "tokenbalance",
            "address": address,
            "contractaddress": token_address,
            "tag": "latest"
        }
        result = self._make_request(params)
        if result.get("status") != "1":
            return 0.0
        try:
            raw_balance = int(result.get("result", "0"))
        except (TypeError, ValueError):
            logging.warning("Unexpected token balance payload for %s", address)
            return 0.0
        decimals = self.get_token_decimals(token_address)
        return raw_balance / 10**decimals

    def get_token_decimals(self, token_address: str) -> int:
        """
        Get the number of decimals for a token.

        Args:
            token_address: Token contract address

        Returns:
            Number of decimals (default: 18 when the API reports failure or
            the payload carries no usable value).
        """
        params = {
            "module": "token",
            "action": "getToken",
            "contractaddress": token_address
        }
        result = self._make_request(params)
        if result.get("status") != "1":
            # Default to 18 decimals (most ERC-20 tokens)
            return 18

        token_info = result.get("result")
        # Accept both a bare object and a one-element list of objects.
        if isinstance(token_info, list):
            token_info = token_info[0] if token_info else {}
        if not isinstance(token_info, dict):
            return 18

        # "divisor" is the field the original code read; "decimals" is tried as
        # a fallback. NOTE(review): confirm which field this endpoint returns.
        for field in ("divisor", "decimals"):
            raw_value = token_info.get(field)
            if raw_value in (None, ""):
                continue
            try:
                return int(raw_value)
            except (TypeError, ValueError):
                logging.warning("Unparseable %s for token %s: %r", field, token_address, raw_value)
        return 18

    def _fetch_account_records(self,
                               action: str,
                               description: str,
                               address: str,
                               start_block: int,
                               end_block: int,
                               page: int,
                               offset: int,
                               sort: str,
                               extra_params: Optional[Dict[str, str]] = None) -> List[Dict[str, Any]]:
        """Run one paginated `module=account` list query and return its rows ([] on error)."""
        params = {
            "module": "account",
            "action": action,
            "address": address,
            "startblock": str(start_block),
            "endblock": str(end_block),
            "page": str(page),
            "offset": str(offset),
            "sort": sort
        }
        if extra_params:
            params.update(extra_params)

        result = self._make_request(params)
        if result.get("status") == "1":
            return result.get("result", [])

        message = str(result.get("message", "Unknown error"))
        if "No transactions found" not in message:
            logging.warning("Error fetching %s: %s", description, message)
        return []

    def get_token_transfers(self,
                            address: str,
                            contract_address: Optional[str] = None,
                            start_block: int = 0,
                            end_block: int = 99999999,
                            page: int = 1,
                            offset: int = 100,
                            sort: str = "desc") -> List[Dict[str, Any]]:
        """
        Get token transfers for an address.

        Args:
            address: Wallet address
            contract_address: Optional token contract address to filter by
            start_block: Starting block number
            end_block: Ending block number
            page: Page number
            offset: Number of results per page
            sort: Sort order ("asc" or "desc")

        Returns:
            List of token transfers (empty when none are found or the API
            reports an error)
        """
        extra = {"contractaddress": contract_address} if contract_address else None
        return self._fetch_account_records(
            "tokentx", "token transfers", address,
            start_block, end_block, page, offset, sort, extra,
        )

    def fetch_all_token_transfers(self,
                                  address: str,
                                  contract_address: Optional[str] = None,
                                  start_block: int = 0,
                                  end_block: int = 99999999,
                                  max_pages: int = 10) -> List[Dict[str, Any]]:
        """
        Fetch all token transfers for an address, paginating through results.

        Args:
            address: Wallet address
            contract_address: Optional token contract address to filter by
            start_block: Starting block number
            end_block: Ending block number
            max_pages: Maximum number of pages to fetch

        Returns:
            List of all token transfers gathered before the first empty page,
            short page, or error. A failure on one page is logged and the
            pages collected so far are returned.
        """
        all_transfers = []
        offset = 100  # Results per page (API limit)
        for page in range(1, max_pages + 1):
            try:
                transfers = self.get_token_transfers(
                    address=address,
                    contract_address=contract_address,
                    start_block=start_block,
                    end_block=end_block,
                    page=page,
                    offset=offset
                )
            except Exception as e:
                logging.error("Error fetching page %s of token transfers: %s", page, e)
                break
            if not transfers:
                break
            all_transfers.extend(transfers)
            # A short page means there is nothing left to fetch.
            if len(transfers) < offset:
                break
        return all_transfers

    @staticmethod
    def _filter_by_token_amount(df: pd.DataFrame, min_token_amount: float) -> pd.DataFrame:
        """Add a `tokenAmount` column and keep rows at or above `min_token_amount`."""
        # Float exponentiation: an int64 power overflows silently once a token
        # has more than 18 decimals.
        scale = 10.0 ** df['tokenDecimal'].astype(float)
        with_amount = df.assign(tokenAmount=df['value'].astype(float) / scale)
        # .copy() makes the result independent, so later column assignments
        # are not writes onto a slice of another frame.
        return with_amount[with_amount['tokenAmount'] >= min_token_amount].copy()

    def fetch_whale_transactions(self,
                                 addresses: List[str],
                                 token_address: Optional[str] = None,
                                 min_token_amount: Optional[float] = None,
                                 min_usd_value: Optional[float] = None,
                                 start_block: int = 0,
                                 end_block: int = 99999999,
                                 max_pages: int = 10) -> pd.DataFrame:
        """
        Fetch whale transactions for a list of addresses.

        Args:
            addresses: List of wallet addresses
            token_address: Optional token contract address to filter by
            min_token_amount: Minimum token amount to be considered a whale transaction
            min_usd_value: Minimum USD value to be considered a whale transaction
                (accepted but not applied: no price data is available here)
            start_block: Starting block number
            end_block: Ending block number
            max_pages: Maximum number of pages to fetch per address (default: 10)

        Returns:
            DataFrame of whale transactions; an empty DataFrame when nothing
            is found or any step fails. The returned frame is a copy, so
            callers may modify it without affecting the cache.
        """
        try:
            cache_key = (
                f"{','.join(addresses)}_{token_address}_{min_token_amount}"
                f"_{min_usd_value}_{start_block}_{end_block}_{max_pages}"
            )
            if cache_key in self._transaction_cache:
                logging.info("Using cached transactions for %s addresses", len(addresses))
                return self._transaction_cache[cache_key].copy()

            all_transfers = []
            logging.info("Fetching whale transactions for %s addresses", len(addresses))
            logging.info("Token address filter: %s", token_address if token_address else 'None')
            logging.info("Min token amount: %s", min_token_amount)
            logging.info("Min USD value: %s", min_usd_value)

            for i, address in enumerate(addresses):
                try:
                    logging.info("Processing address %s/%s: %s", i + 1, len(addresses), address)
                    addr_cache_key = f"{address}_{token_address}_{start_block}_{end_block}_{max_pages}"
                    if addr_cache_key in self._transaction_cache:
                        transfers = self._transaction_cache[addr_cache_key]
                        logging.info("Using cached %s transfers for address %s", len(transfers), address)
                    else:
                        transfers = self.fetch_all_token_transfers(
                            address=address,
                            contract_address=token_address,
                            start_block=start_block,
                            end_block=end_block,
                            max_pages=max_pages
                        )
                        logging.info("Found %s transfers for address %s", len(transfers), address)
                        self._transaction_cache[addr_cache_key] = transfers
                    all_transfers.extend(transfers)
                except Exception as e:
                    # One bad address must not abort the whole batch.
                    logging.error("Failed to fetch transactions for address %s: %s", address, e)
                    continue

            logging.info("Total transfers found: %s", len(all_transfers))
            if not all_transfers:
                logging.warning("No whale transactions found for the specified addresses")
                return pd.DataFrame()

            logging.info("Converting transfers to DataFrame")
            df = pd.DataFrame(all_transfers)
            logging.info("DataFrame created with %s rows and %s columns", len(df), len(df.columns))
            logging.info("Columns: %s...", ', '.join(map(str, df.columns[:5])))

            if min_token_amount is not None:
                logging.info("Applying min token amount filter: %s", min_token_amount)
                df = self._filter_by_token_amount(df, min_token_amount)
                logging.info("After token amount filtering: %s/%s rows remain",
                             len(df), len(all_transfers))

            if min_usd_value is not None and 'tokenAmount' in df.columns:
                # Requires token price data, which this client does not have.
                logging.info("USD value filtering is not implemented yet")

            if 'timeStamp' in df.columns:
                logging.info("Converting timestamp to datetime")
                try:
                    df['timeStamp'] = pd.to_datetime(df['timeStamp'].astype(float), unit='s')
                except Exception as e:
                    logging.error("Error converting timestamp: %s", e)

            logging.info("Final DataFrame has %s rows", len(df))
            self._transaction_cache[cache_key] = df
            return df.copy()
        except Exception as e:
            logging.error("Error fetching whale transactions: %s", e)
            return pd.DataFrame()

    def get_internal_transactions(self,
                                  address: str,
                                  start_block: int = 0,
                                  end_block: int = 99999999,
                                  page: int = 1,
                                  offset: int = 100,
                                  sort: str = "desc") -> List[Dict[str, Any]]:
        """
        Get internal transactions for an address.

        Args:
            address: Wallet address
            start_block: Starting block number
            end_block: Ending block number
            page: Page number
            offset: Number of results per page
            sort: Sort order ("asc" or "desc")

        Returns:
            List of internal transactions (empty when none are found or the
            API reports an error)
        """
        return self._fetch_account_records(
            "txlistinternal", "internal transactions", address,
            start_block, end_block, page, offset, sort,
        )
class GeminiClient:
    """
    Client to interact with the Gemini API for fetching token prices.

    Every HTTP call goes through `_get_json`, which enforces a minimum
    interval between requests and a request timeout. Price lookups are cached
    in memory, and repeated failures for a symbol are counted so that logging
    and further requests can be suppressed.
    """

    def __init__(self, api_key: str, timeout: float = 30.0):
        """
        Args:
            api_key: Stored on the instance; none of the requests made by this
                class send it.
            timeout: Per-request timeout in seconds.
        """
        self.api_key = api_key
        self.base_url = "https://api.gemini.com/v1"
        self.timeout = timeout
        # Minimum spacing between HTTP calls (50ms)
        self.min_call_interval = 0.05
        # Cache to avoid repetitive API calls
        self._price_cache = {}
        # Per-symbol error counters, used to avoid flooding logs
        self._error_count = {}
        self._last_api_call = 0  # For rate limiting

    def _throttle(self) -> None:
        """Sleep just long enough to keep `min_call_interval` between requests."""
        remaining = self.min_call_interval - (time.time() - self._last_api_call)
        if remaining > 0:
            time.sleep(remaining)
        # Record the time after sleeping so the next interval is measured
        # from the moment this request actually goes out.
        self._last_api_call = time.time()

    def _get_json(self, url: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """GET `url` (rate limited, with timeout) and return the decoded JSON body."""
        self._throttle()
        response = requests.get(url, params=params, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def _record_error(self, symbol: str) -> int:
        """Increment and return the failure counter for `symbol`."""
        error_key = f"error_{symbol}"
        self._error_count[error_key] = self._error_count.get(error_key, 0) + 1
        return self._error_count[error_key]

    def _too_many_errors(self, symbol: str) -> bool:
        """Return True once `symbol` has failed more than 10 times without a success."""
        return self._error_count.get(f"error_{symbol}", 0) > 10

    def get_current_price(self, symbol: str) -> Optional[float]:
        """
        Get the current price of a token.

        Args:
            symbol: Token symbol (e.g., "ETHUSD")

        Returns:
            Current price as a float, or None if the request fails or the
            response carries no usable "last" value.
        """
        try:
            data = self._get_json(f"{self.base_url}/pubticker/{symbol}")
            last = data.get("last") if isinstance(data, dict) else None
            # A missing price is reported as None rather than a misleading 0.0.
            return float(last) if last is not None else None
        except (requests.exceptions.RequestException, ValueError, TypeError) as e:
            logging.error("Error fetching price from Gemini API: %s", e)
            return None

    def get_historical_prices(self,
                              symbol: str,
                              start_time: datetime,
                              end_time: datetime) -> Optional[pd.DataFrame]:
        """
        Get historical prices for a token within a time range.

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            start_time: Start datetime
            end_time: End datetime

        Returns:
            DataFrame with columns `Timestamp`, `Price` (float) and `Amount`,
            in the order the API returned the trades; None when no trade falls
            in the range or the request fails. The returned frame is the
            cached object, so callers should not modify it in place.
        """
        cache_key = f"{symbol}_{int(start_time.timestamp())}_{int(end_time.timestamp())}"
        # Serve cache hits before any throttling: no request is made for them.
        if cache_key in self._price_cache:
            return self._price_cache[cache_key]

        # Give up on symbols that keep failing.
        if self._too_many_errors(symbol):
            return None

        try:
            # Convert datetime to milliseconds
            start_ms = int(start_time.timestamp() * 1000)
            end_ms = int(end_time.timestamp() * 1000)
            trades = self._get_json(
                f"{self.base_url}/trades/{symbol}",
                {"limit_trades": 500, "timestamp": start_ms},
            )
            # Reset error count on success
            self._error_count[f"error_{symbol}"] = 0

            in_range = [
                trade for trade in trades
                if start_ms <= trade.get("timestampms", 0) <= end_ms
            ]
            if not in_range:
                # Cache the negative result to avoid repeating the lookup.
                self._price_cache[cache_key] = None
                return None

            frame = pd.DataFrame(in_range)
            frame['timestamp'] = pd.to_datetime(frame['timestampms'], unit='ms')
            result_df = frame[['timestamp', 'price', 'amount']].copy()
            result_df.columns = ['Timestamp', 'Price', 'Amount']
            result_df['Price'] = result_df['Price'].astype(float)

            self._price_cache[cache_key] = result_df
            return result_df
        except requests.exceptions.HTTPError as e:
            # Only log the first few occurrences for each symbol
            if self._record_error(symbol) <= 3:
                logging.warning("HTTP error fetching price for %s: %s",
                                symbol, getattr(e.response, "status_code", "unknown"))
            return None
        except Exception as e:
            if self._record_error(symbol) <= 3:
                logging.error("Error fetching prices for %s: %s", symbol, e)
            return None

    def get_price_at_time(self,
                          symbol: str,
                          timestamp: datetime) -> Optional[float]:
        """
        Get the approximate price of a token at a specific time.

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            timestamp: Target datetime

        Returns:
            Price of the trade closest to `timestamp` within +/- 5 minutes,
            or None if no trade is found.
        """
        # Look for prices 5 minutes before and after the target time
        start_time = timestamp - pd.Timedelta(minutes=5)
        end_time = timestamp + pd.Timedelta(minutes=5)
        prices_df = self.get_historical_prices(symbol, start_time, end_time)
        if prices_df is None or prices_df.empty:
            return None

        # Keep the distances in a separate Series: prices_df is the cached
        # object and must not gain a scratch column.
        time_diff = (prices_df['Timestamp'] - timestamp).abs()
        return float(prices_df.loc[time_diff.idxmin(), 'Price'])

    def get_price_impact(self,
                         symbol: str,
                         transaction_time: datetime,
                         lookback_minutes: int = 5,
                         lookahead_minutes: int = 5) -> Dict[str, Any]:
        """
        Analyze the price impact before and after a transaction.

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            transaction_time: Transaction datetime
            lookback_minutes: Minutes to look back before the transaction
            lookahead_minutes: Minutes to look ahead after the transaction

        Returns:
            Dictionary with `pre_price` (last trade before the transaction),
            `post_price` (first trade at or after it), `impact_pct`
            (percentage change, None when either price is missing or
            `pre_price` is zero) and `prices_df` (the trades in the window,
            sorted by time). All values are None when no trades are found.
        """
        start_time = transaction_time - pd.Timedelta(minutes=lookback_minutes)
        end_time = transaction_time + pd.Timedelta(minutes=lookahead_minutes)
        prices_df = self.get_historical_prices(symbol, start_time, end_time)
        if prices_df is None or prices_df.empty:
            return {
                "pre_price": None,
                "post_price": None,
                "impact_pct": None,
                "prices_df": None
            }

        # Sort chronologically so "last before" / "first after" do not depend
        # on the order in which the API happened to return the trades.
        # sort_values returns a new frame, so the cached one stays untouched.
        ordered = prices_df.sort_values('Timestamp', kind='mergesort')
        pre_prices = ordered[ordered['Timestamp'] < transaction_time]
        post_prices = ordered[ordered['Timestamp'] >= transaction_time]
        pre_price = pre_prices['Price'].iloc[-1] if not pre_prices.empty else None
        post_price = post_prices['Price'].iloc[0] if not post_prices.empty else None

        impact_pct = None
        if pre_price is not None and post_price is not None and pre_price != 0:
            impact_pct = ((post_price - pre_price) / pre_price) * 100
        return {
            "pre_price": pre_price,
            "post_price": post_price,
            "impact_pct": impact_pct,
            "prices_df": ordered
        }

    @staticmethod
    def _to_epoch_seconds(timestamp) -> int:
        """Convert an int/float/datetime/pandas Timestamp/string to whole epoch seconds.

        Falls back to the current time (with a warning) for unsupported types
        or values that cannot be converted.
        """
        try:
            if isinstance(timestamp, (int, float)):
                return int(timestamp)
            # pandas Timestamp is a datetime subclass, so one branch covers both.
            if isinstance(timestamp, datetime):
                return int(timestamp.timestamp())
            if isinstance(timestamp, str):
                return int(pd.to_datetime(timestamp).timestamp())
            logging.warning("Invalid timestamp type: %s, using current time", type(timestamp))
        except Exception as e:
            logging.warning("Error converting timestamp %s: %s, using current time", timestamp, e)
        return int(time.time())

    def fetch_historical_prices(self, token_symbol: str, timestamp) -> Dict[str, Any]:
        """Fetch historical price data for a token at a specific timestamp.

        Args:
            token_symbol: Token symbol (e.g., "ETH"); "USD" is appended to
                form the trading pair.
            timestamp: Timestamp (can be int, float, datetime, pandas
                Timestamp, or a date string)

        Returns:
            Dictionary with `symbol`, `timestamp` (epoch seconds), `price`
            (mean price of the returned trades, or None) and `status`
            ("success", "no_data" or "error"); error results also carry an
            `error` message. Results, including errors, are cached per
            symbol/timestamp.
        """
        timestamp_value = self._to_epoch_seconds(timestamp)

        # Check cache first
        cache_key = f"{token_symbol}_{timestamp_value}"
        if cache_key in self._price_cache:
            return self._price_cache[cache_key]

        def build_result(price, status, error=None):
            result = {
                'symbol': token_symbol,
                # Always the normalised integer, never the raw input object.
                'timestamp': timestamp_value,
                'price': price,
                'status': status,
            }
            if error is not None:
                result['error'] = error
            return result

        if self._too_many_errors(token_symbol):
            # Not cached: the symbol may recover once its counter is reset.
            return build_result(None, 'error', 'Too many previous errors')

        try:
            data = self._get_json(
                f"{self.base_url}/trades/{token_symbol}USD",
                {
                    'limit_trades': 500,
                    'timestamp': timestamp_value * 1000,  # Convert to milliseconds
                },
            )
            # Reset error count on success
            self._error_count[f"error_{token_symbol}"] = 0

            if data:
                # Average price over the returned trades
                prices = [float(trade['price']) for trade in data]
                result = build_result(sum(prices) / len(prices), 'success')
            else:
                result = build_result(None, 'no_data')
        except requests.exceptions.HTTPError as e:
            status_code = getattr(e.response, "status_code", "unknown")
            count = self._record_error(token_symbol)
            # Only log the first few occurrences, then announce suppression once.
            if count <= 3:
                logging.warning("HTTP error fetching price for %s: %s", token_symbol, status_code)
            elif count == 10:
                logging.warning("Suppressing further logs for %s errors", token_symbol)
            result = build_result(None, 'error', f"HTTP {status_code}")
        except Exception as e:
            if self._record_error(token_symbol) <= 3:
                logging.error("Error fetching prices for %s: %s", token_symbol, e)
            result = build_result(None, 'error', str(e))

        self._price_cache[cache_key] = result
        return result