Spaces:
Running
Running
import streamlit as st | |
import pandas as pd | |
import torch | |
import os | |
import time | |
import logging | |
import subprocess | |
import sys | |
# 設定logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# 頁面配置 | |
st.set_page_config( | |
page_title="Excel 問答 AI(ChatGLM 驅動)", | |
page_icon="🤖", | |
layout="wide" | |
) | |
# 應用標題與說明 | |
st.title("🤖 Excel 問答 AI(ChatGLM 驅動)") | |
st.markdown(""" | |
### 使用說明 | |
1. 可直接提問一般知識,AI 將使用內建能力回答 | |
2. 上傳 Excel 檔案(包含「問題」和「答案」欄位)以添加專業知識 | |
3. 系統會優先使用您上傳的知識庫進行回答 | |
""") | |
# 檢查並安裝必要套件 | |
def install_missing_packages(): | |
required_packages = ["sentencepiece", "protobuf", "bitsandbytes"] # 加入 bitsandbytes | |
for package in required_packages: | |
try: | |
__import__(package) | |
st.write(f"{package} 已安裝") | |
except ImportError: | |
st.write(f"安裝缺失的套件: {package}") | |
try: | |
subprocess.check_call([sys.executable, "-m", "pip", "install", package]) | |
st.write(f"{package} 已安裝成功") | |
except Exception as e: | |
st.error(f"安裝 {package} 失敗: {str(e)}") | |
return False | |
return True | |
# 安裝缺失的套件 | |
if not install_missing_packages(): | |
st.error("必要套件安裝失敗,請刷新頁面重試") | |
st.stop() | |
st.write("正在導入依賴項...") | |
# 依次導入並檢查每個依賴 | |
try: | |
from langchain_community.embeddings.huggingface import HuggingFaceEmbeddings | |
st.write("成功導入 HuggingFaceEmbeddings") | |
except Exception as e: | |
st.error(f"導入 HuggingFaceEmbeddings 失敗: {str(e)}") | |
st.stop() | |
try: | |
from langchain_community.vectorstores import FAISS | |
st.write("成功導入 FAISS") | |
except Exception as e: | |
st.error(f"導入 FAISS 失敗: {str(e)}") | |
st.stop() | |
try: | |
from langchain_community.llms import HuggingFacePipeline | |
st.write("成功導入 HuggingFacePipeline") | |
except Exception as e: | |
st.error(f"導入 HuggingFacePipeline 失敗: {str(e)}") | |
st.stop() | |
try: | |
from langchain.chains import RetrievalQA, LLMChain | |
st.write("成功導入 RetrievalQA, LLMChain") | |
except Exception as e: | |
st.error(f"導入 RetrievalQA, LLMChain 失敗: {str(e)}") | |
st.stop() | |
try: | |
from langchain.prompts import PromptTemplate | |
st.write("成功導入 PromptTemplate") | |
except Exception as e: | |
st.error(f"導入 PromptTemplate 失敗: {str(e)}") | |
st.stop() | |
try: | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
st.write("成功導入 transformers 組件") | |
except Exception as e: | |
st.error(f"導入 transformers 組件失敗: {str(e)}") | |
st.stop() | |
try: | |
import bitsandbytes # 檢查 bitsandbytes | |
st.write("成功導入 bitsandbytes") | |
has_bitsandbytes = True | |
except ImportError: | |
st.warning("未安裝 bitsandbytes,將無法使用 4 位元量化。") | |
has_bitsandbytes = False | |
st.write("所有依賴項導入成功!") | |
# 側邊欄設定 | |
with st.sidebar: | |
st.header("參數設定") | |
model_option = st.selectbox( | |
"選擇模型", | |
["THUDM/chatglm3-6b", "THUDM/chatglm2-6b", "THUDM/chatglm-6b"], | |
index=0 | |
) | |
embedding_option = st.selectbox( | |
"選擇嵌入模型", | |
["shibing624/text2vec-base-chinese", "GanymedeNil/text2vec-large-chinese"], | |
index=0 | |
) | |
mode = st.radio( | |
"回答模式", | |
["混合模式(優先使用上傳資料)", "僅使用上傳資料", "僅使用模型知識"] | |
) | |
max_tokens = st.slider("最大回應長度", 128, 2048, 512) | |
temperature = st.slider("溫度(創造性)", 0.0, 1.0, 0.7, 0.1) | |
top_k = st.slider("檢索相關文檔數", 1, 5, 3) | |
st.markdown("---") | |
st.markdown("### 關於") | |
st.markdown("此應用使用 ChatGLM 模型結合 LangChain 框架,將您的 Excel 數據轉化為智能問答系統。同時支持一般知識問答。") | |
# 全局變量 | |
def load_embeddings(model_name): | |
try: | |
logger.info(f"加載嵌入模型: {model_name}") | |
st.write(f"開始加載嵌入模型: {model_name}...") | |
embeddings = HuggingFaceEmbeddings(model_name=model_name) | |
st.write(f"嵌入模型加載成功!") | |
return embeddings | |
except Exception as e: | |
logger.error(f"嵌入模型加載失敗: {str(e)}") | |
st.error(f"嵌入模型加載失敗: {str(e)}") | |
return None | |
def load_llm(_model_name, _max_tokens, _temperature): | |
try: | |
logger.info(f"加載語言模型: {_model_name}") | |
st.write(f"開始加載語言模型: {_model_name}...") | |
# 檢查可用資源 | |
free_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated() if torch.cuda.is_available() else 0 | |
st.write(f"可用GPU記憶體: {free_memory / (1024**3):.2f} GB" if torch.cuda.is_available() else "無GPU可用,將使用CPU") | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
dtype = torch.float16 | |
load_args = {"trust_remote_code": True, "device_map": device, "torch_dtype": dtype} | |
if device == "cpu": | |
st.warning("注意:在 CPU 上載入大型語言模型可能會非常緩慢且需要大量記憶體。") | |
elif has_bitsandbytes: | |
try: | |
load_args["load_in_4bit"] = True | |
load_args["bnb_4bit_compute_dtype"] = torch.float16 | |
st.info("嘗試使用 4 位元量化載入模型 (需要 bitsandbytes)。") | |
except Exception as e: | |
st.warning(f"載入 4 位元量化模型失敗: {e}") | |
st.info("將嘗試以半精度浮點數載入。") | |
# 使用超時保護 | |
with st.spinner(f"正在加載 {_model_name} 模型,這可能需要幾分鐘..."): | |
# 加載tokenizer | |
st.write("加載tokenizer...") | |
tokenizer = AutoTokenizer.from_pretrained(_model_name, trust_remote_code=True) | |
st.write("Tokenizer加載成功") | |
# 加載模型 | |
st.write(f"開始加載模型到{device}...") | |
try: | |
model = AutoModelForCausalLM.from_pretrained(_model_name, **load_args) | |
st.write("模型加載成功!") | |
except Exception as e: | |
st.error(f"模型加載失敗: {e}") | |
st.error("嘗試使用不同的載入配置。") | |
raise e | |
# 創建pipeline | |
st.write("創建文本生成pipeline...") | |
pipe = pipeline( | |
"text-generation", | |
model=model, | |
tokenizer=tokenizer, | |
max_new_tokens=_max_tokens, | |
temperature=_temperature, | |
top_p=0.9, | |
repetition_penalty=1.1 | |
) | |
st.write("Pipeline創建成功!") | |
return HuggingFacePipeline(pipeline=pipe) | |
except Exception as e: | |
logger.error(f"語言模型加載失敗: {str(e)}") | |
st.error(f"語言模型加載失敗: {str(e)}") | |
st.error("如果是因為記憶體不足,請考慮使用較小的模型或增加系統記憶體") | |
return None | |
# 創建向量資料庫 | |
def create_vectorstore(texts, embeddings): | |
try: | |
st.write("開始創建向量資料庫...") | |
vectorstore = FAISS.from_texts(texts, embedding=embeddings) | |
st.write("向量資料庫創建成功!") | |
return vectorstore | |
except Exception as e: | |
logger.error(f"向量資料庫創建失敗: {str(e)}") | |
st.error(f"向量資料庫創建失敗: {str(e)}") | |
return None | |
# 創建直接問答的LLM鏈 | |
def create_general_qa_chain(llm): | |
prompt_template = """請回答以下問題: | |
問題: {question} | |
請提供詳細且有幫助的回答:""" | |
prompt = PromptTemplate( | |
template=prompt_template, | |
input_variables=["question"] | |
) | |
return LLMChain(llm=llm, prompt=prompt) | |
# 混合模式問答處理 | |
def hybrid_qa(query, qa_chain, general_chain, confidence_threshold=0.7): | |
# 先嘗試使用知識庫回答 | |
try: | |
st.write("嘗試從知識庫查詢答案...") | |
kb_result = qa_chain({"query": query}) | |
# 檢查向量存儲的相似度分數,判斷是否有足夠相關的內容 | |
if (hasattr(kb_result, 'source_documents') and | |
kb_result.get("source_documents") and | |
len(kb_result["source_documents"]) > 0): | |
# 這裡假設我們能獲取到相似度分數,實際上可能需要根據您使用的向量存儲方法調整 | |
relevance = True # 在實際應用中,這裡應根據相似度分數確定 | |
if relevance: | |
st.write("找到相關知識庫內容") | |
return kb_result, "knowledge_base", kb_result["source_documents"] | |
st.write("知識庫中未找到足夠相關的內容") | |
except Exception as e: | |
logger.warning(f"知識庫查詢失敗: {str(e)}") | |
st.warning(f"知識庫查詢失敗: {str(e)}") | |
# 如果知識庫沒有足夠相關的答案,使用一般知識模式 | |
try: | |
st.write("使用模型一般知識回答...") | |
general_result = general_chain.run(question=query) | |
return {"result": general_result}, "general", [] | |
except Exception as e: | |
logger.error(f"一般知識查詢失敗: {str(e)}") | |
st.error(f"一般知識查詢失敗: {str(e)}") | |
return {"result": "很抱歉,無法處理您的問題,請稍後再試。"}, "error", [] | |
# 主應用邏輯 | |
# 加載嵌入模型(先加載嵌入模型,因為這通常較小較快) | |
embeddings = None | |
if "embeddings" not in st.session_state: | |
with st.spinner("正在加載嵌入模型..."): | |
embeddings = load_embeddings(embedding_option) | |
if embeddings is not None: | |
st.session_state.embeddings = embeddings | |
else: | |
st.error("嵌入模型加載失敗,請刷新頁面重試") | |
st.stop() | |
else: | |
embeddings = st.session_state.embeddings | |
# 加載語言模型(不管是否上傳文件都需要) | |
llm = None | |
if "llm" not in st.session_state: | |
llm = load_llm(model_option, max_tokens, temperature) | |
if llm is not None: | |
st.session_state.llm = llm | |
else: | |
st.error("語言模型加載失敗,請刷新頁面重試") | |
st.stop() | |
else: | |
llm = st.session_state.llm | |
# 創建一般問答鏈 | |
general_qa_chain = create_general_qa_chain(llm) | |
st.write("一般問答鏈創建成功!") | |
# 變數初始化 | |
kb_qa_chain = None | |
has_knowledge_base = False | |
vectorstore = None | |
# 上傳Excel文件 | |
uploaded_file = st.file_uploader("上傳你的問答 Excel(可選)", type=["xlsx"]) | |
if uploaded_file: | |
# 讀取Excel文件 | |
try: | |
st.write("開始讀取Excel文件...") | |
df = pd.read_excel(uploaded_file) | |
# 檢查必要欄位 | |
if not {'問題', '答案'}.issubset(df.columns): | |
st.error("Excel 檔案需包含 '問題' 和 '答案' 欄位") | |
else: | |
# 顯示資料預覽 | |
with st.expander("Excel 資料預覽"): | |
st.dataframe(df.head()) | |
st.info(f"成功讀取 {len(df)} 筆問答對") | |
# 建立文本列表 | |
texts = [f"問題:{q}\n答案:{a}" for q, a in zip(df['問題'], df['答案'])] | |
# 進度條 | |
progress_text = "正在處理中..." | |
my_bar = st.progress(0, text=progress_text) | |
# 使用之前加載的嵌入模型 | |
my_bar.progress(25, text="準備嵌入模型...") | |
# 建立向量資料庫 | |
my_bar.progress(50, text="正在建立向量資料庫...") | |
vectorstore = create_vectorstore(texts, embeddings) | |
if vectorstore is None: | |
st.stop() | |
# 創建問答鏈 | |
my_bar.progress(75, text="正在建立知識庫問答系統...") | |
kb_qa_chain = RetrievalQA.from_chain_type( | |
llm=llm, | |
retriever=vectorstore.as_retriever(search_kwargs={"k": top_k}), | |
chain_type="stuff", | |
return_source_documents=True | |
) | |
has_knowledge_base = True | |
my_bar.progress(100, text="準備完成!") | |
time.sleep(1) | |
my_bar.empty() | |
st.success("知識庫已準備就緒,請輸入您的問題") | |
except Exception as e: | |
logger.error(f"Excel 檔案處理失敗: {str(e)}") | |
st.error(f"Excel 檔案處理失敗: {str(e)}") | |
# 查詢部分 | |
st.markdown("## 開始對話") | |
query = st.text_input("請輸入你的問題:") | |
if query: | |
with st.spinner("AI 思考中..."): | |
try: | |
start_time = time.time() | |
# 根據模式選擇問答方式 | |
if mode == "僅使用上傳資料": | |
if has_knowledge_base: | |
st.write("使用知識庫模式回答...") | |
result = kb_qa_chain({"query": query}) | |
source = "knowledge_base" | |
source_docs = result["source_documents"] | |
else: | |
st.warning("您選擇了僅使用上傳資料模式,但尚未上傳Excel檔案。請上傳檔案或變更模式。") | |
st.stop() | |
elif mode == "僅使用模型知識": | |
st.write("使用模型一般知識模式回答...") | |
result = {"result": general_qa_chain.run(question=query)} | |
source = "general" | |
source_docs = [] | |
else: # 混合模式 | |
if has_knowledge_base: | |
st.write("使用混合模式回答...") | |
result, source, source_docs = hybrid_qa(query, kb_qa_chain, general_qa_chain) | |
else: | |
st.write("未檢測到知識庫,使用模型一般知識回答...") | |
result = {"result": general_qa_chain.run(question=query)} | |
source = "general" | |
source_docs = [] | |
end_time = time.time() | |
# 顯示回答 | |
st.markdown("### AI 回答:") | |
st.markdown(result["result"]) | |
# 根據來源顯示不同信息 | |
if source == "knowledge_base": | |
st.success("✅ 回答來自您的知識庫") | |
# 顯示參考資料 | |
with st.expander("參考資料"): | |
for i, doc in enumerate(source_docs): | |
st.markdown(f"**參考 {i+1}**") | |
st.markdown(doc.page_content) | |
st.markdown("---") | |
elif source == "general": | |
if has_knowledge_base: | |
st.info("ℹ️ 回答來自模型的一般知識(知識庫中未找到相關內容)") | |
else: | |
st.info("ℹ️ 回答來自模型的一般知識") | |
st.text(f"回答生成時間: {(end_time - start_time):.2f} 秒") | |
except Exception as e: | |
logger.error(f"查詢處理失敗: {str(e)}") | |
st.error(f"查詢處理失敗,請重試: {str(e)}") | |
st.error(f"錯誤詳情: {str(e)}") | |
# 添加會話歷史功能 | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = [] | |
# 底部資訊 | |
st.markdown("---") | |
st.markdown("Made with ❤️ | Excel 問答 AI") |