import os from copy import deepcopy from io import BufferedReader, StringIO from json import load as jload from typing import Dict, List, Tuple, Union import numpy as np from nodes.log import logger from ...utils.checked_cast import checked_cast param_schema_file = os.path.join( os.path.dirname(os.path.realpath(__file__)), "param_schema.json" ) with open(param_schema_file, encoding="utf-8") as schemaf: param_schema = jload(schemaf) DTYPE_FP32 = b"\x00\x00\x00\x00" DTYPE_FP16 = b"\x47\x6b\x30\x01" DTYPE_DICT = {b"\x00\x00\x00\x00": np.float32, b"\x47\x6b\x30\x01": np.float16} class UnaryOpTypes: ABS = 0 NEG = 1 FLOOR = 2 CEIL = 3 SQUARE = 4 SQRT = 5 RSQ = 6 EXP = 7 LOG = 8 SIN = 9 COS = 10 TAN = 11 ASIN = 12 ACOS = 13 ATAN = 14 RECIPROCAL = 15 TANH = 16 class BinaryOpTypes: ADD = 0 SUB = 1 MUL = 2 DIV = 3 MAX = 4 MIN = 5 POW = 6 RSUB = 7 RDIV = 8 class CastElementTypes: AUTO = 0 FLOAT32 = 1 FLOAT16 = 2 INT8 = 3 BFLOAT16 = 4 class EltwiseOpTypes: PROD = 0 SUM = 1 MAX = 2 class GruDirectionFlags: FORWARD = 0 REVERSE = 1 BIDIRECTIONAL = 2 class InterpResizeTypes: NEAREST = 1 BILINEAR = 2 BICUBIC = 3 class NormalizeEpsModes: CAFFE = 0 PYTORCH = 1 TENSORFLOW = 2 class PaddingTypes: CONSTANT = 0 REPLICATE = 1 REFLECT = 2 class PadModes: FULL = 0 VALID = 1 SAMEUPPER = 2 SAMELOWER = 3 class PermuteOrderTypes: WH_WHC_WHDC = 0 HW_HWC_HWDC = 1 WCH_WDHC = 2 CWH_DWHC = 3 HCW_HDWC = 4 CHW_DHWC = 5 WHCD = 6 HWCD = 7 WCHD = 8 CWHD = 9 HCWD = 10 CHWD = 11 WDCH = 12 DWCH = 13 WCDH = 14 CWDH = 15 DCWH = 16 CDWH = 17 HDCW = 18 DHCW = 19 HCDW = 20 CHDW = 21 DCHW = 22 CDHW = 23 class ReductionOpTypes: SUM = 0 ASUM = 1 SUMSQ = 2 MEAN = 3 MAX = 4 MIN = 5 PROD = 6 L1 = 7 L2 = 8 LOGSUM = 9 LOGSUMEXP = 10 class GridSampleSampleTypes: NEAREST = 1 BILINEAR = 2 BICUBIC = 3 class GridSamplePadModes: ZEROS = 1 BORDER = 2 REFLECTION = 3 class LrnRegionTypes: ACROSS_CHANNELS = 0 WITH_CHANNEL = 1 class NcnnWeight: def __init__(self, weight: np.ndarray, quantize_tag: bytes = b""): self.quantize_tag = quantize_tag self.weight = weight @property def shape(self) -> tuple: return self.weight.shape class NcnnParam: def __init__( self, pid: str, name: str, value: Union[float, int, List[Union[float, int]]], default: Union[float, int], ) -> None: self.id: str = pid self.name: str = name self.value: Union[float, int, List[Union[float, int]]] = value self.default: Union[float, int] = default class NcnnParamCollection: def __init__( self, op: str, param_dict: Union[Dict[int, NcnnParam], None] = None, ) -> None: self.op: str = op self.param_dict: Dict[int, NcnnParam] = {} if param_dict is None else param_dict self.weight_order: Dict[str, List[int]] = ( param_schema[self.op]["weightOrder"] if self.op else {} ) def __getitem__(self, pid: int) -> NcnnParam: try: return self.param_dict[pid] except KeyError as exc: idstr = str(pid) param_dict = param_schema[self.op] try: param = param_dict[idstr] except KeyError: logger.error(f"chaiNNer: op {self.op} does not have param {pid}, please report") raise defaultValue = param["defaultValue"] value = param["defaultValue"] if isinstance(value, str): for key, val in list(param_dict.items())[:-1]: if value == val["paramPhase"]: try: value = self.param_dict[int(key)].value except KeyError: value = val["defaultValue"] defaultValue = val["defaultValue"] break else: msg = f"Op {self.op} does not have param {value}, please report" raise KeyError(msg) from exc return NcnnParam(idstr, param["paramPhase"], value, defaultValue) def __setitem__( self, pid: int, value: Union[float, int, List[Union[float, int]]] ) -> None: idstr = str(pid) param_dict = param_schema[self.op] try: param = param_dict[idstr] except KeyError: logger.error(f"chaiNNer: op {self.op} does not have param {idstr}, please report") raise name = param["paramPhase"] def_val = param["defaultValue"] self.param_dict[pid] = NcnnParam(idstr, name, value, def_val) def __delitem__(self, key: int) -> None: try: del self.param_dict[key] except KeyError: pass def __contains__(self, item) -> bool: if item in self.param_dict: return True return False def __str__(self) -> str: output = "" param_dict = param_schema[self.op] self.param_dict = dict(sorted(self.param_dict.items())) for v in self.param_dict.values(): if v.value == v.default: continue if isinstance(v.default, str): pid = None for key, val in list(param_dict.items())[:-1]: if v.default == val["paramPhase"]: pid = int(key) break else: msg = f"Op {self.op} does not have param {v.default}, please report" raise KeyError(msg) # If a param that defaults to the value of another param, if it's value # equals that of the second param or its default, skip writing it if ( v.value == self.param_dict[pid].value or v.value == self.param_dict[pid].default ): continue if isinstance(v.value, list): output += " -233" + v.id.zfill(2) + "=" else: output += " " + v.id + "=" if isinstance(v.value, float): v_str = np.format_float_scientific(v.value, 6, False, exp_digits=2) elif isinstance(v.value, list): v_str = ",".join( [ np.format_float_scientific(n, 6, False, exp_digits=2) if isinstance(n, float) else str(n) for n in v.value ] ) else: v_str = str(v.value) output += v_str return output def set_op(self, op: str) -> None: self.op = op self.weight_order = param_schema[op]["weightOrder"] class NcnnLayer: def __init__( self, op_type: str = "", name: str = "", num_inputs: int = 0, num_outputs: int = 0, inputs: Union[List[str], None] = None, outputs: Union[List[str], None] = None, params: Union[NcnnParamCollection, None] = None, weight_data: Union[Dict[str, NcnnWeight], None] = None, ): self.op_type: str = op_type self.name: str = name self.num_inputs: int = num_inputs self.num_outputs: int = num_outputs self.inputs: List[str] = [] if inputs is None else inputs self.outputs: List[str] = [] if outputs is None else outputs self.params: NcnnParamCollection = ( NcnnParamCollection(op_type) if params is None else params ) self.weight_data: Dict[str, NcnnWeight] = ( {} if weight_data is None else weight_data ) def add_param( self, pid: int, value: Union[float, int, List[Union[float, int]]] ) -> None: self.params[pid] = value def add_weight( self, weight_name: str, data: Union[float, int, np.ndarray], quantize_tag: bytes = b"", ) -> int: if isinstance(data, float): data_array = np.array(data, np.float32) elif isinstance(data, int): data_array = np.array(data, np.int32) else: data_array = data if quantize_tag == DTYPE_FP16: data_array = data_array.astype(np.float16) else: # Since int8 not supported, all data that is not fp16 is fp32. # This covers issues caused by converting fp16 ONNX models. data_array = data_array.astype(np.float32) self.weight_data[weight_name] = NcnnWeight(data_array, quantize_tag) return len(quantize_tag) + len(data_array.tobytes()) class NcnnModel: def __init__( self, node_count: int = 0, blob_count: int = 0, ) -> None: self.node_count: int = node_count self.blob_count: int = blob_count self.layers: List[NcnnLayer] = [] self.bin_length = 0 @property def magic(self): return "7767517" @staticmethod def load_from_file(param_path: str = "", bin_path: str = "") -> "NcnnModel": if bin_path == "": bin_path = param_path.replace(".param", ".bin") elif param_path == "": param_path = bin_path.replace(".bin", ".param") model = NcnnModel() with open(param_path, "r", encoding="utf-8") as paramf: with open(bin_path, "rb") as binf: paramf.readline() counts = paramf.readline().strip().split(" ") model.node_count = int(counts[0]) model.blob_count = int(counts[1]) for line in paramf: op_type, layer = model.parse_param_layer(line) layer.weight_data = model.load_layer_weights(binf, op_type, layer) model.add_layer(layer) binf.seek(0, os.SEEK_END) model.bin_length = binf.tell() return model @staticmethod def interp_layers( a: NcnnLayer, b: NcnnLayer, alpha_a: float ) -> Tuple[NcnnLayer, bytes]: weights_a = a.weight_data weights_b = b.weight_data weights_interp: Dict[str, NcnnWeight] = {} layer_bytes = b"" if weights_a: assert len(weights_a) == len( weights_b ), "All corresponding nodes must have same number of weights" layer_bytes_list = [] for weight_name, weight_a in weights_a.items(): try: weight_b = weights_b[weight_name] except KeyError: logger.error(f"chaiNNer: weights in node {a.name} and {b.name} do not correspond") raise assert ( weight_a.shape == weight_b.shape ), "Corresponding weights must have the same size and shape" assert len(weight_a.quantize_tag) == len( weight_b.quantize_tag ), "Weights must either both have or both not have a quantize tag" if ( weight_a.quantize_tag == DTYPE_FP16 and weight_b.quantize_tag == DTYPE_FP32 ): weight_b.quantize_tag = DTYPE_FP16 weight_b.weight = weight_b.weight.astype(np.float16) elif ( weight_a.quantize_tag == DTYPE_FP32 and weight_b.quantize_tag == DTYPE_FP16 ): weight_a.quantize_tag = DTYPE_FP16 weight_a.weight = weight_a.weight.astype(np.float16) weight_c = NcnnWeight( (weight_a.weight * alpha_a + weight_b.weight * (1 - alpha_a)), weight_a.quantize_tag, ) layer_bytes_list.append( weight_c.quantize_tag + weight_c.weight.tobytes() ) weights_interp[weight_name] = weight_c layer_bytes = b"".join(layer_bytes_list) return ( NcnnLayer( a.op_type, a.name, a.num_inputs, a.num_outputs, a.inputs, a.outputs, a.params, weights_interp, ), layer_bytes, ) def add_layer(self, layer: NcnnLayer) -> None: self.layers.append(layer) def parse_param_layer(self, layer_str: str) -> Tuple[str, NcnnLayer]: param_list = layer_str.strip().split() op_type, name = param_list[:2] assert op_type != "MemoryData", "This NCNN param file contains invalid layers" num_inputs = int(param_list[2]) num_outputs = int(param_list[3]) input_end = 4 + num_inputs output_end = input_end + num_outputs inputs = list(param_list[4:input_end]) outputs = list(param_list[input_end:output_end]) params = param_list[output_end:] param_dict = {} for param_str in params: ks, vs = param_str.split("=") k = int(ks) if k < 0: v = [] for vi in vs.split(","): vi = float(vi) if "." in vi or "e" in vi else int(vi) v.append(vi) k = abs(k + 23300) ks = str(k) elif "." in vs or "e" in vs: v = float(vs) else: v = int(vs) param = NcnnParam( ks, param_schema[op_type][ks]["paramPhase"], v, param_schema[op_type][ks]["defaultValue"], ) param_dict[k] = param return op_type, NcnnLayer( op_type, name, num_inputs, num_outputs, inputs, outputs, NcnnParamCollection(op_type, param_dict), ) def load_layer_weights( self, binf: BufferedReader, op_type: str, layer: NcnnLayer ) -> Dict[str, NcnnWeight]: weight_dict = {} if op_type == "BatchNorm": channels_data = checked_cast(int, layer.params[0].value) * 4 slope = np.frombuffer(binf.read(channels_data), np.float32) weight_dict["slope"] = NcnnWeight(slope) mean = np.frombuffer(binf.read(channels_data), np.float32) weight_dict["mean"] = NcnnWeight(mean) variance = np.frombuffer(binf.read(channels_data), np.float32) weight_dict["variance"] = NcnnWeight(variance) bias = np.frombuffer(binf.read(channels_data), np.float32) weight_dict["bias"] = NcnnWeight(bias) elif op_type in ("Convolution", "ConvolutionDepthWise"): quantize_tag = binf.read(4) dtype = DTYPE_DICT[quantize_tag] weight_data_length = checked_cast(int, layer.params[6].value) weight_data_size = ( weight_data_length * 2 if quantize_tag == DTYPE_FP16 else weight_data_length * 4 ) has_bias = layer.params[5].value num_filters = checked_cast(int, layer.params[0].value) kernel_w = checked_cast(int, layer.params[1].value) kernel_h = checked_cast(int, layer.params[11].value) if op_type == "ConvolutionDepthWise": group = checked_cast(int, layer.params[7].value) num_input = ( weight_data_length // (num_filters // group) // kernel_w // kernel_h ) shape = ( group, num_filters // group, num_input // group, kernel_h, kernel_w, ) else: num_input = weight_data_length // num_filters // kernel_w // kernel_h shape = (num_filters, num_input, kernel_h, kernel_w) weight_data = np.frombuffer(binf.read(weight_data_size), dtype) weight_data = weight_data.reshape(shape) weight_dict["weight"] = NcnnWeight(weight_data, quantize_tag) if has_bias: bias_data_size = num_filters * 4 bias_data = np.frombuffer(binf.read(bias_data_size), np.float32) weight_dict["bias"] = NcnnWeight(bias_data) elif op_type == "Deconvolution": quantize_tag = binf.read(4) dtype = DTYPE_DICT[quantize_tag] weight_data_length = checked_cast(int, layer.params[6].value) weight_data_size = ( weight_data_length * 2 if quantize_tag == DTYPE_FP16 else weight_data_length * 4 ) has_bias = layer.params[5].value num_filters = checked_cast(int, layer.params[0].value) kernel_w = checked_cast(int, layer.params[1].value) kernel_h = checked_cast(int, layer.params[11].value) num_input = weight_data_length // num_filters // kernel_w // kernel_h shape = (num_filters, num_input, kernel_h, kernel_w) weight_data = np.frombuffer(binf.read(weight_data_size), dtype) weight_data = weight_data.reshape(shape) weight_dict["weight"] = NcnnWeight(weight_data, quantize_tag) if has_bias: bias_data_size = num_filters * 4 bias_data = np.frombuffer(binf.read(bias_data_size), np.float32) weight_dict["bias"] = NcnnWeight(bias_data) elif op_type == "InnerProduct": quantize_tag = binf.read(4) dtype = DTYPE_DICT[quantize_tag] weight_data_length = layer.params[2].value assert isinstance(weight_data_length, int), "Weight data size must be int" weight_data_size = ( weight_data_length * 2 if quantize_tag == DTYPE_FP16 else weight_data_length * 4 ) weight_data = np.frombuffer(binf.read(weight_data_size), dtype) num_output = layer.params[0].value assert isinstance(num_output, int), "Num output must be int" num_input = weight_data_length // num_output weight_data = weight_data.reshape((num_input, num_output)) weight_dict["weight"] = NcnnWeight(weight_data, quantize_tag) has_bias = layer.params[1].value if has_bias == 1: bias_data_size = num_output * 4 bias_data = np.frombuffer(binf.read(bias_data_size), np.float32) weight_dict["bias"] = NcnnWeight(bias_data) elif op_type == "PReLU": num_slope = layer.params[0].value assert isinstance(num_slope, int), "Num slopes must be int" slope_data_size = num_slope * 4 slope_data = np.frombuffer(binf.read(slope_data_size), np.float32) weight_dict["slope"] = NcnnWeight(slope_data) elif op_type == "Scale": scale_data_length = layer.params[0].value assert isinstance(scale_data_length, int), "Scale data size must be int" if scale_data_length != -233: quantize_tag = binf.read(4) dtype = DTYPE_DICT[quantize_tag] scale_data_size = ( scale_data_length * 2 if quantize_tag == DTYPE_FP16 else scale_data_length * 4 ) scale_data = np.frombuffer(binf.read(scale_data_size), dtype) weight_dict["weight"] = NcnnWeight(scale_data, quantize_tag) has_bias = layer.params[1].value if has_bias == 1: bias_data = np.frombuffer( binf.read(scale_data_length * 4), np.float32 ) weight_dict["bias"] = NcnnWeight(bias_data) else: if len(layer.params.weight_order) != 0: error_msg = f"Load weights not added for {op_type} yet, please report" raise ValueError(error_msg) return weight_dict def write_param(self, filename: str = "") -> str: with StringIO() as p: p.write(f"{self.magic}\n{self.node_count} {self.blob_count}\n") for layer in self.layers: if layer.op_type == "ncnnfused": continue p.write( f"{layer.op_type:<16}" f" {layer.name:<24}" f" {layer.num_inputs}" f" {layer.num_outputs}" ) if layer.inputs: p.write(f" {' '.join(layer.inputs)}") if layer.outputs: p.write(f" {' '.join(layer.outputs)}") if layer.params.param_dict: param_str = str(layer.params) if param_str: p.write(f"{param_str}") p.write("\n") if filename: with open(filename, "w", encoding="utf-8") as f: f.write(p.getvalue()) return "" else: return p.getvalue() def serialize_weights(self) -> bytes: layer_weights = [ b"".join((w.quantize_tag, np.ndarray.tobytes(w.weight))) for l in self.layers for w in l.weight_data.values() if l.weight_data and l.op_type != "ncnnfused" ] return b"".join(layer_weights) def write_bin(self, filename: str) -> None: with open(filename, "wb") as f: f.write(self.serialize_weights()) def interpolate(self, model_b: "NcnnModel", alpha: float) -> "NcnnModel": interp_model = deepcopy(self) layer_a_weights = [(i, l) for i, l in enumerate(self.layers) if l.weight_data] layer_b_weights = [ (i, l) for i, l in enumerate(model_b.layers) if l.weight_data ] assert len(layer_a_weights) == len( layer_b_weights ), "Models must have same number of layers containing weights" weight_bytes_list = [] for layer_a, layer_b in zip(layer_a_weights, layer_b_weights): interp_layer, layer_bytes = NcnnModel.interp_layers( layer_a[1], layer_b[1], alpha ) interp_model.layers[layer_a[0]] = interp_layer weight_bytes_list.append(layer_bytes) return interp_model @property def bin(self) -> bytes: return self.serialize_weights() class NcnnModelWrapper: def __init__(self, model: NcnnModel) -> None: self.model: NcnnModel = model scale, in_nc, out_nc, nf, fp = NcnnModelWrapper.get_broadcast_data(model) self.scale: int = scale self.nf: int = nf self.in_nc: int = in_nc self.out_nc: int = out_nc self.fp: str = fp @staticmethod def get_broadcast_data(model: NcnnModel) -> Tuple[int, int, int, int, str]: scale = 1.0 in_nc = 0 out_nc = 0 nf = 0 fp = "fp32" pixel_shuffle = 1 found_first_conv = False current_conv = None for i, layer in enumerate(model.layers): if layer.op_type == "Interp": try: if ( model.layers[i + 1].op_type != "BinaryOp" and model.layers[i + 1].params[0].value != 0 ): scale *= checked_cast(float, layer.params[1].value) except IndexError: scale *= checked_cast(float, layer.params[1].value) elif layer.op_type == "PixelShuffle": scale *= checked_cast(int, layer.params[0].value) pixel_shuffle *= checked_cast(int, layer.params[0].value) elif layer.op_type in ( "Convolution", "Convolution1D", "ConvolutionDepthWise", ): if found_first_conv is not True: nf, in_nc = NcnnModelWrapper.get_nf_and_in_nc(layer) if layer.weight_data["weight"].quantize_tag == DTYPE_FP16: fp = "fp16" found_first_conv = True scale /= checked_cast(int, layer.params[3].value) current_conv = layer elif layer.op_type in ("Deconvolution", "DeconvolutionDepthWise"): if found_first_conv is not True: nf, in_nc = NcnnModelWrapper.get_nf_and_in_nc(layer) found_first_conv = True scale *= checked_cast(int, layer.params[3].value) current_conv = layer assert ( current_conv is not None ), "Cannot broadcast; model has no Convolution layers" out_nc = checked_cast(int, current_conv.params[0].value) // pixel_shuffle**2 assert scale >= 1, "Models with scale less than 1x not supported" assert scale % 1 == 0, f"Model not supported, scale {scale} is not an integer" return int(scale), in_nc, out_nc, nf, fp @staticmethod def get_nf_and_in_nc(layer: NcnnLayer) -> Tuple[int, int]: nf = layer.params[0].value kernel_w = layer.params[1].value try: kernel_h = layer.params[11].value except KeyError: kernel_h = kernel_w weight_data_size = layer.params[6].value assert ( isinstance(nf, int) and isinstance(kernel_w, int) and isinstance(kernel_h, int) and isinstance(weight_data_size, int) ), "Out nc, kernel width and height, and weight data size must all be ints" in_nc = weight_data_size // nf // kernel_w // kernel_h return nf, in_nc