"""Gradio app that shortlists bullish and bearish tickers from a CSV upload.

For every ticker in the first column of the uploaded CSV, one year of daily
history is downloaded with yfinance, EMA(20/50/100), SMA(150) and the
stochastic oscillator are computed with pandas_ta, and the ticker is kept when
the moving averages are "stacked" over the look-back window and the latest
stochastic %K is beyond the chosen threshold. Optionally the swing structure
(higher highs + higher lows, or lower highs + lower lows) must agree.
"""

import csv
import os
from collections import deque

import gradio as gr
import numpy as np
import pandas as pd
import pandas_ta as ta  # noqa: F401 - importing registers the DataFrame ``.ta`` accessor
import yfinance as yf
from gradio.components import File as InputFile, Number as InputNumber, Checkbox as InputCheckbox
from gradio.components import File as OutputFile
from scipy.signal import argrelextrema
from scipy.signal import find_peaks  # noqa: F401 - kept: other code may rely on it

# Bounds advertised by the look-back input label ("min 5 and max 60").
MIN_LOOKBACK = 5
MAX_LOOKBACK = 60

# Moving-average columns appended to the history frame, fastest first.
_MA_COLUMNS = ('EMA_20', 'EMA_50', 'EMA_100', 'SMA_150')
# %K column of the stochastic frame read by the screener.
_STOCH_K_COLUMN = 'STOCHk_14_3_3'
_RESULT_COLUMNS = ['Ticker', 'Close', 'Volume', 'Trend']


def _consecutive_extrema(data, comparator, breaks_run, order, K):
    """Collect runs of ``K`` consecutive local extrema that keep one direction.

    Args:
        data: 1-D sequence of values (converted with ``np.asarray``).
        comparator: ``np.less`` to locate local minima, ``np.greater`` for
            local maxima (passed straight to ``argrelextrema``).
        breaks_run: Callable ``(current, previous) -> bool``; when true the
            run collected so far is discarded before the current extremum is
            added.
        order: Number of points on each side used by ``argrelextrema``.
        K: Number of consecutive extrema that make up one run.

    Returns:
        A list of ``deque`` objects, each holding the ``K`` indices (into
        ``data``) of one qualifying run, in order of occurrence.
    """
    data = np.asarray(data)
    extrema_idx = argrelextrema(data, comparator, order=order)[0]
    extrema_values = data[extrema_idx]

    runs = []
    window = deque(maxlen=K)
    for i, idx in enumerate(extrema_idx):
        if i == 0:
            # The first extremum has no predecessor to compare against.
            window.append(idx)
            continue
        if breaks_run(extrema_values[i], extrema_values[i - 1]):
            window.clear()
        window.append(idx)
        if len(window) == K:
            runs.append(window.copy())
    return runs


def getHigherLows(data: np.ndarray, order=5, K=2):
    """Return runs of ``K`` consecutive local lows that do not fall."""
    return _consecutive_extrema(data, np.less, np.less, order, K)


def getLowerHighs(data: np.ndarray, order=5, K=2):
    """Return runs of ``K`` consecutive local highs that do not rise."""
    return _consecutive_extrema(data, np.greater, np.greater, order, K)


def getHigherHighs(data: np.ndarray, order=5, K=2):
    """Return runs of ``K`` consecutive local highs that do not fall.

    ``order`` is now forwarded to the extrema search; previously it was
    accepted but a fixed value of 5 was always used.
    """
    return _consecutive_extrema(data, np.greater, np.less, order, K)


def getLowerLows(data: np.ndarray, order=5, K=2):
    """Return runs of ``K`` consecutive local lows that do not rise."""
    return _consecutive_extrema(data, np.less, np.greater, order, K)


def _latest_wins(candidate, rival):
    """Return True when ``candidate`` has a run ending after ``rival``'s last run.

    An empty ``candidate`` never wins; an empty ``rival`` always loses to a
    non-empty ``candidate``.
    """
    if not candidate:
        return False
    if not rival:
        return True
    return candidate[-1][-1] > rival[-1][-1]


def check_trend(hist, order=5, K=2):
    """Classify the most recent swing structure of ``hist['Close']``.

    Args:
        hist: DataFrame with a ``'Close'`` column.
        order: Neighbourhood size used to detect local extrema.
        K: Number of consecutive extrema required for a swing run.

    Returns:
        ``(True, 'bullish')`` when the most recent top run is a higher-high
        run and the most recent bottom run is a higher-low run;
        ``(True, 'bearish')`` when they are a lower-high run and a lower-low
        run; ``(False, 'uncertain')`` otherwise, including when no swing
        runs were found at all.
    """
    close = hist['Close'].values

    hh = getHigherHighs(close, order, K)
    hl = getHigherLows(close, order, K)
    ll = getLowerLows(close, order, K)
    lh = getLowerHighs(close, order, K)

    # Decide with explicit flags instead of comparing the run lists with
    # ``==``: two empty lists compare equal, which used to report a trend
    # even though no swings existed.
    top_is_hh = _latest_wins(hh, lh)
    top_is_lh = not top_is_hh and bool(lh)
    bottom_is_hl = _latest_wins(hl, ll)
    bottom_is_ll = not bottom_is_hl and bool(ll)

    if top_is_hh and bottom_is_hl:
        return True, 'bullish'
    if top_is_lh and bottom_is_ll:
        return True, 'bearish'
    return False, 'uncertain'


def _read_tickers(csv_path):
    """Return the non-empty first-column values of ``csv_path``, header skipped."""
    with open(csv_path, 'r', newline='') as file:
        reader = csv.reader(file)
        next(reader, None)  # Skip the header; tolerate a completely empty file.
        return [row[0].strip() for row in reader if row and row[0].strip()]


def _is_stacked(hist, lookback, bullish):
    """Check that the moving averages are strictly ordered over the window.

    For ``bullish=True`` every faster average must be above the next slower
    one on each of the last ``lookback`` rows; for ``bullish=False`` below.
    Rows containing NaN fail the comparison.
    """
    recent = hist.loc[:, list(_MA_COLUMNS)].tail(lookback)
    for fast, slow in zip(_MA_COLUMNS, _MA_COLUMNS[1:]):
        if bullish:
            ordered = recent[fast] > recent[slow]
        else:
            ordered = recent[fast] < recent[slow]
        if not ordered.all():
            return False
    return True


def _classify(hist, stoch, lookback, bullish_stoch_value, bearish_stoch_value,
              check_bullish_swing, check_bearish_swing):
    """Return ``'Bullish'``, ``'Bearish'`` or ``None`` for one ticker."""
    last_k = stoch[_STOCH_K_COLUMN].iloc[-1]

    # The swing analysis is only run when one of the checkboxes asks for it.
    trend_type = None
    if check_bullish_swing or check_bearish_swing:
        _, trend_type = check_trend(hist)

    if (_is_stacked(hist, lookback, bullish=True)
            and last_k <= bullish_stoch_value
            and (not check_bullish_swing or trend_type == 'bullish')):
        return 'Bullish'
    if (_is_stacked(hist, lookback, bullish=False)
            and last_k >= bearish_stoch_value
            and (not check_bearish_swing or trend_type == 'bearish')):
        return 'Bearish'
    return None


def _write_results(rows, base_filename, suffix):
    """Write ``rows`` to ``<base>-<suffix>.xlsx`` and the tickers to ``.txt``.

    Returns:
        ``(xlsx_path, txt_path)``, both relative to the working directory.
    """
    frame = pd.DataFrame(rows, columns=_RESULT_COLUMNS)
    xlsx_path = f'{base_filename}-{suffix}.xlsx'
    txt_path = f'{base_filename}-{suffix}.txt'
    frame.to_excel(xlsx_path, index=False)
    with open(txt_path, 'w') as f:
        f.writelines(f"{ticker}\n" for ticker in frame['Ticker'])
    return xlsx_path, txt_path


def process_csv(csv_file, lookback, bullish_stoch_value, bearish_stoch_value, check_bullish_swing, check_bearish_swing):
    """Screen the tickers of an uploaded CSV and write the shortlists to disk.

    Args:
        csv_file: Uploaded file. Either an object exposing the path as
            ``.name`` or the path itself as a string.
        lookback: Number of most recent rows over which the moving averages
            must be stacked; clamped to ``MIN_LOOKBACK``..``MAX_LOOKBACK``.
        bullish_stoch_value: Latest %K must be at or below this for bullish.
        bearish_stoch_value: Latest %K must be at or above this for bearish.
        check_bullish_swing: Also require a bullish result from
            ``check_trend`` for bullish candidates.
        check_bearish_swing: Also require a bearish result from
            ``check_trend`` for bearish candidates.

    Returns:
        Paths of the bullish XLSX, bearish XLSX, bullish TXT and bearish TXT
        files, in that order.
    """
    lookback = max(MIN_LOOKBACK, min(MAX_LOOKBACK, int(lookback)))
    # NOTE(review): some Gradio versions hand over a plain path string rather
    # than a file wrapper - both are accepted here; confirm against the
    # installed version.
    csv_path = getattr(csv_file, 'name', csv_file)

    all_tickers = _read_tickers(csv_path)
    ttl_tickers = len(all_tickers)
    rows = {'Bullish': [], 'Bearish': []}

    for position, ticker in enumerate(all_tickers, start=1):
        print(f"Processing {ticker} ({position}/{ttl_tickers})")
        # Best-effort per ticker: one failing download or computation must not
        # abort the whole batch, so the failure is reported and skipped.
        try:
            hist = yf.Ticker(ticker).history(period="1y", actions=False)
            if hist.empty:
                continue

            hist.ta.ema(close='Close', length=20, append=True)
            hist.ta.ema(close='Close', length=50, append=True)
            hist.ta.ema(close='Close', length=100, append=True)
            hist.ta.sma(close='Close', length=150, append=True)
            stoch = hist.ta.stoch(high='High', low='Low', close='Close')

            # Presumably an indicator is omitted when there are too few bars;
            # the guard only checks that everything needed is present.
            if stoch is None or any(col not in hist.columns for col in _MA_COLUMNS):
                print(f"Skipping {ticker}: indicators unavailable (not enough history?)")
                continue

            label = _classify(hist, stoch, lookback, bullish_stoch_value, bearish_stoch_value,
                              check_bullish_swing, check_bearish_swing)
            if label is not None:
                rows[label].append([ticker, hist['Close'].iloc[-1], hist['Volume'].iloc[-1], label])
        except Exception as e:
            print(f"An error occurred with ticker {ticker}: {e}")

    base_filename = os.path.splitext(os.path.basename(csv_path))[0]
    output_bullish_xlsx, output_bullish_txt = _write_results(rows['Bullish'], base_filename, 'bullish')
    output_bearish_xlsx, output_bearish_txt = _write_results(rows['Bearish'], base_filename, 'bearish')

    return output_bullish_xlsx, output_bearish_xlsx, output_bullish_txt, output_bearish_txt


iface = gr.Interface(
    fn=process_csv,
    inputs=[
        InputFile(label="Upload CSV"),
        InputNumber(label="EMA Loockback period (days, min 5 and max 60)", value=20),
        InputNumber(label="Bullish Stochastic Value (equal or below)", value=30),
        InputNumber(label="Bearish Stochastic Value (equal or above)", value=70),
        InputCheckbox(label="Check Bullish Swing (HH + HL) (slower, in beta)"),
        InputCheckbox(label="Check Bearish Swing (LH + LL) (slower, in beta)")
    ],
    outputs=[
        OutputFile(label="Download Bullish XLSX"),
        OutputFile(label="Download Bearish XLSX"),
        OutputFile(label="Download Bullish TXT"),
        OutputFile(label="Download Bearish TXT")
    ],
    title="Stock Analysis",
    description="""Upload a CSV file with a column named 'Ticker' (from TradingView or other) and get a filtered shortlist of bullish and bearish stocks in return. The tool will find 'stacked' EMAs + SMAs and check for Stochastics above or below the set values."""
)

iface.launch()