• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 20560403433

28 Dec 2025 10:26PM UTC coverage: 91.897% (+0.06%) from 91.836%
20560403433

push

github

VaclavObornik
2.3.0

1250 of 1452 branches covered (86.09%)

Branch coverage included in aggregate %.

2005 of 2090 relevant lines covered (95.93%)

381.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.45
/src/reactiveTasks/ReactiveTaskWorker.ts
1
import * as _debug from 'debug';
218✔
2
import { Document, Filter, FindOptions } from 'mongodb';
3
import { createContinuousLock } from '../createContinuousLock';
218✔
4
import { defaultOnError, OnError } from '../OnError';
218✔
5
import { defaultOnInfo, OnInfo } from '../OnInfo';
218✔
6
import { compileWatchProjection } from './compileWatchProjection';
218✔
7
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
8
import {
218✔
9
    CODE_REACTIVE_TASK_FAILED,
10
    CODE_REACTIVE_TASK_FINISHED,
11
    CODE_REACTIVE_TASK_STARTED,
12
    ReactiveTaskCaller,
13
    ReactiveTaskContext,
14
    ReactiveTaskFilter,
15
    ReactiveTaskRecord,
16
    TaskConditionFailedError,
17
} from './ReactiveTaskTypes';
18

19
import { MetricsCollector } from './MetricsCollector';
20

21
export interface WorkerCallbacks {
22
    onTaskFound: (collectionName: string) => void;
23
}
24

25
const debug = _debug('mongodash:reactiveTasks:worker');
218✔
26

27
/**
28
 * Responsible for executing reactive tasks.
29
 *
30
 * Responsibilities:
31
 * - Polls for pending tasks from the database.
32
 * - Applies filtering to restrict which tasks this worker processes.
33
 * - Locks tasks during execution to prevent concurrent processing.
34
 * - Fetches the source document and executes the user-defined handler.
35
 * - Handles task completion, failure, retries, and dead-letter queueing.
36
 * - Manages the visibility timeout lock extension.
37
 */
38
export class ReactiveTaskWorker {
218✔
39
    private taskCaller: ReactiveTaskCaller;
40
    private throttledUntil = new Map<string, Date>();
131✔
41

42
    constructor(
43
        private instanceId: string,
131✔
44
        private registry: ReactiveTaskRegistry,
131✔
45
        private callbacks: WorkerCallbacks,
131✔
46
        private internalOptions: { visibilityTimeoutMs: number } = { visibilityTimeoutMs: 300000 },
131!
47
        taskCaller?: ReactiveTaskCaller,
48
        private taskFilter?: ReactiveTaskFilter,
131✔
49
        private onInfo: OnInfo = defaultOnInfo,
131!
50
        private onError: OnError = defaultOnError,
131!
51
        private metricsCollector?: MetricsCollector,
131✔
52
    ) {
53
        this.taskCaller = taskCaller || ((task) => task());
131✔
54
    }
55

56
    public async tryRunATask(collectionName: string): Promise<void> {
57
        const entry = this.registry.getEntry(collectionName);
1,837✔
58

59
        let tasks = this.registry.getAllTasks();
1,837✔
60
        if (this.taskFilter) {
1,837✔
61
            tasks = tasks.filter((t) => this.taskFilter!({ task: t.task }));
18✔
62
        }
63
        if (!tasks.length) {
1,837✔
64
            return;
4✔
65
        }
66

67
        // Filter out throttled tasks
68
        const now = Date.now();
1,833✔
69
        tasks = tasks.filter((t) => {
1,833✔
70
            const until = this.throttledUntil.get(t.task);
3,924✔
71
            if (until && until.getTime() > now) {
3,924✔
72
                return false;
321✔
73
            }
74
            if (until) {
3,603✔
75
                this.throttledUntil.delete(t.task); // Cleanup expired throttle
4✔
76
            }
77
            return true;
3,603✔
78
        });
79

80
        if (!tasks.length) {
1,833!
81
            return;
×
82
        }
83

84
        const taskRecord = await entry.repository.findAndLockNextTask(tasks, {
1,833✔
85
            visibilityTimeoutMs: this.internalOptions.visibilityTimeoutMs,
86
        });
87
        if (taskRecord) {
1,833✔
88
            this.callbacks.onTaskFound(collectionName);
687✔
89
            await this.processTask(taskRecord);
687✔
90
        }
91
    }
92

93
    private async processTask(taskRecord: ReactiveTaskRecord<Document>): Promise<void> {
94
        const taskDef = this.registry.getTask(taskRecord.task)!;
687✔
95
        const tasksCollection = taskDef.tasksCollection;
687✔
96

97
        let deferredTo: Date | undefined;
98
        let throttledUntil: Date | undefined;
99

100
        let isManuallyFinalized = false;
687✔
101

102
        const finalizeTaskSuccess = async (duration: number, session?: import('mongodb').ClientSession) => {
687✔
103
            this.metricsCollector?.recordTaskExecution(taskRecord.task, 'success', duration);
651!
104

105
            const entry = this.registry.getEntry(tasksCollection.collectionName);
651✔
106
            await entry.repository.finalizeTask(
651✔
107
                taskRecord,
108
                taskDef.retryStrategy,
109
                undefined,
110
                taskDef.debounceMs,
111
                { durationMs: duration },
112
                taskDef.executionHistoryLimit,
113
                session ? { session } : undefined,
651✔
114
            );
115
        };
116

117
        const context: ReactiveTaskContext<Document> = {
687✔
118
            docId: taskRecord.sourceDocId,
119
            watchedValues: taskRecord.lastObservedValues || null,
687!
120
            getDocument: async (options?: FindOptions) => {
121
                const queryConditions: Filter<Document>[] = [{ _id: taskRecord.sourceDocId }];
63✔
122
                if (taskDef.filter) {
63✔
123
                    queryConditions.push({ $expr: taskDef.filter });
14✔
124
                }
125

126
                if (taskRecord.lastObservedValues && Object.keys(taskRecord.lastObservedValues).length > 0) {
63!
127
                    // Optimistic Locking: Ensure watched values match what triggered the task
128
                    // We use the same projection logic as the planner to compare current DB state vs snapshot
129
                    const projectionExpr = compileWatchProjection(taskDef.watchProjection);
63✔
130
                    queryConditions.push({ $expr: { $eq: [projectionExpr, taskRecord.lastObservedValues] } });
63✔
131
                }
132

133
                const query = (queryConditions.length > 1 ? { $and: queryConditions } : queryConditions[0]) as Filter<Document>;
63!
134
                const sourceDoc = await taskDef.sourceCollection.findOne(query, options);
63✔
135

136
                if (!sourceDoc) {
63✔
137
                    throw new TaskConditionFailedError();
3✔
138
                }
139

140
                return sourceDoc;
60✔
141
            },
142
            deferCurrent: (delay: number | Date) => {
143
                deferredTo = typeof delay === 'number' ? new Date(Date.now() + delay) : delay;
2✔
144
            },
145
            throttleAll: (until: number | Date) => {
146
                throttledUntil = typeof until === 'number' ? new Date(Date.now() + until) : until;
4✔
147
            },
148
            markCompleted: async (options?: { session?: import('mongodb').ClientSession }) => {
149
                if (isManuallyFinalized) {
5✔
150
                    return; // Idempotent
1✔
151
                }
152

153
                isManuallyFinalized = true;
4✔
154
                const duration = Date.now() - start;
4✔
155

156
                try {
4✔
157
                    await finalizeTaskSuccess(duration, options?.session);
4✔
158
                } catch (error) {
159
                    isManuallyFinalized = false;
×
160
                    throw error;
×
161
                }
162
            },
163
        };
164

165
        const stopLock = createContinuousLock(tasksCollection, taskRecord._id, 'nextRunAt', this.internalOptions.visibilityTimeoutMs);
687✔
166

167
        const processTheTask = async () => {
687✔
168
            const start = Date.now();
687✔
169
            this.onInfo({
687✔
170
                message: `Reactive task '${taskRecord.task}' started.`,
171
                taskId: taskRecord._id.toString(),
172
                code: CODE_REACTIVE_TASK_STARTED,
173
            });
174

175
            try {
687✔
176
                await taskDef.handler(context);
687✔
177

178
                const duration = Date.now() - start;
648✔
179
                this.onInfo({
648✔
180
                    message: `Reactive task '${taskRecord.task}' finished in ${duration}ms.`,
181
                    taskId: taskRecord._id.toString(),
182
                    code: CODE_REACTIVE_TASK_FINISHED,
183
                    duration,
184
                });
185
            } catch (err) {
186
                if (err instanceof TaskConditionFailedError) {
39✔
187
                    const duration = Date.now() - start;
3✔
188
                    debug(
3✔
189
                        `[Scheduler ${this.instanceId}] Source document ${taskRecord.sourceDocId} not found or does not match filter for task ${taskRecord._id}. Marking as completed (skipped).`,
190
                    );
191
                    this.onInfo({
3✔
192
                        message: `Reactive task '${taskRecord.task}' finished in ${duration}ms (skipped - filter mismatch).`,
193
                        taskId: taskRecord._id.toString(),
194
                        code: CODE_REACTIVE_TASK_FINISHED,
195
                        duration,
196
                    });
197
                    // Treat as success
198
                    return;
3✔
199
                }
200

201
                const duration = Date.now() - start;
36✔
202
                const reason = err instanceof Error ? err.message : `${err}`;
36!
203
                this.onInfo({
36✔
204
                    message: `Reactive task '${taskRecord.task}' failed in ${duration}ms.`,
205
                    taskId: taskRecord._id.toString(),
206
                    code: CODE_REACTIVE_TASK_FAILED,
207
                    reason,
208
                    duration,
209
                });
210
                throw err;
36✔
211
            }
212
        };
213

214
        const start = Date.now();
687✔
215

216
        if (taskRecord.attempts > 1) {
687✔
217
            this.metricsCollector?.recordRetry(taskRecord.task);
23!
218
        }
219

220
        try {
687✔
221
            await this.taskCaller(processTheTask);
687✔
222
            await stopLock();
651✔
223
            const duration = Date.now() - start;
651✔
224

225
            if (throttledUntil) {
651✔
226
                this.throttledUntil.set(taskRecord.task, throttledUntil);
4✔
227
                debug(`[Scheduler ${this.instanceId}] Throttling task '${taskRecord.task}' until ${throttledUntil.toISOString()}`);
4✔
228
            }
229

230
            if (deferredTo) {
651✔
231
                if (isManuallyFinalized) {
2!
232
                    this.onInfo({
×
233
                        message: `[ReactiveTask] Task '${taskRecord.task}' (ID: ${taskRecord._id}) was manually marked as completed, but deferCurrent() was also called. Ignoring defer request.`,
234
                        code: 'reactiveTaskDeferIgnored',
235
                        taskId: taskRecord._id.toString(),
236
                    });
237
                    return;
×
238
                }
239

240
                debug(`[Scheduler ${this.instanceId}] Deferring task '${taskRecord.task}' until ${deferredTo.toISOString()}`);
2✔
241
                const entry = this.registry.getEntry(tasksCollection.collectionName);
2✔
242
                await entry.repository.deferTask(taskRecord, deferredTo);
2✔
243
                return;
2✔
244
            }
245

246
            if (!isManuallyFinalized) {
649✔
247
                await finalizeTaskSuccess(duration);
647✔
248
            }
249
        } catch (error) {
250
            // Logging is already done in processTheTask via onInfo
251
            await stopLock();
36✔
252
            const duration = Date.now() - start;
36✔
253

254
            this.metricsCollector?.recordTaskExecution(taskRecord.task, 'failed', duration);
36!
255

256
            const entry = this.registry.getEntry(tasksCollection.collectionName);
36✔
257

258
            await entry.repository.finalizeTask(
36✔
259
                taskRecord,
260
                taskDef.retryStrategy,
261
                error as Error,
262
                taskDef.debounceMs,
263
                { durationMs: duration },
264
                taskDef.executionHistoryLimit,
265
            );
266
        }
267
    }
268
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc