import io
import json
import re
from typing import Optional

import pandas as pd
import pdfminer.high_level
import PIL.Image
from docx import Document
from docx.opc.exceptions import PackageNotFoundError
from pdfminer.pdfparser import PDFSyntaxError

from src.file_handler.get_file import download_file_for_task
async def convert_excel_bytes_to_llm_format(
    task_id: str, url: str
) -> Optional[str]:
    """
    Downloads an Excel file using download_file_for_task, removes empty rows
    from each sheet, and converts its content to an LLM-friendly JSON string.

    Args:
        task_id (str): The identifier for the task, used by download_file_for_task.
        url (str): The URL of the Excel file to download and process.

    Returns:
        Optional[str]: A JSON string encoding a dictionary whose keys are sheet
            names and whose values are lists of dictionaries (each dictionary
            representing a row, with column headers as keys).
            Returns None if a critical error occurs (e.g., download failure,
            unparseable file).
            Returns an empty JSON object ("{}") if the Excel file is valid but
            contains no sheets; sheets left empty after cleaning map to empty
            lists.
    """
try:
file_bytes = await download_file_for_task(task_id, url)
if not file_bytes:
print(f"Info [{task_id}]: No content downloaded from URL '{url}'.")
# Depending on desired behavior, could return {} or raise an error.
# Returning None indicates a problem preventing processing.
return None
# Use io.BytesIO to treat the bytes as a file-like object for pandas
excel_buffer = io.BytesIO(file_bytes)
# Use pd.ExcelFile to efficiently parse Excel files, especially with multiple sheets
# This will raise an error (e.g., ValueError, various zipfile/xlrd/openpyxl errors)
# if the file is not a valid Excel format or is corrupted.
xls = pd.ExcelFile(excel_buffer)
        if not xls.sheet_names:
            print(
                f"Info [{task_id}]: Excel file from URL '{url}' has no sheets."
            )
            return json.dumps({})  # No sheets means no data to process
all_sheets_data = {}
for sheet_name in xls.sheet_names:
# Parse the current sheet into a DataFrame
df = xls.parse(sheet_name)
# Remove rows where all cells are NaN (these are considered empty rows)
df.dropna(how="all", inplace=True)
# Convert the cleaned DataFrame to a list of dictionaries (records format).
# If a sheet becomes empty after dropna, to_dict(orient='records')
# will correctly produce an empty list for that sheet's data.
all_sheets_data[sheet_name] = df.to_dict(orient="records")
        # default=str keeps non-JSON-native cells (e.g. Timestamps) serializable.
        return json.dumps(all_sheets_data, ensure_ascii=False, default=str)
except pd.errors.ParserError as e:
# Handles errors during the parsing of sheet data by pandas.
print(
f"Error [{task_id}]: Pandas parsing error for Excel file from '{url}': {e}"
)
return None
except ValueError as e:
# Catches errors like "Excel file format cannot be determined..." from pd.ExcelFile
# or other value-related issues during parsing.
print(
f"Error [{task_id}]: Value error processing Excel file from '{url}': {e}"
)
return None
except Exception as e:
# Catch-all for other unexpected errors (e.g., network issues if download_file_for_task
# is called here and raises something not caught, or other pandas/library issues).
# It's good practice to log the full traceback for unexpected errors in a real app.
# import traceback
# traceback.print_exc()
print(
f"Error [{task_id}]: Unexpected error processing Excel file from '{url}': {e}"
)
return None
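# Illustrative usage sketch (not called anywhere in this module): how a caller
# might consume the JSON string returned by convert_excel_bytes_to_llm_format.
# The task id and URL are whatever the agent already passes to the converters.
async def _example_consume_excel(task_id: str, url: str) -> None:
    payload = await convert_excel_bytes_to_llm_format(task_id, url)
    if payload is None:
        return  # download or parse failure
    sheets = json.loads(payload)  # {sheet_name: [{column: value, ...}, ...]}
    for sheet_name, rows in sheets.items():
        print(f"{sheet_name}: {len(rows)} non-empty rows")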
# ---------------------------------------------------------------------------
# 1. Image → PIL Image
# ---------------------------------------------------------------------------
async def convert_image_to_pillow(
task_id: str, url: str
) -> Optional[PIL.Image.Image]:
"""
Downloads an image file and returns a PIL Image object.
Returns None on failure.
Args:
task_id (str): The ID of the task.
url (str): The URL of the image file.
Returns:
Optional[PIL.Image.Image]: The PIL Image object or None on failure.
"""
try:
raw = await download_file_for_task(task_id, url)
if not raw:
print(f"Info [{task_id}]: No bytes downloaded from '{url}'.")
return None
return PIL.Image.open(io.BytesIO(raw))
except Exception as e:
print(f"Error [{task_id}]: converting image from '{url}' → base64: {e}")
return None
# ---------------------------------------------------------------------------
# 2. File → UTF‑8 string
# ---------------------------------------------------------------------------
async def convert_file_to_string(task_id: str, url: str) -> Optional[str]:
"""
Downloads a file and returns its text (UTF‑8, errors replaced).
"""
try:
raw = await download_file_for_task(task_id, url)
if not raw:
print(f"Info [{task_id}]: No bytes downloaded from '{url}'.")
return None
return raw.decode("utf-8", errors="replace")
except Exception as e:
print(f"Error [{task_id}]: decoding file from '{url}': {e}")
return None
# ---------------------------------------------------------------------------
# 3. DOCX → Markdown
# ---------------------------------------------------------------------------
def _runs_to_md(runs):
"""Helper – convert a list of runs to markdown inline‑text."""
out = []
for run in runs:
text = run.text.replace("\n", " ")
if not text:
continue
if run.bold:
text = f"**{text}**"
if run.italic:
text = f"*{text}*"
out.append(text)
return "".join(out)
async def convert_docx_to_markdown(task_id: str, url: str) -> Optional[str]:
"""
Converts a Word document to *simple* Markdown.
"""
try:
raw = await download_file_for_task(task_id, url)
if not raw:
print(f"Info [{task_id}]: No bytes downloaded from '{url}'.")
return None
doc = Document(io.BytesIO(raw))
md_lines = []
for p in doc.paragraphs:
style = (p.style.name or "").lower()
text = _runs_to_md(p.runs).strip()
if not text:
continue
if "heading" in style:
# e.g. 'Heading 1' → level 1, 'Heading 2' → level 2, etc.
                level = int("".join(filter(str.isdigit, style)) or 1)
                level = min(level, 6)  # Markdown supports at most 6 heading levels
md_lines.append(f"{'#' * level} {text}")
else:
md_lines.append(text)
return "\n\n".join(md_lines)
except PackageNotFoundError:
print(f"Error [{task_id}]: file from '{url}' is not a valid DOCX.")
return None
except Exception as e:
print(f"Error [{task_id}]: DOCX→MD conversion failed for '{url}': {e}")
return None
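# Minimal usage sketch (illustrative only): relies purely on the behaviour of
# convert_docx_to_markdown above, where 'Heading N' styles become '#'-prefixed
# lines and everything else stays as plain paragraphs.
async def _example_docx_outline(task_id: str, url: str) -> Optional[list]:
    md = await convert_docx_to_markdown(task_id, url)
    if md is None:
        return None
    # Keep only the heading lines to produce a rough outline of the document.
    return [line for line in md.split("\n\n") if line.startswith("#")]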
# ---------------------------------------------------------------------------
# 4. PDF → Markdown (really, plain text with paragraph breaks)
# ---------------------------------------------------------------------------
async def convert_pdf_to_markdown(task_id: str, url: str) -> Optional[str]:
"""
Extracts text from a PDF and returns it as Markdown (plain paragraphs).
"""
try:
raw = await download_file_for_task(task_id, url)
if not raw:
print(f"Info [{task_id}]: No bytes downloaded from '{url}'.")
return None
text = pdfminer.high_level.extract_text(io.BytesIO(raw))
if not text.strip():
print(f"Info [{task_id}]: PDF at '{url}' produced no text.")
return ""
        # Very light Markdown: blank lines in the extracted text separate
        # paragraphs; line breaks inside a paragraph collapse to single spaces.
        blocks = re.split(r"\n\s*\n", text)
        paragraphs = [" ".join(b.split()) for b in blocks if b.strip()]
        return "\n\n".join(paragraphs)
except (PDFSyntaxError, ValueError) as e:
print(f"Error [{task_id}]: PDF syntax error for '{url}': {e}")
return None
except Exception as e:
print(f"Error [{task_id}]: PDF→MD conversion failed for '{url}': {e}")
return None
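# ---------------------------------------------------------------------------
# Illustrative dispatch sketch (an assumption, not part of the agent's code):
# one way a caller could route a task attachment to the converters above based
# on its file extension; the file_name parameter is hypothetical.
# ---------------------------------------------------------------------------
async def _example_convert_by_extension(task_id: str, url: str, file_name: str):
    name = file_name.lower()
    if name.endswith((".xlsx", ".xls")):
        return await convert_excel_bytes_to_llm_format(task_id, url)
    if name.endswith((".png", ".jpg", ".jpeg", ".gif", ".webp")):
        return await convert_image_to_pillow(task_id, url)
    if name.endswith(".docx"):
        return await convert_docx_to_markdown(task_id, url)
    if name.endswith(".pdf"):
        return await convert_pdf_to_markdown(task_id, url)
    # Fall back to plain UTF-8 text for anything else (e.g. .txt, .csv, .json).
    return await convert_file_to_string(task_id, url)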