Spaces:
Running
on
Zero
Running
on
Zero
# Adapted from https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/unet_2d_blocks.py | |
import torch | |
from torch import nn | |
from .attention_2d import Transformer2DModel | |
from .resnet_2d import Downsample2D, ResnetBlock2D, Upsample2D | |
def get_down_block(
    down_block_type,
    num_layers,
    in_channels,
    out_channels,
    temb_channels,
    add_downsample,
    resnet_eps,
    resnet_act_fn,
    attn_num_head_channels,
    resnet_groups=None,
    cross_attention_dim=None,
    downsample_padding=None,
    dual_cross_attention=False,
    use_linear_projection=False,
    only_cross_attention=False,
    upcast_attention=False,
    resnet_time_scale_shift="default",
    use_sc_attn=False,
    use_st_attn=False,
    layer_id=0,
):
    """Instantiate the down-sampling block selected by ``down_block_type``.

    A leading ``"UNetRes"`` prefix on the type name is stripped before the
    name is matched, so ``"UNetResDownBlock2D"`` selects ``DownBlock2D``.

    Args:
        down_block_type: ``"DownBlock2D"`` or ``"CrossAttnDownBlock2D"``
            (optionally prefixed with ``"UNetRes"``).
        num_layers, in_channels, out_channels, temb_channels, add_downsample,
        resnet_eps, resnet_act_fn, resnet_groups, downsample_padding,
        resnet_time_scale_shift: Forwarded to whichever block is built.
        attn_num_head_channels, cross_attention_dim, dual_cross_attention,
        use_linear_projection, only_cross_attention, upcast_attention,
        use_sc_attn, use_st_attn, layer_id: Forwarded only to
            ``CrossAttnDownBlock2D``; ignored for ``DownBlock2D``.

    Returns:
        The constructed ``DownBlock2D`` or ``CrossAttnDownBlock2D``.

    Raises:
        ValueError: If the block type is unknown, or if
            ``CrossAttnDownBlock2D`` is requested without ``cross_attention_dim``.
    """
    legacy_prefix = "UNetRes"
    if down_block_type.startswith(legacy_prefix):
        down_block_type = down_block_type[len(legacy_prefix):]

    # Arguments accepted by both block flavours.
    shared_kwargs = dict(
        num_layers=num_layers,
        in_channels=in_channels,
        out_channels=out_channels,
        temb_channels=temb_channels,
        add_downsample=add_downsample,
        resnet_eps=resnet_eps,
        resnet_act_fn=resnet_act_fn,
        resnet_groups=resnet_groups,
        downsample_padding=downsample_padding,
        resnet_time_scale_shift=resnet_time_scale_shift,
    )

    if down_block_type == "DownBlock2D":
        return DownBlock2D(**shared_kwargs)

    if down_block_type == "CrossAttnDownBlock2D":
        if cross_attention_dim is None:
            raise ValueError("cross_attention_dim must be specified for CrossAttnDownBlock2D")
        return CrossAttnDownBlock2D(
            cross_attention_dim=cross_attention_dim,
            attn_num_head_channels=attn_num_head_channels,
            dual_cross_attention=dual_cross_attention,
            use_linear_projection=use_linear_projection,
            only_cross_attention=only_cross_attention,
            upcast_attention=upcast_attention,
            use_sc_attn=use_sc_attn,
            use_st_attn=use_st_attn,
            layer_id=layer_id,
            **shared_kwargs,
        )

    raise ValueError(f"{down_block_type} does not exist.")
def get_up_block(
    up_block_type,
    num_layers,
    in_channels,
    out_channels,
    prev_output_channel,
    temb_channels,
    add_upsample,
    resnet_eps,
    resnet_act_fn,
    attn_num_head_channels,
    resnet_groups=None,
    cross_attention_dim=None,
    dual_cross_attention=False,
    use_linear_projection=False,
    only_cross_attention=False,
    upcast_attention=False,
    resnet_time_scale_shift="default",
    use_sc_attn=False,
    use_st_attn=False,
    layer_id=0,
):
    """Instantiate the up-sampling block selected by ``up_block_type``.

    A leading ``"UNetRes"`` prefix on the type name is stripped before the
    name is matched, so ``"UNetResUpBlock2D"`` selects ``UpBlock2D``.

    Args:
        up_block_type: ``"UpBlock2D"`` or ``"CrossAttnUpBlock2D"``
            (optionally prefixed with ``"UNetRes"``).
        num_layers, in_channels, out_channels, prev_output_channel,
        temb_channels, add_upsample, resnet_eps, resnet_act_fn, resnet_groups,
        resnet_time_scale_shift: Forwarded to whichever block is built.
        attn_num_head_channels, cross_attention_dim, dual_cross_attention,
        use_linear_projection, only_cross_attention, upcast_attention,
        use_sc_attn, use_st_attn, layer_id: Forwarded only to
            ``CrossAttnUpBlock2D``; ignored for ``UpBlock2D``.

    Returns:
        The constructed ``UpBlock2D`` or ``CrossAttnUpBlock2D``.

    Raises:
        ValueError: If the block type is unknown, or if
            ``CrossAttnUpBlock2D`` is requested without ``cross_attention_dim``.
    """
    legacy_prefix = "UNetRes"
    if up_block_type.startswith(legacy_prefix):
        up_block_type = up_block_type[len(legacy_prefix):]

    # Arguments accepted by both block flavours.
    shared_kwargs = dict(
        num_layers=num_layers,
        in_channels=in_channels,
        out_channels=out_channels,
        prev_output_channel=prev_output_channel,
        temb_channels=temb_channels,
        add_upsample=add_upsample,
        resnet_eps=resnet_eps,
        resnet_act_fn=resnet_act_fn,
        resnet_groups=resnet_groups,
        resnet_time_scale_shift=resnet_time_scale_shift,
    )

    if up_block_type == "UpBlock2D":
        return UpBlock2D(**shared_kwargs)

    if up_block_type == "CrossAttnUpBlock2D":
        if cross_attention_dim is None:
            raise ValueError("cross_attention_dim must be specified for CrossAttnUpBlock2D")
        return CrossAttnUpBlock2D(
            cross_attention_dim=cross_attention_dim,
            attn_num_head_channels=attn_num_head_channels,
            dual_cross_attention=dual_cross_attention,
            use_linear_projection=use_linear_projection,
            only_cross_attention=only_cross_attention,
            upcast_attention=upcast_attention,
            use_sc_attn=use_sc_attn,
            use_st_attn=use_st_attn,
            layer_id=layer_id,
            **shared_kwargs,
        )

    raise ValueError(f"{up_block_type} does not exist.")
class UNetMidBlock2DCrossAttn(nn.Module):
    """Middle UNet stage: one ResNet block, then ``num_layers`` (attention, ResNet) pairs.

    The channel count is unchanged throughout (every sub-module maps
    ``in_channels`` to ``in_channels``).
    """

    def __init__(
        self,
        in_channels: int,
        temb_channels: int,
        dropout: float = 0.0,
        num_layers: int = 1,
        resnet_eps: float = 1e-6,
        resnet_time_scale_shift: str = "default",
        resnet_act_fn: str = "swish",
        resnet_groups: int = 32,
        resnet_pre_norm: bool = True,
        attn_num_head_channels=1,
        output_scale_factor=1.0,
        cross_attention_dim=1280,
        dual_cross_attention=False,
        use_linear_projection=False,
        upcast_attention=False,
        use_sc_attn=False,
        use_st_attn=False,
    ):
        """Build the ResNet and transformer sub-modules.

        ``resnet_groups=None`` falls back to ``min(in_channels // 4, 32)``.
        ``use_st_attn`` is only enabled on the first transformer layer.

        Raises:
            NotImplementedError: If ``dual_cross_attention`` is truthy and
                ``num_layers`` is at least 1.
        """
        super().__init__()
        self.has_cross_attention = True
        self.attn_num_head_channels = attn_num_head_channels

        if resnet_groups is None:
            resnet_groups = min(in_channels // 4, 32)

        def make_resnet():
            # All ResNet blocks in this stage share one configuration.
            return ResnetBlock2D(
                in_channels=in_channels,
                out_channels=in_channels,
                temb_channels=temb_channels,
                eps=resnet_eps,
                groups=resnet_groups,
                dropout=dropout,
                time_embedding_norm=resnet_time_scale_shift,
                non_linearity=resnet_act_fn,
                output_scale_factor=output_scale_factor,
                pre_norm=resnet_pre_norm,
            )

        # The stage always starts with a ResNet block, even when num_layers == 0.
        resnet_blocks = [make_resnet()]
        attention_blocks = []

        for layer_idx in range(num_layers):
            if dual_cross_attention:
                raise NotImplementedError
            attention_blocks.append(
                Transformer2DModel(
                    attn_num_head_channels,
                    in_channels // attn_num_head_channels,
                    in_channels=in_channels,
                    num_layers=1,
                    cross_attention_dim=cross_attention_dim,
                    norm_num_groups=resnet_groups,
                    use_linear_projection=use_linear_projection,
                    upcast_attention=upcast_attention,
                    use_sc_attn=use_sc_attn,
                    use_st_attn=bool(use_st_attn and layer_idx == 0),
                    updown='mid',
                )
            )
            resnet_blocks.append(make_resnet())

        self.attentions = nn.ModuleList(attention_blocks)
        self.resnets = nn.ModuleList(resnet_blocks)
        self.layer_id = 0

    def forward(
        self,
        hidden_states,
        temb=None,
        encoder_hidden_states=None,
        attention_mask=None,
        encoder_attention_mask=None,
        iter_cur=0,
        save_kv=True,
        mode='drag',
        mask=None,
    ):
        """Run the first ResNet block, then each (attention, ResNet) pair in order.

        ``attention_mask`` is accepted but not used by this method. The
        remaining keyword arguments are forwarded to every attention module,
        whose ``.sample`` attribute is taken as its output.
        """
        attention_kwargs = dict(
            encoder_hidden_states=encoder_hidden_states,
            encoder_attention_mask=encoder_attention_mask,
            iter_cur=iter_cur,
            save_kv=save_kv,
            mode=mode,
            mask=mask,
        )

        hidden_states = self.resnets[0](hidden_states, temb)
        for attention, resnet in zip(self.attentions, self.resnets[1:]):
            attended = attention(hidden_states, **attention_kwargs)
            hidden_states = resnet(attended.sample, temb)

        return hidden_states
class CrossAttnDownBlock2D(nn.Module):
    """Down-sampling UNet stage made of (ResNet, transformer) layer pairs.

    Each of the ``num_layers`` layers is a ``ResnetBlock2D`` followed by a
    ``Transformer2DModel``. When ``add_downsample`` is true, a single
    ``Downsample2D`` is applied after the last layer.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        temb_channels: int,
        dropout: float = 0.0,
        num_layers: int = 1,
        resnet_eps: float = 1e-6,
        resnet_time_scale_shift: str = "default",
        resnet_act_fn: str = "swish",
        resnet_groups: int = 32,
        resnet_pre_norm: bool = True,
        attn_num_head_channels=1,
        cross_attention_dim=1280,
        output_scale_factor=1.0,
        downsample_padding=1,
        add_downsample=True,
        dual_cross_attention=False,
        use_linear_projection=False,
        only_cross_attention=False,
        upcast_attention=False,
        use_sc_attn=False,
        use_st_attn=False,
        layer_id=0,
    ):
        """Build the ResNet, transformer and (optional) downsampler sub-modules.

        The first ResNet maps ``in_channels`` to ``out_channels``; all later
        ones map ``out_channels`` to ``out_channels``. ``use_st_attn`` is only
        enabled on the first transformer layer.

        Raises:
            NotImplementedError: If ``dual_cross_attention`` is truthy and
                ``num_layers`` is at least 1.
        """
        super().__init__()
        resnets = []
        attentions = []

        self.has_cross_attention = True
        self.attn_num_head_channels = attn_num_head_channels

        for i in range(num_layers):
            in_channels = in_channels if i == 0 else out_channels
            resnets.append(
                ResnetBlock2D(
                    in_channels=in_channels,
                    out_channels=out_channels,
                    temb_channels=temb_channels,
                    eps=resnet_eps,
                    groups=resnet_groups,
                    dropout=dropout,
                    time_embedding_norm=resnet_time_scale_shift,
                    non_linearity=resnet_act_fn,
                    output_scale_factor=output_scale_factor,
                    pre_norm=resnet_pre_norm,
                )
            )
            if dual_cross_attention:
                raise NotImplementedError
            attentions.append(
                Transformer2DModel(
                    attn_num_head_channels,
                    out_channels // attn_num_head_channels,
                    in_channels=out_channels,
                    num_layers=1,
                    cross_attention_dim=cross_attention_dim,
                    norm_num_groups=resnet_groups,
                    use_linear_projection=use_linear_projection,
                    only_cross_attention=only_cross_attention,
                    upcast_attention=upcast_attention,
                    use_sc_attn=use_sc_attn,
                    use_st_attn=True if (use_st_attn and i == 0) else False,
                    updown='down',
                    layer_id=layer_id,
                )
            )

        self.attentions = nn.ModuleList(attentions)
        self.resnets = nn.ModuleList(resnets)

        if add_downsample:
            self.downsamplers = nn.ModuleList(
                [
                    Downsample2D(
                        out_channels, use_conv=True, out_channels=out_channels, padding=downsample_padding, name="op"
                    )
                ]
            )
        else:
            self.downsamplers = None

        self.gradient_checkpointing = False

    def forward(
        self,
        hidden_states,
        temb=None,
        encoder_hidden_states=None,
        attention_mask=None,
        encoder_attention_mask=None,
        iter_cur=0,
        save_kv=True,
        mode='drag',
        mask=None,
    ):
        """Run every (ResNet, attention) pair, then the optional downsampler.

        ``attention_mask`` is accepted but not used by this method
        (FIXME: attention mask support).

        Returns:
            ``(hidden_states, output_states)`` where ``output_states`` is a
            tuple holding the output of each layer pair followed, when a
            downsampler exists, by the downsampled result.
        """
        output_states = ()

        def create_custom_forward(module, return_dict=None, iter_cur=iter_cur):
            # The helper must accept ``iter_cur``: it is passed explicitly for
            # the attention module below. The previous two-parameter signature
            # made the checkpointed path fail with a TypeError.
            def custom_forward(*inputs):
                if return_dict is not None:
                    return module(*inputs, return_dict=return_dict, iter_cur=iter_cur)
                return module(*inputs)

            return custom_forward

        for resnet, attn in zip(self.resnets, self.attentions):
            if self.training and self.gradient_checkpointing:
                # ``use_reentrant=True`` keeps the long-standing default while
                # satisfying PyTorch's request to pass the flag explicitly.
                hidden_states = torch.utils.checkpoint.checkpoint(
                    create_custom_forward(resnet), hidden_states, temb, use_reentrant=True
                )
                # NOTE(review): unlike the non-checkpointed branch, save_kv, mode
                # and mask are not forwarded here -- confirm this is intended.
                hidden_states = torch.utils.checkpoint.checkpoint(
                    create_custom_forward(attn, return_dict=False, iter_cur=iter_cur),
                    hidden_states,
                    encoder_hidden_states,
                    encoder_attention_mask,
                    use_reentrant=True,
                )[0]
            else:
                hidden_states = resnet(hidden_states, temb)
                hidden_states = attn(
                    hidden_states,
                    encoder_hidden_states=encoder_hidden_states,
                    encoder_attention_mask=encoder_attention_mask,
                    iter_cur=iter_cur,
                    save_kv=save_kv,
                    mode=mode,
                    mask=mask,
                ).sample

            output_states += (hidden_states,)

        if self.downsamplers is not None:
            for downsampler in self.downsamplers:
                hidden_states = downsampler(hidden_states)

            output_states += (hidden_states,)

        return hidden_states, output_states
class DownBlock2D(nn.Module):
    """Down-sampling UNet stage made of ResNet blocks only (no attention).

    When ``add_downsample`` is true, a single ``Downsample2D`` is applied
    after the last ResNet block.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        temb_channels: int,
        dropout: float = 0.0,
        num_layers: int = 1,
        resnet_eps: float = 1e-6,
        resnet_time_scale_shift: str = "default",
        resnet_act_fn: str = "swish",
        resnet_groups: int = 32,
        resnet_pre_norm: bool = True,
        output_scale_factor=1.0,
        add_downsample=True,
        downsample_padding=1,
    ):
        """Build ``num_layers`` ResNet blocks and the optional downsampler.

        The first ResNet maps ``in_channels`` to ``out_channels``; all later
        ones map ``out_channels`` to ``out_channels``.
        """
        super().__init__()

        self.resnets = nn.ModuleList(
            [
                ResnetBlock2D(
                    in_channels=in_channels if layer_idx == 0 else out_channels,
                    out_channels=out_channels,
                    temb_channels=temb_channels,
                    eps=resnet_eps,
                    groups=resnet_groups,
                    dropout=dropout,
                    time_embedding_norm=resnet_time_scale_shift,
                    non_linearity=resnet_act_fn,
                    output_scale_factor=output_scale_factor,
                    pre_norm=resnet_pre_norm,
                )
                for layer_idx in range(num_layers)
            ]
        )

        self.downsamplers = None
        if add_downsample:
            downsampler = Downsample2D(
                out_channels, use_conv=True, out_channels=out_channels, padding=downsample_padding, name="op"
            )
            self.downsamplers = nn.ModuleList([downsampler])

        self.gradient_checkpointing = False

    def forward(self, hidden_states, temb=None):
        """Run every ResNet block, then the optional downsampler.

        Returns:
            ``(hidden_states, output_states)`` where ``output_states`` is a
            tuple holding the output of each ResNet block followed, when a
            downsampler exists, by the downsampled result.
        """

        def checkpointable(module):
            # Wrap the module in a plain function for torch.utils.checkpoint.
            def run(*inputs):
                return module(*inputs)

            return run

        collected = []
        for resnet in self.resnets:
            if self.training and self.gradient_checkpointing:
                hidden_states = torch.utils.checkpoint.checkpoint(checkpointable(resnet), hidden_states, temb)
            else:
                hidden_states = resnet(hidden_states, temb)
            collected.append(hidden_states)

        if self.downsamplers is not None:
            for downsampler in self.downsamplers:
                hidden_states = downsampler(hidden_states)
            collected.append(hidden_states)

        return hidden_states, tuple(collected)
from torch.fft import fftn, fftshift, ifftn, ifftshift | |
def fourier_filter(x_in: torch.Tensor, threshold: int, scale: int) -> torch.Tensor:
    """Rescale the centred low-frequency band of a feature map (FreeU Fourier filter).

    Method from FreeU (https://arxiv.org/abs/2309.11497); this variant follows
    https://github.com/huggingface/diffusers/pull/5164#issuecomment-1732638706.

    Only index 0 along dimension 2 of ``x_in`` is filtered, and the result is
    returned with a singleton dimension re-inserted at position 2.
    NOTE(review): this implies a 5-D input whose third axis is expected to have
    length 1 -- confirm against callers.

    Args:
        x_in: Input tensor; ``x_in[:, :, 0]`` must be 4-D ``(B, C, H, W)``.
        threshold: Half-width of the square band around the spectrum centre.
        scale: Multiplier applied to the frequencies inside that band.

    Returns:
        The filtered tensor, cast back to ``x_in.dtype``.
    """
    spatial_dims = (-2, -1)

    frame = x_in[:, :, 0]
    batch, channels, height, width = frame.shape

    # Non-power-of-2 images must be float32.
    sizes_are_pow2 = (width & (width - 1)) == 0 and (height & (height - 1)) == 0
    if not sizes_are_pow2:
        frame = frame.to(dtype=torch.float32)

    # Forward FFT, shifted so that the zero frequency sits at the centre.
    spectrum = fftshift(fftn(frame, dim=spatial_dims), dim=spatial_dims)

    # Gain is 1 everywhere except the central square, which is set to `scale`.
    gain = torch.ones((batch, channels, height, width), device=frame.device)
    centre_row, centre_col = height // 2, width // 2
    gain[
        ...,
        centre_row - threshold : centre_row + threshold,
        centre_col - threshold : centre_col + threshold,
    ] = scale

    # Undo the shift and transform back, keeping only the real part.
    spectrum = ifftshift(spectrum * gain, dim=spatial_dims)
    filtered = ifftn(spectrum, dim=spatial_dims).real.unsqueeze(2)

    return filtered.to(dtype=x_in.dtype)
def apply_freeu(
    resolution_idx: int, hidden_states: torch.Tensor, res_hidden_states: torch.Tensor, **freeu_kwargs
):
    """Apply the FreeU re-weighting for the first two resolution stages.

    FreeU is introduced in https://arxiv.org/abs/2309.11497; adapted from the
    official code repository: https://github.com/ChenyangSi/FreeU.

    For ``resolution_idx`` 0 (resp. 1) the first half of the channels of
    ``hidden_states`` is multiplied in place by ``b1`` (resp. ``b2``), and
    ``res_hidden_states`` is replaced by ``fourier_filter`` applied with
    ``threshold=1`` and scale ``s1`` (resp. ``s2``). Any other index leaves
    both tensors untouched.

    Args:
        resolution_idx (`int`): Integer denoting the UNet block where FreeU is being applied.
        hidden_states (`torch.Tensor`): Inputs to the underlying block (modified in place).
        res_hidden_states (`torch.Tensor`): Features from the skip block corresponding to the underlying block.
        s1 (`float`): Scaling factor for stage 1 to attenuate the contributions of the skip features.
        s2 (`float`): Scaling factor for stage 2 to attenuate the contributions of the skip features.
        b1 (`float`): Scaling factor for stage 1 to amplify the contributions of backbone features.
        b2 (`float`): Scaling factor for stage 2 to amplify the contributions of backbone features.

    Returns:
        The ``(hidden_states, res_hidden_states)`` pair after re-weighting.
    """
    # (backbone key, skip key) for stage 0 and stage 1, in that order.
    stage_keys = (("b1", "s1"), ("b2", "s2"))

    for stage, (backbone_key, skip_key) in enumerate(stage_keys):
        if resolution_idx == stage:
            half = hidden_states.shape[1] // 2
            hidden_states[:, :half] = hidden_states[:, :half] * freeu_kwargs[backbone_key]
            res_hidden_states = fourier_filter(res_hidden_states, threshold=1, scale=freeu_kwargs[skip_key])

    return hidden_states, res_hidden_states
class CrossAttnUpBlock2D(nn.Module):
    """Up-sampling UNet stage made of (ResNet, transformer) layer pairs.

    Each layer concatenates one skip tensor onto the running features along
    the channel axis before its ResNet block. When ``add_upsample`` is true,
    a single ``Upsample2D`` is applied after the last layer.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        prev_output_channel: int,
        temb_channels: int,
        dropout: float = 0.0,
        num_layers: int = 1,
        resnet_eps: float = 1e-6,
        resnet_time_scale_shift: str = "default",
        resnet_act_fn: str = "swish",
        resnet_groups: int = 32,
        resnet_pre_norm: bool = True,
        attn_num_head_channels=1,
        cross_attention_dim=1280,
        output_scale_factor=1.0,
        add_upsample=True,
        dual_cross_attention=False,
        use_linear_projection=False,
        only_cross_attention=False,
        upcast_attention=False,
        use_sc_attn=False,
        use_st_attn=False,
        layer_id=0,
    ):
        """Build the ResNet, transformer and (optional) upsampler sub-modules.

        Each ResNet takes ``running + skip`` input channels: the running part
        is ``prev_output_channel`` for the first layer and ``out_channels``
        afterwards; the skip part is ``in_channels`` for the last layer and
        ``out_channels`` otherwise. ``use_st_attn`` is only enabled on the
        first transformer layer.

        Raises:
            NotImplementedError: If ``dual_cross_attention`` is truthy and
                ``num_layers`` is at least 1.
        """
        super().__init__()

        self.has_cross_attention = True
        self.attn_num_head_channels = attn_num_head_channels

        # Settings shared by every ResNet block of this stage.
        resnet_kwargs = dict(
            out_channels=out_channels,
            temb_channels=temb_channels,
            eps=resnet_eps,
            groups=resnet_groups,
            dropout=dropout,
            time_embedding_norm=resnet_time_scale_shift,
            non_linearity=resnet_act_fn,
            output_scale_factor=output_scale_factor,
            pre_norm=resnet_pre_norm,
        )

        resnet_blocks = []
        attention_blocks = []
        final_layer = num_layers - 1

        for layer_idx in range(num_layers):
            skip_channels = in_channels if layer_idx == final_layer else out_channels
            running_channels = prev_output_channel if layer_idx == 0 else out_channels

            resnet_blocks.append(ResnetBlock2D(in_channels=running_channels + skip_channels, **resnet_kwargs))

            if dual_cross_attention:
                raise NotImplementedError

            attention_blocks.append(
                Transformer2DModel(
                    attn_num_head_channels,
                    out_channels // attn_num_head_channels,
                    in_channels=out_channels,
                    num_layers=1,
                    cross_attention_dim=cross_attention_dim,
                    norm_num_groups=resnet_groups,
                    use_linear_projection=use_linear_projection,
                    only_cross_attention=only_cross_attention,
                    upcast_attention=upcast_attention,
                    use_sc_attn=use_sc_attn,
                    use_st_attn=bool(use_st_attn and layer_idx == 0),
                    updown='up',
                    layer_id=layer_id,
                )
            )

        self.attentions = nn.ModuleList(attention_blocks)
        self.resnets = nn.ModuleList(resnet_blocks)

        self.upsamplers = None
        if add_upsample:
            self.upsamplers = nn.ModuleList([Upsample2D(out_channels, use_conv=True, out_channels=out_channels)])

        self.gradient_checkpointing = False

    def forward(
        self,
        hidden_states,
        res_hidden_states_tuple,
        temb=None,
        encoder_hidden_states=None,
        upsample_size=None,
        attention_mask=None,
        encoder_attention_mask=None,
        iter_cur=0,
        save_kv=True,
        mode='drag',
        mask=None,
    ):
        """Consume skip tensors from the end of ``res_hidden_states_tuple``, one per layer.

        ``attention_mask`` is accepted but not used by this method. In the
        gradient-checkpointing branch only ``iter_cur`` (plus the positional
        inputs) reaches the attention module; ``save_kv``, ``mode`` and
        ``mask`` are forwarded only in the regular branch.

        Returns:
            The features after all layers and the optional upsampler.
        """

        def checkpointable(module, return_dict=None, iter_cur=iter_cur):
            # Wrap the module in a plain function for torch.utils.checkpoint.
            def run(*inputs):
                if return_dict is None:
                    return module(*inputs)
                return module(*inputs, return_dict=return_dict, iter_cur=iter_cur)

            return run

        for resnet, attention in zip(self.resnets, self.attentions):
            # Pop the most recent skip connection.
            skip_features = res_hidden_states_tuple[-1]
            res_hidden_states_tuple = res_hidden_states_tuple[:-1]

            # NOTE: an optional FreeU re-weighting step (apply_freeu) used to be
            # sketched here as commented-out code; it is not active.

            hidden_states = torch.cat([hidden_states, skip_features], dim=1)

            if self.training and self.gradient_checkpointing:
                hidden_states = torch.utils.checkpoint.checkpoint(checkpointable(resnet), hidden_states, temb)
                attention_out = torch.utils.checkpoint.checkpoint(
                    checkpointable(attention, return_dict=False, iter_cur=iter_cur),
                    hidden_states,
                    encoder_hidden_states,
                    encoder_attention_mask,
                )
                hidden_states = attention_out[0]
            else:
                hidden_states = resnet(hidden_states, temb)
                attention_out = attention(
                    hidden_states,
                    encoder_hidden_states=encoder_hidden_states,
                    encoder_attention_mask=encoder_attention_mask,
                    iter_cur=iter_cur,
                    save_kv=save_kv,
                    mode=mode,
                    mask=mask,
                )
                hidden_states = attention_out.sample

        if self.upsamplers is not None:
            for upsampler in self.upsamplers:
                hidden_states = upsampler(hidden_states, upsample_size)

        return hidden_states
class UpBlock2D(nn.Module):
    """Up-sampling UNet stage made of ResNet blocks only (no attention).

    Each layer concatenates one skip tensor onto the running features along
    the channel axis before its ResNet block. When ``add_upsample`` is true,
    a single ``Upsample2D`` is applied after the last layer.
    """

    def __init__(
        self,
        in_channels: int,
        prev_output_channel: int,
        out_channels: int,
        temb_channels: int,
        dropout: float = 0.0,
        num_layers: int = 1,
        resnet_eps: float = 1e-6,
        resnet_time_scale_shift: str = "default",
        resnet_act_fn: str = "swish",
        resnet_groups: int = 32,
        resnet_pre_norm: bool = True,
        output_scale_factor=1.0,
        add_upsample=True,
    ):
        """Build ``num_layers`` ResNet blocks and the optional upsampler.

        Each ResNet takes ``running + skip`` input channels: the running part
        is ``prev_output_channel`` for the first layer and ``out_channels``
        afterwards; the skip part is ``in_channels`` for the last layer and
        ``out_channels`` otherwise.
        """
        super().__init__()

        final_layer = num_layers - 1
        self.resnets = nn.ModuleList(
            [
                ResnetBlock2D(
                    in_channels=(prev_output_channel if layer_idx == 0 else out_channels)
                    + (in_channels if layer_idx == final_layer else out_channels),
                    out_channels=out_channels,
                    temb_channels=temb_channels,
                    eps=resnet_eps,
                    groups=resnet_groups,
                    dropout=dropout,
                    time_embedding_norm=resnet_time_scale_shift,
                    non_linearity=resnet_act_fn,
                    output_scale_factor=output_scale_factor,
                    pre_norm=resnet_pre_norm,
                )
                for layer_idx in range(num_layers)
            ]
        )

        self.upsamplers = None
        if add_upsample:
            self.upsamplers = nn.ModuleList([Upsample2D(out_channels, use_conv=True, out_channels=out_channels)])

        self.gradient_checkpointing = False

    def forward(self, hidden_states, res_hidden_states_tuple, temb=None, upsample_size=None):
        """Consume skip tensors from the end of ``res_hidden_states_tuple``, one per ResNet.

        Returns:
            The features after all ResNet blocks and the optional upsampler.
        """

        def checkpointable(module):
            # Wrap the module in a plain function for torch.utils.checkpoint.
            def run(*inputs):
                return module(*inputs)

            return run

        for resnet in self.resnets:
            # Pop the most recent skip connection.
            skip_features = res_hidden_states_tuple[-1]
            res_hidden_states_tuple = res_hidden_states_tuple[:-1]
            hidden_states = torch.cat([hidden_states, skip_features], dim=1)

            if self.training and self.gradient_checkpointing:
                hidden_states = torch.utils.checkpoint.checkpoint(checkpointable(resnet), hidden_states, temb)
            else:
                hidden_states = resnet(hidden_states, temb)

        if self.upsamplers is not None:
            for upsampler in self.upsamplers:
                hidden_states = upsampler(hidden_states, upsample_size)

        return hidden_states