docker_test / app.py
朱东升
update30
0e2f5b3
raw
history blame
3.12 kB
import gradio as gr
import json
import importlib
import os
import sys
from pathlib import Path
import concurrent.futures
import multiprocessing
import time
import threading
import queue
import uuid
import numpy as np
from datetime import datetime
from tqdm.auto import tqdm
# Import modules
from src.utils.helpers import setup_environment
from src.evaluation.evaluator import evaluate
from src.queue.task_queue import TaskQueue
from src.queue.queue_processor import QueueProcessor
from src.estimation.time_estimator import TimeEstimator
from src.ui.dashboard import Dashboard
# Setup environment
setup_environment()
# Create evaluator
class Evaluator:
def __init__(self):
"""Initialize evaluator"""
pass
def evaluate(self, input_data):
"""Evaluate code"""
return evaluate(input_data)
# Initialize components
evaluator = Evaluator()
task_queue = TaskQueue(worker_threads=max(1, multiprocessing.cpu_count() // 2))
time_estimator = TimeEstimator(task_queue=task_queue)
queue_processor = QueueProcessor(task_queue, evaluator, time_estimator)
dashboard = Dashboard(task_queue, time_estimator)
# Launch workers
queue_processor.launch_workers()
# Function to handle synchronous evaluation (compatible with original interface)
def synchronous_evaluate(input_data):
"""Synchronous evaluation function"""
return task_queue.synchronous_evaluate(input_data, time_estimator, evaluator)
# Function to handle asynchronous task enqueuing
def enqueue_task(input_data):
"""Add task to queue"""
return task_queue.enqueue_task(input_data, time_estimator)
# Function to check task status
def check_status(task_id):
"""Check task status"""
return task_queue.check_status(task_id)
# Function to get queue status
def get_queue_status():
"""Get queue status"""
return task_queue.get_queue_status()
# Create Gradio interface
with gr.Blocks(css=dashboard.css) as demo:
gr.Markdown("# Code Evaluation Service")
gr.Markdown("Code evaluation service supporting multiple programming languages, using queue mechanism to process requests")
with gr.Row():
with gr.Column(scale=3):
# Queue status info card
queue_info_html = gr.HTML()
refresh_queue_btn = gr.Button("Refresh Queue Status", variant="primary")
# Hidden API interface components
with gr.Row(visible=False):
api_input = gr.JSON()
api_output = gr.JSON()
# Define update function
def update_queue_info():
return dashboard.ui_get_queue_info()
# Update queue info periodically
demo.load(update_queue_info, None, queue_info_html, every=3)
# Refresh button event
refresh_queue_btn.click(update_queue_info, None, queue_info_html)
# Add evaluation endpoint compatible with original interface
demo.queue()
evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=api_input, outputs=api_output, api_name="evaluate")
if __name__ == "__main__":
try:
demo.launch()
finally:
# Stop worker threads
queue_processor.stop_workers()