from constants import *  # expected to provide IMAGE_TOKEN_INDEX
import torch
import torch.nn.functional as F

device = 'cuda' if torch.cuda.is_available() else 'cpu'
def tokenizer_image_token(prompt, tokenizer, image_token_index=IMAGE_TOKEN_INDEX, return_tensors=None):
    """Tokenize a prompt, replacing each '<image>' placeholder with image_token_index."""
    prompt_chunks = [tokenizer(chunk).input_ids for chunk in prompt.split('<image>')]

    def insert_separator(X, sep):
        # Interleave sep between the chunks: [a, b, c] -> [a, sep, b, sep, c]
        return [ele for sublist in zip(X, [sep] * len(X)) for ele in sublist][:-1]

    input_ids = []
    offset = 0
    # If the tokenizer prepends a BOS token to every chunk, keep a single one at the start
    if len(prompt_chunks) > 0 and len(prompt_chunks[0]) > 0 and prompt_chunks[0][0] == tokenizer.bos_token_id:
        offset = 1
        input_ids.append(prompt_chunks[0][0])
    for x in insert_separator(prompt_chunks, [image_token_index] * (offset + 1)):
        input_ids.extend(x[offset:])
    if return_tensors is not None:
        if return_tensors == 'pt':
            return torch.tensor(input_ids, dtype=torch.long)
        raise ValueError(f'Unsupported tensor type: {return_tensors}')
    return input_ids
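
# Example usage (a sketch; the model name is illustrative, not taken from this repo):
#   from transformers import AutoTokenizer
#   tok = AutoTokenizer.from_pretrained("microsoft/phi-2")
#   ids = tokenizer_image_token("Describe this image.\n<image>", tok, return_tensors='pt')
#   # every '<image>' in the prompt is replaced by IMAGE_TOKEN_INDEX in the id sequence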
def top_k_top_p_filtering(logits, top_k=0, top_p=0.0, filter_value=-float('Inf')):
    """ Filter a distribution of logits using top-k and/or nucleus (top-p) filtering
        Args:
            logits: logits distribution shape (batch size x vocabulary size)
            top_k > 0: keep only top k tokens with highest probability (top-k filtering).
            top_p > 0.0: keep the top tokens with cumulative probability >= top_p (nucleus filtering).
    """
    top_k = min(top_k, logits.size(-1))  # Safety check
    if top_k > 0:
        # Remove all tokens with a probability less than the last token of the top-k
        indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None]
        logits[indices_to_remove] = filter_value
    if top_p > 0.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
        # Remove tokens with cumulative probability above the threshold
        sorted_indices_to_remove = cumulative_probs > top_p
        # Shift the indices to the right to keep also the first token above the threshold
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = 0
        # Scatter the sorted mask back to the original vocabulary order; indexing logits
        # with sorted_indices directly only works for 1-D logits and breaks on batched input
        indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove)
        logits[indices_to_remove] = filter_value
    return logits
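
# Example (a sketch): keep the 50 most likely tokens within the 0.95 nucleus, then sample
#   next_token_logits = logits[:, -1, :]
#   filtered = top_k_top_p_filtering(next_token_logits, top_k=50, top_p=0.95)
#   next_id = torch.multinomial(F.softmax(filtered, dim=-1), num_samples=1)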
'''
# Unused leftover (references `self`, so it only runs as a method of the dataset/model class):
def get_image_feature_for_vision_projector(image_url):
    image_url = 'http://images.cocodataset.org/%s/%s' % (self.directory, self.image_indices_json[image_index])
    image = Image.open(requests.get(image_url, stream=True).raw)
    inputs = self.processor(images=image, return_tensors="pt")
    x = self.model(**inputs, output_hidden_states=True)
    image_feature = x.hidden_states[-2][:, 1:].squeeze(0).cpu().detach()
'''
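
# A self-contained sketch of the feature extraction above, assuming the vision tower is
# a CLIP ViT (the checkpoint name and helper are illustrative, not this repo's API):
#
#   from PIL import Image
#   import requests
#   from transformers import CLIPVisionModel, CLIPImageProcessor
#
#   vision_tower = CLIPVisionModel.from_pretrained("openai/clip-vit-base-patch32")
#   processor = CLIPImageProcessor.from_pretrained("openai/clip-vit-base-patch32")
#
#   def get_image_feature(image_url):
#       image = Image.open(requests.get(image_url, stream=True).raw)
#       inputs = processor(images=image, return_tensors="pt")
#       out = vision_tower(**inputs, output_hidden_states=True)
#       # penultimate hidden layer with the CLS token dropped -> (num_patches, hidden_dim)
#       return out.hidden_states[-2][:, 1:].squeeze(0).detach().cpu()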
def generate_output(model, tokenizer, length, input_ids=None, image_features=None, inputs_embeds=None, labels=None,
                    temperature=1, top_k=0, top_p=0.0):
    # Only the precomputed inputs_embeds path is implemented; image_features/input_ids
    # are accepted for interface compatibility but are not assembled into embeddings here.
    if inputs_embeds is None and (image_features is None or input_ids is None):
        print("image_features or input_ids missing.. returning")
        return
    ie_size = inputs_embeds.size(1) - 1
    inputs = inputs_embeds
    predicted_tokens = []
    out = {}
    if labels is None:
        # Free-running generation: feed each sampled token back in as the next input
        with torch.no_grad():
            for idx in range(length):
                outputs = model.phi_model(inputs_embeds=inputs)
                logits = outputs['logits']
                next_token_logits = logits[:, -1, :] / temperature  # Apply temperature
                filtered_logits = top_k_top_p_filtering(next_token_logits, top_k=top_k,
                                                        top_p=top_p)  # Apply top-k and/or top-p
                next_token = torch.multinomial(F.softmax(filtered_logits, dim=-1), num_samples=1)  # Sample
                predicted_tokens.append(next_token)
                next_token_embed = model.text_embedding(next_token)
                inputs = torch.cat((inputs, next_token_embed), dim=1)
        predicted_tokens = torch.cat(predicted_tokens, dim=1)  # (batch, length)
        out['pred'] = predicted_tokens
        out['logits'] = logits[:, ie_size:, :]
        return out
    else:
        # Teacher forcing: sample each step for reporting, but feed the ground-truth
        # label token back in as the next input; assumes length <= labels.size(1)
        label_size = labels.size(1)
        for idx in range(length):
            outputs = model.phi_model(inputs_embeds=inputs)
            logits = outputs['logits']
            next_token_logits = logits[:, -1, :] / temperature  # Apply temperature
            filtered_logits = top_k_top_p_filtering(next_token_logits, top_k=top_k,
                                                    top_p=top_p)  # Apply top-k and/or top-p
            next_token = torch.multinomial(F.softmax(filtered_logits, dim=-1), num_samples=1)  # Sample
            predicted_tokens.append(next_token)
            tf_token = labels[:, idx:idx + 1].to(device)
            tf_token_embed = model.text_embedding(tf_token)
            inputs = torch.cat((inputs, tf_token_embed), dim=1)  # Append the label token, not the sample
        predicted_tokens = torch.cat(predicted_tokens, dim=1).to(device)  # (batch, length)
        out = dict(pred=predicted_tokens, logits=logits)
        labels = labels.contiguous().long().to(device)
        # Align the logits that predict the label positions with the labels themselves
        logits = logits[:, ie_size:ie_size + label_size, :].contiguous()
        loss = model.loss(logits.view(-1, logits.size(-1)), labels.view(-1))
        out['loss'] = loss
        return out
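
# Example usage (a sketch; assumes `model` exposes .phi_model, .text_embedding and .loss,
# and that `embeds` already holds the projected image features followed by the prompt):
#   out = generate_output(model, tokenizer, length=64, inputs_embeds=embeds,
#                         temperature=0.8, top_k=50, top_p=0.9)
#   text = tokenizer.decode(out['pred'][0], skip_special_tokens=True)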
def generate_with_logits(logits, temperature=1, top_k=0, top_p=0.0):
    # Sample one token per position from a precomputed (batch, seq_len, vocab) logits tensor
    predicted_tokens = []
    for idx in range(logits.size(1)):
        next_token_logits = logits[:, idx, :] / temperature  # Apply temperature
        filtered_logits = top_k_top_p_filtering(next_token_logits, top_k=top_k,
                                                top_p=top_p)  # Apply top-k and/or top-p
        next_token = torch.multinomial(F.softmax(filtered_logits, dim=-1), num_samples=1)  # Sample
        predicted_tokens.append(next_token)
    predicted_tokens = torch.cat(predicted_tokens, dim=1).to(device)  # (batch, seq_len)
    out = dict(pred=predicted_tokens, logits=logits)
    return out
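

if __name__ == "__main__":
    # Smoke test for the sampling utilities (a sketch; no model weights required).
    dummy_logits = torch.randn(1, 5, 8)  # (batch, seq_len, vocab)
    out = generate_with_logits(dummy_logits, temperature=0.8, top_k=3, top_p=0.9)
    print(out['pred'].shape)  # torch.Size([1, 5]) sampled token ids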