# Copyright (c) OpenMMLab. All rights reserved.
from collections import defaultdict
from typing import Tuple

import numpy as np
import torch
from mmengine.model import BaseModule
from mmengine.runner.checkpoint import load_checkpoint
from scipy.optimize import linear_sum_assignment
from torch import Tensor, nn

from mmdet.registry import TASK_UTILS

INFINITY = 1e5


class TemporalBlock(BaseModule):
    """The temporal block of AFLink model.

    Args:
        in_channel (int): the number of input channels.
        out_channel (int): the number of output channels.
    """

    def __init__(self,
                 in_channel: int,
                 out_channel: int,
                 kernel_size: tuple = (7, 1)):
        super(TemporalBlock, self).__init__()
        self.conv = nn.Conv2d(
            in_channel, out_channel, kernel_size, bias=False)
        self.relu = nn.ReLU(inplace=True)
        self.bnf = nn.BatchNorm1d(out_channel)
        self.bnx = nn.BatchNorm1d(out_channel)
        self.bny = nn.BatchNorm1d(out_channel)

    def bn(self, x: Tensor) -> Tensor:
        x[:, :, :, 0] = self.bnf(x[:, :, :, 0])
        x[:, :, :, 1] = self.bnx(x[:, :, :, 1])
        x[:, :, :, 2] = self.bny(x[:, :, :, 2])
        return x

    def forward(self, x: Tensor) -> Tensor:
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        return x


class FusionBlock(BaseModule):
    """The fusion block of AFLink model.

    Args:
        in_channel (int): the number of input channels.
        out_channel (int): the number of output channels.
    """

    def __init__(self, in_channel: int, out_channel: int):
        super(FusionBlock, self).__init__()
        self.conv = nn.Conv2d(in_channel, out_channel, (1, 3), bias=False)
        self.bn = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x: Tensor) -> Tensor:
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        return x


class Classifier(BaseModule):
    """The classifier of AFLink model.

    Args:
        in_channel (int): the number of input channels.
        out_channel (int): the number of output channels.
    """

    def __init__(self, in_channel: int, out_channel: int):
        super(Classifier, self).__init__()
        self.fc1 = nn.Linear(in_channel * 2, in_channel // 2)
        self.relu = nn.ReLU(inplace=True)
        self.fc2 = nn.Linear(in_channel // 2, out_channel)

    def forward(self, x1: Tensor, x2: Tensor) -> Tensor:
        x = torch.cat((x1, x2), dim=1)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x


class AFLinkModel(BaseModule):
    """Appearance-Free Link Model.

    Args:
        temporal_module_channels (list): the channels of each layer of the
            temporal modules. Defaults to [1, 32, 64, 128, 256].
        fusion_module_channels (list): the input and output channels of the
            fusion blocks. Defaults to [256, 256].
        classifier_channels (list): the input and output channels of the
            classifier. Defaults to [256, 2].
    """

    def __init__(self,
                 temporal_module_channels: list = [1, 32, 64, 128, 256],
                 fusion_module_channels: list = [256, 256],
                 classifier_channels: list = [256, 2]):
        super(AFLinkModel, self).__init__()
        self.TemporalModule_1 = nn.Sequential(*[
            TemporalBlock(temporal_module_channels[i],
                          temporal_module_channels[i + 1])
            for i in range(len(temporal_module_channels) - 1)
        ])
        self.TemporalModule_2 = nn.Sequential(*[
            TemporalBlock(temporal_module_channels[i],
                          temporal_module_channels[i + 1])
            for i in range(len(temporal_module_channels) - 1)
        ])
        self.FusionBlock_1 = FusionBlock(*fusion_module_channels)
        self.FusionBlock_2 = FusionBlock(*fusion_module_channels)
        self.pooling = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = Classifier(*classifier_channels)

    def forward(self, x1: Tensor, x2: Tensor) -> Tensor:
        assert not self.training, 'Only testing is supported for AFLink.'
        x1 = x1[:, :, :, :3]
        x2 = x2[:, :, :, :3]
        x1 = self.TemporalModule_1(x1)  # [B,1,30,3] -> [B,256,6,3]
        x2 = self.TemporalModule_2(x2)
        x1 = self.FusionBlock_1(x1)
        x2 = self.FusionBlock_2(x2)
        x1 = self.pooling(x1).squeeze(-1).squeeze(-1)
        x2 = self.pooling(x2).squeeze(-1).squeeze(-1)
        y = self.classifier(x1, x2)
        y = torch.softmax(y, dim=1)[0, 1]
        return y
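
# A minimal sketch of how ``AFLinkModel`` is meant to be called (the helper
# name ``_aflink_shape_demo`` is hypothetical). Each input is a motion
# embedding of shape (B, 1, T, C), i.e. T = 30 frames with channels ordered
# as (frame_id, x, y, ...); channels beyond the first three are sliced away.
# The output is the scalar softmax probability that the two tracklets belong
# to the same identity.
def _aflink_shape_demo() -> Tensor:
    """Hypothetical shape check for :class:`AFLinkModel`."""
    model = AFLinkModel().eval()  # forward() asserts the model is in eval mode
    track_1 = torch.rand(1, 1, 30, 3)  # (B, 1, T, C)
    track_2 = torch.rand(1, 1, 30, 3)
    with torch.no_grad():
        return model(track_1, track_2)  # 0-dim tensor, confidence in [0, 1]
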
@TASK_UTILS.register_module()
class AppearanceFreeLink(BaseModule):
    """Appearance-Free Link method.

    This method is proposed in
    "StrongSORT: Make DeepSORT Great Again"
    `StrongSORT`_.

    Args:
        checkpoint (str): Checkpoint path.
        temporal_threshold (tuple, optional): The temporal constraint
            for tracklets association. Defaults to (0, 30).
        spatial_threshold (int, optional): The spatial constraint for
            tracklets association. Defaults to 75.
        confidence_threshold (float, optional): The minimum confidence
            threshold for tracklets association. Defaults to 0.95.
    """

    def __init__(self,
                 checkpoint: str,
                 temporal_threshold: tuple = (0, 30),
                 spatial_threshold: int = 75,
                 confidence_threshold: float = 0.95):
        super(AppearanceFreeLink, self).__init__()
        self.temporal_threshold = temporal_threshold
        self.spatial_threshold = spatial_threshold
        self.confidence_threshold = confidence_threshold

        self.model = AFLinkModel()
        if checkpoint:
            load_checkpoint(self.model, checkpoint)
        if torch.cuda.is_available():
            self.model.cuda()
        self.model.eval()

        self.device = next(self.model.parameters()).device
        self.fn_l2 = lambda x, y: np.sqrt(x**2 + y**2)

    def data_transform(self,
                       track1: np.ndarray,
                       track2: np.ndarray,
                       length: int = 30) -> Tuple[np.ndarray, np.ndarray]:
        """Data Transformation. This is used to standardize the length of
        tracks to a unified length. Then perform min-max normalization to the
        motion embeddings.

        Args:
            track1 (ndarray): the first track with shape (N,C).
            track2 (ndarray): the second track with shape (M,C).
            length (int): the unified length of tracks. Defaults to 30.

        Returns:
            Tuple[ndarray, ndarray]: the transformed track1 and track2.
        """
        # fill or cut track1
        length_1 = track1.shape[0]
        track1 = track1[-length:] if length_1 >= length else \
            np.pad(track1, ((length - length_1, 0), (0, 0)))

        # fill or cut track2
        length_2 = track2.shape[0]
        track2 = track2[:length] if length_2 >= length else \
            np.pad(track2, ((0, length - length_2), (0, 0)))

        # min-max normalization
        min_ = np.concatenate((track1, track2), axis=0).min(axis=0)
        max_ = np.concatenate((track1, track2), axis=0).max(axis=0)
        subtractor = (max_ + min_) / 2
        divisor = (max_ - min_) / 2 + 1e-5
        track1 = (track1 - subtractor) / divisor
        track2 = (track2 - subtractor) / divisor

        return track1, track2
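
    # A minimal sketch of what ``data_transform`` produces (this demo method
    # is hypothetical): the short first track is padded at the front, the
    # long second track is cut, so both outputs have exactly ``length`` rows,
    # and the shared min-max step maps every channel into [-1, 1].
    def _data_transform_demo(self) -> None:
        """Hypothetical check of the unified length and value range."""
        track1 = np.random.rand(12, 5)  # (N, C): 12 frames, padded to 30
        track2 = np.random.rand(45, 5)  # (M, C): 45 frames, cut to 30
        out1, out2 = self.data_transform(track1, track2, length=30)
        assert out1.shape == out2.shape == (30, 5)
        assert out1.min() >= -1.0 and out2.max() <= 1.0
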
    def forward(self, pred_tracks: np.ndarray) -> np.ndarray:
        """Forward function.

        Args:
            pred_tracks (ndarray): With shape (N, 7). Each row denotes
                (frame_id, track_id, x1, y1, x2, y2, score).

        Returns:
            ndarray: The linked tracks with shape (N, 7). Each row denotes
                (frame_id, track_id, x1, y1, x2, y2, score).
        """
        # sort tracks by the frame id
        pred_tracks = pred_tracks[np.argsort(pred_tracks[:, 0])]

        # gather tracks information
        id2info = defaultdict(list)
        for row in pred_tracks:
            frame_id, track_id, x1, y1, x2, y2 = row[:6]
            id2info[track_id].append([frame_id, x1, y1, x2 - x1, y2 - y1])
        id2info = {k: np.array(v) for k, v in id2info.items()}
        num_track = len(id2info)
        track_ids = np.array(list(id2info))
        cost_matrix = np.full((num_track, num_track), INFINITY)

        # compute the cost matrix
        for i, id_i in enumerate(track_ids):
            for j, id_j in enumerate(track_ids):
                if id_i == id_j:
                    continue
                info_i, info_j = id2info[id_i], id2info[id_j]
                frame_i, box_i = info_i[-1][0], info_i[-1][1:3]
                frame_j, box_j = info_j[0][0], info_j[0][1:3]
                # temporal constraint
                if not self.temporal_threshold[0] <= \
                        frame_j - frame_i <= self.temporal_threshold[1]:
                    continue
                # spatial constraint
                if self.fn_l2(box_i[0] - box_j[0], box_i[1] - box_j[1]) \
                        > self.spatial_threshold:
                    continue
                # confidence constraint
                track_i, track_j = self.data_transform(info_i, info_j)

                # numpy to torch
                track_i = torch.tensor(
                    track_i, dtype=torch.float).to(self.device)
                track_j = torch.tensor(
                    track_j, dtype=torch.float).to(self.device)
                track_i = track_i.unsqueeze(0).unsqueeze(0)
                track_j = track_j.unsqueeze(0).unsqueeze(0)

                confidence = self.model(track_i,
                                        track_j).detach().cpu().numpy()
                if confidence >= self.confidence_threshold:
                    cost_matrix[i, j] = 1 - confidence

        # linear assignment
        indices = linear_sum_assignment(cost_matrix)
        _id2id = dict()  # the temporary assignment results
        id2id = dict()  # the final assignment results
        for i, j in zip(indices[0], indices[1]):
            if cost_matrix[i, j] < INFINITY:
                _id2id[i] = j
        for k, v in _id2id.items():
            if k in id2id:
                id2id[v] = id2id[k]
            else:
                id2id[v] = k

        # link
        for k, v in id2id.items():
            pred_tracks[pred_tracks[:, 1] == k, 1] = v

        # deduplicate
        _, index = np.unique(pred_tracks[:, :2], return_index=True, axis=0)

        return pred_tracks[index]
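

# A minimal end-to-end sketch (the empty checkpoint path and the dummy tracks
# are hypothetical; pass a real AFLink checkpoint for meaningful confidences).
# AppearanceFreeLink consumes the full (N, 7) track array of a video and
# returns the same array with fragmented track ids merged where the temporal,
# spatial and confidence constraints are all satisfied.
if __name__ == '__main__':
    linker = AppearanceFreeLink(checkpoint='')  # '' skips weight loading
    # Two dummy tracklets: id 1 ends at frame 5 and id 2 starts at frame 7
    # nearby, so the pair passes the temporal and spatial constraints.
    tracks = np.array(
        [[f, 1, 100 + f, 100 + f, 130 + f, 160 + f, 0.9]
         for f in range(1, 6)] +
        [[f, 2, 100 + f, 100 + f, 130 + f, 160 + f, 0.9]
         for f in range(7, 12)],
        dtype=np.float64)
    linked = linker(tracks)
    print(linked.shape)  # still (N, 7); track ids may have been merged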