VaclavObornik / mongodash · build 20986486768 · push · github · VaclavObornik · 2.4.6
14 Jan 2026 07:48AM UTC · coverage: 91.769% (-0.1%) from 91.874%

1265 of 1470 branches covered (86.05%) · branch coverage included in aggregate %
2024 of 2114 relevant lines covered (95.74%) · 371.07 hits per line

Source File: /src/reactiveTasks/ReactiveTaskPlanner.ts (91.91%)

import { EJSON } from 'bson';
import * as _debug from 'debug';
import {
    ChangeStream,
    ChangeStreamDeleteDocument,
    ChangeStreamInsertDocument,
    ChangeStreamReplaceDocument,
    ChangeStreamUpdateDocument,
    Document,
    MongoError,
    ResumeToken,
} from 'mongodb';
import { getMongoClient } from '../getMongoClient';
import { GlobalsCollection } from '../globalsCollection';
import { defaultOnError, OnError } from '../OnError';
import { defaultOnInfo, OnInfo } from '../OnInfo';
import { prefixFilterKeys } from '../prefixFilterKeys';
import { ReactiveTaskOps } from './ReactiveTaskOps';
import { ReactiveTaskReconciler } from './ReactiveTaskReconciler';
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
import {
    CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
    CODE_REACTIVE_TASK_PLANNER_STARTED,
    CODE_REACTIVE_TASK_PLANNER_STOPPED,
    CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
    EvolutionConfig,
    MetaDocument,
    ReactiveTaskInternal,
    REACTIVE_TASK_META_DOC_ID,
} from './ReactiveTaskTypes';
import stringify = require('fast-json-stable-stringify');

const debug = _debug('mongodash:reactiveTasks:planner');

type FilteredChangeStreamDocument = Pick<
    ChangeStreamInsertDocument | ChangeStreamUpdateDocument | ChangeStreamReplaceDocument | ChangeStreamDeleteDocument,
    '_id' | 'operationType' | 'ns' | 'documentKey' | 'clusterTime'
>;

export interface PlannerCallbacks {
    onStreamError: () => void;
    onTaskPlanned: (tasksCollectionName: string, debounceMs: number) => void;
}

/**
 * Responsible for listening to MongoDB Change Stream events and planning tasks.
 *
 * Responsibilities:
 * - Manages the lifecycle of the Change Stream (start, stop, error handling).
 * - Batches Change Stream events to reduce database load.
 * - Coordinates with `ReactiveTaskOps` to generate and execute task operations.
 * - Coordinates with `ReactiveTaskReconciler` to handle reconciliation when the stream is interrupted or history is lost.
 * - Handles critical errors like `ChangeStreamHistoryLost` (code 280) by triggering reconciliation.
 */
export class ReactiveTaskPlanner {
    private changeStream: ChangeStream | null = null;
    private taskBatch = new Map<string, FilteredChangeStreamDocument>();
    private taskBatchLastResumeToken: ResumeToken | null = null;
    private batchFlushTimer: NodeJS.Timeout | null = null;
    private batchFirstEventTime: number | null = null;
    private isFlushing = false;
    private metaDocId = REACTIVE_TASK_META_DOC_ID;
    private lastClusterTime: number | null = null;

    private ops: ReactiveTaskOps;
    private reconciler: ReactiveTaskReconciler;

    constructor(
        private globalsCollection: GlobalsCollection,
        private instanceId: string,
        private registry: ReactiveTaskRegistry,
        private callbacks: PlannerCallbacks,
        private internalOptions: { batchSize: number; batchIntervalMs: number; minBatchIntervalMs: number; getNextCleanupDate: (date?: Date) => Date },
        private onInfo: OnInfo = defaultOnInfo,
        private onError: OnError = defaultOnError,
    ) {
        this.ops = new ReactiveTaskOps(registry, callbacks.onTaskPlanned);
        this.reconciler = new ReactiveTaskReconciler(instanceId, globalsCollection, registry, this.ops, onInfo, internalOptions);
    }

    public async start(): Promise<void> {
        this.onInfo({
            message: `Reactive task planner started.`,
            code: CODE_REACTIVE_TASK_PLANNER_STARTED,
        });

        // 1. Check for schema/logic evolution (Filter changes, Version upgrades)
        await this.checkEvolutionStrategies();

        // 2. Start stream first to ensure we don't miss events during reconciliation
        // We capture the time AFTER starting to ensure overlap with the stream.
        // This prevents a gap where events occurring between "now" and "stream start" would be missed.
        await this.startChangeStream();

        // Pass the current stream instance to reconcile. If stream fails/restarts, instance changes and reconcile aborts.
        if (this.changeStream) {
            await this.reconciler.reconcile(this.getIsStoppedTester());
        }
    }

    public async stop(): Promise<void> {
        await this.stopChangeStream();
        this.onInfo({
            message: `Reactive task planner stopped.`,
            code: CODE_REACTIVE_TASK_PLANNER_STOPPED,
        });
    }
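
    // Captures the current Change Stream instance: the returned tester reports "stopped" as soon as
    // the stream is closed or replaced, so long-running work started for the old stream (reconciliation,
    // periodic cleanup) aborts instead of racing a newer stream.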
    private getIsStoppedTester(): () => boolean {
        const changeStreamInstance = this.changeStream;
        return () => this.changeStream !== changeStreamInstance || this.changeStream === null;
    }
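
    // Persists the resume token (and optionally the last observed cluster time) in the globals meta
    // document, so a restarted planner can resume the Change Stream where the previous one left off.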
    public async saveResumeToken(token: ResumeToken, lastClusterTime?: Date): Promise<void> {
        const setFields: Document = { 'streamState.resumeToken': token };
        if (lastClusterTime) {
            setFields['streamState.lastClusterTime'] = lastClusterTime;
        }

        await this.globalsCollection.updateOne({ _id: this.metaDocId }, { $set: setFields }, { upsert: true });
    }

    public get isEmpty(): boolean {
        return this.taskBatch.size === 0 && !this.isFlushing;
    }

    public async onHeartbeat(): Promise<void> {
        // Save resume token if stream is running and idle
        if (this.changeStream && this.isEmpty) {
            await this.saveResumeToken(this.changeStream.resumeToken, this.lastClusterTime ? new Date(this.lastClusterTime * 1000) : undefined);
        }

        // Periodic cleanup of orphaned tasks
        await this.reconciler.performPeriodicCleanup(this.getIsStoppedTester());
    }

    private isStopping = false;

    private async startChangeStream(): Promise<void> {
        if (this.changeStream) {
            await this.stopChangeStream();
        }

        try {
            const streamOptions: Document = {
                resumeAfter: await this.getChangeStreamResumeToken(),
                fullDocument: 'updateLookup',
            };

            if (!streamOptions.resumeAfter) {
                // Get the current server time to guarantee we receive every operation from this moment,
                // even if the watch() call takes some time; it is async and we have no guarantee
                // of the exact point at which we actually start listening.
                const serverStatus = await getMongoClient().db().command({ hello: 1 });
                if (serverStatus && serverStatus.operationTime) {
                    this.onInfo({
                        message: `No token found. Starting from operationTime: ${serverStatus.operationTime}`,
                        code: CODE_REACTIVE_TASK_PLANNER_STARTED,
                        operationTime: serverStatus.operationTime,
                    });
                    streamOptions.startAtOperationTime = serverStatus.operationTime;
                } else {
                    // Fallback for a standalone instance without an oplog (less common in production)
                    this.onInfo({
                        message: `Could not fetch operationTime. Starting standard watch.`,
                        code: CODE_REACTIVE_TASK_PLANNER_STARTED,
                    });
                }
            }

            const pipeline = this.getChangeStreamPipeline();
            if (debug.enabled) {
                debug(`[Scheduler ${this.instanceId}] Change Stream Pipeline: `, JSON.stringify(pipeline, null, 2));
            }

            // Determine which database to watch.
            // We assume all monitored collections are in the same database for now.
            const tasks = this.registry.getAllTasks();
            let dbToWatch = getMongoClient().db(); // Default
            if (tasks.length > 0) {
                // Log all tasks and their source collections for debugging
                debug(`[ReactiveTaskPlanner] Registered tasks: ${tasks.map((t) => t.task + '(' + t.sourceCollection.collectionName + ')').join(', ')}`);

                const dbName = tasks[0].sourceCollection.dbName;
                dbToWatch = getMongoClient().db(dbName);
            }

            debug(`[ReactiveTaskPlanner] Watching database: ${dbToWatch.databaseName}`);

            const stream = dbToWatch.watch(pipeline, streamOptions);
            this.changeStream = stream;

            stream.on('change', (change: FilteredChangeStreamDocument) => {
                this.enqueueTaskChange(change);
            });
            stream.on('resumeTokenChanged', () => {
                if (this.isEmpty) {
                    this.lastClusterTime = Date.now() / 1000;
                }
            });
            stream.on('error', (error) => this.handleStreamError(error as MongoError));
            stream.on('close', () => {
                this.onInfo({
                    message: `Change Stream closed.`,
                    code: CODE_REACTIVE_TASK_PLANNER_STOPPED,
                });
                if (!this.isStopping) {
                    this.callbacks.onStreamError();
                }
            });
        } catch (error) {
            this.onInfo({
                message: `Failed to start Change Stream: ${(error as Error).message}`,
                code: CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
                error: (error as Error).message,
            });
            this.callbacks.onStreamError();
        }
    }

    private async stopChangeStream(): Promise<void> {
        if (this.changeStream) {
            this.isStopping = true;
            debug(`[Scheduler ${this.instanceId}] Stopping Change Stream...`);
            await this.changeStream.close();
            this.changeStream = null;
            this.isStopping = false;
        }
        await this.flushTaskBatch();
    }
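
    // Builds a single database-level aggregation pipeline: a $match that ORs one entry per watched
    // collection (each entry combining that collection's task filters, prefixed onto `fullDocument`),
    // followed by a $project keeping only the routing fields (_id, operationType, ns, documentKey, clusterTime).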
    private getChangeStreamPipeline(): Document[] {
        const collectionFilters = this.registry.getAllTasks().reduce((acc, taskDef) => {
            const collectionName = taskDef.sourceCollection.collectionName;
            if (!acc.has(collectionName)) {
                acc.set(collectionName, { 'ns.coll': collectionName, $or: [] });
            }
            acc.get(collectionName)!.$or.push(prefixFilterKeys({ $expr: taskDef.filter || {} }, 'fullDocument'));
            return acc;
        }, new Map<string, Document>());

        const pipeline = [
            {
                $match: {
                    operationType: { $in: ['insert', 'update', 'replace', 'delete'] },
                    $or: Array.from(collectionFilters.values()),
                },
            },
            {
                $project: {
                    _id: 1,
                    operationType: 1,
                    ns: 1,
                    documentKey: 1,
                    clusterTime: 1,
                },
            },
        ];
        return pipeline;
    }

    private async getChangeStreamResumeToken(): Promise<ResumeToken | undefined> {
        const state = (await this.globalsCollection.findOne({
            _id: this.metaDocId,
        })) as MetaDocument | null;
        debug(`[DEBUG] getChangeStreamResumeToken loaded state (${this.metaDocId}): `, JSON.stringify(state, null, 2));

        const token = state?.streamState?.resumeToken;
        debug(`[DEBUG] Extracted token: `, token);
        return token ?? undefined;
    }
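
    // Events are keyed by the EJSON-serialized document _id, so repeated changes to the same document
    // within one batch collapse into a single entry. The batch is flushed when it reaches batchSize,
    // when batchIntervalMs has elapsed since the first event (max wait), or when the sliding
    // minBatchIntervalMs timer fires without new events arriving.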
    private async enqueueTaskChange(change: FilteredChangeStreamDocument): Promise<void> {
        if (debug.enabled) {
            debug(`[Scheduler ${this.instanceId}] Change detected: `, JSON.stringify(change, null, 2));
        }

        if (change.clusterTime) {
            // clusterTime is a BSON Timestamp.
            // .getHighBits() returns the seconds since epoch.
            this.lastClusterTime = change.clusterTime.getHighBits();
        }

        const docId = EJSON.stringify(change.documentKey._id, { relaxed: false });
        this.taskBatch.set(docId, change);
        this.taskBatchLastResumeToken = change._id;

        const now = Date.now();

        // Immediate flush if batch size reached
        if (this.taskBatch.size >= this.internalOptions.batchSize) {
            await this.flushTaskBatch();
            return;
        }

        // Sliding Window with Max Wait Logic
        if (!this.batchFirstEventTime) {
            // First event of the batch
            this.batchFirstEventTime = now;
            this.batchFlushTimer = setTimeout(() => this.flushTaskBatch(), this.internalOptions.minBatchIntervalMs);
        } else {
            // Subsequent event
            const elapsedSinceFirst = now - this.batchFirstEventTime;

            if (elapsedSinceFirst >= this.internalOptions.batchIntervalMs) {
                // Max wait reached
                await this.flushTaskBatch();
            } else {
                // Reset sliding window timer
                if (this.batchFlushTimer) {
                    clearTimeout(this.batchFlushTimer);
                }
                this.batchFlushTimer = setTimeout(() => this.flushTaskBatch(), this.internalOptions.minBatchIntervalMs);
            }
        }
    }
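
    // The resume token is persisted only after the batch has been processed successfully, so an
    // interrupted flush is never acknowledged as consumed in the stored stream state.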
    private async flushTaskBatch(): Promise<void> {
        if (this.batchFlushTimer) {
            clearTimeout(this.batchFlushTimer);
            this.batchFlushTimer = null;
        }

        if (this.taskBatch.size === 0) {
            return;
        }

        const events = Array.from(this.taskBatch.values());
        this.taskBatch.clear();

        const lastToken = this.taskBatchLastResumeToken;
        const lastClusterTime = this.lastClusterTime ? new Date(this.lastClusterTime * 1000) : undefined; // Capture time associated with this batch (approx)
        this.isFlushing = true;

        try {
            const { idsByCollection, deletedIdsByTask } = this.groupEventsByCollection(events);

            await this.processDeletions(deletedIdsByTask);
            await this.executeUpsertOperations(idsByCollection);

            if (lastToken) {
                await this.saveResumeToken(lastToken, lastClusterTime);
            }
        } catch (error) {
            this.onError(error as Error);
            // We lost the batch, but we can't easily retry without complicating logic.
            // The stream continues.
        } finally {
            this.isFlushing = false;
            this.batchFirstEventTime = null;
        }
    }

    private groupEventsByCollection(events: FilteredChangeStreamDocument[]) {
        const idsByCollection = new Map<string, Set<unknown>>();
        // Map<TaskName, Set<SourceId>>
        const deletedIdsByTask = new Map<string, Set<unknown>>();

        for (const event of events) {
            if (!event.ns || !event.ns.coll) continue;
            const collectionName = event.ns.coll;

            if (event.operationType === 'delete') {
                const docId = event.documentKey._id;
                const entry = this.registry.getEntry(collectionName);

                if (entry) {
                    for (const taskDef of entry.tasks.values()) {
                        let docIds = deletedIdsByTask.get(taskDef.task);
                        if (!docIds) {
                            docIds = new Set();
                            deletedIdsByTask.set(taskDef.task, docIds);
                        }
                        docIds.add(docId);
                    }
                }
            } else {
                // insert, update, replace
                if (!idsByCollection.has(collectionName)) {
                    idsByCollection.set(collectionName, new Set());
                }
                idsByCollection.get(collectionName)!.add(event.documentKey._id);
            }
        }

        return { idsByCollection, deletedIdsByTask };
    }

    private async processDeletions(deletedIdsByTask: Map<string, Set<unknown>>): Promise<void> {
        if (deletedIdsByTask.size > 0) {
            await Promise.all(
                Array.from(deletedIdsByTask.entries()).map(async ([taskName, ids]) => {
                    if (ids.size === 0) return;

                    const taskDef = this.registry.getTask(taskName);

                    if (taskDef) {
                        // We use deleteOrphanedTasks but limit it to the source IDs we just saw deleted.
                        // This reuses the EXACT same logic (including keepFor checks) as the background cleaner.
                        await taskDef.repository.deleteOrphanedTasks(
                            taskName,
                            taskDef.sourceCollection.collectionName,
                            taskDef.filter || {},
                            taskDef.cleanupPolicyParsed,
                            () => false, // shouldStop: immediate execution, no need to stop
                            Array.from(ids),
                        );
                    }
                }),
            );
        }
    }

    private async executeUpsertOperations(idsByCollection: Map<string, Set<unknown>>): Promise<void> {
        if (idsByCollection.size > 0) {
            await Promise.all(
                Array.from(idsByCollection.entries()).map(async ([collectionName, ids]) => {
                    if (ids.size === 0) return;
                    try {
                        await this.ops.executePlanningPipeline(collectionName, Array.from(ids));
                    } catch (error) {
                        this.onError(error as Error);
                    }
                }),
            );
        }
    }
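
    // MongoDB error code 280 (ChangeStreamHistoryLost) means the oplog no longer contains the saved
    // resume point. Recovery drops the stored token and reconciliation state, restarts the stream
    // from "now", and runs reconciliation to catch up on anything that may have been missed.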
    private async handleStreamError(error: MongoError): Promise<void> {
        if (error.code === 280) {
            this.onError(new Error(`Critical error: Oplog history lost (ChangeStreamHistoryLost). Resetting resume token. Original error: ${error.message}`));
            await this.globalsCollection.updateOne({ _id: this.metaDocId }, { $unset: { 'streamState.resumeToken': '', reconciliation: '' } });

            this.onInfo({
                message: `Oplog lost, triggering reconciliation...`,
                code: CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
            });

            // Start stream first to capture new events
            await this.startChangeStream();
            const currentStream = this.changeStream;
            if (currentStream) {
                await this.reconciler.reconcile(() => this.changeStream !== currentStream || this.changeStream === null);
            }
        } else {
            this.onInfo({
                message: `Change Stream error: ${error.message}`,
                code: CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
                error: error.message,
            });
            this.callbacks.onStreamError();
        }
    }

    private async checkEvolutionStrategies(): Promise<void> {
        const metaDoc = (await this.globalsCollection.findOne({ _id: this.metaDocId })) as MetaDocument | null;
        const storedTasks = metaDoc?.tasks || {};
        const update: Document = {};
        const tasksToReconcile: string[] = [];
        let needsUpdate = false;

        const allTasks = this.registry.getAllTasks();

        for (const taskDef of allTasks) {
            const taskName = taskDef.task;
            const defaultEvolution: EvolutionConfig = {
                handlerVersion: 1,
                onHandlerVersionChange: 'none',
                reconcileOnTriggerChange: true,
            };
            const evolution = { ...defaultEvolution, ...(taskDef.evolution || {}) };

            const storedState = storedTasks[taskName];

            const triggerChanged = this.checkTriggerEvolution(taskName, taskDef, evolution, storedState, update, tasksToReconcile);
            if (triggerChanged) needsUpdate = true;

            const logicChanged = await this.checkLogicEvolution(taskName, taskDef, evolution, storedState, update);
            if (logicChanged) needsUpdate = true;
        }

        if (needsUpdate) {
            debug.enabled && debug(`[DEBUG] Updating meta doc with: `, JSON.stringify(update, null, 2));
            await this.globalsCollection.updateOne({ _id: this.metaDocId }, update, { upsert: true });
        } else {
            debug(`[DEBUG] No updates needed for meta doc.`);
        }

        if (tasksToReconcile.length > 0) {
            await this.reconciler.markAsUnreconciled(tasksToReconcile);
        }
    }
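
    // Detects trigger changes by comparing a stable JSON signature of { filter, watchProjection }
    // against the signature stored in the meta document; on any difference (including the first
    // registration) the new config is recorded and, unless disabled, the task is queued for reconciliation.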
    private checkTriggerEvolution(
        taskName: string,
        taskDef: ReactiveTaskInternal<Document>,
        evolution: EvolutionConfig,
        storedState: { triggerConfig?: { filter: Document; watchProjection: Document }; handlerVersion?: number },
        update: Document,
        tasksToReconcile: string[],
    ): boolean {
        const currentTriggerConfig = {
            filter: taskDef.filter || {},
            watchProjection: taskDef.watchProjection || {},
        };
        const currentTriggerSig = stringify(currentTriggerConfig);
        const storedTriggerSig = storedState?.triggerConfig ? stringify(storedState.triggerConfig) : null;

        if (currentTriggerSig !== storedTriggerSig) {
            const shouldReconcile = evolution.reconcileOnTriggerChange !== false;
            const msg = storedTriggerSig === null ? `Initial trigger config captured for [${taskName}].` : `Trigger config changed for [${taskName}].`;

            if (shouldReconcile) {
                debug(`${msg} Queueing reconciliation.`);
                tasksToReconcile.push(taskName);
            } else {
                debug(`[mongodash] ${msg} Reconciliation disabled.`);
            }

            if (!update.$set) update.$set = {};
            update.$set[`tasks.${taskName}.triggerConfig`] = currentTriggerConfig;
            return true;
        }
        return false;
    }
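
    // Compares the task's declared handlerVersion with the version stored in the meta document.
    // A previously seen task with no stored version counts as version 0 (takes the upgrade path),
    // while a task never seen before counts as version 1; if it also declares version 1 it is
    // adopted silently without reprocessing ("Safe Adoption").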
    private async checkLogicEvolution(
        taskName: string,
        taskDef: ReactiveTaskInternal<Document>,
        evolution: EvolutionConfig,
        storedState: { triggerConfig?: unknown; handlerVersion?: number },
        update: Document,
    ): Promise<boolean> {
        const currentVersion = evolution.handlerVersion ?? 1;
        const storedVersion = storedState?.handlerVersion ?? (storedState ? 0 : 1);

        if (currentVersion > storedVersion) {
            const policy = evolution.onHandlerVersionChange || 'none';
            debug(`Handler upgraded for [${taskName}] (v${storedVersion} -> v${currentVersion}). Policy: ${policy}`);

            const entry = this.registry.getEntry(taskDef.sourceCollection.collectionName);
            if (entry) {
                if (policy === 'reprocess_failed') {
                    await entry.repository.resetTasksForUpgrade(taskName, 'failed');
                } else if (policy === 'reprocess_all') {
                    await entry.repository.resetTasksForUpgrade(taskName, 'all');
                }
            }

            if (!update.$set) update.$set = {};
            update.$set[`tasks.${taskName}.handlerVersion`] = currentVersion;
            return true;
        } else if (currentVersion < storedVersion) {
            debug(
                `[mongodash] ReactiveTask [${taskName}]: Current handlerVersion (${currentVersion}) is LOWER than stored version (${storedVersion}). Rollback detected?`,
            );
        } else if (!storedState && currentVersion === 1) {
            // Safe Adoption
            if (!update.$set) update.$set = {};
            update.$set[`tasks.${taskName}.handlerVersion`] = currentVersion;
            return true;
        }
        return false;
    }
}