from __future__ import division import torch import torch.nn as nn import torch.nn.functional as F import numpy as np import cv2 import os import sys from util import convert2cpu as cpu from util import predict_transform class test_net(nn.Module): def __init__(self, num_layers, input_size): super(test_net, self).__init__() self.num_layers= num_layers self.linear_1 = nn.Linear(input_size, 5) self.middle = nn.ModuleList([nn.Linear(5,5) for x in range(num_layers)]) self.output = nn.Linear(5,2) def forward(self, x): x = x.view(-1) fwd = nn.Sequential(self.linear_1, *self.middle, self.output) return fwd(x) def get_test_input(): img = cv2.imread("dog-cycle-car.png") img = cv2.resize(img, (416, 416)) img_ = img[:, :, ::-1].transpose((2, 0, 1)) img_ = img_[np.newaxis, :, :, :]/255.0 img_ = torch.from_numpy(img_).float() return img_ def parse_cfg(cfgfile): """ Takes a configuration file Returns a list of blocks. Each blocks describes a block in the neural network to be built. Block is represented as a dictionary in the list """ # cfgfile = os.path.join(sys.path[-1], cfgfile) file = open(cfgfile, 'r') lines = file.read().split('\n') # store the lines in a list lines = [x for x in lines if len(x) > 0] # get read of the empty lines lines = [x for x in lines if x[0] != '#'] lines = [x.rstrip().lstrip() for x in lines] block = {} blocks = [] for line in lines: if line[0] == "[": # This marks the start of a new block if len(block) != 0: blocks.append(block) block = {} block["type"] = line[1:-1].rstrip() else: key,value = line.split("=") block[key.rstrip()] = value.lstrip() blocks.append(block) return blocks class MaxPoolStride1(nn.Module): def __init__(self, kernel_size): super(MaxPoolStride1, self).__init__() self.kernel_size = kernel_size self.pad = kernel_size - 1 def forward(self, x): padded_x = F.pad(x, (0, self.pad, 0, self.pad), mode="replicate") pooled_x = nn.MaxPool2d(self.kernel_size, self.pad)(padded_x) return pooled_x class EmptyLayer(nn.Module): def __init__(self): super(EmptyLayer, self).__init__() class DetectionLayer(nn.Module): def __init__(self, anchors): super(DetectionLayer, self).__init__() self.anchors = anchors def forward(self, x, inp_dim, num_classes, confidence): x = x.data global CUDA prediction = x prediction = predict_transform(prediction, inp_dim, self.anchors, num_classes, confidence, CUDA) return prediction class Upsample(nn.Module): def __init__(self, stride=2): super(Upsample, self).__init__() self.stride = stride def forward(self, x): stride = self.stride assert(x.data.dim() == 4) B = x.data.size(0) C = x.data.size(1) H = x.data.size(2) W = x.data.size(3) ws = stride hs = stride x = x.view(B, C, H, 1, W, 1).expand(B, C, H, stride, W, stride).contiguous().view(B, C, H*stride, W*stride) return x class ReOrgLayer(nn.Module): def __init__(self, stride=2): super(ReOrgLayer, self).__init__() self.stride= stride def forward(self, x): assert(x.data.dim() == 4) B, C, H, W = x.data.shape hs = self.stride ws = self.stride assert(H % hs == 0), "The stride " + str(self.stride) + " is not a proper divisor of height " + str(H) assert(W % ws == 0), "The stride " + str(self.stride) + " is not a proper divisor of height " + str(W) x = x.view(B, C, H // hs, hs, W // ws, ws).transpose(-2, -3).contiguous() x = x.view(B, C, H // hs * W // ws, hs, ws) x = x.view(B, C, H // hs * W // ws, hs*ws).transpose(-1, -2).contiguous() x = x.view(B, C, ws*hs, H // ws, W // ws).transpose(1, 2).contiguous() x = x.view(B, C*ws*hs, H // ws, W // ws) return x def create_modules(blocks): net_info = blocks[0] # Captures the information about the input and pre-processing module_list = nn.ModuleList() index = 0 # indexing blocks helps with implementing route layers (skip connections) prev_filters = 3 output_filters = [] for x in blocks: module = nn.Sequential() if x["type"] == "net": continue # If it's a convolutional layer if x["type"] == "convolutional": # Get the info about the layer activation = x["activation"] try: batch_normalize = int(x["batch_normalize"]) bias = False except: batch_normalize = 0 bias = True filters= int(x["filters"]) padding = int(x["pad"]) kernel_size = int(x["size"]) stride = int(x["stride"]) if padding: pad = (kernel_size - 1) // 2 else: pad = 0 # Add the convolutional layer conv = nn.Conv2d(prev_filters, filters, kernel_size, stride, pad, bias = bias) module.add_module("conv_{0}".format(index), conv) # Add the Batch Norm Layer if batch_normalize: bn = nn.BatchNorm2d(filters) module.add_module("batch_norm_{0}".format(index), bn) # Check the activation. # It is either Linear or a Leaky ReLU for YOLO if activation == "leaky": activn = nn.LeakyReLU(0.1, inplace = True) module.add_module("leaky_{0}".format(index), activn) # If it's an upsampling layer # We use Bilinear2dUpsampling elif x["type"] == "upsample": stride = int(x["stride"]) # upsample = Upsample(stride) upsample = nn.Upsample(scale_factor=2, mode="nearest") module.add_module("upsample_{}".format(index), upsample) # If it is a route layer elif (x["type"] == "route"): x["layers"] = x["layers"].split(',') # Start of a route start = int(x["layers"][0]) # end, if there exists one. try: end = int(x["layers"][1]) except: end = 0 # Positive anotation if start > 0: start = start - index if end > 0: end = end - index route = EmptyLayer() module.add_module("route_{0}".format(index), route) if end < 0: filters = output_filters[index + start] + output_filters[index + end] else: filters = output_filters[index + start] # shortcut corresponds to skip connection elif x["type"] == "shortcut": from_ = int(x["from"]) shortcut = EmptyLayer() module.add_module("shortcut_{}".format(index), shortcut) elif x["type"] == "maxpool": stride = int(x["stride"]) size = int(x["size"]) if stride != 1: maxpool = nn.MaxPool2d(size, stride) else: maxpool = MaxPoolStride1(size) module.add_module("maxpool_{}".format(index), maxpool) # Yolo is the detection layer elif x["type"] == "yolo": mask = x["mask"].split(",") mask = [int(x) for x in mask] anchors = x["anchors"].split(",") anchors = [int(a) for a in anchors] anchors = [(anchors[i], anchors[i+1]) for i in range(0, len(anchors),2)] anchors = [anchors[i] for i in mask] detection = DetectionLayer(anchors) module.add_module("Detection_{}".format(index), detection) else: print("Something I dunno") assert False module_list.append(module) prev_filters = filters output_filters.append(filters) index += 1 return (net_info, module_list) class Darknet(nn.Module): def __init__(self, cfgfile): super(Darknet, self).__init__() self.blocks = parse_cfg(cfgfile) self.net_info, self.module_list = create_modules(self.blocks) self.header = torch.IntTensor([0, 0, 0, 0]) self.seen = 0 def get_blocks(self): return self.blocks def get_module_list(self): return self.module_list def forward(self, x, CUDA): detections = [] modules = self.blocks[1:] outputs = {} # We cache the outputs for the route layer write = 0 for i in range(len(modules)): module_type = (modules[i]["type"]) if module_type == "convolutional" or module_type == "upsample" or module_type == "maxpool": x = self.module_list[i](x) outputs[i] = x elif module_type == "route": layers = modules[i]["layers"] layers = [int(a) for a in layers] if (layers[0]) > 0: layers[0] = layers[0] - i if len(layers) == 1: x = outputs[i + (layers[0])] else: if (layers[1]) > 0: layers[1] = layers[1] - i map1 = outputs[i + layers[0]] map2 = outputs[i + layers[1]] x = torch.cat((map1, map2), 1) outputs[i] = x elif module_type == "shortcut": from_ = int(modules[i]["from"]) x = outputs[i-1] + outputs[i+from_] outputs[i] = x elif module_type == 'yolo': anchors = self.module_list[i][0].anchors # Get the input dimensions inp_dim = int(self.net_info["height"]) # Get the number of classes num_classes = int(modules[i]["classes"]) # Output the result x = x.data x = predict_transform(x, inp_dim, anchors, num_classes, CUDA) if type(x) == int: continue if not write: detections = x write = 1 else: detections = torch.cat((detections, x), 1) outputs[i] = outputs[i-1] try: return detections except: return 0 def load_weights(self, weightfile): # Introduction: https://blog.paperspace.com/how-to-implement-a-yolo-v3-object-detector-from-scratch-in-pytorch-part-3/ # Open the weights file # weightfile = os.path.join(sys.path[-1], weightfile) fp = open(weightfile, "rb") # The first 5 values are header information # 1. Major version number # 2. Minor Version Number # 3. Subversion number # 4.5 Images seen by the network (during training) header = np.fromfile(fp, dtype = np.int32, count = 5) self.header = torch.from_numpy(header) self.seen = self.header[3] # The rest of the values are the weights # Let's load them up weights = np.fromfile(fp, dtype = np.float32) ptr = 0 for i in range(len(self.module_list)): module_type = self.blocks[i + 1]["type"] if module_type == "convolutional": model = self.module_list[i] try: batch_normalize = int(self.blocks[i+1]["batch_normalize"]) except: batch_normalize = 0 conv = model[0] if (batch_normalize): bn = model[1] # Get the number of weights of Batch Norm Layer num_bn_biases = bn.bias.numel() # Load the weights bn_biases = torch.from_numpy(weights[ptr:ptr + num_bn_biases]) ptr += num_bn_biases bn_weights = torch.from_numpy(weights[ptr: ptr + num_bn_biases]) ptr += num_bn_biases bn_running_mean = torch.from_numpy(weights[ptr: ptr + num_bn_biases]) ptr += num_bn_biases bn_running_var = torch.from_numpy(weights[ptr: ptr + num_bn_biases]) ptr += num_bn_biases # Cast the loaded weights into dims of model weights. bn_biases = bn_biases.view_as(bn.bias.data) bn_weights = bn_weights.view_as(bn.weight.data) bn_running_mean = bn_running_mean.view_as(bn.running_mean) bn_running_var = bn_running_var.view_as(bn.running_var) # Copy the data to model bn.bias.data.copy_(bn_biases) bn.weight.data.copy_(bn_weights) bn.running_mean.copy_(bn_running_mean) bn.running_var.copy_(bn_running_var) else: # Number of biases num_biases = conv.bias.numel() # Load the weights conv_biases = torch.from_numpy(weights[ptr: ptr + num_biases]) ptr = ptr + num_biases # reshape the loaded weights according to the dims of the model weights conv_biases = conv_biases.view_as(conv.bias.data) # Finally copy the data conv.bias.data.copy_(conv_biases) # Let us load the weights for the Convolutional layers num_weights = conv.weight.numel() # Do the same as above for weights conv_weights = torch.from_numpy(weights[ptr:ptr+num_weights]) ptr = ptr + num_weights conv_weights = conv_weights.view_as(conv.weight.data) conv.weight.data.copy_(conv_weights)