from functools import partial
from typing import List, Optional, Union
|
from transformers.image_processing_utils import BatchFeature, get_patch_output_size
from transformers.image_processing_utils_fast import (
    BASE_IMAGE_PROCESSOR_FAST_DOCSTRING,
    BASE_IMAGE_PROCESSOR_FAST_DOCSTRING_PREPROCESS,
    BaseImageProcessorFast,
    DefaultFastImageProcessorKwargs,
    group_images_by_shape,
    reorder_images,
)
from transformers.image_utils import (
    IMAGENET_STANDARD_MEAN,
    IMAGENET_STANDARD_STD,
    ChannelDimension,
    ImageInput,
    PILImageResampling,
    SizeDict,
    VideoInput,
    get_image_size,
    make_flat_list_of_images,
    validate_kwargs,
)
from transformers.processing_utils import Unpack
from transformers.utils import (
    TensorType,
    add_start_docstrings,
    is_torch_available,
    is_torchvision_available,
    is_torchvision_v2_available,
)

|
if is_torch_available():
    import torch

if is_torchvision_available():
    from transformers.image_utils import pil_torch_interpolation_mapping

    if is_torchvision_v2_available():
        from torchvision.transforms.v2 import functional as F
    else:
        from torchvision.transforms import functional as F

|
def crop(img: torch.Tensor, left: int, top: int, right: int, bottom: int) -> torch.Tensor:
    """Crop the given torch tensor.

    Args:
        img (torch.Tensor): Image to be cropped, in (C, H, W) or (H, W) format.
        left (int): The left coordinate of the crop box.
        top (int): The top coordinate of the crop box.
        right (int): The right coordinate of the crop box.
        bottom (int): The bottom coordinate of the crop box.

    Returns:
        torch.Tensor: Cropped image.
    """
    if not isinstance(img, torch.Tensor):
        raise TypeError(f"img should be torch.Tensor. Got {type(img)}")

    if img.ndim not in [2, 3]:
        raise ValueError(f"Image should have 2 or 3 dimensions. Got {img.ndim}")

    # Height and width are the last two dimensions for both (C, H, W) and (H, W) layouts.
    img_height = img.shape[-2]
    img_width = img.shape[-1]
    if top < 0 or left < 0 or bottom > img_height or right > img_width:
        raise ValueError("Crop coordinates out of bounds")

    if top >= bottom or left >= right:
        raise ValueError("Invalid crop coordinates")

    return img[..., top:bottom, left:right]

|
class Eagle2_5_VLFastImageProcessorKwargs(DefaultFastImageProcessorKwargs):
    max_dynamic_tiles: Optional[int]
    min_dynamic_tiles: Optional[int]
    use_thumbnail: Optional[bool]
    pad_during_tiling: Optional[bool]
    do_pad: Optional[bool]

|
@add_start_docstrings(
    "Constructs a fast Eagle2.5-VL image processor. Based on [`SiglipImageProcessor`], extended to also process each video frame.",
    BASE_IMAGE_PROCESSOR_FAST_DOCSTRING,
    """
    max_dynamic_tiles (`int`, *optional*):
        The maximum number of tiles an image may be split into. Can be overridden by `max_dynamic_tiles` in the
        `preprocess` method.
    min_dynamic_tiles (`int`, *optional*):
        The minimum number of tiles an image may be split into. Can be overridden by `min_dynamic_tiles` in the
        `preprocess` method.
    use_thumbnail (`bool`, *optional*):
        Whether to append a thumbnail of the full image to the generated tiles.
    pad_during_tiling (`bool`, *optional*):
        Whether to pad the resized image to the target tile grid instead of stretching it.
    do_pad (`bool`, *optional*):
        Whether to pad the image. If `True`, will pad the patch dimension of the images in the batch to the largest
        number of patches in the batch. Padding will be applied to the bottom and right with zeros.
    """,
)
class Eagle2_5_VLImageProcessorFast(BaseImageProcessorFast):
    resample = PILImageResampling.BICUBIC
    image_mean = IMAGENET_STANDARD_MEAN
    image_std = IMAGENET_STANDARD_STD
    size = {"height": 448, "width": 448}
    default_to_square = False
    crop_size = None
    do_resize = True
    do_center_crop = None
    do_rescale = True
    do_normalize = True
    do_convert_rgb = True
    do_pad = True
    max_dynamic_tiles = 12
    min_dynamic_tiles = 1
    use_thumbnail = True
    pad_during_tiling = False
    valid_kwargs = Eagle2_5_VLFastImageProcessorKwargs
    model_input_names = ["pixel_values"]

    def __init__(self, **kwargs: Unpack[Eagle2_5_VLFastImageProcessorKwargs]):
        super().__init__(**kwargs)
|
    def _prepare_images_structure(
        self,
        images: ImageInput,
    ) -> ImageInput:
        """
        Prepare the images structure for processing.

        Args:
            images (`ImageInput`):
                The input images to process.

        Returns:
            `ImageInput`: The images with a valid nesting.
        """
        return make_flat_list_of_images(images)

    def _prepare_videos_structure(self, videos: VideoInput) -> VideoInput:
        return self._prepare_images_structure(videos)
|
    def _prepare_input_videos(
        self,
        videos: VideoInput,
        do_convert_rgb: Optional[bool] = None,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
        device: Optional["torch.device"] = None,
    ) -> list["torch.Tensor"]:
        """
        Prepare the input videos for processing.
        """
        videos = self._prepare_videos_structure(videos)
        process_video_fn = partial(
            self._process_image,
            do_convert_rgb=do_convert_rgb,
            input_data_format=input_data_format,
            device=device,
        )

        processed_videos = []
        for video in videos:
            processed_videos.append(process_video_fn(video))

        return processed_videos
|
    def _resize_for_patching(
        self,
        image: "torch.Tensor",
        target_resolution: tuple,
        interpolation: "F.InterpolationMode",
        input_data_format: ChannelDimension,
    ) -> "torch.Tensor":
        """
        Resizes an image to a target resolution while maintaining aspect ratio.

        Args:
            image (`torch.Tensor`):
                The input image.
            target_resolution (`tuple`):
                The target resolution (height, width) of the image.
            interpolation (`InterpolationMode`):
                Resampling filter to use if resizing the image.
            input_data_format (`ChannelDimension` or `str`):
                The channel dimension format of the input image.

        Returns:
            `torch.Tensor`: The resized image.
        """
        new_height, new_width = get_patch_output_size(image, target_resolution, input_data_format)
        resized_image = F.resize(image, (new_height, new_width), interpolation=interpolation)

        return resized_image
|
    def find_closest_aspect_ratio(self, aspect_ratio, target_ratios, width, height, image_size):
        """
        Find the target tile grid that best matches the input image.

        Unlike earlier versions, which only considered the aspect ratio, this also takes
        the area ratio into account: covering more than 60% of the original image area is
        considered sufficient, so the area term is capped at 0.6.
        """
        best_factor = float("-inf")
        best_ratio = (1, 1)
        area = width * height
        for ratio in target_ratios:
            target_aspect_ratio = ratio[0] / ratio[1]
            # Fraction of the original area covered by the tile grid, capped at 0.6,
            # weighted by how closely the grid's aspect ratio matches the image's.
            area_ratio = (ratio[0] * ratio[1] * image_size * image_size) / area
            factor_based_on_area_n_ratio = min(area_ratio, 0.6) * min(
                target_aspect_ratio / aspect_ratio, aspect_ratio / target_aspect_ratio
            )
            if factor_based_on_area_n_ratio > best_factor:
                best_factor = factor_based_on_area_n_ratio
                best_ratio = ratio

        return best_ratio
|
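    # Worked example (illustrative numbers, not from the original source): for an
    # 800x600 image with image_size=448, the grid (4, 3) matches the 4:3 aspect ratio
    # exactly (ratio term 1.0) and its 12 tiles cover far more than 60% of the original
    # area (area term capped at 0.6), so find_closest_aspect_ratio returns (4, 3).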
|
    def _pad_for_patching(
        self, image: "torch.Tensor", target_resolution: tuple, input_data_format: ChannelDimension
    ) -> "torch.Tensor":
        """
        Pad an image to a target resolution while maintaining aspect ratio.
        """
        target_height, target_width = target_resolution
        new_height, new_width = get_patch_output_size(image, target_resolution, input_data_format)

        # Center the resized image by splitting the leftover space evenly on each side.
        paste_x = (target_width - new_width) // 2
        paste_y = (target_height - new_height) // 2

        padded_image = F.pad(image, padding=[paste_x, paste_y, paste_x, paste_y])

        return padded_image
|
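    # Example (hypothetical numbers): resizing into a (448, 896) target keeps the
    # aspect ratio, so a 600x800 image becomes 448x598 and paste_x = (896 - 598) // 2
    # = 149, centering the image with roughly 149 px of zero padding on each side.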
|
    def _get_image_patches(
        self,
        image: "torch.Tensor",
        min_num: int,
        max_num: int,
        size: tuple,
        tile_size: int,
        use_thumbnail: bool,
        interpolation: "F.InterpolationMode",
        pad_during_tiling: bool,
    ) -> List["torch.Tensor"]:
        image_size = get_image_size(image, channel_dim=ChannelDimension.FIRST)
        orig_height, orig_width = image_size
        aspect_ratio = orig_width / orig_height

        # Enumerate all (columns, rows) grids whose tile count lies in [min_num, max_num],
        # ordered by total tile count.
        target_ratios = set(
            (i, j)
            for n in range(min_num, max_num + 1)
            for i in range(1, n + 1)
            for j in range(1, n + 1)
            if min_num <= i * j <= max_num
        )
        target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])

        target_aspect_ratio = self.find_closest_aspect_ratio(
            aspect_ratio, target_ratios, orig_width, orig_height, tile_size
        )

        target_width = tile_size * target_aspect_ratio[0]
        target_height = tile_size * target_aspect_ratio[1]
        blocks = target_aspect_ratio[0] * target_aspect_ratio[1]

        if pad_during_tiling:
            # Keep the aspect ratio and pad the borders up to the target grid size.
            resized_image = self._resize_for_patching(
                image, (target_height, target_width), interpolation=interpolation, input_data_format=ChannelDimension.FIRST
            )
            padded_image = self._pad_for_patching(
                resized_image, (target_height, target_width), input_data_format=ChannelDimension.FIRST
            )
            image_used_to_split = padded_image
        else:
            # Stretch the image directly to the target grid size.
            image_used_to_split = F.resize(image, (target_height, target_width), interpolation=interpolation)

        processed_tiles = []
        for i in range(blocks):
            # Crop tile i from the grid in row-major order: (left, top, right, bottom).
            box = (
                (i % (target_width // tile_size)) * tile_size,
                (i // (target_width // tile_size)) * tile_size,
                ((i % (target_width // tile_size)) + 1) * tile_size,
                ((i // (target_width // tile_size)) + 1) * tile_size,
            )
            split_img = crop(image_used_to_split, box[0], box[1], box[2], box[3])
            processed_tiles.append(split_img)
        assert len(processed_tiles) == blocks

        if use_thumbnail and len(processed_tiles) != 1:
            # Append a thumbnail of the full image as a global view.
            thumbnail_img = F.resize(image, (tile_size, tile_size), interpolation=interpolation)
            processed_tiles.append(thumbnail_img)

        return processed_tiles
|
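    # Note (an assumption based on the class defaults, not stated in the original
    # source): with tile_size=448, max_num=12 and use_thumbnail=True, an 800x600 image
    # maps to a (4, 3) grid, so this returns 12 tiles of shape (3, 448, 448) plus one
    # thumbnail tile, i.e. 13 tensors in total.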
|
    def _pad_for_batching(
        self,
        pixel_values: List["torch.Tensor"],
    ) -> List["torch.Tensor"]:
        """
        Pads images on the `num_of_patches` dimension with zeros to form a batch with the same number of patches.

        Args:
            pixel_values (`List[torch.Tensor]`):
                A list of per-image patch tensors, each of shape (`num_patches`, `num_channels`, `height`, `width`).

        Returns:
            `List[torch.Tensor]`: The padded images.
        """
        max_patch = max(len(x) for x in pixel_values)
        # torch.nn.functional.pad takes (left, right) pairs from the last dimension
        # backwards; only the leading num_patches dimension is padded, at the end.
        pixel_values = [
            torch.nn.functional.pad(image, pad=[0, 0, 0, 0, 0, 0, 0, max_patch - image.shape[0]])
            for image in pixel_values
        ]

        return pixel_values
|
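    # Shape sketch (hypothetical sizes): padding a [13, 3, 448, 448] tensor and a
    # [5, 3, 448, 448] tensor yields two [13, 3, 448, 448] tensors, so the per-image
    # patch stacks can later be concatenated into one batch.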
|
    def _preprocess(
        self,
        images: List["torch.Tensor"],
        do_resize: bool,
        size: SizeDict,
        max_dynamic_tiles: int,
        min_dynamic_tiles: int,
        use_thumbnail: bool,
        pad_during_tiling: bool,
        interpolation: Optional["F.InterpolationMode"],
        do_center_crop: bool,
        crop_size: SizeDict,
        do_rescale: bool,
        rescale_factor: float,
        do_normalize: bool,
        image_mean: Optional[Union[float, List[float]]],
        image_std: Optional[Union[float, List[float]]],
        do_pad: bool,
        return_tensors: Optional[Union[str, TensorType]],
    ) -> BatchFeature:
        processed_images = []
        image_sizes = []

        if size and size.height and size.width:
            size_tuple = (size.height, size.width)
        else:
            size_tuple = (size.shortest_edge, size.shortest_edge)

        # The tile side length: prefer the crop size, then the requested size.
        if crop_size and crop_size.height:
            tile_size = crop_size.height
        elif size and size.height:
            tile_size = size.height
        else:
            tile_size = size.shortest_edge

        for image in images:
            image_patches = self._get_image_patches(
                image,
                min_num=min_dynamic_tiles,
                max_num=max_dynamic_tiles,
                size=size_tuple,
                tile_size=tile_size,
                use_thumbnail=use_thumbnail,
                interpolation=interpolation,
                pad_during_tiling=pad_during_tiling,
            )

            # Group patches by shape so each group can be resized and normalized as one batch.
            processed_image_patches_grouped = {}
            grouped_image_patches, grouped_image_patches_index = group_images_by_shape(image_patches)
            for shape, stacked_image_patches in grouped_image_patches.items():
                if do_resize:
                    stacked_image_patches = self.resize(
                        image=stacked_image_patches,
                        size=size,
                        interpolation=interpolation,
                    )
                if do_center_crop:
                    stacked_image_patches = self.center_crop(stacked_image_patches, crop_size)
                stacked_image_patches = self.rescale_and_normalize(
                    stacked_image_patches, do_rescale, rescale_factor, do_normalize, image_mean, image_std
                )
                processed_image_patches_grouped[shape] = stacked_image_patches
            processed_image_patches = reorder_images(processed_image_patches_grouped, grouped_image_patches_index)
            processed_image_patches = (
                torch.stack(processed_image_patches, dim=0) if return_tensors else processed_image_patches
            )
            processed_images.append(processed_image_patches)
            image_sizes.append(get_image_size(image, ChannelDimension.FIRST))

        if do_pad:
            processed_images = self._pad_for_batching(processed_images)

        processed_images = torch.cat(processed_images, dim=0) if return_tensors else processed_images
        return BatchFeature(
            data={"pixel_values": processed_images, "image_sizes": image_sizes}, tensor_type=return_tensors
        )
|
    @add_start_docstrings(
        BASE_IMAGE_PROCESSOR_FAST_DOCSTRING_PREPROCESS,
        """
        max_dynamic_tiles (`int`, *optional*):
            The maximum number of dynamic tiles to use for processing high resolution images.
        min_dynamic_tiles (`int`, *optional*):
            The minimum number of dynamic tiles to use for processing high resolution images.
        use_thumbnail (`bool`, *optional*):
            Whether to append a thumbnail of the full image to the generated tiles.
        pad_during_tiling (`bool`, *optional*):
            Whether to pad the image during tiling.
        do_pad (`bool`, *optional*):
            Whether to pad the image. If `True`, will pad the patch dimension of the images in the batch to the largest
            number of patches in the batch. Padding will be applied to the bottom and right with zeros.
        """,
    )
    def preprocess(
        self, images: ImageInput, videos: VideoInput = None, **kwargs: Unpack[Eagle2_5_VLFastImageProcessorKwargs]
    ) -> BatchFeature:
        validate_kwargs(captured_kwargs=kwargs.keys(), valid_processor_keys=self.valid_kwargs.__annotations__.keys())

        # Fill in defaults from the instance attributes for any kwargs not given explicitly.
        for kwarg_name in self.valid_kwargs.__annotations__:
            kwargs.setdefault(kwarg_name, getattr(self, kwarg_name, None))

        do_convert_rgb = kwargs.pop("do_convert_rgb")
        input_data_format = kwargs.pop("input_data_format")
        device = kwargs.pop("device")

        if images is not None:
            images = self._prepare_input_images(
                images=images, do_convert_rgb=do_convert_rgb, input_data_format=input_data_format, device=device
            )

        if videos is not None:
            videos = self._prepare_input_images(
                images=videos, do_convert_rgb=do_convert_rgb, input_data_format=input_data_format, device=device
            )

        kwargs = self._further_process_kwargs(**kwargs)
        self._validate_preprocess_kwargs(**kwargs)

        # Convert PIL resampling enums to the torchvision interpolation modes used internally.
        resample = kwargs.pop("resample")
        kwargs["interpolation"] = (
            pil_torch_interpolation_mapping[resample] if isinstance(resample, (PILImageResampling, int)) else resample
        )

        # Drop kwargs that `_preprocess` does not accept.
        kwargs.pop("default_to_square")
        kwargs.pop("data_format")

        if images is not None:
            return self._preprocess(images, **kwargs)
        elif videos is not None:
            return self._preprocess(videos, **kwargs)
|
|
__all__ = ["Eagle2_5_VLImageProcessorFast"] |
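
# Minimal usage sketch (illustrative, not part of the original module). Assumes Pillow
# and torch are installed; the 800x600 test image is synthetic.
if __name__ == "__main__":
    from PIL import Image

    processor = Eagle2_5_VLImageProcessorFast()
    image = Image.new("RGB", (800, 600))
    batch = processor.preprocess(images=image, return_tensors="pt")
    # With the defaults (448 px tiles, at most 12 tiles, thumbnail enabled), an
    # 800x600 image yields a (4, 3) grid plus a thumbnail: (13, 3, 448, 448).
    print(batch["pixel_values"].shape)
    print(batch["image_sizes"])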