""" DeepSeek API를 활용한 커스텀 RAG 체인 구현 """ import os import logging import time from typing import List, Dict, Any, Optional, Tuple from langchain.schema import Document from langchain.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough # DeepSeek 커스텀 LLM 임포트 from deepseek_llm import DeepSeekLLM, DeepSeekChat # 설정 가져오기 try: from config import ( DEEPSEEK_API_KEY, DEEPSEEK_MODEL, DEEPSEEK_ENDPOINT, TOP_K_RETRIEVAL, TOP_K_RERANK ) except ImportError: # 설정 모듈을 가져올 수 없는 경우 기본값 설정 DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "") DEEPSEEK_MODEL = os.environ.get("DEEPSEEK_MODEL", "deepseek-chat") DEEPSEEK_ENDPOINT = os.environ.get("DEEPSEEK_ENDPOINT", "https://api.deepseek.com/v1/chat/completions") TOP_K_RETRIEVAL = int(os.environ.get("TOP_K_RETRIEVAL", "5")) TOP_K_RERANK = int(os.environ.get("TOP_K_RERANK", "3")) # 로깅 설정 logger = logging.getLogger("CustomRAGChain") class CustomRAGChain: """ DeepSeek API를 활용한 커스텀 RAG 체인 """ def __init__(self, vector_store, use_reranker=False): """ RAG 체인 초기화 Args: vector_store: 벡터 스토어 인스턴스 use_reranker: 리랭커 사용 여부 (현재 미지원) """ logger.info("커스텀 RAG 체인 초기화...") self.vector_store = vector_store self.use_reranker = use_reranker # API 키 확인 if not DEEPSEEK_API_KEY: logger.error("DeepSeek API 키가 설정되지 않았습니다.") raise ValueError("DeepSeek API 키가 설정되지 않았습니다.") # DeepSeek LLM 초기화 try: self.llm = DeepSeekLLM( api_key=DEEPSEEK_API_KEY, model=DEEPSEEK_MODEL, endpoint=DEEPSEEK_ENDPOINT, temperature=0.3, max_tokens=1000, request_timeout=120, max_retries=5 ) logger.info(f"DeepSeek LLM 초기화 성공: {DEEPSEEK_MODEL}") except Exception as e: logger.error(f"DeepSeek LLM 초기화 실패: {e}") raise ValueError(f"DeepSeek LLM 초기화 실패: {str(e)}") # 챗 인터페이스 초기화 (대체용) self.chat = DeepSeekChat( api_key=DEEPSEEK_API_KEY, model=DEEPSEEK_MODEL, endpoint=DEEPSEEK_ENDPOINT ) # RAG 프롬프트 템플릿 self.prompt = PromptTemplate.from_template(""" 다음 정보를 기반으로 질문에 정확하게 답변해주세요. 질문: {question} 참고 정보: {context} 참고 정보에 답이 있으면 반드시 그 정보를 기반으로 답변하세요. 참고 정보에 답이 없는 경우에는 일반적인 지식을 활용하여 답변할 수 있지만, "제공된 문서에는 이 정보가 없으나, 일반적으로는..." 식으로 시작하세요. 답변은 정확하고 간결하게 제공하되, 가능한 참고 정보에서 근거를 찾아 설명해주세요. 참고 정보의 출처도 함께 알려주세요. """) # RAG 체인 구성 self.chain = ( {"context": self._retrieve, "question": RunnablePassthrough()} | self.prompt | self.llm | StrOutputParser() ) logger.info("커스텀 RAG 체인 초기화 완료") def _retrieve(self, query: str) -> str: """ 쿼리에 대한 관련 문서 검색 및 컨텍스트 구성 Args: query: 사용자 질문 Returns: 검색 결과를 포함한 컨텍스트 문자열 """ if not query or not query.strip(): logger.warning("빈 쿼리로 검색 시도") return "검색 쿼리가 비어있습니다." try: # 벡터 검색 수행 logger.info(f"벡터 검색 수행: '{query[:50]}{'...' if len(query) > 50 else ''}'") docs = self.vector_store.similarity_search(query, k=TOP_K_RETRIEVAL) if not docs: logger.warning("검색 결과가 없습니다") return "관련 문서를 찾을 수 없습니다." # 검색 결과 컨텍스트 구성 context_parts = [] for i, doc in enumerate(docs, 1): source = doc.metadata.get("source", "알 수 없는 출처") page = doc.metadata.get("page", "") source_info = f"{source}" if page: source_info += f" (페이지: {page})" context_parts.append(f"[참고자료 {i}] - 출처: {source_info}\n{doc.page_content}\n") context = "\n".join(context_parts) # 컨텍스트 길이 제한 (토큰 수 제한) if len(context) > 6000: logger.warning(f"컨텍스트가 너무 깁니다 ({len(context)} 문자). 제한합니다.") context = context[:2500] + "\n...(중략)...\n" + context[-2500:] logger.info(f"컨텍스트 생성 완료: {len(context_parts)}개 문서, {len(context)} 문자") return context except Exception as e: logger.error(f"검색 중 오류: {e}") return f"검색 중 오류 발생: {str(e)}" def run(self, query: str) -> str: """ 사용자 쿼리에 대한 RAG 파이프라인 실행 Args: query: 사용자 질문 Returns: 모델 응답 문자열 """ if not query or not query.strip(): logger.warning("빈 쿼리로 실행 시도") return "질문이 비어있습니다. 질문을 입력해 주세요." try: logger.info(f"RAG 체인 실행: '{query[:50]}{'...' if len(query) > 50 else ''}'") start_time = time.time() # 벡터 검색 실행 context = self._retrieve(query) # 직접 LLM 호출 (체인 사용) try: response = self.chain.invoke(query) logger.info(f"LangChain 체인 호출 성공") except Exception as chain_error: logger.error(f"체인 호출 실패: {chain_error}, 대체 방식 시도") # 대체 방식: 직접 채팅 API 호출 try: prompt = self.prompt.format(question=query, context=context) response = self.chat.generate([{"role": "user", "content": prompt}]) logger.info("대체 채팅 API 호출 성공") except Exception as chat_error: logger.error(f"대체 채팅 API 호출 실패: {chat_error}") # 미리 정의된 응답으로 폴백 predefined_answers = { "대한민국의 수도": "대한민국의 수도는 서울입니다.", "수도": "대한민국의 수도는 서울입니다.", "누구야": "저는 RAG 기반 질의응답 시스템입니다. 문서를 검색하고 관련 정보를 찾아드립니다.", "안녕": "안녕하세요! 무엇을 도와드릴까요?", "뭐해": "사용자의 질문에 답변하기 위해 문서를 검색하고 있습니다. 무엇을 알려드릴까요?" } # 질문에 맞는 미리 정의된 응답이 있는지 확인 for key, answer in predefined_answers.items(): if key in query.lower(): response = answer logger.info(f"미리 정의된 응답 제공: {key}") break else: # 검색 결과만 표시 response = f""" API 연결 오류로 인해 검색 결과만 표시합니다. 질문: {query} 검색된 관련 문서: {context} [참고] API 연결 문제로 인해 자동 요약이 제공되지 않습니다. 다시 시도하거나 다른 질문을 해보세요. """ logger.info("검색 결과만 표시") end_time = time.time() logger.info(f"RAG 체인 실행 완료: {end_time - start_time:.2f}초") return response except Exception as e: logger.error(f"RAG 체인 실행 중 오류: {e}") return f"질문 처리 중 오류가 발생했습니다: {str(e)}"