• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

timgit / pg-boss / 16127615119

07 Jul 2025 08:51PM UTC coverage: 94.475% (-5.5%) from 100.0%
16127615119

Pull #495

github

web-flow
Merge ec219eae0 into 6d4c88633
Pull Request #495: v11

368 of 430 branches covered (85.58%)

304 of 305 new or added lines in 8 files covered. (99.67%)

49 existing lines in 4 files now uncovered.

855 of 905 relevant lines covered (94.48%)

103.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.65
/src/manager.js
1
const assert = require('node:assert')
4✔
2
const EventEmitter = require('node:events')
4✔
3
const { randomUUID } = require('node:crypto')
4✔
4
const { serializeError: stringify } = require('serialize-error')
4✔
5
const { delay, resolveWithinSeconds } = require('./tools')
4✔
6
const Attorney = require('./attorney')
4✔
7
const Worker = require('./worker')
4✔
8
const plans = require('./plans')
4✔
9

10
const { QUEUES: TIMEKEEPER_QUEUES } = require('./timekeeper')
4✔
11
const { QUEUE_POLICIES } = plans
4✔
12

13
// Lookup table (name -> name) built from the timekeeper's queue names.
// The Manager uses it to tell these internal queues apart from other queues.
const INTERNAL_QUEUES = Object.fromEntries(
  Object.values(TIMEKEEPER_QUEUES).map(queueName => [queueName, queueName])
)

// Names of the events emitted by Manager instances.
const events = {
  error: 'error',
  wip: 'wip'
}
19

20
/**
 * Queue and job manager.
 *
 * Wraps the SQL plans for creating queues, sending / fetching jobs and
 * changing job state, keeps an in-memory cache of queue rows, and owns the
 * polling workers created through `work()`.
 *
 * Emits `events.error` and `events.wip`.
 */
class Manager extends EventEmitter {
  /**
   * @param {object} db - database wrapper; must expose `executeSql(sql, values)`
   * @param {object} config - configuration; this class reads `schema`,
   *   `pollingInterval` and `queueCacheIntervalSeconds` from it
   */
  constructor (db, config) {
    super()

    this.config = config
    this.db = db
    this.wipTs = Date.now()
    this.workers = new Map()
    // queue cache keyed by queue name; stays null until onCacheQueues() succeeds
    this.queues = null

    this.events = events
    // methods exposed by this manager
    // NOTE(review): presumably re-exported by the owning instance - confirm against caller
    this.functions = [
      this.complete,
      this.cancel,
      this.resume,
      this.retry,
      this.fail,
      this.fetch,
      this.work,
      this.offWork,
      this.notifyWorker,
      this.publish,
      this.subscribe,
      this.unsubscribe,
      this.insert,
      this.send,
      this.sendDebounced,
      this.sendThrottled,
      this.sendAfter,
      this.createQueue,
      this.updateQueue,
      this.deleteQueue,
      this.getQueueStats,
      this.getQueue,
      this.getQueues,
      this.deleteQueuedJobs,
      this.deleteStoredJobs,
      this.deleteAllJobs,
      this.deleteJob,
      this.getJobById
    ]
  }

  /**
   * Marks the manager as running, schedules the periodic queue cache refresh
   * and performs the first cache load.
   */
  async start () {
    this.stopped = false
    this.queueCacheInterval = setInterval(() => this.onCacheQueues({ emit: true }), this.config.queueCacheIntervalSeconds * 1000)
    await this.onCacheQueues()
  }

  /**
   * Reloads the queue cache from the database.
   *
   * Failures never reject; they are emitted as an `error` event only when
   * `emit` is true, otherwise they are dropped.
   *
   * @param {object} [options]
   * @param {boolean} [options.emit=false] - emit an `error` event on failure
   */
  async onCacheQueues ({ emit = false } = {}) {
    try {
      assert(!this.config.__test__throw_queueCache, 'test error')
      const queues = await this.getQueues()
      this.queues = queues.reduce((acc, i) => { acc[i.name] = i; return acc }, {})
    } catch (error) {
      emit && this.emit(events.error, { ...error, message: error.message, stack: error.stack })
    }
  }

  /**
   * Returns the cached row for a queue, loading and caching it on a miss.
   *
   * @param {string} name - queue name
   * @returns {Promise<object>} queue row
   * @throws {Error} when the queue is not found in the database
   */
  async getQueueCache (name) {
    let queue = this.queues[name]

    if (queue) {
      return queue
    }

    queue = await this.getQueue(name)

    if (!queue) {
      throw new Error(`Queue ${name} does not exist`)
    }

    this.queues[name] = queue

    return queue
  }

  /**
   * Marks the manager as stopped, cancels the cache refresh timer and asks
   * every worker that is not on an internal queue to stop.
   */
  async stop () {
    this.stopped = true

    clearInterval(this.queueCacheInterval)

    for (const worker of this.workers.values()) {
      if (!INTERNAL_QUEUES[worker.name]) {
        await this.offWork(worker.name)
      }
    }
  }

  /**
   * Fails every job still held by a worker.
   */
  async failWip () {
    for (const worker of this.workers.values()) {
      const jobIds = worker.jobs.map(j => j.id)
      if (jobIds.length) {
        await this.fail(worker.name, jobIds, 'pg-boss shut down while active')
      }
    }
  }

  /**
   * Registers a polling worker for a queue.
   *
   * @param {string} name - queue name
   * @param {...*} args - remaining arguments, resolved to `options` and
   *   `callback` by `Attorney.checkWorkArgs`
   * @returns {Promise<string>} id of the created worker
   */
  async work (name, ...args) {
    const { options, callback } = Attorney.checkWorkArgs(name, args)
    return await this.watch(name, options, callback)
  }

  /** Tracks a worker under its id. */
  addWorker (worker) {
    this.workers.set(worker.id, worker)
  }

  /** Stops tracking a worker. */
  removeWorker (worker) {
    this.workers.delete(worker.id)
  }

  /** @returns {object[]} snapshot array of all tracked workers */
  getWorkers () {
    return Array.from(this.workers.values())
  }

  /**
   * Emits a `wip` event for a non-internal queue, at most once per 2000 ms.
   *
   * @param {string} name - queue name
   */
  emitWip (name) {
    if (!INTERNAL_QUEUES[name]) {
      const now = Date.now()

      if (now - this.wipTs > 2000) {
        this.emit(events.wip, this.getWipData())
        this.wipTs = now
      }
    }
  }

  /**
   * Summarises the workers that currently hold at least one job.
   *
   * @param {object} [options]
   * @param {boolean} [options.includeInternal=false] - also report workers on internal queues
   * @returns {object[]} one summary per busy worker; `count` is the number of held jobs
   */
  getWipData (options = {}) {
    const { includeInternal = false } = options

    const data = this.getWorkers()
      .map(({
        id,
        name,
        options,
        state,
        jobs,
        createdOn,
        lastFetchedOn,
        lastJobStartedOn,
        lastJobEndedOn,
        lastError,
        lastErrorOn
      }) => ({
        id,
        name,
        options,
        state,
        count: jobs.length,
        createdOn,
        lastFetchedOn,
        lastJobStartedOn,
        lastJobEndedOn,
        lastError,
        lastErrorOn
      }))
      .filter(i => i.count > 0 && (!INTERNAL_QUEUES[i.name] || includeInternal))

    return data
  }

  /**
   * Creates, registers and starts a worker that fetches jobs from `name` and
   * passes them to `callback`.
   *
   * When the callback resolves the jobs are completed (the resolved value is
   * stored only for a single-job batch); when it rejects, or does not settle
   * via `resolveWithinSeconds`, the jobs are failed with the error.
   *
   * @param {string} name - queue name
   * @param {object} options - reads `pollingInterval`, `batchSize`, `includeMetadata`, `priority`
   * @param {Function} callback - handler receiving the fetched jobs array
   * @returns {Promise<string>} id of the created worker
   * @throws {Error} when the manager has been stopped
   */
  async watch (name, options, callback) {
    if (this.stopped) {
      throw new Error('Workers are disabled. pg-boss is stopped')
    }

    const {
      pollingInterval: interval = this.config.pollingInterval,
      batchSize,
      includeMetadata = false,
      priority = true
    } = options

    const id = randomUUID({ disableEntropyCache: true })

    const onError = error => {
      this.emit(events.error, { ...error, message: error.message, stack: error.stack, queue: name, worker: id })
    }

    const fetch = () => this.fetch(name, { batchSize, includeMetadata, priority })

    const onFetch = async (jobs) => {
      if (!jobs.length) {
        return
      }

      if (this.config.__test__throw_worker) {
        throw new Error('__test__throw_worker')
      }

      this.emitWip(name)

      // the batch may run as long as its longest-lived job allows
      const maxExpiration = jobs.reduce((acc, i) => Math.max(acc, i.expireInSeconds), 0)
      const jobIds = jobs.map(job => job.id)

      try {
        const result = await resolveWithinSeconds(callback(jobs), maxExpiration, `handler execution exceeded ${maxExpiration}s`)
        // The status write is still not awaited, but a rejection is now routed
        // to the error event instead of becoming an unhandled promise rejection.
        this.complete(name, jobIds, jobIds.length === 1 ? result : undefined).catch(onError)
      } catch (err) {
        this.fail(name, jobIds, err).catch(onError)
      }

      this.emitWip(name)
    }

    const worker = new Worker({ id, name, options, interval, fetch, onFetch, onError })

    this.addWorker(worker)

    worker.start()

    return id
  }

  /**
   * Stops workers, selected either by queue name (string) or by worker id
   * (`{ id }`). Resolves once stop has been requested; the workers are
   * untracked later, after all of them report `stopped`.
   *
   * @param {string|{id: string}} value - queue name or worker reference
   */
  async offWork (value) {
    assert(value, 'Missing required argument')

    const query = (typeof value === 'string')
      ? { filter: i => i.name === value }
      : (typeof value === 'object' && value.id)
          ? { filter: i => i.id === value.id }
          : null

    assert(query, 'Invalid argument. Expected string or object: { id }')

    const workers = this.getWorkers().filter(i => query.filter(i) && !i.stopping && !i.stopped)

    if (workers.length === 0) {
      return
    }

    for (const worker of workers) {
      worker.stop()
    }

    // poll in the background until every selected worker has stopped
    setImmediate(async () => {
      while (!workers.every(w => w.stopped)) {
        await delay(1000)
      }

      for (const worker of workers) {
        this.removeWorker(worker)
      }
    })
  }

  /**
   * Calls `notify()` on the worker with the given id, if it is tracked.
   *
   * @param {string} workerId
   */
  notifyWorker (workerId) {
    if (this.workers.has(workerId)) {
      this.workers.get(workerId).notify()
    }
  }

  /**
   * Subscribes a queue to an event.
   *
   * @param {string} event - event name
   * @param {string} name - queue name
   */
  async subscribe (event, name) {
    assert(event, 'Missing required argument')
    assert(name, 'Missing required argument')
    const sql = plans.subscribe(this.config.schema)
    return await this.db.executeSql(sql, [event, name])
  }

  /**
   * Removes a queue's subscription to an event.
   *
   * @param {string} event - event name
   * @param {string} name - queue name
   */
  async unsubscribe (event, name) {
    assert(event, 'Missing required argument')
    assert(name, 'Missing required argument')
    const sql = plans.unsubscribe(this.config.schema)
    return await this.db.executeSql(sql, [event, name])
  }

  /**
   * Sends a job to every queue returned for the event.
   * Individual send failures are not reported (`Promise.allSettled`).
   *
   * @param {string} event - event name
   * @param {...*} args - arguments forwarded to `send` after the queue name
   */
  async publish (event, ...args) {
    assert(event, 'Missing required argument')
    const sql = plans.getQueuesForEvent(this.config.schema)
    const { rows } = await this.db.executeSql(sql, [event])

    await Promise.allSettled(rows.map(({ name }) => this.send(name, ...args)))
  }

  /**
   * Creates a job. Arguments are normalised by `Attorney.checkSendArgs`.
   *
   * @returns {Promise<string|null>} job id, or null when no row was inserted
   */
  async send (...args) {
    const { name, data, options } = Attorney.checkSendArgs(args)

    return await this.createJob(name, data, options)
  }

  /**
   * Creates a job with `startAfter` set to `after`.
   * The caller's `options` object is copied, not modified.
   *
   * @returns {Promise<string|null>} job id, or null when no row was inserted
   */
  async sendAfter (name, data, options, after) {
    options = options ? { ...options } : {}
    options.startAfter = after

    const result = Attorney.checkSendArgs([name, data, options])

    return await this.createJob(result.name, result.data, result.options)
  }

  /**
   * Creates a job with `singletonSeconds` / `singletonKey` set and
   * `singletonNextSlot` false, so a rejected insert is not retried.
   * The caller's `options` object is copied, not modified.
   *
   * @returns {Promise<string|null>} job id, or null when no row was inserted
   */
  async sendThrottled (name, data, options, seconds, key) {
    options = options ? { ...options } : {}
    options.singletonSeconds = seconds
    options.singletonNextSlot = false
    options.singletonKey = key

    const result = Attorney.checkSendArgs([name, data, options])

    return await this.createJob(result.name, result.data, result.options)
  }

  /**
   * Creates a job with `singletonSeconds` / `singletonKey` set and
   * `singletonNextSlot` true, so a rejected insert is retried once with a
   * delayed start (see createJob).
   * The caller's `options` object is copied, not modified.
   *
   * @returns {Promise<string|null>} job id, or null when no row was inserted
   */
  async sendDebounced (name, data, options, seconds, key) {
    options = options ? { ...options } : {}
    options.singletonSeconds = seconds
    options.singletonNextSlot = true
    options.singletonKey = key

    const result = Attorney.checkSendArgs([name, data, options])

    return await this.createJob(result.name, result.data, result.options)
  }

  /**
   * Inserts a single job.
   *
   * If the first insert returns no row and `options.singletonNextSlot` is set,
   * the insert is retried once with `startAfter` from getDebounceStartAfter()
   * and `singletonOffset` set to `singletonSeconds`.
   *
   * @param {string} name - queue name
   * @param {*} data - job payload
   * @param {object} options - job options; `options.db` overrides the default connection
   * @returns {Promise<string|null>} job id, or null when no row was inserted
   */
  async createJob (name, data, options) {
    const singletonOffset = 0

    const {
      id = null,
      db: wrapper,
      priority,
      startAfter,
      singletonKey = null,
      singletonSeconds,
      singletonNextSlot,
      expireInSeconds,
      deleteAfterSeconds,
      keepUntil,
      retryLimit,
      retryDelay,
      retryBackoff,
      retryDelayMax
    } = options

    const job = {
      id,
      name,
      data,
      priority,
      startAfter,
      singletonKey,
      singletonSeconds,
      singletonOffset,
      expireInSeconds,
      deleteAfterSeconds,
      keepUntil,
      retryLimit,
      retryDelay,
      retryBackoff,
      retryDelayMax
    }

    const db = wrapper || this.db

    const { table } = await this.getQueueCache(name)

    const sql = plans.insertJobs(this.config.schema, { table, name, returnId: true })

    const { rows: try1 } = await db.executeSql(sql, [JSON.stringify([job])])

    if (try1.length === 1) {
      return try1[0].id
    }

    if (singletonNextSlot) {
      // delay starting by the offset to honor throttling config
      // NOTE(review): this.timekeeper is not assigned in this class - presumably set by the owner
      job.startAfter = this.getDebounceStartAfter(singletonSeconds, this.timekeeper.clockSkew)
      job.singletonOffset = singletonSeconds

      const { rows: try2 } = await db.executeSql(sql, [JSON.stringify([job])])

      if (try2.length === 1) {
        return try2[0].id
      }
    }

    return null
  }

  /**
   * Inserts an array of jobs in one statement.
   *
   * @param {string} name - queue name
   * @param {object[]} jobs - jobs to insert
   * @param {object} [options] - `options.db` overrides the default connection
   * @returns {Promise<string[]|null>} ids of returned rows, or null when the statement returned none
   */
  async insert (name, jobs, options = {}) {
    assert(Array.isArray(jobs), 'jobs argument should be an array')

    const { table } = await this.getQueueCache(name)

    const db = this.assertDb(options)

    const sql = plans.insertJobs(this.config.schema, { table, name, returnId: false })

    const { rows } = await db.executeSql(sql, [JSON.stringify(jobs)])

    return (rows.length) ? rows.map(i => i.id) : null
  }

  /**
   * Computes the `startAfter` delay, in seconds, used when a debounced job is
   * retried: the whole seconds remaining in the current `singletonSeconds`
   * slot (never 0), plus one extra second when `singletonSeconds > 1`.
   *
   * @param {number} singletonSeconds - slot length in seconds
   * @param {number} clockOffset - milliseconds added to the local clock
   * @returns {number} delay in seconds
   */
  getDebounceStartAfter (singletonSeconds, clockOffset) {
    const debounceInterval = singletonSeconds * 1000

    const now = Date.now() + clockOffset

    const slot = Math.floor(now / debounceInterval) * debounceInterval

    // prevent startAfter=0 during debouncing
    let startAfter = (singletonSeconds - Math.floor((now - slot) / 1000)) || 1

    if (singletonSeconds > 1) {
      startAfter++
    }

    return startAfter
  }

  /**
   * Fetches the next job(s) from a queue.
   *
   * @param {string} name - queue name
   * @param {object} [options] - fetch options; `batchSize` is passed to the plan as `limit`,
   *   `options.db` overrides the default connection
   * @returns {Promise<object[]>} fetched rows; empty when none were returned or the query failed
   */
  async fetch (name, options = {}) {
    Attorney.checkFetchArgs(name, options)

    const db = this.assertDb(options)

    const { table } = await this.getQueueCache(name)

    const sql = plans.fetchNextJob({ ...options, schema: this.config.schema, table, name, limit: options.batchSize })

    let result

    try {
      result = await db.executeSql(sql)
    } catch (err) {
      // errors from fetchquery should only be unique constraint violations
    }

    return result?.rows || []
  }

  /**
   * Normalises an id argument to a non-empty array of ids.
   *
   * @param {string|string[]} id - one id or an array of ids
   * @param {string} funcName - calling method name, used in the error message
   * @returns {string[]}
   * @throws {AssertionError} when `id` is missing or an empty array
   */
  mapCompletionIdArg (id, funcName) {
    const errorMessage = `${funcName}() requires an id`

    assert(id, errorMessage)

    const ids = Array.isArray(id) ? id : [id]

    assert(ids.length, errorMessage)

    return ids
  }

  /**
   * Prepares completion / failure output for storage.
   *
   * null, undefined and functions map to null. Non-array objects are passed
   * through `stringify` (serialize-error) as-is; anything else is wrapped as
   * `{ value }` first.
   */
  mapCompletionDataArg (data) {
    if (data === null || typeof data === 'undefined' || typeof data === 'function') { return null }

    const result = (typeof data === 'object' && !Array.isArray(data))
      ? data
      : { value: data }

    return stringify(result)
  }

  /**
   * Builds the response object returned by the id-based job commands.
   *
   * @param {string[]} ids - ids that were requested
   * @param {object} [result] - query result whose first row carries `count`
   * @returns {{jobs: string[], requested: number, affected: number}}
   *   `affected` is 0 when the result has no usable count
   */
  mapCommandResponse (ids, result) {
    // guard against a missing result / empty rows; always parse as base 10
    const count = Number.parseInt(result?.rows?.[0]?.count, 10)

    return {
      jobs: ids,
      requested: ids.length,
      affected: Number.isNaN(count) ? 0 : count
    }
  }

  /**
   * Completes one or more jobs, optionally storing output data.
   *
   * @param {string} name - queue name
   * @param {string|string[]} id - job id(s)
   * @param {*} [data] - output, normalised by mapCompletionDataArg
   * @param {object} [options] - `options.db` overrides the default connection
   * @returns {Promise<{jobs: string[], requested: number, affected: number}>}
   */
  async complete (name, id, data, options = {}) {
    Attorney.assertQueueName(name)
    const db = this.assertDb(options)
    const ids = this.mapCompletionIdArg(id, 'complete')
    const { table } = await this.getQueueCache(name)
    const sql = plans.completeJobs(this.config.schema, table)
    const result = await db.executeSql(sql, [name, ids, this.mapCompletionDataArg(data)])
    return this.mapCommandResponse(ids, result)
  }

  /**
   * Fails one or more jobs, optionally storing output data.
   *
   * @param {string} name - queue name
   * @param {string|string[]} id - job id(s)
   * @param {*} [data] - output (for example an Error), normalised by mapCompletionDataArg
   * @param {object} [options] - `options.db` overrides the default connection
   * @returns {Promise<{jobs: string[], requested: number, affected: number}>}
   */
  async fail (name, id, data, options = {}) {
    Attorney.assertQueueName(name)
    const db = this.assertDb(options)
    const ids = this.mapCompletionIdArg(id, 'fail')
    const { table } = await this.getQueueCache(name)
    const sql = plans.failJobsById(this.config.schema, table)
    const result = await db.executeSql(sql, [name, ids, this.mapCompletionDataArg(data)])
    return this.mapCommandResponse(ids, result)
  }

  /**
   * Cancels one or more jobs.
   *
   * @param {string} name - queue name
   * @param {string|string[]} id - job id(s)
   * @param {object} [options] - `options.db` overrides the default connection
   * @returns {Promise<{jobs: string[], requested: number, affected: number}>}
   */
  async cancel (name, id, options = {}) {
    Attorney.assertQueueName(name)
    const db = this.assertDb(options)
    const ids = this.mapCompletionIdArg(id, 'cancel')
    const { table } = await this.getQueueCache(name)
    const sql = plans.cancelJobs(this.config.schema, table)
    const result = await db.executeSql(sql, [name, ids])
    return this.mapCommandResponse(ids, result)
  }

  /**
   * Deletes one or more jobs by id.
   *
   * @param {string} name - queue name
   * @param {string|string[]} id - job id(s)
   * @param {object} [options] - `options.db` overrides the default connection
   * @returns {Promise<{jobs: string[], requested: number, affected: number}>}
   */
  async deleteJob (name, id, options = {}) {
    Attorney.assertQueueName(name)
    const db = this.assertDb(options)
    const ids = this.mapCompletionIdArg(id, 'deleteJob')
    const { table } = await this.getQueueCache(name)
    const sql = plans.deleteJobsById(this.config.schema, table)
    const result = await db.executeSql(sql, [name, ids])
    return this.mapCommandResponse(ids, result)
  }

  /**
   * Resumes one or more jobs.
   *
   * @param {string} name - queue name
   * @param {string|string[]} id - job id(s)
   * @param {object} [options] - `options.db` overrides the default connection
   * @returns {Promise<{jobs: string[], requested: number, affected: number}>}
   */
  async resume (name, id, options = {}) {
    Attorney.assertQueueName(name)
    const db = this.assertDb(options)
    const ids = this.mapCompletionIdArg(id, 'resume')
    const { table } = await this.getQueueCache(name)
    const sql = plans.resumeJobs(this.config.schema, table)
    const result = await db.executeSql(sql, [name, ids])
    return this.mapCommandResponse(ids, result)
  }

  /**
   * Retries one or more jobs.
   *
   * @param {string} name - queue name
   * @param {string|string[]} id - job id(s)
   * @param {object} [options] - `options.db` overrides the default connection
   * @returns {Promise<{jobs: string[], requested: number, affected: number}>}
   */
  async retry (name, id, options = {}) {
    Attorney.assertQueueName(name)
    // same connection check as the other id-based commands
    const db = this.assertDb(options)
    const ids = this.mapCompletionIdArg(id, 'retry')
    const { table } = await this.getQueueCache(name)
    const sql = plans.retryJobs(this.config.schema, table)
    const result = await db.executeSql(sql, [name, ids])
    return this.mapCommandResponse(ids, result)
  }

  /**
   * Creates a queue.
   *
   * The policy defaults to `QUEUE_POLICIES.standard`. When `deadLetter` is
   * given it must be a different, already existing queue.
   * The caller's `options` object is not modified.
   *
   * @param {string} name - queue name; falls back to `options.name` when falsy
   * @param {object} [options] - queue options
   * @throws {AssertionError} on an invalid policy or a self-referencing dead letter queue
   * @throws {Error} when the dead letter queue does not exist
   */
  async createQueue (name, options = {}) {
    name = name || options.name

    Attorney.assertQueueName(name)

    // work on a copy so the default policy is never written onto the caller's object
    const queueOptions = { ...options, policy: options.policy || QUEUE_POLICIES.standard }

    // own-property check: `in` would also accept inherited keys such as 'toString'
    assert(Object.hasOwn(QUEUE_POLICIES, queueOptions.policy), `${queueOptions.policy} is not a valid queue policy`)

    Attorney.validateQueueArgs(queueOptions)

    if (queueOptions.deadLetter) {
      Attorney.assertQueueName(queueOptions.deadLetter)
      assert.notStrictEqual(name, queueOptions.deadLetter, 'deadLetter cannot be itself')
      // throws when the dead letter queue does not exist
      await this.getQueueCache(queueOptions.deadLetter)
    }

    const sql = plans.createQueue(this.config.schema, name, queueOptions)
    await this.db.executeSql(sql)
  }

  /**
   * Lists queues, optionally restricted to the given names.
   *
   * @param {string|string[]} [names] - queue name(s) to restrict the result to
   * @returns {Promise<object[]>} queue rows
   */
  async getQueues (names) {
    if (names) {
      names = Array.isArray(names) ? names : [names]
      for (const name of names) {
        Attorney.assertQueueName(name)
      }
    }

    const sql = plans.getQueues(this.config.schema, names)
    const { rows } = await this.db.executeSql(sql)
    return rows
  }

  /**
   * Updates properties of a queue.
   *
   * @param {string} name - queue name
   * @param {object} [options] - properties to update; must not be empty
   * @throws {AssertionError} on empty options, an invalid policy or a self-referencing dead letter queue
   */
  async updateQueue (name, options = {}) {
    Attorney.assertQueueName(name)

    assert(Object.keys(options).length > 0, 'no properties found to update')

    if ('policy' in options) {
      // own-property check: `in` would also accept inherited keys such as 'toString'
      assert(Object.hasOwn(QUEUE_POLICIES, options.policy), `${options.policy} is not a valid queue policy`)
    }

    Attorney.validateQueueArgs(options)

    const { deadLetter } = options

    if (deadLetter) {
      Attorney.assertQueueName(deadLetter)
      assert.notStrictEqual(name, deadLetter, 'deadLetter cannot be itself')
    }

    const sql = plans.updateQueue(this.config.schema, { deadLetter })
    await this.db.executeSql(sql, [name, options])
  }

  /**
   * Loads a single queue row from the database (bypasses the cache).
   *
   * @param {string} name - queue name
   * @returns {Promise<object|null>} queue row, or null when not found
   */
  async getQueue (name) {
    Attorney.assertQueueName(name)

    const sql = plans.getQueues(this.config.schema, [name])
    const { rows } = await this.db.executeSql(sql)

    return rows[0] || null
  }

  /**
   * Deletes a queue. Best effort: any error raised while looking up or
   * deleting the queue (including "does not exist") is ignored.
   *
   * @param {string} name - queue name
   */
  async deleteQueue (name) {
    Attorney.assertQueueName(name)

    try {
      await this.getQueueCache(name)
      const sql = plans.deleteQueue(this.config.schema, name)
      await this.db.executeSql(sql)
      // evict the cached row so later calls do not resolve a deleted queue's table
      delete this.queues[name]
    } catch {}
  }

  /**
   * Runs the `deleteQueuedJobs` plan for a queue.
   *
   * @param {string} name - queue name
   */
  async deleteQueuedJobs (name) {
    Attorney.assertQueueName(name)
    const { table } = await this.getQueueCache(name)
    const sql = plans.deleteQueuedJobs(this.config.schema, table)
    await this.db.executeSql(sql, [name])
  }

  /**
   * Runs the `deleteStoredJobs` plan for a queue.
   *
   * @param {string} name - queue name
   */
  async deleteStoredJobs (name) {
    Attorney.assertQueueName(name)
    const { table } = await this.getQueueCache(name)
    const sql = plans.deleteStoredJobs(this.config.schema, table)
    await this.db.executeSql(sql, [name])
  }

  /**
   * Removes every job of a queue: the `deleteAllJobs` plan when the queue row
   * has `partition` set, the `truncateTable` plan otherwise.
   *
   * @param {string} name - queue name
   */
  async deleteAllJobs (name) {
    Attorney.assertQueueName(name)
    const { table, partition } = await this.getQueueCache(name)

    if (partition) {
      const sql = plans.deleteAllJobs(this.config.schema, table)
      await this.db.executeSql(sql, [name])
    } else {
      const sql = plans.truncateTable(this.config.schema, table)
      await this.db.executeSql(sql)
    }
  }

  /**
   * Returns statistics for a queue.
   *
   * @param {string} name - queue name
   * @returns {Promise<object|null>} first row of the stats query, or null when it returned none
   */
  async getQueueStats (name) {
    Attorney.assertQueueName(name)

    const { table } = await this.getQueueCache(name)

    const sql = plans.getQueueStats(this.config.schema, table, [name])

    const { rows } = await this.db.executeSql(sql)

    return rows.at(0) || null
  }

  /**
   * Loads a job by id.
   *
   * @param {string} name - queue name
   * @param {string} id - job id
   * @param {object} [options] - `options.db` overrides the default connection
   * @returns {Promise<object|null>} job row, or null unless exactly one row matched
   */
  async getJobById (name, id, options = {}) {
    Attorney.assertQueueName(name)

    const db = this.assertDb(options)

    const { table } = await this.getQueueCache(name)

    const sql = plans.getJobById(this.config.schema, table)

    const result1 = await db.executeSql(sql, [name, id])

    if (result1?.rows?.length === 1) {
      return result1.rows[0]
    } else {
      return null
    }
  }

  /**
   * Picks the connection for a command: `options.db` when supplied (returned
   * unchecked), otherwise the manager's own connection, which must be opened.
   *
   * @param {object} options - command options
   * @returns {object} database wrapper
   * @throws {AssertionError} when the manager's connection is not opened
   */
  assertDb (options) {
    if (options.db) {
      return options.db
    }

    assert(this.db._pgbdb && this.db.opened, 'Database connection is not opened')

    return this.db
  }
}
678

679
module.exports = Manager
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc