Spaces:
Sleeping
Sleeping
import gradio as gr | |
import json | |
import importlib | |
import os | |
import sys | |
from pathlib import Path | |
import concurrent.futures | |
import multiprocessing | |
import time | |
import threading | |
import queue | |
from datetime import datetime, timedelta | |
from src.containerized_eval import eval_string_script | |
# Make this file's directory and its "src" subdirectory importable by
# appending each one to sys.path, unless it is already present.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, "src")
sys.path.extend([entry for entry in (current_dir, src_dir) if entry not in sys.path])
def evaluate(input_data):
    """Evaluate a batch of test cases concurrently.

    Each item is passed to ``evaluate_single_case`` on a thread pool sized to
    the machine's CPU count.

    Args:
        input_data: list of test-case items (normally dicts).

    Returns:
        list: one entry per input item, in the same order as ``input_data``.
        A dict item yields a new dict that merges the item with its
        evaluation result (the input item itself is not modified). A non-dict
        item yields the error dict produced for it.
        If ``input_data`` is not a list, or the pool itself fails, a single
        ``{"status": "Exception", "error": ...}`` dict is returned instead.
    """
    if not isinstance(input_data, list):
        return {"status": "Exception", "error": "Input must be a list"}
    try:
        max_workers = multiprocessing.cpu_count()
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(evaluate_single_case, item) for item in input_data]
            # Collect in submission order so results line up with the inputs;
            # as_completed() would yield them in nondeterministic order.
            return [_merge_result(item, future) for item, future in zip(input_data, futures)]
    except Exception as e:
        return {"status": "Exception", "error": str(e)}


def _merge_result(item, future):
    """Combine one test-case item with its evaluation outcome, without mutating the item."""
    try:
        result = future.result()
    except Exception as e:
        result = {"status": "Exception", "error": str(e)}
    if isinstance(item, dict):
        return {**item, **result}
    # Non-dict items cannot carry the result fields; return the result alone
    # rather than crashing the whole batch on item.update().
    return result
def evaluate_single_case(input_data):
    """Evaluate one test case, trying each candidate completion in turn.

    The program for each completion is ``prompt + completion + "\\n" + tests``,
    run through ``evaluate_code`` with the item's ``language``.

    Args:
        input_data: dict with keys ``language``, ``prompt``, ``tests`` and
            ``processed_completions`` (a list of completion strings; a single
            string is treated as a one-element list).

    Returns:
        dict: the first result whose ``status`` is ``"OK"``. If no completion
        passes, the result for the first completion. On invalid input, an
        ``{"status": "Exception", "error": ...}`` dict.
    """
    try:
        if not isinstance(input_data, dict):
            return {"status": "Exception", "error": "Input item must be a dictionary"}
        language = input_data.get('language')
        completions = input_data.get('processed_completions', [])
        if isinstance(completions, str):
            # A bare string would otherwise be iterated character by character.
            completions = [completions]
        if not completions:
            return {"status": "Exception", "error": "No code provided"}
        prompt = input_data.get('prompt')
        tests = input_data.get('tests')
        if not isinstance(prompt, str) or not isinstance(tests, str):
            # Fail with a clear message instead of an opaque "None + str" TypeError.
            return {"status": "Exception", "error": "Fields 'prompt' and 'tests' must be strings"}
        first_result = None
        for comp in completions:
            result = evaluate_code(prompt + comp + '\n' + tests, language)
            # .get(): a result without "status" counts as a failure, not a KeyError.
            if result.get("status") == "OK":
                return result
            if first_result is None:
                first_result = result
        return first_result
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def evaluate_code(code, language):
    """Run a complete program through the containerized evaluator.

    Thin wrapper over ``eval_string_script(language, code)``.

    Args:
        code (str): full program text (prompt, completion and tests).
        language (str): language identifier passed to the evaluator.

    Returns:
        dict: the evaluator's result unchanged, or
        ``{"status": "Exception", "error": <message>}`` if it raised.
    """
    try:
        return eval_string_script(language, code)
    except Exception as exc:
        return {"status": "Exception", "error": str(exc)}
# Shared state for the background task queue. add_task() and the worker
# threads update the counters and tasks_info while holding tasks_lock.
tasks_lock = threading.Lock()
task_queue = queue.Queue()  # pending (task_id, item) pairs
tasks_info = []  # one summary dict per submitted batch
total_tasks_count = completed_tasks_count = 0
estimated_completion_time = None  # datetime of expected completion, or None when idle
average_task_time = 5  # initial average task duration (seconds)
def add_task(input_data):
    """Queue every item of a batch for background evaluation.

    Each item is put on ``task_queue`` as ``(task_id, item)``. A summary
    record (id, count, status, submit time, completed count) is appended to
    ``tasks_info``.

    Args:
        input_data: non-empty list of test-case items.

    Returns:
        dict: ``{"status": "成功", "task_id": ..., "message": ...}`` on
        success, otherwise ``{"status": "错误", "message": ...}``.
    """
    global total_tasks_count
    if not isinstance(input_data, list):
        return {"status": "错误", "message": "输入必须是列表格式"}
    if not input_data:
        # An empty batch would create a count-0 record that no worker ever
        # touches, leaving it "等待中" forever.
        return {"status": "错误", "message": "输入列表不能为空"}
    try:
        task_count = len(input_data)
        with tasks_lock:
            # Millisecond timestamps can collide for back-to-back submissions.
            # Workers look a batch up by id, so bump the id until it is unique.
            task_id = int(time.time() * 1000)
            existing_ids = {task["id"] for task in tasks_info}
            while task_id in existing_ids:
                task_id += 1
            tasks_info.append({
                "id": task_id,
                "count": task_count,
                "status": "等待中",
                "submit_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "completed": 0,
            })
            for item in input_data:
                task_queue.put((task_id, item))
            total_tasks_count += task_count
            # Recompute the ETA under the lock, as the workers do.
            update_estimated_completion_time()
        return {"status": "成功", "task_id": task_id, "message": f"已添加{task_count}个任务到队列"}
    except Exception as e:
        return {"status": "错误", "message": str(e)}
def update_estimated_completion_time():
    """Recompute ``estimated_completion_time`` from the remaining queued items.

    Uses the moving-average task duration (``average_task_time``). Assumes
    the remaining items are processed in parallel by one worker thread per
    CPU core, matching the pool started at module level. Sets the estimate
    to ``None`` when nothing remains.
    """
    global estimated_completion_time
    remaining_tasks = total_tasks_count - completed_tasks_count
    if remaining_tasks <= 0:
        estimated_completion_time = None
        return
    worker_count = max(1, multiprocessing.cpu_count())
    # Items run worker_count at a time, so the wall time is the number of
    # "rounds" (ceiling division) times the average task duration.
    rounds = -(-remaining_tasks // worker_count)
    estimated_seconds = rounds * average_task_time
    estimated_completion_time = datetime.now() + timedelta(seconds=estimated_seconds)
def process_tasks():
    """Worker loop: evaluate queued items forever and record progress.

    Blocks on ``task_queue`` for ``(task_id, item)`` pairs. Each item is
    evaluated with ``evaluate_single_case``. Then, under ``tasks_lock``, the
    loop updates the global counters, the moving-average task time, and the
    matching ``tasks_info`` record, and recomputes the ETA. Intended as the
    target of a daemon thread; it never returns.
    """
    global completed_tasks_count, average_task_time
    while True:
        # Block until work arrives. The previous empty()-then-get() polling was
        # racy with several workers and burned CPU while idle.
        queued = task_queue.get()
        try:
            task_id, task_data = queued
            # Start timing after get() so queue wait time is not counted.
            start_time = time.time()
            result = evaluate_single_case(task_data)
            task_time = time.time() - start_time
            with tasks_lock:
                completed_tasks_count += 1
                # Exponential moving average of per-task duration.
                average_task_time = (average_task_time * 0.9) + (task_time * 0.1)
                for task in tasks_info:
                    if task["id"] == task_id:
                        task["completed"] += 1
                        # Keep the evaluation outcome instead of discarding it.
                        task.setdefault("results", []).append(result)
                        if task["completed"] >= task["count"]:
                            task["status"] = "已完成"
                        break
                update_estimated_completion_time()
        except Exception as e:
            print(f"处理任务时出错: {str(e)}")
            time.sleep(1)
        finally:
            # Always acknowledge the item so queue.join() cannot hang.
            task_queue.task_done()
# Start the background worker pool: one daemon thread per CPU core, each
# running process_tasks(). Daemon threads do not keep the interpreter alive
# at shutdown.
for _ in range(multiprocessing.cpu_count()):
    _worker = threading.Thread(target=process_tasks)
    _worker.daemon = True
    _worker.start()
def get_queue_status():
    """Take a consistent snapshot of the queue state for display.

    Returns:
        tuple: ``(tasks, progress, eta, eta_text)`` where
        - ``tasks`` is a list of copies of the ``tasks_info`` records;
        - ``progress`` is completed/total (0 when nothing has been submitted);
        - ``eta`` is the estimated completion timestamp as
          ``"%Y-%m-%d %H:%M:%S"``;
        - ``eta_text`` is a human-readable countdown.
        Both ``eta`` and ``eta_text`` are ``"无任务"`` when there is no estimate.
    """
    with tasks_lock:
        # Copy each record, not just the list: workers mutate the dicts in place.
        current_tasks = [dict(task) for task in tasks_info]
        # Read all shared values under the lock so they agree with each other.
        total = total_tasks_count
        done = completed_tasks_count
        completion_time = estimated_completion_time
    progress = done / total if total > 0 else 0
    if completion_time is None:
        return current_tasks, progress, "无任务", "无任务"
    eta = completion_time.strftime("%Y-%m-%d %H:%M:%S")
    # Clamp at zero: an overdue estimate would otherwise show a negative countdown.
    remaining_seconds = max(0.0, (completion_time - datetime.now()).total_seconds())
    if remaining_seconds < 60:
        eta_text = f"{int(remaining_seconds)}秒后完成"
    elif remaining_seconds < 3600:
        eta_text = f"{int(remaining_seconds/60)}分钟后完成"
    else:
        eta_text = f"{int(remaining_seconds/3600)}小时{int((remaining_seconds%3600)/60)}分钟后完成"
    return current_tasks, progress, eta, eta_text
# Build the Gradio UI: task submission, queue overview, task table, plus a
# hidden API event that exposes evaluate().
with gr.Blocks(title="代码评估服务", theme=gr.themes.Soft(primary_hue="blue")) as demo:
    gr.Markdown(
        """
        # 代码评估服务任务队列监控
        实时监控系统中的任务队列状态和预计完成时间
        """
    )
    with gr.Row():
        with gr.Column(scale=2):
            # Task submission area
            with gr.Group():
                gr.Markdown("### 提交新任务")
                input_json = gr.JSON(label="输入数据(JSON格式列表)")
                submit_btn = gr.Button("提交任务", variant="primary")
                # Declared in the layout (not created inline in the event
                # wiring) so the submission message is shown to the user.
                submit_msg = gr.Textbox(label="提交结果", interactive=False)
        with gr.Column(scale=3):
            # Queue status overview
            with gr.Group():
                gr.Markdown("### 队列状态概览")
                with gr.Row():
                    with gr.Column():
                        total_tasks = gr.Textbox(label="总任务数", value="0")
                    with gr.Column():
                        completed = gr.Textbox(label="已完成任务", value="0")
                    with gr.Column():
                        remaining = gr.Textbox(label="剩余任务", value="0")
                progress_bar = gr.Slider(minimum=0, maximum=1, value=0, label="完成进度", interactive=False)
                eta_display = gr.Textbox(label="预计完成时间", value="无任务")
    # Task list
    with gr.Group():
        gr.Markdown("### 任务队列详情")
        task_table = gr.Dataframe(
            headers=["任务ID", "任务数量", "状态", "提交时间", "已完成/总数"],
            datatype=["str", "number", "str", "str", "str"],
            row_count=10,
            col_count=(5, "fixed"),
            interactive=False,
        )
    # elem_id is required: the auto-refresh script below finds this button by id.
    refresh_btn = gr.Button("刷新状态", variant="secondary", elem_id="refresh-btn")

    # Components refreshed by update_ui_status(), in its return order.
    status_outputs = [task_table, total_tasks, completed, remaining, progress_bar, eta_display]

    def update_ui_status():
        """Return fresh values for ``status_outputs``, in the same order."""
        tasks, progress, _eta, eta_text = get_queue_status()
        table_data = [
            [
                str(task["id"]),
                task["count"],
                task["status"],
                task["submit_time"],
                f"{task['completed']}/{task['count']}",
            ]
            for task in tasks
        ]
        with tasks_lock:
            total, done = total_tasks_count, completed_tasks_count
        return (table_data, str(total), str(done), str(total - done), progress, eta_text)

    def handle_submit(input_data):
        """Queue a submitted batch, then refresh the status view."""
        result = add_task(input_data)
        # Flatten to one value per output component (6 status values + message).
        return (*update_ui_status(), result["message"])

    # Event wiring
    submit_btn.click(handle_submit, inputs=[input_json], outputs=[*status_outputs, submit_msg])
    refresh_btn.click(update_ui_status, inputs=[], outputs=status_outputs)
    demo.load(update_ui_status, inputs=[], outputs=status_outputs)
    # Auto-refresh every 3 s by clicking the refresh button from JavaScript
    # (the author notes the `every=` option was incompatible here).
    demo.load(
        lambda: None,
        None,
        js="() => { setInterval(() => { const btn = document.querySelector('#refresh-btn'); if (btn) { btn.click(); } }, 3000); return []; }",
    )

    # Expose evaluate() as a named API endpoint through a hidden event.
    # gr.Blocks has no add_api_route() method, so the previous call crashed at import.
    api_input = gr.JSON(visible=False)
    api_output = gr.JSON(visible=False)
    api_trigger = gr.Button(visible=False)
    api_trigger.click(evaluate, inputs=api_input, outputs=api_output, api_name="evaluate")

demo.queue()

if __name__ == "__main__":
    demo.launch()