Spaces:
Sleeping
Sleeping
File size: 6,753 Bytes
dd10f90 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
import queue
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
import uuid
@dataclass
class QueueItem:
"""代表队列中的一个评估请求"""
id: str # 唯一标识符
input_data: Dict[str, Any] # 输入数据
status: str = "queued" # 状态:queued, processing, completed, error
result: Optional[Dict[str, Any]] = None # 评估结果
created_at: float = field(default_factory=time.time) # 创建时间
started_at: Optional[float] = None # 开始处理时间
completed_at: Optional[float] = None # 完成时间
class QueueManager:
"""管理代码评估请求队列"""
def __init__(self):
self.queue = queue.Queue() # 请求队列
self.queue_items = {} # 存储所有队列项,以ID为键
self.processing_thread = None # 处理线程
self.running = False # 是否正在运行
self.evaluator = None # 评估器,将在开始时设置
def set_evaluator(self, evaluator_func):
"""设置评估函数
Args:
evaluator_func: 评估函数,接受输入数据并返回结果
"""
self.evaluator = evaluator_func
def enqueue(self, input_data):
"""添加新的评估请求到队列
Args:
input_data: 输入数据,包含要评估的代码
Returns:
str: 请求的唯一ID
"""
item_id = str(uuid.uuid4())
queue_item = QueueItem(id=item_id, input_data=input_data)
self.queue_items[item_id] = queue_item
self.queue.put(item_id)
# 如果处理线程未运行,则启动它
if not self.running:
self.start_processing()
return item_id
def start_processing(self):
"""启动队列处理线程"""
if self.processing_thread is None or not self.processing_thread.is_alive():
self.running = True
self.processing_thread = threading.Thread(target=self._process_queue)
self.processing_thread.daemon = True
self.processing_thread.start()
def stop_processing(self):
"""停止队列处理"""
self.running = False
if self.processing_thread and self.processing_thread.is_alive():
self.processing_thread.join(timeout=1.0)
def _process_queue(self):
"""处理队列中的请求(在单独的线程中运行)"""
while self.running:
try:
# 尝试从队列获取一个项目,如果队列为空,等待1秒后重试
try:
item_id = self.queue.get(timeout=1.0)
except queue.Empty:
continue
# 获取队列项并更新状态
queue_item = self.queue_items[item_id]
queue_item.status = "processing"
queue_item.started_at = time.time()
# 处理请求
try:
if self.evaluator:
result = self.evaluator(queue_item.input_data)
queue_item.result = result
queue_item.status = "completed"
else:
queue_item.status = "error"
queue_item.result = {"status": "Error", "error": "No evaluator function set"}
except Exception as e:
queue_item.status = "error"
queue_item.result = {"status": "Exception", "error": str(e)}
# 更新完成时间
queue_item.completed_at = time.time()
# 标记任务为完成
self.queue.task_done()
except Exception as e:
print(f"Error in queue processing: {e}")
time.sleep(1) # 防止过度CPU使用
def get_queue_status(self):
"""获取队列状态信息
Returns:
dict: 包含队列状态的字典
"""
queued = 0
processing = 0
completed = 0
error = 0
# 统计各状态的数量
for item in self.queue_items.values():
if item.status == "queued":
queued += 1
elif item.status == "processing":
processing += 1
elif item.status == "completed":
completed += 1
elif item.status == "error":
error += 1
# 获取队列中的项目,仅包括等待和处理中的
queue_items = []
for item in self.queue_items.values():
if item.status in ["queued", "processing"]:
queue_items.append({
"id": item.id,
"status": item.status,
"created_at": item.created_at,
"started_at": item.started_at,
})
# 按创建时间排序
queue_items.sort(key=lambda x: x["created_at"])
return {
"stats": {
"queued": queued,
"processing": processing,
"completed": completed,
"error": error,
"total": len(self.queue_items)
},
"queue_items": queue_items
}
def get_result(self, item_id):
"""获取特定请求的结果
Args:
item_id: 请求的唯一ID
Returns:
dict: 请求的状态和结果
"""
if item_id in self.queue_items:
item = self.queue_items[item_id]
return {
"id": item.id,
"status": item.status,
"result": item.result,
"created_at": item.created_at,
"started_at": item.started_at,
"completed_at": item.completed_at
}
return {"status": "not_found", "error": "Request ID not found"}
def clean_completed(self, max_age=3600):
"""清理完成的请求
Args:
max_age: 最大保留时间(秒),默认为1小时
"""
current_time = time.time()
to_remove = []
for item_id, item in self.queue_items.items():
if item.status in ["completed", "error"]:
if item.completed_at and (current_time - item.completed_at) > max_age:
to_remove.append(item_id)
for item_id in to_remove:
del self.queue_items[item_id] |