# Standard library imports
import os
import json
import asyncio
from abc import ABC, abstractmethod
from collections import OrderedDict
from typing import Optional, Any, Annotated

# Related third-party imports
import yaml
import torch
from openai import OpenAI, AzureOpenAI
from dotenv import load_dotenv
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM

load_dotenv()
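
# `${...}` placeholders in config.yaml are resolved from the environment (see
# `LanguageModelManager._load_full_config`), so a .env file next to the project
# might look like the sketch below. The variable names are illustrative
# assumptions, not names required by this module:
#
#   OPENAI_API_KEY=...
#   AZURE_OPENAI_API_KEY=...
#   AZURE_OPENAI_API_BASE=https://<resource>.openai.azure.com/
#   AZURE_OPENAI_API_VERSION=2024-02-01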


class LanguageModel(ABC):
    """
    Abstract base class for language models.

    This class provides a common interface for language models with methods
    to generate text and unload resources.

    Parameters
    ----------
    config : dict
        Configuration for the language model.
    """

    def __init__(self, config: Annotated[dict, "Configuration for the language model"]):
        self.config = config

    @abstractmethod
    def generate(
            self,
            messages: Annotated[list, "List of message dictionaries"],
            **kwargs: Annotated[Any, "Additional keyword arguments"]
    ) -> Annotated[str, "Generated text"]:
        """
        Generate text based on the given input messages.

        Parameters
        ----------
        messages : list
            List of message dictionaries with 'role' and 'content'.
        **kwargs : Any
            Additional keyword arguments.

        Returns
        -------
        str
            Generated text output.
        """

    def unload(self) -> Annotated[None, "Unload resources used by the language model"]:
        """
        Unload resources used by the language model.

        Subclasses holding local resources should override this; the default
        is a no-op.
        """


class LLaMAModel(LanguageModel):
    """
    LLaMA language model implementation using Hugging Face Transformers.

    Parameters
    ----------
    config : dict
        Configuration for the LLaMA model.
    """

    def __init__(self, config: Annotated[dict, "Configuration for the LLaMA model"]):
        super().__init__(config)
        model_name = config['model_name']
        compute_type = config.get('compute_type')
        torch.cuda.empty_cache()
        print(f"Loading LLaMA model: {model_name}")
        print(f"CUDA available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"CUDA Version: {torch.version.cuda}")
            print(f"GPU: {torch.cuda.get_device_name(0)}")
        else:
            print("GPU not available, using CPU.")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # bfloat16 is used for the "float16" compute type on GPU: it has the
        # same memory cost as float16 but is more numerically stable.
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map="auto",
            torch_dtype=torch.bfloat16 if torch.cuda.is_available() and compute_type == "float16" else torch.float32,
            low_cpu_mem_usage=True
        )
        # The model is already dispatched across devices by from_pretrained
        # above, so no device placement is passed to the pipeline.
        self.pipe = pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
        )

    def generate(
            self,
            messages: Annotated[list, "List of message dictionaries"],
            max_new_tokens: Annotated[int, "Maximum number of new tokens to generate"] = 10000,
            truncation: Annotated[bool, "Whether to truncate the input"] = True,
            batch_size: Annotated[int, "Batch size for generation"] = 1,
            pad_token_id: Annotated[Optional[int], "Padding token ID"] = None
    ) -> Annotated[str, "Generated text"]:
        """
        Generate text based on input messages using the LLaMA model.

        Parameters
        ----------
        messages : list
            List of message dictionaries with 'role' and 'content'.
        max_new_tokens : int, optional
            Maximum number of new tokens to generate. Default is 10000.
        truncation : bool, optional
            Whether to truncate the input. Default is True.
        batch_size : int, optional
            Batch size for generation. Default is 1.
        pad_token_id : int, optional
            Padding token ID. Defaults to the tokenizer's EOS token ID.

        Returns
        -------
        str
            Generated text. Note that for string prompts the pipeline's
            'generated_text' includes the prompt itself.
        """
        prompt = self._format_messages_llama(messages)
        output = self.pipe(
            prompt,
            max_new_tokens=max_new_tokens,
            truncation=truncation,
            batch_size=batch_size,
            pad_token_id=pad_token_id if pad_token_id is not None else self.tokenizer.eos_token_id
        )
        return output[0]['generated_text']

    @staticmethod
    def _format_messages_llama(messages: Annotated[list, "List of message dictionaries"]) -> Annotated[
            str, "Formatted prompt"]:
        """
        Format messages into a single prompt for LLaMA.

        Parameters
        ----------
        messages : list
            List of message dictionaries with 'role' and 'content'.

        Returns
        -------
        str
            Formatted prompt.
        """
        prompt = ""
        for message in messages:
            role = message.get("role", "").lower()
            content = message.get("content", "")
            if role == "system":
                prompt += f"System: {content}\n"
            elif role == "user":
                prompt += f"User: {content}\n"
            elif role == "assistant":
                prompt += f"Assistant: {content}\n"
        prompt += "Assistant:"
        return prompt
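
    # For illustration, what `_format_messages_llama` produces (derived
    # directly from the branches above):
    #
    #   >>> LLaMAModel._format_messages_llama([
    #   ...     {"role": "system", "content": "You are a pirate."},
    #   ...     {"role": "user", "content": "Who are you?"},
    #   ... ])
    #   'System: You are a pirate.\nUser: Who are you?\nAssistant:'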

    def unload(self) -> Annotated[None, "Unload the LLaMA model and release resources"]:
        """
        Unload the LLaMA model and release resources.
        """
        del self.pipe
        del self.model
        del self.tokenizer
        torch.cuda.empty_cache()
        print(f"LLaMA model '{self.config['model_name']}' unloaded.")


class OpenAIModel(LanguageModel):
    """
    OpenAI GPT model integration.

    Parameters
    ----------
    config : dict
        Configuration for the OpenAI model.
    """

    def __init__(self, config: Annotated[dict, "Configuration for the OpenAI model"]):
        super().__init__(config)
        openai_api_key = config.get('openai_api_key')
        if not openai_api_key:
            raise ValueError("OpenAI API key must be provided.")
        self.client = OpenAI(api_key=openai_api_key)
        self.model_name = config.get('model_name', 'gpt-4')

    def generate(
            self,
            messages: Annotated[list, "List of message dictionaries"],
            max_length: Annotated[int, "Maximum number of tokens for the output"] = 10000,
            return_as_json: bool = False,
            **kwargs: Annotated[Any, "Additional keyword arguments"]
    ) -> Annotated[Any, "Generated text, or a dict when return_as_json=True"]:
        """
        Generate text using OpenAI's API.

        Parameters
        ----------
        messages : list
            List of message dictionaries with 'role' and 'content'.
        max_length : int, optional
            Maximum number of tokens for the output. Default is 10000.
        return_as_json : bool, optional
            If True, the request is sent with response_format={"type": "json_object"}
            and the returned content is parsed into a dict with json.loads.
            Default is False.
        **kwargs : Any
            Additional keyword arguments.

        Returns
        -------
        str or dict
            Generated text as a string if return_as_json=False.
            If return_as_json=True and the response is valid JSON,
            returns a dict; otherwise the raw string is returned.
        """
        create_kwargs = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": max_length,
            "temperature": kwargs.get('temperature', 0.7)
        }
        if return_as_json:
            create_kwargs["response_format"] = {"type": "json_object"}
        completion = self.client.chat.completions.create(**create_kwargs)
        response_text = completion.choices[0].message.content
        if return_as_json:
            try:
                return json.loads(response_text)
            except json.JSONDecodeError:
                return response_text
        return response_text

    def unload(self) -> Annotated[None, "Placeholder for OpenAI model unload (no local resources to release)"]:
        """
        Placeholder for OpenAI model unload (no local resources to release).
        """
        print(f"OpenAI model '{self.model_name}' unloaded.")


class AzureOpenAIModel(LanguageModel):
    """
    Azure OpenAI model integration.

    Parameters
    ----------
    config : dict
        Configuration for the Azure OpenAI model.
    """

    def __init__(self, config: Annotated[dict, "Configuration for the Azure OpenAI model"]):
        super().__init__(config)
        self.model_name = config.get('model_name', 'gpt-4o')
        self.api_key = config.get('azure_openai_api_key')
        self.api_base = config.get('azure_openai_api_base')
        self.api_version = config.get('azure_openai_api_version')
        if not all([self.api_key, self.api_base, self.api_version]):
            raise ValueError("Azure OpenAI API key, base, and version must be provided.")
        # Use the openai>=1.0 client interface (consistent with OpenAIModel);
        # the legacy module-level `openai.api_type = "azure"` configuration is
        # not supported there. The configured API base serves as the endpoint.
        self.client = AzureOpenAI(
            api_key=self.api_key,
            azure_endpoint=self.api_base,
            api_version=self.api_version
        )

    def generate(
            self,
            messages: Annotated[list, "List of message dictionaries"],
            max_length: Annotated[int, "Maximum number of tokens for the output"] = 10000,
            **kwargs: Annotated[Any, "Additional keyword arguments"]
    ) -> Annotated[str, "Generated text"]:
        """
        Generate text using Azure OpenAI's API.

        Parameters
        ----------
        messages : list
            List of message dictionaries with 'role' and 'content'.
        max_length : int, optional
            Maximum number of tokens for the output. Default is 10000.
        **kwargs : Any
            Additional keyword arguments.

        Returns
        -------
        str
            Generated text.
        """
        # For Azure, `model` is the deployment name.
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            max_tokens=max_length,
            temperature=kwargs.get('temperature', 0.7)
        )
        return response.choices[0].message.content

    def unload(self) -> Annotated[None, "Placeholder for Azure OpenAI model unload (no local resources to release)"]:
        """
        Placeholder for Azure OpenAI model unload (no local resources to release).
        """
        print(f"Azure OpenAI model '{self.model_name}' unloaded.")


class ModelRegistry:
    """
    Registry to manage language model class registrations.

    This class allows dynamic registration and retrieval of model classes.
    """

    _registry = {}

    @classmethod
    def register(
            cls,
            model_id: Annotated[str, "Unique identifier for the model"],
            model_class: Annotated[type, "The class to register"]
    ) -> Annotated[None, "Registration completed"]:
        """
        Register a model class with the registry.

        Parameters
        ----------
        model_id : str
            Unique identifier for the model class.
        model_class : type
            The class to register.
        """
        cls._registry[model_id.lower()] = model_class

    @classmethod
    def get_model_class(cls, model_id: Annotated[str, "Unique identifier for the model"]) -> Annotated[
            type, "Model class"]:
        """
        Retrieve a model class by its unique identifier.

        Parameters
        ----------
        model_id : str
            Unique identifier for the model class.

        Returns
        -------
        type
            The model class corresponding to the identifier.

        Raises
        ------
        ValueError
            If the model ID is not registered.
        """
        model_class = cls._registry.get(model_id.lower())
        if not model_class:
            raise ValueError(f"No class found for model ID '{model_id}'.")
        return model_class


class ModelFactory:
    """
    Factory to create language model instances.

    This class uses the `ModelRegistry` to create instances of registered model classes.
    """

    @staticmethod
    def create_model(
            model_id: Annotated[str, "Unique identifier for the model"],
            config: Annotated[dict, "Configuration for the model"]
    ) -> Annotated[LanguageModel, "Instance of the language model"]:
        """
        Create a language model instance based on its unique identifier.

        Parameters
        ----------
        model_id : str
            Unique identifier for the model.
        config : dict
            Configuration for the model.

        Returns
        -------
        LanguageModel
            An instance of the language model.
        """
        model_class = ModelRegistry.get_model_class(model_id)
        return model_class(config)


# Register the built-in model classes so the factory can resolve them by ID.
# "llama" matches the demo at the bottom of this file; the other two IDs are
# assumed to match the model keys used in config.yaml.
ModelRegistry.register("llama", LLaMAModel)
ModelRegistry.register("openai", OpenAIModel)
ModelRegistry.register("azure_openai", AzureOpenAIModel)


class LanguageModelManager:
    """
    Manages multiple language models with caching and async support.

    Parameters
    ----------
    config_path : str
        Path to the YAML configuration file.
    cache_size : int, optional
        Maximum number of models to cache. Default is 10.
    """

    def __init__(
            self,
            config_path: Annotated[str, "Path to the YAML configuration file"],
            cache_size: Annotated[int, "Maximum number of models to cache"] = 10
    ):
        self.config_path = config_path
        self.cache_size = cache_size
        self.models = OrderedDict()
        self.full_config = self._load_full_config(config_path)
        self.runtime_config = self.full_config.get('runtime', {})
        self.models_config = self.full_config.get('models', {})
        self.lock = asyncio.Lock()

    @staticmethod
    def _load_full_config(config_path: Annotated[str, "Path to the YAML configuration file"]) -> Annotated[
            dict, "Parsed configuration"]:
        """
        Load and parse the YAML configuration file.

        Values of the form "${ENV_VAR}" in model configs are replaced with the
        corresponding environment variable (empty string if unset).

        Parameters
        ----------
        config_path : str
            Path to the YAML file.

        Returns
        -------
        dict
            Parsed configuration.
        """
        with open(config_path, encoding='utf-8') as f:
            config = yaml.safe_load(f)
        for model_id, model_config in config.get('models', {}).items():
            for key, value in model_config.items():
                if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
                    env_var = value[2:-1]
                    model_config[key] = os.getenv(env_var, "")
        return config
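
    # A sketch of the YAML layout this loader expects, inferred from the keys
    # read here and in `get_model` (the model IDs and values shown are
    # assumptions, not requirements):
    #
    #   runtime:
    #     compute_type: float16
    #   models:
    #     llama:
    #       model_name: meta-llama/Llama-3.1-8B-Instruct
    #     openai:
    #       model_name: gpt-4
    #       openai_api_key: ${OPENAI_API_KEY}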

    async def get_model(
            self,
            model_id: Annotated[str, "Unique identifier for the model"]
    ) -> Annotated[LanguageModel, "Instance of the language model"]:
        """
        Retrieve a language model instance from the cache or create a new one.

        Parameters
        ----------
        model_id : str
            Unique identifier for the model.

        Returns
        -------
        LanguageModel
            An instance of the language model.

        Raises
        ------
        ValueError
            If the model ID is not found in the configuration.
        """
        async with self.lock:
            torch.cuda.empty_cache()
            if model_id in self.models:
                # Cache hit: mark as most recently used.
                self.models.move_to_end(model_id)
                return self.models[model_id]
            else:
                config = self.models_config.get(model_id)
                if not config:
                    raise ValueError(f"Model ID '{model_id}' not found in configuration.")
                config['compute_type'] = self.runtime_config.get('compute_type', 'float16')
                model = ModelFactory.create_model(model_id, config)
                self.models[model_id] = model
                if len(self.models) > self.cache_size:
                    # Evict the least recently used model to respect the cache size.
                    oldest_model_id, oldest_model = self.models.popitem(last=False)
                    oldest_model.unload()
                return model

    async def generate(
            self,
            model_id: Annotated[str, "Unique identifier for the model"],
            messages: Annotated[list, "List of message dictionaries"],
            **kwargs: Annotated[Any, "Additional keyword arguments"]
    ) -> Annotated[Optional[str], "Generated text or None if an error occurs"]:
        """
        Generate text using a specific language model.

        Parameters
        ----------
        model_id : str
            Unique identifier for the model.
        messages : list
            List of message dictionaries with 'role' and 'content'.
        **kwargs : Any
            Additional keyword arguments.

        Returns
        -------
        str or None
            Generated text or None if an error occurs.
        """
        try:
            model = await self.get_model(model_id)
            return model.generate(messages, **kwargs)
        except Exception as e:
            print(f"Error with model ({model_id}): {e}")
            return None

    def unload_all(self) -> Annotated[None, "Unload all cached models and release resources"]:
        """
        Unload all cached models and release resources.
        """
        for model in self.models.values():
            model.unload()
        self.models.clear()
        print("All models have been unloaded.")


if __name__ == "__main__":
    # noinspection PyMissingOrEmptyDocstring
    async def main():
        config_path = 'config/config.yaml'
        manager = LanguageModelManager(config_path=config_path, cache_size=11)
        llama_model_id = "llama"
        llama_messages = [
            {"role": "system", "content": "You are a pirate. Answer accordingly!"},
            {"role": "user", "content": "Who are you?"}
        ]
        llama_output = await manager.generate(model_id=llama_model_id, messages=llama_messages)
        print(f"LLaMA Model Output: {llama_output}")

    asyncio.run(main())