Spaces:
Sleeping
Sleeping
import gradio as gr | |
import json | |
import importlib | |
import os | |
import sys | |
from pathlib import Path | |
import concurrent.futures | |
import multiprocessing | |
import time | |
import threading | |
import queue | |
import uuid | |
import numpy as np | |
from datetime import datetime | |
from tqdm.auto import tqdm | |
from src.containerized_eval import eval_string_script | |
# Make this file's directory and its ``src`` subdirectory importable.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, "src")
for _path in (current_dir, src_dir):
    if _path not in sys.path:
        sys.path.append(_path)
del _path

# Pending work items: (task_id, input_data, request_time) tuples read by the workers.
task_queue = queue.Queue()
# task_id -> status record (status, timestamps, estimates, result or error).
task_status = {}
# Recently completed tasks, oldest first; the workers trim it to the last 20.
task_history = []
# Protects the shared task_status / task_history / task_type_times state.
lock = threading.Lock()
# Number of background worker threads: one per CPU.
worker_threads = multiprocessing.cpu_count()
# Workers keep polling the queue while this flag is True.
running = True
# "<language>_<complexity>" -> recent per-item processing times in seconds.
task_type_times = {}
def queue_processor():
    """Worker loop: evaluate tasks from ``task_queue`` until ``running`` is False.

    Each queue item is a ``(task_id, input_data, request_time)`` tuple whose
    ``task_status[task_id]`` record was created by the enqueuing function.
    The record moves to ``'processing'`` and then to ``'completed'`` (with
    ``result``, ``end_time`` and ``process_time``) or, if anything raises,
    to ``'error'`` (with ``error`` and ``end_time``).
    """
    while running:
        try:
            # Block for at most 0.1 s so the ``running`` flag is re-checked regularly.
            item = task_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        # Reset on every iteration: the former ``'task_id' in locals()`` check
        # could attribute a failure to the task from an earlier iteration.
        task_id = None
        try:
            task_id, input_data, request_time = item
            _run_task(task_id, input_data, request_time)
        except Exception as e:
            if task_id is not None:
                with lock:
                    record = task_status.get(task_id)
                    # The record may already be gone; indexing it directly
                    # would raise inside this handler and kill the worker.
                    if record is not None:
                        record['status'] = 'error'
                        record['error'] = str(e)
                        record['end_time'] = time.time()
        finally:
            # Exactly one task_done() per successful get(), so join() stays accurate.
            task_queue.task_done()


def _run_task(task_id, input_data, request_time):
    """Evaluate one dequeued task and store its result, timing and history entry."""
    with lock:
        start_time = time.time()
        task_status[task_id]['status'] = 'processing'
        task_status[task_id]['start_time'] = start_time

    # Characterise the batch (language, size, complexity) for time estimation.
    if isinstance(input_data, list) and len(input_data) > 0:
        sample_task = input_data[0]
        language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
        factors = {
            'language': language,
            'size': len(input_data),
            'complexity': _estimate_task_complexity(input_data),
        }
        with lock:
            task_status[task_id]['estimated_factors'] = factors

    result = evaluate(input_data)
    end_time = time.time()
    process_time = end_time - start_time

    with lock:
        record = task_status[task_id]
        record['status'] = 'completed'
        record['result'] = result
        record['end_time'] = end_time
        record['process_time'] = process_time

        factors = record.get('estimated_factors')
        if factors:
            _record_type_time(factors, process_time)

        history_entry = {
            'task_id': task_id,
            'request_time': request_time,
            'process_time': process_time,
            'status': 'completed',
            'factors': factors or {},
        }
        # Keep the enqueue-time prediction so get_queue_status can compare
        # predicted vs. actual durations (it skips entries without this key).
        # Only a positive value is recorded, so that ratio never divides by zero.
        estimated_time = record.get('estimated_time')
        if estimated_time:
            history_entry['estimated_time'] = estimated_time
        task_history.append(history_entry)
        # Keep only the 20 most recent tasks.
        del task_history[:-20]


def _record_type_time(factors, process_time):
    """Append the per-item time for this task type; the caller must hold ``lock``."""
    key = f"{factors['language']}_{factors['complexity']}"
    # Normalise by batch size so the estimate scales with the number of items.
    samples = task_type_times.get(key, []) + [process_time / max(1, factors['size'])]
    # Rebind to a fresh list holding the 10 most recent samples, so lock-free
    # readers never observe a list being mutated in place.
    task_type_times[key] = samples[-10:]
def _estimate_task_complexity(tasks): | |
"""估计任务复杂度 | |
Args: | |
tasks: 任务列表 | |
Returns: | |
str: 复杂度评级 ('simple', 'medium', 'complex') | |
""" | |
# 基于代码和测试的长度评估复杂度 | |
total_code_length = 0 | |
count = 0 | |
for task in tasks: | |
if isinstance(task, dict): | |
prompt = task.get('prompt', '') | |
tests = task.get('tests', '') | |
completions = task.get('processed_completions', []) | |
code_length = len(prompt) + len(tests) | |
if completions: | |
code_length += sum(len(comp) for comp in completions) | |
total_code_length += code_length | |
count += 1 | |
if count == 0: | |
return 'medium' # 默认中等复杂度 | |
avg_length = total_code_length / count | |
if avg_length < 1000: | |
return 'simple' | |
elif avg_length < 5000: | |
return 'medium' | |
else: | |
return 'complex' | |
def evaluate(input_data):
    """Evaluate a batch of test cases in parallel.

    Args:
        input_data: list of test-case dicts (see ``evaluate_single_case``).

    Returns:
        list: one entry per input item, in the same order as ``input_data``.
        Dict items are updated in place with their result fields and
        returned; a non-dict item is represented by the error dict produced
        for it. If ``input_data`` is not a list, or an unexpected error
        occurs, a single ``{"status": "Exception", "error": ...}`` dict is
        returned instead.
    """
    try:
        if not isinstance(input_data, list):
            return {"status": "Exception", "error": "Input must be a list"}

        results = []
        max_workers = multiprocessing.cpu_count()
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(evaluate_single_case, item) for item in input_data]
            # Collect in submission order (not as_completed) so each result
            # lines up with the input it belongs to.
            for item, future in zip(input_data, futures):
                if not isinstance(item, dict):
                    # A non-dict item cannot be merged into; report its own
                    # result instead of letting item.update() fail the batch.
                    try:
                        results.append(future.result())
                    except Exception as e:
                        results.append({"status": "Exception", "error": str(e)})
                    continue
                try:
                    item.update(future.result())
                except Exception as e:
                    item.update({"status": "Exception", "error": str(e)})
                results.append(item)
        return results
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def evaluate_single_case(input_data):
    """Run every candidate completion of one test case and report the outcome.

    For each completion the program is built as prompt + completion + a
    newline + tests. Candidates are tried in order and the first result whose
    status is "OK" is returned immediately; if none passes, the result of the
    first candidate is returned.

    Args:
        input_data: dict with 'language', 'prompt', 'tests' and
            'processed_completions' (a list of code strings).

    Returns:
        dict: the evaluation result, or ``{"status": "Exception", "error": ...}``
        when the input is invalid or evaluation raises.
    """
    try:
        if not isinstance(input_data, dict):
            return {"status": "Exception", "error": "Input item must be a dictionary"}

        language = input_data.get('language')
        candidates = input_data.get('processed_completions', [])
        if not candidates:
            return {"status": "Exception", "error": "No code provided"}

        first_failure = None
        for candidate in candidates:
            program = input_data.get('prompt') + candidate + '\n' + input_data.get('tests')
            outcome = evaluate_code(program, language)
            if outcome["status"] == "OK":
                return outcome
            if first_failure is None:
                first_failure = outcome
        return first_failure
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def evaluate_code(code, language):
    """Run ``code`` through the containerized evaluator for ``language``.

    Thin wrapper around ``eval_string_script`` that turns any exception it
    raises into an error dict instead of propagating it.

    Args:
        code (str): complete program text to evaluate.
        language (str): programming language identifier passed to the evaluator.

    Returns:
        Whatever ``eval_string_script`` returns, or
        ``{"status": "Exception", "error": ...}`` on failure.
    """
    try:
        return eval_string_script(language, code)
    except Exception as exc:
        return {"status": "Exception", "error": str(exc)}
def synchronous_evaluate(input_data):
    """Queue ``input_data`` for evaluation and block until it finishes.

    Kept for compatibility with the original blocking interface. The task's
    status record is polled every 0.1 s and removed once a final state is seen.

    Args:
        input_data: list of test-case dicts to evaluate (other values are still
            queued and produce ``evaluate``'s error dict).

    Returns:
        The value produced by ``evaluate`` (normally a list of result dicts),
        or ``{"status": "Exception", "error": ...}`` if the task failed.
    """
    # Characterise the task so its status record carries a time estimate.
    if isinstance(input_data, list) and len(input_data) > 0:
        sample_task = input_data[0]
        language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
        task_size = len(input_data)
        task_complexity = _estimate_task_complexity(input_data)
    else:
        language = 'unknown'
        task_size = 1
        task_complexity = 'medium'
    estimated_total_time = _get_estimated_time_for_task(language, task_complexity) * task_size

    task_id = str(uuid.uuid4())
    request_time = time.time()
    with lock:
        task_status[task_id] = {
            'status': 'queued',
            'queued_time': request_time,
            'queue_position': task_queue.qsize() + 1,
            'synchronous': True,  # marks a blocking (synchronous) task
            'estimated_factors': {
                'language': language,
                'size': task_size,
                'complexity': task_complexity
            },
            'estimated_time': estimated_total_time
        }
    task_queue.put((task_id, input_data, request_time))

    # Wait for a worker to finish the task.
    while True:
        with lock:
            record = task_status.get(task_id)
            if record is None:
                # Without this check the loop would spin forever if the
                # record were removed before a final state was observed.
                return {"status": "Exception", "error": "Task status not found"}
            status = record['status']
            if status == 'completed':
                task_status.pop(task_id, None)
                return record['result']
            if status == 'error':
                task_status.pop(task_id, None)
                return {"status": "Exception", "error": record.get('error', '未知错误')}
        # Short sleep to avoid burning CPU while polling.
        time.sleep(0.1)
def _get_estimated_time_for_task(language, complexity):
    """Estimate the per-item processing time (seconds) for a task type.

    Uses the median of the recent per-item timings recorded in
    ``task_type_times`` under ``"<language>_<complexity>"`` when any exist;
    otherwise falls back to a static guess by complexity.

    Args:
        language: programming language of the task.
        complexity: 'simple', 'medium' or 'complex'.

    Returns:
        float: estimated seconds per item.
    """
    history = task_type_times.get(f"{language}_{complexity}")
    if history:
        # The median is robust to occasional very slow or very fast runs.
        return np.median(history)
    # No history yet; anything other than 'simple'/'medium' counts as complex.
    if complexity == 'simple':
        return 1.0
    return 3.0 if complexity == 'medium' else 8.0
def enqueue_task(input_data):
    """Add a task to the queue without waiting for it to run.

    Args:
        input_data: task payload to evaluate (normally a list of test-case dicts).

    Returns:
        dict: ``task_id``, ``status`` ('queued'), ``queue_position``,
        ``estimated_wait`` (seconds until the task is expected to start) and
        ``estimated_processing`` (estimated seconds to process it).
    """
    # Characterise the task and estimate its processing time.
    if isinstance(input_data, list) and len(input_data) > 0:
        sample_task = input_data[0]
        language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
        task_size = len(input_data)
        task_complexity = _estimate_task_complexity(input_data)
    else:
        language = 'unknown'
        task_size = 1
        task_complexity = 'medium'
    estimated_total_time = _get_estimated_time_for_task(language, task_complexity) * task_size

    # Sample the queue *before* registering this task, so the wait estimate
    # only covers work ahead of it. Its own processing time is reported
    # separately as ``estimated_processing`` instead of being counted twice.
    est_wait = get_queue_status()['estimated_wait']

    task_id = str(uuid.uuid4())
    request_time = time.time()
    with lock:
        queue_position = task_queue.qsize() + 1
        task_status[task_id] = {
            'status': 'queued',
            'queued_time': request_time,
            'queue_position': queue_position,
            'estimated_factors': {
                'language': language,
                'size': task_size,
                'complexity': task_complexity
            },
            'estimated_time': estimated_total_time
        }
    task_queue.put((task_id, input_data, request_time))

    return {
        'task_id': task_id,
        'status': 'queued',
        'queue_position': queue_position,
        'estimated_wait': est_wait,
        'estimated_processing': estimated_total_time
    }
def check_status(task_id):
    """Return a snapshot copy of a task's status record.

    Finished records ('completed' or 'error') whose ``end_time`` is more than
    an hour old (or missing) are removed from ``task_status`` after being read,
    so stale entries do not accumulate for tasks that are polled late.

    Args:
        task_id: identifier returned by ``enqueue_task``.

    Returns:
        dict: a shallow copy of the status record, or ``{'status': 'not_found'}``.
    """
    with lock:
        if task_id not in task_status:
            return {'status': 'not_found'}
        snapshot = task_status[task_id].copy()
        finished = snapshot['status'] in ['completed', 'error']
        if finished and time.time() - snapshot.get('end_time', 0) > 3600:
            task_status.pop(task_id, None)
    return snapshot
def get_queue_status():
    """Summarise the queue and estimate how long a newly queued task would wait.

    The estimate is the remaining time of in-flight tasks (spread over the
    workers they occupy) plus the estimated time of queued tasks (spread over
    all workers). When history entries carry a prediction, the estimate is
    scaled by the median actual/predicted ratio, clamped to [0.5, 2.0].

    Returns:
        dict: ``queue_size``, ``active_tasks``, ``waiting_tasks``,
        ``worker_threads``, ``estimated_wait`` (seconds; 0 when idle,
        otherwise at least 0.1) and ``recent_tasks`` (up to the 5 latest
        history entries, oldest first).
    """
    with lock:
        now = time.time()
        queued_tasks = [t for t in task_status.values() if t['status'] == 'queued']
        processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']
        queue_size = task_queue.qsize()
        active_tasks = len(processing_tasks)
        waiting_tasks = len(queued_tasks)

        # 1. Remaining time of tasks currently being processed.
        remaining_processing_time = 0
        for task in processing_tasks:
            if 'start_time' in task and 'estimated_time' in task:
                elapsed = now - task['start_time']
                remaining_processing_time += max(0, task['estimated_time'] - elapsed)
            else:
                # No estimate available: assume about 2 s remain.
                remaining_processing_time += 2
        if active_tasks > 0:
            # In-flight tasks run concurrently on up to worker_threads workers.
            remaining_processing_time /= max(1, min(active_tasks, worker_threads))

        # 2. Estimated processing time of queued tasks (5 s each by default),
        #    spread across the worker pool.
        queued_processing_time = sum(task.get('estimated_time', 5) for task in queued_tasks)
        if worker_threads > 0 and queued_processing_time > 0:
            queued_processing_time /= worker_threads

        estimated_wait = remaining_processing_time + queued_processing_time

        # 3. Statistical correction from recent predicted-vs-actual durations.
        #    Entries without a positive prediction are skipped so a zero or
        #    None ``estimated_time`` can never cause a division error.
        prediction_ratios = [
            task['process_time'] / task['estimated_time']
            for task in task_history
            if 'factors' in task and task.get('estimated_time')
        ]
        if prediction_ratios:
            correction_factor = max(0.5, min(2.0, np.median(prediction_ratios)))
            estimated_wait *= correction_factor

        estimated_wait = max(0.1, estimated_wait)
        if waiting_tasks == 0 and active_tasks == 0:
            estimated_wait = 0

        recent_tasks = task_history[-5:]

    return {
        'queue_size': queue_size,
        'active_tasks': active_tasks,
        'waiting_tasks': waiting_tasks,
        'worker_threads': worker_threads,
        'estimated_wait': estimated_wait,
        'recent_tasks': recent_tasks
    }
def format_time(seconds):
    """Format a duration in seconds as a short human-readable (Chinese) string.

    - under 60 s  -> e.g. "12.3秒"
    - under 1 h   -> e.g. "2分5.0秒"
    - otherwise   -> e.g. "1小时2分钟" (seconds are dropped)

    The value is rounded to 0.1 s *before* the unit is chosen. Otherwise
    59.96 would render as "60.0秒" and 119.96 as "1分60.0秒"; they now render
    as "1分0.0秒" and "2分0.0秒". Negative inputs (e.g. after a wall-clock
    adjustment) are shown as zero.

    Args:
        seconds: duration in seconds (int or float).

    Returns:
        str: the formatted duration.
    """
    seconds = round(max(0.0, float(seconds)), 1)
    if seconds < 60:
        return f"{seconds:.1f}秒"
    if seconds < 3600:
        minutes, secs = divmod(seconds, 60)
        return f"{int(minutes)}分{secs:.1f}秒"
    hours, rest = divmod(seconds, 3600)
    return f"{int(hours)}小时{int(rest // 60)}分钟"
def ui_get_queue_info():
    """Render the queue dashboard as an HTML fragment.

    Returns:
        str: HTML with the waiting / processing / worker-thread counters, the
        estimated wait, a last-update timestamp and a table of the most
        recently completed tasks (newest first), or a placeholder row when
        there is no history yet.
    """
    snapshot = get_queue_status()

    rows_html = "".join(
        f"""
<tr>
<td>{task['task_id'][:8]}...</td>
<td>{datetime.fromtimestamp(task['request_time']).strftime('%H:%M:%S')}</td>
<td>{format_time(task['process_time'])}</td>
</tr>
"""
        for task in reversed(snapshot['recent_tasks'])
    )
    # Placeholder row shown when no task has completed yet.
    rows_html = rows_html or """
<tr>
<td colspan="3" style="text-align: center; padding: 20px;">暂无历史任务</td>
</tr>
"""

    return f"""
<div class="dashboard">
<div class="queue-info-card main-card">
<h3 class="card-title">队列状态监控</h3>
<div class="queue-stats">
<div class="stat-item">
<div class="stat-value">{snapshot['waiting_tasks']}</div>
<div class="stat-label">等待中</div>
</div>
<div class="stat-item">
<div class="stat-value">{snapshot['active_tasks']}</div>
<div class="stat-label">处理中</div>
</div>
<div class="stat-item">
<div class="stat-value">{snapshot['worker_threads']}</div>
<div class="stat-label">工作线程</div>
</div>
</div>
<div class="wait-time">
<p><b>当前预计等待时间:</b> {format_time(snapshot['estimated_wait'])}</p>
<p class="last-update"><small>最后更新: {datetime.now().strftime('%H:%M:%S')}</small></p>
</div>
</div>
<div class="queue-info-card history-card">
<h3 class="card-title">最近处理的任务</h3>
<table class="recent-tasks">
<thead>
<tr>
<th>任务ID</th>
<th>请求时间</th>
<th>处理时间</th>
</tr>
</thead>
<tbody>
{rows_html}
</tbody>
</table>
</div>
</div>
"""
def launch_workers():
    """Start ``worker_threads`` daemon threads, each running ``queue_processor``.

    Sets the module-level ``running`` flag to True first so the new workers
    enter their polling loop. Thread handles are not kept; being daemon
    threads, they do not keep the interpreter alive on exit.
    """
    global running
    running = True
    for _ in range(worker_threads):
        threading.Thread(target=queue_processor, daemon=True).start()
# Custom CSS passed to gr.Blocks(css=...) below. It styles the dashboard cards,
# stat tiles and recent-task table emitted by ui_get_queue_info.
# NOTE(review): .container, .status-card*, .error-message, .notice and .tabs do
# not appear in the HTML generated in this file — possibly leftovers; confirm
# before removing.
custom_css = """
.container {
max-width: 1200px;
margin: 0 auto;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.dashboard {
display: flex;
flex-direction: column;
gap: 20px;
}
.card-title {
color: #333;
border-bottom: 2px solid #ddd;
padding-bottom: 10px;
margin-top: 0;
}
.status-card, .queue-info-card {
background: #fff;
border-radius: 12px;
padding: 20px;
margin: 10px 0;
box-shadow: 0 4px 15px rgba(0,0,0,0.08);
}
.main-card {
border-top: 5px solid #4285f4;
}
.history-card {
border-top: 5px solid #34a853;
}
.status-card.success {
background: #e7f5e7;
border-left: 5px solid #28a745;
}
.status-card.error {
background: #f8d7da;
border-left: 5px solid #dc3545;
}
.error-message {
color: #dc3545;
font-weight: bold;
padding: 10px;
background: #f8d7da;
border-radius: 5px;
}
.notice {
color: #0c5460;
background-color: #d1ecf1;
padding: 10px;
border-radius: 5px;
}
.queue-stats {
display: flex;
justify-content: space-around;
margin: 20px 0;
}
.stat-item {
text-align: center;
padding: 15px;
background: #f8f9fa;
border-radius: 10px;
min-width: 120px;
transition: transform 0.3s ease;
}
.stat-item:hover {
transform: translateY(-5px);
box-shadow: 0 5px 15px rgba(0,0,0,0.1);
}
.stat-value {
font-size: 32px;
font-weight: bold;
color: #4285f4;
margin-bottom: 5px;
}
.stat-label {
color: #5f6368;
font-size: 16px;
}
.wait-time {
text-align: center;
margin: 20px 0;
padding: 15px;
background: #f1f3f4;
border-radius: 8px;
font-size: 18px;
}
.last-update {
color: #80868b;
margin-top: 10px;
margin-bottom: 0;
}
.recent-tasks {
width: 100%;
border-collapse: collapse;
margin-top: 15px;
background: white;
box-shadow: 0 1px 3px rgba(0,0,0,0.05);
}
.recent-tasks th, .recent-tasks td {
border: 1px solid #e0e0e0;
padding: 12px 15px;
text-align: center;
}
.recent-tasks th {
background-color: #f1f3f4;
color: #202124;
font-weight: 500;
}
.recent-tasks tbody tr:hover {
background-color: #f8f9fa;
}
.tabs {
margin-top: 20px;
}
button.primary {
background-color: #4285f4;
color: white;
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
font-weight: 500;
transition: background-color 0.3s;
}
button.primary:hover {
background-color: #3367d6;
}
"""
# Start the background worker threads when the module is loaded.
launch_workers()

# Build the Gradio interface.
with gr.Blocks(css=custom_css) as demo:
    gr.Markdown("# 代码评估服务")
    gr.Markdown("支持多种编程语言的代码评估服务,采用队列机制处理请求")

    with gr.Row():
        with gr.Column(scale=3):
            # Queue status dashboard card.
            queue_info_html = gr.HTML()
            refresh_queue_btn = gr.Button("刷新队列状态", variant="primary")

    # Hidden components backing the programmatic API; not shown in the UI.
    with gr.Row(visible=False):
        api_input = gr.JSON()
        api_output = gr.JSON()
        # Hidden trigger for the API endpoint. Registering the endpoint on
        # demo.load (as before) also ran synchronous_evaluate(None) on every
        # browser page load, enqueueing a junk task per visit that polluted
        # the task history and timing statistics.
        api_trigger = gr.Button(visible=False)

    def update_queue_info():
        """Return the current dashboard HTML."""
        return ui_get_queue_info()

    # Render the dashboard on page load and refresh it every 3 seconds.
    demo.load(update_queue_info, None, queue_info_html, every=3)
    # Manual refresh button.
    refresh_queue_btn.click(update_queue_info, None, queue_info_html)
    # Backward-compatible "evaluate" API endpoint (same api_name, inputs and outputs).
    evaluate_endpoint = api_trigger.click(
        fn=synchronous_evaluate,
        inputs=api_input,
        outputs=api_output,
        api_name="evaluate",
    )

demo.queue()

if __name__ == "__main__":
    try:
        demo.launch()
    finally:
        # Tell the worker threads to leave their polling loops.
        running = False