File size: 9,758 Bytes
058f1d9
 
 
 
 
 
 
 
 
 
 
 
1e9eb0b
058f1d9
 
 
 
 
1e9eb0b
058f1d9
 
1e9eb0b
058f1d9
eedbf0c
 
058f1d9
 
 
 
 
1e9eb0b
 
 
 
 
 
 
 
 
 
 
058f1d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e9eb0b
 
058f1d9
 
1e9eb0b
 
 
 
 
 
058f1d9
 
 
 
1e9eb0b
 
 
 
 
 
 
 
058f1d9
 
 
 
 
 
 
1e9eb0b
 
 
058f1d9
 
 
 
1e9eb0b
 
 
 
 
 
058f1d9
 
 
1e9eb0b
 
 
 
 
 
058f1d9
 
 
1e9eb0b
 
 
 
 
 
058f1d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e9eb0b
058f1d9
 
 
 
 
 
 
 
 
1e9eb0b
 
 
 
 
058f1d9
 
 
 
1e9eb0b
 
 
 
 
 
058f1d9
 
1e9eb0b
 
058f1d9
 
 
 
eedbf0c
 
058f1d9
 
 
 
 
 
 
 
 
1e9eb0b
 
 
 
 
 
058f1d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e9eb0b
 
 
058f1d9
eedbf0c
 
058f1d9
 
 
 
 
 
 
 
 
1e9eb0b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
058f1d9
1e9eb0b
 
 
 
 
 
058f1d9
 
 
 
 
 
 
 
 
 
eedbf0c
058f1d9
 
 
 
 
 
1e9eb0b
058f1d9
 
 
 
 
 
 
 
1e9eb0b
 
 
 
 
 
 
058f1d9
1e9eb0b
058f1d9
1e9eb0b
 
 
 
 
 
058f1d9
1e9eb0b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
058f1d9
1e9eb0b
 
 
eedbf0c
 
 
058f1d9
 
1e9eb0b
 
 
eedbf0c
058f1d9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import evaluate
import numpy as np
from uvicorn.config import logger
from datasets import load_dataset
from transformers import (
	AutoModelForSequenceClassification,
	AutoTokenizer,
	Trainer,
	TrainingArguments,
	pipeline,
)
from huggingface_hub import login, logout
from scipy.special import softmax

import os
import mlflow
from tasks.inference import infer_task
from config import is_test_mode
import time


# NOTE(review): presumably the original base checkpoint, kept for reference — confirm before removing.
# MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"
# Identifier of the dataset handed to load_dataset() for training and testing.
DATASET = "zeroshot/twitter-financial-news-sentiment"
# Checkpoint from which both the tokenizer and the classifier are loaded.
MODEL = "gpicciuca/sentiment_trainer"
# Repository id used when pushing the tokenizer to the HuggingFace Hub.
HF_REPO = "gpicciuca/sentiment_trainer"

# Fixed seed passed to the dataset shuffling so its order is reproducible.
RNG_SEED = 22

class TrainingTask:

	"""

	Implements a sequence of actions to control the training phase of the model.

	The class implements a callable overload method which initializes the old model,

	loads and prepares datasets and proceeds with the training.

	Upon completion, the new model will be uploaded to the HuggingFace repo only

	if its accuracy did not drop compared to the old model.



	This class is managed via singleton so that there may only be one

	instance at any time, unless manually allocated.

	"""

	TRAINING_TASK_INST_SINGLETON = None
	
	def __init__(self):
		self.__is_done = False
		self.__has_error = False

		self.__train_dataset = None
		self.__test_dataset = None
		self.__tokenizer = None
		self.__train_tokenized = None
		self.__test_tokenized = None
		self.__model = None
		self.__trainer = None
		self.__run_id = None

		self.__old_accuracy = 0.0

	@staticmethod
	def has_instance():
		"""

		Checks if a global singleton instance is available



		Returns:

			bool: True if instance available, false otherwise

		"""
		return TrainingTask.TRAINING_TASK_INST_SINGLETON is not None

	@staticmethod
	def get_instance():
		"""

		Returns the globally allocated singleton instance.

		Instance will be allocated with this method if none was previously

		allocated yet.



		Returns:

			TrainingTask: Singleton instance

		"""
		if TrainingTask.TRAINING_TASK_INST_SINGLETON is None:
			TrainingTask.TRAINING_TASK_INST_SINGLETON = TrainingTask()
		
		return TrainingTask.TRAINING_TASK_INST_SINGLETON

	@staticmethod
	def clear_instance():
		"""

		Destroys the global instance

		"""
		del TrainingTask.TRAINING_TASK_INST_SINGLETON
		TrainingTask.TRAINING_TASK_INST_SINGLETON = None

	def has_error(self):
		"""

		Checks whether an error occurred during training.



		Returns:

			bool: True if an exception was raised, false otherwise

		"""
		return self.__has_error

	def is_done(self):
		"""

		Checks whether the training is done.



		Returns:

			bool: True if done, false if still ongoing.

		"""
		return self.__is_done

	def __call__(self, *args, **kwds):
		"""
		Runs the complete training sequence.

		The steps are: log in to HuggingFace, load and tokenize the datasets,
		load the existing model, measure its accuracy, fine-tune it, evaluate
		the result and deploy it. An exception raised by any of these steps
		(including a missing HF_ACCESS_TOKEN or a failed login) is logged
		with its traceback and recorded in the error flag instead of being
		propagated, and the done flag is always set once the steps have ended
		so that callers polling is_done() never wait forever.
		Afterwards the HuggingFace session is closed and the inference model
		is reloaded.

		In test mode nothing is trained and the task is immediately reported
		as successfully done.

		Args:
			*args: Ignored.
			**kwds: Ignored.
		"""

		self.__has_error = False
		self.__is_done = False

		if is_test_mode():
			# Simulate a successful training run in test mode
			self.__is_done = True
			return

		# Only log out again if the login below actually succeeded
		logged_in = False

		try:
			login(token=os.environ["HF_ACCESS_TOKEN"])
			logged_in = True

			self.__load_datasets()
			self.__tokenize()
			self.__load_model()
			self.__check_old_accuracy()
			self.__train()
			self.__evaluate()
			self.__deploy()
		except Exception as ex:
			# logger.exception keeps the traceback, which logger.error dropped
			logger.exception(f"Error during training: {ex}")
			self.__has_error = True
		finally:
			self.__is_done = True

		if self.has_error():
			logger.error("Training did not complete and terminated with an error")
		else:
			logger.info("Training completed")

		if logged_in:
			logout()

		self.__reload_inference_model()

	def __load_datasets(self, test_size_ratio=0.2):
		"""

		Loads and splits the dataset in train and test sets.

		"""
		assert (test_size_ratio > 0.0 and test_size_ratio < 1.0)

		dataset = load_dataset(DATASET)

		# Split train/test by 'test_size_ratio'
		dataset_train_test = dataset["train"].train_test_split(test_size=test_size_ratio)
		self.__train_dataset = dataset_train_test["train"]
		self.__test_dataset = dataset_train_test["test"]

		# Swap labels so that they match what the model actually expects
		# The model expects {0: negative, 1: neutral, 2: positive}
		# But the dataset uses {0: negative, 1: positive, 2: neutral}
		# So here we just flip 1<->2 to remain consistent
		def label_filter(row):
			row["label"] = { 0: 0, 1: 2, 2: 1 }[row["label"]]
			return row

		self.__train_dataset = self.__train_dataset.map(label_filter)
		self.__test_dataset = self.__test_dataset.map(label_filter)

	def __tokenize(self):
		"""
		Loads the tokenizer that belongs to the pretrained model and applies
		it to the train and test datasets, so that the model keeps receiving
		its input in the same form as in previous trainings.
		"""
		# Load the tokenizer for the model.
		self.__tokenizer = AutoTokenizer.from_pretrained(MODEL)

		def encode(row):
			# Pad/truncate each text to 256 tokens so that all inputs
			# share the same shape.
			return self.__tokenizer(
				row["text"],
				max_length=256,
				truncation=True,
				padding="max_length",
			)

		def prepare(split):
			# Tokenize, drop the raw text column and shuffle reproducibly
			encoded = split.map(encode)
			without_text = encoded.remove_columns(["text"])
			return without_text.shuffle(seed=RNG_SEED)

		self.__train_tokenized = prepare(self.__train_dataset)
		self.__test_tokenized = prepare(self.__test_dataset)

	def __load_model(self):
		"""
		Loads the sequence classification model from the repository,
		configured with the three sentiment labels.
		"""
		# Meaning of each integer label; the reverse mapping is derived from it
		id2label = {0: "negative", 1: "neutral", 2: "positive"}
		label2id = {name: index for index, name in id2label.items()}

		# Passing both mappings lets us and the model 'speak' the same language
		self.__model = AutoModelForSequenceClassification.from_pretrained(
			MODEL,
			num_labels=len(id2label),
			id2label=id2label,
			label2id=label2id,
		)

	def __check_old_accuracy(self):
		"""
		Predicts the tokenized test dataset with the model as it was loaded
		(before fine-tuning) and stores the resulting accuracy as baseline.
		"""
		baseline_trainer = Trainer(model=self.__model, tokenizer=self.__tokenizer)
		prediction = baseline_trainer.predict(self.__test_tokenized)

		# The class with the highest logit is the predicted label
		predicted_classes = np.argmax(prediction.predictions, axis=1)

		# Accuracy is the fraction of predictions equal to the true labels
		hits = predicted_classes == prediction.label_ids
		self.__old_accuracy = hits.mean()
		logger.info(f"Old model accuracy: {self.__old_accuracy:.4f}")

	def __train(self):
		"""
		Performs the training/fine-tuning of the loaded model using the
		tokenized train dataset, evaluating on the tokenized test dataset
		after every epoch with the 'accuracy' metric.

		The training happens inside a new MLFlow run whose id is stored so
		that the evaluation step can attach its metrics to the same run.

		Raises:
			KeyError: If the MLFLOW_ENDPOINT environment variable is not set.
		"""
		# Define the target optimization metric
		metric = evaluate.load("accuracy")

		# Define a function for calculating our defined target optimization metric during training
		def compute_metrics(eval_pred):
			logits, labels = eval_pred
			predictions = np.argmax(logits, axis=-1)
			return metric.compute(predictions=predictions, references=labels)

		# Checkpoints will be output to this `training_output_dir`.
		training_output_dir = "/tmp/sentiment_trainer"
		training_args = TrainingArguments(
			output_dir=training_output_dir,
			eval_strategy="epoch",
			per_device_train_batch_size=8,
			per_device_eval_batch_size=8,
			logging_steps=8,
			num_train_epochs=10,
			# Name the target repository explicitly instead of letting the
			# trainer derive it from the output directory name when pushing.
			hub_model_id=HF_REPO,
		)

		mlflow.set_tracking_uri(os.environ["MLFLOW_ENDPOINT"])
		mlflow.set_experiment("Sentiment Classifier Training")

		with mlflow.start_run() as run:
			self.__run_id = run.info.run_id

			logger.info("Initializing trainer...")
			self.__trainer = Trainer(
				model=self.__model,
				args=training_args,
				train_dataset=self.__train_tokenized,
				eval_dataset=self.__test_tokenized,
				compute_metrics=compute_metrics,
			)

			# Run the fine-tuning itself; constructing the Trainer alone
			# does not train anything.
			self.__trainer.train()
			logger.info("Trainer finished")

	def __evaluate(self):
		"""
		Measures the accuracy of the fine-tuned model over the test dataset,
		logs it to MLFlow next to the old accuracy and compares the two.

		Raises:
			Exception: If the old accuracy is higher than the new one.
		"""
		logger.info("Evaluating new model's performance")

		# Re-open the run created during training so the metrics land there
		with mlflow.start_run(run_id=self.__run_id):
			prediction = self.__trainer.predict(self.__test_tokenized)

			# The class with the highest logit is the predicted label
			predicted_classes = np.argmax(prediction.predictions, axis=1)

			# Accuracy is the fraction of predictions equal to the true labels
			new_accuracy = (predicted_classes == prediction.label_ids).mean()

			accuracies = {
				"old_accuracy": self.__old_accuracy,
				"new_accuracy": new_accuracy,
			}
			mlflow.log_metrics(accuracies, step=int(time.time()))

			# A drop in accuracy aborts the sequence before deployment
			if self.__old_accuracy > new_accuracy:
				raise Exception(f"New trained model's accuracy dropped {self.__old_accuracy:.9f} -> {new_accuracy:.9f}")

			logger.info(f"New trained model's accuracy {self.__old_accuracy:.9f} -> {new_accuracy:.9f}")
	
	def __deploy(self):
		"""
		Uploads the fine-tuned model and its tokenizer to HuggingFace.

		Trainer.push_to_hub() takes a commit message as its first parameter,
		not a repository id: the trainer uploads to the repository configured
		through its training arguments. The tokenizer's push_to_hub() does
		take the repository id.
		"""
		logger.info("Deploying Model and Tokenizer to HuggingFace")
		self.__trainer.push_to_hub(commit_message="Upload fine-tuned sentiment model")
		self.__tokenizer.push_to_hub(HF_REPO)

	def __reload_inference_model(self):
		"""
		Asks the inference task to load its model again.
		"""
		logger.info("Reloading inference model")
		reload_model = infer_task.load_model
		reload_model()