• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

VaclavObornik / mongodash / 21111634189

18 Jan 2026 12:21PM UTC coverage: 91.133% (-0.4%) from 91.52%
21111634189

push

github

VaclavObornik
2.6.0

1343 of 1578 branches covered (85.11%)

Branch coverage included in aggregate %.

2172 of 2279 relevant lines covered (95.3%)

359.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.86
/src/testing/waitUntilReactiveTasksIdle.ts
1
import * as _debug from 'debug';
245✔
2
import { Document, Filter } from 'mongodb';
3
import { ReactiveTaskRecord, _scheduler } from '../reactiveTasks';
245✔
4
import { waitUntil, WaitUntilOptions } from './waitUntil';
245✔
5

6
const debug = _debug('mongodash:testing');
245✔
7

8
/**
9
 * Waits until the reactive task system is idle.
10
 * "Idle" means:
11
 * 1. No changes are buffered in the Planner.
12
 * 2. No workers are currently executing tasks (active count is 0).
13
 * 3. No tasks in the database are in a pending or processing state.
14
 *
15
 * This enables robust E2E testing by ensuring that all side effects and cascading tasks have finished.
16
 */
17
export interface WaitUntilReactiveTasksIdleOptions extends Partial<WaitUntilOptions> {
18
    /**
19
     * If provided, the function will only wait for tasks related to these specific entities.
20
     * Global checks (Planner buffer, Active workers) are SKIPPED in this mode to ensure isolation
21
     * from other running tests.
22
     */
23
    whitelist?: Array<{
24
        collection: string;
25
        /**
26
         * Filter to find relevant documents.
27
         * If not provided, ALL documents in the collection are considered (use carefully!).
28
         */
29
        filter?: Filter<Document>;
30
        /**
31
         * Optional task name filter.
32
         */
33
        task?: string;
34
    }>;
35
}
36

37
export async function waitUntilReactiveTasksIdle(customOptions: WaitUntilReactiveTasksIdleOptions = {}): Promise<void> {
245✔
38
    const options: WaitUntilOptions = {
12✔
39
        timeoutMs: 10000,
40
        pollIntervalMs: 50,
41
        stabilityDurationMs: 200, // Wait for silence to catch "in-flight" events
42
        ...customOptions,
43
    };
44

45
    const hasWhitelist = customOptions.whitelist && customOptions.whitelist.length > 0;
12✔
46

47
    await waitUntil(async () => {
12✔
48
        // Access scheduler internals
49
        const planner = _scheduler.taskPlannerInstance;
197✔
50
        const runner = _scheduler.concurrentRunnerInstance;
197✔
51
        const registry = _scheduler.getRegistry();
197✔
52

53
        // --- 1. Global Checks (Always run) ---
54
        // 1. Check Internal Buffers (Planner)
55
        if (planner && !planner.isEmpty) {
197✔
56
            debug('Planner not empty');
12✔
57
            return false;
12✔
58
        }
59

60
        // 2. Check Active Workers (Runner)
61
        if (runner && runner.activeWorkers > 0) {
185✔
62
            debug(`Active workers: ${runner.activeWorkers}`);
26✔
63
            return false;
26✔
64
        }
65

66
        // --- 2. Check Database ---
67
        const entries = registry.getAllEntries();
159✔
68

69
        // Optimized check: If any collection has pending work, we are not idle.
70
        for (const entry of entries) {
159✔
71
            // If whitelisting is active, we only check tasks that match the whitelist
72
            let whitelistFilter: Filter<ReactiveTaskRecord> | null = null;
162✔
73

74
            if (hasWhitelist) {
162✔
75
                const rules = customOptions.whitelist!.filter((rule) => rule.collection === entry.sourceCollection.collectionName);
33✔
76

77
                if (rules.length === 0) {
33✔
78
                    continue;
5✔
79
                }
80

81
                const criteria: Array<Filter<ReactiveTaskRecord>> = [];
28✔
82
                let matchAll = false;
28✔
83

84
                for (const rule of rules) {
28✔
85
                    let ruleIds: unknown[] | null = null;
28✔
86

87
                    if (rule.filter) {
28!
88
                        // If we have a filter, we need to find which docs match it.
89
                        // We can't filter tasks directly by source properties efficiently without joining,
90
                        // so we find the matching source docs first.
91
                        const matchingDocs = (await entry.sourceCollection.find(rule.filter, { projection: { _id: 1 } }).toArray()) as Document[];
×
92
                        ruleIds = matchingDocs.map((d) => d._id);
×
93
                    }
94

95
                    if (ruleIds === null && !rule.task) {
28!
96
                        // One rule validates 'all', so we wait for everything in this collection
97
                        matchAll = true;
28✔
98
                        break;
28✔
99
                    }
100

101
                    if (rule.task) {
×
102
                        criteria.push({ task: rule.task });
×
103
                    }
104
                    if (ruleIds !== null) {
×
105
                        // eslint-disable-next-line @typescript-eslint/no-explicit-any
106
                        criteria.push({ sourceDocId: { $in: ruleIds as any[] } });
×
107
                    }
108
                }
109

110
                if (!matchAll) {
28!
111
                    if (criteria.length > 0) {
×
112
                        whitelistFilter = { $or: criteria };
×
113
                    } else {
114
                        // Whitelist is active, but we have rules that result in effectively "nothing"
115
                        // (e.g. filter returned no docs).
116
                        // If we have NO criteria and NO matchAll, it implies we wait for nothing on this collection?
117
                        // Or should we treat it as blocking?
118
                        // If filter didn't match any doc, then we effectively wait for nothing for that rule.
119
                        // If ALL rules resulted in nothing, we continue to next entry.
120
                        continue;
×
121
                    }
122
                }
123
            }
124

125
            const stableThresholdMs = (options.timeoutMs || 0) + (options.stabilityDurationMs || 0) + 100;
157!
126

127
            const baseQuery: Filter<ReactiveTaskRecord> = {
157✔
128
                $or: [
129
                    { status: { $in: ['processing', 'processing_dirty'] } },
130
                    {
131
                        status: 'pending',
132
                        $or: [{ nextRunAt: { $lte: new Date(Date.now() + stableThresholdMs) } }, { nextRunAt: null }],
133
                    },
134
                ],
135
            };
136

137
            const query: Filter<ReactiveTaskRecord> = whitelistFilter ? { $and: [baseQuery, whitelistFilter] } : baseQuery;
157!
138

139
            const count = await entry.tasksCollection.countDocuments(query);
157✔
140

141
            if (count > 0) {
157✔
142
                debug(`Collection ${entry.tasksCollection.collectionName} has ${count} active tasks`);
90✔
143
                return false;
90✔
144
            }
145
        }
146

147
        return true;
69✔
148
    }, options);
149
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc