"""PyTorch OpenAI GPT-2 model, code copied from Huggingface""" import math import os import warnings from dataclasses import dataclass from typing import Callable, Optional, Tuple, Union import torch import torch.utils.checkpoint from torch import nn from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss from transformers.activations import ACT2FN from transformers.generation import GenerationMixin from transformers.modeling_attn_mask_utils import _prepare_4d_attention_mask_for_sdpa, _prepare_4d_causal_attention_mask_for_sdpa from transformers.modeling_outputs import ( BaseModelOutputWithPastAndCrossAttentions, CausalLMOutputWithCrossAttentions, QuestionAnsweringModelOutput, SequenceClassifierOutputWithPast, TokenClassifierOutput, ) from transformers.modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel, SequenceSummary from transformers.pytorch_utils import Conv1D, find_pruneable_heads_and_indices, prune_conv1d_layer from transformers.utils import ( ModelOutput, add_code_sample_docstrings, add_start_docstrings, add_start_docstrings_to_model_forward, logging, replace_return_docstrings, ) from transformers.utils.model_parallel_utils import assert_device_map, get_device_map from transformers.models.gpt2.configuration_gpt2 import GPT2Config from src.models.modeling_gpt2 import GPT2PreTrainedModel, GPT2Block from transformers.models.gpt2.configuration_gpt2 import GPT2Config from transformers.modeling_attn_mask_utils import _prepare_4d_attention_mask_for_sdpa, _prepare_4d_causal_attention_mask_for_sdpa logger = logging.get_logger(__name__) import torch def create_attention_mask_matrix(tn): # Initialize the 10x10 matrix tn = tn + 1 ### add 1 for the extra token to create correct matrix, temporary fix matrix = torch.zeros(tn, tn) # Define odd columns mask (j=1,3,5,7,9) odd_cols = torch.arange(tn) % 2 == 1 # [False, True, False, True, ..., True] # Define row indices odd_rows = torch.tensor([x for x in range(1, tn) if x%2==1]) even_rows = torch.tensor([x for x in range(1, tn) if x%2==0]) # For odd rows: ones at odd columns j ≤ i # Use tril to get 1s where j ≤ i, then mask with odd columns tril_matrix = torch.tril(torch.ones(tn, tn)) matrix[odd_rows, :] = tril_matrix[odd_rows, :] * odd_cols # For even rows: ones at odd j ≤ i-2, plus j=i and j=i+1 # Use tril with diagonal=-2 for j ≤ i-2, mask with odd columns tril_minus2 = torch.tril(torch.ones(tn, tn), diagonal=-2) matrix[even_rows, :] = tril_minus2[even_rows, :] * odd_cols # Set specific positions for even rows matrix[even_rows, even_rows] = 1 # j = i matrix[even_rows, even_rows + 1] = 1 # j = i+1 return matrix[1:, 1:].bool() # Efficient implementation equivalent to the following: def scaled_dot_product_attention(query, key, value, attn_mask=None, dropout_p=0.0, is_causal=False, scale=None, enable_gqa=False) -> torch.Tensor: L, S = query.size(-2), key.size(-2) scale_factor = 1 / math.sqrt(query.size(-1)) if scale is None else scale attn_bias = torch.zeros(L, S, dtype=query.dtype, device=query.device) if is_causal: assert attn_mask is None temp_mask = torch.ones(L, S, dtype=torch.bool, device=query.device).tril(diagonal=0) attn_bias.masked_fill_(temp_mask.logical_not(), float("-inf")) attn_bias.to(query.dtype) if attn_mask is not None: if attn_mask.dtype == torch.bool: attn_bias.masked_fill_(attn_mask.logical_not(), float("-inf")) else: attn_bias = attn_mask + attn_bias if enable_gqa: key = key.repeat_interleave(query.size(-3)//key.size(-3), -3) value = value.repeat_interleave(query.size(-3)//value.size(-3), -3) attn_weight = query @ key.transpose(-2, -1) * scale_factor attn_weight += attn_bias attn_weight = torch.softmax(attn_weight, dim=-1) attn_weight = torch.dropout(attn_weight, dropout_p, train=True) return attn_weight @ value def sdpa_attention_forward( module: torch.nn.Module, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, attention_mask: Optional[torch.Tensor], dropout: float = 0.0, scaling: Optional[float] = None, is_causal: Optional[bool] = None, **kwargs, ) -> Tuple[torch.Tensor, None]: if hasattr(module, "num_key_value_groups"): key = repeat_kv(key, module.num_key_value_groups) value = repeat_kv(value, module.num_key_value_groups) # SDPA with memory-efficient backend is bugged with non-contiguous inputs and custom attn_mask for some torch versions # Reference: https://github.com/pytorch/pytorch/issues/112577. query = query.contiguous() key = key.contiguous() value = value.contiguous() # Shapes (e.g. query.shape[2]) are tensors during jit tracing, resulting in `is_causal` being a tensor. # We convert it to a bool for the SDPA kernel that only accepts bools. if torch.jit.is_tracing() and isinstance(is_causal, torch.Tensor): is_causal = is_causal.item() attn_output = scaled_dot_product_attention( query, key, value, attn_mask=create_attention_mask_matrix(query.shape[-2]).to(query.device) if query.shape[1]>module.config.max_position_embeddings else None, dropout_p=dropout, scale=scaling, is_causal=False if query.shape[1]>module.config.max_position_embeddings else True, ) attn_output = attn_output.transpose(1, 2).contiguous() return attn_output, None class DuoPredictGPT2Config(GPT2Config): model_type = "duo-predict-gpt2" architectures = ["DuoPredictGPT2LMHeadModel"] class DuoPredictGPT2Attention(nn.Module): def __init__(self, config, is_cross_attention=False, layer_idx=None): super().__init__() self.config = config max_positions = config.max_position_embeddings self.register_buffer( "bias", torch.tril(torch.ones((max_positions, max_positions), dtype=torch.bool)).view( 1, 1, max_positions, max_positions ), persistent=False, ) self.register_buffer("masked_bias", torch.tensor(-1e4), persistent=False) self.embed_dim = config.hidden_size self.num_heads = config.num_attention_heads self.head_dim = self.embed_dim // self.num_heads self.split_size = self.embed_dim if self.head_dim * self.num_heads != self.embed_dim: raise ValueError( f"`embed_dim` must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:" f" {self.num_heads})." ) self.scale_attn_weights = config.scale_attn_weights self.is_cross_attention = is_cross_attention # Layer-wise attention scaling, reordering, and upcasting self.scale_attn_by_inverse_layer_idx = config.scale_attn_by_inverse_layer_idx self.layer_idx = layer_idx self.reorder_and_upcast_attn = config.reorder_and_upcast_attn if self.is_cross_attention: self.c_attn = Conv1D(2 * self.embed_dim, self.embed_dim) self.q_attn = Conv1D(self.embed_dim, self.embed_dim) else: self.c_attn = Conv1D(3 * self.embed_dim, self.embed_dim) self.c_proj = Conv1D(self.embed_dim, self.embed_dim) self.attn_dropout = nn.Dropout(config.attn_pdrop) self.resid_dropout = nn.Dropout(config.resid_pdrop) self.is_causal = True self.pruned_heads = set() def prune_heads(self, heads): if len(heads) == 0: return heads, index = find_pruneable_heads_and_indices(heads, self.num_heads, self.head_dim, self.pruned_heads) index_attn = torch.cat([index, index + self.split_size, index + (2 * self.split_size)]) # Prune conv1d layers self.c_attn = prune_conv1d_layer(self.c_attn, index_attn, dim=1) self.c_proj = prune_conv1d_layer(self.c_proj, index, dim=0) # Update hyper params self.split_size = (self.split_size // self.num_heads) * (self.num_heads - len(heads)) self.num_heads = self.num_heads - len(heads) self.pruned_heads = self.pruned_heads.union(heads) def _upcast_and_reordered_attn(self, query, key, value, attention_mask=None, head_mask=None): # Use `torch.baddbmm` (a bit more efficient w/ alpha param for scaling -- from Megatron-LM) bsz, num_heads, q_seq_len, dk = query.size() _, _, k_seq_len, _ = key.size() # Preallocate attn_weights for `baddbmm` attn_weights = torch.empty(bsz * num_heads, q_seq_len, k_seq_len, dtype=torch.float32, device=query.device) # Compute Scale Factor scale_factor = 1.0 if self.scale_attn_weights: scale_factor /= float(value.size(-1)) ** 0.5 if self.scale_attn_by_inverse_layer_idx: scale_factor /= float(self.layer_idx + 1) # Upcast (turn off autocast) and reorder (Scale K by 1 / root(dk)) with torch.amp.autocast(query.device.type, enabled=False): q, k = query.reshape(-1, q_seq_len, dk), key.transpose(-1, -2).reshape(-1, dk, k_seq_len) attn_weights = torch.baddbmm(attn_weights, q.float(), k.float(), beta=0, alpha=scale_factor) attn_weights = attn_weights.reshape(bsz, num_heads, q_seq_len, k_seq_len) if not self.is_cross_attention: # if only "normal" attention layer implements causal mask query_length, key_length = query.size(-2), key.size(-2) causal_mask = self.bias[:, :, key_length - query_length : key_length, :key_length] mask_value = torch.finfo(attn_weights.dtype).min # Need to be a tensor, otherwise we get error: `RuntimeError: expected scalar type float but found double`. # Need to be on the same device, otherwise `RuntimeError: ..., x and y to be on the same device` mask_value = torch.tensor(mask_value, dtype=attn_weights.dtype).to(attn_weights.device) attn_weights = torch.where(causal_mask, attn_weights, mask_value) if attention_mask is not None: # Apply the attention mask attn_weights = attn_weights + attention_mask attn_weights = nn.functional.softmax(attn_weights, dim=-1) # Downcast (if necessary) back to V's dtype (if in mixed-precision) -- No-Op if otherwise if attn_weights.dtype != torch.float32: raise RuntimeError("Error with upcasting, attn_weights does not have dtype torch.float32") attn_weights = attn_weights.type(value.dtype) attn_weights = self.attn_dropout(attn_weights) # Mask heads if we want to if head_mask is not None: attn_weights = attn_weights * head_mask attn_output = torch.matmul(attn_weights, value) attn_output = attn_output.transpose(1, 2) return attn_output, attn_weights def forward( self, hidden_states: Optional[Tuple[torch.FloatTensor]], layer_past: Optional[Tuple[torch.Tensor]] = None, attention_mask: Optional[torch.FloatTensor] = None, head_mask: Optional[torch.FloatTensor] = None, encoder_hidden_states: Optional[torch.Tensor] = None, encoder_attention_mask: Optional[torch.FloatTensor] = None, use_cache: Optional[bool] = False, output_attentions: Optional[bool] = False, **kwargs, ) -> Tuple[Union[torch.Tensor, Tuple[torch.Tensor]], ...]: if encoder_hidden_states is not None: if not hasattr(self, "q_attn"): raise ValueError( "If class is used as cross attention, the weights `q_attn` have to be defined. " "Please make sure to instantiate class with `GPT2Attention(..., is_cross_attention=True)`." ) query_states = self.q_attn(hidden_states) key_states, value_states = self.c_attn(encoder_hidden_states).split(self.split_size, dim=2) attention_mask = encoder_attention_mask else: query_states, key_states, value_states = self.c_attn(hidden_states).split(self.split_size, dim=2) shape_q = (*query_states.shape[:-1], -1, self.head_dim) shape_kv = (*key_states.shape[:-1], -1, self.head_dim) query_states = query_states.view(shape_q).transpose(1, 2) key_states = key_states.view(shape_kv).transpose(1, 2) value_states = value_states.view(shape_kv).transpose(1, 2) if layer_past is not None: past_key, past_value = layer_past key_states = torch.cat((past_key, key_states), dim=-2) value_states = torch.cat((past_value, value_states), dim=-2) if use_cache is True: present = (key_states, value_states) else: present = None is_cross_attention = encoder_hidden_states is not None is_causal = False #attention_mask is None and query_states.shape[-2] > 1 and not is_cross_attention using_eager = self.config._attn_implementation == "eager" # attention_interface: Callable = eager_attention_forward # if self.config._attn_implementation != "eager": # if self.config._attn_implementation == "sdpa" and (output_attentions or head_mask is not None): # using_eager = True # logger.warning_once( # "`torch.nn.functional.scaled_dot_product_attention` does not support `output_attentions=True`. Falling back to " # 'eager attention. This warning can be removed using the argument `attn_implementation="eager"` when loading the model.' # ) # else: # # Attention functions are consistent with previous equivalent attention classes, however they do not support some options # # (e.g. layer scaling, head mask) that eager supports. These implementations are thus equivalent to previous code, but # # not necessarily to eager (if mentionned options are provided). # attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation] attention_interface = sdpa_attention_forward if using_eager and self.reorder_and_upcast_attn: attn_output, attn_weights = self._upcast_and_reordered_attn( query_states, key_states, value_states, attention_mask, head_mask ) else: attn_output, attn_weights = attention_interface( self, query_states, key_states, value_states, attention_mask, head_mask=head_mask, dropout=self.attn_dropout.p if self.training else 0.0, is_causal=is_causal, **kwargs, ) attn_output = attn_output.reshape(*attn_output.shape[:-2], -1).contiguous() attn_output = self.c_proj(attn_output) attn_output = self.resid_dropout(attn_output) outputs = (attn_output, present) if output_attentions: outputs += (attn_weights,) return outputs # a, present, (attentions) class DuoPredictGPT2MLP(nn.Module): def __init__(self, intermediate_size, config): super().__init__() embed_dim = config.hidden_size self.c_fc = Conv1D(intermediate_size, embed_dim) self.c_proj = Conv1D(embed_dim, intermediate_size) self.act = ACT2FN[config.activation_function] self.dropout = nn.Dropout(config.resid_pdrop) def forward(self, hidden_states: Optional[Tuple[torch.FloatTensor]]) -> torch.FloatTensor: hidden_states = self.c_fc(hidden_states) hidden_states = self.act(hidden_states) hidden_states = self.c_proj(hidden_states) hidden_states = self.dropout(hidden_states) return hidden_states class DuoPredictGPT2Block(nn.Module): def __init__(self, config, layer_idx=None): super().__init__() hidden_size = config.hidden_size inner_dim = config.n_inner if config.n_inner is not None else 4 * hidden_size self.ln_1 = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon) self.attn = DuoPredictGPT2Attention(config=config, layer_idx=layer_idx) self.ln_2 = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon) if config.add_cross_attention: self.crossattention = DuoPredictGPT2Attention(config=config, is_cross_attention=True, layer_idx=layer_idx) self.ln_cross_attn = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon) self.mlp = DuoPredictGPT2MLP(inner_dim, config) def forward( self, hidden_states: Optional[Tuple[torch.FloatTensor]], layer_past: Optional[Tuple[torch.Tensor]] = None, attention_mask: Optional[torch.FloatTensor] = None, head_mask: Optional[torch.FloatTensor] = None, encoder_hidden_states: Optional[torch.Tensor] = None, encoder_attention_mask: Optional[torch.FloatTensor] = None, use_cache: Optional[bool] = False, output_attentions: Optional[bool] = False, ) -> Union[Tuple[torch.Tensor], Optional[Tuple[torch.Tensor, Tuple[torch.FloatTensor, ...]]]]: residual = hidden_states hidden_states = self.ln_1(hidden_states) attn_outputs = self.attn( hidden_states, layer_past=layer_past, attention_mask=attention_mask, head_mask=head_mask, use_cache=use_cache, output_attentions=output_attentions, ) attn_output = attn_outputs[0] # output_attn: a, present, (attentions) outputs = attn_outputs[1:] # residual connection hidden_states = attn_output + residual if encoder_hidden_states is not None: # add one self-attention block for cross-attention if not hasattr(self, "crossattention"): raise ValueError( f"If `encoder_hidden_states` are passed, {self} has to be instantiated with " "cross-attention layers by setting `config.add_cross_attention=True`" ) residual = hidden_states hidden_states = self.ln_cross_attn(hidden_states) cross_attn_outputs = self.crossattention( hidden_states, attention_mask=attention_mask, head_mask=head_mask, encoder_hidden_states=encoder_hidden_states, encoder_attention_mask=encoder_attention_mask, output_attentions=output_attentions, ) attn_output = cross_attn_outputs[0] # residual connection hidden_states = residual + attn_output outputs = outputs + cross_attn_outputs[2:] # add cross attentions if we output attention weights residual = hidden_states hidden_states = self.ln_2(hidden_states) feed_forward_hidden_states = self.mlp(hidden_states) # residual connection hidden_states = residual + feed_forward_hidden_states if use_cache: outputs = (hidden_states,) + outputs else: outputs = (hidden_states,) + outputs[1:] return outputs # hidden_states, present, (attentions, cross_attentions) class DuoPredictGPT2PretrainedModel(GPT2PreTrainedModel): config_class = DuoPredictGPT2Config class DuoPredictGPT2Model(DuoPredictGPT2PretrainedModel): _supports_param_buffer_assignment = False def __init__(self, config): super().__init__(config) self.embed_dim = config.hidden_size self.wte = nn.Embedding(config.vocab_size, self.embed_dim) self.wpe = nn.Embedding(config.max_position_embeddings, self.embed_dim) self.drop = nn.Dropout(config.embd_pdrop) self.h = nn.ModuleList([DuoPredictGPT2Block(config, layer_idx=i) for i in range(config.num_hidden_layers)]) self.ln_f = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_epsilon) # Model parallel self.model_parallel = False self.device_map = None self.gradient_checkpointing = False self._attn_implementation = config._attn_implementation # Initialize weights and apply final processing self.post_init() def parallelize(self, device_map=None): # Check validity of device_map warnings.warn( "`GPT2Model.parallelize` is deprecated and will be removed in v5 of Transformers, you should load your" " model with `device_map='balanced'` in the call to `from_pretrained`. You can also provide your own" " `device_map` but it needs to be a dictionary module_name to device, so for instance {'h.0': 0, 'h.1': 1," " ...}", FutureWarning, ) self.device_map = ( get_device_map(len(self.h), range(torch.cuda.device_count())) if device_map is None else device_map ) assert_device_map(self.device_map, len(self.h)) self.model_parallel = True self.first_device = "cpu" if "cpu" in self.device_map.keys() else "cuda:" + str(min(self.device_map.keys())) self.last_device = "cuda:" + str(max(self.device_map.keys())) self.wte = self.wte.to(self.first_device) self.wpe = self.wpe.to(self.first_device) # Load onto devices for k, v in self.device_map.items(): for block in v: cuda_device = "cuda:" + str(k) self.h[block] = self.h[block].to(cuda_device) # ln_f to last self.ln_f = self.ln_f.to(self.last_device) def deparallelize(self): warnings.warn( "Like `parallelize`, `deparallelize` is deprecated and will be removed in v5 of Transformers.", FutureWarning, ) self.model_parallel = False self.device_map = None self.first_device = "cpu" self.last_device = "cpu" self.wte = self.wte.to("cpu") self.wpe = self.wpe.to("cpu") for index in range(len(self.h)): self.h[index] = self.h[index].to("cpu") self.ln_f = self.ln_f.to("cpu") torch.cuda.empty_cache() def get_input_embeddings(self): return self.wte def set_input_embeddings(self, new_embeddings): self.wte = new_embeddings def _prune_heads(self, heads_to_prune): """ Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer} """ for layer, heads in heads_to_prune.items(): self.h[layer].attn.prune_heads(heads) def forward( self, input_ids: Optional[torch.LongTensor] = None, past_key_values: Optional[Tuple[Tuple[torch.Tensor]]] = None, attention_mask: Optional[torch.FloatTensor] = None, token_type_ids: Optional[torch.LongTensor] = None, position_ids: Optional[torch.LongTensor] = None, head_mask: Optional[torch.FloatTensor] = None, inputs_embeds: Optional[torch.FloatTensor] = None, encoder_hidden_states: Optional[torch.Tensor] = None, encoder_attention_mask: Optional[torch.FloatTensor] = None, use_cache: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[Tuple, BaseModelOutputWithPastAndCrossAttentions]: output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) use_cache = use_cache if use_cache is not None else self.config.use_cache return_dict = return_dict if return_dict is not None else self.config.use_return_dict if input_ids is not None and inputs_embeds is not None: raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time") elif input_ids is not None: self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask) input_shape = input_ids.size() input_ids = input_ids.view(-1, input_shape[-1]) batch_size = input_ids.shape[0] elif inputs_embeds is not None: input_shape = inputs_embeds.size()[:-1] batch_size = inputs_embeds.shape[0] else: raise ValueError("You have to specify either input_ids or inputs_embeds") device = input_ids.device if input_ids is not None else inputs_embeds.device if token_type_ids is not None: token_type_ids = token_type_ids.view(-1, input_shape[-1]) if past_key_values is None: past_length = 0 past_key_values = tuple([None] * len(self.h)) else: past_length = past_key_values[0][0].size(-2) if position_ids is None: position_ids = torch.arange(past_length, input_shape[-1] + past_length, dtype=torch.long, device=device) position_ids = position_ids.unsqueeze(0) position_ids = position_ids[:, :self.config.max_position_embeddings] #TODO: remember if inputs_embeds is None: inputs_embeds = self.wte(input_ids) position_embeds = self.wpe(position_ids) ###TODO: correctly initialized if inputs_embeds.shape[1] != position_embeds.shape[1]: hidden_states = torch.empty((batch_size, input_shape[-1], self.embed_dim), device=device) hidden_states[:, ::2] = inputs_embeds[:, ::2] + position_embeds.to(inputs_embeds.device) hidden_states[:, 1::2] = inputs_embeds[:, 1::2] + position_embeds[:, :self.config.max_position_embeddings-1].to(inputs_embeds.device) else: hidden_states = inputs_embeds + position_embeds # Attention mask. _use_sdpa = self._attn_implementation == "sdpa" and output_attentions is False and head_mask is None attention_mask = attention_mask.view(batch_size, -1) if attention_mask is not None else None if self._attn_implementation == "flash_attention_2": attention_mask = attention_mask if (attention_mask is not None and 0 in attention_mask) else None elif _use_sdpa: attention_mask = _prepare_4d_causal_attention_mask_for_sdpa( attention_mask=attention_mask, input_shape=(batch_size, input_shape[-1]), inputs_embeds=inputs_embeds, past_key_values_length=past_length, ) else: if attention_mask is not None: # We create a 3D attention mask from a 2D tensor mask. # Sizes are [batch_size, 1, 1, to_seq_length] # So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length] # this attention mask is more simple than the triangular masking of causal attention # used in OpenAI GPT, we just need to prepare the broadcast dimension here. attention_mask = attention_mask[:, None, None, :] # Since attention_mask is 1.0 for positions we want to attend and 0.0 for # masked positions, this operation will create a tensor which is 0.0 for # positions we want to attend and the dtype's smallest value for masked positions. # Since we are adding it to the raw scores before the softmax, this is # effectively the same as removing these entirely. attention_mask = attention_mask.to(dtype=self.dtype) # fp16 compatibility attention_mask = (1.0 - attention_mask) * torch.finfo(self.dtype).min # If a 2D or 3D attention mask is provided for the cross-attention # we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length] if self.config.add_cross_attention and encoder_hidden_states is not None: encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size() encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length) if encoder_attention_mask is None: encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device) if _use_sdpa: encoder_attention_mask = _prepare_4d_attention_mask_for_sdpa( mask=encoder_attention_mask, dtype=inputs_embeds.dtype, tgt_len=input_shape[-1] ) elif not self._attn_implementation == "flash_attention_2": encoder_attention_mask = self.invert_attention_mask(encoder_attention_mask) else: encoder_attention_mask = None # Prepare head mask if needed # 1.0 in head_mask indicate we keep the head # attention_probs has shape bsz x n_heads x N x N # head_mask has shape n_layer x batch x n_heads x N x N head_mask = self.get_head_mask(head_mask, self.config.n_layer) if token_type_ids is not None: token_type_embeds = self.wte(token_type_ids) hidden_states = hidden_states + token_type_embeds hidden_states = self.drop(hidden_states) output_shape = (-1,) + input_shape[1:] + (hidden_states.size(-1),) if self.gradient_checkpointing and self.training: if use_cache: logger.warning_once( "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..." ) use_cache = False presents = () if use_cache else None all_self_attentions = () if output_attentions else None all_cross_attentions = () if output_attentions and self.config.add_cross_attention else None all_hidden_states = () if output_hidden_states else None for i in range(len(self.h)): block, layer_past = self.h[i], past_key_values[i] # Model parallel if self.model_parallel: torch.cuda.set_device(hidden_states.device) # Ensure layer_past is on same device as hidden_states (might not be correct) if layer_past is not None: layer_past = tuple(past_state.to(hidden_states.device) for past_state in layer_past) # Ensure that attention_mask is always on the same device as hidden_states if attention_mask is not None: attention_mask = attention_mask.to(hidden_states.device) if isinstance(head_mask, torch.Tensor): head_mask = head_mask.to(hidden_states.device) if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) if self.gradient_checkpointing and self.training: outputs = self._gradient_checkpointing_func( block.__call__, hidden_states, None, attention_mask, head_mask[i], encoder_hidden_states, encoder_attention_mask, use_cache, output_attentions, ) else: outputs = block( hidden_states, layer_past=layer_past, attention_mask=attention_mask, head_mask=head_mask[i], encoder_hidden_states=encoder_hidden_states, encoder_attention_mask=encoder_attention_mask, use_cache=use_cache, output_attentions=output_attentions, ) hidden_states = outputs[0] if use_cache is True: presents = presents + (outputs[1],) if output_attentions: all_self_attentions = all_self_attentions + (outputs[2 if use_cache else 1],) if self.config.add_cross_attention: all_cross_attentions = all_cross_attentions + (outputs[3 if use_cache else 2],) # Model Parallel: If it's the last layer for that device, put things on the next device if self.model_parallel: for k, v in self.device_map.items(): if i == v[-1] and "cuda:" + str(k) != self.last_device: hidden_states = hidden_states.to("cuda:" + str(k + 1)) hidden_states = self.ln_f(hidden_states) hidden_states = hidden_states.view(output_shape) # Add last hidden state if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) if not return_dict: return tuple( v for v in [hidden_states, presents, all_hidden_states, all_self_attentions, all_cross_attentions] if v is not None ) return BaseModelOutputWithPastAndCrossAttentions( last_hidden_state=hidden_states, past_key_values=presents, hidden_states=all_hidden_states, attentions=all_self_attentions, cross_attentions=all_cross_attentions, ) class DuoPredictGPT2LMHeadModel(DuoPredictGPT2PretrainedModel, GenerationMixin): _tied_weights_keys = ["lm_head.weight"] def __init__(self, config): super().__init__(config) self.transformer = DuoPredictGPT2Model(config) self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False) # Model parallel self.model_parallel = False self.device_map = None # Initialize weights and apply final processing self.post_init() def parallelize(self, device_map=None): warnings.warn( "`GPT2LMHeadModel.parallelize` is deprecated and will be removed in v5 of Transformers, you should load" " your model with `device_map='balanced'` in the call to `from_pretrained`. You can also provide your own" " `device_map` but it needs to be a dictionary module_name to device, so for instance {'transformer.h.0':" " 0, 'transformer.h.1': 1, ...}", FutureWarning, ) self.device_map = ( get_device_map(len(self.transformer.h), range(torch.cuda.device_count())) if device_map is None else device_map ) assert_device_map(self.device_map, len(self.transformer.h)) self.transformer.parallelize(self.device_map) self.lm_head = self.lm_head.to(self.transformer.first_device) self.model_parallel = True def deparallelize(self): warnings.warn( "Like `parallelize`, `deparallelize` is deprecated and will be removed in v5 of Transformers.", FutureWarning, ) self.transformer.deparallelize() self.transformer = self.transformer.to("cpu") self.lm_head = self.lm_head.to("cpu") self.model_parallel = False torch.cuda.empty_cache() def get_output_embeddings(self): return self.lm_head def set_output_embeddings(self, new_embeddings): self.lm_head = new_embeddings def forward( self, input_ids: Optional[torch.LongTensor] = None, past_key_values: Optional[Tuple[Tuple[torch.Tensor]]] = None, attention_mask: Optional[torch.FloatTensor] = None, token_type_ids: Optional[torch.LongTensor] = None, position_ids: Optional[torch.LongTensor] = None, head_mask: Optional[torch.FloatTensor] = None, inputs_embeds: Optional[torch.FloatTensor] = None, encoder_hidden_states: Optional[torch.Tensor] = None, encoder_attention_mask: Optional[torch.FloatTensor] = None, labels: Optional[torch.LongTensor] = None, use_cache: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, **kwargs, ) -> Union[Tuple, CausalLMOutputWithCrossAttentions]: r""" labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Labels for language modeling. Note that the labels **are shifted** inside the model, i.e. you can set `labels = input_ids` Indices are selected in `[-100, 0, ..., config.vocab_size]` All labels set to `-100` are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]` """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict transformer_outputs = self.transformer( input_ids, past_key_values=past_key_values, attention_mask=attention_mask, token_type_ids=token_type_ids, position_ids=position_ids, head_mask=head_mask, inputs_embeds=inputs_embeds, encoder_hidden_states=encoder_hidden_states, encoder_attention_mask=encoder_attention_mask, use_cache=use_cache, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) hidden_states = transformer_outputs[0] # Set device for model parallelism if self.model_parallel: torch.cuda.set_device(self.transformer.first_device) hidden_states = hidden_states.to(self.lm_head.weight.device) lm_logits = self.lm_head(hidden_states) loss = None bs, seq = lm_logits.shape[:2] if labels is not None: if seq>labels.shape[1]: # Flatten the tokens total_labels = torch.full((bs, seq-1), -100, dtype=input_ids.dtype, device=input_ids.device) total_labels[:, :-1:2] = labels[:, 1: ] total_labels[:, 1::2] = labels[:, :-1] else: total_labels = labels[:, 1:] loss = self.loss_function( lm_logits[:, :-1], total_labels, vocab_size=self.config.vocab_size, **kwargs, ) if not return_dict: output = (lm_logits,) + transformer_outputs[1:] return ((loss,) + output) if loss is not None else output return CausalLMOutputWithCrossAttentions( loss=loss, logits=lm_logits, past_key_values=transformer_outputs.past_key_values, hidden_states=transformer_outputs.hidden_states, attentions=transformer_outputs.attentions, cross_attentions=transformer_outputs.cross_attentions, ) @staticmethod def _reorder_cache( past_key_values: Tuple[Tuple[torch.Tensor]], beam_idx: torch.Tensor ) -> Tuple[Tuple[torch.Tensor]]: """ This function is used to re-order the `past_key_values` cache if [`~PreTrainedModel.beam_search`] or [`~PreTrainedModel.beam_sample`] is called. This is required to match `past_key_values` with the correct beam_idx at every generation step. """ return tuple( tuple(past_state.index_select(0, beam_idx.to(past_state.device)) for past_state in layer_past) for layer_past in past_key_values ) from transformers import AutoConfig, AutoModel AutoConfig.register("duo-predict-gpt2", DuoPredictGPT2Config) AutoModel.register(DuoPredictGPT2Config, DuoPredictGPT2LMHeadModel) __all__ = [ "DuoPredictGPT2LMHeadModel", "DuoPredictGPT2Model", "DuoPredictGPT2Config", "DuoPredictGPT2Attention", "DuoPredictGPT2MLP", "DuoPredictGPT2Block", ] if __name__ == "__main__": cg = DuoPredictGPT2Config() model = DuoPredictGPT2LMHeadModel(cg) from src.utils.model_utlis import print_trainable_parameters print_trainable_parameters(model) model.eval() model(torch.randint(0, 10000, (1, 100))) print()