|
from utils.utils import morph_open |
|
|
|
import torch |
|
from kornia.color import rgb_to_grayscale |
|
|
|
import cv2 |
|
import numpy as np |
|
|
|
class FlowEstimation:
    """Dense optical flow between two image batches, computed with OpenCV.

    The backend ("farneback" or "dualtvl1") is chosen once at construction
    time; the instance is then called like a function on two image batches.
    Images are moved to the CPU, clamped to [0, 1], scaled to 8-bit gray and
    processed one sample at a time. The result is returned on the device of
    the first input.
    """

    def __init__(self, flow_estimator: str = "farneback"):
        """Select the optical-flow backend.

        Args:
            flow_estimator: Either "farneback" or "dualtvl1".

        Raises:
            AssertionError: If the name is not a supported backend.
            NotImplementedError: Same condition when assertions are disabled
                (e.g. ``python -O``), so an invalid name never passes silently.
        """
        assert flow_estimator in ["farneback", "dualtvl1"], "Flow estimator must be one of [farneback, dualtvl1]"

        if flow_estimator == "farneback":
            self.flow_estimator = self.OptFlow_Farneback
        elif flow_estimator == "dualtvl1":
            self.flow_estimator = self.OptFlow_DualTVL1
        else:
            raise NotImplementedError

    @staticmethod
    def _to_gray_uint8(frame: torch.Tensor) -> "np.ndarray":
        """Turn one CPU (C, H, W) frame scaled to [0, 255] into an (H, W) uint8 gray image."""
        # The uint8 cast truncates the fractional part (no rounding).
        frame_np = frame.permute(1, 2, 0).numpy().astype(np.uint8)
        # NOTE(review): the BGR channel weighting is kept from the original
        # code; if callers pass RGB tensors the R/B weights are swapped --
        # confirm the intended channel order.
        return cv2.cvtColor(frame_np, cv2.COLOR_BGR2GRAY)

    def _estimate_batch(self, I0: torch.Tensor, I1: torch.Tensor, flow_fn) -> torch.Tensor:
        """Run ``flow_fn(gray0, gray1)`` on every sample pair and stack the flows.

        Args:
            I0: First image batch, indexed as (B, C, H, W).
            I1: Second image batch, same layout as ``I0``.
            flow_fn: Callable taking two uint8 gray images and returning an
                (H, W, 2) flow array.

        Returns:
            Float32 tensor of shape (B, 2, H, W) on ``I0``'s device.
        """
        device = I0.device
        batch_size = I0.shape[0]

        # An empty batch has no sample to run the estimator on; return an
        # empty flow field instead of failing on an unbound accumulator.
        if batch_size == 0:
            return torch.zeros((0, 2, *I0.shape[-2:]), dtype=torch.float32, device=device)

        # detach() so inputs that require grad can still be converted to numpy.
        frames0 = I0.detach().cpu().clamp(0, 1) * 255
        frames1 = I1.detach().cpu().clamp(0, 1) * 255

        flows = []
        for i in range(batch_size):
            gray0 = self._to_gray_uint8(frames0[i])
            gray1 = self._to_gray_uint8(frames1[i])
            flow = flow_fn(gray0, gray1)
            # OpenCV returns (H, W, 2); convert to channel-first (2, H, W).
            flows.append(torch.from_numpy(flow).permute(2, 0, 1).float())

        # Stack once at the end rather than re-concatenating every iteration.
        return torch.stack(flows, dim=0).to(device)

    def OptFlow_Farneback(self, I0: torch.Tensor, I1: torch.Tensor) -> torch.Tensor:
        """Estimate dense flow from ``I0`` to ``I1`` with OpenCV's Farneback method.

        Args:
            I0: First image batch, (B, C, H, W), values expected in [0, 1]
                (values outside that range are clamped).
            I1: Second image batch, same layout as ``I0``.

        Returns:
            Float32 flow tensor of shape (B, 2, H, W) on ``I0``'s device.
        """

        def farneback(gray0, gray1):
            # Positional arguments after the ``None`` flow buffer are:
            # pyr_scale=0.5, levels=3, winsize=15, iterations=3,
            # poly_n=5, poly_sigma=1.2, flags=0.
            return cv2.calcOpticalFlowFarneback(gray0, gray1, None, 0.5, 3, 15, 3, 5, 1.2, 0)

        return self._estimate_batch(I0, I1, farneback)

    def OptFlow_DualTVL1(
        self,
        I0: torch.Tensor,
        I1: torch.Tensor,
        tau: float = 0.25,
        lambda_: float = 0.15,
        theta: float = 0.3,
        scales_number: int = 5,
        warps: int = 5,
        epsilon: float = 0.01,
        inner_iterations: int = 30,
        outer_iterations: int = 10,
        scale_step: float = 0.8,
        gamma: float = 0.0
    ) -> torch.Tensor:
        """Estimate dense flow from ``I0`` to ``I1`` with OpenCV's Dual TV-L1 method.

        Uses ``cv2.optflow``, so the OpenCV build must provide that module.
        Every keyword argument is forwarded to the matching setter of the
        OpenCV estimator (``setTau``, ``setLambda``, ``setTheta``,
        ``setScalesNumber``, ``setWarpingsNumber``, ``setEpsilon``,
        ``setInnerIterations``, ``setOuterIterations``, ``setScaleStep``,
        ``setGamma``).

        Args:
            I0: First image batch, (B, C, H, W), values expected in [0, 1]
                (values outside that range are clamped).
            I1: Second image batch, same layout as ``I0``.

        Returns:
            Float32 flow tensor of shape (B, 2, H, W) on ``I0``'s device.
        """
        optical_flow = cv2.optflow.createOptFlow_DualTVL1()
        optical_flow.setTau(tau)
        optical_flow.setLambda(lambda_)
        optical_flow.setTheta(theta)
        optical_flow.setScalesNumber(scales_number)
        optical_flow.setWarpingsNumber(warps)
        optical_flow.setEpsilon(epsilon)
        optical_flow.setInnerIterations(inner_iterations)
        optical_flow.setOuterIterations(outer_iterations)
        optical_flow.setScaleStep(scale_step)
        optical_flow.setGamma(gamma)

        def dual_tvl1(gray0, gray1):
            return optical_flow.calc(gray0, gray1, None)

        return self._estimate_batch(I0, I1, dual_tvl1)

    def __call__(self, I1: torch.Tensor, I0: torch.Tensor) -> torch.Tensor:
        """Run the selected backend on the two batches.

        NOTE: the arguments are forwarded positionally, so the FIRST argument
        given here is the backend's first frame (its ``I0``) and the SECOND is
        its second frame (its ``I1``), despite this method's parameter names.
        """
        return self.flow_estimator(I1, I0)
|
|
|
def get_inter_frame_temp_index(
    I0: torch.Tensor,
    It: torch.Tensor,
    I1: torch.Tensor,
    flow0tot: torch.Tensor,
    flow1tot: torch.Tensor,
    k: int = 5,
    threshold: float = 2e-2
) -> torch.Tensor:
    """Compute a per-sample temporal index of ``It`` between ``I0`` and ``I1``.

    For each of the two intervals (``I0`` to ``It`` and ``It`` to ``I1``) the
    flow magnitude is summed over the pixels where the opened gray-level
    difference exceeds ``threshold`` in absolute value, giving ``d0`` and
    ``d1``. The index is ``d0 / (d0 + d1 + 1e-12)``.

    Args:
        I0: First frame batch, passed to kornia's ``rgb_to_grayscale``.
        It: Intermediate frame batch, same layout as ``I0``.
        I1: Last frame batch, same layout as ``I0``.
        flow0tot: Flow for the first interval; indexed as (B, 2, H, W) with
            the two flow components on dim 1.
        flow1tot: Flow for the second interval, same layout as ``flow0tot``.
        k: Forwarded to ``morph_open`` as ``k`` (presumably the kernel size
            of a morphological opening -- confirm against ``utils.utils``).
        threshold: Absolute gray-level difference above which a pixel counts
            as changed.

    Returns:
        One value per batch element; 0 when neither interval has any masked
        flow (the 1e-12 term prevents a division by zero).
    """

    def _masked_flow_sum(flow: torch.Tensor, change_mask: torch.Tensor) -> torch.Tensor:
        # Per-pixel Euclidean norm of the two flow components, kept as a
        # singleton channel so it lines up with the mask.
        magnitude = torch.sqrt(flow[:, 0, :, :].pow(2) + flow[:, 1, :, :].pow(2)).unsqueeze(1)
        masked = (magnitude * change_mask).squeeze(1)
        return torch.sum(masked, dim=(1, 2))

    gray_first = rgb_to_grayscale(I0)
    gray_mid = rgb_to_grayscale(It)
    gray_last = rgb_to_grayscale(I1)

    # Opened frame differences, one per interval.
    opened_first = morph_open(gray_mid - gray_first, k=k)
    opened_second = morph_open(gray_last - gray_mid, k=k)

    # Binary (0/1) maps of the pixels whose difference is large enough.
    changed_first = (opened_first.abs() > threshold).to(torch.uint8)
    changed_second = (opened_second.abs() > threshold).to(torch.uint8)

    travelled_first = _masked_flow_sum(flow0tot, changed_first)
    travelled_second = _masked_flow_sum(flow1tot, changed_second)

    total = travelled_first + travelled_second + 1e-12
    return travelled_first / total