import psutil import logging from typing import Dict, Any, Optional logger = logging.getLogger(__name__) def get_memory_usage() -> Dict[str, Any]: """Récupère les informations sur l'utilisation de la mémoire système. Optimisé pour les environnements conteneurisés (Docker/Hugging Face Spaces). Returns: Dictionnaire contenant les informations de mémoire en Mo et pourcentages """ try: # Vérifier d'abord si nous sommes dans un conteneur Docker avec des limites cgroup container_limit = None try: # Essayer de lire la limite de mémoire du cgroup (disponible dans les conteneurs) with open('/sys/fs/cgroup/memory/memory.limit_in_bytes', 'r') as f: container_limit = int(f.read().strip()) # Si c'est proche de la valeur maximale d'un entier 64-bit, alors c'est illimité if container_limit > 2**60: # Une valeur très proche de la max container_limit = None except (IOError, FileNotFoundError): # Essayer le chemin cgroup v2 try: with open('/sys/fs/cgroup/memory.max', 'r') as f: content = f.read().strip() if content != 'max': container_limit = int(content) except (IOError, FileNotFoundError): # Pas de limite cgroup lisible pass # Déterminer la mémoire totale pour Hugging Face Spaces (32GB) total_memory = 32 * (1024**3) # 32 GB en octets fixe pour Hugging Face # Essayer une approche plus directe pour les conteneurs Docker docker_memory_used = None memory_source = "unknown" # Log du début de la tentative de lecture logger.info("Tentative de lecture des informations de mémoire Docker...") try: # Lire la mémoire utilisée directement depuis les stats cgroup v1 cgroup_path = '/sys/fs/cgroup/memory/memory.usage_in_bytes' with open(cgroup_path, 'r') as f: docker_memory_used = int(f.read().strip()) memory_source = f"cgroup v1 ({cgroup_path})" logger.info(f"Mémoire Docker lue depuis {memory_source}: {docker_memory_used / (1024*1024):.2f} Mo") except (IOError, FileNotFoundError) as e: logger.info(f"Impossible de lire depuis cgroup v1: {e}") try: # Essayer le chemin cgroup v2 cgroup_path = '/sys/fs/cgroup/memory.current' with open(cgroup_path, 'r') as f: docker_memory_used = int(f.read().strip()) memory_source = f"cgroup v2 ({cgroup_path})" logger.info(f"Mémoire Docker lue depuis {memory_source}: {docker_memory_used / (1024*1024):.2f} Mo") except (IOError, FileNotFoundError) as e: logger.info(f"Impossible de lire depuis cgroup v2: {e}") # Utiliser le RSS du processus Python comme approximation docker_memory_used = psutil.Process().memory_info().rss memory_source = "RSS du processus Python" logger.info(f"Mémoire utilisée approximative depuis {memory_source}: {docker_memory_used / (1024*1024):.2f} Mo") # Si nous n'avons pas pu lire directement la mémoire du conteneur # Utiliser les informations du système mais avec une correction virtual_memory = psutil.virtual_memory() # Utiliser la valeur Docker si disponible, sinon utiliser une approximation # Dans Hugging Face Spaces, la vraie mémoire utilisée est généralement plus proche # de la mémoire RSS du processus Python logger.info(f"Informations psutil - total: {virtual_memory.total / (1024*1024):.2f} Mo, utilisé: {virtual_memory.used / (1024*1024):.2f} Mo") if docker_memory_used: used_memory = docker_memory_used logger.info(f"Utilisation de la mémoire depuis {memory_source}") else: # Approche conservatrice: utiliser 10% du total rapporté par psutil ou la mémoire RSS # Cela correspond mieux à ce que HF affiche typiquement process_rss = psutil.Process().memory_info().rss psutil_adjusted = virtual_memory.used * 0.10 used_memory = max(process_rss, psutil_adjusted) logger.info(f"Utilisation de la mémoire estimée - RSS: {process_rss / (1024*1024):.2f} Mo, psutil ajusté: {psutil_adjusted / (1024*1024):.2f} Mo") memory_source = "estimation (max de RSS et psutil ajusté)" # S'assurer que la mémoire utilisée ne dépasse pas le total used_memory = min(used_memory, total_memory) # Calculer la mémoire disponible available_memory = total_memory - used_memory # Calculer les pourcentages percent_used = (used_memory / total_memory) * 100 if total_memory > 0 else 0 # Log des résultats finaux import datetime current_time = datetime.datetime.now().strftime("%H:%M:%S") logger.info(f"[{current_time}] Mémoire totale: {total_memory / (1024*1024):.2f} Mo (fixée à 32 Go)") logger.info(f"[{current_time}] Mémoire utilisée: {used_memory / (1024*1024):.2f} Mo (source: {memory_source})") logger.info(f"[{current_time}] Mémoire disponible: {available_memory / (1024*1024):.2f} Mo") logger.info(f"[{current_time}] Pourcentage utilisé: {percent_used:.1f}%") # Convertir les bytes en Mo pour être plus lisible memory_info = { "total": round(total_memory / (1024 * 1024), 2), # Mo "available": round(available_memory / (1024 * 1024), 2), # Mo "used": round(used_memory / (1024 * 1024), 2), # Mo "percent_used": round(percent_used, 1), # % "percent_free": round(100 - percent_used, 1) # % } return memory_info except Exception as e: logger.error(f"Failed to get memory usage: {e}") return { "error": f"Failed to get memory usage: {e}", "total": 32 * 1024, # 32 GB par défaut en Mo "available": 16 * 1024, # 16 GB par défaut en Mo "used": 16 * 1024, # 16 GB par défaut en Mo "percent_used": 50.0, # % "percent_free": 50.0 # % } def get_loaded_models_memory(model_pipelines: Dict[str, Any]) -> Dict[str, Any]: """Estime la mémoire utilisée par les modèles chargés. Args: model_pipelines: Dictionnaire des modèles chargés Returns: Dictionnaire avec le nombre de modèles et l'utilisation mémoire estimée """ try: models_count = len(model_pipelines) # Utilisation mémoire approximative basée sur le nombre de paramètres des modèles memory_usage = 0 model_sizes = {} for model_id, model_data in model_pipelines.items(): pipeline = model_data.get('pipeline') if pipeline is not None: # Calculer le nombre de paramètres param_count = sum(p.numel() for p in pipeline.parameters()) # Estimer la mémoire (4 bytes par paramètre pour les float32) size_mb = round(param_count * 4 / (1024 * 1024), 2) model_sizes[model_id] = { "parameters": param_count, "size_mb": size_mb, "name": model_data.get('metadata', {}).get('hf_filename', 'unknown') } memory_usage += size_mb return { "models_count": models_count, "total_memory_mb": round(memory_usage, 2), "models": model_sizes } except Exception as e: logger.error(f"Failed to calculate models memory usage: {e}") return { "error": str(e), "models_count": len(model_pipelines), "total_memory_mb": 0, "models": {} } def get_memory_status(model_pipelines: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Récupère les informations de mémoire système et des modèles. Args: model_pipelines: Dictionnaire des modèles chargés (optionnel) Returns: Dictionnaire contenant les informations de mémoire """ memory_info = { "system_memory": get_memory_usage(), } if model_pipelines is not None: memory_info["models_memory"] = get_loaded_models_memory(model_pipelines) return memory_info def format_memory_status(memory_status: Dict[str, Any]) -> str: """Formate les informations de mémoire de manière élégante pour l'affichage. Args: memory_status: Dictionnaire contenant les informations de mémoire Returns: Chaîne formatée pour affichage """ # Ajouter un horodatage pour vérifier le rafraîchissement import datetime current_time = datetime.datetime.now().strftime("%H:%M:%S") try: system_memory = memory_status.get("system_memory", {}) models_memory = memory_status.get("models_memory", {}) # Formater les infos système avec horodatage formatted_text = f"--- Mémoire Système (maj à {current_time}) ---\n" formatted_text += f"Total: {system_memory.get('total', 0):.1f} Mo \n" formatted_text += f"Utilisée: {system_memory.get('used', 0):.1f} Mo ({system_memory.get('percent_used', 0):.1f}%) \n" formatted_text += f"Disponible: {system_memory.get('available', 0):.1f} Mo ({system_memory.get('percent_free', 0):.1f}%)\n\n" # Ajouter les infos modèles models_count = models_memory.get("models_count", 0) total_model_mb = models_memory.get("total_memory_mb", 0) formatted_text += f"--- Mémoire des Modèles (maj à {current_time}) ---\n" formatted_text += f"Nombre de modèles chargés: {models_count}\n" formatted_text += f"Mémoire estimée: {total_model_mb:.1f} Mo" return formatted_text except Exception as e: logger.error(f"Error formatting memory status: {e}") return "Erreur lors de la récupération des informations de mémoire."