3v324v23 commited on
Commit
55c9531
·
1 Parent(s): 0b56f9f
Files changed (1) hide show
  1. app.py +171 -106
app.py CHANGED
@@ -42,67 +42,80 @@ def queue_processor():
42
  """Process tasks in the queue"""
43
  while running:
44
  try:
45
- task_id, input_data, request_time = task_queue.get(timeout=0.1)
46
- with lock:
47
- task_status[task_id]['status'] = 'processing'
48
- task_status[task_id]['start_time'] = time.time()
49
-
50
- if isinstance(input_data, list) and len(input_data) > 0:
51
- sample_task = input_data[0]
52
- language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
53
- task_size = len(input_data)
54
- task_complexity = _estimate_task_complexity(input_data)
55
 
 
56
  with lock:
57
- task_status[task_id]['estimated_factors'] = {
58
- 'language': language,
59
- 'size': task_size,
60
- 'complexity': task_complexity
61
- }
62
-
63
- result = evaluate(input_data)
64
-
65
- end_time = time.time()
66
- process_time = end_time - task_status[task_id]['start_time']
67
-
68
- with lock:
69
- task_status[task_id]['status'] = 'completed'
70
- task_status[task_id]['result'] = result
71
- task_status[task_id]['end_time'] = end_time
72
- task_status[task_id]['process_time'] = process_time
73
 
74
- if 'estimated_factors' in task_status[task_id]:
75
- factors = task_status[task_id]['estimated_factors']
76
- key = f"{factors['language']}_{factors['complexity']}"
77
-
78
- if key not in task_type_times:
79
- task_type_times[key] = []
80
 
81
- task_type_times[key].append(process_time / factors['size'])
82
- if len(task_type_times[key]) > 10:
83
- task_type_times[key] = task_type_times[key][-10:]
 
 
 
 
 
 
 
 
84
 
85
- task_history.append({
86
- 'task_id': task_id,
87
- 'request_time': request_time,
88
- 'process_time': process_time,
89
- 'status': 'completed',
90
- 'factors': task_status[task_id].get('estimated_factors', {})
91
- })
92
- while len(task_history) > 200:
93
- task_history.pop(0)
94
-
95
- task_queue.task_done()
96
-
97
- except queue.Empty:
98
- continue
99
- except Exception as e:
100
- if 'task_id' in locals():
101
  with lock:
102
- task_status[task_id]['status'] = 'error'
103
- task_status[task_id]['error'] = str(e)
104
- task_status[task_id]['end_time'] = time.time()
105
- task_queue.task_done()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
106
 
107
  def _estimate_task_complexity(tasks):
108
  """Estimate task complexity
@@ -214,55 +227,75 @@ def evaluate_code(code, language):
214
 
215
  def synchronous_evaluate(input_data):
216
  """Synchronously evaluate code, compatible with original interface"""
217
- if isinstance(input_data, list) and len(input_data) > 0:
218
- sample_task = input_data[0]
219
- language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
220
- task_size = len(input_data)
221
- task_complexity = _estimate_task_complexity(input_data)
222
- else:
223
- language = 'unknown'
224
- task_size = 1
225
- task_complexity = 'medium'
226
-
227
- estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
228
- estimated_total_time = estimated_time_per_task * task_size
229
-
230
- queue_info = get_queue_status()
231
- waiting_tasks = queue_info['waiting_tasks']
232
-
233
- task_id = str(uuid.uuid4())
234
- request_time = time.time()
235
-
236
- with lock:
237
- task_status[task_id] = {
238
- 'status': 'queued',
239
- 'queued_time': request_time,
240
- 'queue_position': task_queue.qsize() + 1,
241
- 'synchronous': True,
242
- 'estimated_factors': {
243
- 'language': language,
244
- 'size': task_size,
245
- 'complexity': task_complexity
246
- },
247
- 'estimated_time': estimated_total_time
248
- }
249
-
250
- task_queue.put((task_id, input_data, request_time))
251
-
252
- while True:
253
  with lock:
254
- if task_id in task_status:
255
- status = task_status[task_id]['status']
256
- if status == 'completed':
257
- result = task_status[task_id]['result']
258
- task_status.pop(task_id, None)
259
- return result
260
- elif status == 'error':
261
- error = task_status[task_id].get('error', 'Unknown error')
262
- task_status.pop(task_id, None)
263
- return {"status": "Exception", "error": error}
 
 
 
 
264
 
265
- time.sleep(0.1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
266
 
267
  def _get_estimated_time_for_task(language, complexity):
268
  """Get estimated processing time for a specific task type"""
@@ -647,7 +680,7 @@ with gr.Blocks(css=custom_css) as demo:
647
  with gr.Row():
648
  with gr.Column(scale=3):
649
  # Queue status info card
650
- queue_info_html = gr.HTML()
651
  refresh_queue_btn = gr.Button("Refresh Queue Status", variant="primary")
652
 
653
  # Hidden API interface components
@@ -657,10 +690,34 @@ with gr.Blocks(css=custom_css) as demo:
657
 
658
  # Define update function
659
  def update_queue_info():
660
- return ui_get_queue_info()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
661
 
662
  # Update queue info periodically
663
- demo.load(update_queue_info, None, queue_info_html, every=3)
664
 
665
  # Refresh button event
666
  refresh_queue_btn.click(update_queue_info, None, queue_info_html)
@@ -671,7 +728,15 @@ with gr.Blocks(css=custom_css) as demo:
671
 
672
  if __name__ == "__main__":
673
  try:
674
- demo.launch()
 
 
 
 
 
 
 
 
675
  finally:
676
  # Stop worker threads
677
  running = False
 
42
  """Process tasks in the queue"""
43
  while running:
44
  try:
45
+ # Use get with a timeout to prevent busy waiting
46
+ try:
47
+ task_id, input_data, request_time = task_queue.get(timeout=0.5)
48
+ except queue.Empty:
49
+ continue
 
 
 
 
 
50
 
51
+ try:
52
  with lock:
53
+ if task_id not in task_status:
54
+ # Task was cancelled or removed
55
+ task_queue.task_done()
56
+ continue
57
+
58
+ task_status[task_id]['status'] = 'processing'
59
+ task_status[task_id]['start_time'] = time.time()
 
 
 
 
 
 
 
 
 
60
 
61
+ if isinstance(input_data, list) and len(input_data) > 0:
62
+ sample_task = input_data[0]
63
+ language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
64
+ task_size = len(input_data)
65
+ task_complexity = _estimate_task_complexity(input_data)
 
66
 
67
+ with lock:
68
+ task_status[task_id]['estimated_factors'] = {
69
+ 'language': language,
70
+ 'size': task_size,
71
+ 'complexity': task_complexity
72
+ }
73
+
74
+ result = evaluate(input_data)
75
+
76
+ end_time = time.time()
77
+ process_time = end_time - task_status[task_id]['start_time']
78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  with lock:
80
+ if task_id in task_status: # Check if task still exists
81
+ task_status[task_id]['status'] = 'completed'
82
+ task_status[task_id]['result'] = result
83
+ task_status[task_id]['end_time'] = end_time
84
+ task_status[task_id]['process_time'] = process_time
85
+
86
+ if 'estimated_factors' in task_status[task_id]:
87
+ factors = task_status[task_id]['estimated_factors']
88
+ key = f"{factors['language']}_{factors['complexity']}"
89
+
90
+ if key not in task_type_times:
91
+ task_type_times[key] = []
92
+
93
+ task_type_times[key].append(process_time / factors['size'])
94
+ if len(task_type_times[key]) > 10:
95
+ task_type_times[key] = task_type_times[key][-10:]
96
+
97
+ task_history.append({
98
+ 'task_id': task_id,
99
+ 'request_time': request_time,
100
+ 'process_time': process_time,
101
+ 'status': 'completed',
102
+ 'factors': task_status[task_id].get('estimated_factors', {})
103
+ })
104
+ while len(task_history) > 200:
105
+ task_history.pop(0)
106
+ except Exception as e:
107
+ print(f"Error processing task {task_id}: {str(e)}")
108
+ with lock:
109
+ if task_id in task_status:
110
+ task_status[task_id]['status'] = 'error'
111
+ task_status[task_id]['error'] = str(e)
112
+ task_status[task_id]['end_time'] = time.time()
113
+ finally:
114
+ task_queue.task_done()
115
+
116
+ except Exception as e:
117
+ print(f"Critical error in queue processor: {str(e)}")
118
+ time.sleep(1) # Avoid tight loop in case of persistent errors
119
 
120
  def _estimate_task_complexity(tasks):
121
  """Estimate task complexity
 
227
 
228
  def synchronous_evaluate(input_data):
229
  """Synchronously evaluate code, compatible with original interface"""
230
+ try:
231
+ if isinstance(input_data, list) and len(input_data) > 0:
232
+ sample_task = input_data[0]
233
+ language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
234
+ task_size = len(input_data)
235
+ task_complexity = _estimate_task_complexity(input_data)
236
+ else:
237
+ language = 'unknown'
238
+ task_size = 1
239
+ task_complexity = 'medium'
240
+
241
+ estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
242
+ estimated_total_time = estimated_time_per_task * task_size
243
+
244
+ queue_info = get_queue_status()
245
+ waiting_tasks = queue_info['waiting_tasks']
246
+
247
+ task_id = str(uuid.uuid4())
248
+ request_time = time.time()
249
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
250
  with lock:
251
+ task_status[task_id] = {
252
+ 'status': 'queued',
253
+ 'queued_time': request_time,
254
+ 'queue_position': task_queue.qsize() + 1,
255
+ 'synchronous': True,
256
+ 'estimated_factors': {
257
+ 'language': language,
258
+ 'size': task_size,
259
+ 'complexity': task_complexity
260
+ },
261
+ 'estimated_time': estimated_total_time
262
+ }
263
+
264
+ task_queue.put((task_id, input_data, request_time))
265
 
266
+ # Set a reasonable timeout to avoid hanging
267
+ max_wait_time = max(300, estimated_total_time * 2) # At least 5 minutes or double the estimated time
268
+ start_wait = time.time()
269
+
270
+ while True:
271
+ with lock:
272
+ if task_id in task_status:
273
+ status = task_status[task_id]['status']
274
+ if status == 'completed':
275
+ result = task_status[task_id]['result']
276
+ task_status.pop(task_id, None)
277
+ return result
278
+ elif status == 'error':
279
+ error = task_status[task_id].get('error', 'Unknown error')
280
+ task_status.pop(task_id, None)
281
+ return {"status": "Exception", "error": error}
282
+ else:
283
+ # Task somehow disappeared
284
+ return {"status": "Exception", "error": "Task was lost during processing"}
285
+
286
+ # Check if we've waited too long
287
+ if time.time() - start_wait > max_wait_time:
288
+ with lock:
289
+ if task_id in task_status:
290
+ task_status[task_id]['status'] = 'error'
291
+ task_status[task_id]['error'] = f"Task timed out after {max_wait_time} seconds"
292
+ return {"status": "Exception", "error": f"Evaluation timed out after {max_wait_time} seconds"}
293
+
294
+ time.sleep(0.1)
295
+
296
+ except Exception as e:
297
+ print(f"Error in synchronous_evaluate: {str(e)}")
298
+ return {"status": "Exception", "error": str(e)}
299
 
300
  def _get_estimated_time_for_task(language, complexity):
301
  """Get estimated processing time for a specific task type"""
 
680
  with gr.Row():
681
  with gr.Column(scale=3):
682
  # Queue status info card
683
+ queue_info_html = gr.HTML(update_queue_info())
684
  refresh_queue_btn = gr.Button("Refresh Queue Status", variant="primary")
685
 
686
  # Hidden API interface components
 
690
 
691
  # Define update function
692
  def update_queue_info():
693
+ try:
694
+ info = ui_get_queue_info()
695
+ return info
696
+ except Exception as e:
697
+ print(f"Error updating queue info: {str(e)}")
698
+ return f"""
699
+ <div class="dashboard">
700
+ <div class="queue-info-card main-card">
701
+ <h3 class="card-title">Queue Status Monitor</h3>
702
+ <div class="queue-stats">
703
+ <p>Error refreshing queue status. Click refresh to try again.</p>
704
+ </div>
705
+ </div>
706
+ </div>
707
+ """
708
+
709
+ # Set up periodic refresh with error handling
710
+ refresh_interval = 3
711
+
712
+ def safe_refresh():
713
+ try:
714
+ return update_queue_info()
715
+ except Exception as e:
716
+ print(f"Safe refresh error: {str(e)}")
717
+ return queue_info_html.value # Keep existing value on error
718
 
719
  # Update queue info periodically
720
+ demo.load(safe_refresh, None, queue_info_html, every=refresh_interval)
721
 
722
  # Refresh button event
723
  refresh_queue_btn.click(update_queue_info, None, queue_info_html)
 
728
 
729
  if __name__ == "__main__":
730
  try:
731
+ # Use configuration suitable for Hugging Face Spaces
732
+ demo.launch(
733
+ server_name="0.0.0.0", # Listen on all network interfaces
734
+ share=False, # No need for sharing link in Spaces
735
+ show_error=True, # Show detailed error messages
736
+ debug=True, # Enable debug mode for better error reporting
737
+ max_threads=10, # Limit number of threads to avoid resource contention
738
+ quiet=False # Log to console for debugging
739
+ )
740
  finally:
741
  # Stop worker threads
742
  running = False