• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 20684680181

03 Jan 2026 11:52PM UTC coverage: 91.81% (-0.06%) from 91.869%
20684680181

push

github

VaclavObornik
2.4.0

1252 of 1454 branches covered (86.11%)

Branch coverage included in aggregate %.

2010 of 2099 relevant lines covered (95.76%)

372.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.55
/src/reactiveTasks/ReactiveTaskOps.ts
1
import * as _debug from 'debug';
222✔
2
import { Document } from 'mongodb';
3
import { compileWatchProjection } from './compileWatchProjection';
222✔
4
import { ReactiveTaskRegistry } from './ReactiveTaskRegistry';
5

6
const debug = _debug('mongodash:reactiveTasks:ops');
222✔
7

8
/**
9
 * Helper class for generating and executing reactive task operations.
10
 *
11
 * Responsibilities:
12
 * - Generates upsert operations for tasks based on source documents and task definitions.
13
 * - Executes bulk write operations to the task collections.
14
 * - Handles duplicate key errors gracefully (which can occur during reconciliation).
15
 * - Manages debouncing logic by tracking the minimum debounce time for planned tasks.
16
 */
17
export class ReactiveTaskOps {
222✔
18
    constructor(
19
        private registry: ReactiveTaskRegistry,
139✔
20
        private onTaskPlanned: (tasksCollectionName: string, debounceMs: number) => void,
139✔
21
    ) {}
22

23
    public async executePlanningPipeline(collectionName: string, sourceDocIds: unknown[], allowedTaskNames?: Set<string>): Promise<void> {
24
        debug(`executePlanningPipeline called for ${collectionName} with ${sourceDocIds.length} ids`);
303✔
25
        const entry = this.registry.getEntry(collectionName);
303✔
26
        if (!entry) {
303!
27
            debug(`No entry found for collection ${collectionName}`);
×
28
            return;
×
29
        }
30

31
        const matchFilter = { _id: { $in: sourceDocIds } };
303✔
32
        const pipeline = this.generatePlanningPipeline(entry, matchFilter, allowedTaskNames);
303✔
33
        if (pipeline.length === 0) {
303!
34
            debug(`Pipeline empty for ${collectionName} (allowedTasks: ${allowedTaskNames ? Array.from(allowedTaskNames).join(',') : 'all'})`);
×
35
            return;
×
36
        }
37

38
        debug(`Executing pipeline for ${collectionName} handling ${sourceDocIds.length} docs`);
303✔
39
        try {
303✔
40
            await entry.sourceCollection.aggregate(pipeline).toArray();
303✔
41

42
            debug(`Pipeline executed successfully for ${collectionName}`);
303✔
43

44
            // Notify that tasks have been planned
45
            for (const task of entry.tasks.values()) {
303✔
46
                if (allowedTaskNames && !allowedTaskNames.has(task.task)) continue;
308!
47
                this.onTaskPlanned(task.tasksCollection.collectionName, task.debounceMs);
308✔
48
            }
49
        } catch (error) {
50
            debug(`Error executing pipeline for ${collectionName}:`, error);
×
51
            throw error;
×
52
        }
53
    }
54

55
    private generatePlanningPipeline(entry: ReturnType<ReactiveTaskRegistry['getEntry']>, matchFilter?: Document, allowedTaskNames?: Set<string>): Document[] {
56
        let tasks = Array.from(entry.tasks.values());
305✔
57

58
        if (allowedTaskNames) {
305✔
59
            tasks = tasks.filter((t) => allowedTaskNames.has(t.task));
30✔
60
        }
61

62
        if (tasks.length === 0) {
305!
63
            return [];
×
64
        }
65

66
        const pipeline: Document[] = [
305✔
67
            { $match: matchFilter || {} },
305!
68
            {
69
                $project: {
70
                    _id: 0,
71
                    sourceDocId: '$_id',
72
                    tasks: {
73
                        $filter: {
74
                            input: tasks.map((task) => ({
310✔
75
                                task: task.task,
76
                                matches: task.filter || true,
507✔
77
                                watchedValues: compileWatchProjection(task.watchProjection),
78
                                debounceMs: task.debounceMs,
79
                                resetRetriesOnDataChange: task.retryStrategy.policy.resetRetriesOnDataChange,
80
                            })),
81
                            as: 't',
82
                            cond: '$$t.matches',
83
                        },
84
                    },
85
                },
86
            },
87
            { $unwind: '$tasks' },
88
            {
89
                $project: {
90
                    sourceDocId: 1,
91
                    task: '$tasks.task',
92
                    lastObservedValues: '$tasks.watchedValues',
93
                    status: { $literal: 'pending' },
94
                    attempts: { $literal: 0 },
95
                    createdAt: '$$NOW',
96
                    updatedAt: '$$NOW',
97
                    nextRunAt: { $add: ['$$NOW', '$tasks.debounceMs'] },
98
                    dueAt: { $add: ['$$NOW', '$tasks.debounceMs'] },
99
                    resetRetriesOnDataChange: { $ifNull: ['$tasks.resetRetriesOnDataChange', true] },
100
                },
101
            },
102
            {
103
                $merge: {
104
                    into: entry.tasksCollection.collectionName,
105
                    on: ['task', 'sourceDocId'],
106
                    whenNotMatched: 'insert',
107
                    whenMatched: [
108
                        {
109
                            $set: {
110
                                hasChanged: { $ne: ['$lastObservedValues', '$$new.lastObservedValues'] },
111
                            },
112
                        },
113
                        {
114
                            $set: {
115
                                sourceDocId: '$$new.sourceDocId',
116
                                task: '$$new.task',
117
                                lastObservedValues: '$$new.lastObservedValues',
118
                                updatedAt: {
119
                                    $cond: { if: '$hasChanged', then: '$$new.updatedAt', else: '$updatedAt' },
120
                                },
121
                                firstErrorAt: {
122
                                    $cond: {
123
                                        if: '$hasChanged',
124
                                        then: {
125
                                            $cond: {
126
                                                if: '$$new.resetRetriesOnDataChange',
127
                                                then: null,
128
                                                else: '$firstErrorAt',
129
                                            },
130
                                        },
131
                                        else: '$firstErrorAt',
132
                                    },
133
                                },
134
                                lastError: {
135
                                    $cond: {
136
                                        if: '$hasChanged',
137
                                        then: {
138
                                            $cond: {
139
                                                if: '$$new.resetRetriesOnDataChange',
140
                                                then: null,
141
                                                else: '$lastError',
142
                                            },
143
                                        },
144
                                        else: '$lastError',
145
                                    },
146
                                },
147
                                status: {
148
                                    $cond: {
149
                                        if: '$hasChanged',
150
                                        then: {
151
                                            $cond: {
152
                                                if: { $in: ['$status', ['processing', 'processing_dirty']] },
153
                                                then: 'processing_dirty',
154
                                                else: 'pending',
155
                                            },
156
                                        },
157
                                        else: '$status',
158
                                    },
159
                                },
160
                                dueAt: {
161
                                    $cond: {
162
                                        if: '$hasChanged',
163
                                        then: '$$new.dueAt', // Always reset dueAt for the new version
164
                                        else: '$dueAt',
165
                                    },
166
                                },
167
                                nextRunAt: {
168
                                    $cond: {
169
                                        if: '$hasChanged',
170
                                        then: {
171
                                            $cond: {
172
                                                if: { $in: ['$status', ['processing', 'processing_dirty']] },
173
                                                then: '$nextRunAt',
174
                                                else: '$$new.nextRunAt',
175
                                            },
176
                                        },
177
                                        else: '$nextRunAt',
178
                                    },
179
                                },
180
                                attempts: {
181
                                    $cond: { if: '$hasChanged', then: 0, else: '$attempts' },
182
                                },
183
                            },
184
                        },
185
                        { $unset: 'hasChanged' },
186
                    ],
187
                },
188
            },
189
        ];
190

191
        return pipeline;
305✔
192
    }
193
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc