import logging import os import re import tempfile from huggingface_hub import HfApi, hf_hub_download from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) # Files/extensions to definitely include INCLUDE_PATTERNS = [ ".py", "requirements.txt", "Dockerfile", ".js", ".jsx", ".ts", ".tsx", ".html", ".css", ".svelte", ".vue", ".json", ".yaml", ".yml", ".toml", "Procfile", ".sh", ] # Files/extensions/folders to ignore IGNORE_PATTERNS = [ ".git", ".hfignore", "README.md", "LICENSE", "__pycache__", ".ipynb_checkpoints", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", ".mp3", ".wav", ".mp4", ".mov", ".avi", ".onnx", ".pt", ".pth", ".bin", ".safetensors", ".tflite", ".pickle", ".pkl", ".joblib", ".parquet", ".csv", ".tsv", ".zip", ".tar.gz", ".gz", ".ipynb", ".DS_Store", "node_modules", ] # Regex to find potential Hugging Face model IDs (e.g., "org/model-name", "user/model-name") # This is a simple heuristic and might catch non-model strings or miss complex cases. HF_MODEL_ID_PATTERN = re.compile(r"([\"\'])([\w\-.]+/[\w\-\.]+)\1\'") # Max length for model descriptions to keep prompts manageable MAX_MODEL_DESC_LENGTH = 1500 SUMMARY_FILENAME = "summary_highlights.md" PRIVACY_FILENAME = "privacy_report.md" TLDR_FILENAME = "tldr_summary.json" def _is_relevant_file(filename): """Check if a file should be included based on patterns.""" # Ignore files matching ignore patterns (case-insensitive check for some) lower_filename = filename.lower() if any( pattern in lower_filename for pattern in [".git", ".hfignore", "readme.md", "license"] ): return False if any( filename.endswith(ext) for ext in IGNORE_PATTERNS if ext.startswith(".") ): # Check extensions return False if any( part == pattern for part in filename.split("/") for pattern in IGNORE_PATTERNS if "." not in pattern and "/" not in pattern ): # Check directory/file names return False if filename in IGNORE_PATTERNS: # Check full filenames return False # Include files matching include patterns if any(filename.endswith(ext) for ext in INCLUDE_PATTERNS if ext.startswith(".")): return True if any(filename == pattern for pattern in INCLUDE_PATTERNS if "." not in pattern): return True # Default to False if not explicitly included (safer) # logging.debug(f"File '{filename}' excluded by default.") return False def get_space_code_files(space_id: str) -> dict[str, str]: """ Downloads relevant code and configuration files from a Hugging Face Space. Args: space_id: The ID of the Hugging Face Space (e.g., 'gradio/hello_world'). Returns: A dictionary where keys are filenames and values are file contents as strings. Returns an empty dictionary if the space is not found or has no relevant files. """ code_files = {} api = HfApi() try: logging.info(f"Fetching file list for Space: {space_id}") repo_files = api.list_repo_files(repo_id=space_id, repo_type="space") logging.info(f"Found {len(repo_files)} total files in {space_id}.") relevant_files = [f for f in repo_files if _is_relevant_file(f)] logging.info(f"Identified {len(relevant_files)} relevant files for download.") for filename in relevant_files: try: logging.debug(f"Downloading {filename} from {space_id}...") file_path = hf_hub_download( repo_id=space_id, filename=filename, repo_type="space", # Consider adding use_auth_token=os.getenv("HF_TOKEN") if accessing private spaces later ) with open(file_path, "r", encoding="utf-8", errors="ignore") as f: content = f.read() code_files[filename] = content logging.debug(f"Successfully read content of {filename}") except EntryNotFoundError: logging.warning( f"File {filename} listed but not found in repo {space_id}." ) except UnicodeDecodeError: logging.warning( f"Could not decode file {filename} from {space_id} as UTF-8. Skipping." ) except OSError as e: logging.warning(f"OS error reading file {filename} from cache: {e}") except Exception as e: logging.error( f"Unexpected error downloading or reading file {filename} from {space_id}: {e}" ) except RepositoryNotFoundError: logging.error(f"Space repository '{space_id}' not found.") return {} except Exception as e: logging.error(f"Failed to list or process files for space {space_id}: {e}") return {} logging.info( f"Successfully retrieved content for {len(code_files)} files from {space_id}." ) return code_files def extract_hf_model_ids(code_files: dict[str, str]) -> set[str]: """ Extracts potential Hugging Face model IDs mentioned in code files. Args: code_files: Dictionary of {filename: content}. Returns: A set of unique potential model IDs found. """ potential_ids = set() for filename, content in code_files.items(): # Limit search to relevant file types if filename.endswith((".py", ".json", ".yaml", ".yml", ".toml", ".md")): try: matches = HF_MODEL_ID_PATTERN.findall(content) for _, model_id in matches: # Basic validation: must contain exactly one '/' if model_id.count("/") == 1: # Avoid adding common paths that look like IDs if not any( part in model_id.lower() for part in ["http", "www", "@", " ", ".", ":"] ): # Check if '/' is only separator if len(model_id) < 100: # Avoid overly long strings potential_ids.add(model_id) except Exception as e: logging.warning(f"Regex error processing file {filename}: {e}") logging.info(f"Extracted {len(potential_ids)} potential model IDs.") # Add simple filter for very common false positives if needed # potential_ids = {id for id in potential_ids if id not in ['user/repo']} return potential_ids def get_model_descriptions(model_ids: set[str]) -> dict[str, str]: """ Fetches the README.md content (description) for a set of model IDs. Args: model_ids: A set of Hugging Face model IDs. Returns: A dictionary mapping model_id to its description string (or an error message). """ descriptions = {} if not model_ids: return descriptions logging.info(f"Fetching descriptions for {len(model_ids)} models...") for model_id in model_ids: try: # Check if the model exists first (optional but good practice) # api.model_info(model_id) # Download README.md readme_path = hf_hub_download( repo_id=model_id, filename="README.md", repo_type="model", # Add token if needing to access private/gated models - unlikely for Space analysis # use_auth_token=os.getenv("HF_TOKEN"), error_if_not_found=True, # Raise error if README doesn't exist ) with open(readme_path, "r", encoding="utf-8", errors="ignore") as f: description = f.read() descriptions[model_id] = description[:MAX_MODEL_DESC_LENGTH] + ( "... [truncated]" if len(description) > MAX_MODEL_DESC_LENGTH else "" ) logging.debug(f"Successfully fetched description for {model_id}") except RepositoryNotFoundError: logging.warning(f"Model repository '{model_id}' not found.") descriptions[model_id] = "[Model repository not found]" except EntryNotFoundError: logging.warning(f"README.md not found in model repository '{model_id}'.") descriptions[model_id] = "[README.md not found in model repository]" except Exception as e: logging.error(f"Error fetching description for model '{model_id}': {e}") descriptions[model_id] = f"[Error fetching description: {e}]" logging.info(f"Finished fetching descriptions for {len(descriptions)} models.") return descriptions def list_cached_spaces(dataset_id: str, hf_token: str | None) -> list[str]: """Lists the space IDs (owner/name) that have cached reports in the dataset repository.""" if not hf_token: logging.warning("HF Token not provided, cannot list cached spaces.") return [] try: api = HfApi(token=hf_token) # Get all filenames in the dataset repository all_files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset") # Extract unique directory paths that look like owner/space_name # by checking if they contain our specific report files. space_ids = set() for f_path in all_files: # Check if the file is one of our report files if f_path.endswith(f"/{PRIVACY_FILENAME}") or f_path.endswith( f"/{SUMMARY_FILENAME}" ): # Extract the directory path part (owner/space_name) parts = f_path.split("/") if len(parts) == 3: # Expecting owner/space_name/filename.md owner_slash_space_name = "/".join(parts[:-1]) # Basic validation: owner and space name shouldn't start with '.' if not parts[0].startswith(".") and not parts[1].startswith("."): space_ids.add(owner_slash_space_name) sorted_space_ids = sorted(list(space_ids)) logging.info( f"Found {len(sorted_space_ids)} cached space reports in {dataset_id} via HfApi." ) return sorted_space_ids except RepositoryNotFoundError: logging.warning( f"Dataset {dataset_id} not found or empty when listing cached spaces." ) return [] except Exception as e: logging.error(f"Error listing cached spaces in {dataset_id} via HfApi: {e}") return [] # Return empty list on error def check_report_exists(space_id: str, dataset_id: str, hf_token: str | None) -> bool: """Checks if report files already exist in the target dataset repo using HfApi.""" print( f"[Debug Cache Check] Checking for space_id: '{space_id}' in dataset: '{dataset_id}'" ) # DEBUG if not hf_token: logging.warning("HF Token not provided, cannot check dataset cache.") print("[Debug Cache Check] No HF Token, returning False.") # DEBUG return False try: api = HfApi(token=hf_token) # List ALL files in the repo print(f"[Debug Cache Check] Listing ALL files in repo '{dataset_id}'") # DEBUG all_repo_files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset") # DEBUG: Optionally print a subset if the list is huge # print(f"[Debug Cache Check] First 10 files returned by API: {all_repo_files[:10]}") # Construct the exact paths we expect for the target space_id expected_summary_path = f"{space_id}/{SUMMARY_FILENAME}" expected_privacy_path = f"{space_id}/{PRIVACY_FILENAME}" print( f"[Debug Cache Check] Expecting summary file: '{expected_summary_path}'" ) # DEBUG print( f"[Debug Cache Check] Expecting privacy file: '{expected_privacy_path}'" ) # DEBUG # Check if both expected paths exist in the full list of files summary_exists = expected_summary_path in all_repo_files privacy_exists = expected_privacy_path in all_repo_files exists = summary_exists and privacy_exists print( f"[Debug Cache Check] Summary exists in full list: {summary_exists}" ) # DEBUG print( f"[Debug Cache Check] Privacy exists in full list: {privacy_exists}" ) # DEBUG print(f"[Debug Cache Check] Overall exists check result: {exists}") # DEBUG return exists except RepositoryNotFoundError: logging.warning( f"Dataset repository {dataset_id} not found or not accessible during check." ) print( f"[Debug Cache Check] Repository {dataset_id} not found, returning False." ) # DEBUG except Exception as e: # ... (error handling remains the same) ... print(f"[Debug Cache Check] Exception caught: {type(e).__name__}: {e}") # DEBUG # Note: 404 check based on path_in_repo is no longer applicable here # We rely on RepositoryNotFoundError or general Exception logging.error( f"Error checking dataset {dataset_id} for {space_id} via HfApi: {e}" ) print("[Debug Cache Check] Other exception, returning False.") # DEBUG return False # Treat errors as cache miss def download_cached_reports( space_id: str, dataset_id: str, hf_token: str | None ) -> dict[str, str]: """Downloads cached reports (summary, privacy, tldr json) from the dataset repo. Returns: Dict containing report contents keyed by 'summary', 'privacy', 'tldr_json_str'. Keys will be missing if a specific file is not found. Raises error on critical download failures (repo not found, etc.). """ if not hf_token: raise ValueError("HF Token required to download cached reports.") logging.info( f"Attempting to download cached reports for {space_id} from {dataset_id}..." ) reports = {} # Define paths relative to dataset root for hf_hub_download summary_repo_path = f"{space_id}/{SUMMARY_FILENAME}" privacy_repo_path = f"{space_id}/{PRIVACY_FILENAME}" tldr_repo_path = f"{space_id}/{TLDR_FILENAME}" # Path for TLDR JSON try: # Download summary try: summary_path_local = hf_hub_download( repo_id=dataset_id, filename=summary_repo_path, repo_type="dataset", token=hf_token, ) with open(summary_path_local, "r", encoding="utf-8") as f: reports["summary"] = f.read() logging.info(f"Successfully downloaded cached summary for {space_id}.") except EntryNotFoundError: logging.warning( f"Cached summary file {summary_repo_path} not found for {space_id}." ) except Exception as e_summary: logging.error( f"Error downloading cached summary for {space_id}: {e_summary}" ) # Decide if this is critical - for now, we warn and continue # Download privacy report try: privacy_path_local = hf_hub_download( repo_id=dataset_id, filename=privacy_repo_path, repo_type="dataset", token=hf_token, ) with open(privacy_path_local, "r", encoding="utf-8") as f: reports["privacy"] = f.read() logging.info( f"Successfully downloaded cached privacy report for {space_id}." ) except EntryNotFoundError: logging.warning( f"Cached privacy file {privacy_repo_path} not found for {space_id}." ) except Exception as e_privacy: logging.error( f"Error downloading cached privacy report for {space_id}: {e_privacy}" ) # Decide if this is critical - for now, we warn and continue # Download TLDR JSON try: tldr_path_local = hf_hub_download( repo_id=dataset_id, filename=tldr_repo_path, repo_type="dataset", token=hf_token, ) with open(tldr_path_local, "r", encoding="utf-8") as f: reports["tldr_json_str"] = f.read() # Store raw string content logging.info(f"Successfully downloaded cached TLDR JSON for {space_id}.") except EntryNotFoundError: logging.warning( f"Cached TLDR file {tldr_repo_path} not found for {space_id}." ) # Don't treat TLDR absence as an error, just won't be in the dict except Exception as e_tldr: logging.error( f"Error downloading cached TLDR JSON for {space_id}: {e_tldr}" ) # Don't treat TLDR download error as critical, just won't be included # Check if at least one report was downloaded successfully if not reports.get("summary") and not reports.get("privacy"): raise FileNotFoundError( f"Failed to download *any* primary cache files (summary/privacy) for {space_id}" ) return reports except RepositoryNotFoundError as e_repo: logging.error( f"Cache download error: Dataset repo {dataset_id} not found. {e_repo}" ) raise FileNotFoundError(f"Dataset repo {dataset_id} not found") from e_repo except Exception as e_critical: # Catch other potential critical errors logging.error( f"Unexpected critical error downloading cached reports for {space_id} from {dataset_id}: {e_critical}" ) raise IOError( f"Failed critically during cached report download for {space_id}" ) from e_critical def upload_reports_to_dataset( space_id: str, summary_report: str, detailed_report: str, dataset_id: str, hf_token: str | None, ): """Uploads the generated reports to the specified dataset repository.""" if not hf_token: logging.warning("HF Token not provided, skipping dataset report upload.") return logging.info( f"Attempting to upload reports for {space_id} to dataset {dataset_id}..." ) api = HfApi(token=hf_token) # Sanitize space_id for path safety (though HF Hub usually handles this) safe_space_id = space_id.replace("..", "") try: with tempfile.TemporaryDirectory() as tmpdir: summary_path_local = os.path.join(tmpdir, SUMMARY_FILENAME) privacy_path_local = os.path.join(tmpdir, PRIVACY_FILENAME) with open(summary_path_local, "w", encoding="utf-8") as f: f.write(summary_report) with open(privacy_path_local, "w", encoding="utf-8") as f: f.write(detailed_report) commit_message = f"Add privacy analysis reports for Space: {safe_space_id}" repo_url = api.create_repo( repo_id=dataset_id, repo_type="dataset", exist_ok=True, ) logging.info(f"Ensured dataset repo {repo_url} exists.") api.upload_file( path_or_fileobj=summary_path_local, path_in_repo=f"{safe_space_id}/{SUMMARY_FILENAME}", repo_id=dataset_id, repo_type="dataset", commit_message=commit_message, ) logging.info(f"Successfully uploaded summary report for {safe_space_id}.") api.upload_file( path_or_fileobj=privacy_path_local, path_in_repo=f"{safe_space_id}/{PRIVACY_FILENAME}", repo_id=dataset_id, repo_type="dataset", commit_message=commit_message, ) logging.info( f"Successfully uploaded detailed privacy report for {safe_space_id}." ) except Exception as e: logging.error( f"Failed to upload reports for {safe_space_id} to dataset {dataset_id}: {e}" ) # Example usage (for testing) # if __name__ == '__main__': # # Make sure HF_TOKEN is set if accessing private spaces or for higher rate limits # from dotenv import load_dotenv # load_dotenv() # # test_space = "gradio/hello_world" # test_space = "huggingface-projects/diffusers-gallery" # A more complex example # # test_space = "nonexistent/space" # Test not found # files_content = get_space_code_files(test_space) # if files_content: # print(f"\n--- Files retrieved from {test_space} ---") # for name in files_content.keys(): # print(f"- {name}") # # print("\n--- Content of app.py (first 200 chars) ---") # # print(files_content.get("app.py", "app.py not found")[:200]) # else: # print(f"Could not retrieve files from {test_space}")