Spaces:
Sleeping
Sleeping
import gradio as gr | |
import json | |
import importlib | |
import os | |
import sys | |
from pathlib import Path | |
import concurrent.futures | |
import multiprocessing | |
import time | |
import threading | |
import queue | |
import uuid | |
from datetime import datetime | |
from src.containerized_eval import eval_string_script | |
# Make this file's directory and its "src" sub-directory importable.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, "src")
for _search_dir in (current_dir, src_dir):
    if _search_dir not in sys.path:
        sys.path.append(_search_dir)
del _search_dir  # do not leave the loop variable behind as a module global

# Queue of (task_id, input_data, request_time) tuples consumed by the workers.
task_queue = queue.Queue()

# Maps task_id -> status record (a dict).
task_status = {}

# Records of recently completed tasks; the worker loop trims it to 10 entries.
task_history = []

# Lock protecting the shared structures above.
lock = threading.Lock()

# Number of worker threads to start (one per CPU).
worker_threads = multiprocessing.cpu_count()

# Flag polled by the background workers; set to False to stop them.
running = True
def queue_processor():
    """Worker loop: pull tasks off ``task_queue`` and evaluate them.

    Runs until the module-level ``running`` flag becomes false. For every
    task the status record in ``task_status`` moves from ``'processing'`` to
    ``'completed'`` (with ``result``, ``end_time`` and ``process_time``) or
    to ``'error'`` (with ``error`` and ``end_time``). Completed tasks are
    also appended to ``task_history``, which is trimmed to its last 10
    entries.
    """
    while running:
        try:
            # Wait at most 0.1 s so the ``running`` flag is re-checked often.
            item = task_queue.get(timeout=0.1)
        except queue.Empty:
            continue

        # Bound before unpacking so the error handler can always refer to it.
        task_id = None
        try:
            task_id, input_data, request_time = item

            # Keep the start time in a local so it never has to be re-read
            # from the shared dict outside the lock.
            start_time = time.time()
            with lock:
                task_status[task_id]['status'] = 'processing'
                task_status[task_id]['start_time'] = start_time

            result = evaluate(input_data)

            end_time = time.time()
            process_time = end_time - start_time
            with lock:
                record = task_status[task_id]
                record['status'] = 'completed'
                record['result'] = result
                record['end_time'] = end_time
                record['process_time'] = process_time

                task_history.append({
                    'task_id': task_id,
                    'request_time': request_time,
                    'process_time': process_time,
                    'status': 'completed'
                })
                # Keep only the 10 most recent entries.
                del task_history[:-10]
        except Exception as e:
            with lock:
                # The record may be missing; never let the handler itself
                # raise, otherwise this worker thread would die.
                record = task_status.get(task_id)
                if record is not None:
                    record['status'] = 'error'
                    record['error'] = str(e)
                    record['end_time'] = time.time()
        finally:
            # Exactly one task_done() per successful get().
            task_queue.task_done()
def evaluate(input_data): | |
"""评估代码的主函数 | |
Args: | |
input_data: 列表(批量处理多个测试用例) | |
Returns: | |
list: 包含评估结果的列表 | |
""" | |
try: | |
if not isinstance(input_data, list): | |
return {"status": "Exception", "error": "Input must be a list"} | |
results = [] | |
max_workers = multiprocessing.cpu_count() | |
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: | |
future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data} | |
for future in concurrent.futures.as_completed(future_to_item): | |
item = future_to_item[future] | |
try: | |
result = future.result() | |
item.update(result) | |
results.append(item) | |
except Exception as e: | |
item.update({"status": "Exception", "error": str(e)}) | |
results.append(item) | |
return results | |
except Exception as e: | |
return {"status": "Exception", "error": str(e)} | |
def evaluate_single_case(input_data):
    """Evaluate one test case, trying each candidate completion in turn.

    The program for a candidate is ``prompt + completion + '\\n' + tests``.
    The first result whose status is ``"OK"`` is returned immediately; if no
    candidate succeeds, the result of the first candidate is returned.

    Args:
        input_data: Dict read for the keys ``language``,
            ``processed_completions``, ``prompt`` and ``tests``.

    Returns:
        dict: The evaluation result, or
        ``{"status": "Exception", "error": ...}`` on invalid input or on any
        exception raised along the way.
    """
    try:
        if not isinstance(input_data, dict):
            return {"status": "Exception", "error": "Input item must be a dictionary"}

        candidates = input_data.get('processed_completions', [])
        if not candidates:
            return {"status": "Exception", "error": "No code provided"}

        language = input_data.get('language')
        failures = []
        for candidate in candidates:
            program = input_data.get('prompt') + candidate + '\n' + input_data.get('tests')
            outcome = evaluate_code(program, language)
            if outcome["status"] == "OK":
                return outcome
            failures.append(outcome)

        # Nothing passed: report the first candidate's result.
        return failures[0]
    except Exception as err:
        return {"status": "Exception", "error": str(err)}
def evaluate_code(code, language):
    """Run ``code`` through ``eval_string_script`` for the given language.

    Args:
        code (str): Source code to evaluate.
        language (str): Programming language identifier.

    Returns:
        dict: Whatever ``eval_string_script(language, code)`` returns, or
        ``{"status": "Exception", "error": ...}`` if it raises.
    """
    try:
        return eval_string_script(language, code)
    except Exception as err:
        return {"status": "Exception", "error": str(err)}
def synchronous_evaluate(input_data):
    """Evaluate ``input_data`` through the queue and block until it is done.

    A task is registered in ``task_status`` (flagged ``'synchronous'``) and
    put on ``task_queue``; the function then polls the status record every
    0.1 s. The record is removed from ``task_status`` once the task reaches
    a final state.

    Args:
        input_data: Input data to evaluate.

    Returns:
        The task's ``result`` on completion, or
        ``{"status": "Exception", "error": ...}`` if the task ended in error
        or its status record disappeared while waiting.
    """
    task_id = str(uuid.uuid4())
    request_time = time.time()

    with lock:
        # Register and enqueue under one lock so the recorded position
        # matches the queue at the moment of insertion.
        task_status[task_id] = {
            'status': 'queued',
            'queued_time': request_time,
            'queue_position': task_queue.qsize() + 1,
            'synchronous': True  # marks a synchronous task
        }
        task_queue.put((task_id, input_data, request_time))

    while True:
        with lock:
            record = task_status.get(task_id)
            if record is None:
                # Without this guard the loop would spin forever.
                return {"status": "Exception", "error": "Task state was removed before completion"}

            status = record['status']
            if status == 'completed':
                task_status.pop(task_id, None)
                return record['result']
            if status == 'error':
                task_status.pop(task_id, None)
                return {"status": "Exception", "error": record.get('error', '未知错误')}

        # Sleep briefly between polls to avoid burning CPU.
        time.sleep(0.1)
def enqueue_task(input_data):
    """Register a new task and put it on the queue.

    Args:
        input_data: Task payload to process.

    Returns:
        dict: ``task_id``, ``status`` (``'queued'``), ``queue_position`` and
        ``estimated_wait`` (queue position times 5 seconds).
    """
    task_id = str(uuid.uuid4())
    request_time = time.time()

    with lock:
        # Compute the position and enqueue under one lock so two concurrent
        # submissions cannot be given the same position.
        position = task_queue.qsize() + 1
        task_status[task_id] = {
            'status': 'queued',
            'queued_time': request_time,
            'queue_position': position
        }
        task_queue.put((task_id, input_data, request_time))

    # Use the local value: the shared record must not be read outside the lock.
    return {
        'task_id': task_id,
        'status': 'queued',
        'queue_position': position,
        'estimated_wait': position * 5  # assumes roughly 5 seconds per task
    }
def check_status(task_id):
    """Look up the status record of a task.

    Args:
        task_id: Task identifier.

    Returns:
        dict: A shallow copy of the task's status record, or
        ``{'status': 'not_found'}`` when the id is unknown. A record whose
        status is ``'completed'`` or ``'error'`` and whose ``end_time`` is
        more than 3600 seconds old is removed from ``task_status`` as part
        of this call (it is still returned this one time).
    """
    with lock:
        try:
            snapshot = task_status[task_id].copy()
        except KeyError:
            return {'status': 'not_found'}

        finished = snapshot['status'] in ['completed', 'error']
        # Drop stale finished records so the dict does not grow without bound.
        if finished and time.time() - snapshot.get('end_time', 0) > 3600:
            task_status.pop(task_id, None)

        return snapshot
def get_queue_status():
    """Summarise the current state of the queue.

    Returns:
        dict: ``queue_size``, ``active_tasks`` (records in ``'processing'``),
        ``waiting_tasks`` (records in ``'queued'``), ``worker_threads``,
        ``estimated_wait`` and ``recent_tasks`` (up to the last 5 history
        entries). The estimate is ``waiting_tasks`` times the mean
        ``process_time`` of the recent tasks, or times 5 when that mean is
        not positive.
    """
    with lock:
        pending = task_queue.qsize()
        states = [record['status'] for record in task_status.values()]
        latest = task_history[-5:]

    processing = states.count('processing')
    queued = states.count('queued')

    if latest:
        mean_time = sum(entry['process_time'] for entry in latest) / len(latest)
    else:
        mean_time = 0

    if mean_time > 0:
        wait_estimate = queued * mean_time
    else:
        wait_estimate = queued * 5

    return {
        'queue_size': pending,
        'active_tasks': processing,
        'waiting_tasks': queued,
        'worker_threads': worker_threads,
        'estimated_wait': wait_estimate,
        'recent_tasks': latest
    }
def format_time(seconds):
    """Render a duration in seconds as a short human-readable string.

    Args:
        seconds: Duration in seconds.

    Returns:
        str: Seconds with one decimal below one minute, minutes plus seconds
        below one hour, otherwise whole hours and minutes.
    """
    if seconds >= 3600:
        whole_hours = int(seconds / 3600)
        whole_minutes = int((seconds % 3600) / 60)
        return f"{whole_hours}小时{whole_minutes}分钟"

    if seconds >= 60:
        whole_minutes = int(seconds / 60)
        remainder = seconds % 60
        return f"{whole_minutes}分{remainder:.1f}秒"

    return f"{seconds:.1f}秒"
def ui_submit(input_json):
    """UI handler: submit a task to the queue.

    Args:
        input_json: Task input, either a JSON string (parsed here) or an
            already-decoded object.

    Returns:
        tuple: ``(task_id, status_html)`` on success, or
        ``(None, error_html)`` when anything raises.
    """
    try:
        if isinstance(input_json, str):
            payload = json.loads(input_json)
        else:
            payload = input_json

        ticket = enqueue_task(payload)
        task_id = ticket['task_id']

        status_html = f"""
        <div class="status-card">
            <h3>任务状态</h3>
            <p><b>任务ID:</b> {task_id}</p>
            <p><b>状态:</b> 已入队</p>
            <p><b>队列位置:</b> {ticket['queue_position']}</p>
            <p><b>预计等待时间:</b> {format_time(ticket['estimated_wait'])}</p>
        </div>
        """
        return task_id, status_html
    except Exception as err:
        return None, f"<div class='error-message'>提交失败: {str(err)}</div>"
def ui_check_status(task_id):
    """UI handler: render the status of a task as an HTML fragment.

    Args:
        task_id: Task identifier as entered by the user.

    Returns:
        str: HTML describing the task state (queued, processing, completed
        or error), or a notice/error fragment when the id is empty, unknown,
        or the state is unrecognised.
    """
    # Local import keeps this block self-contained.
    from html import escape

    if not task_id:
        return "<div class='notice'>请提供任务ID</div>"

    status = check_status(task_id)
    state = status['status']

    if state == 'not_found':
        return "<div class='error-message'>任务未找到</div>"

    if state == 'queued':
        queue_info = get_queue_status()
        waiting = queue_info['waiting_tasks']
        # Scale the per-task average wait by this task's queue position.
        if waiting > 0:
            est_wait = queue_info['estimated_wait'] / waiting * status['queue_position']
        else:
            est_wait = 0
        return f"""
        <div class="status-card">
            <h3>任务状态</h3>
            <p><b>任务ID:</b> {task_id}</p>
            <p><b>状态:</b> 等待中</p>
            <p><b>队列位置:</b> {status['queue_position']}</p>
            <p><b>入队时间:</b> {datetime.fromtimestamp(status['queued_time']).strftime('%H:%M:%S')}</p>
            <p><b>预计等待时间:</b> {format_time(est_wait)}</p>
        </div>
        """

    if state == 'processing':
        process_time = time.time() - status['start_time']
        return f"""
        <div class="status-card">
            <h3>任务状态</h3>
            <p><b>任务ID:</b> {task_id}</p>
            <p><b>状态:</b> 处理中</p>
            <p><b>已处理时间:</b> {format_time(process_time)}</p>
            <p><b>入队时间:</b> {datetime.fromtimestamp(status['queued_time']).strftime('%H:%M:%S')}</p>
        </div>
        """

    if state == 'completed':
        return f"""
        <div class="status-card success">
            <h3>任务状态</h3>
            <p><b>任务ID:</b> {task_id}</p>
            <p><b>状态:</b> 已完成</p>
            <p><b>处理时间:</b> {format_time(status['process_time'])}</p>
            <p><b>完成时间:</b> {datetime.fromtimestamp(status['end_time']).strftime('%H:%M:%S')}</p>
        </div>
        """

    if state == 'error':
        # Exception text is arbitrary; escape it so characters such as
        # '<' or '&' cannot break or inject markup.
        error_text = escape(str(status.get('error', '未知错误')))
        return f"""
        <div class="status-card error">
            <h3>任务状态</h3>
            <p><b>任务ID:</b> {task_id}</p>
            <p><b>状态:</b> 错误</p>
            <p><b>错误信息:</b> {error_text}</p>
        </div>
        """

    return "<div class='error-message'>未知状态</div>"
def ui_get_queue_info():
    """UI handler: render the queue summary as an HTML fragment.

    Returns:
        str: HTML showing the waiting/processing/worker counts, the current
        estimated wait and a table of recent tasks, newest first.
    """
    queue_info = get_queue_status()

    # One table row per recent task, newest first.
    rows = [
        f"""
        <tr>
            <td>{task['task_id'][:8]}...</td>
            <td>{datetime.fromtimestamp(task['request_time']).strftime('%H:%M:%S')}</td>
            <td>{format_time(task['process_time'])}</td>
        </tr>
        """
        for task in reversed(queue_info['recent_tasks'])
    ]
    tasks_html = "".join(rows)

    return f"""
    <div class="queue-info-card">
        <h3>队列状态</h3>
        <div class="queue-stats">
            <div class="stat-item">
                <div class="stat-value">{queue_info['waiting_tasks']}</div>
                <div class="stat-label">等待中</div>
            </div>
            <div class="stat-item">
                <div class="stat-value">{queue_info['active_tasks']}</div>
                <div class="stat-label">处理中</div>
            </div>
            <div class="stat-item">
                <div class="stat-value">{queue_info['worker_threads']}</div>
                <div class="stat-label">工作线程</div>
            </div>
        </div>
        <div class="wait-time">
            <p><b>当前预计等待时间:</b> {format_time(queue_info['estimated_wait'])}</p>
        </div>
        <h4>最近处理的任务</h4>
        <table class="recent-tasks">
            <tr>
                <th>任务ID</th>
                <th>请求时间</th>
                <th>处理时间</th>
            </tr>
            {tasks_html}
        </table>
    </div>
    """
def ui_get_result(task_id):
    """UI handler: fetch the result of a completed task.

    Args:
        task_id: Task identifier.

    Returns:
        The stored ``result`` when the task's status is ``'completed'``;
        ``None`` for an empty id or any other status.
    """
    if task_id:
        info = check_status(task_id)
        if info['status'] == 'completed':
            return info.get('result', None)
    return None
def launch_workers():
    """Set the ``running`` flag and start the daemon worker threads.

    Starts ``worker_threads`` threads, each running ``queue_processor``.
    """
    global running
    running = True

    for _ in range(worker_threads):
        threading.Thread(target=queue_processor, daemon=True).start()
# Custom CSS passed to gr.Blocks; it styles the classes used in the HTML
# fragments built by the ui_* functions (status cards, queue statistics,
# recent-task table, notices and error messages).
custom_css = """
.container {
max-width: 1200px;
margin: 0 auto;
}
.status-card, .queue-info-card {
background: #f7f7f7;
border-radius: 10px;
padding: 15px;
margin: 10px 0;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}
.status-card.success {
background: #e7f5e7;
border-left: 5px solid #28a745;
}
.status-card.error {
background: #f8d7da;
border-left: 5px solid #dc3545;
}
.error-message {
color: #dc3545;
font-weight: bold;
padding: 10px;
background: #f8d7da;
border-radius: 5px;
}
.notice {
color: #0c5460;
background-color: #d1ecf1;
padding: 10px;
border-radius: 5px;
}
.queue-stats {
display: flex;
justify-content: space-around;
margin: 20px 0;
}
.stat-item {
text-align: center;
}
.stat-value {
font-size: 24px;
font-weight: bold;
color: #007bff;
}
.stat-label {
color: #6c757d;
}
.wait-time {
text-align: center;
margin: 15px 0;
padding: 10px;
background: #e2e3e5;
border-radius: 5px;
}
.recent-tasks {
width: 100%;
border-collapse: collapse;
margin-top: 10px;
}
.recent-tasks th, .recent-tasks td {
border: 1px solid #dee2e6;
padding: 8px;
text-align: center;
}
.recent-tasks th {
background-color: #e9ecef;
}
.tabs {
margin-top: 20px;
}
"""
# Start the background worker threads before the UI is built.
launch_workers()

# Build the Gradio interface.
with gr.Blocks(css=custom_css) as demo:
    gr.Markdown("# 代码评估服务")
    gr.Markdown("支持多种编程语言的代码评估服务,采用队列机制处理请求")

    # Tab 1: submit a task and watch the queue summary.
    with gr.Tab("提交任务"):
        with gr.Row():
            with gr.Column(scale=2):
                input_json = gr.JSON(label="输入数据")
                submit_btn = gr.Button("提交任务", variant="primary")
                task_id_output = gr.Textbox(label="任务ID", visible=True)
                status_html = gr.HTML(label="状态")
            with gr.Column(scale=1):
                queue_info_html = gr.HTML()
                refresh_queue_btn = gr.Button("刷新队列状态")

    # Tab 2: query the status or result of a task by id.
    with gr.Tab("查询任务"):
        with gr.Row():
            with gr.Column(scale=1):
                check_task_id = gr.Textbox(label="任务ID")
                check_btn = gr.Button("查询状态", variant="primary")
                get_result_btn = gr.Button("获取结果")
            with gr.Column(scale=2):
                check_status_html = gr.HTML()
                result_output = gr.JSON(label="任务结果")

    # Update callback for the queue panel.
    def update_queue_info():
        """Return the current queue summary HTML."""
        return ui_get_queue_info()

    # Periodically refresh the queue panel (every=5).
    demo.load(update_queue_info, None, queue_info_html, every=5)

    # Event wiring.
    submit_btn.click(ui_submit, inputs=[input_json], outputs=[task_id_output, status_html])
    refresh_queue_btn.click(update_queue_info, None, queue_info_html)
    check_btn.click(ui_check_status, inputs=[check_task_id], outputs=[check_status_html])
    get_result_btn.click(ui_get_result, inputs=[check_task_id], outputs=[result_output])

    # Endpoint compatible with the original interface: synchronous_evaluate
    # is registered as a load event under api_name="evaluate".
    # NOTE(review): a load event presumably also runs on every page load,
    # and the inline gr.JSON() components are created here — confirm
    # against the Gradio documentation.
    demo.queue()
    evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=gr.JSON(), outputs=gr.JSON(), api_name="evaluate")
if __name__ == "__main__":
    try:
        demo.launch()
    finally:
        # Clear the flag polled by queue_processor so the worker loops exit.
        running = False