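"""Gradio app for LoRA fine-tuning of a small causal LM (SmolLM2-135M or GPT-2).

Workflow: log in to Hugging Face, select a base model, enter a small "golden"
Q&A dataset, extend it with synthetic pairs from an OpenAI teacher model,
fine-tune a PEFT adapter with the Hugging Face Trainer, and run inference.
Expects OPENAI_API_KEY and WANDB_API_KEY as environment variables (or Colab
secrets) and a Hugging Face access token with write permissions.
"""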
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, TrainerCallback, DataCollatorWithPadding, DefaultDataCollator
from openai import OpenAI
from huggingface_hub import login
import datasets
from datasets import Dataset
import json
import pandas as pd
import torch
import wandb
import os
import sys
from peft import LoraConfig, TaskType, get_peft_model, AutoPeftModelForCausalLM
from sklearn.model_selection import train_test_split
IS_COLAB = False
if "google.colab" in sys.modules or "google.colab" in os.environ:
    IS_COLAB = True

# Load env secrets
if IS_COLAB:
    from google.colab import userdata
    OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
    WANDB_API_KEY = userdata.get('WANDB_API_KEY')
else:
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    WANDB_API_KEY = os.environ.get("WANDB_API_KEY")

# Authenticate Weights and Biases
wandb.login(key=WANDB_API_KEY)
# Custom callback to capture logs
class LoggingCallback(TrainerCallback):
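    """TrainerCallback that records every log dict emitted by the Trainer during training."""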
    def __init__(self):
        self.logs = []  # Store logs

    def on_log(self, args, state, control, logs=None, **kwargs):
        if logs:
            self.logs.append(logs)  # Append logs to list
class LLMTrainingApp:
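    """Holds app state and wires model selection, dataset creation, LoRA fine-tuning, and inference into a Gradio UI."""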
    def __init__(self):
        # self.metric = datasets.load_metric('sacrebleu')
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.finetuning_dataset = []
        self.prompt_template = """### Question: {question} ### Answer: """
        self.training_output = "/content/peft-model" if IS_COLAB else "./peft-model"
        self.localpath = "/content/finetuned-model" if IS_COLAB else "./finetuned-model"
        self.tokenizer = None
        self.model = None
        self.model_name = None
        self.fine_tuned_model = None
        self.teacher_model = OpenAI(api_key=OPENAI_API_KEY)
        self.base_models = {
            "SmolLM": {"hf_name": "HuggingFaceTB/SmolLM2-135M",
                       "model_size": "135M",
                       "training_size": "2T",
                       "context_window": "8192"},
            "GPT2": {"hf_name": "openai-community/gpt2",
                     "model_size": "137M",
                     "training_size": "2T",
                     "context_window": "1024"}
        }
        self.peft_config = LoraConfig(task_type=TaskType.CAUSAL_LM, inference_mode=False, r=8, lora_alpha=32, lora_dropout=0.1)
        self.logging_callback = LoggingCallback()
    def login_into_hf(self, token):
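        """Authenticate with the Hugging Face Hub using the user-supplied access token."""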
        if not token:
            return "❌ Please enter a valid token."
        try:
            login(token)
            return "✅ Logged in successfully!"
        except Exception as e:
            return f"❌ Login failed: {str(e)}"
    def select_model(self, model_name):
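        """Load the chosen base model and tokenizer, then wrap the model with the LoRA (PEFT) adapter."""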
        self.model_name = model_name
        model_hf_name = self.base_models[model_name]["hf_name"]
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_hf_name)
            self.tokenizer.pad_token = self.tokenizer.eos_token
            base_model = AutoModelForCausalLM.from_pretrained(
                model_hf_name,
                torch_dtype="auto",
                device_map="auto"
            )
            self.model = get_peft_model(base_model, self.peft_config)
            params = self.model.get_nb_trainable_parameters()
            percent_trainable = round(100 * (params[0] / params[1]), 2)
            return f"✅ Loaded model into memory! Base Model card: {json.dumps(self.base_models[model_name])} - % of trainable parameters for PEFT model: {percent_trainable}"
        except Exception as e:
            return f"❌ Failed to load model and/or tokenizer: {str(e)}"
    def create_golden_dataset(self, dataset):
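        """Turn the user-edited Question/Answer table into prompt/answer pairs for fine-tuning."""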
        try:
            dataset = pd.DataFrame(dataset)
            for i, row in dataset.iterrows():
                if not row["Question"]:
                    continue  # skip rows left blank in the default table
                self.finetuning_dataset.append({"question": self.prompt_template.format(question=row["Question"]), "answer": row["Answer"]})
            return "✅ Golden dataset created!"
        except Exception as e:
            return f"❌ Failed to create dataset: {str(e)}"
    def extend_dataset(self):
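        """Generate 10 synthetic Q&A pairs with the OpenAI teacher model and append them to the fine-tuning dataset."""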
        try:
            completion = self.teacher_model.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {
                        "role": "user",
                        "content": """Given the following question-answer pairs, generate 10 similar pairs in the json format below. Do not respond with anything other than the json.
```json
[
    {
        "question": "question 1",
        "answer": "answer 1"
    },
    {
        "question": "question 2",
        "answer": "answer 2"
    }
]
```
Question-answer pairs:
""" + json.dumps(self.finetuning_dataset)
                    }
                ]
            )
            response = completion.choices[0].message.content
            print(f"raw response: {response}")
            clean_response = response.replace("```json", "").replace("```", "").strip()
            print(f"clean response: {clean_response}")
            new_data = json.loads(clean_response)
            for i, row in enumerate(new_data):
                self.finetuning_dataset.append({"question": self.prompt_template.format(question=row["question"]), "answer": row["answer"]})
            # create df to display
            df = pd.DataFrame(new_data)
            return "✅ Synthetic dataset generated!", df
        except Exception as e:
            return f"❌ Failed to generate synthetic dataset: {str(e)}", pd.DataFrame()
    def tokenize_function(self, examples):
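        """Tokenize question/answer pairs and copy input_ids into labels for causal LM training."""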
        try:
            # Tokenize the question and answer as input and target (labels) for causal LM
            encoding = self.tokenizer(examples['question'], examples['answer'], padding=True)
            # Set the labels as the input_ids
            encoding['labels'] = encoding['input_ids'].copy()
            return encoding
        except Exception as e:
            return f"❌ Failed to tokenize input: {str(e)}"
    def prepare_data_for_training(self):
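        """Build a Hugging Face Dataset from the collected pairs, tokenize it, and return an 80/20 train/test split."""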
        try:
            dataset = Dataset.from_dict({
                "question": [entry["question"] for entry in self.finetuning_dataset],
                "answer": [entry["answer"] for entry in self.finetuning_dataset],
            })
            dataset = dataset.map(self.tokenize_function, batched=True)
            train_dataset, test_dataset = dataset.train_test_split(test_size=0.2).values()
            return {"train": train_dataset, "test": test_dataset}
        except Exception as e:
            return f"❌ Failed to prepare data for training: {str(e)}"
    def compute_bleu(self, eval_pred):
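        """Placeholder metric: the sacrebleu computation is commented out, so a constant BLEU of 1 is returned."""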
        predictions, labels = eval_pred
        # # Flatten predictions and labels if they are in nested lists
        # predictions = predictions.flatten()
        # labels = labels.flatten()
        # # Ensure that predictions and labels are integers
        # predictions = predictions.astype(int)  # Convert to integer
        # labels = labels.astype(int)  # Convert to integer
        # # Decode the predicted tokens
        # decoded_preds = self.tokenizer.decode(predictions, skip_special_tokens=True)
        # decoded_labels = self.tokenizer.decode(labels, skip_special_tokens=True)
        # result = self.metric.compute(predictions=[decoded_preds], references=[[decoded_labels]])
        result = {"bleu": 1}
        return result
    def log_generator(self):
        """ Continuously send logs to frontend during training """
        for log in self.logging_callback.logs:
            yield str(log)
    def train_model(self):
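        """Fine-tune the LoRA-wrapped model with the Hugging Face Trainer, save the adapter locally, and push it to the Hub."""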
        try:
            tokenized_datasets = self.prepare_data_for_training()
            # Create training arguments
            training_args = TrainingArguments(
                output_dir=self.training_output,
                learning_rate=1e-3,
                per_device_train_batch_size=32,
                per_device_eval_batch_size=32,
                num_train_epochs=2,
                weight_decay=0.01,
                eval_strategy="epoch",
                save_strategy="epoch",
                load_best_model_at_end=True,
            )
            # Create trainer & attach logging callback
            trainer = Trainer(
                model=self.model,
                args=training_args,
                train_dataset=tokenized_datasets["train"],
                eval_dataset=tokenized_datasets["test"],
                tokenizer=self.tokenizer,
                data_collator=DefaultDataCollator(),
                compute_metrics=self.compute_bleu,
                callbacks=[self.logging_callback],
            )
            # Start training and yield logs in real-time
            trainer.train()
            # for log in logging_callback.logs:
            #     yield str(log)
            # Save trained model to HF
            self.model.save_pretrained(self.localpath)  # save to local
            self.model.push_to_hub(f"{self.model_name}-lora")
            return "✅ Training complete!"
        except Exception as e:
            return f"❌ Training failed: {str(e)}"
    def run_inference(self, prompt):
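        """Load the saved PEFT adapter from disk and generate a completion for the given prompt."""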
        try:
            # Load fine-tuned model into memory and set mode to eval
            self.fine_tuned_model = AutoPeftModelForCausalLM.from_pretrained(self.localpath)
            self.fine_tuned_model = self.fine_tuned_model.to(self.device)
            self.fine_tuned_model.eval()
            # Tokenize input with padding and attention mask
            inputs = self.tokenizer(prompt, return_tensors="pt", padding=True).to(self.device)
            # Generate response
            output = self.fine_tuned_model.generate(
                **inputs,
                max_length=50,           # Limit response length
                num_return_sequences=1,  # Single response
                do_sample=True,          # Enable sampling so temperature/top_p take effect
                temperature=0.7,         # Sampling randomness
                top_p=0.9                # Nucleus sampling
            )
            response = self.tokenizer.batch_decode(output.detach().cpu().numpy(), skip_special_tokens=True)[0]
            return response
        except Exception as e:
            return f"❌ Inference failed: {str(e)}"
    def build_ui(self):
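        """Assemble the Gradio Blocks UI covering login, model selection, dataset creation, training, and inference."""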
        with gr.Blocks() as demo:
            gr.Markdown("# LLM Fine-tuning")

            # Hugging Face Login
            with gr.Group():
                gr.Markdown("### 1. Login into Hugging Face")
                with gr.Column():
                    token = gr.Textbox(label="Enter Hugging Face Access Token (w/ write permissions)", type="password")
                    login_btn = gr.Button("Login", variant="primary")
                    status = gr.Textbox(label="Status")
                    login_btn.click(self.login_into_hf, inputs=token, outputs=status)

            # Model Selection
            with gr.Group():
                gr.Markdown("### 2. Select Model")
                with gr.Column():
                    model_dropdown = gr.Dropdown(list(self.base_models.keys()), label="Small Models")
                    select_model_btn = gr.Button("Select", variant="primary")
                    selected_model_text = gr.Textbox(label="Model Status")
                    select_model_btn.click(self.select_model, inputs=model_dropdown, outputs=[selected_model_text])

            # Create Golden Dataset
            with gr.Group():
                gr.Markdown("### 3. Create Golden Dataset")
                with gr.Column():
                    dataset_table = gr.Dataframe(
                        headers=["Question", "Answer"],
                        value=[["", ""] for _ in range(3)],
                        label="Golden Dataset"
                    )
                    create_data_btn = gr.Button("Create Dataset", variant="primary")
                    dataset_status = gr.Textbox(label="Dataset Status")
                    create_data_btn.click(self.create_golden_dataset, inputs=dataset_table, outputs=[dataset_status])

            # Extend Dataset with Synthetic Data
            with gr.Group():
                gr.Markdown("### 4. Extend Dataset with Synthetic Data")
                with gr.Column():
                    synthetic_table = gr.Dataframe(
                        headers=["Question", "Answer"],
                        label="Golden + Synthetic Dataset"
                    )
                    generate_status = gr.Textbox(label="Dataset Generation Status")
                    generate_data_btn = gr.Button("Generate Dataset", variant="primary")
                    generate_data_btn.click(self.extend_dataset, outputs=[generate_status, synthetic_table])

            # Stream Training Logs
            with gr.Group():
                gr.Markdown("### 5. Start Logging")
                with gr.Column():
                    log_status = gr.Textbox(label="Training Logs", lines=10)
                    log_btn = gr.Button("Show Logs", variant="primary")
                    log_btn.click(self.log_generator, outputs=[log_status])

            # Train Model
            with gr.Group():
                gr.Markdown("### 6. Train Model")
                with gr.Column():
                    train_status = gr.Textbox(label="Training Status")
                    train_btn = gr.Button("Train", variant="primary")
                    train_btn.click(self.train_model, outputs=[train_status])

            # Run Inference
            with gr.Group():
                gr.Markdown("### 7. Run Inference")
                with gr.Column():
                    user_prompt = gr.Textbox(label="Enter Prompt")
                    inference_btn = gr.Button("Run Inference", variant="primary")
                    inference_output = gr.Textbox(label="Inference Output")
                    inference_btn.click(self.run_inference, inputs=user_prompt, outputs=inference_output)

        return demo
# Create an instance of the app
app = LLMTrainingApp()

# Launch the Gradio app using the class method
app.build_ui().launch()