# Spaces: Sleeping
import gradio as gr | |
import json | |
import importlib | |
import os | |
import sys | |
from pathlib import Path | |
import concurrent.futures | |
import multiprocessing | |
from src.containerized_eval import eval_string_script | |
# Make this file's directory and its "src" subdirectory importable.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, "src")
for _search_dir in (current_dir, src_dir):
    # Append only when missing so the search path gains no duplicates.
    if _search_dir not in sys.path:
        sys.path.append(_search_dir)
del _search_dir
# Substrings that mark an error message as a transient system failure
# worth one retry.
_SYSTEM_ERROR_KEYWORDS = (
    "resource", "timeout", "busy", "congestion", "memory",
    "connection", "system", "overload", "refused", "reset",
)


def _is_system_error(error_msg):
    """Return True when the lower-cased message contains a system-error keyword."""
    return any(keyword in error_msg for keyword in _SYSTEM_ERROR_KEYWORDS)


def _retry_if_transient(item, result):
    """Re-run *item* once when *result* looks like a transient failure.

    A result with status "Exception" is retried only when its error text
    contains a system-error keyword; a result with status "Timeout" is always
    retried. The retry's result replaces the original only when it no longer
    carries the status that triggered the retry.
    """
    if isinstance(result, dict) and result.get("status") == "Exception":
        error_msg = str(result.get("error", "")).lower()
        if _is_system_error(error_msg):
            print(f"检测到系统错误: {error_msg},正在重试...")
            retry_result = evaluate_single_case(item)
            if isinstance(retry_result, dict) and retry_result.get("status") != "Exception":
                result = retry_result
                print("重试成功")
            else:
                print("重试失败")

    # Checked after the block above on purpose: an "Exception" retry may
    # itself come back as "Timeout" and then gets one more attempt.
    if isinstance(result, dict) and result.get("status") == "Timeout":
        print("检测到超时错误,正在重试...")
        retry_result = evaluate_single_case(item)
        if isinstance(retry_result, dict) and retry_result.get("status") != "Timeout":
            result = retry_result
            print("重试成功")
        else:
            print("重试失败")
    return result


def _attach_result(item, result):
    """Merge *result* into *item* in place and return the entry to report.

    A non-dict item cannot carry result fields, so the result is reported on
    its own instead of raising and aborting the whole batch.
    """
    if isinstance(item, dict):
        item.update(result)
        return item
    return dict(result)


def _collect_result(item, future):
    """Turn a finished future into the entry reported for *item*."""
    try:
        return _attach_result(item, _retry_if_transient(item, future.result()))
    except Exception as e:
        error_msg = str(e).lower()
        if _is_system_error(error_msg):
            print(f"执行过程中检测到系统错误: {error_msg},正在重试...")
            try:
                entry = _attach_result(item, evaluate_single_case(item))
                print("重试成功")
                return entry
            except Exception as retry_e:
                print(f"重试失败: {str(retry_e)}")
        # Retry failed or the error was not a system error: record the
        # original error.
        return _attach_result(item, {"status": "Exception", "error": str(e)})


def evaluate(input_data):
    """Evaluate a batch of test cases concurrently.

    Every item is submitted to ``evaluate_single_case`` on a thread pool sized
    to the CPU count. Transient failures (see ``_retry_if_transient``) are
    retried once in the calling thread, and the final result is merged into
    the item dict in place.

    Args:
        input_data: list of test-case dicts.

    Returns:
        list: one entry per input item, in input order. Each entry is the
        input dict updated with its result fields; a non-dict item is
        represented by its error-result dict alone. If ``input_data`` is not
        a list, or an unexpected error occurs, a single
        ``{"status": "Exception", "error": ...}`` dict is returned instead.
    """
    try:
        if not isinstance(input_data, list):
            return {"status": "Exception", "error": "Input must be a list"}

        # Pre-sized so results land in input order even though futures
        # complete in arbitrary order.
        results = [None] * len(input_data)
        max_workers = multiprocessing.cpu_count()
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_index = {
                executor.submit(evaluate_single_case, item): index
                for index, item in enumerate(input_data)
            }
            for future in concurrent.futures.as_completed(future_to_index):
                index = future_to_index[future]
                results[index] = _collect_result(input_data[index], future)
        return results
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def evaluate_single_case(input_data):
    """Evaluate one test case, trying its candidate completions in order.

    For each entry of ``processed_completions`` the program text is built as
    ``prompt + completion + '\\n' + tests`` and passed to ``evaluate_code``
    together with ``language``.

    Args:
        input_data: dict read through the keys 'language', 'prompt', 'tests'
            and 'processed_completions'.

    Returns:
        dict: the first result whose status is "OK"; otherwise the result of
        the first completion. An ``{"status": "Exception", "error": ...}``
        dict is returned when the input is not a dict, when there are no
        completions, or when any exception is raised along the way.
    """
    try:
        if not isinstance(input_data, dict):
            return {"status": "Exception", "error": "Input item must be a dictionary"}

        lang = input_data.get('language')
        candidates = input_data.get('processed_completions', [])
        if not candidates:
            return {"status": "Exception", "error": "No code provided"}

        prompt = input_data.get('prompt')
        tests = input_data.get('tests')
        failed_outcomes = []
        for candidate in candidates:
            program = prompt + candidate + '\n' + tests
            outcome = evaluate_code(program, lang)
            if outcome["status"] == "OK":
                # The first passing completion wins; later ones are not run.
                return outcome
            failed_outcomes.append(outcome)
        # No completion passed: report the first completion's result.
        return failed_outcomes[0]
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def evaluate_code(code, language):
    """Run *code* for the given language through ``eval_string_script``.

    Args:
        code (str): the program text to evaluate.
        language (str): the programming language of *code*.

    Returns:
        Whatever ``eval_string_script(language, code)`` returns (callers in
        this file index it with "status"). If that call raises, an
        ``{"status": "Exception", "error": ...}`` dict is returned instead.
    """
    try:
        # Note the argument order: eval_string_script takes the language first.
        return eval_string_script(language, code)
    except Exception as exc:
        return {"status": "Exception", "error": str(exc)}
# Build the Gradio interface: JSON in, JSON out, handled by evaluate().
demo = gr.Interface(
    evaluate,
    gr.JSON(),
    gr.JSON(),
    title="代码评估服务",
    description="支持多种编程语言的代码评估服务",
)

# Start the web server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()