# mohitkumarrajbadi's picture
# New Improvement in Pages
# e37cfd0
import streamlit as st
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import matplotlib.pyplot as plt
import time
import json
import re
import os
import asyncio
from dotenv import load_dotenv
from scipy.stats import skew, kurtosis, zscore
import llama_cpp
# -------------------------------
# Environment and Token Management
# -------------------------------
# Load environment variables from a local .env file (used in local development)
# so that os.getenv("HF_TOKEN") in get_hf_token() can see them.
load_dotenv()
def get_hf_token():
    """
    Return the Hugging Face access token, or None when none is configured.

    The HF_TOKEN environment variable takes priority. Streamlit secrets are
    consulted only when that variable is unset or empty. If neither source
    provides a token, an error is shown in the Streamlit UI and None is
    returned.
    """
    env_token = os.getenv("HF_TOKEN")
    if env_token:
        return env_token

    # Nothing usable in the environment: fall back to Streamlit secrets.
    try:
        return st.secrets["HF_TOKEN"]
    except (FileNotFoundError, KeyError):
        st.error("❌ HF_TOKEN not found. Add it to .env or secrets.toml.")
        return None
# -------------------------------
# Model Loading and Management
# -------------------------------
async def async_load(model_id: str):
    """
    Placeholder coroutine used to spin up an event loop.

    Sleeps for a tenth of a second and returns None. `model_id` is accepted
    but not used.
    """
    warmup_delay = 0.1  # seconds
    await asyncio.sleep(warmup_delay)
@st.cache_resource
def load_model(model_id: str, token: str, checkpoint_path: str = None):
    """
    Load the tokenizer and causal-LM model for `model_id`, cached by Streamlit.

    Args:
        model_id (str): The Hugging Face model ID.
        token (str): The authentication token passed to `from_pretrained`.
        checkpoint_path (str): Optional path to a fine-tuned state dict. It is
            applied only when the path is non-empty and exists on disk.
    Returns:
        tuple: (tokenizer, model), or (None, None) if anything fails. The
        failure is also reported in the Streamlit UI.
    """
    # NOTE(review): the function is wrapped in st.cache_resource, so the
    # (None, None) failure result is presumably cached too — confirm.
    try:
        asyncio.run(async_load(model_id))

        auth_kwargs = {"token": token}
        tokenizer = AutoTokenizer.from_pretrained(model_id, **auth_kwargs)
        model = AutoModelForCausalLM.from_pretrained(model_id, **auth_kwargs)

        # Overlay fine-tuned weights when a checkpoint file is available.
        has_checkpoint = bool(checkpoint_path) and os.path.exists(checkpoint_path)
        if has_checkpoint:
            # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
            state_dict = torch.load(checkpoint_path, map_location=torch.device('cpu'))
            model.load_state_dict(state_dict)
            model.eval()
            st.success("βœ… Fine-tuned model loaded successfully!")

        return tokenizer, model
    except Exception as exc:
        st.error(f"❌ Model loading failed: {exc}")
        return None, None
# -------------------------------
# Model Saving Function
# -------------------------------
def save_model(model, model_name: str):
    """
    Save the model's state dict to `model_name`.

    Args:
        model (torch.nn.Module): The PyTorch model instance.
        model_name (str): The file path to save the model to. A bare filename
            (no directory component) is saved in the current directory.
    Returns:
        str: The path where the model was saved, or None if saving failed
        (the failure is reported in the Streamlit UI).
    """
    try:
        # os.path.dirname() returns "" for a bare filename, and
        # os.makedirs("") raises, so only create a directory when one is named.
        target_dir = os.path.dirname(model_name)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        # Only the weights are stored, not the full pickled module.
        torch.save(model.state_dict(), model_name)
        st.success(f"βœ… Model saved successfully at `{model_name}`")
        return model_name
    except Exception as e:
        st.error(f"❌ Failed to save model: {e}")
        return None
# -------------------------------
# File Processing and Cleaning
# -------------------------------
def preprocess_data(uploaded_file, file_extension):
    """
    Read an uploaded file and return its parsed content.

    Supports CSV, JSONL, and TXT. The extension is matched case-insensitively
    and a leading dot is ignored ("csv", "CSV" and ".csv" are equivalent).

    Args:
        uploaded_file: A binary file-like object (must support `read` /
            `readlines`).
        file_extension (str): The file's extension.
    Returns:
        - CSV: a pandas DataFrame.
        - JSONL: a DataFrame of the records, or the raw list of parsed
          records if they cannot be tabulated.
        - TXT: a list of lines decoded as UTF-8.
        - None for an unsupported extension, or when reading fails (the
          failure is reported in the Streamlit UI).
    """
    kind = str(file_extension or "").lower().lstrip(".")
    try:
        if kind == "csv":
            return pd.read_csv(uploaded_file)

        if kind == "jsonl":
            # Skip blank lines (common at the end of JSONL files); json.loads
            # would otherwise raise and abort the whole upload.
            records = [
                json.loads(line)
                for line in uploaded_file.readlines()
                if line.strip()
            ]
            try:
                return pd.DataFrame(records)
            except Exception:
                st.warning("⚠️ Unable to convert JSONL to table. Previewing raw JSON.")
                return records

        if kind == "txt":
            text_data = uploaded_file.read().decode("utf-8")
            return text_data.splitlines()
    except Exception as e:
        st.error(f"❌ Error processing file: {e}")
        return None

    # Unsupported extension: nothing to parse.
    return None
def clean_text(text, lowercase=True, remove_punctuation=True):
    """
    Normalize a string.

    Optionally lower-cases the text, then optionally removes every character
    that is neither a word character nor whitespace.
    """
    normalized = text.lower() if lowercase else text
    if not remove_punctuation:
        return normalized
    return re.sub(r'[^\w\s]', '', normalized)
# -------------------------------
# Model Conversion and Quantization
# -------------------------------
def quantize_model(model):
    """
    Apply dynamic int8 quantization to the model's Linear layers.

    Returns the quantized model. If quantization fails, the error is shown in
    the Streamlit UI and the original model is returned unchanged.
    """
    try:
        layer_types = {torch.nn.Linear}
        quantized = torch.quantization.quantize_dynamic(
            model, layer_types, dtype=torch.qint8
        )
        st.success("βœ… Model quantized successfully!")
        return quantized
    except Exception as exc:
        st.error(f"❌ Quantization failed: {exc}")
        return model
def convert_to_torchscript(model, output_path="model_ts.pt"):
    """
    Trace the model to TorchScript and save it to `output_path`.

    Tracing uses a random integer tensor of shape (1, 10) with values in
    [0, 100) as the example input. Returns `output_path` on success; on
    failure the error is shown in the Streamlit UI and None is returned.
    """
    try:
        sample_ids = torch.randint(low=0, high=100, size=(1, 10))
        torch.jit.trace(model, sample_ids).save(output_path)
        return output_path
    except Exception as exc:
        st.error(f"❌ TorchScript conversion failed: {exc}")
        return None
def convert_to_onnx(model, output_path="model.onnx"):
    """
    Export the model to ONNX at `output_path`.

    The export uses a random integer tensor of shape (1, 10) with values in
    [0, 100) as the dummy input and names the graph input "input" and the
    output "output". Returns `output_path` on success; on failure the error
    is shown in the Streamlit UI and None is returned.
    """
    try:
        sample_ids = torch.randint(0, 100, (1, 10))
        io_names = {"input_names": ["input"], "output_names": ["output"]}
        torch.onnx.export(model, sample_ids, output_path, **io_names)
        return output_path
    except Exception as exc:
        st.error(f"❌ ONNX conversion failed: {exc}")
        return None
# Convert to GGUF (for Llama.cpp)
def convert_to_gguf(model, output_path="model.gguf"):
    """
    Export the model to a GGUF file via llama_cpp and return the output path.

    Errors are not caught here; they propagate to the caller.
    """
    # NOTE(review): this relies on llama_cpp exposing `export_gguf`; confirm
    # that the installed llama-cpp-python version actually provides it.
    destination = output_path
    llama_cpp.export_gguf(model, destination)
    return destination
# Convert to TensorFlow SavedModel
def convert_to_tf_saved_model(model, output_path="model_tf", dummy_input=None):
    """
    Convert a PyTorch model to a TensorFlow SavedModel, using ONNX as the
    intermediate format.

    Args:
        model (torch.nn.Module): The PyTorch model to convert.
        output_path (str): Directory the TensorFlow graph is exported to.
        dummy_input (torch.Tensor): Optional example input used for the ONNX
            export. Defaults to a random float tensor of shape
            (1, 3, 224, 224); pass a tensor matching the model's real input
            (for example integer token ids) for other architectures.
    Returns:
        str: `output_path`.
    Raises:
        Any error from the ONNX export or the onnx / onnx_tf packages
        (including ImportError when they are not installed) propagates.
    """
    # Imported lazily so the rest of the module works without these packages.
    import tempfile

    import onnx
    from onnx_tf.backend import prepare

    if dummy_input is None:
        dummy_input = torch.randn(1, 3, 224, 224)

    # Keep the intermediate ONNX file in a scratch directory so nothing is
    # left behind in the working directory once the conversion finishes.
    with tempfile.TemporaryDirectory() as scratch_dir:
        onnx_path = os.path.join(scratch_dir, "temp_model.onnx")
        torch.onnx.export(model, dummy_input, onnx_path)

        onnx_model = onnx.load(onnx_path)
        tf_rep = prepare(onnx_model)
        tf_rep.export_graph(output_path)

    return output_path
# Convert to PyTorch format
def convert_to_pytorch(model, output_path="model.pth"):
    """
    Save the model's state dict to `output_path` and return that path.
    """
    weights = model.state_dict()
    torch.save(weights, output_path)
    return output_path
# -------------------------------
# Model Inference and Training
# -------------------------------
def simulate_training(num_epochs, delay=1.0):
    """
    Simulate a training loop for demonstration.

    Args:
        num_epochs (int): Number of epochs to simulate.
        delay (float): Seconds to pause between epochs (default 1.0, as
            before). Pass 0 to run without pausing.
    Yields:
        tuple: (epoch, loss_values, accuracy_values). The two lists are the
        same objects on every iteration and grow by one entry per epoch, so
        copy them if a snapshot is needed.
    """
    loss_values = []
    accuracy_values = []
    for epoch in range(1, num_epochs + 1):
        loss = np.exp(-epoch) + np.random.random() * 0.1
        # The random jitter would push late epochs above 100% accuracy,
        # so cap the simulated value at 1.0.
        acc = min(1.0, 0.5 + (epoch / num_epochs) * 0.5 + np.random.random() * 0.05)
        loss_values.append(loss)
        accuracy_values.append(acc)
        yield epoch, loss_values, accuracy_values
        # Pause between epochs only; waiting after the final one is pointless.
        if delay > 0 and epoch < num_epochs:
            time.sleep(delay)
def plot_training_metrics(epochs, loss_values, accuracy_values):
    """
    Draw training loss and accuracy side by side and return the figure.

    Both series are plotted against epochs 1..`epochs`.
    """
    fig, ax = plt.subplots(1, 2, figsize=(12, 4))
    epoch_axis = range(1, epochs+1)

    # (axis, series, title, y-label, line color) for each panel.
    panels = (
        (ax[0], loss_values, "Training Loss", "Loss", 'red'),
        (ax[1], accuracy_values, "Training Accuracy", "Accuracy", 'green'),
    )
    for axis, series, title, y_label, line_color in panels:
        axis.plot(epoch_axis, series, marker='o', color=line_color)
        axis.set_title(title)
        axis.set_xlabel("Epoch")
        axis.set_ylabel(y_label)
    return fig
def generate_response(prompt, model, tokenizer, max_length=200):
    """
    Generate a text response for `prompt` with the given model and tokenizer.

    Args:
        prompt (str): The input text.
        model: A model exposing `generate` (e.g. a Hugging Face causal LM).
        tokenizer: The tokenizer matching `model`.
        max_length (int): Maximum total length passed to `generate`.
    Returns:
        str: The decoded first generated sequence, or "" if generation fails
        (the failure is reported in the Streamlit UI).
    """
    try:
        encoded = tokenizer(prompt, return_tensors="pt")
        with torch.no_grad():
            outputs = model.generate(
                encoded["input_ids"],
                # Forward the mask so the model does not have to guess it.
                attention_mask=encoded.get("attention_mask"),
                max_length=max_length,
                num_return_sequences=1,
                # temperature only takes effect in sampling mode; without
                # do_sample=True it is ignored and decoding is greedy.
                do_sample=True,
                temperature=0.7,
            )
        return tokenizer.decode(outputs[0], skip_special_tokens=True)
    except Exception as e:
        st.error(f"❌ Response generation failed: {e}")
        return ""
# -------------------------------
# Model Loading for Inference
# -------------------------------
def load_finetuned_model(model, checkpoint_path="fine_tuned_model.pt"):
    """
    Load fine-tuned weights from `checkpoint_path` into `model`.

    When the checkpoint exists, its state dict is loaded onto the CPU, the
    model is switched to eval mode and a success message is shown. Otherwise
    an error is shown and the model is left untouched. The model is returned
    in both cases.
    """
    if not os.path.exists(checkpoint_path):
        st.error(f"❌ Checkpoint not found: {checkpoint_path}")
        return model

    # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
    weights = torch.load(checkpoint_path, map_location=torch.device('cpu'))
    model.load_state_dict(weights)
    model.eval()
    st.success("βœ… Fine-tuned model loaded successfully!")
    return model
import pandas as pd
import os
import pyarrow as pa
import numpy as np
from scipy.stats import zscore, kurtosis, skew
# ======================================
# Dataset Operations
# ======================================
def load_dataset(path: str) -> pd.DataFrame:
    """Read a CSV into an Arrow-compatible DataFrame.

    On any failure the error is printed and an empty DataFrame is returned.
    """
    try:
        return make_arrow_compatible(pd.read_csv(path))
    except Exception as err:
        print(f"Error loading dataset: {err}")
        return pd.DataFrame()
def save_dataset(df: pd.DataFrame, path: str):
    """Write the DataFrame to `path` as CSV without the index column.

    Failures are printed rather than raised; nothing is returned.
    """
    try:
        df.to_csv(path_or_buf=path, index=False)
    except Exception as err:
        print(f"Error saving dataset: {err}")
def list_datasets(directory: str = "datasets") -> list:
    """Return the names of .csv, .json and .xlsx files in `directory`.

    If the directory cannot be listed, the error is printed and an empty
    list is returned.
    """
    dataset_suffixes = ('.csv', '.json', '.xlsx')
    try:
        found = []
        for entry in os.listdir(directory):
            if entry.endswith(dataset_suffixes):
                found.append(entry)
        return found
    except Exception as err:
        print(f"Error listing datasets: {err}")
        return []
# ======================================
# Data Cleaning Functions
# ======================================
def _strip_string_cells(column: pd.Series) -> pd.Series:
    """Strip surrounding whitespace from the str cells of an object column."""
    if column.dtype != "object":
        return column
    # Touch only real strings: the .str accessor would replace numbers and
    # other non-string cells of a mixed column with NaN.
    return column.map(lambda cell: cell.strip() if isinstance(cell, str) else cell)


def clean_dataset(
    df: pd.DataFrame,
    remove_duplicates: bool = True,
    fill_missing: bool = False,
    fill_value: str = "0",
    trim_spaces: bool = True
) -> pd.DataFrame:
    """
    Clean the dataset and return the result; the input frame is not modified.

    Operations, in order:
    - Remove duplicate rows (if `remove_duplicates`)
    - Fill missing values with `fill_value` (if `fill_missing`)
    - Trim leading/trailing spaces from string cells (if `trim_spaces`)
    - Remove columns and rows that are entirely empty
    - Auto-cast text columns that parse as dates to datetime

    Note that filling happens before empty rows/columns are removed, so with
    `fill_missing=True` previously empty rows/columns are kept (filled).
    """
    # Work on a private copy so the caller's frame is never mutated below.
    df = df.drop_duplicates() if remove_duplicates else df.copy()

    if fill_missing:
        df = df.fillna(fill_value)

    if trim_spaces:
        df = df.apply(_strip_string_cells)

    # Remove empty columns & rows
    df = df.dropna(how="all", axis=1)
    df = df.dropna(how="all", axis=0)

    # Auto-cast date columns. Only text (object) columns are tried:
    # pd.to_datetime happily converts plain numbers into timestamps near the
    # 1970 epoch, which would silently destroy every numeric column.
    for col in df.select_dtypes(include=["object"]).columns:
        try:
            df[col] = pd.to_datetime(df[col])
        except (ValueError, TypeError):
            pass  # not a date column; leave it as it is

    return make_arrow_compatible(df)
# --------------------------------------
# Dataset Quality Score
# --------------------------------------
def compute_dataset_score(df):
    """
    Compute a dataset quality score between 0 and 100.

    The score is the product of three factors, scaled to 100:
    - the share of non-missing cells,
    - the share of non-duplicated rows,
    - a distribution factor: 1 minus a tenth of the sum of the mean absolute
      skewness and mean absolute excess kurtosis of the numeric columns,
      limited to the range [0, 1] (1 when there are no numeric columns).

    Returns 0.0 for an empty frame.
    """
    if df.empty:
        return 0.0

    total_cells = df.shape[0] * df.shape[1]
    missing_ratio = df.isnull().sum().sum() / total_cells
    duplicate_ratio = 1 - (df.drop_duplicates().shape[0] / df.shape[0])

    numeric_score = 1.0
    numeric = df.select_dtypes(include=["number"])
    if numeric.shape[1] > 0:
        skew_vals = numeric.apply(lambda x: np.abs(skew(x.dropna())), axis=0)
        kurt_vals = numeric.apply(lambda x: np.abs(kurtosis(x.dropna())), axis=0)
        # Constant or empty columns can yield NaN statistics; treat an
        # undefined mean as "no penalty" instead of poisoning the score.
        penalty = np.nansum([skew_vals.mean(), kurt_vals.mean()]) / 10
        # Heavy-tailed columns can push the penalty past 1; clamp so the
        # final score can never leave the 0-100 range.
        numeric_score = float(np.clip(1 - penalty, 0.0, 1.0))

    score = (1 - missing_ratio) * (1 - duplicate_ratio) * numeric_score * 100
    return round(float(score), 2)
# --------------------------------------
# Outlier Detection
# --------------------------------------
def detect_outliers(df, threshold=3):
    """Count, per numeric column, the values whose absolute Z-score exceeds `threshold`.

    Missing values are dropped before the Z-scores are computed. Returns a
    dict mapping column name to the outlier count.
    """
    numeric_names = df.select_dtypes(include=["number"]).columns
    return {
        name: np.sum(np.abs(zscore(df[name].dropna())) > threshold)
        for name in numeric_names
    }
# --------------------------------------
# Detect Inconsistent Types
# --------------------------------------
def detect_inconsistent_types(df):
    """Report columns whose cells hold more than one Python type.

    Returns a dict mapping each such column to a {type: count} dict.
    """
    report = {}
    for name in df.columns:
        cell_types = df[name].apply(type)
        if cell_types.nunique() > 1:
            report[name] = cell_types.value_counts().to_dict()
    return report
# ======================================
# Data Transformations
# ======================================
def apply_transformation(df: pd.DataFrame, col: str, transform: str) -> pd.DataFrame:
    """
    Apply a transformation to a column in place and return the frame.

    Supported values of `transform`:
    - "Log": log1p of the column; cells that are missing or come out as NaN
      are set to 0
    - "Normalize": Min-Max scaling to [0, 1]
    - "Standardize": Z-score standardization

    A constant column has zero range / zero standard deviation; it is mapped
    to 0.0 rather than dividing by zero. Any other `transform` value leaves
    the column unchanged.

    Raises:
        KeyError: if `col` is not a column of `df`.
    """
    if col not in df.columns:
        raise KeyError(f"Column '{col}' not found in dataset")

    if transform == "Log":
        df[col] = np.log1p(df[col].replace(0, np.nan)).fillna(0)
    elif transform == "Normalize":
        low = df[col].min()
        span = df[col].max() - low
        shifted = df[col] - low
        # Zero range means every value is identical: map to 0.0 instead of 0/0.
        df[col] = shifted / span if span != 0 else shifted * 0.0
    elif transform == "Standardize":
        spread = df[col].std()
        centered = df[col] - df[col].mean()
        # Zero spread means every value is identical: map to 0.0 instead of 0/0.
        df[col] = centered / spread if spread != 0 else centered * 0.0

    return make_arrow_compatible(df)
# ======================================
# Normalization & Standardization
# ======================================
def normalize_column(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Normalize a column in place to [0, 1] (Min-Max Scaling) and return the frame.

    A constant column (zero range) is mapped to 0.0 instead of dividing by
    zero; missing cells stay missing.
    """
    low = df[col].min()
    span = df[col].max() - low
    shifted = df[col] - low
    # With zero range every value equals the minimum, so `shifted` is all
    # zeros; multiplying by 0.0 just yields a float column without the 0/0.
    df[col] = shifted / span if span != 0 else shifted * 0.0
    return df
def standardize_column(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Standardize a column in place (Z-score) and return the frame.

    A constant column (zero standard deviation) is mapped to 0.0 instead of
    dividing by zero; missing cells stay missing.
    """
    spread = df[col].std()
    centered = df[col] - df[col].mean()
    # With zero spread every value equals the mean, so `centered` is all
    # zeros; multiplying by 0.0 just yields a float column without the 0/0.
    df[col] = centered / spread if spread != 0 else centered * 0.0
    return df
# ======================================
# Arrow Compatibility & Fixes
# ======================================
def make_arrow_compatible(df: pd.DataFrame) -> pd.DataFrame:
    """
    Ensure dataset compatibility with Streamlit Arrow serialization.

    Every object-dtype column is converted to strings in place, while missing
    cells are kept missing (rather than becoming the literal text "nan" or
    "None"), so later null checks still see them. Other columns are left
    untouched. Returns the same frame.
    """
    for col in df.columns:
        if df[col].dtype == object:
            try:
                column = df[col]
                # Stringify, then restore nulls wherever the original was missing.
                df[col] = column.astype(str).where(column.notna())
            except Exception as e:
                print(f"Could not convert column {col}: {e}")
    return df
def fix_arrow_incompatibility(df: pd.DataFrame) -> pd.DataFrame:
    """
    Fix Arrow incompatibility by converting mixed types to `str`.

    Each column is test-converted to an Arrow table on its own. A column that
    Arrow rejects is converted to strings in place, with missing cells kept
    missing. Returns the same frame.
    """
    for col in df.columns:
        try:
            pa.Table.from_pandas(df[[col]])
        # Mixed-type columns surface as either error depending on which
        # value type Arrow encounters first, so both must be handled.
        except (pa.lib.ArrowInvalid, pa.lib.ArrowTypeError):
            print(f"Arrow compatibility issue in column: {col}")
            column = df[col]
            # Stringify, then restore nulls wherever the original was missing.
            df[col] = column.astype(str).where(column.notna())
    return df