|
from typing import Union, Mapping, Optional, Tuple, TypedDict, Dict, List |
|
from functools import partial |
|
|
|
import torch |
|
import numpy as np |
|
from transformers import AutoModel, PreTrainedTokenizerFast, BatchEncoding, DataCollatorWithPadding |
|
from transformers.models.auto import AutoTokenizer |
|
from transformers.models.llama.modeling_llama import LLAMA_INPUTS_DOCSTRING |
|
from transformers.modeling_outputs import BaseModelOutputWithPast |
|
from transformers.modeling_attn_mask_utils import _prepare_4d_attention_mask, _prepare_4d_attention_mask_for_sdpa |
|
from transformers import LlamaModel |
|
from transformers.cache_utils import Cache, DynamicCache |
|
from transformers.utils import ( |
|
add_start_docstrings_to_model_forward, |
|
logging, |
|
) |
|
from tqdm.auto import tqdm |
|
from datasets import Dataset |
|
from torch.utils.data import DataLoader |
|
from .configuration_conan import ConanEmbedConfig |
|
|
|
# Module-level logger, created through the `logging` helper imported from `transformers.utils`.
logger = logging.get_logger(__name__)
|
|
|
|
|
class ConanEmbedFeatures(TypedDict, total=False):
    """Typed view of the keyword arguments built for a model forward pass.

    The dict is declared with ``total=False`` because the only producer in this
    file (``ConanEmbedModel.prepare_kwargs_from_batch``) fills ``input_ids`` and
    ``attention_mask`` but never ``pool_mask``.
    """

    # Token ids; this is the key the feature dict is actually built with
    # (the previous declaration named it ``input_dict``).
    input_ids: torch.Tensor
    # Attention mask passed through from the tokenizer output.
    attention_mask: torch.Tensor
    # NOTE(review): declared for pooling, but nothing in this file sets it.
    pool_mask: torch.Tensor
|
|
|
|
|
def _move_to_device(maybe_tensor, device: torch.device): |
|
if torch.is_tensor(maybe_tensor): |
|
return maybe_tensor.to(device, non_blocking=device.type == "cuda") |
|
elif isinstance(maybe_tensor, dict): |
|
return {key: _move_to_device(value, device) for key, value in maybe_tensor.items()} |
|
elif isinstance(maybe_tensor, list): |
|
return [_move_to_device(x, device) for x in maybe_tensor] |
|
elif isinstance(maybe_tensor, tuple): |
|
return tuple([_move_to_device(x, device) for x in maybe_tensor]) |
|
elif isinstance(maybe_tensor, Mapping): |
|
return type(maybe_tensor)({k: _move_to_device(v, device) for k, v in maybe_tensor.items()}) |
|
else: |
|
return maybe_tensor |
|
|
|
|
|
def move_to_device(sample, device: torch.device):
    """Move a batch onto ``device``, skipping the work for CPU targets.

    Args:
        sample: A sized container (e.g. a batch dict) that may hold tensors.
        device: The target device.

    Returns:
        ``sample`` itself when ``device`` is a CPU device, an empty dict when
        ``sample`` has length 0, otherwise the result of ``_move_to_device``.
    """
    if device.type == "cpu":
        return sample

    return _move_to_device(sample, device) if len(sample) != 0 else {}
|
|
|
|
|
def input_transform_func(
    tokenizer: PreTrainedTokenizerFast,
    examples: Dict[str, List],
    always_add_eos: bool,
    max_length: int,
    instruction: str,
) -> BatchEncoding:
    """Tokenize a batch of raw texts into padded PyTorch tensors.

    Args:
        tokenizer: Tokenizer used to encode the texts.
        examples: Batch dict whose ``"input_texts"`` entry holds the raw strings.
            It is not modified.
        always_add_eos: When true, each text is rewritten as
            ``instruction + text + tokenizer.eos_token`` before tokenization.
        max_length: Maximum sequence length; longer inputs are truncated.
        instruction: Prefix applied to every text when ``always_add_eos`` is true.

    Returns:
        The tokenizer output for the batch (padded, truncated, ``"pt"`` tensors,
        without token type ids).
    """
    texts = examples["input_texts"]
    if always_add_eos:
        # NOTE(review): the instruction prefix is tied to this flag, so it is
        # silently dropped when ``always_add_eos`` is False — confirm intended.
        # A new list is built so the caller's ``examples`` dict stays untouched.
        texts = [instruction + text + tokenizer.eos_token for text in texts]
    return tokenizer(
        texts,
        max_length=max_length,
        padding=True,
        return_token_type_ids=False,
        return_tensors="pt",
        truncation=True,
    )
|
|
|
|
|
class ConanEmbedModel(LlamaModel):
    """Sentence-embedding model built on top of ``LlamaModel``.

    The decoder stack is reused as an encoder: every layer's
    ``self_attn.is_causal`` flag is set to ``not config.do_dir`` and the
    attention mask handed to the layers is the expanded padding mask only.
    Sentence vectors are pooled from the final hidden states according to
    ``config.sentence_pooling_method``.
    """

    config_class = ConanEmbedConfig

    def __init__(self, config: ConanEmbedConfig) -> None:
        """
        Initialize the model with a given configuration.

        Args:
            config (ConanEmbedConfig): The configuration for the model.
        """
        super().__init__(config)
        for layer in self.layers:
            layer.self_attn.is_causal = not config.do_dir
        self._attn_implementation = "eager"
        self.tokenizer = AutoTokenizer.from_pretrained(config._name_or_path)
        self.padding_side = config.padding_side
        self.is_mask_instruction = config.is_mask_instruction
        self.add_eos = config.add_eos
        self.mask_type = config.mask_type
        self.sentence_pooling_method = config.sentence_pooling_method
        if config.add_pad_token and self.tokenizer is not None:
            self.add_pad_token()

    def add_pad_token(self):
        """Reuse the EOS token as the pad token and apply the configured padding side."""
        self.tokenizer.pad_token = self.tokenizer.eos_token
        self.tokenizer.padding_side = self.padding_side

    def _instruction_token_count(self, instruction: str) -> int:
        """Return the instruction's token count when instruction masking applies, else 0."""
        if self.padding_side == "right" and self.is_mask_instruction and len(instruction) > 0:
            return len(self.tokenizer.tokenize(instruction))
        return 0

    def _sentence_embedding(self, last_hidden_state, attention_mask=None):
        """Use the pooling method to get the sentence embedding.

        Args:
            last_hidden_state (torch.Tensor): The model output's last hidden state,
                indexed as (batch, sequence, hidden).
            attention_mask (torch.Tensor): 2D padding mask. Only consulted by the
                ``"last_token"`` method; when omitted, the final sequence position
                is used for every row.

        Raises:
            NotImplementedError: Specified pooling method not implemented.

        Returns:
            torch.Tensor: The sentence embeddings.
        """
        if self.sentence_pooling_method == "cls":
            return last_hidden_state[:, 0]
        elif self.sentence_pooling_method == "mean":
            # NOTE(review): despite the name this is a plain sum over the sequence
            # axis — padding positions are included and there is no division by
            # the token count. Kept as-is because changing it alters embeddings.
            return torch.sum(last_hidden_state, dim=1)
        elif self.sentence_pooling_method == "last_token":
            if attention_mask is None:
                # Without a mask there is no padding information; take the last position.
                return last_hidden_state[:, -1]
            # If every row attends to its final position the batch is left-padded.
            left_padding = attention_mask[:, -1].sum() == attention_mask.shape[0]
            if left_padding:
                return last_hidden_state[:, -1]
            sequence_lengths = attention_mask.sum(dim=1) - 1
            batch_size = last_hidden_state.shape[0]
            return last_hidden_state[
                torch.arange(batch_size, device=last_hidden_state.device),
                sequence_lengths,
            ]
        else:
            raise NotImplementedError(f"pooling method {self.sentence_pooling_method} not implemented")

    def prepare_kwargs_from_batch(
        self,
        batch_dict: Dict[str, torch.Tensor],
        instruction_lens: int,
        device: torch.device,
    ) -> ConanEmbedFeatures:
        """
        Prepare the batch dictionary for encoding.

        Args:
            batch_dict: A dictionary containing the input_ids and attention_mask.
            instruction_lens: The length of the instruction. Accepted for interface
                compatibility; it does not influence the returned features (the
                previous implementation computed an instruction-masked copy of the
                attention mask but never returned it).
            device: The device to move the data to.

        Returns:
            A dict with ``input_ids`` (cast to long) and the unmodified
            ``attention_mask``, both on ``device``.
        """
        batch_dict = move_to_device(batch_dict, device)
        features: ConanEmbedFeatures = {
            # ``.long()`` replaces the former ``torch.tensor(tensor)`` copy-construction.
            "input_ids": batch_dict["input_ids"].long(),
            "attention_mask": batch_dict["attention_mask"],
        }
        return features

    @torch.no_grad()
    def _do_encode(
        self,
        prompts: List[str],
        batch_size: int = 1,
        instruction: str = "",
        max_length: int = 4096,
        num_workers: int = 32,
        return_numpy: bool = False,
    ) -> Union[torch.FloatTensor, np.ndarray]:
        """
        Encode a list of prompts using the model.

        Each prompt is rewritten as ``instruction + prompt + eos_token`` by
        ``input_transform_func`` (``always_add_eos=True``) before tokenization.

        Args:
            prompts: A list of prompts to encode.
            batch_size: The batch size to use for encoding. Defaults to 1.
            instruction: An instruction to prepend to the prompts. Defaults to "".
            max_length: The maximum length of the input_ids. Defaults to 4096.
            num_workers: The number of workers to use for encoding. Defaults to 32.
            return_numpy: Whether to return the output as a numpy array or a torch tensor. Defaults to False.

        Returns:
            A tensor or numpy array of shape (len(prompts), hidden_size) containing the encoded prompts.
        """
        dataset: Dataset = Dataset.from_dict({"input_texts": prompts})
        dataset.set_transform(
            partial(
                input_transform_func,
                self.tokenizer,
                always_add_eos=True,
                max_length=max_length,
                instruction=instruction,
            )
        )

        data_collator = DataCollatorWithPadding(self.tokenizer)
        data_loader = DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=False,
            drop_last=False,
            num_workers=num_workers,
            collate_fn=data_collator,
            pin_memory=True,
        )

        instruction_lens = self._instruction_token_count(instruction)

        encoded_embeds: List[torch.Tensor] = []
        device = next(self.parameters()).device
        for batch_dict in tqdm(data_loader, desc="encoding", mininterval=10):
            features = self.prepare_kwargs_from_batch(batch_dict, instruction_lens, device=device)
            # ``forward`` returns a plain model output without a "sentence_embeddings"
            # entry, so pool the last hidden state here, exactly as ``encode`` does.
            outputs = self(**features, return_dict=True)
            embeds = self._sentence_embedding(outputs.last_hidden_state, features["attention_mask"])
            encoded_embeds.append(embeds)
        encoded_embeds = torch.cat(encoded_embeds, dim=0)
        if return_numpy:
            encoded_embeds = encoded_embeds.cpu().detach().numpy()
        return encoded_embeds

    @add_start_docstrings_to_model_forward(LLAMA_INPUTS_DOCSTRING)
    def forward(
        self,
        input_ids: torch.LongTensor,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
    ) -> Union[Tuple[torch.Tensor, ...], BaseModelOutputWithPast]:
        """
        Args:
            input_ids: a tensor of shape (batch_size, sequence_length)
            attention_mask: a tensor of shape (batch_size, sequence_length)
            position_ids: a tensor of shape (batch_size, sequence_length)
            past_key_values: a list of tensors of shape (batch_size, key_length, hidden_size)
            inputs_embeds: a tensor of shape (batch_size, sequence_length, hidden_size)
            use_cache: a boolean indicating whether to use the cache
            output_attentions: a boolean indicating whether to output the attention weights
            output_hidden_states: a boolean indicating whether to output the hidden states
            return_dict: a boolean indicating whether to return a dictionary
            token_type_ids: accepted but not used by this method

        Returns:
            a tuple of length 4 containing the last hidden state, the cache, the hidden states,
            and the attention weights
            or a BaseModelOutputWithPast object
        """
        # Fall back to the config for every flag the caller left unset.
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        use_cache = use_cache if use_cache is not None else self.config.use_cache

        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Exactly one of input_ids / inputs_embeds must be supplied.
        if input_ids is not None and inputs_embeds is not None:
            raise ValueError("You cannot specify both decoder_input_ids and decoder_inputs_embeds at the same time")
        elif input_ids is not None:
            batch_size, seq_length = input_ids.shape
        elif inputs_embeds is not None:
            batch_size, seq_length, _ = inputs_embeds.shape
        else:
            raise ValueError("You have to specify either decoder_input_ids or decoder_inputs_embeds")

        if self.gradient_checkpointing and self.training:
            if use_cache:
                logger.warning_once(
                    "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
                )
                use_cache = False

        past_key_values_length = 0

        if use_cache:
            # Anything that is not already a Cache object is converted to a DynamicCache.
            use_legacy_cache = not isinstance(past_key_values, Cache)
            if use_legacy_cache:
                past_key_values = DynamicCache.from_legacy_cache(past_key_values)
            past_key_values_length = past_key_values.get_usable_length(seq_length)

        if position_ids is None:
            device = input_ids.device if input_ids is not None else inputs_embeds.device
            position_ids = torch.arange(
                past_key_values_length, seq_length + past_key_values_length, dtype=torch.long, device=device
            )
            position_ids = position_ids.unsqueeze(0).view(-1, seq_length)
        else:
            position_ids = position_ids.view(-1, seq_length).long()

        if inputs_embeds is None:
            inputs_embeds = self.embed_tokens(input_ids)

        if attention_mask is not None and self._attn_implementation == "flash_attention_2" and use_cache:
            is_padding_right = attention_mask[:, -1].sum().item() != batch_size
            if is_padding_right:
                raise ValueError(
                    "You are attempting to perform batched generation with padding_side='right'"
                    " this may lead to unexpected behaviour for Flash Attention version of Mistral. Make sure to "
                    " call `tokenizer.padding_side = 'left'` before tokenizing the input. "
                )

        if self._attn_implementation == "flash_attention_2":
            # Keep the 2D mask only when it actually masks something.
            attention_mask = attention_mask if (attention_mask is not None and 0 in attention_mask) else None
        elif self._attn_implementation == "sdpa" and not output_attentions:
            attention_mask = _prepare_4d_attention_mask_for_sdpa(attention_mask, inputs_embeds.dtype)
        else:
            # NOTE(review): the mask is handed to the helper unconditionally, so this
            # path presumably requires a non-None attention_mask — confirm.
            attention_mask = _prepare_4d_attention_mask(
                attention_mask,
                inputs_embeds.dtype,
            )

        hidden_states = inputs_embeds

        all_hidden_states = () if output_hidden_states else None
        all_self_attns = () if output_attentions else None
        next_decoder_cache = None

        for decoder_layer in self.layers:
            if output_hidden_states:
                all_hidden_states += (hidden_states,)

            if self.gradient_checkpointing and self.training:
                layer_outputs = self._gradient_checkpointing_func(
                    decoder_layer.__call__,
                    hidden_states,
                    attention_mask,
                    position_ids,
                    past_key_values,
                    output_attentions,
                    use_cache,
                )
            else:
                layer_outputs = decoder_layer(
                    hidden_states,
                    attention_mask=attention_mask,
                    position_ids=position_ids,
                    past_key_value=past_key_values,
                    output_attentions=output_attentions,
                    use_cache=use_cache,
                )

            hidden_states = layer_outputs[0]

            if use_cache:
                # The cache sits after the attention weights when those are returned.
                next_decoder_cache = layer_outputs[2 if output_attentions else 1]

            if output_attentions:
                all_self_attns += (layer_outputs[1],)

        hidden_states = self.norm(hidden_states)

        # Record the normalized output of the final layer as well.
        if output_hidden_states:
            all_hidden_states += (hidden_states,)

        next_cache = None
        if use_cache:
            next_cache = next_decoder_cache.to_legacy_cache() if use_legacy_cache else next_decoder_cache

        if not return_dict:
            return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attns] if v is not None)

        return BaseModelOutputWithPast(
            last_hidden_state=hidden_states,
            past_key_values=next_cache,
            hidden_states=all_hidden_states,
            attentions=all_self_attns,
        )

    @torch.no_grad()
    def encode(
        self,
        prompts: List[str],
        instruction: str = "",
        max_length: int = 4096,
    ) -> Dict[str, torch.Tensor]:
        """
        Encode a list of prompts in a single batch.

        Args:
            prompts: A list of prompts to encode.
            instruction: Instruction forwarded to ``input_transform_func``.
                NOTE(review): tokenization is requested with ``always_add_eos=False``,
                and that helper only prepends the instruction when the flag is true,
                so the instruction text is presumably not part of the encoded input
                here — confirm intended. Defaults to "".
            max_length: The maximum length of the input_ids. Defaults to 4096.

        Returns:
            A dictionary containing the sentence embeddings with key "sentence_embeddings".
        """
        instruction_lens = self._instruction_token_count(instruction)

        device = next(self.parameters()).device
        batch_dict = input_transform_func(
            self.tokenizer,
            {"input_texts": list(prompts)},
            always_add_eos=False,
            max_length=max_length,
            instruction=instruction,
        )

        features: ConanEmbedFeatures = self.prepare_kwargs_from_batch(batch_dict, instruction_lens, device=device)
        # Force a ModelOutput so ``last_hidden_state`` exists regardless of the config default.
        outputs = self(**features, return_dict=True)

        # The padding mask is required by "last_token" pooling; other methods ignore it.
        embeds = self._sentence_embedding(outputs.last_hidden_state, features["attention_mask"])
        return {"sentence_embeddings": embeds}
|
|
|
|
|
|
|
# Map ConanEmbedConfig to ConanEmbedModel in AutoModel's registry; this runs as
# an import-time side effect of loading this module.
AutoModel.register(ConanEmbedConfig, ConanEmbedModel)

# Flag the class for the "AutoModel" auto class.
# NOTE(review): presumably needed so checkpoints saved with this custom code
# resolve through AutoModel when loaded with trust_remote_code — confirm.
ConanEmbedModel.register_for_auto_class("AutoModel")
|
|