# Whale_Arbitrum/modules/data_processor.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union, Any, Tuple
from sklearn.cluster import KMeans, DBSCAN
from sklearn.preprocessing import StandardScaler
import plotly.graph_objects as go
import plotly.express as px
import logging
import time
class DataProcessor:
"""
Process and analyze transaction data from blockchain APIs
"""
def __init__(self):
pass
def aggregate_transactions(self,
transactions_df: pd.DataFrame,
time_window: str = 'D') -> pd.DataFrame:
"""
Aggregate transactions by time window
Args:
transactions_df: DataFrame of transactions
time_window: Time window for aggregation (e.g., 'D' for day, 'H' for hour)
Returns:
Aggregated DataFrame with transaction counts and volumes
"""
        if transactions_df.empty:
            return pd.DataFrame()
        # Work on a copy so the caller's DataFrame is not mutated
        transactions_df = transactions_df.copy()
# Ensure timestamp column is datetime
if 'Timestamp' in transactions_df.columns:
timestamp_col = 'Timestamp'
elif 'timeStamp' in transactions_df.columns:
timestamp_col = 'timeStamp'
else:
raise ValueError("Timestamp column not found in transactions DataFrame")
# Ensure amount column exists
if 'Amount' in transactions_df.columns:
amount_col = 'Amount'
elif 'tokenAmount' in transactions_df.columns:
amount_col = 'tokenAmount'
elif 'value' in transactions_df.columns:
# Try to adjust for decimals if 'tokenDecimal' exists
if 'tokenDecimal' in transactions_df.columns:
transactions_df['adjustedValue'] = transactions_df['value'].astype(float) / (10 ** transactions_df['tokenDecimal'].astype(int))
amount_col = 'adjustedValue'
else:
amount_col = 'value'
else:
raise ValueError("Amount column not found in transactions DataFrame")
# Resample by time window
transactions_df = transactions_df.copy()
try:
transactions_df.set_index(pd.DatetimeIndex(transactions_df[timestamp_col]), inplace=True)
except Exception as e:
print(f"Error setting DatetimeIndex: {str(e)}")
# Create a safe index as a fallback
transactions_df['safe_timestamp'] = pd.date_range(
start='2025-01-01',
periods=len(transactions_df),
freq='H'
)
transactions_df.set_index('safe_timestamp', inplace=True)
# Identify buy vs sell transactions based on 'from' and 'to' addresses
if 'From' in transactions_df.columns and 'To' in transactions_df.columns:
from_col, to_col = 'From', 'To'
elif 'from' in transactions_df.columns and 'to' in transactions_df.columns:
from_col, to_col = 'from', 'to'
else:
# If we can't determine direction, just aggregate total volume
agg_df = transactions_df.resample(time_window).agg({
amount_col: 'sum',
timestamp_col: 'count'
})
agg_df.columns = ['Volume', 'Count']
return agg_df.reset_index()
# Calculate net flow for each wallet address (positive = inflow, negative = outflow)
wallet_addresses = set(transactions_df[from_col].unique()) | set(transactions_df[to_col].unique())
results = []
for wallet in wallet_addresses:
wallet_df = transactions_df.copy()
# Mark transactions as inflow or outflow
wallet_df['Direction'] = 'Unknown'
wallet_df.loc[wallet_df[to_col] == wallet, 'Direction'] = 'In'
wallet_df.loc[wallet_df[from_col] == wallet, 'Direction'] = 'Out'
# Calculate net flow
wallet_df['NetFlow'] = wallet_df[amount_col]
wallet_df.loc[wallet_df['Direction'] == 'Out', 'NetFlow'] = -wallet_df.loc[wallet_df['Direction'] == 'Out', amount_col]
# Aggregate by time window
wallet_agg = wallet_df.resample(time_window).agg({
'NetFlow': 'sum',
timestamp_col: 'count'
})
wallet_agg.columns = ['NetFlow', 'Count']
wallet_agg['Wallet'] = wallet
results.append(wallet_agg.reset_index())
if not results:
return pd.DataFrame()
combined_df = pd.concat(results, ignore_index=True)
return combined_df
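    # Example (illustrative sketch; 'tx_df' is assumed to hold raw transfers with 'timeStamp',
    # 'value', 'tokenDecimal', 'from' and 'to' columns):
    #   daily_flows = DataProcessor().aggregate_transactions(tx_df, time_window='D')
    #   # -> one row per wallet and time window, with 'NetFlow', 'Count' and 'Wallet' columns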
# Cache for pattern identification to avoid repeating expensive calculations
_pattern_cache = {}
def identify_patterns(self,
transactions_df: pd.DataFrame,
n_clusters: int = 3) -> List[Dict[str, Any]]:
"""
Identify trading patterns using clustering algorithms
Args:
transactions_df: DataFrame of transactions
n_clusters: Number of clusters to identify
Returns:
List of pattern dictionaries containing name, description, and confidence
"""
# Check for empty data early to avoid processing
if transactions_df.empty:
return []
# Create a cache key based on DataFrame hash and number of clusters
try:
cache_key = f"{hash(tuple(transactions_df.columns))}_{len(transactions_df)}_{n_clusters}"
# Check cache first
if cache_key in self._pattern_cache:
return self._pattern_cache[cache_key]
except Exception:
# If hashing fails, proceed without caching
cache_key = None
try:
# Create a reference instead of a deep copy to improve memory usage
df = transactions_df
# Ensure timestamp column exists - optimize column presence checks
timestamp_cols = ['Timestamp', 'timeStamp']
timestamp_col = next((col for col in timestamp_cols if col in df.columns), None)
if timestamp_col:
# Convert timestamp only if needed
if not pd.api.types.is_datetime64_any_dtype(df[timestamp_col]):
try:
# Use vectorized operations instead of astype where possible
if df[timestamp_col].dtype == 'object':
df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors='coerce')
else:
df[timestamp_col] = pd.to_datetime(df[timestamp_col], unit='s', errors='coerce')
except Exception as e:
# Create a date range index as fallback
df['dummy_timestamp'] = pd.date_range(start='2025-01-01', periods=len(df), freq='H')
timestamp_col = 'dummy_timestamp'
else:
# If no timestamp column, create a dummy index
df['dummy_timestamp'] = pd.date_range(start='2025-01-01', periods=len(df), freq='H')
timestamp_col = 'dummy_timestamp'
# Efficiently calculate floor hour using vectorized operations
df['hour'] = df[timestamp_col].dt.floor('H')
# Check for address columns efficiently
if 'From' in df.columns and 'To' in df.columns:
from_col, to_col = 'From', 'To'
elif 'from' in df.columns and 'to' in df.columns:
from_col, to_col = 'from', 'to'
else:
# Create dummy addresses only if necessary
df['from'] = [f'0x{i:040x}' for i in range(len(df))]
df['to'] = [f'0x{(i+1):040x}' for i in range(len(df))]
from_col, to_col = 'from', 'to'
# Efficiently determine amount column
amount_cols = ['Amount', 'tokenAmount', 'value', 'adjustedValue']
amount_col = next((col for col in amount_cols if col in df.columns), None)
if not amount_col:
# Handle special case for token values with decimals
if 'value' in df.columns and 'tokenDecimal' in df.columns:
# Vectorized calculation for improved performance
try:
# Ensure values are numeric
df['value_numeric'] = pd.to_numeric(df['value'], errors='coerce')
df['tokenDecimal_numeric'] = pd.to_numeric(df['tokenDecimal'], errors='coerce').fillna(18)
df['adjustedValue'] = df['value_numeric'] / (10 ** df['tokenDecimal_numeric'])
amount_col = 'adjustedValue'
except Exception as e:
logging.warning(f"Error converting values: {e}")
df['dummy_amount'] = 1.0
amount_col = 'dummy_amount'
else:
# Fallback to dummy values
df['dummy_amount'] = 1.0
amount_col = 'dummy_amount'
# Ensure the amount column is numeric
try:
if amount_col in df.columns:
df[f"{amount_col}_numeric"] = pd.to_numeric(df[amount_col], errors='coerce').fillna(0)
amount_col = f"{amount_col}_numeric"
except Exception:
# If conversion fails, create a dummy numeric column
df['safe_amount'] = 1.0
amount_col = 'safe_amount'
# Calculate metrics using optimized groupby operations
# Use a more efficient approach with built-in pandas aggregation
agg_df = df.groupby('hour').agg(
Count=pd.NamedAgg(column=from_col, aggfunc='count'),
).reset_index()
# For NetFlow calculation, we need an additional pass
# This uses a more efficient calculation method
def calc_netflow(group):
# Use optimized filtering and calculations for better performance
first_to = group[to_col].iloc[0] if len(group) > 0 else None
first_from = group[from_col].iloc[0] if len(group) > 0 else None
if first_to is not None and first_from is not None:
# Ensure values are converted to numeric before summing
try:
# Convert to numeric with pd.to_numeric, coerce errors to NaN
total_in = pd.to_numeric(group.loc[group[to_col] == first_to, amount_col], errors='coerce').sum()
total_out = pd.to_numeric(group.loc[group[from_col] == first_from, amount_col], errors='coerce').sum()
# Replace NaN with 0 to avoid propagation
if pd.isna(total_in): total_in = 0.0
if pd.isna(total_out): total_out = 0.0
return float(total_in) - float(total_out)
except Exception as e:
import logging
logging.debug(f"Error converting values to numeric: {e}")
return 0.0
return 0.0
# Calculate NetFlow using apply instead of loop
netflows = df.groupby('hour').apply(calc_netflow)
agg_df['NetFlow'] = netflows.values
# Early return if not enough data for clustering
if agg_df.empty or len(agg_df) < n_clusters:
return []
# Ensure we don't have too many clusters for the dataset
actual_n_clusters = min(n_clusters, max(2, len(agg_df) // 2))
# Prepare features for clustering - with careful type handling
try:
if 'NetFlow' in agg_df.columns:
# Ensure NetFlow is numeric
agg_df['NetFlow'] = pd.to_numeric(agg_df['NetFlow'], errors='coerce').fillna(0)
features = agg_df[['NetFlow', 'Count']].copy()
primary_metric = 'NetFlow'
else:
# Calculate Volume if needed
if 'Volume' not in agg_df.columns and amount_col in df.columns:
# Calculate volume with numeric conversion
volume_by_hour = pd.to_numeric(df[amount_col], errors='coerce').fillna(0).groupby(df['hour']).sum()
agg_df['Volume'] = agg_df['hour'].map(volume_by_hour)
# Ensure Volume exists and is numeric
if 'Volume' not in agg_df.columns:
agg_df['Volume'] = 1.0 # Default value if calculation failed
else:
agg_df['Volume'] = pd.to_numeric(agg_df['Volume'], errors='coerce').fillna(1.0)
# Ensure Count is numeric
agg_df['Count'] = pd.to_numeric(agg_df['Count'], errors='coerce').fillna(1.0)
features = agg_df[['Volume', 'Count']].copy()
primary_metric = 'Volume'
# Final check to ensure features are numeric
for col in features.columns:
features[col] = pd.to_numeric(features[col], errors='coerce').fillna(0)
except Exception as e:
logging.warning(f"Error preparing clustering features: {e}")
# Create safe dummy features if everything else fails
agg_df['SafeFeature'] = 1.0
agg_df['Count'] = 1.0
features = agg_df[['SafeFeature', 'Count']].copy()
primary_metric = 'SafeFeature'
            # Scale features before clustering (StandardScaler and KMeans are imported at module level)
            scaler = StandardScaler()
            scaled_features = scaler.fit_transform(features)
            # Use K-Means with reduced complexity
            kmeans = KMeans(n_clusters=actual_n_clusters, random_state=42, n_init=10, max_iter=100)
agg_df['Cluster'] = kmeans.fit_predict(scaled_features)
# Calculate time-based metrics from the hour column directly
if 'hour' in agg_df.columns:
try:
# Convert to datetime for hour and day extraction if needed
hour_series = pd.to_datetime(agg_df['hour'])
agg_df['Hour'] = hour_series.dt.hour
agg_df['Day'] = hour_series.dt.dayofweek
except Exception:
# Fallback for non-convertible data
agg_df['Hour'] = 0
agg_df['Day'] = 0
else:
# Default values if no hour column
agg_df['Hour'] = 0
agg_df['Day'] = 0
# Identify patterns efficiently
patterns = []
for i in range(actual_n_clusters):
# Use boolean indexing for better performance
cluster_mask = agg_df['Cluster'] == i
cluster_df = agg_df[cluster_mask]
if len(cluster_df) == 0:
continue
                if primary_metric == 'NetFlow':
                    avg_flow = cluster_df['NetFlow'].mean()
                    flow_std = cluster_df['NetFlow'].std()
                    behavior = "Accumulation" if avg_flow > 0 else "Distribution"
                    volume_metric = f"Net Flow: {avg_flow:.2f} ± {flow_std:.2f}"
                    # Pattern characteristics for flow-based clusters
                    pattern_metrics = {
                        "avg_flow": avg_flow,
                        "flow_std": flow_std,
                        "avg_count": cluster_df['Count'].mean(),
                        "max_flow": cluster_df['NetFlow'].max(),
                        "min_flow": cluster_df['NetFlow'].min(),
                        "common_hour": cluster_df['Hour'].mode()[0] if not cluster_df['Hour'].empty else None,
                        "common_day": cluster_df['Day'].mode()[0] if not cluster_df['Day'].empty else None
                    }
                else:
                    # Use Volume metrics when no NetFlow column is available
                    avg_volume = cluster_df['Volume'].mean() if 'Volume' in cluster_df else 0
                    volume_std = cluster_df['Volume'].std() if 'Volume' in cluster_df else 0
                    behavior = "High Volume" if 'Volume' in agg_df and avg_volume > agg_df['Volume'].mean() else "Low Volume"
                    volume_metric = f"Volume: {avg_volume:.2f} ± {volume_std:.2f}"
                    # Pattern characteristics for volume-based clusters (these clusters have no NetFlow column)
                    pattern_metrics = {
                        "avg_volume": avg_volume,
                        "volume_std": volume_std,
                        "avg_count": cluster_df['Count'].mean(),
                        "max_volume": cluster_df['Volume'].max() if 'Volume' in cluster_df else 0,
                        "min_volume": cluster_df['Volume'].min() if 'Volume' in cluster_df else 0,
                        "common_hour": cluster_df['Hour'].mode()[0] if not cluster_df['Hour'].empty else None,
                        "common_day": cluster_df['Day'].mode()[0] if not cluster_df['Day'].empty else None
                    }
                # Confidence: compare within-cluster variance to the variance across all time windows
                cluster_variance = cluster_df[primary_metric].var()
                total_variance = agg_df[primary_metric].var()
                # Guard against NaN (single-row clusters) and zero total variance
                if pd.isna(cluster_variance) or pd.isna(total_variance) or total_variance == 0:
                    confidence = 0.5
                else:
                    confidence = max(0.4, min(0.95, 1 - (cluster_variance / total_variance)))
# Create enhanced pattern charts - Main Chart
if primary_metric == 'NetFlow':
                    main_fig = px.scatter(cluster_df, x='hour', y='NetFlow',
                                          size='Count', color='Cluster',
                                          title=f"Pattern {i+1}: {behavior}",
                                          labels={'NetFlow': 'Net Token Flow', 'hour': 'Time'},
                                          color_discrete_sequence=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd'])
                    # Add a trend line
                    main_fig.add_trace(go.Scatter(
                        x=cluster_df['hour'],
                        y=cluster_df['NetFlow'].rolling(window=3, min_periods=1).mean(),
                        mode='lines',
                        name='Trend',
                        line=dict(width=2, dash='dash', color='rgba(0,0,0,0.5)')
                    ))
                    # Add a zero reference line
                    main_fig.add_shape(
                        type="line",
                        x0=cluster_df['hour'].min(),
                        y0=0,
                        x1=cluster_df['hour'].max(),
                        y1=0,
                        line=dict(color="red", width=1, dash="dot"),
                    )
else:
                    main_fig = px.scatter(cluster_df, x='hour', y='Volume',
                                          size='Count', color='Cluster',
                                          title=f"Pattern {i+1}: {behavior}",
                                          labels={'Volume': 'Transaction Volume', 'hour': 'Time'},
                                          color_discrete_sequence=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd'])
                    # Add a trend line
                    main_fig.add_trace(go.Scatter(
                        x=cluster_df['hour'],
                        y=cluster_df['Volume'].rolling(window=3, min_periods=1).mean(),
                        mode='lines',
                        name='Trend',
                        line=dict(width=2, dash='dash', color='rgba(0,0,0,0.5)')
                    ))
main_fig.update_layout(
template="plotly_white",
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
margin=dict(l=20, r=20, t=50, b=20),
height=400
)
# Create hourly distribution chart
hour_counts = cluster_df.groupby('Hour')['Count'].sum().reindex(range(24), fill_value=0)
hour_fig = px.bar(x=hour_counts.index, y=hour_counts.values,
title="Hourly Distribution",
labels={'x': 'Hour of Day', 'y': 'Transaction Count'},
color_discrete_sequence=['#1f77b4'])
hour_fig.update_layout(template="plotly_white", height=300)
# Create volume/flow distribution chart
if primary_metric == 'NetFlow':
hist_data = cluster_df['NetFlow']
hist_title = "Net Flow Distribution"
hist_label = "Net Flow"
else:
hist_data = cluster_df['Volume']
hist_title = "Volume Distribution"
hist_label = "Volume"
dist_fig = px.histogram(hist_data,
title=hist_title,
labels={'value': hist_label, 'count': 'Frequency'},
color_discrete_sequence=['#2ca02c'])
dist_fig.update_layout(template="plotly_white", height=300)
# Find related transactions
if not transactions_df.empty:
# Get timestamps from this cluster
                    cluster_times = pd.to_datetime(cluster_df['hour'])
# Create time windows for matching
time_windows = [(t - pd.Timedelta(hours=1), t + pd.Timedelta(hours=1)) for t in cluster_times]
# Find transactions within these time windows
pattern_txs = transactions_df[transactions_df[timestamp_col].apply(
lambda x: any((start <= x <= end) for start, end in time_windows)
)].copy()
# If we have too many, sample them
if len(pattern_txs) > 10:
pattern_txs = pattern_txs.sample(10)
# If we have too few, just sample from all transactions
if len(pattern_txs) < 5 and len(transactions_df) >= 5:
pattern_txs = transactions_df.sample(min(5, len(transactions_df)))
else:
pattern_txs = pd.DataFrame()
# Comprehensive pattern dictionary
pattern = {
"name": behavior,
"description": f"This pattern shows {behavior.lower()} activity.",
"strategy": "Unknown",
"risk_profile": "Unknown",
"time_insight": "Unknown",
"cluster_id": i,
"metrics": pattern_metrics,
"occurrence_count": len(cluster_df),
"volume_metric": volume_metric,
"confidence": confidence,
"impact": 0.0,
"charts": {
"main": main_fig,
"hourly_distribution": hour_fig,
"value_distribution": dist_fig
},
"examples": pattern_txs
}
patterns.append(pattern)
# Cache results for future reuse
if cache_key:
self._pattern_cache[cache_key] = patterns
return patterns
        except Exception as e:
            logging.warning(f"Error during pattern identification: {str(e)}")
            return []
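    # Example (illustrative sketch):
    #   patterns = DataProcessor().identify_patterns(tx_df, n_clusters=3)
    #   for p in patterns:
    #       print(p["name"], p["confidence"], p["occurrence_count"])
    #       p["charts"]["main"].show()  # Plotly figure for the cluster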
def detect_anomalous_transactions(self,
transactions_df: pd.DataFrame,
sensitivity: str = "Medium") -> pd.DataFrame:
"""
Detect anomalous transactions using statistical methods
Args:
transactions_df: DataFrame of transactions
sensitivity: Detection sensitivity ("Low", "Medium", "High")
Returns:
DataFrame of anomalous transactions
"""
        if transactions_df.empty:
            return pd.DataFrame()
        # Work on a copy so the caller's DataFrame is not mutated
        transactions_df = transactions_df.copy()
# Ensure amount column exists
if 'Amount' in transactions_df.columns:
amount_col = 'Amount'
elif 'tokenAmount' in transactions_df.columns:
amount_col = 'tokenAmount'
elif 'value' in transactions_df.columns:
# Try to adjust for decimals if 'tokenDecimal' exists
if 'tokenDecimal' in transactions_df.columns:
transactions_df['adjustedValue'] = transactions_df['value'].astype(float) / (10 ** transactions_df['tokenDecimal'].astype(int))
amount_col = 'adjustedValue'
else:
amount_col = 'value'
else:
raise ValueError("Amount column not found in transactions DataFrame")
# Define sensitivity thresholds
if sensitivity == "Low":
z_threshold = 3.0 # Outliers beyond 3 standard deviations
elif sensitivity == "Medium":
z_threshold = 2.5 # Outliers beyond 2.5 standard deviations
else: # High
z_threshold = 2.0 # Outliers beyond 2 standard deviations
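        # Interpretation: a transaction is flagged when its z-score (|amount - mean| / std,
        # computed below) exceeds z_threshold, so "High" sensitivity flags anything more than
        # 2 standard deviations away from the mean transfer size.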
        # Calculate z-score for amount (coerce to numeric so string values from the API don't break the math)
        amounts = pd.to_numeric(transactions_df[amount_col], errors='coerce')
        mean_amount = amounts.mean()
        std_amount = amounts.std()
        if std_amount == 0 or pd.isna(std_amount):
            return pd.DataFrame()
        transactions_df['z_score'] = abs((amounts - mean_amount) / std_amount)
# Flag anomalous transactions
anomalies = transactions_df[transactions_df['z_score'] > z_threshold].copy()
# Add risk level based on z-score
anomalies['risk_level'] = 'Medium'
anomalies.loc[anomalies['z_score'] > z_threshold * 1.5, 'risk_level'] = 'High'
anomalies.loc[anomalies['z_score'] <= z_threshold * 1.2, 'risk_level'] = 'Low'
return anomalies
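    # Example (illustrative sketch):
    #   anomalies = DataProcessor().detect_anomalous_transactions(tx_df, sensitivity="High")
    #   print(anomalies[['z_score', 'risk_level']].sort_values('z_score', ascending=False).head())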
def analyze_price_impact(self,
transactions_df: pd.DataFrame,
price_data: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
"""
Analyze the price impact of transactions with enhanced visualizations
Args:
transactions_df: DataFrame of transactions
price_data: Dictionary of price impact data for each transaction
Returns:
Dictionary with comprehensive price impact analysis and visualizations
"""
if transactions_df.empty or not price_data:
# Create an empty chart for the default case
empty_fig = go.Figure()
empty_fig.update_layout(
title="No Price Impact Data Available",
xaxis_title="Time",
yaxis_title="Price Impact (%)",
height=400,
template="plotly_white"
)
empty_fig.add_annotation(
text="No transactions found with price impact data",
showarrow=False,
font=dict(size=14)
)
return {
'avg_impact_pct': 0,
'max_impact_pct': 0,
'min_impact_pct': 0,
'significant_moves_count': 0,
'total_transactions': 0,
'charts': {
'main_chart': empty_fig,
'impact_distribution': empty_fig,
'cumulative_impact': empty_fig,
'hourly_impact': empty_fig
},
'transactions_with_impact': pd.DataFrame(),
'insights': [],
'impact_summary': "No price impact data available"
}
        # Ensure timestamp column exists and is datetime
        if 'Timestamp' in transactions_df.columns:
            timestamp_col = 'Timestamp'
        elif 'timeStamp' in transactions_df.columns:
            timestamp_col = 'timeStamp'
        else:
            raise ValueError("Timestamp column not found in transactions DataFrame")
        # Work on a copy and convert to datetime if needed (raw API 'timeStamp' values are Unix seconds)
        transactions_df = transactions_df.copy()
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            unit = 's' if timestamp_col == 'timeStamp' else None
            transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit=unit, errors='coerce')
# Combine price impact data with transactions
impact_data = []
for idx, row in transactions_df.iterrows():
tx_hash = row.get('Transaction Hash', row.get('hash', None))
if not tx_hash or tx_hash not in price_data:
continue
tx_impact = price_data[tx_hash]
if tx_impact['impact_pct'] is None:
continue
# Get token symbol if available
token_symbol = row.get('tokenSymbol', 'Unknown')
token_amount = row.get('value', 0)
if 'tokenDecimal' in row:
try:
token_amount = float(token_amount) / (10 ** int(row.get('tokenDecimal', 0)))
except (ValueError, TypeError):
token_amount = 0
impact_data.append({
'transaction_hash': tx_hash,
'timestamp': row[timestamp_col],
'pre_price': tx_impact['pre_price'],
'post_price': tx_impact['post_price'],
'impact_pct': tx_impact['impact_pct'],
'token_symbol': token_symbol,
'token_amount': token_amount,
'from': row.get('from', ''),
'to': row.get('to', ''),
'hour': row[timestamp_col].hour if isinstance(row[timestamp_col], pd.Timestamp) else 0
})
if not impact_data:
# Create an empty chart for the default case
empty_fig = go.Figure()
empty_fig.update_layout(
title="No Price Impact Data Available",
xaxis_title="Time",
yaxis_title="Price Impact (%)",
height=400,
template="plotly_white"
)
empty_fig.add_annotation(
text="No transactions found with price impact data",
showarrow=False,
font=dict(size=14)
)
return {
'avg_impact_pct': 0,
'max_impact_pct': 0,
'min_impact_pct': 0,
'significant_moves_count': 0,
'total_transactions': len(transactions_df) if not transactions_df.empty else 0,
'charts': {
'main_chart': empty_fig,
'impact_distribution': empty_fig,
'cumulative_impact': empty_fig,
'hourly_impact': empty_fig
},
'transactions_with_impact': pd.DataFrame(),
'insights': [],
'impact_summary': "No price impact data available"
}
impact_df = pd.DataFrame(impact_data)
# Calculate aggregate metrics
avg_impact = impact_df['impact_pct'].mean()
max_impact = impact_df['impact_pct'].max()
min_impact = impact_df['impact_pct'].min()
median_impact = impact_df['impact_pct'].median()
std_impact = impact_df['impact_pct'].std()
# Count significant moves (>1% impact)
significant_threshold = 1.0
high_impact_threshold = 3.0
significant_moves = len(impact_df[abs(impact_df['impact_pct']) > significant_threshold])
high_impact_moves = len(impact_df[abs(impact_df['impact_pct']) > high_impact_threshold])
positive_impacts = len(impact_df[impact_df['impact_pct'] > 0])
negative_impacts = len(impact_df[impact_df['impact_pct'] < 0])
# Calculate cumulative impact
impact_df = impact_df.sort_values('timestamp')
impact_df['cumulative_impact'] = impact_df['impact_pct'].cumsum()
# Generate insights
insights = []
# Market direction bias
if avg_impact > 0.5:
insights.append({
"title": "Positive Price Pressure",
"description": f"Transactions show an overall positive price impact of {avg_impact:.2f}%, suggesting accumulation or market strength."
})
elif avg_impact < -0.5:
insights.append({
"title": "Negative Price Pressure",
"description": f"Transactions show an overall negative price impact of {avg_impact:.2f}%, suggesting distribution or market weakness."
})
# Volatility analysis
if std_impact > 2.0:
insights.append({
"title": "High Market Volatility",
"description": f"Price impact shows high volatility (std: {std_impact:.2f}%), indicating potential market manipulation or whipsaw conditions."
})
# Significant impacts
if high_impact_moves > 0:
insights.append({
"title": "High Impact Transactions",
"description": f"Detected {high_impact_moves} high-impact transactions (>{high_impact_threshold}% price change), indicating potential market-moving activity."
})
# Temporal patterns
hourly_impact = impact_df.groupby('hour')['impact_pct'].mean()
if len(hourly_impact) > 0:
max_hour = hourly_impact.abs().idxmax()
max_hour_impact = hourly_impact[max_hour]
insights.append({
"title": "Time-Based Pattern",
"description": f"Highest price impact occurs around {max_hour}:00 with an average of {max_hour_impact:.2f}%."
})
# Create impact summary text
impact_summary = f"Analysis of {len(impact_df)} price-impacting transactions shows an average impact of {avg_impact:.2f}% "
impact_summary += f"(range: {min_impact:.2f}% to {max_impact:.2f}%). "
impact_summary += f"Found {significant_moves} significant price moves and {high_impact_moves} high-impact transactions. "
if positive_impacts > negative_impacts:
impact_summary += f"There is a bias towards positive price impact ({positive_impacts} positive vs {negative_impacts} negative)."
elif negative_impacts > positive_impacts:
impact_summary += f"There is a bias towards negative price impact ({negative_impacts} negative vs {positive_impacts} positive)."
else:
impact_summary += "The price impact is balanced between positive and negative moves."
# Create enhanced main visualization
main_fig = go.Figure()
# Add scatter plot for impact
main_fig.add_trace(go.Scatter(
x=impact_df['timestamp'],
y=impact_df['impact_pct'],
mode='markers+lines',
marker=dict(
size=impact_df['impact_pct'].abs() * 1.5 + 5,
color=impact_df['impact_pct'],
colorscale='RdBu_r',
line=dict(width=1),
symbol=['circle' if val >= 0 else 'diamond' for val in impact_df['impact_pct']]
),
text=[
f"TX: {tx[:8]}...{tx[-6:]}<br>" +
f"Impact: {impact:.2f}%<br>" +
f"Token: {token} ({amount:.4f})<br>" +
f"From: {src[:6]}...{src[-4:]}<br>" +
f"To: {dst[:6]}...{dst[-4:]}"
for tx, impact, token, amount, src, dst in zip(
impact_df['transaction_hash'],
impact_df['impact_pct'],
impact_df['token_symbol'],
impact_df['token_amount'],
impact_df['from'],
impact_df['to']
)
],
hovertemplate='%{text}<br>Time: %{x}<extra></extra>',
name='Price Impact'
))
# Add a moving average trendline
window_size = max(3, len(impact_df) // 10) # Dynamic window size
if len(impact_df) >= window_size:
impact_df['ma'] = impact_df['impact_pct'].rolling(window=window_size, min_periods=1).mean()
main_fig.add_trace(go.Scatter(
x=impact_df['timestamp'],
y=impact_df['ma'],
mode='lines',
line=dict(width=2, color='rgba(255,165,0,0.7)'),
name=f'Moving Avg ({window_size} period)'
))
# Add a zero line for reference
main_fig.add_shape(
type='line',
x0=impact_df['timestamp'].min(),
y0=0,
x1=impact_df['timestamp'].max(),
y1=0,
line=dict(color='gray', width=1, dash='dash')
)
# Add colored regions for significant impact
# Add green band for normal price movement
main_fig.add_shape(
type='rect',
x0=impact_df['timestamp'].min(),
y0=-significant_threshold,
x1=impact_df['timestamp'].max(),
y1=significant_threshold,
fillcolor='rgba(0,255,0,0.1)',
line=dict(width=0),
layer='below'
)
# Add warning bands for higher impact movements
main_fig.add_shape(
type='rect',
x0=impact_df['timestamp'].min(),
y0=significant_threshold,
x1=impact_df['timestamp'].max(),
y1=high_impact_threshold,
fillcolor='rgba(255,255,0,0.1)',
line=dict(width=0),
layer='below'
)
main_fig.add_shape(
type='rect',
x0=impact_df['timestamp'].min(),
y0=-high_impact_threshold,
x1=impact_df['timestamp'].max(),
y1=-significant_threshold,
fillcolor='rgba(255,255,0,0.1)',
line=dict(width=0),
layer='below'
)
# Add high impact regions
main_fig.add_shape(
type='rect',
x0=impact_df['timestamp'].min(),
y0=high_impact_threshold,
x1=impact_df['timestamp'].max(),
y1=max(high_impact_threshold * 2, max_impact * 1.1),
fillcolor='rgba(255,0,0,0.1)',
line=dict(width=0),
layer='below'
)
main_fig.add_shape(
type='rect',
x0=impact_df['timestamp'].min(),
y0=min(high_impact_threshold * -2, min_impact * 1.1),
x1=impact_df['timestamp'].max(),
y1=-high_impact_threshold,
fillcolor='rgba(255,0,0,0.1)',
line=dict(width=0),
layer='below'
)
main_fig.update_layout(
title='Price Impact of Whale Transactions',
xaxis_title='Timestamp',
yaxis_title='Price Impact (%)',
hovermode='closest',
template="plotly_white",
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
margin=dict(l=20, r=20, t=50, b=20)
)
# Create impact distribution histogram
dist_fig = px.histogram(
impact_df['impact_pct'],
nbins=20,
labels={'value': 'Price Impact (%)', 'count': 'Frequency'},
title='Distribution of Price Impact',
color_discrete_sequence=['#3366CC']
)
# Add a vertical line at the mean
dist_fig.add_vline(x=avg_impact, line_dash="dash", line_color="red")
dist_fig.add_annotation(x=avg_impact, y=0.85, yref="paper", text=f"Mean: {avg_impact:.2f}%",
showarrow=True, arrowhead=2, arrowcolor="red", ax=40)
# Add a vertical line at zero
dist_fig.add_vline(x=0, line_dash="solid", line_color="black")
dist_fig.update_layout(
template="plotly_white",
bargap=0.1,
height=350
)
# Create cumulative impact chart
cumul_fig = go.Figure()
cumul_fig.add_trace(go.Scatter(
x=impact_df['timestamp'],
y=impact_df['cumulative_impact'],
mode='lines',
fill='tozeroy',
line=dict(width=2, color='#2ca02c'),
name='Cumulative Impact'
))
cumul_fig.update_layout(
title='Cumulative Price Impact Over Time',
xaxis_title='Timestamp',
yaxis_title='Cumulative Price Impact (%)',
template="plotly_white",
height=350
)
# Create hourly impact analysis
hourly_impact = impact_df.groupby('hour')['impact_pct'].agg(['mean', 'count', 'std']).reset_index()
hourly_impact = hourly_impact.sort_values('hour')
hour_fig = go.Figure()
hour_fig.add_trace(go.Bar(
x=hourly_impact['hour'],
y=hourly_impact['mean'],
error_y=dict(type='data', array=hourly_impact['std'], visible=True),
marker_color=hourly_impact['mean'].apply(lambda x: 'green' if x > 0 else 'red'),
name='Average Impact'
))
hour_fig.update_layout(
title='Price Impact by Hour of Day',
xaxis_title='Hour of Day',
yaxis_title='Average Price Impact (%)',
template="plotly_white",
height=350,
xaxis=dict(tickmode='linear', tick0=0, dtick=2)
)
# Join with original transactions
transactions_df = transactions_df.copy()
transactions_df['Timestamp_key'] = transactions_df[timestamp_col]
impact_df['Timestamp_key'] = impact_df['timestamp']
merged_df = pd.merge(
transactions_df,
impact_df[['Timestamp_key', 'impact_pct', 'pre_price', 'post_price', 'cumulative_impact']],
on='Timestamp_key',
how='left'
)
# Final result with enhanced output
return {
'avg_impact_pct': avg_impact,
'max_impact_pct': max_impact,
'min_impact_pct': min_impact,
'median_impact_pct': median_impact,
'std_impact_pct': std_impact,
'significant_moves_count': significant_moves,
'high_impact_moves_count': high_impact_moves,
'positive_impacts_count': positive_impacts,
'negative_impacts_count': negative_impacts,
'total_transactions': len(transactions_df),
'charts': {
'main_chart': main_fig,
'impact_distribution': dist_fig,
'cumulative_impact': cumul_fig,
'hourly_impact': hour_fig
},
'transactions_with_impact': merged_df,
'insights': insights,
'impact_summary': impact_summary
}
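    # Example (illustrative sketch; 'price_data' maps each transaction hash to a dict with
    # 'pre_price', 'post_price' and 'impact_pct' keys, as consumed above):
    #   impact = DataProcessor().analyze_price_impact(tx_df, price_data)
    #   print(impact['impact_summary'])
    #   impact['charts']['main_chart'].show()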
def detect_wash_trading(self,
transactions_df: pd.DataFrame,
addresses: List[str],
time_window_minutes: int = 60,
sensitivity: str = "Medium") -> List[Dict[str, Any]]:
"""
Detect potential wash trading between addresses
Args:
transactions_df: DataFrame of transactions
addresses: List of addresses to analyze
time_window_minutes: Time window for detecting wash trades
sensitivity: Detection sensitivity ("Low", "Medium", "High")
Returns:
List of potential wash trading incidents
"""
if transactions_df.empty or not addresses:
return []
# Ensure from/to columns exist
if 'From' in transactions_df.columns and 'To' in transactions_df.columns:
from_col, to_col = 'From', 'To'
elif 'from' in transactions_df.columns and 'to' in transactions_df.columns:
from_col, to_col = 'from', 'to'
else:
raise ValueError("From/To columns not found in transactions DataFrame")
# Ensure timestamp column exists
if 'Timestamp' in transactions_df.columns:
timestamp_col = 'Timestamp'
elif 'timeStamp' in transactions_df.columns:
timestamp_col = 'timeStamp'
else:
raise ValueError("Timestamp column not found in transactions DataFrame")
        # Work on a copy and ensure timestamp is datetime (raw API 'timeStamp' values are Unix seconds)
        transactions_df = transactions_df.copy()
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            unit = 's' if timestamp_col == 'timeStamp' else None
            transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit=unit, errors='coerce')
# Define sensitivity thresholds
if sensitivity == "Low":
min_cycles = 3 # Minimum number of back-and-forth transactions
max_time_diff = 120 # Maximum minutes between transactions
elif sensitivity == "Medium":
min_cycles = 2
max_time_diff = 60
else: # High
min_cycles = 1
max_time_diff = 30
# Filter transactions involving the addresses
address_txs = transactions_df[
(transactions_df[from_col].isin(addresses)) |
(transactions_df[to_col].isin(addresses))
].copy()
if address_txs.empty:
return []
# Sort by timestamp
address_txs = address_txs.sort_values(by=timestamp_col)
# Detect cycles of transactions between same addresses
wash_trades = []
for addr1 in addresses:
for addr2 in addresses:
if addr1 == addr2:
continue
# Find transactions from addr1 to addr2
a1_to_a2 = address_txs[
(address_txs[from_col] == addr1) &
(address_txs[to_col] == addr2)
]
# Find transactions from addr2 to addr1
a2_to_a1 = address_txs[
(address_txs[from_col] == addr2) &
(address_txs[to_col] == addr1)
]
if a1_to_a2.empty or a2_to_a1.empty:
continue
# Check for back-and-forth patterns
cycles = 0
evidence = []
for _, tx1 in a1_to_a2.iterrows():
tx1_time = tx1[timestamp_col]
# Find return transactions within the time window
return_txs = a2_to_a1[
(a2_to_a1[timestamp_col] > tx1_time) &
(a2_to_a1[timestamp_col] <= tx1_time + pd.Timedelta(minutes=max_time_diff))
]
if not return_txs.empty:
cycles += 1
evidence.append(tx1)
evidence.append(return_txs.iloc[0])
if cycles >= min_cycles:
# Create visualization
if evidence:
evidence_df = pd.DataFrame(evidence)
fig = px.scatter(
evidence_df,
x=timestamp_col,
y=evidence_df.get('Amount', evidence_df.get('tokenAmount', evidence_df.get('value', 0))),
color=from_col,
title=f"Potential Wash Trading Between {addr1[:8]}... and {addr2[:8]}..."
)
else:
fig = None
wash_trades.append({
"type": "Wash Trading",
"addresses": [addr1, addr2],
"risk_level": "High" if cycles >= min_cycles * 2 else "Medium",
"description": f"Detected {cycles} cycles of back-and-forth transactions between addresses",
"detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"title": f"Wash Trading Pattern ({cycles} cycles)",
"evidence": pd.DataFrame(evidence) if evidence else None,
"chart": fig
})
return wash_trades
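    # Example (illustrative sketch; 'whale_addresses' is a list of wallet addresses to screen):
    #   incidents = DataProcessor().detect_wash_trading(tx_df, addresses=whale_addresses,
    #                                                   time_window_minutes=60, sensitivity="Medium")
    #   for incident in incidents:
    #       print(incident["title"], incident["risk_level"])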
def detect_pump_and_dump(self,
transactions_df: pd.DataFrame,
price_data: Dict[str, Dict[str, Any]],
sensitivity: str = "Medium") -> List[Dict[str, Any]]:
"""
Detect potential pump and dump schemes
Args:
transactions_df: DataFrame of transactions
price_data: Dictionary of price impact data for each transaction
sensitivity: Detection sensitivity ("Low", "Medium", "High")
Returns:
List of potential pump and dump incidents
"""
if transactions_df.empty or not price_data:
return []
# Ensure timestamp column exists
if 'Timestamp' in transactions_df.columns:
timestamp_col = 'Timestamp'
elif 'timeStamp' in transactions_df.columns:
timestamp_col = 'timeStamp'
else:
raise ValueError("Timestamp column not found in transactions DataFrame")
# Ensure from/to columns exist
if 'From' in transactions_df.columns and 'To' in transactions_df.columns:
from_col, to_col = 'From', 'To'
elif 'from' in transactions_df.columns and 'to' in transactions_df.columns:
from_col, to_col = 'from', 'to'
else:
raise ValueError("From/To columns not found in transactions DataFrame")
        # Work on a copy and ensure timestamp is datetime (raw API 'timeStamp' values are Unix seconds)
        transactions_df = transactions_df.copy()
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            unit = 's' if timestamp_col == 'timeStamp' else None
            transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit=unit, errors='coerce')
# Define sensitivity thresholds
if sensitivity == "Low":
accumulation_threshold = 5 # Number of buys to consider accumulation
pump_threshold = 10.0 # % price increase to trigger pump
dump_threshold = -8.0 # % price decrease to trigger dump
elif sensitivity == "Medium":
accumulation_threshold = 3
pump_threshold = 7.0
dump_threshold = -5.0
else: # High
accumulation_threshold = 2
pump_threshold = 5.0
dump_threshold = -3.0
# Combine price impact data with transactions
txs_with_impact = []
for idx, row in transactions_df.iterrows():
tx_hash = row.get('Transaction Hash', row.get('hash', None))
if not tx_hash or tx_hash not in price_data:
continue
tx_impact = price_data[tx_hash]
if tx_impact['impact_pct'] is None:
continue
txs_with_impact.append({
'transaction_hash': tx_hash,
'timestamp': row[timestamp_col],
'from': row[from_col],
'to': row[to_col],
'pre_price': tx_impact['pre_price'],
'post_price': tx_impact['post_price'],
'impact_pct': tx_impact['impact_pct']
})
if not txs_with_impact:
return []
impact_df = pd.DataFrame(txs_with_impact)
impact_df = impact_df.sort_values(by='timestamp')
# Look for accumulation phases followed by price pumps and then dumps
pump_and_dumps = []
# Group by address to analyze per wallet
address_groups = {}
for from_addr in impact_df['from'].unique():
address_groups[from_addr] = impact_df[impact_df['from'] == from_addr]
for to_addr in impact_df['to'].unique():
if to_addr in address_groups:
address_groups[to_addr] = pd.concat([
address_groups[to_addr],
impact_df[impact_df['to'] == to_addr]
])
else:
address_groups[to_addr] = impact_df[impact_df['to'] == to_addr]
for address, addr_df in address_groups.items():
# Skip if not enough transactions
if len(addr_df) < accumulation_threshold + 2:
continue
# Look for continuous price increase followed by sharp drop
window_size = min(len(addr_df), 10)
for i in range(len(addr_df) - window_size + 1):
window = addr_df.iloc[i:i+window_size]
# Get cumulative price change in window
if len(window) >= 2:
first_price = window.iloc[0]['pre_price']
last_price = window.iloc[-1]['post_price']
if first_price is None or last_price is None:
continue
cumulative_change = ((last_price - first_price) / first_price) * 100
                    # Check for pump phase: locate the peak price and its position within the window
                    max_price = window['post_price'].max()
                    max_idx = window['post_price'].idxmax()
                    max_pos = window.index.get_loc(max_idx)  # positional index of the peak (idxmax returns a label)
                    if max_pos < len(window) - 1:
                        max_to_end = ((window.iloc[-1]['post_price'] - max_price) / max_price) * 100
# If we have a pump followed by a dump
if (cumulative_change > pump_threshold or
any(window['impact_pct'] > pump_threshold)) and max_to_end < dump_threshold:
# Create chart
fig = go.Figure()
                            # Plot price line: two points per transaction (just before and just after the trade)
                            times_expanded = []
                            prices = []
                            for _, tx_row in window.iterrows():
                                times_expanded.append(tx_row['timestamp'] - pd.Timedelta(minutes=1))  # 1 min before
                                times_expanded.append(tx_row['timestamp'] + pd.Timedelta(minutes=1))  # 1 min after
                                prices.append(tx_row['pre_price'])
                                prices.append(tx_row['post_price'])
fig.add_trace(go.Scatter(
x=times_expanded,
y=prices,
mode='lines+markers',
name='Price',
line=dict(color='blue')
))
                            # Highlight pump and dump phases (each row contributes two plotted points)
                            pump_x = times_expanded[:max_pos*2+2]
                            pump_y = prices[:max_pos*2+2]
                            dump_x = times_expanded[max_pos*2:]
                            dump_y = prices[max_pos*2:]
fig.add_trace(go.Scatter(
x=pump_x,
y=pump_y,
mode='lines',
line=dict(color='green', width=3),
name='Pump Phase'
))
fig.add_trace(go.Scatter(
x=dump_x,
y=dump_y,
mode='lines',
line=dict(color='red', width=3),
name='Dump Phase'
))
fig.update_layout(
title='Potential Pump and Dump Pattern',
xaxis_title='Time',
yaxis_title='Price',
hovermode='closest'
)
pump_and_dumps.append({
"type": "Pump and Dump",
"addresses": [address],
"risk_level": "High" if max_to_end < dump_threshold * 1.5 else "Medium",
"description": f"Price pumped {cumulative_change:.2f}% before dropping {max_to_end:.2f}%",
"detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"title": f"Pump ({cumulative_change:.1f}%) and Dump ({max_to_end:.1f}%)",
"evidence": window,
"chart": fig
})
return pump_and_dumps
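
# Minimal smoke test (illustrative sketch only, not used by the app): builds a small synthetic
# transfer DataFrame in the Arbiscan/Etherscan-style 'tokentx' shape assumed above and runs a
# couple of the methods. Column names and values here are made up for demonstration purposes.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    n = 50
    base_ts = int(datetime(2024, 1, 1).timestamp())
    demo_df = pd.DataFrame({
        "hash": [f"0x{i:064x}" for i in range(n)],
        "timeStamp": base_ts + np.arange(n) * 1800,  # one transfer every 30 minutes
        "from": rng.choice(["0xaaa", "0xbbb", "0xccc"], n),
        "to": rng.choice(["0xaaa", "0xbbb", "0xccc"], n),
        "value": rng.lognormal(20, 1, n).astype("int64").astype(str),
        "tokenDecimal": ["18"] * n,
        "tokenSymbol": ["DEMO"] * n,
    })
    processor = DataProcessor()
    flows = processor.aggregate_transactions(demo_df.copy(), time_window="D")
    print(f"Aggregated rows: {len(flows)}")
    patterns = processor.identify_patterns(demo_df.copy(), n_clusters=2)
    print(f"Patterns identified: {len(patterns)}")
    anomalies = processor.detect_anomalous_transactions(demo_df.copy(), sensitivity="High")
    print(f"Anomalous transfers flagged: {len(anomalies)}")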