import pandas as pd import numpy as np import ccxt import time import os import csv # Import csv module for logging import traceback # Import traceback for detailed error logging from datetime import datetime, timedelta import warnings import plotly.graph_objects as go import plotly.colors as pcolors import gradio as gr # Import necessary TA indicators (Existing + New) from ta.trend import MACD, ADXIndicator, IchimokuIndicator, VortexIndicator from ta.momentum import RSIIndicator, StochasticOscillator, AwesomeOscillatorIndicator, WilliamsRIndicator from ta.volume import MFIIndicator, OnBalanceVolumeIndicator, ChaikinMoneyFlowIndicator, VolumeWeightedAveragePrice from ta.volatility import AverageTrueRange, BollingerBands # Suppress specific warnings warnings.filterwarnings('ignore', category=RuntimeWarning) warnings.filterwarnings('ignore', category=FutureWarning) warnings.filterwarnings('ignore', category=UserWarning) # Ignore some TA lib warnings if needed # --- Configuration --- DEFAULT_EXCHANGE_ID = 'mexc' # Changed default as requested context seemed to imply binance API issues DEFAULT_TOP_N_COINS = 30 # Reduced default due to increased backtest history DEFAULT_TIMEFRAMES = ['1m', '5m', '15m', '30m', '1h', '4h'] # Example Timeframes DEFAULT_MIN_CONFIRMATION = 0.75 # Used for Zone finding (Average Score) # Increased limits for longer backtesting LIMIT_PER_TIMEFRAME = 1050 # Needs to be >= BACKTEST_HISTORY_CANDLES + indicator lookbacks (~50) BACKTEST_HISTORY_CANDLES = 1000 # Increased backtest candle count # --- Trade Parameters --- ATR_SL_MULTIPLIER = 1.5 ATR_TP1_MULTIPLIER = 1.0 ATR_TP2_MULTIPLIER = 2.0 LEVERAGES = [20, 50] # For display/estimation only SIMULATED_FEE_PERCENT = 0.06 # Approximate futures fee per side (entry/exit = *2) BACKTEST_RESULTS_FILE = 'backtest_summary_enhanced.csv' SIGNAL_LOG_FILE = 'realtime_signal_log.csv' # <<< New: CSV file for logging signals TIMEFRAME_ORDER_MAP = { '1m': 1, '3m': 2, '5m': 3, '15m': 4, '30m': 5, '1h': 6, '2h': 7, '4h': 8, '6h': 9, '8h': 10, '12h': 11, '1d': 12, '3d': 13, '1w': 14, '1M': 15 } # TIMEFRAME_WEIGHTS not actively used in current logic, kept for potential future use. # --- CSV Signal Logging Function --- def log_signal_to_csv(signal_info): """Appends signal information to the CSV log file.""" file_exists = os.path.isfile(SIGNAL_LOG_FILE) fieldnames = [ 'LogTimestamp', 'SignalCandleTime', 'Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2', 'Status', #'RSI', 'MACD_Diff', 'ADX' # Optional: Add key indicator values ] try: with open(SIGNAL_LOG_FILE, 'a', newline='', encoding='utf-8') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) if not file_exists: writer.writeheader() # Write header only if file is new writer.writerow(signal_info) except IOError as e: print(f"Error: Could not write to CSV log file {SIGNAL_LOG_FILE}: {e}") except Exception as e: print(f"Error logging signal to CSV: {e}\n{traceback.format_exc()}") # --- Crypto Analysis Class --- class CryptoTrendIndicator: def __init__(self, exchange_id, top_coins, selected_timeframes): self.exchange_id = exchange_id self.top_coins = top_coins self.requested_timeframes = selected_timeframes self.exchange = None self.valid_timeframes = [] # Stores detailed results {symbol: {tf: {signals: {}, values: {}, trade_params: {}, direction: int}}} self.analysis_results = {} self.backtest_results = [] # List of dicts for backtest summary self.heatmap_df = pd.DataFrame() self.active_signals_df = pd.DataFrame() # DataFrame for active signals table self.heatmap_details = {} # Stores dict for hover {Coin: {tf: {Ind: Val,...}}} self._initialize_exchange() def _initialize_exchange(self): """Initialize the ccxt exchange instance.""" try: # Force spot market for analysis consistency self.exchange = getattr(ccxt, self.exchange_id)({ 'enableRateLimit': True, 'options': {'defaultType': 'spot'} # Use SPOT for fetching data }) # Increase timeout for potentially longer data fetches self.exchange.timeout = 30000 # 30 seconds self.exchange.load_markets(reload=True) print(f"Exchange {self.exchange_id} initialized (using SPOT markets for data).") self._validate_timeframes() except AttributeError: raise ValueError(f"Error: Exchange '{self.exchange_id}' not found or supported by ccxt.") except ccxt.AuthenticationError as e: raise ValueError(f"Authentication Error for {self.exchange_id}: {e}") except ccxt.ExchangeError as e: raise ValueError(f"Exchange Error initializing {self.exchange_id}: {e}") except Exception as e: raise ValueError(f"Unexpected error initializing exchange: {e}\n{traceback.format_exc()}") def _validate_timeframes(self): """Filter selected timeframes against those supported by the exchange.""" if not self.exchange or not self.exchange.timeframes: print(f"Warning: Could not get timeframes from {self.exchange_id}. Cannot validate.") # Attempt to use requested timeframes, hoping they are valid self.valid_timeframes = sorted( self.requested_timeframes, key=lambda tf: TIMEFRAME_ORDER_MAP.get(tf, 99) ) print(f"Proceeding with requested timeframes (validation skipped): {self.valid_timeframes}") return supported_tfs = self.exchange.timeframes self.valid_timeframes = sorted( [tf for tf in self.requested_timeframes if tf in supported_tfs], key=lambda tf: TIMEFRAME_ORDER_MAP.get(tf, 99) ) print(f"Supported timeframes for analysis: {self.valid_timeframes}") if len(self.valid_timeframes) != len(self.requested_timeframes): skipped = set(self.requested_timeframes) - set(self.valid_timeframes) print(f"Warning: Skipped unsupported timeframes for {self.exchange_id}: {', '.join(skipped)}") if not self.valid_timeframes: print(f"Warning: No valid timeframes selected or supported by {self.exchange_id}.") def fetch_top_coins(self): """Fetch the top coins by USDT volume from the exchange (spot only)""" if not self.exchange: return [], "Exchange not initialized" # --- Logic unchanged --- try: tickers = self.exchange.fetch_tickers() usdt_pairs = {} # Stricter filtering for spot, non-leveraged, common pairs for symbol, data in tickers.items(): try: market = self.exchange.market(symbol) if (symbol.endswith('/USDT') and data is not None and market is not None and market.get('spot', False) and # Explicitly check for spot market.get('active', True) and not market.get('leveraged', False) and # Exclude leveraged data.get('quoteVolume') is not None and data['quoteVolume'] > 10000 and # Example: Filter low volume data.get('symbol') is not None and # Additional filters for common leveraged/problematic tokens 'UP/' not in symbol and 'DOWN/' not in symbol and 'BULL/' not in symbol and 'BEAR/' not in symbol and '3L/' not in symbol and '3S/' not in symbol and '5L/' not in symbol and '5S/' not in symbol ): usdt_pairs[symbol] = data except ccxt.BadSymbol: continue # Skip symbols that ccxt can't parse market data for except Exception as e_inner: # print(f"Minor error processing ticker {symbol}: {e_inner}") # Log minor errors continue if not usdt_pairs: return [], f"No suitable USDT spot pairs found on {self.exchange_id} (check filters/volume)." sorted_pairs = sorted( usdt_pairs.items(), key=lambda x: x[1]['quoteVolume'], reverse=True ) # Fetch slightly more initially to account for potential loading errors fetch_limit = min(len(sorted_pairs), self.top_coins + 10) # Fetch slightly more top_symbols_initial = [pair[0] for pair in sorted_pairs[:fetch_limit]] # Filter again to ensure markets are loadable and active final_symbols = [] count = 0 print(f"Validating {len(top_symbols_initial)} potential symbols...") for s in top_symbols_initial: if count >= self.top_coins: break try: mkt = self.exchange.market(s) # Check if market data is valid & active if mkt and mkt.get('active', True): final_symbols.append(s) count += 1 except ccxt.BadSymbol: pass except Exception as e: print(f"Market {s} skipped during validation due to error: {e}") msg = f"Fetched and validated top {len(final_symbols)} USDT spot pairs by volume from {self.exchange_id}." print(msg) if not final_symbols: msg += " (Warning: Result list is empty)" return final_symbols, msg except (ccxt.NetworkError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as e: msg = f"Network/Timeout Error fetching tickers from {self.exchange_id}: {e}." print(msg) return [], msg except ccxt.ExchangeError as e: msg = f"Exchange Error fetching tickers from {self.exchange_id}: {e}" print(msg) return [], msg except Exception as e: msg = f"An unexpected error occurred fetching top coins: {e}\n{traceback.format_exc()}" print(msg) return [], msg def fetch_ohlcv_data(self, symbol, timeframe, limit=LIMIT_PER_TIMEFRAME): """Fetches OHLCV data with retry mechanism.""" if not self.exchange: return [], "Exchange not initialized" max_retries = 3 retry_delay = 5 # seconds for attempt in range(max_retries): try: # print(f"Fetching {limit} candles for {symbol} {timeframe} (Attempt {attempt+1})...") # *** Fetch as many as limit allows *** # CCXT handles the max limit per request internally, # this 'limit' param tells it the total number desired. # If limit > exchange max, ccxt might fetch multiple times if supported, # or just return the max allowed per call. We rely on ccxt's behavior here. ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) if not ohlcv: # If empty list is returned, treat as insufficient return [], f"No data returned for {symbol} [{timeframe}]" elif len(ohlcv) < 100: # Need substantial data for long backtest & indicators return [], f"Insufficient data ({len(ohlcv)}) for {symbol} [{timeframe}] (Need >100)" elif len(ohlcv) < BACKTEST_HISTORY_CANDLES + 50: print(f"Warning: Fetched {len(ohlcv)} candles for {symbol} [{timeframe}], less than ideal ({BACKTEST_HISTORY_CANDLES + 50}) for full backtest + lookback.") # Proceed anyway, backtest might be shorter than intended # Successfully fetched enough data print(f"Fetched {len(ohlcv)} candles for {symbol} [{timeframe}]") return ohlcv, None except (ccxt.NetworkError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as e: print(f"Network error fetching {symbol} [{timeframe}] (Attempt {attempt+1}): {e}. Retrying in {retry_delay}s...") if attempt == max_retries - 1: return [], f"Network error {symbol} [{timeframe}] after {max_retries} attempts: {e}" time.sleep(retry_delay + attempt * 2) # Incremental backoff except ccxt.RateLimitExceeded as e: print(f"Rate limit hit fetching {symbol} [{timeframe}]. Waiting longer...") time.sleep(self.exchange.rateLimit / 1000 * 5 if self.exchange.rateLimit else 60) # Wait longer for rate limits if attempt == max_retries - 1: return [], f"Rate limit exceeded for {symbol} [{timeframe}] after retries: {e}" # Continue to next attempt after waiting except ccxt.BadSymbol: return [], f"Invalid symbol {symbol}" except ccxt.ExchangeError as e: print(f"Exchange error fetching {symbol} [{timeframe}]: {e}") # Handle specific errors if needed, e.g., timeframe not available for symbol if 'timeframe not available' in str(e).lower(): return [], f"Timeframe {timeframe} not supported for {symbol} on {self.exchange_id}" return [], f"Exchange error {symbol} [{timeframe}]: {e}" except Exception as e: print(f"Unexpected error fetching OHLCV {symbol} [{timeframe}]: {e}\n{traceback.format_exc()}") return [], f"Unexpected error fetching OHLCV {symbol} [{timeframe}]" return [], f"Failed to fetch data for {symbol} [{timeframe}] after {max_retries} attempts." def calculate_indicators(self, ohlcv_data, timeframe): """Calculates existing and new technical indicators.""" # Increased required length for Ichimoku and other lookbacks required_length = 100 if not isinstance(ohlcv_data, list) or len(ohlcv_data) < required_length: print(f"Indicator calc skip: Input data invalid or too short for {timeframe} (needs {required_length}, got {len(ohlcv_data) if ohlcv_data else 0})") return None try: df = pd.DataFrame(ohlcv_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df.set_index('timestamp', inplace=True) # Convert to numeric, coerce errors, drop NaNs needed for core calcs for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = pd.to_numeric(df[col], errors='coerce') df.dropna(subset=['close', 'volume', 'high', 'low'], inplace=True) # Drop rows where essential data is missing if df.empty or len(df) < required_length: print(f"Data too short after cleaning for {timeframe} (needs {required_length}, got {len(df)})") return None # --- Calculate Existing Indicators --- df['rsi'] = RSIIndicator(close=df['close'], window=14).rsi().fillna(50) stoch_obj = StochasticOscillator(high=df['high'], low=df['low'], close=df['close'], window=14, smooth_window=3) df['stoch_k'] = stoch_obj.stoch().fillna(50) df['stoch_d'] = stoch_obj.stoch_signal().fillna(50) ao_obj = AwesomeOscillatorIndicator(high=df['high'], low=df['low'], fillna=True) df['ao'] = ao_obj.awesome_oscillator().fillna(0) macd_obj = MACD(close=df['close'], window_slow=26, window_fast=12, window_sign=9, fillna=True) df['macd'] = macd_obj.macd().fillna(0) df['macd_signal'] = macd_obj.macd_signal().fillna(0) df['macd_diff'] = macd_obj.macd_diff().fillna(0) adx_obj = ADXIndicator(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True) df['adx'] = adx_obj.adx().fillna(20) df['adx_pos'] = adx_obj.adx_pos().fillna(0) df['adx_neg'] = adx_obj.adx_neg().fillna(0) df['ema_20'] = df['close'].ewm(span=20, adjust=False).mean() df['ema_50'] = df['close'].ewm(span=50, adjust=False).mean() df['ema_100'] = df['close'].ewm(span=100, adjust=False).mean() df['mfi'] = MFIIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=14, fillna=True).money_flow_index().fillna(50) df['obv'] = OnBalanceVolumeIndicator(close=df['close'], volume=df['volume'], fillna=True).on_balance_volume().fillna(method='ffill') df['cmf'] = ChaikinMoneyFlowIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=20, fillna=True).chaikin_money_flow().fillna(0) df['volume_sma'] = df['volume'].rolling(window=20, min_periods=10).mean() # Handle potential division by zero or NaN in volume_sma df['volume_ratio'] = (df['volume'] / df['volume_sma'].replace(0, np.nan)).replace([np.inf, -np.inf], 1.0).fillna(1.0) df['atr'] = AverageTrueRange(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True).average_true_range().fillna(method='ffill').fillna(0) # --- Calculate NEW Indicators --- bb_obj = BollingerBands(close=df['close'], window=20, window_dev=2, fillna=True) df['bb_hband'] = bb_obj.bollinger_hband() df['bb_lband'] = bb_obj.bollinger_lband() df['bb_mavg'] = bb_obj.bollinger_mavg() df['bb_width'] = bb_obj.bollinger_wband() ichi_obj = IchimokuIndicator(high=df['high'], low=df['low'], window1=9, window2=26, window3=52, fillna=True) df['ichi_a'] = ichi_obj.ichimoku_a() df['ichi_b'] = ichi_obj.ichimoku_b() df['ichi_base'] = ichi_obj.ichimoku_base_line() df['ichi_conv'] = ichi_obj.ichimoku_conversion_line() df['will_r'] = WilliamsRIndicator(high=df['high'], low=df['low'], close=df['close'], lbp=14, fillna=True).williams_r().fillna(-50) df['vwap'] = VolumeWeightedAveragePrice(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=14, fillna=True).volume_weighted_average_price().fillna(method='ffill') vortex_obj = VortexIndicator(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True) df['vortex_pos'] = vortex_obj.vortex_indicator_pos() df['vortex_neg'] = vortex_obj.vortex_indicator_neg() # --- End Indicators --- # Fill any remaining NaNs using forward fill first, then backward fill # This helps ensure indicators near the start/end are usable for signals/backtest df.ffill(inplace=True) df.bfill(inplace=True) # Ensure sufficient length remains after potential NaNs at the start min_usable_length = 50 # Minimum needed for signal generation logic using prev row if len(df) < min_usable_length: print(f"Data too short after indicator calculation & filling for {timeframe} (needs >{min_usable_length}, got {len(df)})") return None return df.copy() # Use copy to avoid SettingWithCopyWarning later except Exception as e: print(f"Error calculating indicators for {timeframe}: {e}\n{traceback.format_exc()}") return None def generate_signals_and_values(self, df): """ Generates signals based on latest indicator values, including new ones. Implements the "K-map" concept via multi-factor confirmation scoring. Returns: - final_signals (dict): Dictionary of individual indicator signals {indicator_name: signal (-1, 0, 1)}. - values (dict): Dictionary of raw indicator values {indicator_name: value}. - signal_direction (int): Overall signal direction (-1, 0, 1) based on composite score. """ if df is None or not isinstance(df, pd.DataFrame) or len(df.index) < 2: # Check length using index after potential cleaning # print("Signal Gen Skip: DataFrame invalid or too short.") return None, None, None try: # Ensure index is datetime for proper iloc selection if not isinstance(df.index, pd.DatetimeIndex): df.index = pd.to_datetime(df.index) df = df.sort_index() # Ensure sorting after conversion # Use .iloc for positional access (robust to non-sequential indices) latest = df.iloc[-1] prev = df.iloc[-2] signals = {} # Stores intermediate signals (can be 0.5, -0.5) final_signals = {} # Stores final signals (-1, 0, 1) values = {} # Store raw values # --- Store latest values (including new ones) --- values['price'] = latest['close'] values['volume'] = latest['volume'] values['timestamp'] = latest.name.strftime('%Y-%m-%d %H:%M:%S') # Use index name values['atr'] = latest['atr'] # Momentum values['rsi'] = latest['rsi'] values['stoch_k'] = latest['stoch_k']; values['stoch_d'] = latest['stoch_d'] values['ao'] = latest['ao'] values['will_r'] = latest['will_r'] # New # Trend values['macd'] = latest['macd']; values['macd_signal'] = latest['macd_signal']; values['macd_diff'] = latest['macd_diff'] values['adx'] = latest['adx']; values['adx_pos'] = latest['adx_pos']; values['adx_neg'] = latest['adx_neg'] values['ema_20'] = latest['ema_20']; values['ema_50'] = latest['ema_50']; values['ema_100'] = latest['ema_100'] values['ichi_a'] = latest['ichi_a']; values['ichi_b'] = latest['ichi_b']; values['ichi_base'] = latest['ichi_base']; values['ichi_conv'] = latest['ichi_conv'] # New values['vortex_pos'] = latest['vortex_pos']; values['vortex_neg'] = latest['vortex_neg'] # New # Volume values['mfi'] = latest['mfi'] values['obv'] = latest['obv'] values['cmf'] = latest['cmf'] values['volume_ratio'] = latest['volume_ratio'] values['vwap'] = latest['vwap'] # New (volume-related) # Volatility values['bb_hband'] = latest['bb_hband']; values['bb_lband'] = latest['bb_lband']; values['bb_mavg'] = latest['bb_mavg']; values['bb_width'] = latest['bb_width'] # New # --- # --- Generate Signals (Using intermediate 'signals' dict) --- # RSI (Momentum) signals['rsi'] = 1 if latest['rsi'] < 30 else (-1 if latest['rsi'] > 70 else 0) # Stochastic (Momentum) - Crossover signal signals['stoch'] = 1 if latest['stoch_k'] < 25 and prev['stoch_k'] <= prev['stoch_d'] and latest['stoch_k'] > latest['stoch_d'] else (-1 if latest['stoch_k'] > 75 and prev['stoch_k'] >= prev['stoch_d'] and latest['stoch_k'] < latest['stoch_d'] else 0) # Awesome Oscillator (Momentum) - Zero cross and twin peaks (simplified) if latest['ao'] > 0 and prev['ao'] <= 0: signals['ao'] = 1 elif latest['ao'] < 0 and prev['ao'] >= 0: signals['ao'] = -1 elif latest['ao'] > 0 and prev['ao'] > 0 and latest['ao'] > prev['ao']: signals['ao'] = 0.5 # Bullish momentum elif latest['ao'] < 0 and prev['ao'] < 0 and latest['ao'] < prev['ao']: signals['ao'] = -0.5 # Bearish momentum else: signals['ao'] = 0 # Williams %R (Momentum) - Exiting Overbought/Oversold signals['will_r'] = 1 if latest['will_r'] > -20 and prev['will_r'] <= -20 else (-1 if latest['will_r'] < -80 and prev['will_r'] >= -80 else 0) # MACD (Trend/Momentum) - Line cross Signal if (latest['macd'] > latest['macd_signal'] and prev['macd'] <= prev['macd_signal']): signals['macd'] = 1 elif (latest['macd'] < latest['macd_signal'] and prev['macd'] >= prev['macd_signal']): signals['macd'] = -1 else: signals['macd'] = 0 # ADX (Trend Strength) - Directional movement signals['adx'] = 1 if latest['adx'] > 25 and latest['adx_pos'] > latest['adx_neg'] else (-1 if latest['adx'] > 25 and latest['adx_neg'] > latest['adx_pos'] else 0) # EMA Trend (Trend) if latest['close'] > latest['ema_20'] and latest['ema_20'] > latest['ema_50'] and latest['ema_50'] > latest['ema_100']: signals['ema_trend'] = 1 elif latest['close'] < latest['ema_20'] and latest['ema_20'] < latest['ema_50'] and latest['ema_50'] < latest['ema_100']: signals['ema_trend'] = -1 elif latest['close'] > latest['ema_50']: signals['ema_trend'] = 0.5 # Price above mid-term EMA elif latest['close'] < latest['ema_50']: signals['ema_trend'] = -0.5 # Price below mid-term EMA else: signals['ema_trend'] = 0 # Ichimoku (Trend) - TK cross, Cloud position, Price vs Kijun ichi_signal = 0 tenkan_cross_kijun_up = latest['ichi_conv'] > latest['ichi_base'] and prev['ichi_conv'] <= prev['ichi_base'] tenkan_cross_kijun_down = latest['ichi_conv'] < latest['ichi_base'] and prev['ichi_conv'] >= prev['ichi_base'] above_cloud = latest['close'] > latest['ichi_a'] and latest['close'] > latest['ichi_b'] below_cloud = latest['close'] < latest['ichi_a'] and latest['close'] < latest['ichi_b'] price_above_kijun = latest['close'] > latest['ichi_base'] price_below_kijun = latest['close'] < latest['ichi_base'] if tenkan_cross_kijun_up and above_cloud and price_above_kijun: ichi_signal = 1 # Strong Bullish elif tenkan_cross_kijun_down and below_cloud and price_below_kijun: ichi_signal = -1 # Strong Bearish elif above_cloud and price_above_kijun and latest['ichi_conv'] > latest['ichi_base']: ichi_signal = 0.5 # Bullish Bias elif below_cloud and price_below_kijun and latest['ichi_conv'] < latest['ichi_base']: ichi_signal = -0.5 # Bearish Bias signals['ichimoku'] = ichi_signal # Vortex (Trend) - Crossover if latest['vortex_pos'] > latest['vortex_neg'] and prev['vortex_pos'] <= prev['vortex_neg']: signals['vortex'] = 1 elif latest['vortex_neg'] > latest['vortex_pos'] and prev['vortex_neg'] <= prev['vortex_pos']: signals['vortex'] = -1 else: signals['vortex'] = 0 # MFI (Volume/Momentum) signals['mfi'] = 1 if latest['mfi'] < 20 else (-1 if latest['mfi'] > 80 else 0) # CMF (Volume/Flow) signals['cmf'] = 1 if latest['cmf'] > 0.05 else (-1 if latest['cmf'] < -0.05 else 0) # OBV (Volume/Trend) - Simple trend vs moving average if len(df) > 5: try: obv_sma5 = df['obv'].rolling(window=5).mean().iloc[-1] # Check for NaN sma due to insufficient data at the start if pd.notna(obv_sma5): signals['obv_trend'] = 1 if latest['obv'] > obv_sma5 else (-1 if latest['obv'] < obv_sma5 else 0) else: signals['obv_trend'] = 0 except IndexError: # Catch potential index error if rolling mean fails near start signals['obv_trend'] = 0 else: signals['obv_trend'] = 0 # Volume Spike (Volume) - Relative to recent average if latest['volume_ratio'] > 1.8: # Threshold for "high volume" # Signal direction based on candle close vs open during spike signals['vol_spike'] = 0.5 if latest['close'] > latest['open'] else (-0.5 if latest['close'] < latest['open'] else 0) else: signals['vol_spike'] = 0 # VWAP (Volume/Price Level) - Price cross VWAP signals['vwap_cross'] = 1 if latest['close'] > latest['vwap'] and prev['close'] <= prev['vwap'] else (-1 if latest['close'] < latest['vwap'] and prev['close'] >= prev['vwap'] else 0) # Bollinger Bands (Volatility/Mean Reversion/Breakout) - Breakout example if latest['close'] > latest['bb_hband'] and prev['close'] <= prev['bb_hband']: signals['bbands'] = 1 elif latest['close'] < latest['bb_lband'] and prev['close'] >= prev['bb_lband']: signals['bbands'] = -1 else: signals['bbands'] = 0 # --- End Signal Logic --- # --- Final Cleanup & Composite (K-Map/Scoring Implementation) --- # Ensure all indicators used in signals have a default entry all_signal_keys = list(signals.keys()) # Get keys from the intermediate signals for k in all_signal_keys: values.setdefault(k, np.nan) # Ensure value exists even if calculation failed (use NaN) # Convert intermediate 0.5/-0.5 signals to 1/-1 for final score, store in final_signals for key, value in signals.items(): if value >= 0.5: final_signals[key] = 1 elif value <= -0.5: final_signals[key] = -1 else: final_signals[key] = 0 # Calculate composite score based on sum of FINAL (-1, 0, 1) signals non_neutral_signals = [s for s in final_signals.values() if s != 0] composite_score = sum(non_neutral_signals) # Determine overall signal direction based on score magnitude # Refined Threshold: Need ~30% net agreement among indicators, minimum 3 net signals num_indicators_signaling = len(final_signals) # Count how many indicators produced a signal signal_strength_threshold = max(3, int(num_indicators_signaling * 0.30)) signal_direction = 0 if num_indicators_signaling > 0: # Avoid division by zero if no signals generated # Check if composite score meets the threshold if composite_score >= signal_strength_threshold: signal_direction = 1 elif composite_score <= -signal_strength_threshold: signal_direction = -1 # Ensure all keys from values (excluding non-indicator ones) are in final_signals with 0 if not set value_keys_for_signals = [k for k in values.keys() if k not in ['price', 'volume', 'timestamp', 'atr']] for k in value_keys_for_signals: final_signals.setdefault(k, 0) # Default to neutral if no signal logic applied return final_signals, values, signal_direction except KeyError as e: print(f"KeyError during signal generation (likely missing indicator column: {e}) in DF columns: {df.columns if df is not None else 'None'}. Check calculation step.") return None, None, None except IndexError as e: print(f"IndexError during signal generation (likely insufficient data rows for prev/latest): {e}. DF length: {len(df) if df is not None else 0}") return None, None, None except Exception as e: print(f"Error generating signals/values: {e}\n{traceback.format_exc()}") return None, None, None def calculate_trade_params(self, values, signal_direction): """Calculate Entry, SL, TP1, TP2 based on ATR""" params = {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}} try: # Validate necessary inputs more rigorously if signal_direction == 0 or \ not values or \ pd.isna(values.get('atr')) or values.get('atr', 0) <= 0 or \ pd.isna(values.get('price')) or values.get('price', 0) <= 0: return params # No signal or invalid data entry_price = values['price'] atr_val = values['atr'] # Added check here as well if atr_val <= 0 or entry_price <= 0 : return params params['entry'] = entry_price if signal_direction == 1: # Long params['sl'] = entry_price - ATR_SL_MULTIPLIER * atr_val params['tp1'] = entry_price + ATR_TP1_MULTIPLIER * atr_val params['tp2'] = entry_price + ATR_TP2_MULTIPLIER * atr_val elif signal_direction == -1: # Short params['sl'] = entry_price + ATR_SL_MULTIPLIER * atr_val params['tp1'] = entry_price - ATR_TP1_MULTIPLIER * atr_val params['tp2'] = entry_price - ATR_TP2_MULTIPLIER * atr_val # Ensure SL/TP are valid numbers and positive # Also check if SL crossed entry (e.g., due to very small ATR) - invalidate if so. if pd.isna(params['sl']) or pd.isna(params['tp1']) or pd.isna(params['tp2']) or \ params['sl'] <= 0 or params['tp1'] <= 0 or params['tp2'] <= 0 or \ (signal_direction == 1 and params['sl'] >= params['entry']) or \ (signal_direction == -1 and params['sl'] <= params['entry']): # print(f"Warning: Invalid SL/TP (NaN, <=0, or SL crossed entry) for entry {entry_price:.5f}, ATR {atr_val:.5f}. Signal: {signal_direction}") # Reset params if invalid return {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}} # Calculate potential leveraged profit for $1 (simplified) fee = SIMULATED_FEE_PERCENT / 100.0 for lev in LEVERAGES: # Ratios based on entry price profit_ratio_tp1 = abs(params['tp1'] - entry_price) / entry_price loss_ratio_sl = abs(params['sl'] - entry_price) / entry_price # Calculate gross P/L ratio with leverage leveraged_profit_tp1 = profit_ratio_tp1 * lev leveraged_loss_sl = loss_ratio_sl * lev # Calculate fee impact (applied to leveraged position size) fee_impact = 2 * fee * lev # Entry fee + Exit fee on leveraged amount # Net profit/loss per $1 invested (considering $1 as margin) dollar_profit_tp1 = leveraged_profit_tp1 - fee_impact dollar_loss_sl = -leveraged_loss_sl - fee_impact # Loss is negative params['lev_profit'][f'{lev}x'] = {'tp1_profit_$': round(dollar_profit_tp1, 3), 'sl_loss_$': round(dollar_loss_sl, 3)} return params except Exception as e: print(f"Error calculating trade params for signal {signal_direction}, values {values}: {e}\n{traceback.format_exc()}") return {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}} def _run_simple_backtest(self, symbol, timeframe, df): """VERY Basic backtest simulation on the provided DataFrame (Uses longer history).""" default_result = {'symbol': symbol, 'timeframe': timeframe, 'trades': 0, 'win_rate': 0, 'pnl_sum': 0, 'pnl_%_sum': 0} # Need enough history + buffer for indicator lookbacks min_backtest_len = BACKTEST_HISTORY_CANDLES + 50 # Need ~50 for lookback before backtest starts # Check if DataFrame is valid and has enough rows if df is None or not isinstance(df, pd.DataFrame) or len(df) < min_backtest_len: # print(f"BT Skip {symbol} {timeframe}: Not enough data ({len(df) if df is not None else 0} < {min_backtest_len})") return default_result try: # Ensure index is datetime and sorted for correct slicing if not isinstance(df.index, pd.DatetimeIndex): df.index = pd.to_datetime(df.index) df = df.sort_index() # Select the slice for backtesting simulation # Use iloc for robustness to non-sequential indices after cleaning backtest_start_iloc = len(df) - BACKTEST_HISTORY_CANDLES if backtest_start_iloc < 0: backtest_start_iloc = 0 # Should not happen with check above, but safety # Iterate through the candles *within the backtest period* # Signal is generated using data *up to* candle i-1 close # Trade entry occurs at candle i open # Exit checks happen based on candle i high/low trades = [] in_position = False entry_price = 0 position_direction = 0 # 1 for long, -1 for short stop_loss = 0 take_profit = 0 # Using TP1 for this simple backtest entry_timestamp = None # For debugging/tracking # Loop from the start of the backtest period + 1 (need previous candle for signal) # Ensure we have enough lookback *before* the first signal candle (iloc-based) # Ensure lookback of at least 50 candles for indicators first_signal_candle_idx = max(50, backtest_start_iloc) # Start generating signals from here for i in range(first_signal_candle_idx, len(df)): current_row = df.iloc[i] signal_candle_iloc = i - 1 # Signal based on close of previous candle (iloc) # Slice original df up to and including the signal candle # Use iloc slicing for performance and robustness # Ensure the slice is valid if signal_candle_iloc < 1: continue # Need at least 2 rows for signal calc df_for_signal_calc = df.iloc[:signal_candle_iloc + 1] # Check if df_for_signal_calc is valid before generating signal if df_for_signal_calc is None or len(df_for_signal_calc) < 2: continue # Skip if not enough data for signal calc # Generate signal based on data ending at the previous candle's close sim_signals, sim_values, sim_direction = self.generate_signals_and_values(df_for_signal_calc) # --- Entry Logic --- # Enter only if not already in position AND a clear signal occurs AND we have values if not in_position and sim_direction != 0 and sim_values: # Use ATR from the *signal candle* for SL/TP calc atr_at_entry = sim_values.get('atr') # Enter at current candle's open price entry_price_candidate = current_row['open'] entry_timestamp = current_row.name # Timestamp of entry candle # Validate entry conditions before taking position if pd.notna(entry_price_candidate) and entry_price_candidate > 0 and \ pd.notna(atr_at_entry) and atr_at_entry > 0: # Calculate potential SL/TP *before* deciding to enter fully if sim_direction == 1: # Long potential_sl = entry_price_candidate - ATR_SL_MULTIPLIER * atr_at_entry potential_tp = entry_price_candidate + ATR_TP1_MULTIPLIER * atr_at_entry else: # Short potential_sl = entry_price_candidate + ATR_SL_MULTIPLIER * atr_at_entry potential_tp = entry_price_candidate - ATR_TP1_MULTIPLIER * atr_at_entry # Basic sanity check for SL/TP (positive values and SL not crossing entry) if not (pd.isna(potential_sl) or pd.isna(potential_tp) or potential_sl <= 0 or potential_tp <= 0 or \ (sim_direction == 1 and potential_sl >= entry_price_candidate) or \ (sim_direction == -1 and potential_sl <= entry_price_candidate)): # All checks passed, commit to position in_position = True position_direction = sim_direction entry_price = entry_price_candidate stop_loss = potential_sl take_profit = potential_tp # print(f"BT Enter {symbol} {timeframe} @ {entry_timestamp}: Dir={position_direction} Entry={entry_price:.4f} SL={stop_loss:.4f} TP={take_profit:.4f} ATR={atr_at_entry:.4f}") # else: print(f"BT Skip Entry {symbol} {timeframe}: Invalid entry conditions (Price:{entry_price_candidate}, ATR:{atr_at_entry})") # --- Exit Logic --- # Check exits only if currently in a position elif in_position: exit_price = None pnl = 0 exit_reason = "N/A" current_high = current_row['high'] current_low = current_row['low'] exit_timestamp = current_row.name # Timestamp of exit check candle # Validate high/low data if pd.isna(current_high) or pd.isna(current_low): print(f"BT Warning {symbol} {timeframe}: NaN High/Low at {exit_timestamp}, cannot check exit.") continue # Skip exit check for this candle # Check SL/TP hit based on current candle's high/low # Important: Check SL first in case both are hit within the same candle if position_direction == 1: # Long if current_low <= stop_loss: exit_price = stop_loss exit_reason = "SL Hit" elif current_high >= take_profit: exit_price = take_profit exit_reason = "TP1 Hit" elif position_direction == -1: # Short if current_high >= stop_loss: exit_price = stop_loss exit_reason = "SL Hit" elif current_low <= take_profit: exit_price = take_profit exit_reason = "TP1 Hit" # If an exit condition was met if exit_price is not None: # Calculate PnL based on entry and exit # For shorts, PnL = Entry - Exit; for longs, PnL = Exit - Entry if position_direction == 1: pnl = exit_price - entry_price else: # Short pnl = entry_price - exit_price # Calculate and subtract simulated fees # Fee is % of trade value at entry and exit entry_fee = (SIMULATED_FEE_PERCENT / 100.0) * entry_price exit_fee = (SIMULATED_FEE_PERCENT / 100.0) * exit_price pnl -= (entry_fee + exit_fee) trades.append({'entry': entry_price, 'exit': exit_price, 'pnl': pnl, 'direction': position_direction}) # print(f"BT Exit {symbol} {timeframe} @ {exit_timestamp}: Reason={exit_reason} ExitPx={exit_price:.4f} PnL={pnl:.4f} (Entry @ {entry_price:.4f} on {entry_timestamp})") in_position = False # Reset position state after closing trade entry_timestamp = None # Reset entry timestamp # --- Summarize Results --- num_trades = len(trades) if num_trades > 0: wins = sum(1 for t in trades if t['pnl'] > 0) win_rate = (wins / num_trades * 100) total_pnl = sum(t['pnl'] for t in trades) # Calculate PnL % sum based on entry price (approximation of capital growth) pnl_percentage_sum = sum((t['pnl'] / t['entry']) * 100 for t in trades if t['entry'] > 0) else: win_rate = 0 total_pnl = 0 pnl_percentage_sum = 0 # print(f"BT Summary {symbol} {timeframe}: Trades={num_trades}, WinRate={win_rate:.2f}%, PnL Sum={total_pnl:.5f}, PnL % Sum={pnl_percentage_sum:.2f}%") return { 'symbol': symbol, 'timeframe': timeframe, 'trades': num_trades, 'win_rate': round(win_rate, 2), 'pnl_sum': round(total_pnl, 5), 'pnl_%_sum': round(pnl_percentage_sum, 2) } except Exception as e: print(f"Error during backtest simulation for {symbol} {timeframe}: {e}\n{traceback.format_exc()}") # Print details about the DataFrame state at error might help debugging # print(f"DF info at error for {symbol} {timeframe}:") # try: df.info() # except: print("Could not get DF info") return default_result def analyze_symbol(self, symbol, progress): """Analyzes a single symbol across all valid timeframes.""" timeframe_details = {} heatmap_composites = {} symbol_backtest_results = [] log_msgs = [] symbol_active_signals = {} # Store active signals {tf: {details}} for this symbol symbol_hover_details = {} # Store hover details {tf: {ind: val}} for this symbol if not self.valid_timeframes: return timeframe_details, heatmap_composites, symbol_backtest_results, [f"No valid timeframes for {symbol}."], {}, {} # Add small delay before starting analysis for a symbol to help with rate limits time.sleep(0.1) for i, timeframe in enumerate(self.valid_timeframes): progress(i / len(self.valid_timeframes), desc=f"Fetching {symbol} [{timeframe}]") # Fetch data ohlcv, err_msg = self.fetch_ohlcv_data(symbol, timeframe) if err_msg: log_msgs.append(f"Data fetch skip: {symbol} [{timeframe}] {err_msg}") heatmap_composites[timeframe] = 0 # Run backtest with None to get a default failure entry symbol_backtest_results.append(self._run_simple_backtest(symbol, timeframe, None)) symbol_hover_details[timeframe] = {} # Placeholder time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.3)) # Longer sleep on error continue # Calculate indicators progress((i + 0.3) / len(self.valid_timeframes), desc=f"Calculating Ind. {symbol} [{timeframe}]") df = self.calculate_indicators(ohlcv, timeframe) if df is None or df.empty: log_msgs.append(f"Indicator calc skip: {symbol} [{timeframe}] (DataFrame invalid or empty)") heatmap_composites[timeframe] = 0 # Run backtest with None symbol_backtest_results.append(self._run_simple_backtest(symbol, timeframe, None)) symbol_hover_details[timeframe] = {} # Placeholder time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.2)) continue # Generate signals/values based on the *latest* data progress((i + 0.6) / len(self.valid_timeframes), desc=f"Generating Sig. {symbol} [{timeframe}]") signals, values, signal_direction = self.generate_signals_and_values(df) if signals is not None and values is not None: # Calculate trade params based on the latest signal trade_params = self.calculate_trade_params(values, signal_direction) # Store all details for this timeframe timeframe_details[timeframe] = { 'signals': signals, # Individual indicator signals (-1, 0, 1) 'values': values, # Raw indicator values 'trade_params': trade_params, # Entry, SL, TP etc. 'direction': signal_direction # Overall signal direction (-1, 0, 1) } # Calculate composite for heatmap (normalized sum of final signals) num_potential_signals = len(signals) composite = sum(signals.values()) / num_potential_signals if num_potential_signals > 0 else 0 heatmap_composites[timeframe] = composite # Store key values for hover text symbol_hover_details[timeframe] = { 'Price': values.get('price', np.nan), # Add price to hover 'RSI': values.get('rsi', np.nan), 'MACD': values.get('macd', np.nan), 'StochK': values.get('stoch_k', np.nan), 'ADX': values.get('adx', np.nan), 'WillR': values.get('will_r', np.nan), } # Store active signal IF direction is non-zero AND trade params are valid if signal_direction != 0 and trade_params.get('entry') is not None: active_sig_details = { 'direction': signal_direction, 'entry': trade_params['entry'], 'sl': trade_params['sl'], 'tp1': trade_params['tp1'], 'tp2': trade_params['tp2'] } symbol_active_signals[timeframe] = active_sig_details # --- Log Signal to CSV --- log_data = { 'LogTimestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # Log time 'SignalCandleTime': values.get('timestamp', 'N/A'), # Time of candle signal was based on 'Symbol': symbol, 'Timeframe': timeframe, 'Direction': 'LONG' if signal_direction == 1 else 'SHORT', 'Entry': f"{trade_params['entry']:.8f}", # Use more precision for logging 'SL': f"{trade_params['sl']:.8f}", 'TP1': f"{trade_params['tp1']:.8f}", 'TP2': f"{trade_params['tp2']:.8f}", 'Status': 'Triggered' # Initial status # Optional: Add key indicator values #'RSI': f"{values.get('rsi', np.nan):.2f}", #'MACD_Diff': f"{values.get('macd_diff', np.nan):.6f}", #'ADX': f"{values.get('adx', np.nan):.2f}" } log_signal_to_csv(log_data) # --- End CSV Logging --- else: log_msgs.append(f"Signal gen skip: {symbol} [{timeframe}]") heatmap_composites[timeframe] = 0 symbol_hover_details[timeframe] = {} # Placeholder # Run backtest using the full historical dataframe calculated earlier progress((i + 0.9) / len(self.valid_timeframes), desc=f"Backtesting {symbol} [{timeframe}]") # IMPORTANT: Pass a COPY of df to backtest to avoid modification issues bt_result = self._run_simple_backtest(symbol, timeframe, df.copy()) symbol_backtest_results.append(bt_result) # Sleep based on exchange rate limit (minimum 0.2s) time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.2)) return timeframe_details, heatmap_composites, symbol_backtest_results, log_msgs, symbol_active_signals, symbol_hover_details def run_full_analysis(self, progress=gr.Progress()): """Runs the complete analysis for top coins across selected timeframes.""" self.analysis_results = {} self.backtest_results = [] # Reset backtest results self.active_signals_df = pd.DataFrame() # Reset active signals self.heatmap_details = {} # Reset hover details all_log_msgs = [] all_active_signals_list = [] # Temp list to build the DataFrame # Start Fetching Coins progress(0, desc="Fetching top coins...") symbols, msg = self.fetch_top_coins() all_log_msgs.append(msg) if not symbols: self.heatmap_df = pd.DataFrame() all_log_msgs.append("Error: No symbols found to analyze. Stopping.") # Return empty structures return {}, pd.DataFrame(), [], pd.DataFrame(), {}, all_log_msgs # Start Analyzing Symbols heatmap_data_list = [] # For building heatmap DataFrame [{Coin: X, tf1: score1, ...}, ...] total_symbols = len(symbols) progress(0.05, desc=f"Starting analysis for {total_symbols} coins...") for idx, symbol in enumerate(symbols): # Calculate progress for this symbol symbol_progress_start = 0.05 + (idx / total_symbols) * 0.90 # Leave space at end symbol_progress_range = (1 / total_symbols) * 0.90 symbol_progress_desc = f"Analyzing {symbol} ({idx+1}/{total_symbols})" progress(symbol_progress_start, desc=symbol_progress_desc) # Create a lambda for inner progress updates relative to this symbol's range symbol_progress_tracker = lambda p, desc=symbol_progress_desc: progress( symbol_progress_start + (p * symbol_progress_range), desc=desc ) try: # Analyze symbol (includes signal gen, trade param calc, backtest sim) # Returns: details per tf, heatmap scores per tf, backtest results list, logs, active signals dict, hover details dict tf_details, tf_heatmap_scores, symbol_bt_results, log_msgs, symbol_active_tf_signals, symbol_tf_hover_details = \ self.analyze_symbol(symbol, symbol_progress_tracker) all_log_msgs.extend(log_msgs) self.backtest_results.extend(symbol_bt_results) # Aggregate backtest results # Only process if we got some valid details back for the symbol if tf_details: # Check if the dict is not empty self.analysis_results[symbol] = tf_details # Store full details coin_name = symbol.split('/')[0] heatmap_row = {'Coin': coin_name} symbol_hover_data = {} # To store hover details for this coin # Populate heatmap row scores and collect hover details for the coin for tf in self.valid_timeframes: heatmap_row[tf] = tf_heatmap_scores.get(tf, 0) # Get score, default 0 symbol_hover_data[tf] = symbol_tf_hover_details.get(tf, {}) # Get hover dict, default empty heatmap_data_list.append(heatmap_row) self.heatmap_details[coin_name] = symbol_hover_data # Store hover details keyed by coin name # Add any active signals found for this symbol to the main list for tf, active_sig in symbol_active_tf_signals.items(): # Use more precision for the active signals table as well all_active_signals_list.append({ 'Symbol': symbol, 'Timeframe': tf, 'Direction': 'LONG' if active_sig['direction'] == 1 else 'SHORT', 'Entry': f"{active_sig['entry']:.8f}", # Format for display 'SL': f"{active_sig['sl']:.8f}", 'TP1': f"{active_sig['tp1']:.8f}", 'TP2': f"{active_sig['tp2']:.8f}", }) except ccxt.RateLimitExceeded as e: wait_time = 60 # Longer wait time all_log_msgs.append(f"Rate limit exceeded analyzing {symbol}. Sleeping for {wait_time}s... {e}") print(all_log_msgs[-1]) progress(symbol_progress_start + symbol_progress_range * 0.9, desc=f"Rate Limit Hit! Waiting {wait_time}s...") # Update progress during wait time.sleep(wait_time) # Optionally, you might want to retry the symbol analysis here (more complex) except Exception as e: error_msg = f"Critical error processing {symbol}: {e}\n{traceback.format_exc()}" all_log_msgs.append(error_msg) print(error_msg) # Add placeholder row to heatmap and default backtest results if analysis failed critically coin_name = symbol.split('/')[0] row = {'Coin': coin_name} for tf in self.valid_timeframes: row[tf] = 0 heatmap_data_list.append(row) self.heatmap_details[coin_name] = {tf: {} for tf in self.valid_timeframes} # Empty hover details for tf in self.valid_timeframes: # Add default failed backtest result for this timeframe self.backtest_results.append({'symbol': symbol, 'timeframe': tf, 'trades': 0, 'win_rate': 0, 'pnl_sum': 0, 'pnl_%_sum': 0}) progress(0.95, desc="Finalizing results...") # Progress before final processing # --- Final Processing --- if not heatmap_data_list: self.heatmap_df = pd.DataFrame() all_log_msgs.append("Warning: No heatmap data generated (no symbols processed successfully?).") else: self.heatmap_df = pd.DataFrame(heatmap_data_list).set_index('Coin') # Ensure columns are ordered correctly according to valid_timeframes if self.valid_timeframes: # Filter out columns not in valid_timeframes (if any slipped through) cols_to_keep = [tf for tf in self.valid_timeframes if tf in self.heatmap_df.columns] self.heatmap_df = self.heatmap_df[cols_to_keep] # Reindex to ensure all valid_timeframes are present, filling missing with 0 # Make sure valid_timeframes is used for columns self.heatmap_df = self.heatmap_df.reindex(columns=self.valid_timeframes, fill_value=0) self.heatmap_df.index.name = 'Coin' # Create Active Signals DataFrame from the collected list if all_active_signals_list: self.active_signals_df = pd.DataFrame(all_active_signals_list) # Sort for better presentation self.active_signals_df['tf_order'] = self.active_signals_df['Timeframe'].map(TIMEFRAME_ORDER_MAP) self.active_signals_df = self.active_signals_df.sort_values(by=['Symbol', 'tf_order']).drop('tf_order', axis=1) # Reorder columns for consistency self.active_signals_df = self.active_signals_df[['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']] else: # Create empty DF with correct columns if no signals found self.active_signals_df = pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']) # Save backtest results to CSV if self.backtest_results: # Filter out any potential None results before creating DataFrame valid_bt_results = [res for res in self.backtest_results if isinstance(res, dict)] if valid_bt_results: bt_df = pd.DataFrame(valid_bt_results) try: bt_df.to_csv(BACKTEST_RESULTS_FILE, index=False) all_log_msgs.append(f"Backtest summary saved to {BACKTEST_RESULTS_FILE}") except Exception as e: all_log_msgs.append(f"Error saving backtest results: {e}") print(f"Error saving backtest results: {e}\n{traceback.format_exc()}") else: all_log_msgs.append("No valid backtest results generated to save.") else: all_log_msgs.append("No backtest results generated.") progress(1, desc="Analysis complete.") all_log_msgs.append("Analysis Complete.") if os.path.exists(SIGNAL_LOG_FILE): all_log_msgs.append(f"Signals logged to {SIGNAL_LOG_FILE}") # Return all generated data return self.analysis_results, self.heatmap_df, self.backtest_results, self.active_signals_df, self.heatmap_details, all_log_msgs # --- Gradio Helper Functions --- def create_plotly_heatmap(df, heatmap_details): """Creates heatmap with enhanced hover text including key indicator values.""" if df is None or df.empty: print("Heatmap DF is empty, returning empty figure.") return go.Figure(layout=go.Layout(title="No data available for heatmap", height=300)) colorscale = pcolors.diverging.RdYlGn # Ensure data is numeric for heatmap values, coerce errors try: # Ensure index and columns are strings for processing if needed df.index = df.index.astype(str) df.columns = df.columns.astype(str) heatmap_values = df.apply(pd.to_numeric, errors='coerce').fillna(0).values rows = list(df.index) cols = list(df.columns) except Exception as e: print(f"Error preparing heatmap data: {e}") return go.Figure(layout=go.Layout(title=f"Error preparing heatmap: {e}", height=300)) # Create hover text matrix using heatmap_details hover_texts = [] for r_idx, coin in enumerate(rows): row_texts = [] coin_details_by_tf = heatmap_details.get(coin, {}) # Get details for the coin (keyed by BASE coin name now) for c_idx, tf in enumerate(cols): try: score = heatmap_values[r_idx, c_idx] # Lookup details for this specific coin/tf from the pre-computed dict details = coin_details_by_tf.get(tf, {}) # Get details for this TF # Format values from details, handle N/A or missing data gracefully price_val = details.get('Price', 'N/A') rsi_val = details.get('RSI', 'N/A') macd_val = details.get('MACD', 'N/A') stochk_val = details.get('StochK', 'N/A') adx_val = details.get('ADX', 'N/A') willr_val = details.get('WillR', 'N/A') # Build hover text string text = f"Coin: {coin}
" text += f"Timeframe: {tf}
" # Use more precision for price display in hover text += f"Price: {float(price_val):.8f}
" if isinstance(price_val, (int, float)) and not pd.isna(price_val) else f"Price: {price_val}
" text += f"Score: {score:.3f}
" # Show score from heatmap itself text += "----------
" # Format indicator values nicely try: text += f"RSI: {float(rsi_val):.1f}
" if isinstance(rsi_val, (int, float)) and not pd.isna(rsi_val) else f"RSI: {rsi_val}
" except: text += f"RSI: {rsi_val}
" # Fallback if conversion fails try: text += f"MACD: {float(macd_val):.6f}
" if isinstance(macd_val, (int, float)) and not pd.isna(macd_val) else f"MACD: {macd_val}
" except: text += f"MACD: {macd_val}
" try: text += f"Stoch K: {float(stochk_val):.1f}
" if isinstance(stochk_val, (int, float)) and not pd.isna(stochk_val) else f"Stoch K: {stochk_val}
" except: text += f"Stoch K: {stochk_val}
" try: text += f"ADX: {float(adx_val):.1f}
" if isinstance(adx_val, (int, float)) and not pd.isna(adx_val) else f"ADX: {adx_val}
" except: text += f"ADX: {adx_val}
" try: text += f"Will %R: {float(willr_val):.1f}
" if isinstance(willr_val, (int, float)) and not pd.isna(willr_val) else f"Will %R: {willr_val}
" except: text += f"Will %R: {willr_val}
" text += "" # Hide default Plotly hover labels row_texts.append(text) except Exception as hover_e: print(f"Error generating hover text for {coin}/{tf}: {hover_e}") row_texts.append(f"Error displaying hover for {coin}/{tf}") # Add error placeholder hover_texts.append(row_texts) try: fig = go.Figure(data=go.Heatmap( z=heatmap_values, x=cols, y=rows, colorscale=colorscale, zmid=0, zmin=-0.6, zmax=0.6, # Adjusted range based on normalized score hoverongaps=False, hoverinfo='text', # Use the custom text matrix for hover text=hover_texts, # Assign the text matrix texttemplate=None # Disable texttemplate when using hoverinfo='text' )) fig.update_layout( title='Cryptocurrency Signal Strength Heatmap (Hover for Key Values, Click Cell for Full Details)', xaxis_title='Timeframe', yaxis_title='Coin', yaxis={'tickmode': 'linear', 'tickfont': {'size': 9}, 'automargin': True}, # Automargin for y-axis labels xaxis={'tickmode': 'linear'}, height=max(450, len(rows) * 18 + 100), # Dynamic height margin=dict(l=70, r=50, t=60, b=50) ) # print("Plotly heatmap figure created successfully.") # Debug print return fig except Exception as e: print(f"Error creating Plotly heatmap figure: {e}\n{traceback.format_exc()}") # Return an empty figure with error message return go.Figure(layout=go.Layout(title=f"Error creating heatmap: {e}", height=400)) def format_heatmap_click_details(evt: gr.SelectData, current_state): """Formats detailed analysis results for the specific cell clicked on the heatmap.""" # Check if event data is valid (using SelectData attributes) if evt is None or evt.index is None or not isinstance(evt.index, (list, tuple)) or len(evt.index) != 2: # print("Debug: format_heatmap_click_details called with invalid event data:", evt) return "Click on a heatmap cell after running analysis to see details here. (Ensure you clicked a colored cell)" # Check if state and necessary data are present if not isinstance(current_state, dict) or 'analysis_results' not in current_state or 'heatmap_df' not in current_state: # print("Debug: State invalid or missing analysis_results or heatmap_df") return "Analysis data not found in state. Please run the analysis first." try: row_index, col_index = evt.index heatmap_df = current_state.get('heatmap_df') analysis_data = current_state.get('analysis_results') # This holds the full nested dict {symbol: {tf: details}} if heatmap_df is None or heatmap_df.empty or analysis_data is None: # print("Debug: Heatmap or analysis data is None or empty.") return "Heatmap or analysis data is not available. Please run analysis." # Validate indices against the DataFrame dimensions if not (0 <= row_index < len(heatmap_df.index) and 0 <= col_index < len(heatmap_df.columns)): # print(f"Debug: Indices out of bounds. Row: {row_index} (max: {len(heatmap_df.index)-1}), Col: {col_index} (max: {len(heatmap_df.columns)-1})") return "Error: Clicked cell index is out of bounds." coin_name = heatmap_df.index[row_index] # This is the base coin (e.g., 'BTC') timeframe = heatmap_df.columns[col_index] # print(f"Debug: Clicked on Coin: {coin_name}, Timeframe: {timeframe}") # Find the full symbol (e.g., BTC/USDT) in the analysis_data keys full_symbol = None for symbol_key in analysis_data.keys(): # Match based on the start of the symbol key (more robust) if symbol_key.startswith(str(coin_name) + '/'): # Ensure coin_name is string full_symbol = symbol_key break if not full_symbol or full_symbol not in analysis_data: # print(f"Debug: Full symbol not found or no data for {full_symbol} (Base: {coin_name})") return f"Details not found for {coin_name} (symbol mismatch or no analysis data?)." # Get the specific details for the symbol and timeframe details = analysis_data.get(full_symbol, {}).get(timeframe) if details is None: # print(f"Debug: No details found for {full_symbol} on timeframe {timeframe}") return f"No analysis data available for {coin_name} on {timeframe}." # --- Format Markdown String --- values = details.get('values', {}) # Raw indicator values signals = details.get('signals', {}) # Final signals (-1, 0, 1) per indicator trade_params = details.get('trade_params', {}) # Entry, SL, TP etc. direction = details.get('direction', 0) # Overall signal direction if not values or not signals: # print(f"Debug: Missing 'values' or 'signals' dict in details for {full_symbol} / {timeframe}") return f"Incomplete data for {coin_name} [{timeframe}]. Cannot display details." markdown_str = f"### Details for {coin_name} ({full_symbol}) [{timeframe}]\n\n" markdown_str += f"- **Timestamp:** {values.get('timestamp', 'N/A')}\n" price = values.get('price', np.nan) markdown_str += f"- **Price:** {price:.8f}\n" if isinstance(price, (int, float)) and not pd.isna(price) else f"- **Price:** {price}\n" volume = values.get('volume', np.nan) markdown_str += f"- **Volume:** {volume:,.0f}\n" if isinstance(volume, (int, float)) and not pd.isna(volume) else f"- **Volume:** {volume}\n" atr = values.get('atr', np.nan) markdown_str += f"- **ATR:** {atr:.8f}\n\n" if isinstance(atr, (int, float)) and not pd.isna(atr) else f"- **ATR:** {atr}\n\n" # Display the FINAL calculated direction based on the composite score threshold markdown_str += f"**Overall Signal Direction:** {'BULLISH (+1)' if direction > 0 else ('BEARISH (-1)' if direction < 0 else 'NEUTRAL (0)')}\n\n" markdown_str += "**Indicator Values & Individual Signals:**\n" markdown_str += "| Indicator | Value | Signal |\n" # Adjusted padding markdown_str += "|----------------|-----------------|--------|\n" # Get keys from signals dict, which should represent all indicators evaluated indicator_keys = sorted(signals.keys()) for name in indicator_keys: val = values.get(name, 'N/A') # Get raw value sig_val = signals.get(name, 0) # Get the final -1, 0, 1 signal signal_char = '🟩 (+1)' if sig_val > 0 else ('🟥 (-1)' if sig_val < 0 else '⬜ (0)') # Formatting value carefully if isinstance(val, (int, float)) and not pd.isna(val): if abs(val) > 100000: val_str = f"{val:,.0f}" # Large integer elif abs(val) > 100: val_str = f"{val:,.2f}" # Moderate number elif abs(val) < 0.000001 and abs(val) > 0: val_str = f"{val:.4e}" # Very small number (adjust precision) elif abs(val) < 1: val_str = f"{val:.8f}" # Small decimal (more precision) else: val_str = f"{val:.6f}" # Default decimal (more precision) elif pd.isna(val): val_str = "NaN" else: val_str = str(val) # Non-numeric # Pad indicator name and value for alignment in Markdown table markdown_str += f"| {name:<14} | {val_str:<15} | {signal_char:<7} |\n" # --- Add Trade Parameters Section --- if trade_params and trade_params.get('entry') is not None: markdown_str += f"\n**Potential Trade Setup (Based on this signal & ATR):**\n" markdown_str += f"- **Direction:** {'LONG' if direction > 0 else 'SHORT'}\n" # More precision for trade params display markdown_str += f"- **Entry:** {trade_params['entry']:.8f}\n" markdown_str += f"- **Stop Loss:** {trade_params['sl']:.8f}\n" markdown_str += f"- **Take Profit 1:** {trade_params['tp1']:.8f}\n" markdown_str += f"- **Take Profit 2:** {trade_params['tp2']:.8f}\n\n" markdown_str += f"**Est. P/L per $1 Margin (TP1/SL Hit, incl. ~{SIMULATED_FEE_PERCENT*2:.2f}% fees):**\n" for lev, pnl_data in trade_params.get('lev_profit', {}).items(): tp1_pnl = pnl_data.get('tp1_profit_$','N/A') sl_loss = pnl_data.get('sl_loss_$','N/A') # Format P/L values tp1_pnl_str = f"{tp1_pnl:.3f}" if isinstance(tp1_pnl, (int, float)) else str(tp1_pnl) sl_loss_str = f"{sl_loss:.3f}" if isinstance(sl_loss, (int, float)) else str(sl_loss) markdown_str += f" - **{lev}:** Profit ${tp1_pnl_str} / Loss ${sl_loss_str}\n" else: markdown_str += f"\n**Trade Setup:** No active trade signal ({'Neutral' if direction == 0 else 'Params Invalid'}) generated for this timeframe at this time.\n" # print(f"Debug: Successfully formatted details for {full_symbol} / {timeframe}") return markdown_str except IndexError: # print("Debug: IndexError during heatmap click processing.") return "Error processing click: Index out of range. Please ensure the heatmap is up to date." except KeyError as e: # print(f"Debug: KeyError processing click: Missing key '{e}'.") return f"Error processing click: Missing expected data key '{e}'. Analysis data might be incomplete." except Exception as e: print(f"Unexpected error formatting heatmap click details: {e}\n{traceback.format_exc()}") return f"An unexpected error occurred displaying details for the clicked cell: {e}" def format_coin_details(symbol, analysis_data, valid_timeframes): """Formats full details for a selected coin across all analyzed timeframes.""" if not analysis_data or symbol not in analysis_data: return f"No analysis data available for {symbol}. Please run analysis first." coin_data_per_tf = analysis_data[symbol] markdown_str = f"## Full Details for {symbol}\n\n" # Iterate through the timeframes the analysis was run for # Use the provided valid_timeframes list for consistent ordering for timeframe in valid_timeframes: markdown_str += f"---\n### Timeframe: {timeframe}\n" details = coin_data_per_tf.get(timeframe) # Check if details exist for this timeframe if details is None: markdown_str += f"*No analysis data generated for this timeframe.*\n" continue # Skip to the next timeframe # Extract components, check if they exist values = details.get('values') signals = details.get('signals') trade_params = details.get('trade_params') direction = details.get('direction') # Use the stored direction if values is None or signals is None or trade_params is None or direction is None: markdown_str += f"*Incomplete analysis data for this timeframe.*\n" continue # --- Format Section for this Timeframe (similar to heatmap click) --- markdown_str += f"- Timestamp: {values.get('timestamp', 'N/A')}\n" price = values.get('price', np.nan) markdown_str += f"- Price: {price:.8f}\n" if isinstance(price, (int, float)) and not pd.isna(price) else f"- Price: {price}\n" volume = values.get('volume', np.nan) markdown_str += f"- Volume: {volume:,.0f}\n" if isinstance(volume, (int, float)) and not pd.isna(volume) else f"- Volume: {volume}\n" atr = values.get('atr', np.nan) markdown_str += f"- ATR: {atr:.8f}\n\n" if isinstance(atr, (int, float)) and not pd.isna(atr) else f"- ATR: {atr}\n\n" markdown_str += f"**Overall Signal Direction:** {'BULLISH (+1)' if direction > 0 else ('BEARISH (-1)' if direction < 0 else 'NEUTRAL (0)')}\n\n" markdown_str += "**Indicator Values & Individual Signals:**\n" markdown_str += "| Indicator | Value | Signal |\n" markdown_str += "|----------------|-----------------|--------|\n" indicator_keys = sorted(signals.keys()) for name in indicator_keys: val = values.get(name, 'N/A') sig_val = signals.get(name, 0) signal_char = '🟩 (+1)' if sig_val > 0 else ('🟥 (-1)' if sig_val < 0 else '⬜ (0)') # Value formatting (reuse from heatmap click with more precision) if isinstance(val, (int, float)) and not pd.isna(val): if abs(val) > 100000: val_str = f"{val:,.0f}" elif abs(val) > 100: val_str = f"{val:,.2f}" elif abs(val) < 0.000001 and abs(val) > 0: val_str = f"{val:.4e}" # Very small number (adjust precision) elif abs(val) < 1: val_str = f"{val:.8f}" # Small decimal (more precision) else: val_str = f"{val:.6f}" # Default decimal (more precision) elif pd.isna(val): val_str = "NaN" else: val_str = str(val) markdown_str += f"| {name:<14} | {val_str:<15} | {signal_char:<7} |\n" # Trade Params Section if trade_params and trade_params.get('entry') is not None: markdown_str += f"\n**Potential Trade Setup:**\n" markdown_str += f"- Direction: {'LONG' if direction > 0 else 'SHORT'}\n" # More precision markdown_str += f"- Entry: {trade_params['entry']:.8f}\n" markdown_str += f"- SL: {trade_params['sl']:.8f}\n" markdown_str += f"- TP1: {trade_params['tp1']:.8f}\n" markdown_str += f"- TP2: {trade_params['tp2']:.8f}\n\n" markdown_str += f"**Est. P/L per $1 Margin (TP1/SL Hit, incl. ~{SIMULATED_FEE_PERCENT*2:.2f}% fees):**\n" for lev, pnl_data in trade_params.get('lev_profit', {}).items(): tp1_pnl = pnl_data.get('tp1_profit_$','N/A') sl_loss = pnl_data.get('sl_loss_$','N/A') tp1_pnl_str = f"{tp1_pnl:.3f}" if isinstance(tp1_pnl, (int, float)) else str(tp1_pnl) sl_loss_str = f"{sl_loss:.3f}" if isinstance(sl_loss, (int, float)) else str(sl_loss) markdown_str += f" - **{lev}:** Profit ${tp1_pnl_str} / Loss ${sl_loss_str}\n" else: markdown_str += f"\n**Trade Setup:** No active trade signal ({'Neutral' if direction == 0 else 'Params Invalid'}) generated for this timeframe.\n" markdown_str += "\n---\n" # End separator for the coin return markdown_str def find_zones(heatmap_df, min_confirmation_threshold): """Finds potential long/short zones based on average heatmap score.""" if heatmap_df is None or heatmap_df.empty: print("Zone finding skipped: Heatmap DF is empty.") return [], [] try: # Calculate average score across valid timeframes for each coin # Ensure we only average over numeric columns numeric_df = heatmap_df.apply(pd.to_numeric, errors='coerce').dropna(axis=1, how='all') # Drop cols that are all NaN if numeric_df.empty: print("Zone finding skipped: No numeric timeframe columns found after coercion.") return [],[] # Calculate mean, skipping NaNs if any occurred during coercion avg_scores = numeric_df.mean(axis=1, skipna=True) # Define thresholds based on the average score range (-1 to 1 typically) # Threshold requires a minimum average positive/negative score. # Example: if min_confirmation_threshold is 0.75, zones need avg > 0.3 or avg < -0.3 zone_threshold_abs = max(0.1, min(1.0, min_confirmation_threshold * 0.4)) # Adjusted multiplier long_zone_threshold = zone_threshold_abs short_zone_threshold = -zone_threshold_abs # print(f"Debug Zones: Long Threshold={long_zone_threshold:.3f}, Short Threshold={short_zone_threshold:.3f}") # Filter based on thresholds long_zone_coins = avg_scores[avg_scores >= long_zone_threshold].sort_values(ascending=False) short_zone_coins = avg_scores[avg_scores <= short_zone_threshold].sort_values(ascending=True) # Prepare lists of tuples (Coin, Score) long_zone_list = list(zip(long_zone_coins.index.astype(str), long_zone_coins.round(3))) short_zone_list = list(zip(short_zone_coins.index.astype(str), short_zone_coins.round(3))) print(f"Zones Found: {len(long_zone_list)} long, {len(short_zone_list)} short candidates (Threshold +/- {zone_threshold_abs:.3f}).") return long_zone_list, short_zone_list except Exception as e: print(f"Error finding zones: {e}\n{traceback.format_exc()}") return [],[] # Return empty lists on error def format_backtest_summary(backtest_results): """Formats the list of backtest result dictionaries into a DataFrame for display.""" if not backtest_results: print("No backtest results to format.") # Return empty DF with correct columns if no results return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']) try: # Filter out potential non-dict entries just in case valid_results = [r for r in backtest_results if isinstance(r, dict)] if not valid_results: print("No valid dictionary entries found in backtest results.") return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']) df = pd.DataFrame(valid_results) # Ensure required columns exist, fill with defaults if missing required_cols = ['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_sum', 'pnl_%_sum'] for col in required_cols: if col not in df.columns: print(f"Warning: Backtest result missing column '{col}', filling default.") if col in ['trades', 'win_rate', 'pnl_sum', 'pnl_%_sum']: df[col] = 0 # Default numeric to 0 else: df[col] = 'N/A' # Default string to N/A # Sort by symbol then timeframe order df['tf_order'] = df['timeframe'].map(TIMEFRAME_ORDER_MAP).fillna(99) # Handle potential unknown timeframes df = df.sort_values(by=['symbol', 'tf_order']).drop('tf_order', axis=1) # Rename columns for better display clarity df = df.rename(columns={'pnl_sum': 'pnl_abs_sum', 'pnl_%_sum': 'pnl_%_sum_on_entry'}) # Select and reorder columns for final display display_cols = ['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry'] # Ensure all display columns exist before selecting final_cols = [col for col in display_cols if col in df.columns] df = df[final_cols] return df except Exception as e: print(f"Error formatting backtest summary: {e}\n{traceback.format_exc()}") # Return empty DF on error return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']) # --- Gradio App Definition --- def create_gradio_app(): with gr.Blocks(theme=gr.themes.Soft(primary_hue=gr.themes.colors.blue), title="Crypto Signal & Backtest V3") as app: gr.Markdown("# Crypto Multi-Indicator, Multi-Timeframe Signal & Backtest V3") gr.Markdown(f"*Warning: Analysis uses up to **{LIMIT_PER_TIMEFRAME}** candles per timeframe and backtests on the last **{BACKTEST_HISTORY_CANDLES}**. This can be **very slow** and **API-intensive**. Rate limits may occur. Use fewer coins/timeframes for faster results. Signals are logged to **`{SIGNAL_LOG_FILE}`**.*") # Global state to store results between interactions shared_state = gr.State({ 'analysis_results': {}, # {symbol: {tf: {signals, values, trade_params, direction}}} 'heatmap_df': pd.DataFrame(), # DataFrame for heatmap display values 'heatmap_details': {}, # {Coin: {tf: {Ind: Val,...}}} for hover 'backtest_results': [], # List of backtest result dicts 'active_signals_df': pd.DataFrame(), # DataFrame of currently active signals 'valid_timeframes': [], # List of timeframes used in the last run 'analyzer': None # Instance of the analyzer class }) with gr.Row(): with gr.Column(scale=1): gr.Markdown("## Configuration") # Get available exchanges dynamically, handle potential errors try: available_exchanges = ccxt.exchanges except Exception as e: print(f"Warning: Could not fetch ccxt exchanges list: {e}") available_exchanges = [DEFAULT_EXCHANGE_ID] # Fallback exchange_input = gr.Dropdown(label="Exchange", choices=available_exchanges, value=DEFAULT_EXCHANGE_ID, interactive=True) top_n_input = gr.Slider(label="Number of Top Coins by Volume", minimum=5, maximum=100, step=5, value=DEFAULT_TOP_N_COINS, interactive=True) # Reduced Max all_timeframes = list(TIMEFRAME_ORDER_MAP.keys()) timeframe_input = gr.CheckboxGroup(label="Select Timeframes (Fewer = Faster)", choices=all_timeframes, value=DEFAULT_TIMEFRAMES, interactive=True) run_button = gr.Button("Run Analysis & Backtest", variant="primary") status_log = gr.Textbox(label="Status Log", lines=15, interactive=False, placeholder="Analysis logs will appear here...", max_lines=30) # Increased lines with gr.Column(scale=3): gr.Markdown("## Results") with gr.Tabs(): with gr.TabItem("Heatmap"): gr.Markdown("Signal strength heatmap based on composite indicator score. Hover over cells for key values, click a cell for full details below.") # Use gr.Plot which supports Plotly and the 'select' event for clicks # Explicitly set label for Plot component heatmap_plot = gr.Plot(label="Signal Heatmap", show_label=False) # show_label=False to hide the default label if desired heatmap_detail_output = gr.Markdown(label="Clicked Cell Full Details", value="*Click on a heatmap cell after analysis to see full indicator details and trade setup.*") # Placeholder text with gr.TabItem("Active Trade Setups"): gr.Markdown("### Potential Trade Setups (Current Snapshot)") gr.Markdown(f"*Shows pairs and timeframes with a non-neutral signal direction and valid ATR-based parameters from the **latest** analyzed candle. Signals logged to `{SIGNAL_LOG_FILE}`. This is NOT financial advice. DYOR!*") active_signals_table = gr.DataFrame( label="Active Signals", headers=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2'], datatype=['str'] * 7, # Treat all as strings for display consistency with precision interactive=False, row_count=(10, "dynamic"), # Show ~10 rows, allow scroll col_count=(7, "fixed"), wrap=True ) with gr.TabItem("Zones"): gr.Markdown("### Potential Long/Short Zones") zone_threshold_display = max(0.1, min(1.0, DEFAULT_MIN_CONFIRMATION * 0.4)) # Calculate display threshold gr.Markdown(f"*Coins with the highest/lowest average signal score across the selected timeframes (based on an average score threshold of ~**+/- {zone_threshold_display:.2f}**). Indicates potential broader trend alignment.*") with gr.Row(): long_zone_output = gr.DataFrame(label="Potential Long Zone (Highest Avg Scores)", headers=["Coin", "Avg Score"], col_count=(2, "fixed"), row_count=10) short_zone_output = gr.DataFrame(label="Potential Short Zone (Lowest Avg Scores)", headers=["Coin", "Avg Score"], col_count=(2, "fixed"), row_count=10) with gr.TabItem("Full Coin Details"): gr.Markdown("Select a coin analyzed in the heatmap to view its detailed indicator values, signals, and potential trade setup across all selected timeframes.") coin_selector = gr.Dropdown(label="Select Coin to View All Timeframe Details", choices=[], interactive=False) # Initially disabled coin_detail_output = gr.Markdown(label="Detailed Indicator Values & Trade Setup per Timeframe", value="*Select a coin from the dropdown after analysis runs.*") with gr.TabItem("Backtest Summary"): gr.Markdown(f"### Simplified Backtest Results (ATR TP1/SL Strategy)") gr.Markdown(f"*Note: Simulated on last **{BACKTEST_HISTORY_CANDLES}** candles per timeframe. Assumes entry on signal candle's open, exits on TP1/SL hit within the **next** candle's high/low. Includes estimated ~{SIMULATED_FEE_PERCENT*2:.2f}% round-trip fee. **This is a highly simplified simulation for indicative purposes only and NOT investment advice.** Results also saved to `{BACKTEST_RESULTS_FILE}`.*") backtest_summary_df = gr.DataFrame( label="Backtest Metrics per Symbol/Timeframe", interactive=False, wrap=True, row_count=(15, "dynamic"), # Show more rows col_count=(6, "fixed") ) # --- Event Handler: Run Button --- def analysis_process_wrapper(exchange, top_n, timeframes, current_state, progress=gr.Progress(track_tqdm=True)): """Wrapper to run analysis and update UI components.""" start_time = time.time() log = ["Initializing analysis..."] # Clear previous results visually and reset state components current_state = { # Reset state explicitly 'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {}, 'backtest_results': [], 'active_signals_df': pd.DataFrame(), 'valid_timeframes': [], 'analyzer': None } initial_updates = { status_log: "\n".join(log), heatmap_plot: None, # Clear plot heatmap_detail_output: "Running analysis...", active_signals_table: pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']), # Clear table long_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]), short_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]), coin_selector: gr.Dropdown(choices=[], value=None, label="Select Coin...", interactive=False), # Disable dropdown coin_detail_output: "Running analysis...", backtest_summary_df: pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']), # Clear backtest table shared_state: current_state # Update the state with cleared data } yield initial_updates # Update UI immediately try: # Validate inputs if not exchange: log.append("Error: No exchange selected.") yield {status_log: "\n".join(log), shared_state: current_state} return if not timeframes: log.append("Error: No timeframes selected.") yield {status_log: "\n".join(log), shared_state: current_state} return analyzer = CryptoTrendIndicator(exchange, int(top_n), timeframes) current_state['analyzer'] = analyzer # Store analyzer instance if not analyzer.valid_timeframes: log.append(f"Error: No valid timeframes found or supported for exchange '{exchange}'. Check selection or exchange capabilities.") yield {status_log: "\n".join(log), shared_state: current_state} return # Stop processing log.append(f"Analyzer initialized for {exchange} | {top_n} coins | Timeframes: {analyzer.valid_timeframes}") log.append(f"Fetching up to {LIMIT_PER_TIMEFRAME} candles | Backtesting last {BACKTEST_HISTORY_CANDLES} candles.") log.append("Starting data fetch and analysis (this may take several minutes)...") yield {status_log: "\n".join(log)} # Update log # Run the full analysis, get all results analysis_results, heatmap_df, backtest_results, active_signals_df, heatmap_details, log_msgs = analyzer.run_full_analysis(progress=progress) # Pass progress tracker log.extend(log_msgs) # Add logs from the analysis process # Update state with the new results current_state['analysis_results'] = analysis_results current_state['heatmap_df'] = heatmap_df if isinstance(heatmap_df, pd.DataFrame) else pd.DataFrame() # Ensure DF current_state['heatmap_details'] = heatmap_details if isinstance(heatmap_details, dict) else {} current_state['backtest_results'] = backtest_results if isinstance(backtest_results, list) else [] current_state['active_signals_df'] = active_signals_df if isinstance(active_signals_df, pd.DataFrame) else pd.DataFrame() current_state['valid_timeframes'] = analyzer.valid_timeframes # --- Prepare final UI updates --- # Heatmap & Coin Selector if not current_state['heatmap_df'].empty: fig = create_plotly_heatmap(current_state['heatmap_df'], current_state['heatmap_details']) coin_list = sorted(current_state['heatmap_df'].index.astype(str).tolist()) coin_selector_update = gr.Dropdown(choices=coin_list, value=None, label="Select Coin...", interactive=True) # Enable dropdown heatmap_detail_msg = "Click on a heatmap cell for specific details." else: log.append("Warning: Heatmap data is empty after analysis.") fig = go.Figure(layout=go.Layout(title="No heatmap data generated", height=300)) # Empty figure coin_list = [] coin_selector_update = gr.Dropdown(choices=[], value=None, label="No Coins Analyzed", interactive=False) # Keep disabled heatmap_detail_msg = "No heatmap data generated. Check logs." # Zones long_coins, short_coins = find_zones(current_state['heatmap_df'], DEFAULT_MIN_CONFIRMATION) # Convert list of tuples directly for Gradio DataFrame long_df_data = long_coins if long_coins else [(" ", " ")] # Placeholder if empty short_df_data = short_coins if short_coins else [(" ", " ")] # Backtest Summary bt_summary_display_df = format_backtest_summary(current_state['backtest_results']) if bt_summary_display_df.empty: bt_summary_display_df = pd.DataFrame([{'symbol': 'No results', 'timeframe': '', 'trades': 0, 'win_rate': 0, 'pnl_abs_sum': 0, 'pnl_%_sum_on_entry': 0}]) # Active Signals active_signals_display_df = current_state['active_signals_df'] if active_signals_display_df.empty: active_signals_display_df = pd.DataFrame([{'Symbol': 'No active signals', 'Timeframe': '', 'Direction': '', 'Entry': '', 'SL': '', 'TP1': '', 'TP2': ''}]) end_time = time.time() log.append(f"Analysis & Backtest finished in {end_time - start_time:.2f} seconds.") # Prepare the final dictionary of updates for the UI final_updates = { status_log: "\n".join(log), heatmap_plot: fig, heatmap_detail_output: heatmap_detail_msg, active_signals_table: active_signals_display_df, # Show the active signals table long_zone_output: gr.DataFrame(value=long_df_data, headers=["Coin", "Avg Score"]), # Update with new value/headers short_zone_output: gr.DataFrame(value=short_df_data, headers=["Coin", "Avg Score"]), # Update with new value/headers coin_selector: coin_selector_update, # Update dropdown with choices coin_detail_output: "Select a coin from the dropdown above." if coin_list else "No analysis results.", backtest_summary_df: bt_summary_display_df, # Show backtest summary shared_state: current_state # IMPORTANT: Update the state with all results } yield final_updates except ValueError as ve: # Catch initialization or config errors log.append(f"--- CONFIGURATION ERROR ---") error_details = f"{str(ve)}\n{traceback.format_exc()}" log.append(error_details) print(error_details) current_state = { # Ensure state is reset 'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {}, 'backtest_results': [], 'active_signals_df': pd.DataFrame(), 'valid_timeframes': [], 'analyzer': None } yield { status_log: "\n".join(log), shared_state: current_state } # Update log and state except Exception as e: log.append(f"--- FATAL ERROR DURING ANALYSIS ---") error_details = f"{str(e)}\n{traceback.format_exc()}" log.append(error_details) print(error_details) # Reset state components on fatal error during run current_state = { # Ensure state is reset 'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {}, 'backtest_results': [], 'active_signals_df': pd.DataFrame(), 'valid_timeframes': [], 'analyzer': None } error_updates = { status_log: "\n".join(log), # Keep other outputs cleared or show error message heatmap_plot: None, heatmap_detail_output: f"Analysis failed. Check logs.\nError: {e}", active_signals_table: pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']), long_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]), short_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]), coin_selector: gr.Dropdown(choices=[], value=None, label="Error", interactive=False), coin_detail_output: f"Analysis failed. Check logs.\nError: {e}", backtest_summary_df: pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']), shared_state: current_state # Update state with cleared data } yield error_updates run_button.click( fn=analysis_process_wrapper, inputs=[exchange_input, top_n_input, timeframe_input, shared_state], outputs=[ # List all components that can be updated status_log, heatmap_plot, heatmap_detail_output, active_signals_table, long_zone_output, short_zone_output, coin_selector, coin_detail_output, backtest_summary_df, shared_state ] ) # --- Event Handler: Coin Dropdown Change --- def display_full_details_handler(selected_coin, current_state): """Handles dropdown change to show full details for a selected coin.""" if not selected_coin: return "Select a coin from the dropdown." # Check state validity if not isinstance(current_state, dict) or not current_state.get('analysis_results') or not current_state.get('valid_timeframes'): print("Debug Coin Select: State invalid or missing data.") return "Run analysis first or analysis data is missing/incomplete in state." analysis_results = current_state['analysis_results'] valid_tfs = current_state.get('valid_timeframes', []) # Find the full symbol (e.g., BTC/USDT) based on the selected base coin name full_symbol = None for symbol_key in analysis_results.keys(): # Ensure comparison is string-to-string if str(symbol_key).startswith(str(selected_coin) + '/'): full_symbol = symbol_key break # Found the first match if not full_symbol: print(f"Debug Coin Select: Full symbol not found for base {selected_coin}") return f"Details not found for {selected_coin} in the current analysis results." if not valid_tfs: print(f"Debug Coin Select: Valid timeframes list missing for {selected_coin}") return f"Valid timeframes list is missing from state for {selected_coin}." # Call the formatting function return format_coin_details(full_symbol, analysis_results, valid_tfs) coin_selector.change( fn=display_full_details_handler, inputs=[coin_selector, shared_state], outputs=[coin_detail_output] ) # --- Event Handler: Heatmap Click --- # Use .select event for gr.Plot with Plotly figures heatmap_plot.change( # Use .select for Plotly click events fn=format_heatmap_click_details, inputs=[shared_state], # Pass the whole state outputs=[heatmap_detail_output] # Update the Markdown component below heatmap ) return app # --- Main Execution --- if __name__ == "__main__": print("\n--- Crypto Analysis App V3 ---") # Check if results/log files exist and inform user for fpath in [BACKTEST_RESULTS_FILE, SIGNAL_LOG_FILE]: if os.path.exists(fpath): print(f"INFO: Existing file found: '{fpath}'. It may be appended to or overwritten on the next run.") else: print(f"INFO: Results/Logs will be saved to '{fpath}' after analysis.") print("\nStarting Crypto Analysis Gradio App...") print("------------------------------------------------------") print(f"CONFIG: Backtest Candles={BACKTEST_HISTORY_CANDLES}, Fetch Limit={LIMIT_PER_TIMEFRAME}") print(f"CONFIG: Default Exchange={DEFAULT_EXCHANGE_ID}, Top Coins={DEFAULT_TOP_N_COINS}, Timeframes={DEFAULT_TIMEFRAMES}") print("WARNING: Initial analysis might be slow due to extensive data fetching and calculations.") print("Ensure you have required libraries: pandas, numpy, ccxt, ta, plotly, gradio") print("------------------------------------------------------") gradio_app = create_gradio_app() # Launch the app (debug=False for production/sharing, debug=True for development errors) # share=True can be used to create a temporary public link (use with caution) # Increase max_threads if analysis is CPU-bound and you have cores, but be mindful of API rate limits gradio_app.queue().launch(debug=False, max_threads=4) # Enable queue for better handling of long processes