import gradio as gr import json import importlib import os import sys from pathlib import Path import concurrent.futures import multiprocessing import time import threading import queue import uuid import numpy as np from datetime import datetime from tqdm.auto import tqdm from src.containerized_eval import eval_string_script # Add current directory and src directory to module search path current_dir = os.path.dirname(os.path.abspath(__file__)) src_dir = os.path.join(current_dir, "src") if current_dir not in sys.path: sys.path.append(current_dir) if src_dir not in sys.path: sys.path.append(src_dir) # Create message queue task_queue = queue.Queue() # Dictionary to store task status task_status = {} # List to store task history, max 200 tasks task_history = [] # Lock for shared resources lock = threading.Lock() # Number of worker threads - set to 1 to process one task at a time worker_threads = 1 # Flag for running background threads running = True # Mapping from task type to processing time task_type_times = {} # Currently processing tasks counter processing_count = 0 # Available CPU cores for task processing available_cores = multiprocessing.cpu_count() # Task ID counter for debugging task_counter = 0 # Enable logging DEBUG_MODE = True def debug_log(message): """Log debug messages if debug mode is enabled""" if DEBUG_MODE: print(f"[DEBUG] {datetime.now().strftime('%H:%M:%S')} - {message}") def queue_processor(): """Process tasks in the queue""" global processing_count while running: try: # Only process if we're not already processing a task with lock: if processing_count >= worker_threads: # Already processing a task, wait and try again time.sleep(0.5) continue # Check queue size before attempting to get a task queue_size = task_queue.qsize() if queue_size > 0: debug_log(f"Queue processor found {queue_size} tasks waiting") else: # No tasks waiting, sleep briefly to avoid CPU spinning time.sleep(0.1) continue # Get a task from the queue with small timeout to prevent blocking try: task_id, input_data, request_time = task_queue.get(timeout=0.1) debug_log(f"Processing task {task_id}") except queue.Empty: continue # Increment processing count to track active tasks with lock: processing_count += 1 debug_log(f"Incremented processing count to {processing_count}") # Update task status if task_id in task_status: task_status[task_id]['status'] = 'processing' task_status[task_id]['start_time'] = time.time() debug_log(f"Updated existing task {task_id} to processing state") else: # Create task status entry if it doesn't exist task_status[task_id] = { 'status': 'processing', 'queued_time': request_time, 'start_time': time.time() } debug_log(f"Created new task status entry for {task_id}") if isinstance(input_data, list) and len(input_data) > 0: sample_task = input_data[0] language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown' task_size = len(input_data) task_complexity = _estimate_task_complexity(input_data) with lock: task_status[task_id]['estimated_factors'] = { 'language': language, 'size': task_size, 'complexity': task_complexity } debug_log(f"Starting evaluation for task {task_id}") result = evaluate(input_data) debug_log(f"Finished evaluation for task {task_id}") end_time = time.time() process_time = end_time - task_status[task_id]['start_time'] with lock: # Decrease processing count now that we're done processing_count -= 1 debug_log(f"Decremented processing count to {processing_count}") # Update task status task_status[task_id]['status'] = 'completed' task_status[task_id]['result'] = result task_status[task_id]['end_time'] = end_time task_status[task_id]['process_time'] = process_time debug_log(f"Updated task {task_id} to completed state") if 'estimated_factors' in task_status[task_id]: factors = task_status[task_id]['estimated_factors'] key = f"{factors['language']}_{factors['complexity']}" if key not in task_type_times: task_type_times[key] = [] task_type_times[key].append(process_time / factors['size']) if len(task_type_times[key]) > 10: task_type_times[key] = task_type_times[key][-10:] task_history.append({ 'task_id': task_id, 'request_time': request_time, 'process_time': process_time, 'status': 'completed', 'factors': task_status[task_id].get('estimated_factors', {}) }) while len(task_history) > 200: task_history.pop(0) task_queue.task_done() debug_log(f"Task {task_id} completed and marked as done") except queue.Empty: # Use a small timeout to avoid CPU spinning time.sleep(0.1) except Exception as e: debug_log(f"Error in queue processor: {str(e)}") if 'task_id' in locals(): debug_log(f"Error occurred while processing task {task_id}") with lock: # Decrease processing count on error processing_count -= 1 debug_log(f"Decremented processing count to {processing_count} due to error") if task_id in task_status: task_status[task_id]['status'] = 'error' task_status[task_id]['error'] = str(e) task_status[task_id]['end_time'] = time.time() debug_log(f"Updated task {task_id} to error state") else: task_status[task_id] = { 'status': 'error', 'error': str(e), 'end_time': time.time() } debug_log(f"Created new error entry for task {task_id}") task_queue.task_done() def _estimate_task_complexity(tasks): """Estimate task complexity Returns: 'simple', 'medium', or 'complex' """ total_code_length = 0 count = 0 for task in tasks: if isinstance(task, dict): prompt = task.get('prompt', '') tests = task.get('tests', '') completions = task.get('processed_completions', []) code_length = len(prompt) + len(tests) if completions: code_length += sum(len(comp) for comp in completions) total_code_length += code_length count += 1 if count == 0: return 'medium' avg_length = total_code_length / count if avg_length < 1000: return 'simple' elif avg_length < 5000: return 'medium' else: return 'complex' def evaluate(input_data): """Main function for code evaluation""" try: if not isinstance(input_data, list): return {"status": "Exception", "error": "Input must be a list"} results = [] # Use all available cores for this single task but with a reasonable cap max_workers = max(1, min(available_cores // 2, 8)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data} for future in concurrent.futures.as_completed(future_to_item): item = future_to_item[future] try: result = future.result() item.update(result) results.append(item) except Exception as e: item.update({"status": "Exception", "error": str(e)}) results.append(item) return results except Exception as e: return {"status": "Exception", "error": str(e)} def evaluate_single_case(input_data): """Evaluate a single code case""" try: if not isinstance(input_data, dict): return {"status": "Exception", "error": "Input item must be a dictionary"} language = input_data.get('language') completions = input_data.get('processed_completions', []) if not completions: return {"status": "Exception", "error": "No code provided"} # Use a retry mechanism for all languages for better reliability max_retries = 2 # One retry for all languages results = [] for comp in completions: code = input_data.get('prompt') + comp + '\n' + input_data.get('tests') # Try up to max_retries + 1 times for all test cases for attempt in range(max_retries + 1): result = evaluate_code(code, language) # If success or last attempt, return/record the result if result["status"] == "OK" or attempt == max_retries: if result["status"] == "OK": return result results.append(result) break # For retries, briefly wait to allow resources to stabilize time.sleep(0.3) return results[0] except Exception as e: return {"status": "Exception", "error": str(e)} def evaluate_code(code, language): """Evaluate code in a specific language""" try: result = eval_string_script(language, code) return result except Exception as e: return {"status": "Exception", "error": str(e)} def synchronous_evaluate(input_data): """Synchronously evaluate code, compatible with original interface""" debug_log(f"Received synchronous evaluation request") # Add metadata to identify sync requests if isinstance(input_data, list) and len(input_data) > 0 and isinstance(input_data[0], dict): if 'metadata' not in input_data[0]: input_data[0]['metadata'] = {} input_data[0]['metadata']['source'] = 'sync_api' # Create a task and queue it task_info = enqueue_task(input_data) task_id = task_info['task_id'] debug_log(f"Created task {task_id} for synchronous evaluation") # Ensure the task appears in the queue UI, add artificial delay if needed time.sleep(0.1) # Small delay to make sure the task is visible in queue # Wait for task to complete while True: with lock: if task_id in task_status: status = task_status[task_id]['status'] if status == 'completed': debug_log(f"Task {task_id} completed, returning result") result = task_status[task_id]['result'] # Keep the result in status for a short time to ensure it shows in history if 'end_time' not in task_status[task_id]: task_status[task_id]['end_time'] = time.time() elif time.time() - task_status[task_id]['end_time'] > 5: task_status.pop(task_id, None) return result elif status == 'error': debug_log(f"Task {task_id} failed with error") error = task_status[task_id].get('error', 'Unknown error') # Keep the error in status for a short time to ensure it shows in history if 'end_time' not in task_status[task_id]: task_status[task_id]['end_time'] = time.time() elif time.time() - task_status[task_id]['end_time'] > 5: task_status.pop(task_id, None) return {"status": "Exception", "error": error} else: debug_log(f"Task {task_id} still in status: {status}") time.sleep(0.1) def _get_estimated_time_for_task(language, complexity): """Get estimated processing time for a specific task type""" key = f"{language}_{complexity}" if key in task_type_times and len(task_type_times[key]) > 0: return np.median(task_type_times[key]) if complexity == 'simple': return 1.0 elif complexity == 'medium': return 3.0 else: # complex return 8.0 def enqueue_task(input_data): """Add task to queue""" global task_counter if isinstance(input_data, list) and len(input_data) > 0: sample_task = input_data[0] language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown' task_size = len(input_data) task_complexity = _estimate_task_complexity(input_data) else: language = 'unknown' task_size = 1 task_complexity = 'medium' estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity) estimated_total_time = estimated_time_per_task * task_size # Generate task ID in a thread-safe way with lock: task_counter += 1 local_counter = task_counter task_id = f"task_{local_counter}_{str(uuid.uuid4())[:8]}" request_time = time.time() debug_log(f"Creating new task: {task_id}") # Track if this is a synchronous or asynchronous submission is_async = 'async_submission' in str(threading.current_thread().name).lower() or 'async' in input_data[0].get('metadata', {}).get('source', '') if isinstance(input_data, list) and input_data and isinstance(input_data[0], dict) and 'metadata' in input_data[0] else False # Get current queue status before adding to task_status with lock: # Count actual queue status - both in queue AND waiting in task_status current_queue_size = task_queue.qsize() actual_waiting = sum(1 for t in task_status.values() if t['status'] == 'queued') total_waiting = actual_waiting # Use the actual count from task_status debug_log(f"Current queue metrics: queue_size={current_queue_size}, task_status_waiting={actual_waiting}, total={total_waiting}") queue_position = total_waiting + 1 # Add to task_status with 'queued' status first task_status[task_id] = { 'status': 'queued', 'queued_time': request_time, 'queue_position': queue_position, 'is_async': is_async, 'estimated_factors': { 'language': language, 'size': task_size, 'complexity': task_complexity }, 'estimated_time': estimated_total_time } debug_log(f"Added task {task_id} to task_status with queue position {queue_position}") # Get queue info for wait time estimation queue_info = get_queue_status() est_wait = queue_info['estimated_wait'] debug_log(f"Estimated wait time for task {task_id}: {est_wait} seconds") # Add to the task queue - this must be done AFTER adding to task_status task_queue.put((task_id, input_data, request_time)) debug_log(f"Added task {task_id} to task_queue") # Count queued tasks in task_status after adding with lock: queued_count = sum(1 for t in task_status.values() if t['status'] == 'queued') processing_tasks = sum(1 for t in task_status.values() if t['status'] == 'processing') debug_log(f"Queue status after adding: {task_queue.qsize()} in queue, {queued_count} with 'queued' status, {processing_tasks} processing") # Display all task IDs currently in queue task_ids = [(k, v['status']) for k, v in task_status.items() if v['status'] in ('queued', 'processing')] if task_ids: debug_log(f"Current tasks: {task_ids}") return { 'task_id': task_id, 'status': 'queued', 'queue_position': task_status[task_id]['queue_position'], 'estimated_wait': est_wait, 'estimated_processing': estimated_total_time } def check_status(task_id): """Check task status""" with lock: if task_id not in task_status: return {'status': 'not_found'} status_info = task_status[task_id].copy() if status_info['status'] in ['completed', 'error'] and time.time() - status_info.get('end_time', 0) > 3600: task_status.pop(task_id, None) return status_info def get_queue_status(): """Get queue status""" with lock: queued_tasks = [v for k, v in task_status.items() if v['status'] == 'queued'] processing_tasks = [v for k, v in task_status.items() if v['status'] == 'processing'] queue_size = task_queue.qsize() active_tasks = processing_count waiting_tasks = len(queued_tasks) debug_log(f"Queue status check: size={queue_size}, active={active_tasks}, waiting={waiting_tasks}") if waiting_tasks != queue_size and abs(waiting_tasks - queue_size) > 1: debug_log(f"WARNING: Queue size mismatch - task_queue has {queue_size} items but task_status has {waiting_tasks} queued items") debug_log(f"Queue status details: {len(queued_tasks)} queued tasks found in task_status") if queued_tasks: task_ids = [k for k, v in task_status.items() if v['status'] == 'queued'] debug_log(f"Queued task IDs: {task_ids}") # Calculate remaining processing time for active tasks remaining_processing_time = 0 for task in processing_tasks: if 'start_time' in task and 'estimated_time' in task: elapsed = time.time() - task['start_time'] remaining = max(0, task['estimated_time'] - elapsed) remaining_processing_time += remaining else: remaining_processing_time += 2 if active_tasks > 0: remaining_processing_time = remaining_processing_time / min(active_tasks, worker_threads) queued_processing_time = 0 for task in queued_tasks: if 'estimated_time' in task: queued_processing_time += task['estimated_time'] else: queued_processing_time += 5 if worker_threads > 0 and queued_processing_time > 0: queued_processing_time = queued_processing_time / worker_threads estimated_wait = remaining_processing_time + queued_processing_time if task_history: prediction_ratios = [] for task in task_history: if 'factors' in task and 'estimated_time' in task: prediction_ratios.append(task['process_time'] / task['estimated_time']) if prediction_ratios: correction_factor = np.median(prediction_ratios) correction_factor = max(0.5, min(2.0, correction_factor)) estimated_wait *= correction_factor estimated_wait = max(0.1, estimated_wait) if waiting_tasks == 0 and active_tasks == 0: estimated_wait = 0 recent_tasks = task_history[-5:] if task_history else [] return { 'queue_size': queue_size, 'active_tasks': active_tasks, 'waiting_tasks': waiting_tasks, 'worker_threads': worker_threads, 'estimated_wait': estimated_wait, 'recent_tasks': recent_tasks } def format_time(seconds): """Format time into readable format""" if seconds < 60: return f"{seconds:.1f} seconds" elif seconds < 3600: minutes = int(seconds / 60) seconds = seconds % 60 return f"{minutes}m {seconds:.1f}s" else: hours = int(seconds / 3600) minutes = int((seconds % 3600) / 60) return f"{hours}h {minutes}m" def ui_get_queue_info(): """Get queue info for UI""" queue_info = get_queue_status() # List queued tasks with details - make sure to use task_id as key queued_tasks_html = "" with lock: queued_tasks = [] for task_id, task in task_status.items(): if task['status'] == 'queued': task_with_id = task.copy() task_with_id['task_id'] = task_id queued_tasks.append(task_with_id) if queued_tasks: # Sort by queue position queued_tasks.sort(key=lambda x: x.get('queue_position', 999999)) queued_tasks_html = "
Currently {queue_info['waiting_tasks']} tasks in queue
Estimated wait time: {format_time(queue_info['estimated_wait'])}
{queued_tasks_html}Currently {queue_info['active_tasks']} tasks being processed
{processing_tasks_html}Queue: {queue_info['queue_size']} in queue, {queue_info['waiting_tasks']} waiting, {queue_info['active_tasks']} processing
Estimated Wait Time: {format_time(queue_info['estimated_wait'])}
{queue_details} {processing_details} {debug_details}Last update: {datetime.now().strftime('%H:%M:%S')}
Task ID | Request Time | Processing Time |
---|