import gradio as gr import json import importlib import os import sys import time from pathlib import Path import concurrent.futures import multiprocessing import threading import queue from datetime import datetime, timedelta from src.containerized_eval import eval_string_script # 添加当前目录和src目录到模块搜索路径 current_dir = os.path.dirname(os.path.abspath(__file__)) src_dir = os.path.join(current_dir, "src") if current_dir not in sys.path: sys.path.append(current_dir) if src_dir not in sys.path: sys.path.append(src_dir) # 添加全局任务队列和任务状态跟踪 task_queue = queue.Queue() task_results = {} active_tasks = {} completed_tasks = [] task_id_counter = 0 task_lock = threading.Lock() update_event = threading.Event() # 通知UI需要刷新 def trigger_ui_update(): """触发UI更新事件""" update_event.set() def get_next_task_id(): global task_id_counter with task_lock: task_id_counter += 1 return f"task_{task_id_counter}" def submit_task(input_data): """提交任务到队列 Args: input_data: 列表(批量处理多个测试用例) Returns: str: 任务ID """ try: if not isinstance(input_data, list): return {"status": "error", "message": "Input must be a list"} task_id = get_next_task_id() with task_lock: estimated_time = estimate_completion_time(input_data) task_info = { "id": task_id, "data": input_data, "status": "queued", "submitted_at": datetime.now(), "estimated_completion_time": estimated_time, "items_count": len(input_data) } active_tasks[task_id] = task_info task_queue.put(task_info) # 触发UI更新 trigger_ui_update() # 如果这是第一个任务,启动处理线程 if len(active_tasks) == 1: threading.Thread(target=process_task_queue, daemon=True).start() return {"status": "success", "task_id": task_id} except Exception as e: return {"status": "error", "message": str(e)} def estimate_completion_time(input_data): """估计完成任务所需的时间 Args: input_data: 任务数据 Returns: timedelta: 估计的完成时间 """ # 假设每个任务项平均需要2秒处理 # 这个值可以根据实际情况调整或从历史数据中学习 avg_time_per_item = 2 total_items = len(input_data) # 考虑并行处理因素 cpu_count = multiprocessing.cpu_count() parallel_factor = min(cpu_count, total_items) if parallel_factor > 0: estimated_seconds = (total_items * avg_time_per_item) / parallel_factor return timedelta(seconds=estimated_seconds) else: return timedelta(seconds=0) def process_task_queue(): """处理任务队列的后台线程""" while True: try: if task_queue.empty(): time.sleep(0.5) continue task_info = task_queue.get() task_id = task_info["id"] # 更新任务状态 with task_lock: active_tasks[task_id]["status"] = "processing" trigger_ui_update() # 状态变更为处理中时更新UI # 处理任务 result = evaluate(task_info["data"]) # 更新任务结果 with task_lock: active_tasks[task_id]["status"] = "completed" active_tasks[task_id]["completed_at"] = datetime.now() active_tasks[task_id]["result"] = result # 将任务移至已完成列表 completed_tasks.append(active_tasks[task_id]) del active_tasks[task_id] # 保留最近的20个已完成任务 if len(completed_tasks) > 20: completed_tasks.pop(0) trigger_ui_update() # 任务完成时更新UI task_queue.task_done() except Exception as e: print(f"Error processing task queue: {str(e)}") time.sleep(1) def evaluate(input_data): """评估代码的主函数 Args: input_data: 列表(批量处理多个测试用例) Returns: list: 包含评估结果的列表 """ try: if not isinstance(input_data, list): return {"status": "Exception", "error": "Input must be a list"} results = [] max_workers = multiprocessing.cpu_count() with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data} for future in concurrent.futures.as_completed(future_to_item): item = future_to_item[future] try: result = future.result() item.update(result) results.append(item) except Exception as e: item.update({"status": "Exception", "error": str(e)}) results.append(item) return results except Exception as e: return {"status": "Exception", "error": str(e)} def evaluate_single_case(input_data): """评估单个代码用例 Args: input_data: 字典(包含代码信息) Returns: dict: 包含评估结果的字典 """ try: if not isinstance(input_data, dict): return {"status": "Exception", "error": "Input item must be a dictionary"} language = input_data.get('language') completions = input_data.get('processed_completions', []) if not completions: return {"status": "Exception", "error": "No code provided"} results = [] for comp in completions: code = input_data.get('prompt') + comp + '\n' + input_data.get('tests') result = evaluate_code(code, language) if result["status"] == "OK": return result results.append(result) return results[0] except Exception as e: return {"status": "Exception", "error": str(e)} def evaluate_code(code, language): """评估特定语言的代码 Args: code (str): 要评估的代码 language (str): 编程语言 Returns: dict: 包含评估结果的字典 """ try: # 使用containerized_eval中的eval_string_script函数 result = eval_string_script(language, code) return result except Exception as e: return {"status": "Exception", "error": str(e)} def get_queue_status(): """获取当前队列状态 Returns: dict: 包含队列状态的字典 """ with task_lock: queued_tasks = [task for task in active_tasks.values() if task["status"] == "queued"] processing_tasks = [task for task in active_tasks.values() if task["status"] == "processing"] # 计算总的预计完成时间 total_estimated_time = timedelta(seconds=0) for task in active_tasks.values(): if isinstance(task["estimated_completion_time"], timedelta): total_estimated_time += task["estimated_completion_time"] # 计算队列中的项目总数 total_items = sum(task["items_count"] for task in active_tasks.values()) return { "queued_tasks": len(queued_tasks), "processing_tasks": len(processing_tasks), "total_tasks": len(active_tasks), "total_items": total_items, "estimated_completion_time": str(total_estimated_time), "active_tasks": [ { "id": task["id"], "status": task["status"], "items_count": task["items_count"], "submitted_at": task["submitted_at"].strftime("%Y-%m-%d %H:%M:%S"), "estimated_completion": str(task["estimated_completion_time"]) } for task in active_tasks.values() ], "recent_completed": [ { "id": task["id"], "items_count": task["items_count"], "submitted_at": task["submitted_at"].strftime("%Y-%m-%d %H:%M:%S"), "completed_at": task["completed_at"].strftime("%Y-%m-%d %H:%M:%S") if "completed_at" in task else "", "duration": str(task["completed_at"] - task["submitted_at"]) if "completed_at" in task else "" } for task in completed_tasks[-5:] # 只显示最近5个完成的任务 ] } def render_queue_status(): """渲染队列状态UI Returns: str: HTML格式的队列状态显示 """ status = get_queue_status() html = f"""
{status['queued_tasks']}
{status['processing_tasks']}
{status['total_items']}
任务ID | 状态 | 项目数 | 提交时间 | 预计完成 |
---|---|---|---|---|
{task['id']} | {task['status']} | {task['items_count']} | {task['submitted_at']} | {task['estimated_completion']} |
当前没有活跃任务 |
任务ID | 项目数 | 提交时间 | 完成时间 | 持续时间 |
---|---|---|---|---|
{task['id']} | {task['items_count']} | {task['submitted_at']} | {task['completed_at']} | {task['duration']} |
暂无已完成任务 |