import os
import copy

import gradio as gr
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, Dataset

FEATURE_COLS = ['open', 'high', 'low', 'close', 'volume', 'oi']


# Define the Dataset class: each item is a window of `seq_len` rows of
# features plus the target value of the row *after* the window, so the model
# learns next-step prediction instead of echoing its own input.
class BankNiftyDataset(Dataset):
    def __init__(self, data, seq_len, target_cols=('close',)):
        self.data = data
        self.seq_len = seq_len
        self.target_cols = list(target_cols)

    def __len__(self):
        # One sample per window that still has a "next" row to predict
        return max(0, len(self.data) - self.seq_len)

    def __getitem__(self, idx):
        seq_data = self.data.iloc[idx:idx + self.seq_len]
        features = torch.tensor(seq_data[FEATURE_COLS].values, dtype=torch.float32)
        # Label comes from the row after the window; taking the window's own
        # last row would leak the target into the inputs.
        label = torch.tensor(
            self.data[self.target_cols].iloc[idx + self.seq_len].values,
            dtype=torch.float32,
        )
        return features, label


# Define the LSTM model
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2, dropout=0.1):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers,
                            batch_first=True, dropout=dropout)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, output_dim),
        )

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        # Regress from the hidden state of the last time step
        return self.fc(lstm_out[:, -1, :])


# Function to train the model, keeping the best checkpoint by validation loss
def train_model(model, train_loader, val_loader, num_epochs=10):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    best_val_loss = float('inf')
    # A shallow .copy() of state_dict() would alias the live tensors, so the
    # "best" weights would keep changing as training continues; deep-copy them.
    best_state = copy.deepcopy(model.state_dict())

    for epoch in range(num_epochs):
        model.train()
        for features, labels in train_loader:
            optimizer.zero_grad()
            loss = criterion(model(features), labels)
            loss.backward()
            optimizer.step()

        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for features, labels in val_loader:
                val_loss += criterion(model(features), labels).item()
        val_loss /= max(1, len(val_loader))
        print(f"Epoch {epoch + 1}/{num_epochs}, Validation Loss: {val_loss:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_state = copy.deepcopy(model.state_dict())

    model.load_state_dict(best_state)
    return model, best_val_loss


# Function to generate trading signals: if the prediction sits more than
# `signal_threshold` above/below the actual value, buy a call/put; else hold.
# (The parameter was previously named stop_loss_threshold, but it acts as an
# entry threshold, not a stop loss.)
def generate_signals(predictions, actual_values, signal_threshold=0.05):
    signals = []
    for pred, actual in zip(predictions, actual_values):
        if pred > actual * (1 + signal_threshold):
            signals.append("Buy CE")   # model expects a move up: buy a call
        elif pred < actual * (1 - signal_threshold):
            signals.append("Buy PE")   # model expects a move down: buy a put
        else:
            signals.append("Hold")
    return signals


# Function to generate a plain-text report. Note: all values are in
# standardized (scaled) units, since the model trains on StandardScaler
# output, and the "profit" is a rough proxy, not a real option P&L.
def generate_report(predictions, actual_values, signals, val_loss):
    report = []
    cumulative_profit = 0.0
    for signal, actual, pred in zip(signals, actual_values, predictions):
        profit = actual - pred
        if signal == "Buy CE":
            cumulative_profit += profit
        elif signal == "Buy PE":
            cumulative_profit -= profit
        report.append(f"Signal: {signal}, Actual: {actual:.2f}, "
                      f"Predicted: {pred:.2f}, Profit: {profit:.2f}")
    report.append(f"Total Profit: {cumulative_profit:.2f}")
    report.append(f"Model Validation Loss: {val_loss:.4f}")
    return "\n".join(report)
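
# Quick sanity check of generate_signals above (hypothetical numbers): with
# the default 5% threshold and a flat actual of 100, a prediction of 110
# clears 100 * 1.05 and yields "Buy CE", 90 falls below 100 * 0.95 and yields
# "Buy PE", and 100 lands inside the band, so "Hold":
#
#   generate_signals([110, 90, 100], [100, 100, 100])
#   -> ['Buy CE', 'Buy PE', 'Hold']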

# Module-level cache so the model and scaler persist across Gradio calls
global_model = None
global_scaler = None


# Function to process data and make predictions
def predict():
    global global_model, global_scaler

    # Load the pre-existing CSV file
    csv_path = 'BANKNIFTY_OPTION_CHAIN_data.csv'
    if not os.path.exists(csv_path):
        return "Error: CSV file not found in the expected location."
    data = pd.read_csv(csv_path)

    # Split chronologically: a shuffled split would scramble the time order
    # that the sliding windows depend on.
    train_data, val_data = train_test_split(data, test_size=0.2, shuffle=False)
    train_data = train_data.reset_index(drop=True).copy()
    val_data = val_data.reset_index(drop=True).copy()

    # Fit the scaler on training data only, so validation statistics do not
    # leak into preprocessing; reuse the fitted scaler on subsequent runs.
    if global_scaler is None:
        global_scaler = StandardScaler()
        global_scaler.fit(train_data[FEATURE_COLS])
    train_data[FEATURE_COLS] = global_scaler.transform(train_data[FEATURE_COLS])
    val_data[FEATURE_COLS] = global_scaler.transform(val_data[FEATURE_COLS])

    # Create datasets and dataloaders
    seq_len = 20
    target_cols = ['close']
    train_dataset = BankNiftyDataset(train_data, seq_len, target_cols)
    val_dataset = BankNiftyDataset(val_data, seq_len, target_cols)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    # Initialize (or reuse) and train the model
    input_dim = len(FEATURE_COLS)
    hidden_dim = 64
    output_dim = len(target_cols)
    if global_model is None:
        global_model = LSTMModel(input_dim, hidden_dim, output_dim)
    global_model, val_loss = train_model(global_model, train_loader, val_loader)

    # Predict each validation window's next close; the matching actuals start
    # one full window into the validation set.
    global_model.eval()
    predictions = []
    actual_values = val_data['close'].values[seq_len:]
    with torch.no_grad():
        for i in range(len(val_dataset)):
            features, _ = val_dataset[i]
            predictions.append(global_model(features.unsqueeze(0)).item())

    # Generate signals and the report
    signals = generate_signals(predictions, actual_values)
    return generate_report(predictions, actual_values, signals, val_loss)


# Set up the Gradio interface
iface = gr.Interface(
    fn=predict,
    inputs=None,
    outputs=gr.Textbox(label="Prediction Report"),
    title="BankNifty Option Chain Predictor",
    description=(
        "Click 'Submit' to generate predictions and trading signals based on "
        "the latest BankNifty option chain data. The model is automatically "
        "trained and improved with each run."
    ),
)

# Launch the app
iface.launch()
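

# --- Optional: map scaled predictions back to price units ---
# A minimal sketch, not part of the app above: `unscale_close` is a
# hypothetical helper. It assumes the scaler was fit on the six FEATURE_COLS
# in order, so 'close' is column index 3. Because StandardScaler scales each
# column independently, inverting a single column this way is exact. To show
# prices in the report, define this above predict() and apply it to
# `predictions` and `actual_values` before formatting.
def unscale_close(scaled_close, scaler, close_idx=FEATURE_COLS.index('close')):
    # Build a dummy row, place the scaled value in the 'close' column, and
    # invert; the other columns come back as their training means and are
    # discarded.
    row = np.zeros((1, scaler.n_features_in_))
    row[0, close_idx] = scaled_close
    return scaler.inverse_transform(row)[0, close_idx]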