3v324v23 committed
Commit 0985e79 · 1 Parent(s): 700777f
Files changed (1)
  1. app.py +119 -29
app.py CHANGED
@@ -69,10 +69,17 @@ def queue_processor():
             queue_size = task_queue.qsize()
             if queue_size > 0:
                 debug_log(f"Queue processor found {queue_size} tasks waiting")
+            else:
+                # No tasks waiting, sleep briefly to avoid CPU spinning
+                time.sleep(0.1)
+                continue

             # Get a task from the queue with small timeout to prevent blocking
-            task_id, input_data, request_time = task_queue.get(timeout=0.1)
-            debug_log(f"Processing task {task_id}")
+            try:
+                task_id, input_data, request_time = task_queue.get(timeout=0.1)
+                debug_log(f"Processing task {task_id}")
+            except queue.Empty:
+                continue

             # Increment processing count to track active tasks
             with lock:
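For reference, a minimal standalone sketch of the polling pattern this hunk adopts: sleep briefly when the queue is idle, and treat a timed-out get() as the normal idle signal rather than an error. The names below are illustrative, not taken from app.py:

import queue
import threading
import time

task_queue = queue.Queue()
running = True

def worker():
    while running:
        try:
            # Block for at most 0.1 s; queue.Empty is the normal idle case.
            task = task_queue.get(timeout=0.1)
        except queue.Empty:
            time.sleep(0.1)  # brief sleep so an empty queue does not busy-spin
            continue
        try:
            print(f"processing {task}")
        finally:
            task_queue.task_done()

threading.Thread(target=worker, daemon=True).start()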
@@ -150,7 +157,8 @@ def queue_processor():
                 debug_log(f"Task {task_id} completed and marked as done")

         except queue.Empty:
-            continue
+            # Use a small timeout to avoid CPU spinning
+            time.sleep(0.1)
         except Exception as e:
             debug_log(f"Error in queue processor: {str(e)}")
             if 'task_id' in locals():
@@ -285,11 +293,20 @@ def synchronous_evaluate(input_data):
     """Synchronously evaluate code, compatible with original interface"""
     debug_log(f"Received synchronous evaluation request")

+    # Add metadata to identify sync requests
+    if isinstance(input_data, list) and len(input_data) > 0 and isinstance(input_data[0], dict):
+        if 'metadata' not in input_data[0]:
+            input_data[0]['metadata'] = {}
+        input_data[0]['metadata']['source'] = 'sync_api'
+
     # Create a task and queue it
     task_info = enqueue_task(input_data)
     task_id = task_info['task_id']
     debug_log(f"Created task {task_id} for synchronous evaluation")

+    # Ensure the task appears in the queue UI, add artificial delay if needed
+    time.sleep(0.1)  # Small delay to make sure the task is visible in queue
+
     # Wait for task to complete
     while True:
         with lock:
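synchronous_evaluate then blocks in an unbounded while True poll. A hedged sketch of the same wait with an overall timeout; task_status, lock, and the polling idea come from app.py, while the wrapper, its parameters, and the terminal status names ('completed', 'error') are assumptions for illustration:

import time

def wait_for_task(task_id, timeout=300.0, poll_interval=0.5):
    """Poll task_status until the task reaches a terminal state or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        with lock:
            info = task_status.get(task_id, {})
        if info.get('status') in ('completed', 'error'):  # assumed terminal states
            return info
        time.sleep(poll_interval)
    return {'task_id': task_id, 'status': 'timeout'}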
@@ -349,22 +366,36 @@ def enqueue_task(input_data):
     estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
     estimated_total_time = estimated_time_per_task * task_size

-    task_counter += 1
-    task_id = f"task_{task_counter}_{str(uuid.uuid4())}"
+    # Generate task ID in a thread-safe way
+    with lock:
+        task_counter += 1
+        local_counter = task_counter
+
+    task_id = f"task_{local_counter}_{str(uuid.uuid4())[:8]}"
     request_time = time.time()

     debug_log(f"Creating new task: {task_id}")

-    # Get current queue status before adding to task_status
-    current_queue_size = task_queue.qsize()
-    debug_log(f"Current queue size before adding: {current_queue_size}")
+    # Track if this is a synchronous or asynchronous submission
+    is_async = 'async_submission' in str(threading.current_thread().name).lower() or 'async' in input_data[0].get('metadata', {}).get('source', '') if isinstance(input_data, list) and input_data and isinstance(input_data[0], dict) and 'metadata' in input_data[0] else False

-    # Add to task_status with 'queued' status first
+    # Get current queue status before adding to task_status
     with lock:
+        # Count actual queue status - both in queue AND waiting in task_status
+        current_queue_size = task_queue.qsize()
+        actual_waiting = sum(1 for t in task_status.values() if t['status'] == 'queued')
+        total_waiting = actual_waiting  # Use the actual count from task_status
+
+        debug_log(f"Current queue metrics: queue_size={current_queue_size}, task_status_waiting={actual_waiting}, total={total_waiting}")
+
+        queue_position = total_waiting + 1
+
+        # Add to task_status with 'queued' status first
         task_status[task_id] = {
             'status': 'queued',
             'queued_time': request_time,
+            'queue_position': queue_position,
+            'is_async': is_async,
             'estimated_factors': {
                 'language': language,
                 'size': task_size,
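The ID change above guards task_counter with the shared lock because += on a global is a read-modify-write that two request threads can interleave. A small self-contained sketch of the same pattern; the helper and its names are illustrative, not part of app.py:

import threading
import uuid

_counter_lock = threading.Lock()
_counter = 0

def next_task_id():
    """Return a unique, monotonically numbered task id."""
    global _counter
    with _counter_lock:  # serialize the read-modify-write on the counter
        _counter += 1
        n = _counter
    return f"task_{n}_{uuid.uuid4().hex[:8]}"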
@@ -372,7 +403,7 @@ def enqueue_task(input_data):
             },
             'estimated_time': estimated_total_time
         }
-    debug_log(f"Added task {task_id} to task_status with queue position {current_queue_size + 1}")
+    debug_log(f"Added task {task_id} to task_status with queue position {queue_position}")

     # Get queue info for wait time estimation
     queue_info = get_queue_status()
@@ -383,14 +414,16 @@ def enqueue_task(input_data):
     task_queue.put((task_id, input_data, request_time))
     debug_log(f"Added task {task_id} to task_queue")

-    # Double-check queue status after adding
-    new_queue_size = task_queue.qsize()
-    debug_log(f"New queue size after adding: {new_queue_size}")
-
-    # Count queued tasks in task_status
+    # Count queued tasks in task_status after adding
     with lock:
         queued_count = sum(1 for t in task_status.values() if t['status'] == 'queued')
-        debug_log(f"Total tasks with 'queued' status: {queued_count}")
+        processing_tasks = sum(1 for t in task_status.values() if t['status'] == 'processing')
+        debug_log(f"Queue status after adding: {task_queue.qsize()} in queue, {queued_count} with 'queued' status, {processing_tasks} processing")
+
+        # Display all task IDs currently in queue
+        task_ids = [(k, v['status']) for k, v in task_status.items() if v['status'] in ('queued', 'processing')]
+        if task_ids:
+            debug_log(f"Current tasks: {task_ids}")

     return {
         'task_id': task_id,
@@ -416,19 +449,23 @@ def check_status(task_id):
 def get_queue_status():
     """Get queue status"""
     with lock:
-        queued_tasks = [t for t in task_status.values() if t['status'] == 'queued']
-        processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']
+        queued_tasks = [v for k, v in task_status.items() if v['status'] == 'queued']
+        processing_tasks = [v for k, v in task_status.items() if v['status'] == 'processing']

         queue_size = task_queue.qsize()
         active_tasks = processing_count
         waiting_tasks = len(queued_tasks)

-        debug_log(f"Queue status: size={queue_size}, active={active_tasks}, waiting={waiting_tasks}")
+        debug_log(f"Queue status check: size={queue_size}, active={active_tasks}, waiting={waiting_tasks}")
+        if waiting_tasks != queue_size and abs(waiting_tasks - queue_size) > 1:
+            debug_log(f"WARNING: Queue size mismatch - task_queue has {queue_size} items but task_status has {waiting_tasks} queued items")
+
         debug_log(f"Queue status details: {len(queued_tasks)} queued tasks found in task_status")
         if queued_tasks:
-            task_ids = [t.get('task_id', 'unknown') for t in queued_tasks if 'task_id' in t]
+            task_ids = [k for k, v in task_status.items() if v['status'] == 'queued']
             debug_log(f"Queued task IDs: {task_ids}")

+        # Calculate remaining processing time for active tasks
         remaining_processing_time = 0
         for task in processing_tasks:
             if 'start_time' in task and 'estimated_time' in task:
@@ -472,7 +509,7 @@ def get_queue_status():

     return {
         'queue_size': queue_size,
-        'active_tasks': active_tasks,
+        'active_tasks': active_tasks,
         'waiting_tasks': waiting_tasks,
         'worker_threads': worker_threads,
         'estimated_wait': estimated_wait,
@@ -496,16 +533,26 @@ def ui_get_queue_info():
     """Get queue info for UI"""
     queue_info = get_queue_status()

-    # List queued tasks with details
+    # List queued tasks with details - make sure to use task_id as key
     queued_tasks_html = ""
     with lock:
-        queued_tasks = [t for t in task_status.values() if t['status'] == 'queued']
+        queued_tasks = []
+        for task_id, task in task_status.items():
+            if task['status'] == 'queued':
+                task_with_id = task.copy()
+                task_with_id['task_id'] = task_id
+                queued_tasks.append(task_with_id)
+
         if queued_tasks:
+            # Sort by queue position
+            queued_tasks.sort(key=lambda x: x.get('queue_position', 999999))
             queued_tasks_html = "<div class='queued-tasks'><h4>Tasks in Queue:</h4><ul>"
-            for idx, task in enumerate(sorted(queued_tasks, key=lambda x: x.get('queued_time', 0))):
-                task_id = next((k for k, v in task_status.items() if v == task), "unknown")
+            for idx, task in enumerate(queued_tasks):
+                task_id = task['task_id']
                 queued_time = datetime.fromtimestamp(task.get('queued_time', 0)).strftime('%H:%M:%S')
-                queued_tasks_html += f"<li>Task {task_id[:8]}... - Queued at {queued_time} - Position {idx+1}</li>"
+                source = "async" if task.get('is_async', False) else "sync"
+                time_in_queue = time.time() - task.get('queued_time', time.time())
+                queued_tasks_html += f"<li>Task {task_id[:8]}... - Queued at {queued_time} ({time_in_queue:.1f}s ago) - Position {idx+1} ({source})</li>"
             queued_tasks_html += "</ul></div>"

     tasks_html = ""
@@ -538,12 +585,33 @@ def ui_get_queue_info():

     processing_details = ""
     if queue_info['active_tasks'] > 0:
+        # Display which tasks are being processed
+        processing_tasks_html = ""
+        with lock:
+            processing_task_ids = [k for k, v in task_status.items() if v['status'] == 'processing']
+            if processing_task_ids:
+                processing_tasks_html = "<ul>"
+                for task_id in processing_task_ids:
+                    task = task_status[task_id]
+                    start_time = datetime.fromtimestamp(task.get('start_time', 0)).strftime('%H:%M:%S')
+                    time_processing = time.time() - task.get('start_time', time.time())
+                    processing_tasks_html += f"<li>Task {task_id[:8]}... - Started at {start_time} ({time_processing:.1f}s ago)</li>"
+                processing_tasks_html += "</ul>"
+
         processing_details = f"""
         <div class="alert alert-warning">
             <p><strong>Currently {queue_info['active_tasks']} tasks being processed</strong></p>
+            {processing_tasks_html}
         </div>
         """

+    # Add debug info
+    debug_details = f"""
+    <div class="debug-info">
+        <p><small>Queue: {queue_info['queue_size']} in queue, {queue_info['waiting_tasks']} waiting, {queue_info['active_tasks']} processing</small></p>
+    </div>
+    """
+
     return f"""
     <div class="dashboard">
         <div class="queue-info-card main-card">
@@ -567,6 +635,7 @@ def ui_get_queue_info():
             <p><b>Estimated Wait Time:</b> {format_time(queue_info['estimated_wait'])}</p>
             {queue_details}
             {processing_details}
+            {debug_details}
             <p class="last-update"><small>Last update: {datetime.now().strftime('%H:%M:%S')}</small></p>
         </div>
     </div>
@@ -788,6 +857,26 @@ button.primary:hover {
 }
 """

+def async_enqueue(input_data):
+    """Async version of enqueue_task - specifically for async API calls"""
+    # Add metadata to identify async requests
+    if isinstance(input_data, list) and len(input_data) > 0 and isinstance(input_data[0], dict):
+        if 'metadata' not in input_data[0]:
+            input_data[0]['metadata'] = {}
+        input_data[0]['metadata']['source'] = 'async_api'
+
+    # Just call enqueue_task but set thread name to identify as async
+    current_thread = threading.current_thread()
+    original_name = current_thread.name
+    current_thread.name = f"async_submission_{original_name}"
+
+    result = enqueue_task(input_data)
+
+    # Reset thread name
+    current_thread.name = original_name
+
+    return result
+
 # Initialize and launch worker threads
 launch_workers()
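The metadata tagging in async_enqueue (and in synchronous_evaluate above) assumes the payload is a list whose first element is a dict. A hedged usage sketch; the field names are hypothetical placeholders, since the real payload schema is not shown in this commit, and only the 'task_id' key of the returned dict appears in the diff:

payload = [{
    "language": "python",            # hypothetical field
    "code": "print('hello world')",  # hypothetical field
}]

result = async_enqueue(payload)      # tags payload[0]['metadata']['source'] = 'async_api'
print(result['task_id'])             # poll check_status(task_id) until it reports completion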
@@ -830,7 +919,7 @@ with gr.Blocks(css=custom_css) as demo:
     evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=api_input, outputs=api_output, api_name="evaluate", concurrency_limit=1)

     # Add async evaluation endpoint
-    enqueue_endpoint = demo.load(fn=enqueue_task, inputs=async_api_input, outputs=async_api_output, api_name="enqueue", concurrency_limit=1)
+    enqueue_endpoint = demo.load(fn=async_enqueue, inputs=async_api_input, outputs=async_api_output, api_name="enqueue", concurrency_limit=1)

     # Add status check endpoint
     status_endpoint = demo.load(fn=check_status, inputs=status_check_input, outputs=status_check_output, api_name="status", concurrency_limit=1)
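With api_name set on these endpoints, external callers can reach them through Gradio's API. A sketch using gradio_client, assuming the app runs locally and that the input and output components are JSON-like so values round-trip as Python objects; the URL and payload here are placeholders:

from gradio_client import Client

client = Client("http://127.0.0.1:7860")  # placeholder URL

# Submit asynchronously, then poll the status endpoint with the returned task_id.
submission = client.predict([{"code": "print(1)"}], api_name="/enqueue")
status = client.predict(submission["task_id"], api_name="/status")
print(status)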
@@ -838,7 +927,8 @@
 if __name__ == "__main__":
     debug_log("Starting application")
     try:
-        demo.launch()
+        # Set max_threads for overall concurrency
+        demo.launch(max_threads=100)
     finally:
         # Stop worker threads
         running = False
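The finally block only flips the running flag that the queue_processor loops check before each iteration. A hedged sketch of how such a shutdown is typically completed, assuming the worker threads are kept in a list; the list and the helper below are illustrative, not shown in this diff:

def stop_workers(worker_thread_list):
    """Signal workers to exit and wait briefly for them to finish."""
    global running
    running = False               # queue_processor loops observe this flag and return
    for t in worker_thread_list:  # hypothetical registry of worker Thread objects
        t.join(timeout=5)         # give each worker a moment to finish its current task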
 