|
""" |
|
3D Point Cloud Augmentation |
|
|
|
Inspired by chrischoy/SpatioTemporalSegmentation
|
|
|
Author: Xiaoyang Wu ([email protected]) |
|
Please cite our work if the code is helpful to you. |
|
""" |
|
|
|
import random |
|
import numbers |
|
import scipy |
|
import scipy.ndimage |
|
import scipy.interpolate |
|
import scipy.stats |
|
import numpy as np |
|
import torch |
|
import copy |
|
from collections.abc import Sequence, Mapping |
|
|
|
from pointcept.utils.registry import Registry |
|
|
|
TRANSFORMS = Registry("transforms") |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class Collect(object): |
|
def __init__(self, keys, offset_keys_dict=None, **kwargs): |
|
""" |
|
        e.g. Collect(keys=["coord"], feat_keys=["coord", "color"])
        Every kwarg named "*_keys" is concatenated along dim 1 into a single
        float tensor stored under the stripped prefix, e.g. feat_keys -> data["feat"].
|
""" |
|
if offset_keys_dict is None: |
|
offset_keys_dict = dict(offset="coord") |
|
self.keys = keys |
|
self.offset_keys = offset_keys_dict |
|
self.kwargs = kwargs |
|
|
|
def __call__(self, data_dict): |
|
data = dict() |
|
if isinstance(self.keys, str): |
|
self.keys = [self.keys] |
|
for key in self.keys: |
|
data[key] = data_dict[key] |
|
for key, value in self.offset_keys.items(): |
|
data[key] = torch.tensor([data_dict[value].shape[0]]) |
|
for name, keys in self.kwargs.items(): |
|
name = name.replace("_keys", "") |
|
assert isinstance(keys, Sequence) |
|
data[name] = torch.cat([data_dict[key].float() for key in keys], dim=1) |
|
return data |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class Copy(object): |
|
def __init__(self, keys_dict=None): |
|
if keys_dict is None: |
|
keys_dict = dict(coord="origin_coord", segment="origin_segment") |
|
self.keys_dict = keys_dict |
|
|
|
def __call__(self, data_dict): |
|
for key, value in self.keys_dict.items(): |
|
if isinstance(data_dict[key], np.ndarray): |
|
data_dict[value] = data_dict[key].copy() |
|
elif isinstance(data_dict[key], torch.Tensor): |
|
data_dict[value] = data_dict[key].clone().detach() |
|
else: |
|
data_dict[value] = copy.deepcopy(data_dict[key]) |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class ToTensor(object): |
|
def __call__(self, data): |
|
if isinstance(data, torch.Tensor): |
|
return data |
|
elif isinstance(data, str): |
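            # strings pass through unchanged; they cannot be converted to tensors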
|
|
|
return data |
|
elif isinstance(data, int): |
|
return torch.LongTensor([data]) |
|
elif isinstance(data, float): |
|
return torch.FloatTensor([data]) |
|
elif isinstance(data, np.ndarray) and np.issubdtype(data.dtype, bool): |
|
return torch.from_numpy(data) |
|
elif isinstance(data, np.ndarray) and np.issubdtype(data.dtype, np.integer): |
|
return torch.from_numpy(data).long() |
|
elif isinstance(data, np.ndarray) and np.issubdtype(data.dtype, np.floating): |
|
return torch.from_numpy(data).float() |
|
elif isinstance(data, Mapping): |
|
result = {sub_key: self(item) for sub_key, item in data.items()} |
|
return result |
|
elif isinstance(data, Sequence): |
|
result = [self(item) for item in data] |
|
return result |
|
else: |
|
raise TypeError(f"type {type(data)} cannot be converted to tensor.") |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class Add(object): |
|
def __init__(self, keys_dict=None): |
|
if keys_dict is None: |
|
keys_dict = dict() |
|
self.keys_dict = keys_dict |
|
|
|
def __call__(self, data_dict): |
|
for key, value in self.keys_dict.items(): |
|
data_dict[key] = value |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class NormalizeColor(object): |
|
def __call__(self, data_dict): |
|
if "color" in data_dict.keys(): |
|
data_dict["color"] = data_dict["color"] / 127.5 - 1 |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class NormalizeCoord(object): |
|
def __call__(self, data_dict): |
|
if "coord" in data_dict.keys(): |
|
|
|
centroid = np.mean(data_dict["coord"], axis=0) |
|
data_dict["coord"] -= centroid |
|
m = np.max(np.sqrt(np.sum(data_dict["coord"] ** 2, axis=1))) |
|
data_dict["coord"] = data_dict["coord"] / m |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class PositiveShift(object): |
|
def __call__(self, data_dict): |
|
if "coord" in data_dict.keys(): |
|
coord_min = np.min(data_dict["coord"], 0) |
|
data_dict["coord"] -= coord_min |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class CenterShift(object): |
|
def __init__(self, apply_z=True): |
|
self.apply_z = apply_z |
|
|
|
def __call__(self, data_dict): |
|
if "coord" in data_dict.keys(): |
|
x_min, y_min, z_min = data_dict["coord"].min(axis=0) |
|
x_max, y_max, _ = data_dict["coord"].max(axis=0) |
|
if self.apply_z: |
|
shift = [(x_min + x_max) / 2, (y_min + y_max) / 2, z_min] |
|
else: |
|
shift = [(x_min + x_max) / 2, (y_min + y_max) / 2, 0] |
|
data_dict["coord"] -= shift |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class RandomShift(object): |
|
def __init__(self, shift=((-0.2, 0.2), (-0.2, 0.2), (0, 0))): |
|
self.shift = shift |
|
|
|
def __call__(self, data_dict): |
|
if "coord" in data_dict.keys(): |
|
shift_x = np.random.uniform(self.shift[0][0], self.shift[0][1]) |
|
shift_y = np.random.uniform(self.shift[1][0], self.shift[1][1]) |
|
shift_z = np.random.uniform(self.shift[2][0], self.shift[2][1]) |
|
data_dict["coord"] += [shift_x, shift_y, shift_z] |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class PointClip(object): |
|
def __init__(self, point_cloud_range=(-80, -80, -3, 80, 80, 1)): |
|
self.point_cloud_range = point_cloud_range |
|
|
|
def __call__(self, data_dict): |
|
if "coord" in data_dict.keys(): |
|
data_dict["coord"] = np.clip( |
|
data_dict["coord"], |
|
a_min=self.point_cloud_range[:3], |
|
a_max=self.point_cloud_range[3:], |
|
) |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class RandomDropout(object): |
|
def __init__(self, dropout_ratio=0.2, dropout_application_ratio=0.5): |
|
""" |
|
        dropout_ratio: fraction of points to drop when dropout is applied
        dropout_application_ratio: probability of applying dropout to a sample
|
""" |
|
self.dropout_ratio = dropout_ratio |
|
self.dropout_application_ratio = dropout_application_ratio |
|
|
|
def __call__(self, data_dict): |
|
if random.random() < self.dropout_application_ratio: |
|
n = len(data_dict["coord"]) |
|
idx = np.random.choice(n, int(n * (1 - self.dropout_ratio)), replace=False) |
|
if "sampled_index" in data_dict: |
|
|
|
idx = np.unique(np.append(idx, data_dict["sampled_index"])) |
|
mask = np.zeros_like(data_dict["segment"]).astype(bool) |
|
mask[data_dict["sampled_index"]] = True |
|
data_dict["sampled_index"] = np.where(mask[idx])[0] |
|
if "coord" in data_dict.keys(): |
|
data_dict["coord"] = data_dict["coord"][idx] |
|
if "color" in data_dict.keys(): |
|
data_dict["color"] = data_dict["color"][idx] |
|
if "normal" in data_dict.keys(): |
|
data_dict["normal"] = data_dict["normal"][idx] |
|
if "strength" in data_dict.keys(): |
|
data_dict["strength"] = data_dict["strength"][idx] |
|
if "segment" in data_dict.keys(): |
|
data_dict["segment"] = data_dict["segment"][idx] |
|
if "instance" in data_dict.keys(): |
|
data_dict["instance"] = data_dict["instance"][idx] |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class RandomRotate(object): |
|
def __init__(self, angle=None, center=None, axis="z", always_apply=False, p=0.5): |
|
self.angle = [-1, 1] if angle is None else angle |
|
self.axis = axis |
|
self.always_apply = always_apply |
|
self.p = p if not self.always_apply else 1 |
|
self.center = center |
|
|
|
def __call__(self, data_dict): |
|
if random.random() > self.p: |
|
return data_dict |
|
angle = np.random.uniform(self.angle[0], self.angle[1]) * np.pi |
|
rot_cos, rot_sin = np.cos(angle), np.sin(angle) |
|
if self.axis == "x": |
|
rot_t = np.array([[1, 0, 0], [0, rot_cos, -rot_sin], [0, rot_sin, rot_cos]]) |
|
elif self.axis == "y": |
|
rot_t = np.array([[rot_cos, 0, rot_sin], [0, 1, 0], [-rot_sin, 0, rot_cos]]) |
|
elif self.axis == "z": |
|
rot_t = np.array([[rot_cos, -rot_sin, 0], [rot_sin, rot_cos, 0], [0, 0, 1]]) |
|
else: |
|
raise NotImplementedError |
|
if "coord" in data_dict.keys(): |
|
if self.center is None: |
|
x_min, y_min, z_min = data_dict["coord"].min(axis=0) |
|
x_max, y_max, z_max = data_dict["coord"].max(axis=0) |
|
center = [(x_min + x_max) / 2, (y_min + y_max) / 2, (z_min + z_max) / 2] |
|
else: |
|
center = self.center |
|
data_dict["coord"] -= center |
|
data_dict["coord"] = np.dot(data_dict["coord"], np.transpose(rot_t)) |
|
data_dict["coord"] += center |
|
if "normal" in data_dict.keys(): |
|
data_dict["normal"] = np.dot(data_dict["normal"], np.transpose(rot_t)) |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class RandomRotateTargetAngle(object): |
|
def __init__( |
|
self, angle=(1 / 2, 1, 3 / 2), center=None, axis="z", always_apply=False, p=0.75 |
|
): |
|
self.angle = angle |
|
self.axis = axis |
|
self.always_apply = always_apply |
|
self.p = p if not self.always_apply else 1 |
|
self.center = center |
|
|
|
def __call__(self, data_dict): |
|
if random.random() > self.p: |
|
return data_dict |
|
angle = np.random.choice(self.angle) * np.pi |
|
rot_cos, rot_sin = np.cos(angle), np.sin(angle) |
|
if self.axis == "x": |
|
rot_t = np.array([[1, 0, 0], [0, rot_cos, -rot_sin], [0, rot_sin, rot_cos]]) |
|
elif self.axis == "y": |
|
rot_t = np.array([[rot_cos, 0, rot_sin], [0, 1, 0], [-rot_sin, 0, rot_cos]]) |
|
elif self.axis == "z": |
|
rot_t = np.array([[rot_cos, -rot_sin, 0], [rot_sin, rot_cos, 0], [0, 0, 1]]) |
|
else: |
|
raise NotImplementedError |
|
if "coord" in data_dict.keys(): |
|
if self.center is None: |
|
x_min, y_min, z_min = data_dict["coord"].min(axis=0) |
|
x_max, y_max, z_max = data_dict["coord"].max(axis=0) |
|
center = [(x_min + x_max) / 2, (y_min + y_max) / 2, (z_min + z_max) / 2] |
|
else: |
|
center = self.center |
|
data_dict["coord"] -= center |
|
data_dict["coord"] = np.dot(data_dict["coord"], np.transpose(rot_t)) |
|
data_dict["coord"] += center |
|
if "normal" in data_dict.keys(): |
|
data_dict["normal"] = np.dot(data_dict["normal"], np.transpose(rot_t)) |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class RandomScale(object): |
|
def __init__(self, scale=None, anisotropic=False): |
|
self.scale = scale if scale is not None else [0.95, 1.05] |
|
self.anisotropic = anisotropic |
|
|
|
def __call__(self, data_dict): |
|
if "coord" in data_dict.keys(): |
|
scale = np.random.uniform( |
|
self.scale[0], self.scale[1], 3 if self.anisotropic else 1 |
|
) |
|
data_dict["coord"] *= scale |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class RandomFlip(object): |
|
def __init__(self, p=0.5): |
|
self.p = p |
|
|
|
def __call__(self, data_dict): |
|
if np.random.rand() < self.p: |
|
if "coord" in data_dict.keys(): |
|
data_dict["coord"][:, 0] = -data_dict["coord"][:, 0] |
|
if "normal" in data_dict.keys(): |
|
data_dict["normal"][:, 0] = -data_dict["normal"][:, 0] |
|
if np.random.rand() < self.p: |
|
if "coord" in data_dict.keys(): |
|
data_dict["coord"][:, 1] = -data_dict["coord"][:, 1] |
|
if "normal" in data_dict.keys(): |
|
data_dict["normal"][:, 1] = -data_dict["normal"][:, 1] |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class RandomJitter(object): |
|
def __init__(self, sigma=0.01, clip=0.05): |
|
assert clip > 0 |
|
self.sigma = sigma |
|
self.clip = clip |
|
|
|
def __call__(self, data_dict): |
|
if "coord" in data_dict.keys(): |
|
jitter = np.clip( |
|
self.sigma * np.random.randn(data_dict["coord"].shape[0], 3), |
|
-self.clip, |
|
self.clip, |
|
) |
|
data_dict["coord"] += jitter |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class ClipGaussianJitter(object): |
|
def __init__(self, scalar=0.02, store_jitter=False): |
|
self.scalar = scalar |
|
        self.mean = np.zeros(3)
|
self.cov = np.identity(3) |
|
self.quantile = 1.96 |
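        # ~95% of standard-normal samples lie within +-1.96, so raw jitter is
        # clipped at that quantile and rescaled to [-scalar, scalar]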
|
self.store_jitter = store_jitter |
|
|
|
def __call__(self, data_dict): |
|
if "coord" in data_dict.keys(): |
|
jitter = np.random.multivariate_normal( |
|
self.mean, self.cov, data_dict["coord"].shape[0] |
|
) |
|
            jitter = self.scalar * np.clip(jitter / self.quantile, -1, 1)
|
data_dict["coord"] += jitter |
|
if self.store_jitter: |
|
data_dict["jitter"] = jitter |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class ChromaticAutoContrast(object): |
|
def __init__(self, p=0.2, blend_factor=None): |
|
self.p = p |
|
self.blend_factor = blend_factor |
|
|
|
def __call__(self, data_dict): |
|
if "color" in data_dict.keys() and np.random.rand() < self.p: |
|
lo = np.min(data_dict["color"], 0, keepdims=True) |
|
hi = np.max(data_dict["color"], 0, keepdims=True) |
|
scale = 255 / (hi - lo) |
|
contrast_feat = (data_dict["color"][:, :3] - lo) * scale |
|
blend_factor = ( |
|
np.random.rand() if self.blend_factor is None else self.blend_factor |
|
) |
|
data_dict["color"][:, :3] = (1 - blend_factor) * data_dict["color"][ |
|
:, :3 |
|
] + blend_factor * contrast_feat |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class ChromaticTranslation(object): |
|
def __init__(self, p=0.95, ratio=0.05): |
|
self.p = p |
|
self.ratio = ratio |
|
|
|
def __call__(self, data_dict): |
|
if "color" in data_dict.keys() and np.random.rand() < self.p: |
|
tr = (np.random.rand(1, 3) - 0.5) * 255 * 2 * self.ratio |
|
data_dict["color"][:, :3] = np.clip(tr + data_dict["color"][:, :3], 0, 255) |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class ChromaticJitter(object): |
|
def __init__(self, p=0.95, std=0.005): |
|
self.p = p |
|
self.std = std |
|
|
|
def __call__(self, data_dict): |
|
if "color" in data_dict.keys() and np.random.rand() < self.p: |
|
noise = np.random.randn(data_dict["color"].shape[0], 3) |
|
noise *= self.std * 255 |
|
data_dict["color"][:, :3] = np.clip( |
|
noise + data_dict["color"][:, :3], 0, 255 |
|
) |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class RandomColorGrayScale(object): |
|
def __init__(self, p): |
|
self.p = p |
|
|
|
@staticmethod |
|
def rgb_to_grayscale(color, num_output_channels=1): |
|
if color.shape[-1] < 3: |
|
raise TypeError( |
|
"Input color should have at least 3 dimensions, but found {}".format( |
|
color.shape[-1] |
|
) |
|
) |
|
|
|
if num_output_channels not in (1, 3): |
|
raise ValueError("num_output_channels should be either 1 or 3") |
|
|
|
r, g, b = color[..., 0], color[..., 1], color[..., 2] |
|
gray = (0.2989 * r + 0.587 * g + 0.114 * b).astype(color.dtype) |
|
gray = np.expand_dims(gray, axis=-1) |
|
|
|
if num_output_channels == 3: |
|
gray = np.broadcast_to(gray, color.shape) |
|
|
|
return gray |
|
|
|
def __call__(self, data_dict): |
|
if np.random.rand() < self.p: |
|
data_dict["color"] = self.rgb_to_grayscale(data_dict["color"], 3) |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class RandomColorJitter(object): |
|
""" |
|
    Random color jitter for 3D point clouds (adapted from torchvision)
|
""" |
|
|
|
def __init__(self, brightness=0, contrast=0, saturation=0, hue=0, p=0.95): |
|
self.brightness = self._check_input(brightness, "brightness") |
|
self.contrast = self._check_input(contrast, "contrast") |
|
self.saturation = self._check_input(saturation, "saturation") |
|
self.hue = self._check_input( |
|
hue, "hue", center=0, bound=(-0.5, 0.5), clip_first_on_zero=False |
|
) |
|
self.p = p |
|
|
|
@staticmethod |
|
def _check_input( |
|
value, name, center=1, bound=(0, float("inf")), clip_first_on_zero=True |
|
): |
|
if isinstance(value, numbers.Number): |
|
if value < 0: |
|
raise ValueError( |
|
"If {} is a single number, it must be non negative.".format(name) |
|
) |
|
value = [center - float(value), center + float(value)] |
|
if clip_first_on_zero: |
|
value[0] = max(value[0], 0.0) |
|
elif isinstance(value, (tuple, list)) and len(value) == 2: |
|
if not bound[0] <= value[0] <= value[1] <= bound[1]: |
|
raise ValueError("{} values should be between {}".format(name, bound)) |
|
else: |
|
raise TypeError( |
|
"{} should be a single number or a list/tuple with length 2.".format( |
|
name |
|
) |
|
) |
|
|
|
|
|
|
|
if value[0] == value[1] == center: |
|
value = None |
|
return value |
|
|
|
@staticmethod |
|
def blend(color1, color2, ratio): |
|
ratio = float(ratio) |
|
bound = 255.0 |
|
return ( |
|
(ratio * color1 + (1.0 - ratio) * color2) |
|
.clip(0, bound) |
|
.astype(color1.dtype) |
|
) |
|
|
|
@staticmethod |
|
def rgb2hsv(rgb): |
|
r, g, b = rgb[..., 0], rgb[..., 1], rgb[..., 2] |
|
maxc = np.max(rgb, axis=-1) |
|
minc = np.min(rgb, axis=-1) |
|
eqc = maxc == minc |
|
cr = maxc - minc |
|
s = cr / (np.ones_like(maxc) * eqc + maxc * (1 - eqc)) |
|
cr_divisor = np.ones_like(maxc) * eqc + cr * (1 - eqc) |
|
rc = (maxc - r) / cr_divisor |
|
gc = (maxc - g) / cr_divisor |
|
bc = (maxc - b) / cr_divisor |
|
|
|
hr = (maxc == r) * (bc - gc) |
|
hg = ((maxc == g) & (maxc != r)) * (2.0 + rc - bc) |
|
hb = ((maxc != g) & (maxc != r)) * (4.0 + gc - rc) |
|
h = hr + hg + hb |
|
h = (h / 6.0 + 1.0) % 1.0 |
|
return np.stack((h, s, maxc), axis=-1) |
|
|
|
@staticmethod |
|
def hsv2rgb(hsv): |
|
h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2] |
|
i = np.floor(h * 6.0) |
|
f = (h * 6.0) - i |
|
i = i.astype(np.int32) |
|
|
|
p = np.clip((v * (1.0 - s)), 0.0, 1.0) |
|
q = np.clip((v * (1.0 - s * f)), 0.0, 1.0) |
|
t = np.clip((v * (1.0 - s * (1.0 - f))), 0.0, 1.0) |
|
i = i % 6 |
|
mask = np.expand_dims(i, axis=-1) == np.arange(6) |
|
|
|
a1 = np.stack((v, q, p, p, t, v), axis=-1) |
|
a2 = np.stack((t, v, v, q, p, p), axis=-1) |
|
a3 = np.stack((p, p, t, v, v, q), axis=-1) |
|
a4 = np.stack((a1, a2, a3), axis=-1) |
|
|
|
return np.einsum("...na, ...nab -> ...nb", mask.astype(hsv.dtype), a4) |
|
|
|
def adjust_brightness(self, color, brightness_factor): |
|
if brightness_factor < 0: |
|
raise ValueError( |
|
"brightness_factor ({}) is not non-negative.".format(brightness_factor) |
|
) |
|
|
|
return self.blend(color, np.zeros_like(color), brightness_factor) |
|
|
|
def adjust_contrast(self, color, contrast_factor): |
|
if contrast_factor < 0: |
|
raise ValueError( |
|
"contrast_factor ({}) is not non-negative.".format(contrast_factor) |
|
) |
|
mean = np.mean(RandomColorGrayScale.rgb_to_grayscale(color)) |
|
return self.blend(color, mean, contrast_factor) |
|
|
|
def adjust_saturation(self, color, saturation_factor): |
|
if saturation_factor < 0: |
|
raise ValueError( |
|
"saturation_factor ({}) is not non-negative.".format(saturation_factor) |
|
) |
|
gray = RandomColorGrayScale.rgb_to_grayscale(color) |
|
return self.blend(color, gray, saturation_factor) |
|
|
|
def adjust_hue(self, color, hue_factor): |
|
if not (-0.5 <= hue_factor <= 0.5): |
|
raise ValueError( |
|
"hue_factor ({}) is not in [-0.5, 0.5].".format(hue_factor) |
|
) |
|
orig_dtype = color.dtype |
|
hsv = self.rgb2hsv(color / 255.0) |
|
h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2] |
|
h = (h + hue_factor) % 1.0 |
|
hsv = np.stack((h, s, v), axis=-1) |
|
color_hue_adj = (self.hsv2rgb(hsv) * 255.0).astype(orig_dtype) |
|
return color_hue_adj |
|
|
|
@staticmethod |
|
def get_params(brightness, contrast, saturation, hue): |
|
fn_idx = torch.randperm(4) |
|
b = ( |
|
None |
|
if brightness is None |
|
else np.random.uniform(brightness[0], brightness[1]) |
|
) |
|
c = None if contrast is None else np.random.uniform(contrast[0], contrast[1]) |
|
s = ( |
|
None |
|
if saturation is None |
|
else np.random.uniform(saturation[0], saturation[1]) |
|
) |
|
h = None if hue is None else np.random.uniform(hue[0], hue[1]) |
|
return fn_idx, b, c, s, h |
|
|
|
def __call__(self, data_dict): |
|
( |
|
fn_idx, |
|
brightness_factor, |
|
contrast_factor, |
|
saturation_factor, |
|
hue_factor, |
|
) = self.get_params(self.brightness, self.contrast, self.saturation, self.hue) |
|
|
|
for fn_id in fn_idx: |
|
if ( |
|
fn_id == 0 |
|
and brightness_factor is not None |
|
and np.random.rand() < self.p |
|
): |
|
data_dict["color"] = self.adjust_brightness( |
|
data_dict["color"], brightness_factor |
|
) |
|
elif ( |
|
fn_id == 1 and contrast_factor is not None and np.random.rand() < self.p |
|
): |
|
data_dict["color"] = self.adjust_contrast( |
|
data_dict["color"], contrast_factor |
|
) |
|
elif ( |
|
fn_id == 2 |
|
and saturation_factor is not None |
|
and np.random.rand() < self.p |
|
): |
|
data_dict["color"] = self.adjust_saturation( |
|
data_dict["color"], saturation_factor |
|
) |
|
elif fn_id == 3 and hue_factor is not None and np.random.rand() < self.p: |
|
data_dict["color"] = self.adjust_hue(data_dict["color"], hue_factor) |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class HueSaturationTranslation(object): |
|
@staticmethod |
|
def rgb_to_hsv(rgb): |
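        # vectorised port of colorsys.rgb_to_hsv:
        # expects r, g, b in [0, 255]; returns h, s in [0.0, 1.0] and v in [0, 255]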
|
|
|
|
|
|
|
rgb = rgb.astype("float") |
|
hsv = np.zeros_like(rgb) |
|
|
|
hsv[..., 3:] = rgb[..., 3:] |
|
r, g, b = rgb[..., 0], rgb[..., 1], rgb[..., 2] |
|
maxc = np.max(rgb[..., :3], axis=-1) |
|
minc = np.min(rgb[..., :3], axis=-1) |
|
hsv[..., 2] = maxc |
|
mask = maxc != minc |
|
hsv[mask, 1] = (maxc - minc)[mask] / maxc[mask] |
|
rc = np.zeros_like(r) |
|
gc = np.zeros_like(g) |
|
bc = np.zeros_like(b) |
|
rc[mask] = (maxc - r)[mask] / (maxc - minc)[mask] |
|
gc[mask] = (maxc - g)[mask] / (maxc - minc)[mask] |
|
bc[mask] = (maxc - b)[mask] / (maxc - minc)[mask] |
|
hsv[..., 0] = np.select( |
|
[r == maxc, g == maxc], [bc - gc, 2.0 + rc - bc], default=4.0 + gc - rc |
|
) |
|
hsv[..., 0] = (hsv[..., 0] / 6.0) % 1.0 |
|
return hsv |
|
|
|
@staticmethod |
|
def hsv_to_rgb(hsv): |
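        # vectorised port of colorsys.hsv_to_rgb:
        # expects h, s in [0.0, 1.0] and v in [0, 255]; returns uint8 RGB in [0, 255]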
|
|
|
|
|
|
|
|
|
rgb = np.empty_like(hsv) |
|
rgb[..., 3:] = hsv[..., 3:] |
|
h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2] |
|
i = (h * 6.0).astype("uint8") |
|
f = (h * 6.0) - i |
|
p = v * (1.0 - s) |
|
q = v * (1.0 - s * f) |
|
t = v * (1.0 - s * (1.0 - f)) |
|
i = i % 6 |
|
conditions = [s == 0.0, i == 1, i == 2, i == 3, i == 4, i == 5] |
|
rgb[..., 0] = np.select(conditions, [v, q, p, p, t, v], default=v) |
|
rgb[..., 1] = np.select(conditions, [v, v, v, q, p, p], default=t) |
|
rgb[..., 2] = np.select(conditions, [v, p, t, v, v, q], default=p) |
|
return rgb.astype("uint8") |
|
|
|
def __init__(self, hue_max=0.5, saturation_max=0.2): |
|
self.hue_max = hue_max |
|
self.saturation_max = saturation_max |
|
|
|
def __call__(self, data_dict): |
|
if "color" in data_dict.keys(): |
|
|
|
hsv = HueSaturationTranslation.rgb_to_hsv(data_dict["color"][:, :3]) |
|
hue_val = (np.random.rand() - 0.5) * 2 * self.hue_max |
|
sat_ratio = 1 + (np.random.rand() - 0.5) * 2 * self.saturation_max |
|
hsv[..., 0] = np.remainder(hue_val + hsv[..., 0] + 1, 1) |
|
hsv[..., 1] = np.clip(sat_ratio * hsv[..., 1], 0, 1) |
|
data_dict["color"][:, :3] = np.clip( |
|
HueSaturationTranslation.hsv_to_rgb(hsv), 0, 255 |
|
) |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class RandomColorDrop(object): |
|
def __init__(self, p=0.2, color_augment=0.0): |
|
self.p = p |
|
self.color_augment = color_augment |
|
|
|
def __call__(self, data_dict): |
|
if "color" in data_dict.keys() and np.random.rand() < self.p: |
|
data_dict["color"] *= self.color_augment |
|
return data_dict |
|
|
|
def __repr__(self): |
|
return "RandomColorDrop(color_augment: {}, p: {})".format( |
|
self.color_augment, self.p |
|
) |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class ElasticDistortion(object): |
|
def __init__(self, distortion_params=None): |
|
self.distortion_params = ( |
|
[[0.2, 0.4], [0.8, 1.6]] if distortion_params is None else distortion_params |
|
) |
|
|
|
@staticmethod |
|
def elastic_distortion(coords, granularity, magnitude): |
|
""" |
|
Apply elastic distortion on sparse coordinate space. |
|
        coords: numpy array of (number of points, at least 3 spatial dims)
|
granularity: size of the noise grid (in same scale[m/cm] as the voxel grid) |
|
magnitude: noise multiplier |
|
""" |
|
blurx = np.ones((3, 1, 1, 1)).astype("float32") / 3 |
|
blury = np.ones((1, 3, 1, 1)).astype("float32") / 3 |
|
blurz = np.ones((1, 1, 3, 1)).astype("float32") / 3 |
|
coords_min = coords.min(0) |
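        # build a coarse random noise grid covering the extent of the point cloud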
|
|
|
|
|
noise_dim = ((coords - coords_min).max(0) // granularity).astype(int) + 3 |
|
noise = np.random.randn(*noise_dim, 3).astype(np.float32) |
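        # blur the noise along each axis to obtain a smooth, low-frequency displacement field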
|
|
|
|
|
for _ in range(2): |
|
            noise = scipy.ndimage.convolve(
|
noise, blurx, mode="constant", cval=0 |
|
) |
|
            noise = scipy.ndimage.convolve(
|
noise, blury, mode="constant", cval=0 |
|
) |
|
            noise = scipy.ndimage.convolve(
|
noise, blurz, mode="constant", cval=0 |
|
) |
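        # trilinearly interpolate the smoothed noise at every point and displace the points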
|
|
|
|
|
ax = [ |
|
np.linspace(d_min, d_max, d) |
|
for d_min, d_max, d in zip( |
|
coords_min - granularity, |
|
coords_min + granularity * (noise_dim - 2), |
|
noise_dim, |
|
) |
|
] |
|
interp = scipy.interpolate.RegularGridInterpolator( |
|
ax, noise, bounds_error=False, fill_value=0 |
|
) |
|
coords += interp(coords) * magnitude |
|
return coords |
|
|
|
def __call__(self, data_dict): |
|
if "coord" in data_dict.keys() and self.distortion_params is not None: |
|
if random.random() < 0.95: |
|
for granularity, magnitude in self.distortion_params: |
|
data_dict["coord"] = self.elastic_distortion( |
|
data_dict["coord"], granularity, magnitude |
|
) |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class GridSample(object): |
|
def __init__( |
|
self, |
|
grid_size=0.05, |
|
hash_type="fnv", |
|
mode="train", |
|
keys=("coord", "color", "normal", "segment"), |
|
return_inverse=False, |
|
return_grid_coord=False, |
|
return_min_coord=False, |
|
return_displacement=False, |
|
project_displacement=False, |
|
): |
|
self.grid_size = grid_size |
|
self.hash = self.fnv_hash_vec if hash_type == "fnv" else self.ravel_hash_vec |
|
assert mode in ["train", "test"] |
|
self.mode = mode |
|
self.keys = keys |
|
self.return_inverse = return_inverse |
|
self.return_grid_coord = return_grid_coord |
|
self.return_min_coord = return_min_coord |
|
self.return_displacement = return_displacement |
|
self.project_displacement = project_displacement |
|
|
|
def __call__(self, data_dict): |
|
assert "coord" in data_dict.keys() |
|
scaled_coord = data_dict["coord"] / np.array(self.grid_size) |
|
grid_coord = np.floor(scaled_coord).astype(int) |
|
min_coord = grid_coord.min(0) |
|
grid_coord -= min_coord |
|
scaled_coord -= min_coord |
|
min_coord = min_coord * np.array(self.grid_size) |
|
key = self.hash(grid_coord) |
|
idx_sort = np.argsort(key) |
|
key_sort = key[idx_sort] |
|
_, inverse, count = np.unique(key_sort, return_inverse=True, return_counts=True) |
|
if self.mode == "train": |
|
idx_select = ( |
|
np.cumsum(np.insert(count, 0, 0)[0:-1]) |
|
+ np.random.randint(0, count.max(), count.size) % count |
|
) |
|
idx_unique = idx_sort[idx_select] |
|
if "sampled_index" in data_dict: |
|
|
|
idx_unique = np.unique( |
|
np.append(idx_unique, data_dict["sampled_index"]) |
|
) |
|
mask = np.zeros_like(data_dict["segment"]).astype(bool) |
|
mask[data_dict["sampled_index"]] = True |
|
data_dict["sampled_index"] = np.where(mask[idx_unique])[0] |
|
if self.return_inverse: |
|
data_dict["inverse"] = np.zeros_like(inverse) |
|
data_dict["inverse"][idx_sort] = inverse |
|
if self.return_grid_coord: |
|
data_dict["grid_coord"] = grid_coord[idx_unique] |
|
if self.return_min_coord: |
|
data_dict["min_coord"] = min_coord.reshape([1, 3]) |
|
if self.return_displacement: |
|
displacement = ( |
|
scaled_coord - grid_coord - 0.5 |
|
) |
|
if self.project_displacement: |
|
displacement = np.sum( |
|
displacement * data_dict["normal"], axis=-1, keepdims=True |
|
) |
|
data_dict["displacement"] = displacement[idx_unique] |
|
for key in self.keys: |
|
data_dict[key] = data_dict[key][idx_unique] |
|
return data_dict |
|
|
|
elif self.mode == "test": |
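            # test mode: enumerate every point; part i takes the (i mod count)-th point of each voxel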
|
data_part_list = [] |
|
for i in range(count.max()): |
|
idx_select = np.cumsum(np.insert(count, 0, 0)[0:-1]) + i % count |
|
idx_part = idx_sort[idx_select] |
|
data_part = dict(index=idx_part) |
|
if self.return_inverse: |
|
data_dict["inverse"] = np.zeros_like(inverse) |
|
data_dict["inverse"][idx_sort] = inverse |
|
if self.return_grid_coord: |
|
data_part["grid_coord"] = grid_coord[idx_part] |
|
if self.return_min_coord: |
|
data_part["min_coord"] = min_coord.reshape([1, 3]) |
|
if self.return_displacement: |
|
displacement = ( |
|
scaled_coord - grid_coord - 0.5 |
|
) |
|
if self.project_displacement: |
|
displacement = np.sum( |
|
displacement * data_dict["normal"], axis=-1, keepdims=True |
|
) |
|
data_dict["displacement"] = displacement[idx_part] |
|
for key in data_dict.keys(): |
|
if key in self.keys: |
|
data_part[key] = data_dict[key][idx_part] |
|
else: |
|
data_part[key] = data_dict[key] |
|
data_part_list.append(data_part) |
|
return data_part_list |
|
else: |
|
raise NotImplementedError |
|
|
|
@staticmethod |
|
def ravel_hash_vec(arr): |
|
""" |
|
Ravel the coordinates after subtracting the min coordinates. |
|
""" |
|
assert arr.ndim == 2 |
|
arr = arr.copy() |
|
arr -= arr.min(0) |
|
arr = arr.astype(np.uint64, copy=False) |
|
arr_max = arr.max(0).astype(np.uint64) + 1 |
|
|
|
keys = np.zeros(arr.shape[0], dtype=np.uint64) |
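        # row-major (mixed-radix) flattening of the shifted integer coordinates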
|
|
|
for j in range(arr.shape[1] - 1): |
|
keys += arr[:, j] |
|
keys *= arr_max[j + 1] |
|
keys += arr[:, -1] |
|
return keys |
|
|
|
@staticmethod |
|
def fnv_hash_vec(arr): |
|
""" |
|
FNV64-1A |
|
""" |
|
assert arr.ndim == 2 |
|
|
|
arr = arr.copy() |
|
arr = arr.astype(np.uint64, copy=False) |
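        # vectorised FNV-1a: fold each coordinate column into a running 64-bit hash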
|
hashed_arr = np.uint64(14695981039346656037) * np.ones( |
|
arr.shape[0], dtype=np.uint64 |
|
) |
|
for j in range(arr.shape[1]): |
|
hashed_arr *= np.uint64(1099511628211) |
|
hashed_arr = np.bitwise_xor(hashed_arr, arr[:, j]) |
|
return hashed_arr |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class SphereCrop(object): |
|
def __init__(self, point_max=80000, sample_rate=None, mode="random"): |
|
self.point_max = point_max |
|
self.sample_rate = sample_rate |
|
assert mode in ["random", "center", "all"] |
|
self.mode = mode |
|
|
|
def __call__(self, data_dict): |
|
point_max = ( |
|
int(self.sample_rate * data_dict["coord"].shape[0]) |
|
if self.sample_rate is not None |
|
else self.point_max |
|
) |
|
|
|
assert "coord" in data_dict.keys() |
|
if self.mode == "all": |
|
|
|
if "index" not in data_dict.keys(): |
|
data_dict["index"] = np.arange(data_dict["coord"].shape[0]) |
|
data_part_list = [] |
|
|
|
if data_dict["coord"].shape[0] > point_max: |
|
coord_p, idx_uni = np.random.rand( |
|
data_dict["coord"].shape[0] |
|
) * 1e-3, np.array([]) |
|
while idx_uni.size != data_dict["index"].shape[0]: |
|
init_idx = np.argmin(coord_p) |
|
dist2 = np.sum( |
|
np.power(data_dict["coord"] - data_dict["coord"][init_idx], 2), |
|
1, |
|
) |
|
idx_crop = np.argsort(dist2)[:point_max] |
|
|
|
data_crop_dict = dict() |
|
if "coord" in data_dict.keys(): |
|
data_crop_dict["coord"] = data_dict["coord"][idx_crop] |
|
if "grid_coord" in data_dict.keys(): |
|
data_crop_dict["grid_coord"] = data_dict["grid_coord"][idx_crop] |
|
if "normal" in data_dict.keys(): |
|
data_crop_dict["normal"] = data_dict["normal"][idx_crop] |
|
if "color" in data_dict.keys(): |
|
data_crop_dict["color"] = data_dict["color"][idx_crop] |
|
if "displacement" in data_dict.keys(): |
|
data_crop_dict["displacement"] = data_dict["displacement"][ |
|
idx_crop |
|
] |
|
if "strength" in data_dict.keys(): |
|
data_crop_dict["strength"] = data_dict["strength"][idx_crop] |
|
data_crop_dict["weight"] = dist2[idx_crop] |
|
data_crop_dict["index"] = data_dict["index"][idx_crop] |
|
data_part_list.append(data_crop_dict) |
|
|
|
delta = np.square( |
|
1 - data_crop_dict["weight"] / np.max(data_crop_dict["weight"]) |
|
) |
|
coord_p[idx_crop] += delta |
|
idx_uni = np.unique( |
|
np.concatenate((idx_uni, data_crop_dict["index"])) |
|
) |
|
else: |
|
data_crop_dict = data_dict.copy() |
|
data_crop_dict["weight"] = np.zeros(data_dict["coord"].shape[0]) |
|
data_crop_dict["index"] = data_dict["index"] |
|
data_part_list.append(data_crop_dict) |
|
return data_part_list |
|
|
|
elif data_dict["coord"].shape[0] > point_max: |
|
if self.mode == "random": |
|
center = data_dict["coord"][ |
|
np.random.randint(data_dict["coord"].shape[0]) |
|
] |
|
elif self.mode == "center": |
|
center = data_dict["coord"][data_dict["coord"].shape[0] // 2] |
|
else: |
|
raise NotImplementedError |
|
idx_crop = np.argsort(np.sum(np.square(data_dict["coord"] - center), 1))[ |
|
:point_max |
|
] |
|
if "coord" in data_dict.keys(): |
|
data_dict["coord"] = data_dict["coord"][idx_crop] |
|
if "origin_coord" in data_dict.keys(): |
|
data_dict["origin_coord"] = data_dict["origin_coord"][idx_crop] |
|
if "grid_coord" in data_dict.keys(): |
|
data_dict["grid_coord"] = data_dict["grid_coord"][idx_crop] |
|
if "color" in data_dict.keys(): |
|
data_dict["color"] = data_dict["color"][idx_crop] |
|
if "normal" in data_dict.keys(): |
|
data_dict["normal"] = data_dict["normal"][idx_crop] |
|
if "segment" in data_dict.keys(): |
|
data_dict["segment"] = data_dict["segment"][idx_crop] |
|
if "instance" in data_dict.keys(): |
|
data_dict["instance"] = data_dict["instance"][idx_crop] |
|
if "displacement" in data_dict.keys(): |
|
data_dict["displacement"] = data_dict["displacement"][idx_crop] |
|
if "strength" in data_dict.keys(): |
|
data_dict["strength"] = data_dict["strength"][idx_crop] |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class ShufflePoint(object): |
|
def __call__(self, data_dict): |
|
assert "coord" in data_dict.keys() |
|
shuffle_index = np.arange(data_dict["coord"].shape[0]) |
|
np.random.shuffle(shuffle_index) |
|
if "coord" in data_dict.keys(): |
|
data_dict["coord"] = data_dict["coord"][shuffle_index] |
|
if "grid_coord" in data_dict.keys(): |
|
data_dict["grid_coord"] = data_dict["grid_coord"][shuffle_index] |
|
if "displacement" in data_dict.keys(): |
|
data_dict["displacement"] = data_dict["displacement"][shuffle_index] |
|
if "color" in data_dict.keys(): |
|
data_dict["color"] = data_dict["color"][shuffle_index] |
|
if "normal" in data_dict.keys(): |
|
data_dict["normal"] = data_dict["normal"][shuffle_index] |
|
if "segment" in data_dict.keys(): |
|
data_dict["segment"] = data_dict["segment"][shuffle_index] |
|
if "instance" in data_dict.keys(): |
|
data_dict["instance"] = data_dict["instance"][shuffle_index] |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class CropBoundary(object): |
|
def __call__(self, data_dict): |
|
assert "segment" in data_dict |
|
segment = data_dict["segment"].flatten() |
|
mask = (segment != 0) * (segment != 1) |
|
if "coord" in data_dict.keys(): |
|
data_dict["coord"] = data_dict["coord"][mask] |
|
if "grid_coord" in data_dict.keys(): |
|
data_dict["grid_coord"] = data_dict["grid_coord"][mask] |
|
if "color" in data_dict.keys(): |
|
data_dict["color"] = data_dict["color"][mask] |
|
if "normal" in data_dict.keys(): |
|
data_dict["normal"] = data_dict["normal"][mask] |
|
if "segment" in data_dict.keys(): |
|
data_dict["segment"] = data_dict["segment"][mask] |
|
if "instance" in data_dict.keys(): |
|
data_dict["instance"] = data_dict["instance"][mask] |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class ContrastiveViewsGenerator(object): |
|
def __init__( |
|
self, |
|
view_keys=("coord", "color", "normal", "origin_coord"), |
|
view_trans_cfg=None, |
|
): |
|
self.view_keys = view_keys |
|
self.view_trans = Compose(view_trans_cfg) |
|
|
|
def __call__(self, data_dict): |
|
view1_dict = dict() |
|
view2_dict = dict() |
|
for key in self.view_keys: |
|
view1_dict[key] = data_dict[key].copy() |
|
view2_dict[key] = data_dict[key].copy() |
|
view1_dict = self.view_trans(view1_dict) |
|
view2_dict = self.view_trans(view2_dict) |
|
for key, value in view1_dict.items(): |
|
data_dict["view1_" + key] = value |
|
for key, value in view2_dict.items(): |
|
data_dict["view2_" + key] = value |
|
return data_dict |
|
|
|
|
|
@TRANSFORMS.register_module() |
|
class InstanceParser(object): |
|
def __init__(self, segment_ignore_index=(-1, 0, 1), instance_ignore_index=-1): |
|
self.segment_ignore_index = segment_ignore_index |
|
self.instance_ignore_index = instance_ignore_index |
|
|
|
def __call__(self, data_dict): |
|
coord = data_dict["coord"] |
|
segment = data_dict["segment"] |
|
instance = data_dict["instance"] |
|
mask = ~np.in1d(segment, self.segment_ignore_index) |
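        # points whose semantic label is ignored receive the ignore instance id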
|
|
|
instance[~mask] = self.instance_ignore_index |
|
|
|
unique, inverse = np.unique(instance[mask], return_inverse=True) |
|
instance_num = len(unique) |
|
instance[mask] = inverse |
|
|
|
centroid = np.ones((coord.shape[0], 3)) * self.instance_ignore_index |
|
bbox = np.ones((instance_num, 8)) * self.instance_ignore_index |
|
vacancy = [ |
|
index for index in self.segment_ignore_index if index >= 0 |
|
] |
|
|
|
for instance_id in range(instance_num): |
|
mask_ = instance == instance_id |
|
coord_ = coord[mask_] |
|
bbox_min = coord_.min(0) |
|
bbox_max = coord_.max(0) |
|
bbox_centroid = coord_.mean(0) |
|
bbox_center = (bbox_max + bbox_min) / 2 |
|
bbox_size = bbox_max - bbox_min |
|
bbox_theta = np.zeros(1, dtype=coord_.dtype) |
|
bbox_class = np.array([segment[mask_][0]], dtype=coord_.dtype) |
|
|
|
bbox_class -= np.greater(bbox_class, vacancy).sum() |
|
|
|
centroid[mask_] = bbox_centroid |
|
bbox[instance_id] = np.concatenate( |
|
[bbox_center, bbox_size, bbox_theta, bbox_class] |
|
) |
|
data_dict["instance"] = instance |
|
data_dict["instance_centroid"] = centroid |
|
data_dict["bbox"] = bbox |
|
return data_dict |
|
|
|
|
|
class Compose(object): |
|
def __init__(self, cfg=None): |
|
self.cfg = cfg if cfg is not None else [] |
|
self.transforms = [] |
|
for t_cfg in self.cfg: |
|
self.transforms.append(TRANSFORMS.build(t_cfg)) |
|
|
|
def __call__(self, data_dict): |
|
for t in self.transforms: |
|
data_dict = t(data_dict) |
|
return data_dict |
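

# Minimal usage sketch (illustrative only; assumes the mmcv-style dict(type=...)
# config format that TRANSFORMS.build expects): each entry names a registered
# transform and its kwargs, and Compose applies them in order.
#
#   transform = Compose(
#       [
#           dict(type="CenterShift", apply_z=True),
#           dict(type="RandomScale", scale=[0.9, 1.1]),
#           dict(type="GridSample", grid_size=0.02, hash_type="fnv", mode="train"),
#           dict(type="ToTensor"),
#       ]
#   )
#   data_dict = transform(data_dict)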
|
|