朱东升 committed
Commit 254fe03 · 1 Parent(s): de3b744
Files changed (2)
  1. app.py +278 -158
  2. requirements.txt +2 -1
app.py CHANGED
@@ -13,6 +13,8 @@ import uuid
 import numpy as np
 from datetime import datetime
 from tqdm.auto import tqdm
+import redis
+import pickle
 from src.containerized_eval import eval_string_script
 
 # Add current directory and src directory to module search path
@@ -23,29 +25,80 @@ if current_dir not in sys.path:
 if src_dir not in sys.path:
     sys.path.append(src_dir)
 
-# Create message queue
-task_queue = queue.Queue()
-# Dictionary to store task status
-task_status = {}
-# List to store task history, max 200 tasks
-task_history = []
+# Initialize Redis connection (will use environment variables in Hugging Face Space)
+REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
+redis_client = redis.from_url(REDIS_URL)
+
+# Keys for Redis
+QUEUE_KEY = 'eval_task_queue'
+STATUS_KEY = 'eval_task_status'
+HISTORY_KEY = 'eval_task_history'
+TASK_TIMES_KEY = 'eval_task_times'
+
+# Local queue for worker threads
+local_task_queue = queue.Queue()
 # Lock for shared resources
 lock = threading.Lock()
 # Number of worker threads
 worker_threads = max(1, multiprocessing.cpu_count() // 2)  # Using half the available cores for better stability
 # Flag for running background threads
 running = True
-# Mapping from task type to processing time
-task_type_times = {}
+
+def redis_queue_monitor():
+    """Monitor Redis queue and add tasks to local queue"""
+    last_check = 0
+    while running:
+        try:
+            # Check Redis queue every second
+            if time.time() - last_check >= 1:
+                last_check = time.time()
+                # Get all tasks in the queue
+                task_list = redis_client.lrange(QUEUE_KEY, 0, -1)
+                for task_data in task_list:
+                    task = pickle.loads(task_data)
+                    task_id = task['id']
+
+                    # Check if task is already in processing
+                    status_data = redis_client.hget(STATUS_KEY, task_id)
+                    if status_data:
+                        status = pickle.loads(status_data)
+                        if status['status'] == 'queued':
+                            # Add to local queue if not already processing
+                            local_task_queue.put((task_id, task['input_data'], task['request_time']))
+                            # Update status to processing
+                            with lock:
+                                status['status'] = 'processing'
+                                status['start_time'] = time.time()
+                                redis_client.hset(STATUS_KEY, task_id, pickle.dumps(status))
+                            # Remove from Redis queue
+                            redis_client.lrem(QUEUE_KEY, 1, task_data)
+
+            time.sleep(0.1)
+        except Exception as e:
+            print(f"Redis queue monitor error: {e}")
+            time.sleep(1)
 
 def queue_processor():
-    """Process tasks in the queue"""
+    """Process tasks in the local queue"""
     while running:
         try:
-            task_id, input_data, request_time = task_queue.get(timeout=0.1)
-            with lock:
-                task_status[task_id]['status'] = 'processing'
-                task_status[task_id]['start_time'] = time.time()
+            task_id, input_data, request_time = local_task_queue.get(timeout=0.1)
+
+            # Get current status
+            status_data = redis_client.hget(STATUS_KEY, task_id)
+            if status_data:
+                task_status = pickle.loads(status_data)
+            else:
+                task_status = {
+                    'status': 'processing',
+                    'queued_time': request_time,
+                    'start_time': time.time()
+                }
+
+            # Update status
+            task_status['status'] = 'processing'
+            task_status['start_time'] = time.time()
+            redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
 
             if isinstance(input_data, list) and len(input_data) > 0:
                 sample_task = input_data[0]
@@ -53,56 +106,83 @@ def queue_processor():
                 task_size = len(input_data)
                 task_complexity = _estimate_task_complexity(input_data)
 
-            with lock:
-                task_status[task_id]['estimated_factors'] = {
-                    'language': language,
-                    'size': task_size,
-                    'complexity': task_complexity
-                }
+            estimated_factors = {
+                'language': language,
+                'size': task_size,
+                'complexity': task_complexity
+            }
+            task_status['estimated_factors'] = estimated_factors
+            redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
 
             result = evaluate(input_data)
 
             end_time = time.time()
-            process_time = end_time - task_status[task_id]['start_time']
+            process_time = end_time - task_status['start_time']
 
-            with lock:
-                task_status[task_id]['status'] = 'completed'
-                task_status[task_id]['result'] = result
-                task_status[task_id]['end_time'] = end_time
-                task_status[task_id]['process_time'] = process_time
+            # Update status
+            task_status['status'] = 'completed'
+            task_status['result'] = result
+            task_status['end_time'] = end_time
+            task_status['process_time'] = process_time
+            redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
 
-                if 'estimated_factors' in task_status[task_id]:
-                    factors = task_status[task_id]['estimated_factors']
-                    key = f"{factors['language']}_{factors['complexity']}"
-
-                    if key not in task_type_times:
-                        task_type_times[key] = []
-
-                    task_type_times[key].append(process_time / factors['size'])
-                    if len(task_type_times[key]) > 10:
-                        task_type_times[key] = task_type_times[key][-10:]
-
-                task_history.append({
-                    'task_id': task_id,
-                    'request_time': request_time,
-                    'process_time': process_time,
-                    'status': 'completed',
-                    'factors': task_status[task_id].get('estimated_factors', {})
-                })
-                while len(task_history) > 200:
-                    task_history.pop(0)
-
-            task_queue.task_done()
+            # Update task type times
+            if 'estimated_factors' in task_status:
+                factors = task_status['estimated_factors']
+                key = f"{factors['language']}_{factors['complexity']}"
+
+                # Update task times in Redis
+                times_data = redis_client.hget(TASK_TIMES_KEY, key)
+                if times_data:
+                    times = pickle.loads(times_data)
+                else:
+                    times = []
+
+                times.append(process_time / factors['size'])
+                if len(times) > 10:
+                    times = times[-10:]
+
+                redis_client.hset(TASK_TIMES_KEY, key, pickle.dumps(times))
+
+            # Add to history
+            history_item = {
+                'task_id': task_id,
+                'request_time': request_time,
+                'process_time': process_time,
+                'status': 'completed',
+                'factors': task_status.get('estimated_factors', {})
+            }
+
+            # Get current history
+            history_data = redis_client.get(HISTORY_KEY)
+            if history_data:
+                history = pickle.loads(history_data)
+            else:
+                history = []
+
+            history.append(history_item)
+            while len(history) > 200:
+                history.pop(0)
+
+            redis_client.set(HISTORY_KEY, pickle.dumps(history))
+
+            local_task_queue.task_done()
 
         except queue.Empty:
             continue
        except Exception as e:
             if 'task_id' in locals():
-                with lock:
-                    task_status[task_id]['status'] = 'error'
-                    task_status[task_id]['error'] = str(e)
-                    task_status[task_id]['end_time'] = time.time()
-                task_queue.task_done()
+                status_data = redis_client.hget(STATUS_KEY, task_id)
+                if status_data:
+                    task_status = pickle.loads(status_data)
+                else:
+                    task_status = {}
+
+                task_status['status'] = 'error'
+                task_status['error'] = str(e)
+                task_status['end_time'] = time.time()
+                redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
+                local_task_queue.task_done()
 
 def _estimate_task_complexity(tasks):
     """Estimate task complexity
@@ -233,34 +313,41 @@ def synchronous_evaluate(input_data):
     task_id = str(uuid.uuid4())
     request_time = time.time()
 
-    with lock:
-        task_status[task_id] = {
-            'status': 'queued',
-            'queued_time': request_time,
-            'queue_position': task_queue.qsize() + 1,
-            'synchronous': True,
-            'estimated_factors': {
-                'language': language,
-                'size': task_size,
-                'complexity': task_complexity
-            },
-            'estimated_time': estimated_total_time
-        }
-
-    task_queue.put((task_id, input_data, request_time))
+    task_status = {
+        'status': 'queued',
+        'queued_time': request_time,
+        'queue_position': queue_info['queue_size'] + 1,
+        'synchronous': True,
+        'estimated_factors': {
+            'language': language,
+            'size': task_size,
+            'complexity': task_complexity
+        },
+        'estimated_time': estimated_total_time
+    }
+
+    redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
+
+    # Add to queue
+    task = {
+        'id': task_id,
+        'input_data': input_data,
+        'request_time': request_time
+    }
+    redis_client.rpush(QUEUE_KEY, pickle.dumps(task))
 
     while True:
-        with lock:
-            if task_id in task_status:
-                status = task_status[task_id]['status']
-                if status == 'completed':
-                    result = task_status[task_id]['result']
-                    task_status.pop(task_id, None)
-                    return result
-                elif status == 'error':
-                    error = task_status[task_id].get('error', 'Unknown error')
-                    task_status.pop(task_id, None)
-                    return {"status": "Exception", "error": error}
+        status_data = redis_client.hget(STATUS_KEY, task_id)
+        if status_data:
+            status_info = pickle.loads(status_data)
+            if status_info['status'] == 'completed':
+                result = status_info.get('result', {"status": "Exception", "error": "No result found"})
+                redis_client.hdel(STATUS_KEY, task_id)
+                return result
+            elif status_info['status'] == 'error':
+                error = status_info.get('error', 'Unknown error')
+                redis_client.hdel(STATUS_KEY, task_id)
+                return {"status": "Exception", "error": error}
 
         time.sleep(0.1)
 
@@ -268,8 +355,11 @@ def _get_estimated_time_for_task(language, complexity):
     """Get estimated processing time for a specific task type"""
     key = f"{language}_{complexity}"
 
-    if key in task_type_times and len(task_type_times[key]) > 0:
-        return np.median(task_type_times[key])
+    times_data = redis_client.hget(TASK_TIMES_KEY, key)
+    if times_data:
+        times = pickle.loads(times_data)
+        if times:
+            return np.median(times)
 
     if complexity == 'simple':
         return 1.0
@@ -296,104 +386,128 @@ def enqueue_task(input_data):
     task_id = str(uuid.uuid4())
     request_time = time.time()
 
-    with lock:
-        task_status[task_id] = {
-            'status': 'queued',
-            'queued_time': request_time,
-            'queue_position': task_queue.qsize() + 1,
-            'estimated_factors': {
-                'language': language,
-                'size': task_size,
-                'complexity': task_complexity
-            },
-            'estimated_time': estimated_total_time
-        }
-
     queue_info = get_queue_status()
-    est_wait = queue_info['estimated_wait']
 
-    task_queue.put((task_id, input_data, request_time))
+    task_status = {
+        'status': 'queued',
+        'queued_time': request_time,
+        'queue_position': queue_info['queue_size'] + 1,
+        'estimated_factors': {
+            'language': language,
+            'size': task_size,
+            'complexity': task_complexity
+        },
+        'estimated_time': estimated_total_time
+    }
+
+    redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
+
+    # Add to queue
+    task = {
+        'id': task_id,
+        'input_data': input_data,
+        'request_time': request_time
+    }
+    redis_client.rpush(QUEUE_KEY, pickle.dumps(task))
+
+    est_wait = queue_info['estimated_wait']
 
     return {
         'task_id': task_id,
         'status': 'queued',
-        'queue_position': task_status[task_id]['queue_position'],
+        'queue_position': task_status['queue_position'],
         'estimated_wait': est_wait,
         'estimated_processing': estimated_total_time
     }
 
 def check_status(task_id):
     """Check task status"""
-    with lock:
-        if task_id not in task_status:
-            return {'status': 'not_found'}
-
-        status_info = task_status[task_id].copy()
+    status_data = redis_client.hget(STATUS_KEY, task_id)
+    if not status_data:
+        return {'status': 'not_found'}
+
+    status_info = pickle.loads(status_data)
 
-    if status_info['status'] in ['completed', 'error'] and time.time() - status_info.get('end_time', 0) > 3600:
-        task_status.pop(task_id, None)
-
-    return status_info
+    if status_info['status'] in ['completed', 'error'] and time.time() - status_info.get('end_time', 0) > 3600:
+        redis_client.hdel(STATUS_KEY, task_id)
+
+    return status_info
 
 def get_queue_status():
     """Get queue status"""
-    with lock:
-        queued_tasks = [t for t in task_status.values() if t['status'] == 'queued']
-        processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']
-
-        queue_size = task_queue.qsize()
-        active_tasks = len(processing_tasks)
-        waiting_tasks = len(queued_tasks)
-
-        remaining_processing_time = 0
-        for task in processing_tasks:
-            if 'start_time' in task and 'estimated_time' in task:
-                elapsed = time.time() - task['start_time']
-                remaining = max(0, task['estimated_time'] - elapsed)
-                remaining_processing_time += remaining
-            else:
-                remaining_processing_time += 2
-
-        if active_tasks > 0:
-            remaining_processing_time = remaining_processing_time / min(active_tasks, worker_threads)
-
-        queued_processing_time = 0
-        for task in queued_tasks:
-            if 'estimated_time' in task:
-                queued_processing_time += task['estimated_time']
-            else:
-                queued_processing_time += 5
-
-        if worker_threads > 0 and queued_processing_time > 0:
-            queued_processing_time = queued_processing_time / worker_threads
-
-        estimated_wait = remaining_processing_time + queued_processing_time
-
-        if task_history:
-            prediction_ratios = []
-            for task in task_history:
-                if 'factors' in task and 'estimated_time' in task:
-                    prediction_ratios.append(task['process_time'] / task['estimated_time'])
-
-            if prediction_ratios:
-                correction_factor = np.median(prediction_ratios)
-                correction_factor = max(0.5, min(2.0, correction_factor))
-                estimated_wait *= correction_factor
-
-        estimated_wait = max(0.1, estimated_wait)
-        if waiting_tasks == 0 and active_tasks == 0:
-            estimated_wait = 0
-
-        recent_tasks = task_history[-5:] if task_history else []
-
-        return {
-            'queue_size': queue_size,
-            'active_tasks': active_tasks,
-            'waiting_tasks': waiting_tasks,
-            'worker_threads': worker_threads,
-            'estimated_wait': estimated_wait,
-            'recent_tasks': recent_tasks
-        }
+    # Get all task statuses
+    all_statuses = redis_client.hgetall(STATUS_KEY)
+
+    queued_tasks = []
+    processing_tasks = []
+
+    for task_id, status_data in all_statuses.items():
+        status_info = pickle.loads(status_data)
+        if status_info['status'] == 'queued':
+            queued_tasks.append(status_info)
+        elif status_info['status'] == 'processing':
+            processing_tasks.append(status_info)
+
+    queue_size = redis_client.llen(QUEUE_KEY)
+    active_tasks = len(processing_tasks)
+    waiting_tasks = len(queued_tasks)
+
+    remaining_processing_time = 0
+    for task in processing_tasks:
+        if 'start_time' in task and 'estimated_time' in task:
+            elapsed = time.time() - task['start_time']
+            remaining = max(0, task['estimated_time'] - elapsed)
+            remaining_processing_time += remaining
+        else:
+            remaining_processing_time += 2
+
+    if active_tasks > 0:
+        remaining_processing_time = remaining_processing_time / min(active_tasks, worker_threads)
+
+    queued_processing_time = 0
+    for task in queued_tasks:
+        if 'estimated_time' in task:
+            queued_processing_time += task['estimated_time']
+        else:
+            queued_processing_time += 5
+
+    if worker_threads > 0 and queued_processing_time > 0:
+        queued_processing_time = queued_processing_time / worker_threads
+
+    estimated_wait = remaining_processing_time + queued_processing_time
+
+    # Get task history
+    history_data = redis_client.get(HISTORY_KEY)
+    if history_data:
+        task_history = pickle.loads(history_data)
+    else:
+        task_history = []
+
+    if task_history:
+        prediction_ratios = []
+        for task in task_history:
+            if 'factors' in task and 'estimated_time' in task:
+                prediction_ratios.append(task['process_time'] / task['estimated_time'])
+
+        if prediction_ratios:
+            correction_factor = np.median(prediction_ratios)
+            correction_factor = max(0.5, min(2.0, correction_factor))
+            estimated_wait *= correction_factor
+
+    estimated_wait = max(0.1, estimated_wait)
+    if waiting_tasks == 0 and active_tasks == 0:
+        estimated_wait = 0
+
+    recent_tasks = task_history[-5:] if task_history else []
+
+    return {
+        'queue_size': queue_size,
+        'active_tasks': active_tasks,
+        'waiting_tasks': waiting_tasks,
+        'worker_threads': worker_threads,
+        'estimated_wait': estimated_wait,
+        'recent_tasks': recent_tasks
+    }
 
 def format_time(seconds):
     """Format time into readable format"""
@@ -477,6 +591,12 @@ def launch_workers():
     global running
     running = True
 
+    # Start Redis queue monitor
+    monitor = threading.Thread(target=redis_queue_monitor)
+    monitor.daemon = True
+    monitor.start()
+
+    # Start worker threads
     for _ in range(worker_threads):
        worker = threading.Thread(target=queue_processor)
        worker.daemon = True
requirements.txt CHANGED
@@ -1 +1,2 @@
-gradio==4.44.1
+gradio==4.44.1
+redis==5.0.1
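
The pinned redis client needs a reachable server at runtime; app.py reads REDIS_URL from the environment and falls back to redis://localhost:6379/0. A quick connectivity check (illustrative, not part of the commit):

import os
import redis

# Ping the server the app will use; raises redis.exceptions.ConnectionError if unreachable.
client = redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379/0'))
client.ping()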