• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

caolan / async / 9331163491

01 Jun 2024 03:01PM UTC coverage: 99.244%. Remained the same
9331163491

Pull #1966

github

web-flow
Merge 080d3ceba into e16ff25a6
Pull Request #1966: build(deps-dev): bump @babel/eslint-parser from 7.24.5 to 7.24.6

468 of 492 branches covered (95.12%)

1182 of 1191 relevant lines covered (99.24%)

34837.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.56
/lib/internal/queue.js
1
import onlyOnce from './onlyOnce.js'
1✔
2
import setImmediate from './setImmediate.js'
1✔
3
import DLL from './DoublyLinkedList.js'
1✔
4
import wrapAsync from './wrapAsync.js'
1✔
5

6
export default function queue(worker, concurrency, payload) {
7
    if (concurrency == null) {
74✔
8
        concurrency = 1;
11✔
9
    }
10
    else if(concurrency === 0) {
63✔
11
        throw new RangeError('Concurrency must not be zero');
1✔
12
    }
13

14
    var _worker = wrapAsync(worker);
73✔
15
    var numRunning = 0;
73✔
16
    var workersList = [];
73✔
17
    const events = {
73✔
18
        error: [],
19
        drain: [],
20
        saturated: [],
21
        unsaturated: [],
22
        empty: []
23
    }
24

25
    function on (event, handler) {
26
        events[event].push(handler)
70✔
27
    }
28

29
    function once (event, handler) {
30
        const handleAndRemove = (...args) => {
10✔
31
            off(event, handleAndRemove)
10✔
32
            handler(...args)
10✔
33
        }
34
        events[event].push(handleAndRemove)
10✔
35
    }
36

37
    function off (event, handler) {
38
        if (!event) return Object.keys(events).forEach(ev => events[ev] = [])
82✔
39
        if (!handler) return events[event] = []
80✔
40
        events[event] = events[event].filter(ev => ev !== handler)
10✔
41
    }
42

43
    function trigger (event, ...args) {
44
        events[event].forEach(handler => handler(...args))
673✔
45
    }
46

47
    var processingScheduled = false;
73✔
48
    function _insert(data, insertAtFront, rejectOnError, callback) {
49
        if (callback != null && typeof callback !== 'function') {
337✔
50
            throw new Error('task callback must be a function');
2✔
51
        }
52
        q.started = true;
335✔
53

54
        var res, rej;
55
        function promiseCallback (err, ...args) {
56
            // we don't care about the error, let the global error handler
57
            // deal with it
58
            if (err) return rejectOnError ? rej(err) : res()
252✔
59
            if (args.length <= 1) return res(args[0])
232!
60
            res(args)
×
61
        }
62

63
        var item = q._createTaskItem(
335✔
64
            data,
65
            rejectOnError ? promiseCallback :
335✔
66
                (callback || promiseCallback)
566✔
67
        );
68

69
        if (insertAtFront) {
335✔
70
            q._tasks.unshift(item);
12✔
71
        } else {
72
            q._tasks.push(item);
323✔
73
        }
74

75
        if (!processingScheduled) {
335✔
76
            processingScheduled = true;
77✔
77
            setImmediate(() => {
77✔
78
                processingScheduled = false;
77✔
79
                q.process();
77✔
80
            });
81
        }
82

83
        if (rejectOnError || !callback) {
335✔
84
            return new Promise((resolve, reject) => {
255✔
85
                res = resolve
255✔
86
                rej = reject
255✔
87
            })
88
        }
89
    }
90

91
    function _createCB(tasks) {
92
        return function (err, ...args) {
283✔
93
            numRunning -= 1;
283✔
94

95
            for (var i = 0, l = tasks.length; i < l; i++) {
283✔
96
                var task = tasks[i];
332✔
97

98
                var index = workersList.indexOf(task);
332✔
99
                if (index === 0) {
332✔
100
                    workersList.shift();
322✔
101
                } else if (index > 0) {
10!
102
                    workersList.splice(index, 1);
10✔
103
                }
104

105
                task.callback(err, ...args);
332✔
106

107
                if (err != null) {
332✔
108
                    trigger('error', err, task.data);
62✔
109
                }
110
            }
111

112
            if (numRunning <= (q.concurrency - q.buffer) ) {
283✔
113
                trigger('unsaturated')
271✔
114
            }
115

116
            if (q.idle()) {
283✔
117
                trigger('drain')
66✔
118
            }
119
            q.process();
283✔
120
        };
121
    }
122

123
    function _maybeDrain(data) {
124
        if (data.length === 0 && q.idle()) {
24✔
125
            // call drain immediately if there are no tasks
126
            setImmediate(() => trigger('drain'));
2✔
127
            return true
2✔
128
        }
129
        return false
22✔
130
    }
131

132
    const eventMethod = (name) => (handler) => {
365✔
133
        if (!handler) {
80✔
134
            return new Promise((resolve, reject) => {
10✔
135
                once(name, (err, data) => {
10✔
136
                    if (err) return reject(err)
10✔
137
                    resolve(data)
8✔
138
                })
139
            })
140
        }
141
        off(name)
70✔
142
        on(name, handler)
70✔
143

144
    }
145

146
    var isProcessing = false;
73✔
147
    var q = {
73✔
148
        _tasks: new DLL(),
149
        _createTaskItem (data, callback) {
150
            return {
278✔
151
                data,
152
                callback
153
            };
154
        },
155
        *[Symbol.iterator] () {
156
            yield* q._tasks[Symbol.iterator]()
6✔
157
        },
158
        concurrency,
159
        payload,
160
        buffer: concurrency / 4,
161
        started: false,
162
        paused: false,
163
        push (data, callback) {
164
            if (Array.isArray(data)) {
275✔
165
                if (_maybeDrain(data)) return
21✔
166
                return data.map(datum => _insert(datum, false, false, callback))
63✔
167
            }
168
            return _insert(data, false, false, callback);
254✔
169
        },
170
        pushAsync (data, callback) {
171
            if (Array.isArray(data)) {
6✔
172
                if (_maybeDrain(data)) return
2!
173
                return data.map(datum => _insert(datum, false, true, callback))
4✔
174
            }
175
            return _insert(data, false, true, callback);
4✔
176
        },
177
        kill () {
178
            off()
2✔
179
            q._tasks.empty();
2✔
180
        },
181
        unshift (data, callback) {
182
            if (Array.isArray(data)) {
8!
183
                if (_maybeDrain(data)) return
×
184
                return data.map(datum => _insert(datum, true, false, callback))
×
185
            }
186
            return _insert(data, true, false, callback);
8✔
187
        },
188
        unshiftAsync (data, callback) {
189
            if (Array.isArray(data)) {
3✔
190
                if (_maybeDrain(data)) return
1!
191
                return data.map(datum => _insert(datum, true, true, callback))
2✔
192
            }
193
            return _insert(data, true, true, callback);
2✔
194
        },
195
        remove (testFn) {
196
            q._tasks.remove(testFn);
1✔
197
        },
198
        process () {
199
            // Avoid trying to start too many processing operations. This can occur
200
            // when callbacks resolve synchronously (#1267).
201
            if (isProcessing) {
364✔
202
                return;
67✔
203
            }
204
            isProcessing = true;
297✔
205
            while(!q.paused && numRunning < q.concurrency && q._tasks.length){
297✔
206
                var tasks = [], data = [];
283✔
207
                var l = q._tasks.length;
283✔
208
                if (q.payload) l = Math.min(l, q.payload);
283!
209
                for (var i = 0; i < l; i++) {
283✔
210
                    var node = q._tasks.shift();
332✔
211
                    tasks.push(node);
332✔
212
                    workersList.push(node);
332✔
213
                    data.push(node.data);
332✔
214
                }
215

216
                numRunning += 1;
283✔
217

218
                if (q._tasks.length === 0) {
283✔
219
                    trigger('empty');
72✔
220
                }
221

222
                if (numRunning === q.concurrency) {
283✔
223
                    trigger('saturated');
200✔
224
                }
225

226
                var cb = onlyOnce(_createCB(tasks));
283✔
227
                _worker(data, cb);
283✔
228
            }
229
            isProcessing = false;
297✔
230
        },
231
        length () {
232
            return q._tasks.length;
58✔
233
        },
234
        running () {
235
            return numRunning;
64✔
236
        },
237
        workersList () {
238
            return workersList;
46✔
239
        },
240
        idle() {
241
            return q._tasks.length + numRunning === 0;
295✔
242
        },
243
        pause () {
244
            q.paused = true;
4✔
245
        },
246
        resume () {
247
            if (q.paused === false) { return; }
4!
248
            q.paused = false;
4✔
249
            setImmediate(q.process);
4✔
250
        }
251
    };
252
    // define these as fixed properties, so people get useful errors when updating
253
    Object.defineProperties(q, {
73✔
254
        saturated: {
255
            writable: false,
256
            value: eventMethod('saturated')
257
        },
258
        unsaturated: {
259
            writable: false,
260
            value: eventMethod('unsaturated')
261
        },
262
        empty: {
263
            writable: false,
264
            value: eventMethod('empty')
265
        },
266
        drain: {
267
            writable: false,
268
            value: eventMethod('drain')
269
        },
270
        error: {
271
            writable: false,
272
            value: eventMethod('error')
273
        },
274
    })
275
    return q;
73✔
276
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc