Spaces:
Paused
Paused
""" | |
CPU에 최적화된 문서 처리 모듈 - 병렬 처리 적용 | |
""" | |
import os | |
import time | |
from typing import List, Dict, Any, Optional | |
from langchain.schema import Document | |
from concurrent.futures import ThreadPoolExecutor | |
# 멀티프로세싱 가져오기 | |
import multiprocessing | |
try: | |
CPU_COUNT = multiprocessing.cpu_count() | |
except: | |
CPU_COUNT = 4 | |
print(f"CPU 코어 수: {CPU_COUNT}") | |
# docling 라이브러리 존재 여부 확인 | |
try: | |
from docling.datamodel.base_models import InputFormat | |
from docling.document_converter import DocumentConverter, PdfFormatOption | |
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode | |
from docling.chunking import HybridChunker | |
DOCLING_AVAILABLE = True | |
print("docling 라이브러리 사용 가능") | |
except ImportError: | |
print("docling 라이브러리를 찾을 수 없습니다. PyPDFLoader만 사용합니다.") | |
DOCLING_AVAILABLE = False | |
# LangChain 문서 로더 | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
class OptimizedDocumentProcessor: | |
""" | |
CPU에 최적화된 병렬 처리 문서 처리 클래스 | |
""" | |
def __init__(self, | |
chunk_size: int = 1000, | |
chunk_overlap: int = 200, | |
tokenizer: str = "Alibaba-NLP/gte-multilingual-base", # 올바른 모델 경로로 수정 | |
max_workers: int = CPU_COUNT): | |
""" | |
문서 처리기 초기화 | |
Args: | |
chunk_size: 텍스트 청크 크기 | |
chunk_overlap: 청크 간 겹침 크기 | |
tokenizer: HybridChunker에서 사용할 토크나이저 | |
max_workers: 병렬 처리시 최대 작업자 수 | |
""" | |
self.chunk_size = chunk_size | |
self.chunk_overlap = chunk_overlap | |
self.tokenizer = tokenizer | |
self.max_workers = max(1, min(max_workers, CPU_COUNT)) # CPU 코어 수 초과하지 않도록 | |
print(f"병렬 처리 작업자 수: {self.max_workers}") | |
# LangChain 텍스트 스플리터 | |
self.text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=chunk_size, | |
chunk_overlap=chunk_overlap, | |
separators=["\n\n", "\n", ". ", " ", ""], | |
) | |
# docling 관련 컴포넌트 초기화 | |
if DOCLING_AVAILABLE: | |
# 파이프라인 옵션 설정 | |
self.pipeline_options = PdfPipelineOptions(do_table_structure=True) | |
self.pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE | |
# 문서 변환기 초기화 | |
self.doc_converter = DocumentConverter( | |
format_options={ | |
InputFormat.PDF: PdfFormatOption(pipeline_options=self.pipeline_options) | |
} | |
) | |
# HybridChunker 초기화 (trust_remote_code=True 추가) | |
self.hybrid_chunker = HybridChunker( | |
tokenizer=tokenizer, | |
chunk_size=chunk_size, | |
overlap=chunk_overlap, | |
tokenizer_kwargs={"trust_remote_code": True} # 원격 코드 실행 허용 | |
) | |
print(f"docling 초기화 완료: HybridChunker(청크 크기={chunk_size}, 오버랩={chunk_overlap})") | |
def process_with_docling(self, pdf_path: str) -> Dict[str, Any]: | |
""" | |
docling을 사용하여 PDF 문서 처리 | |
Args: | |
pdf_path: PDF 파일 경로 | |
Returns: | |
처리된 문서 데이터 | |
""" | |
if not DOCLING_AVAILABLE: | |
raise ImportError("docling 라이브러리가 설치되지 않았습니다.") | |
try: | |
start_time = time.time() | |
# 문서 변환 | |
conv_res = self.doc_converter.convert(pdf_path) | |
doc = conv_res.document | |
# 성능 측정 | |
conversion_time = time.time() - start_time | |
print(f"PDF 변환 시간: {conversion_time:.2f}초") | |
# 메타데이터 추출 | |
metadata = { | |
"source": pdf_path, | |
"title": os.path.basename(pdf_path), | |
"processing_time": conversion_time | |
} | |
return { | |
"content": doc.export_to_markdown(), | |
"metadata": metadata, | |
"raw_document": doc, | |
} | |
except Exception as e: | |
print(f"docling으로 문서 처리 중 오류 발생: {e}") | |
raise | |
def chunk_with_hybrid_chunker(self, doc: Any) -> List[Dict[str, Any]]: | |
""" | |
HybridChunker를 사용하여 문서를 청크로 분할 | |
Args: | |
doc: docling 문서 객체 | |
Returns: | |
청크 리스트 | |
""" | |
start_time = time.time() | |
# 청킹 수행 | |
chunk_iter = self.hybrid_chunker.chunk(doc) | |
chunks = list(chunk_iter) | |
chunking_time = time.time() - start_time | |
print(f"청킹 시간: {chunking_time:.2f}초 (청크 수: {len(chunks)})") | |
return chunks | |
def create_langchain_documents_from_chunks(self, | |
chunks: List[Dict[str, Any]], | |
metadata: Dict[str, Any]) -> List[Document]: | |
""" | |
docling 청크를 LangChain Document 객체로 변환 | |
Args: | |
chunks: docling HybridChunker로 생성한 청크 리스트 | |
metadata: 문서 메타데이터 | |
Returns: | |
LangChain Document 객체 리스트 | |
""" | |
documents = [] | |
for i, chunk in enumerate(chunks): | |
# 각 청크에 대한 메타데이터 | |
chunk_metadata = metadata.copy() | |
chunk_metadata["chunk_id"] = i | |
# 청크 내용 추출 | |
if hasattr(chunk, "text"): | |
content = chunk.text | |
elif hasattr(chunk, "content"): | |
content = chunk.content | |
else: | |
content = str(chunk) | |
document = Document( | |
page_content=content, | |
metadata=chunk_metadata | |
) | |
documents.append(document) | |
return documents | |
def process_with_langchain(self, pdf_path: str) -> List[Document]: | |
""" | |
LangChain의 PyPDFLoader를 사용하여 PDF 문서 로드 | |
Args: | |
pdf_path: PDF 파일 경로 | |
Returns: | |
LangChain Document 객체 리스트 | |
""" | |
start_time = time.time() | |
try: | |
loader = PyPDFLoader(pdf_path) | |
documents = loader.load() | |
processing_time = time.time() - start_time | |
print(f"PyPDFLoader 처리 시간: {processing_time:.2f}초") | |
return documents | |
except Exception as e: | |
print(f"PyPDFLoader로 문서 처리 중 오류 발생: {e}") | |
raise | |
def process_pdf(self, pdf_path: str, use_docling: bool = True) -> List[Document]: | |
""" | |
PDF 파일 처리 | |
Args: | |
pdf_path: PDF 파일 경로 | |
use_docling: docling 사용 여부 | |
Returns: | |
처리된 문서의 청크 리스트 | |
""" | |
total_start_time = time.time() | |
# docling 사용 가능 여부 확인 | |
can_use_docling = use_docling and DOCLING_AVAILABLE | |
if can_use_docling: | |
try: | |
# 1. docling으로 PDF 처리 | |
docling_result = self.process_with_docling(pdf_path) | |
doc = docling_result["raw_document"] | |
metadata = docling_result["metadata"] | |
# 2. HybridChunker로 청크 생성 | |
chunks = self.chunk_with_hybrid_chunker(doc) | |
# 3. 청크를 LangChain Document로 변환 | |
documents = self.create_langchain_documents_from_chunks(chunks, metadata) | |
total_time = time.time() - total_start_time | |
print(f"docling 처리 완료: '{pdf_path}', {len(documents)} 청크, 총 {total_time:.2f}초") | |
return documents | |
except Exception as e: | |
print(f"docling 처리 실패, PyPDFLoader로 대체: {e}") | |
can_use_docling = False | |
if not can_use_docling: | |
# PyPDFLoader로 처리 (대체 방안) | |
documents = self.process_with_langchain(pdf_path) | |
chunks = self.text_splitter.split_documents(documents) | |
total_time = time.time() - total_start_time | |
print(f"PyPDFLoader 처리 완료: '{pdf_path}', {len(chunks)} 청크, 총 {total_time:.2f}초") | |
return chunks | |
def process_directory_parallel(self, directory: str, use_docling: bool = True) -> List[Document]: | |
""" | |
디렉토리 내 모든 PDF 파일 병렬 처리 (멀티스레딩) | |
Args: | |
directory: PDF 파일 디렉토리 경로 | |
use_docling: docling 사용 여부 | |
Returns: | |
처리된 모든 문서의 청크 리스트 | |
""" | |
all_documents = [] | |
pdf_files = [] | |
# PDF 파일 목록 수집 | |
for file in os.listdir(directory): | |
if file.endswith(".pdf"): | |
pdf_path = os.path.join(directory, file) | |
pdf_files.append(pdf_path) | |
if not pdf_files: | |
print(f"'{directory}' 디렉토리에 PDF 파일이 없습니다.") | |
return [] | |
print(f"총 {len(pdf_files)}개 PDF 파일 병렬 처리 시작 (최대 {self.max_workers} 작업자)") | |
start_time = time.time() | |
# 병렬 처리 실행 | |
with ThreadPoolExecutor(max_workers=self.max_workers) as executor: | |
# 각 PDF 파일에 대해 process_pdf 함수 병렬 실행 | |
future_to_pdf = {executor.submit(self.process_pdf, pdf_path, use_docling): pdf_path | |
for pdf_path in pdf_files} | |
# 결과 수집 | |
for future in future_to_pdf: | |
pdf_path = future_to_pdf[future] | |
try: | |
# 결과 가져오기 | |
chunks = future.result() | |
all_documents.extend(chunks) | |
print(f"'{os.path.basename(pdf_path)}' 처리 완료: {len(chunks)} 청크") | |
except Exception as e: | |
print(f"'{pdf_path}' 처리 중 오류 발생: {e}") | |
total_time = time.time() - start_time | |
print(f"병렬 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초") | |
return all_documents | |
def process_directory(self, directory: str, use_docling: bool = True, parallel: bool = True) -> List[Document]: | |
""" | |
디렉토리 내 모든 PDF 파일 처리 | |
Args: | |
directory: PDF 파일 디렉토리 경로 | |
use_docling: docling 사용 여부 | |
parallel: 병렬 처리 사용 여부 | |
Returns: | |
처리된 모든 문서의 청크 리스트 | |
""" | |
# 병렬 처리 사용 | |
if parallel: | |
return self.process_directory_parallel(directory, use_docling) | |
# 순차 처리 | |
all_documents = [] | |
start_time = time.time() | |
for file in os.listdir(directory): | |
if file.endswith(".pdf"): | |
pdf_path = os.path.join(directory, file) | |
print(f"처리 중: {pdf_path}") | |
try: | |
chunks = self.process_pdf(pdf_path, use_docling=use_docling) | |
all_documents.extend(chunks) | |
except Exception as e: | |
print(f"'{pdf_path}' 처리 중 오류 발생: {e}") | |
total_time = time.time() - start_time | |
print(f"순차 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초") | |
return all_documents |