File size: 6,753 Bytes
dd10f90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import queue
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
import uuid


@dataclass
class QueueItem:
    """代表队列中的一个评估请求"""
    id: str  # 唯一标识符
    input_data: Dict[str, Any]  # 输入数据
    status: str = "queued"  # 状态:queued, processing, completed, error
    result: Optional[Dict[str, Any]] = None  # 评估结果
    created_at: float = field(default_factory=time.time)  # 创建时间
    started_at: Optional[float] = None  # 开始处理时间
    completed_at: Optional[float] = None  # 完成时间


class QueueManager:
    """管理代码评估请求队列"""
    
    def __init__(self):
        self.queue = queue.Queue()  # 请求队列
        self.queue_items = {}  # 存储所有队列项,以ID为键
        self.processing_thread = None  # 处理线程
        self.running = False  # 是否正在运行
        self.evaluator = None  # 评估器,将在开始时设置
    
    def set_evaluator(self, evaluator_func):
        """设置评估函数
        
        Args:
            evaluator_func: 评估函数,接受输入数据并返回结果
        """
        self.evaluator = evaluator_func
    
    def enqueue(self, input_data):
        """添加新的评估请求到队列
        
        Args:
            input_data: 输入数据,包含要评估的代码
            
        Returns:
            str: 请求的唯一ID
        """
        item_id = str(uuid.uuid4())
        queue_item = QueueItem(id=item_id, input_data=input_data)
        self.queue_items[item_id] = queue_item
        self.queue.put(item_id)
        
        # 如果处理线程未运行,则启动它
        if not self.running:
            self.start_processing()
            
        return item_id
    
    def start_processing(self):
        """启动队列处理线程"""
        if self.processing_thread is None or not self.processing_thread.is_alive():
            self.running = True
            self.processing_thread = threading.Thread(target=self._process_queue)
            self.processing_thread.daemon = True
            self.processing_thread.start()
    
    def stop_processing(self):
        """停止队列处理"""
        self.running = False
        if self.processing_thread and self.processing_thread.is_alive():
            self.processing_thread.join(timeout=1.0)
    
    def _process_queue(self):
        """处理队列中的请求(在单独的线程中运行)"""
        while self.running:
            try:
                # 尝试从队列获取一个项目,如果队列为空,等待1秒后重试
                try:
                    item_id = self.queue.get(timeout=1.0)
                except queue.Empty:
                    continue
                
                # 获取队列项并更新状态
                queue_item = self.queue_items[item_id]
                queue_item.status = "processing"
                queue_item.started_at = time.time()
                
                # 处理请求
                try:
                    if self.evaluator:
                        result = self.evaluator(queue_item.input_data)
                        queue_item.result = result
                        queue_item.status = "completed"
                    else:
                        queue_item.status = "error"
                        queue_item.result = {"status": "Error", "error": "No evaluator function set"}
                except Exception as e:
                    queue_item.status = "error"
                    queue_item.result = {"status": "Exception", "error": str(e)}
                
                # 更新完成时间
                queue_item.completed_at = time.time()
                
                # 标记任务为完成
                self.queue.task_done()
                
            except Exception as e:
                print(f"Error in queue processing: {e}")
                time.sleep(1)  # 防止过度CPU使用
    
    def get_queue_status(self):
        """获取队列状态信息
        
        Returns:
            dict: 包含队列状态的字典
        """
        queued = 0
        processing = 0
        completed = 0
        error = 0
        
        # 统计各状态的数量
        for item in self.queue_items.values():
            if item.status == "queued":
                queued += 1
            elif item.status == "processing":
                processing += 1
            elif item.status == "completed":
                completed += 1
            elif item.status == "error":
                error += 1
        
        # 获取队列中的项目,仅包括等待和处理中的
        queue_items = []
        for item in self.queue_items.values():
            if item.status in ["queued", "processing"]:
                queue_items.append({
                    "id": item.id,
                    "status": item.status,
                    "created_at": item.created_at,
                    "started_at": item.started_at,
                })
        
        # 按创建时间排序
        queue_items.sort(key=lambda x: x["created_at"])
        
        return {
            "stats": {
                "queued": queued,
                "processing": processing,
                "completed": completed,
                "error": error,
                "total": len(self.queue_items)
            },
            "queue_items": queue_items
        }
    
    def get_result(self, item_id):
        """获取特定请求的结果
        
        Args:
            item_id: 请求的唯一ID
            
        Returns:
            dict: 请求的状态和结果
        """
        if item_id in self.queue_items:
            item = self.queue_items[item_id]
            return {
                "id": item.id,
                "status": item.status,
                "result": item.result,
                "created_at": item.created_at,
                "started_at": item.started_at,
                "completed_at": item.completed_at
            }
        return {"status": "not_found", "error": "Request ID not found"}
    
    def clean_completed(self, max_age=3600):
        """清理完成的请求
        
        Args:
            max_age: 最大保留时间(秒),默认为1小时
        """
        current_time = time.time()
        to_remove = []
        
        for item_id, item in self.queue_items.items():
            if item.status in ["completed", "error"]:
                if item.completed_at and (current_time - item.completed_at) > max_age:
                    to_remove.append(item_id)
        
        for item_id in to_remove:
            del self.queue_items[item_id]