Spaces:
Running
on
Zero
Running
on
Zero
import time | |
import torch | |
# import spaces | |
from PIL import Image | |
from tqdm import tqdm | |
from threading import Thread | |
from torchvision import transforms | |
from huggingface_hub import hf_hub_download | |
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer | |
from model import * | |
from unitok.config import Args | |
from unitok.model import UniTok | |
from conversation import conv_templates | |
from mm_utils import tokenizer_image_token | |
from helpers import sample, expand2square | |
import os | |
os.system("wget -q https://huggingface.co/FoundationVision/unitok_tokenizer/resolve/main/unitok_tokenizer.pth") | |
PILtransform = transforms.ToPILImage() | |
os.system("pip uninstall -y gradio") | |
os.system("pip install gradio==4.44.1") | |
os.system("pip install gradio_client==1.3.0") | |
import gradio as gr | |
IMAGE_TOKEN_INDEX=-200 | |
PLACEHOLDER = """ | |
<div style="padding: 30px; text-align: center; display: flex; flex-direction: column; align-items: center;"> | |
<img src='file/Liquid_icon.png' style="width: 80%; max-width: 600px; height: auto; opacity: 0.5;"> | |
<h1 style="font-size: 20px; margin-bottom: 1px; opacity: 0.55;">UniTok-MLLM-7B</h1> | |
</div> | |
""" | |
CSS =""" | |
.contain { display: flex; flex-direction: column; } | |
#component-0 { height: 100%; } | |
#chatbot { flex-grow: 1; } | |
""" | |
title_html = """ | |
<div style="display: flex; flex-direction: column; align-items: center; gap: 10px;"> | |
<h1 style="margin: 0; line-height: 1; text-align: center;">UniTok: A Unified Tokenizer for Visual Generation and Understanding</h1> | |
</div> | |
""" | |
links_html = f""" | |
<center><font size=3><a href='https://foundationvision.github.io/Liquid/'>UniTok</a> has been open-sourced on <a href='https://huggingface.co/FoundationVision/unitok_mllm'>😊 Huggingface</a> and <a href='https://github.com/FoundationVision/UniTok'>🌟 GitHub</a>. If you find Liquid useful, a like❤️ or a star🌟 would be appreciated.</font></center> | |
""" | |
introduction = f""" | |
This is a native MLLM built with UniTok, a unified visual tokenizer well-suited for both generation and understanding tasks. | |
More details can be found on the project <a href='https://foundationvision.github.io/UniTok/'> homepage</a> and in the <a href='https://arxiv.org/abs/2502.20321'> paper</a>. """ | |
ckpt = torch.load('unitok_tokenizer.pth', map_location='cpu') | |
vae_cfg = Args() | |
vae_cfg.load_state_dict(ckpt['args']) | |
vq_model = UniTok(vae_cfg) | |
vq_model.load_state_dict(ckpt['trainer']['unitok']) | |
vq_model.to('cuda') | |
vq_model.eval() | |
mllm_ckpt = 'FoundationVision/unitok_mllm' | |
tokenizer = AutoTokenizer.from_pretrained(mllm_ckpt, padding_side='left') | |
vqllm = MiniGeminiLlamaForCausalLM.from_pretrained(mllm_ckpt).cuda() | |
vqllm = vqllm.to(dtype=torch.bfloat16) | |
vqllm = vqllm.eval() | |
num_codebooks = vae_cfg.num_codebooks | |
# @spaces.GPU | |
def bot_streaming_I2T(message, history): | |
print(message) | |
global stop_flag | |
stop_flag = True | |
time.sleep(0.2) | |
stop_flag = False | |
torch.cuda.empty_cache() | |
if message["files"]: | |
# message["files"][-1] is a Dict or just a string | |
if type(message["files"][-1]) == dict: | |
image = message["files"][-1]["path"] | |
else: | |
image = message["files"][-1] | |
else: | |
# if there's no image uploaded for this turn, look for images in the past turns | |
# kept inside tuples, take the last one | |
for hist in history: | |
if type(hist[0]) == tuple: | |
image = hist[0][0] | |
try: | |
if image is None: | |
# Handle the case where image is None | |
gr.Error("You need to upload an image for LLaVA to work.") | |
except NameError: | |
# Handle the case where 'image' is not defined at all | |
gr.Error("You need to upload an image for LLaVA to work.") | |
qs = message['text'] | |
qs = '\x00<image>\x01' + '\n' + qs | |
conv = conv_templates['llava_v1'].copy() | |
conv.append_message(conv.roles[0], qs) | |
conv.append_message(conv.roles[1], None) | |
prompt = conv.get_prompt() | |
crop_size = 256 | |
transform = transforms.Compose([ | |
transforms.Resize((crop_size, crop_size)), | |
transforms.ToTensor(), | |
transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5], inplace=True) | |
]) | |
print(prompt) | |
image = Image.open(image).convert('RGB') | |
pad_image = expand2square(image, (122, 116, 104) ) | |
# import pdb;pdb.set_trace() | |
img = transform(pad_image).unsqueeze(0) | |
img = img.to('cuda') | |
# import pdb;pdb.set_trace() | |
with torch.no_grad(): | |
vq_code = vq_model.img_to_idx(img) | |
image_codes = vq_code.unsqueeze(0) | |
input_ids = tokenizer_image_token(prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt') | |
inputs = { | |
"inputs":input_ids.unsqueeze(0).to("cuda:0"), | |
"images":image_codes.to("cuda:0"), | |
"max_new_tokens":1024, | |
"bos_token_id":tokenizer.bos_token_id, # Begin of sequence token | |
"eos_token_id":tokenizer.eos_token_id, # End of sequence token | |
"pad_token_id":tokenizer.pad_token_id, # Pad token | |
} | |
streamer = TextIteratorStreamer(tokenizer, **{"skip_special_tokens": True, "skip_prompt": True}) | |
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way. | |
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=1024) | |
thread = Thread(target=vqllm.generate_mllm, kwargs=generation_kwargs) | |
thread.start() | |
generated_text = "" | |
for new_text in streamer: | |
generated_text += new_text | |
time.sleep(0.06) | |
yield generated_text | |
def show_gallery(images): | |
gallery = gr.Gallery(images, label="Gallery", columns=4, height="auto",preview=True,scale=0.05) # 设置两行两列的布局 | |
return gallery | |
# @spaces.GPU | |
def bot_streaming_T2I(message, history,guidance_scale, temperature, top_K, top_P): | |
global stop_flag | |
stop_flag = True | |
time.sleep(0.2) | |
stop_flag = False | |
text_inputs = [message]*4 # generate 4 samples once | |
uncondition_text_inputs = ['<unconditional>\x00']*len(text_inputs) | |
for i in range(len(text_inputs)): | |
text_inputs[i] = text_inputs[i]+' Generate an image based on this description.\x00' | |
ori_batchsize = len(text_inputs) | |
with torch.no_grad(): | |
if guidance_scale > 1: | |
model_inputs = tokenizer(text_inputs + uncondition_text_inputs, return_tensors="pt", padding=True).to('cuda') | |
else: | |
model_inputs = tokenizer(text_inputs, return_tensors="pt", padding=True).to('cuda') | |
model_kwargs = {'attention_mask':model_inputs.pop('attention_mask'), 'use_cache': True} | |
input_ids = model_inputs.pop('input_ids') | |
batch_size, cur_len = input_ids.shape | |
if "inputs_embeds" in model_kwargs: | |
cur_len = model_kwargs["inputs_embeds"].shape[1] | |
model_kwargs["cache_position"] = torch.arange(cur_len, device=input_ids.device) | |
with torch.no_grad(): | |
sampling_kwargs={'temperature': temperature, 'top_k': top_K, 'top_p': top_P, 'sample_logits': True} | |
pred_tokens = [] | |
input_multi_ids = None | |
for i in tqdm(range(256)): | |
model_inputs = vqllm.prepare_inputs_for_generation(input_ids, **model_kwargs) | |
outputs = vqllm.T2I_forward_withcache( | |
**model_inputs, | |
input_multi_ids=input_multi_ids, | |
return_dict=True, | |
output_attentions=False, | |
output_hidden_states=False, | |
) | |
next_embed = outputs['last_hidden_state'][:, -1:, :] | |
indices_arhead = [] | |
for i_head in range(num_codebooks): | |
ar_next_embed = vqllm.ar_head( | |
inputs_embeds=next_embed, | |
use_cache=False, | |
output_attentions=False, | |
output_hidden_states=False, | |
return_dict=False, | |
) | |
next_token_logits = vqllm.ar_head.linear_head(ar_next_embed[0]) | |
if guidance_scale > 1: | |
cond_logits, uncond_logits = torch.split(next_token_logits, len(next_token_logits) // 2, dim=0) | |
cfg_logits = uncond_logits + (cond_logits - uncond_logits) * guidance_scale | |
half_next_token, _ = sample(cfg_logits, **sampling_kwargs) | |
# pred_tokens.append(half_next_token) | |
next_token = torch.cat([half_next_token, half_next_token]) # [bz,1] | |
else: | |
next_token, next_prob = sample(next_token_logits, **sampling_kwargs) | |
# pred_tokens.append(next_token) | |
indices_arhead.append(next_token) | |
if i_head < num_codebooks - 1: | |
predicted_embed = vqllm.ar_head.codebooks[i_head](next_token) | |
next_embed = torch.cat([next_embed, predicted_embed], dim=1) | |
pred_tokens.append(torch.cat(indices_arhead, dim=1)) # [numcodebook,bz*2] | |
input_multi_ids = torch.stack(pred_tokens, dim=-1) | |
fake_id = torch.zeros_like(input_ids[:,:1]) | |
input_ids = torch.cat([input_ids, fake_id], dim=-1) # add fake id for cache | |
model_kwargs = vqllm._update_model_kwargs_for_generation( | |
outputs, | |
model_kwargs, | |
is_encoder_decoder=vqllm.config.is_encoder_decoder, | |
) | |
del sampling_kwargs | |
del model_inputs | |
del outputs | |
del model_kwargs | |
# image_vq_id = input_ids[:,prompt_length:prompt_length+256]-ori_vocabe_size | |
image_vq_id = torch.stack(pred_tokens, dim=-1)[:ori_batchsize] | |
generated_image_list = [] | |
rec_images = vq_model.idx_to_img(image_vq_id) | |
for index, rec_image in enumerate(rec_images): | |
rec_img = PILtransform(rec_image.squeeze(0).add(1).mul_(0.5).clamp_(0, 1)) | |
generated_image_list.append(rec_img) | |
torch.cuda.empty_cache() | |
yield show_gallery(generated_image_list) | |
chatbot_T2I=gr.Chatbot(height=600) | |
chat_input_T2I = gr.Textbox(placeholder="Enter text prompts...", show_label=False) | |
chatbot_I2T=gr.Chatbot(placeholder=PLACEHOLDER, scale=1) | |
chat_input_I2T = gr.MultimodalTextbox(interactive=True, file_types=["image"], placeholder="Enter message or upload file...", show_label=False) | |
with gr.Blocks(fill_height=True) as demo: | |
gr.Markdown(title_html) | |
gr.Markdown(links_html) | |
gr.Markdown(introduction) | |
with gr.Tab("Text To Image"): | |
description="Enter a text prompt or simply try one of the examples below to generate 4 images at once. Click to display the full image. You can configure hyperparameters for image generation in the Advanced Settings. " | |
gr.Markdown(description) | |
with gr.Accordion("⚙️ Advanced Settings", open=False): | |
with gr.Row(): | |
guidance_scale = gr.Slider(1.0, 20.0, value=7.0, label="Guidance Scale") | |
temperature = gr.Slider(0.0, 1.0, value=0.9, label="temperature") | |
top_K = gr.Slider(1, 8192, value=4096, label="Top K") | |
top_P = gr.Slider(0.0, 1.0, value=0.99, label="Top P") | |
aaa = gr.ChatInterface( | |
fn=bot_streaming_T2I, | |
examples=[ | |
["realxing mountain scene, warm colors, sunset, river in front of mountain, pine trees, oil painting, photo realistic, blue ambiant lighting", 7.0, 1.0, 0, 1.0], | |
["Brandon Routh, color portrait by Herb Ritts, 1980s1990s", 7.0, 1.0, 0, 1.0], | |
["cabbage, Shot on 35mm, UltraWide Angle, Depth of Field, DOF, shutter Speed 11000, F22, photography, 8k", 7.0, 1.0, 0, 1.0], | |
["award winning photography, beautiful modern skyline, city sunset ", 7.0, 1.0, 0, 1.0], | |
["New York City at night with starry night vincent van gogh style", 7.0, 1.0, 0, 1.0], | |
["cute happy hedgehog holding sunflower in the morning ", 7.0, 1.0, 0, 1.0], | |
["cherry tree on the surface of the moon", 7.0, 1.0, 0, 1.0], | |
["a closeup of the owls eyes piercing through a dense fog. Blended in the ethereal, volumetric dust particles and embers, bokeh embers, microplumes of sand smoke, textured ash and smoke. Elegant, hyperdetailed, insane details, Unreal Engine, Cinematic, White Balance, 32k, Super Resolution, Chiaroscuro", 7.0, 1.0, 0, 1.0], | |
["Automotive front Photo Environment muddy Road in the jungle, golden hour Car vintage concept sports car yellow at high speed, motion blur, photo taken with Canon Eos R5 8k 45mp telephoto lens. 3D render, Vray, PBR car paint texture, hyper detailed, intricate symmetrical shape", 7.0, 1.0, 0, 1.0], | |
], | |
stop_btn="Stop Generation", | |
additional_inputs = [guidance_scale, temperature, top_K, top_P], | |
additional_inputs_accordion="⚙️ Advanced Settings", | |
multimodal=False, | |
cache_examples=False, | |
textbox=chat_input_T2I, | |
chatbot=chatbot_T2I, | |
fill_height=True, | |
) | |
with gr.Tab("Image To Text"): | |
bbb = gr.ChatInterface( | |
fn=bot_streaming_I2T, | |
examples=[{"text": "How to make this pastry?", "files": ["./baklava.png"]}], | |
description="Upload an image and start chatting about it, or simply try one of the examples below. If you don't upload an image, you will receive an error.", | |
stop_btn="Stop Generation", | |
multimodal=True, | |
cache_examples=False, | |
textbox=chat_input_I2T, | |
chatbot=chatbot_I2T, | |
) | |
# demo.queue(api_open=False) | |
demo.launch(allowed_paths=["./"], share=True ) |