Ronochieng committed (verified)
Commit 2e83ef9 · Parent: f0d60d1

Update Ingestion/ingest.py

Files changed (1): Ingestion/ingest.py (+110 −107)

Ingestion/ingest.py CHANGED
@@ -4,18 +4,22 @@ import pandas as pd
 import tempfile
 from typing import Dict, Any, Optional, List
 
-# Import unstructured components for different file types
-from unstructured.partition.auto import partition
-from unstructured.partition.pdf import partition_pdf
-from unstructured.partition.docx import partition_docx
-from unstructured.partition.pptx import partition_pptx
-from unstructured.partition.xlsx import partition_xlsx
-from unstructured.partition.md import partition_md
-from unstructured.partition.html import partition_html
-from unstructured.partition.xml import partition_xml
-from unstructured.partition.email import partition_email
-from unstructured.partition.text import partition_text
-from unstructured.partition.epub import partition_epub
+# Import Langchain document loaders
+from langchain_community.document_loaders import (
+    PyMuPDFLoader,
+    UnstructuredWordDocumentLoader,
+    UnstructuredPowerPointLoader,
+    UnstructuredExcelLoader,
+    UnstructuredMarkdownLoader,
+    UnstructuredHTMLLoader,
+    UnstructuredXMLLoader,
+    UnstructuredEmailLoader,
+    UnstructuredFileLoader,
+    UnstructuredEPubLoader,
+    CSVLoader,
+    TextLoader
+)
+import pymupdf4llm  # used directly by process_pdf below
 
 def get_processor_for_file(file_path: str) -> Optional[callable]:
     """
@@ -23,7 +26,7 @@ def get_processor_for_file(file_path: str) -> Optional[callable]:
     """
     file_extension = os.path.splitext(file_path)[1].lower()
 
-    # Map file extensions to specific partition functions
+    # Map file extensions to specific processor functions
     processors = {
         ".pdf": process_pdf,
         ".docx": process_docx,
@@ -40,7 +43,7 @@ def get_processor_for_file(file_path: str) -> Optional[callable]:
         ".eml": process_email,
         ".epub": process_epub,
         ".txt": process_text,
-        ".csv": process_text,
+        ".csv": process_csv,
         ".rtf": process_text,
 
         # Code files
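Per its Optional[callable] signature, get_processor_for_file hands back the bound processor function, or None for unrecognized extensions, so callers can fall back to process_generic. A hypothetical lookup:

    processor = get_processor_for_file("report.docx")  # hypothetical input file
    text = processor("report.docx") if processor else process_generic("report.docx")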
@@ -75,183 +78,183 @@ def process_document(file_path: str) -> Optional[str]:
 
 def process_pdf(file_path: str) -> str:
     """
-    Process PDF documents using unstructured
+    Process PDF documents using pymupdf4llm for better PDF handling
     """
-    temp_dir = tempfile.mkdtemp()
-
-    try:
-        # Try hi_res mode first with OCR capabilities
-        elements = partition_pdf(
-            filename=file_path,
-            strategy="hi_res",
-            extract_images_in_pdf=True,
-            extract_image_block_types=["Image", "Table"],
-            extract_image_block_to_payload=False,
-            extract_image_block_output_dir=temp_dir,
-            hi_res_model_name="yolox",
-            infer_table_structure=True,
-            chunking_strategy="by_title",
-            max_characters=4000,
-            new_after_n_chars=3800,
-            combine_text_under_n_chars=2000,
-        )
-    except Exception as e:
-        # Fall back to fast mode if hi_res fails
-        elements = partition_pdf(
-            filename=file_path,
-            strategy="fast",
-            chunking_strategy="by_title",
-            max_characters=4000,
-            new_after_n_chars=3800,
-            combine_text_under_n_chars=2000,
-        )
-
-    # Extract text from elements
-    texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
-    combined_text = "\n\n".join(texts)
-
-    return combined_text
+    # For PDFs, use pymupdf4llm, which handles tables and images better.
+    # Its public entry point is to_markdown(); it renders the whole
+    # document (text plus tables) as a single Markdown string.
+    return pymupdf4llm.to_markdown(file_path)
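pymupdf4llm can also return the same Markdown as one chunk per page, which fits a chunked-indexing pipeline; a sketch using its page_chunks flag (the file name is hypothetical):

    import pymupdf4llm

    # One dict per page; each dict carries the page's Markdown under "text"
    pages = pymupdf4llm.to_markdown("example.pdf", page_chunks=True)
    page_texts = [page["text"] for page in pages]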
 
 def process_docx(file_path: str) -> str:
     """
-    Process DOCX documents using unstructured
+    Process DOCX documents using Langchain's UnstructuredWordDocumentLoader
     """
-    elements = partition_docx(
-        filename=file_path,
-        chunking_strategy="by_title",
-        max_characters=4000,
-        new_after_n_chars=3800,
-        combine_text_under_n_chars=2000,
-    )
+    loader = UnstructuredWordDocumentLoader(file_path)
+    docs = loader.load()
 
-    texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
+    texts = [doc.page_content for doc in docs if doc.page_content]
     combined_text = "\n\n".join(texts)
 
     return combined_text
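process_docx and the loader-based functions after it all reduce to the same load-and-join pattern; a shared helper would remove the duplication (a sketch; the helper name is hypothetical):

    def _load_and_join(loader) -> str:
        # Load Documents, keep non-empty text, join with blank lines
        docs = loader.load()
        texts = [doc.page_content for doc in docs if doc.page_content]
        return "\n\n".join(texts)

    # process_docx would then collapse to:
    # return _load_and_join(UnstructuredWordDocumentLoader(file_path))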
 
 def process_pptx(file_path: str) -> str:
     """
-    Process PPTX documents using unstructured
+    Process PPTX documents using Langchain's UnstructuredPowerPointLoader
     """
-    elements = partition_pptx(
-        filename=file_path,
-    )
+    loader = UnstructuredPowerPointLoader(file_path)
+    docs = loader.load()
 
-    texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
+    texts = [doc.page_content for doc in docs if doc.page_content]
     combined_text = "\n\n".join(texts)
 
     return combined_text
 
 def process_xlsx(file_path: str) -> str:
     """
-    Process XLSX documents using unstructured
+    Process XLSX documents using Langchain's UnstructuredExcelLoader
    """
-    elements = partition_xlsx(
-        filename=file_path,
-    )
+    loader = UnstructuredExcelLoader(file_path)
+    docs = loader.load()
 
-    texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
+    texts = [doc.page_content for doc in docs if doc.page_content]
     combined_text = "\n\n".join(texts)
 
     return combined_text
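UnstructuredExcelLoader defaults to one text block per sheet; when table structure matters downstream, its "elements" mode also keeps an HTML rendering of each table in metadata (a sketch, file name hypothetical):

    from langchain_community.document_loaders import UnstructuredExcelLoader

    loader = UnstructuredExcelLoader("example.xlsx", mode="elements")
    docs = loader.load()
    # Table elements carry an HTML copy of themselves in metadata
    tables = [d.metadata["text_as_html"] for d in docs if "text_as_html" in d.metadata]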
 
 def process_markdown(file_path: str) -> str:
     """
-    Process Markdown documents using unstructured
+    Process Markdown documents using Langchain's UnstructuredMarkdownLoader
     """
-    elements = partition_md(
-        filename=file_path,
-    )
+    loader = UnstructuredMarkdownLoader(file_path)
+    docs = loader.load()
 
-    texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
+    texts = [doc.page_content for doc in docs if doc.page_content]
     combined_text = "\n\n".join(texts)
 
     return combined_text
 
 def process_html(file_path: str) -> str:
     """
-    Process HTML documents using unstructured
+    Process HTML documents using Langchain's UnstructuredHTMLLoader
     """
-    elements = partition_html(
-        filename=file_path,
-    )
+    loader = UnstructuredHTMLLoader(file_path)
+    docs = loader.load()
 
-    texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
+    texts = [doc.page_content for doc in docs if doc.page_content]
     combined_text = "\n\n".join(texts)
 
     return combined_text
 
 def process_xml(file_path: str) -> str:
     """
-    Process XML documents using unstructured
+    Process XML documents using Langchain's UnstructuredXMLLoader
     """
-    elements = partition_xml(
-        filename=file_path,
-    )
+    loader = UnstructuredXMLLoader(file_path)
+    docs = loader.load()
 
-    texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
+    texts = [doc.page_content for doc in docs if doc.page_content]
     combined_text = "\n\n".join(texts)
 
     return combined_text
 
 def process_email(file_path: str) -> str:
     """
-    Process email documents using unstructured
+    Process email documents using Langchain's UnstructuredEmailLoader
     """
-    elements = partition_email(
-        filename=file_path,
-    )
+    loader = UnstructuredEmailLoader(file_path)
+    docs = loader.load()
 
-    texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
+    texts = [doc.page_content for doc in docs if doc.page_content]
     combined_text = "\n\n".join(texts)
 
     return combined_text
 
 def process_text(file_path: str) -> str:
     """
-    Process text documents using unstructured
+    Process text documents using Langchain's TextLoader
     """
-    elements = partition_text(
-        filename=file_path,
-        chunking_strategy="by_title",
-        max_characters=4000,
-        new_after_n_chars=3800,
-        combine_text_under_n_chars=2000,
-    )
-
-    texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
-    combined_text = "\n\n".join(texts)
-
-    return combined_text
+    loader = TextLoader(file_path, encoding="utf-8")
+    try:
+        docs = loader.load()
+
+        texts = [doc.page_content for doc in docs if doc.page_content]
+        combined_text = "\n\n".join(texts)
+
+        return combined_text
+    except UnicodeDecodeError:
+        # Try with a different encoding if utf-8 fails
+        loader = TextLoader(file_path, encoding="latin-1")
+        docs = loader.load()
+
+        texts = [doc.page_content for doc in docs if doc.page_content]
+        combined_text = "\n\n".join(texts)
+
+        return combined_text
+
+def process_csv(file_path: str) -> str:
+    """
+    Process CSV documents using Langchain's CSVLoader
+    """
+    loader = CSVLoader(file_path)
+    docs = loader.load()
+
+    # Create a formatted string representation of the CSV data
+    rows = []
+    if docs:
+        # Get column names from metadata if available
+        if hasattr(docs[0], 'metadata') and 'columns' in docs[0].metadata:
+            rows.append(",".join(docs[0].metadata['columns']))
+
+        # Add content rows
+        for doc in docs:
+            rows.append(doc.page_content)
+
+    return "\n".join(rows)
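Two notes on these paths: TextLoader can probe encodings itself via autodetect_encoding, which would subsume the hard-coded latin-1 fallback, and CSVLoader emits one Document per row with the row index in its metadata. A sketch of both (file names hypothetical):

    from langchain_community.document_loaders import TextLoader, CSVLoader

    # Let TextLoader try detected encodings instead of a fixed fallback
    docs = TextLoader("notes.txt", autodetect_encoding=True).load()

    # CSVLoader yields one Document per row; metadata records source and row
    csv_docs = CSVLoader("data.csv").load()
    print(csv_docs[0].metadata["row"], csv_docs[0].page_content)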
 
 def process_epub(file_path: str) -> str:
     """
-    Process EPUB documents using unstructured
+    Process EPUB documents using Langchain's UnstructuredEPubLoader
     """
-    elements = partition_epub(
-        filename=file_path,
-    )
+    loader = UnstructuredEPubLoader(file_path)
+    docs = loader.load()
 
-    texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
+    texts = [doc.page_content for doc in docs if doc.page_content]
     combined_text = "\n\n".join(texts)
 
     return combined_text
 
 def process_generic(file_path: str) -> str:
     """
-    Generic document processor using unstructured's auto partitioning
+    Generic document processor using Langchain's UnstructuredFileLoader
     """
     try:
-        elements = partition(
-            filename=file_path,
-        )
-
-        texts = [element.text for element in elements if hasattr(element, 'text') and element.text]
+        loader = UnstructuredFileLoader(file_path)
+        docs = loader.load()
+
+        texts = [doc.page_content for doc in docs if doc.page_content]
         combined_text = "\n\n".join(texts)
 
         return combined_text
     except Exception as e:
-        # Fall back to basic text processing if auto-partition fails
+        # Fall back to basic text processing if UnstructuredFileLoader fails
         try:
             with open(file_path, 'r', encoding='utf-8') as f:
                 return f.read()
 
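A minimal driver for the module as a whole, assuming process_document dispatches through the extension map above (the sample path is hypothetical):

    if __name__ == "__main__":
        import sys
        path = sys.argv[1]  # e.g. "docs/report.pdf"
        text = process_document(path)
        if text:
            print(f"Extracted {len(text)} characters from {path}")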