• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 24607504909

18 Apr 2026 03:09PM UTC coverage: 91.218% (+0.09%) from 91.133%
24607504909

push

github

VaclavObornik
fix: address Copilot review round 11

1. waitUntilReactiveTasksIdle (whitelist mode) now always checks the
   planner buffer. Skipping it entirely let the helper return idle
   before an in-flight change event had a chance to be turned into
   the task record the DB check looks for. Only the worker-count
   check, which is global in scope, is still skipped, so unrelated
   tests' worker pools cannot block a scoped wait.

2. docs/reactive-tasks/testing.md now notes the narrow race where
   triggering a write and immediately calling the scoped wait can
   return early if stabilityDurationMs is too short, with a
   recommended workaround (raise stabilityDurationMs or pre-poll
   for the task record).

3. cronTasksConcurrency 'no polls after stop' assertion tightened:
   stopCronTasks is fire-and-forget, so a worker already inside
   findOneAndUpdate at the time could still tick the spy. Capture
   the poll count after a 200ms grace period and assert it does
   not grow over a full 5s back-off window, instead of requiring
   spy.called === false outright.

1492 of 1748 branches covered (85.35%)

Branch coverage included in aggregate %.

3 of 3 new or added lines in 1 file covered. (100.0%)

54 existing lines in 9 files now uncovered.

2320 of 2431 relevant lines covered (95.43%)

373.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.16
/src/reactiveTasks/ReactiveTaskWorker.ts
1
import * as _debug from 'debug';
263✔
2
import { Document, Filter, FindOptions } from 'mongodb';
3
import { createContinuousLock } from '../createContinuousLock';
263✔
4
import { onError } from '../OnError';
263✔
5
import { onInfo } from '../OnInfo';
263✔
6
import { compileWatchProjection } from './compileWatchProjection';
263✔
7
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
8
import {
263✔
9
    CODE_REACTIVE_TASK_FAILED,
10
    CODE_REACTIVE_TASK_FINISHED,
11
    CODE_REACTIVE_TASK_LOCK_LOST,
12
    CODE_REACTIVE_TASK_STARTED,
13
    ReactiveTaskCaller,
14
    ReactiveTaskContext,
15
    ReactiveTaskFilter,
16
    ReactiveTaskRecord,
17
    TaskConditionFailedError,
18
} from './ReactiveTaskTypes';
19

20
import { MetricsCollector } from './MetricsCollector';
21

22
/**
 * Hooks through which a worker reports progress back to its owner.
 */
export interface WorkerCallbacks {
    /**
     * Invoked with the name of the polled collection right after a pending
     * task has been found and locked, before the task handler is executed.
     */
    onTaskFound: (collectionName: string) => void;
}
25

26
// Namespaced `debug` logger for this module; used below for verbose
// diagnostics (skipped, throttled and deferred tasks).
const debug = _debug('mongodash:reactiveTasks:worker');
263✔
27

28
/**
 * Responsible for executing reactive tasks.
 *
 * Responsibilities:
 * - Polls for pending tasks from the database.
 * - Applies filtering to restrict which tasks this worker processes.
 * - Locks tasks during execution to prevent concurrent processing.
 * - Fetches the source document and executes the user-defined handler.
 * - Handles task completion, failure, retries, and dead-letter queueing.
 * - Manages the visibility timeout lock extension.
 */
export class ReactiveTaskWorker {
    /** Wrapper every task execution is routed through; defaults to a plain call. */
    private taskCaller: ReactiveTaskCaller;

    /** Task name -> instant until which this worker must not pick that task up. */
    private throttledUntil = new Map<string, Date>();

    /**
     * @param instanceId Identifier of this worker instance; only used in debug output.
     * @param registry Source of task definitions and per-collection entries/repositories.
     * @param callbacks Hooks notified about worker progress.
     * @param internalOptions `visibilityTimeoutMs` is passed both to the task lock
     *   acquisition and to the continuous lock renewal. Defaults to 300000 ms.
     * @param taskCaller Optional wrapper around each task execution.
     * @param taskFilter Optional predicate restricting which tasks this worker handles.
     * @param metricsCollector Optional sink for execution / retry / lock-lost metrics.
     */
    constructor(
        private instanceId: string,
        private registry: ReactiveTaskRegistry,
        private callbacks: WorkerCallbacks,
        private internalOptions: { visibilityTimeoutMs: number } = { visibilityTimeoutMs: 300000 },
        taskCaller?: ReactiveTaskCaller,
        private taskFilter?: ReactiveTaskFilter,
        private metricsCollector?: MetricsCollector,
    ) {
        this.taskCaller = taskCaller ?? ((task) => task());
    }

    /**
     * Tries to lock and execute a single pending task for the given collection.
     *
     * Candidate tasks are all registered tasks, narrowed by the optional task
     * filter and by any throttle still in effect. Resolves without doing
     * anything when no candidate remains or when no task could be locked.
     *
     * @param collectionName Name used to look up the registry entry whose
     *   repository is polled.
     */
    public async tryRunATask(collectionName: string): Promise<void> {
        const entry = this.registry.getEntry(collectionName);

        let tasks = this.registry.getAllTasks();

        // Capture the filter in a local so the closure needs no non-null assertion.
        const taskFilter = this.taskFilter;
        if (taskFilter) {
            tasks = tasks.filter((t) => taskFilter({ task: t.task }));
        }
        if (!tasks.length) {
            return;
        }

        // Filter out throttled tasks
        const now = Date.now();
        tasks = tasks.filter((t) => {
            const until = this.throttledUntil.get(t.task);
            if (until && until.getTime() > now) {
                return false;
            }
            if (until) {
                this.throttledUntil.delete(t.task); // Cleanup expired throttle
            }
            return true;
        });

        if (!tasks.length) {
            return;
        }

        const taskRecord = await entry.repository.findAndLockNextTask(tasks, {
            visibilityTimeoutMs: this.internalOptions.visibilityTimeoutMs,
        });
        if (taskRecord) {
            this.callbacks.onTaskFound(collectionName);
            await this.processTask(taskRecord);
        }
    }

    /**
     * Runs the handler of an already locked task and persists the outcome.
     *
     * While the handler runs, a continuous lock keeps renewing `nextRunAt`.
     * Afterwards the task is finalized as success, deferred, or finalized as
     * failure. Every write is skipped once the lock is known to be lost, so a
     * worker that re-claimed the task is not overwritten.
     *
     * @param taskRecord The task record returned by `findAndLockNextTask`.
     */
    private async processTask(taskRecord: ReactiveTaskRecord<Document>): Promise<void> {
        const taskDef = this.registry.getTask(taskRecord.task)!;
        const tasksCollection = taskDef.tasksCollection;

        // Requests recorded by the handler through the context; applied after it returns.
        let deferredTo: Date | undefined;
        let throttleAllUntil: Date | undefined;

        let isManuallyFinalized = false;
        let lockLost = false;

        // No-op placeholder until the continuous lock is created below.
        // markCompleted calls it to halt renewal *before* the finalize write
        // changes nextRunAt. Without this the next CAS renewal would see its
        // expected value no longer present and falsely report onLockLost for
        // a completion the same worker performed.
        let stopLock: () => Promise<void> = async () => {};

        // Persists a successful run unless the lock is already known to be lost.
        const finalizeTaskSuccess = async (duration: number, session?: import('mongodb').ClientSession) => {
            if (lockLost) {
                // Lock was stolen mid-handler. The new owner's claim rewrote
                // nextRunAt; writing completion here would either clobber that
                // claim or violate the at-least-once contract by marking the
                // task complete before the new owner finishes.
                return;
            }

            // Record the metric eagerly so scrapes that happen between the
            // handler returning and this DB write observing the success are
            // not misled. The finalize CAS below is a second line of defence
            // against a stolen lock; if it fires we surface lock-lost but
            // keep the duration sample (the handler did run successfully).
            this.metricsCollector?.recordTaskExecution(taskRecord.task, 'success', duration);

            const entry = this.registry.getEntry(tasksCollection.collectionName);
            const finalized = await entry.repository.finalizeTask(
                taskRecord,
                taskDef.retryStrategy,
                undefined,
                taskDef.debounceMs,
                { durationMs: duration },
                taskDef.executionHistoryLimit,
                session ? { session } : undefined,
            );

            if (!finalized && !lockLost) {
                // Silent lock loss: startedAt changed out from under us
                // between the continuous-lock CAS renewal ticks. Flip the
                // flag so later paths skip their own writes, bump the
                // lock-lost counter, and emit onInfo for operator visibility.
                lockLost = true;
                this.metricsCollector?.recordLockLost(taskRecord.task);
                onInfo({
                    message: `Reactive task '${taskRecord.task}' finalize skipped - lock lost (startedAt mismatch). Another worker is handling this task.`,
                    taskId: taskRecord._id.toString(),
                    code: CODE_REACTIVE_TASK_LOCK_LOST,
                });
            }
        };

        // The object handed to the user-defined handler.
        const context: ReactiveTaskContext<Document> = {
            docId: taskRecord.sourceDocId,
            watchedValues: taskRecord.lastObservedValues || null,
            // Loads the source document; throws TaskConditionFailedError when it
            // is missing, no longer passes the task filter, or its watched
            // values differ from the snapshot stored on the task record.
            getDocument: async (options?: FindOptions) => {
                const queryConditions: Filter<Document>[] = [{ _id: taskRecord.sourceDocId }];
                if (taskDef.filter) {
                    queryConditions.push({ $expr: taskDef.filter });
                }

                if (taskRecord.lastObservedValues && Object.keys(taskRecord.lastObservedValues).length > 0) {
                    // Optimistic Locking: Ensure watched values match what triggered the task
                    // We use the same projection logic as the planner to compare current DB state vs snapshot
                    const projectionExpr = compileWatchProjection(taskDef.watchProjection);
                    queryConditions.push({ $expr: { $eq: [projectionExpr, taskRecord.lastObservedValues] } });
                }

                const query = (queryConditions.length > 1 ? { $and: queryConditions } : queryConditions[0]) as Filter<Document>;
                const sourceDoc = await taskDef.sourceCollection.findOne(query, options);

                if (!sourceDoc) {
                    throw new TaskConditionFailedError();
                }

                return sourceDoc;
            },
            // A number is a delay in milliseconds from now; a Date is used as is.
            deferCurrent: (delay: number | Date) => {
                deferredTo = typeof delay === 'number' ? new Date(Date.now() + delay) : delay;
            },
            // A number is a duration in milliseconds from now; a Date is used as is.
            throttleAll: (until: number | Date) => {
                throttleAllUntil = typeof until === 'number' ? new Date(Date.now() + until) : until;
            },
            markCompleted: async (options?: { session?: import('mongodb').ClientSession }) => {
                if (isManuallyFinalized) {
                    return; // Idempotent
                }

                isManuallyFinalized = true;
                // `start` is the outer constant declared further down; it is
                // initialised before the handler can possibly call this.
                const duration = Date.now() - start;

                // Stop the continuous-lock renewal *before* finalize writes a
                // new nextRunAt. Otherwise an in-flight renewal CAS would see
                // its expected value overwritten and report a false
                // onLockLost for a completion the same worker performed.
                await stopLock();

                try {
                    await finalizeTaskSuccess(duration, options?.session);
                } catch (error) {
                    // Allow a later finalize attempt, since this one did not persist.
                    isManuallyFinalized = false;
                    throw error;
                }
            },
        };

        stopLock = createContinuousLock(tasksCollection, taskRecord._id, 'nextRunAt', this.internalOptions.visibilityTimeoutMs, {
            expectedInitialValue: taskRecord.nextRunAt,
            onLockLost: () => {
                lockLost = true;
                this.metricsCollector?.recordLockLost(taskRecord.task);
                onInfo({
                    message: `Reactive task '${taskRecord.task}' lock lost - another worker took over (likely visibility timeout elapsed). Skipping finalize to preserve new claim.`,
                    taskId: taskRecord._id.toString(),
                    code: CODE_REACTIVE_TASK_LOCK_LOST,
                });
            },
        });

        // Runs the handler and emits the started / finished / failed
        // notifications. Rethrows handler errors, except for
        // TaskConditionFailedError which counts as a successful (skipped) run.
        const processTheTask = async () => {
            const start = Date.now();
            onInfo({
                message: `Reactive task '${taskRecord.task}' started.`,
                taskId: taskRecord._id.toString(),
                code: CODE_REACTIVE_TASK_STARTED,
            });

            try {
                await taskDef.handler(context);

                const duration = Date.now() - start;
                onInfo({
                    message: `Reactive task '${taskRecord.task}' finished in ${duration}ms.`,
                    taskId: taskRecord._id.toString(),
                    code: CODE_REACTIVE_TASK_FINISHED,
                    duration,
                });
            } catch (err) {
                if (err instanceof TaskConditionFailedError) {
                    const duration = Date.now() - start;
                    debug(
                        `[Scheduler ${this.instanceId}] Source document ${taskRecord.sourceDocId} not found or does not match filter for task ${taskRecord._id}. Marking as completed (skipped).`,
                    );
                    onInfo({
                        message: `Reactive task '${taskRecord.task}' finished in ${duration}ms (skipped - filter mismatch).`,
                        taskId: taskRecord._id.toString(),
                        code: CODE_REACTIVE_TASK_FINISHED,
                        duration,
                    });
                    // Treat as success
                    return;
                }

                onError(err as Error);

                const duration = Date.now() - start;
                const reason = err instanceof Error ? err.message : `${err}`;
                onInfo({
                    message: `Reactive task '${taskRecord.task}' failed in ${duration}ms.`,
                    taskId: taskRecord._id.toString(),
                    code: CODE_REACTIVE_TASK_FAILED,
                    reason,
                    duration,
                });
                throw err;
            }
        };

        const start = Date.now();

        // The retry metric is recorded only when the record reports more than one attempt.
        if (taskRecord.attempts > 1) {
            this.metricsCollector?.recordRetry(taskRecord.task);
        }

        try {
            await this.taskCaller(processTheTask);
            await stopLock();
            const duration = Date.now() - start;

            // The throttle is in-memory only and is applied even when the lock was lost.
            if (throttleAllUntil) {
                this.throttledUntil.set(taskRecord.task, throttleAllUntil);
                debug(`[Scheduler ${this.instanceId}] Throttling task '${taskRecord.task}' until ${throttleAllUntil.toISOString()}`);
            }

            if (lockLost) {
                // Another worker took over this task. Skip state transitions to
                // avoid stomping on the new owner's updates. Side effects done
                // by the handler have executed (at-least-once), the replacement
                // worker will run the task again and finalize it.
                return;
            }

            if (deferredTo) {
                if (isManuallyFinalized) {
                    onInfo({
                        message: `[ReactiveTask] Task '${taskRecord.task}' (ID: ${taskRecord._id}) was manually marked as completed, but deferCurrent() was also called. Ignoring defer request.`,
                        code: 'reactiveTaskDeferIgnored',
                        taskId: taskRecord._id.toString(),
                    });
                    return;
                }

                debug(`[Scheduler ${this.instanceId}] Deferring task '${taskRecord.task}' until ${deferredTo.toISOString()}`);
                const entry = this.registry.getEntry(tasksCollection.collectionName);
                await entry.repository.deferTask(taskRecord, deferredTo);
                return;
            }

            if (!isManuallyFinalized) {
                await finalizeTaskSuccess(duration);
            }
        } catch (error) {
            // Handler failures were already reported by processTheTask via
            // onError/onInfo. Errors thrown by the post-handler writes in the
            // try block above also land here and are finalized as failures,
            // but are not reported separately.
            await stopLock();
            const duration = Date.now() - start;

            if (lockLost) {
                // Skip both metrics and finalize: the new owner will execute
                // and record its own metrics. See the success branch above
                // for the reasoning behind skipping finalize.
                return;
            }

            this.metricsCollector?.recordTaskExecution(taskRecord.task, 'failed', duration);

            const entry = this.registry.getEntry(tasksCollection.collectionName);

            const finalized = await entry.repository.finalizeTask(
                taskRecord,
                taskDef.retryStrategy,
                error as Error,
                taskDef.debounceMs,
                { durationMs: duration },
                taskDef.executionHistoryLimit,
            );

            if (!finalized) {
                // Silent lock loss (startedAt no longer matches): another
                // worker has re-claimed the task. The failure metric is
                // kept (the handler did throw) but we surface lock-lost
                // so operators can see why retry scheduling / dead-letter
                // transitions did not persist.
                this.metricsCollector?.recordLockLost(taskRecord.task);
                onInfo({
                    message: `Reactive task '${taskRecord.task}' error-finalize skipped - lock lost (startedAt mismatch). Another worker is handling this task.`,
                    taskId: taskRecord._id.toString(),
                    code: CODE_REACTIVE_TASK_LOCK_LOST,
                });
            }
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc