""" CPU에 최적화된 문서 처리 모듈 - 병렬 처리 적용 """ import os import time from typing import List, Dict, Any, Optional from langchain.schema import Document from concurrent.futures import ThreadPoolExecutor # 멀티프로세싱 가져오기 import multiprocessing try: CPU_COUNT = multiprocessing.cpu_count() except: CPU_COUNT = 4 print(f"CPU 코어 수: {CPU_COUNT}") # docling 라이브러리 존재 여부 확인 try: from docling.datamodel.base_models import InputFormat from docling.document_converter import DocumentConverter, PdfFormatOption from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode from docling.chunking import HybridChunker DOCLING_AVAILABLE = True print("docling 라이브러리 사용 가능") except ImportError: print("docling 라이브러리를 찾을 수 없습니다. PyPDFLoader만 사용합니다.") DOCLING_AVAILABLE = False # LangChain 문서 로더 from langchain_community.document_loaders import PyPDFLoader from langchain.text_splitter import RecursiveCharacterTextSplitter class OptimizedDocumentProcessor: """ CPU에 최적화된 병렬 처리 문서 처리 클래스 """ def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200, tokenizer: str = "Alibaba-NLP/gte-multilingual-base", # 올바른 모델 경로로 수정 max_workers: int = CPU_COUNT): """ 문서 처리기 초기화 Args: chunk_size: 텍스트 청크 크기 chunk_overlap: 청크 간 겹침 크기 tokenizer: HybridChunker에서 사용할 토크나이저 max_workers: 병렬 처리시 최대 작업자 수 """ self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap self.tokenizer = tokenizer self.max_workers = max(1, min(max_workers, CPU_COUNT)) # CPU 코어 수 초과하지 않도록 print(f"병렬 처리 작업자 수: {self.max_workers}") # LangChain 텍스트 스플리터 self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, separators=["\n\n", "\n", ". ", " ", ""], ) # docling 관련 컴포넌트 초기화 if DOCLING_AVAILABLE: # 파이프라인 옵션 설정 self.pipeline_options = PdfPipelineOptions(do_table_structure=True) self.pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE # 문서 변환기 초기화 self.doc_converter = DocumentConverter( format_options={ InputFormat.PDF: PdfFormatOption(pipeline_options=self.pipeline_options) } ) # HybridChunker 초기화 (trust_remote_code=True 추가) self.hybrid_chunker = HybridChunker( tokenizer=tokenizer, chunk_size=chunk_size, overlap=chunk_overlap, tokenizer_kwargs={"trust_remote_code": True} # 원격 코드 실행 허용 ) print(f"docling 초기화 완료: HybridChunker(청크 크기={chunk_size}, 오버랩={chunk_overlap})") def process_with_docling(self, pdf_path: str) -> Dict[str, Any]: """ docling을 사용하여 PDF 문서 처리 Args: pdf_path: PDF 파일 경로 Returns: 처리된 문서 데이터 """ if not DOCLING_AVAILABLE: raise ImportError("docling 라이브러리가 설치되지 않았습니다.") try: start_time = time.time() # 문서 변환 conv_res = self.doc_converter.convert(pdf_path) doc = conv_res.document # 성능 측정 conversion_time = time.time() - start_time print(f"PDF 변환 시간: {conversion_time:.2f}초") # 메타데이터 추출 metadata = { "source": pdf_path, "title": os.path.basename(pdf_path), "processing_time": conversion_time } return { "content": doc.export_to_markdown(), "metadata": metadata, "raw_document": doc, } except Exception as e: print(f"docling으로 문서 처리 중 오류 발생: {e}") raise def chunk_with_hybrid_chunker(self, doc: Any) -> List[Dict[str, Any]]: """ HybridChunker를 사용하여 문서를 청크로 분할 Args: doc: docling 문서 객체 Returns: 청크 리스트 """ start_time = time.time() # 청킹 수행 chunk_iter = self.hybrid_chunker.chunk(doc) chunks = list(chunk_iter) chunking_time = time.time() - start_time print(f"청킹 시간: {chunking_time:.2f}초 (청크 수: {len(chunks)})") return chunks def create_langchain_documents_from_chunks(self, chunks: List[Dict[str, Any]], metadata: Dict[str, Any]) -> List[Document]: """ docling 청크를 LangChain Document 객체로 변환 Args: chunks: docling HybridChunker로 생성한 청크 리스트 metadata: 문서 메타데이터 Returns: LangChain Document 객체 리스트 """ documents = [] for i, chunk in enumerate(chunks): # 각 청크에 대한 메타데이터 chunk_metadata = metadata.copy() chunk_metadata["chunk_id"] = i # 청크 내용 추출 if hasattr(chunk, "text"): content = chunk.text elif hasattr(chunk, "content"): content = chunk.content else: content = str(chunk) document = Document( page_content=content, metadata=chunk_metadata ) documents.append(document) return documents def process_with_langchain(self, pdf_path: str) -> List[Document]: """ LangChain의 PyPDFLoader를 사용하여 PDF 문서 로드 Args: pdf_path: PDF 파일 경로 Returns: LangChain Document 객체 리스트 """ start_time = time.time() try: loader = PyPDFLoader(pdf_path) documents = loader.load() processing_time = time.time() - start_time print(f"PyPDFLoader 처리 시간: {processing_time:.2f}초") return documents except Exception as e: print(f"PyPDFLoader로 문서 처리 중 오류 발생: {e}") raise def process_pdf(self, pdf_path: str, use_docling: bool = True) -> List[Document]: """ PDF 파일 처리 Args: pdf_path: PDF 파일 경로 use_docling: docling 사용 여부 Returns: 처리된 문서의 청크 리스트 """ total_start_time = time.time() # docling 사용 가능 여부 확인 can_use_docling = use_docling and DOCLING_AVAILABLE if can_use_docling: try: # 1. docling으로 PDF 처리 docling_result = self.process_with_docling(pdf_path) doc = docling_result["raw_document"] metadata = docling_result["metadata"] # 2. HybridChunker로 청크 생성 chunks = self.chunk_with_hybrid_chunker(doc) # 3. 청크를 LangChain Document로 변환 documents = self.create_langchain_documents_from_chunks(chunks, metadata) total_time = time.time() - total_start_time print(f"docling 처리 완료: '{pdf_path}', {len(documents)} 청크, 총 {total_time:.2f}초") return documents except Exception as e: print(f"docling 처리 실패, PyPDFLoader로 대체: {e}") can_use_docling = False if not can_use_docling: # PyPDFLoader로 처리 (대체 방안) documents = self.process_with_langchain(pdf_path) chunks = self.text_splitter.split_documents(documents) total_time = time.time() - total_start_time print(f"PyPDFLoader 처리 완료: '{pdf_path}', {len(chunks)} 청크, 총 {total_time:.2f}초") return chunks def process_directory_parallel(self, directory: str, use_docling: bool = True) -> List[Document]: """ 디렉토리 내 모든 PDF 파일 병렬 처리 (멀티스레딩) Args: directory: PDF 파일 디렉토리 경로 use_docling: docling 사용 여부 Returns: 처리된 모든 문서의 청크 리스트 """ all_documents = [] pdf_files = [] # PDF 파일 목록 수집 for file in os.listdir(directory): if file.endswith(".pdf"): pdf_path = os.path.join(directory, file) pdf_files.append(pdf_path) if not pdf_files: print(f"'{directory}' 디렉토리에 PDF 파일이 없습니다.") return [] print(f"총 {len(pdf_files)}개 PDF 파일 병렬 처리 시작 (최대 {self.max_workers} 작업자)") start_time = time.time() # 병렬 처리 실행 with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # 각 PDF 파일에 대해 process_pdf 함수 병렬 실행 future_to_pdf = {executor.submit(self.process_pdf, pdf_path, use_docling): pdf_path for pdf_path in pdf_files} # 결과 수집 for future in future_to_pdf: pdf_path = future_to_pdf[future] try: # 결과 가져오기 chunks = future.result() all_documents.extend(chunks) print(f"'{os.path.basename(pdf_path)}' 처리 완료: {len(chunks)} 청크") except Exception as e: print(f"'{pdf_path}' 처리 중 오류 발생: {e}") total_time = time.time() - start_time print(f"병렬 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초") return all_documents def process_directory(self, directory: str, use_docling: bool = True, parallel: bool = True) -> List[Document]: """ 디렉토리 내 모든 PDF 파일 처리 Args: directory: PDF 파일 디렉토리 경로 use_docling: docling 사용 여부 parallel: 병렬 처리 사용 여부 Returns: 처리된 모든 문서의 청크 리스트 """ # 병렬 처리 사용 if parallel: return self.process_directory_parallel(directory, use_docling) # 순차 처리 all_documents = [] start_time = time.time() for file in os.listdir(directory): if file.endswith(".pdf"): pdf_path = os.path.join(directory, file) print(f"처리 중: {pdf_path}") try: chunks = self.process_pdf(pdf_path, use_docling=use_docling) all_documents.extend(chunks) except Exception as e: print(f"'{pdf_path}' 처리 중 오류 발생: {e}") total_time = time.time() - start_time print(f"순차 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초") return all_documents