import json  # Added for TLDR JSON parsing
import logging
import os
import tempfile

from huggingface_hub import HfApi
from huggingface_hub.inference._generated.types import \
    ChatCompletionOutput  # Added for type hinting

# Imports from other project modules
from llm_interface import (ERROR_503_DICT, parse_qwen_response,
                           query_qwen_endpoint)
from prompts import format_privacy_prompt, format_summary_highlights_prompt
from utils import (PRIVACY_FILENAME,  # Import constants for filenames
                   SUMMARY_FILENAME, TLDR_FILENAME, check_report_exists,
                   download_cached_reports, get_space_code_files)

# Configure logging (can inherit from app.py if called from there, but good practice)
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Load environment variables - redundant if always called by app.py which already loads them
# load_dotenv()

# Constants needed by helper functions (can be passed as args too)
# Consider passing these from app.py if they might change or for clarity
CACHE_INFO_MSG = "\n\n*(Report retrieved from cache)*"
TRUNCATION_WARNING = """**⚠️ Warning:** The input data (code and/or prior analysis) was too long for the AI model's context limit and had to be truncated. The analysis below may be incomplete or based on partial information.\n\n---\n\n"""

# --- Constants for TLDR Generation ---
TLDR_SYSTEM_PROMPT = (
    "You are an AI assistant specialized in summarizing privacy analysis reports for Hugging Face Spaces. "
    "You will receive two reports: a detailed privacy analysis and a summary/highlights report. "
    "Based **only** on the content of these two reports, generate a concise JSON object containing a structured TLDR (Too Long; Didn't Read). "
    "Do not use any information not present in the provided reports. "
    "The JSON object must have the following keys:\n"
    '- "app_description": A 1-2 sentence summary of what the application does from a user\'s perspective.\n'
    '- "privacy_tldr": A 2-3 sentence high-level overview of privacy. Mention if the analysis was conclusive based on available code, if data processing is local, or if/what data goes to external services.\n'
    '- "data_types": A list of JSON objects, where each object has two keys: \'name\' (a short, unique identifier string for the data type, e.g., "User Text") and \'description\' (a brief string explaining the data type in context, max 6-8 words, e.g., "Text prompt entered by the user").\n'
    "- \"user_input_data\": A list of strings, where each string is the 'name' of a data type defined in 'data_types' that is provided by the user to the app.\n"
    "- \"local_processing\": A list of strings describing data processed locally. Each string should start with the 'name' of a data type defined in 'data_types', followed by details (like the processing model) in parentheses if mentioned in the reports. Example: \"User Text (Local Model XYZ)\".\n"
    "- \"remote_processing\": A list of strings describing data sent to remote services. Each string should start with the 'name' of a data type defined in 'data_types', followed by the service/model name in parentheses if mentioned in the reports. Example: \"User Text (HF Inference API)\".\n"
    "- \"external_logging\": A list of strings describing data logged or saved externally. Each string should start with the 'name' of a data type defined in 'data_types', followed by the location/service in parentheses if mentioned. Example: \"User Text (External DB)\".\n"
    "Ensure the output is **only** a valid JSON object, starting with `{` and ending with `}`. Ensure all listed data types in the processing/logging lists exactly match a 'name' defined in the 'data_types' list."
)


# --- Analysis Pipeline Helper Functions ---


def check_cache_and_download(space_id: str, dataset_id: str, hf_token: str | None):
    """Checks the report cache and downloads cached reports if they exist.

    Returns a status dict: "cache_hit" (with "summary"/"privacy"/"tldr_json_str"),
    "cache_miss", or "cache_error" (download failed after a positive cache check).
    """
    logging.info(f"Checking cache for '{space_id}'...")
    found_in_cache = False
    if hf_token:
        try:
            found_in_cache = check_report_exists(space_id, dataset_id, hf_token)
        except Exception as e:
            logging.warning(f"Cache check failed for {space_id}: {e}. Proceeding.")
            # Return cache_miss even if check failed, proceed to live analysis
            return {"status": "cache_miss", "error_message": f"Cache check failed: {e}"}
    if found_in_cache:
        logging.info(f"Cache hit for {space_id}. Downloading.")
        try:
            cached_reports = download_cached_reports(space_id, dataset_id, hf_token)
            summary_report = (
                cached_reports.get("summary", "Error: Cached summary not found.")
                + CACHE_INFO_MSG
            )
            privacy_report = (
                cached_reports.get("privacy", "Error: Cached privacy report not found.")
                + CACHE_INFO_MSG
            )
            logging.info(f"Successfully downloaded cached reports for {space_id}.")
            return {
                "status": "cache_hit",
                "summary": summary_report,
                "privacy": privacy_report,
                "tldr_json_str": cached_reports.get("tldr_json_str"),
            }
        except Exception as e:
            error_msg = f"Cache download failed for {space_id}: {e}"
            logging.warning(f"{error_msg}. Proceeding with live analysis.")
            # Return error, but let caller decide if live analysis proceeds
            return {"status": "cache_error", "ui_message": error_msg}
    else:
        logging.info(f"Cache miss for {space_id}. Performing live analysis.")
        return {"status": "cache_miss"}


def check_endpoint_status(
    endpoint_name: str, hf_token: str | None, error_503_user_message: str
):
    """Checks the status of the inference endpoint.

    Returns {"status": "ready"} when the endpoint is running (or no token is
    available to check), otherwise {"status": "error", "ui_message": ...}.
    Attempts to resume an endpoint that is scaled to zero.
    """
    logging.info(f"Checking endpoint status for '{endpoint_name}'...")
    if not hf_token:
        # Allow proceeding if token missing, maybe endpoint is public
        logging.warning("HF_TOKEN not set, cannot check endpoint status definitively.")
        return {"status": "ready", "warning": "HF_TOKEN not set"}
    try:
        api = HfApi(token=hf_token)
        endpoint = api.get_inference_endpoint(name=endpoint_name)
        status = endpoint.status
        logging.info(f"Endpoint '{endpoint_name}' status: {status}")
        if status == "running":
            return {"status": "ready"}
        else:
            logging.warning(
                f"Endpoint '{endpoint_name}' is not ready (Status: {status})."
            )
            if status == "scaledToZero":
                logging.info(
                    f"Endpoint '{endpoint_name}' is scaled to zero. Attempting to resume..."
                )
                try:
                    endpoint.resume()
                    # Still return an error message suggesting retry, as resume takes time
                    # Keep this message concise as the action is specific (wait)
                    msg = f"**Endpoint Resuming:** The analysis endpoint ('{endpoint_name}') was scaled to zero and is now restarting.\n\n{error_503_user_message}"
                    return {"status": "error", "ui_message": msg}
                except Exception as resume_error:
                    # Resume failed, provide detailed message
                    logging.error(
                        f"Failed to resume endpoint {endpoint_name}: {resume_error}"
                    )
                    # Construct detailed message including full explanation
                    msg = f"**Endpoint Issue:** The analysis endpoint ('{endpoint_name}') is currently {status} and an attempt to resume it failed ({resume_error}).\n\n{error_503_user_message}"
                    return {"status": "error", "ui_message": msg}
            else:  # Paused, failed, pending etc.
                # Construct detailed message including full explanation
                msg = f"**Endpoint Issue:** The analysis endpoint ('{endpoint_name}') status is currently **{status}**.\n\n{error_503_user_message}"
                return {"status": "error", "ui_message": msg}
    except Exception as e:
        error_msg = f"Error checking analysis endpoint status for {endpoint_name}: {e}"
        logging.error(error_msg)
        # Let analysis stop if endpoint check fails critically
        return {"status": "error", "ui_message": f"Error checking endpoint status: {e}"}


def fetch_and_validate_code(space_id: str):
    """Fetches and validates code files for the space.

    Returns {"status": "success", "code_files": ...} or an error dict with a
    user-facing "ui_message" when the Space's files cannot be retrieved.
    """
    logging.info(f"Fetching code files for {space_id}...")
    code_files = get_space_code_files(space_id)
    if not code_files:
        error_msg = f"Could not retrieve code files for '{space_id}'. Check ID and ensure it's a public Space."
        logging.warning(error_msg)
        return {
            "status": "error",
            "ui_message": f"**Error:**\n{error_msg}\nAnalysis Canceled.",
        }
    logging.info(f"Successfully fetched {len(code_files)} files for {space_id}.")
    return {"status": "success", "code_files": code_files}


def generate_detailed_report(
    space_id: str, code_files: dict, error_503_user_message: str
):
    """Generates the detailed privacy report using the LLM.

    Returns {"status": "success", "report": ..., "truncated": ...} on success,
    or {"status": "error", "ui_message": ...} on a 503 or parse failure.
    """
    logging.info("Generating detailed privacy analysis report...")
    privacy_prompt_messages, privacy_truncated = format_privacy_prompt(
        space_id, code_files
    )
    privacy_api_response = query_qwen_endpoint(privacy_prompt_messages, max_tokens=3072)
    if privacy_api_response == ERROR_503_DICT:
        logging.warning("LLM Call 1 (Privacy) failed with 503.")
        return {"status": "error", "ui_message": error_503_user_message}
    detailed_privacy_report = parse_qwen_response(privacy_api_response)
    if "Error:" in detailed_privacy_report:
        error_msg = (
            f"Failed to generate detailed privacy report: {detailed_privacy_report}"
        )
        logging.error(error_msg)
        return {
            "status": "error",
            "ui_message": f"**Error Generating Detailed Privacy Report:**\n{detailed_privacy_report}\nAnalysis Halted.",
        }
    if privacy_truncated:
        detailed_privacy_report = TRUNCATION_WARNING + detailed_privacy_report
    logging.info("Successfully generated detailed privacy report.")
    return {
        "status": "success",
        "report": detailed_privacy_report,
        "truncated": privacy_truncated,
    }


def generate_summary_report(
    space_id: str,
    code_files: dict,
    detailed_privacy_report: str,
    error_503_user_message: str,
):
    """Generates the summary & highlights report using the LLM.

    Returns {"status": "success", "report": ..., "truncated": ...} on success.
    Error statuses "error_503_summary" / "error_summary" signal partial success
    (the detailed report already exists) to the caller.
    """
    logging.info("Generating summary and highlights report...")
    # Remove potential truncation warning from detailed report before sending to next LLM
    clean_detailed_report = detailed_privacy_report.replace(TRUNCATION_WARNING, "")
    summary_highlights_prompt_messages, summary_truncated = (
        format_summary_highlights_prompt(space_id, code_files, clean_detailed_report)
    )
    summary_highlights_api_response = query_qwen_endpoint(
        summary_highlights_prompt_messages, max_tokens=2048
    )
    if summary_highlights_api_response == ERROR_503_DICT:
        logging.warning("LLM Call 2 (Summary) failed with 503.")
        # Return specific status to indicate partial success
        return {"status": "error_503_summary", "ui_message": error_503_user_message}
    summary_highlights_report = parse_qwen_response(summary_highlights_api_response)
    if "Error:" in summary_highlights_report:
        error_msg = (
            f"Failed to generate summary/highlights report: {summary_highlights_report}"
        )
        logging.error(error_msg)
        # Return specific status to indicate partial success
        return {
            "status": "error_summary",
            "ui_message": f"**Error Generating Summary/Highlights:**\n{summary_highlights_report}",
        }
    if summary_truncated:
        summary_highlights_report = TRUNCATION_WARNING + summary_highlights_report
    logging.info("Successfully generated summary & highlights report.")
    return {
        "status": "success",
        "report": summary_highlights_report,
        "truncated": summary_truncated,
    }


def upload_results(
    space_id: str,
    summary_report: str,
    detailed_report: str,
    dataset_id: str,
    hf_token: str | None,
    tldr_json_data: dict | None = None,
):
    """Uploads the generated reports (Markdown and optional JSON TLDR) to the specified dataset repository.

    Skips the upload when no token is available or when either report contains
    an error marker. Returns a dict with "status" of "success", "skipped", or
    "error" (upload failures are non-critical for the caller).
    """
    if not hf_token:
        logging.warning("HF Token not provided, skipping dataset report upload.")
        return {"status": "skipped", "reason": "HF_TOKEN not set"}
    if "Error:" in detailed_report or "Error:" in summary_report:
        msg = "Skipping cache upload due to errors in generated reports."
        logging.warning(msg)
        return {"status": "skipped", "reason": msg}

    # Guard against path traversal when the space_id is used as a repo path segment
    safe_space_id = space_id.replace("..", "")
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            # Define local paths
            summary_path_local = os.path.join(tmpdir, SUMMARY_FILENAME)
            privacy_path_local = os.path.join(tmpdir, PRIVACY_FILENAME)
            tldr_json_path_local = os.path.join(tmpdir, TLDR_FILENAME)

            # Write Markdown reports
            with open(summary_path_local, "w", encoding="utf-8") as f:
                f.write(summary_report)
            with open(privacy_path_local, "w", encoding="utf-8") as f:
                f.write(detailed_report)

            # Prepare commit message
            commit_message = f"Add analysis reports for Space: {safe_space_id}"
            if tldr_json_data:
                commit_message += " (including TLDR JSON)"
                # FIX: removed a premature success print here that fired before
                # the JSON was written (and even when writing failed below).
                # Write JSON TLDR data if available
                try:
                    with open(tldr_json_path_local, "w", encoding="utf-8") as f:
                        json.dump(tldr_json_data, f, indent=2, ensure_ascii=False)
                    logging.info(
                        f"Successfully wrote TLDR JSON locally for {safe_space_id}."
                    )
                except Exception as json_err:
                    logging.error(
                        f"Failed to write TLDR JSON locally for {safe_space_id}: {json_err}"
                    )
                    tldr_json_data = None  # Prevent upload attempt if writing failed

            # Ensure repo exists
            api = HfApi(token=hf_token)
            repo_url = api.create_repo(
                repo_id=dataset_id,
                repo_type="dataset",
                exist_ok=True,
            )
            logging.info(f"Ensured dataset repo {repo_url} exists.")

            # Upload summary report
            api.upload_file(
                path_or_fileobj=summary_path_local,
                path_in_repo=f"{safe_space_id}/{SUMMARY_FILENAME}",
                repo_id=dataset_id,
                repo_type="dataset",
                commit_message=commit_message,
            )
            logging.info(f"Successfully uploaded summary report for {safe_space_id}.")

            # Upload privacy report
            api.upload_file(
                path_or_fileobj=privacy_path_local,
                path_in_repo=f"{safe_space_id}/{PRIVACY_FILENAME}",
                repo_id=dataset_id,
                repo_type="dataset",
                commit_message=commit_message,
            )
            logging.info(
                f"Successfully uploaded detailed privacy report for {safe_space_id}."
            )
            # print(f"Successfully uploaded detailed privacy report for {safe_space_id}.") # Keep if needed for debug

            # Upload JSON TLDR if it was successfully written locally
            if tldr_json_data and os.path.exists(tldr_json_path_local):
                api.upload_file(
                    path_or_fileobj=tldr_json_path_local,
                    path_in_repo=f"{safe_space_id}/{TLDR_FILENAME}",
                    repo_id=dataset_id,
                    repo_type="dataset",
                    commit_message=commit_message,  # Can reuse commit message or make specific
                )
                logging.info(f"Successfully uploaded TLDR JSON for {safe_space_id}.")
                print(f"Successfully uploaded TLDR JSON for {safe_space_id}.")

        # Return success if all uploads finished without error
        return {"status": "success"}
    except Exception as e:
        error_msg = f"Non-critical error during report upload for {safe_space_id}: {e}"
        logging.error(error_msg)
        print(error_msg)
        return {"status": "error", "message": error_msg}


# --- New TLDR Generation Functions ---


def format_tldr_prompt(
    detailed_report: str, summary_report: str
) -> list[dict[str, str]]:
    """Formats the prompt for the TLDR generation task."""
    # Clean potential cache/truncation markers from input reports for the LLM
    cleaned_detailed = detailed_report.replace(CACHE_INFO_MSG, "").replace(
        TRUNCATION_WARNING, ""
    )
    cleaned_summary = summary_report.replace(CACHE_INFO_MSG, "").replace(
        TRUNCATION_WARNING, ""
    )

    user_content = (
        "Please generate a structured JSON TLDR based on the following reports:\n\n"
        "--- DETAILED PRIVACY ANALYSIS REPORT START ---\n"
        f"{cleaned_detailed}\n"
        "--- DETAILED PRIVACY ANALYSIS REPORT END ---\n\n"
        "--- SUMMARY & HIGHLIGHTS REPORT START ---\n"
        f"{cleaned_summary}\n"
        "--- SUMMARY & HIGHLIGHTS REPORT END ---"
    )
    # Note: We are not handling truncation here, assuming the input reports
    # are already reasonably sized from the previous steps.
    # If reports could be extremely long, add truncation logic similar to other format_* functions.
    messages = [
        {"role": "system", "content": TLDR_SYSTEM_PROMPT},
        {"role": "user", "content": user_content},
    ]
    return messages


def parse_tldr_json_response(
    response: ChatCompletionOutput | dict | None,
) -> dict | None:
    """Parses the LLM response, expecting JSON content for the TLDR.

    Returns the validated TLDR dict, or None on any failure (503 dict,
    unexpected response shape, empty content, invalid JSON, or schema
    validation errors).
    """
    if response is None:
        logging.error("TLDR Generation: Failed to get response from LLM.")
        return None
    # Check for 503 error dict first
    if isinstance(response, dict) and response.get("error_type") == "503":
        logging.error(f"TLDR Generation: Received 503 error: {response.get('message')}")
        return None  # Treat 503 as failure for this specific task

    # --- Direct Content Extraction (Replaces call to parse_qwen_response) ---
    raw_content = ""
    try:
        # Check if it's likely the expected ChatCompletionOutput structure
        if not hasattr(response, "choices"):
            logging.error(
                f"TLDR Generation: Unexpected response type received: {type(response)}. Content: {response}"
            )
            return None  # Return None if not the expected structure

        # Access the generated content according to the ChatCompletionOutput structure
        if response.choices and len(response.choices) > 0:
            content = response.choices[0].message.content
            if content:
                raw_content = content.strip()
                logging.info(
                    "TLDR Generation: Successfully extracted raw content from response."
                )
            else:
                logging.warning(
                    "TLDR Generation: Response received, but content is empty."
                )
                return None
        else:
            logging.warning("TLDR Generation: Response received, but no choices found.")
            return None
    except AttributeError as e:
        # This might catch cases where response looks like the object but lacks expected attributes
        logging.error(
            f"TLDR Generation: Attribute error parsing response object: {e}. Response structure might be unexpected. Response: {response}"
        )
        return None
    except Exception as e:
        logging.error(
            f"TLDR Generation: Unexpected error extracting content from response object: {e}"
        )
        return None
    # --- End Direct Content Extraction ---

    # --- JSON Parsing Logic ---
    if not raw_content:
        # Should be caught by checks above, but belts and suspenders
        logging.error("TLDR Generation: Raw content is empty after extraction attempt.")
        return None

    try:
        # Clean potential markdown code block formatting.
        # FIX: the previous slice-based stripping ([7:-3] / [3:-3]) deleted the
        # final 3 characters even when no closing ``` fence existed, corrupting
        # otherwise-valid JSON. Strip the fences only when actually present.
        cleaned = raw_content.strip()
        if cleaned.startswith("```json"):
            cleaned = cleaned.removeprefix("```json").removesuffix("```").strip()
        elif cleaned.startswith("```"):
            cleaned = cleaned.removeprefix("```").removesuffix("```").strip()
        raw_content = cleaned

        tldr_data = json.loads(raw_content)

        # Validate structure: Check if it's a dict and has all required keys
        required_keys = [
            "app_description",
            "privacy_tldr",
            "data_types",
            "user_input_data",
            "local_processing",
            "remote_processing",
            "external_logging",
        ]
        if not isinstance(tldr_data, dict):
            logging.error(
                f"TLDR Generation: Parsed content is not a dictionary. Content: {raw_content[:500]}..."
            )
            return None
        if not all(key in tldr_data for key in required_keys):
            missing_keys = [key for key in required_keys if key not in tldr_data]
            logging.error(
                f"TLDR Generation: Parsed JSON is missing required keys: {missing_keys}. Content: {raw_content[:500]}..."
            )
            return None

        # --- Add validation for the new data_types structure ---
        data_types_list = tldr_data.get("data_types")
        if not isinstance(data_types_list, list):
            logging.error(
                f"TLDR Generation: 'data_types' is not a list. Content: {data_types_list}"
            )
            return None
        for item in data_types_list:
            if (
                not isinstance(item, dict)
                or "name" not in item
                or "description" not in item
            ):
                logging.error(
                    f"TLDR Generation: Invalid item found in 'data_types' list: {item}. Must be dict with 'name' and 'description'."
                )
                return None
            if not isinstance(item["name"], str) or not isinstance(
                item["description"], str
            ):
                logging.error(
                    f"TLDR Generation: Invalid types for name/description in 'data_types' item: {item}. Must be strings."
                )
                return None
        # --- End validation for data_types ---

        # Basic validation for other lists (should contain strings)
        validation_passed = True
        for key in [
            "user_input_data",
            "local_processing",
            "remote_processing",
            "external_logging",
        ]:
            data_list = tldr_data.get(key)
            # Add more detailed check and logging
            if not isinstance(data_list, list):
                logging.error(
                    f"TLDR Generation Validation Error: Key '{key}' is not a list. Found type: {type(data_list)}, Value: {data_list}"
                )
                validation_passed = False
                # Allow continuing validation for other keys, but mark as failed
            elif not all(isinstance(x, str) for x in data_list):
                # This check might be too strict if LLM includes non-strings, but keep for now
                logging.warning(
                    f"TLDR Generation Validation Warning: Not all items in list '{key}' are strings. Content: {data_list}"
                )
                # Decide if this should cause failure - currently it doesn't, just warns

        if not validation_passed:
            logging.error(
                "TLDR Generation: Validation failed due to incorrect list types."
            )
            return None  # Ensure failure if any key wasn't a list

        logging.info("Successfully parsed and validated TLDR JSON response.")
        return tldr_data
    except json.JSONDecodeError as e:
        logging.error(
            f"TLDR Generation: Failed to decode JSON response: {e}. Content: {raw_content[:500]}..."
        )
        return None
    except Exception as e:
        logging.error(f"TLDR Generation: Unexpected error parsing JSON response: {e}")
        return None


def render_tldr_markdown(tldr_data: dict | None, space_id: str | None = None) -> str:
    """Renders the top-level TLDR (description, privacy) data into a Markdown string.

    (Does not include the data lists)
    """
    if not tldr_data:
        # Return a more specific message for this part
        return "*TLDR Summary could not be generated.*\n"

    output = []
    # Add Space link if space_id is provided
    if space_id:
        output.append(
            f"**Source Space:** [`{space_id}`](https://huggingface.co/spaces/{space_id})\n"
        )
    output.append(f"**App Description:** {tldr_data.get('app_description', 'N/A')}\n")
    privacy_summary = tldr_data.get("privacy_tldr", "N/A")
    output.append(f"**Privacy TLDR:** {privacy_summary}")  # Removed extra newline
    # Removed data list rendering from this function
    return "\n".join(output)


def render_data_details_markdown(tldr_data: dict | None) -> str:
    """Renders the data lists (types, input, processing, logging) from TLDR data."""
    if not tldr_data:
        return "*Data details could not be generated.*\n"

    output = []
    # Get defined names for formatting; longest first so the most specific
    # name wins when one name is a prefix of another.
    defined_names = sorted(
        [
            dt.get("name", "")
            for dt in tldr_data.get("data_types", [])
            if dt.get("name")
        ],
        key=len,
        reverse=True,
    )

    output.append("**Data Types Defined:**")  # Renamed slightly for clarity
    data_types = tldr_data.get("data_types")
    if data_types and isinstance(data_types, list):
        if not data_types:
            output.append("- None identified.")
        else:
            for item in data_types:
                name = item.get("name", "Unnamed")
                desc = item.get("description", "No description")
                output.append(f"- `{name}`: {desc}")
    else:
        output.append("- (Error loading data types)")
    output.append("")  # Add newline for spacing

    # Reusable helper for rendering lists
    def render_list(title, key):
        output.append(f"**{title}:**")
        data_list = tldr_data.get(key)
        if isinstance(data_list, list):
            if not data_list:
                output.append("- None identified.")
            else:
                for item_str in data_list:
                    formatted_item = item_str  # Default
                    found_match = False
                    for name in defined_names:
                        if item_str == name:
                            formatted_item = f"`{name}`"
                            found_match = True
                            break
                        elif item_str.startswith(name + " "):
                            formatted_item = f"`{name}`{item_str[len(name):]}"
                            found_match = True
                            break
                    if (
                        not found_match
                        and " " not in item_str
                        and not item_str.startswith("`")
                    ):
                        formatted_item = f"`{item_str}`"
                    output.append(f"- {formatted_item}")
        else:
            output.append("- (Error loading list)")
        output.append("")

    render_list("Data Sent by User to App", "user_input_data")
    render_list("Data Processed Locally within App", "local_processing")
    render_list("Data Processed Remotely", "remote_processing")
    render_list("Data Logged/Saved Externally", "external_logging")

    # Remove the last empty line
    if output and output[-1] == "":
        output.pop()

    return "\n".join(output)


# --- Combined TLDR Generation Function ---


def generate_and_parse_tldr(detailed_report: str, summary_report: str) -> dict | None:
    """Formats prompt, queries LLM, and parses JSON response for TLDR.

    Args:
        detailed_report: The detailed privacy report content.
        summary_report: The summary & highlights report content.

    Returns:
        A dictionary with the parsed TLDR data, or None if any step fails.
    """
    logging.info("Starting TLDR generation and parsing...")
    try:
        # Format
        tldr_prompt_messages = format_tldr_prompt(detailed_report, summary_report)
        if not tldr_prompt_messages:
            logging.error("TLDR Generation: Failed to format prompt.")
            return None

        # Query (using existing import within analysis_utils)
        # Use slightly smaller max_tokens
        llm_response = query_qwen_endpoint(tldr_prompt_messages, max_tokens=1024)
        if llm_response is None:  # Check if query itself failed critically
            logging.error("TLDR Generation: LLM query returned None.")
            return None
        # 503 handled within parse function below

        # Parse
        parsed_data = parse_tldr_json_response(llm_response)
        if parsed_data:
            logging.info("Successfully generated and parsed TLDR.")
            return parsed_data
        else:
            logging.error("TLDR Generation: Failed to parse JSON response.")
            return None
    except Exception as e:
        logging.error(
            f"TLDR Generation: Unexpected error in generate_and_parse_tldr: {e}",
            exc_info=True,
        )
        return None