• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 26155689234

20 May 2026 10:06AM UTC coverage: 57.697% (-17.2%) from 74.874%
26155689234

Pull #1115

github

web-flow
Merge d6a84934b into 9a13d3ea2
Pull Request #1115: Encapsulate storage database transaction APIs

2784 of 5519 branches covered (50.44%)

Branch coverage included in aggregate %.

122 of 145 new or added lines in 7 files covered. (84.14%)

1665 existing lines in 69 files now uncovered.

5938 of 9598 relevant lines covered (61.87%)

407.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.05
/src/internal/queue/queue.ts
1
import { ERRORS } from '@internal/errors'
2
import { QueueDB } from '@internal/queue/database'
3
import { Semaphore } from '@shopify/semaphore'
4
import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss'
5
import { getConfig } from '../../config'
6
import { getSbReqIdFromPayload, logger, logSchema } from '../monitoring'
7
import { queueJobCompleted, queueJobError, queueJobRetryFailed } from '../monitoring/metrics'
8
import { Event } from './event'
9

10
/**
 * Structural contract an event class has to satisfy before it can be handed
 * to `Queue.register`. The queue only ever talks to events through these
 * members.
 */
type RegisteredEvent = {
  /** Identifier interpolated into the queue's handler/polling log messages. */
  name: string

  /** Name of the queue that is created, fetched from, completed and failed against. */
  getQueueName(): string
  /** Name of the dead-letter queue created alongside (and referenced by) the main queue. */
  deadLetterQueueName(): string

  /** Options spread into both `createQueue` calls made for this event. */
  getQueueOptions(): ReturnType<typeof Event.getQueueOptions>
  /** Worker tuning read by the poller (`concurrentTaskCount`, `pollingIntervalSeconds`, `batchSize`). */
  getWorkerOptions(): ReturnType<typeof Event.getWorkerOptions>

  /** Lifecycle hook invoked for every registered event while the queue starts. */
  onStart(): unknown
  /** Lifecycle hook invoked for every registered event while the queue shuts down. */
  onClose(): unknown

  /** Processes a fetched job (or batch); the optional signal is the queue's abort signal. */
  handle(job: Job<unknown> | Job<unknown>[], opts?: { signal?: AbortSignal }): unknown
}
20

21
/** Database schema name handed to pg-boss through its `schema` constructor option. */
export const PG_BOSS_SCHEMA = 'pgboss_v10'
33✔
22

23
/**
 * Static facade that owns a single pg-boss instance together with the
 * database adapter it runs on.
 *
 * Events are collected through {@link Queue.register}. {@link Queue.start}
 * boots pg-boss, runs every event's `onStart` hook, creates a queue and a
 * dead-letter queue per event and then polls each queue on an interval.
 */
export abstract class Queue {
  /** Events added through {@link Queue.register}; each one gets its own queue and poller. */
  protected static events: RegisteredEvent[] = []
  /** Running pg-boss instance; `undefined` before `start()` and after a successful `stop()`. */
  private static pgBoss?: PgBoss
  /** Database adapter created in `start()` and shared with pg-boss. */
  private static pgBossDb?: PgBoss.Db

  /**
   * Builds (but does not start) a pg-boss instance from the service config.
   *
   * @param opts.db - database adapter pg-boss should use
   * @param opts.enableWorkers - forwarded to pg-boss as both `schedule` and `supervise`
   * @returns the configured, not yet started, pg-boss instance
   * @throws Error when running multi-tenant without a dedicated queue URL and
   *   `multitenantDatabaseUrl` is not configured
   */
  static createPgBoss(opts: { db: Db; enableWorkers: boolean }) {
    const {
      isMultitenant,
      databaseURL,
      multitenantDatabasePoolUrl,
      multitenantDatabaseUrl,
      pgQueueConnectionURL,
      pgQueueArchiveCompletedAfterSeconds,
      pgQueueDeleteAfterDays,
      pgQueueDeleteAfterHours,
      pgQueueRetentionDays,
    } = getConfig()

    let url = pgQueueConnectionURL ?? databaseURL
    let migrate = true

    if (isMultitenant && !pgQueueConnectionURL) {
      if (!multitenantDatabaseUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      url = multitenantDatabasePoolUrl || multitenantDatabaseUrl

      // Migrations are switched off whenever the pool URL is the one in use.
      // NOTE(review): presumably because schema migrations should not run
      // through a connection pooler — confirm.
      if (multitenantDatabasePoolUrl) {
        migrate = false
      }
    }

    return new PgBoss({
      connectionString: url,
      migrate,
      db: opts.db,
      schema: PG_BOSS_SCHEMA,
      // The hour-based retention wins over the day-based one when it is set.
      ...(pgQueueDeleteAfterHours
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
      retentionDays: pgQueueRetentionDays,
      retryBackoff: true,
      retryLimit: 20,
      expireInHours: 23,
      maintenanceIntervalSeconds: 60 * 5,
      schedule: opts.enableWorkers,
      supervise: opts.enableWorkers,
    })
  }

  /**
   * Starts the queue: creates the database adapter and pg-boss instance,
   * starts pg-boss, optionally registers workers, runs the `onStart` hooks
   * and begins polling every registered event.
   *
   * Calling it again while an instance exists returns that instance.
   *
   * @param opts.signal - when aborted, the queue is stopped and the close hooks run
   * @param opts.onMessage - invoked with every job right before it is handled
   * @param opts.registerWorkers - called after pg-boss started, only when
   *   `pgQueueEnableWorkers` is truthy
   * @returns the pg-boss instance
   * @throws the `ERRORS.Aborted` error when `opts.signal` is already aborted on entry
   * @throws Error when running multi-tenant without any usable database URL
   */
  static async start(opts: {
    signal?: AbortSignal
    onMessage?: (job: Job) => void
    registerWorkers?: () => void
  }) {
    if (Queue.pgBoss) {
      return Queue.pgBoss
    }

    if (opts.signal?.aborted) {
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
    }

    const {
      isMultitenant,
      databaseURL,
      multitenantDatabaseUrl,
      multitenantDatabasePoolUrl,
      pgQueueConnectionURL,
      pgQueueEnableWorkers,
      pgQueueReadWriteTimeout,
      pgQueueConcurrentTasksPerQueue,
      pgQueueMaxConnections,
    } = getConfig()

    let url = pgQueueConnectionURL || databaseURL

    if (isMultitenant && !pgQueueConnectionURL) {
      if (!multitenantDatabaseUrl && !multitenantDatabasePoolUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      url = (multitenantDatabasePoolUrl || multitenantDatabaseUrl) as string
    }

    Queue.pgBossDb = new QueueDB({
      min: 0,
      max: pgQueueMaxConnections,
      connectionString: url,
      // A non-positive timeout means "no statement timeout".
      statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
    })

    const boss = this.createPgBoss({
      db: Queue.pgBossDb,
      enableWorkers: pgQueueEnableWorkers !== false,
    })
    Queue.pgBoss = boss

    boss.on('error', (error) => {
      logSchema.error(logger, '[Queue] error', {
        type: 'queue',
        error,
      })
    })

    await boss.start()

    if (opts.registerWorkers && pgQueueEnableWorkers) {
      opts.registerWorkers()
    }

    await Queue.callStart()
    await Queue.startWorkers({
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
      onMessage: opts.onMessage,
      signal: opts.signal,
    })

    if (opts.signal) {
      if (opts.signal.aborted) {
        // The signal fired while the startup steps above were being awaited.
        // An 'abort' listener attached now would never run, so shut down here.
        void Queue.shutdownOnAbort()
      } else {
        opts.signal.addEventListener(
          'abort',
          () => {
            void Queue.shutdownOnAbort()
          },
          { once: true }
        )
      }
    }

    return boss
  }

  /**
   * Returns the running pg-boss instance.
   *
   * @throws Error when the queue has not been started
   */
  static getInstance() {
    if (!this.pgBoss) {
      throw new Error('pg boss not initialised')
    }

    return this.pgBoss
  }

  /**
   * Returns the database adapter created by `start()`.
   *
   * @throws Error when the queue has not been started
   */
  static getDb() {
    if (!this.pgBossDb) {
      throw new Error('pg boss not initialised')
    }

    return this.pgBossDb
  }

  /**
   * Adds an event to the set that `start()` will create queues for and poll.
   *
   * @param event - the event class to register
   */
  static register<T extends RegisteredEvent>(event: T) {
    Queue.events.push(event)
  }

  /**
   * Stops pg-boss (waiting up to 20 seconds), runs every event's `onClose`
   * hook and forgets the instance. Does nothing when the queue is not running.
   *
   * NOTE(review): `pgBossDb` stays assigned afterwards, so `getDb()` keeps
   * returning the old adapter — confirm that this is intended.
   */
  static async stop() {
    if (!this.pgBoss) {
      return
    }

    const boss = this.pgBoss
    const { isProduction } = getConfig()

    await boss.stop({
      timeout: 20 * 1000,
      graceful: isProduction,
      wait: true,
    })

    await this.callClose()

    Queue.pgBoss = undefined
  }

  /**
   * Creates the queues for every registered event and starts polling them,
   * one event after the other.
   */
  protected static async startWorkers(opts: {
    maxConcurrentTasks: number
    signal?: AbortSignal
    onMessage?: (job: Job) => void
  }) {
    for (const event of Queue.events) {
      await Queue.registerTask(event, opts.maxConcurrentTasks, opts.onMessage, opts.signal)
    }
  }

  /** Invokes `onStart` on every registered event and waits for all of them. */
  protected static callStart() {
    const events = Queue.events.map((event) => {
      return event.onStart()
    })

    return Promise.all(events)
  }

  /** Invokes `onClose` on every registered event and waits for all of them. */
  protected static callClose() {
    const events = Queue.events.map((event) => {
      return event.onClose()
    })

    return Promise.all(events)
  }

  /**
   * Ensures the event's dead-letter queue and main queue exist, then starts
   * polling the main queue.
   *
   * Queue creation is best-effort: a failure is logged and polling starts anyway.
   *
   * @param event - event whose queues are created and polled
   * @param maxConcurrentTasks - fallback concurrency when the event's worker
   *   options do not define `concurrentTaskCount`
   * @param onMessage - forwarded to the poller
   * @param signal - forwarded to the poller
   */
  protected static async registerTask(
    event: RegisteredEvent,
    maxConcurrentTasks: number,
    onMessage?: (job: Job) => void,
    signal?: AbortSignal
  ) {
    const queueName = event.getQueueName()
    const deadLetterName = event.deadLetterQueueName()

    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks
    try {
      // Create dead-letter queue and the normal queue
      const queueOptions = {
        policy: 'standard',
        ...event.getQueueOptions(),
      } as const

      // dead-letter (created first because the normal queue references it)
      await this.pgBoss?.createQueue(deadLetterName, {
        ...queueOptions,
        name: deadLetterName,
        retentionDays: 30,
        retryBackoff: true,
      })

      // normal queue
      await this.pgBoss?.createQueue(queueName, {
        ...queueOptions,
        name: queueName,
        deadLetter: deadLetterName,
      })
    } catch (error) {
      // Still non-fatal, but no longer invisible.
      // NOTE(review): presumably the usual cause is a queue that already
      // exists, hence info level — confirm.
      logSchema.info(logger, `[Queue] Unable to create queue ${queueName}`, {
        type: 'queue',
        metadata: JSON.stringify({
          queueName,
          deadLetterName,
          reason: error instanceof Error ? error.message : String(error),
        }),
      })
    }

    return this.pollQueue(event, {
      concurrentTaskCount,
      onMessage,
      signal,
    })
  }

  /**
   * Polls the event's queue on a fixed interval and runs the fetched jobs
   * through `event.handle`, at most `concurrentTaskCount` at a time.
   *
   * Successful jobs are completed; failing jobs are marked as failed, counted
   * and logged. Polling ends when `queueOpts.signal` aborts; nothing is
   * started when the signal is already aborted.
   *
   * @param event - event whose queue is polled
   * @param queueOpts.concurrentTaskCount - size of the handler semaphore
   * @param queueOpts.onMessage - invoked with each job before it is handled
   * @param queueOpts.signal - stops the polling interval when aborted
   */
  protected static pollQueue(
    event: RegisteredEvent,
    queueOpts: {
      concurrentTaskCount: number
      onMessage?: (job: Job) => void
      signal?: AbortSignal
    }
  ) {
    // An 'abort' listener never fires for a signal that has already aborted,
    // and every tick would fetch jobs only to drop them at the abort check
    // below, so do not start polling at all.
    if (queueOpts.signal?.aborted) {
      return
    }

    const semaphore = new Semaphore(queueOpts.concurrentTaskCount)
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
    const batchSize =
      event.getWorkerOptions().batchSize ||
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))

    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
      type: 'queue',
      metadata: JSON.stringify({
        queueName: event.getQueueName(),
        batchSize,
        pollingInterval,
      }),
    })

    // `started` blocks a new tick while a fetch is in flight or the in-flight
    // budget is used up; `jobFetched` counts fetched jobs not yet finished.
    let started = false
    let jobFetched = 0

    const interval = setInterval(async () => {
      if (started) {
        return
      }

      try {
        started = true
        const defaultFetch = {
          includeMetadata: true,
          batchSize,
        }

        // Only ask for as many jobs as the in-flight budget still allows.
        const currentBatch = defaultFetch.batchSize - jobFetched

        if (currentBatch <= 0) {
          return
        }

        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
          ...event.getWorkerOptions(),
          ...defaultFetch,
          batchSize: currentBatch,
        })

        jobFetched += jobs?.length || 0

        // Budget left: let the next tick fetch while these jobs are processed.
        if (jobFetched < defaultFetch.batchSize) {
          started = false
        }

        if (queueOpts.signal?.aborted) {
          started = false
          return
        }

        if (!jobs || (jobs && jobs.length === 0)) {
          started = false
          return
        }

        await Promise.allSettled(
          jobs.map(async (job) => {
            const lock = await semaphore.acquire()
            const sbReqId = getSbReqIdFromPayload(job.data)

            try {
              queueOpts.onMessage?.(job as Job)

              await event.handle(job, { signal: queueOpts.signal })

              await this.pgBoss?.complete(event.getQueueName(), job.id)
              queueJobCompleted.add(1, {
                name: event.getQueueName(),
              })
            } catch (e) {
              queueJobRetryFailed.add(1, {
                name: event.getQueueName(),
              })

              // Bookkeeping failures are logged on their own so they can never
              // hide the processing error that is logged and rethrown below.
              try {
                await this.pgBoss?.fail(event.getQueueName(), job.id)
              } catch (failError) {
                logSchema.error(
                  logger,
                  `[Queue Handler] Error while marking job as failed ${event.name}`,
                  {
                    type: 'queue-task',
                    error: failError,
                    metadata: JSON.stringify(job),
                    sbReqId,
                  }
                )
              }

              try {
                // Jobs fetched with metadata already carry the retry counters;
                // otherwise load them from the database.
                const dbJob: JobWithMetadata | null =
                  (job as JobWithMetadata).priority !== undefined
                    ? (job as JobWithMetadata)
                    : await Queue.getInstance().getJobById(event.getQueueName(), job.id)

                // Retries exhausted: count the job as a definitive error.
                if (dbJob && dbJob.retryCount >= dbJob.retryLimit) {
                  queueJobError.add(1, {
                    name: event.getQueueName(),
                  })
                }
              } catch (lookupError) {
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
                  type: 'queue-task',
                  error: lookupError,
                  metadata: JSON.stringify(job),
                  sbReqId,
                })
              }

              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
                type: 'queue-task',
                error: e,
                metadata: JSON.stringify(job),
                sbReqId,
              })

              throw e
            } finally {
              // Give the slot back to the in-flight budget and the semaphore.
              jobFetched = Math.max(0, jobFetched - 1)
              await lock.release()
            }
          })
        )
      } catch (e) {
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
          type: 'queue',
          error: e,
          metadata: JSON.stringify({
            queueName: event.getQueueName(),
            batchSize,
            pollingInterval,
          }),
        })
      } finally {
        started = false
      }
    }, pollingInterval)

    queueOpts.signal?.addEventListener(
      'abort',
      () => {
        clearInterval(interval)
      },
      { once: true }
    )
  }

  /**
   * Abort-driven shutdown: stops the queue and logs the outcome. Never rejects.
   *
   * `stop()` already runs the close hooks when it succeeds, so they are only
   * invoked here when `stop()` failed and may not have reached them.
   */
  private static async shutdownOnAbort(): Promise<void> {
    logSchema.info(logger, '[Queue] Stopping', {
      type: 'queue',
    })

    try {
      await Queue.stop()
      logSchema.info(logger, '[Queue] Exited', {
        type: 'queue',
      })
    } catch (e) {
      logSchema.error(logger, '[Queue] Error while stopping queue', {
        error: e,
        type: 'queue',
      })

      try {
        await Queue.callClose()
      } catch {
        // no-op: best-effort cleanup after a failed stop
      }
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc