docker_test / app.py
朱东升
update8
890be77
raw
history blame
31.3 kB
import gradio as gr
import json
import importlib
import os
import sys
import time
from pathlib import Path
import concurrent.futures
import multiprocessing
import threading
import queue
from datetime import datetime, timedelta
from src.containerized_eval import eval_string_script
# Make the application directory and its "src" sub-directory importable.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, "src")
for _extra_path in (current_dir, src_dir):
    if _extra_path not in sys.path:
        sys.path.append(_extra_path)
del _extra_path

# Global task queue and task-state bookkeeping shared by the UI callbacks and
# the background worker.
task_queue = queue.Queue()          # FIFO of task-info dicts awaiting processing
task_results = {}                   # NOTE(review): not referenced by the visible code
active_tasks = {}                   # task id -> task-info dict (queued or processing)
completed_tasks = []                # finished task-info dicts, oldest first
task_id_counter = 0                 # last numeric suffix handed out for a task id
task_lock = threading.Lock()        # guards the containers and counter above
last_update_time = datetime.now()   # timestamp of the latest state change (replaces an earlier Event)
def trigger_ui_update():
    """Mark that task state changed by refreshing ``last_update_time``.

    The module-level timestamp is rewritten while holding ``task_lock``, so
    callers must not already hold that lock when calling this function.
    """
    global last_update_time
    with task_lock:
        # A plain timestamp is stored instead of signalling an Event.
        changed_at = datetime.now()
        last_update_time = changed_at
def get_next_task_id():
    """Return a fresh task identifier of the form ``task_<n>``.

    The shared counter is incremented under ``task_lock`` so concurrent
    callers never receive the same number.
    """
    global task_id_counter
    with task_lock:
        task_id_counter += 1
        issued_number = task_id_counter
    return f"task_{issued_number}"
# The queue worker loops forever once started, so it must be created at most
# once per process; these two names track that single thread.
_worker_thread = None
_worker_start_lock = threading.Lock()


def _ensure_worker_running():
    """Start the queue-processing thread unless one is already alive."""
    global _worker_thread
    with _worker_start_lock:
        if _worker_thread is None or not _worker_thread.is_alive():
            _worker_thread = threading.Thread(target=process_task_queue, daemon=True)
            _worker_thread.start()


def submit_task(input_data):
    """Queue a batch of test cases for background evaluation.

    Args:
        input_data: list of test-case items to evaluate as one task.

    Returns:
        dict: ``{"status": "success", "task_id": <id>}`` when the task was
        queued, otherwise ``{"status": "error", "message": <text>}``.
    """
    try:
        if not isinstance(input_data, list):
            return {"status": "error", "message": "Input must be a list"}
        task_id = get_next_task_id()
        task_info = {
            "id": task_id,
            "data": input_data,
            "status": "queued",
            "submitted_at": datetime.now(),
            "estimated_completion_time": estimate_completion_time(input_data),
            "items_count": len(input_data)
        }
        # Register the task before queueing it so the worker always finds its
        # entry in active_tasks when it picks the task up.
        with task_lock:
            active_tasks[task_id] = task_info
        task_queue.put(task_info)
        # trigger_ui_update() takes task_lock itself, so it must run after the
        # lock has been released.
        trigger_ui_update()
        # Starting a new thread whenever active_tasks held exactly one entry
        # spawned an additional never-ending worker after every idle period;
        # keep a single worker instead.
        _ensure_worker_running()
        return {"status": "success", "task_id": task_id}
    except Exception as e:
        return {"status": "error", "message": str(e)}
def estimate_completion_time(input_data):
    """Estimate how long a task will take to process.

    The estimate assumes a fixed average cost per item, divides it by the
    degree of parallelism actually available (at most 2, never more than the
    CPU count or the number of items) and adds a 30% safety margin.

    Args:
        input_data: sized collection of task items.

    Returns:
        timedelta: estimated processing time, rounded to whole seconds.
    """
    # Resources on Hugging Face Spaces are limited, so assume a conservative
    # 5 seconds per item.
    avg_time_per_item = 5
    max_parallel = 2
    total_items = len(input_data)
    if total_items <= 0:
        return timedelta(seconds=0)
    try:
        cpu_count = multiprocessing.cpu_count()
    except NotImplementedError:
        # CPU count unavailable on this platform: assume two CPUs.
        cpu_count = 2
    # Mirror the worker limit used when evaluating (min(cpu_count, 2)), and
    # never assume more parallel workers than there are items.
    parallel_factor = max(1, min(max_parallel, cpu_count, total_items))
    estimated_seconds = (total_items * avg_time_per_item) / parallel_factor
    # Add a 30% buffer to avoid over-optimistic estimates.
    estimated_seconds = estimated_seconds * 1.3
    return timedelta(seconds=round(estimated_seconds))
def _finalize_task(task_id, status, result=None, error=None):
    """Move a task from ``active_tasks`` to ``completed_tasks`` with its final state."""
    with task_lock:
        task = active_tasks.pop(task_id, None)
        if task is None:
            return
        task["status"] = status
        task["completed_at"] = datetime.now()
        if error is None:
            task["result"] = result
        else:
            task["error"] = error
        completed_tasks.append(task)
        # Keep only the 20 most recently completed tasks.
        del completed_tasks[:-20]
    # trigger_ui_update() takes task_lock itself, so call it after releasing.
    trigger_ui_update()


def process_task_queue():
    """Background worker loop: take tasks off ``task_queue`` and evaluate them.

    Runs forever. Each task is marked "processing", evaluated with
    ``evaluate`` and then moved to ``completed_tasks``. If processing raises,
    the task is recorded as "failed" instead of remaining in
    ``active_tasks``, and ``task_done()`` is called in every case.
    """
    while True:
        try:
            # Block briefly instead of polling empty() and sleeping.
            task_info = task_queue.get(timeout=0.5)
        except queue.Empty:
            continue
        task_id = None
        try:
            task_id = task_info["id"]
            with task_lock:
                active_tasks[task_id]["status"] = "processing"
            trigger_ui_update()  # refresh the UI when processing starts
            result = evaluate(task_info["data"])
            _finalize_task(task_id, "completed", result=result)
        except Exception as e:
            print(f"Error processing task queue: {str(e)}")
            if task_id is not None:
                _finalize_task(task_id, "failed", error=str(e))
            time.sleep(1)
        finally:
            task_queue.task_done()
def _merge_outcome(item, outcome):
    """Merge an evaluation outcome into its input item and return the result entry."""
    if isinstance(item, dict):
        item.update(outcome)
        return item
    # Non-dict items cannot carry the outcome; report the outcome on its own.
    return outcome


def evaluate(input_data):
    """Evaluate a batch of test cases.

    Items are submitted to a small thread pool in batches of five. Each
    outcome is merged into its input dict in place, so the returned entries
    are the same dict objects that were passed in. Items that are not dicts
    are reported by their outcome dict alone.

    Args:
        input_data: list of test-case items.

    Returns:
        list: one entry per input item, in input order. If ``input_data`` is
        not a list, or an unexpected error occurs, a single dict with
        ``"status": "Exception"`` and an ``"error"`` message is returned.
    """
    if not isinstance(input_data, list):
        return {"status": "Exception", "error": "Input must be a list"}

    batch_size = 5          # at most 5 items in flight per batch
    per_item_timeout = 60   # seconds to wait for one item's outcome
    try:
        # Resources may be limited on HF Spaces: at most 2 parallel workers.
        try:
            max_workers = min(multiprocessing.cpu_count(), 2)
        except NotImplementedError:
            max_workers = 2

        results = []
        timed_out = False
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        try:
            for start in range(0, len(input_data), batch_size):
                if start:
                    # Short pause between batches to let the system breathe.
                    time.sleep(0.5)
                batch = input_data[start:start + batch_size]
                futures = [executor.submit(evaluate_single_case, item) for item in batch]
                # Waiting on each future in submission order keeps results in
                # input order and makes the timeout effective; calling
                # result(timeout=...) on futures yielded by as_completed()
                # can never time out because they are already finished.
                for item, future in zip(batch, futures):
                    try:
                        outcome = future.result(timeout=per_item_timeout)
                        entry = _merge_outcome(item, outcome)
                    except concurrent.futures.TimeoutError:
                        timed_out = True
                        future.cancel()  # only effective if it has not started yet
                        entry = _merge_outcome(item, {
                            "status": "Timeout",
                            "error": "Task processing timed out in Hugging Face environment"
                        })
                    except Exception as e:
                        entry = _merge_outcome(item, {
                            "status": "Exception",
                            "error": f"Error in Hugging Face environment: {str(e)}"
                        })
                    results.append(entry)
        finally:
            # A running worker thread cannot be interrupted; after a timeout,
            # do not block on it again while shutting the pool down.
            executor.shutdown(wait=not timed_out)
        return results
    except Exception as e:
        return {"status": "Exception", "error": f"Evaluation error in Hugging Face environment: {str(e)}"}
def evaluate_single_case(input_data):
    """Evaluate one test case, trying each completion until one passes.

    For every entry in ``processed_completions`` the code under test is built
    as ``prompt + completion + "\\n" + tests`` and evaluated with
    ``evaluate_code``.

    Args:
        input_data: dict with the keys ``language``, ``prompt``, ``tests``
            and ``processed_completions`` (a sequence of completion strings).

    Returns:
        dict: the first result whose ``"status"`` is ``"OK"``; otherwise the
        result of the first completion. Invalid input or an unexpected error
        yields ``{"status": "Exception", "error": <message>}``.
    """
    try:
        if not isinstance(input_data, dict):
            return {"status": "Exception", "error": "Input item must be a dictionary"}
        completions = input_data.get('processed_completions', [])
        if not completions:
            return {"status": "Exception", "error": "No code provided"}
        # Without this check a missing prompt/tests only surfaces as an opaque
        # "unsupported operand" TypeError from the string concatenation below.
        missing = [key for key in ('prompt', 'tests') if input_data.get(key) is None]
        if missing:
            return {
                "status": "Exception",
                "error": f"Missing required field(s): {', '.join(missing)}"
            }
        language = input_data.get('language')
        prompt = input_data['prompt']
        tests = input_data['tests']
        first_result = None
        for comp in completions:
            code = prompt + comp + '\n' + tests
            result = evaluate_code(code, language)
            if result["status"] == "OK":
                return result
            if first_result is None:
                first_result = result
        # No completion passed: report the first completion's outcome.
        return first_result
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def evaluate_code(code, language):
    """Run ``code`` through the evaluator for ``language``.

    Args:
        code (str): complete source text to evaluate.
        language (str): programming-language identifier.

    Returns:
        dict: whatever ``eval_string_script`` returns, or
        ``{"status": "Exception", "error": <message>}`` if it raises.
    """
    try:
        # Delegate to eval_string_script from containerized_eval.
        return eval_string_script(language, code)
    except Exception as exc:
        failure = {"status": "Exception", "error": str(exc)}
        return failure
def get_queue_status():
    """Build a snapshot of the task queue.

    Returns:
        dict: counts of queued/processing/total tasks, the total number of
        items, the summed estimated completion time (as a string), a summary
        of every active task and of the five most recently completed tasks.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    with task_lock:
        queued_count = 0
        processing_count = 0
        eta_sum = timedelta(seconds=0)   # summed estimate over all active tasks
        item_total = 0                   # number of items across active tasks
        active_view = []
        for task in active_tasks.values():
            if task["status"] == "queued":
                queued_count += 1
            elif task["status"] == "processing":
                processing_count += 1
            eta = task["estimated_completion_time"]
            if isinstance(eta, timedelta):
                eta_sum += eta
            item_total += task["items_count"]
            active_view.append({
                "id": task["id"],
                "status": task["status"],
                "items_count": task["items_count"],
                "submitted_at": task["submitted_at"].strftime(stamp_format),
                "estimated_completion": str(eta)
            })

        # Only the five most recently completed tasks are reported.
        recent_view = []
        for task in completed_tasks[-5:]:
            finished = "completed_at" in task
            recent_view.append({
                "id": task["id"],
                "items_count": task["items_count"],
                "submitted_at": task["submitted_at"].strftime(stamp_format),
                "completed_at": task["completed_at"].strftime(stamp_format) if finished else "",
                "duration": str(task["completed_at"] - task["submitted_at"]) if finished else ""
            })

        return {
            "queued_tasks": queued_count,
            "processing_tasks": processing_count,
            "total_tasks": len(active_tasks),
            "total_items": item_total,
            "estimated_completion_time": str(eta_sum),
            "active_tasks": active_view,
            "recent_completed": recent_view
        }
def render_queue_status():
    """Render the current queue status as an HTML fragment.

    Returns:
        str: HTML with three summary cards, the total estimated completion
        time, a table of active tasks and a table of recently completed tasks.
    """
    snapshot = get_queue_status()
    # Collect the fragments in order and join them once at the end.
    fragments = [f"""
<div style="font-family: Arial, sans-serif; max-width: 900px; margin: 0 auto; padding: 20px; background: #f9f9f9; border-radius: 10px; box-shadow: 0 4px 12px rgba(0,0,0,0.1);">
<div style="display: flex; justify-content: space-between; margin-bottom: 20px;">
<div style="background: #fff; padding: 15px; border-radius: 8px; width: 30%; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h3 style="margin: 0 0 10px; color: #444; font-size: 16px;">排队任务</h3>
<p style="font-size: 28px; font-weight: bold; margin: 0; color: #3498db;">{snapshot['queued_tasks']}</p>
</div>
<div style="background: #fff; padding: 15px; border-radius: 8px; width: 30%; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h3 style="margin: 0 0 10px; color: #444; font-size: 16px;">处理中任务</h3>
<p style="font-size: 28px; font-weight: bold; margin: 0; color: #e74c3c;">{snapshot['processing_tasks']}</p>
</div>
<div style="background: #fff; padding: 15px; border-radius: 8px; width: 30%; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h3 style="margin: 0 0 10px; color: #444; font-size: 16px;">总项目数</h3>
<p style="font-size: 28px; font-weight: bold; margin: 0; color: #2ecc71;">{snapshot['total_items']}</p>
</div>
</div>
<div style="background: #fff; padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h2 style="margin: 0 0 15px; color: #333; font-size: 18px;">预计完成时间</h2>
<div style="display: flex; align-items: center;">
<div style="font-size: 24px; font-weight: bold; color: #9b59b6;">{snapshot['estimated_completion_time']}</div>
<div style="margin-left: 10px; color: #777; font-size: 14px;">(小时:分钟:秒)</div>
</div>
</div>
<div style="background: #fff; padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h2 style="margin: 0 0 15px; color: #333; font-size: 18px;">活跃任务</h2>
<table style="width: 100%; border-collapse: collapse;">
<thead>
<tr style="background: #f2f2f2;">
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">任务ID</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">状态</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">项目数</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">提交时间</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">预计完成</th>
</tr>
</thead>
<tbody>
"""]

    # Rows of the active-task table (or a single placeholder row).
    active_rows = snapshot['active_tasks']
    if not active_rows:
        fragments.append("""
<tr>
<td colspan="5" style="padding: 15px; text-align: center; color: #777;">当前没有活跃任务</td>
</tr>
""")
    else:
        for row in active_rows:
            # Queued tasks get a blue badge, anything else a red one.
            if row['status'] == "queued":
                badge_color = "#3498db"
            else:
                badge_color = "#e74c3c"
            fragments.append(f"""
<tr>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{row['id']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">
<span style="padding: 4px 8px; border-radius: 4px; font-size: 12px; background: {badge_color}; color: white;">
{row['status']}
</span>
</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{row['items_count']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{row['submitted_at']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{row['estimated_completion']}</td>
</tr>
""")

    # Close the active-task table and open the completed-task table.
    fragments.append("""
</tbody>
</table>
</div>
<div style="background: #fff; padding: 20px; border-radius: 8px; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h2 style="margin: 0 0 15px; color: #333; font-size: 18px;">最近完成的任务</h2>
<table style="width: 100%; border-collapse: collapse;">
<thead>
<tr style="background: #f2f2f2;">
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">任务ID</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">项目数</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">提交时间</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">完成时间</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">持续时间</th>
</tr>
</thead>
<tbody>
""")

    # Rows of the completed-task table (or a single placeholder row).
    finished_rows = snapshot['recent_completed']
    if not finished_rows:
        fragments.append("""
<tr>
<td colspan="5" style="padding: 15px; text-align: center; color: #777;">暂无已完成任务</td>
</tr>
""")
    else:
        for row in finished_rows:
            fragments.append(f"""
<tr>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{row['id']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{row['items_count']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{row['submitted_at']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{row['completed_at']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{row['duration']}</td>
</tr>
""")

    fragments.append("""
</tbody>
</table>
</div>
</div>
""")
    return "".join(fragments)
def refresh_ui():
    """Return freshly rendered queue-status HTML for the status panel."""
    # No event to wait for: simply render the latest state.
    latest_html = render_queue_status()
    return latest_html
def submit_json_data(json_data):
    """Submit JSON task data and report the outcome.

    Args:
        json_data: JSON text, or an already-parsed object.

    Returns:
        tuple: ``(response, status_html)`` where ``response`` is the
        pretty-printed JSON result of the submission (or an error object) and
        ``status_html`` is the freshly rendered queue status.
    """
    try:
        # Only strings need parsing; anything else is passed through as-is.
        payload = json.loads(json_data) if isinstance(json_data, str) else json_data
        outcome = submit_task(payload)
        response = json.dumps(outcome, ensure_ascii=False, indent=2)
        # Return the submission result together with the latest queue status.
        return response, render_queue_status()
    except Exception as e:
        error_body = {"status": "error", "message": str(e)}
        return json.dumps(error_body, ensure_ascii=False, indent=2), render_queue_status()
# Build the Gradio interface.
# The installed Gradio version is kept in a module-level name.
gradio_version = getattr(gr, "__version__", "unknown")

with gr.Blocks(title="代码评估服务", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# 代码评估服务
### 支持多种编程语言的代码评估服务
""")
    with gr.Tab("任务队列状态"):
        # Passing the function itself lets Gradio compute the initial value.
        status_html = gr.HTML(render_queue_status)
        refresh_button = gr.Button("刷新状态")
        # Register the listener without `concurrency_limit`: that keyword is
        # only accepted by Gradio 4+ event listeners, so passing it on the
        # 3.x branch raised a TypeError. This call works on both versions.
        refresh_button.click(fn=refresh_ui, outputs=status_html)
        # JavaScript-based auto-refresh: periodically clicks the refresh
        # button. The manual refresh button keeps working regardless.
        # NOTE(review): Gradio appears to inject gr.HTML content via innerHTML,
        # and browsers do not run <script> tags inserted that way. Confirm this
        # script actually executes; `gr.HTML(value=fn, every=...)` is the
        # built-in polling alternative.
        gr.HTML("""
<script>
// 兼容Gradio 3.x和4.x的自动刷新机制
function setupAutoRefresh() {
// 在HF Spaces中更保守的刷新间隔,避免过多请求
const refreshInterval = 3000; // 3秒刷新一次
let lastTaskCount = 0;
let consecutiveNoChange = 0;
// 查找刷新按钮并模拟点击
function autoRefresh() {
try {
// 兼容Gradio 3.x和4.x的按钮查找方法
const buttons = document.querySelectorAll('button');
let refreshButton = null;
// 尝试通过文本查找按钮
for (const button of buttons) {
if (button.textContent === '刷新状态' ||
button.textContent.includes('刷新') ||
button.innerHTML.includes('刷新状态')) {
refreshButton = button;
break;
}
}
// 如果没找到,尝试查找第一个标签页中的第一个按钮
if (!refreshButton) {
const firstTab = document.querySelector('.tabitem');
if (firstTab) {
const firstTabButton = firstTab.querySelector('button');
if (firstTabButton) {
refreshButton = firstTabButton;
}
}
}
// 如果找到按钮,模拟点击
if (refreshButton) {
refreshButton.click();
console.log("自动刷新已触发");
} else {
console.warn("未找到刷新按钮");
// 如果找不到按钮,尝试重新加载整个页面
// 但要限制频率,避免无限刷新
if (consecutiveNoChange > 10) {
console.log("长时间未找到刷新按钮,刷新页面");
location.reload();
return;
}
}
// 在Gradio 3.x和4.x中检查任务数量变化
setTimeout(() => {
// 尝试不同的选择器来适配不同版本
const tables = document.querySelectorAll('table');
let activeTasks = [];
if (tables.length > 0) {
// 在所有表格中查找行
for (const table of tables) {
const rows = table.querySelectorAll('tbody tr');
if (rows.length > 0) {
activeTasks = rows;
break;
}
}
}
const currentTaskCount = activeTasks.length || 0;
if (currentTaskCount === lastTaskCount) {
consecutiveNoChange++;
} else {
consecutiveNoChange = 0;
}
lastTaskCount = currentTaskCount;
}, 500);
} catch (e) {
console.error("自动刷新错误:", e);
}
}
// 周期性检查刷新,确保即使在HF Spaces资源受限情况下也能工作
let refreshTimer = setInterval(autoRefresh, refreshInterval);
// 确保页面可见性变化时刷新机制正常工作
document.addEventListener('visibilitychange', function() {
if (document.visibilityState === 'visible') {
// 页面变为可见时立即刷新一次
autoRefresh();
// 如果计时器被清除,重新创建
if (!refreshTimer) {
refreshTimer = setInterval(autoRefresh, refreshInterval);
}
}
});
// 设置首次加载时立即刷新一次,但稍微延迟确保DOM已加载
setTimeout(autoRefresh, 500);
}
// 确保在DOM完全加载后运行
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', setupAutoRefresh);
} else {
setupAutoRefresh();
}
</script>
""")
    with gr.Tab("提交新任务"):
        with gr.Row():
            with gr.Column():
                json_input = gr.Textbox(
                    label="输入JSON数据",
                    placeholder='[{"language": "python", "prompt": "def add(a, b):\\n", "processed_completions": [" return a + b"], "tests": "assert add(1, 2) == 3"}]',
                    lines=10
                )
                submit_button = gr.Button("提交任务")
            with gr.Column():
                result_output = gr.Textbox(label="提交结果", lines=5)
        # Submitting returns the JSON response and refreshes the status panel.
        submit_button.click(fn=submit_json_data, inputs=json_input, outputs=[result_output, status_html])
    with gr.Tab("API文档"):
        gr.Markdown("""
## API 文档
### 1. 提交任务
**请求:**
```
POST /api/predict
Content-Type: application/json
[
{
"language": "python",
"prompt": "def add(a, b):\\n",
"processed_completions": [" return a + b"],
"tests": "assert add(1, 2) == 3"
}
]
```
**响应:**
```json
{
"status": "success",
"task_id": "task_1"
}
```
### 2. 查询任务状态
**请求:**
```
GET /api/status
```
**响应:**
```json
{
"queued_tasks": 1,
"processing_tasks": 2,
"total_tasks": 3,
"total_items": 15,
"estimated_completion_time": "0:05:30",
"active_tasks": [...],
"recent_completed": [...]
}
```
""")
    # Hidden JSON components wiring `evaluate` as an event handler.
    with gr.Row(visible=False):
        api_input = gr.JSON()
        api_output = gr.JSON()
        api_input.change(fn=evaluate, inputs=api_input, outputs=api_output)
if __name__ == "__main__":
    # Detect the Gradio version so version-specific APIs can be selected.
    import gradio
    gradio_version = getattr(gradio, "__version__", "unknown")
    print(f"当前Gradio版本: {gradio_version}")
    try:
        # Configure the request queue.
        try:
            if gradio_version.startswith("3."):
                demo.queue(api_open=True, max_size=30)
            else:
                # queue() has no `api_name` parameter, so passing it always
                # failed and silently dropped max_size; pass only max_size.
                try:
                    demo.queue(max_size=30)
                except TypeError:
                    # Signature mismatch: fall back to the default queue.
                    demo.queue()
        except Exception as e:
            # Any queue configuration error is logged and startup continues.
            print(f"配置队列时出错: {e}")
            print("继续启动应用,但队列功能可能不可用")
        # Hugging Face Spaces set SPACE_ID; bind to all interfaces there and
        # to localhost otherwise.
        is_hf_space = os.environ.get("SPACE_ID") is not None
        server_port = int(os.environ.get("PORT", 7860))
        server_name = "0.0.0.0" if is_hf_space else "127.0.0.1"
        # Parameters accepted by every supported Gradio version.
        launch_kwargs = {
            "server_name": server_name,
            "server_port": server_port,
            "share": False,
        }
        # Extra parameters for Gradio 3.x. `concurrency_limit` is not a
        # launch() parameter, so it is not passed: it made this launch fail
        # and pushed startup onto the minimal fallback below.
        if gradio_version.startswith("3."):
            launch_kwargs.update({
                "debug": False,
                "show_api": True,
                "max_threads": 5,
            })
        # Start the application.
        demo.launch(**launch_kwargs)
    except Exception as e:
        # Log the error and retry with a minimal configuration.
        print(f"启动时发生错误: {e}")
        print("使用最小配置重试...")
        try:
            # Minimal configuration so the app can at least start.
            demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))
        except Exception as e2:
            print(f"最小配置启动也失败: {e2}")
            # Last resort: build the simplest possible interface and launch it.
            print("尝试创建备用界面...")
            try:
                def simple_evaluate(json_data):
                    """Parse JSON text if needed and evaluate it directly."""
                    try:
                        return evaluate(json.loads(json_data) if isinstance(json_data, str) else json_data)
                    except Exception as e:
                        return {"error": str(e)}
                backup_demo = gr.Interface(
                    fn=simple_evaluate,
                    inputs=gr.Textbox(label="JSON输入"),
                    outputs=gr.JSON(label="结果"),
                    title="代码评估服务 (备用界面)",
                    description="原界面启动失败,这是简化版本"
                )
                backup_demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))
            except Exception as e3:
                print(f"备用界面也启动失败: {e3}")