• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 28591991730

02 Jul 2026 01:00PM UTC coverage: 78.877% (+0.06%) from 78.821%
28591991730

Pull #1193

github

web-flow
Merge 24e8f5321 into 3ce1ad0bc
Pull Request #1193: fix: replace mutex with single flight pattern

5106 of 7036 branches covered (72.57%)

Branch coverage included in aggregate %.

80 of 95 new or added lines in 9 files covered. (84.21%)

2 existing lines in 2 files now uncovered.

10103 of 12246 relevant lines covered (82.5%)

420.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.49
/src/internal/queue/queue.ts
1
import { createConcurrencyLimiter, wait } from '@internal/concurrency'
2
import { ERRORS } from '@internal/errors'
3
import { QueueDB } from '@internal/queue/database'
4
import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss'
5
import { getConfig } from '../../config'
6
import { getSbReqIdFromPayload, logger, logSchema } from '../monitoring'
7
import {
8
  queueJobCompleted,
9
  queueJobCompleteFailed,
10
  queueJobError,
11
  queueJobRetryFailed,
12
} from '../monitoring/metrics'
13
import { Event } from './event'
14

15
/**
 * Contract an event class must satisfy to be registered with `Queue`.
 */
type RegisteredEvent = {
  /** Event name; interpolated into the queue's log messages. */
  name: string

  /** Name of the pg-boss queue the event's jobs are fetched from. */
  getQueueName(): string
  /** Name of the queue configured as the dead-letter target of the main queue. */
  deadLetterQueueName(): string

  /** Options spread into both `createQueue` calls for this event. */
  getQueueOptions(): ReturnType<typeof Event.getQueueOptions>
  /** Worker tuning (batch size, polling interval, concurrency) plus fetch options. */
  getWorkerOptions(): ReturnType<typeof Event.getWorkerOptions>

  /** Lifecycle hook invoked while the queue is starting. */
  onStart(): unknown
  /** Lifecycle hook invoked while the queue is stopping. */
  onClose(): unknown

  /** Processes a fetched job; receives the queue's abort signal when one was supplied. */
  handle(job: Job<unknown> | Job<unknown>[], opts?: { signal?: AbortSignal }): unknown
}
25

26
/** Database schema name passed to pg-boss as its `schema` option. */
export const PG_BOSS_SCHEMA = 'pgboss_v10'

/** Maximum time (ms) a single shutdown step may take before `withQueueStopTimeout` rejects. */
const queueStopTimeoutMs = 25_000

/** Total number of attempts `Queue.ackWithRetry` makes before rethrowing the last error. */
const jobAckMaxAttempts = 3

/** Base delay (ms) between ack attempts; it is multiplied by the attempt number. */
const jobAckRetryDelayMs = 250
80✔
30

31
export abstract class Queue {
32
  protected static events: RegisteredEvent[] = []
80✔
33
  private static pgBoss?: PgBoss
34
  private static pgBossDb?: PgBoss.Db
35

36
  /**
   * Builds (but does not start) a pg-boss instance from the service configuration.
   *
   * The connection string is `pgQueueConnectionURL`, falling back to `databaseURL`.
   * In multi-tenant mode without a dedicated queue URL it is the multi-tenant pool
   * URL when set, otherwise the multi-tenant database URL. pg-boss migrations are
   * turned off only when the pool URL is the one being used.
   *
   * @param opts.db - database adapter handed to pg-boss.
   * @param opts.enableWorkers - drives pg-boss' `schedule` and `supervise` options.
   * @returns the configured pg-boss instance.
   * @throws Error in multi-tenant mode when `multitenantDatabaseUrl` is not set.
   */
  static createPgBoss(opts: { db: Db; enableWorkers: boolean }) {
    const {
      isMultitenant,
      databaseURL,
      multitenantDatabasePoolUrl,
      multitenantDatabaseUrl,
      pgQueueConnectionURL,
      pgQueueArchiveCompletedAfterSeconds,
      pgQueueDeleteAfterDays,
      pgQueueDeleteAfterHours,
      pgQueueRetentionDays,
    } = getConfig()

    const usesTenantDatabase = isMultitenant && !pgQueueConnectionURL

    if (usesTenantDatabase && !multitenantDatabaseUrl) {
      throw new Error('running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set')
    }

    const connectionString = usesTenantDatabase
      ? (multitenantDatabasePoolUrl || multitenantDatabaseUrl)
      : (pgQueueConnectionURL ?? databaseURL)

    // Migrations are skipped only when connecting through the multi-tenant pool URL.
    const migrate = !(usesTenantDatabase && multitenantDatabasePoolUrl)

    // An hour-based retention setting takes precedence over the day-based one.
    const deletionPolicy = pgQueueDeleteAfterHours
      ? { deleteAfterHours: pgQueueDeleteAfterHours }
      : { deleteAfterDays: pgQueueDeleteAfterDays }

    return new PgBoss({
      connectionString,
      migrate,
      db: opts.db,
      schema: PG_BOSS_SCHEMA,
      ...deletionPolicy,
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
      retentionDays: pgQueueRetentionDays,
      retryBackoff: true,
      retryLimit: 20,
      expireInHours: 23,
      maintenanceIntervalSeconds: 60 * 5,
      schedule: opts.enableWorkers,
      supervise: opts.enableWorkers,
    })
  }
83

84
  /**
   * Boots the shared pg-boss instance, runs the events' `onStart` hooks and
   * starts one poller per registered event.
   *
   * Calling it again while an instance is already stored returns that instance.
   * If pg-boss cannot be created or started, the static references are rolled
   * back before the error is rethrown, so a later call can retry instead of
   * receiving a never-started instance from the early return.
   *
   * @param opts.signal - when aborted after start-up, `Queue.stop()` is invoked.
   * @param opts.onMessage - forwarded to every poller.
   * @param opts.registerWorkers - invoked after pg-boss has started, only when
   *   `pgQueueEnableWorkers` is truthy.
   * @returns the started pg-boss instance.
   * @throws `ERRORS.Aborted` when `opts.signal` is already aborted, `Error` when
   *   multi-tenant mode has no database URL configured, and any error raised
   *   while creating or starting pg-boss.
   */
  static async start(opts: {
    signal?: AbortSignal
    onMessage?: (job: Job) => void
    registerWorkers?: () => void
  }) {
    if (Queue.pgBoss) {
      return Queue.pgBoss
    }

    if (opts.signal?.aborted) {
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
    }

    const {
      isMultitenant,
      databaseURL,
      multitenantDatabaseUrl,
      multitenantDatabasePoolUrl,
      pgQueueConnectionURL,
      pgQueueEnableWorkers,
      pgQueueReadWriteTimeout,
      pgQueueConcurrentTasksPerQueue,
      pgQueueMaxConnections,
      databaseApplicationName,
    } = getConfig()

    let url = pgQueueConnectionURL || databaseURL

    if (isMultitenant && !pgQueueConnectionURL) {
      if (!multitenantDatabaseUrl && !multitenantDatabasePoolUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      url = (multitenantDatabasePoolUrl || multitenantDatabaseUrl) as string
    }

    const db = new QueueDB({
      min: 0,
      max: pgQueueMaxConnections,
      connectionString: url,
      application_name: databaseApplicationName,
      statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
    })
    Queue.pgBossDb = db

    let boss: PgBoss | undefined

    try {
      boss = this.createPgBoss({
        db,
        enableWorkers: pgQueueEnableWorkers !== false,
      })
      Queue.pgBoss = boss

      boss.on('error', (error) => {
        logSchema.error(logger, '[Queue] error', {
          type: 'queue',
          error,
        })
      })

      await boss.start()
    } catch (e) {
      // Without this rollback the early return above would hand out an instance
      // that never started. Only clear references that still belong to this attempt.
      if (Queue.pgBoss === boss) {
        Queue.pgBoss = undefined
      }

      if (Queue.pgBossDb === db) {
        Queue.pgBossDb = undefined
      }

      throw e
    }

    if (opts.registerWorkers && pgQueueEnableWorkers) {
      opts.registerWorkers()
    }

    await Queue.callStart()
    await Queue.startWorkers({
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
      onMessage: opts.onMessage,
      signal: opts.signal,
    })

    if (opts.signal) {
      opts.signal.addEventListener(
        'abort',
        async () => {
          logSchema.info(logger, '[Queue] Stopping', {
            type: 'queue',
          })
          return Queue.stop()
            .then(async () => {
              logSchema.info(logger, '[Queue] Exited', {
                type: 'queue',
              })
            })
            .catch((e) => {
              logSchema.error(logger, '[Queue] Error while stopping queue', {
                error: e,
                type: 'queue',
              })
            })
        },
        { once: true }
      )
    }

    return Queue.pgBoss
  }
180

181
  /**
   * Returns the shared pg-boss instance.
   *
   * @throws Error when no pg-boss instance is currently stored.
   */
  static getInstance() {
    const boss = this.pgBoss

    if (boss) {
      return boss
    }

    throw new Error('pg boss not initialised')
  }
188

189
  /**
   * Returns the database adapter created for the shared pg-boss instance.
   *
   * @throws Error when no adapter is currently stored.
   */
  static getDb() {
    const db = this.pgBossDb

    if (db) {
      return db
    }

    throw new Error('pg boss not initialised')
  }
196

197
  /**
   * Adds an event to the list whose lifecycle hooks are called on start/stop
   * and for which a poller is created when workers are started.
   *
   * @param event - the event class to register.
   */
  static register<T extends RegisteredEvent>(event: T) {
    const registered = Queue.events
    registered.push(event)
  }
200

201
  /**
   * Stops pg-boss, then runs every event's `onClose` hook, then drops the
   * static references. Each step is bounded by `withQueueStopTimeout`.
   *
   * The close hooks run even when stopping pg-boss fails, and the references
   * are cleared even when the close hooks fail. Resolves immediately when no
   * instance is stored.
   */
  static async stop() {
    const boss = this.pgBoss

    if (!boss) {
      return
    }

    const db = this.pgBossDb
    const { isProduction } = getConfig()

    // Only clear the references if they still point at the instances being stopped.
    const releaseReferences = () => {
      if (Queue.pgBoss === boss) {
        Queue.pgBoss = undefined
      }

      if (Queue.pgBossDb === db) {
        Queue.pgBossDb = undefined
      }
    }

    const closeEvents = async () => {
      try {
        await withQueueStopTimeout(this.callClose(), 'Queue close')
      } finally {
        releaseReferences()
      }
    }

    try {
      const stopping = boss.stop({
        timeout: 20 * 1000,
        graceful: isProduction,
        wait: true,
      })

      await withQueueStopTimeout(stopping, 'Queue stop')
    } finally {
      await closeEvents()
    }
  }
233

234
  /**
   * Registers a task for every registered event, one after another.
   *
   * @param opts.maxConcurrentTasks - default concurrency for events that do not set their own.
   * @param opts.signal - abort signal forwarded to each task.
   * @param opts.onMessage - per-job callback forwarded to each task.
   */
  protected static async startWorkers(opts: {
    maxConcurrentTasks: number
    signal?: AbortSignal
    onMessage?: (job: Job) => void
  }) {
    const { maxConcurrentTasks, onMessage, signal } = opts

    // Sequential on purpose: each registration is awaited before the next begins.
    for (const registeredEvent of Queue.events) {
      await Queue.registerTask(registeredEvent, maxConcurrentTasks, onMessage, signal)
    }
  }
243

244
  /**
   * Invokes `onStart` on every registered event.
   *
   * @returns a promise that resolves once all hooks resolve and rejects as soon as one rejects.
   */
  protected static callStart() {
    return Promise.all(Queue.events.map((registeredEvent) => registeredEvent.onStart()))
  }
251

252
  /**
   * Invokes `onClose` on every registered event.
   *
   * @returns a promise that resolves once all hooks resolve and rejects as soon as one rejects.
   */
  protected static callClose() {
    return Promise.all(Queue.events.map((registeredEvent) => registeredEvent.onClose()))
  }
259

260
  /**
   * Ensures the event's dead-letter queue and main queue exist, then starts
   * polling the main queue.
   *
   * Queue creation is best effort: any error raised while creating either queue
   * is ignored and polling starts regardless.
   *
   * @param event - the registered event to create queues for and poll.
   * @param maxConcurrentTasks - concurrency used when the event's worker options do not set one.
   * @param onMessage - per-job callback forwarded to the poller.
   * @param signal - abort signal forwarded to the poller.
   */
  protected static async registerTask(
    event: RegisteredEvent,
    maxConcurrentTasks: number,
    onMessage?: (job: Job) => void,
    signal?: AbortSignal
  ) {
    const queueName = event.getQueueName()
    const deadLetterName = event.deadLetterQueueName()
    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks

    try {
      // Options shared by both queues; event-specific options override the default policy.
      const sharedOptions = {
        policy: 'standard',
        ...event.getQueueOptions(),
      } as const

      // The dead-letter queue is created first so the main queue can reference it.
      await this.pgBoss?.createQueue(deadLetterName, {
        ...sharedOptions,
        name: deadLetterName,
        retentionDays: 30,
        retryBackoff: true,
      })

      await this.pgBoss?.createQueue(queueName, {
        ...sharedOptions,
        name: queueName,
        deadLetter: deadLetterName,
      })
    } catch {
      // Deliberately ignored: queue creation is best effort.
    }

    return this.pollQueue(event, { concurrentTaskCount, onMessage, signal })
  }
301

302
  /**
   * Runs an acknowledgement callback, retrying when it throws.
   *
   * An unacked job sits in the active state until pg-boss expires it,
   * re-running the handler and blocking singleton queues, so transient errors
   * are retried briefly before giving up.
   *
   * @param ack - callback performing the acknowledgement (complete or fail).
   * @returns whatever `ack` resolves to.
   * @throws the error of the final attempt once `jobAckMaxAttempts` attempts have failed.
   */
  private static async ackWithRetry(ack: () => Promise<void> | undefined) {
    // Every attempt but the last swallows the error and backs off linearly.
    for (let attempt = 1; attempt < jobAckMaxAttempts; attempt++) {
      try {
        return await ack()
      } catch {
        await wait(jobAckRetryDelayMs * attempt)
      }
    }

    // Final attempt: its error propagates to the caller.
    return await ack()
  }
320

321
  /**
   * Starts an interval that fetches jobs for `event` and runs them through
   * `event.handle`, acknowledging each job as completed or failed.
   *
   * - The interval is the worker's `pollingIntervalSeconds` (default 5 seconds).
   * - At most `batchSize` jobs are outstanding; each tick fetches only the
   *   remaining capacity.
   * - Jobs fetched while the signal is already aborted are failed immediately
   *   instead of being handled.
   * - The interval is cleared when the signal aborts.
   *
   * @param event - the registered event whose queue is polled.
   * @param queueOpts.concurrentTaskCount - maximum number of handlers running at once.
   * @param queueOpts.onMessage - called with each job right before its handler.
   * @param queueOpts.signal - abort signal that stops polling and is passed to handlers.
   */
  protected static pollQueue(
    event: RegisteredEvent,
    queueOpts: {
      concurrentTaskCount: number
      onMessage?: (job: Job) => void
      signal?: AbortSignal
    }
  ) {
    const limitConcurrency = createConcurrencyLimiter(queueOpts.concurrentTaskCount)
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
    const batchSize =
      event.getWorkerOptions().batchSize ||
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))

    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
      type: 'queue',
      metadata: JSON.stringify({
        queueName: event.getQueueName(),
        batchSize,
        pollingInterval,
      }),
    })

    // Guards against overlapping ticks while a full batch is being processed.
    let tickBusy = false
    // Jobs fetched but not yet acknowledged (or released on shutdown).
    let outstandingJobs = 0

    const tick = async () => {
      if (tickBusy) {
        return
      }

      try {
        tickBusy = true
        const defaultFetch = {
          includeMetadata: true,
          batchSize,
        }

        const remainingCapacity = defaultFetch.batchSize - outstandingJobs

        if (remainingCapacity <= 0) {
          return
        }

        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
          ...event.getWorkerOptions(),
          ...defaultFetch,
          batchSize: remainingCapacity,
        })

        outstandingJobs += jobs?.length || 0

        // Spare capacity left: let the next tick fetch while this batch runs.
        if (outstandingJobs < defaultFetch.batchSize) {
          tickBusy = false
        }

        if (queueOpts.signal?.aborted) {
          tickBusy = false

          // The fetch above already marked these jobs as active. Fail them so
          // they become retryable right after restart instead of sitting in the
          // active state until pg-boss expires them.
          if (jobs && jobs.length > 0) {
            outstandingJobs = Math.max(0, outstandingJobs - jobs.length)

            try {
              await this.pgBoss?.fail(
                event.getQueueName(),
                jobs.map((job) => job.id)
              )
            } catch (shutdownError) {
              logSchema.error(
                logger,
                `[Queue] Error failing jobs fetched during shutdown ${event.name}`,
                {
                  type: 'queue',
                  error: shutdownError,
                  metadata: JSON.stringify({ queueName: event.getQueueName(), jobs: jobs.length }),
                }
              )
            }
          }

          return
        }

        if (!jobs || jobs.length === 0) {
          tickBusy = false
          return
        }

        await Promise.allSettled(
          jobs.map((job) =>
            limitConcurrency(async () => {
              const sbReqId = getSbReqIdFromPayload(job.data)
              const logJobError = (message: string, error: unknown) => {
                logSchema.error(logger, message, {
                  type: 'queue-task',
                  error,
                  metadata: JSON.stringify(job),
                  sbReqId,
                })
              }

              try {
                try {
                  queueOpts.onMessage?.(job as Job)

                  await event.handle(job, { signal: queueOpts.signal })
                } catch (handlerError) {
                  queueJobRetryFailed.add(1, {
                    name: event.getQueueName(),
                  })

                  logJobError(
                    `[Queue Handler] Error while processing job ${event.name}`,
                    handlerError
                  )

                  try {
                    await this.ackWithRetry(() => this.pgBoss?.fail(event.getQueueName(), job.id))

                    try {
                      // Reuse the fetched job when it already carries metadata,
                      // otherwise look it up to read its retry counters.
                      const dbJob: JobWithMetadata | null =
                        (job as JobWithMetadata).priority !== undefined
                          ? (job as JobWithMetadata)
                          : await Queue.getInstance().getJobById(event.getQueueName(), job.id)

                      // Count a terminal error only once the retries are exhausted.
                      if (dbJob && dbJob.retryCount >= dbJob.retryLimit) {
                        queueJobError.add(1, {
                          name: event.getQueueName(),
                        })
                      }
                    } catch (lookupError) {
                      logJobError(`[Queue Handler] fetching job ${event.name}`, lookupError)
                    }
                  } catch (failAckError) {
                    logJobError(
                      `[Queue Handler] Error while marking job as failed ${event.name}`,
                      failAckError
                    )
                  }

                  throw handlerError
                }

                try {
                  await this.ackWithRetry(() => this.pgBoss?.complete(event.getQueueName(), job.id))
                  queueJobCompleted.add(1, {
                    name: event.getQueueName(),
                  })
                } catch (completeError) {
                  queueJobCompleteFailed.add(1, {
                    name: event.getQueueName(),
                  })
                  logJobError(
                    `[Queue Handler] Error while completing job ${event.name}`,
                    completeError
                  )
                  throw completeError
                }
              } finally {
                // Frees one slot of capacity however the job ended.
                outstandingJobs = Math.max(0, outstandingJobs - 1)
              }
            })
          )
        )
      } catch (pollError) {
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
          type: 'queue',
          error: pollError,
          metadata: JSON.stringify({
            queueName: event.getQueueName(),
            batchSize,
            pollingInterval,
          }),
        })
      } finally {
        tickBusy = false
      }
    }

    const interval = setInterval(tick, pollingInterval)

    queueOpts.signal?.addEventListener('abort', () => {
      clearInterval(interval)
    })
  }
501
}
502

503
/**
 * Races `promise` against a `queueStopTimeoutMs` deadline.
 *
 * @param promise - the shutdown step to wait for.
 * @param label - human-readable step name used in the timeout error message.
 * @returns the value `promise` resolves to when it settles before the deadline.
 * @throws Error when the deadline elapses first; otherwise whatever `promise` rejects with.
 */
async function withQueueStopTimeout<T>(promise: Promise<T>, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined

  const deadline = new Promise<never>((_resolve, reject) => {
    const onDeadline = () => {
      reject(new Error(`${label} timed out after ${queueStopTimeoutMs}ms`))
    }

    timer = setTimeout(onDeadline, queueStopTimeoutMs)
    // Do not let the pending timer keep the process alive on its own.
    timer.unref?.()
  })

  try {
    const outcome = await Promise.race([promise, deadline])
    return outcome
  } finally {
    // Always cancel the timer, whichever side of the race won.
    if (timer) {
      clearTimeout(timer)
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc