|
import random |
|
from typing import Callable, Dict |
|
|
|
import torch |
|
from diffusers import DiffusionPipeline |
|
from diffusers.configuration_utils import ConfigMixin |
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
|
|
|
def get_scaled_coeffs():
    """Return the coefficients of the square-root noise schedule.

    The schedule endpoints are fixed at ``beta_min = 0.85`` and
    ``beta_max = 12.0``.

    Returns
    -------
    tuple of float
        ``(a, b)`` where ``a = sqrt(beta_min)`` and
        ``b = sqrt(beta_max) - sqrt(beta_min)``. ``beta`` and ``int_beta``
        in this module evaluate ``a + b * t`` with these values.
    """
    sqrt_beta_min = 0.85 ** 0.5
    sqrt_beta_max = 12.0 ** 0.5
    return sqrt_beta_min, sqrt_beta_max - sqrt_beta_min
|
|
|
|
|
def beta(t):
    """Evaluate the noise schedule ``beta(t) = (a + b * t) ** 2``.

    ``a`` and ``b`` come from ``get_scaled_coeffs``.

    Parameters
    ----------
    t :
        Time at which to evaluate the schedule. Only ``*``, ``+`` and ``**``
        are applied to it, so a Python number or a tensor both work.

    Returns
    -------
    Same kind as ``t``
        The squared linear interpolation ``(a + t * b) ** 2``.
    """
    offset, slope = get_scaled_coeffs()
    sqrt_beta = offset + t * slope
    return sqrt_beta ** 2
|
|
|
|
|
def int_beta(t):
    """Evaluate the closed-form integral of ``beta`` from 0 to ``t``.

    With ``beta(s) = (a + b * s) ** 2`` the antiderivative is
    ``(a + b * s) ** 3 / (3 * b)``, so the definite integral is
    ``((a + b * t) ** 3 - a ** 3) / (3 * b)``.

    Parameters
    ----------
    t :
        Upper integration limit (Python number or tensor).

    Returns
    -------
    Same kind as ``t``
        The integrated schedule; it is 0 at ``t = 0``.
    """
    offset, slope = get_scaled_coeffs()
    cube_at_t = (offset + slope * t) ** 3
    cube_at_zero = offset ** 3
    return (cube_at_t - cube_at_zero) / (3 * slope)
|
|
|
|
|
def sigma(t):
    """Return ``sqrt(exp(int_beta(t)) - 1)``.

    Parameters
    ----------
    t :
        Time value. It is passed through ``int_beta`` and then to
        ``torch.expm1``, so it has to be a tensor.

    Returns
    -------
    torch.Tensor
        Square root of ``expm1(int_beta(t))``; NaN wherever ``int_beta(t)``
        is negative.
    """
    integrated = int_beta(t)
    return torch.pow(torch.expm1(integrated), 0.5)
|
|
|
|
|
def sigma_orig(t):
    """Return ``sqrt(1 - exp(-int_beta(t)))``.

    Parameters
    ----------
    t :
        Time value. It is passed through ``int_beta`` and then to
        ``torch.expm1``, so it has to be a tensor.

    Returns
    -------
    torch.Tensor
        Square root of ``-expm1(-int_beta(t))``.
    """
    integrated = int_beta(t)
    one_minus_decay = torch.neg(torch.expm1(-integrated))
    return torch.pow(one_minus_decay, 0.5)
|
|
|
|
|
class SuperDiffSDXLPipeline(DiffusionPipeline, ConfigMixin):
    """Two-prompt sampling pipeline built on an SDXL-style UNet.

    Every sampling step evaluates the UNet three times (zero "negative"
    conditioning, first prompt, second prompt) and blends the two prompt
    predictions with a per-sample weight ``kappa`` (see ``_forward``).
    """

    def __init__(
        self,
        unet: Callable,
        vae: Callable,
        text_encoder: Callable,
        text_encoder_2: Callable,
        tokenizer: Callable,
        tokenizer_2: Callable,
    ) -> None:
        """Move the sub-models to the target device and register them.

        The device is ``"cuda"`` when available, otherwise ``"cpu"``. The
        UNet and both text encoders are cast to ``torch.float16``; the VAE
        is moved to the device but keeps its own dtype.

        Parameters
        ----------
        unet : Callable
            Denoising network, called in ``_forward``.
        vae : Callable
            Autoencoder whose ``decode`` turns latents into images.
        text_encoder : Callable
            First text encoder.
        text_encoder_2 : Callable
            Second text encoder; it also supplies the pooled embedding.
        tokenizer : Callable
            Tokenizer paired with ``text_encoder``.
        tokenizer_2 : Callable
            Tokenizer paired with ``text_encoder_2``.

        Returns
        -------
        None
        """
        super().__init__()
        device = "cuda" if torch.cuda.is_available() else "cpu"
        dtype = torch.float16

        vae.to(device)
        unet.to(device, dtype=dtype)
        text_encoder.to(device, dtype=dtype)
        text_encoder_2.to(device, dtype=dtype)

        self.register_modules(
            unet=unet,
            vae=vae,
            text_encoder=text_encoder,
            text_encoder_2=text_encoder_2,
            tokenizer=tokenizer,
            tokenizer_2=tokenizer_2,
        )

    def _encode_prompt(self, prompt, batch_size):
        """Encode one prompt with both text encoders.

        Returns the penultimate hidden states of both encoders concatenated
        on the last axis, plus element 0 of the second encoder's output
        (used as the pooled embedding).
        """
        # A bare string must be wrapped before repetition: "a cat" * 2 is
        # "a cata cat" (one sample), not a batch of two prompts.
        prompts = [prompt] if isinstance(prompt, str) else list(prompt)
        prompts = prompts * batch_size

        tokens_1 = self.tokenizer(
            prompts,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt",
        )
        tokens_2 = self.tokenizer_2(
            prompts,
            padding="max_length",
            max_length=self.tokenizer_2.model_max_length,
            truncation=True,
            return_tensors="pt",
        )
        with torch.no_grad():
            encoded_1 = self.text_encoder(
                tokens_1.input_ids.to(self.device), output_hidden_states=True
            )
            encoded_2 = self.text_encoder_2(
                tokens_2.input_ids.to(self.device), output_hidden_states=True
            )
        prompt_embeds = torch.concat(
            (encoded_1.hidden_states[-2], encoded_2.hidden_states[-2]),
            dim=-1,
        )
        return prompt_embeds, encoded_2[0]

    def prepare_prompt_input(self, prompt_o, prompt_b, batch_size, height, width):
        """Build the UNet conditioning for the three guidance branches.

        The branches are stacked along the batch axis in the order
        ``[negative, prompt_o, prompt_b]``; ``_forward`` relies on that
        order when it splits the UNet output with ``chunk(3)``. The negative
        branch uses all-zero embeddings.

        Parameters
        ----------
        prompt_o :
            First prompt. A string is treated as a single prompt; a list is
            repeated ``batch_size`` times as-is.
        prompt_b :
            Second prompt, same conventions as ``prompt_o``.
        batch_size :
            Number of samples generated per call.
        height :
            Image height in pixels, written into the time ids.
        width :
            Image width in pixels, written into the time ids.

        Returns
        -------
        tuple
            ``(prompt_embeds, added_cond_kwargs)`` where
            ``added_cond_kwargs`` has the keys ``"text_embeds"`` and
            ``"time_ids"``.
        """
        prompt_embeds_o, pooled_o = self._encode_prompt(prompt_o, batch_size)
        prompt_embeds_b, pooled_b = self._encode_prompt(prompt_b, batch_size)
        negative_prompt_embeds = torch.zeros_like(prompt_embeds_o)
        negative_pooled = torch.zeros_like(pooled_o)

        prompt_embeds = torch.cat(
            [negative_prompt_embeds, prompt_embeds_o, prompt_embeds_b], dim=0
        ).to(self.device)
        add_text_embeds = torch.cat(
            [negative_pooled, pooled_o, pooled_b], dim=0
        ).to(self.device)

        # All three branches share the same (height, width, 0, 0, height,
        # width) row, so the row order after ``repeat`` is irrelevant.
        time_ids = torch.tensor([(height, width, 0, 0, height, width)])
        add_time_ids = (
            torch.cat([time_ids, time_ids, time_ids], dim=0)
            .to(self.device)
            .repeat(batch_size, 1)
        )

        added_cond_kwargs = {
            "text_embeds": add_text_embeds,
            "time_ids": add_time_ids,
        }
        return prompt_embeds, added_cond_kwargs

    @torch.no_grad()
    def get_batch(self, latents: Callable, nrow: int, ncol: int) -> Callable:
        """Decode latents into a ``uint8`` image batch.

        Parameters
        ----------
        latents : Callable
            Latent tensor; it is divided by ``vae.config.scaling_factor``
            before decoding.
        nrow : int
            Unused; kept for interface compatibility.
        ncol : int
            Unused; kept for interface compatibility.

        Returns
        -------
        Callable
            ``torch.uint8`` tensor with the channel axis moved last
            (``permute(0, 2, 3, 1)`` of the decoder output), values 0..255.
        """
        latents = latents / self.vae.config.scaling_factor
        # The VAE keeps its own dtype in __init__ while latents follow the
        # fp16 UNet, so align them before decoding.
        latents = latents.to(self.vae.dtype)
        image = self.vae.decode(latents, return_dict=False)[0]
        image = (image / 2 + 0.5).clamp(0, 1).squeeze()
        # ``squeeze`` drops a batch axis of size 1; restore it.
        if len(image.shape) < 4:
            image = image.unsqueeze(0)
        image = (image.permute(0, 2, 3, 1) * 255).to(torch.uint8)
        return image

    @torch.no_grad()
    def get_text_embedding(self, prompt: str) -> Callable:
        """Encode ``prompt`` with the first tokenizer / text encoder only.

        Parameters
        ----------
        prompt : str
            Text to encode.

        Returns
        -------
        Callable
            Element 0 of the ``text_encoder`` output (presumably the last
            hidden state for a CLIP text model; confirm against the encoder
            in use).
        """
        text_input = self.tokenizer(
            prompt,
            padding="max_length",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt",
        )
        return self.text_encoder(text_input.input_ids.to(self.device))[0]

    @torch.no_grad()
    def get_vel(
        self,
        t: float,
        sigma: float,
        latents: Callable,
        embeddings: Callable,
        added_cond_kwargs: Dict = None,
    ):
        """Run the UNet once on sigma-rescaled latents.

        Parameters
        ----------
        t : float
            Timestep passed to the UNet unchanged.
        sigma : float
            Noise level; latents are divided by ``sqrt(sigma**2 + 1)``.
        latents : Callable
            Latent tensor.
        embeddings : Callable
            Sequence of text-embedding tensors; concatenated with
            ``torch.cat`` before the call.
        added_cond_kwargs : Dict, optional
            Extra conditioning forwarded to the UNet. SDXL-style UNets
            generally need ``text_embeds`` / ``time_ids`` here (verify
            against the UNet configuration in use).

        Returns
        -------
        torch.Tensor
            The ``sample`` attribute of the UNet output.
        """
        # The denoiser is registered as ``unet``; there is no ``self.model``.
        scaled_latents = latents / ((sigma**2 + 1) ** 0.5)
        return self.unet(
            scaled_latents,
            t,
            encoder_hidden_states=torch.cat(embeddings),
            added_cond_kwargs=added_cond_kwargs,
        ).sample

    def preprocess(
        self,
        prompt_1: str,
        prompt_2: str,
        seed: int = None,
        num_inference_steps: int = 200,
        batch_size: int = 1,
        height: int = 1024,
        width: int = 1024,
        guidance_scale: float = 7.5,
    ) -> Callable:
        """Store sampling settings, draw initial latents, encode prompts.

        Parameters
        ----------
        prompt_1 : str
            First prompt (``prompt_o`` in ``prepare_prompt_input``).
        prompt_2 : str
            Second prompt (``prompt_b`` in ``prepare_prompt_input``).
        seed : int
            RNG seed; a random 32-bit seed is drawn when ``None``. The seed
            actually used is stored on ``self.seed``.
        num_inference_steps : int
            Number of sampling steps.
        batch_size : int
            Number of images to generate.
        height : int
            Image height in pixels; latents use ``height // 8``.
        width : int
            Image width in pixels; latents use ``width // 8``.
        guidance_scale : float
            Guidance weight used in ``_forward``.

        Returns
        -------
        Callable
            Dict with the keys ``"latents"``, ``"prompt_embeds"`` and
            ``"added_cond_kwargs"``.
        """
        self.batch_size = batch_size
        self.num_inference_steps = num_inference_steps
        self.guidance_scale = guidance_scale
        self.seed = random.randint(0, 2**32 - 1) if seed is None else seed

        # ``torch.cuda.manual_seed`` returns None and is ignored without
        # CUDA, so build an explicit generator on the pipeline device.
        self.generator = torch.Generator(device=self.device).manual_seed(
            self.seed
        )

        latents = torch.randn(
            (batch_size, self.unet.config.in_channels, height // 8, width // 8),
            generator=self.generator,
            dtype=self.unet.dtype,
            device=self.device,
        )
        prompt_embeds, added_cond_kwargs = self.prepare_prompt_input(
            prompt_1, prompt_2, batch_size, height, width
        )

        return {
            "latents": latents,
            "prompt_embeds": prompt_embeds,
            "added_cond_kwargs": added_cond_kwargs,
        }

    def _forward(self, model_inputs: Dict) -> Callable:
        """Run the sampling loop from ``t = 1`` down towards ``t = 0``.

        Reads ``num_inference_steps``, ``guidance_scale`` and ``generator``
        from ``self``, so ``preprocess`` must have been called first.

        Parameters
        ----------
        model_inputs : Dict
            Output of ``preprocess``.

        Returns
        -------
        Callable
            Final latent tensor (still scaled by the VAE scaling factor).
        """
        latents = model_inputs["latents"]
        prompt_embeds = model_inputs["prompt_embeds"]
        added_cond_kwargs = model_inputs["added_cond_kwargs"]

        t = torch.tensor(1.0)
        dt = 1.0 / self.num_inference_steps
        train_number_steps = 1000
        # Scale the unit-variance draw up to the noise level at t = 1.
        latents = latents * (sigma(t) ** 2 + 1) ** 0.5
        with torch.no_grad():
            for i in tqdm(range(self.num_inference_steps)):
                # One copy per branch: [negative, prompt_o, prompt_b].
                latent_model_input = torch.cat([latents] * 3)
                sigma_t = sigma(t)
                # Accumulated float error can push ``t - dt`` slightly below
                # zero on the last step, which would make sigma NaN.
                dsigma = sigma((t - dt).clamp(min=0)) - sigma_t
                latent_model_input /= (sigma_t**2 + 1) ** 0.5
                noise_pred = self.unet(
                    latent_model_input,
                    t * train_number_steps,
                    encoder_hidden_states=prompt_embeds,
                    added_cond_kwargs=added_cond_kwargs,
                    return_dict=False,
                )[0]

                (
                    noise_pred_uncond,
                    noise_pred_text_o,
                    noise_pred_text_b,
                ) = noise_pred.chunk(3)

                noise = torch.sqrt(2 * torch.abs(dsigma) * sigma_t) * torch.empty_like(
                    latents, device=self.device
                ).normal_(generator=self.generator)

                # Step that guided sampling with prompt_b alone would take.
                dx_ind = (
                    2
                    * dsigma
                    * (
                        noise_pred_uncond
                        + self.guidance_scale *
                        (noise_pred_text_b - noise_pred_uncond)
                    )
                    + noise
                )
                # Per-sample mixing weight, reduced over all non-batch axes.
                kappa = (
                    torch.abs(dsigma)
                    * (noise_pred_text_b - noise_pred_text_o)
                    * (noise_pred_text_b + noise_pred_text_o)
                ).sum((1, 2, 3)) - (
                    dx_ind * ((noise_pred_text_o - noise_pred_text_b))
                ).sum(
                    (1, 2, 3)
                )
                kappa /= (
                    2
                    * dsigma
                    * self.guidance_scale
                    * ((noise_pred_text_o - noise_pred_text_b) ** 2).sum((1, 2, 3))
                )
                # kappa = 0 reduces to plain prompt_b guidance, kappa = 1 to
                # plain prompt_o guidance.
                noise_pred = noise_pred_uncond + self.guidance_scale * (
                    (noise_pred_text_b - noise_pred_uncond)
                    + kappa[:, None, None, None]
                    * (noise_pred_text_o - noise_pred_text_b)
                )

                # Stochastic update for all but the last three steps, which
                # drop the noise term and halve the drift coefficient.
                if i < self.num_inference_steps - 3:
                    latents += 2 * dsigma * noise_pred + noise
                else:
                    latents += dsigma * noise_pred

                t -= dt
        return latents

    def postprocess(self, latents: Callable) -> Callable:
        """Decode latents with the VAE into ``uint8`` NumPy images.

        Parameters
        ----------
        latents : Callable
            Latent tensor returned by ``_forward``.

        Returns
        -------
        Callable
            NumPy ``uint8`` array with the channel axis last
            (``permute(0, 2, 3, 1)`` of the decoder output), values 0..255.
        """
        latents = latents / self.vae.config.scaling_factor
        # Follow the VAE's dtype instead of assuming float32.
        latents = latents.to(self.vae.dtype)
        with torch.no_grad():
            image = self.vae.decode(latents, return_dict=False)[0]

        image = (image / 2 + 0.5).clamp(0, 1)
        image = image.detach().cpu().float().permute(0, 2, 3, 1).numpy()
        images = (image * 255).round().astype("uint8")
        return images

    def __call__(
        self,
        prompt_1: str,
        prompt_2: str,
        seed: int = None,
        num_inference_steps: int = 200,
        batch_size: int = 1,
        height: int = 1024,
        width: int = 1024,
        guidance_scale: float = 7.5,
    ) -> Callable:
        """Generate images for a pair of prompts.

        Runs ``preprocess``, ``_forward`` and ``postprocess`` in sequence.

        Parameters
        ----------
        prompt_1 : str
            First prompt.
        prompt_2 : str
            Second prompt.
        seed : int
            RNG seed; a random one is drawn when ``None``.
        num_inference_steps : int
            Number of sampling steps.
        batch_size : int
            Number of images to generate.
        height : int
            Image height in pixels.
        width : int
            Image width in pixels.
        guidance_scale : float
            Guidance weight.

        Returns
        -------
        Callable
            NumPy ``uint8`` image array produced by ``postprocess``.
        """
        model_inputs = self.preprocess(
            prompt_1,
            prompt_2,
            seed,
            num_inference_steps,
            batch_size,
            height,
            width,
            guidance_scale,
        )
        latents = self._forward(model_inputs)
        images = self.postprocess(latents)
        return images
|
|