import re import threading import time import os import logging from datetime import datetime import torch import numpy as np from typing import List, Optional, Tuple, Dict import networkx as nx import gradio as gr import transformers from transformers import ( pipeline, AutoModelForCausalLM, AutoTokenizer, BartForConditionalGeneration, BartTokenizer, BitsAndBytesConfig ) # 로깅 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ===================== RLRetrievalPolicy ===================== class RLRetrievalPolicy: def __init__(self): self.policy_data = {} self.alpha = 0.5 # 유사도 vs. RL 점수 간 가중치 def update_policy(self, contexts: List[str], reward: float): for ctx in contexts: if ctx not in self.policy_data: self.policy_data[ctx] = 0.0 self.policy_data[ctx] += reward def re_rank(self, candidates: List[Tuple[float, str]]) -> List[str]: reweighted = [] for sim, txt in candidates: rl_score = self.policy_data.get(txt, 0.0) reweighted_score = sim * (1 - self.alpha) + rl_score * self.alpha reweighted.append((reweighted_score, txt)) reweighted.sort(key=lambda x: x[0], reverse=True) return [t for _, t in reweighted] # ===================== GraphMemory ===================== class GraphMemory: def __init__(self): self.graph = nx.DiGraph() # 수학 문제 해결에 도움이 되는 기본 노드 추가 self.add_node("수학", "수학 문제 해결을 위한 일반적인 접근법") self.add_node("대수학", "방정식, 함수, 비례 관계 등을 다루는 수학의 한 분야") self.add_node("기하학", "공간, 도형, 각도 등을 다루는 수학의 한 분야") self.add_node("산술", "기본적인 수 연산, 비율, 백분율 등을 다루는 분야") self.add_node("확률", "사건의 발생 가능성을 측정하는 수학의 한 분야") # 관계 설정 self.add_edge("대수학", "수학") self.add_edge("기하학", "수학") self.add_edge("산술", "수학") self.add_edge("확률", "수학") def add_node(self, node_id: str, text: str = ""): self.graph.add_node(node_id, text=text) def add_edge(self, src: str, dst: str): self.graph.add_edge(src, dst) def get_text_by_node(self, node_id: str) -> str: return self.graph.nodes[node_id].get('text', "") def has_node(self, node_id: str) -> bool: return node_id in self.graph.nodes def search_nodes(self, keyword: str, max_nodes: int = 3) -> List[str]: matches = [] for n in self.graph.nodes(): node_text = self.get_text_by_node(n).lower() n_lower = n.lower() if keyword.lower() in node_text or keyword.lower() in n_lower: score = node_text.count(keyword.lower()) + n_lower.count(keyword.lower()) matches.append((score, n)) matches.sort(key=lambda x: x[0], reverse=True) top_nodes = [m[1] for m in matches[:max_nodes]] return top_nodes def get_connected_context(self, start_node: str, steps: int = 1) -> List[str]: contexts = [] visited = set() queue = [(start_node, 0)] while queue: current, depth = queue.pop(0) if current not in visited: visited.add(current) contexts.append(self.get_text_by_node(current)) if depth < steps: for neighbor in self.graph.successors(current): queue.append((neighbor, depth + 1)) for neighbor in self.graph.predecessors(current): queue.append((neighbor, depth + 1)) return contexts # ===================== SimpleSummarizer ===================== class SimpleSummarizer: def __init__(self, model_name="facebook/bart-large-cnn"): self.model_name = model_name self.model = None self.tokenizer = None def load_summarization_model(self): if self.model is None: try: self.tokenizer = BartTokenizer.from_pretrained(self.model_name) self.model = BartForConditionalGeneration.from_pretrained(self.model_name) if torch.cuda.is_available(): self.model = self.model.cuda() except Exception as e: logger.error(f"Error loading summarization model: {str(e)}") raise def summarize_text(self, text: str, max_length: int = 100) -> str: try: self.load_summarization_model() inputs = self.tokenizer([text], max_length=1024, return_tensors='pt', truncation=True) if torch.cuda.is_available(): inputs = {k: v.cuda() for k, v in inputs.items()} with torch.no_grad(): summary_ids = self.model.generate( inputs["input_ids"], num_beams=4, max_length=max_length, early_stopping=True ) summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True) return summary except Exception as e: logger.error(f"Error in summarization: {str(e)}") return "요약을 생성할 수 없습니다." # ===================== SemanticMemory ===================== class SemanticMemory: def __init__(self, max_entries: int = 4000): self.memories: List[dict] = [] self.max_entries = max_entries self.rl_policy = RLRetrievalPolicy() def add_memory(self, text: str, embedding: torch.Tensor): if len(self.memories) >= self.max_entries: self.memories.pop(0) self.memories.append({ 'text': text, 'embedding': embedding, 'timestamp': time.time() }) def get_candidates(self, query_embedding: torch.Tensor) -> List[Tuple[float, str]]: candidates = [] for mem in self.memories: if mem['embedding'].shape == query_embedding.shape: sim = torch.cosine_similarity( query_embedding.float(), mem['embedding'].float(), dim=-1 ) candidates.append((sim.item(), mem['text'])) candidates.sort(key=lambda x: x[0], reverse=True) return candidates def get_relevant_context(self, query_embedding: torch.Tensor, top_k: int = 3) -> List[str]: candidates = self.get_candidates(query_embedding) re_ranked = self.rl_policy.re_rank(candidates) return re_ranked[:top_k] def update_retrieval_reward(self, texts: List[str], reward: float): self.rl_policy.update_policy(texts, reward) def clear(self): self.memories = [] # ===================== GenericInferenceBuffer ===================== MAX_TOKEN_BUFFER = 1024 class GenericInferenceBuffer: def __init__(self, layer_idx: int, compression_rank: int = 128): self.layer_idx = layer_idx self.key_buffer: Optional[torch.Tensor] = None self.value_buffer: Optional[torch.Tensor] = None self.semantic_context: Optional[torch.Tensor] = None self.last_update: float = 0 self.compression_rank = compression_rank def update_buffer( self, key: torch.Tensor, value: torch.Tensor, semantic_context: Optional[torch.Tensor] = None ): try: if self.key_buffer is None: self.key_buffer = key.detach().clone() self.value_buffer = value.detach().clone() if semantic_context is not None: self.semantic_context = semantic_context.detach().clone() else: self.key_buffer = torch.cat([self.key_buffer, key.detach()], dim=2) self.value_buffer = torch.cat([self.value_buffer, value.detach()], dim=2) if semantic_context is not None and self.semantic_context is not None: self.semantic_context = torch.cat([self.semantic_context, semantic_context.detach()], dim=0) if self.key_buffer.shape[2] > MAX_TOKEN_BUFFER: excess = self.key_buffer.shape[2] - MAX_TOKEN_BUFFER self.key_buffer = self.key_buffer[:, :, excess:, :] self.value_buffer = self.value_buffer[:, :, excess:, :] if self.semantic_context is not None: self.semantic_context = self.semantic_context[excess:, :] self.last_update = time.time() except Exception as e: logger.error(f"Buffer update error in layer {self.layer_idx}: {str(e)}") def compress_buffer_svd(self): if self.key_buffer is None or self.value_buffer is None: return try: k_shape = self.key_buffer.shape v_shape = self.value_buffer.shape k_2d = self.key_buffer.reshape(k_shape[0]*k_shape[1], k_shape[2]*k_shape[3]).float() v_2d = self.value_buffer.reshape(v_shape[0]*v_shape[1], v_shape[2]*v_shape[3]).float() device = k_2d.device k_2d_cpu = k_2d.cpu() v_2d_cpu = v_2d.cpu() U_k, S_k, V_k = torch.linalg.svd(k_2d_cpu, full_matrices=False) U_v, S_v, V_v = torch.linalg.svd(v_2d_cpu, full_matrices=False) rank_k = min(self.compression_rank, S_k.shape[0]) rank_v = min(self.compression_rank, S_v.shape[0]) k_approx = (U_k[:, :rank_k] * S_k[:rank_k]) @ V_k[:rank_k, :] v_approx = (U_v[:, :rank_v] * S_v[:rank_v]) @ V_v[:rank_v, :] k_approx = k_approx.to(device) v_approx = v_approx.to(device) self.key_buffer = k_approx.reshape(k_shape).type(self.key_buffer.dtype) self.value_buffer = v_approx.reshape(v_shape).type(self.value_buffer.dtype) except Exception as e: logger.error(f"SVD compression error in layer {self.layer_idx}: {str(e)}") def get_buffer(self) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]: return self.key_buffer, self.value_buffer, self.semantic_context def clear(self): self.key_buffer = None self.value_buffer = None self.semantic_context = None self.last_update = 0 # ===================== InferenceBufferManager ===================== class InferenceBufferManager: def __init__(self, num_layers: int, hidden_size: int): self.num_layers = num_layers self.hidden_size = hidden_size self.layer_buffers = [ GenericInferenceBuffer(i, compression_rank=128) for i in range(num_layers) ] self.semantic_memory = SemanticMemory() self.graph_memory = GraphMemory() self.summarizer = SimpleSummarizer() self.summarize_threshold = 1500 self.generated_tokens_count = 0 self.compression_interval = 512 self.token_count_since_compress = 0 def _compute_semantic_embedding(self, key: Optional[torch.Tensor], value: Optional[torch.Tensor]) -> torch.Tensor: device = "cuda" if torch.cuda.is_available() else "cpu" if key is None or value is None: return torch.zeros((1, self.hidden_size), dtype=torch.float32, device=device) combined = key * value combined = combined.mean(dim=2) combined = combined.reshape(combined.shape[0], -1) combined = torch.nn.functional.normalize(combined, dim=-1) return combined def update_buffer(self, layer_outputs, current_tokens: List[int], semantic_context: torch.Tensor, tokenizer): try: if hasattr(layer_outputs, 'past_key_values'): for layer_idx, past_kv in enumerate(layer_outputs.past_key_values): if isinstance(past_kv, tuple) and len(past_kv) == 2: key, value = past_kv if key is not None and value is not None: self.layer_buffers[layer_idx].update_buffer( key.detach(), value.detach(), semantic_context ) self.generated_tokens_count += len(current_tokens) self.token_count_since_compress += len(current_tokens) if self.token_count_since_compress >= self.compression_interval: self.compress_all_buffers() self.token_count_since_compress = 0 except Exception as e: logger.error(f"Buffer update error: {str(e)}") def compress_all_buffers(self): for buf in self.layer_buffers: buf.compress_buffer_svd() def finalize_semantic_memory(self, tokenizer, generated_tokens: List[int]): if self.layer_buffers and len(self.layer_buffers) > 0 and self.layer_buffers[-1].key_buffer is not None: text_chunk = tokenizer.decode(generated_tokens, skip_special_tokens=True) key_buffer = self.layer_buffers[-1].key_buffer value_buffer = self.layer_buffers[-1].value_buffer embedding = self._compute_semantic_embedding(key_buffer, value_buffer) self.semantic_memory.add_memory(text_chunk, embedding) def get_relevant_context(self, query_embedding: torch.Tensor, top_k: int = 3) -> List[str]: candidates_sem = self.semantic_memory.get_candidates(query_embedding) # 키워드 추출 (간단한 구현) possible_keywords = ["수학", "대수학", "기하학", "산술", "확률"] text_candidates = [] for kw in possible_keywords: nodes = self.graph_memory.search_nodes(kw) for n in nodes: context_list = self.graph_memory.get_connected_context(n, steps=1) cscore = 1.0 for ctxt in context_list: text_candidates.append((cscore, ctxt)) merged_candidates = candidates_sem + text_candidates re_ranked = self.semantic_memory.rl_policy.re_rank(merged_candidates) return re_ranked[:top_k] def update_retrieval_reward(self, contexts: List[str], reward: float): self.semantic_memory.update_retrieval_reward(contexts, reward) def maybe_summarize_memory(self): if self.generated_tokens_count < self.summarize_threshold: return all_text = "\n".join([m['text'] for m in self.semantic_memory.memories]) if len(all_text) < 300: return summary = self.summarizer.summarize_text(all_text, max_length=120) device = "cuda" if torch.cuda.is_available() else "cpu" summary_embedding = torch.zeros((1, self.hidden_size), dtype=torch.float32, device=device) self.semantic_memory.clear() self.semantic_memory.add_memory(summary, summary_embedding) self.generated_tokens_count = 0 def clear(self): for layer in self.layer_buffers: layer.clear() self.semantic_memory.clear() # ===================== Enhanced ThinkFlow Implementation ===================== # 최종 답변을 감지하기 위한 마커 ANSWER_MARKER = "**답변**" # 단계별 추론을 시작하는 문장들 rethink_prepends = [ "자, 이제 다음을 파악해야 합니다 ", "제 생각에는 ", "잠시만요, 제 생각에는 ", "다음 사항이 맞는지 확인해 보겠습니다 ", "또한 기억해야 할 것은 ", "또 다른 주목할 점은 ", "그리고 저는 다음과 같은 사실도 기억합니다 ", "이제 충분히 이해했다고 생각합니다 ", ] # 최종 답변 생성을 위한 프롬프트 추가 final_answer_prompt = """ 지금까지의 추론 과정을 바탕으로, 원래 질문에 사용된 언어로 답변하겠습니다: {question} 아래는 내가 추론한 결론입니다: {reasoning_conclusion} 위 추론을 기반으로 최종 답변: {ANSWER_MARKER} """ # 수식 표시 문제 해결을 위한 설정 latex_delimiters = [ {"left": "$$", "right": "$$", "display": True}, {"left": "$", "right": "$", "display": False}, ] def reformat_math(text): """Gradio 구문(Katex)을 사용하도록 MathJax 구분 기호 수정.""" text = re.sub(r"\\\[\s*(.*?)\s*\\\]", r"$$\1$$", text, flags=re.DOTALL) text = re.sub(r"\\\(\s*(.*?)\s*\\\)", r"$\1$", text, flags=re.DOTALL) return text def extract_keywords(text: str) -> List[str]: """텍스트에서 간단한 키워드 추출 함수""" # 간단한 구현 - 실제로는 더 복잡한 NLP 기법을 사용할 수 있음 common_math_keywords = [ "수학", "대수학", "기하학", "산술", "확률", "공식", "방정식", "함수", "적분", "미분", "기하", "삼각형", "원", "각도", "비율", "비례", "평균", "분산", "표준편차" ] keywords = [] for kw in common_math_keywords: if kw in text: keywords.append(kw) return keywords[:5] # 최대 5개 키워드만 반환 def get_embedding_for_text(text: str, hidden_size: int = 768) -> torch.Tensor: """ 텍스트를 위한 임시 임베딩 생성 함수 실제 구현에서는 적절한 언어 모델을 사용해야 함 """ # 임시 구현: 텍스트의 해시 값을 기반으로 한 임베딩 device = "cuda" if torch.cuda.is_available() else "cpu" hash_val = hash(text) np.random.seed(hash_val) # 임의의 임베딩 생성 embedding = np.random.rand(1, hidden_size).astype(np.float32) # 정규화 norm = np.linalg.norm(embedding) if norm > 0: embedding = embedding / norm return torch.tensor(embedding, device=device) def user_input(message, history_original, history_thinking): """사용자 입력을 히스토리에 추가하고 입력 텍스트 상자 비우기""" return "", history_original + [ gr.ChatMessage(role="user", content=message.replace(ANSWER_MARKER, "")) ], history_thinking + [ gr.ChatMessage(role="user", content=message.replace(ANSWER_MARKER, "")) ] def rebuild_messages(history: list): """중간 생각 과정 없이 모델이 사용할 히스토리에서 메시지 재구성""" messages = [] for h in history: if isinstance(h, dict) and not h.get("metadata", {}).get("title", False): messages.append(h) elif ( isinstance(h, gr.ChatMessage) and h.metadata.get("title", None) is None and isinstance(h.content, str) ): messages.append({"role": h.role, "content": h.content}) return messages # 모델과 버퍼 매니저 초기화 함수 def initialize_model_and_manager(model_name): """모델과 버퍼 매니저 초기화 함수""" try: pipe = pipeline( "text-generation", model=model_name, device_map="auto", torch_dtype="auto", ) # 모델 구성에서 레이어 및 은닉 크기 정보 추출 config = pipe.model.config if hasattr(config, "n_layer"): num_layers = config.n_layer elif hasattr(config, "num_layers"): num_layers = config.num_layers elif hasattr(config, "num_hidden_layers"): num_layers = config.num_hidden_layers else: num_layers = 12 # 기본값 if hasattr(config, "n_embd"): hidden_size = config.n_embd elif hasattr(config, "hidden_size"): hidden_size = config.hidden_size else: hidden_size = 768 # 기본값 # 버퍼 매니저 초기화 buffer_manager = InferenceBufferManager(num_layers, hidden_size) return pipe, buffer_manager except Exception as e: logger.error(f"모델 초기화 오류: {str(e)}") raise def bot_original( history: list, max_num_tokens: int, do_sample: bool, temperature: float, pipe=None ): """원본 모델이 질문에 답변하도록 하기 (추론 과정 없이)""" if pipe is None: # 이 부분은 실제 구현에서는 전역 변수나 세션 상태로 관리해야 함 return history # 나중에 스레드에서 토큰을 스트림으로 가져오기 위함 streamer = transformers.TextIteratorStreamer( pipe.tokenizer, skip_special_tokens=True, skip_prompt=True, ) # 보조자 메시지 준비 history.append( gr.ChatMessage( role="assistant", content=str(""), ) ) # 현재 채팅에 표시될 메시지 messages = rebuild_messages(history[:-1]) # 마지막 빈 메시지 제외 # 원본 모델은 추론 없이 바로 답변 t = threading.Thread( target=pipe, args=(messages,), kwargs=dict( max_new_tokens=max_num_tokens, streamer=streamer, do_sample=do_sample, temperature=temperature, ), ) t.start() for token in streamer: history[-1].content += token history[-1].content = reformat_math(history[-1].content) yield history t.join() yield history def bot_thinking_enhanced( history: list, max_num_tokens: int, final_num_tokens: int, do_sample: bool, temperature: float, pipe=None, buffer_manager=None ): """추론 과정을 포함하여 모델이 질문에 답변하도록 하기 - DeepSeek 기능 통합""" if pipe is None or buffer_manager is None: # 이 부분은 실제 구현에서는 전역 변수나 세션 상태로 관리해야 함 return history # 나중에 스레드에서 토큰을 스트림으로 가져오기 위함 streamer = transformers.TextIteratorStreamer( pipe.tokenizer, skip_special_tokens=True, skip_prompt=True, ) # 필요한 경우 추론에 질문을 다시 삽입하기 위함 question = history[-1]["content"] # 쿼리 임베딩 생성 query_embedding = get_embedding_for_text(question, buffer_manager.hidden_size) # 관련 컨텍스트 검색 relevant_contexts = buffer_manager.get_relevant_context(query_embedding, top_k=3) # 키워드 추출 및 그래프 메모리에서 컨텍스트 가져오기 keywords = extract_keywords(question) graph_contexts = [] for keyword in keywords: nodes = buffer_manager.graph_memory.search_nodes(keyword) for node in nodes: contexts = buffer_manager.graph_memory.get_connected_context(node) graph_contexts.extend(contexts) # 모든 컨텍스트 병합 all_contexts = relevant_contexts + graph_contexts all_contexts = list(set(all_contexts)) # 중복 제거 all_contexts = all_contexts[:5] # 최대 5개 컨텍스트로 제한 # 보조자 메시지 준비 history.append( gr.ChatMessage( role="assistant", content=str(""), metadata={"title": "🧠 생각 중...", "status": "pending"}, ) ) # 현재 채팅에 표시될 추론 과정 messages = rebuild_messages(history) # 관련 컨텍스트가 있다면 메시지에 추가 if all_contexts: context_str = "\n\n관련 컨텍스트:\n" + "\n".join(all_contexts) messages[-1]["content"] += context_str history[-1].content += context_str # 전체 추론 과정을 저장할 변수 full_reasoning = "" # 생성된 토큰 추적을 위한 변수 generated_tokens = [] # 추론 단계 실행 for i, prepend in enumerate(rethink_prepends): if i > 0: messages[-1]["content"] += "\n\n" messages[-1]["content"] += prepend.format(question=question) t = threading.Thread( target=pipe, args=(messages,), kwargs=dict( max_new_tokens=max_num_tokens, streamer=streamer, do_sample=do_sample, temperature=temperature, ), ) t.start() # 새 내용으로 히스토리 재구성 history[-1].content += prepend.format(question=question) step_tokens = [] for token in streamer: history[-1].content += token history[-1].content = reformat_math(history[-1].content) step_tokens.append(token) generated_tokens.append(token) yield history t.join() # 각 추론 단계의 결과를 full_reasoning에 저장 full_reasoning = history[-1].content # 추론이 길어지면 중간 요약 생성 if i > 0 and i % 3 == 0 and len(generated_tokens) > 500: try: summary = buffer_manager.summarizer.summarize_text(full_reasoning, max_length=150) summary_text = f"\n\n**중간 요약:**\n{summary}\n\n" history[-1].content += summary_text messages[-1]["content"] += summary_text yield history except Exception as e: logger.error(f"요약 생성 오류: {str(e)}") # KV 캐시 압축 if i > 0 and i % 2 == 0: buffer_manager.compress_all_buffers() # 시맨틱 컨텍스트 업데이트 step_text = "".join(step_tokens) step_embedding = get_embedding_for_text(step_text, buffer_manager.hidden_size) buffer_manager.semantic_memory.add_memory(step_text, step_embedding) # 추론 완료, 이제 최종 답변을 생성 history[-1].metadata = {"title": "💭 사고 과정", "status": "done"} # 추론 과정을 시맨틱 메모리와 그래프 메모리에 저장 full_embedding = get_embedding_for_text(full_reasoning, buffer_manager.hidden_size) buffer_manager.semantic_memory.add_memory(full_reasoning, full_embedding) # 키워드에 대한 그래프 메모리 업데이트 for keyword in keywords: if not buffer_manager.graph_memory.has_node(keyword): buffer_manager.graph_memory.add_node(keyword, f"{keyword}에 관한 개념: 이 주제에 대한 추론을 수행했습니다.") # 관련 노드와 연결 for related_kw in keywords: if related_kw != keyword and buffer_manager.graph_memory.has_node(related_kw): buffer_manager.graph_memory.add_edge(keyword, related_kw) # 추론 과정에서 결론 부분을 추출 (마지막 1-2 문단 정도) reasoning_parts = full_reasoning.split("\n\n") reasoning_conclusion = "\n\n".join(reasoning_parts[-2:]) if len(reasoning_parts) > 2 else full_reasoning # 최종 답변 메시지 추가 history.append(gr.ChatMessage(role="assistant", content="")) # 최종 답변을 위한 메시지 구성 final_messages = rebuild_messages(history[:-1]) # 마지막 빈 메시지 제외 final_prompt = final_answer_prompt.format( question=question, reasoning_conclusion=reasoning_conclusion, ANSWER_MARKER=ANSWER_MARKER ) final_messages[-1]["content"] += final_prompt # 최종 답변 생성 t = threading.Thread( target=pipe, args=(final_messages,), kwargs=dict( max_new_tokens=final_num_tokens, streamer=streamer, do_sample=do_sample, temperature=temperature, ), ) t.start() # 최종 답변 스트리밍 final_tokens = [] for token in streamer: history[-1].content += token history[-1].content = reformat_math(history[-1].content) final_tokens.append(token) yield history t.join() # 최종 답변을 시맨틱 메모리에 저장 final_text = "".join(final_tokens) final_embedding = get_embedding_for_text(final_text, buffer_manager.hidden_size) buffer_manager.semantic_memory.add_memory(final_text, final_embedding) # 주기적 메모리 요약 체크 buffer_manager.maybe_summarize_memory() yield history with gr.Blocks(fill_height=True, title="Enhanced ThinkFlow") as demo: # 제목과 설명 gr.Markdown("# Enhanced ThinkFlow with DeepSeek Features") gr.Markdown("### 시맨틱 메모리, 그래프 메모리, 및 KV 캐시 압축을 통해 향상된 LLM 추론 생성 플랫폼") # 모델 및 버퍼 매니저 초기화 (실제 구현에서는 세션 상태로 관리) model_name = "CohereForAI/c4ai-command-r7b-arabic-02-2025" # 세션 변수 (실제 구현에서는 gr.State() 사용) pipe = None buffer_manager = None current_contexts = [] # 탭 인터페이스 with gr.Tabs() as tabs: # 채팅 탭 with gr.TabItem("통합 추론 인터페이스"): with gr.Row(scale=1): with gr.Column(scale=2): gr.Markdown("## Before (Original)") chatbot_original = gr.Chatbot( scale=1, type="messages", latex_delimiters=latex_delimiters, label="Original Model (No Reasoning)" ) with gr.Column(scale=2): gr.Markdown("## After (Enhanced Thinking)") chatbot_thinking = gr.Chatbot( scale=1, type="messages", latex_delimiters=latex_delimiters, label="Model with Enhanced Reasoning" ) with gr.Row(): # msg 텍스트박스를 먼저 정의 msg = gr.Textbox( submit_btn=True, label="", show_label=False, placeholder="여기에 질문을 입력하세요.", autofocus=True, ) # 피드백 버튼 with gr.Row(): with gr.Column(scale=1): feedback_btn_pos = gr.Button("👍 이 추론이 도움이 되었습니다") with gr.Column(scale=1): feedback_btn_neg = gr.Button("👎 이 추론은 개선이 필요합니다") with gr.Column(scale=1): clear_memory_btn = gr.Button("🧹 메모리 초기화") # 메모리 시각화 탭 with gr.TabItem("메모리 시각화"): gr.Markdown("## 시맨틱 메모리 내용") semantic_memory_display = gr.Textbox( label="현재 시맨틱 메모리 내용", placeholder="아직 메모리가 없습니다.", lines=10, max_lines=20, interactive=False ) gr.Markdown("## 그래프 지식베이스") graph_memory_display = gr.Textbox( label="현재 그래프 메모리 내용", placeholder="아직 그래프 노드가 없습니다.", lines=10, max_lines=20, interactive=False ) # 예제 섹션 - msg 변수 정의 이후에 배치 with gr.Accordion("EXAMPLES", open=False): examples = gr.Examples( examples=[ "[출처: MATH-500)] 처음 100개의 양의 정수 중에서 3, 4, 5로 나누어 떨어지는 수는 몇 개입니까?", "[출처: MATH-500)] 잉크의 땅에서 돈 시스템은 독특합니다. 트링킷 1개는 블링킷 4개와 같고, 블링킷 3개는 드링크 7개와 같습니다. 트링킷에서 드링크 56개의 가치는 얼마입니까?", "[출처: MATH-500)] 에이미, 벤, 크리스의 평균 나이는 6살입니다. 4년 전 크리스는 지금 에이미와 같은 나이였습니다. 4년 후 벤의 나이는 그때 에이미의 나이의 $\\frac{3}{5}$가 될 것입니다. 크리스는 지금 몇 살입니까?", "[출처: MATH-500)] 노란색과 파란색 구슬이 들어 있는 가방이 있습니다. 현재 파란색 구슬과 노란색 구슬의 비율은 4:3입니다. 파란색 구슬 5개를 더하고 노란색 구슬 3개를 제거하면 비율은 7:3이 됩니다. 더 넣기 전에 가방에 파란색 구슬이 몇 개 있었습니까?" ], inputs=msg ) with gr.Accordion("매개변수 조정", open=False): with gr.Row(): with gr.Column(): model_dropdown = gr.Dropdown( ["CohereForAI/c4ai-command-r7b-arabic-02-2025", "meta-llama/Meta-Llama-3-8B-Instruct"], label="모델 선택", value="CohereForAI/c4ai-command-r7b-arabic-02-2025" ) num_tokens = gr.Slider( 50, 4000, 2000, step=1, label="추론 단계당 최대 토큰 수", interactive=True, ) final_num_tokens = gr.Slider( 50, 4000, 2000, step=1, label="최종 답변의 최대 토큰 수", interactive=True, ) with gr.Column(): do_sample = gr.Checkbox(True, label="샘플링 사용") temperature = gr.Slider(0.1, 1.0, 0.7, step=0.1, label="온도") memory_weight = gr.Slider(0.0, 1.0, 0.5, step=0.1, label="메모리 반영 가중치") # 피드백 처리 함수 def process_positive_feedback(): global buffer_manager, current_contexts if buffer_manager: buffer_manager.update_retrieval_reward(current_contexts, reward=1.0) return "피드백 감사합니다! 이 접근 방식을 향후 유사한 질문에 더 자주 사용하겠습니다." def process_negative_feedback(): global buffer_manager, current_contexts if buffer_manager: buffer_manager.update_retrieval_reward(current_contexts, reward=-0.5) return "피드백 감사합니다! 이 접근 방식을 개선하겠습니다." def clear_memory(): global buffer_manager if buffer_manager: buffer_manager.clear() return "메모리가 초기화되었습니다." def update_memory_displays(): global buffer_manager if not buffer_manager: return "메모리가 초기화되지 않았습니다.", "그래프가 초기화되지 않았습니다." semantic_text = "현재 저장된 메모리:\n\n" for i, mem in enumerate(buffer_manager.semantic_memory.memories[:5]): # 최대 5개만 표시 semantic_text += f"{i+1}. {mem['text'][:100]}...\n\n" graph_text = "현재 그래프 노드:\n\n" for node in buffer_manager.graph_memory.graph.nodes(): node_text = buffer_manager.graph_memory.get_text_by_node(node) neighbors = list(buffer_manager.graph_memory.graph.neighbors(node)) graph_text += f"노드: {node}\n설명: {node_text[:50]}...\n연결: {', '.join(neighbors[:3])}\n\n" return semantic_text, graph_text # 초기화 함수 def initialize_models(): global pipe, buffer_manager, model_name try: pipe, buffer_manager = initialize_model_and_manager(model_name) semantic_text, graph_text = update_memory_displays() return "모델이 초기화되었습니다.", semantic_text, graph_text except Exception as e: return f"모델 초기화 오류: {str(e)}", "", "" # 모델 선택 변경 시 처리 def change_model(new_model_name): global model_name model_name = new_model_name status, semantic_text, graph_text = initialize_models() return status, semantic_text, graph_text # 초기화 함수 실행 model_dropdown.change( change_model, [model_dropdown], [gr.Textbox(visible=False), semantic_memory_display, graph_memory_display] ) # 피드백 버튼에 함수 연결 feedback_btn_pos.click(process_positive_feedback, [], gr.Textbox(visible=False)) feedback_btn_neg.click(process_negative_feedback, [], gr.Textbox(visible=False)) clear_memory_btn.click(clear_memory, [], gr.Textbox(visible=False)) # 탭 변경 시 메모리 디스플레이 업데이트 tabs.change(update_memory_displays, [], [semantic_memory_display, graph_memory_display]) # 사용자가 메시지를 제출하면 두 봇이 동시에 응답합니다 msg.submit( user_input, [msg, chatbot_original, chatbot_thinking], # 입력 [msg, chatbot_original, chatbot_thinking], # 출력 ).then( lambda h, n, d, t, p: bot_original(h, n, d, t, p), # pipe 매개변수 추가 [ chatbot_original, num_tokens, do_sample, temperature, gr.Textbox(value=lambda: pipe, visible=False), # pipe 전달 ], chatbot_original, # 출력에서 새 히스토리 저장 ).then( lambda h, n, f, d, t, p, b: bot_thinking_enhanced(h, n, f, d, t, p, b), # 매개변수 추가 [ chatbot_thinking, num_tokens, final_num_tokens, do_sample, temperature, gr.Textbox(value=lambda: pipe, visible=False), # pipe 전달 gr.Textbox(value=lambda: buffer_manager, visible=False), # buffer_manager 전달 ], chatbot_thinking, # 출력에서 새 히스토리 저장 ).then( update_memory_displays, [], [semantic_memory_display, graph_memory_display] ) # 시작 시 모델 초기화를 위한 코드 def load_on_startup(): global pipe, buffer_manager try: # 기본 모델 초기화 pipe, buffer_manager = initialize_model_and_manager( "CohereForAI/c4ai-command-r7b-arabic-02-2025" ) logger.info("모델 및 버퍼 매니저가 성공적으로 초기화되었습니다.") except Exception as e: logger.error(f"시작 시 모델 초기화 실패: {str(e)}") if __name__ == "__main__": # 응용 프로그램 시작 전에 모델 초기화 load_on_startup() # 대기열 및 서버 시작 demo.queue().launch( share=False, debug=True, title="Enhanced ThinkFlow with DeepSeek Features" )