• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 20691682217

04 Jan 2026 10:40AM UTC coverage: 91.944% (+0.1%) from 91.81%
20691682217

push

github

VaclavObornik
2.4.1

1255 of 1452 branches covered (86.43%)

Branch coverage included in aggregate %.

2009 of 2098 relevant lines covered (95.76%)

379.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.43
/src/reactiveTasks/ReactiveTaskWorker.ts
1
import * as _debug from 'debug';
221✔
2
import { Document, Filter, FindOptions } from 'mongodb';
3
import { createContinuousLock } from '../createContinuousLock';
221✔
4
import { onError } from '../OnError';
221✔
5
import { onInfo } from '../OnInfo';
221✔
6
import { compileWatchProjection } from './compileWatchProjection';
221✔
7
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
8
import {
221✔
9
    CODE_REACTIVE_TASK_FAILED,
10
    CODE_REACTIVE_TASK_FINISHED,
11
    CODE_REACTIVE_TASK_STARTED,
12
    ReactiveTaskCaller,
13
    ReactiveTaskContext,
14
    ReactiveTaskFilter,
15
    ReactiveTaskRecord,
16
    TaskConditionFailedError,
17
} from './ReactiveTaskTypes';
18

19
import { MetricsCollector } from './MetricsCollector';
20

21
export interface WorkerCallbacks {
22
    onTaskFound: (collectionName: string) => void;
23
}
24

25
const debug = _debug('mongodash:reactiveTasks:worker');
221✔
26

27
/**
28
 * Responsible for executing reactive tasks.
29
 *
30
 * Responsibilities:
31
 * - Polls for pending tasks from the database.
32
 * - Applies filtering to restrict which tasks this worker processes.
33
 * - Locks tasks during execution to prevent concurrent processing.
34
 * - Fetches the source document and executes the user-defined handler.
35
 * - Handles task completion, failure, retries, and dead-letter queueing.
36
 * - Manages the visibility timeout lock extension.
37
 */
38
export class ReactiveTaskWorker {
221✔
39
    private taskCaller: ReactiveTaskCaller;
40
    private throttledUntil = new Map<string, Date>();
132✔
41

42
    constructor(
43
        private instanceId: string,
132✔
44
        private registry: ReactiveTaskRegistry,
132✔
45
        private callbacks: WorkerCallbacks,
132✔
46
        private internalOptions: { visibilityTimeoutMs: number } = { visibilityTimeoutMs: 300000 },
132!
47
        taskCaller?: ReactiveTaskCaller,
48
        private taskFilter?: ReactiveTaskFilter,
132✔
49
        private metricsCollector?: MetricsCollector,
132✔
50
    ) {
51
        this.taskCaller = taskCaller || ((task) => task());
132✔
52
    }
53

54
    public async tryRunATask(collectionName: string): Promise<void> {
55
        const entry = this.registry.getEntry(collectionName);
1,725✔
56

57
        let tasks = this.registry.getAllTasks();
1,725✔
58
        if (this.taskFilter) {
1,725✔
59
            tasks = tasks.filter((t) => this.taskFilter!({ task: t.task }));
16✔
60
        }
61
        if (!tasks.length) {
1,725✔
62
            return;
4✔
63
        }
64

65
        // Filter out throttled tasks
66
        const now = Date.now();
1,721✔
67
        tasks = tasks.filter((t) => {
1,721✔
68
            const until = this.throttledUntil.get(t.task);
3,688✔
69
            if (until && until.getTime() > now) {
3,688✔
70
                return false;
331✔
71
            }
72
            if (until) {
3,357✔
73
                this.throttledUntil.delete(t.task); // Cleanup expired throttle
4✔
74
            }
75
            return true;
3,357✔
76
        });
77

78
        if (!tasks.length) {
1,721!
79
            return;
×
80
        }
81

82
        const taskRecord = await entry.repository.findAndLockNextTask(tasks, {
1,721✔
83
            visibilityTimeoutMs: this.internalOptions.visibilityTimeoutMs,
84
        });
85
        if (taskRecord) {
1,721✔
86
            this.callbacks.onTaskFound(collectionName);
690✔
87
            await this.processTask(taskRecord);
690✔
88
        }
89
    }
90

91
    private async processTask(taskRecord: ReactiveTaskRecord<Document>): Promise<void> {
92
        const taskDef = this.registry.getTask(taskRecord.task)!;
690✔
93
        const tasksCollection = taskDef.tasksCollection;
690✔
94

95
        let deferredTo: Date | undefined;
96
        let throttledUntil: Date | undefined;
97

98
        let isManuallyFinalized = false;
690✔
99

100
        const finalizeTaskSuccess = async (duration: number, session?: import('mongodb').ClientSession) => {
690✔
101
            this.metricsCollector?.recordTaskExecution(taskRecord.task, 'success', duration);
653!
102

103
            const entry = this.registry.getEntry(tasksCollection.collectionName);
653✔
104
            await entry.repository.finalizeTask(
653✔
105
                taskRecord,
106
                taskDef.retryStrategy,
107
                undefined,
108
                taskDef.debounceMs,
109
                { durationMs: duration },
110
                taskDef.executionHistoryLimit,
111
                session ? { session } : undefined,
653✔
112
            );
113
        };
114

115
        const context: ReactiveTaskContext<Document> = {
690✔
116
            docId: taskRecord.sourceDocId,
117
            watchedValues: taskRecord.lastObservedValues || null,
690!
118
            getDocument: async (options?: FindOptions) => {
119
                const queryConditions: Filter<Document>[] = [{ _id: taskRecord.sourceDocId }];
63✔
120
                if (taskDef.filter) {
63✔
121
                    queryConditions.push({ $expr: taskDef.filter });
14✔
122
                }
123

124
                if (taskRecord.lastObservedValues && Object.keys(taskRecord.lastObservedValues).length > 0) {
63!
125
                    // Optimistic Locking: Ensure watched values match what triggered the task
126
                    // We use the same projection logic as the planner to compare current DB state vs snapshot
127
                    const projectionExpr = compileWatchProjection(taskDef.watchProjection);
63✔
128
                    queryConditions.push({ $expr: { $eq: [projectionExpr, taskRecord.lastObservedValues] } });
63✔
129
                }
130

131
                const query = (queryConditions.length > 1 ? { $and: queryConditions } : queryConditions[0]) as Filter<Document>;
63!
132
                const sourceDoc = await taskDef.sourceCollection.findOne(query, options);
63✔
133

134
                if (!sourceDoc) {
63✔
135
                    throw new TaskConditionFailedError();
3✔
136
                }
137

138
                return sourceDoc;
60✔
139
            },
140
            deferCurrent: (delay: number | Date) => {
141
                deferredTo = typeof delay === 'number' ? new Date(Date.now() + delay) : delay;
2✔
142
            },
143
            throttleAll: (until: number | Date) => {
144
                throttledUntil = typeof until === 'number' ? new Date(Date.now() + until) : until;
4✔
145
            },
146
            markCompleted: async (options?: { session?: import('mongodb').ClientSession }) => {
147
                if (isManuallyFinalized) {
5✔
148
                    return; // Idempotent
1✔
149
                }
150

151
                isManuallyFinalized = true;
4✔
152
                const duration = Date.now() - start;
4✔
153

154
                try {
4✔
155
                    await finalizeTaskSuccess(duration, options?.session);
4✔
156
                } catch (error) {
157
                    isManuallyFinalized = false;
×
158
                    throw error;
×
159
                }
160
            },
161
        };
162

163
        const stopLock = createContinuousLock(tasksCollection, taskRecord._id, 'nextRunAt', this.internalOptions.visibilityTimeoutMs);
690✔
164

165
        const processTheTask = async () => {
690✔
166
            const start = Date.now();
690✔
167
            onInfo({
690✔
168
                message: `Reactive task '${taskRecord.task}' started.`,
169
                taskId: taskRecord._id.toString(),
170
                code: CODE_REACTIVE_TASK_STARTED,
171
            });
172

173
            try {
690✔
174
                await taskDef.handler(context);
690✔
175

176
                const duration = Date.now() - start;
650✔
177
                onInfo({
650✔
178
                    message: `Reactive task '${taskRecord.task}' finished in ${duration}ms.`,
179
                    taskId: taskRecord._id.toString(),
180
                    code: CODE_REACTIVE_TASK_FINISHED,
181
                    duration,
182
                });
183
            } catch (err) {
184
                if (err instanceof TaskConditionFailedError) {
40✔
185
                    const duration = Date.now() - start;
3✔
186
                    debug(
3✔
187
                        `[Scheduler ${this.instanceId}] Source document ${taskRecord.sourceDocId} not found or does not match filter for task ${taskRecord._id}. Marking as completed (skipped).`,
188
                    );
189
                    onInfo({
3✔
190
                        message: `Reactive task '${taskRecord.task}' finished in ${duration}ms (skipped - filter mismatch).`,
191
                        taskId: taskRecord._id.toString(),
192
                        code: CODE_REACTIVE_TASK_FINISHED,
193
                        duration,
194
                    });
195
                    // Treat as success
196
                    return;
3✔
197
                }
198

199
                onError(err as Error);
37✔
200

201
                const duration = Date.now() - start;
37✔
202
                const reason = err instanceof Error ? err.message : `${err}`;
37!
203
                onInfo({
37✔
204
                    message: `Reactive task '${taskRecord.task}' failed in ${duration}ms.`,
205
                    taskId: taskRecord._id.toString(),
206
                    code: CODE_REACTIVE_TASK_FAILED,
207
                    reason,
208
                    duration,
209
                });
210
                throw err;
37✔
211
            }
212
        };
213

214
        const start = Date.now();
690✔
215

216
        if (taskRecord.attempts > 1) {
690✔
217
            this.metricsCollector?.recordRetry(taskRecord.task);
22!
218
        }
219

220
        try {
690✔
221
            await this.taskCaller(processTheTask);
690✔
222
            await stopLock();
653✔
223
            const duration = Date.now() - start;
653✔
224

225
            if (throttledUntil) {
653✔
226
                this.throttledUntil.set(taskRecord.task, throttledUntil);
4✔
227
                debug(`[Scheduler ${this.instanceId}] Throttling task '${taskRecord.task}' until ${throttledUntil.toISOString()}`);
4✔
228
            }
229

230
            if (deferredTo) {
653✔
231
                if (isManuallyFinalized) {
2!
232
                    onInfo({
×
233
                        message: `[ReactiveTask] Task '${taskRecord.task}' (ID: ${taskRecord._id}) was manually marked as completed, but deferCurrent() was also called. Ignoring defer request.`,
234
                        code: 'reactiveTaskDeferIgnored',
235
                        taskId: taskRecord._id.toString(),
236
                    });
237
                    return;
×
238
                }
239

240
                debug(`[Scheduler ${this.instanceId}] Deferring task '${taskRecord.task}' until ${deferredTo.toISOString()}`);
2✔
241
                const entry = this.registry.getEntry(tasksCollection.collectionName);
2✔
242
                await entry.repository.deferTask(taskRecord, deferredTo);
2✔
243
                return;
2✔
244
            }
245

246
            if (!isManuallyFinalized) {
651✔
247
                await finalizeTaskSuccess(duration);
649✔
248
            }
249
        } catch (error) {
250
            // Logging is already done in processTheTask via onInfo
251
            await stopLock();
37✔
252
            const duration = Date.now() - start;
37✔
253

254
            this.metricsCollector?.recordTaskExecution(taskRecord.task, 'failed', duration);
37!
255

256
            const entry = this.registry.getEntry(tasksCollection.collectionName);
37✔
257

258
            await entry.repository.finalizeTask(
37✔
259
                taskRecord,
260
                taskDef.retryStrategy,
261
                error as Error,
262
                taskDef.debounceMs,
263
                { durationMs: duration },
264
                taskDef.executionHistoryLimit,
265
            );
266
        }
267
    }
268
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc