Spaces:
Sleeping
Sleeping
import queue | |
import threading | |
import time | |
from dataclasses import dataclass, field | |
from typing import Dict, List, Any, Optional | |
import uuid | |
class QueueItem: | |
"""代表队列中的一个评估请求""" | |
id: str # 唯一标识符 | |
input_data: Dict[str, Any] # 输入数据 | |
status: str = "queued" # 状态:queued, processing, completed, error | |
result: Optional[Dict[str, Any]] = None # 评估结果 | |
created_at: float = field(default_factory=time.time) # 创建时间 | |
started_at: Optional[float] = None # 开始处理时间 | |
completed_at: Optional[float] = None # 完成时间 | |
class QueueManager: | |
"""管理代码评估请求队列""" | |
def __init__(self): | |
self.queue = queue.Queue() # 请求队列 | |
self.queue_items = {} # 存储所有队列项,以ID为键 | |
self.processing_thread = None # 处理线程 | |
self.running = False # 是否正在运行 | |
self.evaluator = None # 评估器,将在开始时设置 | |
def set_evaluator(self, evaluator_func): | |
"""设置评估函数 | |
Args: | |
evaluator_func: 评估函数,接受输入数据并返回结果 | |
""" | |
self.evaluator = evaluator_func | |
def enqueue(self, input_data): | |
"""添加新的评估请求到队列 | |
Args: | |
input_data: 输入数据,包含要评估的代码 | |
Returns: | |
str: 请求的唯一ID | |
""" | |
item_id = str(uuid.uuid4()) | |
queue_item = QueueItem(id=item_id, input_data=input_data) | |
self.queue_items[item_id] = queue_item | |
self.queue.put(item_id) | |
# 如果处理线程未运行,则启动它 | |
if not self.running: | |
self.start_processing() | |
return item_id | |
def start_processing(self): | |
"""启动队列处理线程""" | |
if self.processing_thread is None or not self.processing_thread.is_alive(): | |
self.running = True | |
self.processing_thread = threading.Thread(target=self._process_queue) | |
self.processing_thread.daemon = True | |
self.processing_thread.start() | |
def stop_processing(self): | |
"""停止队列处理""" | |
self.running = False | |
if self.processing_thread and self.processing_thread.is_alive(): | |
self.processing_thread.join(timeout=1.0) | |
def _process_queue(self): | |
"""处理队列中的请求(在单独的线程中运行)""" | |
while self.running: | |
try: | |
# 尝试从队列获取一个项目,如果队列为空,等待1秒后重试 | |
try: | |
item_id = self.queue.get(timeout=1.0) | |
except queue.Empty: | |
continue | |
# 获取队列项并更新状态 | |
queue_item = self.queue_items[item_id] | |
queue_item.status = "processing" | |
queue_item.started_at = time.time() | |
# 处理请求 | |
try: | |
if self.evaluator: | |
result = self.evaluator(queue_item.input_data) | |
queue_item.result = result | |
queue_item.status = "completed" | |
else: | |
queue_item.status = "error" | |
queue_item.result = {"status": "Error", "error": "No evaluator function set"} | |
except Exception as e: | |
queue_item.status = "error" | |
queue_item.result = {"status": "Exception", "error": str(e)} | |
# 更新完成时间 | |
queue_item.completed_at = time.time() | |
# 标记任务为完成 | |
self.queue.task_done() | |
except Exception as e: | |
print(f"Error in queue processing: {e}") | |
time.sleep(1) # 防止过度CPU使用 | |
def get_queue_status(self): | |
"""获取队列状态信息 | |
Returns: | |
dict: 包含队列状态的字典 | |
""" | |
queued = 0 | |
processing = 0 | |
completed = 0 | |
error = 0 | |
# 统计各状态的数量 | |
for item in self.queue_items.values(): | |
if item.status == "queued": | |
queued += 1 | |
elif item.status == "processing": | |
processing += 1 | |
elif item.status == "completed": | |
completed += 1 | |
elif item.status == "error": | |
error += 1 | |
# 获取队列中的项目,仅包括等待和处理中的 | |
queue_items = [] | |
for item in self.queue_items.values(): | |
if item.status in ["queued", "processing"]: | |
queue_items.append({ | |
"id": item.id, | |
"status": item.status, | |
"created_at": item.created_at, | |
"started_at": item.started_at, | |
}) | |
# 按创建时间排序 | |
queue_items.sort(key=lambda x: x["created_at"]) | |
return { | |
"stats": { | |
"queued": queued, | |
"processing": processing, | |
"completed": completed, | |
"error": error, | |
"total": len(self.queue_items) | |
}, | |
"queue_items": queue_items | |
} | |
def get_result(self, item_id): | |
"""获取特定请求的结果 | |
Args: | |
item_id: 请求的唯一ID | |
Returns: | |
dict: 请求的状态和结果 | |
""" | |
if item_id in self.queue_items: | |
item = self.queue_items[item_id] | |
return { | |
"id": item.id, | |
"status": item.status, | |
"result": item.result, | |
"created_at": item.created_at, | |
"started_at": item.started_at, | |
"completed_at": item.completed_at | |
} | |
return {"status": "not_found", "error": "Request ID not found"} | |
def clean_completed(self, max_age=3600): | |
"""清理完成的请求 | |
Args: | |
max_age: 最大保留时间(秒),默认为1小时 | |
""" | |
current_time = time.time() | |
to_remove = [] | |
for item_id, item in self.queue_items.items(): | |
if item.status in ["completed", "error"]: | |
if item.completed_at and (current_time - item.completed_at) > max_age: | |
to_remove.append(item_id) | |
for item_id in to_remove: | |
del self.queue_items[item_id] |