• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 20544193028

27 Dec 2025 08:48PM UTC coverage: 91.89% (-0.05%) from 91.942%
20544193028

push

github

VaclavObornik
feat: add watchProjection to reactiveTask to optimize task triggering

Signed-off-by: Václav Oborník <vaclav.obornik@gmail.com>

1236 of 1436 branches covered (86.07%)

Branch coverage included in aggregate %.

1982 of 2066 relevant lines covered (95.93%)

387.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.05
/src/reactiveTasks/ReactiveTaskRepository.ts
1
import { Collection, Document, Filter, UpdateFilter } from 'mongodb';
2
import { CompatibleBulkWriteOptions, CompatibleFindOneAndUpdateOptions, CompatibleModifyResult } from '../mongoCompatibility';
3
import { defaultOnError, OnError } from '../OnError';
218✔
4
import { defaultOnInfo, OnInfo } from '../OnInfo';
218✔
5
import { processInBatches } from '../processInBatches';
218✔
6
import { ReactiveTaskRetryStrategy } from './ReactiveTaskRetryStrategy';
7
import {
218✔
8
    CleanupDeleteWhen,
9
    CODE_REACTIVE_TASK_CLEANUP,
10
    ReactiveTaskInternal,
11
    ReactiveTaskRecord,
12
    ReactiveTaskStatsOptions,
13
    ReactiveTaskStatsResult,
14
} from './ReactiveTaskTypes';
15

16
/**
 * Handles all database interactions for reactive tasks.
 *
 * Responsibilities:
 * - Generates MongoDB operations for creating, updating, and deleting tasks.
 * - Manages task state transitions (pending -> processing -> completed/failed).
 * - Implements the locking mechanism for task execution.
 * - Handles task finalization (retries, completion, failure).
 * - Manages database indexes for performance.
 * - Cleans up orphaned tasks.
 */
export class ReactiveTaskRepository<T extends Document> {
    /** Promise returned by `ensureIndexes()`; settles when both index creation calls have finished. */
    readonly initPromise: Promise<void>;

    /**
     * @param tasksCollection Collection holding the task records.
     * @param onInfo Callback for informational events (used when orphaned tasks are deleted).
     * @param onError Callback for errors raised while locking a task.
     */
    constructor(
        private tasksCollection: Collection<ReactiveTaskRecord<T>>,
        private onInfo: OnInfo = defaultOnInfo,
        private onError: OnError = defaultOnError,
    ) {
        // Index creation starts immediately; callers can await `initPromise` to observe its outcome.
        this.initPromise = this.ensureIndexes();
    }

    /**
     * Atomically picks the task with the earliest `scheduledAt` that is ready to run and locks it.
     *
     * A task is eligible when it belongs to one of `taskDefs` and either
     * - is `pending` / `processing_dirty`, is due, and has no live lock, or
     * - is `processing` but its lock has expired.
     *
     * The chosen record is switched to `processing`, gets a new lock expiring after
     * `options.visibilityTimeoutMs`, and has `attempts` incremented.
     *
     * @returns The updated record, or `null` when nothing is eligible or the query failed
     *          (failures are reported through `onError`, never thrown).
     */
    public async findAndLockNextTask(taskDefs: ReactiveTaskInternal<T>[], options: { visibilityTimeoutMs: number }): Promise<ReactiveTaskRecord<T> | null> {
        const now = new Date();
        const lockExpiresAt = new Date(now.getTime() + options.visibilityTimeoutMs);

        const filter: Filter<ReactiveTaskRecord<T>> = {
            task: { $in: taskDefs.map((c) => c.task) },
            $or: [
                {
                    status: { $in: ['pending', 'processing_dirty'] },
                    scheduledAt: { $lte: now },
                    $or: [{ lockExpiresAt: { $lt: now } }, { lockExpiresAt: { $exists: false } }, { lockExpiresAt: null }],
                },
                {
                    status: 'processing',
                    lockExpiresAt: { $lt: now },
                },
            ],
        };

        const update: UpdateFilter<ReactiveTaskRecord<T>> = {
            $set: {
                status: 'processing',
                lockExpiresAt: lockExpiresAt,
                startedAt: now,
            },
            $inc: { attempts: 1 },
        };

        try {
            const result = await this.tasksCollection.findOneAndUpdate(filter, update, {
                sort: { scheduledAt: 1 },
                returnDocument: 'after',
                includeResultMetadata: true,
            } as CompatibleFindOneAndUpdateOptions);

            // Older drivers return { value } by default and newer ones do so when
            // includeResultMetadata is true, so the result is read through the
            // compatibility type regardless of what the installed driver's typings infer.
            return (result as unknown as CompatibleModifyResult<ReactiveTaskRecord<T>>).value || null;
        } catch (error) {
            this.onError(error as Error);
            return null;
        }
    }

    /**
     * Records the outcome of one execution of `taskRecord` and releases its lock.
     *
     * The update runs as an aggregation pipeline so it can branch on the status stored in the
     * database: a task that became `processing_dirty` while running is put back to `pending`
     * (scheduled at `updatedAt + debounceMs`) and keeps its previous `completedAt`,
     * `firstErrorAt` and `lastError`; otherwise it becomes `completed`, `failed`, or `pending`
     * again (retry) according to `strategy`.
     *
     * @param taskRecord The locked record that was executed.
     * @param strategy Decides whether a failed task should fail permanently and when to retry.
     * @param error The execution error, if any; its absence means success.
     * @param debounceMs Delay added to `updatedAt` when rescheduling a dirty task.
     * @param executionStats Optional duration of the execution; defaults to 0 ms.
     * @param executionHistoryLimit Number of most recent history entries to keep.
     * @param options Options forwarded to `updateOne` (e.g. a session).
     */
    public async finalizeTask(
        taskRecord: ReactiveTaskRecord<T>,
        strategy: ReactiveTaskRetryStrategy,
        error?: Error,
        debounceMs = 1000,
        executionStats?: { durationMs: number },
        executionHistoryLimit = 5,
        options?: { session?: import('mongodb').ClientSession },
    ): Promise<void> {
        const isError = !!error;
        const errorMessage = error?.message || 'Unknown error';
        // One timestamp for the whole finalization so every stored date agrees.
        const now = new Date();
        // Inside a pipeline update a string starting with '$' is parsed as a field path or
        // variable, so the arbitrary error text must be wrapped in $literal.
        const errorMessageLiteral = { $literal: errorMessage };

        // Determine First Error At
        let firstErrorAt = taskRecord.firstErrorAt;
        if (isError && !firstErrorAt) {
            firstErrorAt = now;
        } else if (!isError) {
            firstErrorAt = null; // Reset on success
        }

        const attempts = taskRecord.attempts || 0;
        const shouldFail = isError && strategy.shouldFail(attempts, firstErrorAt);
        const nextRetryScheduledAt = isError && !shouldFail ? strategy.calculateNextRetry(attempts) : null;

        const updateSet: Document = {
            status: {
                $cond: {
                    if: { $eq: ['$status', 'processing_dirty'] },
                    then: 'pending',
                    else: isError ? (shouldFail ? 'failed' : 'pending') : 'completed',
                },
            },
            scheduledAt: {
                $cond: {
                    if: { $eq: ['$status', 'processing_dirty'] },
                    then: { $add: ['$updatedAt', debounceMs] },
                    else: isError ? (shouldFail ? '$scheduledAt' : nextRetryScheduledAt) : '$scheduledAt',
                },
            },
            completedAt: {
                $cond: {
                    if: { $eq: ['$status', 'processing_dirty'] },
                    then: '$completedAt',
                    else: isError ? '$completedAt' : now,
                },
            },
            lockExpiresAt: null,
            firstErrorAt: {
                $cond: {
                    if: { $eq: ['$status', 'processing_dirty'] },
                    then: '$firstErrorAt',
                    else: firstErrorAt,
                },
            },
            lastError: {
                $cond: {
                    if: { $eq: ['$status', 'processing_dirty'] },
                    then: '$lastError',
                    else: isError ? errorMessageLiteral : null,
                },
            },
            lastFinalizedAt: now,
        };

        const durationMs = executionStats?.durationMs ?? 0;
        const historyEntry = {
            at: now,
            status: isError ? 'failed' : 'completed',
            durationMs: durationMs,
            ...(isError ? { error: errorMessageLiteral } : {}),
        };

        if (!isError) {
            updateSet.lastSuccess = {
                at: now,
                durationMs: durationMs,
            };
        }

        await this.tasksCollection.updateOne(
            { _id: taskRecord._id },
            [
                {
                    $set: updateSet,
                },
                {
                    $set: {
                        executionHistory: {
                            $slice: [
                                {
                                    $concatArrays: [{ $ifNull: ['$executionHistory', []] }, [historyEntry]],
                                },
                                -executionHistoryLimit, // Keep last N
                            ],
                        },
                    },
                },
            ],
            options,
        );
    }

    /**
     * Puts a task back to `pending` for a later run, clearing its lock and resetting `attempts` to 0.
     *
     * @param taskRecord The task to postpone.
     * @param delay Milliseconds from now, or an absolute date, at which the task should run.
     */
    public async deferTask(taskRecord: ReactiveTaskRecord<T>, delay: number | Date): Promise<void> {
        const now = Date.now();
        const newScheduledAt = typeof delay === 'number' ? new Date(now + delay) : delay;
        // Keep the first known schedule time; getStatistics() prefers it when computing lag.
        const originalScheduledAt = taskRecord.initialScheduledAt ?? taskRecord.scheduledAt;

        await this.tasksCollection.updateOne(
            { _id: taskRecord._id },
            {
                $set: {
                    status: 'pending',
                    scheduledAt: newScheduledAt,
                    initialScheduledAt: originalScheduledAt,
                    lockExpiresAt: null,
                    attempts: 0,
                },
            },
        );
    }

    /**
     * Runs the given bulk operations against the tasks collection.
     *
     * @param operations Operations accepted by `Collection.bulkWrite`.
     * @param options Bulk write options; an empty object is used when omitted.
     */
    public async executeBulkWrite(
        operations: Parameters<Collection<ReactiveTaskRecord<T>>['bulkWrite']>[0],
        options?: CompatibleBulkWriteOptions,
    ): Promise<void> {
        await this.tasksCollection.bulkWrite(operations, options || {});
    }

    /**
     * Returns the task records matching `filter`.
     *
     * @param options Optional `limit`, `skip` and `sort` passed to `find`.
     */
    public async findTasks(
        filter: Filter<ReactiveTaskRecord<T>>,
        options: { limit?: number; skip?: number; sort?: Record<string, 1 | -1> } = {},
    ): Promise<ReactiveTaskRecord<T>[]> {
        return this.tasksCollection.find(filter, options).toArray();
    }

    /** Counts the task records matching `filter`. */
    public async countTasks(filter: Filter<ReactiveTaskRecord<T>>): Promise<number> {
        return this.tasksCollection.countDocuments(filter);
    }

    /**
     * Applies `update` (an update document or an aggregation pipeline) to every task matching `filter`.
     *
     * @returns How many records matched and how many were modified.
     */
    public async updateTasks(
        filter: Filter<ReactiveTaskRecord<T>>,
        update: UpdateFilter<ReactiveTaskRecord<T>> | Document[],
    ): Promise<{ matchedCount: number; modifiedCount: number }> {
        const result = await this.tasksCollection.updateMany(filter, update);
        return {
            matchedCount: result.matchedCount,
            modifiedCount: result.modifiedCount,
        };
    }

    /**
     * Re-queues every task matching `filter`.
     *
     * Tasks that are currently `processing` / `processing_dirty` are marked `processing_dirty`
     * and keep their `scheduledAt`; all others become `pending` and are scheduled for now.
     * `attempts`, `firstErrorAt` and `lastError` are left untouched.
     *
     * @returns How many records matched and how many were modified.
     */
    public async resetTasks(filter: Filter<ReactiveTaskRecord<T>>): Promise<{ matchedCount: number; modifiedCount: number }> {
        const updatePipeline: Document[] = [
            {
                $set: {
                    updatedAt: '$$NOW',
                    status: {
                        $cond: {
                            if: { $in: ['$status', ['processing', 'processing_dirty']] },
                            then: 'processing_dirty',
                            else: 'pending',
                        },
                    },
                    scheduledAt: {
                        $cond: {
                            if: { $in: ['$status', ['processing', 'processing_dirty']] },
                            then: '$scheduledAt',
                            else: '$$NOW',
                        },
                    },
                    // Preserve history - don't reset attempts, firstErrorAt, or lastError
                    initialScheduledAt: { $ifNull: ['$initialScheduledAt', '$scheduledAt'] },
                },
            },
        ];

        const result = await this.tasksCollection.updateMany(filter, updatePipeline);
        return {
            matchedCount: result.matchedCount,
            modifiedCount: result.modifiedCount,
        };
    }

    /**
     * Re-queues finished tasks of `taskName` via `resetTasks`.
     *
     * @param mode `'failed'` resets only failed tasks; `'all'` resets failed and completed tasks.
     * @returns The number of modified records.
     */
    public async resetTasksForUpgrade(taskName: string, mode: 'failed' | 'all'): Promise<{ modifiedCount: number }> {
        const filter: Filter<ReactiveTaskRecord<T>> = { task: taskName };

        if (mode === 'failed') {
            filter.status = 'failed';
        } else if (mode === 'all') {
            filter.status = { $in: ['failed', 'completed'] };
        }

        // Use safe reset logic
        const result = await this.resetTasks(filter);
        return { modifiedCount: result.modifiedCount };
    }

    /** Creates the indexes the repository relies on. */
    private async ensureIndexes(): Promise<void> {
        // Index supporting the findAndLockNextTask query: task and status first,
        // followed by the scheduledAt and lockExpiresAt fields it filters/sorts on.
        await this.tasksCollection.createIndex({
            task: 1,
            status: 1,
            scheduledAt: 1,
            lockExpiresAt: 1,
        });

        // Unique index to ensure one task per task definition per source document
        await this.tasksCollection.createIndex({ sourceDocId: 1, task: 1 }, { unique: true });
    }

    /**
     * Periodically cleans up orphaned tasks that match the cleanupPolicy.
     * This runs on a schedule (e.g. hourly) in the Leader instance.
     *
     * A task is a candidate once the latest of `updatedAt` and `lastFinalizedAt` (falling back to
     * `createdAt`) is older than `keepForMs`. Candidates are deleted when their source document
     * no longer exists, and, for `'sourceDocumentDeletedOrNoLongerMatching'`, also when
     * `taskFilter` evaluates to `false` for the source document.
     *
     * @param taskName Task definition whose records are inspected.
     * @param sourceCollectionName Collection looked up by `sourceDocId`.
     * @param taskFilter Evaluated inside a `$project` stage against the source document; an empty object counts as "always matches".
     * @param cleanupPolicy `deleteWhen: 'never'` disables the cleanup entirely.
     * @param shouldStop Forwarded to `processInBatches`.
     * @param limitToSourceIds When non-empty, restricts the cleanup to these source document ids.
     */
    public async deleteOrphanedTasks(
        taskName: string,
        sourceCollectionName: string,
        taskFilter: Filter<Document>,
        cleanupPolicy: { deleteWhen: CleanupDeleteWhen; keepForMs: number },
        shouldStop: () => boolean,
        limitToSourceIds?: unknown[],
    ): Promise<void> {
        const { deleteWhen, keepForMs } = cleanupPolicy;

        if (deleteWhen === 'never') {
            return;
        }
        const cutoffDate = new Date(Date.now() - keepForMs);

        const matchStage: Document = {
            task: taskName,
            $expr: {
                $lt: [{ $max: ['$updatedAt', { $ifNull: ['$lastFinalizedAt', '$createdAt'] }] }, cutoffDate],
            },
        };

        if (limitToSourceIds && limitToSourceIds.length > 0) {
            matchStage.sourceDocId = { $in: limitToSourceIds };
        }

        const pipeline: Document[] = [
            {
                $match: matchStage,
            },
            {
                $lookup: {
                    from: sourceCollectionName,
                    let: { sId: '$sourceDocId' },
                    pipeline: [
                        { $match: { $expr: { $eq: ['$_id', '$$sId'] } } },
                        {
                            $project: {
                                _id: 0,
                                matches: Object.keys(taskFilter).length > 0 ? taskFilter : true,
                            },
                        },
                    ],
                    as: 'orphanCheck',
                },
            },
            {
                $match: {
                    $or: [
                        { orphanCheck: { $size: 0 } }, // Source document deleted
                        ...(deleteWhen === 'sourceDocumentDeletedOrNoLongerMatching'
                            ? [{ 'orphanCheck.matches': false }] // Filter no longer matches
                            : []),
                    ],
                },
            },
            {
                $project: {
                    _id: 1,
                },
            },
        ];

        await processInBatches(
            this.tasksCollection,
            pipeline,
            (task) => task._id,
            async (batch) => {
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                await this.tasksCollection.deleteMany({ _id: { $in: batch as any } });
                this.onInfo({
                    message: `Cleaned up ${batch.length} orphaned tasks for '${taskName}'`,
                    code: CODE_REACTIVE_TASK_CLEANUP,
                    meta: { count: batch.length },
                });
            },
            { batchSize: 1000, shouldStop },
        );
    }

    /**
     * Aggregates statistics over the tasks matching `filter`.
     *
     * Depending on `options`, the result contains status counts (optionally per task), the number
     * of tasks with a non-null `lastError` (optionally per task), and, per task, the earliest
     * `initialScheduledAt`/`scheduledAt` among pending tasks.
     *
     * When no `include*` option is enabled no query is issued and the empty fallback is returned.
     */
    public async getStatistics(filter: Filter<ReactiveTaskRecord<T>>, options: ReactiveTaskStatsOptions): Promise<ReactiveTaskStatsResult> {
        const pipeline: object[] = [];

        if (Object.keys(filter).length > 0) {
            pipeline.push({ $match: filter });
        }

        const facets: Record<string, object[]> = {};

        if (options.includeStatusCounts) {
            const groupId = options.groupByTask ? { task: '$task', status: '$status' } : '$status';
            facets.statuses = [{ $group: { _id: groupId, count: { $sum: 1 } } }];
        }

        if (options.includeErrorCount) {
            if (options.groupByTask) {
                facets.errorCounts = [{ $match: { lastError: { $exists: true, $ne: null } } }, { $group: { _id: '$task', count: { $sum: 1 } } }];
            } else {
                facets.errorCount = [{ $match: { lastError: { $exists: true, $ne: null } } }, { $count: 'count' }];
            }
        }

        if (options.includeGlobalLag) {
            facets.globalLag = [
                { $match: { status: 'pending' } },
                { $group: { _id: '$task', minScheduledAt: { $min: { $ifNull: ['$initialScheduledAt', '$scheduledAt'] } } } },
            ];
        }

        let result: ReactiveTaskStatsResult | undefined;

        // MongoDB rejects an empty $facet specification, so only query when something was requested.
        if (Object.keys(facets).length > 0) {
            pipeline.push({ $facet: facets });

            const projection: Record<string, unknown> = { statuses: 1 };

            if (options.includeErrorCount) {
                if (options.groupByTask) {
                    projection.errorCounts = 1;
                } else {
                    // $count yields no document when nothing matches; report 0 in that case.
                    projection.errorCount = { $ifNull: [{ $arrayElemAt: ['$errorCount.count', 0] }, 0] };
                }
            }

            if (options.includeGlobalLag) {
                projection.globalLag = 1;
            }

            pipeline.push({ $project: projection });

            [result] = await this.tasksCollection.aggregate<ReactiveTaskStatsResult>(pipeline, { readPreference: options.readPreference }).toArray();
        }

        return (
            result || {
                statuses: [],
                errorCount: options.includeErrorCount ? 0 : undefined,
                globalLag: options.includeGlobalLag ? [] : undefined,
            }
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc