# NOTE: Hugging Face Spaces page residue ("Spaces: Sleeping") removed —
# kept here as a comment so the module parses.
import os
import re
import time
from typing import Any, Iterator, List, Tuple

import gradio as gr
from openai import OpenAI
# Initialize the OpenAI-compatible client for the Volcengine Ark endpoint.
# SECURITY: an API key was previously hard-coded here, which leaks the secret
# to anyone with repo access. Prefer the ARK_API_KEY environment variable;
# the inline value remains only as a backward-compatible fallback —
# rotate that key and delete the fallback.
client = OpenAI(
    api_key=os.environ.get("ARK_API_KEY", "37793d47-dfd4-4b3d-9cbc-8e4b32119033"),
    base_url="https://ark.cn-beijing.volces.com/api/v3",
)
# 处理API响应,美化输出格式 | |
def process_response(text):
    """Convert complete <thinking>/<reasoning> tag pairs into styled HTML.

    While the model is still streaming, a tag pair may not be closed yet;
    in that case the raw text is returned unchanged so partial output can
    still be displayed. Sections whose captured content is empty are
    omitted from the formatted result.
    """
    # (pattern, HTML template) per section, in display order.
    tag_specs = (
        (r'<thinking>([\s\S]*?)<\/thinking>',
         "<div class='section thinking-section'>思考过程:{}</div>"),
        (r'<reasoning>([\s\S]*?)<\/reasoning>',
         "<div class='section reasoning-section'><strong>推理结论:{}</strong></div>"),
    )
    matches = [re.search(pattern, text) for pattern, _ in tag_specs]
    if not any(matches):
        # No complete tag pair found — keep the original text.
        return text
    pieces = []
    for match, (_, template) in zip(matches, tag_specs):
        content = match.group(1) if match else ""
        if content:
            pieces.append(template.format(content))
    return "".join(pieces)
# 实际API流式响应 | |
def api_stream_response(message: str) -> Iterator[str]:
    """Stream the model's answer to *message* as progressively formatted HTML.

    Accumulates streamed deltas into a buffer, re-runs the tag formatter on
    the whole buffer, and yields only when the rendered output actually
    changes. Any API failure is surfaced to the UI as a single error string.
    """
    try:
        stream = client.chat.completions.create(
            model="deepseek-r1-250120",
            messages=[
                {"role": "system", "content": "你是材料科学领域的AI助手,专注于材料管道相关问题。请将回答分为两个部分:\n1. <thinking>思考过程,分析问题和组织思路</thinking>\n2. <reasoning>推理过程,包含最终结论和完整答案</reasoning>\n请确保每个回答都包含这两个部分,并用正确的标签包裹。"},
                {"role": "user", "content": message},
            ],
            stream=True,  # streamed (chunked) completion
        )
        buffer = ""
        last_emitted = ""
        for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta is None:
                continue
            buffer += delta
            rendered = process_response(buffer)
            # Yield only when the visible output differs from the last yield.
            if rendered != last_emitted:
                last_emitted = rendered
                yield rendered
    except Exception as e:
        # Top-level boundary: report the failure to the chat UI instead of crashing.
        yield f"API调用出错: {str(e)}"
# 流式响应函数 | |
def stream_response(message: str, history: List[Tuple[str, str]]) -> Iterator[List[Tuple[str, Any]]]:
    """Drive one chat turn in the Gradio Chatbot tuple format.

    Appends *message* with an empty reply to *history*, then yields a fresh
    history snapshot each time the streamed assistant reply grows.
    """
    conversation = history + [(message, "")]
    for partial_reply in api_stream_response(message):
        # Work on a copy so each yielded history is an independent snapshot.
        snapshot = conversation.copy()
        snapshot[-1] = (message, partial_reply)
        yield snapshot
# Custom CSS: the "thinking" section renders in normal weight above a dashed
# separator; the "reasoning" section below it is bold via its <strong> wrapper.
custom_css = """
.gradio-container {
    font-family: 'Arial', sans-serif;
}
.section {
    margin-bottom: 15px;
}
.thinking-section {
    color: #333333;
    font-weight: normal;
    padding-bottom: 10px;
    border-bottom: 1px dashed #cccccc;
}
.reasoning-section {
    color: #000000;
    margin-top: 10px;
}
.reasoning-section strong {
    font-weight: bold;
}
"""
# Build the Gradio interface.
def create_demo():
    """Assemble the Blocks UI: chatbot, input row, example questions, and the
    submit/Enter event wiring (each submit streams a reply, then clears the box).
    """
    with gr.Blocks(css=custom_css) as demo:
        gr.Markdown("# 材料科学 AI 问答助手")
        gr.Markdown("基于DeepSeek模型的材料管道专家系统。回答分为思考过程(常规字体)和推理结论(粗体)两部分。")
        chatbot = gr.Chatbot(
            show_label=False,
            height=500,
            render=True,  # NOTE(review): `render` controls whether the component renders in the Blocks context, not HTML rendering — confirm intent
            # bubble=True,  # bubble-style messages (left disabled)
            avatar_images=("👤", "🤖")  # NOTE(review): Gradio documents these as image paths/URLs; emoji strings may not display — verify
        )
        with gr.Row():
            msg = gr.Textbox(
                show_label=False,
                placeholder="在这里输入您的问题...",
                scale=9
            )
            submit = gr.Button("发送", scale=1)
        with gr.Accordion("示例问题", open=False):
            examples = gr.Examples(
                examples=[
                    ["材料管道中的数据预处理步骤有哪些?"],
                    ["如何优化材料管道的计算效率?"],
                    ["材料管道中常用的特征工程方法有哪些?"],
                    ["如何在材料管道中整合机器学习模型?"],
                    ["材料管道中的高通量筛选技术有什么优势?"]
                ],
                inputs=msg
            )
        # Button click: stream the reply into the chatbot.
        submit_event = submit.click(
            fn=stream_response,
            inputs=[msg, chatbot],
            outputs=[chatbot],
            queue=True
        )
        # After streaming finishes, clear the input box.
        submit_event.then(
            fn=lambda: "",
            inputs=None,
            outputs=[msg]
        )
        # Enter key: same stream-then-clear pipeline as the button.
        msg.submit(
            fn=stream_response,
            inputs=[msg, chatbot],
            outputs=[chatbot],
            queue=True
        ).then(
            fn=lambda: "",
            inputs=None,
            outputs=[msg]
        )
    return demo
# Instantiate the demo at import time so deployment hosts can discover it.
demo = create_demo()

# Entry point.
if __name__ == "__main__":
    demo.queue()  # enable request queueing (required for streaming output)
    demo.launch()  # no custom launch args; Hugging Face supplies its own
else:
    # Imported (e.g. by Hugging Face Spaces): enable the queue only;
    # the host launches the app itself.
    demo.queue()