|
|
|
|
|
import argparse |
|
import cv2 |
|
import torch |
|
import numpy as np |
|
import os, shutil, time |
|
import sys, random |
|
from multiprocessing import Pool |
|
from os import path as osp |
|
from tqdm import tqdm |
|
from math import log10, sqrt |
|
import torch.nn.functional as F |
|
|
|
root_path = os.path.abspath('.') |
|
sys.path.append(root_path) |
|
from degradation.ESR.degradations_functionality import * |
|
from degradation.ESR.diffjpeg import * |
|
from degradation.ESR.utils import filter2D |
|
from degradation.image_compression.jpeg import JPEG |
|
from degradation.image_compression.webp import WEBP |
|
from degradation.image_compression.heif import HEIF |
|
from degradation.image_compression.avif import AVIF |
|
from opt import opt |
|
|
|
|
|
def PSNR(original, compressed): |
|
mse = np.mean((original - compressed) ** 2) |
|
if(mse == 0): |
|
|
|
return 100 |
|
max_pixel = 255.0 |
|
psnr = 20 * log10(max_pixel / sqrt(mse)) |
|
return psnr |
|
|
|
|
|
|
|
def downsample_1st(out, opt): |
|
|
|
updown_type = random.choices(['up', 'down', 'keep'], opt['resize_prob'])[0] |
|
if updown_type == 'up': |
|
scale = np.random.uniform(1, opt['resize_range'][1]) |
|
elif updown_type == 'down': |
|
scale = np.random.uniform(opt['resize_range'][0], 1) |
|
else: |
|
scale = 1 |
|
mode = random.choice(opt['resize_options']) |
|
out = F.interpolate(out, scale_factor=scale, mode=mode) |
|
|
|
return out |
|
|
|
|
|
def downsample_2nd(out, opt, ori_h, ori_w): |
|
|
|
if opt['scale'] == 4: |
|
updown_type = random.choices(['up', 'down', 'keep'], opt['resize_prob2'])[0] |
|
if updown_type == 'up': |
|
scale = np.random.uniform(1, opt['resize_range2'][1]) |
|
elif updown_type == 'down': |
|
scale = np.random.uniform(opt['resize_range2'][0], 1) |
|
else: |
|
scale = 1 |
|
mode = random.choice(opt['resize_options']) |
|
|
|
|
|
out = F.interpolate( |
|
out, size=(int(ori_h / opt['scale'] * scale), int(ori_w / opt['scale'] * scale)), mode=mode |
|
) |
|
|
|
return out |
|
|
|
|
|
def common_degradation(out, opt, kernels, process_id, verbose = False): |
|
jpeger = DiffJPEG(differentiable=False).cuda() |
|
kernel1, kernel2 = kernels |
|
|
|
|
|
downsample_1st_position = random.choices([0, 1, 2])[0] |
|
if opt['scale'] == 4: |
|
|
|
downsample_2nd_position = random.choices([0, 1, 2])[0] |
|
else: |
|
|
|
downsample_2nd_position = -1 |
|
|
|
|
|
|
|
batch_size, _, ori_h, ori_w = out.size() |
|
|
|
if downsample_1st_position == 0: |
|
out = downsample_1st(out, opt) |
|
|
|
|
|
out = filter2D(out, kernel1) |
|
if verbose: print(f"(1st) blur noise") |
|
|
|
|
|
if downsample_1st_position == 1: |
|
out = downsample_1st(out, opt) |
|
|
|
|
|
|
|
gray_noise_prob = opt['gray_noise_prob'] |
|
if np.random.uniform() < opt['gaussian_noise_prob']: |
|
|
|
out = random_add_gaussian_noise_pt( |
|
out, sigma_range=opt['noise_range'], clip=True, rounds=False, gray_prob=gray_noise_prob) |
|
name = "gaussian_noise" |
|
else: |
|
|
|
out = random_add_poisson_noise_pt( |
|
out, |
|
scale_range=opt['poisson_scale_range'], |
|
gray_prob=gray_noise_prob, |
|
clip=True, |
|
rounds=False) |
|
name = "poisson_noise" |
|
if verbose: print("(1st) " + str(name)) |
|
|
|
|
|
if downsample_1st_position == 2: |
|
out = downsample_1st(out, opt) |
|
|
|
|
|
|
|
image_codec = random.choices(opt['compression_codec1'], opt['compression_codec_prob1'])[0] |
|
if image_codec == "jpeg": |
|
out = JPEG.compress_tensor(out) |
|
elif image_codec == "webp": |
|
try: |
|
out = WEBP.compress_tensor(out, idx=process_id) |
|
except Exception: |
|
print("There is exception again in webp!") |
|
out = WEBP.compress_tensor(out, idx=process_id) |
|
elif image_codec == "heif": |
|
out = HEIF.compress_tensor(out, idx=process_id) |
|
elif image_codec == "avif": |
|
out = AVIF.compress_tensor(out, idx=process_id) |
|
else: |
|
raise NotImplementedError("We don't have such image compression designed!") |
|
|
|
|
|
|
|
|
|
if downsample_2nd_position == 0: |
|
out = downsample_2nd(out, opt, ori_h, ori_w) |
|
|
|
|
|
|
|
if np.random.uniform() < opt['second_blur_prob']: |
|
|
|
if verbose: print("(2nd) blur noise") |
|
out = filter2D(out, kernel2) |
|
|
|
|
|
if downsample_2nd_position == 1: |
|
out = downsample_2nd(out, opt, ori_h, ori_w) |
|
|
|
|
|
|
|
gray_noise_prob = opt['gray_noise_prob2'] |
|
if np.random.uniform() < opt['gaussian_noise_prob2']: |
|
|
|
if verbose: print("(2nd) gaussian noise") |
|
out = random_add_gaussian_noise_pt( |
|
out, sigma_range=opt['noise_range2'], clip=True, rounds=False, gray_prob=gray_noise_prob) |
|
name = "gaussian_noise" |
|
else: |
|
|
|
if verbose: print("(2nd) poisson noise") |
|
out = random_add_poisson_noise_pt( |
|
out, scale_range=opt['poisson_scale_range2'], gray_prob=gray_noise_prob, clip=True, rounds=False) |
|
name = "poisson_noise" |
|
|
|
|
|
if downsample_2nd_position == 2: |
|
out = downsample_2nd(out, opt, ori_h, ori_w) |
|
|
|
|
|
return out |