import evaluate
import numpy as np
from uvicorn.config import logger
from datasets import load_dataset
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
    pipeline,
)
from huggingface_hub import login, logout
from scipy.special import softmax

import os
import mlflow
from tasks.inference import infer_task
from config import is_test_mode
import time


DATASET = "zeroshot/twitter-financial-news-sentiment"
MODEL = "gpicciuca/sentiment_trainer"
HF_REPO = "gpicciuca/sentiment_trainer"

RNG_SEED = 22


class TrainingTask:
    """
    Implements the sequence of actions that controls the model's training phase.

    The class is callable: invoking an instance loads the existing model,
    loads and prepares the datasets, and runs the fine-tuning.
    Upon completion, the new model is uploaded to the HuggingFace repo only
    if its accuracy did not drop compared to the old model.

    The class is managed as a singleton, so only one instance exists at any
    time unless further instances are allocated manually.
    """

    TRAINING_TASK_INST_SINGLETON = None

    def __init__(self):
        self.__is_done = False
        self.__has_error = False

        self.__train_dataset = None
        self.__test_dataset = None
        self.__tokenizer = None
        self.__train_tokenized = None
        self.__test_tokenized = None
        self.__model = None
        self.__trainer = None
        self.__run_id = None

        self.__old_accuracy = 0.0

    @staticmethod
    def has_instance():
        """
        Checks whether a global singleton instance is available.

        Returns:
            bool: True if an instance is available, False otherwise
        """
        return TrainingTask.TRAINING_TASK_INST_SINGLETON is not None

    @staticmethod
    def get_instance():
        """
        Returns the globally allocated singleton instance.
        The instance is allocated by this method if none has been
        allocated yet.

        Returns:
            TrainingTask: Singleton instance
        """
        if TrainingTask.TRAINING_TASK_INST_SINGLETON is None:
            TrainingTask.TRAINING_TASK_INST_SINGLETON = TrainingTask()

        return TrainingTask.TRAINING_TASK_INST_SINGLETON

    @staticmethod
    def clear_instance():
        """
        Destroys the global instance.
        """
        del TrainingTask.TRAINING_TASK_INST_SINGLETON
        TrainingTask.TRAINING_TASK_INST_SINGLETON = None

    def has_error(self):
        """
        Checks whether an error occurred during training.

        Returns:
            bool: True if an exception was raised, False otherwise
        """
        return self.__has_error

    def is_done(self):
        """
        Checks whether the training is done.

        Returns:
            bool: True if done, False if still ongoing.
        """
        return self.__is_done

    def __call__(self, *args, **kwds):
        """
        Callable overload for this class. Initiates the training sequence
        for the existing model by loading it, loading and preparing the datasets,
        fine-tuning and comparing performance against the old model on the test dataset.
        """
        self.__has_error = False
        self.__is_done = False

        if is_test_mode():
            # Skip the actual training sequence when running in test mode.
            self.__has_error = False
            self.__is_done = True
            return

        login(token=os.environ["HF_ACCESS_TOKEN"])

        try:
            self.__load_datasets()
            self.__tokenize()
            self.__load_model()
            self.__check_old_accuracy()
            self.__train()
            self.__evaluate()
            self.__deploy()
        except Exception as ex:
            logger.error(f"Error during training: {ex}")
            self.__has_error = True
        finally:
            self.__is_done = True

        if self.has_error():
            logger.error("Training did not complete and terminated with an error")
        else:
            logger.info("Training completed")

        logout()

        self.__reload_inference_model()

    def __load_datasets(self, test_size_ratio=0.2):
        """
        Loads the dataset and splits it into train and test sets.
        """
        assert 0.0 < test_size_ratio < 1.0

        dataset = load_dataset(DATASET)

        dataset_train_test = dataset["train"].train_test_split(test_size=test_size_ratio)
        self.__train_dataset = dataset_train_test["train"]
        self.__test_dataset = dataset_train_test["test"]
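
        # Assumption: the source dataset ships its labels as 0=bearish, 1=bullish,
        # 2=neutral, while the model is configured with 0=negative, 1=neutral,
        # 2=positive (see __load_model), so the labels are remapped accordingly below.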
        def label_filter(row):
            row["label"] = {0: 0, 1: 2, 2: 1}[row["label"]]
            return row

        self.__train_dataset = self.__train_dataset.map(label_filter)
        self.__test_dataset = self.__test_dataset.map(label_filter)

    def __tokenize(self):
        """
        Loads the tokenizer previously used by the pretrained model
        and uses it to tokenize the datasets, so that the input to the
        model stays consistent with what it has seen in previous
        training runs.
        """
        self.__tokenizer = AutoTokenizer.from_pretrained(MODEL)

        def tokenize_function(examples):
            return self.__tokenizer(
                examples["text"],
                padding="max_length",
                truncation=True,
                max_length=256,
            )

        self.__train_tokenized = self.__train_dataset.map(tokenize_function)
        self.__train_tokenized = self.__train_tokenized.remove_columns(["text"]).shuffle(seed=RNG_SEED)

        self.__test_tokenized = self.__test_dataset.map(tokenize_function)
        self.__test_tokenized = self.__test_tokenized.remove_columns(["text"]).shuffle(seed=RNG_SEED)

    def __load_model(self):
        """
        Loads the model from the repository.
        """
        id2label = {0: "negative", 1: "neutral", 2: "positive"}
        label2id = {"negative": 0, "neutral": 1, "positive": 2}

        self.__model = AutoModelForSequenceClassification.from_pretrained(
            MODEL,
            num_labels=3,
            label2id=label2id,
            id2label=id2label,
        )

    def __check_old_accuracy(self):
        """
        Runs a prediction with the old model on the tokenized test dataset
        to establish the model's baseline accuracy.
        """
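        # Note: a Trainer built without explicit TrainingArguments is used here only
        # for batched prediction; it falls back to transformers' default arguments.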
        trainer = Trainer(model=self.__model, tokenizer=self.__tokenizer)
        output = trainer.predict(self.__test_tokenized)

        logits = output.predictions
        preds = np.argmax(logits, axis=1)
        labels = output.label_ids

        self.__old_accuracy = (preds == labels).mean()
        logger.info(f"Old model accuracy: {self.__old_accuracy:.4f}")

    def __train(self):
        """
        Performs the training/fine-tuning of the loaded model using the
        tokenized train and test datasets.
        The training run will be logged on the MLflow dashboard.
        Uses the 'accuracy' metric to evaluate performance.
        """
        metric = evaluate.load("accuracy")

        def compute_metrics(eval_pred):
            logits, labels = eval_pred
            predictions = np.argmax(logits, axis=-1)
            return metric.compute(predictions=predictions, references=labels)

        training_output_dir = "/tmp/sentiment_trainer"
        training_args = TrainingArguments(
            output_dir=training_output_dir,
            eval_strategy="epoch",
            per_device_train_batch_size=8,
            per_device_eval_batch_size=8,
            logging_steps=8,
            num_train_epochs=10,
        )

        mlflow.set_tracking_uri(os.environ["MLFLOW_ENDPOINT"])
        mlflow.set_experiment("Sentiment Classifier Training")
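
        # Open an MLflow run and remember its id so that __evaluate() can log the
        # old/new accuracy comparison to the same run later on.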
        with mlflow.start_run() as run:
            self.__run_id = run.info.run_id

            logger.info("Initializing trainer...")
            self.__trainer = Trainer(
                model=self.__model,
                args=training_args,
                train_dataset=self.__train_tokenized,
                eval_dataset=self.__test_tokenized,
                compute_metrics=compute_metrics,
            )
            self.__trainer.train()
            logger.info("Trainer finished")

    def __evaluate(self):
        """
        Evaluates the fine-tuned model's performance by comparing the new
        accuracy with the old one over the same test dataset.
        """
        logger.info("Evaluating new model's performance")

        with mlflow.start_run(run_id=self.__run_id):
            output = self.__trainer.predict(self.__test_tokenized)

            logits = output.predictions
            preds = np.argmax(logits, axis=1)
            labels = output.label_ids

            new_accuracy = (preds == labels).mean()
            mlflow.log_metrics({
                "old_accuracy": self.__old_accuracy,
                "new_accuracy": new_accuracy,
            }, step=int(time.time()))

            if self.__old_accuracy > new_accuracy:
                raise Exception(f"New trained model's accuracy dropped {self.__old_accuracy:.9f} -> {new_accuracy:.9f}")
            else:
                logger.info(f"New trained model's accuracy {self.__old_accuracy:.9f} -> {new_accuracy:.9f}")

    def __deploy(self):
        """
        Uploads the fine-tuned model to HuggingFace.
        """
        logger.info("Deploying Model and Tokenizer to HuggingFace")
        self.__trainer.push_to_hub(HF_REPO)
        self.__tokenizer.push_to_hub(HF_REPO)

    def __reload_inference_model(self):
        """
        Reloads the model used by the Inference class.
        """
        logger.info("Reloading inference model")
        infer_task.load_model()
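

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only, not part of the service wiring):
    # the task is normally fetched through the singleton accessor and invoked as a
    # callable; callers can poll is_done()/has_error() while it runs elsewhere.
    # Assumes HF_ACCESS_TOKEN and MLFLOW_ENDPOINT are set in the environment.
    task = TrainingTask.get_instance()
    task()
    if task.has_error():
        logger.error("Training task failed")
    else:
        logger.info("Training task finished successfully")
    TrainingTask.clear_instance()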