朱东升 committed on
Commit
6f98bd6
·
1 Parent(s): 307f223
Files changed (1) hide show
  1. app.py +31 -40
app.py CHANGED
@@ -37,13 +37,30 @@ worker_threads = max(1, multiprocessing.cpu_count() // 2) # Using half the avai
37
  running = True
38
  # Mapping from task type to processing time
39
  task_type_times = {}
 
 
40
 
41
  def queue_processor():
42
  """Process tasks in the queue"""
 
 
43
  while running:
44
  try:
 
 
 
 
 
 
 
 
45
  task_id, input_data, request_time = task_queue.get(timeout=0.1)
 
 
46
  with lock:
 
 
 
47
  if task_id in task_status:
48
  task_status[task_id]['status'] = 'processing'
49
  task_status[task_id]['start_time'] = time.time()
@@ -74,6 +91,10 @@ def queue_processor():
74
  process_time = end_time - task_status[task_id]['start_time']
75
 
76
  with lock:
 
 
 
 
77
  task_status[task_id]['status'] = 'completed'
78
  task_status[task_id]['result'] = result
79
  task_status[task_id]['end_time'] = end_time
@@ -107,6 +128,9 @@ def queue_processor():
107
  except Exception as e:
108
  if 'task_id' in locals():
109
  with lock:
 
 
 
110
  if task_id in task_status:
111
  task_status[task_id]['status'] = 'error'
112
  task_status[task_id]['error'] = str(e)
@@ -229,45 +253,9 @@ def evaluate_code(code, language):
229
 
230
  def synchronous_evaluate(input_data):
231
  """Synchronously evaluate code, compatible with original interface"""
232
- if isinstance(input_data, list) and len(input_data) > 0:
233
- sample_task = input_data[0]
234
- language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
235
- task_size = len(input_data)
236
- task_complexity = _estimate_task_complexity(input_data)
237
- else:
238
- language = 'unknown'
239
- task_size = 1
240
- task_complexity = 'medium'
241
-
242
- estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
243
- estimated_total_time = estimated_time_per_task * task_size
244
-
245
- queue_info = get_queue_status()
246
- waiting_tasks = queue_info['waiting_tasks']
247
-
248
- task_id = str(uuid.uuid4())
249
- request_time = time.time()
250
-
251
- # Add task to queue and update UI first
252
- with lock:
253
- task_status[task_id] = {
254
- 'status': 'queued',
255
- 'queued_time': request_time,
256
- 'queue_position': task_queue.qsize() + 1,
257
- 'synchronous': True,
258
- 'estimated_factors': {
259
- 'language': language,
260
- 'size': task_size,
261
- 'complexity': task_complexity
262
- },
263
- 'estimated_time': estimated_total_time
264
- }
265
-
266
- # Show task in UI for at least 1 second to ensure visibility
267
- time.sleep(1)
268
-
269
- # Add to queue for processing
270
- task_queue.put((task_id, input_data, request_time))
271
 
272
  # Wait for task to complete
273
  while True:
@@ -325,6 +313,7 @@ def enqueue_task(input_data):
325
  task_id = str(uuid.uuid4())
326
  request_time = time.time()
327
 
 
328
  with lock:
329
  task_status[task_id] = {
330
  'status': 'queued',
@@ -338,9 +327,11 @@ def enqueue_task(input_data):
338
  'estimated_time': estimated_total_time
339
  }
340
 
 
341
  queue_info = get_queue_status()
342
  est_wait = queue_info['estimated_wait']
343
 
 
344
  task_queue.put((task_id, input_data, request_time))
345
 
346
  return {
@@ -371,7 +362,7 @@ def get_queue_status():
371
  processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']
372
 
373
  queue_size = task_queue.qsize()
374
- active_tasks = len(processing_tasks)
375
  waiting_tasks = len(queued_tasks)
376
 
377
  remaining_processing_time = 0
 
37
  running = True
38
  # Mapping from task type to processing time
39
  task_type_times = {}
40
+ # Currently processing tasks counter
41
+ processing_count = 0
42
 
43
  def queue_processor():
44
  """Process tasks in the queue"""
45
+ global processing_count
46
+
47
  while running:
48
  try:
49
+ # Only process if we haven't reached worker thread limit
50
+ with lock:
51
+ if processing_count >= worker_threads:
52
+ # Already at max capacity, wait and try again
53
+ time.sleep(0.5)
54
+ continue
55
+
56
+ # Get a task from the queue with small timeout to prevent blocking
57
  task_id, input_data, request_time = task_queue.get(timeout=0.1)
58
+
59
+ # Increment processing count to track active tasks
60
  with lock:
61
+ processing_count += 1
62
+
63
+ # Update task status
64
  if task_id in task_status:
65
  task_status[task_id]['status'] = 'processing'
66
  task_status[task_id]['start_time'] = time.time()
 
91
  process_time = end_time - task_status[task_id]['start_time']
92
 
93
  with lock:
94
+ # Decrease processing count now that we're done
95
+ processing_count -= 1
96
+
97
+ # Update task status
98
  task_status[task_id]['status'] = 'completed'
99
  task_status[task_id]['result'] = result
100
  task_status[task_id]['end_time'] = end_time
 
128
  except Exception as e:
129
  if 'task_id' in locals():
130
  with lock:
131
+ # Decrease processing count on error
132
+ processing_count -= 1
133
+
134
  if task_id in task_status:
135
  task_status[task_id]['status'] = 'error'
136
  task_status[task_id]['error'] = str(e)
 
253
 
254
  def synchronous_evaluate(input_data):
255
  """Synchronously evaluate code, compatible with original interface"""
256
+ # Create a task and queue it
257
+ task_info = enqueue_task(input_data)
258
+ task_id = task_info['task_id']
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
259
 
260
  # Wait for task to complete
261
  while True:
 
313
  task_id = str(uuid.uuid4())
314
  request_time = time.time()
315
 
316
+ # Add to task_status with 'queued' status first
317
  with lock:
318
  task_status[task_id] = {
319
  'status': 'queued',
 
327
  'estimated_time': estimated_total_time
328
  }
329
 
330
+ # Get queue info for wait time estimation
331
  queue_info = get_queue_status()
332
  est_wait = queue_info['estimated_wait']
333
 
334
+ # Add to the task queue - this must be done AFTER adding to task_status
335
  task_queue.put((task_id, input_data, request_time))
336
 
337
  return {
 
362
  processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']
363
 
364
  queue_size = task_queue.qsize()
365
+ active_tasks = processing_count
366
  waiting_tasks = len(queued_tasks)
367
 
368
  remaining_processing_time = 0