import json # Added for TLDR JSON parsing
import logging
import os
import tempfile
from huggingface_hub import HfApi
from huggingface_hub.inference._generated.types import (
    ChatCompletionOutput,  # Added for type hinting
)
# Imports from other project modules
from llm_interface import (ERROR_503_DICT, parse_qwen_response,
query_qwen_endpoint)
from prompts import format_privacy_prompt, format_summary_highlights_prompt
from utils import (PRIVACY_FILENAME, # Import constants for filenames
SUMMARY_FILENAME, TLDR_FILENAME, check_report_exists,
download_cached_reports, get_space_code_files)
# Configure logging (app.py also configures logging when it drives this module; doing it here keeps the module usable standalone)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# Load environment variables - redundant if always called by app.py which already loads them
# load_dotenv()
# Constants needed by helper functions (can be passed as args too)
# Consider passing these from app.py if they might change or for clarity
CACHE_INFO_MSG = "\n\n*(Report retrieved from cache)*"
TRUNCATION_WARNING = """**⚠️ Warning:** The input data (code and/or prior analysis) was too long for the AI model's context limit and had to be truncated. The analysis below may be incomplete or based on partial information.\n\n---\n\n"""
# --- Constants for TLDR Generation ---
TLDR_SYSTEM_PROMPT = (
"You are an AI assistant specialized in summarizing privacy analysis reports for Hugging Face Spaces. "
"You will receive two reports: a detailed privacy analysis and a summary/highlights report. "
"Based **only** on the content of these two reports, generate a concise JSON object containing a structured TLDR (Too Long; Didn't Read). "
"Do not use any information not present in the provided reports. "
"The JSON object must have the following keys:\n"
'- "app_description": A 1-2 sentence summary of what the application does from a user\'s perspective.\n'
'- "privacy_tldr": A 2-3 sentence high-level overview of privacy. Mention if the analysis was conclusive based on available code, if data processing is local, or if/what data goes to external services.\n'
'- "data_types": A list of JSON objects, where each object has two keys: \'name\' (a short, unique identifier string for the data type, e.g., "User Text") and \'description\' (a brief string explaining the data type in context, max 6-8 words, e.g., "Text prompt entered by the user").\n'
"- \"user_input_data\": A list of strings, where each string is the 'name' of a data type defined in 'data_types' that is provided by the user to the app.\n"
"- \"local_processing\": A list of strings describing data processed locally. Each string should start with the 'name' of a data type defined in 'data_types', followed by details (like the processing model) in parentheses if mentioned in the reports. Example: \"User Text (Local Model XYZ)\".\n"
"- \"remote_processing\": A list of strings describing data sent to remote services. Each string should start with the 'name' of a data type defined in 'data_types', followed by the service/model name in parentheses if mentioned in the reports. Example: \"User Text (HF Inference API)\".\n"
"- \"external_logging\": A list of strings describing data logged or saved externally. Each string should start with the 'name' of a data type defined in 'data_types', followed by the location/service in parentheses if mentioned. Example: \"User Text (External DB)\".\n"
"Ensure the output is **only** a valid JSON object, starting with `{` and ending with `}`. Ensure all listed data types in the processing/logging lists exactly match a 'name' defined in the 'data_types' list."
)
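# Illustrative example (an assumption for readability, not actual model output) of the
# JSON shape that TLDR_SYSTEM_PROMPT asks for; parse_tldr_json_response below validates
# exactly these keys:
# {
#   "app_description": "Lets users chat with an assistant about uploaded PDFs.",
#   "privacy_tldr": "Analysis was conclusive: user text and PDF content are sent to a remote inference API; nothing appears to be logged externally.",
#   "data_types": [
#     {"name": "User Text", "description": "Text prompt entered by the user"},
#     {"name": "PDF Content", "description": "Text extracted from uploaded PDFs"}
#   ],
#   "user_input_data": ["User Text", "PDF Content"],
#   "local_processing": ["PDF Content (local text extraction)"],
#   "remote_processing": ["User Text (HF Inference API)"],
#   "external_logging": []
# }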
# --- Analysis Pipeline Helper Functions ---
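# Convention used by the helpers below: each returns a plain dict with a "status" key
# ("cache_hit", "cache_miss", "ready", "success", "error", "skipped", ...) plus
# status-specific fields such as "ui_message", "report", or "code_files". The caller
# (presumably app.py) is expected to branch on "status" and surface "ui_message".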
def check_cache_and_download(space_id: str, dataset_id: str, hf_token: str | None):
"""Checks cache and downloads if reports exist."""
logging.info(f"Checking cache for '{space_id}'...")
found_in_cache = False
if hf_token:
try:
found_in_cache = check_report_exists(space_id, dataset_id, hf_token)
except Exception as e:
logging.warning(f"Cache check failed for {space_id}: {e}. Proceeding.")
# Return cache_miss even if check failed, proceed to live analysis
return {"status": "cache_miss", "error_message": f"Cache check failed: {e}"}
if found_in_cache:
logging.info(f"Cache hit for {space_id}. Downloading.")
try:
cached_reports = download_cached_reports(space_id, dataset_id, hf_token)
summary_report = (
cached_reports.get("summary", "Error: Cached summary not found.")
+ CACHE_INFO_MSG
)
privacy_report = (
cached_reports.get("privacy", "Error: Cached privacy report not found.")
+ CACHE_INFO_MSG
)
logging.info(f"Successfully downloaded cached reports for {space_id}.")
return {
"status": "cache_hit",
"summary": summary_report,
"privacy": privacy_report,
"tldr_json_str": cached_reports.get("tldr_json_str"),
}
except Exception as e:
error_msg = f"Cache download failed for {space_id}: {e}"
logging.warning(f"{error_msg}. Proceeding with live analysis.")
# Return error, but let caller decide if live analysis proceeds
return {"status": "cache_error", "ui_message": error_msg}
else:
logging.info(f"Cache miss for {space_id}. Performing live analysis.")
return {"status": "cache_miss"}
def check_endpoint_status(
endpoint_name: str, hf_token: str | None, error_503_user_message: str
):
"""Checks the status of the inference endpoint."""
logging.info(f"Checking endpoint status for '{endpoint_name}'...")
if not hf_token:
# Allow proceeding if token missing, maybe endpoint is public
logging.warning("HF_TOKEN not set, cannot check endpoint status definitively.")
return {"status": "ready", "warning": "HF_TOKEN not set"}
try:
api = HfApi(token=hf_token)
endpoint = api.get_inference_endpoint(name=endpoint_name)
status = endpoint.status
logging.info(f"Endpoint '{endpoint_name}' status: {status}")
if status == "running":
return {"status": "ready"}
else:
logging.warning(
f"Endpoint '{endpoint_name}' is not ready (Status: {status})."
)
if status == "scaledToZero":
logging.info(
f"Endpoint '{endpoint_name}' is scaled to zero. Attempting to resume..."
)
try:
endpoint.resume()
# Still return an error message suggesting retry, as resume takes time
# Keep this message concise as the action is specific (wait)
msg = f"**Endpoint Resuming:** The analysis endpoint ('{endpoint_name}') was scaled to zero and is now restarting.\n\n{error_503_user_message}"
return {"status": "error", "ui_message": msg}
except Exception as resume_error:
# Resume failed, provide detailed message
logging.error(
f"Failed to resume endpoint {endpoint_name}: {resume_error}"
)
# Construct detailed message including full explanation
msg = f"**Endpoint Issue:** The analysis endpoint ('{endpoint_name}') is currently {status} and an attempt to resume it failed ({resume_error}).\n\n{error_503_user_message}"
return {"status": "error", "ui_message": msg}
else: # Paused, failed, pending etc.
# Construct detailed message including full explanation
msg = f"**Endpoint Issue:** The analysis endpoint ('{endpoint_name}') status is currently <span style='color:red'>**{status}**</span>.\n\n{error_503_user_message}"
return {"status": "error", "ui_message": msg}
except Exception as e:
error_msg = f"Error checking analysis endpoint status for {endpoint_name}: {e}"
logging.error(error_msg)
# Let analysis stop if endpoint check fails critically
return {"status": "error", "ui_message": f"Error checking endpoint status: {e}"}
def fetch_and_validate_code(space_id: str):
"""Fetches and validates code files for the space."""
logging.info(f"Fetching code files for {space_id}...")
code_files = get_space_code_files(space_id)
if not code_files:
error_msg = f"Could not retrieve code files for '{space_id}'. Check ID and ensure it's a public Space."
logging.warning(error_msg)
return {
"status": "error",
"ui_message": f"**Error:**\n{error_msg}\nAnalysis Canceled.",
}
logging.info(f"Successfully fetched {len(code_files)} files for {space_id}.")
return {"status": "success", "code_files": code_files}
def generate_detailed_report(
space_id: str, code_files: dict, error_503_user_message: str
):
"""Generates the detailed privacy report using the LLM."""
logging.info("Generating detailed privacy analysis report...")
privacy_prompt_messages, privacy_truncated = format_privacy_prompt(
space_id, code_files
)
privacy_api_response = query_qwen_endpoint(privacy_prompt_messages, max_tokens=3072)
if privacy_api_response == ERROR_503_DICT:
logging.warning("LLM Call 1 (Privacy) failed with 503.")
return {"status": "error", "ui_message": error_503_user_message}
detailed_privacy_report = parse_qwen_response(privacy_api_response)
if "Error:" in detailed_privacy_report:
error_msg = (
f"Failed to generate detailed privacy report: {detailed_privacy_report}"
)
logging.error(error_msg)
return {
"status": "error",
"ui_message": f"**Error Generating Detailed Privacy Report:**\n{detailed_privacy_report}\nAnalysis Halted.",
}
if privacy_truncated:
detailed_privacy_report = TRUNCATION_WARNING + detailed_privacy_report
logging.info("Successfully generated detailed privacy report.")
return {
"status": "success",
"report": detailed_privacy_report,
"truncated": privacy_truncated,
}
def generate_summary_report(
space_id: str,
code_files: dict,
detailed_privacy_report: str,
error_503_user_message: str,
):
"""Generates the summary & highlights report using the LLM."""
logging.info("Generating summary and highlights report...")
# Remove potential truncation warning from detailed report before sending to next LLM
clean_detailed_report = detailed_privacy_report.replace(TRUNCATION_WARNING, "")
summary_highlights_prompt_messages, summary_truncated = (
format_summary_highlights_prompt(space_id, code_files, clean_detailed_report)
)
summary_highlights_api_response = query_qwen_endpoint(
summary_highlights_prompt_messages, max_tokens=2048
)
if summary_highlights_api_response == ERROR_503_DICT:
logging.warning("LLM Call 2 (Summary) failed with 503.")
# Return specific status to indicate partial success
return {"status": "error_503_summary", "ui_message": error_503_user_message}
summary_highlights_report = parse_qwen_response(summary_highlights_api_response)
if "Error:" in summary_highlights_report:
error_msg = (
f"Failed to generate summary/highlights report: {summary_highlights_report}"
)
logging.error(error_msg)
# Return specific status to indicate partial success
return {
"status": "error_summary",
"ui_message": f"**Error Generating Summary/Highlights:**\n{summary_highlights_report}",
}
if summary_truncated:
summary_highlights_report = TRUNCATION_WARNING + summary_highlights_report
logging.info("Successfully generated summary & highlights report.")
return {
"status": "success",
"report": summary_highlights_report,
"truncated": summary_truncated,
}
def upload_results(
space_id: str,
summary_report: str,
detailed_report: str,
dataset_id: str,
hf_token: str | None,
tldr_json_data: dict | None = None,
):
"""Uploads the generated reports (Markdown and optional JSON TLDR) to the specified dataset repository."""
if not hf_token:
logging.warning("HF Token not provided, skipping dataset report upload.")
return {"status": "skipped", "reason": "HF_TOKEN not set"}
if "Error:" in detailed_report or "Error:" in summary_report:
msg = "Skipping cache upload due to errors in generated reports."
logging.warning(msg)
return {"status": "skipped", "reason": msg}
safe_space_id = space_id.replace("..", "")
try:
with tempfile.TemporaryDirectory() as tmpdir:
# Define local paths
summary_path_local = os.path.join(tmpdir, SUMMARY_FILENAME)
privacy_path_local = os.path.join(tmpdir, PRIVACY_FILENAME)
tldr_json_path_local = os.path.join(tmpdir, TLDR_FILENAME)
# Write Markdown reports
with open(summary_path_local, "w", encoding="utf-8") as f:
f.write(summary_report)
with open(privacy_path_local, "w", encoding="utf-8") as f:
f.write(detailed_report)
# Prepare commit message
commit_message = f"Add analysis reports for Space: {safe_space_id}"
if tldr_json_data:
commit_message += " (including TLDR JSON)"
print(f"Successfully wrote TLDR JSON locally for {safe_space_id}.")
# Write JSON TLDR data if available
try:
with open(tldr_json_path_local, "w", encoding="utf-8") as f:
json.dump(tldr_json_data, f, indent=2, ensure_ascii=False)
logging.info(
f"Successfully wrote TLDR JSON locally for {safe_space_id}."
)
except Exception as json_err:
logging.error(
f"Failed to write TLDR JSON locally for {safe_space_id}: {json_err}"
)
tldr_json_data = None # Prevent upload attempt if writing failed
# Ensure repo exists
api = HfApi(token=hf_token)
repo_url = api.create_repo(
repo_id=dataset_id,
repo_type="dataset",
exist_ok=True,
)
logging.info(f"Ensured dataset repo {repo_url} exists.")
# Upload summary report
api.upload_file(
path_or_fileobj=summary_path_local,
path_in_repo=f"{safe_space_id}/{SUMMARY_FILENAME}",
repo_id=dataset_id,
repo_type="dataset",
commit_message=commit_message,
)
logging.info(f"Successfully uploaded summary report for {safe_space_id}.")
# Upload privacy report
api.upload_file(
path_or_fileobj=privacy_path_local,
path_in_repo=f"{safe_space_id}/{PRIVACY_FILENAME}",
repo_id=dataset_id,
repo_type="dataset",
commit_message=commit_message,
)
logging.info(
f"Successfully uploaded detailed privacy report for {safe_space_id}."
)
# print(f"Successfully uploaded detailed privacy report for {safe_space_id}.") # Keep if needed for debug
# Upload JSON TLDR if it was successfully written locally
if tldr_json_data and os.path.exists(tldr_json_path_local):
api.upload_file(
path_or_fileobj=tldr_json_path_local,
path_in_repo=f"{safe_space_id}/{TLDR_FILENAME}",
repo_id=dataset_id,
repo_type="dataset",
commit_message=commit_message, # Can reuse commit message or make specific
)
logging.info(f"Successfully uploaded TLDR JSON for {safe_space_id}.")
print(f"Successfully uploaded TLDR JSON for {safe_space_id}.")
# Return success if all uploads finished without error
return {"status": "success"}
except Exception as e:
error_msg = f"Non-critical error during report upload for {safe_space_id}: {e}"
logging.error(error_msg)
print(error_msg)
return {"status": "error", "message": error_msg}
# --- New TLDR Generation Functions ---
def format_tldr_prompt(
detailed_report: str, summary_report: str
) -> list[dict[str, str]]:
"""Formats the prompt for the TLDR generation task."""
# Clean potential cache/truncation markers from input reports for the LLM
cleaned_detailed = detailed_report.replace(CACHE_INFO_MSG, "").replace(
TRUNCATION_WARNING, ""
)
cleaned_summary = summary_report.replace(CACHE_INFO_MSG, "").replace(
TRUNCATION_WARNING, ""
)
user_content = (
"Please generate a structured JSON TLDR based on the following reports:\n\n"
"--- DETAILED PRIVACY ANALYSIS REPORT START ---\n"
f"{cleaned_detailed}\n"
"--- DETAILED PRIVACY ANALYSIS REPORT END ---\n\n"
"--- SUMMARY & HIGHLIGHTS REPORT START ---\n"
f"{cleaned_summary}\n"
"--- SUMMARY & HIGHLIGHTS REPORT END ---"
)
# Note: We are not handling truncation here, assuming the input reports
# are already reasonably sized from the previous steps.
    # If reports could be extremely long, add truncation logic similar to the other
    # format_* functions (see the sketch after this function).
messages = [
{"role": "system", "content": TLDR_SYSTEM_PROMPT},
{"role": "user", "content": user_content},
]
return messages
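# A possible guard for the truncation note in format_tldr_prompt above: a sketch that
# assumes a rough character budget. The real format_* helpers in prompts.py may use a
# different, token-based limit, and this helper is not wired in anywhere yet.
def _truncate_report_for_tldr(text: str, max_chars: int = 60_000) -> tuple[str, bool]:
    """Clip an overly long report and report whether clipping happened."""
    if len(text) <= max_chars:
        return text, False
    return text[:max_chars] + "\n\n[... truncated ...]", True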
def parse_tldr_json_response(
response: ChatCompletionOutput | dict | None,
) -> dict | None:
"""Parses the LLM response, expecting JSON content for the TLDR."""
if response is None:
logging.error("TLDR Generation: Failed to get response from LLM.")
return None
# Check for 503 error dict first
if isinstance(response, dict) and response.get("error_type") == "503":
logging.error(f"TLDR Generation: Received 503 error: {response.get('message')}")
return None # Treat 503 as failure for this specific task
# --- Direct Content Extraction (Replaces call to parse_qwen_response) ---
raw_content = ""
try:
# Check if it's likely the expected ChatCompletionOutput structure
if not hasattr(response, "choices"):
logging.error(
f"TLDR Generation: Unexpected response type received: {type(response)}. Content: {response}"
)
return None # Return None if not the expected structure
# Access the generated content according to the ChatCompletionOutput structure
if response.choices and len(response.choices) > 0:
content = response.choices[0].message.content
if content:
raw_content = content.strip()
logging.info(
"TLDR Generation: Successfully extracted raw content from response."
)
else:
logging.warning(
"TLDR Generation: Response received, but content is empty."
)
return None
else:
logging.warning("TLDR Generation: Response received, but no choices found.")
return None
except AttributeError as e:
# This might catch cases where response looks like the object but lacks expected attributes
logging.error(
f"TLDR Generation: Attribute error parsing response object: {e}. Response structure might be unexpected. Response: {response}"
)
return None
except Exception as e:
logging.error(
f"TLDR Generation: Unexpected error extracting content from response object: {e}"
)
return None
# --- End Direct Content Extraction ---
# --- JSON Parsing Logic ---
if not raw_content: # Should be caught by checks above, but belts and suspenders
logging.error("TLDR Generation: Raw content is empty after extraction attempt.")
return None
try:
# Clean potential markdown code block formatting
if raw_content.strip().startswith("```json"):
raw_content = raw_content.strip()[7:-3].strip()
elif raw_content.strip().startswith("```"):
raw_content = raw_content.strip()[3:-3].strip()
tldr_data = json.loads(raw_content)
# Validate structure: Check if it's a dict and has all required keys
required_keys = [
"app_description",
"privacy_tldr",
"data_types",
"user_input_data",
"local_processing",
"remote_processing",
"external_logging",
]
if not isinstance(tldr_data, dict):
logging.error(
f"TLDR Generation: Parsed content is not a dictionary. Content: {raw_content[:500]}..."
)
return None
if not all(key in tldr_data for key in required_keys):
missing_keys = [key for key in required_keys if key not in tldr_data]
logging.error(
f"TLDR Generation: Parsed JSON is missing required keys: {missing_keys}. Content: {raw_content[:500]}..."
)
return None
# --- Add validation for the new data_types structure ---
data_types_list = tldr_data.get("data_types")
if not isinstance(data_types_list, list):
logging.error(
f"TLDR Generation: 'data_types' is not a list. Content: {data_types_list}"
)
return None
for item in data_types_list:
if (
not isinstance(item, dict)
or "name" not in item
or "description" not in item
):
logging.error(
f"TLDR Generation: Invalid item found in 'data_types' list: {item}. Must be dict with 'name' and 'description'."
)
return None
if not isinstance(item["name"], str) or not isinstance(
item["description"], str
):
logging.error(
f"TLDR Generation: Invalid types for name/description in 'data_types' item: {item}. Must be strings."
)
return None
# --- End validation for data_types ---
# Basic validation for other lists (should contain strings)
validation_passed = True
for key in [
"user_input_data",
"local_processing",
"remote_processing",
"external_logging",
]:
data_list = tldr_data.get(key)
# Add more detailed check and logging
if not isinstance(data_list, list):
logging.error(
f"TLDR Generation Validation Error: Key '{key}' is not a list. Found type: {type(data_list)}, Value: {data_list}"
)
validation_passed = False
# Allow continuing validation for other keys, but mark as failed
elif not all(isinstance(x, str) for x in data_list):
# This check might be too strict if LLM includes non-strings, but keep for now
logging.warning(
f"TLDR Generation Validation Warning: Not all items in list '{key}' are strings. Content: {data_list}"
)
# Decide if this should cause failure - currently it doesn't, just warns
if not validation_passed:
logging.error(
"TLDR Generation: Validation failed due to incorrect list types."
)
return None # Ensure failure if any key wasn't a list
logging.info("Successfully parsed and validated TLDR JSON response.")
return tldr_data
except json.JSONDecodeError as e:
logging.error(
f"TLDR Generation: Failed to decode JSON response: {e}. Content: {raw_content[:500]}..."
)
return None
except Exception as e:
logging.error(f"TLDR Generation: Unexpected error parsing JSON response: {e}")
return None
def render_tldr_markdown(tldr_data: dict | None, space_id: str | None = None) -> str:
"""Renders the top-level TLDR (description, privacy) data into a Markdown string.
(Does not include the data lists)
"""
if not tldr_data:
# Return a more specific message for this part
return "*TLDR Summary could not be generated.*\n"
output = []
# Add Space link if space_id is provided
if space_id:
output.append(
f"**Source Space:** [`{space_id}`](https://huggingface.co/spaces/{space_id})\n"
)
output.append(f"**App Description:** {tldr_data.get('app_description', 'N/A')}\n")
privacy_summary = tldr_data.get("privacy_tldr", "N/A")
output.append(f"**Privacy TLDR:** {privacy_summary}") # Removed extra newline
# Removed data list rendering from this function
return "\n".join(output)
def render_data_details_markdown(tldr_data: dict | None) -> str:
"""Renders the data lists (types, input, processing, logging) from TLDR data."""
if not tldr_data:
return "*Data details could not be generated.*\n"
output = []
# Get defined names for formatting
defined_names = sorted(
[
dt.get("name", "")
for dt in tldr_data.get("data_types", [])
if dt.get("name")
],
key=len,
reverse=True,
)
output.append("**Data Types Defined:**") # Renamed slightly for clarity
data_types = tldr_data.get("data_types")
if data_types and isinstance(data_types, list):
if not data_types:
output.append("- None identified.")
else:
for item in data_types:
name = item.get("name", "Unnamed")
desc = item.get("description", "No description")
output.append(f"- `{name}`: {desc}")
else:
output.append("- (Error loading data types)")
output.append("") # Add newline for spacing
# Reusable helper for rendering lists
def render_list(title, key):
output.append(f"**{title}:**")
data_list = tldr_data.get(key)
if isinstance(data_list, list):
if not data_list:
output.append("- None identified.")
else:
for item_str in data_list:
formatted_item = item_str # Default
found_match = False
for name in defined_names:
if item_str == name:
formatted_item = f"`{name}`"
found_match = True
break
elif item_str.startswith(name + " "):
formatted_item = f"`{name}`{item_str[len(name):]}"
found_match = True
break
if (
not found_match
and " " not in item_str
and not item_str.startswith("`")
):
formatted_item = f"`{item_str}`"
output.append(f"- {formatted_item}")
else:
output.append("- (Error loading list)")
output.append("")
render_list("Data Sent by User to App", "user_input_data")
render_list("Data Processed Locally within App", "local_processing")
render_list("Data Processed Remotely", "remote_processing")
render_list("Data Logged/Saved Externally", "external_logging")
# Remove the last empty line
if output and output[-1] == "":
output.pop()
return "\n".join(output)
# --- Combined TLDR Generation Function ---
def generate_and_parse_tldr(detailed_report: str, summary_report: str) -> dict | None:
"""Formats prompt, queries LLM, and parses JSON response for TLDR.
Args:
detailed_report: The detailed privacy report content.
summary_report: The summary & highlights report content.
Returns:
A dictionary with the parsed TLDR data, or None if any step fails.
"""
logging.info("Starting TLDR generation and parsing...")
try:
# Format
tldr_prompt_messages = format_tldr_prompt(detailed_report, summary_report)
if not tldr_prompt_messages:
logging.error("TLDR Generation: Failed to format prompt.")
return None
# Query (using existing import within analysis_utils)
# Use slightly smaller max_tokens
llm_response = query_qwen_endpoint(tldr_prompt_messages, max_tokens=1024)
if llm_response is None: # Check if query itself failed critically
logging.error("TLDR Generation: LLM query returned None.")
return None
# 503 handled within parse function below
# Parse
parsed_data = parse_tldr_json_response(llm_response)
if parsed_data:
logging.info("Successfully generated and parsed TLDR.")
return parsed_data
else:
logging.error("TLDR Generation: Failed to parse JSON response.")
return None
except Exception as e:
logging.error(
f"TLDR Generation: Unexpected error in generate_and_parse_tldr: {e}",
exc_info=True,
)
return None
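# Minimal end-to-end sketch of how these helpers compose (for local experimentation;
# the env var names, the fallback space/dataset ids, and the 503 message below are
# placeholder assumptions, not values defined in this project):
if __name__ == "__main__":
    space_id = os.environ.get("TARGET_SPACE_ID", "some-user/some-space")
    dataset_id = os.environ.get("CACHE_DATASET_ID", "some-user/privacy-reports")
    hf_token = os.environ.get("HF_TOKEN")
    error_503_msg = "The analysis endpoint is starting up; please retry in a few minutes."

    cached = check_cache_and_download(space_id, dataset_id, hf_token)
    if cached["status"] == "cache_hit":
        print(cached["summary"])
    else:
        code = fetch_and_validate_code(space_id)
        if code["status"] != "success":
            raise SystemExit(code["ui_message"])
        detailed = generate_detailed_report(space_id, code["code_files"], error_503_msg)
        if detailed["status"] != "success":
            raise SystemExit(detailed["ui_message"])
        summary = generate_summary_report(
            space_id, code["code_files"], detailed["report"], error_503_msg
        )
        if summary["status"] != "success":
            raise SystemExit(summary["ui_message"])
        tldr = generate_and_parse_tldr(detailed["report"], summary["report"])
        print(render_tldr_markdown(tldr, space_id))
        print(render_data_details_markdown(tldr))
        upload_results(
            space_id,
            summary["report"],
            detailed["report"],
            dataset_id,
            hf_token,
            tldr_json_data=tldr,
        )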