• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 26887874376

03 Jun 2026 01:27PM UTC coverage: 75.379% (+0.03%) from 75.354%
26887874376

push

github

web-flow
refactor: get rid of pg queue app name (#959)

4266 of 6233 branches covered (68.44%)

Branch coverage included in aggregate %.

0 of 1 new or added line in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

8449 of 10635 relevant lines covered (79.45%)

366.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.89
/src/internal/queue/queue.ts
1
import { ERRORS } from '@internal/errors'
2
import { QueueDB } from '@internal/queue/database'
3
import { Semaphore } from '@shopify/semaphore'
4
import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss'
5
import { getConfig } from '../../config'
6
import { getSbReqIdFromPayload, logger, logSchema } from '../monitoring'
7
import { queueJobCompleted, queueJobError, queueJobRetryFailed } from '../monitoring/metrics'
8
import { Event } from './event'
9

10
/**
 * Contract the queue relies on for every event passed to `Queue.register`.
 *
 * The queue reads naming and option accessors when creating and polling the
 * queue, invokes `handle` for fetched jobs, and calls the lifecycle hooks
 * when the queue starts and closes.
 */
type RegisteredEvent = {
  /** Event name; interpolated into queue error log messages. */
  name: string

  /** Name of the queue that jobs for this event are fetched from. */
  getQueueName(): string
  /** Name of the queue passed as the `deadLetter` target of the main queue. */
  deadLetterQueueName(): string

  /** Options spread into the `createQueue` call for both queues. */
  getQueueOptions(): ReturnType<typeof Event.getQueueOptions>
  /** Worker options (polling interval, batch size, concurrency) read while polling. */
  getWorkerOptions(): ReturnType<typeof Event.getWorkerOptions>

  /** Processes a fetched job (or batch); receives the queue's abort signal when one is set. */
  handle(job: Job<unknown> | Job<unknown>[], opts?: { signal?: AbortSignal }): unknown

  /** Lifecycle hook invoked for every registered event when the queue starts. */
  onStart(): unknown
  /** Lifecycle hook invoked for every registered event when the queue closes. */
  onClose(): unknown
}
20

21
/** Database schema name handed to PgBoss via its `schema` option. */
export const PG_BOSS_SCHEMA = 'pgboss_v10'
55✔
22

23
export abstract class Queue {
24
  protected static events: RegisteredEvent[] = []
55✔
25
  private static pgBoss?: PgBoss
26
  private static pgBossDb?: PgBoss.Db
27

28
  /**
   * Builds a PgBoss instance from the current configuration.
   *
   * The connection string is resolved as follows:
   * - `pgQueueConnectionURL` when set (non-empty), otherwise `databaseURL`;
   * - in multi-tenant mode without a dedicated queue URL, the multi-tenant
   *   pool URL when present, otherwise the multi-tenant database URL.
   *
   * Migrations are switched off when the multi-tenant pool URL is used.
   *
   * @param opts.db - database adapter PgBoss runs its queries through
   * @param opts.enableWorkers - toggles PgBoss `schedule` and `supervise`
   * @returns a configured PgBoss instance (this method does not start it)
   * @throws Error when running multi-tenant without a queue URL and
   *   `multitenantDatabaseUrl` is not configured
   */
  static createPgBoss(opts: { db: Db; enableWorkers: boolean }) {
    const {
      isMultitenant,
      databaseURL,
      multitenantDatabasePoolUrl,
      multitenantDatabaseUrl,
      pgQueueConnectionURL,
      pgQueueArchiveCompletedAfterSeconds,
      pgQueueDeleteAfterDays,
      pgQueueDeleteAfterHours,
      pgQueueRetentionDays,
    } = getConfig()

    // `||` (not `??`) so an empty queue URL counts as "unset", which is how the
    // `!pgQueueConnectionURL` check below and `Queue.start` already treat it.
    let url = pgQueueConnectionURL || databaseURL
    let migrate = true

    if (isMultitenant && !pgQueueConnectionURL) {
      if (!multitenantDatabaseUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      url = multitenantDatabasePoolUrl || multitenantDatabaseUrl

      // NOTE(review): migrations are skipped on the pool URL — presumably because
      // they should not run through a connection pooler; confirm.
      if (multitenantDatabasePoolUrl) {
        migrate = false
      }
    }

    return new PgBoss({
      connectionString: url,
      migrate,
      db: opts.db,
      schema: PG_BOSS_SCHEMA,
      // An hours-based deletion window, when configured, wins over the days-based one.
      ...(pgQueueDeleteAfterHours
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
      retentionDays: pgQueueRetentionDays,
      retryBackoff: true,
      retryLimit: 20,
      expireInHours: 23,
      maintenanceIntervalSeconds: 60 * 5,
      schedule: opts.enableWorkers,
      supervise: opts.enableWorkers,
    })
  }
75

76
  /**
   * Starts the queue once and returns the PgBoss instance.
   *
   * Steps: create the queue database adapter and PgBoss instance, start
   * PgBoss, optionally let the caller register workers, run every registered
   * event's `onStart` hook, start polling, and wire the abort signal to a
   * shutdown routine.
   *
   * Calling it again while an instance exists returns that instance.
   *
   * @param opts.signal - aborting it stops the queue and runs close hooks
   * @param opts.onMessage - forwarded to the pollers; called for each fetched job
   * @param opts.registerWorkers - called after PgBoss has started, only when
   *   workers are enabled in the configuration
   * @returns the PgBoss instance
   * @throws the `ERRORS.Aborted` error when `opts.signal` is already aborted
   * @throws Error when multi-tenant mode has no usable database URL
   * @throws whatever `PgBoss.start()` rejects with; the half-initialised
   *   instance is discarded first so a later call can retry
   */
  static async start(opts: {
    signal?: AbortSignal
    onMessage?: (job: Job) => void
    registerWorkers?: () => void
  }) {
    if (Queue.pgBoss) {
      return Queue.pgBoss
    }

    if (opts.signal?.aborted) {
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
    }

    const {
      isMultitenant,
      databaseURL,
      multitenantDatabaseUrl,
      multitenantDatabasePoolUrl,
      pgQueueConnectionURL,
      pgQueueEnableWorkers,
      pgQueueReadWriteTimeout,
      pgQueueConcurrentTasksPerQueue,
      pgQueueMaxConnections,
      databaseApplicationName,
    } = getConfig()

    let url = pgQueueConnectionURL || databaseURL

    if (isMultitenant && !pgQueueConnectionURL) {
      if (!multitenantDatabaseUrl && !multitenantDatabasePoolUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      url = (multitenantDatabasePoolUrl || multitenantDatabaseUrl) as string
    }

    Queue.pgBossDb = new QueueDB({
      min: 0,
      max: pgQueueMaxConnections,
      connectionString: url,
      application_name: databaseApplicationName,
      // A non-positive timeout means "no statement timeout".
      statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
    })

    const boss = this.createPgBoss({
      db: Queue.pgBossDb,
      enableWorkers: pgQueueEnableWorkers !== false,
    })
    Queue.pgBoss = boss

    boss.on('error', (error) => {
      logSchema.error(logger, '[Queue] error', {
        type: 'queue',
        error,
      })
    })

    try {
      await boss.start()
    } catch (e) {
      // Do not keep a never-started instance around: the guard at the top would
      // otherwise hand it back to the next caller as if it were running.
      if (Queue.pgBoss === boss) {
        Queue.pgBoss = undefined
      }
      throw e
    }

    if (opts.registerWorkers && pgQueueEnableWorkers) {
      opts.registerWorkers()
    }

    await Queue.callStart()
    await Queue.startWorkers({
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
      onMessage: opts.onMessage,
      signal: opts.signal,
    })

    const { signal } = opts

    if (signal) {
      const shutdown = async () => {
        logSchema.info(logger, '[Queue] Stopping', {
          type: 'queue',
        })
        return Queue.stop()
          .then(async () => {
            logSchema.info(logger, '[Queue] Exited', {
              type: 'queue',
            })
          })
          .catch((e) => {
            logSchema.error(logger, '[Queue] Error while stopping queue', {
              error: e,
              type: 'queue',
            })
          })
          .finally(async () => {
            // NOTE(review): Queue.stop() already awaits callClose() when it
            // succeeds, so close hooks may run twice here — confirm they are
            // idempotent.
            await Queue.callClose().catch(() => {
              // no-op
            })
          })
      }

      if (signal.aborted) {
        // The signal fired while startup was awaiting; an 'abort' listener
        // attached now would never be invoked, so shut down right away.
        void shutdown()
      } else {
        signal.addEventListener('abort', shutdown, { once: true })
      }
    }

    return boss
  }
177

178
  /**
   * Returns the PgBoss instance currently held by the queue.
   *
   * @throws Error when no instance has been created yet
   */
  static getInstance() {
    const boss = this.pgBoss

    if (boss) {
      return boss
    }

    throw new Error('pg boss not initialised')
  }
185

186
  /**
   * Returns the database adapter created for PgBoss.
   *
   * @throws Error when the adapter has not been created yet
   */
  static getDb() {
    const db = this.pgBossDb

    if (db) {
      return db
    }

    throw new Error('pg boss not initialised')
  }
193

194
  /**
   * Adds an event to the list of registered events.
   *
   * @param event - event whose hooks and workers the queue will manage
   */
  static register<T extends RegisteredEvent>(event: T) {
    const { events } = Queue
    events.push(event)
  }
197

198
  /**
   * Stops PgBoss, runs every registered event's close hook and then forgets
   * the PgBoss instance. Does nothing when there is no instance.
   *
   * The stop waits up to 20 seconds and is graceful only in production.
   */
  static async stop() {
    const boss = this.pgBoss

    if (!boss) {
      return
    }

    const stopOptions = {
      timeout: 20_000,
      graceful: getConfig().isProduction,
      wait: true,
    }

    await boss.stop(stopOptions)
    await this.callClose()

    Queue.pgBoss = undefined
  }
216

217
  /**
   * Registers a polling task for each registered event, one after another.
   *
   * @param opts.maxConcurrentTasks - concurrency passed on to each task
   * @param opts.signal - abort signal passed on to each task
   * @param opts.onMessage - per-job callback passed on to each task
   */
  protected static async startWorkers(opts: {
    maxConcurrentTasks: number
    signal?: AbortSignal
    onMessage?: (job: Job) => void
  }) {
    const { maxConcurrentTasks, onMessage, signal } = opts

    for (const registered of Queue.events) {
      await Queue.registerTask(registered, maxConcurrentTasks, onMessage, signal)
    }
  }
226

227
  /**
   * Invokes `onStart` on every registered event.
   *
   * @returns a promise that settles once all hook results have resolved
   */
  protected static callStart() {
    return Promise.all(Queue.events.map((registered) => registered.onStart()))
  }
234

235
  /**
   * Invokes `onClose` on every registered event.
   *
   * @returns a promise that settles once all hook results have resolved
   */
  protected static callClose() {
    return Promise.all(Queue.events.map((registered) => registered.onClose()))
  }
242

243
  /**
   * Creates the dead-letter queue and the main queue for an event, then starts
   * polling the main queue.
   *
   * Queue creation is best-effort: failures are logged and never abort
   * startup. The two creations are attempted independently, so a failure on
   * the dead-letter queue does not skip the main queue.
   *
   * @param event - registered event providing queue names and options
   * @param maxConcurrentTasks - concurrency used when the event's worker
   *   options do not define `concurrentTaskCount`
   * @param onMessage - optional callback forwarded to the poller
   * @param signal - optional abort signal forwarded to the poller
   * @returns the result of `pollQueue`
   */
  protected static async registerTask(
    event: RegisteredEvent,
    maxConcurrentTasks: number,
    onMessage?: (job: Job) => void,
    signal?: AbortSignal
  ) {
    const queueName = event.getQueueName()
    const deadLetterName = event.deadLetterQueueName()

    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks
    try {
      // Event options override the default 'standard' policy.
      const queueOptions = {
        policy: 'standard',
        ...event.getQueueOptions(),
      } as const

      // dead-letter queue
      try {
        await this.pgBoss?.createQueue(deadLetterName, {
          ...queueOptions,
          name: deadLetterName,
          retentionDays: 30,
          retryBackoff: true,
        })
      } catch (e) {
        logSchema.error(logger, `[Queue] Error while creating dead-letter queue ${deadLetterName}`, {
          type: 'queue',
          error: e,
          metadata: JSON.stringify({ queueName, deadLetterName }),
        })
      }

      // normal queue
      await this.pgBoss?.createQueue(queueName, {
        ...queueOptions,
        name: queueName,
        deadLetter: deadLetterName,
      })
    } catch (e) {
      // Still best-effort, but no longer silent.
      logSchema.error(logger, `[Queue] Error while creating queue ${queueName}`, {
        type: 'queue',
        error: e,
        metadata: JSON.stringify({ queueName, deadLetterName }),
      })
    }

    return this.pollQueue(event, {
      concurrentTaskCount,
      onMessage,
      signal,
    })
  }
284

285
  /**
   * Polls an event's queue on a timer and runs the event handler for every
   * fetched job.
   *
   * On each tick the poller fetches up to `batchSize - jobFetched` jobs, where
   * `jobFetched` counts jobs fetched but not yet finished. Each job waits for
   * a semaphore permit (sized by `concurrentTaskCount`) before its handler
   * runs. A job is completed when the handler resolves and failed when it
   * throws.
   *
   * The timer is cleared when the signal aborts; if the signal is already
   * aborted, polling is cancelled immediately.
   *
   * @param event - registered event whose queue is polled
   * @param queueOpts.concurrentTaskCount - semaphore size, also used to derive
   *   the default batch size
   * @param queueOpts.onMessage - called with each job before it is handled
   * @param queueOpts.signal - stops polling and is passed to the handler
   */
  protected static pollQueue(
    event: RegisteredEvent,
    queueOpts: {
      concurrentTaskCount: number
      onMessage?: (job: Job) => void
      signal?: AbortSignal
    }
  ) {
    const semaphore = new Semaphore(queueOpts.concurrentTaskCount)
    // Defaults to polling every 5 seconds.
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
    const batchSize =
      event.getWorkerOptions().batchSize ||
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))

    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
      type: 'queue',
      metadata: JSON.stringify({
        queueName: event.getQueueName(),
        batchSize,
        pollingInterval,
      }),
    })

    // `started` stops overlapping ticks from fetching while a fetch is in flight.
    let started = false
    // Jobs fetched but not yet finished; limits how many more may be fetched.
    let jobFetched = 0

    const interval = setInterval(async () => {
      if (started) {
        return
      }

      try {
        started = true
        const defaultFetch = {
          includeMetadata: true,
          batchSize,
        }

        const currentBatch = defaultFetch.batchSize - jobFetched

        if (currentBatch <= 0) {
          return
        }

        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
          ...event.getWorkerOptions(),
          ...defaultFetch,
          batchSize: currentBatch,
        })

        jobFetched += jobs?.length || 0

        // Capacity remains: let later ticks fetch while this batch is processed.
        if (jobFetched < defaultFetch.batchSize) {
          started = false
        }

        if (queueOpts.signal?.aborted) {
          started = false
          return
        }

        if (!jobs || jobs.length === 0) {
          started = false
          return
        }

        await Promise.allSettled(
          jobs.map(async (job) => {
            const lock = await semaphore.acquire()
            const sbReqId = getSbReqIdFromPayload(job.data)

            try {
              queueOpts.onMessage?.(job as Job)

              await event.handle(job, { signal: queueOpts.signal })

              await this.pgBoss?.complete(event.getQueueName(), job.id)
              queueJobCompleted.add(1, {
                name: event.getQueueName(),
              })
            } catch (e) {
              queueJobRetryFailed.add(1, {
                name: event.getQueueName(),
              })

              await this.pgBoss?.fail(event.getQueueName(), job.id)

              try {
                // Use the fetched job's metadata when present; otherwise look it up.
                const dbJob: JobWithMetadata | null =
                  (job as JobWithMetadata).priority !== undefined
                    ? (job as JobWithMetadata)
                    : await Queue.getInstance().getJobById(event.getQueueName(), job.id)

                // A missing job only means the retry budget cannot be checked;
                // the failure below must still be logged and rethrown.
                if (dbJob && dbJob.retryCount >= dbJob.retryLimit) {
                  queueJobError.add(1, {
                    name: event.getQueueName(),
                  })
                }
              } catch (lookupError) {
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
                  type: 'queue-task',
                  error: lookupError,
                  metadata: JSON.stringify(job),
                  sbReqId,
                })
              }

              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
                type: 'queue-task',
                error: e,
                metadata: JSON.stringify(job),
                sbReqId,
              })

              throw e
            } finally {
              jobFetched = Math.max(0, jobFetched - 1)
              await lock.release()
            }
          })
        )
      } catch (e) {
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
          type: 'queue',
          error: e,
          metadata: JSON.stringify({
            queueName: event.getQueueName(),
            batchSize,
            pollingInterval,
          }),
        })
      } finally {
        started = false
      }
    }, pollingInterval)

    const stopPolling = () => {
      clearInterval(interval)
    }

    if (queueOpts.signal?.aborted) {
      // An 'abort' listener on an already-aborted signal never fires, which
      // would leave the timer running forever.
      stopPolling()
    } else {
      queueOpts.signal?.addEventListener('abort', stopPolling, { once: true })
    }
  }
428
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc