Spaces:
Sleeping
Sleeping
import gradio as gr | |
import json | |
import importlib | |
import os | |
import sys | |
from pathlib import Path | |
import concurrent.futures | |
import multiprocessing | |
import time | |
import threading | |
import queue | |
import uuid | |
import numpy as np | |
from datetime import datetime | |
from tqdm.auto import tqdm | |
# Import modules | |
from src.utils.helpers import setup_environment | |
from src.evaluation.evaluator import evaluate | |
from src.queue.task_queue import TaskQueue | |
from src.queue.queue_processor import QueueProcessor | |
from src.estimation.time_estimator import TimeEstimator | |
from src.ui.dashboard import Dashboard | |
# Setup environment | |
setup_environment() | |
# Create evaluator | |
class Evaluator: | |
def __init__(self): | |
"""Initialize evaluator""" | |
pass | |
def evaluate(self, input_data): | |
"""Evaluate code""" | |
return evaluate(input_data) | |
# Initialize components | |
evaluator = Evaluator() | |
task_queue = TaskQueue(worker_threads=max(1, multiprocessing.cpu_count() // 2)) | |
time_estimator = TimeEstimator(task_queue=task_queue) | |
queue_processor = QueueProcessor(task_queue, evaluator, time_estimator) | |
dashboard = Dashboard(task_queue, time_estimator) | |
# Launch workers | |
queue_processor.launch_workers() | |
# Function to handle synchronous evaluation (compatible with original interface) | |
def synchronous_evaluate(input_data): | |
"""Synchronous evaluation function""" | |
return task_queue.synchronous_evaluate(input_data, time_estimator, evaluator) | |
# Function to handle asynchronous task enqueuing | |
def enqueue_task(input_data): | |
"""Add task to queue""" | |
return task_queue.enqueue_task(input_data, time_estimator) | |
# Function to check task status | |
def check_status(task_id): | |
"""Check task status""" | |
return task_queue.check_status(task_id) | |
# Function to get queue status | |
def get_queue_status(): | |
"""Get queue status""" | |
return task_queue.get_queue_status() | |
# Create Gradio interface | |
with gr.Blocks(css=dashboard.css) as demo: | |
gr.Markdown("# Code Evaluation Service") | |
gr.Markdown("Code evaluation service supporting multiple programming languages, using queue mechanism to process requests") | |
with gr.Row(): | |
with gr.Column(scale=3): | |
# Queue status info card | |
queue_info_html = gr.HTML() | |
refresh_queue_btn = gr.Button("Refresh Queue Status", variant="primary") | |
# Hidden API interface components | |
with gr.Row(visible=False): | |
api_input = gr.JSON() | |
api_output = gr.JSON() | |
# Define update function | |
def update_queue_info(): | |
return dashboard.ui_get_queue_info() | |
# Update queue info periodically | |
demo.load(update_queue_info, None, queue_info_html, every=3) | |
# Refresh button event | |
refresh_queue_btn.click(update_queue_info, None, queue_info_html) | |
# Add evaluation endpoint compatible with original interface | |
demo.queue() | |
evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=api_input, outputs=api_output, api_name="evaluate") | |
if __name__ == "__main__": | |
try: | |
demo.launch() | |
finally: | |
# Stop worker threads | |
queue_processor.stop_workers() |