import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Optional, Union, Any, Tuple import plotly.graph_objects as go import plotly.express as px class ManipulationDetector: """ Detect potential market manipulation patterns in whale transactions """ def __init__(self): # Define known manipulation patterns self.patterns = { "pump_and_dump": { "description": "Rapid buys followed by coordinated sell-offs, causing price to first rise then crash", "risk_factor": 0.8 }, "wash_trading": { "description": "Self-trading across multiple addresses to create false impression of market activity", "risk_factor": 0.9 }, "spoofing": { "description": "Large orders placed then canceled before execution to manipulate price", "risk_factor": 0.7 }, "layering": { "description": "Multiple orders at different price levels to create false impression of market depth", "risk_factor": 0.6 }, "momentum_ignition": { "description": "Creating sharp price moves to trigger other participants' momentum-based trading", "risk_factor": 0.5 } } def detect_wash_trading(self, transactions_df: pd.DataFrame, addresses: List[str], sensitivity: str = "Medium", lookback_hours: int = 24) -> List[Dict[str, Any]]: """ Detect potential wash trading between addresses Args: transactions_df: DataFrame of transactions addresses: List of addresses to analyze sensitivity: Detection sensitivity ("Low", "Medium", "High") lookback_hours: Hours to look back for wash trading patterns Returns: List of potential wash trading alerts """ if transactions_df.empty or not addresses: return [] # Ensure from/to columns exist if 'From' in transactions_df.columns and 'To' in transactions_df.columns: from_col, to_col = 'From', 'To' elif 'from' in transactions_df.columns and 'to' in transactions_df.columns: from_col, to_col = 'from', 'to' else: raise ValueError("From/To columns not found in transactions DataFrame") # Ensure timestamp column exists if 'Timestamp' in transactions_df.columns: timestamp_col = 'Timestamp' elif 'timeStamp' in transactions_df.columns: timestamp_col = 'timeStamp' else: raise ValueError("Timestamp column not found in transactions DataFrame") # Ensure timestamp is datetime if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]): if isinstance(transactions_df[timestamp_col].iloc[0], (int, float)): transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s') else: transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col]) # Define sensitivity thresholds if sensitivity == "Low": min_cycles = 3 # Minimum number of back-and-forth transactions max_time_diff = 120 # Maximum minutes between transactions elif sensitivity == "Medium": min_cycles = 2 max_time_diff = 60 else: # High min_cycles = 1 max_time_diff = 30 # Filter transactions by lookback period lookback_time = datetime.now() - timedelta(hours=lookback_hours) recent_txs = transactions_df[transactions_df[timestamp_col] >= lookback_time] if recent_txs.empty: return [] # Filter transactions involving the addresses address_txs = recent_txs[ (recent_txs[from_col].isin(addresses)) | (recent_txs[to_col].isin(addresses)) ].copy() if address_txs.empty: return [] # Sort by timestamp address_txs = address_txs.sort_values(by=timestamp_col) # Detect cycles of transactions between same addresses wash_trades = [] for addr1 in addresses: for addr2 in addresses: if addr1 == addr2: continue # Find transactions from addr1 to addr2 a1_to_a2 = address_txs[ (address_txs[from_col] == addr1) & (address_txs[to_col] == addr2) ] # Find transactions from addr2 to addr1 a2_to_a1 = address_txs[ (address_txs[from_col] == addr2) & (address_txs[to_col] == addr1) ] if a1_to_a2.empty or a2_to_a1.empty: continue # Check for back-and-forth patterns cycles = 0 evidence = [] for _, tx1 in a1_to_a2.iterrows(): tx1_time = tx1[timestamp_col] # Find return transactions within the time window return_txs = a2_to_a1[ (a2_to_a1[timestamp_col] > tx1_time) & (a2_to_a1[timestamp_col] <= tx1_time + pd.Timedelta(minutes=max_time_diff)) ] if not return_txs.empty: cycles += 1 evidence.append(tx1) evidence.append(return_txs.iloc[0]) if cycles >= min_cycles: # Create visualization if evidence: evidence_df = pd.DataFrame(evidence) # Get amount column if 'Amount' in evidence_df.columns: amount_col = 'Amount' elif 'tokenAmount' in evidence_df.columns: amount_col = 'tokenAmount' elif 'value' in evidence_df.columns: # Try to adjust for decimals if 'tokenDecimal' exists if 'tokenDecimal' in evidence_df.columns: evidence_df['adjustedValue'] = evidence_df['value'].astype(float) / (10 ** evidence_df['tokenDecimal'].astype(int)) amount_col = 'adjustedValue' else: amount_col = 'value' else: amount_col = None # Create figure if amount column exists if amount_col: fig = px.scatter( evidence_df, x=timestamp_col, y=amount_col, color=from_col, title=f"Potential Wash Trading Between {addr1[:8]}... and {addr2[:8]}..." ) else: fig = None else: fig = None wash_trades.append({ "type": "Wash Trading", "addresses": [addr1, addr2], "risk_level": "High" if cycles >= min_cycles * 2 else "Medium", "description": f"Detected {cycles} cycles of back-and-forth transactions between addresses", "detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "title": f"Wash Trading Pattern ({cycles} cycles)", "evidence": pd.DataFrame(evidence) if evidence else None, "chart": fig }) return wash_trades def detect_pump_and_dump(self, transactions_df: pd.DataFrame, price_data: Dict[str, Dict[str, Any]], sensitivity: str = "Medium") -> List[Dict[str, Any]]: """ Detect potential pump and dump schemes Args: transactions_df: DataFrame of transactions price_data: Dictionary of price impact data for each transaction sensitivity: Detection sensitivity ("Low", "Medium", "High") Returns: List of potential pump and dump alerts """ if transactions_df.empty or not price_data: return [] # Ensure timestamp column exists if 'Timestamp' in transactions_df.columns: timestamp_col = 'Timestamp' elif 'timeStamp' in transactions_df.columns: timestamp_col = 'timeStamp' else: raise ValueError("Timestamp column not found in transactions DataFrame") # Ensure from/to columns exist if 'From' in transactions_df.columns and 'To' in transactions_df.columns: from_col, to_col = 'From', 'To' elif 'from' in transactions_df.columns and 'to' in transactions_df.columns: from_col, to_col = 'from', 'to' else: raise ValueError("From/To columns not found in transactions DataFrame") # Ensure timestamp is datetime if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]): if isinstance(transactions_df[timestamp_col].iloc[0], (int, float)): transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s') else: transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col]) # Define sensitivity thresholds if sensitivity == "Low": accumulation_threshold = 5 # Number of buys to consider accumulation pump_threshold = 10.0 # % price increase to trigger pump dump_threshold = -8.0 # % price decrease to trigger dump elif sensitivity == "Medium": accumulation_threshold = 3 pump_threshold = 7.0 dump_threshold = -5.0 else: # High accumulation_threshold = 2 pump_threshold = 5.0 dump_threshold = -3.0 # Combine price impact data with transactions txs_with_impact = [] for idx, row in transactions_df.iterrows(): tx_hash = row.get('Transaction Hash', row.get('hash', None)) if not tx_hash or tx_hash not in price_data: continue tx_impact = price_data[tx_hash] if tx_impact['impact_pct'] is None: continue txs_with_impact.append({ 'transaction_hash': tx_hash, 'timestamp': row[timestamp_col], 'from': row[from_col], 'to': row[to_col], 'pre_price': tx_impact['pre_price'], 'post_price': tx_impact['post_price'], 'impact_pct': tx_impact['impact_pct'] }) if not txs_with_impact: return [] impact_df = pd.DataFrame(txs_with_impact) impact_df = impact_df.sort_values(by='timestamp') # Look for accumulation phases followed by price pumps and then dumps pump_and_dumps = [] # Group by address to analyze per wallet address_groups = {} for from_addr in impact_df['from'].unique(): address_groups[from_addr] = impact_df[impact_df['from'] == from_addr] for to_addr in impact_df['to'].unique(): if to_addr in address_groups: address_groups[to_addr] = pd.concat([ address_groups[to_addr], impact_df[impact_df['to'] == to_addr] ]) else: address_groups[to_addr] = impact_df[impact_df['to'] == to_addr] for address, addr_df in address_groups.items(): # Skip if not enough transactions if len(addr_df) < accumulation_threshold + 2: continue # Look for continuous price increase followed by sharp drop window_size = min(len(addr_df), 10) for i in range(len(addr_df) - window_size + 1): window = addr_df.iloc[i:i+window_size] # Get cumulative price change in window if len(window) >= 2: first_price = window.iloc[0]['pre_price'] last_price = window.iloc[-1]['post_price'] if first_price is None or last_price is None: continue cumulative_change = ((last_price - first_price) / first_price) * 100 # Check for pump phase max_price = window['post_price'].max() max_idx = window['post_price'].idxmax() if max_idx < len(window) - 1: max_to_end = ((window.iloc[-1]['post_price'] - max_price) / max_price) * 100 # If we have a pump followed by a dump if (cumulative_change > pump_threshold or any(window['impact_pct'] > pump_threshold)) and max_to_end < dump_threshold: # Create chart fig = go.Figure() # Plot price line times = [t.timestamp() for t in window['timestamp']] prices = [] for _, row in window.iterrows(): prices.append(row['pre_price']) prices.append(row['post_price']) times_expanded = [] for t in times: times_expanded.append(t - 60) # 1 min before times_expanded.append(t + 60) # 1 min after fig.add_trace(go.Scatter( x=times_expanded, y=prices, mode='lines+markers', name='Price', line=dict(color='blue') )) # Highlight pump and dump phases max_time_idx = window.index.get_loc(max_idx) pump_x = times_expanded[:max_time_idx*2+2] pump_y = prices[:max_time_idx*2+2] dump_x = times_expanded[max_time_idx*2:] dump_y = prices[max_time_idx*2:] fig.add_trace(go.Scatter( x=pump_x, y=pump_y, mode='lines', line=dict(color='green', width=3), name='Pump Phase' )) fig.add_trace(go.Scatter( x=dump_x, y=dump_y, mode='lines', line=dict(color='red', width=3), name='Dump Phase' )) fig.update_layout( title='Potential Pump and Dump Pattern', xaxis_title='Time', yaxis_title='Price', hovermode='closest' ) pump_and_dumps.append({ "type": "Pump and Dump", "addresses": [address], "risk_level": "High" if max_to_end < dump_threshold * 1.5 else "Medium", "description": f"Price pumped {cumulative_change:.2f}% before dropping {max_to_end:.2f}%", "detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "title": f"Pump ({cumulative_change:.1f}%) and Dump ({max_to_end:.1f}%)", "evidence": window, "chart": fig }) return pump_and_dumps def detect_spoofing(self, transactions_df: pd.DataFrame, order_book_data: Optional[pd.DataFrame] = None, sensitivity: str = "Medium") -> List[Dict[str, Any]]: """ Detect potential spoofing (placing and quickly canceling large orders) Args: transactions_df: DataFrame of transactions order_book_data: Optional DataFrame of order book data sensitivity: Detection sensitivity ("Low", "Medium", "High") Returns: List of potential spoofing alerts """ # Note: This is a placeholder since we don't have direct order book data # In a real implementation, this would analyze order placement and cancellations # For now, return an empty list as we can't detect spoofing without order book data return [] def detect_layering(self, transactions_df: pd.DataFrame, order_book_data: Optional[pd.DataFrame] = None, sensitivity: str = "Medium") -> List[Dict[str, Any]]: """ Detect potential layering (placing multiple orders at different price levels) Args: transactions_df: DataFrame of transactions order_book_data: Optional DataFrame of order book data sensitivity: Detection sensitivity ("Low", "Medium", "High") Returns: List of potential layering alerts """ # Note: This is a placeholder since we don't have direct order book data # In a real implementation, this would analyze order book depth and patterns # For now, return an empty list as we can't detect layering without order book data return [] def detect_momentum_ignition(self, transactions_df: pd.DataFrame, price_data: Dict[str, Dict[str, Any]], sensitivity: str = "Medium") -> List[Dict[str, Any]]: """ Detect potential momentum ignition (creating sharp price moves) Args: transactions_df: DataFrame of transactions price_data: Dictionary of price impact data for each transaction sensitivity: Detection sensitivity ("Low", "Medium", "High") Returns: List of potential momentum ignition alerts """ if transactions_df.empty or not price_data: return [] # Ensure timestamp column exists if 'Timestamp' in transactions_df.columns: timestamp_col = 'Timestamp' elif 'timeStamp' in transactions_df.columns: timestamp_col = 'timeStamp' else: raise ValueError("Timestamp column not found in transactions DataFrame") # Ensure timestamp is datetime if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]): if isinstance(transactions_df[timestamp_col].iloc[0], (int, float)): transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s') else: transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col]) # Define sensitivity thresholds if sensitivity == "Low": impact_threshold = 15.0 # % price impact to trigger alert time_window_minutes = 5 # Time window to look for follow-up transactions elif sensitivity == "Medium": impact_threshold = 10.0 time_window_minutes = 10 else: # High impact_threshold = 5.0 time_window_minutes = 15 # Combine price impact data with transactions txs_with_impact = [] for idx, row in transactions_df.iterrows(): tx_hash = row.get('Transaction Hash', row.get('hash', None)) if not tx_hash or tx_hash not in price_data: continue tx_impact = price_data[tx_hash] if tx_impact['impact_pct'] is None: continue txs_with_impact.append({ 'transaction_hash': tx_hash, 'timestamp': row[timestamp_col], 'from': row.get('From', row.get('from', 'Unknown')), 'to': row.get('To', row.get('to', 'Unknown')), 'pre_price': tx_impact['pre_price'], 'post_price': tx_impact['post_price'], 'impact_pct': tx_impact['impact_pct'] }) if not txs_with_impact: return [] impact_df = pd.DataFrame(txs_with_impact) impact_df = impact_df.sort_values(by='timestamp') # Look for large price impacts followed by increased trading activity momentum_alerts = [] # Find high-impact transactions high_impact_txs = impact_df[abs(impact_df['impact_pct']) > impact_threshold] for idx, high_impact_tx in high_impact_txs.iterrows(): tx_time = high_impact_tx['timestamp'] # Look for increased trading activity after the high-impact transaction follow_up_window = impact_df[ (impact_df['timestamp'] > tx_time) & (impact_df['timestamp'] <= tx_time + pd.Timedelta(minutes=time_window_minutes)) ] # Compare activity to baseline (same time window before the transaction) baseline_window = impact_df[ (impact_df['timestamp'] < tx_time) & (impact_df['timestamp'] >= tx_time - pd.Timedelta(minutes=time_window_minutes)) ] if len(follow_up_window) > len(baseline_window) * 1.5 and len(follow_up_window) >= 3: # Create chart fig = go.Figure() # Plot price timeline all_relevant_txs = pd.concat([ pd.DataFrame([high_impact_tx]), follow_up_window, baseline_window ]).sort_values(by='timestamp') # Create time series for price timestamps = all_relevant_txs['timestamp'] prices = [] for _, row in all_relevant_txs.iterrows(): prices.append(row['pre_price']) prices.append(row['post_price']) times_expanded = [] for t in timestamps: times_expanded.append(t - pd.Timedelta(seconds=30)) times_expanded.append(t + pd.Timedelta(seconds=30)) # Plot price line fig.add_trace(go.Scatter( x=times_expanded[:len(prices)], # In case of any length mismatch y=prices[:len(times_expanded)], mode='lines', name='Price' )) # Highlight the high-impact transaction fig.add_trace(go.Scatter( x=[high_impact_tx['timestamp']], y=[high_impact_tx['post_price']], mode='markers', marker=dict( size=15, color='red', symbol='circle' ), name='Momentum Ignition' )) # Highlight the follow-up transactions if not follow_up_window.empty: fig.add_trace(go.Scatter( x=follow_up_window['timestamp'], y=follow_up_window['post_price'], mode='markers', marker=dict( size=10, color='orange', symbol='circle' ), name='Follow-up Activity' )) fig.update_layout( title='Potential Momentum Ignition Pattern', xaxis_title='Time', yaxis_title='Price', hovermode='closest' ) momentum_alerts.append({ "type": "Momentum Ignition", "addresses": [high_impact_tx['from']], "risk_level": "High" if abs(high_impact_tx['impact_pct']) > impact_threshold * 1.5 else "Medium", "description": f"Large {high_impact_tx['impact_pct']:.2f}% price move followed by {len(follow_up_window)} transactions in {time_window_minutes} minutes (vs {len(baseline_window)} in baseline)", "detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "title": f"Momentum Ignition ({high_impact_tx['impact_pct']:.1f}% price move)", "evidence": pd.concat([pd.DataFrame([high_impact_tx]), follow_up_window]), "chart": fig }) return momentum_alerts def run_all_detections(self, transactions_df: pd.DataFrame, addresses: List[str], price_data: Dict[str, Dict[str, Any]] = None, order_book_data: Optional[pd.DataFrame] = None, sensitivity: str = "Medium") -> List[Dict[str, Any]]: """ Run all manipulation detection algorithms Args: transactions_df: DataFrame of transactions addresses: List of addresses to analyze price_data: Optional dictionary of price impact data for each transaction order_book_data: Optional DataFrame of order book data sensitivity: Detection sensitivity ("Low", "Medium", "High") Returns: List of potential manipulation alerts """ if transactions_df.empty: return [] all_alerts = [] # Detect wash trading wash_trading_alerts = self.detect_wash_trading( transactions_df=transactions_df, addresses=addresses, sensitivity=sensitivity ) all_alerts.extend(wash_trading_alerts) # Detect pump and dump (if price data available) if price_data: pump_and_dump_alerts = self.detect_pump_and_dump( transactions_df=transactions_df, price_data=price_data, sensitivity=sensitivity ) all_alerts.extend(pump_and_dump_alerts) # Detect momentum ignition (if price data available) momentum_alerts = self.detect_momentum_ignition( transactions_df=transactions_df, price_data=price_data, sensitivity=sensitivity ) all_alerts.extend(momentum_alerts) # Detect spoofing (if order book data available) if order_book_data is not None: spoofing_alerts = self.detect_spoofing( transactions_df=transactions_df, order_book_data=order_book_data, sensitivity=sensitivity ) all_alerts.extend(spoofing_alerts) # Detect layering (if order book data available) layering_alerts = self.detect_layering( transactions_df=transactions_df, order_book_data=order_book_data, sensitivity=sensitivity ) all_alerts.extend(layering_alerts) # Sort alerts by risk level risk_order = {"High": 0, "Medium": 1, "Low": 2} all_alerts.sort(key=lambda x: risk_order.get(x.get("risk_level", "Low"), 3)) return all_alerts