• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 24607504909

18 Apr 2026 03:09PM UTC coverage: 91.218% (+0.09%) from 91.133%
24607504909

push

github

VaclavObornik
fix: address Copilot review round 11

1. waitUntilReactiveTasksIdle (whitelist mode) now always checks the
   planner buffer. Skipping it entirely let the helper return idle
   before an in-flight change event had a chance to be turned into
   the task record the DB check looks for. Only the worker count
   check remains global-scope-skipped so unrelated tests' worker
   pools cannot block a scoped wait.

2. docs/reactive-tasks/testing.md now notes the narrow race where
   triggering a write and immediately calling the scoped wait can
   return early if stabilityDurationMs is too short, with a
   recommended workaround (raise stabilityDurationMs or pre-poll
   for the task record).

3. cronTasksConcurrency 'no polls after stop' assertion tightened:
   stopCronTasks is fire-and-forget, so a worker already inside
   findOneAndUpdate at the time could still tick the spy. Capture
   the poll count after a 200ms grace period and assert it does
   not grow over a full 5s back-off window, instead of requiring
   spy.called === false outright.

1492 of 1748 branches covered (85.35%)

Branch coverage included in aggregate %.

3 of 3 new or added lines in 1 file covered. (100.0%)

54 existing lines in 9 files now uncovered.

2320 of 2431 relevant lines covered (95.43%)

373.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.84
/src/reactiveTasks/ReactiveTaskRepository.ts
1
import { Collection, Document, Filter, UpdateFilter } from 'mongodb';
2
import { CompatibleBulkWriteOptions, CompatibleFindOneAndUpdateOptions, CompatibleModifyResult } from '../mongoCompatibility';
3
import { defaultOnError, OnError } from '../OnError';
267✔
4
import { defaultOnInfo, OnInfo } from '../OnInfo';
267✔
5
import { processInBatches } from '../processInBatches';
267✔
6
import { ReactiveTaskRetryStrategy } from './ReactiveTaskRetryStrategy';
7
import {
267✔
8
    CleanupDeleteWhen,
9
    CODE_REACTIVE_TASK_CLEANUP,
10
    ReactiveTaskInternal,
11
    ReactiveTaskRecord,
12
    ReactiveTaskStatsOptions,
13
    ReactiveTaskStatsResult,
14
} from './ReactiveTaskTypes';
15

16
/**
17
 * Handles all database interactions for reactive tasks.
18
 *
19
 * Responsibilities:
20
 * - Generates MongoDB operations for creating, updating, and deleting tasks.
21
 * - Manages task state transitions (pending -> processing -> completed/failed).
22
 * - Implements the locking mechanism for task execution.
23
 * - Handles task finalization (retries, completion, failure).
24
 * - Manages database indexes for performance.
25
 * - Cleans up orphaned tasks.
26
 */
27
export class ReactiveTaskRepository<T extends Document> {
267✔
28
    readonly initPromise: Promise<void>;
29

30
    constructor(
31
        private tasksCollection: Collection<ReactiveTaskRecord<T>>,
162✔
32
        private onInfo: OnInfo = defaultOnInfo,
162✔
33
        private onError: OnError = defaultOnError,
162✔
34
    ) {
35
        this.initPromise = this.ensureIndexes();
162✔
36
    }
37

38
    public async findAndLockNextTask(taskDefs: ReactiveTaskInternal<T>[], options: { visibilityTimeoutMs: number }): Promise<ReactiveTaskRecord<T> | null> {
39
        const now = new Date();
1,819✔
40
        const nextRunAt = new Date(now.getTime() + options.visibilityTimeoutMs);
1,819✔
41

42
        const filter: Filter<ReactiveTaskRecord<T>> = {
1,819✔
43
            task: { $in: taskDefs.map((c) => c.task) },
3,459✔
44
            nextRunAt: { $lte: now, $type: 'date' },
45
        };
46

47
        const update: UpdateFilter<ReactiveTaskRecord<T>> = {
1,819✔
48
            $set: {
49
                status: 'processing',
50
                nextRunAt: nextRunAt,
51
                startedAt: now,
52
            },
53
            $inc: { attempts: 1 },
54
        };
55

56
        try {
1,819✔
57
            const result = await this.tasksCollection.findOneAndUpdate(filter, update, {
1,819✔
58
                sort: { nextRunAt: 1 },
59
                returnDocument: 'after',
60
                includeResultMetadata: true,
61
            } as CompatibleFindOneAndUpdateOptions);
62

63
            // In MongoDB v4, findOneAndUpdate returns { value: T } by default.
64
            // In MongoDB v6+, if includeResultMetadata: true, it returns { value: T }.
65
            // If we cast options to any, TS might infer return type as Document or ModifyResult depending on library version.
66
            // We treat 'result' as any or check strictly.
67
            // result is derived from findOneAndUpdate which is generic.
68

69
            // Should be safe to access .value if runtime behaves as expected.
70
            return (result as unknown as CompatibleModifyResult<ReactiveTaskRecord<T>>).value || null;
1,818✔
71
        } catch (error) {
72
            this.onError(error as Error);
1✔
73
            return null;
1✔
74
        }
75
    }
76

77
    /**
78
     * Finalize a task record (success or failure). Returns `true` when the
79
     * update matched the record, `false` when it did not - which in
80
     * practice means another worker has since re-claimed the task (its
81
     * startedAt no longer matches) and this call was a no-op.
82
     *
83
     * Callers that care about the distinction (e.g. to suppress success /
84
     * failure metrics for a stolen task) should inspect the return value.
85
     */
86
    public async finalizeTask(
87
        taskRecord: ReactiveTaskRecord<T>,
88
        strategy: ReactiveTaskRetryStrategy,
89
        error?: Error,
90
        debounceMs = 1000,
×
91
        executionStats?: { durationMs: number },
92
        executionHistoryLimit = 5,
4✔
93
        options?: { session?: import('mongodb').ClientSession },
94
    ): Promise<boolean> {
95
        const isError = !!error;
724✔
96
        const errorMessage = error?.message || 'Unknown error';
724✔
97

98
        // Determine First Error At
99
        let firstErrorAt = taskRecord.firstErrorAt;
724✔
100
        if (isError && !firstErrorAt) {
724✔
101
            firstErrorAt = new Date();
29✔
102
        } else if (!isError) {
695✔
103
            firstErrorAt = null; // Reset on success
677✔
104
        }
105

106
        const attempts = taskRecord.attempts || 0;
724!
107
        const shouldFail = isError && strategy.shouldFail(attempts, firstErrorAt);
724✔
108
        const nextRetryScheduledAt = isError && !shouldFail ? strategy.calculateNextRetry(attempts) : null;
724✔
109

110
        const updateSet: Document = {
724✔
111
            status: {
112
                $cond: {
113
                    if: { $eq: ['$status', 'processing_dirty'] },
114
                    then: 'pending',
115
                    else: isError ? (shouldFail ? 'failed' : 'pending') : 'completed',
771✔
116
                },
117
            },
118
            nextRunAt: {
119
                $cond: {
120
                    if: { $eq: ['$status', 'processing_dirty'] },
121
                    then: { $add: ['$updatedAt', debounceMs] },
122
                    else: isError ? (shouldFail ? null : nextRetryScheduledAt) : null,
771✔
123
                },
124
            },
125
            completedAt: {
126
                $cond: {
127
                    if: { $eq: ['$status', 'processing_dirty'] },
128
                    then: '$completedAt',
129
                    else: isError ? '$completedAt' : new Date(),
724✔
130
                },
131
            },
132
            firstErrorAt: {
133
                $cond: {
134
                    if: { $eq: ['$status', 'processing_dirty'] },
135
                    then: '$firstErrorAt',
136
                    else: firstErrorAt,
137
                },
138
            },
139
            lastError: {
140
                $cond: {
141
                    if: { $eq: ['$status', 'processing_dirty'] },
142
                    then: '$lastError',
143
                    else: isError ? errorMessage : null,
724✔
144
                },
145
            },
146
            lastFinalizedAt: new Date(),
147
        };
148

149
        const durationMs = executionStats?.durationMs ?? 0;
724✔
150
        const historyEntry = {
724✔
151
            at: new Date(),
152
            status: isError ? 'failed' : 'completed',
724✔
153
            durationMs: durationMs,
154
            ...(isError ? { error: errorMessage } : {}),
724✔
155
        };
156

157
        if (!isError) {
724✔
158
            updateSet.lastSuccess = {
677✔
159
                at: new Date(),
160
                durationMs: durationMs,
161
            };
162
        }
163

164
        // CAS on startedAt: if another worker has since re-claimed this task
165
        // (visibility timeout expired and a new claim set its own startedAt),
166
        // the filter does not match and we leave the new claimant's state
167
        // alone. A task without startedAt (shouldn't happen once claimed) falls
168
        // back to _id-only matching to preserve BC.
169
        const finalizeFilter: Filter<ReactiveTaskRecord<T>> = taskRecord.startedAt
724✔
170
            ? ({ _id: taskRecord._id, startedAt: taskRecord.startedAt } as Filter<ReactiveTaskRecord<T>>)
171
            : ({ _id: taskRecord._id } as Filter<ReactiveTaskRecord<T>>);
172

173
        const result = await this.tasksCollection.updateOne(
724✔
174
            finalizeFilter,
175
            [
176
                {
177
                    $set: updateSet,
178
                },
179
                {
180
                    $set: {
181
                        executionHistory: {
182
                            $slice: [
183
                                {
184
                                    $concatArrays: [{ $ifNull: ['$executionHistory', []] }, [historyEntry]],
185
                                },
186
                                -executionHistoryLimit, // Keep last N
187
                            ],
188
                        },
189
                    },
190
                },
191
            ],
192
            options || {},
1,445✔
193
        );
194

195
        return result.matchedCount > 0;
724✔
196
    }
197

198
    public async deferTask(taskRecord: ReactiveTaskRecord<T>, delay: number | Date): Promise<void> {
199
        const now = Date.now();
2✔
200
        const nextRunAt = typeof delay === 'number' ? new Date(now + delay) : delay;
2!
201
        // Keeping dueAt unchanged (it shouldn't change on deferral if we want to track original intent,
202
        // but if we want 'lag' to be reset, we would update it.
203
        // Based on plan: "Never changes" -> so we don't update dueAt here.
204
        // Wait, if we defer, we are explicitly saying "don't run yet".
205
        // In clean slate, dueAt is set at creation and never changes (unless strictly needed for lag reset).
206

207
        await this.tasksCollection.updateOne(
2✔
208
            { _id: taskRecord._id },
209
            {
210
                $set: {
211
                    status: 'pending',
212
                    nextRunAt: nextRunAt,
213
                    // dueAt: not changed
214
                    attempts: 0,
215
                },
216
            },
217
        );
218
    }
219

220
    public async executeBulkWrite(
221
        operations: Parameters<Collection<ReactiveTaskRecord<T>>['bulkWrite']>[0],
222
        options?: CompatibleBulkWriteOptions,
223
    ): Promise<void> {
UNCOV
224
        await this.tasksCollection.bulkWrite(operations, options || {});
×
225
    }
226

227
    public async findTasks(
228
        filter: Filter<ReactiveTaskRecord<T>>,
229
        options: { limit?: number; skip?: number; sort?: Record<string, 1 | -1> } = {},
×
230
    ): Promise<ReactiveTaskRecord<T>[]> {
231
        return this.tasksCollection.find(filter, options).toArray();
14✔
232
    }
233

234
    public async countTasks(filter: Filter<ReactiveTaskRecord<T>>): Promise<number> {
235
        return this.tasksCollection.countDocuments(filter);
17✔
236
    }
237

238
    public async updateTasks(
239
        filter: Filter<ReactiveTaskRecord<T>>,
240
        update: UpdateFilter<ReactiveTaskRecord<T>> | Document[],
241
    ): Promise<{ matchedCount: number; modifiedCount: number }> {
UNCOV
242
        const result = await this.tasksCollection.updateMany(filter, update);
×
UNCOV
243
        return {
×
244
            matchedCount: result.matchedCount,
245
            modifiedCount: result.modifiedCount,
246
        };
247
    }
248

249
    public async resetTasks(filter: Filter<ReactiveTaskRecord<T>>): Promise<{ matchedCount: number; modifiedCount: number }> {
250
        const updatePipeline: Document[] = [
9✔
251
            {
252
                $set: {
253
                    updatedAt: '$$NOW',
254
                    status: {
255
                        $cond: {
256
                            if: { $in: ['$status', ['processing', 'processing_dirty']] },
257
                            then: 'processing_dirty',
258
                            else: 'pending',
259
                        },
260
                    },
261
                    nextRunAt: {
262
                        // If it was processing, keep it running (don't break lock) - wait, resetTasks usually implies "fix stuff".
263
                        // Logic: if processing/dirty -> keep nextRunAt (lock), else -> $$NOW.
264
                        // If we reset a stuck task, we want it to run NOW.
265
                        // If we reset a completed/failed task, we want it to run NOW.
266
                        $cond: {
267
                            if: { $in: ['$status', ['processing', 'processing_dirty']] },
268
                            then: '$nextRunAt', // Keep current timeout
269
                            else: '$$NOW', // Run immediately
270
                        },
271
                    },
272
                    // Preserve dueAt
273
                },
274
            },
275
        ];
276

277
        const result = await this.tasksCollection.updateMany(filter, updatePipeline);
9✔
278
        return {
9✔
279
            matchedCount: result.matchedCount,
280
            modifiedCount: result.modifiedCount,
281
        };
282
    }
283

284
    public async resetTasksForUpgrade(taskName: string, mode: 'failed' | 'all'): Promise<{ modifiedCount: number }> {
285
        const filter: Filter<ReactiveTaskRecord<T>> = { task: taskName };
2✔
286

287
        if (mode === 'failed') {
2✔
288
            filter.status = 'failed';
1✔
289
        } else if (mode === 'all') {
1!
290
            filter.status = { $in: ['failed', 'completed'] };
1✔
291
        }
292

293
        // Use safe reset logic
294
        const result = await this.resetTasks(filter);
2✔
295
        return { modifiedCount: result.modifiedCount };
2✔
296
    }
297

298
    private async ensureIndexes(): Promise<void> {
299
        // Optimized index for findAndLockNextTask (ESR Rule Compliance)
300
        // 1. Equality: task (via $in)
301
        // 2. Sort: nextRunAt
302
        // 3. Range: nextRunAt ($lte)
303
        // Partial Index: only index tasks that are eligible to run (nextRunAt != null)
304
        await this.tasksCollection.createIndex(
162✔
305
            {
306
                task: 1,
307
                nextRunAt: 1,
308
            },
309
            {
310
                partialFilterExpression: { nextRunAt: { $type: 'date' } },
311
                name: 'polling_idx',
312
            },
313
        );
314

315
        // Unique index to ensure one task per task definition per source document
316
        await this.tasksCollection.createIndex({ sourceDocId: 1, task: 1 }, { unique: true });
162✔
317
    }
318

319
    /**
320
     * Periodically cleans up orphaned tasks that match the cleanupPolicy.
321
     * This runs on a schedule (e.g. hourly) in the Leader instance.
322
     */
323
    public async deleteOrphanedTasks(
324
        taskName: string,
325
        sourceCollectionName: string,
326
        taskFilter: Filter<Document>,
327
        cleanupPolicy: { deleteWhen: CleanupDeleteWhen; keepForMs: number },
328
        shouldStop: () => boolean,
329
        limitToSourceIds?: unknown[],
330
    ): Promise<void> {
331
        const { deleteWhen, keepForMs } = cleanupPolicy;
165✔
332

333
        if (deleteWhen === 'never') {
165✔
334
            return;
5✔
335
        }
336
        const cutoffDate = new Date(Date.now() - keepForMs);
160✔
337

338
        const matchStage: Document = {
160✔
339
            task: taskName,
340
            $expr: {
341
                $lt: [
342
                    {
343
                        $max: ['$updatedAt', { $ifNull: ['$lastFinalizedAt', '$createdAt'] }],
344
                    },
345
                    cutoffDate,
346
                ],
347
            },
348
        };
349

350
        if (limitToSourceIds && limitToSourceIds.length > 0) {
160✔
351
            matchStage.sourceDocId = { $in: limitToSourceIds };
11✔
352
        }
353

354
        const pipeline: Document[] = [
160✔
355
            {
356
                $match: matchStage,
357
            },
358
        ];
359

360
        // We need to determine if the source document is "gone" or "no longer matching".
361
        // Strategy:
362
        // 1. If deleteWhen === 'sourceDocumentDeleted', we just check if document exists by ID.
363
        // 2. If deleteWhen === 'sourceDocumentDeletedOrNoLongerMatching', we check if document exists AND matches filter.
364

365
        const lookupPipeline: Document[] = [{ $match: { $expr: { $eq: ['$_id', '$$sId'] } } }];
160✔
366

367
        if (deleteWhen === 'sourceDocumentDeletedOrNoLongerMatching' && Object.keys(taskFilter).length > 0) {
160✔
368
            // taskFilter (normalized) is an Expression body. Must wrap in $expr for $match.
369
            lookupPipeline.push({ $match: { $expr: taskFilter } });
8✔
370
        }
371

372
        pipeline.push(
160✔
373
            {
374
                $lookup: {
375
                    from: sourceCollectionName,
376
                    let: { sId: '$sourceDocId' },
377
                    pipeline: lookupPipeline,
378
                    as: 'orphanCheck',
379
                },
380
            },
381
            {
382
                $match: {
383
                    'orphanCheck.0': { $exists: false }, // If empty, it means it was deleted OR didn't match filter
384
                },
385
            },
386
            {
387
                $project: {
388
                    _id: 1,
389
                    orphanCheck: 1,
390
                },
391
            },
392
        );
393

394
        await processInBatches(
160✔
395
            this.tasksCollection,
396
            pipeline,
397
            (task) => task._id,
5✔
398
            async (batch) => {
399
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
400
                await this.tasksCollection.deleteMany({ _id: { $in: batch as any } });
5✔
401
                this.onInfo({
5✔
402
                    message: `Cleaned up ${batch.length} orphaned tasks for '${taskName}'`,
403
                    code: CODE_REACTIVE_TASK_CLEANUP,
404
                    meta: { count: batch.length },
405
                });
406
            },
407
            { batchSize: 1000, shouldStop },
408
        );
409
    }
410

411
    public async getStatistics(filter: Filter<ReactiveTaskRecord<T>>, options: ReactiveTaskStatsOptions): Promise<ReactiveTaskStatsResult> {
412
        const pipeline: object[] = [];
81✔
413

414
        if (Object.keys(filter).length > 0) {
81✔
415
            pipeline.push({ $match: filter });
8✔
416
        }
417

418
        const facets: Record<string, object[]> = {};
81✔
419

420
        if (options.includeStatusCounts) {
81!
421
            const groupId = options.groupByTask ? { task: '$task', status: '$status' } : '$status';
81✔
422
            facets.statuses = [{ $group: { _id: groupId, count: { $sum: 1 } } }];
81✔
423
        }
424

425
        if (options.includeErrorCount) {
81✔
426
            if (options.groupByTask) {
8✔
427
                facets.errorCounts = [{ $match: { lastError: { $exists: true, $ne: null } } }, { $group: { _id: '$task', count: { $sum: 1 } } }];
1✔
428
            } else {
429
                facets.errorCount = [{ $match: { lastError: { $exists: true, $ne: null } } }, { $count: 'count' }];
7✔
430
            }
431
        }
432

433
        if (options.includeGlobalLag) {
81✔
434
            facets.globalLag = [{ $match: { status: 'pending' } }, { $group: { _id: '$task', minScheduledAt: { $min: '$dueAt' } } }];
73✔
435
        }
436

437
        pipeline.push({ $facet: facets });
81✔
438

439
        const projection: Record<string, unknown> = { statuses: 1 };
81✔
440

441
        if (options.includeErrorCount) {
81✔
442
            if (options.groupByTask) {
8✔
443
                projection.errorCounts = 1;
1✔
444
            } else {
445
                projection.errorCount = { $ifNull: [{ $arrayElemAt: ['$errorCount.count', 0] }, 0] };
7✔
446
            }
447
        }
448

449
        if (options.includeGlobalLag) {
81✔
450
            projection.globalLag = 1;
73✔
451
        }
452

453
        pipeline.push({ $project: projection });
81✔
454

455
        const [result] = await this.tasksCollection.aggregate<ReactiveTaskStatsResult>(pipeline, { readPreference: options.readPreference }).toArray();
81✔
456

457
        return (
81✔
458
            result || {
81!
459
                statuses: [],
460
                errorCount: options.includeErrorCount ? 0 : undefined,
×
461
                globalLag: options.includeGlobalLag ? [] : undefined,
×
462
            }
463
        );
464
    }
465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc