import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import gradio as gr
import os
import copy
# Define the Dataset class
class BankNiftyDataset(Dataset):
    def __init__(self, data, seq_len, target_cols=['close']):
        self.data = data
        self.seq_len = seq_len
        self.target_cols = target_cols

    def __len__(self):
        return max(0, len(self.data) - self.seq_len + 1)

    def __getitem__(self, idx):
        # A window of seq_len consecutive rows forms one sample.
        seq_data = self.data.iloc[idx:idx + self.seq_len]
        features = torch.tensor(seq_data[['open', 'high', 'low', 'close', 'volume', 'oi']].values, dtype=torch.float32)
        # The label is the target column(s) of the window's last row.
        label = torch.tensor(seq_data[self.target_cols].iloc[-1].values, dtype=torch.float32)
        return features, label
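
# Illustrative usage sketch (not executed; assumes a DataFrame `df` holding the
# six columns used above): each item is a (seq_len, 6) feature window plus the
# scaled close of the window's last row as the label.
#   ds = BankNiftyDataset(df, seq_len=20)
#   x, y = ds[0]
#   x.shape  # torch.Size([20, 6])
#   y.shape  # torch.Size([1])
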
# Define the LSTM model
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2, dropout=0.1):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=dropout)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, output_dim)
        )

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        # Feed only the last timestep's hidden state to the prediction head.
        out = self.fc(lstm_out[:, -1, :])
        return out
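
# Minimal shape check, kept behind an (arbitrarily named) environment flag so it
# never runs inside the Gradio app: a batch of windows (batch, seq_len, input_dim)
# should map to one prediction per window (batch, output_dim).
if os.environ.get("BANKNIFTY_DEBUG_SHAPES"):
    _check_model = LSTMModel(input_dim=6, hidden_dim=64, output_dim=1)
    _check_out = _check_model(torch.randn(4, 20, 6))
    assert _check_out.shape == (4, 1), _check_out.shape
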
# Function to train the model
def train_model(model, train_loader, val_loader, num_epochs=10):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    best_val_loss = float('inf')
    best_state = None
    for epoch in range(num_epochs):
        model.train()
        for features, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        # Evaluate on the validation set after each epoch.
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for features, labels in val_loader:
                outputs = model(features)
                val_loss += criterion(outputs, labels).item()
        val_loss /= len(val_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}")
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # Deep-copy the weights; a shallow .copy() would keep references to
            # tensors that later optimizer steps overwrite in place.
            best_state = copy.deepcopy(model.state_dict())
    # Restore the best validation checkpoint before returning.
    if best_state is not None:
        model.load_state_dict(best_state)
    return model, best_val_loss
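
# Example call, as a sketch (loaders as built in predict() below); the best
# validation checkpoint is restored before the model is returned:
#   model, best_loss = train_model(model, train_loader, val_loader, num_epochs=10)
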
# Function to generate trading signals
def generate_signals(predictions, actual_values, stop_loss_threshold=0.05):
    signals = []
    for pred, actual in zip(predictions, actual_values):
        if pred > actual * (1 + stop_loss_threshold):
            signals.append("Buy CE")
        elif pred < actual * (1 - stop_loss_threshold):
            signals.append("Buy PE")
        else:
            signals.append("Hold")
    return signals
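
# Worked example of the signal rule with the default 5% threshold (values are in
# standardized units here, so the numbers are purely illustrative):
#   pred=1.10, actual=1.00 -> 1.10 > 1.05 -> "Buy CE"
#   pred=0.90, actual=1.00 -> 0.90 < 0.95 -> "Buy PE"
#   pred=1.02, actual=1.00 -> otherwise   -> "Hold"
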
# Function to generate a report
def generate_report(predictions, actual_values, signals, val_loss):
    report = []
    cumulative_profit = 0
    for i in range(len(signals)):
        signal = signals[i]
        profit = actual_values[i] - predictions[i]
        if signal == "Buy CE":
            cumulative_profit += profit
        elif signal == "Buy PE":
            cumulative_profit -= profit
        report.append(f"Signal: {signal}, Actual: {actual_values[i]:.2f}, Predicted: {predictions[i]:.2f}, Profit: {profit:.2f}")
    report.append(f"Total Profit: {cumulative_profit:.2f}")
    report.append(f"Model Validation Loss: {val_loss:.4f}")
    return "\n".join(report)
# Global variables to store the model and scaler
global_model = None
global_scaler = None
# Function to process data and make predictions
def predict():
    global global_model, global_scaler
    # Load the pre-existing CSV file
    csv_path = 'BANKNIFTY_OPTION_CHAIN_data.csv'
    if not os.path.exists(csv_path):
        return "Error: CSV file not found in the expected location."
    # Load and preprocess data
    data = pd.read_csv(csv_path)
    feature_cols = ['open', 'high', 'low', 'close', 'volume', 'oi']
    # Fit the scaler on the first call, then reuse it on later calls.
    if global_scaler is None:
        global_scaler = StandardScaler()
        scaled_data = global_scaler.fit_transform(data[feature_cols])
    else:
        scaled_data = global_scaler.transform(data[feature_cols])
    data[feature_cols] = scaled_data
    # Split chronologically so each sliding window stays contiguous in time.
    train_data, val_data = train_test_split(data, test_size=0.2, shuffle=False)
    # Create datasets and dataloaders
    seq_len = 20
    target_cols = ['close']
    train_dataset = BankNiftyDataset(train_data, seq_len, target_cols)
    val_dataset = BankNiftyDataset(val_data, seq_len, target_cols)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    # Initialize the model on the first call; retrain it on every call.
    input_dim = len(feature_cols)
    hidden_dim = 64
    output_dim = len(target_cols)
    if global_model is None:
        global_model = LSTMModel(input_dim, hidden_dim, output_dim)
    global_model, val_loss = train_model(global_model, train_loader, val_loader)
    # Make predictions on the validation windows
    global_model.eval()
    predictions = []
    actual_values = val_data['close'].values[seq_len - 1:]
    with torch.no_grad():
        for i in range(len(val_dataset)):
            features, _ = val_dataset[i]
            pred = global_model(features.unsqueeze(0)).item()
            predictions.append(pred)
    # Generate signals and report
    signals = generate_signals(predictions, actual_values)
    report = generate_report(predictions, actual_values, signals, val_loss)
    return report
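
# Note: the predictions and actuals above are in standardized (z-score) units.
# A sketch for mapping a scaled close back to price units with the fitted scaler
# ('close' is column index 3 of the six scaled columns):
#   close_price = scaled_close * global_scaler.scale_[3] + global_scaler.mean_[3]
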
# Set up the Gradio interface
iface = gr.Interface(
    fn=predict,
    inputs=None,
    outputs=gr.Textbox(label="Prediction Report"),
    title="BankNifty Option Chain Predictor",
    description="Click 'Submit' to generate predictions and trading signals from the bundled BankNifty option chain data. The model is retrained on each run, continuing from its previous weights."
)

# Launch the app
iface.launch()