import numpy as np
from ...utils.checked_cast import checked_cast
from .model import BinaryOpTypes as BOT
from .model import EltwiseOpTypes as EOT
from .model import NcnnLayer, NcnnModel
class NcnnOptimizer:
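    """Applies graph-level fusion, elimination, and replacement passes to an
    NcnnModel in place, modeled on the passes of ncnn's ncnnoptimize tool.

    Typical use, assuming an already-loaded NcnnModel:
        NcnnOptimizer(model).optimize()
    """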
def __init__(self, model: NcnnModel) -> None:
self.model = model
def __fuse_batchnorm_scale(self):
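        """Folds a Scale layer that directly follows a BatchNorm into the
        BatchNorm by multiplying its slope and bias by the scale weights and
        adding the Scale layer's own bias when present."""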
for i, layer in enumerate(self.model.layers):
if layer.op_type == "BatchNorm":
# BatchNorm - Scale
batchnorm_output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "Scale":
continue
if len(self.model.layers[j].inputs) != 1:
continue
if self.model.layers[j].inputs[0] == batchnorm_output:
break
else:
j += 1
if j == len(self.model.layers):
continue
# fuse BatchNorm - Scale to BatchNorm
scale = self.model.layers[j]
bias = layer.weight_data["bias"].weight
layer.weight_data["slope"].weight = (
layer.weight_data["slope"].weight
* scale.weight_data["scale"].weight
)
bias = bias * scale.weight_data["scale"].weight
if scale.params[1].value:
bias += scale.weight_data["bias"].weight
layer.weight_data["bias"].weight = bias
layer.outputs[0] = scale.outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
scale.op_type = "ncnnfused"
def __fuse_x_batchnorm(self):
"""Combines fuse_convolution_batchnorm, fuse_convolutiondepthwise_batchnorm,
fuse_deconvolution_batchnorm, fuse_deconvolutiondepthwise_batchnorm, and
fuse_innerproduct_batchnorm"""
for i, layer in enumerate(self.model.layers):
if layer.op_type in (
"Convolution",
"ConvolutionDepthWise",
"Deconvolution",
"DeconvolutionDepthWise",
"InnerProduct",
):
# Convolution - BatchNorm
conv_output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "BatchNorm":
continue
if len(self.model.layers[j].inputs) != 1:
continue
if self.model.layers[j].inputs[0] == conv_output:
break
else:
j += 1
if j == len(self.model.layers):
continue
# fuse Convolution - BatchNorm to Convolution
batchnorm = self.model.layers[j]
channels = checked_cast(int, batchnorm.params[0].value)
eps = checked_cast(float, batchnorm.params[1].value)
# a = bias - slope * mean / sqrt(var + eps)
# b = slope / sqrt(var + eps)
# value = value * b + a
sqrt_var = np.sqrt(batchnorm.weight_data["variance"].weight + eps)
a = (
batchnorm.weight_data["bias"].weight
- batchnorm.weight_data["slope"].weight
* batchnorm.weight_data["mean"].weight
/ sqrt_var
)
b = batchnorm.weight_data["slope"].weight / sqrt_var
bias_term = 1 if layer.op_type == "InnerProduct" else 5
if layer.params[bias_term].value == 0:
# init bias as zero
layer.params[bias_term] = 1
layer.add_weight("bias", np.zeros(channels, dtype=np.float32))
weight = layer.weight_data["weight"].weight
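                # scale each output channel of the kernel by b: broadcast b
                # over the reversed weight shape, then transpose back so the
                # factor lands on the output-channel axis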
layer.weight_data["weight"].weight = weight * (
np.transpose(
np.broadcast_to(b, weight.shape[::-1]).astype(weight.dtype),
(3, 2, 1, 0),
)
)
layer.weight_data["bias"].weight = (
layer.weight_data["bias"].weight * b + a
)
self.model.layers[i].outputs[0] = self.model.layers[j].outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
batchnorm.op_type = "ncnnfused"
def __fuse_x_mul(self):
"""Combines fuse_convolution_mul, fuse_convolutiondepthwise_mul,
and fuse_deconvolution_mul"""
for i, layer in enumerate(self.model.layers):
if layer.op_type in (
"Convolution",
"ConvolutionDepthWise",
"Deconvolution",
):
# Convolution - BinaryOp
output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "BinaryOp":
continue
if self.model.layers[j].num_inputs != 2:
continue
if self.model.layers[j].inputs[0] == output:
break
else:
j += 1
if j == len(self.model.layers):
continue
# fuse Convolution - BinaryOp to Convolution
binaryop = self.model.layers[j]
if binaryop.params[0].value != BOT.MUL or binaryop.params[1].value:
continue
# MemoryData - ..... - BinaryOp
k = 0
for k in range(j):
if self.model.layers[k].op_type != "MemoryData":
continue
if self.model.layers[k].outputs[0] == binaryop.inputs[1]:
break
else:
k += 1
if k == j:
continue
memorydata = self.model.layers[k]
channels = checked_cast(int, layer.params[0].value)
if (
memorydata.params[0].value != channels
or memorydata.params[1].value != 0
or memorydata.params[2].value != 0
):
# not bias-like broadcasting type
continue
data = memorydata.weight_data["data"].weight
weight = layer.weight_data["weight"].weight
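                # fold the per-channel multiplier into the kernel, using the
                # same broadcast-and-transpose trick as __fuse_x_batchnorm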
layer.weight_data["weight"].weight = weight * (
np.transpose(
np.broadcast_to(data, weight.shape[::-1]).astype(weight.dtype),
(3, 2, 1, 0),
)
)
try:
layer.weight_data["bias"].weight = (
layer.weight_data["bias"].weight * data
)
except KeyError:
pass
self.model.layers[i].outputs[0] = self.model.layers[j].outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
binaryop.op_type = "ncnnfused"
def __fuse_x_add(self):
"""Combines fuse_convolution_add, fuse_convolutiondepthwise_add,
fuse_deconvolution_add, and fuse_innerproduct_add"""
for i, layer in enumerate(self.model.layers):
if layer.op_type in (
"Convolution",
"ConvolutionDepthWise",
"Deconvolution",
"InnerProduct",
):
# Convolution - Add
output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "BinaryOp":
continue
if self.model.layers[j].num_inputs != 2:
continue
if self.model.layers[j].inputs[0] == output:
break
else:
j += 1
if j == len(self.model.layers):
continue
# fuse Convolution - BinaryOp to Convolution
binaryop = self.model.layers[j]
if binaryop.params[0].value != BOT.ADD or binaryop.params[1].value:
continue
# MemoryData - ..... - BinaryOp
k = 0
for k in range(j):
if self.model.layers[k].op_type != "MemoryData":
continue
if self.model.layers[k].outputs[0] == binaryop.inputs[1]:
break
else:
k += 1
if k == j:
continue
memorydata = self.model.layers[k]
channels = checked_cast(int, layer.params[0].value)
                if not (
                    (
                        memorydata.params[0].value == channels
                        and memorydata.params[1].value == 0
                        and memorydata.params[2].value == 0
                    )
                    or (
                        memorydata.params[0].value == 1
                        and memorydata.params[1].value == 1
                        and memorydata.params[2].value == channels
                    )
                ):
                    # not bias-like broadcasting type
                    continue
bias_term = 1 if layer.op_type == "InnerProduct" else 5
bias_data = memorydata.weight_data["data"].weight.reshape(channels)
if layer.params[bias_term].value == 0:
# init bias
layer.params[bias_term] = 1
layer.add_weight("bias", bias_data)
else:
layer.weight_data["bias"].weight = (
layer.weight_data["bias"].weight + bias_data
)
self.model.layers[i].outputs[0] = self.model.layers[j].outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
binaryop.op_type = "ncnnfused"
def __fuse_innerproduct_dropout(self):
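        """Folds a Dropout that follows an InnerProduct into the InnerProduct
        by scaling its weight (and bias, if present) by the dropout scale."""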
for i, layer in enumerate(self.model.layers):
if layer.op_type == "InnerProduct":
# InnerProduct - Dropout
output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "Dropout":
continue
if self.model.layers[j].num_inputs != 1:
continue
if self.model.layers[j].inputs[0] == output:
break
else:
j += 1
if j == len(self.model.layers):
continue
# fuse InnerProduct - Dropout to InnerProduct
dropout = self.model.layers[j]
scale = checked_cast(float, dropout.params[0].value)
if scale != 1:
layer.weight_data["weight"].weight = (
layer.weight_data["weight"].weight * scale
)
if layer.params[1].value == 1:
layer.weight_data["bias"].weight = (
layer.weight_data["bias"].weight * scale
)
self.model.layers[i].outputs[0] = self.model.layers[j].outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
dropout.op_type = "ncnnfused"
def __fuse_x_activation(self):
"""Combines fuse_convolution_activation, fuse_convolution1d_activation,
fuse_convolutiondepthwise_activation, fuse_deconvolution_activation,
fuse_deconvolutiondepthwise_activation, and fuse_innerproduct_activation"""
for i, layer in enumerate(self.model.layers):
if layer.op_type in (
"Convolution",
"Convolution1D",
"ConvolutionDepthWise",
"Deconvolution",
"DeconvolutionDepthWise",
"InnerProduct",
):
# Convolution - Activation
output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type not in (
"ReLU",
"Clip",
"Sigmoid",
"Mish",
"Hardswish",
):
continue
if (
self.model.layers[j].op_type == "Mish"
and layer.op_type in ("Deconvolution", "DeconvolutionDepthWise")
) or (
self.model.layers[j].op_type == "HardSwish"
and layer.op_type
in (
"Convolution1D",
"Deconvolution",
"DeconvolutionDepthWise",
)
):
continue
if self.model.layers[j].num_inputs != 1:
continue
if self.model.layers[j].inputs[0] == output:
break
else:
j += 1
if j == len(self.model.layers):
continue
# fuse Convolution - Activation to Convolution
act = self.model.layers[j]
if act.op_type == "ReLU":
if act.params[0].value == 0:
layer.params[9] = 1
else:
layer.params[9] = 2
layer.params[10] = [1, checked_cast(float, act.params[0].value)]
elif act.op_type == "Clip":
layer.params[9] = 3
layer.params[10] = [
2,
checked_cast(float, act.params[0].value),
checked_cast(float, act.params[1].value),
]
elif act.op_type == "Sigmoid":
layer.params[9] = 4
elif act.op_type == "Mish":
layer.params[9] = 5
elif act.op_type == "HardSwish":
layer.params[9] = 6
layer.params[10] = [
2,
checked_cast(float, act.params[0].value),
checked_cast(float, act.params[1].value),
]
self.model.layers[i].outputs[0] = self.model.layers[j].outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
act.op_type = "ncnnfused"
def __fuse_memorydata_binaryop(self):
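        """Folds a scalar MemoryData that feeds a BinaryOp, directly or through
        a Split, into the BinaryOp's scalar-operand parameters."""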
for i, layer in enumerate(self.model.layers):
if layer.op_type == "MemoryData":
# MemoryData - BinaryOp
output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "BinaryOp":
continue
if self.model.layers[j].num_inputs != 2:
continue
if (
self.model.layers[j].inputs[0] == output
or self.model.layers[j].inputs[1] == output
):
break
else:
j += 1
if j == len(self.model.layers):
continue
# fuse MemoryData - BinaryOp to BinaryOp
binaryop = self.model.layers[j]
if (
layer.params[0].value != 1
or layer.params[1].value != 0
or layer.params[2].value != 0
):
# not a scalar
continue
memorydata_index = 1
if binaryop.inputs[0] == output:
op_type = checked_cast(int, binaryop.params[0].value)
if op_type == BOT.ADD:
memorydata_index = 0
elif op_type == BOT.SUB:
binaryop.params[0] = BOT.RSUB
memorydata_index = 0
elif op_type == BOT.DIV:
binaryop.params[0] = BOT.RDIV
memorydata_index = 0
else:
# non-interchangeable binaryop
continue
binaryop.params[1] = 1
binaryop.params[2] = layer.weight_data["data"].weight[0]
                binaryop.inputs.pop(memorydata_index)
                binaryop.num_inputs -= 1
self.model.node_count -= 1
self.model.blob_count -= 1
layer.op_type = "ncnnfused"
        i = 0
        while i < len(self.model.layers):
            if self.model.layers[i].op_type == "MemoryData":
# MemoryData - Split - BinaryOp
output = self.model.layers[i].outputs[0]
j0 = i
for j0 in range(i + 1, len(self.model.layers)):
if self.model.layers[j0].op_type != "Split":
continue
if self.model.layers[j0].num_inputs != 1:
continue
if self.model.layers[j0].inputs[0] == output:
break
else:
j0 += 1
if j0 == len(self.model.layers):
i += 1
continue
split_output_index = -1
j1 = i
for j1 in range(i + 1, len(self.model.layers)):
if self.model.layers[j1].op_type != "BinaryOp":
continue
if self.model.layers[j1].num_inputs != 2:
continue
for k in range(self.model.layers[j0].num_outputs):
if (
self.model.layers[j1].inputs[0]
== self.model.layers[j0].outputs[k]
or self.model.layers[j1].inputs[1]
== self.model.layers[j0].outputs[k]
):
split_output_index = k
break
if split_output_index != -1:
break
else:
j1 += 1
if j1 == len(self.model.layers):
i += 1
continue
# fuse MemoryData - Split - BinaryOp to BinaryOp
split = self.model.layers[j0]
binaryop = self.model.layers[j1]
if (
self.model.layers[i].params[0].value != 1
or self.model.layers[i].params[1].value != 0
or self.model.layers[i].params[2].value != 0
):
# not a scalar
i += 1
continue
memorydata_index = 1
if binaryop.inputs[0] == split.outputs[split_output_index]:
op_type = checked_cast(int, binaryop.params[0].value)
if op_type in (BOT.ADD, BOT.MUL, BOT.MAX, BOT.MIN):
memorydata_index = 0
elif op_type == BOT.SUB:
binaryop.params[0] = BOT.RSUB
memorydata_index = 0
elif op_type == BOT.DIV:
binaryop.params[0] = BOT.RDIV
memorydata_index = 0
else:
# non-interchangeable binaryop
i += 1
continue
binaryop.params[1] = 1
binaryop.params[2] = self.model.layers[i].weight_data["data"].weight[0]
binaryop.inputs.pop(memorydata_index)
binaryop.num_inputs -= 1
split.outputs.pop(split_output_index)
split.num_outputs -= 1
if split.num_outputs == 0:
self.model.node_count -= 2
self.model.blob_count -= 2
split.op_type = "ncnnfused"
self.model.layers[i].op_type = "ncnnfused"
i -= 1
i += 1
def __fuse_binaryop_eltwise(self):
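        """Rewrites add(mul(x, a), mul(y, b)) chains of scalar BinaryOps into a
        single Eltwise SUM layer with per-input coefficients."""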
for i, layer in enumerate(self.model.layers):
if layer.op_type == "BinaryOp":
if layer.num_inputs != 2:
continue
if layer.params[0].value != BOT.ADD or layer.params[1].value:
continue
# BinaryOp - BinaryOp - BinaryOp
input0 = layer.inputs[0]
input1 = layer.inputs[1]
j0 = 0
for j0 in range(i):
if self.model.layers[j0].op_type != "BinaryOp":
continue
if self.model.layers[j0].num_inputs != 1:
continue
if self.model.layers[j0].params[0].value != BOT.MUL:
continue
if self.model.layers[j0].outputs[0] == input0:
break
else:
j0 += 1
j1 = 0
for j1 in range(i):
if self.model.layers[j1].op_type != "BinaryOp":
continue
if self.model.layers[j1].num_inputs != 1:
continue
if self.model.layers[j1].params[0].value != BOT.MUL:
continue
if self.model.layers[j1].outputs[0] == input1:
break
else:
j1 += 1
if j0 == i and j1 == i:
continue
binaryop0 = self.model.layers[j0]
binaryop1 = self.model.layers[j1]
eltwise = NcnnLayer(
"Eltwise",
layer.name,
layer.num_inputs,
layer.num_outputs,
layer.inputs,
layer.outputs,
)
eltwise.add_param(0, EOT.SUM)
if j0 != i and j1 != i:
# fuse BinaryOp - BinaryOp - BinaryOp to Eltwise
eltwise.add_param(
1,
[
2,
checked_cast(float, binaryop0.params[2].value),
checked_cast(float, binaryop1.params[2].value),
],
)
eltwise.inputs[0] = binaryop0.inputs[0]
eltwise.inputs[1] = binaryop1.inputs[0]
self.model.node_count -= 2
self.model.blob_count -= 2
binaryop0.op_type = "ncnnfused"
binaryop1.op_type = "ncnnfused"
elif j0 != i and j1 == i:
# fuse BinaryOp - X - BinaryOp to Eltwise
eltwise.add_param(
1, [2, checked_cast(float, binaryop0.params[2].value), 1.0]
)
eltwise.inputs[0] = binaryop0.inputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
binaryop0.op_type = "ncnnfused"
else:
# fuse X - BinaryOp - BinaryOp to Eltwise
eltwise.add_param(
1, [2, 1.0, checked_cast(float, binaryop1.params[2].value)]
)
eltwise.inputs[1] = binaryop1.inputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
binaryop1.op_type = "ncnnfused"
self.model.layers[i] = eltwise
def __eliminate_dropout(self):
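        """Removes Dropout layers with scale 1 by rewiring the producing
        layer's output past the Dropout."""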
for i, layer in enumerate(self.model.layers):
if layer.op_type == "Dropout":
if layer.params[0].value != 1:
continue
# Any - Dropout
dropout_input = layer.inputs[0]
j = i - 1
for j in range(i - 1, -1, -1):
if self.model.layers[j].op_type == "ncnnfused":
continue
if self.model.layers[j].num_outputs != 1:
continue
if self.model.layers[j].outputs[0] == dropout_input:
break
else:
j -= 1
if j == -1:
continue
self.model.layers[j].outputs[0] = layer.outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
layer.op_type = "ncnnfused"
def __eliminate_pooling1x1(self):
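        """Removes no-op 1x1 Pooling layers (kernel 1, stride 1, no padding,
        not global) by rewiring the producing layer's output past them."""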
for i, layer in enumerate(self.model.layers):
if layer.op_type == "Pooling":
if (
layer.params[3].value != 0
or layer.params[13].value != 0
or layer.params[14].value != 0
or layer.params[15].value != 0
):
continue
if (
layer.params[1].value != 1
or layer.params[11].value != 1
or layer.params[2].value != 1
or layer.params[12].value != 1
):
continue
if layer.params[4].value != 0:
continue
# Any - Pooling
pooling_input = layer.inputs[0]
top_i = -1
j = i - 1
for j in range(i - 1, -1, -1):
if self.model.layers[j].op_type == "ncnnfused":
continue
for k in range(self.model.layers[j].num_outputs):
if self.model.layers[j].outputs[k] == pooling_input:
top_i = k
break
if top_i != -1:
break
else:
j -= 1
if j == -1:
continue
self.model.layers[j].outputs[top_i] = layer.outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
layer.op_type = "ncnnfused"
def __eliminate_noop(self):
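        """Removes Noop layers, rewiring the producing layer's output to the
        Noop's output when the Noop has an input."""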
for i, layer in enumerate(self.model.layers):
if layer.op_type == "Noop":
if layer.num_inputs == 0:
# Noop
layer.op_type = "ncnnfused"
continue
# Any - Noop
noop_input = layer.inputs[0]
j = i - 1
any_k = -1
for j in range(i - 1, -1, -1):
if self.model.layers[j].op_type == "ncnnfused":
continue
link_noop = False
for k in range(self.model.layers[j].num_outputs):
if self.model.layers[j].outputs[k] == noop_input:
link_noop = True
any_k = k
break
if link_noop:
break
else:
j -= 1
if j == -1 or any_k == -1:
continue
self.model.layers[j].outputs[any_k] = layer.outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
layer.op_type = "ncnnfused"
def __eliminate_split(self):
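        """Removes Split layers with at most one output that is actually
        consumed, wiring the producing layer directly to that output."""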
blob_input_references = []
for i, layer in enumerate(self.model.layers):
for input_name in layer.inputs:
blob_input_references.append(input_name)
for i, layer in enumerate(self.model.layers):
if layer.op_type == "Split":
real_split_output_count = 0
real_split_output_index = -1
for j in range(layer.num_outputs):
if layer.outputs[j] in blob_input_references:
real_split_output_count += 1
real_split_output_index = j
if real_split_output_count > 1:
continue
                # Any - Split
split_input = layer.inputs[0]
top_i = -1
j = i - 1
for j in range(i - 1, -1, -1):
if self.model.layers[j].op_type == "ncnnfused":
continue
for k in range(self.model.layers[j].num_outputs):
if self.model.layers[j].outputs[k] == split_input:
top_i = k
break
if top_i != -1:
break
else:
j -= 1
if j == -1:
continue
self.model.layers[j].outputs[top_i] = layer.outputs[
real_split_output_index
]
self.model.node_count -= 1
self.model.blob_count -= 1
layer.op_type = "ncnnfused"
def __eliminate_orphaned_memorydata(self):
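        """Marks MemoryData layers as fused when no later layer consumes
        their output."""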
for i, layer in enumerate(self.model.layers):
if layer.op_type == "MemoryData":
# MemoryData - X
memdata_output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type == "ncnnfused":
continue
orphaned = True
for k in range(self.model.layers[j].num_inputs):
if self.model.layers[j].inputs[k] == memdata_output:
orphaned = False
break
                    if not orphaned:
                        break
                else:
                    j += 1
                if j < len(self.model.layers):
                    continue
self.model.node_count -= 1
self.model.blob_count -= 1
layer.op_type = "ncnnfused"
def __eliminate_reshape_after_global_pooling(self):
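        """Removes a flattening Reshape that follows a global Pooling, since
        global pooling already produces one value per channel."""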
for i, layer in enumerate(self.model.layers):
if layer.op_type == "Pooling":
if layer.params[4].value == 0:
continue
# Pooling - Reshape
pooling_output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "Reshape":
continue
if self.model.layers[j].num_inputs != 1:
continue
if self.model.layers[j].inputs[0] == pooling_output:
break
else:
j += 1
if j == len(self.model.layers):
continue
reshape = self.model.layers[j]
if (
reshape.params[1].value != -233
or reshape.params[2].value != -233
or reshape.params[3].value != 0
):
continue
layer.outputs[0] = reshape.outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
reshape.op_type = "ncnnfused"
def __eliminate_flatten_after_global_pooling(self):
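        """Removes a Flatten that directly follows a global Pooling."""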
for i, layer in enumerate(self.model.layers):
if layer.op_type == "Pooling":
if layer.params[4].value == 0:
continue
# Pooling - Flatten
pooling_output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "Flatten":
continue
if self.model.layers[j].num_inputs != 1:
continue
if self.model.layers[j].inputs[0] == pooling_output:
break
else:
j += 1
if j == len(self.model.layers):
continue
flatten = self.model.layers[j]
layer.outputs[0] = flatten.outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
flatten.op_type = "ncnnfused"
def __eliminate_flatten_after_innerproduct(self):
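        """Removes a Flatten that directly follows an InnerProduct, whose
        output is already flat."""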
for i, layer in enumerate(self.model.layers):
if layer.op_type == "InnerProduct":
# InnerProduct - Flatten
inprod_output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "Flatten":
continue
if self.model.layers[j].num_inputs != 1:
continue
if self.model.layers[j].inputs[0] == inprod_output:
break
else:
j += 1
if j == len(self.model.layers):
continue
flatten = self.model.layers[j]
layer.outputs[0] = flatten.outputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
flatten.op_type = "ncnnfused"
def __eliminate_reshape_before_binaryop(self):
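        """Removes a dimension-expanding Reshape in front of a BinaryOp by
        reconnecting the BinaryOp directly to the Reshape's input."""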
for i, layer in enumerate(self.model.layers):
if layer.op_type == "Reshape":
if (
layer.params[0].value != 1
or layer.params[1].value != 1
or layer.params[3].value != 1
):
continue
# Reshape - BinaryOp
reshape_output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "BinaryOp":
continue
if self.model.layers[j].num_inputs != 2:
continue
if (
self.model.layers[j].inputs[0] == reshape_output
or self.model.layers[j].inputs[1] == reshape_output
):
break
else:
j += 1
if j == len(self.model.layers):
continue
binaryop = self.model.layers[j]
input_blob_final = layer.inputs[0]
if binaryop.inputs[0] == reshape_output:
binaryop.inputs[0] = input_blob_final
if binaryop.inputs[1] == reshape_output:
binaryop.inputs[1] = input_blob_final
self.model.node_count -= 1
self.model.blob_count -= 1
layer.op_type = "ncnnfused"
def __replace_reduction_with_global_pooling(self):
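        """Replaces a pair of mean Reductions over the spatial axes with a
        single global average Pooling layer."""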
for i, layer in enumerate(self.model.layers):
if layer.op_type == "Reduction":
if (
layer.params[0].value != 3
or layer.params[1].value != 0
or layer.params[2].value != 1
):
continue
axes = checked_cast(list, layer.params[3].value)
if len(axes) != 1:
continue
if axes[0] != 2 and axes[0] != 3:
continue
# Reduction(2/3) - Reduction(2)
reduction1_output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "Reduction":
continue
if self.model.layers[j].num_inputs != 1:
continue
if self.model.layers[j].inputs[0] == reduction1_output:
break
else:
j += 1
if j == len(self.model.layers):
continue
reduction2 = self.model.layers[j]
if (
reduction2.params[0].value != 3
or reduction2.params[1].value != 0
or reduction2.params[2].value != 1
):
continue
                axes2 = checked_cast(list, reduction2.params[3].value)
if len(axes2) != 1:
continue
if axes2[0] != 2:
continue
pooling = NcnnLayer(
"Pooling",
reduction2.name,
reduction2.num_inputs,
reduction2.num_outputs,
reduction2.inputs,
reduction2.outputs,
)
pooling.add_param(0, 1)
pooling.add_param(4, 1)
self.model.layers[j] = pooling
pooling.inputs[0] = layer.inputs[0]
self.model.node_count -= 1
self.model.blob_count -= 1
layer.op_type = "ncnnfused"
def __replace_prelu_with_leaky_relu(self):
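        """Replaces single-slope PReLU layers with equivalent leaky ReLU
        layers."""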
for i, layer in enumerate(self.model.layers):
if layer.op_type == "PReLU":
if layer.params[0].value != 1:
continue
relu_layer = NcnnLayer(
"ReLU",
layer.name,
layer.num_inputs,
layer.num_outputs,
layer.inputs,
layer.outputs,
)
relu_layer.add_param(
0, checked_cast(float, layer.weight_data["slope"].weight[0])
)
self.model.layers[i] = relu_layer
def __replace_convolution_with_innerproduct_after_global_pooling(self):
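        """Replaces a Convolution that follows a global Pooling with an
        equivalent InnerProduct, since the pooled input is 1x1 per channel."""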
for i, layer in enumerate(self.model.layers):
if layer.op_type == "Pooling":
if layer.params[4].value == 0:
continue
# Pooling - Convolution
pooling_output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "Convolution":
continue
if self.model.layers[j].num_inputs != 1:
continue
if self.model.layers[j].inputs[0] == pooling_output:
break
else:
j += 1
if j == len(self.model.layers):
continue
convolution = self.model.layers[j]
innerproduct = NcnnLayer(
"InnerProduct",
convolution.name,
convolution.num_inputs,
convolution.num_outputs,
convolution.inputs,
convolution.outputs,
)
innerproduct.add_param(
0, checked_cast(int, convolution.params[0].value)
)
innerproduct.add_param(
1, checked_cast(int, convolution.params[5].value)
)
innerproduct.add_param(
2, checked_cast(int, convolution.params[6].value)
)
innerproduct.add_param(
8, checked_cast(int, convolution.params[8].value)
)
innerproduct.add_param(
9, checked_cast(int, convolution.params[9].value)
)
innerproduct.add_param(
10,
checked_cast(list, convolution.params[10].value),
)
innerproduct.add_weight(
"weight",
convolution.weight_data["weight"].weight,
convolution.weight_data["weight"].quantize_tag,
)
innerproduct.add_weight("bias", convolution.weight_data["bias"].weight)
self.model.layers[j] = innerproduct
def __replace_convolution_with_innerproduct_after_innerproduct(self):
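        """Repeatedly replaces Convolutions that follow an InnerProduct with
        equivalent InnerProducts, since the InnerProduct output is 1x1."""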
while True:
replaced = False
for i, layer in enumerate(self.model.layers):
if layer.op_type == "InnerProduct":
# InnerProduct - Convolution
inprod_output = layer.outputs[0]
j = i
for j in range(i + 1, len(self.model.layers)):
if self.model.layers[j].op_type != "Convolution":
continue
if self.model.layers[j].num_inputs != 1:
continue
if self.model.layers[j].inputs[0] == inprod_output:
break
else:
j += 1
if j == len(self.model.layers):
continue
convolution = self.model.layers[j]
innerproduct2 = NcnnLayer(
"InnerProduct",
convolution.name,
convolution.num_inputs,
convolution.num_outputs,
convolution.inputs,
convolution.outputs,
)
innerproduct2.add_param(
0, checked_cast(int, convolution.params[0].value)
)
innerproduct2.add_param(
1, checked_cast(int, convolution.params[5].value)
)
innerproduct2.add_param(
2, checked_cast(int, convolution.params[6].value)
)
innerproduct2.add_param(
8, checked_cast(int, convolution.params[8].value)
)
innerproduct2.add_param(
9, checked_cast(int, convolution.params[9].value)
)
innerproduct2.add_param(
10,
checked_cast(list, convolution.params[10].value),
)
innerproduct2.add_weight(
"weight",
convolution.weight_data["weight"].weight,
convolution.weight_data["weight"].quantize_tag,
)
                    if "bias" in convolution.weight_data:
                        innerproduct2.add_weight(
                            "bias", convolution.weight_data["bias"].weight
                        )
self.model.layers[j] = innerproduct2
replaced = True
if not replaced:
break
def optimize(self) -> None:
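        """Runs every fusion, elimination, and replacement pass over the model
        in a fixed order, so later passes can build on earlier ones."""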
self.__fuse_batchnorm_scale()
self.__fuse_x_batchnorm()
self.__fuse_x_mul()
self.__fuse_x_add()
self.__fuse_innerproduct_dropout()
self.__replace_reduction_with_global_pooling()
self.__replace_prelu_with_leaky_relu()
self.__fuse_x_activation()
self.__fuse_memorydata_binaryop()
self.__fuse_binaryop_eltwise()
self.__eliminate_dropout()
self.__eliminate_pooling1x1()
self.__eliminate_noop()
self.__eliminate_split()
self.__eliminate_flatten_after_global_pooling()
self.__eliminate_reshape_after_global_pooling()
self.__eliminate_reshape_before_binaryop()
self.__replace_convolution_with_innerproduct_after_global_pooling()
self.__replace_convolution_with_innerproduct_after_innerproduct()
self.__eliminate_flatten_after_innerproduct()
self.__eliminate_orphaned_memorydata()