from typing import Any, Dict, List, Optional, Tuple, Union
import re
import math
import requests
import numpy as np
from huggingface_hub import HfApi, ModelInfo, hf_hub_url
from huggingface_hub.utils import RepositoryNotFoundError, RevisionNotFoundError


def parse_model_entries(model_entries: List[str]) -> List[Dict[str, str]]:
    """
    Parse a list of model entries into structured dictionaries with provider,
    model name, version, region, and type.

    Args:
        model_entries: List of model entry strings as found in models.txt

    Returns:
        List of dictionaries with parsed model information containing keys:
        - provider: Name of the provider (e.g., 'azure', 'openai', 'anthropic', etc.)
        - model_name: Base name of the model
        - version: Version of the model (if available)
        - region: Deployment region (if available)
        - model_type: Type of the model (text, image, audio based on pattern analysis)
    """
    parsed_models = []

    # Common provider prefixes to identify
    known_providers = [
        'azure', 'bedrock', 'anthropic', 'openai', 'cohere', 'google',
        'mistral', 'meta', 'amazon', 'ai21', 'anyscale', 'stability',
        'cloudflare', 'databricks', 'cerebras', 'assemblyai'
    ]

    # Image-related keywords to identify image models
    image_indicators = ['dall-e', 'stable-diffusion', 'image', 'canvas', 'x-', 'steps']
    # Audio-related keywords to identify audio models
    audio_indicators = ['whisper', 'tts', 'audio', 'voice']

    for entry in model_entries:
        model_info = {
            'provider': '',
            'model_name': '',
            'version': '',
            'region': '',
            'model_type': 'text'  # Default to text
        }

        # Check for image models
        if any(indicator in entry.lower() for indicator in image_indicators):
            model_info['model_type'] = 'image'
        # Check for audio models
        elif any(indicator in entry.lower() for indicator in audio_indicators):
            model_info['model_type'] = 'audio'

        # Parse the entry based on common patterns
        parts = entry.split('/')

        # Handle region and provider extraction
        if len(parts) >= 2:
            # Extract provider from the beginning (common pattern)
            if parts[0].lower() in known_providers:
                model_info['provider'] = parts[0].lower()

                # For bedrock and azure, the region is often the next part
                if parts[0].lower() in ['bedrock', 'azure'] and len(parts) >= 3:
                    # Skip commitment parts if present
                    if 'commitment' not in parts[1]:
                        model_info['region'] = parts[1]

            # The last part typically contains the model name and possibly version
            model_with_version = parts[-1]
        else:
            # For single-part entries
            model_with_version = entry

        # Extract provider from model name if not already set
        if not model_info['provider']:
            # Look for known providers within the model name
            for provider in known_providers:
                if provider in model_with_version.lower() or f'{provider}.' in model_with_version.lower():
                    model_info['provider'] = provider
                    # Remove provider prefix if it exists at the beginning
                    if model_with_version.lower().startswith(f'{provider}.'):
                        model_with_version = model_with_version[len(provider) + 1:]
                    break

        # Extract version information
        version_match = re.search(r'[:.-]v(\d+(?:\.\d+)*(?:-\d+)?|\d+)(?::\d+)?$', model_with_version)
        if version_match:
            model_info['version'] = version_match.group(1)
            # Remove version from model name
            model_name = model_with_version[:version_match.start()]
        else:
            # Look for date-based versions like 2024-08-06
            date_match = re.search(r'-(\d{4}-\d{2}-\d{2})$', model_with_version)
            if date_match:
                model_info['version'] = date_match.group(1)
                model_name = model_with_version[:date_match.start()]
            else:
                model_name = model_with_version

        # Clean up model name by removing trailing/leading separators
        model_info['model_name'] = model_name.strip('.-:')

        parsed_models.append(model_info)

    return parsed_models
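

# A minimal usage sketch for parse_model_entries. The entry strings below are
# hypothetical examples of the patterns the parser targets (provider/region/model
# with a date or "-v" style version suffix); they are not taken from models.txt.
def _demo_parse_model_entries() -> None:
    sample_entries = [
        "azure/eastus2/gpt-4o-2024-08-06",        # provider/region/model-date
        "bedrock/us-west-2/anthropic.claude-v2",  # provider/region/vendor.model-version
    ]
    for parsed in parse_model_entries(sample_entries):
        # Each dict carries provider, model_name, version, region, and model_type.
        print(parsed)
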

def create_model_hierarchy(model_entries: List[str]) -> Dict[str, Dict[str, Dict[str, Dict[str, str]]]]:
    """
    Organize model entries into a nested dictionary structure by provider, model,
    version, and region.

    Args:
        model_entries: List of model entry strings as found in models.txt

    Returns:
        Nested dictionary with the structure:
        Provider -> Model -> Version -> Region = full model string
        If region or version is None, they are replaced with "NA".
    """
    # Parse the model entries to get structured information
    parsed_models = parse_model_entries(model_entries)

    # Create the nested dictionary structure
    hierarchy = {}

    for i, model_info in enumerate(parsed_models):
        provider = model_info['provider'] if model_info['provider'] else 'unknown'
        model_name = model_info['model_name']
        version = model_info['version'] if model_info['version'] else 'NA'
        # For Azure models, always use 'NA' as region since they are globally available
        region = 'NA' if provider == 'azure' else (model_info['region'] if model_info['region'] else 'NA')

        # Initialize nested dictionaries if they don't exist
        if provider not in hierarchy:
            hierarchy[provider] = {}
        if model_name not in hierarchy[provider]:
            hierarchy[provider][model_name] = {}
        if version not in hierarchy[provider][model_name]:
            hierarchy[provider][model_name][version] = {}

        # Store the full model string at the leaf node
        hierarchy[provider][model_name][version][region] = model_entries[i]

    return hierarchy
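

# A minimal usage sketch for create_model_hierarchy, reusing the same hypothetical
# entry strings as above (illustrative only, not taken from models.txt).
def _demo_create_model_hierarchy() -> None:
    sample_entries = [
        "azure/eastus2/gpt-4o-2024-08-06",
        "bedrock/us-west-2/anthropic.claude-v2",
    ]
    hierarchy = create_model_hierarchy(sample_entries)
    # Walk Provider -> Model -> Version -> Region and print the stored full entry.
    for provider, models in hierarchy.items():
        for model_name, versions in models.items():
            for version, regions in versions.items():
                for region, full_entry in regions.items():
                    print(provider, model_name, version, region, "->", full_entry)
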

# NVIDIA GPU specifications - Name: (VRAM in GB, FP16 TOPS)
NVIDIA_GPUS = {
    "RTX 3050": (8, 18),
    "RTX 3060": (12, 25),
    "RTX 3070": (8, 40),
    "RTX 3080": (10, 58),
    "RTX 3090": (24, 71),
    "RTX 4060": (8, 41),
    "RTX 4070": (12, 56),
    "RTX 4080": (16, 113),
    "RTX 4090": (24, 165),
    "RTX A2000": (6, 20),
    "RTX A4000": (16, 40),
    "RTX A5000": (24, 64),
    "RTX A6000": (48, 75),
    "A100 40GB": (40, 312),
    "A100 80GB": (80, 312),
    "H100 80GB": (80, 989),
}


def get_hf_model_info(model_id: str) -> Optional[ModelInfo]:
    """
    Retrieve model information from the Hugging Face Hub.

    Args:
        model_id: Hugging Face model ID (e.g., "facebook/opt-1.3b")

    Returns:
        ModelInfo object or None if model not found
    """
    try:
        api = HfApi()
        model_info = api.model_info(model_id)
        return model_info
    except (RepositoryNotFoundError, RevisionNotFoundError) as e:
        print(f"Error fetching model info: {e}")
        return None


def extract_model_size(model_info: ModelInfo) -> Optional[Tuple[float, str]]:
    """
    Extract the parameter size and precision from model information.

    Args:
        model_info: ModelInfo object from Hugging Face Hub

    Returns:
        Tuple of (parameter size in billions, precision) or None if not found
    """
    # Try to get parameter count from model card metadata
    if model_info.card_data is not None:
        card_data = model_info.card_data.to_dict()
        if "model-index" in card_data and isinstance(card_data["model-index"], list):
            for item in card_data["model-index"]:
                if "parameters" in item:
                    return float(item["parameters"]) / 1e9, "fp16"  # Convert to billions and assume fp16

    # Try to extract from model name
    name = model_info.id.lower()
    size_patterns = [
        r"(\d+(\.\d+)?)b",   # matches patterns like "1.3b" or "7b"
        r"-(\d+(\.\d+)?)b",  # matches patterns like "llama-7b"
        r"(\d+(\.\d+)?)-b",  # matches other formatting variations
    ]

    # Extract precision from the name if available (default to fp16)
    precision = "fp16"
    precision_patterns = {"fp16": r"fp16", "int8": r"int8", "int4": r"int4", "fp32": r"fp32"}
    for prec, pattern in precision_patterns.items():
        if re.search(pattern, name):
            precision = prec
            break

    for pattern in size_patterns:
        match = re.search(pattern, name)
        if match:
            size_str = match.group(1)
            return float(size_str), precision

    # If couldn't determine size, check sibling files for a readme
    if model_info.siblings:
        for sibling in model_info.siblings:
            # Size may be None unless file metadata was requested
            if sibling.rfilename == "README.md" and (sibling.size is None or sibling.size < 100000):
                try:
                    content = requests.get(hf_hub_url(model_info.id, sibling.rfilename)).text
                    param_pattern = r"(\d+(\.\d+)?)\s*[Bb](illion)?\s*[Pp]arameters"
                    match = re.search(param_pattern, content)
                    if match:
                        return float(match.group(1)), precision
                except Exception:
                    pass

    # As a last resort, try to analyze config.json if it exists
    config_sibling = next((s for s in (model_info.siblings or []) if s.rfilename == "config.json"), None)
    if config_sibling:
        try:
            config = requests.get(hf_hub_url(model_info.id, config_sibling.rfilename)).json()
            if "n_params" in config:
                return float(config["n_params"]) / 1e9, precision

            # Calculate from architecture if available
            if all(k in config for k in ["n_layer", "n_head", "n_embd"]):
                n_layer = config["n_layer"]
                n_embd = config["n_embd"]
                n_head = config["n_head"]
                # Transformer parameter estimation formula
                params = 12 * n_layer * (n_embd**2) * (1 + 13 / (12 * n_embd))
                return params / 1e9, precision
        except Exception:
            pass

    return None
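

# A small sketch of the size/precision extraction path. It performs a live Hub
# lookup, so it needs network access; "facebook/opt-1.3b" is the example ID used
# in the docstrings above and should resolve to ~1.3B parameters from the
# repository name alone.
def _demo_extract_model_size(model_id: str = "facebook/opt-1.3b") -> None:
    info = get_hf_model_info(model_id)
    if info is None:
        print(f"Could not fetch {model_id}")
        return
    size_info = extract_model_size(info)
    if size_info is None:
        print(f"Could not determine parameter count for {model_id}")
        return
    param_size, precision = size_info
    print(f"{model_id}: ~{param_size}B parameters, assumed precision {precision}")
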

def calculate_vram_requirements(param_size: float, precision: str = "fp16") -> Dict[str, float]:
    """
    Calculate VRAM requirements for inference using the EleutherAI transformer math formula.

    Args:
        param_size: Model size in billions of parameters
        precision: Model precision ("fp32", "fp16", "int8", "int4")

    Returns:
        Dictionary with various memory requirements in GB
    """
    # Convert parameters to actual count
    param_count = param_size * 1e9

    # Size per parameter based on precision
    bytes_per_param = {
        "fp32": 4,
        "fp16": 2,
        "int8": 1,
        "int4": 0.5,  # 4 bits = 0.5 bytes
    }[precision]

    # Base model size (parameters * bytes per parameter)
    model_size_gb = (param_count * bytes_per_param) / (1024**3)

    # EleutherAI formula components for inference memory
    # Layer activations - scales with sequence length
    activation_factor = 1.2  # varies by architecture

    # KV cache size (scales with batch size and sequence length)
    # Estimate for single batch, 2048-token context
    kv_cache_size_gb = (param_count * 0.0625 * bytes_per_param) / (1024**3)  # ~6.25% of params for KV cache

    # Total VRAM needed for inference
    total_inference_gb = model_size_gb + (model_size_gb * activation_factor) + kv_cache_size_gb

    # Add overhead for CUDA, buffers, and fragmentation
    overhead_gb = 0.8  # 800 MB overhead

    # Dynamic computation graph allocation
    compute_overhead_factor = 0.1  # varies based on attention computation method

    # Final VRAM estimate
    total_vram_required_gb = total_inference_gb + overhead_gb + (total_inference_gb * compute_overhead_factor)

    return {
        "model_size_gb": model_size_gb,
        "kv_cache_gb": kv_cache_size_gb,
        "activations_gb": model_size_gb * activation_factor,
        "overhead_gb": overhead_gb + (total_inference_gb * compute_overhead_factor),
        "total_vram_gb": total_vram_required_gb,
    }


def find_compatible_gpus(vram_required: float) -> List[str]:
    """
    Find NVIDIA GPUs that can run a model requiring the specified VRAM.

    Args:
        vram_required: Required VRAM in GB

    Returns:
        List of compatible GPU names sorted by VRAM capacity (smallest first)
    """
    compatible_gpus = [(name, specs[0]) for name, specs in NVIDIA_GPUS.items() if specs[0] >= vram_required]
    return [gpu[0] for gpu in sorted(compatible_gpus, key=lambda x: x[1])]
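

# A worked sketch of the two helpers above for a hypothetical 7B-parameter model
# served in fp16. Under the constants used in calculate_vram_requirements
# (activation factor 1.2, ~6.25% KV cache, 0.8 GB fixed overhead, 10% compute
# overhead), the estimate comes out to roughly 33 GB, which only the larger
# workstation and data-center cards in NVIDIA_GPUS can satisfy.
def _demo_vram_for_7b_fp16() -> None:
    requirements = calculate_vram_requirements(7.0, "fp16")
    print(f"Estimated VRAM: {requirements['total_vram_gb']:.1f} GB")  # ~33 GB
    print("Compatible GPUs:", find_compatible_gpus(requirements["total_vram_gb"]))
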

def estimate_performance(param_size: float, precision: str, gpu_name: str) -> Dict[str, float]:
    """
    Estimate token/second performance for a model on a specific GPU.

    Args:
        param_size: Model size in billions of parameters
        precision: Model precision
        gpu_name: Name of the NVIDIA GPU

    Returns:
        Dictionary with performance metrics
    """
    if gpu_name not in NVIDIA_GPUS:
        return {"tokens_per_second": 0, "tflops_utilization": 0}

    gpu_vram, gpu_tops = NVIDIA_GPUS[gpu_name]

    # Calculate FLOPs per token (based on model size)
    # Formula: ~6 * num_parameters FLOPs per token (inference)
    flops_per_token = 6 * param_size * 1e9

    # Convert TOPS to TFLOPS based on precision
    precision_factor = {"fp32": 1.0, "fp16": 2.0, "int8": 4.0, "int4": 4.0}.get(precision, 1.0)
    gpu_tflops = gpu_tops * precision_factor

    # Practical utilization (GPUs rarely achieve 100% of theoretical performance)
    practical_utilization = 0.6  # 60% utilization

    # Calculate tokens per second
    effective_tflops = gpu_tflops * practical_utilization
    tokens_per_second = (effective_tflops * 1e12) / flops_per_token

    return {
        "tokens_per_second": tokens_per_second,
        "flops_per_token": flops_per_token,
        "tflops_utilization": practical_utilization,
        "effective_tflops": effective_tflops,
    }


def analyze_hf_model(model_id: str) -> Dict[str, Any]:
    """
    Comprehensive analysis of a Hugging Face model:
    - Downloads model information
    - Extracts parameter size and precision
    - Estimates VRAM requirements
    - Identifies compatible NVIDIA GPUs
    - Estimates performance on these GPUs

    Args:
        model_id: Hugging Face model ID (e.g., "facebook/opt-1.3b")

    Returns:
        Dictionary with analysis results or error message
    """
    # Get model information
    model_info = get_hf_model_info(model_id)
    if not model_info:
        return {"error": f"Model {model_id} not found on Hugging Face"}

    # Extract model size and precision
    size_info = extract_model_size(model_info)
    if not size_info:
        return {"error": f"Couldn't determine parameter count for {model_id}"}

    param_size, precision = size_info

    # Calculate VRAM requirements
    vram_requirements = calculate_vram_requirements(param_size, precision)
    total_vram_gb = vram_requirements["total_vram_gb"]

    # Find compatible GPUs
    compatible_gpus = find_compatible_gpus(total_vram_gb)

    # Calculate performance for each compatible GPU
    gpu_performance = {}
    for gpu in compatible_gpus:
        gpu_performance[gpu] = estimate_performance(param_size, precision, gpu)

    # Determine the largest GPU that can run the model (list is sorted smallest first)
    largest_compatible_gpu = compatible_gpus[-1] if compatible_gpus else None

    return {
        "model_id": model_id,
        "parameter_size": param_size,  # in billions
        "precision": precision,
        "vram_requirements": vram_requirements,
        "compatible_gpus": compatible_gpus,
        "largest_compatible_gpu": largest_compatible_gpu,
        "gpu_performance": gpu_performance,
        # "model_info": {
        #     "description": model_info.description,
        #     "tags": model_info.tags,
        #     "downloads": model_info.downloads,
        #     "library": getattr(model_info, "library", None),
        # },
    }
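

# Convenience entry point for manual testing: run the end-to-end analysis for the
# example model ID used in the docstrings above. This performs live Hugging Face
# Hub requests, so it requires network access.
if __name__ == "__main__":
    import json

    result = analyze_hf_model("facebook/opt-1.3b")
    print(json.dumps(result, indent=2, default=str))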