# space-privacy / analysis_utils.py
# Author: Yacine Jernite
# (commit 36de078: added TLDR functionality)
import json # Added for TLDR JSON parsing
import logging
import os
import tempfile
from huggingface_hub import HfApi
from huggingface_hub.inference._generated.types import \
ChatCompletionOutput # Added for type hinting
# Imports from other project modules
from llm_interface import (ERROR_503_DICT, parse_qwen_response,
query_qwen_endpoint)
from prompts import format_privacy_prompt, format_summary_highlights_prompt
from utils import (PRIVACY_FILENAME, # Import constants for filenames
SUMMARY_FILENAME, TLDR_FILENAME, check_report_exists,
download_cached_reports, get_space_code_files)
# Configure logging (can inherit from app.py if called from there, but good practice)
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# Load environment variables - redundant if always called by app.py which already loads them
# load_dotenv()
# Constants needed by helper functions (can be passed as args too)
# Consider passing these from app.py if they might change or for clarity

# Suffix appended to report text that was served from the cache dataset.
CACHE_INFO_MSG = "\n\n*(Report retrieved from cache)*"
# Banner prepended to a report whose prompt input had to be truncated to fit
# the model's context window.
TRUNCATION_WARNING = """**⚠️ Warning:** The input data (code and/or prior analysis) was too long for the AI model's context limit and had to be truncated. The analysis below may be incomplete or based on partial information.\n\n---\n\n"""
# --- Constants for TLDR Generation ---
# System prompt instructing the model to emit a strict-JSON TLDR object with
# keys: app_description, privacy_tldr, data_types, user_input_data,
# local_processing, remote_processing, external_logging. The JSON structure
# described here must stay in sync with the validation performed in
# parse_tldr_json_response.
TLDR_SYSTEM_PROMPT = (
    "You are an AI assistant specialized in summarizing privacy analysis reports for Hugging Face Spaces. "
    "You will receive two reports: a detailed privacy analysis and a summary/highlights report. "
    "Based **only** on the content of these two reports, generate a concise JSON object containing a structured TLDR (Too Long; Didn't Read). "
    "Do not use any information not present in the provided reports. "
    "The JSON object must have the following keys:\n"
    '- "app_description": A 1-2 sentence summary of what the application does from a user\'s perspective.\n'
    '- "privacy_tldr": A 2-3 sentence high-level overview of privacy. Mention if the analysis was conclusive based on available code, if data processing is local, or if/what data goes to external services.\n'
    '- "data_types": A list of JSON objects, where each object has two keys: \'name\' (a short, unique identifier string for the data type, e.g., "User Text") and \'description\' (a brief string explaining the data type in context, max 6-8 words, e.g., "Text prompt entered by the user").\n'
    "- \"user_input_data\": A list of strings, where each string is the 'name' of a data type defined in 'data_types' that is provided by the user to the app.\n"
    "- \"local_processing\": A list of strings describing data processed locally. Each string should start with the 'name' of a data type defined in 'data_types', followed by details (like the processing model) in parentheses if mentioned in the reports. Example: \"User Text (Local Model XYZ)\".\n"
    "- \"remote_processing\": A list of strings describing data sent to remote services. Each string should start with the 'name' of a data type defined in 'data_types', followed by the service/model name in parentheses if mentioned in the reports. Example: \"User Text (HF Inference API)\".\n"
    "- \"external_logging\": A list of strings describing data logged or saved externally. Each string should start with the 'name' of a data type defined in 'data_types', followed by the location/service in parentheses if mentioned. Example: \"User Text (External DB)\".\n"
    "Ensure the output is **only** a valid JSON object, starting with `{` and ending with `}`. Ensure all listed data types in the processing/logging lists exactly match a 'name' defined in the 'data_types' list."
)
# --- Analysis Pipeline Helper Functions ---
def check_cache_and_download(space_id: str, dataset_id: str, hf_token: str | None):
    """Look up previously generated reports in the cache dataset and fetch them.

    Returns a dict whose "status" key is one of:
    - "cache_hit": cached reports downloaded ("summary", "privacy" and
      "tldr_json_str" keys populated; Markdown reports carry CACHE_INFO_MSG).
    - "cache_miss": no cached reports, or the existence check itself failed
      (then "error_message" is also set).
    - "cache_error": reports exist but downloading them failed
      ("ui_message" carries the error text).
    """
    logging.info(f"Checking cache for '{space_id}'...")
    report_cached = False
    if hf_token:
        try:
            report_cached = check_report_exists(space_id, dataset_id, hf_token)
        except Exception as e:
            logging.warning(f"Cache check failed for {space_id}: {e}. Proceeding.")
            # Treat a failed existence check as a miss so live analysis can run.
            return {"status": "cache_miss", "error_message": f"Cache check failed: {e}"}
    if not report_cached:
        logging.info(f"Cache miss for {space_id}. Performing live analysis.")
        return {"status": "cache_miss"}
    logging.info(f"Cache hit for {space_id}. Downloading.")
    try:
        reports = download_cached_reports(space_id, dataset_id, hf_token)
        result = {
            "status": "cache_hit",
            "summary": reports.get("summary", "Error: Cached summary not found.")
            + CACHE_INFO_MSG,
            "privacy": reports.get("privacy", "Error: Cached privacy report not found.")
            + CACHE_INFO_MSG,
            "tldr_json_str": reports.get("tldr_json_str"),
        }
        logging.info(f"Successfully downloaded cached reports for {space_id}.")
        return result
    except Exception as e:
        error_msg = f"Cache download failed for {space_id}: {e}"
        logging.warning(f"{error_msg}. Proceeding with live analysis.")
        # Surface the error; the caller decides whether to continue live.
        return {"status": "cache_error", "ui_message": error_msg}
def check_endpoint_status(
endpoint_name: str, hf_token: str | None, error_503_user_message: str
):
"""Checks the status of the inference endpoint."""
logging.info(f"Checking endpoint status for '{endpoint_name}'...")
if not hf_token:
# Allow proceeding if token missing, maybe endpoint is public
logging.warning("HF_TOKEN not set, cannot check endpoint status definitively.")
return {"status": "ready", "warning": "HF_TOKEN not set"}
try:
api = HfApi(token=hf_token)
endpoint = api.get_inference_endpoint(name=endpoint_name)
status = endpoint.status
logging.info(f"Endpoint '{endpoint_name}' status: {status}")
if status == "running":
return {"status": "ready"}
else:
logging.warning(
f"Endpoint '{endpoint_name}' is not ready (Status: {status})."
)
if status == "scaledToZero":
logging.info(
f"Endpoint '{endpoint_name}' is scaled to zero. Attempting to resume..."
)
try:
endpoint.resume()
# Still return an error message suggesting retry, as resume takes time
# Keep this message concise as the action is specific (wait)
msg = f"**Endpoint Resuming:** The analysis endpoint ('{endpoint_name}') was scaled to zero and is now restarting.\n\n{error_503_user_message}"
return {"status": "error", "ui_message": msg}
except Exception as resume_error:
# Resume failed, provide detailed message
logging.error(
f"Failed to resume endpoint {endpoint_name}: {resume_error}"
)
# Construct detailed message including full explanation
msg = f"**Endpoint Issue:** The analysis endpoint ('{endpoint_name}') is currently {status} and an attempt to resume it failed ({resume_error}).\n\n{error_503_user_message}"
return {"status": "error", "ui_message": msg}
else: # Paused, failed, pending etc.
# Construct detailed message including full explanation
msg = f"**Endpoint Issue:** The analysis endpoint ('{endpoint_name}') status is currently <span style='color:red'>**{status}**</span>.\n\n{error_503_user_message}"
return {"status": "error", "ui_message": msg}
except Exception as e:
error_msg = f"Error checking analysis endpoint status for {endpoint_name}: {e}"
logging.error(error_msg)
# Let analysis stop if endpoint check fails critically
return {"status": "error", "ui_message": f"Error checking endpoint status: {e}"}
def fetch_and_validate_code(space_id: str):
    """Retrieve the Space's code files, returning an error dict if none found.

    Returns {"status": "success", "code_files": ...} or {"status": "error"}
    with a user-facing "ui_message".
    """
    logging.info(f"Fetching code files for {space_id}...")
    code_files = get_space_code_files(space_id)
    if code_files:
        logging.info(f"Successfully fetched {len(code_files)} files for {space_id}.")
        return {"status": "success", "code_files": code_files}
    error_msg = f"Could not retrieve code files for '{space_id}'. Check ID and ensure it's a public Space."
    logging.warning(error_msg)
    return {
        "status": "error",
        "ui_message": f"**Error:**\n{error_msg}\nAnalysis Canceled.",
    }
def generate_detailed_report(
    space_id: str, code_files: dict, error_503_user_message: str
):
    """Run the first LLM call: produce the detailed privacy analysis report.

    Returns {"status": "success", "report": ..., "truncated": ...} or an
    error dict carrying a user-facing "ui_message".
    """
    logging.info("Generating detailed privacy analysis report...")
    prompt_messages, was_truncated = format_privacy_prompt(space_id, code_files)
    api_response = query_qwen_endpoint(prompt_messages, max_tokens=3072)
    if api_response == ERROR_503_DICT:
        logging.warning("LLM Call 1 (Privacy) failed with 503.")
        return {"status": "error", "ui_message": error_503_user_message}
    report = parse_qwen_response(api_response)
    if "Error:" in report:
        logging.error(f"Failed to generate detailed privacy report: {report}")
        return {
            "status": "error",
            "ui_message": f"**Error Generating Detailed Privacy Report:**\n{report}\nAnalysis Halted.",
        }
    if was_truncated:
        # Tell the reader the model only saw part of the code.
        report = TRUNCATION_WARNING + report
    logging.info("Successfully generated detailed privacy report.")
    return {"status": "success", "report": report, "truncated": was_truncated}
def generate_summary_report(
    space_id: str,
    code_files: dict,
    detailed_privacy_report: str,
    error_503_user_message: str,
):
    """Run the second LLM call: produce the summary & highlights report.

    Returns {"status": "success", "report": ..., "truncated": ...} on
    success. The distinct statuses "error_503_summary" / "error_summary"
    signal partial success: the detailed report already exists and the
    caller may keep it.
    """
    logging.info("Generating summary and highlights report...")
    # Strip any truncation banner so the next prompt sees only report text.
    cleaned_report = detailed_privacy_report.replace(TRUNCATION_WARNING, "")
    prompt_messages, was_truncated = format_summary_highlights_prompt(
        space_id, code_files, cleaned_report
    )
    api_response = query_qwen_endpoint(prompt_messages, max_tokens=2048)
    if api_response == ERROR_503_DICT:
        logging.warning("LLM Call 2 (Summary) failed with 503.")
        return {"status": "error_503_summary", "ui_message": error_503_user_message}
    report = parse_qwen_response(api_response)
    if "Error:" in report:
        logging.error(f"Failed to generate summary/highlights report: {report}")
        return {
            "status": "error_summary",
            "ui_message": f"**Error Generating Summary/Highlights:**\n{report}",
        }
    if was_truncated:
        report = TRUNCATION_WARNING + report
    logging.info("Successfully generated summary & highlights report.")
    return {"status": "success", "report": report, "truncated": was_truncated}
def upload_results(
space_id: str,
summary_report: str,
detailed_report: str,
dataset_id: str,
hf_token: str | None,
tldr_json_data: dict | None = None,
):
"""Uploads the generated reports (Markdown and optional JSON TLDR) to the specified dataset repository."""
if not hf_token:
logging.warning("HF Token not provided, skipping dataset report upload.")
return {"status": "skipped", "reason": "HF_TOKEN not set"}
if "Error:" in detailed_report or "Error:" in summary_report:
msg = "Skipping cache upload due to errors in generated reports."
logging.warning(msg)
return {"status": "skipped", "reason": msg}
safe_space_id = space_id.replace("..", "")
try:
with tempfile.TemporaryDirectory() as tmpdir:
# Define local paths
summary_path_local = os.path.join(tmpdir, SUMMARY_FILENAME)
privacy_path_local = os.path.join(tmpdir, PRIVACY_FILENAME)
tldr_json_path_local = os.path.join(tmpdir, TLDR_FILENAME)
# Write Markdown reports
with open(summary_path_local, "w", encoding="utf-8") as f:
f.write(summary_report)
with open(privacy_path_local, "w", encoding="utf-8") as f:
f.write(detailed_report)
# Prepare commit message
commit_message = f"Add analysis reports for Space: {safe_space_id}"
if tldr_json_data:
commit_message += " (including TLDR JSON)"
print(f"Successfully wrote TLDR JSON locally for {safe_space_id}.")
# Write JSON TLDR data if available
try:
with open(tldr_json_path_local, "w", encoding="utf-8") as f:
json.dump(tldr_json_data, f, indent=2, ensure_ascii=False)
logging.info(
f"Successfully wrote TLDR JSON locally for {safe_space_id}."
)
except Exception as json_err:
logging.error(
f"Failed to write TLDR JSON locally for {safe_space_id}: {json_err}"
)
tldr_json_data = None # Prevent upload attempt if writing failed
# Ensure repo exists
api = HfApi(token=hf_token)
repo_url = api.create_repo(
repo_id=dataset_id,
repo_type="dataset",
exist_ok=True,
)
logging.info(f"Ensured dataset repo {repo_url} exists.")
# Upload summary report
api.upload_file(
path_or_fileobj=summary_path_local,
path_in_repo=f"{safe_space_id}/{SUMMARY_FILENAME}",
repo_id=dataset_id,
repo_type="dataset",
commit_message=commit_message,
)
logging.info(f"Successfully uploaded summary report for {safe_space_id}.")
# Upload privacy report
api.upload_file(
path_or_fileobj=privacy_path_local,
path_in_repo=f"{safe_space_id}/{PRIVACY_FILENAME}",
repo_id=dataset_id,
repo_type="dataset",
commit_message=commit_message,
)
logging.info(
f"Successfully uploaded detailed privacy report for {safe_space_id}."
)
# print(f"Successfully uploaded detailed privacy report for {safe_space_id}.") # Keep if needed for debug
# Upload JSON TLDR if it was successfully written locally
if tldr_json_data and os.path.exists(tldr_json_path_local):
api.upload_file(
path_or_fileobj=tldr_json_path_local,
path_in_repo=f"{safe_space_id}/{TLDR_FILENAME}",
repo_id=dataset_id,
repo_type="dataset",
commit_message=commit_message, # Can reuse commit message or make specific
)
logging.info(f"Successfully uploaded TLDR JSON for {safe_space_id}.")
print(f"Successfully uploaded TLDR JSON for {safe_space_id}.")
# Return success if all uploads finished without error
return {"status": "success"}
except Exception as e:
error_msg = f"Non-critical error during report upload for {safe_space_id}: {e}"
logging.error(error_msg)
print(error_msg)
return {"status": "error", "message": error_msg}
# --- New TLDR Generation Functions ---
def format_tldr_prompt(
    detailed_report: str, summary_report: str
) -> list[dict[str, str]]:
    """Build the chat messages (system + user) for the TLDR generation task."""
    # Strip cache/truncation banners so the LLM only sees report content.
    detailed_clean = detailed_report.replace(CACHE_INFO_MSG, "").replace(
        TRUNCATION_WARNING, ""
    )
    summary_clean = summary_report.replace(CACHE_INFO_MSG, "").replace(
        TRUNCATION_WARNING, ""
    )
    user_content = (
        "Please generate a structured JSON TLDR based on the following reports:\n\n"
        "--- DETAILED PRIVACY ANALYSIS REPORT START ---\n"
        f"{detailed_clean}\n"
        "--- DETAILED PRIVACY ANALYSIS REPORT END ---\n\n"
        "--- SUMMARY & HIGHLIGHTS REPORT START ---\n"
        f"{summary_clean}\n"
        "--- SUMMARY & HIGHLIGHTS REPORT END ---"
    )
    # No truncation handling here: the upstream steps already bound report
    # size. Add logic similar to the other format_* functions if that changes.
    return [
        {"role": "system", "content": TLDR_SYSTEM_PROMPT},
        {"role": "user", "content": user_content},
    ]
def parse_tldr_json_response(
response: ChatCompletionOutput | dict | None,
) -> dict | None:
"""Parses the LLM response, expecting JSON content for the TLDR."""
if response is None:
logging.error("TLDR Generation: Failed to get response from LLM.")
return None
# Check for 503 error dict first
if isinstance(response, dict) and response.get("error_type") == "503":
logging.error(f"TLDR Generation: Received 503 error: {response.get('message')}")
return None # Treat 503 as failure for this specific task
# --- Direct Content Extraction (Replaces call to parse_qwen_response) ---
raw_content = ""
try:
# Check if it's likely the expected ChatCompletionOutput structure
if not hasattr(response, "choices"):
logging.error(
f"TLDR Generation: Unexpected response type received: {type(response)}. Content: {response}"
)
return None # Return None if not the expected structure
# Access the generated content according to the ChatCompletionOutput structure
if response.choices and len(response.choices) > 0:
content = response.choices[0].message.content
if content:
raw_content = content.strip()
logging.info(
"TLDR Generation: Successfully extracted raw content from response."
)
else:
logging.warning(
"TLDR Generation: Response received, but content is empty."
)
return None
else:
logging.warning("TLDR Generation: Response received, but no choices found.")
return None
except AttributeError as e:
# This might catch cases where response looks like the object but lacks expected attributes
logging.error(
f"TLDR Generation: Attribute error parsing response object: {e}. Response structure might be unexpected. Response: {response}"
)
return None
except Exception as e:
logging.error(
f"TLDR Generation: Unexpected error extracting content from response object: {e}"
)
return None
# --- End Direct Content Extraction ---
# --- JSON Parsing Logic ---
if not raw_content: # Should be caught by checks above, but belts and suspenders
logging.error("TLDR Generation: Raw content is empty after extraction attempt.")
return None
try:
# Clean potential markdown code block formatting
if raw_content.strip().startswith("```json"):
raw_content = raw_content.strip()[7:-3].strip()
elif raw_content.strip().startswith("```"):
raw_content = raw_content.strip()[3:-3].strip()
tldr_data = json.loads(raw_content)
# Validate structure: Check if it's a dict and has all required keys
required_keys = [
"app_description",
"privacy_tldr",
"data_types",
"user_input_data",
"local_processing",
"remote_processing",
"external_logging",
]
if not isinstance(tldr_data, dict):
logging.error(
f"TLDR Generation: Parsed content is not a dictionary. Content: {raw_content[:500]}..."
)
return None
if not all(key in tldr_data for key in required_keys):
missing_keys = [key for key in required_keys if key not in tldr_data]
logging.error(
f"TLDR Generation: Parsed JSON is missing required keys: {missing_keys}. Content: {raw_content[:500]}..."
)
return None
# --- Add validation for the new data_types structure ---
data_types_list = tldr_data.get("data_types")
if not isinstance(data_types_list, list):
logging.error(
f"TLDR Generation: 'data_types' is not a list. Content: {data_types_list}"
)
return None
for item in data_types_list:
if (
not isinstance(item, dict)
or "name" not in item
or "description" not in item
):
logging.error(
f"TLDR Generation: Invalid item found in 'data_types' list: {item}. Must be dict with 'name' and 'description'."
)
return None
if not isinstance(item["name"], str) or not isinstance(
item["description"], str
):
logging.error(
f"TLDR Generation: Invalid types for name/description in 'data_types' item: {item}. Must be strings."
)
return None
# --- End validation for data_types ---
# Basic validation for other lists (should contain strings)
validation_passed = True
for key in [
"user_input_data",
"local_processing",
"remote_processing",
"external_logging",
]:
data_list = tldr_data.get(key)
# Add more detailed check and logging
if not isinstance(data_list, list):
logging.error(
f"TLDR Generation Validation Error: Key '{key}' is not a list. Found type: {type(data_list)}, Value: {data_list}"
)
validation_passed = False
# Allow continuing validation for other keys, but mark as failed
elif not all(isinstance(x, str) for x in data_list):
# This check might be too strict if LLM includes non-strings, but keep for now
logging.warning(
f"TLDR Generation Validation Warning: Not all items in list '{key}' are strings. Content: {data_list}"
)
# Decide if this should cause failure - currently it doesn't, just warns
if not validation_passed:
logging.error(
"TLDR Generation: Validation failed due to incorrect list types."
)
return None # Ensure failure if any key wasn't a list
logging.info("Successfully parsed and validated TLDR JSON response.")
return tldr_data
except json.JSONDecodeError as e:
logging.error(
f"TLDR Generation: Failed to decode JSON response: {e}. Content: {raw_content[:500]}..."
)
return None
except Exception as e:
logging.error(f"TLDR Generation: Unexpected error parsing JSON response: {e}")
return None
def render_tldr_markdown(tldr_data: dict | None, space_id: str | None = None) -> str:
"""Renders the top-level TLDR (description, privacy) data into a Markdown string.
(Does not include the data lists)
"""
if not tldr_data:
# Return a more specific message for this part
return "*TLDR Summary could not be generated.*\n"
output = []
# Add Space link if space_id is provided
if space_id:
output.append(
f"**Source Space:** [`{space_id}`](https://huggingface.co/spaces/{space_id})\n"
)
output.append(f"**App Description:** {tldr_data.get('app_description', 'N/A')}\n")
privacy_summary = tldr_data.get("privacy_tldr", "N/A")
output.append(f"**Privacy TLDR:** {privacy_summary}") # Removed extra newline
# Removed data list rendering from this function
return "\n".join(output)
def render_data_details_markdown(tldr_data: dict | None) -> str:
"""Renders the data lists (types, input, processing, logging) from TLDR data."""
if not tldr_data:
return "*Data details could not be generated.*\n"
output = []
# Get defined names for formatting
defined_names = sorted(
[
dt.get("name", "")
for dt in tldr_data.get("data_types", [])
if dt.get("name")
],
key=len,
reverse=True,
)
output.append("**Data Types Defined:**") # Renamed slightly for clarity
data_types = tldr_data.get("data_types")
if data_types and isinstance(data_types, list):
if not data_types:
output.append("- None identified.")
else:
for item in data_types:
name = item.get("name", "Unnamed")
desc = item.get("description", "No description")
output.append(f"- `{name}`: {desc}")
else:
output.append("- (Error loading data types)")
output.append("") # Add newline for spacing
# Reusable helper for rendering lists
def render_list(title, key):
output.append(f"**{title}:**")
data_list = tldr_data.get(key)
if isinstance(data_list, list):
if not data_list:
output.append("- None identified.")
else:
for item_str in data_list:
formatted_item = item_str # Default
found_match = False
for name in defined_names:
if item_str == name:
formatted_item = f"`{name}`"
found_match = True
break
elif item_str.startswith(name + " "):
formatted_item = f"`{name}`{item_str[len(name):]}"
found_match = True
break
if (
not found_match
and " " not in item_str
and not item_str.startswith("`")
):
formatted_item = f"`{item_str}`"
output.append(f"- {formatted_item}")
else:
output.append("- (Error loading list)")
output.append("")
render_list("Data Sent by User to App", "user_input_data")
render_list("Data Processed Locally within App", "local_processing")
render_list("Data Processed Remotely", "remote_processing")
render_list("Data Logged/Saved Externally", "external_logging")
# Remove the last empty line
if output and output[-1] == "":
output.pop()
return "\n".join(output)
# --- Combined TLDR Generation Function ---
def generate_and_parse_tldr(detailed_report: str, summary_report: str) -> dict | None:
    """Format the TLDR prompt, query the LLM, and parse the JSON reply.

    Args:
        detailed_report: The detailed privacy report content.
        summary_report: The summary & highlights report content.

    Returns:
        A dictionary with the parsed TLDR data, or None if any step fails.
    """
    logging.info("Starting TLDR generation and parsing...")
    try:
        messages = format_tldr_prompt(detailed_report, summary_report)
        if not messages:
            logging.error("TLDR Generation: Failed to format prompt.")
            return None
        # Smaller token budget than the report calls: the TLDR is compact JSON.
        response = query_qwen_endpoint(messages, max_tokens=1024)
        if response is None:
            # The query itself failed critically; 503 dicts are handled by the parser.
            logging.error("TLDR Generation: LLM query returned None.")
            return None
        parsed = parse_tldr_json_response(response)
        if parsed:
            logging.info("Successfully generated and parsed TLDR.")
            return parsed
        logging.error("TLDR Generation: Failed to parse JSON response.")
        return None
    except Exception as e:
        logging.error(
            f"TLDR Generation: Unexpected error in generate_and_parse_tldr: {e}",
            exc_info=True,
        )
        return None