Alex Hortua
Creating a faster version with a different approach (training with a frozen, COCO-pretrained backbone)
b87aa54
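
The change relies on torchvision's `trainable_backbone_layers` argument (read from config.yaml in the script below) rather than the commented-out manual freeze. A minimal sketch of the two options, assuming torchvision >= 0.13; `frozen_model` is an illustrative name, not part of this repo:

    import torchvision

    # Option 1: let torchvision freeze the ResNet stages.
    # trainable_backbone_layers=0 freezes all five stages (5 = fully trainable);
    # the FPN, RPN, and ROI heads still receive gradients.
    frozen_model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
        weights="DEFAULT",  # COCO-pretrained weights
        trainable_backbone_layers=0,
    )

    # Option 2: freeze the backbone by hand. Note this also freezes the FPN,
    # which is part of model.backbone, so it is stricter than Option 1.
    for param in frozen_model.backbone.parameters():
        param.requires_grad = False

Either way, the replacement box predictor and the detection heads remain trainable, so far fewer parameters receive gradients, which is presumably what makes this variant faster to train than full fine-tuning.
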
import os
import torch
import torchvision
import json
import yaml
from tqdm import tqdm
from torch.utils.data import DataLoader, random_split
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.transforms import functional as F

from dataset import LegoDataset
from evaluate import calculate_map
# Load configuration
with open("config.yaml", "r") as f:
    config = yaml.safe_load(f)

# Load pretrained model (COCO weights); the number of trainable backbone layers comes from config
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
    weights="DEFAULT",
    trainable_backbone_layers=config["model"]["trainable_backbone_layers"],
)

# # Freeze backbone manually (superseded by trainable_backbone_layers above)
# for param in model.backbone.parameters():
#     param.requires_grad = False

# Replace the box predictor head with one sized for the LEGO classes
num_classes = config["model"]["num_classes"]
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
# Training setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=config["model"]["learning_rate"])

# Load dataset and split it into training and validation sets
image_dir = config["dataset"]["image_dir"]
annotation_dir = config["dataset"]["annotation_dir"]
dataset = LegoDataset(image_dir, annotation_dir)
train_size = int(config["dataset"]["train_split"] * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Detection targets vary in size, so collate batches as tuples of lists
train_loader = DataLoader(train_dataset, batch_size=config["model"]["batch_size"], shuffle=True, collate_fn=lambda x: tuple(zip(*x)))
val_loader = DataLoader(val_dataset, batch_size=config["model"]["batch_size"], shuffle=False, collate_fn=lambda x: tuple(zip(*x)))
# Logging function (appends messages to the log file)
def log_message(message):
    with open(config["logs"]["log_dir"], "a") as f:
        f.write(message + "\n")

# Training function with tqdm progress bar
def train_one_epoch(model, optimizer, data_loader, device):
    model.train()
    running_loss = 0.0
    for batch_idx, (images, targets) in enumerate(tqdm(data_loader, desc="Training Progress")):
        images = [F.to_tensor(img).to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # Forward pass in training mode returns a dict of losses
        loss_dict = model(images, targets)
        loss = sum(loss for loss in loss_dict.values())

        if batch_idx % 100 == 0:
            log_message(f"Iteration {batch_idx + 1}, Loss: {loss.item()}")

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    return running_loss / len(data_loader)
def store_log(loss, mAP):
    # Save log in JSON for visualization
    log_json = config["notebooks"]["visualization"]
    if not os.path.exists(log_json):
        log_data = {"loss": [], "mAP": []}
    else:
        with open(log_json, "r") as f:
            log_data = json.load(f)

    log_data["loss"].append(loss)
    log_data["mAP"].append(mAP)

    with open(log_json, "w") as f:
        json.dump(log_data, f, indent=4)
# Train model with logging
num_epochs = config["model"]["epochs"]
os.makedirs("models/records", exist_ok=True)  # Ensure model and per-epoch checkpoint directories exist

for epoch in range(num_epochs):
    log_message(f"Starting Epoch {epoch + 1}/{num_epochs}")
    loss = train_one_epoch(model, optimizer, train_loader, device)

    # Evaluate mAP after each epoch
    mAP = calculate_map(model, val_loader, device)

    # Save a checkpoint for this epoch
    torch.save(model.state_dict(), f"models/records/lego_fasterrcnn_{epoch + 1}.pth")

    # Store loss/mAP for this epoch
    store_log(loss, mAP)

# Save the trained model
torch.save(model.state_dict(), "models/lego_fasterrcnn.pth")
log_message("Model saved as models/lego_fasterrcnn.pth")