# File size: 8,446 Bytes
# revision 912f746
import io
import json
from typing import Optional
import pandas as pd
import pdfminer.high_level
import PIL.Image
from docx import Document
from docx.opc.exceptions import PackageNotFoundError
from pdfminer.pdfparser import PDFSyntaxError
from src.file_handler.get_file import download_file_for_task
async def convert_excel_bytes_to_llm_format(task_id: str, url: str) -> Optional[str]:
    """
    Downloads an Excel file using download_file_for_task, removes empty rows
    from each sheet, and converts its content to an LLM-friendly JSON string.
    Args:
        task_id (str): The identifier for the task, used by download_file_for_task.
        url (str): The URL of the Excel file to download and process.
    Returns:
        Optional[str]: A JSON string encoding a mapping from sheet names to
            lists of row dictionaries (column headers as keys).
            Returns None if a critical error occurs (e.g., download failure,
            file unparseable).
            Returns "{}" (an empty JSON object) if the Excel file is valid
            but contains no sheets.
    """
    try:
        file_bytes = await download_file_for_task(task_id, url)
        if not file_bytes:
            print(f"Info [{task_id}]: No content downloaded from URL '{url}'.")
            # Returning None indicates a problem preventing processing.
            return None
        # Use io.BytesIO to treat the bytes as a file-like object for pandas
        excel_buffer = io.BytesIO(file_bytes)
        # pd.ExcelFile parses multi-sheet workbooks efficiently and raises
        # (e.g. ValueError, various zipfile/xlrd/openpyxl errors) if the file
        # is not a valid Excel format or is corrupted.
        xls = pd.ExcelFile(excel_buffer)
        if not xls.sheet_names:
            print(
                f"Info [{task_id}]: Excel file from URL '{url}' has no sheets."
            )
            # Fixed: previously returned a raw dict {} while every other
            # success path returns a JSON string — keep the type consistent.
            return json.dumps({})  # No sheets means no data to process
        all_sheets_data = {}
        for sheet_name in xls.sheet_names:
            # Parse the current sheet into a DataFrame
            df = xls.parse(sheet_name)
            # Remove rows where all cells are NaN (these are considered empty rows)
            df.dropna(how="all", inplace=True)
            # Replace remaining NaN cells with None so json.dumps emits valid
            # JSON null instead of the non-standard bare token NaN.
            # astype(object) first, otherwise float columns coerce None back
            # to NaN.
            df = df.astype(object).where(df.notna(), None)
            # If a sheet becomes empty after dropna, to_dict(orient='records')
            # correctly produces an empty list for that sheet's data.
            all_sheets_data[sheet_name] = df.to_dict(orient="records")
        # default=str serializes non-JSON-native cells (e.g. pandas Timestamps,
        # dates) as their string form instead of raising TypeError, which was
        # previously swallowed by the broad except and surfaced as None.
        return json.dumps(all_sheets_data, ensure_ascii=False, default=str)
    except pd.errors.ParserError as e:
        # Handles errors during the parsing of sheet data by pandas.
        print(
            f"Error [{task_id}]: Pandas parsing error for Excel file from '{url}': {e}"
        )
        return None
    except ValueError as e:
        # Catches errors like "Excel file format cannot be determined..." from
        # pd.ExcelFile or other value-related issues during parsing.
        print(
            f"Error [{task_id}]: Value error processing Excel file from '{url}': {e}"
        )
        return None
    except Exception as e:
        # Catch-all boundary for unexpected errors (other pandas/library
        # issues); log and signal failure with None.
        print(
            f"Error [{task_id}]: Unexpected error processing Excel file from '{url}': {e}"
        )
        return None
# ---------------------------------------------------------------------------
# 1. Image → PIL Image
# ---------------------------------------------------------------------------
async def convert_image_to_pillow(
    task_id: str, url: str
) -> Optional[PIL.Image.Image]:
    """
    Downloads an image file and returns a PIL Image object.
    Returns None on failure.
    Args:
        task_id (str): The ID of the task.
        url (str): The URL of the image file.
    Returns:
        Optional[PIL.Image.Image]: The PIL Image object or None on failure.
    """
    try:
        raw = await download_file_for_task(task_id, url)
        if not raw:
            print(f"Info [{task_id}]: No bytes downloaded from '{url}'.")
            return None
        # PIL.Image.open reads lazily from the BytesIO buffer.
        return PIL.Image.open(io.BytesIO(raw))
    except Exception as e:
        # Fixed: message previously claimed a "→ base64" conversion (copy-paste
        # leftover) — this function opens the bytes as a PIL image.
        print(f"Error [{task_id}]: opening image from '{url}' as PIL image: {e}")
        return None
# ---------------------------------------------------------------------------
# 2. File → UTF‑8 string
# ---------------------------------------------------------------------------
async def convert_file_to_string(task_id: str, url: str) -> Optional[str]:
    """
    Download a file and return its contents decoded as UTF-8.

    Undecodable byte sequences are replaced (never raised); returns None
    when nothing was downloaded or on any error.
    """
    try:
        payload = await download_file_for_task(task_id, url)
        if payload:
            return payload.decode("utf-8", errors="replace")
        print(f"Info [{task_id}]: No bytes downloaded from '{url}'.")
        return None
    except Exception as e:
        print(f"Error [{task_id}]: decoding file from '{url}': {e}")
        return None
# ---------------------------------------------------------------------------
# 3. DOCX → Markdown
# ---------------------------------------------------------------------------
def _runs_to_md(runs):
"""Helper – convert a list of runs to markdown inline‑text."""
out = []
for run in runs:
text = run.text.replace("\n", " ")
if not text:
continue
if run.bold:
text = f"**{text}**"
if run.italic:
text = f"*{text}*"
out.append(text)
return "".join(out)
async def convert_docx_to_markdown(task_id: str, url: str) -> Optional[str]:
    """
    Download a Word document and render it as *simple* Markdown.

    Paragraphs whose style name contains "heading" become '#'-prefixed
    lines (level taken from the digits in the style name, defaulting to 1);
    everything else is emitted as a plain paragraph. Returns None on any
    failure (no bytes, invalid DOCX, unexpected error).
    """
    try:
        payload = await download_file_for_task(task_id, url)
        if not payload:
            print(f"Info [{task_id}]: No bytes downloaded from '{url}'.")
            return None
        document = Document(io.BytesIO(payload))
        chunks = []
        for paragraph in document.paragraphs:
            style_name = (paragraph.style.name or "").lower()
            body = _runs_to_md(paragraph.runs).strip()
            if not body:
                continue
            if "heading" in style_name:
                # 'heading 1' → level 1, 'heading 2' → level 2, etc.
                digits = "".join(ch for ch in style_name if ch.isdigit())
                level = int(digits or 1)
                chunks.append(f"{'#' * level} {body}")
            else:
                chunks.append(body)
        return "\n\n".join(chunks)
    except PackageNotFoundError:
        print(f"Error [{task_id}]: file from '{url}' is not a valid DOCX.")
        return None
    except Exception as e:
        print(f"Error [{task_id}]: DOCX→MD conversion failed for '{url}': {e}")
        return None
# ---------------------------------------------------------------------------
# 4. PDF → Markdown (really, plain text with paragraph breaks)
# ---------------------------------------------------------------------------
async def convert_pdf_to_markdown(task_id: str, url: str) -> Optional[str]:
    """
    Download a PDF and return its extracted text as minimal Markdown.

    Each non-blank line of the extracted text becomes its own paragraph
    (separated by blank lines). Returns "" when the PDF yields no text,
    and None on download or parse failure.
    """
    try:
        payload = await download_file_for_task(task_id, url)
        if not payload:
            print(f"Info [{task_id}]: No bytes downloaded from '{url}'.")
            return None
        extracted = pdfminer.high_level.extract_text(io.BytesIO(payload))
        if not extracted.strip():
            print(f"Info [{task_id}]: PDF at '{url}' produced no text.")
            return ""
        # Very light Markdown: strip each line, drop blanks, and join the
        # survivors with blank-line paragraph separators.
        return "\n\n".join(
            line.strip() for line in extracted.splitlines() if line.strip()
        )
    except (PDFSyntaxError, ValueError) as e:
        print(f"Error [{task_id}]: PDF syntax error for '{url}': {e}")
        return None
    except Exception as e:
        print(f"Error [{task_id}]: PDF→MD conversion failed for '{url}': {e}")
        return None