• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bee-queue / bee-queue / 20648554568

02 Jan 2026 01:07AM UTC coverage: 99.655% (-0.2%) from 99.827%
20648554568

push

github

web-flow
chore(deps-dev): bump @commitlint/config-conventional (#954)

Bumps [@commitlint/config-conventional](https://github.com/conventional-changelog/commitlint/tree/HEAD/@commitlint/config-conventional) from 20.2.0 to 20.3.0.
- [Release notes](https://github.com/conventional-changelog/commitlint/releases)
- [Changelog](https://github.com/conventional-changelog/commitlint/blob/master/@commitlint/config-conventional/CHANGELOG.md)
- [Commits](https://github.com/conventional-changelog/commitlint/commits/v20.3.0/@commitlint/config-conventional)

---
updated-dependencies:
- dependency-name: "@commitlint/config-conventional"
  dependency-version: 20.3.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

320 of 324 branches covered (98.77%)

577 of 579 relevant lines covered (99.65%)

2817.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.47
/lib/queue.js
1
'use strict';
2

3
const redis = require('./redis');
3✔
4
const Emitter = require('events').EventEmitter;
3✔
5

6
const Job = require('./job');
3✔
7
const defaults = require('./defaults');
3✔
8
const lua = require('./lua');
3✔
9
const helpers = require('./helpers');
3✔
10
const backoff = require('./backoff');
3✔
11
const EagerTimer = require('./eager-timer');
3✔
12
const finally_ = require('p-finally');
3✔
13

14
/**
 * A job queue whose state lives in Redis under keys that start with
 * `<prefix>:<name>:` (see Queue#toKey).
 *
 * Up to three Redis clients are used: `client` for regular commands,
 * `bclient` for the blocking `brpoplpush` issued by workers, and `eclient`
 * for pub/sub messages (job events and delayed-job notifications).
 *
 * Events emitted by the code in this class: 'ready', 'error', 'stalled',
 * 'raised jobs', 'job <event>' (from pub/sub messages), and, on workers,
 * 'succeeded', 'retrying', 'failed' and 'failed:fatal'.
 */
class Queue extends Emitter {
  /**
   * @param {string} name Queue name; it becomes part of every Redis key.
   * @param {Object=} settings Overrides. `redis`, `prefix`,
   *   `quitCommandClient` and `autoConnect` are read explicitly; every
   *   boolean or number key of `defaults` may also be overridden with a
   *   value of the same kind (numbers must be safe integers).
   */
  constructor(name, settings = {}) {
    super();

    this.name = name;
    this.paused = false;
    this.jobs = new Map();
    this.activeJobs = new Set();
    this.checkTimer = null;
    this.backoffStrategies = new Map(backoff);

    this._closed = null;
    this._isClosed = false;

    this._emitError = (err) => void this.emit('error', err);
    this._emitErrorAfterTick = (err) =>
      void process.nextTick(() => this.emit('error', err));

    this.client = null;
    this.bclient = null;
    this.eclient = null;

    this.settings = {
      redis: settings.redis || {},
      quitCommandClient: settings.quitCommandClient,
      keyPrefix: (settings.prefix || defaults.prefix) + ':' + this.name + ':',
      autoConnect: helpers.bool(settings.autoConnect, true),
    };

    this._isReady = false;
    // `false` until Queue#connect runs, then the promise that resolves to
    // this queue once every client is usable.
    this._ready = false;
    // The promise returned by the connection attempt currently in flight.
    this._connecting = null;

    // Copy each boolean/number default, letting a well-typed user setting
    // win. Defaults of any other type are not copied by this loop.
    for (const prop in defaults) {
      const def = defaults[prop];
      const setting = settings[prop];
      const type = typeof def;
      if (type === 'boolean') {
        this.settings[prop] = typeof setting === 'boolean' ? setting : def;
      } else if (type === 'number') {
        this.settings[prop] = Number.isSafeInteger(setting) ? setting : def;
      }
    }

    /* istanbul ignore if */
    if (this.settings.redis.socket) {
      this.settings.redis = Object.assign({}, this.settings.redis, {
        path: this.settings.redis.socket,
      });
    }

    // By default, if we're given a redis client and no additional instructions,
    // don't quit the connection on Queue#close.
    if (typeof this.settings.quitCommandClient !== 'boolean') {
      this.settings.quitCommandClient = !redis.isClient(this.settings.redis);
    }

    // To avoid changing the hidden class of the Queue.
    this._delayedTimer = this.settings.activateDelayedJobs
      ? new EagerTimer(this.settings.nearTermWindow)
      : null;
    if (this._delayedTimer) {
      this._delayedTimer.on('trigger', this._activateDelayed.bind(this));
    }

    if (this.settings.autoConnect) {
      // Asynchronous connection failures are observable through Queue#ready.
      this.connect();
    }
  }

  /**
   * Create a Redis client, forward its 'error' events to this queue and store
   * it on `this[clientName]`.
   *
   * @param {string} clientName Property to store the client under
   *   ('client', 'bclient' or 'eclient' in this file).
   * @param {boolean} createNew Passed through to `redis.createClient`.
   * @return {Promise} Resolves to the created client.
   */
  makeClient(clientName, createNew) {
    return redis.createClient(this.settings.redis, createNew).then((client) => {
      // This event gets cleaned up and removed in Queue#close for the
      // primary client if quitCommandClient is disabled.
      client.on('error', this._emitError);
      return (this[clientName] = client);
    });
  }

  /**
   * Create the Redis clients, subscribe to the pub/sub channels that the
   * settings require and, when `ensureScripts` is set, load the Lua scripts.
   *
   * Calling this while an attempt is already in flight returns that attempt
   * instead of opening a second set of clients. Once an attempt has failed a
   * later call starts a fresh one.
   *
   * @return {Promise<boolean>} Resolves to `true` once the queue is ready;
   *   rejects with the error that made the connection attempt fail.
   */
  connect() {
    if (this._isReady) return Promise.resolve(this._isReady);
    if (this._connecting) return this._connecting;

    const subscribeToEvents = () => {
      if (!this.settings.getEvents && !this.settings.activateDelayedJobs) {
        return null;
      }

      return this.makeClient('eclient', true).then(() => {
        this.eclient.on('message', this._onMessage.bind(this));
        const channels = [];
        if (this.settings.getEvents) {
          channels.push(this.toKey('events'));
        }
        if (this.settings.activateDelayedJobs) {
          channels.push(this.toKey('earlierDelayed'));
        }
        return Promise.all(
          channels.map((channel) =>
            helpers.callAsync((done) => this.eclient.subscribe(channel, done))
          )
        );
      });
    };

    let scriptsLoaded;
    try {
      const eventsPromise = subscribeToEvents();
      // Wait for Lua scripts and client connections to load. Also wait for
      // bclient and eclient/subscribe if they're needed.
      scriptsLoaded = Promise.all([
        this.makeClient('client', false),
        this.settings.isWorker ? this.makeClient('bclient', true) : null,
        eventsPromise,
      ]).then(() => {
        if (this.settings.ensureScripts) {
          return lua.buildCache(this.client);
        }
      });
    } catch (err) {
      return Promise.reject(err);
    }

    // Registered first so that _isReady is already true when the promise
    // returned to the caller resolves.
    this._ready = scriptsLoaded.then(() => {
      this._isReady = true;
      setImmediate(() => this.emit('ready'));
      return this;
    });

    // Derived from the same chain as _ready so that a failure settles the
    // returned promise too, instead of leaving the caller waiting forever.
    const connecting = scriptsLoaded.then(() => this._isReady);
    const forget = () => {
      if (this._connecting === connecting) this._connecting = null;
    };
    this._connecting = connecting;
    connecting.then(forget, forget);
    return connecting;
  }

  /**
   * Handle a pub/sub message received on `eclient`.
   *
   * Messages on the 'earlierDelayed' channel reschedule the delayed-job
   * timer. Every other message is parsed as JSON (`{id, event, data}`),
   * re-emitted on the queue as 'job <event>' and, when the job is tracked in
   * `this.jobs`, re-emitted on that job as well.
   *
   * @param {string} channel Channel the message arrived on.
   * @param {string} message Raw message payload.
   */
  _onMessage(channel, message) {
    if (channel === this.toKey('earlierDelayed')) {
      // We should only receive these messages if activateDelayedJobs is
      // enabled.
      this._delayedTimer.schedule(parseInt(message, 10));
      return;
    }

    message = JSON.parse(message);
    if (message.event === 'failed' || message.event === 'retrying') {
      message.data = new Error(message.data);
    }

    this.emit('job ' + message.event, message.id, message.data);

    const job = this.jobs.get(message.id);
    if (job) {
      if (message.event === 'progress') {
        job.progress = message.data;
      } else if (message.event === 'retrying') {
        job.options.retries -= 1;
      }

      job.emit(message.event, message.data);

      // Terminal events: the job no longer needs to be tracked.
      if (message.event === 'succeeded' || message.event === 'failed') {
        this.jobs.delete(message.id);
      }
    }
  }

  /**
   * @return {boolean} `true` until Queue#close has been called.
   */
  isRunning() {
    return !this.paused;
  }

  /**
   * Wait for the queue to become ready.
   *
   * @param {Function=} cb Node-style callback; invoked without a value on
   *   success, or with an error when the connection failed or was never
   *   started.
   * @return {Promise<Queue>|boolean} The readiness promise, or `false` when
   *   Queue#connect has not been called yet.
   */
  ready(cb) {
    if (cb) {
      // _ready is still `false` when autoConnect is off and connect() has not
      // run; report that through the callback instead of throwing.
      const ready = this._ready || Promise.reject(this._notConnectedError());
      helpers.asCallback(
        ready.then(() => {}),
        cb
      );
    }

    return this._ready;
  }

  /**
   * Build the error used when a command is issued before Queue#connect.
   *
   * @return {Error}
   */
  _notConnectedError() {
    return new Error('Queue is not connected; call Queue#connect first');
  }

  /**
   * Resolve to the client that should run a command, waiting for readiness
   * when necessary.
   *
   * @param {boolean=} requireBlocking When truthy, resolve to `bclient` and
   *   refuse as soon as the queue is paused; otherwise resolve to `client`
   *   and refuse only once the queue is fully closed.
   * @return {Promise} Resolves to a Redis client; rejects with 'closed' or
   *   with a "not connected" error.
   */
  _commandable(requireBlocking) {
    if (requireBlocking ? this.paused : this._isClosed) {
      return Promise.reject(new Error('closed'));
    }

    if (this._isReady) {
      return Promise.resolve(requireBlocking ? this.bclient : this.client);
    }

    if (!this._ready) {
      return Promise.reject(this._notConnectedError());
    }

    return this._ready.then(() => this._commandable(requireBlocking));
  }

  /**
   * Stop the queue: pause it, cancel timers, disconnect the blocking client,
   * wait (up to `timeout`) for active jobs to settle, then quit the Redis
   * clients this queue is responsible for.
   *
   * @param {(number|Function)=} timeout Milliseconds to wait for active
   *   jobs; anything but a positive safe integer falls back to
   *   `defaults['#close'].timeout`. May be the callback instead.
   * @param {Function=} cb Called with the equivalent of the returned promise.
   * @return {Promise} Settles when the queue has closed. Repeated calls
   *   return the promise of the first call.
   */
  close(timeout, cb) {
    if (typeof timeout === 'function') {
      cb = timeout;
      timeout = defaults['#close'].timeout;
    } else if (!Number.isSafeInteger(timeout) || timeout <= 0) {
      timeout = defaults['#close'].timeout;
    }

    if (this.paused) {
      if (cb) helpers.asCallback(this._closed, cb);
      return this._closed;
    }

    this.paused = true;

    if (this.checkTimer) {
      clearTimeout(this.checkTimer);
      this.checkTimer = null;
    }

    if (this._delayedTimer) {
      this._delayedTimer.stop();
    }

    const drain = () =>
      helpers.finallyRejectsWithInitial(this._ready, () => {
        // Stop the blocking connection, ensures that we don't accept additional
        // jobs while waiting for the ongoing jobs to terminate.
        if (this.settings.isWorker) {
          redis.disconnect(this.bclient);
        }

        // Wait for all the jobs to complete. Ignore job errors during shutdown.
        return Promise.all(
          Array.from(this.activeJobs, (promise) => promise.catch(() => {}))
        ).then(() => {});
      });

    const cleanup = () => {
      this._isClosed = true;

      const clients = [];
      if (this.client) {
        if (this.settings.quitCommandClient) {
          clients.push(this.client);
        } else {
          this.client.removeListener('error', this._emitError);
        }
      }
      if (this.eclient) {
        clients.push(this.eclient);
      }

      // node_redis' implementation of the QUIT command does not permit it to
      // fail. We do not need to consider the case that the quit command aborts.
      return Promise.all(
        clients.map((client) => helpers.callAsync((done) => client.quit(done)))
      );
    };

    const closed = helpers.finallyRejectsWithInitial(
      helpers.withTimeout(drain(), timeout),
      () => cleanup()
    );

    this._closed = closed;

    if (cb) helpers.asCallback(closed, cb);
    return closed;
  }

  /**
   * Delete every Redis key this queue uses (id, jobs, stallBlock, stalling,
   * waiting, active, succeeded, failed, delayed).
   *
   * @param {Function=} cb Called with the equivalent of the returned promise.
   * @return {Promise} Resolves with the reply of the DEL command.
   */
  destroy(cb) {
    const promise = this._commandable().then((client) => {
      const deleted = helpers.deferred();
      const args = [
        'id',
        'jobs',
        'stallBlock',
        'stalling',
        'waiting',
        'active',
        'succeeded',
        'failed',
        'delayed',
      ].map((key) => this.toKey(key));
      args.push(deleted.defer());
      client.del.apply(client, args);
      return deleted;
    });

    if (cb) helpers.asCallback(promise, cb);
    return promise;
  }

  /**
   * Count the jobs in each state with a single MULTI.
   *
   * @param {Function=} cb Called with the equivalent of the returned promise.
   * @return {Promise<Object>} Resolves to `{waiting, active, succeeded,
   *   failed, delayed, newestJob}`; `newestJob` is the value stored under the
   *   'id' key parsed as an integer, or 0 when that key is empty.
   */
  checkHealth(cb) {
    const promise = this._commandable()
      .then((client) =>
        helpers.callAsync((done) =>
          client
            .multi()
            .llen(this.toKey('waiting'))
            .llen(this.toKey('active'))
            .scard(this.toKey('succeeded'))
            .scard(this.toKey('failed'))
            .zcard(this.toKey('delayed'))
            .get(this.toKey('id'))
            .exec(done)
        )
      )
      .then((results) => ({
        waiting: results[0],
        active: results[1],
        succeeded: results[2],
        failed: results[3],
        delayed: results[4],
        newestJob: results[5] ? parseInt(results[5], 10) : 0,
      }));

    if (cb) helpers.asCallback(promise, cb);
    return promise;
  }

  /**
   * Collect up to `size` distinct members of a Redis SET with SSCAN,
   * following the cursor until the scan ends or enough ids were gathered.
   *
   * @param {string} key Redis key of the set.
   * @param {string} cursor SSCAN cursor; '0' starts a new scan.
   * @param {number} size Maximum number of ids to collect.
   * @param {Set} set Accumulator, mutated in place.
   * @param {Function} cb Node-style callback receiving `set`.
   */
  _scanForJobs(key, cursor, size, set, cb) {
    const batchCount = Math.min(size, this.settings.redisScanCount);
    this.client.sscan(key, cursor, 'COUNT', batchCount, (err, results) => {
      /* istanbul ignore if */
      if (err) {
        return cb(err);
      }

      const nextCursor = results[0];
      const ids = results[1];

      // A given element may be returned multiple times in SSCAN.
      // So, we use a set to remove duplicates.
      // https://redis.io/commands/scan#scan-guarantees
      for (const id of ids) {
        // For small sets, encoded as intsets, SSCAN will ignore COUNT.
        // https://redis.io/commands/scan#the-count-option
        if (set.size === size) break;

        set.add(id);
      }

      if (nextCursor === '0' || set.size >= size) {
        return cb(null, set);
      }

      this._scanForJobs(key, nextCursor, size, set, cb);
    });
  }

  /**
   * Fetch the stored data for `ids` with one HMGET and append the resulting
   * Job objects to `jobs`.
   *
   * @param {Job[]} jobs Accumulator, mutated in place.
   * @param {string[]} ids Ids of the jobs to load.
   * @return {Promise<Job[]>} Resolves to `jobs`.
   */
  _addJobsByIds(jobs, ids) {
    // We need to re-ensure the queue is commandable, as we might be shutting
    // down during this operation.
    return this._commandable()
      .then((client) => {
        const got = helpers.deferred();
        const commandArgs = [this.toKey('jobs')].concat(ids, got.defer());
        client.hmget.apply(client, commandArgs);
        return got;
      })
      .then((dataArray) => {
        const count = ids.length;
        // Some jobs returned by the scan may have already been removed, so
        // filter them out.
        for (let i = 0; i < count; ++i) {
          const jobData = dataArray[i];
          /* istanbul ignore else: not worth unit-testing this edge case */
          if (jobData) {
            jobs.push(Job.fromData(this, ids[i], jobData));
          }
        }
        return jobs;
      });
  }

  /**
   * Get jobs from queue type.
   *
   * @param {String} type The queue type (failed, succeeded, waiting, etc.)
   * @param {?Object=} page An object containing some of the following fields.
   * @param {Number=} page.start Start of query range for waiting/active/delayed
   *   queue types. Defaults to 0.
   * @param {Number=} page.end End of query range for waiting/active/delayed
   *   queue types. Defaults to 100.
   * @param {Number=} page.size Number jobs to return for failed/succeeded (SET)
   *   types. Defaults to 100.
   * @param {Function=} callback Called with the equivalent of the returned
   *   promise.
   * @return {Promise<Job[]>} Resolves to the jobs the function found. Rejects
   *   with 'Improper queue type' for an unknown `type`.
   */
  getJobs(type, page, cb) {
    if (typeof page === 'function') {
      cb = page;
      page = null;
    }
    // Set defaults.
    page = Object.assign(
      {
        size: 100,
        start: 0,
        end: 100,
      },
      page
    );
    const promise = this._commandable()
      .then((client) => {
        const idsPromise = helpers.deferred();
        const next = idsPromise.defer();
        const key = this.toKey(type);
        switch (type) {
          case 'failed':
          case 'succeeded':
            this._scanForJobs(key, '0', page.size, new Set(), next);
            break;
          case 'waiting':
          case 'active':
            client.lrange(key, page.start, page.end, next);
            break;
          case 'delayed':
            client.zrange(key, page.start, page.end, next);
            break;
          default:
            throw new Error('Improper queue type');
        }

        return idsPromise;
      })
      .then((ids) => {
        const jobs = [];
        const idsToFetch = [];
        // ids might be a Set or an Array, but this will iterate just the same.
        for (const jobId of ids) {
          // Prefer the locally tracked Job object when there is one.
          const job = this.jobs.get(jobId);
          if (job) {
            jobs.push(job);
          } else {
            idsToFetch.push(jobId);
          }
        }
        if (!idsToFetch.length) {
          return jobs;
        }
        return this._addJobsByIds(jobs, idsToFetch);
      });

    if (cb) helpers.asCallback(promise, cb);
    return promise;
  }

  /**
   * Create a new, unsaved job bound to this queue.
   *
   * @param {*} data Payload handed to the Job constructor.
   * @return {Job}
   */
  createJob(data) {
    return new Job(this, null, data);
  }

  /**
   * Look a job up by id, preferring the locally tracked instance. A found job
   * is tracked in `this.jobs` when the `storeJobs` setting is on.
   *
   * @param {string} jobId
   * @param {Function=} cb Called with the equivalent of the returned promise.
   * @return {Promise} Resolves to whatever `this.jobs` or `Job.fromId` yields.
   */
  getJob(jobId, cb) {
    const promise = this._commandable()
      .then(() =>
        this.jobs.has(jobId) ? this.jobs.get(jobId) : Job.fromId(this, jobId)
      )
      .then((job) => {
        if (job && this.settings.storeJobs) {
          this.jobs.set(jobId, job);
        }
        return job;
      });

    if (cb) helpers.asCallback(promise, cb);
    return promise;
  }

  /**
   * Remove a job through the 'removeJob' script and, when `storeJobs` is on,
   * stop tracking it locally.
   *
   * @param {string} jobId
   * @param {Function=} cb Called with the equivalent of the returned promise.
   * @return {Promise<Queue>} Resolves to this queue.
   */
  removeJob(jobId, cb) {
    const promise = this._evalScript(
      'removeJob',
      7,
      this.toKey('succeeded'),
      this.toKey('failed'),
      this.toKey('waiting'),
      this.toKey('active'),
      this.toKey('stalling'),
      this.toKey('jobs'),
      this.toKey('delayed'),
      jobId
    ).then(() => {
      if (this.settings.storeJobs) {
        this.jobs.delete(jobId);
      }
      return this;
    });

    if (cb) helpers.asCallback(promise, cb);
    return promise;
  }

  /**
   * Block on BRPOPLPUSH (waiting -> active) until a job id arrives, then load
   * that job. Redis failures are emitted as 'error' and retried after a delay
   * that doubles on every consecutive failure.
   *
   * @return {Promise} Resolves to what `Job.fromId` yields, or to `null` when
   *   the blocking call was aborted because the queue is paused.
   */
  _waitForJob() {
    return helpers
      .callAsync((done) =>
        this.bclient.brpoplpush(
          this.toKey('waiting'),
          this.toKey('active'),
          0,
          done
        )
      )
      .then(
        (jobId) =>
          // Note that the job may be null in the case that the client has
          // removed the job before processing can take place, but after the
          // brpoplpush has returned the job id.
          Job.fromId(this, jobId),
        (err) => {
          if (redis.isAbortError(err) && this.paused) {
            return null;
          }

          this.emit('error', err);

          // Retry the brpoplpush after a delay
          this._redisFailureRetryDelay = this._redisFailureRetryDelay
            ? this._redisFailureRetryDelay * 2
            : this.settings.initialRedisFailureRetryDelay;

          return helpers
            .delay(this._redisFailureRetryDelay)
            .then(() => this._waitForJob());
        }
      );
  }

  /**
   * Reset the retry delay and wait for the next job on the blocking client.
   *
   * @return {Promise} See Queue#_waitForJob.
   */
  _getNextJob() {
    this._redisFailureRetryDelay = 0;
    // Under normal calling conditions, commandable will not reject because we
    // will have just checked paused in Queue#process.
    return this._commandable(true).then(() => this._waitForJob());
  }

  /**
   * Run the handler for one job, keeping the job out of the 'stalling' set
   * while it runs and recording the outcome afterwards.
   *
   * @param {Job} job
   * @return {Promise<Array>} Resolves to `[status, result]` as produced by
   *   Queue#_finishJob. The promise is tracked in `this.activeJobs` until it
   *   settles.
   */
  _runJob(job) {
    let psTimeout = null;
    let completed = false;

    // Removes the job from the 'stalling' set now and then again every
    // stallInterval / 2 until the job completes or the queue closes.
    const preventStalling = () => {
      psTimeout = null;
      if (this._isClosed) return;
      finally_(this._preventStall(job.id), () => {
        if (completed || this._isClosed) return;
        const interval = this.settings.stallInterval / 2;
        psTimeout = setTimeout(preventStalling, interval);
      }).catch(this._emitErrorAfterTick);
    };
    preventStalling();

    const handleOutcome = (err, data) => {
      completed = true;
      if (psTimeout) {
        clearTimeout(psTimeout);
        psTimeout = null;
      }

      return this._finishJob(err, data, job);
    };

    // _finishJob decides between success and failure by the truthiness of
    // `err`, so a rejection with a falsy reason (e.g. `Promise.reject()`)
    // must be turned into a real Error or the job would count as succeeded.
    const handleFailure = (err) =>
      handleOutcome(
        err ||
          new Error(
            `Job ${job.id} rejected with a falsy reason: ${String(err)}`
          )
      );

    let promise = this.handler(job);

    if (job.options.timeout) {
      const message = `Job ${job.id} timed out (${job.options.timeout} ms)`;
      promise = helpers.withTimeout(promise, job.options.timeout, message);
    }

    const jobPromise = finally_(
      promise.then((data) => handleOutcome(null, data), handleFailure),
      // The only error that can happen here is either network- or
      // Redis-related, or if Queue#close times out while a job is processing,
      // and the job later finishes.
      () => this.activeJobs.delete(jobPromise)
    );

    // We specifically use the value produced by then to avoid cases where the
    // process handler returns the same Promise object each invocation.
    this.activeJobs.add(jobPromise);
    return jobPromise;
  }

  /**
   * Remove a job id from the 'stalling' set.
   *
   * @param {string} jobId
   * @return {Promise} Resolves with the reply of SREM.
   */
  _preventStall(jobId) {
    return helpers.callAsync((done) =>
      this.client.srem(this.toKey('stalling'), jobId, done)
    );
  }

  /**
   * Record the outcome of a job in Redis with one MULTI: take it out of the
   * 'active' list and 'stalling' set, then store, requeue, delay or delete it
   * depending on the resulting status, and publish an event when the
   * `sendEvents` setting is on.
   *
   * @param {?Error} err Truthy when the handler failed.
   * @param {*} data Handler result; only used when `err` is falsy.
   * @param {Job} job Mutated: `status`, `options.stacktraces` and, when
   *   retrying, `options.retries`.
   * @return {Promise<Array>} Resolves to `[status, result]`, where status is
   *   'succeeded', 'retrying' or 'failed' and result is `err || data`.
   * @throws {Error} When the queue has already been closed.
   */
  _finishJob(err, data, job) {
    if (this._isClosed) {
      const status = err ? 'failed' : 'succeeded';
      throw new Error(`unable to update the status of ${status} job ${job.id}`);
    }

    const multi = this.client
      .multi()
      .lrem(this.toKey('active'), 0, job.id)
      .srem(this.toKey('stalling'), job.id);

    // A non-negative delay from the job means it will be retried.
    const delay = err ? job.computeDelay() : -1;
    const status = err ? (delay >= 0 ? 'retrying' : 'failed') : 'succeeded';

    job.status = status;
    if (err) {
      const errInfo = err.stack || err.message || err;
      job.options.stacktraces.unshift(errInfo);
    }

    switch (status) {
      case 'failed':
        if (this.settings.removeOnFailure) {
          multi.hdel(this.toKey('jobs'), job.id);
        } else {
          multi.hset(this.toKey('jobs'), job.id, job.toData());
          multi.sadd(this.toKey('failed'), job.id);
        }
        break;
      case 'retrying':
        --job.options.retries;
        multi.hset(this.toKey('jobs'), job.id, job.toData());
        if (delay === 0) {
          multi.lpush(this.toKey('waiting'), job.id);
        } else {
          const time = Date.now() + delay;
          multi
            .zadd(this.toKey('delayed'), time, job.id)
            .publish(this.toKey('earlierDelayed'), time);
        }
        break;
      case 'succeeded':
        if (this.settings.removeOnSuccess) {
          multi.hdel(this.toKey('jobs'), job.id);
        } else {
          multi.hset(this.toKey('jobs'), job.id, job.toData());
          multi.sadd(this.toKey('succeeded'), job.id);
        }
        break;
    }

    if (this.settings.sendEvents) {
      multi.publish(
        this.toKey('events'),
        JSON.stringify({
          id: job.id,
          event: status,
          data: err ? err.message : data,
        })
      );
    }

    const result = err || data;

    return helpers
      .callAsync((done) => multi.exec(done))
      .then(() => [status, result]);
  }

  /**
   * Start processing jobs with `handler`.
   *
   * `this.running` counts jobs currently inside the handler and
   * `this.queued` counts scheduled fetch loops ("ticks"); after spool-up
   * their sum equals `concurrency`.
   *
   * @param {(number|Function)=} concurrency Maximum number of jobs processed
   *   at once; defaults to `defaults['#process'].concurrency` when the
   *   handler is passed first.
   * @param {Function} handler Job handler, wrapped with `helpers.wrapAsync`.
   * @return {Queue} This queue.
   * @throws {Error} On a non-worker queue, on a second call, or when closed.
   */
  process(concurrency, handler) {
    if (!this.settings.isWorker) {
      throw new Error('Cannot call Queue#process on a non-worker');
    }

    if (this.handler) {
      throw new Error('Cannot call Queue#process twice');
    }

    if (this.paused) {
      throw new Error('closed');
    }

    if (typeof concurrency === 'function') {
      handler = concurrency;
      concurrency = defaults['#process'].concurrency;
    }

    // If the handler throws a synchronous exception (only applicable to
    // non-`async` functions), catch it, and fail the job.
    const catchExceptions = true;
    this.handler = helpers.wrapAsync(handler, catchExceptions);
    this.running = 0;
    this.queued = 1;
    this.concurrency = concurrency;

    // Moves one slot from "running" back to "queued". Every path that leaves
    // the running state must call this, because the tick is always
    // rescheduled afterwards.
    const releaseSlot = () => {
      this.running -= 1;
      this.queued += 1;
    };

    const jobTick = () => {
      if (this.paused) {
        this.queued -= 1;
        return;
      }

      // invariant: in this code path, this.running < this.concurrency, always
      // after spoolup, this.running + this.queued === this.concurrency
      finally_(
        this._getNextJob().then((job) => {
          // We're shutting down.
          if (this.paused) {
            // This job will get picked up later as a stalled job if we happen
            // to get here. We can't easily process this job because at this
            // point Queue#close has already captured the activeJobs set in a
            // Promise.all call, and we'd prefer to delay a job than
            // half-process it.
            this.queued -= 1;
            return;
          }

          this.running += 1;
          this.queued -= 1;
          if (this.running + this.queued < this.concurrency) {
            this.queued += 1;
            setImmediate(jobTick);
          }

          if (!job) {
            // Per comment in Queue#_waitForJob, this branch is possible when
            // the job is removed before processing can take place, but after
            // being initially acquired. Nothing runs, so give the slot back.
            releaseSlot();
            return;
          }

          return this._runJob(job).then(
            (results) => {
              releaseSlot();

              /* istanbul ignore else */
              if (results) {
                const status = results[0];
                const result = results[1];
                this.emit(status, job, result);

                // Workaround for #184: emit failed event for backwards
                // compatibility while affording for a separate event that
                // identifies the final failure.
                const emitExtra =
                  status === 'retrying'
                    ? 'failed'
                    : status === 'failed'
                      ? 'failed:fatal'
                      : null;
                if (emitExtra) this.emit(emitExtra, job, result);
              }
            },
            (err) => {
              // The job is no longer running even though recording its
              // outcome failed.
              releaseSlot();
              this._emitErrorAfterTick(err);
            }
          );
        }),
        () => setImmediate(jobTick)
      ).catch(this._emitErrorAfterTick);
    };

    this._doStalledJobCheck().then(jobTick).catch(this._emitErrorAfterTick);
    this._activateDelayed();

    return this;
  }

  /**
   * Run the 'checkStalledJobs' script and emit 'stalled' for every job id it
   * returns.
   *
   * @return {Promise<number>} Resolves to the number of ids returned.
   */
  _doStalledJobCheck() {
    return this._evalScript(
      'checkStalledJobs',
      4,
      this.toKey('stallBlock'),
      this.toKey('stalling'),
      this.toKey('waiting'),
      this.toKey('active'),
      this.settings.stallInterval
    ).then((stalled) => {
      for (const jobId of stalled) {
        this.emit('stalled', jobId);
      }
      return stalled.length;
    });
  }

  /**
   * Timer entry point for the periodic stalled-job check; never returns a
   * promise, so failures are routed to the callback or to 'error'.
   *
   * @param {number} interval
   * @param {Function=} cb
   */
  _safeCheckStalledJobs(interval, cb) {
    const promise = this._checkStalledJobs(interval, cb);
    // If a callback is not defined, then we must emit errors to avoid unhandled
    // rejections. If there is a callback, then _checkStalledJobs will attach it
    // as an error handler to `promise`.
    if (!cb) promise.catch(this._emitErrorAfterTick);
  }

  /**
   * Schedule the next stalled-job check unless one is already scheduled or
   * the queue is paused.
   *
   * @param {number} interval Delay in milliseconds.
   * @param {Function=} cb
   */
  _scheduleStalledCheck(interval, cb) {
    if (this.checkTimer || this.paused) return;
    this.checkTimer = setTimeout(() => {
      // The checkTimer is unset and cleared when Queue#close is called,
      // so we don't need to check for it here.
      this.checkTimer = null;
      this._safeCheckStalledJobs(interval, cb);
    }, interval);
  }

  /**
   * Run one stalled-job check and, when `interval` is set and no check is
   * scheduled yet, schedule the next one after this one settles.
   *
   * @param {?number} interval
   * @param {Function=} cb
   * @return {Promise<number>} See Queue#_doStalledJobCheck.
   */
  _checkStalledJobs(interval, cb) {
    const promise = this._doStalledJobCheck();
    if (cb) helpers.asCallback(promise, cb);
    return interval && !this.checkTimer
      ? finally_(promise, () => {
          try {
            this._scheduleStalledCheck(interval, cb);
          } catch (err) {
            // istanbul ignore next: safety belts
            this._emitErrorAfterTick(err);
          }
        })
      : promise;
  }

  /**
   * Check for stalled jobs.
   *
   * @param {Number=} interval The interval on which to check for stalled jobs.
   *   This should be set to half the stallInterval setting, to avoid
   *   unnecessary work.
   * @param {Function=} callback Called with the equivalent of the returned
   *   promise. If interval is provided, the callback will be invoked after each
   *   invocation of checkStalledJobs.
   * @return {Promise<Number>} Resolves to the number of stalled jobs the
   *   function found.
   */
  checkStalledJobs(interval, cb) {
    if (typeof interval === 'function') {
      cb = interval;
      interval = null;
    } else if (!Number.isSafeInteger(interval)) {
      interval = null;
    }
    return this._checkStalledJobs(interval, cb);
  }

  /**
   * Save all the provided jobs, without waiting for each job to be created.
   * This pipelines the requests which avoids the waiting 2N*RTT for N jobs -
   * the client waits to receive each command result before sending the next
   * command. Note that this method does not support a callback parameter - you
   * must use the returned Promise.
   *
   * @param {Iterable<Job>} jobs The jobs to save. Jobs that have no ID will be
   *   assigned one by mutation.
   * @return {Promise<Map<Job, Error>>} The errors that occurred when saving
   *   jobs. Will be empty if no errors occurred. Will reject if there was an
   *   exception executing the batch or readying the connection.
   * @modifies {arguments}
   */
  saveAll(jobs) {
    return this._commandable().then((client) => {
      const batch = client.batch();
      const errors = new Map();

      for (const job of jobs) {
        try {
          job
            ._save((evalArgs) => this._evalScriptOn(batch, evalArgs))
            .catch((err) => void errors.set(job, err));
        } catch (err) {
          errors.set(job, err);
        }
      }
      return helpers.callAsync((done) => batch.exec(done)).then(() => errors);
    });
  }

  /**
   * Move due delayed jobs to the waiting list through the 'raiseDelayedJobs'
   * script, emit 'raised jobs' when any were moved, and reschedule the
   * delayed-job timer. Does nothing unless `activateDelayedJobs` is set.
   */
  _activateDelayed() {
    if (!this.settings.activateDelayedJobs) return;
    this._evalScript(
      'raiseDelayedJobs',
      2,
      this.toKey('delayed'),
      this.toKey('waiting'),
      Date.now(),
      this.settings.delayedDebounce
    ).then(
      (results) => {
        const numRaised = results[0];
        const nextOpportunity = results[1];
        if (numRaised) {
          this.emit('raised jobs', numRaised);
        }
        this._delayedTimer.schedule(parseInt(nextOpportunity, 10));
      },
      /* istanbul ignore next */ (err) => {
        // Handle aborted redis connections.
        if (redis.isAbortError(err)) {
          if (this.paused) return;
          // Retry.
          return this._activateDelayed();
        }
        this._emitErrorAfterTick(err);
      }
    );
  }

  /**
   * Build the Redis key for one of this queue's structures.
   *
   * @param {string} str Key suffix, e.g. 'waiting'.
   * @return {string} `settings.keyPrefix` followed by `str`.
   */
  toKey(str) {
    return this.settings.keyPrefix + str;
  }

  /**
   * Evaluate the named script on the given commandable object, which might be a
   * RedisClient or a Batch or Multi object. This exists to facilitate
   * command pipelining.
   *
   * @param {Object} commandable Object exposing `evalsha`.
   * @param {Array} args `[scriptName, ...evalshaArguments]`; the name is
   *   replaced by its SHA and a callback is appended.
   * @return {Promise} Resolves with the script's reply.
   * @modifies {arguments}
   */
  _evalScriptOn(commandable, args) {
    // Precondition: Queue is ready - otherwise lua.shas may not have loaded.
    args[0] = lua.shas[args[0]];
    return helpers.callAsync((done) => {
      args.push(done);
      commandable.evalsha.apply(commandable, args);
    });
  }

  /**
   * Evaluate the named script, return a promise with its results.
   *
   * Same parameter list/syntax as evalsha, except for the name.
   *
   * @param {string} name
   */
  _evalScript(...args) {
    // Rest parameters give us a fresh array that _evalScriptOn may mutate.
    return this._commandable().then((client) =>
      this._evalScriptOn(client, args)
    );
  }
}
930

931
module.exports = Queue;
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc