import numpy as np

from ...utils.checked_cast import checked_cast

from .model import BinaryOpTypes as BOT
from .model import EltwiseOpTypes as EOT
from .model import NcnnLayer, NcnnModel


class NcnnOptimizer:
    """Applies ncnn-style graph optimizations to an NcnnModel in place:
    adjacent layers are fused (e.g. Convolution + BatchNorm), redundant
    layers are eliminated, and fused-away layers are marked "ncnnfused"."""

    def __init__(self, model: NcnnModel) -> None:
        self.model = model

    def __fuse_batchnorm_scale(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "BatchNorm":
                # BatchNorm - Scale
                batchnorm_output = layer.outputs[0]

                j = i
                for j in range(i + 1, len(self.model.layers)):
                    if self.model.layers[j].op_type != "Scale":
                        continue
                    if len(self.model.layers[j].inputs) != 1:
                        continue
                    if self.model.layers[j].inputs[0] == batchnorm_output:
                        break
                else:
                    j += 1

                if j == len(self.model.layers):
                    continue

                # fuse BatchNorm - Scale to BatchNorm
                scale = self.model.layers[j]
                bias = layer.weight_data["bias"].weight

                layer.weight_data["slope"].weight = (
                    layer.weight_data["slope"].weight
                    * scale.weight_data["scale"].weight
                )
                bias = bias * scale.weight_data["scale"].weight
                if scale.params[1].value:
                    bias += scale.weight_data["bias"].weight
                layer.weight_data["bias"].weight = bias

                layer.outputs[0] = scale.outputs[0]

                self.model.node_count -= 1
                self.model.blob_count -= 1
                scale.op_type = "ncnnfused"

    def __fuse_x_batchnorm(self):
        """Combines fuse_convolution_batchnorm, fuse_convolutiondepthwise_batchnorm,
        fuse_deconvolution_batchnorm, fuse_deconvolutiondepthwise_batchnorm,
        and fuse_innerproduct_batchnorm"""

        for i, layer in enumerate(self.model.layers):
            if layer.op_type in (
                "Convolution",
                "ConvolutionDepthWise",
                "Deconvolution",
                "DeconvolutionDepthWise",
                "InnerProduct",
            ):
                # Convolution - BatchNorm
                conv_output = layer.outputs[0]

                j = i
                for j in range(i + 1, len(self.model.layers)):
                    if self.model.layers[j].op_type != "BatchNorm":
                        continue
                    if len(self.model.layers[j].inputs) != 1:
                        continue
                    if self.model.layers[j].inputs[0] == conv_output:
                        break
                else:
                    j += 1

                if j == len(self.model.layers):
                    continue

                # fuse Convolution - BatchNorm to Convolution
                batchnorm = self.model.layers[j]
                channels = checked_cast(int, batchnorm.params[0].value)
                eps = checked_cast(float, batchnorm.params[1].value)

                # a = bias - slope * mean / sqrt(var + eps)
                # b = slope / sqrt(var + eps)
                # value = value * b + a
                a = np.ndarray((channels,))
                b = np.ndarray((channels,))
                sqrt_var = np.sqrt(batchnorm.weight_data["variance"].weight + eps)
                a = (
                    batchnorm.weight_data["bias"].weight
                    - batchnorm.weight_data["slope"].weight
                    * batchnorm.weight_data["mean"].weight
                    / sqrt_var
                )
                b = batchnorm.weight_data["slope"].weight / sqrt_var

                bias_term = 1 if layer.op_type == "InnerProduct" else 5
                if layer.params[bias_term].value == 0:
                    # init bias as zero
                    layer.params[bias_term] = 1
                    layer.add_weight("bias", np.zeros(channels, dtype=np.float32))

                weight = layer.weight_data["weight"].weight
                layer.weight_data["weight"].weight = weight * (
                    np.transpose(
                        np.broadcast_to(b, weight.shape[::-1]).astype(weight.dtype),
                        (3, 2, 1, 0),
                    )
                )
                layer.weight_data["bias"].weight = (
                    layer.weight_data["bias"].weight * b + a
                )

                self.model.layers[i].outputs[0] = self.model.layers[j].outputs[0]

                self.model.node_count -= 1
                self.model.blob_count -= 1
                batchnorm.op_type = "ncnnfused"

    def __fuse_x_mul(self):
        """Combines fuse_convolution_mul, fuse_convolutiondepthwise_mul,
        and fuse_deconvolution_mul"""

        for i, layer in enumerate(self.model.layers):
            if layer.op_type in (
                "Convolution",
                "ConvolutionDepthWise",
                "Deconvolution",
            ):
                # Convolution - BinaryOp
                output = layer.outputs[0]

                j = i
                for j in range(i + 1, len(self.model.layers)):
                    if self.model.layers[j].op_type != "BinaryOp":
                        continue
                    if self.model.layers[j].num_inputs != 2:
                        continue
                    if self.model.layers[j].inputs[0] == output:
                        break
                else:
                    j += 1

                if j == len(self.model.layers):
                    continue

                # fuse Convolution - BinaryOp to Convolution
                binaryop = self.model.layers[j]
                if binaryop.params[0].value != BOT.MUL or binaryop.params[1].value:
                    continue

                # MemoryData - ..... - BinaryOp
                k = 0
                for k in range(j):
                    if self.model.layers[k].op_type != "MemoryData":
                        continue
                    if self.model.layers[k].outputs[0] == binaryop.inputs[1]:
                        break
                else:
                    k += 1

                if k == j:
                    continue

                memorydata = self.model.layers[k]
                channels = checked_cast(int, layer.params[0].value)
                if (
                    memorydata.params[0].value != channels
                    or memorydata.params[1].value != 0
                    or memorydata.params[2].value != 0
                ):
                    # not bias-like broadcasting type
                    continue

                data = memorydata.weight_data["data"].weight
                weight = layer.weight_data["weight"].weight
                layer.weight_data["weight"].weight = weight * (
                    np.transpose(
                        np.broadcast_to(data, weight.shape[::-1]).astype(weight.dtype),
                        (3, 2, 1, 0),
                    )
                )
                try:
                    layer.weight_data["bias"].weight = (
                        layer.weight_data["bias"].weight * data
                    )
                except KeyError:
                    pass

                self.model.layers[i].outputs[0] = self.model.layers[j].outputs[0]

                self.model.node_count -= 1
                self.model.blob_count -= 1
                binaryop.op_type = "ncnnfused"

    def __fuse_x_add(self):
        """Combines fuse_convolution_add, fuse_convolutiondepthwise_add,
        fuse_deconvolution_add, and fuse_innerproduct_add"""

        for i, layer in enumerate(self.model.layers):
            if layer.op_type in (
                "Convolution",
                "ConvolutionDepthWise",
                "Deconvolution",
                "InnerProduct",
            ):
                # Convolution - Add
                output = layer.outputs[0]

                j = i
                for j in range(i + 1, len(self.model.layers)):
                    if self.model.layers[j].op_type != "BinaryOp":
                        continue
                    if self.model.layers[j].num_inputs != 2:
                        continue
                    if self.model.layers[j].inputs[0] == output:
                        break
                else:
                    j += 1

                if j == len(self.model.layers):
                    continue

                # fuse Convolution - BinaryOp to Convolution
                binaryop = self.model.layers[j]
                if binaryop.params[0].value != BOT.ADD or binaryop.params[1].value:
                    continue

                # MemoryData - ..... - BinaryOp
                k = 0
                for k in range(j):
                    if self.model.layers[k].op_type != "MemoryData":
                        continue
                    if self.model.layers[k].outputs[0] == binaryop.inputs[1]:
                        break
                else:
                    k += 1

                if k == j:
                    continue

                memorydata = self.model.layers[k]
                channels = checked_cast(int, layer.params[0].value)
                if not (
                    (
                        memorydata.params[0].value == channels
                        and memorydata.params[1].value == 0
                        and memorydata.params[2].value == 0
                    )
                    or (
                        memorydata.params[0].value == 1
                        and memorydata.params[1].value == 1
                        and memorydata.params[2].value == channels
                    )
                ):
                    # not bias-like broadcasting type
                    continue

                bias_term = 1 if layer.op_type == "InnerProduct" else 5
                bias_data = memorydata.weight_data["data"].weight.reshape(channels)
                if layer.params[bias_term].value == 0:
                    # init bias
                    layer.params[bias_term] = 1
                    layer.add_weight("bias", bias_data)
                else:
                    layer.weight_data["bias"].weight = (
                        layer.weight_data["bias"].weight + bias_data
                    )

                self.model.layers[i].outputs[0] = self.model.layers[j].outputs[0]

                self.model.node_count -= 1
                self.model.blob_count -= 1
                binaryop.op_type = "ncnnfused"

    def __fuse_innerproduct_dropout(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "InnerProduct":
                # InnerProduct - Dropout
                output = layer.outputs[0]

                j = i
                for j in range(i + 1, len(self.model.layers)):
                    if self.model.layers[j].op_type != "Dropout":
                        continue
                    if self.model.layers[j].num_inputs != 1:
                        continue
                    if self.model.layers[j].inputs[0] == output:
                        break
                else:
                    j += 1

                if j == len(self.model.layers):
                    continue

                # fuse InnerProduct - Dropout to InnerProduct
                dropout = self.model.layers[j]
                scale = checked_cast(float, dropout.params[0].value)
                if scale != 1:
                    layer.weight_data["weight"].weight = (
                        layer.weight_data["weight"].weight * scale
                    )
                    if layer.params[1].value == 1:
                        layer.weight_data["bias"].weight = (
                            layer.weight_data["bias"].weight * scale
                        )

                self.model.layers[i].outputs[0] = self.model.layers[j].outputs[0]

                self.model.node_count -= 1
                self.model.blob_count -= 1
                dropout.op_type = "ncnnfused"

    def __fuse_x_activation(self):
        """Combines fuse_convolution_activation, fuse_convolution1d_activation,
        fuse_convolutiondepthwise_activation, fuse_deconvolution_activation,
        fuse_deconvolutiondepthwise_activation, and fuse_innerproduct_activation"""

        for i, layer in enumerate(self.model.layers):
            if layer.op_type in (
                "Convolution",
                "Convolution1D",
                "ConvolutionDepthWise",
                "Deconvolution",
                "DeconvolutionDepthWise",
                "InnerProduct",
            ):
                # Convolution - Activation
                output = layer.outputs[0]

                j = i
                for j in range(i + 1, len(self.model.layers)):
                    if self.model.layers[j].op_type not in (
                        "ReLU",
                        "Clip",
                        "Sigmoid",
                        "Mish",
                        "HardSwish",
                    ):
                        continue
                    if (
                        self.model.layers[j].op_type == "Mish"
                        and layer.op_type
                        in ("Deconvolution", "DeconvolutionDepthWise")
                    ) or (
                        self.model.layers[j].op_type == "HardSwish"
                        and layer.op_type
                        in (
                            "Convolution1D",
                            "Deconvolution",
                            "DeconvolutionDepthWise",
                        )
                    ):
                        continue
                    if self.model.layers[j].num_inputs != 1:
                        continue
                    if self.model.layers[j].inputs[0] == output:
                        break
                else:
                    j += 1

                if j == len(self.model.layers):
                    continue

                # fuse Convolution - Activation to Convolution
                act = self.model.layers[j]
                if act.op_type == "ReLU":
                    if act.params[0].value == 0:
                        layer.params[9] = 1
                    else:
                        layer.params[9] = 2
                        layer.params[10] = [
                            1,
                            checked_cast(float, act.params[0].value),
                        ]
                elif act.op_type == "Clip":
                    layer.params[9] = 3
                    layer.params[10] = [
                        2,
                        checked_cast(float, act.params[0].value),
                        checked_cast(float, act.params[1].value),
                    ]
                elif act.op_type == "Sigmoid":
                    layer.params[9] = 4
                elif act.op_type == "Mish":
                    layer.params[9] = 5
                elif act.op_type == "HardSwish":
                    layer.params[9] = 6
                    layer.params[10] = [
                        2,
                        checked_cast(float, act.params[0].value),
                        checked_cast(float, act.params[1].value),
                    ]

                self.model.layers[i].outputs[0] = self.model.layers[j].outputs[0]

                self.model.node_count -= 1
                self.model.blob_count -= 1
                act.op_type = "ncnnfused"

    def __fuse_memorydata_binaryop(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "MemoryData":
                # MemoryData - BinaryOp
                output = layer.outputs[0]

                j = i
                for j in range(i + 1, len(self.model.layers)):
                    if self.model.layers[j].op_type != "BinaryOp":
                        continue
                    if self.model.layers[j].num_inputs != 2:
                        continue
                    if (
                        self.model.layers[j].inputs[0] == output
                        or self.model.layers[j].inputs[1] == output
                    ):
                        break
                else:
                    j += 1

                if j == len(self.model.layers):
                    continue

                # fuse MemoryData - BinaryOp to BinaryOp
                binaryop = self.model.layers[j]
                if (
                    layer.params[0].value != 1
                    or layer.params[1].value != 0
                    or layer.params[2].value != 0
                ):
                    # not a scalar
                    continue

                memorydata_index = 1
                if binaryop.inputs[0] == output:
                    op_type = checked_cast(int, binaryop.params[0].value)
                    if op_type == BOT.ADD:
                        memorydata_index = 0
                    elif op_type == BOT.SUB:
                        binaryop.params[0] = BOT.RSUB
                        memorydata_index = 0
                    elif op_type == BOT.DIV:
                        binaryop.params[0] = BOT.RDIV
                        memorydata_index = 0
                    else:
                        # non-interchangeable binaryop
                        continue

                binaryop.params[1] = 1
                binaryop.params[2] = layer.weight_data["data"].weight[0]

                binaryop.inputs.pop(memorydata_index)
                # assumption: keep the stored input count in sync with the popped
                # input, mirroring the Split handling in the pass below
                binaryop.num_inputs -= 1

                self.model.node_count -= 1
                self.model.blob_count -= 1
                layer.op_type = "ncnnfused"

        i = 0
        while i in range(len(self.model.layers)):
            if self.model.layers[i].op_type != "MemoryData":
                i += 1
                continue

            # MemoryData - Split - BinaryOp
            output = self.model.layers[i].outputs[0]

            j0 = i
            for j0 in range(i + 1, len(self.model.layers)):
                if self.model.layers[j0].op_type != "Split":
                    continue
                if self.model.layers[j0].num_inputs != 1:
                    continue
                if self.model.layers[j0].inputs[0] == output:
                    break
            else:
                j0 += 1

            if j0 == len(self.model.layers):
                i += 1
                continue

            split_output_index = -1
            j1 = i
            for j1 in range(i + 1, len(self.model.layers)):
                if self.model.layers[j1].op_type != "BinaryOp":
                    continue
                if self.model.layers[j1].num_inputs != 2:
                    continue

                for k in range(self.model.layers[j0].num_outputs):
                    if (
                        self.model.layers[j1].inputs[0]
                        == self.model.layers[j0].outputs[k]
                        or self.model.layers[j1].inputs[1]
                        == self.model.layers[j0].outputs[k]
                    ):
                        split_output_index = k
                        break
                if split_output_index != -1:
                    break
            else:
                j1 += 1

            if j1 == len(self.model.layers):
                i += 1
                continue

            # fuse MemoryData - Split - BinaryOp to BinaryOp
            split = self.model.layers[j0]
            binaryop = self.model.layers[j1]
            if (
                self.model.layers[i].params[0].value != 1
                or self.model.layers[i].params[1].value != 0
                or self.model.layers[i].params[2].value != 0
            ):
                # not a scalar
                i += 1
                continue

            memorydata_index = 1
            if binaryop.inputs[0] == split.outputs[split_output_index]:
                op_type = checked_cast(int, binaryop.params[0].value)
                if op_type in (BOT.ADD, BOT.MUL, BOT.MAX, BOT.MIN):
                    memorydata_index = 0
                elif op_type == BOT.SUB:
                    binaryop.params[0] = BOT.RSUB
                    memorydata_index = 0
                elif op_type == BOT.DIV:
                    binaryop.params[0] = BOT.RDIV
                    memorydata_index = 0
                else:
                    # non-interchangeable binaryop
                    i += 1
                    continue

            binaryop.params[1] = 1
            binaryop.params[2] = self.model.layers[i].weight_data["data"].weight[0]

            binaryop.inputs.pop(memorydata_index)
            binaryop.num_inputs -= 1

            split.outputs.pop(split_output_index)
            split.num_outputs -= 1
            if split.num_outputs == 0:
                self.model.node_count -= 2
                self.model.blob_count -= 2
                split.op_type = "ncnnfused"
                self.model.layers[i].op_type = "ncnnfused"
                i -= 1

            i += 1
    def __fuse_binaryop_eltwise(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "BinaryOp":
                if layer.num_inputs != 2:
                    continue
                if layer.params[0].value != BOT.ADD or layer.params[1].value:
                    continue

                # BinaryOp - BinaryOp - BinaryOp
                input0 = layer.inputs[0]
                input1 = layer.inputs[1]

                j0 = 0
                for j0 in range(i):
                    if self.model.layers[j0].op_type != "BinaryOp":
                        continue
                    if self.model.layers[j0].num_inputs != 1:
                        continue
                    if self.model.layers[j0].params[0].value != BOT.MUL:
                        continue
                    if self.model.layers[j0].outputs[0] == input0:
                        break
                else:
                    j0 += 1

                j1 = 0
                for j1 in range(i):
                    if self.model.layers[j1].op_type != "BinaryOp":
                        continue
                    if self.model.layers[j1].num_inputs != 1:
                        continue
                    if self.model.layers[j1].params[0].value != BOT.MUL:
                        continue
                    if self.model.layers[j1].outputs[0] == input1:
                        break
                else:
                    j1 += 1

                if j0 == i and j1 == i:
                    continue

                binaryop0 = self.model.layers[j0]
                binaryop1 = self.model.layers[j1]

                eltwise = NcnnLayer(
                    "Eltwise",
                    layer.name,
                    layer.num_inputs,
                    layer.num_outputs,
                    layer.inputs,
                    layer.outputs,
                )
                eltwise.add_param(0, EOT.SUM)
                if j0 != i and j1 != i:
                    # fuse BinaryOp - BinaryOp - BinaryOp to Eltwise
                    eltwise.add_param(
                        1,
                        [
                            2,
                            checked_cast(float, binaryop0.params[2].value),
                            checked_cast(float, binaryop1.params[2].value),
                        ],
                    )
                    eltwise.inputs[0] = binaryop0.inputs[0]
                    eltwise.inputs[1] = binaryop1.inputs[0]
                    self.model.node_count -= 2
                    self.model.blob_count -= 2
                    binaryop0.op_type = "ncnnfused"
                    binaryop1.op_type = "ncnnfused"
                elif j0 != i and j1 == i:
                    # fuse BinaryOp - X - BinaryOp to Eltwise
                    eltwise.add_param(
                        1, [2, checked_cast(float, binaryop0.params[2].value), 1.0]
                    )
                    eltwise.inputs[0] = binaryop0.inputs[0]
                    self.model.node_count -= 1
                    self.model.blob_count -= 1
                    binaryop0.op_type = "ncnnfused"
                else:
                    # fuse X - BinaryOp - BinaryOp to Eltwise
                    eltwise.add_param(
                        1, [2, 1.0, checked_cast(float, binaryop1.params[2].value)]
                    )
                    eltwise.inputs[1] = binaryop1.inputs[0]
                    self.model.node_count -= 1
                    self.model.blob_count -= 1
                    binaryop1.op_type = "ncnnfused"

                self.model.layers[i] = eltwise

    def __eliminate_dropout(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "Dropout":
                if layer.params[0].value != 1:
                    continue

                # Any - Dropout
                dropout_input = layer.inputs[0]

                j = i - 1
                for j in range(i - 1, -1, -1):
                    if self.model.layers[j].op_type == "ncnnfused":
                        continue
                    if self.model.layers[j].num_outputs != 1:
                        continue
                    if self.model.layers[j].outputs[0] == dropout_input:
                        break
                else:
                    j -= 1

                if j == -1:
                    continue

                self.model.layers[j].outputs[0] = layer.outputs[0]

                self.model.node_count -= 1
                self.model.blob_count -= 1
                layer.op_type = "ncnnfused"

    def __eliminate_pooling1x1(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "Pooling":
                if (
                    layer.params[3].value != 0
                    or layer.params[13].value != 0
                    or layer.params[14].value != 0
                    or layer.params[15].value != 0
                ):
                    continue
                if (
                    layer.params[1].value != 1
                    or layer.params[11].value != 1
                    or layer.params[2].value != 1
                    or layer.params[12].value != 1
                ):
                    continue
                if layer.params[4].value != 0:
                    continue

                # Any - Pooling
                pooling_input = layer.inputs[0]

                top_i = -1
                j = i - 1
                for j in range(i - 1, -1, -1):
                    if self.model.layers[j].op_type == "ncnnfused":
                        continue

                    for k in range(self.model.layers[j].num_outputs):
                        if self.model.layers[j].outputs[k] == pooling_input:
                            top_i = k
                            break
                    if top_i != -1:
                        break
                else:
                    j -= 1

                if j == -1:
                    continue

                self.model.layers[j].outputs[top_i] = layer.outputs[0]

                self.model.node_count -= 1
                self.model.blob_count -= 1
                layer.op_type = "ncnnfused"

    def __eliminate_noop(self):
enumerate(self.model.layers): if layer.op_type == "Noop": if layer.num_inputs == 0: # Noop layer.op_type = "ncnnfused" continue # Any - Noop noop_input = layer.inputs[0] j = i - 1 any_k = -1 for j in range(i - 1, -1, -1): if self.model.layers[j].op_type == "ncnnfused": continue link_noop = False for k in range(self.model.layers[j].num_outputs): if self.model.layers[j].outputs[k] == noop_input: link_noop = True any_k = k break if link_noop: break else: j -= 1 if j == -1 or any_k == -1: continue self.model.layers[j].outputs[any_k] = layer.outputs[0] self.model.node_count -= 1 self.model.blob_count -= 1 layer.op_type = "ncnnfused" def __eliminate_split(self): blob_input_references = [] for i, layer in enumerate(self.model.layers): for input_name in layer.inputs: blob_input_references.append(input_name) for i, layer in enumerate(self.model.layers): if layer.op_type == "Split": real_split_output_count = 0 real_split_output_index = -1 for j in range(layer.num_outputs): if layer.outputs[j] in blob_input_references: real_split_output_count += 1 real_split_output_index = j if real_split_output_count > 1: continue # Any - Pooling split_input = layer.inputs[0] top_i = -1 j = i - 1 for j in range(i - 1, -1, -1): if self.model.layers[j].op_type == "ncnnfused": continue for k in range(self.model.layers[j].num_outputs): if self.model.layers[j].outputs[k] == split_input: top_i = k break if top_i != -1: break else: j -= 1 if j == -1: continue self.model.layers[j].outputs[top_i] = layer.outputs[ real_split_output_index ] self.model.node_count -= 1 self.model.blob_count -= 1 layer.op_type = "ncnnfused" def __eliminate_orphaned_memorydata(self): for i, layer in enumerate(self.model.layers): if layer.op_type == "MemoryData": # MemoryData - X memdata_output = layer.outputs[0] j = i for j in range(i + 1, len(self.model.layers)): if self.model.layers[j].op_type == "ncnnfused": continue orphaned = True for k in range(self.model.layers[j].num_inputs): if self.model.layers[j].inputs[k] == memdata_output: orphaned = False break if not orphaned: break if j < len(self.model.layers): continue self.model.node_count -= 1 self.model.blob_count -= 1 layer.op_type = "ncnnfused" def __eliminate_reshape_after_global_pooling(self): for i, layer in enumerate(self.model.layers): if layer.op_type == "Pooling": if layer.params[4].value == 0: continue # Pooling - Reshape pooling_output = layer.outputs[0] j = i for j in range(i + 1, len(self.model.layers)): if self.model.layers[j].op_type != "Reshape": continue if self.model.layers[j].num_inputs != 1: continue if self.model.layers[j].inputs[0] == pooling_output: break else: j += 1 if j == len(self.model.layers): continue reshape = self.model.layers[j] if ( reshape.params[1].value != -233 or reshape.params[2].value != -233 or reshape.params[3].value != 0 ): continue layer.outputs[0] = reshape.outputs[0] self.model.node_count -= 1 self.model.blob_count -= 1 reshape.op_type = "ncnnfused" def __eliminate_flatten_after_global_pooling(self): for i, layer in enumerate(self.model.layers): if layer.op_type == "Pooling": if layer.params[4].value == 0: continue # Pooling - Flatten pooling_output = layer.outputs[0] j = i for j in range(i + 1, len(self.model.layers)): if self.model.layers[j].op_type != "Flatten": continue if self.model.layers[j].num_inputs != 1: continue if self.model.layers[j].inputs[0] == pooling_output: break else: j += 1 if j == len(self.model.layers): continue flatten = self.model.layers[j] layer.outputs[0] = flatten.outputs[0] self.model.node_count -= 1 
                self.model.blob_count -= 1
                flatten.op_type = "ncnnfused"

    def __eliminate_flatten_after_innerproduct(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "InnerProduct":
                # InnerProduct - Flatten
                inprod_output = layer.outputs[0]

                j = i
                for j in range(i + 1, len(self.model.layers)):
                    if self.model.layers[j].op_type != "Flatten":
                        continue
                    if self.model.layers[j].num_inputs != 1:
                        continue
                    if self.model.layers[j].inputs[0] == inprod_output:
                        break
                else:
                    j += 1

                if j == len(self.model.layers):
                    continue

                flatten = self.model.layers[j]

                layer.outputs[0] = flatten.outputs[0]

                self.model.node_count -= 1
                self.model.blob_count -= 1
                flatten.op_type = "ncnnfused"

    def __eliminate_reshape_before_binaryop(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "Reshape":
                if (
                    layer.params[0].value != 1
                    or layer.params[1].value != 1
                    or layer.params[3].value != 1
                ):
                    continue

                # Reshape - BinaryOp
                reshape_output = layer.outputs[0]

                j = i
                for j in range(i + 1, len(self.model.layers)):
                    if self.model.layers[j].op_type != "BinaryOp":
                        continue
                    if self.model.layers[j].num_inputs != 2:
                        continue
                    if (
                        self.model.layers[j].inputs[0] == reshape_output
                        or self.model.layers[j].inputs[1] == reshape_output
                    ):
                        break
                else:
                    j += 1

                if j == len(self.model.layers):
                    continue

                binaryop = self.model.layers[j]

                input_blob_final = layer.inputs[0]
                if binaryop.inputs[0] == reshape_output:
                    binaryop.inputs[0] = input_blob_final
                if binaryop.inputs[1] == reshape_output:
                    binaryop.inputs[1] = input_blob_final

                self.model.node_count -= 1
                self.model.blob_count -= 1
                layer.op_type = "ncnnfused"

    def __replace_reduction_with_global_pooling(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "Reduction":
                if (
                    layer.params[0].value != 3
                    or layer.params[1].value != 0
                    or layer.params[2].value != 1
                ):
                    continue

                axes = checked_cast(list, layer.params[3].value)
                if len(axes) != 1:
                    continue
                if axes[0] != 2 and axes[0] != 3:
                    continue

                # Reduction(2/3) - Reduction(2)
                reduction1_output = layer.outputs[0]

                j = i
                for j in range(i + 1, len(self.model.layers)):
                    if self.model.layers[j].op_type != "Reduction":
                        continue
                    if self.model.layers[j].num_inputs != 1:
                        continue
                    if self.model.layers[j].inputs[0] == reduction1_output:
                        break
                else:
                    j += 1

                if j == len(self.model.layers):
                    continue

                reduction2 = self.model.layers[j]
                if (
                    reduction2.params[0].value != 3
                    or reduction2.params[1].value != 0
                    or reduction2.params[2].value != 1
                ):
                    continue

                axes2 = checked_cast(list, reduction2.params[3].value)
                if len(axes2) != 1:
                    continue
                if axes2[0] != 2:
                    continue

                pooling = NcnnLayer(
                    "Pooling",
                    reduction2.name,
                    reduction2.num_inputs,
                    reduction2.num_outputs,
                    reduction2.inputs,
                    reduction2.outputs,
                )
                pooling.add_param(0, 1)
                pooling.add_param(4, 1)

                self.model.layers[j] = pooling

                pooling.inputs[0] = layer.inputs[0]

                self.model.node_count -= 1
                self.model.blob_count -= 1
                layer.op_type = "ncnnfused"

    def __replace_prelu_with_leaky_relu(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "PReLU":
                if layer.params[0].value != 1:
                    continue

                relu_layer = NcnnLayer(
                    "ReLU",
                    layer.name,
                    layer.num_inputs,
                    layer.num_outputs,
                    layer.inputs,
                    layer.outputs,
                )
                relu_layer.add_param(
                    0, checked_cast(float, layer.weight_data["slope"].weight[0])
                )

                self.model.layers[i] = relu_layer

    def __replace_convolution_with_innerproduct_after_global_pooling(self):
        for i, layer in enumerate(self.model.layers):
            if layer.op_type == "Pooling":
                if layer.params[4].value == 0:
                    continue

                # Pooling - Convolution
                pooling_output = layer.outputs[0]

                j = i
len(self.model.layers)): if self.model.layers[j].op_type != "Convolution": continue if self.model.layers[j].num_inputs != 1: continue if self.model.layers[j].inputs[0] == pooling_output: break else: j += 1 if j == len(self.model.layers): continue convolution = self.model.layers[j] innerproduct = NcnnLayer( "InnerProduct", convolution.name, convolution.num_inputs, convolution.num_outputs, convolution.inputs, convolution.outputs, ) innerproduct.add_param( 0, checked_cast(int, convolution.params[0].value) ) innerproduct.add_param( 1, checked_cast(int, convolution.params[5].value) ) innerproduct.add_param( 2, checked_cast(int, convolution.params[6].value) ) innerproduct.add_param( 8, checked_cast(int, convolution.params[8].value) ) innerproduct.add_param( 9, checked_cast(int, convolution.params[9].value) ) innerproduct.add_param( 10, checked_cast(list, convolution.params[10].value), ) innerproduct.add_weight( "weight", convolution.weight_data["weight"].weight, convolution.weight_data["weight"].quantize_tag, ) innerproduct.add_weight("bias", convolution.weight_data["bias"].weight) self.model.layers[j] = innerproduct def __replace_convolution_with_innerproduct_after_innerproduct(self): while True: replaced = False for i, layer in enumerate(self.model.layers): if layer.op_type == "InnerProduct": # InnerProduct - Convolution inprod_output = layer.outputs[0] j = i for j in range(i + 1, len(self.model.layers)): if self.model.layers[j].op_type != "Convolution": continue if self.model.layers[j].num_inputs != 1: continue if self.model.layers[j].inputs[0] == inprod_output: break else: j += 1 if j == len(self.model.layers): continue convolution = self.model.layers[j] innerproduct2 = NcnnLayer( "InnerProduct", convolution.name, convolution.num_inputs, convolution.num_outputs, convolution.inputs, convolution.outputs, ) innerproduct2.add_param( 0, checked_cast(int, convolution.params[0].value) ) innerproduct2.add_param( 1, checked_cast(int, convolution.params[5].value) ) innerproduct2.add_param( 2, checked_cast(int, convolution.params[6].value) ) innerproduct2.add_param( 8, checked_cast(int, convolution.params[8].value) ) innerproduct2.add_param( 9, checked_cast(int, convolution.params[9].value) ) innerproduct2.add_param( 10, checked_cast(list, convolution.params[10].value), ) innerproduct2.add_weight( "weight", convolution.weight_data["weight"].weight, convolution.weight_data["weight"].quantize_tag, ) innerproduct2.add_weight( "bias", convolution.weight_data["bias"].weight ) self.model.layers[j] = innerproduct2 replaced = True if not replaced: break def optimize(self) -> None: self.__fuse_batchnorm_scale() self.__fuse_x_batchnorm() self.__fuse_x_mul() self.__fuse_x_add() self.__fuse_innerproduct_dropout() self.__replace_reduction_with_global_pooling() self.__replace_prelu_with_leaky_relu() self.__fuse_x_activation() self.__fuse_memorydata_binaryop() self.__fuse_binaryop_eltwise() self.__eliminate_dropout() self.__eliminate_pooling1x1() self.__eliminate_noop() self.__eliminate_split() self.__eliminate_flatten_after_global_pooling() self.__eliminate_reshape_after_global_pooling() self.__eliminate_reshape_before_binaryop() self.__replace_convolution_with_innerproduct_after_global_pooling() self.__replace_convolution_with_innerproduct_after_innerproduct() self.__eliminate_flatten_after_innerproduct() self.__eliminate_orphaned_memorydata()