import json
from datetime import datetime
from io import StringIO
from typing import List, Optional

import pandas as pd

from langchain.tools import tool  # see the wiring sketch at the end of this file
from modules.api_client import ArbiscanClient, GeminiClient
from modules.data_processor import DataProcessor
# Tools for the Arbiscan API
class ArbiscanTools:
    def __init__(self, arbiscan_client: ArbiscanClient):
        self.client = arbiscan_client

    def get_token_transfers(self, address: str, contract_address: Optional[str] = None) -> str:
        """
        Get ERC-20 token transfers for a specific address.

        Args:
            address: Wallet address
            contract_address: Optional token contract address to filter by

        Returns:
            List of token transfers as a JSON string
        """
        transfers = self.client.get_token_transfers(
            address=address,
            contract_address=contract_address
        )
        return json.dumps(transfers)
    def get_token_balance(self, address: str, contract_address: str) -> str:
        """
        Get the current balance of a specific token for an address.

        Args:
            address: Wallet address
            contract_address: Token contract address

        Returns:
            Token balance as a string
        """
        balance = self.client.get_token_balance(
            address=address,
            contract_address=contract_address
        )
        # Coerce to str so the return value matches the annotated type
        # even if the client returns a numeric balance
        return str(balance)
    def get_normal_transactions(self, address: str) -> str:
        """
        Get normal transactions (ETH/ARB transfers) for a specific address.

        Args:
            address: Wallet address

        Returns:
            List of normal transactions as a JSON string
        """
        transactions = self.client.get_normal_transactions(address=address)
        return json.dumps(transactions)

    def get_internal_transactions(self, address: str) -> str:
        """
        Get internal transactions for a specific address.

        Args:
            address: Wallet address

        Returns:
            List of internal transactions as a JSON string
        """
        transactions = self.client.get_internal_transactions(address=address)
        return json.dumps(transactions)

    def fetch_whale_transactions(self,
                                 addresses: List[str],
                                 token_address: Optional[str] = None,
                                 min_token_amount: Optional[float] = None,
                                 min_usd_value: Optional[float] = None) -> str:
        """
        Fetch whale transactions for a list of addresses.

        Args:
            addresses: List of wallet addresses
            token_address: Optional token contract address to filter by
            min_token_amount: Minimum token amount
            min_usd_value: Minimum USD value

        Returns:
            DataFrame of whale transactions as a JSON string
        """
        transactions_df = self.client.fetch_whale_transactions(
            addresses=addresses,
            token_address=token_address,
            min_token_amount=min_token_amount,
            min_usd_value=min_usd_value
        )
        return transactions_df.to_json(orient="records")
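
# Usage sketch (commented out): how the Arbiscan tools are intended to be
# called. The ArbiscanClient constructor arguments are an assumption; what
# matters is that every method returns a JSON string, so the output can be
# handed directly to an LLM agent or decoded with json.loads().
#
#   client = ArbiscanClient(api_key="YOUR_ARBISCAN_API_KEY")  # hypothetical args
#   tools = ArbiscanTools(client)
#   transfers_json = tools.get_token_transfers("0xWalletAddress")
#   transfers = json.loads(transfers_json)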
# Tools for the Gemini API
class GeminiTools:
    def __init__(self, gemini_client: GeminiClient):
        self.client = gemini_client

    def get_current_price(self, symbol: str) -> str:
        """
        Get the current price of a token.

        Args:
            symbol: Token symbol (e.g., "ETHUSD")

        Returns:
            Current price as a string
        """
        price = self.client.get_current_price(symbol=symbol)
        return str(price) if price is not None else "Price not found"

    def get_historical_prices(self,
                              symbol: str,
                              start_time: str,
                              end_time: str) -> str:
        """
        Get historical prices for a token within a time range.

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            start_time: Start datetime in ISO format
            end_time: End datetime in ISO format

        Returns:
            DataFrame of historical prices as a JSON string
        """
        # Parse the ISO datetime strings; a trailing 'Z' is rewritten to an
        # explicit UTC offset, which datetime.fromisoformat() accepts on
        # Python versions before 3.11
        start_time_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
        end_time_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))

        prices_df = self.client.get_historical_prices(
            symbol=symbol,
            start_time=start_time_dt,
            end_time=end_time_dt
        )
        if prices_df is not None:
            return prices_df.to_json(orient="records")
        else:
            return "[]"
    def get_price_impact(self,
                         symbol: str,
                         transaction_time: str,
                         lookback_minutes: int = 5,
                         lookahead_minutes: int = 5) -> str:
        """
        Analyze the price impact before and after a transaction.

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            transaction_time: Transaction datetime in ISO format
            lookback_minutes: Minutes to look back before the transaction
            lookahead_minutes: Minutes to look ahead after the transaction

        Returns:
            Price impact data as a JSON string
        """
        # Parse the ISO datetime string (see get_historical_prices)
        transaction_time_dt = datetime.fromisoformat(transaction_time.replace('Z', '+00:00'))

        impact_data = self.client.get_price_impact(
            symbol=symbol,
            transaction_time=transaction_time_dt,
            lookback_minutes=lookback_minutes,
            lookahead_minutes=lookahead_minutes
        )

        # Keep only the serializable summary fields
        result = {
            "pre_price": impact_data["pre_price"],
            "post_price": impact_data["post_price"],
            "impact_pct": impact_data["impact_pct"]
        }
        return json.dumps(result)
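
# Usage sketch (commented out): timestamps go in as ISO-8601 strings, and a
# trailing 'Z' is accepted because it is rewritten to '+00:00' before parsing.
# The GeminiClient constructor arguments are an assumption.
#
#   gemini = GeminiTools(GeminiClient())
#   prices_json = gemini.get_historical_prices(
#       symbol="ETHUSD",
#       start_time="2024-01-01T00:00:00Z",
#       end_time="2024-01-02T00:00:00Z",
#   )
#   impact_json = gemini.get_price_impact("ETHUSD", "2024-01-01T12:00:00Z")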
# Tools for the Data Processor
class DataProcessorTools:
    def __init__(self, data_processor: DataProcessor):
        self.processor = data_processor

    def aggregate_transactions(self,
                               transactions_json: str,
                               time_window: str = 'D') -> str:
        """
        Aggregate transactions by time window.

        Args:
            transactions_json: JSON string of transactions
            time_window: Time window for aggregation (e.g., 'D' for day, 'H' for hour)

        Returns:
            Aggregated DataFrame as a JSON string
        """
        # Convert the JSON string to a DataFrame; wrapping it in StringIO
        # avoids the pandas deprecation of passing literal JSON to read_json
        transactions_df = pd.read_json(StringIO(transactions_json))

        agg_df = self.processor.aggregate_transactions(
            transactions_df=transactions_df,
            time_window=time_window
        )
        return agg_df.to_json(orient="records")
    def identify_patterns(self,
                          transactions_json: str,
                          n_clusters: int = 3) -> str:
        """
        Identify trading patterns using clustering.

        Args:
            transactions_json: JSON string of transactions
            n_clusters: Number of clusters for K-Means

        Returns:
            List of pattern dictionaries as a JSON string
        """
        transactions_df = pd.read_json(StringIO(transactions_json))

        patterns = self.processor.identify_patterns(
            transactions_df=transactions_df,
            n_clusters=n_clusters
        )

        # Convert each pattern to a JSON-serializable dict
        result = []
        for pattern in patterns:
            pattern_json = {
                "name": pattern["name"],
                "description": pattern["description"],
                "cluster_id": pattern["cluster_id"],
                "occurrence_count": pattern["occurrence_count"],
                "confidence": pattern["confidence"],
                # Skip chart_data as it's not JSON serializable.
                # Round-trip examples through pandas so they land as a list
                # of records rather than a doubly encoded JSON string.
                "examples": json.loads(pattern["examples"].to_json(orient="records"))
                            if isinstance(pattern["examples"], pd.DataFrame) else []
            }
            result.append(pattern_json)
        return json.dumps(result)
    def detect_anomalous_transactions(self,
                                      transactions_json: str,
                                      sensitivity: str = "Medium") -> str:
        """
        Detect anomalous transactions using statistical methods.

        Args:
            transactions_json: JSON string of transactions
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            DataFrame of anomalous transactions as a JSON string
        """
        transactions_df = pd.read_json(StringIO(transactions_json))

        anomalies_df = self.processor.detect_anomalous_transactions(
            transactions_df=transactions_df,
            sensitivity=sensitivity
        )
        return anomalies_df.to_json(orient="records")
    def analyze_price_impact(self,
                             transactions_json: str,
                             price_data_json: str) -> str:
        """
        Analyze the price impact of transactions.

        Args:
            transactions_json: JSON string of transactions
            price_data_json: JSON string of price impact data

        Returns:
            Price impact analysis as a JSON string
        """
        transactions_df = pd.read_json(StringIO(transactions_json))
        price_data = json.loads(price_data_json)

        impact_analysis = self.processor.analyze_price_impact(
            transactions_df=transactions_df,
            price_data=price_data
        )

        # Build a JSON-serializable summary (impact_chart is skipped because
        # it's not JSON serializable)
        transactions_with_impact = impact_analysis.get("transactions_with_impact")
        result = {
            "avg_impact_pct": impact_analysis.get("avg_impact_pct"),
            "max_impact_pct": impact_analysis.get("max_impact_pct"),
            "min_impact_pct": impact_analysis.get("min_impact_pct"),
            "significant_moves_count": impact_analysis.get("significant_moves_count"),
            "total_transactions": impact_analysis.get("total_transactions"),
            # Round-trip through pandas so this lands as a list of records
            # rather than a doubly encoded JSON string
            "transactions_with_impact": json.loads(transactions_with_impact.to_json(orient="records"))
                                        if isinstance(transactions_with_impact, pd.DataFrame) else []
        }
        return json.dumps(result)
    def detect_wash_trading(self,
                            transactions_json: str,
                            addresses_json: str,
                            sensitivity: str = "Medium") -> str:
        """
        Detect potential wash trading between addresses.

        Args:
            transactions_json: JSON string of transactions
            addresses_json: JSON string of addresses to analyze
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of potential wash trading incidents as a JSON string
        """
        transactions_df = pd.read_json(StringIO(transactions_json))
        addresses = json.loads(addresses_json)

        wash_trades = self.processor.detect_wash_trading(
            transactions_df=transactions_df,
            addresses=addresses,
            sensitivity=sensitivity
        )

        # Convert each incident to a JSON-serializable dict (the chart is
        # skipped because it's not JSON serializable)
        result = []
        for trade in wash_trades:
            trade_json = {
                "type": trade["type"],
                "addresses": trade["addresses"],
                "risk_level": trade["risk_level"],
                "description": trade["description"],
                # str() guards against datetime objects, which json.dumps rejects
                "detection_time": str(trade["detection_time"]),
                "title": trade["title"],
                # Round-trip through pandas so evidence lands as a list of
                # records rather than a doubly encoded JSON string
                "evidence": json.loads(trade["evidence"].to_json(orient="records"))
                            if isinstance(trade["evidence"], pd.DataFrame) else []
            }
            result.append(trade_json)
        return json.dumps(result)
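
# Wiring sketch (commented out): this file imports `tool` from langchain.tools,
# which suggests these methods are meant to be exposed to an agent. A minimal
# way to do that, assuming the classic LangChain Tool interface
# (name/func/description), would be:
#
#   from langchain.tools import Tool
#
#   arbiscan_tools = ArbiscanTools(ArbiscanClient(api_key="..."))
#   processor_tools = DataProcessorTools(DataProcessor())
#
#   agent_tools = [
#       Tool(
#           name="get_token_transfers",
#           func=arbiscan_tools.get_token_transfers,
#           description="Get ERC-20 token transfers for an address (JSON)",
#       ),
#       Tool(
#           name="detect_anomalous_transactions",
#           func=processor_tools.detect_anomalous_transactions,
#           description="Flag anomalous transactions in a JSON transaction list",
#       ),
#   ]
#
# Because every method takes and returns plain strings, the output of one
# tool (e.g., fetch_whale_transactions) can be passed directly as the
# transactions_json input of another.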