import math
from numbers import Number
from typing import List, Optional, Sequence, Union

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from mmengine.model import BaseDataPreprocessor
from mmengine.model.utils import stack_batch
from mmengine.structures import PixelData
from mmengine.utils import is_seq_of

from mmdet.models.utils.misc import samplelist_boxtype2tensor
from mmdet.registry import MODELS
from mmdet.structures import DetDataSample


@MODELS.register_module()
class HSIImgDataPreprocessor(BaseDataPreprocessor):
    """Image pre-processor for hyperspectral (HSI) inputs.

    Accepts the data sampled by the dataloader and preprocesses it into the
    format of the model input. ``HSIImgDataPreprocessor`` provides the basic
    data pre-processing as follows:

    - Collates and moves data to the target device.
    - Converts inputs to ``float``.
    - Pads inputs to the maximum size of the current batch with the defined
      ``pad_value``, such that the padded size is divisible by
      ``pad_size_divisor``.
    - Stacks inputs into ``batch_inputs``.

    Unlike :class:`mmengine.model.ImgDataPreprocessor`, single inputs may
    have an arbitrary number of spectral channels, i.e. a shape of
    (C, H, W), and ``forward`` applies neither BGR-to-RGB conversion nor
    mean/std normalization; ``mean`` and ``std`` are only registered as
    buffers for use by subclasses.

    Note:
        ``HSIImgDataPreprocessor`` and its subclasses are built in the
        constructor of :class:`BaseModel`.

    Args:
        mean (Sequence[float or int], optional): The pixel mean of the image
            channels. If the length of ``mean`` is 1, all channels share the
            same mean value. If not specified, images will not be
            normalized. Defaults to None.
        std (Sequence[float or int], optional): The pixel standard deviation
            of the image channels. If the length of ``std`` is 1, all
            channels share the same standard deviation. If not specified,
            images will not be normalized. Defaults to None.
        pad_size_divisor (int): The size of the padded image should be
            divisible by ``pad_size_divisor``. Defaults to 1.
        pad_value (float or int): The padded pixel value. Defaults to 0.
        non_blocking (bool): Whether to block the current process when
            transferring data to the device. Defaults to False.
            New in version v0.3.0.

    Note:
        If images do not need to be normalized, ``std`` and ``mean`` should
        both be None; otherwise both should be set to sequences of
        corresponding values.
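
    Examples:
        A minimal usage sketch; the 30-band cube below is an illustrative
        stand-in for real HSI data:

        >>> import torch
        >>> processor = HSIImgDataPreprocessor(pad_size_divisor=32)
        >>> data = {'inputs': [torch.rand(30, 100, 120)]}
        >>> out = processor(data)
        >>> out['inputs'].shape
        torch.Size([1, 30, 128, 128])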
    """

    def __init__(self,
                 mean: Optional[Sequence[Union[float, int]]] = None,
                 std: Optional[Sequence[Union[float, int]]] = None,
                 pad_size_divisor: int = 1,
                 pad_value: Union[float, int] = 0,
                 non_blocking: Optional[bool] = False):
        super().__init__(non_blocking)
        assert (mean is None) == (std is None), (
            'mean and std should be both None or tuple')
        if mean is not None:
            self._enable_normalize = True
            # ``mean`` and ``std`` are kept as (C, 1, 1) buffers so that
            # subclasses may normalize inputs; ``forward`` below does not
            # apply them itself.
            self.register_buffer('mean',
                                 torch.tensor(mean).view(-1, 1, 1), False)
            self.register_buffer('std',
                                 torch.tensor(std).view(-1, 1, 1), False)
        else:
            self._enable_normalize = False
        self.pad_size_divisor = pad_size_divisor
        self.pad_value = pad_value

    def forward(self, data: dict, training: bool = False) -> Union[dict, list]:
        """Performs float conversion, padding and stacking based on
        ``BaseDataPreprocessor``.

        Args:
            data (dict): Data sampled from the dataset. If the collate
                function of the DataLoader is :obj:`pseudo_collate`,
                ``data['inputs']`` will be a list of (C, H, W) tensors. If
                the collate function is :obj:`default_collate`, it will be
                a batched (N, C, H, W) tensor.
            training (bool): Whether to enable training time augmentation.
                If subclasses override this method, they can perform
                different preprocessing strategies for training and testing
                based on the value of ``training``.

        Returns:
            dict or list: Data in the same format as the model input.
        """
        data = self.cast_data(data)
        _batch_inputs = data['inputs']
        # Batch collated by ``pseudo_collate``: a list of (C, H, W) tensors.
        if is_seq_of(_batch_inputs, torch.Tensor):
            batch_inputs = []
            for _batch_input in _batch_inputs:
                _batch_input = _batch_input.float()
                batch_inputs.append(_batch_input)
            # Pad every image to the largest size in the batch (rounded up
            # to a multiple of ``pad_size_divisor``) and stack them into one
            # (N, C, H, W) tensor.
            batch_inputs = stack_batch(batch_inputs, self.pad_size_divisor,
                                       self.pad_value)
        # Batch collated by ``default_collate``: one (N, C, H, W) tensor.
        elif isinstance(_batch_inputs, torch.Tensor):
            assert _batch_inputs.dim() == 4, (
                'The input of `HSIImgDataPreprocessor` should be a NCHW '
                'tensor or a list of tensors, but got a tensor with shape: '
                f'{_batch_inputs.shape}')
            _batch_inputs = _batch_inputs.float()
            h, w = _batch_inputs.shape[2:]
            target_h = math.ceil(
                h / self.pad_size_divisor) * self.pad_size_divisor
            target_w = math.ceil(
                w / self.pad_size_divisor) * self.pad_size_divisor
            pad_h = target_h - h
            pad_w = target_w - w
            batch_inputs = F.pad(_batch_inputs, (0, pad_w, 0, pad_h),
                                 'constant', self.pad_value)
        else:
            raise TypeError('Output of `cast_data` should be a list of '
                            'tensors or a batched tensor, but got '
                            f'{type(_batch_inputs)}: {_batch_inputs}')
        data['inputs'] = batch_inputs
        data.setdefault('data_samples', None)
        return data


@MODELS.register_module()
class HSIDetDataPreprocessor(HSIImgDataPreprocessor):
    """Image pre-processor for detection tasks on hyperspectral inputs.

    Compared with :class:`HSIImgDataPreprocessor`,

    1. It supports batch augmentations.
    2. It additionally appends ``batch_input_shape`` and ``pad_shape`` to
       the metainfo of ``data_samples``, as required by detection tasks.

    It provides the data pre-processing as follows:

    - Collates and moves data to the target device.
    - Pads inputs to the maximum size of the current batch with the defined
      ``pad_value``, such that the padded size is divisible by
      ``pad_size_divisor``.
    - Stacks inputs into ``batch_inputs``.
    - Pads instance masks, pixel-level ground truth (``gt_pixel``) and
      semantic segmentation maps during training.
    - Performs batch augmentations during training.

    Args:
        mean (Sequence[Number], optional): The pixel mean of the image
            channels. Defaults to None.
        std (Sequence[Number], optional): The pixel standard deviation of
            the image channels. Defaults to None.
        pad_size_divisor (int): The size of the padded image should be
            divisible by ``pad_size_divisor``. Defaults to 1.
        pad_value (Number): The padded pixel value. Defaults to 0.
        pad_mask (bool): Whether to pad instance masks. Defaults to False.
        mask_pad_value (int): The padded pixel value for instance masks.
            Defaults to 0.
        pad_seg (bool): Whether to pad semantic segmentation maps.
            Defaults to False.
        seg_pad_value (int): The padded pixel value for semantic
            segmentation maps. Defaults to 255.
        boxtype2tensor (bool): Whether to convert ``BaseBoxes`` type bbox
            data to ``Tensor`` type. Defaults to True.
        non_blocking (bool): Whether to block the current process when
            transferring data to the device. Defaults to False.
        batch_augments (list[dict], optional): Batch-level augmentation
            configs. Defaults to None.
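
    Examples:
        A minimal usage sketch; the 30-band cube and the empty instance
        annotations below are illustrative stand-ins for real HSI detection
        data:

        >>> import torch
        >>> from mmengine.structures import InstanceData
        >>> from mmdet.structures import DetDataSample
        >>> processor = HSIDetDataPreprocessor(pad_size_divisor=32)
        >>> sample = DetDataSample()
        >>> sample.gt_instances = InstanceData()
        >>> data = {'inputs': [torch.rand(30, 100, 120)],
        ...         'data_samples': [sample]}
        >>> out = processor(data)
        >>> out['data_samples'][0].batch_input_shape
        (128, 128)
        >>> out['data_samples'][0].pad_shape
        (128, 128)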
    """

    def __init__(self,
                 mean: Optional[Sequence[Number]] = None,
                 std: Optional[Sequence[Number]] = None,
                 pad_size_divisor: int = 1,
                 pad_value: Union[float, int] = 0,
                 pad_mask: bool = False,
                 mask_pad_value: int = 0,
                 pad_seg: bool = False,
                 seg_pad_value: int = 255,
                 boxtype2tensor: bool = True,
                 non_blocking: Optional[bool] = False,
                 batch_augments: Optional[List[dict]] = None):
        super().__init__(
            mean=mean,
            std=std,
            pad_size_divisor=pad_size_divisor,
            pad_value=pad_value,
            non_blocking=non_blocking)
        if batch_augments is not None:
            self.batch_augments = nn.ModuleList(
                [MODELS.build(aug) for aug in batch_augments])
        else:
            self.batch_augments = None
        self.pad_mask = pad_mask
        self.mask_pad_value = mask_pad_value
        self.pad_seg = pad_seg
        self.seg_pad_value = seg_pad_value
        self.boxtype2tensor = boxtype2tensor

    def forward(self, data: dict, training: bool = False) -> dict:
        """Perform padding, stacking and batch augmentation based on
        ``BaseDataPreprocessor``.

        Args:
            data (dict): Data sampled from the dataloader.
            training (bool): Whether to enable training time augmentation.

        Returns:
            dict: Data in the same format as the model input.
        """
        batch_pad_shape = self._get_pad_shape(data)
        data = super().forward(data=data, training=training)
        inputs, data_samples = data['inputs'], data['data_samples']

        if data_samples is not None:
            # The batched image size information may be useful, e.g. in
            # DETR-style models, so record the batch input shape and the
            # per-image pad shape on every sample.
            batch_input_shape = tuple(inputs[0].size()[-2:])
            for data_sample, pad_shape in zip(data_samples, batch_pad_shape):
                data_sample.set_metainfo({
                    'batch_input_shape': batch_input_shape,
                    'pad_shape': pad_shape
                })

            if self.boxtype2tensor:
                samplelist_boxtype2tensor(data_samples)

            # Pad ground truth to the padded input size. Note that instance
            # masks are padded whenever present, regardless of
            # ``self.pad_mask``.
            if hasattr(data_samples[0].gt_instances, 'masks') and training:
                self.pad_gt_masks(data_samples)
            if hasattr(data_samples[0], 'gt_pixel') and training:
                self.pad_gt_pixel(data_samples)

            if self.pad_seg and training:
                self.pad_gt_sem_seg(data_samples)

        if training and self.batch_augments is not None:
            for batch_aug in self.batch_augments:
                inputs, data_samples = batch_aug(inputs, data_samples)

        return {'inputs': inputs, 'data_samples': data_samples}

    def _get_pad_shape(self, data: dict) -> List[tuple]:
        """Get the pad_shape of each image based on data and
        ``pad_size_divisor``."""
        _batch_inputs = data['inputs']
        # Batch collated by ``pseudo_collate``: a list of (C, H, W) tensors.
        if is_seq_of(_batch_inputs, torch.Tensor):
            batch_pad_shape = []
            for ori_input in _batch_inputs:
                pad_h = int(
                    np.ceil(ori_input.shape[1] /
                            self.pad_size_divisor)) * self.pad_size_divisor
                pad_w = int(
                    np.ceil(ori_input.shape[2] /
                            self.pad_size_divisor)) * self.pad_size_divisor
                batch_pad_shape.append((pad_h, pad_w))
        # Batch collated by ``default_collate``: one (N, C, H, W) tensor,
        # so the spatial sizes are dims 2 and 3, not 1 and 2.
        elif isinstance(_batch_inputs, torch.Tensor):
            assert _batch_inputs.dim() == 4, (
                'The input of `HSIImgDataPreprocessor` should be a NCHW '
                'tensor or a list of tensors, but got a tensor with shape: '
                f'{_batch_inputs.shape}')
            pad_h = int(
                np.ceil(_batch_inputs.shape[2] /
                        self.pad_size_divisor)) * self.pad_size_divisor
            pad_w = int(
                np.ceil(_batch_inputs.shape[3] /
                        self.pad_size_divisor)) * self.pad_size_divisor
            batch_pad_shape = [(pad_h, pad_w)] * _batch_inputs.shape[0]
        else:
            raise TypeError('Output of `cast_data` should be a list of '
                            'tensors or a batched tensor, but got '
                            f'{type(_batch_inputs)}: {_batch_inputs}')
        return batch_pad_shape

    def pad_gt_masks(self,
                     batch_data_samples: Sequence[DetDataSample]) -> None:
        """Pad gt_masks to the shape of ``batch_input_shape``."""
        if 'masks' in batch_data_samples[0].gt_instances:
            for data_samples in batch_data_samples:
                masks = data_samples.gt_instances.masks
                data_samples.gt_instances.masks = masks.pad(
                    data_samples.batch_input_shape,
                    pad_val=self.mask_pad_value)

    def pad_gt_pixel(self,
                     batch_data_samples: Sequence[DetDataSample]) -> None:
        """Pad gt_pixel (pixel-level segmentation and, if present,
        abundance maps) to the shape of ``batch_input_shape``."""
        for data_samples in batch_data_samples:
            seg = data_samples.gt_pixel.seg
            h, w = data_samples.gt_pixel.shape[-2:]
            pad_h, pad_w = data_samples.batch_input_shape
            # ``F.pad`` pads the last dim first, so the tuple reads
            # (left, right, top, bottom).
            seg = F.pad(
                seg,
                pad=(0, max(pad_w - w, 0), 0, max(pad_h - h, 0)),
                mode='constant',
                value=0)
            if hasattr(data_samples.gt_pixel, 'abu'):
                abu = data_samples.gt_pixel.abu
                abu = F.pad(
                    abu,
                    pad=(0, max(pad_w - w, 0), 0, max(pad_h - h, 0)),
                    mode='constant',
                    value=0)
                data_samples.gt_pixel = PixelData(seg=seg, abu=abu)
            else:
                data_samples.gt_pixel = PixelData(seg=seg)

    def pad_gt_sem_seg(self,
                       batch_data_samples: Sequence[DetDataSample]) -> None:
        """Pad gt_sem_seg to the shape of ``batch_input_shape``."""
        if 'gt_sem_seg' in batch_data_samples[0]:
            for data_samples in batch_data_samples:
                gt_sem_seg = data_samples.gt_sem_seg.sem_seg
                h, w = gt_sem_seg.shape[-2:]
                pad_h, pad_w = data_samples.batch_input_shape
                gt_sem_seg = F.pad(
                    gt_sem_seg,
                    pad=(0, max(pad_w - w, 0), 0, max(pad_h - h, 0)),
                    mode='constant',
                    value=self.seg_pad_value)
                data_samples.gt_sem_seg = PixelData(sem_seg=gt_sem_seg)
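

if __name__ == '__main__':
    # A small smoke test for the ``default_collate`` path (a pre-batched
    # NCHW tensor); a local-debugging sketch, not part of the module's
    # public API. The 30-band random cube is an illustrative stand-in for
    # real hyperspectral data.
    processor = HSIImgDataPreprocessor(pad_size_divisor=32)
    out = processor({'inputs': torch.rand(2, 30, 100, 120)})
    # Spatial dims are padded up to multiples of 32.
    print(out['inputs'].shape)  # torch.Size([2, 30, 128, 128])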