from constants import * import torch import torch.nn.functional as F device = 'cuda' if torch.cuda.is_available() else 'cpu' def tokenizer_image_token(prompt, tokenizer, image_token_index=IMAGE_TOKEN_INDEX, return_tensors=None): prompt_chunks = [tokenizer(chunk).input_ids for chunk in prompt.split('')] def insert_separator(X, sep): return [ele for sublist in zip(X, [sep]*len(X)) for ele in sublist][:-1] input_ids = [] offset = 0 if len(prompt_chunks) > 0 and len(prompt_chunks[0]) > 0 and prompt_chunks[0][0] == tokenizer.bos_token_id: offset = 1 input_ids.append(prompt_chunks[0][0]) for x in insert_separator(prompt_chunks, [image_token_index] * (offset + 1)): input_ids.extend(x[offset:]) if return_tensors is not None: if return_tensors == 'pt': return torch.tensor(input_ids, dtype=torch.long) raise ValueError(f'Unsupported tensor type: {return_tensors}') return input_ids def top_k_top_p_filtering(logits, top_k=0, top_p=0.0, filter_value=-float('Inf')): """ Filter a distribution of logits using top-k and/or nucleus (top-p) filtering Args: logits: logits distribution shape (batch size x vocabulary size) top_k > 0: keep only top k tokens with highest probability (top-k filtering). top_p > 0.0: keep the top tokens with cumulative probability >= top_p (nucleus filtering). """ top_k = min(top_k, logits.size(-1)) # Safety check if top_k > 0: # Remove all tokens with a probability less than the last token of the top-k indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None] logits[indices_to_remove] = filter_value if top_p > 0.0: sorted_logits, sorted_indices = torch.sort(logits, descending=True) cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1) # Remove tokens with cumulative probability above the threshold sorted_indices_to_remove = cumulative_probs > top_p # Shift the indices to the right to keep also the first token above the threshold sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone() sorted_indices_to_remove[..., 0] = 0 indices_to_remove = sorted_indices[sorted_indices_to_remove] logits[indices_to_remove] = filter_value return logits ''' def get_image_feature_for_vision_projector(image_url): image_url = 'http://images.cocodataset.org/%s/%s' % (self.directory, self.image_indices_json[image_index]) image = Image.open(requests.get(image_url, stream=True).raw) inputs = self.processor(images=image, return_tensors="pt") x = self.model(**inputs, output_hidden_states=True) image_feature = x.hidden_states[-2][:, 1:].squeeze(0).cpu().detach() ''' def generate_output(model, tokenizer, length, input_ids=None, image_features=None, inputs_embeds=None, labels=None, temperature=1, top_k=0, top_p=0.0): if inputs_embeds is None and (image_features is None or input_ids is None): print("image_features or input_ids missing.. returning") return ie_size = inputs_embeds.size(1) - 1 inputs = inputs_embeds predicted_tokens = [] #torch.tensor([[]]).to(device) label_size = labels.size(1) out = {} if labels is None: with torch.no_grad(): for idx in range(length): outputs = model.phi_model(inputs_embeds=inputs) logits = outputs['logits'] next_token_logits = logits[:, -1, :] / temperature # Apply temperature filtered_logits = top_k_top_p_filtering(next_token_logits, top_k=top_k, top_p=top_p) # Apply top-k and/or top-p next_token = torch.multinomial(F.softmax(filtered_logits, dim=-1), num_samples=1) # Sample predicted_tokens.append(next_token) next_token_embed = model.text_embedding(next_token) inputs = torch.cat((inputs, next_token_embed), dim=1) predicted_tokens = torch.cat([x.unsqueeze(1) for x in predicted_tokens], dim=1) out['pred'] = predicted_tokens out['logits'] = logits[:, ie_size:, :] return out else: # traverse_len = labels.size(1) - inputs_embeds.size(1) for idx in range(length): outputs = model.phi_model(inputs_embeds=inputs) logits = outputs['logits'] next_token_logits = logits[:, -1, :] / temperature # Apply temperature filtered_logits = top_k_top_p_filtering(next_token_logits, top_k=top_k, top_p=top_p) # Apply top-k and/or top-p next_token = torch.multinomial(F.softmax(filtered_logits, dim=-1), num_samples=1) # Sample predicted_tokens.append(next_token) tf_token = labels[:, idx : idx+1 ].to(device) tf_token_embed = model.text_embedding(tf_token) inputs = torch.cat((inputs, tf_token_embed), dim=1) # Add the token to the generated text predicted_tokens = torch.cat([x.unsqueeze(1) for x in predicted_tokens], dim=1).to(device) #predicted_token_logits = torch.cat([x.unsqueeze(1) for x in predicted_token_logits], dim=1).to(device) out = dict(pred=predicted_tokens, logits=logits) labels = labels.contiguous().type(torch.LongTensor).to(device) logits = logits[:, ie_size:ie_size+label_size, :].contiguous() loss = model.loss(logits.view(-1, logits.size(-1)), labels.view(-1)) out['loss'] = loss #model.train() return out def generate_with_logits(logits, temperature=1, top_k=0, top_p=0.0): predicted_tokens = [] for idx in range(logits.size(1)): next_token_logits = logits[:, idx, :] / temperature # Apply temperature filtered_logits = top_k_top_p_filtering(next_token_logits, top_k=top_k, top_p=top_p) # Apply top-k and/or top-p next_token = torch.multinomial(F.softmax(filtered_logits, dim=-1), num_samples=1) # Sample predicted_tokens.append(next_token) predicted_tokens = torch.cat([x.unsqueeze(1) for x in predicted_tokens], dim=1).to(device) out = dict(pred=predicted_tokens, logits=logits) return out