""" From https://github.com/lm-sys/FastChat/blob/main/fastchat/conversation.py """ import dataclasses import logging import copy from enum import IntEnum, auto from typing import Dict, List import base64 import gradio as gr import torch import os from .utils import pil_to_base64 import mimetypes IMAGE_TOKEN = "" logger = logging.getLogger("gradio_logger") import cv2 import base64 import tempfile import os import imageio def compress_video_to_base64( video_path: str, max_frames: int = 128, resolution: tuple = (960, 540), target_crf: int = 28 ) -> str: cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise RuntimeError(f"无法打开视频:{video_path}") total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or None original_fps = cap.get(cv2.CAP_PROP_FPS) or None if not total_frames or not original_fps: cap.release() raise RuntimeError("无法获取视频帧数或帧率,请检查视频文件或使用 ffprobe。") step = max(1, total_frames // max_frames) new_fps = max(1, round(original_fps / step)) with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp: tmp_path = tmp.name writer = imageio.get_writer( tmp_path, fps=new_fps, codec='libx264', ffmpeg_params=[ '-crf', str(target_crf), '-pix_fmt', 'yuv420p' ] ) frame_idx = 0 while True: ret, frame = cap.read() if not ret: break if frame_idx % step == 0: small = cv2.resize(frame, resolution) writer.append_data(cv2.cvtColor(small, cv2.COLOR_BGR2RGB)) frame_idx += 1 cap.release() writer.close() with open(tmp_path, "rb") as f: data = f.read() os.remove(tmp_path) return base64.b64encode(data).decode("utf-8") class SeparatorStyle(IntEnum): """Separator styles.""" PLAIN = auto() ALIGNMENT = auto() KIMI_VL = auto() @dataclasses.dataclass class Conversation: """A class that manages prompt templates and keeps all conversation history.""" # The name of this template name: str # The template of the system prompt system_template: str = "{system_message}" # The system message system_message: str = "" # The names of two roles roles: List[str] = (("USER", "ASSISTANT"),) # All messages. Each item is (role, message). messages: List[List[str]] = () # The number of few shot examples offset: int = 0 # The separator style and configurations sep_style: SeparatorStyle = SeparatorStyle.PLAIN sep: str = "\n" sep2: str = None # Stop criteria (the default one is EOS token) stop_str: str = None # Stops generation if meeting any token in this list stop_token_ids: List[int] = None def get_prompt(self) -> str: """Get the prompt for generation.""" system_prompt = self.system_template.format(system_message=self.system_message) if self.sep_style == SeparatorStyle.PLAIN: seps = [self.sep, self.sep2] ret = "" for i, (role, message) in enumerate(self.messages): if message: if type(message) is tuple: message = message[0] if i % 2 == 0: ret += message + seps[i % 2] else: ret += message + seps[i % 2] else: ret += "" return ret elif self.sep_style == SeparatorStyle.ALIGNMENT: seps = [self.sep, self.sep2] ret = "" for i, (role, message) in enumerate(self.messages): if message: if type(message) is tuple: message, _, _ = message if i % 2 == 0: ret += '\n' + seps[i % 2] else: ret += message + seps[i % 2] else: ret += "" return ret elif self.sep_style == SeparatorStyle.KIMI_VL: seps = [self.sep, self.sep2] if system_prompt == "" or system_prompt is None: ret = "" else: ret = system_prompt + seps[0] for i, (role, message) in enumerate(self.messages): if message: if type(message) is tuple: message = message[0] if role == "user": ret += message + self.sep else: if self.sep2 is not None: ret += message + self.sep2 else: ret += message else: ret = ret return ret else: raise ValueError(f"Invalid style: {self.sep_style}") def set_system_message(self, system_message: str): """Set the system message.""" self.system_message = system_message def append_message(self, role: str, message: str): """Append a new message.""" self.messages.append([role, message]) def update_last_message(self, message: str): """Update the last output. The last message is typically set to be None when constructing the prompt, so we need to update it in-place after getting the response from a model. """ self.messages[-1][1] = message def reset_message(self): """Reset a new message.""" self.messages = [] def to_gradio_chatbot(self): """Convert the conversation to gradio chatbot format.""" ret = [] for i, (role, msg) in enumerate(self.messages[self.offset :]): if i % 2 == 0: ret.append([msg, None]) else: ret[-1][-1] = msg return ret def to_openai_api_messages(self): """Convert the conversation to OpenAI chat completion format.""" system_prompt = self.system_template.format(system_message=self.system_message) ret = [{"role": "system", "content": system_prompt}] for i, (_, msg) in enumerate(self.messages[self.offset :]): if i % 2 == 0: ret.append({"role": "user", "content": msg}) else: if msg is not None: ret.append({"role": "assistant", "content": msg}) return ret def copy(self): return Conversation( name=self.name, system_template=self.system_template, system_message=self.system_message, roles=self.roles, messages=[[x, y] for x, y in self.messages], offset=self.offset, sep_style=self.sep_style, sep=self.sep, sep2=self.sep2, stop_str=self.stop_str, stop_token_ids=self.stop_token_ids, ) def dict(self): return { "template_name": self.name, "system_message": self.system_message, "roles": self.roles, "messages": self.messages, "offset": self.offset, } # A global registry for all conversation templates conv_templates: Dict[str, Conversation] = {} def register_conv_template(template: Conversation, override: bool = False): """Register a new conversation template.""" if not override: assert template.name not in conv_templates, f"{template.name} has been registered." conv_templates[template.name] = template def get_conv_template(name: str) -> Conversation: """Get a conversation template.""" return conv_templates[name].copy() register_conv_template( Conversation( name="plain", system_template="", system_message="", roles=("", ""), messages=(), offset=0, sep_style=SeparatorStyle.PLAIN, sep="", sep2="", stop_token_ids=[100001], stop_str=[''], ) ) register_conv_template( Conversation( name="alignment", system_template="", system_message="", roles=("", ""), messages=(), offset=0, sep_style=SeparatorStyle.ALIGNMENT, sep="", sep2="", stop_token_ids=[100001], stop_str=[''], ) ) register_conv_template( Conversation( name="kimi-vl", system_template="{system_message}", system_message="You are a helpful assistant", roles=("user", "assistant"), messages=(), offset=0, sep_style=SeparatorStyle.KIMI_VL, sep="<|im_end|>", sep2=None, stop_token_ids=None, stop_str=["<|im_end|>"], ) ) def new_chat_template(sft_format: str = "kimi-vl"): return get_conv_template(sft_format) def get_prompt(conv: Conversation) -> str: """Get the prompt for generation.""" return conv.get_prompt() def generate_prompt_with_history(text, images, history, processor, max_length=2048): """ Generate a prompt with the chat history. Args: text (str): The text prompt. images (list[PIL.Image.Image]): The image prompt. history (list): List of previous conversation messages. processor (KimiVLProcessor): The chat processor used for encoding the prompt. max_length (int): The maximum length of the prompt. """ global IMAGE_TOKEN user_role_ind = 0 bot_role_ind = 1 # Initialize conversation conversation = new_chat_template(sft_format="kimi-vl") if history: conversation.messages = history if images is not None and len(images) > 0: # num_image_tags = text.count(IMAGE_TOKEN) # num_images = len(images) # if num_images > num_image_tags: # pad_image_tags = num_images - num_image_tags # image_tokens = "\n".join([IMAGE_TOKEN] * pad_image_tags) # # append the in a new line after the text prompt # text = image_tokens + "\n" + text # elif num_images < num_image_tags: # remove_image_tags = num_image_tags - num_images # text = text.replace(IMAGE_TOKEN, "", remove_image_tags) print(f"prompt = {text}, len(images) = {len(images)}") text = (text, images) conversation.append_message(conversation.roles[user_role_ind], text) conversation.append_message(conversation.roles[bot_role_ind], "") # Create a copy of the conversation to avoid history truncation in the UI conversation_copy = conversation.copy() logger.info("=" * 80) logger.info(get_prompt(conversation)) rounds = len(conversation.messages) // 2 for _ in range(rounds): current_prompt = get_prompt(conversation) assert isinstance(current_prompt, str) and len(current_prompt) > 0, f"current_prompt = {current_prompt}" if torch.tensor(processor.tokenizer.encode(current_prompt)).size(-1) <= max_length: return conversation_copy if len(conversation.messages) % 2 != 0: gr.Error("The messages between user and assistant are not paired.") return try: for _ in range(2): # pop out two messages in a row conversation.messages.pop(0) except IndexError: gr.Error("Input text processing failed, unable to respond in this round.") return None gr.Error("Prompt could not be generated within max_length limit.") return None def convert_conversation_to_prompts(conversation: Conversation): """ Convert the conversation to prompts. """ conv_prompts = [] last_image = None messages = conversation.messages for i in range(0, len(messages), 2): if isinstance(messages[i][1], tuple): text, images = messages[i][1] last_image = images[-1] else: text, images = messages[i][1], [] prompt = {"role": messages[i][0], "content": text, "images": images} response = {"role": messages[i + 1][0], "content": messages[i + 1][1]} conv_prompts.extend([prompt, response]) return conv_prompts, last_image def to_gradio_chatbot2(conversation: Conversation) -> list: """Convert the conversation to gradio chatbot format.""" ret = [] for i, (_, msg) in enumerate(conversation.messages[conversation.offset :]): if i % 2 == 0: if type(msg) is tuple: msg, images = copy.deepcopy(msg) if isinstance(images, list): img_str = "" for j, image in enumerate(images): if isinstance(image, str): with open(image, "rb") as f: data = f.read() img_b64_str = base64.b64encode(data).decode() image_str = ( f'' ) else: image_str = pil_to_base64(image, f"user upload image_{j}", max_size=800, min_size=400) img_str += image_str msg = img_str + msg else: pass ret.append([msg, None]) else: ret[-1][-1] = msg return ret def to_gradio_chatbot(conversation: Conversation) -> list: """Convert the conversation to gradio chatbot format, supporting images and video.""" ret = [] for i, (_, msg) in enumerate(conversation.messages[conversation.offset :]): # User message if i % 2 == 0: if isinstance(msg, tuple): msg_text, media = copy.deepcopy(msg) media_str = "" # Handle list of media items if isinstance(media, list): items = media else: items = [media] for j, item in enumerate(items): # If string path, determine type if isinstance(item, str) and (not item.endswith((".mp4", ".mov", ".avi", ".webm"))): mime, _ = mimetypes.guess_type(item) with open(item, "rb") as f: data = f.read() b64 = base64.b64encode(data).decode() if mime and mime.startswith("image/"): media_str += ( f'' ) else: # Fallback to link media_str += f'{item}' elif isinstance(item, str) and (item.endswith((".mp4", ".mov", ".avi", ".webm"))): try: b64 = compress_video_to_base64(item) media_str += ( f'' ) except: pass # If PIL image else: media_str += pil_to_base64(item, f"user upload image_{j}", max_size=800, min_size=400) msg = media_str + msg_text # Append user side ret.append([msg, None]) else: # Assistant side, fill previous tuple ret[-1][-1] = msg return ret def to_gradio_history(conversation: Conversation): """Convert the conversation to gradio history format.""" return conversation.messages[conversation.offset :]