"""Whale Wallet AI - Streamlit dashboard for market-manipulation detection.

Tracks large ("whale") wallet transactions on Arbitrum via the Arbiscan API,
correlates them with Gemini exchange price data, and surfaces trading
patterns, per-transaction price impact, and potential manipulation alerts
across five tabs. Requires ARBISCAN_API_KEY and GEMINI_API_KEY in a `.env`
file at the project root.
"""

import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import os
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union, Any
from dotenv import load_dotenv

# Configure logging - reduce verbosity and improve performance
logging.basicConfig(
    level=logging.WARNING,  # Only show warnings and errors by default
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


class SuppressRepetitiveErrors(logging.Filter):
    """Logging filter that caps how many times a known noisy error is emitted.

    The Gemini price API logs an error for every non-existent token symbol;
    after ``max_errors`` occurrences the remaining repeats are swallowed.
    """

    def __init__(self):
        super().__init__()
        self.error_counts = {}   # error key -> number of times seen
        self.max_errors = 3      # Show at most 3 instances of each error

    def filter(self, record):
        # Records below WARNING always pass (basicConfig filters them anyway).
        if record.levelno < logging.WARNING:
            return True
        # Suppress repeated Gemini API errors for non-existent tokens
        # after the first few occurrences.
        if 'Error fetching historical prices from Gemini API' in record.getMessage():
            key = 'gemini_api_error'
            self.error_counts[key] = self.error_counts.get(key, 0) + 1
            return self.error_counts[key] <= self.max_errors
        return True


# Apply the filter to the root logger so it affects all module loggers.
logging.getLogger().addFilter(SuppressRepetitiveErrors())

from modules.api_client import ArbiscanClient, GeminiClient
from modules.data_processor import DataProcessor
from modules.visualizer import Visualizer
from modules.detection import ManipulationDetector

# Load environment variables
load_dotenv()

# Set page configuration
st.set_page_config(
    page_title="Whale Wallet AI - Market Manipulation Detection",
    page_icon="🐳",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Add custom CSS
# NOTE(review): the CSS payload here is empty - the original stylesheet
# content appears to have been lost; restore it or remove this call.
st.markdown("""
""", unsafe_allow_html=True)

# Initialize Streamlit session state for persisting data between tab navigation
if 'transactions_data' not in st.session_state:
    st.session_state.transactions_data = pd.DataFrame()
if 'patterns_data' not in st.session_state:
    st.session_state.patterns_data = None
if 'price_impact_data' not in st.session_state:
    st.session_state.price_impact_data = None

# Performance metrics tracking
if 'performance_metrics' not in st.session_state:
    st.session_state.performance_metrics = {
        'api_calls': 0,
        'data_processing_time': 0,
        'visualization_time': 0,
        'last_refresh': None
    }


def track_timing(category: str):
    """Decorator factory: accumulate a function's wall-clock time under
    ``st.session_state.performance_metrics[category]``.

    Args:
        category: Key in the performance_metrics dict to accumulate into.
    """
    def timing_decorator(func):
        def wrapper(*args, **kwargs):
            start_time = time.time()
            result = func(*args, **kwargs)
            elapsed = time.time() - start_time
            if category in st.session_state.performance_metrics:
                st.session_state.performance_metrics[category] += elapsed
            else:
                st.session_state.performance_metrics[category] = elapsed
            return result
        return wrapper
    return timing_decorator


if 'alerts_data' not in st.session_state:
    st.session_state.alerts_data = None

# Initialize API clients
arbiscan_client = ArbiscanClient(os.getenv("ARBISCAN_API_KEY"))
# Set debug mode to False to reduce log output
arbiscan_client.verbose_debug = False
gemini_client = GeminiClient(os.getenv("GEMINI_API_KEY"))

# Initialize data processor and visualizer
data_processor = DataProcessor()
visualizer = Visualizer()

# Apply performance tracking to key instance methods after initialization
original_fetch_whale = arbiscan_client.fetch_whale_transactions
arbiscan_client.fetch_whale_transactions = track_timing('api_calls')(original_fetch_whale)

original_identify_patterns = data_processor.identify_patterns
data_processor.identify_patterns = track_timing('data_processing_time')(original_identify_patterns)

original_analyze_price_impact = data_processor.analyze_price_impact
data_processor.analyze_price_impact = track_timing('data_processing_time')(original_analyze_price_impact)

detection = ManipulationDetector()

# Initialize crew system (for AI-assisted analysis)
try:
    from modules.crew_system import WhaleAnalysisCrewSystem
    crew_system = WhaleAnalysisCrewSystem(arbiscan_client, gemini_client, data_processor)
    CREW_ENABLED = True
    logging.info("CrewAI system loaded successfully")
except Exception as e:
    # FIX: bind crew_system so later `crew_system is not None` checks never
    # hit an unbound name, regardless of evaluation order.
    crew_system = None
    CREW_ENABLED = False
    logging.error(f"Failed to load CrewAI system: {str(e)}")
    st.sidebar.error("CrewAI features are disabled due to an error.")

# Sidebar for inputs
st.sidebar.header("Configuration")

# Wallet tracking section
st.sidebar.subheader("Track Wallets")
wallet_addresses = st.sidebar.text_area(
    "Enter wallet addresses (one per line)",
    placeholder="0x1234abcd...\n0xabcd1234..."
)

threshold_type = st.sidebar.radio(
    "Threshold Type",
    ["Token Amount", "USD Value"]
)

if threshold_type == "Token Amount":
    threshold_value = st.sidebar.number_input("Minimum Token Amount", min_value=0.0, value=1000.0)
    token_symbol = st.sidebar.text_input("Token Symbol", placeholder="ETH")
else:
    threshold_value = st.sidebar.number_input("Minimum USD Value", min_value=0.0, value=100000.0)
    # FIX: token_symbol was previously unbound on this branch, causing a
    # NameError in the Transactions tab when "USD Value" was selected.
    token_symbol = None

# Time period selection
st.sidebar.subheader("Time Period")
time_period = st.sidebar.selectbox(
    "Select Time Period",
    ["Last 24 hours", "Last 7 days", "Last 30 days", "Custom"]
)

if time_period == "Custom":
    start_date = st.sidebar.date_input("Start Date", datetime.now() - timedelta(days=7))
    end_date = st.sidebar.date_input("End Date", datetime.now())
else:
    # Calculate dates based on selection
    end_date = datetime.now()
    if time_period == "Last 24 hours":
        start_date = end_date - timedelta(days=1)
    elif time_period == "Last 7 days":
        start_date = end_date - timedelta(days=7)
    else:  # Last 30 days
        start_date = end_date - timedelta(days=30)

# Manipulation detection settings
st.sidebar.subheader("Manipulation Detection")
enable_manipulation_detection = st.sidebar.toggle("Enable Manipulation Detection", value=True)
if enable_manipulation_detection:
    sensitivity = st.sidebar.select_slider(
        "Detection Sensitivity",
        options=["Low", "Medium", "High"],
        value="Medium"
    )

# Price impact analysis settings
st.sidebar.subheader("Price Impact Analysis")
enable_price_impact = st.sidebar.toggle("Enable Price Impact Analysis", value=True)
if enable_price_impact:
    lookback_minutes = st.sidebar.slider("Lookback (minutes)", 1, 60, 5)
    lookahead_minutes = st.sidebar.slider("Lookahead (minutes)", 1, 60, 5)

# Action buttons
track_button = st.sidebar.button("Track Transactions", type="primary")
pattern_button = st.sidebar.button("Analyze Patterns")
if enable_manipulation_detection:
    detect_button = st.sidebar.button("Detect Manipulation")

# Main content area
tab1, tab2, tab3, tab4, tab5 = st.tabs([
    "Transactions", "Patterns", "Price Impact", "Alerts", "Reports"
])

with tab1:
    st.header("Whale Transactions")

    if track_button and wallet_addresses:
        with st.spinner("Fetching whale transactions..."):
            def track_whale_transactions(wallets, start_date, end_date,
                                         threshold_value, threshold_type,
                                         token_symbol=None):
                """Fetch whale transactions directly from the Arbiscan client.

                Returns a DataFrame (possibly empty) of matching transactions;
                on API failure shows the error in the UI and returns an empty
                DataFrame.
                """
                # Direct API call since CrewAI is temporarily disabled
                try:
                    min_token_amount = None
                    min_usd_value = None
                    if threshold_type == "Token Amount":
                        min_token_amount = threshold_value
                    else:
                        min_usd_value = threshold_value

                    # Cap pagination to prevent excessive/infinite API requests.
                    transactions = arbiscan_client.fetch_whale_transactions(
                        addresses=wallets,
                        min_token_amount=min_token_amount,
                        max_pages=5,
                        min_usd_value=min_usd_value
                    )

                    if transactions.empty:
                        st.warning("No transactions found for the specified addresses")
                    return transactions
                except Exception as e:
                    st.error(f"Error fetching transactions: {str(e)}")
                    return pd.DataFrame()

            wallet_list = [addr.strip() for addr in wallet_addresses.split("\n") if addr.strip()]

            # Use cached data or fetch new if not available
            if st.session_state.transactions_data is None or track_button:
                with st.spinner("Fetching transactions..."):
                    transactions = track_whale_transactions(
                        wallets=wallet_list,
                        start_date=start_date,
                        end_date=end_date,
                        threshold_value=threshold_value,
                        threshold_type=threshold_type,
                        token_symbol=token_symbol
                    )
                    # Store in session state
                    st.session_state.transactions_data = transactions
            else:
                transactions = st.session_state.transactions_data

            if not transactions.empty:
                st.success(f"Found {len(transactions)} transactions matching your criteria")

                # Display transactions
                if len(transactions) > 0:
                    st.dataframe(transactions, use_container_width=True)

                    # Add download button
                    csv = transactions.to_csv(index=False).encode('utf-8')
                    st.download_button(
                        "Download Transactions CSV",
                        csv,
                        "whale_transactions.csv",
                        "text/csv",
                        key='download-csv'
                    )

                    # Volume by day chart
                    st.subheader("Transaction Volume by Day")
                    try:
                        st.plotly_chart(visualizer.plot_volume_by_day(transactions), use_container_width=True)
                    except Exception as e:
                        st.error(f"Error generating volume chart: {str(e)}")

                    # Transaction flow visualization
                    st.subheader("Transaction Flow")
                    try:
                        flow_chart = visualizer.plot_transaction_flow(transactions)
                        st.plotly_chart(flow_chart, use_container_width=True)
                    except Exception as e:
                        st.error(f"Error generating flow chart: {str(e)}")
            else:
                st.warning("No transactions found matching your criteria. Try adjusting the parameters.")
    else:
        st.info("Enter wallet addresses and click 'Track Transactions' to view whale activity")

with tab2:
    st.header("Trading Patterns")

    if track_button and wallet_addresses:
        with st.spinner("Analyzing trading patterns..."):
            def analyze_trading_patterns(wallets, start_date, end_date):
                """Fetch transactions and run pattern identification.

                Returns the list of pattern dicts produced by the data
                processor, or an empty list when no data / on error.
                """
                # Direct analysis
                try:
                    transactions_df = arbiscan_client.fetch_whale_transactions(addresses=wallets, max_pages=5)
                    if transactions_df.empty:
                        st.warning("No transactions found for the specified addresses")
                        return []
                    return data_processor.identify_patterns(transactions_df)
                except Exception as e:
                    st.error(f"Error analyzing trading patterns: {str(e)}")
                    return []

            wallet_list = [addr.strip() for addr in wallet_addresses.split("\n") if addr.strip()]

            # Use cached data or fetch new if not available
            if st.session_state.patterns_data is None or track_button:
                with st.spinner("Analyzing trading patterns..."):
                    patterns = analyze_trading_patterns(
                        wallets=wallet_list,
                        start_date=start_date,
                        end_date=end_date
                    )
                    # Store in session state
                    st.session_state.patterns_data = patterns
            else:
                patterns = st.session_state.patterns_data

            if patterns:
                for i, pattern in enumerate(patterns):
                    pattern_card = st.container()
                    with pattern_card:
                        # Pattern header with name and risk profile
                        header_cols = st.columns([3, 1])
                        with header_cols[0]:
                            st.subheader(f"Pattern {i+1}: {pattern['name']}")
                        with header_cols[1]:
                            risk_color = "green"
                            if pattern.get('risk_profile') == "Medium":
                                risk_color = "orange"
                            elif pattern.get('risk_profile') in ["High", "Very High"]:
                                risk_color = "red"
                            # NOTE(review): the original HTML markup was lost in
                            # transit; reconstructed as a colored span, which is
                            # what the computed risk_color and unsafe_allow_html
                            # imply. Confirm against the intended styling.
                            st.markdown(
                                f"<span style='color: {risk_color};'>"
                                f"Risk: {pattern.get('risk_profile', 'Unknown')}"
                                f"</span>",
                                unsafe_allow_html=True
                            )

                        # Pattern description and details
                        st.markdown(f"**Description:** {pattern['description']}")

                        # Additional strategy information
                        if 'strategy' in pattern:
                            st.markdown(f"**Strategy:** {pattern['strategy']}")

                        # Time insight
                        if 'time_insight' in pattern:
                            st.info(pattern['time_insight'])

                        # Metrics
                        metric_cols = st.columns(3)
                        with metric_cols[0]:
                            st.markdown(f"**Occurrences:** {pattern['occurrence_count']} instances")
                        with metric_cols[1]:
                            st.markdown(f"**Confidence:** {pattern.get('confidence', 0):.2f}")
                        with metric_cols[2]:
                            st.markdown(f"**Volume:** {pattern.get('volume_metric', 'N/A')}")

                        # Display main chart first
                        if 'charts' in pattern and 'main' in pattern['charts']:
                            st.plotly_chart(pattern['charts']['main'], use_container_width=True)
                        elif 'chart_data' in pattern and pattern['chart_data'] is not None:
                            # Fallback for old format
                            st.plotly_chart(pattern['chart_data'], use_container_width=True)

                        # Create two columns for additional charts
                        if 'charts' in pattern and len(pattern['charts']) > 1:
                            charts_col1, charts_col2 = st.columns(2)

                            # Hourly distribution chart
                            if 'hourly_distribution' in pattern['charts']:
                                with charts_col1:
                                    st.plotly_chart(pattern['charts']['hourly_distribution'], use_container_width=True)

                            # Value distribution chart
                            if 'value_distribution' in pattern['charts']:
                                with charts_col2:
                                    st.plotly_chart(pattern['charts']['value_distribution'], use_container_width=True)

                        # Advanced metrics in expander
                        if 'metrics' in pattern and pattern['metrics']:
                            with st.expander("Detailed Metrics"):
                                metrics_table = []
                                for k, v in pattern['metrics'].items():
                                    if v is not None:
                                        if isinstance(v, float):
                                            metrics_table.append([k.replace('_', ' ').title(), f"{v:.4f}"])
                                        else:
                                            metrics_table.append([k.replace('_', ' ').title(), v])
                                if metrics_table:
                                    st.table(pd.DataFrame(metrics_table, columns=["Metric", "Value"]))

                        # Display example transactions
                        if 'examples' in pattern and not pattern['examples'].empty:
                            with st.expander("Example Transactions"):
                                # Format the dataframe for better display
                                display_df = pattern['examples'].copy()
                                # Convert timestamp to readable format if needed
                                if 'timeStamp' in display_df.columns and not pd.api.types.is_datetime64_any_dtype(display_df['timeStamp']):
                                    display_df['timeStamp'] = pd.to_datetime(display_df['timeStamp'], unit='s')
                                st.dataframe(display_df, use_container_width=True)

                        st.markdown("---")
            else:
                st.info("No significant trading patterns detected. Try expanding the date range or adding more addresses.")
    else:
        st.info("Track transactions to analyze trading patterns")

with tab3:
    st.header("Price Impact Analysis")

    if enable_price_impact and track_button and wallet_addresses:
        with st.spinner("Analyzing price impact..."):
            def analyze_price_impact(wallets, start_date, end_date,
                                     lookback_minutes, lookahead_minutes):
                """Fetch transactions and compute per-transaction price impact
                via the Gemini price API.

                Returns the data processor's impact-analysis dict when price
                data is available, otherwise a default dict with empty charts.
                """
                # Direct analysis
                transactions_df = arbiscan_client.fetch_whale_transactions(addresses=wallets, max_pages=5)

                if not transactions_df.empty:
                    # Get token from first transaction
                    token_symbol = transactions_df.iloc[0].get('tokenSymbol', 'ETH')

                    # For each transaction, get price impact
                    price_impacts = {}
                    progress_bar = st.progress(0)
                    for idx, row in transactions_df.iterrows():
                        progress = int((idx + 1) / len(transactions_df) * 100)
                        progress_bar.progress(progress, text=f"Analyzing transaction {idx+1} of {len(transactions_df)}")
                        if 'timeStamp' in row:
                            try:
                                tx_time = datetime.fromtimestamp(int(row['timeStamp']))
                                impact_data = gemini_client.get_price_impact(
                                    symbol=f"{token_symbol}USD",
                                    transaction_time=tx_time,
                                    lookback_minutes=lookback_minutes,
                                    lookahead_minutes=lookahead_minutes
                                )
                                price_impacts[row['hash']] = impact_data
                            except Exception as e:
                                st.warning(f"Could not get price data for transaction: {str(e)}")
                    progress_bar.empty()

                    if price_impacts:
                        return data_processor.analyze_price_impact(transactions_df, price_impacts)

                # Create an empty chart for the default case (no transactions
                # or no usable price data).
                empty_fig = go.Figure()
                empty_fig.update_layout(
                    title="No Price Impact Data Available",
                    xaxis_title="Time",
                    yaxis_title="Price Impact (%)",
                    height=400,
                    template="plotly_white"
                )
                empty_fig.add_annotation(
                    text="No transactions found with price impact data",
                    showarrow=False,
                    font=dict(size=14)
                )
                return {
                    "avg_impact_pct": 0,
                    "max_impact_pct": 0,
                    "min_impact_pct": 0,
                    "significant_moves_count": 0,
                    "total_transactions": 0,
                    "transactions_with_impact": pd.DataFrame(),
                    "charts": {
                        "main_chart": empty_fig,
                        "impact_distribution": empty_fig,
                        "cumulative_impact": empty_fig,
                        "hourly_impact": empty_fig
                    },
                    "insights": [],
                    "impact_summary": "No price impact data available"
                }

            wallet_list = [addr.strip() for addr in wallet_addresses.split("\n") if addr.strip()]

            # Use cached data or fetch new if not available
            if st.session_state.price_impact_data is None or track_button:
                with st.spinner("Analyzing price impact..."):
                    impact_analysis = analyze_price_impact(
                        wallets=wallet_list,
                        start_date=start_date,
                        end_date=end_date,
                        lookback_minutes=lookback_minutes,
                        lookahead_minutes=lookahead_minutes
                    )
                    # Store in session state
                    st.session_state.price_impact_data = impact_analysis
            else:
                impact_analysis = st.session_state.price_impact_data

            if impact_analysis:
                # Display impact summary
                if 'impact_summary' in impact_analysis:
                    st.info(impact_analysis['impact_summary'])

                # Summary metrics in two rows
                metrics_row1 = st.columns(4)
                with metrics_row1[0]:
                    st.metric("Avg. Price Impact (%)", f"{impact_analysis.get('avg_impact_pct', 0):.2f}%")
                with metrics_row1[1]:
                    st.metric("Max Impact (%)", f"{impact_analysis.get('max_impact_pct', 0):.2f}%")
                with metrics_row1[2]:
                    st.metric("Min Impact (%)", f"{impact_analysis.get('min_impact_pct', 0):.2f}%")
                with metrics_row1[3]:
                    st.metric("Std Dev (%)", f"{impact_analysis.get('std_impact_pct', 0):.2f}%")

                metrics_row2 = st.columns(4)
                with metrics_row2[0]:
                    st.metric("Significant Moves", impact_analysis.get('significant_moves_count', 0))
                with metrics_row2[1]:
                    st.metric("High Impact Moves", impact_analysis.get('high_impact_moves_count', 0))
                with metrics_row2[2]:
                    st.metric("Positive/Negative", f"{impact_analysis.get('positive_impacts_count', 0)}/{impact_analysis.get('negative_impacts_count', 0)}")
                with metrics_row2[3]:
                    st.metric("Total Transactions", impact_analysis.get('total_transactions', 0))

                # Display insights if available
                if 'insights' in impact_analysis and impact_analysis['insights']:
                    st.subheader("Key Insights")
                    for insight in impact_analysis['insights']:
                        st.markdown(f"**{insight['title']}**: {insight['description']}")

                # Display the main chart
                if 'charts' in impact_analysis and 'main_chart' in impact_analysis['charts']:
                    st.subheader("Price Impact Over Time")
                    st.plotly_chart(impact_analysis['charts']['main_chart'], use_container_width=True)

                # Create two columns for secondary charts
                col1, col2 = st.columns(2)

                # Distribution chart
                if 'charts' in impact_analysis and 'impact_distribution' in impact_analysis['charts']:
                    with col1:
                        st.plotly_chart(impact_analysis['charts']['impact_distribution'], use_container_width=True)

                # Cumulative impact chart
                if 'charts' in impact_analysis and 'cumulative_impact' in impact_analysis['charts']:
                    with col2:
                        st.plotly_chart(impact_analysis['charts']['cumulative_impact'], use_container_width=True)

                # Hourly impact chart
                if 'charts' in impact_analysis and 'hourly_impact' in impact_analysis['charts']:
                    st.plotly_chart(impact_analysis['charts']['hourly_impact'], use_container_width=True)

                # Detailed transactions with impact
                if not impact_analysis['transactions_with_impact'].empty:
                    st.subheader("Transactions with Price Impact")
                    # Convert numeric columns to have 2 decimal places for better display
                    display_df = impact_analysis['transactions_with_impact'].copy()
                    for col in ['impact_pct', 'pre_price', 'post_price', 'cumulative_impact']:
                        if col in display_df.columns:
                            display_df[col] = display_df[col].apply(lambda x: f"{float(x):.2f}%" if pd.notnull(x) else "N/A")
                    st.dataframe(display_df, use_container_width=True)
                else:
                    st.info("No transaction-specific price impact data available")
            else:
                st.info("No price impact data available for the given parameters")
    else:
        st.info("Enable Price Impact Analysis and track transactions to see price effects")

with tab4:
    st.header("Manipulation Alerts")

    if enable_manipulation_detection and detect_button and wallet_addresses:
        with st.spinner("Detecting potential manipulation..."):
            wallet_list = [addr.strip() for addr in wallet_addresses.split("\n") if addr.strip()]

            def detect_manipulation(wallets, start_date, end_date, sensitivity):
                """Run pump-and-dump and wash-trading detectors over the
                wallets' recent transactions; returns a combined alert list."""
                try:
                    transactions_df = arbiscan_client.fetch_whale_transactions(addresses=wallets, max_pages=5)
                    if transactions_df.empty:
                        st.warning("No transactions found for the specified addresses")
                        return []
                    pump_dump = detection.detect_pump_and_dump(transactions_df, sensitivity)
                    wash_trades = detection.detect_wash_trading(transactions_df, wallets, sensitivity)
                    return pump_dump + wash_trades
                except Exception as e:
                    st.error(f"Error detecting manipulation: {str(e)}")
                    return []

            alerts = detect_manipulation(
                wallets=wallet_list,
                start_date=start_date,
                end_date=end_date,
                sensitivity=sensitivity
            )

            if alerts:
                for i, alert in enumerate(alerts):
                    alert_color = "red" if alert['risk_level'] == "High" else "orange" if alert['risk_level'] == "Medium" else "blue"
                    with st.expander(f" {alert['type']} - Risk: {alert['risk_level']}", expanded=i == 0):
                        # NOTE(review): the original HTML markup was lost in
                        # transit; reconstructed as a colored heading, which is
                        # what the computed alert_color and unsafe_allow_html
                        # imply. Confirm against the intended styling.
                        st.markdown(
                            f"<h4 style='color: {alert_color};'>{alert['title']}</h4>",
                            unsafe_allow_html=True
                        )
                        st.write(f"**Description:** {alert['description']}")
                        st.write(f"**Detection Time:** {alert['detection_time']}")
                        st.write(f"**Involved Addresses:** {', '.join(alert['addresses'])}")

                        # Display evidence
                        if 'evidence' in alert and alert['evidence'] is not None and not (isinstance(alert['evidence'], pd.DataFrame) and alert['evidence'].empty):
                            st.subheader("Evidence")
                            try:
                                evidence_df = alert['evidence']
                                if isinstance(evidence_df, str):
                                    # Try to convert from JSON string if needed
                                    evidence_df = pd.read_json(evidence_df)
                                st.dataframe(evidence_df, use_container_width=True)
                            except Exception as e:
                                st.error(f"Error displaying evidence: {str(e)}")

                        # Display chart if available
                        if 'chart' in alert and alert['chart'] is not None:
                            try:
                                st.plotly_chart(alert['chart'], use_container_width=True)
                            except Exception as e:
                                st.error(f"Error displaying chart: {str(e)}")
            else:
                st.success("No manipulation tactics detected for the given parameters")
    else:
        st.info("Enable Manipulation Detection and click 'Detect Manipulation' to scan for suspicious activity")

with tab5:
    st.header("Reports & Visualizations")

    # Report type selection
    report_type = st.selectbox(
        "Select Report Type",
        ["Transaction Summary", "Pattern Analysis", "Price Impact", "Manipulation Detection", "Complete Analysis"]
    )

    # Export format
    export_format = st.radio(
        "Export Format",
        ["CSV", "PDF", "PNG"],
        horizontal=True
    )

    # Generate report button
    if st.button("Generate Report"):
        if wallet_addresses:
            with st.spinner("Generating report..."):
                wallet_list = [addr.strip() for addr in wallet_addresses.split("\n") if addr.strip()]

                if CREW_ENABLED and crew_system is not None:
                    try:
                        with st.spinner("Generating AI analysis report..."):
                            # Check if crew_system has llm attribute defined
                            if not hasattr(crew_system, 'llm') or crew_system.llm is None:
                                raise ValueError("LLM not initialized in crew system")
                            report = crew_system.generate_market_manipulation_report(wallet_addresses=wallet_list)
                            st.markdown("## AI Analysis Report")
                            st.markdown(report['content'])
                            if 'charts' in report and report['charts']:
                                for i, chart in enumerate(report['charts']):
                                    st.plotly_chart(chart, use_container_width=True)
                    except Exception as e:
                        st.error(f"CrewAI report generation failed: {str(e)}")
                        st.warning("Using direct analysis instead")
                        # Fallback to direct analysis
                        with st.spinner("Generating basic analysis..."):
                            insights = detection.generate_manipulation_insights(transactions=st.session_state.transactions_data)
                            st.markdown("## Potential Manipulation Insights")
                            for insight in insights:
                                st.markdown(f"**{insight['title']}**\n{insight['description']}")
                else:
                    st.error("Failed to generate report: CrewAI is not enabled")
        else:
            st.error("Please enter wallet addresses to generate a report")

# Footer with instructions
st.markdown("---")
with st.expander("How to Use"):
    st.markdown("""
    ### Typical Workflow
    1. **Input wallet addresses** in the sidebar - these are the whale wallets you want to track
    2. **Set the minimum threshold** for transaction size (token amount or USD value)
    3. **Select time period** for analysis
    4. **Click 'Track Transactions'** to see large transfers for these wallets
    5. **Enable additional analysis** like pattern recognition or manipulation detection
    6. **Export reports** for further analysis or record-keeping

    ### API Keys
    This app requires two API keys to function properly:
    - **ARBISCAN_API_KEY** - For accessing Arbitrum blockchain data
    - **GEMINI_API_KEY** - For real-time token price data

    These should be stored in a `.env` file in the project root.
    """)