atatakun's picture
Duplicate from atatakun/testapp2
18dd6ad
raw
history blame
22.9 kB
# Author: thygate
# https://github.com/thygate/stable-diffusion-webui-depthmap-script
from modules import devices
from modules.shared import opts
from torchvision.transforms import transforms
from operator import getitem
import torch, gc
import cv2
import numpy as np
import skimage.measure
# Upper bound on the whole-image processing resolution (R_max from the paper).
# estimateboost() overwrites this module-level value with
# opts.depthmap_script_boost_rmax when that option exists.
whole_size_threshold = 1600 # R_max from the paper
# Side length of the square that depth estimates are resized to before they are
# handed to the merge (pix2pix) network.
pix2pixsize = 1024
def scale_torch(img):
    """
    Convert a numpy image into a float32 torch.Tensor.

    :param img: numpy array. rgb input is in shape [H, W, 3], depth/disp input is in shape [H, W]
    :return: torch.Tensor.
        * [H, W] input     -> [1, H, W], values unchanged (no normalization).
        * [H, W, 3] input  -> [3, H, W], passed through ToTensor and normalized with the
          hard-coded per-channel mean/std below.
        * any other 3-D input is converted to float32 as-is, keeping its layout.
    """
    if len(img.shape) == 2:
        # Single-channel map: add the channel axis and return immediately.
        # Falling through to the 3-channel test below would inspect the *width*
        # of the expanded [1, H, W] array, so a map that is 3 pixels wide would be
        # mistaken for an RGB image and normalized.
        return torch.from_numpy(img[np.newaxis, :, :].astype(np.float32))

    if img.shape[2] == 3:
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
        ])
        return transform(img.astype(np.float32))

    return torch.from_numpy(img.astype(np.float32))
def estimateleres(img, model, w, h):
    """
    Run the LeReS depth model on `img` at a network input size of (w, h).

    :param img: input image in shape [H, W, C]; its last axis is reversed before inference
    :param model: object exposing a `depth_model` callable
    :param w: width the image is resized to before inference
    :param h: height the image is resized to before inference
    :return: numpy prediction resized back to the height/width of `img`
    """
    # leres transform input: reverse the channel axis, resize, add a batch axis
    flipped = img[:, :, ::-1].copy()
    net_input = scale_torch(cv2.resize(flipped, (w, h)))[None, :, :, :]

    # compute
    with torch.no_grad():
        net_input = net_input.to(devices.get_device_for("controlnet"))
        raw_depth = model.depth_model(net_input)

    depth = raw_depth.squeeze().cpu().numpy()
    out_h, out_w = img.shape[0], img.shape[1]
    return cv2.resize(depth, (out_w, out_h), interpolation=cv2.INTER_CUBIC)
def generatemask(size):
    """
    Generate a Gaussian-blurred blending mask.

    A rectangle inset by 15% of each dimension is set to 1, blurred with a Gaussian whose
    sigma is size[0]/16 (truncated to int), and rescaled to span [0, 1].

    :param size: (rows, cols) of the mask
    :return: float32 numpy array of shape `size`
    """
    rows, cols = size[0], size[1]
    sigma = int(rows / 16)
    kernel_side = int(2 * np.ceil(2 * sigma) + 1)
    inset_r = int(0.15 * rows)
    inset_c = int(0.15 * cols)

    canvas = np.zeros(size, dtype=np.float32)
    canvas[inset_r:rows - inset_r, inset_c:cols - inset_c] = 1

    blurred = cv2.GaussianBlur(canvas, (kernel_side, kernel_side), sigma)
    lowest = blurred.min()
    highest = blurred.max()
    normalized = (blurred - lowest) / (highest - lowest)
    return normalized.astype(np.float32)
def resizewithpool(img, size):
    """
    Max-pool `img` with square blocks of side floor(img.shape[0] / size).

    :param img: 2-D numpy array
    :param size: target side length used to derive the pooling block size
    :return: the block-reduced array
    """
    block_side = int(np.floor(img.shape[0] / size))
    return skimage.measure.block_reduce(img, (block_side, block_side), func=np.max)
def rgb2gray(rgb):
    """
    Convert an rgb array to grayscale with a weighted sum over the first three channels.

    :param rgb: numpy array whose last axis holds at least three channels
    :return: array with the last axis reduced away
    """
    channel_weights = [0.2989, 0.5870, 0.1140]
    first_three = rgb[..., :3]
    return np.dot(first_three, channel_weights)
def calculateprocessingres(img, basesize, confidence=0.1, scale_threshold=3, whole_size_threshold=3000):
    """
    Return the R_x resolution described in section 5 of the main paper.

    :param img: input rgb image
    :param basesize: size of the dilation kernel, equal to the receptive field of the network
    :param confidence: value of x in R_x; allowed fraction of pixels that are not getting any
        contextual cue
    :param scale_threshold: maximum allowed upscaling on the input image
    :param whole_size_threshold: maximum allowed resolution (R_max from section 6 of the main paper)
    :return: tuple (resolution, patch_scale) where `resolution` is
        int(outputsize_scale * speed_scale), the computed R_x resolution, and `patch_scale`
        is the K parameter from section 6 of the paper
    """
    # speed scale parameter is to process every image in a smaller size to accelerate
    # the R_x resolution search
    speed_scale = 32
    image_dim = int(min(img.shape[0:2]))

    gray = rgb2gray(img)
    grad = np.abs(cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)) + np.abs(cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3))
    # NOTE(review): per the OpenCV Python signature resize(src, dsize[, dst[, fx[, fy[,
    # interpolation]]]]) the third positional argument is `dst`, so this flag is presumably
    # not applied as the interpolation mode. Left unchanged to keep numerical results
    # identical -- confirm before switching to interpolation=...
    grad = cv2.resize(grad, (image_dim, image_dim), cv2.INTER_AREA)

    # thresholding the gradient map to generate the edge-map as a proxy of the contextual cues
    m = grad.min()
    M = grad.max()
    middle = m + (0.4 * (M - m))
    grad[grad < middle] = 0
    grad[grad >= middle] = 1

    base_p_size = int(basesize / speed_scale)

    # dilation kernel with size of the receptive field
    kernel = np.ones((base_p_size, base_p_size), float)
    # dilation kernel with size of a quarter of the receptive field, used to compute k
    # as described in section 6 of main paper
    quarter_size = int(basesize / (4 * speed_scale))
    kernel2 = np.ones((quarter_size, quarter_size), float)

    # Output resolution limit set by the whole_size_threshold and scale_threshold.
    threshold = min(whole_size_threshold, scale_threshold * max(img.shape[:2]))

    def _edge_map_at(p_size):
        # Pool the edge map down to p_size x p_size and re-binarize it.
        pooled = resizewithpool(grad, p_size)
        pooled = cv2.resize(pooled, (p_size, p_size), cv2.INTER_NEAREST)
        pooled[pooled >= 0.5] = 1
        pooled[pooled < 0.5] = 0
        return pooled

    outputsize_scale = basesize / speed_scale
    grad_resized = None
    for p_size in range(base_p_size, int(threshold / speed_scale), int(basesize / (2 * speed_scale))):
        grad_resized = _edge_map_at(p_size)

        dilated = cv2.dilate(grad_resized, kernel, iterations=1)
        meanvalue = (1 - dilated).mean()
        if meanvalue > confidence:
            break
        outputsize_scale = p_size

    if grad_resized is None:
        # The search range is empty when the resolution limit is not larger than the
        # receptive field (small inputs or a low whole_size_threshold). Previously this
        # left `grad_resized` unbound and the dilation below raised UnboundLocalError.
        # Fall back to the edge map at the base size, which matches the base-size
        # resolution that is returned in this case.
        grad_resized = _edge_map_at(base_p_size)

    grad_region = cv2.dilate(grad_resized, kernel2, iterations=1)
    patch_scale = grad_region.mean()

    return int(outputsize_scale * speed_scale), patch_scale
# Generate a double-input depth estimation
def doubleestimate(img, size1, size2, pix2pixsize, model, net_type, pix2pixmodel):
    """
    Produce a double-input depth estimate by merging a low- and a high-resolution estimate.

    :param img: input image
    :param size1: network input size used for the low resolution estimate
    :param size2: network input size used for the high resolution estimate
    :param pix2pixsize: side length both estimates are resized to for the merge network
    :param model: depth model forwarded to singleestimate
    :param net_type: network type forwarded to singleestimate
    :param pix2pixmodel: merge network exposing set_input / test / get_current_visuals
    :return: numpy array with the merged prediction rescaled by its own min and max
    """
    merge_dims = (pix2pixsize, pix2pixsize)

    # Low resolution estimate, resized to the inference size of the merge network.
    low_res = singleestimate(img, size1, model, net_type)
    low_res = cv2.resize(low_res, merge_dims, interpolation=cv2.INTER_CUBIC)

    # High resolution estimate, resized the same way.
    high_res = singleestimate(img, size2, model, net_type)
    high_res = cv2.resize(high_res, merge_dims, interpolation=cv2.INTER_CUBIC)

    # Inference on the merge model
    pix2pixmodel.set_input(low_res, high_res)
    pix2pixmodel.test()
    fused = pix2pixmodel.get_current_visuals()['fake_B']

    # Shift/scale by (x + 1) / 2, then stretch to the prediction's own min..max range.
    fused = (fused + 1) / 2
    lowest = torch.min(fused)
    highest = torch.max(fused)
    fused = (fused - lowest) / (highest - lowest)

    return fused.squeeze().cpu().numpy()
# Generate a single-input depth estimation
def singleestimate(img, msize, model, net_type):
    """
    Produce a single-input depth estimate at a square network input size.

    :param img: input image
    :param msize: side length of the square network input
    :param model: depth model forwarded to estimateleres
    :param net_type: accepted for interface compatibility; currently not consulted,
        the LeReS path is always taken
    :return: whatever estimateleres returns
    """
    # The midas branch (estimatemidasBoost) is disabled, so every net_type uses leres.
    return estimateleres(img, model, w=msize, h=msize)
def applyGridpatch(blsize, stride, img, box):
    """
    Lay a regular grid of square patches over `img`.

    Patch centres run from `blsize` up to (but excluding) dimension - blsize in steps of
    `stride`, columns in the outer loop and rows in the inner loop.

    :param blsize: half of the patch side length
    :param stride: distance between neighbouring patch centres
    :param img: array whose shape[0] / shape[1] are the rows / columns to cover
    :param box: offsets; box[0] is added to the column origin and box[1] to the row origin
    :return: dict keyed by '0', '1', ... whose values are
        {'rect': [col, row, width, height], 'size': width}
    """
    side = 2 * blsize
    col_centres = range(blsize, img.shape[1] - blsize, stride)
    row_centres = range(blsize, img.shape[0] - blsize, stride)
    centres = [(col, row) for col in col_centres for row in row_centres]

    grid = {}
    for index, (col, row) in enumerate(centres):
        rect = [box[0] + (col - blsize), box[1] + (row - blsize), side, side]
        grid[str(index)] = {'rect': rect, 'size': rect[2]}
    return grid
# Generating local patches to perform the local refinement described in section 6 of the main paper.
def generatepatchs(img, base_size):
    """
    Select the local patches used for the refinement described in section 6 of the main paper.

    :param img: input rgb image
    :param base_size: initial patch side length; half of it is used as the grid half-size
    :return: list of (key, {'rect': ..., 'size': ...}) pairs sorted by 'size', largest first
    """
    # Gradient magnitude (|d/dy| + |d/dx|) as a proxy of the contextual cues.
    gray = rgb2gray(img)
    grad_dy = np.abs(cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3))
    grad_dx = np.abs(cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3))
    grad_map = grad_dy + grad_dx

    # Suppress gradients below the mean of the non-zero gradients.
    cutoff = grad_map[grad_map > 0].mean()
    grad_map[grad_map < cutoff] = 0

    # Mean gradient density of the whole image, plus an integral image so that the
    # density of any patch can be evaluated quickly.
    mean_density = grad_map.sum() / grad_map.size
    integral = cv2.integral(grad_map)

    # Initial grid: half-size is base_size / 2, stride is 0.75 of that half-size.
    half_side = int(round(base_size / 2))
    step = int(round(half_side * 0.75))
    initial_grid = applyGridpatch(half_side, step, img, [0, 0, 0, 0])

    # Drop flat patches and grow the remaining ones so each holds enough depth cues.
    print("Selecting patches ...")
    selected = adaptiveselection(integral, initial_grid, mean_density)

    # Biggest patch first, so merging is performed in the correct order.
    return sorted(selected.items(), key=lambda entry: entry[1]['size'], reverse=True)
def getGF_fromintegral(integralimage, rect):
    """
    Sum of the gradients inside a patch, read from the gradient integral image.

    :param integralimage: 2-D integral image indexed as [row, col]
    :param rect: [col, row, width, height] of the patch
    :return: integralimage[r1, c1] - integralimage[r0, c1] - integralimage[r1, c0]
        + integralimage[r0, c0] for the patch corners
    """
    col0, row0 = rect[0], rect[1]
    col1 = rect[0] + rect[2]
    row1 = rect[1] + rect[3]
    return (integralimage[row1, col1] - integralimage[row0, col1]
            - integralimage[row1, col0] + integralimage[row0, col0])
# Adaptively select patches
def adaptiveselection(integral_grad, patch_bound_list, gf):
    """
    Keep the patches that are at least as gradient-dense as the whole image and grow them.

    Reads the module-level `factor` (assigned by estimateboost) to derive the growth step,
    so that value must exist before this function is called.

    :param integral_grad: gradient integral image
    :param patch_bound_list: dict keyed by '0', '1', ... with a 'rect' entry per patch
    :param gf: gradient density of the whole image
    :return: dict keyed by '0', '1', ... of {'rect': [col, row, width, height], 'size': width}
    """
    img_rows, img_cols = integral_grad.shape
    grow_by = int(32/factor)
    shift_by = int(grow_by/2)

    def _density(rect):
        # Gradient sum of the patch divided by its area.
        return getGF_fromintegral(integral_grad, rect)/(rect[2]*rect[3])

    selected = {}
    for index in range(len(patch_bound_list)):
        current = patch_bound_list[str(index)]['rect']

        # Patching is only beneficial when the patch is at least as dense as the image.
        if _density(current) < gf:
            continue

        # Enlarge the patch until it leaves the image or its density drops below gf.
        candidate = current.copy()
        while True:
            candidate[0] = candidate[0] - shift_by
            candidate[1] = candidate[1] - shift_by
            candidate[2] = candidate[2] + grow_by
            candidate[3] = candidate[3] + grow_by

            still_inside = (candidate[0] >= 0 and candidate[1] >= 0
                            and candidate[1] + candidate[3] < img_rows
                            and candidate[0] + candidate[2] < img_cols)
            if not still_inside:
                break
            if _density(candidate) < gf:
                break
            current = candidate.copy()

        selected[str(len(selected))] = {'rect': current, 'size': current[2]}

    return selected
def impatch(image, rect):
    """
    Slice the pixels of a patch out of an image.

    :param image: array indexed as [row, col, ...]
    :param rect: [col, row, width, height] of the patch
    :return: image[row:row + height, col:col + width]
    """
    col_span = slice(rect[0], rect[0] + rect[2])
    row_span = slice(rect[1], rect[1] + rect[3])
    return image[row_span, col_span]
class ImageandPatchs:
    """
    An rgb image together with its selected patches and the base/updated depth estimates.

    Indexing returns the (scaled) patch data for one entry of the patch list.
    """

    def __init__(self, root_dir, name, patchsinfo, rgb_image, scale=1):
        """
        :param root_dir: stored as-is
        :param name: stored as-is
        :param patchsinfo: sequence of (key, {'rect': ..., 'size': ...}) pairs
        :param rgb_image: image the patches refer to; stored resized by `scale`
        :param scale: factor applied to the image, the patch rects and the patch sizes
        """
        self.root_dir = root_dir
        self.name = name
        self.patchsinfo = patchsinfo
        self.patchs = patchsinfo
        self.scale = scale

        src_rows, src_cols = rgb_image.shape[0], rgb_image.shape[1]
        scaled_dims = (round(src_cols*scale), round(src_rows*scale))
        self.rgb_image = cv2.resize(rgb_image, scaled_dims, interpolation=cv2.INTER_CUBIC)

        # Becomes True once both the base and the updated estimate have been set.
        self.do_have_estimate = False
        self.estimation_updated_image = None
        self.estimation_base_image = None

    def __len__(self):
        """Number of patches."""
        return len(self.patchs)

    def set_base_estimate(self, est):
        """Store the base estimate; flag estimates as available if the updated one exists."""
        self.estimation_base_image = est
        if self.estimation_updated_image is not None:
            self.do_have_estimate = True

    def set_updated_estimate(self, est):
        """Store the updated estimate; flag estimates as available if the base one exists."""
        self.estimation_updated_image = est
        if self.estimation_base_image is not None:
            self.do_have_estimate = True

    def __getitem__(self, index):
        """
        Return a dict describing patch `index`.

        Always contains 'patch_rgb', 'rect', 'size' and 'id'; once both estimates are set
        it also contains 'patch_whole_estimate_base' and 'patch_whole_estimate_updated'.
        """
        entry = self.patchs[index]
        patch_id = int(entry[0])

        # Apply the scale to the rect and the size.
        rect = np.round(np.array(entry[1]['rect']) * self.scale).astype('int')
        msize = round(entry[1]['size'] * self.scale)

        item = {'patch_rgb': impatch(self.rgb_image, rect)}
        if self.do_have_estimate:
            item['patch_whole_estimate_base'] = impatch(self.estimation_base_image, rect)
            item['patch_whole_estimate_updated'] = impatch(self.estimation_updated_image, rect)
        item['rect'] = rect
        item['size'] = msize
        item['id'] = patch_id
        return item

    def print_options(self, opt):
        """Print options

        It will print both current options and default values(if different).
        Reads defaults from self.parser. Saving the options to a text file is disabled.
        """
        rows = ['----------------- Options ---------------\n']
        for key, value in sorted(vars(opt).items()):
            default = self.parser.get_default(key)
            note = ('\t[default: %s]' % str(default)) if value != default else ''
            rows.append('{:>25}: {:<30}{}\n'.format(str(key), str(value), note))
        rows.append('----------------- End -------------------')
        print(''.join(rows))

    def parse(self):
        """Parse our options, apply the name suffix and convert gpu ids to a list of ints."""
        opt = self.gather_options()
        opt.isTrain = self.isTrain   # train or test

        # process opt.suffix
        if opt.suffix:
            opt.name = opt.name + ('_' + opt.suffix.format(**vars(opt)))

        # set gpu ids: keep the non-negative entries of the comma separated list
        requested = opt.gpu_ids.split(',')
        opt.gpu_ids = [gpu for gpu in map(int, requested) if gpu >= 0]

        self.opt = opt
        return self.opt
def estimateboost(img, model, model_type, pix2pixmodel, max_res=512):
    """
    Boosted depth estimation: a double-estimated base depth map refined by local patches.

    Side effects: may overwrite the module-level `whole_size_threshold` from
    opts.depthmap_script_boost_rmax, and assigns the module-level `factor`.

    :param img: input rgb image
    :param model: depth model forwarded to doubleestimate
    :param model_type: 0 = leres, 1 = dpt_beit_large_512, anything else = other midas
    :param pix2pixmodel: merge network exposing set_input / test / get_current_visuals
    :param max_res: when smaller than the computed whole-image resolution, local boosting
        is skipped and the base estimate is returned
    :return: depth map resized to the height/width of the input image
    """
    global whole_size_threshold, factor

    # get settings
    if hasattr(opts, 'depthmap_script_boost_rmax'):
        whole_size_threshold = opts.depthmap_script_boost_rmax

    if model_type == 0:  # leres
        net_receptive_field_size = 448
    elif model_type == 1:  # dpt_beit_large_512
        net_receptive_field_size = 512
    else:  # other midas
        net_receptive_field_size = 384
    # High resolution input size for patches: twice the receptive field.
    patch_netsize = 2 * net_receptive_field_size

    gc.collect()
    devices.torch_gc()

    # Mask used to smoothly blend the local patch estimations into the base estimate.
    # It is arbitrarily large to avoid artifacts during rescaling for each crop.
    mask_org = generatemask((3000, 3000))
    mask = mask_org.copy()

    r_threshold_value = 0.2  # value x of R_x (section 5 of the main paper)
    scale_threshold = 3      # allows up-scaling with a scale up to 3
    input_resolution = img.shape
    input_dims = (input_resolution[1], input_resolution[0])
    merge_dims = (pix2pixsize, pix2pixsize)

    # Resolution search (section 5 of the main paper, section B of the supplementary material).
    whole_image_optimal_size, patch_scale = calculateprocessingres(
        img, net_receptive_field_size, r_threshold_value, scale_threshold, whole_size_threshold)

    # Base estimate using the double estimation.
    whole_estimate = doubleestimate(img, net_receptive_field_size, whole_image_optimal_size,
                                    pix2pixsize, model, model_type, pix2pixmodel)

    # Multiplier from section 6 of the main paper, so the initial patches can select
    # small high-density regions of the image.
    factor = max(min(1, 4 * patch_scale * whole_image_optimal_size / whole_size_threshold), 0.2)

    # Check if local boosting is beneficial.
    if max_res < whole_image_optimal_size:
        return cv2.resize(whole_estimate, input_dims, interpolation=cv2.INTER_CUBIC)

    # Default target resolution: the longer side becomes twice the optimal size.
    rows, cols = img.shape[0], img.shape[1]
    doubled = 2 * whole_image_optimal_size
    if rows > cols:
        a = doubled
        b = round(doubled * cols / rows)
    else:
        a = round(doubled * rows / cols)
        b = doubled
    b = int(round(b / factor))
    a = int(round(a / factor))

    img = cv2.resize(img, (b, a), interpolation=cv2.INTER_CUBIC)

    # Extract selected patches for local refinement
    patchset = generatepatchs(img, net_receptive_field_size * 2)

    # always rescale to input res for now
    mergein_scale = input_resolution[0] / img.shape[0]

    imageandpatchs = ImageandPatchs('', '', patchset, img, mergein_scale)
    resized_dims = (round(img.shape[1]*mergein_scale), round(img.shape[0]*mergein_scale))
    whole_estimate_resized = cv2.resize(whole_estimate, resized_dims, interpolation=cv2.INTER_CUBIC)
    imageandpatchs.set_base_estimate(whole_estimate_resized.copy())
    imageandpatchs.set_updated_estimate(whole_estimate_resized.copy())

    print('Resulting depthmap resolution will be :', whole_estimate_resized.shape[:2])
    print('Patches to process: '+str(len(imageandpatchs)))

    # Go through all patches, estimate each one and merge it into the running estimate.
    patch_count = len(imageandpatchs)
    for patch_ind in range(patch_count):
        patch = imageandpatchs[patch_ind]
        rect = patch['rect']                               # patch size and location
        base_patch = patch['patch_whole_estimate_base']    # same region of the base estimate
        org_size = base_patch.shape                        # size before resizing for the merge net
        print('\t Processing patch', patch_ind, '/', patch_count-1, '|', rect)

        # Double estimation for the patch; the high resolution is fixed to twice the
        # receptive field size to accelerate the process.
        patch_estimation = doubleestimate(patch['patch_rgb'], net_receptive_field_size, patch_netsize,
                                          pix2pixsize, model, model_type, pix2pixmodel)
        patch_estimation = cv2.resize(patch_estimation, merge_dims, interpolation=cv2.INTER_CUBIC)
        base_patch = cv2.resize(base_patch, merge_dims, interpolation=cv2.INTER_CUBIC)

        # Merge the patch estimation with the matching base region using the merge network.
        pix2pixmodel.set_input(base_patch, patch_estimation)
        pix2pixmodel.test()
        mapped = pix2pixmodel.get_current_visuals()['fake_B']
        mapped = ((mapped+1)/2).squeeze().cpu().numpy()

        # Fit a degree-1 polynomial so the merge output matches the values of the base estimate.
        flat_mapped = mapped.reshape(-1)
        p_coef = np.polyfit(flat_mapped, base_patch.reshape(-1), deg=1)
        merged = np.polyval(p_coef, flat_mapped).reshape(mapped.shape)
        merged = cv2.resize(merged, (org_size[1], org_size[0]), interpolation=cv2.INTER_CUBIC)

        # Patch location inside the whole estimate.
        region = (slice(rect[1], rect[1] + rect[3]), slice(rect[0], rect[0] + rect[2]))

        # The Gaussian mask is generated once at a large size and resized on demand.
        if mask.shape != org_size:
            mask = cv2.resize(mask_org, (org_size[1], org_size[0]), interpolation=cv2.INTER_LINEAR)

        # Blend the merged patch into the running estimate with the Gaussian mask to get
        # seamless boundaries.
        tobemergedto = imageandpatchs.estimation_updated_image
        tobemergedto[region] = np.multiply(tobemergedto[region], 1 - mask) + np.multiply(merged, mask)
        imageandpatchs.set_updated_estimate(tobemergedto)

    # output
    return cv2.resize(imageandpatchs.estimation_updated_image, input_dims, interpolation=cv2.INTER_CUBIC)