|
|
|
import pandas as pd |
|
import numpy as np |
|
import ccxt |
|
import time |
|
import os |
|
import csv |
|
import traceback |
|
from datetime import datetime, timedelta |
|
import warnings |
|
import plotly.graph_objects as go |
|
import plotly.colors as pcolors |
|
import gradio as gr |
|
|
|
|
|
from ta.trend import MACD, ADXIndicator, IchimokuIndicator, VortexIndicator |
|
from ta.momentum import RSIIndicator, StochasticOscillator, AwesomeOscillatorIndicator, WilliamsRIndicator |
|
from ta.volume import MFIIndicator, OnBalanceVolumeIndicator, ChaikinMoneyFlowIndicator, VolumeWeightedAveragePrice |
|
from ta.volatility import AverageTrueRange, BollingerBands |
|
|
|
|
|
warnings.filterwarnings('ignore', category=RuntimeWarning) |
|
warnings.filterwarnings('ignore', category=FutureWarning) |
|
warnings.filterwarnings('ignore', category=UserWarning) |
|
|
|
|
|
DEFAULT_EXCHANGE_ID = 'mexc' |
|
DEFAULT_TOP_N_COINS = 30 |
|
DEFAULT_TIMEFRAMES = ['1m', '5m', '15m', '30m', '1h', '4h'] |
|
DEFAULT_MIN_CONFIRMATION = 0.75 |
|
|
|
LIMIT_PER_TIMEFRAME = 1050 |
|
BACKTEST_HISTORY_CANDLES = 1000 |
|
|
|
ATR_SL_MULTIPLIER = 1.5 |
|
ATR_TP1_MULTIPLIER = 1.0 |
|
ATR_TP2_MULTIPLIER = 2.0 |
|
LEVERAGES = [20, 50] |
|
SIMULATED_FEE_PERCENT = 0.06 |
|
BACKTEST_RESULTS_FILE = 'backtest_summary_enhanced.csv' |
|
SIGNAL_LOG_FILE = 'realtime_signal_log.csv' |
|
|
|
TIMEFRAME_ORDER_MAP = { |
|
'1m': 1, '3m': 2, '5m': 3, '15m': 4, '30m': 5, '1h': 6, '2h': 7, |
|
'4h': 8, '6h': 9, '8h': 10, '12h': 11, '1d': 12, '3d': 13, '1w': 14, '1M': 15 |
|
} |
|
|
|
|
|
|
|
def log_signal_to_csv(signal_info): |
|
"""Appends signal information to the CSV log file.""" |
|
file_exists = os.path.isfile(SIGNAL_LOG_FILE) |
|
fieldnames = [ |
|
'LogTimestamp', 'SignalCandleTime', 'Symbol', 'Timeframe', 'Direction', |
|
'Entry', 'SL', 'TP1', 'TP2', 'Status', |
|
] |
|
try: |
|
with open(SIGNAL_LOG_FILE, 'a', newline='', encoding='utf-8') as csvfile: |
|
writer = csv.DictWriter(csvfile, fieldnames=fieldnames) |
|
if not file_exists: |
|
writer.writeheader() |
|
writer.writerow(signal_info) |
|
except IOError as e: |
|
print(f"Error: Could not write to CSV log file {SIGNAL_LOG_FILE}: {e}") |
|
except Exception as e: |
|
print(f"Error logging signal to CSV: {e}\n{traceback.format_exc()}") |
|
|
|
|
|
class CryptoTrendIndicator: |
|
def __init__(self, exchange_id, top_coins, selected_timeframes): |
|
self.exchange_id = exchange_id |
|
self.top_coins = top_coins |
|
self.requested_timeframes = selected_timeframes |
|
self.exchange = None |
|
self.valid_timeframes = [] |
|
|
|
self.analysis_results = {} |
|
self.backtest_results = [] |
|
self.heatmap_df = pd.DataFrame() |
|
self.active_signals_df = pd.DataFrame() |
|
self.heatmap_details = {} |
|
|
|
self._initialize_exchange() |
|
|
|
def _initialize_exchange(self): |
|
"""Initialize the ccxt exchange instance.""" |
|
try: |
|
|
|
self.exchange = getattr(ccxt, self.exchange_id)({ |
|
'enableRateLimit': True, |
|
'options': {'defaultType': 'spot'} |
|
}) |
|
|
|
self.exchange.timeout = 30000 |
|
self.exchange.load_markets(reload=True) |
|
print(f"Exchange {self.exchange_id} initialized (using SPOT markets for data).") |
|
self._validate_timeframes() |
|
except AttributeError: |
|
raise ValueError(f"Error: Exchange '{self.exchange_id}' not found or supported by ccxt.") |
|
except ccxt.AuthenticationError as e: |
|
raise ValueError(f"Authentication Error for {self.exchange_id}: {e}") |
|
except ccxt.ExchangeError as e: |
|
raise ValueError(f"Exchange Error initializing {self.exchange_id}: {e}") |
|
except Exception as e: |
|
raise ValueError(f"Unexpected error initializing exchange: {e}\n{traceback.format_exc()}") |
|
|
|
def _validate_timeframes(self): |
|
"""Filter selected timeframes against those supported by the exchange.""" |
|
if not self.exchange or not self.exchange.timeframes: |
|
print(f"Warning: Could not get timeframes from {self.exchange_id}. Cannot validate.") |
|
|
|
self.valid_timeframes = sorted( |
|
self.requested_timeframes, |
|
key=lambda tf: TIMEFRAME_ORDER_MAP.get(tf, 99) |
|
) |
|
print(f"Proceeding with requested timeframes (validation skipped): {self.valid_timeframes}") |
|
return |
|
|
|
supported_tfs = self.exchange.timeframes |
|
self.valid_timeframes = sorted( |
|
[tf for tf in self.requested_timeframes if tf in supported_tfs], |
|
key=lambda tf: TIMEFRAME_ORDER_MAP.get(tf, 99) |
|
) |
|
print(f"Supported timeframes for analysis: {self.valid_timeframes}") |
|
if len(self.valid_timeframes) != len(self.requested_timeframes): |
|
skipped = set(self.requested_timeframes) - set(self.valid_timeframes) |
|
print(f"Warning: Skipped unsupported timeframes for {self.exchange_id}: {', '.join(skipped)}") |
|
if not self.valid_timeframes: |
|
print(f"Warning: No valid timeframes selected or supported by {self.exchange_id}.") |
|
|
|
|
|
def fetch_top_coins(self): |
|
"""Fetch the top coins by USDT volume from the exchange (spot only)""" |
|
if not self.exchange: return [], "Exchange not initialized" |
|
|
|
try: |
|
tickers = self.exchange.fetch_tickers() |
|
usdt_pairs = {} |
|
|
|
for symbol, data in tickers.items(): |
|
try: |
|
market = self.exchange.market(symbol) |
|
if (symbol.endswith('/USDT') and |
|
data is not None and |
|
market is not None and market.get('spot', False) and |
|
market.get('active', True) and |
|
not market.get('leveraged', False) and |
|
data.get('quoteVolume') is not None and data['quoteVolume'] > 10000 and |
|
data.get('symbol') is not None and |
|
|
|
'UP/' not in symbol and 'DOWN/' not in symbol and |
|
'BULL/' not in symbol and 'BEAR/' not in symbol and |
|
'3L/' not in symbol and '3S/' not in symbol and |
|
'5L/' not in symbol and '5S/' not in symbol |
|
): |
|
usdt_pairs[symbol] = data |
|
except ccxt.BadSymbol: |
|
continue |
|
except Exception as e_inner: |
|
|
|
continue |
|
|
|
|
|
if not usdt_pairs: |
|
return [], f"No suitable USDT spot pairs found on {self.exchange_id} (check filters/volume)." |
|
|
|
sorted_pairs = sorted( |
|
usdt_pairs.items(), |
|
key=lambda x: x[1]['quoteVolume'], |
|
reverse=True |
|
) |
|
|
|
|
|
fetch_limit = min(len(sorted_pairs), self.top_coins + 10) |
|
top_symbols_initial = [pair[0] for pair in sorted_pairs[:fetch_limit]] |
|
|
|
|
|
final_symbols = [] |
|
count = 0 |
|
print(f"Validating {len(top_symbols_initial)} potential symbols...") |
|
for s in top_symbols_initial: |
|
if count >= self.top_coins: |
|
break |
|
try: |
|
mkt = self.exchange.market(s) |
|
if mkt and mkt.get('active', True): |
|
final_symbols.append(s) |
|
count += 1 |
|
except ccxt.BadSymbol: |
|
pass |
|
except Exception as e: |
|
print(f"Market {s} skipped during validation due to error: {e}") |
|
|
|
msg = f"Fetched and validated top {len(final_symbols)} USDT spot pairs by volume from {self.exchange_id}." |
|
print(msg) |
|
if not final_symbols: |
|
msg += " (Warning: Result list is empty)" |
|
return final_symbols, msg |
|
|
|
except (ccxt.NetworkError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as e: |
|
msg = f"Network/Timeout Error fetching tickers from {self.exchange_id}: {e}." |
|
print(msg) |
|
return [], msg |
|
except ccxt.ExchangeError as e: |
|
msg = f"Exchange Error fetching tickers from {self.exchange_id}: {e}" |
|
print(msg) |
|
return [], msg |
|
except Exception as e: |
|
msg = f"An unexpected error occurred fetching top coins: {e}\n{traceback.format_exc()}" |
|
print(msg) |
|
return [], msg |
|
|
|
def fetch_ohlcv_data(self, symbol, timeframe, limit=LIMIT_PER_TIMEFRAME): |
|
"""Fetches OHLCV data with retry mechanism.""" |
|
if not self.exchange: return [], "Exchange not initialized" |
|
max_retries = 3 |
|
retry_delay = 5 |
|
for attempt in range(max_retries): |
|
try: |
|
|
|
|
|
|
|
|
|
|
|
|
|
ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) |
|
|
|
if not ohlcv: |
|
|
|
return [], f"No data returned for {symbol} [{timeframe}]" |
|
elif len(ohlcv) < 100: |
|
return [], f"Insufficient data ({len(ohlcv)}) for {symbol} [{timeframe}] (Need >100)" |
|
elif len(ohlcv) < BACKTEST_HISTORY_CANDLES + 50: |
|
print(f"Warning: Fetched {len(ohlcv)} candles for {symbol} [{timeframe}], less than ideal ({BACKTEST_HISTORY_CANDLES + 50}) for full backtest + lookback.") |
|
|
|
|
|
|
|
print(f"Fetched {len(ohlcv)} candles for {symbol} [{timeframe}]") |
|
return ohlcv, None |
|
|
|
except (ccxt.NetworkError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as e: |
|
print(f"Network error fetching {symbol} [{timeframe}] (Attempt {attempt+1}): {e}. Retrying in {retry_delay}s...") |
|
if attempt == max_retries - 1: |
|
return [], f"Network error {symbol} [{timeframe}] after {max_retries} attempts: {e}" |
|
time.sleep(retry_delay + attempt * 2) |
|
except ccxt.RateLimitExceeded as e: |
|
print(f"Rate limit hit fetching {symbol} [{timeframe}]. Waiting longer...") |
|
time.sleep(self.exchange.rateLimit / 1000 * 5 if self.exchange.rateLimit else 60) |
|
if attempt == max_retries - 1: |
|
return [], f"Rate limit exceeded for {symbol} [{timeframe}] after retries: {e}" |
|
|
|
except ccxt.BadSymbol: |
|
return [], f"Invalid symbol {symbol}" |
|
except ccxt.ExchangeError as e: |
|
print(f"Exchange error fetching {symbol} [{timeframe}]: {e}") |
|
|
|
if 'timeframe not available' in str(e).lower(): |
|
return [], f"Timeframe {timeframe} not supported for {symbol} on {self.exchange_id}" |
|
return [], f"Exchange error {symbol} [{timeframe}]: {e}" |
|
except Exception as e: |
|
print(f"Unexpected error fetching OHLCV {symbol} [{timeframe}]: {e}\n{traceback.format_exc()}") |
|
return [], f"Unexpected error fetching OHLCV {symbol} [{timeframe}]" |
|
return [], f"Failed to fetch data for {symbol} [{timeframe}] after {max_retries} attempts." |
|
|
|
|
|
def calculate_indicators(self, ohlcv_data, timeframe): |
|
"""Calculates existing and new technical indicators.""" |
|
|
|
required_length = 100 |
|
if not isinstance(ohlcv_data, list) or len(ohlcv_data) < required_length: |
|
print(f"Indicator calc skip: Input data invalid or too short for {timeframe} (needs {required_length}, got {len(ohlcv_data) if ohlcv_data else 0})") |
|
return None |
|
|
|
try: |
|
df = pd.DataFrame(ohlcv_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) |
|
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') |
|
df.set_index('timestamp', inplace=True) |
|
|
|
|
|
for col in ['open', 'high', 'low', 'close', 'volume']: |
|
df[col] = pd.to_numeric(df[col], errors='coerce') |
|
df.dropna(subset=['close', 'volume', 'high', 'low'], inplace=True) |
|
|
|
if df.empty or len(df) < required_length: |
|
print(f"Data too short after cleaning for {timeframe} (needs {required_length}, got {len(df)})") |
|
return None |
|
|
|
|
|
df['rsi'] = RSIIndicator(close=df['close'], window=14).rsi().fillna(50) |
|
stoch_obj = StochasticOscillator(high=df['high'], low=df['low'], close=df['close'], window=14, smooth_window=3) |
|
df['stoch_k'] = stoch_obj.stoch().fillna(50) |
|
df['stoch_d'] = stoch_obj.stoch_signal().fillna(50) |
|
ao_obj = AwesomeOscillatorIndicator(high=df['high'], low=df['low'], fillna=True) |
|
df['ao'] = ao_obj.awesome_oscillator().fillna(0) |
|
macd_obj = MACD(close=df['close'], window_slow=26, window_fast=12, window_sign=9, fillna=True) |
|
df['macd'] = macd_obj.macd().fillna(0) |
|
df['macd_signal'] = macd_obj.macd_signal().fillna(0) |
|
df['macd_diff'] = macd_obj.macd_diff().fillna(0) |
|
adx_obj = ADXIndicator(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True) |
|
df['adx'] = adx_obj.adx().fillna(20) |
|
df['adx_pos'] = adx_obj.adx_pos().fillna(0) |
|
df['adx_neg'] = adx_obj.adx_neg().fillna(0) |
|
df['ema_20'] = df['close'].ewm(span=20, adjust=False).mean() |
|
df['ema_50'] = df['close'].ewm(span=50, adjust=False).mean() |
|
df['ema_100'] = df['close'].ewm(span=100, adjust=False).mean() |
|
df['mfi'] = MFIIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=14, fillna=True).money_flow_index().fillna(50) |
|
df['obv'] = OnBalanceVolumeIndicator(close=df['close'], volume=df['volume'], fillna=True).on_balance_volume().fillna(method='ffill') |
|
df['cmf'] = ChaikinMoneyFlowIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=20, fillna=True).chaikin_money_flow().fillna(0) |
|
df['volume_sma'] = df['volume'].rolling(window=20, min_periods=10).mean() |
|
|
|
df['volume_ratio'] = (df['volume'] / df['volume_sma'].replace(0, np.nan)).replace([np.inf, -np.inf], 1.0).fillna(1.0) |
|
df['atr'] = AverageTrueRange(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True).average_true_range().fillna(method='ffill').fillna(0) |
|
|
|
|
|
bb_obj = BollingerBands(close=df['close'], window=20, window_dev=2, fillna=True) |
|
df['bb_hband'] = bb_obj.bollinger_hband() |
|
df['bb_lband'] = bb_obj.bollinger_lband() |
|
df['bb_mavg'] = bb_obj.bollinger_mavg() |
|
df['bb_width'] = bb_obj.bollinger_wband() |
|
ichi_obj = IchimokuIndicator(high=df['high'], low=df['low'], window1=9, window2=26, window3=52, fillna=True) |
|
df['ichi_a'] = ichi_obj.ichimoku_a() |
|
df['ichi_b'] = ichi_obj.ichimoku_b() |
|
df['ichi_base'] = ichi_obj.ichimoku_base_line() |
|
df['ichi_conv'] = ichi_obj.ichimoku_conversion_line() |
|
df['will_r'] = WilliamsRIndicator(high=df['high'], low=df['low'], close=df['close'], lbp=14, fillna=True).williams_r().fillna(-50) |
|
df['vwap'] = VolumeWeightedAveragePrice(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=14, fillna=True).volume_weighted_average_price().fillna(method='ffill') |
|
vortex_obj = VortexIndicator(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True) |
|
df['vortex_pos'] = vortex_obj.vortex_indicator_pos() |
|
df['vortex_neg'] = vortex_obj.vortex_indicator_neg() |
|
|
|
|
|
|
|
|
|
df.ffill(inplace=True) |
|
df.bfill(inplace=True) |
|
|
|
|
|
min_usable_length = 50 |
|
if len(df) < min_usable_length: |
|
print(f"Data too short after indicator calculation & filling for {timeframe} (needs >{min_usable_length}, got {len(df)})") |
|
return None |
|
|
|
return df.copy() |
|
except Exception as e: |
|
print(f"Error calculating indicators for {timeframe}: {e}\n{traceback.format_exc()}") |
|
return None |
|
|
|
def generate_signals_and_values(self, df): |
|
""" |
|
Generates signals based on latest indicator values, including new ones. |
|
Implements the "K-map" concept via multi-factor confirmation scoring. |
|
Returns: |
|
- final_signals (dict): Dictionary of individual indicator signals {indicator_name: signal (-1, 0, 1)}. |
|
- values (dict): Dictionary of raw indicator values {indicator_name: value}. |
|
- signal_direction (int): Overall signal direction (-1, 0, 1) based on composite score. |
|
""" |
|
if df is None or not isinstance(df, pd.DataFrame) or len(df.index) < 2: |
|
|
|
return None, None, None |
|
|
|
try: |
|
|
|
if not isinstance(df.index, pd.DatetimeIndex): |
|
df.index = pd.to_datetime(df.index) |
|
df = df.sort_index() |
|
|
|
|
|
latest = df.iloc[-1] |
|
prev = df.iloc[-2] |
|
|
|
signals = {} |
|
final_signals = {} |
|
values = {} |
|
|
|
|
|
values['price'] = latest['close'] |
|
values['volume'] = latest['volume'] |
|
values['timestamp'] = latest.name.strftime('%Y-%m-%d %H:%M:%S') |
|
values['atr'] = latest['atr'] |
|
|
|
values['rsi'] = latest['rsi'] |
|
values['stoch_k'] = latest['stoch_k']; values['stoch_d'] = latest['stoch_d'] |
|
values['ao'] = latest['ao'] |
|
values['will_r'] = latest['will_r'] |
|
|
|
values['macd'] = latest['macd']; values['macd_signal'] = latest['macd_signal']; values['macd_diff'] = latest['macd_diff'] |
|
values['adx'] = latest['adx']; values['adx_pos'] = latest['adx_pos']; values['adx_neg'] = latest['adx_neg'] |
|
values['ema_20'] = latest['ema_20']; values['ema_50'] = latest['ema_50']; values['ema_100'] = latest['ema_100'] |
|
values['ichi_a'] = latest['ichi_a']; values['ichi_b'] = latest['ichi_b']; values['ichi_base'] = latest['ichi_base']; values['ichi_conv'] = latest['ichi_conv'] |
|
values['vortex_pos'] = latest['vortex_pos']; values['vortex_neg'] = latest['vortex_neg'] |
|
|
|
values['mfi'] = latest['mfi'] |
|
values['obv'] = latest['obv'] |
|
values['cmf'] = latest['cmf'] |
|
values['volume_ratio'] = latest['volume_ratio'] |
|
values['vwap'] = latest['vwap'] |
|
|
|
values['bb_hband'] = latest['bb_hband']; values['bb_lband'] = latest['bb_lband']; values['bb_mavg'] = latest['bb_mavg']; values['bb_width'] = latest['bb_width'] |
|
|
|
|
|
|
|
|
|
signals['rsi'] = 1 if latest['rsi'] < 30 else (-1 if latest['rsi'] > 70 else 0) |
|
|
|
signals['stoch'] = 1 if latest['stoch_k'] < 25 and prev['stoch_k'] <= prev['stoch_d'] and latest['stoch_k'] > latest['stoch_d'] else (-1 if latest['stoch_k'] > 75 and prev['stoch_k'] >= prev['stoch_d'] and latest['stoch_k'] < latest['stoch_d'] else 0) |
|
|
|
if latest['ao'] > 0 and prev['ao'] <= 0: signals['ao'] = 1 |
|
elif latest['ao'] < 0 and prev['ao'] >= 0: signals['ao'] = -1 |
|
elif latest['ao'] > 0 and prev['ao'] > 0 and latest['ao'] > prev['ao']: signals['ao'] = 0.5 |
|
elif latest['ao'] < 0 and prev['ao'] < 0 and latest['ao'] < prev['ao']: signals['ao'] = -0.5 |
|
else: signals['ao'] = 0 |
|
|
|
signals['will_r'] = 1 if latest['will_r'] > -20 and prev['will_r'] <= -20 else (-1 if latest['will_r'] < -80 and prev['will_r'] >= -80 else 0) |
|
|
|
|
|
if (latest['macd'] > latest['macd_signal'] and prev['macd'] <= prev['macd_signal']): signals['macd'] = 1 |
|
elif (latest['macd'] < latest['macd_signal'] and prev['macd'] >= prev['macd_signal']): signals['macd'] = -1 |
|
else: signals['macd'] = 0 |
|
|
|
signals['adx'] = 1 if latest['adx'] > 25 and latest['adx_pos'] > latest['adx_neg'] else (-1 if latest['adx'] > 25 and latest['adx_neg'] > latest['adx_pos'] else 0) |
|
|
|
if latest['close'] > latest['ema_20'] and latest['ema_20'] > latest['ema_50'] and latest['ema_50'] > latest['ema_100']: signals['ema_trend'] = 1 |
|
elif latest['close'] < latest['ema_20'] and latest['ema_20'] < latest['ema_50'] and latest['ema_50'] < latest['ema_100']: signals['ema_trend'] = -1 |
|
elif latest['close'] > latest['ema_50']: signals['ema_trend'] = 0.5 |
|
elif latest['close'] < latest['ema_50']: signals['ema_trend'] = -0.5 |
|
else: signals['ema_trend'] = 0 |
|
|
|
ichi_signal = 0 |
|
tenkan_cross_kijun_up = latest['ichi_conv'] > latest['ichi_base'] and prev['ichi_conv'] <= prev['ichi_base'] |
|
tenkan_cross_kijun_down = latest['ichi_conv'] < latest['ichi_base'] and prev['ichi_conv'] >= prev['ichi_base'] |
|
above_cloud = latest['close'] > latest['ichi_a'] and latest['close'] > latest['ichi_b'] |
|
below_cloud = latest['close'] < latest['ichi_a'] and latest['close'] < latest['ichi_b'] |
|
price_above_kijun = latest['close'] > latest['ichi_base'] |
|
price_below_kijun = latest['close'] < latest['ichi_base'] |
|
|
|
if tenkan_cross_kijun_up and above_cloud and price_above_kijun: ichi_signal = 1 |
|
elif tenkan_cross_kijun_down and below_cloud and price_below_kijun: ichi_signal = -1 |
|
elif above_cloud and price_above_kijun and latest['ichi_conv'] > latest['ichi_base']: ichi_signal = 0.5 |
|
elif below_cloud and price_below_kijun and latest['ichi_conv'] < latest['ichi_base']: ichi_signal = -0.5 |
|
signals['ichimoku'] = ichi_signal |
|
|
|
if latest['vortex_pos'] > latest['vortex_neg'] and prev['vortex_pos'] <= prev['vortex_neg']: signals['vortex'] = 1 |
|
elif latest['vortex_neg'] > latest['vortex_pos'] and prev['vortex_neg'] <= prev['vortex_pos']: signals['vortex'] = -1 |
|
else: signals['vortex'] = 0 |
|
|
|
|
|
signals['mfi'] = 1 if latest['mfi'] < 20 else (-1 if latest['mfi'] > 80 else 0) |
|
|
|
signals['cmf'] = 1 if latest['cmf'] > 0.05 else (-1 if latest['cmf'] < -0.05 else 0) |
|
|
|
if len(df) > 5: |
|
try: |
|
obv_sma5 = df['obv'].rolling(window=5).mean().iloc[-1] |
|
|
|
if pd.notna(obv_sma5): |
|
signals['obv_trend'] = 1 if latest['obv'] > obv_sma5 else (-1 if latest['obv'] < obv_sma5 else 0) |
|
else: signals['obv_trend'] = 0 |
|
except IndexError: |
|
signals['obv_trend'] = 0 |
|
else: signals['obv_trend'] = 0 |
|
|
|
if latest['volume_ratio'] > 1.8: |
|
|
|
signals['vol_spike'] = 0.5 if latest['close'] > latest['open'] else (-0.5 if latest['close'] < latest['open'] else 0) |
|
else: signals['vol_spike'] = 0 |
|
|
|
signals['vwap_cross'] = 1 if latest['close'] > latest['vwap'] and prev['close'] <= prev['vwap'] else (-1 if latest['close'] < latest['vwap'] and prev['close'] >= prev['vwap'] else 0) |
|
|
|
|
|
if latest['close'] > latest['bb_hband'] and prev['close'] <= prev['bb_hband']: signals['bbands'] = 1 |
|
elif latest['close'] < latest['bb_lband'] and prev['close'] >= prev['bb_lband']: signals['bbands'] = -1 |
|
else: signals['bbands'] = 0 |
|
|
|
|
|
|
|
|
|
all_signal_keys = list(signals.keys()) |
|
for k in all_signal_keys: |
|
values.setdefault(k, np.nan) |
|
|
|
|
|
for key, value in signals.items(): |
|
if value >= 0.5: final_signals[key] = 1 |
|
elif value <= -0.5: final_signals[key] = -1 |
|
else: final_signals[key] = 0 |
|
|
|
|
|
non_neutral_signals = [s for s in final_signals.values() if s != 0] |
|
composite_score = sum(non_neutral_signals) |
|
|
|
|
|
|
|
num_indicators_signaling = len(final_signals) |
|
signal_strength_threshold = max(3, int(num_indicators_signaling * 0.30)) |
|
|
|
signal_direction = 0 |
|
if num_indicators_signaling > 0: |
|
|
|
if composite_score >= signal_strength_threshold: |
|
signal_direction = 1 |
|
elif composite_score <= -signal_strength_threshold: |
|
signal_direction = -1 |
|
|
|
|
|
value_keys_for_signals = [k for k in values.keys() if k not in ['price', 'volume', 'timestamp', 'atr']] |
|
for k in value_keys_for_signals: |
|
final_signals.setdefault(k, 0) |
|
|
|
return final_signals, values, signal_direction |
|
|
|
except KeyError as e: |
|
print(f"KeyError during signal generation (likely missing indicator column: {e}) in DF columns: {df.columns if df is not None else 'None'}. Check calculation step.") |
|
return None, None, None |
|
except IndexError as e: |
|
print(f"IndexError during signal generation (likely insufficient data rows for prev/latest): {e}. DF length: {len(df) if df is not None else 0}") |
|
return None, None, None |
|
except Exception as e: |
|
print(f"Error generating signals/values: {e}\n{traceback.format_exc()}") |
|
return None, None, None |
|
|
|
|
|
def calculate_trade_params(self, values, signal_direction): |
|
"""Calculate Entry, SL, TP1, TP2 based on ATR""" |
|
params = {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}} |
|
try: |
|
|
|
if signal_direction == 0 or \ |
|
not values or \ |
|
pd.isna(values.get('atr')) or values.get('atr', 0) <= 0 or \ |
|
pd.isna(values.get('price')) or values.get('price', 0) <= 0: |
|
return params |
|
|
|
entry_price = values['price'] |
|
atr_val = values['atr'] |
|
|
|
if atr_val <= 0 or entry_price <= 0 : return params |
|
|
|
params['entry'] = entry_price |
|
|
|
if signal_direction == 1: |
|
params['sl'] = entry_price - ATR_SL_MULTIPLIER * atr_val |
|
params['tp1'] = entry_price + ATR_TP1_MULTIPLIER * atr_val |
|
params['tp2'] = entry_price + ATR_TP2_MULTIPLIER * atr_val |
|
elif signal_direction == -1: |
|
params['sl'] = entry_price + ATR_SL_MULTIPLIER * atr_val |
|
params['tp1'] = entry_price - ATR_TP1_MULTIPLIER * atr_val |
|
params['tp2'] = entry_price - ATR_TP2_MULTIPLIER * atr_val |
|
|
|
|
|
|
|
if pd.isna(params['sl']) or pd.isna(params['tp1']) or pd.isna(params['tp2']) or \ |
|
params['sl'] <= 0 or params['tp1'] <= 0 or params['tp2'] <= 0 or \ |
|
(signal_direction == 1 and params['sl'] >= params['entry']) or \ |
|
(signal_direction == -1 and params['sl'] <= params['entry']): |
|
|
|
|
|
return {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}} |
|
|
|
|
|
fee = SIMULATED_FEE_PERCENT / 100.0 |
|
|
|
for lev in LEVERAGES: |
|
|
|
profit_ratio_tp1 = abs(params['tp1'] - entry_price) / entry_price |
|
loss_ratio_sl = abs(params['sl'] - entry_price) / entry_price |
|
|
|
|
|
leveraged_profit_tp1 = profit_ratio_tp1 * lev |
|
leveraged_loss_sl = loss_ratio_sl * lev |
|
|
|
|
|
fee_impact = 2 * fee * lev |
|
|
|
|
|
dollar_profit_tp1 = leveraged_profit_tp1 - fee_impact |
|
dollar_loss_sl = -leveraged_loss_sl - fee_impact |
|
|
|
params['lev_profit'][f'{lev}x'] = {'tp1_profit_$': round(dollar_profit_tp1, 3), 'sl_loss_$': round(dollar_loss_sl, 3)} |
|
|
|
return params |
|
|
|
except Exception as e: |
|
print(f"Error calculating trade params for signal {signal_direction}, values {values}: {e}\n{traceback.format_exc()}") |
|
return {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}} |
|
|
|
|
|
def _run_simple_backtest(self, symbol, timeframe, df): |
|
"""VERY Basic backtest simulation on the provided DataFrame (Uses longer history).""" |
|
default_result = {'symbol': symbol, 'timeframe': timeframe, 'trades': 0, 'win_rate': 0, 'pnl_sum': 0, 'pnl_%_sum': 0} |
|
|
|
min_backtest_len = BACKTEST_HISTORY_CANDLES + 50 |
|
|
|
|
|
if df is None or not isinstance(df, pd.DataFrame) or len(df) < min_backtest_len: |
|
|
|
return default_result |
|
|
|
try: |
|
|
|
if not isinstance(df.index, pd.DatetimeIndex): |
|
df.index = pd.to_datetime(df.index) |
|
df = df.sort_index() |
|
|
|
|
|
|
|
backtest_start_iloc = len(df) - BACKTEST_HISTORY_CANDLES |
|
if backtest_start_iloc < 0: backtest_start_iloc = 0 |
|
|
|
|
|
|
|
|
|
|
|
trades = [] |
|
in_position = False |
|
entry_price = 0 |
|
position_direction = 0 |
|
stop_loss = 0 |
|
take_profit = 0 |
|
entry_timestamp = None |
|
|
|
|
|
|
|
|
|
first_signal_candle_idx = max(50, backtest_start_iloc) |
|
|
|
for i in range(first_signal_candle_idx, len(df)): |
|
current_row = df.iloc[i] |
|
signal_candle_iloc = i - 1 |
|
|
|
|
|
|
|
|
|
if signal_candle_iloc < 1: continue |
|
df_for_signal_calc = df.iloc[:signal_candle_iloc + 1] |
|
|
|
|
|
if df_for_signal_calc is None or len(df_for_signal_calc) < 2: |
|
continue |
|
|
|
|
|
sim_signals, sim_values, sim_direction = self.generate_signals_and_values(df_for_signal_calc) |
|
|
|
|
|
|
|
if not in_position and sim_direction != 0 and sim_values: |
|
|
|
atr_at_entry = sim_values.get('atr') |
|
|
|
entry_price_candidate = current_row['open'] |
|
entry_timestamp = current_row.name |
|
|
|
|
|
if pd.notna(entry_price_candidate) and entry_price_candidate > 0 and \ |
|
pd.notna(atr_at_entry) and atr_at_entry > 0: |
|
|
|
|
|
if sim_direction == 1: |
|
potential_sl = entry_price_candidate - ATR_SL_MULTIPLIER * atr_at_entry |
|
potential_tp = entry_price_candidate + ATR_TP1_MULTIPLIER * atr_at_entry |
|
else: |
|
potential_sl = entry_price_candidate + ATR_SL_MULTIPLIER * atr_at_entry |
|
potential_tp = entry_price_candidate - ATR_TP1_MULTIPLIER * atr_at_entry |
|
|
|
|
|
if not (pd.isna(potential_sl) or pd.isna(potential_tp) or potential_sl <= 0 or potential_tp <= 0 or \ |
|
(sim_direction == 1 and potential_sl >= entry_price_candidate) or \ |
|
(sim_direction == -1 and potential_sl <= entry_price_candidate)): |
|
|
|
in_position = True |
|
position_direction = sim_direction |
|
entry_price = entry_price_candidate |
|
stop_loss = potential_sl |
|
take_profit = potential_tp |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
elif in_position: |
|
exit_price = None |
|
pnl = 0 |
|
exit_reason = "N/A" |
|
current_high = current_row['high'] |
|
current_low = current_row['low'] |
|
exit_timestamp = current_row.name |
|
|
|
|
|
if pd.isna(current_high) or pd.isna(current_low): |
|
print(f"BT Warning {symbol} {timeframe}: NaN High/Low at {exit_timestamp}, cannot check exit.") |
|
continue |
|
|
|
|
|
|
|
if position_direction == 1: |
|
if current_low <= stop_loss: |
|
exit_price = stop_loss |
|
exit_reason = "SL Hit" |
|
elif current_high >= take_profit: |
|
exit_price = take_profit |
|
exit_reason = "TP1 Hit" |
|
elif position_direction == -1: |
|
if current_high >= stop_loss: |
|
exit_price = stop_loss |
|
exit_reason = "SL Hit" |
|
elif current_low <= take_profit: |
|
exit_price = take_profit |
|
exit_reason = "TP1 Hit" |
|
|
|
|
|
if exit_price is not None: |
|
|
|
|
|
if position_direction == 1: |
|
pnl = exit_price - entry_price |
|
else: |
|
pnl = entry_price - exit_price |
|
|
|
|
|
|
|
entry_fee = (SIMULATED_FEE_PERCENT / 100.0) * entry_price |
|
exit_fee = (SIMULATED_FEE_PERCENT / 100.0) * exit_price |
|
pnl -= (entry_fee + exit_fee) |
|
|
|
trades.append({'entry': entry_price, 'exit': exit_price, 'pnl': pnl, 'direction': position_direction}) |
|
|
|
in_position = False |
|
entry_timestamp = None |
|
|
|
|
|
num_trades = len(trades) |
|
if num_trades > 0: |
|
wins = sum(1 for t in trades if t['pnl'] > 0) |
|
win_rate = (wins / num_trades * 100) |
|
total_pnl = sum(t['pnl'] for t in trades) |
|
|
|
pnl_percentage_sum = sum((t['pnl'] / t['entry']) * 100 for t in trades if t['entry'] > 0) |
|
else: |
|
win_rate = 0 |
|
total_pnl = 0 |
|
pnl_percentage_sum = 0 |
|
|
|
|
|
|
|
return { |
|
'symbol': symbol, 'timeframe': timeframe, 'trades': num_trades, |
|
'win_rate': round(win_rate, 2), 'pnl_sum': round(total_pnl, 5), |
|
'pnl_%_sum': round(pnl_percentage_sum, 2) |
|
} |
|
except Exception as e: |
|
print(f"Error during backtest simulation for {symbol} {timeframe}: {e}\n{traceback.format_exc()}") |
|
|
|
|
|
|
|
|
|
return default_result |
|
|
|
|
|
def analyze_symbol(self, symbol, progress): |
|
"""Analyzes a single symbol across all valid timeframes.""" |
|
timeframe_details = {} |
|
heatmap_composites = {} |
|
symbol_backtest_results = [] |
|
log_msgs = [] |
|
symbol_active_signals = {} |
|
symbol_hover_details = {} |
|
|
|
if not self.valid_timeframes: |
|
return timeframe_details, heatmap_composites, symbol_backtest_results, [f"No valid timeframes for {symbol}."], {}, {} |
|
|
|
|
|
time.sleep(0.1) |
|
|
|
for i, timeframe in enumerate(self.valid_timeframes): |
|
progress(i / len(self.valid_timeframes), desc=f"Fetching {symbol} [{timeframe}]") |
|
|
|
|
|
ohlcv, err_msg = self.fetch_ohlcv_data(symbol, timeframe) |
|
if err_msg: |
|
log_msgs.append(f"Data fetch skip: {symbol} [{timeframe}] {err_msg}") |
|
heatmap_composites[timeframe] = 0 |
|
|
|
symbol_backtest_results.append(self._run_simple_backtest(symbol, timeframe, None)) |
|
symbol_hover_details[timeframe] = {} |
|
time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.3)) |
|
continue |
|
|
|
|
|
progress((i + 0.3) / len(self.valid_timeframes), desc=f"Calculating Ind. {symbol} [{timeframe}]") |
|
df = self.calculate_indicators(ohlcv, timeframe) |
|
if df is None or df.empty: |
|
log_msgs.append(f"Indicator calc skip: {symbol} [{timeframe}] (DataFrame invalid or empty)") |
|
heatmap_composites[timeframe] = 0 |
|
|
|
symbol_backtest_results.append(self._run_simple_backtest(symbol, timeframe, None)) |
|
symbol_hover_details[timeframe] = {} |
|
time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.2)) |
|
continue |
|
|
|
|
|
progress((i + 0.6) / len(self.valid_timeframes), desc=f"Generating Sig. {symbol} [{timeframe}]") |
|
signals, values, signal_direction = self.generate_signals_and_values(df) |
|
|
|
if signals is not None and values is not None: |
|
|
|
trade_params = self.calculate_trade_params(values, signal_direction) |
|
|
|
timeframe_details[timeframe] = { |
|
'signals': signals, |
|
'values': values, |
|
'trade_params': trade_params, |
|
'direction': signal_direction |
|
} |
|
|
|
num_potential_signals = len(signals) |
|
composite = sum(signals.values()) / num_potential_signals if num_potential_signals > 0 else 0 |
|
heatmap_composites[timeframe] = composite |
|
|
|
|
|
symbol_hover_details[timeframe] = { |
|
'Price': values.get('price', np.nan), |
|
'RSI': values.get('rsi', np.nan), |
|
'MACD': values.get('macd', np.nan), |
|
'StochK': values.get('stoch_k', np.nan), |
|
'ADX': values.get('adx', np.nan), |
|
'WillR': values.get('will_r', np.nan), |
|
} |
|
|
|
|
|
if signal_direction != 0 and trade_params.get('entry') is not None: |
|
active_sig_details = { |
|
'direction': signal_direction, |
|
'entry': trade_params['entry'], |
|
'sl': trade_params['sl'], |
|
'tp1': trade_params['tp1'], |
|
'tp2': trade_params['tp2'] |
|
} |
|
symbol_active_signals[timeframe] = active_sig_details |
|
|
|
|
|
log_data = { |
|
'LogTimestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), |
|
'SignalCandleTime': values.get('timestamp', 'N/A'), |
|
'Symbol': symbol, |
|
'Timeframe': timeframe, |
|
'Direction': 'LONG' if signal_direction == 1 else 'SHORT', |
|
'Entry': f"{trade_params['entry']:.8f}", |
|
'SL': f"{trade_params['sl']:.8f}", |
|
'TP1': f"{trade_params['tp1']:.8f}", |
|
'TP2': f"{trade_params['tp2']:.8f}", |
|
'Status': 'Triggered' |
|
|
|
|
|
|
|
|
|
} |
|
log_signal_to_csv(log_data) |
|
|
|
|
|
else: |
|
log_msgs.append(f"Signal gen skip: {symbol} [{timeframe}]") |
|
heatmap_composites[timeframe] = 0 |
|
symbol_hover_details[timeframe] = {} |
|
|
|
|
|
progress((i + 0.9) / len(self.valid_timeframes), desc=f"Backtesting {symbol} [{timeframe}]") |
|
|
|
bt_result = self._run_simple_backtest(symbol, timeframe, df.copy()) |
|
symbol_backtest_results.append(bt_result) |
|
|
|
|
|
time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.2)) |
|
|
|
return timeframe_details, heatmap_composites, symbol_backtest_results, log_msgs, symbol_active_signals, symbol_hover_details |
|
|
|
|
|
def run_full_analysis(self, progress=gr.Progress()): |
|
"""Runs the complete analysis for top coins across selected timeframes.""" |
|
self.analysis_results = {} |
|
self.backtest_results = [] |
|
self.active_signals_df = pd.DataFrame() |
|
self.heatmap_details = {} |
|
all_log_msgs = [] |
|
all_active_signals_list = [] |
|
|
|
|
|
progress(0, desc="Fetching top coins...") |
|
symbols, msg = self.fetch_top_coins() |
|
all_log_msgs.append(msg) |
|
if not symbols: |
|
self.heatmap_df = pd.DataFrame() |
|
all_log_msgs.append("Error: No symbols found to analyze. Stopping.") |
|
|
|
return {}, pd.DataFrame(), [], pd.DataFrame(), {}, all_log_msgs |
|
|
|
|
|
heatmap_data_list = [] |
|
total_symbols = len(symbols) |
|
progress(0.05, desc=f"Starting analysis for {total_symbols} coins...") |
|
|
|
for idx, symbol in enumerate(symbols): |
|
|
|
symbol_progress_start = 0.05 + (idx / total_symbols) * 0.90 |
|
symbol_progress_range = (1 / total_symbols) * 0.90 |
|
symbol_progress_desc = f"Analyzing {symbol} ({idx+1}/{total_symbols})" |
|
progress(symbol_progress_start, desc=symbol_progress_desc) |
|
|
|
|
|
symbol_progress_tracker = lambda p, desc=symbol_progress_desc: progress( |
|
symbol_progress_start + (p * symbol_progress_range), desc=desc |
|
) |
|
|
|
try: |
|
|
|
|
|
tf_details, tf_heatmap_scores, symbol_bt_results, log_msgs, symbol_active_tf_signals, symbol_tf_hover_details = \ |
|
self.analyze_symbol(symbol, symbol_progress_tracker) |
|
|
|
all_log_msgs.extend(log_msgs) |
|
self.backtest_results.extend(symbol_bt_results) |
|
|
|
|
|
if tf_details: |
|
self.analysis_results[symbol] = tf_details |
|
|
|
coin_name = symbol.split('/')[0] |
|
heatmap_row = {'Coin': coin_name} |
|
symbol_hover_data = {} |
|
|
|
|
|
for tf in self.valid_timeframes: |
|
heatmap_row[tf] = tf_heatmap_scores.get(tf, 0) |
|
symbol_hover_data[tf] = symbol_tf_hover_details.get(tf, {}) |
|
|
|
heatmap_data_list.append(heatmap_row) |
|
self.heatmap_details[coin_name] = symbol_hover_data |
|
|
|
|
|
for tf, active_sig in symbol_active_tf_signals.items(): |
|
|
|
all_active_signals_list.append({ |
|
'Symbol': symbol, |
|
'Timeframe': tf, |
|
'Direction': 'LONG' if active_sig['direction'] == 1 else 'SHORT', |
|
'Entry': f"{active_sig['entry']:.8f}", |
|
'SL': f"{active_sig['sl']:.8f}", |
|
'TP1': f"{active_sig['tp1']:.8f}", |
|
'TP2': f"{active_sig['tp2']:.8f}", |
|
}) |
|
|
|
except ccxt.RateLimitExceeded as e: |
|
wait_time = 60 |
|
all_log_msgs.append(f"Rate limit exceeded analyzing {symbol}. Sleeping for {wait_time}s... {e}") |
|
print(all_log_msgs[-1]) |
|
progress(symbol_progress_start + symbol_progress_range * 0.9, desc=f"Rate Limit Hit! Waiting {wait_time}s...") |
|
time.sleep(wait_time) |
|
|
|
except Exception as e: |
|
error_msg = f"Critical error processing {symbol}: {e}\n{traceback.format_exc()}" |
|
all_log_msgs.append(error_msg) |
|
print(error_msg) |
|
|
|
coin_name = symbol.split('/')[0] |
|
row = {'Coin': coin_name} |
|
for tf in self.valid_timeframes: row[tf] = 0 |
|
heatmap_data_list.append(row) |
|
self.heatmap_details[coin_name] = {tf: {} for tf in self.valid_timeframes} |
|
for tf in self.valid_timeframes: |
|
|
|
self.backtest_results.append({'symbol': symbol, 'timeframe': tf, 'trades': 0, 'win_rate': 0, 'pnl_sum': 0, 'pnl_%_sum': 0}) |
|
|
|
progress(0.95, desc="Finalizing results...") |
|
|
|
|
|
if not heatmap_data_list: |
|
self.heatmap_df = pd.DataFrame() |
|
all_log_msgs.append("Warning: No heatmap data generated (no symbols processed successfully?).") |
|
else: |
|
self.heatmap_df = pd.DataFrame(heatmap_data_list).set_index('Coin') |
|
|
|
if self.valid_timeframes: |
|
|
|
cols_to_keep = [tf for tf in self.valid_timeframes if tf in self.heatmap_df.columns] |
|
self.heatmap_df = self.heatmap_df[cols_to_keep] |
|
|
|
|
|
self.heatmap_df = self.heatmap_df.reindex(columns=self.valid_timeframes, fill_value=0) |
|
self.heatmap_df.index.name = 'Coin' |
|
|
|
|
|
|
|
if all_active_signals_list: |
|
self.active_signals_df = pd.DataFrame(all_active_signals_list) |
|
|
|
self.active_signals_df['tf_order'] = self.active_signals_df['Timeframe'].map(TIMEFRAME_ORDER_MAP) |
|
self.active_signals_df = self.active_signals_df.sort_values(by=['Symbol', 'tf_order']).drop('tf_order', axis=1) |
|
|
|
self.active_signals_df = self.active_signals_df[['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']] |
|
else: |
|
|
|
self.active_signals_df = pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']) |
|
|
|
|
|
|
|
if self.backtest_results: |
|
|
|
valid_bt_results = [res for res in self.backtest_results if isinstance(res, dict)] |
|
if valid_bt_results: |
|
bt_df = pd.DataFrame(valid_bt_results) |
|
try: |
|
bt_df.to_csv(BACKTEST_RESULTS_FILE, index=False) |
|
all_log_msgs.append(f"Backtest summary saved to {BACKTEST_RESULTS_FILE}") |
|
except Exception as e: |
|
all_log_msgs.append(f"Error saving backtest results: {e}") |
|
print(f"Error saving backtest results: {e}\n{traceback.format_exc()}") |
|
else: |
|
all_log_msgs.append("No valid backtest results generated to save.") |
|
else: |
|
all_log_msgs.append("No backtest results generated.") |
|
|
|
progress(1, desc="Analysis complete.") |
|
all_log_msgs.append("Analysis Complete.") |
|
if os.path.exists(SIGNAL_LOG_FILE): |
|
all_log_msgs.append(f"Signals logged to {SIGNAL_LOG_FILE}") |
|
|
|
|
|
return self.analysis_results, self.heatmap_df, self.backtest_results, self.active_signals_df, self.heatmap_details, all_log_msgs |
|
|
|
|
|
|
|
|
|
def create_plotly_heatmap(df, heatmap_details): |
|
"""Creates heatmap with enhanced hover text including key indicator values.""" |
|
if df is None or df.empty: |
|
print("Heatmap DF is empty, returning empty figure.") |
|
return go.Figure(layout=go.Layout(title="No data available for heatmap", height=300)) |
|
|
|
colorscale = pcolors.diverging.RdYlGn |
|
|
|
try: |
|
|
|
df.index = df.index.astype(str) |
|
df.columns = df.columns.astype(str) |
|
heatmap_values = df.apply(pd.to_numeric, errors='coerce').fillna(0).values |
|
rows = list(df.index) |
|
cols = list(df.columns) |
|
except Exception as e: |
|
print(f"Error preparing heatmap data: {e}") |
|
return go.Figure(layout=go.Layout(title=f"Error preparing heatmap: {e}", height=300)) |
|
|
|
|
|
|
|
hover_texts = [] |
|
for r_idx, coin in enumerate(rows): |
|
row_texts = [] |
|
coin_details_by_tf = heatmap_details.get(coin, {}) |
|
for c_idx, tf in enumerate(cols): |
|
try: |
|
score = heatmap_values[r_idx, c_idx] |
|
|
|
details = coin_details_by_tf.get(tf, {}) |
|
|
|
|
|
price_val = details.get('Price', 'N/A') |
|
rsi_val = details.get('RSI', 'N/A') |
|
macd_val = details.get('MACD', 'N/A') |
|
stochk_val = details.get('StochK', 'N/A') |
|
adx_val = details.get('ADX', 'N/A') |
|
willr_val = details.get('WillR', 'N/A') |
|
|
|
|
|
text = f"<b>Coin:</b> {coin}<br>" |
|
text += f"<b>Timeframe:</b> {tf}<br>" |
|
|
|
text += f"<b>Price:</b> {float(price_val):.8f}<br>" if isinstance(price_val, (int, float)) and not pd.isna(price_val) else f"<b>Price:</b> {price_val}<br>" |
|
text += f"<b>Score:</b> {score:.3f}<br>" |
|
text += "----------<br>" |
|
|
|
try: text += f"RSI: {float(rsi_val):.1f}<br>" if isinstance(rsi_val, (int, float)) and not pd.isna(rsi_val) else f"RSI: {rsi_val}<br>" |
|
except: text += f"RSI: {rsi_val}<br>" |
|
try: text += f"MACD: {float(macd_val):.6f}<br>" if isinstance(macd_val, (int, float)) and not pd.isna(macd_val) else f"MACD: {macd_val}<br>" |
|
except: text += f"MACD: {macd_val}<br>" |
|
try: text += f"Stoch K: {float(stochk_val):.1f}<br>" if isinstance(stochk_val, (int, float)) and not pd.isna(stochk_val) else f"Stoch K: {stochk_val}<br>" |
|
except: text += f"Stoch K: {stochk_val}<br>" |
|
try: text += f"ADX: {float(adx_val):.1f}<br>" if isinstance(adx_val, (int, float)) and not pd.isna(adx_val) else f"ADX: {adx_val}<br>" |
|
except: text += f"ADX: {adx_val}<br>" |
|
try: text += f"Will %R: {float(willr_val):.1f}<br>" if isinstance(willr_val, (int, float)) and not pd.isna(willr_val) else f"Will %R: {willr_val}<br>" |
|
except: text += f"Will %R: {willr_val}<br>" |
|
|
|
text += "<extra></extra>" |
|
|
|
row_texts.append(text) |
|
except Exception as hover_e: |
|
print(f"Error generating hover text for {coin}/{tf}: {hover_e}") |
|
row_texts.append(f"Error displaying hover for {coin}/{tf}") |
|
hover_texts.append(row_texts) |
|
|
|
try: |
|
fig = go.Figure(data=go.Heatmap( |
|
z=heatmap_values, |
|
x=cols, |
|
y=rows, |
|
colorscale=colorscale, |
|
zmid=0, zmin=-0.6, zmax=0.6, |
|
hoverongaps=False, |
|
hoverinfo='text', |
|
text=hover_texts, |
|
texttemplate=None |
|
)) |
|
fig.update_layout( |
|
title='Cryptocurrency Signal Strength Heatmap (Hover for Key Values, Click Cell for Full Details)', |
|
xaxis_title='Timeframe', |
|
yaxis_title='Coin', |
|
yaxis={'tickmode': 'linear', 'tickfont': {'size': 9}, 'automargin': True}, |
|
xaxis={'tickmode': 'linear'}, |
|
height=max(450, len(rows) * 18 + 100), |
|
margin=dict(l=70, r=50, t=60, b=50) |
|
) |
|
|
|
return fig |
|
except Exception as e: |
|
print(f"Error creating Plotly heatmap figure: {e}\n{traceback.format_exc()}") |
|
|
|
return go.Figure(layout=go.Layout(title=f"Error creating heatmap: {e}", height=400)) |
|
|
|
|
|
def format_heatmap_click_details(evt: gr.SelectData, current_state): |
|
"""Formats detailed analysis results for the specific cell clicked on the heatmap.""" |
|
|
|
if evt is None or evt.index is None or not isinstance(evt.index, (list, tuple)) or len(evt.index) != 2: |
|
|
|
return "Click on a heatmap cell after running analysis to see details here. (Ensure you clicked a colored cell)" |
|
|
|
|
|
if not isinstance(current_state, dict) or 'analysis_results' not in current_state or 'heatmap_df' not in current_state: |
|
|
|
return "Analysis data not found in state. Please run the analysis first." |
|
|
|
try: |
|
row_index, col_index = evt.index |
|
heatmap_df = current_state.get('heatmap_df') |
|
analysis_data = current_state.get('analysis_results') |
|
|
|
if heatmap_df is None or heatmap_df.empty or analysis_data is None: |
|
|
|
return "Heatmap or analysis data is not available. Please run analysis." |
|
|
|
|
|
if not (0 <= row_index < len(heatmap_df.index) and 0 <= col_index < len(heatmap_df.columns)): |
|
|
|
return "Error: Clicked cell index is out of bounds." |
|
|
|
coin_name = heatmap_df.index[row_index] |
|
timeframe = heatmap_df.columns[col_index] |
|
|
|
|
|
|
|
full_symbol = None |
|
for symbol_key in analysis_data.keys(): |
|
|
|
if symbol_key.startswith(str(coin_name) + '/'): |
|
full_symbol = symbol_key |
|
break |
|
|
|
if not full_symbol or full_symbol not in analysis_data: |
|
|
|
return f"Details not found for {coin_name} (symbol mismatch or no analysis data?)." |
|
|
|
|
|
details = analysis_data.get(full_symbol, {}).get(timeframe) |
|
if details is None: |
|
|
|
return f"No analysis data available for {coin_name} on {timeframe}." |
|
|
|
|
|
values = details.get('values', {}) |
|
signals = details.get('signals', {}) |
|
trade_params = details.get('trade_params', {}) |
|
direction = details.get('direction', 0) |
|
|
|
if not values or not signals: |
|
|
|
return f"Incomplete data for {coin_name} [{timeframe}]. Cannot display details." |
|
|
|
markdown_str = f"### Details for {coin_name} ({full_symbol}) [{timeframe}]\n\n" |
|
markdown_str += f"- **Timestamp:** {values.get('timestamp', 'N/A')}\n" |
|
price = values.get('price', np.nan) |
|
markdown_str += f"- **Price:** {price:.8f}\n" if isinstance(price, (int, float)) and not pd.isna(price) else f"- **Price:** {price}\n" |
|
volume = values.get('volume', np.nan) |
|
markdown_str += f"- **Volume:** {volume:,.0f}\n" if isinstance(volume, (int, float)) and not pd.isna(volume) else f"- **Volume:** {volume}\n" |
|
atr = values.get('atr', np.nan) |
|
markdown_str += f"- **ATR:** {atr:.8f}\n\n" if isinstance(atr, (int, float)) and not pd.isna(atr) else f"- **ATR:** {atr}\n\n" |
|
|
|
|
|
markdown_str += f"**Overall Signal Direction:** {'BULLISH (+1)' if direction > 0 else ('BEARISH (-1)' if direction < 0 else 'NEUTRAL (0)')}\n\n" |
|
|
|
markdown_str += "**Indicator Values & Individual Signals:**\n" |
|
markdown_str += "| Indicator | Value | Signal |\n" |
|
markdown_str += "|----------------|-----------------|--------|\n" |
|
|
|
|
|
indicator_keys = sorted(signals.keys()) |
|
|
|
for name in indicator_keys: |
|
val = values.get(name, 'N/A') |
|
sig_val = signals.get(name, 0) |
|
signal_char = '🟩 (+1)' if sig_val > 0 else ('🟥 (-1)' if sig_val < 0 else '⬜ (0)') |
|
|
|
|
|
if isinstance(val, (int, float)) and not pd.isna(val): |
|
if abs(val) > 100000: val_str = f"{val:,.0f}" |
|
elif abs(val) > 100: val_str = f"{val:,.2f}" |
|
elif abs(val) < 0.000001 and abs(val) > 0: val_str = f"{val:.4e}" |
|
elif abs(val) < 1: val_str = f"{val:.8f}" |
|
else: val_str = f"{val:.6f}" |
|
elif pd.isna(val): val_str = "NaN" |
|
else: val_str = str(val) |
|
|
|
|
|
markdown_str += f"| {name:<14} | {val_str:<15} | {signal_char:<7} |\n" |
|
|
|
|
|
if trade_params and trade_params.get('entry') is not None: |
|
markdown_str += f"\n**Potential Trade Setup (Based on this signal & ATR):**\n" |
|
markdown_str += f"- **Direction:** {'LONG' if direction > 0 else 'SHORT'}\n" |
|
|
|
markdown_str += f"- **Entry:** {trade_params['entry']:.8f}\n" |
|
markdown_str += f"- **Stop Loss:** {trade_params['sl']:.8f}\n" |
|
markdown_str += f"- **Take Profit 1:** {trade_params['tp1']:.8f}\n" |
|
markdown_str += f"- **Take Profit 2:** {trade_params['tp2']:.8f}\n\n" |
|
markdown_str += f"**Est. P/L per $1 Margin (TP1/SL Hit, incl. ~{SIMULATED_FEE_PERCENT*2:.2f}% fees):**\n" |
|
for lev, pnl_data in trade_params.get('lev_profit', {}).items(): |
|
tp1_pnl = pnl_data.get('tp1_profit_$','N/A') |
|
sl_loss = pnl_data.get('sl_loss_$','N/A') |
|
|
|
tp1_pnl_str = f"{tp1_pnl:.3f}" if isinstance(tp1_pnl, (int, float)) else str(tp1_pnl) |
|
sl_loss_str = f"{sl_loss:.3f}" if isinstance(sl_loss, (int, float)) else str(sl_loss) |
|
markdown_str += f" - **{lev}:** Profit ${tp1_pnl_str} / Loss ${sl_loss_str}\n" |
|
else: |
|
markdown_str += f"\n**Trade Setup:** No active trade signal ({'Neutral' if direction == 0 else 'Params Invalid'}) generated for this timeframe at this time.\n" |
|
|
|
|
|
return markdown_str |
|
|
|
except IndexError: |
|
|
|
return "Error processing click: Index out of range. Please ensure the heatmap is up to date." |
|
except KeyError as e: |
|
|
|
return f"Error processing click: Missing expected data key '{e}'. Analysis data might be incomplete." |
|
except Exception as e: |
|
print(f"Unexpected error formatting heatmap click details: {e}\n{traceback.format_exc()}") |
|
return f"An unexpected error occurred displaying details for the clicked cell: {e}" |
|
|
|
def format_coin_details(symbol, analysis_data, valid_timeframes): |
|
"""Formats full details for a selected coin across all analyzed timeframes.""" |
|
if not analysis_data or symbol not in analysis_data: |
|
return f"No analysis data available for {symbol}. Please run analysis first." |
|
|
|
coin_data_per_tf = analysis_data[symbol] |
|
markdown_str = f"## Full Details for {symbol}\n\n" |
|
|
|
|
|
|
|
for timeframe in valid_timeframes: |
|
markdown_str += f"---\n### Timeframe: {timeframe}\n" |
|
details = coin_data_per_tf.get(timeframe) |
|
|
|
|
|
if details is None: |
|
markdown_str += f"*No analysis data generated for this timeframe.*\n" |
|
continue |
|
|
|
|
|
values = details.get('values') |
|
signals = details.get('signals') |
|
trade_params = details.get('trade_params') |
|
direction = details.get('direction') |
|
|
|
if values is None or signals is None or trade_params is None or direction is None: |
|
markdown_str += f"*Incomplete analysis data for this timeframe.*\n" |
|
continue |
|
|
|
|
|
markdown_str += f"- Timestamp: {values.get('timestamp', 'N/A')}\n" |
|
price = values.get('price', np.nan) |
|
markdown_str += f"- Price: {price:.8f}\n" if isinstance(price, (int, float)) and not pd.isna(price) else f"- Price: {price}\n" |
|
volume = values.get('volume', np.nan) |
|
markdown_str += f"- Volume: {volume:,.0f}\n" if isinstance(volume, (int, float)) and not pd.isna(volume) else f"- Volume: {volume}\n" |
|
atr = values.get('atr', np.nan) |
|
markdown_str += f"- ATR: {atr:.8f}\n\n" if isinstance(atr, (int, float)) and not pd.isna(atr) else f"- ATR: {atr}\n\n" |
|
|
|
markdown_str += f"**Overall Signal Direction:** {'BULLISH (+1)' if direction > 0 else ('BEARISH (-1)' if direction < 0 else 'NEUTRAL (0)')}\n\n" |
|
|
|
markdown_str += "**Indicator Values & Individual Signals:**\n" |
|
markdown_str += "| Indicator | Value | Signal |\n" |
|
markdown_str += "|----------------|-----------------|--------|\n" |
|
indicator_keys = sorted(signals.keys()) |
|
for name in indicator_keys: |
|
val = values.get(name, 'N/A') |
|
sig_val = signals.get(name, 0) |
|
signal_char = '🟩 (+1)' if sig_val > 0 else ('🟥 (-1)' if sig_val < 0 else '⬜ (0)') |
|
|
|
if isinstance(val, (int, float)) and not pd.isna(val): |
|
if abs(val) > 100000: val_str = f"{val:,.0f}" |
|
elif abs(val) > 100: val_str = f"{val:,.2f}" |
|
elif abs(val) < 0.000001 and abs(val) > 0: val_str = f"{val:.4e}" |
|
elif abs(val) < 1: val_str = f"{val:.8f}" |
|
else: val_str = f"{val:.6f}" |
|
elif pd.isna(val): val_str = "NaN" |
|
else: val_str = str(val) |
|
markdown_str += f"| {name:<14} | {val_str:<15} | {signal_char:<7} |\n" |
|
|
|
|
|
if trade_params and trade_params.get('entry') is not None: |
|
markdown_str += f"\n**Potential Trade Setup:**\n" |
|
markdown_str += f"- Direction: {'LONG' if direction > 0 else 'SHORT'}\n" |
|
|
|
markdown_str += f"- Entry: {trade_params['entry']:.8f}\n" |
|
markdown_str += f"- SL: {trade_params['sl']:.8f}\n" |
|
markdown_str += f"- TP1: {trade_params['tp1']:.8f}\n" |
|
markdown_str += f"- TP2: {trade_params['tp2']:.8f}\n\n" |
|
markdown_str += f"**Est. P/L per $1 Margin (TP1/SL Hit, incl. ~{SIMULATED_FEE_PERCENT*2:.2f}% fees):**\n" |
|
for lev, pnl_data in trade_params.get('lev_profit', {}).items(): |
|
tp1_pnl = pnl_data.get('tp1_profit_$','N/A') |
|
sl_loss = pnl_data.get('sl_loss_$','N/A') |
|
tp1_pnl_str = f"{tp1_pnl:.3f}" if isinstance(tp1_pnl, (int, float)) else str(tp1_pnl) |
|
sl_loss_str = f"{sl_loss:.3f}" if isinstance(sl_loss, (int, float)) else str(sl_loss) |
|
markdown_str += f" - **{lev}:** Profit ${tp1_pnl_str} / Loss ${sl_loss_str}\n" |
|
else: |
|
markdown_str += f"\n**Trade Setup:** No active trade signal ({'Neutral' if direction == 0 else 'Params Invalid'}) generated for this timeframe.\n" |
|
|
|
markdown_str += "\n---\n" |
|
return markdown_str |
|
|
|
|
|
def find_zones(heatmap_df, min_confirmation_threshold): |
|
"""Finds potential long/short zones based on average heatmap score.""" |
|
if heatmap_df is None or heatmap_df.empty: |
|
print("Zone finding skipped: Heatmap DF is empty.") |
|
return [], [] |
|
|
|
try: |
|
|
|
|
|
numeric_df = heatmap_df.apply(pd.to_numeric, errors='coerce').dropna(axis=1, how='all') |
|
if numeric_df.empty: |
|
print("Zone finding skipped: No numeric timeframe columns found after coercion.") |
|
return [],[] |
|
|
|
|
|
avg_scores = numeric_df.mean(axis=1, skipna=True) |
|
|
|
|
|
|
|
|
|
zone_threshold_abs = max(0.1, min(1.0, min_confirmation_threshold * 0.4)) |
|
long_zone_threshold = zone_threshold_abs |
|
short_zone_threshold = -zone_threshold_abs |
|
|
|
|
|
|
|
|
|
long_zone_coins = avg_scores[avg_scores >= long_zone_threshold].sort_values(ascending=False) |
|
short_zone_coins = avg_scores[avg_scores <= short_zone_threshold].sort_values(ascending=True) |
|
|
|
|
|
long_zone_list = list(zip(long_zone_coins.index.astype(str), long_zone_coins.round(3))) |
|
short_zone_list = list(zip(short_zone_coins.index.astype(str), short_zone_coins.round(3))) |
|
|
|
print(f"Zones Found: {len(long_zone_list)} long, {len(short_zone_list)} short candidates (Threshold +/- {zone_threshold_abs:.3f}).") |
|
return long_zone_list, short_zone_list |
|
except Exception as e: |
|
print(f"Error finding zones: {e}\n{traceback.format_exc()}") |
|
return [],[] |
|
|
|
def format_backtest_summary(backtest_results): |
|
"""Formats the list of backtest result dictionaries into a DataFrame for display.""" |
|
if not backtest_results: |
|
print("No backtest results to format.") |
|
|
|
return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']) |
|
|
|
try: |
|
|
|
valid_results = [r for r in backtest_results if isinstance(r, dict)] |
|
if not valid_results: |
|
print("No valid dictionary entries found in backtest results.") |
|
return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']) |
|
|
|
df = pd.DataFrame(valid_results) |
|
|
|
|
|
required_cols = ['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_sum', 'pnl_%_sum'] |
|
for col in required_cols: |
|
if col not in df.columns: |
|
print(f"Warning: Backtest result missing column '{col}', filling default.") |
|
if col in ['trades', 'win_rate', 'pnl_sum', 'pnl_%_sum']: |
|
df[col] = 0 |
|
else: |
|
df[col] = 'N/A' |
|
|
|
|
|
|
|
df['tf_order'] = df['timeframe'].map(TIMEFRAME_ORDER_MAP).fillna(99) |
|
df = df.sort_values(by=['symbol', 'tf_order']).drop('tf_order', axis=1) |
|
|
|
|
|
df = df.rename(columns={'pnl_sum': 'pnl_abs_sum', 'pnl_%_sum': 'pnl_%_sum_on_entry'}) |
|
|
|
|
|
display_cols = ['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry'] |
|
|
|
final_cols = [col for col in display_cols if col in df.columns] |
|
df = df[final_cols] |
|
|
|
return df |
|
except Exception as e: |
|
print(f"Error formatting backtest summary: {e}\n{traceback.format_exc()}") |
|
|
|
return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']) |
|
|
|
|
|
|
|
def create_gradio_app(): |
|
with gr.Blocks(theme=gr.themes.Soft(primary_hue=gr.themes.colors.blue), title="Crypto Signal & Backtest V3") as app: |
|
gr.Markdown("# Crypto Multi-Indicator, Multi-Timeframe Signal & Backtest V3") |
|
gr.Markdown(f"*Warning: Analysis uses up to **{LIMIT_PER_TIMEFRAME}** candles per timeframe and backtests on the last **{BACKTEST_HISTORY_CANDLES}**. This can be **very slow** and **API-intensive**. Rate limits may occur. Use fewer coins/timeframes for faster results. Signals are logged to **`{SIGNAL_LOG_FILE}`**.*") |
|
|
|
|
|
shared_state = gr.State({ |
|
'analysis_results': {}, |
|
'heatmap_df': pd.DataFrame(), |
|
'heatmap_details': {}, |
|
'backtest_results': [], |
|
'active_signals_df': pd.DataFrame(), |
|
'valid_timeframes': [], |
|
'analyzer': None |
|
}) |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
gr.Markdown("## Configuration") |
|
|
|
try: |
|
available_exchanges = ccxt.exchanges |
|
except Exception as e: |
|
print(f"Warning: Could not fetch ccxt exchanges list: {e}") |
|
available_exchanges = [DEFAULT_EXCHANGE_ID] |
|
|
|
exchange_input = gr.Dropdown(label="Exchange", choices=available_exchanges, value=DEFAULT_EXCHANGE_ID, interactive=True) |
|
top_n_input = gr.Slider(label="Number of Top Coins by Volume", minimum=5, maximum=100, step=5, value=DEFAULT_TOP_N_COINS, interactive=True) |
|
all_timeframes = list(TIMEFRAME_ORDER_MAP.keys()) |
|
timeframe_input = gr.CheckboxGroup(label="Select Timeframes (Fewer = Faster)", choices=all_timeframes, value=DEFAULT_TIMEFRAMES, interactive=True) |
|
|
|
run_button = gr.Button("Run Analysis & Backtest", variant="primary") |
|
status_log = gr.Textbox(label="Status Log", lines=15, interactive=False, placeholder="Analysis logs will appear here...", max_lines=30) |
|
|
|
with gr.Column(scale=3): |
|
gr.Markdown("## Results") |
|
with gr.Tabs(): |
|
with gr.TabItem("Heatmap"): |
|
gr.Markdown("Signal strength heatmap based on composite indicator score. Hover over cells for key values, click a cell for full details below.") |
|
|
|
|
|
heatmap_plot = gr.Plot(label="Signal Heatmap", show_label=False) |
|
heatmap_detail_output = gr.Markdown(label="Clicked Cell Full Details", value="*Click on a heatmap cell after analysis to see full indicator details and trade setup.*") |
|
|
|
with gr.TabItem("Active Trade Setups"): |
|
gr.Markdown("### Potential Trade Setups (Current Snapshot)") |
|
gr.Markdown(f"*Shows pairs and timeframes with a non-neutral signal direction and valid ATR-based parameters from the **latest** analyzed candle. Signals logged to `{SIGNAL_LOG_FILE}`. This is NOT financial advice. DYOR!*") |
|
active_signals_table = gr.DataFrame( |
|
label="Active Signals", |
|
headers=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2'], |
|
datatype=['str'] * 7, |
|
interactive=False, |
|
row_count=(10, "dynamic"), |
|
col_count=(7, "fixed"), |
|
wrap=True |
|
) |
|
|
|
with gr.TabItem("Zones"): |
|
gr.Markdown("### Potential Long/Short Zones") |
|
zone_threshold_display = max(0.1, min(1.0, DEFAULT_MIN_CONFIRMATION * 0.4)) |
|
gr.Markdown(f"*Coins with the highest/lowest average signal score across the selected timeframes (based on an average score threshold of ~**+/- {zone_threshold_display:.2f}**). Indicates potential broader trend alignment.*") |
|
with gr.Row(): |
|
long_zone_output = gr.DataFrame(label="Potential Long Zone (Highest Avg Scores)", headers=["Coin", "Avg Score"], col_count=(2, "fixed"), row_count=10) |
|
short_zone_output = gr.DataFrame(label="Potential Short Zone (Lowest Avg Scores)", headers=["Coin", "Avg Score"], col_count=(2, "fixed"), row_count=10) |
|
|
|
with gr.TabItem("Full Coin Details"): |
|
gr.Markdown("Select a coin analyzed in the heatmap to view its detailed indicator values, signals, and potential trade setup across all selected timeframes.") |
|
coin_selector = gr.Dropdown(label="Select Coin to View All Timeframe Details", choices=[], interactive=False) |
|
coin_detail_output = gr.Markdown(label="Detailed Indicator Values & Trade Setup per Timeframe", value="*Select a coin from the dropdown after analysis runs.*") |
|
|
|
with gr.TabItem("Backtest Summary"): |
|
gr.Markdown(f"### Simplified Backtest Results (ATR TP1/SL Strategy)") |
|
gr.Markdown(f"*Note: Simulated on last **{BACKTEST_HISTORY_CANDLES}** candles per timeframe. Assumes entry on signal candle's open, exits on TP1/SL hit within the **next** candle's high/low. Includes estimated ~{SIMULATED_FEE_PERCENT*2:.2f}% round-trip fee. **This is a highly simplified simulation for indicative purposes only and NOT investment advice.** Results also saved to `{BACKTEST_RESULTS_FILE}`.*") |
|
backtest_summary_df = gr.DataFrame( |
|
label="Backtest Metrics per Symbol/Timeframe", |
|
interactive=False, |
|
wrap=True, |
|
row_count=(15, "dynamic"), |
|
col_count=(6, "fixed") |
|
) |
|
|
|
|
|
|
|
def analysis_process_wrapper(exchange, top_n, timeframes, current_state, progress=gr.Progress(track_tqdm=True)): |
|
"""Wrapper to run analysis and update UI components.""" |
|
start_time = time.time() |
|
log = ["Initializing analysis..."] |
|
|
|
current_state = { |
|
'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {}, |
|
'backtest_results': [], 'active_signals_df': pd.DataFrame(), |
|
'valid_timeframes': [], 'analyzer': None |
|
} |
|
initial_updates = { |
|
status_log: "\n".join(log), |
|
heatmap_plot: None, |
|
heatmap_detail_output: "Running analysis...", |
|
active_signals_table: pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']), |
|
long_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]), |
|
short_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]), |
|
coin_selector: gr.Dropdown(choices=[], value=None, label="Select Coin...", interactive=False), |
|
coin_detail_output: "Running analysis...", |
|
backtest_summary_df: pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']), |
|
shared_state: current_state |
|
} |
|
yield initial_updates |
|
|
|
try: |
|
|
|
if not exchange: |
|
log.append("Error: No exchange selected.") |
|
yield {status_log: "\n".join(log), shared_state: current_state} |
|
return |
|
if not timeframes: |
|
log.append("Error: No timeframes selected.") |
|
yield {status_log: "\n".join(log), shared_state: current_state} |
|
return |
|
|
|
analyzer = CryptoTrendIndicator(exchange, int(top_n), timeframes) |
|
current_state['analyzer'] = analyzer |
|
|
|
if not analyzer.valid_timeframes: |
|
log.append(f"Error: No valid timeframes found or supported for exchange '{exchange}'. Check selection or exchange capabilities.") |
|
yield {status_log: "\n".join(log), shared_state: current_state} |
|
return |
|
|
|
log.append(f"Analyzer initialized for {exchange} | {top_n} coins | Timeframes: {analyzer.valid_timeframes}") |
|
log.append(f"Fetching up to {LIMIT_PER_TIMEFRAME} candles | Backtesting last {BACKTEST_HISTORY_CANDLES} candles.") |
|
log.append("Starting data fetch and analysis (this may take several minutes)...") |
|
yield {status_log: "\n".join(log)} |
|
|
|
|
|
analysis_results, heatmap_df, backtest_results, active_signals_df, heatmap_details, log_msgs = analyzer.run_full_analysis(progress=progress) |
|
log.extend(log_msgs) |
|
|
|
|
|
current_state['analysis_results'] = analysis_results |
|
current_state['heatmap_df'] = heatmap_df if isinstance(heatmap_df, pd.DataFrame) else pd.DataFrame() |
|
current_state['heatmap_details'] = heatmap_details if isinstance(heatmap_details, dict) else {} |
|
current_state['backtest_results'] = backtest_results if isinstance(backtest_results, list) else [] |
|
current_state['active_signals_df'] = active_signals_df if isinstance(active_signals_df, pd.DataFrame) else pd.DataFrame() |
|
current_state['valid_timeframes'] = analyzer.valid_timeframes |
|
|
|
|
|
|
|
if not current_state['heatmap_df'].empty: |
|
fig = create_plotly_heatmap(current_state['heatmap_df'], current_state['heatmap_details']) |
|
coin_list = sorted(current_state['heatmap_df'].index.astype(str).tolist()) |
|
coin_selector_update = gr.Dropdown(choices=coin_list, value=None, label="Select Coin...", interactive=True) |
|
heatmap_detail_msg = "Click on a heatmap cell for specific details." |
|
else: |
|
log.append("Warning: Heatmap data is empty after analysis.") |
|
fig = go.Figure(layout=go.Layout(title="No heatmap data generated", height=300)) |
|
coin_list = [] |
|
coin_selector_update = gr.Dropdown(choices=[], value=None, label="No Coins Analyzed", interactive=False) |
|
heatmap_detail_msg = "No heatmap data generated. Check logs." |
|
|
|
|
|
long_coins, short_coins = find_zones(current_state['heatmap_df'], DEFAULT_MIN_CONFIRMATION) |
|
|
|
long_df_data = long_coins if long_coins else [(" ", " ")] |
|
short_df_data = short_coins if short_coins else [(" ", " ")] |
|
|
|
|
|
bt_summary_display_df = format_backtest_summary(current_state['backtest_results']) |
|
if bt_summary_display_df.empty: |
|
bt_summary_display_df = pd.DataFrame([{'symbol': 'No results', 'timeframe': '', 'trades': 0, 'win_rate': 0, 'pnl_abs_sum': 0, 'pnl_%_sum_on_entry': 0}]) |
|
|
|
|
|
|
|
active_signals_display_df = current_state['active_signals_df'] |
|
if active_signals_display_df.empty: |
|
active_signals_display_df = pd.DataFrame([{'Symbol': 'No active signals', 'Timeframe': '', 'Direction': '', 'Entry': '', 'SL': '', 'TP1': '', 'TP2': ''}]) |
|
|
|
|
|
end_time = time.time() |
|
log.append(f"Analysis & Backtest finished in {end_time - start_time:.2f} seconds.") |
|
|
|
|
|
final_updates = { |
|
status_log: "\n".join(log), |
|
heatmap_plot: fig, |
|
heatmap_detail_output: heatmap_detail_msg, |
|
active_signals_table: active_signals_display_df, |
|
long_zone_output: gr.DataFrame(value=long_df_data, headers=["Coin", "Avg Score"]), |
|
short_zone_output: gr.DataFrame(value=short_df_data, headers=["Coin", "Avg Score"]), |
|
coin_selector: coin_selector_update, |
|
coin_detail_output: "Select a coin from the dropdown above." if coin_list else "No analysis results.", |
|
backtest_summary_df: bt_summary_display_df, |
|
shared_state: current_state |
|
} |
|
yield final_updates |
|
|
|
except ValueError as ve: |
|
|
|
log.append(f"--- CONFIGURATION ERROR ---") |
|
error_details = f"{str(ve)}\n{traceback.format_exc()}" |
|
log.append(error_details) |
|
print(error_details) |
|
current_state = { |
|
'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {}, |
|
'backtest_results': [], 'active_signals_df': pd.DataFrame(), |
|
'valid_timeframes': [], 'analyzer': None |
|
} |
|
yield { status_log: "\n".join(log), shared_state: current_state } |
|
|
|
except Exception as e: |
|
log.append(f"--- FATAL ERROR DURING ANALYSIS ---") |
|
error_details = f"{str(e)}\n{traceback.format_exc()}" |
|
log.append(error_details) |
|
print(error_details) |
|
|
|
current_state = { |
|
'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {}, |
|
'backtest_results': [], 'active_signals_df': pd.DataFrame(), |
|
'valid_timeframes': [], 'analyzer': None |
|
} |
|
error_updates = { |
|
status_log: "\n".join(log), |
|
|
|
heatmap_plot: None, |
|
heatmap_detail_output: f"Analysis failed. Check logs.\nError: {e}", |
|
active_signals_table: pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']), |
|
long_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]), |
|
short_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]), |
|
coin_selector: gr.Dropdown(choices=[], value=None, label="Error", interactive=False), |
|
coin_detail_output: f"Analysis failed. Check logs.\nError: {e}", |
|
backtest_summary_df: pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']), |
|
shared_state: current_state |
|
} |
|
yield error_updates |
|
|
|
run_button.click( |
|
fn=analysis_process_wrapper, |
|
inputs=[exchange_input, top_n_input, timeframe_input, shared_state], |
|
outputs=[ |
|
status_log, heatmap_plot, heatmap_detail_output, active_signals_table, |
|
long_zone_output, short_zone_output, coin_selector, coin_detail_output, |
|
backtest_summary_df, shared_state |
|
] |
|
) |
|
|
|
|
|
def display_full_details_handler(selected_coin, current_state): |
|
"""Handles dropdown change to show full details for a selected coin.""" |
|
if not selected_coin: |
|
return "Select a coin from the dropdown." |
|
|
|
if not isinstance(current_state, dict) or not current_state.get('analysis_results') or not current_state.get('valid_timeframes'): |
|
print("Debug Coin Select: State invalid or missing data.") |
|
return "Run analysis first or analysis data is missing/incomplete in state." |
|
|
|
analysis_results = current_state['analysis_results'] |
|
valid_tfs = current_state.get('valid_timeframes', []) |
|
|
|
|
|
full_symbol = None |
|
for symbol_key in analysis_results.keys(): |
|
|
|
if str(symbol_key).startswith(str(selected_coin) + '/'): |
|
full_symbol = symbol_key |
|
break |
|
|
|
if not full_symbol: |
|
print(f"Debug Coin Select: Full symbol not found for base {selected_coin}") |
|
return f"Details not found for {selected_coin} in the current analysis results." |
|
|
|
if not valid_tfs: |
|
print(f"Debug Coin Select: Valid timeframes list missing for {selected_coin}") |
|
return f"Valid timeframes list is missing from state for {selected_coin}." |
|
|
|
|
|
return format_coin_details(full_symbol, analysis_results, valid_tfs) |
|
|
|
coin_selector.change( |
|
fn=display_full_details_handler, |
|
inputs=[coin_selector, shared_state], |
|
outputs=[coin_detail_output] |
|
) |
|
|
|
|
|
|
|
heatmap_plot.change( |
|
fn=format_heatmap_click_details, |
|
inputs=[shared_state], |
|
outputs=[heatmap_detail_output] |
|
) |
|
|
|
return app |
|
|
|
|
|
if __name__ == "__main__": |
|
print("\n--- Crypto Analysis App V3 ---") |
|
|
|
for fpath in [BACKTEST_RESULTS_FILE, SIGNAL_LOG_FILE]: |
|
if os.path.exists(fpath): |
|
print(f"INFO: Existing file found: '{fpath}'. It may be appended to or overwritten on the next run.") |
|
else: |
|
print(f"INFO: Results/Logs will be saved to '{fpath}' after analysis.") |
|
|
|
print("\nStarting Crypto Analysis Gradio App...") |
|
print("------------------------------------------------------") |
|
print(f"CONFIG: Backtest Candles={BACKTEST_HISTORY_CANDLES}, Fetch Limit={LIMIT_PER_TIMEFRAME}") |
|
print(f"CONFIG: Default Exchange={DEFAULT_EXCHANGE_ID}, Top Coins={DEFAULT_TOP_N_COINS}, Timeframes={DEFAULT_TIMEFRAMES}") |
|
print("WARNING: Initial analysis might be slow due to extensive data fetching and calculations.") |
|
print("Ensure you have required libraries: pandas, numpy, ccxt, ta, plotly, gradio") |
|
print("------------------------------------------------------") |
|
|
|
gradio_app = create_gradio_app() |
|
|
|
|
|
|
|
gradio_app.queue().launch(debug=False, max_threads=4) |