# fonctions.py
# from config import DATA_DIR, RESULTS_DIR
# -------------------- CORE FUNCTIONS CARRIED OVER FROM PROJECT 8 --------------------
# Required imports
import os
from cityscapesscripts.helpers.labels import name2label
from cityscapesscripts.preparation.json2labelImg import json2labelImg
import json
import numpy as np
import albumentations as A
import cv2
from tensorflow.keras.utils import Sequence
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from albumentations import Compose, HorizontalFlip, Rotate, OneOf, RandomScale, Blur, GaussNoise, Resize
import matplotlib.pyplot as plt
from typing import List, Tuple
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, MaxPooling2D, UpSampling2D, Concatenate, Resizing, BatchNormalization, Dropout
from tensorflow.keras.models import Model
from tqdm import tqdm
from tensorflow.keras.applications import VGG16, ResNet50
from tensorflow.keras.callbacks import EarlyStopping, CSVLogger, ReduceLROnPlateau, ModelCheckpoint
from cityscapesscripts.helpers.labels import trainId2label
import time
import segmentation_models as sm
import pandas as pd
from pathlib import Path
from datetime import datetime
from tensorflow.keras.optimizers import Adam
import glob
import torch
import tensorflow as tf  # used by iou_coef below
from torchvision import transforms
import torch.nn.functional as F
# Definition of the useful (grouped) classes
CLASSES_UTILES = {
"void": 0, "flat": 1, "construction": 2, "object": 3,
"nature": 4, "sky": 5, "human": 6, "vehicle": 7
}
# Path fix for Project 9
root_path = Path(".")  # project 9 root
data_path = root_path / "data"
cityscapes_scripts_path = root_path / "notebook/cityscapesScripts/cityscapesscripts"
images_path = data_path / "leftImg8bit"
masks_path = data_path / "gtFine"
# RESULTS_DIR normally comes from config (see the commented import at the top);
# a local fallback is defined here so get_logger() below does not raise a NameError.
RESULTS_DIR = root_path / "resultats_modeles"
class CityscapesDataset(torch.utils.data.Dataset):
    def __init__(self, root, split="train", mode="fine", target_type="semantic", image_size=(512, 512)):
        from torchvision.datasets import Cityscapes
        from torchvision import transforms
        self.dataset = Cityscapes(root=root, split=split, mode=mode, target_type=target_type)
        self.image_size = image_size
        self.transforms = transforms
def __len__(self):
return len(self.dataset)
def __getitem__(self, index):
image, mask = self.dataset[index]
image = image.resize(self.image_size)
mask = mask.resize(self.image_size)
        # Convert the image to a tensor
        image = self.transforms.ToTensor()(image)
        # Convert the mask to a numpy array, then remap the classes
mask_np = np.array(mask).astype(np.uint8)
mask_remap = remap_classes(mask_np)
mask_tensor = torch.from_numpy(mask_remap).long()
return image, mask_tensor
def remap_classes(mask: np.ndarray) -> np.ndarray:
"""
Convertit les classes Cityscapes originales (0-33) vers les 8 catégories principales définies.
Retourne un masque avec uniquement des valeurs de 0 à 7.
"""
# Nettoyage des valeurs non prévues (ex: 34, 35)
mask = np.where(mask > 33, 0, mask) # Toute valeur > 33 est convertie en void (classe 0)
# Définition précise du mapping basé sur les "labelIds" Cityscapes originaux
labelIds_to_main_classes = {
0: 0, # unlabeled → void
1: 0, # ego vehicle → void
2: 0, # rectification border → void
3: 0, # out of roi → void
4: 0, # static → void
5: 0, # dynamic → void
6: 0, # ground → void
7: 1, # road → flat
8: 1, # sidewalk → flat
9: 0, # parking → void
10: 0, # rail track → void
11: 2, # building → construction
12: 2, # wall → construction
13: 2, # fence → construction
14: 0, # guard rail → void
15: 0, # bridge → void
16: 0, # tunnel → void
17: 3, # pole → object
18: 3, # polegroup → object
19: 3, # traffic light → object
20: 3, # traffic sign → object
21: 4, # vegetation → nature
22: 4, # terrain → nature
23: 5, # sky → sky
24: 6, # person → human
25: 6, # rider → human
26: 7, # car → vehicle
27: 7, # truck → vehicle
28: 7, # bus → vehicle
29: 7, # caravan → vehicle
30: 7, # trailer → vehicle
31: 7, # train → vehicle
32: 7, # motorcycle → vehicle
33: 7 # bicycle → vehicle
}
remapped_mask = np.copy(mask)
for original_class, new_class in labelIds_to_main_classes.items():
remapped_mask[mask == original_class] = new_class
return remapped_mask.astype(np.uint8)
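# A minimal, runnable sketch of remap_classes on a toy mask (illustrative values;
# _demo_remap_classes is a hypothetical helper, not part of the pipeline):
def _demo_remap_classes():
    # road, sidewalk, building, person, car, out-of-range
    toy = np.array([[7, 8, 11], [24, 26, 35]], dtype=np.uint8)
    remapped = remap_classes(toy)
    # road/sidewalk -> flat (1), building -> construction (2),
    # person -> human (6), car -> vehicle (7), 35 -> void (0)
    assert remapped.tolist() == [[1, 1, 2], [6, 7, 0]]
    return remapped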
def view_folder(dossier):
    dossier = Path(dossier)
    if not dossier.exists():
        print(f"❌ Folder {dossier} does not exist.")
        return
    for sous_dossier in dossier.iterdir():
        if sous_dossier.is_dir():
            print(f"|-- {sous_dossier.name}")
            for sous_sous_dossier in sous_dossier.iterdir():
                if sous_sous_dossier.is_dir():
                    print(f"    |-- {sous_sous_dossier.name}")
def load_image(path: str, target_size: Tuple[int, int]) -> np.ndarray:
"""Charge et normalise une image entre 0 et 1."""
img = load_img(path, target_size=target_size)
return img_to_array(img).astype("float32") / 255.0
def load_mask(path: str, target_size: Tuple[int, int], mask_mode="labelIds") -> np.ndarray:
"""
Charge, redimensionne et remappe un masque.
Applique systématiquement le remapping vers les 8 classes principales.
Args:
path (str): Chemin vers le masque.
target_size (Tuple[int, int]): Taille de sortie (hauteur, largeur).
mask_mode (str): "labelIds" pour les masques Cityscapes originaux, "trainIds" sinon.
Returns:
np.ndarray: Masque avec valeurs de classe entre 0 et 7.
"""
mask = load_img(path, target_size=target_size, color_mode="grayscale")
mask = img_to_array(mask).astype("uint8").squeeze()
# Toujours appliquer le remapping pour garantir 8 classes
mask = remap_classes(mask)
return mask
def one_hot_encode_mask(mask: np.ndarray, num_classes: int) -> np.ndarray:
"""Encode un masque en One-Hot."""
# Vérifier les valeurs uniques avant l'encodage
unique_values = np.unique(mask)
if np.any(unique_values >= num_classes):
print(f"Attention : Certaines valeurs de masques dépassent {num_classes-1}: {unique_values}")
mask = np.clip(mask, 0, num_classes - 1)
return np.eye(num_classes, dtype=np.uint8)[mask]
def decode_mask(mask: np.ndarray) -> np.ndarray:
"""Convertit un masque One-Hot en format indexé."""
return np.argmax(mask, axis=-1)
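# Quick runnable sketch: one_hot_encode_mask and decode_mask are inverses on
# valid masks (_demo_one_hot_roundtrip is illustrative only):
def _demo_one_hot_roundtrip():
    mask = np.random.randint(0, 8, size=(4, 4), dtype=np.uint8)  # toy 4x4 mask, classes 0..7
    one_hot = one_hot_encode_mask(mask, num_classes=8)           # (4, 4, 8)
    assert one_hot.shape == (4, 4, 8)
    assert np.array_equal(decode_mask(one_hot), mask)            # round-trip recovers the mask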
def get_augmentations(image_size: Tuple[int, int]) -> Compose:
"""Définit les transformations Albumentations pour l'entraînement."""
return Compose([
HorizontalFlip(p=0.2),
Rotate(limit=15, p=0.2),
RandomScale(scale_limit=0.1, p=0.2),
Resize(*image_size, interpolation=cv2.INTER_NEAREST)
])
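# Hedged, runnable sketch of the augmentation pipeline on random data:
# Albumentations expects HWC images and 2D masks, and the final Resize with
# INTER_NEAREST keeps mask values integral (_demo_augmentations is illustrative only):
def _demo_augmentations():
    img = np.random.rand(300, 300, 3).astype("float32")
    mask = np.random.randint(0, 8, size=(300, 300), dtype=np.uint8)
    augmented = get_augmentations((256, 256))(image=img, mask=mask)
    assert augmented["image"].shape[:2] == (256, 256)
    assert augmented["mask"].shape == (256, 256)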
class DataGenerator(Sequence):
    def __init__(self, image_paths, mask_paths, image_size=(256, 256), batch_size=16, num_classes=8,  # also tested with 512x512, 1024x1024, 512x1024, 1024x512, 256x512 and 512x256
                 shuffle=True, augmentation_ratio=1.0, use_cache=False):
self.image_paths = image_paths
self.mask_paths = mask_paths
self.image_size = image_size
self.batch_size = batch_size
self.num_classes = num_classes
self.shuffle = shuffle
self.augmentation_ratio = augmentation_ratio
self.use_cache = use_cache
        self.cache = {}  # cache of preprocessed masks
self.augmentation = get_augmentations(image_size)
self.on_epoch_end()
def __getitem__(self, index):
start_time = time.time()
start = index * self.batch_size
end = start + self.batch_size
batch_image_paths = self.image_paths[start:end]
batch_mask_paths = self.mask_paths[start:end]
batch_images, batch_masks = [], []
for img_path, mask_path in zip(batch_image_paths, batch_mask_paths):
img = load_image(img_path, self.image_size)
if self.use_cache and mask_path in self.cache:
mask = self.cache[mask_path]
else:
mask = load_mask(mask_path, self.image_size, mask_mode="trainIds")
if self.use_cache:
self.cache[mask_path] = mask
if np.random.rand() < self.augmentation_ratio:
augmented = self.augmentation(image=img, mask=mask)
img, mask = augmented["image"], augmented["mask"]
batch_images.append(img)
batch_masks.append(one_hot_encode_mask(mask, self.num_classes))
        elapsed_time = time.time() - start_time
        # print(f"📊 Batch {index} generated in {elapsed_time:.2f}s")
return np.stack(batch_images), np.stack(batch_masks)
def __len__(self):
"""Renvoie le nombre total de batches par epoch."""
return int(np.ceil(len(self.image_paths) / self.batch_size))
def on_epoch_end(self) -> None:
"""Mélange les données après chaque epoch si shuffle est activé."""
if self.shuffle:
data = list(zip(self.image_paths, self.mask_paths))
np.random.shuffle(data)
self.image_paths, self.mask_paths = zip(*data)
def visualize_batch(self, num_images: int = 5) -> None:
"""Affiche correctement un lot d'images et de masques."""
batch_images, batch_masks = self.__getitem__(0)
num_images = min(num_images, len(batch_images))
fig, axes = plt.subplots(num_images, 2, figsize=(10, num_images * 5))
for i in range(num_images):
axes[i, 0].imshow(batch_images[i])
axes[i, 0].set_title("Image")
axes[i, 0].axis("off")
axes[i, 1].imshow(decode_mask(batch_masks[i]), cmap="inferno")
axes[i, 1].set_title("Mask (decoded)")
axes[i, 1].axis("off")
plt.tight_layout()
plt.show()
# DataGenerator smoke test (train_input_img_paths / train_label_ids_img_paths
# must be defined beforehand, e.g. by globbing images_path and masks_path)
if __name__ == "__main__":
    train_gen = DataGenerator(
        image_paths=train_input_img_paths,
        mask_paths=train_label_ids_img_paths,
        image_size=(256, 256),  # also tested with 512x512
        batch_size=16,  # tested with 8, 16 and 32
num_classes=8,
shuffle=True,
augmentation_ratio=0.5
)
train_gen.visualize_batch(num_images=3)
def iou_coef(y_true, y_pred, smooth=1e-6):
"""
Calcule l'Intersection over Union (IoU).
Correction : conversion explicite en float32.
"""
y_true = tf.keras.backend.cast(y_true, "float32")
y_pred = tf.keras.backend.cast(y_pred, "float32")
y_true_f = tf.keras.backend.flatten(y_true)
y_pred_f = tf.keras.backend.flatten(y_pred)
intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
union = tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) - intersection
return (intersection + smooth) / (union + smooth)
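# Sanity-check sketch for iou_coef on tiny constant tensors (illustrative only,
# assuming TF2 eager execution): identical masks give IoU ~1, disjoint masks ~0.
def _demo_iou_coef():
    y = tf.constant([[1.0, 0.0], [0.0, 1.0]])
    assert abs(float(iou_coef(y, y)) - 1.0) < 1e-3  # perfect overlap
    z = tf.constant([[0.0, 1.0], [1.0, 0.0]])
    assert float(iou_coef(y, z)) < 1e-3             # no overlap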
def get_logger(nom_modele: str):
"""
Crée un CSVLogger pour enregistrer les métriques d'entraînement dans un fichier horodaté.
"""
from datetime import datetime
from tensorflow.keras.callbacks import CSVLogger
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_filename = RESULTS_DIR / f"{nom_modele}_{timestamp}.csv"
return CSVLogger(log_filename, separator=",", append=False)
def charger_metriques(dossier_logs):
"""
Charge tous les fichiers CSV de métriques présents dans un dossier.
Args:
dossier_logs (str): Chemin vers le dossier contenant les fichiers CSV.
Returns:
dict: Dictionnaire avec nom du modèle en clé et dataframe en valeur.
"""
fichiers = glob.glob(os.path.join(dossier_logs, "*.csv"))
resultats = {}
for fichier in fichiers:
# Récupère le nom complet du modèle (par exemple unet_mini, unet_vgg16)
nom_modele = "_".join(os.path.basename(fichier).split("_")[:-2])
df = pd.read_csv(fichier)
resultats[nom_modele] = df
return resultats
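# The naming convention assumed above is "<model>_<date>_<time>.csv", as produced
# by get_logger. A runnable illustration with a hypothetical file name:
def _demo_nom_modele():
    fichier = "unet_vgg16_20240101_123456.csv"  # hypothetical log from get_logger
    assert "_".join(os.path.basename(fichier).split("_")[:-2]) == "unet_vgg16"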
def tracer_metriques(resultats):
"""
Trace les métriques des différents modèles sur des graphiques.
Args:
resultats (dict): Dictionnaire avec nom modèle et dataframe.
"""
# Palette de couleurs spécifique pour chaque modèle
couleurs = {
"mini": "blue",
"vgg16": "green",
"resnet50": "red",
"efficientnetb3": "purple"
}
plt.figure(figsize=(18, 18))
# Graphique de Loss (Perte)
plt.subplot(3, 2, 1)
for modele, df in resultats.items():
couleur = couleurs.get(modele, "black")
plt.plot(df["loss"], label=f"{modele} Train Loss", color=couleur, linestyle="--")
plt.plot(df["val_loss"], label=f"{modele} Val Loss", color=couleur, linestyle="-")
plt.title("Comparaison des Loss (Perte)")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.grid(True)
plt.legend()
    # Mean IoU plot
plt.subplot(3, 2, 2)
for modele, df in resultats.items():
couleur = couleurs.get(modele, "black")
if "mean_iou" in df.columns:
plt.plot(df["mean_iou"], label=f"{modele} Train Mean IoU", color=couleur, linestyle="--")
plt.plot(df["val_mean_iou"], label=f"{modele} Val Mean IoU", color=couleur, linestyle="-")
elif "iou_score" in df.columns:
plt.plot(df["iou_score"], label=f"{modele} Train IoU Score", color=couleur, linestyle="--")
plt.plot(df["val_iou_score"], label=f"{modele} Val IoU Score", color=couleur, linestyle="-")
plt.title("Comparaison du Mean IoU / IoU Score")
plt.xlabel("Epochs")
plt.ylabel("Mean IoU")
plt.grid(True)
plt.legend()
    # Dice coefficient plot
plt.subplot(3, 2, 3)
for modele, df in resultats.items():
couleur = couleurs.get(modele, "black")
if "dice_coef" in df.columns:
plt.plot(df["dice_coef"], label=f"{modele} Train Dice", color=couleur, linestyle="--")
plt.plot(df["val_dice_coef"], label=f"{modele} Val Dice", color=couleur, linestyle="-")
plt.title("Comparaison du Dice Coefficient")
plt.xlabel("Epochs")
plt.ylabel("Dice Coefficient")
plt.grid(True)
plt.legend()
    # Accuracy plot
plt.subplot(3, 2, 4)
for modele, df in resultats.items():
couleur = couleurs.get(modele, "black")
if "accuracy" in df.columns:
plt.plot(df["accuracy"], label=f"{modele} Train Accuracy", color=couleur, linestyle="--")
plt.plot(df["val_accuracy"], label=f"{modele} Val Accuracy", color=couleur, linestyle="-")
plt.title("Comparaison de l'Accuracy")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.grid(True)
plt.legend()
    # Total training time per model (bar chart)
plt.subplot(3, 1, 3)
temps_entrainement = {}
for modele, df in resultats.items():
couleur = couleurs.get(modele, "black")
if "temps_total_sec" in df.columns:
            temps = df["temps_total_sec"].iloc[-1] / 60  # converted to minutes
temps_entrainement[modele] = temps
plt.bar(modele, temps, color=couleur)
plt.text(modele, temps, f"{temps:.2f} min", ha="center", va="bottom")
plt.title("Comparaison du Temps total d'entraînement (en minutes)")
plt.ylabel("Temps (minutes)")
plt.grid(True, axis="y")
plt.tight_layout()
plt.show()
# -------------------- NEW FUNCTIONS FOR PROJECT 9 --------------------
def charger_oneformer(num_classes: int = 8):
"""
Charge le modèle OneFormer adapté au dataset Cityscapes.
"""
from transformers import OneFormerForSemanticSegmentation
model = OneFormerForSemanticSegmentation.from_pretrained("nvidia/oneformer_coco_swin_large")
model.config.num_labels = num_classes
return model
def charger_segnext(num_classes: int = 8):
"""
Charge le modèle SegNeXt-L (simplifié avec timm ou autre wrapper).
"""
import timm
model = timm.create_model("segnext_l", pretrained=True, num_classes=num_classes)
return model
def entrainer_model_pytorch(
model,
train_loader,
val_loader,
model_name="model",
epochs=10,
lr=1e-4,
num_classes=8
):
"""
Entraîne un modèle PyTorch de segmentation avec :
- Mixed Precision (torch.cuda.amp)
- GradScaler pour la stabilité
- Scheduler 'ReduceLROnPlateau'
- Gestion de la sortie pour SegFormer (SemanticSegmenterOutput)
ou un simple tenseur
- Upsampling de la sortie pour correspondre au masque (H, W)
- Calcul et log des métriques (accuracy, Dice, IoU) pour train et val
- Mesure du temps par epoch et de la mémoire GPU peak
- Sauvegarde CSV + .pth dans '../resultats_modeles/'
- Génération d'un graphique PNG de l'évolution du Dice et du Mean IoU.
"""
import torch
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_sched
from torch.cuda.amp import autocast, GradScaler
from transformers.modeling_outputs import SemanticSegmenterOutput
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import os
import time
import torch.nn.functional as F
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
    # -------- Local definition of the PyTorch metrics (avoids duplicates) --------
    def compute_batch_metrics(pred_logits, target, num_classes):
        """
        Computes the macro-averaged accuracy, Dice and IoU for one batch.
        - pred_logits: (N, C, H, W)
        - target: (N, H, W) (integer values in [0..num_classes-1])
        Returns a dict: {"accuracy": float, "dice": float, "iou": float}
        """
        # 1) Argmax => (N, H, W)
        pred = torch.argmax(pred_logits, dim=1)
        # 2) Global pixel accuracy
        correct = (pred == target).sum().item()
        total = target.numel()  # N*H*W
        accuracy = correct / total
        # 3) Per-class intersection / union => Dice, IoU
dice_list = []
iou_list = []
for c in range(num_classes):
pred_c = (pred == c)
target_c = (target == c)
inter = (pred_c & target_c).sum().item()
pred_area = pred_c.sum().item()
target_area = target_c.sum().item()
union = pred_area + target_area - inter
            # IoU
            if union == 0:
                # class absent from both => convention IoU = 1
                iou_c = 1.0
            else:
                iou_c = inter / union
            # Dice = 2*inter / (|pred_c| + |target_c|)
            denom = pred_area + target_area
            if denom == 0:
                dice_c = 1.0
            else:
                dice_c = 2.0 * inter / denom
dice_list.append(dice_c)
iou_list.append(iou_c)
mean_dice = sum(dice_list) / len(dice_list)
mean_iou = sum(iou_list) / len(iou_list)
return {"accuracy": accuracy, "dice": mean_dice, "iou": mean_iou}
# -------- Setup Optim / Loss / Scheduler / GradScaler --------
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = lr_sched.ReduceLROnPlateau(optimizer, factor=0.5, patience=2, verbose=True)
scaler = GradScaler()
os.makedirs("../resultats_modeles", exist_ok=True)
    # -------- Log structure --------
log = {
"epoch": [],
"train_loss": [],
"val_loss": [],
"train_accuracy": [],
"train_dice_coef": [],
"train_mean_iou": [],
"val_accuracy": [],
"val_dice_coef": [],
"val_mean_iou": [],
"epoch_time_s": [],
"peak_gpu_mem_mb": []
}
start_time = time.time()
    # ============================ TRAINING LOOP ============================
    for epoch in range(epochs):
        # Track the peak GPU memory over the epoch
torch.cuda.reset_peak_memory_stats(device=device)
epoch_start = time.time()
# -------- TRAIN LOOP --------
model.train()
running_loss = 0.0
running_accuracy = 0.0
running_dice = 0.0
running_iou = 0.0
for images, masks in tqdm(train_loader, desc=f"[Epoch {epoch+1}/{epochs}] Train"):
images, masks = images.to(device), masks.to(device)
optimizer.zero_grad()
with autocast():
outdict = model(images)
                # Handle SegFormer / DeepLab / plain tensor outputs
if isinstance(outdict, SemanticSegmenterOutput):
logits = outdict.logits
elif isinstance(outdict, dict):
logits = outdict["out"]
else:
logits = outdict
                # Upsample -> (N, C, H, W), matching the masks' size
logits = F.interpolate(
logits,
size=(masks.shape[-2], masks.shape[-1]),
mode='bilinear',
align_corners=False
)
loss = criterion(logits, masks)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
running_loss += loss.item()
            # Batch metrics
metrics_batch = compute_batch_metrics(logits, masks, num_classes=num_classes)
running_accuracy += metrics_batch["accuracy"]
running_dice += metrics_batch["dice"]
running_iou += metrics_batch["iou"]
avg_train_loss = running_loss / len(train_loader)
avg_train_accuracy = running_accuracy / len(train_loader)
avg_train_dice = running_dice / len(train_loader)
avg_train_iou = running_iou / len(train_loader)
# -------- VALID LOOP --------
model.eval()
val_running_loss = 0.0
val_running_accuracy = 0.0
val_running_dice = 0.0
val_running_iou = 0.0
with torch.no_grad():
for images, masks in tqdm(val_loader, desc=f"[Epoch {epoch+1}/{epochs}] Val"):
images, masks = images.to(device), masks.to(device)
with autocast():
outdict = model(images)
if isinstance(outdict, SemanticSegmenterOutput):
logits = outdict.logits
elif isinstance(outdict, dict):
logits = outdict["out"]
else:
logits = outdict
logits = F.interpolate(
logits,
size=(masks.shape[-2], masks.shape[-1]),
mode='bilinear',
align_corners=False
)
loss_val = criterion(logits, masks)
val_running_loss += loss_val.item()
metrics_batch_val = compute_batch_metrics(logits, masks, num_classes=num_classes)
val_running_accuracy += metrics_batch_val["accuracy"]
val_running_dice += metrics_batch_val["dice"]
val_running_iou += metrics_batch_val["iou"]
avg_val_loss = val_running_loss / len(val_loader)
avg_val_accuracy = val_running_accuracy / len(val_loader)
avg_val_dice = val_running_dice / len(val_loader)
avg_val_iou = val_running_iou / len(val_loader)
# -------- Scheduler : ReduceLROnPlateau --------
scheduler.step(avg_val_loss)
        # -------- End-of-epoch logging --------
epoch_time = time.time() - epoch_start
peak_mem = torch.cuda.max_memory_allocated(device=device)
peak_mem_mb = peak_mem / (1024 ** 2)
log["epoch"].append(epoch + 1)
log["train_loss"].append(avg_train_loss)
log["val_loss"].append(avg_val_loss)
log["train_accuracy"].append(avg_train_accuracy)
log["train_dice_coef"].append(avg_train_dice)
log["train_mean_iou"].append(avg_train_iou)
log["val_accuracy"].append(avg_val_accuracy)
log["val_dice_coef"].append(avg_val_dice)
log["val_mean_iou"].append(avg_val_iou)
log["epoch_time_s"].append(epoch_time)
log["peak_gpu_mem_mb"].append(peak_mem_mb)
print(
f"📉 Epoch {epoch+1} | "
f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | "
f"Train Dice: {avg_train_dice:.4f} | Val Dice: {avg_val_dice:.4f} | "
f"Train IoU: {avg_train_iou:.4f} | Val IoU: {avg_val_iou:.4f} | "
f"Time: {epoch_time:.1f}s | GPU: {peak_mem_mb:.1f} MB"
)
    # ============================ END OF TRAINING ============================
    total_time = time.time() - start_time
    # -------- Save the log as CSV --------
df = pd.DataFrame(log)
df["temps_total_sec"] = total_time
os.makedirs("../resultats_modeles", exist_ok=True)
csv_path = f"../resultats_modeles/{model_name}_log.csv"
df.to_csv(csv_path, index=False)
    # -------- Save the weights --------
torch.save(model.state_dict(), f"../resultats_modeles/{model_name}.pth")
    # -------- Generate and save a Dice/IoU plot --------
plt.figure(figsize=(12, 5))
# Subplot 1 : Dice
plt.subplot(1, 2, 1)
plt.plot(df["epoch"], df["train_dice_coef"], label="Train Dice", color="blue")
plt.plot(df["epoch"], df["val_dice_coef"], label="Val Dice", color="orange")
plt.title("Dice Coefficient")
plt.xlabel("Epoch")
plt.ylabel("Dice")
plt.legend()
plt.grid(True)
# Subplot 2 : IoU
plt.subplot(1, 2, 2)
plt.plot(df["epoch"], df["train_mean_iou"], label="Train IoU", color="blue")
plt.plot(df["epoch"], df["val_mean_iou"], label="Val IoU", color="orange")
plt.title("Mean IoU")
plt.xlabel("Epoch")
plt.ylabel("IoU")
plt.legend()
plt.grid(True)
plt.tight_layout()
png_path = f"../resultats_modeles/{model_name}_dice_iou.png"
plt.savefig(png_path, dpi=100)
plt.close()
print(f"✅ Entraînement {model_name} terminé en {total_time:.1f} secondes.")
print(f"📁 Logs : {csv_path}")
print(f"📁 Modèle : ../resultats_modeles/{model_name}.pth")
print(f"📊 Graphique Dice/IoU sauvegardé : {png_path}")
def comparer_resultats(dossier='../resultats_modeles'):
"""
Affiche les courbes d'apprentissage de chaque modèle entraîné.
"""
import matplotlib.pyplot as plt
import pandas as pd
import os
plt.figure(figsize=(10, 6))
for file in os.listdir(dossier):
if file.endswith("_log.csv"):
df = pd.read_csv(os.path.join(dossier, file))
nom = file.replace("_log.csv", "")
plt.plot(df["epoch"], df["train_loss"], label=f"{nom} train")
plt.plot(df["epoch"], df["val_loss"], label=f"{nom} val")
plt.title("Courbes d'apprentissage")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
# ---------------------- FUNCTIONS REWRITTEN FOR PROJECT 9 ----------------------
def charger_donnees_cityscapes(data_dir: str, batch_size: int = 16, image_size: Tuple[int, int] = (256, 256)):
"""
Charge les données Cityscapes et retourne deux DataLoaders (train et val).
Utilise CityscapesDataset, et applique:
- num_workers=4
- pin_memory=True
pour des perfs optimales sur GPU
"""
from torch.utils.data import DataLoader
train_dataset = CityscapesDataset(root=data_dir, split="train", image_size=image_size)
val_dataset = CityscapesDataset(root=data_dir, split="val", image_size=image_size)
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=0,
pin_memory=True
)
val_loader = DataLoader(
val_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=0,
pin_memory=True
)
return train_loader, val_loader
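# Hedged usage sketch for charger_donnees_cityscapes. Assumes "./data" holds the
# standard Cityscapes layout (leftImg8bit/ and gtFine/); adjust to your setup.
def _demo_chargement():
    train_loader, val_loader = charger_donnees_cityscapes("./data", batch_size=4, image_size=(256, 256))
    images, masks = next(iter(train_loader))
    # images: (4, 3, 256, 256) float tensors; masks: (4, 256, 256) long tensors in [0..7]
    return images.shape, masks.shape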
# Soft color palette (8 useful classes)
PALETTE = {
    0: (0, 0, 0),        # void → black
    1: (50, 50, 150),    # flat → dark blue
    2: (102, 0, 204),    # construction → purple
    3: (255, 85, 0),     # object → orange
    4: (255, 255, 0),    # nature → yellow
    5: (0, 255, 255),    # sky → cyan
    6: (255, 0, 255),    # human → magenta
    7: (255, 255, 255),  # vehicle → white
}
CLASS_NAMES = {
0: "void",
1: "flat",
2: "construction",
3: "object",
4: "nature",
5: "sky",
6: "human",
7: "vehicle"
}
def decode_cityscapes_mask(mask):
"""
Convertit un masque 2D (valeurs de 0 à 7) en image RGB pour affichage.
"""
h, w = mask.shape
mask_rgb = np.zeros((h, w, 3), dtype=np.uint8)
for class_id, color in PALETTE.items():
mask_rgb[mask == class_id] = color
return mask_rgb
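# Runnable sketch: colorize a random toy mask with decode_cityscapes_mask
# (illustrative only; any (H, W) array of values 0..7 works):
def _demo_decode_cityscapes_mask():
    toy_mask = np.random.randint(0, 8, size=(32, 32), dtype=np.uint8)
    rgb = decode_cityscapes_mask(toy_mask)
    assert rgb.shape == (32, 32, 3) and rgb.dtype == np.uint8
    return rgb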
def afficher_image_et_masque(image_tensor, mask_tensor):
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import numpy as np
PALETTE = [
(0, 0, 0), # 0 - void
(100, 0, 200), # 1 - flat
(70, 70, 70), # 2 - construction
(250, 170, 30), # 3 - object
(107, 142, 35), # 4 - nature
(70, 130, 180), # 5 - sky
(220, 20, 60), # 6 - human
(0, 0, 142), # 7 - vehicle
]
PALETTE_NP = np.array(PALETTE) / 255.0
cmap = ListedColormap(PALETTE_NP)
image_np = image_tensor.permute(1, 2, 0).cpu().numpy()
mask_np = mask_tensor.cpu().numpy()
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.imshow(image_np)
plt.title("Image")
plt.axis("off")
plt.subplot(1, 2, 2)
im = plt.imshow(mask_np, cmap=cmap, vmin=0, vmax=7)
cbar = plt.colorbar(im, ticks=range(8))
cbar.ax.set_yticklabels(['void', 'flat', 'construction', 'object', 'nature', 'sky', 'human', 'vehicle'])
cbar.set_label("Catégories", rotation=270, labelpad=15)
plt.title("Masque (8 classes colorisées)")
plt.axis("off")
plt.tight_layout()
plt.show()
def charger_segformer(num_classes=8):
from transformers import SegformerForSemanticSegmentation
    model = SegformerForSemanticSegmentation.from_pretrained(
        "nvidia/segformer-b5-finetuned-ade-640-640",
        num_labels=num_classes,
        ignore_mismatched_sizes=True
    )
    model.config.num_labels = num_classes
model.config.output_hidden_states = False
return model
def charger_deeplabv3plus(num_classes=8):
import torchvision.models.segmentation as models
import torch.nn as nn
model = models.deeplabv3_resnet101(pretrained=True)
model.classifier[4] = nn.Conv2d(256, num_classes, kernel_size=1)
return model
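# Hedged sketch of a forward pass through the DeepLabV3 head above. Calling
# charger_deeplabv3plus downloads pretrained weights; shapes assume a 256x256 input.
def _demo_deeplab_forward():
    model = charger_deeplabv3plus(num_classes=8)
    model.eval()
    with torch.no_grad():
        out = model(torch.randn(1, 3, 256, 256))["out"]  # torchvision returns a dict
    assert out.shape == (1, 8, 256, 256)
    return out.shape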
class MiniCityscapesDataset(torch.utils.data.Dataset):
def __init__(self, image_paths, mask_paths, image_size=(256, 256)):
self.image_paths = image_paths
self.mask_paths = mask_paths
self.image_size = image_size
def __len__(self):
return len(self.image_paths)
    def __getitem__(self, idx):
        # Load the image and mask paths
        image_path = self.image_paths[idx]
        mask_path = self.mask_paths[idx]
        # Load the image
        from PIL import Image
        image = Image.open(image_path).convert("RGB").resize(self.image_size)
        # Load the mask
        mask = Image.open(mask_path).convert("L").resize(self.image_size)
        # Convert to a PyTorch tensor
        import torchvision.transforms as T
        to_tensor = T.ToTensor()
        image = to_tensor(image)  # shape (3, H, W)
        # Numpy + class remapping
        mask_np = np.array(mask, dtype=np.uint8)
        mask_np = remap_classes(mask_np)
        mask_tensor = torch.from_numpy(mask_np).long()  # shape (H, W)
        return image, mask_tensor
def show_predictions(model, dataset, num_images=3, num_classes=8):
"""
Affiche quelques prédictions vs masques réels depuis un dataset PyTorch.
Gère upsample, SegFormer / DeepLab / etc.
"""
import torch
import matplotlib.pyplot as plt
from transformers.modeling_outputs import SemanticSegmenterOutput
import torch.nn.functional as F
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.eval().to(device)
    fig, axes = plt.subplots(num_images, 3, figsize=(12, 4 * num_images), squeeze=False)
    for i in range(num_images):
        # Pick a random index
        idx = np.random.randint(0, len(dataset))
        image, mask_gt = dataset[idx]            # (3, H, W), (H, W)
        image_t = image.unsqueeze(0).to(device)  # (1, 3, H, W)
        mask_gt_np = mask_gt.numpy()             # (H, W)
with torch.no_grad():
outdict = model(image_t)
if isinstance(outdict, SemanticSegmenterOutput):
logits = outdict.logits
elif isinstance(outdict, dict):
logits = outdict["out"]
else:
logits = outdict
logits = F.interpolate(
logits,
size=mask_gt.shape,
mode='bilinear',
align_corners=False
)
pred = logits.argmax(dim=1).squeeze(0).cpu().numpy() # (H, W)
        # Display
        axes[i, 0].imshow(image.permute(1, 2, 0).numpy())
        axes[i, 0].set_title("Image")
        axes[i, 0].axis("off")
        axes[i, 1].imshow(mask_gt_np, cmap="tab10", vmin=0, vmax=num_classes-1)
        axes[i, 1].set_title("GT mask")
        axes[i, 1].axis("off")
        axes[i, 2].imshow(pred, cmap="tab10", vmin=0, vmax=num_classes-1)
        axes[i, 2].set_title("Predicted mask")
        axes[i, 2].axis("off")
plt.tight_layout()
plt.show()
def charger_maskformer(num_classes=8):
"""
Charge un modèle MaskFormer (HuggingFace Transformers)
pour la segmentation.
S'appuie sur un checkpoint préentraîné sur ADE20K.
"""
from transformers import MaskFormerForInstanceSegmentation
# Exemple : "facebook/maskformer-swin-large-ade" (semantic sur ADE20K)
# ou "facebook/maskformer-swin-base-coco" (panoptic/instance, COCO)
# À adapter selon votre besoin.
checkpoint = "facebook/maskformer-swin-large-ade"
model = MaskFormerForInstanceSegmentation.from_pretrained(
checkpoint,
ignore_mismatched_sizes=True # parfois nécessaire si on change num_labels
)
# Ajuster le nombre de classes pour Cityscapes (8)
model.config.num_labels = num_classes
# Facultatif : désactiver l'output des hidden states
model.config.output_hidden_states = False
return model
def maskformer_aggregator(
class_queries_logits: torch.Tensor,
masks_queries_logits: torch.Tensor
) -> torch.Tensor:
"""
Combine les prédictions de Mask(2)Former (class_queries_logits, masks_queries_logits)
en un tenseur de forme (N, C, H, W) pour la segmentation sémantique.
Hypothèses :
- class_queries_logits: (N, Q, C) [logits par classe pour chaque query]
- masks_queries_logits: (N, Q, H, W) [logits masques (souvent à interpréter en sigmoid)]
Approche naïve :
1) On transforme class_queries_logits en probabilités par softmax sur la dimension 'classe' (C).
2) On applique une sigmoïde sur masks_queries_logits pour obtenir p(query=1) par pixel.
3) On effectue un produit de chacun de ces masques par la proba de sa classe,
puis on somme sur la dimension 'Q' pour obtenir un tenseur (N, C, H, W).
4) On laisse ce tenseur en l'état (non normalisé) pour que CrossEntropyLoss effectue
son propre softmax. On l'appelle 'aggregated_logits'.
Résultat :
aggregated_logits.shape == (N, C, H, W),
que vous pourrez envoyer dans F.cross_entropy(aggregated_logits, targets).
"""
# 1) Softmax sur la dimension 'classe' => shape (N, Q, C)
class_probs = F.softmax(class_queries_logits, dim=2)
# 2) Sigmoïde sur la dimension 'pixel' => shape (N, Q, H, W)
mask_probs = torch.sigmoid(masks_queries_logits)
# 3) Produit puis somme : on fait un Einstein summation ou un broadcasting
# aggregated[b, c, h, w] = sum_q( class_probs[b,q,c] * mask_probs[b,q,h,w] )
aggregated = torch.einsum('bqc, bqhw -> bchw', class_probs, mask_probs)
# Ici, aggregated est un "score" par classe et par pixel, non normalisé.
# CrossEntropyLoss attend un tenseur (N, C, H, W) de logits,
# puis fait un log_softmax interne. aggregated étant positif, on peut
# éventuellement l'écraser un peu. Mais on le laisse tel quel.
return aggregated
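# Runnable shape check for maskformer_aggregator on random tensors
# (illustrative only: N=2 images, Q=5 queries, C=8 classes, 16x16 masks):
def _demo_aggregator():
    class_q = torch.randn(2, 5, 8)       # (N, Q, C)
    masks_q = torch.randn(2, 5, 16, 16)  # (N, Q, H, W)
    out = maskformer_aggregator(class_q, masks_q)
    assert out.shape == (2, 8, 16, 16)   # (N, C, H, W)
    return out.shape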
def training_for_maskformer(
model,
train_loader,
val_loader,
model_name="maskformer",
epochs=10,
lr=1e-4,
num_classes=8
):
import torch
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_sched
from torch.cuda.amp import autocast, GradScaler
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import os
import time
import torch.nn.functional as F
    # maskformer_aggregator is defined above in this module
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
    # Metrics
def compute_batch_metrics(pred_logits, target, nb_classes):
pred = torch.argmax(pred_logits, dim=1)
correct = (pred == target).sum().item()
total = target.numel()
accuracy = correct / total
dice_list = []
iou_list = []
for c in range(nb_classes):
pred_c = (pred == c)
target_c = (target == c)
inter = (pred_c & target_c).sum().item()
pred_area = pred_c.sum().item()
target_area = target_c.sum().item()
union = pred_area + target_area - inter
iou_c = 1.0 if union == 0 else inter / union
denom = pred_area + target_area
dice_c = 1.0 if denom == 0 else (2.0 * inter / denom)
dice_list.append(dice_c)
iou_list.append(iou_c)
mean_dice = sum(dice_list) / len(dice_list)
mean_iou = sum(iou_list) / len(iou_list)
return {"accuracy": accuracy, "dice": mean_dice, "iou": mean_iou}
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = lr_sched.ReduceLROnPlateau(optimizer, factor=0.5, patience=2, verbose=True)
scaler = GradScaler()
os.makedirs("../resultats_modeles", exist_ok=True)
log = {
"epoch": [],
"train_loss": [],
"val_loss": [],
"train_accuracy": [],
"train_dice_coef": [],
"train_mean_iou": [],
"val_accuracy": [],
"val_dice_coef": [],
"val_mean_iou": [],
"epoch_time_s": [],
"peak_gpu_mem_mb": []
}
start_time = time.time()
for epoch in range(epochs):
torch.cuda.reset_peak_memory_stats(device=device)
epoch_start = time.time()
# ---------------- TRAIN ----------------
model.train()
running_loss = 0.0
running_accuracy = 0.0
running_dice = 0.0
running_iou = 0.0
for images, masks in tqdm(train_loader, desc=f"[Epoch {epoch+1}/{epochs}] Train"):
images, masks = images.to(device), masks.to(device)
optimizer.zero_grad()
with autocast():
outputs = model(images)
                # outputs is a MaskFormerForInstanceSegmentationOutput
                class_queries = outputs.class_queries_logits  # (N, Q, num_labels)
                masks_queries = outputs.masks_queries_logits  # (N, Q, h, w)
                # Upsample the masks to match the ground-truth size
masks_queries = F.interpolate(
masks_queries,
size=(masks.shape[-2], masks.shape[-1]),
mode='bilinear',
align_corners=False
)
                # Aggregate into an (N, C, H, W) tensor
aggregated_logits = maskformer_aggregator(class_queries, masks_queries)
loss = criterion(aggregated_logits, masks)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
running_loss += loss.item()
            # Metrics
metrics_batch = compute_batch_metrics(aggregated_logits, masks, num_classes)
running_accuracy += metrics_batch["accuracy"]
running_dice += metrics_batch["dice"]
running_iou += metrics_batch["iou"]
avg_train_loss = running_loss / len(train_loader)
avg_train_accuracy = running_accuracy / len(train_loader)
avg_train_dice = running_dice / len(train_loader)
avg_train_iou = running_iou / len(train_loader)
# ---------------- VAL ----------------
model.eval()
val_running_loss = 0.0
val_running_accuracy = 0.0
val_running_dice = 0.0
val_running_iou = 0.0
with torch.no_grad():
for images, masks in tqdm(val_loader, desc=f"[Epoch {epoch+1}/{epochs}] Val"):
images, masks = images.to(device), masks.to(device)
with autocast():
outputs = model(images)
class_queries = outputs.class_queries_logits
masks_queries = outputs.masks_queries_logits
masks_queries = F.interpolate(
masks_queries,
size=(masks.shape[-2], masks.shape[-1]),
mode='bilinear',
align_corners=False
)
aggregated_logits = maskformer_aggregator(class_queries, masks_queries)
loss_val = criterion(aggregated_logits, masks)
val_running_loss += loss_val.item()
val_metrics = compute_batch_metrics(aggregated_logits, masks, num_classes)
val_running_accuracy += val_metrics["accuracy"]
val_running_dice += val_metrics["dice"]
val_running_iou += val_metrics["iou"]
avg_val_loss = val_running_loss / len(val_loader)
avg_val_accuracy = val_running_accuracy / len(val_loader)
avg_val_dice = val_running_dice / len(val_loader)
avg_val_iou = val_running_iou / len(val_loader)
scheduler.step(avg_val_loss)
epoch_time = time.time() - epoch_start
peak_mem = torch.cuda.max_memory_allocated(device=device) / (1024 ** 2)
log["epoch"].append(epoch + 1)
log["train_loss"].append(avg_train_loss)
log["val_loss"].append(avg_val_loss)
log["train_accuracy"].append(avg_train_accuracy)
log["train_dice_coef"].append(avg_train_dice)
log["train_mean_iou"].append(avg_train_iou)
log["val_accuracy"].append(avg_val_accuracy)
log["val_dice_coef"].append(avg_val_dice)
log["val_mean_iou"].append(avg_val_iou)
log["epoch_time_s"].append(epoch_time)
log["peak_gpu_mem_mb"].append(peak_mem)
print(
f"Epoch {epoch+1} | "
f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | "
f"Train Dice: {avg_train_dice:.4f} | Val Dice: {avg_val_dice:.4f} | "
f"Train IoU: {avg_train_iou:.4f} | Val IoU: {avg_val_iou:.4f} | "
f"Time: {epoch_time:.1f}s | GPU: {peak_mem:.1f} MB"
)
total_time = time.time() - start_time
df = pd.DataFrame(log)
df["temps_total_sec"] = total_time
csv_path = f"../resultats_modeles/{model_name}_log.csv"
df.to_csv(csv_path, index=False)
    # Save the model weights
    torch.save(model.state_dict(), f"../resultats_modeles/{model_name}.pth")
    # Generate a Dice/IoU plot
plt.figure(figsize=(12, 5))
# Plot Dice
plt.subplot(1, 2, 1)
plt.plot(df["epoch"], df["train_dice_coef"], label="Train Dice", color="blue")
plt.plot(df["epoch"], df["val_dice_coef"], label="Val Dice", color="orange")
plt.title("Dice Coefficient")
plt.xlabel("Epoch")
plt.ylabel("Dice")
plt.legend()
plt.grid(True)
# Plot IoU
plt.subplot(1, 2, 2)
plt.plot(df["epoch"], df["train_mean_iou"], label="Train IoU", color="blue")
plt.plot(df["epoch"], df["val_mean_iou"], label="Val IoU", color="orange")
plt.title("Mean IoU")
plt.xlabel("Epoch")
plt.ylabel("IoU")
plt.legend()
plt.grid(True)
plt.tight_layout()
png_path = f"../resultats_modeles/{model_name}_dice_iou.png"
plt.savefig(png_path, dpi=100)
plt.close()
print(f"✅ Entraînement {model_name} terminé en {total_time:.1f} secondes.")
print(f"📁 Logs : {csv_path}")
print(f"📁 Modèle : ../resultats_modeles/{model_name}.pth")
print(f"📊 Graphique Dice/IoU sauvegardé : {png_path}")
def training_for_mask2former(
model,
train_loader,
val_loader,
model_name="mask2former",
epochs=10,
lr=1e-4,
num_classes=8
):
import torch
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_sched
from torch.cuda.amp import autocast, GradScaler
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import os
import time
import torch.nn.functional as F
    # maskformer_aggregator is defined above in this module
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
def compute_batch_metrics(pred_logits, target, nb_classes):
pred = torch.argmax(pred_logits, dim=1)
correct = (pred == target).sum().item()
total = target.numel()
accuracy = correct / total
dice_list = []
iou_list = []
for c in range(nb_classes):
pred_c = (pred == c)
target_c = (target == c)
inter = (pred_c & target_c).sum().item()
pred_area = pred_c.sum().item()
target_area = target_c.sum().item()
union = pred_area + target_area - inter
iou_c = 1.0 if union == 0 else inter / union
denom = pred_area + target_area
dice_c = 1.0 if denom == 0 else (2.0 * inter / denom)
dice_list.append(dice_c)
iou_list.append(iou_c)
mean_dice = sum(dice_list) / len(dice_list)
mean_iou = sum(iou_list) / len(iou_list)
return {"accuracy": accuracy, "dice": mean_dice, "iou": mean_iou}
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = lr_sched.ReduceLROnPlateau(optimizer, factor=0.5, patience=2, verbose=True)
scaler = GradScaler()
os.makedirs("../resultats_modeles", exist_ok=True)
log = {
"epoch": [],
"train_loss": [],
"val_loss": [],
"train_accuracy": [],
"train_dice_coef": [],
"train_mean_iou": [],
"val_accuracy": [],
"val_dice_coef": [],
"val_mean_iou": [],
"epoch_time_s": [],
"peak_gpu_mem_mb": []
}
start_time = time.time()
for epoch in range(epochs):
torch.cuda.reset_peak_memory_stats(device=device)
epoch_start = time.time()
# ---------------- TRAIN ----------------
model.train()
running_loss = 0.0
running_accuracy = 0.0
running_dice = 0.0
running_iou = 0.0
for images, masks in tqdm(train_loader, desc=f"[Epoch {epoch+1}/{epochs}] Train"):
images, masks = images.to(device), masks.to(device)
optimizer.zero_grad()
with autocast():
outputs = model(images)
                # outputs is a Mask2FormerForUniversalSegmentationOutput
class_queries = outputs.class_queries_logits # (N, Q, num_labels)
masks_queries = outputs.masks_queries_logits # (N, Q, h, w)
masks_queries = F.interpolate(
masks_queries,
size=(masks.shape[-2], masks.shape[-1]),
mode='bilinear',
align_corners=False
)
aggregated_logits = maskformer_aggregator(class_queries, masks_queries)
loss = criterion(aggregated_logits, masks)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
running_loss += loss.item()
metrics_batch = compute_batch_metrics(aggregated_logits, masks, num_classes)
running_accuracy += metrics_batch["accuracy"]
running_dice += metrics_batch["dice"]
running_iou += metrics_batch["iou"]
avg_train_loss = running_loss / len(train_loader)
avg_train_accuracy = running_accuracy / len(train_loader)
avg_train_dice = running_dice / len(train_loader)
avg_train_iou = running_iou / len(train_loader)
# ---------------- VAL ----------------
model.eval()
val_running_loss = 0.0
val_running_accuracy = 0.0
val_running_dice = 0.0
val_running_iou = 0.0
with torch.no_grad():
for images, masks in tqdm(val_loader, desc=f"[Epoch {epoch+1}/{epochs}] Val"):
images, masks = images.to(device), masks.to(device)
with autocast():
outputs = model(images)
class_queries = outputs.class_queries_logits
masks_queries = outputs.masks_queries_logits
masks_queries = F.interpolate(
masks_queries,
size=(masks.shape[-2], masks.shape[-1]),
mode='bilinear',
align_corners=False
)
aggregated_logits = maskformer_aggregator(class_queries, masks_queries)
loss_val = criterion(aggregated_logits, masks)
val_running_loss += loss_val.item()
val_metrics = compute_batch_metrics(aggregated_logits, masks, num_classes)
val_running_accuracy += val_metrics["accuracy"]
val_running_dice += val_metrics["dice"]
val_running_iou += val_metrics["iou"]
avg_val_loss = val_running_loss / len(val_loader)
avg_val_accuracy = val_running_accuracy / len(val_loader)
avg_val_dice = val_running_dice / len(val_loader)
avg_val_iou = val_running_iou / len(val_loader)
scheduler.step(avg_val_loss)
epoch_time = time.time() - epoch_start
peak_mem = torch.cuda.max_memory_allocated(device=device) / (1024 ** 2)
log["epoch"].append(epoch + 1)
log["train_loss"].append(avg_train_loss)
log["val_loss"].append(avg_val_loss)
log["train_accuracy"].append(avg_train_accuracy)
log["train_dice_coef"].append(avg_train_dice)
log["train_mean_iou"].append(avg_train_iou)
log["val_accuracy"].append(avg_val_accuracy)
log["val_dice_coef"].append(avg_val_dice)
log["val_mean_iou"].append(avg_val_iou)
log["epoch_time_s"].append(epoch_time)
log["peak_gpu_mem_mb"].append(peak_mem)
print(
f"Epoch {epoch+1} | "
f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | "
f"Train Dice: {avg_train_dice:.4f} | Val Dice: {avg_val_dice:.4f} | "
f"Train IoU: {avg_train_iou:.4f} | Val IoU: {avg_val_iou:.4f} | "
f"Time: {epoch_time:.1f}s | GPU: {peak_mem:.1f} MB"
)
total_time = time.time() - start_time
df = pd.DataFrame(log)
df["temps_total_sec"] = total_time
csv_path = f"../resultats_modeles/{model_name}_log.csv"
df.to_csv(csv_path, index=False)
torch.save(model.state_dict(), f"../resultats_modeles/{model_name}.pth")
    # Dice/IoU curves
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(df["epoch"], df["train_dice_coef"], label="Train Dice", color="blue")
plt.plot(df["epoch"], df["val_dice_coef"], label="Val Dice", color="orange")
plt.title("Dice Coefficient")
plt.xlabel("Epoch")
plt.ylabel("Dice")
plt.legend()
plt.grid(True)
plt.subplot(1, 2, 2)
plt.plot(df["epoch"], df["train_mean_iou"], label="Train IoU", color="blue")
plt.plot(df["epoch"], df["val_mean_iou"], label="Val IoU", color="orange")
plt.title("Mean IoU")
plt.xlabel("Epoch")
plt.ylabel("IoU")
plt.legend()
plt.grid(True)
plt.tight_layout()
png_path = f"../resultats_modeles/{model_name}_dice_iou.png"
plt.savefig(png_path, dpi=100)
plt.close()
print(f"✅ Entraînement {model_name} terminé en {total_time:.1f} secondes.")
print(f"📁 Logs : {csv_path}")
print(f"📁 Modèle : ../resultats_modeles/{model_name}.pth")
print(f"📊 Graphique Dice/IoU sauvegardé : {png_path}")
def show_predictions_maskformer(
model,
dataset,
num_images=3,
num_classes=8
):
"""
Affiche quelques prédictions vs masques réels depuis un dataset PyTorch,
pour un modèle MaskFormer-like (avec class_queries_logits et masks_queries_logits).
1) On récupère `class_queries_logits` et `masks_queries_logits`.
2) On upsample le masks_queries_logits à la taille du masque target.
3) On agrège via maskformer_aggregator pour obtenir un tenseur (N, C, H, W).
4) On calcule un argmax (H, W) pour l'affichage.
"""
import torch
import matplotlib.pyplot as plt
import numpy as np
from torch.cuda.amp import autocast
import torch.nn.functional as F
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.eval().to(device)
    # maskformer_aggregator (which combines class_queries_logits and
    # masks_queries_logits) is defined above in this module
fig, axes = plt.subplots(num_images, 3, figsize=(12, 4 * num_images))
for i in range(num_images):
idx = np.random.randint(0, len(dataset))
image, mask_gt = dataset[idx] # (3, H, W), (H, W)
image_t = image.unsqueeze(0).to(device) # (1, 3, H, W)
mask_gt_np = mask_gt.numpy() # (H, W)
with torch.no_grad(), autocast():
outputs = model(image_t)
            # Retrieve the logits
            class_queries = outputs.class_queries_logits  # (1, Q, num_labels)
            masks_queries = outputs.masks_queries_logits  # (1, Q, h, w)
            # Upsample masks_queries to the GT mask size
masks_queries = F.interpolate(
masks_queries,
size=(mask_gt_np.shape[0], mask_gt_np.shape[1]),
mode='bilinear',
align_corners=False
)
            # Aggregation => (1, C, H, W)
aggregated_logits = maskformer_aggregator(class_queries, masks_queries)
# Argmax => (H, W)
pred = torch.argmax(aggregated_logits, dim=1).squeeze(0).cpu().numpy()
        # Display
        if num_images == 1:
            # Only 1 image => axes is a 1D array [3 subplots]
ax_img, ax_gt, ax_pred = axes
else:
ax_img, ax_gt, ax_pred = axes[i]
ax_img.imshow(image.permute(1, 2, 0).cpu().numpy())
ax_img.set_title("Image")
ax_img.axis("off")
ax_gt.imshow(mask_gt_np, cmap="tab10", vmin=0, vmax=num_classes-1)
ax_gt.set_title("Masque GT")
ax_gt.axis("off")
ax_pred.imshow(pred, cmap="tab10", vmin=0, vmax=num_classes-1)
ax_pred.set_title("Masque Prédit")
ax_pred.axis("off")
plt.tight_layout()
plt.show()
def comparer_modeles(list_csv_files, model_names=None):
"""
Compare plusieurs modèles sur les métriques d'entraînement (loss, dice, iou, accuracy)
et affiche un bar chart du temps total.
Args:
list_csv_files (list): liste des chemins vers les fichiers CSV de logs.
model_names (list): noms courts à afficher en légende. Doit être de même taille que list_csv_files.
Si None, on utilise le nom de fichier.
"""
import os
import pandas as pd
import matplotlib.pyplot as plt
if model_names is None:
model_names = [os.path.splitext(os.path.basename(csv_file))[0] for csv_file in list_csv_files]
# On charge chaque CSV dans un DataFrame, qu'on stocke dans un dict
model_data = {}
for csv_file, name in zip(list_csv_files, model_names):
df = pd.read_csv(csv_file)
model_data[name] = df
# Couleurs prédéfinies pour la cohérence
color_list = ["red", "blue", "green", "purple", "orange", "black"]
# Création de la figure : 3 lignes, 2 colonnes → 5 subplots (le dernier occupant une ligne entière)
fig = plt.figure(figsize=(14, 14))
# -- SUBPLOT 1 : Loss (en haut à gauche) --
ax1 = plt.subplot2grid((3, 2), (0, 0))
ax1.set_title("Comparaison des Loss (Perte)")
ax1.set_xlabel("Epochs")
ax1.set_ylabel("Loss")
for i, (name, df) in enumerate(model_data.items()):
c = color_list[i % len(color_list)]
if "train_loss" in df.columns and "val_loss" in df.columns:
ax1.plot(df["epoch"], df["train_loss"], label=f"{name} Train Loss", color=c, linestyle="--")
ax1.plot(df["epoch"], df["val_loss"], label=f"{name} Val Loss", color=c, linestyle="-")
ax1.grid(True)
ax1.legend()
    # -- SUBPLOT 2: Accuracy (top right) --
    ax2 = plt.subplot2grid((3, 2), (0, 1))
    ax2.set_title("Accuracy comparison")
ax2.set_xlabel("Epochs")
ax2.set_ylabel("Accuracy")
for i, (name, df) in enumerate(model_data.items()):
c = color_list[i % len(color_list)]
if "train_accuracy" in df.columns and "val_accuracy" in df.columns:
ax2.plot(df["epoch"], df["train_accuracy"], label=f"{name} Train Acc", color=c, linestyle="--")
ax2.plot(df["epoch"], df["val_accuracy"], label=f"{name} Val Acc", color=c, linestyle="-")
ax2.grid(True)
ax2.legend()
    # -- SUBPLOT 3: Dice (bottom left) --
    ax3 = plt.subplot2grid((3, 2), (1, 0))
    ax3.set_title("Dice coefficient comparison")
ax3.set_xlabel("Epochs")
ax3.set_ylabel("Dice Coefficient")
for i, (name, df) in enumerate(model_data.items()):
c = color_list[i % len(color_list)]
if "train_dice_coef" in df.columns and "val_dice_coef" in df.columns:
ax3.plot(df["epoch"], df["train_dice_coef"], label=f"{name} Train Dice", color=c, linestyle="--")
ax3.plot(df["epoch"], df["val_dice_coef"], label=f"{name} Val Dice", color=c, linestyle="-")
ax3.grid(True)
ax3.legend()
    # -- SUBPLOT 4: Mean IoU (bottom right) --
    ax4 = plt.subplot2grid((3, 2), (1, 1))
    ax4.set_title("Mean IoU comparison")
ax4.set_xlabel("Epochs")
ax4.set_ylabel("Mean IoU")
for i, (name, df) in enumerate(model_data.items()):
c = color_list[i % len(color_list)]
if "train_mean_iou" in df.columns and "val_mean_iou" in df.columns:
ax4.plot(df["epoch"], df["train_mean_iou"], label=f"{name} Train IoU", color=c, linestyle="--")
ax4.plot(df["epoch"], df["val_mean_iou"], label=f"{name} Val IoU", color=c, linestyle="-")
ax4.grid(True)
ax4.legend()
    # -- SUBPLOT 5: Total time (bar chart) --
    ax5 = plt.subplot2grid((3, 2), (2, 0), colspan=2)
    ax5.set_title("Total training time comparison (minutes)")
training_times = []
for i, (name, df) in enumerate(model_data.items()):
if "temps_total_sec" in df.columns:
total_time_sec = df["temps_total_sec"].iloc[-1]
total_time_min = total_time_sec / 60
else:
total_time_min = 0
training_times.append((name, total_time_min))
x_labels = [t[0] for t in training_times]
y_values = [t[1] for t in training_times]
bars = ax5.bar(x_labels, y_values, color=color_list[:len(y_values)])
for bar in bars:
height = bar.get_height()
ax5.text(bar.get_x() + bar.get_width() / 2, height + 0.1, f"{height:.2f} min",
ha='center', va='bottom')
ax5.set_ylabel("Temps (minutes)")
ax5.grid(True, axis='y')
plt.tight_layout()
plt.show()
# ------------------------------------------------------------------
# FUNCTIONS TO SIMULATE RAIN AND COMPARE PREDICTIONS
# ------------------------------------------------------------------
from PIL import Image
# Global transform (rain effect)
rain_transform = A.Compose([
A.RandomRain(
brightness_coefficient=0.9,
drop_length=20,
drop_width=1,
blur_value=3,
rain_type='heavy'
)
])
def apply_rain_effect(image_pil: Image.Image) -> Image.Image:
"""
Applique l'effet de pluie à une image PIL et renvoie une nouvelle image PIL.
"""
# Convertir en NumPy
image_np = np.array(image_pil)
# Appliquer la transformation Albumentations
augmented = rain_transform(image=image_np)
rain_np = augmented['image']
# Reconvertir en PIL
rain_pil = Image.fromarray(rain_np)
return rain_pil
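# Runnable sketch: apply the rain effect to a random RGB image
# (illustrative only; RandomRain operates on uint8 RGB arrays under the hood):
def _demo_rain():
    noise = np.random.randint(0, 255, size=(128, 128, 3), dtype=np.uint8)
    rainy = apply_rain_effect(Image.fromarray(noise))
    assert rainy.size == (128, 128)
    return rainy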
def predict_mask(model, image_pil, device="cpu", num_classes=8):
"""
Utilise 'model' (PyTorch) pour prédire le masque de l'image PIL.
Retourne un array NumPy (H,W) avec les classes prédites [0..7].
"""
# Conversion PIL -> Tensor
transform = transforms.ToTensor() # [0..1], shape (3,H,W)
image_tensor = transform(image_pil).unsqueeze(0).to(device)
model.eval()
with torch.no_grad():
outputs = model(image_tensor)
# Ex.: si c’est un SegFormer, on accède à outputs.logits
if hasattr(outputs, "logits"):
logits = outputs.logits
elif isinstance(outputs, dict):
logits = outputs["out"]
else:
logits = outputs
# Upsample => taille de l'image originale
_, _, h_img, w_img = image_tensor.shape
logits = F.interpolate(
logits,
size=(h_img, w_img),
mode='bilinear',
align_corners=False
)
# argmax => (H,W)
pred_mask = logits.argmax(dim=1).squeeze(0).cpu().numpy()
return pred_mask
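# Hedged usage sketch for predict_mask: assumes the SegFormer loader above and a
# hypothetical image path; calling this downloads pretrained weights.
def _demo_predict_mask(image_path="exemple.png"):
    model = charger_segformer()
    image = Image.open(image_path).convert("RGB").resize((256, 256))
    mask = predict_mask(model, image, device="cpu")  # (256, 256) array of classes [0..7]
    return mask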
def compare_rain_predictions(
baseline_model,
new_model,
image_path,
device="cpu",
size=(256,256)
):
"""
1) Charge l'image d'origine.
2) Redimensionne en (size), applique la pluie.
3) Fait prédire le masque par baseline_model et new_model.
4) Retourne un fig (matplotlib) avec 4 colonnes :
- image originale
- image "pluie"
- masque baseline
- masque new model
"""
# 1) Charger et redimensionner l'image
pil_image = Image.open(image_path).convert("RGB").resize(size)
# 2) Appliquer la pluie
rain_pil = apply_rain_effect(pil_image)
# 3) Prédictions
mask_old = predict_mask(baseline_model, rain_pil, device=device)
mask_new = predict_mask(new_model, rain_pil, device=device)
# 4) Préparer l'affichage
fig, axs = plt.subplots(1, 4, figsize=(16, 5))
axs[0].imshow(np.array(pil_image))
axs[0].set_title("Original")
axs[1].imshow(np.array(rain_pil))
axs[1].set_title("Pluie")
axs[2].imshow(mask_old, cmap="magma", vmin=0, vmax=7)
axs[2].set_title("Masque (baseline)")
axs[3].imshow(mask_new, cmap="magma", vmin=0, vmax=7)
axs[3].set_title("Masque (nouveau)")
for ax in axs:
ax.axis("off")
plt.tight_layout()
return fig