Spaces:
Sleeping
Sleeping
import os | |
import sys | |
import concurrent.futures | |
import multiprocessing | |
import gradio as gr | |
from src.containerized_eval import eval_string_script | |
# 添加当前目录和src目录到模块搜索路径 | |
current_dir = os.path.dirname(os.path.abspath(__file__)) | |
src_dir = os.path.join(current_dir, "src") | |
if current_dir not in sys.path: | |
sys.path.append(current_dir) | |
if src_dir not in sys.path: | |
sys.path.append(src_dir) | |
# 定义系统错误关键词,用于判断是否需要重试 | |
SYSTEM_ERROR_KEYWORDS = [ | |
"resource", "timeout", "busy", "congestion", "memory", | |
"connection", "system", "overload", "refused", "reset" | |
] | |
def is_system_error(error_msg): | |
"""检查错误信息是否包含系统错误关键词 | |
Args: | |
error_msg (str): 错误信息 | |
Returns: | |
bool: 是否是系统错误 | |
""" | |
error_msg = str(error_msg).lower() | |
return any(keyword in error_msg for keyword in SYSTEM_ERROR_KEYWORDS) | |
def retry_with_logging(func, args, test_name="未知测试用例", error_context=""): | |
"""带日志记录的重试函数 | |
Args: | |
func: 要重试的函数 | |
args: 函数参数 | |
test_name (str): 测试用例名称 | |
error_context (str): 错误上下文描述 | |
Returns: | |
tuple: (结果, 是否成功) | |
""" | |
try: | |
print(f"{error_context},正在重试测试用例 '{test_name}'...") | |
result = func(*args) | |
success = True | |
if isinstance(result, dict) and result.get("status") == "Exception": | |
success = False | |
else: | |
print(f"测试用例 '{test_name}' 重试成功") | |
return result, success | |
except Exception as e: | |
print(f"测试用例 '{test_name}' 重试失败: {str(e)}") | |
return {"status": "Exception", "error": str(e)}, False | |
def evaluate(input_data): | |
"""评估代码的主函数 | |
Args: | |
input_data: 列表(批量处理多个测试用例) | |
Returns: | |
list: 包含评估结果的列表 | |
""" | |
try: | |
if not isinstance(input_data, list): | |
return {"status": "Exception", "error": "Input must be a list"} | |
results = [] | |
max_workers = multiprocessing.cpu_count() | |
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: | |
future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data} | |
for future in concurrent.futures.as_completed(future_to_item): | |
item = future_to_item[future] | |
try: | |
result = future.result() | |
# 检查结果列表 | |
if not isinstance(result, list): | |
return {"status": "Exception", "error": "Input data must be a list"} | |
# 处理结果中的系统错误 | |
for i, res in enumerate(result): | |
if isinstance(res, dict) and res.get("status") == "Exception" and is_system_error(res.get("error", "")): | |
test_name = item.get('name', '未知测试用例') | |
error_context = f"检测到列表中的系统错误: {res.get('error', '')}" | |
# 仅重试这个失败的情况 | |
code = item.get('prompt') + item.get('processed_completions', [])[i] + '\n' + item.get('tests') | |
retry_result, success = retry_with_logging( | |
evaluate_code, | |
[code, item.get('language')], | |
test_name, | |
error_context | |
) | |
if success: | |
result[i] = retry_result | |
item.update(result) | |
results.append(item) | |
except Exception as e: | |
# 处理执行过程中的系统错误 | |
if is_system_error(e): | |
test_name = item.get('name', '未知测试用例') | |
error_context = f"执行过程中检测到系统错误: {str(e)}" | |
retry_result, success = retry_with_logging( | |
evaluate_single_case, | |
[item], | |
test_name, | |
error_context | |
) | |
if success: | |
item.update(retry_result) | |
results.append(item) | |
continue | |
# 如果重试失败或不是系统错误,记录原始错误 | |
item.update({"status": "Exception", "error": str(e)}) | |
results.append(item) | |
return results | |
except Exception as e: | |
return {"status": "Exception", "error": str(e)} | |
def evaluate_single_case(input_data): | |
"""评估单个代码用例 | |
Args: | |
input_data: 字典(包含代码信息) | |
Returns: | |
dict: 包含评估结果的字典 | |
""" | |
if not isinstance(input_data, dict): | |
return {"status": "Exception", "error": "Input item must be a dictionary"} | |
try: | |
language = input_data.get('language') | |
completions = input_data.get('processed_completions', []) | |
if not completions: | |
return {"status": "Exception", "error": "No code provided"} | |
results = [] | |
for comp in completions: | |
code = input_data.get('prompt') + comp + '\n' + input_data.get('tests') | |
result = evaluate_code(code, language) | |
if result["status"] == "OK": | |
return result | |
results.append(result) | |
return results[0] | |
except Exception as e: | |
return {"status": "Exception", "error": str(e)} | |
def evaluate_code(code, language): | |
"""评估特定语言的代码 | |
Args: | |
code (str): 要评估的代码 | |
language (str): 编程语言 | |
Returns: | |
dict: 包含评估结果的字典 | |
""" | |
try: | |
# 使用containerized_eval中的eval_string_script函数 | |
result = eval_string_script(language, code) | |
return result | |
except Exception as e: | |
return {"status": "Exception", "error": str(e)} | |
# 创建Gradio接口 | |
demo = gr.Interface( | |
fn=evaluate, | |
inputs=gr.JSON(), | |
outputs=gr.JSON(), | |
title="代码评估服务", | |
description="支持多种编程语言的代码评估服务" | |
) | |
if __name__ == "__main__": | |
demo.launch() | |