File size: 2,545 Bytes
a8a9533
1b76365
 
 
486ffa7
8848296
1b76365
 
 
 
 
8848296
 
 
 
 
 
 
 
 
 
 
1b76365
 
 
 
 
 
d4471e3
 
 
a8a9533
d4471e3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1b76365
d4471e3
 
1b76365
d4471e3
 
 
 
 
1b76365
d4471e3
 
 
 
 
 
 
1b76365
d4471e3
 
 
 
 
1b76365
dc98038
1b76365
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8848296
 
 
 
1b76365
8848296
 
 
 
 
1b76365
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import { Database } from "$lib/server/database";
import { acquireLock, refreshLock } from "$lib/migrations/lock";
import type { ObjectId } from "mongodb";
import { subDays } from "date-fns";
import { logger } from "$lib/server/logger";
import { collections } from "$lib/server/database";
const LOCK_KEY = "assistants.count";

let hasLock = false;
let lockId: ObjectId | null = null;

async function getLastComputationTime(): Promise<Date> {
	const lastStats = await collections.assistantStats.findOne({}, { sort: { "date.at": -1 } });
	return lastStats?.date?.at || new Date(0);
}

async function shouldComputeStats(): Promise<boolean> {
	const lastComputationTime = await getLastComputationTime();
	const oneDayAgo = new Date(Date.now() - 24 * 3_600_000);
	return lastComputationTime < oneDayAgo;
}

async function refreshAssistantsCountsHelper() {
	if (!hasLock) {
		return;
	}

	try {
		await (await Database.getInstance()).getClient().withSession((session) =>
			session.withTransaction(async () => {
				await (
					await Database.getInstance()
				)
					.getCollections()
					.assistants.aggregate([
						{ $project: { _id: 1 } },
						{ $set: { last24HoursCount: 0 } },
						{
							$unionWith: {
								coll: "assistants.stats",
								pipeline: [
									{
										$match: { "date.at": { $gte: subDays(new Date(), 1) }, "date.span": "hour" },
									},
									{
										$group: {
											_id: "$assistantId",
											last24HoursCount: { $sum: "$count" },
										},
									},
								],
							},
						},
						{
							$group: {
								_id: "$_id",
								last24HoursCount: { $sum: "$last24HoursCount" },
							},
						},
						{
							$merge: {
								into: "assistants",
								on: "_id",
								whenMatched: "merge",
								whenNotMatched: "discard",
							},
						},
					])
					.next();
			})
		);
	} catch (e) {
		logger.error(e, "Refresh assistants counter failed!");
	}
}

async function maintainLock() {
	if (hasLock && lockId) {
		hasLock = await refreshLock(LOCK_KEY, lockId);

		if (!hasLock) {
			lockId = null;
		}
	} else if (!hasLock) {
		lockId = (await acquireLock(LOCK_KEY)) || null;
		hasLock = !!lockId;
	}

	setTimeout(maintainLock, 10_000);
}

export function refreshAssistantsCounts() {
	const ONE_HOUR_MS = 3_600_000;

	maintainLock().then(async () => {
		if (await shouldComputeStats()) {
			refreshAssistantsCountsHelper();
		}

		setInterval(async () => {
			if (await shouldComputeStats()) {
				refreshAssistantsCountsHelper();
			}
		}, 24 * ONE_HOUR_MS);
	});
}