Spaces:
Sleeping
Sleeping
# ============================================================ | |
# Fichier: trainers/huggingface/huggingface_transformer_trainer.py | |
# ============================================================ | |
from typing import Optional, Dict, List, Any | |
import cupy as cp | |
import numpy as np | |
import cudf | |
import torch | |
import torch.nn.functional as F # Pour softmax | |
# Utiliser cuml.model_selection | |
from cuml.model_selection import train_test_split | |
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, EvalPrediction | |
from datasets import Dataset as HFDataset # Utiliser le type Dataset de Hugging Face pour plus de clarté | |
# Utiliser cuml.metrics | |
from cuml.metrics import accuracy_score, precision_recall_curve, roc_auc_score | |
# Importer cupy pour la conversion et argmax | |
import cupy as cp | |
# numpy n'est plus nécessaire ici | |
# import numpy as np | |
from base_trainer import BaseTrainer | |
from config import Config | |
# Fonction pour calculer les métriques en utilisant cuML (sans logs) | |
def compute_metrics(p: EvalPrediction) -> Dict[str, float]: | |
logits = p.predictions | |
# Convertir les labels numpy en cupy | |
labels_cp = cp.asarray(p.label_ids) | |
# Obtenir les prédictions en appliquant argmax aux logits avec cupy | |
preds_cp = cp.argmax(cp.asarray(logits), axis=1) | |
# Obtenir les probabilités (softmax) et convertir en cupy | |
probas_torch = F.softmax(torch.tensor(logits), dim=-1) | |
probas_cp = cp.asarray(probas_torch) | |
# Utiliser les probas de la classe positive | |
proba_pos_class = probas_cp[:, 1] | |
# 1. Calculer l'accuracy | |
acc = accuracy_score(labels_cp, preds_cp) | |
# 2. Calculer l'AUC ROC | |
auc = roc_auc_score(labels_cp.astype(cp.int32), proba_pos_class.astype(cp.float32)) | |
# 3. Utiliser precision_recall_curve pour obtenir les courbes | |
precision, recall, thresholds = precision_recall_curve( | |
labels_cp.astype(cp.int32), proba_pos_class.astype(cp.float32) | |
) | |
# 4. Calculer la précision, le rappel et le F1 score optimaux | |
optimal_precision, optimal_recall, optimal_f1, optimal_threshold = calculate_optimal_f1( | |
precision, recall, thresholds | |
) | |
# Construire le dictionnaire des métriques | |
metrics = { | |
"accuracy": float(acc), | |
"precision": float(optimal_precision), | |
"recall": float(optimal_recall), | |
"f1": float(optimal_f1), | |
"optimal_threshold": float(optimal_threshold), | |
"auc_roc": float(auc) | |
} | |
return metrics | |
def calculate_optimal_f1(precision: cp.ndarray, recall: cp.ndarray, thresholds: cp.ndarray): | |
""" | |
Calcule le F1 score optimal à partir des courbes de précision et de rappel. | |
Args: | |
precision: Tableau de précisions pour différents seuils | |
recall: Tableau de rappels pour différents seuils | |
thresholds: Tableau de seuils correspondants | |
Returns: | |
Tuple contenant (précision optimale, rappel optimal, F1 score optimal, seuil optimal) | |
""" | |
# Ajouter le seuil 1.0 à thresholds (qui n'est pas inclus par défaut dans precision_recall_curve) | |
thresholds_with_one = cp.append(thresholds, cp.array([1.0])) | |
# Calculer le F1 score pour chaque point de la courbe | |
# F1 = 2 * (precision * recall) / (precision + recall) | |
f1_scores = 2 * (precision * recall) / (precision + recall) | |
# Trouver l'indice du F1 score maximal | |
best_idx = cp.argmax(f1_scores) | |
best_precision = float(precision[best_idx]) | |
best_recall = float(recall[best_idx]) | |
best_f1 = float(f1_scores[best_idx]) | |
# Obtenir le seuil optimal | |
best_threshold = float(thresholds_with_one[best_idx]) | |
return best_precision, best_recall, best_f1, best_threshold | |
class HuggingFaceTransformerTrainer(BaseTrainer): | |
""" | |
Entraîneur spécifique Hugging Face, utilisant un tokenizer, | |
un modèle AutoModelForSequenceClassification et un HF Trainer. | |
Ne dépend pas d'un vectorizer cuML d'après l'UML. | |
""" | |
def __init__(self, config: Config, data_path: str, | |
target_column: str) -> None: | |
""" | |
Initialise un HuggingFaceTransformerTrainer avec la configuration | |
et les paramètres du parent BaseTrainer. | |
:param config: Configuration globale du système. | |
(La config.vectorizer n'est pas utilisée ici.) | |
:param data_path: Chemin vers le fichier de données. | |
:param target_column: Nom de la colonne cible dans vos données. | |
""" | |
super().__init__(config, data_path, target_column) | |
super().__init__(config, data_path, target_column) | |
self.tokenizer: Optional[AutoTokenizer] = None | |
self.model: Optional[AutoModelForSequenceClassification] = None | |
self.hf_trainer: Optional[Trainer] = None | |
self.train_dataset: Optional[HFDataset] = None | |
self.eval_dataset: Optional[HFDataset] = None | |
self.test_dataset: Optional[HFDataset] = None | |
def build_components(self) -> None: | |
""" | |
Instancie le tokenizer et le modèle Hugging Face | |
AutoModelForSequenceClassification, puis crée un Trainer | |
avec des TrainingArguments par défaut. | |
""" | |
model_name = self.config.model.params.get("model_name") | |
self.tokenizer = AutoTokenizer.from_pretrained(model_name) | |
self.model = AutoModelForSequenceClassification.from_pretrained( | |
model_name) | |
training_args = self._prepare_training_args() | |
# Le HF Trainer a besoin de datasets, qui sont construits dans train() | |
# On ajoute compute_metrics ici | |
self.hf_trainer = Trainer( | |
model=self.model, | |
args=training_args, | |
train_dataset=self.train_dataset, # Sera défini dans train() | |
eval_dataset=self.eval_dataset, # Sera défini dans train() | |
compute_metrics=compute_metrics, | |
tokenizer=self.tokenizer, # Ajout du tokenizer pour le padding dynamique si besoin | |
callbacks=[] | |
) | |
def train(self) -> None: | |
""" | |
Entraîne le modèle Hugging Face sur le jeu de données. | |
""" | |
# Chargement des données avec cuDF | |
df = cudf.read_csv(self.data_path) | |
# Séparation des labels | |
labels = cp.asarray(df[self.target_column].astype(int)) | |
# Création du texte en concaténant toutes les colonnes sauf la cible | |
features_df = df.drop(columns=[self.target_column]).astype(str) | |
texts = features_df[features_df.columns[0]] | |
for col in features_df.columns[1:]: | |
texts = texts.str.cat(features_df[col], sep=' ') | |
# Créer une copie des textes pour le stockage | |
texts_for_storage = texts.copy() | |
# Utiliser des indices numériques pour le split au lieu des textes directement | |
# Cette approche évite les problèmes de conversion des objets string en tableaux CUDA | |
indices = cp.arange(len(texts)) | |
# Premier split: 80% train, 20% temp en utilisant les indices | |
train_indices, temp_indices, y_train, y_temp = train_test_split( | |
indices, labels, test_size=0.2, random_state=42, stratify=labels | |
) | |
# Deuxième split: 50% validation, 50% test sur temp | |
val_indices, test_indices, y_val, y_test = train_test_split( | |
temp_indices, y_temp, test_size=0.5, random_state=42, stratify=y_temp | |
) | |
# Récupérer les textes correspondant aux indices | |
X_train_text = texts_for_storage.iloc[train_indices.get()] | |
X_val_text = texts_for_storage.iloc[val_indices.get()] | |
X_test_text = texts_for_storage.iloc[test_indices.get()] | |
# Fonction pour créer un dataset Hugging Face à partir de cudf.Series et cp.ndarray | |
def create_hf_dataset(text_series: cudf.Series, label_array: cp.ndarray) -> HFDataset: | |
# Convertir en listes Python pour le tokenizer et HF Dataset | |
texts_list = text_series.to_arrow().to_pylist() | |
# Convertir cupy array en numpy puis en liste pour HF Dataset | |
labels_list = cp.asnumpy(label_array).tolist() | |
encodings = self.tokenizer(texts_list, padding=True, truncation=True) # Pas de return_tensors="pt" ici | |
# Crée un dictionnaire compatible avec Dataset.from_dict | |
data_dict = { | |
"input_ids": encodings["input_ids"], | |
"attention_mask": encodings["attention_mask"], | |
"labels": labels_list | |
} | |
return HFDataset.from_dict(data_dict) | |
# Création des datasets | |
self.train_dataset = create_hf_dataset(X_train_text, y_train) | |
self.eval_dataset = create_hf_dataset(X_val_text, y_val) | |
self.test_dataset = create_hf_dataset(X_test_text, y_test) # Garder pour evaluate() | |
# Assignation des datasets au Trainer HF (déjà fait dans build_components mais on réassigne ici) | |
self.hf_trainer.train_dataset = self.train_dataset | |
self.hf_trainer.eval_dataset = self.eval_dataset | |
# Lancement du fine‑tuning | |
print(f"Starting training with {len(self.train_dataset)} samples.") | |
print(f"Validation during training with {len(self.eval_dataset)} samples.") | |
print(f"Test set prepared with {len(self.test_dataset)} samples.") | |
self.hf_trainer.train() | |
def evaluate(self) -> dict: | |
""" | |
Évalue le modèle Hugging Face; la logique de calcul | |
des métriques est en partie assurée par le HF Trainer. | |
:return: Dictionnaire contenant les métriques calculées sur l'ensemble de test. | |
""" | |
if self.hf_trainer is None or self.test_dataset is None: | |
raise ValueError("Trainer or test dataset not initialized. Run train() first.") | |
print(f"Evaluating on the test set ({len(self.test_dataset)} samples)...") | |
# Utiliser predict pour obtenir les métriques sur le jeu de test | |
results = self.hf_trainer.predict(self.test_dataset) | |
# results.metrics contient déjà les métriques calculées par compute_metrics | |
# sur le test_dataset fourni. | |
print("Evaluation results:", results.metrics) | |
return results.metrics | |
def _create_torch_dataset(self, texts: cudf.Series, | |
labels: cp.ndarray) -> torch.utils.data.Dataset: | |
""" | |
Convertit un cudf.Series de textes et un tableau cupy de labels | |
en un Dataset PyTorch. | |
:param texts: Série cudf contenant les textes. | |
:param labels: Vecteur cupy des labels (ex. classification binaire ou multiclasses). | |
:return: Un Dataset PyTorch utilisable par Trainer. | |
""" | |
# Implémentation possible : tokenization + construction d'un dataset custom. | |
# Cette méthode n'est plus directement utilisée car on crée les HFDatasets dans train() | |
raise NotImplementedError( | |
"La méthode _create_torch_dataset n'est plus utilisée directement." | |
) | |
def _prepare_training_args(self) -> TrainingArguments: | |
""" | |
Construit un objet TrainingArguments Hugging Face, | |
par exemple pour définir l'output_dir, le batch_size, etc. | |
:return: Instance de TrainingArguments configurée. | |
""" | |
params = self.config.model.params | |
return TrainingArguments( | |
output_dir="./results", | |
num_train_epochs=float(params.get("epochs")), | |
per_device_train_batch_size=int(params.get("batch_size")), | |
per_device_eval_batch_size=int(params.get("batch_size")), | |
learning_rate=float(params.get("learning_rate")), | |
warmup_steps=int(params.get("warmup_steps")), | |
weight_decay=float(params.get("weight_decay")), | |
save_steps=50, | |
logging_dir="./logs", | |
logging_strategy="no", | |
save_strategy="epoch", | |
report_to="mlflow" | |
) | |
def optimize_if_needed(self) -> None: | |
""" | |
Surcharge la méthode optimize_if_needed de BaseTrainer pour désactiver | |
l'optimisation des hyperparamètres pour les modèles transformers. | |
""" | |
# Ne rien faire, ce qui désactive l'optimisation des hyperparamètres | |
return | |