• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 20560403433

28 Dec 2025 10:26PM UTC coverage: 91.897% (+0.06%) from 91.836%
20560403433

push

github

VaclavObornik
2.3.0

1250 of 1452 branches covered (86.09%)

Branch coverage included in aggregate %.

2005 of 2090 relevant lines covered (95.93%)

381.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.63
/src/reactiveTasks/ReactiveTaskRepository.ts
1
import { Collection, Document, Filter, UpdateFilter } from 'mongodb';
2
import { CompatibleBulkWriteOptions, CompatibleFindOneAndUpdateOptions, CompatibleModifyResult } from '../mongoCompatibility';
3
import { defaultOnError, OnError } from '../OnError';
221✔
4
import { defaultOnInfo, OnInfo } from '../OnInfo';
221✔
5
import { processInBatches } from '../processInBatches';
221✔
6
import { ReactiveTaskRetryStrategy } from './ReactiveTaskRetryStrategy';
7
import {
221✔
8
    CleanupDeleteWhen,
9
    CODE_REACTIVE_TASK_CLEANUP,
10
    ReactiveTaskInternal,
11
    ReactiveTaskRecord,
12
    ReactiveTaskStatsOptions,
13
    ReactiveTaskStatsResult,
14
} from './ReactiveTaskTypes';
15

16
/**
17
 * Handles all database interactions for reactive tasks.
18
 *
19
 * Responsibilities:
20
 * - Generates MongoDB operations for creating, updating, and deleting tasks.
21
 * - Manages task state transitions (pending -> processing -> completed/failed).
22
 * - Implements the locking mechanism for task execution.
23
 * - Handles task finalization (retries, completion, failure).
24
 * - Manages database indexes for performance.
25
 * - Cleans up orphaned tasks.
26
 */
27
export class ReactiveTaskRepository<T extends Document> {
221✔
28
    readonly initPromise: Promise<void>;
29

30
    constructor(
31
        private tasksCollection: Collection<ReactiveTaskRecord<T>>,
139✔
32
        private onInfo: OnInfo = defaultOnInfo,
139✔
33
        private onError: OnError = defaultOnError,
139✔
34
    ) {
35
        this.initPromise = this.ensureIndexes();
139✔
36
    }
37

38
    public async findAndLockNextTask(taskDefs: ReactiveTaskInternal<T>[], options: { visibilityTimeoutMs: number }): Promise<ReactiveTaskRecord<T> | null> {
39
        const now = new Date();
1,841✔
40
        const nextRunAt = new Date(now.getTime() + options.visibilityTimeoutMs);
1,841✔
41

42
        const filter: Filter<ReactiveTaskRecord<T>> = {
1,841✔
43
            task: { $in: taskDefs.map((c) => c.task) },
3,611✔
44
            nextRunAt: { $lte: now, $type: 'date' },
45
        };
46

47
        const update: UpdateFilter<ReactiveTaskRecord<T>> = {
1,841✔
48
            $set: {
49
                status: 'processing',
50
                nextRunAt: nextRunAt,
51
                startedAt: now,
52
            },
53
            $inc: { attempts: 1 },
54
        };
55

56
        try {
1,841✔
57
            const result = await this.tasksCollection.findOneAndUpdate(filter, update, {
1,841✔
58
                sort: { nextRunAt: 1 },
59
                returnDocument: 'after',
60
                includeResultMetadata: true,
61
            } as CompatibleFindOneAndUpdateOptions);
62

63
            // In MongoDB v4, findOneAndUpdate returns { value: T } by default.
64
            // In MongoDB v6+, if includeResultMetadata: true, it returns { value: T }.
65
            // If we cast options to any, TS might infer return type as Document or ModifyResult depending on library version.
66
            // We treat 'result' as any or check strictly.
67
            // result is derived from findOneAndUpdate which is generic.
68

69
            // Should be safe to access .value if runtime behaves as expected.
70
            return (result as unknown as CompatibleModifyResult<ReactiveTaskRecord<T>>).value || null;
1,841✔
71
        } catch (error) {
72
            this.onError(error as Error);
×
73
            return null;
×
74
        }
75
    }
76

77
    public async finalizeTask(
78
        taskRecord: ReactiveTaskRecord<T>,
79
        strategy: ReactiveTaskRetryStrategy,
80
        error?: Error,
81
        debounceMs = 1000,
×
82
        executionStats?: { durationMs: number },
83
        executionHistoryLimit = 5,
4✔
84
        options?: { session?: import('mongodb').ClientSession },
85
    ): Promise<void> {
86
        const isError = !!error;
711✔
87
        const errorMessage = error?.message || 'Unknown error';
711✔
88

89
        // Determine First Error At
90
        let firstErrorAt = taskRecord.firstErrorAt;
711✔
91
        if (isError && !firstErrorAt) {
711✔
92
            firstErrorAt = new Date();
19✔
93
        } else if (!isError) {
692✔
94
            firstErrorAt = null; // Reset on success
674✔
95
        }
96

97
        const attempts = taskRecord.attempts || 0;
711!
98
        const shouldFail = isError && strategy.shouldFail(attempts, firstErrorAt);
711✔
99
        const nextRetryScheduledAt = isError && !shouldFail ? strategy.calculateNextRetry(attempts) : null;
711✔
100

101
        const updateSet: Document = {
711✔
102
            status: {
103
                $cond: {
104
                    if: { $eq: ['$status', 'processing_dirty'] },
105
                    then: 'pending',
106
                    else: isError ? (shouldFail ? 'failed' : 'pending') : 'completed',
748✔
107
                },
108
            },
109
            nextRunAt: {
110
                $cond: {
111
                    if: { $eq: ['$status', 'processing_dirty'] },
112
                    then: { $add: ['$updatedAt', debounceMs] },
113
                    else: isError ? (shouldFail ? null : nextRetryScheduledAt) : null,
748✔
114
                },
115
            },
116
            completedAt: {
117
                $cond: {
118
                    if: { $eq: ['$status', 'processing_dirty'] },
119
                    then: '$completedAt',
120
                    else: isError ? '$completedAt' : new Date(),
711✔
121
                },
122
            },
123
            firstErrorAt: {
124
                $cond: {
125
                    if: { $eq: ['$status', 'processing_dirty'] },
126
                    then: '$firstErrorAt',
127
                    else: firstErrorAt,
128
                },
129
            },
130
            lastError: {
131
                $cond: {
132
                    if: { $eq: ['$status', 'processing_dirty'] },
133
                    then: '$lastError',
134
                    else: isError ? errorMessage : null,
711✔
135
                },
136
            },
137
            lastFinalizedAt: new Date(),
138
        };
139

140
        const durationMs = executionStats?.durationMs ?? 0;
711✔
141
        const historyEntry = {
711✔
142
            at: new Date(),
143
            status: isError ? 'failed' : 'completed',
711✔
144
            durationMs: durationMs,
145
            ...(isError ? { error: errorMessage } : {}),
711✔
146
        };
147

148
        if (!isError) {
711✔
149
            updateSet.lastSuccess = {
674✔
150
                at: new Date(),
151
                durationMs: durationMs,
152
            };
153
        }
154

155
        await this.tasksCollection.updateOne(
711✔
156
            { _id: taskRecord._id },
157
            [
158
                {
159
                    $set: updateSet,
160
                },
161
                {
162
                    $set: {
163
                        executionHistory: {
164
                            $slice: [
165
                                {
166
                                    $concatArrays: [{ $ifNull: ['$executionHistory', []] }, [historyEntry]],
167
                                },
168
                                -executionHistoryLimit, // Keep last N
169
                            ],
170
                        },
171
                    },
172
                },
173
            ],
174
            options || {},
1,419✔
175
        );
176
    }
177

178
    public async deferTask(taskRecord: ReactiveTaskRecord<T>, delay: number | Date): Promise<void> {
179
        const now = Date.now();
2✔
180
        const nextRunAt = typeof delay === 'number' ? new Date(now + delay) : delay;
2!
181
        // Keeping dueAt unchanged (it shouldn't change on deferral if we want to track original intent,
182
        // but if we want 'lag' to be reset, we would update it.
183
        // Based on plan: "Never changes" -> so we don't update dueAt here.
184
        // Wait, if we defer, we are explicitly saying "don't run yet".
185
        // In clean slate, dueAt is set at creation and never changes (unless strictly needed for lag reset).
186

187
        await this.tasksCollection.updateOne(
2✔
188
            { _id: taskRecord._id },
189
            {
190
                $set: {
191
                    status: 'pending',
192
                    nextRunAt: nextRunAt,
193
                    // dueAt: not changed
194
                    attempts: 0,
195
                },
196
            },
197
        );
198
    }
199

200
    public async executeBulkWrite(
201
        operations: Parameters<Collection<ReactiveTaskRecord<T>>['bulkWrite']>[0],
202
        options?: CompatibleBulkWriteOptions,
203
    ): Promise<void> {
204
        await this.tasksCollection.bulkWrite(operations, options || {});
×
205
    }
206

207
    public async findTasks(
208
        filter: Filter<ReactiveTaskRecord<T>>,
209
        options: { limit?: number; skip?: number; sort?: Record<string, 1 | -1> } = {},
×
210
    ): Promise<ReactiveTaskRecord<T>[]> {
211
        return this.tasksCollection.find(filter, options).toArray();
23✔
212
    }
213

214
    public async countTasks(filter: Filter<ReactiveTaskRecord<T>>): Promise<number> {
215
        return this.tasksCollection.countDocuments(filter);
26✔
216
    }
217

218
    public async updateTasks(
219
        filter: Filter<ReactiveTaskRecord<T>>,
220
        update: UpdateFilter<ReactiveTaskRecord<T>> | Document[],
221
    ): Promise<{ matchedCount: number; modifiedCount: number }> {
222
        const result = await this.tasksCollection.updateMany(filter, update);
×
223
        return {
×
224
            matchedCount: result.matchedCount,
225
            modifiedCount: result.modifiedCount,
226
        };
227
    }
228

229
    public async resetTasks(filter: Filter<ReactiveTaskRecord<T>>): Promise<{ matchedCount: number; modifiedCount: number }> {
230
        const updatePipeline: Document[] = [
9✔
231
            {
232
                $set: {
233
                    updatedAt: '$$NOW',
234
                    status: {
235
                        $cond: {
236
                            if: { $in: ['$status', ['processing', 'processing_dirty']] },
237
                            then: 'processing_dirty',
238
                            else: 'pending',
239
                        },
240
                    },
241
                    nextRunAt: {
242
                        // If it was processing, keep it running (don't break lock) - wait, resetTasks usually implies "fix stuff".
243
                        // Logic: if processing/dirty -> keep nextRunAt (lock), else -> $$NOW.
244
                        // If we reset a stuck task, we want it to run NOW.
245
                        // If we reset a completed/failed task, we want it to run NOW.
246
                        $cond: {
247
                            if: { $in: ['$status', ['processing', 'processing_dirty']] },
248
                            then: '$nextRunAt', // Keep current timeout
249
                            else: '$$NOW', // Run immediately
250
                        },
251
                    },
252
                    // Preserve dueAt
253
                },
254
            },
255
        ];
256

257
        const result = await this.tasksCollection.updateMany(filter, updatePipeline);
9✔
258
        return {
9✔
259
            matchedCount: result.matchedCount,
260
            modifiedCount: result.modifiedCount,
261
        };
262
    }
263

264
    public async resetTasksForUpgrade(taskName: string, mode: 'failed' | 'all'): Promise<{ modifiedCount: number }> {
265
        const filter: Filter<ReactiveTaskRecord<T>> = { task: taskName };
2✔
266

267
        if (mode === 'failed') {
2✔
268
            filter.status = 'failed';
1✔
269
        } else if (mode === 'all') {
1!
270
            filter.status = { $in: ['failed', 'completed'] };
1✔
271
        }
272

273
        // Use safe reset logic
274
        const result = await this.resetTasks(filter);
2✔
275
        return { modifiedCount: result.modifiedCount };
2✔
276
    }
277

278
    private async ensureIndexes(): Promise<void> {
279
        // Optimized index for findAndLockNextTask (ESR Rule Compliance)
280
        // 1. Equality: task (via $in)
281
        // 2. Sort: nextRunAt
282
        // 3. Range: nextRunAt ($lte)
283
        // Partial Index: only index tasks that are eligible to run (nextRunAt != null)
284
        await this.tasksCollection.createIndex(
139✔
285
            {
286
                task: 1,
287
                nextRunAt: 1,
288
            },
289
            {
290
                partialFilterExpression: { nextRunAt: { $type: 'date' } },
291
                name: 'polling_idx',
292
            },
293
        );
294

295
        // Unique index to ensure one task per task definition per source document
296
        await this.tasksCollection.createIndex({ sourceDocId: 1, task: 1 }, { unique: true });
139✔
297
    }
298

299
    /**
300
     * Periodically cleans up orphaned tasks that match the cleanupPolicy.
301
     * This runs on a schedule (e.g. hourly) in the Leader instance.
302
     */
303
    public async deleteOrphanedTasks(
304
        taskName: string,
305
        sourceCollectionName: string,
306
        taskFilter: Filter<Document>,
307
        cleanupPolicy: { deleteWhen: CleanupDeleteWhen; keepForMs: number },
308
        shouldStop: () => boolean,
309
        limitToSourceIds?: unknown[],
310
    ): Promise<void> {
311
        const { deleteWhen, keepForMs } = cleanupPolicy;
143✔
312

313
        if (deleteWhen === 'never') {
143✔
314
            return;
5✔
315
        }
316
        const cutoffDate = new Date(Date.now() - keepForMs);
138✔
317

318
        const matchStage: Document = {
138✔
319
            task: taskName,
320
            $expr: {
321
                $lt: [
322
                    {
323
                        $max: ['$updatedAt', { $ifNull: ['$lastFinalizedAt', '$createdAt'] }],
324
                    },
325
                    cutoffDate,
326
                ],
327
            },
328
        };
329

330
        if (limitToSourceIds && limitToSourceIds.length > 0) {
138✔
331
            matchStage.sourceDocId = { $in: limitToSourceIds };
11✔
332
        }
333

334
        const pipeline: Document[] = [
138✔
335
            {
336
                $match: matchStage,
337
            },
338
        ];
339

340
        // We need to determine if the source document is "gone" or "no longer matching".
341
        // Strategy:
342
        // 1. If deleteWhen === 'sourceDocumentDeleted', we just check if document exists by ID.
343
        // 2. If deleteWhen === 'sourceDocumentDeletedOrNoLongerMatching', we check if document exists AND matches filter.
344

345
        const lookupPipeline: Document[] = [{ $match: { $expr: { $eq: ['$_id', '$$sId'] } } }];
138✔
346

347
        if (deleteWhen === 'sourceDocumentDeletedOrNoLongerMatching' && Object.keys(taskFilter).length > 0) {
138✔
348
            // taskFilter (normalized) is an Expression body. Must wrap in $expr for $match.
349
            lookupPipeline.push({ $match: { $expr: taskFilter } });
8✔
350
        }
351

352
        pipeline.push(
138✔
353
            {
354
                $lookup: {
355
                    from: sourceCollectionName,
356
                    let: { sId: '$sourceDocId' },
357
                    pipeline: lookupPipeline,
358
                    as: 'orphanCheck',
359
                },
360
            },
361
            {
362
                $match: {
363
                    'orphanCheck.0': { $exists: false }, // If empty, it means it was deleted OR didn't match filter
364
                },
365
            },
366
            {
367
                $project: {
368
                    _id: 1,
369
                    orphanCheck: 1,
370
                },
371
            },
372
        );
373

374
        await processInBatches(
138✔
375
            this.tasksCollection,
376
            pipeline,
377
            (task) => task._id,
4✔
378
            async (batch) => {
379
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
380
                await this.tasksCollection.deleteMany({ _id: { $in: batch as any } });
4✔
381
                this.onInfo({
4✔
382
                    message: `Cleaned up ${batch.length} orphaned tasks for '${taskName}'`,
383
                    code: CODE_REACTIVE_TASK_CLEANUP,
384
                    meta: { count: batch.length },
385
                });
386
            },
387
            { batchSize: 1000, shouldStop },
388
        );
389
    }
390

391
    public async getStatistics(filter: Filter<ReactiveTaskRecord<T>>, options: ReactiveTaskStatsOptions): Promise<ReactiveTaskStatsResult> {
392
        const pipeline: object[] = [];
97✔
393

394
        if (Object.keys(filter).length > 0) {
97✔
395
            pipeline.push({ $match: filter });
8✔
396
        }
397

398
        const facets: Record<string, object[]> = {};
97✔
399

400
        if (options.includeStatusCounts) {
97!
401
            const groupId = options.groupByTask ? { task: '$task', status: '$status' } : '$status';
97✔
402
            facets.statuses = [{ $group: { _id: groupId, count: { $sum: 1 } } }];
97✔
403
        }
404

405
        if (options.includeErrorCount) {
97✔
406
            if (options.groupByTask) {
8✔
407
                facets.errorCounts = [{ $match: { lastError: { $exists: true, $ne: null } } }, { $group: { _id: '$task', count: { $sum: 1 } } }];
1✔
408
            } else {
409
                facets.errorCount = [{ $match: { lastError: { $exists: true, $ne: null } } }, { $count: 'count' }];
7✔
410
            }
411
        }
412

413
        if (options.includeGlobalLag) {
97✔
414
            facets.globalLag = [{ $match: { status: 'pending' } }, { $group: { _id: '$task', minScheduledAt: { $min: '$dueAt' } } }];
89✔
415
        }
416

417
        pipeline.push({ $facet: facets });
97✔
418

419
        const projection: Record<string, unknown> = { statuses: 1 };
97✔
420

421
        if (options.includeErrorCount) {
97✔
422
            if (options.groupByTask) {
8✔
423
                projection.errorCounts = 1;
1✔
424
            } else {
425
                projection.errorCount = { $ifNull: [{ $arrayElemAt: ['$errorCount.count', 0] }, 0] };
7✔
426
            }
427
        }
428

429
        if (options.includeGlobalLag) {
97✔
430
            projection.globalLag = 1;
89✔
431
        }
432

433
        pipeline.push({ $project: projection });
97✔
434

435
        const [result] = await this.tasksCollection.aggregate<ReactiveTaskStatsResult>(pipeline, { readPreference: options.readPreference }).toArray();
97✔
436

437
        return (
97✔
438
            result || {
97!
439
                statuses: [],
440
                errorCount: options.includeErrorCount ? 0 : undefined,
×
441
                globalLag: options.includeGlobalLag ? [] : undefined,
×
442
            }
443
        );
444
    }
445
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc