import time
import datetime

import numpy as np
import torch
# torch.optim.AdamW replaces the AdamW optimizer that older transformers versions exported.
from torch.optim import AdamW
from torch.utils.data import TensorDataset, random_split, DataLoader, RandomSampler, SequentialSampler
from transformers import BertTokenizer, BertForSequenceClassification, get_linear_schedule_with_warmup
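

# BERTClassifier wraps a BertForSequenceClassification model with a 14-label
# classification head. The checkpoint filename written by trainModel suggests a
# GoEmotions-derived emotion dataset, though the class itself is dataset-agnostic.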
class BERTClassifier:
    def __init__(self, model_name="bert-base-uncased", tokenizer_name="bert-base-uncased") -> None:
        print(f'Loading BERT: {model_name}...')
        self.model_name = model_name
        self.tokenizer = BertTokenizer.from_pretrained(tokenizer_name, do_lower_case=True)
        if model_name.startswith('jeevavijay10'):
            # Load an already fine-tuned checkpoint by name.
            self.model = BertForSequenceClassification.from_pretrained(model_name)
        else:
            # Start from the base checkpoint with a fresh 14-class classification head.
            self.model = BertForSequenceClassification.from_pretrained(
                self.model_name,
                num_labels=14,
                output_attentions=False,
                output_hidden_states=False
            )
        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
        self.model.to(self.device)

    def tokenizeText(self, sentence: str):
        encoded_dict = self.tokenizer.encode_plus(
            sentence,
            add_special_tokens=True,
            max_length=64,
            padding='max_length',  # pad_to_max_length is deprecated in recent transformers
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt')
        return encoded_dict['input_ids'], encoded_dict['attention_mask']

    def tokenizeSentences(self, sentences: list, labels: list):
        input_ids = []
        attention_masks = []
        for sent in sentences:
            input_id, attention_mask = self.tokenizeText(sent)
            input_ids.append(input_id)
            attention_masks.append(attention_mask)
        input_ids = torch.cat(input_ids, dim=0)
        attention_masks = torch.cat(attention_masks, dim=0)
        # TensorDataset needs tensors, so convert the label list if necessary.
        labels = torch.as_tensor(labels)
        dataset = TensorDataset(input_ids, attention_masks, labels)
        # Hold out 10% of the data for validation.
        train_size = int(0.9 * len(dataset))
        val_size = len(dataset) - train_size
        return random_split(dataset, [train_size, val_size])

    def flat_accuracy(self, preds, labels):
        pred_flat = np.argmax(preds, axis=1).flatten()
        labels_flat = labels.flatten()
        return np.sum(pred_flat == labels_flat) / len(labels_flat)

    def format_time(self, elapsed):
        # Round to the nearest second.
        elapsed_rounded = int(round(elapsed))
        # Format as hh:mm:ss.
        return str(datetime.timedelta(seconds=elapsed_rounded))

    def trainModel(self, sentences: list, labels: list, epochs=4, batch_size=32):
        optimizer = AdamW(self.model.parameters(), lr=2e-5, eps=1e-8)
        train_dataset, val_dataset = self.tokenizeSentences(sentences, labels)
        train_dataloader = DataLoader(
            train_dataset,
            sampler=RandomSampler(train_dataset),
            batch_size=batch_size
        )
        # Note: the validation loader is built here but not consumed by `train` below.
        validation_dataloader = DataLoader(
            val_dataset,
            sampler=SequentialSampler(val_dataset),
            batch_size=batch_size
        )
        total_steps = len(train_dataloader) * epochs
        # Create the learning rate scheduler.
        scheduler = get_linear_schedule_with_warmup(optimizer,
                                                    num_warmup_steps=0,  # Default value in run_glue.py
                                                    num_training_steps=total_steps)
        self.train(train_dataloader, optimizer, scheduler, epochs)
        torch.save(self.model, f"Bert_GoEmotions_BS{batch_size}_E{epochs}.model")

    def train(self, train_dataloader, optimizer, scheduler, epochs):
        # This training code is based on the `run_glue.py` script here:
        # https://github.com/huggingface/transformers/blob/5bfcd0485ece086ebcbed2d008813037968a9e58/examples/run_glue.py#L128
        # Measure the total training time for the whole run.
        total_t0 = time.time()
        # For each epoch...
        for epoch_i in range(epochs):
            print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
            print('Training...')
            # Measure how long the training epoch takes.
            t0 = time.time()
            # Reset the total loss for this epoch.
            total_train_loss = 0
            # Put the model into training mode. Don't be misled--the call to
            # `train` just changes the *mode*, it doesn't *perform* the training.
            # `dropout` and `batchnorm` layers behave differently during training
            # vs. test (source: https://stackoverflow.com/questions/51433378/what-does-model-train-do-in-pytorch)
            self.model.train()
            # For each batch of training data...
            for step, batch in enumerate(train_dataloader):
                # Progress update every 40 batches.
                if step % 40 == 0 and step != 0:
                    # Calculate elapsed time so far.
                    elapsed = self.format_time(time.time() - t0)
                    # Report progress.
                    print('  Batch {:>5,} of {:>5,}. Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))
                # Unpack this training batch from our dataloader and copy each
                # tensor to the device with the `to` method.
                #
                # `batch` contains three PyTorch tensors:
                #   [0]: input ids
                #   [1]: attention masks
                #   [2]: labels
                b_input_ids = batch[0].to(self.device)
                b_input_mask = batch[1].to(self.device)
                b_labels = batch[2].to(self.device)
                # Always clear any previously calculated gradients before performing a
                # backward pass. PyTorch doesn't do this automatically because
                # accumulating the gradients is "convenient while training RNNs".
                # (source: https://stackoverflow.com/questions/48001598/why-do-we-need-to-call-zero-grad-in-pytorch)
                self.model.zero_grad()
                # Perform a forward pass (evaluate the model on this training batch).
                # The documentation for this `model` function is here:
                # https://huggingface.co/transformers/v2.2.0/model_doc/bert.html#transformers.BertForSequenceClassification
                # It returns different numbers of values depending on which arguments
                # are given and which flags are set. For our usage here, it returns
                # the loss (because we provided labels) and the "logits"--the model
                # outputs prior to activation.
                output = self.model(b_input_ids,
                                    token_type_ids=None,
                                    attention_mask=b_input_mask,
                                    labels=b_labels)
                loss = output.loss
                logits = output.logits
                # Accumulate the training loss over all of the batches so that we can
                # calculate the average loss at the end. `loss` is a Tensor containing a
                # single value; the `.item()` function just returns the Python value
                # from the tensor.
                total_train_loss += loss.item()
                # Perform a backward pass to calculate the gradients.
                loss.backward()
                # Clip the norm of the gradients to 1.0.
                # This helps prevent the "exploding gradients" problem.
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                # Update parameters and take a step using the computed gradient.
                # The optimizer dictates the "update rule"--how the parameters are
                # modified based on their gradients, the learning rate, etc.
                optimizer.step()
                # Update the learning rate.
                scheduler.step()
            # Calculate the average loss over all of the batches.
            avg_train_loss = total_train_loss / len(train_dataloader)
            # Measure how long this epoch took.
            training_time = self.format_time(time.time() - t0)
            print("")
            print("  Average training loss: {0:.2f}".format(avg_train_loss))
            print("  Training epoch took: {:}".format(training_time))
        print("")
        print("Training complete!")
        print("Total training took {:} (h:mm:ss)".format(self.format_time(time.time() - total_t0)))

    def evaluate(self, sentences: list):
        input_ids = []
        attention_masks = []
        for sent in sentences:
            input_id, attention_mask = self.tokenizeText(sent)
            input_ids.append(input_id)
            attention_masks.append(attention_mask)
        input_ids = torch.cat(input_ids, dim=0)
        attention_masks = torch.cat(attention_masks, dim=0)
        # Dummy labels so the batches have the same structure as in training.
        labels = torch.zeros(len(sentences))
        batch_size = 32
        prediction_data = TensorDataset(input_ids, attention_masks, labels)
        prediction_sampler = SequentialSampler(prediction_data)
        prediction_dataloader = DataLoader(prediction_data, sampler=prediction_sampler, batch_size=batch_size)
        self.model.eval()
        predictions = []
        for batch in prediction_dataloader:
            batch = tuple(t.to(self.device) for t in batch)
            b_input_ids, b_input_mask, _ = batch
            with torch.no_grad():
                outputs = self.model(b_input_ids, token_type_ids=None,
                                     attention_mask=b_input_mask)
            logits = outputs.logits.detach().cpu().numpy()
            predictions.append(logits)
        # Concatenate the per-batch logits so sentences beyond the first batch are
        # covered, then take the argmax of each row as the predicted class index.
        predictions = np.concatenate(predictions, axis=0)
        return [int(row.argmax()) for row in predictions]
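

# A minimal usage sketch (not part of the original Space code): the sentences and
# integer labels below are made-up placeholders; each label must be an integer
# class index in the range 0..13 to match num_labels=14 above.
if __name__ == "__main__":
    classifier = BERTClassifier()

    # Toy training data; a real run would use the full labelled dataset.
    train_sentences = ["I love this!", "This is terrible.", "Not sure how I feel."]
    train_labels = [0, 1, 2]

    # Fine-tune briefly and save the checkpoint produced by trainModel.
    classifier.trainModel(train_sentences, train_labels, epochs=1, batch_size=2)

    # Predict class indices for unseen sentences.
    print(classifier.evaluate(["What a wonderful day!", "This is awful."]))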