• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 20582330527

29 Dec 2025 08:44PM UTC coverage: 91.869%. Remained the same
20582330527

push

github

VaclavObornik
ci: bump path-to-regexp from 1.9.0 to 8.3.0

Bumps [path-to-regexp](https://github.com/pillarjs/path-to-regexp) from 1.9.0 to 8.3.0.
- [Release notes](https://github.com/pillarjs/path-to-regexp/releases)
- [Changelog](https://github.com/pillarjs/path-to-regexp/blob/master/History.md)
- [Commits](https://github.com/pillarjs/path-to-regexp/compare/v1.9.0...v8.3.0)

---
updated-dependencies:
- dependency-name: path-to-regexp
  dependency-version: 8.3.0
  dependency-type: direct:development
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

1250 of 1452 branches covered (86.09%)

Branch coverage included in aggregate %.

2004 of 2090 relevant lines covered (95.89%)

377.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.43
/src/reactiveTasks/ReactiveTaskPlanner.ts
1
import {
2
    ChangeStream,
3
    ChangeStreamDeleteDocument,
4
    ChangeStreamInsertDocument,
5
    ChangeStreamReplaceDocument,
6
    ChangeStreamUpdateDocument,
7
    Document,
8
    MongoError,
9
    ResumeToken,
10
} from 'mongodb';
11
import stringify = require('fast-json-stable-stringify');
220✔
12
import * as _debug from 'debug';
220✔
13
import { GlobalsCollection } from '../globalsCollection';
14
import { getMongoClient } from '../getMongoClient';
220✔
15
import { prefixFilterKeys } from '../prefixFilterKeys';
220✔
16
import {
220✔
17
    MetaDocument,
18
    CODE_REACTIVE_TASK_PLANNER_STARTED,
19
    CODE_REACTIVE_TASK_PLANNER_STOPPED,
20
    CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
21
    CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
22
    REACTIVE_TASK_META_DOC_ID,
23
    EvolutionConfig,
24
    ReactiveTaskInternal,
25
} from './ReactiveTaskTypes';
26
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
27
import { OnInfo, defaultOnInfo } from '../OnInfo';
220✔
28
import { OnError, defaultOnError } from '../OnError';
220✔
29
import { EJSON } from 'bson';
220✔
30
import { ReactiveTaskOps } from './ReactiveTaskOps';
220✔
31
import { ReactiveTaskReconciler } from './ReactiveTaskReconciler';
220✔
32

33
// Namespaced debug logger for planner internals (enable via DEBUG=mongodash:reactiveTasks:planner).
const debug = _debug('mongodash:reactiveTasks:planner');
220✔
34

35
/**
 * Shape of a change-stream event after the `$project` stage built in
 * `getChangeStreamPipeline()`: only the resume token (`_id`), operation type,
 * namespace, document key and cluster time survive the projection.
 * Delete events therefore carry no `fullDocument`.
 */
type FilteredChangeStreamDocument = Pick<
    ChangeStreamInsertDocument | ChangeStreamUpdateDocument | ChangeStreamReplaceDocument | ChangeStreamDeleteDocument,
    '_id' | 'operationType' | 'ns' | 'documentKey' | 'clusterTime'
>;
39

40
/** Hooks supplied by the owner of the planner to react to planner events. */
export interface PlannerCallbacks {
    /**
     * Invoked when the change stream fails to start, emits an error, or closes
     * while the planner is not deliberately stopping — presumably so the owner
     * can restart/recover; confirm against the caller.
     */
    onStreamError: () => void;
    /**
     * Invoked when tasks have been planned into the given tasks collection.
     * NOTE(review): `debounceMs` semantics are not visible in this file —
     * verify against `ReactiveTaskOps`.
     */
    onTaskPlanned: (tasksCollectionName: string, debounceMs: number) => void;
}
44

45
/**
 * Responsible for listening to MongoDB Change Stream events and planning tasks.
 *
 * Responsibilities:
 * - Manages the lifecycle of the Change Stream (start, stop, error handling).
 * - Batches Change Stream events to reduce database load.
 * - Coordinates with `ReactiveTaskOps` to generate and execute task operations.
 * - Coordinates with `ReactiveTaskReconciler` to handle reconciliation when the stream is interrupted or history is lost.
 * - Handles critical errors like `ChangeStreamHistoryLost` (code 280) by triggering reconciliation.
 *
 * Persistence: the resume token and approximate last cluster time are stored in a
 * single meta document (`REACTIVE_TASK_META_DOC_ID`) in `globalsCollection`.
 */
export class ReactiveTaskPlanner {
    // Active stream, or null when stopped; `null` doubles as the "stopped" signal for isStoppedTester.
    private changeStream: ChangeStream | null = null;
    // Pending events keyed by EJSON-serialized documentKey._id, so repeated changes to one doc collapse to the latest event.
    private taskBatch = new Map<string, FilteredChangeStreamDocument>();
    // Resume token of the most recently enqueued event; persisted after a successful flush.
    private taskBatchLastResumeToken: ResumeToken | null = null;
    // Timer driving time-based flushes (batchIntervalMs); null when no flush is scheduled.
    private batchFlushTimer: NodeJS.Timeout | null = null;
    // True while flushTaskBatch() is executing its async work; keeps isEmpty false during a flush.
    private isFlushing = false;
    // _id of the meta document holding stream state and per-task evolution state.
    private metaDocId = REACTIVE_TASK_META_DOC_ID;
    // Seconds since epoch of the last observed cluster time (or wall-clock approximation, see 'resumeTokenChanged').
    private lastClusterTime: number | null = null;

    private ops: ReactiveTaskOps;
    private reconciler: ReactiveTaskReconciler;

    // Returns a fresh closure reporting whether the planner has been stopped; handed to
    // long-running reconciler operations so they can abort when the stream goes away.
    private get isStoppedTester(): () => boolean {
        return () => this.changeStream === null;
    }

    constructor(
        private globalsCollection: GlobalsCollection,
        private instanceId: string,
        private registry: ReactiveTaskRegistry,
        private callbacks: PlannerCallbacks,
        private internalOptions: { batchSize: number; batchIntervalMs: number; getNextCleanupDate: (date?: Date) => Date },
        private onInfo: OnInfo = defaultOnInfo,
        private onError: OnError = defaultOnError,
    ) {
        this.ops = new ReactiveTaskOps(registry, callbacks.onTaskPlanned);
        this.reconciler = new ReactiveTaskReconciler(instanceId, globalsCollection, registry, this.ops, onInfo, internalOptions);
    }

    /**
     * Starts the planner: applies evolution strategies, opens the change stream,
     * then runs reconciliation (which aborts itself if the stream stops meanwhile).
     */
    public async start(): Promise<void> {
        this.onInfo({
            message: `Reactive task planner started.`,
            code: CODE_REACTIVE_TASK_PLANNER_STARTED,
        });

        // 1. Check for schema/logic evolution (Filter changes, Version upgrades)
        await this.checkEvolutionStrategies();

        // 2. Start stream first to ensure we don't miss events during reconciliation
        // We capture the time AFTER starting to ensure overlap with the stream.
        // This prevents a gap where events occurring between "now" and "stream start" would be missed.
        await this.startChangeStream();

        // Pass the current stream instance to reconcile. If stream fails/restarts, instance changes and reconcile aborts.
        if (this.changeStream) {
            await this.reconciler.reconcile(this.isStoppedTester);
        }
    }

    /** Stops the change stream (flushing any buffered events) and reports the stop. */
    public async stop(): Promise<void> {
        await this.stopChangeStream();
        this.onInfo({
            message: `Reactive task planner stopped.`,
            code: CODE_REACTIVE_TASK_PLANNER_STOPPED,
        });
    }

    /**
     * Persists the resume token (and optionally the last cluster time) into the
     * meta document, upserting it if missing.
     */
    public async saveResumeToken(token: ResumeToken, lastClusterTime?: Date): Promise<void> {
        const setFields: Document = { 'streamState.resumeToken': token };
        if (lastClusterTime) {
            setFields['streamState.lastClusterTime'] = lastClusterTime;
        }

        await this.globalsCollection.updateOne({ _id: this.metaDocId }, { $set: setFields }, { upsert: true });
    }

    /** True when no events are buffered and no flush is in progress. */
    public get isEmpty(): boolean {
        return this.taskBatch.size === 0 && !this.isFlushing;
    }

    /**
     * Periodic tick: checkpoints the resume token while the stream is idle and
     * delegates orphaned-task cleanup to the reconciler.
     */
    public async onHeartbeat(): Promise<void> {
        // Save resume token if stream is running and idle
        if (this.changeStream && this.isEmpty) {
            await this.saveResumeToken(this.changeStream.resumeToken, this.lastClusterTime ? new Date(this.lastClusterTime * 1000) : undefined);
        }

        // Periodic cleanup of orphaned tasks
        await this.reconciler.performPeriodicCleanup(this.isStoppedTester);
    }

    // Guards the 'close' handler: when true, the close originated from stopChangeStream()
    // and must not be reported to callbacks.onStreamError.
    private isStopping = false;

    /**
     * Opens the change stream: resumes from a stored token when available, otherwise
     * anchors at the server's current operationTime so no events between "now" and
     * the async watch start are lost. Wires up change/error/close handlers.
     * On failure, reports via onInfo and invokes callbacks.onStreamError.
     */
    private async startChangeStream(): Promise<void> {
        if (this.changeStream) {
            await this.stopChangeStream();
        }

        try {
            const streamOptions: Document = {
                resumeAfter: await this.getChangeStreamResumeToken(),
                fullDocument: 'updateLookup',
            };

            if (!streamOptions.resumeAfter) {
                // get current server time to guarantee we get any operation from the current time
                // even if the watch operation took a time, since it is async and we don't have
                // any guarantee at what point we start listening
                const serverStatus = await getMongoClient().db().command({ hello: 1 });
                if (serverStatus && serverStatus.operationTime) {
                    this.onInfo({
                        message: `No token found. Starting from operationTime: ${serverStatus.operationTime}`,
                        code: CODE_REACTIVE_TASK_PLANNER_STARTED,
                        operationTime: serverStatus.operationTime,
                    });
                    streamOptions.startAtOperationTime = serverStatus.operationTime;
                } else {
                    // Fallback for a standalone instance without an oplog (less common in production)
                    this.onInfo({
                        message: `Could not fetch operationTime. Starting standard watch.`,
                        code: CODE_REACTIVE_TASK_PLANNER_STARTED,
                    });
                }
            }

            const pipeline = this.getChangeStreamPipeline();
            debug(`[Scheduler ${this.instanceId}] Change Stream Pipeline: `, JSON.stringify(pipeline, null, 2));

            // Determine which database to watch
            // We assume all monitored collections are in the same database for now.
            const tasks = this.registry.getAllTasks();
            let dbToWatch = getMongoClient().db(); // Default
            if (tasks.length > 0) {
                const dbName = tasks[0].sourceCollection.dbName;
                dbToWatch = getMongoClient().db(dbName);
                debug(`[ReactiveTaskPlanner] Watching database: ${dbName}`);
            }

            const stream = dbToWatch.watch(pipeline, streamOptions);
            this.changeStream = stream;

            // NOTE(review): enqueueTaskChange returns a promise that is intentionally not awaited
            // here; flush errors are caught inside flushTaskBatch, but confirm no other rejection
            // path can escape as an unhandled rejection.
            stream.on('change', (change: FilteredChangeStreamDocument) => {
                this.enqueueTaskChange(change);
            });
            stream.on('resumeTokenChanged', () => {
                if (this.isEmpty) {
                    // Wall-clock approximation of the cluster time while idle (seconds since epoch);
                    // overwritten with the real clusterTime whenever an event arrives.
                    this.lastClusterTime = Date.now() / 1000;
                }
            });
            stream.on('error', (error) => this.handleStreamError(error as MongoError));
            stream.on('close', () => {
                this.onInfo({
                    message: `Change Stream closed.`,
                    code: CODE_REACTIVE_TASK_PLANNER_STOPPED,
                });
                if (!this.isStopping) {
                    this.callbacks.onStreamError();
                }
            });
        } catch (error) {
            this.onInfo({
                message: `Failed to start Change Stream: ${(error as Error).message}`,
                code: CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
                error: (error as Error).message,
            });
            this.callbacks.onStreamError();
        }
    }

    /**
     * Closes the stream (if open) with the isStopping guard raised so the 'close'
     * handler does not fire onStreamError, then flushes any buffered events.
     */
    private async stopChangeStream(): Promise<void> {
        if (this.changeStream) {
            this.isStopping = true;
            debug(`[Scheduler ${this.instanceId}] Stopping Change Stream...`);
            await this.changeStream.close();
            this.changeStream = null;
            this.isStopping = false;
        }
        await this.flushTaskBatch();
    }

    /**
     * Builds the aggregation pipeline for the change stream: a $match limiting
     * events to insert/update/replace/delete on registered collections (each
     * collection ORs the registered task filters, prefixed onto `fullDocument`),
     * followed by a $project down to the fields in FilteredChangeStreamDocument.
     */
    private getChangeStreamPipeline(): Document[] {
        const collectionFilters = this.registry.getAllTasks().reduce((acc, taskDef) => {
            const collectionName = taskDef.sourceCollection.collectionName;
            if (!acc.has(collectionName)) {
                acc.set(collectionName, { 'ns.coll': collectionName, $or: [] });
            }
            acc.get(collectionName)!.$or.push(prefixFilterKeys({ $expr: taskDef.filter || {} }, 'fullDocument'));
            return acc;
        }, new Map<string, Document>());

        const pipeline = [
            {
                $match: {
                    operationType: { $in: ['insert', 'update', 'replace', 'delete'] },
                    $or: Array.from(collectionFilters.values()),
                },
            },
            {
                $project: {
                    _id: 1,
                    operationType: 1,
                    ns: 1,
                    documentKey: 1,
                    clusterTime: 1,
                },
            },
        ];
        return pipeline;
    }

    /** Loads the persisted resume token from the meta document, or undefined if none is stored. */
    private async getChangeStreamResumeToken(): Promise<ResumeToken | undefined> {
        const state = (await this.globalsCollection.findOne({
            _id: this.metaDocId,
        })) as MetaDocument | null;
        debug(`[DEBUG] getChangeStreamResumeToken loaded state (${this.metaDocId}): `, JSON.stringify(state, null, 2));

        const token = state?.streamState?.resumeToken;
        debug(`[DEBUG] Extracted token: `, token);
        return token ?? undefined;
    }

    /**
     * Buffers one change event (deduplicated per document key) and flushes when
     * the batch reaches batchSize; otherwise schedules a time-based flush.
     */
    private async enqueueTaskChange(change: FilteredChangeStreamDocument): Promise<void> {
        debug(`[Scheduler ${this.instanceId}] Change detected: `, change._id);

        if (change.clusterTime) {
            // clusterTime is a BSON Timestamp.
            // .getHighBits() returns the seconds since epoch.
            this.lastClusterTime = change.clusterTime.getHighBits();
        }

        // Canonical (non-relaxed) EJSON keeps distinct BSON id types distinct as map keys.
        const docId = EJSON.stringify(change.documentKey._id, { relaxed: false });
        this.taskBatch.set(docId, change);
        this.taskBatchLastResumeToken = change._id;

        if (this.taskBatch.size >= this.internalOptions.batchSize) {
            if (this.batchFlushTimer) {
                clearTimeout(this.batchFlushTimer);
                this.batchFlushTimer = null;
            }
            await this.flushTaskBatch();
        } else if (!this.batchFlushTimer) {
            this.batchFlushTimer = setTimeout(() => this.flushTaskBatch(), this.internalOptions.batchIntervalMs);
        }
    }

    /**
     * Drains the current batch: processes deletions, executes upsert planning per
     * collection, then persists the resume token captured for this batch. Errors
     * are reported via onError; the batch is lost but the stream keeps running.
     */
    private async flushTaskBatch(): Promise<void> {
        if (this.batchFlushTimer) {
            clearTimeout(this.batchFlushTimer);
            this.batchFlushTimer = null;
        }

        if (this.taskBatch.size === 0) {
            return;
        }

        // Snapshot and clear the buffer first so new events enqueue into a fresh batch
        // while this flush's async work is in flight.
        const events = Array.from(this.taskBatch.values());
        this.taskBatch.clear();

        const lastToken = this.taskBatchLastResumeToken;
        const lastClusterTime = this.lastClusterTime ? new Date(this.lastClusterTime * 1000) : undefined; // Capture time associated with this batch (approx)
        this.isFlushing = true;

        try {
            const { idsByCollection, deletedIdsByTask } = this.groupEventsByCollection(events);

            await this.processDeletions(deletedIdsByTask);
            await this.executeUpsertOperations(idsByCollection);

            if (lastToken) {
                await this.saveResumeToken(lastToken, lastClusterTime);
            }
        } catch (error) {
            this.onError(error as Error);
            // We lost the batch, but we can't easily retry without complicating logic.
            // The stream continues.
        } finally {
            this.isFlushing = false;
        }
    }

    /**
     * Splits a batch of events into upsert candidates grouped by source collection
     * and deleted document ids grouped by task name (fanning each delete out to
     * every task registered for that collection).
     */
    private groupEventsByCollection(events: FilteredChangeStreamDocument[]) {
        const idsByCollection = new Map<string, Set<unknown>>();
        // Map<TaskName, Set<SourceId>>
        const deletedIdsByTask = new Map<string, Set<unknown>>();

        for (const event of events) {
            if (!event.ns || !event.ns.coll) continue;
            const collectionName = event.ns.coll;

            if (event.operationType === 'delete') {
                const docId = event.documentKey._id;
                const entry = this.registry.getEntry(collectionName);

                if (entry) {
                    for (const taskDef of entry.tasks.values()) {
                        let docIds = deletedIdsByTask.get(taskDef.task);
                        if (!docIds) {
                            docIds = new Set();
                            deletedIdsByTask.set(taskDef.task, docIds);
                        }
                        docIds.add(docId);
                    }
                }
            } else {
                // insert, update, replace
                if (!idsByCollection.has(collectionName)) {
                    idsByCollection.set(collectionName, new Set());
                }
                idsByCollection.get(collectionName)!.add(event.documentKey._id);
            }
        }

        return { idsByCollection, deletedIdsByTask };
    }

    /**
     * For each task with deleted source ids, removes the corresponding tasks via
     * the repository's orphan cleanup, restricted to exactly the ids just deleted.
     */
    private async processDeletions(deletedIdsByTask: Map<string, Set<unknown>>): Promise<void> {
        if (deletedIdsByTask.size > 0) {
            await Promise.all(
                Array.from(deletedIdsByTask.entries()).map(async ([taskName, ids]) => {
                    if (ids.size === 0) return;

                    const taskDef = this.registry.getTask(taskName);

                    if (taskDef) {
                        // We use deleteOrphanedTasks but limit it to the source IDs we just saw deleted.
                        // This reuses the EXACT same logic (including keepFor checks) as the background cleaner.
                        await taskDef.repository.deleteOrphanedTasks(
                            taskName,
                            taskDef.sourceCollection.collectionName,
                            taskDef.filter || {},
                            taskDef.cleanupPolicyParsed,
                            () => false, // shouldStop: immediate execution, no need to stop
                            Array.from(ids),
                        );
                    }
                }),
            );
        }
    }

    /**
     * Runs the planning pipeline per collection for upserted/changed source ids.
     * Per-collection failures are reported via onError and do not abort siblings.
     */
    private async executeUpsertOperations(idsByCollection: Map<string, Set<unknown>>): Promise<void> {
        if (idsByCollection.size > 0) {
            await Promise.all(
                Array.from(idsByCollection.entries()).map(async ([collectionName, ids]) => {
                    if (ids.size === 0) return;
                    try {
                        await this.ops.executePlanningPipeline(collectionName, Array.from(ids));
                    } catch (error) {
                        this.onError(error as Error);
                    }
                }),
            );
        }
    }

    /**
     * Stream error handler. Code 280 (ChangeStreamHistoryLost) means the oplog no
     * longer contains our resume point: the stored token and reconciliation state
     * are discarded, the stream is restarted fresh, and a full reconciliation runs.
     * Any other error is reported and delegated to callbacks.onStreamError.
     */
    private async handleStreamError(error: MongoError): Promise<void> {
        if (error.code === 280) {
            this.onError(new Error(`Critical error: Oplog history lost(ChangeStreamHistoryLost).Resetting Resume Token.Original error: ${error.message} `));
            await this.globalsCollection.updateOne({ _id: this.metaDocId }, { $unset: { 'streamState.resumeToken': '', reconciliation: '' } });

            this.onInfo({
                message: `Oplog lost, triggering reconciliation...`,
                code: CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
            });

            // Start stream first to capture new events
            await this.startChangeStream();
            if (this.changeStream) {
                await this.reconciler.reconcile(this.isStoppedTester);
            }
        } else {
            this.onInfo({
                message: `Change Stream error: ${error.message} `,
                code: CODE_REACTIVE_TASK_PLANNER_STREAM_ERROR,
                error: error.message,
            });
            this.callbacks.onStreamError();
        }
    }

    /**
     * Compares each registered task's trigger config and handler version against
     * the state stored in the meta document, persists any differences, and marks
     * tasks whose triggers changed as needing reconciliation.
     */
    private async checkEvolutionStrategies(): Promise<void> {
        const metaDoc = (await this.globalsCollection.findOne({ _id: this.metaDocId })) as MetaDocument | null;
        const storedTasks = metaDoc?.tasks || {};
        const update: Document = {};
        const tasksToReconcile: string[] = [];
        let needsUpdate = false;

        const allTasks = this.registry.getAllTasks();

        for (const taskDef of allTasks) {
            const taskName = taskDef.task;
            const defaultEvolution: EvolutionConfig = {
                handlerVersion: 1,
                onHandlerVersionChange: 'none',
                reconcileOnTriggerChange: true,
            };
            const evolution = { ...defaultEvolution, ...(taskDef.evolution || {}) };

            const storedState = storedTasks[taskName];

            const triggerChanged = this.checkTriggerEvolution(taskName, taskDef, evolution, storedState, update, tasksToReconcile);
            if (triggerChanged) needsUpdate = true;

            const logicChanged = await this.checkLogicEvolution(taskName, taskDef, evolution, storedState, update);
            if (logicChanged) needsUpdate = true;
        }

        if (needsUpdate) {
            debug(`[DEBUG] Updating meta doc with: `, JSON.stringify(update, null, 2));
            await this.globalsCollection.updateOne({ _id: this.metaDocId }, update, { upsert: true });
        } else {
            debug(`[DEBUG] No updates needed for meta doc.`);
        }

        if (tasksToReconcile.length > 0) {
            await this.reconciler.markAsUnreconciled(tasksToReconcile);
        }
    }

    /**
     * Detects a change in a task's trigger config (filter + watchProjection) via
     * stable-JSON signatures. On change, records the new config into `update`
     * (mutated in place) and queues the task for reconciliation unless
     * `reconcileOnTriggerChange` is explicitly false.
     * @returns true when `update` was modified and the meta doc needs saving.
     */
    private checkTriggerEvolution(
        taskName: string,
        taskDef: ReactiveTaskInternal<Document>,
        evolution: EvolutionConfig,
        storedState: { triggerConfig?: { filter: Document; watchProjection: Document }; handlerVersion?: number },
        update: Document,
        tasksToReconcile: string[],
    ): boolean {
        const currentTriggerConfig = {
            filter: taskDef.filter || {},
            watchProjection: taskDef.watchProjection || {},
        };
        const currentTriggerSig = stringify(currentTriggerConfig);
        const storedTriggerSig = storedState?.triggerConfig ? stringify(storedState.triggerConfig) : null;

        if (currentTriggerSig !== storedTriggerSig) {
            const shouldReconcile = evolution.reconcileOnTriggerChange !== false;
            const msg = storedTriggerSig === null ? `Initial trigger config captured for [${taskName}].` : `Trigger config changed for [${taskName}].`;

            if (shouldReconcile) {
                debug(`${msg} Queueing reconciliation.`);
                tasksToReconcile.push(taskName);
            } else {
                debug(`[mongodash] ${msg} Reconciliation disabled.`);
            }

            if (!update.$set) update.$set = {};
            update.$set[`tasks.${taskName}.triggerConfig`] = currentTriggerConfig;
            return true;
        }
        return false;
    }

    /**
     * Detects a handler version bump for a task. On upgrade, applies the
     * configured policy ('reprocess_failed' / 'reprocess_all' reset existing
     * tasks; 'none' does nothing) and stores the new version in `update`.
     * A version lower than stored is only logged (possible rollback). A missing
     * stored state at version 1 is adopted silently ("Safe Adoption").
     * @returns true when `update` was modified and the meta doc needs saving.
     */
    private async checkLogicEvolution(
        taskName: string,
        taskDef: ReactiveTaskInternal<Document>,
        evolution: EvolutionConfig,
        storedState: { triggerConfig?: unknown; handlerVersion?: number },
        update: Document,
    ): Promise<boolean> {
        const currentVersion = evolution.handlerVersion ?? 1;
        // No stored state at all => treat as already at v1; stored state without a version => legacy v0.
        const storedVersion = storedState?.handlerVersion ?? (storedState ? 0 : 1);

        if (currentVersion > storedVersion) {
            const policy = evolution.onHandlerVersionChange || 'none';
            debug(`Handler upgraded for [${taskName}](v${storedVersion} -> v${currentVersion}).Policy: ${policy} `);

            const entry = this.registry.getEntry(taskDef.sourceCollection.collectionName);
            if (entry) {
                if (policy === 'reprocess_failed') {
                    await entry.repository.resetTasksForUpgrade(taskName, 'failed');
                } else if (policy === 'reprocess_all') {
                    await entry.repository.resetTasksForUpgrade(taskName, 'all');
                }
            }

            if (!update.$set) update.$set = {};
            update.$set[`tasks.${taskName}.handlerVersion`] = currentVersion;
            return true;
        } else if (currentVersion < storedVersion) {
            debug(
                `[mongodash] ReactiveTask[${taskName}]: Current handlerVersion(${currentVersion}) is LOWER than stored version(${storedVersion}).Rollback detected ? `,
            );
        } else if (!storedState && currentVersion === 1) {
            // Safe Adoption
            if (!update.$set) update.$set = {};
            update.$set[`tasks.${taskName}.handlerVersion`] = currentVersion;
            return true;
        }
        return false;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc