""" 음성인식 기능이 추가된 RAG 챗봇 앱 """ import os import time import tempfile from typing import List, Dict, Tuple, Any, Optional import hashlib import pickle import json # 기존 임포트 from config import ( PDF_DIRECTORY, CHUNK_SIZE, CHUNK_OVERLAP, LLM_MODEL, STT_LANGUAGE, IS_HUGGINGFACE, OPENAI_API_KEY, USE_OPENAI ) from optimized_document_processor import OptimizedDocumentProcessor from vector_store import VectorStore from langchain.schema import Document # 클로바 STT 모듈 임포트 from clova_stt import ClovaSTT # 안전한 임포트 try: print("RAG 체인 모듈 로드 시도...") from rag_chain import RAGChain # RAGChain 클래스가 제대로 임포트되었는지 확인 RAG_CHAIN_AVAILABLE = True print("외부 RAG 체인 모듈 로드 성공") except Exception as e: print(f"외부 RAG 체인 로드 실패: {e}") print("내장 RAG 체인을 사용합니다.") # 내장 RAG 체인 구현 사용 RAG_CHAIN_AVAILABLE = True # 필요한 langchain 임포트 try: from langchain_openai import ChatOpenAI from langchain_community.chat_models import ChatOllama from langchain.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough # SimpleRAGChain 내장 클래스 정의 class RAGChain: """앱에 내장된 간단한 RAG 체인""" def __init__(self, vector_store): print("내장 RAG 체인 초기화 중...") self.vector_store = vector_store # 환경 설정 임포트 from config import OPENAI_API_KEY, LLM_MODEL, USE_OPENAI, TOP_K_RETRIEVAL, IS_HUGGINGFACE # OLLAMA_HOST는 HuggingFace 환경에서는 정의되지 않으므로 조건부 가져오기 OLLAMA_HOST = "http://localhost:11434" # 기본값 if not IS_HUGGINGFACE and not USE_OPENAI: try: from config import OLLAMA_HOST print(f"Ollama 호스트: {OLLAMA_HOST}") except ImportError: print("OLLAMA_HOST 설정을 찾을 수 없어 기본값 사용") try: # LLM 초기화 if USE_OPENAI or IS_HUGGINGFACE: self.llm = ChatOpenAI( model_name=LLM_MODEL, temperature=0.2, api_key=OPENAI_API_KEY, ) print(f"OpenAI 모델 초기화: {LLM_MODEL}") else: try: self.llm = ChatOllama( model=LLM_MODEL, temperature=0.2, base_url=OLLAMA_HOST, ) print(f"Ollama 모델 초기화: {LLM_MODEL}") except Exception as e: print(f"Ollama 초기화 실패: {e}, OpenAI 모델로 대체") self.llm = ChatOpenAI( model_name="gpt-3.5-turbo", temperature=0.2, api_key=OPENAI_API_KEY, ) # 프롬프트 템플릿 template = """ 다음 정보를 기반으로 질문에 정확하게 답변해주세요. 질문: {question} 참고 정보: {context} 참고 정보에 답이 없는 경우 "제공된 문서에서 해당 정보를 찾을 수 없습니다."라고 답변하세요. 답변은 정확하고 간결하게 제공하되, 참고 정보에서 근거를 찾아 설명해주세요. 참고 정보의 출처도 함께 알려주세요. """ self.prompt = PromptTemplate.from_template(template) # 체인 구성 self.chain = ( {"context": self._retrieve, "question": RunnablePassthrough()} | self.prompt | self.llm | StrOutputParser() ) print("내장 RAG 체인 초기화 완료") except Exception as e: print(f"LLM 초기화 실패: {e}") import traceback traceback.print_exc() raise def _retrieve(self, query): """문서 검색""" try: from config import TOP_K_RETRIEVAL docs = self.vector_store.similarity_search(query, k=TOP_K_RETRIEVAL) # 검색 결과 컨텍스트 구성 context_parts = [] for i, doc in enumerate(docs, 1): source = doc.metadata.get("source", "알 수 없는 출처") page = doc.metadata.get("page", "") source_info = f"{source}" if page: source_info += f" (페이지: {page})" context_parts.append(f"[참고자료 {i}] - 출처: {source_info}\n{doc.page_content}\n") return "\n".join(context_parts) except Exception as e: print(f"검색 중 오류: {e}") import traceback traceback.print_exc() return "문서 검색 중 오류가 발생했습니다." def run(self, query): """쿼리 처리""" try: return self.chain.invoke(query) except Exception as e: print(f"RAG 체인 실행 오류: {e}") import traceback traceback.print_exc() return f"오류 발생: {str(e)}" except Exception as inner_e: print(f"내장 RAG 체인 정의 실패: {inner_e}") import traceback traceback.print_exc() RAG_CHAIN_AVAILABLE = False class AutoRAGChatApp: """ documents 폴더의 PDF 파일을 자동으로 처리하고 음성인식 기능을 제공하는 RAG 챗봇 """ def __init__(self): """ RAG 챗봇 애플리케이션 초기화 """ print("=" * 50) print("음성인식 RAG 챗봇 애플리케이션 초기화 시작") print("=" * 50) # 데이터 디렉토리 정의 self.pdf_directory = PDF_DIRECTORY self.cache_directory = "cached_data" self.index_file = os.path.join(self.cache_directory, "file_index.json") self.chunks_dir = os.path.join(self.cache_directory, "chunks") self.vector_index_dir = os.path.join(self.cache_directory, "vector_index") # 디렉토리 생성 os.makedirs(self.pdf_directory, exist_ok=True) os.makedirs(self.cache_directory, exist_ok=True) os.makedirs(self.chunks_dir, exist_ok=True) os.makedirs(self.vector_index_dir, exist_ok=True) print(f"PDF 문서 디렉토리: '{self.pdf_directory}'") print(f"캐시 디렉토리: '{self.cache_directory}'") # 컴포넌트 초기화 self.document_processor = OptimizedDocumentProcessor( chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP ) # 벡터 저장소 초기화 self.vector_store = VectorStore(use_milvus=False) # 문서 인덱스 로드 self.file_index = self._load_file_index() # 기본 변수 초기화 self.documents = [] self.processed_files = [] self.is_initialized = False self.rag_chain = None # 클로바 STT 클라이언트 초기화 self.stt_client = ClovaSTT() print("음성인식(STT) 기능이 초기화되었습니다.") # RAG 체인 사용 가능성 확인 print(f"RAG 체인 사용 가능: {RAG_CHAIN_AVAILABLE}") # 시작 시 자동으로 문서 로드 및 처리 print("문서 자동 로드 및 처리 시작...") result = self.auto_process_documents() print(f"초기화 완료 상태: {self.is_initialized}") print("=" * 50) def _fallback_response(self, query: str) -> str: """ RAG 체인 초기화 실패 시 기본 응답 생성 Args: query: 사용자 질문 Returns: 기본 응답 텍스트 """ try: # 벡터 검색이라도 실행 if self.vector_store and self.vector_store.vector_store: try: docs = self.vector_store.similarity_search(query, k=3) if docs: context = "\n\n".join([doc.page_content for doc in docs]) response = f""" 질문에 대한 응답을 생성할 수 없습니다. RAG 체인이 초기화되지 않았습니다. 그러나 문서에서 관련 정보를 찾았습니다: {context} RAG 체인 초기화 문제를 해결하려면 로그를 확인하세요. """ return response.strip() except Exception as e: print(f"벡터 검색 실패: {e}") # 기본 응답 return "죄송합니다. RAG 체인이 초기화되지 않아 질문에 응답할 수 없습니다. 기술적인 문제를 해결 중입니다." except Exception as e: print(f"기본 응답 생성 실패: {e}") return "시스템 오류가 발생했습니다. 관리자에게 문의하세요." def _process_pdf_file(self, file_path: str) -> List[Document]: """ PDF 파일 처리 - docling 실패 시 PyPDFLoader 사용 Args: file_path: 처리할 PDF 파일 경로 Returns: 처리된 문서 청크 리스트 """ try: print(f"docling으로 처리 시도: {file_path}") # docling 사용 시도 try: # 10초 타임아웃 설정 (옵션) import signal def timeout_handler(signum, frame): raise TimeoutError("docling 처리 시간 초과") # 리눅스/맥에서만 작동 (윈도우에서는 무시됨) try: signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(60) # 60초 타임아웃 except: pass # docling으로 처리 시도 chunks = self.document_processor.process_pdf(file_path, use_docling=True) # 타임아웃 취소 try: signal.alarm(0) except: pass return chunks except Exception as e: # docling 오류 확인 error_str = str(e) if "Invalid code point" in error_str or "RuntimeError" in error_str: print(f"docling 처리 오류 (코드 포인트 문제): {error_str}") print("PyPDFLoader로 대체합니다.") else: print(f"docling 처리 오류: {error_str}") print("PyPDFLoader로 대체합니다.") # PyPDFLoader로 대체 try: return self.document_processor.process_pdf(file_path, use_docling=False) except Exception as inner_e: print(f"PyPDFLoader 처리 오류: {inner_e}") raise # 두 방법 모두 실패하면 예외 발생 except Exception as e: print(f"PDF 처리 중 심각한 오류: {e}") # 빈 청크라도 반환하여 전체 처리가 중단되지 않도록 함 return [] def _load_file_index(self) -> Dict[str, Dict[str, Any]]: """ 파일 인덱스 로드 Returns: 파일 경로 -> 메타데이터 매핑 """ if os.path.exists(self.index_file): try: with open(self.index_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"인덱스 파일 로드 실패: {e}") return {} return {} def _save_file_index(self) -> None: """ 파일 인덱스 저장 """ with open(self.index_file, 'w', encoding='utf-8') as f: json.dump(self.file_index, f, ensure_ascii=False, indent=2) def _calculate_file_hash(self, file_path: str) -> str: """ 파일 해시 계산 Args: file_path: 파일 경로 Returns: MD5 해시값 """ hasher = hashlib.md5() with open(file_path, 'rb') as f: buf = f.read(65536) while len(buf) > 0: hasher.update(buf) buf = f.read(65536) return hasher.hexdigest() def _is_file_processed(self, file_path: str) -> bool: """ 파일이 이미 처리되었고 변경되지 않았는지 확인 Args: file_path: 파일 경로 Returns: 처리 여부 """ if file_path not in self.file_index: return False # 현재 해시값 계산 current_hash = self._calculate_file_hash(file_path) # 저장된 해시값과 비교 if self.file_index[file_path]['hash'] != current_hash: print(f"파일 변경 감지: {file_path}") return False # 청크 파일 존재 확인 chunks_path = self.file_index[file_path]['chunks_path'] if not os.path.exists(chunks_path): return False return True def _get_chunks_path(self, file_hash: str) -> str: """ 청크 파일 경로 생성 Args: file_hash: 파일 해시값 Returns: 청크 파일 경로 """ return os.path.join(self.chunks_dir, f"{file_hash}.pkl") def _save_chunks(self, file_path: str, chunks: List[Document]) -> None: """ 청크 데이터 저장 Args: file_path: 원본 파일 경로 chunks: 문서 청크 리스트 """ # 해시 계산 file_hash = self._calculate_file_hash(file_path) # 청크 파일 경로 chunks_path = self._get_chunks_path(file_hash) # 청크 데이터 저장 with open(chunks_path, 'wb') as f: pickle.dump(chunks, f) # 인덱스 업데이트 self.file_index[file_path] = { 'hash': file_hash, 'chunks_path': chunks_path, 'last_processed': time.time(), 'chunks_count': len(chunks) } # 인덱스 저장 self._save_file_index() print(f"청크 저장 완료: {file_path} ({len(chunks)}개 청크)") def _load_chunks(self, file_path: str) -> List[Document]: """ 저장된 청크 데이터 로드 Args: file_path: 파일 경로 Returns: 문서 청크 리스트 """ chunks_path = self.file_index[file_path]['chunks_path'] with open(chunks_path, 'rb') as f: chunks = pickle.load(f) print(f"청크 로드 완료: {file_path} ({len(chunks)}개 청크)") return chunks def auto_process_documents(self) -> str: """ documents 폴더의 PDF 파일 자동 처리 Returns: 처리 결과 메시지 """ try: start_time = time.time() # PDF 파일 목록 수집 pdf_files = [] for filename in os.listdir(self.pdf_directory): if filename.lower().endswith('.pdf'): pdf_files.append(os.path.join(self.pdf_directory, filename)) if not pdf_files: print(f"'{self.pdf_directory}' 폴더에 PDF 파일이 없습니다.") return f"'{self.pdf_directory}' 폴더에 PDF 파일이 없습니다." print(f"발견된 PDF 파일: {len(pdf_files)}개") # 폴더 내 PDF 파일 처리 new_files = [] updated_files = [] cached_files = [] failed_files = [] all_chunks = [] for file_path in pdf_files: if self._is_file_processed(file_path): # 캐시에서 청크 로드 chunks = self._load_chunks(file_path) all_chunks.extend(chunks) cached_files.append(file_path) self.processed_files.append(os.path.basename(file_path)) else: # 새 파일 또는 변경된 파일 처리 print(f"처리 중: {file_path}") try: # 개선된 PDF 처리 메서드 사용 chunks = self._process_pdf_file(file_path) if chunks: # 청크가 있는 경우에만 저장 # 청크 저장 self._save_chunks(file_path, chunks) all_chunks.extend(chunks) if file_path in self.file_index: updated_files.append(file_path) else: new_files.append(file_path) self.processed_files.append(os.path.basename(file_path)) else: print(f"'{file_path}' 처리 실패: 추출된 청크 없음") failed_files.append(file_path) except Exception as e: print(f"'{file_path}' 처리 중 오류: {e}") failed_files.append(file_path) # 모든 청크 저장 self.documents = all_chunks processing_time = time.time() - start_time print(f"문서 처리 완료: {len(all_chunks)}개 청크, {processing_time:.2f}초") # 벡터 인덱스 저장 경로 확인 if os.path.exists(self.vector_index_dir) and any(os.listdir(self.vector_index_dir)): # 기존 벡터 인덱스 로드 try: print("저장된 벡터 인덱스 로드 중...") vector_store_loaded = self.vector_store.load_local(self.vector_index_dir) # 인덱스 로드 성공 확인 if self.vector_store.vector_store is not None: # 새 문서나 변경된 문서가 있으면 인덱스 업데이트 if new_files or updated_files: print("벡터 인덱스 업데이트 중...") self.vector_store.add_documents(self.documents) print("벡터 인덱스 로드 완료") else: print("벡터 인덱스를 로드했으나 유효하지 않음, 새로 생성합니다.") self.vector_store.create_or_load(self.documents) except Exception as e: print(f"벡터 인덱스 로드 실패, 새로 생성합니다: {e}") # 오류 상세 정보 출력 import traceback traceback.print_exc() # 새 벡터 인덱스 생성 self.vector_store.create_or_load(self.documents) else: # 새 벡터 인덱스 생성 print("새 벡터 인덱스 생성 중...") self.vector_store.create_or_load(self.documents) # 벡터 인덱스 저장 if self.vector_store and self.vector_store.vector_store is not None: try: print(f"벡터 인덱스 저장 중: {self.vector_index_dir}") save_result = self.vector_store.save_local(self.vector_index_dir) print(f"벡터 인덱스 저장 완료: {self.vector_index_dir}") except Exception as e: print(f"벡터 인덱스 저장 실패: {e}") # 오류 상세 정보 출력 import traceback traceback.print_exc() else: print("벡터 인덱스가 초기화되지 않아 저장하지 않습니다.") # RAG 체인 초기화 try: print("RAG 체인 초기화 시도...") if RAG_CHAIN_AVAILABLE: print("RAG_CHAIN_AVAILABLE=True, 초기화 진행") # 직접 RAG 체인 클래스를 사용하여 초기화 self.rag_chain = RAGChain(self.vector_store) print("RAG 체인 객체 생성 완료") # 테스트 쿼리 실행하여 체인이 작동하는지 확인 try: test_response = self.rag_chain.run("테스트 쿼리입니다.") print(f"RAG 체인 테스트 성공: 응답 길이 {len(test_response)}") self.is_initialized = True except Exception as test_e: print(f"RAG 체인 테스트 실패: {test_e}") import traceback traceback.print_exc() self.is_initialized = False return f"RAG 체인 테스트 실패: {test_e}" else: print("RAG_CHAIN_AVAILABLE=False, 초기화 불가") self.is_initialized = False return "RAG 체인 모듈을 사용할 수 없습니다." # 최종 상태 확인 및 로그 print(f"RAG 체인 초기화 결과: is_initialized={self.is_initialized}") if self.is_initialized: print("RAG 체인 초기화 성공!") else: print("RAG 체인 초기화 실패했지만 예외는 발생하지 않음.") return "RAG 체인 초기화 실패: 원인 불명" except Exception as e: print(f"RAG 체인 초기화 중 예외 발생: {e}") import traceback traceback.print_exc() self.is_initialized = False return f"RAG 체인 초기화 실패: {e}" total_time = time.time() - start_time status_message = ( f"문서 처리 완료!\n" f"- 처리된 파일: {len(self.processed_files)}개\n" f"- 캐시된 파일: {len(cached_files)}개\n" f"- 새 파일: {len(new_files)}개\n" f"- 업데이트된 파일: {len(updated_files)}개\n" f"- 실패한 파일: {len(failed_files)}개\n" f"- 총 청크 수: {len(self.documents)}개\n" f"- 처리 시간: {total_time:.2f}초\n" f"- RAG 체인 초기화: {'성공' if self.is_initialized else '실패'}\n" f"이제 질문할 준비가 되었습니다!" ) print(status_message) return status_message except Exception as e: self.is_initialized = False error_message = f"문서 처리 중 오류 발생: {str(e)}" print(error_message) import traceback traceback.print_exc() return error_message def reset_cache(self) -> str: """ 캐시 초기화 Returns: 결과 메시지 """ try: # 청크 파일 삭제 for filename in os.listdir(self.chunks_dir): file_path = os.path.join(self.chunks_dir, filename) if os.path.isfile(file_path): os.remove(file_path) # 인덱스 초기화 self.file_index = {} self._save_file_index() # 벡터 인덱스 삭제 for filename in os.listdir(self.vector_index_dir): file_path = os.path.join(self.vector_index_dir, filename) if os.path.isfile(file_path): os.remove(file_path) self.documents = [] self.processed_files = [] self.is_initialized = False return "캐시가 초기화되었습니다. 다음 실행 시 모든 문서가 다시 처리됩니다." except Exception as e: return f"캐시 초기화 중 오류 발생: {str(e)}" def process_query(self, query: str, chat_history: List[List[str]]) -> Tuple[str, List[List[str]]]: """ 사용자 쿼리 처리 Args: query: 사용자 질문 chat_history: 대화 기록 (리스트 형식) Returns: 응답 및 업데이트된 대화 기록 """ if not query: # 비어있는 쿼리 처리 return "", chat_history if not self.is_initialized or self.rag_chain is None: print("텍스트 쿼리 처리: 문서 로드 초기화가 필요합니다.") response = "문서 로드가 초기화되지 않았습니다. 자동 로드를 시도합니다." new_history = list(chat_history) new_history.append([query, response]) # 자동 로드 시도 try: init_result = self.auto_process_documents() print(f"[DEBUG] 자동 로드 후 is_initialized = {self.is_initialized}, RAG 체인 존재 = {self.rag_chain is not None}") if not self.is_initialized or self.rag_chain is None: response = f"문서를 로드할 수 없습니다. 'documents' 폴더에 PDF 파일이 있는지 확인하세요.\n오류 정보: {init_result}" new_history = list(chat_history) new_history.append([query, response]) return "", new_history except Exception as e: response = f"문서 로드 중 오류 발생: {str(e)}" new_history = list(chat_history) new_history.append([query, response]) return "", new_history else: print("텍스트 쿼리 처리: 문서가 이미 로드되어 있습니다.") try: # RAG 체인 실행 및 응답 생성 start_time = time.time() print(f"RAG 체인 실행 중: 쿼리 = '{query}'") if self.is_initialized and self.rag_chain is not None: response = self.rag_chain.run(query) else: print("RAG 체인이 초기화되지 않음: 기본 응답 사용") response = self._fallback_response(query) end_time = time.time() query_time = end_time - start_time print(f"쿼리 처리 시간: {query_time:.2f}초") print(f"응답: {response[:100]}..." if len(response) > 100 else f"응답: {response}") # 메시지 형식에 맞게 추가 new_history = list(chat_history) new_history.append([query, response]) return "", new_history except Exception as e: error_msg = f"오류 발생: {str(e)}" print(f"RAG 체인 실행 중 오류: {error_msg}") import traceback traceback.print_exc() new_history = list(chat_history) new_history.append([query, error_msg]) return "", new_history def process_voice_query(self, audio, chat_history: List[List[str]]) -> List[List[str]]: """ 음성 쿼리 처리 Args: audio: 녹음된 오디오 데이터 (numpy 배열: (샘플, 채널)) chat_history: 대화 기록 Returns: 업데이트된 대화 기록 """ if audio is None: return chat_history try: import numpy as np import scipy.io.wavfile as wav # numpy 배열을 WAV 파일로 저장 with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file: temp_path = temp_file.name # 샘플링 레이트와 오디오 데이터를 WAV 파일로 저장 sr, data = audio # 16비트 PCM 형식으로 변환 wav.write(temp_path, sr, data.astype(np.int16)) print(f"[STT] 임시 오디오 파일 생성: {temp_path}") # config.py에서 설정한 언어 코드로 STT 실행 result = self.stt_client.recognize_file(temp_path, language=STT_LANGUAGE) # 임시 파일 삭제 try: os.unlink(temp_path) print("[STT] 임시 오디오 파일 삭제됨") except Exception as e: print(f"[STT] 임시 파일 삭제 실패: {e}") # STT 결과 처리 if "error" in result: error_msg = f"음성인식 오류: {result.get('error')}" print(f"[STT] {error_msg}") new_history = list(chat_history) new_history.append(["음성 메시지", error_msg]) return new_history # 인식된 텍스트 추출 recognized_text = result.get("text", "") if not recognized_text: error_msg = "음성을 인식할 수 없습니다. 다시 시도해주세요." print("[STT] 인식된 텍스트 없음") new_history = list(chat_history) new_history.append(["음성 메시지", error_msg]) return new_history print(f"[STT] 인식된 텍스트: {recognized_text}") # 인식된 텍스트로 쿼리 처리 (음성 메시지 접두어 추가) query = f"🎤 {recognized_text}" print(f"[DEBUG] is_initialized = {self.is_initialized}, RAG 체인 존재 = {self.rag_chain is not None}") # RAG 체인 실행 및 응답 생성 if not self.is_initialized or self.rag_chain is None: print("음성 쿼리 처리: 문서 로드 초기화가 필요합니다.") response = "문서 로드가 초기화되지 않았습니다. 자동 로드를 시도합니다." new_history = list(chat_history) new_history.append([query, response]) # 자동 로드 시도 try: init_result = self.auto_process_documents() print(f"[DEBUG] 자동 로드 후 is_initialized = {self.is_initialized}, RAG 체인 존재 = {self.rag_chain is not None}") if not self.is_initialized or self.rag_chain is None: response = f"문서를 로드할 수 없습니다. 'documents' 폴더에 PDF 파일이 있는지 확인하세요.\n오류 정보: {init_result}" new_history = list(chat_history) new_history.append([query, response]) return new_history except Exception as e: response = f"문서 로드 중 오류 발생: {str(e)}" new_history = list(chat_history) new_history.append([query, response]) return new_history else: print("음성 쿼리 처리: 문서가 이미 로드되어 있습니다.") try: # RAG 체인 실행 및 응답 생성 start_time = time.time() print(f"RAG 체인 실행 중: 쿼리 = '{query}'") if self.is_initialized and self.rag_chain is not None: response = self.rag_chain.run(query) else: print("RAG 체인이 초기화되지 않음: 기본 응답 사용") response = self._fallback_response(query) end_time = time.time() query_time = end_time - start_time print(f"쿼리 처리 시간: {query_time:.2f}초") print(f"응답: {response[:100]}..." if len(response) > 100 else f"응답: {response}") # 메시지 형식에 맞게 추가 new_history = list(chat_history) # 기존 리스트를 복사 new_history.append([query, response]) # 리스트 형식으로 추가 return new_history except Exception as e: error_msg = f"오류 발생: {str(e)}" print(f"RAG 체인 실행 중 오류: {error_msg}") import traceback traceback.print_exc() # 메시지 형식에 맞게 추가 new_history = list(chat_history) # 기존 리스트를 복사 new_history.append([query, error_msg]) # 리스트 형식으로 추가 return new_history except Exception as e: error_msg = f"음성 처리 중 오류 발생: {str(e)}" print(f"[STT] {error_msg}") import traceback traceback.print_exc() new_history = list(chat_history) new_history.append(["음성 메시지", error_msg]) return new_history def launch_app(self) -> None: """ 음성인식 기능이 추가된 Gradio 앱 실행 (자동 음성처리 개선) """ import gradio as gr import time # 음성 인식 후 자동 처리 함수 def process_audio_auto(audio, chat_history): """녹음 완료 시 자동으로 STT 처리 후 질의 처리""" if audio is None: return chat_history, gr.update(interactive=True), "녹음 후 자동으로 처리됩니다", "" # 처리 중 상태 메시지 processing_msg = "음성 처리 중..." try: import numpy as np import scipy.io.wavfile as wav # 상태 업데이트 yield chat_history, gr.update(interactive=False), processing_msg, "음성을 텍스트로 변환 중..." # numpy 배열을 WAV 파일로 저장 with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file: temp_path = temp_file.name # 샘플링 레이트와 오디오 데이터를 WAV 파일로 저장 sr, data = audio # 16비트 PCM 형식으로 변환 wav.write(temp_path, sr, data.astype(np.int16)) print(f"[STT] 임시 오디오 파일 생성: {temp_path}") # config.py에서 설정한 언어 코드로 STT 실행 result = self.stt_client.recognize_file(temp_path, language=STT_LANGUAGE) # 임시 파일 삭제 try: os.unlink(temp_path) print("[STT] 임시 오디오 파일 삭제됨") except Exception as e: print(f"[STT] 임시 파일 삭제 실패: {e}") # STT 결과 처리 if "error" in result: error_msg = f"음성인식 오류: {result.get('error')}" print(f"[STT] {error_msg}") new_history = list(chat_history) new_history.append(["음성 메시지", error_msg]) yield new_history, gr.update(interactive=True), error_msg, "" return # 인식된 텍스트 추출 recognized_text = result.get("text", "") if not recognized_text: error_msg = "음성을 인식할 수 없습니다. 다시 시도해주세요." print("[STT] 인식된 텍스트 없음") new_history = list(chat_history) new_history.append(["음성 메시지", error_msg]) yield new_history, gr.update(interactive=True), error_msg, "" return print(f"[STT] 인식된 텍스트: {recognized_text}") # 상태 업데이트 yield chat_history, gr.update( interactive=False), f"인식된 텍스트: {recognized_text}\n\n응답 생성 중...", "응답 생성 중..." # 인식된 텍스트로 쿼리 처리 (음성 메시지 접두어 추가) query = f"🎤 {recognized_text}" print(f"[DEBUG] is_initialized = {self.is_initialized}, RAG 체인 존재 = {self.rag_chain is not None}") # RAG 체인 실행 및 응답 생성 if not self.is_initialized or self.rag_chain is None: print("음성 쿼리 처리: 문서 로드 초기화가 필요합니다.") response = "문서 로드가 초기화되지 않았습니다. 자동 로드를 시도합니다." new_history = list(chat_history) new_history.append([query, response]) # 자동 로드 시도 try: init_result = self.auto_process_documents() print( f"[DEBUG] 자동 로드 후 is_initialized = {self.is_initialized}, RAG 체인 존재 = {self.rag_chain is not None}") if not self.is_initialized or self.rag_chain is None: response = f"문서를 로드할 수 없습니다. 'documents' 폴더에 PDF 파일이 있는지 확인하세요.\n오류 정보: {init_result}" new_history = list(chat_history) new_history.append([query, response]) yield new_history, gr.update(interactive=True), response, "" return except Exception as e: response = f"문서 로드 중 오류 발생: {str(e)}" new_history = list(chat_history) new_history.append([query, response]) yield new_history, gr.update(interactive=True), response, "" return else: print("음성 쿼리 처리: 문서가 이미 로드되어 있습니다.") try: # RAG 체인 실행 및 응답 생성 start_time = time.time() print(f"RAG 체인 실행 중: 쿼리 = '{query}'") if self.is_initialized and self.rag_chain is not None: response = self.rag_chain.run(query) else: print("RAG 체인이 초기화되지 않음: 기본 응답 사용") response = self._fallback_response(query) end_time = time.time() query_time = end_time - start_time print(f"쿼리 처리 시간: {query_time:.2f}초") print(f"응답: {response[:100]}..." if len(response) > 100 else f"응답: {response}") # 메시지 형식에 맞게 추가 new_history = list(chat_history) # 기존 리스트를 복사 new_history.append([query, response]) # 리스트 형식으로 추가 # 최종 상태 업데이트 yield new_history, gr.update(interactive=True), f"처리 완료: {recognized_text}", "" except Exception as e: error_msg = f"오류 발생: {str(e)}" print(f"RAG 체인 실행 중 오류: {error_msg}") import traceback traceback.print_exc() # 메시지 형식에 맞게 추가 new_history = list(chat_history) # 기존 리스트를 복사 new_history.append([query, error_msg]) # 리스트 형식으로 추가 yield new_history, gr.update(interactive=True), error_msg, "" except Exception as e: error_msg = f"음성 처리 중 오류 발생: {str(e)}" print(f"[STT] {error_msg}") import traceback traceback.print_exc() new_history = list(chat_history) new_history.append(["음성 메시지", error_msg]) yield new_history, gr.update(interactive=True), error_msg, "" with gr.Blocks(title="음성인식 기능이 추가된 PDF 문서 기반 RAG 챗봇") as app: gr.Markdown("# 음성인식 기능이 추가된 PDF 문서 기반 RAG 챗봇") gr.Markdown(f"* 사용 중인 LLM 모델: **{LLM_MODEL}**") gr.Markdown(f"* PDF 문서 폴더: **{self.pdf_directory}**") gr.Markdown("* 네이버 클로바 음성인식 API 통합") with gr.Row(): with gr.Column(scale=1): # 문서 상태 섹션 status_box = gr.Textbox( label="문서 처리 상태", value=f"처리된 문서 ({len(self.processed_files)}개): {', '.join(self.processed_files)}", lines=5, interactive=False ) # 캐시 관리 버튼 refresh_button = gr.Button("문서 새로 읽기", variant="primary") reset_button = gr.Button("캐시 초기화", variant="stop") # 처리된 파일 정보 with gr.Accordion("캐시 세부 정보", open=False): file_info = "" for file_path, info in self.file_index.items(): file_info += f"- {os.path.basename(file_path)}: {info['chunks_count']}개 청크\n" cache_info = gr.Textbox( label="캐시된 파일 정보", value=file_info or "캐시된 파일이 없습니다.", lines=5, interactive=False ) with gr.Column(scale=2): # 채팅 인터페이스 chatbot = gr.Chatbot( label="대화 내용", height=500, show_copy_button=True ) with gr.Tabs() as input_tabs: # 텍스트 입력 탭 with gr.Tab("텍스트 입력"): # 텍스트 입력과 전송 버튼을 수평으로 배치 with gr.Row(): query_box = gr.Textbox( label="질문", placeholder="처리된 문서 내용에 대해 질문하세요...", lines=2, scale=4 ) submit_btn = gr.Button("전송", variant="primary", scale=1) # 음성 입력 탭 (자동 처리 방식으로 개선) with gr.Tab("음성 입력"): # 상태 표시 processing_status = gr.Textbox( label="처리 상태", placeholder="녹음 후 자동으로 처리됩니다", lines=3, interactive=False, visible=True ) # 프로그레스 바 추가 progress_bar = gr.Textbox( label="진행 상황", value="", visible=True ) # 자동 처리 오디오 컴포넌트 audio_input_auto = gr.Audio( label="마이크 입력 (녹음 후 자동 처리)", sources=["microphone"], type="numpy", format="wav", streaming=False ) # 대화 초기화 버튼 clear_chat_button = gr.Button("대화 초기화") # 이벤트 핸들러 설정 refresh_button.click( fn=self.auto_process_documents, inputs=[], outputs=[status_box] ) reset_button.click( fn=lambda: (self.reset_cache(), self.auto_process_documents()), inputs=[], outputs=[status_box] ) # 텍스트 전송 버튼 클릭 이벤트 submit_btn.click( fn=self.process_query, inputs=[query_box, chatbot], outputs=[query_box, chatbot] ) # 엔터키 입력 이벤트 query_box.submit( fn=self.process_query, inputs=[query_box, chatbot], outputs=[query_box, chatbot] ) # 자동 음성처리 이벤트 (녹음이 끝나면 자동으로 처리 시작) audio_input_auto.stop_recording( fn=process_audio_auto, inputs=[audio_input_auto, chatbot], outputs=[chatbot, audio_input_auto, processing_status, progress_bar] ) # 대화 초기화 버튼 clear_chat_button.click( fn=lambda: [], # 빈 리스트 반환 outputs=[chatbot] ) # 앱 실행 app.launch(share=False) if __name__ == "__main__": app = AutoRAGChatApp() app.launch_app()