{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "provenance": [], "machine_shape": "hm", "gpuType": "A100" }, "kernelspec": { "name": "python3", "display_name": "Python 3" }, "language_info": { "name": "python" }, "accelerator": "GPU" }, "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "9gYFoxi68eer" }, "outputs": [], "source": [ "!pip install datasets transformers" ] }, { "cell_type": "code", "source": [ "import pandas as pd\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "import os\n", "import math\n", "import time\n", "from tqdm.notebook import trange, tqdm\n", "\n", "import torch\n", "import torch.nn as nn\n", "from torch import optim\n", "from torch.utils.data import DataLoader\n", "from torch import Tensor\n", "from torch.utils.data.dataset import Dataset\n", "import torch.nn.functional as F\n", "from torch.distributions import Categorical\n", "from torch.cuda.amp import autocast, GradScaler\n", "\n", "from datasets import load_dataset\n", "from transformers import AutoTokenizer\n", "\n", "torch.backends.cuda.matmul.allow_tf32 = True\n", "device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n", "device" ], "metadata": { "id": "rhkTsyBn8j_m" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "train_dataset = load_dataset(\"sst5\", split=\"train\")\n", "test_dataset = load_dataset(\"sst5\", split=\"test\")\n", "\n", "print(f\"Length of train dataset: {len(train_dataset)}\")\n", "print(f\"Length of test dataset: {len(test_dataset)}\")" ], "metadata": { "id": "c0wKEehd8lfH" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "train_dataset[1][\"text\"], train_dataset[1][\"label\"]" ], "metadata": { "id": "Oj6qWm8H8uYK" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "tokenizer = AutoTokenizer.from_pretrained(\"bert-base-uncased\")" ], "metadata": { "id": "7wbogVwT8ulJ" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "len(tokenizer.vocab)" ], "metadata": { "id": "tPDFZ3xK8wZb" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "tokenizer.vocab_size" ], "metadata": { "id": "EY2TbtGZ8xdA" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "print(\"[PAD] token id:\", tokenizer.pad_token_id) # 0\n", "print(\"[CLS] token id:\", tokenizer.cls_token_id) # 101\n", "print(\"[SEP] token id:\", tokenizer.sep_token_id) # 102" ], "metadata": { "id": "1_Wq4KEj81lb" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "class SST5Dataset(Dataset):\n", " def __init__(self, dataset, tokenizer, max_length=128):\n", " self.dataset = dataset\n", " self.tokenizer = tokenizer\n", " self.max_length = max_length\n", "\n", " def __len__(self):\n", " return len(self.dataset)\n", "\n", " def __getitem__(self, idx):\n", " sample = self.dataset[idx]\n", " text = sample[\"text\"]\n", " label = torch.tensor(sample[\"label\"])\n", "\n", " encoded_text = self.tokenizer(\n", " text,\n", " truncation=True,\n", " padding=\"max_length\",\n", " max_length=self.max_length,\n", " return_tensors=\"pt\"\n", " )\n", "\n", " # Remove the extra batch dimension for each item in the encoded dictionary.\n", " encoded_text = {key: val.squeeze(dim=0) for key, val in encoded_text.items()}\n", "\n", " return {\n", " \"text\": encoded_text,\n", " \"label\": label\n", " }\n", "\n", "train_dataset = SST5Dataset(dataset=train_dataset,\n", " 
{ "cell_type": "code", "source": [ "batch_size = 128\n", "num_workers = os.cpu_count()\n", "\n", "train_dataloader = DataLoader(train_dataset,\n", " batch_size=batch_size,\n", " shuffle=True,\n", " num_workers=num_workers,\n", " pin_memory=True)\n", "\n", "test_dataloader = DataLoader(test_dataset,\n", " batch_size=batch_size,\n", " shuffle=False,\n", " num_workers=num_workers,\n", " pin_memory=True)" ], "metadata": { "id": "ItktnvlfApqz" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "test_items = next(iter(train_dataloader))\n", "print(tokenizer.decode(test_items[\"text\"][\"input_ids\"][0]))" ], "metadata": { "id": "KrroXe5aAtzs" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "class EmbeddingLayer(nn.Module):\n", " def __init__(self,\n", " vocab_size: int,\n", " d_model: int = 768):\n", " super().__init__()\n", "\n", " self.d_model = d_model\n", "\n", " self.lut = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model) # (vocab_size, d_model)\n", "\n", " def forward(self, x):\n", " # x shape: (batch_size, seq_len)\n", " # Scale the embeddings by sqrt(d_model), as in the original Transformer paper.\n", " return self.lut(x) * math.sqrt(self.d_model) # (batch_size, seq_len, d_model)" ], "metadata": { "id": "el4Tnb37AvO7" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "class PositionalEncoding(nn.Module):\n", " def __init__(self,\n", " d_model: int = 768,\n", " dropout: float = 0.1,\n", " max_length: int = 128):\n", " super().__init__()\n", "\n", " self.dropout = nn.Dropout(p=dropout)\n", "\n", " pe = torch.zeros(max_length, d_model) # (max_length, d_model)\n", " # Create position column\n", " k = torch.arange(0, max_length).unsqueeze(dim=1) # (max_length, 1)\n", "\n", " # Compute the 1/10000^(2i/d_model) frequency term in log space for numerical stability\n", " div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)) # (d_model / 2)\n", "\n", " # Use sine for the even indices and cosine for the odd indices\n", " pe[:, 0::2] = torch.sin(k * div_term)\n", " pe[:, 1::2] = torch.cos(k * div_term)\n", "\n", " pe = pe.unsqueeze(dim=0) # Add the batch dimension: (1, max_length, d_model)\n", "\n", " # We use a buffer because the positional encoding is fixed, not a model parameter that we want to be updated during backpropagation.\n", " self.register_buffer(\"pe\", pe) # Buffers are saved with the model state and are moved to the correct device\n", "\n", " def forward(self, x):\n", " # x shape: (batch_size, seq_length, d_model)\n", " # Add out-of-place rather than with += so autograd does not see an in-place op on the embedding output.\n", " x = x + self.pe[:, :x.size(1)]\n", " return self.dropout(x)" ], "metadata": { "id": "Qk0sNjc7A6sZ" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "class MultiHeadAttention(nn.Module):\n", " def __init__(self,\n", " d_model: int = 768,\n", " n_heads: int = 8,\n", " dropout: float = 0.1):\n", " super().__init__()\n", " assert d_model % n_heads == 0, \"d_model must be divisible by n_heads\"\n", "\n", " self.d_model = d_model\n", " self.n_heads = n_heads\n", " self.d_key = d_model // n_heads\n", "\n", " self.Wq = nn.Linear(in_features=d_model, out_features=d_model)\n", " self.Wk = nn.Linear(in_features=d_model, out_features=d_model)\n", " self.Wv = nn.Linear(in_features=d_model, out_features=d_model)\n", " self.Wo = nn.Linear(in_features=d_model, out_features=d_model)\n", "\n", " self.dropout = nn.Dropout(p=dropout)\n", "\n", 
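"# forward() below implements scaled dot-product attention,\n", "# Attention(Q, K, V) = softmax(Q K^T / sqrt(d_key)) V,\n", "# evaluated for all heads at once by reshaping (batch_size, seq_len, d_model)\n", "# into (batch_size, n_heads, seq_len, d_key).\n", 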
" def forward(self,\n", " query: Tensor,\n", " key: Tensor,\n", " value: Tensor,\n", " mask: Tensor = None):\n", " # input shape: (batch_size, seq_len, d_model)\n", "\n", " batch_size = key.size(0)\n", "\n", " Q = self.Wq(query)\n", " K = self.Wk(key)\n", " V = self.Wv(value)\n", "\n", " Q = Q.view(batch_size, -1, self.n_heads, self.d_key).permute(0, 2, 1, 3) # (batch_size, n_heads, q_length, d_key)\n", " K = K.view(batch_size, -1, self.n_heads, self.d_key).permute(0, 2, 1, 3) # (batch_size, n_heads, k_length, d_key)\n", " V = V.view(batch_size, -1, self.n_heads, self.d_key).permute(0, 2, 1, 3) # (batch_size, n_heads, v_length, d_key)\n", "\n", " scaled_dot_product = torch.matmul(Q, K.permute(0, 1, 3, 2)) / math.sqrt(self.d_key) # (batch_size, n_heads, q_length, k_length)\n", "\n", " if mask is not None:\n", " scaled_dot_product = scaled_dot_product.masked_fill(mask == 0, float('-inf'))\n", "\n", " attention_probs = torch.softmax(scaled_dot_product, dim=-1)\n", "\n", " A = torch.matmul(self.dropout(attention_probs), V) # (batch_size, n_heads, q_length, d_key)\n", "\n", " A = A.permute(0, 2, 1, 3) # (batch_size, q_length, n_heads, d_key)\n", " A = A.contiguous().view(batch_size, -1, self.n_heads * self.d_key) # (batch_size, q_length, d_model)\n", "\n", " output = self.Wo(A) # (batch_size, q_length, d_model)\n", "\n", " return output, attention_probs" ], "metadata": { "id": "8ugM9m7rA9zL" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "class PositionwiseFeedForward(nn.Module):\n", " def __init__(self,\n", " d_model: int = 768,\n", " dropout: float = 0.1):\n", " super().__init__()\n", "\n", " self.ffn = nn.Sequential(\n", " nn.Linear(in_features=d_model, out_features=(d_model * 4)),\n", " nn.ReLU(),\n", " nn.Linear(in_features=(d_model * 4), out_features=d_model),\n", " nn.Dropout(p=dropout)\n", " )\n", "\n", " def forward(self, x):\n", " # x shape: (batch_size, q_length, d_model)\n", " return self.ffn(x) # (batch_size, q_length, d_model)" ], "metadata": { "id": "kqQGZf6rA_KL" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "class EncoderLayer(nn.Module):\n", " def __init__(self,\n", " d_model: int = 768,\n", " n_heads: int = 8,\n", " dropout: float = 0.1):\n", " super().__init__()\n", "\n", " self.attention = MultiHeadAttention(d_model=d_model, n_heads=n_heads, dropout=dropout)\n", " self.attention_layer_norm = nn.LayerNorm(d_model)\n", "\n", " self.position_wise_ffn = PositionwiseFeedForward(d_model=d_model, dropout=dropout)\n", " self.ffn_layer_norm = nn.LayerNorm(d_model)\n", "\n", " self.dropout = nn.Dropout(p=dropout)\n", "\n", " def forward(self,\n", " src: Tensor,\n", " src_mask: Tensor):\n", " _src, attention_probs = self.attention(query=src, key=src, value=src, mask=src_mask)\n", " src = self.attention_layer_norm(src + self.dropout(_src))\n", "\n", " _src = self.position_wise_ffn(src)\n", " src = self.ffn_layer_norm(src + self.dropout(_src))\n", "\n", " return src, attention_probs" ], "metadata": { "id": "_jypLBCiBDb-" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "class Encoder(nn.Module):\n", " def __init__(self,\n", " d_model: int = 768,\n", " n_layers: int = 3,\n", " n_heads: int = 8,\n", " dropout: float = 0.1):\n", " super().__init__()\n", "\n", " self.layers = nn.ModuleList([EncoderLayer(d_model=d_model, n_heads=n_heads, dropout=dropout) for layer in range(n_layers)])\n", " self.dropout = nn.Dropout(p=dropout)\n", "\n", " def forward(self,\n", " src: Tensor,\n", " src_mask: 
" for layer in self.layers:\n", " src, attention_probs = layer(src, src_mask)\n", "\n", " # Keep the attention probabilities of the final layer for later inspection.\n", " self.attention_probs = attention_probs\n", "\n", " return src" ], "metadata": { "id": "o-cPP_YLBF8y" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "class Transformer(nn.Module):\n", " def __init__(self,\n", " encoder: Encoder,\n", " src_embed: EmbeddingLayer,\n", " src_pad_idx: int,\n", " device,\n", " d_model: int = 768,\n", " num_labels: int = 5):\n", " super().__init__()\n", "\n", " self.encoder = encoder\n", " self.src_embed = src_embed\n", " self.device = device\n", " self.src_pad_idx = src_pad_idx\n", "\n", " self.dropout = nn.Dropout(p=0.1)\n", " self.classifier = nn.Linear(in_features=d_model, out_features=num_labels)\n", "\n", " def make_src_mask(self, src: Tensor):\n", " # Assign 1 to tokens that need to be attended to and 0 to padding tokens, then add two dimensions so the mask broadcasts over heads and query positions\n", " src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)\n", "\n", " return src_mask\n", "\n", " def forward(self, src: Tensor):\n", " src_mask = self.make_src_mask(src) # (batch_size, 1, 1, src_seq_length)\n", " output = self.encoder(self.src_embed(src), src_mask) # (batch_size, src_seq_length, d_model)\n", " output = output[:, 0, :] # Take the first token's vector as a summary of the sequence (the tokenizer's [CLS] token, used like the class token in ViT); shape: (batch_size, d_model)\n", " logits = self.classifier(self.dropout(output))\n", "\n", " return logits" ], "metadata": { "id": "5fcff-6oBX_w" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "def make_model(device,\n", " tokenizer,\n", " n_layers: int = 3,\n", " d_model: int = 768,\n", " num_labels: int = 5,\n", " n_heads: int = 8,\n", " dropout: float = 0.1,\n", " max_length: int = 128):\n", " encoder = Encoder(d_model=d_model,\n", " n_layers=n_layers,\n", " n_heads=n_heads,\n", " dropout=dropout)\n", "\n", " src_embed = EmbeddingLayer(vocab_size=tokenizer.vocab_size, d_model=d_model)\n", "\n", " pos_enc = PositionalEncoding(d_model=d_model,\n", " dropout=dropout,\n", " max_length=max_length)\n", "\n", " model = Transformer(encoder=encoder,\n", " src_embed=nn.Sequential(src_embed, pos_enc),\n", " src_pad_idx=tokenizer.pad_token_id,\n", " device=device,\n", " d_model=d_model,\n", " num_labels=num_labels)\n", "\n", " # Initialize parameters with Xavier/Glorot\n", " # This maintains a consistent variance of activations throughout the network\n", " # Helps avoid issues like vanishing or exploding gradients.\n", " for p in model.parameters():\n", " if p.dim() > 1:\n", " nn.init.xavier_uniform_(p)\n", "\n", " return model" ], "metadata": { "id": "-7adHoyYBcqT" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "model = make_model(device=device,\n", " tokenizer=tokenizer,\n", " n_layers=4,\n", " d_model=768,\n", " num_labels=5,\n", " n_heads=8,\n", " dropout=0.1,\n", " max_length=32)\n", "\n", "model.to(device)" ], "metadata": { "id": "M0EbhBuQBhUK" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "print(f\"The model has {(sum(p.numel() for p in model.parameters() if p.requires_grad)):,} trainable parameters\")" ], "metadata": { "id": "NT37aWKnBk4y" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "lr = 1e-4\n", "\n", "optimizer = torch.optim.Adam(params=model.parameters(),\n", " lr=lr,\n", " betas=(0.9, 0.999))\n", "loss_fn = nn.CrossEntropyLoss()\n", "scaler = GradScaler()" ], "metadata": { 
"id": "hZmiAxW-BmLW" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "def train(model,\n", " iterator,\n", " optimizer,\n", " loss_fn,\n", " clip,\n", " epoch):\n", " model.train()\n", " epoch_loss = 0\n", "\n", " pbar = tqdm(iterator, total=len(iterator), desc=f\"Epoch {epoch + 1} Progress\", colour=\"#005500\")\n", " for i, batch in enumerate(pbar):\n", " src = batch[\"text\"][\"input_ids\"].to(device)\n", " labels = batch[\"label\"].to(device)\n", "\n", " optimizer.zero_grad()\n", " with autocast():\n", " # Forward pass\n", " logits = model(src)\n", "\n", " # Calculate the loss\n", " loss = loss_fn(logits, labels)\n", "\n", " scaler.scale(loss).backward()\n", " scaler.unscale_(optimizer)\n", " nn.utils.clip_grad_norm_(model.parameters(), clip)\n", " scaler.step(optimizer)\n", " scaler.update()\n", " epoch_loss += loss.item()\n", "\n", " pbar.set_postfix(loss=loss.item()) # Update the loss on the tqdm progress bar\n", "\n", " return (epoch_loss / len(iterator))" ], "metadata": { "id": "WMNVjg0UBqQF" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "def evaluate(model,\n", " iterator,\n", " loss_fn):\n", " model.eval()\n", " epoch_loss = 0\n", "\n", " with torch.inference_mode():\n", " for i, batch in enumerate(iterator):\n", " src = batch[\"text\"][\"input_ids\"].to(device)\n", " labels = batch[\"label\"].to(device)\n", "\n", " # Forward pass\n", " logits = model(src)\n", "\n", " # Calculate the loss\n", " loss = loss_fn(logits, labels)\n", " epoch_loss += loss.item()\n", "\n", " return (epoch_loss / len(iterator))" ], "metadata": { "id": "V0McrJ1FF5d3" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "def epoch_time(start_time, end_time):\n", " elapsed_time = end_time - start_time\n", " elapsed_mins = int(elapsed_time / 60)\n", " elapsed_secs = int(elapsed_time - (elapsed_mins * 60))\n", " return elapsed_mins, elapsed_secs" ], "metadata": { "id": "rq9YQv_eF5YQ" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "epochs = 10\n", "clip = 1\n", "\n", "best_valid_loss = float(\"inf\")\n", "model_path = \"sentiment_analysis_model.pt\"\n", "\n", "if os.path.exists(model_path):\n", " print(f\"Loading model from {model_path}...\")\n", " model.load_state_dict(torch.load(model_path, map_location=device))" ], "metadata": { "id": "JE6JAXM-F5Qc" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "should_train = True\n", "\n", "if should_train:\n", " for epoch in tqdm(range(epochs), desc=f\"Training progress\", colour=\"#00ff00\"):\n", " start_time = time.time()\n", "\n", " train_loss = train(model=model,\n", " iterator=train_dataloader,\n", " optimizer=optimizer,\n", " loss_fn=loss_fn,\n", " clip=clip,\n", " epoch=epoch)\n", "\n", " end_time = time.time()\n", " epoch_mins, epoch_secs = epoch_time(start_time, end_time)\n", "\n", " message = f\"Epoch: {epoch + 1} | Time: {epoch_mins}m {epoch_secs}s --> STORED\"\n", "\n", " torch.save(model.state_dict(), model_path)\n", "\n", " print(message)\n", " print(f\"Train Loss: {train_loss:.6f}\")" ], "metadata": { "id": "ruWsYqeYGCi0" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "test_loss = evaluate(model=model,\n", " iterator=test_dataloader,\n", " loss_fn=loss_fn)\n", "\n", "print(f\"Test Loss: {test_loss:.6f}\")" ], "metadata": { "id": "GNHZ-ft8GHGy" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "def get_sentiment(question, model, device, 
" model.eval()\n", "\n", " encoded = tokenizer(question, truncation=True, max_length=max_length, return_tensors=\"pt\")\n", " src_tensor = encoded[\"input_ids\"].to(device)\n", "\n", " with torch.inference_mode():\n", " # Forward pass for classification.\n", " logits = model(src_tensor) # shape: (1, num_labels)\n", "\n", " # Get the predicted class (index) with the highest score.\n", " pred_index = torch.argmax(logits, dim=1).item()\n", "\n", " sentiment_map = {\n", " 0: \"Very Negative\",\n", " 1: \"Negative\",\n", " 2: \"Neutral\",\n", " 3: \"Positive\",\n", " 4: \"Very Positive\"\n", " }\n", " predicted_sentiment = sentiment_map.get(pred_index, \"unknown\")\n", "\n", " return predicted_sentiment" ], "metadata": { "id": "0ej2-U8dGrot" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "#@title Sentiment Analysis\n", "src_sentence = \"That book was amazing!\" #@param {type:\"string\"}\n", "\n", "predicted_sentiment = get_sentiment(src_sentence, model, device, max_length=32)\n", "\n", "print(predicted_sentiment)" ], "metadata": { "id": "oCwZfvW5IpWG" }, "execution_count": null, "outputs": [] }
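, { "cell_type": "code", "source": [ "# Loss alone is hard to interpret across runs; as a final (illustrative) check,\n", "# compute overall accuracy on the test set with the trained model.\n", "correct = 0\n", "total = 0\n", "model.eval()\n", "with torch.inference_mode():\n", "    for batch in test_dataloader:\n", "        src = batch[\"text\"][\"input_ids\"].to(device)\n", "        labels = batch[\"label\"].to(device)\n", "        preds = model(src).argmax(dim=1)\n", "        correct += (preds == labels).sum().item()\n", "        total += labels.size(0)\n", "\n", "print(f\"Test accuracy: {correct / total:.4f}\")" ], "metadata": { "id": "test-accuracy-cell" }, "execution_count": null, "outputs": [] } ] }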