|
""" |
|
CPU에 최적화된 문서 처리 모듈 - 병렬 처리 적용 |
|
""" |
|
import os |
|
import time |
|
from typing import List, Dict, Any, Optional |
|
from langchain.schema import Document |
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
|
import multiprocessing |
|
|
|
try: |
|
CPU_COUNT = multiprocessing.cpu_count() |
|
except: |
|
CPU_COUNT = 4 |
|
|
|
print(f"CPU 코어 수: {CPU_COUNT}") |
|
|
|
|
|
try: |
|
from docling.datamodel.base_models import InputFormat |
|
from docling.document_converter import DocumentConverter, PdfFormatOption |
|
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode |
|
from docling.chunking import HybridChunker |
|
|
|
DOCLING_AVAILABLE = True |
|
print("docling 라이브러리 사용 가능") |
|
except ImportError: |
|
print("docling 라이브러리를 찾을 수 없습니다. PyPDFLoader만 사용합니다.") |
|
DOCLING_AVAILABLE = False |
|
|
|
|
|
from langchain_community.document_loaders import PyPDFLoader |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
|
|
|
class OptimizedDocumentProcessor: |
|
""" |
|
CPU에 최적화된 병렬 처리 문서 처리 클래스 |
|
""" |
|
|
|
def __init__(self, |
|
chunk_size: int = 1000, |
|
chunk_overlap: int = 200, |
|
tokenizer: str = "Alibaba-NLP/gte-multilingual-base", |
|
max_workers: int = CPU_COUNT): |
|
""" |
|
문서 처리기 초기화 |
|
|
|
Args: |
|
chunk_size: 텍스트 청크 크기 |
|
chunk_overlap: 청크 간 겹침 크기 |
|
tokenizer: HybridChunker에서 사용할 토크나이저 |
|
max_workers: 병렬 처리시 최대 작업자 수 |
|
""" |
|
self.chunk_size = chunk_size |
|
self.chunk_overlap = chunk_overlap |
|
self.tokenizer = tokenizer |
|
self.max_workers = max(1, min(max_workers, CPU_COUNT)) |
|
|
|
print(f"병렬 처리 작업자 수: {self.max_workers}") |
|
|
|
|
|
self.text_splitter = RecursiveCharacterTextSplitter( |
|
chunk_size=chunk_size, |
|
chunk_overlap=chunk_overlap, |
|
separators=["\n\n", "\n", ". ", " ", ""], |
|
) |
|
|
|
|
|
if DOCLING_AVAILABLE: |
|
|
|
self.pipeline_options = PdfPipelineOptions(do_table_structure=True) |
|
self.pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE |
|
|
|
|
|
self.doc_converter = DocumentConverter( |
|
format_options={ |
|
InputFormat.PDF: PdfFormatOption(pipeline_options=self.pipeline_options) |
|
} |
|
) |
|
|
|
|
|
self.hybrid_chunker = HybridChunker( |
|
tokenizer=tokenizer, |
|
chunk_size=chunk_size, |
|
overlap=chunk_overlap, |
|
tokenizer_kwargs={"trust_remote_code": True} |
|
) |
|
|
|
print(f"docling 초기화 완료: HybridChunker(청크 크기={chunk_size}, 오버랩={chunk_overlap})") |
|
|
|
def process_with_docling(self, pdf_path: str) -> Dict[str, Any]: |
|
""" |
|
docling을 사용하여 PDF 문서 처리 |
|
|
|
Args: |
|
pdf_path: PDF 파일 경로 |
|
|
|
Returns: |
|
처리된 문서 데이터 |
|
""" |
|
if not DOCLING_AVAILABLE: |
|
raise ImportError("docling 라이브러리가 설치되지 않았습니다.") |
|
|
|
try: |
|
start_time = time.time() |
|
|
|
|
|
conv_res = self.doc_converter.convert(pdf_path) |
|
doc = conv_res.document |
|
|
|
|
|
conversion_time = time.time() - start_time |
|
print(f"PDF 변환 시간: {conversion_time:.2f}초") |
|
|
|
|
|
metadata = { |
|
"source": pdf_path, |
|
"title": os.path.basename(pdf_path), |
|
"processing_time": conversion_time |
|
} |
|
|
|
return { |
|
"content": doc.export_to_markdown(), |
|
"metadata": metadata, |
|
"raw_document": doc, |
|
} |
|
|
|
except Exception as e: |
|
print(f"docling으로 문서 처리 중 오류 발생: {e}") |
|
raise |
|
|
|
def chunk_with_hybrid_chunker(self, doc: Any) -> List[Dict[str, Any]]: |
|
""" |
|
HybridChunker를 사용하여 문서를 청크로 분할 |
|
|
|
Args: |
|
doc: docling 문서 객체 |
|
|
|
Returns: |
|
청크 리스트 |
|
""" |
|
start_time = time.time() |
|
|
|
|
|
chunk_iter = self.hybrid_chunker.chunk(doc) |
|
chunks = list(chunk_iter) |
|
|
|
chunking_time = time.time() - start_time |
|
print(f"청킹 시간: {chunking_time:.2f}초 (청크 수: {len(chunks)})") |
|
|
|
return chunks |
|
|
|
def create_langchain_documents_from_chunks(self, |
|
chunks: List[Dict[str, Any]], |
|
metadata: Dict[str, Any]) -> List[Document]: |
|
""" |
|
docling 청크를 LangChain Document 객체로 변환 |
|
|
|
Args: |
|
chunks: docling HybridChunker로 생성한 청크 리스트 |
|
metadata: 문서 메타데이터 |
|
|
|
Returns: |
|
LangChain Document 객체 리스트 |
|
""" |
|
documents = [] |
|
|
|
for i, chunk in enumerate(chunks): |
|
|
|
chunk_metadata = metadata.copy() |
|
chunk_metadata["chunk_id"] = i |
|
|
|
|
|
if hasattr(chunk, "text"): |
|
content = chunk.text |
|
elif hasattr(chunk, "content"): |
|
content = chunk.content |
|
else: |
|
content = str(chunk) |
|
|
|
document = Document( |
|
page_content=content, |
|
metadata=chunk_metadata |
|
) |
|
documents.append(document) |
|
|
|
return documents |
|
|
|
def process_with_langchain(self, pdf_path: str) -> List[Document]: |
|
""" |
|
LangChain의 PyPDFLoader를 사용하여 PDF 문서 로드 |
|
|
|
Args: |
|
pdf_path: PDF 파일 경로 |
|
|
|
Returns: |
|
LangChain Document 객체 리스트 |
|
""" |
|
start_time = time.time() |
|
|
|
try: |
|
loader = PyPDFLoader(pdf_path) |
|
documents = loader.load() |
|
|
|
processing_time = time.time() - start_time |
|
print(f"PyPDFLoader 처리 시간: {processing_time:.2f}초") |
|
|
|
return documents |
|
except Exception as e: |
|
print(f"PyPDFLoader로 문서 처리 중 오류 발생: {e}") |
|
raise |
|
|
|
def process_pdf(self, pdf_path: str, use_docling: bool = True) -> List[Document]: |
|
""" |
|
PDF 파일 처리 |
|
|
|
Args: |
|
pdf_path: PDF 파일 경로 |
|
use_docling: docling 사용 여부 |
|
|
|
Returns: |
|
처리된 문서의 청크 리스트 |
|
""" |
|
total_start_time = time.time() |
|
|
|
|
|
can_use_docling = use_docling and DOCLING_AVAILABLE |
|
|
|
if can_use_docling: |
|
try: |
|
|
|
docling_result = self.process_with_docling(pdf_path) |
|
doc = docling_result["raw_document"] |
|
metadata = docling_result["metadata"] |
|
|
|
|
|
chunks = self.chunk_with_hybrid_chunker(doc) |
|
|
|
|
|
documents = self.create_langchain_documents_from_chunks(chunks, metadata) |
|
|
|
total_time = time.time() - total_start_time |
|
print(f"docling 처리 완료: '{pdf_path}', {len(documents)} 청크, 총 {total_time:.2f}초") |
|
|
|
return documents |
|
except Exception as e: |
|
print(f"docling 처리 실패, PyPDFLoader로 대체: {e}") |
|
can_use_docling = False |
|
|
|
if not can_use_docling: |
|
|
|
documents = self.process_with_langchain(pdf_path) |
|
chunks = self.text_splitter.split_documents(documents) |
|
|
|
total_time = time.time() - total_start_time |
|
print(f"PyPDFLoader 처리 완료: '{pdf_path}', {len(chunks)} 청크, 총 {total_time:.2f}초") |
|
|
|
return chunks |
|
|
|
def process_directory_parallel(self, directory: str, use_docling: bool = True) -> List[Document]: |
|
""" |
|
디렉토리 내 모든 PDF 파일 병렬 처리 (멀티스레딩) |
|
|
|
Args: |
|
directory: PDF 파일 디렉토리 경로 |
|
use_docling: docling 사용 여부 |
|
|
|
Returns: |
|
처리된 모든 문서의 청크 리스트 |
|
""" |
|
all_documents = [] |
|
pdf_files = [] |
|
|
|
|
|
for file in os.listdir(directory): |
|
if file.endswith(".pdf"): |
|
pdf_path = os.path.join(directory, file) |
|
pdf_files.append(pdf_path) |
|
|
|
if not pdf_files: |
|
print(f"'{directory}' 디렉토리에 PDF 파일이 없습니다.") |
|
return [] |
|
|
|
print(f"총 {len(pdf_files)}개 PDF 파일 병렬 처리 시작 (최대 {self.max_workers} 작업자)") |
|
start_time = time.time() |
|
|
|
|
|
with ThreadPoolExecutor(max_workers=self.max_workers) as executor: |
|
|
|
future_to_pdf = {executor.submit(self.process_pdf, pdf_path, use_docling): pdf_path |
|
for pdf_path in pdf_files} |
|
|
|
|
|
for future in future_to_pdf: |
|
pdf_path = future_to_pdf[future] |
|
try: |
|
|
|
chunks = future.result() |
|
all_documents.extend(chunks) |
|
print(f"'{os.path.basename(pdf_path)}' 처리 완료: {len(chunks)} 청크") |
|
except Exception as e: |
|
print(f"'{pdf_path}' 처리 중 오류 발생: {e}") |
|
|
|
total_time = time.time() - start_time |
|
print(f"병렬 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초") |
|
|
|
return all_documents |
|
|
|
def process_directory(self, directory: str, use_docling: bool = True, parallel: bool = True) -> List[Document]: |
|
""" |
|
디렉토리 내 모든 PDF 파일 처리 |
|
|
|
Args: |
|
directory: PDF 파일 디렉토리 경로 |
|
use_docling: docling 사용 여부 |
|
parallel: 병렬 처리 사용 여부 |
|
|
|
Returns: |
|
처리된 모든 문서의 청크 리스트 |
|
""" |
|
|
|
if parallel: |
|
return self.process_directory_parallel(directory, use_docling) |
|
|
|
|
|
all_documents = [] |
|
start_time = time.time() |
|
|
|
for file in os.listdir(directory): |
|
if file.endswith(".pdf"): |
|
pdf_path = os.path.join(directory, file) |
|
print(f"처리 중: {pdf_path}") |
|
|
|
try: |
|
chunks = self.process_pdf(pdf_path, use_docling=use_docling) |
|
all_documents.extend(chunks) |
|
except Exception as e: |
|
print(f"'{pdf_path}' 처리 중 오류 발생: {e}") |
|
|
|
total_time = time.time() - start_time |
|
print(f"순차 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초") |
|
|
|
return all_documents |