import json  # Added for TLDR JSON parsing
import logging
import os
import tempfile

from huggingface_hub import HfApi
from huggingface_hub.inference._generated.types import \
    ChatCompletionOutput  # Added for type hinting

# Imports from other project modules
from llm_interface import (ERROR_503_DICT, parse_qwen_response,
                           query_qwen_endpoint)
from prompts import format_privacy_prompt, format_summary_highlights_prompt
from utils import (PRIVACY_FILENAME,  # Import constants for filenames
                   SUMMARY_FILENAME, TLDR_FILENAME, check_report_exists,
                   download_cached_reports, get_space_code_files)

# Configure logging (app.py already configures logging when this module is called
# from there, but setting a sensible default here is good practice)
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Load environment variables - redundant if always called by app.py, which already loads them
# load_dotenv()

# Constants needed by helper functions (they could also be passed as arguments)
# Consider passing these from app.py if they might change, or for clarity
CACHE_INFO_MSG = "\n\n*(Report retrieved from cache)*"
TRUNCATION_WARNING = """**⚠️ Warning:** The input data (code and/or prior analysis) was too long for the AI model's context limit and had to be truncated. The analysis below may be incomplete or based on partial information.\n\n---\n\n"""

# --- Constants for TLDR Generation ---
TLDR_SYSTEM_PROMPT = (
    "You are an AI assistant specialized in summarizing privacy analysis reports for Hugging Face Spaces. "
    "You will receive two reports: a detailed privacy analysis and a summary/highlights report. "
    "Based **only** on the content of these two reports, generate a concise JSON object containing a structured TLDR (Too Long; Didn't Read). "
    "Do not use any information not present in the provided reports. "
    "The JSON object must have the following keys:\n"
    '- "app_description": A 1-2 sentence summary of what the application does from a user\'s perspective.\n'
    '- "privacy_tldr": A 2-3 sentence high-level overview of privacy. Mention if the analysis was conclusive based on available code, if data processing is local, or if/what data goes to external services.\n'
    '- "data_types": A list of JSON objects, where each object has two keys: \'name\' (a short, unique identifier string for the data type, e.g., "User Text") and \'description\' (a brief string explaining the data type in context, max 6-8 words, e.g., "Text prompt entered by the user").\n'
    "- \"user_input_data\": A list of strings, where each string is the 'name' of a data type defined in 'data_types' that is provided by the user to the app.\n"
    "- \"local_processing\": A list of strings describing data processed locally. Each string should start with the 'name' of a data type defined in 'data_types', followed by details (like the processing model) in parentheses if mentioned in the reports. Example: \"User Text (Local Model XYZ)\".\n"
    "- \"remote_processing\": A list of strings describing data sent to remote services. Each string should start with the 'name' of a data type defined in 'data_types', followed by the service/model name in parentheses if mentioned in the reports. Example: \"User Text (HF Inference API)\".\n"
    "- \"external_logging\": A list of strings describing data logged or saved externally. Each string should start with the 'name' of a data type defined in 'data_types', followed by the location/service in parentheses if mentioned. Example: \"User Text (External DB)\".\n"
    "Ensure the output is **only** a valid JSON object, starting with `{` and ending with `}`. Ensure all listed data types in the processing/logging lists exactly match a 'name' defined in the 'data_types' list."
)
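
# Illustrative example (hypothetical values) of the JSON shape the system prompt
# above asks for; useful when eyeballing parse_tldr_json_response() failures:
#
# {
#   "app_description": "Transcribes short audio clips uploaded by the user.",
#   "privacy_tldr": "Analysis was conclusive from the available code. Audio is sent to a remote inference API; nothing is logged externally.",
#   "data_types": [
#     {"name": "User Audio", "description": "Audio clip uploaded by the user"}
#   ],
#   "user_input_data": ["User Audio"],
#   "local_processing": [],
#   "remote_processing": ["User Audio (HF Inference API)"],
#   "external_logging": []
# }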


# --- Analysis Pipeline Helper Functions ---


def check_cache_and_download(space_id: str, dataset_id: str, hf_token: str | None):
    """Checks cache and downloads if reports exist."""
    logging.info(f"Checking cache for '{space_id}'...")
    found_in_cache = False
    if hf_token:
        try:
            found_in_cache = check_report_exists(space_id, dataset_id, hf_token)
        except Exception as e:
            logging.warning(f"Cache check failed for {space_id}: {e}. Proceeding.")
            # Return cache_miss even if check failed, proceed to live analysis
            return {"status": "cache_miss", "error_message": f"Cache check failed: {e}"}
    if found_in_cache:
        logging.info(f"Cache hit for {space_id}. Downloading.")
        try:
            cached_reports = download_cached_reports(space_id, dataset_id, hf_token)
            summary_report = (
                cached_reports.get("summary", "Error: Cached summary not found.")
                + CACHE_INFO_MSG
            )
            privacy_report = (
                cached_reports.get("privacy", "Error: Cached privacy report not found.")
                + CACHE_INFO_MSG
            )
            logging.info(f"Successfully downloaded cached reports for {space_id}.")
            return {
                "status": "cache_hit",
                "summary": summary_report,
                "privacy": privacy_report,
                "tldr_json_str": cached_reports.get("tldr_json_str"),
            }
        except Exception as e:
            error_msg = f"Cache download failed for {space_id}: {e}"
            logging.warning(f"{error_msg}. Proceeding with live analysis.")
            # Return error, but let caller decide if live analysis proceeds
            return {"status": "cache_error", "ui_message": error_msg}
    else:
        logging.info(f"Cache miss for {space_id}. Performing live analysis.")
        return {"status": "cache_miss"}


def check_endpoint_status(
    endpoint_name: str, hf_token: str | None, error_503_user_message: str
):
    """Checks the status of the inference endpoint."""
    logging.info(f"Checking endpoint status for '{endpoint_name}'...")
    if not hf_token:
        # Allow proceeding if token missing, maybe endpoint is public
        logging.warning("HF_TOKEN not set, cannot check endpoint status definitively.")
        return {"status": "ready", "warning": "HF_TOKEN not set"}
    try:
        api = HfApi(token=hf_token)
        endpoint = api.get_inference_endpoint(name=endpoint_name)
        status = endpoint.status
        logging.info(f"Endpoint '{endpoint_name}' status: {status}")
        if status == "running":
            return {"status": "ready"}
        else:
            logging.warning(
                f"Endpoint '{endpoint_name}' is not ready (Status: {status})."
            )
            if status == "scaledToZero":
                logging.info(
                    f"Endpoint '{endpoint_name}' is scaled to zero. Attempting to resume..."
                )
                try:
                    endpoint.resume()
                    # Still return an error message suggesting retry, as resume takes time
                    # Keep this message concise as the action is specific (wait)
                    msg = f"**Endpoint Resuming:** The analysis endpoint ('{endpoint_name}') was scaled to zero and is now restarting.\n\n{error_503_user_message}"
                    return {"status": "error", "ui_message": msg}
                except Exception as resume_error:
                    # Resume failed, provide detailed message
                    logging.error(
                        f"Failed to resume endpoint {endpoint_name}: {resume_error}"
                    )
                    # Construct detailed message including full explanation
                    msg = f"**Endpoint Issue:** The analysis endpoint ('{endpoint_name}') is currently {status} and an attempt to resume it failed ({resume_error}).\n\n{error_503_user_message}"
                    return {"status": "error", "ui_message": msg}
            else:  # Paused, failed, pending, etc.
                # Construct detailed message including full explanation
                msg = f"**Endpoint Issue:** The analysis endpoint ('{endpoint_name}') status is currently <span style='color:red'>**{status}**</span>.\n\n{error_503_user_message}"
                return {"status": "error", "ui_message": msg}
    except Exception as e:
        error_msg = f"Error checking analysis endpoint status for {endpoint_name}: {e}"
        logging.error(error_msg)
        # Let analysis stop if endpoint check fails critically
        return {"status": "error", "ui_message": f"Error checking endpoint status: {e}"}


def fetch_and_validate_code(space_id: str):
    """Fetches and validates code files for the space."""
    logging.info(f"Fetching code files for {space_id}...")
    code_files = get_space_code_files(space_id)
    if not code_files:
        error_msg = f"Could not retrieve code files for '{space_id}'. Check ID and ensure it's a public Space."
        logging.warning(error_msg)
        return {
            "status": "error",
            "ui_message": f"**Error:**\n{error_msg}\nAnalysis Canceled.",
        }
    logging.info(f"Successfully fetched {len(code_files)} files for {space_id}.")
    return {"status": "success", "code_files": code_files}


def generate_detailed_report(
    space_id: str, code_files: dict, error_503_user_message: str
):
    """Generates the detailed privacy report using the LLM."""
    logging.info("Generating detailed privacy analysis report...")
    privacy_prompt_messages, privacy_truncated = format_privacy_prompt(
        space_id, code_files
    )
    privacy_api_response = query_qwen_endpoint(privacy_prompt_messages, max_tokens=3072)
    if privacy_api_response == ERROR_503_DICT:
        logging.warning("LLM Call 1 (Privacy) failed with 503.")
        return {"status": "error", "ui_message": error_503_user_message}
    detailed_privacy_report = parse_qwen_response(privacy_api_response)
    if "Error:" in detailed_privacy_report:
        error_msg = (
            f"Failed to generate detailed privacy report: {detailed_privacy_report}"
        )
        logging.error(error_msg)
        return {
            "status": "error",
            "ui_message": f"**Error Generating Detailed Privacy Report:**\n{detailed_privacy_report}\nAnalysis Halted.",
        }
    if privacy_truncated:
        detailed_privacy_report = TRUNCATION_WARNING + detailed_privacy_report
    logging.info("Successfully generated detailed privacy report.")
    return {
        "status": "success",
        "report": detailed_privacy_report,
        "truncated": privacy_truncated,
    }


def generate_summary_report(
    space_id: str,
    code_files: dict,
    detailed_privacy_report: str,
    error_503_user_message: str,
):
    """Generates the summary & highlights report using the LLM."""
    logging.info("Generating summary and highlights report...")
    # Remove potential truncation warning from detailed report before sending to next LLM
    clean_detailed_report = detailed_privacy_report.replace(TRUNCATION_WARNING, "")
    summary_highlights_prompt_messages, summary_truncated = (
        format_summary_highlights_prompt(space_id, code_files, clean_detailed_report)
    )
    summary_highlights_api_response = query_qwen_endpoint(
        summary_highlights_prompt_messages, max_tokens=2048
    )
    if summary_highlights_api_response == ERROR_503_DICT:
        logging.warning("LLM Call 2 (Summary) failed with 503.")
        # Return specific status to indicate partial success
        return {"status": "error_503_summary", "ui_message": error_503_user_message}
    summary_highlights_report = parse_qwen_response(summary_highlights_api_response)
    if "Error:" in summary_highlights_report:
        error_msg = (
            f"Failed to generate summary/highlights report: {summary_highlights_report}"
        )
        logging.error(error_msg)
        # Return specific status to indicate partial success
        return {
            "status": "error_summary",
            "ui_message": f"**Error Generating Summary/Highlights:**\n{summary_highlights_report}",
        }
    if summary_truncated:
        summary_highlights_report = TRUNCATION_WARNING + summary_highlights_report
    logging.info("Successfully generated summary & highlights report.")
    return {
        "status": "success",
        "report": summary_highlights_report,
        "truncated": summary_truncated,
    }


def upload_results(
    space_id: str,
    summary_report: str,
    detailed_report: str,
    dataset_id: str,
    hf_token: str | None,
    tldr_json_data: dict | None = None,
):
    """Uploads the generated reports (Markdown and optional JSON TLDR) to the specified dataset repository."""
    if not hf_token:
        logging.warning("HF Token not provided, skipping dataset report upload.")
        return {"status": "skipped", "reason": "HF_TOKEN not set"}
    if "Error:" in detailed_report or "Error:" in summary_report:
        msg = "Skipping cache upload due to errors in generated reports."
        logging.warning(msg)
        return {"status": "skipped", "reason": msg}
    safe_space_id = space_id.replace("..", "")
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            # Define local paths
            summary_path_local = os.path.join(tmpdir, SUMMARY_FILENAME)
            privacy_path_local = os.path.join(tmpdir, PRIVACY_FILENAME)
            tldr_json_path_local = os.path.join(tmpdir, TLDR_FILENAME)
            # Write Markdown reports
            with open(summary_path_local, "w", encoding="utf-8") as f:
                f.write(summary_report)
            with open(privacy_path_local, "w", encoding="utf-8") as f:
                f.write(detailed_report)
            # Prepare commit message
            commit_message = f"Add analysis reports for Space: {safe_space_id}"
            if tldr_json_data:
                commit_message += " (including TLDR JSON)"
                # Write JSON TLDR data if available
                try:
                    with open(tldr_json_path_local, "w", encoding="utf-8") as f:
                        json.dump(tldr_json_data, f, indent=2, ensure_ascii=False)
                    logging.info(
                        f"Successfully wrote TLDR JSON locally for {safe_space_id}."
                    )
                except Exception as json_err:
                    logging.error(
                        f"Failed to write TLDR JSON locally for {safe_space_id}: {json_err}"
                    )
                    tldr_json_data = None  # Prevent upload attempt if writing failed
            # Ensure repo exists
            api = HfApi(token=hf_token)
            repo_url = api.create_repo(
                repo_id=dataset_id,
                repo_type="dataset",
                exist_ok=True,
            )
            logging.info(f"Ensured dataset repo {repo_url} exists.")
            # Upload summary report
            api.upload_file(
                path_or_fileobj=summary_path_local,
                path_in_repo=f"{safe_space_id}/{SUMMARY_FILENAME}",
                repo_id=dataset_id,
                repo_type="dataset",
                commit_message=commit_message,
            )
            logging.info(f"Successfully uploaded summary report for {safe_space_id}.")
            # Upload privacy report
            api.upload_file(
                path_or_fileobj=privacy_path_local,
                path_in_repo=f"{safe_space_id}/{PRIVACY_FILENAME}",
                repo_id=dataset_id,
                repo_type="dataset",
                commit_message=commit_message,
            )
            logging.info(
                f"Successfully uploaded detailed privacy report for {safe_space_id}."
            )
            # print(f"Successfully uploaded detailed privacy report for {safe_space_id}.")  # Keep if needed for debug
            # Upload JSON TLDR if it was successfully written locally
            if tldr_json_data and os.path.exists(tldr_json_path_local):
                api.upload_file(
                    path_or_fileobj=tldr_json_path_local,
                    path_in_repo=f"{safe_space_id}/{TLDR_FILENAME}",
                    repo_id=dataset_id,
                    repo_type="dataset",
                    commit_message=commit_message,  # Can reuse commit message or make specific
                )
                logging.info(f"Successfully uploaded TLDR JSON for {safe_space_id}.")
                print(f"Successfully uploaded TLDR JSON for {safe_space_id}.")
        # Return success if all uploads finished without error
        return {"status": "success"}
    except Exception as e:
        error_msg = f"Non-critical error during report upload for {safe_space_id}: {e}"
        logging.error(error_msg)
        print(error_msg)
        return {"status": "error", "message": error_msg}


# --- New TLDR Generation Functions ---


def format_tldr_prompt(
    detailed_report: str, summary_report: str
) -> list[dict[str, str]]:
    """Formats the prompt for the TLDR generation task."""
    # Clean potential cache/truncation markers from input reports for the LLM
    cleaned_detailed = detailed_report.replace(CACHE_INFO_MSG, "").replace(
        TRUNCATION_WARNING, ""
    )
    cleaned_summary = summary_report.replace(CACHE_INFO_MSG, "").replace(
        TRUNCATION_WARNING, ""
    )
    user_content = (
        "Please generate a structured JSON TLDR based on the following reports:\n\n"
        "--- DETAILED PRIVACY ANALYSIS REPORT START ---\n"
        f"{cleaned_detailed}\n"
        "--- DETAILED PRIVACY ANALYSIS REPORT END ---\n\n"
        "--- SUMMARY & HIGHLIGHTS REPORT START ---\n"
        f"{cleaned_summary}\n"
        "--- SUMMARY & HIGHLIGHTS REPORT END ---"
    )
    # Note: We are not handling truncation here, assuming the input reports
    # are already reasonably sized from the previous steps.
    # If reports could be extremely long, add truncation logic similar to other format_* functions.
    messages = [
        {"role": "system", "content": TLDR_SYSTEM_PROMPT},
        {"role": "user", "content": user_content},
    ]
    return messages


def parse_tldr_json_response(
    response: ChatCompletionOutput | dict | None,
) -> dict | None:
    """Parses the LLM response, expecting JSON content for the TLDR."""
    if response is None:
        logging.error("TLDR Generation: Failed to get response from LLM.")
        return None
    # Check for 503 error dict first
    if isinstance(response, dict) and response.get("error_type") == "503":
        logging.error(f"TLDR Generation: Received 503 error: {response.get('message')}")
        return None  # Treat 503 as failure for this specific task

    # --- Direct Content Extraction (Replaces call to parse_qwen_response) ---
    raw_content = ""
    try:
        # Check if it's likely the expected ChatCompletionOutput structure
        if not hasattr(response, "choices"):
            logging.error(
                f"TLDR Generation: Unexpected response type received: {type(response)}. Content: {response}"
            )
            return None  # Return None if not the expected structure
        # Access the generated content according to the ChatCompletionOutput structure
        if response.choices and len(response.choices) > 0:
            content = response.choices[0].message.content
            if content:
                raw_content = content.strip()
                logging.info(
                    "TLDR Generation: Successfully extracted raw content from response."
                )
            else:
                logging.warning(
                    "TLDR Generation: Response received, but content is empty."
                )
                return None
        else:
            logging.warning("TLDR Generation: Response received, but no choices found.")
            return None
    except AttributeError as e:
        # This might catch cases where response looks like the object but lacks expected attributes
        logging.error(
            f"TLDR Generation: Attribute error parsing response object: {e}. Response structure might be unexpected. Response: {response}"
        )
        return None
    except Exception as e:
        logging.error(
            f"TLDR Generation: Unexpected error extracting content from response object: {e}"
        )
        return None
    # --- End Direct Content Extraction ---

    # --- JSON Parsing Logic ---
    if not raw_content:  # Should be caught by checks above, but belts and suspenders
        logging.error("TLDR Generation: Raw content is empty after extraction attempt.")
        return None
    try:
        # Clean potential markdown code block formatting
        if raw_content.strip().startswith("```json"):
            raw_content = raw_content.strip()[7:-3].strip()
        elif raw_content.strip().startswith("```"):
            raw_content = raw_content.strip()[3:-3].strip()
        tldr_data = json.loads(raw_content)
        # Validate structure: Check if it's a dict and has all required keys
        required_keys = [
            "app_description",
            "privacy_tldr",
            "data_types",
            "user_input_data",
            "local_processing",
            "remote_processing",
            "external_logging",
        ]
        if not isinstance(tldr_data, dict):
            logging.error(
                f"TLDR Generation: Parsed content is not a dictionary. Content: {raw_content[:500]}..."
            )
            return None
        if not all(key in tldr_data for key in required_keys):
            missing_keys = [key for key in required_keys if key not in tldr_data]
            logging.error(
                f"TLDR Generation: Parsed JSON is missing required keys: {missing_keys}. Content: {raw_content[:500]}..."
            )
            return None
        # --- Add validation for the new data_types structure ---
        data_types_list = tldr_data.get("data_types")
        if not isinstance(data_types_list, list):
            logging.error(
                f"TLDR Generation: 'data_types' is not a list. Content: {data_types_list}"
            )
            return None
        for item in data_types_list:
            if (
                not isinstance(item, dict)
                or "name" not in item
                or "description" not in item
            ):
                logging.error(
                    f"TLDR Generation: Invalid item found in 'data_types' list: {item}. Must be dict with 'name' and 'description'."
                )
                return None
            if not isinstance(item["name"], str) or not isinstance(
                item["description"], str
            ):
                logging.error(
                    f"TLDR Generation: Invalid types for name/description in 'data_types' item: {item}. Must be strings."
                )
                return None
        # --- End validation for data_types ---
        # Basic validation for other lists (should contain strings)
        validation_passed = True
        for key in [
            "user_input_data",
            "local_processing",
            "remote_processing",
            "external_logging",
        ]:
            data_list = tldr_data.get(key)
            # Add more detailed check and logging
            if not isinstance(data_list, list):
                logging.error(
                    f"TLDR Generation Validation Error: Key '{key}' is not a list. Found type: {type(data_list)}, Value: {data_list}"
                )
                validation_passed = False
                # Allow continuing validation for other keys, but mark as failed
            elif not all(isinstance(x, str) for x in data_list):
                # This check might be too strict if LLM includes non-strings, but keep for now
                logging.warning(
                    f"TLDR Generation Validation Warning: Not all items in list '{key}' are strings. Content: {data_list}"
                )
                # Decide if this should cause failure - currently it doesn't, just warns
        if not validation_passed:
            logging.error(
                "TLDR Generation: Validation failed due to incorrect list types."
            )
            return None  # Ensure failure if any key wasn't a list
        logging.info("Successfully parsed and validated TLDR JSON response.")
        return tldr_data
    except json.JSONDecodeError as e:
        logging.error(
            f"TLDR Generation: Failed to decode JSON response: {e}. Content: {raw_content[:500]}..."
        )
        return None
    except Exception as e:
        logging.error(f"TLDR Generation: Unexpected error parsing JSON response: {e}")
        return None


def render_tldr_markdown(tldr_data: dict | None, space_id: str | None = None) -> str:
    """Renders the top-level TLDR (description, privacy) data into a Markdown string.

    (Does not include the data lists.)
    """
    if not tldr_data:
        # Return a more specific message for this part
        return "*TLDR Summary could not be generated.*\n"
    output = []
    # Add Space link if space_id is provided
    if space_id:
        output.append(
            f"**Source Space:** [`{space_id}`](https://huggingface.co/spaces/{space_id})\n"
        )
    output.append(f"**App Description:** {tldr_data.get('app_description', 'N/A')}\n")
    privacy_summary = tldr_data.get("privacy_tldr", "N/A")
    output.append(f"**Privacy TLDR:** {privacy_summary}")  # Removed extra newline
    # Removed data list rendering from this function
    return "\n".join(output)


def render_data_details_markdown(tldr_data: dict | None) -> str:
    """Renders the data lists (types, input, processing, logging) from TLDR data."""
    if not tldr_data:
        return "*Data details could not be generated.*\n"
    output = []
    # Get defined names for formatting
    defined_names = sorted(
        [
            dt.get("name", "")
            for dt in tldr_data.get("data_types", [])
            if dt.get("name")
        ],
        key=len,
        reverse=True,
    )
    output.append("**Data Types Defined:**")  # Renamed slightly for clarity
    data_types = tldr_data.get("data_types")
    if isinstance(data_types, list):
        if not data_types:
            output.append("- None identified.")
        else:
            for item in data_types:
                name = item.get("name", "Unnamed")
                desc = item.get("description", "No description")
                output.append(f"- `{name}`: {desc}")
    else:
        output.append("- (Error loading data types)")
output.append("") # Add newline for spacing | |
# Reusable helper for rendering lists | |
def render_list(title, key): | |
output.append(f"**{title}:**") | |
data_list = tldr_data.get(key) | |
if isinstance(data_list, list): | |
if not data_list: | |
output.append("- None identified.") | |
else: | |
for item_str in data_list: | |
formatted_item = item_str # Default | |
found_match = False | |
for name in defined_names: | |
if item_str == name: | |
formatted_item = f"`{name}`" | |
found_match = True | |
break | |
elif item_str.startswith(name + " "): | |
formatted_item = f"`{name}`{item_str[len(name):]}" | |
found_match = True | |
break | |
if ( | |
not found_match | |
and " " not in item_str | |
and not item_str.startswith("`") | |
): | |
formatted_item = f"`{item_str}`" | |
output.append(f"- {formatted_item}") | |
else: | |
output.append("- (Error loading list)") | |
output.append("") | |
render_list("Data Sent by User to App", "user_input_data") | |
render_list("Data Processed Locally within App", "local_processing") | |
render_list("Data Processed Remotely", "remote_processing") | |
render_list("Data Logged/Saved Externally", "external_logging") | |
# Remove the last empty line | |
if output and output[-1] == "": | |
output.pop() | |
return "\n".join(output) | |


# --- Combined TLDR Generation Function ---


def generate_and_parse_tldr(detailed_report: str, summary_report: str) -> dict | None:
    """Formats prompt, queries LLM, and parses JSON response for TLDR.

    Args:
        detailed_report: The detailed privacy report content.
        summary_report: The summary & highlights report content.

    Returns:
        A dictionary with the parsed TLDR data, or None if any step fails.
    """
    logging.info("Starting TLDR generation and parsing...")
    try:
        # Format
        tldr_prompt_messages = format_tldr_prompt(detailed_report, summary_report)
        if not tldr_prompt_messages:
            logging.error("TLDR Generation: Failed to format prompt.")
            return None
        # Query (using existing import within analysis_utils)
        # Use slightly smaller max_tokens
        llm_response = query_qwen_endpoint(tldr_prompt_messages, max_tokens=1024)
        if llm_response is None:  # Check if query itself failed critically
            logging.error("TLDR Generation: LLM query returned None.")
            return None
        # 503 handled within parse function below
        # Parse
        parsed_data = parse_tldr_json_response(llm_response)
        if parsed_data:
            logging.info("Successfully generated and parsed TLDR.")
            return parsed_data
        else:
            logging.error("TLDR Generation: Failed to parse JSON response.")
            return None
    except Exception as e:
        logging.error(
            f"TLDR Generation: Unexpected error in generate_and_parse_tldr: {e}",
            exc_info=True,
        )
        return None
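

if __name__ == "__main__":
    # Rough smoke-test sketch of how these helpers compose into the full pipeline.
    # The Space/dataset IDs and the 503 message below are placeholder assumptions
    # standing in for the values app.py normally supplies; running this for real
    # queries the live endpoint and uploads to the cache dataset.
    SPACE_ID = "some-user/some-space"  # hypothetical Space to analyze
    DATASET_ID = "some-user/privacy-reports-cache"  # hypothetical cache dataset
    HF_TOKEN = os.getenv("HF_TOKEN")
    ERROR_503_MSG = "The analysis model is starting up. Please retry in a few minutes."

    cache = check_cache_and_download(SPACE_ID, DATASET_ID, HF_TOKEN)
    if cache["status"] == "cache_hit":
        print(cache["summary"])
    else:
        code = fetch_and_validate_code(SPACE_ID)
        if code["status"] == "success":
            detailed = generate_detailed_report(
                SPACE_ID, code["code_files"], ERROR_503_MSG
            )
            if detailed["status"] == "success":
                summary = generate_summary_report(
                    SPACE_ID, code["code_files"], detailed["report"], ERROR_503_MSG
                )
                if summary["status"] == "success":
                    tldr = generate_and_parse_tldr(
                        detailed["report"], summary["report"]
                    )
                    print(render_tldr_markdown(tldr, space_id=SPACE_ID))
                    upload_results(
                        SPACE_ID,
                        summary["report"],
                        detailed["report"],
                        DATASET_ID,
                        HF_TOKEN,
                        tldr_json_data=tldr,
                    )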