llm-pricing-calculator / utils_on.py
ash-98's picture
v1.1
e6ef9f1
from typing import List, Dict, Tuple, Optional, Union
import re
import math
import requests
import numpy as np
from huggingface_hub import HfApi, ModelInfo
from huggingface_hub.utils import RepositoryNotFoundError, RevisionNotFoundError
def parse_model_entries(model_entries: List[str]) -> List[Dict[str, str]]:
"""
Parse a list of model entries into structured dictionaries with provider, model name, version, region, and type.
Args:
model_entries: List of model entry strings as found in models.txt
Returns:
List of dictionaries with parsed model information containing keys:
- provider: Name of the provider (e.g., 'azure', 'openai', 'anthropic', etc.)
- model_name: Base name of the model
- version: Version of the model (if available)
- region: Deployment region (if available)
- model_type: Type of the model (text, image, audio based on pattern analysis)
"""
parsed_models = []
# Common provider prefixes to identify
known_providers = [
'azure', 'bedrock', 'anthropic', 'openai', 'cohere', 'google',
'mistral', 'meta', 'amazon', 'ai21', 'anyscale', 'stability',
'cloudflare', 'databricks', 'cerebras', 'assemblyai'
]
# Image-related keywords to identify image models
image_indicators = ['dall-e', 'stable-diffusion', 'image', 'canvas', 'x-', 'steps']
# Audio-related keywords to identify audio models
audio_indicators = ['whisper', 'tts', 'audio', 'voice']
for entry in model_entries:
model_info = {
'provider': '',
'model_name': '',
'version': '',
'region': '',
'model_type': 'text' # Default to text
}
# Check for image models
if any(indicator in entry.lower() for indicator in image_indicators):
model_info['model_type'] = 'image'
# Check for audio models
elif any(indicator in entry.lower() for indicator in audio_indicators):
model_info['model_type'] = 'audio'
# Parse the entry based on common patterns
parts = entry.split('/')
# Handle region and provider extraction
if len(parts) >= 2:
# Extract provider from the beginning (common pattern)
if parts[0].lower() in known_providers:
model_info['provider'] = parts[0].lower()
# For bedrock and azure, the region is often the next part
if parts[0].lower() in ['bedrock', 'azure'] and len(parts) >= 3:
# Skip commitment parts if present
if 'commitment' not in parts[1]:
model_info['region'] = parts[1]
# The last part typically contains the model name and possibly version
model_with_version = parts[-1]
else:
# For single-part entries
model_with_version = entry
# Extract provider from model name if not already set
if not model_info['provider']:
# Look for known providers within the model name
for provider in known_providers:
if provider in model_with_version.lower() or f'{provider}.' in model_with_version.lower():
model_info['provider'] = provider
# Remove provider prefix if it exists at the beginning
if model_with_version.lower().startswith(f'{provider}.'):
model_with_version = model_with_version[len(provider) + 1:]
break
# Extract version information
version_match = re.search(r'[:.-]v(\d+(?:\.\d+)*(?:-\d+)?|\d+)(?::\d+)?$', model_with_version)
if version_match:
model_info['version'] = version_match.group(1)
# Remove version from model name
model_name = model_with_version[:version_match.start()]
else:
# Look for date-based versions like 2024-08-06
date_match = re.search(r'-(\d{4}-\d{2}-\d{2})$', model_with_version)
if date_match:
model_info['version'] = date_match.group(1)
model_name = model_with_version[:date_match.start()]
else:
model_name = model_with_version
# Clean up model name by removing trailing/leading separators
model_info['model_name'] = model_name.strip('.-:')
parsed_models.append(model_info)
return parsed_models
def create_model_hierarchy(model_entries: List[str]) -> Dict[str, Dict[str, Dict[str, Dict[str, str]]]]:
"""
Organize model entries into a nested dictionary structure by provider, model, version, and region.
Args:
model_entries: List of model entry strings as found in models.txt
Returns:
Nested dictionary with the structure:
Provider -> Model -> Version -> Region = full model string
If region or version is None, they are replaced with "NA".
"""
# Parse the model entries to get structured information
parsed_models = parse_model_entries(model_entries)
# Create the nested dictionary structure
hierarchy = {}
for i, model_info in enumerate(parsed_models):
provider = model_info['provider'] if model_info['provider'] else 'unknown'
model_name = model_info['model_name']
version = model_info['version'] if model_info['version'] else 'NA'
# For Azure models, always use 'NA' as region since they are globally available
region = 'NA' if provider == 'azure' else (model_info['region'] if model_info['region'] else 'NA')
# Initialize nested dictionaries if they don't exist
if provider not in hierarchy:
hierarchy[provider] = {}
if model_name not in hierarchy[provider]:
hierarchy[provider][model_name] = {}
if version not in hierarchy[provider][model_name]:
hierarchy[provider][model_name][version] = {}
# Store the full model string at the leaf node
hierarchy[provider][model_name][version][region] = model_entries[i]
return hierarchy
# NVIDIA GPU specifications - Name: (VRAM in GB, FP16 TOPS)
NVIDIA_GPUS = {
"RTX 3050": (8, 18),
"RTX 3060": (12, 25),
"RTX 3070": (8, 40),
"RTX 3080": (10, 58),
"RTX 3090": (24, 71),
"RTX 4060": (8, 41),
"RTX 4070": (12, 56),
"RTX 4080": (16, 113),
"RTX 4090": (24, 165),
"RTX A2000": (6, 20),
"RTX A4000": (16, 40),
"RTX A5000": (24, 64),
"RTX A6000": (48, 75),
"A100 40GB": (40, 312),
"A100 80GB": (80, 312),
"H100 80GB": (80, 989),
}
def get_hf_model_info(model_id: str) -> Optional[ModelInfo]:
"""
Retrieve model information from the Hugging Face Hub.
Args:
model_id: Hugging Face model ID (e.g., "facebook/opt-1.3b")
Returns:
ModelInfo object or None if model not found
"""
try:
api = HfApi()
model_info = api.model_info(model_id)
return model_info
except (RepositoryNotFoundError, RevisionNotFoundError) as e:
print(f"Error fetching model info: {e}")
return None
def extract_model_size(model_info: ModelInfo) -> Optional[Tuple[float, str]]:
"""
Extract the parameter size and precision from model information.
Args:
model_info: ModelInfo object from Hugging Face Hub
Returns:
Tuple of (parameter size in billions, precision) or None if not found
"""
# Try to get parameter count from model card
if model_info.card_data is not None:
if "model-index" in model_info.card_data and isinstance(model_info.card_data["model-index"], list):
for item in model_info.card_data["model-index"]:
if "parameters" in item:
return float(item["parameters"]) / 1e9, "fp16" # Convert to billions and assume fp16
# Try to extract from model name
name = model_info.id.lower()
size_patterns = [
r"(\d+(\.\d+)?)b", # matches patterns like "1.3b" or "7b"
r"-(\d+(\.\d+)?)b", # matches patterns like "llama-7b"
r"(\d+(\.\d+)?)-b", # matches other formatting variations
]
for pattern in size_patterns:
match = re.search(pattern, name)
if match:
size_str = match.group(1)
return float(size_str), "fp16" # Default to fp16
# Extract precision if available
precision = "fp16" # Default
precision_patterns = {"fp16": r"fp16", "int8": r"int8", "int4": r"int4", "fp32": r"fp32"}
for prec, pattern in precision_patterns.items():
if re.search(pattern, name):
precision = prec
break
# If couldn't determine size, check sibling models or readme
if model_info.siblings:
for sibling in model_info.siblings:
if sibling.rfilename == "README.md" and sibling.size < 100000: # reasonable size for readme
try:
content = requests.get(sibling.lfs.url).text
param_pattern = r"(\d+(\.\d+)?)\s*[Bb](illion)?\s*[Pp]arameters"
match = re.search(param_pattern, content)
if match:
return float(match.group(1)), precision
except:
pass
# As a last resort, try to analyze config.json if it exists
config_sibling = next((s for s in model_info.siblings if s.rfilename == "config.json"), None)
if config_sibling:
try:
config = requests.get(config_sibling.lfs.url).json()
if "n_params" in config:
return float(config["n_params"]) / 1e9, precision
# Calculate from architecture if available
if all(k in config for k in ["n_layer", "n_head", "n_embd"]):
n_layer = config["n_layer"]
n_embd = config["n_embd"]
n_head = config["n_head"]
# Transformer parameter estimation formula
params = 12 * n_layer * (n_embd**2) * (1 + 13 / (12 * n_embd))
return params / 1e9, precision
except:
pass
return None
def calculate_vram_requirements(param_size: float, precision: str = "fp16") -> Dict[str, float]:
"""
Calculate VRAM requirements for inference using the EleutherAI transformer math formula.
Args:
param_size: Model size in billions of parameters
precision: Model precision ("fp32", "fp16", "int8", "int4")
Returns:
Dictionary with various memory requirements in GB
"""
# Convert parameters to actual count
param_count = param_size * 1e9
# Size per parameter based on precision
bytes_per_param = {
"fp32": 4,
"fp16": 2,
"int8": 1,
"int4": 0.5, # 4 bits = 0.5 bytes
}[precision]
# Base model size (parameters * bytes per parameter)
model_size_gb = (param_count * bytes_per_param) / (1024**3)
# EleutherAI formula components for inference memory
# Layer activations - scales with sequence length
activation_factor = 1.2 # varies by architecture
# KV cache size (scales with batch size and sequence length)
# Estimate for single batch, 2048-token context
kv_cache_size_gb = (param_count * 0.0625 * bytes_per_param) / (1024**3) # ~6.25% of params for KV cache
# Total VRAM needed for inference
total_inference_gb = model_size_gb + (model_size_gb * activation_factor) + kv_cache_size_gb
# Add overhead for CUDA, buffers, and fragmentation
overhead_gb = 0.8 # 800 MB overhead
# Dynamic computation graph allocation
compute_overhead_factor = 0.1 # varies based on attention computation method
# Final VRAM estimate
total_vram_required_gb = total_inference_gb + overhead_gb + (total_inference_gb * compute_overhead_factor)
return {
"model_size_gb": model_size_gb,
"kv_cache_gb": kv_cache_size_gb,
"activations_gb": model_size_gb * activation_factor,
"overhead_gb": overhead_gb + (total_inference_gb * compute_overhead_factor),
"total_vram_gb": total_vram_required_gb
}
def find_compatible_gpus(vram_required: float) -> List[str]:
"""
Find NVIDIA GPUs that can run a model requiring the specified VRAM.
Args:
vram_required: Required VRAM in GB
Returns:
List of compatible GPU names sorted by VRAM capacity (smallest first)
"""
compatible_gpus = [(name, specs[0]) for name, specs in NVIDIA_GPUS.items() if specs[0] >= vram_required]
return [gpu[0] for gpu in sorted(compatible_gpus, key=lambda x: x[1])]
def estimate_performance(param_size: float, precision: str, gpu_name: str) -> Dict[str, float]:
"""
Estimate token/second performance for a model on a specific GPU.
Args:
param_size: Model size in billions of parameters
precision: Model precision
gpu_name: Name of the NVIDIA GPU
Returns:
Dictionary with performance metrics
"""
if gpu_name not in NVIDIA_GPUS:
return {"tokens_per_second": 0, "tflops_utilization": 0}
gpu_vram, gpu_tops = NVIDIA_GPUS[gpu_name]
# Calculate FLOPs per token (based on model size)
# Formula: ~6 * num_parameters FLOPs per token (inference)
flops_per_token = 6 * param_size * 1e9
# Convert TOPS to TFLOPS based on precision
precision_factor = 1.0 if precision == "fp32" else 2.0 if precision == "fp16" else 4.0 if precision in ["int8", "int4"] else 1.0
gpu_tflops = gpu_tops * precision_factor
# Practical utilization (GPUs rarely achieve 100% of theoretical performance)
practical_utilization = 0.6 # 60% utilization
# Calculate tokens per second
effective_tflops = gpu_tflops * practical_utilization
tokens_per_second = (effective_tflops * 1e12) / flops_per_token
return {
"tokens_per_second": tokens_per_second,
"flops_per_token": flops_per_token,
"tflops_utilization": practical_utilization,
"effective_tflops": effective_tflops
}
def analyze_hf_model(model_id: str) -> Dict[str, any]:
"""
Comprehensive analysis of a Hugging Face model:
- Downloads model information
- Extracts parameter size and precision
- Estimates VRAM requirements
- Identifies compatible NVIDIA GPUs
- Estimates performance on these GPUs
Args:
model_id: Hugging Face model ID (e.g., "facebook/opt-1.3b")
Returns:
Dictionary with analysis results or error message
"""
# Get model information
model_info = get_hf_model_info(model_id)
if not model_info:
return {"error": f"Model {model_id} not found on Hugging Face"}
# Extract model size and precision
size_info = extract_model_size(model_info)
if not size_info:
return {"error": f"Couldn't determine parameter count for {model_id}"}
param_size, precision = size_info
# Calculate VRAM requirements
vram_requirements = calculate_vram_requirements(param_size, precision)
total_vram_gb = vram_requirements["total_vram_gb"]
# Find compatible GPUs
compatible_gpus = find_compatible_gpus(total_vram_gb)
# Calculate performance for each compatible GPU
gpu_performance = {}
for gpu in compatible_gpus:
gpu_performance[gpu] = estimate_performance(param_size, precision, gpu)
# Determine the largest GPU that can run the model
largest_compatible_gpu = compatible_gpus[-1] if compatible_gpus else None
return {
"model_id": model_id,
"parameter_size": param_size, # in billions
"precision": precision,
"vram_requirements": vram_requirements,
"compatible_gpus": compatible_gpus,
"largest_compatible_gpu": largest_compatible_gpu,
"gpu_performance": gpu_performance,
#"model_info": {
#"description": model_info.description,
#"tags": model_info.tags,
#"downloads": model_info.downloads,
#"library": getattr(model_info, "library", None)
#}
}