RAG4_Voice_Fast / optimized_document_processor.py
jeongsoo's picture
Add greeting function to app.py
1f59ca4
"""
CPU에 최적화된 문서 처리 모듈 - 병렬 처리 적용
"""
import os
import time
from typing import List, Dict, Any, Optional
from langchain.schema import Document
from concurrent.futures import ThreadPoolExecutor
# 멀티프로세싱 가져오기
import multiprocessing
try:
CPU_COUNT = multiprocessing.cpu_count()
except:
CPU_COUNT = 4
print(f"CPU 코어 수: {CPU_COUNT}")
# docling 라이브러리 존재 여부 확인
try:
from docling.datamodel.base_models import InputFormat
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode
from docling.chunking import HybridChunker
DOCLING_AVAILABLE = True
print("docling 라이브러리 사용 가능")
except ImportError:
print("docling 라이브러리를 찾을 수 없습니다. PyPDFLoader만 사용합니다.")
DOCLING_AVAILABLE = False
# LangChain 문서 로더
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
class OptimizedDocumentProcessor:
"""
CPU에 최적화된 병렬 처리 문서 처리 클래스
"""
def __init__(self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
tokenizer: str = "Alibaba-NLP/gte-multilingual-base", # 올바른 모델 경로로 수정
max_workers: int = CPU_COUNT):
"""
문서 처리기 초기화
Args:
chunk_size: 텍스트 청크 크기
chunk_overlap: 청크 간 겹침 크기
tokenizer: HybridChunker에서 사용할 토크나이저
max_workers: 병렬 처리시 최대 작업자 수
"""
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.tokenizer = tokenizer
self.max_workers = max(1, min(max_workers, CPU_COUNT)) # CPU 코어 수 초과하지 않도록
print(f"병렬 처리 작업자 수: {self.max_workers}")
# LangChain 텍스트 스플리터
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", " ", ""],
)
# docling 관련 컴포넌트 초기화
if DOCLING_AVAILABLE:
# 파이프라인 옵션 설정
self.pipeline_options = PdfPipelineOptions(do_table_structure=True)
self.pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE
# 문서 변환기 초기화
self.doc_converter = DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(pipeline_options=self.pipeline_options)
}
)
# HybridChunker 초기화 (trust_remote_code=True 추가)
self.hybrid_chunker = HybridChunker(
tokenizer=tokenizer,
chunk_size=chunk_size,
overlap=chunk_overlap,
tokenizer_kwargs={"trust_remote_code": True} # 원격 코드 실행 허용
)
print(f"docling 초기화 완료: HybridChunker(청크 크기={chunk_size}, 오버랩={chunk_overlap})")
def process_with_docling(self, pdf_path: str) -> Dict[str, Any]:
"""
docling을 사용하여 PDF 문서 처리
Args:
pdf_path: PDF 파일 경로
Returns:
처리된 문서 데이터
"""
if not DOCLING_AVAILABLE:
raise ImportError("docling 라이브러리가 설치되지 않았습니다.")
try:
start_time = time.time()
# 문서 변환
conv_res = self.doc_converter.convert(pdf_path)
doc = conv_res.document
# 성능 측정
conversion_time = time.time() - start_time
print(f"PDF 변환 시간: {conversion_time:.2f}초")
# 메타데이터 추출
metadata = {
"source": pdf_path,
"title": os.path.basename(pdf_path),
"processing_time": conversion_time
}
return {
"content": doc.export_to_markdown(),
"metadata": metadata,
"raw_document": doc,
}
except Exception as e:
print(f"docling으로 문서 처리 중 오류 발생: {e}")
raise
def chunk_with_hybrid_chunker(self, doc: Any) -> List[Dict[str, Any]]:
"""
HybridChunker를 사용하여 문서를 청크로 분할
Args:
doc: docling 문서 객체
Returns:
청크 리스트
"""
start_time = time.time()
# 청킹 수행
chunk_iter = self.hybrid_chunker.chunk(doc)
chunks = list(chunk_iter)
chunking_time = time.time() - start_time
print(f"청킹 시간: {chunking_time:.2f}초 (청크 수: {len(chunks)})")
return chunks
def create_langchain_documents_from_chunks(self,
chunks: List[Dict[str, Any]],
metadata: Dict[str, Any]) -> List[Document]:
"""
docling 청크를 LangChain Document 객체로 변환
Args:
chunks: docling HybridChunker로 생성한 청크 리스트
metadata: 문서 메타데이터
Returns:
LangChain Document 객체 리스트
"""
documents = []
for i, chunk in enumerate(chunks):
# 각 청크에 대한 메타데이터
chunk_metadata = metadata.copy()
chunk_metadata["chunk_id"] = i
# 청크 내용 추출
if hasattr(chunk, "text"):
content = chunk.text
elif hasattr(chunk, "content"):
content = chunk.content
else:
content = str(chunk)
document = Document(
page_content=content,
metadata=chunk_metadata
)
documents.append(document)
return documents
def process_with_langchain(self, pdf_path: str) -> List[Document]:
"""
LangChain의 PyPDFLoader를 사용하여 PDF 문서 로드
Args:
pdf_path: PDF 파일 경로
Returns:
LangChain Document 객체 리스트
"""
start_time = time.time()
try:
loader = PyPDFLoader(pdf_path)
documents = loader.load()
processing_time = time.time() - start_time
print(f"PyPDFLoader 처리 시간: {processing_time:.2f}초")
return documents
except Exception as e:
print(f"PyPDFLoader로 문서 처리 중 오류 발생: {e}")
raise
def process_pdf(self, pdf_path: str, use_docling: bool = True) -> List[Document]:
"""
PDF 파일 처리
Args:
pdf_path: PDF 파일 경로
use_docling: docling 사용 여부
Returns:
처리된 문서의 청크 리스트
"""
total_start_time = time.time()
# docling 사용 가능 여부 확인
can_use_docling = use_docling and DOCLING_AVAILABLE
if can_use_docling:
try:
# 1. docling으로 PDF 처리
docling_result = self.process_with_docling(pdf_path)
doc = docling_result["raw_document"]
metadata = docling_result["metadata"]
# 2. HybridChunker로 청크 생성
chunks = self.chunk_with_hybrid_chunker(doc)
# 3. 청크를 LangChain Document로 변환
documents = self.create_langchain_documents_from_chunks(chunks, metadata)
total_time = time.time() - total_start_time
print(f"docling 처리 완료: '{pdf_path}', {len(documents)} 청크, 총 {total_time:.2f}초")
return documents
except Exception as e:
print(f"docling 처리 실패, PyPDFLoader로 대체: {e}")
can_use_docling = False
if not can_use_docling:
# PyPDFLoader로 처리 (대체 방안)
documents = self.process_with_langchain(pdf_path)
chunks = self.text_splitter.split_documents(documents)
total_time = time.time() - total_start_time
print(f"PyPDFLoader 처리 완료: '{pdf_path}', {len(chunks)} 청크, 총 {total_time:.2f}초")
return chunks
def process_directory_parallel(self, directory: str, use_docling: bool = True) -> List[Document]:
"""
디렉토리 내 모든 PDF 파일 병렬 처리 (멀티스레딩)
Args:
directory: PDF 파일 디렉토리 경로
use_docling: docling 사용 여부
Returns:
처리된 모든 문서의 청크 리스트
"""
all_documents = []
pdf_files = []
# PDF 파일 목록 수집
for file in os.listdir(directory):
if file.endswith(".pdf"):
pdf_path = os.path.join(directory, file)
pdf_files.append(pdf_path)
if not pdf_files:
print(f"'{directory}' 디렉토리에 PDF 파일이 없습니다.")
return []
print(f"총 {len(pdf_files)}개 PDF 파일 병렬 처리 시작 (최대 {self.max_workers} 작업자)")
start_time = time.time()
# 병렬 처리 실행
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 각 PDF 파일에 대해 process_pdf 함수 병렬 실행
future_to_pdf = {executor.submit(self.process_pdf, pdf_path, use_docling): pdf_path
for pdf_path in pdf_files}
# 결과 수집
for future in future_to_pdf:
pdf_path = future_to_pdf[future]
try:
# 결과 가져오기
chunks = future.result()
all_documents.extend(chunks)
print(f"'{os.path.basename(pdf_path)}' 처리 완료: {len(chunks)} 청크")
except Exception as e:
print(f"'{pdf_path}' 처리 중 오류 발생: {e}")
total_time = time.time() - start_time
print(f"병렬 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초")
return all_documents
def process_directory(self, directory: str, use_docling: bool = True, parallel: bool = True) -> List[Document]:
"""
디렉토리 내 모든 PDF 파일 처리
Args:
directory: PDF 파일 디렉토리 경로
use_docling: docling 사용 여부
parallel: 병렬 처리 사용 여부
Returns:
처리된 모든 문서의 청크 리스트
"""
# 병렬 처리 사용
if parallel:
return self.process_directory_parallel(directory, use_docling)
# 순차 처리
all_documents = []
start_time = time.time()
for file in os.listdir(directory):
if file.endswith(".pdf"):
pdf_path = os.path.join(directory, file)
print(f"처리 중: {pdf_path}")
try:
chunks = self.process_pdf(pdf_path, use_docling=use_docling)
all_documents.extend(chunks)
except Exception as e:
print(f"'{pdf_path}' 처리 중 오류 발생: {e}")
total_time = time.time() - start_time
print(f"순차 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초")
return all_documents