import logging
import os
import pickle
import sys
from typing import List, Optional

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Module-level logger shared by everything in this file; handlers and levels
# are left to the application that imports the module.
logger = logging.getLogger(__name__)

class DiabetesModel:
    """Wrapper that loads, trains, persists and queries a diabetes classifier.

    The model and its scaler are stored together as a ``{'model', 'scaler'}``
    dict in ``models/diabetes_model.pkl`` under the project root. Feature
    names and training metrics are kept in sibling pickle files in the same
    directory.
    """

    # Input columns, in the order ``predict`` hands them to the model.
    DEFAULT_FEATURE_NAMES = (
        'Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness',
        'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age',
    )

    def __init__(self):
        """Set up artifact paths and load the persisted model.

        Raises:
            ValueError: If the model file is missing or cannot be loaded
                (propagated from ``load_model``).
        """
        self.model = None
        self.scaler = None
        self.model_metrics = None

        # The project root is two directories above the directory that
        # contains this file.
        self.project_root = os.path.dirname(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        )

        models_dir = os.path.join(self.project_root, 'models')
        self.model_path = os.path.join(models_dir, 'diabetes_model.pkl')
        self.feature_names_path = os.path.join(models_dir, 'diabetes_feature_names.pkl')
        self.model_metrics_path = os.path.join(models_dir, 'diabetes_model_metrics.pkl')

        self.default_feature_names = list(self.DEFAULT_FEATURE_NAMES)
        # Independent copy so that editing one list never changes the other.
        self.feature_names = list(self.default_feature_names)

        self.load_model()

    @staticmethod
    def _read_pickle(path):
        """Unpickle and return the object stored at *path*."""
        # SECURITY: unpickling can execute arbitrary code. Only point the
        # artifact paths at files produced by this project.
        with open(path, 'rb') as f:
            # Per the pickle docs, latin1 is what allows NumPy arrays pickled
            # by Python 2 to be read back under Python 3.
            return pickle.load(f, encoding='latin1')

    @staticmethod
    def _write_pickle(path, obj):
        """Pickle *obj* to *path*, creating the parent directory if needed."""
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(path, 'wb') as f:
            pickle.dump(obj, f)

    def load_model(self):
        """Load the trained model and related files from disk.

        Feature names and metrics are optional: if their files are missing or
        unreadable a warning is logged and defaults (``default_feature_names``
        / ``None``) are used. The model file is mandatory.

        Raises:
            ValueError: If the model file does not exist, cannot be unpickled,
                or is a dict lacking a ``'model'`` or ``'scaler'`` entry.
        """
        # --- feature names (optional) ---------------------------------
        if os.path.exists(self.feature_names_path):
            try:
                self.feature_names = self._read_pickle(self.feature_names_path)
                logger.info("Feature names loaded successfully")
            except Exception as e:
                logger.warning("Error loading feature names: %s. Using defaults.", e)
                self.feature_names = list(self.default_feature_names)
        else:
            logger.warning("Feature names file not found, using defaults")
            self.feature_names = list(self.default_feature_names)

        # --- model + scaler (mandatory) -------------------------------
        if not os.path.exists(self.model_path):
            logger.error("Model file not found.")
            missing = FileNotFoundError(
                f"Diabetes model file not found at {self.model_path}"
            )
            # Callers have always received ValueError here, so keep that type
            # and expose the file-not-found condition as the chained cause.
            raise ValueError(f"Failed to load diabetes model: {missing}") from missing

        try:
            model_data = self._read_pickle(self.model_path)
            if isinstance(model_data, dict):
                model = model_data.get('model')
                scaler = model_data.get('scaler')
                if model is None or scaler is None:
                    raise ValueError("Model or scaler missing from loaded data")
                self.model, self.scaler = model, scaler
            else:
                # Legacy file holding only the estimator. An unfitted
                # StandardScaler would make every predict() call fail in
                # transform(), so record "no scaler" instead; predict()
                # then feeds the raw feature values to the model.
                self.model = model_data
                self.scaler = None
                logger.warning(
                    "Model loaded but scaler not found. "
                    "Predictions will use unscaled features."
                )
            logger.info("Model loaded successfully")
        except Exception as e:
            logger.error("Error loading model: %s", e)
            raise ValueError(f"Failed to load diabetes model: {str(e)}") from e

        # --- metrics (optional) ---------------------------------------
        if os.path.exists(self.model_metrics_path):
            try:
                self.model_metrics = self._read_pickle(self.model_metrics_path)
                logger.info("Model metrics loaded successfully")
            except Exception as e:
                logger.warning("Error loading model metrics: %s", e)
                self.model_metrics = None
        else:
            logger.warning("Model metrics file not found")
            self.model_metrics = None

    def _create_dummy_model(self):
        """Create a dummy model for testing purposes.

        Fits a scaler and a random forest on 100 rows of random data with
        random binary labels. The result has no predictive value.
        """
        try:
            logger.warning("Creating dummy model")
            self.model = RandomForestClassifier(n_estimators=100, random_state=42)
            self.scaler = StandardScaler()

            dummy_data = pd.DataFrame(
                np.random.randn(100, len(self.feature_names)),
                columns=self.feature_names,
            )
            dummy_target = np.random.randint(0, 2, 100)

            # Train on the scaled values: predict() scales its input, so the
            # model has to be fitted in the same space.
            dummy_scaled = self.scaler.fit_transform(dummy_data)
            self.model.fit(dummy_scaled, dummy_target)
            logger.info("Dummy model created successfully")
        except Exception as e:
            logger.error("Error creating dummy model: %s", e)
            raise

    def save_model(self):
        """Save the model and scaler together in one file.

        Raises:
            Exception: Whatever the write raised, after it has been logged.
        """
        try:
            model_data = {
                'model': self.model,
                'scaler': self.scaler,
            }
            self._write_pickle(self.model_path, model_data)
            logger.info("Model and scaler saved successfully")
        except Exception as e:
            logger.error("Error saving model: %s", e)
            raise

    def predict(self, features):
        """Make a prediction using the trained model.

        Args:
            features: Mapping of feature name to a numeric (or numeric-string)
                value. Every name in ``DEFAULT_FEATURE_NAMES`` must be
                present, and every supplied value must convert to ``float``.

        Returns:
            dict: ``{"prediction": bool, "probability": float}``. When the
            model has ``predict_proba`` the probability is its second class
            column (or the only column if there is just one). Otherwise it is
            ``0.5 + decision_function / 10`` clamped to ``[0, 1]``.

        Raises:
            ValueError: If no model is loaded, a feature is missing or
                non-numeric, or the underlying model fails. The message is
                always prefixed with ``"Error during prediction: "``.
        """
        try:
            if self.model is None:
                raise ValueError("Model not loaded. Please ensure model file exists and is valid.")

            # Debug level only: the payload is personal health data.
            logger.debug("Input features for diabetes prediction: %s", features)

            processed_features = {}
            for key, value in features.items():
                try:
                    processed_features[key] = float(value)
                except (ValueError, TypeError) as exc:
                    raise ValueError(
                        f"Invalid value for feature {key}: {value}. Expected numeric value."
                    ) from exc

            required_columns = list(self.DEFAULT_FEATURE_NAMES)
            for col in required_columns:
                if col not in processed_features:
                    raise ValueError(f"Missing required feature: {col}")

            # Single-row frame restricted to, and ordered by, the required columns.
            X = pd.DataFrame([processed_features])[required_columns].astype(float)

            scaler = getattr(self, 'scaler', None)
            X_scaled = scaler.transform(X) if scaler is not None else X.values

            prediction = bool(self.model.predict(X_scaled)[0])

            if hasattr(self.model, 'predict_proba'):
                proba = self.model.predict_proba(X_scaled)[0]
                probability = float(proba[1]) if len(proba) > 1 else float(proba[0])
            else:
                # No probabilities available: squash the decision margin into
                # [0, 1]. Float bounds keep the result a float when clamped.
                probability = 0.5 + (float(self.model.decision_function(X_scaled)[0]) / 10)
                probability = max(0.0, min(1.0, probability))

            return {
                "prediction": prediction,
                "probability": probability,
            }
        except Exception as e:
            logger.exception("Error during prediction")
            raise ValueError(f"Error during prediction: {str(e)}") from e

    def get_feature_importance(self) -> Optional[List[float]]:
        """Get the feature importance scores as a list of floats.

        Returns:
            The model's ``feature_importances_`` as floats when the model has
            that attribute and its length equals ``len(self.feature_names)``;
            otherwise ``None``.
        """
        try:
            if hasattr(self.model, 'feature_importances_'):
                importances = [float(x) for x in self.model.feature_importances_]
                if len(importances) == len(self.feature_names):
                    return importances

            logger.warning("Could not get valid feature importances")
            return None
        except Exception as e:
            logger.error("Error getting feature importance: %s", e)
            return None

    def get_model_metrics(self):
        """Get the model metrics, or ``None`` when none are stored (or they are empty)."""
        return self.model_metrics if self.model_metrics else None

    def train_model(self, X, y):
        """Train the model with the given data and persist all artifacts.

        Fits a fresh ``StandardScaler`` and a 100-tree, depth-10 random
        forest, records the score on the training data plus the feature
        importances in ``model_metrics``, then saves the model, metrics and
        feature names.

        Args:
            X: Feature matrix accepted by ``StandardScaler.fit_transform``.
            y: Target labels accepted by ``RandomForestClassifier.fit``.

        Returns:
            bool: ``True`` on success.

        Raises:
            Exception: Any failure in fitting or saving, after it is logged.
        """
        try:
            logger.info("Starting model training...")

            self.scaler = StandardScaler()
            X_scaled = self.scaler.fit_transform(X)

            self.model = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                random_state=42,
            )
            self.model.fit(X_scaled, y)

            # Scored on the same data the model was fitted on, so this is a
            # training score, not a held-out estimate.
            train_score = self.model.score(X_scaled, y)
            feature_importance = self.model.feature_importances_

            self.model_metrics = {
                'train_score': train_score,
                'feature_importance': feature_importance.tolist(),
            }

            self.save_model()
            self.save_metrics()
            self.save_feature_names()

            logger.info("Model trained successfully. Training score: %.4f", train_score)
            return True
        except Exception as e:
            logger.error("Error in train_model: %s", e)
            raise

    def save_metrics(self):
        """Save model metrics to file.

        Raises:
            Exception: Whatever the write raised, after it has been logged.
        """
        try:
            self._write_pickle(self.model_metrics_path, self.model_metrics)
            logger.info("Model metrics saved successfully")
        except Exception as e:
            logger.error("Error saving model metrics: %s", e)
            raise

    def save_feature_names(self):
        """Save feature names to file.

        Raises:
            Exception: Whatever the write raised, after it has been logged.
        """
        try:
            self._write_pickle(self.feature_names_path, self.feature_names)
            logger.info("Feature names saved successfully")
        except Exception as e:
            logger.error("Error saving feature names: %s", e)
            raise

def train_model():
    """Train and save the diabetes prediction model.

    Reads ``data/diabetes.csv`` under the project root (two directories above
    the directory containing this file), fits a ``DiabetesModel`` on its
    feature columns against the ``Outcome`` column, and lets the model write
    its artifacts to ``models/``. Progress is printed to stdout. On any
    failure the error and traceback are printed and the process exits with
    status 1.
    """
    import traceback

    class _BootstrapDiabetesModel(DiabetesModel):
        """DiabetesModel that can be constructed before any model is saved."""

        def load_model(self):
            # DiabetesModel's constructor always calls load_model(), which
            # raises ValueError when the model file is missing or unreadable.
            # That is the normal state before the first training run, and
            # training overwrites the model anyway, so tolerate it here.
            try:
                super().load_model()
            except ValueError as exc:
                print(f"No usable saved model ({exc}); training from scratch")

    try:
        model = _BootstrapDiabetesModel()

        current_dir = os.path.dirname(os.path.abspath(__file__))
        project_root = os.path.dirname(os.path.dirname(current_dir))
        data_file = os.path.join(project_root, "data", "diabetes.csv")
        model_dir = os.path.join(project_root, 'models')

        print(f"Loading data from: {data_file}")
        print(f"Model will be saved to: {model_dir}")

        if not os.path.exists(data_file):
            raise FileNotFoundError(f"Data file not found at {data_file}")

        # The model's save methods write into this directory.
        os.makedirs(model_dir, exist_ok=True)

        print("Loading and preparing data...")
        data = pd.read_csv(data_file)

        X = data[model.feature_names]
        y = data['Outcome']

        print("Training model...")
        model.train_model(X, y)
        print("Model trained and saved successfully")

    except Exception as e:
        print(f"Error during model training: {str(e)}")
        print(traceback.format_exc())
        sys.exit(1)

# Run training only when this file is executed as a script, not on import.
if __name__ == "__main__":
    train_model()