Spaces:
Sleeping
Sleeping
import gradio as gr | |
import json | |
import importlib | |
import os | |
import sys | |
import time | |
from pathlib import Path | |
import concurrent.futures | |
import multiprocessing | |
import threading | |
import queue | |
from datetime import datetime, timedelta | |
from src.containerized_eval import eval_string_script | |
# Make this file's directory and its "src" subdirectory importable.
# NOTE(review): the `from src.containerized_eval import ...` statement above
# executes before these entries are appended, so this setup cannot help that
# particular import -- confirm the intended ordering.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, "src")
# The generator is consumed lazily by extend(), so each candidate is checked
# against the already-updated sys.path, exactly like two sequential appends.
sys.path.extend(
    candidate for candidate in (current_dir, src_dir)
    if candidate not in sys.path
)
# Global task queue and task-status bookkeeping shared by the UI callbacks
# and the background worker thread.

# Synchronisation primitives.
task_lock = threading.Lock()      # guards the containers and counter below
update_event = threading.Event()  # set whenever the UI should be refreshed

# Pending work handed to the background worker.
task_queue: queue.Queue = queue.Queue()

# Task bookkeeping.
active_tasks: dict = {}      # task id -> task info for queued/processing tasks
completed_tasks: list = []   # finished task infos, oldest first
task_results: dict = {}      # not referenced elsewhere in this section
task_id_counter: int = 0     # last numeric suffix handed out for a task id
def trigger_ui_update():
    """Flag that the queue state changed so the next UI poll re-renders it."""
    update_event.set()
def get_next_task_id():
    """Allocate the next sequential task identifier.

    Returns:
        str: An id of the form ``task_<n>`` where ``n`` is the incremented
        global counter.
    """
    global task_id_counter
    # Increment and read the counter atomically; format outside the lock.
    with task_lock:
        task_id_counter += 1
        allocated = task_id_counter
    return f"task_{allocated}"
def submit_task(input_data):
    """Queue a batch of test cases for background evaluation.

    Args:
        input_data: List of test-case items to evaluate together as one task.

    Returns:
        dict: ``{"status": "success", "task_id": <id>}`` once the task has
        been queued, or ``{"status": "error", "message": <text>}`` when the
        input is not a list or queuing fails.
    """
    if not isinstance(input_data, list):
        return {"status": "error", "message": "Input must be a list"}

    try:
        # get_next_task_id() takes task_lock itself, so call it before we do.
        task_id = get_next_task_id()
        task_info = {
            "id": task_id,
            "data": input_data,
            "status": "queued",
            "submitted_at": datetime.now(),
            "estimated_completion_time": estimate_completion_time(input_data),
            "items_count": len(input_data),
        }

        with task_lock:
            active_tasks[task_id] = task_info
            task_queue.put(task_info)

            # Keep exactly one live worker. The previous check
            # (len(active_tasks) == 1, evaluated outside the lock) spawned an
            # extra never-ending worker every time the queue drained and
            # refilled, and could start none at all when two submissions
            # raced past each other.
            worker = getattr(submit_task, "_worker", None)
            if worker is None or not worker.is_alive():
                worker = threading.Thread(
                    target=process_task_queue,
                    name="task-queue-worker",
                    daemon=True,
                )
                submit_task._worker = worker
                worker.start()

        # Let the UI know a new task appeared.
        trigger_ui_update()
        return {"status": "success", "task_id": task_id}
    except Exception as e:
        return {"status": "error", "message": str(e)}
def estimate_completion_time(input_data, avg_time_per_item=2, max_workers=None):
    """Estimate how long a batch will take to evaluate.

    The estimate assumes items are processed in parallel by up to
    ``max_workers`` workers, each item taking ``avg_time_per_item`` seconds.

    Args:
        input_data: Sized collection of task items (only its length is used).
        avg_time_per_item: Assumed average seconds per item. Defaults to 2;
            tune it from observed timings if available.
        max_workers: Assumed degree of parallelism. Defaults to
            ``multiprocessing.cpu_count()``.

    Returns:
        timedelta: Estimated processing time; zero for an empty batch.
    """
    total_items = len(input_data)
    if total_items == 0:
        return timedelta(seconds=0)

    if max_workers is None:
        max_workers = multiprocessing.cpu_count()

    # Never more parallelism than items, and never less than one worker so a
    # bad max_workers value cannot cause a division by zero.
    parallel_factor = max(1, min(max_workers, total_items))
    estimated_seconds = (total_items * avg_time_per_item) / parallel_factor
    return timedelta(seconds=estimated_seconds)
def _archive_task(task_id, **fields):
    """Move a task from active_tasks to completed_tasks (caller holds task_lock).

    ``fields`` are merged into the task record before it is archived. Only
    the 20 most recent completed tasks are retained.
    """
    task = active_tasks.pop(task_id, None)
    if task is None:
        return
    task.update(fields)
    completed_tasks.append(task)
    # Keep only the 20 most recent entries (in place, so references stay valid).
    del completed_tasks[:-20]


def process_task_queue():
    """Background worker loop: evaluate queued tasks one at a time, forever.

    Each task is marked "processing", evaluated with ``evaluate``, then moved
    to ``completed_tasks`` with status "completed" (or "error" if processing
    raised). The UI is notified after every state change.
    """
    while True:
        # Block briefly instead of polling empty(); empty()-then-get() can
        # stall when another consumer takes the item in between.
        try:
            task_info = task_queue.get(timeout=0.5)
        except queue.Empty:
            continue

        # Reset per iteration so the error handler never sees an unbound or
        # stale id from a previous task.
        task_id = None
        try:
            task_id = task_info["id"]

            with task_lock:
                active_tasks[task_id]["status"] = "processing"
            trigger_ui_update()  # status changed to "processing"

            result = evaluate(task_info["data"])

            with task_lock:
                _archive_task(
                    task_id,
                    status="completed",
                    completed_at=datetime.now(),
                    result=result,
                )
            trigger_ui_update()  # task finished
        except Exception as e:
            print(f"Error processing task queue: {str(e)}")
            with task_lock:
                if task_id is not None:
                    _archive_task(
                        task_id,
                        status="error",
                        error=str(e),
                        completed_at=datetime.now(),
                    )
            trigger_ui_update()
            # Back off a little before picking up the next task.
            time.sleep(1)
        finally:
            # Always balance the get() above, including on failure, so
            # task_queue.join() cannot hang.
            task_queue.task_done()
def evaluate(input_data):
    """Evaluate a batch of test cases concurrently.

    Each dict item is updated in place with the fields of its evaluation
    result and returned. Items that are not dicts cannot be updated, so their
    error result dict is returned in their place.

    Args:
        input_data: List of test-case items.

    Returns:
        list: One entry per input item, in input order. If the input is not a
        list (or an unexpected failure occurs) a single
        ``{"status": "Exception", "error": <text>}`` dict is returned instead.
    """
    try:
        if not isinstance(input_data, list):
            return {"status": "Exception", "error": "Input must be a list"}
        if not input_data:
            return []

        results = []
        max_workers = multiprocessing.cpu_count()
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit everything first, then collect in submission order so the
            # output order is deterministic and matches the input.
            futures = [executor.submit(evaluate_single_case, item) for item in input_data]
            for item, future in zip(input_data, futures):
                mergeable = isinstance(item, dict)
                try:
                    outcome = future.result()
                    if mergeable:
                        item.update(outcome)
                except Exception as e:
                    outcome = {"status": "Exception", "error": str(e)}
                    if mergeable:
                        item.update(outcome)
                # A non-dict item has no update(); previously that raised
                # inside the handler and failed the whole batch.
                results.append(item if mergeable else outcome)
        return results
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def evaluate_single_case(input_data):
    """Evaluate one test case, trying each completion until one passes.

    For every entry in ``processed_completions`` the program
    ``prompt + completion + "\\n" + tests`` is evaluated in the case's
    ``language``.

    Args:
        input_data: Dict with keys ``language``, ``prompt``, ``tests`` and
            ``processed_completions`` (a list of completion strings).

    Returns:
        dict: The first result whose ``status`` is ``"OK"``; otherwise the
        result of the first completion tried. Invalid input or an unexpected
        failure yields ``{"status": "Exception", "error": <text>}``.
    """
    try:
        if not isinstance(input_data, dict):
            return {"status": "Exception", "error": "Input item must be a dictionary"}

        completions = input_data.get('processed_completions', [])
        if not completions:
            return {"status": "Exception", "error": "No code provided"}

        # Fail with a clear message instead of an opaque
        # "unsupported operand type(s) for +" when a field is missing.
        for field in ('prompt', 'tests'):
            if not isinstance(input_data.get(field), str):
                return {"status": "Exception", "error": f"Field '{field}' must be a string"}

        # Loop-invariant lookups hoisted out of the completion loop.
        language = input_data.get('language')
        prompt = input_data['prompt']
        tests = input_data['tests']

        first_result = None
        for comp in completions:
            result = evaluate_code(prompt + comp + '\n' + tests, language)
            if result["status"] == "OK":
                return result
            if first_result is None:
                first_result = result
        return first_result
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def evaluate_code(code, language):
    """Run ``code`` through ``eval_string_script`` for the given language.

    Args:
        code (str): Source code to evaluate.
        language (str): Programming language of the code.

    Returns:
        dict: Whatever ``eval_string_script`` returns, or
        ``{"status": "Exception", "error": <text>}`` if it raises.
    """
    try:
        # Note the argument order expected by containerized_eval: language first.
        return eval_string_script(language, code)
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def get_queue_status():
    """Build a snapshot of the task queue.

    Returns:
        dict: Counts of queued/processing/total tasks, the total item count,
        the summed estimated completion time (as a string), one summary dict
        per active task, and summaries of the five most recently completed
        tasks.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"

    with task_lock:
        current = list(active_tasks.values())

        queued_count = sum(1 for task in current if task["status"] == "queued")
        processing_count = sum(1 for task in current if task["status"] == "processing")

        # Sum the per-task estimates, ignoring any that are not timedeltas.
        total_estimate = sum(
            (
                task["estimated_completion_time"]
                for task in current
                if isinstance(task["estimated_completion_time"], timedelta)
            ),
            timedelta(seconds=0),
        )

        # Total number of items across all active tasks.
        item_total = sum(task["items_count"] for task in current)

        active_rows = []
        for task in current:
            active_rows.append({
                "id": task["id"],
                "status": task["status"],
                "items_count": task["items_count"],
                "submitted_at": task["submitted_at"].strftime(timestamp_format),
                "estimated_completion": str(task["estimated_completion_time"]),
            })

        # Only the five most recently completed tasks are reported.
        recent_rows = []
        for task in completed_tasks[-5:]:
            finished = "completed_at" in task
            recent_rows.append({
                "id": task["id"],
                "items_count": task["items_count"],
                "submitted_at": task["submitted_at"].strftime(timestamp_format),
                "completed_at": task["completed_at"].strftime(timestamp_format) if finished else "",
                "duration": str(task["completed_at"] - task["submitted_at"]) if finished else "",
            })

        return {
            "queued_tasks": queued_count,
            "processing_tasks": processing_count,
            "total_tasks": len(active_tasks),
            "total_items": item_total,
            "estimated_completion_time": str(total_estimate),
            "active_tasks": active_rows,
            "recent_completed": recent_rows,
        }
def render_queue_status():
    """Render the current queue status as an HTML fragment.

    Returns:
        str: HTML containing the summary counters, the total estimated
        completion time, a table of active tasks and a table of recently
        completed tasks, built from ``get_queue_status()``.
    """
    snapshot = get_queue_status()

    # Collect the fragments and join once at the end.
    chunks = [f"""
<div style="font-family: Arial, sans-serif; max-width: 900px; margin: 0 auto; padding: 20px; background: #f9f9f9; border-radius: 10px; box-shadow: 0 4px 12px rgba(0,0,0,0.1);">
<div style="display: flex; justify-content: space-between; margin-bottom: 20px;">
<div style="background: #fff; padding: 15px; border-radius: 8px; width: 30%; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h3 style="margin: 0 0 10px; color: #444; font-size: 16px;">排队任务</h3>
<p style="font-size: 28px; font-weight: bold; margin: 0; color: #3498db;">{snapshot['queued_tasks']}</p>
</div>
<div style="background: #fff; padding: 15px; border-radius: 8px; width: 30%; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h3 style="margin: 0 0 10px; color: #444; font-size: 16px;">处理中任务</h3>
<p style="font-size: 28px; font-weight: bold; margin: 0; color: #e74c3c;">{snapshot['processing_tasks']}</p>
</div>
<div style="background: #fff; padding: 15px; border-radius: 8px; width: 30%; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h3 style="margin: 0 0 10px; color: #444; font-size: 16px;">总项目数</h3>
<p style="font-size: 28px; font-weight: bold; margin: 0; color: #2ecc71;">{snapshot['total_items']}</p>
</div>
</div>
<div style="background: #fff; padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h2 style="margin: 0 0 15px; color: #333; font-size: 18px;">预计完成时间</h2>
<div style="display: flex; align-items: center;">
<div style="font-size: 24px; font-weight: bold; color: #9b59b6;">{snapshot['estimated_completion_time']}</div>
<div style="margin-left: 10px; color: #777; font-size: 14px;">(小时:分钟:秒)</div>
</div>
</div>
<div style="background: #fff; padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h2 style="margin: 0 0 15px; color: #333; font-size: 18px;">活跃任务</h2>
<table style="width: 100%; border-collapse: collapse;">
<thead>
<tr style="background: #f2f2f2;">
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">任务ID</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">状态</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">项目数</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">提交时间</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">预计完成</th>
</tr>
</thead>
<tbody>
"""]

    # Active-task rows, or a single placeholder row when there are none.
    active_entries = snapshot['active_tasks']
    if not active_entries:
        chunks.append("""
<tr>
<td colspan="5" style="padding: 15px; text-align: center; color: #777;">当前没有活跃任务</td>
</tr>
""")
    for entry in active_entries:
        # Blue badge for queued tasks, red for anything else.
        badge_color = "#e74c3c" if entry['status'] != "queued" else "#3498db"
        chunks.append(f"""
<tr>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{entry['id']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">
<span style="padding: 4px 8px; border-radius: 4px; font-size: 12px; background: {badge_color}; color: white;">
{entry['status']}
</span>
</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{entry['items_count']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{entry['submitted_at']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{entry['estimated_completion']}</td>
</tr>
""")

    # Close the active table and open the recently-completed table.
    chunks.append("""
</tbody>
</table>
</div>
<div style="background: #fff; padding: 20px; border-radius: 8px; box-shadow: 0 2px 6px rgba(0,0,0,0.05);">
<h2 style="margin: 0 0 15px; color: #333; font-size: 18px;">最近完成的任务</h2>
<table style="width: 100%; border-collapse: collapse;">
<thead>
<tr style="background: #f2f2f2;">
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">任务ID</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">项目数</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">提交时间</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">完成时间</th>
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">持续时间</th>
</tr>
</thead>
<tbody>
""")

    # Recently-completed rows, or a single placeholder row when there are none.
    finished_entries = snapshot['recent_completed']
    if not finished_entries:
        chunks.append("""
<tr>
<td colspan="5" style="padding: 15px; text-align: center; color: #777;">暂无已完成任务</td>
</tr>
""")
    for entry in finished_entries:
        chunks.append(f"""
<tr>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{entry['id']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{entry['items_count']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{entry['submitted_at']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{entry['completed_at']}</td>
<td style="padding: 12px; border-bottom: 1px solid #eee;">{entry['duration']}</td>
</tr>
""")

    chunks.append("""
</tbody>
</table>
</div>
</div>
""")
    return "".join(chunks)
def refresh_ui():
    """Return freshly rendered queue-status HTML (used by the refresh button)."""
    latest_html = render_queue_status()
    return latest_html
def check_for_updates():
    """Re-render the queue status only if an update was requested.

    Returns:
        Fresh status HTML when ``update_event`` was set (the event is cleared
        first); otherwise ``gr.update()``.
    """
    if not update_event.is_set():
        return gr.update()
    # Clear before rendering so a change that lands during rendering
    # triggers another refresh on the next poll.
    update_event.clear()
    return render_queue_status()
def submit_json_data(json_data):
    """Submit JSON task data from the UI.

    Args:
        json_data: A JSON string (parsed here) or an already-parsed object.

    Returns:
        tuple: ``(response_json, status_html)`` -- the pretty-printed
        submission result (or an error object if anything raised) and the
        freshly rendered queue-status HTML.
    """
    try:
        # Strings are parsed as JSON; anything else is passed through as-is.
        payload = json.loads(json_data) if isinstance(json_data, str) else json_data
        submission = json.dumps(submit_task(payload), ensure_ascii=False, indent=2)
        # Hand back the submission result together with the latest queue status.
        return submission, render_queue_status()
    except Exception as e:
        failure = {"status": "error", "message": str(e)}
        return json.dumps(failure, ensure_ascii=False, indent=2), render_queue_status()
# Build the Gradio interface: a queue-status tab, a task-submission tab, an
# API-documentation tab and a hidden pair of JSON components wired to
# evaluate().
with gr.Blocks(title="代码评估服务", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# 代码评估服务
### 支持多种编程语言的代码评估服务
""")
    with gr.Tab("任务队列状态"):
        # Initial content comes from calling render_queue_status.
        status_html = gr.HTML(render_queue_status)
        refresh_button = gr.Button("刷新状态")
        refresh_button.click(fn=refresh_ui, outputs=status_html)
        # Automatic refresh via Gradio's built-in polling: check_for_updates
        # is re-run with every=1.
        # NOTE(review): update_event is a single shared flag that
        # check_for_updates clears, so with several browser sessions polling
        # only one of them may observe each change -- confirm.
        demo.load(check_for_updates, None, status_html, every=1)
        # JavaScript refresh kept as a fallback: it clicks the refresh button
        # (matched by its label text) every 5 seconds.
        # NOTE(review): <script> tags injected as an HTML fragment are often
        # not executed by browsers -- confirm this fallback actually runs.
        gr.HTML("""
<script>
// 使用JavaScript作为备份的自动刷新
function setupAutoRefresh() {
const refreshInterval = 5000; // 5秒
// 查找刷新按钮并模拟点击
function autoRefresh() {
// 获取所有按钮元素
const buttons = document.querySelectorAll('button');
// 查找刷新状态按钮
for (const button of buttons) {
if (button.textContent === '刷新状态') {
button.click();
break;
}
}
}
// 设置定时器
setInterval(autoRefresh, refreshInterval);
}
// 页面加载完成后设置自动刷新
window.addEventListener('load', setupAutoRefresh);
</script>
""")
    with gr.Tab("提交新任务"):
        with gr.Row():
            with gr.Column():
                json_input = gr.Textbox(
                    label="输入JSON数据",
                    placeholder='[{"language": "python", "prompt": "def add(a, b):\\n", "processed_completions": [" return a + b"], "tests": "assert add(1, 2) == 3"}]',
                    lines=10
                )
                submit_button = gr.Button("提交任务")
            with gr.Column():
                result_output = gr.Textbox(label="提交结果", lines=5)
        # Submitting updates both the result box and the status panel.
        submit_button.click(fn=submit_json_data, inputs=json_input, outputs=[result_output, status_html])
    with gr.Tab("API文档"):
        # NOTE(review): this text documents POST /api/predict returning a
        # task_id and GET /api/status, but the only extra wiring visible below
        # calls evaluate() directly on the hidden JSON input -- confirm the
        # documented endpoints match what is actually exposed.
        gr.Markdown("""
## API 文档
### 1. 提交任务
**请求:**
```
POST /api/predict
Content-Type: application/json
[
{
"language": "python",
"prompt": "def add(a, b):\\n",
"processed_completions": [" return a + b"],
"tests": "assert add(1, 2) == 3"
}
]
```
**响应:**
```json
{
"status": "success",
"task_id": "task_1"
}
```
### 2. 查询任务状态
**请求:**
```
GET /api/status
```
**响应:**
```json
{
"queued_tasks": 1,
"processing_tasks": 2,
"total_tasks": 3,
"total_items": 15,
"estimated_completion_time": "0:05:30",
"active_tasks": [...],
"recent_completed": [...]
}
```
""")
    # API endpoint: hidden JSON input/output; a change to the input runs
    # evaluate() synchronously and writes its result to the output.
    with gr.Row(visible=False):
        api_input = gr.JSON()
        api_output = gr.JSON()
        api_input.change(fn=evaluate, inputs=api_input, outputs=api_output)
if __name__ == "__main__":
    # Enable Gradio's request queue, then start the server.
    demo.queue().launch()