# Standard library imports
import os
import json
import asyncio
from abc import ABC, abstractmethod
from collections import OrderedDict
from typing import Optional, Any, Annotated

# Related third-party imports
import yaml
import torch
from openai import OpenAI, AzureOpenAI
from dotenv import load_dotenv
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM

load_dotenv()


class LanguageModel(ABC):
    """
    Abstract base class for language models.

    This class provides a common interface for language models with methods
    to generate text and unload resources.

    Parameters
    ----------
    config : dict
        Configuration for the language model.
    """

    def __init__(self, config: Annotated[dict, "Configuration for the language model"]):
        self.config = config

    @abstractmethod
    def generate(
            self,
            messages: Annotated[list, "List of message dictionaries"],
            **kwargs: Annotated[Any, "Additional keyword arguments"]
    ) -> Annotated[str, "Generated text"]:
        """
        Generate text based on the given input messages.

        Parameters
        ----------
        messages : list
            List of message dictionaries with 'role' and 'content'.
        **kwargs : Any
            Additional keyword arguments.

        Returns
        -------
        str
            Generated text output.
        """
        pass

    def unload(self) -> Annotated[None, "Unload resources used by the language model"]:
        """
        Unload resources used by the language model.
        """
        pass


class LLaMAModel(LanguageModel):
    """
    LLaMA language model implementation using Hugging Face Transformers.

    Parameters
    ----------
    config : dict
        Configuration for the LLaMA model.
    """

    def __init__(self, config: Annotated[dict, "Configuration for the LLaMA model"]):
        super().__init__(config)
        model_name = config['model_name']
        compute_type = config.get('compute_type')

        torch.cuda.empty_cache()

        print(f"Loading LLaMA model: {model_name}")
        print(f"CUDA available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"CUDA Version: {torch.version.cuda}")
            print(f"GPU: {torch.cuda.get_device_name(0)}")
        else:
            print("GPU not available, using CPU.")

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map="auto",
            # Use bfloat16 on GPU when compute_type is "float16"; otherwise fall back to float32.
            torch_dtype=torch.bfloat16 if torch.cuda.is_available() and compute_type == "float16" else torch.float32,
            low_cpu_mem_usage=True
        )
        self.pipe = pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            device_map="auto",
        )

    def generate(
            self,
            messages: Annotated[list, "List of message dictionaries"],
            max_new_tokens: Annotated[int, "Maximum number of new tokens to generate"] = 10000,
            truncation: Annotated[bool, "Whether to truncate the input"] = True,
            batch_size: Annotated[int, "Batch size for generation"] = 1,
            pad_token_id: Annotated[Optional[int], "Padding token ID"] = None
    ) -> Annotated[str, "Generated text"]:
        """
        Generate text based on input messages using the LLaMA model.

        Parameters
        ----------
        messages : list
            List of message dictionaries with 'role' and 'content'.
        max_new_tokens : int, optional
            Maximum number of new tokens to generate. Default is 10000.
        truncation : bool, optional
            Whether to truncate the input. Default is True.
        batch_size : int, optional
            Batch size for generation. Default is 1.
        pad_token_id : int, optional
            Padding token ID. Defaults to the tokenizer's EOS token ID.

        Returns
        -------
        str
            Generated text.
""" prompt = self._format_messages_llama(messages) output = self.pipe( prompt, max_new_tokens=max_new_tokens, truncation=truncation, batch_size=batch_size, pad_token_id=pad_token_id if pad_token_id is not None else self.tokenizer.eos_token_id ) return output[0]['generated_text'] @staticmethod def _format_messages_llama(messages: Annotated[list, "List of message dictionaries"]) -> Annotated[ str, "Formatted prompt"]: """ Format messages into a single prompt for LLaMA. Parameters ---------- messages : list List of message dictionaries with 'role' and 'content'. Returns ------- str Formatted prompt. """ prompt = "" for message in messages: role = message.get("role", "").lower() content = message.get("content", "") if role == "system": prompt += f"System: {content}\n" elif role == "user": prompt += f"User: {content}\n" elif role == "assistant": prompt += f"Assistant: {content}\n" prompt += "Assistant:" return prompt def unload(self) -> Annotated[None, "Unload the LLaMA model and release resources"]: """ Unload the LLaMA model and release resources. """ del self.pipe del self.model del self.tokenizer torch.cuda.empty_cache() print(f"LLaMA model '{self.config['model_name']}' unloaded.") class OpenAIModel(LanguageModel): """ OpenAI GPT model integration. Parameters ---------- config : dict Configuration for the OpenAI model. """ def __init__(self, config: Annotated[dict, "Configuration for the OpenAI model"]): super().__init__(config) openai_api_key = config.get('openai_api_key') if not openai_api_key: raise ValueError("OpenAI API key must be provided.") self.client = OpenAI(api_key=openai_api_key) self.model_name = config.get('model_name', 'gpt-4') def generate( self, messages: Annotated[list, "List of message dictionaries"], max_length: Annotated[int, "Maximum number of tokens for the output"] = 10000, return_as_json: bool = False, **kwargs: Annotated[Any, "Additional keyword arguments"] ) -> Annotated[str, "Generated text"]: """ Generate text using OpenAI's API. Parameters ---------- messages : list List of message dictionaries with 'role' and 'content'. max_length : int, optional Maximum number of tokens for the output. Default is 10000. return_as_json : bool, optional If True, response_format={"type": "json_object"} parametresi eklenir ve dönen içerik json.loads ile dict'e dönüştürülür. Varsayılan False'dur. **kwargs : Any Additional keyword arguments. Returns ------- str or dict Generated text as a string if return_as_json=False. If return_as_json=True and the response is in valid JSON format, returns a dict. """ create_kwargs = { "model": self.model_name, "messages": messages, "max_tokens": max_length, "temperature": kwargs.get('temperature', 0.7) } if return_as_json is True: create_kwargs["response_format"] = {"type": "json_object"} completion = self.client.chat.completions.create(**create_kwargs) response_text = completion.choices[0].message.content if return_as_json: try: return json.loads(response_text) except json.JSONDecodeError: return response_text return response_text def unload(self) -> Annotated[None, "Placeholder for OpenAI model unload (no local resources to release)"]: """ Placeholder for OpenAI model unload (no local resources to release). """ print(f"OpenAI model '{self.model_name}' unloaded.") class AzureOpenAIModel(LanguageModel): """ Azure OpenAI model integration. Parameters ---------- config : dict Configuration for the Azure OpenAI model. 
""" def __init__(self, config: Annotated[dict, "Configuration for the Azure OpenAI model"]): super().__init__(config) self.model_name = config.get('model_name', 'gpt-4o') self.api_key = config.get('azure_openai_api_key') self.api_base = config.get('azure_openai_api_base') self.api_version = config.get('azure_openai_api_version') if not all([self.api_key, self.api_base, self.api_version]): raise ValueError("Azure OpenAI API key, base, and version must be provided.") openai.api_type = "azure" openai.api_base = self.api_base openai.api_version = self.api_version openai.api_key = self.api_key def generate( self, messages: Annotated[list, "List of message dictionaries"], max_length: Annotated[int, "Maximum number of tokens for the output"] = 10000, **kwargs: Annotated[Any, "Additional keyword arguments"] ) -> Annotated[str, "Generated text"]: """ Generate text using Azure OpenAI's API. Parameters ---------- messages : list List of message dictionaries with 'role' and 'content'. max_length : int, optional Maximum number of tokens for the output. Default is 10000. **kwargs : Any Additional keyword arguments. Returns ------- str Generated text. """ response = openai.ChatCompletion.create( deployment_id=self.model_name, messages=messages, max_tokens=max_length, temperature=kwargs.get('temperature', 0.7) ) return response.choices[0].message['content'] def unload(self) -> Annotated[None, "Placeholder for Azure OpenAI model unload (no local resources to release)"]: """ Placeholder for Azure OpenAI model unload (no local resources to release). """ print(f"Azure OpenAI model '{self.model_name}' unloaded.") class ModelRegistry: """ Registry to manage language model class registrations. This class allows dynamic registration and retrieval of model classes. """ _registry = {} @classmethod def register( cls, model_id: Annotated[str, "Unique identifier for the model"], model_class: Annotated[type, "The class to register"] ) -> Annotated[None, "Registration completed"]: """ Register a model class with the registry. Parameters ---------- model_id : str Unique identifier for the model class. model_class : type The class to register. """ cls._registry[model_id.lower()] = model_class @classmethod def get_model_class(cls, model_id: Annotated[str, "Unique identifier for the model"]) -> Annotated[ type, "Model class"]: """ Retrieve a model class by its unique identifier. Parameters ---------- model_id : str Unique identifier for the model class. Returns ------- type The model class corresponding to the identifier. Raises ------ ValueError If the model ID is not registered. """ model_class = cls._registry.get(model_id.lower()) if not model_class: raise ValueError(f"No class found for model ID '{model_id}'.") return model_class class ModelFactory: """ Factory to create language model instances. This class uses the `ModelRegistry` to create instances of registered model classes. """ @staticmethod def create_model( model_id: Annotated[str, "Unique identifier for the model"], config: Annotated[dict, "Configuration for the model"] ) -> Annotated[LanguageModel, "Instance of the language model"]: """ Create a language model instance based on its unique identifier. Parameters ---------- model_id : str Unique identifier for the model. config : dict Configuration for the model. Returns ------- LanguageModel An instance of the language model. """ model_class = ModelRegistry.get_model_class(model_id) return model_class(config) class LanguageModelManager: """ Manages multiple language models with caching and async support. 

class LanguageModelManager:
    """
    Manages multiple language models with caching and async support.

    Parameters
    ----------
    config_path : str
        Path to the YAML configuration file.
    cache_size : int, optional
        Maximum number of models to cache. Default is 10.
    """

    def __init__(
            self,
            config_path: Annotated[str, "Path to the YAML configuration file"],
            cache_size: Annotated[int, "Maximum number of models to cache"] = 10
    ):
        self.config_path = config_path
        self.cache_size = cache_size
        self.models = OrderedDict()
        self.full_config = self._load_full_config(config_path)
        self.runtime_config = self.full_config.get('runtime', {})
        self.models_config = self.full_config.get('models', {})
        self.lock = asyncio.Lock()

    @staticmethod
    def _load_full_config(config_path: Annotated[str, "Path to the YAML configuration file"]) -> Annotated[
        dict, "Parsed configuration"]:
        """
        Load and parse the YAML configuration file.

        Values of the form "${VAR_NAME}" in model configurations are resolved
        from environment variables.

        Parameters
        ----------
        config_path : str
            Path to the YAML file.

        Returns
        -------
        dict
            Parsed configuration.
        """
        with open(config_path, encoding='utf-8') as f:
            config = yaml.safe_load(f)

        # Resolve "${ENV_VAR}" placeholders in each model's configuration.
        for model_id, model_config in config.get('models', {}).items():
            for key, value in model_config.items():
                if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
                    env_var = value[2:-1]
                    model_config[key] = os.getenv(env_var, "")
        return config

    async def get_model(
            self,
            model_id: Annotated[str, "Unique identifier for the model"]
    ) -> Annotated[LanguageModel, "Instance of the language model"]:
        """
        Retrieve a language model instance from the cache or create a new one.

        Parameters
        ----------
        model_id : str
            Unique identifier for the model.

        Returns
        -------
        LanguageModel
            An instance of the language model.

        Raises
        ------
        ValueError
            If the model ID is not found in the configuration.
        """
        async with self.lock:
            torch.cuda.empty_cache()
            if model_id in self.models:
                # Cache hit: mark the model as most recently used.
                self.models.move_to_end(model_id)
                return self.models[model_id]
            else:
                config = self.models_config.get(model_id)
                if not config:
                    raise ValueError(f"Model ID '{model_id}' not found in configuration.")
                config['compute_type'] = self.runtime_config.get('compute_type', 'float16')
                model = ModelFactory.create_model(model_id, config)
                self.models[model_id] = model
                # Evict the least recently used model when the cache is full.
                if len(self.models) > self.cache_size:
                    oldest_model_id, oldest_model = self.models.popitem(last=False)
                    oldest_model.unload()
                return model

    async def generate(
            self,
            model_id: Annotated[str, "Unique identifier for the model"],
            messages: Annotated[list, "List of message dictionaries"],
            **kwargs: Annotated[Any, "Additional keyword arguments"]
    ) -> Annotated[Optional[str], "Generated text or None if an error occurs"]:
        """
        Generate text using a specific language model.

        Parameters
        ----------
        model_id : str
            Unique identifier for the model.
        messages : list
            List of message dictionaries with 'role' and 'content'.
        **kwargs : Any
            Additional keyword arguments.

        Returns
        -------
        str or None
            Generated text or None if an error occurs.
        """
        try:
            model = await self.get_model(model_id)
            return model.generate(messages, **kwargs)
        except Exception as e:
            print(f"Error with model ({model_id}): {e}")
            return None

    def unload_all(self) -> Annotated[None, "Unload all cached models and release resources"]:
        """
        Unload all cached models and release resources.
        """
        for model in self.models.values():
            model.unload()
        self.models.clear()
        print("All models have been unloaded.")
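
# Illustrative layout of config/config.yaml as assumed by LanguageModelManager (a sketch,
# not shipped with this module; model names and env-var names are placeholders). Model IDs
# under `models:` must match registered IDs, and "${VAR}" values are pulled from the
# environment by _load_full_config:
#
#   runtime:
#     compute_type: float16
#   models:
#     llama:
#       model_name: meta-llama/Llama-3.1-8B-Instruct
#     openai:
#       model_name: gpt-4o
#       openai_api_key: ${OPENAI_API_KEY}
#     azure_openai:
#       model_name: gpt-4o
#       azure_openai_api_key: ${AZURE_OPENAI_API_KEY}
#       azure_openai_api_base: ${AZURE_OPENAI_API_BASE}
#       azure_openai_api_version: ${AZURE_OPENAI_API_VERSION}
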

if __name__ == "__main__":
    # noinspection PyMissingOrEmptyDocstring
    async def main():
        config_path = 'config/config.yaml'
        manager = LanguageModelManager(config_path=config_path, cache_size=11)

        llama_model_id = "llama"
        llama_messages = [
            {"role": "system", "content": "You are a pirate. Answer accordingly!"},
            {"role": "user", "content": "Who are you?"}
        ]
        llama_output = await manager.generate(model_id=llama_model_id, messages=llama_messages)
        print(f"LLaMA Model Output: {llama_output}")

    asyncio.run(main())
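
# A further illustrative sketch (assumes an "openai" entry exists in config.yaml and
# OPENAI_API_KEY is set): other registered models go through the same manager, and
# unload_all releases whatever is still cached.
#
#   async def openai_demo():
#       manager = LanguageModelManager(config_path='config/config.yaml')
#       output = await manager.generate(
#           model_id="openai",
#           messages=[{"role": "user", "content": "Say hello."}],
#           max_length=100,
#       )
#       print(output)
#       manager.unload_all()
#
#   asyncio.run(openai_demo())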