import io import json from typing import Optional import pandas as pd import pdfminer.high_level import PIL.Image from docx import Document from docx.opc.exceptions import PackageNotFoundError from pdfminer.pdfparser import PDFSyntaxError from src.file_handler.get_file import download_file_for_task async def convert_excel_bytes_to_llm_format(task_id: str, url: str) -> str: """ Downloads an Excel file using download_file_for_task, removes empty rows from each sheet, and converts its content to an LLM-friendly dictionary format. Args: task_id (str): The identifier for the task, used by download_file_for_task. url (str): The URL of the Excel file to download and process. Returns: str: A dictionary where keys are sheet names and values are lists of dictionaries (each dictionary representing a row, with column headers as keys). Returns None if a critical error occurs (e.g., download failure, file unparseable). Returns an empty dictionary if the Excel file is valid but contains no sheets or no data after cleaning. """ try: file_bytes = await download_file_for_task(task_id, url) if not file_bytes: print(f"Info [{task_id}]: No content downloaded from URL '{url}'.") # Depending on desired behavior, could return {} or raise an error. # Returning None indicates a problem preventing processing. return None # Use io.BytesIO to treat the bytes as a file-like object for pandas excel_buffer = io.BytesIO(file_bytes) # Use pd.ExcelFile to efficiently parse Excel files, especially with multiple sheets # This will raise an error (e.g., ValueError, various zipfile/xlrd/openpyxl errors) # if the file is not a valid Excel format or is corrupted. xls = pd.ExcelFile(excel_buffer) if not xls.sheet_names: print( f"Info [{task_id}]: Excel file from URL '{url}' has no sheets." ) return {} # No sheets means no data to process all_sheets_data = {} for sheet_name in xls.sheet_names: # Parse the current sheet into a DataFrame df = xls.parse(sheet_name) # Remove rows where all cells are NaN (these are considered empty rows) df.dropna(how="all", inplace=True) # Convert the cleaned DataFrame to a list of dictionaries (records format). # If a sheet becomes empty after dropna, to_dict(orient='records') # will correctly produce an empty list for that sheet's data. all_sheets_data[sheet_name] = df.to_dict(orient="records") return json.dumps(all_sheets_data, ensure_ascii=False) except pd.errors.ParserError as e: # Handles errors during the parsing of sheet data by pandas. print( f"Error [{task_id}]: Pandas parsing error for Excel file from '{url}': {e}" ) return None except ValueError as e: # Catches errors like "Excel file format cannot be determined..." from pd.ExcelFile # or other value-related issues during parsing. print( f"Error [{task_id}]: Value error processing Excel file from '{url}': {e}" ) return None except Exception as e: # Catch-all for other unexpected errors (e.g., network issues if download_file_for_task # is called here and raises something not caught, or other pandas/library issues). # It's good practice to log the full traceback for unexpected errors in a real app. # import traceback # traceback.print_exc() print( f"Error [{task_id}]: Unexpected error processing Excel file from '{url}': {e}" ) return None # --------------------------------------------------------------------------- # 1. Image → PIL Image # --------------------------------------------------------------------------- async def convert_image_to_pillow( task_id: str, url: str ) -> Optional[PIL.Image.Image]: """ Downloads an image file and returns a PIL Image object. Returns None on failure. Args: task_id (str): The ID of the task. url (str): The URL of the image file. Returns: Optional[PIL.Image.Image]: The PIL Image object or None on failure. """ try: raw = await download_file_for_task(task_id, url) if not raw: print(f"Info [{task_id}]: No bytes downloaded from '{url}'.") return None return PIL.Image.open(io.BytesIO(raw)) except Exception as e: print(f"Error [{task_id}]: converting image from '{url}' → base64: {e}") return None # --------------------------------------------------------------------------- # 2. File → UTF‑8 string # --------------------------------------------------------------------------- async def convert_file_to_string(task_id: str, url: str) -> Optional[str]: """ Downloads a file and returns its text (UTF‑8, errors replaced). """ try: raw = await download_file_for_task(task_id, url) if not raw: print(f"Info [{task_id}]: No bytes downloaded from '{url}'.") return None return raw.decode("utf-8", errors="replace") except Exception as e: print(f"Error [{task_id}]: decoding file from '{url}': {e}") return None # --------------------------------------------------------------------------- # 3. DOCX → Markdown # --------------------------------------------------------------------------- def _runs_to_md(runs): """Helper – convert a list of runs to markdown inline‑text.""" out = [] for run in runs: text = run.text.replace("\n", " ") if not text: continue if run.bold: text = f"**{text}**" if run.italic: text = f"*{text}*" out.append(text) return "".join(out) async def convert_docx_to_markdown(task_id: str, url: str) -> Optional[str]: """ Converts a Word document to *simple* Markdown. """ try: raw = await download_file_for_task(task_id, url) if not raw: print(f"Info [{task_id}]: No bytes downloaded from '{url}'.") return None doc = Document(io.BytesIO(raw)) md_lines = [] for p in doc.paragraphs: style = (p.style.name or "").lower() text = _runs_to_md(p.runs).strip() if not text: continue if "heading" in style: # e.g. 'Heading 1' → level 1, 'Heading 2' → level 2, etc. level = int("".join(filter(str.isdigit, style)) or 1) md_lines.append(f"{'#' * level} {text}") else: md_lines.append(text) return "\n\n".join(md_lines) except PackageNotFoundError: print(f"Error [{task_id}]: file from '{url}' is not a valid DOCX.") return None except Exception as e: print(f"Error [{task_id}]: DOCX→MD conversion failed for '{url}': {e}") return None # --------------------------------------------------------------------------- # 4. PDF → Markdown (really, plain text with paragraph breaks) # --------------------------------------------------------------------------- async def convert_pdf_to_markdown(task_id: str, url: str) -> Optional[str]: """ Extracts text from a PDF and returns it as Markdown (plain paragraphs). """ try: raw = await download_file_for_task(task_id, url) if not raw: print(f"Info [{task_id}]: No bytes downloaded from '{url}'.") return None text = pdfminer.high_level.extract_text(io.BytesIO(raw)) if not text.strip(): print(f"Info [{task_id}]: PDF at '{url}' produced no text.") return "" # Very light Markdown: treat empty lines as paragraph separators paragraphs = [p.strip() for p in text.splitlines() if p.strip()] return "\n\n".join(paragraphs) except (PDFSyntaxError, ValueError) as e: print(f"Error [{task_id}]: PDF syntax error for '{url}': {e}") return None except Exception as e: print(f"Error [{task_id}]: PDF→MD conversion failed for '{url}': {e}") return None