# Page-header residue from the hosting site ("Spaces: Running on Zero"); not part of the program.
import glob
import os
import warnings

# Installed ahead of the third-party imports (as in the original ordering) so
# the filter is already active while those packages are being imported.
warnings.filterwarnings("ignore")

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from thop import profile
from torchvision import models, transforms
from torchvision.io import ImageReadMode, read_image
# Module-wide switch read by get_activation_map and process_video_frame:
# when True the hooked layer is profiled with thop for FLOPs/parameter counts;
# when False those values come back as None (per layer) and 0 (per frame).
is_flop_cal = False
# Capture a layer's activations (and its inputs) with a forward hook.
def get_activation(model, layer, input_img_data):
    """Run one forward pass and record what ``layer`` receives and produces.

    Args:
        model: Module to execute. It is switched to eval mode as a side
            effect and is left in eval mode afterwards.
        layer: Sub-module of ``model`` to observe through a forward hook.
        input_img_data: Input handed to ``model`` unchanged.

    Returns:
        A tuple ``(activations, inputs)`` of two lists with one entry per
        invocation of ``layer`` during the pass: the layer's output and the
        first positional input of that invocation.

    Raises:
        Whatever the forward pass raises; the hook is removed first.
    """
    model.eval()
    activations = []
    inputs = []

    def hook(module, layer_inputs, output):
        activations.append(output)
        # Forward hooks receive the positional inputs as a tuple.
        inputs.append(layer_inputs[0])

    hook_handle = layer.register_forward_hook(hook)
    try:
        with torch.no_grad():
            model(input_img_data)
    finally:
        # Always detach the hook: a failed forward pass must not leave it
        # registered, or later passes would keep appending to these lists.
        hook_handle.remove()
    return activations, inputs
def _resolve_layer(model, layer_name):
    """Look up a sub-module of ``model`` by path without evaluating code."""
    # Accept both 'layer4.2.conv2' and the expression-style spelling
    # 'resnet50.layer4[2].conv2' by normalising the latter to a dotted path.
    path = layer_name.replace('[', '.').replace(']', '')
    prefix = 'resnet50.'
    if path.startswith(prefix):
        path = path[len(prefix):]
    return model.get_submodule(path)


def get_activation_map(frame, layer_name, resnet50, device):
    """Return the feature map one layer produces for a single frame.

    Args:
        frame: Image tensor accepted by the Resize/Normalize transforms. A
            3-D tensor gets a leading batch dimension added.
            NOTE(review): the ImageNet mean/std suggest a float RGB tensor
            scaled to [0, 1] -- confirm against the caller.
        layer_name: Path of the layer inside ``resnet50``, e.g.
            ``'layer4.2.conv2'``.
        resnet50: Model to run the frame through.
        device: Device the frame is moved to before the forward pass so it
            matches the model; ``None`` leaves the frame where it is.

    Returns:
        ``(activated_img, activation_array, flops, params)`` where
        ``activated_img`` is the first recorded output of the layer for the
        first batch item, ``activation_array`` is the same data as a NumPy
        array, and ``flops``/``params`` are the values reported by
        ``thop.profile`` when ``is_flop_cal`` is set, otherwise ``None``.

    Raises:
        AttributeError: If ``layer_name`` does not name a sub-module.
    """
    # image pre-processing
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    frame_tensor = transform(frame)

    # adding index 0 changes the original [C, H, W] shape to [1, C, H, W]
    if frame_tensor.dim() == 3:
        frame_tensor = frame_tensor.unsqueeze(0)
    # The input has to live on the same device as the model weights.
    if device is not None:
        frame_tensor = frame_tensor.to(device)

    # getting the activation of a given layer
    layer_obj = _resolve_layer(resnet50, layer_name)
    activations, inputs = get_activation(resnet50, layer_obj, frame_tensor)
    activated_img = activations[0][0]
    activation_array = activated_img.cpu().numpy()

    # calculate FLOPs for layer
    flops, params = None, None
    if is_flop_cal:
        flops, params = profile(layer_obj, inputs=(inputs[0],), verbose=False)
        if params == 0 and isinstance(layer_obj, torch.nn.Conv2d):
            # The profiler reported no parameters for a convolution: count
            # the weight (and bias) elements directly, which also stays
            # correct for grouped convolutions.
            params = layer_obj.weight.numel()
            if layer_obj.bias is not None:
                params += layer_obj.bias.numel()
    return activated_img, activation_array, flops, params
def process_video_frame(video_name, frame, frame_number, layer_name, resnet50, device):
    """Extract one layer's activation for a frame and name its feature file.

    Args:
        video_name: Identifier used in the feature-file path.
        frame: Frame passed through to ``get_activation_map``.
        frame_number: Frame counter used in the feature-file path.
        layer_name: Layer whose activation is extracted.
        resnet50: Model passed through to ``get_activation_map``.
        device: Device passed through to ``get_activation_map``.

    Returns:
        ``(activated_img, frame_npy_path, total_flops, total_params)``. The
        path is only constructed here, nothing is written to disk. The
        totals are 0 unless ``is_flop_cal`` is enabled.
    """
    activated_img, _, layer_flops, layer_params = get_activation_map(
        frame, layer_name, resnet50, device
    )

    # A single layer is processed per call, so the totals are either that
    # layer's numbers or zero when profiling is switched off.
    if is_flop_cal:
        total_flops, total_params = layer_flops, layer_params
    else:
        total_flops, total_params = 0, 0

    # Per-layer PNG / NPY export (get_activation_png, get_activation_npy) is
    # intentionally not performed here; only the combined feature path is
    # reported back to the caller.
    frame_npy_path = (
        f'../features/resnet50/{video_name}/'
        f'frame_{frame_number}_resnet50_feature_map.npy'
    )
    return activated_img, frame_npy_path, total_flops, total_params
def get_activation_png(png_path, fig_name, activated_img, n=8):
    """Save the leading channels of a feature map as an ``n`` x ``n`` grid.

    Args:
        png_path: Prefix the file name is appended to verbatim (no
            separator is inserted).
        fig_name: File name without the ``.png`` extension.
        activated_img: Indexable feature map whose items expose
            ``.cpu().numpy()``; at most the first ``n * n`` items are drawn.
        n: Number of rows and columns of the grid.
    """
    fig = plt.figure(figsize=(10, 10))

    # Walk the grid cells in row-major order and stop at the first cell
    # that has no channel left to show.
    cell_count = n * n if n > 0 else 0
    for channel in range(cell_count):
        if channel >= activated_img.shape[0]:
            break
        axis = fig.add_subplot(n, n, channel + 1)
        axis.imshow(activated_img[channel].cpu().numpy(), cmap='viridis')
        axis.axis('off')

    # save figures
    fig_path = f'{png_path}{fig_name}.png'
    print(fig_path)
    print("----------------\n")
    plt.savefig(fig_path)
    plt.close()
def get_activation_npy(npy_path, fig_name, activation_array):
    """Store ``activation_array`` at ``<npy_path><fig_name>.npy``.

    ``npy_path`` is used as a plain prefix, so it has to end with a path
    separator when it names a directory.
    """
    target = f'{npy_path}{fig_name}.npy'
    np.save(target, activation_array)
if __name__ == '__main__':
    # Script entry point: load a pre-trained ResNet-50, list its top-level
    # children, then run every sampled frame of every video named in the
    # metadata CSV through process_video_frame.
    device_name = "gpu"
    use_cuda = device_name == "gpu" and torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    print(f"Running on {'GPU' if device.type == 'cuda' else 'CPU'}")

    # pre-trained ResNet-50 model to device
    resnet50 = models.resnet50(pretrained=True).to(device)
    for idx, (name, layer) in enumerate(resnet50.named_children()):
        print(f"Index: {idx}, Layer Name: {name}, Layer Type: {type(layer)}")

    layer_name = 'layer4.2.conv2'
    video_type = 'test'
    # Test
    if video_type == 'test':
        metadata_path = "../../metadata/test_videos.csv"
    # NR:
    elif video_type == 'resolution_ugc':
        resolution = '360P'
        metadata_path = f"../../metadata/YOUTUBE_UGC_{resolution}_metadata.csv"
    else:
        metadata_path = f'../../metadata/{video_type.upper()}_metadata.csv'

    ugcdata = pd.read_csv(metadata_path)
    for video_name in ugcdata['vid']:
        sampled_frame_path = os.path.join('../..', 'video_sampled_frame', 'sampled_frame', f'{video_name}')
        print(f"Processing video: {video_name}")
        # glob returns files in arbitrary order; sort so the frame numbers
        # assigned below are reproducible between runs.
        image_paths = sorted(glob.glob(os.path.join(sampled_frame_path, f'{video_name}_*.png')))
        for frame_number, image in enumerate(image_paths, start=1):
            print(f"{image}")
            # The transforms downstream operate on float tensors, not on
            # file paths: decode the PNG to RGB and scale it to [0, 1].
            frame = read_image(image, mode=ImageReadMode.RGB).float() / 255.0
            frame = frame.to(device)
            process_video_frame(video_name, frame, frame_number, layer_name, resnet50, device)
# ResNet-50 layers to visualize
# layers_to_visualize_resnet50 = {
#     'conv1': 0,
#     'layer1.0.conv1': 2,
#     'layer1.0.conv2': 3,
#     'layer1.1.conv1': 5,
#     'layer1.1.conv2': 6,
#     'layer1.2.conv1': 8,
#     'layer1.2.conv2': 9,
#     'layer2.0.conv1': 11,
#     'layer2.0.conv2': 12,
#     'layer2.1.conv1': 14,
#     'layer2.1.conv2': 15,
#     'layer2.2.conv1': 17,
#     'layer2.2.conv2': 18,
#     'layer2.3.conv1': 20,
#     'layer2.3.conv2': 21,
#     'layer3.0.conv1': 23,
#     'layer3.0.conv2': 24,
#     'layer3.0.downsample.0': 25,
#     'layer3.1.conv1': 27,
#     'layer3.1.conv2': 28,
#     'layer3.2.conv1': 30,
#     'layer3.2.conv2': 31,
#     'layer3.3.conv1': 33,
#     'layer3.3.conv2': 34,
#     'layer4.0.conv1': 36,
#     'layer4.0.conv2': 37,
#     'layer4.0.downsample.0': 38,
#     'layer4.1.conv1': 40,
#     'layer4.1.conv2': 41,
#     'layer4.2.conv1': 43,
#     'layer4.2.conv2': 44,
# }
# Top-level children of the model, in the format printed by the
# named_children() loop in the __main__ section:
# Index: 0, Layer Name: conv1, Layer Type: <class 'torch.nn.modules.conv.Conv2d'>
# Index: 1, Layer Name: bn1, Layer Type: <class 'torch.nn.modules.batchnorm.BatchNorm2d'>
# Index: 2, Layer Name: relu, Layer Type: <class 'torch.nn.modules.activation.ReLU'>
# Index: 3, Layer Name: maxpool, Layer Type: <class 'torch.nn.modules.pooling.MaxPool2d'>
# Index: 4, Layer Name: layer1, Layer Type: <class 'torch.nn.modules.container.Sequential'>
# Index: 5, Layer Name: layer2, Layer Type: <class 'torch.nn.modules.container.Sequential'>
# Index: 6, Layer Name: layer3, Layer Type: <class 'torch.nn.modules.container.Sequential'>
# Index: 7, Layer Name: layer4, Layer Type: <class 'torch.nn.modules.container.Sequential'>
# Index: 8, Layer Name: avgpool, Layer Type: <class 'torch.nn.modules.pooling.AdaptiveAvgPool2d'>
# Index: 9, Layer Name: fc, Layer Type: <class 'torch.nn.modules.linear.Linear'>