Spaces:
Sleeping
Sleeping
# =========================== | |
# Fichier: cuml_trainer.py | |
# =========================== | |
from abc import ABC, abstractmethod | |
from typing import Optional, Union, Tuple | |
import cupy as cp | |
import numpy as np | |
from scipy.sparse import csr_matrix | |
import cudf | |
from cuml.model_selection import train_test_split | |
from config import Config | |
from base_trainer import BaseTrainer | |
from interfaces.vectorizer import Vectorizer | |
class CuMLTrainer(BaseTrainer, ABC): | |
""" | |
Classe abstraite, héritée de BaseTrainer, représentant un entraîneur | |
basé sur la librairie cuML. Elle ajoute notamment le concept de vectoriseur | |
et force le passage de la matrice d'entrée en cupy.ndarray pour la plupart | |
des opérations. | |
Attributs: | |
vectorizer (Vectorizer): Objet responsable de la vectorisation du texte. | |
""" | |
def __init__(self, config: Config, data_path: str, | |
target_column: str) -> None: | |
""" | |
Initialise un CuMLTrainer avec la configuration. | |
Appelle également le constructeur de BaseTrainer. | |
:param config: Configuration globale du système. | |
:param data_path: Chemin vers le fichier de données. | |
:param target_column: Nom de la colonne cible dans les données. | |
""" | |
super().__init__(config, data_path, target_column) | |
self.vectorizer: Vectorizer = None | |
self.classifier: object = None # Déjà dans BaseTrainer, mais redéfini pour clarté | |
# Attributs pour stocker les données splittées (texte brut) | |
self.X_train_text: Optional[cudf.Series] = None | |
self.X_val_text: Optional[cudf.Series] = None | |
self.X_test_text: Optional[cudf.Series] = None | |
self.y_train: Optional[cp.ndarray] = None | |
self.y_val: Optional[cp.ndarray] = None | |
self.y_test: Optional[cp.ndarray] = None | |
# Attributs pour stocker les données vectorisées | |
self.X_train_vec: Optional[Union[cp.ndarray, csr_matrix]] = None | |
self.X_val_vec: Optional[Union[cp.ndarray, csr_matrix]] = None | |
self.X_test_vec: Optional[Union[cp.ndarray, csr_matrix]] = None | |
def build_components(self) -> None: | |
""" | |
Méthode abstraite. Instancie concrètement le vectorizer et le classifieur, | |
selon la configuration (ex. 'svm', 'random_forest', etc.). | |
""" | |
pass | |
def _load_and_split_data(self, test_size=0.2, val_size=0.5, random_state=42) -> None: | |
""" | |
Charge les données depuis data_path, les sépare en features/labels, | |
et les divise en ensembles train/validation/test (80/10/10 par défaut). | |
Stocke les résultats dans les attributs de l'instance. | |
""" | |
if self.X_train_text is not None: # Évite de recharger/resplitter | |
return | |
# Charger les données | |
data = cudf.read_csv(self.data_path) | |
# Identification des features | |
feature_columns = [col for col in data.columns if col != self.target_column] | |
if not feature_columns: | |
raise ValueError("Aucune colonne de feature trouvée.") | |
# Concaténation manuelle des features (agg n'est pas supporté pour les colonnes string dans cuDF) | |
# Commencer avec la première colonne | |
texts_concatenated = data[feature_columns[0]].astype(str) | |
# Ajouter les autres colonnes avec un espace comme séparateur | |
for col in feature_columns[1:]: | |
texts_concatenated = texts_concatenated.str.cat(data[col].astype(str), sep=' ') | |
# Convertir les labels en format compatible avec cuML | |
labels = data[self.target_column].astype(self._get_label_dtype()).values | |
# Créer une copie des textes pour le stockage | |
texts_for_storage = texts_concatenated.copy() | |
# Convertir les textes en indices numériques pour le split | |
# Cette étape est nécessaire car cuML ne peut pas gérer directement les objets string | |
# Nous utilisons une représentation numérique simple pour le split uniquement | |
# Les textes originaux seront stockés pour la vectorisation ultérieure | |
indices = cp.arange(len(texts_concatenated)) | |
# Premier split: 80% train, 20% temp (pour val+test) | |
train_indices, temp_indices, y_train, y_temp = train_test_split( | |
indices, labels, test_size=test_size, random_state=random_state, stratify=labels | |
) | |
# Deuxième split: 50% validation, 50% test sur l'ensemble temp | |
val_indices, test_indices, y_val, y_test = train_test_split( | |
temp_indices, y_temp, test_size=val_size, random_state=random_state, stratify=y_temp | |
) | |
# Récupérer les textes correspondant aux indices | |
X_train = texts_for_storage.iloc[train_indices.get()] | |
X_val = texts_for_storage.iloc[val_indices.get()] | |
X_test = texts_for_storage.iloc[test_indices.get()] | |
# Stockage des résultats | |
self.X_train_text = X_train | |
self.X_val_text = X_val | |
self.X_test_text = X_test | |
self.y_train = y_train | |
self.y_val = y_val | |
self.y_test = y_test | |
def train(self) -> None: | |
""" | |
Entraîne le classifieur sur l'ensemble d'entraînement après vectorisation. | |
Vectorise également les ensembles de validation et de test. | |
""" | |
self._load_and_split_data() # Assure que les données sont chargées et splittées | |
if self.vectorizer is None or self.classifier is None: | |
raise RuntimeError("Les composants (vectorizer, classifier) doivent être construits avant l'entraînement. Appelez build_components().") | |
# fit_transform sur l'entraînement | |
self.X_train_vec = self.vectorizer.fit_transform(self.X_train_text) | |
# transform sur validation et test | |
self.X_val_vec = self.vectorizer.transform(self.X_val_text) | |
self.X_test_vec = self.vectorizer.transform(self.X_test_text) | |
# Préparation pour cuML (conversion en dense si nécessaire) | |
X_train_prepared = self._prepare_input_for_fit(self.X_train_vec) | |
self.classifier.fit(X_train_prepared, self.y_train) | |
def evaluate(self, use_validation_set=False) -> dict: | |
""" | |
Évalue le classifieur sur l'ensemble de test (par défaut) ou de validation. | |
:param use_validation_set: Si True, évalue sur l'ensemble de validation. | |
Sinon (défaut), évalue sur l'ensemble de test. | |
:return: Dictionnaire de métriques. | |
""" | |
if self.X_test_vec is None or self.y_test is None or self.X_val_vec is None or self.y_val is None: | |
raise RuntimeError("Les données doivent être chargées, splittées et vectorisées avant l'évaluation. Appelez train().") | |
if self.classifier is None: | |
raise RuntimeError("Le classifieur doit être entraîné avant l'évaluation. Appelez train().") | |
if use_validation_set: | |
X_eval_vec = self.X_val_vec | |
y_true = self.y_val | |
dataset_name = "validation" | |
else: | |
X_eval_vec = self.X_test_vec | |
y_true = self.y_test | |
dataset_name = "test" | |
# Préparation pour cuML et prédiction | |
X_eval_prepared = self._prepare_input_for_predict(X_eval_vec) | |
y_pred = self.classifier.predict(X_eval_prepared) | |
# Récupérer les probabilités pour la classe positive | |
y_proba = self._get_positive_probabilities(X_eval_prepared) | |
# Calcul des métriques | |
prefix = f"{self.config.model.type.lower()}_{dataset_name}" | |
if self.metrics_calculator is None: | |
# Initialisation par défaut si non fait ailleurs | |
from interfaces.metrics_calculator import DefaultMetricsCalculator | |
self.metrics_calculator = DefaultMetricsCalculator() | |
# Utiliser la méthode calculate_and_log pour la classification binaire | |
metrics = self.metrics_calculator.calculate_and_log( | |
y_true=y_true, | |
y_pred=y_pred, | |
y_proba=y_proba, | |
prefix=prefix | |
) | |
return metrics | |
# Note: La méthode optimize_if_needed appelle l'optimiseur qui, à son tour, | |
# peut appeler train() et evaluate(). Il faudra s'assurer que l'optimiseur | |
# utilise evaluate(use_validation_set=True) pour l'évaluation des hyperparamètres. | |
# Cela pourrait nécessiter une modification des classes Optimizer ou de la façon | |
# dont la fonction objectif est définie dans l'optimiseur. | |
def _prepare_input_for_fit(self, X: Union[cp.ndarray, | |
csr_matrix]) -> cp.ndarray: | |
""" | |
Convertit, si nécessaire, la matrice en cupy.ndarray pour l'entraînement. | |
:param X: Données d'entraînement (cupy.ndarray ou scipy.sparse.csr_matrix). | |
:return: Données converties en cupy.ndarray, pour compatibilité cuML. | |
""" | |
if isinstance(X, csr_matrix): | |
return cp.asarray(X.toarray()) | |
return X # c'est déjà cupy.ndarray | |
def _prepare_input_for_predict( | |
self, X: Union[cp.ndarray, csr_matrix]) -> cp.ndarray: | |
""" | |
Convertit, si nécessaire, la matrice en cupy.ndarray pour la prédiction. | |
:param X: Données de prédiction (cupy.ndarray ou scipy.sparse.csr_matrix). | |
:return: Données converties en cupy.ndarray, pour compatibilité cuML. | |
""" | |
if isinstance(X, csr_matrix): | |
return cp.asarray(X.toarray()) | |
return X | |