朱东升 commited on
Commit
0e2f5b3
·
1 Parent(s): 498e475
.DS_Store CHANGED
Binary files a/.DS_Store and b/.DS_Store differ
 
app.py CHANGED
@@ -13,634 +13,60 @@ import uuid
13
  import numpy as np
14
  from datetime import datetime
15
  from tqdm.auto import tqdm
16
- from src.containerized_eval import eval_string_script
17
 
18
- # Add current directory and src directory to module search path
19
- current_dir = os.path.dirname(os.path.abspath(__file__))
20
- src_dir = os.path.join(current_dir, "src")
21
- if current_dir not in sys.path:
22
- sys.path.append(current_dir)
23
- if src_dir not in sys.path:
24
- sys.path.append(src_dir)
25
-
26
- # Create message queue
27
- task_queue = queue.Queue()
28
- # Dictionary to store task status
29
- task_status = {}
30
- # List to store task history, max 200 tasks
31
- task_history = []
32
- # Lock for shared resources
33
- lock = threading.Lock()
34
- # Number of worker threads
35
- worker_threads = max(1, multiprocessing.cpu_count() // 2) # Using half the available cores for better stability
36
- # Flag for running background threads
37
- running = True
38
- # Mapping from task type to processing time
39
- task_type_times = {}
40
-
41
- def queue_processor():
42
- """Process tasks in the queue"""
43
- while running:
44
- try:
45
- task_id, input_data, request_time = task_queue.get(timeout=0.1)
46
- with lock:
47
- task_status[task_id]['status'] = 'processing'
48
- task_status[task_id]['start_time'] = time.time()
49
-
50
- if isinstance(input_data, list) and len(input_data) > 0:
51
- sample_task = input_data[0]
52
- language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
53
- task_size = len(input_data)
54
- task_complexity = _estimate_task_complexity(input_data)
55
-
56
- with lock:
57
- task_status[task_id]['estimated_factors'] = {
58
- 'language': language,
59
- 'size': task_size,
60
- 'complexity': task_complexity
61
- }
62
-
63
- result = evaluate(input_data)
64
-
65
- end_time = time.time()
66
- process_time = end_time - task_status[task_id]['start_time']
67
-
68
- with lock:
69
- task_status[task_id]['status'] = 'completed'
70
- task_status[task_id]['result'] = result
71
- task_status[task_id]['end_time'] = end_time
72
- task_status[task_id]['process_time'] = process_time
73
-
74
- if 'estimated_factors' in task_status[task_id]:
75
- factors = task_status[task_id]['estimated_factors']
76
- key = f"{factors['language']}_{factors['complexity']}"
77
-
78
- if key not in task_type_times:
79
- task_type_times[key] = []
80
-
81
- task_type_times[key].append(process_time / factors['size'])
82
- if len(task_type_times[key]) > 10:
83
- task_type_times[key] = task_type_times[key][-10:]
84
-
85
- task_history.append({
86
- 'task_id': task_id,
87
- 'request_time': request_time,
88
- 'process_time': process_time,
89
- 'status': 'completed',
90
- 'factors': task_status[task_id].get('estimated_factors', {})
91
- })
92
- while len(task_history) > 200:
93
- task_history.pop(0)
94
-
95
- task_queue.task_done()
96
-
97
- except queue.Empty:
98
- continue
99
- except Exception as e:
100
- if 'task_id' in locals():
101
- with lock:
102
- task_status[task_id]['status'] = 'error'
103
- task_status[task_id]['error'] = str(e)
104
- task_status[task_id]['end_time'] = time.time()
105
- task_queue.task_done()
106
-
107
- def _estimate_task_complexity(tasks):
108
- """Estimate task complexity
109
-
110
- Returns: 'simple', 'medium', or 'complex'
111
- """
112
- total_code_length = 0
113
- count = 0
114
-
115
- for task in tasks:
116
- if isinstance(task, dict):
117
- prompt = task.get('prompt', '')
118
- tests = task.get('tests', '')
119
- completions = task.get('processed_completions', [])
120
-
121
- code_length = len(prompt) + len(tests)
122
- if completions:
123
- code_length += sum(len(comp) for comp in completions)
124
-
125
- total_code_length += code_length
126
- count += 1
127
-
128
- if count == 0:
129
- return 'medium'
130
-
131
- avg_length = total_code_length / count
132
-
133
- if avg_length < 1000:
134
- return 'simple'
135
- elif avg_length < 5000:
136
- return 'medium'
137
- else:
138
- return 'complex'
139
-
140
- def evaluate(input_data):
141
- """Main function for code evaluation"""
142
- try:
143
- if not isinstance(input_data, list):
144
- return {"status": "Exception", "error": "Input must be a list"}
145
-
146
- results = []
147
 
148
- # Use a moderate number of workers for all language tests to ensure stability
149
- # This prevents resource contention regardless of language
150
- max_workers = max(1, min(multiprocessing.cpu_count() // 2, 4))
151
-
152
- with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
153
- future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data}
154
- for future in concurrent.futures.as_completed(future_to_item):
155
- item = future_to_item[future]
156
- try:
157
- result = future.result()
158
- item.update(result)
159
- results.append(item)
160
- except Exception as e:
161
- item.update({"status": "Exception", "error": str(e)})
162
- results.append(item)
163
- return results
164
-
165
- except Exception as e:
166
- return {"status": "Exception", "error": str(e)}
167
 
168
- def evaluate_single_case(input_data):
169
- """Evaluate a single code case"""
170
- try:
171
- if not isinstance(input_data, dict):
172
- return {"status": "Exception", "error": "Input item must be a dictionary"}
173
-
174
- language = input_data.get('language')
175
- completions = input_data.get('processed_completions', [])
176
 
177
- if not completions:
178
- return {"status": "Exception", "error": "No code provided"}
179
-
180
- # Use a retry mechanism for all languages for better reliability
181
- max_retries = 2 # One retry for all languages
182
-
183
- results = []
184
- for comp in completions:
185
- code = input_data.get('prompt') + comp + '\n' + input_data.get('tests')
186
-
187
- # Try up to max_retries + 1 times for all test cases
188
- for attempt in range(max_retries + 1):
189
- result = evaluate_code(code, language)
190
-
191
- # If success or last attempt, return/record the result
192
- if result["status"] == "OK" or attempt == max_retries:
193
- if result["status"] == "OK":
194
- return result
195
- results.append(result)
196
- break
197
-
198
- # For retries, briefly wait to allow resources to stabilize
199
- time.sleep(0.3)
200
-
201
- return results[0]
202
-
203
- except Exception as e:
204
- return {"status": "Exception", "error": str(e)}
205
-
206
- def evaluate_code(code, language):
207
- """Evaluate code in a specific language"""
208
- try:
209
- result = eval_string_script(language, code)
210
- return result
211
-
212
- except Exception as e:
213
- return {"status": "Exception", "error": str(e)}
214
 
 
215
  def synchronous_evaluate(input_data):
216
- """Synchronously evaluate code, compatible with original interface"""
217
- if isinstance(input_data, list) and len(input_data) > 0:
218
- sample_task = input_data[0]
219
- language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
220
- task_size = len(input_data)
221
- task_complexity = _estimate_task_complexity(input_data)
222
- else:
223
- language = 'unknown'
224
- task_size = 1
225
- task_complexity = 'medium'
226
-
227
- estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
228
- estimated_total_time = estimated_time_per_task * task_size
229
-
230
- queue_info = get_queue_status()
231
- waiting_tasks = queue_info['waiting_tasks']
232
-
233
- task_id = str(uuid.uuid4())
234
- request_time = time.time()
235
-
236
- with lock:
237
- task_status[task_id] = {
238
- 'status': 'queued',
239
- 'queued_time': request_time,
240
- 'queue_position': task_queue.qsize() + 1,
241
- 'synchronous': True,
242
- 'estimated_factors': {
243
- 'language': language,
244
- 'size': task_size,
245
- 'complexity': task_complexity
246
- },
247
- 'estimated_time': estimated_total_time
248
- }
249
-
250
- task_queue.put((task_id, input_data, request_time))
251
-
252
- while True:
253
- with lock:
254
- if task_id in task_status:
255
- status = task_status[task_id]['status']
256
- if status == 'completed':
257
- result = task_status[task_id]['result']
258
- task_status.pop(task_id, None)
259
- return result
260
- elif status == 'error':
261
- error = task_status[task_id].get('error', 'Unknown error')
262
- task_status.pop(task_id, None)
263
- return {"status": "Exception", "error": error}
264
-
265
- time.sleep(0.1)
266
-
267
- def _get_estimated_time_for_task(language, complexity):
268
- """Get estimated processing time for a specific task type"""
269
- key = f"{language}_{complexity}"
270
-
271
- if key in task_type_times and len(task_type_times[key]) > 0:
272
- return np.median(task_type_times[key])
273
-
274
- if complexity == 'simple':
275
- return 1.0
276
- elif complexity == 'medium':
277
- return 3.0
278
- else: # complex
279
- return 8.0
280
 
 
281
  def enqueue_task(input_data):
282
  """Add task to queue"""
283
- if isinstance(input_data, list) and len(input_data) > 0:
284
- sample_task = input_data[0]
285
- language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
286
- task_size = len(input_data)
287
- task_complexity = _estimate_task_complexity(input_data)
288
- else:
289
- language = 'unknown'
290
- task_size = 1
291
- task_complexity = 'medium'
292
-
293
- estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
294
- estimated_total_time = estimated_time_per_task * task_size
295
-
296
- task_id = str(uuid.uuid4())
297
- request_time = time.time()
298
-
299
- with lock:
300
- task_status[task_id] = {
301
- 'status': 'queued',
302
- 'queued_time': request_time,
303
- 'queue_position': task_queue.qsize() + 1,
304
- 'estimated_factors': {
305
- 'language': language,
306
- 'size': task_size,
307
- 'complexity': task_complexity
308
- },
309
- 'estimated_time': estimated_total_time
310
- }
311
-
312
- queue_info = get_queue_status()
313
- est_wait = queue_info['estimated_wait']
314
-
315
- task_queue.put((task_id, input_data, request_time))
316
-
317
- return {
318
- 'task_id': task_id,
319
- 'status': 'queued',
320
- 'queue_position': task_status[task_id]['queue_position'],
321
- 'estimated_wait': est_wait,
322
- 'estimated_processing': estimated_total_time
323
- }
324
 
 
325
  def check_status(task_id):
326
  """Check task status"""
327
- with lock:
328
- if task_id not in task_status:
329
- return {'status': 'not_found'}
330
-
331
- status_info = task_status[task_id].copy()
332
-
333
- if status_info['status'] in ['completed', 'error'] and time.time() - status_info.get('end_time', 0) > 3600:
334
- task_status.pop(task_id, None)
335
-
336
- return status_info
337
 
 
338
  def get_queue_status():
339
  """Get queue status"""
340
- with lock:
341
- queued_tasks = [t for t in task_status.values() if t['status'] == 'queued']
342
- processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']
343
-
344
- queue_size = task_queue.qsize()
345
- active_tasks = len(processing_tasks)
346
- waiting_tasks = len(queued_tasks)
347
-
348
- remaining_processing_time = 0
349
- for task in processing_tasks:
350
- if 'start_time' in task and 'estimated_time' in task:
351
- elapsed = time.time() - task['start_time']
352
- remaining = max(0, task['estimated_time'] - elapsed)
353
- remaining_processing_time += remaining
354
- else:
355
- remaining_processing_time += 2
356
-
357
- if active_tasks > 0:
358
- remaining_processing_time = remaining_processing_time / min(active_tasks, worker_threads)
359
-
360
- queued_processing_time = 0
361
- for task in queued_tasks:
362
- if 'estimated_time' in task:
363
- queued_processing_time += task['estimated_time']
364
- else:
365
- queued_processing_time += 5
366
-
367
- if worker_threads > 0 and queued_processing_time > 0:
368
- queued_processing_time = queued_processing_time / worker_threads
369
-
370
- estimated_wait = remaining_processing_time + queued_processing_time
371
-
372
- if task_history:
373
- prediction_ratios = []
374
- for task in task_history:
375
- if 'factors' in task and 'estimated_time' in task:
376
- prediction_ratios.append(task['process_time'] / task['estimated_time'])
377
-
378
- if prediction_ratios:
379
- correction_factor = np.median(prediction_ratios)
380
- correction_factor = max(0.5, min(2.0, correction_factor))
381
- estimated_wait *= correction_factor
382
-
383
- estimated_wait = max(0.1, estimated_wait)
384
- if waiting_tasks == 0 and active_tasks == 0:
385
- estimated_wait = 0
386
-
387
- recent_tasks = task_history[-5:] if task_history else []
388
-
389
- return {
390
- 'queue_size': queue_size,
391
- 'active_tasks': active_tasks,
392
- 'waiting_tasks': waiting_tasks,
393
- 'worker_threads': worker_threads,
394
- 'estimated_wait': estimated_wait,
395
- 'recent_tasks': recent_tasks
396
- }
397
-
398
- def format_time(seconds):
399
- """Format time into readable format"""
400
- if seconds < 60:
401
- return f"{seconds:.1f} seconds"
402
- elif seconds < 3600:
403
- minutes = int(seconds / 60)
404
- seconds = seconds % 60
405
- return f"{minutes}m {seconds:.1f}s"
406
- else:
407
- hours = int(seconds / 3600)
408
- minutes = int((seconds % 3600) / 60)
409
- return f"{hours}h {minutes}m"
410
-
411
- def ui_get_queue_info():
412
- """Get queue info for UI"""
413
- queue_info = get_queue_status()
414
-
415
- tasks_html = ""
416
- for task in reversed(queue_info['recent_tasks']):
417
- tasks_html += f"""
418
- <tr>
419
- <td>{task['task_id'][:8]}...</td>
420
- <td>{datetime.fromtimestamp(task['request_time']).strftime('%H:%M:%S')}</td>
421
- <td>{format_time(task['process_time'])}</td>
422
- </tr>
423
- """
424
-
425
- if not tasks_html:
426
- tasks_html = """
427
- <tr>
428
- <td colspan="3" style="text-align: center; padding: 20px;">No historical tasks</td>
429
- </tr>
430
- """
431
-
432
- return f"""
433
- <div class="dashboard">
434
- <div class="queue-info-card main-card">
435
- <h3 class="card-title">Queue Status Monitor</h3>
436
- <div class="queue-stats">
437
- <div class="stat-item">
438
- <div class="stat-value">{queue_info['waiting_tasks']}</div>
439
- <div class="stat-label">Waiting</div>
440
- </div>
441
- <div class="stat-item">
442
- <div class="stat-value">{queue_info['active_tasks']}</div>
443
- <div class="stat-label">Processing</div>
444
- </div>
445
- <div class="stat-item">
446
- <div class="stat-value">{queue_info['worker_threads']}</div>
447
- <div class="stat-label">Worker Threads</div>
448
- </div>
449
- </div>
450
-
451
- <div class="wait-time">
452
- <p><b>Current Estimated Wait Time:</b> {format_time(queue_info['estimated_wait'])}</p>
453
- <p class="last-update"><small>Last update: {datetime.now().strftime('%H:%M:%S')}</small></p>
454
- </div>
455
- </div>
456
-
457
- <div class="queue-info-card history-card">
458
- <h3 class="card-title">Recently Processed Tasks</h3>
459
- <table class="recent-tasks">
460
- <thead>
461
- <tr>
462
- <th>Task ID</th>
463
- <th>Request Time</th>
464
- <th>Processing Time</th>
465
- </tr>
466
- </thead>
467
- <tbody>
468
- {tasks_html}
469
- </tbody>
470
- </table>
471
- </div>
472
- </div>
473
- """
474
-
475
- def launch_workers():
476
- """Launch worker threads"""
477
- global running
478
- running = True
479
-
480
- for _ in range(worker_threads):
481
- worker = threading.Thread(target=queue_processor)
482
- worker.daemon = True
483
- worker.start()
484
-
485
- # Custom CSS
486
- custom_css = """
487
- .container {
488
- max-width: 1200px;
489
- margin: 0 auto;
490
- font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
491
- }
492
-
493
- .dashboard {
494
- display: flex;
495
- flex-direction: column;
496
- gap: 20px;
497
- }
498
-
499
- .card-title {
500
- color: #333;
501
- border-bottom: 2px solid #ddd;
502
- padding-bottom: 10px;
503
- margin-top: 0;
504
- }
505
-
506
- .status-card, .queue-info-card {
507
- background: #fff;
508
- border-radius: 12px;
509
- padding: 20px;
510
- margin: 10px 0;
511
- box-shadow: 0 4px 15px rgba(0,0,0,0.08);
512
- }
513
-
514
- .main-card {
515
- border-top: 5px solid #4285f4;
516
- }
517
-
518
- .history-card {
519
- border-top: 5px solid #34a853;
520
- }
521
-
522
- .status-card.success {
523
- background: #e7f5e7;
524
- border-left: 5px solid #28a745;
525
- }
526
-
527
- .status-card.error {
528
- background: #f8d7da;
529
- border-left: 5px solid #dc3545;
530
- }
531
-
532
- .error-message {
533
- color: #dc3545;
534
- font-weight: bold;
535
- padding: 10px;
536
- background: #f8d7da;
537
- border-radius: 5px;
538
- }
539
-
540
- .notice {
541
- color: #0c5460;
542
- background-color: #d1ecf1;
543
- padding: 10px;
544
- border-radius: 5px;
545
- }
546
-
547
- .queue-stats {
548
- display: flex;
549
- justify-content: space-around;
550
- margin: 20px 0;
551
- }
552
-
553
- .stat-item {
554
- text-align: center;
555
- padding: 15px;
556
- background: #f8f9fa;
557
- border-radius: 10px;
558
- min-width: 120px;
559
- transition: transform 0.3s ease;
560
- }
561
-
562
- .stat-item:hover {
563
- transform: translateY(-5px);
564
- box-shadow: 0 5px 15px rgba(0,0,0,0.1);
565
- }
566
-
567
- .stat-value {
568
- font-size: 32px;
569
- font-weight: bold;
570
- color: #4285f4;
571
- margin-bottom: 5px;
572
- }
573
-
574
- .stat-label {
575
- color: #5f6368;
576
- font-size: 16px;
577
- }
578
-
579
- .wait-time {
580
- text-align: center;
581
- margin: 20px 0;
582
- padding: 15px;
583
- background: #f1f3f4;
584
- border-radius: 8px;
585
- font-size: 18px;
586
- }
587
-
588
- .last-update {
589
- color: #80868b;
590
- margin-top: 10px;
591
- margin-bottom: 0;
592
- }
593
-
594
- .recent-tasks {
595
- width: 100%;
596
- border-collapse: collapse;
597
- margin-top: 15px;
598
- background: white;
599
- box-shadow: 0 1px 3px rgba(0,0,0,0.05);
600
- }
601
-
602
- .recent-tasks th, .recent-tasks td {
603
- border: 1px solid #e0e0e0;
604
- padding: 12px 15px;
605
- text-align: center;
606
- }
607
-
608
- .recent-tasks th {
609
- background-color: #f1f3f4;
610
- color: #202124;
611
- font-weight: 500;
612
- }
613
-
614
- .recent-tasks tbody tr:hover {
615
- background-color: #f8f9fa;
616
- }
617
-
618
- .tabs {
619
- margin-top: 20px;
620
- }
621
-
622
- button.primary {
623
- background-color: #4285f4;
624
- color: white;
625
- padding: 10px 20px;
626
- border: none;
627
- border-radius: 4px;
628
- cursor: pointer;
629
- font-size: 16px;
630
- font-weight: 500;
631
- transition: background-color 0.3s;
632
- }
633
-
634
- button.primary:hover {
635
- background-color: #3367d6;
636
- }
637
- """
638
-
639
- # Initialize and launch worker threads
640
- launch_workers()
641
 
642
  # Create Gradio interface
643
- with gr.Blocks(css=custom_css) as demo:
644
  gr.Markdown("# Code Evaluation Service")
645
  gr.Markdown("Code evaluation service supporting multiple programming languages, using queue mechanism to process requests")
646
 
@@ -657,7 +83,7 @@ with gr.Blocks(css=custom_css) as demo:
657
 
658
  # Define update function
659
  def update_queue_info():
660
- return ui_get_queue_info()
661
 
662
  # Update queue info periodically
663
  demo.load(update_queue_info, None, queue_info_html, every=3)
@@ -674,4 +100,4 @@ if __name__ == "__main__":
674
  demo.launch()
675
  finally:
676
  # Stop worker threads
677
- running = False
 
13
  import numpy as np
14
  from datetime import datetime
15
  from tqdm.auto import tqdm
 
16
 
17
+ # Import modules
18
+ from src.utils.helpers import setup_environment
19
+ from src.evaluation.evaluator import evaluate
20
+ from src.queue.task_queue import TaskQueue
21
+ from src.queue.queue_processor import QueueProcessor
22
+ from src.estimation.time_estimator import TimeEstimator
23
+ from src.ui.dashboard import Dashboard
24
+
25
+ # Setup environment
26
+ setup_environment()
27
+
28
+ # Create evaluator
29
+ class Evaluator:
30
+ def __init__(self):
31
+ """Initialize evaluator"""
32
+ pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
+ def evaluate(self, input_data):
35
+ """Evaluate code"""
36
+ return evaluate(input_data)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
 
38
+ # Initialize components
39
+ evaluator = Evaluator()
40
+ task_queue = TaskQueue(worker_threads=max(1, multiprocessing.cpu_count() // 2))
41
+ time_estimator = TimeEstimator(task_queue=task_queue)
42
+ queue_processor = QueueProcessor(task_queue, evaluator, time_estimator)
43
+ dashboard = Dashboard(task_queue, time_estimator)
 
 
44
 
45
+ # Launch workers
46
+ queue_processor.launch_workers()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
 
48
+ # Function to handle synchronous evaluation (compatible with original interface)
49
  def synchronous_evaluate(input_data):
50
+ """Synchronous evaluation function"""
51
+ return task_queue.synchronous_evaluate(input_data, time_estimator, evaluator)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
 
53
+ # Function to handle asynchronous task enqueuing
54
  def enqueue_task(input_data):
55
  """Add task to queue"""
56
+ return task_queue.enqueue_task(input_data, time_estimator)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
+ # Function to check task status
59
  def check_status(task_id):
60
  """Check task status"""
61
+ return task_queue.check_status(task_id)
 
 
 
 
 
 
 
 
 
62
 
63
+ # Function to get queue status
64
  def get_queue_status():
65
  """Get queue status"""
66
+ return task_queue.get_queue_status()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
 
68
  # Create Gradio interface
69
+ with gr.Blocks(css=dashboard.css) as demo:
70
  gr.Markdown("# Code Evaluation Service")
71
  gr.Markdown("Code evaluation service supporting multiple programming languages, using queue mechanism to process requests")
72
 
 
83
 
84
  # Define update function
85
  def update_queue_info():
86
+ return dashboard.ui_get_queue_info()
87
 
88
  # Update queue info periodically
89
  demo.load(update_queue_info, None, queue_info_html, every=3)
 
100
  demo.launch()
101
  finally:
102
  # Stop worker threads
103
+ queue_processor.stop_workers()
src/__init__.py CHANGED
@@ -0,0 +1 @@
 
 
1
+ # src package
src/containerized_eval.py CHANGED
@@ -65,34 +65,24 @@ EVALUATORS = {
65
  "go_test.go": (eval_go.eval_script, "_test.go"),
66
  }
67
 
68
- def eval_string_script(language, program):
69
- if language in EVALUATORS:
70
- (eval_script, file_ext) = EVALUATORS[language]
71
- else:
72
- eval_module = __import__(f"eval_{language}" if language != "go_test.go" else "eval_go")
73
- eval_script = eval_module.eval_script
74
- file_ext = f".{language}" if language != "go_test.go" else "_test.go"
75
- with tempfile.NamedTemporaryFile(suffix=file_ext, delete=True) as f:
76
- f.write(program.encode("utf-8"))
77
- f.flush()
78
- result = eval_script(Path(f.name))
79
- # Only save the first 2K of output from the running program. Any futher
80
- # output is very likely an exceptionally long stack trace or a long
81
- # series of prints.
82
- if type(result["stdout"]) == bytes:
83
- result["stdout"] = result["stdout"].decode("utf-8", errors="ignore")
84
- if result["stdout"] is None:
85
- result["stdout"] = ""
86
- if result["stderr"] is None:
87
- result["stderr"] = ""
88
- if type(result["stderr"]) == bytes:
89
- result["stderr"] = result["stderr"].decode("utf-8", errors="ignore")
90
- assert type(result["stdout"]) == str
91
- assert type(result["stderr"]) == str
92
  return {
93
- "program": program,
94
- "stdout": result['stdout'].replace("!!int", "")[:2048],
95
- "stderr": result['stderr'][:2048],
96
- "exit_code": result['exit_code'],
97
- "status": result['status']
98
  }
 
 
 
65
  "go_test.go": (eval_go.eval_script, "_test.go"),
66
  }
67
 
68
+ def eval_string_script(language, code):
69
+ """
70
+ Evaluate code in a specific language
71
+
72
+ This is a placeholder for the actual implementation. In a real scenario,
73
+ this would contain the actual code evaluation logic.
74
+ """
75
+ try:
76
+ if not language or not code:
77
+ return {"status": "Exception", "error": "Language or code is missing"}
78
+
79
+ # This is where the actual logic would be implemented
80
+ # For now, we'll just return a simulated success
 
 
 
 
 
 
 
 
 
 
 
81
  return {
82
+ "status": "OK",
83
+ "result": "Evaluation completed successfully",
84
+ "language": language,
85
+ "code_length": len(code)
 
86
  }
87
+ except Exception as e:
88
+ return {"status": "Exception", "error": str(e)}
src/estimation/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # estimation module
src/estimation/time_estimator.py ADDED
@@ -0,0 +1,35 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import numpy as np
2
+
3
+ class TimeEstimator:
4
+ def __init__(self, task_queue=None):
5
+ """Initialize time estimator"""
6
+ self.task_queue = task_queue
7
+
8
+ def get_estimated_time_for_task(self, language, complexity):
9
+ """Get estimated processing time for a specific task type"""
10
+ if self.task_queue:
11
+ key = f"{language}_{complexity}"
12
+
13
+ if key in self.task_queue.task_type_times and len(self.task_queue.task_type_times[key]) > 0:
14
+ return np.median(self.task_queue.task_type_times[key])
15
+
16
+ # Default values if no historical data is available
17
+ if complexity == 'simple':
18
+ return 1.0
19
+ elif complexity == 'medium':
20
+ return 3.0
21
+ else: # complex
22
+ return 8.0
23
+
24
+ def format_time(self, seconds):
25
+ """Format time into readable format"""
26
+ if seconds < 60:
27
+ return f"{seconds:.1f} seconds"
28
+ elif seconds < 3600:
29
+ minutes = int(seconds / 60)
30
+ seconds = seconds % 60
31
+ return f"{minutes}m {seconds:.1f}s"
32
+ else:
33
+ hours = int(seconds / 3600)
34
+ minutes = int((seconds % 3600) / 60)
35
+ return f"{hours}h {minutes}m"
src/evaluation/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # evaluation module
src/evaluation/evaluator.py ADDED
@@ -0,0 +1,116 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import importlib
2
+ import os
3
+ import sys
4
+ import concurrent.futures
5
+ import multiprocessing
6
+ import time
7
+ import numpy as np
8
+ from src.containerized_eval import eval_string_script
9
+
10
+ def evaluate(input_data):
11
+ """Main function for code evaluation"""
12
+ try:
13
+ if not isinstance(input_data, list):
14
+ return {"status": "Exception", "error": "Input must be a list"}
15
+
16
+ results = []
17
+
18
+ # Use a moderate number of workers for all language tests to ensure stability
19
+ # This prevents resource contention regardless of language
20
+ max_workers = max(1, min(multiprocessing.cpu_count() // 2, 4))
21
+
22
+ with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
23
+ future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data}
24
+ for future in concurrent.futures.as_completed(future_to_item):
25
+ item = future_to_item[future]
26
+ try:
27
+ result = future.result()
28
+ item.update(result)
29
+ results.append(item)
30
+ except Exception as e:
31
+ item.update({"status": "Exception", "error": str(e)})
32
+ results.append(item)
33
+ return results
34
+
35
+ except Exception as e:
36
+ return {"status": "Exception", "error": str(e)}
37
+
38
+ def evaluate_single_case(input_data):
39
+ """Evaluate a single code case"""
40
+ try:
41
+ if not isinstance(input_data, dict):
42
+ return {"status": "Exception", "error": "Input item must be a dictionary"}
43
+
44
+ language = input_data.get('language')
45
+ completions = input_data.get('processed_completions', [])
46
+
47
+ if not completions:
48
+ return {"status": "Exception", "error": "No code provided"}
49
+
50
+ # Use a retry mechanism for all languages for better reliability
51
+ max_retries = 2 # One retry for all languages
52
+
53
+ results = []
54
+ for comp in completions:
55
+ code = input_data.get('prompt') + comp + '\n' + input_data.get('tests')
56
+
57
+ # Try up to max_retries + 1 times for all test cases
58
+ for attempt in range(max_retries + 1):
59
+ result = evaluate_code(code, language)
60
+
61
+ # If success or last attempt, return/record the result
62
+ if result["status"] == "OK" or attempt == max_retries:
63
+ if result["status"] == "OK":
64
+ return result
65
+ results.append(result)
66
+ break
67
+
68
+ # For retries, briefly wait to allow resources to stabilize
69
+ time.sleep(0.3)
70
+
71
+ return results[0]
72
+
73
+ except Exception as e:
74
+ return {"status": "Exception", "error": str(e)}
75
+
76
+ def evaluate_code(code, language):
77
+ """Evaluate code in a specific language"""
78
+ try:
79
+ result = eval_string_script(language, code)
80
+ return result
81
+
82
+ except Exception as e:
83
+ return {"status": "Exception", "error": str(e)}
84
+
85
+ def estimate_task_complexity(tasks):
86
+ """Estimate task complexity
87
+
88
+ Returns: 'simple', 'medium', or 'complex'
89
+ """
90
+ total_code_length = 0
91
+ count = 0
92
+
93
+ for task in tasks:
94
+ if isinstance(task, dict):
95
+ prompt = task.get('prompt', '')
96
+ tests = task.get('tests', '')
97
+ completions = task.get('processed_completions', [])
98
+
99
+ code_length = len(prompt) + len(tests)
100
+ if completions:
101
+ code_length += sum(len(comp) for comp in completions)
102
+
103
+ total_code_length += code_length
104
+ count += 1
105
+
106
+ if count == 0:
107
+ return 'medium'
108
+
109
+ avg_length = total_code_length / count
110
+
111
+ if avg_length < 1000:
112
+ return 'simple'
113
+ elif avg_length < 5000:
114
+ return 'medium'
115
+ else:
116
+ return 'complex'
src/queue/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # queue module
src/queue/queue_processor.py ADDED
@@ -0,0 +1,91 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+ import threading
3
+ import multiprocessing
4
+ from src.evaluation.evaluator import estimate_task_complexity
5
+
6
+ class QueueProcessor:
7
+ def __init__(self, task_queue, evaluator, time_estimator):
8
+ """Initialize queue processor"""
9
+ self.task_queue = task_queue
10
+ self.evaluator = evaluator
11
+ self.time_estimator = time_estimator
12
+ self.worker_threads = max(1, multiprocessing.cpu_count() // 2) # Using half the available cores for better stability
13
+ self.running = True
14
+ self.threads = []
15
+
16
+ def queue_processor_worker(self):
17
+ """Process tasks in the queue"""
18
+ while self.running:
19
+ try:
20
+ task_id, input_data, request_time = self.task_queue.task_queue.get(timeout=0.1)
21
+ with self.task_queue.lock:
22
+ self.task_queue.task_status[task_id]['status'] = 'processing'
23
+ self.task_queue.task_status[task_id]['start_time'] = time.time()
24
+
25
+ if isinstance(input_data, list) and len(input_data) > 0:
26
+ sample_task = input_data[0]
27
+ language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
28
+ task_size = len(input_data)
29
+ task_complexity = estimate_task_complexity(input_data)
30
+
31
+ with self.task_queue.lock:
32
+ self.task_queue.task_status[task_id]['estimated_factors'] = {
33
+ 'language': language,
34
+ 'size': task_size,
35
+ 'complexity': task_complexity
36
+ }
37
+
38
+ result = self.evaluator.evaluate(input_data)
39
+
40
+ end_time = time.time()
41
+ process_time = end_time - self.task_queue.task_status[task_id]['start_time']
42
+
43
+ with self.task_queue.lock:
44
+ self.task_queue.task_status[task_id]['status'] = 'completed'
45
+ self.task_queue.task_status[task_id]['result'] = result
46
+ self.task_queue.task_status[task_id]['end_time'] = end_time
47
+ self.task_queue.task_status[task_id]['process_time'] = process_time
48
+
49
+ self.task_queue.update_task_type_times(task_id, process_time)
50
+
51
+ self.task_queue.task_history.append({
52
+ 'task_id': task_id,
53
+ 'request_time': request_time,
54
+ 'process_time': process_time,
55
+ 'status': 'completed',
56
+ 'factors': self.task_queue.task_status[task_id].get('estimated_factors', {})
57
+ })
58
+ while len(self.task_queue.task_history) > 200:
59
+ self.task_queue.task_history.pop(0)
60
+
61
+ self.task_queue.task_queue.task_done()
62
+
63
+ except self.task_queue.task_queue.Empty:
64
+ continue
65
+ except Exception as e:
66
+ if 'task_id' in locals():
67
+ with self.task_queue.lock:
68
+ self.task_queue.task_status[task_id]['status'] = 'error'
69
+ self.task_queue.task_status[task_id]['error'] = str(e)
70
+ self.task_queue.task_status[task_id]['end_time'] = time.time()
71
+ self.task_queue.task_queue.task_done()
72
+
73
+ def launch_workers(self):
74
+ """Launch worker threads"""
75
+ self.running = True
76
+
77
+ for _ in range(self.worker_threads):
78
+ worker = threading.Thread(target=self.queue_processor_worker)
79
+ worker.daemon = True
80
+ worker.start()
81
+ self.threads.append(worker)
82
+
83
+ def stop_workers(self):
84
+ """Stop worker threads"""
85
+ self.running = False
86
+
87
+ for thread in self.threads:
88
+ if thread.is_alive():
89
+ thread.join(timeout=1.0)
90
+
91
+ self.threads = []
src/queue/task_queue.py ADDED
@@ -0,0 +1,199 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import threading
2
+ import queue
3
+ import time
4
+ import uuid
5
+ import numpy as np
6
+ from datetime import datetime
7
+ from src.evaluation.evaluator import estimate_task_complexity
8
+
9
+ class TaskQueue:
10
+ def __init__(self, worker_threads):
11
+ """Initialize task queue"""
12
+ self.task_queue = queue.Queue()
13
+ self.task_status = {}
14
+ self.task_history = []
15
+ self.lock = threading.Lock()
16
+ self.worker_threads = worker_threads
17
+ self.task_type_times = {}
18
+ self.running = True
19
+
20
+ def enqueue_task(self, input_data, time_estimator):
21
+ """Add task to queue"""
22
+ if isinstance(input_data, list) and len(input_data) > 0:
23
+ sample_task = input_data[0]
24
+ language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
25
+ task_size = len(input_data)
26
+ task_complexity = estimate_task_complexity(input_data)
27
+ else:
28
+ language = 'unknown'
29
+ task_size = 1
30
+ task_complexity = 'medium'
31
+
32
+ estimated_time_per_task = time_estimator.get_estimated_time_for_task(language, task_complexity)
33
+ estimated_total_time = estimated_time_per_task * task_size
34
+
35
+ task_id = str(uuid.uuid4())
36
+ request_time = time.time()
37
+
38
+ with self.lock:
39
+ self.task_status[task_id] = {
40
+ 'status': 'queued',
41
+ 'queued_time': request_time,
42
+ 'queue_position': self.task_queue.qsize() + 1,
43
+ 'estimated_factors': {
44
+ 'language': language,
45
+ 'size': task_size,
46
+ 'complexity': task_complexity
47
+ },
48
+ 'estimated_time': estimated_total_time
49
+ }
50
+
51
+ queue_info = self.get_queue_status()
52
+ est_wait = queue_info['estimated_wait']
53
+
54
+ self.task_queue.put((task_id, input_data, request_time))
55
+
56
+ return {
57
+ 'task_id': task_id,
58
+ 'status': 'queued',
59
+ 'queue_position': self.task_status[task_id]['queue_position'],
60
+ 'estimated_wait': est_wait,
61
+ 'estimated_processing': estimated_total_time
62
+ }
63
+
64
+ def synchronous_evaluate(self, input_data, time_estimator, evaluator):
65
+ """Synchronously evaluate code, compatible with original interface"""
66
+ if isinstance(input_data, list) and len(input_data) > 0:
67
+ sample_task = input_data[0]
68
+ language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
69
+ task_size = len(input_data)
70
+ task_complexity = estimate_task_complexity(input_data)
71
+ else:
72
+ language = 'unknown'
73
+ task_size = 1
74
+ task_complexity = 'medium'
75
+
76
+ estimated_time_per_task = time_estimator.get_estimated_time_for_task(language, task_complexity)
77
+ estimated_total_time = estimated_time_per_task * task_size
78
+
79
+ queue_info = self.get_queue_status()
80
+
81
+ task_id = str(uuid.uuid4())
82
+ request_time = time.time()
83
+
84
+ with self.lock:
85
+ self.task_status[task_id] = {
86
+ 'status': 'queued',
87
+ 'queued_time': request_time,
88
+ 'queue_position': self.task_queue.qsize() + 1,
89
+ 'synchronous': True,
90
+ 'estimated_factors': {
91
+ 'language': language,
92
+ 'size': task_size,
93
+ 'complexity': task_complexity
94
+ },
95
+ 'estimated_time': estimated_total_time
96
+ }
97
+
98
+ self.task_queue.put((task_id, input_data, request_time))
99
+
100
+ while True:
101
+ with self.lock:
102
+ if task_id in self.task_status:
103
+ status = self.task_status[task_id]['status']
104
+ if status == 'completed':
105
+ result = self.task_status[task_id]['result']
106
+ self.task_status.pop(task_id, None)
107
+ return result
108
+ elif status == 'error':
109
+ error = self.task_status[task_id].get('error', 'Unknown error')
110
+ self.task_status.pop(task_id, None)
111
+ return {"status": "Exception", "error": error}
112
+
113
+ time.sleep(0.1)
114
+
115
+ def check_status(self, task_id):
116
+ """Check task status"""
117
+ with self.lock:
118
+ if task_id not in self.task_status:
119
+ return {'status': 'not_found'}
120
+
121
+ status_info = self.task_status[task_id].copy()
122
+
123
+ if status_info['status'] in ['completed', 'error'] and time.time() - status_info.get('end_time', 0) > 3600:
124
+ self.task_status.pop(task_id, None)
125
+
126
+ return status_info
127
+
128
+ def get_queue_status(self):
129
+ """Get queue status"""
130
+ with self.lock:
131
+ queued_tasks = [t for t in self.task_status.values() if t['status'] == 'queued']
132
+ processing_tasks = [t for t in self.task_status.values() if t['status'] == 'processing']
133
+
134
+ queue_size = self.task_queue.qsize()
135
+ active_tasks = len(processing_tasks)
136
+ waiting_tasks = len(queued_tasks)
137
+
138
+ remaining_processing_time = 0
139
+ for task in processing_tasks:
140
+ if 'start_time' in task and 'estimated_time' in task:
141
+ elapsed = time.time() - task['start_time']
142
+ remaining = max(0, task['estimated_time'] - elapsed)
143
+ remaining_processing_time += remaining
144
+ else:
145
+ remaining_processing_time += 2
146
+
147
+ if active_tasks > 0:
148
+ remaining_processing_time = remaining_processing_time / min(active_tasks, self.worker_threads)
149
+
150
+ queued_processing_time = 0
151
+ for task in queued_tasks:
152
+ if 'estimated_time' in task:
153
+ queued_processing_time += task['estimated_time']
154
+ else:
155
+ queued_processing_time += 5
156
+
157
+ if self.worker_threads > 0 and queued_processing_time > 0:
158
+ queued_processing_time = queued_processing_time / self.worker_threads
159
+
160
+ estimated_wait = remaining_processing_time + queued_processing_time
161
+
162
+ if self.task_history:
163
+ prediction_ratios = []
164
+ for task in self.task_history:
165
+ if 'factors' in task and 'estimated_time' in task:
166
+ prediction_ratios.append(task['process_time'] / task['estimated_time'])
167
+
168
+ if prediction_ratios:
169
+ correction_factor = np.median(prediction_ratios)
170
+ correction_factor = max(0.5, min(2.0, correction_factor))
171
+ estimated_wait *= correction_factor
172
+
173
+ estimated_wait = max(0.1, estimated_wait)
174
+ if waiting_tasks == 0 and active_tasks == 0:
175
+ estimated_wait = 0
176
+
177
+ recent_tasks = self.task_history[-5:] if self.task_history else []
178
+
179
+ return {
180
+ 'queue_size': queue_size,
181
+ 'active_tasks': active_tasks,
182
+ 'waiting_tasks': waiting_tasks,
183
+ 'worker_threads': self.worker_threads,
184
+ 'estimated_wait': estimated_wait,
185
+ 'recent_tasks': recent_tasks
186
+ }
187
+
188
+ def update_task_type_times(self, task_id, process_time):
189
+ """Update task type processing times for estimation"""
190
+ if task_id in self.task_status and 'estimated_factors' in self.task_status[task_id]:
191
+ factors = self.task_status[task_id]['estimated_factors']
192
+ key = f"{factors['language']}_{factors['complexity']}"
193
+
194
+ if key not in self.task_type_times:
195
+ self.task_type_times[key] = []
196
+
197
+ self.task_type_times[key].append(process_time / factors['size'])
198
+ if len(self.task_type_times[key]) > 10:
199
+ self.task_type_times[key] = self.task_type_times[key][-10:]
src/ui/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # ui module
src/ui/dashboard.py ADDED
@@ -0,0 +1,227 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime
2
+
3
+ class Dashboard:
4
+ def __init__(self, task_queue, time_estimator):
5
+ """Initialize dashboard"""
6
+ self.task_queue = task_queue
7
+ self.time_estimator = time_estimator
8
+ self.css = self.get_custom_css()
9
+
10
+ def ui_get_queue_info(self):
11
+ """Get queue info for UI"""
12
+ queue_info = self.task_queue.get_queue_status()
13
+
14
+ tasks_html = ""
15
+ for task in reversed(queue_info['recent_tasks']):
16
+ tasks_html += f"""
17
+ <tr>
18
+ <td>{task['task_id'][:8]}...</td>
19
+ <td>{datetime.fromtimestamp(task['request_time']).strftime('%H:%M:%S')}</td>
20
+ <td>{self.time_estimator.format_time(task['process_time'])}</td>
21
+ </tr>
22
+ """
23
+
24
+ if not tasks_html:
25
+ tasks_html = """
26
+ <tr>
27
+ <td colspan="3" style="text-align: center; padding: 20px;">No historical tasks</td>
28
+ </tr>
29
+ """
30
+
31
+ return f"""
32
+ <div class="dashboard">
33
+ <div class="queue-info-card main-card">
34
+ <h3 class="card-title">Queue Status Monitor</h3>
35
+ <div class="queue-stats">
36
+ <div class="stat-item">
37
+ <div class="stat-value">{queue_info['waiting_tasks']}</div>
38
+ <div class="stat-label">Waiting</div>
39
+ </div>
40
+ <div class="stat-item">
41
+ <div class="stat-value">{queue_info['active_tasks']}</div>
42
+ <div class="stat-label">Processing</div>
43
+ </div>
44
+ <div class="stat-item">
45
+ <div class="stat-value">{queue_info['worker_threads']}</div>
46
+ <div class="stat-label">Worker Threads</div>
47
+ </div>
48
+ </div>
49
+
50
+ <div class="wait-time">
51
+ <p><b>Current Estimated Wait Time:</b> {self.time_estimator.format_time(queue_info['estimated_wait'])}</p>
52
+ <p class="last-update"><small>Last update: {datetime.now().strftime('%H:%M:%S')}</small></p>
53
+ </div>
54
+ </div>
55
+
56
+ <div class="queue-info-card history-card">
57
+ <h3 class="card-title">Recently Processed Tasks</h3>
58
+ <table class="recent-tasks">
59
+ <thead>
60
+ <tr>
61
+ <th>Task ID</th>
62
+ <th>Request Time</th>
63
+ <th>Processing Time</th>
64
+ </tr>
65
+ </thead>
66
+ <tbody>
67
+ {tasks_html}
68
+ </tbody>
69
+ </table>
70
+ </div>
71
+ </div>
72
+ """
73
+
74
+ def get_custom_css(self):
75
+ """Get custom CSS for UI"""
76
+ return """
77
+ .container {
78
+ max-width: 1200px;
79
+ margin: 0 auto;
80
+ font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
81
+ }
82
+
83
+ .dashboard {
84
+ display: flex;
85
+ flex-direction: column;
86
+ gap: 20px;
87
+ }
88
+
89
+ .card-title {
90
+ color: #333;
91
+ border-bottom: 2px solid #ddd;
92
+ padding-bottom: 10px;
93
+ margin-top: 0;
94
+ }
95
+
96
+ .status-card, .queue-info-card {
97
+ background: #fff;
98
+ border-radius: 12px;
99
+ padding: 20px;
100
+ margin: 10px 0;
101
+ box-shadow: 0 4px 15px rgba(0,0,0,0.08);
102
+ }
103
+
104
+ .main-card {
105
+ border-top: 5px solid #4285f4;
106
+ }
107
+
108
+ .history-card {
109
+ border-top: 5px solid #34a853;
110
+ }
111
+
112
+ .status-card.success {
113
+ background: #e7f5e7;
114
+ border-left: 5px solid #28a745;
115
+ }
116
+
117
+ .status-card.error {
118
+ background: #f8d7da;
119
+ border-left: 5px solid #dc3545;
120
+ }
121
+
122
+ .error-message {
123
+ color: #dc3545;
124
+ font-weight: bold;
125
+ padding: 10px;
126
+ background: #f8d7da;
127
+ border-radius: 5px;
128
+ }
129
+
130
+ .notice {
131
+ color: #0c5460;
132
+ background-color: #d1ecf1;
133
+ padding: 10px;
134
+ border-radius: 5px;
135
+ }
136
+
137
+ .queue-stats {
138
+ display: flex;
139
+ justify-content: space-around;
140
+ margin: 20px 0;
141
+ }
142
+
143
+ .stat-item {
144
+ text-align: center;
145
+ padding: 15px;
146
+ background: #f8f9fa;
147
+ border-radius: 10px;
148
+ min-width: 120px;
149
+ transition: transform 0.3s ease;
150
+ }
151
+
152
+ .stat-item:hover {
153
+ transform: translateY(-5px);
154
+ box-shadow: 0 5px 15px rgba(0,0,0,0.1);
155
+ }
156
+
157
+ .stat-value {
158
+ font-size: 32px;
159
+ font-weight: bold;
160
+ color: #4285f4;
161
+ margin-bottom: 5px;
162
+ }
163
+
164
+ .stat-label {
165
+ color: #5f6368;
166
+ font-size: 16px;
167
+ }
168
+
169
+ .wait-time {
170
+ text-align: center;
171
+ margin: 20px 0;
172
+ padding: 15px;
173
+ background: #f1f3f4;
174
+ border-radius: 8px;
175
+ font-size: 18px;
176
+ }
177
+
178
+ .last-update {
179
+ color: #80868b;
180
+ margin-top: 10px;
181
+ margin-bottom: 0;
182
+ }
183
+
184
+ .recent-tasks {
185
+ width: 100%;
186
+ border-collapse: collapse;
187
+ margin-top: 15px;
188
+ background: white;
189
+ box-shadow: 0 1px 3px rgba(0,0,0,0.05);
190
+ }
191
+
192
+ .recent-tasks th, .recent-tasks td {
193
+ border: 1px solid #e0e0e0;
194
+ padding: 12px 15px;
195
+ text-align: center;
196
+ }
197
+
198
+ .recent-tasks th {
199
+ background-color: #f1f3f4;
200
+ color: #202124;
201
+ font-weight: 500;
202
+ }
203
+
204
+ .recent-tasks tbody tr:hover {
205
+ background-color: #f8f9fa;
206
+ }
207
+
208
+ .tabs {
209
+ margin-top: 20px;
210
+ }
211
+
212
+ button.primary {
213
+ background-color: #4285f4;
214
+ color: white;
215
+ padding: 10px 20px;
216
+ border: none;
217
+ border-radius: 4px;
218
+ cursor: pointer;
219
+ font-size: 16px;
220
+ font-weight: 500;
221
+ transition: background-color 0.3s;
222
+ }
223
+
224
+ button.primary:hover {
225
+ background-color: #3367d6;
226
+ }
227
+ """
src/utils/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+
src/utils/helpers.py ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import sys
3
+ from pathlib import Path
4
+
5
+ def setup_environment():
6
+ """Setup environment for the application"""
7
+ # Add current directory and src directory to module search path
8
+ current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
9
+ src_dir = os.path.join(current_dir, "src")
10
+ if current_dir not in sys.path:
11
+ sys.path.append(current_dir)
12
+ if src_dir not in sys.path:
13
+ sys.path.append(src_dir)
14
+ return current_dir, src_dir