# Listing metadata left over from the web file viewer (not part of the program):
#   File size: 104,655 Bytes
#   Revision id: 70218ec
#   The viewer's line-number gutter (1-1738) that followed has been dropped.
import pandas as pd
import numpy as np
import ccxt
import time
import os
import csv # Import csv module for logging
import traceback # Import traceback for detailed error logging
from datetime import datetime, timedelta
import warnings
import plotly.graph_objects as go
import plotly.colors as pcolors
import gradio as gr
# Import necessary TA indicators (Existing + New)
from ta.trend import MACD, ADXIndicator, IchimokuIndicator, VortexIndicator
from ta.momentum import RSIIndicator, StochasticOscillator, AwesomeOscillatorIndicator, WilliamsRIndicator
from ta.volume import MFIIndicator, OnBalanceVolumeIndicator, ChaikinMoneyFlowIndicator, VolumeWeightedAveragePrice
from ta.volatility import AverageTrueRange, BollingerBands
# Silence three whole warning categories for the rest of the process.
# NOTE(review): these filters have no module/message restriction, so they also
# hide FutureWarning deprecation notices raised by any imported library.
# Consider narrowing them (module=..., message=...) so upcoming breakages stay visible.
warnings.filterwarnings('ignore', category=RuntimeWarning)
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning) # Original intent: quiet some TA-library warnings
# --- Configuration ---
DEFAULT_EXCHANGE_ID = 'mexc' # Default ccxt exchange id (original note: switched because of suspected binance API issues)
DEFAULT_TOP_N_COINS = 30 # Default number of coins to analyse; kept small because the backtest history is long
DEFAULT_TIMEFRAMES = ['1m', '5m', '15m', '30m', '1h', '4h'] # Example timeframes
DEFAULT_MIN_CONFIRMATION = 0.75 # Used for zone finding (average score)
# Increased limits for longer backtesting
LIMIT_PER_TIMEFRAME = 1050 # Default `limit` for fetch_ohlcv_data; should be >= BACKTEST_HISTORY_CANDLES + indicator lookbacks (~50)
BACKTEST_HISTORY_CANDLES = 1000 # Number of candles the backtest is meant to cover
# --- Trade Parameters ---
# Stop-loss / take-profit distances, expressed as multiples of ATR.
ATR_SL_MULTIPLIER = 1.5
ATR_TP1_MULTIPLIER = 1.0
ATR_TP2_MULTIPLIER = 2.0
LEVERAGES = [20, 50] # For display/estimation only
SIMULATED_FEE_PERCENT = 0.06 # Approximate futures fee per side, in percent (entry + exit = 2x)
BACKTEST_RESULTS_FILE = 'backtest_summary_enhanced.csv'
SIGNAL_LOG_FILE = 'realtime_signal_log.csv' # CSV file that log_signal_to_csv appends to
# Sort rank for timeframe strings, shortest first. _validate_timeframes uses it
# as a sort key; timeframes missing from this map are ranked 99 (i.e. last).
TIMEFRAME_ORDER_MAP = {
'1m': 1, '3m': 2, '5m': 3, '15m': 4, '30m': 5, '1h': 6, '2h': 7,
'4h': 8, '6h': 9, '8h': 10, '12h': 11, '1d': 12, '3d': 13, '1w': 14, '1M': 15
}
# TIMEFRAME_WEIGHTS not actively used in current logic, kept for potential future use.
# --- CSV Signal Logging Function ---
def log_signal_to_csv(signal_info):
    """Append one signal record to the CSV log file ``SIGNAL_LOG_FILE``.

    The header row is written when the log file does not exist yet *or* exists
    but is empty, so a zero-byte log still ends up with column names.

    Args:
        signal_info: Mapping of column name -> value. Columns are
            ``LogTimestamp``, ``SignalCandleTime``, ``Symbol``, ``Timeframe``,
            ``Direction``, ``Entry``, ``SL``, ``TP1``, ``TP2`` and ``Status``.
            Missing columns are written as empty cells; an unknown key makes
            ``csv.DictWriter`` raise ``ValueError``, which is reported below.

    Returns:
        None. Logging is best-effort: every failure is printed, never raised.
    """
    fieldnames = [
        'LogTimestamp', 'SignalCandleTime', 'Symbol', 'Timeframe', 'Direction',
        'Entry', 'SL', 'TP1', 'TP2', 'Status',  # 'RSI', 'MACD_Diff', 'ADX'  # Optional: add key indicator values
    ]
    try:
        # Existence alone is not enough: an existing but empty file has no
        # header either, and every later row would be unlabeled.
        needs_header = (
            not os.path.isfile(SIGNAL_LOG_FILE)
            or os.path.getsize(SIGNAL_LOG_FILE) == 0
        )
        with open(SIGNAL_LOG_FILE, 'a', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            if needs_header:
                writer.writeheader()
            writer.writerow(signal_info)
    except OSError as e:  # IOError is an alias of OSError on Python 3
        print(f"Error: Could not write to CSV log file {SIGNAL_LOG_FILE}: {e}")
    except Exception as e:
        print(f"Error logging signal to CSV: {e}\n{traceback.format_exc()}")
# --- Crypto Analysis Class ---
class CryptoTrendIndicator:
def __init__(self, exchange_id, top_coins, selected_timeframes):
    """Store the configuration, create empty result holders, connect to the exchange.

    Args:
        exchange_id: ccxt exchange identifier used by ``_initialize_exchange``.
        top_coins: Number of top-volume coins to analyse.
        selected_timeframes: Timeframe strings requested by the caller.

    Raises:
        ValueError: Propagated from ``_initialize_exchange`` when the exchange
            cannot be set up.
    """
    # Caller-supplied configuration.
    self.exchange_id = exchange_id
    self.top_coins = top_coins
    self.requested_timeframes = selected_timeframes

    # Filled in by _initialize_exchange() / _validate_timeframes().
    self.exchange = None
    self.valid_timeframes = []

    # Result holders.
    # analysis_results: {symbol: {tf: {signals: {}, values: {}, trade_params: {}, direction: int}}}
    self.analysis_results = {}
    # backtest_results: list of dicts, one per backtest summary row
    self.backtest_results = []
    self.heatmap_df = pd.DataFrame()
    # active_signals_df: table of currently active signals
    self.active_signals_df = pd.DataFrame()
    # heatmap_details: hover data, {Coin: {tf: {Ind: Val, ...}}}
    self.heatmap_details = {}

    # Must run last: it needs exchange_id and requested_timeframes to be set.
    self._initialize_exchange()
def _initialize_exchange(self):
    """Create the ccxt client for ``self.exchange_id`` and load its markets.

    On success ``self.exchange`` holds a rate-limited client configured for
    spot markets with a 30 s timeout, and ``_validate_timeframes`` has run.

    Raises:
        ValueError: If the exchange id is not an attribute of ``ccxt``, or if
            creating the client, loading markets or validating timeframes
            fails. The original exception is chained as ``__cause__``.
    """
    # Resolve the exchange class separately so that only a genuinely unknown
    # id produces the "not found" message. Previously an AttributeError raised
    # anywhere below (e.g. inside load_markets) was reported as a missing exchange.
    try:
        exchange_class = getattr(ccxt, self.exchange_id)
    except (AttributeError, TypeError) as e:  # TypeError: id is not a string
        raise ValueError(f"Error: Exchange '{self.exchange_id}' not found or supported by ccxt.") from e

    try:
        # Force spot market for analysis consistency
        self.exchange = exchange_class({
            'enableRateLimit': True,
            'options': {'defaultType': 'spot'},  # Use SPOT for fetching data
        })
        # Increase timeout for potentially longer data fetches
        self.exchange.timeout = 30000  # milliseconds (30 seconds)
        self.exchange.load_markets(reload=True)
        print(f"Exchange {self.exchange_id} initialized (using SPOT markets for data).")
        self._validate_timeframes()
    except ccxt.AuthenticationError as e:
        raise ValueError(f"Authentication Error for {self.exchange_id}: {e}") from e
    except ccxt.ExchangeError as e:
        raise ValueError(f"Exchange Error initializing {self.exchange_id}: {e}") from e
    except Exception as e:
        raise ValueError(f"Unexpected error initializing exchange: {e}\n{traceback.format_exc()}") from e
def _validate_timeframes(self):
    """Keep only the requested timeframes that the exchange lists as supported.

    The result is stored in ``self.valid_timeframes``, ordered by
    ``TIMEFRAME_ORDER_MAP`` rank (timeframes missing from the map rank 99).
    When the exchange is unset or exposes no timeframe list, the requested
    timeframes are accepted unvalidated. Progress and warnings are printed;
    nothing is returned.
    """
    def rank(tf):
        return TIMEFRAME_ORDER_MAP.get(tf, 99)

    exchange = self.exchange
    if not exchange or not exchange.timeframes:
        print(f"Warning: Could not get timeframes from {self.exchange_id}. Cannot validate.")
        # Nothing to validate against: use the request as-is and hope it is valid.
        self.valid_timeframes = sorted(self.requested_timeframes, key=rank)
        print(f"Proceeding with requested timeframes (validation skipped): {self.valid_timeframes}")
        return

    supported_tfs = exchange.timeframes
    accepted = [tf for tf in self.requested_timeframes if tf in supported_tfs]
    accepted.sort(key=rank)
    self.valid_timeframes = accepted
    print(f"Supported timeframes for analysis: {self.valid_timeframes}")

    if len(accepted) != len(self.requested_timeframes):
        skipped = set(self.requested_timeframes) - set(accepted)
        print(f"Warning: Skipped unsupported timeframes for {self.exchange_id}: {', '.join(skipped)}")
    if not accepted:
        print(f"Warning: No valid timeframes selected or supported by {self.exchange_id}.")
def fetch_top_coins(self):
    """Return the highest-volume USDT spot symbols on the exchange.

    Tickers are filtered to active, non-leveraged ``*/USDT`` spot markets with
    a quote volume above 10000, ranked by quote volume (descending), and the
    first ``self.top_coins`` symbols whose market entry is still resolvable
    and active are kept.

    Returns:
        tuple: ``(symbols, message)``. ``symbols`` is a list of symbol strings
        (empty on any failure) and ``message`` describes the outcome or the error.
    """
    if not self.exchange:
        return [], "Exchange not initialized"

    # Substrings treated as markers of leveraged / problematic tokens.
    # NOTE(review): this is a plain substring test, so a ticker whose base
    # merely ends with one of these (e.g. 'JUP/USDT' contains 'UP/') is
    # dropped as well - confirm that is intended.
    leveraged_markers = ('UP/', 'DOWN/', 'BULL/', 'BEAR/', '3L/', '3S/', '5L/', '5S/')

    try:
        tickers = self.exchange.fetch_tickers()

        candidates = {}
        for symbol, ticker in tickers.items():
            try:
                market = self.exchange.market(symbol)
                if not symbol.endswith('/USDT'):
                    continue
                if ticker is None or market is None:
                    continue
                if not market.get('spot', False):  # explicitly require spot
                    continue
                if not market.get('active', True):
                    continue
                if market.get('leveraged', False):  # exclude leveraged markets
                    continue
                quote_volume = ticker.get('quoteVolume')
                if quote_volume is None or not (quote_volume > 10000):  # low-volume filter
                    continue
                if ticker.get('symbol') is None:
                    continue
                if any(marker in symbol for marker in leveraged_markers):
                    continue
                candidates[symbol] = ticker
            except ccxt.BadSymbol:
                continue  # ccxt cannot resolve market data for this symbol
            except Exception:
                continue  # best-effort: one bad ticker must not abort the scan

        if not candidates:
            return [], f"No suitable USDT spot pairs found on {self.exchange_id} (check filters/volume)."

        ranked_symbols = sorted(
            candidates,
            key=lambda sym: candidates[sym]['quoteVolume'],
            reverse=True,
        )
        # Take a few extra so symbols rejected during validation can be replaced.
        shortlist_size = min(len(ranked_symbols), self.top_coins + 10)
        shortlist = ranked_symbols[:shortlist_size]

        print(f"Validating {len(shortlist)} potential symbols...")
        final_symbols = []
        for candidate in shortlist:
            if len(final_symbols) >= self.top_coins:
                break
            try:
                market_info = self.exchange.market(candidate)  # must still resolve and be active
                if market_info and market_info.get('active', True):
                    final_symbols.append(candidate)
            except ccxt.BadSymbol:
                pass
            except Exception as e:
                print(f"Market {candidate} skipped during validation due to error: {e}")

        msg = f"Fetched and validated top {len(final_symbols)} USDT spot pairs by volume from {self.exchange_id}."
        print(msg)
        if not final_symbols:
            msg += " (Warning: Result list is empty)"
        return final_symbols, msg

    except (ccxt.NetworkError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as e:
        msg = f"Network/Timeout Error fetching tickers from {self.exchange_id}: {e}."
        print(msg)
        return [], msg
    except ccxt.ExchangeError as e:
        msg = f"Exchange Error fetching tickers from {self.exchange_id}: {e}"
        print(msg)
        return [], msg
    except Exception as e:
        msg = f"An unexpected error occurred fetching top coins: {e}\n{traceback.format_exc()}"
        print(msg)
        return [], msg
def fetch_ohlcv_data(self, symbol, timeframe, limit=LIMIT_PER_TIMEFRAME):
    """Fetch OHLCV candles for one symbol/timeframe, retrying transient failures.

    Up to three attempts are made. Rate-limit and network errors are retried
    after a pause; any other error ends the fetch immediately.

    Args:
        symbol: Market symbol passed to ``exchange.fetch_ohlcv``.
        timeframe: Timeframe string passed to ``exchange.fetch_ohlcv``.
        limit: Number of candles requested. It is passed straight through, and
            this method issues a single request per attempt, so the exchange
            may return fewer candles than asked for (handled below).

    Returns:
        tuple: ``(ohlcv, None)`` on success, otherwise ``([], error_message)``.
        Fewer than 100 candles counts as a failure; fewer than
        ``BACKTEST_HISTORY_CANDLES + 50`` only prints a warning.
    """
    if not self.exchange:
        return [], "Exchange not initialized"

    max_retries = 3
    retry_delay = 5  # base pause in seconds between network retries
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
            if not ohlcv:
                # An empty result is treated as insufficient data.
                return [], f"No data returned for {symbol} [{timeframe}]"
            elif len(ohlcv) < 100:  # Need substantial data for long backtest & indicators
                return [], f"Insufficient data ({len(ohlcv)}) for {symbol} [{timeframe}] (Need >100)"
            elif len(ohlcv) < BACKTEST_HISTORY_CANDLES + 50:
                # Proceed anyway; the backtest may be shorter than intended.
                print(f"Warning: Fetched {len(ohlcv)} candles for {symbol} [{timeframe}], less than ideal ({BACKTEST_HISTORY_CANDLES + 50}) for full backtest + lookback.")
            print(f"Fetched {len(ohlcv)} candles for {symbol} [{timeframe}]")
            return ohlcv, None
        except ccxt.RateLimitExceeded as e:
            # Must come BEFORE the NetworkError clause: RateLimitExceeded is a
            # NetworkError subclass in ccxt, so listed after it this handler
            # could never run.
            print(f"Rate limit hit fetching {symbol} [{timeframe}]. Waiting longer...")
            if is_last_attempt:
                # No further attempt follows, so do not sleep first.
                return [], f"Rate limit exceeded for {symbol} [{timeframe}] after retries: {e}"
            # Wait five rate-limit intervals (rateLimit is in ms), or 60 s if unknown.
            time.sleep(self.exchange.rateLimit / 1000 * 5 if self.exchange.rateLimit else 60)
        except (ccxt.NetworkError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as e:
            if is_last_attempt:
                print(f"Network error fetching {symbol} [{timeframe}] (Attempt {attempt+1}): {e}. Giving up.")
                return [], f"Network error {symbol} [{timeframe}] after {max_retries} attempts: {e}"
            wait_seconds = retry_delay + attempt * 2  # incremental backoff
            print(f"Network error fetching {symbol} [{timeframe}] (Attempt {attempt+1}): {e}. Retrying in {wait_seconds}s...")
            time.sleep(wait_seconds)
        except ccxt.BadSymbol:
            return [], f"Invalid symbol {symbol}"
        except ccxt.ExchangeError as e:
            print(f"Exchange error fetching {symbol} [{timeframe}]: {e}")
            # Give a clearer message when the exchange rejects the timeframe.
            if 'timeframe not available' in str(e).lower():
                return [], f"Timeframe {timeframe} not supported for {symbol} on {self.exchange_id}"
            return [], f"Exchange error {symbol} [{timeframe}]: {e}"
        except Exception as e:
            print(f"Unexpected error fetching OHLCV {symbol} [{timeframe}]: {e}\n{traceback.format_exc()}")
            return [], f"Unexpected error fetching OHLCV {symbol} [{timeframe}]"
    return [], f"Failed to fetch data for {symbol} [{timeframe}] after {max_retries} attempts."
def calculate_indicators(self, ohlcv_data, timeframe):
    """Build an indicator DataFrame from raw OHLCV rows.

    Args:
        ohlcv_data: List of ``[timestamp_ms, open, high, low, close, volume]``
            rows; at least 100 rows are required.
        timeframe: Timeframe label, used only in diagnostic messages.

    Returns:
        A DataFrame indexed by candle timestamp holding the OHLCV columns plus
        one column per indicator, or ``None`` when the input is not a list, is
        too short (before or after cleaning), or any calculation fails.
    """
    # Increased required length for Ichimoku and other lookbacks
    required_length = 100
    if not isinstance(ohlcv_data, list) or len(ohlcv_data) < required_length:
        # Only call len() on objects that have one, so a wrong-typed argument
        # gives the skip message instead of an exception.
        received = len(ohlcv_data) if hasattr(ohlcv_data, '__len__') else 0
        print(f"Indicator calc skip: Input data invalid or too short for {timeframe} (needs {required_length}, got {received})")
        return None
    try:
        df = pd.DataFrame(ohlcv_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        # Convert to numeric (bad cells become NaN), then drop rows missing essential data.
        for col in ['open', 'high', 'low', 'close', 'volume']:
            df[col] = pd.to_numeric(df[col], errors='coerce')
        df.dropna(subset=['close', 'volume', 'high', 'low'], inplace=True)
        if df.empty or len(df) < required_length:
            print(f"Data too short after cleaning for {timeframe} (needs {required_length}, got {len(df)})")
            return None

        # --- Momentum / trend / volume indicators ---
        df['rsi'] = RSIIndicator(close=df['close'], window=14).rsi().fillna(50)
        stoch_obj = StochasticOscillator(high=df['high'], low=df['low'], close=df['close'], window=14, smooth_window=3)
        df['stoch_k'] = stoch_obj.stoch().fillna(50)
        df['stoch_d'] = stoch_obj.stoch_signal().fillna(50)
        ao_obj = AwesomeOscillatorIndicator(high=df['high'], low=df['low'], fillna=True)
        df['ao'] = ao_obj.awesome_oscillator().fillna(0)
        macd_obj = MACD(close=df['close'], window_slow=26, window_fast=12, window_sign=9, fillna=True)
        df['macd'] = macd_obj.macd().fillna(0)
        df['macd_signal'] = macd_obj.macd_signal().fillna(0)
        df['macd_diff'] = macd_obj.macd_diff().fillna(0)
        adx_obj = ADXIndicator(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True)
        df['adx'] = adx_obj.adx().fillna(20)
        df['adx_pos'] = adx_obj.adx_pos().fillna(0)
        df['adx_neg'] = adx_obj.adx_neg().fillna(0)
        df['ema_20'] = df['close'].ewm(span=20, adjust=False).mean()
        df['ema_50'] = df['close'].ewm(span=50, adjust=False).mean()
        df['ema_100'] = df['close'].ewm(span=100, adjust=False).mean()
        df['mfi'] = MFIIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=14, fillna=True).money_flow_index().fillna(50)
        # .ffill() replaces the deprecated fillna(method='ffill') spelling.
        df['obv'] = OnBalanceVolumeIndicator(close=df['close'], volume=df['volume'], fillna=True).on_balance_volume().ffill()
        df['cmf'] = ChaikinMoneyFlowIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=20, fillna=True).chaikin_money_flow().fillna(0)
        df['volume_sma'] = df['volume'].rolling(window=20, min_periods=10).mean()
        # A zero or missing volume SMA would give inf/NaN; treat those as ratio 1.0.
        df['volume_ratio'] = (df['volume'] / df['volume_sma'].replace(0, np.nan)).replace([np.inf, -np.inf], 1.0).fillna(1.0)
        df['atr'] = AverageTrueRange(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True).average_true_range().ffill().fillna(0)

        # --- Volatility / Ichimoku / Williams %R / VWAP / Vortex ---
        bb_obj = BollingerBands(close=df['close'], window=20, window_dev=2, fillna=True)
        df['bb_hband'] = bb_obj.bollinger_hband()
        df['bb_lband'] = bb_obj.bollinger_lband()
        df['bb_mavg'] = bb_obj.bollinger_mavg()
        df['bb_width'] = bb_obj.bollinger_wband()
        ichi_obj = IchimokuIndicator(high=df['high'], low=df['low'], window1=9, window2=26, window3=52, fillna=True)
        df['ichi_a'] = ichi_obj.ichimoku_a()
        df['ichi_b'] = ichi_obj.ichimoku_b()
        df['ichi_base'] = ichi_obj.ichimoku_base_line()
        df['ichi_conv'] = ichi_obj.ichimoku_conversion_line()
        df['will_r'] = WilliamsRIndicator(high=df['high'], low=df['low'], close=df['close'], lbp=14, fillna=True).williams_r().fillna(-50)
        df['vwap'] = VolumeWeightedAveragePrice(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=14, fillna=True).volume_weighted_average_price().ffill()
        vortex_obj = VortexIndicator(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True)
        df['vortex_pos'] = vortex_obj.vortex_indicator_pos()
        df['vortex_neg'] = vortex_obj.vortex_indicator_neg()
        # --- End Indicators ---

        # Fill remaining gaps: forward fill first, then backward fill for
        # leading NaNs (the backward fill copies later values into earlier rows).
        df.ffill(inplace=True)
        df.bfill(inplace=True)

        # Signal generation compares the last two rows; require a sane minimum.
        min_usable_length = 50
        if len(df) < min_usable_length:
            print(f"Data too short after indicator calculation & filling for {timeframe} (needs >{min_usable_length}, got {len(df)})")
            return None
        return df.copy()  # Use copy to avoid SettingWithCopyWarning later
    except Exception as e:
        print(f"Error calculating indicators for {timeframe}: {e}\n{traceback.format_exc()}")
        return None
def generate_signals_and_values(self, df):
    """Score the most recent candle of an indicator DataFrame.

    Each rule below compares the last row (and, for crossovers, the row before
    it) and yields a raw signal in {-1, -0.5, 0, 0.5, 1}. Raw signals are
    rounded to -1/0/1 and summed; the overall direction is non-zero only when
    the absolute sum reaches ``max(3, int(0.30 * number_of_rules))`` (4 for
    the 15 rules here).

    The caller's DataFrame is never modified: a non-datetime index is
    converted (and sorted) on a copy.

    Args:
        df: DataFrame with the OHLCV and indicator columns read below
            (as produced by ``calculate_indicators``), at least two rows.

    Returns:
        tuple: ``(final_signals, values, signal_direction)``
          - final_signals (dict): rounded signal (-1, 0, 1) per rule, plus a 0
            entry for every remaining key of ``values`` except price, volume,
            timestamp and atr.
          - values (dict): raw values of the last row; rule names that are not
            columns are present with NaN.
          - signal_direction (int): -1, 0 or 1.
        ``(None, None, None)`` when the input is unusable or an error occurs.
    """
    if df is None or not isinstance(df, pd.DataFrame) or len(df.index) < 2:
        return None, None, None
    try:
        if not isinstance(df.index, pd.DatetimeIndex):
            # Copy first: assigning to df.index would rewrite the caller's frame.
            df = df.copy()
            df.index = pd.to_datetime(df.index)
            df = df.sort_index()  # ensure chronological order after conversion

        # Positional access: last row and the one before it.
        latest = df.iloc[-1]
        prev = df.iloc[-2]

        def crossed_above(a, b):
            """True when column a is above b now and was at or below it one row earlier."""
            return latest[a] > latest[b] and prev[a] <= prev[b]

        def crossed_below(a, b):
            """True when column a is below b now and was at or above it one row earlier."""
            return latest[a] < latest[b] and prev[a] >= prev[b]

        # --- Raw values of the latest row (insertion order is kept on purpose) ---
        values = {
            'price': latest['close'],
            'volume': latest['volume'],
            'timestamp': latest.name.strftime('%Y-%m-%d %H:%M:%S'),
            'atr': latest['atr'],
        }
        value_columns = (
            # Momentum
            'rsi', 'stoch_k', 'stoch_d', 'ao', 'will_r',
            # Trend
            'macd', 'macd_signal', 'macd_diff',
            'adx', 'adx_pos', 'adx_neg',
            'ema_20', 'ema_50', 'ema_100',
            'ichi_a', 'ichi_b', 'ichi_base', 'ichi_conv',
            'vortex_pos', 'vortex_neg',
            # Volume
            'mfi', 'obv', 'cmf', 'volume_ratio', 'vwap',
            # Volatility
            'bb_hband', 'bb_lband', 'bb_mavg', 'bb_width',
        )
        for column in value_columns:
            values[column] = latest[column]

        # --- Raw signals (may be +/-0.5 for weaker evidence) ---
        signals = {}

        # RSI: below 30 bullish, above 70 bearish.
        if latest['rsi'] < 30:
            signals['rsi'] = 1
        elif latest['rsi'] > 70:
            signals['rsi'] = -1
        else:
            signals['rsi'] = 0

        # Stochastic: %K/%D crossover while %K is below 25 / above 75.
        if latest['stoch_k'] < 25 and crossed_above('stoch_k', 'stoch_d'):
            signals['stoch'] = 1
        elif latest['stoch_k'] > 75 and crossed_below('stoch_k', 'stoch_d'):
            signals['stoch'] = -1
        else:
            signals['stoch'] = 0

        # Awesome Oscillator: zero-line cross (full), same-side acceleration (half).
        if latest['ao'] > 0 and prev['ao'] <= 0:
            signals['ao'] = 1
        elif latest['ao'] < 0 and prev['ao'] >= 0:
            signals['ao'] = -1
        elif latest['ao'] > 0 and prev['ao'] > 0 and latest['ao'] > prev['ao']:
            signals['ao'] = 0.5
        elif latest['ao'] < 0 and prev['ao'] < 0 and latest['ao'] < prev['ao']:
            signals['ao'] = -0.5
        else:
            signals['ao'] = 0

        # Williams %R: crossing above -20 counts bullish, crossing below -80 bearish.
        if latest['will_r'] > -20 and prev['will_r'] <= -20:
            signals['will_r'] = 1
        elif latest['will_r'] < -80 and prev['will_r'] >= -80:
            signals['will_r'] = -1
        else:
            signals['will_r'] = 0

        # MACD: line crossing its signal line.
        if crossed_above('macd', 'macd_signal'):
            signals['macd'] = 1
        elif crossed_below('macd', 'macd_signal'):
            signals['macd'] = -1
        else:
            signals['macd'] = 0

        # ADX: only directional when ADX is above 25.
        if latest['adx'] > 25 and latest['adx_pos'] > latest['adx_neg']:
            signals['adx'] = 1
        elif latest['adx'] > 25 and latest['adx_neg'] > latest['adx_pos']:
            signals['adx'] = -1
        else:
            signals['adx'] = 0

        # EMA stack: full alignment (full), else price vs EMA50 (half).
        if latest['close'] > latest['ema_20'] and latest['ema_20'] > latest['ema_50'] and latest['ema_50'] > latest['ema_100']:
            signals['ema_trend'] = 1
        elif latest['close'] < latest['ema_20'] and latest['ema_20'] < latest['ema_50'] and latest['ema_50'] < latest['ema_100']:
            signals['ema_trend'] = -1
        elif latest['close'] > latest['ema_50']:
            signals['ema_trend'] = 0.5
        elif latest['close'] < latest['ema_50']:
            signals['ema_trend'] = -0.5
        else:
            signals['ema_trend'] = 0

        # Ichimoku: conversion/base cross, cloud position, price vs base line.
        above_cloud = latest['close'] > latest['ichi_a'] and latest['close'] > latest['ichi_b']
        below_cloud = latest['close'] < latest['ichi_a'] and latest['close'] < latest['ichi_b']
        price_above_kijun = latest['close'] > latest['ichi_base']
        price_below_kijun = latest['close'] < latest['ichi_base']
        if crossed_above('ichi_conv', 'ichi_base') and above_cloud and price_above_kijun:
            signals['ichimoku'] = 1
        elif crossed_below('ichi_conv', 'ichi_base') and below_cloud and price_below_kijun:
            signals['ichimoku'] = -1
        elif above_cloud and price_above_kijun and latest['ichi_conv'] > latest['ichi_base']:
            signals['ichimoku'] = 0.5
        elif below_cloud and price_below_kijun and latest['ichi_conv'] < latest['ichi_base']:
            signals['ichimoku'] = -0.5
        else:
            signals['ichimoku'] = 0

        # Vortex: VI+ / VI- crossover.
        if crossed_above('vortex_pos', 'vortex_neg'):
            signals['vortex'] = 1
        elif crossed_above('vortex_neg', 'vortex_pos'):
            signals['vortex'] = -1
        else:
            signals['vortex'] = 0

        # MFI: below 20 bullish, above 80 bearish.
        if latest['mfi'] < 20:
            signals['mfi'] = 1
        elif latest['mfi'] > 80:
            signals['mfi'] = -1
        else:
            signals['mfi'] = 0

        # CMF: outside the +/-0.05 dead band.
        if latest['cmf'] > 0.05:
            signals['cmf'] = 1
        elif latest['cmf'] < -0.05:
            signals['cmf'] = -1
        else:
            signals['cmf'] = 0

        # OBV versus its own 5-row mean (needs more than 5 rows).
        signals['obv_trend'] = 0
        if len(df) > 5:
            obv_sma5 = df['obv'].rolling(window=5).mean().iloc[-1]
            if pd.notna(obv_sma5):
                if latest['obv'] > obv_sma5:
                    signals['obv_trend'] = 1
                elif latest['obv'] < obv_sma5:
                    signals['obv_trend'] = -1

        # Volume spike: ratio above 1.8, direction taken from the candle body.
        signals['vol_spike'] = 0
        if latest['volume_ratio'] > 1.8:
            if latest['close'] > latest['open']:
                signals['vol_spike'] = 0.5
            elif latest['close'] < latest['open']:
                signals['vol_spike'] = -0.5

        # VWAP: close crossing VWAP.
        if crossed_above('close', 'vwap'):
            signals['vwap_cross'] = 1
        elif crossed_below('close', 'vwap'):
            signals['vwap_cross'] = -1
        else:
            signals['vwap_cross'] = 0

        # Bollinger Bands: close breaking out of the upper / lower band.
        if crossed_above('close', 'bb_hband'):
            signals['bbands'] = 1
        elif crossed_below('close', 'bb_lband'):
            signals['bbands'] = -1
        else:
            signals['bbands'] = 0

        # --- Composite score ---
        # Every rule name gets an entry in values (NaN when it is not a column).
        for key in signals:
            values.setdefault(key, np.nan)

        # Round half-strength signals to full votes.
        final_signals = {}
        for key, raw in signals.items():
            if raw >= 0.5:
                final_signals[key] = 1
            elif raw <= -0.5:
                final_signals[key] = -1
            else:
                final_signals[key] = 0

        composite_score = sum(final_signals.values())
        # Threshold is ~30% of ALL rules (not only the non-neutral ones), minimum 3.
        signal_strength_threshold = max(3, int(len(final_signals) * 0.30))
        signal_direction = 0
        if composite_score >= signal_strength_threshold:
            signal_direction = 1
        elif composite_score <= -signal_strength_threshold:
            signal_direction = -1

        # Give every remaining indicator value a neutral entry in final_signals.
        for key in values:
            if key not in ('price', 'volume', 'timestamp', 'atr'):
                final_signals.setdefault(key, 0)

        return final_signals, values, signal_direction
    except KeyError as e:
        print(f"KeyError during signal generation (likely missing indicator column: {e}) in DF columns: {df.columns if df is not None else 'None'}. Check calculation step.")
        return None, None, None
    except IndexError as e:
        print(f"IndexError during signal generation (likely insufficient data rows for prev/latest): {e}. DF length: {len(df) if df is not None else 0}")
        return None, None, None
    except Exception as e:
        print(f"Error generating signals/values: {e}\n{traceback.format_exc()}")
        return None, None, None
def calculate_trade_params(self, values, signal_direction):
    """Derive entry, stop-loss and take-profit levels from the latest price and ATR.

    Args:
        values: Mapping of indicator values; only ``'price'`` and ``'atr'`` are read.
        signal_direction: ``1`` for a long setup, ``-1`` for a short setup,
            ``0`` for no trade.

    Returns:
        A dict with keys ``entry``, ``sl``, ``tp1``, ``tp2`` and ``lev_profit``.
        Every level is ``None`` and ``lev_profit`` is empty when there is no
        signal, when price/ATR are missing, NaN or non-positive, or when the
        computed levels are unusable. Any exception is printed and reported
        as that same empty result.
    """
    def blank():
        # Fresh "no trade" result so callers never share a mutable dict.
        return {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}}

    try:
        # Guard clauses: nothing to compute without a signal and sane inputs.
        if signal_direction == 0 or not values:
            return blank()
        atr_val = values.get('atr')
        if pd.isna(atr_val) or atr_val <= 0:
            return blank()
        entry_price = values.get('price')
        if pd.isna(entry_price) or entry_price <= 0:
            return blank()

        # +1 places the stop below and the targets above the entry; -1 mirrors that.
        if signal_direction == 1:
            sign = 1
        elif signal_direction == -1:
            sign = -1
        else:
            return blank()

        stop = entry_price - sign * (ATR_SL_MULTIPLIER * atr_val)
        target_1 = entry_price + sign * (ATR_TP1_MULTIPLIER * atr_val)
        target_2 = entry_price + sign * (ATR_TP2_MULTIPLIER * atr_val)

        # Reject NaN or non-positive levels, and a stop that is not on the
        # losing side of the entry.
        levels = (stop, target_1, target_2)
        stop_on_wrong_side = stop >= entry_price if sign == 1 else stop <= entry_price
        if any(pd.isna(level) for level in levels) \
                or any(level <= 0 for level in levels) \
                or stop_on_wrong_side:
            return blank()

        # Estimated P/L per $1 of margin at each leverage (simplified model).
        fee = SIMULATED_FEE_PERCENT / 100.0
        profit_ratio_tp1 = abs(target_1 - entry_price) / entry_price
        loss_ratio_sl = abs(stop - entry_price) / entry_price
        lev_profit = {}
        for lev in LEVERAGES:
            # Entry fee + exit fee, both charged on the leveraged amount.
            fee_impact = 2 * fee * lev
            net_profit = profit_ratio_tp1 * lev - fee_impact
            net_loss = -(loss_ratio_sl * lev) - fee_impact  # reported as a negative number
            lev_profit[f'{lev}x'] = {
                'tp1_profit_$': round(net_profit, 3),
                'sl_loss_$': round(net_loss, 3),
            }

        return {
            'entry': entry_price,
            'sl': stop,
            'tp1': target_1,
            'tp2': target_2,
            'lev_profit': lev_profit,
        }
    except Exception as e:
        print(f"Error calculating trade params for signal {signal_direction}, values {values}: {e}\n{traceback.format_exc()}")
        return blank()
def _run_simple_backtest(self, symbol, timeframe, df):
"""VERY Basic backtest simulation on the provided DataFrame (Uses longer history)."""
default_result = {'symbol': symbol, 'timeframe': timeframe, 'trades': 0, 'win_rate': 0, 'pnl_sum': 0, 'pnl_%_sum': 0}
# Need enough history + buffer for indicator lookbacks
min_backtest_len = BACKTEST_HISTORY_CANDLES + 50 # Need ~50 for lookback before backtest starts
# Check if DataFrame is valid and has enough rows
if df is None or not isinstance(df, pd.DataFrame) or len(df) < min_backtest_len:
# print(f"BT Skip {symbol} {timeframe}: Not enough data ({len(df) if df is not None else 0} < {min_backtest_len})")
return default_result
try:
# Ensure index is datetime and sorted for correct slicing
if not isinstance(df.index, pd.DatetimeIndex):
df.index = pd.to_datetime(df.index)
df = df.sort_index()
# Select the slice for backtesting simulation
# Use iloc for robustness to non-sequential indices after cleaning
backtest_start_iloc = len(df) - BACKTEST_HISTORY_CANDLES
if backtest_start_iloc < 0: backtest_start_iloc = 0 # Should not happen with check above, but safety
# Iterate through the candles *within the backtest period*
# Signal is generated using data *up to* candle i-1 close
# Trade entry occurs at candle i open
# Exit checks happen based on candle i high/low
trades = []
in_position = False
entry_price = 0
position_direction = 0 # 1 for long, -1 for short
stop_loss = 0
take_profit = 0 # Using TP1 for this simple backtest
entry_timestamp = None # For debugging/tracking
# Loop from the start of the backtest period + 1 (need previous candle for signal)
# Ensure we have enough lookback *before* the first signal candle (iloc-based)
# Ensure lookback of at least 50 candles for indicators
first_signal_candle_idx = max(50, backtest_start_iloc) # Start generating signals from here
for i in range(first_signal_candle_idx, len(df)):
current_row = df.iloc[i]
signal_candle_iloc = i - 1 # Signal based on close of previous candle (iloc)
# Slice original df up to and including the signal candle
# Use iloc slicing for performance and robustness
# Ensure the slice is valid
if signal_candle_iloc < 1: continue # Need at least 2 rows for signal calc
df_for_signal_calc = df.iloc[:signal_candle_iloc + 1]
# Check if df_for_signal_calc is valid before generating signal
if df_for_signal_calc is None or len(df_for_signal_calc) < 2:
continue # Skip if not enough data for signal calc
# Generate signal based on data ending at the previous candle's close
sim_signals, sim_values, sim_direction = self.generate_signals_and_values(df_for_signal_calc)
# --- Entry Logic ---
# Enter only if not already in position AND a clear signal occurs AND we have values
if not in_position and sim_direction != 0 and sim_values:
# Use ATR from the *signal candle* for SL/TP calc
atr_at_entry = sim_values.get('atr')
# Enter at current candle's open price
entry_price_candidate = current_row['open']
entry_timestamp = current_row.name # Timestamp of entry candle
# Validate entry conditions before taking position
if pd.notna(entry_price_candidate) and entry_price_candidate > 0 and \
pd.notna(atr_at_entry) and atr_at_entry > 0:
# Calculate potential SL/TP *before* deciding to enter fully
if sim_direction == 1: # Long
potential_sl = entry_price_candidate - ATR_SL_MULTIPLIER * atr_at_entry
potential_tp = entry_price_candidate + ATR_TP1_MULTIPLIER * atr_at_entry
else: # Short
potential_sl = entry_price_candidate + ATR_SL_MULTIPLIER * atr_at_entry
potential_tp = entry_price_candidate - ATR_TP1_MULTIPLIER * atr_at_entry
# Basic sanity check for SL/TP (positive values and SL not crossing entry)
if not (pd.isna(potential_sl) or pd.isna(potential_tp) or potential_sl <= 0 or potential_tp <= 0 or \
(sim_direction == 1 and potential_sl >= entry_price_candidate) or \
(sim_direction == -1 and potential_sl <= entry_price_candidate)):
# All checks passed, commit to position
in_position = True
position_direction = sim_direction
entry_price = entry_price_candidate
stop_loss = potential_sl
take_profit = potential_tp
# print(f"BT Enter {symbol} {timeframe} @ {entry_timestamp}: Dir={position_direction} Entry={entry_price:.4f} SL={stop_loss:.4f} TP={take_profit:.4f} ATR={atr_at_entry:.4f}")
# else: print(f"BT Skip Entry {symbol} {timeframe}: Invalid entry conditions (Price:{entry_price_candidate}, ATR:{atr_at_entry})")
# --- Exit Logic ---
# Check exits only if currently in a position
elif in_position:
exit_price = None
pnl = 0
exit_reason = "N/A"
current_high = current_row['high']
current_low = current_row['low']
exit_timestamp = current_row.name # Timestamp of exit check candle
# Validate high/low data
if pd.isna(current_high) or pd.isna(current_low):
print(f"BT Warning {symbol} {timeframe}: NaN High/Low at {exit_timestamp}, cannot check exit.")
continue # Skip exit check for this candle
# Check SL/TP hit based on current candle's high/low
# Important: Check SL first in case both are hit within the same candle
if position_direction == 1: # Long
if current_low <= stop_loss:
exit_price = stop_loss
exit_reason = "SL Hit"
elif current_high >= take_profit:
exit_price = take_profit
exit_reason = "TP1 Hit"
elif position_direction == -1: # Short
if current_high >= stop_loss:
exit_price = stop_loss
exit_reason = "SL Hit"
elif current_low <= take_profit:
exit_price = take_profit
exit_reason = "TP1 Hit"
# If an exit condition was met
if exit_price is not None:
# Calculate PnL based on entry and exit
# For shorts, PnL = Entry - Exit; for longs, PnL = Exit - Entry
if position_direction == 1:
pnl = exit_price - entry_price
else: # Short
pnl = entry_price - exit_price
# Calculate and subtract simulated fees
# Fee is % of trade value at entry and exit
entry_fee = (SIMULATED_FEE_PERCENT / 100.0) * entry_price
exit_fee = (SIMULATED_FEE_PERCENT / 100.0) * exit_price
pnl -= (entry_fee + exit_fee)
trades.append({'entry': entry_price, 'exit': exit_price, 'pnl': pnl, 'direction': position_direction})
# print(f"BT Exit {symbol} {timeframe} @ {exit_timestamp}: Reason={exit_reason} ExitPx={exit_price:.4f} PnL={pnl:.4f} (Entry @ {entry_price:.4f} on {entry_timestamp})")
in_position = False # Reset position state after closing trade
entry_timestamp = None # Reset entry timestamp
# --- Summarize Results ---
num_trades = len(trades)
if num_trades > 0:
wins = sum(1 for t in trades if t['pnl'] > 0)
win_rate = (wins / num_trades * 100)
total_pnl = sum(t['pnl'] for t in trades)
# Calculate PnL % sum based on entry price (approximation of capital growth)
pnl_percentage_sum = sum((t['pnl'] / t['entry']) * 100 for t in trades if t['entry'] > 0)
else:
win_rate = 0
total_pnl = 0
pnl_percentage_sum = 0
# print(f"BT Summary {symbol} {timeframe}: Trades={num_trades}, WinRate={win_rate:.2f}%, PnL Sum={total_pnl:.5f}, PnL % Sum={pnl_percentage_sum:.2f}%")
return {
'symbol': symbol, 'timeframe': timeframe, 'trades': num_trades,
'win_rate': round(win_rate, 2), 'pnl_sum': round(total_pnl, 5),
'pnl_%_sum': round(pnl_percentage_sum, 2)
}
except Exception as e:
print(f"Error during backtest simulation for {symbol} {timeframe}: {e}\n{traceback.format_exc()}")
# Print details about the DataFrame state at error might help debugging
# print(f"DF info at error for {symbol} {timeframe}:")
# try: df.info()
# except: print("Could not get DF info")
return default_result
def analyze_symbol(self, symbol, progress):
    """Analyze one symbol on every timeframe in ``self.valid_timeframes``.

    For each timeframe this fetches OHLCV data, calculates indicators,
    generates signals and trade parameters, logs any active signal to CSV,
    and runs the simple backtest. Failed steps are logged and recorded with
    neutral placeholders.

    Args:
        symbol: Symbol to analyze.
        progress: Callable invoked as ``progress(fraction, desc=...)``.

    Returns:
        ``(timeframe_details, heatmap_composites, backtest_results, log_msgs,
        active_signals, hover_details)``; the dicts are keyed by timeframe.
    """
    timeframe_details = {}
    heatmap_composites = {}
    symbol_backtest_results = []
    log_msgs = []
    symbol_active_signals = {}  # {tf: {direction, entry, sl, tp1, tp2}}
    symbol_hover_details = {}   # {tf: {label: value}}

    if not self.valid_timeframes:
        return timeframe_details, heatmap_composites, symbol_backtest_results, [f"No valid timeframes for {symbol}."], {}, {}

    def pause(minimum_seconds):
        # Sleep for the exchange rate limit (given in ms), never less than the floor.
        time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, minimum_seconds))

    def record_skip(tf, message, minimum_seconds):
        # Log the failure and store neutral placeholders for this timeframe.
        log_msgs.append(message)
        heatmap_composites[tf] = 0
        # A ``None`` frame yields the default (zeroed) backtest entry.
        symbol_backtest_results.append(self._run_simple_backtest(symbol, tf, None))
        symbol_hover_details[tf] = {}
        pause(minimum_seconds)

    hover_fields = (
        ('Price', 'price'),
        ('RSI', 'rsi'),
        ('MACD', 'macd'),
        ('StochK', 'stoch_k'),
        ('ADX', 'adx'),
        ('WillR', 'will_r'),
    )
    level_columns = (('Entry', 'entry'), ('SL', 'sl'), ('TP1', 'tp1'), ('TP2', 'tp2'))

    # Small delay before starting a symbol to help with rate limits.
    time.sleep(0.1)

    for i, timeframe in enumerate(self.valid_timeframes):
        progress(i / len(self.valid_timeframes), desc=f"Fetching {symbol} [{timeframe}]")
        ohlcv, err_msg = self.fetch_ohlcv_data(symbol, timeframe)
        if err_msg:
            # Longer minimum sleep after a fetch error.
            record_skip(timeframe, f"Data fetch skip: {symbol} [{timeframe}] {err_msg}", 0.3)
            continue

        progress((i + 0.3) / len(self.valid_timeframes), desc=f"Calculating Ind. {symbol} [{timeframe}]")
        df = self.calculate_indicators(ohlcv, timeframe)
        if df is None or df.empty:
            record_skip(timeframe, f"Indicator calc skip: {symbol} [{timeframe}] (DataFrame invalid or empty)", 0.2)
            continue

        progress((i + 0.6) / len(self.valid_timeframes), desc=f"Generating Sig. {symbol} [{timeframe}]")
        signals, values, signal_direction = self.generate_signals_and_values(df)

        if signals is None or values is None:
            log_msgs.append(f"Signal gen skip: {symbol} [{timeframe}]")
            heatmap_composites[timeframe] = 0
            symbol_hover_details[timeframe] = {}
        else:
            trade_params = self.calculate_trade_params(values, signal_direction)
            timeframe_details[timeframe] = {
                'signals': signals,            # per-indicator signals
                'values': values,              # raw indicator values
                'trade_params': trade_params,  # entry, SL, TP etc.
                'direction': signal_direction  # overall direction
            }

            # Heatmap score: mean of the per-indicator signals.
            num_potential_signals = len(signals)
            heatmap_composites[timeframe] = (
                sum(signals.values()) / num_potential_signals if num_potential_signals > 0 else 0
            )

            symbol_hover_details[timeframe] = {
                label: values.get(key, np.nan) for label, key in hover_fields
            }

            # Only a directional signal with valid trade parameters is "active".
            if signal_direction != 0 and trade_params.get('entry') is not None:
                symbol_active_signals[timeframe] = {
                    'direction': signal_direction,
                    'entry': trade_params['entry'],
                    'sl': trade_params['sl'],
                    'tp1': trade_params['tp1'],
                    'tp2': trade_params['tp2']
                }
                log_signal_to_csv({
                    'LogTimestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'SignalCandleTime': values.get('timestamp', 'N/A'),
                    'Symbol': symbol,
                    'Timeframe': timeframe,
                    'Direction': 'LONG' if signal_direction == 1 else 'SHORT',
                    # Price levels are logged with 8 decimal places.
                    **{column: f"{trade_params[key]:.8f}" for column, key in level_columns},
                    'Status': 'Triggered'
                })

        progress((i + 0.9) / len(self.valid_timeframes), desc=f"Backtesting {symbol} [{timeframe}]")
        # The backtest receives a copy so it cannot alter ``df``.
        symbol_backtest_results.append(self._run_simple_backtest(symbol, timeframe, df.copy()))
        pause(0.2)

    return timeframe_details, heatmap_composites, symbol_backtest_results, log_msgs, symbol_active_signals, symbol_hover_details
def run_full_analysis(self, progress=gr.Progress()):
    """Run the complete analysis for the top coins across the valid timeframes.

    Resets the per-run state on ``self``, fetches the symbol list, calls
    ``analyze_symbol`` for each symbol, then builds the heatmap and
    active-signal DataFrames and writes the backtest summary to
    ``BACKTEST_RESULTS_FILE``.

    Args:
        progress: Callable invoked as ``progress(fraction, desc=...)``.

    Returns:
        ``(analysis_results, heatmap_df, backtest_results, active_signals_df,
        heatmap_details, log_messages)``. When no symbols are found, empty
        containers are returned in the same positions.
    """
    # Reset everything left over from a previous run.
    self.analysis_results = {}
    self.backtest_results = []
    self.active_signals_df = pd.DataFrame()
    self.heatmap_details = {}
    all_log_msgs = []
    all_active_signals_list = []  # rows for the active-signals table

    progress(0, desc="Fetching top coins...")
    symbols, msg = self.fetch_top_coins()
    all_log_msgs.append(msg)
    if not symbols:
        self.heatmap_df = pd.DataFrame()
        all_log_msgs.append("Error: No symbols found to analyze. Stopping.")
        return {}, pd.DataFrame(), [], pd.DataFrame(), {}, all_log_msgs

    heatmap_data_list = []  # [{'Coin': name, tf: score, ...}, ...]
    total_symbols = len(symbols)
    progress(0.05, desc=f"Starting analysis for {total_symbols} coins...")

    for idx, symbol in enumerate(symbols):
        # Each symbol owns an equal slice of the 0.05-0.95 progress range.
        symbol_progress_start = 0.05 + (idx / total_symbols) * 0.90
        symbol_progress_range = (1 / total_symbols) * 0.90
        symbol_progress_desc = f"Analyzing {symbol} ({idx+1}/{total_symbols})"
        progress(symbol_progress_start, desc=symbol_progress_desc)

        def symbol_progress_tracker(p, desc=symbol_progress_desc):
            # Map a 0-1 fraction for this symbol onto its slice of the bar.
            return progress(symbol_progress_start + (p * symbol_progress_range), desc=desc)

        try:
            (tf_details, tf_heatmap_scores, symbol_bt_results, log_msgs,
             symbol_active_tf_signals, symbol_tf_hover_details) = self.analyze_symbol(symbol, symbol_progress_tracker)
            all_log_msgs.extend(log_msgs)
            self.backtest_results.extend(symbol_bt_results)

            # A symbol with no usable timeframe contributes nothing further.
            if tf_details:
                self.analysis_results[symbol] = tf_details
                coin_name = symbol.split('/')[0]
                heatmap_row = {
                    'Coin': coin_name,
                    **{tf: tf_heatmap_scores.get(tf, 0) for tf in self.valid_timeframes},
                }
                symbol_hover_data = {
                    tf: symbol_tf_hover_details.get(tf, {}) for tf in self.valid_timeframes
                }
                heatmap_data_list.append(heatmap_row)
                # Hover details are keyed by the base coin name.
                self.heatmap_details[coin_name] = symbol_hover_data

                for tf, active_sig in symbol_active_tf_signals.items():
                    # Price levels are formatted with 8 decimals for display.
                    all_active_signals_list.append({
                        'Symbol': symbol,
                        'Timeframe': tf,
                        'Direction': 'LONG' if active_sig['direction'] == 1 else 'SHORT',
                        'Entry': f"{active_sig['entry']:.8f}",
                        'SL': f"{active_sig['sl']:.8f}",
                        'TP1': f"{active_sig['tp1']:.8f}",
                        'TP2': f"{active_sig['tp2']:.8f}",
                    })
        except ccxt.RateLimitExceeded as e:
            wait_time = 60
            rate_limit_msg = f"Rate limit exceeded analyzing {symbol}. Sleeping for {wait_time}s... {e}"
            all_log_msgs.append(rate_limit_msg)
            print(rate_limit_msg)
            progress(symbol_progress_start + symbol_progress_range * 0.9, desc=f"Rate Limit Hit! Waiting {wait_time}s...")
            # The symbol is not retried after the wait.
            time.sleep(wait_time)
        except Exception as e:
            error_msg = f"Critical error processing {symbol}: {e}\n{traceback.format_exc()}"
            all_log_msgs.append(error_msg)
            print(error_msg)
            # Keep the coin visible with neutral placeholders everywhere.
            coin_name = symbol.split('/')[0]
            heatmap_data_list.append({'Coin': coin_name, **dict.fromkeys(self.valid_timeframes, 0)})
            self.heatmap_details[coin_name] = {tf: {} for tf in self.valid_timeframes}
            self.backtest_results.extend(
                {'symbol': symbol, 'timeframe': tf, 'trades': 0, 'win_rate': 0, 'pnl_sum': 0, 'pnl_%_sum': 0}
                for tf in self.valid_timeframes
            )

    progress(0.95, desc="Finalizing results...")

    # --- Heatmap DataFrame ---
    if heatmap_data_list:
        self.heatmap_df = pd.DataFrame(heatmap_data_list).set_index('Coin')
        if self.valid_timeframes:
            # Drop unexpected columns, then force the configured column
            # order, filling any absent timeframe with 0.
            cols_to_keep = [tf for tf in self.valid_timeframes if tf in self.heatmap_df.columns]
            self.heatmap_df = self.heatmap_df[cols_to_keep]
            self.heatmap_df = self.heatmap_df.reindex(columns=self.valid_timeframes, fill_value=0)
        self.heatmap_df.index.name = 'Coin'
    else:
        self.heatmap_df = pd.DataFrame()
        all_log_msgs.append("Warning: No heatmap data generated (no symbols processed successfully?).")

    # --- Active signals DataFrame ---
    signal_columns = ['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']
    if all_active_signals_list:
        self.active_signals_df = pd.DataFrame(all_active_signals_list)
        # Sort by symbol, then by timeframe order, using a temporary helper column.
        self.active_signals_df['tf_order'] = self.active_signals_df['Timeframe'].map(TIMEFRAME_ORDER_MAP)
        self.active_signals_df = self.active_signals_df.sort_values(by=['Symbol', 'tf_order']).drop('tf_order', axis=1)
        self.active_signals_df = self.active_signals_df[signal_columns]
    else:
        # Empty table that still carries the expected columns.
        self.active_signals_df = pd.DataFrame(columns=signal_columns)

    # --- Persist backtest summary ---
    if not self.backtest_results:
        all_log_msgs.append("No backtest results generated.")
    else:
        # Anything that is not a dict cannot become a summary row.
        valid_bt_results = [res for res in self.backtest_results if isinstance(res, dict)]
        if not valid_bt_results:
            all_log_msgs.append("No valid backtest results generated to save.")
        else:
            bt_df = pd.DataFrame(valid_bt_results)
            try:
                bt_df.to_csv(BACKTEST_RESULTS_FILE, index=False)
                all_log_msgs.append(f"Backtest summary saved to {BACKTEST_RESULTS_FILE}")
            except Exception as e:
                all_log_msgs.append(f"Error saving backtest results: {e}")
                print(f"Error saving backtest results: {e}\n{traceback.format_exc()}")

    progress(1, desc="Analysis complete.")
    all_log_msgs.append("Analysis Complete.")
    if os.path.exists(SIGNAL_LOG_FILE):
        all_log_msgs.append(f"Signals logged to {SIGNAL_LOG_FILE}")

    return self.analysis_results, self.heatmap_df, self.backtest_results, self.active_signals_df, self.heatmap_details, all_log_msgs
# --- Gradio Helper Functions ---
def create_plotly_heatmap(df, heatmap_details):
    """Create the signal-strength heatmap with per-cell hover text.

    Args:
        df: DataFrame of scores with coins as the index and timeframes as the
            columns. Non-numeric cells are plotted as 0. The frame itself is
            left unmodified.
        heatmap_details: Nested mapping ``{coin: {timeframe: {label: value}}}``
            read for the keys ``Price``, ``RSI``, ``MACD``, ``StochK``,
            ``ADX`` and ``WillR``. ``None`` or missing entries show as ``N/A``.

    Returns:
        A ``go.Figure``. When ``df`` is empty or an error occurs, the figure
        is empty and carries an explanatory title.
    """
    if df is None or df.empty:
        print("Heatmap DF is empty, returning empty figure.")
        return go.Figure(layout=go.Layout(title="No data available for heatmap", height=300))

    colorscale = pcolors.diverging.RdYlGn
    heatmap_details = heatmap_details or {}

    try:
        # Read string labels without assigning them back, so the caller's
        # DataFrame keeps its original index and columns.
        rows = list(df.index.astype(str))
        cols = list(df.columns.astype(str))
        heatmap_values = df.apply(pd.to_numeric, errors='coerce').fillna(0).values
    except Exception as e:
        print(f"Error preparing heatmap data: {e}")
        return go.Figure(layout=go.Layout(title=f"Error preparing heatmap: {e}", height=300))

    def hover_line(prefix, value, spec):
        # Real numbers get the given precision; NaN and placeholders such as
        # 'N/A' are shown as they are.
        if isinstance(value, (int, float)) and not pd.isna(value):
            return f"{prefix} {float(value):{spec}}<br>"
        return f"{prefix} {value}<br>"

    # (hover prefix, key in the details dict, format spec)
    indicator_lines = (
        ("RSI:", 'RSI', '.1f'),
        ("MACD:", 'MACD', '.6f'),
        ("Stoch K:", 'StochK', '.1f'),
        ("ADX:", 'ADX', '.1f'),
        ("Will %R:", 'WillR', '.1f'),
    )

    hover_texts = []
    for r_idx, coin in enumerate(rows):
        coin_details_by_tf = heatmap_details.get(coin) or {}
        row_texts = []
        for c_idx, tf in enumerate(cols):
            try:
                details = coin_details_by_tf.get(tf) or {}
                parts = [
                    f"<b>Coin:</b> {coin}<br>",
                    f"<b>Timeframe:</b> {tf}<br>",
                    hover_line("<b>Price:</b>", details.get('Price', 'N/A'), '.8f'),
                    f"<b>Score:</b> {heatmap_values[r_idx, c_idx]:.3f}<br>",
                    "----------<br>",
                ]
                parts.extend(
                    hover_line(prefix, details.get(key, 'N/A'), spec)
                    for prefix, key, spec in indicator_lines
                )
                # No "<extra></extra>" here: that tag belongs to
                # ``hovertemplate``, and ``hoverinfo='text'`` below already
                # leaves the trace name out of the hover.
                row_texts.append("".join(parts))
            except Exception as hover_e:
                # One bad cell must not prevent the rest from rendering.
                print(f"Error generating hover text for {coin}/{tf}: {hover_e}")
                row_texts.append(f"Error displaying hover for {coin}/{tf}")
        hover_texts.append(row_texts)

    try:
        fig = go.Figure(data=go.Heatmap(
            z=heatmap_values,
            x=cols,
            y=rows,
            colorscale=colorscale,
            zmid=0, zmin=-0.6, zmax=0.6,  # colour range for the normalized score
            hoverongaps=False,
            hoverinfo='text',  # show only the custom text matrix on hover
            text=hover_texts,
            texttemplate=None  # do not draw the text inside the cells
        ))
        fig.update_layout(
            title='Cryptocurrency Signal Strength Heatmap (Hover for Key Values, Click Cell for Full Details)',
            xaxis_title='Timeframe',
            yaxis_title='Coin',
            yaxis={'tickmode': 'linear', 'tickfont': {'size': 9}, 'automargin': True},
            xaxis={'tickmode': 'linear'},
            height=max(450, len(rows) * 18 + 100),  # grow with the number of coins
            margin=dict(l=70, r=50, t=60, b=50)
        )
        return fig
    except Exception as e:
        print(f"Error creating Plotly heatmap figure: {e}\n{traceback.format_exc()}")
        return go.Figure(layout=go.Layout(title=f"Error creating heatmap: {e}", height=400))
def format_heatmap_click_details(evt: gr.SelectData, current_state):
    """Build the Markdown detail view for the heatmap cell that was clicked.

    Args:
        evt: Selection event; ``evt.index`` must be a ``(row, column)`` pair.
        current_state: Dict holding ``'analysis_results'``
            (``{symbol: {timeframe: details}}``) and ``'heatmap_df'``.

    Returns:
        A Markdown string with the indicator table and trade setup for the
        clicked coin/timeframe, or a short explanatory message when the click
        or the stored data cannot be used.
    """
    def is_number(candidate):
        return isinstance(candidate, (int, float)) and not pd.isna(candidate)

    def bullet(label, value, spec, ending="\n"):
        # Numbers use the given format spec; anything else is shown as is.
        shown = format(value, spec) if is_number(value) else value
        return f"- **{label}:** {shown}{ending}"

    def direction_label(direction):
        if direction > 0:
            return 'BULLISH (+1)'
        return 'BEARISH (-1)' if direction < 0 else 'NEUTRAL (0)'

    def value_cell(val):
        # Pick a precision that suits the magnitude of the value.
        if is_number(val):
            magnitude = abs(val)
            if magnitude > 100000:
                return f"{val:,.0f}"
            if magnitude > 100:
                return f"{val:,.2f}"
            if 0 < magnitude < 0.000001:
                return f"{val:.4e}"
            if magnitude < 1:
                return f"{val:.8f}"
            return f"{val:.6f}"
        if pd.isna(val):
            return "NaN"
        return str(val)

    def signal_cell(sig_val):
        if sig_val > 0:
            return '🟩 (+1)'
        return '🟥 (-1)' if sig_val < 0 else '⬜ (0)'

    def money(amount):
        return f"{amount:.3f}" if isinstance(amount, (int, float)) else str(amount)

    # The event must carry a (row, column) pair.
    if evt is None:
        return "Click on a heatmap cell after running analysis to see details here. (Ensure you clicked a colored cell)"
    clicked = evt.index
    if clicked is None or not isinstance(clicked, (list, tuple)) or len(clicked) != 2:
        return "Click on a heatmap cell after running analysis to see details here. (Ensure you clicked a colored cell)"

    # The state must contain both inputs this function reads.
    if not isinstance(current_state, dict) or any(
            key not in current_state for key in ('analysis_results', 'heatmap_df')):
        return "Analysis data not found in state. Please run the analysis first."

    try:
        row_index, col_index = clicked
        heatmap_df = current_state.get('heatmap_df')
        analysis_data = current_state.get('analysis_results')

        if heatmap_df is None or heatmap_df.empty or analysis_data is None:
            return "Heatmap or analysis data is not available. Please run analysis."

        row_ok = 0 <= row_index < len(heatmap_df.index)
        col_ok = 0 <= col_index < len(heatmap_df.columns)
        if not (row_ok and col_ok):
            return "Error: Clicked cell index is out of bounds."

        coin_name = heatmap_df.index[row_index]  # base coin, e.g. 'BTC'
        timeframe = heatmap_df.columns[col_index]

        # The heatmap is indexed by base coin; find the first analysed symbol
        # that starts with "<coin>/".
        symbol_prefix = str(coin_name) + '/'
        full_symbol = next(
            (key for key in analysis_data.keys() if key.startswith(symbol_prefix)),
            None,
        )
        if not full_symbol or full_symbol not in analysis_data:
            return f"Details not found for {coin_name} (symbol mismatch or no analysis data?)."

        details = analysis_data.get(full_symbol, {}).get(timeframe)
        if details is None:
            return f"No analysis data available for {coin_name} on {timeframe}."

        values = details.get('values', {})              # raw indicator values
        signals = details.get('signals', {})            # per-indicator signals
        trade_params = details.get('trade_params', {})  # entry, SL, TP etc.
        direction = details.get('direction', 0)         # overall direction
        if not values or not signals:
            return f"Incomplete data for {coin_name} [{timeframe}]. Cannot display details."

        # --- Header and market snapshot ---
        out = [
            f"### Details for {coin_name} ({full_symbol}) [{timeframe}]\n\n",
            f"- **Timestamp:** {values.get('timestamp', 'N/A')}\n",
            bullet("Price", values.get('price', np.nan), ".8f"),
            bullet("Volume", values.get('volume', np.nan), ",.0f"),
            bullet("ATR", values.get('atr', np.nan), ".8f", ending="\n\n"),
            f"**Overall Signal Direction:** {direction_label(direction)}\n\n",
            "**Indicator Values & Individual Signals:**\n",
            "| Indicator | Value | Signal |\n",
            "|----------------|-----------------|--------|\n",
        ]

        # --- One table row per indicator that produced a signal entry ---
        for name in sorted(signals.keys()):
            val_str = value_cell(values.get(name, 'N/A'))
            signal_char = signal_cell(signals.get(name, 0))
            out.append(f"| {name:<14} | {val_str:<15} | {signal_char:<7} |\n")

        # --- Trade parameters ---
        if trade_params and trade_params.get('entry') is not None:
            out.append(f"\n**Potential Trade Setup (Based on this signal & ATR):**\n")
            out.append(f"- **Direction:** {'LONG' if direction > 0 else 'SHORT'}\n")
            out.append(f"- **Entry:** {trade_params['entry']:.8f}\n")
            out.append(f"- **Stop Loss:** {trade_params['sl']:.8f}\n")
            out.append(f"- **Take Profit 1:** {trade_params['tp1']:.8f}\n")
            out.append(f"- **Take Profit 2:** {trade_params['tp2']:.8f}\n\n")
            out.append(f"**Est. P/L per $1 Margin (TP1/SL Hit, incl. ~{SIMULATED_FEE_PERCENT*2:.2f}% fees):**\n")
            for lev, pnl_data in trade_params.get('lev_profit', {}).items():
                tp1_pnl_str = money(pnl_data.get('tp1_profit_$','N/A'))
                sl_loss_str = money(pnl_data.get('sl_loss_$','N/A'))
                out.append(f" - **{lev}:** Profit ${tp1_pnl_str} / Loss ${sl_loss_str}\n")
        else:
            out.append(f"\n**Trade Setup:** No active trade signal ({'Neutral' if direction == 0 else 'Params Invalid'}) generated for this timeframe at this time.\n")

        return "".join(out)
    except IndexError:
        return "Error processing click: Index out of range. Please ensure the heatmap is up to date."
    except KeyError as e:
        return f"Error processing click: Missing expected data key '{e}'. Analysis data might be incomplete."
    except Exception as e:
        print(f"Unexpected error formatting heatmap click details: {e}\n{traceback.format_exc()}")
        return f"An unexpected error occurred displaying details for the clicked cell: {e}"
def format_coin_details(symbol, analysis_data, valid_timeframes):
    """Format full details for a selected coin across all analyzed timeframes.

    Args:
        symbol: Key into ``analysis_data`` for the coin to render.
        analysis_data: Mapping ``{symbol: {timeframe: details}}``. Each
            ``details`` dict is read for the keys ``values``, ``signals``,
            ``trade_params`` and ``direction``.
        valid_timeframes: Timeframes to render, in display order.

    Returns:
        A Markdown string with one section per timeframe. Timeframes with no
        entry, or with any of the four components missing, get a short notice
        instead of a table. If ``symbol`` is not present in ``analysis_data``
        a one-line "no data" message is returned.
    """
    if not analysis_data or symbol not in analysis_data:
        return f"No analysis data available for {symbol}. Please run analysis first."

    # NumPy scalars (np.int64, np.float32, ...) are not all subclasses of the
    # builtin int/float, so they are listed explicitly; otherwise such values
    # would bypass the numeric formatting below and be printed raw.
    numeric_types = (int, float, np.integer, np.floating)

    def _is_number(value):
        """True for a real, non-NaN numeric scalar."""
        return isinstance(value, numeric_types) and not pd.isna(value)

    def _bullet(label, value, spec, tail="\n"):
        """Render '- label: value', applying ``spec`` only to real numbers."""
        text = format(value, spec) if _is_number(value) else f"{value}"
        return f"- {label}: {text}{tail}"

    def _indicator_text(val):
        """Pick a precision for an indicator value based on its magnitude."""
        if _is_number(val):
            magnitude = abs(val)
            if magnitude > 100000:
                return f"{val:,.0f}"
            if magnitude > 100:
                return f"{val:,.2f}"
            if 0 < magnitude < 0.000001:
                return f"{val:.4e}"  # Very small number
            if magnitude < 1:
                return f"{val:.8f}"  # Small decimal (more precision)
            return f"{val:.6f}"  # Default decimal (more precision)
        if pd.isna(val):
            return "NaN"
        return str(val)

    def _level_text(params, key):
        """Format a price level; 'N/A' when it is missing or not numeric."""
        level = params.get(key)
        return f"{level:.8f}" if _is_number(level) else "N/A"

    def _pnl_text(amount):
        """Format a P/L figure; non-numeric placeholders are passed through."""
        return f"{amount:.3f}" if isinstance(amount, numeric_types) else str(amount)

    coin_data_per_tf = analysis_data[symbol]
    # Collect fragments and join once instead of repeated string concatenation.
    parts = [f"## Full Details for {symbol}\n\n"]

    # Use the provided valid_timeframes list for consistent ordering.
    for timeframe in valid_timeframes:
        parts.append(f"---\n### Timeframe: {timeframe}\n")
        details = coin_data_per_tf.get(timeframe)
        if details is None:
            parts.append("*No analysis data generated for this timeframe.*\n")
            continue

        values = details.get('values')
        signals = details.get('signals')
        trade_params = details.get('trade_params')
        direction = details.get('direction')  # Use the stored direction
        if values is None or signals is None or trade_params is None or direction is None:
            parts.append("*Incomplete analysis data for this timeframe.*\n")
            continue

        # --- Snapshot values for this timeframe ---
        parts.append(f"- Timestamp: {values.get('timestamp', 'N/A')}\n")
        parts.append(_bullet("Price", values.get('price', np.nan), ".8f"))
        parts.append(_bullet("Volume", values.get('volume', np.nan), ",.0f"))
        parts.append(_bullet("ATR", values.get('atr', np.nan), ".8f", tail="\n\n"))

        if direction > 0:
            direction_text = 'BULLISH (+1)'
        elif direction < 0:
            direction_text = 'BEARISH (-1)'
        else:
            direction_text = 'NEUTRAL (0)'
        parts.append(f"**Overall Signal Direction:** {direction_text}\n\n")

        # --- Indicator table ---
        parts.append("**Indicator Values & Individual Signals:**\n")
        parts.append("| Indicator      | Value           | Signal |\n")
        parts.append("|----------------|-----------------|--------|\n")
        for name in sorted(signals):
            sig_val = signals.get(name, 0)
            if sig_val > 0:
                signal_char = '🟩 (+1)'
            elif sig_val < 0:
                signal_char = '🟥 (-1)'
            else:
                signal_char = '⬜ (0)'
            val_str = _indicator_text(values.get(name, 'N/A'))
            parts.append(f"| {name:<14} | {val_str:<15} | {signal_char:<7} |\n")

        # --- Trade parameters ---
        if trade_params and trade_params.get('entry') is not None:
            parts.append("\n**Potential Trade Setup:**\n")
            parts.append(f"- Direction: {'LONG' if direction > 0 else 'SHORT'}\n")
            # A missing or non-numeric level renders as 'N/A' rather than
            # raising, so one malformed timeframe cannot abort the whole report.
            parts.append(f"- Entry: {_level_text(trade_params, 'entry')}\n")
            parts.append(f"- SL: {_level_text(trade_params, 'sl')}\n")
            parts.append(f"- TP1: {_level_text(trade_params, 'tp1')}\n")
            parts.append(f"- TP2: {_level_text(trade_params, 'tp2')}\n\n")
            parts.append(f"**Est. P/L per $1 Margin (TP1/SL Hit, incl. ~{SIMULATED_FEE_PERCENT*2:.2f}% fees):**\n")
            for lev, pnl_data in (trade_params.get('lev_profit') or {}).items():
                tp1_pnl_str = _pnl_text(pnl_data.get('tp1_profit_$', 'N/A'))
                sl_loss_str = _pnl_text(pnl_data.get('sl_loss_$', 'N/A'))
                parts.append(f"  - **{lev}:** Profit ${tp1_pnl_str} / Loss ${sl_loss_str}\n")
        else:
            reason = 'Neutral' if direction == 0 else 'Params Invalid'
            parts.append(f"\n**Trade Setup:** No active trade signal ({reason}) generated for this timeframe.\n")

    parts.append("\n---\n")  # End separator for the coin
    return "".join(parts)
def find_zones(heatmap_df, min_confirmation_threshold):
    """Pick long/short zone candidates from the per-coin mean heatmap score.

    Every column of ``heatmap_df`` is coerced to numeric; columns that are
    entirely NaN afterwards are dropped. Each row's mean (ignoring NaN) is
    compared against a symmetric cutoff of ``min_confirmation_threshold * 0.4``
    clamped to the range [0.1, 1.0].

    Args:
        heatmap_df: DataFrame indexed by coin with one score column per timeframe.
        min_confirmation_threshold: Base value the cutoff is derived from.

    Returns:
        ``(long_zone_list, short_zone_list)``: lists of ``(coin, score)`` tuples
        with scores rounded to 3 decimals. Longs are sorted highest first,
        shorts lowest first. Both lists are empty when there is no usable data
        or an error occurs (the error is printed).
    """
    if heatmap_df is None or heatmap_df.empty:
        print("Zone finding skipped: Heatmap DF is empty.")
        return [], []

    def _as_pairs(series):
        # (index label as str, rounded score) for each entry, in series order
        return list(zip(series.index.astype(str), series.round(3)))

    try:
        # Coerce everything to numbers, then discard columns with no data at all
        coerced = heatmap_df.apply(pd.to_numeric, errors='coerce')
        score_table = coerced.dropna(axis=1, how='all')
        if score_table.empty:
            print("Zone finding skipped: No numeric timeframe columns found after coercion.")
            return [], []

        # Row-wise mean; NaNs introduced by coercion are ignored
        mean_by_coin = score_table.mean(axis=1, skipna=True)

        # e.g. a threshold of 0.75 gives a cutoff of +/- 0.3
        zone_threshold_abs = max(0.1, min(1.0, min_confirmation_threshold * 0.4))

        bullish = mean_by_coin[mean_by_coin >= zone_threshold_abs]
        bearish = mean_by_coin[mean_by_coin <= -zone_threshold_abs]

        long_zone_list = _as_pairs(bullish.sort_values(ascending=False))
        short_zone_list = _as_pairs(bearish.sort_values(ascending=True))

        print(f"Zones Found: {len(long_zone_list)} long, {len(short_zone_list)} short candidates (Threshold +/- {zone_threshold_abs:.3f}).")
        return long_zone_list, short_zone_list
    except Exception as e:
        print(f"Error finding zones: {e}\n{traceback.format_exc()}")
        return [], []  # Empty lists on error
def format_backtest_summary(backtest_results):
    """Turn a list of backtest result dicts into a display-ready DataFrame.

    Non-dict entries are ignored. Missing expected columns are created with a
    default (0 for the numeric metrics, 'N/A' for symbol/timeframe). Rows are
    sorted by symbol and then by the timeframe's position in
    ``TIMEFRAME_ORDER_MAP`` (unknown timeframes sort last), and the two P/L
    columns are renamed for display.

    Args:
        backtest_results: List of per-symbol/timeframe result dictionaries.

    Returns:
        DataFrame with the columns ``symbol, timeframe, trades, win_rate,
        pnl_abs_sum, pnl_%_sum_on_entry``. It is empty (columns only) when
        there is nothing to show or an error occurs (the error is printed).
    """
    display_cols = ['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']

    def _blank_summary():
        # Column-only frame so the UI table still renders its headers
        return pd.DataFrame(columns=display_cols)

    if not backtest_results:
        print("No backtest results to format.")
        return _blank_summary()

    try:
        # Guard against stray non-dict entries
        dict_rows = [entry for entry in backtest_results if isinstance(entry, dict)]
        if not dict_rows:
            print("No valid dictionary entries found in backtest results.")
            return _blank_summary()

        summary = pd.DataFrame(dict_rows)

        # Make sure every expected input column exists
        numeric_cols = ('trades', 'win_rate', 'pnl_sum', 'pnl_%_sum')
        for col in ('symbol', 'timeframe') + numeric_cols:
            if col in summary.columns:
                continue
            print(f"Warning: Backtest result missing column '{col}', filling default.")
            summary[col] = 0 if col in numeric_cols else 'N/A'

        # Order rows by symbol, then by timeframe rank (99 = unknown timeframe)
        summary['tf_order'] = summary['timeframe'].map(TIMEFRAME_ORDER_MAP).fillna(99)
        summary = summary.sort_values(by=['symbol', 'tf_order'])
        summary = summary.drop('tf_order', axis=1)

        # Friendlier column names for display
        summary = summary.rename(columns={'pnl_sum': 'pnl_abs_sum', 'pnl_%_sum': 'pnl_%_sum_on_entry'})

        # Keep only the display columns that are actually present, in order
        present = [col for col in display_cols if col in summary.columns]
        return summary[present]
    except Exception as e:
        print(f"Error formatting backtest summary: {e}\n{traceback.format_exc()}")
        return _blank_summary()
# --- Gradio App Definition ---
def create_gradio_app():
    """Build the Gradio Blocks UI and wire up its event handlers.

    Layout: a configuration column (exchange, number of coins, timeframes, run
    button, status log) beside a results column with tabs for the heatmap,
    active trade setups, long/short zones, per-coin details and the backtest
    summary. Results of the most recent run live in a ``gr.State`` dict so the
    dropdown and heatmap handlers can read them.

    Returns:
        The constructed ``gr.Blocks`` app (not launched).
    """
    active_signal_columns = ('Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2')
    zone_columns = ("Coin", "Avg Score")
    backtest_columns = ('symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry')

    def _empty_state():
        """Return a fresh, empty shared-state dict (new objects on every call)."""
        return {
            'analysis_results': {},  # {symbol: {tf: {signals, values, trade_params, direction}}}
            'heatmap_df': pd.DataFrame(),  # DataFrame for heatmap display values
            'heatmap_details': {},  # {Coin: {tf: {Ind: Val,...}}} for hover
            'backtest_results': [],  # List of backtest result dicts
            'active_signals_df': pd.DataFrame(),  # DataFrame of currently active signals
            'valid_timeframes': [],  # List of timeframes used in the last run
            'analyzer': None,  # Instance of the analyzer class
        }

    with gr.Blocks(theme=gr.themes.Soft(primary_hue=gr.themes.colors.blue), title="Crypto Signal & Backtest V3") as app:
        gr.Markdown("# Crypto Multi-Indicator, Multi-Timeframe Signal & Backtest V3")
        gr.Markdown(f"*Warning: Analysis uses up to **{LIMIT_PER_TIMEFRAME}** candles per timeframe and backtests on the last **{BACKTEST_HISTORY_CANDLES}**. This can be **very slow** and **API-intensive**. Rate limits may occur. Use fewer coins/timeframes for faster results. Signals are logged to **`{SIGNAL_LOG_FILE}`**.*")

        # State that carries results between interactions
        shared_state = gr.State(_empty_state())

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## Configuration")
                # List the exchanges ccxt knows about; fall back to the default on error
                try:
                    available_exchanges = ccxt.exchanges
                except Exception as e:
                    print(f"Warning: Could not fetch ccxt exchanges list: {e}")
                    available_exchanges = [DEFAULT_EXCHANGE_ID]  # Fallback
                exchange_input = gr.Dropdown(label="Exchange", choices=available_exchanges, value=DEFAULT_EXCHANGE_ID, interactive=True)
                top_n_input = gr.Slider(label="Number of Top Coins by Volume", minimum=5, maximum=100, step=5, value=DEFAULT_TOP_N_COINS, interactive=True)
                all_timeframes = list(TIMEFRAME_ORDER_MAP.keys())
                timeframe_input = gr.CheckboxGroup(label="Select Timeframes (Fewer = Faster)", choices=all_timeframes, value=DEFAULT_TIMEFRAMES, interactive=True)
                run_button = gr.Button("Run Analysis & Backtest", variant="primary")
                status_log = gr.Textbox(label="Status Log", lines=15, interactive=False, placeholder="Analysis logs will appear here...", max_lines=30)

            with gr.Column(scale=3):
                gr.Markdown("## Results")
                with gr.Tabs():
                    with gr.TabItem("Heatmap"):
                        gr.Markdown("Signal strength heatmap based on composite indicator score. Hover over cells for key values, click a cell for full details below.")
                        # gr.Plot renders the Plotly figure; its default label is hidden
                        heatmap_plot = gr.Plot(label="Signal Heatmap", show_label=False)
                        heatmap_detail_output = gr.Markdown(label="Clicked Cell Full Details", value="*Click on a heatmap cell after analysis to see full indicator details and trade setup.*")

                    with gr.TabItem("Active Trade Setups"):
                        gr.Markdown("### Potential Trade Setups (Current Snapshot)")
                        gr.Markdown(f"*Shows pairs and timeframes with a non-neutral signal direction and valid ATR-based parameters from the **latest** analyzed candle. Signals logged to `{SIGNAL_LOG_FILE}`. This is NOT financial advice. DYOR!*")
                        active_signals_table = gr.DataFrame(
                            label="Active Signals",
                            headers=list(active_signal_columns),
                            datatype=['str'] * 7,  # Treat all as strings for display consistency with precision
                            interactive=False,
                            row_count=(10, "dynamic"),  # Show ~10 rows, allow scroll
                            col_count=(7, "fixed"),
                            wrap=True
                        )

                    with gr.TabItem("Zones"):
                        gr.Markdown("### Potential Long/Short Zones")
                        # Same clamp formula find_zones applies, evaluated for the default setting
                        zone_threshold_display = max(0.1, min(1.0, DEFAULT_MIN_CONFIRMATION * 0.4))
                        gr.Markdown(f"*Coins with the highest/lowest average signal score across the selected timeframes (based on an average score threshold of ~**+/- {zone_threshold_display:.2f}**). Indicates potential broader trend alignment.*")
                        with gr.Row():
                            long_zone_output = gr.DataFrame(label="Potential Long Zone (Highest Avg Scores)", headers=list(zone_columns), col_count=(2, "fixed"), row_count=10)
                            short_zone_output = gr.DataFrame(label="Potential Short Zone (Lowest Avg Scores)", headers=list(zone_columns), col_count=(2, "fixed"), row_count=10)

                    with gr.TabItem("Full Coin Details"):
                        gr.Markdown("Select a coin analyzed in the heatmap to view its detailed indicator values, signals, and potential trade setup across all selected timeframes.")
                        coin_selector = gr.Dropdown(label="Select Coin to View All Timeframe Details", choices=[], interactive=False)  # Initially disabled
                        coin_detail_output = gr.Markdown(label="Detailed Indicator Values & Trade Setup per Timeframe", value="*Select a coin from the dropdown after analysis runs.*")

                    with gr.TabItem("Backtest Summary"):
                        gr.Markdown("### Simplified Backtest Results (ATR TP1/SL Strategy)")
                        gr.Markdown(f"*Note: Simulated on last **{BACKTEST_HISTORY_CANDLES}** candles per timeframe. Assumes entry on signal candle's open, exits on TP1/SL hit within the **next** candle's high/low. Includes estimated ~{SIMULATED_FEE_PERCENT*2:.2f}% round-trip fee. **This is a highly simplified simulation for indicative purposes only and NOT investment advice.** Results also saved to `{BACKTEST_RESULTS_FILE}`.*")
                        backtest_summary_df = gr.DataFrame(
                            label="Backtest Metrics per Symbol/Timeframe",
                            interactive=False,
                            wrap=True,
                            row_count=(15, "dynamic"),  # Show more rows
                            col_count=(6, "fixed")
                        )

        def _cleared_outputs(log_lines, detail_message, selector_label, state):
            """Build an update that blanks every result component.

            Used for the initial "running" update and for every failure path,
            so no pane is left showing a stale "Running analysis..." message.
            """
            return {
                status_log: "\n".join(log_lines),
                heatmap_plot: None,  # Clear plot
                heatmap_detail_output: detail_message,
                active_signals_table: pd.DataFrame(columns=list(active_signal_columns)),
                long_zone_output: pd.DataFrame(columns=list(zone_columns)),
                short_zone_output: pd.DataFrame(columns=list(zone_columns)),
                coin_selector: gr.Dropdown(choices=[], value=None, label=selector_label, interactive=False),
                coin_detail_output: detail_message,
                backtest_summary_df: pd.DataFrame(columns=list(backtest_columns)),
                shared_state: state,
            }

        # --- Event Handler: Run Button ---
        def analysis_process_wrapper(exchange, top_n, timeframes, current_state, progress=gr.Progress(track_tqdm=True)):
            """Run the analysis and stream UI updates.

            This is a generator: it yields dicts keyed by output component.
            The incoming ``current_state`` is discarded; every run starts from
            an empty state.
            """
            start_time = time.time()
            log = ["Initializing analysis..."]

            # Reset the state and blank the UI before doing any work
            current_state = _empty_state()
            yield _cleared_outputs(log, "Running analysis...", "Select Coin...", current_state)

            not_run_message = "Analysis did not run. Check the status log."
            try:
                # Validate inputs
                if not exchange:
                    log.append("Error: No exchange selected.")
                    yield _cleared_outputs(log, not_run_message, "No Coins Analyzed", current_state)
                    return
                if not timeframes:
                    log.append("Error: No timeframes selected.")
                    yield _cleared_outputs(log, not_run_message, "No Coins Analyzed", current_state)
                    return

                analyzer = CryptoTrendIndicator(exchange, int(top_n), timeframes)
                current_state['analyzer'] = analyzer  # Store analyzer instance
                if not analyzer.valid_timeframes:
                    log.append(f"Error: No valid timeframes found or supported for exchange '{exchange}'. Check selection or exchange capabilities.")
                    yield _cleared_outputs(log, not_run_message, "No Coins Analyzed", current_state)
                    return  # Stop processing

                log.append(f"Analyzer initialized for {exchange} | {top_n} coins | Timeframes: {analyzer.valid_timeframes}")
                log.append(f"Fetching up to {LIMIT_PER_TIMEFRAME} candles | Backtesting last {BACKTEST_HISTORY_CANDLES} candles.")
                log.append("Starting data fetch and analysis (this may take several minutes)...")
                yield {status_log: "\n".join(log)}  # Update log only

                # Run the full analysis, get all results
                analysis_results, heatmap_df, backtest_results, active_signals_df, heatmap_details, log_msgs = analyzer.run_full_analysis(progress=progress)
                log.extend(log_msgs)  # Add logs from the analysis process

                # Store the results, substituting empty containers for unexpected types
                current_state['analysis_results'] = analysis_results
                current_state['heatmap_df'] = heatmap_df if isinstance(heatmap_df, pd.DataFrame) else pd.DataFrame()
                current_state['heatmap_details'] = heatmap_details if isinstance(heatmap_details, dict) else {}
                current_state['backtest_results'] = backtest_results if isinstance(backtest_results, list) else []
                current_state['active_signals_df'] = active_signals_df if isinstance(active_signals_df, pd.DataFrame) else pd.DataFrame()
                current_state['valid_timeframes'] = analyzer.valid_timeframes

                # --- Prepare final UI updates ---
                # Heatmap & Coin Selector
                if not current_state['heatmap_df'].empty:
                    fig = create_plotly_heatmap(current_state['heatmap_df'], current_state['heatmap_details'])
                    coin_list = sorted(current_state['heatmap_df'].index.astype(str).tolist())
                    coin_selector_update = gr.Dropdown(choices=coin_list, value=None, label="Select Coin...", interactive=True)  # Enable dropdown
                    heatmap_detail_msg = "Click on a heatmap cell for specific details."
                else:
                    log.append("Warning: Heatmap data is empty after analysis.")
                    fig = go.Figure(layout=go.Layout(title="No heatmap data generated", height=300))  # Empty figure
                    coin_list = []
                    coin_selector_update = gr.Dropdown(choices=[], value=None, label="No Coins Analyzed", interactive=False)  # Keep disabled
                    heatmap_detail_msg = "No heatmap data generated. Check logs."

                # Zones: a single blank row is shown when a list is empty
                long_coins, short_coins = find_zones(current_state['heatmap_df'], DEFAULT_MIN_CONFIRMATION)
                long_df_data = long_coins if long_coins else [(" ", " ")]
                short_df_data = short_coins if short_coins else [(" ", " ")]

                # Backtest Summary
                bt_summary_display_df = format_backtest_summary(current_state['backtest_results'])
                if bt_summary_display_df.empty:
                    bt_summary_display_df = pd.DataFrame([{'symbol': 'No results', 'timeframe': '', 'trades': 0, 'win_rate': 0, 'pnl_abs_sum': 0, 'pnl_%_sum_on_entry': 0}])

                # Active Signals
                active_signals_display_df = current_state['active_signals_df']
                if active_signals_display_df.empty:
                    active_signals_display_df = pd.DataFrame([{'Symbol': 'No active signals', 'Timeframe': '', 'Direction': '', 'Entry': '', 'SL': '', 'TP1': '', 'TP2': ''}])

                end_time = time.time()
                log.append(f"Analysis & Backtest finished in {end_time - start_time:.2f} seconds.")

                yield {
                    status_log: "\n".join(log),
                    heatmap_plot: fig,
                    heatmap_detail_output: heatmap_detail_msg,
                    active_signals_table: active_signals_display_df,
                    long_zone_output: gr.DataFrame(value=long_df_data, headers=list(zone_columns)),
                    short_zone_output: gr.DataFrame(value=short_df_data, headers=list(zone_columns)),
                    coin_selector: coin_selector_update,
                    coin_detail_output: "Select a coin from the dropdown above." if coin_list else "No analysis results.",
                    backtest_summary_df: bt_summary_display_df,
                    shared_state: current_state,  # IMPORTANT: publish the results to the state
                }
            except Exception as e:
                # ValueError is reported as a configuration problem, anything
                # else as a fatal error. Both reset the state and blank the UI.
                if isinstance(e, ValueError):
                    log.append("--- CONFIGURATION ERROR ---")
                else:
                    log.append("--- FATAL ERROR DURING ANALYSIS ---")
                error_details = f"{str(e)}\n{traceback.format_exc()}"
                log.append(error_details)
                print(error_details)
                yield _cleared_outputs(log, f"Analysis failed. Check logs.\nError: {e}", "Error", _empty_state())

        run_button.click(
            fn=analysis_process_wrapper,
            inputs=[exchange_input, top_n_input, timeframe_input, shared_state],
            outputs=[  # Every component the handler may update
                status_log, heatmap_plot, heatmap_detail_output, active_signals_table,
                long_zone_output, short_zone_output, coin_selector, coin_detail_output,
                backtest_summary_df, shared_state
            ]
        )

        # --- Event Handler: Coin Dropdown Change ---
        def display_full_details_handler(selected_coin, current_state):
            """Render the all-timeframe report for the coin chosen in the dropdown."""
            if not selected_coin:
                return "Select a coin from the dropdown."
            # Both the results and the timeframe list must be present and non-empty
            if not isinstance(current_state, dict) or not current_state.get('analysis_results') or not current_state.get('valid_timeframes'):
                print("Debug Coin Select: State invalid or missing data.")
                return "Run analysis first or analysis data is missing/incomplete in state."

            analysis_results = current_state['analysis_results']
            valid_tfs = current_state['valid_timeframes']

            # Map the selected base coin (e.g. BTC) to the first result key of
            # the form "<coin>/..." (e.g. BTC/USDT)
            prefix = str(selected_coin) + '/'
            full_symbol = next(
                (key for key in analysis_results if str(key).startswith(prefix)),
                None,
            )
            if not full_symbol:
                print(f"Debug Coin Select: Full symbol not found for base {selected_coin}")
                return f"Details not found for {selected_coin} in the current analysis results."

            return format_coin_details(full_symbol, analysis_results, valid_tfs)

        coin_selector.change(
            fn=display_full_details_handler,
            inputs=[coin_selector, shared_state],
            outputs=[coin_detail_output]
        )

        # --- Event Handler: Heatmap ---
        # NOTE(review): this is wired to the plot's `change` event, which is not
        # a click event; it presumably also fires when the run handler replaces
        # the figure. Confirm that format_heatmap_click_details copes with being
        # invoked that way.
        heatmap_plot.change(
            fn=format_heatmap_click_details,
            inputs=[shared_state],  # Pass the whole state
            outputs=[heatmap_detail_output]  # Markdown component below the heatmap
        )

    return app
# --- Main Execution ---
if __name__ == "__main__":
    # Script entry point: print a startup banner, then build and launch the app.
    divider = "------------------------------------------------------"

    print("\n--- Crypto Analysis App V3 ---")

    # Tell the user whether the results/log files already exist
    for fpath in (BACKTEST_RESULTS_FILE, SIGNAL_LOG_FILE):
        if not os.path.exists(fpath):
            print(f"INFO: Results/Logs will be saved to '{fpath}' after analysis.")
            continue
        print(f"INFO: Existing file found: '{fpath}'. It may be appended to or overwritten on the next run.")

    startup_lines = (
        "\nStarting Crypto Analysis Gradio App...",
        divider,
        f"CONFIG: Backtest Candles={BACKTEST_HISTORY_CANDLES}, Fetch Limit={LIMIT_PER_TIMEFRAME}",
        f"CONFIG: Default Exchange={DEFAULT_EXCHANGE_ID}, Top Coins={DEFAULT_TOP_N_COINS}, Timeframes={DEFAULT_TIMEFRAMES}",
        "WARNING: Initial analysis might be slow due to extensive data fetching and calculations.",
        "Ensure you have required libraries: pandas, numpy, ccxt, ta, plotly, gradio",
        divider,
    )
    for line in startup_lines:
        print(line)

    gradio_app = create_gradio_app()
    # debug=False for production/sharing; debug=True surfaces errors during development.
    # share=True would create a temporary public link (use with caution).
    # Raising max_threads can help CPU-bound analysis, but mind API rate limits.
    # queue() is enabled so long-running analysis requests are handled properly.
    gradio_app.queue().launch(debug=False, max_threads=4)