朱东升 commited on
Commit
6df6e43
·
1 Parent(s): 3399cd9
Files changed (1) hide show
  1. app.py +52 -174
app.py CHANGED
@@ -15,7 +15,7 @@ from datetime import datetime
15
  from tqdm.auto import tqdm
16
  from src.containerized_eval import eval_string_script
17
 
18
- # 添加当前目录和src目录到模块搜索路径
19
  current_dir = os.path.dirname(os.path.abspath(__file__))
20
  src_dir = os.path.join(current_dir, "src")
21
  if current_dir not in sys.path:
@@ -23,33 +23,30 @@ if current_dir not in sys.path:
23
  if src_dir not in sys.path:
24
  sys.path.append(src_dir)
25
 
26
- # 创建消息队列
27
  task_queue = queue.Queue()
28
- # 存储任务状态的字典
29
  task_status = {}
30
- # 存储任务历史的列表,最多保存最近20个任务
31
  task_history = []
32
- # 用于保护共享资源的锁
33
  lock = threading.Lock()
34
- # 工作线程数
35
  worker_threads = multiprocessing.cpu_count()
36
- # 后台线程是否运行的标志
37
  running = True
38
- # 任务类型到处理时间的映射
39
  task_type_times = {}
40
 
41
  def queue_processor():
42
- """处理队列中的任务"""
43
  while running:
44
  try:
45
- # 从队列中获取任务,如果队列为空等待0.1
46
  task_id, input_data, request_time = task_queue.get(timeout=0.1)
47
  with lock:
48
  task_status[task_id]['status'] = 'processing'
49
  task_status[task_id]['start_time'] = time.time()
50
 
51
- # 识别任务特征以估计完成时间
52
- # 例如:语言类型、代码大小等
53
  if isinstance(input_data, list) and len(input_data) > 0:
54
  sample_task = input_data[0]
55
  language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
@@ -63,10 +60,8 @@ def queue_processor():
63
  'complexity': task_complexity
64
  }
65
 
66
- # 处理任务
67
  result = evaluate(input_data)
68
 
69
- # 更新任务状态
70
  end_time = time.time()
71
  process_time = end_time - task_status[task_id]['start_time']
72
 
@@ -76,7 +71,6 @@ def queue_processor():
76
  task_status[task_id]['end_time'] = end_time
77
  task_status[task_id]['process_time'] = process_time
78
 
79
- # 更新任务类型到处理时间的映射
80
  if 'estimated_factors' in task_status[task_id]:
81
  factors = task_status[task_id]['estimated_factors']
82
  key = f"{factors['language']}_{factors['complexity']}"
@@ -84,13 +78,10 @@ def queue_processor():
84
  if key not in task_type_times:
85
  task_type_times[key] = []
86
 
87
- # 记录此类型任务的处理时间
88
  task_type_times[key].append(process_time / factors['size'])
89
- # 只保留最近的10个记录
90
  if len(task_type_times[key]) > 10:
91
  task_type_times[key] = task_type_times[key][-10:]
92
 
93
- # 更新任务历史
94
  task_history.append({
95
  'task_id': task_id,
96
  'request_time': request_time,
@@ -98,15 +89,12 @@ def queue_processor():
98
  'status': 'completed',
99
  'factors': task_status[task_id].get('estimated_factors', {})
100
  })
101
- # 只保留最近20个任务
102
  while len(task_history) > 20:
103
  task_history.pop(0)
104
 
105
- # 标记任务完成
106
  task_queue.task_done()
107
 
108
  except queue.Empty:
109
- # 队列为空,继续等待
110
  continue
111
  except Exception as e:
112
  if 'task_id' in locals():
@@ -117,15 +105,10 @@ def queue_processor():
117
  task_queue.task_done()
118
 
119
  def _estimate_task_complexity(tasks):
120
- """估计任务复杂度
121
 
122
- Args:
123
- tasks: 任务列表
124
-
125
- Returns:
126
- str: 复杂度评级 ('simple', 'medium', 'complex')
127
  """
128
- # 基于代码和测试的长度评估复杂度
129
  total_code_length = 0
130
  count = 0
131
 
@@ -143,7 +126,7 @@ def _estimate_task_complexity(tasks):
143
  count += 1
144
 
145
  if count == 0:
146
- return 'medium' # 默认中等复杂度
147
 
148
  avg_length = total_code_length / count
149
 
@@ -155,14 +138,7 @@ def _estimate_task_complexity(tasks):
155
  return 'complex'
156
 
157
  def evaluate(input_data):
158
- """评估代码的主函数
159
-
160
- Args:
161
- input_data: 列表(批量处理多个测试用例)
162
-
163
- Returns:
164
- list: 包含评估结果的列表
165
- """
166
  try:
167
  if not isinstance(input_data, list):
168
  return {"status": "Exception", "error": "Input must be a list"}
@@ -186,14 +162,7 @@ def evaluate(input_data):
186
  return {"status": "Exception", "error": str(e)}
187
 
188
  def evaluate_single_case(input_data):
189
- """评估单个代码用例
190
-
191
- Args:
192
- input_data: 字典(包含代码信息)
193
-
194
- Returns:
195
- dict: 包含评估结果的字典
196
- """
197
  try:
198
  if not isinstance(input_data, dict):
199
  return {"status": "Exception", "error": "Input item must be a dictionary"}
@@ -218,17 +187,8 @@ def evaluate_single_case(input_data):
218
  return {"status": "Exception", "error": str(e)}
219
 
220
  def evaluate_code(code, language):
221
- """评估特定语言的代码
222
-
223
- Args:
224
- code (str): 要评估的代码
225
- language (str): 编程语言
226
-
227
- Returns:
228
- dict: 包含评估结果的字典
229
- """
230
  try:
231
- # 使用containerized_eval中的eval_string_script函数
232
  result = eval_string_script(language, code)
233
  return result
234
 
@@ -236,17 +196,7 @@ def evaluate_code(code, language):
236
  return {"status": "Exception", "error": str(e)}
237
 
238
  def synchronous_evaluate(input_data):
239
- """同步评估代码,兼容原来的接口
240
-
241
- 这个函数会阻塞直到评估完成,然后返回结果
242
-
243
- Args:
244
- input_data: 要评估的输入数据
245
-
246
- Returns:
247
- dict: 评估结果
248
- """
249
- # a) 估计此任务的特征
250
  if isinstance(input_data, list) and len(input_data) > 0:
251
  sample_task = input_data[0]
252
  language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
@@ -257,25 +207,21 @@ def synchronous_evaluate(input_data):
257
  task_size = 1
258
  task_complexity = 'medium'
259
 
260
- # b) 估计完成时间用于前端显示
261
  estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
262
  estimated_total_time = estimated_time_per_task * task_size
263
 
264
- # 获取队列当前状态
265
  queue_info = get_queue_status()
266
  waiting_tasks = queue_info['waiting_tasks']
267
 
268
- # 创建任务
269
  task_id = str(uuid.uuid4())
270
  request_time = time.time()
271
 
272
  with lock:
273
- # 创建任务状态记录
274
  task_status[task_id] = {
275
  'status': 'queued',
276
  'queued_time': request_time,
277
  'queue_position': task_queue.qsize() + 1,
278
- 'synchronous': True, # 标记为同步任务
279
  'estimated_factors': {
280
  'language': language,
281
  'size': task_size,
@@ -284,45 +230,30 @@ def synchronous_evaluate(input_data):
284
  'estimated_time': estimated_total_time
285
  }
286
 
287
- # 将任务添加到队列
288
  task_queue.put((task_id, input_data, request_time))
289
 
290
- # 等待任务完成
291
  while True:
292
  with lock:
293
  if task_id in task_status:
294
  status = task_status[task_id]['status']
295
  if status == 'completed':
296
  result = task_status[task_id]['result']
297
- # 任务完成后清理状态
298
  task_status.pop(task_id, None)
299
  return result
300
  elif status == 'error':
301
- error = task_status[task_id].get('error', '未知错误')
302
- # 任务出错后清理状态
303
  task_status.pop(task_id, None)
304
  return {"status": "Exception", "error": error}
305
 
306
- # 短暂睡眠避免CPU占用过高
307
  time.sleep(0.1)
308
 
309
  def _get_estimated_time_for_task(language, complexity):
310
- """获取特定类型任务的估计处理时间
311
-
312
- Args:
313
- language: 编程语言
314
- complexity: 任务复杂度
315
-
316
- Returns:
317
- float: 估计的处理时间(秒)
318
- """
319
  key = f"{language}_{complexity}"
320
 
321
- # 如果有历史数据,使用中位数作为估计值
322
  if key in task_type_times and len(task_type_times[key]) > 0:
323
  return np.median(task_type_times[key])
324
 
325
- # 否则使用基于复杂度的默认估计值
326
  if complexity == 'simple':
327
  return 1.0
328
  elif complexity == 'medium':
@@ -331,15 +262,7 @@ def _get_estimated_time_for_task(language, complexity):
331
  return 8.0
332
 
333
  def enqueue_task(input_data):
334
- """将任务添加到队列
335
-
336
- Args:
337
- input_data: 要处理的任务数据
338
-
339
- Returns:
340
- dict: 包含任务ID和状态的字典
341
- """
342
- # 估计任务特征和处理时间
343
  if isinstance(input_data, list) and len(input_data) > 0:
344
  sample_task = input_data[0]
345
  language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
@@ -357,7 +280,6 @@ def enqueue_task(input_data):
357
  request_time = time.time()
358
 
359
  with lock:
360
- # 创建任务状态记录
361
  task_status[task_id] = {
362
  'status': 'queued',
363
  'queued_time': request_time,
@@ -370,11 +292,9 @@ def enqueue_task(input_data):
370
  'estimated_time': estimated_total_time
371
  }
372
 
373
- # 获取队列状态以计算等待时间
374
  queue_info = get_queue_status()
375
  est_wait = queue_info['estimated_wait']
376
 
377
- # 将任务添加到队列
378
  task_queue.put((task_id, input_data, request_time))
379
 
380
  return {
@@ -386,34 +306,21 @@ def enqueue_task(input_data):
386
  }
387
 
388
  def check_status(task_id):
389
- """检查任务状态
390
-
391
- Args:
392
- task_id: 任务ID
393
-
394
- Returns:
395
- dict: 包含任务状态的字典
396
- """
397
  with lock:
398
  if task_id not in task_status:
399
  return {'status': 'not_found'}
400
 
401
  status_info = task_status[task_id].copy()
402
 
403
- # 如果任务已完成,从状态字典中移除(避免内存泄漏)
404
  if status_info['status'] in ['completed', 'error'] and time.time() - status_info.get('end_time', 0) > 3600:
405
  task_status.pop(task_id, None)
406
 
407
  return status_info
408
 
409
  def get_queue_status():
410
- """获取队列状态
411
-
412
- Returns:
413
- dict: 包含队列状态的字典
414
- """
415
  with lock:
416
- # 获取队列中的所有任务
417
  queued_tasks = [t for t in task_status.values() if t['status'] == 'queued']
418
  processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']
419
 
@@ -421,61 +328,45 @@ def get_queue_status():
421
  active_tasks = len(processing_tasks)
422
  waiting_tasks = len(queued_tasks)
423
 
424
- # 更准确地估计等待时间
425
- # 1. 计算当前处理中任务的剩余时间
426
  remaining_processing_time = 0
427
  for task in processing_tasks:
428
- # 如果任务有开始时间和估计总时间
429
  if 'start_time' in task and 'estimated_time' in task:
430
  elapsed = time.time() - task['start_time']
431
- # 剩余时间 = 估计总时间 - 已经过去的时间
432
  remaining = max(0, task['estimated_time'] - elapsed)
433
  remaining_processing_time += remaining
434
  else:
435
- # 默认假设还需要2秒
436
  remaining_processing_time += 2
437
 
438
- # 使用动态均衡:根据工作线程数量平衡负载
439
  if active_tasks > 0:
440
  remaining_processing_time = remaining_processing_time / min(active_tasks, worker_threads)
441
 
442
- # 2. 计算排队中任务的估计处理时间
443
  queued_processing_time = 0
444
  for task in queued_tasks:
445
  if 'estimated_time' in task:
446
  queued_processing_time += task['estimated_time']
447
  else:
448
- # 默认假设每个任务5秒
449
  queued_processing_time += 5
450
 
451
- # 考虑并行处理:分摊到可用工作线程
452
  if worker_threads > 0 and queued_processing_time > 0:
453
  queued_processing_time = queued_processing_time / worker_threads
454
 
455
- # 总估计等待时间
456
  estimated_wait = remaining_processing_time + queued_processing_time
457
 
458
- # 应用统计校正:使用历史数据调整预测
459
  if task_history:
460
- # 计算历史预测与实际处理时间的比例
461
  prediction_ratios = []
462
  for task in task_history:
463
  if 'factors' in task and 'estimated_time' in task:
464
  prediction_ratios.append(task['process_time'] / task['estimated_time'])
465
 
466
- # 如果有足够数据,使用中位数比例调整当前预测
467
  if prediction_ratios:
468
  correction_factor = np.median(prediction_ratios)
469
- # 应用校正因子,但限制在合理范围内
470
  correction_factor = max(0.5, min(2.0, correction_factor))
471
  estimated_wait *= correction_factor
472
 
473
- # 确保等待时间有意义
474
  estimated_wait = max(0.1, estimated_wait)
475
  if waiting_tasks == 0 and active_tasks == 0:
476
  estimated_wait = 0
477
 
478
- # 获取最近处理的任务
479
  recent_tasks = task_history[-5:] if task_history else []
480
 
481
  return {
@@ -488,31 +379,20 @@ def get_queue_status():
488
  }
489
 
490
  def format_time(seconds):
491
- """格式化时间为易读格式
492
-
493
- Args:
494
- seconds: 秒数
495
-
496
- Returns:
497
- str: 格式化的时间字符串
498
- """
499
  if seconds < 60:
500
- return f"{seconds:.1f}"
501
  elif seconds < 3600:
502
  minutes = int(seconds / 60)
503
  seconds = seconds % 60
504
- return f"{minutes}{seconds:.1f}"
505
  else:
506
  hours = int(seconds / 3600)
507
  minutes = int((seconds % 3600) / 60)
508
- return f"{hours}小时{minutes}分钟"
509
 
510
  def ui_get_queue_info():
511
- """获取队列信息的UI函数
512
-
513
- Returns:
514
- str: 包含队列信息的HTML
515
- """
516
  queue_info = get_queue_status()
517
 
518
  tasks_html = ""
@@ -525,47 +405,46 @@ def ui_get_queue_info():
525
  </tr>
526
  """
527
 
528
- # 如果没有任务历史,显示提示信息
529
  if not tasks_html:
530
  tasks_html = """
531
  <tr>
532
- <td colspan="3" style="text-align: center; padding: 20px;">暂无历史任务</td>
533
  </tr>
534
  """
535
 
536
  return f"""
537
  <div class="dashboard">
538
  <div class="queue-info-card main-card">
539
- <h3 class="card-title">队列状态监控</h3>
540
  <div class="queue-stats">
541
  <div class="stat-item">
542
  <div class="stat-value">{queue_info['waiting_tasks']}</div>
543
- <div class="stat-label">等待中</div>
544
  </div>
545
  <div class="stat-item">
546
  <div class="stat-value">{queue_info['active_tasks']}</div>
547
- <div class="stat-label">处理中</div>
548
  </div>
549
  <div class="stat-item">
550
  <div class="stat-value">{queue_info['worker_threads']}</div>
551
- <div class="stat-label">工作线程</div>
552
  </div>
553
  </div>
554
 
555
  <div class="wait-time">
556
- <p><b>当前预计等待时间:</b> {format_time(queue_info['estimated_wait'])}</p>
557
- <p class="last-update"><small>最后更新: {datetime.now().strftime('%H:%M:%S')}</small></p>
558
  </div>
559
  </div>
560
 
561
  <div class="queue-info-card history-card">
562
- <h3 class="card-title">最近处理的任务</h3>
563
  <table class="recent-tasks">
564
  <thead>
565
  <tr>
566
- <th>任务ID</th>
567
- <th>请求时间</th>
568
- <th>处理时间</th>
569
  </tr>
570
  </thead>
571
  <tbody>
@@ -577,17 +456,16 @@ def ui_get_queue_info():
577
  """
578
 
579
  def launch_workers():
580
- """启动工作线程"""
581
  global running
582
  running = True
583
 
584
- # 创建工作线程
585
  for _ in range(worker_threads):
586
  worker = threading.Thread(target=queue_processor)
587
  worker.daemon = True
588
  worker.start()
589
 
590
- # 自定义CSS
591
  custom_css = """
592
  .container {
593
  max-width: 1200px;
@@ -741,36 +619,36 @@ button.primary:hover {
741
  }
742
  """
743
 
744
- # 初始化并启动工作线程
745
  launch_workers()
746
 
747
- # 创建Gradio接口
748
  with gr.Blocks(css=custom_css) as demo:
749
- gr.Markdown("# 代码评估服务")
750
- gr.Markdown("支持多种编程语言的代码评估服务,采用队列机制处理请求")
751
 
752
  with gr.Row():
753
  with gr.Column(scale=3):
754
- # 队列状态信息卡片
755
  queue_info_html = gr.HTML()
756
- refresh_queue_btn = gr.Button("刷新队列状态", variant="primary")
757
 
758
- # 隐藏的API接口组件,不在UI上显示
759
  with gr.Row(visible=False):
760
  api_input = gr.JSON()
761
  api_output = gr.JSON()
762
 
763
- # 定义更新函数
764
  def update_queue_info():
765
  return ui_get_queue_info()
766
 
767
- # 定时更新队列信息
768
  demo.load(update_queue_info, None, queue_info_html, every=3)
769
 
770
- # 刷新按钮事件
771
  refresh_queue_btn.click(update_queue_info, None, queue_info_html)
772
 
773
- # 添加兼容原有接口的评估端点,但不在UI显示
774
  demo.queue()
775
  evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=api_input, outputs=api_output, api_name="evaluate")
776
 
@@ -778,5 +656,5 @@ if __name__ == "__main__":
778
  try:
779
  demo.launch()
780
  finally:
781
- # 停止工作线程
782
  running = False
 
15
  from tqdm.auto import tqdm
16
  from src.containerized_eval import eval_string_script
17
 
18
+ # Add current directory and src directory to module search path
19
  current_dir = os.path.dirname(os.path.abspath(__file__))
20
  src_dir = os.path.join(current_dir, "src")
21
  if current_dir not in sys.path:
 
23
  if src_dir not in sys.path:
24
  sys.path.append(src_dir)
25
 
26
+ # Create message queue
27
  task_queue = queue.Queue()
28
+ # Dictionary to store task status
29
  task_status = {}
30
+ # List to store task history, max 20 tasks
31
  task_history = []
32
+ # Lock for shared resources
33
  lock = threading.Lock()
34
+ # Number of worker threads
35
  worker_threads = multiprocessing.cpu_count()
36
+ # Flag for running background threads
37
  running = True
38
+ # Mapping from task type to processing time
39
  task_type_times = {}
40
 
41
  def queue_processor():
42
+ """Process tasks in the queue"""
43
  while running:
44
  try:
 
45
  task_id, input_data, request_time = task_queue.get(timeout=0.1)
46
  with lock:
47
  task_status[task_id]['status'] = 'processing'
48
  task_status[task_id]['start_time'] = time.time()
49
 
 
 
50
  if isinstance(input_data, list) and len(input_data) > 0:
51
  sample_task = input_data[0]
52
  language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
 
60
  'complexity': task_complexity
61
  }
62
 
 
63
  result = evaluate(input_data)
64
 
 
65
  end_time = time.time()
66
  process_time = end_time - task_status[task_id]['start_time']
67
 
 
71
  task_status[task_id]['end_time'] = end_time
72
  task_status[task_id]['process_time'] = process_time
73
 
 
74
  if 'estimated_factors' in task_status[task_id]:
75
  factors = task_status[task_id]['estimated_factors']
76
  key = f"{factors['language']}_{factors['complexity']}"
 
78
  if key not in task_type_times:
79
  task_type_times[key] = []
80
 
 
81
  task_type_times[key].append(process_time / factors['size'])
 
82
  if len(task_type_times[key]) > 10:
83
  task_type_times[key] = task_type_times[key][-10:]
84
 
 
85
  task_history.append({
86
  'task_id': task_id,
87
  'request_time': request_time,
 
89
  'status': 'completed',
90
  'factors': task_status[task_id].get('estimated_factors', {})
91
  })
 
92
  while len(task_history) > 20:
93
  task_history.pop(0)
94
 
 
95
  task_queue.task_done()
96
 
97
  except queue.Empty:
 
98
  continue
99
  except Exception as e:
100
  if 'task_id' in locals():
 
105
  task_queue.task_done()
106
 
107
  def _estimate_task_complexity(tasks):
108
+ """Estimate task complexity
109
 
110
+ Returns: 'simple', 'medium', or 'complex'
 
 
 
 
111
  """
 
112
  total_code_length = 0
113
  count = 0
114
 
 
126
  count += 1
127
 
128
  if count == 0:
129
+ return 'medium'
130
 
131
  avg_length = total_code_length / count
132
 
 
138
  return 'complex'
139
 
140
  def evaluate(input_data):
141
+ """Main function for code evaluation"""
 
 
 
 
 
 
 
142
  try:
143
  if not isinstance(input_data, list):
144
  return {"status": "Exception", "error": "Input must be a list"}
 
162
  return {"status": "Exception", "error": str(e)}
163
 
164
  def evaluate_single_case(input_data):
165
+ """Evaluate a single code case"""
 
 
 
 
 
 
 
166
  try:
167
  if not isinstance(input_data, dict):
168
  return {"status": "Exception", "error": "Input item must be a dictionary"}
 
187
  return {"status": "Exception", "error": str(e)}
188
 
189
  def evaluate_code(code, language):
190
+ """Evaluate code in a specific language"""
 
 
 
 
 
 
 
 
191
  try:
 
192
  result = eval_string_script(language, code)
193
  return result
194
 
 
196
  return {"status": "Exception", "error": str(e)}
197
 
198
  def synchronous_evaluate(input_data):
199
+ """Synchronously evaluate code, compatible with original interface"""
 
 
 
 
 
 
 
 
 
 
200
  if isinstance(input_data, list) and len(input_data) > 0:
201
  sample_task = input_data[0]
202
  language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
 
207
  task_size = 1
208
  task_complexity = 'medium'
209
 
 
210
  estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
211
  estimated_total_time = estimated_time_per_task * task_size
212
 
 
213
  queue_info = get_queue_status()
214
  waiting_tasks = queue_info['waiting_tasks']
215
 
 
216
  task_id = str(uuid.uuid4())
217
  request_time = time.time()
218
 
219
  with lock:
 
220
  task_status[task_id] = {
221
  'status': 'queued',
222
  'queued_time': request_time,
223
  'queue_position': task_queue.qsize() + 1,
224
+ 'synchronous': True,
225
  'estimated_factors': {
226
  'language': language,
227
  'size': task_size,
 
230
  'estimated_time': estimated_total_time
231
  }
232
 
 
233
  task_queue.put((task_id, input_data, request_time))
234
 
 
235
  while True:
236
  with lock:
237
  if task_id in task_status:
238
  status = task_status[task_id]['status']
239
  if status == 'completed':
240
  result = task_status[task_id]['result']
 
241
  task_status.pop(task_id, None)
242
  return result
243
  elif status == 'error':
244
+ error = task_status[task_id].get('error', 'Unknown error')
 
245
  task_status.pop(task_id, None)
246
  return {"status": "Exception", "error": error}
247
 
 
248
  time.sleep(0.1)
249
 
250
  def _get_estimated_time_for_task(language, complexity):
251
+ """Get estimated processing time for a specific task type"""
 
 
 
 
 
 
 
 
252
  key = f"{language}_{complexity}"
253
 
 
254
  if key in task_type_times and len(task_type_times[key]) > 0:
255
  return np.median(task_type_times[key])
256
 
 
257
  if complexity == 'simple':
258
  return 1.0
259
  elif complexity == 'medium':
 
262
  return 8.0
263
 
264
  def enqueue_task(input_data):
265
+ """Add task to queue"""
 
 
 
 
 
 
 
 
266
  if isinstance(input_data, list) and len(input_data) > 0:
267
  sample_task = input_data[0]
268
  language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
 
280
  request_time = time.time()
281
 
282
  with lock:
 
283
  task_status[task_id] = {
284
  'status': 'queued',
285
  'queued_time': request_time,
 
292
  'estimated_time': estimated_total_time
293
  }
294
 
 
295
  queue_info = get_queue_status()
296
  est_wait = queue_info['estimated_wait']
297
 
 
298
  task_queue.put((task_id, input_data, request_time))
299
 
300
  return {
 
306
  }
307
 
308
  def check_status(task_id):
309
+ """Check task status"""
 
 
 
 
 
 
 
310
  with lock:
311
  if task_id not in task_status:
312
  return {'status': 'not_found'}
313
 
314
  status_info = task_status[task_id].copy()
315
 
 
316
  if status_info['status'] in ['completed', 'error'] and time.time() - status_info.get('end_time', 0) > 3600:
317
  task_status.pop(task_id, None)
318
 
319
  return status_info
320
 
321
  def get_queue_status():
322
+ """Get queue status"""
 
 
 
 
323
  with lock:
 
324
  queued_tasks = [t for t in task_status.values() if t['status'] == 'queued']
325
  processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']
326
 
 
328
  active_tasks = len(processing_tasks)
329
  waiting_tasks = len(queued_tasks)
330
 
 
 
331
  remaining_processing_time = 0
332
  for task in processing_tasks:
 
333
  if 'start_time' in task and 'estimated_time' in task:
334
  elapsed = time.time() - task['start_time']
 
335
  remaining = max(0, task['estimated_time'] - elapsed)
336
  remaining_processing_time += remaining
337
  else:
 
338
  remaining_processing_time += 2
339
 
 
340
  if active_tasks > 0:
341
  remaining_processing_time = remaining_processing_time / min(active_tasks, worker_threads)
342
 
 
343
  queued_processing_time = 0
344
  for task in queued_tasks:
345
  if 'estimated_time' in task:
346
  queued_processing_time += task['estimated_time']
347
  else:
 
348
  queued_processing_time += 5
349
 
 
350
  if worker_threads > 0 and queued_processing_time > 0:
351
  queued_processing_time = queued_processing_time / worker_threads
352
 
 
353
  estimated_wait = remaining_processing_time + queued_processing_time
354
 
 
355
  if task_history:
 
356
  prediction_ratios = []
357
  for task in task_history:
358
  if 'factors' in task and 'estimated_time' in task:
359
  prediction_ratios.append(task['process_time'] / task['estimated_time'])
360
 
 
361
  if prediction_ratios:
362
  correction_factor = np.median(prediction_ratios)
 
363
  correction_factor = max(0.5, min(2.0, correction_factor))
364
  estimated_wait *= correction_factor
365
 
 
366
  estimated_wait = max(0.1, estimated_wait)
367
  if waiting_tasks == 0 and active_tasks == 0:
368
  estimated_wait = 0
369
 
 
370
  recent_tasks = task_history[-5:] if task_history else []
371
 
372
  return {
 
379
  }
380
 
381
  def format_time(seconds):
382
+ """Format time into readable format"""
 
 
 
 
 
 
 
383
  if seconds < 60:
384
+ return f"{seconds:.1f} seconds"
385
  elif seconds < 3600:
386
  minutes = int(seconds / 60)
387
  seconds = seconds % 60
388
+ return f"{minutes}m {seconds:.1f}s"
389
  else:
390
  hours = int(seconds / 3600)
391
  minutes = int((seconds % 3600) / 60)
392
+ return f"{hours}h {minutes}m"
393
 
394
  def ui_get_queue_info():
395
+ """Get queue info for UI"""
 
 
 
 
396
  queue_info = get_queue_status()
397
 
398
  tasks_html = ""
 
405
  </tr>
406
  """
407
 
 
408
  if not tasks_html:
409
  tasks_html = """
410
  <tr>
411
+ <td colspan="3" style="text-align: center; padding: 20px;">No historical tasks</td>
412
  </tr>
413
  """
414
 
415
  return f"""
416
  <div class="dashboard">
417
  <div class="queue-info-card main-card">
418
+ <h3 class="card-title">Queue Status Monitor</h3>
419
  <div class="queue-stats">
420
  <div class="stat-item">
421
  <div class="stat-value">{queue_info['waiting_tasks']}</div>
422
+ <div class="stat-label">Waiting</div>
423
  </div>
424
  <div class="stat-item">
425
  <div class="stat-value">{queue_info['active_tasks']}</div>
426
+ <div class="stat-label">Processing</div>
427
  </div>
428
  <div class="stat-item">
429
  <div class="stat-value">{queue_info['worker_threads']}</div>
430
+ <div class="stat-label">Worker Threads</div>
431
  </div>
432
  </div>
433
 
434
  <div class="wait-time">
435
+ <p><b>Current Estimated Wait Time:</b> {format_time(queue_info['estimated_wait'])}</p>
436
+ <p class="last-update"><small>Last update: {datetime.now().strftime('%H:%M:%S')}</small></p>
437
  </div>
438
  </div>
439
 
440
  <div class="queue-info-card history-card">
441
+ <h3 class="card-title">Recently Processed Tasks</h3>
442
  <table class="recent-tasks">
443
  <thead>
444
  <tr>
445
+ <th>Task ID</th>
446
+ <th>Request Time</th>
447
+ <th>Processing Time</th>
448
  </tr>
449
  </thead>
450
  <tbody>
 
456
  """
457
 
458
  def launch_workers():
459
+ """Launch worker threads"""
460
  global running
461
  running = True
462
 
 
463
  for _ in range(worker_threads):
464
  worker = threading.Thread(target=queue_processor)
465
  worker.daemon = True
466
  worker.start()
467
 
468
+ # Custom CSS
469
  custom_css = """
470
  .container {
471
  max-width: 1200px;
 
619
  }
620
  """
621
 
622
+ # Initialize and launch worker threads
623
  launch_workers()
624
 
625
+ # Create Gradio interface
626
  with gr.Blocks(css=custom_css) as demo:
627
+ gr.Markdown("# Code Evaluation Service")
628
+ gr.Markdown("Code evaluation service supporting multiple programming languages, using queue mechanism to process requests")
629
 
630
  with gr.Row():
631
  with gr.Column(scale=3):
632
+ # Queue status info card
633
  queue_info_html = gr.HTML()
634
+ refresh_queue_btn = gr.Button("Refresh Queue Status", variant="primary")
635
 
636
+ # Hidden API interface components
637
  with gr.Row(visible=False):
638
  api_input = gr.JSON()
639
  api_output = gr.JSON()
640
 
641
+ # Define update function
642
  def update_queue_info():
643
  return ui_get_queue_info()
644
 
645
+ # Update queue info periodically
646
  demo.load(update_queue_info, None, queue_info_html, every=3)
647
 
648
+ # Refresh button event
649
  refresh_queue_btn.click(update_queue_info, None, queue_info_html)
650
 
651
+ # Add evaluation endpoint compatible with original interface
652
  demo.queue()
653
  evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=api_input, outputs=api_output, api_name="evaluate")
654
 
 
656
  try:
657
  demo.launch()
658
  finally:
659
+ # Stop worker threads
660
  running = False