• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 21093867550

17 Jan 2026 11:56AM UTC coverage: 91.559% (-0.2%) from 91.769%
21093867550

push

github

VaclavObornik
2.5.0

1328 of 1554 branches covered (85.46%)

Branch coverage included in aggregate %.

2143 of 2237 relevant lines covered (95.8%)

364.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.41
/src/reactiveTasks/ReactiveTaskOps.ts
1
import * as _debug from 'debug';
248✔
2
import { Document } from 'mongodb';
3
import { compileWatchProjection } from './compileWatchProjection';
248✔
4
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
5

6
// Logging function obtained from the `debug` package for the 'mongodash:reactiveTasks:ops' namespace.
const debug = _debug('mongodash:reactiveTasks:ops');
248✔
7

8
/**
 * Plans reactive tasks for source documents by running an aggregation pipeline
 * on the source collection.
 *
 * Responsibilities:
 * - Builds a pipeline that produces one task record per (source document, task definition)
 *   pair whose task filter evaluates truthy for that document.
 * - Upserts those records into the tasks collection with `$merge`; an existing record is
 *   only rescheduled when its watched values differ from the newly computed ones.
 * - Reports every planned task definition through the `onTaskPlanned` callback together
 *   with its effective debounce.
 * - Allows the per-task debounce to be overridden globally via `setForceDebounce`.
 */
export class ReactiveTaskOps {
    /**
     * @param registry Registry used to look up the entry (source collection, tasks) by collection name.
     * @param onTaskPlanned Called once per planned task definition after the pipeline succeeded,
     *                      with the task's collection name and the debounce that was applied.
     */
    constructor(
        private readonly registry: ReactiveTaskRegistry,
        private readonly onTaskPlanned: (tasksCollectionName: string, debounceMs: number) => void,
    ) {}

    private _forceDebounceMs?: number;

    /**
     * Sets (or clears, when `undefined`) a debounce that replaces every task's own `debounceMs`.
     */
    public setForceDebounce(debounceMs: number | undefined) {
        this._forceDebounceMs = debounceMs;
    }

    /** The currently forced debounce, or `undefined` when tasks use their own `debounceMs`. */
    public get forceDebounceMs(): number | undefined {
        return this._forceDebounceMs;
    }

    /**
     * Plans tasks for the given source documents.
     *
     * Does nothing (besides a debug message) when the collection has no registry entry or when
     * no task remains after applying `allowedTaskNames`. Otherwise the planning pipeline is run
     * on the entry's source collection and, once it succeeded, `onTaskPlanned` is invoked for
     * every task definition of the entry that passes `allowedTaskNames`. The notification is
     * driven by the task definitions, not by the pipeline result.
     *
     * @param collectionName Name of the source collection whose documents changed.
     * @param sourceDocIds `_id` values of the source documents to plan tasks for.
     * @param allowedTaskNames When given, only tasks whose name is in the set are planned and notified.
     * @throws Rethrows any error raised while executing the aggregation (after logging it).
     */
    public async executePlanningPipeline(collectionName: string, sourceDocIds: unknown[], allowedTaskNames?: Set<string>): Promise<void> {
        debug(`executePlanningPipeline called for ${collectionName} with ${sourceDocIds.length} ids`);
        const entry = this.registry.getEntry(collectionName);
        if (!entry) {
            debug(`No entry found for collection ${collectionName}`);
            return;
        }

        const matchFilter = { _id: { $in: sourceDocIds } };
        const pipeline = this.generatePlanningPipeline(entry, matchFilter, allowedTaskNames);
        if (pipeline.length === 0) {
            debug(`Pipeline empty for ${collectionName}(allowedTasks: ${allowedTaskNames ? Array.from(allowedTaskNames).join(',') : 'all'})`);
            return;
        }

        debug(`Executing pipeline for ${collectionName} handling ${sourceDocIds.length} docs`);
        try {
            // The pipeline ends with $merge; toArray() is only used to drive it to completion.
            await entry.sourceCollection.aggregate(pipeline).toArray();

            debug(`Pipeline executed successfully for ${collectionName}`);

            // Notify that tasks have been planned
            for (const task of entry.tasks.values()) {
                if (allowedTaskNames && !allowedTaskNames.has(task.task)) continue;
                this.onTaskPlanned(task.tasksCollection.collectionName, this.resolveDebounceMs(task.debounceMs));
            }
        } catch (error) {
            debug(`Error executing pipeline for ${collectionName}: `, error);
            throw error;
        }
    }

    /**
     * Returns the debounce to apply for a task: the forced debounce when one is set,
     * otherwise the task's own value. Single source of truth for both the pipeline
     * and the `onTaskPlanned` notification so the two cannot drift apart.
     */
    private resolveDebounceMs(taskDebounceMs: number): number {
        return this.forceDebounceMs !== undefined ? this.forceDebounceMs : taskDebounceMs;
    }

    /**
     * Builds the planning pipeline for a registry entry.
     *
     * Stages:
     * 1. `$match` - restricts the source documents (`matchFilter`, or everything when omitted).
     * 2. `$project` - attaches the list of task definitions whose `matches` expression is truthy
     *    for the document (`task.filter`, or `true` when the task has no filter).
     * 3. `$unwind` - one output document per (source document, matching task).
     * 4. `$project` - shapes the task record: `pending`, zero attempts, timestamps from `$$NOW`,
     *    `nextRunAt`/`dueAt` = `$$NOW` + debounce.
     * 5. `$merge` - inserts new records, updates existing ones (see `buildWhenMatchedStages`).
     *
     * @param entry Registry entry of the source collection (must be present).
     * @param matchFilter Filter selecting the source documents to plan for.
     * @param allowedTaskNames When given, only tasks whose name is in the set are included.
     * @returns The pipeline, or an empty array when no task is left to plan.
     */
    private generatePlanningPipeline(
        entry: NonNullable<ReturnType<ReactiveTaskRegistry['getEntry']>>,
        matchFilter?: Document,
        allowedTaskNames?: Set<string>,
    ): Document[] {
        let tasks = Array.from(entry.tasks.values());

        if (allowedTaskNames) {
            tasks = tasks.filter((t) => allowedTaskNames.has(t.task));
        }

        if (tasks.length === 0) {
            return [];
        }

        // Per-task definitions embedded into the pipeline; `matches` and `watchedValues`
        // are aggregation expressions evaluated against each source document.
        const taskDefinitions = tasks.map((task) => ({
            task: task.task,
            matches: task.filter || true,
            watchedValues: compileWatchProjection(task.watchProjection),
            debounceMs: this.resolveDebounceMs(task.debounceMs),
            resetRetriesOnDataChange: task.retryStrategy.policy.resetRetriesOnDataChange,
        }));

        const pipeline: Document[] = [
            { $match: matchFilter || {} },
            {
                $project: {
                    _id: 0,
                    sourceDocId: '$_id',
                    tasks: {
                        $filter: {
                            input: taskDefinitions,
                            as: 't',
                            cond: '$$t.matches',
                        },
                    },
                },
            },
            { $unwind: '$tasks' },
            {
                $project: {
                    sourceDocId: 1,
                    task: '$tasks.task',
                    lastObservedValues: '$tasks.watchedValues',
                    status: { $literal: 'pending' },
                    attempts: { $literal: 0 },
                    createdAt: '$$NOW',
                    updatedAt: '$$NOW',
                    nextRunAt: { $add: ['$$NOW', '$tasks.debounceMs'] },
                    dueAt: { $add: ['$$NOW', '$tasks.debounceMs'] },
                    // A missing policy value is treated as `true`.
                    resetRetriesOnDataChange: { $ifNull: ['$tasks.resetRetriesOnDataChange', true] },
                },
            },
            {
                $merge: {
                    into: entry.tasksCollection.collectionName,
                    on: ['task', 'sourceDocId'],
                    whenNotMatched: 'insert',
                    whenMatched: ReactiveTaskOps.buildWhenMatchedStages(),
                },
            },
        ];

        return pipeline;
    }

    /**
     * Builds the `$merge.whenMatched` update pipeline applied to an already existing task record.
     *
     * A temporary `hasChanged` flag records whether the stored `lastObservedValues` differ from
     * the newly computed ones. When nothing changed, every scheduling field keeps its stored
     * value. When the values changed:
     * - `updatedAt` and `dueAt` take the new values and `attempts` is reset to 0,
     * - `firstErrorAt` / `lastError` are cleared only if `$$new.resetRetriesOnDataChange` is truthy,
     * - a record in `processing` / `processing_dirty` becomes `processing_dirty` and keeps its
     *   `nextRunAt`; any other record becomes `pending` with the new `nextRunAt`.
     *
     * NOTE(review): `attempts` is reset on every data change, independent of
     * `resetRetriesOnDataChange` - confirm this is intended.
     */
    private static buildWhenMatchedStages(): Document[] {
        // `then` when the watched values changed, otherwise `otherwise`.
        const ifChanged = (then: unknown, otherwise: unknown): Document => ({
            $cond: { if: '$hasChanged', then, else: otherwise },
        });

        // `then` when the stored record is currently being processed, otherwise `otherwise`.
        const ifProcessing = (then: unknown, otherwise: unknown): Document => ({
            $cond: {
                if: { $in: ['$status', ['processing', 'processing_dirty']] },
                then,
                else: otherwise,
            },
        });

        // Clears an error field on change when the retry policy asks for it; keeps it otherwise.
        const clearOnRetryReset = (storedField: string): Document =>
            ifChanged(
                {
                    $cond: {
                        if: '$$new.resetRetriesOnDataChange',
                        then: null,
                        else: storedField,
                    },
                },
                storedField,
            );

        return [
            {
                $set: {
                    hasChanged: { $ne: ['$lastObservedValues', '$$new.lastObservedValues'] },
                },
            },
            {
                // All expressions of this stage read the record as it was before the stage,
                // so `$status` below always refers to the stored (old) status.
                $set: {
                    sourceDocId: '$$new.sourceDocId',
                    task: '$$new.task',
                    lastObservedValues: '$$new.lastObservedValues',
                    updatedAt: ifChanged('$$new.updatedAt', '$updatedAt'),
                    firstErrorAt: clearOnRetryReset('$firstErrorAt'),
                    lastError: clearOnRetryReset('$lastError'),
                    status: ifChanged(ifProcessing('processing_dirty', 'pending'), '$status'),
                    // Always reset dueAt for the new version
                    dueAt: ifChanged('$$new.dueAt', '$dueAt'),
                    nextRunAt: ifChanged(ifProcessing('$nextRunAt', '$$new.nextRunAt'), '$nextRunAt'),
                    attempts: ifChanged(0, '$attempts'),
                },
            },
            { $unset: 'hasChanged' },
        ];
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc