import streamlit as st
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report, accuracy_score
import nbformat as nbf
import io
import sqlite3
from io import StringIO
import os

# Constants
DB_PATH = "db/database.db"
TEMP_DIR = "temp/"

# Ensure directories exist
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)

# Initialize SQLite database
def init_db():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS datasets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            content TEXT NOT NULL
        )
    """)
    conn.commit()
    conn.close()

# Save dataset to SQLite
def save_dataset_to_db(name, content):
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("INSERT INTO datasets (name, content) VALUES (?, ?)", (name, content))
    conn.commit()
    conn.close()

# Fetch all datasets from SQLite
def get_datasets():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("SELECT id, name FROM datasets")
    datasets = cursor.fetchall()
    conn.close()
    return datasets

# Load dataset by ID
def load_dataset_from_db(dataset_id):
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("SELECT content FROM datasets WHERE id = ?", (dataset_id,))
    content = cursor.fetchone()
    conn.close()
    if content:
        return StringIO(content[0])
    return None

# Initialize database
init_db()

# Function to detect problem type
def detect_problem_type(df, target_column):
    if target_column not in df.columns:
        return "Error: Target column not found in the dataset."
    df_clean = df.dropna(subset=[target_column])
    unique_values = df_clean[target_column].nunique()
    if unique_values == 2:
        return "binary_classification"
    elif unique_values > 2:
        return "multiclass_classification"
    else:
        return "Error: Invalid target column (not enough unique values)."
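# A minimal sketch of how the helpers above compose (illustrative only; the
# DataFrame contents and the dataset ID are assumptions for the example):
#
#   demo = pd.DataFrame({"feature": [1.0, 2.0, 3.0], "label": [0, 1, 0]})
#   detect_problem_type(demo, "label")        # -> "binary_classification"
#   save_dataset_to_db("demo.csv", demo.to_csv(index=False))
#   buffer = load_dataset_from_db(1)          # StringIO, or None if missing
#   restored = pd.read_csv(buffer)            # round-trips the dataset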
# Function to generate notebook content
def generate_notebook_code(csv_path, target_column, problem_type):
    notebook = nbf.v4.new_notebook()
    code = f"""
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report, accuracy_score

# Load Dataset
df = pd.read_csv("{csv_path}")
target_column = "{target_column}"

# Display the first few rows
print(df.head())

# Check for missing values
print("Missing Values:\\n", df.isnull().sum())

# Encode categorical columns (cast to str so NaN values do not break the encoder)
categorical_cols = df.select_dtypes(include=['object']).columns
for col in categorical_cols:
    df[col] = LabelEncoder().fit_transform(df[col].astype(str))

# Fill missing values with median
df.fillna(df.median(), inplace=True)

# Split data into features and target
X = df.drop(columns=[target_column])
y = df[target_column]

# Standardize numeric columns
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Train/Test Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train Models
models = []
if "{problem_type}" in ["binary_classification", "multiclass_classification"]:
    models.append(("Random Forest", RandomForestClassifier()))
    models.append(("Logistic Regression", LogisticRegression(max_iter=1000)))
    models.append(("SVM", SVC()))
    models.append(("Decision Tree", DecisionTreeClassifier()))

# Model Evaluation
results = []
for model_name, model in models:
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    results.append((model_name, accuracy))

print("Model Performance:")
for model_name, accuracy in results:
    print(f"{{model_name}}: {{accuracy}}")
"""
    notebook.cells.append(nbf.v4.new_code_cell(code))
    return notebook

# Streamlit app
st.title("Automated Data Science App")
st.write("Upload a CSV file and specify the target column to automatically process and train models.")

# File upload
uploaded_file = st.file_uploader("Upload your CSV file", type="csv")
target_column = st.text_input("Enter the target column name")

if uploaded_file and target_column:
    try:
        df = pd.read_csv(uploaded_file)
        st.write("Dataset Preview:")
        st.write(df.head())

        st.subheader("Missing Values")
        st.write(df.isnull().sum())

        st.subheader("Basic Statistics")
        st.write(df.describe())

        problem_type = detect_problem_type(df, target_column)
        if "Error" in problem_type:
            st.error(problem_type)
        else:
            st.write(f"Detected Problem Type: {problem_type}")

            # Save dataset to database
            save_dataset_to_db(uploaded_file.name, uploaded_file.getvalue().decode("utf-8"))

            # Encode categorical columns (cast to str so NaN values do not break the encoder)
            categorical_cols = df.select_dtypes(include=['object']).columns
            for col in categorical_cols:
                df[col] = LabelEncoder().fit_transform(df[col].astype(str))

            # Fill missing values with median
            df.fillna(df.median(), inplace=True)

            X = df.drop(columns=[target_column])
            y = df[target_column]

            scaler = StandardScaler()
            X = scaler.fit_transform(X)

            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

            models = [
                ("Random Forest", RandomForestClassifier()),
                ("Logistic Regression", LogisticRegression(max_iter=1000)),
                ("SVM", SVC()),
                ("Decision Tree", DecisionTreeClassifier())
            ]

            results = []
            for model_name, model in models:
                model.fit(X_train, y_train)
                y_pred = model.predict(X_test)
                accuracy = accuracy_score(y_test, y_pred)
                results.append((model_name, accuracy))
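            # At this point `results` is a list of (name, accuracy) tuples,
            # e.g. [("Random Forest", 0.93), ("SVM", 0.88)] (values illustrative),
            # which is why it converts cleanly to a two-column DataFrame below.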
            # Display results in a table
            st.subheader("Model Performance")
            results_df = pd.DataFrame(results, columns=["Model Name", "Accuracy"])
            st.write(results_df)

            # Display the classification report (y_pred here comes from the
            # last model trained in the loop above, the Decision Tree)
            st.subheader("Classification Report")
            report = classification_report(y_test, y_pred)
            st.code(report)  # st.code preserves the report's fixed-width formatting

            # Feature importances, also from the last trained model
            feature_importances = model.feature_importances_ if hasattr(model, "feature_importances_") else None
            if feature_importances is not None:
                important_features = pd.Series(feature_importances, index=df.drop(columns=[target_column]).columns)
                important_features = important_features.sort_values(ascending=False).head(5)
                st.subheader("Important Features")
                st.write(important_features)

                st.subheader("Visualizations")
                for feature in important_features.index:
                    st.write(f"Box Plot for {feature}")
                    fig, ax = plt.subplots(figsize=(8, 6))
                    sns.boxplot(x=y, y=df[feature], ax=ax)
                    st.pyplot(fig)

                    st.write(f"Histogram for {feature}")
                    fig, ax = plt.subplots(figsize=(8, 6))
                    sns.histplot(df[feature], kde=True, bins=30, ax=ax)
                    st.pyplot(fig)

            # Write the uploaded CSV to a temp file so the generated notebook can load it
            temp_csv_path = os.path.join(TEMP_DIR, uploaded_file.name)
            with open(temp_csv_path, "w") as f:
                f.write(uploaded_file.getvalue().decode("utf-8"))

            # Build the notebook and offer it for download
            notebook = generate_notebook_code(temp_csv_path, target_column, problem_type)
            notebook_buffer = io.StringIO()
            nbf.write(notebook, notebook_buffer)
            notebook_buffer.seek(0)
            notebook_content = notebook_buffer.getvalue()

            st.download_button(
                label="Download Code Notebook",
                data=notebook_content,
                file_name="data_science_pipeline.ipynb",
                mime="application/json"
            )
    except Exception as e:
        st.error(f"An error occurred: {e}")
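# A minimal local sanity check for the notebook generator (illustrative; the
# CSV path and column name are assumptions). Streamlit re-runs this script on
# every interaction, so the snippet stays commented out rather than executed:
#
#   nb = generate_notebook_code("temp/demo.csv", "label", "binary_classification")
#   with open("data_science_pipeline.ipynb", "w", encoding="utf-8") as f:
#       nbf.write(nb, f)
#
# Launch the app with: streamlit run app.py  (the file name is an assumption)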