|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
r"""This file contains code to track based on IoU overlaps. |
|
|
|
The IoUTracker takes frame-by-frame panoptic segmentation prediction and |
|
generates video panoptic segmentation with re-ordered identities based on IoU |
|
overlaps within consecutive frames. |
|
|
|
To run this script, you need to install scipy. |
|
For example, install it via pip: |
|
  $ pip install scipy
|
""" |
|
|
|
import collections |
|
import os |
|
import pprint |
|
from typing import List, Text, Tuple, Optional |
|
|
|
from absl import app |
|
from absl import flags |
|
from absl import logging |
|
import numpy as np |
|
from scipy import optimize |
|
import tensorflow as tf |
|
|
|
from deeplab2.data import dataset |
|
from deeplab2.evaluation import segmentation_and_tracking_quality as stq |
|
from deeplab2.tracker import optical_flow_utils |
|
from deeplab2.trainer import vis_utils |
|
|
|
FLAGS = flags.FLAGS

# Command line flags. `gt` and `pred` have no default value and are marked as
# required by the script entry point.
flags.DEFINE_string(
    'gt', None,
    'The path to the gt video frames. This folder '
    'should contain one folder per sequence.')
flags.DEFINE_string(
    'pred', None,
    'The path to the prediction video frames. '
    'This folder should contain one folder per sequence.')
# The two help fragments used to be concatenated without a separating space
# ("frames.This folder"); the trailing space fixes the rendered help text.
flags.DEFINE_string(
    'output', '',
    'The path to store the tracked video frames. '
    'This folder should contain one folder per sequence.')
flags.DEFINE_string('sequence', '', 'The sequence ID to evaluate on.')
flags.DEFINE_string(
    'dataset', 'kitti_step', 'The specified dataset is used'
    ' to interpret the labels. Supported options are: ' +
    ', '.join(dataset.MAP_NAMES))
flags.DEFINE_string(
    'optical_flow', None,
    'The path to the optical flow predictions. This folder '
    'should contain one folder per sequence.')

# Panoptic labels are encoded as `class_id * _LABEL_DIVISOR + instance_id`.
_LABEL_DIVISOR = 10000
# File extensions of the per-frame occlusion and optical flow files.
_OCCLUSION_EXT = '.occ_forward'
_FLOW_EXT = '.flow_forward'
|
|
|
|
|
def _format_output(output, indent=4):
  """Pretty-prints `output` for logging.

  Args:
    output: The object to render with `pprint.pformat`.
    indent: Number of spaces put in front of every line when the rendered text
      does not fit on exactly one line.

  Returns:
    The rendered text unchanged when it consists of exactly one line.
    Otherwise a string that starts with a newline, followed by all rendered
    lines, each prefixed with `indent` spaces.
  """
  rendered = pprint.pformat(output)
  rows = rendered.splitlines()
  if len(rows) != 1:
    padding = ' ' * indent
    return '\n' + '\n'.join(padding + row for row in rows)
  return rendered
|
|
|
|
|
def _compute_mask_iou(instance_a: np.ndarray, instance_b: np.ndarray) -> float:
  """Computes the IoU of two binary masks.

  Every element that is greater than zero is treated as foreground, so boolean,
  integer and float masks are all accepted.

  Args:
    instance_a: The first mask.
    instance_b: The second mask, with a shape compatible with `instance_a`.

  Returns:
    The intersection-over-union of the two foreground regions as a float. If
    both masks are empty, the union is empty as well and 0.0 is returned
    instead of dividing by zero.
  """
  foreground_a = instance_a > 0
  foreground_b = instance_b > 0
  intersection = np.count_nonzero(np.logical_and(foreground_a, foreground_b))
  # |A u B| = |A| + |B| - |A n B|.
  union = (
      np.count_nonzero(foreground_a) + np.count_nonzero(foreground_b) -
      intersection)
  if union == 0:
    return 0.0
  return intersection / union
|
|
|
|
|
class IoUTracker:
  """This class computes track IDs based on IoU overlap.

  For every tracked class, the tracker remembers the last mask of each active
  track. The instances of a new frame are associated with these masks by
  solving a linear assignment problem on the pairwise IoU values. Matched
  instances take over the ID of their track, unmatched instances start new
  tracks, and tracks that stay unmatched for more than `sigma` consecutive
  updates are removed.
  """

  def __init__(self,
               classes_to_track: List[int],
               label_divisor: int,
               sigma: int = 10,
               iou_threshold: float = 0.3):
    """Initializes the tracker.

    Args:
      classes_to_track: A list of class IDs that should be tracked.
      label_divisor: The divisor to split the label map into semantic classes
        and instance IDs.
      sigma: An integer specifying the number of frames that tracks should be
        kept active while being discontinued.
      iou_threshold: A float specifying the minimum IoU value for a match.
    """
    self._sigma = sigma
    self._iou_threshold = iou_threshold
    self._classes_to_track = classes_to_track
    self._label_divisor = label_divisor
    self.reset_states()

  def reset_states(self):
    """Resets all tracking states."""
    # Both dictionaries map class ID -> (track ID -> value). The ordered dicts
    # keep tracks in creation order, which defines the column order of the IoU
    # matrix used for the association.
    self._last_mask_per_track = {
        i: collections.OrderedDict() for i in self._classes_to_track
    }
    self._frames_since_last_update = {
        i: collections.OrderedDict() for i in self._classes_to_track
    }
    # Track ID 0 is never handed out; pixels without a track keep the value 0.
    self._next_track_id = 1

  def _add_track(self, object_mask: np.ndarray, class_index: int) -> int:
    """Adds a new track and returns the track ID that was assigned to it."""
    track_id = self._next_track_id
    self._last_mask_per_track[class_index][track_id] = object_mask
    self._frames_since_last_update[class_index][track_id] = 0
    self._next_track_id += 1
    return track_id

  def _remove_track(self, track_id: int, class_index: int):
    """Removes a track."""
    del self._last_mask_per_track[class_index][track_id]
    del self._frames_since_last_update[class_index][track_id]

  def _increase_inactivity_of_track(self, track_id: int, class_index: int):
    """Increases inactivity of track and potentially remove it."""
    inactivity = self._frames_since_last_update[class_index]
    inactivity[track_id] += 1
    if inactivity[track_id] > self._sigma:
      self._remove_track(track_id, class_index)

  def _match_instances_to_tracks(
      self, instances: List[np.ndarray], class_index: int,
      instances_with_track_id: np.ndarray,
      warped_instances: List[np.ndarray]) -> np.ndarray:
    """Match instances to tracks and update tracks accordingly.

    Args:
      instances: A list of boolean masks, one per instance of the current
        frame.
      class_index: An integer specifying the class index.
      instances_with_track_id: The array that receives the track IDs. It is
        modified in place.
      warped_instances: Masks aligned index-by-index with `instances` that are
        used for the association instead of `instances`. If the list is empty,
        `instances` is used for the association.

    Returns:
      `instances_with_track_id` with the track IDs of all `instances` filled
      in.
    """
    # Snapshot of the track IDs before any track is added or removed; the
    # indices returned by the association refer to this order.
    track_ids = list(self._last_mask_per_track[class_index].keys())

    # Match instances to tracks based on IoU overlap.
    candidates = warped_instances if warped_instances else instances
    matches, unmatched_instances, unmatched_tracks = (
        self._associate_instances_to_tracks(candidates, class_index))

    # Matched instances inherit the track ID. The stored mask is always the
    # original (unwarped) instance mask.
    for instance_index, track_id_index in matches:
      track_id = track_ids[track_id_index]
      instance_mask = instances[instance_index]
      self._last_mask_per_track[class_index][track_id] = instance_mask
      self._frames_since_last_update[class_index][track_id] = 0
      instances_with_track_id[instance_mask] = track_id

    # Unmatched instances start new tracks.
    for instance_index in unmatched_instances:
      instance_mask = instances[instance_index]
      track_id = self._add_track(instance_mask, class_index)
      instances_with_track_id[instance_mask] = track_id

    # Unmatched tracks age and are removed once they exceed `sigma`.
    for track_id_index in unmatched_tracks:
      track_id = track_ids[track_id_index]
      self._increase_inactivity_of_track(track_id, class_index)

    return instances_with_track_id

  def update(self, predicted_frame: np.ndarray,
             predicted_flow: Optional[np.ndarray],
             predicted_occlusion: Optional[np.ndarray]) -> np.ndarray:
    """Updates the tracking states and computes the track IDs.

    Args:
      predicted_frame: The panoptic label map for a particular video frame.
      predicted_flow: An optional np.array containing the optical flow.
      predicted_occlusion: An optional np.array containing the predicted
        occlusion map.

    Returns:
      The updated panoptic label map for the input frame containing track IDs.
      Pixels that do not belong to an instance of a tracked class get the
      instance ID 0.

    Raises:
      ValueError: If the next track ID to hand out has reached the
        label_divisor.
    """
    predicted_classes = predicted_frame // self._label_divisor
    predicted_instances = predicted_frame % self._label_divisor

    instances_with_track_id = np.zeros_like(predicted_instances)

    for class_index in self._classes_to_track:
      instances_mask = np.logical_and(predicted_classes == class_index,
                                      predicted_instances > 0)
      instance_ids = np.unique(predicted_instances[instances_mask])
      instances = [
          np.logical_and(instances_mask, predicted_instances == i)
          for i in instance_ids
      ]

      # Without instances, all tracks of this class age by one frame. The keys
      # are copied first because aging may delete entries.
      if not instances:
        for track_id in list(self._frames_since_last_update[class_index]):
          self._increase_inactivity_of_track(track_id, class_index)
        continue

      # Without any recorded track, every instance starts a new track.
      if not self._last_mask_per_track[class_index]:
        for instance_mask in instances:
          track_id = self._add_track(instance_mask, class_index)
          instances_with_track_id[instance_mask] = track_id
        continue

      # If optical flow is available, warp all instances and mask out the
      # occluded regions before the association.
      warped_instances = []
      if predicted_occlusion is not None and predicted_flow is not None:
        for instance in instances:
          warped_instance = optical_flow_utils.warp_flow(
              instance.astype(np.float32), predicted_flow)
          warped_instances.append(
              optical_flow_utils.remove_occlusions(warped_instance,
                                                   predicted_occlusion))
      instances_with_track_id = self._match_instances_to_tracks(
          instances, class_index, instances_with_track_id, warped_instances)

    if self._next_track_id >= self._label_divisor:
      raise ValueError('Too many tracks were detected for the given '
                       'label_divisor. Please increase the label_divisor to '
                       'make sure that the track Ids are less than the '
                       'label_divisor.')

    return predicted_classes * self._label_divisor + instances_with_track_id

  def _associate_instances_to_tracks(
      self, instances: List[np.ndarray],
      class_index: int) -> Tuple[List[Tuple[int, int]], List[int], List[int]]:
    """Matches the instances to existing tracks.

    Args:
      instances: A list of numpy arrays specifying the instance masks.
      class_index: An integer specifying the class index.

    Returns:
      A tuple of Lists:
      - Containing all indices of matches between instances and tracks.
      - Containing all indices of unmatched instances.
      - Containing all indices of unmatched tracks.
      Track indices refer to the iteration order of the tracks stored for
      `class_index`. All indices are plain Python integers.
    """
    tracked_masks = list(self._last_mask_per_track[class_index].values())
    number_of_instances = len(instances)
    number_of_tracks = len(tracked_masks)

    iou_matrix = np.zeros((number_of_instances, number_of_tracks))
    for i, instance_mask in enumerate(instances):
      for j, last_mask in enumerate(tracked_masks):
        iou_matrix[i, j] = _compute_mask_iou(instance_mask, last_mask)

    # The solver minimizes the total cost, hence the negated IoU values.
    assigned_instances, assigned_tracks = optimize.linear_sum_assignment(
        -iou_matrix)
    assigned_instances = assigned_instances.tolist()
    assigned_tracks = assigned_tracks.tolist()

    # Rows and columns that the assignment left out entirely.
    assigned_instance_set = set(assigned_instances)
    assigned_track_set = set(assigned_tracks)
    unmatched_instances = [
        index for index in range(number_of_instances)
        if index not in assigned_instance_set
    ]
    unmatched_tracks = [
        index for index in range(number_of_tracks)
        if index not in assigned_track_set
    ]

    # Assigned pairs only count as a match if they overlap sufficiently.
    list_of_matches = []
    for instance_index, track_index in zip(assigned_instances,
                                           assigned_tracks):
      if iou_matrix[instance_index, track_index] > self._iou_threshold:
        list_of_matches.append((instance_index, track_index))
      else:
        unmatched_instances.append(instance_index)
        unmatched_tracks.append(track_index)

    return list_of_matches, unmatched_instances, unmatched_tracks
|
|
|
|
|
def read_panoptic_image(path: Text, label_divisor: int) -> np.ndarray:
  """Loads a 2-channel panoptic image and merges it into a single label map.

  Args:
    path: Location of the encoded image file.
    label_divisor: Factor applied to the first channel before the second
      channel is added.

  Returns:
    An int32 array holding `channel_0 * label_divisor + channel_1` for every
    pixel.
  """
  with tf.io.gfile.GFile(path, 'rb') as image_file:
    encoded = image_file.read()
  decoded = tf.io.decode_image(encoded)
  channels = tf.cast(decoded, tf.int32).numpy()
  first_channel = channels[..., 0]
  second_channel = channels[..., 1]
  return first_channel * label_divisor + second_channel
|
|
|
|
|
def read_numpy_tensor(path: Text) -> np.ndarray:
  """Loads the numpy array that is stored in the file at `path`.

  The file is opened in binary mode via `tf.io.gfile.GFile` and passed to
  `np.load`.

  Args:
    path: Location of the serialized numpy array.

  Returns:
    The object returned by `np.load`.
  """
  with tf.io.gfile.GFile(path, 'rb') as tensor_file:
    tensor = np.load(tensor_file)
  return tensor
|
|
|
|
|
def main(unused_args):
  """Tracks all predicted sequences with the IoUTracker and logs the metric.

  For every sequence folder found under `--gt`, the frames are processed in
  sorted file name order: the ground-truth and the prediction image are read,
  the prediction is re-labelled with track IDs by the tracker, optionally
  saved under `--output`, and accumulated into the STQ metric. The final metric
  result is logged after all sequences have been processed.

  Args:
    unused_args: The arguments forwarded by `app.run`. Not used.

  Raises:
    ValueError: If `--dataset` is not a key of
      `dataset.MAP_NAME_TO_DATASET_INFO`.
  """
  if FLAGS.dataset not in dataset.MAP_NAME_TO_DATASET_INFO:
    raise ValueError('Given dataset option is not a valid dataset. Please use '
                     '--help to see available options.')
  dataset_info = dataset.MAP_NAME_TO_DATASET_INFO[FLAGS.dataset]
  thing_classes = dataset_info.class_has_instances_list
  ignore_label = dataset_info.ignore_label
  num_classes = dataset_info.num_classes
  colormap_name = dataset_info.colormap
  use_optical_flow = FLAGS.optical_flow is not None

  # Create the tracker and the metric; both are shared across sequences.
  tracker = IoUTracker(thing_classes, _LABEL_DIVISOR)
  metric = stq.STQuality(num_classes, thing_classes, ignore_label,
                         _LABEL_DIVISOR, 256*256*256)

  # Process one sequence folder after the other.
  for gt_sequence_folder in tf.io.gfile.glob(os.path.join(FLAGS.gt, '*')):
    tracker.reset_states()
    color_map = dict()

    sequence = os.path.basename(gt_sequence_folder)
    if FLAGS.sequence and FLAGS.sequence != sequence:
      continue
    pred_sequence_folder = os.path.join(FLAGS.pred, sequence)
    if use_optical_flow:
      optical_flow_sequence_folder = os.path.join(FLAGS.optical_flow, sequence)

    for gt_frame_path in sorted(
        tf.io.gfile.glob(os.path.join(gt_sequence_folder, '*.png'))):
      # Use os.path.basename, like for the sequence name above, instead of
      # splitting on a hard-coded '/'.
      gt_frame_name = os.path.basename(gt_frame_path)
      pred_frame_name = os.path.join(pred_sequence_folder, gt_frame_name)
      flow = None
      occlusion = None
      logging.info('Processing sequence %s: frame %s.', sequence, gt_frame_name)
      gt_frame = read_panoptic_image(gt_frame_path, _LABEL_DIVISOR)
      pred_frame = read_panoptic_image(pred_frame_name, _LABEL_DIVISOR)
      if use_optical_flow:
        # The frame file names must be integers. The flow files looked up for
        # a frame are named after the preceding frame index.
        frame_id = int(os.path.splitext(gt_frame_name)[0])
        flow_path = os.path.join(optical_flow_sequence_folder,
                                 '%06d%s' % (frame_id - 1, _FLOW_EXT))
        occlusion_path = os.path.join(optical_flow_sequence_folder,
                                      '%06d%s' % (frame_id - 1, _OCCLUSION_EXT))
        if tf.io.gfile.exists(flow_path):
          flow = read_numpy_tensor(flow_path)
          occlusion = read_numpy_tensor(occlusion_path)[0, ..., 0]
        else:
          logging.info('Could not find optical flow for current frame.')
          h, w = gt_frame.shape
          # np.zeros is required here: np.zeros_like would interpret the shape
          # tuple as data and return an array of shape (3,) / (2,).
          # NOTE(review): all-zero flow and occlusion are presumably treated
          # as "no motion" and "nothing occluded" by optical_flow_utils -
          # confirm.
          flow = np.zeros((h, w, 2), np.float32)
          occlusion = np.zeros((h, w), np.float32)
      pred_frame = tracker.update(pred_frame, flow, occlusion)
      if FLAGS.output:
        output_folder = os.path.join(FLAGS.output, sequence)
        tf.io.gfile.makedirs(output_folder)
        color_map = vis_utils.save_parsing_result(
            pred_frame,
            _LABEL_DIVISOR,
            thing_classes,
            output_folder,
            os.path.splitext(gt_frame_name)[0],
            color_map,
            colormap_name=colormap_name)
      metric.update_state(
          tf.convert_to_tensor(gt_frame), tf.convert_to_tensor(pred_frame),
          sequence)

  logging.info('Final results:')
  logging.info(_format_output(metric.result()))
|
|
|
|
|
if __name__ == '__main__':
  # The ground-truth and the prediction folder have no default value, so both
  # flags must be given on the command line.
  required_flags = ['gt', 'pred']
  flags.mark_flags_as_required(required_flags)
  app.run(main)
|
|