• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 24607504909

18 Apr 2026 03:09PM UTC coverage: 91.218% (+0.09%) from 91.133%
24607504909

push

github

VaclavObornik
fix: address Copilot review round 11

1. waitUntilReactiveTasksIdle (whitelist mode) now always checks the
   planner buffer. Skipping it entirely let the helper return idle
   before an in-flight change event had a chance to be turned into
   the task record the DB check looks for. Only the worker count
   check remains global-scope-skipped so unrelated tests' worker
   pools cannot block a scoped wait.

2. docs/reactive-tasks/testing.md now notes the narrow race where
   triggering a write and immediately calling the scoped wait can
   return early if stabilityDurationMs is too short, with a
   recommended workaround (raise stabilityDurationMs or pre-poll
   for the task record).

3. cronTasksConcurrency 'no polls after stop' assertion tightened:
   stopCronTasks is fire-and-forget, so a worker already inside
   findOneAndUpdate at the time could still tick the spy. Capture
   the poll count after a 200ms grace period and assert it does
   not grow over a full 5s back-off window, instead of requiring
   spy.called === false outright.

1492 of 1748 branches covered (85.35%)

Branch coverage included in aggregate %.

3 of 3 new or added lines in 1 file covered. (100.0%)

54 existing lines in 9 files now uncovered.

2320 of 2431 relevant lines covered (95.43%)

373.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.52
/src/reactiveTasks/ReactiveTaskPlanner.ts
1
import { EJSON } from 'bson';
265✔
2
import * as _debug from 'debug';
265✔
3
import {
4
    ChangeStream,
5
    ChangeStreamDeleteDocument,
6
    ChangeStreamInsertDocument,
7
    ChangeStreamReplaceDocument,
8
    ChangeStreamUpdateDocument,
9
    Document,
10
    MongoError,
11
    ResumeToken,
12
} from 'mongodb';
13
import { getMongoClient } from '../getMongoClient';
265✔
14
import { GlobalsCollection } from '../globalsCollection';
15
import { defaultOnError, OnError } from '../OnError';
265✔
16
import { defaultOnInfo, OnInfo } from '../OnInfo';
265✔
17
import { prefixFilterKeys } from '../prefixFilterKeys';
265✔
18
import { ReactiveTaskOps } from './ReactiveTaskOps';
265✔
19
import { ReactiveTaskReconciler } from './ReactiveTaskReconciler';
265✔
20
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
21
import {
265✔
22
    CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
23
    CODE_REACTIVE_TASK_PLANNER_STARTED,
24
    CODE_REACTIVE_TASK_PLANNER_STOPPED,
25
    CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
26
    EvolutionConfig,
27
    MetaDocument,
28
    ReactiveTaskInternal,
29
    REACTIVE_TASK_META_DOC_ID,
30
} from './ReactiveTaskTypes';
31
import stringify = require('fast-json-stable-stringify');
265✔
32

33
const debug = _debug('mongodash:reactiveTasks:planner');
265✔
34

35
type FilteredChangeStreamDocument = Pick<
36
    ChangeStreamInsertDocument | ChangeStreamUpdateDocument | ChangeStreamReplaceDocument | ChangeStreamDeleteDocument,
37
    '_id' | 'operationType' | 'ns' | 'documentKey' | 'clusterTime'
38
>;
39

40
export interface PlannerCallbacks {
41
    onStreamError: () => void;
42
    onTaskPlanned: (tasksCollectionName: string, debounceMs: number) => void;
43
    /** Fired when a batch flush fails. Records the metric and should trigger a planner restart. */
44
    onFlushFailure?: () => void;
45
    /**
46
     * Fired when the planner needs to restart due to a flush failure (distinct from a
47
     * real change-stream error). Callers should trigger a leader-election cycle here
48
     * instead of reacting to `onStreamError`, so flush failures don't pollute the
49
     * stream-error metric.
50
     */
51
    onRequestRestart?: () => void;
52
}
53

54
/**
55
 * Responsible for listening to MongoDB Change Stream events and planning tasks.
56
 *
57
 * Responsibilities:
58
 * - Manages the lifecycle of the Change Stream (start, stop, error handling).
59
 * - Batches Change Stream events to reduce database load.
60
 * - Coordinates with `ReactiveTaskOps` to generate and execute task operations.
61
 * - Coordinates with `ReactiveTaskReconciler` to handle reconciliation when the stream is interrupted or history is lost.
62
 * - Handles critical errors like `ChangeStreamHistoryLost` (code 280) by triggering reconciliation.
63
 */
64
export class ReactiveTaskPlanner {
265✔
65
    private changeStream: ChangeStream | null = null;
161✔
66
    private taskBatch = new Map<string, FilteredChangeStreamDocument>();
161✔
67
    private taskBatchLastResumeToken: ResumeToken | null = null;
161✔
68
    private batchFlushTimer: NodeJS.Timeout | null = null;
161✔
69
    private batchFirstEventTime: number | null = null;
161✔
70
    private isFlushing = false;
161✔
71
    private lastFlushFailed = false;
161✔
72
    private metaDocId = REACTIVE_TASK_META_DOC_ID;
161✔
73
    private lastClusterTime: number | null = null;
161✔
74

75
    private ops: ReactiveTaskOps;
76
    private reconciler: ReactiveTaskReconciler;
77

78
    constructor(
79
        private globalsCollection: GlobalsCollection,
161✔
80
        private instanceId: string,
161✔
81
        private registry: ReactiveTaskRegistry,
161✔
82
        private callbacks: PlannerCallbacks,
161✔
83
        private internalOptions: { batchSize: number; batchIntervalMs: number; minBatchIntervalMs: number; getNextCleanupDate: (date?: Date) => Date },
161✔
84
        private onInfo: OnInfo = defaultOnInfo,
161✔
85
        private onError: OnError = defaultOnError,
161✔
86
    ) {
87
        this.ops = new ReactiveTaskOps(registry, callbacks.onTaskPlanned);
161✔
88
        this.reconciler = new ReactiveTaskReconciler(instanceId, globalsCollection, registry, this.ops, onInfo, internalOptions);
161✔
89
    }
90

91
    public updateOptions(options: Partial<typeof this.internalOptions>): void {
92
        this.internalOptions = { ...this.internalOptions, ...options };
1✔
93
        // If batch interval changes, we might want to reset timers, but it's fine for now (next batch will pick it up)
94
    }
95

96
    public setForceDebounce(debounceMs: number | undefined): void {
97
        this.ops.setForceDebounce(debounceMs);
4✔
98
    }
99

100
    public async start(): Promise<void> {
101
        this.onInfo({
143✔
102
            message: `Reactive task planner started.`,
103
            code: CODE_REACTIVE_TASK_PLANNER_STARTED,
104
        });
105

106
        // 1. Check for schema/logic evolution (Filter changes, Version upgrades)
107
        await this.checkEvolutionStrategies();
143✔
108

109
        // 2. Start stream first to ensure we don't miss events during reconciliation
110
        // We capture the time AFTER starting to ensure overlap with the stream.
111
        // This prevents a gap where events occurring between "now" and "stream start" would be missed.
112
        await this.startChangeStream();
143✔
113

114
        // Pass a stop-check bound to the current stream instance to reconcile. If the stream fails/restarts, the instance changes and reconcile aborts.
115
        if (this.changeStream) {
143!
116
            await this.reconciler.reconcile(this.getIsStoppedTester());
143✔
117
        }
118
    }
119

120
    public async stop(): Promise<void> {
121
        await this.stopChangeStream();
159✔
122
        this.onInfo({
159✔
123
            message: `Reactive task planner stopped.`,
124
            code: CODE_REACTIVE_TASK_PLANNER_STOPPED,
125
        });
126
    }
127

128
    private getIsStoppedTester(): () => boolean {
129
        const changeStreamInstance = this.changeStream;
298✔
130
        return () => this.changeStream !== changeStreamInstance || this.changeStream === null;
1,519✔
131
    }
132

133
    public async saveResumeToken(token: ResumeToken, lastClusterTime?: Date): Promise<void> {
134
        const setFields: Document = { 'streamState.resumeToken': token };
440✔
135
        if (lastClusterTime) {
440✔
136
            setFields['streamState.lastClusterTime'] = lastClusterTime;
425✔
137
        }
138

139
        await this.globalsCollection.updateOne({ _id: this.metaDocId }, { $set: setFields }, { upsert: true });
440✔
140
    }
141

142
    public get isEmpty(): boolean {
143
        return this.taskBatch.size === 0 && !this.isFlushing;
2,311✔
144
    }
145

146
    public async onHeartbeat(): Promise<void> {
147
        // Save resume token if stream is running and idle.
148
        // Skip if the previous flush failed: advancing the token would cause
149
        // the un-planned events to be lost forever on resume.
150
        if (this.changeStream && this.isEmpty && !this.lastFlushFailed) {
155✔
151
            await this.saveResumeToken(this.changeStream.resumeToken, this.lastClusterTime ? new Date(this.lastClusterTime * 1000) : undefined);
141✔
152
        }
153

154
        // Periodic cleanup of orphaned tasks
155
        await this.reconciler.performPeriodicCleanup(this.getIsStoppedTester());
155✔
156
    }
157

158
    private isStopping = false;
161✔
159

160
    private async startChangeStream(): Promise<void> {
161
        if (this.changeStream) {
144✔
162
            await this.stopChangeStream();
1✔
163
        }
164

165
        try {
144✔
166
            const streamOptions: Document = {
144✔
167
                resumeAfter: await this.getChangeStreamResumeToken(),
168
                fullDocument: 'updateLookup',
169
            };
170

171
            if (!streamOptions.resumeAfter) {
144✔
172
                // Get the current server time to guarantee we receive every operation from this point on,
173
                // even if the watch operation takes a while: it is async, so we have
174
                // no guarantee about the point at which we actually start listening.
175
                const serverStatus = await getMongoClient().db().command({ hello: 1 });
133✔
176
                if (serverStatus && serverStatus.operationTime) {
133!
177
                    this.onInfo({
133✔
178
                        message: `No token found. Starting from operationTime: ${serverStatus.operationTime}`,
179
                        code: CODE_REACTIVE_TASK_PLANNER_STARTED,
180
                        operationTime: serverStatus.operationTime,
181
                    });
182
                    streamOptions.startAtOperationTime = serverStatus.operationTime;
133✔
183
                } else {
184
                    // Fallback for a standalone instance without an oplog (less common in production)
UNCOV
185
                    this.onInfo({
×
186
                        message: `Could not fetch operationTime. Starting standard watch.`,
187
                        code: CODE_REACTIVE_TASK_PLANNER_STARTED,
188
                    });
189
                }
190
            }
191

192
            const pipeline = this.getChangeStreamPipeline();
144✔
193
            if (debug.enabled) {
144!
UNCOV
194
                debug(`[Scheduler ${this.instanceId}] Change Stream Pipeline: `, JSON.stringify(pipeline, null, 2));
×
195
            }
196

197
            // Determine which database to watch
198
            // We assume all monitored collections are in the same database for now.
199
            const tasks = this.registry.getAllTasks();
144✔
200
            let dbToWatch = getMongoClient().db(); // Default
144✔
201
            if (tasks.length > 0) {
144!
202
                // Log all tasks and their source collections for debugging
203
                debug(`[ReactiveTaskPlanner] Registered tasks: ${tasks.map((t) => t.task + '(' + t.sourceCollection.collectionName + ')').join(', ')}`);
152✔
204

205
                const dbName = tasks[0].sourceCollection.dbName;
144✔
206
                dbToWatch = getMongoClient().db(dbName);
144✔
207
            }
208

209
            debug(`[ReactiveTaskPlanner] Watching database: ${dbToWatch.databaseName}`);
144✔
210

211
            const stream = dbToWatch.watch(pipeline, streamOptions);
144✔
212
            this.changeStream = stream;
144✔
213
            // Stream started successfully — safe to advance resume tokens again.
214
            this.lastFlushFailed = false;
144✔
215

216
            stream.on('change', (change: FilteredChangeStreamDocument) => {
144✔
217
                this.enqueueTaskChange(change);
1,619✔
218
            });
219
            stream.on('resumeTokenChanged', () => {
144✔
220
                if (this.isEmpty) {
1,848✔
221
                    this.lastClusterTime = Date.now() / 1000;
372✔
222
                }
223
            });
224
            stream.on('error', (error) => this.handleStreamError(error as MongoError));
144✔
225
            stream.on('close', () => {
144✔
226
                this.onInfo({
144✔
227
                    message: `Change Stream closed.`,
228
                    code: CODE_REACTIVE_TASK_PLANNER_STOPPED,
229
                });
230
                if (!this.isStopping) {
144!
UNCOV
231
                    this.callbacks.onStreamError();
×
232
                }
233
            });
234
        } catch (error) {
UNCOV
235
            this.onInfo({
×
236
                message: `Failed to start Change Stream: ${(error as Error).message}`,
237
                code: CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
238
                error: (error as Error).message,
239
            });
UNCOV
240
            this.callbacks.onStreamError();
×
241
        }
242
    }
243

244
    private async stopChangeStream(): Promise<void> {
245
        if (this.changeStream) {
160✔
246
            this.isStopping = true;
144✔
247
            debug(`[Scheduler ${this.instanceId}] Stopping Change Stream...`);
144✔
248
            await this.changeStream.close();
144✔
249
            this.changeStream = null;
144✔
250
            this.isStopping = false;
144✔
251
        }
252
        await this.flushTaskBatch();
160✔
253
    }
254

255
    private getChangeStreamPipeline(): Document[] {
256
        const collectionFilters = this.registry.getAllTasks().reduce((acc, taskDef) => {
145✔
257
            const collectionName = taskDef.sourceCollection.collectionName;
153✔
258
            if (!acc.has(collectionName)) {
153✔
259
                acc.set(collectionName, { 'ns.coll': collectionName, $or: [] });
150✔
260
            }
261
            acc.get(collectionName)!.$or.push(prefixFilterKeys({ $expr: taskDef.filter || {} }, 'fullDocument'));
153✔
262
            return acc;
153✔
263
        }, new Map<string, Document>());
264

265
        const pipeline = [
145✔
266
            {
267
                $match: {
268
                    operationType: { $in: ['insert', 'update', 'replace', 'delete'] },
269
                    $or: Array.from(collectionFilters.values()),
270
                },
271
            },
272
            {
273
                $project: {
274
                    _id: 1,
275
                    operationType: 1,
276
                    ns: 1,
277
                    documentKey: 1,
278
                    clusterTime: 1,
279
                },
280
            },
281
        ];
282
        return pipeline;
145✔
283
    }
284

285
    private async getChangeStreamResumeToken(): Promise<ResumeToken | undefined> {
286
        const state = (await this.globalsCollection.findOne({
144✔
287
            _id: this.metaDocId,
288
        })) as MetaDocument | null;
289
        debug(`[DEBUG] getChangeStreamResumeToken loaded state (${this.metaDocId}): `, JSON.stringify(state, null, 2));
144✔
290

291
        const token = state?.streamState?.resumeToken;
144!
292
        debug(`[DEBUG] Extracted token: `, token);
144✔
293
        return token ?? undefined;
144✔
294
    }
295

296
    private async enqueueTaskChange(change: FilteredChangeStreamDocument): Promise<void> {
297
        if (debug.enabled) {
2,628!
UNCOV
298
            debug(`[Scheduler ${this.instanceId}] Change detected: `, JSON.stringify(change, null, 2));
×
299
        }
300

301
        if (change.clusterTime) {
2,628!
302
            // clusterTime is a BSON Timestamp.
303
            // .getHighBits() returns the seconds since epoch.
304
            this.lastClusterTime = change.clusterTime.getHighBits();
2,628✔
305
        }
306

307
        const docId = EJSON.stringify(change.documentKey._id, { relaxed: false });
2,628✔
308
        this.taskBatch.set(docId, change);
2,628✔
309
        this.taskBatchLastResumeToken = change._id;
2,628✔
310

311
        const now = Date.now();
2,628✔
312

313
        // Immediate flush if batch size reached
314
        if (this.taskBatch.size >= this.internalOptions.batchSize) {
2,628✔
315
            await this.flushTaskBatch();
105✔
316
            return;
105✔
317
        }
318

319
        // Sliding Window with Max Wait Logic
320
        if (!this.batchFirstEventTime) {
2,523✔
321
            // First event of the batch
322
            this.batchFirstEventTime = now;
149✔
323
            this.batchFlushTimer = setTimeout(() => this.flushTaskBatch(), this.internalOptions.minBatchIntervalMs);
149✔
324
        } else {
325
            // Subsequent event
326
            const elapsedSinceFirst = now - this.batchFirstEventTime;
2,374✔
327

328
            if (elapsedSinceFirst >= this.internalOptions.batchIntervalMs) {
2,374✔
329
                // Max wait reached
330
                await this.flushTaskBatch();
51✔
331
            } else {
332
                // Reset sliding window timer
333
                if (this.batchFlushTimer) {
2,323✔
334
                    clearTimeout(this.batchFlushTimer);
2,217✔
335
                }
336
                this.batchFlushTimer = setTimeout(() => this.flushTaskBatch(), this.internalOptions.minBatchIntervalMs);
2,323✔
337
            }
338
        }
339
    }
340

341
    private async flushTaskBatch(): Promise<void> {
342
        if (this.batchFlushTimer) {
458✔
343
            clearTimeout(this.batchFlushTimer);
250✔
344
            this.batchFlushTimer = null;
250✔
345
        }
346

347
        if (this.taskBatch.size === 0) {
458✔
348
            return;
158✔
349
        }
350

351
        const events = Array.from(this.taskBatch.values());
300✔
352
        this.taskBatch.clear();
300✔
353

354
        const lastToken = this.taskBatchLastResumeToken;
300✔
355
        const lastClusterTime = this.lastClusterTime ? new Date(this.lastClusterTime * 1000) : undefined; // Capture time associated with this batch (approx)
300!
356
        this.isFlushing = true;
300✔
357

358
        try {
300✔
359
            const { idsByCollection, deletedIdsByTask } = this.groupEventsByCollection(events);
300✔
360

361
            await this.processDeletions(deletedIdsByTask);
300✔
362
            await this.executeUpsertOperations(idsByCollection);
300✔
363

364
            // Do not advance the token past events that could not be planned in a
365
            // previous batch.  lastFlushFailed is cleared only when the stream restarts
366
            // (startChangeStream), so it stays true across any subsequent successful
367
            // flushes that run before the restart completes.
368
            if (lastToken && !this.lastFlushFailed) {
299!
369
                await this.saveResumeToken(lastToken, lastClusterTime);
299✔
370
            }
371
        } catch (error) {
372
            this.onError(error as Error);
5✔
373
            // Mark as failed so heartbeat and future flushes do not advance the
374
            // resume token past events we could not plan.  The flag is cleared only
375
            // when the change stream is restarted (startChangeStream).
376
            this.lastFlushFailed = true;
5✔
377
            this.callbacks.onFlushFailure?.();
5!
378
            // Use onRequestRestart (not onStreamError) so flush failures don't
379
            // pollute the stream-error metric.
380
            this.callbacks.onRequestRestart?.();
5!
381
        } finally {
382
            this.isFlushing = false;
300✔
383
            this.batchFirstEventTime = null;
300✔
384
        }
385
    }
386

387
    private groupEventsByCollection(events: FilteredChangeStreamDocument[]) {
388
        const idsByCollection = new Map<string, Set<unknown>>();
300✔
389
        // Map<TaskName, Set<SourceId>>
390
        const deletedIdsByTask = new Map<string, Set<unknown>>();
300✔
391

392
        for (const event of events) {
300✔
393
            if (!event.ns || !event.ns.coll) continue;
1,617!
394
            const collectionName = event.ns.coll;
1,617✔
395

396
            if (event.operationType === 'delete') {
1,617✔
397
                const docId = event.documentKey._id;
30✔
398
                const entry = this.registry.getEntry(collectionName);
30✔
399

400
                if (entry) {
30!
401
                    for (const taskDef of entry.tasks.values()) {
30✔
402
                        let docIds = deletedIdsByTask.get(taskDef.task);
30✔
403
                        if (!docIds) {
30✔
404
                            docIds = new Set();
12✔
405
                            deletedIdsByTask.set(taskDef.task, docIds);
12✔
406
                        }
407
                        docIds.add(docId);
30✔
408
                    }
409
                }
410
            } else {
411
                // insert, update, replace
412
                if (!idsByCollection.has(collectionName)) {
1,587✔
413
                    idsByCollection.set(collectionName, new Set());
292✔
414
                }
415
                idsByCollection.get(collectionName)!.add(event.documentKey._id);
1,587✔
416
            }
417
        }
418

419
        return { idsByCollection, deletedIdsByTask };
300✔
420
    }
421

422
    private async processDeletions(deletedIdsByTask: Map<string, Set<unknown>>): Promise<void> {
423
        if (deletedIdsByTask.size === 0) return;
300✔
424

425
        const entries = Array.from(deletedIdsByTask.entries());
12✔
426
        const labels = entries.map(([taskName]) => `task=${taskName}`);
12✔
427

428
        const results = await Promise.allSettled(
12✔
429
            entries.map(async ([taskName, ids]) => {
430
                if (ids.size === 0) return;
12!
431

432
                const taskDef = this.registry.getTask(taskName);
12✔
433

434
                if (taskDef) {
12!
435
                    // We use deleteOrphanedTasks but limit it to the source IDs we just saw deleted.
436
                    // This reuses the EXACT same logic (including keepFor checks) as the background cleaner.
437
                    await taskDef.repository.deleteOrphanedTasks(
12✔
438
                        taskName,
439
                        taskDef.sourceCollection.collectionName,
440
                        taskDef.filter || {},
24✔
441
                        taskDef.cleanupPolicyParsed,
442
                        () => false, // shouldStop: immediate execution, no need to stop
1✔
443
                        Array.from(ids),
444
                    );
445
                }
446
            }),
447
        );
448

449
        this.throwOnAnyRejection(results, 'processDeletions', labels);
12✔
450
    }
451

452
    private async executeUpsertOperations(idsByCollection: Map<string, Set<unknown>>): Promise<void> {
453
        if (idsByCollection.size === 0) return;
300✔
454

455
        const entries = Array.from(idsByCollection.entries());
291✔
456
        const labels = entries.map(([collectionName]) => `collection=${collectionName}`);
292✔
457

458
        const results = await Promise.allSettled(
291✔
459
            entries.map(async ([collectionName, ids]) => {
460
                if (ids.size === 0) return;
292!
461
                await this.ops.executePlanningPipeline(collectionName, Array.from(ids));
292✔
462
            }),
463
        );
464

465
        this.throwOnAnyRejection(results, 'executeUpsertOperations', labels);
291✔
466
    }
467

468
    private throwOnAnyRejection(results: PromiseSettledResult<unknown>[], context: string, labels: string[] = []): void {
×
469
        if (results.every((r) => r.status !== 'rejected')) return;
304✔
470

471
        // Build a single aggregated error so the caller (flushTaskBatch) reports it once.
472
        // Calling onError here for each failure would duplicate the report since flushTaskBatch
473
        // also calls onError on the thrown error. We zip each result with its label (taskName /
474
        // collectionName) so operators can immediately identify the failing pipeline or delete.
475
        const failures: string[] = [];
1✔
476
        for (let i = 0; i < results.length; i += 1) {
1✔
477
            const r = results[i];
1✔
478
            if (r.status !== 'rejected') continue;
1!
479
            const msg = r.reason instanceof Error ? r.reason.message : String(r.reason);
1!
480
            const label = labels[i] ? ` (${labels[i]})` : '';
1!
481
            failures.push(`[${failures.length + 1}]${label} ${msg}`);
1✔
482
        }
483
        throw new Error(`${context}: ${failures.length} of ${results.length} operation(s) failed. Errors: ${failures.join('; ')}`);
1✔
484
    }
485

486
    private async handleStreamError(error: MongoError): Promise<void> {
487
        if (error.code === 280) {
3✔
488
            this.onError(new Error(`Critical error: Oplog history lost (ChangeStreamHistoryLost). Resetting Resume Token. Original error: ${error.message}`));
1✔
489
            await this.globalsCollection.updateOne({ _id: this.metaDocId }, { $unset: { 'streamState.resumeToken': '', reconciliation: '' } });
1✔
490

491
            this.onInfo({
1✔
492
                message: `Oplog lost, triggering reconciliation...`,
493
                code: CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
494
            });
495

496
            // Start stream first to capture new events
497
            await this.startChangeStream();
1✔
498
            const currentStream = this.changeStream;
1✔
499
            if (currentStream) {
1!
500
                await this.reconciler.reconcile(() => this.changeStream !== currentStream || this.changeStream === null);
3✔
501
            }
502
        } else {
503
            this.onInfo({
2✔
504
                message: `Change Stream error: ${error.message}`,
505
                code: CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
506
                error: error.message,
507
            });
508
            this.callbacks.onStreamError();
2✔
509
        }
510
    }
511

512
    private async checkEvolutionStrategies(): Promise<void> {
513
        const metaDoc = (await this.globalsCollection.findOne({ _id: this.metaDocId })) as MetaDocument | null;
143✔
514
        const storedTasks = metaDoc?.tasks || {};
143✔
515
        const update: Document = {};
143✔
516
        const tasksToReconcile: string[] = [];
143✔
517
        let needsUpdate = false;
143✔
518

519
        const allTasks = this.registry.getAllTasks();
143✔
520

521
        for (const taskDef of allTasks) {
143✔
522
            const taskName = taskDef.task;
151✔
523
            const defaultEvolution: EvolutionConfig = {
151✔
524
                handlerVersion: 1,
525
                onHandlerVersionChange: 'none',
526
                reconcileOnTriggerChange: true,
527
            };
528
            const evolution = { ...defaultEvolution, ...(taskDef.evolution || {}) };
151✔
529

530
            const storedState = storedTasks[taskName];
151✔
531

532
            const triggerChanged = this.checkTriggerEvolution(taskName, taskDef, evolution, storedState, update, tasksToReconcile);
151✔
533
            if (triggerChanged) needsUpdate = true;
151✔
534

535
            const logicChanged = await this.checkLogicEvolution(taskName, taskDef, evolution, storedState, update);
151✔
536
            if (logicChanged) needsUpdate = true;
151✔
537
        }
538

539
        if (needsUpdate) {
143✔
540
            debug.enabled && debug(`[DEBUG] Updating meta doc with: `, JSON.stringify(update, null, 2));
139!
541
            await this.globalsCollection.updateOne({ _id: this.metaDocId }, update, { upsert: true });
139✔
542
        } else {
543
            debug(`[DEBUG] No updates needed for meta doc.`);
4✔
544
        }
545

546
        if (tasksToReconcile.length > 0) {
143✔
547
            await this.reconciler.markAsUnreconciled(tasksToReconcile);
136✔
548
        }
549
    }
550

551
    private checkTriggerEvolution(
552
        taskName: string,
553
        taskDef: ReactiveTaskInternal<Document>,
554
        evolution: EvolutionConfig,
555
        storedState: { triggerConfig?: { filter: Document; watchProjection: Document }; handlerVersion?: number },
556
        update: Document,
557
        tasksToReconcile: string[],
558
    ): boolean {
559
        const currentTriggerConfig = {
151✔
560
            filter: taskDef.filter || {},
261✔
561
            watchProjection: taskDef.watchProjection || {},
288✔
562
        };
563
        const currentTriggerSig = stringify(currentTriggerConfig);
151✔
564
        const storedTriggerSig = storedState?.triggerConfig ? stringify(storedState.triggerConfig) : null;
151✔
565

566
        if (currentTriggerSig !== storedTriggerSig) {
151✔
567
            const shouldReconcile = evolution.reconcileOnTriggerChange !== false;
144✔
568
            const msg = storedTriggerSig === null ? `Initial trigger config captured for [${taskName}].` : `Trigger config changed for [${taskName}].`;
144✔
569

570
            if (shouldReconcile) {
144✔
571
                debug(`${msg} Queueing reconciliation.`);
143✔
572
                tasksToReconcile.push(taskName);
143✔
573
            } else {
574
                debug(`[mongodash] ${msg} Reconciliation disabled.`);
1✔
575
            }
576

577
            if (!update.$set) update.$set = {};
144✔
578
            update.$set[`tasks.${taskName}.triggerConfig`] = currentTriggerConfig;
144✔
579
            return true;
144✔
580
        }
581
        return false;
7✔
582
    }
583

584
    private async checkLogicEvolution(
585
        taskName: string,
586
        taskDef: ReactiveTaskInternal<Document>,
587
        evolution: EvolutionConfig,
588
        storedState: { triggerConfig?: unknown; handlerVersion?: number },
589
        update: Document,
590
    ): Promise<boolean> {
591
        const currentVersion = evolution.handlerVersion ?? 1;
151!
592
        const storedVersion = storedState?.handlerVersion ?? (storedState ? 0 : 1);
151!
593

594
        if (currentVersion > storedVersion) {
151✔
595
            const policy = evolution.onHandlerVersionChange || 'none';
2!
596
            debug(`Handler upgraded for [${taskName}](v${storedVersion} -> v${currentVersion}).Policy: ${policy} `);
2✔
597

598
            const entry = this.registry.getEntry(taskDef.sourceCollection.collectionName);
2✔
599
            if (entry) {
2!
600
                if (policy === 'reprocess_failed') {
2✔
601
                    await entry.repository.resetTasksForUpgrade(taskName, 'failed');
1✔
602
                } else if (policy === 'reprocess_all') {
1!
603
                    await entry.repository.resetTasksForUpgrade(taskName, 'all');
1✔
604
                }
605
            }
606

607
            if (!update.$set) update.$set = {};
2!
608
            update.$set[`tasks.${taskName}.handlerVersion`] = currentVersion;
2✔
609
            return true;
2✔
610
        } else if (currentVersion < storedVersion) {
149!
UNCOV
611
            debug(
×
612
                `[mongodash] ReactiveTask[${taskName}]: Current handlerVersion(${currentVersion}) is LOWER than stored version(${storedVersion}).Rollback detected ? `,
613
            );
614
        } else if (!storedState && currentVersion === 1) {
149✔
615
            // Safe Adoption
616
            if (!update.$set) update.$set = {};
141!
617
            update.$set[`tasks.${taskName}.handlerVersion`] = currentVersion;
141✔
618
            return true;
141✔
619
        }
620
        return false;
8✔
621
    }
622
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc