# trading/app.py
import pandas as pd
import numpy as np
import ccxt
import time
import os
import csv # Import csv module for logging
import traceback # Import traceback for detailed error logging
from datetime import datetime, timedelta
import warnings
import plotly.graph_objects as go
import plotly.colors as pcolors
import gradio as gr
# Import necessary TA indicators (Existing + New)
from ta.trend import MACD, ADXIndicator, IchimokuIndicator, VortexIndicator
from ta.momentum import RSIIndicator, StochasticOscillator, AwesomeOscillatorIndicator, WilliamsRIndicator
from ta.volume import MFIIndicator, OnBalanceVolumeIndicator, ChaikinMoneyFlowIndicator, VolumeWeightedAveragePrice
from ta.volatility import AverageTrueRange, BollingerBands
# Suppress specific warnings
warnings.filterwarnings('ignore', category=RuntimeWarning)
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning) # Ignore some TA lib warnings if needed
# --- Configuration ---
DEFAULT_EXCHANGE_ID = 'mexc' # Default switched from 'binance' due to API access issues
DEFAULT_TOP_N_COINS = 30 # Reduced default due to increased backtest history
DEFAULT_TIMEFRAMES = ['1m', '5m', '15m', '30m', '1h', '4h'] # Example Timeframes
DEFAULT_MIN_CONFIRMATION = 0.75 # Used for Zone finding (Average Score)
# Increased limits for longer backtesting
LIMIT_PER_TIMEFRAME = 1050 # Needs to be >= BACKTEST_HISTORY_CANDLES + indicator lookbacks (~50)
BACKTEST_HISTORY_CANDLES = 1000 # Increased backtest candle count
# --- Trade Parameters ---
ATR_SL_MULTIPLIER = 1.5
ATR_TP1_MULTIPLIER = 1.0
ATR_TP2_MULTIPLIER = 2.0
LEVERAGES = [20, 50] # For display/estimation only
SIMULATED_FEE_PERCENT = 0.06 # Approximate futures fee per side (entry/exit = *2)
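# Worked example of the trade math above (illustrative numbers only): a LONG with
# entry = 100.00 and ATR = 2.00 gives SL = 100.00 - 1.5*2.00 = 97.00,
# TP1 = 100.00 + 1.0*2.00 = 102.00, TP2 = 100.00 + 2.0*2.00 = 104.00.
# At 20x leverage a TP1 hit returns roughly 0.02 * 20 = $0.40 gross per $1 margin;
# fees cost 2 * 0.0006 * 20 = $0.024, leaving about $0.376 net.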
BACKTEST_RESULTS_FILE = 'backtest_summary_enhanced.csv'
SIGNAL_LOG_FILE = 'realtime_signal_log.csv' # <<< New: CSV file for logging signals
TIMEFRAME_ORDER_MAP = {
'1m': 1, '3m': 2, '5m': 3, '15m': 4, '30m': 5, '1h': 6, '2h': 7,
'4h': 8, '6h': 9, '8h': 10, '12h': 11, '1d': 12, '3d': 13, '1w': 14, '1M': 15
}
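# Example: sorted(['4h', '1m', '1h'], key=lambda tf: TIMEFRAME_ORDER_MAP.get(tf, 99))
# -> ['1m', '1h', '4h']; unrecognized timeframes sort last via the default of 99.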
# TIMEFRAME_WEIGHTS not actively used in current logic, kept for potential future use.
# --- CSV Signal Logging Function ---
def log_signal_to_csv(signal_info):
"""Appends signal information to the CSV log file."""
file_exists = os.path.isfile(SIGNAL_LOG_FILE)
fieldnames = [
'LogTimestamp', 'SignalCandleTime', 'Symbol', 'Timeframe', 'Direction',
'Entry', 'SL', 'TP1', 'TP2', 'Status', #'RSI', 'MACD_Diff', 'ADX' # Optional: Add key indicator values
]
try:
with open(SIGNAL_LOG_FILE, 'a', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if not file_exists:
writer.writeheader() # Write header only if file is new
writer.writerow(signal_info)
except IOError as e:
print(f"Error: Could not write to CSV log file {SIGNAL_LOG_FILE}: {e}")
except Exception as e:
print(f"Error logging signal to CSV: {e}\n{traceback.format_exc()}")
# --- Crypto Analysis Class ---
class CryptoTrendIndicator:
def __init__(self, exchange_id, top_coins, selected_timeframes):
self.exchange_id = exchange_id
self.top_coins = top_coins
self.requested_timeframes = selected_timeframes
self.exchange = None
self.valid_timeframes = []
# Stores detailed results {symbol: {tf: {signals: {}, values: {}, trade_params: {}, direction: int}}}
self.analysis_results = {}
self.backtest_results = [] # List of dicts for backtest summary
self.heatmap_df = pd.DataFrame()
self.active_signals_df = pd.DataFrame() # DataFrame for active signals table
self.heatmap_details = {} # Stores dict for hover {Coin: {tf: {Ind: Val,...}}}
self._initialize_exchange()
def _initialize_exchange(self):
"""Initialize the ccxt exchange instance."""
try:
# Force spot market for analysis consistency
self.exchange = getattr(ccxt, self.exchange_id)({
'enableRateLimit': True,
'options': {'defaultType': 'spot'} # Use SPOT for fetching data
})
# Increase timeout for potentially longer data fetches
self.exchange.timeout = 30000 # 30 seconds
self.exchange.load_markets(reload=True)
print(f"Exchange {self.exchange_id} initialized (using SPOT markets for data).")
self._validate_timeframes()
except AttributeError:
raise ValueError(f"Error: Exchange '{self.exchange_id}' not found or supported by ccxt.")
except ccxt.AuthenticationError as e:
raise ValueError(f"Authentication Error for {self.exchange_id}: {e}")
except ccxt.ExchangeError as e:
raise ValueError(f"Exchange Error initializing {self.exchange_id}: {e}")
except Exception as e:
raise ValueError(f"Unexpected error initializing exchange: {e}\n{traceback.format_exc()}")
def _validate_timeframes(self):
"""Filter selected timeframes against those supported by the exchange."""
if not self.exchange or not self.exchange.timeframes:
print(f"Warning: Could not get timeframes from {self.exchange_id}. Cannot validate.")
# Attempt to use requested timeframes, hoping they are valid
self.valid_timeframes = sorted(
self.requested_timeframes,
key=lambda tf: TIMEFRAME_ORDER_MAP.get(tf, 99)
)
print(f"Proceeding with requested timeframes (validation skipped): {self.valid_timeframes}")
return
supported_tfs = self.exchange.timeframes
self.valid_timeframes = sorted(
[tf for tf in self.requested_timeframes if tf in supported_tfs],
key=lambda tf: TIMEFRAME_ORDER_MAP.get(tf, 99)
)
print(f"Supported timeframes for analysis: {self.valid_timeframes}")
if len(self.valid_timeframes) != len(self.requested_timeframes):
skipped = set(self.requested_timeframes) - set(self.valid_timeframes)
print(f"Warning: Skipped unsupported timeframes for {self.exchange_id}: {', '.join(skipped)}")
if not self.valid_timeframes:
print(f"Warning: No valid timeframes selected or supported by {self.exchange_id}.")
def fetch_top_coins(self):
"""Fetch the top coins by USDT volume from the exchange (spot only)"""
if not self.exchange: return [], "Exchange not initialized"
# --- Logic unchanged ---
try:
tickers = self.exchange.fetch_tickers()
usdt_pairs = {}
# Stricter filtering for spot, non-leveraged, common pairs
for symbol, data in tickers.items():
try:
market = self.exchange.market(symbol)
if (symbol.endswith('/USDT') and
data is not None and
market is not None and market.get('spot', False) and # Explicitly check for spot
market.get('active', True) and
not market.get('leveraged', False) and # Exclude leveraged
data.get('quoteVolume') is not None and data['quoteVolume'] > 10000 and # Example: Filter low volume
data.get('symbol') is not None and
# Additional filters for common leveraged/problematic tokens
'UP/' not in symbol and 'DOWN/' not in symbol and
'BULL/' not in symbol and 'BEAR/' not in symbol and
'3L/' not in symbol and '3S/' not in symbol and
'5L/' not in symbol and '5S/' not in symbol
):
usdt_pairs[symbol] = data
except ccxt.BadSymbol:
continue # Skip symbols that ccxt can't parse market data for
except Exception as e_inner:
# print(f"Minor error processing ticker {symbol}: {e_inner}") # Log minor errors
continue
if not usdt_pairs:
return [], f"No suitable USDT spot pairs found on {self.exchange_id} (check filters/volume)."
sorted_pairs = sorted(
usdt_pairs.items(),
key=lambda x: x[1]['quoteVolume'],
reverse=True
)
# Fetch slightly more than requested to account for symbols that fail validation
fetch_limit = min(len(sorted_pairs), self.top_coins + 10)
top_symbols_initial = [pair[0] for pair in sorted_pairs[:fetch_limit]]
# Filter again to ensure markets are loadable and active
final_symbols = []
count = 0
print(f"Validating {len(top_symbols_initial)} potential symbols...")
for s in top_symbols_initial:
if count >= self.top_coins:
break
try:
mkt = self.exchange.market(s) # Check if market data is valid & active
if mkt and mkt.get('active', True):
final_symbols.append(s)
count += 1
except ccxt.BadSymbol:
pass
except Exception as e:
print(f"Market {s} skipped during validation due to error: {e}")
msg = f"Fetched and validated top {len(final_symbols)} USDT spot pairs by volume from {self.exchange_id}."
print(msg)
if not final_symbols:
msg += " (Warning: Result list is empty)"
return final_symbols, msg
except (ccxt.NetworkError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as e:
msg = f"Network/Timeout Error fetching tickers from {self.exchange_id}: {e}."
print(msg)
return [], msg
except ccxt.ExchangeError as e:
msg = f"Exchange Error fetching tickers from {self.exchange_id}: {e}"
print(msg)
return [], msg
except Exception as e:
msg = f"An unexpected error occurred fetching top coins: {e}\n{traceback.format_exc()}"
print(msg)
return [], msg
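# Example return value (illustrative):
# (['BTC/USDT', 'ETH/USDT', ...],
#  "Fetched and validated top 30 USDT spot pairs by volume from mexc.")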
def fetch_ohlcv_data(self, symbol, timeframe, limit=LIMIT_PER_TIMEFRAME):
"""Fetches OHLCV data with retry mechanism."""
if not self.exchange: return [], "Exchange not initialized"
max_retries = 3
retry_delay = 5 # seconds
for attempt in range(max_retries):
try:
# print(f"Fetching {limit} candles for {symbol} {timeframe} (Attempt {attempt+1})...")
# *** Fetch as many as limit allows ***
# CCXT handles the max limit per request internally,
# this 'limit' param tells it the total number desired.
# If limit > exchange max, ccxt might fetch multiple times if supported,
# or just return the max allowed per call. We rely on ccxt's behavior here.
ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
if not ohlcv:
# If empty list is returned, treat as insufficient
return [], f"No data returned for {symbol} [{timeframe}]"
elif len(ohlcv) < 100: # Need substantial data for long backtest & indicators
return [], f"Insufficient data ({len(ohlcv)}) for {symbol} [{timeframe}] (Need >100)"
elif len(ohlcv) < BACKTEST_HISTORY_CANDLES + 50:
print(f"Warning: Fetched {len(ohlcv)} candles for {symbol} [{timeframe}], less than ideal ({BACKTEST_HISTORY_CANDLES + 50}) for full backtest + lookback.")
# Proceed anyway, backtest might be shorter than intended
# Successfully fetched enough data
print(f"Fetched {len(ohlcv)} candles for {symbol} [{timeframe}]")
return ohlcv, None
except (ccxt.NetworkError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as e:
print(f"Network error fetching {symbol} [{timeframe}] (Attempt {attempt+1}): {e}. Retrying in {retry_delay}s...")
if attempt == max_retries - 1:
return [], f"Network error {symbol} [{timeframe}] after {max_retries} attempts: {e}"
time.sleep(retry_delay + attempt * 2) # Incremental backoff: 5s after the first failure, 7s after the second
except ccxt.RateLimitExceeded as e:
print(f"Rate limit hit fetching {symbol} [{timeframe}]. Waiting longer...")
time.sleep(self.exchange.rateLimit / 1000 * 5 if self.exchange.rateLimit else 60) # Wait longer for rate limits
if attempt == max_retries - 1:
return [], f"Rate limit exceeded for {symbol} [{timeframe}] after retries: {e}"
# Continue to next attempt after waiting
except ccxt.BadSymbol:
return [], f"Invalid symbol {symbol}"
except ccxt.ExchangeError as e:
print(f"Exchange error fetching {symbol} [{timeframe}]: {e}")
# Handle specific errors if needed, e.g., timeframe not available for symbol
if 'timeframe not available' in str(e).lower():
return [], f"Timeframe {timeframe} not supported for {symbol} on {self.exchange_id}"
return [], f"Exchange error {symbol} [{timeframe}]: {e}"
except Exception as e:
print(f"Unexpected error fetching OHLCV {symbol} [{timeframe}]: {e}\n{traceback.format_exc()}")
return [], f"Unexpected error fetching OHLCV {symbol} [{timeframe}]"
return [], f"Failed to fetch data for {symbol} [{timeframe}] after {max_retries} attempts."
def calculate_indicators(self, ohlcv_data, timeframe):
"""Calculates existing and new technical indicators."""
# Increased required length for Ichimoku and other lookbacks
required_length = 100
if not isinstance(ohlcv_data, list) or len(ohlcv_data) < required_length:
print(f"Indicator calc skip: Input data invalid or too short for {timeframe} (needs {required_length}, got {len(ohlcv_data) if ohlcv_data else 0})")
return None
try:
df = pd.DataFrame(ohlcv_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
# Convert to numeric, coerce errors, drop NaNs needed for core calcs
for col in ['open', 'high', 'low', 'close', 'volume']:
df[col] = pd.to_numeric(df[col], errors='coerce')
df.dropna(subset=['close', 'volume', 'high', 'low'], inplace=True) # Drop rows where essential data is missing
if df.empty or len(df) < required_length:
print(f"Data too short after cleaning for {timeframe} (needs {required_length}, got {len(df)})")
return None
# --- Calculate Existing Indicators ---
df['rsi'] = RSIIndicator(close=df['close'], window=14).rsi().fillna(50)
stoch_obj = StochasticOscillator(high=df['high'], low=df['low'], close=df['close'], window=14, smooth_window=3)
df['stoch_k'] = stoch_obj.stoch().fillna(50)
df['stoch_d'] = stoch_obj.stoch_signal().fillna(50)
ao_obj = AwesomeOscillatorIndicator(high=df['high'], low=df['low'], fillna=True)
df['ao'] = ao_obj.awesome_oscillator().fillna(0)
macd_obj = MACD(close=df['close'], window_slow=26, window_fast=12, window_sign=9, fillna=True)
df['macd'] = macd_obj.macd().fillna(0)
df['macd_signal'] = macd_obj.macd_signal().fillna(0)
df['macd_diff'] = macd_obj.macd_diff().fillna(0)
adx_obj = ADXIndicator(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True)
df['adx'] = adx_obj.adx().fillna(20)
df['adx_pos'] = adx_obj.adx_pos().fillna(0)
df['adx_neg'] = adx_obj.adx_neg().fillna(0)
df['ema_20'] = df['close'].ewm(span=20, adjust=False).mean()
df['ema_50'] = df['close'].ewm(span=50, adjust=False).mean()
df['ema_100'] = df['close'].ewm(span=100, adjust=False).mean()
df['mfi'] = MFIIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=14, fillna=True).money_flow_index().fillna(50)
df['obv'] = OnBalanceVolumeIndicator(close=df['close'], volume=df['volume'], fillna=True).on_balance_volume().ffill()
df['cmf'] = ChaikinMoneyFlowIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=20, fillna=True).chaikin_money_flow().fillna(0)
df['volume_sma'] = df['volume'].rolling(window=20, min_periods=10).mean()
# Handle potential division by zero or NaN in volume_sma
df['volume_ratio'] = (df['volume'] / df['volume_sma'].replace(0, np.nan)).replace([np.inf, -np.inf], 1.0).fillna(1.0)
df['atr'] = AverageTrueRange(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True).average_true_range().ffill().fillna(0)
# --- Calculate NEW Indicators ---
bb_obj = BollingerBands(close=df['close'], window=20, window_dev=2, fillna=True)
df['bb_hband'] = bb_obj.bollinger_hband()
df['bb_lband'] = bb_obj.bollinger_lband()
df['bb_mavg'] = bb_obj.bollinger_mavg()
df['bb_width'] = bb_obj.bollinger_wband()
ichi_obj = IchimokuIndicator(high=df['high'], low=df['low'], window1=9, window2=26, window3=52, fillna=True)
df['ichi_a'] = ichi_obj.ichimoku_a()
df['ichi_b'] = ichi_obj.ichimoku_b()
df['ichi_base'] = ichi_obj.ichimoku_base_line()
df['ichi_conv'] = ichi_obj.ichimoku_conversion_line()
df['will_r'] = WilliamsRIndicator(high=df['high'], low=df['low'], close=df['close'], lbp=14, fillna=True).williams_r().fillna(-50)
df['vwap'] = VolumeWeightedAveragePrice(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=14, fillna=True).volume_weighted_average_price().ffill()
vortex_obj = VortexIndicator(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True)
df['vortex_pos'] = vortex_obj.vortex_indicator_pos()
df['vortex_neg'] = vortex_obj.vortex_indicator_neg()
# --- End Indicators ---
# Fill any remaining NaNs using forward fill first, then backward fill
# This helps ensure indicators near the start/end are usable for signals/backtest
df.ffill(inplace=True)
df.bfill(inplace=True)
# Ensure sufficient length remains after potential NaNs at the start
min_usable_length = 50 # Minimum needed for signal generation logic using prev row
if len(df) < min_usable_length:
print(f"Data too short after indicator calculation & filling for {timeframe} (needs >{min_usable_length}, got {len(df)})")
return None
return df.copy() # Use copy to avoid SettingWithCopyWarning later
except Exception as e:
print(f"Error calculating indicators for {timeframe}: {e}\n{traceback.format_exc()}")
return None
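# Minimal standalone sketch of the same `ta` calls on synthetic data (illustrative,
# not part of the app's flow):
# import numpy as np, pandas as pd
# from ta.momentum import RSIIndicator
# close = pd.Series(np.random.default_rng(0).normal(0, 1, 200).cumsum() + 100)
# rsi = RSIIndicator(close=close, window=14).rsi() # same call as in calculate_indicators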
def generate_signals_and_values(self, df):
"""
Generates signals based on latest indicator values, including new ones.
Implements the "K-map" concept via multi-factor confirmation scoring.
Returns:
- final_signals (dict): Dictionary of individual indicator signals {indicator_name: signal (-1, 0, 1)}.
- values (dict): Dictionary of raw indicator values {indicator_name: value}.
- signal_direction (int): Overall signal direction (-1, 0, 1) based on composite score.
"""
if df is None or not isinstance(df, pd.DataFrame) or len(df.index) < 2: # Check length using index after potential cleaning
# print("Signal Gen Skip: DataFrame invalid or too short.")
return None, None, None
try:
# Ensure index is datetime for proper iloc selection
if not isinstance(df.index, pd.DatetimeIndex):
df.index = pd.to_datetime(df.index)
df = df.sort_index() # Ensure sorting after conversion
# Use .iloc for positional access (robust to non-sequential indices)
latest = df.iloc[-1]
prev = df.iloc[-2]
signals = {} # Stores intermediate signals (can be 0.5, -0.5)
final_signals = {} # Stores final signals (-1, 0, 1)
values = {} # Store raw values
# --- Store latest values (including new ones) ---
values['price'] = latest['close']
values['volume'] = latest['volume']
values['timestamp'] = latest.name.strftime('%Y-%m-%d %H:%M:%S') # Use index name
values['atr'] = latest['atr']
# Momentum
values['rsi'] = latest['rsi']
values['stoch_k'] = latest['stoch_k']; values['stoch_d'] = latest['stoch_d']
values['ao'] = latest['ao']
values['will_r'] = latest['will_r'] # New
# Trend
values['macd'] = latest['macd']; values['macd_signal'] = latest['macd_signal']; values['macd_diff'] = latest['macd_diff']
values['adx'] = latest['adx']; values['adx_pos'] = latest['adx_pos']; values['adx_neg'] = latest['adx_neg']
values['ema_20'] = latest['ema_20']; values['ema_50'] = latest['ema_50']; values['ema_100'] = latest['ema_100']
values['ichi_a'] = latest['ichi_a']; values['ichi_b'] = latest['ichi_b']; values['ichi_base'] = latest['ichi_base']; values['ichi_conv'] = latest['ichi_conv'] # New
values['vortex_pos'] = latest['vortex_pos']; values['vortex_neg'] = latest['vortex_neg'] # New
# Volume
values['mfi'] = latest['mfi']
values['obv'] = latest['obv']
values['cmf'] = latest['cmf']
values['volume_ratio'] = latest['volume_ratio']
values['vwap'] = latest['vwap'] # New (volume-related)
# Volatility
values['bb_hband'] = latest['bb_hband']; values['bb_lband'] = latest['bb_lband']; values['bb_mavg'] = latest['bb_mavg']; values['bb_width'] = latest['bb_width'] # New
# ---
# --- Generate Signals (Using intermediate 'signals' dict) ---
# RSI (Momentum)
signals['rsi'] = 1 if latest['rsi'] < 30 else (-1 if latest['rsi'] > 70 else 0)
# Stochastic (Momentum) - %K/%D crossover in oversold/overbought territory
if latest['stoch_k'] < 25 and prev['stoch_k'] <= prev['stoch_d'] and latest['stoch_k'] > latest['stoch_d']: signals['stoch'] = 1
elif latest['stoch_k'] > 75 and prev['stoch_k'] >= prev['stoch_d'] and latest['stoch_k'] < latest['stoch_d']: signals['stoch'] = -1
else: signals['stoch'] = 0
# Awesome Oscillator (Momentum) - Zero cross and twin peaks (simplified)
if latest['ao'] > 0 and prev['ao'] <= 0: signals['ao'] = 1
elif latest['ao'] < 0 and prev['ao'] >= 0: signals['ao'] = -1
elif latest['ao'] > 0 and prev['ao'] > 0 and latest['ao'] > prev['ao']: signals['ao'] = 0.5 # Bullish momentum
elif latest['ao'] < 0 and prev['ao'] < 0 and latest['ao'] < prev['ao']: signals['ao'] = -0.5 # Bearish momentum
else: signals['ao'] = 0
# Williams %R (Momentum) - Crossing up into overbought (bullish momentum) / down into oversold (bearish momentum)
signals['will_r'] = 1 if latest['will_r'] > -20 and prev['will_r'] <= -20 else (-1 if latest['will_r'] < -80 and prev['will_r'] >= -80 else 0)
# MACD (Trend/Momentum) - Line cross Signal
if (latest['macd'] > latest['macd_signal'] and prev['macd'] <= prev['macd_signal']): signals['macd'] = 1
elif (latest['macd'] < latest['macd_signal'] and prev['macd'] >= prev['macd_signal']): signals['macd'] = -1
else: signals['macd'] = 0
# ADX (Trend Strength) - Directional movement
signals['adx'] = 1 if latest['adx'] > 25 and latest['adx_pos'] > latest['adx_neg'] else (-1 if latest['adx'] > 25 and latest['adx_neg'] > latest['adx_pos'] else 0)
# EMA Trend (Trend)
if latest['close'] > latest['ema_20'] and latest['ema_20'] > latest['ema_50'] and latest['ema_50'] > latest['ema_100']: signals['ema_trend'] = 1
elif latest['close'] < latest['ema_20'] and latest['ema_20'] < latest['ema_50'] and latest['ema_50'] < latest['ema_100']: signals['ema_trend'] = -1
elif latest['close'] > latest['ema_50']: signals['ema_trend'] = 0.5 # Price above mid-term EMA
elif latest['close'] < latest['ema_50']: signals['ema_trend'] = -0.5 # Price below mid-term EMA
else: signals['ema_trend'] = 0
# Ichimoku (Trend) - TK cross, Cloud position, Price vs Kijun
ichi_signal = 0
tenkan_cross_kijun_up = latest['ichi_conv'] > latest['ichi_base'] and prev['ichi_conv'] <= prev['ichi_base']
tenkan_cross_kijun_down = latest['ichi_conv'] < latest['ichi_base'] and prev['ichi_conv'] >= prev['ichi_base']
above_cloud = latest['close'] > latest['ichi_a'] and latest['close'] > latest['ichi_b']
below_cloud = latest['close'] < latest['ichi_a'] and latest['close'] < latest['ichi_b']
price_above_kijun = latest['close'] > latest['ichi_base']
price_below_kijun = latest['close'] < latest['ichi_base']
if tenkan_cross_kijun_up and above_cloud and price_above_kijun: ichi_signal = 1 # Strong Bullish
elif tenkan_cross_kijun_down and below_cloud and price_below_kijun: ichi_signal = -1 # Strong Bearish
elif above_cloud and price_above_kijun and latest['ichi_conv'] > latest['ichi_base']: ichi_signal = 0.5 # Bullish Bias
elif below_cloud and price_below_kijun and latest['ichi_conv'] < latest['ichi_base']: ichi_signal = -0.5 # Bearish Bias
signals['ichimoku'] = ichi_signal
# Vortex (Trend) - Crossover
if latest['vortex_pos'] > latest['vortex_neg'] and prev['vortex_pos'] <= prev['vortex_neg']: signals['vortex'] = 1
elif latest['vortex_neg'] > latest['vortex_pos'] and prev['vortex_neg'] <= prev['vortex_pos']: signals['vortex'] = -1
else: signals['vortex'] = 0
# MFI (Volume/Momentum)
signals['mfi'] = 1 if latest['mfi'] < 20 else (-1 if latest['mfi'] > 80 else 0)
# CMF (Volume/Flow)
signals['cmf'] = 1 if latest['cmf'] > 0.05 else (-1 if latest['cmf'] < -0.05 else 0)
# OBV (Volume/Trend) - Simple trend vs moving average
if len(df) > 5:
try:
obv_sma5 = df['obv'].rolling(window=5).mean().iloc[-1]
# Check for NaN sma due to insufficient data at the start
if pd.notna(obv_sma5):
signals['obv_trend'] = 1 if latest['obv'] > obv_sma5 else (-1 if latest['obv'] < obv_sma5 else 0)
else: signals['obv_trend'] = 0
except IndexError: # Catch potential index error if rolling mean fails near start
signals['obv_trend'] = 0
else: signals['obv_trend'] = 0
# Volume Spike (Volume) - Relative to recent average
if latest['volume_ratio'] > 1.8: # Threshold for "high volume"
# Signal direction based on candle close vs open during spike
signals['vol_spike'] = 0.5 if latest['close'] > latest['open'] else (-0.5 if latest['close'] < latest['open'] else 0)
else: signals['vol_spike'] = 0
# VWAP (Volume/Price Level) - Price cross VWAP
signals['vwap_cross'] = 1 if latest['close'] > latest['vwap'] and prev['close'] <= prev['vwap'] else (-1 if latest['close'] < latest['vwap'] and prev['close'] >= prev['vwap'] else 0)
# Bollinger Bands (Volatility/Mean Reversion/Breakout) - Breakout example
if latest['close'] > latest['bb_hband'] and prev['close'] <= prev['bb_hband']: signals['bbands'] = 1
elif latest['close'] < latest['bb_lband'] and prev['close'] >= prev['bb_lband']: signals['bbands'] = -1
else: signals['bbands'] = 0
# --- End Signal Logic ---
# --- Final Cleanup & Composite (K-Map/Scoring Implementation) ---
# Ensure all indicators used in signals have a default entry
all_signal_keys = list(signals.keys()) # Get keys from the intermediate signals
for k in all_signal_keys:
values.setdefault(k, np.nan) # Ensure value exists even if calculation failed (use NaN)
# Convert intermediate 0.5/-0.5 signals to 1/-1 for final score, store in final_signals
for key, value in signals.items():
if value >= 0.5: final_signals[key] = 1
elif value <= -0.5: final_signals[key] = -1
else: final_signals[key] = 0
# Calculate composite score based on sum of FINAL (-1, 0, 1) signals
non_neutral_signals = [s for s in final_signals.values() if s != 0]
composite_score = sum(non_neutral_signals)
# Determine overall signal direction based on score magnitude
# Refined Threshold: Need ~30% net agreement among indicators, minimum 3 net signals
num_indicators_signaling = len(final_signals) # Total number of indicator signals evaluated
signal_strength_threshold = max(3, int(num_indicators_signaling * 0.30))
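# Example: with the 15 indicator signals generated above, the threshold is
# max(3, int(15 * 0.30)) = max(3, 4) = 4, i.e. a net of at least 4 agreeing
# votes is required before a direction is declared.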
signal_direction = 0
if num_indicators_signaling > 0: # Skip scoring entirely if no indicator signals were generated
# Check if composite score meets the threshold
if composite_score >= signal_strength_threshold:
signal_direction = 1
elif composite_score <= -signal_strength_threshold:
signal_direction = -1
# Ensure all keys from values (excluding non-indicator ones) are in final_signals with 0 if not set
value_keys_for_signals = [k for k in values.keys() if k not in ['price', 'volume', 'timestamp', 'atr']]
for k in value_keys_for_signals:
final_signals.setdefault(k, 0) # Default to neutral if no signal logic applied
return final_signals, values, signal_direction
except KeyError as e:
print(f"KeyError during signal generation (likely missing indicator column: {e}) in DF columns: {df.columns if df is not None else 'None'}. Check calculation step.")
return None, None, None
except IndexError as e:
print(f"IndexError during signal generation (likely insufficient data rows for prev/latest): {e}. DF length: {len(df) if df is not None else 0}")
return None, None, None
except Exception as e:
print(f"Error generating signals/values: {e}\n{traceback.format_exc()}")
return None, None, None
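# Illustrative output shapes: final_signals like {'rsi': 1, 'macd': 0, ...} (one
# entry per indicator, each -1/0/1), values like {'price': 0.1234, 'rsi': 28.5, ...},
# and signal_direction in {-1, 0, 1}.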
def calculate_trade_params(self, values, signal_direction):
"""Calculate Entry, SL, TP1, TP2 based on ATR"""
params = {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}}
try:
# Validate necessary inputs more rigorously
if signal_direction == 0 or \
not values or \
pd.isna(values.get('atr')) or values.get('atr', 0) <= 0 or \
pd.isna(values.get('price')) or values.get('price', 0) <= 0:
return params # No signal or invalid data
entry_price = values['price']
atr_val = values['atr']
# Added check here as well
if atr_val <= 0 or entry_price <= 0 : return params
params['entry'] = entry_price
if signal_direction == 1: # Long
params['sl'] = entry_price - ATR_SL_MULTIPLIER * atr_val
params['tp1'] = entry_price + ATR_TP1_MULTIPLIER * atr_val
params['tp2'] = entry_price + ATR_TP2_MULTIPLIER * atr_val
elif signal_direction == -1: # Short
params['sl'] = entry_price + ATR_SL_MULTIPLIER * atr_val
params['tp1'] = entry_price - ATR_TP1_MULTIPLIER * atr_val
params['tp2'] = entry_price - ATR_TP2_MULTIPLIER * atr_val
# Ensure SL/TP are valid numbers and positive
# Also check if SL crossed entry (e.g., due to very small ATR) - invalidate if so.
if pd.isna(params['sl']) or pd.isna(params['tp1']) or pd.isna(params['tp2']) or \
params['sl'] <= 0 or params['tp1'] <= 0 or params['tp2'] <= 0 or \
(signal_direction == 1 and params['sl'] >= params['entry']) or \
(signal_direction == -1 and params['sl'] <= params['entry']):
# print(f"Warning: Invalid SL/TP (NaN, <=0, or SL crossed entry) for entry {entry_price:.5f}, ATR {atr_val:.5f}. Signal: {signal_direction}")
# Reset params if invalid
return {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}}
# Calculate potential leveraged profit for $1 (simplified)
fee = SIMULATED_FEE_PERCENT / 100.0
for lev in LEVERAGES:
# Ratios based on entry price
profit_ratio_tp1 = abs(params['tp1'] - entry_price) / entry_price
loss_ratio_sl = abs(params['sl'] - entry_price) / entry_price
# Calculate gross P/L ratio with leverage
leveraged_profit_tp1 = profit_ratio_tp1 * lev
leveraged_loss_sl = loss_ratio_sl * lev
# Calculate fee impact (applied to leveraged position size)
fee_impact = 2 * fee * lev # Entry fee + Exit fee on leveraged amount
# Net profit/loss per $1 invested (considering $1 as margin)
dollar_profit_tp1 = leveraged_profit_tp1 - fee_impact
dollar_loss_sl = -leveraged_loss_sl - fee_impact # Loss is negative
params['lev_profit'][f'{lev}x'] = {'tp1_profit_$': round(dollar_profit_tp1, 3), 'sl_loss_$': round(dollar_loss_sl, 3)}
return params
except Exception as e:
print(f"Error calculating trade params for signal {signal_direction}, values {values}: {e}\n{traceback.format_exc()}")
return {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}}
def _run_simple_backtest(self, symbol, timeframe, df):
"""VERY Basic backtest simulation on the provided DataFrame (Uses longer history)."""
default_result = {'symbol': symbol, 'timeframe': timeframe, 'trades': 0, 'win_rate': 0, 'pnl_sum': 0, 'pnl_%_sum': 0}
# Need enough history + buffer for indicator lookbacks
min_backtest_len = BACKTEST_HISTORY_CANDLES + 50 # Need ~50 for lookback before backtest starts
# Check if DataFrame is valid and has enough rows
if df is None or not isinstance(df, pd.DataFrame) or len(df) < min_backtest_len:
# print(f"BT Skip {symbol} {timeframe}: Not enough data ({len(df) if df is not None else 0} < {min_backtest_len})")
return default_result
try:
# Ensure index is datetime and sorted for correct slicing
if not isinstance(df.index, pd.DatetimeIndex):
df.index = pd.to_datetime(df.index)
df = df.sort_index()
# Select the slice for backtesting simulation
# Use iloc for robustness to non-sequential indices after cleaning
backtest_start_iloc = len(df) - BACKTEST_HISTORY_CANDLES
if backtest_start_iloc < 0: backtest_start_iloc = 0 # Should not happen with check above, but safety
# Iterate through the candles *within the backtest period*
# Signal is generated using data *up to* candle i-1 close
# Trade entry occurs at candle i open
# Exit checks happen based on candle i high/low
trades = []
in_position = False
entry_price = 0
position_direction = 0 # 1 for long, -1 for short
stop_loss = 0
take_profit = 0 # Using TP1 for this simple backtest
entry_timestamp = None # For debugging/tracking
# Loop from the start of the backtest period + 1 (need previous candle for signal)
# Ensure we have enough lookback *before* the first signal candle (iloc-based)
# Ensure lookback of at least 50 candles for indicators
first_signal_candle_idx = max(50, backtest_start_iloc) # Start generating signals from here
for i in range(first_signal_candle_idx, len(df)):
current_row = df.iloc[i]
signal_candle_iloc = i - 1 # Signal based on close of previous candle (iloc)
# Slice original df up to and including the signal candle
# Use iloc slicing for performance and robustness
# Ensure the slice is valid
if signal_candle_iloc < 1: continue # Need at least 2 rows for signal calc
df_for_signal_calc = df.iloc[:signal_candle_iloc + 1]
# Check if df_for_signal_calc is valid before generating signal
if df_for_signal_calc is None or len(df_for_signal_calc) < 2:
continue # Skip if not enough data for signal calc
# Generate signal based on data ending at the previous candle's close
sim_signals, sim_values, sim_direction = self.generate_signals_and_values(df_for_signal_calc)
# --- Entry Logic ---
# Enter only if not already in position AND a clear signal occurs AND we have values
if not in_position and sim_direction != 0 and sim_values:
# Use ATR from the *signal candle* for SL/TP calc
atr_at_entry = sim_values.get('atr')
# Enter at current candle's open price
entry_price_candidate = current_row['open']
entry_timestamp = current_row.name # Timestamp of entry candle
# Validate entry conditions before taking position
if pd.notna(entry_price_candidate) and entry_price_candidate > 0 and \
pd.notna(atr_at_entry) and atr_at_entry > 0:
# Calculate potential SL/TP *before* deciding to enter fully
if sim_direction == 1: # Long
potential_sl = entry_price_candidate - ATR_SL_MULTIPLIER * atr_at_entry
potential_tp = entry_price_candidate + ATR_TP1_MULTIPLIER * atr_at_entry
else: # Short
potential_sl = entry_price_candidate + ATR_SL_MULTIPLIER * atr_at_entry
potential_tp = entry_price_candidate - ATR_TP1_MULTIPLIER * atr_at_entry
# Basic sanity check for SL/TP (positive values and SL not crossing entry)
if not (pd.isna(potential_sl) or pd.isna(potential_tp) or potential_sl <= 0 or potential_tp <= 0 or \
(sim_direction == 1 and potential_sl >= entry_price_candidate) or \
(sim_direction == -1 and potential_sl <= entry_price_candidate)):
# All checks passed, commit to position
in_position = True
position_direction = sim_direction
entry_price = entry_price_candidate
stop_loss = potential_sl
take_profit = potential_tp
# print(f"BT Enter {symbol} {timeframe} @ {entry_timestamp}: Dir={position_direction} Entry={entry_price:.4f} SL={stop_loss:.4f} TP={take_profit:.4f} ATR={atr_at_entry:.4f}")
# else: print(f"BT Skip Entry {symbol} {timeframe}: Invalid entry conditions (Price:{entry_price_candidate}, ATR:{atr_at_entry})")
# --- Exit Logic ---
# Check exits only if currently in a position
elif in_position:
exit_price = None
pnl = 0
exit_reason = "N/A"
current_high = current_row['high']
current_low = current_row['low']
exit_timestamp = current_row.name # Timestamp of exit check candle
# Validate high/low data
if pd.isna(current_high) or pd.isna(current_low):
print(f"BT Warning {symbol} {timeframe}: NaN High/Low at {exit_timestamp}, cannot check exit.")
continue # Skip exit check for this candle
# Check SL/TP hit based on current candle's high/low
# Important: Check SL first; if both SL and TP would be hit within the same candle, assume the worse-case SL fill
if position_direction == 1: # Long
if current_low <= stop_loss:
exit_price = stop_loss
exit_reason = "SL Hit"
elif current_high >= take_profit:
exit_price = take_profit
exit_reason = "TP1 Hit"
elif position_direction == -1: # Short
if current_high >= stop_loss:
exit_price = stop_loss
exit_reason = "SL Hit"
elif current_low <= take_profit:
exit_price = take_profit
exit_reason = "TP1 Hit"
# If an exit condition was met
if exit_price is not None:
# Calculate PnL based on entry and exit
# For shorts, PnL = Entry - Exit; for longs, PnL = Exit - Entry
if position_direction == 1:
pnl = exit_price - entry_price
else: # Short
pnl = entry_price - exit_price
# Calculate and subtract simulated fees
# Fee is % of trade value at entry and exit
entry_fee = (SIMULATED_FEE_PERCENT / 100.0) * entry_price
exit_fee = (SIMULATED_FEE_PERCENT / 100.0) * exit_price
pnl -= (entry_fee + exit_fee)
trades.append({'entry': entry_price, 'exit': exit_price, 'pnl': pnl, 'direction': position_direction})
# print(f"BT Exit {symbol} {timeframe} @ {exit_timestamp}: Reason={exit_reason} ExitPx={exit_price:.4f} PnL={pnl:.4f} (Entry @ {entry_price:.4f} on {entry_timestamp})")
in_position = False # Reset position state after closing trade
entry_timestamp = None # Reset entry timestamp
# --- Summarize Results ---
num_trades = len(trades)
if num_trades > 0:
wins = sum(1 for t in trades if t['pnl'] > 0)
win_rate = (wins / num_trades * 100)
total_pnl = sum(t['pnl'] for t in trades)
# Calculate PnL % sum based on entry price (approximation of capital growth)
pnl_percentage_sum = sum((t['pnl'] / t['entry']) * 100 for t in trades if t['entry'] > 0)
else:
win_rate = 0
total_pnl = 0
pnl_percentage_sum = 0
# print(f"BT Summary {symbol} {timeframe}: Trades={num_trades}, WinRate={win_rate:.2f}%, PnL Sum={total_pnl:.5f}, PnL % Sum={pnl_percentage_sum:.2f}%")
return {
'symbol': symbol, 'timeframe': timeframe, 'trades': num_trades,
'win_rate': round(win_rate, 2), 'pnl_sum': round(total_pnl, 5),
'pnl_%_sum': round(pnl_percentage_sum, 2)
}
except Exception as e:
print(f"Error during backtest simulation for {symbol} {timeframe}: {e}\n{traceback.format_exc()}")
# Printing DataFrame state details at the point of error can help debugging:
# print(f"DF info at error for {symbol} {timeframe}:")
# try: df.info()
# except: print("Could not get DF info")
return default_result
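# Example summary row (illustrative numbers):
# {'symbol': 'BTC/USDT', 'timeframe': '1h', 'trades': 42, 'win_rate': 52.38,
#  'pnl_sum': 1.2345, 'pnl_%_sum': 6.78}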
def analyze_symbol(self, symbol, progress):
"""Analyzes a single symbol across all valid timeframes."""
timeframe_details = {}
heatmap_composites = {}
symbol_backtest_results = []
log_msgs = []
symbol_active_signals = {} # Store active signals {tf: {details}} for this symbol
symbol_hover_details = {} # Store hover details {tf: {ind: val}} for this symbol
if not self.valid_timeframes:
return timeframe_details, heatmap_composites, symbol_backtest_results, [f"No valid timeframes for {symbol}."], {}, {}
# Add small delay before starting analysis for a symbol to help with rate limits
time.sleep(0.1)
for i, timeframe in enumerate(self.valid_timeframes):
progress(i / len(self.valid_timeframes), desc=f"Fetching {symbol} [{timeframe}]")
# Fetch data
ohlcv, err_msg = self.fetch_ohlcv_data(symbol, timeframe)
if err_msg:
log_msgs.append(f"Data fetch skip: {symbol} [{timeframe}] {err_msg}")
heatmap_composites[timeframe] = 0
# Run backtest with None to get a default failure entry
symbol_backtest_results.append(self._run_simple_backtest(symbol, timeframe, None))
symbol_hover_details[timeframe] = {} # Placeholder
time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.3)) # Longer sleep on error
continue
# Calculate indicators
progress((i + 0.3) / len(self.valid_timeframes), desc=f"Calculating Ind. {symbol} [{timeframe}]")
df = self.calculate_indicators(ohlcv, timeframe)
if df is None or df.empty:
log_msgs.append(f"Indicator calc skip: {symbol} [{timeframe}] (DataFrame invalid or empty)")
heatmap_composites[timeframe] = 0
# Run backtest with None
symbol_backtest_results.append(self._run_simple_backtest(symbol, timeframe, None))
symbol_hover_details[timeframe] = {} # Placeholder
time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.2))
continue
# Generate signals/values based on the *latest* data
progress((i + 0.6) / len(self.valid_timeframes), desc=f"Generating Sig. {symbol} [{timeframe}]")
signals, values, signal_direction = self.generate_signals_and_values(df)
if signals is not None and values is not None:
# Calculate trade params based on the latest signal
trade_params = self.calculate_trade_params(values, signal_direction)
# Store all details for this timeframe
timeframe_details[timeframe] = {
'signals': signals, # Individual indicator signals (-1, 0, 1)
'values': values, # Raw indicator values
'trade_params': trade_params, # Entry, SL, TP etc.
'direction': signal_direction # Overall signal direction (-1, 0, 1)
}
# Calculate composite for heatmap (normalized sum of final signals)
num_potential_signals = len(signals)
composite = sum(signals.values()) / num_potential_signals if num_potential_signals > 0 else 0
heatmap_composites[timeframe] = composite
# Store key values for hover text
symbol_hover_details[timeframe] = {
'Price': values.get('price', np.nan), # Add price to hover
'RSI': values.get('rsi', np.nan),
'MACD': values.get('macd', np.nan),
'StochK': values.get('stoch_k', np.nan),
'ADX': values.get('adx', np.nan),
'WillR': values.get('will_r', np.nan),
}
# Store active signal IF direction is non-zero AND trade params are valid
if signal_direction != 0 and trade_params.get('entry') is not None:
active_sig_details = {
'direction': signal_direction,
'entry': trade_params['entry'],
'sl': trade_params['sl'],
'tp1': trade_params['tp1'],
'tp2': trade_params['tp2']
}
symbol_active_signals[timeframe] = active_sig_details
# --- Log Signal to CSV ---
log_data = {
'LogTimestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # Log time
'SignalCandleTime': values.get('timestamp', 'N/A'), # Time of candle signal was based on
'Symbol': symbol,
'Timeframe': timeframe,
'Direction': 'LONG' if signal_direction == 1 else 'SHORT',
'Entry': f"{trade_params['entry']:.8f}", # Use more precision for logging
'SL': f"{trade_params['sl']:.8f}",
'TP1': f"{trade_params['tp1']:.8f}",
'TP2': f"{trade_params['tp2']:.8f}",
'Status': 'Triggered' # Initial status
# Optional: Add key indicator values
#'RSI': f"{values.get('rsi', np.nan):.2f}",
#'MACD_Diff': f"{values.get('macd_diff', np.nan):.6f}",
#'ADX': f"{values.get('adx', np.nan):.2f}"
}
log_signal_to_csv(log_data)
# --- End CSV Logging ---
else:
log_msgs.append(f"Signal gen skip: {symbol} [{timeframe}]")
heatmap_composites[timeframe] = 0
symbol_hover_details[timeframe] = {} # Placeholder
# Run backtest using the full historical dataframe calculated earlier
progress((i + 0.9) / len(self.valid_timeframes), desc=f"Backtesting {symbol} [{timeframe}]")
# IMPORTANT: Pass a COPY of df to backtest to avoid modification issues
bt_result = self._run_simple_backtest(symbol, timeframe, df.copy())
symbol_backtest_results.append(bt_result)
# Sleep based on exchange rate limit (minimum 0.2s)
time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.2))
return timeframe_details, heatmap_composites, symbol_backtest_results, log_msgs, symbol_active_signals, symbol_hover_details
def run_full_analysis(self, progress=gr.Progress()):
"""Runs the complete analysis for top coins across selected timeframes."""
self.analysis_results = {}
self.backtest_results = [] # Reset backtest results
self.active_signals_df = pd.DataFrame() # Reset active signals
self.heatmap_details = {} # Reset hover details
all_log_msgs = []
all_active_signals_list = [] # Temp list to build the DataFrame
# Start Fetching Coins
progress(0, desc="Fetching top coins...")
symbols, msg = self.fetch_top_coins()
all_log_msgs.append(msg)
if not symbols:
self.heatmap_df = pd.DataFrame()
all_log_msgs.append("Error: No symbols found to analyze. Stopping.")
# Return empty structures
return {}, pd.DataFrame(), [], pd.DataFrame(), {}, all_log_msgs
# Start Analyzing Symbols
heatmap_data_list = [] # For building heatmap DataFrame [{Coin: X, tf1: score1, ...}, ...]
total_symbols = len(symbols)
progress(0.05, desc=f"Starting analysis for {total_symbols} coins...")
for idx, symbol in enumerate(symbols):
# Calculate progress for this symbol
symbol_progress_start = 0.05 + (idx / total_symbols) * 0.90 # Leave space at end
symbol_progress_range = (1 / total_symbols) * 0.90
symbol_progress_desc = f"Analyzing {symbol} ({idx+1}/{total_symbols})"
progress(symbol_progress_start, desc=symbol_progress_desc)
# Create a lambda for inner progress updates relative to this symbol's range
symbol_progress_tracker = lambda p, desc=symbol_progress_desc: progress(
symbol_progress_start + (p * symbol_progress_range), desc=desc
)
try:
# Analyze symbol (includes signal gen, trade param calc, backtest sim)
# Returns: details per tf, heatmap scores per tf, backtest results list, logs, active signals dict, hover details dict
tf_details, tf_heatmap_scores, symbol_bt_results, log_msgs, symbol_active_tf_signals, symbol_tf_hover_details = \
self.analyze_symbol(symbol, symbol_progress_tracker)
all_log_msgs.extend(log_msgs)
self.backtest_results.extend(symbol_bt_results) # Aggregate backtest results
# Only process if we got some valid details back for the symbol
if tf_details: # Check if the dict is not empty
self.analysis_results[symbol] = tf_details # Store full details
coin_name = symbol.split('/')[0]
heatmap_row = {'Coin': coin_name}
symbol_hover_data = {} # To store hover details for this coin
# Populate heatmap row scores and collect hover details for the coin
for tf in self.valid_timeframes:
heatmap_row[tf] = tf_heatmap_scores.get(tf, 0) # Get score, default 0
symbol_hover_data[tf] = symbol_tf_hover_details.get(tf, {}) # Get hover dict, default empty
heatmap_data_list.append(heatmap_row)
self.heatmap_details[coin_name] = symbol_hover_data # Store hover details keyed by coin name
# Add any active signals found for this symbol to the main list
for tf, active_sig in symbol_active_tf_signals.items():
# Use more precision for the active signals table as well
all_active_signals_list.append({
'Symbol': symbol,
'Timeframe': tf,
'Direction': 'LONG' if active_sig['direction'] == 1 else 'SHORT',
'Entry': f"{active_sig['entry']:.8f}", # Format for display
'SL': f"{active_sig['sl']:.8f}",
'TP1': f"{active_sig['tp1']:.8f}",
'TP2': f"{active_sig['tp2']:.8f}",
})
except ccxt.RateLimitExceeded as e:
wait_time = 60 # Longer wait time
all_log_msgs.append(f"Rate limit exceeded analyzing {symbol}. Sleeping for {wait_time}s... {e}")
print(all_log_msgs[-1])
progress(symbol_progress_start + symbol_progress_range * 0.9, desc=f"Rate Limit Hit! Waiting {wait_time}s...") # Update progress during wait
time.sleep(wait_time)
# Optionally, you might want to retry the symbol analysis here (more complex)
except Exception as e:
error_msg = f"Critical error processing {symbol}: {e}\n{traceback.format_exc()}"
all_log_msgs.append(error_msg)
print(error_msg)
# Add placeholder row to heatmap and default backtest results if analysis failed critically
coin_name = symbol.split('/')[0]
row = {'Coin': coin_name}
for tf in self.valid_timeframes: row[tf] = 0
heatmap_data_list.append(row)
self.heatmap_details[coin_name] = {tf: {} for tf in self.valid_timeframes} # Empty hover details
for tf in self.valid_timeframes:
# Add default failed backtest result for this timeframe
self.backtest_results.append({'symbol': symbol, 'timeframe': tf, 'trades': 0, 'win_rate': 0, 'pnl_sum': 0, 'pnl_%_sum': 0})
progress(0.95, desc="Finalizing results...") # Progress before final processing
# --- Final Processing ---
if not heatmap_data_list:
self.heatmap_df = pd.DataFrame()
all_log_msgs.append("Warning: No heatmap data generated (no symbols processed successfully?).")
else:
self.heatmap_df = pd.DataFrame(heatmap_data_list).set_index('Coin')
# Ensure columns are ordered correctly according to valid_timeframes
if self.valid_timeframes:
# Filter out columns not in valid_timeframes (if any slipped through)
cols_to_keep = [tf for tf in self.valid_timeframes if tf in self.heatmap_df.columns]
self.heatmap_df = self.heatmap_df[cols_to_keep]
# Reindex to ensure all valid_timeframes are present, filling missing with 0
# Make sure valid_timeframes is used for columns
self.heatmap_df = self.heatmap_df.reindex(columns=self.valid_timeframes, fill_value=0)
self.heatmap_df.index.name = 'Coin'
# Create Active Signals DataFrame from the collected list
if all_active_signals_list:
self.active_signals_df = pd.DataFrame(all_active_signals_list)
# Sort for better presentation
self.active_signals_df['tf_order'] = self.active_signals_df['Timeframe'].map(TIMEFRAME_ORDER_MAP)
self.active_signals_df = self.active_signals_df.sort_values(by=['Symbol', 'tf_order']).drop('tf_order', axis=1)
# Reorder columns for consistency
self.active_signals_df = self.active_signals_df[['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']]
else:
# Create empty DF with correct columns if no signals found
self.active_signals_df = pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2'])
# Save backtest results to CSV
if self.backtest_results:
# Filter out any potential None results before creating DataFrame
valid_bt_results = [res for res in self.backtest_results if isinstance(res, dict)]
if valid_bt_results:
bt_df = pd.DataFrame(valid_bt_results)
try:
bt_df.to_csv(BACKTEST_RESULTS_FILE, index=False)
all_log_msgs.append(f"Backtest summary saved to {BACKTEST_RESULTS_FILE}")
except Exception as e:
all_log_msgs.append(f"Error saving backtest results: {e}")
print(f"Error saving backtest results: {e}\n{traceback.format_exc()}")
else:
all_log_msgs.append("No valid backtest results generated to save.")
else:
all_log_msgs.append("No backtest results generated.")
progress(1, desc="Analysis complete.")
all_log_msgs.append("Analysis Complete.")
if os.path.exists(SIGNAL_LOG_FILE):
all_log_msgs.append(f"Signals logged to {SIGNAL_LOG_FILE}")
# Return all generated data
return self.analysis_results, self.heatmap_df, self.backtest_results, self.active_signals_df, self.heatmap_details, all_log_msgs
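# Usage sketch (illustrative; requires network access, and is normally invoked from
# a Gradio callback so that the `progress` updates render):
# indicator = CryptoTrendIndicator('mexc', top_coins=5, selected_timeframes=['1h', '4h'])
# results, heatmap_df, bt_results, active_df, hover_details, logs = indicator.run_full_analysis()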
# --- Gradio Helper Functions ---
def create_plotly_heatmap(df, heatmap_details):
"""Creates heatmap with enhanced hover text including key indicator values."""
if df is None or df.empty:
print("Heatmap DF is empty, returning empty figure.")
return go.Figure(layout=go.Layout(title="No data available for heatmap", height=300))
colorscale = pcolors.diverging.RdYlGn
# Ensure data is numeric for heatmap values, coerce errors
try:
# Ensure index and columns are strings for processing if needed
df.index = df.index.astype(str)
df.columns = df.columns.astype(str)
heatmap_values = df.apply(pd.to_numeric, errors='coerce').fillna(0).values
rows = list(df.index)
cols = list(df.columns)
except Exception as e:
print(f"Error preparing heatmap data: {e}")
return go.Figure(layout=go.Layout(title=f"Error preparing heatmap: {e}", height=300))
# Create hover text matrix using heatmap_details
hover_texts = []
for r_idx, coin in enumerate(rows):
row_texts = []
coin_details_by_tf = heatmap_details.get(coin, {}) # Get details for the coin (keyed by BASE coin name now)
for c_idx, tf in enumerate(cols):
try:
score = heatmap_values[r_idx, c_idx]
# Lookup details for this specific coin/tf from the pre-computed dict
details = coin_details_by_tf.get(tf, {}) # Get details for this TF
# Format values from details, handle N/A or missing data gracefully
price_val = details.get('Price', 'N/A')
rsi_val = details.get('RSI', 'N/A')
macd_val = details.get('MACD', 'N/A')
stochk_val = details.get('StochK', 'N/A')
adx_val = details.get('ADX', 'N/A')
willr_val = details.get('WillR', 'N/A')
# Build hover text string
text = f"<b>Coin:</b> {coin}<br>"
text += f"<b>Timeframe:</b> {tf}<br>"
# Use more precision for price display in hover
text += f"<b>Price:</b> {float(price_val):.8f}<br>" if isinstance(price_val, (int, float)) and not pd.isna(price_val) else f"<b>Price:</b> {price_val}<br>"
text += f"<b>Score:</b> {score:.3f}<br>" # Show score from heatmap itself
text += "----------<br>"
# Format indicator values nicely
try: text += f"RSI: {float(rsi_val):.1f}<br>" if isinstance(rsi_val, (int, float)) and not pd.isna(rsi_val) else f"RSI: {rsi_val}<br>"
except: text += f"RSI: {rsi_val}<br>" # Fallback if conversion fails
try: text += f"MACD: {float(macd_val):.6f}<br>" if isinstance(macd_val, (int, float)) and not pd.isna(macd_val) else f"MACD: {macd_val}<br>"
except: text += f"MACD: {macd_val}<br>"
try: text += f"Stoch K: {float(stochk_val):.1f}<br>" if isinstance(stochk_val, (int, float)) and not pd.isna(stochk_val) else f"Stoch K: {stochk_val}<br>"
except: text += f"Stoch K: {stochk_val}<br>"
try: text += f"ADX: {float(adx_val):.1f}<br>" if isinstance(adx_val, (int, float)) and not pd.isna(adx_val) else f"ADX: {adx_val}<br>"
except: text += f"ADX: {adx_val}<br>"
try: text += f"Will %R: {float(willr_val):.1f}<br>" if isinstance(willr_val, (int, float)) and not pd.isna(willr_val) else f"Will %R: {willr_val}<br>"
except: text += f"Will %R: {willr_val}<br>"
text += "<extra></extra>" # Hide default Plotly hover labels
row_texts.append(text)
except Exception as hover_e:
print(f"Error generating hover text for {coin}/{tf}: {hover_e}")
row_texts.append(f"Error displaying hover for {coin}/{tf}") # Add error placeholder
hover_texts.append(row_texts)
try:
fig = go.Figure(data=go.Heatmap(
z=heatmap_values,
x=cols,
y=rows,
colorscale=colorscale,
zmid=0, zmin=-0.6, zmax=0.6, # Adjusted range based on normalized score
hoverongaps=False,
hoverinfo='text', # Use the custom text matrix for hover
text=hover_texts, # Assign the text matrix
texttemplate=None # Disable texttemplate when using hoverinfo='text'
))
fig.update_layout(
title='Cryptocurrency Signal Strength Heatmap (Hover for Key Values, Click Cell for Full Details)',
xaxis_title='Timeframe',
yaxis_title='Coin',
yaxis={'tickmode': 'linear', 'tickfont': {'size': 9}, 'automargin': True}, # Automargin for y-axis labels
xaxis={'tickmode': 'linear'},
height=max(450, len(rows) * 18 + 100), # Dynamic height
margin=dict(l=70, r=50, t=60, b=50)
)
# print("Plotly heatmap figure created successfully.") # Debug print
return fig
except Exception as e:
print(f"Error creating Plotly heatmap figure: {e}\n{traceback.format_exc()}")
# Return an empty figure with error message
return go.Figure(layout=go.Layout(title=f"Error creating heatmap: {e}", height=400))
def format_heatmap_click_details(evt: gr.SelectData, current_state):
"""Formats detailed analysis results for the specific cell clicked on the heatmap."""
# Check if event data is valid (using SelectData attributes)
if evt is None or evt.index is None or not isinstance(evt.index, (list, tuple)) or len(evt.index) != 2:
# print("Debug: format_heatmap_click_details called with invalid event data:", evt)
return "Click on a heatmap cell after running analysis to see details here. (Ensure you clicked a colored cell)"
# Check if state and necessary data are present
if not isinstance(current_state, dict) or 'analysis_results' not in current_state or 'heatmap_df' not in current_state:
# print("Debug: State invalid or missing analysis_results or heatmap_df")
return "Analysis data not found in state. Please run the analysis first."
try:
row_index, col_index = evt.index
heatmap_df = current_state.get('heatmap_df')
analysis_data = current_state.get('analysis_results') # This holds the full nested dict {symbol: {tf: details}}
if heatmap_df is None or heatmap_df.empty or analysis_data is None:
# print("Debug: Heatmap or analysis data is None or empty.")
return "Heatmap or analysis data is not available. Please run analysis."
# Validate indices against the DataFrame dimensions
if not (0 <= row_index < len(heatmap_df.index) and 0 <= col_index < len(heatmap_df.columns)):
# print(f"Debug: Indices out of bounds. Row: {row_index} (max: {len(heatmap_df.index)-1}), Col: {col_index} (max: {len(heatmap_df.columns)-1})")
return "Error: Clicked cell index is out of bounds."
coin_name = heatmap_df.index[row_index] # This is the base coin (e.g., 'BTC')
timeframe = heatmap_df.columns[col_index]
# print(f"Debug: Clicked on Coin: {coin_name}, Timeframe: {timeframe}")
# Find the full symbol (e.g., BTC/USDT) in the analysis_data keys
full_symbol = None
for symbol_key in analysis_data.keys():
# Match based on the start of the symbol key (more robust)
if symbol_key.startswith(str(coin_name) + '/'): # Ensure coin_name is string
full_symbol = symbol_key
break
if not full_symbol or full_symbol not in analysis_data:
# print(f"Debug: Full symbol not found or no data for {full_symbol} (Base: {coin_name})")
return f"Details not found for {coin_name} (symbol mismatch or no analysis data?)."
# Get the specific details for the symbol and timeframe
details = analysis_data.get(full_symbol, {}).get(timeframe)
if details is None:
# print(f"Debug: No details found for {full_symbol} on timeframe {timeframe}")
return f"No analysis data available for {coin_name} on {timeframe}."
# --- Format Markdown String ---
values = details.get('values', {}) # Raw indicator values
signals = details.get('signals', {}) # Final signals (-1, 0, 1) per indicator
trade_params = details.get('trade_params', {}) # Entry, SL, TP etc.
direction = details.get('direction', 0) # Overall signal direction
if not values or not signals:
# print(f"Debug: Missing 'values' or 'signals' dict in details for {full_symbol} / {timeframe}")
return f"Incomplete data for {coin_name} [{timeframe}]. Cannot display details."
markdown_str = f"### Details for {coin_name} ({full_symbol}) [{timeframe}]\n\n"
markdown_str += f"- **Timestamp:** {values.get('timestamp', 'N/A')}\n"
price = values.get('price', np.nan)
markdown_str += f"- **Price:** {price:.8f}\n" if isinstance(price, (int, float)) and not pd.isna(price) else f"- **Price:** {price}\n"
volume = values.get('volume', np.nan)
markdown_str += f"- **Volume:** {volume:,.0f}\n" if isinstance(volume, (int, float)) and not pd.isna(volume) else f"- **Volume:** {volume}\n"
atr = values.get('atr', np.nan)
markdown_str += f"- **ATR:** {atr:.8f}\n\n" if isinstance(atr, (int, float)) and not pd.isna(atr) else f"- **ATR:** {atr}\n\n"
# Display the FINAL calculated direction based on the composite score threshold
markdown_str += f"**Overall Signal Direction:** {'BULLISH (+1)' if direction > 0 else ('BEARISH (-1)' if direction < 0 else 'NEUTRAL (0)')}\n\n"
markdown_str += "**Indicator Values & Individual Signals:**\n"
markdown_str += "| Indicator | Value | Signal |\n" # Adjusted padding
markdown_str += "|----------------|-----------------|--------|\n"
# Get keys from signals dict, which should represent all indicators evaluated
indicator_keys = sorted(signals.keys())
for name in indicator_keys:
val = values.get(name, 'N/A') # Get raw value
sig_val = signals.get(name, 0) # Get the final -1, 0, 1 signal
signal_char = '🟩 (+1)' if sig_val > 0 else ('🟥 (-1)' if sig_val < 0 else '⬜ (0)')
# Formatting value carefully
if isinstance(val, (int, float)) and not pd.isna(val):
if abs(val) > 100000: val_str = f"{val:,.0f}" # Large integer
elif abs(val) > 100: val_str = f"{val:,.2f}" # Moderate number
elif 0 < abs(val) < 1e-6: val_str = f"{val:.4e}" # Very small number -> scientific notation
elif abs(val) < 1: val_str = f"{val:.8f}" # Small decimal (more precision)
else: val_str = f"{val:.6f}" # Default decimal (more precision)
elif pd.isna(val): val_str = "NaN"
else: val_str = str(val) # Non-numeric
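# Illustrative tier outputs: 2345678.9 -> '2,345,679'; 345.678 -> '345.68';
# 4.2e-08 -> '4.2000e-08'; 0.1234 -> '0.12340000'; 3.5 -> '3.500000'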
# Pad indicator name and value for alignment in Markdown table
markdown_str += f"| {name:<14} | {val_str:<15} | {signal_char:<7} |\n"
# --- Add Trade Parameters Section ---
if trade_params and trade_params.get('entry') is not None:
markdown_str += f"\n**Potential Trade Setup (Based on this signal & ATR):**\n"
markdown_str += f"- **Direction:** {'LONG' if direction > 0 else 'SHORT'}\n"
# More precision for trade params display
markdown_str += f"- **Entry:** {trade_params['entry']:.8f}\n"
markdown_str += f"- **Stop Loss:** {trade_params['sl']:.8f}\n"
markdown_str += f"- **Take Profit 1:** {trade_params['tp1']:.8f}\n"
markdown_str += f"- **Take Profit 2:** {trade_params['tp2']:.8f}\n\n"
markdown_str += f"**Est. P/L per $1 Margin (TP1/SL Hit, incl. ~{SIMULATED_FEE_PERCENT*2:.2f}% fees):**\n"
for lev, pnl_data in trade_params.get('lev_profit', {}).items():
tp1_pnl = pnl_data.get('tp1_profit_$','N/A')
sl_loss = pnl_data.get('sl_loss_$','N/A')
# Format P/L values
tp1_pnl_str = f"{tp1_pnl:.3f}" if isinstance(tp1_pnl, (int, float)) else str(tp1_pnl)
sl_loss_str = f"{sl_loss:.3f}" if isinstance(sl_loss, (int, float)) else str(sl_loss)
markdown_str += f" - **{lev}:** Profit ${tp1_pnl_str} / Loss ${sl_loss_str}\n"
else:
markdown_str += f"\n**Trade Setup:** No active trade signal ({'Neutral' if direction == 0 else 'Params Invalid'}) generated for this timeframe at this time.\n"
# print(f"Debug: Successfully formatted details for {full_symbol} / {timeframe}")
return markdown_str
except IndexError:
# print("Debug: IndexError during heatmap click processing.")
return "Error processing click: Index out of range. Please ensure the heatmap is up to date."
except KeyError as e:
# print(f"Debug: KeyError processing click: Missing key '{e}'.")
return f"Error processing click: Missing expected data key '{e}'. Analysis data might be incomplete."
except Exception as e:
print(f"Unexpected error formatting heatmap click details: {e}\n{traceback.format_exc()}")
return f"An unexpected error occurred displaying details for the clicked cell: {e}"
def format_coin_details(symbol, analysis_data, valid_timeframes):
"""Formats full details for a selected coin across all analyzed timeframes."""
if not analysis_data or symbol not in analysis_data:
return f"No analysis data available for {symbol}. Please run analysis first."
coin_data_per_tf = analysis_data[symbol]
markdown_str = f"## Full Details for {symbol}\n\n"
# Iterate through the timeframes the analysis was run for
# Use the provided valid_timeframes list for consistent ordering
for timeframe in valid_timeframes:
markdown_str += f"---\n### Timeframe: {timeframe}\n"
details = coin_data_per_tf.get(timeframe)
# Check if details exist for this timeframe
if details is None:
markdown_str += f"*No analysis data generated for this timeframe.*\n"
continue # Skip to the next timeframe
# Extract components, check if they exist
values = details.get('values')
signals = details.get('signals')
trade_params = details.get('trade_params')
direction = details.get('direction') # Use the stored direction
if values is None or signals is None or trade_params is None or direction is None:
markdown_str += f"*Incomplete analysis data for this timeframe.*\n"
continue
# --- Format Section for this Timeframe (similar to heatmap click) ---
markdown_str += f"- Timestamp: {values.get('timestamp', 'N/A')}\n"
price = values.get('price', np.nan)
markdown_str += f"- Price: {price:.8f}\n" if isinstance(price, (int, float)) and not pd.isna(price) else f"- Price: {price}\n"
volume = values.get('volume', np.nan)
markdown_str += f"- Volume: {volume:,.0f}\n" if isinstance(volume, (int, float)) and not pd.isna(volume) else f"- Volume: {volume}\n"
atr = values.get('atr', np.nan)
markdown_str += f"- ATR: {atr:.8f}\n\n" if isinstance(atr, (int, float)) and not pd.isna(atr) else f"- ATR: {atr}\n\n"
markdown_str += f"**Overall Signal Direction:** {'BULLISH (+1)' if direction > 0 else ('BEARISH (-1)' if direction < 0 else 'NEUTRAL (0)')}\n\n"
markdown_str += "**Indicator Values & Individual Signals:**\n"
markdown_str += "| Indicator | Value | Signal |\n"
markdown_str += "|----------------|-----------------|--------|\n"
indicator_keys = sorted(signals.keys())
for name in indicator_keys:
val = values.get(name, 'N/A')
sig_val = signals.get(name, 0)
signal_char = '🟩 (+1)' if sig_val > 0 else ('🟥 (-1)' if sig_val < 0 else '⬜ (0)')
# Value formatting (reuse from heatmap click with more precision)
if isinstance(val, (int, float)) and not pd.isna(val):
if abs(val) > 100000: val_str = f"{val:,.0f}"
elif abs(val) > 100: val_str = f"{val:,.2f}"
elif 0 < abs(val) < 1e-6: val_str = f"{val:.4e}" # Very small number -> scientific notation
elif abs(val) < 1: val_str = f"{val:.8f}" # Small decimal (more precision)
else: val_str = f"{val:.6f}" # Default decimal (more precision)
elif pd.isna(val): val_str = "NaN"
else: val_str = str(val)
markdown_str += f"| {name:<14} | {val_str:<15} | {signal_char:<7} |\n"
# Trade Params Section
if trade_params and trade_params.get('entry') is not None:
markdown_str += f"\n**Potential Trade Setup:**\n"
markdown_str += f"- Direction: {'LONG' if direction > 0 else 'SHORT'}\n"
# More precision
markdown_str += f"- Entry: {trade_params['entry']:.8f}\n"
markdown_str += f"- SL: {trade_params['sl']:.8f}\n"
markdown_str += f"- TP1: {trade_params['tp1']:.8f}\n"
markdown_str += f"- TP2: {trade_params['tp2']:.8f}\n\n"
markdown_str += f"**Est. P/L per $1 Margin (TP1/SL Hit, incl. ~{SIMULATED_FEE_PERCENT*2:.2f}% fees):**\n"
for lev, pnl_data in trade_params.get('lev_profit', {}).items():
tp1_pnl = pnl_data.get('tp1_profit_$','N/A')
sl_loss = pnl_data.get('sl_loss_$','N/A')
tp1_pnl_str = f"{tp1_pnl:.3f}" if isinstance(tp1_pnl, (int, float)) else str(tp1_pnl)
sl_loss_str = f"{sl_loss:.3f}" if isinstance(sl_loss, (int, float)) else str(sl_loss)
markdown_str += f" - **{lev}:** Profit ${tp1_pnl_str} / Loss ${sl_loss_str}\n"
else:
markdown_str += f"\n**Trade Setup:** No active trade signal ({'Neutral' if direction == 0 else 'Params Invalid'}) generated for this timeframe.\n"
markdown_str += "\n---\n" # End separator for the coin
return markdown_str
def find_zones(heatmap_df, min_confirmation_threshold):
"""Finds potential long/short zones based on average heatmap score."""
if heatmap_df is None or heatmap_df.empty:
print("Zone finding skipped: Heatmap DF is empty.")
return [], []
try:
# Calculate average score across valid timeframes for each coin
# Ensure we only average over numeric columns
numeric_df = heatmap_df.apply(pd.to_numeric, errors='coerce').dropna(axis=1, how='all') # Drop cols that are all NaN
if numeric_df.empty:
print("Zone finding skipped: No numeric timeframe columns found after coercion.")
return [],[]
# Calculate mean, skipping NaNs if any occurred during coercion
avg_scores = numeric_df.mean(axis=1, skipna=True)
# Define thresholds based on the average score range (-1 to 1 typically)
# Threshold requires a minimum average positive/negative score.
# Example: if min_confirmation_threshold is 0.75, zones need avg > 0.3 or avg < -0.3
zone_threshold_abs = max(0.1, min(1.0, min_confirmation_threshold * 0.4)) # Adjusted multiplier
long_zone_threshold = zone_threshold_abs
short_zone_threshold = -zone_threshold_abs
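# Worked examples: threshold 0.75 -> zones need |avg| >= 0.30; threshold 0.20
# -> 0.08, lifted to the 0.10 floor by max(); min() caps the result at 1.0.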
# print(f"Debug Zones: Long Threshold={long_zone_threshold:.3f}, Short Threshold={short_zone_threshold:.3f}")
# Filter based on thresholds
long_zone_coins = avg_scores[avg_scores >= long_zone_threshold].sort_values(ascending=False)
short_zone_coins = avg_scores[avg_scores <= short_zone_threshold].sort_values(ascending=True)
# Prepare lists of tuples (Coin, Score)
long_zone_list = list(zip(long_zone_coins.index.astype(str), long_zone_coins.round(3)))
short_zone_list = list(zip(short_zone_coins.index.astype(str), short_zone_coins.round(3)))
print(f"Zones Found: {len(long_zone_list)} long, {len(short_zone_list)} short candidates (Threshold +/- {zone_threshold_abs:.3f}).")
return long_zone_list, short_zone_list
except Exception as e:
print(f"Error finding zones: {e}\n{traceback.format_exc()}")
return [],[] # Return empty lists on error
def format_backtest_summary(backtest_results):
"""Formats the list of backtest result dictionaries into a DataFrame for display."""
if not backtest_results:
print("No backtest results to format.")
# Return empty DF with correct columns if no results
return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry'])
try:
# Filter out potential non-dict entries just in case
valid_results = [r for r in backtest_results if isinstance(r, dict)]
if not valid_results:
print("No valid dictionary entries found in backtest results.")
return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry'])
df = pd.DataFrame(valid_results)
# Ensure required columns exist, fill with defaults if missing
required_cols = ['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_sum', 'pnl_%_sum']
for col in required_cols:
if col not in df.columns:
print(f"Warning: Backtest result missing column '{col}', filling default.")
if col in ['trades', 'win_rate', 'pnl_sum', 'pnl_%_sum']:
df[col] = 0 # Default numeric to 0
else:
df[col] = 'N/A' # Default string to N/A
# Sort by symbol then timeframe order
df['tf_order'] = df['timeframe'].map(TIMEFRAME_ORDER_MAP).fillna(99) # Handle potential unknown timeframes
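# e.g. '1m' -> 1, '4h' -> 8 per TIMEFRAME_ORDER_MAP; unmapped timeframes get 99 so they sort last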
df = df.sort_values(by=['symbol', 'tf_order']).drop('tf_order', axis=1)
# Rename columns for better display clarity
df = df.rename(columns={'pnl_sum': 'pnl_abs_sum', 'pnl_%_sum': 'pnl_%_sum_on_entry'})
# Select and reorder columns for final display
display_cols = ['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']
# Ensure all display columns exist before selecting
final_cols = [col for col in display_cols if col in df.columns]
df = df[final_cols]
return df
except Exception as e:
print(f"Error formatting backtest summary: {e}\n{traceback.format_exc()}")
# Return empty DF on error
return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry'])
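# Illustrative call (hypothetical values):
#   format_backtest_summary([{'symbol': 'BTC/USDT', 'timeframe': '1h',
#       'trades': 42, 'win_rate': 0.57, 'pnl_sum': 12.3, 'pnl_%_sum': 4.5}])
# returns a one-row DataFrame with 'pnl_sum'/'pnl_%_sum' renamed to
# 'pnl_abs_sum'/'pnl_%_sum_on_entry'.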
# --- Gradio App Definition ---
def create_gradio_app():
with gr.Blocks(theme=gr.themes.Soft(primary_hue=gr.themes.colors.blue), title="Crypto Signal & Backtest V3") as app:
gr.Markdown("# Crypto Multi-Indicator, Multi-Timeframe Signal & Backtest V3")
gr.Markdown(f"*Warning: Analysis uses up to **{LIMIT_PER_TIMEFRAME}** candles per timeframe and backtests on the last **{BACKTEST_HISTORY_CANDLES}**. This can be **very slow** and **API-intensive**. Rate limits may occur. Use fewer coins/timeframes for faster results. Signals are logged to **`{SIGNAL_LOG_FILE}`**.*")
# Global state to store results between interactions
shared_state = gr.State({
'analysis_results': {}, # {symbol: {tf: {signals, values, trade_params, direction}}}
'heatmap_df': pd.DataFrame(), # DataFrame for heatmap display values
'heatmap_details': {}, # {Coin: {tf: {Ind: Val,...}}} for hover
'backtest_results': [], # List of backtest result dicts
'active_signals_df': pd.DataFrame(), # DataFrame of currently active signals
'valid_timeframes': [], # List of timeframes used in the last run
'analyzer': None # Instance of the analyzer class
})
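# Note: gr.State is session-scoped, so each browser session gets its own
# copy of these results rather than sharing one global dict across users.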
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## Configuration")
# Get available exchanges dynamically, handle potential errors
try:
available_exchanges = ccxt.exchanges
except Exception as e:
print(f"Warning: Could not fetch ccxt exchanges list: {e}")
available_exchanges = [DEFAULT_EXCHANGE_ID] # Fallback
exchange_input = gr.Dropdown(label="Exchange", choices=available_exchanges, value=DEFAULT_EXCHANGE_ID, interactive=True)
top_n_input = gr.Slider(label="Number of Top Coins by Volume", minimum=5, maximum=100, step=5, value=DEFAULT_TOP_N_COINS, interactive=True) # Reduced Max
all_timeframes = list(TIMEFRAME_ORDER_MAP.keys())
timeframe_input = gr.CheckboxGroup(label="Select Timeframes (Fewer = Faster)", choices=all_timeframes, value=DEFAULT_TIMEFRAMES, interactive=True)
run_button = gr.Button("Run Analysis & Backtest", variant="primary")
status_log = gr.Textbox(label="Status Log", lines=15, interactive=False, placeholder="Analysis logs will appear here...", max_lines=30) # Increased lines
with gr.Column(scale=3):
gr.Markdown("## Results")
with gr.Tabs():
with gr.TabItem("Heatmap"):
gr.Markdown("Signal strength heatmap based on composite indicator score. Hover over cells for key values, click a cell for full details below.")
# Use gr.Plot which supports Plotly and the 'select' event for clicks
# Explicitly set label for Plot component
heatmap_plot = gr.Plot(label="Signal Heatmap", show_label=False) # show_label=False to hide the default label if desired
heatmap_detail_output = gr.Markdown(label="Clicked Cell Full Details", value="*Click on a heatmap cell after analysis to see full indicator details and trade setup.*") # Placeholder text
with gr.TabItem("Active Trade Setups"):
gr.Markdown("### Potential Trade Setups (Current Snapshot)")
gr.Markdown(f"*Shows pairs and timeframes with a non-neutral signal direction and valid ATR-based parameters from the **latest** analyzed candle. Signals logged to `{SIGNAL_LOG_FILE}`. This is NOT financial advice. DYOR!*")
active_signals_table = gr.DataFrame(
label="Active Signals",
headers=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2'],
datatype=['str'] * 7, # Treat all as strings for display consistency with precision
interactive=False,
row_count=(10, "dynamic"), # Show ~10 rows, allow scroll
col_count=(7, "fixed"),
wrap=True
)
with gr.TabItem("Zones"):
gr.Markdown("### Potential Long/Short Zones")
zone_threshold_display = max(0.1, min(1.0, DEFAULT_MIN_CONFIRMATION * 0.4)) # Calculate display threshold
gr.Markdown(f"*Coins with the highest/lowest average signal score across the selected timeframes (based on an average score threshold of ~**+/- {zone_threshold_display:.2f}**). Indicates potential broader trend alignment.*")
with gr.Row():
long_zone_output = gr.DataFrame(label="Potential Long Zone (Highest Avg Scores)", headers=["Coin", "Avg Score"], col_count=(2, "fixed"), row_count=10)
short_zone_output = gr.DataFrame(label="Potential Short Zone (Lowest Avg Scores)", headers=["Coin", "Avg Score"], col_count=(2, "fixed"), row_count=10)
with gr.TabItem("Full Coin Details"):
gr.Markdown("Select a coin analyzed in the heatmap to view its detailed indicator values, signals, and potential trade setup across all selected timeframes.")
coin_selector = gr.Dropdown(label="Select Coin to View All Timeframe Details", choices=[], interactive=False) # Initially disabled
coin_detail_output = gr.Markdown(label="Detailed Indicator Values & Trade Setup per Timeframe", value="*Select a coin from the dropdown after analysis runs.*")
with gr.TabItem("Backtest Summary"):
gr.Markdown(f"### Simplified Backtest Results (ATR TP1/SL Strategy)")
gr.Markdown(f"*Note: Simulated on last **{BACKTEST_HISTORY_CANDLES}** candles per timeframe. Assumes entry on signal candle's open, exits on TP1/SL hit within the **next** candle's high/low. Includes estimated ~{SIMULATED_FEE_PERCENT*2:.2f}% round-trip fee. **This is a highly simplified simulation for indicative purposes only and NOT investment advice.** Results also saved to `{BACKTEST_RESULTS_FILE}`.*")
backtest_summary_df = gr.DataFrame(
label="Backtest Metrics per Symbol/Timeframe",
interactive=False,
wrap=True,
row_count=(15, "dynamic"), # Show more rows
col_count=(6, "fixed")
)
# --- Event Handler: Run Button ---
def analysis_process_wrapper(exchange, top_n, timeframes, current_state, progress=gr.Progress(track_tqdm=True)):
"""Wrapper to run analysis and update UI components."""
start_time = time.time()
log = ["Initializing analysis..."]
# Clear previous results visually and reset state components
current_state = { # Reset state explicitly
'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {},
'backtest_results': [], 'active_signals_df': pd.DataFrame(),
'valid_timeframes': [], 'analyzer': None
}
initial_updates = {
status_log: "\n".join(log),
heatmap_plot: None, # Clear plot
heatmap_detail_output: "Running analysis...",
active_signals_table: pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']), # Clear table
long_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]),
short_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]),
coin_selector: gr.Dropdown(choices=[], value=None, label="Select Coin...", interactive=False), # Disable dropdown
coin_detail_output: "Running analysis...",
backtest_summary_df: pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']), # Clear backtest table
shared_state: current_state # Update the state with cleared data
}
yield initial_updates # Update UI immediately
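# Yielding dicts keyed by components lets this generator push partial UI
# updates; any component omitted from a yielded dict keeps its current value.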
try:
# Validate inputs
if not exchange:
log.append("Error: No exchange selected.")
yield {status_log: "\n".join(log), shared_state: current_state}
return
if not timeframes:
log.append("Error: No timeframes selected.")
yield {status_log: "\n".join(log), shared_state: current_state}
return
analyzer = CryptoTrendIndicator(exchange, int(top_n), timeframes)
current_state['analyzer'] = analyzer # Store analyzer instance
if not analyzer.valid_timeframes:
log.append(f"Error: No valid timeframes found or supported for exchange '{exchange}'. Check selection or exchange capabilities.")
yield {status_log: "\n".join(log), shared_state: current_state}
return # Stop processing
log.append(f"Analyzer initialized for {exchange} | {top_n} coins | Timeframes: {analyzer.valid_timeframes}")
log.append(f"Fetching up to {LIMIT_PER_TIMEFRAME} candles | Backtesting last {BACKTEST_HISTORY_CANDLES} candles.")
log.append("Starting data fetch and analysis (this may take several minutes)...")
yield {status_log: "\n".join(log)} # Update log
# Run the full analysis, get all results
analysis_results, heatmap_df, backtest_results, active_signals_df, heatmap_details, log_msgs = analyzer.run_full_analysis(progress=progress) # Pass progress tracker
log.extend(log_msgs) # Add logs from the analysis process
# Update state with the new results
current_state['analysis_results'] = analysis_results
current_state['heatmap_df'] = heatmap_df if isinstance(heatmap_df, pd.DataFrame) else pd.DataFrame() # Ensure DF
current_state['heatmap_details'] = heatmap_details if isinstance(heatmap_details, dict) else {}
current_state['backtest_results'] = backtest_results if isinstance(backtest_results, list) else []
current_state['active_signals_df'] = active_signals_df if isinstance(active_signals_df, pd.DataFrame) else pd.DataFrame()
current_state['valid_timeframes'] = analyzer.valid_timeframes
# --- Prepare final UI updates ---
# Heatmap & Coin Selector
if not current_state['heatmap_df'].empty:
fig = create_plotly_heatmap(current_state['heatmap_df'], current_state['heatmap_details'])
coin_list = sorted(current_state['heatmap_df'].index.astype(str).tolist())
coin_selector_update = gr.Dropdown(choices=coin_list, value=None, label="Select Coin...", interactive=True) # Enable dropdown
heatmap_detail_msg = "Click on a heatmap cell for specific details."
else:
log.append("Warning: Heatmap data is empty after analysis.")
fig = go.Figure(layout=go.Layout(title="No heatmap data generated", height=300)) # Empty figure
coin_list = []
coin_selector_update = gr.Dropdown(choices=[], value=None, label="No Coins Analyzed", interactive=False) # Keep disabled
heatmap_detail_msg = "No heatmap data generated. Check logs."
# Zones
long_coins, short_coins = find_zones(current_state['heatmap_df'], DEFAULT_MIN_CONFIRMATION)
# Convert list of tuples directly for Gradio DataFrame
long_df_data = long_coins if long_coins else [(" ", " ")] # Placeholder if empty
short_df_data = short_coins if short_coins else [(" ", " ")]
# Backtest Summary
bt_summary_display_df = format_backtest_summary(current_state['backtest_results'])
if bt_summary_display_df.empty:
bt_summary_display_df = pd.DataFrame([{'symbol': 'No results', 'timeframe': '', 'trades': 0, 'win_rate': 0, 'pnl_abs_sum': 0, 'pnl_%_sum_on_entry': 0}])
# Active Signals
active_signals_display_df = current_state['active_signals_df']
if active_signals_display_df.empty:
active_signals_display_df = pd.DataFrame([{'Symbol': 'No active signals', 'Timeframe': '', 'Direction': '', 'Entry': '', 'SL': '', 'TP1': '', 'TP2': ''}])
end_time = time.time()
log.append(f"Analysis & Backtest finished in {end_time - start_time:.2f} seconds.")
# Prepare the final dictionary of updates for the UI
final_updates = {
status_log: "\n".join(log),
heatmap_plot: fig,
heatmap_detail_output: heatmap_detail_msg,
active_signals_table: active_signals_display_df, # Show the active signals table
long_zone_output: gr.DataFrame(value=long_df_data, headers=["Coin", "Avg Score"]), # Update with new value/headers
short_zone_output: gr.DataFrame(value=short_df_data, headers=["Coin", "Avg Score"]), # Update with new value/headers
coin_selector: coin_selector_update, # Update dropdown with choices
coin_detail_output: "Select a coin from the dropdown above." if coin_list else "No analysis results.",
backtest_summary_df: bt_summary_display_df, # Show backtest summary
shared_state: current_state # IMPORTANT: Update the state with all results
}
yield final_updates
except ValueError as ve:
# Catch initialization or config errors
log.append(f"--- CONFIGURATION ERROR ---")
error_details = f"{str(ve)}\n{traceback.format_exc()}"
log.append(error_details)
print(error_details)
current_state = { # Ensure state is reset
'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {},
'backtest_results': [], 'active_signals_df': pd.DataFrame(),
'valid_timeframes': [], 'analyzer': None
}
yield { status_log: "\n".join(log), shared_state: current_state } # Update log and state
except Exception as e:
log.append(f"--- FATAL ERROR DURING ANALYSIS ---")
error_details = f"{str(e)}\n{traceback.format_exc()}"
log.append(error_details)
print(error_details)
# Reset state components on fatal error during run
current_state = { # Ensure state is reset
'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {},
'backtest_results': [], 'active_signals_df': pd.DataFrame(),
'valid_timeframes': [], 'analyzer': None
}
error_updates = {
status_log: "\n".join(log),
# Keep other outputs cleared or show error message
heatmap_plot: None,
heatmap_detail_output: f"Analysis failed. Check logs.\nError: {e}",
active_signals_table: pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']),
long_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]),
short_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]),
coin_selector: gr.Dropdown(choices=[], value=None, label="Error", interactive=False),
coin_detail_output: f"Analysis failed. Check logs.\nError: {e}",
backtest_summary_df: pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']),
shared_state: current_state # Update state with cleared data
}
yield error_updates
run_button.click(
fn=analysis_process_wrapper,
inputs=[exchange_input, top_n_input, timeframe_input, shared_state],
outputs=[ # List all components that can be updated
status_log, heatmap_plot, heatmap_detail_output, active_signals_table,
long_zone_output, short_zone_output, coin_selector, coin_detail_output,
backtest_summary_df, shared_state
]
)
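# Note: any component updated via the dict-style yields in the wrapper must
# also be listed in outputs above; dict keys are resolved against this list.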
# --- Event Handler: Coin Dropdown Change ---
def display_full_details_handler(selected_coin, current_state):
"""Handles dropdown change to show full details for a selected coin."""
if not selected_coin:
return "Select a coin from the dropdown."
# Check state validity
if not isinstance(current_state, dict) or not current_state.get('analysis_results') or not current_state.get('valid_timeframes'):
print("Debug Coin Select: State invalid or missing data.")
return "Run analysis first or analysis data is missing/incomplete in state."
analysis_results = current_state['analysis_results']
valid_tfs = current_state.get('valid_timeframes', [])
# Find the full symbol (e.g., BTC/USDT) based on the selected base coin name
full_symbol = None
for symbol_key in analysis_results.keys():
# Ensure comparison is string-to-string
if str(symbol_key).startswith(str(selected_coin) + '/'):
full_symbol = symbol_key
break # Found the first match
if not full_symbol:
print(f"Debug Coin Select: Full symbol not found for base {selected_coin}")
return f"Details not found for {selected_coin} in the current analysis results."
if not valid_tfs:
print(f"Debug Coin Select: Valid timeframes list missing for {selected_coin}")
return f"Valid timeframes list is missing from state for {selected_coin}."
# Call the formatting function
return format_coin_details(full_symbol, analysis_results, valid_tfs)
coin_selector.change(
fn=display_full_details_handler,
inputs=[coin_selector, shared_state],
outputs=[coin_detail_output]
)
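# gr.Plot's Plotly select event delivers a gr.SelectData payload, while
# format_heatmap_click_details wants explicit indices plus the cached frames.
# Minimal adapter sketch, assuming evt.index holds the clicked (row, col)
# pair and that the formatter takes (row_index, col_index, heatmap_df,
# analysis_data) in that order:
def heatmap_select_handler(current_state, evt: gr.SelectData):
    try:
        row_index, col_index = evt.index
    except (TypeError, ValueError):
        return "Could not resolve the clicked cell from the select event."
    return format_heatmap_click_details(row_index, col_index, current_state.get('heatmap_df'), current_state.get('analysis_results'))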
# --- Event Handler: Heatmap Click ---
# Wire the adapter with .select (not .change): .change only fires when the
# figure is re-rendered, never on cell clicks, so the original wiring could
# not surface click details. Requires a Gradio version where gr.Plot exposes
# the .select event.
heatmap_plot.select(
    fn=heatmap_select_handler,
    inputs=[shared_state],  # Pass the whole state
    outputs=[heatmap_detail_output]  # Update the Markdown component below heatmap
)
return app
# --- Main Execution ---
if __name__ == "__main__":
print("\n--- Crypto Analysis App V3 ---")
# Check if results/log files exist and inform user
for fpath in [BACKTEST_RESULTS_FILE, SIGNAL_LOG_FILE]:
if os.path.exists(fpath):
print(f"INFO: Existing file found: '{fpath}'. It may be appended to or overwritten on the next run.")
else:
print(f"INFO: Results/Logs will be saved to '{fpath}' after analysis.")
print("\nStarting Crypto Analysis Gradio App...")
print("------------------------------------------------------")
print(f"CONFIG: Backtest Candles={BACKTEST_HISTORY_CANDLES}, Fetch Limit={LIMIT_PER_TIMEFRAME}")
print(f"CONFIG: Default Exchange={DEFAULT_EXCHANGE_ID}, Top Coins={DEFAULT_TOP_N_COINS}, Timeframes={DEFAULT_TIMEFRAMES}")
print("WARNING: Initial analysis might be slow due to extensive data fetching and calculations.")
print("Ensure you have required libraries: pandas, numpy, ccxt, ta, plotly, gradio")
print("------------------------------------------------------")
gradio_app = create_gradio_app()
# Launch the app (debug=False for production/sharing, debug=True for development errors)
# share=True can be used to create a temporary public link (use with caution)
# Increase max_threads if analysis is CPU-bound and you have cores, but be mindful of API rate limits
gradio_app.queue().launch(debug=False, max_threads=4) # Enable queue for better handling of long processes