Whale_Arbitrum / app.py
arpit13's picture
Deploy Whale_Arbitrum on HF Spaces
011960a
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import os
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union, Any
from dotenv import load_dotenv
# Configure logging - Reduce verbosity and improve performance
logging.basicConfig(
level=logging.WARNING, # Only show warnings and errors by default
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create a custom filter to suppress repetitive Gemini API errors
class SuppressRepetitiveErrors(logging.Filter):
def __init__(self):
super().__init__()
self.error_counts = {}
self.max_errors = 3 # Show at most 3 instances of each error
def filter(self, record):
if record.levelno < logging.WARNING:
return True
# If it's a Gemini API error for non-existent tokens, suppress it after a few occurrences
if 'Error fetching historical prices from Gemini API' in record.getMessage():
key = 'gemini_api_error'
self.error_counts[key] = self.error_counts.get(key, 0) + 1
# Only allow the first few errors through
return self.error_counts[key] <= self.max_errors
return True
# Apply the filter
logging.getLogger().addFilter(SuppressRepetitiveErrors())
from modules.api_client import ArbiscanClient, GeminiClient
from modules.data_processor import DataProcessor
from modules.visualizer import Visualizer
from modules.detection import ManipulationDetector
# Load environment variables
load_dotenv()
# Set page configuration
st.set_page_config(
page_title="Whale Wallet AI - Market Manipulation Detection",
page_icon="🐳",
layout="wide",
initial_sidebar_state="expanded"
)
# Add custom CSS
st.markdown("""
<style>
.main-header {
font-size: 2.5rem;
color: #1E88E5;
text-align: center;
margin-bottom: 1rem;
}
.sub-header {
font-size: 1.5rem;
color: #424242;
margin-bottom: 1rem;
}
.info-text {
background-color: #E3F2FD;
padding: 1rem;
border-radius: 0.5rem;
margin-bottom: 1rem;
}
.stButton>button {
width: 100%;
}
</style>
""", unsafe_allow_html=True)
# Initialize Streamlit session state for persisting data between tab navigation
if 'transactions_data' not in st.session_state:
st.session_state.transactions_data = pd.DataFrame()
if 'patterns_data' not in st.session_state:
st.session_state.patterns_data = None
if 'price_impact_data' not in st.session_state:
st.session_state.price_impact_data = None
# Performance metrics tracking
if 'performance_metrics' not in st.session_state:
st.session_state.performance_metrics = {
'api_calls': 0,
'data_processing_time': 0,
'visualization_time': 0,
'last_refresh': None
}
# Function to track performance
def track_timing(category: str):
def timing_decorator(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - start_time
if category in st.session_state.performance_metrics:
st.session_state.performance_metrics[category] += elapsed
else:
st.session_state.performance_metrics[category] = elapsed
return result
return wrapper
return timing_decorator
if 'alerts_data' not in st.session_state:
st.session_state.alerts_data = None
# Initialize API clients
arbiscan_client = ArbiscanClient(os.getenv("ARBISCAN_API_KEY"))
# Set debug mode to False to reduce log output
arbiscan_client.verbose_debug = False
gemini_client = GeminiClient(os.getenv("GEMINI_API_KEY"))
# Initialize data processor and visualizer
data_processor = DataProcessor()
visualizer = Visualizer()
# Apply performance tracking to key instance methods after initialization
original_fetch_whale = arbiscan_client.fetch_whale_transactions
arbiscan_client.fetch_whale_transactions = track_timing('api_calls')(original_fetch_whale)
original_identify_patterns = data_processor.identify_patterns
data_processor.identify_patterns = track_timing('data_processing_time')(original_identify_patterns)
original_analyze_price_impact = data_processor.analyze_price_impact
data_processor.analyze_price_impact = track_timing('data_processing_time')(original_analyze_price_impact)
detection = ManipulationDetector()
# Initialize crew system (for AI-assisted analysis)
try:
from modules.crew_system import WhaleAnalysisCrewSystem
crew_system = WhaleAnalysisCrewSystem(arbiscan_client, gemini_client, data_processor)
CREW_ENABLED = True
logging.info("CrewAI system loaded successfully")
except Exception as e:
CREW_ENABLED = False
logging.error(f"Failed to load CrewAI system: {str(e)}")
st.sidebar.error("CrewAI features are disabled due to an error.")
# Sidebar for inputs
st.sidebar.header("Configuration")
# Wallet tracking section
st.sidebar.subheader("Track Wallets")
wallet_addresses = st.sidebar.text_area(
"Enter wallet addresses (one per line)",
placeholder="0x1234abcd...\n0xabcd1234..."
)
threshold_type = st.sidebar.radio(
"Threshold Type",
["Token Amount", "USD Value"]
)
if threshold_type == "Token Amount":
threshold_value = st.sidebar.number_input("Minimum Token Amount", min_value=0.0, value=1000.0)
token_symbol = st.sidebar.text_input("Token Symbol", placeholder="ETH")
else:
threshold_value = st.sidebar.number_input("Minimum USD Value", min_value=0.0, value=100000.0)
# Time period selection
st.sidebar.subheader("Time Period")
time_period = st.sidebar.selectbox(
"Select Time Period",
["Last 24 hours", "Last 7 days", "Last 30 days", "Custom"]
)
if time_period == "Custom":
start_date = st.sidebar.date_input("Start Date", datetime.now() - timedelta(days=7))
end_date = st.sidebar.date_input("End Date", datetime.now())
else:
# Calculate dates based on selection
end_date = datetime.now()
if time_period == "Last 24 hours":
start_date = end_date - timedelta(days=1)
elif time_period == "Last 7 days":
start_date = end_date - timedelta(days=7)
else: # Last 30 days
start_date = end_date - timedelta(days=30)
# Manipulation detection settings
st.sidebar.subheader("Manipulation Detection")
enable_manipulation_detection = st.sidebar.toggle("Enable Manipulation Detection", value=True)
if enable_manipulation_detection:
sensitivity = st.sidebar.select_slider(
"Detection Sensitivity",
options=["Low", "Medium", "High"],
value="Medium"
)
# Price impact analysis settings
st.sidebar.subheader("Price Impact Analysis")
enable_price_impact = st.sidebar.toggle("Enable Price Impact Analysis", value=True)
if enable_price_impact:
lookback_minutes = st.sidebar.slider("Lookback (minutes)", 1, 60, 5)
lookahead_minutes = st.sidebar.slider("Lookahead (minutes)", 1, 60, 5)
# Action buttons
track_button = st.sidebar.button("Track Transactions", type="primary")
pattern_button = st.sidebar.button("Analyze Patterns")
if enable_manipulation_detection:
detect_button = st.sidebar.button("Detect Manipulation")
# Main content area
tab1, tab2, tab3, tab4, tab5 = st.tabs([
"Transactions", "Patterns", "Price Impact", "Alerts", "Reports"
])
with tab1:
st.header("Whale Transactions")
if track_button and wallet_addresses:
with st.spinner("Fetching whale transactions..."):
# Function to track whale transactions
def track_whale_transactions(wallets, start_date, end_date, threshold_value, threshold_type, token_symbol=None):
# Direct API call since CrewAI is temporarily disabled
try:
min_token_amount = None
min_usd_value = None
if threshold_type == "Token Amount":
min_token_amount = threshold_value
else:
min_usd_value = threshold_value
# Add pagination control to prevent infinite API requests
max_pages = 5 # Limit the number of pages to prevent excessive API calls
transactions = arbiscan_client.fetch_whale_transactions(
addresses=wallets,
min_token_amount=min_token_amount,
max_pages=5,
min_usd_value=min_usd_value
)
if transactions.empty:
st.warning("No transactions found for the specified addresses")
return transactions
except Exception as e:
st.error(f"Error fetching transactions: {str(e)}")
return pd.DataFrame()
wallet_list = [addr.strip() for addr in wallet_addresses.split("\n") if addr.strip()]
# Use cached data or fetch new if not available
if st.session_state.transactions_data is None or track_button:
with st.spinner("Fetching transactions..."):
transactions = track_whale_transactions(
wallets=wallet_list,
start_date=start_date,
end_date=end_date,
threshold_value=threshold_value,
threshold_type=threshold_type,
token_symbol=token_symbol
)
# Store in session state
st.session_state.transactions_data = transactions
else:
transactions = st.session_state.transactions_data
if not transactions.empty:
st.success(f"Found {len(transactions)} transactions matching your criteria")
# Display transactions
if len(transactions) > 0:
st.dataframe(transactions, use_container_width=True)
# Add download button
csv = transactions.to_csv(index=False).encode('utf-8')
st.download_button(
"Download Transactions CSV",
csv,
"whale_transactions.csv",
"text/csv",
key='download-csv'
)
# Volume by day chart
st.subheader("Transaction Volume by Day")
try:
st.plotly_chart(visualizer.plot_volume_by_day(transactions), use_container_width=True)
except Exception as e:
st.error(f"Error generating volume chart: {str(e)}")
# Transaction flow visualization
st.subheader("Transaction Flow")
try:
flow_chart = visualizer.plot_transaction_flow(transactions)
st.plotly_chart(flow_chart, use_container_width=True)
except Exception as e:
st.error(f"Error generating flow chart: {str(e)}")
else:
st.warning("No transactions found matching your criteria. Try adjusting the parameters.")
else:
st.info("Enter wallet addresses and click 'Track Transactions' to view whale activity")
with tab2:
st.header("Trading Patterns")
if track_button and wallet_addresses:
with st.spinner("Analyzing trading patterns..."):
# Function to analyze trading patterns
def analyze_trading_patterns(wallets, start_date, end_date):
# Direct analysis
try:
transactions_df = arbiscan_client.fetch_whale_transactions(addresses=wallets, max_pages=5)
if transactions_df.empty:
st.warning("No transactions found for the specified addresses")
return []
return data_processor.identify_patterns(transactions_df)
except Exception as e:
st.error(f"Error analyzing trading patterns: {str(e)}")
return []
wallet_list = [addr.strip() for addr in wallet_addresses.split("\n") if addr.strip()]
# Use cached data or fetch new if not available
if st.session_state.patterns_data is None or track_button:
with st.spinner("Analyzing trading patterns..."):
patterns = analyze_trading_patterns(
wallets=wallet_list,
start_date=start_date,
end_date=end_date
)
# Store in session state
st.session_state.patterns_data = patterns
else:
patterns = st.session_state.patterns_data
if patterns:
for i, pattern in enumerate(patterns):
pattern_card = st.container()
with pattern_card:
# Pattern header with name and risk profile
header_cols = st.columns([3, 1])
with header_cols[0]:
st.subheader(f"Pattern {i+1}: {pattern['name']}")
with header_cols[1]:
risk_color = "green"
if pattern.get('risk_profile') == "Medium":
risk_color = "orange"
elif pattern.get('risk_profile') in ["High", "Very High"]:
risk_color = "red"
st.markdown(f"<h5 style='color:{risk_color};'>Risk: {pattern.get('risk_profile', 'Unknown')}</h5>", unsafe_allow_html=True)
# Pattern description and details
st.markdown(f"**Description:** {pattern['description']}")
# Additional strategy information
if 'strategy' in pattern:
st.markdown(f"**Strategy:** {pattern['strategy']}")
# Time insight
if 'time_insight' in pattern:
st.info(pattern['time_insight'])
# Metrics
metric_cols = st.columns(3)
with metric_cols[0]:
st.markdown(f"**Occurrences:** {pattern['occurrence_count']} instances")
with metric_cols[1]:
st.markdown(f"**Confidence:** {pattern.get('confidence', 0):.2f}")
with metric_cols[2]:
st.markdown(f"**Volume:** {pattern.get('volume_metric', 'N/A')}")
# Display main chart first
if 'charts' in pattern and 'main' in pattern['charts']:
st.plotly_chart(pattern['charts']['main'], use_container_width=True)
elif 'chart_data' in pattern and pattern['chart_data'] is not None: # Fallback for old format
st.plotly_chart(pattern['chart_data'], use_container_width=True)
# Create two columns for additional charts
if 'charts' in pattern and len(pattern['charts']) > 1:
charts_col1, charts_col2 = st.columns(2)
# Hourly distribution chart
if 'hourly_distribution' in pattern['charts']:
with charts_col1:
st.plotly_chart(pattern['charts']['hourly_distribution'], use_container_width=True)
# Value distribution chart
if 'value_distribution' in pattern['charts']:
with charts_col2:
st.plotly_chart(pattern['charts']['value_distribution'], use_container_width=True)
# Advanced metrics in expander
if 'metrics' in pattern and pattern['metrics']:
with st.expander("Detailed Metrics"):
metrics_table = []
for k, v in pattern['metrics'].items():
if v is not None:
if isinstance(v, float):
metrics_table.append([k.replace('_', ' ').title(), f"{v:.4f}"])
else:
metrics_table.append([k.replace('_', ' ').title(), v])
if metrics_table:
st.table(pd.DataFrame(metrics_table, columns=["Metric", "Value"]))
# Display example transactions
if 'examples' in pattern and not pattern['examples'].empty:
with st.expander("Example Transactions"):
# Format the dataframe for better display
display_df = pattern['examples'].copy()
# Convert timestamp to readable format if needed
if 'timeStamp' in display_df.columns and not pd.api.types.is_datetime64_any_dtype(display_df['timeStamp']):
display_df['timeStamp'] = pd.to_datetime(display_df['timeStamp'], unit='s')
st.dataframe(display_df, use_container_width=True)
st.markdown("---")
else:
st.info("No significant trading patterns detected. Try expanding the date range or adding more addresses.")
else:
st.info("Track transactions to analyze trading patterns")
with tab3:
st.header("Price Impact Analysis")
if enable_price_impact and track_button and wallet_addresses:
with st.spinner("Analyzing price impact..."):
# Function to analyze price impact
def analyze_price_impact(wallets, start_date, end_date, lookback_minutes, lookahead_minutes):
# Direct analysis
transactions_df = arbiscan_client.fetch_whale_transactions(addresses=wallets, max_pages=5)
# Get token from first transaction
if not transactions_df.empty:
token_symbol = transactions_df.iloc[0].get('tokenSymbol', 'ETH')
# For each transaction, get price impact
price_impacts = {}
progress_bar = st.progress(0)
for idx, row in transactions_df.iterrows():
progress = int((idx + 1) / len(transactions_df) * 100)
progress_bar.progress(progress, text=f"Analyzing transaction {idx+1} of {len(transactions_df)}")
if 'timeStamp' in row:
try:
tx_time = datetime.fromtimestamp(int(row['timeStamp']))
impact_data = gemini_client.get_price_impact(
symbol=f"{token_symbol}USD",
transaction_time=tx_time,
lookback_minutes=lookback_minutes,
lookahead_minutes=lookahead_minutes
)
price_impacts[row['hash']] = impact_data
except Exception as e:
st.warning(f"Could not get price data for transaction: {str(e)}")
progress_bar.empty()
if price_impacts:
return data_processor.analyze_price_impact(transactions_df, price_impacts)
# Create an empty chart for the default case
empty_fig = go.Figure()
empty_fig.update_layout(
title="No Price Impact Data Available",
xaxis_title="Time",
yaxis_title="Price Impact (%)",
height=400,
template="plotly_white"
)
empty_fig.add_annotation(
text="No transactions found with price impact data",
showarrow=False,
font=dict(size=14)
)
return {
"avg_impact_pct": 0,
"max_impact_pct": 0,
"min_impact_pct": 0,
"significant_moves_count": 0,
"total_transactions": 0,
"transactions_with_impact": pd.DataFrame(),
"charts": {
"main_chart": empty_fig,
"impact_distribution": empty_fig,
"cumulative_impact": empty_fig,
"hourly_impact": empty_fig
},
"insights": [],
"impact_summary": "No price impact data available"
}
wallet_list = [addr.strip() for addr in wallet_addresses.split("\n") if addr.strip()]
# Use cached data or fetch new if not available
if st.session_state.price_impact_data is None or track_button:
with st.spinner("Analyzing price impact..."):
impact_analysis = analyze_price_impact(
wallets=wallet_list,
start_date=start_date,
end_date=end_date,
lookback_minutes=lookback_minutes,
lookahead_minutes=lookahead_minutes
)
# Store in session state
st.session_state.price_impact_data = impact_analysis
else:
impact_analysis = st.session_state.price_impact_data
if impact_analysis:
# Display impact summary
if 'impact_summary' in impact_analysis:
st.info(impact_analysis['impact_summary'])
# Summary metrics in two rows
metrics_row1 = st.columns(4)
with metrics_row1[0]:
st.metric("Avg. Price Impact (%)", f"{impact_analysis.get('avg_impact_pct', 0):.2f}%")
with metrics_row1[1]:
st.metric("Max Impact (%)", f"{impact_analysis.get('max_impact_pct', 0):.2f}%")
with metrics_row1[2]:
st.metric("Min Impact (%)", f"{impact_analysis.get('min_impact_pct', 0):.2f}%")
with metrics_row1[3]:
st.metric("Std Dev (%)", f"{impact_analysis.get('std_impact_pct', 0):.2f}%")
metrics_row2 = st.columns(4)
with metrics_row2[0]:
st.metric("Significant Moves", impact_analysis.get('significant_moves_count', 0))
with metrics_row2[1]:
st.metric("High Impact Moves", impact_analysis.get('high_impact_moves_count', 0))
with metrics_row2[2]:
st.metric("Positive/Negative", f"{impact_analysis.get('positive_impacts_count', 0)}/{impact_analysis.get('negative_impacts_count', 0)}")
with metrics_row2[3]:
st.metric("Total Transactions", impact_analysis.get('total_transactions', 0))
# Display insights if available
if 'insights' in impact_analysis and impact_analysis['insights']:
st.subheader("Key Insights")
for insight in impact_analysis['insights']:
st.markdown(f"**{insight['title']}**: {insight['description']}")
# Display the main chart
if 'charts' in impact_analysis and 'main_chart' in impact_analysis['charts']:
st.subheader("Price Impact Over Time")
st.plotly_chart(impact_analysis['charts']['main_chart'], use_container_width=True)
# Create two columns for secondary charts
col1, col2 = st.columns(2)
# Distribution chart
if 'charts' in impact_analysis and 'impact_distribution' in impact_analysis['charts']:
with col1:
st.plotly_chart(impact_analysis['charts']['impact_distribution'], use_container_width=True)
# Cumulative impact chart
if 'charts' in impact_analysis and 'cumulative_impact' in impact_analysis['charts']:
with col2:
st.plotly_chart(impact_analysis['charts']['cumulative_impact'], use_container_width=True)
# Hourly impact chart
if 'charts' in impact_analysis and 'hourly_impact' in impact_analysis['charts']:
st.plotly_chart(impact_analysis['charts']['hourly_impact'], use_container_width=True)
# Detailed transactions with impact
if not impact_analysis['transactions_with_impact'].empty:
st.subheader("Transactions with Price Impact")
# Convert numeric columns to have 2 decimal places for better display
display_df = impact_analysis['transactions_with_impact'].copy()
for col in ['impact_pct', 'pre_price', 'post_price', 'cumulative_impact']:
if col in display_df.columns:
display_df[col] = display_df[col].apply(lambda x: f"{float(x):.2f}%" if pd.notnull(x) else "N/A")
st.dataframe(display_df, use_container_width=True)
else:
st.info("No transaction-specific price impact data available")
else:
st.info("No price impact data available for the given parameters")
else:
st.info("Enable Price Impact Analysis and track transactions to see price effects")
with tab4:
st.header("Manipulation Alerts")
if enable_manipulation_detection and detect_button and wallet_addresses:
with st.spinner("Detecting potential manipulation..."):
wallet_list = [addr.strip() for addr in wallet_addresses.split("\n") if addr.strip()]
# Function to detect manipulation
def detect_manipulation(wallets, start_date, end_date, sensitivity):
try:
transactions_df = arbiscan_client.fetch_whale_transactions(addresses=wallets, max_pages=5)
if transactions_df.empty:
st.warning("No transactions found for the specified addresses")
return []
pump_dump = detection.detect_pump_and_dump(transactions_df, sensitivity)
wash_trades = detection.detect_wash_trading(transactions_df, wallets, sensitivity)
return pump_dump + wash_trades
except Exception as e:
st.error(f"Error detecting manipulation: {str(e)}")
return []
alerts = detect_manipulation(
wallets=wallet_list,
start_date=start_date,
end_date=end_date,
sensitivity=sensitivity
)
if alerts:
for i, alert in enumerate(alerts):
alert_color = "red" if alert['risk_level'] == "High" else "orange" if alert['risk_level'] == "Medium" else "blue"
with st.expander(f" {alert['type']} - Risk: {alert['risk_level']}", expanded=i==0):
st.markdown(f"<h4 style='color:{alert_color}'>{alert['title']}</h4>", unsafe_allow_html=True)
st.write(f"**Description:** {alert['description']}")
st.write(f"**Detection Time:** {alert['detection_time']}")
st.write(f"**Involved Addresses:** {', '.join(alert['addresses'])}")
# Display evidence
if 'evidence' in alert and alert['evidence'] is not None and not (isinstance(alert['evidence'], pd.DataFrame) and alert['evidence'].empty):
st.subheader("Evidence")
try:
evidence_df = alert['evidence']
if isinstance(evidence_df, str):
# Try to convert from JSON string if needed
evidence_df = pd.read_json(evidence_df)
st.dataframe(evidence_df, use_container_width=True)
except Exception as e:
st.error(f"Error displaying evidence: {str(e)}")
# Display chart if available
if 'chart' in alert and alert['chart'] is not None:
try:
st.plotly_chart(alert['chart'], use_container_width=True)
except Exception as e:
st.error(f"Error displaying chart: {str(e)}")
else:
st.success("No manipulation tactics detected for the given parameters")
else:
st.info("Enable Manipulation Detection and click 'Detect Manipulation' to scan for suspicious activity")
with tab5:
st.header("Reports & Visualizations")
# Report type selection
report_type = st.selectbox(
"Select Report Type",
["Transaction Summary", "Pattern Analysis", "Price Impact", "Manipulation Detection", "Complete Analysis"]
)
# Export format
export_format = st.radio(
"Export Format",
["CSV", "PDF", "PNG"],
horizontal=True
)
# Generate report button
if st.button("Generate Report"):
if wallet_addresses:
with st.spinner("Generating report..."):
wallet_list = [addr.strip() for addr in wallet_addresses.split("\n") if addr.strip()]
if CREW_ENABLED and crew_system is not None:
try:
with st.spinner("Generating AI analysis report..."):
# Check if crew_system has llm attribute defined
if not hasattr(crew_system, 'llm') or crew_system.llm is None:
raise ValueError("LLM not initialized in crew system")
report = crew_system.generate_market_manipulation_report(wallet_addresses=wallet_list)
st.markdown(f"## AI Analysis Report")
st.markdown(report['content'])
if 'charts' in report and report['charts']:
for i, chart in enumerate(report['charts']):
st.plotly_chart(chart, use_container_width=True)
except Exception as e:
st.error(f"CrewAI report generation failed: {str(e)}")
st.warning("Using direct analysis instead")
# Fallback to direct analysis
with st.spinner("Generating basic analysis..."):
insights = detection.generate_manipulation_insights(transactions=st.session_state.transactions_data)
st.markdown(f"## Potential Manipulation Insights")
for insight in insights:
st.markdown(f"**{insight['title']}**\n{insight['description']}")
else:
st.error("Failed to generate report: CrewAI is not enabled")
else:
st.error("Please enter wallet addresses to generate a report")
# Footer with instructions
st.markdown("---")
with st.expander("How to Use"):
st.markdown("""
### Typical Workflow
1. **Input wallet addresses** in the sidebar - these are the whale wallets you want to track
2. **Set the minimum threshold** for transaction size (token amount or USD value)
3. **Select time period** for analysis
4. **Click 'Track Transactions'** to see large transfers for these wallets
5. **Enable additional analysis** like pattern recognition or manipulation detection
6. **Export reports** for further analysis or record-keeping
### API Keys
This app requires two API keys to function properly:
- **ARBISCAN_API_KEY** - For accessing Arbitrum blockchain data
- **GEMINI_API_KEY** - For real-time token price data
These should be stored in a `.env` file in the project root.
""")