• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 20544193028

27 Dec 2025 08:48PM UTC coverage: 91.89% (-0.05%) from 91.942%
20544193028

push

github

VaclavObornik
feat: add watchProjection to reactiveTask to optimize task triggering

Signed-off-by: Václav Oborník <vaclav.obornik@gmail.com>

1236 of 1436 branches covered (86.07%)

Branch coverage included in aggregate %.

1982 of 2066 relevant lines covered (95.93%)

387.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.94
/src/reactiveTasks/ReactiveTaskWorker.ts
1
import * as _debug from 'debug';
215✔
2
import { Collection, Document, Filter, FindOptions } from 'mongodb';
3
import { createContinuousLock } from '../createContinuousLock';
215✔
4
import { defaultOnError, OnError } from '../OnError';
215✔
5
import { defaultOnInfo, OnInfo } from '../OnInfo';
215✔
6
import { compileWatchProjection } from './compileWatchProjection';
215✔
7
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
8
import {
215✔
9
    CODE_REACTIVE_TASK_FAILED,
10
    CODE_REACTIVE_TASK_FINISHED,
11
    CODE_REACTIVE_TASK_STARTED,
12
    ReactiveTaskCaller,
13
    ReactiveTaskContext,
14
    ReactiveTaskFilter,
15
    ReactiveTaskRecord,
16
    TaskConditionFailedError,
17
} from './ReactiveTaskTypes';
18

19
import { MetricsCollector } from './MetricsCollector';
20

21
/**
 * Hooks through which the worker notifies its owner about its progress.
 */
export interface WorkerCallbacks {
    /**
     * Called by the worker right after it has locked a pending task and
     * before that task is processed.
     *
     * @param collectionName - the collection name that was passed to `tryRunATask`.
     */
    onTaskFound: (collectionName: string) => void;
}
24

25
// Module-level diagnostic logger created from the `debug` package under the
// 'mongodash:reactiveTasks:worker' namespace.
const debug = _debug('mongodash:reactiveTasks:worker');
215✔
26

27
/**
 * Responsible for executing reactive tasks.
 *
 * Responsibilities:
 * - Polls for pending tasks from the database.
 * - Applies filtering to restrict which tasks this worker processes.
 * - Locks tasks during execution to prevent concurrent processing.
 * - Fetches the source document and executes the user-defined handler.
 * - Handles task completion, failure, retries, and dead-letter queueing.
 * - Manages the visibility timeout lock extension.
 */
export class ReactiveTaskWorker {
    /** Wrapper through which every task execution is invoked. */
    private taskCaller: ReactiveTaskCaller;

    /** Task name -> moment until which this worker must not pick up that task. */
    private throttledUntil = new Map<string, Date>();

    /**
     * @param instanceId - identifier of this worker, used only in debug output.
     * @param registry - source of task definitions and per-collection entries.
     * @param callbacks - notification hooks (see `onTaskFound`).
     * @param internalOptions - `visibilityTimeoutMs` is forwarded both to the task lock
     *        query and to the continuous lock; defaults to 300000 ms.
     * @param taskCaller - optional wrapper around the execution; defaults to calling it directly.
     * @param taskFilter - optional predicate limiting which registered tasks this worker handles.
     * @param onInfo - sink for started/finished/failed notifications.
     * @param onError - sink for errors reported by the continuous lock.
     * @param metricsCollector - optional collector for execution and retry metrics.
     */
    constructor(
        private instanceId: string,
        private registry: ReactiveTaskRegistry,
        private callbacks: WorkerCallbacks,
        private internalOptions: { visibilityTimeoutMs: number } = { visibilityTimeoutMs: 300000 },
        taskCaller?: ReactiveTaskCaller,
        private taskFilter?: ReactiveTaskFilter,
        private onInfo: OnInfo = defaultOnInfo,
        private onError: OnError = defaultOnError,
        private metricsCollector?: MetricsCollector,
    ) {
        this.taskCaller = taskCaller || ((task) => task());
    }

    /**
     * Tries to lock and process at most one pending task.
     *
     * The candidate task definitions are all registered tasks, narrowed by the
     * optional `taskFilter` and by the in-memory throttle map. When no candidate
     * remains, the method returns without querying the repository.
     *
     * @param collectionName - name used to look up the registry entry whose
     *        repository is queried; it is also passed to `callbacks.onTaskFound`.
     */
    public async tryRunATask(collectionName: string): Promise<void> {
        const entry = this.registry.getEntry(collectionName);

        let tasks = this.registry.getAllTasks();
        if (this.taskFilter) {
            tasks = tasks.filter((t) => this.taskFilter!({ task: t.task }));
        }
        if (!tasks.length) {
            return;
        }

        // Filter out throttled tasks
        const now = Date.now();
        tasks = tasks.filter((t) => {
            const until = this.throttledUntil.get(t.task);
            if (until && until.getTime() > now) {
                return false;
            }
            if (until) {
                this.throttledUntil.delete(t.task); // Cleanup expired throttle
            }
            return true;
        });

        if (!tasks.length) {
            return;
        }

        const taskRecord = await entry.repository.findAndLockNextTask(tasks, {
            visibilityTimeoutMs: this.internalOptions.visibilityTimeoutMs,
        });
        if (taskRecord) {
            this.callbacks.onTaskFound(collectionName);
            await this.processTask(taskRecord);
        }
    }

    /**
     * Runs the handler of an already locked task record and persists the outcome.
     *
     * Outcomes:
     * - handler resolves: the task is finalized as a success, unless the handler
     *   called `deferCurrent()` (the task is deferred instead) or `markCompleted()`
     *   (the task was already finalized by the handler);
     * - handler throws `TaskConditionFailedError`: treated as a success (skipped);
     * - anything else thrown inside the `try` block: the task is finalized with the
     *   error and the task's retry strategy.
     *
     * @param taskRecord - the task record returned by `findAndLockNextTask`.
     */
    private async processTask(taskRecord: ReactiveTaskRecord<Document>): Promise<void> {
        const taskDef = this.registry.getTask(taskRecord.task)!;
        const tasksCollection = taskDef.tasksCollection;

        // Requests recorded by the handler through the context; evaluated after it resolves.
        let deferredTo: Date | undefined;
        let throttledUntil: Date | undefined;

        let isManuallyFinalized = false;

        // Records the success metric and persists the successful result, optionally within a session.
        const finalizeTaskSuccess = async (duration: number, session?: import('mongodb').ClientSession) => {
            this.metricsCollector?.recordTaskExecution(taskRecord.task, 'success', duration);

            const entry = this.registry.getEntry(tasksCollection.collectionName);
            await entry.repository.finalizeTask(
                taskRecord,
                taskDef.retryStrategy,
                undefined,
                taskDef.debounceMs,
                { durationMs: duration },
                taskDef.executionHistoryLimit,
                session ? { session } : undefined,
            );
        };

        const context: ReactiveTaskContext<Document> = {
            docId: taskRecord.sourceDocId,
            watchedValues: taskRecord.lastObservedValues || null,
            getDocument: async (options?: FindOptions) => {
                const queryConditions: Filter<Document>[] = [{ _id: taskRecord.sourceDocId }];
                if (taskDef.filter) {
                    queryConditions.push({ $expr: taskDef.filter });
                }

                if (taskRecord.lastObservedValues && Object.keys(taskRecord.lastObservedValues).length > 0) {
                    // Optimistic Locking: Ensure watched values match what triggered the task
                    // We use the same projection logic as the planner to compare current DB state vs snapshot
                    const projectionExpr = compileWatchProjection(taskDef.watchProjection);
                    queryConditions.push({ $expr: { $eq: [projectionExpr, taskRecord.lastObservedValues] } });
                }

                const query = (queryConditions.length > 1 ? { $and: queryConditions } : queryConditions[0]) as Filter<Document>;
                const sourceDoc = await taskDef.sourceCollection.findOne(query, options);

                if (!sourceDoc) {
                    // Handled in processTheTask below, where it is treated as a skipped (successful) run.
                    throw new TaskConditionFailedError();
                }

                return sourceDoc;
            },
            deferCurrent: (delay: number | Date) => {
                // A number is a delay in milliseconds from now; a Date is used as is.
                deferredTo = typeof delay === 'number' ? new Date(Date.now() + delay) : delay;
            },
            throttleAll: (until: number | Date) => {
                // A number is a delay in milliseconds from now; a Date is used as is.
                throttledUntil = typeof until === 'number' ? new Date(Date.now() + until) : until;
            },
            markCompleted: async (options?: { session?: import('mongodb').ClientSession }) => {
                if (isManuallyFinalized) {
                    return; // Idempotent
                }

                isManuallyFinalized = true;
                // `start` is declared further below; it is initialised before the handler can run.
                const duration = Date.now() - start;

                try {
                    await finalizeTaskSuccess(duration, options?.session);
                } catch (error) {
                    // Finalization did not go through, so allow another attempt.
                    isManuallyFinalized = false;
                    throw error;
                }
            },
        };

        const stopLock = createContinuousLock(
            tasksCollection as unknown as Collection<{ _id: string; lockExpiresAt: Date | null }>,
            taskRecord._id.toString(),
            'lockExpiresAt',
            this.internalOptions.visibilityTimeoutMs,
            (error) => {
                this.onError(error);
            },
        );

        // Invokes the handler and reports started/finished/failed through onInfo.
        const processTheTask = async () => {
            const handlerStart = Date.now();
            this.onInfo({
                message: `Reactive task '${taskRecord.task}' started.`,
                taskId: taskRecord._id.toString(),
                code: CODE_REACTIVE_TASK_STARTED,
            });

            try {
                await taskDef.handler(context);

                const duration = Date.now() - handlerStart;
                this.onInfo({
                    message: `Reactive task '${taskRecord.task}' finished in ${duration}ms.`,
                    taskId: taskRecord._id.toString(),
                    code: CODE_REACTIVE_TASK_FINISHED,
                    duration,
                });
            } catch (err) {
                if (err instanceof TaskConditionFailedError) {
                    const duration = Date.now() - handlerStart;
                    debug(
                        `[Scheduler ${this.instanceId}] Source document ${taskRecord.sourceDocId} not found or does not match filter for task ${taskRecord._id}. Marking as completed (skipped).`,
                    );
                    this.onInfo({
                        message: `Reactive task '${taskRecord.task}' finished in ${duration}ms (skipped - filter mismatch).`,
                        taskId: taskRecord._id.toString(),
                        code: CODE_REACTIVE_TASK_FINISHED,
                        duration,
                    });
                    // Treat as success
                    return;
                }

                const duration = Date.now() - handlerStart;
                const reason = err instanceof Error ? err.message : `${err}`;
                this.onInfo({
                    message: `Reactive task '${taskRecord.task}' failed in ${duration}ms.`,
                    taskId: taskRecord._id.toString(),
                    code: CODE_REACTIVE_TASK_FAILED,
                    reason,
                    duration,
                });
                throw err;
            }
        };

        // Reference point for the persisted duration (also read by markCompleted above).
        const start = Date.now();

        if (taskRecord.attempts > 1) {
            this.metricsCollector?.recordRetry(taskRecord.task);
        }

        try {
            await this.taskCaller(processTheTask);
            await stopLock();
            const duration = Date.now() - start;

            if (throttledUntil) {
                this.throttledUntil.set(taskRecord.task, throttledUntil);
                debug(`[Scheduler ${this.instanceId}] Throttling task '${taskRecord.task}' until ${throttledUntil.toISOString()}`);
            }

            if (deferredTo) {
                if (isManuallyFinalized) {
                    this.onInfo({
                        message: `[ReactiveTask] Task '${taskRecord.task}' (ID: ${taskRecord._id}) was manually marked as completed, but deferCurrent() was also called. Ignoring defer request.`,
                        code: 'reactiveTaskDeferIgnored',
                        taskId: taskRecord._id.toString(),
                    });
                    return;
                }

                debug(`[Scheduler ${this.instanceId}] Deferring task '${taskRecord.task}' until ${deferredTo.toISOString()}`);
                const entry = this.registry.getEntry(tasksCollection.collectionName);
                await entry.repository.deferTask(taskRecord, deferredTo);
                return;
            }

            if (!isManuallyFinalized) {
                await finalizeTaskSuccess(duration);
            }
        } catch (error) {
            // Logging is already done in processTheTask via onInfo
            await stopLock();
            const duration = Date.now() - start;

            this.metricsCollector?.recordTaskExecution(taskRecord.task, 'failed', duration);

            const entry = this.registry.getEntry(tasksCollection.collectionName);

            await entry.repository.finalizeTask(
                taskRecord,
                taskDef.retryStrategy,
                error as Error,
                taskDef.debounceMs,
                { durationMs: duration },
                taskDef.executionHistoryLimit,
            );
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc