space-privacy / utils.py
Yacine Jernite
added TLDR functionality
36de078
import logging
import os
import re
import tempfile
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# Files/extensions to definitely include
INCLUDE_PATTERNS = [
".py",
"requirements.txt",
"Dockerfile",
".js",
".jsx",
".ts",
".tsx",
".html",
".css",
".svelte",
".vue",
".json",
".yaml",
".yml",
".toml",
"Procfile",
".sh",
]
# Files/extensions/folders to ignore
IGNORE_PATTERNS = [
".git",
".hfignore",
"README.md",
"LICENSE",
"__pycache__",
".ipynb_checkpoints",
".png",
".jpg",
".jpeg",
".gif",
".svg",
".ico",
".mp3",
".wav",
".mp4",
".mov",
".avi",
".onnx",
".pt",
".pth",
".bin",
".safetensors",
".tflite",
".pickle",
".pkl",
".joblib",
".parquet",
".csv",
".tsv",
".zip",
".tar.gz",
".gz",
".ipynb",
".DS_Store",
"node_modules",
]
# Regex to find potential Hugging Face model IDs (e.g., "org/model-name", "user/model-name")
# This is a simple heuristic and might catch non-model strings or miss complex cases.
HF_MODEL_ID_PATTERN = re.compile(r"([\"\'])([\w\-.]+/[\w\-\.]+)\1\'")
# Max length for model descriptions to keep prompts manageable
MAX_MODEL_DESC_LENGTH = 1500
SUMMARY_FILENAME = "summary_highlights.md"
PRIVACY_FILENAME = "privacy_report.md"
TLDR_FILENAME = "tldr_summary.json"
def _is_relevant_file(filename):
"""Check if a file should be included based on patterns."""
# Ignore files matching ignore patterns (case-insensitive check for some)
lower_filename = filename.lower()
if any(
pattern in lower_filename
for pattern in [".git", ".hfignore", "readme.md", "license"]
):
return False
if any(
filename.endswith(ext) for ext in IGNORE_PATTERNS if ext.startswith(".")
): # Check extensions
return False
if any(
part == pattern
for part in filename.split("/")
for pattern in IGNORE_PATTERNS
if "." not in pattern and "/" not in pattern
): # Check directory/file names
return False
if filename in IGNORE_PATTERNS: # Check full filenames
return False
# Include files matching include patterns
if any(filename.endswith(ext) for ext in INCLUDE_PATTERNS if ext.startswith(".")):
return True
if any(filename == pattern for pattern in INCLUDE_PATTERNS if "." not in pattern):
return True
# Default to False if not explicitly included (safer)
# logging.debug(f"File '{filename}' excluded by default.")
return False
def get_space_code_files(space_id: str) -> dict[str, str]:
"""
Downloads relevant code and configuration files from a Hugging Face Space.
Args:
space_id: The ID of the Hugging Face Space (e.g., 'gradio/hello_world').
Returns:
A dictionary where keys are filenames and values are file contents as strings.
Returns an empty dictionary if the space is not found or has no relevant files.
"""
code_files = {}
api = HfApi()
try:
logging.info(f"Fetching file list for Space: {space_id}")
repo_files = api.list_repo_files(repo_id=space_id, repo_type="space")
logging.info(f"Found {len(repo_files)} total files in {space_id}.")
relevant_files = [f for f in repo_files if _is_relevant_file(f)]
logging.info(f"Identified {len(relevant_files)} relevant files for download.")
for filename in relevant_files:
try:
logging.debug(f"Downloading {filename} from {space_id}...")
file_path = hf_hub_download(
repo_id=space_id,
filename=filename,
repo_type="space",
# Consider adding use_auth_token=os.getenv("HF_TOKEN") if accessing private spaces later
)
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
code_files[filename] = content
logging.debug(f"Successfully read content of {filename}")
except EntryNotFoundError:
logging.warning(
f"File {filename} listed but not found in repo {space_id}."
)
except UnicodeDecodeError:
logging.warning(
f"Could not decode file {filename} from {space_id} as UTF-8. Skipping."
)
except OSError as e:
logging.warning(f"OS error reading file {filename} from cache: {e}")
except Exception as e:
logging.error(
f"Unexpected error downloading or reading file {filename} from {space_id}: {e}"
)
except RepositoryNotFoundError:
logging.error(f"Space repository '{space_id}' not found.")
return {}
except Exception as e:
logging.error(f"Failed to list or process files for space {space_id}: {e}")
return {}
logging.info(
f"Successfully retrieved content for {len(code_files)} files from {space_id}."
)
return code_files
def extract_hf_model_ids(code_files: dict[str, str]) -> set[str]:
"""
Extracts potential Hugging Face model IDs mentioned in code files.
Args:
code_files: Dictionary of {filename: content}.
Returns:
A set of unique potential model IDs found.
"""
potential_ids = set()
for filename, content in code_files.items():
# Limit search to relevant file types
if filename.endswith((".py", ".json", ".yaml", ".yml", ".toml", ".md")):
try:
matches = HF_MODEL_ID_PATTERN.findall(content)
for _, model_id in matches:
# Basic validation: must contain exactly one '/'
if model_id.count("/") == 1:
# Avoid adding common paths that look like IDs
if not any(
part in model_id.lower()
for part in ["http", "www", "@", " ", ".", ":"]
): # Check if '/' is only separator
if len(model_id) < 100: # Avoid overly long strings
potential_ids.add(model_id)
except Exception as e:
logging.warning(f"Regex error processing file {filename}: {e}")
logging.info(f"Extracted {len(potential_ids)} potential model IDs.")
# Add simple filter for very common false positives if needed
# potential_ids = {id for id in potential_ids if id not in ['user/repo']}
return potential_ids
def get_model_descriptions(model_ids: set[str]) -> dict[str, str]:
"""
Fetches the README.md content (description) for a set of model IDs.
Args:
model_ids: A set of Hugging Face model IDs.
Returns:
A dictionary mapping model_id to its description string (or an error message).
"""
descriptions = {}
if not model_ids:
return descriptions
logging.info(f"Fetching descriptions for {len(model_ids)} models...")
for model_id in model_ids:
try:
# Check if the model exists first (optional but good practice)
# api.model_info(model_id)
# Download README.md
readme_path = hf_hub_download(
repo_id=model_id,
filename="README.md",
repo_type="model",
# Add token if needing to access private/gated models - unlikely for Space analysis
# use_auth_token=os.getenv("HF_TOKEN"),
error_if_not_found=True, # Raise error if README doesn't exist
)
with open(readme_path, "r", encoding="utf-8", errors="ignore") as f:
description = f.read()
descriptions[model_id] = description[:MAX_MODEL_DESC_LENGTH] + (
"... [truncated]" if len(description) > MAX_MODEL_DESC_LENGTH else ""
)
logging.debug(f"Successfully fetched description for {model_id}")
except RepositoryNotFoundError:
logging.warning(f"Model repository '{model_id}' not found.")
descriptions[model_id] = "[Model repository not found]"
except EntryNotFoundError:
logging.warning(f"README.md not found in model repository '{model_id}'.")
descriptions[model_id] = "[README.md not found in model repository]"
except Exception as e:
logging.error(f"Error fetching description for model '{model_id}': {e}")
descriptions[model_id] = f"[Error fetching description: {e}]"
logging.info(f"Finished fetching descriptions for {len(descriptions)} models.")
return descriptions
def list_cached_spaces(dataset_id: str, hf_token: str | None) -> list[str]:
"""Lists the space IDs (owner/name) that have cached reports in the dataset repository."""
if not hf_token:
logging.warning("HF Token not provided, cannot list cached spaces.")
return []
try:
api = HfApi(token=hf_token)
# Get all filenames in the dataset repository
all_files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")
# Extract unique directory paths that look like owner/space_name
# by checking if they contain our specific report files.
space_ids = set()
for f_path in all_files:
# Check if the file is one of our report files
if f_path.endswith(f"/{PRIVACY_FILENAME}") or f_path.endswith(
f"/{SUMMARY_FILENAME}"
):
# Extract the directory path part (owner/space_name)
parts = f_path.split("/")
if len(parts) == 3: # Expecting owner/space_name/filename.md
owner_slash_space_name = "/".join(parts[:-1])
# Basic validation: owner and space name shouldn't start with '.'
if not parts[0].startswith(".") and not parts[1].startswith("."):
space_ids.add(owner_slash_space_name)
sorted_space_ids = sorted(list(space_ids))
logging.info(
f"Found {len(sorted_space_ids)} cached space reports in {dataset_id} via HfApi."
)
return sorted_space_ids
except RepositoryNotFoundError:
logging.warning(
f"Dataset {dataset_id} not found or empty when listing cached spaces."
)
return []
except Exception as e:
logging.error(f"Error listing cached spaces in {dataset_id} via HfApi: {e}")
return [] # Return empty list on error
def check_report_exists(space_id: str, dataset_id: str, hf_token: str | None) -> bool:
"""Checks if report files already exist in the target dataset repo using HfApi."""
print(
f"[Debug Cache Check] Checking for space_id: '{space_id}' in dataset: '{dataset_id}'"
) # DEBUG
if not hf_token:
logging.warning("HF Token not provided, cannot check dataset cache.")
print("[Debug Cache Check] No HF Token, returning False.") # DEBUG
return False
try:
api = HfApi(token=hf_token)
# List ALL files in the repo
print(f"[Debug Cache Check] Listing ALL files in repo '{dataset_id}'") # DEBUG
all_repo_files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")
# DEBUG: Optionally print a subset if the list is huge
# print(f"[Debug Cache Check] First 10 files returned by API: {all_repo_files[:10]}")
# Construct the exact paths we expect for the target space_id
expected_summary_path = f"{space_id}/{SUMMARY_FILENAME}"
expected_privacy_path = f"{space_id}/{PRIVACY_FILENAME}"
print(
f"[Debug Cache Check] Expecting summary file: '{expected_summary_path}'"
) # DEBUG
print(
f"[Debug Cache Check] Expecting privacy file: '{expected_privacy_path}'"
) # DEBUG
# Check if both expected paths exist in the full list of files
summary_exists = expected_summary_path in all_repo_files
privacy_exists = expected_privacy_path in all_repo_files
exists = summary_exists and privacy_exists
print(
f"[Debug Cache Check] Summary exists in full list: {summary_exists}"
) # DEBUG
print(
f"[Debug Cache Check] Privacy exists in full list: {privacy_exists}"
) # DEBUG
print(f"[Debug Cache Check] Overall exists check result: {exists}") # DEBUG
return exists
except RepositoryNotFoundError:
logging.warning(
f"Dataset repository {dataset_id} not found or not accessible during check."
)
print(
f"[Debug Cache Check] Repository {dataset_id} not found, returning False."
) # DEBUG
except Exception as e:
# ... (error handling remains the same) ...
print(f"[Debug Cache Check] Exception caught: {type(e).__name__}: {e}") # DEBUG
# Note: 404 check based on path_in_repo is no longer applicable here
# We rely on RepositoryNotFoundError or general Exception
logging.error(
f"Error checking dataset {dataset_id} for {space_id} via HfApi: {e}"
)
print("[Debug Cache Check] Other exception, returning False.") # DEBUG
return False # Treat errors as cache miss
def download_cached_reports(
space_id: str, dataset_id: str, hf_token: str | None
) -> dict[str, str]:
"""Downloads cached reports (summary, privacy, tldr json) from the dataset repo.
Returns:
Dict containing report contents keyed by 'summary', 'privacy', 'tldr_json_str'.
Keys will be missing if a specific file is not found.
Raises error on critical download failures (repo not found, etc.).
"""
if not hf_token:
raise ValueError("HF Token required to download cached reports.")
logging.info(
f"Attempting to download cached reports for {space_id} from {dataset_id}..."
)
reports = {}
# Define paths relative to dataset root for hf_hub_download
summary_repo_path = f"{space_id}/{SUMMARY_FILENAME}"
privacy_repo_path = f"{space_id}/{PRIVACY_FILENAME}"
tldr_repo_path = f"{space_id}/{TLDR_FILENAME}" # Path for TLDR JSON
try:
# Download summary
try:
summary_path_local = hf_hub_download(
repo_id=dataset_id,
filename=summary_repo_path,
repo_type="dataset",
token=hf_token,
)
with open(summary_path_local, "r", encoding="utf-8") as f:
reports["summary"] = f.read()
logging.info(f"Successfully downloaded cached summary for {space_id}.")
except EntryNotFoundError:
logging.warning(
f"Cached summary file {summary_repo_path} not found for {space_id}."
)
except Exception as e_summary:
logging.error(
f"Error downloading cached summary for {space_id}: {e_summary}"
)
# Decide if this is critical - for now, we warn and continue
# Download privacy report
try:
privacy_path_local = hf_hub_download(
repo_id=dataset_id,
filename=privacy_repo_path,
repo_type="dataset",
token=hf_token,
)
with open(privacy_path_local, "r", encoding="utf-8") as f:
reports["privacy"] = f.read()
logging.info(
f"Successfully downloaded cached privacy report for {space_id}."
)
except EntryNotFoundError:
logging.warning(
f"Cached privacy file {privacy_repo_path} not found for {space_id}."
)
except Exception as e_privacy:
logging.error(
f"Error downloading cached privacy report for {space_id}: {e_privacy}"
)
# Decide if this is critical - for now, we warn and continue
# Download TLDR JSON
try:
tldr_path_local = hf_hub_download(
repo_id=dataset_id,
filename=tldr_repo_path,
repo_type="dataset",
token=hf_token,
)
with open(tldr_path_local, "r", encoding="utf-8") as f:
reports["tldr_json_str"] = f.read() # Store raw string content
logging.info(f"Successfully downloaded cached TLDR JSON for {space_id}.")
except EntryNotFoundError:
logging.warning(
f"Cached TLDR file {tldr_repo_path} not found for {space_id}."
)
# Don't treat TLDR absence as an error, just won't be in the dict
except Exception as e_tldr:
logging.error(
f"Error downloading cached TLDR JSON for {space_id}: {e_tldr}"
)
# Don't treat TLDR download error as critical, just won't be included
# Check if at least one report was downloaded successfully
if not reports.get("summary") and not reports.get("privacy"):
raise FileNotFoundError(
f"Failed to download *any* primary cache files (summary/privacy) for {space_id}"
)
return reports
except RepositoryNotFoundError as e_repo:
logging.error(
f"Cache download error: Dataset repo {dataset_id} not found. {e_repo}"
)
raise FileNotFoundError(f"Dataset repo {dataset_id} not found") from e_repo
except Exception as e_critical: # Catch other potential critical errors
logging.error(
f"Unexpected critical error downloading cached reports for {space_id} from {dataset_id}: {e_critical}"
)
raise IOError(
f"Failed critically during cached report download for {space_id}"
) from e_critical
def upload_reports_to_dataset(
space_id: str,
summary_report: str,
detailed_report: str,
dataset_id: str,
hf_token: str | None,
):
"""Uploads the generated reports to the specified dataset repository."""
if not hf_token:
logging.warning("HF Token not provided, skipping dataset report upload.")
return
logging.info(
f"Attempting to upload reports for {space_id} to dataset {dataset_id}..."
)
api = HfApi(token=hf_token)
# Sanitize space_id for path safety (though HF Hub usually handles this)
safe_space_id = space_id.replace("..", "")
try:
with tempfile.TemporaryDirectory() as tmpdir:
summary_path_local = os.path.join(tmpdir, SUMMARY_FILENAME)
privacy_path_local = os.path.join(tmpdir, PRIVACY_FILENAME)
with open(summary_path_local, "w", encoding="utf-8") as f:
f.write(summary_report)
with open(privacy_path_local, "w", encoding="utf-8") as f:
f.write(detailed_report)
commit_message = f"Add privacy analysis reports for Space: {safe_space_id}"
repo_url = api.create_repo(
repo_id=dataset_id,
repo_type="dataset",
exist_ok=True,
)
logging.info(f"Ensured dataset repo {repo_url} exists.")
api.upload_file(
path_or_fileobj=summary_path_local,
path_in_repo=f"{safe_space_id}/{SUMMARY_FILENAME}",
repo_id=dataset_id,
repo_type="dataset",
commit_message=commit_message,
)
logging.info(f"Successfully uploaded summary report for {safe_space_id}.")
api.upload_file(
path_or_fileobj=privacy_path_local,
path_in_repo=f"{safe_space_id}/{PRIVACY_FILENAME}",
repo_id=dataset_id,
repo_type="dataset",
commit_message=commit_message,
)
logging.info(
f"Successfully uploaded detailed privacy report for {safe_space_id}."
)
except Exception as e:
logging.error(
f"Failed to upload reports for {safe_space_id} to dataset {dataset_id}: {e}"
)
# Example usage (for testing)
# if __name__ == '__main__':
# # Make sure HF_TOKEN is set if accessing private spaces or for higher rate limits
# from dotenv import load_dotenv
# load_dotenv()
# # test_space = "gradio/hello_world"
# test_space = "huggingface-projects/diffusers-gallery" # A more complex example
# # test_space = "nonexistent/space" # Test not found
# files_content = get_space_code_files(test_space)
# if files_content:
# print(f"\n--- Files retrieved from {test_space} ---")
# for name in files_content.keys():
# print(f"- {name}")
# # print("\n--- Content of app.py (first 200 chars) ---")
# # print(files_content.get("app.py", "app.py not found")[:200])
# else:
# print(f"Could not retrieve files from {test_space}")