• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

timgit / pg-boss / 9981127987

17 Jul 2024 08:36PM UTC coverage: 94.416% (-5.6%) from 100.0%
9981127987

Pull #425

github

web-flow
Merge 99cf534b7 into f1c1636ca
Pull Request #425: v10

465 of 546 branches covered (85.16%)

363 of 377 new or added lines in 10 files covered. (96.29%)

40 existing lines in 5 files now uncovered.

913 of 967 relevant lines covered (94.42%)

786.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.65
/src/plans.js
1
const assert = require('assert')
4✔
2

3
const states = {
4✔
4
  created: 'created',
5
  retry: 'retry',
6
  active: 'active',
7
  completed: 'completed',
8
  cancelled: 'cancelled',
9
  failed: 'failed'
10
}
11

12
const DEFAULT_SCHEMA = 'pgboss'
4✔
13
const MIGRATE_RACE_MESSAGE = 'division by zero'
4✔
14
const CREATE_RACE_MESSAGE = 'already exists'
4✔
15

16
const QUEUE_POLICY = {
4✔
17
  standard: 'standard',
18
  short: 'short',
19
  singleton: 'singleton',
20
  stately: 'stately'
21
}
22

23
module.exports = {
4✔
24
  create,
25
  insertVersion,
26
  getVersion,
27
  setVersion,
28
  versionTableExists,
29
  fetchNextJob,
30
  completeJobs,
31
  cancelJobs,
32
  resumeJobs,
33
  failJobsById,
34
  failJobsByTimeout,
35
  insertJob,
36
  insertJobs,
37
  getTime,
38
  getSchedules,
39
  schedule,
40
  unschedule,
41
  subscribe,
42
  unsubscribe,
43
  getQueuesForEvent,
44
  archive,
45
  drop,
46
  countStates,
47
  createQueue,
48
  updateQueue,
49
  createPartition,
50
  dropPartition,
51
  deleteQueueRecords,
52
  getQueueByName,
53
  getQueueSize,
54
  purgeQueue,
55
  clearStorage,
56
  getMaintenanceTime,
57
  setMaintenanceTime,
58
  getMonitorTime,
59
  setMonitorTime,
60
  getCronTime,
61
  setCronTime,
62
  locked,
63
  advisoryLock,
64
  assertMigration,
65
  getArchivedJobById,
66
  getJobById,
67
  QUEUE_POLICY,
68
  states: { ...states },
69
  MIGRATE_RACE_MESSAGE,
70
  CREATE_RACE_MESSAGE,
71
  DEFAULT_SCHEMA
72
}
73

74
function create (schema, version) {
75
  const commands = [
178✔
76
    createSchema(schema),
77
    createEnumJobState(schema),
78

79
    createTableJob(schema),
80
    createIndexJobFetch(schema),
81
    createIndexJobPolicyStately(schema),
82
    createIndexJobPolicyShort(schema),
83
    createIndexJobPolicySingleton(schema),
84
    createIndexJobThrottleOn(schema),
85
    createIndexJobThrottleKey(schema),
86

87
    createTableArchive(schema),
88
    createPrimaryKeyArchive(schema),
89
    createColumnArchiveArchivedOn(schema),
90
    createIndexArchiveArchivedOn(schema),
91

92
    createTableVersion(schema),
93
    createTableQueue(schema),
94
    createTableSchedule(schema),
95
    createTableSubscription(schema),
96

97
    getPartitionFunction(schema),
98
    createPartitionFunction(schema),
99
    dropPartitionFunction(schema),
100

101
    insertVersion(schema, version)
102
  ]
103

104
  return locked(schema, commands)
178✔
105
}
106

107
function createSchema (schema) {
108
  return `
178✔
109
    CREATE SCHEMA IF NOT EXISTS ${schema}
110
  `
111
}
112

113
function createTableVersion (schema) {
114
  return `
178✔
115
    CREATE TABLE ${schema}.version (
116
      version int primary key,
117
      maintained_on timestamp with time zone,
118
      cron_on timestamp with time zone,
119
      monitored_on timestamp with time zone
120
    )
121
  `
122
}
123

124
function createEnumJobState (schema) {
125
  // ENUM definition order is important
126
  // base type is numeric and first values are less than last values
127
  return `
178✔
128
    CREATE TYPE ${schema}.job_state AS ENUM (
129
      '${states.created}',
130
      '${states.retry}',
131
      '${states.active}',
132
      '${states.completed}',
133
      '${states.cancelled}',
134
      '${states.failed}'
135
    )
136
  `
137
}
138

139
function createTableJob (schema) {
140
  return `
178✔
141
    CREATE TABLE ${schema}.job (
142
      id uuid not null default gen_random_uuid(),
143
      name text not null,
144
      priority integer not null default(0),
145
      data jsonb,
146
      state ${schema}.job_state not null default('${states.created}'),
147
      retry_limit integer not null default(0),
148
      retry_count integer not null default(0),
149
      retry_delay integer not null default(0),
150
      retry_backoff boolean not null default false,
151
      start_after timestamp with time zone not null default now(),
152
      started_on timestamp with time zone,
153
      singleton_key text,
154
      singleton_on timestamp without time zone,
155
      expire_in interval not null default interval '15 minutes',
156
      created_on timestamp with time zone not null default now(),
157
      completed_on timestamp with time zone,
158
      keep_until timestamp with time zone NOT NULL default now() + interval '14 days',
159
      output jsonb,
160
      dead_letter text,
161
      policy text,
162
      CONSTRAINT job_pkey PRIMARY KEY (name, id)
163
    ) PARTITION BY LIST (name)
164
  `
165
}
166

167
const baseJobColumns = 'id, name, data, EXTRACT(epoch FROM expire_in) as "expireInSeconds"'
4✔
168
const allJobColumns = `${baseJobColumns}, policy, state, priority,
4✔
169
  retry_limit as "retryLimit",
170
  retry_count as "retryCount",
171
  retry_delay as "retryDelay",
172
  retry_backoff as "retryBackoff",
173
  start_after as "startAfter",  
174
  started_on as "startedOn",
175
  singleton_key as "singletonKey",
176
  singleton_on as "singletonOn",
177
  expire_in as "expireIn",
178
  created_on as "createdOn",
179
  completed_on as "completedOn",
180
  keep_until as "keepUntil",
181
  dead_letter as "deadLetter",
182
  output
183
`
184

185
function createPartition (schema, name) {
186
  return `SELECT ${schema}.create_partition('${name}');`
157✔
187
}
188

189
function getPartitionFunction (schema) {
190
  return `
178✔
191
    CREATE FUNCTION ${schema}.get_partition(queue_name text, out name text) AS
192
    $$
193
    SELECT '${schema}.job_' || encode(sha224(queue_name::bytea), 'hex');
194
    $$
195
    LANGUAGE SQL
196
    IMMUTABLE
197
  `
198
}
199

200
function createPartitionFunction (schema) {
201
  return `
178✔
202
    CREATE FUNCTION ${schema}.create_partition(queue_name text)
203
    RETURNS VOID AS
204
    $$
205
    DECLARE
206
      table_name varchar := ${schema}.get_partition(queue_name);
207
    BEGIN
208
      EXECUTE format('CREATE TABLE %I (LIKE ${schema}.job INCLUDING DEFAULTS INCLUDING CONSTRAINTS)', table_name);
209
      EXECUTE format('ALTER TABLE %I ADD CHECK (name=%L)', table_name, queue_name);
210
      EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION %I FOR VALUES IN (%L)', table_name, queue_name);
211
    END;
212
    $$
213
    LANGUAGE plpgsql;
214
  `
215
}
216

217
function dropPartitionFunction (schema) {
218
  return `
178✔
219
    CREATE FUNCTION ${schema}.drop_partition(queue_name text)
220
    RETURNS VOID AS
221
    $$
222
    BEGIN  
223
      EXECUTE format('DROP TABLE IF EXISTS %I', ${schema}.get_partition(queue_name));
224
    END;
225
    $$
226
    LANGUAGE plpgsql;
227
  `
228
}
229

230
function dropPartition (schema, name) {
231
  return `SELECT ${schema}.drop_partition('${name}');`
1✔
232
}
233

234
function createPrimaryKeyArchive (schema) {
235
  return `ALTER TABLE ${schema}.archive ADD CONSTRAINT archive_pkey PRIMARY KEY (name, id)`
178✔
236
}
237

238
function createIndexJobPolicyShort (schema) {
239
  return `CREATE UNIQUE INDEX job_policy_short ON ${schema}.job (name) WHERE state = '${states.created}' AND policy = '${QUEUE_POLICY.short}'`
178✔
240
}
241

242
function createIndexJobPolicySingleton (schema) {
243
  return `CREATE UNIQUE INDEX job_policy_singleton ON ${schema}.job (name) WHERE state = '${states.active}' AND policy = '${QUEUE_POLICY.singleton}'`
178✔
244
}
245

246
function createIndexJobPolicyStately (schema) {
247
  return `CREATE UNIQUE INDEX job_policy_stately ON ${schema}.job (name, state) WHERE state <= '${states.active}' AND policy = '${QUEUE_POLICY.stately}'`
178✔
248
}
249

250
function createIndexJobThrottleOn (schema) {
251
  return `CREATE UNIQUE INDEX job_throttle_on ON ${schema}.job (name, singleton_on, COALESCE(singleton_key, '')) WHERE state <= '${states.completed}' AND singleton_on IS NOT NULL`
178✔
252
}
253

254
function createIndexJobThrottleKey (schema) {
255
  return `CREATE UNIQUE INDEX job_throttle_key ON ${schema}.job (name, singleton_key) WHERE state <= '${states.completed}' AND singleton_on IS NULL`
178✔
256
}
257

258
function createIndexJobFetch (schema) {
259
  return `CREATE INDEX job_fetch ON ${schema}.job (name, start_after) INCLUDE (priority, created_on, id) WHERE state < '${states.active}'`
178✔
260
}
261

262
function createTableArchive (schema) {
263
  return `CREATE TABLE ${schema}.archive (LIKE ${schema}.job)`
178✔
264
}
265

266
function createColumnArchiveArchivedOn (schema) {
267
  return `ALTER TABLE ${schema}.archive ADD archived_on timestamptz NOT NULL DEFAULT now()`
178✔
268
}
269

270
function createIndexArchiveArchivedOn (schema) {
271
  return `CREATE INDEX archive_archived_on_idx ON ${schema}.archive(archived_on)`
178✔
272
}
273

274
function getMaintenanceTime (schema) {
275
  return `SELECT maintained_on, EXTRACT( EPOCH FROM (now() - maintained_on) ) seconds_ago FROM ${schema}.version`
182✔
276
}
277

278
function setMaintenanceTime (schema) {
279
  return `UPDATE ${schema}.version SET maintained_on = now()`
182✔
280
}
281

282
function getMonitorTime (schema) {
283
  return `SELECT monitored_on, EXTRACT( EPOCH FROM (now() - monitored_on) ) seconds_ago FROM ${schema}.version`
182✔
284
}
285

286
function setMonitorTime (schema) {
287
  return `UPDATE ${schema}.version SET monitored_on = now()`
182✔
288
}
289

290
function setCronTime (schema, time) {
291
  time = time || 'now()'
182✔
292
  return `UPDATE ${schema}.version SET cron_on = ${time}`
182✔
293
}
294

295
function getCronTime (schema) {
296
  return `SELECT cron_on, EXTRACT( EPOCH FROM (now() - cron_on) ) seconds_ago FROM ${schema}.version`
182✔
297
}
298

299
function createQueue (schema) {
300
  return `
157✔
301
    INSERT INTO ${schema}.queue (name, policy, retry_limit, retry_delay, retry_backoff, expire_seconds, retention_minutes, dead_letter)
302
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
303
  `
304
}
305

306
function updateQueue (schema) {
307
  return `
1✔
308
    UPDATE ${schema}.queue SET
309
      retry_limit = COALESCE($2, retry_limit),
310
      retry_delay = COALESCE($3, retry_delay),
311
      retry_backoff = COALESCE($4, retry_backoff),
312
      expire_seconds = COALESCE($5, expire_seconds),
313
      retention_minutes = COALESCE($6, retention_minutes),
314
      dead_letter = COALESCE($7, dead_letter)
315
    WHERE name = $1
316
  `
317
}
318

319
function getQueueByName (schema) {
320
  return `SELECT * FROM ${schema}.queue WHERE name = $1`
187✔
321
}
322

323
function deleteQueueRecords (schema) {
324
  return `WITH dq AS (
2✔
325
      DELETE FROM ${schema}.queue WHERE name = $1
326
    ), ds AS (
327
      DELETE FROM ${schema}.schedule WHERE name = $1
328
    )
329
    DELETE FROM ${schema}.job WHERE name = $1
330
  `
331
}
332

333
function purgeQueue (schema) {
334
  return `DELETE from ${schema}.job WHERE name = $1 and state < '${states.active}'`
3✔
335
}
336

337
function clearStorage (schema) {
338
  return `TRUNCATE ${schema}.job, ${schema}.archive`
2✔
339
}
340

341
function getQueueSize (schema, options = {}) {
6✔
342
  options.before = options.before || states.active
6✔
343
  assert(options.before in states, `${options.before} is not a valid state`)
6✔
344
  return `SELECT count(*) as count FROM ${schema}.job WHERE name = $1 AND state < '${options.before}'`
6✔
345
}
346

347
function createTableQueue (schema) {
348
  return `
178✔
349
    CREATE TABLE ${schema}.queue (
350
      name text primary key,
351
      policy text,
352
      retry_limit int,
353
      retry_delay int,
354
      retry_backoff bool,
355
      expire_seconds int,
356
      retention_minutes int,
357
      dead_letter text,
358
      created_on timestamp with time zone not null default now()
359
    )
360
  `
361
}
362

363
function createTableSchedule (schema) {
364
  return `
178✔
365
    CREATE TABLE ${schema}.schedule (
366
      name text primary key,
367
      cron text not null,
368
      timezone text,
369
      data jsonb,
370
      options jsonb,
371
      created_on timestamp with time zone not null default now(),
372
      updated_on timestamp with time zone not null default now()
373
    )
374
  `
375
}
376

377
function createTableSubscription (schema) {
378
  return `
178✔
379
    CREATE TABLE ${schema}.subscription (
380
      event text not null,
381
      name text not null,
382
      created_on timestamp with time zone not null default now(),
383
      updated_on timestamp with time zone not null default now(),
384
      PRIMARY KEY(event, name)
385
    )
386
  `
387
}
388

389
function getSchedules (schema) {
390
  return `
182✔
391
    SELECT s.*
392
    FROM ${schema}.schedule s
393
      JOIN ${schema}.queue q on s.name = q.name
394
  `
395
}
396

397
function schedule (schema) {
398
  return `
182✔
399
    INSERT INTO ${schema}.schedule (name, cron, timezone, data, options)
400
    VALUES ($1, $2, $3, $4, $5)
401
    ON CONFLICT (name) DO UPDATE SET
402
      cron = EXCLUDED.cron,
403
      timezone = EXCLUDED.timezone,
404
      data = EXCLUDED.data,
405
      options = EXCLUDED.options,
406
      updated_on = now()
407
  `
408
}
409

410
function unschedule (schema) {
411
  return `
182✔
412
    DELETE FROM ${schema}.schedule
413
    WHERE name = $1
414
  `
415
}
416

417
function subscribe (schema) {
418
  return `
182✔
419
    INSERT INTO ${schema}.subscription (event, name)
420
    VALUES ($1, $2)
421
    ON CONFLICT (event, name) DO UPDATE SET
422
      event = EXCLUDED.event,
423
      name = EXCLUDED.name,
424
      updated_on = now()
425
  `
426
}
427

428
function unsubscribe (schema) {
429
  return `
182✔
430
    DELETE FROM ${schema}.subscription
431
    WHERE event = $1 and name = $2
432
  `
433
}
434

435
function getQueuesForEvent (schema) {
436
  return `
182✔
437
    SELECT name FROM ${schema}.subscription
438
    WHERE event = $1
439
  `
440
}
441

442
function getTime () {
443
  return "SELECT round(date_part('epoch', now()) * 1000) as time"
182✔
444
}
445

446
function getVersion (schema) {
447
  return `SELECT version from ${schema}.version`
4✔
448
}
449

450
function setVersion (schema, version) {
UNCOV
451
  return `UPDATE ${schema}.version SET version = '${version}'`
×
452
}
453

454
function versionTableExists (schema) {
455
  return `SELECT to_regclass('${schema}.version') as name`
182✔
456
}
457

458
function insertVersion (schema, version) {
459
  return `INSERT INTO ${schema}.version(version) VALUES ('${version}')`
178✔
460
}
461

462
function fetchNextJob (schema) {
463
  return ({ includeMetadata, priority = true } = {}) => `
25,510!
464
    WITH next as (
465
      SELECT id
466
      FROM ${schema}.job
467
      WHERE name = $1
468
        AND state < '${states.active}'
469
        AND start_after < now()
470
      ORDER BY ${priority && 'priority desc, '} created_on, id
51,020✔
471
      LIMIT $2
472
      FOR UPDATE SKIP LOCKED
473
    )
474
    UPDATE ${schema}.job j SET
475
      state = '${states.active}',
476
      started_on = now(),
477
      retry_count = CASE WHEN started_on IS NOT NULL THEN retry_count + 1 ELSE retry_count END
478
    FROM next
479
    WHERE name = $1 AND j.id = next.id
480
    RETURNING j.${includeMetadata ? allJobColumns : baseJobColumns}      
25,510✔
481
  `
482
}
483

484
function completeJobs (schema) {
485
  return `
182✔
486
    WITH results AS (
487
      UPDATE ${schema}.job
488
      SET completed_on = now(),
489
        state = '${states.completed}',
490
        output = $3::jsonb
491
      WHERE name = $1
492
        AND id IN (SELECT UNNEST($2::uuid[]))
493
        AND state = '${states.active}'
494
      RETURNING *
495
    )
496
    SELECT COUNT(*) FROM results
497
  `
498
}
499

500
function failJobsById (schema) {
501
  const where = `name = $1 AND id IN (SELECT UNNEST($2::uuid[])) AND state < '${states.completed}'`
182✔
502
  const output = '$3::jsonb'
182✔
503

504
  return failJobs(schema, where, output)
182✔
505
}
506

507
function failJobsByTimeout (schema) {
508
  const where = `state = '${states.active}' AND (started_on + expire_in) < now()`
182✔
509
  const output = '\'{ "value": { "message": "job failed by timeout in active state" } }\'::jsonb'
182✔
510
  return failJobs(schema, where, output)
182✔
511
}
512

513
function failJobs (schema, where, output) {
514
  return `
364✔
515
    WITH results AS (
516
      UPDATE ${schema}.job SET
517
        state = CASE
518
          WHEN retry_count < retry_limit THEN '${states.retry}'::${schema}.job_state
519
          ELSE '${states.failed}'::${schema}.job_state
520
          END,
521
        completed_on = CASE
522
          WHEN retry_count < retry_limit THEN NULL
523
          ELSE now()
524
          END,
525
        start_after = CASE
526
          WHEN retry_count = retry_limit THEN start_after
527
          WHEN NOT retry_backoff THEN now() + retry_delay * interval '1'
528
          ELSE now() + (
529
                retry_delay * 2 ^ LEAST(16, retry_count + 1) / 2 +
530
                retry_delay * 2 ^ LEAST(16, retry_count + 1) / 2 * random()
531
            ) * interval '1'
532
          END,
533
        output = ${output}
534
      WHERE ${where}
535
      RETURNING *
536
    ), dlq_jobs as (
537
      INSERT INTO ${schema}.job (name, data, output, retry_limit, keep_until)
538
      SELECT
539
        dead_letter,
540
        data,
541
        output,
542
        retry_limit,
543
        keep_until + (keep_until - start_after)
544
      FROM results
545
      WHERE state = '${states.failed}'
546
        AND dead_letter IS NOT NULL
547
        AND NOT name = dead_letter
548
    )
549
    SELECT COUNT(*) FROM results
550
  `
551
}
552

553
function cancelJobs (schema) {
554
  return `
182✔
555
    with results as (
556
      UPDATE ${schema}.job
557
      SET completed_on = now(),
558
        state = '${states.cancelled}'
559
      WHERE name = $1
560
        AND id IN (SELECT UNNEST($2::uuid[]))
561
        AND state < '${states.completed}'
562
      RETURNING 1
563
    )
564
    SELECT COUNT(*) from results
565
  `
566
}
567

568
function resumeJobs (schema) {
569
  return `
182✔
570
    with results as (
571
      UPDATE ${schema}.job
572
      SET completed_on = NULL,
573
        state = '${states.created}'
574
      WHERE name = $1
575
        AND id IN (SELECT UNNEST($2::uuid[]))
576
      RETURNING 1
577
    )
578
    SELECT COUNT(*) from results
579
  `
580
}
581

582
function insertJob (schema) {
583
  return `
182✔
584
    INSERT INTO ${schema}.job (
585
      id,
586
      name,
587
      data,
588
      priority,
589
      start_after,
590
      singleton_key,
591
      singleton_on,
592
      dead_letter,
593
      expire_in,
594
      keep_until,
595
      retry_limit,
596
      retry_delay,
597
      retry_backoff,
598
      policy
599
    )
600
    SELECT
601
      id,
602
      j.name,
603
      data,
604
      priority,
605
      start_after,
606
      singleton_key,
607
      singleton_on,
608
      COALESCE(j.dead_letter, q.dead_letter) as dead_letter,
609
      CASE
610
        WHEN expire_in IS NOT NULL THEN CAST(expire_in as interval)
611
        WHEN q.expire_seconds IS NOT NULL THEN q.expire_seconds * interval '1s'
612
        WHEN expire_in_default IS NOT NULL THEN CAST(expire_in_default as interval)
613
        ELSE interval '15 minutes'
614
        END as expire_in,
615
      CASE
616
        WHEN right(keep_until, 1) = 'Z' THEN CAST(keep_until as timestamp with time zone)
617
        ELSE start_after + CAST(COALESCE(keep_until, (q.retention_minutes * 60)::text, keep_until_default, '14 days') as interval)
618
        END as keep_until,
619
      COALESCE(j.retry_limit, q.retry_limit, retry_limit_default, 2) as retry_limit,
620
      CASE
621
        WHEN COALESCE(j.retry_backoff, q.retry_backoff, retry_backoff_default, false)
622
        THEN GREATEST(COALESCE(j.retry_delay, q.retry_delay, retry_delay_default), 1)
623
        ELSE COALESCE(j.retry_delay, q.retry_delay, retry_delay_default, 0)
624
        END as retry_delay,
625
      COALESCE(j.retry_backoff, q.retry_backoff, retry_backoff_default, false) as retry_backoff,
626
      q.policy
627
    FROM
628
      ( SELECT
629
          COALESCE($1::uuid, gen_random_uuid()) as id,
630
          $2 as name,
631
          $3::jsonb as data,
632
          COALESCE($4::int, 0) as priority,
633
          CASE
634
            WHEN right($5, 1) = 'Z' THEN CAST($5 as timestamp with time zone)
635
            ELSE now() + CAST(COALESCE($5,'0') as interval)
636
            END as start_after,
637
          $6 as singleton_key,
638
          CASE
639
            WHEN $7::integer IS NOT NULL THEN 'epoch'::timestamp + '1 second'::interval * ($7 * floor((date_part('epoch', now()) + $8) / $7))
640
            ELSE NULL
641
            END as singleton_on,
642
          $9 as dead_letter,
643
          $10 as expire_in,
644
          $11 as expire_in_default,
645
          $12 as keep_until,
646
          $13 as keep_until_default,
647
          $14::int as retry_limit,
648
          $15::int as retry_limit_default,
649
          $16::int as retry_delay,
650
          $17::int as retry_delay_default,
651
          $18::bool as retry_backoff,
652
          $19::bool as retry_backoff_default
653
      ) j LEFT JOIN ${schema}.queue q ON j.name = q.name
654
    ON CONFLICT DO NOTHING
655
    RETURNING id
656
  `
657
}
658

659
function insertJobs (schema) {
660
  return `
182✔
661
    WITH defaults as (
662
      SELECT 
663
        $2 as expire_in,
664
        $3 as keep_until,
665
        $4::int as retry_limit,
666
        $5::int as retry_delay,
667
        $6::bool as retry_backoff
668
    )
669
    INSERT INTO ${schema}.job (
670
      id,
671
      name,
672
      data,
673
      priority,
674
      start_after,
675
      singleton_key,
676
      dead_letter,
677
      expire_in,
678
      keep_until,
679
      retry_limit,
680
      retry_delay,
681
      retry_backoff,
682
      policy
683
    )
684
    SELECT
685
      COALESCE(id, gen_random_uuid()) as id,
686
      j.name,
687
      data,
688
      COALESCE(priority, 0),
689
      COALESCE("startAfter", now()),
690
      "singletonKey",
691
      COALESCE("deadLetter", q.dead_letter),
692
      CASE
693
        WHEN "expireInSeconds" IS NOT NULL THEN "expireInSeconds" *  interval '1s'
694
        WHEN q.expire_seconds IS NOT NULL THEN q.expire_seconds * interval '1s'
695
        WHEN defaults.expire_in IS NOT NULL THEN CAST(defaults.expire_in as interval)
696
        ELSE interval '15 minutes'
697
        END as expire_in,
698
      CASE
699
        WHEN "keepUntil" IS NOT NULL THEN "keepUntil"
700
        ELSE COALESCE("startAfter", now()) + CAST(COALESCE((q.retention_minutes * 60)::text, defaults.keep_until, '14 days') as interval)
701
        END as keep_until,
702
      COALESCE("retryLimit", q.retry_limit, defaults.retry_limit, 2),
703
      CASE
704
        WHEN COALESCE("retryBackoff", q.retry_backoff, defaults.retry_backoff, false)
705
          THEN GREATEST(COALESCE("retryDelay", q.retry_delay, defaults.retry_delay), 1)
706
        ELSE COALESCE("retryDelay", q.retry_delay, defaults.retry_delay, 0)
707
        END as retry_delay,      
708
      COALESCE("retryBackoff", q.retry_backoff, defaults.retry_backoff, false) as retry_backoff,
709
      q.policy
710
    FROM json_to_recordset($1) as j (
711
      id uuid,
712
      name text,
713
      priority integer,
714
      data jsonb,
715
      "startAfter" timestamp with time zone,
716
      "retryLimit" integer,
717
      "retryDelay" integer,
718
      "retryBackoff" boolean,
719
      "singletonKey" text,
720
      "expireInSeconds" integer,
721
      "keepUntil" timestamp with time zone,
722
      "deadLetter" text
723
    )
724
    LEFT JOIN ${schema}.queue q ON j.name = q.name,
725
      defaults
726
    ON CONFLICT DO NOTHING
727
  `
728
}
729

730
function drop (schema, interval) {
731
  return `
182✔
732
    DELETE FROM ${schema}.archive
733
    WHERE archived_on < (now() - interval '${interval}')
734
  `
735
}
736

737
function archive (schema, completedInterval, failedInterval = completedInterval) {
×
738
  const columns = 'id, name, priority, data, state, retry_limit, retry_count, retry_delay, retry_backoff, start_after, started_on, singleton_key, singleton_on, expire_in, created_on, completed_on, keep_until, dead_letter, policy, output'
182✔
739

740
  return `
182✔
741
    WITH archived_rows AS (
742
      DELETE FROM ${schema}.job
743
      WHERE (state <> '${states.failed}' AND completed_on < (now() - interval '${completedInterval}'))
744
        OR (state = '${states.failed}' AND completed_on < (now() - interval '${failedInterval}'))
745
        OR (state < '${states.active}' AND keep_until < now())
746
      RETURNING *
747
    )
748
    INSERT INTO ${schema}.archive (${columns})
749
    SELECT ${columns}
750
    FROM archived_rows
751
  `
752
}
753

754
function countStates (schema) {
755
  return `
182✔
756
    SELECT name, state, count(*) size
757
    FROM ${schema}.job
758
    GROUP BY rollup(name), rollup(state)
759
  `
760
}
761

762
function locked (schema, query) {
763
  if (Array.isArray(query)) {
724✔
764
    query = query.join(';\n')
178✔
765
  }
766

767
  return `
724✔
768
    BEGIN;
769
    SET LOCAL lock_timeout = '30s';
770
    ${advisoryLock(schema)};
771
    ${query};
772
    COMMIT;
773
  `
774
}
775

776
function advisoryLock (schema, key) {
777
  return `SELECT pg_advisory_xact_lock(      
750✔
778
      ('x' || encode(sha224((current_database() || '.pgboss.${schema}${key || ''}')::bytea), 'hex'))::bit(64)::bigint
1,474✔
779
  )`
780
}
781

782
function assertMigration (schema, version) {
783
  // raises 'division by zero' if already on desired schema version
UNCOV
784
  return `SELECT version::int/(version::int-${version}) from ${schema}.version`
×
785
}
786

787
function getJobById (schema) {
788
  return getJobByTableQueueId(schema, 'job')
182✔
789
}
790

791
function getArchivedJobById (schema) {
792
  return getJobByTableQueueId(schema, 'archive')
182✔
793
}
794

795
function getJobByTableQueueId (schema, table) {
796
  return `SELECT ${allJobColumns} FROM ${schema}.${table} WHERE name = $1 AND id = $2`
364✔
797
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc