NEXAS committed
Commit 4a97e8c · verified · 1 Parent(s): 2723c4f

Update utils/ingestion.py

Files changed (1)
  1. utils/ingestion.py +45 -111
utils/ingestion.py CHANGED
@@ -7,150 +7,85 @@ import chromadb
 
 from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
 from docling.datamodel.base_models import InputFormat
-from docling.datamodel.pipeline_options import (
-    AcceleratorDevice,
-    AcceleratorOptions,
-    PdfPipelineOptions,
-    TableFormerMode
+from docling.datamodel.pipeline_options import PdfPipelineOptions
+from docling.document_converter import (
+    DocumentConverter,
+    PdfFormatOption,
+    WordFormatOption,
 )
-from docling.document_converter import DocumentConverter, PdfFormatOption
-from docling_core.transforms.chunker.hybrid_chunker import HybridChunker
+from docling.pipeline.simple_pipeline import SimplePipeline
+from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
+from docling_core.types.doc import DoclingDocument
+from docling.chunking import HierarchicalChunker
 from langchain_community.embeddings.fastembed import FastEmbedEmbeddings
 
-from docx import Document  # DOCX support
-from pptx import Presentation  # PPTX support
-from bs4 import BeautifulSoup  # HTML support
-
-
 class DocumentProcessor:
     def __init__(self):
-        """Initialize document processor with necessary components"""
+        """Initialize document processor with Docling v2 components"""
         self.setup_document_converter()
         self.embed_model = FastEmbedEmbeddings()
-        self.client = chromadb.PersistentClient(path="chroma_db")  # Persistent storage
+        self.client = chromadb.PersistentClient(path="chroma_db")
 
     def setup_document_converter(self):
-        """Configure document converter with advanced processing capabilities"""
+        """Configure document converter to support multiple formats"""
         pipeline_options = PdfPipelineOptions()
         pipeline_options.do_ocr = True
         pipeline_options.do_table_structure = True
-        pipeline_options.table_structure_options.do_cell_matching = True
-        pipeline_options.ocr_options.lang = ["en"]
-        pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE
-
-        try:
-            pipeline_options.accelerator_options = AcceleratorOptions(
-                num_threads=8, device=AcceleratorDevice.MPS
-            )
-        except Exception:
-            print("⚠️ MPS is not available. Falling back to CPU.")
-            pipeline_options.accelerator_options = AcceleratorOptions(
-                num_threads=8, device=AcceleratorDevice.CPU
-            )
 
         self.converter = DocumentConverter(
+            allowed_formats=[
+                InputFormat.PDF,
+                InputFormat.IMAGE,
+                InputFormat.DOCX,
+                InputFormat.HTML,
+                InputFormat.PPTX,
+            ],
             format_options={
                 InputFormat.PDF: PdfFormatOption(
                     pipeline_options=pipeline_options,
-                    backend=PyPdfiumDocumentBackend
-                )
-            }
+                    backend=PyPdfiumDocumentBackend,  # backend expects the class, not an instance
+                ),
+                InputFormat.DOCX: WordFormatOption(
+                    pipeline_cls=SimplePipeline
+                ),
+            },
         )
 
-    def extract_chunk_metadata(self, chunk) -> Dict[str, Any]:
-        """Extract essential metadata from a chunk"""
-        metadata = {
-            "text": chunk.text.strip(),
-            "headings": [],
-            "page_info": None,
-            "content_type": None
-        }
-
-        if hasattr(chunk, 'meta'):
-            if hasattr(chunk.meta, 'headings') and chunk.meta.headings:
-                metadata["headings"] = chunk.meta.headings
-
-            if hasattr(chunk.meta, 'doc_items'):
-                for item in chunk.meta.doc_items:
-                    if hasattr(item, 'label'):
-                        metadata["content_type"] = str(item.label)
-
-                    if hasattr(item, 'prov') and item.prov:
-                        for prov in item.prov:
-                            if hasattr(prov, 'page_no'):
-                                metadata["page_info"] = prov.page_no
-
-        return metadata
-
-    def extract_text_from_docx(self, docx_path: str) -> List[str]:
-        """Extract text from a DOCX file"""
-        doc = Document(docx_path)
-        return [para.text.strip() for para in doc.paragraphs if para.text.strip()]
-
-    def extract_text_from_pptx(self, pptx_path: str) -> List[str]:
-        """Extract text from a PPTX file"""
-        ppt = Presentation(pptx_path)
-        slides_text = []
-        for slide in ppt.slides:
-            text = " ".join([shape.text for shape in slide.shapes if hasattr(shape, "text")])
-            if text.strip():
-                slides_text.append(text.strip())
-        return slides_text
-
-    def extract_text_from_html(self, html_path: str) -> List[str]:
-        """Extract text from an HTML file"""
-        with open(html_path, "r", encoding="utf-8") as file:
-            soup = BeautifulSoup(file, "html.parser")
-        return [text.strip() for text in soup.stripped_strings if text.strip()]
-
     def process_document(self, file_path: str):
         """Process document and create searchable index with metadata"""
         print(f"📄 Processing document: {file_path}")
         start_time = time.time()
         file_ext = Path(file_path).suffix.lower()
 
-        if file_ext == ".pdf":
-            result = self.converter.convert(file_path)
-            doc = result.document
-            chunker = HybridChunker(tokenizer="jinaai/jina-embeddings-v3")
-            chunks = list(chunker.chunk(doc))
-
-            processed_chunks = []
-            for chunk in chunks:
-                metadata = self.extract_chunk_metadata(chunk)
-                processed_chunks.append(metadata)
-
-        elif file_ext == ".docx":
-            texts = self.extract_text_from_docx(file_path)
-            processed_chunks = [{"text": text, "headings": [], "content_type": "DOCX"} for text in texts]
-
-        elif file_ext == ".pptx":
-            texts = self.extract_text_from_pptx(file_path)
-            processed_chunks = [{"text": text, "headings": [], "content_type": "PPTX"} for text in texts]
-
-        elif file_ext == ".html":
-            texts = self.extract_text_from_html(file_path)
-            processed_chunks = [{"text": text, "headings": [], "content_type": "HTML"} for text in texts]
-
-        else:
-            print(f"❌ Unsupported file format: {file_ext}")
-            return None
+        try:
+            conv_result = self.converter.convert(file_path)
+            doc: DoclingDocument = conv_result.document
+        except Exception as e:
+            print(f"❌ Conversion failed: {e}")
+            return None
+
+        chunker = HierarchicalChunker()
+        chunks = list(chunker.chunk(doc))
+
+        processed_chunks = []
+        for chunk in chunks:
+            # doc items and headings live on chunk.meta, not on the chunk itself
+            metadata = {
+                "text": chunk.text.strip(),
+                "headings": chunk.meta.headings or [],
+                "content_type": str(chunk.meta.doc_items[0].label) if chunk.meta.doc_items else "Unknown",
+            }
+            processed_chunks.append(metadata)
 
         print("✅ Chunking completed. Creating vector database...")
         collection = self.client.get_or_create_collection(name="document_chunks")
 
-        documents = []
-        embeddings = []
-        metadata_list = []
-        ids = []
-
+        documents, embeddings, metadata_list, ids = [], [], [], []
         for idx, chunk in enumerate(processed_chunks):
             text = chunk.get('text', '').strip()
             if not text:
-                print(f"⚠️ Skipping empty chunk at index {idx}")
-                continue  # Skip empty chunks
+                continue
 
-            embedding = self.embed_model.embed_documents([text])[0]  # ✅ Corrected method
+            embedding = self.embed_model.embed_documents([text])[0]
             documents.append(text)
             embeddings.append(embedding)
             metadata_list.append({
@@ -168,6 +103,5 @@ class DocumentProcessor:
         )
         print(f"✅ Successfully added {len(documents)} chunks to the database.")
 
-        processing_time = time.time() - start_time
-        print(f"✅ Document processing completed in {processing_time:.2f} seconds")
+        print(f"✅ Document processing completed in {time.time() - start_time:.2f} seconds")
         return collection
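
For quick verification, a minimal driver sketch for the updated class. This is not part of the commit: the module path utils.ingestion simply mirrors the file location in this repo, sample.pdf is a placeholder path, and the script assumes the repo root is on PYTHONPATH.

# Hypothetical smoke test for the updated DocumentProcessor.
from utils.ingestion import DocumentProcessor

processor = DocumentProcessor()
collection = processor.process_document("sample.pdf")  # placeholder file; any allowed format should work

if collection is not None:
    # chromadb collections expose count() for a quick sanity check
    print(f"Collection holds {collection.count()} chunks")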