• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

timgit / pg-boss / 9949043480

16 Jul 2024 01:09AM UTC coverage: 94.41% (-5.6%) from 100.0%
9949043480

Pull #425

github

web-flow
Merge 6b98fbe61 into f1c1636ca
Pull Request #425: v10

466 of 546 branches covered (85.35%)

351 of 365 new or added lines in 10 files covered. (96.16%)

40 existing lines in 5 files now uncovered.

912 of 966 relevant lines covered (94.41%)

803.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.62
/src/plans.js
1
const assert = require('assert')
4✔
2

3
const states = {
4✔
4
  created: 'created',
5
  retry: 'retry',
6
  active: 'active',
7
  completed: 'completed',
8
  cancelled: 'cancelled',
9
  failed: 'failed'
10
}
11

12
const DEFAULT_SCHEMA = 'pgboss'
4✔
13
const MIGRATE_RACE_MESSAGE = 'division by zero'
4✔
14
const CREATE_RACE_MESSAGE = 'already exists'
4✔
15

16
const QUEUE_POLICY = {
4✔
17
  standard: 'standard',
18
  short: 'short',
19
  singleton: 'singleton',
20
  stately: 'stately'
21
}
22

23
module.exports = {
4✔
24
  create,
25
  insertVersion,
26
  getVersion,
27
  setVersion,
28
  versionTableExists,
29
  fetchNextJob,
30
  completeJobs,
31
  cancelJobs,
32
  resumeJobs,
33
  failJobsById,
34
  failJobsByTimeout,
35
  insertJob,
36
  insertJobs,
37
  getTime,
38
  getSchedules,
39
  schedule,
40
  unschedule,
41
  subscribe,
42
  unsubscribe,
43
  getQueuesForEvent,
44
  archive,
45
  drop,
46
  countStates,
47
  createQueue,
48
  updateQueue,
49
  createPartition,
50
  dropPartition,
51
  deleteQueueRecords,
52
  getQueueByName,
53
  getQueueSize,
54
  purgeQueue,
55
  clearStorage,
56
  getMaintenanceTime,
57
  setMaintenanceTime,
58
  getMonitorTime,
59
  setMonitorTime,
60
  getCronTime,
61
  setCronTime,
62
  locked,
63
  advisoryLock,
64
  assertMigration,
65
  getArchivedJobById,
66
  getJobById,
67
  QUEUE_POLICY,
68
  states: { ...states },
69
  MIGRATE_RACE_MESSAGE,
70
  CREATE_RACE_MESSAGE,
71
  DEFAULT_SCHEMA
72
}
73

74
/**
 * Assembles every statement needed to install a fresh schema and wraps
 * them with `locked` so they are emitted as one guarded script.
 *
 * @param {string} schema - target schema name
 * @param {number|string} version - schema version recorded by `insertVersion`
 * @returns {string} the script produced by `locked`
 */
function create (schema, version) {
  const build = (factory) => factory(schema)

  const schemaAndTypes = [
    createSchema,
    createEnumJobState
  ].map(build)

  const jobObjects = [
    createTableJob,
    createIndexJobName,
    createIndexJobFetch,
    createIndexJobPolicyStately,
    createIndexJobPolicyShort,
    createIndexJobPolicySingleton,
    createIndexJobThrottleOn,
    createIndexJobThrottleKey
  ].map(build)

  const archiveObjects = [
    createTableArchive,
    createPrimaryKeyArchive,
    createColumnArchiveArchivedOn,
    createIndexArchiveArchivedOn,
    createIndexArchiveName
  ].map(build)

  const metadataTables = [
    createTableVersion,
    createTableQueue,
    createTableSchedule,
    createTableSubscription
  ].map(build)

  const partitionFunctions = [
    getPartitionFunction,
    createPartitionFunction,
    dropPartitionFunction
  ].map(build)

  // Statement order is preserved: later statements reference earlier objects.
  return locked(schema, [
    ...schemaAndTypes,
    ...jobObjects,
    ...archiveObjects,
    ...metadataTables,
    ...partitionFunctions,
    insertVersion(schema, version)
  ])
}
108

109
/**
 * Statement that creates the schema unless it already exists.
 *
 * @param {string} schema - schema name, interpolated verbatim
 * @returns {string} SQL text
 */
function createSchema (schema) {
  const target = schema
  return `
    CREATE SCHEMA IF NOT EXISTS ${target}
  `
}
114

115
/**
 * DDL for the `version` table, which holds the schema version together
 * with the maintenance, cron and monitor timestamps.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createTableVersion (schema) {
  const table = `${schema}.version`
  return `
    CREATE TABLE ${table} (
      version int primary key,
      maintained_on timestamp with time zone,
      cron_on timestamp with time zone,
      monitored_on timestamp with time zone
    )
  `
}
125

126
/**
 * DDL for the `job_state` enum.
 *
 * ENUM definition order is important: the base type is numeric and first
 * values are less than last values, which the `state < '...'` comparisons
 * elsewhere in this file rely on.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createEnumJobState (schema) {
  const { created, retry, active, completed, cancelled, failed } = states
  const labels = [created, retry, active, completed, cancelled, failed]
    .map((label) => `'${label}'`)
    .join(',\n      ')

  return `
    CREATE TYPE ${schema}.job_state AS ENUM (
      ${labels}
    )
  `
}
140

141
/**
 * DDL for the parent `job` table, list-partitioned by queue name.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createTableJob (schema) {
  const stateType = `${schema}.job_state`
  const initialState = states.created

  return `
    CREATE TABLE ${schema}.job (
      id uuid not null default gen_random_uuid(),
      name text not null,
      priority integer not null default(0),
      data jsonb,
      state ${stateType} not null default('${initialState}'),
      retryLimit integer not null default(0),
      retryCount integer not null default(0),
      retryDelay integer not null default(0),
      retryBackoff boolean not null default false,
      startAfter timestamp with time zone not null default now(),
      startedOn timestamp with time zone,
      singletonKey text,
      singletonOn timestamp without time zone,
      expireIn interval not null default interval '15 minutes',
      createdOn timestamp with time zone not null default now(),
      completedOn timestamp with time zone,
      keepUntil timestamp with time zone NOT NULL default now() + interval '14 days',
      output jsonb,
      deadletter text,
      policy text,
      CONSTRAINT job_pkey PRIMARY KEY (name, id)
    ) PARTITION BY LIST (name)
  `
}
168

169
/**
 * Statement that calls `<schema>.create_partition` for one queue.
 *
 * The queue name is embedded as a SQL string literal, so any single quote
 * in it is doubled; otherwise a name such as `o'brien` would terminate the
 * literal early and yield broken (or injectable) SQL.
 *
 * @param {string} schema - schema name
 * @param {string} name - queue name
 * @returns {string} SQL text
 */
function createPartition (schema, name) {
  const literal = String(name).replace(/'/g, "''")
  return `SELECT ${schema}.create_partition('${literal}');`
}
172

173
/**
 * DDL for the SQL function `get_partition`, which derives a partition
 * table name from a queue name: the schema-prefixed `job_` prefix followed
 * by the hex-encoded sha224 of the queue name.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function getPartitionFunction (schema) {
  const prefix = `${schema}.job_`
  return `
    CREATE FUNCTION ${schema}.get_partition(queue_name text, out name text) AS
    $$
    SELECT '${prefix}' || encode(sha224(queue_name::bytea), 'hex');
    $$
    LANGUAGE SQL
    IMMUTABLE
  `
}
183

184
/**
 * DDL for the plpgsql function `create_partition`, which creates a table
 * shaped like `job`, adds a CHECK on the queue name and attaches the table
 * as the list partition for that name.
 *
 * NOTE(review): `get_partition` returns a schema-prefixed name and `%I`
 * formats its argument as a single identifier — confirm the partition ends
 * up in the intended schema.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createPartitionFunction (schema) {
  const parent = `${schema}.job`
  return `
    CREATE FUNCTION ${schema}.create_partition(queue_name text)
    RETURNS VOID AS
    $$
    DECLARE
      table_name varchar := ${schema}.get_partition(queue_name);
    BEGIN
      EXECUTE format('CREATE TABLE %I (LIKE ${parent} INCLUDING DEFAULTS INCLUDING CONSTRAINTS)', table_name);
      EXECUTE format('ALTER TABLE %I ADD CHECK (name=%L)', table_name, queue_name);
      EXECUTE format('ALTER TABLE ${parent} ATTACH PARTITION %I FOR VALUES IN (%L)', table_name, queue_name);
    END;
    $$
    LANGUAGE plpgsql;
  `
}
200

201
/**
 * DDL for the plpgsql function `drop_partition`, which drops the table
 * named by `get_partition(queue_name)` if it exists.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function dropPartitionFunction (schema) {
  const partitionName = `${schema}.get_partition(queue_name)`
  return `
    CREATE FUNCTION ${schema}.drop_partition(queue_name text)
    RETURNS VOID AS
    $$
    BEGIN  
      EXECUTE format('DROP TABLE IF EXISTS %I', ${partitionName});
    END;
    $$
    LANGUAGE plpgsql;
  `
}
213

214
/**
 * Statement that calls `<schema>.drop_partition` for one queue.
 *
 * The queue name is embedded as a SQL string literal, so any single quote
 * in it is doubled to keep the literal (and the statement) well-formed.
 *
 * @param {string} schema - schema name
 * @param {string} name - queue name
 * @returns {string} SQL text
 */
function dropPartition (schema, name) {
  const literal = String(name).replace(/'/g, "''")
  return `SELECT ${schema}.drop_partition('${literal}');`
}
217

218
/**
 * Statement that adds the (name, id) primary key to the archive table.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createPrimaryKeyArchive (schema) {
  const table = `${schema}.archive`
  return `ALTER TABLE ${table} ADD CONSTRAINT archive_pkey PRIMARY KEY (name, id)`
}
221

222
/**
 * Unique partial index for the `short` policy: one row per queue name
 * while the job is in the `created` state.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createIndexJobPolicyShort (schema) {
  const predicate = `state = '${states.created}' AND policy = '${QUEUE_POLICY.short}'`
  return `CREATE UNIQUE INDEX job_policy_short ON ${schema}.job (name) WHERE ${predicate}`
}
225

226
/**
 * Unique partial index for the `singleton` policy: one row per queue name
 * while the job is in the `active` state.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createIndexJobPolicySingleton (schema) {
  const predicate = `state = '${states.active}' AND policy = '${QUEUE_POLICY.singleton}'`
  return `CREATE UNIQUE INDEX job_policy_singleton ON ${schema}.job (name) WHERE ${predicate}`
}
229

230
/**
 * Unique partial index for the `stately` policy: one row per
 * (queue name, state) for states up to and including `active`.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createIndexJobPolicyStately (schema) {
  const predicate = `state <= '${states.active}' AND policy = '${QUEUE_POLICY.stately}'`
  return `CREATE UNIQUE INDEX job_policy_stately ON ${schema}.job (name, state) WHERE ${predicate}`
}
233

234
/**
 * Unique partial index over (name, singletonOn, singletonKey-or-empty) for
 * rows that have a singletonOn value and a state up to `completed`.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createIndexJobThrottleOn (schema) {
  const columns = "name, singletonOn, COALESCE(singletonKey, '')"
  const predicate = `state <= '${states.completed}' AND singletonOn IS NOT NULL`
  return `CREATE UNIQUE INDEX job_throttle_on ON ${schema}.job (${columns}) WHERE ${predicate}`
}
237

238
/**
 * Unique partial index over (name, singletonKey) for rows without a
 * singletonOn value and a state up to `completed`.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createIndexJobThrottleKey (schema) {
  const predicate = `state <= '${states.completed}' AND singletonOn IS NULL`
  return `CREATE UNIQUE INDEX job_throttle_key ON ${schema}.job (name, singletonKey) WHERE ${predicate}`
}
241

242
/**
 * Plain index on the job table's `name` column.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createIndexJobName (schema) {
  const table = `${schema}.job`
  return `CREATE INDEX job_name ON ${table} (name)`
}
245

246
/**
 * Partial covering index on (name, startAfter) for jobs whose state sorts
 * before `active`, matching the filter used by `fetchNextJob`.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createIndexJobFetch (schema) {
  const included = 'priority, createdOn, id'
  const predicate = `state < '${states.active}'`
  return `CREATE INDEX job_fetch ON ${schema}.job (name, startAfter) INCLUDE (${included}) WHERE ${predicate}`
}
249

250
/**
 * Statement that creates the archive table with the same columns as `job`.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createTableArchive (schema) {
  const source = `${schema}.job`
  const target = `${schema}.archive`
  return `CREATE TABLE ${target} (LIKE ${source})`
}
253

254
/**
 * Statement that adds the `archivedOn` timestamp column to the archive table.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createColumnArchiveArchivedOn (schema) {
  const table = `${schema}.archive`
  return `ALTER TABLE ${table} ADD archivedOn timestamptz NOT NULL DEFAULT now()`
}
257

258
/**
 * Index on the archive table's archived-on column.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createIndexArchiveArchivedOn (schema) {
  const table = `${schema}.archive`
  return `CREATE INDEX archive_archivedon_idx ON ${table}(archivedon)`
}
261

262
/**
 * Index on the archive table's `name` column.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createIndexArchiveName (schema) {
  const table = `${schema}.archive`
  return `CREATE INDEX archive_name_idx ON ${table}(name)`
}
265

266
/**
 * Query returning `maintained_on` and how many seconds ago that was.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function getMaintenanceTime (schema) {
  const column = 'maintained_on'
  return `SELECT ${column}, EXTRACT( EPOCH FROM (now() - ${column}) ) seconds_ago FROM ${schema}.version`
}
269

270
/**
 * Statement that stamps `maintained_on` with the current database time.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function setMaintenanceTime (schema) {
  const table = `${schema}.version`
  return `UPDATE ${table} SET maintained_on = now()`
}
273

274
/**
 * Query returning `monitored_on` and how many seconds ago that was.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function getMonitorTime (schema) {
  const column = 'monitored_on'
  return `SELECT ${column}, EXTRACT( EPOCH FROM (now() - ${column}) ) seconds_ago FROM ${schema}.version`
}
277

278
/**
 * Statement that stamps `monitored_on` with the current database time.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function setMonitorTime (schema) {
  const table = `${schema}.version`
  return `UPDATE ${table} SET monitored_on = now()`
}
281

282
/**
 * Statement that sets `cron_on`.
 *
 * @param {string} schema - schema name
 * @param {string} [time] - SQL expression inserted verbatim (not quoted);
 *   any falsy value falls back to `now()`
 * @returns {string} SQL text
 */
function setCronTime (schema, time) {
  const expression = time || 'now()'
  return `UPDATE ${schema}.version SET cron_on = ${expression}`
}
286

287
/**
 * Query returning `cron_on` and how many seconds ago that was.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function getCronTime (schema) {
  const column = 'cron_on'
  return `SELECT ${column}, EXTRACT( EPOCH FROM (now() - ${column}) ) seconds_ago FROM ${schema}.version`
}
290

291
/**
 * Parameterized INSERT for a row in the `queue` table.
 * Parameters $1..$8 follow the column list order.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createQueue (schema) {
  const columns = [
    'name',
    'policy',
    'retry_limit',
    'retry_delay',
    'retry_backoff',
    'expire_seconds',
    'retention_minutes',
    'dead_letter'
  ]
  const placeholders = columns.map((_, index) => `$${index + 1}`)

  return `
    INSERT INTO ${schema}.queue (${columns.join(', ')})
    VALUES (${placeholders.join(', ')})
  `
}
297

298
/**
 * Parameterized UPDATE of a queue's settings. $1 is the queue name; each
 * of $2..$7 replaces its column only when it is not NULL.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function updateQueue (schema) {
  const columns = [
    'retry_limit',
    'retry_delay',
    'retry_backoff',
    'expire_seconds',
    'retention_minutes',
    'dead_letter'
  ]
  // $1 is reserved for the name, so column parameters start at $2.
  const assignments = columns
    .map((column, index) => `${column} = COALESCE($${index + 2}, ${column})`)
    .join(',\n      ')

  return `
    UPDATE ${schema}.queue SET
      ${assignments}
    WHERE name = $1
  `
}
310

311
/**
 * Query selecting the queue row whose name is $1.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function getQueueByName (schema) {
  const table = `${schema}.queue`
  return `SELECT * FROM ${table} WHERE name = $1`
}
314

315
/**
 * Single statement that deletes the queue row, its schedule row and all
 * of its jobs; $1 is the queue name.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function deleteQueueRecords (schema) {
  const byName = 'WHERE name = $1'
  return `WITH dq AS (
      DELETE FROM ${schema}.queue ${byName}
    ), ds AS (
      DELETE FROM ${schema}.schedule ${byName}
    )
    DELETE FROM ${schema}.job ${byName}
  `
}
324

325
/**
 * Statement that deletes the jobs of queue $1 whose state sorts before
 * `active`.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function purgeQueue (schema) {
  const upperBound = states.active
  return `DELETE from ${schema}.job WHERE name = $1 and state < '${upperBound}'`
}
328

329
/**
 * Statement that truncates both the job and archive tables.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function clearStorage (schema) {
  const tables = ['job', 'archive'].map((table) => `${schema}.${table}`)
  return `TRUNCATE ${tables.join(', ')}`
}
332

333
/**
 * Query counting the jobs of queue $1 whose state sorts before a given
 * state.
 *
 * The state is validated with an own-property check: the `in` operator
 * also accepts inherited keys such as `toString`, which would be spliced
 * into the SQL. The caller's `options` object is no longer written to, so
 * frozen or shared option objects are safe to pass.
 *
 * @param {string} schema - schema name
 * @param {{ before?: string }} [options] - `before` is the exclusive upper
 *   state; any falsy value falls back to `states.active`
 * @returns {string} SQL text
 * @throws {AssertionError} when `before` is not a key of `states`
 */
function getQueueSize (schema, options = {}) {
  const before = options.before || states.active
  const isKnownState = Object.prototype.hasOwnProperty.call(states, before)

  assert(isKnownState, `${before} is not a valid state`)

  return `SELECT count(*) as count FROM ${schema}.job WHERE name = $1 AND state < '${before}'`
}
338

339
/**
 * DDL for the `queue` table, which stores per-queue policy and defaults.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createTableQueue (schema) {
  const table = `${schema}.queue`
  return `
    CREATE TABLE ${table} (
      name text primary key,
      policy text,
      retry_limit int,
      retry_delay int,
      retry_backoff bool,
      expire_seconds int,
      retention_minutes int,
      dead_letter text,
      created_on timestamp with time zone not null default now()
    )
  `
}
354

355
/**
 * DDL for the `schedule` table (one cron schedule per queue name).
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createTableSchedule (schema) {
  const table = `${schema}.schedule`
  return `
    CREATE TABLE ${table} (
      name text primary key,
      cron text not null,
      timezone text,
      data jsonb,
      options jsonb,
      created_on timestamp with time zone not null default now(),
      updated_on timestamp with time zone not null default now()
    )
  `
}
368

369
/**
 * DDL for the `subscription` table, keyed by (event, name).
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function createTableSubscription (schema) {
  const table = `${schema}.subscription`
  return `
    CREATE TABLE ${table} (
      event text not null,
      name text not null,
      created_on timestamp with time zone not null default now(),
      updated_on timestamp with time zone not null default now(),
      PRIMARY KEY(event, name)
    )
  `
}
380

381
/**
 * Query returning every schedule whose name matches an existing queue.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function getSchedules (schema) {
  const scheduleTable = `${schema}.schedule`
  const queueTable = `${schema}.queue`
  return `
    SELECT s.*
    FROM ${scheduleTable} s
      JOIN ${queueTable} q on s.name = q.name
  `
}
388

389
/**
 * Parameterized upsert of a schedule row: inserts (name, cron, timezone,
 * data, options) from $1..$5, or overwrites the existing row for that name
 * and refreshes `updated_on`.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function schedule (schema) {
  const overwritten = ['cron', 'timezone', 'data', 'options']
  const assignments = overwritten
    .map((column) => `${column} = EXCLUDED.${column},`)
    .join('\n      ')

  return `
    INSERT INTO ${schema}.schedule (name, cron, timezone, data, options)
    VALUES ($1, $2, $3, $4, $5)
    ON CONFLICT (name) DO UPDATE SET
      ${assignments}
      updated_on = now()
  `
}
401

402
/**
 * Statement that deletes the schedule whose name is $1.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function unschedule (schema) {
  const table = `${schema}.schedule`
  return `
    DELETE FROM ${table}
    WHERE name = $1
  `
}
408

409
/**
 * Parameterized upsert of a subscription row for (event $1, name $2); an
 * existing row only has its `updated_on` effectively refreshed.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function subscribe (schema) {
  const table = `${schema}.subscription`
  return `
    INSERT INTO ${table} (event, name)
    VALUES ($1, $2)
    ON CONFLICT (event, name) DO UPDATE SET
      event = EXCLUDED.event,
      name = EXCLUDED.name,
      updated_on = now()
  `
}
419

420
/**
 * Statement that deletes the subscription for (event $1, name $2).
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function unsubscribe (schema) {
  const table = `${schema}.subscription`
  return `
    DELETE FROM ${table}
    WHERE event = $1 and name = $2
  `
}
426

427
/**
 * Query returning the names subscribed to event $1.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function getQueuesForEvent (schema) {
  const table = `${schema}.subscription`
  return `
    SELECT name FROM ${table}
    WHERE event = $1
  `
}
433

434
/**
 * Query returning the database clock as epoch milliseconds in column `time`.
 *
 * @returns {string} SQL text
 */
function getTime () {
  const epochMillis = "round(date_part('epoch', now()) * 1000)"
  return `SELECT ${epochMillis} as time`
}
437

438
/**
 * Query returning the installed schema version.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function getVersion (schema) {
  const table = `${schema}.version`
  return `SELECT version from ${table}`
}
441

442
/**
 * Statement that overwrites the recorded schema version.
 *
 * @param {string} schema - schema name
 * @param {number|string} version - new version, embedded as a quoted literal
 * @returns {string} SQL text
 */
function setVersion (schema, version) {
  const literal = `'${version}'`
  return `UPDATE ${schema}.version SET version = ${literal}`
}
445

446
/**
 * Query that resolves the version table with `to_regclass` and returns the
 * result in column `name`.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function versionTableExists (schema) {
  const qualifiedName = `${schema}.version`
  return `SELECT to_regclass('${qualifiedName}') as name`
}
449

450
/**
 * Statement that records the schema version in the version table.
 *
 * @param {string} schema - schema name
 * @param {number|string} version - version, embedded as a quoted literal
 * @returns {string} SQL text
 */
function insertVersion (schema, version) {
  const literal = `'${version}'`
  return `INSERT INTO ${schema}.version(version) VALUES (${literal})`
}
453

454
/**
 * Returns a factory for the "fetch next job(s)" statement. The statement
 * locks up to $2 pending jobs of queue $1 (skipping rows locked by other
 * sessions), marks them active and returns them.
 *
 * Fix: the ORDER BY fragment was built with `priority && '...'`, which
 * interpolates the text `false` when priority ordering is disabled and
 * yields invalid SQL. A falsy `priority` now contributes nothing.
 *
 * @param {string} schema - schema name
 * @returns {function({ includeMetadata?: boolean, priority?: boolean }=): string}
 *   factory producing the SQL text; `priority` defaults to true
 */
function fetchNextJob (schema) {
  return ({ includeMetadata, priority = true } = {}) => {
    const priorityOrder = priority ? 'priority desc, ' : ''
    const returning = includeMetadata ? 'j.*' : 'j.id, name, data'

    return `
    WITH next as (
      SELECT id
      FROM ${schema}.job
      WHERE name = $1
        AND state < '${states.active}'
        AND startAfter < now()
      ORDER BY ${priorityOrder} createdOn, id
      LIMIT $2
      FOR UPDATE SKIP LOCKED
    )
    UPDATE ${schema}.job j SET
      state = '${states.active}',
      startedOn = now(),
      retryCount = CASE WHEN startedOn IS NOT NULL THEN retryCount + 1 ELSE retryCount END
    FROM next
    WHERE name = $1 AND j.id = next.id
    RETURNING ${returning},
      EXTRACT(epoch FROM expireIn) as expire_in_seconds
  `
  }
}
476

477
/**
 * Statement that marks the active jobs of queue $1 with ids in $2 as
 * completed, stores $3 as their output and returns how many were updated.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function completeJobs (schema) {
  const { active, completed } = states
  return `
    WITH results AS (
      UPDATE ${schema}.job
      SET completedOn = now(),
        state = '${completed}',
        output = $3::jsonb
      WHERE name = $1
        AND id IN (SELECT UNNEST($2::uuid[]))
        AND state = '${active}'
      RETURNING *
    )
    SELECT COUNT(*) FROM results
  `
}
492

493
/**
 * Fail statement for specific jobs: queue $1, ids in $2, output $3,
 * restricted to jobs whose state sorts before `completed`.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text built by `failJobs`
 */
function failJobsById (schema) {
  const conditions = [
    'name = $1',
    'id IN (SELECT UNNEST($2::uuid[]))',
    `state < '${states.completed}'`
  ]

  return failJobs(schema, conditions.join(' AND '), '$3::jsonb')
}
499

500
/**
 * Fail statement for active jobs whose `startedOn + expireIn` lies in the
 * past; a fixed timeout message is used as the job output.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text built by `failJobs`
 */
function failJobsByTimeout (schema) {
  const expiredWhileActive = `state = '${states.active}' AND (startedOn + expireIn) < now()`
  const timeoutOutput = `'{ "value": { "message": "job failed by timeout in active state" } }'::jsonb`

  return failJobs(schema, expiredWhileActive, timeoutOutput)
}
505

506
function failJobs (schema, where, output) {
507
  return `
364✔
508
    WITH results AS (
509
      UPDATE ${schema}.job SET
510
        state = CASE
511
          WHEN retryCount < retryLimit THEN '${states.retry}'::${schema}.job_state
512
          ELSE '${states.failed}'::${schema}.job_state
513
          END,
514
        completedOn = CASE
515
          WHEN retryCount < retryLimit THEN NULL
516
          ELSE now()
517
          END,
518
        startAfter = CASE
519
          WHEN retryCount = retryLimit THEN startAfter
520
          WHEN NOT retryBackoff THEN now() + retryDelay * interval '1'
521
          ELSE now() + (
522
                retryDelay * 2 ^ LEAST(16, retryCount + 1) / 2 +
523
                retryDelay * 2 ^ LEAST(16, retryCount + 1) / 2 * random()
524
            ) * interval '1'
525
          END,
526
        output = ${output}
527
      WHERE ${where}
528
      RETURNING *
529
    ), dlq_jobs as (
530
      INSERT INTO ${schema}.job (name, data, output, retryLimit, keepUntil)
531
      SELECT
532
        deadletter,
533
        data,
534
        output,
535
        retryLimit,
536
        keepUntil + (keepUntil - startAfter)
537
      FROM results
538
      WHERE state = '${states.failed}'
539
        AND deadletter IS NOT NULL
540
        AND NOT name = deadletter
541
    )
542
    SELECT COUNT(*) FROM results
543
  `
544
}
545

546
/**
 * Statement that cancels the jobs of queue $1 with ids in $2 whose state
 * sorts before `completed`, returning how many were updated.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function cancelJobs (schema) {
  const { cancelled, completed } = states
  return `
    with results as (
      UPDATE ${schema}.job
      SET completedOn = now(),
        state = '${cancelled}'
      WHERE name = $1
        AND id IN (SELECT UNNEST($2::uuid[]))
        AND state < '${completed}'
      RETURNING 1
    )
    SELECT COUNT(*) from results
  `
}
560

561
/**
 * Statement that puts cancelled jobs of queue $1 with ids in $2 back into
 * the `created` state and returns how many were updated.
 *
 * Fix: the update had no state guard, so passing the id of an active,
 * completed or failed job would silently reset it to `created` and make it
 * eligible to run again. Only jobs in the `cancelled` state are resumed.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function resumeJobs (schema) {
  return `
    with results as (
      UPDATE ${schema}.job
      SET completedOn = NULL,
        state = '${states.created}'
      WHERE name = $1
        AND id IN (SELECT UNNEST($2::uuid[]))
        AND state = '${states.cancelled}'
      RETURNING 1
    )
    SELECT COUNT(*) from results
  `
}
574

575
/**
 * Parameterized INSERT for a single job ($1..$19). Per-job values take
 * precedence over the matching `queue` row, which takes precedence over
 * the supplied defaults and finally the hard-coded fallbacks in the SQL.
 * Conflicting rows are skipped, so no id is returned for them.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function insertJob (schema) {
  const jobTable = `${schema}.job`
  const queueTable = `${schema}.queue`

  return `
    INSERT INTO ${jobTable} (
      id,
      name,
      data,
      priority,
      startAfter,
      singletonKey,
      singletonOn,
      deadletter,
      expireIn,
      keepUntil,
      retryLimit,
      retryDelay,
      retryBackoff,
      policy
    )
    SELECT
      id,
      j.name,
      data,
      priority,
      startAfter,
      singletonKey,
      singletonOn,
      COALESCE(deadLetter, q.dead_letter) as deadletter,
      CASE
        WHEN expireIn IS NOT NULL THEN CAST(expireIn as interval)
        WHEN q.expire_seconds IS NOT NULL THEN q.expire_seconds * interval '1s'
        WHEN expireInDefault IS NOT NULL THEN CAST(expireInDefault as interval)
        ELSE interval '15 minutes'
        END as expireIn,
      CASE
        WHEN right(keepUntil, 1) = 'Z' THEN CAST(keepUntil as timestamp with time zone)
        ELSE startAfter + CAST(COALESCE(keepUntil, (q.retention_minutes * 60)::text, keepUntilDefault, '14 days') as interval)
        END as keepUntil,
      COALESCE(retryLimit, q.retry_limit, retryLimitDefault, 2) as retryLimit,
      CASE
        WHEN COALESCE(retryBackoff, q.retry_backoff, retryBackoffDefault, false)
        THEN GREATEST(COALESCE(retryDelay, q.retry_delay, retryDelayDefault), 1)
        ELSE COALESCE(retryDelay, q.retry_delay, retryDelayDefault, 0)
        END as retryDelay,
      COALESCE(retryBackoff, q.retry_backoff, retryBackoffDefault, false) as retryBackoff,
      q.policy
    FROM
      ( SELECT
          COALESCE($1::uuid, gen_random_uuid()) as id,
          $2 as name,
          $3::jsonb as data,
          COALESCE($4::int, 0) as priority,
          CASE
            WHEN right($5, 1) = 'Z' THEN CAST($5 as timestamp with time zone)
            ELSE now() + CAST(COALESCE($5,'0') as interval)
            END as startAfter,
          $6 as singletonKey,
          CASE
            WHEN $7::integer IS NOT NULL THEN 'epoch'::timestamp + '1 second'::interval * ($7 * floor((date_part('epoch', now()) + $8) / $7))
            ELSE NULL
            END as singletonOn,
          $9 as deadletter,
          $10 as expireIn,
          $11 as expireInDefault,
          $12 as keepUntil,
          $13 as keepUntilDefault,
          $14::int as retryLimit,
          $15::int as retryLimitDefault,
          $16::int as retryDelay,
          $17::int as retryDelayDefault,
          $18::bool as retryBackoff,
          $19::bool as retryBackoffDefault
      ) j LEFT JOIN ${queueTable} q ON j.name = q.name
    ON CONFLICT DO NOTHING
    RETURNING id
  `
}
651

652
function insertJobs (schema) {
653
  return `
182✔
654
    WITH defaults as (
655
      SELECT 
656
        $2 as expireIn,
657
        $3 as keepUntil,
658
        $4::int as retryLimit,
659
        $5::int as retryDelay,
660
        $6::bool as retryBackoff
661
    )
662
    INSERT INTO ${schema}.job (
663
      id,
664
      name,
665
      data,
666
      priority,
667
      startAfter,
668
      singletonKey,
669
      deadletter,
670
      expireIn,
671
      keepUntil,
672
      retryLimit,
673
      retryDelay,
674
      retryBackoff,
675
      policy
676
    )
677
    SELECT
678
      COALESCE(id, gen_random_uuid()) as id,
679
      j.name,
680
      data,
681
      COALESCE(priority, 0),
682
      COALESCE("startAfter", now()),
683
      "singletonKey",
684
      COALESCE("deadLetter", q.dead_letter),
685
      CASE
686
        WHEN "expireInSeconds" IS NOT NULL THEN "expireInSeconds" *  interval '1s'
687
        WHEN q.expire_seconds IS NOT NULL THEN q.expire_seconds * interval '1s'
688
        WHEN defaults.expireIn IS NOT NULL THEN CAST(defaults.expireIn as interval)
689
        ELSE interval '15 minutes'
690
        END as expireIn,
691
      CASE
692
        WHEN "keepUntil" IS NOT NULL THEN "keepUntil"
693
        ELSE COALESCE("startAfter", now()) + CAST(COALESCE((q.retention_minutes * 60)::text, defaults.keepUntil, '14 days') as interval)
694
        END as keepUntil,
695
      COALESCE("retryLimit", q.retry_limit, defaults.retryLimit, 2),
696
      CASE
697
        WHEN COALESCE("retryBackoff", q.retry_backoff, defaults.retryBackoff, false)
698
          THEN GREATEST(COALESCE("retryDelay", q.retry_delay, defaults.retryDelay), 1)
699
        ELSE COALESCE("retryDelay", q.retry_delay, defaults.retryDelay, 0)
700
        END as retryDelay,      
701
      COALESCE("retryBackoff", q.retry_backoff, defaults.retryBackoff, false) as retryBackoff,
702
      q.policy
703
    FROM json_to_recordset($1) as j (
704
      id uuid,
705
      name text,
706
      priority integer,
707
      data jsonb,
708
      "startAfter" timestamp with time zone,
709
      "retryLimit" integer,
710
      "retryDelay" integer,
711
      "retryBackoff" boolean,
712
      "singletonKey" text,
713
      "expireInSeconds" integer,
714
      "keepUntil" timestamp with time zone,
715
      "deadLetter" text
716
    )
717
    LEFT JOIN ${schema}.queue q ON j.name = q.name,
718
      defaults
719
    ON CONFLICT DO NOTHING
720
  `
721
}
722

723
/**
 * Statement that deletes archive rows archived longer ago than `interval`.
 *
 * @param {string} schema - schema name
 * @param {string} interval - interval text, embedded as a quoted literal
 * @returns {string} SQL text
 */
function drop (schema, interval) {
  const cutoff = `(now() - interval '${interval}')`
  return `
    DELETE FROM ${schema}.archive
    WHERE archivedOn < ${cutoff}
  `
}
729

730
/**
 * Statement that moves jobs from `job` to `archive`. A job is moved when
 * it is not failed and completed longer ago than `completedInterval`, when
 * it is failed and completed longer ago than `failedInterval`, or when it
 * is still pending (state before `active`) and past its keepUntil.
 *
 * @param {string} schema - schema name
 * @param {string} completedInterval - interval text for non-failed jobs
 * @param {string} [failedInterval=completedInterval] - interval text for failed jobs
 * @returns {string} SQL text
 */
function archive (schema, completedInterval, failedInterval = completedInterval) {
  const { failed, active } = states
  // Same column list on both sides so values line up regardless of table column order.
  const columns = 'id, name, priority, data, state, retryLimit, retryCount, retryDelay, retryBackoff, startAfter, startedOn, singletonKey, singletonOn, expireIn, createdOn, completedOn, keepUntil, deadletter, policy, output'

  return `
    WITH archived_rows AS (
      DELETE FROM ${schema}.job
      WHERE (state <> '${failed}' AND completedOn < (now() - interval '${completedInterval}'))
        OR (state = '${failed}' AND completedOn < (now() - interval '${failedInterval}'))
        OR (state < '${active}' AND keepUntil < now())
      RETURNING *
    )
    INSERT INTO ${schema}.archive (
      ${columns}
    )
    SELECT
      ${columns}
    FROM archived_rows
  `
}
747

748
/**
 * Query counting jobs per (name, state), including the rollup subtotals
 * per name, per state and overall.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function countStates (schema) {
  const table = `${schema}.job`
  return `
    SELECT name, state, count(*) size
    FROM ${table}
    GROUP BY rollup(name), rollup(state)
  `
}
755

756
/**
 * Wraps one or more statements in a transaction that first sets a 30s
 * lock timeout and takes the schema's advisory lock.
 *
 * @param {string} schema - schema name, passed to `advisoryLock`
 * @param {string|string[]} query - a statement, or a list of statements
 *   that is joined with `;` and a newline
 * @returns {string} SQL script
 */
function locked (schema, query) {
  const statements = Array.isArray(query) ? query.join(';\n') : query
  const lock = advisoryLock(schema)

  return `
    BEGIN;
    SET LOCAL lock_timeout = '30s';
    ${lock};
    ${statements};
    COMMIT;
  `
}
769

770
/**
 * Statement that takes a transaction-scoped advisory lock. The lock id is
 * the first 64 bits of the sha224 of
 * `<current database>.pgboss.<schema><key>`.
 *
 * @param {string} schema - schema name
 * @param {string} [key] - optional suffix distinguishing separate locks;
 *   falsy values contribute nothing
 * @returns {string} SQL text (no trailing semicolon)
 */
function advisoryLock (schema, key) {
  const suffix = key || ''
  const lockName = `.pgboss.${schema}${suffix}`

  // The first line keeps its trailing spaces so the emitted text is unchanged.
  return [
    'SELECT pg_advisory_xact_lock(      ',
    `      ('x' || encode(sha224((current_database() || '${lockName}')::bytea), 'hex'))::bit(64)::bigint`,
    '  )'
  ].join('\n')
}
775

776
/**
 * Query used as a migration guard: it divides by
 * `(installed version - version)`, so it raises 'division by zero' when
 * the schema is already on the desired version.
 *
 * @param {string} schema - schema name
 * @param {number|string} version - desired schema version
 * @returns {string} SQL text
 */
function assertMigration (schema, version) {
  const installed = 'version::int'
  return `SELECT ${installed}/(${installed}-${version}) from ${schema}.version`
}
780

781
/**
 * Query selecting a job from the `job` table by name ($1) and id ($2).
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function getJobById (schema) {
  const table = 'job'
  return getJobByTableAndId(schema, table)
}
784

785
/**
 * Query selecting a job from the `archive` table by name ($1) and id ($2).
 *
 * @param {string} schema - schema name
 * @returns {string} SQL text
 */
function getArchivedJobById (schema) {
  const table = 'archive'
  return getJobByTableAndId(schema, table)
}
788

789
/**
 * Query selecting the row with name $1 and id $2 from a table of the schema.
 *
 * @param {string} schema - schema name
 * @param {string} table - unqualified table name
 * @returns {string} SQL text
 */
function getJobByTableAndId (schema, table) {
  const qualified = [schema, table].join('.')
  return `SELECT * FROM ${qualified} WHERE name = $1 AND id = $2`
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc