#!/usr/bin/env python
"""
Simplified fine-tuning script for DeepSeek-R1-Distill-Qwen-14B-unsloth-bnb-4bit
- Optimized for L40S GPU
- Works with pre-tokenized datasets
- Research training only (no inference)
- Added CPU fallback support for Hugging Face Spaces
"""

import os
import logging
import json
import torch
import argparse
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, AutoConfig, BitsAndBytesConfig
from transformers.data.data_collator import DataCollatorMixin
from peft import LoraConfig, get_peft_model
from dotenv import load_dotenv
from huggingface_hub import HfApi, upload_folder

# Basic environment setup for L40S
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True,max_split_size_mb:256"
os.environ["TRANSFORMERS_NO_FLASH_ATTENTION"] = "1"

# Default dataset with proper namespace
DEFAULT_DATASET = "George-API/phi4-cognitive-dataset"

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# Check if CUDA is available for bitsandbytes
def is_bnb_available():
    """Check if bitsandbytes with CUDA is available.

    Returns:
        bool: True when the ``bitsandbytes`` package imports cleanly and
        ``torch.cuda.is_available()`` reports a CUDA device, False otherwise.
    """
    try:
        import bitsandbytes  # noqa: F401  (imported only to probe availability)
    except (ImportError, RuntimeError) as e:
        logger.warning(f"Error checking bitsandbytes: {e}")
        return False

    if not torch.cuda.is_available():
        logger.warning("CUDA not available for bitsandbytes")
        return False

    # NOTE: the previous probe called ``tensor.to(bnb.nn.Linear4bit)``; passing
    # a module class to ``Tensor.to`` always raises TypeError, so the check
    # reported "unavailable" on every machine and 4-bit loading never ran.
    logger.info("BitsAndBytes with CUDA support is available")
    return True


# Create a marker file to indicate training is active
def create_training_marker(output_dir):
    """Write the marker files that flag an active research-training run.

    Creates ``output_dir`` if needed, writes ``TRAINING_ACTIVE`` in the current
    working directory and ``RESEARCH_TRAINING_ONLY`` inside ``output_dir``.

    Args:
        output_dir: Directory that will receive the training outputs.
    """
    os.makedirs(output_dir, exist_ok=True)
    with open("TRAINING_ACTIVE", "w") as f:
        f.write(f"Training active in {output_dir}")
    with open(os.path.join(output_dir, "RESEARCH_TRAINING_ONLY"), "w") as f:
        f.write("This model is for research training only. No interactive outputs.")


# Remove the training marker file
def remove_training_marker():
    """Delete the ``TRAINING_ACTIVE`` marker from the working directory if present."""
    if os.path.exists("TRAINING_ACTIVE"):
        os.remove("TRAINING_ACTIVE")
        logger.info("Removed training active marker")


# Function to upload model to Hugging Face Hub
def upload_to_huggingface(output_dir, repo_name=None, private=False):
    """
    Upload the trained model to Hugging Face Hub

    Args:
        output_dir: Directory containing the model files
        repo_name: Name of the repository on HF Hub (default: derived from output_dir)
        private: Whether the repository should be private (default: False)

    Returns:
        str: URL of the uploaded model on HF Hub

    Raises:
        ValueError: If the ``HF_TOKEN`` environment variable is not set.
    """
    logger.info(f"Uploading model from {output_dir} to Hugging Face Hub")

    # Get HF token from environment
    token = os.environ.get("HF_TOKEN")
    if not token:
        logger.error("HF_TOKEN environment variable not set. Please set it to upload to Hugging Face Hub.")
        logger.error("You can get a token from https://huggingface.co/settings/tokens")
        raise ValueError("HF_TOKEN not set")

    # Get or create repo name
    if not repo_name:
        # Use the output directory name as the repository name
        repo_name = os.path.basename(os.path.normpath(output_dir))

    logger.info(f"Using repository name: {repo_name}")

    # Get HF username
    api = HfApi(token=token)
    user_info = api.whoami()
    username = user_info["name"]

    # Create full repository name
    full_repo_name = f"{username}/{repo_name}"
    logger.info(f"Creating repository: {full_repo_name}")

    # Create repository if it doesn't exist
    api.create_repo(
        repo_id=full_repo_name,
        exist_ok=True,
        private=private
    )

    # Upload model files
    logger.info(f"Uploading files from {output_dir} to {full_repo_name}")
    api.upload_folder(
        folder_path=output_dir,
        repo_id=full_repo_name,
        commit_message="Upload model files"
    )

    # Create model card
    model_card = f"""
# {repo_name}

This model was fine-tuned using the script at https://github.com/George-API/phi4-cognitive-dataset.

## Model details
- Base model: DeepSeek-R1-Distill-Qwen-14B-unsloth-bnb-4bit
- Dataset: {DEFAULT_DATASET}
- Training: Research only
"""

    readme_path = os.path.join(output_dir, "README.md")
    with open(readme_path, "w") as f:
        f.write(model_card)

    # Upload the model card
    api.upload_file(
        path_or_fileobj=readme_path,
        path_in_repo="README.md",
        repo_id=full_repo_name,
        commit_message="Add model card"
    )

    logger.info(f"Model successfully uploaded to https://huggingface.co/{full_repo_name}")
    return f"https://huggingface.co/{full_repo_name}"


# Custom data collator for pre-tokenized data
class PreTokenizedCollator(DataCollatorMixin):
    """Collate pre-tokenized examples into padded ``input_ids``/``attention_mask``/``labels``.

    Each example is expected to carry ``input_ids`` directly, or a
    ``conversations`` field from which token ids can be recovered. String
    content is tokenized only as a fallback, and only when a tokenizer was
    supplied.
    """

    def __init__(self, pad_token_id=0, tokenizer=None):
        self.pad_token_id = pad_token_id
        self.tokenizer = tokenizer  # Keep reference to tokenizer for fallback

    def __call__(self, features):
        """Build one right-padded batch from a list of example dicts.

        Args:
            features: List of example dicts (mutated in place when
                ``input_ids`` has to be derived).

        Returns:
            dict: ``input_ids`` (padded with ``pad_token_id``),
            ``attention_mask`` (1 on real tokens, 0 on padding) and ``labels``
            (-100 on padding; a copy of ``input_ids`` when the example has no
            ``labels`` of its own), all ``torch.long`` tensors.

        Raises:
            ValueError: If no example yields usable ``input_ids``.
        """
        # Extract features properly from the batch
        processed_features = []
        for feature in features:
            # If input_ids is directly available, use it
            if 'input_ids' in feature and isinstance(feature['input_ids'], list):
                processed_features.append(feature)
                continue

            # If input_ids is not available, try to extract from conversations
            if 'input_ids' not in feature and 'conversations' in feature:
                conversations = feature['conversations']
                if isinstance(conversations, list) and len(conversations) > 0:
                    first = conversations[0]
                    # Case 1: If conversations has 'input_ids' field (pre-tokenized)
                    if isinstance(first, dict) and 'input_ids' in first:
                        feature['input_ids'] = first['input_ids']
                    # Case 2: If conversations itself contains input_ids
                    elif all(isinstance(x, int) for x in conversations):
                        feature['input_ids'] = conversations
                    # Case 3: If conversations has 'content' field
                    elif isinstance(first, dict) and 'content' in first:
                        content = first['content']
                        # If content is already tokens, use directly
                        if isinstance(content, list) and all(isinstance(x, int) for x in content):
                            feature['input_ids'] = content
                        # If content is a string and we have tokenizer, tokenize as fallback
                        elif isinstance(content, str) and self.tokenizer:
                            logger.warning("Tokenizing string content as fallback")
                            feature['input_ids'] = self.tokenizer.encode(content, add_special_tokens=False)

            # Ensure input_ids is present and is a list of integers
            if 'input_ids' in feature:
                if isinstance(feature['input_ids'], str) and self.tokenizer:
                    feature['input_ids'] = self.tokenizer.encode(feature['input_ids'], add_special_tokens=False)
                elif not isinstance(feature['input_ids'], list):
                    try:
                        feature['input_ids'] = list(feature['input_ids'])
                    except Exception as e:
                        logger.error(f"Could not convert input_ids to list: {e}")
                        continue
                processed_features.append(feature)

        if len(processed_features) == 0:
            raise ValueError("No valid examples found. Check dataset structure.")

        # Determine max length in this batch
        batch_size = len(processed_features)
        batch_max_len = max(len(x["input_ids"]) for x in processed_features)

        # Initialize batch tensors
        batch = {
            "input_ids": torch.full((batch_size, batch_max_len), self.pad_token_id, dtype=torch.long),
            "attention_mask": torch.zeros((batch_size, batch_max_len), dtype=torch.long),
            # -100 is ignored in loss
            "labels": torch.full((batch_size, batch_max_len), -100, dtype=torch.long),
        }

        # Fill batch tensors
        for i, feature in enumerate(processed_features):
            input_ids = feature["input_ids"]
            seq_len = len(input_ids)

            # Convert to tensor if it's a list
            if isinstance(input_ids, list):
                input_ids = torch.tensor(input_ids, dtype=torch.long)

            # Copy data to batch tensors
            batch["input_ids"][i, :seq_len] = input_ids
            batch["attention_mask"][i, :seq_len] = 1

            # If there are labels, use them, otherwise use input_ids
            if "labels" in feature:
                labels = feature["labels"]
                if isinstance(labels, list):
                    labels = torch.tensor(labels, dtype=torch.long)
                batch["labels"][i, :len(labels)] = labels
            else:
                batch["labels"][i, :seq_len] = input_ids

        return batch


# Load and prepare dataset with proper sorting
def load_and_prepare_dataset(dataset_name, config):
    """Load and prepare the dataset for fine-tuning with proper sorting.

    Args:
        dataset_name: Hub dataset name or path. The bare name
            ``"phi4-cognitive-dataset"`` is expanded to ``DEFAULT_DATASET``.
        config: Parsed configuration dict; ``dataset_config.sort_by_field``
            selects the sort column (default ``"prompt_number"``).

    Returns:
        The loaded dataset (its ``train`` split when one exists), sorted in
        ascending order by the configured field.

    Raises:
        Exception: Any error from loading or sorting is logged and re-raised.
    """
    # Use the default dataset if the provided one matches the default name without namespace
    if dataset_name == "phi4-cognitive-dataset":
        dataset_name = DEFAULT_DATASET
        logger.info(f"Using full dataset path: {dataset_name}")

    logger.info(f"Loading dataset: {dataset_name}")

    try:
        # Load dataset
        try:
            dataset = load_dataset(dataset_name)
        except Exception as e:
            if "doesn't exist on the Hub or cannot be accessed" in str(e):
                logger.error(f"Dataset '{dataset_name}' not found. Make sure it exists and is accessible.")
                logger.error(f"If using a private dataset, check your HF_TOKEN is set in your environment.")
                logger.error(f"If missing namespace, try using the full path: 'George-API/phi4-cognitive-dataset'")
            raise

        # Extract the split we want to use (usually 'train')
        if 'train' in dataset:
            dataset = dataset['train']

        # Get the dataset config
        dataset_config = config.get("dataset_config", {})
        sort_field = dataset_config.get("sort_by_field", "prompt_number")

        # Sort in ascending order by specified field
        logger.info(f"Sorting dataset by {sort_field} in ascending order")
        dataset = dataset.sort(sort_field)

        # Print dataset info
        logger.info(f"Dataset loaded with {len(dataset)} entries")
        logger.info(f"Dataset columns: {dataset.column_names}")

        # Print sample for debugging
        if len(dataset) > 0:
            logger.info(f"Sample entry structure: {list(dataset[0].keys())}")

        return dataset

    except Exception as e:
        logger.error(f"Error loading dataset: {str(e)}")
        raise


# Main training function
def train(config_path, dataset_name, output_dir, upload_to_hub=False, hub_repo_name=None, private_repo=False):
    """Run LoRA fine-tuning as described by a JSON configuration file.

    Args:
        config_path: Path to the JSON config (sections read: ``model_config``,
            ``training_config``, ``hardware_config``, ``lora_config``,
            ``dataset_config``, ``quantization_config``).
        dataset_name: Hub dataset name or path to train on.
        output_dir: Directory for checkpoints, the LoRA adapter, the tokenizer
            and a copy of the config.
        upload_to_hub: When True, upload ``output_dir`` to the Hub afterwards.
        hub_repo_name: Repository name for the upload (default: derived from
            ``output_dir``).
        private_repo: Whether the Hub repository should be private.

    Returns:
        str: ``output_dir``.
    """
    # Load environment variables
    load_dotenv()

    # Load config
    with open(config_path, 'r') as f:
        config = json.load(f)

    # Create training marker
    create_training_marker(output_dir)

    try:
        # Extract configs
        model_config = config.get("model_config", {})
        training_config = config.get("training_config", {})
        hardware_config = config.get("hardware_config", {})
        lora_config = config.get("lora_config", {})

        # Log dataset info before loading
        logger.info(f"Will load dataset: {dataset_name}")
        if dataset_name != DEFAULT_DATASET and "phi4-cognitive-dataset" in dataset_name:
            logger.warning(f"Dataset name may need namespace prefix. Current: {dataset_name}")

        # Load and prepare dataset with proper sorting
        dataset = load_and_prepare_dataset(dataset_name, config)

        # Load model settings
        model_name = model_config.get("model_name_or_path")
        logger.info(f"Using model: {model_name}")

        # Initialize tokenizer
        logger.info("Loading tokenizer")
        tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True
        )
        tokenizer.pad_token = tokenizer.eos_token

        # Get quantization config
        quant_config = config.get("quantization_config", {})

        # Check if bitsandbytes with CUDA is available
        use_4bit = is_bnb_available() and quant_config.get("load_in_4bit", True)

        # Create model with proper configuration
        logger.info(f"Loading model (4-bit quantization: {use_4bit})")

        if use_4bit:
            # Create quantization config for GPU
            bnb_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_quant_type=quant_config.get("bnb_4bit_quant_type", "nf4"),
                bnb_4bit_use_double_quant=quant_config.get("bnb_4bit_use_double_quant", True)
            )

            # Load 4-bit quantized model for GPU
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                quantization_config=bnb_config,
                device_map="auto",
                torch_dtype=torch.float16,
                trust_remote_code=True,
                use_cache=model_config.get("use_cache", False),
                attn_implementation=hardware_config.get("attn_implementation", "eager")
            )
        else:
            # CPU fallback (or non-quantized GPU) mode
            logger.warning("Loading model in CPU fallback mode (no 4-bit quantization)")

            # Determine best dtype based on available hardware
            if torch.cuda.is_available():
                dtype = torch.float16
                device_map = "auto"
                logger.info("Using GPU with fp16")
            else:
                dtype = torch.float32
                device_map = "cpu"
                logger.info("Using CPU with fp32")

            # Load model without quantization
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map=device_map,
                torch_dtype=dtype,
                trust_remote_code=True,
                use_cache=model_config.get("use_cache", False),
                low_cpu_mem_usage=True
            )

        # Apply rope scaling if configured
        if "rope_scaling" in model_config:
            logger.info(f"Applying rope scaling: {model_config['rope_scaling']}")
            if hasattr(model.config, "rope_scaling"):
                model.config.rope_scaling = model_config["rope_scaling"]

        # Create LoRA config
        logger.info("Creating LoRA configuration")
        lora_config_obj = LoraConfig(
            r=lora_config.get("r", 16),
            lora_alpha=lora_config.get("lora_alpha", 32),
            lora_dropout=lora_config.get("lora_dropout", 0.05),
            bias=lora_config.get("bias", "none"),
            target_modules=lora_config.get("target_modules", ["q_proj", "k_proj", "v_proj", "o_proj"])
        )

        # Apply LoRA to model
        logger.info("Applying LoRA to model")
        model = get_peft_model(model, lora_config_obj)
        logger.info("Successfully applied LoRA")

        # Determine batch size based on available hardware
        cuda_available = torch.cuda.is_available()
        if cuda_available:
            gpu_info = torch.cuda.get_device_properties(0)
            logger.info(f"GPU: {gpu_info.name}, VRAM: {gpu_info.total_memory / 1e9:.2f} GB")

            # Check if it's an L40S or high-memory GPU
            if "L40S" in gpu_info.name or gpu_info.total_memory > 40e9:
                logger.info("Detected L40S GPU - optimizing for high-memory GPU")
                per_device_train_batch_size = training_config.get("per_device_train_batch_size", 4)
            else:
                # Use a smaller batch size for other GPUs
                per_device_train_batch_size = 2
                logger.info(f"Using conservative batch size for non-L40S GPU: {per_device_train_batch_size}")
        else:
            # Use minimal batch size for CPU
            per_device_train_batch_size = 1
            logger.warning("No GPU detected - using minimal batch size for CPU training")

        # Configure reporting backends
        reports = training_config.get("report_to", ["tensorboard"])

        # fp16 defaults to True, so a config that only sets ``bf16: true`` used
        # to enable both flags, which TrainingArguments rejects. Let an explicit
        # bf16 request take precedence over fp16.
        use_bf16 = cuda_available and hardware_config.get("bf16", False)
        use_fp16 = cuda_available and hardware_config.get("fp16", True) and not use_bf16

        # Only a training split is loaded and no eval_dataset is passed to the
        # Trainer, so step-based evaluation / best-model selection cannot run.
        # Leave those arguments at their library defaults (evaluation disabled).
        if (
            training_config.get("evaluation_strategy", "no") != "no"
            or training_config.get("load_best_model_at_end", False)
        ):
            logger.warning(
                "Ignoring evaluation settings from config: no evaluation dataset is provided"
            )

        # Create training arguments
        logger.info("Creating training arguments")
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=training_config.get("num_train_epochs", 3),
            per_device_train_batch_size=per_device_train_batch_size,
            gradient_accumulation_steps=training_config.get("gradient_accumulation_steps", 4),
            learning_rate=training_config.get("learning_rate", 2e-5),
            lr_scheduler_type=training_config.get("lr_scheduler_type", "cosine"),
            warmup_ratio=training_config.get("warmup_ratio", 0.03),
            weight_decay=training_config.get("weight_decay", 0.01),
            optim=training_config.get("optim", "adamw_torch"),
            fp16=use_fp16,
            bf16=use_bf16,
            max_grad_norm=training_config.get("max_grad_norm", 0.3),
            logging_steps=training_config.get("logging_steps", 10),
            save_steps=training_config.get("save_steps", 200),
            save_total_limit=training_config.get("save_total_limit", 3),
            report_to=reports,
            logging_first_step=training_config.get("logging_first_step", True),
            disable_tqdm=training_config.get("disable_tqdm", False),
            # Keep every dataset column: the collator reads raw fields itself.
            remove_unused_columns=False,
            gradient_checkpointing=cuda_available and hardware_config.get("gradient_checkpointing", True),
            dataloader_num_workers=training_config.get("dataloader_num_workers", 4)
        )

        # Create trainer with pre-tokenized collator
        logger.info("Creating trainer with pre-tokenized collator")
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=dataset,
            data_collator=PreTokenizedCollator(
                pad_token_id=tokenizer.pad_token_id,
                tokenizer=tokenizer
            ),
        )

        # Start training
        logger.info("Starting training - RESEARCH PHASE ONLY")
        trainer.train()

        # Save the model
        logger.info(f"Saving model to {output_dir}")
        trainer.save_model(output_dir)

        # Save LoRA adapter separately
        lora_output_dir = os.path.join(output_dir, "lora_adapter")
        model.save_pretrained(lora_output_dir)
        logger.info(f"Saved LoRA adapter to {lora_output_dir}")

        # Save tokenizer
        tokenizer_output_dir = os.path.join(output_dir, "tokenizer")
        tokenizer.save_pretrained(tokenizer_output_dir)
        logger.info(f"Saved tokenizer to {tokenizer_output_dir}")

        # Save config for reference
        with open(os.path.join(output_dir, "training_config.json"), "w") as f:
            json.dump(config, f, indent=2)

        logger.info("Training complete - RESEARCH PHASE ONLY")

        # Upload to Hugging Face Hub if requested
        if upload_to_hub:
            hub_url = upload_to_huggingface(
                output_dir=output_dir,
                repo_name=hub_repo_name,
                private=private_repo
            )
            logger.info(f"Model uploaded to Hugging Face Hub: {hub_url}")

        return output_dir

    finally:
        # Always remove the training marker when done
        remove_training_marker()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Fine-tune DeepSeek model (Research Only)")
    parser.add_argument("--config", type=str, default="transformers_config.json",
                        help="Path to the configuration file")
    parser.add_argument("--dataset", type=str, default=DEFAULT_DATASET,
                        help="Dataset name or path")
    parser.add_argument("--output_dir", type=str, default="fine_tuned_model",
                        help="Output directory for the fine-tuned model")
    parser.add_argument("--upload_to_hub", action="store_true",
                        help="Upload the model to Hugging Face Hub after training")
    parser.add_argument("--hub_repo_name", type=str, default=None,
                        help="Repository name for the model on Hugging Face Hub")
    parser.add_argument("--private_repo", action="store_true",
                        help="Make the Hugging Face Hub repository private")

    args = parser.parse_args()

    try:
        output_path = train(
            args.config,
            args.dataset,
            args.output_dir,
            upload_to_hub=args.upload_to_hub,
            hub_repo_name=args.hub_repo_name,
            private_repo=args.private_repo
        )
        print(f"Research training completed. Model saved to: {output_path}")

        if args.upload_to_hub:
            print("Model was also uploaded to Hugging Face Hub.")
    except Exception as e:
        logger.error(f"Training failed: {str(e)}")
        # Covers failures raised before train() reaches its own try/finally
        # (e.g. while the marker files are being written).
        remove_training_marker()  # Clean up marker if training fails
        raise