朱东升 committed on
Commit
a27816a
·
1 Parent(s): cd10de7

requirements update29

Browse files
Files changed (1) hide show
  1. app.py +71 -86
app.py CHANGED
@@ -1,12 +1,13 @@
 
 
 
1
  import os
2
  import sys
 
3
  import concurrent.futures
4
  import multiprocessing
5
-
6
- import gradio as gr
7
  from src.containerized_eval import eval_string_script
8
 
9
-
10
  # 添加当前目录和src目录到模块搜索路径
11
  current_dir = os.path.dirname(os.path.abspath(__file__))
12
  src_dir = os.path.join(current_dir, "src")
@@ -15,53 +16,6 @@ if current_dir not in sys.path:
15
  if src_dir not in sys.path:
16
  sys.path.append(src_dir)
17
 
18
-
19
- # 定义系统错误关键词,用于判断是否需要重试
20
- SYSTEM_ERROR_KEYWORDS = [
21
- "resource", "timeout", "busy", "congestion", "memory",
22
- "connection", "system", "overload", "refused", "reset"
23
- ]
24
-
25
-
26
- def is_system_error(error_msg):
27
- """检查错误信息是否包含系统错误关键词
28
-
29
- Args:
30
- error_msg (str): 错误信息
31
-
32
- Returns:
33
- bool: 是否是系统错误
34
- """
35
- error_msg = str(error_msg).lower()
36
- return any(keyword in error_msg for keyword in SYSTEM_ERROR_KEYWORDS)
37
-
38
-
39
- def retry_with_logging(func, args, test_name="未知测试用例", error_context=""):
40
- """带日志记录的重试函数
41
-
42
- Args:
43
- func: 要重试的函数
44
- args: 函数参数
45
- test_name (str): 测试用例名称
46
- error_context (str): 错误上下文描述
47
-
48
- Returns:
49
- tuple: (结果, 是否成功)
50
- """
51
- try:
52
- print(f"{error_context},正在重试测试用例 '{test_name}'...")
53
- result = func(*args)
54
- success = True
55
- if isinstance(result, dict) and result.get("status") == "Exception":
56
- success = False
57
- else:
58
- print(f"测试用例 '{test_name}' 重试成功")
59
- return result, success
60
- except Exception as e:
61
- print(f"测试用例 '{test_name}' 重试失败: {str(e)}")
62
- return {"status": "Exception", "error": str(e)}, False
63
-
64
-
65
  def evaluate(input_data):
66
  """评估代码的主函数
67
 
@@ -76,48 +30,83 @@ def evaluate(input_data):
76
  return {"status": "Exception", "error": "Input must be a list"}
77
 
78
  results = []
79
- max_workers = multiprocessing.cpu_count()
 
 
 
 
80
 
 
81
  with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
82
  future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data}
83
  for future in concurrent.futures.as_completed(future_to_item):
84
  item = future_to_item[future]
85
  try:
86
  result = future.result()
87
-
88
- # 处理结果中的系统错误
89
- for i, res in enumerate(result):
90
- if isinstance(res, dict) and res.get("status") == "Exception" and is_system_error(res.get("error", "")):
91
- test_name = item.get('name', '未知测试用例')
92
- error_context = f"检测到列表中的系统错误: {res.get('error', '')}"
93
- # 仅重试这个失败的情况
94
- code = item.get('prompt') + item.get('processed_completions', [])[i] + '\n' + item.get('tests')
95
- retry_result, success = retry_with_logging(
96
- evaluate_code,
97
- [code, item.get('language')],
98
- test_name,
99
- error_context
100
- )
101
- if success:
102
- result[i] = retry_result
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
 
104
  item.update(result)
105
  results.append(item)
106
  except Exception as e:
107
- # 处理执行过程中的系统错误
108
- if is_system_error(e):
109
- test_name = item.get('name', '未知测试用例')
110
- error_context = f"执行过程中检测到系统错误: {str(e)}"
111
- retry_result, success = retry_with_logging(
112
- evaluate_single_case,
113
- [item],
114
- test_name,
115
- error_context
116
- )
117
- if success:
118
  item.update(retry_result)
119
  results.append(item)
 
120
  continue
 
 
121
 
122
  # 如果重试失败或不是系统错误,记录原始错误
123
  item.update({"status": "Exception", "error": str(e)})
@@ -127,7 +116,6 @@ def evaluate(input_data):
127
  except Exception as e:
128
  return {"status": "Exception", "error": str(e)}
129
 
130
-
131
  def evaluate_single_case(input_data):
132
  """评估单个代码用例
133
 
@@ -137,10 +125,10 @@ def evaluate_single_case(input_data):
137
  Returns:
138
  dict: 包含评估结果的字典
139
  """
140
- if not isinstance(input_data, dict):
141
- return {"status": "Exception", "error": "Input item must be a dictionary"}
142
-
143
  try:
 
 
 
144
  language = input_data.get('language')
145
  completions = input_data.get('processed_completions', [])
146
 
@@ -160,7 +148,6 @@ def evaluate_single_case(input_data):
160
  except Exception as e:
161
  return {"status": "Exception", "error": str(e)}
162
 
163
-
164
  def evaluate_code(code, language):
165
  """评估特定语言的代码
166
 
@@ -179,7 +166,6 @@ def evaluate_code(code, language):
179
  except Exception as e:
180
  return {"status": "Exception", "error": str(e)}
181
 
182
-
183
  # 创建Gradio接口
184
  demo = gr.Interface(
185
  fn=evaluate,
@@ -189,6 +175,5 @@ demo = gr.Interface(
189
  description="支持多种编程语言的代码评估服务"
190
  )
191
 
192
-
193
  if __name__ == "__main__":
194
- demo.launch()
 
1
+ import gradio as gr
2
+ import json
3
+ import importlib
4
  import os
5
  import sys
6
+ from pathlib import Path
7
  import concurrent.futures
8
  import multiprocessing
 
 
9
  from src.containerized_eval import eval_string_script
10
 
 
11
  # 添加当前目录和src目录到模块搜索路径
12
  current_dir = os.path.dirname(os.path.abspath(__file__))
13
  src_dir = os.path.join(current_dir, "src")
 
16
  if src_dir not in sys.path:
17
  sys.path.append(src_dir)
18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  def evaluate(input_data):
20
  """评估代码的主函数
21
 
 
30
  return {"status": "Exception", "error": "Input must be a list"}
31
 
32
  results = []
33
+ # 定义系统错误关键词,用于判断是否需要重试
34
+ system_error_keywords = [
35
+ "resource", "timeout", "busy", "congestion", "memory",
36
+ "connection", "system", "overload", "refused", "reset"
37
+ ]
38
 
39
+ max_workers = multiprocessing.cpu_count()
40
  with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
41
  future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data}
42
  for future in concurrent.futures.as_completed(future_to_item):
43
  item = future_to_item[future]
44
  try:
45
  result = future.result()
46
+
47
+ # 检查是否是系统错误,如果是,立即重试一次
48
+ if isinstance(result, dict) and result.get("status") == "Exception":
49
+ error_msg = str(result.get("error", "")).lower()
50
+
51
+ # 如果错误信息包含系统错误关键词,则重试
52
+ if any(keyword in error_msg for keyword in system_error_keywords):
53
+ print(f"检测到系统错误: {error_msg},正在重试...")
54
+ # 立即重试
55
+ retry_result = evaluate_single_case(item)
56
+ if isinstance(retry_result, dict) and retry_result.get("status") != "Exception":
57
+ # 重试成功,使用重试结果
58
+ result = retry_result
59
+ print(f"重试成功")
60
+ else:
61
+ print(f"重试失败")
62
+
63
+ # 检查结果列表
64
+ if isinstance(result, list):
65
+ for i, res in enumerate(result):
66
+ if isinstance(res, dict) and res.get("status") == "Exception":
67
+ error_msg = str(res.get("error", "")).lower()
68
+
69
+ # 如果错误信息包含系统错误关键词,则重试
70
+ if any(keyword in error_msg for keyword in system_error_keywords):
71
+ print(f"检测到列表中的系统错误: {error_msg},正在重试...")
72
+ # 仅重试这个失败的情况
73
+ code = item.get('prompt') + item.get('processed_completions', [])[i] + '\n' + item.get('tests')
74
+ retry_result = evaluate_code(code, item.get('language'))
75
+ if isinstance(retry_result, dict) and retry_result.get("status") != "Exception":
76
+ # 重试成功,更新结果
77
+ result[i] = retry_result
78
+ print(f"重试成功")
79
+ else:
80
+ print(f"重试失败")
81
+
82
+ # 如果是超时错误,也尝试重试一次
83
+ if isinstance(result, dict) and result.get("status") == "Timeout":
84
+ print(f"检测到超时错误,正在重试...")
85
+ # 立即重试
86
+ retry_result = evaluate_single_case(item)
87
+ if isinstance(retry_result, dict) and retry_result.get("status") != "Timeout":
88
+ # 重试成功,使用重试结果
89
+ result = retry_result
90
+ print(f"重试成功")
91
+ else:
92
+ print(f"重试失败")
93
 
94
  item.update(result)
95
  results.append(item)
96
  except Exception as e:
97
+ error_msg = str(e).lower()
98
+ # 检查是否是系统错误
99
+ if any(keyword in error_msg for keyword in system_error_keywords):
100
+ print(f"执行过程中检测到系统错误: {error_msg},正在重试...")
101
+ try:
102
+ # 立即重试
103
+ retry_result = evaluate_single_case(item)
 
 
 
 
104
  item.update(retry_result)
105
  results.append(item)
106
+ print(f"重试成功")
107
  continue
108
+ except Exception as retry_e:
109
+ print(f"重试失败: {str(retry_e)}")
110
 
111
  # 如果重试失败或不是系统错误,记录原始错误
112
  item.update({"status": "Exception", "error": str(e)})
 
116
  except Exception as e:
117
  return {"status": "Exception", "error": str(e)}
118
 
 
119
  def evaluate_single_case(input_data):
120
  """评估单个代码用例
121
 
 
125
  Returns:
126
  dict: 包含评估结果的字典
127
  """
 
 
 
128
  try:
129
+ if not isinstance(input_data, dict):
130
+ return {"status": "Exception", "error": "Input item must be a dictionary"}
131
+
132
  language = input_data.get('language')
133
  completions = input_data.get('processed_completions', [])
134
 
 
148
  except Exception as e:
149
  return {"status": "Exception", "error": str(e)}
150
 
 
151
  def evaluate_code(code, language):
152
  """评估特定语言的代码
153
 
 
166
  except Exception as e:
167
  return {"status": "Exception", "error": str(e)}
168
 
 
169
  # 创建Gradio接口
170
  demo = gr.Interface(
171
  fn=evaluate,
 
175
  description="支持多种编程语言的代码评估服务"
176
  )
177
 
 
178
  if __name__ == "__main__":
179
+ demo.launch()