• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 20865709108

09 Jan 2026 09:12PM UTC coverage: 91.874% (-0.07%) from 91.94%
20865709108

push

github

VaclavObornik
2.4.5

1259 of 1460 branches covered (86.23%)

Branch coverage included in aggregate %.

2020 of 2109 relevant lines covered (95.78%)

372.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.62
/src/reactiveTasks/ReactiveTaskPlanner.ts
1
import { EJSON } from 'bson';
229✔
2
import * as _debug from 'debug';
229✔
3
import {
4
    ChangeStream,
5
    ChangeStreamDeleteDocument,
6
    ChangeStreamInsertDocument,
7
    ChangeStreamReplaceDocument,
8
    ChangeStreamUpdateDocument,
9
    Document,
10
    MongoError,
11
    ResumeToken,
12
} from 'mongodb';
13
import { getMongoClient } from '../getMongoClient';
229✔
14
import { GlobalsCollection } from '../globalsCollection';
15
import { defaultOnError, OnError } from '../OnError';
229✔
16
import { defaultOnInfo, OnInfo } from '../OnInfo';
229✔
17
import { prefixFilterKeys } from '../prefixFilterKeys';
229✔
18
import { ReactiveTaskOps } from './ReactiveTaskOps';
229✔
19
import { ReactiveTaskReconciler } from './ReactiveTaskReconciler';
229✔
20
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
21
import {
229✔
22
    CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
23
    CODE_REACTIVE_TASK_PLANNER_STARTED,
24
    CODE_REACTIVE_TASK_PLANNER_STOPPED,
25
    CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
26
    EvolutionConfig,
27
    MetaDocument,
28
    ReactiveTaskInternal,
29
    REACTIVE_TASK_META_DOC_ID,
30
} from './ReactiveTaskTypes';
31
import stringify = require('fast-json-stable-stringify');
229✔
32

33
const debug = _debug('mongodash:reactiveTasks:planner');
229✔
34

35
type FilteredChangeStreamDocument = Pick<
36
    ChangeStreamInsertDocument | ChangeStreamUpdateDocument | ChangeStreamReplaceDocument | ChangeStreamDeleteDocument,
37
    '_id' | 'operationType' | 'ns' | 'documentKey' | 'clusterTime'
38
>;
39

40
export interface PlannerCallbacks {
41
    onStreamError: () => void;
42
    onTaskPlanned: (tasksCollectionName: string, debounceMs: number) => void;
43
}
44

45
/**
46
 * Responsible for listening to MongoDB Change Stream events and planning tasks.
47
 *
48
 * Responsibilities:
49
 * - Manages the lifecycle of the Change Stream (start, stop, error handling).
50
 * - Batches Change Stream events to reduce database load.
51
 * - Coordinates with `ReactiveTaskOps` to generate and execute task operations.
52
 * - Coordinates with `ReactiveTaskReconciler` to handle reconciliation when the stream is interrupted or history is lost.
53
 * - Handles critical errors like `ChangeStreamHistoryLost` (code 280) by triggering reconciliation.
54
 */
55
export class ReactiveTaskPlanner {
229✔
56
    private changeStream: ChangeStream | null = null;
138✔
57
    private taskBatch = new Map<string, FilteredChangeStreamDocument>();
138✔
58
    private taskBatchLastResumeToken: ResumeToken | null = null;
138✔
59
    private batchFlushTimer: NodeJS.Timeout | null = null;
138✔
60
    private batchFirstEventTime: number | null = null;
138✔
61
    private isFlushing = false;
138✔
62
    private metaDocId = REACTIVE_TASK_META_DOC_ID;
138✔
63
    private lastClusterTime: number | null = null;
138✔
64

65
    private ops: ReactiveTaskOps;
66
    private reconciler: ReactiveTaskReconciler;
67

68
    private get isStoppedTester(): () => boolean {
69
        return () => this.changeStream === null;
1,530✔
70
    }
71

72
    constructor(
73
        private globalsCollection: GlobalsCollection,
138✔
74
        private instanceId: string,
138✔
75
        private registry: ReactiveTaskRegistry,
138✔
76
        private callbacks: PlannerCallbacks,
138✔
77
        private internalOptions: { batchSize: number; batchIntervalMs: number; minBatchIntervalMs: number; getNextCleanupDate: (date?: Date) => Date },
138✔
78
        private onInfo: OnInfo = defaultOnInfo,
138✔
79
        private onError: OnError = defaultOnError,
138✔
80
    ) {
81
        this.ops = new ReactiveTaskOps(registry, callbacks.onTaskPlanned);
138✔
82
        this.reconciler = new ReactiveTaskReconciler(instanceId, globalsCollection, registry, this.ops, onInfo, internalOptions);
138✔
83
    }
84

85
    public async start(): Promise<void> {
86
        this.onInfo({
123✔
87
            message: `Reactive task planner started.`,
88
            code: CODE_REACTIVE_TASK_PLANNER_STARTED,
89
        });
90

91
        // 1. Check for schema/logic evolution (Filter changes, Version upgrades)
92
        await this.checkEvolutionStrategies();
123✔
93

94
        // 2. Start stream first to ensure we don't miss events during reconciliation
95
        // We capture the time AFTER starting to ensure overlap with the stream.
96
        // This prevents a gap where events occurring between "now" and "stream start" would be missed.
97
        await this.startChangeStream();
123✔
98

99
        // Pass the current stream instance to reconcile. If stream fails/restarts, instance changes and reconcile aborts.
100
        if (this.changeStream) {
123!
101
            await this.reconciler.reconcile(this.isStoppedTester);
123✔
102
        }
103
    }
104

105
    public async stop(): Promise<void> {
106
        await this.stopChangeStream();
133✔
107
        this.onInfo({
133✔
108
            message: `Reactive task planner stopped.`,
109
            code: CODE_REACTIVE_TASK_PLANNER_STOPPED,
110
        });
111
    }
112

113
    public async saveResumeToken(token: ResumeToken, lastClusterTime?: Date): Promise<void> {
114
        const setFields: Document = { 'streamState.resumeToken': token };
404✔
115
        if (lastClusterTime) {
404✔
116
            setFields['streamState.lastClusterTime'] = lastClusterTime;
392✔
117
        }
118

119
        await this.globalsCollection.updateOne({ _id: this.metaDocId }, { $set: setFields }, { upsert: true });
404✔
120
    }
121

122
    public get isEmpty(): boolean {
123
        return this.taskBatch.size === 0 && !this.isFlushing;
1,917✔
124
    }
125

126
    public async onHeartbeat(): Promise<void> {
127
        // Save resume token if stream is running and idle
128
        if (this.changeStream && this.isEmpty) {
132✔
129
            await this.saveResumeToken(this.changeStream.resumeToken, this.lastClusterTime ? new Date(this.lastClusterTime * 1000) : undefined);
120✔
130
        }
131

132
        // Periodic cleanup of orphaned tasks
133
        await this.reconciler.performPeriodicCleanup(this.isStoppedTester);
132✔
134
    }
135

136
    private isStopping = false;
138✔
137

138
    private async startChangeStream(): Promise<void> {
139
        if (this.changeStream) {
124✔
140
            await this.stopChangeStream();
1✔
141
        }
142

143
        try {
124✔
144
            const streamOptions: Document = {
124✔
145
                resumeAfter: await this.getChangeStreamResumeToken(),
146
                fullDocument: 'updateLookup',
147
            };
148

149
            if (!streamOptions.resumeAfter) {
124✔
150
                // get current server time to guarantee we get any operation from the current time
151
                // even if the watch operation took a time, since it is async and we don't have
152
                // any guaranty at what point we start listening
153
                const serverStatus = await getMongoClient().db().command({ hello: 1 });
114✔
154
                if (serverStatus && serverStatus.operationTime) {
114!
155
                    this.onInfo({
114✔
156
                        message: `No token found. Starting from operationTime: ${serverStatus.operationTime}`,
157
                        code: CODE_REACTIVE_TASK_PLANNER_STARTED,
158
                        operationTime: serverStatus.operationTime,
159
                    });
160
                    streamOptions.startAtOperationTime = serverStatus.operationTime;
114✔
161
                } else {
162
                    // Fallback pro standalone instance bez oplogu (méně časté v produkci)
163
                    this.onInfo({
×
164
                        message: `Could not fetch operationTime. Starting standard watch.`,
165
                        code: CODE_REACTIVE_TASK_PLANNER_STARTED,
166
                    });
167
                }
168
            }
169

170
            const pipeline = this.getChangeStreamPipeline();
124✔
171
            debug(`[Scheduler ${this.instanceId}] Change Stream Pipeline: `, JSON.stringify(pipeline, null, 2));
124✔
172

173
            // Determine which database to watch
174
            // We assume all monitored collections are in the same database for now.
175
            const tasks = this.registry.getAllTasks();
124✔
176
            let dbToWatch = getMongoClient().db(); // Default
124✔
177
            if (tasks.length > 0) {
124!
178
                const dbName = tasks[0].sourceCollection.dbName;
124✔
179
                dbToWatch = getMongoClient().db(dbName);
124✔
180
                debug(`[ReactiveTaskPlanner] Watching database: ${dbName}`);
124✔
181
            }
182

183
            const stream = dbToWatch.watch(pipeline, streamOptions);
124✔
184
            this.changeStream = stream;
124✔
185

186
            stream.on('change', (change: FilteredChangeStreamDocument) => {
124✔
187
                this.enqueueTaskChange(change);
1,601✔
188
            });
189
            stream.on('resumeTokenChanged', () => {
124✔
190
                if (this.isEmpty) {
1,795✔
191
                    this.lastClusterTime = Date.now() / 1000;
321✔
192
                }
193
            });
194
            stream.on('error', (error) => this.handleStreamError(error as MongoError));
124✔
195
            stream.on('close', () => {
124✔
196
                this.onInfo({
124✔
197
                    message: `Change Stream closed.`,
198
                    code: CODE_REACTIVE_TASK_PLANNER_STOPPED,
199
                });
200
                if (!this.isStopping) {
124!
201
                    this.callbacks.onStreamError();
×
202
                }
203
            });
204
        } catch (error) {
205
            this.onInfo({
×
206
                message: `Failed to start Change Stream: ${(error as Error).message}`,
207
                code: CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
208
                error: (error as Error).message,
209
            });
210
            this.callbacks.onStreamError();
×
211
        }
212
    }
213

214
    private async stopChangeStream(): Promise<void> {
215
        if (this.changeStream) {
134✔
216
            this.isStopping = true;
124✔
217
            debug(`[Scheduler ${this.instanceId}] Stopping Change Stream...`);
124✔
218
            await this.changeStream.close();
124✔
219
            this.changeStream = null;
124✔
220
            this.isStopping = false;
124✔
221
        }
222
        await this.flushTaskBatch();
134✔
223
    }
224

225
    private getChangeStreamPipeline(): Document[] {
226
        const collectionFilters = this.registry.getAllTasks().reduce((acc, taskDef) => {
125✔
227
            const collectionName = taskDef.sourceCollection.collectionName;
131✔
228
            if (!acc.has(collectionName)) {
131✔
229
                acc.set(collectionName, { 'ns.coll': collectionName, $or: [] });
129✔
230
            }
231
            acc.get(collectionName)!.$or.push(prefixFilterKeys({ $expr: taskDef.filter || {} }, 'fullDocument'));
131✔
232
            return acc;
131✔
233
        }, new Map<string, Document>());
234

235
        const pipeline = [
125✔
236
            {
237
                $match: {
238
                    operationType: { $in: ['insert', 'update', 'replace', 'delete'] },
239
                    $or: Array.from(collectionFilters.values()),
240
                },
241
            },
242
            {
243
                $project: {
244
                    _id: 1,
245
                    operationType: 1,
246
                    ns: 1,
247
                    documentKey: 1,
248
                    clusterTime: 1,
249
                },
250
            },
251
        ];
252
        return pipeline;
125✔
253
    }
254

255
    private async getChangeStreamResumeToken(): Promise<ResumeToken | undefined> {
256
        const state = (await this.globalsCollection.findOne({
124✔
257
            _id: this.metaDocId,
258
        })) as MetaDocument | null;
259
        debug(`[DEBUG] getChangeStreamResumeToken loaded state (${this.metaDocId}): `, JSON.stringify(state, null, 2));
124✔
260

261
        const token = state?.streamState?.resumeToken;
124!
262
        debug(`[DEBUG] Extracted token: `, token);
124✔
263
        return token ?? undefined;
124✔
264
    }
265

266
    private async enqueueTaskChange(change: FilteredChangeStreamDocument): Promise<void> {
267
        debug(`[Scheduler ${this.instanceId}] Change detected: `, change._id);
2,610✔
268

269
        if (change.clusterTime) {
2,610!
270
            // clusterTime is a BSON Timestamp.
271
            // .getHighBits() returns the seconds since epoch.
272
            this.lastClusterTime = change.clusterTime.getHighBits();
2,610✔
273
        }
274

275
        const docId = EJSON.stringify(change.documentKey._id, { relaxed: false });
2,610✔
276
        this.taskBatch.set(docId, change);
2,610✔
277
        this.taskBatchLastResumeToken = change._id;
2,610✔
278

279
        const now = Date.now();
2,610✔
280

281
        // Immediate flush if batch size reached
282
        if (this.taskBatch.size >= this.internalOptions.batchSize) {
2,610✔
283
            await this.flushTaskBatch();
106✔
284
            return;
106✔
285
        }
286

287
        // Sliding Window with Max Wait Logic
288
        if (!this.batchFirstEventTime) {
2,504✔
289
            // First event of the batch
290
            this.batchFirstEventTime = now;
134✔
291
            this.batchFlushTimer = setTimeout(() => this.flushTaskBatch(), this.internalOptions.minBatchIntervalMs);
134✔
292
        } else {
293
            // Subsequent event
294
            const elapsedSinceFirst = now - this.batchFirstEventTime;
2,370✔
295

296
            if (elapsedSinceFirst >= this.internalOptions.batchIntervalMs) {
2,370✔
297
                // Max wait reached
298
                await this.flushTaskBatch();
51✔
299
            } else {
300
                // Reset sliding window timer
301
                if (this.batchFlushTimer) {
2,319✔
302
                    clearTimeout(this.batchFlushTimer);
2,214✔
303
                }
304
                this.batchFlushTimer = setTimeout(() => this.flushTaskBatch(), this.internalOptions.minBatchIntervalMs);
2,319✔
305
            }
306
        }
307
    }
308

309
    private async flushTaskBatch(): Promise<void> {
310
        if (this.batchFlushTimer) {
416✔
311
            clearTimeout(this.batchFlushTimer);
234✔
312
            this.batchFlushTimer = null;
234✔
313
        }
314

315
        if (this.taskBatch.size === 0) {
416✔
316
            return;
132✔
317
        }
318

319
        const events = Array.from(this.taskBatch.values());
284✔
320
        this.taskBatch.clear();
284✔
321

322
        const lastToken = this.taskBatchLastResumeToken;
284✔
323
        const lastClusterTime = this.lastClusterTime ? new Date(this.lastClusterTime * 1000) : undefined; // Capture time associated with this batch (approx)
284!
324
        this.isFlushing = true;
284✔
325

326
        try {
284✔
327
            const { idsByCollection, deletedIdsByTask } = this.groupEventsByCollection(events);
284✔
328

329
            await this.processDeletions(deletedIdsByTask);
284✔
330
            await this.executeUpsertOperations(idsByCollection);
284✔
331

332
            if (lastToken) {
284!
333
                await this.saveResumeToken(lastToken, lastClusterTime);
284✔
334
            }
335
        } catch (error) {
336
            this.onError(error as Error);
×
337
            // We lost the batch, but we can't easily retry without complicating logic.
338
            // The stream continues.
339
        } finally {
340
            this.isFlushing = false;
284✔
341
            this.batchFirstEventTime = null;
284✔
342
        }
343
    }
344

345
    private groupEventsByCollection(events: FilteredChangeStreamDocument[]) {
346
        const idsByCollection = new Map<string, Set<unknown>>();
284✔
347
        // Map<TaskName, Set<SourceId>>
348
        const deletedIdsByTask = new Map<string, Set<unknown>>();
284✔
349

350
        for (const event of events) {
284✔
351
            if (!event.ns || !event.ns.coll) continue;
1,599!
352
            const collectionName = event.ns.coll;
1,599✔
353

354
            if (event.operationType === 'delete') {
1,599✔
355
                const docId = event.documentKey._id;
30✔
356
                const entry = this.registry.getEntry(collectionName);
30✔
357

358
                if (entry) {
30!
359
                    for (const taskDef of entry.tasks.values()) {
30✔
360
                        let docIds = deletedIdsByTask.get(taskDef.task);
30✔
361
                        if (!docIds) {
30✔
362
                            docIds = new Set();
12✔
363
                            deletedIdsByTask.set(taskDef.task, docIds);
12✔
364
                        }
365
                        docIds.add(docId);
30✔
366
                    }
367
                }
368
            } else {
369
                // insert, update, replace
370
                if (!idsByCollection.has(collectionName)) {
1,569✔
371
                    idsByCollection.set(collectionName, new Set());
275✔
372
                }
373
                idsByCollection.get(collectionName)!.add(event.documentKey._id);
1,569✔
374
            }
375
        }
376

377
        return { idsByCollection, deletedIdsByTask };
284✔
378
    }
379

380
    private async processDeletions(deletedIdsByTask: Map<string, Set<unknown>>): Promise<void> {
381
        if (deletedIdsByTask.size > 0) {
284✔
382
            await Promise.all(
12✔
383
                Array.from(deletedIdsByTask.entries()).map(async ([taskName, ids]) => {
384
                    if (ids.size === 0) return;
12!
385

386
                    const taskDef = this.registry.getTask(taskName);
12✔
387

388
                    if (taskDef) {
12!
389
                        // We use deleteOrphanedTasks but limit it to the source IDs we just saw deleted.
390
                        // This reuses the EXACT same logic (including keepFor checks) as the background cleaner.
391
                        await taskDef.repository.deleteOrphanedTasks(
12✔
392
                            taskName,
393
                            taskDef.sourceCollection.collectionName,
394
                            taskDef.filter || {},
24✔
395
                            taskDef.cleanupPolicyParsed,
396
                            () => false, // shouldStop: immediate execution, no need to stop
1✔
397
                            Array.from(ids),
398
                        );
399
                    }
400
                }),
401
            );
402
        }
403
    }
404

405
    private async executeUpsertOperations(idsByCollection: Map<string, Set<unknown>>): Promise<void> {
406
        if (idsByCollection.size > 0) {
284✔
407
            await Promise.all(
275✔
408
                Array.from(idsByCollection.entries()).map(async ([collectionName, ids]) => {
409
                    if (ids.size === 0) return;
275!
410
                    try {
275✔
411
                        await this.ops.executePlanningPipeline(collectionName, Array.from(ids));
275✔
412
                    } catch (error) {
413
                        this.onError(error as Error);
×
414
                    }
415
                }),
416
            );
417
        }
418
    }
419

420
    private async handleStreamError(error: MongoError): Promise<void> {
421
        if (error.code === 280) {
2✔
422
            this.onError(new Error(`Critical error: Oplog history lost(ChangeStreamHistoryLost).Resetting Resume Token.Original error: ${error.message} `));
1✔
423
            await this.globalsCollection.updateOne({ _id: this.metaDocId }, { $unset: { 'streamState.resumeToken': '', reconciliation: '' } });
1✔
424

425
            this.onInfo({
1✔
426
                message: `Oplog lost, triggering reconciliation...`,
427
                code: CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
428
            });
429

430
            // Start stream first to capture new events
431
            await this.startChangeStream();
1✔
432
            if (this.changeStream) {
1!
433
                await this.reconciler.reconcile(this.isStoppedTester);
1✔
434
            }
435
        } else {
436
            this.onInfo({
1✔
437
                message: `Change Stream error: ${error.message} `,
438
                code: CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
439
                error: error.message,
440
            });
441
            this.callbacks.onStreamError();
1✔
442
        }
443
    }
444

445
    private async checkEvolutionStrategies(): Promise<void> {
446
        const metaDoc = (await this.globalsCollection.findOne({ _id: this.metaDocId })) as MetaDocument | null;
123✔
447
        const storedTasks = metaDoc?.tasks || {};
123✔
448
        const update: Document = {};
123✔
449
        const tasksToReconcile: string[] = [];
123✔
450
        let needsUpdate = false;
123✔
451

452
        const allTasks = this.registry.getAllTasks();
123✔
453

454
        for (const taskDef of allTasks) {
123✔
455
            const taskName = taskDef.task;
129✔
456
            const defaultEvolution: EvolutionConfig = {
129✔
457
                handlerVersion: 1,
458
                onHandlerVersionChange: 'none',
459
                reconcileOnTriggerChange: true,
460
            };
461
            const evolution = { ...defaultEvolution, ...(taskDef.evolution || {}) };
129✔
462

463
            const storedState = storedTasks[taskName];
129✔
464

465
            const triggerChanged = this.checkTriggerEvolution(taskName, taskDef, evolution, storedState, update, tasksToReconcile);
129✔
466
            if (triggerChanged) needsUpdate = true;
129✔
467

468
            const logicChanged = await this.checkLogicEvolution(taskName, taskDef, evolution, storedState, update);
129✔
469
            if (logicChanged) needsUpdate = true;
129✔
470
        }
471

472
        if (needsUpdate) {
123✔
473
            debug(`[DEBUG] Updating meta doc with: `, JSON.stringify(update, null, 2));
120✔
474
            await this.globalsCollection.updateOne({ _id: this.metaDocId }, update, { upsert: true });
120✔
475
        } else {
476
            debug(`[DEBUG] No updates needed for meta doc.`);
3✔
477
        }
478

479
        if (tasksToReconcile.length > 0) {
123✔
480
            await this.reconciler.markAsUnreconciled(tasksToReconcile);
117✔
481
        }
482
    }
483

484
    private checkTriggerEvolution(
485
        taskName: string,
486
        taskDef: ReactiveTaskInternal<Document>,
487
        evolution: EvolutionConfig,
488
        storedState: { triggerConfig?: { filter: Document; watchProjection: Document }; handlerVersion?: number },
489
        update: Document,
490
        tasksToReconcile: string[],
491
    ): boolean {
492
        const currentTriggerConfig = {
129✔
493
            filter: taskDef.filter || {},
221✔
494
            watchProjection: taskDef.watchProjection || {},
244✔
495
        };
496
        const currentTriggerSig = stringify(currentTriggerConfig);
129✔
497
        const storedTriggerSig = storedState?.triggerConfig ? stringify(storedState.triggerConfig) : null;
129✔
498

499
        if (currentTriggerSig !== storedTriggerSig) {
129✔
500
            const shouldReconcile = evolution.reconcileOnTriggerChange !== false;
123✔
501
            const msg = storedTriggerSig === null ? `Initial trigger config captured for [${taskName}].` : `Trigger config changed for [${taskName}].`;
123✔
502

503
            if (shouldReconcile) {
123✔
504
                debug(`${msg} Queueing reconciliation.`);
122✔
505
                tasksToReconcile.push(taskName);
122✔
506
            } else {
507
                debug(`[mongodash] ${msg} Reconciliation disabled.`);
1✔
508
            }
509

510
            if (!update.$set) update.$set = {};
123✔
511
            update.$set[`tasks.${taskName}.triggerConfig`] = currentTriggerConfig;
123✔
512
            return true;
123✔
513
        }
514
        return false;
6✔
515
    }
516

517
    private async checkLogicEvolution(
518
        taskName: string,
519
        taskDef: ReactiveTaskInternal<Document>,
520
        evolution: EvolutionConfig,
521
        storedState: { triggerConfig?: unknown; handlerVersion?: number },
522
        update: Document,
523
    ): Promise<boolean> {
524
        const currentVersion = evolution.handlerVersion ?? 1;
129!
525
        const storedVersion = storedState?.handlerVersion ?? (storedState ? 0 : 1);
129!
526

527
        if (currentVersion > storedVersion) {
129✔
528
            const policy = evolution.onHandlerVersionChange || 'none';
2!
529
            debug(`Handler upgraded for [${taskName}](v${storedVersion} -> v${currentVersion}).Policy: ${policy} `);
2✔
530

531
            const entry = this.registry.getEntry(taskDef.sourceCollection.collectionName);
2✔
532
            if (entry) {
2!
533
                if (policy === 'reprocess_failed') {
2✔
534
                    await entry.repository.resetTasksForUpgrade(taskName, 'failed');
1✔
535
                } else if (policy === 'reprocess_all') {
1!
536
                    await entry.repository.resetTasksForUpgrade(taskName, 'all');
1✔
537
                }
538
            }
539

540
            if (!update.$set) update.$set = {};
2!
541
            update.$set[`tasks.${taskName}.handlerVersion`] = currentVersion;
2✔
542
            return true;
2✔
543
        } else if (currentVersion < storedVersion) {
127!
544
            debug(
×
545
                `[mongodash] ReactiveTask[${taskName}]: Current handlerVersion(${currentVersion}) is LOWER than stored version(${storedVersion}).Rollback detected ? `,
546
            );
547
        } else if (!storedState && currentVersion === 1) {
127✔
548
            // Safe Adoption
549
            if (!update.$set) update.$set = {};
120!
550
            update.$set[`tasks.${taskName}.handlerVersion`] = currentVersion;
120✔
551
            return true;
120✔
552
        }
553
        return false;
7✔
554
    }
555
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc