# docling_rag/utils/ingestion.py
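"""Ingest documents with Docling v2: convert, chunk hierarchically, embed with
FastEmbed, and store the chunks in a persistent ChromaDB collection."""
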
import json
import time
from pathlib import Path
import chromadb
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from docling.chunking import HierarchicalChunker
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import (
    DocumentConverter,
    PdfFormatOption,
    WordFormatOption,
)
from docling.pipeline.simple_pipeline import SimplePipeline
from docling_core.types.doc import DoclingDocument
from langchain_community.embeddings.fastembed import FastEmbedEmbeddings


class DocumentProcessor:
    def __init__(self):
        """Initialize the Docling converter, embedding model, and vector store."""
        self.setup_document_converter()
        self.embed_model = FastEmbedEmbeddings()
        self.client = chromadb.PersistentClient(path="chroma_db")

    def setup_document_converter(self):
        """Configure a document converter that supports multiple input formats."""
        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_ocr = False
        pipeline_options.do_table_structure = True

        self.converter = DocumentConverter(
            allowed_formats=[
                InputFormat.PDF,
                InputFormat.IMAGE,
                InputFormat.DOCX,
                InputFormat.HTML,
                InputFormat.PPTX,
                InputFormat.TXT,
                InputFormat.CSV,
            ],
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_options=pipeline_options,
                    # PdfFormatOption expects the backend class, not an instance
                    backend=PyPdfiumDocumentBackend,
                ),
                InputFormat.DOCX: WordFormatOption(pipeline_cls=SimplePipeline),
            },
        )

    def process_document(self, file_path: str):
        """Convert a document, chunk it, and index the chunks in ChromaDB."""
        print(f"📄 Processing document: {file_path}")
        start_time = time.time()

        try:
            conv_result = self.converter.convert(file_path)
            doc: DoclingDocument = conv_result.document
        except Exception as e:
            print(f"❌ Conversion failed: {e}")
            return None

        # Save a markdown copy of the parsed document for inspection
        output_dir = Path("parsed-doc")
        output_dir.mkdir(parents=True, exist_ok=True)
        md_filename = output_dir / f"{Path(file_path).stem}.md"
        doc.save_as_markdown(md_filename)

        chunker = HierarchicalChunker()
        chunks = list(chunker.chunk(doc))

        # Each chunk's meta (DocMeta) carries its section headings and source items
        processed_chunks = []
        for chunk in chunks:
            meta = chunk.meta
            processed_chunks.append({
                "text": chunk.text.strip(),
                "headings": list(meta.headings or []),
                # label is a DocItemLabel enum; store its plain string value
                "content_type": meta.doc_items[0].label.value if meta.doc_items else "unknown",
            })
print("βœ… Chunking completed. Creating vector database...")
collection = self.client.get_or_create_collection(name="document_chunks")
documents, embeddings, metadata_list, ids = [], [], [], []
for idx, chunk in enumerate(processed_chunks):
text = chunk.get('text', '').strip()
if not text:
continue
embedding = self.embed_model.embed_documents([text])[0]
documents.append(text)
embeddings.append(embedding)
metadata_list.append({
"headings": json.dumps(chunk.get('headings', [])),
"content_type": chunk.get('content_type', None)
})
ids.append(str(idx))

        if documents:
            collection.add(
                ids=ids,
                embeddings=embeddings,
                documents=documents,
                metadatas=metadata_list,
            )
            print(f"✅ Successfully added {len(documents)} chunks to the database.")

        print(f"✅ Document processing completed in {time.time() - start_time:.2f} seconds")
        return collection
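

# Minimal usage sketch. The file path and query text below are placeholders,
# not part of this module; adjust them to your data.
if __name__ == "__main__":
    processor = DocumentProcessor()
    collection = processor.process_document("sample.pdf")
    if collection is not None:
        # Embed a question and retrieve the three closest chunks
        query_vec = processor.embed_model.embed_query("What is this document about?")
        results = collection.query(query_embeddings=[query_vec], n_results=3)
        for doc_text in results["documents"][0]:
            print(doc_text[:120])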