import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union, Any, Tuple
from sklearn.cluster import KMeans, DBSCAN
from sklearn.preprocessing import StandardScaler
import plotly.graph_objects as go
import plotly.express as px
import logging
import time


class DataProcessor:
    """Process and analyze transaction data from blockchain APIs."""

    def __init__(self):
        pass

    def aggregate_transactions(self, transactions_df: pd.DataFrame,
                               time_window: str = 'D') -> pd.DataFrame:
        """
        Aggregate transactions by time window.

        Args:
            transactions_df: DataFrame of transactions
            time_window: Time window for aggregation (e.g., 'D' for day, 'H' for hour)

        Returns:
            Aggregated DataFrame with transaction counts and volumes
        """
        if transactions_df.empty:
            return pd.DataFrame()

        # Ensure timestamp column is datetime
        if 'Timestamp' in transactions_df.columns:
            timestamp_col = 'Timestamp'
        elif 'timeStamp' in transactions_df.columns:
            timestamp_col = 'timeStamp'
        else:
            raise ValueError("Timestamp column not found in transactions DataFrame")

        # Ensure amount column exists
        if 'Amount' in transactions_df.columns:
            amount_col = 'Amount'
        elif 'tokenAmount' in transactions_df.columns:
            amount_col = 'tokenAmount'
        elif 'value' in transactions_df.columns:
            # Try to adjust for decimals if 'tokenDecimal' exists
            if 'tokenDecimal' in transactions_df.columns:
                transactions_df['adjustedValue'] = (
                    transactions_df['value'].astype(float)
                    / (10 ** transactions_df['tokenDecimal'].astype(int))
                )
                amount_col = 'adjustedValue'
            else:
                amount_col = 'value'
        else:
            raise ValueError("Amount column not found in transactions DataFrame")

        # Resample by time window
        transactions_df = transactions_df.copy()
        try:
            transactions_df.set_index(pd.DatetimeIndex(transactions_df[timestamp_col]), inplace=True)
        except Exception as e:
            print(f"Error setting DatetimeIndex: {str(e)}")
            # Create a safe index as a fallback
            transactions_df['safe_timestamp'] = pd.date_range(
                start='2025-01-01', periods=len(transactions_df), freq='H'
            )
            transactions_df.set_index('safe_timestamp', inplace=True)

        # Identify buy vs sell transactions based on 'from' and 'to' addresses
        if 'From' in transactions_df.columns and 'To' in transactions_df.columns:
            from_col, to_col = 'From', 'To'
        elif 'from' in transactions_df.columns and 'to' in transactions_df.columns:
            from_col, to_col = 'from', 'to'
        else:
            # If we can't determine direction, just aggregate total volume
            agg_df = transactions_df.resample(time_window).agg({
                amount_col: 'sum',
                timestamp_col: 'count'
            })
            agg_df.columns = ['Volume', 'Count']
            return agg_df.reset_index()

        # Calculate net flow for each wallet address (positive = inflow, negative = outflow)
        wallet_addresses = set(transactions_df[from_col].unique()) | set(transactions_df[to_col].unique())

        results = []
        for wallet in wallet_addresses:
            wallet_df = transactions_df.copy()

            # Mark transactions as inflow or outflow
            wallet_df['Direction'] = 'Unknown'
            wallet_df.loc[wallet_df[to_col] == wallet, 'Direction'] = 'In'
            wallet_df.loc[wallet_df[from_col] == wallet, 'Direction'] = 'Out'

            # Calculate net flow
            wallet_df['NetFlow'] = wallet_df[amount_col]
            wallet_df.loc[wallet_df['Direction'] == 'Out', 'NetFlow'] = (
                -wallet_df.loc[wallet_df['Direction'] == 'Out', amount_col]
            )

            # Aggregate by time window
            wallet_agg = wallet_df.resample(time_window).agg({
                'NetFlow': 'sum',
                timestamp_col: 'count'
            })
            wallet_agg.columns = ['NetFlow', 'Count']
            wallet_agg['Wallet'] = wallet
            results.append(wallet_agg.reset_index())

        if not results:
            return pd.DataFrame()

        combined_df = pd.concat(results, ignore_index=True)
        return combined_df
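    # Example usage (illustrative sketch only; assumes `tx_df` was fetched upstream,
    # e.g. from an Etherscan-style token-transfer endpoint, and carries
    # 'from'/'to'/'value'/'tokenDecimal'/'timeStamp' columns):
    #
    #   processor = DataProcessor()
    #   daily_flows = processor.aggregate_transactions(tx_df, time_window='D')
    #   hourly_flows = processor.aggregate_transactions(tx_df, time_window='H')
    #
    # The result holds one row per (wallet, period) with 'NetFlow' and 'Count'
    # columns; a positive NetFlow indicates net inflow for that wallet.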
    # Cache for pattern identification to avoid repeating expensive calculations
    # (class-level attribute, shared across DataProcessor instances)
    _pattern_cache = {}

    def identify_patterns(self, transactions_df: pd.DataFrame,
                          n_clusters: int = 3) -> List[Dict[str, Any]]:
        """
        Identify trading patterns using clustering algorithms.

        Args:
            transactions_df: DataFrame of transactions
            n_clusters: Number of clusters to identify

        Returns:
            List of pattern dictionaries containing name, description, and confidence
        """
        # Check for empty data early to avoid processing
        if transactions_df.empty:
            return []

        # Create a cache key based on the DataFrame's columns, length and cluster count
        try:
            cache_key = f"{hash(tuple(transactions_df.columns))}_{len(transactions_df)}_{n_clusters}"
            # Check cache first
            if cache_key in self._pattern_cache:
                return self._pattern_cache[cache_key]
        except Exception:
            # If hashing fails, proceed without caching
            cache_key = None

        try:
            # Work on a reference instead of a deep copy to save memory
            # (helper columns added below therefore also appear on the caller's DataFrame)
            df = transactions_df

            # Ensure a timestamp column exists
            timestamp_cols = ['Timestamp', 'timeStamp']
            timestamp_col = next((col for col in timestamp_cols if col in df.columns), None)

            if timestamp_col:
                # Convert timestamp only if needed
                if not pd.api.types.is_datetime64_any_dtype(df[timestamp_col]):
                    try:
                        # Vectorized conversion; string timestamps are parsed,
                        # numeric ones are treated as Unix epochs
                        if df[timestamp_col].dtype == 'object':
                            df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors='coerce')
                        else:
                            df[timestamp_col] = pd.to_datetime(df[timestamp_col], unit='s', errors='coerce')
                    except Exception:
                        # Create a date range as fallback
                        df['dummy_timestamp'] = pd.date_range(start='2025-01-01', periods=len(df), freq='H')
                        timestamp_col = 'dummy_timestamp'
            else:
                # If no timestamp column, create a dummy one
                df['dummy_timestamp'] = pd.date_range(start='2025-01-01', periods=len(df), freq='H')
                timestamp_col = 'dummy_timestamp'

            # Calculate the floor hour using vectorized operations
            df['hour'] = df[timestamp_col].dt.floor('H')

            # Check for address columns
            if 'From' in df.columns and 'To' in df.columns:
                from_col, to_col = 'From', 'To'
            elif 'from' in df.columns and 'to' in df.columns:
                from_col, to_col = 'from', 'to'
            else:
                # Create dummy addresses only if necessary
                df['from'] = [f'0x{i:040x}' for i in range(len(df))]
                df['to'] = [f'0x{(i + 1):040x}' for i in range(len(df))]
                from_col, to_col = 'from', 'to'

            # Determine the amount column
            amount_cols = ['Amount', 'tokenAmount', 'adjustedValue']
            amount_col = next((col for col in amount_cols if col in df.columns), None)

            if not amount_col:
                if 'value' in df.columns and 'tokenDecimal' in df.columns:
                    # Adjust raw token values for decimals (vectorized)
                    try:
                        # Ensure values are numeric
                        df['value_numeric'] = pd.to_numeric(df['value'], errors='coerce')
                        df['tokenDecimal_numeric'] = pd.to_numeric(df['tokenDecimal'], errors='coerce').fillna(18)
                        df['adjustedValue'] = df['value_numeric'] / (10 ** df['tokenDecimal_numeric'])
                        amount_col = 'adjustedValue'
                    except Exception as e:
                        logging.warning(f"Error converting values: {e}")
                        df['dummy_amount'] = 1.0
                        amount_col = 'dummy_amount'
                elif 'value' in df.columns:
                    amount_col = 'value'
                else:
                    # Fallback to dummy values
                    df['dummy_amount'] = 1.0
                    amount_col = 'dummy_amount'

            # Ensure the amount column is numeric
            try:
                if amount_col in df.columns:
                    df[f"{amount_col}_numeric"] = pd.to_numeric(df[amount_col], errors='coerce').fillna(0)
                    amount_col = f"{amount_col}_numeric"
            except Exception:
                # If conversion fails, create a dummy numeric column
                df['safe_amount'] = 1.0
                amount_col = 'safe_amount'
            # Calculate per-hour counts using pandas' built-in groupby aggregation
            agg_df = df.groupby('hour').agg(
                Count=pd.NamedAgg(column=from_col, aggfunc='count'),
            ).reset_index()

            # NetFlow needs an additional pass over each hourly group
            def calc_netflow(group):
                # Compare inflow to the first 'to' address against outflow from the
                # first 'from' address in the group
                first_to = group[to_col].iloc[0] if len(group) > 0 else None
                first_from = group[from_col].iloc[0] if len(group) > 0 else None

                if first_to is not None and first_from is not None:
                    # Ensure values are converted to numeric before summing
                    try:
                        # Convert with pd.to_numeric, coercing errors to NaN
                        total_in = pd.to_numeric(group.loc[group[to_col] == first_to, amount_col],
                                                 errors='coerce').sum()
                        total_out = pd.to_numeric(group.loc[group[from_col] == first_from, amount_col],
                                                  errors='coerce').sum()

                        # Replace NaN with 0 to avoid propagation
                        if pd.isna(total_in):
                            total_in = 0.0
                        if pd.isna(total_out):
                            total_out = 0.0

                        return float(total_in) - float(total_out)
                    except Exception as e:
                        logging.debug(f"Error converting values to numeric: {e}")
                        return 0.0
                return 0.0

            # Calculate NetFlow with a grouped apply instead of an explicit loop
            netflows = df.groupby('hour').apply(calc_netflow)
            agg_df['NetFlow'] = netflows.values

            # Early return if not enough data for clustering
            if agg_df.empty or len(agg_df) < n_clusters:
                return []

            # Ensure we don't have too many clusters for the dataset
            actual_n_clusters = min(n_clusters, max(2, len(agg_df) // 2))

            # Prepare features for clustering, with careful type handling
            try:
                if 'NetFlow' in agg_df.columns:
                    # Ensure NetFlow is numeric
                    agg_df['NetFlow'] = pd.to_numeric(agg_df['NetFlow'], errors='coerce').fillna(0)
                    features = agg_df[['NetFlow', 'Count']].copy()
                    primary_metric = 'NetFlow'
                else:
                    # Calculate Volume if needed
                    if 'Volume' not in agg_df.columns and amount_col in df.columns:
                        # Calculate volume with numeric conversion
                        volume_by_hour = pd.to_numeric(df[amount_col], errors='coerce').fillna(0).groupby(df['hour']).sum()
                        agg_df['Volume'] = agg_df['hour'].map(volume_by_hour)

                    # Ensure Volume exists and is numeric
                    if 'Volume' not in agg_df.columns:
                        agg_df['Volume'] = 1.0  # Default value if calculation failed
                    else:
                        agg_df['Volume'] = pd.to_numeric(agg_df['Volume'], errors='coerce').fillna(1.0)

                    # Ensure Count is numeric
                    agg_df['Count'] = pd.to_numeric(agg_df['Count'], errors='coerce').fillna(1.0)

                    features = agg_df[['Volume', 'Count']].copy()
                    primary_metric = 'Volume'

                # Final check to ensure features are numeric
                for col in features.columns:
                    features[col] = pd.to_numeric(features[col], errors='coerce').fillna(0)
            except Exception as e:
                logging.warning(f"Error preparing clustering features: {e}")
                # Create safe dummy features if everything else fails
                agg_df['SafeFeature'] = 1.0
                agg_df['Count'] = 1.0
                features = agg_df[['SafeFeature', 'Count']].copy()
                primary_metric = 'SafeFeature'

            # Scale features before clustering (StandardScaler is imported at module level)
            scaler = StandardScaler()
            scaled_features = scaler.fit_transform(features)

            # Use K-Means with reduced complexity
            kmeans = KMeans(n_clusters=actual_n_clusters, random_state=42, n_init=10, max_iter=100)
            agg_df['Cluster'] = kmeans.fit_predict(scaled_features)

            # Calculate time-based metrics from the hour column directly
            if 'hour' in agg_df.columns:
                try:
                    # Convert to datetime for hour and day extraction if needed
                    hour_series = pd.to_datetime(agg_df['hour'])
                    agg_df['Hour'] = hour_series.dt.hour
                    agg_df['Day'] = hour_series.dt.dayofweek
                except Exception:
                    # Fallback for non-convertible data
                    agg_df['Hour'] = 0
                    agg_df['Day'] = 0
            else:
                # Default values if no hour column
                agg_df['Hour'] = 0
                agg_df['Day'] = 0
            # Identify patterns cluster by cluster
            patterns = []
            for i in range(actual_n_clusters):
                # Use boolean indexing for better performance
                cluster_mask = agg_df['Cluster'] == i
                cluster_df = agg_df[cluster_mask]

                if len(cluster_df) == 0:
                    continue

                if primary_metric == 'NetFlow':
                    avg_flow = cluster_df['NetFlow'].mean()
                    flow_std = cluster_df['NetFlow'].std()
                    behavior = "Accumulation" if avg_flow > 0 else "Distribution"
                    volume_metric = f"Net Flow: {avg_flow:.2f} ± {flow_std:.2f}"

                    # Pattern characteristics for flow-based clusters
                    pattern_metrics = {
                        "avg_flow": avg_flow,
                        "flow_std": flow_std,
                        "avg_count": cluster_df['Count'].mean(),
                        "max_flow": cluster_df['NetFlow'].max(),
                        "min_flow": cluster_df['NetFlow'].min(),
                        "common_hour": cluster_df['Hour'].mode()[0] if not cluster_df['Hour'].empty else None,
                        "common_day": cluster_df['Day'].mode()[0] if not cluster_df['Day'].empty else None
                    }

                    # Confidence: within-cluster variance as a share of total variance
                    cluster_variance = cluster_df['NetFlow'].var()
                    total_variance = agg_df['NetFlow'].var() or 1  # Avoid division by zero
                    confidence = max(0.4, min(0.95, 1 - (cluster_variance / total_variance)))
                else:
                    avg_volume = cluster_df['Volume'].mean() if 'Volume' in cluster_df else 0
                    volume_std = cluster_df['Volume'].std() if 'Volume' in cluster_df else 0
                    behavior = ("High Volume"
                                if 'Volume' in agg_df and avg_volume > agg_df['Volume'].mean()
                                else "Low Volume")
                    volume_metric = f"Volume: {avg_volume:.2f} ± {volume_std:.2f}"

                    # Pattern characteristics for volume-based clusters
                    pattern_metrics = {
                        "avg_volume": avg_volume,
                        "volume_std": volume_std,
                        "avg_count": cluster_df['Count'].mean(),
                        "max_volume": cluster_df['Volume'].max() if 'Volume' in cluster_df else 0,
                        "min_volume": cluster_df['Volume'].min() if 'Volume' in cluster_df else 0,
                        "common_hour": cluster_df['Hour'].mode()[0] if not cluster_df['Hour'].empty else None,
                        "common_day": cluster_df['Day'].mode()[0] if not cluster_df['Day'].empty else None
                    }

                    # Confidence: within-cluster variance as a share of total variance
                    cluster_variance = cluster_df['Volume'].var()
                    total_variance = agg_df['Volume'].var() or 1  # Avoid division by zero
                    confidence = max(0.4, min(0.95, 1 - (cluster_variance / total_variance)))

                # Create enhanced pattern charts - main chart
                if primary_metric == 'NetFlow':
                    main_fig = px.scatter(cluster_df, x='hour', y='NetFlow', size='Count', color='Cluster',
                                          title=f"Pattern {i + 1}: {behavior}",
                                          labels={'NetFlow': 'Net Token Flow', 'hour': 'Time'},
                                          color_discrete_sequence=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd'])

                    # Add a trend line
                    main_fig.add_trace(go.Scatter(
                        x=cluster_df['hour'],
                        y=cluster_df['NetFlow'].rolling(window=3, min_periods=1).mean(),
                        mode='lines',
                        name='Trend',
                        line=dict(width=2, dash='dash', color='rgba(0,0,0,0.5)')
                    ))

                    # Add a zero reference line
                    main_fig.add_shape(
                        type="line",
                        x0=cluster_df['hour'].min(), y0=0,
                        x1=cluster_df['hour'].max(), y1=0,
                        line=dict(color="red", width=1, dash="dot"),
                    )
                else:
                    main_fig = px.scatter(cluster_df, x='hour', y='Volume', size='Count', color='Cluster',
                                          title=f"Pattern {i + 1}: {behavior}",
                                          labels={'Volume': 'Transaction Volume', 'hour': 'Time'},
                                          color_discrete_sequence=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd'])

                    # Add a trend line
                    main_fig.add_trace(go.Scatter(
                        x=cluster_df['hour'],
                        y=cluster_df['Volume'].rolling(window=3, min_periods=1).mean(),
                        mode='lines',
                        name='Trend',
                        line=dict(width=2, dash='dash', color='rgba(0,0,0,0.5)')
                    ))

                main_fig.update_layout(
                    template="plotly_white",
                    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
                    margin=dict(l=20, r=20, t=50, b=20),
                    height=400
                )
                # Create hourly distribution chart
                hour_counts = cluster_df.groupby('Hour')['Count'].sum().reindex(range(24), fill_value=0)
                hour_fig = px.bar(x=hour_counts.index, y=hour_counts.values,
                                  title="Hourly Distribution",
                                  labels={'x': 'Hour of Day', 'y': 'Transaction Count'},
                                  color_discrete_sequence=['#1f77b4'])
                hour_fig.update_layout(template="plotly_white", height=300)

                # Create volume/flow distribution chart
                if primary_metric == 'NetFlow':
                    hist_data = cluster_df['NetFlow']
                    hist_title = "Net Flow Distribution"
                    hist_label = "Net Flow"
                else:
                    hist_data = cluster_df['Volume']
                    hist_title = "Volume Distribution"
                    hist_label = "Volume"

                dist_fig = px.histogram(hist_data, title=hist_title,
                                        labels={'value': hist_label, 'count': 'Frequency'},
                                        color_discrete_sequence=['#2ca02c'])
                dist_fig.update_layout(template="plotly_white", height=300)

                # Find related transactions
                if not transactions_df.empty:
                    # Get timestamps from this cluster
                    cluster_times = pd.to_datetime(cluster_df['hour'])

                    # Create time windows for matching
                    time_windows = [(t - pd.Timedelta(hours=1), t + pd.Timedelta(hours=1))
                                    for t in cluster_times]

                    # Find transactions within these time windows
                    pattern_txs = transactions_df[transactions_df[timestamp_col].apply(
                        lambda x: any((start <= x <= end) for start, end in time_windows)
                    )].copy()

                    # If we have too many, sample them
                    if len(pattern_txs) > 10:
                        pattern_txs = pattern_txs.sample(10)

                    # If we have too few, just sample from all transactions
                    if len(pattern_txs) < 5 and len(transactions_df) >= 5:
                        pattern_txs = transactions_df.sample(min(5, len(transactions_df)))
                else:
                    pattern_txs = pd.DataFrame()

                # Comprehensive pattern dictionary
                pattern = {
                    "name": behavior,
                    "description": f"This pattern shows {behavior.lower()} activity.",
                    "strategy": "Unknown",
                    "risk_profile": "Unknown",
                    "time_insight": "Unknown",
                    "cluster_id": i,
                    "metrics": pattern_metrics,
                    "occurrence_count": len(cluster_df),
                    "volume_metric": volume_metric,
                    "confidence": confidence,
                    "impact": 0.0,
                    "charts": {
                        "main": main_fig,
                        "hourly_distribution": hour_fig,
                        "value_distribution": dist_fig
                    },
                    "examples": pattern_txs
                }
                patterns.append(pattern)

            # Cache results for future reuse
            if cache_key:
                self._pattern_cache[cache_key] = patterns

            return patterns
        except Exception as e:
            logging.warning(f"Error during pattern identification: {str(e)}")
            return []
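    # Example usage (illustrative sketch, continuing the example above):
    #
    #   patterns = processor.identify_patterns(tx_df, n_clusters=3)
    #   for p in patterns:
    #       print(p["name"], f"confidence={p['confidence']:.2f}", p["volume_metric"])
    #       p["charts"]["main"].show()   # Plotly figures, e.g. for a dashboard
    #
    # Repeated calls with the same columns, row count and n_clusters are served
    # from `_pattern_cache`, so the clustering step is only paid once per input.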
    def detect_anomalous_transactions(self, transactions_df: pd.DataFrame,
                                      sensitivity: str = "Medium") -> pd.DataFrame:
        """
        Detect anomalous transactions using statistical methods.

        Args:
            transactions_df: DataFrame of transactions
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            DataFrame of anomalous transactions
        """
        if transactions_df.empty:
            return pd.DataFrame()

        # Ensure amount column exists
        if 'Amount' in transactions_df.columns:
            amount_col = 'Amount'
        elif 'tokenAmount' in transactions_df.columns:
            amount_col = 'tokenAmount'
        elif 'value' in transactions_df.columns:
            # Try to adjust for decimals if 'tokenDecimal' exists
            if 'tokenDecimal' in transactions_df.columns:
                transactions_df['adjustedValue'] = (
                    transactions_df['value'].astype(float)
                    / (10 ** transactions_df['tokenDecimal'].astype(int))
                )
                amount_col = 'adjustedValue'
            else:
                amount_col = 'value'
        else:
            raise ValueError("Amount column not found in transactions DataFrame")

        # Define sensitivity thresholds
        if sensitivity == "Low":
            z_threshold = 3.0   # Outliers beyond 3 standard deviations
        elif sensitivity == "Medium":
            z_threshold = 2.5   # Outliers beyond 2.5 standard deviations
        else:  # High
            z_threshold = 2.0   # Outliers beyond 2 standard deviations

        # Calculate z-score for amount
        mean_amount = transactions_df[amount_col].mean()
        std_amount = transactions_df[amount_col].std()

        if std_amount == 0:
            return pd.DataFrame()

        transactions_df['z_score'] = abs((transactions_df[amount_col] - mean_amount) / std_amount)

        # Flag anomalous transactions
        anomalies = transactions_df[transactions_df['z_score'] > z_threshold].copy()

        # Add risk level based on z-score
        anomalies['risk_level'] = 'Medium'
        anomalies.loc[anomalies['z_score'] > z_threshold * 1.5, 'risk_level'] = 'High'
        anomalies.loc[anomalies['z_score'] <= z_threshold * 1.2, 'risk_level'] = 'Low'

        return anomalies
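    # Note on the z-score rule above: with the sample standard deviation (ddof=1),
    # the largest attainable |z| in a sample of n points is (n - 1) / sqrt(n), so
    # very small transaction sets can never exceed even the "High" threshold of 2.0
    # (n = 5 caps |z| at about 1.79). A quick, self-contained sanity check:
    #
    #   s = pd.Series([10, 12, 11, 9, 500])
    #   z = ((s - s.mean()) / s.std()).abs()
    #   print(z.max())   # ~1.79, below every threshold used here
    #
    # The detector is therefore most meaningful on windows with at least a few
    # dozen transactions.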
    def analyze_price_impact(self, transactions_df: pd.DataFrame,
                             price_data: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """
        Analyze the price impact of transactions with enhanced visualizations.

        Args:
            transactions_df: DataFrame of transactions
            price_data: Dictionary of price impact data for each transaction

        Returns:
            Dictionary with comprehensive price impact analysis and visualizations
        """
        if transactions_df.empty or not price_data:
            # Create an empty chart for the default case
            empty_fig = go.Figure()
            empty_fig.update_layout(
                title="No Price Impact Data Available",
                xaxis_title="Time",
                yaxis_title="Price Impact (%)",
                height=400,
                template="plotly_white"
            )
            empty_fig.add_annotation(
                text="No transactions found with price impact data",
                showarrow=False,
                font=dict(size=14)
            )

            return {
                'avg_impact_pct': 0,
                'max_impact_pct': 0,
                'min_impact_pct': 0,
                'significant_moves_count': 0,
                'total_transactions': 0,
                'charts': {
                    'main_chart': empty_fig,
                    'impact_distribution': empty_fig,
                    'cumulative_impact': empty_fig,
                    'hourly_impact': empty_fig
                },
                'transactions_with_impact': pd.DataFrame(),
                'insights': [],
                'impact_summary': "No price impact data available"
            }

        # Ensure timestamp column is datetime
        if 'Timestamp' in transactions_df.columns:
            timestamp_col = 'Timestamp'
        elif 'timeStamp' in transactions_df.columns:
            timestamp_col = 'timeStamp'
            # Convert timestamp to datetime if it's not already
            if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
                transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s')
        else:
            raise ValueError("Timestamp column not found in transactions DataFrame")

        # Combine price impact data with transactions
        impact_data = []
        for idx, row in transactions_df.iterrows():
            tx_hash = row.get('Transaction Hash', row.get('hash', None))

            if not tx_hash or tx_hash not in price_data:
                continue

            tx_impact = price_data[tx_hash]
            if tx_impact['impact_pct'] is None:
                continue

            # Get token symbol if available
            token_symbol = row.get('tokenSymbol', 'Unknown')
            token_amount = row.get('value', 0)
            if 'tokenDecimal' in row:
                try:
                    token_amount = float(token_amount) / (10 ** int(row.get('tokenDecimal', 0)))
                except (ValueError, TypeError):
                    token_amount = 0

            impact_data.append({
                'transaction_hash': tx_hash,
                'timestamp': row[timestamp_col],
                'pre_price': tx_impact['pre_price'],
                'post_price': tx_impact['post_price'],
                'impact_pct': tx_impact['impact_pct'],
                'token_symbol': token_symbol,
                'token_amount': token_amount,
                'from': row.get('from', ''),
                'to': row.get('to', ''),
                'hour': row[timestamp_col].hour if isinstance(row[timestamp_col], pd.Timestamp) else 0
            })
        if not impact_data:
            # Create an empty chart for the default case
            empty_fig = go.Figure()
            empty_fig.update_layout(
                title="No Price Impact Data Available",
                xaxis_title="Time",
                yaxis_title="Price Impact (%)",
                height=400,
                template="plotly_white"
            )
            empty_fig.add_annotation(
                text="No transactions found with price impact data",
                showarrow=False,
                font=dict(size=14)
            )

            return {
                'avg_impact_pct': 0,
                'max_impact_pct': 0,
                'min_impact_pct': 0,
                'significant_moves_count': 0,
                'total_transactions': len(transactions_df) if not transactions_df.empty else 0,
                'charts': {
                    'main_chart': empty_fig,
                    'impact_distribution': empty_fig,
                    'cumulative_impact': empty_fig,
                    'hourly_impact': empty_fig
                },
                'transactions_with_impact': pd.DataFrame(),
                'insights': [],
                'impact_summary': "No price impact data available"
            }

        impact_df = pd.DataFrame(impact_data)

        # Calculate aggregate metrics
        avg_impact = impact_df['impact_pct'].mean()
        max_impact = impact_df['impact_pct'].max()
        min_impact = impact_df['impact_pct'].min()
        median_impact = impact_df['impact_pct'].median()
        std_impact = impact_df['impact_pct'].std()

        # Count significant moves (>1% impact)
        significant_threshold = 1.0
        high_impact_threshold = 3.0
        significant_moves = len(impact_df[abs(impact_df['impact_pct']) > significant_threshold])
        high_impact_moves = len(impact_df[abs(impact_df['impact_pct']) > high_impact_threshold])
        positive_impacts = len(impact_df[impact_df['impact_pct'] > 0])
        negative_impacts = len(impact_df[impact_df['impact_pct'] < 0])

        # Calculate cumulative impact
        impact_df = impact_df.sort_values('timestamp')
        impact_df['cumulative_impact'] = impact_df['impact_pct'].cumsum()

        # Generate insights
        insights = []

        # Market direction bias
        if avg_impact > 0.5:
            insights.append({
                "title": "Positive Price Pressure",
                "description": f"Transactions show an overall positive price impact of {avg_impact:.2f}%, "
                               f"suggesting accumulation or market strength."
            })
        elif avg_impact < -0.5:
            insights.append({
                "title": "Negative Price Pressure",
                "description": f"Transactions show an overall negative price impact of {avg_impact:.2f}%, "
                               f"suggesting distribution or market weakness."
            })

        # Volatility analysis
        if std_impact > 2.0:
            insights.append({
                "title": "High Market Volatility",
                "description": f"Price impact shows high volatility (std: {std_impact:.2f}%), indicating "
                               f"potential market manipulation or whipsaw conditions."
            })

        # Significant impacts
        if high_impact_moves > 0:
            insights.append({
                "title": "High Impact Transactions",
                "description": f"Detected {high_impact_moves} high-impact transactions "
                               f"(>{high_impact_threshold}% price change), indicating potential market-moving activity."
            })

        # Temporal patterns
        hourly_impact = impact_df.groupby('hour')['impact_pct'].mean()
        if len(hourly_impact) > 0:
            max_hour = hourly_impact.abs().idxmax()
            max_hour_impact = hourly_impact[max_hour]
            insights.append({
                "title": "Time-Based Pattern",
                "description": f"Highest price impact occurs around {max_hour}:00 with an average of {max_hour_impact:.2f}%."
            })

        # Create impact summary text
        impact_summary = (
            f"Analysis of {len(impact_df)} price-impacting transactions shows an average impact of {avg_impact:.2f}% "
            f"(range: {min_impact:.2f}% to {max_impact:.2f}%). "
            f"Found {significant_moves} significant price moves and {high_impact_moves} high-impact transactions. "
        )
        if positive_impacts > negative_impacts:
            impact_summary += (f"There is a bias towards positive price impact "
                               f"({positive_impacts} positive vs {negative_impacts} negative).")
        elif negative_impacts > positive_impacts:
            impact_summary += (f"There is a bias towards negative price impact "
                               f"({negative_impacts} negative vs {positive_impacts} positive).")
        else:
            impact_summary += "The price impact is balanced between positive and negative moves."
        # Create enhanced main visualization
        main_fig = go.Figure()

        # Add scatter plot for impact
        main_fig.add_trace(go.Scatter(
            x=impact_df['timestamp'],
            y=impact_df['impact_pct'],
            mode='markers+lines',
            marker=dict(
                size=impact_df['impact_pct'].abs() * 1.5 + 5,
                color=impact_df['impact_pct'],
                colorscale='RdBu_r',
                line=dict(width=1),
                symbol=['circle' if val >= 0 else 'diamond' for val in impact_df['impact_pct']]
            ),
            text=[
                f"TX: {tx[:8]}...{tx[-6:]}<br>" +
                f"Impact: {impact:.2f}%<br>" +
                f"Token: {token} ({amount:.4f})<br>" +
                f"From: {src[:6]}...{src[-4:]}<br>" +
                f"To: {dst[:6]}...{dst[-4:]}"
                for tx, impact, token, amount, src, dst in zip(
                    impact_df['transaction_hash'],
                    impact_df['impact_pct'],
                    impact_df['token_symbol'],
                    impact_df['token_amount'],
                    impact_df['from'],
                    impact_df['to']
                )
            ],
            hovertemplate='%{text}<br>Time: %{x}',
            name='Price Impact'
        ))
        # Add a moving average trendline
        window_size = max(3, len(impact_df) // 10)  # Dynamic window size
        if len(impact_df) >= window_size:
            impact_df['ma'] = impact_df['impact_pct'].rolling(window=window_size, min_periods=1).mean()
            main_fig.add_trace(go.Scatter(
                x=impact_df['timestamp'],
                y=impact_df['ma'],
                mode='lines',
                line=dict(width=2, color='rgba(255,165,0,0.7)'),
                name=f'Moving Avg ({window_size} period)'
            ))

        # Add a zero line for reference
        main_fig.add_shape(
            type='line',
            x0=impact_df['timestamp'].min(), y0=0,
            x1=impact_df['timestamp'].max(), y1=0,
            line=dict(color='gray', width=1, dash='dash')
        )

        # Add colored regions for significant impact
        # Green band for normal price movement
        main_fig.add_shape(
            type='rect',
            x0=impact_df['timestamp'].min(), y0=-significant_threshold,
            x1=impact_df['timestamp'].max(), y1=significant_threshold,
            fillcolor='rgba(0,255,0,0.1)',
            line=dict(width=0),
            layer='below'
        )

        # Warning bands for higher impact movements
        main_fig.add_shape(
            type='rect',
            x0=impact_df['timestamp'].min(), y0=significant_threshold,
            x1=impact_df['timestamp'].max(), y1=high_impact_threshold,
            fillcolor='rgba(255,255,0,0.1)',
            line=dict(width=0),
            layer='below'
        )
        main_fig.add_shape(
            type='rect',
            x0=impact_df['timestamp'].min(), y0=-high_impact_threshold,
            x1=impact_df['timestamp'].max(), y1=-significant_threshold,
            fillcolor='rgba(255,255,0,0.1)',
            line=dict(width=0),
            layer='below'
        )

        # High impact regions
        main_fig.add_shape(
            type='rect',
            x0=impact_df['timestamp'].min(), y0=high_impact_threshold,
            x1=impact_df['timestamp'].max(), y1=max(high_impact_threshold * 2, max_impact * 1.1),
            fillcolor='rgba(255,0,0,0.1)',
            line=dict(width=0),
            layer='below'
        )
        main_fig.add_shape(
            type='rect',
            x0=impact_df['timestamp'].min(), y0=min(high_impact_threshold * -2, min_impact * 1.1),
            x1=impact_df['timestamp'].max(), y1=-high_impact_threshold,
            fillcolor='rgba(255,0,0,0.1)',
            line=dict(width=0),
            layer='below'
        )

        main_fig.update_layout(
            title='Price Impact of Whale Transactions',
            xaxis_title='Timestamp',
            yaxis_title='Price Impact (%)',
            hovermode='closest',
            template="plotly_white",
            legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
            margin=dict(l=20, r=20, t=50, b=20)
        )

        # Create impact distribution histogram
        dist_fig = px.histogram(
            impact_df['impact_pct'],
            nbins=20,
            labels={'value': 'Price Impact (%)', 'count': 'Frequency'},
            title='Distribution of Price Impact',
            color_discrete_sequence=['#3366CC']
        )

        # Add a vertical line at the mean
        dist_fig.add_vline(x=avg_impact, line_dash="dash", line_color="red")
        dist_fig.add_annotation(x=avg_impact, y=0.85, yref="paper",
                                text=f"Mean: {avg_impact:.2f}%",
                                showarrow=True, arrowhead=2, arrowcolor="red", ax=40)

        # Add a vertical line at zero
        dist_fig.add_vline(x=0, line_dash="solid", line_color="black")

        dist_fig.update_layout(
            template="plotly_white",
            bargap=0.1,
            height=350
        )

        # Create cumulative impact chart
        cumul_fig = go.Figure()
        cumul_fig.add_trace(go.Scatter(
            x=impact_df['timestamp'],
            y=impact_df['cumulative_impact'],
            mode='lines',
            fill='tozeroy',
            line=dict(width=2, color='#2ca02c'),
            name='Cumulative Impact'
        ))
        cumul_fig.update_layout(
            title='Cumulative Price Impact Over Time',
            xaxis_title='Timestamp',
            yaxis_title='Cumulative Price Impact (%)',
            template="plotly_white",
            height=350
        )

        # Create hourly impact analysis
        hourly_impact = impact_df.groupby('hour')['impact_pct'].agg(['mean', 'count', 'std']).reset_index()
        hourly_impact = hourly_impact.sort_values('hour')
        hour_fig = go.Figure()
        hour_fig.add_trace(go.Bar(
            x=hourly_impact['hour'],
            y=hourly_impact['mean'],
            error_y=dict(type='data', array=hourly_impact['std'], visible=True),
            marker_color=hourly_impact['mean'].apply(lambda x: 'green' if x > 0 else 'red'),
            name='Average Impact'
        ))
        hour_fig.update_layout(
            title='Price Impact by Hour of Day',
            xaxis_title='Hour of Day',
            yaxis_title='Average Price Impact (%)',
            template="plotly_white",
            height=350,
            xaxis=dict(tickmode='linear', tick0=0, dtick=2)
        )

        # Join with original transactions
        transactions_df = transactions_df.copy()
        transactions_df['Timestamp_key'] = transactions_df[timestamp_col]
        impact_df['Timestamp_key'] = impact_df['timestamp']

        merged_df = pd.merge(
            transactions_df,
            impact_df[['Timestamp_key', 'impact_pct', 'pre_price', 'post_price', 'cumulative_impact']],
            on='Timestamp_key',
            how='left'
        )

        # Final result with enhanced output
        return {
            'avg_impact_pct': avg_impact,
            'max_impact_pct': max_impact,
            'min_impact_pct': min_impact,
            'median_impact_pct': median_impact,
            'std_impact_pct': std_impact,
            'significant_moves_count': significant_moves,
            'high_impact_moves_count': high_impact_moves,
            'positive_impacts_count': positive_impacts,
            'negative_impacts_count': negative_impacts,
            'total_transactions': len(transactions_df),
            'charts': {
                'main_chart': main_fig,
                'impact_distribution': dist_fig,
                'cumulative_impact': cumul_fig,
                'hourly_impact': hour_fig
            },
            'transactions_with_impact': merged_df,
            'insights': insights,
            'impact_summary': impact_summary
        }
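    # Example usage (illustrative sketch; `price_data` is expected to map each
    # transaction hash to a dict with 'pre_price', 'post_price' and 'impact_pct',
    # however those values were obtained upstream):
    #
    #   impact = processor.analyze_price_impact(tx_df, price_data)
    #   print(impact['impact_summary'])
    #   impact['charts']['main_chart'].show()
    #   flagged = impact['transactions_with_impact']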
    def detect_wash_trading(self, transactions_df: pd.DataFrame, addresses: List[str],
                            time_window_minutes: int = 60,
                            sensitivity: str = "Medium") -> List[Dict[str, Any]]:
        """
        Detect potential wash trading between addresses.

        Args:
            transactions_df: DataFrame of transactions
            addresses: List of addresses to analyze
            time_window_minutes: Time window for detecting wash trades
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of potential wash trading incidents
        """
        if transactions_df.empty or not addresses:
            return []

        # Ensure from/to columns exist
        if 'From' in transactions_df.columns and 'To' in transactions_df.columns:
            from_col, to_col = 'From', 'To'
        elif 'from' in transactions_df.columns and 'to' in transactions_df.columns:
            from_col, to_col = 'from', 'to'
        else:
            raise ValueError("From/To columns not found in transactions DataFrame")

        # Ensure timestamp column exists
        if 'Timestamp' in transactions_df.columns:
            timestamp_col = 'Timestamp'
        elif 'timeStamp' in transactions_df.columns:
            timestamp_col = 'timeStamp'
        else:
            raise ValueError("Timestamp column not found in transactions DataFrame")

        # Ensure timestamp is datetime
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col])

        # Define sensitivity thresholds
        if sensitivity == "Low":
            min_cycles = 3       # Minimum number of back-and-forth transactions
            max_time_diff = 120  # Maximum minutes between transactions
        elif sensitivity == "Medium":
            min_cycles = 2
            max_time_diff = 60
        else:  # High
            min_cycles = 1
            max_time_diff = 30

        # Filter transactions involving the addresses
        address_txs = transactions_df[
            (transactions_df[from_col].isin(addresses)) |
            (transactions_df[to_col].isin(addresses))
        ].copy()

        if address_txs.empty:
            return []

        # Sort by timestamp
        address_txs = address_txs.sort_values(by=timestamp_col)

        # Detect cycles of transactions between the same addresses
        wash_trades = []

        for addr1 in addresses:
            for addr2 in addresses:
                if addr1 == addr2:
                    continue

                # Find transactions from addr1 to addr2
                a1_to_a2 = address_txs[
                    (address_txs[from_col] == addr1) & (address_txs[to_col] == addr2)
                ]

                # Find transactions from addr2 to addr1
                a2_to_a1 = address_txs[
                    (address_txs[from_col] == addr2) & (address_txs[to_col] == addr1)
                ]

                if a1_to_a2.empty or a2_to_a1.empty:
                    continue

                # Check for back-and-forth patterns
                cycles = 0
                evidence = []

                for _, tx1 in a1_to_a2.iterrows():
                    tx1_time = tx1[timestamp_col]

                    # Find return transactions within the time window
                    return_txs = a2_to_a1[
                        (a2_to_a1[timestamp_col] > tx1_time) &
                        (a2_to_a1[timestamp_col] <= tx1_time + pd.Timedelta(minutes=max_time_diff))
                    ]

                    if not return_txs.empty:
                        cycles += 1
                        evidence.append(tx1)
                        evidence.append(return_txs.iloc[0])

                if cycles >= min_cycles:
                    # Create visualization
                    if evidence:
                        evidence_df = pd.DataFrame(evidence)
                        fig = px.scatter(
                            evidence_df,
                            x=timestamp_col,
                            y=evidence_df.get('Amount', evidence_df.get('tokenAmount', evidence_df.get('value', 0))),
                            color=from_col,
                            title=f"Potential Wash Trading Between {addr1[:8]}... and {addr2[:8]}..."
                        )
                    else:
                        fig = None

                    wash_trades.append({
                        "type": "Wash Trading",
                        "addresses": [addr1, addr2],
                        "risk_level": "High" if cycles >= min_cycles * 2 else "Medium",
                        "description": f"Detected {cycles} cycles of back-and-forth transactions between addresses",
                        "detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "title": f"Wash Trading Pattern ({cycles} cycles)",
                        "evidence": pd.DataFrame(evidence) if evidence else None,
                        "chart": fig
                    })

        return wash_trades
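    # Example usage (illustrative sketch; the address list would normally be the
    # wallets being monitored):
    #
    #   incidents = processor.detect_wash_trading(tx_df, addresses=watched_wallets,
    #                                             sensitivity="Medium")
    #   for incident in incidents:
    #       print(incident['title'], incident['risk_level'])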
    def detect_pump_and_dump(self, transactions_df: pd.DataFrame,
                             price_data: Dict[str, Dict[str, Any]],
                             sensitivity: str = "Medium") -> List[Dict[str, Any]]:
        """
        Detect potential pump and dump schemes.

        Args:
            transactions_df: DataFrame of transactions
            price_data: Dictionary of price impact data for each transaction
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of potential pump and dump incidents
        """
        if transactions_df.empty or not price_data:
            return []

        # Ensure timestamp column exists
        if 'Timestamp' in transactions_df.columns:
            timestamp_col = 'Timestamp'
        elif 'timeStamp' in transactions_df.columns:
            timestamp_col = 'timeStamp'
        else:
            raise ValueError("Timestamp column not found in transactions DataFrame")

        # Ensure from/to columns exist
        if 'From' in transactions_df.columns and 'To' in transactions_df.columns:
            from_col, to_col = 'From', 'To'
        elif 'from' in transactions_df.columns and 'to' in transactions_df.columns:
            from_col, to_col = 'from', 'to'
        else:
            raise ValueError("From/To columns not found in transactions DataFrame")

        # Ensure timestamp is datetime
        if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
            transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col])

        # Define sensitivity thresholds
        if sensitivity == "Low":
            accumulation_threshold = 5   # Number of buys to consider accumulation
            pump_threshold = 10.0        # % price increase to trigger pump
            dump_threshold = -8.0        # % price decrease to trigger dump
        elif sensitivity == "Medium":
            accumulation_threshold = 3
            pump_threshold = 7.0
            dump_threshold = -5.0
        else:  # High
            accumulation_threshold = 2
            pump_threshold = 5.0
            dump_threshold = -3.0

        # Combine price impact data with transactions
        txs_with_impact = []
        for idx, row in transactions_df.iterrows():
            tx_hash = row.get('Transaction Hash', row.get('hash', None))

            if not tx_hash or tx_hash not in price_data:
                continue

            tx_impact = price_data[tx_hash]
            if tx_impact['impact_pct'] is None:
                continue

            txs_with_impact.append({
                'transaction_hash': tx_hash,
                'timestamp': row[timestamp_col],
                'from': row[from_col],
                'to': row[to_col],
                'pre_price': tx_impact['pre_price'],
                'post_price': tx_impact['post_price'],
                'impact_pct': tx_impact['impact_pct']
            })

        if not txs_with_impact:
            return []

        impact_df = pd.DataFrame(txs_with_impact)
        impact_df = impact_df.sort_values(by='timestamp')

        # Look for accumulation phases followed by price pumps and then dumps
        pump_and_dumps = []

        # Group by address to analyze per wallet
        address_groups = {}
        for from_addr in impact_df['from'].unique():
            address_groups[from_addr] = impact_df[impact_df['from'] == from_addr]

        for to_addr in impact_df['to'].unique():
            if to_addr in address_groups:
                address_groups[to_addr] = pd.concat([
                    address_groups[to_addr],
                    impact_df[impact_df['to'] == to_addr]
                ])
            else:
                address_groups[to_addr] = impact_df[impact_df['to'] == to_addr]

        for address, addr_df in address_groups.items():
            # Skip if not enough transactions
            if len(addr_df) < accumulation_threshold + 2:
                continue

            # Look for a continuous price increase followed by a sharp drop
            window_size = min(len(addr_df), 10)
            for i in range(len(addr_df) - window_size + 1):
                window = addr_df.iloc[i:i + window_size]

                # Get cumulative price change in window
                if len(window) >= 2:
                    first_price = window.iloc[0]['pre_price']
                    last_price = window.iloc[-1]['post_price']

                    if first_price is None or last_price is None:
                        continue

                    cumulative_change = ((last_price - first_price) / first_price) * 100

                    # Check for pump phase
                    max_price = window['post_price'].max()
                    max_idx = window['post_price'].idxmax()
                    max_pos = window.index.get_loc(max_idx)  # positional index of the price peak

                    if max_pos < len(window) - 1:
                        max_to_end = ((window.iloc[-1]['post_price'] - max_price) / max_price) * 100

                        # If we have a pump followed by a dump
                        if (cumulative_change > pump_threshold or any(window['impact_pct'] > pump_threshold)) \
                                and max_to_end < dump_threshold:
                            # Create chart
                            fig = go.Figure()

                            # Plot price line
                            times = [t.timestamp() for t in window['timestamp']]
                            prices = []
                            for _, w_row in window.iterrows():
                                prices.append(w_row['pre_price'])
                                prices.append(w_row['post_price'])

                            times_expanded = []
                            for t in times:
                                times_expanded.append(t - 60)  # 1 min before
                                times_expanded.append(t + 60)  # 1 min after

                            fig.add_trace(go.Scatter(
                                x=times_expanded,
                                y=prices,
                                mode='lines+markers',
                                name='Price',
                                line=dict(color='blue')
                            ))

                            # Highlight pump and dump phases
                            pump_x = times_expanded[:max_pos * 2 + 2]
                            pump_y = prices[:max_pos * 2 + 2]
                            dump_x = times_expanded[max_pos * 2:]
                            dump_y = prices[max_pos * 2:]

                            fig.add_trace(go.Scatter(
                                x=pump_x, y=pump_y,
                                mode='lines',
                                line=dict(color='green', width=3),
                                name='Pump Phase'
                            ))
                            fig.add_trace(go.Scatter(
                                x=dump_x, y=dump_y,
                                mode='lines',
                                line=dict(color='red', width=3),
                                name='Dump Phase'
                            ))

                            fig.update_layout(
                                title='Potential Pump and Dump Pattern',
                                xaxis_title='Time',
                                yaxis_title='Price',
                                hovermode='closest'
                            )

                            pump_and_dumps.append({
                                "type": "Pump and Dump",
                                "addresses": [address],
                                "risk_level": "High" if max_to_end < dump_threshold * 1.5 else "Medium",
                                "description": f"Price pumped {cumulative_change:.2f}% before dropping {max_to_end:.2f}%",
                                "detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                                "title": f"Pump ({cumulative_change:.1f}%) and Dump ({max_to_end:.1f}%)",
                                "evidence": window,
                                "chart": fig
                            })

        return pump_and_dumps
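

# Illustrative smoke test with synthetic data (a minimal sketch, not part of the
# production flow; real callers would pass transactions fetched from a blockchain
# explorer API and price data derived from a market-data source). Column names
# mirror the lowercase Etherscan-style schema the methods above fall back to.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    n = 200
    wallets = [f"0x{i:040x}" for i in range(1, 6)]

    demo_df = pd.DataFrame({
        "hash": [f"0x{h:064x}" for h in range(n)],
        "from": rng.choice(wallets, size=n),
        "to": rng.choice(wallets, size=n),
        "value": rng.lognormal(mean=16, sigma=1.5, size=n).astype("int64").astype(str),
        "tokenDecimal": ["6"] * n,
        "timeStamp": pd.date_range("2025-01-01", periods=n, freq="37min"),
    })

    # Synthetic price-impact data keyed by transaction hash, in the shape the
    # impact methods expect ('pre_price', 'post_price', 'impact_pct').
    price_data = {}
    price = 1.0
    for tx_hash in demo_df["hash"]:
        move = float(rng.normal(0, 0.8))
        price_data[tx_hash] = {
            "pre_price": price,
            "post_price": price * (1 + move / 100),
            "impact_pct": move,
        }
        price *= (1 + move / 100)

    processor = DataProcessor()

    daily = processor.aggregate_transactions(demo_df.copy(), time_window="D")
    print("Aggregated wallet/day rows:", len(daily))

    anomalies = processor.detect_anomalous_transactions(demo_df.copy(), sensitivity="High")
    print("Anomalous transactions:", len(anomalies))

    patterns = processor.identify_patterns(demo_df.copy(), n_clusters=3)
    print("Patterns:", [(p["name"], round(p["confidence"], 2)) for p in patterns])

    impact = processor.analyze_price_impact(demo_df.copy(), price_data)
    print(impact["impact_summary"])

    wash = processor.detect_wash_trading(demo_df.copy(), addresses=wallets, sensitivity="Medium")
    print("Wash-trading incidents:", len(wash))

    pumps = processor.detect_pump_and_dump(demo_df.copy(), price_data, sensitivity="High")
    print("Pump-and-dump candidates:", len(pumps))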