ml_deploy / app /tasks /training.py
gpicciuca's picture
Fix logs and model training and deployment
eedbf0c
import evaluate
import numpy as np
from uvicorn.config import logger
from datasets import load_dataset
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
Trainer,
TrainingArguments,
pipeline,
)
from huggingface_hub import login, logout
from scipy.special import softmax
import os
import mlflow
from tasks.inference import infer_task
from config import is_test_mode
import time
# MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"
DATASET = "zeroshot/twitter-financial-news-sentiment"
MODEL = "gpicciuca/sentiment_trainer"
HF_REPO = "gpicciuca/sentiment_trainer"
RNG_SEED = 22
class TrainingTask:
"""
Implements a sequence of actions to control the training phase of the model.
The class implements a callable overload method which initializes the old model,
loads and prepares datasets and proceeds with the training.
Upon completion, the new model will be uploaded to the HuggingFace repo only
if its accuracy did not drop compared to the old model.
This class is managed via singleton so that there may only be one
instance at any time, unless manually allocated.
"""
TRAINING_TASK_INST_SINGLETON = None
def __init__(self):
self.__is_done = False
self.__has_error = False
self.__train_dataset = None
self.__test_dataset = None
self.__tokenizer = None
self.__train_tokenized = None
self.__test_tokenized = None
self.__model = None
self.__trainer = None
self.__run_id = None
self.__old_accuracy = 0.0
@staticmethod
def has_instance():
"""
Checks if a global singleton instance is available
Returns:
bool: True if instance available, false otherwise
"""
return TrainingTask.TRAINING_TASK_INST_SINGLETON is not None
@staticmethod
def get_instance():
"""
Returns the globally allocated singleton instance.
Instance will be allocated with this method if none was previously
allocated yet.
Returns:
TrainingTask: Singleton instance
"""
if TrainingTask.TRAINING_TASK_INST_SINGLETON is None:
TrainingTask.TRAINING_TASK_INST_SINGLETON = TrainingTask()
return TrainingTask.TRAINING_TASK_INST_SINGLETON
@staticmethod
def clear_instance():
"""
Destroys the global instance
"""
del TrainingTask.TRAINING_TASK_INST_SINGLETON
TrainingTask.TRAINING_TASK_INST_SINGLETON = None
def has_error(self):
"""
Checks whether an error occurred during training.
Returns:
bool: True if an exception was raised, false otherwise
"""
return self.__has_error
def is_done(self):
"""
Checks whether the training is done.
Returns:
bool: True if done, false if still ongoing.
"""
return self.__is_done
def __call__(self, *args, **kwds):
"""
Callable overload for this class. Initiates the training sequence
for the existing model by loading it, loading and preparing datasets,
fine-tuning and comparing performance against old model over the test dataset.
"""
self.__has_error = False
self.__is_done = False
if is_test_mode():
# Simulate a successful training run in test mode
self.__has_error = False
self.__is_done = True
return
login(token=os.environ["HF_ACCESS_TOKEN"])
try:
self.__load_datasets()
self.__tokenize()
self.__load_model()
self.__check_old_accuracy()
self.__train()
self.__evaluate()
self.__deploy()
except Exception as ex:
logger.error(f"Error during training: {ex}")
self.__has_error = True
finally:
self.__is_done = True
if self.has_error():
logger.error("Training did not complete and terminated with an error")
else:
logger.info("Training completed")
logout()
self.__reload_inference_model()
def __load_datasets(self, test_size_ratio=0.2):
"""
Loads and splits the dataset in train and test sets.
"""
assert (test_size_ratio > 0.0 and test_size_ratio < 1.0)
dataset = load_dataset(DATASET)
# Split train/test by 'test_size_ratio'
dataset_train_test = dataset["train"].train_test_split(test_size=test_size_ratio)
self.__train_dataset = dataset_train_test["train"]
self.__test_dataset = dataset_train_test["test"]
# Swap labels so that they match what the model actually expects
# The model expects {0: negative, 1: neutral, 2: positive}
# But the dataset uses {0: negative, 1: positive, 2: neutral}
# So here we just flip 1<->2 to remain consistent
def label_filter(row):
row["label"] = { 0: 0, 1: 2, 2: 1 }[row["label"]]
return row
self.__train_dataset = self.__train_dataset.map(label_filter)
self.__test_dataset = self.__test_dataset.map(label_filter)
def __tokenize(self):
"""
Loads the tokenizer previously used in the pretrained model
and uses it to tokenize the datasets so that the input to the
model remains consistent with what it has seen in previous
trainings.
"""
# Load the tokenizer for the model.
self.__tokenizer = AutoTokenizer.from_pretrained(MODEL)
def tokenize_function(examples):
# Pad/truncate each text to 512 tokens. Enforcing the same shape
# could make the training faster.
return self.__tokenizer(
examples["text"],
padding="max_length",
truncation=True,
max_length=256,
)
# Tokenize the train and test datasets
self.__train_tokenized = self.__train_dataset.map(tokenize_function)
self.__train_tokenized = self.__train_tokenized.remove_columns(["text"]).shuffle(seed=RNG_SEED)
self.__test_tokenized = self.__test_dataset.map(tokenize_function)
self.__test_tokenized = self.__test_tokenized.remove_columns(["text"]).shuffle(seed=RNG_SEED)
def __load_model(self):
"""
Loads the model from the repository
"""
# Set the mapping between int label and its meaning.
id2label = {0: "negative", 1: "neutral", 2: "positive"}
label2id = {"negative": 0, "neutral": 1, "positive": 2}
# Acquire the model from the Hugging Face Hub, providing label and id mappings so that both we and the model can 'speak' the same language.
self.__model = AutoModelForSequenceClassification.from_pretrained(
MODEL,
num_labels=3,
label2id=label2id,
id2label=id2label,
)
def __check_old_accuracy(self):
"""
Run a prediction with the old model on the tokenized test dataset
to evaluate the model's accuracy.
"""
trainer = Trainer(model=self.__model, tokenizer=self.__tokenizer)
output = trainer.predict(self.__test_tokenized)
# Get logits from the prediction output.
logits = output.predictions
# Convert logits to predicted class labels.
preds = np.argmax(logits, axis=1)
# Get the true labels.
labels = output.label_ids
# Compute accuracy.
self.__old_accuracy = (preds == labels).mean()
logger.info(f"Old model accuracy: {self.__old_accuracy:.4f}")
def __train(self):
"""
Performs the training/fine-tuning of the loaded model using the
tokenized train and test datasets.
The training run will be logged on the MLFlow Dashboard.
Uses the 'accuracy' metric to evaluate performance.
"""
# Define the target optimization metric
metric = evaluate.load("accuracy")
# Define a function for calculating our defined target optimization metric during training
def compute_metrics(eval_pred):
logits, labels = eval_pred
predictions = np.argmax(logits, axis=-1)
return metric.compute(predictions=predictions, references=labels)
# Checkpoints will be output to this `training_output_dir`.
training_output_dir = "/tmp/sentiment_trainer"
training_args = TrainingArguments(
output_dir=training_output_dir,
eval_strategy="epoch",
per_device_train_batch_size=8,
per_device_eval_batch_size=8,
logging_steps=8,
num_train_epochs=10,
)
mlflow.set_tracking_uri(os.environ["MLFLOW_ENDPOINT"])
mlflow.set_experiment("Sentiment Classifier Training")
with mlflow.start_run() as run:
self.__run_id = run.info.run_id
logger.info("Initializing trainer...")
self.__trainer = Trainer(
model=self.__model,
args=training_args,
train_dataset=self.__train_tokenized,
eval_dataset=self.__test_tokenized,
compute_metrics=compute_metrics,
)
logger.info("Trainer finished")
def __evaluate(self):
"""
Evaluates the fine-tuned model's performance by comparing the new
accuracy with the old one over the same test dataset.
"""
logger.info("Evaluating new model's performance")
with mlflow.start_run(run_id=self.__run_id):
output = self.__trainer.predict(self.__test_tokenized)
# Get logits from the prediction output.
logits = output.predictions
# Convert logits to predicted class labels.
preds = np.argmax(logits, axis=1)
# Get the true labels.
labels = output.label_ids
# Compute accuracy.
new_accuracy = (preds == labels).mean()
mlflow.log_metrics({
"old_accuracy": self.__old_accuracy,
"new_accuracy": new_accuracy
}, step=int(time.time()))
if self.__old_accuracy > new_accuracy:
raise Exception(f"New trained model's accuracy dropped {self.__old_accuracy:.9f} -> {new_accuracy:.9f}")
else:
logger.info(f"New trained model's accuracy {self.__old_accuracy:.9f} -> {new_accuracy:.9f}")
def __deploy(self):
"""
Uploads the fine-tuned model to HuggingFace
"""
logger.info("Deploying Model and Tokenizer to HuggingFace")
self.__trainer.push_to_hub(HF_REPO)
self.__tokenizer.push_to_hub(HF_REPO)
def __reload_inference_model(self):
"""
Reloads the model used by the Inference class.
"""
logger.info("Reloading inference model")
infer_task.load_model()