import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, TrainerCallback, DataCollatorWithPadding, DefaultDataCollator
from openai import OpenAI
from huggingface_hub import login
import datasets
from datasets import Dataset
import json
import pandas as pd
import numpy as np
import torch
import wandb
import copy
import os
import sys
import re
from peft import LoraConfig, TaskType, get_peft_model, AutoPeftModelForCausalLM
from sklearn.model_selection import train_test_split
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

IS_COLAB = False
if "google.colab" in sys.modules or "google.colab" in os.environ:
    IS_COLAB = True

# Load env secrets
if IS_COLAB:
    from google.colab import userdata
    OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
    WANDB_API_KEY = userdata.get('WANDB_API_KEY')
else:
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    WANDB_API_KEY = os.environ.get("WANDB_API_KEY")

# Authenticate Weights and Biases
wandb.login(key=WANDB_API_KEY)

# Custom callback to capture logs
class LoggingCallback(TrainerCallback):
    def __init__(self):
        self.logs = []  # Store logs

    def on_log(self, args, state, control, logs=None, **kwargs):
        if logs:
            self.logs.append(logs)  # Append logs to list
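# Note: on_log fires at every Trainer logging step; each `logs` entry is a plain dict
# (e.g. training loss / epoch, or the per-epoch eval metrics), and the accumulated list
# is dumped into the training status box once train_model() returns.
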
class LLMTrainingApp:
    def __init__(self):
        # self.metric = datasets.load_metric('sacrebleu')
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.finetuning_dataset = []
        self.prompt_template = """### Question: {question} ### Answer: """
        self.training_output = "/content/peft-model" if IS_COLAB else "./peft-model"
        self.localpath = "/content/finetuned-model" if IS_COLAB else "./finetuned-model"
        self.tokenizer = None
        self.model = None
        self.model_name = None
        self.fine_tuned_model = None
        self.teacher_model = OpenAI(api_key=OPENAI_API_KEY)
        self.base_models = {
            "SmolLM": {"hf_name": "HuggingFaceTB/SmolLM2-135M",
                       "model_size": "135M",
                       "training_size": "2T",
                       "context_window": "8192"},
            "GPT2": {"hf_name": "openai-community/gpt2",
                     "model_size": "137M",
                     "training_size": "40GB (WebText)",
                     "context_window": "1024"}
        }
        self.peft_config = LoraConfig(task_type=TaskType.CAUSAL_LM, inference_mode=False, r=8, lora_alpha=32, lora_dropout=0.1)
        self.logging_callback = LoggingCallback()
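    # Each finetuning_dataset entry ends up shaped like this (illustrative values, not from the app):
    #   {"question": "### Question: What does LoRA do? ### Answer: ",
    #    "answer": "It trains small low-rank adapter matrices on top of the frozen base model."}
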
    def login_into_hf(self, token):
        if not token:
            return "❌ Please enter a valid token."
        try:
            login(token)
            return "✅ Logged in successfully!"
        except Exception as e:
            return f"❌ Login failed: {str(e)}"
    def select_model(self, model_name):
        self.model_name = model_name
        model_hf_name = self.base_models[model_name]["hf_name"]
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_hf_name)
            self.tokenizer.pad_token = self.tokenizer.eos_token
            base_model = AutoModelForCausalLM.from_pretrained(
                model_hf_name,
                torch_dtype="auto",
                device_map="auto"
            )
            self.model = get_peft_model(base_model, self.peft_config)
            params = self.model.get_nb_trainable_parameters()
            percent_trainable = round(100 * (params[0] / params[1]), 2)
            return f"✅ Loaded model into memory! Base Model card: {json.dumps(self.base_models[model_name])} - % of trainable parameters for PEFT model: {percent_trainable}%"
        except Exception as e:
            return f"❌ Failed to load model and/or tokenizer: {str(e)}"
    def create_golden_dataset(self, dataset):
        try:
            dataset = pd.DataFrame(dataset)
            for i, row in dataset.iterrows():
                self.finetuning_dataset.append({"question": self.prompt_template.format(question=row["Question"]), "answer": row["Answer"]})
            return "✅ Golden dataset created!"
        except Exception as e:
            return f"❌ Failed to create dataset: {str(e)}"
    def extend_dataset(self):
        try:
            completion = self.teacher_model.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {
                        "role": "system",
                        "content": """Given the following question-answer pairs, generate 20 similar pairs in the JSON format below. Do not respond with anything other than the JSON.
                        ```json
                        [
                            {
                                "question": "question 1",
                                "answer": "answer 1"
                            },
                            {
                                "question": "question 2",
                                "answer": "answer 2"
                            }
                        ]
                        """
                    },
                    {
                        "role": "user",
                        "content": f"""Here are the question-answer pairs: {json.dumps(self.finetuning_dataset)}
                        """
                    }
                ]
            )
            response = completion.choices[0].message.content
            print(f"raw response: {response}")
            clean_response = response.replace("```json", "").replace("```", "").strip()
            print(f"clean response: {clean_response}")
            new_data = json.loads(clean_response)
            for i, row in enumerate(new_data):
                row["question"] = row["question"].replace("### Question:", "").replace("### Answer:", "").strip()
                row["answer"] = row["answer"].replace("### Answer:", "").strip()
                self.finetuning_dataset.append({"question": self.prompt_template.format(question=row["question"]), "answer": row["answer"]})
            # create df to display
            df = pd.DataFrame(new_data)
            return "✅ Synthetic dataset generated!", df
        except Exception as e:
            return f"❌ Failed to generate synthetic dataset: {str(e)}", pd.DataFrame()
    def tokenize_function(self, examples):
        try:
            # Tokenize the question and answer as input and target (labels) for causal LM
            encoding = self.tokenizer(examples['question'], examples['answer'], padding=True)
            # Create labels (same as input_ids, but mask the non-answer part)
            labels = copy.deepcopy(encoding["input_ids"])
            for i in range(len(examples["question"])):
                # print(examples["question"][i])
                question_length = len(self.tokenizer(examples['question'][i], add_special_tokens=False)["input_ids"])
                # print(f'question length: {question_length}')
                labels[i][:question_length] = [-100] * question_length  # Mask question tokens
            encoding["labels"] = labels
            return encoding
        except Exception as e:
            return f"❌ Failed to tokenize input: {str(e)}"
    def prepare_data_for_training(self):
        try:
            dataset = Dataset.from_dict({
                "question": [entry["question"] for entry in self.finetuning_dataset],
                "answer": [entry["answer"] for entry in self.finetuning_dataset],
            })
            dataset = dataset.map(self.tokenize_function, batched=True)
            train_dataset, test_dataset = dataset.train_test_split(test_size=0.2).values()
            return {"train": train_dataset, "test": test_dataset}
        except Exception as e:
            return f"❌ Failed to prepare data for training: {str(e)}"
    def compute_bleu(self, eval_pred):
        predictions, labels = eval_pred
        self.predictions = predictions
        self.labels = labels
        # Convert logits to token IDs using argmax
        predictions = np.argmax(predictions, axis=-1)
        # Ensure predictions and labels are integers within vocab range
        predictions = np.clip(predictions, 0, self.tokenizer.vocab_size - 1).astype(int)
        labels = np.clip(labels, 0, self.tokenizer.vocab_size - 1).astype(int)
        scores = []
        for prediction, label in zip(predictions, labels):
            print(f"Prediction: {prediction}, Label: {label}")
            # Remove leading 0's from array
            prediction = prediction[np.argmax(prediction != 0):]
            label = label[np.argmax(label != 0):]
            # Decode predicted tokens
            decoded_preds = self.tokenizer.decode(prediction, skip_special_tokens=True).split()
            decoded_labels = self.tokenizer.decode(label, skip_special_tokens=True).split()
            scores.append(sentence_bleu([decoded_labels], decoded_preds, smoothing_function=SmoothingFunction().method1))
        average_score = sum(scores) / len(scores)
        print(f"Average BLEU score: {average_score}")
        return {"bleu": average_score}
    def train_model(self):
        try:
            tokenized_datasets = self.prepare_data_for_training()
            print('finished preparing data for training')

            # Create training arguments
            training_args = TrainingArguments(
                output_dir=self.training_output,
                learning_rate=1e-3,
                per_device_train_batch_size=32,
                per_device_eval_batch_size=32,
                num_train_epochs=5,
                weight_decay=0.01,
                eval_strategy="epoch",
                save_strategy="epoch",
                load_best_model_at_end=True,
            )
            print('training arguments set...')

            # Create trainer & attach logging callback
            trainer = Trainer(
                model=self.model,
                args=training_args,
                train_dataset=tokenized_datasets["train"],
                eval_dataset=tokenized_datasets["test"],
                tokenizer=self.tokenizer,
                data_collator=DefaultDataCollator(),
                compute_metrics=self.compute_bleu,
                callbacks=[self.logging_callback],
            )
            print('trainer set...')

            # Start training; logs are collected by the callback and returned at the end
            trainer.train()

            # Save trained adapter locally and push it to the Hugging Face Hub
            self.model.save_pretrained(self.localpath)  # save to local
            self.model.push_to_hub(f"{self.model_name}-lora")
            return f"✅ Training complete!\n {json.dumps(self.logging_callback.logs)}"
        except Exception as e:
            return f"❌ Training failed: {str(e)}"
    def run_inference(self, prompt):
        try:
            # Load fine-tuned model into memory and set mode to eval
            self.fine_tuned_model = AutoPeftModelForCausalLM.from_pretrained(self.localpath)
            self.fine_tuned_model = self.fine_tuned_model.to(self.device)
            self.fine_tuned_model.eval()

            # Tokenize input with padding and attention mask
            inputs = self.tokenizer(prompt, return_tensors="pt", padding=True).to(self.device)

            # Generate response
            output = self.fine_tuned_model.generate(
                **inputs,
                max_length=50,           # Limit response length
                num_return_sequences=1,  # Single response
                do_sample=True,          # Enable sampling so temperature/top_p take effect
                temperature=0.7,         # Sampling randomness
                top_p=0.9                # Nucleus sampling
            )
            response = self.tokenizer.batch_decode(output.detach().cpu().numpy(), skip_special_tokens=True)[0]
            return response
        except Exception as e:
            return f"❌ Inference failed: {str(e)}"
    def build_ui(self):
        with gr.Blocks() as demo:
            gr.Markdown("# LLM Fine-tuning")

            # Hugging Face Login
            with gr.Group():
                gr.Markdown("### 1. Login into Hugging Face")
                with gr.Column():
                    token = gr.Textbox(label="Enter Hugging Face Access Token (w/ write permissions)", type="password")
                    login_btn = gr.Button("Login", variant="primary")
                    status = gr.Textbox(label="Status")
                    login_btn.click(self.login_into_hf, inputs=token, outputs=status)

            # Model Selection
            with gr.Group():
                gr.Markdown("### 2. Select Model")
                with gr.Column():
                    model_dropdown = gr.Dropdown([key for key in self.base_models.keys()], label="Small Models")
                    select_model_btn = gr.Button("Select", variant="primary")
                    selected_model_text = gr.Textbox(label="Model Status")
                    select_model_btn.click(self.select_model, inputs=model_dropdown, outputs=[selected_model_text])

            # Create Golden Dataset
            with gr.Group():
                gr.Markdown("### 3. Create Golden Dataset")
                with gr.Column():
                    dataset_table = gr.Dataframe(
                        headers=["Question", "Answer"],
                        value=[["", ""] for _ in range(3)],
                        label="Golden Dataset"
                    )
                    create_data_btn = gr.Button("Create Dataset", variant="primary")
                    dataset_status = gr.Textbox(label="Dataset Status")
                    create_data_btn.click(self.create_golden_dataset, inputs=dataset_table, outputs=[dataset_status])

            # Generate Full Dataset
            with gr.Group():
                gr.Markdown("### 4. Extend Dataset with Synthetic Data")
                with gr.Column():
                    extended_dataset_table = gr.Dataframe(
                        headers=["Question", "Answer"],
                        label="Golden + Synthetic Dataset"
                    )
                    generate_status = gr.Textbox(label="Dataset Generation Status")
                    generate_data_btn = gr.Button("Extend Dataset", variant="primary")
                    generate_data_btn.click(self.extend_dataset, outputs=[generate_status, extended_dataset_table])

            # Train Model & Visualize Loss
            with gr.Group():
                gr.Markdown("### 5. Train Model")
                with gr.Column():
                    train_status = gr.Textbox(label="Training Status")
                    train_btn = gr.Button("Train", variant="primary")
                    train_btn.click(self.train_model, outputs=[train_status])

            # Run Inference
            with gr.Group():
                gr.Markdown("### 6. Run Inference")
                with gr.Column():
                    user_prompt = gr.Textbox(label="Enter Prompt")
                    inference_btn = gr.Button("Run Inference", variant="primary")
                    inference_output = gr.Textbox(label="Inference Output")
                    inference_btn.click(self.run_inference, inputs=user_prompt, outputs=inference_output)

        return demo
# Create an instance of the app
app = LLMTrainingApp()

# Launch the Gradio app using the class method
app.build_ui().launch()