# Extracted from repository snapshot 011960a (file size: 15,033 bytes).
"""
Properly implemented tools for the WhaleAnalysisCrewSystem
"""

import json
import pandas as pd
from datetime import datetime
from typing import Any, Dict, List, Optional, Type
from pydantic import BaseModel, Field
import logging

from modules.api_client import ArbiscanClient, GeminiClient
from modules.data_processor import DataProcessor
from langchain.tools import BaseTool


class GetTokenTransfersInput(BaseModel):
    """Input for the get_token_transfers tool."""
    # Required: the wallet address the query is made for.
    address: str = Field(
        default=...,
        description="Wallet address to query",
    )
    # Optional: token contract address used as a filter; defaults to None.
    contract_address: Optional[str] = Field(
        default=None,
        description="Optional token contract address to filter by",
    )


# Global clients that will be used by all tools.
# Each starts as None ("not configured") and is populated by set_global_clients().
_GLOBAL_ARBISCAN_CLIENT = None
_GLOBAL_GEMINI_CLIENT = None
_GLOBAL_DATA_PROCESSOR = None

def set_global_clients(arbiscan_client=None, gemini_client=None, data_processor=None):
    """Set global client instances that will be used by all tools.

    Only the arguments that are actually supplied (i.e. not ``None``) replace
    the corresponding module-level instance; the others are left untouched.

    Args:
        arbiscan_client: Object to store as the shared Arbiscan client.
        gemini_client: Object to store as the shared Gemini client.
        data_processor: Object to store as the shared data processor.
    """
    global _GLOBAL_ARBISCAN_CLIENT, _GLOBAL_GEMINI_CLIENT, _GLOBAL_DATA_PROCESSOR
    # Compare against None rather than relying on truthiness: a perfectly valid
    # client object may evaluate to False (e.g. if it defines __len__/__bool__).
    if arbiscan_client is not None:
        _GLOBAL_ARBISCAN_CLIENT = arbiscan_client
    if gemini_client is not None:
        _GLOBAL_GEMINI_CLIENT = gemini_client
    if data_processor is not None:
        _GLOBAL_DATA_PROCESSOR = data_processor

class ArbiscanGetTokenTransfersTool(BaseTool):
    """Tool for fetching token transfers from Arbiscan.

    Calls ``get_token_transfers`` on the shared module-level Arbiscan client
    and returns the result as a JSON string. Errors are not raised; they are
    logged and reported as a JSON object with an ``"error"`` key.
    """
    # Explicit annotations: pydantic-v2-based BaseTool versions reject field
    # overrides that lack a type annotation; older versions accept both forms.
    name: str = "arbiscan_get_token_transfers"
    description: str = "Get ERC-20 token transfers for a specific address"
    args_schema: Type[BaseModel] = GetTokenTransfersInput

    def __init__(self, arbiscan_client=None):
        """Create the tool, optionally registering a shared Arbiscan client.

        Args:
            arbiscan_client: If given, installed as the module-level Arbiscan
                client via ``set_global_clients``; otherwise the previously
                registered client (if any) is used.
        """
        super().__init__()
        if arbiscan_client is not None:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, address: str, contract_address: Optional[str] = None) -> str:
        """Fetch token transfers for ``address``.

        Args:
            address: Wallet address to query.
            contract_address: Optional token contract address forwarded to the
                client.

        Returns:
            JSON string of the client's result, or ``{"error": ...}`` when the
            client is not configured or the call fails.
        """
        client = _GLOBAL_ARBISCAN_CLIENT
        if client is None:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})

        try:
            transfers = client.get_token_transfers(
                address=address,
                contract_address=contract_address,
            )
            return json.dumps(transfers)
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logging.error(f"Error in ArbiscanGetTokenTransfersTool: {str(e)}")
            return json.dumps({"error": str(e)})


class GetNormalTransactionsInput(BaseModel):
    """Input for the get_normal_transactions tool."""
    address: str = Field(..., description="Wallet address to query")
    # The tool's _run also accepts the parameters below. Declaring them here
    # lets schema-driven callers set them; the defaults mirror _run's defaults,
    # so callers that only pass `address` are unaffected.
    startblock: int = Field(0, description="Block number to start from")
    endblock: int = Field(99999999, description="Block number to end at")
    page: int = Field(1, description="Page number of results")
    offset: int = Field(10, description="Pagination offset passed to the Arbiscan query")


class ArbiscanGetNormalTransactionsTool(BaseTool):
    """Tool for fetching normal transactions from Arbiscan.

    Calls ``get_normal_transactions`` on the shared module-level Arbiscan
    client and returns the result as a JSON string. Errors are not raised;
    they are logged and reported as a JSON object with an ``"error"`` key.
    """
    # Explicit annotations: pydantic-v2-based BaseTool versions reject field
    # overrides that lack a type annotation; older versions accept both forms.
    name: str = "arbiscan_get_normal_transactions"
    description: str = "Get normal transactions (ETH/ARB transfers) for a specific address"
    args_schema: Type[BaseModel] = GetNormalTransactionsInput

    def __init__(self, arbiscan_client=None):
        """Create the tool, optionally registering a shared Arbiscan client.

        Args:
            arbiscan_client: If given, installed as the module-level Arbiscan
                client via ``set_global_clients``; otherwise the previously
                registered client (if any) is used.
        """
        super().__init__()
        if arbiscan_client is not None:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, address: str, startblock: int = 0, endblock: int = 99999999, page: int = 1, offset: int = 10) -> str:
        """Fetch normal transactions for ``address``.

        Args:
            address: Wallet address to query.
            startblock: Forwarded to the client as ``start_block``.
            endblock: Forwarded to the client as ``end_block``.
            page: Forwarded to the client as ``page``.
            offset: Forwarded to the client as ``offset``.

        Returns:
            JSON string of the client's result, or ``{"error": ...}`` when the
            client is not configured or the call fails.
        """
        client = _GLOBAL_ARBISCAN_CLIENT
        if client is None:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})

        try:
            txs = client.get_normal_transactions(
                address=address,
                start_block=startblock,
                end_block=endblock,
                page=page,
                offset=offset,
            )
            return json.dumps(txs)
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logging.error(f"Error in ArbiscanGetNormalTransactionsTool: {str(e)}")
            return json.dumps({"error": str(e)})


class GetInternalTransactionsInput(BaseModel):
    """Input for the get_internal_transactions tool."""
    address: str = Field(..., description="Wallet address to query")
    # The tool's _run also accepts the parameters below. Declaring them here
    # lets schema-driven callers set them; the defaults mirror _run's defaults,
    # so callers that only pass `address` are unaffected.
    startblock: int = Field(0, description="Block number to start from")
    endblock: int = Field(99999999, description="Block number to end at")
    page: int = Field(1, description="Page number of results")
    offset: int = Field(10, description="Pagination offset passed to the Arbiscan query")


class ArbiscanGetInternalTransactionsTool(BaseTool):
    """Tool for fetching internal transactions from Arbiscan.

    Calls ``get_internal_transactions`` on the shared module-level Arbiscan
    client and returns the result as a JSON string. Errors are not raised;
    they are logged and reported as a JSON object with an ``"error"`` key.
    """
    # Explicit annotations: pydantic-v2-based BaseTool versions reject field
    # overrides that lack a type annotation; older versions accept both forms.
    name: str = "arbiscan_get_internal_transactions"
    description: str = "Get internal transactions for a specific address"
    args_schema: Type[BaseModel] = GetInternalTransactionsInput

    def __init__(self, arbiscan_client=None):
        """Create the tool, optionally registering a shared Arbiscan client.

        Args:
            arbiscan_client: If given, installed as the module-level Arbiscan
                client via ``set_global_clients``; otherwise the previously
                registered client (if any) is used.
        """
        super().__init__()
        if arbiscan_client is not None:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, address: str, startblock: int = 0, endblock: int = 99999999, page: int = 1, offset: int = 10) -> str:
        """Fetch internal transactions for ``address``.

        Args:
            address: Wallet address to query.
            startblock: Forwarded to the client as ``start_block``.
            endblock: Forwarded to the client as ``end_block``.
            page: Forwarded to the client as ``page``.
            offset: Forwarded to the client as ``offset``.

        Returns:
            JSON string of the client's result, or ``{"error": ...}`` when the
            client is not configured or the call fails.
        """
        client = _GLOBAL_ARBISCAN_CLIENT
        if client is None:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})

        try:
            txs = client.get_internal_transactions(
                address=address,
                start_block=startblock,
                end_block=endblock,
                page=page,
                offset=offset,
            )
            return json.dumps(txs)
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logging.error(f"Error in ArbiscanGetInternalTransactionsTool: {str(e)}")
            return json.dumps({"error": str(e)})


class FetchWhaleTransactionsInput(BaseModel):
    """Input for the fetch_whale_transactions tool."""
    # Required: the wallet addresses to query.
    addresses: List[str] = Field(
        default=...,
        description="List of wallet addresses to query",
    )
    # The remaining fields are optional and default to None.
    token_address: Optional[str] = Field(
        default=None,
        description="Optional token contract address to filter by",
    )
    min_token_amount: Optional[float] = Field(
        default=None,
        description="Minimum token amount",
    )
    min_usd_value: Optional[float] = Field(
        default=None,
        description="Minimum USD value",
    )


class ArbiscanFetchWhaleTransactionsTool(BaseTool):
    """Tool for fetching whale transactions from Arbiscan.

    Calls ``fetch_whale_transactions`` on the shared module-level Arbiscan
    client and returns the resulting frame serialized with
    ``to_json(orient="records")``. Errors are not raised; they are logged and
    reported as a JSON object with an ``"error"`` key.
    """
    # Explicit annotations: pydantic-v2-based BaseTool versions reject field
    # overrides that lack a type annotation; older versions accept both forms.
    name: str = "arbiscan_fetch_whale_transactions"
    description: str = "Fetch whale transactions for a list of addresses"
    args_schema: Type[BaseModel] = FetchWhaleTransactionsInput

    def __init__(self, arbiscan_client=None):
        """Create the tool, optionally registering a shared Arbiscan client.

        Args:
            arbiscan_client: If given, installed as the module-level Arbiscan
                client via ``set_global_clients``; otherwise the previously
                registered client (if any) is used.
        """
        super().__init__()
        if arbiscan_client is not None:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, addresses: List[str], token_address: Optional[str] = None,
              min_token_amount: Optional[float] = None, min_usd_value: Optional[float] = None) -> str:
        """Fetch whale transactions for ``addresses``.

        Args:
            addresses: Wallet addresses to query.
            token_address: Optional token contract address forwarded to the
                client.
            min_token_amount: Optional minimum token amount forwarded to the
                client.
            min_usd_value: Optional minimum USD value forwarded to the client.

        Returns:
            JSON string (records orientation) of the client's result, or
            ``{"error": ...}`` when the client is not configured or the call
            fails.
        """
        client = _GLOBAL_ARBISCAN_CLIENT
        if client is None:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})

        try:
            transactions_df = client.fetch_whale_transactions(
                addresses=addresses,
                token_address=token_address,
                min_token_amount=min_token_amount,
                min_usd_value=min_usd_value,
                max_pages=5,  # Limit to 5 pages to prevent excessive API calls
            )
            return transactions_df.to_json(orient="records")
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logging.error(f"Error in ArbiscanFetchWhaleTransactionsTool: {str(e)}")
            return json.dumps({"error": str(e)})


class GetCurrentPriceInput(BaseModel):
    """Input for the get_current_price tool."""
    # Required: the symbol whose current price is requested.
    symbol: str = Field(
        default=...,
        description="Token symbol (e.g., 'ETHUSD')",
    )


class GeminiGetCurrentPriceTool(BaseTool):
    """Tool for getting current token price from Gemini.

    Calls ``get_current_price`` on the shared module-level Gemini client and
    returns ``{"symbol": ..., "price": ...}`` as a JSON string. Errors are not
    raised; they are logged and reported as a JSON object with an ``"error"``
    key.
    """
    # Explicit annotations: pydantic-v2-based BaseTool versions reject field
    # overrides that lack a type annotation; older versions accept both forms.
    name: str = "gemini_get_current_price"
    description: str = "Get the current price of a token"
    args_schema: Type[BaseModel] = GetCurrentPriceInput

    def __init__(self, gemini_client=None):
        """Create the tool, optionally registering a shared Gemini client.

        Args:
            gemini_client: If given, installed as the module-level Gemini
                client via ``set_global_clients``; otherwise the previously
                registered client (if any) is used.
        """
        super().__init__()
        if gemini_client is not None:
            set_global_clients(gemini_client=gemini_client)

    def _run(self, symbol: str) -> str:
        """Look up the current price for ``symbol``.

        Args:
            symbol: Token symbol passed to the client (e.g. ``'ETHUSD'``).

        Returns:
            JSON string with ``symbol`` and ``price`` keys, or
            ``{"error": ...}`` when the client is not configured or the call
            fails.
        """
        client = _GLOBAL_GEMINI_CLIENT
        if client is None:
            return json.dumps({"error": "Gemini client not initialized. Please set global client first."})

        try:
            price = client.get_current_price(symbol)
            return json.dumps({"symbol": symbol, "price": price})
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logging.error(f"Error in GeminiGetCurrentPriceTool: {str(e)}")
            return json.dumps({"error": str(e)})


class GetHistoricalPricesInput(BaseModel):
    """Input for the get_historical_prices tool."""
    symbol: str = Field(..., description="Token symbol (e.g., 'ETHUSD')")
    start_time: str = Field(..., description="Start datetime in ISO format")
    end_time: str = Field(..., description="End datetime in ISO format")
    # The tool's _run accepts `interval` and forwards it to the client.
    # Declaring it here lets schema-driven callers set it; the default mirrors
    # _run's default, so existing callers are unaffected.
    interval: str = Field("15m", description="Price interval (e.g., '15m')")


class GeminiGetHistoricalPricesTool(BaseTool):
    """Tool for getting historical token prices from Gemini.

    Parses the ISO-format time bounds, calls ``get_historical_prices`` on the
    shared module-level Gemini client and returns the result as a JSON string.
    Errors (including unparsable timestamps) are not raised; they are logged
    and reported as a JSON object with an ``"error"`` key.
    """
    # Explicit annotations: pydantic-v2-based BaseTool versions reject field
    # overrides that lack a type annotation; older versions accept both forms.
    name: str = "gemini_get_historical_prices"
    description: str = "Get historical prices for a token within a time range"
    args_schema: Type[BaseModel] = GetHistoricalPricesInput

    def __init__(self, gemini_client=None):
        """Create the tool, optionally registering a shared Gemini client.

        Args:
            gemini_client: If given, installed as the module-level Gemini
                client via ``set_global_clients``; otherwise the previously
                registered client (if any) is used.
        """
        super().__init__()
        if gemini_client is not None:
            set_global_clients(gemini_client=gemini_client)

    def _run(
        self,
        symbol: str,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        interval: str = "15m"
    ) -> str:
        """Fetch historical prices for ``symbol``.

        Args:
            symbol: Token symbol passed to the client.
            start_time: Optional ISO-format start datetime; empty/None is
                forwarded as ``None``.
            end_time: Optional ISO-format end datetime; empty/None is
                forwarded as ``None``.
            interval: Interval string forwarded to the client.

        Returns:
            JSON string of the client's result, or ``{"error": ...}`` when the
            client is not configured, a timestamp cannot be parsed, or the
            call fails.
        """
        client = _GLOBAL_GEMINI_CLIENT
        if client is None:
            return json.dumps({"error": "Gemini client not initialized. Please set global client first."})

        def _to_datetime(value: Optional[str]) -> Optional[datetime]:
            """Convert an ISO string to datetime; empty/None stays None."""
            if not value:
                return None
            # datetime.fromisoformat only accepts a trailing "Z" (UTC) from
            # Python 3.11 on; rewrite it to the equivalent explicit offset so
            # such timestamps parse on older interpreters too.
            if value.endswith("Z"):
                value = value[:-1] + "+00:00"
            return datetime.fromisoformat(value)

        try:
            prices = client.get_historical_prices(
                symbol=symbol,
                start_time=_to_datetime(start_time),
                end_time=_to_datetime(end_time),
                interval=interval,
            )
            return json.dumps(prices)
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logging.error(f"Error in GeminiGetHistoricalPricesTool: {str(e)}")
            return json.dumps({"error": str(e)})


class IdentifyPatternsInput(BaseModel):
    """Input for the identify_patterns tool."""
    # Required: the transactions to analyse, supplied as a JSON string.
    transactions_json: str = Field(
        default=...,
        description="JSON string of transactions",
    )
    # Optional: cluster count; defaults to 3.
    n_clusters: int = Field(
        default=3,
        description="Number of clusters for K-Means",
    )


class DataProcessorIdentifyPatternsTool(BaseTool):
    """Tool for identifying trading patterns using the DataProcessor.

    Builds a DataFrame from the supplied transactions, checks that the
    required columns are present, calls ``identify_patterns`` on the shared
    module-level data processor and returns the result as a JSON string.
    Errors are not raised; they are logged and reported as a JSON object with
    an ``"error"`` key.
    """
    # Explicit annotations: pydantic-v2-based BaseTool versions reject field
    # overrides that lack a type annotation; older versions accept both forms.
    name: str = "data_processor_identify_patterns"
    description: str = "Identify trading patterns in a set of transactions"
    args_schema: Type[BaseModel] = IdentifyPatternsInput

    def __init__(self, data_processor=None):
        """Create the tool, optionally registering a shared data processor.

        Args:
            data_processor: If given, installed as the module-level data
                processor via ``set_global_clients``; otherwise the previously
                registered processor (if any) is used.
        """
        super().__init__()
        if data_processor is not None:
            set_global_clients(data_processor=data_processor)

    def _run(self, transactions_json: Any, n_clusters: int = 3) -> str:
        """Identify patterns in the given transactions.

        Args:
            transactions_json: Either a JSON string (as declared by the input
                schema) or already-decoded records accepted by
                ``pd.DataFrame``.
            n_clusters: Cluster count forwarded to the data processor.

        Returns:
            JSON string of the processor's result; ``{"error": ...}`` when the
            processor is not configured, a required column is missing (the
            available columns are included), or any step fails.
        """
        processor = _GLOBAL_DATA_PROCESSOR
        if processor is None:
            return json.dumps({"error": "Data processor not initialized. Please set global processor first."})

        try:
            # The schema declares a JSON *string*; pd.DataFrame cannot be built
            # from a raw str, so decode it first. Decoded input passes through.
            records = transactions_json
            if isinstance(records, str):
                records = json.loads(records)
            transactions_df = pd.DataFrame(records)

            # Ensure required columns exist; report the first missing one.
            required_columns = ['timeStamp', 'hash', 'from', 'to', 'value', 'tokenSymbol']
            missing = next(
                (col for col in required_columns if col not in transactions_df.columns),
                None,
            )
            if missing is not None:
                return json.dumps({
                    "error": f"Missing required column: {missing}",
                    "available_columns": list(transactions_df.columns)
                })

            # Run pattern identification
            patterns = processor.identify_patterns(
                transactions_df=transactions_df,
                n_clusters=n_clusters,
            )
            return json.dumps(patterns)
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logging.error(f"Error in DataProcessorIdentifyPatternsTool: {str(e)}")
            return json.dumps({"error": str(e)})


class DetectAnomalousTransactionsInput(BaseModel):
    """Input for the detect_anomalous_transactions tool."""
    # Required: the transactions to inspect, supplied as a JSON string.
    transactions_json: str = Field(
        default=...,
        description="JSON string of transactions",
    )
    # Optional: sensitivity level; defaults to "Medium".
    sensitivity: str = Field(
        default="Medium",
        description="Detection sensitivity ('Low', 'Medium', 'High')",
    )


class DataProcessorDetectAnomalousTransactionsTool(BaseTool):
    """Tool for detecting anomalous transactions using the DataProcessor.

    Builds a DataFrame from the supplied transactions, checks that the
    required columns are present, calls ``detect_anomalous_transactions`` on
    the shared module-level data processor and returns the result as a JSON
    string. Errors are not raised; they are logged and reported as a JSON
    object with an ``"error"`` key.
    """
    # Explicit annotations: pydantic-v2-based BaseTool versions reject field
    # overrides that lack a type annotation; older versions accept both forms.
    name: str = "data_processor_detect_anomalies"
    description: str = "Detect anomalous transactions in a dataset"
    args_schema: Type[BaseModel] = DetectAnomalousTransactionsInput

    def __init__(self, data_processor=None):
        """Create the tool, optionally registering a shared data processor.

        Args:
            data_processor: If given, installed as the module-level data
                processor via ``set_global_clients``; otherwise the previously
                registered processor (if any) is used.
        """
        super().__init__()
        if data_processor is not None:
            set_global_clients(data_processor=data_processor)

    def _run(self, transactions_json: Any, sensitivity: str = "Medium") -> str:
        """Detect anomalous transactions in the given data.

        Args:
            transactions_json: Either a JSON string (as declared by the input
                schema) or already-decoded records accepted by
                ``pd.DataFrame``.
            sensitivity: Sensitivity label forwarded to the data processor.

        Returns:
            JSON string of the processor's result; ``{"error": ...}`` when the
            processor is not configured, a required column is missing (the
            available columns are included), or any step fails.
        """
        processor = _GLOBAL_DATA_PROCESSOR
        if processor is None:
            return json.dumps({"error": "Data processor not initialized. Please set global processor first."})

        try:
            # The schema declares a JSON *string*; pd.DataFrame cannot be built
            # from a raw str, so decode it first. Decoded input passes through.
            records = transactions_json
            if isinstance(records, str):
                records = json.loads(records)
            transactions_df = pd.DataFrame(records)

            # Ensure required columns exist; report the first missing one.
            required_columns = ['timeStamp', 'hash', 'from', 'to', 'value', 'tokenSymbol']
            missing = next(
                (col for col in required_columns if col not in transactions_df.columns),
                None,
            )
            if missing is not None:
                return json.dumps({
                    "error": f"Missing required column: {missing}",
                    "available_columns": list(transactions_df.columns)
                })

            # Run anomaly detection
            anomalies = processor.detect_anomalous_transactions(
                transactions_df=transactions_df,
                sensitivity=sensitivity,
            )
            return json.dumps(anomalies)
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logging.error(f"Error in DataProcessorDetectAnomalousTransactionsTool: {str(e)}")
            return json.dumps({"error": str(e)})