3v324v23 commited on
Commit
0b56f9f
·
1 Parent(s): 0985e79
Files changed (1) hide show
  1. app.py +62 -320
app.py CHANGED
@@ -31,74 +31,21 @@ task_status = {}
31
  task_history = []
32
  # Lock for shared resources
33
  lock = threading.Lock()
34
- # Number of worker threads - set to 1 to process one task at a time
35
- worker_threads = 1
36
  # Flag for running background threads
37
  running = True
38
  # Mapping from task type to processing time
39
  task_type_times = {}
40
- # Currently processing tasks counter
41
- processing_count = 0
42
- # Available CPU cores for task processing
43
- available_cores = multiprocessing.cpu_count()
44
- # Task ID counter for debugging
45
- task_counter = 0
46
-
47
- # Enable logging
48
- DEBUG_MODE = True
49
-
50
- def debug_log(message):
51
- """Log debug messages if debug mode is enabled"""
52
- if DEBUG_MODE:
53
- print(f"[DEBUG] {datetime.now().strftime('%H:%M:%S')} - {message}")
54
 
55
  def queue_processor():
56
  """Process tasks in the queue"""
57
- global processing_count
58
-
59
  while running:
60
  try:
61
- # Only process if we're not already processing a task
62
- with lock:
63
- if processing_count >= worker_threads:
64
- # Already processing a task, wait and try again
65
- time.sleep(0.5)
66
- continue
67
-
68
- # Check queue size before attempting to get a task
69
- queue_size = task_queue.qsize()
70
- if queue_size > 0:
71
- debug_log(f"Queue processor found {queue_size} tasks waiting")
72
- else:
73
- # No tasks waiting, sleep briefly to avoid CPU spinning
74
- time.sleep(0.1)
75
- continue
76
-
77
- # Get a task from the queue with small timeout to prevent blocking
78
- try:
79
- task_id, input_data, request_time = task_queue.get(timeout=0.1)
80
- debug_log(f"Processing task {task_id}")
81
- except queue.Empty:
82
- continue
83
-
84
- # Increment processing count to track active tasks
85
  with lock:
86
- processing_count += 1
87
- debug_log(f"Incremented processing count to {processing_count}")
88
-
89
- # Update task status
90
- if task_id in task_status:
91
- task_status[task_id]['status'] = 'processing'
92
- task_status[task_id]['start_time'] = time.time()
93
- debug_log(f"Updated existing task {task_id} to processing state")
94
- else:
95
- # Create task status entry if it doesn't exist
96
- task_status[task_id] = {
97
- 'status': 'processing',
98
- 'queued_time': request_time,
99
- 'start_time': time.time()
100
- }
101
- debug_log(f"Created new task status entry for {task_id}")
102
 
103
  if isinstance(input_data, list) and len(input_data) > 0:
104
  sample_task = input_data[0]
@@ -113,24 +60,16 @@ def queue_processor():
113
  'complexity': task_complexity
114
  }
115
 
116
- debug_log(f"Starting evaluation for task {task_id}")
117
  result = evaluate(input_data)
118
- debug_log(f"Finished evaluation for task {task_id}")
119
 
120
  end_time = time.time()
121
  process_time = end_time - task_status[task_id]['start_time']
122
 
123
  with lock:
124
- # Decrease processing count now that we're done
125
- processing_count -= 1
126
- debug_log(f"Decremented processing count to {processing_count}")
127
-
128
- # Update task status
129
  task_status[task_id]['status'] = 'completed'
130
  task_status[task_id]['result'] = result
131
  task_status[task_id]['end_time'] = end_time
132
  task_status[task_id]['process_time'] = process_time
133
- debug_log(f"Updated task {task_id} to completed state")
134
 
135
  if 'estimated_factors' in task_status[task_id]:
136
  factors = task_status[task_id]['estimated_factors']
@@ -154,32 +93,15 @@ def queue_processor():
154
  task_history.pop(0)
155
 
156
  task_queue.task_done()
157
- debug_log(f"Task {task_id} completed and marked as done")
158
 
159
  except queue.Empty:
160
- # Use a small timeout to avoid CPU spinning
161
- time.sleep(0.1)
162
  except Exception as e:
163
- debug_log(f"Error in queue processor: {str(e)}")
164
  if 'task_id' in locals():
165
- debug_log(f"Error occurred while processing task {task_id}")
166
  with lock:
167
- # Decrease processing count on error
168
- processing_count -= 1
169
- debug_log(f"Decremented processing count to {processing_count} due to error")
170
-
171
- if task_id in task_status:
172
- task_status[task_id]['status'] = 'error'
173
- task_status[task_id]['error'] = str(e)
174
- task_status[task_id]['end_time'] = time.time()
175
- debug_log(f"Updated task {task_id} to error state")
176
- else:
177
- task_status[task_id] = {
178
- 'status': 'error',
179
- 'error': str(e),
180
- 'end_time': time.time()
181
- }
182
- debug_log(f"Created new error entry for task {task_id}")
183
  task_queue.task_done()
184
 
185
  def _estimate_task_complexity(tasks):
@@ -223,8 +145,9 @@ def evaluate(input_data):
223
 
224
  results = []
225
 
226
- # Use all available cores for this single task but with a reasonable cap
227
- max_workers = max(1, min(available_cores // 2, 8))
 
228
 
229
  with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
230
  future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data}
@@ -291,47 +214,53 @@ def evaluate_code(code, language):
291
 
292
  def synchronous_evaluate(input_data):
293
  """Synchronously evaluate code, compatible with original interface"""
294
- debug_log(f"Received synchronous evaluation request")
 
 
 
 
 
 
 
 
295
 
296
- # Add metadata to identify sync requests
297
- if isinstance(input_data, list) and len(input_data) > 0 and isinstance(input_data[0], dict):
298
- if 'metadata' not in input_data[0]:
299
- input_data[0]['metadata'] = {}
300
- input_data[0]['metadata']['source'] = 'sync_api'
301
 
302
- # Create a task and queue it
303
- task_info = enqueue_task(input_data)
304
- task_id = task_info['task_id']
305
- debug_log(f"Created task {task_id} for synchronous evaluation")
306
 
307
- # Ensure the task appears in the queue UI, add artificial delay if needed
308
- time.sleep(0.1) # Small delay to make sure the task is visible in queue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
309
 
310
- # Wait for task to complete
311
  while True:
312
  with lock:
313
  if task_id in task_status:
314
  status = task_status[task_id]['status']
315
  if status == 'completed':
316
- debug_log(f"Task {task_id} completed, returning result")
317
  result = task_status[task_id]['result']
318
- # Keep the result in status for a short time to ensure it shows in history
319
- if 'end_time' not in task_status[task_id]:
320
- task_status[task_id]['end_time'] = time.time()
321
- elif time.time() - task_status[task_id]['end_time'] > 5:
322
- task_status.pop(task_id, None)
323
  return result
324
  elif status == 'error':
325
- debug_log(f"Task {task_id} failed with error")
326
  error = task_status[task_id].get('error', 'Unknown error')
327
- # Keep the error in status for a short time to ensure it shows in history
328
- if 'end_time' not in task_status[task_id]:
329
- task_status[task_id]['end_time'] = time.time()
330
- elif time.time() - task_status[task_id]['end_time'] > 5:
331
- task_status.pop(task_id, None)
332
  return {"status": "Exception", "error": error}
333
- else:
334
- debug_log(f"Task {task_id} still in status: {status}")
335
 
336
  time.sleep(0.1)
337
 
@@ -351,8 +280,6 @@ def _get_estimated_time_for_task(language, complexity):
351
 
352
  def enqueue_task(input_data):
353
  """Add task to queue"""
354
- global task_counter
355
-
356
  if isinstance(input_data, list) and len(input_data) > 0:
357
  sample_task = input_data[0]
358
  language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
@@ -366,36 +293,17 @@ def enqueue_task(input_data):
366
  estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
367
  estimated_total_time = estimated_time_per_task * task_size
368
 
369
- # Generate task ID in a thread-safe way
370
- with lock:
371
- task_counter += 1
372
- local_counter = task_counter
373
-
374
- task_id = f"task_{local_counter}_{str(uuid.uuid4())[:8]}"
375
  request_time = time.time()
376
 
377
- debug_log(f"Creating new task: {task_id}")
 
378
 
379
- # Track if this is a synchronous or asynchronous submission
380
- is_async = 'async_submission' in str(threading.current_thread().name).lower() or 'async' in input_data[0].get('metadata', {}).get('source', '') if isinstance(input_data, list) and input_data and isinstance(input_data[0], dict) and 'metadata' in input_data[0] else False
381
-
382
- # Get current queue status before adding to task_status
383
  with lock:
384
- # Count actual queue status - both in queue AND waiting in task_status
385
- current_queue_size = task_queue.qsize()
386
- actual_waiting = sum(1 for t in task_status.values() if t['status'] == 'queued')
387
- total_waiting = actual_waiting # Use the actual count from task_status
388
-
389
- debug_log(f"Current queue metrics: queue_size={current_queue_size}, task_status_waiting={actual_waiting}, total={total_waiting}")
390
-
391
- queue_position = total_waiting + 1
392
-
393
- # Add to task_status with 'queued' status first
394
  task_status[task_id] = {
395
  'status': 'queued',
396
  'queued_time': request_time,
397
- 'queue_position': queue_position,
398
- 'is_async': is_async,
399
  'estimated_factors': {
400
  'language': language,
401
  'size': task_size,
@@ -403,27 +311,11 @@ def enqueue_task(input_data):
403
  },
404
  'estimated_time': estimated_total_time
405
  }
406
- debug_log(f"Added task {task_id} to task_status with queue position {queue_position}")
407
 
408
- # Get queue info for wait time estimation
409
  queue_info = get_queue_status()
410
  est_wait = queue_info['estimated_wait']
411
- debug_log(f"Estimated wait time for task {task_id}: {est_wait} seconds")
412
 
413
- # Add to the task queue - this must be done AFTER adding to task_status
414
  task_queue.put((task_id, input_data, request_time))
415
- debug_log(f"Added task {task_id} to task_queue")
416
-
417
- # Count queued tasks in task_status after adding
418
- with lock:
419
- queued_count = sum(1 for t in task_status.values() if t['status'] == 'queued')
420
- processing_tasks = sum(1 for t in task_status.values() if t['status'] == 'processing')
421
- debug_log(f"Queue status after adding: {task_queue.qsize()} in queue, {queued_count} with 'queued' status, {processing_tasks} processing")
422
-
423
- # Display all task IDs currently in queue
424
- task_ids = [(k, v['status']) for k, v in task_status.items() if v['status'] in ('queued', 'processing')]
425
- if task_ids:
426
- debug_log(f"Current tasks: {task_ids}")
427
 
428
  return {
429
  'task_id': task_id,
@@ -449,23 +341,13 @@ def check_status(task_id):
449
  def get_queue_status():
450
  """Get queue status"""
451
  with lock:
452
- queued_tasks = [v for k, v in task_status.items() if v['status'] == 'queued']
453
- processing_tasks = [v for k, v in task_status.items() if v['status'] == 'processing']
454
-
455
- queue_size = task_queue.qsize()
456
- active_tasks = processing_count
457
- waiting_tasks = len(queued_tasks)
458
 
459
- debug_log(f"Queue status check: size={queue_size}, active={active_tasks}, waiting={waiting_tasks}")
460
- if waiting_tasks != queue_size and abs(waiting_tasks - queue_size) > 1:
461
- debug_log(f"WARNING: Queue size mismatch - task_queue has {queue_size} items but task_status has {waiting_tasks} queued items")
462
-
463
- debug_log(f"Queue status details: {len(queued_tasks)} queued tasks found in task_status")
464
- if queued_tasks:
465
- task_ids = [k for k, v in task_status.items() if v['status'] == 'queued']
466
- debug_log(f"Queued task IDs: {task_ids}")
467
 
468
- # Calculate remaining processing time for active tasks
469
  remaining_processing_time = 0
470
  for task in processing_tasks:
471
  if 'start_time' in task and 'estimated_time' in task:
@@ -485,9 +367,6 @@ def get_queue_status():
485
  else:
486
  queued_processing_time += 5
487
 
488
- if worker_threads > 0 and queued_processing_time > 0:
489
- queued_processing_time = queued_processing_time / worker_threads
490
-
491
  estimated_wait = remaining_processing_time + queued_processing_time
492
 
493
  if task_history:
@@ -509,7 +388,7 @@ def get_queue_status():
509
 
510
  return {
511
  'queue_size': queue_size,
512
- 'active_tasks': active_tasks,
513
  'waiting_tasks': waiting_tasks,
514
  'worker_threads': worker_threads,
515
  'estimated_wait': estimated_wait,
@@ -533,28 +412,6 @@ def ui_get_queue_info():
533
  """Get queue info for UI"""
534
  queue_info = get_queue_status()
535
 
536
- # List queued tasks with details - make sure to use task_id as key
537
- queued_tasks_html = ""
538
- with lock:
539
- queued_tasks = []
540
- for task_id, task in task_status.items():
541
- if task['status'] == 'queued':
542
- task_with_id = task.copy()
543
- task_with_id['task_id'] = task_id
544
- queued_tasks.append(task_with_id)
545
-
546
- if queued_tasks:
547
- # Sort by queue position
548
- queued_tasks.sort(key=lambda x: x.get('queue_position', 999999))
549
- queued_tasks_html = "<div class='queued-tasks'><h4>Tasks in Queue:</h4><ul>"
550
- for idx, task in enumerate(queued_tasks):
551
- task_id = task['task_id']
552
- queued_time = datetime.fromtimestamp(task.get('queued_time', 0)).strftime('%H:%M:%S')
553
- source = "async" if task.get('is_async', False) else "sync"
554
- time_in_queue = time.time() - task.get('queued_time', time.time())
555
- queued_tasks_html += f"<li>Task {task_id[:8]}... - Queued at {queued_time} ({time_in_queue:.1f}s ago) - Position {idx+1} ({source})</li>"
556
- queued_tasks_html += "</ul></div>"
557
-
558
  tasks_html = ""
559
  for task in reversed(queue_info['recent_tasks']):
560
  tasks_html += f"""
@@ -572,46 +429,6 @@ def ui_get_queue_info():
572
  </tr>
573
  """
574
 
575
- # Add more detailed queue information
576
- queue_details = ""
577
- if queue_info['waiting_tasks'] > 0:
578
- queue_details = f"""
579
- <div class="alert alert-info">
580
- <p><strong>Currently {queue_info['waiting_tasks']} tasks in queue</strong></p>
581
- <p>Estimated wait time: {format_time(queue_info['estimated_wait'])}</p>
582
- {queued_tasks_html}
583
- </div>
584
- """
585
-
586
- processing_details = ""
587
- if queue_info['active_tasks'] > 0:
588
- # Display which tasks are being processed
589
- processing_tasks_html = ""
590
- with lock:
591
- processing_task_ids = [k for k, v in task_status.items() if v['status'] == 'processing']
592
- if processing_task_ids:
593
- processing_tasks_html = "<ul>"
594
- for task_id in processing_task_ids:
595
- task = task_status[task_id]
596
- start_time = datetime.fromtimestamp(task.get('start_time', 0)).strftime('%H:%M:%S')
597
- time_processing = time.time() - task.get('start_time', time.time())
598
- processing_tasks_html += f"<li>Task {task_id[:8]}... - Started at {start_time} ({time_processing:.1f}s ago)</li>"
599
- processing_tasks_html += "</ul>"
600
-
601
- processing_details = f"""
602
- <div class="alert alert-warning">
603
- <p><strong>Currently {queue_info['active_tasks']} tasks being processed</strong></p>
604
- {processing_tasks_html}
605
- </div>
606
- """
607
-
608
- # Add debug info
609
- debug_details = f"""
610
- <div class="debug-info">
611
- <p><small>Queue: {queue_info['queue_size']} in queue, {queue_info['waiting_tasks']} waiting, {queue_info['active_tasks']} processing</small></p>
612
- </div>
613
- """
614
-
615
  return f"""
616
  <div class="dashboard">
617
  <div class="queue-info-card main-card">
@@ -632,10 +449,7 @@ def ui_get_queue_info():
632
  </div>
633
 
634
  <div class="wait-time">
635
- <p><b>Estimated Wait Time:</b> {format_time(queue_info['estimated_wait'])}</p>
636
- {queue_details}
637
- {processing_details}
638
- {debug_details}
639
  <p class="last-update"><small>Last update: {datetime.now().strftime('%H:%M:%S')}</small></p>
640
  </div>
641
  </div>
@@ -820,63 +634,8 @@ button.primary {
820
  button.primary:hover {
821
  background-color: #3367d6;
822
  }
823
-
824
- .alert {
825
- padding: 12px;
826
- margin: 10px 0;
827
- border-radius: 6px;
828
- }
829
-
830
- .alert-info {
831
- background-color: #d1ecf1;
832
- color: #0c5460;
833
- border: 1px solid #bee5eb;
834
- }
835
-
836
- .alert-warning {
837
- background-color: #fff3cd;
838
- color: #856404;
839
- border: 1px solid #ffeeba;
840
- }
841
-
842
- .queued-tasks {
843
- text-align: left;
844
- margin: 10px 0;
845
- padding: 8px;
846
- background: rgba(255, 255, 255, 0.5);
847
- border-radius: 4px;
848
- }
849
-
850
- .queued-tasks ul {
851
- margin: 5px 0;
852
- padding-left: 20px;
853
- }
854
-
855
- .queued-tasks li {
856
- margin-bottom: 3px;
857
- }
858
  """
859
 
860
- def async_enqueue(input_data):
861
- """Async version of enqueue_task - specifically for async API calls"""
862
- # Add metadata to identify async requests
863
- if isinstance(input_data, list) and len(input_data) > 0 and isinstance(input_data[0], dict):
864
- if 'metadata' not in input_data[0]:
865
- input_data[0]['metadata'] = {}
866
- input_data[0]['metadata']['source'] = 'async_api'
867
-
868
- # Just call enqueue_task but set thread name to identify as async
869
- current_thread = threading.current_thread()
870
- original_name = current_thread.name
871
- current_thread.name = f"async_submission_{original_name}"
872
-
873
- result = enqueue_task(input_data)
874
-
875
- # Reset thread name
876
- current_thread.name = original_name
877
-
878
- return result
879
-
880
  # Initialize and launch worker threads
881
  launch_workers()
882
 
@@ -895,41 +654,24 @@ with gr.Blocks(css=custom_css) as demo:
895
  with gr.Row(visible=False):
896
  api_input = gr.JSON()
897
  api_output = gr.JSON()
898
-
899
- async_api_input = gr.JSON()
900
- async_api_output = gr.JSON()
901
-
902
- status_check_input = gr.Textbox()
903
- status_check_output = gr.JSON()
904
 
905
  # Define update function
906
  def update_queue_info():
907
  return ui_get_queue_info()
908
 
909
- # Update queue info more frequently
910
- demo.load(update_queue_info, None, queue_info_html, every=0.5)
911
 
912
  # Refresh button event
913
  refresh_queue_btn.click(update_queue_info, None, queue_info_html)
914
 
915
- # Force sync when handling API requests to prevent gradio's queue from interfering
916
- # Use the correct queue configuration method for current Gradio version
917
-
918
  # Add evaluation endpoint compatible with original interface
919
- evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=api_input, outputs=api_output, api_name="evaluate", concurrency_limit=1)
920
-
921
- # Add async evaluation endpoint
922
- enqueue_endpoint = demo.load(fn=async_enqueue, inputs=async_api_input, outputs=async_api_output, api_name="enqueue", concurrency_limit=1)
923
-
924
- # Add status check endpoint
925
- status_endpoint = demo.load(fn=check_status, inputs=status_check_input, outputs=status_check_output, api_name="status", concurrency_limit=1)
926
 
927
  if __name__ == "__main__":
928
- debug_log("Starting application")
929
  try:
930
- # Set max_threads for overall concurrency
931
- demo.launch(max_threads=100)
932
  finally:
933
  # Stop worker threads
934
- running = False
935
- debug_log("Shutting down application")
 
31
  task_history = []
32
  # Lock for shared resources
33
  lock = threading.Lock()
34
+ # Number of worker threads
35
+ worker_threads = 1 # Process only one task at a time
36
  # Flag for running background threads
37
  running = True
38
  # Mapping from task type to processing time
39
  task_type_times = {}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
 
41
  def queue_processor():
42
  """Process tasks in the queue"""
 
 
43
  while running:
44
  try:
45
+ task_id, input_data, request_time = task_queue.get(timeout=0.1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
  with lock:
47
+ task_status[task_id]['status'] = 'processing'
48
+ task_status[task_id]['start_time'] = time.time()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
 
50
  if isinstance(input_data, list) and len(input_data) > 0:
51
  sample_task = input_data[0]
 
60
  'complexity': task_complexity
61
  }
62
 
 
63
  result = evaluate(input_data)
 
64
 
65
  end_time = time.time()
66
  process_time = end_time - task_status[task_id]['start_time']
67
 
68
  with lock:
 
 
 
 
 
69
  task_status[task_id]['status'] = 'completed'
70
  task_status[task_id]['result'] = result
71
  task_status[task_id]['end_time'] = end_time
72
  task_status[task_id]['process_time'] = process_time
 
73
 
74
  if 'estimated_factors' in task_status[task_id]:
75
  factors = task_status[task_id]['estimated_factors']
 
93
  task_history.pop(0)
94
 
95
  task_queue.task_done()
 
96
 
97
  except queue.Empty:
98
+ continue
 
99
  except Exception as e:
 
100
  if 'task_id' in locals():
 
101
  with lock:
102
+ task_status[task_id]['status'] = 'error'
103
+ task_status[task_id]['error'] = str(e)
104
+ task_status[task_id]['end_time'] = time.time()
 
 
 
 
 
 
 
 
 
 
 
 
 
105
  task_queue.task_done()
106
 
107
  def _estimate_task_complexity(tasks):
 
145
 
146
  results = []
147
 
148
+ # Use a moderate number of workers for all language tests to ensure stability
149
+ # This prevents resource contention regardless of language
150
+ max_workers = max(1, min(multiprocessing.cpu_count() // 2, 4))
151
 
152
  with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
153
  future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data}
 
214
 
215
  def synchronous_evaluate(input_data):
216
  """Synchronously evaluate code, compatible with original interface"""
217
+ if isinstance(input_data, list) and len(input_data) > 0:
218
+ sample_task = input_data[0]
219
+ language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
220
+ task_size = len(input_data)
221
+ task_complexity = _estimate_task_complexity(input_data)
222
+ else:
223
+ language = 'unknown'
224
+ task_size = 1
225
+ task_complexity = 'medium'
226
 
227
+ estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
228
+ estimated_total_time = estimated_time_per_task * task_size
 
 
 
229
 
230
+ queue_info = get_queue_status()
231
+ waiting_tasks = queue_info['waiting_tasks']
 
 
232
 
233
+ task_id = str(uuid.uuid4())
234
+ request_time = time.time()
235
+
236
+ with lock:
237
+ task_status[task_id] = {
238
+ 'status': 'queued',
239
+ 'queued_time': request_time,
240
+ 'queue_position': task_queue.qsize() + 1,
241
+ 'synchronous': True,
242
+ 'estimated_factors': {
243
+ 'language': language,
244
+ 'size': task_size,
245
+ 'complexity': task_complexity
246
+ },
247
+ 'estimated_time': estimated_total_time
248
+ }
249
+
250
+ task_queue.put((task_id, input_data, request_time))
251
 
 
252
  while True:
253
  with lock:
254
  if task_id in task_status:
255
  status = task_status[task_id]['status']
256
  if status == 'completed':
 
257
  result = task_status[task_id]['result']
258
+ task_status.pop(task_id, None)
 
 
 
 
259
  return result
260
  elif status == 'error':
 
261
  error = task_status[task_id].get('error', 'Unknown error')
262
+ task_status.pop(task_id, None)
 
 
 
 
263
  return {"status": "Exception", "error": error}
 
 
264
 
265
  time.sleep(0.1)
266
 
 
280
 
281
  def enqueue_task(input_data):
282
  """Add task to queue"""
 
 
283
  if isinstance(input_data, list) and len(input_data) > 0:
284
  sample_task = input_data[0]
285
  language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
 
293
  estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
294
  estimated_total_time = estimated_time_per_task * task_size
295
 
296
+ task_id = str(uuid.uuid4())
 
 
 
 
 
297
  request_time = time.time()
298
 
299
+ # Get current queue size before adding new task
300
+ current_queue_size = task_queue.qsize()
301
 
 
 
 
 
302
  with lock:
 
 
 
 
 
 
 
 
 
 
303
  task_status[task_id] = {
304
  'status': 'queued',
305
  'queued_time': request_time,
306
+ 'queue_position': current_queue_size + 1,
 
307
  'estimated_factors': {
308
  'language': language,
309
  'size': task_size,
 
311
  },
312
  'estimated_time': estimated_total_time
313
  }
 
314
 
 
315
  queue_info = get_queue_status()
316
  est_wait = queue_info['estimated_wait']
 
317
 
 
318
  task_queue.put((task_id, input_data, request_time))
 
 
 
 
 
 
 
 
 
 
 
 
319
 
320
  return {
321
  'task_id': task_id,
 
341
  def get_queue_status():
342
  """Get queue status"""
343
  with lock:
344
+ queued_tasks = [t for t in task_status.values() if t['status'] == 'queued']
345
+ processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']
 
 
 
 
346
 
347
+ queue_size = len(queued_tasks) # Use actual count of queued tasks
348
+ active_tasks = len(processing_tasks)
349
+ waiting_tasks = queue_size
 
 
 
 
 
350
 
 
351
  remaining_processing_time = 0
352
  for task in processing_tasks:
353
  if 'start_time' in task and 'estimated_time' in task:
 
367
  else:
368
  queued_processing_time += 5
369
 
 
 
 
370
  estimated_wait = remaining_processing_time + queued_processing_time
371
 
372
  if task_history:
 
388
 
389
  return {
390
  'queue_size': queue_size,
391
+ 'active_tasks': active_tasks,
392
  'waiting_tasks': waiting_tasks,
393
  'worker_threads': worker_threads,
394
  'estimated_wait': estimated_wait,
 
412
  """Get queue info for UI"""
413
  queue_info = get_queue_status()
414
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
415
  tasks_html = ""
416
  for task in reversed(queue_info['recent_tasks']):
417
  tasks_html += f"""
 
429
  </tr>
430
  """
431
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
432
  return f"""
433
  <div class="dashboard">
434
  <div class="queue-info-card main-card">
 
449
  </div>
450
 
451
  <div class="wait-time">
452
+ <p><b>Current Estimated Wait Time:</b> {format_time(queue_info['estimated_wait'])}</p>
 
 
 
453
  <p class="last-update"><small>Last update: {datetime.now().strftime('%H:%M:%S')}</small></p>
454
  </div>
455
  </div>
 
634
  button.primary:hover {
635
  background-color: #3367d6;
636
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
637
  """
638
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
639
  # Initialize and launch worker threads
640
  launch_workers()
641
 
 
654
  with gr.Row(visible=False):
655
  api_input = gr.JSON()
656
  api_output = gr.JSON()
 
 
 
 
 
 
657
 
658
  # Define update function
659
  def update_queue_info():
660
  return ui_get_queue_info()
661
 
662
+ # Update queue info periodically
663
+ demo.load(update_queue_info, None, queue_info_html, every=3)
664
 
665
  # Refresh button event
666
  refresh_queue_btn.click(update_queue_info, None, queue_info_html)
667
 
 
 
 
668
  # Add evaluation endpoint compatible with original interface
669
+ demo.queue()
670
+ evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=api_input, outputs=api_output, api_name="evaluate")
 
 
 
 
 
671
 
672
  if __name__ == "__main__":
 
673
  try:
674
+ demo.launch()
 
675
  finally:
676
  # Stop worker threads
677
+ running = False