朱东升 committed
Commit 1046fcc
1 Parent(s): 254fe03
Files changed (2)
  1. app.py +158 -278
  2. requirements.txt +1 -2
app.py CHANGED
@@ -13,8 +13,6 @@ import uuid
 import numpy as np
 from datetime import datetime
 from tqdm.auto import tqdm
-import redis
-import pickle
 from src.containerized_eval import eval_string_script

 # Add current directory and src directory to module search path
@@ -25,80 +23,29 @@ if current_dir not in sys.path:
 if src_dir not in sys.path:
     sys.path.append(src_dir)

-# Initialize Redis connection (will use environment variables in Hugging Face Space)
-REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
-redis_client = redis.from_url(REDIS_URL)
-
-# Keys for Redis
-QUEUE_KEY = 'eval_task_queue'
-STATUS_KEY = 'eval_task_status'
-HISTORY_KEY = 'eval_task_history'
-TASK_TIMES_KEY = 'eval_task_times'
-
-# Local queue for worker threads
-local_task_queue = queue.Queue()
+# Create message queue
+task_queue = queue.Queue()
+# Dictionary to store task status
+task_status = {}
+# List to store task history, max 200 tasks
+task_history = []
 # Lock for shared resources
 lock = threading.Lock()
 # Number of worker threads
 worker_threads = max(1, multiprocessing.cpu_count() // 2)  # Using half the available cores for better stability
 # Flag for running background threads
 running = True
-
-def redis_queue_monitor():
-    """Monitor Redis queue and add tasks to local queue"""
-    last_check = 0
-    while running:
-        try:
-            # Check Redis queue every second
-            if time.time() - last_check >= 1:
-                last_check = time.time()
-                # Get all tasks in the queue
-                task_list = redis_client.lrange(QUEUE_KEY, 0, -1)
-                for task_data in task_list:
-                    task = pickle.loads(task_data)
-                    task_id = task['id']
-
-                    # Check if task is already in processing
-                    status_data = redis_client.hget(STATUS_KEY, task_id)
-                    if status_data:
-                        status = pickle.loads(status_data)
-                        if status['status'] == 'queued':
-                            # Add to local queue if not already processing
-                            local_task_queue.put((task_id, task['input_data'], task['request_time']))
-                            # Update status to processing
-                            with lock:
-                                status['status'] = 'processing'
-                                status['start_time'] = time.time()
-                                redis_client.hset(STATUS_KEY, task_id, pickle.dumps(status))
-                            # Remove from Redis queue
-                            redis_client.lrem(QUEUE_KEY, 1, task_data)
-
-            time.sleep(0.1)
-        except Exception as e:
-            print(f"Redis queue monitor error: {e}")
-            time.sleep(1)
+# Mapping from task type to processing time
+task_type_times = {}

 def queue_processor():
-    """Process tasks in the local queue"""
+    """Process tasks in the queue"""
     while running:
         try:
-            task_id, input_data, request_time = local_task_queue.get(timeout=0.1)
-
-            # Get current status
-            status_data = redis_client.hget(STATUS_KEY, task_id)
-            if status_data:
-                task_status = pickle.loads(status_data)
-            else:
-                task_status = {
-                    'status': 'processing',
-                    'queued_time': request_time,
-                    'start_time': time.time()
-                }
-
-            # Update status
-            task_status['status'] = 'processing'
-            task_status['start_time'] = time.time()
-            redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
+            task_id, input_data, request_time = task_queue.get(timeout=0.1)
+            with lock:
+                task_status[task_id]['status'] = 'processing'
+                task_status[task_id]['start_time'] = time.time()

             if isinstance(input_data, list) and len(input_data) > 0:
                 sample_task = input_data[0]
@@ -106,83 +53,56 @@ def queue_processor():
                 task_size = len(input_data)
                 task_complexity = _estimate_task_complexity(input_data)

-                estimated_factors = {
-                    'language': language,
-                    'size': task_size,
-                    'complexity': task_complexity
-                }
-                task_status['estimated_factors'] = estimated_factors
-                redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
+                with lock:
+                    task_status[task_id]['estimated_factors'] = {
+                        'language': language,
+                        'size': task_size,
+                        'complexity': task_complexity
+                    }

             result = evaluate(input_data)

             end_time = time.time()
-            process_time = end_time - task_status['start_time']
+            process_time = end_time - task_status[task_id]['start_time']

-            # Update status
-            task_status['status'] = 'completed'
-            task_status['result'] = result
-            task_status['end_time'] = end_time
-            task_status['process_time'] = process_time
-            redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
-
-            # Update task type times
-            if 'estimated_factors' in task_status:
-                factors = task_status['estimated_factors']
-                key = f"{factors['language']}_{factors['complexity']}"
-
-                # Update task times in Redis
-                times_data = redis_client.hget(TASK_TIMES_KEY, key)
-                if times_data:
-                    times = pickle.loads(times_data)
-                else:
-                    times = []
-
-                times.append(process_time / factors['size'])
-                if len(times) > 10:
-                    times = times[-10:]
-
-                redis_client.hset(TASK_TIMES_KEY, key, pickle.dumps(times))
-
-            # Add to history
-            history_item = {
-                'task_id': task_id,
-                'request_time': request_time,
-                'process_time': process_time,
-                'status': 'completed',
-                'factors': task_status.get('estimated_factors', {})
-            }
-
-            # Get current history
-            history_data = redis_client.get(HISTORY_KEY)
-            if history_data:
-                history = pickle.loads(history_data)
-            else:
-                history = []
-
-            history.append(history_item)
-            while len(history) > 200:
-                history.pop(0)
-
-            redis_client.set(HISTORY_KEY, pickle.dumps(history))
-
-            local_task_queue.task_done()
+            with lock:
+                task_status[task_id]['status'] = 'completed'
+                task_status[task_id]['result'] = result
+                task_status[task_id]['end_time'] = end_time
+                task_status[task_id]['process_time'] = process_time
+
+                if 'estimated_factors' in task_status[task_id]:
+                    factors = task_status[task_id]['estimated_factors']
+                    key = f"{factors['language']}_{factors['complexity']}"
+
+                    if key not in task_type_times:
+                        task_type_times[key] = []
+
+                    task_type_times[key].append(process_time / factors['size'])
+                    if len(task_type_times[key]) > 10:
+                        task_type_times[key] = task_type_times[key][-10:]
+
+                task_history.append({
+                    'task_id': task_id,
+                    'request_time': request_time,
+                    'process_time': process_time,
+                    'status': 'completed',
+                    'factors': task_status[task_id].get('estimated_factors', {})
+                })
+                while len(task_history) > 200:
+                    task_history.pop(0)
+
+            task_queue.task_done()

         except queue.Empty:
             continue
         except Exception as e:
             if 'task_id' in locals():
-                status_data = redis_client.hget(STATUS_KEY, task_id)
-                if status_data:
-                    task_status = pickle.loads(status_data)
-                else:
-                    task_status = {}
-
-                task_status['status'] = 'error'
-                task_status['error'] = str(e)
-                task_status['end_time'] = time.time()
-                redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
-                local_task_queue.task_done()
+                with lock:
+                    task_status[task_id]['status'] = 'error'
+                    task_status[task_id]['error'] = str(e)
+                    task_status[task_id]['end_time'] = time.time()
+                task_queue.task_done()

 def _estimate_task_complexity(tasks):
     """Estimate task complexity
@@ -313,41 +233,34 @@ def synchronous_evaluate(input_data):
     task_id = str(uuid.uuid4())
     request_time = time.time()

-    task_status = {
-        'status': 'queued',
-        'queued_time': request_time,
-        'queue_position': queue_info['queue_size'] + 1,
-        'synchronous': True,
-        'estimated_factors': {
-            'language': language,
-            'size': task_size,
-            'complexity': task_complexity
-        },
-        'estimated_time': estimated_total_time
-    }
-
-    redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
-
-    # Add to queue
-    task = {
-        'id': task_id,
-        'input_data': input_data,
-        'request_time': request_time
-    }
-    redis_client.rpush(QUEUE_KEY, pickle.dumps(task))
+    with lock:
+        task_status[task_id] = {
+            'status': 'queued',
+            'queued_time': request_time,
+            'queue_position': task_queue.qsize() + 1,
+            'synchronous': True,
+            'estimated_factors': {
+                'language': language,
+                'size': task_size,
+                'complexity': task_complexity
+            },
+            'estimated_time': estimated_total_time
+        }
+
+    task_queue.put((task_id, input_data, request_time))

     while True:
-        status_data = redis_client.hget(STATUS_KEY, task_id)
-        if status_data:
-            status_info = pickle.loads(status_data)
-            if status_info['status'] == 'completed':
-                result = status_info.get('result', {"status": "Exception", "error": "No result found"})
-                redis_client.hdel(STATUS_KEY, task_id)
-                return result
-            elif status_info['status'] == 'error':
-                error = status_info.get('error', 'Unknown error')
-                redis_client.hdel(STATUS_KEY, task_id)
-                return {"status": "Exception", "error": error}
+        with lock:
+            if task_id in task_status:
+                status = task_status[task_id]['status']
+                if status == 'completed':
+                    result = task_status[task_id]['result']
+                    task_status.pop(task_id, None)
+                    return result
+                elif status == 'error':
+                    error = task_status[task_id].get('error', 'Unknown error')
+                    task_status.pop(task_id, None)
+                    return {"status": "Exception", "error": error}

         time.sleep(0.1)

@@ -355,11 +268,8 @@ def _get_estimated_time_for_task(language, complexity):
     """Get estimated processing time for a specific task type"""
     key = f"{language}_{complexity}"

-    times_data = redis_client.hget(TASK_TIMES_KEY, key)
-    if times_data:
-        times = pickle.loads(times_data)
-        if times:
-            return np.median(times)
+    if key in task_type_times and len(task_type_times[key]) > 0:
+        return np.median(task_type_times[key])

     if complexity == 'simple':
         return 1.0
@@ -386,128 +296,104 @@ def enqueue_task(input_data):
     task_id = str(uuid.uuid4())
     request_time = time.time()

-    queue_info = get_queue_status()
-
-    task_status = {
-        'status': 'queued',
-        'queued_time': request_time,
-        'queue_position': queue_info['queue_size'] + 1,
-        'estimated_factors': {
-            'language': language,
-            'size': task_size,
-            'complexity': task_complexity
-        },
-        'estimated_time': estimated_total_time
-    }
-
-    redis_client.hset(STATUS_KEY, task_id, pickle.dumps(task_status))
-
-    # Add to queue
-    task = {
-        'id': task_id,
-        'input_data': input_data,
-        'request_time': request_time
-    }
-    redis_client.rpush(QUEUE_KEY, pickle.dumps(task))
+    with lock:
+        task_status[task_id] = {
+            'status': 'queued',
+            'queued_time': request_time,
+            'queue_position': task_queue.qsize() + 1,
+            'estimated_factors': {
+                'language': language,
+                'size': task_size,
+                'complexity': task_complexity
+            },
+            'estimated_time': estimated_total_time
+        }

+    queue_info = get_queue_status()
     est_wait = queue_info['estimated_wait']

+    task_queue.put((task_id, input_data, request_time))
+
     return {
         'task_id': task_id,
         'status': 'queued',
-        'queue_position': task_status['queue_position'],
+        'queue_position': task_status[task_id]['queue_position'],
         'estimated_wait': est_wait,
         'estimated_processing': estimated_total_time
     }

 def check_status(task_id):
     """Check task status"""
-    status_data = redis_client.hget(STATUS_KEY, task_id)
-    if not status_data:
-        return {'status': 'not_found'}
-
-    status_info = pickle.loads(status_data)
-
-    if status_info['status'] in ['completed', 'error'] and time.time() - status_info.get('end_time', 0) > 3600:
-        redis_client.hdel(STATUS_KEY, task_id)
-
-    return status_info
+    with lock:
+        if task_id not in task_status:
+            return {'status': 'not_found'}
+
+        status_info = task_status[task_id].copy()
+
+        if status_info['status'] in ['completed', 'error'] and time.time() - status_info.get('end_time', 0) > 3600:
+            task_status.pop(task_id, None)
+
+        return status_info

 def get_queue_status():
     """Get queue status"""
-    # Get all task statuses
-    all_statuses = redis_client.hgetall(STATUS_KEY)
-
-    queued_tasks = []
-    processing_tasks = []
-
-    for task_id, status_data in all_statuses.items():
-        status_info = pickle.loads(status_data)
-        if status_info['status'] == 'queued':
-            queued_tasks.append(status_info)
-        elif status_info['status'] == 'processing':
-            processing_tasks.append(status_info)
-
-    queue_size = redis_client.llen(QUEUE_KEY)
-    active_tasks = len(processing_tasks)
-    waiting_tasks = len(queued_tasks)
-
-    remaining_processing_time = 0
-    for task in processing_tasks:
-        if 'start_time' in task and 'estimated_time' in task:
-            elapsed = time.time() - task['start_time']
-            remaining = max(0, task['estimated_time'] - elapsed)
-            remaining_processing_time += remaining
-        else:
-            remaining_processing_time += 2
-
-    if active_tasks > 0:
-        remaining_processing_time = remaining_processing_time / min(active_tasks, worker_threads)
-
-    queued_processing_time = 0
-    for task in queued_tasks:
-        if 'estimated_time' in task:
-            queued_processing_time += task['estimated_time']
-        else:
-            queued_processing_time += 5
-
-    if worker_threads > 0 and queued_processing_time > 0:
-        queued_processing_time = queued_processing_time / worker_threads
-
-    estimated_wait = remaining_processing_time + queued_processing_time
-
-    # Get task history
-    history_data = redis_client.get(HISTORY_KEY)
-    if history_data:
-        task_history = pickle.loads(history_data)
-    else:
-        task_history = []
-
-    if task_history:
-        prediction_ratios = []
-        for task in task_history:
-            if 'factors' in task and 'estimated_time' in task:
-                prediction_ratios.append(task['process_time'] / task['estimated_time'])
-
-        if prediction_ratios:
-            correction_factor = np.median(prediction_ratios)
-            correction_factor = max(0.5, min(2.0, correction_factor))
-            estimated_wait *= correction_factor
-
-    estimated_wait = max(0.1, estimated_wait)
-    if waiting_tasks == 0 and active_tasks == 0:
-        estimated_wait = 0
-
-    recent_tasks = task_history[-5:] if task_history else []
-
-    return {
-        'queue_size': queue_size,
-        'active_tasks': active_tasks,
-        'waiting_tasks': waiting_tasks,
-        'worker_threads': worker_threads,
-        'estimated_wait': estimated_wait,
-        'recent_tasks': recent_tasks
-    }
+    with lock:
+        queued_tasks = [t for t in task_status.values() if t['status'] == 'queued']
+        processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']
+
+    queue_size = task_queue.qsize()
+    active_tasks = len(processing_tasks)
+    waiting_tasks = len(queued_tasks)
+
+    remaining_processing_time = 0
+    for task in processing_tasks:
+        if 'start_time' in task and 'estimated_time' in task:
+            elapsed = time.time() - task['start_time']
+            remaining = max(0, task['estimated_time'] - elapsed)
+            remaining_processing_time += remaining
+        else:
+            remaining_processing_time += 2
+
+    if active_tasks > 0:
+        remaining_processing_time = remaining_processing_time / min(active_tasks, worker_threads)
+
+    queued_processing_time = 0
+    for task in queued_tasks:
+        if 'estimated_time' in task:
+            queued_processing_time += task['estimated_time']
+        else:
+            queued_processing_time += 5
+
+    if worker_threads > 0 and queued_processing_time > 0:
+        queued_processing_time = queued_processing_time / worker_threads
+
+    estimated_wait = remaining_processing_time + queued_processing_time
+
+    if task_history:
+        prediction_ratios = []
+        for task in task_history:
+            if 'factors' in task and 'estimated_time' in task:
+                prediction_ratios.append(task['process_time'] / task['estimated_time'])
+
+        if prediction_ratios:
+            correction_factor = np.median(prediction_ratios)
+            correction_factor = max(0.5, min(2.0, correction_factor))
+            estimated_wait *= correction_factor
+
+    estimated_wait = max(0.1, estimated_wait)
+    if waiting_tasks == 0 and active_tasks == 0:
+        estimated_wait = 0
+
+    recent_tasks = task_history[-5:] if task_history else []
+
+    return {
+        'queue_size': queue_size,
+        'active_tasks': active_tasks,
+        'waiting_tasks': waiting_tasks,
+        'worker_threads': worker_threads,
+        'estimated_wait': estimated_wait,
+        'recent_tasks': recent_tasks
+    }

 def format_time(seconds):
     """Format time into readable format"""
@@ -591,12 +477,6 @@ def launch_workers():
     global running
     running = True

-    # Start Redis queue monitor
-    monitor = threading.Thread(target=redis_queue_monitor)
-    monitor.daemon = True
-    monitor.start()
-
-    # Start worker threads
    for _ in range(worker_threads):
         worker = threading.Thread(target=queue_processor)
         worker.daemon = True
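Taken together, the app.py changes replace Redis-backed state (pickled tasks in Redis lists and hashes, plus a monitor thread shuttling them into a local queue) with purely process-local state: one queue.Queue drained by daemon worker threads and one lock-guarded status dict. A minimal, self-contained sketch of that pattern, with a trivial doubling step standing in for evaluate(input_data):

    import queue
    import threading
    import time
    import uuid

    task_queue = queue.Queue()   # pending (task_id, payload) tuples
    task_status = {}             # task_id -> status dict; guarded by lock
    lock = threading.Lock()

    def worker():
        while True:
            try:
                task_id, payload = task_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            with lock:
                task_status[task_id]['status'] = 'processing'
            result = payload * 2  # stand-in for evaluate(input_data)
            with lock:
                task_status[task_id].update(status='completed', result=result)
            task_queue.task_done()

    threading.Thread(target=worker, daemon=True).start()

    task_id = str(uuid.uuid4())
    with lock:
        task_status[task_id] = {'status': 'queued'}
    task_queue.put((task_id, 21))

    # Poll for completion, the same way synchronous_evaluate does
    while True:
        with lock:
            if task_status[task_id]['status'] == 'completed':
                print(task_status[task_id]['result'])  # prints 42
                break
        time.sleep(0.05)

The trade-off of this commit is the reverse of persistence: queue, status, and history now vanish on process restart and cannot be shared across replicas, which is acceptable for a single-process Space.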
requirements.txt CHANGED
@@ -1,2 +1 @@
-gradio==4.44.1
-redis==5.0.1
+gradio==4.44.1
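For reference, the wait-time estimation that app.py keeps from the Redis version works per f"{language}_{complexity}" key: the worker records the last 10 observed per-item processing times, _get_estimated_time_for_task answers with their median (falling back to constants, e.g. 1.0 for 'simple' tasks), and get_queue_status rescales the aggregate wait by the median of observed/estimated ratios clamped to [0.5, 2.0]. A sketch of just that arithmetic; the record/estimate helper names are illustrative, not from app.py, and history entries are assumed to carry an 'estimated_time':

    import numpy as np

    task_type_times = {}  # f"{language}_{complexity}" -> last 10 per-item times

    def record(key, process_time, size):
        # Rolling window of per-item times, as in queue_processor
        times = task_type_times.setdefault(key, [])
        times.append(process_time / size)
        if len(times) > 10:
            task_type_times[key] = times[-10:]

    def estimate_per_item(key):
        # Median of observations; 1.0 is the diff's fallback for 'simple' tasks
        times = task_type_times.get(key)
        return float(np.median(times)) if times else 1.0

    def estimate_wait(raw_wait, history):
        # Clamped median correction factor, as in get_queue_status
        ratios = [h['process_time'] / h['estimated_time']
                  for h in history if h.get('estimated_time')]
        if ratios:
            raw_wait *= max(0.5, min(2.0, float(np.median(ratios))))
        return max(0.1, raw_wait)

    record('python_simple', process_time=6.0, size=4)  # 1.5s per item
    record('python_simple', process_time=2.0, size=2)  # 1.0s per item
    print(estimate_per_item('python_simple'))          # 1.25
    print(estimate_wait(10.0, [{'process_time': 3.0, 'estimated_time': 4.0}]))  # 7.5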