Spaces:
Running
on
Zero
Running
on
Zero
import argparse | |
import gradio as gr | |
import os | |
from PIL import Image # This was missing | |
import spaces | |
import copy | |
import numpy as np # Required if you're doing image array work | |
from kimi_vl.serve.frontend import reload_javascript | |
from kimi_vl.serve.utils import ( | |
configure_logger, | |
pil_to_base64, | |
parse_ref_bbox, | |
strip_stop_words, | |
is_variable_assigned, | |
) | |
from kimi_vl.serve.gradio_utils import ( | |
cancel_outputing, | |
delete_last_conversation, | |
reset_state, | |
reset_textbox, | |
transfer_input, | |
wrap_gen_fn, | |
) | |
from kimi_vl.serve.chat_utils import ( | |
generate_prompt_with_history, | |
convert_conversation_to_prompts, | |
to_gradio_chatbot, | |
to_gradio_history, | |
) | |
from kimi_vl.serve.inference import kimi_vl_generate, load_model | |
from kimi_vl.serve.examples import get_examples | |
TITLE = """<h1 align="left" style="min-width:200px; margin-top:0;">Chat with Kimi-VL-A3B-Thinking🤔 </h1>""" | |
DESCRIPTION_TOP = """<a href="https://github.com/MoonshotAI/Kimi-VL" target="_blank">Kimi-VL-A3B-Thinking</a> is a multi-modal LLM that can understand text, video images, and generate text with thinking processes. \n this specific space was hacked to also accept videos, and the system prompt has been changed to favor video analysis.""" | |
DESCRIPTION = """""" | |
ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) | |
DEPLOY_MODELS = dict() | |
logger = configure_logger() | |
def resize_image(image: Image.Image, max_size: int = 640, min_size: int = 28): | |
width, height = image.size | |
if width < min_size or height < min_size: | |
scale = min_size / min(width, height) | |
new_width = int(width * scale) | |
new_height = int(height * scale) | |
image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
elif max_size > 0 and (width > max_size or height > max_size): | |
scale = max_size / max(width, height) | |
new_width = int(width * scale) | |
new_height = int(height * scale) | |
image = image.resize((new_width, new_height)) | |
return image | |
def load_frames(video_file, max_num_frames=64, long_edge=448): | |
from decord import VideoReader | |
vr = VideoReader(video_file) | |
duration = len(vr) | |
fps = vr.get_avg_fps() | |
length = int(duration / fps) | |
num_frames = min(max_num_frames, length) | |
frame_indices = [int(duration / num_frames * (i + 0.5)) for i in range(num_frames)] | |
frames_data = vr.get_batch(frame_indices).asnumpy() | |
imgs = [] | |
for idx in range(num_frames): | |
img = resize_image(Image.fromarray(frames_data[idx]).convert("RGB"), long_edge) | |
imgs.append(img) | |
return imgs | |
def parse_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--model", type=str, default="Kimi-VL-A3B-Thinking") | |
parser.add_argument("--local-path", type=str, default="", help="huggingface ckpt, optional") | |
parser.add_argument("--ip", type=str, default="0.0.0.0") | |
parser.add_argument("--port", type=int, default=7860) | |
return parser.parse_args() | |
def fetch_model(model_name: str): | |
global args, DEPLOY_MODELS | |
if args.local_path: | |
model_path = args.local_path | |
else: | |
model_path = f"moonshotai/{args.model}" | |
if model_name in DEPLOY_MODELS: | |
model_info = DEPLOY_MODELS[model_name] | |
print(f"{model_name} has been loaded.") | |
else: | |
print(f"{model_name} is loading...") | |
DEPLOY_MODELS[model_name] = load_model(model_path) | |
print(f"Load {model_name} successfully...") | |
model_info = DEPLOY_MODELS[model_name] | |
return model_info | |
def highlight_thinking(msg: str) -> str: | |
msg = copy.deepcopy(msg) | |
if "◁think▷" in msg: | |
msg = msg.replace("◁think▷", "<b style='color:blue;'>🤔Thinking...</b>\n") | |
if "◁/think▷" in msg: | |
msg = msg.replace("◁/think▷", "\n<b style='color:purple;'>💡Summary</b>\n") | |
return msg | |
def predict( | |
text, | |
images, | |
chatbot, | |
history, | |
top_p, | |
temperature, | |
max_length_tokens, | |
max_context_length_tokens, | |
video_num_frames, | |
video_long_edge, | |
chunk_size: int = 512, | |
): | |
print("running the prediction function") | |
try: | |
model, processor = fetch_model(args.model) | |
if text == "": | |
yield chatbot, history, "Empty context." | |
return | |
except KeyError: | |
yield [[text, "No Model Found"]], [], "No Model Found" | |
return | |
if images is None: | |
images = [] | |
pil_images = [] | |
for img_or_file in images: | |
try: | |
if isinstance(img_or_file, Image.Image): | |
pil_images.append(img_or_file) | |
else: | |
image = Image.open(img_or_file.name).convert("RGB") | |
pil_images.append(image) | |
except: | |
try: | |
pil_images = load_frames(img_or_file, video_num_frames, video_long_edge) | |
break | |
except Exception as e: | |
print(f"Error loading image or video: {e}") | |
conversation = generate_prompt_with_history( | |
text=text, | |
images=pil_images, | |
history=history, | |
processor=processor, | |
max_length=max_context_length_tokens, | |
) | |
all_conv, last_image = convert_conversation_to_prompts(conversation) | |
stop_words = conversation.stop_str | |
gradio_chatbot_output = to_gradio_chatbot(conversation) | |
full_response = "" | |
for x in kimi_vl_generate( | |
conversations=all_conv, | |
model=model, | |
processor=processor, | |
stop_words=stop_words, | |
max_length=max_length_tokens, | |
temperature=temperature, | |
top_p=top_p, | |
): | |
full_response += x | |
response = strip_stop_words(full_response, stop_words) | |
conversation.update_last_message(response) | |
gradio_chatbot_output[-1][1] = highlight_thinking(response) | |
yield gradio_chatbot_output, to_gradio_history(conversation), "Generating..." | |
if last_image is not None: | |
vg_image = parse_ref_bbox(response, last_image) | |
if vg_image is not None: | |
vg_base64 = pil_to_base64(vg_image, "vg", max_size=800, min_size=400) | |
gradio_chatbot_output[-1][1] += vg_base64 | |
yield gradio_chatbot_output, to_gradio_history(conversation), "Generating..." | |
logger.info("flushed result to gradio") | |
if is_variable_assigned("x"): | |
print(f"temperature: {temperature}, top_p: {top_p}, max_length_tokens: {max_length_tokens}") | |
yield gradio_chatbot_output, to_gradio_history(conversation), "Generate: Success" | |
# ------------------------------ | |
# Interface + Launch | |
# ------------------------------ | |
if __name__ == "__main__": | |
args = parse_args() | |
reload_javascript() | |
with gr.Blocks(title="Kimi-VL") as demo: | |
gr.Markdown(TITLE) | |
gr.Markdown(DESCRIPTION_TOP) | |
with gr.Row(): | |
text_input = gr.Textbox(label="Enter your message", scale=4) | |
image_input = gr.File(label="Upload image or video", file_types=["image", "video"], file_count="multiple") | |
chatbot_output = gr.Chatbot(label="Kimi-VL Output") | |
history_state = gr.State([]) | |
top_p = gr.Slider(0, 1, value=0.9, label="Top-p") | |
temperature = gr.Slider(0.1, 1.5, value=0.6, label="Temperature") | |
max_length_tokens = gr.Slider(16, 4096, value=2048, step=64, label="Max Length") | |
max_context_length_tokens = gr.Slider(128, 4096, value=2048, step=64, label="Max Context") | |
video_num_frames = gr.Slider(4, 64, value=24, step=4, label="Frames (for video)") | |
video_long_edge = gr.Slider(128, 1024, value=1024, step=32, label="Long edge resize (video)") | |
submit_btn = gr.Button("Submit") | |
submit_btn.click( | |
predict, | |
inputs=[ | |
text_input, | |
image_input, | |
chatbot_output, | |
history_state, | |
top_p, | |
temperature, | |
max_length_tokens, | |
max_context_length_tokens, | |
video_num_frames, | |
video_long_edge, | |
], | |
outputs=[chatbot_output, history_state, gr.Textbox(visible=False)], | |
) | |
demo.queue().launch(server_name=args.ip, server_port=args.port) | |