# Artemis-IA's picture
# Update app.py
# 69c9fec verified
import io
import json
import os
import time
import zipfile
from pathlib import Path
from typing import Dict, List

import pandas as pd
import streamlit as st
import yaml
from PIL import Image
from docling.datamodel.base_models import InputFormat
from docling.datamodel.document import ConversionStatus
from docling.datamodel.pipeline_options import (
    AcceleratorDevice,
    AcceleratorOptions,
    EasyOcrOptions,
    OcrMacOptions,
    PdfPipelineOptions,
    RapidOcrOptions,
    TableFormerMode,
    TableStructureOptions,
    TesseractCliOcrOptions,
    TesseractOcrOptions,
)
from docling.document_converter import (
    DocumentConverter,
    ImageFormatOption,
    PdfFormatOption,
)
from docling_core.types.doc import PictureItem, TableItem
# Directory layout: everything is written below OUTPUT_DIR, with extracted
# figures and tables each kept in their own sub-folder.
OUTPUT_DIR = Path("output")
FIGURES_DIR = OUTPUT_DIR.joinpath("figures")
TABLES_DIR = OUTPUT_DIR.joinpath("tables")
def setup_directories():
    """Create the output, figures and tables directories when missing."""
    # OUTPUT_DIR comes first because the other two live inside it.
    for directory in (OUTPUT_DIR, FIGURES_DIR, TABLES_DIR):
        directory.mkdir(exist_ok=True)
def is_valid_file(file_path: Path) -> bool:
    """Return True when *file_path* has a supported document extension.

    The comparison is case-insensitive. ``.jpeg`` and ``.htm`` are accepted
    next to ``.jpg`` and ``.html`` because they name the same formats; without
    them such files would be skipped by the caller without any conversion.

    Args:
        file_path: Path whose suffix is inspected; the file need not exist.

    Returns:
        True if the suffix is one of the supported extensions.
    """
    # NOTE(review): the upload widget is believed to let the alternative
    # spellings through when only the short form is listed - confirm.
    valid_extensions = frozenset(
        {".pdf", ".docx", ".pptx", ".html", ".htm", ".png", ".jpg", ".jpeg"}
    )
    return file_path.suffix.lower() in valid_extensions
def create_document_converter(config: Dict) -> DocumentConverter:
    """Build a Docling ``DocumentConverter`` from the UI configuration.

    Args:
        config: Settings dictionary. The keys read here are ``accelerator``,
            ``table_mode``, ``ocr_engine``, ``ocr_languages``, ``use_ocr``,
            ``export_figures`` and ``export_tables``.

    Returns:
        A converter that accepts PDF, DOCX, PPTX, HTML and image inputs, with
        the configured pipeline options applied to both PDF and image inputs.

    Raises:
        KeyError: If the accelerator, table mode or OCR engine name is not
            one of the known choices.
    """
    accelerator_options = AcceleratorOptions(
        num_threads=8,
        device=AcceleratorDevice[config['accelerator'].upper()],
    )
    table_structure_options = TableStructureOptions(
        mode=TableFormerMode[config['table_mode'].upper()],
        do_cell_matching=True,
    )

    # Map engine names to option classes so only the selected engine's
    # options object is instantiated.
    ocr_option_classes = {
        "easyocr": EasyOcrOptions,
        "tesseract_cli": TesseractCliOcrOptions,
        "tesserocr": TesseractOcrOptions,
        "rapidocr": RapidOcrOptions,
        "ocrmac": OcrMacOptions,
    }
    # Trim stray whitespace ("en, fr") and drop empty entries; when nothing
    # usable remains, let the engine fall back to its own default languages.
    # NOTE(review): language code conventions presumably differ per engine
    # (e.g. "en" vs "eng") - confirm against each engine's documentation.
    languages = [
        lang.strip() for lang in config['ocr_languages'] if lang and lang.strip()
    ]
    lang_kwargs = {"lang": languages} if languages else {}
    ocr_options = ocr_option_classes[config['ocr_engine']](**lang_kwargs)

    pipeline_options = PdfPipelineOptions(
        do_ocr=config['use_ocr'],
        generate_page_images=True,
        generate_picture_images=config['export_figures'],
        generate_table_images=config['export_tables'],
        accelerator_options=accelerator_options,
        table_structure_options=table_structure_options,
        ocr_options=ocr_options,
    )
    return DocumentConverter(
        allowed_formats=[
            InputFormat.PDF,
            InputFormat.DOCX,
            InputFormat.PPTX,
            InputFormat.HTML,
            InputFormat.IMAGE,
        ],
        # Format options are looked up per input format: registering them
        # only for PDF would leave image inputs on Docling's defaults, so the
        # chosen OCR engine and figure settings would not apply to PNG/JPG.
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options),
            InputFormat.IMAGE: ImageFormatOption(pipeline_options=pipeline_options),
        },
    )
def process_files(uploaded_files, config: Dict) -> Dict:
    """Convert every uploaded file and collect the generated artefacts.

    Args:
        uploaded_files: Sequence of uploaded file objects; each must expose
            ``name`` and ``getbuffer()``.
        config: Settings dictionary; ``export_formats`` is read here and the
            whole dictionary is forwarded to ``create_document_converter``.

    Returns:
        A dictionary with the keys ``figures``, ``tables_csv``,
        ``tables_html``, ``exports`` (one list per requested format) and
        ``processing_time`` (elapsed seconds as a float).
    """
    setup_directories()
    converter = create_document_converter(config)
    results = {
        'figures': [],
        'tables_csv': [],
        'tables_html': [],
        'exports': {fmt: [] for fmt in config['export_formats']},
    }
    progress_bar = st.progress(0)
    status_placeholder = st.empty()
    start_time = time.time()
    total = len(uploaded_files)

    for position, uploaded_file in enumerate(uploaded_files, start=1):
        try:
            # Keep only the last path component so an upload name can never
            # point outside OUTPUT_DIR.
            file_path = OUTPUT_DIR / Path(uploaded_file.name).name
            # Validate before writing so unsupported uploads never reach the
            # output directory.
            if not is_valid_file(file_path):
                st.warning(f"Fichier ignoré (format non pris en charge) : {uploaded_file.name}")
                continue
            file_path.write_bytes(uploaded_file.getbuffer())
            status_placeholder.info(f"Traitement de {file_path.name} ({position}/{total})")
            for conv_res in converter.convert_all([file_path], raises_on_error=False):
                if conv_res.status == ConversionStatus.SUCCESS:
                    handle_successful_conversion(conv_res, results, config['export_formats'])
                else:
                    # raises_on_error=False reports failures through the
                    # status only, so surface them instead of dropping them.
                    st.warning(
                        f"Conversion non réussie pour {file_path.name} "
                        f"(statut : {conv_res.status.value})"
                    )
        except Exception as e:
            # Top-level boundary: one bad file must not stop the batch.
            st.error(f"Erreur avec {uploaded_file.name}: {str(e)}")
        finally:
            # Advance the bar for skipped and failed files as well.
            progress_bar.progress(position / total)

    results['processing_time'] = time.time() - start_time
    return results
def handle_successful_conversion(conv_res, results: Dict, export_formats: List[str]):
    """Write the requested exports for one converted document.

    Each export is written to ``OUTPUT_DIR`` as ``<input stem>.<fmt>`` and
    recorded under ``results['exports'][fmt]``: ``md``, ``json`` and ``yaml``
    entries are ``(path, content)`` tuples, ``multimodal`` entries are the
    path alone. Every picture and table item of the document is then handed
    to the matching element handler.

    Args:
        conv_res: Conversion result exposing ``document`` and ``input.file``.
        results: Accumulator dictionary, mutated in place.
        export_formats: Formats to produce; names other than ``md``,
            ``json``, ``yaml`` and ``multimodal`` are ignored.
    """
    document = conv_res.document
    stem = conv_res.input.file.stem

    # Document-level exports
    for fmt in export_formats:
        if fmt in ("md", "multimodal"):
            # The multimodal view renders its file as Markdown, so give it
            # the Markdown export rather than leaving the file empty.
            content = document.export_to_markdown()
        elif fmt == "json":
            content = json.dumps(document.export_to_dict(), ensure_ascii=False, indent=2)
        elif fmt == "yaml":
            content = yaml.dump(document.export_to_dict(), allow_unicode=True)
        else:
            continue

        output_file = OUTPUT_DIR / f"{stem}.{fmt}"
        # Explicit UTF-8: the JSON and YAML dumps keep non-ASCII characters,
        # which a non-UTF-8 platform default encoding may fail to encode.
        output_file.write_text(content, encoding="utf-8")
        if fmt == "multimodal":
            results['exports'][fmt].append(output_file)
        else:
            results['exports'][fmt].append((output_file, content))

    # Element extraction
    for element, _ in document.iterate_items():
        if isinstance(element, PictureItem):
            handle_picture_element(element, conv_res, results)
        elif isinstance(element, TableItem):
            handle_table_element(element, conv_res, results)
def handle_picture_element(element: PictureItem, conv_res, results: Dict):
    """Save one picture item as a PNG and record its path.

    The file is named ``<input stem>_figure_<n>.png`` inside ``FIGURES_DIR``,
    where ``n`` is the number of figures already recorded in *results*.
    Pictures that carry no image data are skipped.

    Args:
        element: Picture item taken from the converted document.
        conv_res: Conversion result; only ``input.file.stem`` is read.
        results: Accumulator dictionary; ``results['figures']`` is appended to.
    """
    # `image` can be unset, e.g. when the pipeline ran without picture image
    # generation; skip instead of failing the whole document.
    image_ref = element.image
    pil_image = image_ref.pil_image if image_ref is not None else None
    if pil_image is None:
        return

    fig_path = FIGURES_DIR / f"{conv_res.input.file.stem}_figure_{len(results['figures'])}.png"
    pil_image.save(fig_path)
    results['figures'].append(fig_path)
def handle_table_element(element: TableItem, conv_res, results: Dict):
    """Export one table item as CSV and HTML and record both paths.

    Files are named ``<input stem>_table_<n>.csv`` / ``.html`` inside
    ``TABLES_DIR``, where ``n`` is the number of tables already recorded.

    Args:
        element: Table item taken from the converted document.
        conv_res: Conversion result; only ``input.file.stem`` is read.
        results: Accumulator dictionary; ``tables_csv`` and ``tables_html``
            are appended to.
    """
    # Produce both representations before touching disk so a failing export
    # cannot leave the CSV and HTML lists with different lengths.
    dataframe = element.export_to_dataframe()
    html = element.export_to_html()

    stem = conv_res.input.file.stem
    csv_path = TABLES_DIR / f"{stem}_table_{len(results['tables_csv'])}.csv"
    html_path = TABLES_DIR / f"{stem}_table_{len(results['tables_html'])}.html"

    dataframe.to_csv(csv_path, index=False)
    # Explicit UTF-8 so table text with non-ASCII characters is written the
    # same way on every platform.
    html_path.write_text(html, encoding="utf-8")

    results['tables_csv'].append(csv_path)
    results['tables_html'].append(html_path)
def display_export_content(title: str, content: str, format: str):
    """Show one exported document inside a collapsible expander.

    ``json`` and ``yaml`` content is shown as highlighted code, ``md`` content
    is rendered as Markdown, and ``multimodal`` content is rendered as
    Markdown below an informational banner. Other formats show nothing.
    """
    with st.expander(f"📄 {title}"):
        if format in ["json", "yaml"]:
            st.code(content, language=format)
            return
        if format == "multimodal":
            st.info("Affichage multimodal combinant texte, images et tableaux")
        if format in ("md", "multimodal"):
            st.markdown(content)
def display_results(results: Dict):
    """Render the conversion results: timing, exports, figures and tables.

    Args:
        results: Dictionary holding ``processing_time``, ``exports``,
            ``figures``, ``tables_csv`` and ``tables_html``.

    The timing message is written to the placeholder stored in
    ``st.session_state.time_placeholder``, which must exist beforehand.
    """
    st.session_state.time_placeholder.success(
        f"⏱ Temps total de conversion : {int(results['processing_time'])} secondes"
    )

    # Exports section
    for fmt, exports in results['exports'].items():
        if not exports:
            continue
        st.subheader(f"📁 Exports {fmt.upper()}")
        for export in exports:
            if fmt == "multimodal":
                display_multimodal_result(export)
            else:
                file_path, content = export
                display_export_content(file_path.name, content, fmt)

    # Figures section
    if results['figures']:
        st.subheader("🖼️ Figures extraites")
        cols = st.columns(3)
        for idx, fig_path in enumerate(results['figures']):
            column = cols[idx % 3]
            try:
                # Context manager closes the file handle once displayed.
                with Image.open(fig_path) as figure:
                    column.image(figure, caption=fig_path.name, use_container_width=True)
            except Exception as e:
                column.error(f"Erreur d'affichage de {fig_path.name}: {str(e)}")

    # Tables section
    if results['tables_csv'] or results['tables_html']:
        st.subheader("📋 Tableaux extraits")
        display_format = st.radio("Format d'affichage", ['CSV', 'HTML'], horizontal=True)
        if display_format == 'CSV':
            for table_path in results['tables_csv']:
                try:
                    df = pd.read_csv(table_path)
                    st.write(f"**{table_path.stem}**")
                    st.dataframe(df.style.set_properties(**{'text-align': 'left'}))
                except Exception as e:
                    st.error(f"Erreur de lecture CSV {table_path.name}: {str(e)}")
        else:
            for html_path in results['tables_html']:
                try:
                    try:
                        html = html_path.read_text(encoding="utf-8")
                    except UnicodeDecodeError:
                        # File written with the platform default encoding.
                        html = html_path.read_text()
                    st.write(f"**{html_path.stem}**")
                    st.markdown(html, unsafe_allow_html=True)
                except Exception as e:
                    st.error(f"Erreur de lecture HTML {html_path.name}: {str(e)}")
def display_multimodal_result(file_path: Path):
    """Show a multimodal export next to the files that share its stem.

    The left column renders the content of *file_path* as Markdown. The right
    column lists every other file below ``OUTPUT_DIR`` whose name starts with
    the same stem, previewing PNG/JPG images and the first three rows of CSV
    files.

    Args:
        file_path: Path of the multimodal export file.
    """
    with st.expander(f"🌈 {file_path.name}"):
        col1, col2 = st.columns([2, 1])
        with col1:
            try:
                try:
                    content = file_path.read_text(encoding="utf-8")
                except UnicodeDecodeError:
                    # File written with the platform default encoding.
                    content = file_path.read_text()
                st.markdown(content)
            except Exception as e:
                st.error(f"Erreur de lecture : {str(e)}")
        with col2:
            # Search recursively: figures and tables are stored in
            # sub-folders of OUTPUT_DIR, which a top-level glob never reaches.
            # A plain prefix test also avoids building a glob pattern from a
            # stem that may contain wildcard characters such as "[" or "*".
            related_files = sorted(
                f for f in OUTPUT_DIR.rglob("*")
                if f != file_path
                and not f.is_dir()
                and f.name.startswith(file_path.stem)
            )
            if related_files:
                st.write("Fichiers associés :")
                for f in related_files:
                    st.write(f"- `{f.name}`")
                    suffix = f.suffix.lower()
                    if suffix in (".png", ".jpg"):
                        try:
                            with Image.open(f) as preview:
                                # use_container_width replaces the deprecated
                                # use_column_width, matching the figure
                                # gallery elsewhere in this file.
                                st.image(preview, use_container_width=True)
                        except Exception as e:
                            st.error(f"Erreur d'affichage image : {str(e)}")
                    elif suffix == ".csv":
                        try:
                            st.dataframe(pd.read_csv(f).head(3))
                        except Exception as e:
                            st.error(f"Erreur d'affichage CSV : {str(e)}")
def create_zip_buffer(directory: Path) -> bytes:
    """Pack every file below *directory* into an in-memory ZIP archive.

    Member names are made relative to the parent of *directory*, so each one
    starts with the directory's own name.

    Args:
        directory: Root folder to walk recursively.

    Returns:
        The raw bytes of the deflate-compressed archive.
    """
    archive_root = directory.parent
    stream = io.BytesIO()
    with zipfile.ZipFile(stream, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
        for current_dir, _subdirs, filenames in os.walk(directory):
            current_path = Path(current_dir)
            for filename in filenames:
                member = current_path / filename
                archive.write(member, arcname=member.relative_to(archive_root))
    return stream.getvalue()
# User interface
def main():
    """Render the Streamlit page: upload, options, conversion and download.

    Conversion results are kept in ``st.session_state`` so they survive the
    script reruns triggered by any widget interaction (for example the
    CSV/HTML selector or the download button); a button is only true during
    the run in which it was clicked.
    """
    st.title("📊🦆 Docling Document Converter")
    # Placeholder read later by display_results to show the elapsed time.
    st.session_state.time_placeholder = st.empty()

    uploaded_files = st.file_uploader(
        "Téléchargez vos documents",
        accept_multiple_files=True,
        type=["pdf", "docx", "pptx", "html", "png", "jpg"],
    )

    with st.expander("Options avancées"):
        config = {
            'use_ocr': st.checkbox("Activer OCR", True),
            'export_figures': st.checkbox("Exporter les images", True),
            'export_tables': st.checkbox("Exporter les tableaux", True),
            'ocr_engine': st.selectbox("Moteur OCR", ["easyocr", "tesseract_cli", "tesserocr", "rapidocr", "ocrmac"]),
            # Trim whitespace around each code ("en, fr") and drop empty
            # entries; fall back to the field's default when nothing is left.
            'ocr_languages': [
                lang.strip()
                for lang in st.text_input("Langues OCR (séparées par des virgules)", "en").split(',')
                if lang.strip()
            ] or ["en"],
            'table_mode': st.selectbox("Mode des tableaux", ["ACCURATE", "FAST"]),
            'export_formats': st.multiselect(
                "Formats d'export",
                ["json", "yaml", "md", "multimodal"],
                default=["md"],
            ),
            'accelerator': st.selectbox("Accélérateur matériel", ["cpu", "cuda", "mps"], index=0),
        }

    if st.button("Démarrer la conversion"):
        if uploaded_files:
            st.session_state["conversion_results"] = process_files(uploaded_files, config)
        else:
            st.warning("Veuillez d'abord téléverser au moins un document.")

    results = st.session_state.get("conversion_results")
    if results is None:
        return

    display_results(results)
    st.success("✅ Conversion terminée avec succès !")

    # Build the ZIP archive offered for download.
    # NOTE(review): the archive contains everything currently in OUTPUT_DIR,
    # which presumably includes files left by earlier runs - confirm whether
    # the directory should be cleared per conversion.
    try:
        zip_buffer = create_zip_buffer(OUTPUT_DIR)
        st.download_button(
            label="📥 Télécharger tous les résultats",
            data=zip_buffer,
            file_name="conversion_results.zip",
            mime="application/zip",
        )
    except Exception as e:
        st.error(f"Erreur lors de la création du ZIP : {str(e)}")


if __name__ == "__main__":
    main()