Spaces:
Sleeping
Sleeping
import gradio as gr | |
import json | |
import importlib | |
import os | |
import sys | |
import time | |
from pathlib import Path | |
import concurrent.futures | |
import multiprocessing | |
import threading | |
import queue | |
from datetime import datetime, timedelta | |
from src.containerized_eval import eval_string_script | |
# 添加当前目录和src目录到模块搜索路径 | |
current_dir = os.path.dirname(os.path.abspath(__file__)) | |
src_dir = os.path.join(current_dir, "src") | |
if current_dir not in sys.path: | |
sys.path.append(current_dir) | |
if src_dir not in sys.path: | |
sys.path.append(src_dir) | |
# 添加全局任务队列和任务状态跟踪 | |
task_queue = queue.Queue() | |
task_results = {} | |
active_tasks = {} | |
completed_tasks = [] | |
task_id_counter = 0 | |
task_lock = threading.Lock() | |
last_update_time = datetime.now() # 替换update_event为时间戳跟踪 | |
# 持久化的状态用于API访问和轮询更新 | |
status_poll_counter = 0 # 用于强制更新UI,即使状态没有变化 | |
last_background_update = datetime.now() # 最后一次后台更新时间 | |
def trigger_ui_update(): | |
"""触发UI更新事件""" | |
global last_update_time, status_poll_counter, last_background_update | |
with task_lock: | |
last_update_time = datetime.now() # 更新时间戳 | |
status_poll_counter += 1 # 递增计数器,确保每次更新都被捕获 | |
last_background_update = last_update_time # 更新后台状态 | |
print(f"UI更新被触发: {datetime.now().strftime('%H:%M:%S')} [计数: {status_poll_counter}]") | |
def get_next_task_id(): | |
global task_id_counter | |
with task_lock: | |
task_id_counter += 1 | |
return f"task_{task_id_counter}" | |
def submit_task(input_data): | |
"""提交任务到队列 | |
Args: | |
input_data: 列表(批量处理多个测试用例) | |
Returns: | |
str: 任务ID | |
""" | |
try: | |
if not isinstance(input_data, list): | |
return {"status": "error", "message": "Input must be a list"} | |
task_id = get_next_task_id() | |
with task_lock: | |
estimated_time = estimate_completion_time(input_data) | |
task_info = { | |
"id": task_id, | |
"data": input_data, | |
"status": "queued", | |
"submitted_at": datetime.now(), | |
"estimated_completion_time": estimated_time, | |
"items_count": len(input_data) | |
} | |
active_tasks[task_id] = task_info | |
task_queue.put(task_info) | |
# 触发UI更新 | |
trigger_ui_update() | |
# 如果这是第一个任务,启动处理线程 | |
if len(active_tasks) == 1: | |
threading.Thread(target=process_task_queue, daemon=True).start() | |
return {"status": "success", "task_id": task_id} | |
except Exception as e: | |
return {"status": "error", "message": str(e)} | |
def estimate_completion_time(input_data): | |
"""估计完成任务所需的时间 | |
Args: | |
input_data: 任务数据 | |
Returns: | |
timedelta: 估计的完成时间 | |
""" | |
# 在Hugging Face Spaces环境中,资源通常受限,调整处理时间预估 | |
# 假设每个任务项平均需要5秒处理(HF环境中可能更慢) | |
avg_time_per_item = 5 | |
total_items = len(input_data) | |
# Hugging Face Spaces通常有限制的CPU资源 | |
# 保守估计并行处理能力 | |
try: | |
cpu_count = multiprocessing.cpu_count() | |
except: | |
# 如果获取失败,假设只有2个CPU | |
cpu_count = 2 | |
# 在HF环境中,即使有多核也可能性能受限,降低并行因子 | |
parallel_factor = min(2, total_items) # 限制最多2个并行任务 | |
if parallel_factor > 0: | |
estimated_seconds = (total_items * avg_time_per_item) / parallel_factor | |
# 为了避免过于乐观的估计,增加30%的缓冲时间 | |
estimated_seconds = estimated_seconds * 1.3 | |
return timedelta(seconds=round(estimated_seconds)) | |
else: | |
return timedelta(seconds=0) | |
def process_task_queue(): | |
"""处理任务队列的后台线程""" | |
while True: | |
try: | |
if task_queue.empty(): | |
time.sleep(0.5) | |
continue | |
task_info = task_queue.get() | |
task_id = task_info["id"] | |
# 更新任务状态 | |
with task_lock: | |
if task_id in active_tasks: # 确保任务仍存在 | |
active_tasks[task_id]["status"] = "processing" | |
print(f"任务 {task_id} 开始处理,当前时间: {datetime.now().strftime('%H:%M:%S')}") | |
trigger_ui_update() # 状态变更为处理中时更新UI | |
else: | |
print(f"警告: 任务 {task_id} 不在活跃任务列表中") | |
task_queue.task_done() | |
continue | |
# 处理任务 | |
print(f"开始评估任务 {task_id},数据项数: {len(task_info['data'])}") | |
result = evaluate(task_info["data"]) | |
print(f"任务 {task_id} 评估完成,结果数: {len(result) if isinstance(result, list) else 'Not a list'}") | |
# 更新任务结果 | |
with task_lock: | |
if task_id in active_tasks: # 再次确保任务仍存在 | |
completed_time = datetime.now() | |
active_tasks[task_id]["status"] = "completed" | |
active_tasks[task_id]["completed_at"] = completed_time | |
active_tasks[task_id]["result"] = result | |
# 计算处理持续时间 | |
start_time = active_tasks[task_id]["submitted_at"] | |
duration = completed_time - start_time | |
print(f"任务 {task_id} 已完成,耗时: {duration},当前时间: {completed_time.strftime('%H:%M:%S')}") | |
# 将任务移至已完成列表 | |
completed_tasks.append(active_tasks[task_id]) | |
del active_tasks[task_id] | |
# 保留最近的20个已完成任务 | |
if len(completed_tasks) > 20: | |
completed_tasks.pop(0) | |
# 状态更新后强制触发UI更新 | |
trigger_ui_update() | |
print(f"任务 {task_id} 完成后触发UI更新: {last_update_time}") | |
else: | |
print(f"警告: 任务 {task_id} 在处理完成后不在活跃任务列表中") | |
task_queue.task_done() | |
except Exception as e: | |
print(f"处理任务队列出错: {str(e)}") | |
import traceback | |
traceback.print_exc() | |
time.sleep(1) | |
def evaluate(input_data): | |
"""评估代码的主函数 | |
Args: | |
input_data: 列表(批量处理多个测试用例) | |
Returns: | |
list: 包含评估结果的列表 | |
""" | |
# 打印Gradio版本,用于调试 | |
import gradio | |
print(f"Gradio version: {gradio.__version__}") | |
try: | |
if not isinstance(input_data, list): | |
return {"status": "Exception", "error": "Input must be a list"} | |
results = [] | |
# 在HF Spaces环境中可能受限,降低并行数量 | |
try: | |
max_workers = min(multiprocessing.cpu_count(), 2) # 最多2个并行任务 | |
except: | |
max_workers = 2 # 如果无法获取,默认为2 | |
# 增加超时处理 | |
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: | |
future_to_item = {} | |
# 分批处理,每批最多5个任务,避免资源耗尽 | |
batch_size = 5 | |
for i in range(0, len(input_data), batch_size): | |
batch = input_data[i:i+batch_size] | |
# 为每个任务提交并记录 | |
for item in batch: | |
future = executor.submit(evaluate_single_case, item) | |
future_to_item[future] = item | |
# 等待当前批次完成 | |
for future in concurrent.futures.as_completed(future_to_item): | |
item = future_to_item[future] | |
try: | |
# 设置较短的超时时间,避免任务卡死 | |
result = future.result(timeout=60) # 60秒超时 | |
item.update(result) | |
results.append(item) | |
except concurrent.futures.TimeoutError: | |
# 处理超时情况 | |
item.update({ | |
"status": "Timeout", | |
"error": "Task processing timed out in Hugging Face environment" | |
}) | |
results.append(item) | |
except Exception as e: | |
# 处理其他异常 | |
item.update({ | |
"status": "Exception", | |
"error": f"Error in Hugging Face environment: {str(e)}" | |
}) | |
results.append(item) | |
# 清空当前批次 | |
future_to_item = {} | |
# 短暂休息,让系统喘息 | |
time.sleep(0.5) | |
return results | |
except Exception as e: | |
return {"status": "Exception", "error": f"Evaluation error in Hugging Face environment: {str(e)}"} | |
def evaluate_single_case(input_data): | |
"""评估单个代码用例 | |
Args: | |
input_data: 字典(包含代码信息) | |
Returns: | |
dict: 包含评估结果的字典 | |
""" | |
try: | |
if not isinstance(input_data, dict): | |
return {"status": "Exception", "error": "Input item must be a dictionary"} | |
language = input_data.get('language') | |
completions = input_data.get('processed_completions', []) | |
if not completions: | |
return {"status": "Exception", "error": "No code provided"} | |
results = [] | |
for comp in completions: | |
code = input_data.get('prompt') + comp + '\n' + input_data.get('tests') | |
result = evaluate_code(code, language) | |
if result["status"] == "OK": | |
return result | |
results.append(result) | |
return results[0] | |
except Exception as e: | |
return {"status": "Exception", "error": str(e)} | |
def evaluate_code(code, language): | |
"""评估特定语言的代码 | |
Args: | |
code (str): 要评估的代码 | |
language (str): 编程语言 | |
Returns: | |
dict: 包含评估结果的字典 | |
""" | |
try: | |
# 使用containerized_eval中的eval_string_script函数 | |
result = eval_string_script(language, code) | |
return result | |
except Exception as e: | |
return {"status": "Exception", "error": str(e)} | |
def get_queue_status(): | |
"""获取当前队列状态 | |
Returns: | |
dict: 包含队列状态的字典 | |
""" | |
with task_lock: | |
queued_tasks = [task for task in active_tasks.values() if task["status"] == "queued"] | |
processing_tasks = [task for task in active_tasks.values() if task["status"] == "processing"] | |
# 计算总的预计完成时间 | |
total_estimated_time = timedelta(seconds=0) | |
for task in active_tasks.values(): | |
if isinstance(task["estimated_completion_time"], timedelta): | |
total_estimated_time += task["estimated_completion_time"] | |
# 计算队列中的项目总数 | |
total_items = sum(task["items_count"] for task in active_tasks.values()) | |
return { | |
"queued_tasks": len(queued_tasks), | |
"processing_tasks": len(processing_tasks), | |
"total_tasks": len(active_tasks), | |
"total_items": total_items, | |
"estimated_completion_time": str(total_estimated_time), | |
"active_tasks": [ | |
{ | |
"id": task["id"], | |
"status": task["status"], | |
"items_count": task["items_count"], | |
"submitted_at": task["submitted_at"].strftime("%Y-%m-%d %H:%M:%S"), | |
"estimated_completion": str(task["estimated_completion_time"]) | |
} for task in active_tasks.values() | |
], | |
"recent_completed": [ | |
{ | |
"id": task["id"], | |
"items_count": task["items_count"], | |
"submitted_at": task["submitted_at"].strftime("%Y-%m-%d %H:%M:%S"), | |
"completed_at": task["completed_at"].strftime("%Y-%m-%d %H:%M:%S") if "completed_at" in task else "", | |
"duration": str(task["completed_at"] - task["submitted_at"]) if "completed_at" in task else "" | |
} for task in completed_tasks[-5:] # 只显示最近5个完成的任务 | |
], | |
"poll_counter": status_poll_counter, # 添加轮询计数器 | |
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') | |
} | |
def render_queue_status(): | |
"""渲染队列状态UI | |
Returns: | |
str: HTML格式的队列状态显示 | |
""" | |
status = get_queue_status() | |
html = f""" | |
<div style="font-family: Arial, sans-serif; max-width: 900px; margin: 0 auto; padding: 20px; background: #f9f9f9; border-radius: 10px; box-shadow: 0 4px 12px rgba(0,0,0,0.1);"> | |
<div style="display: flex; justify-content: space-between; margin-bottom: 20px;"> | |
<div style="background: #fff; padding: 15px; border-radius: 8px; width: 30%; box-shadow: 0 2px 6px rgba(0,0,0,0.05);"> | |
<h3 style="margin: 0 0 10px; color: #444; font-size: 16px;">排队任务</h3> | |
<p style="font-size: 28px; font-weight: bold; margin: 0; color: #3498db;">{status['queued_tasks']}</p> | |
</div> | |
<div style="background: #fff; padding: 15px; border-radius: 8px; width: 30%; box-shadow: 0 2px 6px rgba(0,0,0,0.05);"> | |
<h3 style="margin: 0 0 10px; color: #444; font-size: 16px;">处理中任务</h3> | |
<p style="font-size: 28px; font-weight: bold; margin: 0; color: #e74c3c;">{status['processing_tasks']}</p> | |
</div> | |
<div style="background: #fff; padding: 15px; border-radius: 8px; width: 30%; box-shadow: 0 2px 6px rgba(0,0,0,0.05);"> | |
<h3 style="margin: 0 0 10px; color: #444; font-size: 16px;">总项目数</h3> | |
<p style="font-size: 28px; font-weight: bold; margin: 0; color: #2ecc71;">{status['total_items']}</p> | |
</div> | |
</div> | |
<div style="background: #fff; padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 6px rgba(0,0,0,0.05);"> | |
<h2 style="margin: 0 0 15px; color: #333; font-size: 18px;">预计完成时间</h2> | |
<div style="display: flex; align-items: center;"> | |
<div style="font-size: 24px; font-weight: bold; color: #9b59b6;">{status['estimated_completion_time']}</div> | |
<div style="margin-left: 10px; color: #777; font-size: 14px;">(小时:分钟:秒)</div> | |
</div> | |
</div> | |
<div style="background: #fff; padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 6px rgba(0,0,0,0.05);"> | |
<h2 style="margin: 0 0 15px; color: #333; font-size: 18px;">活跃任务</h2> | |
<table style="width: 100%; border-collapse: collapse;"> | |
<thead> | |
<tr style="background: #f2f2f2;"> | |
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">任务ID</th> | |
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">状态</th> | |
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">项目数</th> | |
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">提交时间</th> | |
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">预计完成</th> | |
</tr> | |
</thead> | |
<tbody> | |
""" | |
if status['active_tasks']: | |
for task in status['active_tasks']: | |
status_color = "#3498db" if task['status'] == "queued" else "#e74c3c" | |
html += f""" | |
<tr> | |
<td style="padding: 12px; border-bottom: 1px solid #eee;">{task['id']}</td> | |
<td style="padding: 12px; border-bottom: 1px solid #eee;"> | |
<span style="padding: 4px 8px; border-radius: 4px; font-size: 12px; background: {status_color}; color: white;"> | |
{task['status']} | |
</span> | |
</td> | |
<td style="padding: 12px; border-bottom: 1px solid #eee;">{task['items_count']}</td> | |
<td style="padding: 12px; border-bottom: 1px solid #eee;">{task['submitted_at']}</td> | |
<td style="padding: 12px; border-bottom: 1px solid #eee;">{task['estimated_completion']}</td> | |
</tr> | |
""" | |
else: | |
html += f""" | |
<tr> | |
<td colspan="5" style="padding: 15px; text-align: center; color: #777;">当前没有活跃任务</td> | |
</tr> | |
""" | |
html += """ | |
</tbody> | |
</table> | |
</div> | |
<div style="background: #fff; padding: 20px; border-radius: 8px; box-shadow: 0 2px 6px rgba(0,0,0,0.05);"> | |
<h2 style="margin: 0 0 15px; color: #333; font-size: 18px;">最近完成的任务</h2> | |
<table style="width: 100%; border-collapse: collapse;"> | |
<thead> | |
<tr style="background: #f2f2f2;"> | |
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">任务ID</th> | |
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">项目数</th> | |
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">提交时间</th> | |
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">完成时间</th> | |
<th style="padding: 12px; text-align: left; border-bottom: 1px solid #ddd;">持续时间</th> | |
</tr> | |
</thead> | |
<tbody> | |
""" | |
if status['recent_completed']: | |
for task in status['recent_completed']: | |
html += f""" | |
<tr> | |
<td style="padding: 12px; border-bottom: 1px solid #eee;">{task['id']}</td> | |
<td style="padding: 12px; border-bottom: 1px solid #eee;">{task['items_count']}</td> | |
<td style="padding: 12px; border-bottom: 1px solid #eee;">{task['submitted_at']}</td> | |
<td style="padding: 12px; border-bottom: 1px solid #eee;">{task['completed_at']}</td> | |
<td style="padding: 12px; border-bottom: 1px solid #eee;">{task['duration']}</td> | |
</tr> | |
""" | |
else: | |
html += f""" | |
<tr> | |
<td colspan="5" style="padding: 15px; text-align: center; color: #777;">暂无已完成任务</td> | |
</tr> | |
""" | |
html += """ | |
</tbody> | |
</table> | |
</div> | |
</div> | |
""" | |
return html | |
def refresh_ui(): | |
"""定期刷新UI函数,确保显示最新状态""" | |
global status_poll_counter | |
print(f"UI刷新被调用: {datetime.now().strftime('%H:%M:%S')} [状态计数: {status_poll_counter}]") | |
return render_queue_status(), status_poll_counter | |
def get_status_for_polling(): | |
"""为AJAX轮询提供的状态获取函数,简化返回数据量""" | |
global status_poll_counter | |
# 返回最新状态和计数器值,用于客户端判断是否有更新 | |
return { | |
"status_html": render_queue_status(), | |
"poll_counter": status_poll_counter, | |
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') | |
} | |
def submit_json_data(json_data): | |
"""提交JSON数据处理接口""" | |
try: | |
# 解析JSON字符串 | |
if isinstance(json_data, str): | |
data = json.loads(json_data) | |
else: | |
data = json_data | |
# 为调试记录提交来源 | |
print(f"接收到任务提交: {len(data) if isinstance(data, list) else 'Not a list'} 项") | |
print(f"当前队列状态 (提交前): 活跃任务 {len(active_tasks)}, 已完成任务 {len(completed_tasks)}") | |
result = submit_task(data) | |
response = json.dumps(result, ensure_ascii=False, indent=2) | |
# 强制触发UI更新 | |
trigger_ui_update() | |
print(f"任务提交后触发UI更新: {last_update_time}") | |
print(f"当前队列状态 (提交后): 活跃任务 {len(active_tasks)}, 已完成任务 {len(completed_tasks)}") | |
# 返回任务提交结果和最新的队列状态 | |
status_html = render_queue_status() | |
return response, status_html | |
except Exception as e: | |
print(f"提交任务出错: {str(e)}") | |
return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False, indent=2), render_queue_status() | |
# API端点函数,用于获取最新队列状态 | |
def api_get_queue_status(): | |
"""API端点,获取最新队列状态""" | |
status = get_queue_status() | |
status["html"] = render_queue_status() # 添加HTML渲染版本 | |
return status | |
# 后台UI更新线程 | |
def background_ui_updater(): | |
"""每隔一段时间检查任务状态,如有变化则触发更新""" | |
global last_background_update | |
print("后台UI更新线程已启动") | |
while True: | |
try: | |
# 检查队列状态 | |
current_time = datetime.now() | |
time_since_last_update = (current_time - last_background_update).total_seconds() | |
# 如果30秒内没有更新,且有活跃任务,触发更新 | |
with task_lock: | |
has_active_tasks = len(active_tasks) > 0 | |
if time_since_last_update > 30 and has_active_tasks: | |
print(f"后台更新: 有{len(active_tasks)}个活跃任务,{time_since_last_update:.1f}秒未更新") | |
trigger_ui_update() | |
# 如果2分钟内没有更新,无论如何都触发一次更新保持UI活跃 | |
if time_since_last_update > 120: | |
print(f"后台更新: 保活触发,{time_since_last_update:.1f}秒未更新") | |
trigger_ui_update() | |
except Exception as e: | |
print(f"后台更新出错: {str(e)}") | |
# 每5秒检查一次 | |
time.sleep(5) | |
# 创建Gradio接口 | |
with gr.Blocks(title="代码评估服务", theme=gr.themes.Soft()) as demo: | |
gr.Markdown(""" | |
# 代码评估服务 | |
### 支持多种编程语言的代码评估服务 | |
""") | |
# 添加隐藏的API处理组件 | |
with gr.Row(visible=False): | |
api_input = gr.JSON() | |
api_output = gr.JSON() | |
# 设置API触发器 | |
def api_trigger(data): | |
"""处理API请求的函数""" | |
print(f"通过API接收到请求: {len(data) if isinstance(data, list) else 'Not a list'}") | |
try: | |
result = submit_task(data) | |
# 强制触发UI更新 | |
trigger_ui_update() | |
return result | |
except Exception as e: | |
print(f"API处理出错: {str(e)}") | |
return {"status": "error", "message": str(e)} | |
api_input.change(fn=api_trigger, inputs=api_input, outputs=api_output) | |
with gr.Tab("任务队列状态"): | |
status_html = gr.HTML(render_queue_status) | |
poll_counter = gr.Number(value=0, visible=False) # 隐藏的计数器,用于触发更新 | |
refresh_button = gr.Button("刷新状态") | |
status_text = gr.Markdown("上次更新时间: 未更新") | |
# 根据Gradio版本使用不同的事件注册方法 | |
if 'gradio_version' not in locals(): | |
import gradio | |
gradio_version = getattr(gradio, "__version__", "unknown") | |
if gradio_version.startswith("3."): | |
# Gradio 3.x 方式 | |
refresh_button.click(fn=refresh_ui, outputs=[status_html, poll_counter], concurrency_limit=2) | |
else: | |
# Gradio 4.x 方式 (不使用concurrency_limit参数) | |
refresh_button.click( | |
fn=refresh_ui, | |
outputs=[status_html, poll_counter], | |
every=10 # 每10秒自动刷新一次 | |
) | |
# API断点,用于轮询最新状态 | |
status_polling_input = gr.Number(value=0, visible=False, label="轮询触发器") | |
status_polling_output = gr.JSON(visible=False, label="轮询结果") | |
status_polling_input.change( | |
fn=get_status_for_polling, | |
inputs=[], | |
outputs=status_polling_output | |
) | |
# 使用JavaScript实现自动轮询 | |
polling_js = """ | |
<script> | |
// 初始化轮询计数器 | |
let lastPollCounter = 0; | |
let pollingActive = true; | |
// 轮询函数 | |
async function pollStatus() { | |
if (!pollingActive) return; | |
try { | |
// 查找JSON输出元素 (在Gradio 4.x中,元素ID可能不同) | |
const jsonElements = document.querySelectorAll('[data-testid="json"]'); | |
const statusElements = document.querySelectorAll('[id*="status_html"]'); | |
// 如果找不到元素,使用普通AJAX请求 | |
if (!jsonElements.length) { | |
// 通过已有的刷新按钮点击来刷新 | |
const refreshButton = [...document.querySelectorAll('button')].find(btn => | |
btn.textContent.includes('刷新状态')); | |
if (refreshButton) { | |
refreshButton.click(); | |
console.log("刷新按钮被触发"); | |
} | |
} else { | |
// 使用Gradio API调用来获取最新状态 | |
// 构建API请求的路径 | |
const apiBase = window.location.pathname.endsWith('/') | |
? window.location.pathname | |
: window.location.pathname + '/'; | |
const response = await fetch(apiBase + 'api/queue/status'); | |
if (response.ok) { | |
const data = await response.json(); | |
const statusHtml = statusElements[0]; | |
// 如果有状态更新,更新UI | |
if (statusHtml && data.status) { | |
// 根据API返回更新页面状态 | |
console.log("服务器状态已更新"); | |
// 触发刷新按钮 | |
const refreshButton = [...document.querySelectorAll('button')].find(btn => | |
btn.textContent.includes('刷新状态')); | |
if (refreshButton) { | |
refreshButton.click(); | |
} | |
} | |
} | |
} | |
} catch (err) { | |
console.error("轮询错误:", err); | |
} | |
// 继续轮询 | |
setTimeout(pollStatus, 5000); // 每5秒轮询一次 | |
} | |
// 页面加载完成后开始轮询 | |
if (document.readyState === 'loading') { | |
document.addEventListener('DOMContentLoaded', () => setTimeout(pollStatus, 1000)); | |
} else { | |
setTimeout(pollStatus, 1000); | |
} | |
// 页面可见性变化时管理轮询 | |
document.addEventListener('visibilitychange', function() { | |
pollingActive = document.visibilityState === 'visible'; | |
if (pollingActive) { | |
// 页面变为可见时立即轮询一次 | |
pollStatus(); | |
} | |
}); | |
</script> | |
""" | |
gr.HTML(polling_js) | |
# 以下是原来的自动刷新脚本,保留但不使用 | |
auto_refresh_js = """ | |
<script> | |
// 兼容Gradio 3.x和4.x的自动刷新机制 - 仅作为备用 | |
console.log('自动刷新机制已加载,但已被新的轮询系统替代'); | |
</script> | |
""" | |
# 不显示旧的自动刷新脚本 | |
# gr.HTML(auto_refresh_js) | |
with gr.Tab("提交新任务"): | |
with gr.Row(): | |
with gr.Column(): | |
json_input = gr.Textbox( | |
label="输入JSON数据", | |
placeholder='[{"language": "python", "prompt": "def add(a, b):\\n", "processed_completions": [" return a + b"], "tests": "assert add(1, 2) == 3"}]', | |
lines=10 | |
) | |
submit_button = gr.Button("提交任务") | |
with gr.Column(): | |
result_output = gr.Textbox(label="提交结果", lines=5) | |
# 根据Gradio版本使用不同的事件注册方法 | |
if gradio_version.startswith("3."): | |
# Gradio 3.x 方式 | |
submit_button.click(fn=submit_json_data, inputs=json_input, outputs=[result_output, status_html], concurrency_limit=2) | |
else: | |
# Gradio 4.x 方式 | |
submit_button.click(fn=submit_json_data, inputs=json_input, outputs=[result_output, status_html]) | |
with gr.Tab("API文档"): | |
gr.Markdown(""" | |
## API 文档 | |
### 1. 提交任务 | |
**请求:** | |
``` | |
POST /api/predict | |
Content-Type: application/json | |
[ | |
{ | |
"language": "python", | |
"prompt": "def add(a, b):\\n", | |
"processed_completions": [" return a + b"], | |
"tests": "assert add(1, 2) == 3" | |
} | |
] | |
``` | |
**响应:** | |
```json | |
{ | |
"status": "success", | |
"task_id": "task_1" | |
} | |
``` | |
### 2. 查询任务状态 | |
**请求:** | |
``` | |
GET /api/status | |
``` | |
**响应:** | |
```json | |
{ | |
"queued_tasks": 1, | |
"processing_tasks": 2, | |
"total_tasks": 3, | |
"total_items": 15, | |
"estimated_completion_time": "0:05:30", | |
"active_tasks": [...], | |
"recent_completed": [...] | |
} | |
``` | |
""") | |
# 这里不再添加状态API端点,避免与FastAPI冲突 | |
# demo.queue(api_open=True).add_api_route("/api/queue/status", api_get_queue_status, methods=["GET"]) | |
if __name__ == "__main__": | |
# 检测Gradio版本以适配不同版本的API | |
import gradio | |
gradio_version = getattr(gradio, "__version__", "unknown") | |
print(f"当前Gradio版本: {gradio_version}") | |
# 启动后台UI更新线程 | |
bg_thread = threading.Thread(target=background_ui_updater, daemon=True) | |
bg_thread.start() | |
print("后台UI更新线程已启动") | |
try: | |
# 设置标准日志记录,确保足够的调试信息 | |
import logging | |
logging.basicConfig(level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger("CodeEvalService") | |
logger.info(f"启动代码评估服务,Gradio版本: {gradio_version}") | |
# 尝试使用兼容所有版本的参数启动 | |
launch_kwargs = { | |
"server_name": "0.0.0.0", | |
"server_port": int(os.environ.get("PORT", 7860)), | |
"share": False, | |
} | |
# Gradio 4.x专用的FastAPI初始化 | |
if not gradio_version.startswith("3."): | |
# 针对Gradio 4的API配置 | |
import fastapi | |
from fastapi import FastAPI | |
# 创建FastAPI应用 | |
app = FastAPI(title="代码评估服务API") | |
# 添加队列状态API端点 | |
def get_queue_status_api(): | |
return api_get_queue_status() | |
# 添加任务提交API端点 | |
async def submit_task_api(data: list): | |
try: | |
result = submit_task(data) | |
return result | |
except Exception as e: | |
return {"status": "error", "message": str(e)} | |
# 添加/evaluate API端点 | |
async def evaluate_api(data: list): | |
try: | |
result = evaluate(data) | |
return result | |
except Exception as e: | |
return {"status": "error", "message": str(e)} | |
# 启动应用到FastAPI | |
demo.launch( | |
**launch_kwargs, | |
app=app, | |
max_threads=5, | |
) | |
else: | |
# Gradio 3.x的方式 | |
# 设置队列,并添加API支持 | |
try: | |
demo.queue(api_open=True, max_size=30) | |
# 添加/evaluate API端点 | |
demo.add_api_route("/evaluate", evaluate, methods=["POST"]) | |
except Exception as e: | |
logger.warning(f"配置队列时出错: {e}") | |
# 启动应用 | |
demo.launch( | |
**launch_kwargs, | |
debug=False, | |
show_api=True, | |
max_threads=5, | |
concurrency_limit=2 | |
) | |
except Exception as e: | |
print(f"启动时发生错误: {e}") | |
import traceback | |
traceback.print_exc() | |
# 尝试最简配置启动 | |
try: | |
print("使用最小配置重试...") | |
demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860))) | |
except Exception as e2: | |
print(f"最小配置启动也失败: {e2}") | |
traceback.print_exc() | |
# 终极回退方案:创建最简单的接口并启动 | |
try: | |
print("尝试创建备用界面...") | |
import gradio as gr | |
def simple_evaluate(json_data): | |
try: | |
print(f"备用界面收到请求: {json_data[:100] if isinstance(json_data, str) else 'Not a string'}") | |
data = json.loads(json_data) if isinstance(json_data, str) else json_data | |
result = submit_task(data) | |
return json.dumps(result, ensure_ascii=False) | |
except Exception as e: | |
return {"error": str(e)} | |
backup_demo = gr.Interface( | |
fn=simple_evaluate, | |
inputs=gr.Textbox(label="JSON输入"), | |
outputs=gr.Textbox(label="结果"), | |
title="代码评估服务 (备用界面)", | |
description="原界面启动失败,这是简化版本。提交格式: [{\"language\": \"python\", \"prompt\": \"def add(a, b):\\n\", \"processed_completions\": [\" return a + b\"], \"tests\": \"assert add(1, 2) == 3\"}]" | |
) | |
backup_demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860))) | |
except Exception as e3: | |
print(f"备用界面也启动失败: {e3}") | |
traceback.print_exc() | |