|
|
|
""" |
|
Export a YOLO PyTorch model to other formats. TensorFlow exports authored by https://github.com/zldrobit. |
|
|
|
Format | `format=argument` | Model |
|
--- | --- | --- |
|
PyTorch | - | yolo11n.pt |
|
TorchScript | `torchscript` | yolo11n.torchscript |
|
ONNX | `onnx` | yolo11n.onnx |
|
OpenVINO | `openvino` | yolo11n_openvino_model/ |
|
TensorRT | `engine` | yolo11n.engine |
|
CoreML | `coreml` | yolo11n.mlpackage |
|
TensorFlow SavedModel | `saved_model` | yolo11n_saved_model/ |
|
TensorFlow GraphDef | `pb` | yolo11n.pb |
|
TensorFlow Lite | `tflite` | yolo11n.tflite |
|
TensorFlow Edge TPU | `edgetpu` | yolo11n_edgetpu.tflite |
|
TensorFlow.js | `tfjs` | yolo11n_web_model/ |
|
PaddlePaddle | `paddle` | yolo11n_paddle_model/ |
|
MNN | `mnn` | yolo11n.mnn |
|
NCNN | `ncnn` | yolo11n_ncnn_model/ |
|
IMX | `imx` | yolo11n_imx_model/ |
|
|
|
Requirements: |
|
$ pip install "ultralytics[export]" |
|
|
|
Python: |
|
from ultralytics import YOLO |
|
model = YOLO('yolo11n.pt') |
|
results = model.export(format='onnx') |
|
|
|
CLI: |
|
$ yolo mode=export model=yolo11n.pt format=onnx |
|
|
|
Inference: |
|
$ yolo predict model=yolo11n.pt # PyTorch |
|
yolo11n.torchscript # TorchScript |
|
yolo11n.onnx # ONNX Runtime or OpenCV DNN with dnn=True |
|
yolo11n_openvino_model # OpenVINO |
|
yolo11n.engine # TensorRT |
|
yolo11n.mlpackage # CoreML (macOS-only) |
|
yolo11n_saved_model # TensorFlow SavedModel |
|
yolo11n.pb # TensorFlow GraphDef |
|
yolo11n.tflite # TensorFlow Lite |
|
yolo11n_edgetpu.tflite # TensorFlow Edge TPU |
|
yolo11n_paddle_model # PaddlePaddle |
|
yolo11n.mnn # MNN |
|
yolo11n_ncnn_model # NCNN |
|
yolo11n_imx_model # IMX |
|
|
|
TensorFlow.js: |
|
$ cd .. && git clone https://github.com/zldrobit/tfjs-yolov5-example.git && cd tfjs-yolov5-example |
|
$ npm install |
|
$ ln -s ../../yolo11n_web_model public/yolo11n_web_model |
|
$ npm start |
|
""" |
|
|
|
import gc |
|
import json |
|
import os |
|
import shutil |
|
import subprocess |
|
import time |
|
import warnings |
|
from copy import deepcopy |
|
from datetime import datetime |
|
from pathlib import Path |
|
|
|
import numpy as np |
|
import torch |
|
|
|
from ultralytics.cfg import TASK2DATA, get_cfg |
|
from ultralytics.data import build_dataloader |
|
from ultralytics.data.dataset import YOLODataset |
|
from ultralytics.data.utils import check_cls_dataset, check_det_dataset |
|
from ultralytics.nn.autobackend import check_class_names, default_class_names |
|
from ultralytics.nn.modules import C2f, Classify, Detect, RTDETRDecoder |
|
from ultralytics.nn.tasks import DetectionModel, SegmentationModel, WorldModel |
|
from ultralytics.utils import ( |
|
ARM64, |
|
DEFAULT_CFG, |
|
IS_JETSON, |
|
LINUX, |
|
LOGGER, |
|
MACOS, |
|
PYTHON_VERSION, |
|
ROOT, |
|
WINDOWS, |
|
__version__, |
|
callbacks, |
|
colorstr, |
|
get_default_args, |
|
yaml_save, |
|
) |
|
from ultralytics.utils.checks import ( |
|
check_imgsz, |
|
check_is_path_safe, |
|
check_requirements, |
|
check_version, |
|
is_sudo_available, |
|
) |
|
from ultralytics.utils.downloads import attempt_download_asset, get_github_assets, safe_download |
|
from ultralytics.utils.files import file_size, spaces_in_path |
|
from ultralytics.utils.ops import Profile |
|
from ultralytics.utils.torch_utils import TORCH_1_13, get_latest_opset, select_device |
|
|
|
|
|
def export_formats(): |
|
"""Ultralytics YOLO export formats.""" |
|
x = [ |
|
["PyTorch", "-", ".pt", True, True, []], |
|
["TorchScript", "torchscript", ".torchscript", True, True, ["batch", "optimize"]], |
|
["ONNX", "onnx", ".onnx", True, True, ["batch", "dynamic", "half", "opset", "simplify"]], |
|
["OpenVINO", "openvino", "_openvino_model", True, False, ["batch", "dynamic", "half", "int8"]], |
|
["TensorRT", "engine", ".engine", False, True, ["batch", "dynamic", "half", "int8", "simplify"]], |
|
["CoreML", "coreml", ".mlpackage", True, False, ["batch", "half", "int8", "nms"]], |
|
["TensorFlow SavedModel", "saved_model", "_saved_model", True, True, ["batch", "int8", "keras"]], |
|
["TensorFlow GraphDef", "pb", ".pb", True, True, ["batch"]], |
|
["TensorFlow Lite", "tflite", ".tflite", True, False, ["batch", "half", "int8"]], |
|
["TensorFlow Edge TPU", "edgetpu", "_edgetpu.tflite", True, False, []], |
|
["TensorFlow.js", "tfjs", "_web_model", True, False, ["batch", "half", "int8"]], |
|
["PaddlePaddle", "paddle", "_paddle_model", True, True, ["batch"]], |
|
["MNN", "mnn", ".mnn", True, True, ["batch", "half", "int8"]], |
|
["NCNN", "ncnn", "_ncnn_model", True, True, ["batch", "half"]], |
|
["IMX", "imx", "_imx_model", True, True, ["int8"]], |
|
] |
|
return dict(zip(["Format", "Argument", "Suffix", "CPU", "GPU", "Arguments"], zip(*x))) |
|
|
|
|
|
def validate_args(format, passed_args, valid_args): |
|
""" |
|
Validates arguments based on format. |
|
|
|
Args: |
|
format (str): The export format. |
|
passed_args (Namespace): The arguments used during export. |
|
valid_args (dict): List of valid arguments for the format. |
|
|
|
Raises: |
|
AssertionError: If an argument that's not supported by the export format is used, or if format doesn't have the supported arguments listed. |
|
""" |
|
|
|
export_args = ["half", "int8", "dynamic", "keras", "nms", "batch"] |
|
|
|
assert valid_args is not None, f"ERROR ❌️ valid arguments for '{format}' not listed." |
|
custom = {"batch": 1, "data": None, "device": None} |
|
default_args = get_cfg(DEFAULT_CFG, custom) |
|
for arg in export_args: |
|
not_default = getattr(passed_args, arg, None) != getattr(default_args, arg, None) |
|
if not_default: |
|
assert arg in valid_args, f"ERROR ❌️ argument '{arg}' is not supported for format='{format}'" |
|
|
|
|
|
def gd_outputs(gd): |
|
"""TensorFlow GraphDef model output node names.""" |
|
name_list, input_list = [], [] |
|
for node in gd.node: |
|
name_list.append(node.name) |
|
input_list.extend(node.input) |
|
return sorted(f"{x}:0" for x in list(set(name_list) - set(input_list)) if not x.startswith("NoOp")) |
|
|
|
|
|
def try_export(inner_func): |
|
"""YOLO export decorator, i.e. @try_export.""" |
|
inner_args = get_default_args(inner_func) |
|
|
|
def outer_func(*args, **kwargs): |
|
"""Export a model.""" |
|
prefix = inner_args["prefix"] |
|
try: |
|
with Profile() as dt: |
|
f, model = inner_func(*args, **kwargs) |
|
LOGGER.info(f"{prefix} export success ✅ {dt.t:.1f}s, saved as '{f}' ({file_size(f):.1f} MB)") |
|
return f, model |
|
except Exception as e: |
|
LOGGER.error(f"{prefix} export failure ❌ {dt.t:.1f}s: {e}") |
|
raise e |
|
|
|
return outer_func |
|
|
|
|
|
class Exporter: |
|
""" |
|
A class for exporting a model. |
|
|
|
Attributes: |
|
args (SimpleNamespace): Configuration for the exporter. |
|
callbacks (list, optional): List of callback functions. Defaults to None. |
|
""" |
|
|
|
def __init__(self, cfg=DEFAULT_CFG, overrides=None, _callbacks=None): |
|
""" |
|
Initializes the Exporter class. |
|
|
|
Args: |
|
cfg (str, optional): Path to a configuration file. Defaults to DEFAULT_CFG. |
|
overrides (dict, optional): Configuration overrides. Defaults to None. |
|
_callbacks (dict, optional): Dictionary of callback functions. Defaults to None. |
|
""" |
|
self.args = get_cfg(cfg, overrides) |
|
if self.args.format.lower() in {"coreml", "mlmodel"}: |
|
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python" |
|
|
|
self.callbacks = _callbacks or callbacks.get_default_callbacks() |
|
callbacks.add_integration_callbacks(self) |
|
|
|
def __call__(self, model=None) -> str: |
|
"""Returns list of exported files/dirs after running callbacks.""" |
|
self.run_callbacks("on_export_start") |
|
t = time.time() |
|
fmt = self.args.format.lower() |
|
if fmt in {"tensorrt", "trt"}: |
|
fmt = "engine" |
|
if fmt in {"mlmodel", "mlpackage", "mlprogram", "apple", "ios", "coreml"}: |
|
fmt = "coreml" |
|
fmts_dict = export_formats() |
|
fmts = tuple(fmts_dict["Argument"][1:]) |
|
if fmt not in fmts: |
|
import difflib |
|
|
|
|
|
matches = difflib.get_close_matches(fmt, fmts, n=1, cutoff=0.6) |
|
if not matches: |
|
raise ValueError(f"Invalid export format='{fmt}'. Valid formats are {fmts}") |
|
LOGGER.warning(f"WARNING ⚠️ Invalid export format='{fmt}', updating to format='{matches[0]}'") |
|
fmt = matches[0] |
|
flags = [x == fmt for x in fmts] |
|
if sum(flags) != 1: |
|
raise ValueError(f"Invalid export format='{fmt}'. Valid formats are {fmts}") |
|
( |
|
jit, |
|
onnx, |
|
xml, |
|
engine, |
|
coreml, |
|
saved_model, |
|
pb, |
|
tflite, |
|
edgetpu, |
|
tfjs, |
|
paddle, |
|
mnn, |
|
ncnn, |
|
imx, |
|
) = flags |
|
is_tf_format = any((saved_model, pb, tflite, edgetpu, tfjs)) |
|
|
|
|
|
dla = None |
|
if fmt == "engine" and self.args.device is None: |
|
LOGGER.warning("WARNING ⚠️ TensorRT requires GPU export, automatically assigning device=0") |
|
self.args.device = "0" |
|
if fmt == "engine" and "dla" in str(self.args.device): |
|
dla = self.args.device.split(":")[-1] |
|
self.args.device = "0" |
|
assert dla in {"0", "1"}, f"Expected self.args.device='dla:0' or 'dla:1, but got {self.args.device}." |
|
self.device = select_device("cpu" if self.args.device is None else self.args.device) |
|
|
|
|
|
fmt_keys = fmts_dict["Arguments"][flags.index(True) + 1] |
|
validate_args(fmt, self.args, fmt_keys) |
|
if imx and not self.args.int8: |
|
LOGGER.warning("WARNING ⚠️ IMX only supports int8 export, setting int8=True.") |
|
self.args.int8 = True |
|
if not hasattr(model, "names"): |
|
model.names = default_class_names() |
|
model.names = check_class_names(model.names) |
|
if self.args.half and self.args.int8: |
|
LOGGER.warning("WARNING ⚠️ half=True and int8=True are mutually exclusive, setting half=False.") |
|
self.args.half = False |
|
if self.args.half and onnx and self.device.type == "cpu": |
|
LOGGER.warning("WARNING ⚠️ half=True only compatible with GPU export, i.e. use device=0") |
|
self.args.half = False |
|
assert not self.args.dynamic, "half=True not compatible with dynamic=True, i.e. use only one." |
|
self.imgsz = check_imgsz(self.args.imgsz, stride=model.stride, min_dim=2) |
|
if self.args.int8 and engine: |
|
self.args.dynamic = True |
|
if self.args.optimize: |
|
assert not ncnn, "optimize=True not compatible with format='ncnn', i.e. use optimize=False" |
|
assert self.device.type == "cpu", "optimize=True not compatible with cuda devices, i.e. use device='cpu'" |
|
if self.args.int8 and tflite: |
|
assert not getattr(model, "end2end", False), "TFLite INT8 export not supported for end2end models." |
|
if edgetpu: |
|
if not LINUX: |
|
raise SystemError("Edge TPU export only supported on Linux. See https://coral.ai/docs/edgetpu/compiler") |
|
elif self.args.batch != 1: |
|
LOGGER.warning("WARNING ⚠️ Edge TPU export requires batch size 1, setting batch=1.") |
|
self.args.batch = 1 |
|
if isinstance(model, WorldModel): |
|
LOGGER.warning( |
|
"WARNING ⚠️ YOLOWorld (original version) export is not supported to any format.\n" |
|
"WARNING ⚠️ YOLOWorldv2 models (i.e. 'yolov8s-worldv2.pt') only support export to " |
|
"(torchscript, onnx, openvino, engine, coreml) formats. " |
|
"See https://docs.ultralytics.com/models/yolo-world for details." |
|
) |
|
model.clip_model = None |
|
if self.args.int8 and not self.args.data: |
|
self.args.data = DEFAULT_CFG.data or TASK2DATA[getattr(model, "task", "detect")] |
|
LOGGER.warning( |
|
"WARNING ⚠️ INT8 export requires a missing 'data' arg for calibration. " |
|
f"Using default 'data={self.args.data}'." |
|
) |
|
|
|
|
|
im = torch.zeros(self.args.batch, 3, *self.imgsz).to(self.device) |
|
file = Path( |
|
getattr(model, "pt_path", None) or getattr(model, "yaml_file", None) or model.yaml.get("yaml_file", "") |
|
) |
|
if file.suffix in {".yaml", ".yml"}: |
|
file = Path(file.name) |
|
|
|
|
|
model = deepcopy(model).to(self.device) |
|
for p in model.parameters(): |
|
p.requires_grad = False |
|
model.eval() |
|
model.float() |
|
model = model.fuse() |
|
|
|
if imx: |
|
from ultralytics.utils.torch_utils import FXModel |
|
|
|
model = FXModel(model) |
|
for m in model.modules(): |
|
if isinstance(m, Classify): |
|
m.export = True |
|
if isinstance(m, (Detect, RTDETRDecoder)): |
|
m.dynamic = self.args.dynamic |
|
m.export = True |
|
m.format = self.args.format |
|
m.max_det = self.args.max_det |
|
elif isinstance(m, C2f) and not is_tf_format: |
|
|
|
m.forward = m.forward_split |
|
if isinstance(m, Detect) and imx: |
|
from ultralytics.utils.tal import make_anchors |
|
|
|
m.anchors, m.strides = ( |
|
x.transpose(0, 1) |
|
for x in make_anchors( |
|
torch.cat([s / m.stride.unsqueeze(-1) for s in self.imgsz], dim=1), m.stride, 0.5 |
|
) |
|
) |
|
|
|
y = None |
|
for _ in range(2): |
|
y = model(im) |
|
if self.args.half and onnx and self.device.type != "cpu": |
|
im, model = im.half(), model.half() |
|
|
|
|
|
warnings.filterwarnings("ignore", category=torch.jit.TracerWarning) |
|
warnings.filterwarnings("ignore", category=UserWarning) |
|
warnings.filterwarnings("ignore", category=DeprecationWarning) |
|
|
|
|
|
self.im = im |
|
self.model = model |
|
self.file = file |
|
self.output_shape = ( |
|
tuple(y.shape) |
|
if isinstance(y, torch.Tensor) |
|
else tuple(tuple(x.shape if isinstance(x, torch.Tensor) else []) for x in y) |
|
) |
|
self.pretty_name = Path(self.model.yaml.get("yaml_file", self.file)).stem.replace("yolo", "YOLO") |
|
data = model.args["data"] if hasattr(model, "args") and isinstance(model.args, dict) else "" |
|
description = f"Ultralytics {self.pretty_name} model {f'trained on {data}' if data else ''}" |
|
self.metadata = { |
|
"description": description, |
|
"author": "Ultralytics", |
|
"date": datetime.now().isoformat(), |
|
"version": __version__, |
|
"license": "AGPL-3.0 License (https://ultralytics.com/license)", |
|
"docs": "https://docs.ultralytics.com", |
|
"stride": int(max(model.stride)), |
|
"task": model.task, |
|
"batch": self.args.batch, |
|
"imgsz": self.imgsz, |
|
"names": model.names, |
|
"args": {k: v for k, v in self.args if k in fmt_keys}, |
|
} |
|
if model.task == "pose": |
|
self.metadata["kpt_shape"] = model.model[-1].kpt_shape |
|
|
|
LOGGER.info( |
|
f"\n{colorstr('PyTorch:')} starting from '{file}' with input shape {tuple(im.shape)} BCHW and " |
|
f"output shape(s) {self.output_shape} ({file_size(file):.1f} MB)" |
|
) |
|
|
|
|
|
f = [""] * len(fmts) |
|
if jit or ncnn: |
|
f[0], _ = self.export_torchscript() |
|
if engine: |
|
f[1], _ = self.export_engine(dla=dla) |
|
if onnx: |
|
f[2], _ = self.export_onnx() |
|
if xml: |
|
f[3], _ = self.export_openvino() |
|
if coreml: |
|
f[4], _ = self.export_coreml() |
|
if is_tf_format: |
|
self.args.int8 |= edgetpu |
|
f[5], keras_model = self.export_saved_model() |
|
if pb or tfjs: |
|
f[6], _ = self.export_pb(keras_model=keras_model) |
|
if tflite: |
|
f[7], _ = self.export_tflite(keras_model=keras_model, nms=False, agnostic_nms=self.args.agnostic_nms) |
|
if edgetpu: |
|
f[8], _ = self.export_edgetpu(tflite_model=Path(f[5]) / f"{self.file.stem}_full_integer_quant.tflite") |
|
if tfjs: |
|
f[9], _ = self.export_tfjs() |
|
if paddle: |
|
f[10], _ = self.export_paddle() |
|
if mnn: |
|
f[11], _ = self.export_mnn() |
|
if ncnn: |
|
f[12], _ = self.export_ncnn() |
|
if imx: |
|
f[13], _ = self.export_imx() |
|
|
|
|
|
f = [str(x) for x in f if x] |
|
if any(f): |
|
f = str(Path(f[-1])) |
|
square = self.imgsz[0] == self.imgsz[1] |
|
s = ( |
|
"" |
|
if square |
|
else f"WARNING ⚠️ non-PyTorch val requires square images, 'imgsz={self.imgsz}' will not " |
|
f"work. Use export 'imgsz={max(self.imgsz)}' if val is required." |
|
) |
|
imgsz = self.imgsz[0] if square else str(self.imgsz)[1:-1].replace(" ", "") |
|
predict_data = f"data={data}" if model.task == "segment" and fmt == "pb" else "" |
|
q = "int8" if self.args.int8 else "half" if self.args.half else "" |
|
LOGGER.info( |
|
f"\nExport complete ({time.time() - t:.1f}s)" |
|
f"\nResults saved to {colorstr('bold', file.parent.resolve())}" |
|
f"\nPredict: yolo predict task={model.task} model={f} imgsz={imgsz} {q} {predict_data}" |
|
f"\nValidate: yolo val task={model.task} model={f} imgsz={imgsz} data={data} {q} {s}" |
|
f"\nVisualize: https://netron.app" |
|
) |
|
|
|
self.run_callbacks("on_export_end") |
|
return f |
|
|
|
def get_int8_calibration_dataloader(self, prefix=""): |
|
"""Build and return a dataloader suitable for calibration of INT8 models.""" |
|
LOGGER.info(f"{prefix} collecting INT8 calibration images from 'data={self.args.data}'") |
|
data = (check_cls_dataset if self.model.task == "classify" else check_det_dataset)(self.args.data) |
|
|
|
batch = self.args.batch * (2 if self.args.format == "engine" else 1) |
|
dataset = YOLODataset( |
|
data[self.args.split or "val"], |
|
data=data, |
|
task=self.model.task, |
|
imgsz=self.imgsz[0], |
|
augment=False, |
|
batch_size=batch, |
|
) |
|
n = len(dataset) |
|
if n < self.args.batch: |
|
raise ValueError( |
|
f"The calibration dataset ({n} images) must have at least as many images as the batch size ('batch={self.args.batch}')." |
|
) |
|
elif n < 300: |
|
LOGGER.warning(f"{prefix} WARNING ⚠️ >300 images recommended for INT8 calibration, found {n} images.") |
|
return build_dataloader(dataset, batch=batch, workers=0) |
|
|
|
@try_export |
|
def export_torchscript(self, prefix=colorstr("TorchScript:")): |
|
"""YOLO TorchScript model export.""" |
|
LOGGER.info(f"\n{prefix} starting export with torch {torch.__version__}...") |
|
f = self.file.with_suffix(".torchscript") |
|
|
|
ts = torch.jit.trace(self.model, self.im, strict=False) |
|
extra_files = {"config.txt": json.dumps(self.metadata)} |
|
if self.args.optimize: |
|
LOGGER.info(f"{prefix} optimizing for mobile...") |
|
from torch.utils.mobile_optimizer import optimize_for_mobile |
|
|
|
optimize_for_mobile(ts)._save_for_lite_interpreter(str(f), _extra_files=extra_files) |
|
else: |
|
ts.save(str(f), _extra_files=extra_files) |
|
return f, None |
|
|
|
@try_export |
|
def export_onnx(self, prefix=colorstr("ONNX:")): |
|
"""YOLO ONNX export.""" |
|
requirements = ["onnx>=1.12.0"] |
|
if self.args.simplify: |
|
requirements += ["onnxslim", "onnxruntime" + ("-gpu" if torch.cuda.is_available() else "")] |
|
check_requirements(requirements) |
|
import onnx |
|
|
|
opset_version = self.args.opset or get_latest_opset() |
|
LOGGER.info(f"\n{prefix} starting export with onnx {onnx.__version__} opset {opset_version}...") |
|
f = str(self.file.with_suffix(".onnx")) |
|
|
|
output_names = ["output0", "output1"] if isinstance(self.model, SegmentationModel) else ["output0"] |
|
dynamic = self.args.dynamic |
|
if dynamic: |
|
dynamic = {"images": {0: "batch", 2: "height", 3: "width"}} |
|
if isinstance(self.model, SegmentationModel): |
|
dynamic["output0"] = {0: "batch", 2: "anchors"} |
|
dynamic["output1"] = {0: "batch", 2: "mask_height", 3: "mask_width"} |
|
elif isinstance(self.model, DetectionModel): |
|
dynamic["output0"] = {0: "batch", 2: "anchors"} |
|
|
|
torch.onnx.export( |
|
self.model.cpu() if dynamic else self.model, |
|
self.im.cpu() if dynamic else self.im, |
|
f, |
|
verbose=False, |
|
opset_version=opset_version, |
|
do_constant_folding=True, |
|
input_names=["images"], |
|
output_names=output_names, |
|
dynamic_axes=dynamic or None, |
|
) |
|
|
|
|
|
model_onnx = onnx.load(f) |
|
|
|
|
|
if self.args.simplify: |
|
try: |
|
import onnxslim |
|
|
|
LOGGER.info(f"{prefix} slimming with onnxslim {onnxslim.__version__}...") |
|
model_onnx = onnxslim.slim(model_onnx) |
|
|
|
except Exception as e: |
|
LOGGER.warning(f"{prefix} simplifier failure: {e}") |
|
|
|
|
|
for k, v in self.metadata.items(): |
|
meta = model_onnx.metadata_props.add() |
|
meta.key, meta.value = k, str(v) |
|
|
|
onnx.save(model_onnx, f) |
|
return f, model_onnx |
|
|
|
@try_export |
|
def export_openvino(self, prefix=colorstr("OpenVINO:")): |
|
"""YOLO OpenVINO export.""" |
|
check_requirements("openvino>=2024.5.0") |
|
import openvino as ov |
|
|
|
LOGGER.info(f"\n{prefix} starting export with openvino {ov.__version__}...") |
|
assert TORCH_1_13, f"OpenVINO export requires torch>=1.13.0 but torch=={torch.__version__} is installed" |
|
ov_model = ov.convert_model( |
|
self.model, |
|
input=None if self.args.dynamic else [self.im.shape], |
|
example_input=self.im, |
|
) |
|
|
|
def serialize(ov_model, file): |
|
"""Set RT info, serialize and save metadata YAML.""" |
|
ov_model.set_rt_info("YOLO", ["model_info", "model_type"]) |
|
ov_model.set_rt_info(True, ["model_info", "reverse_input_channels"]) |
|
ov_model.set_rt_info(114, ["model_info", "pad_value"]) |
|
ov_model.set_rt_info([255.0], ["model_info", "scale_values"]) |
|
ov_model.set_rt_info(self.args.iou, ["model_info", "iou_threshold"]) |
|
ov_model.set_rt_info([v.replace(" ", "_") for v in self.model.names.values()], ["model_info", "labels"]) |
|
if self.model.task != "classify": |
|
ov_model.set_rt_info("fit_to_window_letterbox", ["model_info", "resize_type"]) |
|
|
|
ov.runtime.save_model(ov_model, file, compress_to_fp16=self.args.half) |
|
yaml_save(Path(file).parent / "metadata.yaml", self.metadata) |
|
|
|
if self.args.int8: |
|
fq = str(self.file).replace(self.file.suffix, f"_int8_openvino_model{os.sep}") |
|
fq_ov = str(Path(fq) / self.file.with_suffix(".xml").name) |
|
check_requirements("nncf>=2.14.0") |
|
import nncf |
|
|
|
def transform_fn(data_item) -> np.ndarray: |
|
"""Quantization transform function.""" |
|
data_item: torch.Tensor = data_item["img"] if isinstance(data_item, dict) else data_item |
|
assert data_item.dtype == torch.uint8, "Input image must be uint8 for the quantization preprocessing" |
|
im = data_item.numpy().astype(np.float32) / 255.0 |
|
return np.expand_dims(im, 0) if im.ndim == 3 else im |
|
|
|
|
|
ignored_scope = None |
|
if isinstance(self.model.model[-1], Detect): |
|
|
|
head_module_name = ".".join(list(self.model.named_modules())[-1][0].split(".")[:2]) |
|
ignored_scope = nncf.IgnoredScope( |
|
patterns=[ |
|
f".*{head_module_name}/.*/Add", |
|
f".*{head_module_name}/.*/Sub*", |
|
f".*{head_module_name}/.*/Mul*", |
|
f".*{head_module_name}/.*/Div*", |
|
f".*{head_module_name}\\.dfl.*", |
|
], |
|
types=["Sigmoid"], |
|
) |
|
|
|
quantized_ov_model = nncf.quantize( |
|
model=ov_model, |
|
calibration_dataset=nncf.Dataset(self.get_int8_calibration_dataloader(prefix), transform_fn), |
|
preset=nncf.QuantizationPreset.MIXED, |
|
ignored_scope=ignored_scope, |
|
) |
|
serialize(quantized_ov_model, fq_ov) |
|
return fq, None |
|
|
|
f = str(self.file).replace(self.file.suffix, f"_openvino_model{os.sep}") |
|
f_ov = str(Path(f) / self.file.with_suffix(".xml").name) |
|
|
|
serialize(ov_model, f_ov) |
|
return f, None |
|
|
|
@try_export |
|
def export_paddle(self, prefix=colorstr("PaddlePaddle:")): |
|
"""YOLO Paddle export.""" |
|
check_requirements(("paddlepaddle-gpu" if torch.cuda.is_available() else "paddlepaddle", "x2paddle")) |
|
import x2paddle |
|
from x2paddle.convert import pytorch2paddle |
|
|
|
LOGGER.info(f"\n{prefix} starting export with X2Paddle {x2paddle.__version__}...") |
|
f = str(self.file).replace(self.file.suffix, f"_paddle_model{os.sep}") |
|
|
|
pytorch2paddle(module=self.model, save_dir=f, jit_type="trace", input_examples=[self.im]) |
|
yaml_save(Path(f) / "metadata.yaml", self.metadata) |
|
return f, None |
|
|
|
@try_export |
|
def export_mnn(self, prefix=colorstr("MNN:")): |
|
"""YOLOv8 MNN export using MNN https://github.com/alibaba/MNN.""" |
|
f_onnx, _ = self.export_onnx() |
|
|
|
check_requirements("MNN>=2.9.6") |
|
import MNN |
|
from MNN.tools import mnnconvert |
|
|
|
|
|
LOGGER.info(f"\n{prefix} starting export with MNN {MNN.version()}...") |
|
assert Path(f_onnx).exists(), f"failed to export ONNX file: {f_onnx}" |
|
f = str(self.file.with_suffix(".mnn")) |
|
args = ["", "-f", "ONNX", "--modelFile", f_onnx, "--MNNModel", f, "--bizCode", json.dumps(self.metadata)] |
|
if self.args.int8: |
|
args.extend(("--weightQuantBits", "8")) |
|
if self.args.half: |
|
args.append("--fp16") |
|
mnnconvert.convert(args) |
|
|
|
convert_scratch = Path(self.file.parent / ".__convert_external_data.bin") |
|
if convert_scratch.exists(): |
|
convert_scratch.unlink() |
|
return f, None |
|
|
|
@try_export |
|
def export_ncnn(self, prefix=colorstr("NCNN:")): |
|
"""YOLO NCNN export using PNNX https://github.com/pnnx/pnnx.""" |
|
check_requirements("ncnn") |
|
import ncnn |
|
|
|
LOGGER.info(f"\n{prefix} starting export with NCNN {ncnn.__version__}...") |
|
f = Path(str(self.file).replace(self.file.suffix, f"_ncnn_model{os.sep}")) |
|
f_ts = self.file.with_suffix(".torchscript") |
|
|
|
name = Path("pnnx.exe" if WINDOWS else "pnnx") |
|
pnnx = name if name.is_file() else (ROOT / name) |
|
if not pnnx.is_file(): |
|
LOGGER.warning( |
|
f"{prefix} WARNING ⚠️ PNNX not found. Attempting to download binary file from " |
|
"https://github.com/pnnx/pnnx/.\nNote PNNX Binary file must be placed in current working directory " |
|
f"or in {ROOT}. See PNNX repo for full installation instructions." |
|
) |
|
system = "macos" if MACOS else "windows" if WINDOWS else "linux-aarch64" if ARM64 else "linux" |
|
try: |
|
release, assets = get_github_assets(repo="pnnx/pnnx") |
|
asset = [x for x in assets if f"{system}.zip" in x][0] |
|
assert isinstance(asset, str), "Unable to retrieve PNNX repo assets" |
|
LOGGER.info(f"{prefix} successfully found latest PNNX asset file {asset}") |
|
except Exception as e: |
|
release = "20240410" |
|
asset = f"pnnx-{release}-{system}.zip" |
|
LOGGER.warning(f"{prefix} WARNING ⚠️ PNNX GitHub assets not found: {e}, using default {asset}") |
|
unzip_dir = safe_download(f"https://github.com/pnnx/pnnx/releases/download/{release}/{asset}", delete=True) |
|
if check_is_path_safe(Path.cwd(), unzip_dir): |
|
shutil.move(src=unzip_dir / name, dst=pnnx) |
|
pnnx.chmod(0o777) |
|
shutil.rmtree(unzip_dir) |
|
|
|
ncnn_args = [ |
|
f"ncnnparam={f / 'model.ncnn.param'}", |
|
f"ncnnbin={f / 'model.ncnn.bin'}", |
|
f"ncnnpy={f / 'model_ncnn.py'}", |
|
] |
|
|
|
pnnx_args = [ |
|
f"pnnxparam={f / 'model.pnnx.param'}", |
|
f"pnnxbin={f / 'model.pnnx.bin'}", |
|
f"pnnxpy={f / 'model_pnnx.py'}", |
|
f"pnnxonnx={f / 'model.pnnx.onnx'}", |
|
] |
|
|
|
cmd = [ |
|
str(pnnx), |
|
str(f_ts), |
|
*ncnn_args, |
|
*pnnx_args, |
|
f"fp16={int(self.args.half)}", |
|
f"device={self.device.type}", |
|
f'inputshape="{[self.args.batch, 3, *self.imgsz]}"', |
|
] |
|
f.mkdir(exist_ok=True) |
|
LOGGER.info(f"{prefix} running '{' '.join(cmd)}'") |
|
subprocess.run(cmd, check=True) |
|
|
|
|
|
pnnx_files = [x.split("=")[-1] for x in pnnx_args] |
|
for f_debug in ("debug.bin", "debug.param", "debug2.bin", "debug2.param", *pnnx_files): |
|
Path(f_debug).unlink(missing_ok=True) |
|
|
|
yaml_save(f / "metadata.yaml", self.metadata) |
|
return str(f), None |
|
|
|
@try_export |
|
def export_coreml(self, prefix=colorstr("CoreML:")): |
|
"""YOLO CoreML export.""" |
|
mlmodel = self.args.format.lower() == "mlmodel" |
|
check_requirements("coremltools>=6.0,<=6.2" if mlmodel else "coremltools>=7.0") |
|
import coremltools as ct |
|
|
|
LOGGER.info(f"\n{prefix} starting export with coremltools {ct.__version__}...") |
|
assert not WINDOWS, "CoreML export is not supported on Windows, please run on macOS or Linux." |
|
assert self.args.batch == 1, "CoreML batch sizes > 1 are not supported. Please retry at 'batch=1'." |
|
f = self.file.with_suffix(".mlmodel" if mlmodel else ".mlpackage") |
|
if f.is_dir(): |
|
shutil.rmtree(f) |
|
if self.args.nms and getattr(self.model, "end2end", False): |
|
LOGGER.warning(f"{prefix} WARNING ⚠️ 'nms=True' is not available for end2end models. Forcing 'nms=False'.") |
|
self.args.nms = False |
|
|
|
bias = [0.0, 0.0, 0.0] |
|
scale = 1 / 255 |
|
classifier_config = None |
|
if self.model.task == "classify": |
|
classifier_config = ct.ClassifierConfig(list(self.model.names.values())) if self.args.nms else None |
|
model = self.model |
|
elif self.model.task == "detect": |
|
model = IOSDetectModel(self.model, self.im) if self.args.nms else self.model |
|
else: |
|
if self.args.nms: |
|
LOGGER.warning(f"{prefix} WARNING ⚠️ 'nms=True' is only available for Detect models like 'yolov8n.pt'.") |
|
|
|
model = self.model |
|
|
|
ts = torch.jit.trace(model.eval(), self.im, strict=False) |
|
ct_model = ct.convert( |
|
ts, |
|
inputs=[ct.ImageType("image", shape=self.im.shape, scale=scale, bias=bias)], |
|
classifier_config=classifier_config, |
|
convert_to="neuralnetwork" if mlmodel else "mlprogram", |
|
) |
|
bits, mode = (8, "kmeans") if self.args.int8 else (16, "linear") if self.args.half else (32, None) |
|
if bits < 32: |
|
if "kmeans" in mode: |
|
check_requirements("scikit-learn") |
|
if mlmodel: |
|
ct_model = ct.models.neural_network.quantization_utils.quantize_weights(ct_model, bits, mode) |
|
elif bits == 8: |
|
import coremltools.optimize.coreml as cto |
|
|
|
op_config = cto.OpPalettizerConfig(mode="kmeans", nbits=bits, weight_threshold=512) |
|
config = cto.OptimizationConfig(global_config=op_config) |
|
ct_model = cto.palettize_weights(ct_model, config=config) |
|
if self.args.nms and self.model.task == "detect": |
|
if mlmodel: |
|
|
|
check_version(PYTHON_VERSION, "<3.11", name="Python ", hard=True) |
|
weights_dir = None |
|
else: |
|
ct_model.save(str(f)) |
|
weights_dir = str(f / "Data/com.apple.CoreML/weights") |
|
ct_model = self._pipeline_coreml(ct_model, weights_dir=weights_dir) |
|
|
|
m = self.metadata |
|
ct_model.short_description = m.pop("description") |
|
ct_model.author = m.pop("author") |
|
ct_model.license = m.pop("license") |
|
ct_model.version = m.pop("version") |
|
ct_model.user_defined_metadata.update({k: str(v) for k, v in m.items()}) |
|
try: |
|
ct_model.save(str(f)) |
|
except Exception as e: |
|
LOGGER.warning( |
|
f"{prefix} WARNING ⚠️ CoreML export to *.mlpackage failed ({e}), reverting to *.mlmodel export. " |
|
f"Known coremltools Python 3.11 and Windows bugs https://github.com/apple/coremltools/issues/1928." |
|
) |
|
f = f.with_suffix(".mlmodel") |
|
ct_model.save(str(f)) |
|
return f, ct_model |
|
|
|
@try_export |
|
def export_engine(self, dla=None, prefix=colorstr("TensorRT:")): |
|
"""YOLO TensorRT export https://developer.nvidia.com/tensorrt.""" |
|
assert self.im.device.type != "cpu", "export running on CPU but must be on GPU, i.e. use 'device=0'" |
|
f_onnx, _ = self.export_onnx() |
|
|
|
try: |
|
import tensorrt as trt |
|
except ImportError: |
|
if LINUX: |
|
check_requirements("tensorrt>7.0.0,!=10.1.0") |
|
import tensorrt as trt |
|
check_version(trt.__version__, ">=7.0.0", hard=True) |
|
check_version(trt.__version__, "!=10.1.0", msg="https://github.com/ultralytics/ultralytics/pull/14239") |
|
|
|
|
|
LOGGER.info(f"\n{prefix} starting export with TensorRT {trt.__version__}...") |
|
is_trt10 = int(trt.__version__.split(".")[0]) >= 10 |
|
assert Path(f_onnx).exists(), f"failed to export ONNX file: {f_onnx}" |
|
f = self.file.with_suffix(".engine") |
|
logger = trt.Logger(trt.Logger.INFO) |
|
if self.args.verbose: |
|
logger.min_severity = trt.Logger.Severity.VERBOSE |
|
|
|
|
|
builder = trt.Builder(logger) |
|
config = builder.create_builder_config() |
|
workspace = int(self.args.workspace * (1 << 30)) if self.args.workspace is not None else 0 |
|
if is_trt10 and workspace > 0: |
|
config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, workspace) |
|
elif workspace > 0: |
|
config.max_workspace_size = workspace |
|
flag = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) |
|
network = builder.create_network(flag) |
|
half = builder.platform_has_fast_fp16 and self.args.half |
|
int8 = builder.platform_has_fast_int8 and self.args.int8 |
|
|
|
|
|
if dla is not None: |
|
if not IS_JETSON: |
|
raise ValueError("DLA is only available on NVIDIA Jetson devices") |
|
LOGGER.info(f"{prefix} enabling DLA on core {dla}...") |
|
if not self.args.half and not self.args.int8: |
|
raise ValueError( |
|
"DLA requires either 'half=True' (FP16) or 'int8=True' (INT8) to be enabled. Please enable one of them and try again." |
|
) |
|
config.default_device_type = trt.DeviceType.DLA |
|
config.DLA_core = int(dla) |
|
config.set_flag(trt.BuilderFlag.GPU_FALLBACK) |
|
|
|
|
|
parser = trt.OnnxParser(network, logger) |
|
if not parser.parse_from_file(f_onnx): |
|
raise RuntimeError(f"failed to load ONNX file: {f_onnx}") |
|
|
|
|
|
inputs = [network.get_input(i) for i in range(network.num_inputs)] |
|
outputs = [network.get_output(i) for i in range(network.num_outputs)] |
|
for inp in inputs: |
|
LOGGER.info(f'{prefix} input "{inp.name}" with shape{inp.shape} {inp.dtype}') |
|
for out in outputs: |
|
LOGGER.info(f'{prefix} output "{out.name}" with shape{out.shape} {out.dtype}') |
|
|
|
if self.args.dynamic: |
|
shape = self.im.shape |
|
if shape[0] <= 1: |
|
LOGGER.warning(f"{prefix} WARNING ⚠️ 'dynamic=True' model requires max batch size, i.e. 'batch=16'") |
|
profile = builder.create_optimization_profile() |
|
min_shape = (1, shape[1], 32, 32) |
|
max_shape = (*shape[:2], *(int(max(1, workspace) * d) for d in shape[2:])) |
|
for inp in inputs: |
|
profile.set_shape(inp.name, min=min_shape, opt=shape, max=max_shape) |
|
config.add_optimization_profile(profile) |
|
|
|
LOGGER.info(f"{prefix} building {'INT8' if int8 else 'FP' + ('16' if half else '32')} engine as {f}") |
|
if int8: |
|
config.set_flag(trt.BuilderFlag.INT8) |
|
config.set_calibration_profile(profile) |
|
config.profiling_verbosity = trt.ProfilingVerbosity.DETAILED |
|
|
|
class EngineCalibrator(trt.IInt8Calibrator): |
|
def __init__( |
|
self, |
|
dataset, |
|
batch: int, |
|
cache: str = "", |
|
) -> None: |
|
trt.IInt8Calibrator.__init__(self) |
|
self.dataset = dataset |
|
self.data_iter = iter(dataset) |
|
self.algo = trt.CalibrationAlgoType.ENTROPY_CALIBRATION_2 |
|
self.batch = batch |
|
self.cache = Path(cache) |
|
|
|
def get_algorithm(self) -> trt.CalibrationAlgoType: |
|
"""Get the calibration algorithm to use.""" |
|
return self.algo |
|
|
|
def get_batch_size(self) -> int: |
|
"""Get the batch size to use for calibration.""" |
|
return self.batch or 1 |
|
|
|
def get_batch(self, names) -> list: |
|
"""Get the next batch to use for calibration, as a list of device memory pointers.""" |
|
try: |
|
im0s = next(self.data_iter)["img"] / 255.0 |
|
im0s = im0s.to("cuda") if im0s.device.type == "cpu" else im0s |
|
return [int(im0s.data_ptr())] |
|
except StopIteration: |
|
|
|
return None |
|
|
|
def read_calibration_cache(self) -> bytes: |
|
"""Use existing cache instead of calibrating again, otherwise, implicitly return None.""" |
|
if self.cache.exists() and self.cache.suffix == ".cache": |
|
return self.cache.read_bytes() |
|
|
|
def write_calibration_cache(self, cache) -> None: |
|
"""Write calibration cache to disk.""" |
|
_ = self.cache.write_bytes(cache) |
|
|
|
|
|
config.int8_calibrator = EngineCalibrator( |
|
dataset=self.get_int8_calibration_dataloader(prefix), |
|
batch=2 * self.args.batch, |
|
cache=str(self.file.with_suffix(".cache")), |
|
) |
|
|
|
elif half: |
|
config.set_flag(trt.BuilderFlag.FP16) |
|
|
|
|
|
del self.model |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
|
|
|
|
build = builder.build_serialized_network if is_trt10 else builder.build_engine |
|
with build(network, config) as engine, open(f, "wb") as t: |
|
|
|
meta = json.dumps(self.metadata) |
|
t.write(len(meta).to_bytes(4, byteorder="little", signed=True)) |
|
t.write(meta.encode()) |
|
|
|
t.write(engine if is_trt10 else engine.serialize()) |
|
|
|
return f, None |
|
|
|
@try_export |
|
def export_saved_model(self, prefix=colorstr("TensorFlow SavedModel:")): |
|
"""YOLO TensorFlow SavedModel export.""" |
|
cuda = torch.cuda.is_available() |
|
try: |
|
import tensorflow as tf |
|
except ImportError: |
|
suffix = "-macos" if MACOS else "-aarch64" if ARM64 else "" if cuda else "-cpu" |
|
version = ">=2.0.0" |
|
check_requirements(f"tensorflow{suffix}{version}") |
|
import tensorflow as tf |
|
check_requirements( |
|
( |
|
"keras", |
|
"tf_keras", |
|
"sng4onnx>=1.0.1", |
|
"onnx_graphsurgeon>=0.3.26", |
|
"onnx>=1.12.0", |
|
"onnx2tf>1.17.5,<=1.26.3", |
|
"onnxslim>=0.1.31", |
|
"tflite_support<=0.4.3" if IS_JETSON else "tflite_support", |
|
"flatbuffers>=23.5.26,<100", |
|
"onnxruntime-gpu" if cuda else "onnxruntime", |
|
), |
|
cmds="--extra-index-url https://pypi.ngc.nvidia.com", |
|
) |
|
|
|
LOGGER.info(f"\n{prefix} starting export with tensorflow {tf.__version__}...") |
|
check_version( |
|
tf.__version__, |
|
">=2.0.0", |
|
name="tensorflow", |
|
verbose=True, |
|
msg="https://github.com/ultralytics/ultralytics/issues/5161", |
|
) |
|
import onnx2tf |
|
|
|
f = Path(str(self.file).replace(self.file.suffix, "_saved_model")) |
|
if f.is_dir(): |
|
shutil.rmtree(f) |
|
|
|
|
|
onnx2tf_file = Path("calibration_image_sample_data_20x128x128x3_float32.npy") |
|
if not onnx2tf_file.exists(): |
|
attempt_download_asset(f"{onnx2tf_file}.zip", unzip=True, delete=True) |
|
|
|
|
|
self.args.simplify = True |
|
f_onnx, _ = self.export_onnx() |
|
|
|
|
|
np_data = None |
|
if self.args.int8: |
|
tmp_file = f / "tmp_tflite_int8_calibration_images.npy" |
|
if self.args.data: |
|
f.mkdir() |
|
images = [batch["img"] for batch in self.get_int8_calibration_dataloader(prefix)] |
|
images = torch.nn.functional.interpolate(torch.cat(images, 0).float(), size=self.imgsz).permute( |
|
0, 2, 3, 1 |
|
) |
|
np.save(str(tmp_file), images.numpy().astype(np.float32)) |
|
np_data = [["images", tmp_file, [[[[0, 0, 0]]]], [[[[255, 255, 255]]]]]] |
|
|
|
LOGGER.info(f"{prefix} starting TFLite export with onnx2tf {onnx2tf.__version__}...") |
|
keras_model = onnx2tf.convert( |
|
input_onnx_file_path=f_onnx, |
|
output_folder_path=str(f), |
|
not_use_onnxsim=True, |
|
verbosity="error", |
|
output_integer_quantized_tflite=self.args.int8, |
|
quant_type="per-tensor", |
|
custom_input_op_name_np_data_path=np_data, |
|
disable_group_convolution=True, |
|
enable_batchmatmul_unfold=True, |
|
) |
|
yaml_save(f / "metadata.yaml", self.metadata) |
|
|
|
|
|
if self.args.int8: |
|
tmp_file.unlink(missing_ok=True) |
|
for file in f.rglob("*_dynamic_range_quant.tflite"): |
|
file.rename(file.with_name(file.stem.replace("_dynamic_range_quant", "_int8") + file.suffix)) |
|
for file in f.rglob("*_integer_quant_with_int16_act.tflite"): |
|
file.unlink() |
|
|
|
|
|
for file in f.rglob("*.tflite"): |
|
f.unlink() if "quant_with_int16_act.tflite" in str(f) else self._add_tflite_metadata(file) |
|
|
|
return str(f), keras_model |
|
|
|
@try_export |
|
def export_pb(self, keras_model, prefix=colorstr("TensorFlow GraphDef:")): |
|
"""YOLO TensorFlow GraphDef *.pb export https://github.com/leimao/Frozen_Graph_TensorFlow.""" |
|
import tensorflow as tf |
|
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2 |
|
|
|
LOGGER.info(f"\n{prefix} starting export with tensorflow {tf.__version__}...") |
|
f = self.file.with_suffix(".pb") |
|
|
|
m = tf.function(lambda x: keras_model(x)) |
|
m = m.get_concrete_function(tf.TensorSpec(keras_model.inputs[0].shape, keras_model.inputs[0].dtype)) |
|
frozen_func = convert_variables_to_constants_v2(m) |
|
frozen_func.graph.as_graph_def() |
|
tf.io.write_graph(graph_or_graph_def=frozen_func.graph, logdir=str(f.parent), name=f.name, as_text=False) |
|
return f, None |
|
|
|
@try_export |
|
def export_tflite(self, keras_model, nms, agnostic_nms, prefix=colorstr("TensorFlow Lite:")): |
|
"""YOLO TensorFlow Lite export.""" |
|
|
|
import tensorflow as tf |
|
|
|
LOGGER.info(f"\n{prefix} starting export with tensorflow {tf.__version__}...") |
|
saved_model = Path(str(self.file).replace(self.file.suffix, "_saved_model")) |
|
if self.args.int8: |
|
f = saved_model / f"{self.file.stem}_int8.tflite" |
|
elif self.args.half: |
|
f = saved_model / f"{self.file.stem}_float16.tflite" |
|
else: |
|
f = saved_model / f"{self.file.stem}_float32.tflite" |
|
return str(f), None |
|
|
|
@try_export |
|
def export_edgetpu(self, tflite_model="", prefix=colorstr("Edge TPU:")): |
|
"""YOLO Edge TPU export https://coral.ai/docs/edgetpu/models-intro/.""" |
|
LOGGER.warning(f"{prefix} WARNING ⚠️ Edge TPU known bug https://github.com/ultralytics/ultralytics/issues/1185") |
|
|
|
cmd = "edgetpu_compiler --version" |
|
help_url = "https://coral.ai/docs/edgetpu/compiler/" |
|
assert LINUX, f"export only supported on Linux. See {help_url}" |
|
if subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True).returncode != 0: |
|
LOGGER.info(f"\n{prefix} export requires Edge TPU compiler. Attempting install from {help_url}") |
|
for c in ( |
|
"curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -", |
|
'echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | ' |
|
"sudo tee /etc/apt/sources.list.d/coral-edgetpu.list", |
|
"sudo apt-get update", |
|
"sudo apt-get install edgetpu-compiler", |
|
): |
|
subprocess.run(c if is_sudo_available() else c.replace("sudo ", ""), shell=True, check=True) |
|
ver = subprocess.run(cmd, shell=True, capture_output=True, check=True).stdout.decode().split()[-1] |
|
|
|
LOGGER.info(f"\n{prefix} starting export with Edge TPU compiler {ver}...") |
|
f = str(tflite_model).replace(".tflite", "_edgetpu.tflite") |
|
|
|
cmd = ( |
|
"edgetpu_compiler " |
|
f'--out_dir "{Path(f).parent}" ' |
|
"--show_operations " |
|
"--search_delegate " |
|
"--delegate_search_step 30 " |
|
"--timeout_sec 180 " |
|
f'"{tflite_model}"' |
|
) |
|
LOGGER.info(f"{prefix} running '{cmd}'") |
|
subprocess.run(cmd, shell=True) |
|
self._add_tflite_metadata(f) |
|
return f, None |
|
|
|
@try_export |
|
def export_tfjs(self, prefix=colorstr("TensorFlow.js:")): |
|
"""YOLO TensorFlow.js export.""" |
|
check_requirements("tensorflowjs") |
|
if ARM64: |
|
|
|
check_requirements("numpy==1.23.5") |
|
import tensorflow as tf |
|
import tensorflowjs as tfjs |
|
|
|
LOGGER.info(f"\n{prefix} starting export with tensorflowjs {tfjs.__version__}...") |
|
f = str(self.file).replace(self.file.suffix, "_web_model") |
|
f_pb = str(self.file.with_suffix(".pb")) |
|
|
|
gd = tf.Graph().as_graph_def() |
|
with open(f_pb, "rb") as file: |
|
gd.ParseFromString(file.read()) |
|
outputs = ",".join(gd_outputs(gd)) |
|
LOGGER.info(f"\n{prefix} output node names: {outputs}") |
|
|
|
quantization = "--quantize_float16" if self.args.half else "--quantize_uint8" if self.args.int8 else "" |
|
with spaces_in_path(f_pb) as fpb_, spaces_in_path(f) as f_: |
|
cmd = ( |
|
"tensorflowjs_converter " |
|
f'--input_format=tf_frozen_model {quantization} --output_node_names={outputs} "{fpb_}" "{f_}"' |
|
) |
|
LOGGER.info(f"{prefix} running '{cmd}'") |
|
subprocess.run(cmd, shell=True) |
|
|
|
if " " in f: |
|
LOGGER.warning(f"{prefix} WARNING ⚠️ your model may not work correctly with spaces in path '{f}'.") |
|
|
|
|
|
yaml_save(Path(f) / "metadata.yaml", self.metadata) |
|
return f, None |
|
|
|
@try_export |
|
def export_imx(self, prefix=colorstr("IMX:")): |
|
"""YOLO IMX export.""" |
|
gptq = False |
|
assert LINUX, ( |
|
"export only supported on Linux. See https://developer.aitrios.sony-semicon.com/en/raspberrypi-ai-camera/documentation/imx500-converter" |
|
) |
|
if getattr(self.model, "end2end", False): |
|
raise ValueError("IMX export is not supported for end2end models.") |
|
if "C2f" not in self.model.__str__(): |
|
raise ValueError("IMX export is only supported for YOLOv8n detection models") |
|
check_requirements(("model-compression-toolkit==2.1.1", "sony-custom-layers==0.2.0", "tensorflow==2.12.0")) |
|
check_requirements("imx500-converter[pt]==3.14.3") |
|
|
|
import model_compression_toolkit as mct |
|
import onnx |
|
from sony_custom_layers.pytorch.object_detection.nms import multiclass_nms |
|
|
|
try: |
|
out = subprocess.run( |
|
["java", "--version"], check=True, capture_output=True |
|
) |
|
if "openjdk 17" not in str(out.stdout): |
|
raise FileNotFoundError |
|
except FileNotFoundError: |
|
c = ["apt", "install", "-y", "openjdk-17-jdk", "openjdk-17-jre"] |
|
if is_sudo_available(): |
|
c.insert(0, "sudo") |
|
subprocess.run(c, check=True) |
|
|
|
def representative_dataset_gen(dataloader=self.get_int8_calibration_dataloader(prefix)): |
|
for batch in dataloader: |
|
img = batch["img"] |
|
img = img / 255.0 |
|
yield [img] |
|
|
|
tpc = mct.get_target_platform_capabilities( |
|
fw_name="pytorch", target_platform_name="imx500", target_platform_version="v1" |
|
) |
|
|
|
config = mct.core.CoreConfig( |
|
mixed_precision_config=mct.core.MixedPrecisionQuantizationConfig(num_of_images=10), |
|
quantization_config=mct.core.QuantizationConfig(concat_threshold_update=True), |
|
) |
|
|
|
resource_utilization = mct.core.ResourceUtilization(weights_memory=3146176 * 0.76) |
|
|
|
quant_model = ( |
|
mct.gptq.pytorch_gradient_post_training_quantization( |
|
model=self.model, |
|
representative_data_gen=representative_dataset_gen, |
|
target_resource_utilization=resource_utilization, |
|
gptq_config=mct.gptq.get_pytorch_gptq_config(n_epochs=1000, use_hessian_based_weights=False), |
|
core_config=config, |
|
target_platform_capabilities=tpc, |
|
)[0] |
|
if gptq |
|
else mct.ptq.pytorch_post_training_quantization( |
|
in_module=self.model, |
|
representative_data_gen=representative_dataset_gen, |
|
target_resource_utilization=resource_utilization, |
|
core_config=config, |
|
target_platform_capabilities=tpc, |
|
)[0] |
|
) |
|
|
|
class NMSWrapper(torch.nn.Module): |
|
def __init__( |
|
self, |
|
model: torch.nn.Module, |
|
score_threshold: float = 0.001, |
|
iou_threshold: float = 0.7, |
|
max_detections: int = 300, |
|
): |
|
""" |
|
Wrapping PyTorch Module with multiclass_nms layer from sony_custom_layers. |
|
|
|
Args: |
|
model (nn.Module): Model instance. |
|
score_threshold (float): Score threshold for non-maximum suppression. |
|
iou_threshold (float): Intersection over union threshold for non-maximum suppression. |
|
max_detections (float): The number of detections to return. |
|
""" |
|
super().__init__() |
|
self.model = model |
|
self.score_threshold = score_threshold |
|
self.iou_threshold = iou_threshold |
|
self.max_detections = max_detections |
|
|
|
def forward(self, images): |
|
|
|
outputs = self.model(images) |
|
|
|
boxes = outputs[0] |
|
scores = outputs[1] |
|
nms = multiclass_nms( |
|
boxes=boxes, |
|
scores=scores, |
|
score_threshold=self.score_threshold, |
|
iou_threshold=self.iou_threshold, |
|
max_detections=self.max_detections, |
|
) |
|
return nms |
|
|
|
quant_model = NMSWrapper( |
|
model=quant_model, |
|
score_threshold=self.args.conf or 0.001, |
|
iou_threshold=self.args.iou, |
|
max_detections=self.args.max_det, |
|
).to(self.device) |
|
|
|
f = Path(str(self.file).replace(self.file.suffix, "_imx_model")) |
|
f.mkdir(exist_ok=True) |
|
onnx_model = f / Path(str(self.file).replace(self.file.suffix, "_imx.onnx")) |
|
mct.exporter.pytorch_export_model( |
|
model=quant_model, save_model_path=onnx_model, repr_dataset=representative_dataset_gen |
|
) |
|
|
|
model_onnx = onnx.load(onnx_model) |
|
for k, v in self.metadata.items(): |
|
meta = model_onnx.metadata_props.add() |
|
meta.key, meta.value = k, str(v) |
|
|
|
onnx.save(model_onnx, onnx_model) |
|
|
|
subprocess.run( |
|
["imxconv-pt", "-i", str(onnx_model), "-o", str(f), "--no-input-persistency", "--overwrite-output"], |
|
check=True, |
|
) |
|
|
|
|
|
with open(f / "labels.txt", "w") as file: |
|
file.writelines([f"{name}\n" for _, name in self.model.names.items()]) |
|
|
|
return f, None |
|
|
|
def _add_tflite_metadata(self, file): |
|
"""Add metadata to *.tflite models per https://www.tensorflow.org/lite/models/convert/metadata.""" |
|
import flatbuffers |
|
|
|
try: |
|
|
|
from tensorflow_lite_support.metadata import metadata_schema_py_generated as schema |
|
from tensorflow_lite_support.metadata.python import metadata |
|
except ImportError: |
|
from tflite_support import metadata |
|
from tflite_support import metadata_schema_py_generated as schema |
|
|
|
|
|
model_meta = schema.ModelMetadataT() |
|
model_meta.name = self.metadata["description"] |
|
model_meta.version = self.metadata["version"] |
|
model_meta.author = self.metadata["author"] |
|
model_meta.license = self.metadata["license"] |
|
|
|
|
|
tmp_file = Path(file).parent / "temp_meta.txt" |
|
with open(tmp_file, "w") as f: |
|
f.write(str(self.metadata)) |
|
|
|
label_file = schema.AssociatedFileT() |
|
label_file.name = tmp_file.name |
|
label_file.type = schema.AssociatedFileType.TENSOR_AXIS_LABELS |
|
|
|
|
|
input_meta = schema.TensorMetadataT() |
|
input_meta.name = "image" |
|
input_meta.description = "Input image to be detected." |
|
input_meta.content = schema.ContentT() |
|
input_meta.content.contentProperties = schema.ImagePropertiesT() |
|
input_meta.content.contentProperties.colorSpace = schema.ColorSpaceType.RGB |
|
input_meta.content.contentPropertiesType = schema.ContentProperties.ImageProperties |
|
|
|
|
|
output1 = schema.TensorMetadataT() |
|
output1.name = "output" |
|
output1.description = "Coordinates of detected objects, class labels, and confidence score" |
|
output1.associatedFiles = [label_file] |
|
if self.model.task == "segment": |
|
output2 = schema.TensorMetadataT() |
|
output2.name = "output" |
|
output2.description = "Mask protos" |
|
output2.associatedFiles = [label_file] |
|
|
|
|
|
subgraph = schema.SubGraphMetadataT() |
|
subgraph.inputTensorMetadata = [input_meta] |
|
subgraph.outputTensorMetadata = [output1, output2] if self.model.task == "segment" else [output1] |
|
model_meta.subgraphMetadata = [subgraph] |
|
|
|
b = flatbuffers.Builder(0) |
|
b.Finish(model_meta.Pack(b), metadata.MetadataPopulator.METADATA_FILE_IDENTIFIER) |
|
metadata_buf = b.Output() |
|
|
|
populator = metadata.MetadataPopulator.with_model_file(str(file)) |
|
populator.load_metadata_buffer(metadata_buf) |
|
populator.load_associated_files([str(tmp_file)]) |
|
populator.populate() |
|
tmp_file.unlink() |
|
|
|
def _pipeline_coreml(self, model, weights_dir=None, prefix=colorstr("CoreML Pipeline:")): |
|
"""YOLO CoreML pipeline.""" |
|
import coremltools as ct |
|
|
|
LOGGER.info(f"{prefix} starting pipeline with coremltools {ct.__version__}...") |
|
_, _, h, w = list(self.im.shape) |
|
|
|
|
|
spec = model.get_spec() |
|
out0, out1 = iter(spec.description.output) |
|
if MACOS: |
|
from PIL import Image |
|
|
|
img = Image.new("RGB", (w, h)) |
|
out = model.predict({"image": img}) |
|
out0_shape = out[out0.name].shape |
|
out1_shape = out[out1.name].shape |
|
else: |
|
out0_shape = self.output_shape[2], self.output_shape[1] - 4 |
|
out1_shape = self.output_shape[2], 4 |
|
|
|
|
|
names = self.metadata["names"] |
|
nx, ny = spec.description.input[0].type.imageType.width, spec.description.input[0].type.imageType.height |
|
_, nc = out0_shape |
|
assert len(names) == nc, f"{len(names)} names found for nc={nc}" |
|
|
|
|
|
out0.type.multiArrayType.shape[:] = out0_shape |
|
out1.type.multiArrayType.shape[:] = out1_shape |
|
|
|
|
|
model = ct.models.MLModel(spec, weights_dir=weights_dir) |
|
|
|
|
|
nms_spec = ct.proto.Model_pb2.Model() |
|
nms_spec.specificationVersion = 5 |
|
for i in range(2): |
|
decoder_output = model._spec.description.output[i].SerializeToString() |
|
nms_spec.description.input.add() |
|
nms_spec.description.input[i].ParseFromString(decoder_output) |
|
nms_spec.description.output.add() |
|
nms_spec.description.output[i].ParseFromString(decoder_output) |
|
|
|
nms_spec.description.output[0].name = "confidence" |
|
nms_spec.description.output[1].name = "coordinates" |
|
|
|
output_sizes = [nc, 4] |
|
for i in range(2): |
|
ma_type = nms_spec.description.output[i].type.multiArrayType |
|
ma_type.shapeRange.sizeRanges.add() |
|
ma_type.shapeRange.sizeRanges[0].lowerBound = 0 |
|
ma_type.shapeRange.sizeRanges[0].upperBound = -1 |
|
ma_type.shapeRange.sizeRanges.add() |
|
ma_type.shapeRange.sizeRanges[1].lowerBound = output_sizes[i] |
|
ma_type.shapeRange.sizeRanges[1].upperBound = output_sizes[i] |
|
del ma_type.shape[:] |
|
|
|
nms = nms_spec.nonMaximumSuppression |
|
nms.confidenceInputFeatureName = out0.name |
|
nms.coordinatesInputFeatureName = out1.name |
|
nms.confidenceOutputFeatureName = "confidence" |
|
nms.coordinatesOutputFeatureName = "coordinates" |
|
nms.iouThresholdInputFeatureName = "iouThreshold" |
|
nms.confidenceThresholdInputFeatureName = "confidenceThreshold" |
|
nms.iouThreshold = 0.45 |
|
nms.confidenceThreshold = 0.25 |
|
nms.pickTop.perClass = True |
|
nms.stringClassLabels.vector.extend(names.values()) |
|
nms_model = ct.models.MLModel(nms_spec) |
|
|
|
|
|
pipeline = ct.models.pipeline.Pipeline( |
|
input_features=[ |
|
("image", ct.models.datatypes.Array(3, ny, nx)), |
|
("iouThreshold", ct.models.datatypes.Double()), |
|
("confidenceThreshold", ct.models.datatypes.Double()), |
|
], |
|
output_features=["confidence", "coordinates"], |
|
) |
|
pipeline.add_model(model) |
|
pipeline.add_model(nms_model) |
|
|
|
|
|
pipeline.spec.description.input[0].ParseFromString(model._spec.description.input[0].SerializeToString()) |
|
pipeline.spec.description.output[0].ParseFromString(nms_model._spec.description.output[0].SerializeToString()) |
|
pipeline.spec.description.output[1].ParseFromString(nms_model._spec.description.output[1].SerializeToString()) |
|
|
|
|
|
pipeline.spec.specificationVersion = 5 |
|
pipeline.spec.description.metadata.userDefined.update( |
|
{"IoU threshold": str(nms.iouThreshold), "Confidence threshold": str(nms.confidenceThreshold)} |
|
) |
|
|
|
|
|
model = ct.models.MLModel(pipeline.spec, weights_dir=weights_dir) |
|
model.input_description["image"] = "Input image" |
|
model.input_description["iouThreshold"] = f"(optional) IoU threshold override (default: {nms.iouThreshold})" |
|
model.input_description["confidenceThreshold"] = ( |
|
f"(optional) Confidence threshold override (default: {nms.confidenceThreshold})" |
|
) |
|
model.output_description["confidence"] = 'Boxes × Class confidence (see user-defined metadata "classes")' |
|
model.output_description["coordinates"] = "Boxes × [x, y, width, height] (relative to image size)" |
|
LOGGER.info(f"{prefix} pipeline success") |
|
return model |
|
|
|
def add_callback(self, event: str, callback): |
|
"""Appends the given callback.""" |
|
self.callbacks[event].append(callback) |
|
|
|
def run_callbacks(self, event: str): |
|
"""Execute all callbacks for a given event.""" |
|
for callback in self.callbacks.get(event, []): |
|
callback(self) |
|
|
|
|
|
class IOSDetectModel(torch.nn.Module): |
|
"""Wrap an Ultralytics YOLO model for Apple iOS CoreML export.""" |
|
|
|
def __init__(self, model, im): |
|
"""Initialize the IOSDetectModel class with a YOLO model and example image.""" |
|
super().__init__() |
|
_, _, h, w = im.shape |
|
self.model = model |
|
self.nc = len(model.names) |
|
if w == h: |
|
self.normalize = 1.0 / w |
|
else: |
|
self.normalize = torch.tensor([1.0 / w, 1.0 / h, 1.0 / w, 1.0 / h]) |
|
|
|
def forward(self, x): |
|
"""Normalize predictions of object detection model with input size-dependent factors.""" |
|
xywh, cls = self.model(x)[0].transpose(0, 1).split((4, self.nc), 1) |
|
return cls, xywh * self.normalize |
|
|