"""Loan approval model: preprocessing, training, persistence, and prediction."""

import logging
import os
from typing import Any, Dict, Tuple

import joblib
import numpy as np
import pandas as pd
import shap
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class LoanApprovalModel:
    """Loan approval model for predicting loan application outcomes."""

    def __init__(self, model_dir: str = "models", load_model: bool = True):
        """Initialize the loan approval model.

        Args:
            model_dir (str): Directory containing the trained model components
            load_model (bool): Whether to load existing model components
        """
        self.model_dir = model_dir
        self.model = None
        self.scaler = StandardScaler()
        self.feature_names = None
        self.explainer = None

        # One encoder per categorical column; fit in train() or replaced by
        # the fitted encoders that load_components() reads from disk.
        self.categorical_columns = ['education', 'self_employed']
        self.label_encoders = {}
        for col in self.categorical_columns:
            self.label_encoders[col] = LabelEncoder()

        if load_model:
            self.load_components()

    def load_components(self):
        """Load the trained model and preprocessing components."""
        try:
            logger.info("Loading model components...")

            model_path = os.path.join(self.model_dir, 'loan_model.joblib')
            if not os.path.exists(model_path):
                raise FileNotFoundError(f"Model file not found at {model_path}")
            self.model = joblib.load(model_path)

            scaler_path = os.path.join(self.model_dir, 'loan_scaler.joblib')
            if not os.path.exists(scaler_path):
                raise FileNotFoundError(f"Scaler file not found at {scaler_path}")
            self.scaler = joblib.load(scaler_path)

            encoders_path = os.path.join(self.model_dir, 'loan_label_encoders.joblib')
            if not os.path.exists(encoders_path):
                raise FileNotFoundError(f"Label encoders file not found at {encoders_path}")
            self.label_encoders = joblib.load(encoders_path)

            features_path = os.path.join(self.model_dir, 'loan_feature_names.joblib')
            if not os.path.exists(features_path):
                raise FileNotFoundError(f"Feature names file not found at {features_path}")
            self.feature_names = joblib.load(features_path)

            # The SHAP explainer is optional: prediction works without it.
            explainer_path = os.path.join(self.model_dir, 'loan_explainer.joblib')
            if os.path.exists(explainer_path):
                self.explainer = joblib.load(explainer_path)

            logger.info("Model components loaded successfully")

        except Exception as e:
            logger.error(f"Error loading model components: {str(e)}")
            raise

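    # NOTE (added): joblib artifacts are pickle-based, so the scikit-learn and
    # shap versions used when calling load_components() should match the
    # versions used when save() produced the files; mismatches can fail to
    # unpickle or silently change behavior.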
    def save(self, output_dir: str = "models") -> None:
        """Save model components to disk.

        Args:
            output_dir (str): Directory to save model components
        """
        try:
            os.makedirs(output_dir, exist_ok=True)

            model_path = os.path.join(output_dir, "loan_model.joblib")
            joblib.dump(self.model, model_path)

            scaler_path = os.path.join(output_dir, "loan_scaler.joblib")
            joblib.dump(self.scaler, scaler_path)

            encoders_path = os.path.join(output_dir, "loan_label_encoders.joblib")
            joblib.dump(self.label_encoders, encoders_path)

            features_path = os.path.join(output_dir, "loan_feature_names.joblib")
            joblib.dump(self.feature_names, features_path)

            if self.explainer is not None:
                explainer_path = os.path.join(output_dir, "loan_explainer.joblib")
                joblib.dump(self.explainer, explainer_path)

            logger.info(f"Model components saved to {output_dir}")

        except Exception as e:
            logger.error(f"Error saving model components: {str(e)}")
            raise

    def train(self, X: pd.DataFrame, y: pd.Series) -> None:
        """Train the loan approval model.

        Args:
            X (pd.DataFrame): Training features
            y (pd.Series): Target values
        """
        try:
            # Record the training column order; predict() reindexes against it.
            self.feature_names = list(X.columns)

            X_processed = self._preprocess_features(X, is_training=True)

            logger.info("Training RandomForestClassifier...")
            self.model = RandomForestClassifier(
                n_estimators=200,
                max_depth=10,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42
            )
            self.model.fit(X_processed, y)

            # TreeExplainer works directly on fitted tree ensembles.
            logger.info("Initializing SHAP explainer...")
            self.explainer = shap.TreeExplainer(self.model)

            logger.info("Model trained successfully")

        except Exception as e:
            logger.error(f"Error training model: {str(e)}")
            raise

    def predict(self, features: Dict[str, Any]) -> Tuple[str, float, Dict[str, float]]:
        """Make a prediction for loan approval.

        Args:
            features (Dict[str, Any]): Input features for prediction

        Returns:
            Tuple[str, float, Dict[str, float]]: Prediction result, approval
            probability, and global feature importance
        """
        try:
            required_features = [
                'no_of_dependents', 'education', 'self_employed', 'income_annum',
                'loan_amount', 'loan_term', 'cibil_score', 'residential_assets_value',
                'commercial_assets_value', 'luxury_assets_value', 'bank_asset_value'
            ]

            missing_features = [f for f in required_features if f not in features]
            if missing_features:
                raise ValueError(f"Missing required features: {missing_features}")

            # Derive the engineered features the model was trained with.
            # Assumes income_annum and loan_amount are non-zero.
            features = features.copy()
            features['debt_to_income'] = features['loan_amount'] / features['income_annum']
            features['total_assets'] = (
                features['residential_assets_value'] +
                features['commercial_assets_value'] +
                features['luxury_assets_value'] +
                features['bank_asset_value']
            )
            features['asset_to_loan'] = features['total_assets'] / features['loan_amount']

            X = pd.DataFrame([features])

            expected_features = self.feature_names
            missing_features = set(expected_features) - set(X.columns)
            if missing_features:
                raise ValueError(f"Missing required features after preprocessing: {missing_features}")

            # Reorder columns to match the training layout; copy so the
            # in-place encoding and scaling below operate on a fresh frame.
            X = X[expected_features].copy()

            for feature in self.categorical_columns:
                try:
                    X[feature] = self.label_encoders[feature].transform(X[feature].astype(str))
                except Exception as e:
                    raise ValueError(
                        f"Error encoding {feature}: {str(e)}. "
                        f"Valid values are: {self.label_encoders[feature].classes_}"
                    )

            numerical_features = [f for f in X.columns if f not in self.categorical_columns]
            X[numerical_features] = self.scaler.transform(X[numerical_features])

            prediction = self.model.predict(X)[0]
            # Probability of the positive class, assuming the target was
            # encoded with 1 = approved during training.
            probability = self.model.predict_proba(X)[0][1]

            # Global importances from the forest, not a per-application explanation.
            feature_importance = dict(zip(self.feature_names, self.model.feature_importances_))

            result = "Approved" if prediction == 1 else "Rejected"

            return result, probability, feature_importance

        except Exception as e:
            logger.exception(f"Error making prediction: {str(e)}")
            raise

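    # NOTE (added): predict() reports the forest's global feature_importances_;
    # the loaded self.explainer is not consulted anywhere in this module. A
    # per-application explanation could be obtained via
    # self.explainer.shap_values(X) on the preprocessed row, but that call path
    # is a sketch of a possibility, not existing behavior here.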
    def _preprocess_features(self, X: pd.DataFrame, is_training: bool = False) -> pd.DataFrame:
        """Preprocess features for model training or prediction.

        Args:
            X (pd.DataFrame): Input features
            is_training (bool): Whether preprocessing is for training

        Returns:
            pd.DataFrame: Preprocessed features
        """
        try:
            df = X.copy()

            # Fit encoders only on training data; reuse the fitted state otherwise.
            for col in self.categorical_columns:
                if col in df.columns:
                    if is_training:
                        df[col] = self.label_encoders[col].fit_transform(df[col])
                    else:
                        df[col] = self.label_encoders[col].transform(df[col])

            # Same fit/transform split for the scaler on the remaining columns.
            numerical_features = [f for f in df.columns if f not in self.categorical_columns]
            if is_training:
                df[numerical_features] = self.scaler.fit_transform(df[numerical_features])
            else:
                df[numerical_features] = self.scaler.transform(df[numerical_features])

            return df

        except Exception as e:
            logger.error(f"Error preprocessing features: {str(e)}")
            raise

    def get_feature_importance(self):
        """Return feature importance values from the model."""
        try:
            if self.model is None:
                logger.warning("Model not loaded, cannot get feature importance")
                return None

            if hasattr(self.model, 'feature_importances_'):
                return self.model.feature_importances_.tolist()
            elif hasattr(self.model, 'coef_'):
                # Linear models expose coefficients instead; use their magnitude.
                return np.abs(self.model.coef_[0]).tolist()
            else:
                logger.warning("Feature importance not available in model, returning dummy values")
                feature_count = len(self.feature_names) if self.feature_names is not None else 10
                return [0.1] * feature_count
        except Exception as e:
            logger.error(f"Error getting feature importance: {str(e)}")
            feature_count = len(self.feature_names) if self.feature_names is not None else 10
            return [0.1] * feature_count
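

# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the original module: the data file path,
# the applicant values, and the 0/1 target encoding below are illustrative
# assumptions, not shipped artifacts.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Training from scratch would look like this (hypothetical CSV; X must
    # already contain the engineered columns debt_to_income, total_assets,
    # and asset_to_loan that predict() derives at inference time):
    #
    #     df = pd.read_csv("loan_data.csv")
    #     X, y = df.drop(columns=["loan_status"]), df["loan_status"]
    #     model = LoanApprovalModel(load_model=False)
    #     model.train(X, y)
    #     model.save("models")

    # Scoring a single application with previously saved components:
    model = LoanApprovalModel(model_dir="models")
    applicant = {
        "no_of_dependents": 2,
        "education": "Graduate",      # must be a category seen in training
        "self_employed": "No",
        "income_annum": 9_600_000,
        "loan_amount": 29_900_000,
        "loan_term": 12,
        "cibil_score": 778,
        "residential_assets_value": 2_400_000,
        "commercial_assets_value": 17_600_000,
        "luxury_assets_value": 22_700_000,
        "bank_asset_value": 8_000_000,
    }
    decision, probability, importance = model.predict(applicant)
    print(f"{decision} (approval probability: {probability:.2%})")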