""" 개선된 벡터 스토어 모듈 - Milvus 설정 최적화 및 예외 처리 강화 """ import os import logging from typing import List, Dict, Any, Optional import uuid from langchain.schema import Document # 로깅 설정 logger = logging.getLogger("VectorStore") # 벡터 스토어 관련 예외 클래스 class VectorStoreInitError(Exception): """벡터 스토어 초기화 중 발생한 오류""" pass class EmbeddingModelError(Exception): """임베딩 모델 초기화 중 발생한 오류""" pass class DocumentIndexError(Exception): """문서 인덱싱 중 발생한 오류""" pass class VectorSearchError(Exception): """벡터 검색 중 발생한 오류""" pass class PersistenceError(Exception): """인덱스 저장/로드 중 발생한 오류""" pass # 벡터 스토어 임포트 try: # 최신 버전 임포트 from langchain_milvus import Milvus from langchain_community.vectorstores import FAISS from langchain_huggingface import HuggingFaceEmbeddings MODERN_IMPORTS = True logger.info("최신 langchain 패키지 임포트 성공") except ImportError: try: # 이전 버전 임포트 from langchain_community.vectorstores import Milvus, FAISS from langchain_community.embeddings import HuggingFaceEmbeddings MODERN_IMPORTS = False logger.info("레거시 langchain_community 패키지 사용") except ImportError as e: logger.error(f"필수 벡터 스토어 라이브러리를 임포트할 수 없습니다: {e}") raise VectorStoreInitError(f"필수 벡터 스토어 라이브러리를 임포트할 수 없습니다: {str(e)}") from config import MILVUS_HOST, MILVUS_PORT, MILVUS_COLLECTION, EMBEDDING_MODEL class VectorStore: def __init__(self, use_milvus: bool = True): """ 벡터 스토어 초기화 Args: use_milvus: Milvus 사용 여부 (False이면 FAISS 사용) """ self.use_milvus = use_milvus self.vector_store = None # 임베딩 모델 설정 logger.info(f"임베딩 모델 로드 중: {EMBEDDING_MODEL}") model_kwargs = { "device": "cpu", "trust_remote_code": True # 원격 코드 실행 허용 (필수) } encode_kwargs = {"normalize_embeddings": True} try: self.embeddings = HuggingFaceEmbeddings( model_name=EMBEDDING_MODEL, model_kwargs=model_kwargs, encode_kwargs=encode_kwargs ) logger.info(f"임베딩 모델 초기화 완료: {EMBEDDING_MODEL}") except Exception as e: logger.error(f"임베딩 모델 초기화 실패: {e}", exc_info=True) raise EmbeddingModelError(f"임베딩 모델 '{EMBEDDING_MODEL}' 초기화 실패: {str(e)}") def init_milvus(self) -> Milvus: """ Milvus 벡터 스토어 초기화 Returns: Milvus 벡터 스토어 인스턴스 """ try: connection_args = { "host": MILVUS_HOST, "port": MILVUS_PORT, } # 벡터 검색 인덱스 파라미터 (FLAT 인덱스 및 코사인 유사도 메트릭) index_params = { "index_type": "FLAT", # 정확도 우선 FLAT 인덱스 "metric_type": "COSINE", # 코사인 유사도 (정규화된 벡터에 적합) "params": {} # FLAT 인덱스에는 추가 파라미터 없음 } logger.info(f"Milvus 연결 시도 중: {MILVUS_HOST}:{MILVUS_PORT}") milvus_store = Milvus( embedding_function=self.embeddings, collection_name=MILVUS_COLLECTION, connection_args=connection_args, index_params=index_params ) logger.info(f"Milvus 연결 성공: {MILVUS_COLLECTION}") return milvus_store except Exception as e: logger.error(f"Milvus 초기화 실패: {e}", exc_info=True) raise VectorStoreInitError(f"Milvus 벡터 스토어 초기화 실패: {str(e)}") def init_faiss(self) -> FAISS: """ FAISS 벡터 스토어 초기화 (로컬 대체용) Returns: FAISS 벡터 스토어 인스턴스 """ try: logger.info("FAISS 벡터 스토어 초기화 중") faiss_store = FAISS.from_documents([], self.embeddings) logger.info("FAISS 벡터 스토어 초기화 완료") return faiss_store except Exception as e: logger.error(f"FAISS 초기화 실패: {e}", exc_info=True) raise VectorStoreInitError(f"FAISS 벡터 스토어 초기화 실패: {str(e)}") def create_or_load(self, documents: Optional[List[Document]] = None) -> Any: """ 벡터 스토어 생성 또는 로드 Args: documents: 저장할 문서 리스트 (None이면 빈 스토어 생성) Returns: 벡터 스토어 인스턴스 """ if self.use_milvus: if documents: # 문서가 제공된 경우 새 컬렉션 생성 try: # 연결 설정 connection_args = { "host": MILVUS_HOST, "port": MILVUS_PORT, } # 검색 인덱스 설정 index_params = { "index_type": "FLAT", # 정확도 우선 "metric_type": "COSINE", # 코사인 유사도 "params": {} } logger.info(f"Milvus 컬렉션 생성 중: {MILVUS_COLLECTION} (기존 컬렉션 삭제)") # 문서로부터 Milvus 컬렉션 생성 self.vector_store = Milvus.from_documents( documents=documents, embedding=self.embeddings, collection_name=MILVUS_COLLECTION, connection_args=connection_args, index_params=index_params, drop_old=True # 기존 컬렉션 삭제 (재구축) ) logger.info(f"Milvus 컬렉션 생성 완료: {len(documents)}개 문서 인덱싱됨") except Exception as e: logger.error(f"Milvus 컬렉션 생성 실패: {e}", exc_info=True) # 대체 방안으로 FAISS 사용 logger.warning("Milvus 실패로 FAISS로 대체합니다") self.use_milvus = False try: self.vector_store = FAISS.from_documents(documents, self.embeddings) logger.info(f"FAISS로 대체 성공: {len(documents)}개 문서 인덱싱됨") except Exception as faiss_err: logger.error(f"FAISS 대체 실패: {faiss_err}", exc_info=True) raise DocumentIndexError(f"문서 인덱싱 실패 (Milvus 및 FAISS): {str(e)} / {str(faiss_err)}") else: # 기존 컬렉션 로드 try: self.vector_store = self.init_milvus() except VectorStoreInitError as e: logger.error(f"Milvus 컬렉션 로드 실패: {e}") # 대체 방안으로 FAISS 사용 logger.warning("Milvus 실패로 FAISS로 대체합니다") self.use_milvus = False try: self.vector_store = self.init_faiss() except VectorStoreInitError as faiss_err: logger.error(f"FAISS 대체 실패: {faiss_err}", exc_info=True) raise VectorStoreInitError(f"벡터 스토어 초기화 실패 (Milvus 및 FAISS): {str(e)} / {str(faiss_err)}") else: # FAISS 사용 if documents: try: logger.info(f"FAISS 인덱스 생성 중: {len(documents)}개 문서") self.vector_store = FAISS.from_documents(documents, self.embeddings) logger.info("FAISS 인덱스 생성 완료") except Exception as e: logger.error(f"FAISS 인덱스 생성 실패: {e}", exc_info=True) raise DocumentIndexError(f"FAISS 문서 인덱싱 실패: {str(e)}") else: try: self.vector_store = self.init_faiss() except VectorStoreInitError as e: # 이미 로깅됨 raise return self.vector_store def add_documents(self, documents: List[Document]) -> None: """ 벡터 스토어에 문서 추가 Args: documents: 추가할 문서 리스트 """ if not documents: logger.warning("추가할 문서가 없습니다") return try: if self.vector_store is None: logger.info("벡터 스토어가 초기화되지 않았습니다. 새 벡터 스토어를 생성합니다.") self.create_or_load(documents) else: logger.info(f"{len(documents)}개 문서를 기존 벡터 스토어에 추가합니다") self.vector_store.add_documents(documents) logger.info(f"{len(documents)}개 문서 추가 완료") except Exception as e: logger.error(f"문서 추가 실패: {e}", exc_info=True) raise DocumentIndexError(f"벡터 스토어에 문서 추가 실패: {str(e)}") def similarity_search(self, query: str, k: int = 5) -> List[Document]: """ 벡터 유사도 검색 수행 Args: query: 검색 쿼리 k: 반환할 결과 수 Returns: 유사도가 높은 문서 리스트 """ if not query or not query.strip(): logger.warning("빈 쿼리로 검색 시도") return [] if self.vector_store is None: logger.error("벡터 스토어가 초기화되지 않았습니다") raise VectorSearchError("벡터 스토어가 초기화되지 않았습니다") try: logger.info(f"검색 쿼리 실행: '{query[:50]}{'...' if len(query) > 50 else ''}', 상위 {k}개 결과 요청") results = self.vector_store.similarity_search(query, k=k) logger.info(f"검색 완료: {len(results)}개 결과 찾음") return results except Exception as e: logger.error(f"검색 중 오류 발생: {e}", exc_info=True) raise VectorSearchError(f"벡터 검색 실패: {str(e)}") def save_local(self, path: str = "faiss_index") -> bool: """ FAISS 인덱스 로컬 저장 (Milvus 사용 안 할 경우) Args: path: 저장 경로 Returns: 저장 성공 여부 """ if self.vector_store is None: logger.error("저장할 벡터 스토어가 초기화되지 않았습니다") raise PersistenceError("저장할 벡터 스토어가 초기화되지 않았습니다") # FAISS만 로컬 저장 가능 if not self.use_milvus: try: # 저장 디렉토리가 존재하는지 확인 os.makedirs(os.path.dirname(path) if os.path.dirname(path) else path, exist_ok=True) self.vector_store.save_local(path) logger.info(f"FAISS 인덱스 로컬 저장 완료: {path}") return True except Exception as e: logger.error(f"FAISS 인덱스 저장 실패: {e}", exc_info=True) raise PersistenceError(f"벡터 인덱스 저장 실패: {str(e)}") else: logger.info("Milvus는 로컬 저장이 필요하지 않습니다") return True def load_local(self, path: str = "faiss_index") -> bool: """ FAISS 인덱스 로컬 로드 (Milvus 사용 안 할 경우) Args: path: 로드할 인덱스 경로 Returns: 로드 성공 여부 """ if self.use_milvus: logger.info("Milvus 사용 중이므로 로컬 로드를 건너뜁니다") try: # Milvus 연결 확인 self.vector_store = self.init_milvus() return True except Exception as e: logger.error(f"Milvus 연결 실패, FAISS로 대체: {e}") self.use_milvus = False # FAISS로 계속 진행 if not os.path.exists(path): logger.warning(f"인덱스 경로가 존재하지 않음: {path}") raise FileNotFoundError(f"벡터 인덱스 경로가 존재하지 않음: {path}") try: logger.info(f"FAISS 인덱스 로드 중: {path}") # 역직렬화 허용 옵션 추가 (보안 경고 확인 필요) self.vector_store = FAISS.load_local( path, self.embeddings, allow_dangerous_deserialization=True # 역직렬화 허용 ) logger.info(f"FAISS 인덱스 로드 완료: {path}") return True except FileNotFoundError as e: logger.error(f"FAISS 인덱스 파일을 찾을 수 없음: {e}") raise PersistenceError(f"벡터 인덱스 파일을 찾을 수 없음: {str(e)}") except Exception as e: logger.error(f"FAISS 인덱스 로드 실패: {e}", exc_info=True) # 오류 세부 정보 출력 import traceback logger.error(f"상세 오류: {traceback.format_exc()}") # 새 인덱스 초기화 logger.warning("인덱스 로드 실패로 새 FAISS 인덱스 초기화") self.vector_store = self.init_faiss() return False