import abc
import asyncio
import math
from abc import abstractmethod

import backoff
import openai
import tiktoken


class LLM(abc.ABC):
    prompt_percent = 0.9

    @abstractmethod
    def __init__(self):
        raise NotImplementedError("Subclasses should implement this!")

    @abstractmethod
    def infer(self, prompts):
        raise NotImplementedError("Subclasses should implement this!")

    @abstractmethod
    def split_input(
        self, fixed_instruction, few_shot_examples, splittable_input, input_header, output_header
    ):
        raise NotImplementedError("Subclasses should implement this!")


class GPT(LLM):
    prompt_percent = 0.8

    openai_cxn_dict = {
        "default": {
            "endpoint": "INSERT YOUR AZURE OPENAI ENDPOINT HERE",
            "api_key": "INSERT YOUR AZURE OPENAI API KEY HERE",
        },
    }

    deployment_max_length_dict = {
        "gpt-4": 8192,
        "gpt-4-0314": 8192,
        "gpt-4-32k": 32768,
        "gpt-35-turbo": 4096,
        "gpt-35-turbo-16k": 16385,
    }

    def __init__(self, model_id):
        self.temperature = 0.0
        self.top_k = 1
        # Map the Azure deployment name (e.g. "gpt-35-turbo-16k") to the base
        # tiktoken model name (e.g. "gpt-3.5") to select the right tokenizer.
        self.encoding = tiktoken.encoding_for_model(
            "-".join(model_id.split("-", 2)[:2]).replace("5", ".5")
        )
        self.openai_api = "default"
        self.model_id = model_id
        self.max_length = self.deployment_max_length_dict[model_id]
        self.client = openai.AsyncAzureOpenAI(
            api_key=self.openai_cxn_dict[self.openai_api]["api_key"],
            api_version="2023-12-01-preview",
            azure_endpoint=self.openai_cxn_dict[self.openai_api]["endpoint"],
        )

    def gen_messages(
        self, fixed_instruction, few_shot_examples, input, input_header, output_header
    ):
        # Build a chat transcript: system instruction, few-shot user/assistant
        # pairs, then the actual input.
        messages = [
            {
                "role": "system",
                "content": fixed_instruction,
            },
        ]
        for example in few_shot_examples:
            messages.extend(
                [
                    {
                        "role": "user",
                        "content": input_header + "\n" + example["user"] + "\n\n" + output_header,
                    },
                    {
                        "role": "assistant",
                        "content": example["assistant"],
                    },
                ]
            )
        messages.extend(
            [
                {
                    "role": "user",
                    "content": input_header + "\n" + input + "\n\n" + output_header,
                },
            ]
        )
        return messages

    # Coroutine for making API calls to GPT, retried with exponential backoff
    # on rate-limit errors.
    @backoff.on_exception(backoff.expo, openai.RateLimitError)
    async def make_api_call_to_gpt(self, messages):
        response = await self.client.chat.completions.create(
            model=self.model_id,
            messages=messages,
            temperature=self.temperature,
        )
        return response.choices[0].message.content

    async def dispatch_openai_requests(self, messages_list):
        # Issue one API call per prompt and run the tasks concurrently.
        tasks = [self.make_api_call_to_gpt(messages) for messages in messages_list]
        results = await asyncio.gather(*tasks)
        return results

    def infer(self, messages_list):
        return asyncio.run(self.dispatch_openai_requests(messages_list))

    def split_input(
        self, fixed_instruction, few_shot_examples, splittable_input, input_header, output_header
    ):
        # Tokenize the fixed part of the prompt (instruction + few-shot examples).
        fixed_token_ids = self.encoding.encode(
            fixed_instruction
            + " ".join([x["user"] + " " + x["assistant"] for x in few_shot_examples])
        )
        # Calculate the token budget remaining for the splittable input.
        remaining_token_len = math.ceil(
            (self.prompt_percent * self.max_length) - len(fixed_token_ids)
        )

        # Tokenize splittable_input.
        split_token_ids = self.encoding.encode(splittable_input)

        # Split the tokenized input into chunks that fit the remaining budget,
        # with a 10-token overlap between consecutive chunks.
        split_token_ids_list = [
            split_token_ids[i : i + remaining_token_len + 10]
            for i in range(0, len(split_token_ids), remaining_token_len)
        ]
        split_input_list = [
            self.encoding.decode(split_token_ids) for split_token_ids in split_token_ids_list
        ]

        # Combine the fixed instruction, few-shot examples, split inputs, and
        # input/output headers into a list of chat message prompts.
        return [
            self.gen_messages(
                fixed_instruction, few_shot_examples, split_input, input_header, output_header
            )
            for split_input in split_input_list
        ]
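

# Minimal usage sketch, assuming the Azure endpoint/key placeholders above
# have been filled in and that "gpt-4" is the name of a deployed model; the
# instruction, example, and headers below are hypothetical placeholders.
if __name__ == "__main__":
    llm = GPT("gpt-4")
    messages_list = llm.split_input(
        fixed_instruction="Summarize the passage in one sentence.",
        few_shot_examples=[
            {
                "user": "The sky appears blue because of Rayleigh scattering.",
                "assistant": "Rayleigh scattering makes the sky look blue.",
            },
        ],
        splittable_input="A long document that may not fit in a single context window...",
        input_header="Passage:",
        output_header="Summary:",
    )
    # One completion is returned per chunk of the split input.
    print(llm.infer(messages_list))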