File size: 6,571 Bytes
30b1610
4f32597
3499425
fc6c268
0087c59
 
f41205f
30b1610
0087c59
74d43a2
4f32597
74d43a2
4f32597
 
74d43a2
 
4f32597
0087c59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
08681f4
 
30b1610
141e12d
3499425
141e12d
 
3499425
141e12d
 
3499425
 
 
 
e74db4f
0087c59
e74db4f
3499425
 
 
 
 
0087c59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b01f5f4
3499425
 
 
0087c59
 
 
 
 
 
 
 
 
 
 
b01f5f4
 
 
 
 
3499425
 
 
141e12d
 
 
 
0087c59
141e12d
 
 
08681f4
3499425
30b1610
08681f4
 
 
0087c59
 
 
08681f4
3499425
 
52d43e7
3499425
 
52d43e7
3499425
 
 
 
 
4d4a4b6
0900021
08681f4
3499425
 
08681f4
 
22cec65
0087c59
08681f4
 
 
 
 
 
30b1610
08681f4
 
 
 
f41205f
 
e18e210
30b1610
08681f4
 
30b1610
0087c59
08681f4
 
30b1610
08681f4
 
 
2f2f63e
30b1610
 
0087c59
08681f4
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import os
import sys
import concurrent.futures
import multiprocessing

import gradio as gr
from src.containerized_eval import eval_string_script


# Make this file's directory and its "src" subdirectory importable by
# appending each to sys.path when it is not already listed.
# NOTE(review): the `from src.containerized_eval import ...` statement above
# runs before these entries are added — confirm the import order is intended.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, "src")
sys.path.extend(
    candidate
    for candidate in (current_dir, src_dir)
    if candidate not in sys.path
)


# Substrings that mark an error message as a system-level failure; used to
# decide whether a failed evaluation should be retried.
SYSTEM_ERROR_KEYWORDS = [
    "resource",
    "timeout",
    "busy",
    "congestion",
    "memory",
    "connection",
    "system",
    "overload",
    "refused",
    "reset",
]


def is_system_error(error_msg):
    """Report whether an error message mentions a system-error keyword.

    The message is converted with ``str()`` and lower-cased, then searched
    for each entry of ``SYSTEM_ERROR_KEYWORDS`` as a plain substring.

    Args:
        error_msg: The error message (or any object convertible with ``str``).

    Returns:
        bool: True if at least one keyword occurs in the message.
    """
    lowered = str(error_msg).lower()
    for keyword in SYSTEM_ERROR_KEYWORDS:
        if keyword in lowered:
            return True
    return False


def retry_with_logging(func, args, test_name="未知测试用例", error_context=""):
    """Call ``func(*args)`` once more, printing progress messages.

    Args:
        func: The callable to retry.
        args: Positional arguments, unpacked into ``func``.
        test_name (str): Name of the test case, used only in the printed messages.
        error_context (str): Description of the error that triggered the retry.

    Returns:
        tuple: ``(result, success)``. ``success`` is False when the call raised
        (``result`` is then an ``{"status": "Exception", ...}`` dict built from
        the exception) or when the returned dict has status ``"Exception"``;
        otherwise it is True.
    """
    try:
        print(f"{error_context},正在重试测试用例 '{test_name}'...")
        outcome = func(*args)
        reported_failure = (
            isinstance(outcome, dict) and outcome.get("status") == "Exception"
        )
        # Only announce success when the callee did not report an exception.
        if not reported_failure:
            print(f"测试用例 '{test_name}' 重试成功")
        return outcome, not reported_failure
    except Exception as exc:
        print(f"测试用例 '{test_name}' 重试失败: {str(exc)}")
        return {"status": "Exception", "error": str(exc)}, False


def _evaluate_item_with_retry(item):
    """Evaluate one batch item, retrying once if the failure looks systemic.

    Args:
        item: One element of the batch, normally a dict describing a test case.

    Returns:
        The result of ``evaluate_single_case(item)``, or the retry's result
        when the first attempt reported a system error and the retry succeeded.
    """
    try:
        outcome = evaluate_single_case(item)
    except Exception as e:
        outcome = {"status": "Exception", "error": str(e)}

    # evaluate_single_case reports failures as a result dict rather than by
    # raising, so the retry decision has to inspect the returned dict.
    needs_retry = (
        isinstance(outcome, dict)
        and outcome.get("status") == "Exception"
        and is_system_error(outcome.get("error", ""))
    )
    if needs_retry:
        if isinstance(item, dict):
            test_name = item.get('name', '未知测试用例')
        else:
            test_name = '未知测试用例'
        error_context = f"执行过程中检测到系统错误: {outcome.get('error', '')}"
        retry_result, success = retry_with_logging(
            evaluate_single_case,
            [item],
            test_name,
            error_context
        )
        # Keep the original failure when the retry did not succeed.
        if success:
            outcome = retry_result
    return outcome


def evaluate(input_data):
    """Evaluate a batch of test cases concurrently.

    Each item is evaluated on a thread pool sized to the CPU count. A result
    whose status is "Exception" and whose error text matches a system-error
    keyword is retried once. Each dict item is updated in place with its
    result and returned; a non-dict item is represented by its error result
    alone, so one malformed entry does not discard the rest of the batch.

    Args:
        input_data: A list of test-case dicts.

    Returns:
        list: One entry per input item, in input order. If ``input_data`` is
        not a list, or an unexpected error occurs, a single
        ``{"status": "Exception", "error": ...}`` dict is returned instead.
    """
    if not isinstance(input_data, list):
        return {"status": "Exception", "error": "Input must be a list"}

    try:
        max_workers = multiprocessing.cpu_count()
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # executor.map yields results in input order, unlike as_completed.
            outcomes = list(executor.map(_evaluate_item_with_retry, input_data))

        results = []
        for item, outcome in zip(input_data, outcomes):
            if not isinstance(item, dict):
                # Nothing to merge into; report the per-item error by itself.
                results.append(outcome)
                continue
            try:
                item.update(outcome)
            except (TypeError, ValueError) as e:
                # The result could not be merged; record that on this item only.
                item.update({"status": "Exception", "error": str(e)})
            results.append(item)
        return results

    except Exception as e:
        return {"status": "Exception", "error": str(e)}


def evaluate_single_case(input_data):
    """评估单个代码用例
    
    Args:
        input_data: 字典(包含代码信息)
        
    Returns:
        dict: 包含评估结果的字典
    """
    if not isinstance(input_data, dict):
        return {"status": "Exception", "error": "Input item must be a dictionary"}
        
    try:
        language = input_data.get('language')
        completions = input_data.get('processed_completions', [])

        if not completions:
            return {"status": "Exception", "error": "No code provided"}

        results = []
        for comp in completions:
            code = input_data.get('prompt') + comp + '\n' + input_data.get('tests')
            result = evaluate_code(code, language)
            if result["status"] == "OK":
                return result
            results.append(result)
            
        return results[0]
                
    except Exception as e:
        return {"status": "Exception", "error": str(e)}


def evaluate_code(code, language):
    """Run a piece of code through ``eval_string_script``.

    Args:
        code (str): The code to evaluate.
        language (str): The programming language of the code.

    Returns:
        dict: Whatever ``eval_string_script(language, code)`` returns, or an
        ``{"status": "Exception", "error": ...}`` dict if that call raises.
    """
    try:
        # Delegate to eval_string_script imported from containerized_eval.
        return eval_string_script(language, code)
    except Exception as exc:
        return {"status": "Exception", "error": str(exc)}


# Build the Gradio interface: a JSON input and a JSON output wired to
# `evaluate`.
demo = gr.Interface(
    title="代码评估服务",
    description="支持多种编程语言的代码评估服务",
    fn=evaluate,
    inputs=gr.JSON(),
    outputs=gr.JSON(),
)


# Launch the interface only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()