import logging
import os
import re
import tempfile

from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Files/extensions to definitely include
INCLUDE_PATTERNS = [
    ".py",
    "requirements.txt",
    "Dockerfile",
    ".js",
    ".jsx",
    ".ts",
    ".tsx",
    ".html",
    ".css",
    ".svelte",
    ".vue",
    ".json",
    ".yaml",
    ".yml",
    ".toml",
    "Procfile",
    ".sh",
]

# Files/extensions/folders to ignore
IGNORE_PATTERNS = [
    ".git",
    ".hfignore",
    "README.md",
    "LICENSE",
    "__pycache__",
    ".ipynb_checkpoints",
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".svg",
    ".ico",
    ".mp3",
    ".wav",
    ".mp4",
    ".mov",
    ".avi",
    ".onnx",
    ".pt",
    ".pth",
    ".bin",
    ".safetensors",
    ".tflite",
    ".pickle",
    ".pkl",
    ".joblib",
    ".parquet",
    ".csv",
    ".tsv",
    ".zip",
    ".tar.gz",
    ".gz",
    ".ipynb",
    ".DS_Store",
    "node_modules",
]

# Regex to find potential Hugging Face model IDs (e.g., "org/model-name", "user/model-name")
# quoted with matching single or double quotes. This is a simple heuristic and may catch
# non-model strings or miss more complex cases.
HF_MODEL_ID_PATTERN = re.compile(r"([\"'])([\w\-.]+/[\w\-.]+)\1")

# Max length for model descriptions to keep prompts manageable
MAX_MODEL_DESC_LENGTH = 1500

SUMMARY_FILENAME = "summary_highlights.md"
PRIVACY_FILENAME = "privacy_report.md"
TLDR_FILENAME = "tldr_summary.json"

def _is_relevant_file(filename):
    """Check if a file should be included based on the include/ignore patterns."""
    # Ignore files matching ignore patterns (case-insensitive check for some)
    lower_filename = filename.lower()
    if any(
        pattern in lower_filename
        for pattern in [".git", ".hfignore", "readme.md", "license"]
    ):
        return False
    if any(
        filename.endswith(ext) for ext in IGNORE_PATTERNS if ext.startswith(".")
    ):  # Check extensions
        return False
    if any(
        part == pattern
        for part in filename.split("/")
        for pattern in IGNORE_PATTERNS
        if "." not in pattern and "/" not in pattern
    ):  # Check directory/file names
        return False
    if filename in IGNORE_PATTERNS:  # Check full filenames
        return False

    # Include files matching include patterns
    if any(filename.endswith(ext) for ext in INCLUDE_PATTERNS if ext.startswith(".")):
        return True
    if any(
        filename == pattern
        for pattern in INCLUDE_PATTERNS
        if not pattern.startswith(".")
    ):  # Check exact filenames (e.g. "Dockerfile", "requirements.txt")
        return True

    # Default to False if not explicitly included (safer)
    # logging.debug(f"File '{filename}' excluded by default.")
    return False
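
# Illustrative behaviour of the filter above (a quick sketch, not an exhaustive spec):
#   _is_relevant_file("app.py")             -> True   (".py" is an include extension)
#   _is_relevant_file("requirements.txt")   -> True   (exact filename match)
#   _is_relevant_file("assets/logo.png")    -> False  (".png" is an ignore extension)
#   _is_relevant_file("node_modules/x.js")  -> False  ("node_modules" directory is ignored)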

def get_space_code_files(space_id: str) -> dict[str, str]:
    """
    Downloads relevant code and configuration files from a Hugging Face Space.

    Args:
        space_id: The ID of the Hugging Face Space (e.g., 'gradio/hello_world').

    Returns:
        A dictionary where keys are filenames and values are file contents as strings.
        Returns an empty dictionary if the space is not found or has no relevant files.
    """
    code_files = {}
    api = HfApi()
    try:
        logging.info(f"Fetching file list for Space: {space_id}")
        repo_files = api.list_repo_files(repo_id=space_id, repo_type="space")
        logging.info(f"Found {len(repo_files)} total files in {space_id}.")

        relevant_files = [f for f in repo_files if _is_relevant_file(f)]
        logging.info(f"Identified {len(relevant_files)} relevant files for download.")

        for filename in relevant_files:
            try:
                logging.debug(f"Downloading {filename} from {space_id}...")
                file_path = hf_hub_download(
                    repo_id=space_id,
                    filename=filename,
                    repo_type="space",
                    # Consider adding token=os.getenv("HF_TOKEN") if accessing private Spaces later
                )
                with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                    content = f.read()
                code_files[filename] = content
                logging.debug(f"Successfully read content of {filename}")
            except EntryNotFoundError:
                logging.warning(
                    f"File {filename} listed but not found in repo {space_id}."
                )
            except UnicodeDecodeError:
                logging.warning(
                    f"Could not decode file {filename} from {space_id} as UTF-8. Skipping."
                )
            except OSError as e:
                logging.warning(f"OS error reading file {filename} from cache: {e}")
            except Exception as e:
                logging.error(
                    f"Unexpected error downloading or reading file {filename} from {space_id}: {e}"
                )

    except RepositoryNotFoundError:
        logging.error(f"Space repository '{space_id}' not found.")
        return {}
    except Exception as e:
        logging.error(f"Failed to list or process files for space {space_id}: {e}")
        return {}

    logging.info(
        f"Successfully retrieved content for {len(code_files)} files from {space_id}."
    )
    return code_files

def extract_hf_model_ids(code_files: dict[str, str]) -> set[str]:
    """
    Extracts potential Hugging Face model IDs mentioned in code files.

    Args:
        code_files: Dictionary of {filename: content}.

    Returns:
        A set of unique potential model IDs found.
    """
    potential_ids = set()
    for filename, content in code_files.items():
        # Limit search to relevant file types
        if filename.endswith((".py", ".json", ".yaml", ".yml", ".toml", ".md")):
            try:
                matches = HF_MODEL_ID_PATTERN.findall(content)
                for _, model_id in matches:
                    # Basic validation: must contain exactly one '/'
                    if model_id.count("/") == 1:
                        # Skip strings that look like URLs, paths, or other non-IDs
                        if not any(
                            part in model_id.lower()
                            for part in ["http", "www", "@", " ", ".", ":"]
                        ):
                            if len(model_id) < 100:  # Avoid overly long strings
                                potential_ids.add(model_id)
            except Exception as e:
                logging.warning(f"Regex error processing file {filename}: {e}")

    logging.info(f"Extracted {len(potential_ids)} potential model IDs.")
    # Add a simple filter for very common false positives if needed
    # potential_ids = {id for id in potential_ids if id not in ['user/repo']}
    return potential_ids

def get_model_descriptions(model_ids: set[str]) -> dict[str, str]:
    """
    Fetches the README.md content (description) for a set of model IDs.

    Args:
        model_ids: A set of Hugging Face model IDs.

    Returns:
        A dictionary mapping model_id to its description string (or an error message).
    """
    descriptions = {}
    if not model_ids:
        return descriptions

    logging.info(f"Fetching descriptions for {len(model_ids)} models...")
    for model_id in model_ids:
        try:
            # Check if the model exists first (optional but good practice)
            # api.model_info(model_id)

            # Download README.md; hf_hub_download raises EntryNotFoundError if it is missing.
            readme_path = hf_hub_download(
                repo_id=model_id,
                filename="README.md",
                repo_type="model",
                # Add a token if private/gated models ever need to be read
                # (unlikely for Space analysis):
                # token=os.getenv("HF_TOKEN"),
            )
            with open(readme_path, "r", encoding="utf-8", errors="ignore") as f:
                description = f.read()
            descriptions[model_id] = description[:MAX_MODEL_DESC_LENGTH] + (
                "... [truncated]" if len(description) > MAX_MODEL_DESC_LENGTH else ""
            )
            logging.debug(f"Successfully fetched description for {model_id}")
        except RepositoryNotFoundError:
            logging.warning(f"Model repository '{model_id}' not found.")
            descriptions[model_id] = "[Model repository not found]"
        except EntryNotFoundError:
            logging.warning(f"README.md not found in model repository '{model_id}'.")
            descriptions[model_id] = "[README.md not found in model repository]"
        except Exception as e:
            logging.error(f"Error fetching description for model '{model_id}': {e}")
            descriptions[model_id] = f"[Error fetching description: {e}]"

    logging.info(f"Finished fetching descriptions for {len(descriptions)} models.")
    return descriptions
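
# A minimal sketch of how the three helpers above are meant to chain together
# (illustrative only; "some-org/some-space" is a hypothetical Space ID):
#
#   code_files = get_space_code_files("some-org/some-space")
#   model_ids = extract_hf_model_ids(code_files)
#   descriptions = get_model_descriptions(model_ids)
#   for model_id, desc in descriptions.items():
#       print(model_id, desc[:80])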

def list_cached_spaces(dataset_id: str, hf_token: str | None) -> list[str]:
    """Lists the space IDs (owner/name) that have cached reports in the dataset repository."""
    if not hf_token:
        logging.warning("HF Token not provided, cannot list cached spaces.")
        return []
    try:
        api = HfApi(token=hf_token)
        # Get all filenames in the dataset repository
        all_files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")

        # Extract unique directory paths that look like owner/space_name
        # by checking whether they contain our specific report files.
        space_ids = set()
        for f_path in all_files:
            # Check if the file is one of our report files
            if f_path.endswith(f"/{PRIVACY_FILENAME}") or f_path.endswith(
                f"/{SUMMARY_FILENAME}"
            ):
                # Extract the directory path part (owner/space_name)
                parts = f_path.split("/")
                if len(parts) == 3:  # Expecting owner/space_name/filename.md
                    owner_slash_space_name = "/".join(parts[:-1])
                    # Basic validation: owner and space name shouldn't start with '.'
                    if not parts[0].startswith(".") and not parts[1].startswith("."):
                        space_ids.add(owner_slash_space_name)

        sorted_space_ids = sorted(space_ids)
        logging.info(
            f"Found {len(sorted_space_ids)} cached space reports in {dataset_id} via HfApi."
        )
        return sorted_space_ids
    except RepositoryNotFoundError:
        logging.warning(
            f"Dataset {dataset_id} not found or empty when listing cached spaces."
        )
        return []
    except Exception as e:
        logging.error(f"Error listing cached spaces in {dataset_id} via HfApi: {e}")
        return []  # Return empty list on error
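
# Expected layout of the reports dataset that the caching helpers assume
# (a sketch inferred from the path handling above; the dataset_id itself is supplied by the caller):
#
#   <owner>/<space_name>/summary_highlights.md
#   <owner>/<space_name>/privacy_report.md
#   <owner>/<space_name>/tldr_summary.json   (optional)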

def check_report_exists(space_id: str, dataset_id: str, hf_token: str | None) -> bool:
    """Checks whether report files already exist in the target dataset repo using HfApi."""
    print(
        f"[Debug Cache Check] Checking for space_id: '{space_id}' in dataset: '{dataset_id}'"
    )  # DEBUG
    if not hf_token:
        logging.warning("HF Token not provided, cannot check dataset cache.")
        print("[Debug Cache Check] No HF Token, returning False.")  # DEBUG
        return False
    try:
        api = HfApi(token=hf_token)
        # List ALL files in the repo
        print(f"[Debug Cache Check] Listing ALL files in repo '{dataset_id}'")  # DEBUG
        all_repo_files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")
        # DEBUG: Optionally print a subset if the list is huge
        # print(f"[Debug Cache Check] First 10 files returned by API: {all_repo_files[:10]}")

        # Construct the exact paths we expect for the target space_id
        expected_summary_path = f"{space_id}/{SUMMARY_FILENAME}"
        expected_privacy_path = f"{space_id}/{PRIVACY_FILENAME}"
        print(
            f"[Debug Cache Check] Expecting summary file: '{expected_summary_path}'"
        )  # DEBUG
        print(
            f"[Debug Cache Check] Expecting privacy file: '{expected_privacy_path}'"
        )  # DEBUG

        # Check whether both expected paths exist in the full list of files
        summary_exists = expected_summary_path in all_repo_files
        privacy_exists = expected_privacy_path in all_repo_files
        exists = summary_exists and privacy_exists
        print(
            f"[Debug Cache Check] Summary exists in full list: {summary_exists}"
        )  # DEBUG
        print(
            f"[Debug Cache Check] Privacy exists in full list: {privacy_exists}"
        )  # DEBUG
        print(f"[Debug Cache Check] Overall exists check result: {exists}")  # DEBUG
        return exists
    except RepositoryNotFoundError:
        logging.warning(
            f"Dataset repository {dataset_id} not found or not accessible during check."
        )
        print(
            f"[Debug Cache Check] Repository {dataset_id} not found, returning False."
        )  # DEBUG
    except Exception as e:
        print(f"[Debug Cache Check] Exception caught: {type(e).__name__}: {e}")  # DEBUG
        # A 404 check based on path_in_repo is no longer applicable here;
        # we rely on RepositoryNotFoundError or the general Exception instead.
        logging.error(
            f"Error checking dataset {dataset_id} for {space_id} via HfApi: {e}"
        )
        print("[Debug Cache Check] Other exception, returning False.")  # DEBUG
    return False  # Treat errors as cache miss

def download_cached_reports(
    space_id: str, dataset_id: str, hf_token: str | None
) -> dict[str, str]:
    """Downloads cached reports (summary, privacy, TLDR JSON) from the dataset repo.

    Returns:
        Dict containing report contents keyed by 'summary', 'privacy', 'tldr_json_str'.
        Keys will be missing if a specific file is not found.
        Raises an error on critical download failures (repo not found, etc.).
    """
    if not hf_token:
        raise ValueError("HF Token required to download cached reports.")

    logging.info(
        f"Attempting to download cached reports for {space_id} from {dataset_id}..."
    )
    reports = {}
    # Define paths relative to the dataset root for hf_hub_download
    summary_repo_path = f"{space_id}/{SUMMARY_FILENAME}"
    privacy_repo_path = f"{space_id}/{PRIVACY_FILENAME}"
    tldr_repo_path = f"{space_id}/{TLDR_FILENAME}"  # Path for TLDR JSON

    try:
        # Download summary
        try:
            summary_path_local = hf_hub_download(
                repo_id=dataset_id,
                filename=summary_repo_path,
                repo_type="dataset",
                token=hf_token,
            )
            with open(summary_path_local, "r", encoding="utf-8") as f:
                reports["summary"] = f.read()
            logging.info(f"Successfully downloaded cached summary for {space_id}.")
        except EntryNotFoundError:
            logging.warning(
                f"Cached summary file {summary_repo_path} not found for {space_id}."
            )
        except Exception as e_summary:
            logging.error(
                f"Error downloading cached summary for {space_id}: {e_summary}"
            )
            # Decide if this is critical - for now, we warn and continue

        # Download privacy report
        try:
            privacy_path_local = hf_hub_download(
                repo_id=dataset_id,
                filename=privacy_repo_path,
                repo_type="dataset",
                token=hf_token,
            )
            with open(privacy_path_local, "r", encoding="utf-8") as f:
                reports["privacy"] = f.read()
            logging.info(
                f"Successfully downloaded cached privacy report for {space_id}."
            )
        except EntryNotFoundError:
            logging.warning(
                f"Cached privacy file {privacy_repo_path} not found for {space_id}."
            )
        except Exception as e_privacy:
            logging.error(
                f"Error downloading cached privacy report for {space_id}: {e_privacy}"
            )
            # Decide if this is critical - for now, we warn and continue

        # Download TLDR JSON
        try:
            tldr_path_local = hf_hub_download(
                repo_id=dataset_id,
                filename=tldr_repo_path,
                repo_type="dataset",
                token=hf_token,
            )
            with open(tldr_path_local, "r", encoding="utf-8") as f:
                reports["tldr_json_str"] = f.read()  # Store raw string content
            logging.info(f"Successfully downloaded cached TLDR JSON for {space_id}.")
        except EntryNotFoundError:
            logging.warning(
                f"Cached TLDR file {tldr_repo_path} not found for {space_id}."
            )
            # TLDR absence is not an error; the key simply won't be in the dict
        except Exception as e_tldr:
            logging.error(
                f"Error downloading cached TLDR JSON for {space_id}: {e_tldr}"
            )
            # TLDR download errors are not critical; the key simply won't be included

        # Check that at least one primary report was downloaded successfully
        if not reports.get("summary") and not reports.get("privacy"):
            raise FileNotFoundError(
                f"Failed to download *any* primary cache files (summary/privacy) for {space_id}"
            )

        return reports

    except RepositoryNotFoundError as e_repo:
        logging.error(
            f"Cache download error: Dataset repo {dataset_id} not found. {e_repo}"
        )
        raise FileNotFoundError(f"Dataset repo {dataset_id} not found") from e_repo
    except Exception as e_critical:  # Catch other potential critical errors
        logging.error(
            f"Unexpected critical error downloading cached reports for {space_id} from {dataset_id}: {e_critical}"
        )
        raise IOError(
            f"Failed critically during cached report download for {space_id}"
        ) from e_critical

def upload_reports_to_dataset(
    space_id: str,
    summary_report: str,
    detailed_report: str,
    dataset_id: str,
    hf_token: str | None,
):
    """Uploads the generated reports to the specified dataset repository."""
    if not hf_token:
        logging.warning("HF Token not provided, skipping dataset report upload.")
        return

    logging.info(
        f"Attempting to upload reports for {space_id} to dataset {dataset_id}..."
    )
    api = HfApi(token=hf_token)
    # Sanitize space_id for path safety (though HF Hub usually handles this)
    safe_space_id = space_id.replace("..", "")

    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            summary_path_local = os.path.join(tmpdir, SUMMARY_FILENAME)
            privacy_path_local = os.path.join(tmpdir, PRIVACY_FILENAME)

            with open(summary_path_local, "w", encoding="utf-8") as f:
                f.write(summary_report)
            with open(privacy_path_local, "w", encoding="utf-8") as f:
                f.write(detailed_report)

            commit_message = f"Add privacy analysis reports for Space: {safe_space_id}"

            repo_url = api.create_repo(
                repo_id=dataset_id,
                repo_type="dataset",
                exist_ok=True,
            )
            logging.info(f"Ensured dataset repo {repo_url} exists.")

            api.upload_file(
                path_or_fileobj=summary_path_local,
                path_in_repo=f"{safe_space_id}/{SUMMARY_FILENAME}",
                repo_id=dataset_id,
                repo_type="dataset",
                commit_message=commit_message,
            )
            logging.info(f"Successfully uploaded summary report for {safe_space_id}.")

            api.upload_file(
                path_or_fileobj=privacy_path_local,
                path_in_repo=f"{safe_space_id}/{PRIVACY_FILENAME}",
                repo_id=dataset_id,
                repo_type="dataset",
                commit_message=commit_message,
            )
            logging.info(
                f"Successfully uploaded detailed privacy report for {safe_space_id}."
            )
    except Exception as e:
        logging.error(
            f"Failed to upload reports for {safe_space_id} to dataset {dataset_id}: {e}"
        )
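
# A minimal sketch of the intended caching flow around the helpers above
# (illustrative only; REPORTS_DATASET_ID and generate_reports() are hypothetical names
# supplied by the calling application):
#
#   hf_token = os.getenv("HF_TOKEN")
#   if check_report_exists(space_id, REPORTS_DATASET_ID, hf_token):
#       reports = download_cached_reports(space_id, REPORTS_DATASET_ID, hf_token)
#   else:
#       summary, privacy = generate_reports(space_id)  # hypothetical analysis step
#       upload_reports_to_dataset(space_id, summary, privacy, REPORTS_DATASET_ID, hf_token)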

# Example usage (for testing)
# if __name__ == '__main__':
#     # Make sure HF_TOKEN is set if accessing private spaces or for higher rate limits
#     from dotenv import load_dotenv
#     load_dotenv()
#
#     # test_space = "gradio/hello_world"
#     test_space = "huggingface-projects/diffusers-gallery"  # A more complex example
#     # test_space = "nonexistent/space"  # Test not found
#
#     files_content = get_space_code_files(test_space)
#     if files_content:
#         print(f"\n--- Files retrieved from {test_space} ---")
#         for name in files_content.keys():
#             print(f"- {name}")
#         # print("\n--- Content of app.py (first 200 chars) ---")
#         # print(files_content.get("app.py", "app.py not found")[:200])
#     else:
#         print(f"Could not retrieve files from {test_space}")