# NOTE: page-capture residue preserved from the original listing: "Spaces: Build error"
"""
Properly implemented tools for the WhaleAnalysisCrewSystem
"""
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Type, Union

import pandas as pd
from langchain.tools import BaseTool
from pydantic import BaseModel, Field

from modules.api_client import ArbiscanClient, GeminiClient
from modules.data_processor import DataProcessor
class GetTokenTransfersInput(BaseModel):
    """Input for the get_token_transfers tool."""

    # Required: the wallet whose token transfers are requested.
    address: str = Field(
        default=...,
        description="Wallet address to query",
    )
    # Optional: when given, it is forwarded as the token-contract filter.
    contract_address: Optional[str] = Field(
        default=None,
        description="Optional token contract address to filter by",
    )
# Global clients that will be used by all tools.
# They start as None; every tool's _run reports an error until the matching
# instance has been registered through set_global_clients().
_GLOBAL_ARBISCAN_CLIENT: Optional["ArbiscanClient"] = None
_GLOBAL_GEMINI_CLIENT: Optional["GeminiClient"] = None
_GLOBAL_DATA_PROCESSOR: Optional["DataProcessor"] = None


def set_global_clients(arbiscan_client=None, gemini_client=None, data_processor=None):
    """Set global client instances that will be used by all tools.

    Each argument is independent: an argument left as ``None`` keeps the
    currently registered instance untouched, so the function can be called
    several times to register the clients one at a time.

    Args:
        arbiscan_client: Instance to store as the shared Arbiscan client.
        gemini_client: Instance to store as the shared Gemini client.
        data_processor: Instance to store as the shared data processor.
    """
    global _GLOBAL_ARBISCAN_CLIENT, _GLOBAL_GEMINI_CLIENT, _GLOBAL_DATA_PROCESSOR

    # Compare against None explicitly: an object that happens to evaluate as
    # falsy (e.g. one defining __len__ or __bool__) is still a valid client.
    if arbiscan_client is not None:
        _GLOBAL_ARBISCAN_CLIENT = arbiscan_client
    if gemini_client is not None:
        _GLOBAL_GEMINI_CLIENT = gemini_client
    if data_processor is not None:
        _GLOBAL_DATA_PROCESSOR = data_processor
class ArbiscanGetTokenTransfersTool(BaseTool):
    """Tool for fetching token transfers from Arbiscan.

    The tool does not keep its own client; it reads the module-level
    ``_GLOBAL_ARBISCAN_CLIENT``. Every outcome, including failures, is
    returned as a JSON string so the calling agent always receives text.
    """

    # Field overrides are annotated: pydantic v2-based BaseTool rejects
    # non-annotated overrides of inherited fields at class creation.
    name: str = "arbiscan_get_token_transfers"
    description: str = "Get ERC-20 token transfers for a specific address"
    args_schema: Type[BaseModel] = GetTokenTransfersInput

    def __init__(self, arbiscan_client=None):
        """Create the tool, optionally registering ``arbiscan_client`` globally."""
        super().__init__()
        # A supplied client is registered module-wide; otherwise the tool
        # relies on whatever instance was registered earlier.
        if arbiscan_client is not None:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, address: str, contract_address: Optional[str] = None) -> str:
        """Fetch token transfers for ``address``.

        Args:
            address: Wallet address to query.
            contract_address: Optional token contract address to filter by.

        Returns:
            JSON string of the client's result, or ``{"error": ...}`` when the
            client is not registered or the call raises.
        """
        client = _GLOBAL_ARBISCAN_CLIENT
        if client is None:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})

        try:
            transfers = client.get_token_transfers(
                address=address,
                contract_address=contract_address,
            )
            return json.dumps(transfers)
        except Exception as e:
            # Tool boundary: report the failure to the agent instead of raising.
            logging.error("Error in ArbiscanGetTokenTransfersTool: %s", e)
            return json.dumps({"error": str(e)})
class GetNormalTransactionsInput(BaseModel):
    """Input for the get_normal_transactions tool."""

    address: str = Field(..., description="Wallet address to query")
    # The fields below mirror the optional parameters (and defaults) of
    # ArbiscanGetNormalTransactionsTool._run so a caller going through the
    # schema can actually supply them; all are optional, so existing callers
    # that send only `address` keep working.
    startblock: int = Field(0, description="First block number to include")
    endblock: int = Field(99999999, description="Last block number to include")
    page: int = Field(1, description="Page number of the result set")
    # NOTE(review): per the Etherscan-style API, `offset` is the page size — confirm against ArbiscanClient.
    offset: int = Field(10, description="Number of transactions per page")
class ArbiscanGetNormalTransactionsTool(BaseTool):
    """Tool for fetching normal transactions from Arbiscan.

    Uses the module-level ``_GLOBAL_ARBISCAN_CLIENT`` and always answers with
    a JSON string, including for error conditions.
    """

    # Field overrides are annotated: pydantic v2-based BaseTool rejects
    # non-annotated overrides of inherited fields at class creation.
    name: str = "arbiscan_get_normal_transactions"
    description: str = "Get normal transactions (ETH/ARB transfers) for a specific address"
    args_schema: Type[BaseModel] = GetNormalTransactionsInput

    def __init__(self, arbiscan_client=None):
        """Create the tool, optionally registering ``arbiscan_client`` globally."""
        super().__init__()
        # A supplied client is registered module-wide; otherwise the tool
        # relies on whatever instance was registered earlier.
        if arbiscan_client is not None:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, address: str, startblock: int = 0, endblock: int = 99999999, page: int = 1, offset: int = 10) -> str:
        """Fetch normal transactions for ``address``.

        Args:
            address: Wallet address to query.
            startblock: Forwarded to the client as ``start_block``.
            endblock: Forwarded to the client as ``end_block``.
            page: Forwarded to the client as ``page``.
            offset: Forwarded to the client as ``offset``.

        Returns:
            JSON string of the client's result, or ``{"error": ...}`` when the
            client is not registered or the call raises.
        """
        client = _GLOBAL_ARBISCAN_CLIENT
        if client is None:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})

        try:
            # The tool exposes Arbiscan-style names (startblock/endblock) while
            # the client method takes snake_case keywords.
            txs = client.get_normal_transactions(
                address=address,
                start_block=startblock,
                end_block=endblock,
                page=page,
                offset=offset,
            )
            return json.dumps(txs)
        except Exception as e:
            # Tool boundary: report the failure to the agent instead of raising.
            logging.error("Error in ArbiscanGetNormalTransactionsTool: %s", e)
            return json.dumps({"error": str(e)})
class GetInternalTransactionsInput(BaseModel):
    """Input for the get_internal_transactions tool."""

    address: str = Field(..., description="Wallet address to query")
    # The fields below mirror the optional parameters (and defaults) of
    # ArbiscanGetInternalTransactionsTool._run so a caller going through the
    # schema can actually supply them; all are optional, so existing callers
    # that send only `address` keep working.
    startblock: int = Field(0, description="First block number to include")
    endblock: int = Field(99999999, description="Last block number to include")
    page: int = Field(1, description="Page number of the result set")
    # NOTE(review): per the Etherscan-style API, `offset` is the page size — confirm against ArbiscanClient.
    offset: int = Field(10, description="Number of transactions per page")
class ArbiscanGetInternalTransactionsTool(BaseTool):
    """Tool for fetching internal transactions from Arbiscan.

    Uses the module-level ``_GLOBAL_ARBISCAN_CLIENT`` and always answers with
    a JSON string, including for error conditions.
    """

    # Field overrides are annotated: pydantic v2-based BaseTool rejects
    # non-annotated overrides of inherited fields at class creation.
    name: str = "arbiscan_get_internal_transactions"
    description: str = "Get internal transactions for a specific address"
    args_schema: Type[BaseModel] = GetInternalTransactionsInput

    def __init__(self, arbiscan_client=None):
        """Create the tool, optionally registering ``arbiscan_client`` globally."""
        super().__init__()
        # A supplied client is registered module-wide; otherwise the tool
        # relies on whatever instance was registered earlier.
        if arbiscan_client is not None:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, address: str, startblock: int = 0, endblock: int = 99999999, page: int = 1, offset: int = 10) -> str:
        """Fetch internal transactions for ``address``.

        Args:
            address: Wallet address to query.
            startblock: Forwarded to the client as ``start_block``.
            endblock: Forwarded to the client as ``end_block``.
            page: Forwarded to the client as ``page``.
            offset: Forwarded to the client as ``offset``.

        Returns:
            JSON string of the client's result, or ``{"error": ...}`` when the
            client is not registered or the call raises.
        """
        client = _GLOBAL_ARBISCAN_CLIENT
        if client is None:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})

        try:
            # The tool exposes Arbiscan-style names (startblock/endblock) while
            # the client method takes snake_case keywords.
            txs = client.get_internal_transactions(
                address=address,
                start_block=startblock,
                end_block=endblock,
                page=page,
                offset=offset,
            )
            return json.dumps(txs)
        except Exception as e:
            # Tool boundary: report the failure to the agent instead of raising.
            logging.error("Error in ArbiscanGetInternalTransactionsTool: %s", e)
            return json.dumps({"error": str(e)})
class FetchWhaleTransactionsInput(BaseModel):
    """Input for the fetch_whale_transactions tool."""

    # Required: the wallets to scan.
    addresses: List[str] = Field(
        default=...,
        description="List of wallet addresses to query",
    )
    # Optional filters; each defaults to None (no value supplied).
    token_address: Optional[str] = Field(
        default=None,
        description="Optional token contract address to filter by",
    )
    min_token_amount: Optional[float] = Field(
        default=None,
        description="Minimum token amount",
    )
    min_usd_value: Optional[float] = Field(
        default=None,
        description="Minimum USD value",
    )
class ArbiscanFetchWhaleTransactionsTool(BaseTool):
    """Tool for fetching whale transactions from Arbiscan.

    Uses the module-level ``_GLOBAL_ARBISCAN_CLIENT`` and always answers with
    a JSON string, including for error conditions.
    """

    # Field overrides are annotated: pydantic v2-based BaseTool rejects
    # non-annotated overrides of inherited fields at class creation.
    name: str = "arbiscan_fetch_whale_transactions"
    description: str = "Fetch whale transactions for a list of addresses"
    args_schema: Type[BaseModel] = FetchWhaleTransactionsInput

    def __init__(self, arbiscan_client=None):
        """Create the tool, optionally registering ``arbiscan_client`` globally."""
        super().__init__()
        # A supplied client is registered module-wide; otherwise the tool
        # relies on whatever instance was registered earlier.
        if arbiscan_client is not None:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, addresses: List[str], token_address: Optional[str] = None,
             min_token_amount: Optional[float] = None, min_usd_value: Optional[float] = None) -> str:
        """Fetch whale transactions for the given addresses.

        Args:
            addresses: Wallet addresses to query.
            token_address: Optional token contract address to filter by.
            min_token_amount: Optional minimum token amount.
            min_usd_value: Optional minimum USD value.

        Returns:
            The client's result serialized with ``to_json(orient="records")``,
            or ``{"error": ...}`` when the client is not registered or the
            call raises.
        """
        client = _GLOBAL_ARBISCAN_CLIENT
        if client is None:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})

        try:
            transactions_df = client.fetch_whale_transactions(
                addresses=addresses,
                token_address=token_address,
                min_token_amount=min_token_amount,
                min_usd_value=min_usd_value,
                max_pages=5,  # Limit to 5 pages to prevent excessive API calls
            )
            # The result is serialized via its own to_json (one JSON object per row).
            return transactions_df.to_json(orient="records")
        except Exception as e:
            # Tool boundary: report the failure to the agent instead of raising.
            logging.error("Error in ArbiscanFetchWhaleTransactionsTool: %s", e)
            return json.dumps({"error": str(e)})
class GetCurrentPriceInput(BaseModel):
    """Input for the get_current_price tool."""

    # Required: the symbol handed to the Gemini client unchanged.
    symbol: str = Field(
        default=...,
        description="Token symbol (e.g., 'ETHUSD')",
    )
class GeminiGetCurrentPriceTool(BaseTool):
    """Tool for getting current token price from Gemini.

    Uses the module-level ``_GLOBAL_GEMINI_CLIENT`` and always answers with a
    JSON string, including for error conditions.
    """

    # Field overrides are annotated: pydantic v2-based BaseTool rejects
    # non-annotated overrides of inherited fields at class creation.
    name: str = "gemini_get_current_price"
    description: str = "Get the current price of a token"
    args_schema: Type[BaseModel] = GetCurrentPriceInput

    def __init__(self, gemini_client=None):
        """Create the tool, optionally registering ``gemini_client`` globally."""
        super().__init__()
        # A supplied client is registered module-wide; otherwise the tool
        # relies on whatever instance was registered earlier.
        if gemini_client is not None:
            set_global_clients(gemini_client=gemini_client)

    def _run(self, symbol: str) -> str:
        """Look up the current price of ``symbol``.

        Args:
            symbol: Token symbol (e.g., 'ETHUSD').

        Returns:
            JSON string ``{"symbol": ..., "price": ...}``, or ``{"error": ...}``
            when the client is not registered or the call raises.
        """
        client = _GLOBAL_GEMINI_CLIENT
        if client is None:
            return json.dumps({"error": "Gemini client not initialized. Please set global client first."})

        try:
            price = client.get_current_price(symbol)
            return json.dumps({"symbol": symbol, "price": price})
        except Exception as e:
            # Tool boundary: report the failure to the agent instead of raising.
            logging.error("Error in GeminiGetCurrentPriceTool: %s", e)
            return json.dumps({"error": str(e)})
class GetHistoricalPricesInput(BaseModel):
    """Input for the get_historical_prices tool."""

    symbol: str = Field(..., description="Token symbol (e.g., 'ETHUSD')")
    start_time: str = Field(..., description="Start datetime in ISO format")
    end_time: str = Field(..., description="End datetime in ISO format")
    # Mirrors the `interval` parameter (and its default) of
    # GeminiGetHistoricalPricesTool._run so it can be supplied through the
    # schema; optional, so existing callers are unaffected.
    interval: str = Field("15m", description="Price interval (e.g., '15m')")
def _parse_iso_datetime(value: Optional[str]) -> Optional[datetime]:
    """Convert an ISO-8601 string to ``datetime``; empty/None yields None.

    A trailing 'Z' (UTC designator) is rewritten to '+00:00' because
    ``datetime.fromisoformat`` only accepts 'Z' from Python 3.11 onwards.
    """
    if not value:
        return None
    if value.endswith(("Z", "z")):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


class GeminiGetHistoricalPricesTool(BaseTool):
    """Tool for getting historical token prices from Gemini.

    Uses the module-level ``_GLOBAL_GEMINI_CLIENT`` and always answers with a
    JSON string, including for error conditions.
    """

    # Field overrides are annotated: pydantic v2-based BaseTool rejects
    # non-annotated overrides of inherited fields at class creation.
    name: str = "gemini_get_historical_prices"
    description: str = "Get historical prices for a token within a time range"
    args_schema: Type[BaseModel] = GetHistoricalPricesInput

    def __init__(self, gemini_client=None):
        """Create the tool, optionally registering ``gemini_client`` globally."""
        super().__init__()
        # A supplied client is registered module-wide; otherwise the tool
        # relies on whatever instance was registered earlier.
        if gemini_client is not None:
            set_global_clients(gemini_client=gemini_client)

    def _run(
        self,
        symbol: str,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        interval: str = "15m"
    ) -> str:
        """Fetch historical prices for ``symbol``.

        Args:
            symbol: Token symbol (e.g., 'ETHUSD').
            start_time: Start datetime in ISO format; None is passed through
                to the client as None.
            end_time: End datetime in ISO format; None is passed through to
                the client as None.
            interval: Forwarded to the client as ``interval``.

        Returns:
            JSON string of the client's result, or ``{"error": ...}`` when the
            client is not registered, a timestamp cannot be parsed, or the
            call raises.
        """
        client = _GLOBAL_GEMINI_CLIENT
        if client is None:
            return json.dumps({"error": "Gemini client not initialized. Please set global client first."})

        try:
            # Parsing stays inside the try so a malformed timestamp is
            # reported as an error payload rather than raised.
            start_dt = _parse_iso_datetime(start_time)
            end_dt = _parse_iso_datetime(end_time)

            prices = client.get_historical_prices(
                symbol=symbol,
                start_time=start_dt,
                end_time=end_dt,
                interval=interval,
            )
            return json.dumps(prices)
        except Exception as e:
            # Tool boundary: report the failure to the agent instead of raising.
            logging.error("Error in GeminiGetHistoricalPricesTool: %s", e)
            return json.dumps({"error": str(e)})
class IdentifyPatternsInput(BaseModel):
    """Input for the identify_patterns tool."""

    # Required: the transactions, serialized as a JSON string.
    transactions_json: str = Field(
        default=...,
        description="JSON string of transactions",
    )
    # Optional: defaults to 3 clusters.
    n_clusters: int = Field(
        default=3,
        description="Number of clusters for K-Means",
    )
class DataProcessorIdentifyPatternsTool(BaseTool):
    """Tool for identifying trading patterns using the DataProcessor.

    Uses the module-level ``_GLOBAL_DATA_PROCESSOR`` and always answers with a
    JSON string, including for error conditions.
    """

    # Field overrides are annotated: pydantic v2-based BaseTool rejects
    # non-annotated overrides of inherited fields at class creation.
    name: str = "data_processor_identify_patterns"
    description: str = "Identify trading patterns in a set of transactions"
    args_schema: Type[BaseModel] = IdentifyPatternsInput

    def __init__(self, data_processor=None):
        """Create the tool, optionally registering ``data_processor`` globally."""
        super().__init__()
        # A supplied processor is registered module-wide; otherwise the tool
        # relies on whatever instance was registered earlier.
        if data_processor is not None:
            set_global_clients(data_processor=data_processor)

    def _run(self, transactions_json: Union[str, List[Dict[str, Any]]], n_clusters: int = 3) -> str:
        """Identify patterns in the supplied transactions.

        Args:
            transactions_json: Transactions either as a JSON string (what the
                input schema declares) or as already-decoded records.
            n_clusters: Forwarded to the processor as ``n_clusters``.

        Returns:
            JSON string of the processor's result, or ``{"error": ...}`` when
            the processor is not registered, the input cannot be decoded, a
            required column is missing, or the call raises.
        """
        processor = _GLOBAL_DATA_PROCESSOR
        if processor is None:
            return json.dumps({"error": "Data processor not initialized. Please set global processor first."})

        try:
            # The schema delivers a JSON *string*; decode it first, since
            # pd.DataFrame cannot be built from a raw string.
            if isinstance(transactions_json, str):
                records = json.loads(transactions_json)
            else:
                records = transactions_json

            # Convert JSON to DataFrame
            transactions_df = pd.DataFrame(records)

            # Ensure required columns exist; report the first one missing.
            required_columns = ['timeStamp', 'hash', 'from', 'to', 'value', 'tokenSymbol']
            for col in required_columns:
                if col not in transactions_df.columns:
                    return json.dumps({
                        "error": f"Missing required column: {col}",
                        "available_columns": list(transactions_df.columns)
                    })

            # Run pattern identification
            patterns = processor.identify_patterns(
                transactions_df=transactions_df,
                n_clusters=n_clusters,
            )
            return json.dumps(patterns)
        except Exception as e:
            # Tool boundary: report the failure to the agent instead of raising.
            logging.error("Error in DataProcessorIdentifyPatternsTool: %s", e)
            return json.dumps({"error": str(e)})
class DetectAnomalousTransactionsInput(BaseModel):
    """Input for the detect_anomalous_transactions tool."""

    # Required: the transactions, serialized as a JSON string.
    transactions_json: str = Field(
        default=...,
        description="JSON string of transactions",
    )
    # Optional: defaults to "Medium".
    sensitivity: str = Field(
        default="Medium",
        description="Detection sensitivity ('Low', 'Medium', 'High')",
    )
class DataProcessorDetectAnomalousTransactionsTool(BaseTool):
    """Tool for detecting anomalous transactions using the DataProcessor.

    Uses the module-level ``_GLOBAL_DATA_PROCESSOR`` and always answers with a
    JSON string, including for error conditions.
    """

    # Field overrides are annotated: pydantic v2-based BaseTool rejects
    # non-annotated overrides of inherited fields at class creation.
    name: str = "data_processor_detect_anomalies"
    description: str = "Detect anomalous transactions in a dataset"
    args_schema: Type[BaseModel] = DetectAnomalousTransactionsInput

    def __init__(self, data_processor=None):
        """Create the tool, optionally registering ``data_processor`` globally."""
        super().__init__()
        # A supplied processor is registered module-wide; otherwise the tool
        # relies on whatever instance was registered earlier.
        if data_processor is not None:
            set_global_clients(data_processor=data_processor)

    def _run(self, transactions_json: Union[str, List[Dict[str, Any]]], sensitivity: str = "Medium") -> str:
        """Detect anomalous transactions in the supplied data.

        Args:
            transactions_json: Transactions either as a JSON string (what the
                input schema declares) or as already-decoded records.
            sensitivity: Forwarded to the processor as ``sensitivity``.

        Returns:
            JSON string of the processor's result, or ``{"error": ...}`` when
            the processor is not registered, the input cannot be decoded, a
            required column is missing, or the call raises.
        """
        processor = _GLOBAL_DATA_PROCESSOR
        if processor is None:
            return json.dumps({"error": "Data processor not initialized. Please set global processor first."})

        try:
            # The schema delivers a JSON *string*; decode it first, since
            # pd.DataFrame cannot be built from a raw string.
            if isinstance(transactions_json, str):
                records = json.loads(transactions_json)
            else:
                records = transactions_json

            # Convert JSON to DataFrame
            transactions_df = pd.DataFrame(records)

            # Ensure required columns exist; report the first one missing.
            required_columns = ['timeStamp', 'hash', 'from', 'to', 'value', 'tokenSymbol']
            for col in required_columns:
                if col not in transactions_df.columns:
                    return json.dumps({
                        "error": f"Missing required column: {col}",
                        "available_columns": list(transactions_df.columns)
                    })

            # Run anomaly detection
            anomalies = processor.detect_anomalous_transactions(
                transactions_df=transactions_df,
                sensitivity=sensitivity,
            )
            return json.dumps(anomalies)
        except Exception as e:
            # Tool boundary: report the failure to the agent instead of raising.
            logging.error("Error in DataProcessorDetectAnomalousTransactionsTool: %s", e)
            return json.dumps({"error": str(e)})