# Whale_Arbitrum/modules/detection.py
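"""Heuristic detection of potential market-manipulation patterns in whale transactions."""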
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union, Any, Tuple
import plotly.graph_objects as go
import plotly.express as px
class ManipulationDetector:
"""
Detect potential market manipulation patterns in whale transactions
"""
def __init__(self):
# Define known manipulation patterns
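        # Note: this mapping is descriptive reference metadata (risk_factor is a
        # heuristic 0-1 weight); the detection methods below do not consume it.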
self.patterns = {
"pump_and_dump": {
"description": "Rapid buys followed by coordinated sell-offs, causing price to first rise then crash",
"risk_factor": 0.8
},
"wash_trading": {
"description": "Self-trading across multiple addresses to create false impression of market activity",
"risk_factor": 0.9
},
"spoofing": {
"description": "Large orders placed then canceled before execution to manipulate price",
"risk_factor": 0.7
},
"layering": {
"description": "Multiple orders at different price levels to create false impression of market depth",
"risk_factor": 0.6
},
"momentum_ignition": {
"description": "Creating sharp price moves to trigger other participants' momentum-based trading",
"risk_factor": 0.5
}
}
def detect_wash_trading(self,
transactions_df: pd.DataFrame,
addresses: List[str],
sensitivity: str = "Medium",
lookback_hours: int = 24) -> List[Dict[str, Any]]:
"""
Detect potential wash trading between addresses
Args:
transactions_df: DataFrame of transactions
addresses: List of addresses to analyze
sensitivity: Detection sensitivity ("Low", "Medium", "High")
lookback_hours: Hours to look back for wash trading patterns
Returns:
List of potential wash trading alerts
"""
if transactions_df.empty or not addresses:
return []
# Ensure from/to columns exist
if 'From' in transactions_df.columns and 'To' in transactions_df.columns:
from_col, to_col = 'From', 'To'
elif 'from' in transactions_df.columns and 'to' in transactions_df.columns:
from_col, to_col = 'from', 'to'
else:
raise ValueError("From/To columns not found in transactions DataFrame")
# Ensure timestamp column exists
if 'Timestamp' in transactions_df.columns:
timestamp_col = 'Timestamp'
elif 'timeStamp' in transactions_df.columns:
timestamp_col = 'timeStamp'
else:
raise ValueError("Timestamp column not found in transactions DataFrame")
# Ensure timestamp is datetime
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            transactions_df = transactions_df.copy()  # avoid mutating the caller's DataFrame
            if pd.api.types.is_numeric_dtype(transactions_df[timestamp_col]):
                # Numeric timestamps are assumed to be Unix epoch seconds
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s')
            else:
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col])
# Define sensitivity thresholds
if sensitivity == "Low":
min_cycles = 3 # Minimum number of back-and-forth transactions
max_time_diff = 120 # Maximum minutes between transactions
elif sensitivity == "Medium":
min_cycles = 2
max_time_diff = 60
else: # High
min_cycles = 1
max_time_diff = 30
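        # Heuristic: a "cycle" is a transfer addr1 -> addr2 answered by a return
        # transfer addr2 -> addr1 within max_time_diff minutes; min_cycles such
        # round trips inside the lookback window raise an alert.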
# Filter transactions by lookback period
lookback_time = datetime.now() - timedelta(hours=lookback_hours)
recent_txs = transactions_df[transactions_df[timestamp_col] >= lookback_time]
if recent_txs.empty:
return []
# Filter transactions involving the addresses
address_txs = recent_txs[
(recent_txs[from_col].isin(addresses)) |
(recent_txs[to_col].isin(addresses))
].copy()
if address_txs.empty:
return []
# Sort by timestamp
address_txs = address_txs.sort_values(by=timestamp_col)
# Detect cycles of transactions between same addresses
wash_trades = []
        # Iterate over unordered address pairs so each pair is evaluated once;
        # ordered pairs would emit near-duplicate alerts for (A, B) and (B, A)
        for i, addr1 in enumerate(addresses):
            for addr2 in addresses[i + 1:]:
# Find transactions from addr1 to addr2
a1_to_a2 = address_txs[
(address_txs[from_col] == addr1) &
(address_txs[to_col] == addr2)
]
# Find transactions from addr2 to addr1
a2_to_a1 = address_txs[
(address_txs[from_col] == addr2) &
(address_txs[to_col] == addr1)
]
if a1_to_a2.empty or a2_to_a1.empty:
continue
# Check for back-and-forth patterns
cycles = 0
evidence = []
for _, tx1 in a1_to_a2.iterrows():
tx1_time = tx1[timestamp_col]
# Find return transactions within the time window
return_txs = a2_to_a1[
(a2_to_a1[timestamp_col] > tx1_time) &
(a2_to_a1[timestamp_col] <= tx1_time + pd.Timedelta(minutes=max_time_diff))
]
if not return_txs.empty:
cycles += 1
evidence.append(tx1)
evidence.append(return_txs.iloc[0])
if cycles >= min_cycles:
# Create visualization
if evidence:
evidence_df = pd.DataFrame(evidence)
# Get amount column
if 'Amount' in evidence_df.columns:
amount_col = 'Amount'
elif 'tokenAmount' in evidence_df.columns:
amount_col = 'tokenAmount'
elif 'value' in evidence_df.columns:
# Try to adjust for decimals if 'tokenDecimal' exists
if 'tokenDecimal' in evidence_df.columns:
evidence_df['adjustedValue'] = evidence_df['value'].astype(float) / (10 ** evidence_df['tokenDecimal'].astype(int))
amount_col = 'adjustedValue'
else:
amount_col = 'value'
else:
amount_col = None
# Create figure if amount column exists
if amount_col:
fig = px.scatter(
evidence_df,
x=timestamp_col,
y=amount_col,
color=from_col,
title=f"Potential Wash Trading Between {addr1[:8]}... and {addr2[:8]}..."
)
else:
fig = None
else:
fig = None
wash_trades.append({
"type": "Wash Trading",
"addresses": [addr1, addr2],
"risk_level": "High" if cycles >= min_cycles * 2 else "Medium",
"description": f"Detected {cycles} cycles of back-and-forth transactions between addresses",
"detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"title": f"Wash Trading Pattern ({cycles} cycles)",
"evidence": pd.DataFrame(evidence) if evidence else None,
"chart": fig
})
return wash_trades
def detect_pump_and_dump(self,
transactions_df: pd.DataFrame,
price_data: Dict[str, Dict[str, Any]],
sensitivity: str = "Medium") -> List[Dict[str, Any]]:
"""
Detect potential pump and dump schemes
Args:
transactions_df: DataFrame of transactions
price_data: Dictionary of price impact data for each transaction
sensitivity: Detection sensitivity ("Low", "Medium", "High")
Returns:
List of potential pump and dump alerts
"""
if transactions_df.empty or not price_data:
return []
# Ensure timestamp column exists
if 'Timestamp' in transactions_df.columns:
timestamp_col = 'Timestamp'
elif 'timeStamp' in transactions_df.columns:
timestamp_col = 'timeStamp'
else:
raise ValueError("Timestamp column not found in transactions DataFrame")
# Ensure from/to columns exist
if 'From' in transactions_df.columns and 'To' in transactions_df.columns:
from_col, to_col = 'From', 'To'
elif 'from' in transactions_df.columns and 'to' in transactions_df.columns:
from_col, to_col = 'from', 'to'
else:
raise ValueError("From/To columns not found in transactions DataFrame")
# Ensure timestamp is datetime
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            transactions_df = transactions_df.copy()  # avoid mutating the caller's DataFrame
            if pd.api.types.is_numeric_dtype(transactions_df[timestamp_col]):
                # Numeric timestamps are assumed to be Unix epoch seconds
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s')
            else:
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col])
# Define sensitivity thresholds
if sensitivity == "Low":
accumulation_threshold = 5 # Number of buys to consider accumulation
pump_threshold = 10.0 # % price increase to trigger pump
dump_threshold = -8.0 # % price decrease to trigger dump
elif sensitivity == "Medium":
accumulation_threshold = 3
pump_threshold = 7.0
dump_threshold = -5.0
else: # High
accumulation_threshold = 2
pump_threshold = 5.0
dump_threshold = -3.0
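        # Heuristic: flag a window whose price rises by more than pump_threshold
        # percent (cumulatively or in a single transaction) and then falls from
        # its peak by more than |dump_threshold| percent.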
# Combine price impact data with transactions
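        # Expected price_data shape (per transaction hash):
        #   {tx_hash: {"pre_price": float, "post_price": float, "impact_pct": float}}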
txs_with_impact = []
for idx, row in transactions_df.iterrows():
tx_hash = row.get('Transaction Hash', row.get('hash', None))
if not tx_hash or tx_hash not in price_data:
continue
tx_impact = price_data[tx_hash]
if tx_impact['impact_pct'] is None:
continue
txs_with_impact.append({
'transaction_hash': tx_hash,
'timestamp': row[timestamp_col],
'from': row[from_col],
'to': row[to_col],
'pre_price': tx_impact['pre_price'],
'post_price': tx_impact['post_price'],
'impact_pct': tx_impact['impact_pct']
})
if not txs_with_impact:
return []
impact_df = pd.DataFrame(txs_with_impact)
impact_df = impact_df.sort_values(by='timestamp')
# Look for accumulation phases followed by price pumps and then dumps
pump_and_dumps = []
# Group by address to analyze per wallet
address_groups = {}
for from_addr in impact_df['from'].unique():
address_groups[from_addr] = impact_df[impact_df['from'] == from_addr]
for to_addr in impact_df['to'].unique():
if to_addr in address_groups:
address_groups[to_addr] = pd.concat([
address_groups[to_addr],
impact_df[impact_df['to'] == to_addr]
])
else:
address_groups[to_addr] = impact_df[impact_df['to'] == to_addr]
        for address, addr_df in address_groups.items():
            # Re-sort chronologically: the concat above appends 'to' matches
            # after 'from' matches, which breaks the timestamp ordering
            addr_df = addr_df.sort_values(by='timestamp')
            # Skip if not enough transactions
            if len(addr_df) < accumulation_threshold + 2:
                continue
# Look for continuous price increase followed by sharp drop
window_size = min(len(addr_df), 10)
for i in range(len(addr_df) - window_size + 1):
window = addr_df.iloc[i:i+window_size]
# Get cumulative price change in window
                if len(window) >= 2:
                    first_price = window.iloc[0]['pre_price']
                    last_price = window.iloc[-1]['post_price']
                    # Guard against missing prices and division by zero
                    if first_price is None or last_price is None or first_price == 0:
                        continue
                    cumulative_change = ((last_price - first_price) / first_price) * 100
                    # Check for pump phase: locate the price peak by position
                    # (idxmax returns an index label, which need not equal the
                    # positional offset once the frame has been re-sorted)
                    max_price = window['post_price'].max()
                    max_pos = int(np.argmax(window['post_price'].to_numpy()))
                    if max_pos < len(window) - 1:
                        max_to_end = ((window.iloc[-1]['post_price'] - max_price) / max_price) * 100
# If we have a pump followed by a dump
if (cumulative_change > pump_threshold or
any(window['impact_pct'] > pump_threshold)) and max_to_end < dump_threshold:
# Create chart
fig = go.Figure()
                        # Plot price line: each transaction contributes its pre-
                        # and post-trade price, offset one minute either side so
                        # the x-axis keeps real datetimes instead of epoch seconds
                        times = list(window['timestamp'])
                        prices = []
                        for _, row in window.iterrows():
                            prices.append(row['pre_price'])
                            prices.append(row['post_price'])
                        times_expanded = []
                        for t in times:
                            times_expanded.append(t - pd.Timedelta(minutes=1))
                            times_expanded.append(t + pd.Timedelta(minutes=1))
fig.add_trace(go.Scatter(
x=times_expanded,
y=prices,
mode='lines+markers',
name='Price',
line=dict(color='blue')
))
                        # Highlight pump and dump phases (two points per
                        # transaction, so the split lands at twice the peak's
                        # position in the expanded series)
                        max_time_idx = max_pos
pump_x = times_expanded[:max_time_idx*2+2]
pump_y = prices[:max_time_idx*2+2]
dump_x = times_expanded[max_time_idx*2:]
dump_y = prices[max_time_idx*2:]
fig.add_trace(go.Scatter(
x=pump_x,
y=pump_y,
mode='lines',
line=dict(color='green', width=3),
name='Pump Phase'
))
fig.add_trace(go.Scatter(
x=dump_x,
y=dump_y,
mode='lines',
line=dict(color='red', width=3),
name='Dump Phase'
))
fig.update_layout(
title='Potential Pump and Dump Pattern',
xaxis_title='Time',
yaxis_title='Price',
hovermode='closest'
)
pump_and_dumps.append({
"type": "Pump and Dump",
"addresses": [address],
"risk_level": "High" if max_to_end < dump_threshold * 1.5 else "Medium",
"description": f"Price pumped {cumulative_change:.2f}% before dropping {max_to_end:.2f}%",
"detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"title": f"Pump ({cumulative_change:.1f}%) and Dump ({max_to_end:.1f}%)",
"evidence": window,
"chart": fig
})
return pump_and_dumps
def detect_spoofing(self,
transactions_df: pd.DataFrame,
order_book_data: Optional[pd.DataFrame] = None,
sensitivity: str = "Medium") -> List[Dict[str, Any]]:
"""
Detect potential spoofing (placing and quickly canceling large orders)
Args:
transactions_df: DataFrame of transactions
order_book_data: Optional DataFrame of order book data
sensitivity: Detection sensitivity ("Low", "Medium", "High")
Returns:
List of potential spoofing alerts
"""
# Note: This is a placeholder since we don't have direct order book data
# In a real implementation, this would analyze order placement and cancellations
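        # (a typical spoofing signature: large resting orders cancelled within
        # seconds of placement, which would require per-order ids, sizes, and
        # placement/cancellation timestamps in order_book_data)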
# For now, return an empty list as we can't detect spoofing without order book data
return []
def detect_layering(self,
transactions_df: pd.DataFrame,
order_book_data: Optional[pd.DataFrame] = None,
sensitivity: str = "Medium") -> List[Dict[str, Any]]:
"""
Detect potential layering (placing multiple orders at different price levels)
Args:
transactions_df: DataFrame of transactions
order_book_data: Optional DataFrame of order book data
sensitivity: Detection sensitivity ("Low", "Medium", "High")
Returns:
List of potential layering alerts
"""
# Note: This is a placeholder since we don't have direct order book data
# In a real implementation, this would analyze order book depth and patterns
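        # (a typical layering signature: several same-side orders stacked at
        # adjacent price levels and cancelled together once price moves)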
# For now, return an empty list as we can't detect layering without order book data
return []
def detect_momentum_ignition(self,
transactions_df: pd.DataFrame,
price_data: Dict[str, Dict[str, Any]],
sensitivity: str = "Medium") -> List[Dict[str, Any]]:
"""
Detect potential momentum ignition (creating sharp price moves)
Args:
transactions_df: DataFrame of transactions
price_data: Dictionary of price impact data for each transaction
sensitivity: Detection sensitivity ("Low", "Medium", "High")
Returns:
List of potential momentum ignition alerts
"""
if transactions_df.empty or not price_data:
return []
# Ensure timestamp column exists
if 'Timestamp' in transactions_df.columns:
timestamp_col = 'Timestamp'
elif 'timeStamp' in transactions_df.columns:
timestamp_col = 'timeStamp'
else:
raise ValueError("Timestamp column not found in transactions DataFrame")
# Ensure timestamp is datetime
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            transactions_df = transactions_df.copy()  # avoid mutating the caller's DataFrame
            if pd.api.types.is_numeric_dtype(transactions_df[timestamp_col]):
                # Numeric timestamps are assumed to be Unix epoch seconds
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s')
            else:
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col])
# Define sensitivity thresholds
if sensitivity == "Low":
impact_threshold = 15.0 # % price impact to trigger alert
time_window_minutes = 5 # Time window to look for follow-up transactions
elif sensitivity == "Medium":
impact_threshold = 10.0
time_window_minutes = 10
else: # High
impact_threshold = 5.0
time_window_minutes = 15
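        # Heuristic: a single transaction moving price by more than
        # impact_threshold percent, followed by a burst of activity (at least 3
        # transactions and > 1.5x the count in the preceding window of the same
        # length), suggests momentum ignition.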
# Combine price impact data with transactions
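        # price_data entries follow the same {pre_price, post_price, impact_pct}
        # per-hash shape as in detect_pump_and_dump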
txs_with_impact = []
for idx, row in transactions_df.iterrows():
tx_hash = row.get('Transaction Hash', row.get('hash', None))
if not tx_hash or tx_hash not in price_data:
continue
tx_impact = price_data[tx_hash]
if tx_impact['impact_pct'] is None:
continue
txs_with_impact.append({
'transaction_hash': tx_hash,
'timestamp': row[timestamp_col],
'from': row.get('From', row.get('from', 'Unknown')),
'to': row.get('To', row.get('to', 'Unknown')),
'pre_price': tx_impact['pre_price'],
'post_price': tx_impact['post_price'],
'impact_pct': tx_impact['impact_pct']
})
if not txs_with_impact:
return []
impact_df = pd.DataFrame(txs_with_impact)
impact_df = impact_df.sort_values(by='timestamp')
# Look for large price impacts followed by increased trading activity
momentum_alerts = []
# Find high-impact transactions
high_impact_txs = impact_df[abs(impact_df['impact_pct']) > impact_threshold]
for idx, high_impact_tx in high_impact_txs.iterrows():
tx_time = high_impact_tx['timestamp']
# Look for increased trading activity after the high-impact transaction
follow_up_window = impact_df[
(impact_df['timestamp'] > tx_time) &
(impact_df['timestamp'] <= tx_time + pd.Timedelta(minutes=time_window_minutes))
]
# Compare activity to baseline (same time window before the transaction)
baseline_window = impact_df[
(impact_df['timestamp'] < tx_time) &
(impact_df['timestamp'] >= tx_time - pd.Timedelta(minutes=time_window_minutes))
]
if len(follow_up_window) > len(baseline_window) * 1.5 and len(follow_up_window) >= 3:
# Create chart
fig = go.Figure()
# Plot price timeline
all_relevant_txs = pd.concat([
pd.DataFrame([high_impact_tx]),
follow_up_window,
baseline_window
]).sort_values(by='timestamp')
# Create time series for price
timestamps = all_relevant_txs['timestamp']
prices = []
for _, row in all_relevant_txs.iterrows():
prices.append(row['pre_price'])
prices.append(row['post_price'])
times_expanded = []
for t in timestamps:
times_expanded.append(t - pd.Timedelta(seconds=30))
times_expanded.append(t + pd.Timedelta(seconds=30))
# Plot price line
fig.add_trace(go.Scatter(
x=times_expanded[:len(prices)], # In case of any length mismatch
y=prices[:len(times_expanded)],
mode='lines',
name='Price'
))
# Highlight the high-impact transaction
fig.add_trace(go.Scatter(
x=[high_impact_tx['timestamp']],
y=[high_impact_tx['post_price']],
mode='markers',
marker=dict(
size=15,
color='red',
symbol='circle'
),
name='Momentum Ignition'
))
# Highlight the follow-up transactions
if not follow_up_window.empty:
fig.add_trace(go.Scatter(
x=follow_up_window['timestamp'],
y=follow_up_window['post_price'],
mode='markers',
marker=dict(
size=10,
color='orange',
symbol='circle'
),
name='Follow-up Activity'
))
fig.update_layout(
title='Potential Momentum Ignition Pattern',
xaxis_title='Time',
yaxis_title='Price',
hovermode='closest'
)
momentum_alerts.append({
"type": "Momentum Ignition",
"addresses": [high_impact_tx['from']],
"risk_level": "High" if abs(high_impact_tx['impact_pct']) > impact_threshold * 1.5 else "Medium",
"description": f"Large {high_impact_tx['impact_pct']:.2f}% price move followed by {len(follow_up_window)} transactions in {time_window_minutes} minutes (vs {len(baseline_window)} in baseline)",
"detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"title": f"Momentum Ignition ({high_impact_tx['impact_pct']:.1f}% price move)",
"evidence": pd.concat([pd.DataFrame([high_impact_tx]), follow_up_window]),
"chart": fig
})
return momentum_alerts
def run_all_detections(self,
transactions_df: pd.DataFrame,
addresses: List[str],
                           price_data: Optional[Dict[str, Dict[str, Any]]] = None,
order_book_data: Optional[pd.DataFrame] = None,
sensitivity: str = "Medium") -> List[Dict[str, Any]]:
"""
Run all manipulation detection algorithms
Args:
transactions_df: DataFrame of transactions
addresses: List of addresses to analyze
price_data: Optional dictionary of price impact data for each transaction
order_book_data: Optional DataFrame of order book data
sensitivity: Detection sensitivity ("Low", "Medium", "High")
Returns:
List of potential manipulation alerts
"""
if transactions_df.empty:
return []
all_alerts = []
# Detect wash trading
wash_trading_alerts = self.detect_wash_trading(
transactions_df=transactions_df,
addresses=addresses,
sensitivity=sensitivity
)
all_alerts.extend(wash_trading_alerts)
# Detect pump and dump (if price data available)
if price_data:
pump_and_dump_alerts = self.detect_pump_and_dump(
transactions_df=transactions_df,
price_data=price_data,
sensitivity=sensitivity
)
all_alerts.extend(pump_and_dump_alerts)
# Detect momentum ignition (if price data available)
momentum_alerts = self.detect_momentum_ignition(
transactions_df=transactions_df,
price_data=price_data,
sensitivity=sensitivity
)
all_alerts.extend(momentum_alerts)
# Detect spoofing (if order book data available)
if order_book_data is not None:
spoofing_alerts = self.detect_spoofing(
transactions_df=transactions_df,
order_book_data=order_book_data,
sensitivity=sensitivity
)
all_alerts.extend(spoofing_alerts)
# Detect layering (if order book data available)
layering_alerts = self.detect_layering(
transactions_df=transactions_df,
order_book_data=order_book_data,
sensitivity=sensitivity
)
all_alerts.extend(layering_alerts)
# Sort alerts by risk level
risk_order = {"High": 0, "Medium": 1, "Low": 2}
all_alerts.sort(key=lambda x: risk_order.get(x.get("risk_level", "Low"), 3))
return all_alerts
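

# ---------------------------------------------------------------------------
# Minimal smoke-test sketch (illustrative only). It builds a tiny synthetic
# dataset and runs the detectors end to end; the addresses, hashes, amounts,
# and price figures below are made-up assumptions, not real Arbitrum data.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    now = datetime.now()
    # Four back-and-forth transfers between two hypothetical whale wallets
    txs = pd.DataFrame({
        "Transaction Hash": ["0xaaa1", "0xaaa2", "0xaaa3", "0xaaa4"],
        "From": ["0xWhaleA", "0xWhaleB", "0xWhaleA", "0xWhaleB"],
        "To": ["0xWhaleB", "0xWhaleA", "0xWhaleB", "0xWhaleA"],
        "Amount": [1000.0, 990.0, 1010.0, 1005.0],
        "Timestamp": [
            now - timedelta(minutes=50),
            now - timedelta(minutes=40),
            now - timedelta(minutes=30),
            now - timedelta(minutes=20),
        ],
    })
    # Synthetic per-transaction price impacts keyed by hash, matching the
    # {pre_price, post_price, impact_pct} shape the detectors expect
    impacts = {
        "0xaaa1": {"pre_price": 1.00, "post_price": 1.08, "impact_pct": 8.0},
        "0xaaa2": {"pre_price": 1.08, "post_price": 1.15, "impact_pct": 6.5},
        "0xaaa3": {"pre_price": 1.15, "post_price": 1.02, "impact_pct": -11.3},
        "0xaaa4": {"pre_price": 1.02, "post_price": 1.01, "impact_pct": -1.0},
    }
    detector = ManipulationDetector()
    alerts = detector.run_all_detections(
        transactions_df=txs,
        addresses=["0xWhaleA", "0xWhaleB"],
        price_data=impacts,
        sensitivity="High",
    )
    for alert in alerts:
        print(f"[{alert['risk_level']}] {alert['type']}: {alert['description']}")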