from typing import List, Dict, Tuple, Optional, Union, Any
import re
import math
import requests
import numpy as np
from huggingface_hub import HfApi, ModelInfo
from huggingface_hub.utils import RepositoryNotFoundError, RevisionNotFoundError
def parse_model_entries(model_entries: List[str]) -> List[Dict[str, str]]:
    """
    Parse a list of model entries into structured dictionaries with provider, model name, version, region, and type.

    Args:
        model_entries: List of model entry strings as found in models.txt

    Returns:
        List of dictionaries with parsed model information containing keys:
            - provider: Name of the provider (e.g., 'azure', 'openai', 'anthropic', etc.)
            - model_name: Base name of the model
            - version: Version of the model (if available)
            - region: Deployment region (if available)
            - model_type: Type of the model (text, image, or audio, based on pattern analysis)
    """
    parsed_models = []

    # Common provider prefixes to identify
    known_providers = [
        'azure', 'bedrock', 'anthropic', 'openai', 'cohere', 'google',
        'mistral', 'meta', 'amazon', 'ai21', 'anyscale', 'stability',
        'cloudflare', 'databricks', 'cerebras', 'assemblyai'
    ]

    # Image-related keywords to identify image models
    image_indicators = ['dall-e', 'stable-diffusion', 'image', 'canvas', 'x-', 'steps']

    # Audio-related keywords to identify audio models
    audio_indicators = ['whisper', 'tts', 'audio', 'voice']

    for entry in model_entries:
        model_info = {
            'provider': '',
            'model_name': '',
            'version': '',
            'region': '',
            'model_type': 'text'  # Default to text
        }

        # Check for image models
        if any(indicator in entry.lower() for indicator in image_indicators):
            model_info['model_type'] = 'image'
        # Check for audio models
        elif any(indicator in entry.lower() for indicator in audio_indicators):
            model_info['model_type'] = 'audio'

        # Parse the entry based on common patterns
        parts = entry.split('/')

        # Handle region and provider extraction
        if len(parts) >= 2:
            # Extract provider from the beginning (common pattern)
            if parts[0].lower() in known_providers:
                model_info['provider'] = parts[0].lower()
                # For bedrock and azure, the region is often the next part
                if parts[0].lower() in ['bedrock', 'azure'] and len(parts) >= 3:
                    # Skip commitment parts if present
                    if 'commitment' not in parts[1]:
                        model_info['region'] = parts[1]
            # The last part typically contains the model name and possibly version
            model_with_version = parts[-1]
        else:
            # For single-part entries
            model_with_version = entry

        # Extract provider from model name if not already set
        if not model_info['provider']:
            # Look for known providers within the model name
            for provider in known_providers:
                if provider in model_with_version.lower():
                    model_info['provider'] = provider
                    # Remove provider prefix if it exists at the beginning
                    if model_with_version.lower().startswith(f'{provider}.'):
                        model_with_version = model_with_version[len(provider) + 1:]
                    break

        # Extract version information
        version_match = re.search(r'[:.-]v(\d+(?:\.\d+)*(?:-\d+)?|\d+)(?::\d+)?$', model_with_version)
        if version_match:
            model_info['version'] = version_match.group(1)
            # Remove version from model name
            model_name = model_with_version[:version_match.start()]
        else:
            # Look for date-based versions like 2024-08-06
            date_match = re.search(r'-(\d{4}-\d{2}-\d{2})$', model_with_version)
            if date_match:
                model_info['version'] = date_match.group(1)
                model_name = model_with_version[:date_match.start()]
            else:
                model_name = model_with_version

        # Clean up model name by removing trailing/leading separators
        model_info['model_name'] = model_name.strip('.-:')
        parsed_models.append(model_info)

    return parsed_models
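
# Illustrative usage sketch for parse_model_entries. The entry strings below are
# hypothetical examples of the models.txt format, not values taken from the real file;
# the first should yield roughly provider 'openai', model 'gpt-4o', version '2024-08-06'.
#
#   entries = [
#       "openai/gpt-4o-2024-08-06",
#       "bedrock/us-east-1/anthropic.claude-v2:1",
#   ]
#   for parsed in parse_model_entries(entries):
#       print(parsed["provider"], parsed["model_name"], parsed["version"], parsed["region"])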
def create_model_hierarchy(model_entries: List[str]) -> Dict[str, Dict[str, Dict[str, Dict[str, str]]]]:
    """
    Organize model entries into a nested dictionary structure by provider, model, version, and region.

    Args:
        model_entries: List of model entry strings as found in models.txt

    Returns:
        Nested dictionary with the structure:
            Provider -> Model -> Version -> Region = full model string
        If region or version is not available, it is replaced with "NA".
    """
    # Parse the model entries to get structured information
    parsed_models = parse_model_entries(model_entries)

    # Create the nested dictionary structure
    hierarchy = {}
    for i, model_info in enumerate(parsed_models):
        provider = model_info['provider'] if model_info['provider'] else 'unknown'
        model_name = model_info['model_name']
        version = model_info['version'] if model_info['version'] else 'NA'
        # For Azure models, always use 'NA' as region since they are globally available
        region = 'NA' if provider == 'azure' else (model_info['region'] if model_info['region'] else 'NA')

        # Initialize nested dictionaries if they don't exist
        if provider not in hierarchy:
            hierarchy[provider] = {}
        if model_name not in hierarchy[provider]:
            hierarchy[provider][model_name] = {}
        if version not in hierarchy[provider][model_name]:
            hierarchy[provider][model_name][version] = {}

        # Store the full model string at the leaf node
        hierarchy[provider][model_name][version][region] = model_entries[i]

    return hierarchy
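
# Illustrative sketch of the hierarchy produced for the hypothetical entries above;
# the exact keys depend on the parsing heuristics in parse_model_entries:
#
#   hierarchy = create_model_hierarchy(entries)
#   # hierarchy["openai"]["gpt-4o"]["2024-08-06"]["NA"] == "openai/gpt-4o-2024-08-06"
#   # hierarchy["bedrock"]["anthropic.claude"]["2"]["us-east-1"] == "bedrock/us-east-1/anthropic.claude-v2:1"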
# NVIDIA GPU specifications - Name: (VRAM in GB, FP16 TOPS)
NVIDIA_GPUS = {
    "RTX 3050": (8, 18),
    "RTX 3060": (12, 25),
    "RTX 3070": (8, 40),
    "RTX 3080": (10, 58),
    "RTX 3090": (24, 71),
    "RTX 4060": (8, 41),
    "RTX 4070": (12, 56),
    "RTX 4080": (16, 113),
    "RTX 4090": (24, 165),
    "RTX A2000": (6, 20),
    "RTX A4000": (16, 40),
    "RTX A5000": (24, 64),
    "RTX A6000": (48, 75),
    "A100 40GB": (40, 312),
    "A100 80GB": (80, 312),
    "H100 80GB": (80, 989),
}
def get_hf_model_info(model_id: str) -> Optional[ModelInfo]:
    """
    Retrieve model information from the Hugging Face Hub.

    Args:
        model_id: Hugging Face model ID (e.g., "facebook/opt-1.3b")

    Returns:
        ModelInfo object or None if model not found
    """
    try:
        api = HfApi()
        model_info = api.model_info(model_id)
        return model_info
    except (RepositoryNotFoundError, RevisionNotFoundError) as e:
        print(f"Error fetching model info: {e}")
        return None
def extract_model_size(model_info: ModelInfo) -> Optional[Tuple[float, str]]:
    """
    Extract the parameter size and precision from model information.

    Args:
        model_info: ModelInfo object from Hugging Face Hub

    Returns:
        Tuple of (parameter size in billions, precision) or None if not found
    """
    # Try to get parameter count from the model card metadata
    if model_info.card_data is not None:
        card_dict = model_info.card_data.to_dict() if hasattr(model_info.card_data, "to_dict") else dict(model_info.card_data)
        if isinstance(card_dict.get("model-index"), list):
            for item in card_dict["model-index"]:
                if "parameters" in item:
                    return float(item["parameters"]) / 1e9, "fp16"  # Convert to billions and assume fp16

    name = model_info.id.lower()

    # Extract precision from the model name if available, so it can be returned
    # together with a name-derived size
    precision = "fp16"  # Default
    precision_patterns = {"fp16": r"fp16", "int8": r"int8", "int4": r"int4", "fp32": r"fp32"}
    for prec, pattern in precision_patterns.items():
        if re.search(pattern, name):
            precision = prec
            break

    # Try to extract the parameter count from the model name
    size_patterns = [
        r"(\d+(\.\d+)?)b",   # matches patterns like "1.3b" or "7b"
        r"-(\d+(\.\d+)?)b",  # matches patterns like "llama-7b"
        r"(\d+(\.\d+)?)-b",  # matches other formatting variations
    ]
    for pattern in size_patterns:
        match = re.search(pattern, name)
        if match:
            return float(match.group(1)), precision

    # If the size couldn't be determined from the name, check the README
    siblings = model_info.siblings or []
    readme_sibling = next((s for s in siblings if s.rfilename == "README.md"), None)
    if readme_sibling is not None:
        try:
            readme_url = f"https://huggingface.co/{model_info.id}/resolve/main/README.md"
            content = requests.get(readme_url, timeout=10).text
            param_pattern = r"(\d+(\.\d+)?)\s*[Bb](illion)?\s*[Pp]arameters"
            match = re.search(param_pattern, content)
            if match:
                return float(match.group(1)), precision
        except Exception:
            pass

    # As a last resort, try to analyze config.json if it exists
    config_sibling = next((s for s in siblings if s.rfilename == "config.json"), None)
    if config_sibling is not None:
        try:
            config_url = f"https://huggingface.co/{model_info.id}/resolve/main/config.json"
            config = requests.get(config_url, timeout=10).json()
            if "n_params" in config:
                return float(config["n_params"]) / 1e9, precision
            # Estimate from architecture hyperparameters if available
            if all(k in config for k in ["n_layer", "n_head", "n_embd"]):
                n_layer = config["n_layer"]
                n_embd = config["n_embd"]
                # Transformer parameter estimation formula
                params = 12 * n_layer * (n_embd ** 2) * (1 + 13 / (12 * n_embd))
                return params / 1e9, precision
        except Exception:
            pass

    return None
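
# Illustrative sketch: for a hypothetical ID like "meta-llama/Llama-2-7b-hf", the
# name-based heuristic above should return (7.0, "fp16"); a quantized name containing
# "int4" would flip the returned precision accordingly.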
def calculate_vram_requirements(param_size: float, precision: str = "fp16") -> Dict[str, float]:
    """
    Calculate VRAM requirements for inference using the EleutherAI transformer math formula.

    Args:
        param_size: Model size in billions of parameters
        precision: Model precision ("fp32", "fp16", "int8", "int4")

    Returns:
        Dictionary with various memory requirements in GB
    """
    # Convert parameters to actual count
    param_count = param_size * 1e9

    # Size per parameter based on precision
    bytes_per_param = {
        "fp32": 4,
        "fp16": 2,
        "int8": 1,
        "int4": 0.5,  # 4 bits = 0.5 bytes
    }[precision]

    # Base model size (parameters * bytes per parameter)
    model_size_gb = (param_count * bytes_per_param) / (1024 ** 3)

    # EleutherAI formula components for inference memory
    # Layer activations - scales with sequence length
    activation_factor = 1.2  # varies by architecture

    # KV cache size (scales with batch size and sequence length)
    # Estimate for a single batch with a 2048-token context
    kv_cache_size_gb = (param_count * 0.0625 * bytes_per_param) / (1024 ** 3)  # ~6.25% of params for KV cache

    # Total VRAM needed for inference
    total_inference_gb = model_size_gb + (model_size_gb * activation_factor) + kv_cache_size_gb

    # Add overhead for CUDA context, buffers, and fragmentation
    overhead_gb = 0.8  # 800 MB overhead

    # Dynamic computation graph allocation
    compute_overhead_factor = 0.1  # varies based on attention computation method

    # Final VRAM estimate
    total_vram_required_gb = total_inference_gb + overhead_gb + (total_inference_gb * compute_overhead_factor)

    return {
        "model_size_gb": model_size_gb,
        "kv_cache_gb": kv_cache_size_gb,
        "activations_gb": model_size_gb * activation_factor,
        "overhead_gb": overhead_gb + (total_inference_gb * compute_overhead_factor),
        "total_vram_gb": total_vram_required_gb,
    }
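
# Worked example of the accounting above (a sketch, assuming a hypothetical 7B fp16 model):
#   model_size_gb  = 7e9 * 2 / 1024**3                 ~= 13.0 GB
#   kv_cache_gb    = 7e9 * 0.0625 * 2 / 1024**3        ~= 0.8 GB
#   activations_gb = 13.0 * 1.2                        ~= 15.6 GB
#   total_vram_gb  = (13.0 + 15.6 + 0.8) * 1.1 + 0.8   ~= 33.3 GB
# so calculate_vram_requirements(7, "fp16")["total_vram_gb"] comes out to roughly 33 GB.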
def find_compatible_gpus(vram_required: float) -> List[str]:
    """
    Find NVIDIA GPUs that can run a model requiring the specified VRAM.

    Args:
        vram_required: Required VRAM in GB

    Returns:
        List of compatible GPU names sorted by VRAM capacity (smallest first)
    """
    compatible_gpus = [(name, specs[0]) for name, specs in NVIDIA_GPUS.items() if specs[0] >= vram_required]
    return [gpu[0] for gpu in sorted(compatible_gpus, key=lambda x: x[1])]
def estimate_performance(param_size: float, precision: str, gpu_name: str) -> Dict[str, float]:
    """
    Estimate tokens-per-second performance for a model on a specific GPU.

    Args:
        param_size: Model size in billions of parameters
        precision: Model precision
        gpu_name: Name of the NVIDIA GPU

    Returns:
        Dictionary with performance metrics
    """
    if gpu_name not in NVIDIA_GPUS:
        return {"tokens_per_second": 0, "tflops_utilization": 0}

    gpu_vram, gpu_tops = NVIDIA_GPUS[gpu_name]

    # Calculate FLOPs per token (based on model size)
    # Formula: ~6 * num_parameters FLOPs per token (inference)
    flops_per_token = 6 * param_size * 1e9

    # Convert TOPS to TFLOPS based on precision
    precision_factors = {"fp32": 1.0, "fp16": 2.0, "int8": 4.0, "int4": 4.0}
    precision_factor = precision_factors.get(precision, 1.0)
    gpu_tflops = gpu_tops * precision_factor

    # Practical utilization (GPUs rarely achieve 100% of theoretical performance)
    practical_utilization = 0.6  # 60% utilization

    # Calculate tokens per second
    effective_tflops = gpu_tflops * practical_utilization
    tokens_per_second = (effective_tflops * 1e12) / flops_per_token

    return {
        "tokens_per_second": tokens_per_second,
        "flops_per_token": flops_per_token,
        "tflops_utilization": practical_utilization,
        "effective_tflops": effective_tflops,
    }
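
# Worked example of the throughput estimate (a sketch, assuming the same hypothetical
# 7B fp16 model on an RTX 4090, using the table values above):
#   flops_per_token   = 6 * 7e9           = 4.2e10
#   gpu_tflops        = 165 * 2.0         = 330
#   effective_tflops  = 330 * 0.6         = 198
#   tokens_per_second = 198e12 / 4.2e10   ~= 4714
# This is a theoretical compute-bound ceiling; real decode throughput is usually
# limited by memory bandwidth and comes out much lower.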
def analyze_hf_model(model_id: str) -> Dict[str, Any]:
    """
    Comprehensive analysis of a Hugging Face model:
    - Downloads model information
    - Extracts parameter size and precision
    - Estimates VRAM requirements
    - Identifies compatible NVIDIA GPUs
    - Estimates performance on these GPUs

    Args:
        model_id: Hugging Face model ID (e.g., "facebook/opt-1.3b")

    Returns:
        Dictionary with analysis results or error message
    """
    # Get model information
    model_info = get_hf_model_info(model_id)
    if not model_info:
        return {"error": f"Model {model_id} not found on Hugging Face"}

    # Extract model size and precision
    size_info = extract_model_size(model_info)
    if not size_info:
        return {"error": f"Couldn't determine parameter count for {model_id}"}
    param_size, precision = size_info

    # Calculate VRAM requirements
    vram_requirements = calculate_vram_requirements(param_size, precision)
    total_vram_gb = vram_requirements["total_vram_gb"]

    # Find compatible GPUs
    compatible_gpus = find_compatible_gpus(total_vram_gb)

    # Calculate performance for each compatible GPU
    gpu_performance = {}
    for gpu in compatible_gpus:
        gpu_performance[gpu] = estimate_performance(param_size, precision, gpu)

    # Determine the largest compatible GPU (list is sorted smallest VRAM first)
    largest_compatible_gpu = compatible_gpus[-1] if compatible_gpus else None

    return {
        "model_id": model_id,
        "parameter_size": param_size,  # in billions
        "precision": precision,
        "vram_requirements": vram_requirements,
        "compatible_gpus": compatible_gpus,
        "largest_compatible_gpu": largest_compatible_gpu,
        "gpu_performance": gpu_performance,
        # "model_info": {
        #     "description": model_info.description,
        #     "tags": model_info.tags,
        #     "downloads": model_info.downloads,
        #     "library": getattr(model_info, "library", None)
        # },
    }
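
# Minimal usage sketch: run this module directly to analyze a model end to end.
# "facebook/opt-1.3b" is just the example ID from the docstrings above; any public
# Hugging Face model ID works, provided extract_model_size can infer its parameter count.
if __name__ == "__main__":
    report = analyze_hf_model("facebook/opt-1.3b")
    if "error" in report:
        print(report["error"])
    else:
        print(f"Model: {report['model_id']} ({report['parameter_size']:.1f}B params, {report['precision']})")
        print(f"Estimated VRAM: {report['vram_requirements']['total_vram_gb']:.1f} GB")
        print(f"Compatible GPUs: {', '.join(report['compatible_gpus']) or 'none'}")
        for gpu, perf in report["gpu_performance"].items():
            print(f"  {gpu}: ~{perf['tokens_per_second']:.0f} tokens/s (theoretical)")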