朱东升 commited on
Commit
307f223
·
1 Parent(s): 1046fcc
Files changed (1) hide show
  1. app.py +49 -8
app.py CHANGED
@@ -44,8 +44,16 @@ def queue_processor():
44
  try:
45
  task_id, input_data, request_time = task_queue.get(timeout=0.1)
46
  with lock:
47
- task_status[task_id]['status'] = 'processing'
48
- task_status[task_id]['start_time'] = time.time()
 
 
 
 
 
 
 
 
49
 
50
  if isinstance(input_data, list) and len(input_data) > 0:
51
  sample_task = input_data[0]
@@ -99,9 +107,16 @@ def queue_processor():
99
  except Exception as e:
100
  if 'task_id' in locals():
101
  with lock:
102
- task_status[task_id]['status'] = 'error'
103
- task_status[task_id]['error'] = str(e)
104
- task_status[task_id]['end_time'] = time.time()
 
 
 
 
 
 
 
105
  task_queue.task_done()
106
 
107
  def _estimate_task_complexity(tasks):
@@ -233,6 +248,7 @@ def synchronous_evaluate(input_data):
233
  task_id = str(uuid.uuid4())
234
  request_time = time.time()
235
 
 
236
  with lock:
237
  task_status[task_id] = {
238
  'status': 'queued',
@@ -247,19 +263,32 @@ def synchronous_evaluate(input_data):
247
  'estimated_time': estimated_total_time
248
  }
249
 
 
 
 
 
250
  task_queue.put((task_id, input_data, request_time))
251
 
 
252
  while True:
253
  with lock:
254
  if task_id in task_status:
255
  status = task_status[task_id]['status']
256
  if status == 'completed':
257
  result = task_status[task_id]['result']
258
- task_status.pop(task_id, None)
 
 
 
 
259
  return result
260
  elif status == 'error':
261
  error = task_status[task_id].get('error', 'Unknown error')
262
- task_status.pop(task_id, None)
 
 
 
 
263
  return {"status": "Exception", "error": error}
264
 
265
  time.sleep(0.1)
@@ -654,13 +683,19 @@ with gr.Blocks(css=custom_css) as demo:
654
  with gr.Row(visible=False):
655
  api_input = gr.JSON()
656
  api_output = gr.JSON()
 
 
 
 
 
 
657
 
658
  # Define update function
659
  def update_queue_info():
660
  return ui_get_queue_info()
661
 
662
  # Update queue info periodically
663
- demo.load(update_queue_info, None, queue_info_html, every=3)
664
 
665
  # Refresh button event
666
  refresh_queue_btn.click(update_queue_info, None, queue_info_html)
@@ -668,6 +703,12 @@ with gr.Blocks(css=custom_css) as demo:
668
  # Add evaluation endpoint compatible with original interface
669
  demo.queue()
670
  evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=api_input, outputs=api_output, api_name="evaluate")
 
 
 
 
 
 
671
 
672
  if __name__ == "__main__":
673
  try:
 
44
  try:
45
  task_id, input_data, request_time = task_queue.get(timeout=0.1)
46
  with lock:
47
+ if task_id in task_status:
48
+ task_status[task_id]['status'] = 'processing'
49
+ task_status[task_id]['start_time'] = time.time()
50
+ else:
51
+ # Create task status entry if it doesn't exist
52
+ task_status[task_id] = {
53
+ 'status': 'processing',
54
+ 'queued_time': request_time,
55
+ 'start_time': time.time()
56
+ }
57
 
58
  if isinstance(input_data, list) and len(input_data) > 0:
59
  sample_task = input_data[0]
 
107
  except Exception as e:
108
  if 'task_id' in locals():
109
  with lock:
110
+ if task_id in task_status:
111
+ task_status[task_id]['status'] = 'error'
112
+ task_status[task_id]['error'] = str(e)
113
+ task_status[task_id]['end_time'] = time.time()
114
+ else:
115
+ task_status[task_id] = {
116
+ 'status': 'error',
117
+ 'error': str(e),
118
+ 'end_time': time.time()
119
+ }
120
  task_queue.task_done()
121
 
122
  def _estimate_task_complexity(tasks):
 
248
  task_id = str(uuid.uuid4())
249
  request_time = time.time()
250
 
251
+ # Add task to queue and update UI first
252
  with lock:
253
  task_status[task_id] = {
254
  'status': 'queued',
 
263
  'estimated_time': estimated_total_time
264
  }
265
 
266
+ # Show task in UI for at least 1 second to ensure visibility
267
+ time.sleep(1)
268
+
269
+ # Add to queue for processing
270
  task_queue.put((task_id, input_data, request_time))
271
 
272
+ # Wait for task to complete
273
  while True:
274
  with lock:
275
  if task_id in task_status:
276
  status = task_status[task_id]['status']
277
  if status == 'completed':
278
  result = task_status[task_id]['result']
279
+ # Keep the result in status for a short time to ensure it shows in history
280
+ if 'end_time' not in task_status[task_id]:
281
+ task_status[task_id]['end_time'] = time.time()
282
+ elif time.time() - task_status[task_id]['end_time'] > 5:
283
+ task_status.pop(task_id, None)
284
  return result
285
  elif status == 'error':
286
  error = task_status[task_id].get('error', 'Unknown error')
287
+ # Keep the error in status for a short time to ensure it shows in history
288
+ if 'end_time' not in task_status[task_id]:
289
+ task_status[task_id]['end_time'] = time.time()
290
+ elif time.time() - task_status[task_id]['end_time'] > 5:
291
+ task_status.pop(task_id, None)
292
  return {"status": "Exception", "error": error}
293
 
294
  time.sleep(0.1)
 
683
  with gr.Row(visible=False):
684
  api_input = gr.JSON()
685
  api_output = gr.JSON()
686
+
687
+ async_api_input = gr.JSON()
688
+ async_api_output = gr.JSON()
689
+
690
+ status_check_input = gr.Textbox()
691
+ status_check_output = gr.JSON()
692
 
693
  # Define update function
694
  def update_queue_info():
695
  return ui_get_queue_info()
696
 
697
  # Update queue info periodically
698
+ demo.load(update_queue_info, None, queue_info_html, every=1)
699
 
700
  # Refresh button event
701
  refresh_queue_btn.click(update_queue_info, None, queue_info_html)
 
703
  # Add evaluation endpoint compatible with original interface
704
  demo.queue()
705
  evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=api_input, outputs=api_output, api_name="evaluate")
706
+
707
+ # Add async evaluation endpoint
708
+ enqueue_endpoint = demo.load(fn=enqueue_task, inputs=async_api_input, outputs=async_api_output, api_name="enqueue")
709
+
710
+ # Add status check endpoint
711
+ status_endpoint = demo.load(fn=check_status, inputs=status_check_input, outputs=status_check_output, api_name="status")
712
 
713
  if __name__ == "__main__":
714
  try: