import os
import io
import json
import logging
import re
from typing import Dict, List, Optional, Any
from datetime import datetime

import pandas as pd
from crewai import Agent, Task, Crew, Process

from modules.api_client import ArbiscanClient, GeminiClient
from modules.data_processor import DataProcessor
from modules.crew_tools import (
    ArbiscanGetTokenTransfersTool,
    ArbiscanGetNormalTransactionsTool,
    ArbiscanGetInternalTransactionsTool,
    ArbiscanFetchWhaleTransactionsTool,
    GeminiGetCurrentPriceTool,
    GeminiGetHistoricalPricesTool,
    DataProcessorIdentifyPatternsTool,
    DataProcessorDetectAnomalousTransactionsTool,
    set_global_clients
)
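
# The crew methods below all parse agent output the same way: pull a ```json
# fenced block out of the raw LLM text, else try the whole text as JSON. A
# helper like this captures that shared logic (an illustrative consolidation;
# the methods below keep their own inline copies):
def _extract_json_block(text: str) -> Optional[Any]:
    """Return parsed JSON from a ```json fenced block in `text`, falling back
    to parsing the whole text, or None if neither parse succeeds."""
    match = re.search(r'```json\n([\s\S]*?)\n```', text)
    candidate = match.group(1) if match else text
    try:
        return json.loads(candidate)
    except (json.JSONDecodeError, TypeError):
        return None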


class WhaleAnalysisCrewSystem:
    """
    CrewAI system for analyzing whale wallet activity and detecting market manipulation
    """

    def __init__(self, arbiscan_client: ArbiscanClient, gemini_client: GeminiClient, data_processor: DataProcessor):
        self.arbiscan_client = arbiscan_client
        self.gemini_client = gemini_client
        self.data_processor = data_processor

        # Initialize LLM (imported lazily so a missing or incompatible
        # langchain install only disables LLM features instead of breaking
        # module import)
        try:
            from langchain.chat_models import ChatOpenAI
            self.llm = ChatOpenAI(
                model="gpt-4",
                temperature=0.2,
                api_key=os.getenv("OPENAI_API_KEY")
            )
        except Exception as e:
            logging.warning(f"Could not initialize LLM: {str(e)}")
            self.llm = None

        # Use a factory method to safely create tool instances
        self.setup_tools()

    def setup_tools(self):
        """Setup LangChain tools for the whale analysis crew"""
        try:
            # Register the clients injected via __init__ as globals so the
            # tool classes can reach them
            set_global_clients(
                arbiscan_client=self.arbiscan_client,
                gemini_client=self.gemini_client,
                data_processor=self.data_processor
            )

            # Create tools (no need to pass clients, they'll use globals)
            self.arbiscan_tools = [
                self._create_tool(ArbiscanGetTokenTransfersTool),
                self._create_tool(ArbiscanGetNormalTransactionsTool),
                self._create_tool(ArbiscanGetInternalTransactionsTool),
                self._create_tool(ArbiscanFetchWhaleTransactionsTool)
            ]
            self.gemini_tools = [
                self._create_tool(GeminiGetCurrentPriceTool),
                self._create_tool(GeminiGetHistoricalPricesTool)
            ]
            self.data_processor_tools = [
                self._create_tool(DataProcessorIdentifyPatternsTool),
                self._create_tool(DataProcessorDetectAnomalousTransactionsTool)
            ]

            tool_count = len(self.arbiscan_tools + self.gemini_tools + self.data_processor_tools)
            logging.info(f"Successfully created {tool_count} tools")
        except Exception as e:
            logging.error(f"Error setting up tools: {str(e)}")
            raise RuntimeError(f"Error setting up tools: {str(e)}") from e

    def _create_tool(self, tool_class, *args, **kwargs):
        """Factory method to safely create a tool with proper error handling"""
        try:
            return tool_class(*args, **kwargs)
        except Exception as e:
            logging.error(f"Failed to create tool {tool_class.__name__}: {str(e)}")
            raise RuntimeError(f"Failed to create tool {tool_class.__name__}: {str(e)}") from e

    def create_agents(self):
        """Create the agents for the crew"""
        # Data Collection Agent
        data_collector = Agent(
            role="Blockchain Data Collector",
            goal="Collect comprehensive whale transaction data from the blockchain",
            backstory="""You are a blockchain analytics expert specialized in extracting and
            organizing on-chain data from the Arbitrum network. You have deep knowledge of blockchain
            transaction structures and can efficiently query APIs to gather relevant whale activity.""",
            verbose=True,
            allow_delegation=True,
            tools=self.arbiscan_tools,
            llm=self.llm
        )

        # Price Analysis Agent
        price_analyst = Agent(
            role="Price Impact Analyst",
            goal="Analyze how whale transactions impact token prices",
            backstory="""You are a quantitative market analyst with expertise in correlating
            trading activity with price movements. You specialize in detecting how large trades
            influence market dynamics, and can identify unusual price patterns.""",
            verbose=True,
            allow_delegation=True,
            tools=self.gemini_tools,
            llm=self.llm
        )

        # Pattern Detection Agent
        pattern_detector = Agent(
            role="Trading Pattern Detector",
            goal="Identify recurring behavior patterns in whale trading activity",
            backstory="""You are a data scientist specialized in time-series analysis and behavioral
            pattern recognition. You excel at spotting cyclical behaviors, correlation patterns, and
            anomalous trading activities across multiple addresses.""",
            verbose=True,
            allow_delegation=True,
            tools=self.data_processor_tools,
            llm=self.llm
        )

        # Manipulation Detector Agent
        manipulation_detector = Agent(
            role="Market Manipulation Investigator",
            goal="Detect potential market manipulation in whale activity",
            backstory="""You are a financial forensics expert who has studied market manipulation
            techniques for years. You can identify pump-and-dump schemes, wash trading, spoofing,
            and other deceptive practices used by whale traders to manipulate market prices.""",
            verbose=True,
            allow_delegation=True,
            tools=self.data_processor_tools,
            llm=self.llm
        )

        # Report Generator Agent
        report_generator = Agent(
            role="Insights Reporter",
            goal="Create comprehensive, actionable reports on whale activity",
            backstory="""You are a financial data storyteller who excels at transforming complex
            blockchain data into clear, insightful narratives. You can distill technical findings
            into actionable intelligence for different audiences.""",
            verbose=True,
            allow_delegation=True,
            tools=[],
            llm=self.llm
        )

        return {
            "data_collector": data_collector,
            "price_analyst": price_analyst,
            "pattern_detector": pattern_detector,
            "manipulation_detector": manipulation_detector,
            "report_generator": report_generator
        }

    def track_large_transactions(self,
                                 wallets: List[str],
                                 start_date: datetime,
                                 end_date: datetime,
                                 threshold_value: float,
                                 threshold_type: str,
                                 token_symbol: Optional[str] = None) -> pd.DataFrame:
        """
        Track large buy/sell transactions for specified wallets

        Args:
            wallets: List of wallet addresses to track
            start_date: Start date for analysis
            end_date: End date for analysis
            threshold_value: Minimum value for transaction tracking
            threshold_type: Type of threshold ("Token Amount" or "USD Value")
            token_symbol: Symbol of token to track (only required if threshold_type is "Token Amount")

        Returns:
            DataFrame of large transactions
        """
        agents = self.create_agents()

        # Define tasks
        data_collection_task = Task(
            description=f"""
            Collect all transactions for the following wallets: {', '.join(wallets)}
            between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}.
            Filter for transactions {'of ' + token_symbol if token_symbol else ''} with a
            {'token amount greater than ' + str(threshold_value) if threshold_type == 'Token Amount'
             else 'USD value greater than $' + str(threshold_value)}.
            Return the data in a well-structured format with timestamp, transaction hash,
            sender, recipient, token symbol, and amount.
            """,
            agent=agents["data_collector"],
            expected_output="""
            A comprehensive dataset of all large transactions for the specified wallets,
            properly filtered according to the threshold criteria.
            """
        )

        # Create and run the crew
        crew = Crew(
            agents=[agents["data_collector"]],
            tasks=[data_collection_task],
            verbose=2,
            process=Process.sequential
        )
        result = crew.kickoff()

        # Process the result
        try:
            # Try to extract a fenced JSON block from the result
            json_match = re.search(r'```json\n([\s\S]*?)\n```', str(result))
            if json_match:
                transactions_data = json.loads(json_match.group(1))
            else:
                # Try to parse the entire result as JSON
                transactions_data = json.loads(str(result))
            if isinstance(transactions_data, list):
                return pd.DataFrame(transactions_data)
            return pd.DataFrame()
        except Exception:
            # Fallback to querying the API directly
            token_address = None  # Would need a mapping of symbol to address
            transactions_df = self.arbiscan_client.fetch_whale_transactions(
                addresses=wallets,
                token_address=token_address,
                min_token_amount=threshold_value if threshold_type == "Token Amount" else None,
                min_usd_value=threshold_value if threshold_type == "USD Value" else None
            )
            return transactions_df

    def identify_trading_patterns(self,
                                  wallets: List[str],
                                  start_date: datetime,
                                  end_date: datetime) -> List[Dict[str, Any]]:
        """
        Identify trading patterns for specified wallets

        Args:
            wallets: List of wallet addresses to analyze
            start_date: Start date for analysis
            end_date: End date for analysis

        Returns:
            List of identified patterns
        """
        agents = self.create_agents()

        # Define tasks
        data_collection_task = Task(
            description=f"""
            Collect all transactions for the following wallets: {', '.join(wallets)}
            between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}.
            Include all token transfers, regardless of size.
            """,
            agent=agents["data_collector"],
            expected_output="""
            A comprehensive dataset of all transactions for the specified wallets.
            """
        )
        pattern_analysis_task = Task(
            description="""
            Analyze the transaction data to identify recurring trading patterns.
            Look for:
            1. Cyclical buying/selling behaviors
            2. Time-of-day patterns
            3. Accumulation/distribution phases
            4. Coordinated movements across multiple addresses
            Cluster similar behaviors and describe each pattern identified.
            """,
            agent=agents["pattern_detector"],
            expected_output="""
            A detailed analysis of trading patterns with:
            - Pattern name/type
            - Description of behavior
            - Frequency and confidence level
            - Example transactions showing the pattern
            """,
            context=[data_collection_task]
        )

        # Create and run the crew
        crew = Crew(
            agents=[agents["data_collector"], agents["pattern_detector"]],
            tasks=[data_collection_task, pattern_analysis_task],
            verbose=2,
            process=Process.sequential
        )
        result = crew.kickoff()

        # Process the result
        try:
            # Try to extract a fenced JSON block from the result
            json_match = re.search(r'```json\n([\s\S]*?)\n```', str(result))
            if json_match:
                patterns_data = json.loads(json_match.group(1))
                # Convert the patterns to the expected format
                return self._convert_patterns_to_visual_format(patterns_data)

            # Fallback to a simple pattern analysis on data fetched directly
            all_transactions = []
            for wallet in wallets:
                transfers = self.arbiscan_client.fetch_all_token_transfers(
                    address=wallet
                )
                all_transactions.extend(transfers)
            if not all_transactions:
                return []
            transactions_df = pd.DataFrame(all_transactions)

            # Use data processor to identify patterns
            return self.data_processor.identify_patterns(transactions_df)
        except Exception as e:
            logging.error(f"Error processing patterns: {str(e)}")
            return []

    def _convert_patterns_to_visual_format(self, patterns_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Convert pattern data from agents to visual format with charts

        Args:
            patterns_data: Pattern data from agents

        Returns:
            List of patterns with visualizations
        """
        visual_patterns = []
        for pattern in patterns_data:
            fig = None
            examples_data = pd.DataFrame()

            if 'examples' in pattern and pattern['examples']:
                # Examples may arrive as a JSON string or as a list of records
                if isinstance(pattern['examples'], str):
                    try:
                        examples_data = pd.read_json(io.StringIO(pattern['examples']))
                    except ValueError:
                        examples_data = pd.DataFrame()
                else:
                    examples_data = pd.DataFrame(pattern['examples'])

                # Create visualization
                if not examples_data.empty:
                    import plotly.express as px

                    # Find a usable timestamp column
                    if 'Timestamp' in examples_data.columns:
                        time_col = 'Timestamp'
                    elif 'timeStamp' in examples_data.columns:
                        time_col = 'timeStamp'
                    else:
                        time_col = None

                    # Find a usable amount column
                    if 'Amount' in examples_data.columns:
                        amount_col = 'Amount'
                    elif 'tokenAmount' in examples_data.columns:
                        amount_col = 'tokenAmount'
                    elif 'value' in examples_data.columns:
                        amount_col = 'value'
                    else:
                        amount_col = None

                    if time_col and amount_col:
                        # Create time series chart
                        fig = px.line(
                            examples_data,
                            x=time_col,
                            y=amount_col,
                            title=f"Pattern: {pattern.get('name', 'Unknown Pattern')}"
                        )

            # Create visual pattern object
            visual_patterns.append({
                "name": pattern.get("name", "Unknown Pattern"),
                "description": pattern.get("description", ""),
                "confidence": pattern.get("confidence", 0.5),
                "occurrence_count": pattern.get("occurrence_count", 0),
                "chart_data": fig,
                "examples": examples_data
            })
        return visual_patterns
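
    # For reference, the converter above assumes agent output roughly shaped
    # like this (field names inferred from the .get() calls; values are
    # illustrative only):
    #
    #   {
    #       "name": "Accumulation Phase",
    #       "description": "Steady buys at roughly 4-hour intervals",
    #       "confidence": 0.8,
    #       "occurrence_count": 12,
    #       "examples": [{"Timestamp": "2024-01-01T12:00:00", "Amount": 150000.0}]
    #   }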

    def analyze_price_impact(self,
                             wallets: List[str],
                             start_date: datetime,
                             end_date: datetime,
                             lookback_minutes: int = 5,
                             lookahead_minutes: int = 5) -> Dict[str, Any]:
        """
        Analyze the impact of whale transactions on token prices

        Args:
            wallets: List of wallet addresses to analyze
            start_date: Start date for analysis
            end_date: End date for analysis
            lookback_minutes: Minutes to look back before transactions
            lookahead_minutes: Minutes to look ahead after transactions

        Returns:
            Dictionary with price impact analysis
        """
        agents = self.create_agents()

        # Define tasks
        data_collection_task = Task(
            description=f"""
            Collect all transactions for the following wallets: {', '.join(wallets)}
            between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}.
            Focus on large transactions that might impact price.
            """,
            agent=agents["data_collector"],
            expected_output="""
            A comprehensive dataset of all significant transactions for the specified wallets.
            """
        )
        price_impact_task = Task(
            description=f"""
            Analyze the price impact of the whale transactions.
            For each transaction:
            1. Fetch price data for {lookback_minutes} minutes before and {lookahead_minutes} minutes after the transaction
            2. Calculate the percentage price change
            3. Identify transactions that caused significant price moves
            Summarize the overall price impact statistics and highlight notable instances.
            """,
            agent=agents["price_analyst"],
            expected_output="""
            A detailed analysis of price impacts with:
            - Average price impact percentage
            - Maximum price impact (positive and negative)
            - Count of significant price moves
            - List of transactions with their corresponding price impacts
            """,
            context=[data_collection_task]
        )

        # Create and run the crew
        crew = Crew(
            agents=[agents["data_collector"], agents["price_analyst"]],
            tasks=[data_collection_task, price_impact_task],
            verbose=2,
            process=Process.sequential
        )
        result = crew.kickoff()

        # Process the result
        try:
            # Try to extract a fenced JSON block from the result
            json_match = re.search(r'```json\n([\s\S]*?)\n```', str(result))
            if json_match:
                impact_data = json.loads(json_match.group(1))
                # Convert the impact data to visual format
                return self._convert_impact_to_visual_format(impact_data)

            # Fallback to direct calculation on data fetched from the API
            all_transactions = []
            for wallet in wallets:
                transfers = self.arbiscan_client.fetch_all_token_transfers(
                    address=wallet
                )
                all_transactions.extend(transfers)
            if not all_transactions:
                return {}
            transactions_df = pd.DataFrame(all_transactions)

            # Calculate price impact for each transaction
            price_data = {}
            for _, row in transactions_df.iterrows():
                tx_hash = row.get('hash', '')
                symbol = row.get('tokenSymbol', '')
                timestamp = row.get('timeStamp', 0)
                if not tx_hash or not symbol or not timestamp:
                    continue

                # Convert timestamp to datetime (Arbiscan-style APIs commonly
                # return epoch seconds as strings)
                if isinstance(timestamp, str) and timestamp.isdigit():
                    timestamp = int(timestamp)
                if isinstance(timestamp, (int, float)):
                    tx_time = datetime.fromtimestamp(int(timestamp))
                else:
                    tx_time = timestamp

                # Get price impact around the transaction time
                impact = self.gemini_client.get_price_impact(
                    symbol=f"{symbol}USD",
                    transaction_time=tx_time,
                    lookback_minutes=lookback_minutes,
                    lookahead_minutes=lookahead_minutes
                )
                price_data[tx_hash] = impact

            # Use data processor to analyze price impact
            return self.data_processor.analyze_price_impact(
                transactions_df=transactions_df,
                price_data=price_data
            )
        except Exception as e:
            logging.error(f"Error processing price impact: {str(e)}")
            return {}

    def _convert_impact_to_visual_format(self, impact_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert price impact data to visual format with charts

        Args:
            impact_data: Price impact data

        Returns:
            Dictionary with price impact analysis and visualizations
        """
        # Convert transactions_with_impact to a DataFrame, whether it arrives
        # as a JSON string or as a list of records
        raw_transactions = impact_data.get('transactions_with_impact')
        if isinstance(raw_transactions, str):
            try:
                transactions_df = pd.read_json(io.StringIO(raw_transactions))
            except ValueError:
                transactions_df = pd.DataFrame()
        elif isinstance(raw_transactions, list):
            transactions_df = pd.DataFrame(raw_transactions)
        else:
            transactions_df = pd.DataFrame()

        # Create impact chart
        fig = None
        if not transactions_df.empty and 'impact_pct' in transactions_df.columns and 'Timestamp' in transactions_df.columns:
            import plotly.graph_objects as go

            # Symmetric color range so positive and negative impacts share a scale
            impact_abs_max = transactions_df['impact_pct'].abs().max()

            fig = go.Figure()
            fig.add_trace(go.Scatter(
                x=transactions_df['Timestamp'],
                y=transactions_df['impact_pct'],
                mode='markers+lines',
                name='Price Impact (%)',
                marker=dict(
                    size=10,
                    color=transactions_df['impact_pct'],
                    colorscale='RdBu',
                    cmin=-impact_abs_max,
                    cmax=impact_abs_max,
                    colorbar=dict(title='Impact %'),
                    symbol='circle'
                )
            ))
            fig.update_layout(
                title='Price Impact of Whale Transactions',
                xaxis_title='Timestamp',
                yaxis_title='Price Impact (%)',
                hovermode='closest'
            )
            # Add zero line
            fig.add_hline(y=0, line_dash="dash", line_color="gray")

        # Create visual impact analysis
        return {
            'avg_impact_pct': impact_data.get('avg_impact_pct', 0),
            'max_impact_pct': impact_data.get('max_impact_pct', 0),
            'min_impact_pct': impact_data.get('min_impact_pct', 0),
            'significant_moves_count': impact_data.get('significant_moves_count', 0),
            'total_transactions': impact_data.get('total_transactions', 0),
            'impact_chart': fig,
            'transactions_with_impact': transactions_df
        }
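
    # For reference, the converter above expects impact_data roughly shaped
    # like this (keys inferred from the .get() calls; values are illustrative
    # only):
    #
    #   {
    #       "avg_impact_pct": 0.8,
    #       "max_impact_pct": 4.2,
    #       "min_impact_pct": -3.1,
    #       "significant_moves_count": 3,
    #       "total_transactions": 42,
    #       "transactions_with_impact": [{"Timestamp": "2024-01-01T12:00:00", "impact_pct": 1.5}]
    #   }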

    def detect_manipulation(self,
                            wallets: List[str],
                            start_date: datetime,
                            end_date: datetime,
                            sensitivity: str = "Medium") -> List[Dict[str, Any]]:
        """
        Detect potential market manipulation by whale wallets

        Args:
            wallets: List of wallet addresses to analyze
            start_date: Start date for analysis
            end_date: End date for analysis
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of manipulation alerts
        """
        agents = self.create_agents()

        # Define tasks
        data_collection_task = Task(
            description=f"""
            Collect all transactions for the following wallets: {', '.join(wallets)}
            between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}.
            Include all token transfers and also fetch price data if available.
            """,
            agent=agents["data_collector"],
            expected_output="""
            A comprehensive dataset of all transactions for the specified wallets.
            """
        )
        price_impact_task = Task(
            description="""
            Analyze the price impact of the whale transactions.
            For each significant transaction, fetch and analyze price data around the transaction time.
            """,
            agent=agents["price_analyst"],
            expected_output="""
            Price impact data for the transactions.
            """,
            context=[data_collection_task]
        )
        manipulation_detection_task = Task(
            description=f"""
            Detect potential market manipulation patterns in the transaction data with sensitivity level: {sensitivity}.
            Look for:
            1. Pump-and-Dump: Rapid buys followed by coordinated sell-offs
            2. Wash Trading: Self-trading across multiple addresses
            3. Spoofing: Large orders placed then canceled (if detectable)
            4. Momentum Ignition: Creating sharp price moves to trigger other participants' momentum-based trading
            For each potential manipulation, provide:
            - Type of manipulation
            - Involved addresses
            - Risk level (High, Medium, Low)
            - Description of the suspicious behavior
            - Evidence (transactions showing the pattern)
            """,
            agent=agents["manipulation_detector"],
            expected_output="""
            A detailed list of potential manipulation incidents with supporting evidence.
            """,
            context=[data_collection_task, price_impact_task]
        )

        # Create and run the crew
        crew = Crew(
            agents=[
                agents["data_collector"],
                agents["price_analyst"],
                agents["manipulation_detector"]
            ],
            tasks=[
                data_collection_task,
                price_impact_task,
                manipulation_detection_task
            ],
            verbose=2,
            process=Process.sequential
        )
        result = crew.kickoff()

        # Process the result
        try:
            # Try to extract a fenced JSON block from the result
            json_match = re.search(r'```json\n([\s\S]*?)\n```', str(result))
            if json_match:
                alerts_data = json.loads(json_match.group(1))
                # Convert the alerts to visual format
                return self._convert_alerts_to_visual_format(alerts_data)

            # Fallback to direct detection on data fetched from the API
            all_transactions = []
            for wallet in wallets:
                transfers = self.arbiscan_client.fetch_all_token_transfers(
                    address=wallet
                )
                all_transactions.extend(transfers)
            if not all_transactions:
                return []
            transactions_df = pd.DataFrame(all_transactions)

            # Calculate price impact for each transaction
            price_data = {}
            for _, row in transactions_df.iterrows():
                tx_hash = row.get('hash', '')
                symbol = row.get('tokenSymbol', '')
                timestamp = row.get('timeStamp', 0)
                if not tx_hash or not symbol or not timestamp:
                    continue

                # Convert timestamp to datetime (Arbiscan-style APIs commonly
                # return epoch seconds as strings)
                if isinstance(timestamp, str) and timestamp.isdigit():
                    timestamp = int(timestamp)
                if isinstance(timestamp, (int, float)):
                    tx_time = datetime.fromtimestamp(int(timestamp))
                else:
                    tx_time = timestamp

                # Get price impact around the transaction time
                impact = self.gemini_client.get_price_impact(
                    symbol=f"{symbol}USD",
                    transaction_time=tx_time,
                    lookback_minutes=5,
                    lookahead_minutes=5
                )
                price_data[tx_hash] = impact

            # Detect wash trading
            wash_trading_alerts = self.data_processor.detect_wash_trading(
                transactions_df=transactions_df,
                addresses=wallets,
                sensitivity=sensitivity
            )
            # Detect pump and dump
            pump_and_dump_alerts = self.data_processor.detect_pump_and_dump(
                transactions_df=transactions_df,
                price_data=price_data,
                sensitivity=sensitivity
            )
            # Combine alerts
            return wash_trading_alerts + pump_and_dump_alerts
        except Exception as e:
            logging.error(f"Error detecting manipulation: {str(e)}")
            return []

    def _convert_alerts_to_visual_format(self, alerts_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Convert manipulation alerts data to visual format with charts

        Args:
            alerts_data: Alerts data from agents

        Returns:
            List of alerts with visualizations
        """
        visual_alerts = []
        for alert in alerts_data:
            fig = None
            evidence_data = pd.DataFrame()

            if 'evidence' in alert and alert['evidence']:
                # Evidence may arrive as a JSON string or as a list of records
                if isinstance(alert['evidence'], str):
                    try:
                        evidence_data = pd.read_json(io.StringIO(alert['evidence']))
                    except ValueError:
                        evidence_data = pd.DataFrame()
                else:
                    evidence_data = pd.DataFrame(alert['evidence'])

                # Create visualization based on alert type
                if not evidence_data.empty:
                    import plotly.graph_objects as go
                    import plotly.express as px

                    # Find a usable timestamp column
                    time_col = next(
                        (c for c in ('Timestamp', 'timeStamp', 'timestamp') if c in evidence_data.columns),
                        None
                    )

                    # Different visualizations based on alert type
                    if alert.get('type') == 'Wash Trading' and time_col:
                        # Scatter plot of wash trading activity, colored by sender
                        amount_col = next((c for c in ('Amount', 'tokenAmount', 'value') if c in evidence_data.columns), None)
                        from_col = next((c for c in ('From', 'from') if c in evidence_data.columns), None)
                        if amount_col:
                            fig = px.scatter(
                                evidence_data,
                                x=time_col,
                                y=amount_col,
                                color=from_col,
                                title=f"Wash Trading Evidence: {alert.get('title', '')}"
                            )
                    elif alert.get('type') == 'Pump and Dump' and time_col and 'pre_price' in evidence_data.columns:
                        # Price lines before/after each transaction
                        fig = go.Figure()
                        fig.add_trace(go.Scatter(
                            x=evidence_data[time_col],
                            y=evidence_data['pre_price'],
                            mode='lines+markers',
                            name='Price Before Transaction',
                            line=dict(color='blue')
                        ))
                        fig.add_trace(go.Scatter(
                            x=evidence_data[time_col],
                            y=evidence_data['post_price'],
                            mode='lines+markers',
                            name='Price After Transaction',
                            line=dict(color='red')
                        ))
                        fig.update_layout(
                            title=f"Pump and Dump Evidence: {alert.get('title', '')}",
                            xaxis_title='Time',
                            yaxis_title='Price',
                            hovermode='closest'
                        )
                    elif alert.get('type') == 'Momentum Ignition' and time_col and 'impact_pct' in evidence_data.columns:
                        # Impact scatter for momentum ignition
                        fig = px.scatter(
                            evidence_data,
                            x=time_col,
                            y='impact_pct',
                            size=evidence_data['impact_pct'].abs(),
                            color='impact_pct',
                            color_continuous_scale='RdBu',
                            title=f"Momentum Ignition Evidence: {alert.get('title', '')}"
                        )
                    elif time_col:
                        # Generic timeline view; risk level is a scalar, so pass
                        # it as a per-row sequence for the color dimension
                        from_col = next((c for c in ('From', 'from') if c in evidence_data.columns), None)
                        fig = px.timeline(
                            evidence_data,
                            x_start=time_col,
                            x_end=time_col,
                            y=from_col,
                            color=[alert.get('risk_level', 'Medium')] * len(evidence_data),
                            title=f"Alert Evidence: {alert.get('title', '')}"
                        )

            # Create visual alert object
            visual_alerts.append({
                "type": alert.get("type", "Unknown"),
                "addresses": alert.get("addresses", []),
                "risk_level": alert.get("risk_level", "Medium"),
                "description": alert.get("description", ""),
                "detection_time": alert.get("detection_time", datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
                "title": alert.get("title", "Alert"),
                "evidence": evidence_data,
                "chart": fig
            })
        return visual_alerts
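
    # For reference, the converter above expects each alert roughly shaped
    # like this (keys inferred from the .get() calls; values are illustrative
    # only):
    #
    #   {
    #       "type": "Wash Trading",
    #       "title": "Circular transfers between paired wallets",
    #       "addresses": ["0xabc...", "0xdef..."],
    #       "risk_level": "High",
    #       "description": "Repeated back-and-forth transfers of similar size",
    #       "evidence": [{"timeStamp": "2024-01-01T12:00:00", "Amount": 250000.0, "From": "0xabc..."}]
    #   }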

    def generate_report(self,
                        wallets: List[str],
                        start_date: datetime,
                        end_date: datetime,
                        report_type: str = "Transaction Summary",
                        export_format: str = "PDF") -> Dict[str, Any]:
        """
        Generate a report of whale activity

        Args:
            wallets: List of wallet addresses to include in the report
            start_date: Start date for report period
            end_date: End date for report period
            report_type: Type of report to generate
            export_format: Format for the report (CSV, PDF, PNG)

        Returns:
            Dictionary with report data
        """
        from modules.visualizer import Visualizer
        visualizer = Visualizer()
        agents = self.create_agents()

        # Define tasks
        data_collection_task = Task(
            description=f"""
            Collect all transactions for the following wallets: {', '.join(wallets)}
            between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}.
            """,
            agent=agents["data_collector"],
            expected_output="""
            A comprehensive dataset of all transactions for the specified wallets.
            """
        )
        report_task = Task(
            description=f"""
            Generate a {report_type} report in {export_format} format.
            The report should include:
            1. Executive summary of wallet activity
            2. Transaction analysis
            3. Pattern identification (if applicable)
            4. Price impact analysis (if applicable)
            5. Manipulation detection (if applicable)
            Organize the information clearly and provide actionable insights.
            """,
            agent=agents["report_generator"],
            expected_output=f"""
            A complete {export_format} report with all relevant analyses.
            """,
            context=[data_collection_task]
        )

        # Create and run the crew
        crew = Crew(
            agents=[agents["data_collector"], agents["report_generator"]],
            tasks=[data_collection_task, report_task],
            verbose=2,
            process=Process.sequential
        )
        crew.kickoff()

        # For reports, we render with the visualizer directly rather than
        # parsing the crew output. First, get transaction data
        all_transactions = []
        for wallet in wallets:
            transfers = self.arbiscan_client.fetch_all_token_transfers(
                address=wallet
            )
            all_transactions.extend(transfers)
        if not all_transactions:
            return {
                "filename": f"no_data_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{export_format.lower()}",
                "content": ""
            }
        transactions_df = pd.DataFrame(all_transactions)

        # Generate the report based on format
        filename = f"whale_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        if export_format == "CSV":
            content = visualizer.generate_csv_report(
                transactions_df=transactions_df,
                report_type=report_type
            )
            filename += ".csv"
        elif export_format == "PDF":
            # PDF reports bundle pattern, price impact, and manipulation analyses
            patterns = self.identify_trading_patterns(
                wallets=wallets,
                start_date=start_date,
                end_date=end_date
            )
            price_impact = self.analyze_price_impact(
                wallets=wallets,
                start_date=start_date,
                end_date=end_date
            )
            alerts = self.detect_manipulation(
                wallets=wallets,
                start_date=start_date,
                end_date=end_date
            )
            content = visualizer.generate_pdf_report(
                transactions_df=transactions_df,
                patterns=patterns,
                price_impact=price_impact,
                alerts=alerts,
                title=f"Whale Analysis Report: {report_type}",
                start_date=start_date,
                end_date=end_date
            )
            filename += ".pdf"
        elif export_format == "PNG":
            # PNG output renders a single chart chosen by report type
            if report_type == "Transaction Summary":
                fig = visualizer.create_transaction_timeline(transactions_df)
            elif report_type == "Pattern Analysis":
                fig = visualizer.create_volume_chart(transactions_df)
            elif report_type == "Price Impact":
                # Run price impact analysis first
                price_impact = self.analyze_price_impact(
                    wallets=wallets,
                    start_date=start_date,
                    end_date=end_date
                )
                fig = price_impact.get('impact_chart') or visualizer.create_transaction_timeline(transactions_df)
            else:  # "Manipulation Detection" or "Complete Analysis"
                fig = visualizer.create_network_graph(transactions_df)
            content = visualizer.generate_png_chart(fig)
            filename += ".png"
        else:
            return {
                "filename": f"unsupported_format_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
                "content": "Unsupported export format requested."
            }

        return {
            "filename": filename,
            "content": content
        }
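

# Minimal usage sketch (illustrative only; assumes ARBISCAN_API_KEY,
# GEMINI_API_KEY, and OPENAI_API_KEY are set in the environment and that the
# `modules` package is importable). The wallet address and dates below are
# placeholders:
if __name__ == "__main__":
    arbiscan = ArbiscanClient(api_key=os.getenv("ARBISCAN_API_KEY"))
    gemini = GeminiClient(api_key=os.getenv("GEMINI_API_KEY"))
    processor = DataProcessor()

    system = WhaleAnalysisCrewSystem(arbiscan, gemini, processor)
    df = system.track_large_transactions(
        wallets=["0x0000000000000000000000000000000000000000"],  # placeholder address
        start_date=datetime(2024, 1, 1),
        end_date=datetime(2024, 1, 31),
        threshold_value=100000.0,
        threshold_type="USD Value"
    )
    print(df.head())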