|
from typing import * |
|
import torch |
|
import torch.nn as nn |
|
from . import SparseTensor |
|
|
|
__all__ = ["SparseDownsample", "SparseUpsample", "SparseSubdivide"] |
|
|
|
|
|
class SparseDownsample(nn.Module): |
|
""" |
|
Downsample a sparse tensor by a factor of `factor`. |
|
Implemented as average pooling. |
|
""" |
|
|
|
def __init__(self, factor: Union[int, Tuple[int, ...], List[int]]): |
|
super(SparseDownsample, self).__init__() |
|
self.factor = tuple(factor) if isinstance(factor, (list, tuple)) else factor |
|
|
|
def forward(self, input: SparseTensor) -> SparseTensor: |
|
DIM = input.coords.shape[-1] - 1 |
|
factor = self.factor if isinstance(self.factor, tuple) else (self.factor,) * DIM |
|
assert DIM == len( |
|
factor |
|
), "Input coordinates must have the same dimension as the downsample factor." |
|
|
|
coord = list(input.coords.unbind(dim=-1)) |
|
for i, f in enumerate(factor): |
|
coord[i + 1] = coord[i + 1] // f |
|
|
|
MAX = [coord[i + 1].max().item() + 1 for i in range(DIM)] |
|
OFFSET = torch.cumprod(torch.tensor(MAX[::-1]), 0).tolist()[::-1] + [1] |
|
code = sum([c * o for c, o in zip(coord, OFFSET)]) |
|
code, idx = code.unique(return_inverse=True) |
|
|
|
new_feats = torch.scatter_reduce( |
|
torch.zeros( |
|
code.shape[0], |
|
input.feats.shape[1], |
|
device=input.feats.device, |
|
dtype=input.feats.dtype, |
|
), |
|
dim=0, |
|
index=idx.unsqueeze(1).expand(-1, input.feats.shape[1]), |
|
src=input.feats, |
|
reduce="mean", |
|
) |
|
new_coords = torch.stack( |
|
[code // OFFSET[0]] |
|
+ [(code // OFFSET[i + 1]) % MAX[i] for i in range(DIM)], |
|
dim=-1, |
|
) |
|
out = SparseTensor( |
|
new_feats, |
|
new_coords, |
|
input.shape, |
|
) |
|
out._scale = tuple([s // f for s, f in zip(input._scale, factor)]) |
|
out._spatial_cache = input._spatial_cache |
|
|
|
out.register_spatial_cache(f"upsample_{factor}_coords", input.coords) |
|
out.register_spatial_cache(f"upsample_{factor}_layout", input.layout) |
|
out.register_spatial_cache(f"upsample_{factor}_idx", idx) |
|
|
|
return out |
|
|
|
|
|
class SparseUpsample(nn.Module): |
|
""" |
|
Upsample a sparse tensor by a factor of `factor`. |
|
Implemented as nearest neighbor interpolation. |
|
""" |
|
|
|
def __init__(self, factor: Union[int, Tuple[int, int, int], List[int]]): |
|
super(SparseUpsample, self).__init__() |
|
self.factor = tuple(factor) if isinstance(factor, (list, tuple)) else factor |
|
|
|
def forward(self, input: SparseTensor) -> SparseTensor: |
|
DIM = input.coords.shape[-1] - 1 |
|
factor = self.factor if isinstance(self.factor, tuple) else (self.factor,) * DIM |
|
assert DIM == len( |
|
factor |
|
), "Input coordinates must have the same dimension as the upsample factor." |
|
|
|
new_coords = input.get_spatial_cache(f"upsample_{factor}_coords") |
|
new_layout = input.get_spatial_cache(f"upsample_{factor}_layout") |
|
idx = input.get_spatial_cache(f"upsample_{factor}_idx") |
|
if any([x is None for x in [new_coords, new_layout, idx]]): |
|
raise ValueError( |
|
"Upsample cache not found. SparseUpsample must be paired with SparseDownsample." |
|
) |
|
new_feats = input.feats[idx] |
|
out = SparseTensor(new_feats, new_coords, input.shape, new_layout) |
|
out._scale = tuple([s * f for s, f in zip(input._scale, factor)]) |
|
out._spatial_cache = input._spatial_cache |
|
return out |
|
|
|
|
|
class SparseSubdivide(nn.Module): |
|
""" |
|
Upsample a sparse tensor by a factor of `factor`. |
|
Implemented as nearest neighbor interpolation. |
|
""" |
|
|
|
def __init__(self): |
|
super(SparseSubdivide, self).__init__() |
|
|
|
def forward(self, input: SparseTensor) -> SparseTensor: |
|
DIM = input.coords.shape[-1] - 1 |
|
|
|
n_cube = torch.ones([2] * DIM, device=input.device, dtype=torch.int) |
|
n_coords = torch.nonzero(n_cube) |
|
n_coords = torch.cat([torch.zeros_like(n_coords[:, :1]), n_coords], dim=-1) |
|
factor = n_coords.shape[0] |
|
assert factor == 2**DIM |
|
|
|
new_coords = input.coords.clone() |
|
new_coords[:, 1:] *= 2 |
|
new_coords = new_coords.unsqueeze(1) + n_coords.unsqueeze(0).to( |
|
new_coords.dtype |
|
) |
|
|
|
new_feats = input.feats.unsqueeze(1).expand( |
|
input.feats.shape[0], factor, *input.feats.shape[1:] |
|
) |
|
out = SparseTensor( |
|
new_feats.flatten(0, 1), new_coords.flatten(0, 1), input.shape |
|
) |
|
out._scale = input._scale * 2 |
|
out._spatial_cache = input._spatial_cache |
|
return out |
|
|