docker_test / src /queue_manager.py
3v324v23's picture
new1
dd10f90
raw
history blame
6.75 kB
import queue
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
import uuid
@dataclass
class QueueItem:
"""代表队列中的一个评估请求"""
id: str # 唯一标识符
input_data: Dict[str, Any] # 输入数据
status: str = "queued" # 状态:queued, processing, completed, error
result: Optional[Dict[str, Any]] = None # 评估结果
created_at: float = field(default_factory=time.time) # 创建时间
started_at: Optional[float] = None # 开始处理时间
completed_at: Optional[float] = None # 完成时间
class QueueManager:
"""管理代码评估请求队列"""
def __init__(self):
self.queue = queue.Queue() # 请求队列
self.queue_items = {} # 存储所有队列项,以ID为键
self.processing_thread = None # 处理线程
self.running = False # 是否正在运行
self.evaluator = None # 评估器,将在开始时设置
def set_evaluator(self, evaluator_func):
"""设置评估函数
Args:
evaluator_func: 评估函数,接受输入数据并返回结果
"""
self.evaluator = evaluator_func
def enqueue(self, input_data):
"""添加新的评估请求到队列
Args:
input_data: 输入数据,包含要评估的代码
Returns:
str: 请求的唯一ID
"""
item_id = str(uuid.uuid4())
queue_item = QueueItem(id=item_id, input_data=input_data)
self.queue_items[item_id] = queue_item
self.queue.put(item_id)
# 如果处理线程未运行,则启动它
if not self.running:
self.start_processing()
return item_id
def start_processing(self):
"""启动队列处理线程"""
if self.processing_thread is None or not self.processing_thread.is_alive():
self.running = True
self.processing_thread = threading.Thread(target=self._process_queue)
self.processing_thread.daemon = True
self.processing_thread.start()
def stop_processing(self):
"""停止队列处理"""
self.running = False
if self.processing_thread and self.processing_thread.is_alive():
self.processing_thread.join(timeout=1.0)
def _process_queue(self):
"""处理队列中的请求(在单独的线程中运行)"""
while self.running:
try:
# 尝试从队列获取一个项目,如果队列为空,等待1秒后重试
try:
item_id = self.queue.get(timeout=1.0)
except queue.Empty:
continue
# 获取队列项并更新状态
queue_item = self.queue_items[item_id]
queue_item.status = "processing"
queue_item.started_at = time.time()
# 处理请求
try:
if self.evaluator:
result = self.evaluator(queue_item.input_data)
queue_item.result = result
queue_item.status = "completed"
else:
queue_item.status = "error"
queue_item.result = {"status": "Error", "error": "No evaluator function set"}
except Exception as e:
queue_item.status = "error"
queue_item.result = {"status": "Exception", "error": str(e)}
# 更新完成时间
queue_item.completed_at = time.time()
# 标记任务为完成
self.queue.task_done()
except Exception as e:
print(f"Error in queue processing: {e}")
time.sleep(1) # 防止过度CPU使用
def get_queue_status(self):
"""获取队列状态信息
Returns:
dict: 包含队列状态的字典
"""
queued = 0
processing = 0
completed = 0
error = 0
# 统计各状态的数量
for item in self.queue_items.values():
if item.status == "queued":
queued += 1
elif item.status == "processing":
processing += 1
elif item.status == "completed":
completed += 1
elif item.status == "error":
error += 1
# 获取队列中的项目,仅包括等待和处理中的
queue_items = []
for item in self.queue_items.values():
if item.status in ["queued", "processing"]:
queue_items.append({
"id": item.id,
"status": item.status,
"created_at": item.created_at,
"started_at": item.started_at,
})
# 按创建时间排序
queue_items.sort(key=lambda x: x["created_at"])
return {
"stats": {
"queued": queued,
"processing": processing,
"completed": completed,
"error": error,
"total": len(self.queue_items)
},
"queue_items": queue_items
}
def get_result(self, item_id):
"""获取特定请求的结果
Args:
item_id: 请求的唯一ID
Returns:
dict: 请求的状态和结果
"""
if item_id in self.queue_items:
item = self.queue_items[item_id]
return {
"id": item.id,
"status": item.status,
"result": item.result,
"created_at": item.created_at,
"started_at": item.started_at,
"completed_at": item.completed_at
}
return {"status": "not_found", "error": "Request ID not found"}
def clean_completed(self, max_age=3600):
"""清理完成的请求
Args:
max_age: 最大保留时间(秒),默认为1小时
"""
current_time = time.time()
to_remove = []
for item_id, item in self.queue_items.items():
if item.status in ["completed", "error"]:
if item.completed_at and (current_time - item.completed_at) > max_age:
to_remove.append(item_id)
for item_id in to_remove:
del self.queue_items[item_id]