File size: 9,758 Bytes
058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 eedbf0c 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 eedbf0c 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 eedbf0c 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 eedbf0c 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b 058f1d9 1e9eb0b eedbf0c 058f1d9 1e9eb0b eedbf0c 058f1d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 |
import evaluate
import numpy as np
from uvicorn.config import logger
from datasets import load_dataset
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
Trainer,
TrainingArguments,
pipeline,
)
from huggingface_hub import login, logout
from scipy.special import softmax
import os
import mlflow
from tasks.inference import infer_task
from config import is_test_mode
import time
# MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"
DATASET = "zeroshot/twitter-financial-news-sentiment"
MODEL = "gpicciuca/sentiment_trainer"
HF_REPO = "gpicciuca/sentiment_trainer"
RNG_SEED = 22
class TrainingTask:
"""
Implements a sequence of actions to control the training phase of the model.
The class implements a callable overload method which initializes the old model,
loads and prepares datasets and proceeds with the training.
Upon completion, the new model will be uploaded to the HuggingFace repo only
if its accuracy did not drop compared to the old model.
This class is managed via singleton so that there may only be one
instance at any time, unless manually allocated.
"""
TRAINING_TASK_INST_SINGLETON = None
def __init__(self):
self.__is_done = False
self.__has_error = False
self.__train_dataset = None
self.__test_dataset = None
self.__tokenizer = None
self.__train_tokenized = None
self.__test_tokenized = None
self.__model = None
self.__trainer = None
self.__run_id = None
self.__old_accuracy = 0.0
@staticmethod
def has_instance():
"""
Checks if a global singleton instance is available
Returns:
bool: True if instance available, false otherwise
"""
return TrainingTask.TRAINING_TASK_INST_SINGLETON is not None
@staticmethod
def get_instance():
"""
Returns the globally allocated singleton instance.
Instance will be allocated with this method if none was previously
allocated yet.
Returns:
TrainingTask: Singleton instance
"""
if TrainingTask.TRAINING_TASK_INST_SINGLETON is None:
TrainingTask.TRAINING_TASK_INST_SINGLETON = TrainingTask()
return TrainingTask.TRAINING_TASK_INST_SINGLETON
@staticmethod
def clear_instance():
"""
Destroys the global instance
"""
del TrainingTask.TRAINING_TASK_INST_SINGLETON
TrainingTask.TRAINING_TASK_INST_SINGLETON = None
def has_error(self):
"""
Checks whether an error occurred during training.
Returns:
bool: True if an exception was raised, false otherwise
"""
return self.__has_error
def is_done(self):
"""
Checks whether the training is done.
Returns:
bool: True if done, false if still ongoing.
"""
return self.__is_done
def __call__(self, *args, **kwds):
"""
Callable overload for this class. Initiates the training sequence
for the existing model by loading it, loading and preparing datasets,
fine-tuning and comparing performance against old model over the test dataset.
"""
self.__has_error = False
self.__is_done = False
if is_test_mode():
# Simulate a successful training run in test mode
self.__has_error = False
self.__is_done = True
return
login(token=os.environ["HF_ACCESS_TOKEN"])
try:
self.__load_datasets()
self.__tokenize()
self.__load_model()
self.__check_old_accuracy()
self.__train()
self.__evaluate()
self.__deploy()
except Exception as ex:
logger.error(f"Error during training: {ex}")
self.__has_error = True
finally:
self.__is_done = True
if self.has_error():
logger.error("Training did not complete and terminated with an error")
else:
logger.info("Training completed")
logout()
self.__reload_inference_model()
def __load_datasets(self, test_size_ratio=0.2):
"""
Loads and splits the dataset in train and test sets.
"""
assert (test_size_ratio > 0.0 and test_size_ratio < 1.0)
dataset = load_dataset(DATASET)
# Split train/test by 'test_size_ratio'
dataset_train_test = dataset["train"].train_test_split(test_size=test_size_ratio)
self.__train_dataset = dataset_train_test["train"]
self.__test_dataset = dataset_train_test["test"]
# Swap labels so that they match what the model actually expects
# The model expects {0: negative, 1: neutral, 2: positive}
# But the dataset uses {0: negative, 1: positive, 2: neutral}
# So here we just flip 1<->2 to remain consistent
def label_filter(row):
row["label"] = { 0: 0, 1: 2, 2: 1 }[row["label"]]
return row
self.__train_dataset = self.__train_dataset.map(label_filter)
self.__test_dataset = self.__test_dataset.map(label_filter)
def __tokenize(self):
"""
Loads the tokenizer previously used in the pretrained model
and uses it to tokenize the datasets so that the input to the
model remains consistent with what it has seen in previous
trainings.
"""
# Load the tokenizer for the model.
self.__tokenizer = AutoTokenizer.from_pretrained(MODEL)
def tokenize_function(examples):
# Pad/truncate each text to 512 tokens. Enforcing the same shape
# could make the training faster.
return self.__tokenizer(
examples["text"],
padding="max_length",
truncation=True,
max_length=256,
)
# Tokenize the train and test datasets
self.__train_tokenized = self.__train_dataset.map(tokenize_function)
self.__train_tokenized = self.__train_tokenized.remove_columns(["text"]).shuffle(seed=RNG_SEED)
self.__test_tokenized = self.__test_dataset.map(tokenize_function)
self.__test_tokenized = self.__test_tokenized.remove_columns(["text"]).shuffle(seed=RNG_SEED)
def __load_model(self):
"""
Loads the model from the repository
"""
# Set the mapping between int label and its meaning.
id2label = {0: "negative", 1: "neutral", 2: "positive"}
label2id = {"negative": 0, "neutral": 1, "positive": 2}
# Acquire the model from the Hugging Face Hub, providing label and id mappings so that both we and the model can 'speak' the same language.
self.__model = AutoModelForSequenceClassification.from_pretrained(
MODEL,
num_labels=3,
label2id=label2id,
id2label=id2label,
)
def __check_old_accuracy(self):
"""
Run a prediction with the old model on the tokenized test dataset
to evaluate the model's accuracy.
"""
trainer = Trainer(model=self.__model, tokenizer=self.__tokenizer)
output = trainer.predict(self.__test_tokenized)
# Get logits from the prediction output.
logits = output.predictions
# Convert logits to predicted class labels.
preds = np.argmax(logits, axis=1)
# Get the true labels.
labels = output.label_ids
# Compute accuracy.
self.__old_accuracy = (preds == labels).mean()
logger.info(f"Old model accuracy: {self.__old_accuracy:.4f}")
def __train(self):
"""
Performs the training/fine-tuning of the loaded model using the
tokenized train and test datasets.
The training run will be logged on the MLFlow Dashboard.
Uses the 'accuracy' metric to evaluate performance.
"""
# Define the target optimization metric
metric = evaluate.load("accuracy")
# Define a function for calculating our defined target optimization metric during training
def compute_metrics(eval_pred):
logits, labels = eval_pred
predictions = np.argmax(logits, axis=-1)
return metric.compute(predictions=predictions, references=labels)
# Checkpoints will be output to this `training_output_dir`.
training_output_dir = "/tmp/sentiment_trainer"
training_args = TrainingArguments(
output_dir=training_output_dir,
eval_strategy="epoch",
per_device_train_batch_size=8,
per_device_eval_batch_size=8,
logging_steps=8,
num_train_epochs=10,
)
mlflow.set_tracking_uri(os.environ["MLFLOW_ENDPOINT"])
mlflow.set_experiment("Sentiment Classifier Training")
with mlflow.start_run() as run:
self.__run_id = run.info.run_id
logger.info("Initializing trainer...")
self.__trainer = Trainer(
model=self.__model,
args=training_args,
train_dataset=self.__train_tokenized,
eval_dataset=self.__test_tokenized,
compute_metrics=compute_metrics,
)
logger.info("Trainer finished")
def __evaluate(self):
"""
Evaluates the fine-tuned model's performance by comparing the new
accuracy with the old one over the same test dataset.
"""
logger.info("Evaluating new model's performance")
with mlflow.start_run(run_id=self.__run_id):
output = self.__trainer.predict(self.__test_tokenized)
# Get logits from the prediction output.
logits = output.predictions
# Convert logits to predicted class labels.
preds = np.argmax(logits, axis=1)
# Get the true labels.
labels = output.label_ids
# Compute accuracy.
new_accuracy = (preds == labels).mean()
mlflow.log_metrics({
"old_accuracy": self.__old_accuracy,
"new_accuracy": new_accuracy
}, step=int(time.time()))
if self.__old_accuracy > new_accuracy:
raise Exception(f"New trained model's accuracy dropped {self.__old_accuracy:.9f} -> {new_accuracy:.9f}")
else:
logger.info(f"New trained model's accuracy {self.__old_accuracy:.9f} -> {new_accuracy:.9f}")
def __deploy(self):
"""
Uploads the fine-tuned model to HuggingFace
"""
logger.info("Deploying Model and Tokenizer to HuggingFace")
self.__trainer.push_to_hub(HF_REPO)
self.__tokenizer.push_to_hub(HF_REPO)
def __reload_inference_model(self):
"""
Reloads the model used by the Inference class.
"""
logger.info("Reloading inference model")
infer_task.load_model()
|