from deepseek_v3 import DeepSeekV3Model
import torch
import yaml
from transformers import AutoTokenizer
# from gptdataloader import GPTDataLoader
from torch.utils.data import DataLoader
import numpy as np
from datasets import load_dataset
import logging
import math
from utils import upload_file_to_s3
# At the start of training loop
# print(f"GPU Memory allocated: {torch.cuda.memory_allocated() / 1024**2:.2f} MB")
# print(f"GPU Memory reserved: {torch.cuda.memory_reserved() / 1024**2:.2f} MB")
logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler = logging.FileHandler('training.log')
file_handler.setFormatter(formatter) # Set formatter on the handler, not the logger
logger.addHandler(file_handler)
logger.setLevel(logging.INFO)
def encode_text(examples, tokenizer, seq_length):
"""Tokenize and prepare text examples for training."""
tokens = tokenizer(
examples["text"],
truncation=True,
padding="max_length",
max_length=seq_length + 1,
return_tensors="pt",
)
# Drop the batch dimension added by return_tensors="pt"
input_ids = tokens["input_ids"].squeeze(0)
# Guard against token ids outside the model's vocabulary
input_ids = torch.clamp(input_ids, min=0, max=tokenizer.vocab_size - 1)
# Shift by one position for next-token prediction
labels = input_ids[1:].clone().to(torch.int64)
input_ids = input_ids[:-1].to(torch.int64)
return {"input_ids": input_ids, "labels": labels}
def load_cosmopedia_dataset(batch_size=8, seq_length=1024, tokenizer=None):
"""
Returns a torch dataloader for the cosmopedia dataset
"""
# Set tokenizer parallelism explicitly
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
logger.info("tokenizer parallelism set to false")
try:
# Increase timeout and retries for dataset loading
from datasets import config
config.HF_DATASETS_TIMEOUT = 300 # 5 minutes timeout
config.MAX_RETRIES = 10 # Increase retry attempts
logger.info("dataset loading config set")
train_dataset = load_dataset(
"HuggingFaceTB/smollm-corpus",
name="cosmopedia-v2",
split="train",
streaming=True,
)
logger.info("dataset loaded")
# Use partial to bind tokenizer and seq_length to the encode function
from functools import partial
encode_fn = partial(encode_text, tokenizer=tokenizer, seq_length=seq_length)
train_dataset = train_dataset.map(
encode_fn,
remove_columns=["text"],
batched=False
)
train_dataset = train_dataset.with_format("torch")
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
num_workers=2,
pin_memory=True,
prefetch_factor=4,
persistent_workers=True
)
return train_dataloader
except Exception as e:
logger.error(f"Error loading dataset: {str(e)}")
return None
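# Usage sketch (assumes a tokenizer is already loaded, as in the __main__ block below):
#   loader = load_cosmopedia_dataset(batch_size=8, seq_length=512, tokenizer=tokenizer)
#   batch = next(iter(loader))
#   # batch["input_ids"] and batch["labels"] are both int64 tensors of shape [8, 512]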
# def create_dataloader(file_path, tokenizer, context_size, stride):
# with open(file_path, "r") as file:
# text_data = file.read()
# total_characters = len(text_data)
# total_tokens = len(tokenizer.encode(text_data))
# logger.info(f"Characters: {total_characters}")
# logger.info(f"Tokens: {total_tokens}")
# # create dataloader
# train_ratio = 0.9
# val_ratio = 0.1
# split_idx = int(train_ratio * total_characters)
# train_data = text_data[:split_idx]
# valid_data = text_data[split_idx:]
# train_dataset = GPTDataLoader(train_data, tokenizer, context_size, stride)
# valid_dataset = GPTDataLoader(valid_data, tokenizer, context_size, stride)
# return DataLoader(train_dataset, batch_size=10, shuffle=True, drop_last=True), DataLoader(valid_dataset, batch_size=10, shuffle=False, drop_last=True)
# def calculate_loss_batch(input_batch, target_batch, model, device):
# input_batch = input_batch.to(device)
# target_batch = target_batch.to(device)
# logits, loss = model(input_batch, target_batch) # e.g. 10, 32, 49152
# logits = logits.view(-1, logits.size(-1)) # Shape: [320, 49152]
# target_batch = target_batch.view(-1) # Shape: [320]
# loss = torch.nn.functional.cross_entropy(logits, target_batch)
# return loss
# def calc_loss_loader(data_loader, model, device, num_batches=None):
# total_loss = 0.0
# if len(data_loader) == 0:
# return float("nan")
# elif num_batches is None:
# num_batches = len(data_loader)
# else:
# num_batches = min(num_batches, len(data_loader))
# for i, (input_batch, target_batch) in enumerate(data_loader):
# if i < num_batches:
# loss = calculate_loss_batch(input_batch, target_batch, model, device)
# total_loss += loss.item()
# else:
# break
# return total_loss / num_batches
# def evaluate_model(model, train_dataloader, valid_dataloader, device, eval_iter=100):
# model.eval()
# with torch.no_grad():
# train_loss = calc_loss_loader(train_dataloader, model, device, num_batches=eval_iter)
# valid_loss = calc_loss_loader(valid_dataloader, model, device, num_batches=eval_iter)
# model.train()
# return train_loss, valid_loss
def generate(model, idx, max_new_tokens, context_length, temperature=1.0, top_k=None, eos_token=None, device=None):
logger.info(f"Generating on device {device}")
model = model.to(device)
idx = idx.to(device)
model.eval()
for _ in range(max_new_tokens):
idx_cond = idx[:, -context_length:]
with torch.no_grad():
logits, _ = model(idx_cond) # Unpack both logits and loss (ignore loss)
logits = logits.view(idx_cond.shape[0], -1, model.config['vocab_size']) # Reshape to [batch, seq, vocab]
# Get the logits for the last token only
logits = logits[:, -1, :] # Shape: [batch_size, vocab_size]
if top_k is not None:
# top k sampling
top_logits, top_pos = torch.topk(logits, top_k)
min_logit = top_logits[:, -1].unsqueeze(-1)
logits = torch.where(logits < min_logit,
torch.tensor(float('-inf')).to(logits.device),
logits)
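# e.g. with top_k=3 and logits [2.0, 1.0, 0.5, 0.1], only the 0.1 entry is masked
# to -inf, so sampling is restricted to the three highest-scoring tokens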
# temperature scaling
if temperature > 0.0:
logits /= temperature
probs = torch.softmax(logits, dim=-1)
idx_next = torch.multinomial(probs, num_samples=1)
else:
idx_next = torch.argmax(logits, dim=-1, keepdim=True)
# Stop early on end-of-sequence (assumes a single sequence in the batch)
if eos_token is not None and idx_next.item() == eos_token:
    break
idx = torch.cat((idx, idx_next), dim=1)
model.train()
return idx
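# Usage sketch (mirrors the call made during evaluation in train_model):
#   prompt_ids = tokenizer.encode("Once upon a time", return_tensors="pt")
#   out = generate(model, idx=prompt_ids, max_new_tokens=64, context_length=256,
#                  temperature=0.8, top_k=5, eos_token=tokenizer.eos_token_id, device=device)
#   print(tokenizer.decode(out.squeeze(0)))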
def sync_device(device):
if device.startswith('cuda'):
torch.cuda.synchronize()
elif device == 'cpu':
    if hasattr(torch.cpu, 'synchronize'):
        torch.cpu.synchronize()
elif device.startswith('mps'): # For Apple Silicon
torch.mps.synchronize()
def print_gpu_memory(step_name=""):
"""
Print GPU memory statistics with a specified step name
"""
if torch.cuda.is_available():
logger.info(f"\nGPU Memory Stats {step_name}:")
logger.info(f"GPU Memory allocated: {torch.cuda.memory_allocated() / 1024**2:.2f} MB")
logger.info(f"GPU Memory reserved: {torch.cuda.memory_reserved() / 1024**2:.2f} MB")
logger.info(f"Max GPU Memory allocated: {torch.cuda.max_memory_allocated() / 1024**2:.2f} MB")
# Learning rate scheduler
def get_lr_lambda(current_step, warmup_steps, max_steps, max_lr):
    """
    Learning-rate multiplier for torch.optim.lr_scheduler.LambdaLR:
    1. Linear warmup from 0 to the base LR over `warmup_steps` steps
    2. Cosine decay from the base LR down to 5% of it between `warmup_steps` and `max_steps`
    """
    min_lr_ratio = 0.05  # floor expressed as a fraction of the base LR (max_lr)
    if current_step < warmup_steps:
        # Linear warmup: multiplier goes from 0 to 1
        return float(current_step) / float(max(1, warmup_steps))
    # Cosine decay: multiplier goes from 1 down to min_lr_ratio.
    # Note: LambdaLR multiplies the base LR by this value, so return a ratio,
    # not an absolute learning rate.
    progress = float(current_step - warmup_steps) / float(max(1, max_steps - warmup_steps))
    progress = min(progress, 1.0)  # clamp so the LR stays at the floor past max_steps
    return min_lr_ratio + 0.5 * (1.0 - min_lr_ratio) * (1.0 + math.cos(math.pi * progress))
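# Example multiplier values from the schedule above (warmup_steps=3000,
# max_steps=60000, base LR 3e-4 as used in train_model):
#   step 1500  -> 0.5   (LR ~1.5e-4, halfway through warmup)
#   step 3000  -> 1.0   (LR 3e-4, peak)
#   step 60000 -> 0.05  (LR 1.5e-5, floor)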
def train_model(config, model, train_loader, test_loader, optimizer, device, num_epochs, eval_freq, eval_iter, start_context="Jack Gisburn rather a cheap genius- ", tokenizer=None):
total_loss = 0
tokens_seen, global_step = 0, -1
# Gradient accumulation setup
actual_batch_size = config['tokens']['micro_batch_size']
effective_batch_size_multiplier = 1  # raise this to accumulate more micro-batches per optimizer step
target_batch_size = effective_batch_size_multiplier * config['tokens']['micro_batch_size']
gradient_accumulation_steps = target_batch_size // actual_batch_size
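# Example: with micro_batch_size=8 and effective_batch_size_multiplier=1,
# gradient_accumulation_steps is 1 and the optimizer steps after every batch;
# a multiplier of 4 would accumulate gradients over 4 micro-batches
# (an effective batch of 32 sequences) before each optimizer.step().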
# Learning rate schedule parameters
max_lr = 3e-4
warmup_steps = 3000
max_steps = 60000
min_lr = max_lr * 0.05  # floor the LR at 5% of max_lr
# Create LambdaLR scheduler with the improved lambda function
lr_lambda = lambda step: get_lr_lambda(step, warmup_steps, max_steps, max_lr)
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
logger.info(f"Training with learning rate schedule:")
logger.info(f"Max LR: {max_lr}")
logger.info(f"Warmup Steps: {warmup_steps}")
logger.info(f"Max Steps: {max_steps}")
logger.info(f"Min LR: {max_lr * 0.05}")
logger.info(f"Gradient Accumulation Steps: {gradient_accumulation_steps}")
logger.info(f"Effective Batch Size: {actual_batch_size * gradient_accumulation_steps}")
print_gpu_memory("at start of training")
# Free cached GPU memory and enable cuDNN autotuning before the training loop
torch.cuda.empty_cache()
torch.backends.cudnn.benchmark = True
for epoch in range(num_epochs):
model.train()
optimizer.zero_grad() # Zero gradients at start of epoch
for batch_idx, batch in enumerate(train_loader):
input_batch = batch['input_ids'].to(device)
target_batch = batch['labels'].to(device)
# Forward pass
with torch.autocast(device_type=device, dtype=torch.bfloat16):
logits, original_loss = model(input_batch, target_batch)
# Scale loss for gradient accumulation
scaled_loss = original_loss / gradient_accumulation_steps
scaled_loss.backward()
# Add the original loss to total_loss for logging
total_loss += original_loss.item() # Don't multiply back up
tokens_seen += input_batch.numel()
# Calculate running average loss
total_batches = batch_idx + 1
avg_loss = total_loss / total_batches
if batch_idx % 25 == 0:
logger.info(f"Batch {batch_idx + 1}, Running Avg Loss: {avg_loss:.5f}")
# Only update weights after accumulating gradients
if (batch_idx + 1) % gradient_accumulation_steps == 0:
# Gradient clipping
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
scheduler.step() # Update learning rate
optimizer.zero_grad()
global_step += 1
# Evaluation block
if global_step % eval_freq == 0 and global_step > 0:
# Log learning rate, running loss and GPU memory
current_lr = scheduler.get_last_lr()[0]
optimizer_lr = optimizer.param_groups[0]['lr']
print_gpu_memory(f"at step {global_step}")
logger.info(f"learning rate: {current_lr:.8f}")
logger.info(f"Ep {epoch+1} (Step {global_step:06d}): "
f"Avg loss {avg_loss:.3f} | {tokens_seen} tokens seen")
logger.info(f"optimizer lr: {optimizer_lr:.8f}")
logger.info(f"scheduler lr: {current_lr:.8f}")
# Generate sample text
start_context_list = [
    "In today's ever-evolving world, technology has become an integral part of our lives",
    "Once upon a time, there was a friendly agency called Gaudette Insurance Agency, Inc. They help",
    "A couple of years ago, I was working as an extra on the set of a low-budget British film.",
    "Introduction: The Art of Crafting Vegan Sandwich Delights Sandwiches occupy a unique space in",
    "Meet Chris, a superhero of supplies! Just like how Batman protects Gotham City",
    "Identity formation is a complex and multifaceted process that involves the development of",
    "With the development of science and technology, computer has become more and more ",
    "Just as there are many variants and forms of electronic malware and Internet-based ",
    "Correctly identifying what is causing a problem is the most important step in pest control.",
    "Lobster, California spiny The California Spiny Lobster fishery is a small but locally ",
    "Bees are vital for pollination. You can buy leafcutter bee houses to attract ",
    "Feeling Alone Together: Exploring Alienation and Isolation in Literature",
    "Imagine if someone got their hands on dangerous weapons",
    "Once upon a time, in a colorful town called Popville, ",
    "The bell above the door jangled as Sarah walked into her family's hardware store",
]
# Randomly select a prompt from the list
random_prompt = np.random.choice(start_context_list)
logger.info(f"Selected prompt: {random_prompt}")
logger.info(f"+++"*30)
encoded_text = tokenizer.encode(random_prompt, return_tensors="pt")
random_topk = np.random.randint(1, 10)
logger.info(f"random_topk: {random_topk}")
random_temperature = np.random.uniform(0.7, 0.9)
logger.info(f"random_temperature: {random_temperature}")
logger.info(f"global step {global_step} , batch_idx {batch_idx} => generating text")
generated_text = generate(model,
idx=encoded_text,
max_new_tokens=256,
context_length=256,
temperature=random_temperature,
top_k=random_topk,
eos_token=tokenizer.eos_token_id,
device=device)
logger.info(f"+++"*30)
logger.info(tokenizer.decode(generated_text.squeeze(0)))
logger.info(f"+++"*30)
# Save checkpoint
model_file_name = f"model_{global_step}_steps_avg_loss_{avg_loss:.5f}_optimizer_lr_{optimizer_lr:.8f}.pth"
torch.save({
'step': global_step,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict(),
'loss': avg_loss,
}, model_file_name)
s3_path = upload_file_to_s3(model_file_name, config['model']['model_config']['s3_bucket'],
config['model']['model_config']['s3_checkpoint_folder'])
logger.info(f"Model saved to S3: {s3_path}")
log_path = upload_file_to_s3(config['model']['model_config']['s3_log_file_name'], config['model']['model_config']['s3_bucket'],
config['model']['model_config']['s3_log_folder'])
logger.info(f"Log saved to S3: {log_path}")
if batch_idx % 100 == 0:
logger.info(f"Batch {batch_idx} finished")
logger.info(f"+++"*30)
logger.info("Training complete")
if __name__ == "__main__":
with open("config_smollm2_135M.yaml", "r") as config_file:
    config = yaml.load(config_file, Loader=yaml.FullLoader)
logger.info(config)
# Set memory efficient settings
torch.set_float32_matmul_precision('high')
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True
# Empty cache before model creation
torch.cuda.empty_cache()
import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:64'
model = DeepSeekV3Model(config['model'])
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Enable gradient checkpointing for memory efficiency
# model.gradient_checkpointing_enable()
model.to(device)
#model = torch.compile(model)
logger.info(model)
logger.info("++"*30)
total_params = sum(p.numel() for p in model.parameters())
logger.info(f"Total parameters: {total_params}")
optimizer = torch.optim.AdamW(
model.parameters(),
lr=3e-4,
weight_decay=0.15,
betas=(0.9, 0.95)
)
tokenizer = AutoTokenizer.from_pretrained("HuggingFaceTB/cosmo2-tokenizer")
tokenizer.pad_token = tokenizer.eos_token
vocab_size = tokenizer.vocab_size
# Streaming dataloader over cosmopedia-v2
train_loader = load_cosmopedia_dataset(
    batch_size=8,
    seq_length=512,
    tokenizer=tokenizer
)
import time
t1 = time.time()
# Note: device and PYTORCH_CUDA_ALLOC_CONF are already set above; the allocator
# config must be set before CUDA is first used to have any effect.
train_model(
config,
model,
train_loader,
train_loader,  # passed as test_loader too; train_model does not currently use it
optimizer=optimizer,
device=device,
num_epochs=1,
eval_freq=2500,  # log samples and save a checkpoint every 2500 optimizer steps
eval_iter=2500,
start_context="Once Upon a Time far far away in a galaxy",
tokenizer=tokenizer
)
t2 = time.time()
logger.info(f"Time taken for training: {t2 - t1:.2f} seconds")