import json
import pandas as pd
from datetime import datetime
from typing import List, Optional
from langchain.tools import tool
from modules.api_client import ArbiscanClient, GeminiClient
from modules.data_processor import DataProcessor

# Tools for Arbiscan API
class ArbiscanTools:
    def __init__(self, arbiscan_client: ArbiscanClient):
        self.client = arbiscan_client

    @tool("get_token_transfers")
    def get_token_transfers(self, address: str, contract_address: Optional[str] = None) -> str:
        """
        Get ERC-20 token transfers for a specific address.

        Args:
            address: Wallet address
            contract_address: Optional token contract address to filter by

        Returns:
            List of token transfers as a JSON string
        """
        transfers = self.client.get_token_transfers(
            address=address,
            contract_address=contract_address
        )
        return json.dumps(transfers)

    @tool("get_token_balance")
    def get_token_balance(self, address: str, contract_address: str) -> str:
        """
        Get the current balance of a specific token for an address.

        Args:
            address: Wallet address
            contract_address: Token contract address

        Returns:
            Token balance as a string
        """
        balance = self.client.get_token_balance(
            address=address,
            contract_address=contract_address
        )
        # Coerce to str so the declared return type holds even if the client
        # returns a numeric value
        return str(balance)

    @tool("get_normal_transactions")
    def get_normal_transactions(self, address: str) -> str:
        """
        Get normal transactions (ETH/ARB transfers) for a specific address.

        Args:
            address: Wallet address

        Returns:
            List of normal transactions as a JSON string
        """
        transactions = self.client.get_normal_transactions(address=address)
        return json.dumps(transactions)

    @tool("get_internal_transactions")
    def get_internal_transactions(self, address: str) -> str:
        """
        Get internal transactions for a specific address.

        Args:
            address: Wallet address

        Returns:
            List of internal transactions as a JSON string
        """
        transactions = self.client.get_internal_transactions(address=address)
        return json.dumps(transactions)

    @tool("fetch_whale_transactions")
    def fetch_whale_transactions(self,
                                 addresses: List[str],
                                 token_address: Optional[str] = None,
                                 min_token_amount: Optional[float] = None,
                                 min_usd_value: Optional[float] = None) -> str:
        """
        Fetch whale transactions for a list of addresses.

        Args:
            addresses: List of wallet addresses
            token_address: Optional token contract address to filter by
            min_token_amount: Minimum token amount to include
            min_usd_value: Minimum USD value to include

        Returns:
            DataFrame of whale transactions as a JSON string
        """
        transactions_df = self.client.fetch_whale_transactions(
            addresses=addresses,
            token_address=token_address,
            min_token_amount=min_token_amount,
            min_usd_value=min_usd_value
        )
        return transactions_df.to_json(orient="records")
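
# A minimal usage sketch (hypothetical): exposing these tools to a LangChain
# agent. The ArbiscanClient constructor arguments and the `llm` object are
# assumptions, not part of this module; adjust to the actual
# modules.api_client interface.
#
#     from langchain.agents import initialize_agent, AgentType
#
#     arbiscan_tools = ArbiscanTools(ArbiscanClient())
#     agent = initialize_agent(
#         tools=[
#             arbiscan_tools.get_token_transfers,
#             arbiscan_tools.get_token_balance,
#             arbiscan_tools.fetch_whale_transactions,
#         ],
#         llm=llm,  # any LangChain-compatible LLM
#         agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
#     )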

# Tools for Gemini API
class GeminiTools:
    def __init__(self, gemini_client: GeminiClient):
        self.client = gemini_client

    @tool("get_current_price")
    def get_current_price(self, symbol: str) -> str:
        """
        Get the current price of a token.

        Args:
            symbol: Token symbol (e.g., "ETHUSD")

        Returns:
            Current price as a string
        """
        price = self.client.get_current_price(symbol=symbol)
        return str(price) if price is not None else "Price not found"

    @tool("get_historical_prices")
    def get_historical_prices(self,
                              symbol: str,
                              start_time: str,
                              end_time: str) -> str:
        """
        Get historical prices for a token within a time range.

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            start_time: Start datetime in ISO format
            end_time: End datetime in ISO format

        Returns:
            DataFrame of historical prices as a JSON string
        """
        # Parse ISO datetime strings, normalizing a trailing "Z" to a UTC offset
        start_time_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
        end_time_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))

        prices_df = self.client.get_historical_prices(
            symbol=symbol,
            start_time=start_time_dt,
            end_time=end_time_dt
        )

        if prices_df is not None:
            return prices_df.to_json(orient="records")
        else:
            return "[]"

    @tool("get_price_impact")
    def get_price_impact(self,
                         symbol: str,
                         transaction_time: str,
                         lookback_minutes: int = 5,
                         lookahead_minutes: int = 5) -> str:
        """
        Analyze the price impact before and after a transaction.

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            transaction_time: Transaction datetime in ISO format
            lookback_minutes: Minutes to look back before the transaction
            lookahead_minutes: Minutes to look ahead after the transaction

        Returns:
            Price impact data as a JSON string
        """
        # Parse ISO datetime string, normalizing a trailing "Z" to a UTC offset
        transaction_time_dt = datetime.fromisoformat(transaction_time.replace('Z', '+00:00'))

        impact_data = self.client.get_price_impact(
            symbol=symbol,
            transaction_time=transaction_time_dt,
            lookback_minutes=lookback_minutes,
            lookahead_minutes=lookahead_minutes
        )

        # Keep only the JSON-serializable fields
        result = {
            "pre_price": impact_data["pre_price"],
            "post_price": impact_data["post_price"],
            "impact_pct": impact_data["impact_pct"]
        }
        return json.dumps(result)
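
# A minimal sketch of the ISO-8601 round trip these tools expect. An agent
# passes datetimes as strings such as "2024-01-15T12:30:00Z" (the timestamp is
# illustrative); the trailing "Z" is rewritten to "+00:00" because
# datetime.fromisoformat() does not accept "Z" before Python 3.11.
#
#     ts = "2024-01-15T12:30:00Z"
#     dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
#     # -> datetime.datetime(2024, 1, 15, 12, 30, tzinfo=datetime.timezone.utc)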

# Tools for Data Processor
class DataProcessorTools:
    def __init__(self, data_processor: DataProcessor):
        self.processor = data_processor

    @tool("aggregate_transactions")
    def aggregate_transactions(self,
                               transactions_json: str,
                               time_window: str = 'D') -> str:
        """
        Aggregate transactions by time window.

        Args:
            transactions_json: JSON string of transactions
            time_window: Time window for aggregation (e.g., 'D' for day, 'H' for hour)

        Returns:
            Aggregated DataFrame as a JSON string
        """
        # Convert JSON to DataFrame
        transactions_df = pd.read_json(transactions_json)

        agg_df = self.processor.aggregate_transactions(
            transactions_df=transactions_df,
            time_window=time_window
        )

        return agg_df.to_json(orient="records")

    @tool("identify_patterns")
    def identify_patterns(self,
                          transactions_json: str,
                          n_clusters: int = 3) -> str:
        """
        Identify trading patterns using clustering.

        Args:
            transactions_json: JSON string of transactions
            n_clusters: Number of clusters for K-Means

        Returns:
            List of pattern dictionaries as a JSON string
        """
        # Convert JSON to DataFrame
        transactions_df = pd.read_json(transactions_json)

        patterns = self.processor.identify_patterns(
            transactions_df=transactions_df,
            n_clusters=n_clusters
        )

        # Convert each pattern to a JSON-serializable dict; chart_data is
        # skipped because it is not JSON serializable
        result = []
        for pattern in patterns:
            pattern_json = {
                "name": pattern["name"],
                "description": pattern["description"],
                "cluster_id": pattern["cluster_id"],
                "occurrence_count": pattern["occurrence_count"],
                "confidence": pattern["confidence"],
                "examples": pattern["examples"].to_json(orient="records") if isinstance(pattern["examples"], pd.DataFrame) else []
            }
            result.append(pattern_json)
        return json.dumps(result)

    @tool("detect_anomalous_transactions")
    def detect_anomalous_transactions(self,
                                      transactions_json: str,
                                      sensitivity: str = "Medium") -> str:
        """
        Detect anomalous transactions using statistical methods.

        Args:
            transactions_json: JSON string of transactions
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            DataFrame of anomalous transactions as a JSON string
        """
        # Convert JSON to DataFrame
        transactions_df = pd.read_json(transactions_json)

        anomalies_df = self.processor.detect_anomalous_transactions(
            transactions_df=transactions_df,
            sensitivity=sensitivity
        )

        return anomalies_df.to_json(orient="records")

    @tool("analyze_price_impact")
    def analyze_price_impact(self,
                             transactions_json: str,
                             price_data_json: str) -> str:
        """
        Analyze the price impact of transactions.

        Args:
            transactions_json: JSON string of transactions
            price_data_json: JSON string of price impact data

        Returns:
            Price impact analysis as a JSON string
        """
        # Convert JSON inputs to a DataFrame and a dictionary
        transactions_df = pd.read_json(transactions_json)
        price_data = json.loads(price_data_json)

        impact_analysis = self.processor.analyze_price_impact(
            transactions_df=transactions_df,
            price_data=price_data
        )

        # Convert to a JSON-serializable dict; impact_chart is skipped because
        # it is not JSON serializable. Guard the DataFrame conversion so a
        # missing or non-DataFrame value falls back to an empty list.
        transactions_with_impact = impact_analysis.get("transactions_with_impact")
        result = {
            "avg_impact_pct": impact_analysis.get("avg_impact_pct"),
            "max_impact_pct": impact_analysis.get("max_impact_pct"),
            "min_impact_pct": impact_analysis.get("min_impact_pct"),
            "significant_moves_count": impact_analysis.get("significant_moves_count"),
            "total_transactions": impact_analysis.get("total_transactions"),
            "transactions_with_impact": transactions_with_impact.to_json(orient="records") if isinstance(transactions_with_impact, pd.DataFrame) else []
        }
        return json.dumps(result)

    @tool("detect_wash_trading")
    def detect_wash_trading(self,
                            transactions_json: str,
                            addresses_json: str,
                            sensitivity: str = "Medium") -> str:
        """
        Detect potential wash trading between addresses.

        Args:
            transactions_json: JSON string of transactions
            addresses_json: JSON string of addresses to analyze
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of potential wash trading incidents as a JSON string
        """
        # Convert JSON inputs to a DataFrame and a list
        transactions_df = pd.read_json(transactions_json)
        addresses = json.loads(addresses_json)

        wash_trades = self.processor.detect_wash_trading(
            transactions_df=transactions_df,
            addresses=addresses,
            sensitivity=sensitivity
        )

        # Convert each incident to a JSON-serializable dict; chart is skipped
        # because it is not JSON serializable
        result = []
        for trade in wash_trades:
            trade_json = {
                "type": trade["type"],
                "addresses": trade["addresses"],
                "risk_level": trade["risk_level"],
                "description": trade["description"],
                "detection_time": trade["detection_time"],
                "title": trade["title"],
                "evidence": trade["evidence"].to_json(orient="records") if isinstance(trade["evidence"], pd.DataFrame) else []
            }
            result.append(trade_json)
        return json.dumps(result)
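
# A minimal end-to-end sketch (hypothetical), run only when this module is
# executed directly. The zero-argument constructors for ArbiscanClient,
# GeminiClient, and DataProcessor are assumptions; adjust to the actual
# modules. It collects the decorated tools the way an agent setup would and
# prints their registered names.
if __name__ == "__main__":
    # Instantiate the three tool collections (constructor args are assumptions)
    arbiscan_tools = ArbiscanTools(ArbiscanClient())
    gemini_tools = GeminiTools(GeminiClient())
    processor_tools = DataProcessorTools(DataProcessor())

    # Gather the @tool-decorated attributes so they can be handed to a
    # LangChain agent
    tools = [
        arbiscan_tools.get_token_transfers,
        arbiscan_tools.get_token_balance,
        arbiscan_tools.get_normal_transactions,
        arbiscan_tools.get_internal_transactions,
        arbiscan_tools.fetch_whale_transactions,
        gemini_tools.get_current_price,
        gemini_tools.get_historical_prices,
        gemini_tools.get_price_impact,
        processor_tools.aggregate_transactions,
        processor_tools.identify_patterns,
        processor_tools.detect_anomalous_transactions,
        processor_tools.analyze_price_impact,
        processor_tools.detect_wash_trading,
    ]
    for t in tools:
        # LangChain tools expose a .name attribute; fall back to the object
        # itself if the decorator left a plain callable
        print(getattr(t, "name", t))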