import os
import sys
import threading
from queue import Queue
from typing import Any, Dict, List, Optional

import google.generativeai as genai
import torch
from langchain.callbacks.base import BaseCallbackHandler
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain_openai.chat_models import ChatOpenAI
from langchain_openai.llms import OpenAI
from langchain_google_genai import (
    ChatGoogleGenerativeAI,
    HarmBlockThreshold,
    HarmCategory,
)
from langchain_community.llms import (
    HuggingFaceTextGenInference,
    CTransformers,
    GPT4All,
    HuggingFacePipeline,
    LlamaCpp,
    VLLM,
)
from langchain_community.chat_models import ChatOllama
from langchain.schema import LLMResult
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    StoppingCriteria,
    StoppingCriteriaList,
    T5Tokenizer,
    TextStreamer,
    pipeline,
)

from app_modules.utils import ensure_model_is_downloaded


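# Streams generated text to stdout and into a thread-safe queue so a UI thread can
# iterate over partial output while generation runs elsewhere. It serves both as a
# HuggingFace TextStreamer (for local transformers pipelines) and as a LangChain
# callback handler (for the hosted/chat backends).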
class TextIteratorStreamer(TextStreamer, StreamingStdOutCallbackHandler):
    def __init__(
        self,
        tokenizer: "AutoTokenizer",
        skip_prompt: bool = False,
        timeout: Optional[float] = None,
        for_huggingface: bool = False,
        **decode_kwargs,
    ):
        super().__init__(tokenizer, skip_prompt, **decode_kwargs)
        self.text_queue = Queue()
        self.stop_signal = None
        self.timeout = timeout
        self.total_tokens = 0
        self.for_huggingface = for_huggingface
        self.end_token = ""

    def on_finalized_text(self, text: str, stream_end: bool = False):
        """Put the new text in the queue. If the stream is ending, also put a stop signal in the queue."""
        super().on_finalized_text(text, stream_end=stream_end)

        self.text_queue.put(text, timeout=self.timeout)
        self.total_tokens += 1
        if stream_end:
            print("\n")
            self.text_queue.put("\n", timeout=self.timeout)
            self.text_queue.put(self.stop_signal, timeout=self.timeout)

    def check_end_token(self, token):
        """Buffer tokens that may start the "<|im_end|>" marker; suppress the marker itself and flush everything else."""
        new_token = self.end_token + token
        if "<|im_end|>".startswith(new_token):
            self.end_token = "" if new_token == "<|im_end|>" else new_token
            return None
        elif self.end_token != "":
            self.end_token = ""

        return new_token

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        token = self.check_end_token(token)
        if token:
            sys.stdout.write(token)
            sys.stdout.flush()
            self.text_queue.put(token, timeout=self.timeout)
            self.total_tokens += 1

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        pass

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        print("\n")
        self.text_queue.put("\n", timeout=self.timeout)
        self.text_queue.put(self.stop_signal, timeout=self.timeout)

    def __iter__(self):
        return self

    def __next__(self):
        value = self.text_queue.get(timeout=self.timeout)
        if value == self.stop_signal:
            raise StopIteration()
        else:
            return value

    def reset(self, q: Optional[Queue] = None):
        self.text_queue = q if q is not None else Queue()
        self.end_token = ""

    def empty(self):
        return self.text_queue.empty()


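# Builds and holds a LangChain LLM for the backend selected by `llm_model_type`
# (openai, google, gpt4all*, llamacpp, ctransformers, hftgi, ollama, vllm, or
# huggingface*), wiring in streaming callbacks and environment-driven settings.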
class LLMLoader:
    def __init__(self, llm_model_type):
        self.llm_model_type = llm_model_type
        self.llm = None
        # Placeholder streamer (no tokenizer yet); replaced in _init_hf_streamer
        # once a real tokenizer is available.
        self.streamer = TextIteratorStreamer(
            "",
            for_huggingface=True,
        )
        self.max_tokens_limit = 4096
        self.search_kwargs = {"k": 8}
        self.lock = threading.Lock()
        self.model_name = os.getenv("HUGGINGFACE_MODEL_NAME_OR_PATH", "").split("/")[-1]
        self.repetition_penalty = ""
        self.batch_size = int(os.getenv("BATCH_SIZE", "1"))

    def _init_hf_streamer(self, tokenizer):
        if self.batch_size == 1:
            self.streamer = TextIteratorStreamer(
                tokenizer,
                timeout=10.0,
                skip_prompt=True,
                skip_special_tokens=True,
                for_huggingface=True,
            )
        else:
            # Streaming is only supported when generating one sequence at a time.
            self.streamer = None

    def huggingfaceStreamingEnabled(self):
        return self.streamer is not None

    def init(
        self,
        custom_handler: Optional[BaseCallbackHandler] = None,
        n_threds: int = 4,
        hf_pipeline_device_type: Optional[str] = None,
    ):
        print("initializing LLM: " + self.llm_model_type)

        if hf_pipeline_device_type is None:
            hf_pipeline_device_type = "cpu"

        using_cuda = hf_pipeline_device_type.startswith("cuda")
        using_mps = hf_pipeline_device_type.startswith("mps")
        torch_dtype = torch.float16 if using_cuda or using_mps else torch.float32
        if not using_mps and os.environ.get("USING_TORCH_BFLOAT16") == "true":
            torch_dtype = torch.bfloat16

        load_quantized_model = os.environ.get("LOAD_QUANTIZED_MODEL")

        print(f" hf_pipeline_device_type: {hf_pipeline_device_type}")
        print(f" load_quantized_model: {load_quantized_model}")
        print(f" torch_dtype: {torch_dtype}")
        print(f" n_threds: {n_threds}")

        torch.set_default_dtype(torch_dtype)

        double_quant_config = BitsAndBytesConfig(
            load_in_4bit=load_quantized_model == "4bit",
            bnb_4bit_use_double_quant=load_quantized_model == "4bit",
            load_in_8bit=load_quantized_model == "8bit",
        )

        callbacks = []
        if self.streamer is not None and self.streamer.for_huggingface:
            callbacks.append(self.streamer)
        if custom_handler is not None:
            callbacks.append(custom_handler)

        if self.llm is None:
            if self.llm_model_type == "openai":
                MODEL_NAME = os.environ.get("OPENAI_MODEL_NAME") or "gpt-3.5-turbo"
                print(f" using model: {MODEL_NAME}")
                self.model_name = MODEL_NAME
                self.llm = (
                    OpenAI(
                        model_name=MODEL_NAME,
                        streaming=True,
                        callbacks=callbacks,
                        verbose=True,
                        temperature=0,
                    )
                    if "instruct" in MODEL_NAME
                    else ChatOpenAI(
                        model_name=MODEL_NAME,
                        streaming=True,
                        callbacks=callbacks,
                        verbose=True,
                        temperature=0,
                    )
                )
elif self.llm_model_type == "google": |
|
MODEL_NAME = os.environ.get("GOOGLE_MODEL_NAME") or "gemini-pro" |
|
print(f" using model: {MODEL_NAME}") |
|
self.llm = ChatGoogleGenerativeAI( |
|
model=MODEL_NAME, |
|
callbacks=callbacks, |
|
streaming=True, |
|
safety_settings={ |
|
HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE, |
|
HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE, |
|
HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, |
|
HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE, |
|
}, |
|
) |
|
|
|
|
|
|
|
|
|
            elif self.llm_model_type.startswith("gpt4all"):
                MODEL_PATH = ensure_model_is_downloaded(self.llm_model_type)
                self.llm = GPT4All(
                    model=MODEL_PATH,
                    max_tokens=2048,
                    n_threads=n_threds,
                    backend="gptj" if self.llm_model_type == "gpt4all-j" else "llama",
                    callbacks=callbacks,
                    verbose=True,
                    use_mlock=True,
                )
elif self.llm_model_type == "llamacpp": |
|
MODEL_PATH = ensure_model_is_downloaded(self.llm_model_type) |
|
self.llm = LlamaCpp( |
|
model_path=MODEL_PATH, |
|
n_ctx=8192, |
|
n_threads=n_threds, |
|
seed=0, |
|
temperature=0, |
|
max_tokens=2048, |
|
callbacks=callbacks, |
|
verbose=True, |
|
use_mlock=True, |
|
) |
|
elif self.llm_model_type == "ctransformers": |
|
MODEL_PATH = ensure_model_is_downloaded(self.llm_model_type) |
|
config = { |
|
"max_new_tokens": self.max_tokens_limit, |
|
"repetition_penalty": 1.1, |
|
} |
|
self.llm = CTransformers( |
|
model=MODEL_PATH, |
|
model_type="llama", |
|
config=config, |
|
callbacks=callbacks, |
|
) |
|
elif self.llm_model_type == "hftgi": |
|
HFTGI_SERVER_URL = os.environ.get("HFTGI_SERVER_URL") |
|
HFTGI_RP = os.environ.get("HFTGI_RP") |
|
repetition_penalty = 1.120 if HFTGI_RP is None else float(HFTGI_RP) |
|
print(f" repetition_penalty: {repetition_penalty}") |
|
self.repetition_penalty = repetition_penalty |
|
self.max_tokens_limit = 4096 |
|
self.llm = HuggingFaceTextGenInference( |
|
inference_server_url=HFTGI_SERVER_URL, |
|
max_new_tokens=self.max_tokens_limit / 2, |
|
|
|
top_p=0.95, |
|
|
|
temperature=0.01, |
|
repetition_penalty=repetition_penalty, |
|
callbacks=callbacks, |
|
timeout=600, |
|
streaming=True, |
|
) |
|
elif self.llm_model_type == "ollama": |
|
MODEL_NAME = os.environ.get("OLLAMA_MODEL_NAME") or "mistral" |
|
self.model_name = MODEL_NAME |
|
print(f" loading model: {MODEL_NAME}") |
|
|
|
OLLAMA_RP = os.getenv("OLLAMA_RP") |
|
repetition_penalty = float(OLLAMA_RP) if OLLAMA_RP else 1.15 |
|
self.repetition_penalty = repetition_penalty |
|
print(f" repetition_penalty: {repetition_penalty}") |
|
|
|
self.llm = ChatOllama( |
|
model=MODEL_NAME, |
|
callbacks=callbacks, |
|
temperature=0, |
|
repeat_penalty=repetition_penalty, |
|
max_new_tokens=2048, |
|
max_tokens=8192, |
|
) |
|
elif self.llm_model_type == "vllm": |
|
MODEL_NAME = ( |
|
os.environ.get("HUGGINGFACE_MODEL_NAME_OR_PATH") |
|
or "google/gemma-1.1-2b-it" |
|
) |
|
print(f" loading model: {MODEL_NAME}") |
|
|
|
VLLM_RP = os.getenv("HF_RP") |
|
repetition_penalty = float(VLLM_RP) if VLLM_RP else 1.15 |
|
self.repetition_penalty = repetition_penalty |
|
print(f" repetition_penalty: {repetition_penalty}") |
|
|
|
vllm_kwargs = { |
|
"max_model_len": 4096, |
|
"enforce_eager": True, |
|
} |
|
|
|
quantization = os.getenv("VLLM_QUANTIZATION") |
|
if quantization: |
|
vllm_kwargs["quantization"] = quantization |
|
|
|
self.llm = VLLM( |
|
model=MODEL_NAME, |
|
callbacks=callbacks, |
|
temperature=0, |
|
repeat_penalty=repetition_penalty, |
|
top_p=0.95, |
|
max_new_tokens=2048, |
|
max_tokens=8192, |
|
tensor_parallel_size=torch.cuda.device_count(), |
|
trust_remote_code=True, |
|
vllm_kwargs=vllm_kwargs, |
|
) |
|
            elif self.llm_model_type.startswith("huggingface"):
                MODEL_NAME_OR_PATH = os.environ.get("HUGGINGFACE_MODEL_NAME_OR_PATH")
                print(f" loading model: {MODEL_NAME_OR_PATH}")

                hf_auth_token = (
                    os.environ.get("HUGGINGFACE_AUTH_TOKEN")
                    if "Llama-2" in MODEL_NAME_OR_PATH
                    or "gemma" in MODEL_NAME_OR_PATH
                    or "Mistral" in MODEL_NAME_OR_PATH
                    else None
                )
                transformers_offline = os.environ.get("TRANSFORMERS_OFFLINE") == "1"
                token = (
                    hf_auth_token
                    if hf_auth_token is not None
                    and len(hf_auth_token) > 0
                    and not transformers_offline
                    else None
                )
                print(f" HF auth token: {str(token)[-5:]}")

                if "Llama-2" in MODEL_NAME_OR_PATH:
                    self.max_tokens_limit = 4096
                elif "TinyLlama" in MODEL_NAME_OR_PATH:
                    self.max_tokens_limit = 1024

                class StopOnTokens(StoppingCriteria):
                    def __call__(
                        self,
                        input_ids: torch.LongTensor,
                        scores: torch.FloatTensor,
                        **kwargs,
                    ) -> bool:
                        stop_ids = [2]  # 2 is the EOS (</s>) token id for LLaMA-family tokenizers
                        for stop_id in stop_ids:
                            if input_ids[0][-1] == stop_id:
                                return True
                        return False

                stopping_criteria = StoppingCriteriaList([StopOnTokens()])

is_t5 = "t5" in MODEL_NAME_OR_PATH |
|
temperature = ( |
|
0.01 |
|
if "gpt4all-j" in MODEL_NAME_OR_PATH |
|
or "dolly" in MODEL_NAME_OR_PATH |
|
or "Qwen" in MODEL_NAME_OR_PATH |
|
or "Llama" in MODEL_NAME_OR_PATH |
|
or "Orca-2" in MODEL_NAME_OR_PATH |
|
or "phi-2" in MODEL_NAME_OR_PATH |
|
or "Phi-3" in MODEL_NAME_OR_PATH |
|
or "Mistral" in MODEL_NAME_OR_PATH |
|
or "gemma" in MODEL_NAME_OR_PATH |
|
else 0 |
|
) |
|
|
|
use_fast = ( |
|
"stable" in MODEL_NAME_OR_PATH |
|
or "RedPajama" in MODEL_NAME_OR_PATH |
|
or "dolly" in MODEL_NAME_OR_PATH |
|
) |
|
padding_side = "left" |
|
|
|
                config = (
                    AutoConfig.from_pretrained(
                        MODEL_NAME_OR_PATH,
                        trust_remote_code=True,
                        token=token,
                        fp32=hf_pipeline_device_type == "cpu",
                        bf16=(
                            hf_pipeline_device_type != "cpu"
                            and torch_dtype == torch.bfloat16
                        ),
                        fp16=(
                            hf_pipeline_device_type != "cpu"
                            and torch_dtype != torch.bfloat16
                        ),
                    )
                    if "Qwen" in MODEL_NAME_OR_PATH
                    else AutoConfig.from_pretrained(
                        MODEL_NAME_OR_PATH,
                        trust_remote_code=True,
                        token=token,
                    )
                )

                tokenizer = (
                    T5Tokenizer.from_pretrained(
                        MODEL_NAME_OR_PATH,
                        token=token,
                    )
                    if is_t5
                    else AutoTokenizer.from_pretrained(
                        MODEL_NAME_OR_PATH,
                        use_fast=use_fast,
                        trust_remote_code=True,
                        padding_side=padding_side,
                        token=token,
                    )
                )

                self._init_hf_streamer(tokenizer)

                task = "text2text-generation" if is_t5 else "text-generation"

                return_full_text = "dolly" in MODEL_NAME_OR_PATH

                repetition_penalty = (
                    1.15
                    if "falcon" in MODEL_NAME_OR_PATH
                    else (1.25 if "dolly" in MODEL_NAME_OR_PATH else 1.1)
                )

                HF_RP = os.environ.get("HF_RP")
                if HF_RP is not None and len(HF_RP) > 0:
                    repetition_penalty = float(HF_RP)
                print(f" repetition_penalty: {repetition_penalty}")
                self.repetition_penalty = repetition_penalty
                self.model_name = MODEL_NAME_OR_PATH.split("/")[-1]

                if load_quantized_model is not None:
                    model = (
                        AutoModelForSeq2SeqLM.from_pretrained(
                            MODEL_NAME_OR_PATH,
                            config=config,
                            quantization_config=double_quant_config,
                            trust_remote_code=True,
                            token=token,
                        )
                        if is_t5
                        else AutoModelForCausalLM.from_pretrained(
                            MODEL_NAME_OR_PATH,
                            config=config,
                            quantization_config=double_quant_config,
                            trust_remote_code=True,
                            token=token,
                        )
                    )

                    print(f"Model memory footprint: {model.get_memory_footprint()}")

                    eos_token_id = -1

                    if "starchat" in MODEL_NAME_OR_PATH:
                        eos_token_id = 49155
                        pad_token_id = eos_token_id

                    pipe = (
                        pipeline(
                            task,
                            model=model,
                            tokenizer=tokenizer,
                            eos_token_id=eos_token_id,
                            pad_token_id=pad_token_id,
                            streamer=self.streamer,
                            return_full_text=return_full_text,
                            device_map="auto",
                            trust_remote_code=True,
                            max_new_tokens=2048,
                            do_sample=True,
                            temperature=0.01,
                            top_p=0.95,
                            top_k=50,
                            repetition_penalty=repetition_penalty,
                        )
                        if eos_token_id != -1
                        else pipeline(
                            task,
                            model=model,
                            tokenizer=tokenizer,
                            streamer=self.streamer,
                            return_full_text=return_full_text,
                            device_map="auto",
                            trust_remote_code=True,
                            max_new_tokens=2048,
                            do_sample=True,
                            temperature=temperature,
                            top_p=0.95,
                            top_k=0,
                            repetition_penalty=repetition_penalty,
                        )
                    )
                else:
                    if os.environ.get("DISABLE_MODEL_PRELOADING") != "true":
                        model = (
                            AutoModelForSeq2SeqLM.from_pretrained(
                                MODEL_NAME_OR_PATH,
                                config=config,
                                trust_remote_code=True,
                            )
                            if is_t5
                            else (
                                AutoModelForCausalLM.from_pretrained(
                                    MODEL_NAME_OR_PATH,
                                    config=config,
                                    trust_remote_code=True,
                                )
                                if "Qwen" in MODEL_NAME_OR_PATH
                                else (
                                    AutoModelForCausalLM.from_pretrained(
                                        MODEL_NAME_OR_PATH,
                                        config=config,
                                        trust_remote_code=True,
                                    )
                                    if token is None
                                    else AutoModelForCausalLM.from_pretrained(
                                        MODEL_NAME_OR_PATH,
                                        config=config,
                                        trust_remote_code=True,
                                        token=token,
                                    )
                                )
                            )
                        )
                        print(f"Model memory footprint: {model.get_memory_footprint()}")
                        model = model.eval()
                    else:
                        model = MODEL_NAME_OR_PATH

                    pipe = pipeline(
                        task,
                        model=model,
                        tokenizer=tokenizer,
                        streamer=self.streamer,
                        return_full_text=return_full_text,
                        device_map="auto",
                        torch_dtype=torch_dtype,
                        max_new_tokens=2048,
                        trust_remote_code=True,
                        do_sample=True,
                        temperature=temperature,
                        top_p=0.95,
                        top_k=0,
                        repetition_penalty=repetition_penalty,
                        token=token,
                        batch_size=self.batch_size,
                    )

                pipe.model.config.pad_token_id = pipe.model.config.eos_token_id
                pipe.tokenizer.pad_token_id = pipe.model.config.eos_token_id
                self.llm = HuggingFacePipeline(pipeline=pipe, callbacks=callbacks)

        print("initialization complete")
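
# Example usage (a minimal sketch; assumes HUGGINGFACE_MODEL_NAME_OR_PATH and any
# other backend-specific environment variables are already set):
#
#     loader = LLMLoader("huggingface")
#     loader.init(n_threds=4, hf_pipeline_device_type="cuda:0")
#     answer = loader.llm.invoke("What is retrieval-augmented generation?")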