File size: 11,916 Bytes
14586a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
"""
CPU에 최적화된 문서 처리 모듈 - 병렬 처리 적용
"""
import os
import time
from typing import List, Dict, Any, Optional
from langchain.schema import Document
from concurrent.futures import ThreadPoolExecutor

# 멀티프로세싱 가져오기
import multiprocessing

try:
    CPU_COUNT = multiprocessing.cpu_count()
except:
    CPU_COUNT = 4

print(f"CPU 코어 수: {CPU_COUNT}")

# docling 라이브러리 존재 여부 확인
try:
    from docling.datamodel.base_models import InputFormat
    from docling.document_converter import DocumentConverter, PdfFormatOption
    from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode
    from docling.chunking import HybridChunker

    DOCLING_AVAILABLE = True
    print("docling 라이브러리 사용 가능")
except ImportError:
    print("docling 라이브러리를 찾을 수 없습니다. PyPDFLoader만 사용합니다.")
    DOCLING_AVAILABLE = False

# LangChain 문서 로더
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter


class OptimizedDocumentProcessor:
    """
    CPU에 최적화된 병렬 처리 문서 처리 클래스
    """

    def __init__(self,
                 chunk_size: int = 1000,
                 chunk_overlap: int = 200,
                 tokenizer: str = "Alibaba-NLP/gte-multilingual-base",  # 올바른 모델 경로로 수정
                 max_workers: int = CPU_COUNT):
        """
        문서 처리기 초기화

        Args:
            chunk_size: 텍스트 청크 크기
            chunk_overlap: 청크 간 겹침 크기
            tokenizer: HybridChunker에서 사용할 토크나이저
            max_workers: 병렬 처리시 최대 작업자 수
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.tokenizer = tokenizer
        self.max_workers = max(1, min(max_workers, CPU_COUNT))  # CPU 코어 수 초과하지 않도록

        print(f"병렬 처리 작업자 수: {self.max_workers}")

        # LangChain 텍스트 스플리터
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""],
        )

        # docling 관련 컴포넌트 초기화
        if DOCLING_AVAILABLE:
            # 파이프라인 옵션 설정
            self.pipeline_options = PdfPipelineOptions(do_table_structure=True)
            self.pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE

            # 문서 변환기 초기화
            self.doc_converter = DocumentConverter(
                format_options={
                    InputFormat.PDF: PdfFormatOption(pipeline_options=self.pipeline_options)
                }
            )

            # HybridChunker 초기화 (trust_remote_code=True 추가)
            self.hybrid_chunker = HybridChunker(
                tokenizer=tokenizer,
                chunk_size=chunk_size,
                overlap=chunk_overlap,
                tokenizer_kwargs={"trust_remote_code": True}  # 원격 코드 실행 허용
            )

            print(f"docling 초기화 완료: HybridChunker(청크 크기={chunk_size}, 오버랩={chunk_overlap})")

    def process_with_docling(self, pdf_path: str) -> Dict[str, Any]:
        """
        docling을 사용하여 PDF 문서 처리

        Args:
            pdf_path: PDF 파일 경로

        Returns:
            처리된 문서 데이터
        """
        if not DOCLING_AVAILABLE:
            raise ImportError("docling 라이브러리가 설치되지 않았습니다.")

        try:
            start_time = time.time()

            # 문서 변환
            conv_res = self.doc_converter.convert(pdf_path)
            doc = conv_res.document

            # 성능 측정
            conversion_time = time.time() - start_time
            print(f"PDF 변환 시간: {conversion_time:.2f}초")

            # 메타데이터 추출
            metadata = {
                "source": pdf_path,
                "title": os.path.basename(pdf_path),
                "processing_time": conversion_time
            }

            return {
                "content": doc.export_to_markdown(),
                "metadata": metadata,
                "raw_document": doc,
            }

        except Exception as e:
            print(f"docling으로 문서 처리 중 오류 발생: {e}")
            raise

    def chunk_with_hybrid_chunker(self, doc: Any) -> List[Dict[str, Any]]:
        """
        HybridChunker를 사용하여 문서를 청크로 분할

        Args:
            doc: docling 문서 객체

        Returns:
            청크 리스트
        """
        start_time = time.time()

        # 청킹 수행
        chunk_iter = self.hybrid_chunker.chunk(doc)
        chunks = list(chunk_iter)

        chunking_time = time.time() - start_time
        print(f"청킹 시간: {chunking_time:.2f}초 (청크 수: {len(chunks)})")

        return chunks

    def create_langchain_documents_from_chunks(self,
                                               chunks: List[Dict[str, Any]],
                                               metadata: Dict[str, Any]) -> List[Document]:
        """
        docling 청크를 LangChain Document 객체로 변환

        Args:
            chunks: docling HybridChunker로 생성한 청크 리스트
            metadata: 문서 메타데이터

        Returns:
            LangChain Document 객체 리스트
        """
        documents = []

        for i, chunk in enumerate(chunks):
            # 각 청크에 대한 메타데이터
            chunk_metadata = metadata.copy()
            chunk_metadata["chunk_id"] = i

            # 청크 내용 추출
            if hasattr(chunk, "text"):
                content = chunk.text
            elif hasattr(chunk, "content"):
                content = chunk.content
            else:
                content = str(chunk)

            document = Document(
                page_content=content,
                metadata=chunk_metadata
            )
            documents.append(document)

        return documents

    def process_with_langchain(self, pdf_path: str) -> List[Document]:
        """
        LangChain의 PyPDFLoader를 사용하여 PDF 문서 로드

        Args:
            pdf_path: PDF 파일 경로

        Returns:
            LangChain Document 객체 리스트
        """
        start_time = time.time()

        try:
            loader = PyPDFLoader(pdf_path)
            documents = loader.load()

            processing_time = time.time() - start_time
            print(f"PyPDFLoader 처리 시간: {processing_time:.2f}초")

            return documents
        except Exception as e:
            print(f"PyPDFLoader로 문서 처리 중 오류 발생: {e}")
            raise

    def process_pdf(self, pdf_path: str, use_docling: bool = True) -> List[Document]:
        """
        PDF 파일 처리

        Args:
            pdf_path: PDF 파일 경로
            use_docling: docling 사용 여부

        Returns:
            처리된 문서의 청크 리스트
        """
        total_start_time = time.time()

        # docling 사용 가능 여부 확인
        can_use_docling = use_docling and DOCLING_AVAILABLE

        if can_use_docling:
            try:
                # 1. docling으로 PDF 처리
                docling_result = self.process_with_docling(pdf_path)
                doc = docling_result["raw_document"]
                metadata = docling_result["metadata"]

                # 2. HybridChunker로 청크 생성
                chunks = self.chunk_with_hybrid_chunker(doc)

                # 3. 청크를 LangChain Document로 변환
                documents = self.create_langchain_documents_from_chunks(chunks, metadata)

                total_time = time.time() - total_start_time
                print(f"docling 처리 완료: '{pdf_path}', {len(documents)} 청크, 총 {total_time:.2f}초")

                return documents
            except Exception as e:
                print(f"docling 처리 실패, PyPDFLoader로 대체: {e}")
                can_use_docling = False

        if not can_use_docling:
            # PyPDFLoader로 처리 (대체 방안)
            documents = self.process_with_langchain(pdf_path)
            chunks = self.text_splitter.split_documents(documents)

            total_time = time.time() - total_start_time
            print(f"PyPDFLoader 처리 완료: '{pdf_path}', {len(chunks)} 청크, 총 {total_time:.2f}초")

            return chunks

    def process_directory_parallel(self, directory: str, use_docling: bool = True) -> List[Document]:
        """
        디렉토리 내 모든 PDF 파일 병렬 처리 (멀티스레딩)

        Args:
            directory: PDF 파일 디렉토리 경로
            use_docling: docling 사용 여부

        Returns:
            처리된 모든 문서의 청크 리스트
        """
        all_documents = []
        pdf_files = []

        # PDF 파일 목록 수집
        for file in os.listdir(directory):
            if file.endswith(".pdf"):
                pdf_path = os.path.join(directory, file)
                pdf_files.append(pdf_path)

        if not pdf_files:
            print(f"'{directory}' 디렉토리에 PDF 파일이 없습니다.")
            return []

        print(f"총 {len(pdf_files)}개 PDF 파일 병렬 처리 시작 (최대 {self.max_workers} 작업자)")
        start_time = time.time()

        # 병렬 처리 실행
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # 각 PDF 파일에 대해 process_pdf 함수 병렬 실행
            future_to_pdf = {executor.submit(self.process_pdf, pdf_path, use_docling): pdf_path
                             for pdf_path in pdf_files}

            # 결과 수집
            for future in future_to_pdf:
                pdf_path = future_to_pdf[future]
                try:
                    # 결과 가져오기
                    chunks = future.result()
                    all_documents.extend(chunks)
                    print(f"'{os.path.basename(pdf_path)}' 처리 완료: {len(chunks)} 청크")
                except Exception as e:
                    print(f"'{pdf_path}' 처리 중 오류 발생: {e}")

        total_time = time.time() - start_time
        print(f"병렬 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초")

        return all_documents

    def process_directory(self, directory: str, use_docling: bool = True, parallel: bool = True) -> List[Document]:
        """
        디렉토리 내 모든 PDF 파일 처리

        Args:
            directory: PDF 파일 디렉토리 경로
            use_docling: docling 사용 여부
            parallel: 병렬 처리 사용 여부

        Returns:
            처리된 모든 문서의 청크 리스트
        """
        # 병렬 처리 사용
        if parallel:
            return self.process_directory_parallel(directory, use_docling)

        # 순차 처리
        all_documents = []
        start_time = time.time()

        for file in os.listdir(directory):
            if file.endswith(".pdf"):
                pdf_path = os.path.join(directory, file)
                print(f"처리 중: {pdf_path}")

                try:
                    chunks = self.process_pdf(pdf_path, use_docling=use_docling)
                    all_documents.extend(chunks)
                except Exception as e:
                    print(f"'{pdf_path}' 처리 중 오류 발생: {e}")

        total_time = time.time() - start_time
        print(f"순차 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초")

        return all_documents