Spaces:
Sleeping
Sleeping
File size: 7,608 Bytes
30b1610 08681f4 30b1610 4f32597 08681f4 3499425 fc6c268 f41205f 30b1610 74d43a2 4f32597 74d43a2 4f32597 74d43a2 4f32597 08681f4 30b1610 141e12d 3499425 141e12d 3499425 141e12d 3499425 b01f5f4 e74db4f 3499425 b01f5f4 3499425 b01f5f4 3499425 141e12d 08681f4 3499425 30b1610 08681f4 3499425 52d43e7 3499425 52d43e7 3499425 4d4a4b6 0900021 08681f4 3499425 08681f4 22cec65 08681f4 30b1610 08681f4 f41205f e18e210 30b1610 08681f4 30b1610 08681f4 30b1610 08681f4 2f2f63e 30b1610 08681f4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
import gradio as gr
import json
import importlib
import os
import sys
from pathlib import Path
import concurrent.futures
import multiprocessing
from src.containerized_eval import eval_string_script
# Make this file's directory and its "src" subdirectory importable by
# appending whichever of the two is not already on the module search path.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, "src")
sys.path.extend(
    path for path in (current_dir, src_dir) if path not in sys.path
)
# Lower-case substrings that mark an error message as a transient system
# failure (worth one retry) rather than a fault in the submitted code.
_SYSTEM_ERROR_KEYWORDS = (
    "resource", "timeout", "busy", "congestion", "memory",
    "connection", "system", "overload", "refused", "reset",
)


def _is_system_error(message):
    """Return True if *message* contains any system-error keyword."""
    lowered = str(message).lower()
    return any(keyword in lowered for keyword in _SYSTEM_ERROR_KEYWORDS)


def _evaluate_with_retry(item):
    """Evaluate one batch item, retrying once on system errors and timeouts.

    Args:
        item: One element of the list passed to ``evaluate``.

    Returns:
        dict: The result to merge into ``item``. Never raises ``Exception``;
        failures are reported as ``{"status": "Exception", "error": ...}``.
    """
    # Reject malformed items up front so one bad entry cannot abort the batch.
    if not isinstance(item, dict):
        return {"status": "Exception", "error": "Input item must be a dictionary"}

    try:
        result = evaluate_single_case(item)

        # An "Exception" result whose message looks like a system error gets
        # one immediate retry; the retry is kept only if it is not an
        # "Exception" itself.
        if isinstance(result, dict) and result.get("status") == "Exception":
            error_msg = str(result.get("error", "")).lower()
            if _is_system_error(error_msg):
                print(f"检测到系统错误: {error_msg},正在重试...")
                retry_result = evaluate_single_case(item)
                if isinstance(retry_result, dict) and retry_result.get("status") != "Exception":
                    result = retry_result
                    print("重试成功")
                else:
                    print("重试失败")

        # A timeout (including one produced by the retry above) also gets
        # one retry.
        if isinstance(result, dict) and result.get("status") == "Timeout":
            print("检测到超时错误,正在重试...")
            retry_result = evaluate_single_case(item)
            if isinstance(retry_result, dict) and retry_result.get("status") != "Timeout":
                result = retry_result
                print("重试成功")
            else:
                print("重试失败")

        # Only a dict can be merged into the item; report anything else
        # explicitly instead of failing later inside dict.update().
        if not isinstance(result, dict):
            return {
                "status": "Exception",
                "error": f"Unexpected result type: {type(result).__name__}",
            }
        return result
    except Exception as e:
        error_msg = str(e).lower()
        if _is_system_error(error_msg):
            print(f"执行过程中检测到系统错误: {error_msg},正在重试...")
            try:
                retry_result = evaluate_single_case(item)
                if not isinstance(retry_result, dict):
                    raise TypeError(
                        f"Unexpected result type: {type(retry_result).__name__}"
                    )
                print("重试成功")
                return retry_result
            except Exception as retry_e:
                print(f"重试失败: {str(retry_e)}")
        # The retry failed or the error was not a system error: report the
        # original exception.
        return {"status": "Exception", "error": str(e)}


def evaluate(input_data):
    """Evaluate a batch of test cases concurrently.

    Each item is evaluated in a thread pool sized to the CPU count. Results
    that look like transient system errors, and timeouts, are retried once.

    Args:
        input_data: A list of test-case dictionaries.

    Returns:
        list: One entry per input item, in input order. Dictionary items are
        updated in place with their result and returned; a non-dictionary
        item yields a standalone ``{"status": "Exception", ...}`` entry.
        If ``input_data`` is not a list, or an unexpected error occurs, a
        single ``{"status": "Exception", "error": ...}`` dict is returned
        instead.
    """
    try:
        if not isinstance(input_data, list):
            return {"status": "Exception", "error": "Input must be a list"}
        if not input_data:
            return []

        max_workers = multiprocessing.cpu_count()
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Executor.map yields results in submission order, which keeps
            # the output aligned with the input.
            outcomes = list(executor.map(_evaluate_with_retry, input_data))

        results = []
        for item, outcome in zip(input_data, outcomes):
            if isinstance(item, dict):
                item.update(outcome)
                results.append(item)
            else:
                # Nothing to merge into; report the error on its own.
                results.append(outcome)
        return results
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def evaluate_single_case(input_data):
    """Evaluate a single test case by trying its completions in order.

    For each entry in ``processed_completions`` the program is assembled as
    ``prompt + completion + "\\n" + tests`` and passed to ``evaluate_code``.

    Args:
        input_data: Dictionary read for the keys ``language``,
            ``processed_completions``, ``prompt`` and ``tests``.

    Returns:
        dict: The first result whose status is "OK"; otherwise the result
        of the first completion tried. Invalid input and any raised
        exception are reported as ``{"status": "Exception", "error": ...}``.
    """
    try:
        if not isinstance(input_data, dict):
            return {"status": "Exception", "error": "Input item must be a dictionary"}

        language = input_data.get('language')
        candidates = input_data.get('processed_completions', [])
        if not candidates:
            return {"status": "Exception", "error": "No code provided"}

        first_failure = None
        for candidate in candidates:
            program = input_data.get('prompt') + candidate + '\n' + input_data.get('tests')
            outcome = evaluate_code(program, language)
            if outcome["status"] == "OK":
                return outcome
            # Remember only the earliest failing result; it is what gets
            # returned when no completion passes.
            if first_failure is None:
                first_failure = outcome
        return first_failure
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
def evaluate_code(code, language):
    """Run *code* through ``eval_string_script`` for the given language.

    Args:
        code (str): Source code to evaluate.
        language (str): Language identifier forwarded to
            ``eval_string_script``.

    Returns:
        The value returned by ``eval_string_script(language, code)``, or
        ``{"status": "Exception", "error": ...}`` if that call raises.
    """
    try:
        # Delegate to the evaluator imported from src.containerized_eval.
        return eval_string_script(language, code)
    except Exception as e:
        return {"status": "Exception", "error": str(e)}
# Build the Gradio interface: a JSON payload goes to evaluate() and its
# return value is shown as JSON.
demo = gr.Interface(
    title="代码评估服务",
    description="支持多种编程语言的代码评估服务",
    fn=evaluate,
    inputs=gr.JSON(),
    outputs=gr.JSON(),
)

# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|