import json import pandas as pd from datetime import datetime from typing import Dict, List, Optional, Union, Any, Tuple from langchain.tools import tool from modules.api_client import ArbiscanClient, GeminiClient from modules.data_processor import DataProcessor # Tools for Arbiscan API class ArbiscanTools: def __init__(self, arbiscan_client: ArbiscanClient): self.client = arbiscan_client @tool("get_token_transfers") def get_token_transfers(self, address: str, contract_address: Optional[str] = None) -> str: """ Get ERC-20 token transfers for a specific address Args: address: Wallet address contract_address: Optional token contract address to filter by Returns: List of token transfers as JSON string """ transfers = self.client.get_token_transfers( address=address, contract_address=contract_address ) return json.dumps(transfers) @tool("get_token_balance") def get_token_balance(self, address: str, contract_address: str) -> str: """ Get the current balance of a specific token for an address Args: address: Wallet address contract_address: Token contract address Returns: Token balance """ balance = self.client.get_token_balance( address=address, contract_address=contract_address ) return balance @tool("get_normal_transactions") def get_normal_transactions(self, address: str) -> str: """ Get normal transactions (ETH/ARB transfers) for a specific address Args: address: Wallet address Returns: List of normal transactions as JSON string """ transactions = self.client.get_normal_transactions(address=address) return json.dumps(transactions) @tool("get_internal_transactions") def get_internal_transactions(self, address: str) -> str: """ Get internal transactions for a specific address Args: address: Wallet address Returns: List of internal transactions as JSON string """ transactions = self.client.get_internal_transactions(address=address) return json.dumps(transactions) @tool("fetch_whale_transactions") def fetch_whale_transactions(self, addresses: List[str], token_address: Optional[str] = None, min_token_amount: Optional[float] = None, min_usd_value: Optional[float] = None) -> str: """ Fetch whale transactions for a list of addresses Args: addresses: List of wallet addresses token_address: Optional token contract address to filter by min_token_amount: Minimum token amount min_usd_value: Minimum USD value Returns: DataFrame of whale transactions as JSON string """ transactions_df = self.client.fetch_whale_transactions( addresses=addresses, token_address=token_address, min_token_amount=min_token_amount, min_usd_value=min_usd_value ) return transactions_df.to_json(orient="records") # Tools for Gemini API class GeminiTools: def __init__(self, gemini_client: GeminiClient): self.client = gemini_client @tool("get_current_price") def get_current_price(self, symbol: str) -> str: """ Get the current price of a token Args: symbol: Token symbol (e.g., "ETHUSD") Returns: Current price """ price = self.client.get_current_price(symbol=symbol) return str(price) if price is not None else "Price not found" @tool("get_historical_prices") def get_historical_prices(self, symbol: str, start_time: str, end_time: str) -> str: """ Get historical prices for a token within a time range Args: symbol: Token symbol (e.g., "ETHUSD") start_time: Start datetime in ISO format end_time: End datetime in ISO format Returns: DataFrame of historical prices as JSON string """ # Parse datetime strings start_time_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00')) end_time_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00')) prices_df = self.client.get_historical_prices( symbol=symbol, start_time=start_time_dt, end_time=end_time_dt ) if prices_df is not None: return prices_df.to_json(orient="records") else: return "[]" @tool("get_price_impact") def get_price_impact(self, symbol: str, transaction_time: str, lookback_minutes: int = 5, lookahead_minutes: int = 5) -> str: """ Analyze the price impact before and after a transaction Args: symbol: Token symbol (e.g., "ETHUSD") transaction_time: Transaction datetime in ISO format lookback_minutes: Minutes to look back before the transaction lookahead_minutes: Minutes to look ahead after the transaction Returns: Price impact data as JSON string """ # Parse datetime string transaction_time_dt = datetime.fromisoformat(transaction_time.replace('Z', '+00:00')) impact_data = self.client.get_price_impact( symbol=symbol, transaction_time=transaction_time_dt, lookback_minutes=lookback_minutes, lookahead_minutes=lookahead_minutes ) # Convert to JSON string result = { "pre_price": impact_data["pre_price"], "post_price": impact_data["post_price"], "impact_pct": impact_data["impact_pct"] } return json.dumps(result) # Tools for Data Processor class DataProcessorTools: def __init__(self, data_processor: DataProcessor): self.processor = data_processor @tool("aggregate_transactions") def aggregate_transactions(self, transactions_json: str, time_window: str = 'D') -> str: """ Aggregate transactions by time window Args: transactions_json: JSON string of transactions time_window: Time window for aggregation (e.g., 'D' for day, 'H' for hour) Returns: Aggregated DataFrame as JSON string """ # Convert JSON to DataFrame transactions_df = pd.read_json(transactions_json) # Process data agg_df = self.processor.aggregate_transactions( transactions_df=transactions_df, time_window=time_window ) # Convert result to JSON return agg_df.to_json(orient="records") @tool("identify_patterns") def identify_patterns(self, transactions_json: str, n_clusters: int = 3) -> str: """ Identify trading patterns using clustering Args: transactions_json: JSON string of transactions n_clusters: Number of clusters for K-Means Returns: List of pattern dictionaries as JSON string """ # Convert JSON to DataFrame transactions_df = pd.read_json(transactions_json) # Process data patterns = self.processor.identify_patterns( transactions_df=transactions_df, n_clusters=n_clusters ) # Convert result to JSON result = [] for pattern in patterns: # Convert non-serializable objects to serializable format pattern_json = { "name": pattern["name"], "description": pattern["description"], "cluster_id": pattern["cluster_id"], "occurrence_count": pattern["occurrence_count"], "confidence": pattern["confidence"], # Skip chart_data as it's not JSON serializable "examples": pattern["examples"].to_json(orient="records") if isinstance(pattern["examples"], pd.DataFrame) else [] } result.append(pattern_json) return json.dumps(result) @tool("detect_anomalous_transactions") def detect_anomalous_transactions(self, transactions_json: str, sensitivity: str = "Medium") -> str: """ Detect anomalous transactions using statistical methods Args: transactions_json: JSON string of transactions sensitivity: Detection sensitivity ("Low", "Medium", "High") Returns: DataFrame of anomalous transactions as JSON string """ # Convert JSON to DataFrame transactions_df = pd.read_json(transactions_json) # Process data anomalies_df = self.processor.detect_anomalous_transactions( transactions_df=transactions_df, sensitivity=sensitivity ) # Convert result to JSON return anomalies_df.to_json(orient="records") @tool("analyze_price_impact") def analyze_price_impact(self, transactions_json: str, price_data_json: str) -> str: """ Analyze the price impact of transactions Args: transactions_json: JSON string of transactions price_data_json: JSON string of price impact data Returns: Price impact analysis as JSON string """ # Convert JSON to DataFrame transactions_df = pd.read_json(transactions_json) # Convert price_data_json to dictionary price_data = json.loads(price_data_json) # Process data impact_analysis = self.processor.analyze_price_impact( transactions_df=transactions_df, price_data=price_data ) # Convert result to JSON (excluding non-serializable objects) result = { "avg_impact_pct": impact_analysis.get("avg_impact_pct"), "max_impact_pct": impact_analysis.get("max_impact_pct"), "min_impact_pct": impact_analysis.get("min_impact_pct"), "significant_moves_count": impact_analysis.get("significant_moves_count"), "total_transactions": impact_analysis.get("total_transactions"), # Skip impact_chart as it's not JSON serializable "transactions_with_impact": impact_analysis.get("transactions_with_impact").to_json(orient="records") if "transactions_with_impact" in impact_analysis else [] } return json.dumps(result) @tool("detect_wash_trading") def detect_wash_trading(self, transactions_json: str, addresses_json: str, sensitivity: str = "Medium") -> str: """ Detect potential wash trading between addresses Args: transactions_json: JSON string of transactions addresses_json: JSON string of addresses to analyze sensitivity: Detection sensitivity ("Low", "Medium", "High") Returns: List of potential wash trading incidents as JSON string """ # Convert JSON to DataFrame transactions_df = pd.read_json(transactions_json) # Convert addresses_json to list addresses = json.loads(addresses_json) # Process data wash_trades = self.processor.detect_wash_trading( transactions_df=transactions_df, addresses=addresses, sensitivity=sensitivity ) # Convert result to JSON (excluding non-serializable objects) result = [] for trade in wash_trades: trade_json = { "type": trade["type"], "addresses": trade["addresses"], "risk_level": trade["risk_level"], "description": trade["description"], "detection_time": trade["detection_time"], "title": trade["title"], "evidence": trade["evidence"].to_json(orient="records") if isinstance(trade["evidence"], pd.DataFrame) else [] # Skip chart as it's not JSON serializable } result.append(trade_json) return json.dumps(result)