""" | |
From https://github.com/lm-sys/FastChat/blob/main/fastchat/conversation.py | |
""" | |
import dataclasses | |
import logging | |
import copy | |
from enum import IntEnum, auto | |
from typing import Dict, List | |
import base64 | |
import gradio as gr | |
import torch | |
import os | |
from .utils import pil_to_base64 | |
import mimetypes | |
IMAGE_TOKEN = "<image>" | |
logger = logging.getLogger("gradio_logger") | |
import cv2 | |
import base64 | |
import tempfile | |
import os | |
import imageio | |
def compress_video_to_base64(
    video_path: str,
    max_frames: int = 128,
    resolution: tuple = (960, 540),
    target_crf: int = 28,
) -> str:
    """Subsample a video to roughly ``max_frames`` frames, re-encode it with
    libx264 at the given resolution and CRF, and return the MP4 base64-encoded."""
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or None
    original_fps = cap.get(cv2.CAP_PROP_FPS) or None
    if not total_frames or not original_fps:
        cap.release()
        raise RuntimeError("Could not read the frame count or FPS; check the video file or inspect it with ffprobe.")
    # Keep every `step`-th frame and lower the FPS accordingly, so playback speed is preserved.
    step = max(1, total_frames // max_frames)
    new_fps = max(1, round(original_fps / step))
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
        tmp_path = tmp.name
    writer = imageio.get_writer(
        tmp_path,
        fps=new_fps,
        codec='libx264',
        ffmpeg_params=[
            '-crf', str(target_crf),
            '-pix_fmt', 'yuv420p'
        ]
    )
    frame_idx = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_idx % step == 0:
            small = cv2.resize(frame, resolution)
            # OpenCV decodes to BGR; imageio expects RGB.
            writer.append_data(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
        frame_idx += 1
    cap.release()
    writer.close()
    with open(tmp_path, "rb") as f:
        data = f.read()
    os.remove(tmp_path)
    return base64.b64encode(data).decode("utf-8")
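
# Example (a minimal sketch; "clip.mp4" is a hypothetical path):
#
#   b64 = compress_video_to_base64("clip.mp4", max_frames=64, resolution=(640, 360))
#   html = f'<video controls src="data:video/mp4;base64,{b64}"></video>'
#
# The resulting data URI can be embedded directly in chatbot HTML, as
# to_gradio_chatbot does below.
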
class SeparatorStyle(IntEnum):
    """Separator styles."""

    PLAIN = auto()
    ALIGNMENT = auto()
    KIMI_VL = auto()


@dataclasses.dataclass
class Conversation:
    """A class that manages prompt templates and keeps all conversation history."""

    # The name of this template
    name: str
    # The template of the system prompt
    system_template: str = "{system_message}"
    # The system message
    system_message: str = ""
    # The names of the two roles
    roles: Tuple[str, str] = ("USER", "ASSISTANT")
    # All messages. Each item is (role, message).
    messages: List[List[str]] = ()
    # The number of few shot examples
    offset: int = 0
    # The separator style and configurations
    sep_style: SeparatorStyle = SeparatorStyle.PLAIN
    sep: str = "\n"
    sep2: str = None
    # Stop criteria (the default one is EOS token)
    stop_str: str = None
    # Stops generation if meeting any token in this list
    stop_token_ids: List[int] = None
    def get_prompt(self) -> str:
        """Get the prompt for generation."""
        system_prompt = self.system_template.format(system_message=self.system_message)
        if self.sep_style == SeparatorStyle.PLAIN:
            # Alternate sep (after user turns) and sep2 (after assistant turns).
            seps = [self.sep, self.sep2]
            ret = ""
            for i, (role, message) in enumerate(self.messages):
                if message:
                    if isinstance(message, tuple):
                        message = message[0]
                    ret += message + seps[i % 2]
            return ret
        elif self.sep_style == SeparatorStyle.ALIGNMENT:
            seps = [self.sep, self.sep2]
            ret = ""
            for i, (role, message) in enumerate(self.messages):
                if message:
                    if isinstance(message, tuple):
                        message, _, _ = message
                    if i % 2 == 0:
                        # User turns are replaced by a bare image placeholder.
                        ret += '<image>\n' + seps[i % 2]
                    else:
                        ret += message + seps[i % 2]
            return ret
        elif self.sep_style == SeparatorStyle.KIMI_VL:
            ret = "" if not system_prompt else system_prompt + self.sep
            for role, message in self.messages:
                if message:
                    if isinstance(message, tuple):
                        message = message[0]
                    if role == "user":
                        ret += message + self.sep
                    elif self.sep2 is not None:
                        ret += message + self.sep2
                    else:
                        ret += message
            return ret
        else:
            raise ValueError(f"Invalid style: {self.sep_style}")
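
    # Illustrative renderings of the branches above (hedged; derived from this
    # code, not from an official format spec), with
    # messages = [["user", "Hi"], ["assistant", "Hello"]]:
    #   PLAIN   (sep="\n", sep2="</s>"):  "Hi\nHello</s>"
    #   KIMI_VL (sep="<|im_end|>", sep2=None,
    #            system_message="You are a helpful assistant"):
    #           "You are a helpful assistant<|im_end|>Hi<|im_end|>Hello"
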
    def set_system_message(self, system_message: str):
        """Set the system message."""
        self.system_message = system_message

    def append_message(self, role: str, message: str):
        """Append a new message."""
        self.messages.append([role, message])

    def update_last_message(self, message: str):
        """Update the last output.

        The last message is typically set to be None when constructing the prompt,
        so we need to update it in-place after getting the response from a model.
        """
        self.messages[-1][1] = message

    def reset_message(self):
        """Clear all messages."""
        self.messages = []

    def to_gradio_chatbot(self):
        """Convert the conversation to gradio chatbot format."""
        ret = []
        for i, (role, msg) in enumerate(self.messages[self.offset :]):
            if i % 2 == 0:
                ret.append([msg, None])
            else:
                ret[-1][-1] = msg
        return ret

    def to_openai_api_messages(self):
        """Convert the conversation to OpenAI chat completion format."""
        system_prompt = self.system_template.format(system_message=self.system_message)
        ret = [{"role": "system", "content": system_prompt}]
        for i, (_, msg) in enumerate(self.messages[self.offset :]):
            if i % 2 == 0:
                ret.append({"role": "user", "content": msg})
            elif msg is not None:
                ret.append({"role": "assistant", "content": msg})
        return ret

    def copy(self):
        return Conversation(
            name=self.name,
            system_template=self.system_template,
            system_message=self.system_message,
            roles=self.roles,
            messages=[[x, y] for x, y in self.messages],
            offset=self.offset,
            sep_style=self.sep_style,
            sep=self.sep,
            sep2=self.sep2,
            stop_str=self.stop_str,
            stop_token_ids=self.stop_token_ids,
        )

    def dict(self):
        return {
            "template_name": self.name,
            "system_message": self.system_message,
            "roles": self.roles,
            "messages": self.messages,
            "offset": self.offset,
        }

# A global registry for all conversation templates
conv_templates: Dict[str, Conversation] = {}


def register_conv_template(template: Conversation, override: bool = False):
    """Register a new conversation template."""
    if not override:
        assert template.name not in conv_templates, f"{template.name} has been registered."
    conv_templates[template.name] = template


def get_conv_template(name: str) -> Conversation:
    """Get a conversation template."""
    return conv_templates[name].copy()

register_conv_template(
    Conversation(
        name="plain",
        system_template="",
        system_message="",
        roles=("", ""),
        messages=(),
        offset=0,
        sep_style=SeparatorStyle.PLAIN,
        sep="",
        sep2="",
        stop_token_ids=[100001],
        stop_str=['</s>'],
    )
)

register_conv_template(
    Conversation(
        name="alignment",
        system_template="",
        system_message="",
        roles=("", ""),
        messages=(),
        offset=0,
        sep_style=SeparatorStyle.ALIGNMENT,
        sep="",
        sep2="",
        stop_token_ids=[100001],
        stop_str=['</s>'],
    )
)

register_conv_template(
    Conversation(
        name="kimi-vl",
        system_template="{system_message}",
        system_message="You are a helpful assistant",
        roles=("user", "assistant"),
        messages=(),
        offset=0,
        sep_style=SeparatorStyle.KIMI_VL,
        sep="<|im_end|>",
        sep2=None,
        stop_token_ids=None,
        stop_str=["<|im_end|>"],
    )
)
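
# Example (a minimal sketch of the registry round-trip; the rendering follows
# the KIMI_VL branch of get_prompt above):
#
#   conv = get_conv_template("kimi-vl")
#   conv.append_message("user", "Hi")
#   conv.append_message("assistant", None)   # placeholder for the reply
#   conv.get_prompt()
#   # -> "You are a helpful assistant<|im_end|>Hi<|im_end|>"
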
def new_chat_template(sft_format: str = "kimi-vl"):
    return get_conv_template(sft_format)


def get_prompt(conv: Conversation) -> str:
    """Get the prompt for generation."""
    return conv.get_prompt()


def generate_prompt_with_history(text, images, history, processor, max_length=2048):
    """
    Generate a prompt with the chat history.

    Args:
        text (str): The text prompt.
        images (list[PIL.Image.Image]): The image prompt.
        history (list): List of previous conversation messages.
        processor (KimiVLProcessor): The chat processor used for encoding the prompt.
        max_length (int): The maximum prompt length in tokens.
    """
    global IMAGE_TOKEN
    user_role_ind = 0
    bot_role_ind = 1

    # Initialize conversation
    conversation = new_chat_template(sft_format="kimi-vl")
    if history:
        conversation.messages = history

    if images is not None and len(images) > 0:
        # num_image_tags = text.count(IMAGE_TOKEN)
        # num_images = len(images)
        # if num_images > num_image_tags:
        #     pad_image_tags = num_images - num_image_tags
        #     image_tokens = "\n".join([IMAGE_TOKEN] * pad_image_tags)
        #     # append the <image> in a new line after the text prompt
        #     text = image_tokens + "\n" + text
        # elif num_images < num_image_tags:
        #     remove_image_tags = num_image_tags - num_images
        #     text = text.replace(IMAGE_TOKEN, "", remove_image_tags)
        logger.info(f"prompt = {text}, len(images) = {len(images)}")
        text = (text, images)

    conversation.append_message(conversation.roles[user_role_ind], text)
    conversation.append_message(conversation.roles[bot_role_ind], "")

    # Create a copy of the conversation to avoid history truncation in the UI
    conversation_copy = conversation.copy()
    logger.info("=" * 80)
    logger.info(get_prompt(conversation))

    # Drop the oldest user/assistant pair until the prompt fits in max_length tokens.
    rounds = len(conversation.messages) // 2
    for _ in range(rounds):
        current_prompt = get_prompt(conversation)
        assert isinstance(current_prompt, str) and len(current_prompt) > 0, f"current_prompt = {current_prompt}"
        if len(processor.tokenizer.encode(current_prompt)) <= max_length:
            return conversation_copy
        if len(conversation.messages) % 2 != 0:
            raise gr.Error("The messages between user and assistant are not paired.")
        try:
            for _ in range(2):  # pop out two messages in a row
                conversation.messages.pop(0)
        except IndexError:
            raise gr.Error("Input text processing failed, unable to respond in this round.")

    raise gr.Error("Prompt could not be generated within the max_length limit.")
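
# Example (a minimal sketch; `processor` is assumed to be a loaded
# KimiVLProcessor and `pil_image` a PIL.Image, both hypothetical here):
#
#   conv = generate_prompt_with_history("Describe this image.", [pil_image],
#                                       history=[], processor=processor, max_length=2048)
#   prompts, last_image = convert_conversation_to_prompts(conv)  # defined below
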
def convert_conversation_to_prompts(conversation: Conversation):
    """
    Convert the conversation to a list of role/content/images dicts, returning
    the prompts together with the last attached image.
    """
    conv_prompts = []
    last_image = None

    messages = conversation.messages
    for i in range(0, len(messages), 2):
        if isinstance(messages[i][1], tuple):
            text, images = messages[i][1]
            last_image = images[-1]
        else:
            text, images = messages[i][1], []
        prompt = {"role": messages[i][0], "content": text, "images": images}
        response = {"role": messages[i + 1][0], "content": messages[i + 1][1]}
        conv_prompts.extend([prompt, response])

    return conv_prompts, last_image
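
# The returned structure looks like this (illustrative, derived from the loop
# above; <PIL.Image> stands in for an actual image object):
#
#   ([{"role": "user", "content": "Describe this image.", "images": [<PIL.Image>]},
#     {"role": "assistant", "content": ""}],
#    <PIL.Image>)
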
def to_gradio_chatbot2(conversation: Conversation) -> list:
    """Convert the conversation to gradio chatbot format (image attachments only)."""
    ret = []
    for i, (_, msg) in enumerate(conversation.messages[conversation.offset :]):
        if i % 2 == 0:
            if isinstance(msg, tuple):
                msg, images = copy.deepcopy(msg)
                if isinstance(images, list):
                    img_str = ""
                    for j, image in enumerate(images):
                        if isinstance(image, str):
                            with open(image, "rb") as f:
                                data = f.read()
                            img_b64_str = base64.b64encode(data).decode()
                            image_str = (
                                f'<img src="data:image/png;base64,{img_b64_str}" '
                                f'alt="user upload image" style="max-width: 300px; height: auto;" />'
                            )
                        else:
                            image_str = pil_to_base64(image, f"user upload image_{j}", max_size=800, min_size=400)
                        img_str += image_str
                    msg = img_str + msg
            ret.append([msg, None])
        else:
            ret[-1][-1] = msg
    return ret

def to_gradio_chatbot(conversation: Conversation) -> list:
    """Convert the conversation to gradio chatbot format, supporting images and video."""
    video_exts = (".mp4", ".mov", ".avi", ".webm")
    ret = []
    for i, (_, msg) in enumerate(conversation.messages[conversation.offset :]):
        # User message
        if i % 2 == 0:
            if isinstance(msg, tuple):
                msg_text, media = copy.deepcopy(msg)
                media_str = ""
                # Normalize to a list of media items
                items = media if isinstance(media, list) else [media]
                for j, item in enumerate(items):
                    # String path that is not a video: guess the type from the extension
                    if isinstance(item, str) and not item.endswith(video_exts):
                        mime, _ = mimetypes.guess_type(item)
                        with open(item, "rb") as f:
                            data = f.read()
                        b64 = base64.b64encode(data).decode()
                        if mime and mime.startswith("image/"):
                            media_str += (
                                f'<img src="data:{mime};base64,{b64}" '
                                f'alt="user upload image_{j}" '
                                f'style="max-width:300px;height:auto;" />'
                            )
                        else:
                            # Fall back to a plain link
                            media_str += f'<a href="{item}" target="_blank">{item}</a>'
                    # String path with a video extension: compress and inline it
                    elif isinstance(item, str) and item.endswith(video_exts):
                        try:
                            b64 = compress_video_to_base64(item)
                            media_str += (
                                f'<video controls style="max-width:300px;height:auto;" '
                                f'src="data:video/mp4;base64,{b64}"></video>'
                            )
                        except Exception:
                            logger.exception(f"Failed to embed video {item}")
                    # Otherwise assume a PIL image
                    else:
                        media_str += pil_to_base64(item, f"user upload image_{j}", max_size=800, min_size=400)
                msg = media_str + msg_text
            # Append user side
            ret.append([msg, None])
        else:
            # Assistant side, fill previous tuple
            ret[-1][-1] = msg
    return ret
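
# Example (a minimal sketch; assumes `conv` holds a (text, [media]) user turn,
# e.g. one built by generate_prompt_with_history above):
#
#   pairs = to_gradio_chatbot(conv)
#   # -> [['<img src="data:image/png;base64,..." ... />Describe this image.', None]]
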

def to_gradio_history(conversation: Conversation):
    """Convert the conversation to gradio history format."""
    return conversation.messages[conversation.offset :]