import pandas as pd
from datetime import datetime
from typing import Dict, List, Any
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import plotly.graph_objects as go
import plotly.express as px
import logging
class DataProcessor: | |
""" | |
Process and analyze transaction data from blockchain APIs | |
""" | |
def __init__(self): | |
pass | |
def aggregate_transactions(self, | |
transactions_df: pd.DataFrame, | |
time_window: str = 'D') -> pd.DataFrame: | |
""" | |
Aggregate transactions by time window | |
Args: | |
transactions_df: DataFrame of transactions | |
time_window: Time window for aggregation (e.g., 'D' for day, 'H' for hour) | |
Returns: | |
Aggregated DataFrame with transaction counts and volumes | |
""" | |
if transactions_df.empty: | |
return pd.DataFrame() | |
# Ensure timestamp column is datetime | |
if 'Timestamp' in transactions_df.columns: | |
timestamp_col = 'Timestamp' | |
elif 'timeStamp' in transactions_df.columns: | |
timestamp_col = 'timeStamp' | |
else: | |
raise ValueError("Timestamp column not found in transactions DataFrame") | |
# Ensure amount column exists | |
if 'Amount' in transactions_df.columns: | |
amount_col = 'Amount' | |
elif 'tokenAmount' in transactions_df.columns: | |
amount_col = 'tokenAmount' | |
elif 'value' in transactions_df.columns: | |
# Try to adjust for decimals if 'tokenDecimal' exists | |
if 'tokenDecimal' in transactions_df.columns: | |
transactions_df['adjustedValue'] = transactions_df['value'].astype(float) / (10 ** transactions_df['tokenDecimal'].astype(int)) | |
amount_col = 'adjustedValue' | |
else: | |
amount_col = 'value' | |
else: | |
raise ValueError("Amount column not found in transactions DataFrame") | |
# Resample by time window | |
transactions_df = transactions_df.copy() | |
try: | |
transactions_df.set_index(pd.DatetimeIndex(transactions_df[timestamp_col]), inplace=True) | |
except Exception as e:
logging.warning(f"Error setting DatetimeIndex: {e}")
# Create a safe index as a fallback | |
transactions_df['safe_timestamp'] = pd.date_range( | |
start='2025-01-01', | |
periods=len(transactions_df), | |
freq='H' | |
) | |
transactions_df.set_index('safe_timestamp', inplace=True) | |
# Identify buy vs sell transactions based on 'from' and 'to' addresses | |
if 'From' in transactions_df.columns and 'To' in transactions_df.columns: | |
from_col, to_col = 'From', 'To' | |
elif 'from' in transactions_df.columns and 'to' in transactions_df.columns: | |
from_col, to_col = 'from', 'to' | |
else: | |
# If we can't determine direction, just aggregate total volume | |
agg_df = transactions_df.resample(time_window).agg({ | |
amount_col: 'sum', | |
timestamp_col: 'count' | |
}) | |
agg_df.columns = ['Volume', 'Count'] | |
return agg_df.reset_index() | |
# Calculate net flow for each wallet address (positive = inflow, negative = outflow) | |
wallet_addresses = set(transactions_df[from_col].unique()) | set(transactions_df[to_col].unique()) | |
results = [] | |
for wallet in wallet_addresses: | |
wallet_df = transactions_df.copy() | |
# Mark transactions as inflow or outflow | |
wallet_df['Direction'] = 'Unknown' | |
wallet_df.loc[wallet_df[to_col] == wallet, 'Direction'] = 'In' | |
wallet_df.loc[wallet_df[from_col] == wallet, 'Direction'] = 'Out' | |
# Calculate net flow | |
wallet_df['NetFlow'] = wallet_df[amount_col] | |
wallet_df.loc[wallet_df['Direction'] == 'Out', 'NetFlow'] = -wallet_df.loc[wallet_df['Direction'] == 'Out', amount_col] | |
# Aggregate by time window | |
wallet_agg = wallet_df.resample(time_window).agg({ | |
'NetFlow': 'sum', | |
timestamp_col: 'count' | |
}) | |
wallet_agg.columns = ['NetFlow', 'Count'] | |
wallet_agg['Wallet'] = wallet | |
results.append(wallet_agg.reset_index()) | |
if not results: | |
return pd.DataFrame() | |
combined_df = pd.concat(results, ignore_index=True) | |
return combined_df | |
# Cache for pattern identification to avoid repeating expensive calculations | |
_pattern_cache = {} | |
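# Note: as a class-level attribute this cache is shared by every DataProcessor
# instance and is never evicted, so long-running processes may want to call
# DataProcessor._pattern_cache.clear() periodically.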
def identify_patterns(self, | |
transactions_df: pd.DataFrame, | |
n_clusters: int = 3) -> List[Dict[str, Any]]: | |
""" | |
Identify trading patterns using clustering algorithms | |
Args: | |
transactions_df: DataFrame of transactions | |
n_clusters: Number of clusters to identify | |
Returns: | |
List of pattern dictionaries containing name, description, and confidence | |
""" | |
# Check for empty data early to avoid processing | |
if transactions_df.empty: | |
return [] | |
# Create a cache key based on DataFrame hash and number of clusters | |
try: | |
cache_key = f"{hash(tuple(transactions_df.columns))}_{len(transactions_df)}_{n_clusters}" | |
# Check cache first | |
if cache_key in self._pattern_cache: | |
return self._pattern_cache[cache_key] | |
except Exception: | |
# If hashing fails, proceed without caching | |
cache_key = None | |
try: | |
# Work on a shallow copy so the helper columns added below do not mutate the caller's DataFrame
df = transactions_df.copy()
# Ensure timestamp column exists - optimize column presence checks | |
timestamp_cols = ['Timestamp', 'timeStamp'] | |
timestamp_col = next((col for col in timestamp_cols if col in df.columns), None) | |
if timestamp_col: | |
# Convert timestamp only if needed | |
if not pd.api.types.is_datetime64_any_dtype(df[timestamp_col]): | |
try: | |
# Use vectorized operations instead of astype where possible | |
if df[timestamp_col].dtype == 'object': | |
df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors='coerce') | |
else: | |
df[timestamp_col] = pd.to_datetime(df[timestamp_col], unit='s', errors='coerce') | |
except Exception as e: | |
# Create a date range index as fallback | |
df['dummy_timestamp'] = pd.date_range(start='2025-01-01', periods=len(df), freq='H') | |
timestamp_col = 'dummy_timestamp' | |
else: | |
# If no timestamp column, create a dummy index | |
df['dummy_timestamp'] = pd.date_range(start='2025-01-01', periods=len(df), freq='H') | |
timestamp_col = 'dummy_timestamp' | |
# Efficiently calculate floor hour using vectorized operations | |
df['hour'] = df[timestamp_col].dt.floor('H') | |
# Check for address columns efficiently | |
if 'From' in df.columns and 'To' in df.columns: | |
from_col, to_col = 'From', 'To' | |
elif 'from' in df.columns and 'to' in df.columns: | |
from_col, to_col = 'from', 'to' | |
else: | |
# Create dummy addresses only if necessary | |
df['from'] = [f'0x{i:040x}' for i in range(len(df))] | |
df['to'] = [f'0x{(i+1):040x}' for i in range(len(df))] | |
from_col, to_col = 'from', 'to' | |
# Efficiently determine amount column | |
amount_cols = ['Amount', 'tokenAmount', 'value', 'adjustedValue'] | |
amount_col = next((col for col in amount_cols if col in df.columns), None) | |
if not amount_col: | |
# Handle special case for token values with decimals | |
if 'value' in df.columns and 'tokenDecimal' in df.columns: | |
# Vectorized calculation for improved performance | |
try: | |
# Ensure values are numeric | |
df['value_numeric'] = pd.to_numeric(df['value'], errors='coerce') | |
df['tokenDecimal_numeric'] = pd.to_numeric(df['tokenDecimal'], errors='coerce').fillna(18) | |
df['adjustedValue'] = df['value_numeric'] / (10 ** df['tokenDecimal_numeric']) | |
amount_col = 'adjustedValue' | |
except Exception as e: | |
logging.warning(f"Error converting values: {e}") | |
df['dummy_amount'] = 1.0 | |
amount_col = 'dummy_amount' | |
else: | |
# Fallback to dummy values | |
df['dummy_amount'] = 1.0 | |
amount_col = 'dummy_amount' | |
# Ensure the amount column is numeric | |
try: | |
if amount_col in df.columns: | |
df[f"{amount_col}_numeric"] = pd.to_numeric(df[amount_col], errors='coerce').fillna(0) | |
amount_col = f"{amount_col}_numeric" | |
except Exception: | |
# If conversion fails, create a dummy numeric column | |
df['safe_amount'] = 1.0 | |
amount_col = 'safe_amount' | |
# Calculate metrics using optimized groupby operations | |
# Use a more efficient approach with built-in pandas aggregation | |
agg_df = df.groupby('hour').agg( | |
Count=pd.NamedAgg(column=from_col, aggfunc='count'), | |
).reset_index() | |
# For NetFlow calculation, we need an additional pass | |
# This uses a more efficient calculation method | |
def calc_netflow(group): | |
# Use optimized filtering and calculations for better performance | |
first_to = group[to_col].iloc[0] if len(group) > 0 else None | |
first_from = group[from_col].iloc[0] if len(group) > 0 else None | |
if first_to is not None and first_from is not None: | |
# Ensure values are converted to numeric before summing | |
try: | |
# Convert to numeric with pd.to_numeric, coerce errors to NaN | |
total_in = pd.to_numeric(group.loc[group[to_col] == first_to, amount_col], errors='coerce').sum() | |
total_out = pd.to_numeric(group.loc[group[from_col] == first_from, amount_col], errors='coerce').sum() | |
# Replace NaN with 0 to avoid propagation | |
if pd.isna(total_in): total_in = 0.0 | |
if pd.isna(total_out): total_out = 0.0 | |
return float(total_in) - float(total_out) | |
except Exception as e:
logging.debug(f"Error converting values to numeric: {e}")
return 0.0 | |
return 0.0 | |
# Calculate NetFlow using apply instead of loop | |
netflows = df.groupby('hour').apply(calc_netflow) | |
agg_df['NetFlow'] = netflows.values | |
# Early return if not enough data for clustering | |
if agg_df.empty or len(agg_df) < n_clusters: | |
return [] | |
# Ensure we don't have too many clusters for the dataset | |
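# Worked example: with 5 hourly buckets and n_clusters=3 this gives
# min(3, max(2, 5 // 2)) = min(3, 2) = 2 clusters.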
actual_n_clusters = min(n_clusters, max(2, len(agg_df) // 2)) | |
# Prepare features for clustering - with careful type handling | |
try: | |
if 'NetFlow' in agg_df.columns: | |
# Ensure NetFlow is numeric | |
agg_df['NetFlow'] = pd.to_numeric(agg_df['NetFlow'], errors='coerce').fillna(0) | |
features = agg_df[['NetFlow', 'Count']].copy() | |
primary_metric = 'NetFlow' | |
else: | |
# Calculate Volume if needed | |
if 'Volume' not in agg_df.columns and amount_col in df.columns: | |
# Calculate volume with numeric conversion | |
volume_by_hour = pd.to_numeric(df[amount_col], errors='coerce').fillna(0).groupby(df['hour']).sum() | |
agg_df['Volume'] = agg_df['hour'].map(volume_by_hour) | |
# Ensure Volume exists and is numeric | |
if 'Volume' not in agg_df.columns: | |
agg_df['Volume'] = 1.0 # Default value if calculation failed | |
else: | |
agg_df['Volume'] = pd.to_numeric(agg_df['Volume'], errors='coerce').fillna(1.0) | |
# Ensure Count is numeric | |
agg_df['Count'] = pd.to_numeric(agg_df['Count'], errors='coerce').fillna(1.0) | |
features = agg_df[['Volume', 'Count']].copy() | |
primary_metric = 'Volume' | |
# Final check to ensure features are numeric | |
for col in features.columns: | |
features[col] = pd.to_numeric(features[col], errors='coerce').fillna(0) | |
except Exception as e: | |
logging.warning(f"Error preparing clustering features: {e}") | |
# Create safe dummy features if everything else fails | |
agg_df['SafeFeature'] = 1.0 | |
agg_df['Count'] = 1.0 | |
features = agg_df[['SafeFeature', 'Count']].copy() | |
primary_metric = 'SafeFeature' | |
# Scale features before clustering (StandardScaler is imported at module level)
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
# Use K-Means with a reduced iteration budget to keep clustering fast
kmeans = KMeans(n_clusters=actual_n_clusters, random_state=42, n_init=10, max_iter=100)
agg_df['Cluster'] = kmeans.fit_predict(scaled_features) | |
# Calculate time-based metrics from the hour column directly | |
if 'hour' in agg_df.columns: | |
try: | |
# Convert to datetime for hour and day extraction if needed | |
hour_series = pd.to_datetime(agg_df['hour']) | |
agg_df['Hour'] = hour_series.dt.hour | |
agg_df['Day'] = hour_series.dt.dayofweek | |
except Exception: | |
# Fallback for non-convertible data | |
agg_df['Hour'] = 0 | |
agg_df['Day'] = 0 | |
else: | |
# Default values if no hour column | |
agg_df['Hour'] = 0 | |
agg_df['Day'] = 0 | |
# Identify patterns efficiently | |
patterns = [] | |
for i in range(actual_n_clusters): | |
# Use boolean indexing for better performance | |
cluster_mask = agg_df['Cluster'] == i | |
cluster_df = agg_df[cluster_mask] | |
if len(cluster_df) == 0: | |
continue | |
if primary_metric == 'NetFlow': | |
# Use numpy methods for faster calculation | |
avg_flow = cluster_df['NetFlow'].mean() | |
flow_std = cluster_df['NetFlow'].std() | |
behavior = "Accumulation" if avg_flow > 0 else "Distribution" | |
volume_metric = f"Net Flow: {avg_flow:.2f} ± {flow_std:.2f}" | |
else:
# Use Volume metrics - avoid redundant calculations
avg_volume = cluster_df['Volume'].mean() if 'Volume' in cluster_df else 0
volume_std = cluster_df['Volume'].std() if 'Volume' in cluster_df else 0
behavior = "High Volume" if 'Volume' in agg_df and avg_volume > agg_df['Volume'].mean() else "Low Volume"
volume_metric = f"Volume: {avg_volume:.2f} ± {volume_std:.2f}"
# Pattern characteristics, built from whichever metric drove the clustering so the
# Volume branch never references a missing NetFlow column
metric_series = cluster_df[primary_metric] if primary_metric in cluster_df else cluster_df['Count']
pattern_metrics = {
"avg_flow": metric_series.mean(),
"flow_std": metric_series.std(),
"avg_count": cluster_df['Count'].mean(),
"max_flow": metric_series.max(),
"min_flow": metric_series.min(),
"common_hour": cluster_df['Hour'].mode()[0] if not cluster_df['Hour'].empty else None,
"common_day": cluster_df['Day'].mode()[0] if not cluster_df['Day'].empty else None
}
# Enhanced confidence calculation | |
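# confidence = clamp(1 - within_cluster_variance / total_variance, 0.4, 0.95),
# so tighter (lower-variance) clusters are reported with higher confidence.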
if primary_metric == 'NetFlow': | |
# Calculate within-cluster variance as a percentage of total variance | |
cluster_variance = cluster_df['NetFlow'].var() | |
total_variance = agg_df['NetFlow'].var() or 1 # Avoid division by zero | |
confidence = max(0.4, min(0.95, 1 - (cluster_variance / total_variance))) | |
else: | |
# Calculate within-cluster variance as a percentage of total variance | |
cluster_variance = cluster_df['Volume'].var() | |
total_variance = agg_df['Volume'].var() or 1 # Avoid division by zero | |
confidence = max(0.4, min(0.95, 1 - (cluster_variance / total_variance))) | |
# Create enhanced pattern charts - Main Chart | |
if primary_metric == 'NetFlow': | |
main_fig = px.scatter(cluster_df, x=cluster_df.index, y='NetFlow', | |
size='Count', color='Cluster', | |
title=f"Pattern {i+1}: {behavior}", | |
labels={'NetFlow': 'Net Token Flow', 'index': 'Time'}, | |
color_discrete_sequence=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd']) | |
# Add a trend line | |
main_fig.add_trace(go.Scatter( | |
x=cluster_df.index, | |
y=cluster_df['NetFlow'].rolling(window=3, min_periods=1).mean(), | |
mode='lines', | |
name='Trend', | |
line=dict(width=2, dash='dash', color='rgba(0,0,0,0.5)') | |
)) | |
# Add a zero reference line | |
main_fig.add_shape( | |
type="line", | |
x0=cluster_df.index.min(), | |
y0=0, | |
x1=cluster_df.index.max(), | |
y1=0, | |
line=dict(color="red", width=1, dash="dot"), | |
) | |
else: | |
main_fig = px.scatter(cluster_df, x=cluster_df.index, y='Volume', | |
size='Count', color='Cluster', | |
title=f"Pattern {i+1}: {behavior}", | |
labels={'Volume': 'Transaction Volume', 'index': 'Time'}, | |
color_discrete_sequence=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd']) | |
# Add a trend line | |
main_fig.add_trace(go.Scatter( | |
x=cluster_df.index, | |
y=cluster_df['Volume'].rolling(window=3, min_periods=1).mean(), | |
mode='lines', | |
name='Trend', | |
line=dict(width=2, dash='dash', color='rgba(0,0,0,0.5)') | |
)) | |
main_fig.update_layout( | |
template="plotly_white", | |
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), | |
margin=dict(l=20, r=20, t=50, b=20), | |
height=400 | |
) | |
# Create hourly distribution chart | |
hour_counts = cluster_df.groupby('Hour')['Count'].sum().reindex(range(24), fill_value=0) | |
hour_fig = px.bar(x=hour_counts.index, y=hour_counts.values, | |
title="Hourly Distribution", | |
labels={'x': 'Hour of Day', 'y': 'Transaction Count'}, | |
color_discrete_sequence=['#1f77b4']) | |
hour_fig.update_layout(template="plotly_white", height=300) | |
# Create volume/flow distribution chart | |
if primary_metric == 'NetFlow': | |
hist_data = cluster_df['NetFlow'] | |
hist_title = "Net Flow Distribution" | |
hist_label = "Net Flow" | |
else: | |
hist_data = cluster_df['Volume'] | |
hist_title = "Volume Distribution" | |
hist_label = "Volume" | |
dist_fig = px.histogram(hist_data, | |
title=hist_title, | |
labels={'value': hist_label, 'count': 'Frequency'}, | |
color_discrete_sequence=['#2ca02c']) | |
dist_fig.update_layout(template="plotly_white", height=300) | |
# Find related transactions | |
if not transactions_df.empty: | |
# Get the time buckets covered by this cluster (the 'hour' column holds the bucket start times)
cluster_times = pd.to_datetime(cluster_df['hour'])
# Create time windows for matching
time_windows = [(t - pd.Timedelta(hours=1), t + pd.Timedelta(hours=1)) for t in cluster_times]
# Find original transactions whose (converted) timestamps fall within these windows
pattern_txs = transactions_df[df[timestamp_col].apply(
lambda x: any((start <= x <= end) for start, end in time_windows)
)].copy()
# If we have too many, sample them | |
if len(pattern_txs) > 10: | |
pattern_txs = pattern_txs.sample(10) | |
# If we have too few, just sample from all transactions | |
if len(pattern_txs) < 5 and len(transactions_df) >= 5: | |
pattern_txs = transactions_df.sample(min(5, len(transactions_df))) | |
else: | |
pattern_txs = pd.DataFrame() | |
# Comprehensive pattern dictionary | |
pattern = { | |
"name": behavior, | |
"description": f"This pattern shows {behavior.lower()} activity.", | |
"strategy": "Unknown", | |
"risk_profile": "Unknown", | |
"time_insight": "Unknown", | |
"cluster_id": i, | |
"metrics": pattern_metrics, | |
"occurrence_count": len(cluster_df), | |
"volume_metric": volume_metric, | |
"confidence": confidence, | |
"impact": 0.0, | |
"charts": { | |
"main": main_fig, | |
"hourly_distribution": hour_fig, | |
"value_distribution": dist_fig | |
}, | |
"examples": pattern_txs | |
} | |
patterns.append(pattern) | |
# Cache results for future reuse | |
if cache_key: | |
self._pattern_cache[cache_key] = patterns | |
return patterns | |
except Exception as e:
logging.warning(f"Error during pattern identification: {str(e)}")
return [] | |
def detect_anomalous_transactions(self, | |
transactions_df: pd.DataFrame, | |
sensitivity: str = "Medium") -> pd.DataFrame: | |
""" | |
Detect anomalous transactions using statistical methods | |
Args: | |
transactions_df: DataFrame of transactions | |
sensitivity: Detection sensitivity ("Low", "Medium", "High") | |
Returns: | |
DataFrame of anomalous transactions | |
""" | |
if transactions_df.empty: | |
return pd.DataFrame() | |
# Ensure amount column exists | |
if 'Amount' in transactions_df.columns: | |
amount_col = 'Amount' | |
elif 'tokenAmount' in transactions_df.columns: | |
amount_col = 'tokenAmount' | |
elif 'value' in transactions_df.columns: | |
# Try to adjust for decimals if 'tokenDecimal' exists | |
if 'tokenDecimal' in transactions_df.columns: | |
transactions_df['adjustedValue'] = transactions_df['value'].astype(float) / (10 ** transactions_df['tokenDecimal'].astype(int)) | |
amount_col = 'adjustedValue' | |
else: | |
amount_col = 'value' | |
else: | |
raise ValueError("Amount column not found in transactions DataFrame") | |
# Define sensitivity thresholds | |
if sensitivity == "Low": | |
z_threshold = 3.0 # Outliers beyond 3 standard deviations | |
elif sensitivity == "Medium": | |
z_threshold = 2.5 # Outliers beyond 2.5 standard deviations | |
else: # High | |
z_threshold = 2.0 # Outliers beyond 2 standard deviations | |
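# A transaction is flagged when |amount - mean| / std exceeds z_threshold;
# e.g. with mean 100 and std 20, an amount of 160 gives z = 3.0.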
# Calculate z-score for amount (coerce to numeric so string-typed values don't break the stats)
amounts = pd.to_numeric(transactions_df[amount_col], errors='coerce')
mean_amount = amounts.mean()
std_amount = amounts.std()
if std_amount == 0 or pd.isna(std_amount):
return pd.DataFrame()
transactions_df['z_score'] = ((amounts - mean_amount) / std_amount).abs()
# Flag anomalous transactions | |
anomalies = transactions_df[transactions_df['z_score'] > z_threshold].copy() | |
# Add risk level based on z-score | |
anomalies['risk_level'] = 'Medium' | |
anomalies.loc[anomalies['z_score'] > z_threshold * 1.5, 'risk_level'] = 'High' | |
anomalies.loc[anomalies['z_score'] <= z_threshold * 1.2, 'risk_level'] = 'Low' | |
return anomalies | |
def analyze_price_impact(self, | |
transactions_df: pd.DataFrame, | |
price_data: Dict[str, Dict[str, Any]]) -> Dict[str, Any]: | |
""" | |
Analyze the price impact of transactions with enhanced visualizations | |
Args: | |
transactions_df: DataFrame of transactions | |
price_data: Dictionary of price impact data for each transaction | |
Returns: | |
Dictionary with comprehensive price impact analysis and visualizations | |
""" | |
if transactions_df.empty or not price_data: | |
# Create an empty chart for the default case | |
empty_fig = go.Figure() | |
empty_fig.update_layout( | |
title="No Price Impact Data Available", | |
xaxis_title="Time", | |
yaxis_title="Price Impact (%)", | |
height=400, | |
template="plotly_white" | |
) | |
empty_fig.add_annotation( | |
text="No transactions found with price impact data", | |
showarrow=False, | |
font=dict(size=14) | |
) | |
return { | |
'avg_impact_pct': 0, | |
'max_impact_pct': 0, | |
'min_impact_pct': 0, | |
'significant_moves_count': 0, | |
'total_transactions': 0, | |
'charts': { | |
'main_chart': empty_fig, | |
'impact_distribution': empty_fig, | |
'cumulative_impact': empty_fig, | |
'hourly_impact': empty_fig | |
}, | |
'transactions_with_impact': pd.DataFrame(), | |
'insights': [], | |
'impact_summary': "No price impact data available" | |
} | |
# Ensure timestamp column is datetime
if 'Timestamp' in transactions_df.columns:
timestamp_col = 'Timestamp'
elif 'timeStamp' in transactions_df.columns:
timestamp_col = 'timeStamp'
else:
raise ValueError("Timestamp column not found in transactions DataFrame")
# Convert to datetime if needed (epoch seconds or string timestamps)
if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
if transactions_df[timestamp_col].dtype == 'object':
transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], errors='coerce')
else:
transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s', errors='coerce')
# Combine price impact data with transactions | |
impact_data = [] | |
for idx, row in transactions_df.iterrows(): | |
tx_hash = row.get('Transaction Hash', row.get('hash', None)) | |
if not tx_hash or tx_hash not in price_data: | |
continue | |
tx_impact = price_data[tx_hash] | |
if tx_impact['impact_pct'] is None: | |
continue | |
# Get token symbol if available | |
token_symbol = row.get('tokenSymbol', 'Unknown') | |
token_amount = row.get('value', 0) | |
if 'tokenDecimal' in row: | |
try: | |
token_amount = float(token_amount) / (10 ** int(row.get('tokenDecimal', 0))) | |
except (ValueError, TypeError): | |
token_amount = 0 | |
impact_data.append({ | |
'transaction_hash': tx_hash, | |
'timestamp': row[timestamp_col], | |
'pre_price': tx_impact['pre_price'], | |
'post_price': tx_impact['post_price'], | |
'impact_pct': tx_impact['impact_pct'], | |
'token_symbol': token_symbol, | |
'token_amount': token_amount, | |
'from': row.get('from', ''), | |
'to': row.get('to', ''), | |
'hour': row[timestamp_col].hour if isinstance(row[timestamp_col], pd.Timestamp) else 0 | |
}) | |
if not impact_data: | |
# Create an empty chart for the default case | |
empty_fig = go.Figure() | |
empty_fig.update_layout( | |
title="No Price Impact Data Available", | |
xaxis_title="Time", | |
yaxis_title="Price Impact (%)", | |
height=400, | |
template="plotly_white" | |
) | |
empty_fig.add_annotation( | |
text="No transactions found with price impact data", | |
showarrow=False, | |
font=dict(size=14) | |
) | |
return { | |
'avg_impact_pct': 0, | |
'max_impact_pct': 0, | |
'min_impact_pct': 0, | |
'significant_moves_count': 0, | |
'total_transactions': len(transactions_df) if not transactions_df.empty else 0, | |
'charts': { | |
'main_chart': empty_fig, | |
'impact_distribution': empty_fig, | |
'cumulative_impact': empty_fig, | |
'hourly_impact': empty_fig | |
}, | |
'transactions_with_impact': pd.DataFrame(), | |
'insights': [], | |
'impact_summary': "No price impact data available" | |
} | |
impact_df = pd.DataFrame(impact_data) | |
# Calculate aggregate metrics | |
avg_impact = impact_df['impact_pct'].mean() | |
max_impact = impact_df['impact_pct'].max() | |
min_impact = impact_df['impact_pct'].min() | |
median_impact = impact_df['impact_pct'].median() | |
std_impact = impact_df['impact_pct'].std() | |
# Count significant moves (>1% impact) | |
significant_threshold = 1.0 | |
high_impact_threshold = 3.0 | |
significant_moves = len(impact_df[abs(impact_df['impact_pct']) > significant_threshold]) | |
high_impact_moves = len(impact_df[abs(impact_df['impact_pct']) > high_impact_threshold]) | |
positive_impacts = len(impact_df[impact_df['impact_pct'] > 0]) | |
negative_impacts = len(impact_df[impact_df['impact_pct'] < 0]) | |
# Calculate cumulative impact | |
impact_df = impact_df.sort_values('timestamp') | |
impact_df['cumulative_impact'] = impact_df['impact_pct'].cumsum() | |
# Generate insights | |
insights = [] | |
# Market direction bias | |
if avg_impact > 0.5: | |
insights.append({ | |
"title": "Positive Price Pressure", | |
"description": f"Transactions show an overall positive price impact of {avg_impact:.2f}%, suggesting accumulation or market strength." | |
}) | |
elif avg_impact < -0.5: | |
insights.append({ | |
"title": "Negative Price Pressure", | |
"description": f"Transactions show an overall negative price impact of {avg_impact:.2f}%, suggesting distribution or market weakness." | |
}) | |
# Volatility analysis | |
if std_impact > 2.0: | |
insights.append({ | |
"title": "High Market Volatility", | |
"description": f"Price impact shows high volatility (std: {std_impact:.2f}%), indicating potential market manipulation or whipsaw conditions." | |
}) | |
# Significant impacts | |
if high_impact_moves > 0: | |
insights.append({ | |
"title": "High Impact Transactions", | |
"description": f"Detected {high_impact_moves} high-impact transactions (>{high_impact_threshold}% price change), indicating potential market-moving activity." | |
}) | |
# Temporal patterns | |
hourly_impact = impact_df.groupby('hour')['impact_pct'].mean() | |
if len(hourly_impact) > 0: | |
max_hour = hourly_impact.abs().idxmax() | |
max_hour_impact = hourly_impact[max_hour] | |
insights.append({ | |
"title": "Time-Based Pattern", | |
"description": f"Highest price impact occurs around {max_hour}:00 with an average of {max_hour_impact:.2f}%." | |
}) | |
# Create impact summary text | |
impact_summary = f"Analysis of {len(impact_df)} price-impacting transactions shows an average impact of {avg_impact:.2f}% " | |
impact_summary += f"(range: {min_impact:.2f}% to {max_impact:.2f}%). " | |
impact_summary += f"Found {significant_moves} significant price moves and {high_impact_moves} high-impact transactions. " | |
if positive_impacts > negative_impacts: | |
impact_summary += f"There is a bias towards positive price impact ({positive_impacts} positive vs {negative_impacts} negative)." | |
elif negative_impacts > positive_impacts: | |
impact_summary += f"There is a bias towards negative price impact ({negative_impacts} negative vs {positive_impacts} positive)." | |
else: | |
impact_summary += "The price impact is balanced between positive and negative moves." | |
# Create enhanced main visualization | |
main_fig = go.Figure() | |
# Add scatter plot for impact | |
main_fig.add_trace(go.Scatter( | |
x=impact_df['timestamp'], | |
y=impact_df['impact_pct'], | |
mode='markers+lines', | |
marker=dict( | |
size=impact_df['impact_pct'].abs() * 1.5 + 5, | |
color=impact_df['impact_pct'], | |
colorscale='RdBu_r', | |
line=dict(width=1), | |
symbol=['circle' if val >= 0 else 'diamond' for val in impact_df['impact_pct']] | |
), | |
text=[ | |
f"TX: {tx[:8]}...{tx[-6:]}<br>" + | |
f"Impact: {impact:.2f}%<br>" + | |
f"Token: {token} ({amount:.4f})<br>" + | |
f"From: {src[:6]}...{src[-4:]}<br>" + | |
f"To: {dst[:6]}...{dst[-4:]}" | |
for tx, impact, token, amount, src, dst in zip( | |
impact_df['transaction_hash'], | |
impact_df['impact_pct'], | |
impact_df['token_symbol'], | |
impact_df['token_amount'], | |
impact_df['from'], | |
impact_df['to'] | |
) | |
], | |
hovertemplate='%{text}<br>Time: %{x}<extra></extra>', | |
name='Price Impact' | |
)) | |
# Add a moving average trendline | |
window_size = max(3, len(impact_df) // 10) # Dynamic window size | |
if len(impact_df) >= window_size: | |
impact_df['ma'] = impact_df['impact_pct'].rolling(window=window_size, min_periods=1).mean() | |
main_fig.add_trace(go.Scatter( | |
x=impact_df['timestamp'], | |
y=impact_df['ma'], | |
mode='lines', | |
line=dict(width=2, color='rgba(255,165,0,0.7)'), | |
name=f'Moving Avg ({window_size} period)' | |
)) | |
# Add a zero line for reference | |
main_fig.add_shape( | |
type='line', | |
x0=impact_df['timestamp'].min(), | |
y0=0, | |
x1=impact_df['timestamp'].max(), | |
y1=0, | |
line=dict(color='gray', width=1, dash='dash') | |
) | |
# Add colored regions for significant impact | |
# Add green band for normal price movement | |
main_fig.add_shape( | |
type='rect', | |
x0=impact_df['timestamp'].min(), | |
y0=-significant_threshold, | |
x1=impact_df['timestamp'].max(), | |
y1=significant_threshold, | |
fillcolor='rgba(0,255,0,0.1)', | |
line=dict(width=0), | |
layer='below' | |
) | |
# Add warning bands for higher impact movements | |
main_fig.add_shape( | |
type='rect', | |
x0=impact_df['timestamp'].min(), | |
y0=significant_threshold, | |
x1=impact_df['timestamp'].max(), | |
y1=high_impact_threshold, | |
fillcolor='rgba(255,255,0,0.1)', | |
line=dict(width=0), | |
layer='below' | |
) | |
main_fig.add_shape( | |
type='rect', | |
x0=impact_df['timestamp'].min(), | |
y0=-high_impact_threshold, | |
x1=impact_df['timestamp'].max(), | |
y1=-significant_threshold, | |
fillcolor='rgba(255,255,0,0.1)', | |
line=dict(width=0), | |
layer='below' | |
) | |
# Add high impact regions | |
main_fig.add_shape( | |
type='rect', | |
x0=impact_df['timestamp'].min(), | |
y0=high_impact_threshold, | |
x1=impact_df['timestamp'].max(), | |
y1=max(high_impact_threshold * 2, max_impact * 1.1), | |
fillcolor='rgba(255,0,0,0.1)', | |
line=dict(width=0), | |
layer='below' | |
) | |
main_fig.add_shape( | |
type='rect', | |
x0=impact_df['timestamp'].min(), | |
y0=min(high_impact_threshold * -2, min_impact * 1.1), | |
x1=impact_df['timestamp'].max(), | |
y1=-high_impact_threshold, | |
fillcolor='rgba(255,0,0,0.1)', | |
line=dict(width=0), | |
layer='below' | |
) | |
main_fig.update_layout( | |
title='Price Impact of Whale Transactions', | |
xaxis_title='Timestamp', | |
yaxis_title='Price Impact (%)', | |
hovermode='closest', | |
template="plotly_white", | |
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), | |
margin=dict(l=20, r=20, t=50, b=20) | |
) | |
# Create impact distribution histogram | |
dist_fig = px.histogram( | |
impact_df['impact_pct'], | |
nbins=20, | |
labels={'value': 'Price Impact (%)', 'count': 'Frequency'}, | |
title='Distribution of Price Impact', | |
color_discrete_sequence=['#3366CC'] | |
) | |
# Add a vertical line at the mean | |
dist_fig.add_vline(x=avg_impact, line_dash="dash", line_color="red") | |
dist_fig.add_annotation(x=avg_impact, y=0.85, yref="paper", text=f"Mean: {avg_impact:.2f}%", | |
showarrow=True, arrowhead=2, arrowcolor="red", ax=40) | |
# Add a vertical line at zero | |
dist_fig.add_vline(x=0, line_dash="solid", line_color="black") | |
dist_fig.update_layout( | |
template="plotly_white", | |
bargap=0.1, | |
height=350 | |
) | |
# Create cumulative impact chart | |
cumul_fig = go.Figure() | |
cumul_fig.add_trace(go.Scatter( | |
x=impact_df['timestamp'], | |
y=impact_df['cumulative_impact'], | |
mode='lines', | |
fill='tozeroy', | |
line=dict(width=2, color='#2ca02c'), | |
name='Cumulative Impact' | |
)) | |
cumul_fig.update_layout( | |
title='Cumulative Price Impact Over Time', | |
xaxis_title='Timestamp', | |
yaxis_title='Cumulative Price Impact (%)', | |
template="plotly_white", | |
height=350 | |
) | |
# Create hourly impact analysis | |
hourly_impact = impact_df.groupby('hour')['impact_pct'].agg(['mean', 'count', 'std']).reset_index() | |
hourly_impact = hourly_impact.sort_values('hour') | |
hour_fig = go.Figure() | |
hour_fig.add_trace(go.Bar( | |
x=hourly_impact['hour'], | |
y=hourly_impact['mean'], | |
error_y=dict(type='data', array=hourly_impact['std'], visible=True), | |
marker_color=hourly_impact['mean'].apply(lambda x: 'green' if x > 0 else 'red'), | |
name='Average Impact' | |
)) | |
hour_fig.update_layout( | |
title='Price Impact by Hour of Day', | |
xaxis_title='Hour of Day', | |
yaxis_title='Average Price Impact (%)', | |
template="plotly_white", | |
height=350, | |
xaxis=dict(tickmode='linear', tick0=0, dtick=2) | |
) | |
# Join with original transactions | |
transactions_df = transactions_df.copy() | |
transactions_df['Timestamp_key'] = transactions_df[timestamp_col] | |
impact_df['Timestamp_key'] = impact_df['timestamp'] | |
merged_df = pd.merge( | |
transactions_df, | |
impact_df[['Timestamp_key', 'impact_pct', 'pre_price', 'post_price', 'cumulative_impact']], | |
on='Timestamp_key', | |
how='left' | |
) | |
# Final result with enhanced output | |
return { | |
'avg_impact_pct': avg_impact, | |
'max_impact_pct': max_impact, | |
'min_impact_pct': min_impact, | |
'median_impact_pct': median_impact, | |
'std_impact_pct': std_impact, | |
'significant_moves_count': significant_moves, | |
'high_impact_moves_count': high_impact_moves, | |
'positive_impacts_count': positive_impacts, | |
'negative_impacts_count': negative_impacts, | |
'total_transactions': len(transactions_df), | |
'charts': { | |
'main_chart': main_fig, | |
'impact_distribution': dist_fig, | |
'cumulative_impact': cumul_fig, | |
'hourly_impact': hour_fig | |
}, | |
'transactions_with_impact': merged_df, | |
'insights': insights, | |
'impact_summary': impact_summary | |
} | |
def detect_wash_trading(self, | |
transactions_df: pd.DataFrame, | |
addresses: List[str], | |
time_window_minutes: int = 60, | |
sensitivity: str = "Medium") -> List[Dict[str, Any]]: | |
""" | |
Detect potential wash trading between addresses | |
Args: | |
transactions_df: DataFrame of transactions | |
addresses: List of addresses to analyze | |
time_window_minutes: Time window for detecting wash trades | |
sensitivity: Detection sensitivity ("Low", "Medium", "High") | |
Returns: | |
List of potential wash trading incidents | |
""" | |
if transactions_df.empty or not addresses: | |
return [] | |
# Ensure from/to columns exist | |
if 'From' in transactions_df.columns and 'To' in transactions_df.columns: | |
from_col, to_col = 'From', 'To' | |
elif 'from' in transactions_df.columns and 'to' in transactions_df.columns: | |
from_col, to_col = 'from', 'to' | |
else: | |
raise ValueError("From/To columns not found in transactions DataFrame") | |
# Ensure timestamp column exists | |
if 'Timestamp' in transactions_df.columns: | |
timestamp_col = 'Timestamp' | |
elif 'timeStamp' in transactions_df.columns: | |
timestamp_col = 'timeStamp' | |
else: | |
raise ValueError("Timestamp column not found in transactions DataFrame") | |
# Ensure timestamp is datetime (treat numeric values as epoch seconds)
if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
if pd.api.types.is_numeric_dtype(transactions_df[timestamp_col]):
transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s', errors='coerce')
else:
transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], errors='coerce')
# Define sensitivity thresholds | |
if sensitivity == "Low": | |
min_cycles = 3 # Minimum number of back-and-forth transactions | |
max_time_diff = 120 # Maximum minutes between transactions | |
elif sensitivity == "Medium": | |
min_cycles = 2 | |
max_time_diff = 60 | |
else: # High | |
min_cycles = 1 | |
max_time_diff = 30 | |
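# Example: at Medium sensitivity, A->B at 10:00 answered by B->A within the next
# 60 minutes counts as one cycle; two such cycles flag the pair as potential wash trading.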
# Filter transactions involving the addresses | |
address_txs = transactions_df[ | |
(transactions_df[from_col].isin(addresses)) | | |
(transactions_df[to_col].isin(addresses)) | |
].copy() | |
if address_txs.empty: | |
return [] | |
# Sort by timestamp | |
address_txs = address_txs.sort_values(by=timestamp_col) | |
# Detect cycles of transactions between same addresses | |
wash_trades = [] | |
for addr1 in addresses: | |
for addr2 in addresses: | |
if addr1 == addr2: | |
continue | |
# Find transactions from addr1 to addr2 | |
a1_to_a2 = address_txs[ | |
(address_txs[from_col] == addr1) & | |
(address_txs[to_col] == addr2) | |
] | |
# Find transactions from addr2 to addr1 | |
a2_to_a1 = address_txs[ | |
(address_txs[from_col] == addr2) & | |
(address_txs[to_col] == addr1) | |
] | |
if a1_to_a2.empty or a2_to_a1.empty: | |
continue | |
# Check for back-and-forth patterns | |
cycles = 0 | |
evidence = [] | |
for _, tx1 in a1_to_a2.iterrows(): | |
tx1_time = tx1[timestamp_col] | |
# Find return transactions within the time window | |
return_txs = a2_to_a1[ | |
(a2_to_a1[timestamp_col] > tx1_time) & | |
(a2_to_a1[timestamp_col] <= tx1_time + pd.Timedelta(minutes=max_time_diff)) | |
] | |
if not return_txs.empty: | |
cycles += 1 | |
evidence.append(tx1) | |
evidence.append(return_txs.iloc[0]) | |
if cycles >= min_cycles: | |
# Create visualization | |
if evidence: | |
evidence_df = pd.DataFrame(evidence) | |
fig = px.scatter( | |
evidence_df, | |
x=timestamp_col, | |
y=evidence_df.get('Amount', evidence_df.get('tokenAmount', evidence_df.get('value', 0))), | |
color=from_col, | |
title=f"Potential Wash Trading Between {addr1[:8]}... and {addr2[:8]}..." | |
) | |
else: | |
fig = None | |
wash_trades.append({ | |
"type": "Wash Trading", | |
"addresses": [addr1, addr2], | |
"risk_level": "High" if cycles >= min_cycles * 2 else "Medium", | |
"description": f"Detected {cycles} cycles of back-and-forth transactions between addresses", | |
"detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
"title": f"Wash Trading Pattern ({cycles} cycles)", | |
"evidence": pd.DataFrame(evidence) if evidence else None, | |
"chart": fig | |
}) | |
return wash_trades | |
def detect_pump_and_dump(self, | |
transactions_df: pd.DataFrame, | |
price_data: Dict[str, Dict[str, Any]], | |
sensitivity: str = "Medium") -> List[Dict[str, Any]]: | |
""" | |
Detect potential pump and dump schemes | |
Args: | |
transactions_df: DataFrame of transactions | |
price_data: Dictionary of price impact data for each transaction | |
sensitivity: Detection sensitivity ("Low", "Medium", "High") | |
Returns: | |
List of potential pump and dump incidents | |
""" | |
if transactions_df.empty or not price_data: | |
return [] | |
# Ensure timestamp column exists | |
if 'Timestamp' in transactions_df.columns: | |
timestamp_col = 'Timestamp' | |
elif 'timeStamp' in transactions_df.columns: | |
timestamp_col = 'timeStamp' | |
else: | |
raise ValueError("Timestamp column not found in transactions DataFrame") | |
# Ensure from/to columns exist | |
if 'From' in transactions_df.columns and 'To' in transactions_df.columns: | |
from_col, to_col = 'From', 'To' | |
elif 'from' in transactions_df.columns and 'to' in transactions_df.columns: | |
from_col, to_col = 'from', 'to' | |
else: | |
raise ValueError("From/To columns not found in transactions DataFrame") | |
# Ensure timestamp is datetime (treat numeric values as epoch seconds)
if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]):
if pd.api.types.is_numeric_dtype(transactions_df[timestamp_col]):
transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s', errors='coerce')
else:
transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], errors='coerce')
# Define sensitivity thresholds | |
if sensitivity == "Low": | |
accumulation_threshold = 5 # Number of buys to consider accumulation | |
pump_threshold = 10.0 # % price increase to trigger pump | |
dump_threshold = -8.0 # % price decrease to trigger dump | |
elif sensitivity == "Medium": | |
accumulation_threshold = 3 | |
pump_threshold = 7.0 | |
dump_threshold = -5.0 | |
else: # High | |
accumulation_threshold = 2 | |
pump_threshold = 5.0 | |
dump_threshold = -3.0 | |
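# Example: at Medium sensitivity, a window that gains more than 7% (cumulatively or via a
# single transaction's impact) and then falls more than 5% from its peak to the window end is flagged.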
# Combine price impact data with transactions | |
txs_with_impact = [] | |
for idx, row in transactions_df.iterrows(): | |
tx_hash = row.get('Transaction Hash', row.get('hash', None)) | |
if not tx_hash or tx_hash not in price_data: | |
continue | |
tx_impact = price_data[tx_hash] | |
if tx_impact['impact_pct'] is None: | |
continue | |
txs_with_impact.append({ | |
'transaction_hash': tx_hash, | |
'timestamp': row[timestamp_col], | |
'from': row[from_col], | |
'to': row[to_col], | |
'pre_price': tx_impact['pre_price'], | |
'post_price': tx_impact['post_price'], | |
'impact_pct': tx_impact['impact_pct'] | |
}) | |
if not txs_with_impact: | |
return [] | |
impact_df = pd.DataFrame(txs_with_impact) | |
impact_df = impact_df.sort_values(by='timestamp') | |
# Look for accumulation phases followed by price pumps and then dumps | |
pump_and_dumps = [] | |
# Group by address to analyze per wallet | |
address_groups = {} | |
for from_addr in impact_df['from'].unique(): | |
address_groups[from_addr] = impact_df[impact_df['from'] == from_addr] | |
for to_addr in impact_df['to'].unique(): | |
if to_addr in address_groups: | |
address_groups[to_addr] = pd.concat([ | |
address_groups[to_addr], | |
impact_df[impact_df['to'] == to_addr] | |
]) | |
else: | |
address_groups[to_addr] = impact_df[impact_df['to'] == to_addr] | |
for address, addr_df in address_groups.items(): | |
# Skip if not enough transactions | |
if len(addr_df) < accumulation_threshold + 2: | |
continue | |
# Look for continuous price increase followed by sharp drop | |
window_size = min(len(addr_df), 10) | |
for i in range(len(addr_df) - window_size + 1): | |
window = addr_df.iloc[i:i+window_size] | |
# Get cumulative price change in window | |
if len(window) >= 2: | |
first_price = window.iloc[0]['pre_price'] | |
last_price = window.iloc[-1]['post_price'] | |
if first_price is None or last_price is None: | |
continue | |
cumulative_change = ((last_price - first_price) / first_price) * 100 | |
# Check for pump phase (use a positional index; the iloc window keeps its original row labels)
max_price = window['post_price'].max()
max_pos = int(window['post_price'].reset_index(drop=True).idxmax())
if max_pos < len(window) - 1:
max_to_end = ((window.iloc[-1]['post_price'] - max_price) / max_price) * 100 | |
# If we have a pump followed by a dump | |
if (cumulative_change > pump_threshold or | |
any(window['impact_pct'] > pump_threshold)) and max_to_end < dump_threshold: | |
# Create chart | |
fig = go.Figure() | |
# Plot price line | |
times = [t.timestamp() for t in window['timestamp']] | |
prices = [] | |
for _, row in window.iterrows(): | |
prices.append(row['pre_price']) | |
prices.append(row['post_price']) | |
times_expanded = [] | |
for t in times: | |
times_expanded.append(t - 60) # 1 min before | |
times_expanded.append(t + 60) # 1 min after | |
fig.add_trace(go.Scatter( | |
x=times_expanded, | |
y=prices, | |
mode='lines+markers', | |
name='Price', | |
line=dict(color='blue') | |
)) | |
# Highlight pump and dump phases
max_time_idx = max_pos
pump_x = times_expanded[:max_time_idx*2+2] | |
pump_y = prices[:max_time_idx*2+2] | |
dump_x = times_expanded[max_time_idx*2:] | |
dump_y = prices[max_time_idx*2:] | |
fig.add_trace(go.Scatter( | |
x=pump_x, | |
y=pump_y, | |
mode='lines', | |
line=dict(color='green', width=3), | |
name='Pump Phase' | |
)) | |
fig.add_trace(go.Scatter( | |
x=dump_x, | |
y=dump_y, | |
mode='lines', | |
line=dict(color='red', width=3), | |
name='Dump Phase' | |
)) | |
fig.update_layout( | |
title='Potential Pump and Dump Pattern', | |
xaxis_title='Time', | |
yaxis_title='Price', | |
hovermode='closest' | |
) | |
pump_and_dumps.append({ | |
"type": "Pump and Dump", | |
"addresses": [address], | |
"risk_level": "High" if max_to_end < dump_threshold * 1.5 else "Medium", | |
"description": f"Price pumped {cumulative_change:.2f}% before dropping {max_to_end:.2f}%", | |
"detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
"title": f"Pump ({cumulative_change:.1f}%) and Dump ({max_to_end:.1f}%)", | |
"evidence": window, | |
"chart": fig | |
}) | |
return pump_and_dumps | |
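# Minimal usage sketch (illustrative only). The synthetic frame below assumes
# Etherscan-style column names (hash, from, to, value, tokenDecimal, timeStamp)
# that the helpers above look for; the addresses and amounts are made up purely
# to demonstrate the API, not to reflect real chain data.
if __name__ == "__main__":
    sample = pd.DataFrame({
        'hash': [f'0x{i:064x}' for i in range(6)],
        'from': ['0xaaa', '0xbbb', '0xaaa', '0xccc', '0xbbb', '0xaaa'],
        'to': ['0xbbb', '0xaaa', '0xccc', '0xaaa', '0xaaa', '0xbbb'],
        'value': ['1000000000000000000', '2000000000000000000',
                  '500000000000000000', '90000000000000000000',
                  '1500000000000000000', '1200000000000000000'],
        'tokenDecimal': ['18'] * 6,
        'timeStamp': pd.date_range('2025-01-01', periods=6, freq='h'),
    })
    processor = DataProcessor()
    # Daily net-flow aggregation per wallet
    daily = processor.aggregate_transactions(sample, time_window='D')
    print(daily.head())
    # Z-score based outlier detection on the adjusted token amounts
    anomalies = processor.detect_anomalous_transactions(sample, sensitivity='High')
    print(f"Flagged {len(anomalies)} anomalous transfer(s)")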