nsarrazin HF Staff committed on
Commit
295d9d9
·
unverified ·
1 Parent(s): 21705b2

Move stats to their own job (#1324)

Browse files
src/hooks.server.ts CHANGED
@@ -9,12 +9,13 @@ import { sha256 } from "$lib/utils/sha256";
9
  import { addWeeks } from "date-fns";
10
  import { checkAndRunMigrations } from "$lib/migrations/migrations";
11
  import { building } from "$app/environment";
12
- import { refreshAssistantsCounts } from "$lib/assistantStats/refresh-assistants-counts";
13
  import { logger } from "$lib/server/logger";
14
  import { AbortedGenerations } from "$lib/server/abortedGenerations";
15
  import { MetricsServer } from "$lib/server/metrics";
16
  import { initExitHandler } from "$lib/server/exitHandler";
17
  import { ObjectId } from "mongodb";
 
 
18
 
19
  // TODO: move this code on a started server hook, instead of using a "building" flag
20
  if (!building) {
@@ -24,6 +25,7 @@ if (!building) {
24
  if (env.ENABLE_ASSISTANTS) {
25
  refreshAssistantsCounts();
26
  }
 
27
 
28
  // Init metrics server
29
  MetricsServer.getInstance();
 
9
  import { addWeeks } from "date-fns";
10
  import { checkAndRunMigrations } from "$lib/migrations/migrations";
11
  import { building } from "$app/environment";
 
12
  import { logger } from "$lib/server/logger";
13
  import { AbortedGenerations } from "$lib/server/abortedGenerations";
14
  import { MetricsServer } from "$lib/server/metrics";
15
  import { initExitHandler } from "$lib/server/exitHandler";
16
  import { ObjectId } from "mongodb";
17
+ import { refreshAssistantsCounts } from "$lib/jobs/refresh-assistants-counts";
18
+ import { refreshConversationStats } from "$lib/jobs/refresh-conversation-stats";
19
 
20
  // TODO: move this code on a started server hook, instead of using a "building" flag
21
  if (!building) {
 
25
  if (env.ENABLE_ASSISTANTS) {
26
  refreshAssistantsCounts();
27
  }
28
+ refreshConversationStats();
29
 
30
  // Init metrics server
31
  MetricsServer.getInstance();
src/lib/{assistantStats → jobs}/refresh-assistants-counts.ts RENAMED
File without changes
src/lib/jobs/refresh-conversation-stats.ts ADDED
@@ -0,0 +1,254 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import type { ConversationStats } from "$lib/types/ConversationStats";
2
+ import { CONVERSATION_STATS_COLLECTION, collections } from "$lib/server/database";
3
+ import { logger } from "$lib/server/logger";
4
+ import type { ObjectId } from "mongodb";
5
+ import { acquireLock, refreshLock } from "$lib/migrations/lock";
6
+
7
+ export async function computeAllStats() {
8
+ for (const span of ["day", "week", "month"] as const) {
9
+ computeStats({ dateField: "updatedAt", type: "conversation", span }).catch((e) =>
10
+ logger.error(e)
11
+ );
12
+ computeStats({ dateField: "createdAt", type: "conversation", span }).catch((e) =>
13
+ logger.error(e)
14
+ );
15
+ computeStats({ dateField: "createdAt", type: "message", span }).catch((e) => logger.error(e));
16
+ }
17
+ }
18
+
19
+ async function computeStats(params: {
20
+ dateField: ConversationStats["date"]["field"];
21
+ span: ConversationStats["date"]["span"];
22
+ type: ConversationStats["type"];
23
+ }) {
24
+ const lastComputed = await collections.conversationStats.findOne(
25
+ { "date.field": params.dateField, "date.span": params.span, type: params.type },
26
+ { sort: { "date.at": -1 } }
27
+ );
28
+
29
+ // If the last computed week is at the beginning of the last computed month, we need to include some days from the previous month
30
+ // In those cases we need to compute the stats from before the last month as everything is one aggregation
31
+ const minDate = lastComputed ? lastComputed.date.at : new Date(0);
32
+
33
+ logger.info(
34
+ { minDate, dateField: params.dateField, span: params.span, type: params.type },
35
+ "Computing conversation stats"
36
+ );
37
+
38
+ const dateField = params.type === "message" ? "messages." + params.dateField : params.dateField;
39
+
40
+ const pipeline = [
41
+ {
42
+ $match: {
43
+ [dateField]: { $gte: minDate },
44
+ },
45
+ },
46
+ {
47
+ $project: {
48
+ [dateField]: 1,
49
+ sessionId: 1,
50
+ userId: 1,
51
+ },
52
+ },
53
+ ...(params.type === "message"
54
+ ? [
55
+ {
56
+ $unwind: "$messages",
57
+ },
58
+ {
59
+ $match: {
60
+ [dateField]: { $gte: minDate },
61
+ },
62
+ },
63
+ ]
64
+ : []),
65
+ {
66
+ $sort: {
67
+ [dateField]: 1,
68
+ },
69
+ },
70
+ {
71
+ $facet: {
72
+ userId: [
73
+ {
74
+ $match: {
75
+ userId: { $exists: true },
76
+ },
77
+ },
78
+ {
79
+ $group: {
80
+ _id: {
81
+ at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
82
+ userId: "$userId",
83
+ },
84
+ },
85
+ },
86
+ {
87
+ $group: {
88
+ _id: "$_id.at",
89
+ count: { $sum: 1 },
90
+ },
91
+ },
92
+ {
93
+ $project: {
94
+ _id: 0,
95
+ date: {
96
+ at: "$_id",
97
+ field: params.dateField,
98
+ span: params.span,
99
+ },
100
+ distinct: "userId",
101
+ count: 1,
102
+ },
103
+ },
104
+ ],
105
+ sessionId: [
106
+ {
107
+ $match: {
108
+ sessionId: { $exists: true },
109
+ },
110
+ },
111
+ {
112
+ $group: {
113
+ _id: {
114
+ at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
115
+ sessionId: "$sessionId",
116
+ },
117
+ },
118
+ },
119
+ {
120
+ $group: {
121
+ _id: "$_id.at",
122
+ count: { $sum: 1 },
123
+ },
124
+ },
125
+ {
126
+ $project: {
127
+ _id: 0,
128
+ date: {
129
+ at: "$_id",
130
+ field: params.dateField,
131
+ span: params.span,
132
+ },
133
+ distinct: "sessionId",
134
+ count: 1,
135
+ },
136
+ },
137
+ ],
138
+ userOrSessionId: [
139
+ {
140
+ $group: {
141
+ _id: {
142
+ at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
143
+ userOrSessionId: { $ifNull: ["$userId", "$sessionId"] },
144
+ },
145
+ },
146
+ },
147
+ {
148
+ $group: {
149
+ _id: "$_id.at",
150
+ count: { $sum: 1 },
151
+ },
152
+ },
153
+ {
154
+ $project: {
155
+ _id: 0,
156
+ date: {
157
+ at: "$_id",
158
+ field: params.dateField,
159
+ span: params.span,
160
+ },
161
+ distinct: "userOrSessionId",
162
+ count: 1,
163
+ },
164
+ },
165
+ ],
166
+ _id: [
167
+ {
168
+ $group: {
169
+ _id: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
170
+ count: { $sum: 1 },
171
+ },
172
+ },
173
+ {
174
+ $project: {
175
+ _id: 0,
176
+ date: {
177
+ at: "$_id",
178
+ field: params.dateField,
179
+ span: params.span,
180
+ },
181
+ distinct: "_id",
182
+ count: 1,
183
+ },
184
+ },
185
+ ],
186
+ },
187
+ },
188
+ {
189
+ $project: {
190
+ stats: {
191
+ $concatArrays: ["$userId", "$sessionId", "$userOrSessionId", "$_id"],
192
+ },
193
+ },
194
+ },
195
+ {
196
+ $unwind: "$stats",
197
+ },
198
+ {
199
+ $replaceRoot: {
200
+ newRoot: "$stats",
201
+ },
202
+ },
203
+ {
204
+ $set: {
205
+ type: params.type,
206
+ },
207
+ },
208
+ {
209
+ $merge: {
210
+ into: CONVERSATION_STATS_COLLECTION,
211
+ on: ["date.at", "type", "date.span", "date.field", "distinct"],
212
+ whenMatched: "replace",
213
+ whenNotMatched: "insert",
214
+ },
215
+ },
216
+ ];
217
+
218
+ await collections.conversations.aggregate(pipeline, { allowDiskUse: true }).next();
219
+
220
+ logger.info(
221
+ { minDate, dateField: params.dateField, span: params.span, type: params.type },
222
+ "Computed conversation stats"
223
+ );
224
+ }
225
+
226
+ const LOCK_KEY = "conversation.stats";
227
+
228
+ let hasLock = false;
229
+ let lockId: ObjectId | null = null;
230
+
231
+ async function maintainLock() {
232
+ if (hasLock && lockId) {
233
+ hasLock = await refreshLock(LOCK_KEY, lockId);
234
+
235
+ if (!hasLock) {
236
+ lockId = null;
237
+ }
238
+ } else if (!hasLock) {
239
+ lockId = (await acquireLock(LOCK_KEY)) || null;
240
+ hasLock = !!lockId;
241
+ }
242
+
243
+ setTimeout(maintainLock, 10_000);
244
+ }
245
+
246
+ export function refreshConversationStats() {
247
+ const ONE_HOUR_MS = 3_600_000;
248
+
249
+ maintainLock().then(() => {
250
+ computeAllStats();
251
+
252
+ setInterval(computeAllStats, 12 * ONE_HOUR_MS);
253
+ });
254
+ }
src/routes/admin/stats/compute/+server.ts CHANGED
@@ -1,222 +1,16 @@
1
  import { json } from "@sveltejs/kit";
2
- import type { ConversationStats } from "$lib/types/ConversationStats";
3
- import { CONVERSATION_STATS_COLLECTION, collections } from "$lib/server/database";
4
  import { logger } from "$lib/server/logger";
 
5
 
6
  // Trigger like this:
7
  // curl -X POST "http://localhost:5173/chat/admin/stats/compute" -H "Authorization: Bearer <ADMIN_API_SECRET>"
8
 
9
  export async function POST() {
10
- for (const span of ["day", "week", "month"] as const) {
11
- computeStats({ dateField: "updatedAt", type: "conversation", span }).catch((e) =>
12
- logger.error(e)
13
- );
14
- computeStats({ dateField: "createdAt", type: "conversation", span }).catch((e) =>
15
- logger.error(e)
16
- );
17
- computeStats({ dateField: "createdAt", type: "message", span }).catch((e) => logger.error(e));
18
- }
19
-
20
- return json({}, { status: 202 });
21
- }
22
-
23
- async function computeStats(params: {
24
- dateField: ConversationStats["date"]["field"];
25
- span: ConversationStats["date"]["span"];
26
- type: ConversationStats["type"];
27
- }) {
28
- const lastComputed = await collections.conversationStats.findOne(
29
- { "date.field": params.dateField, "date.span": params.span, type: params.type },
30
- { sort: { "date.at": -1 } }
31
- );
32
-
33
- // If the last computed week is at the beginning of the last computed month, we need to include some days from the previous month
34
- // In those cases we need to compute the stats from before the last month as everything is one aggregation
35
- const minDate = lastComputed ? lastComputed.date.at : new Date(0);
36
-
37
- logger.info("Computing stats for", params.type, params.span, params.dateField, "from", minDate);
38
-
39
- const dateField = params.type === "message" ? "messages." + params.dateField : params.dateField;
40
-
41
- const pipeline = [
42
- {
43
- $match: {
44
- [dateField]: { $gte: minDate },
45
- },
46
- },
47
- {
48
- $project: {
49
- [dateField]: 1,
50
- sessionId: 1,
51
- userId: 1,
52
- },
53
- },
54
- ...(params.type === "message"
55
- ? [
56
- {
57
- $unwind: "$messages",
58
- },
59
- {
60
- $match: {
61
- [dateField]: { $gte: minDate },
62
- },
63
- },
64
- ]
65
- : []),
66
- {
67
- $sort: {
68
- [dateField]: 1,
69
- },
70
- },
71
  {
72
- $facet: {
73
- userId: [
74
- {
75
- $match: {
76
- userId: { $exists: true },
77
- },
78
- },
79
- {
80
- $group: {
81
- _id: {
82
- at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
83
- userId: "$userId",
84
- },
85
- },
86
- },
87
- {
88
- $group: {
89
- _id: "$_id.at",
90
- count: { $sum: 1 },
91
- },
92
- },
93
- {
94
- $project: {
95
- _id: 0,
96
- date: {
97
- at: "$_id",
98
- field: params.dateField,
99
- span: params.span,
100
- },
101
- distinct: "userId",
102
- count: 1,
103
- },
104
- },
105
- ],
106
- sessionId: [
107
- {
108
- $match: {
109
- sessionId: { $exists: true },
110
- },
111
- },
112
- {
113
- $group: {
114
- _id: {
115
- at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
116
- sessionId: "$sessionId",
117
- },
118
- },
119
- },
120
- {
121
- $group: {
122
- _id: "$_id.at",
123
- count: { $sum: 1 },
124
- },
125
- },
126
- {
127
- $project: {
128
- _id: 0,
129
- date: {
130
- at: "$_id",
131
- field: params.dateField,
132
- span: params.span,
133
- },
134
- distinct: "sessionId",
135
- count: 1,
136
- },
137
- },
138
- ],
139
- userOrSessionId: [
140
- {
141
- $group: {
142
- _id: {
143
- at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
144
- userOrSessionId: { $ifNull: ["$userId", "$sessionId"] },
145
- },
146
- },
147
- },
148
- {
149
- $group: {
150
- _id: "$_id.at",
151
- count: { $sum: 1 },
152
- },
153
- },
154
- {
155
- $project: {
156
- _id: 0,
157
- date: {
158
- at: "$_id",
159
- field: params.dateField,
160
- span: params.span,
161
- },
162
- distinct: "userOrSessionId",
163
- count: 1,
164
- },
165
- },
166
- ],
167
- _id: [
168
- {
169
- $group: {
170
- _id: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
171
- count: { $sum: 1 },
172
- },
173
- },
174
- {
175
- $project: {
176
- _id: 0,
177
- date: {
178
- at: "$_id",
179
- field: params.dateField,
180
- span: params.span,
181
- },
182
- distinct: "_id",
183
- count: 1,
184
- },
185
- },
186
- ],
187
- },
188
  },
189
- {
190
- $project: {
191
- stats: {
192
- $concatArrays: ["$userId", "$sessionId", "$userOrSessionId", "$_id"],
193
- },
194
- },
195
- },
196
- {
197
- $unwind: "$stats",
198
- },
199
- {
200
- $replaceRoot: {
201
- newRoot: "$stats",
202
- },
203
- },
204
- {
205
- $set: {
206
- type: params.type,
207
- },
208
- },
209
- {
210
- $merge: {
211
- into: CONVERSATION_STATS_COLLECTION,
212
- on: ["date.at", "type", "date.span", "date.field", "distinct"],
213
- whenMatched: "replace",
214
- whenNotMatched: "insert",
215
- },
216
- },
217
- ];
218
-
219
- await collections.conversations.aggregate(pipeline, { allowDiskUse: true }).next();
220
-
221
- logger.info("Computed stats for", params.type, params.span, params.dateField);
222
  }
 
1
  import { json } from "@sveltejs/kit";
 
 
2
  import { logger } from "$lib/server/logger";
3
+ import { computeAllStats } from "$lib/jobs/refresh-conversation-stats";
4
 
5
  // Trigger like this:
6
  // curl -X POST "http://localhost:5173/chat/admin/stats/compute" -H "Authorization: Bearer <ADMIN_API_SECRET>"
7
 
8
  export async function POST() {
9
+ computeAllStats().catch((e) => logger.error(e));
10
+ return json(
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  {
12
+ message: "Stats job started",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  },
14
+ { status: 202 }
15
+ );
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  }