from utils.utils import morph_open import torch from kornia.color import rgb_to_grayscale import cv2 import numpy as np class FlowEstimation: def __init__(self, flow_estimator: str = "farneback"): assert flow_estimator in ["farneback", "dualtvl1"], "Flow estimator must be one of [farneback, dualtvl1]" if flow_estimator == "farneback": self.flow_estimator = self.OptFlow_Farneback elif flow_estimator == "dualtvl1": self.flow_estimator = self.OptFlow_DualTVL1 else: raise NotImplementedError def OptFlow_Farneback(self, I0: torch.Tensor, I1: torch.Tensor) -> torch.Tensor: device = I0.device I0 = I0.cpu().clamp(0, 1) * 255 I1 = I1.cpu().clamp(0, 1) * 255 batch_size = I0.shape[0] for i in range(batch_size): I0_np = I0[i].permute(1, 2, 0).numpy().astype(np.uint8) I1_np = I1[i].permute(1, 2, 0).numpy().astype(np.uint8) I0_gray = cv2.cvtColor(I0_np, cv2.COLOR_BGR2GRAY) I1_gray = cv2.cvtColor(I1_np, cv2.COLOR_BGR2GRAY) flow = cv2.calcOpticalFlowFarneback(I0_gray, I1_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0) flow = torch.from_numpy(flow).permute(2, 0, 1).unsqueeze(0).float() if i == 0: flows = flow else: flows = torch.cat((flows, flow), dim = 0) return flows.to(device) def OptFlow_DualTVL1( self, I0: torch.Tensor, I1: torch.Tensor, tau: float = 0.25, lambda_: float = 0.15, theta: float = 0.3, scales_number: int = 5, warps: int = 5, epsilon: float = 0.01, inner_iterations: int = 30, outer_iterations: int = 10, scale_step: float = 0.8, gamma: float = 0.0 ) -> torch.Tensor: optical_flow = cv2.optflow.createOptFlow_DualTVL1() optical_flow.setTau(tau) optical_flow.setLambda(lambda_) optical_flow.setTheta(theta) optical_flow.setScalesNumber(scales_number) optical_flow.setWarpingsNumber(warps) optical_flow.setEpsilon(epsilon) optical_flow.setInnerIterations(inner_iterations) optical_flow.setOuterIterations(outer_iterations) optical_flow.setScaleStep(scale_step) optical_flow.setGamma(gamma) device = I0.device I0 = I0.cpu().clamp(0, 1) * 255 I1 = I1.cpu().clamp(0, 1) * 255 batch_size = I0.shape[0] for i in range(batch_size): I0_np = I0[i].permute(1, 2, 0).numpy().astype(np.uint8) I1_np = I1[i].permute(1, 2, 0).numpy().astype(np.uint8) I0_gray = cv2.cvtColor(I0_np, cv2.COLOR_BGR2GRAY) I1_gray = cv2.cvtColor(I1_np, cv2.COLOR_BGR2GRAY) flow = optical_flow.calc(I0_gray, I1_gray, None) flow = torch.from_numpy(flow).permute(2, 0, 1).unsqueeze(0).float() if i == 0: flows = flow else: flows = torch.cat((flows, flow), dim = 0) return flows.to(device) def __call__(self, I1: torch.Tensor, I0: torch.Tensor) -> torch.Tensor: return self.flow_estimator(I1, I0) def get_inter_frame_temp_index( I0: torch.Tensor, It: torch.Tensor, I1: torch.Tensor, flow0tot: torch.Tensor, flow1tot: torch.Tensor, k: int = 5, threshold: float = 2e-2 ) -> torch.Tensor: I0_gray = rgb_to_grayscale(I0) It_gray = rgb_to_grayscale(It) I1_gray = rgb_to_grayscale(I1) mask0tot = morph_open(It_gray - I0_gray, k=k) mask1tot = morph_open(I1_gray - It_gray, k=k) mask0tot = (abs(mask0tot) > threshold).to(torch.uint8) mask1tot = (abs(mask1tot) > threshold).to(torch.uint8) flow_mag0tot = torch.sqrt(flow0tot[:, 0, :, :]**2 + flow0tot[:, 1, :, :]**2).unsqueeze(1) flow_mag1tot = torch.sqrt(flow1tot[:, 0, :, :]**2 + flow1tot[:, 1, :, :]**2).unsqueeze(1) norm0tot = (flow_mag0tot*mask0tot).squeeze(1) norm1tot = (flow_mag1tot*mask1tot).squeeze(1) d0tot = torch.sum(norm0tot, dim = (1, 2)) d1tot = torch.sum(norm1tot, dim = (1, 2)) return d0tot / (d0tot + d1tot + 1e-12)