import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import plotly.graph_objects as go
import plotly.express as px

class ManipulationDetector:
    """
    Detect potential market manipulation patterns in whale transactions.
    """

    def __init__(self):
        # Known manipulation patterns with relative risk weightings
        self.patterns = {
            "pump_and_dump": {
                "description": "Rapid buys followed by coordinated sell-offs, causing price to first rise then crash",
                "risk_factor": 0.8
            },
            "wash_trading": {
                "description": "Self-trading across multiple addresses to create a false impression of market activity",
                "risk_factor": 0.9
            },
            "spoofing": {
                "description": "Large orders placed then canceled before execution to manipulate price",
                "risk_factor": 0.7
            },
            "layering": {
                "description": "Multiple orders at different price levels to create a false impression of market depth",
                "risk_factor": 0.6
            },
            "momentum_ignition": {
                "description": "Creating sharp price moves to trigger other participants' momentum-based trading",
                "risk_factor": 0.5
            }
        }
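        # Note: risk_factor is not consumed by the detectors below; it is
        # informational metadata that a UI layer could surface alongside alerts.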
    def detect_wash_trading(self,
                            transactions_df: pd.DataFrame,
                            addresses: List[str],
                            sensitivity: str = "Medium",
                            lookback_hours: int = 24) -> List[Dict[str, Any]]:
        """
        Detect potential wash trading between addresses.

        Args:
            transactions_df: DataFrame of transactions
            addresses: List of addresses to analyze
            sensitivity: Detection sensitivity ("Low", "Medium", "High")
            lookback_hours: Hours to look back for wash trading patterns

        Returns:
            List of potential wash trading alerts
        """
        if transactions_df.empty or not addresses:
            return []

        # Work on a copy so the caller's DataFrame is not mutated
        transactions_df = transactions_df.copy()

        # Resolve from/to column names
        if 'From' in transactions_df.columns and 'To' in transactions_df.columns:
            from_col, to_col = 'From', 'To'
        elif 'from' in transactions_df.columns and 'to' in transactions_df.columns:
            from_col, to_col = 'from', 'to'
        else:
            raise ValueError("From/To columns not found in transactions DataFrame")

        # Resolve timestamp column name
        if 'Timestamp' in transactions_df.columns:
            timestamp_col = 'Timestamp'
        elif 'timeStamp' in transactions_df.columns:
            timestamp_col = 'timeStamp'
        else:
            raise ValueError("Timestamp column not found in transactions DataFrame")

        # Ensure timestamp is datetime (numeric values are assumed to be Unix epochs)
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            if isinstance(transactions_df[timestamp_col].iloc[0], (int, float)):
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s')
            else:
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col])

        # Sensitivity thresholds
        if sensitivity == "Low":
            min_cycles = 3       # Minimum number of back-and-forth transactions
            max_time_diff = 120  # Maximum minutes between transactions
        elif sensitivity == "Medium":
            min_cycles = 2
            max_time_diff = 60
        else:  # High
            min_cycles = 1
            max_time_diff = 30

        # Filter transactions to the lookback period
        lookback_time = datetime.now() - timedelta(hours=lookback_hours)
        recent_txs = transactions_df[transactions_df[timestamp_col] >= lookback_time]
        if recent_txs.empty:
            return []

        # Keep only transactions involving the watched addresses
        address_txs = recent_txs[
            (recent_txs[from_col].isin(addresses)) |
            (recent_txs[to_col].isin(addresses))
        ].copy()
        if address_txs.empty:
            return []

        address_txs = address_txs.sort_values(by=timestamp_col)

        # Detect cycles of transactions between the same pair of addresses.
        # Note: each ordered pair is checked separately, so a single pair can
        # yield one alert per direction.
        wash_trades = []
        for addr1 in addresses:
            for addr2 in addresses:
                if addr1 == addr2:
                    continue

                # Transactions from addr1 to addr2
                a1_to_a2 = address_txs[
                    (address_txs[from_col] == addr1) &
                    (address_txs[to_col] == addr2)
                ]
                # Transactions from addr2 back to addr1
                a2_to_a1 = address_txs[
                    (address_txs[from_col] == addr2) &
                    (address_txs[to_col] == addr1)
                ]
                if a1_to_a2.empty or a2_to_a1.empty:
                    continue

                # Count back-and-forth cycles within the time window
                cycles = 0
                evidence = []
                for _, tx1 in a1_to_a2.iterrows():
                    tx1_time = tx1[timestamp_col]
                    # Find return transactions within the time window
                    return_txs = a2_to_a1[
                        (a2_to_a1[timestamp_col] > tx1_time) &
                        (a2_to_a1[timestamp_col] <= tx1_time + pd.Timedelta(minutes=max_time_diff))
                    ]
                    if not return_txs.empty:
                        cycles += 1
                        evidence.append(tx1)
                        evidence.append(return_txs.iloc[0])

                if cycles >= min_cycles:
                    fig = None
                    evidence_df = pd.DataFrame(evidence) if evidence else None
                    if evidence_df is not None:
                        # Resolve the amount column, adjusting raw values for
                        # token decimals when possible
                        if 'Amount' in evidence_df.columns:
                            amount_col = 'Amount'
                        elif 'tokenAmount' in evidence_df.columns:
                            amount_col = 'tokenAmount'
                        elif 'value' in evidence_df.columns:
                            if 'tokenDecimal' in evidence_df.columns:
                                evidence_df['adjustedValue'] = (
                                    evidence_df['value'].astype(float)
                                    / (10 ** evidence_df['tokenDecimal'].astype(int))
                                )
                                amount_col = 'adjustedValue'
                            else:
                                amount_col = 'value'
                        else:
                            amount_col = None

                        if amount_col:
                            fig = px.scatter(
                                evidence_df,
                                x=timestamp_col,
                                y=amount_col,
                                color=from_col,
                                title=f"Potential Wash Trading Between {addr1[:8]}... and {addr2[:8]}..."
                            )

                    wash_trades.append({
                        "type": "Wash Trading",
                        "addresses": [addr1, addr2],
                        "risk_level": "High" if cycles >= min_cycles * 2 else "Medium",
                        "description": f"Detected {cycles} cycles of back-and-forth transactions between addresses",
                        "detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "title": f"Wash Trading Pattern ({cycles} cycles)",
                        "evidence": evidence_df,
                        "chart": fig
                    })

        return wash_trades
    def detect_pump_and_dump(self,
                             transactions_df: pd.DataFrame,
                             price_data: Dict[str, Dict[str, Any]],
                             sensitivity: str = "Medium") -> List[Dict[str, Any]]:
        """
        Detect potential pump and dump schemes.

        Args:
            transactions_df: DataFrame of transactions
            price_data: Dictionary keyed by transaction hash, with
                'pre_price', 'post_price', and 'impact_pct' per transaction
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of potential pump and dump alerts
        """
        if transactions_df.empty or not price_data:
            return []

        # Work on a copy so the caller's DataFrame is not mutated
        transactions_df = transactions_df.copy()

        # Resolve timestamp column name
        if 'Timestamp' in transactions_df.columns:
            timestamp_col = 'Timestamp'
        elif 'timeStamp' in transactions_df.columns:
            timestamp_col = 'timeStamp'
        else:
            raise ValueError("Timestamp column not found in transactions DataFrame")

        # Resolve from/to column names
        if 'From' in transactions_df.columns and 'To' in transactions_df.columns:
            from_col, to_col = 'From', 'To'
        elif 'from' in transactions_df.columns and 'to' in transactions_df.columns:
            from_col, to_col = 'from', 'to'
        else:
            raise ValueError("From/To columns not found in transactions DataFrame")

        # Ensure timestamp is datetime (numeric values are assumed to be Unix epochs)
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            if isinstance(transactions_df[timestamp_col].iloc[0], (int, float)):
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s')
            else:
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col])

        # Sensitivity thresholds
        if sensitivity == "Low":
            accumulation_threshold = 5  # Number of buys to consider accumulation
            pump_threshold = 10.0       # % price increase to trigger pump
            dump_threshold = -8.0       # % price decrease to trigger dump
        elif sensitivity == "Medium":
            accumulation_threshold = 3
            pump_threshold = 7.0
            dump_threshold = -5.0
        else:  # High
            accumulation_threshold = 2
            pump_threshold = 5.0
            dump_threshold = -3.0

        # Join price impact data onto the transactions
        txs_with_impact = []
        for _, row in transactions_df.iterrows():
            tx_hash = row.get('Transaction Hash', row.get('hash', None))
            if not tx_hash or tx_hash not in price_data:
                continue
            tx_impact = price_data[tx_hash]
            if tx_impact['impact_pct'] is None:
                continue
            txs_with_impact.append({
                'transaction_hash': tx_hash,
                'timestamp': row[timestamp_col],
                'from': row[from_col],
                'to': row[to_col],
                'pre_price': tx_impact['pre_price'],
                'post_price': tx_impact['post_price'],
                'impact_pct': tx_impact['impact_pct']
            })
        if not txs_with_impact:
            return []

        impact_df = pd.DataFrame(txs_with_impact).sort_values(by='timestamp')

        # Look for accumulation phases followed by price pumps and then dumps
        pump_and_dumps = []

        # Group transactions by the address involved (as sender or receiver)
        address_groups = {}
        for from_addr in impact_df['from'].unique():
            address_groups[from_addr] = impact_df[impact_df['from'] == from_addr]
        for to_addr in impact_df['to'].unique():
            if to_addr in address_groups:
                address_groups[to_addr] = pd.concat([
                    address_groups[to_addr],
                    impact_df[impact_df['to'] == to_addr]
                ])
            else:
                address_groups[to_addr] = impact_df[impact_df['to'] == to_addr]

        for address, addr_df in address_groups.items():
            # Skip addresses without enough transactions
            if len(addr_df) < accumulation_threshold + 2:
                continue

            # Slide a window over the address's transactions looking for a
            # sustained price rise followed by a sharp drop
            window_size = min(len(addr_df), 10)
            for i in range(len(addr_df) - window_size + 1):
                window = addr_df.iloc[i:i + window_size]
                if len(window) < 2:
                    continue

                first_price = window.iloc[0]['pre_price']
                last_price = window.iloc[-1]['post_price']
                if not first_price or last_price is None:
                    continue
                cumulative_change = ((last_price - first_price) / first_price) * 100

                # Locate the price peak within the window positionally
                # (idxmax() returns an index label, not a position, so it
                # cannot be compared against len(window) directly)
                max_pos = int(window['post_price'].to_numpy().argmax())
                max_price = window['post_price'].iloc[max_pos]
                if max_pos >= len(window) - 1:
                    continue
                max_to_end = ((window.iloc[-1]['post_price'] - max_price) / max_price) * 100

                # Flag a pump followed by a dump
                if (cumulative_change > pump_threshold or
                        any(window['impact_pct'] > pump_threshold)) and max_to_end < dump_threshold:
                    # Build the price timeline: two points per transaction
                    # (pre- and post-trade price, offset by one minute)
                    prices = []
                    times_expanded = []
                    for _, win_row in window.iterrows():
                        prices.append(win_row['pre_price'])
                        prices.append(win_row['post_price'])
                        times_expanded.append(win_row['timestamp'] - pd.Timedelta(minutes=1))
                        times_expanded.append(win_row['timestamp'] + pd.Timedelta(minutes=1))

                    fig = go.Figure()
                    fig.add_trace(go.Scatter(
                        x=times_expanded,
                        y=prices,
                        mode='lines+markers',
                        name='Price',
                        line=dict(color='blue')
                    ))

                    # Highlight the pump and dump phases around the peak
                    pump_x = times_expanded[:max_pos * 2 + 2]
                    pump_y = prices[:max_pos * 2 + 2]
                    dump_x = times_expanded[max_pos * 2:]
                    dump_y = prices[max_pos * 2:]
                    fig.add_trace(go.Scatter(
                        x=pump_x,
                        y=pump_y,
                        mode='lines',
                        line=dict(color='green', width=3),
                        name='Pump Phase'
                    ))
                    fig.add_trace(go.Scatter(
                        x=dump_x,
                        y=dump_y,
                        mode='lines',
                        line=dict(color='red', width=3),
                        name='Dump Phase'
                    ))
                    fig.update_layout(
                        title='Potential Pump and Dump Pattern',
                        xaxis_title='Time',
                        yaxis_title='Price',
                        hovermode='closest'
                    )

                    pump_and_dumps.append({
                        "type": "Pump and Dump",
                        "addresses": [address],
                        "risk_level": "High" if max_to_end < dump_threshold * 1.5 else "Medium",
                        "description": f"Price pumped {cumulative_change:.2f}% before dropping {max_to_end:.2f}%",
                        "detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "title": f"Pump ({cumulative_change:.1f}%) and Dump ({max_to_end:.1f}%)",
                        "evidence": window,
                        "chart": fig
                    })

        return pump_and_dumps
    def detect_spoofing(self,
                        transactions_df: pd.DataFrame,
                        order_book_data: Optional[pd.DataFrame] = None,
                        sensitivity: str = "Medium") -> List[Dict[str, Any]]:
        """
        Detect potential spoofing (placing and quickly canceling large orders).

        Args:
            transactions_df: DataFrame of transactions
            order_book_data: Optional DataFrame of order book data
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of potential spoofing alerts
        """
        # Placeholder: spoofing detection requires order placement and
        # cancellation data, which on-chain transactions alone do not provide.
        return []
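    # If order book snapshots become available, a plausible (assumed, not
    # implemented) input schema would be one row per order with columns like
    # ['order_id', 'side', 'size', 'price', 'placed_at', 'cancelled_at'].
    # Spoofing candidates would then be unusually large orders cancelled
    # shortly after placement without executing, e.g.:
    #
    #     lifetime = order_book_data['cancelled_at'] - order_book_data['placed_at']
    #     suspects = order_book_data[
    #         (order_book_data['size'] > size_threshold)
    #         & (lifetime < pd.Timedelta(seconds=10))
    #     ]
    #
    # where size_threshold is a hypothetical tuning parameter.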
    def detect_layering(self,
                        transactions_df: pd.DataFrame,
                        order_book_data: Optional[pd.DataFrame] = None,
                        sensitivity: str = "Medium") -> List[Dict[str, Any]]:
        """
        Detect potential layering (placing multiple orders at different price levels).

        Args:
            transactions_df: DataFrame of transactions
            order_book_data: Optional DataFrame of order book data
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of potential layering alerts
        """
        # Placeholder: layering detection requires order book depth data,
        # which on-chain transactions alone do not provide.
        return []
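    # Layering, similarly, would need resting-order data. Under the same
    # assumed schema sketched above detect_layering, one heuristic is counting
    # the distinct price levels a single account quotes on one side of the
    # book at once, e.g.:
    #
    #     levels = order_book_data.groupby(['account', 'side'])['price'].nunique()
    #     suspects = levels[levels >= min_layered_levels]
    #
    # where 'account' and min_layered_levels are hypothetical.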
    def detect_momentum_ignition(self,
                                 transactions_df: pd.DataFrame,
                                 price_data: Dict[str, Dict[str, Any]],
                                 sensitivity: str = "Medium") -> List[Dict[str, Any]]:
        """
        Detect potential momentum ignition (creating sharp price moves).

        Args:
            transactions_df: DataFrame of transactions
            price_data: Dictionary keyed by transaction hash, with
                'pre_price', 'post_price', and 'impact_pct' per transaction
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of potential momentum ignition alerts
        """
        if transactions_df.empty or not price_data:
            return []

        # Work on a copy so the caller's DataFrame is not mutated
        transactions_df = transactions_df.copy()

        # Resolve timestamp column name
        if 'Timestamp' in transactions_df.columns:
            timestamp_col = 'Timestamp'
        elif 'timeStamp' in transactions_df.columns:
            timestamp_col = 'timeStamp'
        else:
            raise ValueError("Timestamp column not found in transactions DataFrame")

        # Ensure timestamp is datetime (numeric values are assumed to be Unix epochs)
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            if isinstance(transactions_df[timestamp_col].iloc[0], (int, float)):
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s')
            else:
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col])

        # Sensitivity thresholds
        if sensitivity == "Low":
            impact_threshold = 15.0  # % price impact to trigger alert
            time_window_minutes = 5  # Window to look for follow-up transactions
        elif sensitivity == "Medium":
            impact_threshold = 10.0
            time_window_minutes = 10
        else:  # High
            impact_threshold = 5.0
            time_window_minutes = 15

        # Join price impact data onto the transactions
        txs_with_impact = []
        for _, row in transactions_df.iterrows():
            tx_hash = row.get('Transaction Hash', row.get('hash', None))
            if not tx_hash or tx_hash not in price_data:
                continue
            tx_impact = price_data[tx_hash]
            if tx_impact['impact_pct'] is None:
                continue
            txs_with_impact.append({
                'transaction_hash': tx_hash,
                'timestamp': row[timestamp_col],
                'from': row.get('From', row.get('from', 'Unknown')),
                'to': row.get('To', row.get('to', 'Unknown')),
                'pre_price': tx_impact['pre_price'],
                'post_price': tx_impact['post_price'],
                'impact_pct': tx_impact['impact_pct']
            })
        if not txs_with_impact:
            return []

        impact_df = pd.DataFrame(txs_with_impact).sort_values(by='timestamp')

        # Look for large price impacts followed by increased trading activity
        momentum_alerts = []
        high_impact_txs = impact_df[impact_df['impact_pct'].abs() > impact_threshold]

        for _, high_impact_tx in high_impact_txs.iterrows():
            tx_time = high_impact_tx['timestamp']

            # Activity after the high-impact transaction...
            follow_up_window = impact_df[
                (impact_df['timestamp'] > tx_time) &
                (impact_df['timestamp'] <= tx_time + pd.Timedelta(minutes=time_window_minutes))
            ]
            # ...compared to a same-length baseline window before it
            baseline_window = impact_df[
                (impact_df['timestamp'] < tx_time) &
                (impact_df['timestamp'] >= tx_time - pd.Timedelta(minutes=time_window_minutes))
            ]

            if len(follow_up_window) > len(baseline_window) * 1.5 and len(follow_up_window) >= 3:
                all_relevant_txs = pd.concat([
                    pd.DataFrame([high_impact_tx]),
                    follow_up_window,
                    baseline_window
                ]).sort_values(by='timestamp')

                # Build the price timeline: two points per transaction
                # (pre- and post-trade price, offset by 30 seconds), so
                # times_expanded and prices always have equal length
                prices = []
                times_expanded = []
                for _, tx_row in all_relevant_txs.iterrows():
                    prices.append(tx_row['pre_price'])
                    prices.append(tx_row['post_price'])
                    times_expanded.append(tx_row['timestamp'] - pd.Timedelta(seconds=30))
                    times_expanded.append(tx_row['timestamp'] + pd.Timedelta(seconds=30))

                fig = go.Figure()
                fig.add_trace(go.Scatter(
                    x=times_expanded,
                    y=prices,
                    mode='lines',
                    name='Price'
                ))

                # Highlight the high-impact transaction
                fig.add_trace(go.Scatter(
                    x=[high_impact_tx['timestamp']],
                    y=[high_impact_tx['post_price']],
                    mode='markers',
                    marker=dict(size=15, color='red', symbol='circle'),
                    name='Momentum Ignition'
                ))
                # Highlight the follow-up transactions
                if not follow_up_window.empty:
                    fig.add_trace(go.Scatter(
                        x=follow_up_window['timestamp'],
                        y=follow_up_window['post_price'],
                        mode='markers',
                        marker=dict(size=10, color='orange', symbol='circle'),
                        name='Follow-up Activity'
                    ))
                fig.update_layout(
                    title='Potential Momentum Ignition Pattern',
                    xaxis_title='Time',
                    yaxis_title='Price',
                    hovermode='closest'
                )

                momentum_alerts.append({
                    "type": "Momentum Ignition",
                    "addresses": [high_impact_tx['from']],
                    "risk_level": "High" if abs(high_impact_tx['impact_pct']) > impact_threshold * 1.5 else "Medium",
                    "description": (
                        f"Large {high_impact_tx['impact_pct']:.2f}% price move followed by "
                        f"{len(follow_up_window)} transactions in {time_window_minutes} minutes "
                        f"(vs {len(baseline_window)} in baseline)"
                    ),
                    "detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "title": f"Momentum Ignition ({high_impact_tx['impact_pct']:.1f}% price move)",
                    "evidence": pd.concat([pd.DataFrame([high_impact_tx]), follow_up_window]),
                    "chart": fig
                })

        return momentum_alerts
    def run_all_detections(self,
                           transactions_df: pd.DataFrame,
                           addresses: List[str],
                           price_data: Optional[Dict[str, Dict[str, Any]]] = None,
                           order_book_data: Optional[pd.DataFrame] = None,
                           sensitivity: str = "Medium") -> List[Dict[str, Any]]:
        """
        Run all manipulation detection algorithms.

        Args:
            transactions_df: DataFrame of transactions
            addresses: List of addresses to analyze
            price_data: Optional dictionary of price impact data per transaction
            order_book_data: Optional DataFrame of order book data
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of potential manipulation alerts, sorted by risk level
        """
        if transactions_df.empty:
            return []

        all_alerts = []

        # Wash trading needs only the transactions themselves
        wash_trading_alerts = self.detect_wash_trading(
            transactions_df=transactions_df,
            addresses=addresses,
            sensitivity=sensitivity
        )
        all_alerts.extend(wash_trading_alerts)

        # Pump and dump and momentum ignition need price impact data
        if price_data:
            pump_and_dump_alerts = self.detect_pump_and_dump(
                transactions_df=transactions_df,
                price_data=price_data,
                sensitivity=sensitivity
            )
            all_alerts.extend(pump_and_dump_alerts)

            momentum_alerts = self.detect_momentum_ignition(
                transactions_df=transactions_df,
                price_data=price_data,
                sensitivity=sensitivity
            )
            all_alerts.extend(momentum_alerts)

        # Spoofing and layering need order book data
        if order_book_data is not None:
            spoofing_alerts = self.detect_spoofing(
                transactions_df=transactions_df,
                order_book_data=order_book_data,
                sensitivity=sensitivity
            )
            all_alerts.extend(spoofing_alerts)

            layering_alerts = self.detect_layering(
                transactions_df=transactions_df,
                order_book_data=order_book_data,
                sensitivity=sensitivity
            )
            all_alerts.extend(layering_alerts)

        # Sort alerts by risk level (High first)
        risk_order = {"High": 0, "Medium": 1, "Low": 2}
        all_alerts.sort(key=lambda x: risk_order.get(x.get("risk_level", "Low"), 3))

        return all_alerts
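

# ---------------------------------------------------------------------------
# Minimal usage sketch. The addresses, hashes, amounts, and timestamps below
# are synthetic placeholders, not real chain data; this only demonstrates the
# expected input shape and a round-trip through detect_wash_trading().
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    addr_a = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"  # hypothetical address
    addr_b = "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"  # hypothetical address
    now = datetime.now()

    # Two back-and-forth cycles within an hour, enough to trip the
    # "Medium" sensitivity threshold (min_cycles=2, max_time_diff=60)
    demo_txs = pd.DataFrame({
        "Transaction Hash": [f"0xhash{i}" for i in range(4)],
        "From": [addr_a, addr_b, addr_a, addr_b],
        "To": [addr_b, addr_a, addr_b, addr_a],
        "Amount": [100.0, 99.5, 100.0, 99.5],
        "Timestamp": [
            now - timedelta(minutes=50),
            now - timedelta(minutes=45),
            now - timedelta(minutes=30),
            now - timedelta(minutes=25),
        ],
    })

    detector = ManipulationDetector()
    alerts = detector.detect_wash_trading(demo_txs, [addr_a, addr_b])
    for alert in alerts:
        print(f"{alert['title']} | risk: {alert['risk_level']}")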