#!/usr/bin/env python3 # ========================= # Fichier: main.py # ========================= import os import logging import hydra import mlflow from omegaconf import DictConfig, OmegaConf from config import Config from trainers.cuml.svm_trainer import SvmTrainer from trainers.cuml.random_forest_trainer import RandomForestTrainer from trainers.cuml.logistic_regression_trainer import LogisticRegressionTrainer from trainers.cuml.linear_regression_trainer import LinearRegressionTrainer from trainers.huggingface.huggingface_transformer_trainer import HuggingFaceTransformerTrainer from optimizers.optuna_optimizer import OptunaOptimizer from optimizers.ray_tune_optimizer import RayTuneOptimizer from mlflow_integration.mlflow_decorator import MLflowDecorator import tempfile import pickle import pandas as pd from utilities.cuml_pyfunc_wrapper import CuMLPyFuncWrapper logger = logging.getLogger(__name__) def get_trainer(config: Config): """ Crée et retourne l'instance du trainer approprié en fonction de la configuration. """ model_type = config.model.type.lower() # Mapping des types de modèles vers leurs trainers trainer_map = { "svm": SvmTrainer, "random_forest": RandomForestTrainer, "logistic_regression": LogisticRegressionTrainer, "linear_regression": LinearRegressionTrainer, "transformer": HuggingFaceTransformerTrainer, "bert": HuggingFaceTransformerTrainer, "roberta": HuggingFaceTransformerTrainer, } if model_type not in trainer_map: raise ValueError(f"Type de modèle non supporté: {model_type}") trainer_class = trainer_map[model_type] return trainer_class( config=config, data_path=config.data.path, target_column=config.data.target_column ) def get_optimizer(config: Config): """ Crée et retourne l'instance d'optimizer appropriée en fonction de la configuration. """ optimizer_type = config.hyperparameters.optimizer.lower() optimizer_map = { "optuna": OptunaOptimizer, "raytune": RayTuneOptimizer, } if optimizer_type not in optimizer_map: raise ValueError(f"Type d'optimizer non supporté: {optimizer_type}") return optimizer_map[optimizer_type]() def log_cuml_model_to_mlflow(trainer_instance, run_id=None): """ Sérialise le vectorizer et le classifier dans un répertoire temporaire puis log le tout dans MLflow en tant que modèle PyFunc. Les artifacts sont ainsi stockés dans mlruns, liés au run en cours. """ logger.info("Logging du modèle CuML via mlflow.pyfunc.log_model...") input_example = pd.DataFrame({"example_text": ["exemple"]}) # On va utiliser mlflow.pyfunc.log_model pour stocker le wrapper PyFunc + nos artifacts with tempfile.TemporaryDirectory() as tmpdir: vectorizer_path = os.path.join(tmpdir, "vectorizer.pkl") classifier_path = os.path.join(tmpdir, "classifier.pkl") # Sauvegarde sur disque with open(vectorizer_path, "wb") as vf: pickle.dump(trainer_instance.vectorizer, vf) with open(classifier_path, "wb") as cf: pickle.dump(trainer_instance.classifier, cf) # PyFunc wrapper (placeholder, héberge la logique de load_model) pyfunc_wrapper = CuMLPyFuncWrapper( vectorizer=None, classifier=None ) # Log en PyFunc; "cuml_model" est le chemin (artifact_path) où sera stocké le modèle dans MLflow mlflow.pyfunc.log_model( artifact_path="cuml_model", python_model=pyfunc_wrapper, artifacts={ "vectorizer": vectorizer_path, "classifier": classifier_path }, input_example=input_example ) logger.info("Le modèle et ses artifacts ont été enregistrés dans MLflow.") @hydra.main(config_path="conf", config_name="config", version_base=None) def main(cfg: DictConfig) -> None: """ Point d'entrée principal de l'application. """ try: config = Config(**OmegaConf.to_container(cfg, resolve=True)) except Exception as e: logger.error(f"Erreur lors de la validation Pydantic de la configuration: {e}") logger.error(f"Configuration après fusion Hydra: \n{OmegaConf.to_yaml(cfg)}") raise logger.info(f"Configuration Pydantic finale chargée: {config}") # Sélection du tracker MLflow mlflow.set_tracking_uri(config.mlflow.tracking_uri) trainer = get_trainer(config) trainer.build_components() def run_pipeline(trainer_instance): """ Exécute la séquence complète : - Optimisation hyperparamètres (si besoin) - Entraînement - Évaluation - Logging MLflow (paramètres, métriques, et modèles) """ logger.info("Vérification et lancement de l'optimisation des hyperparamètres si nécessaire...") trainer_instance.optimize_if_needed() logger.info("Lancement de l'entraînement...") trainer_instance.train() logger.info("Lancement de l'évaluation...") metrics = trainer_instance.evaluate() logger.info(f"Metrics calculés: {metrics}") # Log des métriques mlflow.log_metrics(metrics) # Log des paramètres uniquement si ce n'est pas un trainer Hugging Face # car le Trainer HF logue déjà ses propres paramètres (TrainingArguments) if not isinstance(trainer_instance, HuggingFaceTransformerTrainer): logger.info("Logging des paramètres...") trainer_instance.log_parameters_to_mlflow() else: logger.info("Logging des paramètres désactivé pour HuggingFaceTransformerTrainer (géré par HF Trainer).") # Log du modèle final (vectorizer+classifier) sous forme PyFunc # Note: Cette fonction est spécifique aux modèles cuML pour le moment if hasattr(trainer_instance, 'vectorizer') and trainer_instance.vectorizer is not None: log_cuml_model_to_mlflow(trainer_instance) else: logger.info("Logging du modèle PyFunc non applicable pour ce type de trainer.") logger.info("Pipeline MLflow complet terminé.") # On utilise un décorateur défini pour centraliser le démarrage/arrêt du run mlflow_decorator = MLflowDecorator( experiment_name=config.mlflow.experiment_name, tracking_uri=config.mlflow.tracking_uri ) run_pipeline_with_mlflow = mlflow_decorator(run_pipeline) logger.info("Lancement du pipeline complet avec MLflow...") run_pipeline_with_mlflow(trainer) logger.info("Pipeline MLflow terminé avec succès.") if __name__ == "__main__": main()