import os

import gradio as gr
import torch
import yaml
from transformers import AutoTokenizer

from deepseek_v3 import DeepSeekV3Model

def generate_helper(model, idx, max_new_tokens, context_length,
                    temperature=1.0, top_k=None, eos_token=None, device=None):
    """Autoregressively sample up to max_new_tokens token ids from the model."""
    model = model.to(device)
    idx = idx.to(device)
    model.eval()
    for _ in range(max_new_tokens):
        # Condition on at most the last context_length tokens.
        idx_cond = idx[:, -context_length:]
        with torch.no_grad():
            logits, _ = model(idx_cond)
            logits = logits.view(idx_cond.shape[0], -1, model.config['vocab_size'])

        # Keep only the logits for the final position.
        logits = logits[:, -1, :]

        if top_k is not None:
            # torch.topk returns values sorted in descending order, so the last
            # column holds the smallest of the top-k logits; everything below
            # that threshold is masked to -inf.
            top_logits, _ = torch.topk(logits, top_k)
            min_logit = top_logits[:, -1].unsqueeze(-1)
            logits = torch.where(logits < min_logit,
                                 torch.tensor(float('-inf')).to(logits.device),
                                 logits)

        if temperature > 0.0:
            logits /= temperature
            probs = torch.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
        else:
            # temperature == 0.0 falls back to greedy decoding.
            idx_next = torch.argmax(logits, dim=-1, keepdim=True)

        # .item() assumes batch size 1, which is all this app ever passes in.
        if idx_next.item() == eos_token:
            break

        idx = torch.cat((idx, idx_next), dim=1)
    # This app only runs inference, so the model is left in eval mode rather
    # than switched back to train mode.
    return idx

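# Illustrative sketch (not called by the app): the same top-k + temperature
# filtering as in generate_helper, applied to a single made-up logits row so
# the sampling step can be sanity-checked in isolation. All values here are
# assumptions for demonstration only.
def _demo_topk_temperature_sampling(top_k=2, temperature=0.9):
    example_logits = torch.tensor([[2.0, 1.0, 0.5, -1.0]])  # batch of 1, vocab of 4
    top_logits, _ = torch.topk(example_logits, top_k)
    min_logit = top_logits[:, -1].unsqueeze(-1)              # smallest surviving logit
    filtered = torch.where(example_logits < min_logit,
                           torch.tensor(float('-inf')),
                           example_logits)
    probs = torch.softmax(filtered / temperature, dim=-1)    # masked entries get probability 0
    return torch.multinomial(probs, num_samples=1)           # sampled token id, shape (1, 1)
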
def get_config(config_path):
    # Use a context manager so the file handle is closed promptly.
    with open(config_path, "r") as f:
        return yaml.load(f, Loader=yaml.FullLoader)

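# The config lookups in this file imply a YAML layout roughly like the one
# below. The keys are taken from how `config` is indexed here; the values are
# illustrative placeholders, not the actual contents of config_smollm2_135M.yaml:
#
#   model:
#     vocab_size: 49152
#   tokenizer:
#     tokenizer_name_or_path: HuggingFaceTB/SmolLM2-135M
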
def extract_and_save_weights(config_path, checkpoint_path, weights_path, device):
    """Extract model weights from a training checkpoint and save them as a separate .pt file."""
    print(f"Extracting weights from checkpoint: {checkpoint_path}")
    config = get_config(config_path)
    model = DeepSeekV3Model(config['model'])

    checkpoint = torch.load(checkpoint_path, map_location=torch.device(device))
    state_dict = checkpoint['model_state_dict']
    # Checkpoints saved from a torch.compile()-wrapped model prefix every key
    # with '_orig_mod.'; strip the prefix so keys match the plain module.
    state_dict = {k.replace('_orig_mod.', ''): v for k, v in state_dict.items()}

    torch.save(state_dict, weights_path)
    print(f"Model weights saved to: {weights_path}")
    return state_dict

def load_weights(config, weights_path, device):
    """Build the model and load weights from a .pt state-dict file."""
    print(f"Loading model from weights: {weights_path}")
    model = DeepSeekV3Model(config['model'])
    state_dict = torch.load(weights_path, map_location=torch.device(device))
    model.load_state_dict(state_dict)
    return model

def get_tokenizer(config):
    tokenizer_path = config['tokenizer']['tokenizer_name_or_path']
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
    # The base tokenizer has no pad token, so reuse the EOS token for padding.
    tokenizer.pad_token = tokenizer.eos_token
    vocab_size = tokenizer.vocab_size
    return tokenizer, vocab_size

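# Note: tokenizer.vocab_size counts only the base vocabulary and excludes any
# added special tokens (len(tokenizer) would include them). The returned
# vocab_size is currently unused by the caller in this file.
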
def generate_text(model, tokenizer, input_text, max_new_tokens, context_length,
                  temperature, top_k, eos_token, device):
    encoded_text = tokenizer.encode(input_text, return_tensors="pt").to(device)
    generated_text = generate_helper(model,
                                     idx=encoded_text,
                                     max_new_tokens=max_new_tokens,
                                     context_length=context_length,
                                     temperature=temperature,
                                     top_k=top_k,
                                     eos_token=eos_token,
                                     device=device)
    # Note: the decoded string includes the original prompt tokens.
    return tokenizer.decode(generated_text.squeeze(0))

def initialize_model():
    config_path = "config_smollm2_135M.yaml"
    model_id = "crpatel/DeepSeek-V3-SmolLm2"
    weights_path = "model.pt"
    device = "cuda" if torch.cuda.is_available() else "cpu"

    config = get_config(config_path)

    # Prefer a local weights file; otherwise download from the Hugging Face
    # Hub, and as a last resort extract weights from a local training checkpoint.
    if not os.path.exists(weights_path):
        try:
            from huggingface_hub import hf_hub_download
            print(f"Downloading model weights from Hugging Face Hub: {model_id}")
            weights_path = hf_hub_download(
                repo_id=model_id,
                filename="model.pt"
            )
        except Exception as e:
            print(f"Error downloading weights: {e}")
            print("Falling back to local checkpoint extraction if available")
            checkpoint_path = "checkpoints/model_100000_step_avg_loss_4.61663.pth"
            if os.path.exists(checkpoint_path):
                extract_and_save_weights(config_path, checkpoint_path, weights_path, device)
            else:
                raise FileNotFoundError("Neither weights file nor checkpoint found. Please upload the model to the HF Hub first.")

    model = load_weights(config, weights_path, device)
    model.to(device)
    model.eval()

    tokenizer, vocab_size = get_tokenizer(config)

    return model, tokenizer, device

def generate_response(prompt, max_new_tokens):
    generated_text = generate_text(
        model=model,
        tokenizer=tokenizer,
        input_text=prompt,
        max_new_tokens=max_new_tokens,
        context_length=256,
        temperature=0.9,
        top_k=2,
        eos_token=tokenizer.eos_token_id,
        device=device
    )
    return generated_text

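# The sampling settings above are hard-coded: top_k=2 keeps only the two most
# likely tokens at each step (close to greedy decoding), and temperature=0.9
# mildly sharpens the distribution. Raise top_k for more varied output.
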
model, tokenizer, device = initialize_model()

iface = gr.Interface(
    fn=generate_response,
    inputs=[
        gr.Textbox(
            lines=3,
            placeholder="Enter your prompt here...",
            label="Input Prompt"
        ),
        gr.Slider(
            minimum=50,
            maximum=256,
            value=100,
            step=10,
            label="Max New Tokens"
        )
    ],
    outputs=gr.Textbox(
        lines=5,
        label="Generated Text"
    ),
    title="DeepSeek-V3 Text Generator",
    description="Enter a prompt and adjust the maximum number of tokens to generate text with the DeepSeek-V3 SmolLM2 model."
)

if __name__ == "__main__":
    iface.launch()
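    # By default Gradio serves the UI locally (http://127.0.0.1:7860); pass
    # share=True to iface.launch() to get a temporary public URL instead.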