pmkhanh7890's picture
1st
22e1b62
raw
history blame
4.46 kB
# %%
import traceback
from typing import List
import numpy as np
import torch
import torch.nn as nn
from transformers import (
BartForConditionalGeneration,
BartTokenizer,
)
class BARTScorer:
    """Score target texts by their likelihood under a BART seq2seq model.

    A score is the negated per-token negative log-likelihood of a target
    given its source, averaged over the target's non-padding tokens. Scores
    are therefore <= 0, and values closer to zero mean the model finds the
    target more likely.
    """

    def __init__(
        self,
        device="cuda:0",
        max_length=1024,
        checkpoint="facebook/bart-large-cnn",
    ):
        """Load the tokenizer and model and set up the token-level loss.

        Args:
            device: Torch device the model and input tensors are moved to.
            max_length: Maximum tokenized length; longer texts are truncated.
            checkpoint: Name or path handed to ``from_pretrained``.
        """
        # Set up model
        self.device = device
        self.max_length = max_length
        self.tokenizer = BartTokenizer.from_pretrained(checkpoint)
        self.model = BartForConditionalGeneration.from_pretrained(checkpoint)
        self.model.eval()
        self.model.to(device)

        # Set up loss: keep one loss value per token (no reduction) and skip
        # padding, so padded positions add nothing to a sequence's sum.
        self.loss_fct = nn.NLLLoss(
            reduction="none",
            ignore_index=self.model.config.pad_token_id,
        )
        # Logits are flattened to (tokens, vocab) before the softmax, so the
        # vocabulary axis is dim 1.
        self.lsm = nn.LogSoftmax(dim=1)

    def load(self, path=None):
        """Load model weights from a paraphrase-finetuned state dict.

        Args:
            path: State-dict file to load; defaults to ``"./bart.pth"``.
        """
        if path is None:
            path = "./bart.pth"
        # NOTE(security): torch.load unpickles the file, which can execute
        # arbitrary code. Only load checkpoints from a trusted source.
        self.model.load_state_dict(torch.load(path, map_location=self.device))

    def score(self, srcs, tgts, batch_size=16):
        """Score each target against the source at the same position.

        Args:
            srcs: Source texts.
            tgts: Target texts, one per source.
            batch_size: Number of pairs sent through the model at once.

        Returns:
            A list of floats, one per pair, in input order.

        Raises:
            ValueError: If ``srcs`` and ``tgts`` differ in length.
            RuntimeError: Re-raised from the model after the failing batch
                has been printed.
        """
        if len(srcs) != len(tgts):
            raise ValueError(
                "srcs and tgts must have the same length, "
                f"got {len(srcs)} and {len(tgts)}.",
            )

        score_list = []
        for i in range(0, len(srcs), batch_size):
            src_list = srcs[i : i + batch_size]
            tgt_list = tgts[i : i + batch_size]
            try:
                with torch.no_grad():
                    encoded_src = self.tokenizer(
                        src_list,
                        max_length=self.max_length,
                        truncation=True,
                        padding=True,
                        return_tensors="pt",
                    )
                    encoded_tgt = self.tokenizer(
                        tgt_list,
                        max_length=self.max_length,
                        truncation=True,
                        padding=True,
                        return_tensors="pt",
                    )
                    src_tokens = encoded_src["input_ids"].to(self.device)
                    src_mask = encoded_src["attention_mask"].to(self.device)
                    tgt_tokens = encoded_tgt["input_ids"].to(self.device)
                    tgt_mask = encoded_tgt["attention_mask"]
                    # Attended tokens per target; used to average the loss.
                    tgt_len = tgt_mask.sum(dim=1).to(self.device)

                    output = self.model(
                        input_ids=src_tokens,
                        attention_mask=src_mask,
                        labels=tgt_tokens,
                    )
                    # Flatten to (batch * seq, vocab) for the token-level loss.
                    logits = output.logits.view(
                        -1,
                        self.model.config.vocab_size,
                    )
                    loss = self.loss_fct(self.lsm(logits), tgt_tokens.view(-1))
                    # Back to (batch, seq), then average over real tokens.
                    loss = loss.view(tgt_tokens.shape[0], -1)
                    loss = loss.sum(dim=1) / tgt_len
                    # Negate so that a higher score means a likelier target.
                    score_list += [-x.item() for x in loss]
            except RuntimeError:
                # Show the offending batch, then propagate the error instead
                # of terminating the host process with a success exit code.
                traceback.print_exc()
                print(f"source: {src_list}")
                print(f"target: {tgt_list}")
                raise
        return score_list

    def multi_ref_score(
        self,
        srcs,
        tgts: List[List[str]],
        agg="mean",
        batch_size=4,
    ):
        """Score sources against several references and aggregate per sample.

        Args:
            srcs: Source texts.
            tgts: For each source, its list of reference texts. Every sample
                must have the same number of references.
            agg: ``"mean"`` or ``"max"`` over the references of a sample.
            batch_size: Batch size forwarded to ``score``.

        Returns:
            A list with one aggregated score per sample; empty when ``tgts``
            is empty.

        Raises:
            ValueError: If samples differ in reference count or have none.
            NotImplementedError: If ``agg`` is not a supported aggregation.
        """
        # Assert we have the same number of references
        ref_nums = [len(x) for x in tgts]
        if len(set(ref_nums)) > 1:
            raise ValueError(
                "You have different number of references per test sample.",
            )

        # Reject an unknown aggregation before any model work is done.
        aggregators = {"mean": np.mean, "max": np.max}
        if agg not in aggregators:
            raise NotImplementedError(f"Unsupported aggregation: {agg!r}")

        if not tgts:
            return []

        ref_num = ref_nums[0]
        if ref_num == 0:
            raise ValueError("Each test sample needs at least one reference.")

        # One row per reference index, one column per sample.
        score_matrix = []
        for i in range(ref_num):
            curr_tgts = [x[i] for x in tgts]
            score_matrix.append(self.score(srcs, curr_tgts, batch_size))

        return list(aggregators[agg](score_matrix, axis=0))

    def test(self, batch_size=3):
        """Print the scores of three hard-coded source/target pairs."""
        src_list = [
            "This is a very good idea. Although simple, but very insightful.",
            "Can I take a look?",
            "Do not trust him, he is a liar.",
        ]
        tgt_list = [
            "That's stupid.",
            "What's the problem?",
            "He is trustworthy.",
        ]
        print(self.score(src_list, tgt_list, batch_size))