# generation_utils.py from threading import Thread from time import perf_counter from typing import List import gradio as gr from transformers import AutoTokenizer, TextIteratorStreamer import numpy as np import os def get_special_token_id(tokenizer: AutoTokenizer, key: str) -> int: """ Gets the token ID for a given string that has been added to the tokenizer as a special token. Args: tokenizer (PreTrainedTokenizer): the tokenizer key (str): the key to convert to a single token Raises: ValueError: if more than one ID was generated Returns: int: the token ID for the given key """ token_ids = tokenizer.encode(key) if len(token_ids) > 1: raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}") return token_ids[0] def estimate_latency( current_time: float, current_perf_text: str, new_gen_text: str, per_token_time: List[float], num_tokens: int, ) -> tuple: """ Helper function for performance estimation Parameters: current_time (float): This step time in seconds. current_perf_text (str): Current content of performance UI field. new_gen_text (str): New generated text. per_token_time (List[float]): history of performance from previous steps. num_tokens (int): Total number of generated tokens. Returns: update for performance text field update for a total number of tokens """ num_current_toks = len(tokenizer.encode(new_gen_text)) num_tokens += num_current_toks per_token_time.append(num_current_toks / current_time) if len(per_token_time) > 10 and len(per_token_time) % 4 == 0: current_bucket = per_token_time[:-10] return ( f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}", num_tokens, ) return current_perf_text, num_tokens def run_generation( user_text: str, top_p: float, temperature: float, top_k: int, max_new_tokens: int, perf_text: str, tokenizer: AutoTokenizer, tokenizer_kwargs: dict, model_configuration: dict, ov_model, ) -> tuple: """ Text generation function Parameters: user_text (str): User-provided instruction for generation. top_p (float): Nucleus sampling. If < 1, keeps smallest set of most probable tokens. temperature (float): Modulates logits distribution. top_k (int): Number of highest probability vocabulary tokens to keep for top-k-filtering. max_new_tokens (int): Maximum length of generated sequence. perf_text (str): Content of text field for performance results. tokenizer (AutoTokenizer): The tokenizer object. tokenizer_kwargs (dict): Additional kwargs for tokenizer. model_configuration (dict): Configuration for the model. ov_model: Your OpenVINO model object. Returns: model_output (str): Model-generated text. perf_text (str): Updated performance text. """ # Extract necessary configurations from model_configuration response_key = model_configuration.get("response_key") prompt_template = model_configuration.get("prompt_template", "{instruction}") end_key = model_configuration.get("end_key") end_key_token_id = None # Handle special tokens if response_key: tokenizer_response_key = next( (token for token in tokenizer.additional_special_tokens if token.startswith(response_key)), None, ) if tokenizer_response_key and end_key: try: end_key_token_id = get_special_token_id(tokenizer, end_key) except ValueError: pass # Ensure defaults for token IDs end_key_token_id = end_key_token_id or tokenizer.eos_token_id pad_token_id = end_key_token_id or tokenizer.pad_token_id # Prepare input prompt according to model expected template prompt_text = prompt_template.format(instruction=user_text) # Tokenize the user text. model_inputs = tokenizer(prompt_text, return_tensors="pt", **tokenizer_kwargs) # Start generation on a separate thread, so that we don't block the UI. streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) generate_kwargs = { **model_inputs, "streamer": streamer, "max_new_tokens": max_new_tokens, "do_sample": True, "top_p": top_p, "temperature": float(temperature), "top_k": top_k, "eos_token_id": end_key_token_id, "pad_token_id": pad_token_id, } # Start generation in a separate thread t = Thread(target=ov_model.generate, kwargs=generate_kwargs) t.start() # Pull the generated text from the streamer and update model output model_output = "" per_token_time = [] num_tokens = 0 start = perf_counter() for new_text in streamer: current_time = perf_counter() - start model_output += new_text perf_text, num_tokens = estimate_latency(current_time, perf_text, new_text, per_token_time, num_tokens) yield model_output, perf_text start = perf_counter() return model_output, perf_text def estimate_latency( current_time: float, current_perf_text: str, new_gen_text: str, per_token_time: List[float], num_tokens: int, ): """ Helper function for performance estimation Parameters: current_time (float): This step time in seconds. current_perf_text (str): Current content of performance UI field. new_gen_text (str): New generated text. per_token_time (List[float]): history of performance from previous steps. num_tokens (int): Total number of generated tokens. Returns: update for performance text field update for a total number of tokens """ num_current_toks = len(tokenizer.encode(new_gen_text)) num_tokens += num_current_toks per_token_time.append(num_current_toks / current_time) if len(per_token_time) > 10 and len(per_token_time) % 4 == 0: current_bucket = per_token_time[:-10] return ( f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}", num_tokens, ) return current_perf_text, num_tokens def reset_textbox(instruction: str, response: str, perf: str): """ Helper function for resetting content of all text fields Parameters: instruction (str): Content of user instruction field. response (str): Content of model response field. perf (str): Content of performance info filed Returns: empty string for each placeholder """ return "", "", ""