• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

timgit / pg-boss / 10016541428

20 Jul 2024 01:41AM UTC coverage: 93.17% (-6.8%) from 100.0%
10016541428

Pull #425

github

web-flow
Merge b0d7e7540 into f1c1636ca
Pull Request #425: v10

451 of 532 branches covered (84.77%)

357 of 381 new or added lines in 10 files covered. (93.7%)

40 existing lines in 5 files now uncovered.

873 of 937 relevant lines covered (93.17%)

805.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.56
/src/plans.js
1
// Schema name constant exported for callers (presumably the fallback when none is configured).
const DEFAULT_SCHEMA = 'pgboss'

// Error text produced on purpose by assertMigration() when the schema is
// already at the requested version.
const MIGRATE_RACE_MESSAGE = 'division by zero'

// NOTE(review): presumably matched against errors raised when two instances
// create the schema concurrently; confirm against the caller.
const CREATE_RACE_MESSAGE = 'already exists'

// Job lifecycle states. Declaration order matters: the SQL enum is created in
// this same order and queries compare states with < and <=.
const JOB_STATES = Object.freeze({
  created: 'created',
  retry: 'retry',
  active: 'active',
  completed: 'completed',
  cancelled: 'cancelled',
  failed: 'failed'
})

// Queue policy names stored in the `policy` column of queue and job rows.
const QUEUE_POLICIES = Object.freeze({
  standard: 'standard',
  short: 'short',
  singleton: 'singleton',
  stately: 'stately'
})
20

21
module.exports = {
4✔
22
  create,
23
  insertVersion,
24
  getVersion,
25
  setVersion,
26
  versionTableExists,
27
  fetchNextJob,
28
  completeJobs,
29
  cancelJobs,
30
  resumeJobs,
31
  failJobsById,
32
  failJobsByTimeout,
33
  insertJob,
34
  insertJobs,
35
  getTime,
36
  getSchedules,
37
  schedule,
38
  unschedule,
39
  subscribe,
40
  unsubscribe,
41
  getQueuesForEvent,
42
  archive,
43
  drop,
44
  countStates,
45
  createQueue,
46
  updateQueue,
47
  createPartition,
48
  dropPartition,
49
  deleteQueueRecords,
50
  getQueueByName,
51
  getQueueSize,
52
  purgeQueue,
53
  clearStorage,
54
  trySetMaintenanceTime,
55
  trySetMonitorTime,
56
  trySetCronTime,
57
  locked,
58
  assertMigration,
59
  getArchivedJobById,
60
  getJobById,
61
  QUEUE_POLICIES,
62
  JOB_STATES,
63
  MIGRATE_RACE_MESSAGE,
64
  CREATE_RACE_MESSAGE,
65
  DEFAULT_SCHEMA
66
}
67

68
const assert = require('assert')
4✔
69

70
/**
 * Builds the full schema-creation script, wrapped by locked().
 *
 * @param {string} schema - schema name interpolated into every statement
 * @param {number|string} version - schema version recorded by insertVersion()
 * @returns {string} the value returned by locked(schema, commands)
 */
function create (schema, version) {
  // Order is significant: later statements depend on earlier ones
  // (e.g. the archive table is created LIKE the job table).
  const builders = [
    createSchema,
    createEnumJobState,

    createTableJob,
    createIndexJobFetch,
    createIndexJobPolicyStately,
    createIndexJobPolicyShort,
    createIndexJobPolicySingleton,
    createIndexJobThrottleOn,
    createIndexJobThrottleKey,

    createTableArchive,
    createPrimaryKeyArchive,
    createColumnArchiveArchivedOn,
    createIndexArchiveArchivedOn,

    createTableVersion,
    createTableQueue,
    createTableSchedule,
    createTableSubscription,

    getPartitionFunction,
    createPartitionFunction,
    dropPartitionFunction
  ]

  const commands = builders.map((build) => build(schema))
  commands.push(insertVersion(schema, version))

  return locked(schema, commands)
}
102

103
/**
 * @param {string} schema - schema name
 * @returns {string} SQL that creates the schema if it does not exist yet
 */
function createSchema (schema) {
  const sql = `
    CREATE SCHEMA IF NOT EXISTS ${schema}
  `

  return sql
}
108

109
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating the `version` table, which holds the schema
 *   version plus the timestamps updated by trySetTimestamp()
 */
function createTableVersion (schema) {
  const sql = `
    CREATE TABLE ${schema}.version (
      version int primary key,
      maintained_on timestamp with time zone,
      cron_on timestamp with time zone,
      monitored_on timestamp with time zone
    )
  `

  return sql
}
119

120
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating the `job_state` enum type
 */
function createEnumJobState (schema) {
  const { created, retry, active, completed, cancelled, failed } = JOB_STATES

  // ENUM definition order is important:
  // the base type is numeric, so earlier labels compare less than later ones
  return `
    CREATE TYPE ${schema}.job_state AS ENUM (
      '${created}',
      '${retry}',
      '${active}',
      '${completed}',
      '${cancelled}',
      '${failed}'
    )
  `
}
134

135
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating the `job` table, list-partitioned by queue name
 */
function createTableJob (schema) {
  const initialState = JOB_STATES.created

  const sql = `
    CREATE TABLE ${schema}.job (
      id uuid not null default gen_random_uuid(),
      name text not null,
      priority integer not null default(0),
      data jsonb,
      state ${schema}.job_state not null default('${initialState}'),
      retry_limit integer not null default(0),
      retry_count integer not null default(0),
      retry_delay integer not null default(0),
      retry_backoff boolean not null default false,
      start_after timestamp with time zone not null default now(),
      started_on timestamp with time zone,
      singleton_key text,
      singleton_on timestamp without time zone,
      expire_in interval not null default interval '15 minutes',
      created_on timestamp with time zone not null default now(),
      completed_on timestamp with time zone,
      keep_until timestamp with time zone NOT NULL default now() + interval '14 days',
      output jsonb,
      dead_letter text,
      policy text,
      CONSTRAINT job_pkey PRIMARY KEY (name, id)
    ) PARTITION BY LIST (name)
  `

  return sql
}
162

163
// Columns returned for a job when metadata is not requested.
const baseJobColumns = 'id, name, data, EXTRACT(epoch FROM expire_in) as "expireInSeconds"'

// Base columns plus the remaining job columns, aliased to camelCase names.
const allJobColumns = baseJobColumns + `, policy, state, priority,
  retry_limit as "retryLimit",
  retry_count as "retryCount",
  retry_delay as "retryDelay",
  retry_backoff as "retryBackoff",
  start_after as "startAfter",  
  started_on as "startedOn",
  singleton_key as "singletonKey",
  singleton_on as "singletonOn",
  expire_in as "expireIn",
  created_on as "createdOn",
  completed_on as "completedOn",
  keep_until as "keepUntil",
  dead_letter as "deadLetter",
  output
`
180

181
/**
 * Builds the statement that calls the schema's create_partition() function.
 *
 * The queue name is embedded as a SQL string literal, so any single quote in
 * it is doubled to keep it from terminating the literal.
 *
 * @param {string} schema - schema name
 * @param {string} name - queue name
 * @returns {string} SQL statement
 */
function createPartition (schema, name) {
  const literal = String(name).replace(/'/g, "''")

  return `SELECT ${schema}.create_partition('${literal}');`
}
184

185
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating get_partition(), which maps a queue name to
 *   '<schema>.job_' followed by the hex sha224 of the name
 */
function getPartitionFunction (schema) {
  const sql = `
    CREATE FUNCTION ${schema}.get_partition(queue_name text, out name text) AS
    $$
    SELECT '${schema}.job_' || encode(sha224(queue_name::bytea), 'hex');
    $$
    LANGUAGE SQL
    IMMUTABLE
  `

  return sql
}
195

196
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating create_partition(queue_name), which creates a
 *   table shaped like the job table, constrains it to one queue name, and
 *   attaches it as a partition of the job table
 */
function createPartitionFunction (schema) {
  // NOTE(review): get_partition() returns a schema-qualified string, and %I
  // quotes its argument as one identifier, so the table name likely contains a
  // literal dot instead of living in the schema. Confirm this is intended.
  const sql = `
    CREATE FUNCTION ${schema}.create_partition(queue_name text)
    RETURNS VOID AS
    $$
    DECLARE
      table_name varchar := ${schema}.get_partition(queue_name);
    BEGIN
      EXECUTE format('CREATE TABLE %I (LIKE ${schema}.job INCLUDING DEFAULTS INCLUDING CONSTRAINTS)', table_name);
      EXECUTE format('ALTER TABLE %I ADD CHECK (name=%L)', table_name, queue_name);
      EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION %I FOR VALUES IN (%L)', table_name, queue_name);
    END;
    $$
    LANGUAGE plpgsql;
  `

  return sql
}
212

213
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating drop_partition(queue_name), which drops the
 *   table named by get_partition(queue_name) if it exists
 */
function dropPartitionFunction (schema) {
  const sql = `
    CREATE FUNCTION ${schema}.drop_partition(queue_name text)
    RETURNS VOID AS
    $$
    BEGIN  
      EXECUTE format('DROP TABLE IF EXISTS %I', ${schema}.get_partition(queue_name));
    END;
    $$
    LANGUAGE plpgsql;
  `

  return sql
}
225

226
/**
 * Builds the statement that calls the schema's drop_partition() function.
 *
 * The queue name is embedded as a SQL string literal, so any single quote in
 * it is doubled to keep it from terminating the literal.
 *
 * @param {string} schema - schema name
 * @param {string} name - queue name
 * @returns {string} SQL statement
 */
function dropPartition (schema, name) {
  const literal = String(name).replace(/'/g, "''")

  return `SELECT ${schema}.drop_partition('${literal}');`
}
229

230
/**
 * @param {string} schema - schema name
 * @returns {string} SQL adding the (name, id) primary key to the archive table
 */
function createPrimaryKeyArchive (schema) {
  const table = `${schema}.archive`

  return `ALTER TABLE ${table} ADD CONSTRAINT archive_pkey PRIMARY KEY (name, id)`
}
233

234
/**
 * @param {string} schema - schema name
 * @returns {string} SQL for a unique index allowing one `created` job per
 *   queue name under the `short` policy
 */
function createIndexJobPolicyShort (schema) {
  const state = JOB_STATES.created
  const policy = QUEUE_POLICIES.short

  return `CREATE UNIQUE INDEX job_policy_short ON ${schema}.job (name) WHERE state = '${state}' AND policy = '${policy}'`
}
237

238
/**
 * @param {string} schema - schema name
 * @returns {string} SQL for a unique index allowing one `active` job per
 *   queue name under the `singleton` policy
 */
function createIndexJobPolicySingleton (schema) {
  const state = JOB_STATES.active
  const policy = QUEUE_POLICIES.singleton

  return `CREATE UNIQUE INDEX job_policy_singleton ON ${schema}.job (name) WHERE state = '${state}' AND policy = '${policy}'`
}
241

242
/**
 * @param {string} schema - schema name
 * @returns {string} SQL for a unique index on (name, state) covering states up
 *   to and including `active` under the `stately` policy
 */
function createIndexJobPolicyStately (schema) {
  const upperState = JOB_STATES.active
  const policy = QUEUE_POLICIES.stately

  return `CREATE UNIQUE INDEX job_policy_stately ON ${schema}.job (name, state) WHERE state <= '${upperState}' AND policy = '${policy}'`
}
245

246
/**
 * @param {string} schema - schema name
 * @returns {string} SQL for the unique index over (name, singleton_on,
 *   singleton_key or '') for rows that have singleton_on set
 */
function createIndexJobThrottleOn (schema) {
  const upperState = JOB_STATES.completed

  return `CREATE UNIQUE INDEX job_throttle_on ON ${schema}.job (name, singleton_on, COALESCE(singleton_key, '')) WHERE state <= '${upperState}' AND singleton_on IS NOT NULL`
}
249

250
/**
 * @param {string} schema - schema name
 * @returns {string} SQL for the unique index over (name, singleton_key) for
 *   rows that have no singleton_on
 */
function createIndexJobThrottleKey (schema) {
  const upperState = JOB_STATES.completed

  return `CREATE UNIQUE INDEX job_throttle_key ON ${schema}.job (name, singleton_key) WHERE state <= '${upperState}' AND singleton_on IS NULL`
}
253

254
/**
 * @param {string} schema - schema name
 * @returns {string} SQL for the partial index over jobs whose state sorts
 *   before `active`, matching the filter used by fetchNextJob()
 */
function createIndexJobFetch (schema) {
  const beforeState = JOB_STATES.active

  return `CREATE INDEX job_fetch ON ${schema}.job (name, start_after) INCLUDE (priority, created_on, id) WHERE state < '${beforeState}'`
}
257

258
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating the archive table with the job table's columns
 */
function createTableArchive (schema) {
  const source = `${schema}.job`
  const target = `${schema}.archive`

  return `CREATE TABLE ${target} (LIKE ${source})`
}
261

262
/**
 * @param {string} schema - schema name
 * @returns {string} SQL adding the `archived_on` column (defaulting to now())
 *   to the archive table
 */
function createColumnArchiveArchivedOn (schema) {
  const table = `${schema}.archive`

  return `ALTER TABLE ${table} ADD archived_on timestamptz NOT NULL DEFAULT now()`
}
265

266
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating the index on archive.archived_on
 */
function createIndexArchiveArchivedOn (schema) {
  const table = `${schema}.archive`

  return `CREATE INDEX archive_archived_on_idx ON ${table}(archived_on)`
}
269

270
/**
 * @param {string} schema - schema name
 * @returns {string} trySetTimestamp() SQL targeting the `maintained_on` column
 */
function trySetMaintenanceTime (schema) {
  const column = 'maintained_on'

  return trySetTimestamp(schema, column)
}
273

274
/**
 * @param {string} schema - schema name
 * @returns {string} trySetTimestamp() SQL targeting the `monitored_on` column
 */
function trySetMonitorTime (schema) {
  const column = 'monitored_on'

  return trySetTimestamp(schema, column)
}
277

278
/**
 * @param {string} schema - schema name
 * @returns {string} trySetTimestamp() SQL targeting the `cron_on` column
 */
function trySetCronTime (schema) {
  const column = 'cron_on'

  return trySetTimestamp(schema, column)
}
281

282
/**
 * Builds an UPDATE that stamps a version-table column with now(), but only
 * when more than $1 seconds have passed since its previous value. A NULL
 * column is treated as one week old. A row is returned only when the update
 * happened.
 *
 * @param {string} schema - schema name
 * @param {string} column - timestamp column of the version table
 * @returns {string} SQL taking one parameter: $1 = minimum elapsed seconds
 */
function trySetTimestamp (schema, column) {
  const sql = `
    UPDATE ${schema}.version SET ${column} = now()
    WHERE EXTRACT( EPOCH FROM (now() - COALESCE(${column}, now() - interval '1 week') ) ) > $1
    RETURNING true
  `

  return sql
}
289

290
/**
 * @param {string} schema - schema name
 * @returns {string} SQL inserting a queue row; parameters $1..$8 are name,
 *   policy, retry_limit, retry_delay, retry_backoff, expire_seconds,
 *   retention_minutes and dead_letter
 */
function createQueue (schema) {
  const sql = `
    INSERT INTO ${schema}.queue (name, policy, retry_limit, retry_delay, retry_backoff, expire_seconds, retention_minutes, dead_letter)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
  `

  return sql
}
296

297
/**
 * @param {string} schema - schema name
 * @returns {string} SQL updating the queue named $1; each of $2..$7 replaces
 *   its column only when non-NULL (COALESCE keeps the current value otherwise)
 */
function updateQueue (schema) {
  const sql = `
    UPDATE ${schema}.queue SET
      retry_limit = COALESCE($2, retry_limit),
      retry_delay = COALESCE($3, retry_delay),
      retry_backoff = COALESCE($4, retry_backoff),
      expire_seconds = COALESCE($5, expire_seconds),
      retention_minutes = COALESCE($6, retention_minutes),
      dead_letter = COALESCE($7, dead_letter)
    WHERE name = $1
  `

  return sql
}
309

310
/**
 * @param {string} schema - schema name
 * @returns {string} SQL selecting the queue row whose name is $1
 */
function getQueueByName (schema) {
  const table = `${schema}.queue`

  return `SELECT * FROM ${table} WHERE name = $1`
}
313

314
/**
 * @param {string} schema - schema name
 * @returns {string} a single statement deleting the queue row, its schedule
 *   row and its jobs, all matched by name = $1
 */
function deleteQueueRecords (schema) {
  const sql = `WITH dq AS (
      DELETE FROM ${schema}.queue WHERE name = $1
    ), ds AS (
      DELETE FROM ${schema}.schedule WHERE name = $1
    )
    DELETE FROM ${schema}.job WHERE name = $1
  `

  return sql
}
323

324
/**
 * @param {string} schema - schema name
 * @returns {string} SQL deleting jobs of queue $1 whose state sorts before `active`
 */
function purgeQueue (schema) {
  const beforeState = JOB_STATES.active

  return `DELETE from ${schema}.job WHERE name = $1 and state < '${beforeState}'`
}
327

328
/**
 * @param {string} schema - schema name
 * @returns {string} SQL truncating both the job and archive tables
 */
function clearStorage (schema) {
  const tables = [`${schema}.job`, `${schema}.archive`]

  return `TRUNCATE ${tables.join(', ')}`
}
331

332
/**
 * Builds a query counting the jobs of queue $1 whose state sorts before a
 * given state.
 *
 * @param {string} schema - schema name
 * @param {object} [options]
 * @param {string} [options.before] - upper bound state (exclusive); falls back
 *   to JOB_STATES.active when missing or falsy
 * @returns {string} SQL taking one parameter: $1 = queue name
 * @throws {AssertionError} when `before` is not an own key of JOB_STATES
 */
function getQueueSize (schema, options = {}) {
  // Read the option into a local instead of writing the default back onto the
  // caller's object.
  const before = options.before || JOB_STATES.active

  // Own-property check: `in` would also accept inherited names such as
  // 'toString', which would then be interpolated into the SQL below.
  const isKnownState = Object.prototype.hasOwnProperty.call(JOB_STATES, before)
  assert(isKnownState, `${before} is not a valid state`)

  return `SELECT count(*) as count FROM ${schema}.job WHERE name = $1 AND state < '${before}'`
}
337

338
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating the `queue` table keyed by queue name
 */
function createTableQueue (schema) {
  const sql = `
    CREATE TABLE ${schema}.queue (
      name text primary key,
      policy text,
      retry_limit int,
      retry_delay int,
      retry_backoff bool,
      expire_seconds int,
      retention_minutes int,
      dead_letter text,
      created_on timestamp with time zone not null default now()
    )
  `

  return sql
}
353

354
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating the `schedule` table keyed by queue name
 */
function createTableSchedule (schema) {
  const sql = `
    CREATE TABLE ${schema}.schedule (
      name text primary key,
      cron text not null,
      timezone text,
      data jsonb,
      options jsonb,
      created_on timestamp with time zone not null default now(),
      updated_on timestamp with time zone not null default now()
    )
  `

  return sql
}
367

368
/**
 * @param {string} schema - schema name
 * @returns {string} SQL creating the `subscription` table keyed by (event, name)
 */
function createTableSubscription (schema) {
  const sql = `
    CREATE TABLE ${schema}.subscription (
      event text not null,
      name text not null,
      created_on timestamp with time zone not null default now(),
      updated_on timestamp with time zone not null default now(),
      PRIMARY KEY(event, name)
    )
  `

  return sql
}
379

380
/**
 * @param {string} schema - schema name
 * @returns {string} SQL selecting every schedule whose name matches a queue row
 */
function getSchedules (schema) {
  const sql = `
    SELECT s.*
    FROM ${schema}.schedule s
      JOIN ${schema}.queue q on s.name = q.name
  `

  return sql
}
387

388
/**
 * @param {string} schema - schema name
 * @returns {string} upsert SQL for a schedule; parameters $1..$5 are name,
 *   cron, timezone, data and options. An existing row for the name is
 *   overwritten and its updated_on refreshed.
 */
function schedule (schema) {
  const sql = `
    INSERT INTO ${schema}.schedule (name, cron, timezone, data, options)
    VALUES ($1, $2, $3, $4, $5)
    ON CONFLICT (name) DO UPDATE SET
      cron = EXCLUDED.cron,
      timezone = EXCLUDED.timezone,
      data = EXCLUDED.data,
      options = EXCLUDED.options,
      updated_on = now()
  `

  return sql
}
400

401
/**
 * @param {string} schema - schema name
 * @returns {string} SQL deleting the schedule whose name is $1
 */
function unschedule (schema) {
  const sql = `
    DELETE FROM ${schema}.schedule
    WHERE name = $1
  `

  return sql
}
407

408
/**
 * @param {string} schema - schema name
 * @returns {string} upsert SQL for a subscription; $1 = event, $2 = queue
 *   name. On conflict the row is rewritten and updated_on refreshed.
 */
function subscribe (schema) {
  const sql = `
    INSERT INTO ${schema}.subscription (event, name)
    VALUES ($1, $2)
    ON CONFLICT (event, name) DO UPDATE SET
      event = EXCLUDED.event,
      name = EXCLUDED.name,
      updated_on = now()
  `

  return sql
}
418

419
/**
 * @param {string} schema - schema name
 * @returns {string} SQL deleting the subscription for event $1 and queue name $2
 */
function unsubscribe (schema) {
  const sql = `
    DELETE FROM ${schema}.subscription
    WHERE event = $1 and name = $2
  `

  return sql
}
425

426
/**
 * @param {string} schema - schema name
 * @returns {string} SQL selecting the queue names subscribed to event $1
 */
function getQueuesForEvent (schema) {
  const sql = `
    SELECT name FROM ${schema}.subscription
    WHERE event = $1
  `

  return sql
}
432

433
/**
 * @returns {string} SQL returning the database clock as epoch milliseconds
 *   in a column named `time`
 */
function getTime () {
  const epochMs = "round(date_part('epoch', now()) * 1000)"

  return `SELECT ${epochMs} as time`
}
436

437
/**
 * @param {string} schema - schema name
 * @returns {string} SQL selecting the stored schema version
 */
function getVersion (schema) {
  const table = `${schema}.version`

  return `SELECT version from ${table}`
}
440

441
/**
 * @param {string} schema - schema name
 * @param {number|string} version - version to store (interpolated as a quoted literal)
 * @returns {string} SQL overwriting the stored schema version
 */
function setVersion (schema, version) {
  const table = `${schema}.version`

  return `UPDATE ${table} SET version = '${version}'`
}
444

445
/**
 * @param {string} schema - schema name
 * @returns {string} SQL resolving '<schema>.version' through to_regclass into
 *   a column named `name`
 */
function versionTableExists (schema) {
  const relation = `${schema}.version`

  return `SELECT to_regclass('${relation}') as name`
}
448

449
/**
 * @param {string} schema - schema name
 * @param {number|string} version - version to record (interpolated as a quoted literal)
 * @returns {string} SQL inserting the version row
 */
function insertVersion (schema, version) {
  const table = `${schema}.version`

  return `INSERT INTO ${table}(version) VALUES ('${version}')`
}
452

453
/**
 * Returns a builder for the fetch query. The query locks up to $2 eligible
 * jobs of queue $1 (state before `active`, start_after in the past), skipping
 * rows locked by other transactions, and marks them active.
 *
 * @param {string} schema - schema name
 * @returns {function(object=): string} builder accepting
 *   `{ includeMetadata, priority = true }`:
 *   - `includeMetadata`: when truthy, RETURNING uses allJobColumns, otherwise baseJobColumns
 *   - `priority`: when truthy, jobs are ordered by priority desc before created_on, id
 */
function fetchNextJob (schema) {
  return ({ includeMetadata, priority = true } = {}) => {
    // A ternary is required here: `priority && '...'` would interpolate the
    // text "false" into ORDER BY when priority is disabled, breaking the SQL.
    const priorityOrder = priority ? 'priority desc, ' : ''
    const returning = includeMetadata ? allJobColumns : baseJobColumns

    return `
    WITH next as (
      SELECT id
      FROM ${schema}.job
      WHERE name = $1
        AND state < '${JOB_STATES.active}'
        AND start_after < now()
      ORDER BY ${priorityOrder} created_on, id
      LIMIT $2
      FOR UPDATE SKIP LOCKED
    )
    UPDATE ${schema}.job j SET
      state = '${JOB_STATES.active}',
      started_on = now(),
      retry_count = CASE WHEN started_on IS NOT NULL THEN retry_count + 1 ELSE retry_count END
    FROM next
    WHERE name = $1 AND j.id = next.id
    RETURNING j.${returning}      
  `
  }
}
474

475
/**
 * @param {string} schema - schema name
 * @returns {string} SQL marking active jobs completed and returning how many
 *   rows changed; $1 = queue name, $2 = uuid[] of job ids, $3 = jsonb output
 */
function completeJobs (schema) {
  const { active, completed } = JOB_STATES

  return `
    WITH results AS (
      UPDATE ${schema}.job
      SET completed_on = now(),
        state = '${completed}',
        output = $3::jsonb
      WHERE name = $1
        AND id IN (SELECT UNNEST($2::uuid[]))
        AND state = '${active}'
      RETURNING *
    )
    SELECT COUNT(*) FROM results
  `
}
490

491
/**
 * @param {string} schema - schema name
 * @returns {string} failJobs() SQL targeting jobs of queue $1 with ids in $2
 *   that have not reached `completed`, storing $3 as the output
 */
function failJobsById (schema) {
  const conditions = [
    'name = $1',
    'id IN (SELECT UNNEST($2::uuid[]))',
    `state < '${JOB_STATES.completed}'`
  ]

  return failJobs(schema, conditions.join(' AND '), '$3::jsonb')
}
497

498
/**
 * @param {string} schema - schema name
 * @returns {string} failJobs() SQL targeting active jobs whose
 *   started_on + expire_in has passed, with a fixed timeout message as output
 */
function failJobsByTimeout (schema) {
  const expired = `state = '${JOB_STATES.active}' AND (started_on + expire_in) < now()`
  const timeoutOutput = `'{ "value": { "message": "job failed by timeout in active state" } }'::jsonb`

  return failJobs(schema, expired, timeoutOutput)
}
503

504
/**
 * Builds the shared "fail" statement. Matching jobs go to `retry` while
 * retry_count < retry_limit, otherwise to `failed`. Jobs that end up failed
 * and have a dead_letter queue different from their own are copied into it.
 *
 * @param {string} schema - schema name
 * @param {string} where - SQL predicate selecting the jobs to fail
 * @param {string} output - SQL expression assigned to the output column
 * @returns {string} SQL returning the number of jobs updated
 */
function failJobs (schema, where, output) {
  const stateType = `${schema}.job_state`
  const { retry, failed } = JOB_STATES

  // start_after: unchanged when retries are exhausted; a fixed retry_delay in
  // seconds without backoff; otherwise exponential (capped at 2^16) with jitter.
  return `
    WITH results AS (
      UPDATE ${schema}.job SET
        state = CASE
          WHEN retry_count < retry_limit THEN '${retry}'::${stateType}
          ELSE '${failed}'::${stateType}
          END,
        completed_on = CASE
          WHEN retry_count < retry_limit THEN NULL
          ELSE now()
          END,
        start_after = CASE
          WHEN retry_count = retry_limit THEN start_after
          WHEN NOT retry_backoff THEN now() + retry_delay * interval '1'
          ELSE now() + (
                retry_delay * 2 ^ LEAST(16, retry_count + 1) / 2 +
                retry_delay * 2 ^ LEAST(16, retry_count + 1) / 2 * random()
            ) * interval '1'
          END,
        output = ${output}
      WHERE ${where}
      RETURNING *
    ), dlq_jobs as (
      INSERT INTO ${schema}.job (name, data, output, retry_limit, keep_until)
      SELECT
        dead_letter,
        data,
        output,
        retry_limit,
        keep_until + (keep_until - start_after)
      FROM results
      WHERE state = '${failed}'
        AND dead_letter IS NOT NULL
        AND NOT name = dead_letter
    )
    SELECT COUNT(*) FROM results
  `
}
543

544
/**
 * @param {string} schema - schema name
 * @returns {string} SQL cancelling jobs of queue $1 with ids in $2 that have
 *   not reached `completed`, returning how many rows changed
 */
function cancelJobs (schema) {
  const { cancelled, completed } = JOB_STATES

  return `
    with results as (
      UPDATE ${schema}.job
      SET completed_on = now(),
        state = '${cancelled}'
      WHERE name = $1
        AND id IN (SELECT UNNEST($2::uuid[]))
        AND state < '${completed}'
      RETURNING 1
    )
    SELECT COUNT(*) from results
  `
}
558

559
/**
 * @param {string} schema - schema name
 * @returns {string} SQL resetting jobs of queue $1 with ids in $2 to `created`
 *   and clearing completed_on, returning how many rows changed. The statement
 *   does not filter on the job's current state.
 */
function resumeJobs (schema) {
  const resumedState = JOB_STATES.created

  return `
    with results as (
      UPDATE ${schema}.job
      SET completed_on = NULL,
        state = '${resumedState}'
      WHERE name = $1
        AND id IN (SELECT UNNEST($2::uuid[]))
      RETURNING 1
    )
    SELECT COUNT(*) from results
  `
}
572

573
/**
 * Builds the single-job insert. Per-job values ($1..$19) are joined to the
 * job's queue row so that each setting resolves as: explicit job value, then
 * queue value, then the supplied default, then a hard-coded fallback.
 *
 * Parameters: $1 id, $2 name, $3 data, $4 priority, $5 start_after,
 * $6 singleton_key, $7 singleton seconds, $8 singleton offset, $9 dead_letter,
 * $10 expire_in, $11 expire_in default, $12 keep_until, $13 keep_until default,
 * $14 retry_limit, $15 retry_limit default, $16 retry_delay,
 * $17 retry_delay default, $18 retry_backoff, $19 retry_backoff default.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL returning the inserted id, or no row when the insert
 *   hits a conflict (ON CONFLICT DO NOTHING)
 */
function insertJob (schema) {
  // $5 and keep_until are treated as absolute timestamps when they end in 'Z',
  // otherwise as intervals relative to now() / start_after.
  const sql = `
    INSERT INTO ${schema}.job (
      id,
      name,
      data,
      priority,
      start_after,
      singleton_key,
      singleton_on,
      dead_letter,
      expire_in,
      keep_until,
      retry_limit,
      retry_delay,
      retry_backoff,
      policy
    )
    SELECT
      id,
      j.name,
      data,
      priority,
      start_after,
      singleton_key,
      singleton_on,
      COALESCE(j.dead_letter, q.dead_letter) as dead_letter,
      CASE
        WHEN expire_in IS NOT NULL THEN CAST(expire_in as interval)
        WHEN q.expire_seconds IS NOT NULL THEN q.expire_seconds * interval '1s'
        WHEN expire_in_default IS NOT NULL THEN CAST(expire_in_default as interval)
        ELSE interval '15 minutes'
        END as expire_in,
      CASE
        WHEN right(keep_until, 1) = 'Z' THEN CAST(keep_until as timestamp with time zone)
        ELSE start_after + CAST(COALESCE(keep_until, (q.retention_minutes * 60)::text, keep_until_default, '14 days') as interval)
        END as keep_until,
      COALESCE(j.retry_limit, q.retry_limit, retry_limit_default, 2) as retry_limit,
      CASE
        WHEN COALESCE(j.retry_backoff, q.retry_backoff, retry_backoff_default, false)
        THEN GREATEST(COALESCE(j.retry_delay, q.retry_delay, retry_delay_default), 1)
        ELSE COALESCE(j.retry_delay, q.retry_delay, retry_delay_default, 0)
        END as retry_delay,
      COALESCE(j.retry_backoff, q.retry_backoff, retry_backoff_default, false) as retry_backoff,
      q.policy
    FROM
      ( SELECT
          COALESCE($1::uuid, gen_random_uuid()) as id,
          $2 as name,
          $3::jsonb as data,
          COALESCE($4::int, 0) as priority,
          CASE
            WHEN right($5, 1) = 'Z' THEN CAST($5 as timestamp with time zone)
            ELSE now() + CAST(COALESCE($5,'0') as interval)
            END as start_after,
          $6 as singleton_key,
          CASE
            WHEN $7::integer IS NOT NULL THEN 'epoch'::timestamp + '1 second'::interval * ($7 * floor((date_part('epoch', now()) + $8) / $7))
            ELSE NULL
            END as singleton_on,
          $9 as dead_letter,
          $10 as expire_in,
          $11 as expire_in_default,
          $12 as keep_until,
          $13 as keep_until_default,
          $14::int as retry_limit,
          $15::int as retry_limit_default,
          $16::int as retry_delay,
          $17::int as retry_delay_default,
          $18::bool as retry_backoff,
          $19::bool as retry_backoff_default
      ) j LEFT JOIN ${schema}.queue q ON j.name = q.name
    ON CONFLICT DO NOTHING
    RETURNING id
  `

  return sql
}
649

650
/**
 * Builds the bulk insert. $1 is a JSON array of job objects expanded with
 * json_to_recordset; $2..$6 are the fallback expire_in, keep_until,
 * retry_limit, retry_delay and retry_backoff. Each setting resolves as:
 * job value, then queue value, then fallback, then a hard-coded default.
 *
 * @param {string} schema - schema name
 * @returns {string} SQL; conflicting rows are skipped (ON CONFLICT DO NOTHING)
 */
function insertJobs (schema) {
  const sql = `
    WITH defaults as (
      SELECT 
        $2 as expire_in,
        $3 as keep_until,
        $4::int as retry_limit,
        $5::int as retry_delay,
        $6::bool as retry_backoff
    )
    INSERT INTO ${schema}.job (
      id,
      name,
      data,
      priority,
      start_after,
      singleton_key,
      dead_letter,
      expire_in,
      keep_until,
      retry_limit,
      retry_delay,
      retry_backoff,
      policy
    )
    SELECT
      COALESCE(id, gen_random_uuid()) as id,
      j.name,
      data,
      COALESCE(priority, 0),
      COALESCE("startAfter", now()),
      "singletonKey",
      COALESCE("deadLetter", q.dead_letter),
      CASE
        WHEN "expireInSeconds" IS NOT NULL THEN "expireInSeconds" *  interval '1s'
        WHEN q.expire_seconds IS NOT NULL THEN q.expire_seconds * interval '1s'
        WHEN defaults.expire_in IS NOT NULL THEN CAST(defaults.expire_in as interval)
        ELSE interval '15 minutes'
        END as expire_in,
      CASE
        WHEN "keepUntil" IS NOT NULL THEN "keepUntil"
        ELSE COALESCE("startAfter", now()) + CAST(COALESCE((q.retention_minutes * 60)::text, defaults.keep_until, '14 days') as interval)
        END as keep_until,
      COALESCE("retryLimit", q.retry_limit, defaults.retry_limit, 2),
      CASE
        WHEN COALESCE("retryBackoff", q.retry_backoff, defaults.retry_backoff, false)
          THEN GREATEST(COALESCE("retryDelay", q.retry_delay, defaults.retry_delay), 1)
        ELSE COALESCE("retryDelay", q.retry_delay, defaults.retry_delay, 0)
        END as retry_delay,      
      COALESCE("retryBackoff", q.retry_backoff, defaults.retry_backoff, false) as retry_backoff,
      q.policy
    FROM json_to_recordset($1) as j (
      id uuid,
      name text,
      priority integer,
      data jsonb,
      "startAfter" timestamp with time zone,
      "retryLimit" integer,
      "retryDelay" integer,
      "retryBackoff" boolean,
      "singletonKey" text,
      "expireInSeconds" integer,
      "keepUntil" timestamp with time zone,
      "deadLetter" text
    )
    LEFT JOIN ${schema}.queue q ON j.name = q.name,
      defaults
    ON CONFLICT DO NOTHING
  `

  return sql
}
720

721
/**
 * @param {string} schema - schema name
 * @param {string} interval - PostgreSQL interval text, interpolated as a literal
 * @returns {string} SQL deleting archive rows archived longer ago than the interval
 */
function drop (schema, interval) {
  const sql = `
    DELETE FROM ${schema}.archive
    WHERE archived_on < (now() - interval '${interval}')
  `

  return sql
}
727

728
/**
 * Builds the statement that moves jobs from the job table into the archive.
 * A job is moved when it is non-failed and completed longer ago than
 * `completedInterval`, failed and completed longer ago than `failedInterval`,
 * or still before `active` with keep_until in the past.
 *
 * @param {string} schema - schema name
 * @param {string} completedInterval - interval text for non-failed jobs
 * @param {string} [failedInterval=completedInterval] - interval text for failed jobs
 * @returns {string} SQL statement
 */
function archive (schema, completedInterval, failedInterval = completedInterval) {
  const { failed, active } = JOB_STATES
  const columns = 'id, name, priority, data, state, retry_limit, retry_count, retry_delay, retry_backoff, start_after, started_on, singleton_key, singleton_on, expire_in, created_on, completed_on, keep_until, dead_letter, policy, output'

  const sql = `
    WITH archived_rows AS (
      DELETE FROM ${schema}.job
      WHERE (state <> '${failed}' AND completed_on < (now() - interval '${completedInterval}'))
        OR (state = '${failed}' AND completed_on < (now() - interval '${failedInterval}'))
        OR (state < '${active}' AND keep_until < now())
      RETURNING *
    )
    INSERT INTO ${schema}.archive (${columns})
    SELECT ${columns}
    FROM archived_rows
  `

  return sql
}
744

745
/**
 * @param {string} schema - schema name
 * @returns {string} SQL counting jobs grouped by rollup(name) and rollup(state)
 */
function countStates (schema) {
  const sql = `
    SELECT name, state, count(*) size
    FROM ${schema}.job
    GROUP BY rollup(name), rollup(state)
  `

  return sql
}
752

753
/**
 * Wraps one or more statements in a transaction that first takes the
 * schema's advisory lock and sets 30s lock and idle-in-transaction timeouts.
 *
 * @param {string} schema - schema name passed to advisoryLock()
 * @param {string|string[]} query - a statement, or statements joined with ';\n'
 * @returns {string} the transaction script
 */
function locked (schema, query) {
  const statements = Array.isArray(query) ? query.join(';\n') : query

  return `
    BEGIN;
    SET LOCAL lock_timeout = '30s';
    SET LOCAL idle_in_transaction_session_timeout = '30s';
    ${advisoryLock(schema)};
    ${statements};
    COMMIT;
  `
}
767

768
/**
 * Builds a transaction-scoped advisory lock call. The lock id is the first
 * 64 bits of sha224 over '<database>.pgboss.<schema><key>'.
 *
 * @param {string} schema - schema name
 * @param {string} [key] - optional suffix; falsy values become ''
 * @returns {string} SQL expression (no trailing semicolon)
 */
function advisoryLock (schema, key) {
  const suffix = key || ''

  return `SELECT pg_advisory_xact_lock(      
      ('x' || encode(sha224((current_database() || '.pgboss.${schema}${suffix}')::bytea), 'hex'))::bit(64)::bigint
  )`
}
773

774
/**
 * @param {string} schema - schema name
 * @param {number|string} version - target schema version
 * @returns {string} SQL that fails with 'division by zero' when the stored
 *   version already equals `version`
 */
function assertMigration (schema, version) {
  // the divisor is zero exactly when the stored version equals the target
  const divisor = `(version::int-${version})`

  return `SELECT version::int/${divisor} from ${schema}.version`
}
778

779
/**
 * @param {string} schema - schema name
 * @returns {string} getJobByTableQueueId() SQL reading from the `job` table
 */
function getJobById (schema) {
  const table = 'job'

  return getJobByTableQueueId(schema, table)
}
782

783
/**
 * @param {string} schema - schema name
 * @returns {string} getJobByTableQueueId() SQL reading from the `archive` table
 */
function getArchivedJobById (schema) {
  const table = 'archive'

  return getJobByTableQueueId(schema, table)
}
786

787
/**
 * @param {string} schema - schema name
 * @param {string} table - table name within the schema
 * @returns {string} SQL selecting allJobColumns for the row with name = $1 and id = $2
 */
function getJobByTableQueueId (schema, table) {
  const relation = `${schema}.${table}`

  return `SELECT ${allJobColumns} FROM ${relation} WHERE name = $1 AND id = $2`
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc