朱东升 commited on
Commit
7032d9e
·
1 Parent(s): 49b4498

requirements update31

Browse files
Files changed (1) hide show
  1. app.py +234 -8
app.py CHANGED
@@ -6,6 +6,10 @@ import sys
6
  from pathlib import Path
7
  import concurrent.futures
8
  import multiprocessing
 
 
 
 
9
  from src.containerized_eval import eval_string_script
10
 
11
  # 添加当前目录和src目录到模块搜索路径
@@ -97,14 +101,236 @@ def evaluate_code(code, language):
97
  except Exception as e:
98
  return {"status": "Exception", "error": str(e)}
99
 
100
- # 创建Gradio接口
101
- demo = gr.Interface(
102
- fn=evaluate,
103
- inputs=gr.JSON(),
104
- outputs=gr.JSON(),
105
- title="代码评估服务",
106
- description="支持多种编程语言的代码评估服务"
107
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
108
 
109
  if __name__ == "__main__":
110
  demo.launch()
 
6
  from pathlib import Path
7
  import concurrent.futures
8
  import multiprocessing
9
+ import time
10
+ import threading
11
+ import queue
12
+ from datetime import datetime, timedelta
13
  from src.containerized_eval import eval_string_script
14
 
15
  # 添加当前目录和src目录到模块搜索路径
 
101
  except Exception as e:
102
  return {"status": "Exception", "error": str(e)}
103
 
104
+ # 创建任务队列和相关变量
105
+ task_queue = queue.Queue()
106
+ tasks_info = []
107
+ tasks_lock = threading.Lock()
108
+ total_tasks_count = 0
109
+ completed_tasks_count = 0
110
+ estimated_completion_time = None
111
+ average_task_time = 5 # 默认平均任务时间(秒)
112
+
113
+ def add_task(input_data):
114
+ """添加任务到队列
115
+
116
+ Args:
117
+ input_data: 任务数据
118
+
119
+ Returns:
120
+ dict: 包含任务ID和状态的字典
121
+ """
122
+ global total_tasks_count, tasks_info
123
+
124
+ try:
125
+ if not isinstance(input_data, list):
126
+ return {"status": "错误", "message": "输入必须是列表格式"}
127
+
128
+ task_id = int(time.time() * 1000) # 使用时间戳作为任务ID
129
+ task_count = len(input_data)
130
+
131
+ with tasks_lock:
132
+ for item in input_data:
133
+ task_queue.put((task_id, item))
134
+ total_tasks_count += 1
135
+
136
+ # 更新任务信息
137
+ task_info = {
138
+ "id": task_id,
139
+ "count": task_count,
140
+ "status": "等待中",
141
+ "submit_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
142
+ "completed": 0
143
+ }
144
+ tasks_info.append(task_info)
145
+
146
+ # 更新预计完成时间
147
+ update_estimated_completion_time()
148
+
149
+ return {"status": "成功", "task_id": task_id, "message": f"已添加{task_count}个任务到队列"}
150
+
151
+ except Exception as e:
152
+ return {"status": "错误", "message": str(e)}
153
+
154
+ def update_estimated_completion_time():
155
+ """更新预计完成时间"""
156
+ global estimated_completion_time, average_task_time
157
+
158
+ remaining_tasks = total_tasks_count - completed_tasks_count
159
+ if remaining_tasks > 0:
160
+ # 计算预计完成时间
161
+ estimated_seconds = remaining_tasks * average_task_time
162
+ estimated_completion_time = datetime.now() + timedelta(seconds=estimated_seconds)
163
+ else:
164
+ estimated_completion_time = None
165
+
166
+ def process_tasks():
167
+ """处理队列中的任务"""
168
+ global completed_tasks_count, average_task_time, tasks_info
169
+
170
+ while True:
171
+ try:
172
+ if not task_queue.empty():
173
+ start_time = time.time()
174
+
175
+ # 获取任务并处理
176
+ task_id, task_data = task_queue.get()
177
+ result = evaluate_single_case(task_data)
178
+
179
+ # 更新任务状态
180
+ with tasks_lock:
181
+ completed_tasks_count += 1
182
+
183
+ # 更新平均任务时间(使用移动平均)
184
+ task_time = time.time() - start_time
185
+ average_task_time = (average_task_time * 0.9) + (task_time * 0.1)
186
+
187
+ # 更新任务信息
188
+ for task in tasks_info:
189
+ if task["id"] == task_id:
190
+ task["completed"] += 1
191
+ if task["completed"] >= task["count"]:
192
+ task["status"] = "已完成"
193
+ break
194
+
195
+ # 更新预计完成时间
196
+ update_estimated_completion_time()
197
+
198
+ task_queue.task_done()
199
+ else:
200
+ time.sleep(0.1)
201
+ except Exception as e:
202
+ print(f"处理任务时出错: {str(e)}")
203
+ time.sleep(1)
204
+
205
+ # 启动任务处理线程
206
+ for _ in range(multiprocessing.cpu_count()):
207
+ threading.Thread(target=process_tasks, daemon=True).start()
208
+
209
+ def get_queue_status():
210
+ """获取队列状态
211
+
212
+ Returns:
213
+ tuple: 包含任务信息、完成进度和预计完成时间的元组
214
+ """
215
+ with tasks_lock:
216
+ # 复制任务信息以避免并发问题
217
+ current_tasks = tasks_info.copy()
218
+
219
+ # 计算进度
220
+ if total_tasks_count > 0:
221
+ progress = completed_tasks_count / total_tasks_count
222
+ else:
223
+ progress = 0
224
+
225
+ # 格式化预计完成时间
226
+ if estimated_completion_time:
227
+ eta = estimated_completion_time.strftime("%Y-%m-%d %H:%M:%S")
228
+ remaining_seconds = (estimated_completion_time - datetime.now()).total_seconds()
229
+
230
+ if remaining_seconds < 60:
231
+ eta_text = f"{int(remaining_seconds)}秒后完成"
232
+ elif remaining_seconds < 3600:
233
+ eta_text = f"{int(remaining_seconds/60)}分钟后完成"
234
+ else:
235
+ eta_text = f"{int(remaining_seconds/3600)}小时{int((remaining_seconds%3600)/60)}分钟后完成"
236
+ else:
237
+ eta = "无任务"
238
+ eta_text = "无任务"
239
+
240
+ return current_tasks, progress, eta, eta_text
241
+
242
+ # 创建Gradio界面
243
+ with gr.Blocks(title="代码评估服务", theme=gr.themes.Soft(primary_hue="blue")) as demo:
244
+ gr.Markdown(
245
+ """
246
+ # 代码评估服务任务队列监控
247
+
248
+ 实时监控系统中的任务队列状态和预计完成时间
249
+ """
250
+ )
251
+
252
+ with gr.Row():
253
+ with gr.Column(scale=2):
254
+ # 任务提交区域
255
+ with gr.Box():
256
+ gr.Markdown("### 提交新任务")
257
+ input_json = gr.JSON(label="输入数据(JSON格式列表)")
258
+ submit_btn = gr.Button("提交任务", variant="primary")
259
+
260
+ with gr.Column(scale=3):
261
+ # 状态概览区域
262
+ with gr.Box():
263
+ gr.Markdown("### 队列状态概览")
264
+ with gr.Row():
265
+ with gr.Column():
266
+ total_tasks = gr.Textbox(label="总任务数", value="0")
267
+ with gr.Column():
268
+ completed = gr.Textbox(label="已完成任务", value="0")
269
+ with gr.Column():
270
+ remaining = gr.Textbox(label="剩余任务", value="0")
271
+
272
+ progress_bar = gr.Slider(minimum=0, maximum=1, value=0, label="完成进度", interactive=False)
273
+ eta_display = gr.Textbox(label="预计完成时间", value="无任务")
274
+
275
+ # 任务列表区域
276
+ with gr.Box():
277
+ gr.Markdown("### 任务队列详情")
278
+ task_table = gr.Dataframe(
279
+ headers=["任务ID", "任务数量", "状态", "提交时间", "已完成/总数"],
280
+ datatype=["str", "number", "str", "str", "str"],
281
+ row_count=10,
282
+ col_count=(5, "fixed"),
283
+ interactive=False
284
+ )
285
+
286
+ # 刷新按钮
287
+ refresh_btn = gr.Button("刷新状态", variant="secondary")
288
+
289
+ # 添加任务的处理函数
290
+ def handle_submit(input_data):
291
+ result = add_task(input_data)
292
+ return update_ui_status(), result["message"]
293
+
294
+ # 更新UI状态的函数
295
+ def update_ui_status():
296
+ tasks, progress, eta, eta_text = get_queue_status()
297
+
298
+ # 准备表格数据
299
+ table_data = []
300
+ for task in tasks:
301
+ table_data.append([
302
+ str(task["id"]),
303
+ task["count"],
304
+ task["status"],
305
+ task["submit_time"],
306
+ f"{task['completed']}/{task['count']}"
307
+ ])
308
+
309
+ return (
310
+ table_data,
311
+ str(total_tasks_count),
312
+ str(completed_tasks_count),
313
+ str(total_tasks_count - completed_tasks_count),
314
+ progress,
315
+ eta_text
316
+ )
317
+
318
+ # 设置事件处理
319
+ submit_btn.click(
320
+ handle_submit,
321
+ inputs=[input_json],
322
+ outputs=[task_table, total_tasks, completed, remaining, progress_bar, eta_display, gr.Textbox(visible=False)]
323
+ )
324
+
325
+ refresh_btn.click(
326
+ update_ui_status,
327
+ inputs=[],
328
+ outputs=[task_table, total_tasks, completed, remaining, progress_bar, eta_display]
329
+ )
330
+
331
+ # 定时刷新UI
332
+ demo.load(update_ui_status, inputs=[], outputs=[task_table, total_tasks, completed, remaining, progress_bar, eta_display])
333
+ demo.every(3, update_ui_status, inputs=[], outputs=[task_table, total_tasks, completed, remaining, progress_bar, eta_display])
334
 
335
  if __name__ == "__main__":
336
  demo.launch()