Spaces:
Running
on
Zero
Running
on
Zero
import time | |
import torch | |
import spaces | |
from PIL import Image | |
from tqdm import tqdm | |
from threading import Thread | |
from torchvision import transforms | |
from huggingface_hub import hf_hub_download | |
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer | |
from model import * | |
from unitok.config import Args | |
from unitok.model import UniTok | |
from conversation import conv_templates | |
from mm_utils import tokenizer_image_token | |
from helpers import sample, expand2square | |
import os | |
os.system("wget -q https://huggingface.co/FoundationVision/unitok_tokenizer/resolve/main/unitok_tokenizer.pth") | |
PILtransform = transforms.ToPILImage() | |
os.system("pip uninstall -y gradio") | |
os.system("pip install gradio==4.44.1") | |
os.system("pip install gradio_client==1.3.0") | |
import gradio as gr | |
IMAGE_TOKEN_INDEX=-200 | |
PLACEHOLDER = """ | |
<div style="padding: 30px; text-align: center; display: flex; flex-direction: column; align-items: center;"> | |
<img src='file/Liquid_icon.png' style="width: 80%; max-width: 600px; height: auto; opacity: 0.5;"> | |
<h1 style="font-size: 20px; margin-bottom: 1px; opacity: 0.55;">UniTok-MLLM-7B</h1> | |
</div> | |
""" | |
CSS =""" | |
.contain { display: flex; flex-direction: column; } | |
#component-0 { height: 100%; } | |
#chatbot { flex-grow: 1; } | |
""" | |
title_html = """ | |
<div style="display: flex; flex-direction: column; align-items: center; gap: 10px;"> | |
<h1 style="margin: 0; line-height: 1; text-align: center;">UniTok: A Unified Tokenizer for Visual Generation and Understanding</h1> | |
</div> | |
""" | |
links_html = f""" | |
<center><font size=3><a href='https://foundationvision.github.io/Liquid/'>UniTok</a> has been open-sourced on <a href='https://huggingface.co/FoundationVision/unitok_mllm'>😊 Huggingface</a> and <a href='https://github.com/FoundationVision/UniTok'>🌟 GitHub</a>. If you find Liquid useful, a like❤️ or a star🌟 would be appreciated.</font></center> | |
""" | |
introduction = f""" | |
This is a native MLLM built with UniTok, a unified visual tokenizer well-suited for both generation and understanding tasks. | |
More details can be found on the project <a href='https://foundationvision.github.io/UniTok/'> homepage</a> and in the <a href='https://arxiv.org/abs/2502.20321'> paper</a>. """ | |
ckpt = torch.load('unitok_tokenizer.pth', map_location='cpu') | |
vae_cfg = Args() | |
vae_cfg.load_state_dict(ckpt['args']) | |
vq_model = UniTok(vae_cfg) | |
vq_model.load_state_dict(ckpt['trainer']['unitok']) | |
vq_model.to('cuda') | |
vq_model.eval() | |
mllm_ckpt = 'FoundationVision/unitok_mllm' | |
tokenizer = AutoTokenizer.from_pretrained(mllm_ckpt, padding_side='left') | |
vqllm = MiniGeminiLlamaForCausalLM.from_pretrained(mllm_ckpt).cuda() | |
vqllm = vqllm.to(dtype=torch.bfloat16) | |
vqllm = vqllm.eval() | |
num_codebooks = vae_cfg.num_codebooks | |
def bot_streaming_I2T(message, history): | |
print(message) | |
global stop_flag | |
stop_flag = True | |
time.sleep(0.2) | |
stop_flag = False | |
torch.cuda.empty_cache() | |
if message["files"]: | |
# message["files"][-1] is a Dict or just a string | |
if type(message["files"][-1]) == dict: | |
image = message["files"][-1]["path"] | |
else: | |
image = message["files"][-1] | |
else: | |
# if there's no image uploaded for this turn, look for images in the past turns | |
# kept inside tuples, take the last one | |
for hist in history: | |
if type(hist[0]) == tuple: | |
image = hist[0][0] | |
try: | |
if image is None: | |
# Handle the case where image is None | |
gr.Error("You need to upload an image for UniTok to work.") | |
except NameError: | |
# Handle the case where 'image' is not defined at all | |
gr.Error("You need to upload an image for UniTok to work.") | |
qs = message['text'] | |
qs = '\x00<image>\x01' + '\n' + qs | |
conv = conv_templates['llava_v1'].copy() | |
conv.append_message(conv.roles[0], qs) | |
conv.append_message(conv.roles[1], None) | |
prompt = conv.get_prompt() | |
crop_size = 256 | |
transform = transforms.Compose([ | |
transforms.Resize((crop_size, crop_size)), | |
transforms.ToTensor(), | |
transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5], inplace=True) | |
]) | |
print(prompt) | |
image = Image.open(image).convert('RGB') | |
pad_image = expand2square(image, (122, 116, 104) ) | |
# import pdb;pdb.set_trace() | |
img = transform(pad_image).unsqueeze(0) | |
img = img.to('cuda') | |
# import pdb;pdb.set_trace() | |
with torch.no_grad(): | |
vq_code = vq_model.img_to_idx(img) | |
image_codes = vq_code.unsqueeze(0) | |
input_ids = tokenizer_image_token(prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt') | |
inputs = { | |
"inputs":input_ids.unsqueeze(0).to("cuda:0"), | |
"images":image_codes.to("cuda:0"), | |
"max_new_tokens":1024, | |
"bos_token_id":tokenizer.bos_token_id, # Begin of sequence token | |
"eos_token_id":tokenizer.eos_token_id, # End of sequence token | |
"pad_token_id":tokenizer.pad_token_id, # Pad token | |
} | |
streamer = TextIteratorStreamer(tokenizer, **{"skip_special_tokens": True, "skip_prompt": True}) | |
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way. | |
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=1024) | |
thread = Thread(target=vqllm.generate_mllm, kwargs=generation_kwargs) | |
thread.start() | |
generated_text = "" | |
for new_text in streamer: | |
generated_text += new_text | |
time.sleep(0.06) | |
yield generated_text | |
def show_gallery(images): | |
gallery = gr.Gallery(images, label="Gallery", columns=4, height="auto",preview=True,scale=0.05) # 设置两行两列的布局 | |
return gallery | |
def bot_streaming_T2I(message, history,guidance_scale, temperature, top_K, top_P): | |
global stop_flag | |
stop_flag = True | |
time.sleep(0.2) | |
stop_flag = False | |
text_inputs = [message]*4 # generate 4 samples once | |
uncondition_text_inputs = ['<unconditional>\x00']*len(text_inputs) | |
for i in range(len(text_inputs)): | |
text_inputs[i] = text_inputs[i]+' Generate an image based on this description.\x00' | |
ori_batchsize = len(text_inputs) | |
with torch.no_grad(): | |
if guidance_scale > 1: | |
model_inputs = tokenizer(text_inputs + uncondition_text_inputs, return_tensors="pt", padding=True).to('cuda') | |
else: | |
model_inputs = tokenizer(text_inputs, return_tensors="pt", padding=True).to('cuda') | |
model_kwargs = {'attention_mask':model_inputs.pop('attention_mask'), 'use_cache': True} | |
input_ids = model_inputs.pop('input_ids') | |
batch_size, cur_len = input_ids.shape | |
if "inputs_embeds" in model_kwargs: | |
cur_len = model_kwargs["inputs_embeds"].shape[1] | |
model_kwargs["cache_position"] = torch.arange(cur_len, device=input_ids.device) | |
with torch.no_grad(): | |
sampling_kwargs={'temperature': temperature, 'top_k': top_K, 'top_p': top_P, 'sample_logits': True} | |
pred_tokens = [] | |
input_multi_ids = None | |
for i in tqdm(range(256)): | |
model_inputs = vqllm.prepare_inputs_for_generation(input_ids, **model_kwargs) | |
outputs = vqllm.T2I_forward_withcache( | |
**model_inputs, | |
input_multi_ids=input_multi_ids, | |
return_dict=True, | |
output_attentions=False, | |
output_hidden_states=False, | |
) | |
next_embed = outputs['last_hidden_state'][:, -1:, :] | |
indices_arhead = [] | |
for i_head in range(num_codebooks): | |
ar_next_embed = vqllm.ar_head( | |
inputs_embeds=next_embed, | |
use_cache=False, | |
output_attentions=False, | |
output_hidden_states=False, | |
return_dict=False, | |
) | |
next_token_logits = vqllm.ar_head.linear_head(ar_next_embed[0]) | |
if guidance_scale > 1: | |
cond_logits, uncond_logits = torch.split(next_token_logits, len(next_token_logits) // 2, dim=0) | |
cfg_logits = uncond_logits + (cond_logits - uncond_logits) * guidance_scale | |
half_next_token, _ = sample(cfg_logits, **sampling_kwargs) | |
# pred_tokens.append(half_next_token) | |
next_token = torch.cat([half_next_token, half_next_token]) # [bz,1] | |
else: | |
next_token, next_prob = sample(next_token_logits, **sampling_kwargs) | |
# pred_tokens.append(next_token) | |
indices_arhead.append(next_token) | |
if i_head < num_codebooks - 1: | |
predicted_embed = vqllm.ar_head.codebooks[i_head](next_token) | |
next_embed = torch.cat([next_embed, predicted_embed], dim=1) | |
pred_tokens.append(torch.cat(indices_arhead, dim=1)) # [numcodebook,bz*2] | |
input_multi_ids = torch.stack(pred_tokens, dim=-1) | |
fake_id = torch.zeros_like(input_ids[:,:1]) | |
input_ids = torch.cat([input_ids, fake_id], dim=-1) # add fake id for cache | |
model_kwargs = vqllm._update_model_kwargs_for_generation( | |
outputs, | |
model_kwargs, | |
is_encoder_decoder=vqllm.config.is_encoder_decoder, | |
) | |
del sampling_kwargs | |
del model_inputs | |
del outputs | |
del model_kwargs | |
# image_vq_id = input_ids[:,prompt_length:prompt_length+256]-ori_vocabe_size | |
image_vq_id = torch.stack(pred_tokens, dim=-1)[:ori_batchsize] | |
generated_image_list = [] | |
rec_images = vq_model.idx_to_img(image_vq_id) | |
for index, rec_image in enumerate(rec_images): | |
rec_img = PILtransform(rec_image.squeeze(0).add(1).mul_(0.5).clamp_(0, 1)) | |
generated_image_list.append(rec_img) | |
torch.cuda.empty_cache() | |
yield show_gallery(generated_image_list) | |
chatbot_T2I=gr.Chatbot(height=600) | |
chat_input_T2I = gr.Textbox(placeholder="Enter text prompts...", show_label=False) | |
chatbot_I2T=gr.Chatbot(placeholder=PLACEHOLDER, scale=1) | |
chat_input_I2T = gr.MultimodalTextbox(interactive=True, file_types=["image"], placeholder="Enter message or upload file...", show_label=False) | |
with gr.Blocks(fill_height=True) as demo: | |
gr.Markdown(title_html) | |
gr.Markdown(links_html) | |
gr.Markdown(introduction) | |
with gr.Tab("Text To Image"): | |
description="Enter a text prompt or simply try one of the examples below to generate 4 images at once. Click to display the full image. You can configure hyperparameters for image generation in the Advanced Settings. " | |
gr.Markdown(description) | |
with gr.Accordion("⚙️ Advanced Settings", open=False): | |
with gr.Row(): | |
guidance_scale = gr.Slider(1.0, 20.0, value=7.0, label="Guidance Scale") | |
temperature = gr.Slider(0.0, 1.0, value=1.0, label="temperature") | |
top_K = gr.Slider(1, 4096, value=2048, label="Top K") | |
top_P = gr.Slider(0.0, 1.0, value=1.0, label="Top P") | |
aaa = gr.ChatInterface( | |
fn=bot_streaming_T2I, | |
examples=[ | |
["cherry tree on the surface of the moon", 5.0, 1.0, 2048, 1.0], | |
["New York City at night with starry night vincent van gogh style", 5.0, 1.0, 2048, 1.0], | |
["cavalier king charles spaniel being cute and ultra realistic with cute sunglasses", 5.0, 1.0, 2048, 1.0], | |
["anthophomorphic Shaman owl portrait, light rays, facepaint, detailed, digital photography", 5.0, 1.0, 2048, 1.0], | |
["denzel washington as lor krishna front facing looking straight into the eye in the battlefield of kurukshetra", 5.0, 1.0, 2048, 1.0], | |
["realxing mountain scene, warm colors, sunset, river in front of mountain, pine trees, oil painting, photo realistic, blue ambiant lighting", 5.0, 1.0, 2048, 1.0], | |
["the ship of the dead by aaron hawthorne, in the style of en plein air beach scenes, ian miller, jasper francis cropsey, joram roukes, emotional and dramatic scenes, rusty debris, danish golden age, sunset", 5.0, 1.0, 2048, 1.0], | |
["japanese sakura bonsai, best quality, ultra high res, scene featuring volumetric lighting, Urban alleyway, warm color temperature, Straight On, variable depth of field, dynamic composition", 5.0, 1.0, 2048, 1.0], | |
], | |
stop_btn="Stop Generation", | |
additional_inputs = [guidance_scale, temperature, top_K, top_P], | |
additional_inputs_accordion="⚙️ Advanced Settings", | |
multimodal=False, | |
cache_examples=False, | |
textbox=chat_input_T2I, | |
chatbot=chatbot_T2I, | |
fill_height=True, | |
) | |
with gr.Tab("Image To Text"): | |
bbb = gr.ChatInterface( | |
fn=bot_streaming_I2T, | |
examples=[{"text": "How to make this pastry?", "files": ["./baklava.png"]}], | |
description="Upload an image and start chatting about it, or simply try one of the examples below. If you don't upload an image, you will receive an error.", | |
stop_btn="Stop Generation", | |
multimodal=True, | |
cache_examples=False, | |
textbox=chat_input_I2T, | |
chatbot=chatbot_I2T, | |
) | |
# demo.queue(api_open=False) | |
demo.launch(allowed_paths=["./"], share=True ) |