# NOTE(review): the three lines below ("Spaces:", "Running", "Running") are
# Hugging Face Spaces status text accidentally pasted into the module; kept
# here as comments because they are not valid Python.
# Spaces:
# Running
# Running
# fonctions.py | |
#from config import DATA_DIR, RESULTS_DIR | |
# -------------------- FONCTIONS DE BASE DATANT DU PROJET 8 -------------------- | |
# fonctions.py | |
# Importations nécessaires | |
import os | |
from cityscapesscripts.helpers.labels import name2label | |
from cityscapesscripts.preparation.json2labelImg import json2labelImg | |
import json | |
import numpy as np | |
import albumentations as A | |
import cv2 | |
from tensorflow.keras.utils import Sequence | |
from tensorflow.keras.preprocessing.image import load_img, img_to_array | |
from albumentations import Compose, HorizontalFlip, Rotate, OneOf, RandomScale, Blur, GaussNoise, Resize | |
import matplotlib.pyplot as plt | |
from typing import List, Tuple | |
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, MaxPooling2D, UpSampling2D, Concatenate, Resizing, BatchNormalization, Dropout | |
from tensorflow.keras.models import Model | |
from tqdm import tqdm | |
from tensorflow.keras.applications import VGG16, ResNet50 | |
from tensorflow.keras.callbacks import EarlyStopping, CSVLogger, ReduceLROnPlateau, ModelCheckpoint | |
from cityscapesscripts.helpers.labels import trainId2label | |
import time | |
import segmentation_models as sm | |
import pandas as pd | |
from pathlib import Path | |
from datetime import datetime | |
from tensorflow.keras.optimizers import Adam | |
import glob | |
import torch | |
from typing import Tuple | |
from torchvision import transforms | |
import torch.nn.functional as F | |
# The 8 "useful" class groups (Cityscapes categories) and their train ids.
CLASSES_UTILES = {
    "void": 0, "flat": 1, "construction": 2, "object": 3,
    "nature": 4, "sky": 5, "human": 6, "vehicle": 7
}
# Path layout for Project 9: data/ holds the extracted Cityscapes archives.
root_path = Path(".")  # project 9 root
data_path = root_path / "data"
cityscapes_scripts_path = root_path / "notebook/cityscapesScripts/cityscapesscripts"
images_path = data_path / "leftImg8bit"  # RGB input images
masks_path = data_path / "gtFine"        # fine annotation masks
class CityscapesDataset(torch.utils.data.Dataset):
    """Wrapper around torchvision's Cityscapes dataset for the 8-class setup.

    Each item is ``(image, mask)``: ``image`` is a float tensor of shape
    (3, H, W) in [0, 1]; ``mask`` is a LongTensor of shape (H, W) with
    values 0..7 (see ``remap_classes``).
    """

    def __init__(self, root, split="train", mode="fine", target_type="semantic", image_size=(512, 512)):
        from torchvision.datasets import Cityscapes
        from torchvision import transforms
        # Bug fix: `mode` and `target_type` were hard-coded to
        # "fine"/"semantic", silently ignoring the caller's arguments.
        self.dataset = Cityscapes(root=root, split=split, mode=mode, target_type=target_type)
        self.image_size = image_size
        self.transforms = transforms  # module handle reused in __getitem__

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        from PIL import Image
        image, mask = self.dataset[index]
        image = image.resize(self.image_size)
        # Bug fix: a label mask must be resized with NEAREST — the default
        # resampling interpolates label ids into invalid in-between values.
        mask = mask.resize(self.image_size, resample=Image.NEAREST)
        # Image -> float tensor in [0, 1]
        image = self.transforms.ToTensor()(image)
        # Mask -> numpy array, then remap the 0-33 labelIds to the 8 classes
        mask_np = np.array(mask).astype(np.uint8)
        mask_remap = remap_classes(mask_np)
        mask_tensor = torch.from_numpy(mask_remap).long()
        return image, mask_tensor
def remap_classes(mask: np.ndarray) -> np.ndarray:
    """
    Convert original Cityscapes classes (0-33) to the 8 main categories.

    Any unexpected value > 33 (e.g. stray 34/35 ids) is mapped to void (0).

    Args:
        mask (np.ndarray): integer array of original Cityscapes label ids.

    Returns:
        np.ndarray: uint8 mask of the same shape with values in [0, 7].
    """
    # Lookup table indexed by the original labelId (0-33). Same mapping as the
    # original per-class loop, but applied in ONE vectorised indexing pass
    # instead of 34 boolean-mask passes over the whole array.
    lut = np.array(
        [
            0, 0, 0, 0, 0, 0, 0,     # 0-6: unlabeled ... ground   -> void
            1, 1,                    # 7-8: road, sidewalk          -> flat
            0, 0,                    # 9-10: parking, rail track    -> void
            2, 2, 2,                 # 11-13: building, wall, fence -> construction
            0, 0, 0,                 # 14-16: guard rail ... tunnel -> void
            3, 3, 3, 3,              # 17-20: pole ... traffic sign -> object
            4, 4,                    # 21-22: vegetation, terrain   -> nature
            5,                       # 23: sky                      -> sky
            6, 6,                    # 24-25: person, rider         -> human
            7, 7, 7, 7, 7, 7, 7, 7,  # 26-33: car ... bicycle       -> vehicle
        ],
        dtype=np.uint8,
    )
    # Clean out-of-range values first, then remap through the table.
    mask = np.where(mask > 33, 0, mask)
    return lut[mask.astype(np.intp)]
def view_folder(dossier):
    """Print a two-level tree of the sub-directories of *dossier*."""
    racine = Path(dossier)
    if not racine.exists():
        print(f"❌ Le dossier {racine} n'existe pas.")
        return
    for niveau1 in racine.iterdir():
        if not niveau1.is_dir():
            continue
        print(f"|-- {niveau1.name}")
        for niveau2 in niveau1.iterdir():
            if niveau2.is_dir():
                print(f" |-- {niveau2.name}")
def load_image(path: str, target_size: Tuple[int, int]) -> np.ndarray:
    """Load an image from *path*, resize it, and scale pixels to [0, 1]."""
    pixels = img_to_array(load_img(path, target_size=target_size))
    return pixels.astype("float32") / 255.0
def load_mask(path: str, target_size: Tuple[int, int], mask_mode="labelIds") -> np.ndarray:
    """
    Load, resize and remap a segmentation mask.

    The remapping to the 8 main classes is always applied, regardless of
    ``mask_mode``, so the output is guaranteed to contain only 8 classes.

    Args:
        path (str): Path to the mask file.
        target_size (Tuple[int, int]): Output size (height, width).
        mask_mode (str): "labelIds" for original Cityscapes masks, "trainIds" otherwise.

    Returns:
        np.ndarray: Mask with class values between 0 and 7.
    """
    raw = load_img(path, target_size=target_size, color_mode="grayscale")
    labels = img_to_array(raw).astype("uint8").squeeze()
    # Always remap to enforce the 8-class contract.
    return remap_classes(labels)
def one_hot_encode_mask(mask: np.ndarray, num_classes: int) -> np.ndarray:
    """One-hot encode an integer class mask into shape (..., num_classes)."""
    # Guard against out-of-range class ids before indexing the identity matrix.
    uniques = np.unique(mask)
    if (uniques >= num_classes).any():
        print(f"Attention : Certaines valeurs de masques dépassent {num_classes-1}: {uniques}")
        mask = np.clip(mask, 0, num_classes - 1)
    return np.eye(num_classes, dtype=np.uint8)[mask]
def decode_mask(mask: np.ndarray) -> np.ndarray:
    """Collapse a one-hot encoded mask back to an indexed class map."""
    decoded = np.argmax(mask, axis=-1)
    return decoded
def get_augmentations(image_size: Tuple[int, int]) -> Compose:
    """Build the Albumentations training pipeline (flip/rotate/scale + resize)."""
    steps = [
        HorizontalFlip(p=0.2),
        Rotate(limit=15, p=0.2),
        RandomScale(scale_limit=0.1, p=0.2),
        # Nearest-neighbour so label masks keep integer class values.
        Resize(*image_size, interpolation=cv2.INTER_NEAREST),
    ]
    return Compose(steps)
class DataGenerator(Sequence):
    """Keras Sequence yielding batches of (image, one-hot mask) pairs.

    Images are loaded and scaled to [0, 1]; masks are remapped to the 8 main
    classes, optionally augmented (same Albumentations transform applied to
    image and mask), then one-hot encoded.
    """

    def __init__(self, image_paths, mask_paths, image_size=(256, 256), batch_size=16, num_classes=8,  # sizes tried: 512x512, 1024x1024, 512x1024, 1024x512, 256x512, 512x256
                 shuffle=True, augmentation_ratio=1.0, use_cache=False):
        # Parallel lists: image_paths[i] corresponds to mask_paths[i].
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.image_size = image_size
        self.batch_size = batch_size
        self.num_classes = num_classes
        self.shuffle = shuffle
        # Probability in [0, 1] that augmentation is applied to a sample.
        self.augmentation_ratio = augmentation_ratio
        self.use_cache = use_cache
        self.cache = {}  # cache of already-loaded masks (path -> ndarray)
        self.augmentation = get_augmentations(image_size)
        self.on_epoch_end()

    def __getitem__(self, index):
        """Build and return batch *index* as (images, one-hot masks)."""
        start_time = time.time()
        start = index * self.batch_size
        end = start + self.batch_size
        batch_image_paths = self.image_paths[start:end]
        batch_mask_paths = self.mask_paths[start:end]
        batch_images, batch_masks = [], []
        for img_path, mask_path in zip(batch_image_paths, batch_mask_paths):
            img = load_image(img_path, self.image_size)
            if self.use_cache and mask_path in self.cache:
                mask = self.cache[mask_path]
            else:
                mask = load_mask(mask_path, self.image_size, mask_mode="trainIds")
                if self.use_cache:
                    self.cache[mask_path] = mask
            # Randomly augment image and mask together (same transform).
            if np.random.rand() < self.augmentation_ratio:
                augmented = self.augmentation(image=img, mask=mask)
                img, mask = augmented["image"], augmented["mask"]
            batch_images.append(img)
            batch_masks.append(one_hot_encode_mask(mask, self.num_classes))
        elapsed_time = time.time() - start_time
        # print(f"📊 Génération batch {index} en {elapsed_time:.2f}s")
        return np.stack(batch_images), np.stack(batch_masks)

    def __len__(self):
        """Return the total number of batches per epoch."""
        return int(np.ceil(len(self.image_paths) / self.batch_size))

    def on_epoch_end(self) -> None:
        """Shuffle the (image, mask) pairs after each epoch when enabled."""
        if self.shuffle:
            data = list(zip(self.image_paths, self.mask_paths))
            np.random.shuffle(data)
            self.image_paths, self.mask_paths = zip(*data)

    def visualize_batch(self, num_images: int = 5) -> None:
        """Display the first batch's images next to their decoded masks."""
        batch_images, batch_masks = self.__getitem__(0)
        num_images = min(num_images, len(batch_images))
        fig, axes = plt.subplots(num_images, 2, figsize=(10, num_images * 5))
        for i in range(num_images):
            axes[i, 0].imshow(batch_images[i])
            axes[i, 0].set_title("Image")
            axes[i, 0].axis("off")
            axes[i, 1].imshow(decode_mask(batch_masks[i]), cmap="inferno")
            axes[i, 1].set_title("Mask (decoded)")
            axes[i, 1].axis("off")
        plt.tight_layout()
        plt.show()
# DataGenerator smoke test (only runs when the module is executed directly)
if __name__ == "__main__":
    # Bug fix: the original referenced the undefined names
    # train_input_img_paths / train_label_ids_img_paths -> NameError.
    # Build the sorted, paired path lists from the project layout instead
    # (sorting keeps images and labelIds masks aligned index-for-index).
    train_input_img_paths = sorted(
        str(p) for p in (images_path / "train").glob("*/*_leftImg8bit.png")
    )
    train_label_ids_img_paths = sorted(
        str(p) for p in (masks_path / "train").glob("*/*_gtFine_labelIds.png")
    )
    train_gen = DataGenerator(
        image_paths=train_input_img_paths,
        mask_paths=train_label_ids_img_paths,
        image_size=(256, 256),  # also tested with 512x512
        batch_size=16,  # candidates tried: 8, 16, 32
        num_classes=8,
        shuffle=True,
        augmentation_ratio=0.5
    )
    train_gen.visualize_batch(num_images=3)
# NOTE(review): this module-level `on_epoch_end` duplicates
# DataGenerator.on_epoch_end and is never bound to any class (its `self`
# parameter is meaningless at module level) — it looks like a copy/paste
# leftover and is a candidate for deletion.
def on_epoch_end(self) -> None:
    """Shuffle the (image, mask) pairs after each epoch when enabled."""
    if self.shuffle:
        data = list(zip(self.image_paths, self.mask_paths))
        np.random.shuffle(data)
        self.image_paths, self.mask_paths = zip(*data)
# NOTE(review): this module-level `visualize_batch` duplicates
# DataGenerator.visualize_batch and is never bound to any class — another
# copy/paste leftover, candidate for deletion.
def visualize_batch(self, num_images: int = 5) -> None:
    """Display the first batch's images next to their decoded masks."""
    batch_images, batch_masks = self.__getitem__(0)
    num_images = min(num_images, len(batch_images))
    fig, axes = plt.subplots(num_images, 2, figsize=(10, num_images * 5))
    for i in range(num_images):
        axes[i, 0].imshow(batch_images[i])
        axes[i, 0].set_title("Image")
        axes[i, 0].axis("off")
        axes[i, 1].imshow(decode_mask(batch_masks[i]), cmap="inferno")
        axes[i, 1].set_title("Mask (decoded)")
        axes[i, 1].axis("off")
    plt.tight_layout()
    plt.show()
def iou_coef(y_true, y_pred, smooth=1e-6):
    """
    Compute the Intersection over Union (IoU) metric.

    Args:
        y_true: ground-truth tensor (any shape, flattened internally).
        y_pred: prediction tensor of the same shape.
        smooth: small constant avoiding division by zero.

    Returns:
        Scalar tensor: (|A ∩ B| + smooth) / (|A ∪ B| + smooth).
    """
    # Bug fix: `tf` was never imported at module level (only tensorflow.keras
    # submodules are), so every call raised NameError.
    import tensorflow as tf
    # Explicit float32 casts so integer masks work too.
    y_true = tf.keras.backend.cast(y_true, "float32")
    y_pred = tf.keras.backend.cast(y_pred, "float32")
    y_true_f = tf.keras.backend.flatten(y_true)
    y_pred_f = tf.keras.backend.flatten(y_pred)
    intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
    union = tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) - intersection
    return (intersection + smooth) / (union + smooth)
def get_logger(nom_modele: str, results_dir=None):
    """
    Create a CSVLogger that records training metrics in a timestamped file.

    Args:
        nom_modele: model name used as the file-name prefix.
        results_dir: destination directory; defaults to "resultats_modeles".
            (Bug fix: the original referenced a global RESULTS_DIR whose
            import is commented out at the top of the file -> NameError.)

    Returns:
        tensorflow.keras.callbacks.CSVLogger
    """
    from datetime import datetime
    from tensorflow.keras.callbacks import CSVLogger
    results_dir = Path(results_dir) if results_dir is not None else Path("resultats_modeles")
    results_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_filename = results_dir / f"{nom_modele}_{timestamp}.csv"
    return CSVLogger(log_filename, separator=",", append=False)
def charger_metriques(dossier_logs):
    """
    Load every metrics CSV file found in a directory.

    Args:
        dossier_logs (str): Directory containing the CSV files.

    Returns:
        dict: model name (key) -> metrics dataframe (value).
    """
    resultats = {}
    for fichier in glob.glob(os.path.join(dossier_logs, "*.csv")):
        # Model name = file name minus the two trailing date/time parts
        # (e.g. unet_mini_20240101_1200.csv -> unet_mini).
        base = os.path.basename(fichier)
        nom_modele = "_".join(base.split("_")[:-2])
        resultats[nom_modele] = pd.read_csv(fichier)
    return resultats
def tracer_metriques(resultats):
    """
    Plot the metrics of the different models on comparison charts.

    Draws up to five panels: loss, mean IoU / IoU score, Dice coefficient,
    accuracy, and total training time (bar chart, minutes). Panels whose
    columns are missing from a dataframe are simply skipped for that model.

    Args:
        resultats (dict): model name -> metrics dataframe
            (as returned by ``charger_metriques``).
    """
    # One fixed colour per known model suffix; unknown models fall back to black.
    couleurs = {
        "mini": "blue",
        "vgg16": "green",
        "resnet50": "red",
        "efficientnetb3": "purple"
    }
    plt.figure(figsize=(18, 18))
    # Loss chart (dashed = train, solid = validation)
    plt.subplot(3, 2, 1)
    for modele, df in resultats.items():
        couleur = couleurs.get(modele, "black")
        plt.plot(df["loss"], label=f"{modele} Train Loss", color=couleur, linestyle="--")
        plt.plot(df["val_loss"], label=f"{modele} Val Loss", color=couleur, linestyle="-")
    plt.title("Comparaison des Loss (Perte)")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.grid(True)
    plt.legend()
    # Mean IoU chart — column name differs between Keras and sm-based logs.
    plt.subplot(3, 2, 2)
    for modele, df in resultats.items():
        couleur = couleurs.get(modele, "black")
        if "mean_iou" in df.columns:
            plt.plot(df["mean_iou"], label=f"{modele} Train Mean IoU", color=couleur, linestyle="--")
            plt.plot(df["val_mean_iou"], label=f"{modele} Val Mean IoU", color=couleur, linestyle="-")
        elif "iou_score" in df.columns:
            plt.plot(df["iou_score"], label=f"{modele} Train IoU Score", color=couleur, linestyle="--")
            plt.plot(df["val_iou_score"], label=f"{modele} Val IoU Score", color=couleur, linestyle="-")
    plt.title("Comparaison du Mean IoU / IoU Score")
    plt.xlabel("Epochs")
    plt.ylabel("Mean IoU")
    plt.grid(True)
    plt.legend()
    # Dice coefficient chart
    plt.subplot(3, 2, 3)
    for modele, df in resultats.items():
        couleur = couleurs.get(modele, "black")
        if "dice_coef" in df.columns:
            plt.plot(df["dice_coef"], label=f"{modele} Train Dice", color=couleur, linestyle="--")
            plt.plot(df["val_dice_coef"], label=f"{modele} Val Dice", color=couleur, linestyle="-")
    plt.title("Comparaison du Dice Coefficient")
    plt.xlabel("Epochs")
    plt.ylabel("Dice Coefficient")
    plt.grid(True)
    plt.legend()
    # Accuracy chart
    plt.subplot(3, 2, 4)
    for modele, df in resultats.items():
        couleur = couleurs.get(modele, "black")
        if "accuracy" in df.columns:
            plt.plot(df["accuracy"], label=f"{modele} Train Accuracy", color=couleur, linestyle="--")
            plt.plot(df["val_accuracy"], label=f"{modele} Val Accuracy", color=couleur, linestyle="-")
    plt.title("Comparaison de l'Accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.grid(True)
    plt.legend()
    # Total training time per model (bottom row spans both columns)
    plt.subplot(3, 1, 3)
    temps_entrainement = {}
    for modele, df in resultats.items():
        couleur = couleurs.get(modele, "black")
        if "temps_total_sec" in df.columns:
            temps = df["temps_total_sec"].iloc[-1] / 60  # seconds -> minutes
            temps_entrainement[modele] = temps
            plt.bar(modele, temps, color=couleur)
            plt.text(modele, temps, f"{temps:.2f} min", ha="center", va="bottom")
    plt.title("Comparaison du Temps total d'entraînement (en minutes)")
    plt.ylabel("Temps (minutes)")
    plt.grid(True, axis="y")
    plt.tight_layout()
    plt.show()
# -------------------- NOUVELLES FONCTIONS POUR PROJET 9 -------------------- | |
def charger_oneformer(num_classes: int = 8):
    """
    Load a OneFormer model and set its label count for this project.

    Returns:
        transformers OneFormer model with ``config.num_labels = num_classes``.
    """
    # Bug fix: transformers exposes OneFormerForUniversalSegmentation; there
    # is no OneFormerForSemanticSegmentation class, so the original import
    # raised ImportError before the model could even be downloaded.
    from transformers import OneFormerForUniversalSegmentation
    model = OneFormerForUniversalSegmentation.from_pretrained("nvidia/oneformer_coco_swin_large")
    model.config.num_labels = num_classes
    return model
def charger_segnext(num_classes: int = 8):
    """
    Load a SegNeXt-L model (simplified via timm or another wrapper).
    """
    import timm
    # NOTE(review): "segnext_l" does not appear to be a registered timm model
    # name — verify with timm.list_models("*segnext*") before relying on this;
    # as written, create_model will likely raise RuntimeError (unknown model).
    model = timm.create_model("segnext_l", pretrained=True, num_classes=num_classes)
    return model
def entrainer_model_pytorch(
    model,
    train_loader,
    val_loader,
    model_name="model",
    epochs=10,
    lr=1e-4,
    num_classes=8
):
    """
    Train a PyTorch segmentation model with:
    - Mixed precision (torch.cuda.amp)
    - GradScaler for numerical stability
    - A 'ReduceLROnPlateau' scheduler stepped on the validation loss
    - Output handling for SegFormer (SemanticSegmenterOutput),
      torchvision-style dicts ({"out": ...}) or a plain tensor
    - Upsampling of the output to match the mask size (H, W)
    - Computation and logging of metrics (accuracy, Dice, IoU) for train/val
    - Per-epoch timing and peak GPU memory measurement
    - CSV log + .pth weights saved under '../resultats_modeles/'
    - A PNG chart of the Dice and Mean IoU curves.

    NOTE(review): torch.cuda.reset_peak_memory_stats / max_memory_allocated
    are called unconditionally, so this presumably only ever runs on a CUDA
    machine; confirm before reusing on a CPU-only host.
    """
    import torch
    import torch.nn as nn
    import torch.optim as optim
    import torch.optim.lr_scheduler as lr_sched
    from torch.cuda.amp import autocast, GradScaler
    from transformers.modeling_outputs import SemanticSegmenterOutput
    from tqdm import tqdm
    import pandas as pd
    import matplotlib.pyplot as plt
    import os
    import time
    import torch.nn.functional as F

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # -------- Local definition of the PyTorch metrics (avoids duplicates) --------
    def compute_batch_metrics(pred_logits, target, num_classes):
        """
        Compute macro-averaged accuracy, Dice and IoU for one batch.
        - pred_logits: (N, C, H, W)
        - target: (N, H, W) (integer values in [0..num_classes-1])
        Returns a dict: {"accuracy": float, "dice": float, "iou": float}
        """
        # 1) Argmax over channels => (N, H, W)
        pred = torch.argmax(pred_logits, dim=1)
        # 2) Global pixel accuracy (all pixels pooled)
        correct = (pred == target).sum().item()
        total = target.numel()  # N*H*W
        accuracy = correct / total
        # 3) Per-class intersection / union => Dice, IoU
        dice_list = []
        iou_list = []
        for c in range(num_classes):
            pred_c = (pred == c)
            target_c = (target == c)
            inter = (pred_c & target_c).sum().item()
            pred_area = pred_c.sum().item()
            target_area = target_c.sum().item()
            union = pred_area + target_area - inter
            # IoU
            if union == 0:
                # class absent from both prediction and target => IoU = 1 by convention
                iou_c = 1.0
            else:
                iou_c = inter / union
            # Dice = 2*inter / (|pred_c| + |target_c|)
            denom = pred_area + target_area
            if denom == 0:
                dice_c = 1.0
            else:
                dice_c = 2.0 * inter / denom
            dice_list.append(dice_c)
            iou_list.append(iou_c)
        mean_dice = sum(dice_list) / len(dice_list)
        mean_iou = sum(iou_list) / len(iou_list)
        return {"accuracy": accuracy, "dice": mean_dice, "iou": mean_iou}

    # -------- Optimiser / loss / scheduler / GradScaler setup --------
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # NOTE(review): the `verbose` argument was removed from ReduceLROnPlateau
    # in recent torch releases — confirm the installed version accepts it.
    scheduler = lr_sched.ReduceLROnPlateau(optimizer, factor=0.5, patience=2, verbose=True)
    scaler = GradScaler()
    os.makedirs("../resultats_modeles", exist_ok=True)

    # -------- Log structure (one list per CSV column) --------
    log = {
        "epoch": [],
        "train_loss": [],
        "val_loss": [],
        "train_accuracy": [],
        "train_dice_coef": [],
        "train_mean_iou": [],
        "val_accuracy": [],
        "val_dice_coef": [],
        "val_mean_iou": [],
        "epoch_time_s": [],
        "peak_gpu_mem_mb": []
    }
    start_time = time.time()

    # ============================ TRAINING LOOP ============================
    for epoch in range(epochs):
        # Reset so max_memory_allocated reports this epoch's peak only
        torch.cuda.reset_peak_memory_stats(device=device)
        epoch_start = time.time()

        # -------- TRAIN LOOP --------
        model.train()
        running_loss = 0.0
        running_accuracy = 0.0
        running_dice = 0.0
        running_iou = 0.0
        for images, masks in tqdm(train_loader, desc=f"[Epoch {epoch+1}/{epochs}] Train"):
            images, masks = images.to(device), masks.to(device)
            optimizer.zero_grad()
            with autocast():
                outdict = model(images)
                # Handle SegFormer / DeepLab / plain-tensor outputs
                if isinstance(outdict, SemanticSegmenterOutput):
                    logits = outdict.logits
                elif isinstance(outdict, dict):
                    logits = outdict["out"]
                else:
                    logits = outdict
                # Upsample -> (N, C, H, W) matching the mask resolution
                logits = F.interpolate(
                    logits,
                    size=(masks.shape[-2], masks.shape[-1]),
                    mode='bilinear',
                    align_corners=False
                )
                loss = criterion(logits, masks)
            # Scaled backward + step for mixed-precision stability
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            running_loss += loss.item()
            # Metrics for this batch
            metrics_batch = compute_batch_metrics(logits, masks, num_classes=num_classes)
            running_accuracy += metrics_batch["accuracy"]
            running_dice += metrics_batch["dice"]
            running_iou += metrics_batch["iou"]
        avg_train_loss = running_loss / len(train_loader)
        avg_train_accuracy = running_accuracy / len(train_loader)
        avg_train_dice = running_dice / len(train_loader)
        avg_train_iou = running_iou / len(train_loader)

        # -------- VALID LOOP (no gradients) --------
        model.eval()
        val_running_loss = 0.0
        val_running_accuracy = 0.0
        val_running_dice = 0.0
        val_running_iou = 0.0
        with torch.no_grad():
            for images, masks in tqdm(val_loader, desc=f"[Epoch {epoch+1}/{epochs}] Val"):
                images, masks = images.to(device), masks.to(device)
                with autocast():
                    outdict = model(images)
                    if isinstance(outdict, SemanticSegmenterOutput):
                        logits = outdict.logits
                    elif isinstance(outdict, dict):
                        logits = outdict["out"]
                    else:
                        logits = outdict
                    logits = F.interpolate(
                        logits,
                        size=(masks.shape[-2], masks.shape[-1]),
                        mode='bilinear',
                        align_corners=False
                    )
                    loss_val = criterion(logits, masks)
                val_running_loss += loss_val.item()
                metrics_batch_val = compute_batch_metrics(logits, masks, num_classes=num_classes)
                val_running_accuracy += metrics_batch_val["accuracy"]
                val_running_dice += metrics_batch_val["dice"]
                val_running_iou += metrics_batch_val["iou"]
        avg_val_loss = val_running_loss / len(val_loader)
        avg_val_accuracy = val_running_accuracy / len(val_loader)
        avg_val_dice = val_running_dice / len(val_loader)
        avg_val_iou = val_running_iou / len(val_loader)

        # -------- Scheduler: ReduceLROnPlateau on the validation loss --------
        scheduler.step(avg_val_loss)

        # -------- End-of-epoch logging --------
        epoch_time = time.time() - epoch_start
        peak_mem = torch.cuda.max_memory_allocated(device=device)
        peak_mem_mb = peak_mem / (1024 ** 2)
        log["epoch"].append(epoch + 1)
        log["train_loss"].append(avg_train_loss)
        log["val_loss"].append(avg_val_loss)
        log["train_accuracy"].append(avg_train_accuracy)
        log["train_dice_coef"].append(avg_train_dice)
        log["train_mean_iou"].append(avg_train_iou)
        log["val_accuracy"].append(avg_val_accuracy)
        log["val_dice_coef"].append(avg_val_dice)
        log["val_mean_iou"].append(avg_val_iou)
        log["epoch_time_s"].append(epoch_time)
        log["peak_gpu_mem_mb"].append(peak_mem_mb)
        print(
            f"📉 Epoch {epoch+1} | "
            f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | "
            f"Train Dice: {avg_train_dice:.4f} | Val Dice: {avg_val_dice:.4f} | "
            f"Train IoU: {avg_train_iou:.4f} | Val IoU: {avg_val_iou:.4f} | "
            f"Time: {epoch_time:.1f}s | GPU: {peak_mem_mb:.1f} MB"
        )

    # ============================ END OF TRAINING ============================
    total_time = time.time() - start_time

    # -------- Save the log as CSV --------
    df = pd.DataFrame(log)
    df["temps_total_sec"] = total_time
    os.makedirs("../resultats_modeles", exist_ok=True)
    csv_path = f"../resultats_modeles/{model_name}_log.csv"
    df.to_csv(csv_path, index=False)

    # -------- Save the weights --------
    torch.save(model.state_dict(), f"../resultats_modeles/{model_name}.pth")

    # -------- Build and save the Dice/IoU chart --------
    plt.figure(figsize=(12, 5))
    # Subplot 1: Dice
    plt.subplot(1, 2, 1)
    plt.plot(df["epoch"], df["train_dice_coef"], label="Train Dice", color="blue")
    plt.plot(df["epoch"], df["val_dice_coef"], label="Val Dice", color="orange")
    plt.title("Dice Coefficient")
    plt.xlabel("Epoch")
    plt.ylabel("Dice")
    plt.legend()
    plt.grid(True)
    # Subplot 2: IoU
    plt.subplot(1, 2, 2)
    plt.plot(df["epoch"], df["train_mean_iou"], label="Train IoU", color="blue")
    plt.plot(df["epoch"], df["val_mean_iou"], label="Val IoU", color="orange")
    plt.title("Mean IoU")
    plt.xlabel("Epoch")
    plt.ylabel("IoU")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    png_path = f"../resultats_modeles/{model_name}_dice_iou.png"
    plt.savefig(png_path, dpi=100)
    plt.close()
    print(f"✅ Entraînement {model_name} terminé en {total_time:.1f} secondes.")
    print(f"📁 Logs : {csv_path}")
    print(f"📁 Modèle : ../resultats_modeles/{model_name}.pth")
    print(f"📊 Graphique Dice/IoU sauvegardé : {png_path}")
def comparer_resultats(dossier='../resultats_modeles'):
    """
    Plot the train/val loss curves of every trained model found in *dossier*.
    """
    import matplotlib.pyplot as plt
    import pandas as pd
    import os
    plt.figure(figsize=(10, 6))
    log_files = [f for f in os.listdir(dossier) if f.endswith("_log.csv")]
    for file in log_files:
        nom = file.replace("_log.csv", "")
        df = pd.read_csv(os.path.join(dossier, file))
        plt.plot(df["epoch"], df["train_loss"], label=f"{nom} train")
        plt.plot(df["epoch"], df["val_loss"], label=f"{nom} val")
    plt.title("Courbes d'apprentissage")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()
# ---------------------- FONCTIONS REECRITE POUR LE PROJET 9 -------------------- | |
def charger_donnees_cityscapes(data_dir: str, batch_size: int = 16,
                               image_size: Tuple[int, int] = (256, 256),
                               num_workers: int = 0):
    """
    Load the Cityscapes data and return two DataLoaders (train and val).

    Uses CityscapesDataset with pin_memory=True for faster host-to-GPU
    transfers.

    Args:
        data_dir: Cityscapes root directory.
        batch_size: samples per batch.
        image_size: (height, width) images and masks are resized to.
        num_workers: DataLoader worker processes (0 = load in main process).
            Fix: the old docstring promised num_workers=4 while the code
            hard-coded 0 — it is now an explicit parameter (default keeps
            the previous behaviour).

    Returns:
        (train_loader, val_loader)
    """
    from torch.utils.data import DataLoader
    train_dataset = CityscapesDataset(root=data_dir, split="train", image_size=image_size)
    val_dataset = CityscapesDataset(root=data_dir, split="val", image_size=image_size)
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )
    return train_loader, val_loader
import matplotlib.patches as mpatches | |
# Soft display palette for the 8 useful classes (RGB tuples in 0-255).
PALETTE = {
    0: (0, 0, 0),        # void -> black
    1: (50, 50, 150),    # flat -> dark blue
    2: (102, 0, 204),    # construction -> purple
    3: (255, 85, 0),     # object -> orange
    4: (255, 255, 0),    # nature -> yellow
    5: (0, 255, 255),    # sky -> cyan
    6: (255, 0, 255),    # human -> magenta
    7: (255, 255, 255),  # vehicle -> white
}
# Human-readable name of each class id (used for legends and colorbars).
CLASS_NAMES = {
    0: "void",
    1: "flat",
    2: "construction",
    3: "object",
    4: "nature",
    5: "sky",
    6: "human",
    7: "vehicle"
}
def decode_cityscapes_mask(mask):
    """
    Convert a 2D mask (values 0 to 7) into an RGB image for display.

    Args:
        mask: (H, W) integer array whose values are keys of PALETTE (0-7).

    Returns:
        (H, W, 3) uint8 RGB array coloured according to PALETTE.
    """
    # Vectorised: one fancy-indexing pass through a lookup table instead of
    # one boolean-mask pass per class.
    lut = np.zeros((max(PALETTE) + 1, 3), dtype=np.uint8)
    for class_id, color in PALETTE.items():
        lut[class_id] = color
    return lut[mask]
def afficher_image_et_masque(image_tensor, mask_tensor):
    """Show a (3, H, W) image tensor next to its (H, W) 8-class mask."""
    import matplotlib.pyplot as plt
    from matplotlib.colors import ListedColormap
    import numpy as np
    PALETTE = [
        (0, 0, 0),       # 0 - void
        (100, 0, 200),   # 1 - flat
        (70, 70, 70),    # 2 - construction
        (250, 170, 30),  # 3 - object
        (107, 142, 35),  # 4 - nature
        (70, 130, 180),  # 5 - sky
        (220, 20, 60),   # 6 - human
        (0, 0, 142),     # 7 - vehicle
    ]
    # Colormap expects colours in [0, 1]
    cmap = ListedColormap(np.array(PALETTE) / 255.0)
    img_np = image_tensor.permute(1, 2, 0).cpu().numpy()
    msk_np = mask_tensor.cpu().numpy()
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(img_np)
    plt.title("Image")
    plt.axis("off")
    plt.subplot(1, 2, 2)
    im = plt.imshow(msk_np, cmap=cmap, vmin=0, vmax=7)
    cbar = plt.colorbar(im, ticks=range(8))
    cbar.ax.set_yticklabels(['void', 'flat', 'construction', 'object', 'nature', 'sky', 'human', 'vehicle'])
    cbar.set_label("Catégories", rotation=270, labelpad=15)
    plt.title("Masque (8 classes colorisées)")
    plt.axis("off")
    plt.tight_layout()
    plt.show()
def charger_segformer(num_classes=8):
    """
    Load a SegFormer-B5 (ADE20K checkpoint) with a num_classes-way head.

    Returns:
        transformers.SegformerForSemanticSegmentation
    """
    from transformers import SegformerForSemanticSegmentation
    model = SegformerForSemanticSegmentation.from_pretrained(
        "nvidia/segformer-b5-finetuned-ade-640-640",
        num_labels=num_classes,  # fix: was hard-coded to 8, ignoring the argument
        ignore_mismatched_sizes=True
    )
    model.config.num_labels = num_classes
    model.config.output_hidden_states = False
    return model
def charger_deeplabv3plus(num_classes=8):
    """
    Load a DeepLabV3 (ResNet-101 backbone) pretrained model and replace its
    classification heads with num_classes-way 1x1 convolutions.
    """
    import torchvision.models.segmentation as models
    import torch.nn as nn
    model = models.deeplabv3_resnet101(pretrained=True)
    model.classifier[4] = nn.Conv2d(256, num_classes, kernel_size=1)
    # Fix: also adapt the auxiliary head; otherwise it keeps the pretrained
    # class count. Harmless for this project's loss (which only reads "out"),
    # but inconsistent for anyone enabling the aux loss.
    if getattr(model, "aux_classifier", None) is not None:
        model.aux_classifier[4] = nn.Conv2d(256, num_classes, kernel_size=1)
    return model
class MiniCityscapesDataset(torch.utils.data.Dataset):
    """Path-list based Cityscapes dataset yielding (image, remapped mask)."""

    def __init__(self, image_paths, mask_paths, image_size=(256, 256)):
        # Parallel lists: image_paths[i] pairs with mask_paths[i].
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.image_size = image_size

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        from PIL import Image
        import torchvision.transforms as T
        import numpy as np
        # Load and resize the RGB image and its greyscale mask.
        image = Image.open(self.image_paths[idx]).convert("RGB").resize(self.image_size)
        mask = Image.open(self.mask_paths[idx]).convert("L").resize(self.image_size)
        # Image -> float tensor of shape (3, H, W).
        image = T.ToTensor()(image)
        # Mask -> numpy, remapped to the 8 main classes, then LongTensor (H, W).
        remapped = remap_classes(np.array(mask, dtype=np.uint8))
        return image, torch.from_numpy(remapped).long()
def show_predictions(model, dataset, num_images=3, num_classes=8):
    """
    Display a few predictions next to their ground-truth masks from a
    PyTorch dataset. Handles upsampling and SegFormer / DeepLab / plain
    tensor model outputs.
    """
    import torch
    import matplotlib.pyplot as plt
    from transformers.modeling_outputs import SemanticSegmenterOutput
    import torch.nn.functional as F
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval().to(device)
    # NOTE(review): with num_images == 1, plt.subplots returns a 1-D axes
    # array and axes[i, 0] would fail — confirm callers always pass >= 2.
    fig, axes = plt.subplots(num_images, 3, figsize=(12, 4 * num_images))
    for i in range(num_images):
        # Pick a random sample
        idx = np.random.randint(0, len(dataset))
        image, mask_gt = dataset[idx]  # (3, H, W), (H, W)
        image_t = image.unsqueeze(0).to(device)  # (1, 3, H, W)
        mask_gt_np = mask_gt.numpy()  # (H, W)
        with torch.no_grad():
            outdict = model(image_t)
            # Unwrap SegFormer / torchvision-dict / plain-tensor outputs
            if isinstance(outdict, SemanticSegmenterOutput):
                logits = outdict.logits
            elif isinstance(outdict, dict):
                logits = outdict["out"]
            else:
                logits = outdict
            # Upsample logits to the ground-truth resolution before argmax
            logits = F.interpolate(
                logits,
                size=mask_gt.shape,
                mode='bilinear',
                align_corners=False
            )
            pred = logits.argmax(dim=1).squeeze(0).cpu().numpy()  # (H, W)
        # Three panels: input image, ground truth, prediction
        axes[i, 0].imshow(image.permute(1, 2, 0).numpy())
        axes[i, 0].set_title("Image")
        axes[i, 0].axis("off")
        axes[i, 1].imshow(mask_gt_np, cmap="tab10", vmin=0, vmax=num_classes-1)
        axes[i, 1].set_title("Masque GT")
        axes[i, 1].axis("off")
        axes[i, 2].imshow(pred, cmap="tab10", vmin=0, vmax=num_classes-1)
        axes[i, 2].set_title("Masque Prédit")
        axes[i, 2].axis("off")
    plt.tight_layout()
    plt.show()
def charger_maskformer(num_classes=8):
    """
    Build a MaskFormer model (HuggingFace Transformers) for segmentation,
    starting from a checkpoint pretrained on ADE20K.

    Other checkpoints exist, e.g. "facebook/maskformer-swin-base-coco"
    (panoptic/instance on COCO) — adapt as needed.

    Args:
        num_classes: number of output labels (8 for the grouped Cityscapes classes).

    Returns:
        A ``MaskFormerForInstanceSegmentation`` instance with ``num_labels``
        adjusted and hidden-state output disabled.
    """
    from transformers import MaskFormerForInstanceSegmentation
    pretrained_name = "facebook/maskformer-swin-large-ade"
    # ignore_mismatched_sizes is sometimes required when num_labels changes.
    maskformer = MaskFormerForInstanceSegmentation.from_pretrained(
        pretrained_name,
        ignore_mismatched_sizes=True,
    )
    # Retarget the head to the reduced Cityscapes label set.
    maskformer.config.num_labels = num_classes
    # Optional: skip returning hidden states to save memory.
    maskformer.config.output_hidden_states = False
    return maskformer
import torch | |
import torch.nn.functional as F | |
def maskformer_aggregator(
    class_queries_logits: torch.Tensor,
    masks_queries_logits: torch.Tensor
) -> torch.Tensor:
    """
    Fuse Mask(2)Former query predictions into dense per-class scores.

    Assumptions:
        - class_queries_logits: (N, Q, C) class logits per query.
        - masks_queries_logits: (N, Q, H, W) mask logits (sigmoid semantics).

    Naive aggregation:
        1) softmax over the class dimension C of the query-class logits;
        2) sigmoid over the per-pixel mask logits;
        3) weighted sum over the Q queries, i.e.
           aggregated[b, c, h, w] = sum_q class_probs[b,q,c] * mask_probs[b,q,h,w].

    The result is left unnormalised: ``F.cross_entropy`` applies its own
    log-softmax, so the (N, C, H, W) tensor can be fed to it directly.

    Returns:
        Tensor of shape (N, C, H, W).
    """
    class_probs = F.softmax(class_queries_logits, dim=2)   # (N, Q, C)
    mask_probs = torch.sigmoid(masks_queries_logits)       # (N, Q, H, W)
    batch, queries, n_classes = class_probs.shape
    height, width = mask_probs.shape[-2], mask_probs.shape[-1]
    # The query-weighted sum is a batched matmul: (N, C, Q) @ (N, Q, H*W).
    flat_masks = mask_probs.reshape(batch, queries, height * width)
    aggregated = torch.bmm(class_probs.transpose(1, 2), flat_masks)
    return aggregated.view(batch, n_classes, height, width)
def training_for_maskformer(
    model,
    train_loader,
    val_loader,
    model_name="maskformer",
    epochs=10,
    lr=1e-4,
    num_classes=8
):
    """
    Train a MaskFormer model for semantic segmentation.

    The model outputs ``class_queries_logits`` (N, Q, C) and
    ``masks_queries_logits`` (N, Q, h, w); they are merged by
    ``maskformer_aggregator`` into dense per-pixel scores (N, C, H, W) and
    trained with CrossEntropyLoss against integer masks (N, H, W).

    Fixes over the previous version:
      - uses the module-level ``maskformer_aggregator`` directly instead of
        the fragile self-import ``from fonctions import ...``;
      - mixed precision and CUDA memory statistics are only enabled when a
        GPU is available, so the function also runs on CPU-only machines
        (``torch.cuda.reset_peak_memory_stats`` would otherwise raise).

    Artifacts written to ../resultats_modeles/: per-epoch metrics CSV,
    final weights (<model_name>.pth) and a Dice/IoU curves PNG.
    """
    import torch
    import torch.nn as nn
    import torch.optim as optim
    import torch.optim.lr_scheduler as lr_sched
    from torch.cuda.amp import autocast, GradScaler
    from tqdm import tqdm
    import pandas as pd
    import matplotlib.pyplot as plt
    import os
    import time
    import torch.nn.functional as F

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    model.to(device)

    def compute_batch_metrics(pred_logits, target, nb_classes):
        # Pixel accuracy + macro-averaged Dice and IoU over all classes.
        pred = torch.argmax(pred_logits, dim=1)
        accuracy = (pred == target).sum().item() / target.numel()
        dice_list = []
        iou_list = []
        for c in range(nb_classes):
            pred_c = (pred == c)
            target_c = (target == c)
            inter = (pred_c & target_c).sum().item()
            pred_area = pred_c.sum().item()
            target_area = target_c.sum().item()
            union = pred_area + target_area - inter
            # A class absent from both prediction and target scores 1.0.
            iou_list.append(1.0 if union == 0 else inter / union)
            denom = pred_area + target_area
            dice_list.append(1.0 if denom == 0 else 2.0 * inter / denom)
        return {
            "accuracy": accuracy,
            "dice": sum(dice_list) / len(dice_list),
            "iou": sum(iou_list) / len(iou_list),
        }

    def forward_aggregated(images, masks):
        # Model forward pass -> dense (N, C, H, W) logits at the mask resolution.
        outputs = model(images)  # MaskFormerForInstanceSegmentationOutput
        class_queries = outputs.class_queries_logits   # (N, Q, num_labels)
        masks_queries = outputs.masks_queries_logits   # (N, Q, h, w)
        masks_queries = F.interpolate(
            masks_queries,
            size=(masks.shape[-2], masks.shape[-1]),
            mode='bilinear',
            align_corners=False
        )
        return maskformer_aggregator(class_queries, masks_queries)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = lr_sched.ReduceLROnPlateau(optimizer, factor=0.5, patience=2, verbose=True)
    scaler = GradScaler(enabled=use_cuda)  # no-op on CPU
    os.makedirs("../resultats_modeles", exist_ok=True)

    log = {
        "epoch": [],
        "train_loss": [],
        "val_loss": [],
        "train_accuracy": [],
        "train_dice_coef": [],
        "train_mean_iou": [],
        "val_accuracy": [],
        "val_dice_coef": [],
        "val_mean_iou": [],
        "epoch_time_s": [],
        "peak_gpu_mem_mb": []
    }
    start_time = time.time()
    for epoch in range(epochs):
        if use_cuda:
            torch.cuda.reset_peak_memory_stats(device=device)
        epoch_start = time.time()

        # ---------------- TRAIN ----------------
        model.train()
        running = {"loss": 0.0, "accuracy": 0.0, "dice": 0.0, "iou": 0.0}
        for images, masks in tqdm(train_loader, desc=f"[Epoch {epoch+1}/{epochs}] Train"):
            images, masks = images.to(device), masks.to(device)
            optimizer.zero_grad()
            with autocast(enabled=use_cuda):
                aggregated_logits = forward_aggregated(images, masks)
                loss = criterion(aggregated_logits, masks)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            running["loss"] += loss.item()
            batch_metrics = compute_batch_metrics(aggregated_logits, masks, num_classes)
            running["accuracy"] += batch_metrics["accuracy"]
            running["dice"] += batch_metrics["dice"]
            running["iou"] += batch_metrics["iou"]
        n_train = len(train_loader)
        avg_train_loss = running["loss"] / n_train
        avg_train_accuracy = running["accuracy"] / n_train
        avg_train_dice = running["dice"] / n_train
        avg_train_iou = running["iou"] / n_train

        # ---------------- VAL ----------------
        model.eval()
        val_running = {"loss": 0.0, "accuracy": 0.0, "dice": 0.0, "iou": 0.0}
        with torch.no_grad():
            for images, masks in tqdm(val_loader, desc=f"[Epoch {epoch+1}/{epochs}] Val"):
                images, masks = images.to(device), masks.to(device)
                with autocast(enabled=use_cuda):
                    aggregated_logits = forward_aggregated(images, masks)
                    loss_val = criterion(aggregated_logits, masks)
                val_running["loss"] += loss_val.item()
                val_metrics = compute_batch_metrics(aggregated_logits, masks, num_classes)
                val_running["accuracy"] += val_metrics["accuracy"]
                val_running["dice"] += val_metrics["dice"]
                val_running["iou"] += val_metrics["iou"]
        n_val = len(val_loader)
        avg_val_loss = val_running["loss"] / n_val
        avg_val_accuracy = val_running["accuracy"] / n_val
        avg_val_dice = val_running["dice"] / n_val
        avg_val_iou = val_running["iou"] / n_val

        scheduler.step(avg_val_loss)
        epoch_time = time.time() - epoch_start
        peak_mem = torch.cuda.max_memory_allocated(device=device) / (1024 ** 2) if use_cuda else 0.0

        log["epoch"].append(epoch + 1)
        log["train_loss"].append(avg_train_loss)
        log["val_loss"].append(avg_val_loss)
        log["train_accuracy"].append(avg_train_accuracy)
        log["train_dice_coef"].append(avg_train_dice)
        log["train_mean_iou"].append(avg_train_iou)
        log["val_accuracy"].append(avg_val_accuracy)
        log["val_dice_coef"].append(avg_val_dice)
        log["val_mean_iou"].append(avg_val_iou)
        log["epoch_time_s"].append(epoch_time)
        log["peak_gpu_mem_mb"].append(peak_mem)
        print(
            f"Epoch {epoch+1} | "
            f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | "
            f"Train Dice: {avg_train_dice:.4f} | Val Dice: {avg_val_dice:.4f} | "
            f"Train IoU: {avg_train_iou:.4f} | Val IoU: {avg_val_iou:.4f} | "
            f"Time: {epoch_time:.1f}s | GPU: {peak_mem:.1f} MB"
        )

    total_time = time.time() - start_time
    df = pd.DataFrame(log)
    df["temps_total_sec"] = total_time
    csv_path = f"../resultats_modeles/{model_name}_log.csv"
    df.to_csv(csv_path, index=False)
    # Save model weights.
    torch.save(model.state_dict(), f"../resultats_modeles/{model_name}.pth")

    # Dice / IoU curves.
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(df["epoch"], df["train_dice_coef"], label="Train Dice", color="blue")
    plt.plot(df["epoch"], df["val_dice_coef"], label="Val Dice", color="orange")
    plt.title("Dice Coefficient")
    plt.xlabel("Epoch")
    plt.ylabel("Dice")
    plt.legend()
    plt.grid(True)
    plt.subplot(1, 2, 2)
    plt.plot(df["epoch"], df["train_mean_iou"], label="Train IoU", color="blue")
    plt.plot(df["epoch"], df["val_mean_iou"], label="Val IoU", color="orange")
    plt.title("Mean IoU")
    plt.xlabel("Epoch")
    plt.ylabel("IoU")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    png_path = f"../resultats_modeles/{model_name}_dice_iou.png"
    plt.savefig(png_path, dpi=100)
    plt.close()
    print(f"✅ Entraînement {model_name} terminé en {total_time:.1f} secondes.")
    print(f"📁 Logs : {csv_path}")
    print(f"📁 Modèle : ../resultats_modeles/{model_name}.pth")
    print(f"📊 Graphique Dice/IoU sauvegardé : {png_path}")
def training_for_mask2former(
    model,
    train_loader,
    val_loader,
    model_name="mask2former",
    epochs=10,
    lr=1e-4,
    num_classes=8
):
    """
    Train a Mask2Former model for semantic segmentation.

    The model outputs ``class_queries_logits`` (N, Q, C) and
    ``masks_queries_logits`` (N, Q, h, w); they are merged by
    ``maskformer_aggregator`` into dense per-pixel scores (N, C, H, W) and
    trained with CrossEntropyLoss against integer masks (N, H, W).

    Fixes over the previous version:
      - uses the module-level ``maskformer_aggregator`` directly instead of
        the fragile self-import ``from fonctions import ...``;
      - mixed precision and CUDA memory statistics are only enabled when a
        GPU is available, so the function also runs on CPU-only machines
        (``torch.cuda.reset_peak_memory_stats`` would otherwise raise).

    Artifacts written to ../resultats_modeles/: per-epoch metrics CSV,
    final weights (<model_name>.pth) and a Dice/IoU curves PNG.
    """
    import torch
    import torch.nn as nn
    import torch.optim as optim
    import torch.optim.lr_scheduler as lr_sched
    from torch.cuda.amp import autocast, GradScaler
    from tqdm import tqdm
    import pandas as pd
    import matplotlib.pyplot as plt
    import os
    import time
    import torch.nn.functional as F

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    model.to(device)

    def compute_batch_metrics(pred_logits, target, nb_classes):
        # Pixel accuracy + macro-averaged Dice and IoU over all classes.
        pred = torch.argmax(pred_logits, dim=1)
        accuracy = (pred == target).sum().item() / target.numel()
        dice_list = []
        iou_list = []
        for c in range(nb_classes):
            pred_c = (pred == c)
            target_c = (target == c)
            inter = (pred_c & target_c).sum().item()
            pred_area = pred_c.sum().item()
            target_area = target_c.sum().item()
            union = pred_area + target_area - inter
            # A class absent from both prediction and target scores 1.0.
            iou_list.append(1.0 if union == 0 else inter / union)
            denom = pred_area + target_area
            dice_list.append(1.0 if denom == 0 else 2.0 * inter / denom)
        return {
            "accuracy": accuracy,
            "dice": sum(dice_list) / len(dice_list),
            "iou": sum(iou_list) / len(iou_list),
        }

    def forward_aggregated(images, masks):
        # Model forward pass -> dense (N, C, H, W) logits at the mask resolution.
        outputs = model(images)  # Mask2FormerForUniversalSegmentationOutput
        class_queries = outputs.class_queries_logits   # (N, Q, num_labels)
        masks_queries = outputs.masks_queries_logits   # (N, Q, h, w)
        masks_queries = F.interpolate(
            masks_queries,
            size=(masks.shape[-2], masks.shape[-1]),
            mode='bilinear',
            align_corners=False
        )
        return maskformer_aggregator(class_queries, masks_queries)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = lr_sched.ReduceLROnPlateau(optimizer, factor=0.5, patience=2, verbose=True)
    scaler = GradScaler(enabled=use_cuda)  # no-op on CPU
    os.makedirs("../resultats_modeles", exist_ok=True)

    log = {
        "epoch": [],
        "train_loss": [],
        "val_loss": [],
        "train_accuracy": [],
        "train_dice_coef": [],
        "train_mean_iou": [],
        "val_accuracy": [],
        "val_dice_coef": [],
        "val_mean_iou": [],
        "epoch_time_s": [],
        "peak_gpu_mem_mb": []
    }
    start_time = time.time()
    for epoch in range(epochs):
        if use_cuda:
            torch.cuda.reset_peak_memory_stats(device=device)
        epoch_start = time.time()

        # ---------------- TRAIN ----------------
        model.train()
        running = {"loss": 0.0, "accuracy": 0.0, "dice": 0.0, "iou": 0.0}
        for images, masks in tqdm(train_loader, desc=f"[Epoch {epoch+1}/{epochs}] Train"):
            images, masks = images.to(device), masks.to(device)
            optimizer.zero_grad()
            with autocast(enabled=use_cuda):
                aggregated_logits = forward_aggregated(images, masks)
                loss = criterion(aggregated_logits, masks)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            running["loss"] += loss.item()
            batch_metrics = compute_batch_metrics(aggregated_logits, masks, num_classes)
            running["accuracy"] += batch_metrics["accuracy"]
            running["dice"] += batch_metrics["dice"]
            running["iou"] += batch_metrics["iou"]
        n_train = len(train_loader)
        avg_train_loss = running["loss"] / n_train
        avg_train_accuracy = running["accuracy"] / n_train
        avg_train_dice = running["dice"] / n_train
        avg_train_iou = running["iou"] / n_train

        # ---------------- VAL ----------------
        model.eval()
        val_running = {"loss": 0.0, "accuracy": 0.0, "dice": 0.0, "iou": 0.0}
        with torch.no_grad():
            for images, masks in tqdm(val_loader, desc=f"[Epoch {epoch+1}/{epochs}] Val"):
                images, masks = images.to(device), masks.to(device)
                with autocast(enabled=use_cuda):
                    aggregated_logits = forward_aggregated(images, masks)
                    loss_val = criterion(aggregated_logits, masks)
                val_running["loss"] += loss_val.item()
                val_metrics = compute_batch_metrics(aggregated_logits, masks, num_classes)
                val_running["accuracy"] += val_metrics["accuracy"]
                val_running["dice"] += val_metrics["dice"]
                val_running["iou"] += val_metrics["iou"]
        n_val = len(val_loader)
        avg_val_loss = val_running["loss"] / n_val
        avg_val_accuracy = val_running["accuracy"] / n_val
        avg_val_dice = val_running["dice"] / n_val
        avg_val_iou = val_running["iou"] / n_val

        scheduler.step(avg_val_loss)
        epoch_time = time.time() - epoch_start
        peak_mem = torch.cuda.max_memory_allocated(device=device) / (1024 ** 2) if use_cuda else 0.0

        log["epoch"].append(epoch + 1)
        log["train_loss"].append(avg_train_loss)
        log["val_loss"].append(avg_val_loss)
        log["train_accuracy"].append(avg_train_accuracy)
        log["train_dice_coef"].append(avg_train_dice)
        log["train_mean_iou"].append(avg_train_iou)
        log["val_accuracy"].append(avg_val_accuracy)
        log["val_dice_coef"].append(avg_val_dice)
        log["val_mean_iou"].append(avg_val_iou)
        log["epoch_time_s"].append(epoch_time)
        log["peak_gpu_mem_mb"].append(peak_mem)
        print(
            f"Epoch {epoch+1} | "
            f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | "
            f"Train Dice: {avg_train_dice:.4f} | Val Dice: {avg_val_dice:.4f} | "
            f"Train IoU: {avg_train_iou:.4f} | Val IoU: {avg_val_iou:.4f} | "
            f"Time: {epoch_time:.1f}s | GPU: {peak_mem:.1f} MB"
        )

    total_time = time.time() - start_time
    df = pd.DataFrame(log)
    df["temps_total_sec"] = total_time
    csv_path = f"../resultats_modeles/{model_name}_log.csv"
    df.to_csv(csv_path, index=False)
    torch.save(model.state_dict(), f"../resultats_modeles/{model_name}.pth")

    # Dice / IoU curves.
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(df["epoch"], df["train_dice_coef"], label="Train Dice", color="blue")
    plt.plot(df["epoch"], df["val_dice_coef"], label="Val Dice", color="orange")
    plt.title("Dice Coefficient")
    plt.xlabel("Epoch")
    plt.ylabel("Dice")
    plt.legend()
    plt.grid(True)
    plt.subplot(1, 2, 2)
    plt.plot(df["epoch"], df["train_mean_iou"], label="Train IoU", color="blue")
    plt.plot(df["epoch"], df["val_mean_iou"], label="Val IoU", color="orange")
    plt.title("Mean IoU")
    plt.xlabel("Epoch")
    plt.ylabel("IoU")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    png_path = f"../resultats_modeles/{model_name}_dice_iou.png"
    plt.savefig(png_path, dpi=100)
    plt.close()
    print(f"✅ Entraînement {model_name} terminé en {total_time:.1f} secondes.")
    print(f"📁 Logs : {csv_path}")
    print(f"📁 Modèle : ../resultats_modeles/{model_name}.pth")
    print(f"📊 Graphique Dice/IoU sauvegardé : {png_path}")
def show_predictions_maskformer(
    model,
    dataset,
    num_images=3,
    num_classes=8
):
    """
    Display a few predictions vs ground-truth masks for a MaskFormer-like
    model (one exposing ``class_queries_logits`` and ``masks_queries_logits``).

    Pipeline per sample:
      1) forward pass to get ``class_queries_logits`` / ``masks_queries_logits``;
      2) upsample the mask logits to the target mask size;
      3) aggregate via ``maskformer_aggregator`` into (1, C, H, W);
      4) argmax -> (H, W) for display.

    Fixes over the previous version:
      - calls the module-level ``maskformer_aggregator`` directly instead of
        re-importing it from this very module (``from fonctions import ...``);
      - autocast is only enabled when CUDA is actually available.
    """
    import torch
    import matplotlib.pyplot as plt
    import numpy as np
    from torch.cuda.amp import autocast
    import torch.nn.functional as F
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    model.eval().to(device)
    fig, axes = plt.subplots(num_images, 3, figsize=(12, 4 * num_images))
    for i in range(num_images):
        idx = np.random.randint(0, len(dataset))
        image, mask_gt = dataset[idx]                # (3, H, W), (H, W)
        image_t = image.unsqueeze(0).to(device)      # (1, 3, H, W)
        mask_gt_np = mask_gt.numpy()                 # (H, W)
        with torch.no_grad(), autocast(enabled=use_cuda):
            outputs = model(image_t)
            class_queries = outputs.class_queries_logits   # (1, Q, num_labels)
            masks_queries = outputs.masks_queries_logits   # (1, Q, h, w)
            # Upsample mask logits to the ground-truth resolution.
            masks_queries = F.interpolate(
                masks_queries,
                size=(mask_gt_np.shape[0], mask_gt_np.shape[1]),
                mode='bilinear',
                align_corners=False
            )
            # Aggregate queries into dense class scores (1, C, H, W).
            aggregated_logits = maskformer_aggregator(class_queries, masks_queries)
        pred = torch.argmax(aggregated_logits, dim=1).squeeze(0).cpu().numpy()
        # With a single row, plt.subplots returns a 1-D array of 3 axes.
        if num_images == 1:
            ax_img, ax_gt, ax_pred = axes
        else:
            ax_img, ax_gt, ax_pred = axes[i]
        ax_img.imshow(image.permute(1, 2, 0).cpu().numpy())
        ax_img.set_title("Image")
        ax_img.axis("off")
        ax_gt.imshow(mask_gt_np, cmap="tab10", vmin=0, vmax=num_classes-1)
        ax_gt.set_title("Masque GT")
        ax_gt.axis("off")
        ax_pred.imshow(pred, cmap="tab10", vmin=0, vmax=num_classes-1)
        ax_pred.set_title("Masque Prédit")
        ax_pred.axis("off")
    plt.tight_layout()
    plt.show()
import matplotlib.pyplot as plt | |
import pandas as pd | |
import os | |
def comparer_modeles(list_csv_files, model_names=None):
    """
    Compare several models on their training metrics (loss, accuracy, dice,
    IoU) and show a bar chart of total training time.

    Args:
        list_csv_files (list): paths to the CSV log files.
        model_names (list): short names for the legend, same length as
            list_csv_files. Defaults to the CSV base filenames.
    """
    import os
    import pandas as pd
    import matplotlib.pyplot as plt
    if model_names is None:
        model_names = [os.path.splitext(os.path.basename(csv_file))[0] for csv_file in list_csv_files]
    # Load every CSV into a name -> DataFrame mapping.
    model_data = {name: pd.read_csv(path) for path, name in zip(list_csv_files, model_names)}
    # Fixed palette so each model keeps one colour across all panels.
    color_list = ["red", "blue", "green", "purple", "orange", "black"]
    # 3x2 grid: four metric panels + one full-width time panel.
    fig = plt.figure(figsize=(14, 14))
    # Each metric panel plots one dashed train curve and one solid val curve
    # per model; described data-driven to avoid four copies of the same loop.
    panel_specs = [
        ((0, 0), "Comparaison des Loss (Perte)", "Loss",
         "train_loss", "val_loss", "Train Loss", "Val Loss"),
        ((0, 1), "Comparaison de l'Accuracy", "Accuracy",
         "train_accuracy", "val_accuracy", "Train Acc", "Val Acc"),
        ((1, 0), "Comparaison du Dice Coefficient", "Dice Coefficient",
         "train_dice_coef", "val_dice_coef", "Train Dice", "Val Dice"),
        ((1, 1), "Comparaison du Mean IoU", "Mean IoU",
         "train_mean_iou", "val_mean_iou", "Train IoU", "Val IoU"),
    ]
    for grid_pos, title, ylabel, train_col, val_col, train_lbl, val_lbl in panel_specs:
        ax = plt.subplot2grid((3, 2), grid_pos)
        ax.set_title(title)
        ax.set_xlabel("Epochs")
        ax.set_ylabel(ylabel)
        for i, (name, df) in enumerate(model_data.items()):
            c = color_list[i % len(color_list)]
            if train_col in df.columns and val_col in df.columns:
                ax.plot(df["epoch"], df[train_col], label=f"{name} {train_lbl}", color=c, linestyle="--")
                ax.plot(df["epoch"], df[val_col], label=f"{name} {val_lbl}", color=c, linestyle="-")
        ax.grid(True)
        ax.legend()
    # Full-width bottom panel: total training time per model (minutes).
    ax_time = plt.subplot2grid((3, 2), (2, 0), colspan=2)
    ax_time.set_title("Comparaison du Temps total d'entraînement (en minutes)")
    x_labels = []
    y_values = []
    for name, df in model_data.items():
        x_labels.append(name)
        if "temps_total_sec" in df.columns:
            y_values.append(df["temps_total_sec"].iloc[-1] / 60)
        else:
            y_values.append(0)
    bars = ax_time.bar(x_labels, y_values, color=color_list[:len(y_values)])
    # Annotate each bar with its value.
    for bar in bars:
        height = bar.get_height()
        ax_time.text(bar.get_x() + bar.get_width() / 2, height + 0.1, f"{height:.2f} min",
                     ha='center', va='bottom')
    ax_time.set_ylabel("Temps (minutes)")
    ax_time.grid(True, axis='y')
    plt.tight_layout()
    plt.show()
# ------------------------------------------------------------------ | |
# FONCTIONS POUR SIMULER LA PLUIE ET COMPARER LES PRÉDICTIONS | |
# ------------------------------------------------------------------ | |
import albumentations as A | |
from torchvision import transforms | |
import torch | |
import torch.nn.functional as F | |
import numpy as np | |
from PIL import Image | |
import io | |
import matplotlib.pyplot as plt | |
# Global albumentations pipeline simulating heavy rain on an RGB image
# (used by apply_rain_effect below).
rain_transform = A.Compose([
    A.RandomRain(
        brightness_coefficient=0.9,  # slightly darken the scene (rainy lighting)
        drop_length=20,
        drop_width=1,
        blur_value=3,                # blur streaks to mimic a wet-lens effect
        rain_type='heavy'
    )
])
def apply_rain_effect(image_pil: Image.Image) -> Image.Image:
    """
    Apply the global rain augmentation to a PIL image.

    Args:
        image_pil: input RGB image.

    Returns:
        A new PIL image with the synthetic rain effect applied.
    """
    # Albumentations works on NumPy arrays, so round-trip through one.
    pixels = np.array(image_pil)
    rained_pixels = rain_transform(image=pixels)['image']
    return Image.fromarray(rained_pixels)
def predict_mask(model, image_pil, device="cpu", num_classes=8):
    """
    Predict the segmentation mask of a PIL image with a PyTorch model.

    Handles HuggingFace-style outputs (``.logits``), torchvision dicts
    (``"out"`` key) and raw logit tensors; logits are upsampled to the input
    image size before the argmax.

    Args:
        model: PyTorch segmentation model.
        image_pil: input RGB image.
        device: device the model runs on.
        num_classes: number of classes (informational; classes are 0..7).

    Returns:
        NumPy array (H, W) of predicted class indices.
    """
    # PIL -> float tensor in [0, 1], shape (1, 3, H, W).
    batch = transforms.ToTensor()(image_pil).unsqueeze(0).to(device)
    model.eval()
    with torch.no_grad():
        raw_output = model(batch)
    # Extract logits whatever the output container is (e.g. SegFormer).
    if hasattr(raw_output, "logits"):
        logits = raw_output.logits
    elif isinstance(raw_output, dict):
        logits = raw_output["out"]
    else:
        logits = raw_output
    # Upsample to the original image resolution.
    height, width = batch.shape[-2], batch.shape[-1]
    logits = F.interpolate(
        logits,
        size=(height, width),
        mode='bilinear',
        align_corners=False
    )
    # argmax over classes -> (H, W).
    return logits.argmax(dim=1).squeeze(0).cpu().numpy()
def compare_rain_predictions(
    baseline_model,
    new_model,
    image_path,
    device="cpu",
    size=(256,256)
):
    """
    Build a 4-panel comparison figure for one image:

      1) load the original image and resize it to ``size``;
      2) apply the synthetic rain effect;
      3) predict the mask with both ``baseline_model`` and ``new_model``;
      4) render: original | rainy image | baseline mask | new-model mask.

    Returns:
        The matplotlib Figure (not shown; caller decides how to display it).
    """
    original = Image.open(image_path).convert("RGB").resize(size)
    rainy = apply_rain_effect(original)
    baseline_pred = predict_mask(baseline_model, rainy, device=device)
    new_pred = predict_mask(new_model, rainy, device=device)
    fig, axs = plt.subplots(1, 4, figsize=(16, 5))
    # (content, title, colormap-or-None) for each of the four panels.
    panels = [
        (np.array(original), "Original", None),
        (np.array(rainy), "Pluie", None),
        (baseline_pred, "Masque (baseline)", "magma"),
        (new_pred, "Masque (nouveau)", "magma"),
    ]
    for ax, (content, title, cmap) in zip(axs, panels):
        if cmap is None:
            ax.imshow(content)
        else:
            ax.imshow(content, cmap=cmap, vmin=0, vmax=7)
        ax.set_title(title)
        ax.axis("off")
    plt.tight_layout()
    return fig