|
from typing import Any, Dict, List, Optional, Tuple, Union

import math
import re

import numpy as np
import requests

from huggingface_hub import HfApi, ModelInfo
from huggingface_hub.utils import RepositoryNotFoundError, RevisionNotFoundError
|
|
|
def parse_model_entries(model_entries: List[str]) -> List[Dict[str, str]]: |
|
""" |
|
Parse a list of model entries into structured dictionaries with provider, model name, version, region, and type. |
|
|
|
Args: |
|
model_entries: List of model entry strings as found in models.txt |
|
|
|
Returns: |
|
List of dictionaries with parsed model information containing keys: |
|
- provider: Name of the provider (e.g., 'azure', 'openai', 'anthropic', etc.) |
|
- model_name: Base name of the model |
|
- version: Version of the model (if available) |
|
- region: Deployment region (if available) |
|
- model_type: Type of the model (text, image, audio based on pattern analysis) |
|
""" |
|
parsed_models = [] |
|
|
|
|
|
known_providers = [ |
|
'azure', 'bedrock', 'anthropic', 'openai', 'cohere', 'google', |
|
'mistral', 'meta', 'amazon', 'ai21', 'anyscale', 'stability', |
|
'cloudflare', 'databricks', 'cerebras', 'assemblyai' |
|
] |
|
|
|
|
|
image_indicators = ['dall-e', 'stable-diffusion', 'image', 'canvas', 'x-', 'steps'] |
|
|
|
|
|
audio_indicators = ['whisper', 'tts', 'audio', 'voice'] |
|
|
|
for entry in model_entries: |
|
model_info = { |
|
'provider': '', |
|
'model_name': '', |
|
'version': '', |
|
'region': '', |
|
'model_type': 'text' |
|
} |
|
|
|
|
|
if any(indicator in entry.lower() for indicator in image_indicators): |
|
model_info['model_type'] = 'image' |
|
|
|
|
|
elif any(indicator in entry.lower() for indicator in audio_indicators): |
|
model_info['model_type'] = 'audio' |
|
|
|
|
|
parts = entry.split('/') |
|
|
|
|
|
if len(parts) >= 2: |
|
|
|
if parts[0].lower() in known_providers: |
|
model_info['provider'] = parts[0].lower() |
|
|
|
|
|
if parts[0].lower() in ['bedrock', 'azure'] and len(parts) >= 3: |
|
|
|
if 'commitment' not in parts[1]: |
|
model_info['region'] = parts[1] |
|
|
|
|
|
model_with_version = parts[-1] |
|
else: |
|
|
|
model_with_version = entry |
|
|
|
|
|
if not model_info['provider']: |
|
|
|
for provider in known_providers: |
|
if provider in model_with_version.lower() or f'{provider}.' in model_with_version.lower(): |
|
model_info['provider'] = provider |
|
|
|
if model_with_version.lower().startswith(f'{provider}.'): |
|
model_with_version = model_with_version[len(provider) + 1:] |
|
break |
|
|
|
|
|
version_match = re.search(r'[:.-]v(\d+(?:\.\d+)*(?:-\d+)?|\d+)(?::\d+)?$', model_with_version) |
|
if version_match: |
|
model_info['version'] = version_match.group(1) |
|
|
|
model_name = model_with_version[:version_match.start()] |
|
else: |
|
|
|
date_match = re.search(r'-(\d{4}-\d{2}-\d{2})$', model_with_version) |
|
if date_match: |
|
model_info['version'] = date_match.group(1) |
|
model_name = model_with_version[:date_match.start()] |
|
else: |
|
model_name = model_with_version |
|
|
|
|
|
model_info['model_name'] = model_name.strip('.-:') |
|
|
|
parsed_models.append(model_info) |
|
|
|
return parsed_models |
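
# Illustrative call (the entry string below is hypothetical, not taken from
# models.txt); a Bedrock-style entry is split into provider, base name and version:
#
#     parse_model_entries(["bedrock/anthropic.claude-v2"])
#     # -> [{'provider': 'bedrock', 'model_name': 'anthropic.claude',
#     #      'version': '2', 'region': '', 'model_type': 'text'}]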
|
|
|
|
|
def create_model_hierarchy(model_entries: List[str]) -> Dict[str, Dict[str, Dict[str, Dict[str, str]]]]: |
|
""" |
|
Organize model entries into a nested dictionary structure by provider, model, version, and region. |
|
|
|
Args: |
|
model_entries: List of model entry strings as found in models.txt |
|
|
|
Returns: |
|
Nested dictionary with the structure: |
|
Provider -> Model -> Version -> Region = full model string |
|
If region or version is None, they are replaced with "NA". |
|
""" |
|
|
|
parsed_models = parse_model_entries(model_entries) |
|
|
|
|
|
hierarchy = {} |
|
|
|
for i, model_info in enumerate(parsed_models): |
|
provider = model_info['provider'] if model_info['provider'] else 'unknown' |
|
model_name = model_info['model_name'] |
|
version = model_info['version'] if model_info['version'] else 'NA' |
|
|
|
region = 'NA' if provider == 'azure' else (model_info['region'] if model_info['region'] else 'NA') |
|
|
|
|
|
if provider not in hierarchy: |
|
hierarchy[provider] = {} |
|
|
|
if model_name not in hierarchy[provider]: |
|
hierarchy[provider][model_name] = {} |
|
|
|
if version not in hierarchy[provider][model_name]: |
|
hierarchy[provider][model_name][version] = {} |
|
|
|
|
|
hierarchy[provider][model_name][version][region] = model_entries[i] |
|
|
|
return hierarchy |
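
# Illustrative result for the same hypothetical entry (version and region fall
# back to "NA" when they cannot be parsed):
#
#     create_model_hierarchy(["bedrock/anthropic.claude-v2"])
#     # -> {'bedrock': {'anthropic.claude': {'2': {'NA': 'bedrock/anthropic.claude-v2'}}}}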
|
|
|
|
|
|
|
# Reference table: GPU name -> (VRAM in GB, approximate compute throughput in TFLOPS
# as used by estimate_performance). Figures are rough vendor numbers.
NVIDIA_GPUS = {
|
"RTX 3050": (8, 18), |
|
"RTX 3060": (12, 25), |
|
"RTX 3070": (8, 40), |
|
"RTX 3080": (10, 58), |
|
"RTX 3090": (24, 71), |
|
"RTX 4060": (8, 41), |
|
"RTX 4070": (12, 56), |
|
"RTX 4080": (16, 113), |
|
"RTX 4090": (24, 165), |
|
"RTX A2000": (6, 20), |
|
"RTX A4000": (16, 40), |
|
"RTX A5000": (24, 64), |
|
"RTX A6000": (48, 75), |
|
"A100 40GB": (40, 312), |
|
"A100 80GB": (80, 312), |
|
"H100 80GB": (80, 989), |
|
} |
|
|
|
|
|
def get_hf_model_info(model_id: str) -> Optional[ModelInfo]: |
|
""" |
|
Retrieve model information from the Hugging Face Hub. |
|
|
|
Args: |
|
model_id: Hugging Face model ID (e.g., "facebook/opt-1.3b") |
|
|
|
Returns: |
|
ModelInfo object or None if model not found |
|
""" |
|
try: |
|
api = HfApi() |
|
model_info = api.model_info(model_id) |
|
return model_info |
|
except (RepositoryNotFoundError, RevisionNotFoundError) as e: |
|
print(f"Error fetching model info: {e}") |
|
return None |
|
|
|
|
|
def extract_model_size(model_info: ModelInfo) -> Optional[Tuple[float, str]]: |
|
""" |
|
Extract the parameter size and precision from model information. |
|
|
|
Args: |
|
model_info: ModelInfo object from Hugging Face Hub |
|
|
|
Returns: |
|
Tuple of (parameter size in billions, precision) or None if not found |
|
""" |
|
|
|
    # 1) Explicit parameter count in the model card metadata, when present.
    if model_info.card_data is not None:
        if "model-index" in model_info.card_data and isinstance(model_info.card_data["model-index"], list):
            for item in model_info.card_data["model-index"]:
                if "parameters" in item:
                    return float(item["parameters"]) / 1e9, "fp16"

    name = model_info.id.lower()

    # 2) Infer precision from the repo name first, so that name-based size matches
    #    report the detected precision instead of always defaulting to fp16.
    precision = "fp16"
    precision_patterns = {"fp16": r"fp16", "int8": r"int8", "int4": r"int4", "fp32": r"fp32"}
    for prec, pattern in precision_patterns.items():
        if re.search(pattern, name):
            precision = prec
            break

    # 3) Infer the parameter count from the repo name (e.g. "opt-1.3b", "llama-2-7b").
    size_match = re.search(r"(\d+(?:\.\d+)?)[-_]?b\b", name)
    if size_match:
        return float(size_match.group(1)), precision

    siblings = model_info.siblings or []

    # 4) Scan the README for an "<N> billion parameters" statement.
    for sibling in siblings:
        if sibling.rfilename == "README.md" and (sibling.size is None or sibling.size < 100_000):
            try:
                readme_url = f"https://huggingface.co/{model_info.id}/resolve/main/README.md"
                content = requests.get(readme_url, timeout=10).text
                match = re.search(r"(\d+(\.\d+)?)\s*[Bb](illion)?\s*[Pp]arameters", content)
                if match:
                    return float(match.group(1)), precision
            except requests.RequestException:
                pass

    # 5) Last resort: read config.json and either use an explicit count or estimate
    #    one from GPT-style architecture hyperparameters.
    config_sibling = next((s for s in siblings if s.rfilename == "config.json"), None)
    if config_sibling:
        try:
            config_url = f"https://huggingface.co/{model_info.id}/resolve/main/config.json"
            config = requests.get(config_url, timeout=10).json()
            if "n_params" in config:
                return float(config["n_params"]) / 1e9, precision

            if all(k in config for k in ["n_layer", "n_head", "n_embd"]):
                n_layer = config["n_layer"]
                n_embd = config["n_embd"]
                # ~12 * n_layer * n_embd^2 weights plus an attention/bias correction term.
                params = 12 * n_layer * (n_embd**2) * (1 + 13 / (12 * n_embd))
                return params / 1e9, precision
        except (requests.RequestException, ValueError):
            pass
|
|
|
return None |
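
# Illustrative flow (requires network access; the exact result depends on what the
# repo exposes). For a repo whose name encodes the size, the name-based fallback is
# what usually fires:
#
#     info = get_hf_model_info("facebook/opt-1.3b")
#     if info is not None:
#         extract_model_size(info)   # typically (1.3, 'fp16') via the "1.3b" in the name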
|
|
|
|
|
def calculate_vram_requirements(param_size: float, precision: str = "fp16") -> Dict[str, float]: |
|
""" |
|
Calculate VRAM requirements for inference using the EleutherAI transformer math formula. |
|
|
|
Args: |
|
param_size: Model size in billions of parameters |
|
precision: Model precision ("fp32", "fp16", "int8", "int4") |
|
|
|
Returns: |
|
Dictionary with various memory requirements in GB |
|
""" |
|
|
|
    param_count = param_size * 1e9

    # Bytes per parameter at the given precision; unknown precision strings fall
    # back to fp16 (2 bytes) rather than raising a KeyError.
    bytes_per_param = {
        "fp32": 4,
        "fp16": 2,
        "int8": 1,
        "int4": 0.5,
    }.get(precision, 2)
|
|
|
|
|
model_size_gb = (param_count * bytes_per_param) / (1024**3) |
|
|
|
|
|
|
|
    # Rough allowance for activation memory during inference (~1.2x the weights).
    activation_factor = 1.2

    # Rough allowance for the KV cache: ~6.25% of the parameter count at the same
    # precision. In practice this scales with batch size and context length.
    kv_cache_size_gb = (param_count * 0.0625 * bytes_per_param) / (1024**3)

    total_inference_gb = model_size_gb + (model_size_gb * activation_factor) + kv_cache_size_gb
|
|
|
|
|
    # Fixed CUDA/runtime overhead plus a 10% compute buffer on the inference footprint.
    overhead_gb = 0.8
    compute_overhead_factor = 0.1

    total_vram_required_gb = total_inference_gb + overhead_gb + (total_inference_gb * compute_overhead_factor)
|
|
|
return { |
|
"model_size_gb": model_size_gb, |
|
"kv_cache_gb": kv_cache_size_gb, |
|
"activations_gb": model_size_gb * activation_factor, |
|
"overhead_gb": overhead_gb + (total_inference_gb * compute_overhead_factor), |
|
"total_vram_gb": total_vram_required_gb |
|
} |
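
# Worked example with the heuristics above, for a 7B-parameter model at fp16:
#   weights      ~= 7e9 params * 2 bytes            ~= 13.0 GB
#   activations  ~= 13.0 GB * 1.2                   ~= 15.6 GB
#   KV cache     ~= 7e9 * 0.0625 * 2 bytes          ~= 0.8 GB
#   total        ~= (13.0 + 15.6 + 0.8) * 1.1 + 0.8 ~= 33.2 GB
#
#     calculate_vram_requirements(7.0, "fp16")["total_vram_gb"]   # ~33.2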
|
|
|
|
|
def find_compatible_gpus(vram_required: float) -> List[str]: |
|
""" |
|
Find NVIDIA GPUs that can run a model requiring the specified VRAM. |
|
|
|
Args: |
|
vram_required: Required VRAM in GB |
|
|
|
Returns: |
|
List of compatible GPU names sorted by VRAM capacity (smallest first) |
|
""" |
|
compatible_gpus = [(name, specs[0]) for name, specs in NVIDIA_GPUS.items() if specs[0] >= vram_required] |
|
return [gpu[0] for gpu in sorted(compatible_gpus, key=lambda x: x[1])] |
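
# Continuing the worked example: a ~33 GB requirement rules out every consumer
# card in the table above.
#
#     find_compatible_gpus(33.2)
#     # -> ['A100 40GB', 'RTX A6000', 'A100 80GB', 'H100 80GB']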
|
|
|
|
|
def estimate_performance(param_size: float, precision: str, gpu_name: str) -> Dict[str, float]: |
|
""" |
|
Estimate token/second performance for a model on a specific GPU. |
|
|
|
Args: |
|
param_size: Model size in billions of parameters |
|
precision: Model precision |
|
gpu_name: Name of the NVIDIA GPU |
|
|
|
Returns: |
|
Dictionary with performance metrics |
|
""" |
|
if gpu_name not in NVIDIA_GPUS: |
|
return {"tokens_per_second": 0, "tflops_utilization": 0} |
|
|
|
gpu_vram, gpu_tops = NVIDIA_GPUS[gpu_name] |
|
|
|
|
|
|
|
    # ~6 FLOPs per parameter per token. This is the training-style estimate; a
    # forward-only pass is closer to 2 per parameter, so this is conservative.
    flops_per_token = 6 * param_size * 1e9

    # Approximate throughput multiplier over the baseline TFLOPS figure when
    # running at lower precision.
    precision_factor = 1.0 if precision == "fp32" else 2.0 if precision == "fp16" else 4.0 if precision in ["int8", "int4"] else 1.0
    gpu_tflops = gpu_tops * precision_factor
|
|
|
|
|
    # Assume roughly 60% of peak throughput is achievable in practice.
    practical_utilization = 0.6
|
|
|
|
|
effective_tflops = gpu_tflops * practical_utilization |
|
tokens_per_second = (effective_tflops * 1e12) / flops_per_token |
|
|
|
return { |
|
"tokens_per_second": tokens_per_second, |
|
"flops_per_token": flops_per_token, |
|
"tflops_utilization": practical_utilization, |
|
"effective_tflops": effective_tflops |
|
} |
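
# Continuing the worked example: a 7B fp16 model on an A100 40GB by this formula:
#   effective TFLOPS ~= 312 * 2.0 * 0.6 = 374.4
#   tokens/second    ~= 374.4e12 / (6 * 7e9) ~= 8,900
# Real decoding speed is usually memory-bandwidth bound, so treat this as an upper
# bound on compute-limited throughput.
#
#     estimate_performance(7.0, "fp16", "A100 40GB")["tokens_per_second"]   # ~8914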
|
|
|
|
|
def analyze_hf_model(model_id: str) -> Dict[str, Any]:
|
""" |
|
Comprehensive analysis of a Hugging Face model: |
|
- Downloads model information |
|
- Extracts parameter size and precision |
|
- Estimates VRAM requirements |
|
- Identifies compatible NVIDIA GPUs |
|
- Estimates performance on these GPUs |
|
|
|
Args: |
|
model_id: Hugging Face model ID (e.g., "facebook/opt-1.3b") |
|
|
|
Returns: |
|
Dictionary with analysis results or error message |
|
""" |
|
|
|
model_info = get_hf_model_info(model_id) |
|
if not model_info: |
|
return {"error": f"Model {model_id} not found on Hugging Face"} |
|
|
|
|
|
size_info = extract_model_size(model_info) |
|
if not size_info: |
|
return {"error": f"Couldn't determine parameter count for {model_id}"} |
|
|
|
param_size, precision = size_info |
|
|
|
|
|
vram_requirements = calculate_vram_requirements(param_size, precision) |
|
total_vram_gb = vram_requirements["total_vram_gb"] |
|
|
|
|
|
compatible_gpus = find_compatible_gpus(total_vram_gb) |
|
|
|
|
|
gpu_performance = {} |
|
for gpu in compatible_gpus: |
|
gpu_performance[gpu] = estimate_performance(param_size, precision, gpu) |
|
|
|
|
|
largest_compatible_gpu = compatible_gpus[-1] if compatible_gpus else None |
|
|
|
    return {
        "model_id": model_id,
        "parameter_size": param_size,
        "precision": precision,
        "vram_requirements": vram_requirements,
        "compatible_gpus": compatible_gpus,
        "largest_compatible_gpu": largest_compatible_gpu,
        "gpu_performance": gpu_performance,
    }
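
# Minimal usage sketch (the model ID is just an example; requires network access
# to the Hugging Face Hub):
if __name__ == "__main__":
    report = analyze_hf_model("facebook/opt-1.3b")
    if "error" in report:
        print(report["error"])
    else:
        print(
            f"{report['model_id']}: ~{report['parameter_size']:.1f}B params "
            f"({report['precision']}), ~{report['vram_requirements']['total_vram_gb']:.1f} GB VRAM"
        )
        print("Compatible GPUs:", ", ".join(report["compatible_gpus"]) or "none")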