• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 20699666460

04 Jan 2026 09:53PM UTC coverage: 91.884% (-0.1%) from 91.998%
20699666460

push

github

VaclavObornik
2.4.3

1255 of 1456 branches covered (86.2%)

Branch coverage included in aggregate %.

2017 of 2105 relevant lines covered (95.82%)

374.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.86
/src/reactiveTasks/ReactiveTaskReconciler.ts
1
import * as _debug from 'debug';
228✔
2
import { Document } from 'mongodb';
3
import { GlobalsCollection } from '../globalsCollection';
4
import { OnInfo } from '../OnInfo';
5
import { processInBatches } from '../processInBatches';
228✔
6
import { withLock } from '../withLock';
228✔
7
import { ReactiveTaskOps } from './ReactiveTaskOps';
8
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
9
import {
228✔
10
    CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_FINISHED,
11
    CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
12
    MetaDocument,
13
    REACTIVE_TASK_META_DOC_ID,
14
} from './ReactiveTaskTypes';
15

16
const debug = _debug('mongodash:reactiveTasks:reconciler');
228✔
17

18
/**
19
 * Responsible for reconciling reactive tasks when the Change Stream history is lost or on startup.
20
 *
21
 * Responsibilities:
22
 * - Scans source collections to identify tasks that should exist.
23
 * - Uses `ReactiveTaskOps` to generate and execute task operations.
24
 * - Tracks reconciliation status in the meta document.
25
 */
26
export class ReactiveTaskReconciler {
228✔
27
    constructor(
28
        private instanceId: string,
138✔
29
        private globalsCollection: GlobalsCollection,
138✔
30
        private registry: ReactiveTaskRegistry,
138✔
31
        private ops: ReactiveTaskOps,
138✔
32
        private onInfo: OnInfo,
138✔
33
        private internalOptions: { batchSize: number; batchIntervalMs: number; minBatchIntervalMs: number; getNextCleanupDate: (date?: Date) => Date },
138✔
34
    ) {}
35

36
    private nextCleanupTime: number | null = null;
138✔
37

38
    public async reconcile(shouldStop: () => boolean): Promise<void> {
39
        debug(`[Scheduler ${this.instanceId}] Reconciliation started.`);
124✔
40
        this.onInfo({
124✔
41
            message: `Reconciliation started.`,
42
            code: CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_STARTED,
43
            taskCount: this.registry.getAllTasks().length,
44
        });
45

46
        const metaDoc = (await this.globalsCollection.findOne({ _id: REACTIVE_TASK_META_DOC_ID })) as MetaDocument | null;
124✔
47
        debug(`[Scheduler ${this.instanceId}] Meta doc loaded:`, metaDoc);
124✔
48

49
        // Iterate over all tasks and reconcile
50
        const taskEntries = this.registry.getAllEntries();
124✔
51
        for (const entry of taskEntries) {
124✔
52
            if (shouldStop()) {
128!
53
                debug(`[Scheduler ${this.instanceId}] Reconciliation stopped.`);
×
54
                return;
×
55
            }
56
            debug(`[Scheduler ${this.instanceId}] Reconciling collection: ${entry.tasksCollection.collectionName}`);
128✔
57

58
            // Filter tasks that need reconciliation
59
            const tasksToReconcile = new Set<string>();
128✔
60
            for (const task of entry.tasks.values()) {
128✔
61
                if (!metaDoc?.reconciliation?.[task.task]) {
130!
62
                    tasksToReconcile.add(task.task);
124✔
63
                } else {
64
                    debug(`[Scheduler ${this.instanceId}] Task ${task.task} is already reconciled. Skipping.`);
6✔
65
                }
66
            }
67

68
            if (tasksToReconcile.size === 0) {
128✔
69
                debug(`[Scheduler ${this.instanceId}] No tasks to reconcile for collection ${entry.tasksCollection.collectionName}.`);
6✔
70
                continue;
6✔
71
            }
72

73
            debug(`[Scheduler ${this.instanceId}] Checks for existing reconciliation state for collection: ${entry.tasksCollection.collectionName}`);
122✔
74

75
            // Check for existing checkpoint
76
            const collectionName = entry.sourceCollection.collectionName;
122✔
77
            const checkpoint = metaDoc?.reconciliationState?.[collectionName];
122!
78
            let lastId: unknown = null;
122✔
79
            let resume = false;
122✔
80

81
            if (checkpoint) {
122✔
82
                // Validate if the set of tasks matches
83
                // We must ensure that the tasks currently needing reconciliation are the subset of what was being reconciled
84
                // ACTUALLY: The checkpoint stores the set of tasks that WERE being reconciled.
85
                // If the current set `tasksToReconcile` is DIFFERENT from `checkpoint.taskNames`, we cannot guarantee consistency.
86
                // Example: We were reconciling "A" and "B". Now we need to reconcile "A", "B", "C". We must start over to include "C" for the already processed range.
87
                // Example 2: We were reconciling "A". Now we need "A" and "B". Start over.
88
                // Example 3: We were reconciling "A" and "B". Now we need only "A". Technically we could resume, but for safety/simplicity, we restart if sets enforce strict equality.
89

90
                const savedTasksSet = new Set(checkpoint.taskNames);
2✔
91
                const currentTasksSet = tasksToReconcile;
2✔
92

93
                const areSetsEqual = savedTasksSet.size === currentTasksSet.size && [...savedTasksSet].every((t) => currentTasksSet.has(t));
2✔
94

95
                if (areSetsEqual) {
2✔
96
                    debug(`[Scheduler ${this.instanceId}] Resuming reconciliation for ${collectionName} from id: ${checkpoint.lastId}`);
1✔
97
                    lastId = checkpoint.lastId;
1✔
98
                    resume = true;
1✔
99
                } else {
100
                    debug(`[Scheduler ${this.instanceId}] Reconciliation checkpoint invalid (task definitions changed). Restarting.`);
1✔
101
                }
102
            }
103

104
            const pipeline: Document[] = [
122✔
105
                { $sort: { _id: 1 } }, // Ensure stable sort for checkpointing
106
                { $project: { _id: 1 } },
107
            ];
108

109
            if (resume && lastId) {
122✔
110
                pipeline.unshift({ $match: { _id: { $gt: lastId } } });
1✔
111
            }
112

113
            try {
122✔
114
                // Use processInBatches to iterate over source documents and trigger planning for batches
115
                await processInBatches(
122✔
116
                    entry.sourceCollection,
117
                    pipeline,
118
                    (doc) => doc._id, // Transform to ID
1,329✔
119
                    async (ids) => {
120
                        await this.ops.executePlanningPipeline(entry.tasksCollection.collectionName, ids, tasksToReconcile);
30✔
121

122
                        // Update Checkpoint
123
                        if (ids.length > 0) {
30!
124
                            const lastProcessedId = ids[ids.length - 1];
30✔
125
                            const updatePath = `reconciliationState.${collectionName}`;
30✔
126
                            await this.globalsCollection.updateOne(
30✔
127
                                { _id: REACTIVE_TASK_META_DOC_ID },
128
                                {
129
                                    $set: {
130
                                        [`${updatePath}.lastId`]: lastProcessedId,
131
                                        [`${updatePath}.taskNames`]: Array.from(tasksToReconcile),
132
                                        [`${updatePath}.updatedAt`]: new Date(),
133
                                    },
134
                                },
135
                                { upsert: true },
136
                            );
137
                        }
138
                    },
139
                    {
140
                        batchSize: this.internalOptions.batchSize,
141
                        shouldStop,
142
                    },
143
                );
144

145
                if (shouldStop()) {
122!
146
                    debug(`[Scheduler ${this.instanceId}] Reconciliation stopped during processing. Checkpoint preserved.`);
×
147
                    return;
×
148
                }
149
                debug(`[Scheduler ${this.instanceId}] Reconciled collection: ${entry.tasksCollection.collectionName}`);
122✔
150

151
                // Mark processed tasks as reconciled AND remove checkpoint
152
                const update: Document = {
122✔
153
                    $unset: {
154
                        [`reconciliationState.${collectionName}`]: '',
155
                    },
156
                };
157
                for (const taskName of tasksToReconcile) {
122✔
158
                    const taskDef = entry.tasks.get(taskName);
124✔
159
                    if (taskDef) {
124!
160
                        await entry.repository.deleteOrphanedTasks(
124✔
161
                            taskName,
162
                            entry.sourceCollection.collectionName,
163
                            taskDef.filter || {},
213✔
164
                            taskDef.cleanupPolicyParsed,
165
                            shouldStop,
166
                        );
167
                    }
168
                    update.$set = update.$set || {};
124✔
169
                    update.$set[`reconciliation.${taskName}`] = true;
124✔
170
                }
171

172
                await this.globalsCollection.updateOne({ _id: REACTIVE_TASK_META_DOC_ID }, update, { upsert: true });
122✔
173
            } catch (error) {
174
                debug(`[Scheduler ${this.instanceId}] Error reconciling collection: ${entry.tasksCollection.collectionName}`, error);
×
175
                // Continue with other collections
176
            }
177
        }
178

179
        debug(`[Scheduler ${this.instanceId}] Reconciliation complete.`);
124✔
180

181
        try {
124✔
182
            await this.globalsCollection.updateOne({ _id: REACTIVE_TASK_META_DOC_ID }, { $set: { lastReconciledAt: new Date() } }, { upsert: true });
124✔
183
        } catch (e) {
184
            // Ignore error, metrics are best-effort
185
            debug(`[Scheduler ${this.instanceId}] Failed to update reconciliation timestamp`, e);
×
186
        }
187

188
        this.onInfo({
124✔
189
            message: `Reconciliation complete.`,
190
            code: CODE_REACTIVE_TASK_PLANNER_RECONCILIATION_FINISHED,
191
        });
192
    }
193

194
    public async markAsUnreconciled(taskNames: string[]): Promise<void> {
195
        if (taskNames.length === 0) return;
117!
196
        const update: Document = { $unset: {} };
117✔
197
        for (const task of taskNames) {
117✔
198
            update.$unset[`reconciliation.${task}`] = '';
122✔
199
        }
200
        await this.globalsCollection.updateOne({ _id: REACTIVE_TASK_META_DOC_ID }, update, { upsert: true });
117✔
201
    }
202

203
    public async performPeriodicCleanup(shouldStop: () => boolean): Promise<void> {
204
        const now = Date.now();
140✔
205

206
        // Fast path: check in-memory cache first to avoid database query
207
        if (this.nextCleanupTime !== null && now < this.nextCleanupTime) {
140✔
208
            return;
1✔
209
        }
210

211
        // Slow path: query database to get accurate lastCleanupAt
212
        const metaDoc = (await this.globalsCollection.findOne({ _id: REACTIVE_TASK_META_DOC_ID })) as MetaDocument | null;
139✔
213
        const lastCleanupDate = metaDoc?.lastCleanupAt ? new Date(metaDoc.lastCleanupAt) : undefined;
139!
214

215
        // Calculate next run time based on last run
216
        const nextRun = this.internalOptions.getNextCleanupDate(lastCleanupDate);
139✔
217
        this.nextCleanupTime = nextRun.getTime();
139✔
218

219
        if (now < nextRun.getTime()) {
139✔
220
            return;
131✔
221
        }
222

223
        // Acquire lock to prevent parallel runs during deployment transitions
224
        const lockKey = `${REACTIVE_TASK_META_DOC_ID}:cleanup`;
8✔
225
        try {
8✔
226
            await withLock(
8✔
227
                lockKey,
228
                async () => {
229
                    // Double-check after acquiring lock
230
                    const freshMetaDoc = (await this.globalsCollection.findOne({ _id: REACTIVE_TASK_META_DOC_ID })) as MetaDocument | null;
8✔
231
                    const freshLastCleanupDate = freshMetaDoc?.lastCleanupAt ? new Date(freshMetaDoc.lastCleanupAt) : undefined;
8!
232

233
                    const freshNextRun = this.internalOptions.getNextCleanupDate(freshLastCleanupDate);
8✔
234

235
                    if (now < freshNextRun.getTime()) {
8!
236
                        this.nextCleanupTime = freshNextRun.getTime();
×
237
                        return;
×
238
                    }
239

240
                    debug(`[Scheduler ${this.instanceId}] Starting periodic cleanup of orphaned tasks.`);
8✔
241

242
                    const entries = this.registry.getAllEntries();
8✔
243
                    for (const entry of entries) {
8✔
244
                        if (shouldStop()) return;
8!
245
                        for (const task of entry.tasks.values()) {
8✔
246
                            await entry.repository.deleteOrphanedTasks(
8✔
247
                                task.task,
248
                                entry.sourceCollection.collectionName,
249
                                task.filter || {},
8!
250
                                task.cleanupPolicyParsed,
251
                                shouldStop,
252
                            );
253
                        }
254
                    }
255

256
                    // Update lastCleanupAt in meta document
257
                    const cleanupTime = new Date();
8✔
258
                    await this.globalsCollection.updateOne({ _id: REACTIVE_TASK_META_DOC_ID }, { $set: { lastCleanupAt: cleanupTime } }, { upsert: true });
8✔
259

260
                    // Update in-memory cache with next cleanup time
261
                    this.nextCleanupTime = this.internalOptions.getNextCleanupDate(cleanupTime).getTime();
8✔
262
                },
263
                { maxWaitForLock: 1000, expireIn: 5 * 60 * 1000 },
264
            );
265
        } catch {
266
            // Lock already acquired by another process, skip this run
267
            debug(`[Scheduler ${this.instanceId}] Cleanup skipped - lock already acquired`);
×
268
        }
269
    }
270
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc