• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 21110491895

18 Jan 2026 10:49AM UTC coverage: 91.52% (-0.07%) from 91.594%
21110491895

push

github

VaclavObornik
2.5.2

1332 of 1556 branches covered (85.6%)

Branch coverage included in aggregate %.

2154 of 2253 relevant lines covered (95.61%)

364.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.71
/src/reactiveTasks/ReactiveTaskPlanner.ts
1
import { EJSON } from 'bson';
247✔
2
import * as _debug from 'debug';
247✔
3
import {
4
    ChangeStream,
5
    ChangeStreamDeleteDocument,
6
    ChangeStreamInsertDocument,
7
    ChangeStreamReplaceDocument,
8
    ChangeStreamUpdateDocument,
9
    Document,
10
    MongoError,
11
    ResumeToken,
12
} from 'mongodb';
13
import { getMongoClient } from '../getMongoClient';
247✔
14
import { GlobalsCollection } from '../globalsCollection';
15
import { defaultOnError, OnError } from '../OnError';
247✔
16
import { defaultOnInfo, OnInfo } from '../OnInfo';
247✔
17
import { prefixFilterKeys } from '../prefixFilterKeys';
247✔
18
import { ReactiveTaskOps } from './ReactiveTaskOps';
247✔
19
import { ReactiveTaskReconciler } from './ReactiveTaskReconciler';
247✔
20
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
21
import {
247✔
22
    CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
23
    CODE_REACTIVE_TASK_PLANNER_STARTED,
24
    CODE_REACTIVE_TASK_PLANNER_STOPPED,
25
    CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
26
    EvolutionConfig,
27
    MetaDocument,
28
    ReactiveTaskInternal,
29
    REACTIVE_TASK_META_DOC_ID,
30
} from './ReactiveTaskTypes';
31
import stringify = require('fast-json-stable-stringify');
247✔
32

33
const debug = _debug('mongodash:reactiveTasks:planner');
247✔
34

35
/**
 * Change Stream event narrowed to the fields kept by the $project stage in
 * `getChangeStreamPipeline` (_id = resume token, operationType, ns, documentKey,
 * clusterTime). Note `fullDocument` is intentionally NOT part of this shape.
 */
type FilteredChangeStreamDocument = Pick<
    ChangeStreamInsertDocument | ChangeStreamUpdateDocument | ChangeStreamReplaceDocument | ChangeStreamDeleteDocument,
    '_id' | 'operationType' | 'ns' | 'documentKey' | 'clusterTime'
>;
39

40
/** Hooks supplied by the planner's owner to react to planner events. */
export interface PlannerCallbacks {
    /** Invoked when the Change Stream errors out or closes unexpectedly (not on intentional stop). */
    onStreamError: () => void;
    /** Forwarded to `ReactiveTaskOps`; invoked when a task has been planned. */
    onTaskPlanned: (tasksCollectionName: string, debounceMs: number) => void;
}
44

45
/**
46
 * Responsible for listening to MongoDB Change Stream events and planning tasks.
47
 *
48
 * Responsibilities:
49
 * - Manages the lifecycle of the Change Stream (start, stop, error handling).
50
 * - Batches Change Stream events to reduce database load.
51
 * - Coordinates with `ReactiveTaskOps` to generate and execute task operations.
52
 * - Coordinates with `ReactiveTaskReconciler` to handle reconciliation when the stream is interrupted or history is lost.
53
 * - Handles critical errors like `ChangeStreamHistoryLost` (code 280) by triggering reconciliation.
54
 */
55
export class ReactiveTaskPlanner {
247✔
56
    /** Active Change Stream instance, or null while the planner is stopped. */
    private changeStream: ChangeStream | null = null;
    /** Pending events keyed by canonical-EJSON document _id (dedupes repeated changes to one document). */
    private taskBatch = new Map<string, FilteredChangeStreamDocument>();
    /** Resume token of the last event added to the current batch; persisted after a successful flush. */
    private taskBatchLastResumeToken: ResumeToken | null = null;
    /** Sliding-window timer that flushes the batch after `minBatchIntervalMs` of inactivity. */
    private batchFlushTimer: NodeJS.Timeout | null = null;
    /** Timestamp (ms) of the first event in the current batch; enforces the `batchIntervalMs` max wait. */
    private batchFirstEventTime: number | null = null;
    /** True while a flush is in progress, so `isEmpty` stays false during the async flush. */
    private isFlushing = false;
    /** _id of the meta document in the globals collection (stream state + task evolution state). */
    private metaDocId = REACTIVE_TASK_META_DOC_ID;
    /** Seconds since epoch of the last observed cluster time (BSON Timestamp high bits). */
    private lastClusterTime: number | null = null;

    /** Generates and executes task planning operations. */
    private ops: ReactiveTaskOps;
    /** Handles reconciliation and periodic cleanup of orphaned tasks. */
    private reconciler: ReactiveTaskReconciler;
67

68
    /**
     * @param globalsCollection Collection holding the planner meta document (resume token, task states).
     * @param instanceId Identifier of this scheduler instance (used in debug output and by the reconciler).
     * @param registry Registry of all reactive task definitions.
     * @param callbacks Lifecycle hooks (stream errors, planned-task notifications).
     * @param internalOptions Batching and cleanup tuning knobs.
     * @param onInfo Informational event sink; defaults to the library default.
     * @param onError Error sink; defaults to the library default.
     */
    constructor(
        private globalsCollection: GlobalsCollection,
        private instanceId: string,
        private registry: ReactiveTaskRegistry,
        private callbacks: PlannerCallbacks,
        private internalOptions: { batchSize: number; batchIntervalMs: number; minBatchIntervalMs: number; getNextCleanupDate: (date?: Date) => Date },
        private onInfo: OnInfo = defaultOnInfo,
        private onError: OnError = defaultOnError,
    ) {
        // Ops must exist before the reconciler, which receives it.
        this.ops = new ReactiveTaskOps(registry, callbacks.onTaskPlanned);
        this.reconciler = new ReactiveTaskReconciler(instanceId, globalsCollection, registry, this.ops, onInfo, internalOptions);
    }
80

81
    /** Merges the given overrides into the current internal options. */
    public updateOptions(options: Partial<typeof this.internalOptions>): void {
        // Changed batch intervals take effect with the next batch; running timers are not reset.
        this.internalOptions = Object.assign({}, this.internalOptions, options);
    }
85

86
    /** Delegates the forced debounce value (ms, or undefined) to `ReactiveTaskOps`. */
    public setForceDebounce(debounceMs: number | undefined): void {
        this.ops.setForceDebounce(debounceMs);
    }
89

90
    /**
     * Starts the planner: checks task evolution strategies, opens the Change
     * Stream, and then runs reconciliation. The stream is deliberately started
     * BEFORE reconciliation so no events are missed in between.
     */
    public async start(): Promise<void> {
        this.onInfo({
            message: `Reactive task planner started.`,
            code: CODE_REACTIVE_TASK_PLANNER_STARTED,
        });

        // 1. Check for schema/logic evolution (Filter changes, Version upgrades)
        await this.checkEvolutionStrategies();

        // 2. Start stream first to ensure we don't miss events during reconciliation.
        // We capture the time AFTER starting to ensure overlap with the stream.
        // This prevents a gap where events occurring between "now" and "stream start" would be missed.
        await this.startChangeStream();

        // The stopped-tester captures the current stream instance; if the stream
        // fails/restarts, the instance changes and reconcile aborts itself.
        if (this.changeStream) {
            await this.reconciler.reconcile(this.getIsStoppedTester());
        }
    }
109

110
    /** Stops the Change Stream (flushing any pending batch first) and reports the shutdown. */
    public async stop(): Promise<void> {
        await this.stopChangeStream();
        const message = `Reactive task planner stopped.`;
        this.onInfo({ message, code: CODE_REACTIVE_TASK_PLANNER_STOPPED });
    }
117

118
    /**
     * Returns a predicate bound to the CURRENT stream instance. It reports true once
     * the stream has been stopped or replaced, letting long-running work abort early.
     */
    private getIsStoppedTester(): () => boolean {
        const streamAtCreation = this.changeStream;
        return () => {
            const current = this.changeStream;
            return current === null || current !== streamAtCreation;
        };
    }
122

123
    /**
     * Persists the Change Stream resume token — and, when provided, the last observed
     * cluster time — into the meta document, creating it if necessary.
     */
    public async saveResumeToken(token: ResumeToken, lastClusterTime?: Date): Promise<void> {
        const setFields: Document = lastClusterTime
            ? { 'streamState.resumeToken': token, 'streamState.lastClusterTime': lastClusterTime }
            : { 'streamState.resumeToken': token };

        await this.globalsCollection.updateOne({ _id: this.metaDocId }, { $set: setFields }, { upsert: true });
    }
131

132
    /** True when no events are batched and no flush is currently running. */
    public get isEmpty(): boolean {
        if (this.isFlushing) {
            return false;
        }
        return this.taskBatch.size === 0;
    }
135

136
    /**
     * Periodic maintenance hook: persists the resume token while the stream is idle,
     * then triggers the reconciler's orphaned-task cleanup.
     */
    public async onHeartbeat(): Promise<void> {
        const stream = this.changeStream;
        if (stream && this.isEmpty) {
            const clusterDate = this.lastClusterTime ? new Date(this.lastClusterTime * 1000) : undefined;
            await this.saveResumeToken(stream.resumeToken, clusterDate);
        }

        await this.reconciler.performPeriodicCleanup(this.getIsStoppedTester());
    }
145

146
    private isStopping = false;
153✔
147

148
    /**
     * Opens the Change Stream on the watched database and wires up its event handlers.
     * Resumes from a persisted token when one exists; otherwise starts from the
     * server's current operationTime so events occurring while the (async) watch
     * call is being established are not missed. On failure, reports the error and
     * notifies the owner via `onStreamError`.
     */
    private async startChangeStream(): Promise<void> {
        if (this.changeStream) {
            await this.stopChangeStream();
        }

        try {
            const streamOptions: Document = {
                resumeAfter: await this.getChangeStreamResumeToken(),
                fullDocument: 'updateLookup',
            };

            if (!streamOptions.resumeAfter) {
                // Get the current server time to guarantee we catch any operation from the
                // current time even if the watch operation took a while, since it is async
                // and we have no guarantee at what point we start listening.
                const serverStatus = await getMongoClient().db().command({ hello: 1 });
                if (serverStatus && serverStatus.operationTime) {
                    this.onInfo({
                        message: `No token found. Starting from operationTime: ${serverStatus.operationTime}`,
                        code: CODE_REACTIVE_TASK_PLANNER_STARTED,
                        operationTime: serverStatus.operationTime,
                    });
                    streamOptions.startAtOperationTime = serverStatus.operationTime;
                } else {
                    // Fallback for a standalone instance without an oplog (less common in production)
                    this.onInfo({
                        message: `Could not fetch operationTime. Starting standard watch.`,
                        code: CODE_REACTIVE_TASK_PLANNER_STARTED,
                    });
                }
            }

            const pipeline = this.getChangeStreamPipeline();
            if (debug.enabled) {
                debug(`[Scheduler ${this.instanceId}] Change Stream Pipeline: `, JSON.stringify(pipeline, null, 2));
            }

            // Determine which database to watch.
            // We assume all monitored collections are in the same database for now.
            const tasks = this.registry.getAllTasks();
            let dbToWatch = getMongoClient().db(); // Default
            if (tasks.length > 0) {
                // Log all tasks and their source collections for debugging
                debug(`[ReactiveTaskPlanner] Registered tasks: ${tasks.map((t) => t.task + '(' + t.sourceCollection.collectionName + ')').join(', ')}`);

                const dbName = tasks[0].sourceCollection.dbName;
                dbToWatch = getMongoClient().db(dbName);
            }

            debug(`[ReactiveTaskPlanner] Watching database: ${dbToWatch.databaseName}`);

            const stream = dbToWatch.watch(pipeline, streamOptions);
            this.changeStream = stream;

            // NOTE(review): enqueueTaskChange is async but its promise is not awaited here.
            // flushTaskBatch catches its own errors, so rejections look unlikely, but an
            // unexpected throw (e.g. during EJSON serialization) would surface as an
            // unhandled rejection — confirm this is acceptable.
            stream.on('change', (change: FilteredChangeStreamDocument) => {
                this.enqueueTaskChange(change);
            });
            stream.on('resumeTokenChanged', () => {
                // While idle (nothing batched, no flush running), keep lastClusterTime
                // roughly current so heartbeat token saves carry a meaningful timestamp.
                if (this.isEmpty) {
                    this.lastClusterTime = Date.now() / 1000;
                }
            });
            stream.on('error', (error) => this.handleStreamError(error as MongoError));
            stream.on('close', () => {
                this.onInfo({
                    message: `Change Stream closed.`,
                    code: CODE_REACTIVE_TASK_PLANNER_STOPPED,
                });
                // Only an unexpected close triggers the restart callback; intentional stops set isStopping.
                if (!this.isStopping) {
                    this.callbacks.onStreamError();
                }
            });
        } catch (error) {
            this.onInfo({
                message: `Failed to start Change Stream: ${(error as Error).message}`,
                code: CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
                error: (error as Error).message,
            });
            this.callbacks.onStreamError();
        }
    }
229

230
    /**
     * Closes the stream (if open) and flushes any pending batch. `isStopping`
     * is raised around close() so the 'close' handler does not treat the
     * intentional shutdown as a stream failure.
     */
    private async stopChangeStream(): Promise<void> {
        const stream = this.changeStream;
        if (stream) {
            this.isStopping = true;
            debug(`[Scheduler ${this.instanceId}] Stopping Change Stream...`);
            await stream.close();
            this.changeStream = null;
            this.isStopping = false;
        }
        await this.flushTaskBatch();
    }
240

241
    /**
     * Builds the aggregation pipeline for the Change Stream: a $match that keeps
     * insert/update/replace/delete events matching any registered task's filter
     * (per collection), and a $project that strips events down to the fields in
     * `FilteredChangeStreamDocument`.
     */
    private getChangeStreamPipeline(): Document[] {
        // One filter document per watched collection, OR-ing all its tasks' filters.
        const collectionFilters = new Map<string, Document>();
        for (const taskDef of this.registry.getAllTasks()) {
            const collectionName = taskDef.sourceCollection.collectionName;
            let filter = collectionFilters.get(collectionName);
            if (!filter) {
                filter = { 'ns.coll': collectionName, $or: [] };
                collectionFilters.set(collectionName, filter);
            }
            filter.$or.push(prefixFilterKeys({ $expr: taskDef.filter || {} }, 'fullDocument'));
        }

        return [
            {
                $match: {
                    operationType: { $in: ['insert', 'update', 'replace', 'delete'] },
                    $or: [...collectionFilters.values()],
                },
            },
            {
                $project: {
                    _id: 1,
                    operationType: 1,
                    ns: 1,
                    documentKey: 1,
                    clusterTime: 1,
                },
            },
        ];
    }
270

271
    private async getChangeStreamResumeToken(): Promise<ResumeToken | undefined> {
272
        const state = (await this.globalsCollection.findOne({
136✔
273
            _id: this.metaDocId,
274
        })) as MetaDocument | null;
275
        debug(`[DEBUG] getChangeStreamResumeToken loaded state (${this.metaDocId}): `, JSON.stringify(state, null, 2));
136✔
276

277
        const token = state?.streamState?.resumeToken;
136!
278
        debug(`[DEBUG] Extracted token: `, token);
136✔
279
        return token ?? undefined;
136✔
280
    }
281

282
    /**
     * Adds a Change Stream event to the in-memory batch, deduplicating by document _id.
     * Flushes immediately when `batchSize` is reached; otherwise uses a sliding-window
     * timer (`minBatchIntervalMs` of inactivity) capped by a `batchIntervalMs` max total wait.
     */
    private async enqueueTaskChange(change: FilteredChangeStreamDocument): Promise<void> {
        if (debug.enabled) {
            debug(`[Scheduler ${this.instanceId}] Change detected: `, JSON.stringify(change, null, 2));
        }

        if (change.clusterTime) {
            // clusterTime is a BSON Timestamp.
            // .getHighBits() returns the seconds since epoch.
            this.lastClusterTime = change.clusterTime.getHighBits();
        }

        // Canonical (non-relaxed) EJSON keeps distinct BSON _id types distinct as map keys.
        const docId = EJSON.stringify(change.documentKey._id, { relaxed: false });
        this.taskBatch.set(docId, change);
        this.taskBatchLastResumeToken = change._id;

        const now = Date.now();

        // Immediate flush if batch size reached
        if (this.taskBatch.size >= this.internalOptions.batchSize) {
            await this.flushTaskBatch();
            return;
        }

        // Sliding Window with Max Wait Logic
        if (!this.batchFirstEventTime) {
            // First event of the batch: start the inactivity timer.
            this.batchFirstEventTime = now;
            this.batchFlushTimer = setTimeout(() => this.flushTaskBatch(), this.internalOptions.minBatchIntervalMs);
        } else {
            // Subsequent event
            const elapsedSinceFirst = now - this.batchFirstEventTime;

            if (elapsedSinceFirst >= this.internalOptions.batchIntervalMs) {
                // Max wait reached — flush now instead of extending the window again.
                await this.flushTaskBatch();
            } else {
                // Reset sliding window timer
                if (this.batchFlushTimer) {
                    clearTimeout(this.batchFlushTimer);
                }
                this.batchFlushTimer = setTimeout(() => this.flushTaskBatch(), this.internalOptions.minBatchIntervalMs);
            }
        }
    }
326

327
    /**
     * Drains the current batch: groups events per collection, processes deletions,
     * runs the planning pipeline for upserts, and finally persists the resume token
     * so a restart will not replay the already-handled events.
     */
    private async flushTaskBatch(): Promise<void> {
        if (this.batchFlushTimer) {
            clearTimeout(this.batchFlushTimer);
            this.batchFlushTimer = null;
        }

        if (this.taskBatch.size === 0) {
            return;
        }

        // Snapshot and clear synchronously so events arriving during the awaits below
        // start a fresh batch instead of being lost.
        const events = Array.from(this.taskBatch.values());
        this.taskBatch.clear();

        const lastToken = this.taskBatchLastResumeToken;
        const lastClusterTime = this.lastClusterTime ? new Date(this.lastClusterTime * 1000) : undefined; // Capture time associated with this batch (approx)
        this.isFlushing = true;

        try {
            const { idsByCollection, deletedIdsByTask } = this.groupEventsByCollection(events);

            await this.processDeletions(deletedIdsByTask);
            await this.executeUpsertOperations(idsByCollection);

            // Persist the token only after the batch was processed successfully.
            if (lastToken) {
                await this.saveResumeToken(lastToken, lastClusterTime);
            }
        } catch (error) {
            this.onError(error as Error);
            // We lost the batch, but we can't easily retry without complicating logic.
            // The stream continues.
        } finally {
            this.isFlushing = false;
            this.batchFirstEventTime = null;
        }
    }
362

363
    /**
     * Splits a batch of events into two groupings:
     * - idsByCollection: source-document _ids per collection for insert/update/replace events;
     * - deletedIdsByTask: source-document _ids per task name for delete events
     *   (one entry per task registered on the deleted document's collection).
     */
    private groupEventsByCollection(events: FilteredChangeStreamDocument[]) {
        const idsByCollection = new Map<string, Set<unknown>>();
        // Map<TaskName, Set<SourceId>>
        const deletedIdsByTask = new Map<string, Set<unknown>>();

        const getOrCreateSet = (map: Map<string, Set<unknown>>, key: string): Set<unknown> => {
            let set = map.get(key);
            if (!set) {
                set = new Set();
                map.set(key, set);
            }
            return set;
        };

        for (const event of events) {
            const collectionName = event.ns?.coll;
            if (!collectionName) continue;

            if (event.operationType === 'delete') {
                const entry = this.registry.getEntry(collectionName);
                if (!entry) continue;
                // A deleted document may feed several tasks on the same collection.
                for (const taskDef of entry.tasks.values()) {
                    getOrCreateSet(deletedIdsByTask, taskDef.task).add(event.documentKey._id);
                }
            } else {
                // insert, update, replace
                getOrCreateSet(idsByCollection, collectionName).add(event.documentKey._id);
            }
        }

        return { idsByCollection, deletedIdsByTask };
    }
397

398
    /**
     * For each task with deleted source documents, removes the now-orphaned planned
     * tasks. Reuses `deleteOrphanedTasks` (the same logic, including keepFor checks,
     * as the background cleaner), restricted to the source ids just seen deleted.
     */
    private async processDeletions(deletedIdsByTask: Map<string, Set<unknown>>): Promise<void> {
        if (deletedIdsByTask.size === 0) {
            return;
        }

        const deletions: Promise<unknown>[] = [];
        for (const [taskName, ids] of deletedIdsByTask) {
            if (ids.size === 0) continue;

            const taskDef = this.registry.getTask(taskName);
            if (!taskDef) continue;

            deletions.push(
                taskDef.repository.deleteOrphanedTasks(
                    taskName,
                    taskDef.sourceCollection.collectionName,
                    taskDef.filter || {},
                    taskDef.cleanupPolicyParsed,
                    () => false, // shouldStop: immediate execution, no need to stop
                    Array.from(ids),
                ),
            );
        }
        await Promise.all(deletions);
    }
422

423
    /**
     * Runs the planning pipeline for every collection that had insert/update/replace
     * events, in parallel. Per-collection failures are reported via onError without
     * aborting the other collections.
     */
    private async executeUpsertOperations(idsByCollection: Map<string, Set<unknown>>): Promise<void> {
        if (idsByCollection.size === 0) {
            return;
        }

        const operations = [...idsByCollection.entries()].map(([collectionName, ids]) => {
            if (ids.size === 0) {
                return Promise.resolve();
            }
            return this.ops
                .executePlanningPipeline(collectionName, Array.from(ids))
                .catch((error) => this.onError(error as Error));
        });
        await Promise.all(operations);
    }
437

438
    /**
     * Handles Change Stream failures.
     *
     * Error code 280 (ChangeStreamHistoryLost) means the oplog no longer contains our
     * resume point: the stale token and reconciliation progress are discarded, the
     * stream is restarted, and a full reconciliation is triggered. Any other error is
     * reported and delegated to the owner via onStreamError (which can restart us).
     *
     * Fixes: the log/error messages were mangled (missing spaces around punctuation,
     * trailing spaces); the inline stopped-predicate duplicated getIsStoppedTester().
     */
    private async handleStreamError(error: MongoError): Promise<void> {
        if (error.code === 280) {
            this.onError(new Error(`Critical error: Oplog history lost (ChangeStreamHistoryLost). Resetting Resume Token. Original error: ${error.message}`));
            // Drop the stale resume token and any reconciliation progress so we start fresh.
            await this.globalsCollection.updateOne({ _id: this.metaDocId }, { $unset: { 'streamState.resumeToken': '', reconciliation: '' } });

            this.onInfo({
                message: `Oplog lost, triggering reconciliation...`,
                code: CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
            });

            // Start stream first to capture new events; the stopped-tester captures the
            // new stream instance and aborts reconciliation if it changes again.
            await this.startChangeStream();
            if (this.changeStream) {
                await this.reconciler.reconcile(this.getIsStoppedTester());
            }
        } else {
            this.onInfo({
                message: `Change Stream error: ${error.message}`,
                code: CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
                error: error.message,
            });
            this.callbacks.onStreamError();
        }
    }
463

464
    /**
     * Detects task-definition evolution (trigger config changes, handler version bumps)
     * by comparing current definitions against the stored meta document. Persists the
     * updated state and queues reconciliation for tasks whose triggers changed.
     */
    private async checkEvolutionStrategies(): Promise<void> {
        const metaDoc = (await this.globalsCollection.findOne({ _id: this.metaDocId })) as MetaDocument | null;
        const storedTasks = metaDoc?.tasks || {};
        const update: Document = {};
        const tasksToReconcile: string[] = [];
        let needsUpdate = false;

        for (const taskDef of this.registry.getAllTasks()) {
            const taskName = taskDef.task;
            const defaultEvolution: EvolutionConfig = {
                handlerVersion: 1,
                onHandlerVersionChange: 'none',
                reconcileOnTriggerChange: true,
            };
            const evolution = { ...defaultEvolution, ...(taskDef.evolution || {}) };
            const storedState = storedTasks[taskName];

            // Both checks must run regardless of earlier results (they accumulate into `update`).
            const triggerChanged = this.checkTriggerEvolution(taskName, taskDef, evolution, storedState, update, tasksToReconcile);
            const logicChanged = await this.checkLogicEvolution(taskName, taskDef, evolution, storedState, update);
            needsUpdate = needsUpdate || triggerChanged || logicChanged;
        }

        if (needsUpdate) {
            if (debug.enabled) {
                debug(`[DEBUG] Updating meta doc with: `, JSON.stringify(update, null, 2));
            }
            await this.globalsCollection.updateOne({ _id: this.metaDocId }, update, { upsert: true });
        } else {
            debug(`[DEBUG] No updates needed for meta doc.`);
        }

        if (tasksToReconcile.length > 0) {
            await this.reconciler.markAsUnreconciled(tasksToReconcile);
        }
    }
502

503
    /**
     * Compares the task's current trigger configuration (filter + watchProjection)
     * against the stored signature. On change, records the new config into `update`
     * and — unless the task opted out via reconcileOnTriggerChange — queues the task
     * for reconciliation. Returns true when the meta document needs updating.
     */
    private checkTriggerEvolution(
        taskName: string,
        taskDef: ReactiveTaskInternal<Document>,
        evolution: EvolutionConfig,
        storedState: { triggerConfig?: { filter: Document; watchProjection: Document }; handlerVersion?: number },
        update: Document,
        tasksToReconcile: string[],
    ): boolean {
        const currentTriggerConfig = {
            filter: taskDef.filter || {},
            watchProjection: taskDef.watchProjection || {},
        };
        // Stable stringify gives order-independent signatures for comparison.
        const currentTriggerSig = stringify(currentTriggerConfig);
        const storedTriggerSig = storedState?.triggerConfig ? stringify(storedState.triggerConfig) : null;

        if (currentTriggerSig === storedTriggerSig) {
            return false;
        }

        const msg = storedTriggerSig === null ? `Initial trigger config captured for [${taskName}].` : `Trigger config changed for [${taskName}].`;

        if (evolution.reconcileOnTriggerChange !== false) {
            debug(`${msg} Queueing reconciliation.`);
            tasksToReconcile.push(taskName);
        } else {
            debug(`[mongodash] ${msg} Reconciliation disabled.`);
        }

        update.$set = update.$set || {};
        update.$set[`tasks.${taskName}.triggerConfig`] = currentTriggerConfig;
        return true;
    }
535

536
    /**
     * Handles handler-version evolution for a task:
     * - version increased: applies the configured reprocess policy ('reprocess_failed'
     *   or 'reprocess_all') via the repository and records the new version;
     * - version decreased: logs a probable rollback, stored state left untouched;
     * - task not yet tracked and already at version 1: silently adopts it.
     * Returns true when the meta document needs updating.
     *
     * Fixes: the debug messages were mangled by a formatter (missing spaces around
     * punctuation, "Rollback detected ? ", trailing spaces).
     */
    private async checkLogicEvolution(
        taskName: string,
        taskDef: ReactiveTaskInternal<Document>,
        evolution: EvolutionConfig,
        storedState: { triggerConfig?: unknown; handlerVersion?: number },
        update: Document,
    ): Promise<boolean> {
        const currentVersion = evolution.handlerVersion ?? 1;
        // A tracked task without a stored version counts as v0 (forces the upgrade path);
        // an untracked task defaults to v1 so it can be safely adopted below.
        const storedVersion = storedState?.handlerVersion ?? (storedState ? 0 : 1);

        if (currentVersion > storedVersion) {
            const policy = evolution.onHandlerVersionChange || 'none';
            debug(`Handler upgraded for [${taskName}] (v${storedVersion} -> v${currentVersion}). Policy: ${policy}`);

            const entry = this.registry.getEntry(taskDef.sourceCollection.collectionName);
            if (entry) {
                if (policy === 'reprocess_failed') {
                    await entry.repository.resetTasksForUpgrade(taskName, 'failed');
                } else if (policy === 'reprocess_all') {
                    await entry.repository.resetTasksForUpgrade(taskName, 'all');
                }
            }

            if (!update.$set) update.$set = {};
            update.$set[`tasks.${taskName}.handlerVersion`] = currentVersion;
            return true;
        } else if (currentVersion < storedVersion) {
            debug(
                `[mongodash] ReactiveTask [${taskName}]: Current handlerVersion (${currentVersion}) is LOWER than stored version (${storedVersion}). Rollback detected?`,
            );
        } else if (!storedState && currentVersion === 1) {
            // Safe Adoption: first sighting of this task, already at the default version.
            if (!update.$set) update.$set = {};
            update.$set[`tasks.${taskName}.handlerVersion`] = currentVersion;
            return true;
        }
        return false;
    }
574
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc