• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 28601863430

02 Jul 2026 03:28PM UTC coverage: 78.861% (+0.04%) from 78.821%
28601863430

Pull #1193

github

web-flow
Merge c6c1f82b5 into 3ce1ad0bc
Pull Request #1193: fix: replace mutex with single flight pattern

5104 of 7034 branches covered (72.56%)

Branch coverage included in aggregate %.

72 of 87 new or added lines in 9 files covered. (82.76%)

2 existing lines in 2 files now uncovered.

10094 of 12238 relevant lines covered (82.48%)

420.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.04
/src/internal/queue/queue.ts
1
import { createConcurrencyLimiter } from '@internal/concurrency'
2
import { ERRORS } from '@internal/errors'
3
import { QueueDB } from '@internal/queue/database'
4
import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss'
5
import { getConfig } from '../../config'
6
import { getSbReqIdFromPayload, logger, logSchema } from '../monitoring'
7
import {
8
  queueJobCompleted,
9
  queueJobCompleteFailed,
10
  queueJobError,
11
  queueJobRetryFailed,
12
} from '../monitoring/metrics'
13
import { Event } from './event'
14

15
/**
 * The subset of an event class that the queue relies on to create its
 * queues, poll them and dispatch fetched jobs.
 */
type RegisteredEvent = {
  /** Identifier interpolated into the queue's log messages. */
  name: string

  /** Name of the queue that jobs are created in and fetched from. */
  getQueueName(): string

  /** Name of the dead-letter queue created alongside the main queue. */
  deadLetterQueueName(): string

  /** Options spread into both `createQueue` calls for this event. */
  getQueueOptions(): ReturnType<typeof Event.getQueueOptions>

  /** Polling options (batch size, polling interval, concurrency) for this event. */
  getWorkerOptions(): ReturnType<typeof Event.getWorkerOptions>

  /** Processes a fetched job; receives the queue's abort signal when one exists. */
  handle(job: Job<unknown> | Job<unknown>[], opts?: { signal?: AbortSignal }): unknown

  /** Hook invoked for every registered event while the queue starts. */
  onStart(): unknown

  /** Hook invoked for every registered event while the queue stops. */
  onClose(): unknown
}
25

26
/** Database schema name handed to pg-boss. */
export const PG_BOSS_SCHEMA = 'pgboss_v10'

/** Maximum time, in milliseconds, each shutdown phase may take before it is abandoned. */
const queueStopTimeoutMs = 25 * 1000
28

29
/**
 * Static facade around a single pg-boss instance shared by the process.
 *
 * Events are registered with {@link Queue.register}; {@link Queue.start}
 * creates the pg-boss instance, creates the queues for every registered event
 * and starts one polling loop per event. {@link Queue.stop} tears it all down.
 */
export abstract class Queue {
  /** Events whose queues are created and polled once the queue starts. */
  protected static events: RegisteredEvent[] = []
  /** The running pg-boss instance, set by `start` and cleared by `stop`. */
  private static pgBoss?: PgBoss
  /** The database adapter handed to pg-boss, set by `start` and cleared by `stop`. */
  private static pgBossDb?: PgBoss.Db

  /**
   * Builds (but does not start) a pg-boss instance from the service config.
   *
   * @param opts.db - database adapter pg-boss should use
   * @param opts.enableWorkers - toggles pg-boss `schedule` and `supervise`
   * @returns the configured pg-boss instance
   * @throws Error when running multi-tenant without a queue connection URL
   *   and without `multitenantDatabaseUrl`
   */
  static createPgBoss(opts: { db: Db; enableWorkers: boolean }) {
    const {
      isMultitenant,
      databaseURL,
      multitenantDatabasePoolUrl,
      multitenantDatabaseUrl,
      pgQueueConnectionURL,
      pgQueueArchiveCompletedAfterSeconds,
      pgQueueDeleteAfterDays,
      pgQueueDeleteAfterHours,
      pgQueueRetentionDays,
    } = getConfig()

    // `||` (not `??`) so an empty connection URL counts as "unset", matching
    // the `!pgQueueConnectionURL` guard below and the selection in `start`.
    let url = pgQueueConnectionURL || databaseURL
    let migrate = true

    if (isMultitenant && !pgQueueConnectionURL) {
      if (!multitenantDatabaseUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      url = multitenantDatabasePoolUrl || multitenantDatabaseUrl

      // Migrations are switched off whenever the pool URL is the one in use.
      if (multitenantDatabasePoolUrl) {
        migrate = false
      }
    }

    return new PgBoss({
      connectionString: url,
      migrate,
      db: opts.db,
      schema: PG_BOSS_SCHEMA,
      // The hour-based setting wins over the day-based one when it is set.
      ...(pgQueueDeleteAfterHours
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
      retentionDays: pgQueueRetentionDays,
      retryBackoff: true,
      retryLimit: 20,
      expireInHours: 23,
      maintenanceIntervalSeconds: 60 * 5,
      schedule: opts.enableWorkers,
      supervise: opts.enableWorkers,
    })
  }

  /**
   * Starts the shared pg-boss instance, runs every event's `onStart` hook and
   * begins polling all registered queues. Returns the existing instance when
   * the queue has already been started.
   *
   * @param opts.signal - when aborted, the queue is stopped; an abort that
   *   happens while startup is still in progress stops the queue before this
   *   method resolves
   * @param opts.onMessage - called with every job before it is handled
   * @param opts.registerWorkers - called after pg-boss has started, only when
   *   workers are enabled in the config
   * @returns the pg-boss instance
   * @throws ERRORS.Aborted when `opts.signal` is already aborted on entry
   * @throws Error when running multi-tenant without any usable database URL
   */
  static async start(opts: {
    signal?: AbortSignal
    onMessage?: (job: Job) => void
    registerWorkers?: () => void
  }) {
    if (Queue.pgBoss) {
      return Queue.pgBoss
    }

    if (opts.signal?.aborted) {
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
    }

    const {
      isMultitenant,
      databaseURL,
      multitenantDatabaseUrl,
      multitenantDatabasePoolUrl,
      pgQueueConnectionURL,
      pgQueueEnableWorkers,
      pgQueueReadWriteTimeout,
      pgQueueConcurrentTasksPerQueue,
      pgQueueMaxConnections,
      databaseApplicationName,
    } = getConfig()

    let url = pgQueueConnectionURL || databaseURL

    if (isMultitenant && !pgQueueConnectionURL) {
      if (!multitenantDatabaseUrl && !multitenantDatabasePoolUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      url = (multitenantDatabasePoolUrl || multitenantDatabaseUrl) as string
    }

    Queue.pgBossDb = new QueueDB({
      min: 0,
      max: pgQueueMaxConnections,
      connectionString: url,
      application_name: databaseApplicationName,
      // A non-positive timeout means "no statement timeout".
      statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
    })

    const boss = this.createPgBoss({
      db: Queue.pgBossDb,
      enableWorkers: pgQueueEnableWorkers !== false,
    })
    Queue.pgBoss = boss

    boss.on('error', (error) => {
      logSchema.error(logger, '[Queue] error', {
        type: 'queue',
        error,
      })
    })

    await boss.start()

    if (opts.registerWorkers && pgQueueEnableWorkers) {
      opts.registerWorkers()
    }

    await Queue.callStart()
    await Queue.startWorkers({
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
      onMessage: opts.onMessage,
      signal: opts.signal,
    })

    if (opts.signal) {
      // Stops the queue and logs the outcome; never rejects.
      const stopOnAbort = async () => {
        logSchema.info(logger, '[Queue] Stopping', {
          type: 'queue',
        })

        try {
          await Queue.stop()
          logSchema.info(logger, '[Queue] Exited', {
            type: 'queue',
          })
        } catch (e) {
          logSchema.error(logger, '[Queue] Error while stopping queue', {
            error: e,
            type: 'queue',
          })
        }
      }

      if (opts.signal.aborted) {
        // The signal fired during one of the awaits above. Its 'abort' event
        // will not be dispatched again, so a listener added now would never
        // run and the queue would never be stopped.
        await stopOnAbort()
      } else {
        opts.signal.addEventListener('abort', stopOnAbort, { once: true })
      }
    }

    return boss
  }

  /**
   * Returns the pg-boss instance.
   *
   * @throws Error when the queue has not been started
   */
  static getInstance() {
    if (!this.pgBoss) {
      throw new Error('pg boss not initialised')
    }

    return this.pgBoss
  }

  /**
   * Returns the database adapter given to pg-boss.
   *
   * @throws Error when the queue has not been started
   */
  static getDb() {
    if (!this.pgBossDb) {
      throw new Error('pg boss not initialised')
    }

    return this.pgBossDb
  }

  /** Adds an event to the list of events polled once the queue starts. */
  static register<T extends RegisteredEvent>(event: T) {
    Queue.events.push(event)
  }

  /**
   * Stops pg-boss and then runs every event's `onClose` hook. Each phase is
   * bounded by the queue stop timeout. Does nothing when the queue was never
   * started.
   *
   * The static references are cleared even when a phase fails or times out;
   * the failure is still propagated to the caller.
   */
  static async stop() {
    if (!this.pgBoss) {
      return
    }

    const boss = this.pgBoss
    const db = this.pgBossDb
    const { isProduction } = getConfig()

    try {
      await withQueueStopTimeout(
        boss.stop({
          timeout: 20 * 1000,
          graceful: isProduction,
          wait: true,
        }),
        'Queue stop'
      )
    } finally {
      try {
        await withQueueStopTimeout(this.callClose(), 'Queue close')
      } finally {
        // Only clear the references if they still point at the instances this
        // call was stopping, so a queue started in the meantime is left alone.
        if (Queue.pgBoss === boss) {
          Queue.pgBoss = undefined
        }

        if (Queue.pgBossDb === db) {
          Queue.pgBossDb = undefined
        }
      }
    }
  }

  /**
   * Creates the queues for every registered event and starts polling them,
   * one event after another.
   */
  protected static async startWorkers(opts: {
    maxConcurrentTasks: number
    signal?: AbortSignal
    onMessage?: (job: Job) => void
  }) {
    for (const event of Queue.events) {
      await Queue.registerTask(event, opts.maxConcurrentTasks, opts.onMessage, opts.signal)
    }
  }

  /** Invokes `onStart` on every registered event and waits for all of them. */
  protected static callStart() {
    const events = Queue.events.map((event) => {
      return event.onStart()
    })

    return Promise.all(events)
  }

  /** Invokes `onClose` on every registered event and waits for all of them. */
  protected static callClose() {
    const events = Queue.events.map((event) => {
      return event.onClose()
    })

    return Promise.all(events)
  }

  /**
   * Creates the dead-letter queue and the main queue for an event, then
   * starts polling the main queue.
   *
   * @param event - event whose queues are created and polled
   * @param maxConcurrentTasks - concurrency used when the event's worker
   *   options do not provide `concurrentTaskCount`
   * @param onMessage - forwarded to the polling loop
   * @param signal - forwarded to the polling loop
   */
  protected static async registerTask(
    event: RegisteredEvent,
    maxConcurrentTasks: number,
    onMessage?: (job: Job) => void,
    signal?: AbortSignal
  ) {
    const queueName = event.getQueueName()
    const deadLetterName = event.deadLetterQueueName()

    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks
    try {
      // Create dead-letter queue and the normal queue
      const queueOptions = {
        policy: 'standard',
        ...event.getQueueOptions(),
      } as const

      // dead-letter
      await this.pgBoss?.createQueue(deadLetterName, {
        ...queueOptions,
        name: deadLetterName,
        retentionDays: 30,
        retryBackoff: true,
      })

      // normal queue
      await this.pgBoss?.createQueue(queueName, {
        ...queueOptions,
        name: queueName,
        deadLetter: deadLetterName,
      })
    } catch {
      // no-op: queue creation is best-effort and polling starts regardless.
      // NOTE(review): presumably this tolerates queues that already exist — confirm.
    }

    return this.pollQueue(event, {
      concurrentTaskCount,
      onMessage,
      signal,
    })
  }

  /**
   * Starts a timer that periodically fetches jobs for an event and runs them
   * through `event.handle`, completing or failing each job in pg-boss.
   *
   * Polling stops when `queueOpts.signal` aborts, and never starts when the
   * signal is already aborted.
   *
   * @param event - event whose queue is polled
   * @param queueOpts.concurrentTaskCount - maximum number of jobs handled at once
   * @param queueOpts.onMessage - called with each job before it is handled
   * @param queueOpts.signal - stops polling and is passed to `event.handle`
   */
  protected static pollQueue(
    event: RegisteredEvent,
    queueOpts: {
      concurrentTaskCount: number
      onMessage?: (job: Job) => void
      signal?: AbortSignal
    }
  ) {
    // An already-aborted signal never dispatches 'abort' again, so the timer
    // below could never be cleared and every tick would fetch jobs only to
    // fail them straight away. Do not start polling at all in that case.
    if (queueOpts.signal?.aborted) {
      return
    }

    const limitConcurrency = createConcurrencyLimiter(queueOpts.concurrentTaskCount)
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
    const batchSize =
      event.getWorkerOptions().batchSize ||
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))

    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
      type: 'queue',
      metadata: JSON.stringify({
        queueName: event.getQueueName(),
        batchSize,
        pollingInterval,
      }),
    })

    // `started` keeps ticks from overlapping while a fetch is in progress or
    // the batch is full; `jobFetched` counts fetched jobs not yet finished.
    let started = false
    let jobFetched = 0

    const interval = setInterval(async () => {
      if (started) {
        return
      }

      try {
        started = true
        const defaultFetch = {
          includeMetadata: true,
          batchSize,
        }

        // Only ask for as many jobs as there is room left in the batch.
        const currentBatch = defaultFetch.batchSize - jobFetched

        if (currentBatch <= 0) {
          return
        }

        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
          ...event.getWorkerOptions(),
          ...defaultFetch,
          batchSize: currentBatch,
        })

        jobFetched += jobs?.length || 0

        // Room is left in the batch: let later ticks fetch more while these
        // jobs are still being handled.
        if (jobFetched < defaultFetch.batchSize) {
          started = false
        }

        if (queueOpts.signal?.aborted) {
          started = false

          // The fetch above already marked these jobs as active. Fail them so
          // they become retryable right after restart instead of sitting in the
          // active state until pg-boss expires them.
          if (jobs && jobs.length > 0) {
            jobFetched = Math.max(0, jobFetched - jobs.length)

            try {
              await this.pgBoss?.fail(
                event.getQueueName(),
                jobs.map((job) => job.id)
              )
            } catch (e) {
              logSchema.error(
                logger,
                `[Queue] Error failing jobs fetched during shutdown ${event.name}`,
                {
                  type: 'queue',
                  error: e,
                  metadata: JSON.stringify({ queueName: event.getQueueName(), jobs: jobs.length }),
                }
              )
            }
          }

          return
        }

        if (!jobs || jobs.length === 0) {
          started = false
          return
        }

        await Promise.allSettled(
          jobs.map((job) =>
            limitConcurrency(async () => {
              const sbReqId = getSbReqIdFromPayload(job.data)
              const logJobError = (message: string, error: unknown) => {
                logSchema.error(logger, message, {
                  type: 'queue-task',
                  error,
                  metadata: JSON.stringify(job),
                  sbReqId,
                })
              }

              try {
                try {
                  queueOpts.onMessage?.(job as Job)

                  await event.handle(job, { signal: queueOpts.signal })
                } catch (e) {
                  queueJobRetryFailed.add(1, {
                    name: event.getQueueName(),
                  })

                  logJobError(`[Queue Handler] Error while processing job ${event.name}`, e)

                  try {
                    await this.pgBoss?.fail(event.getQueueName(), job.id)

                    try {
                      // Reuse the fetched job when it already carries metadata;
                      // otherwise load it to read the retry counters.
                      const dbJob: JobWithMetadata | null =
                        (job as JobWithMetadata).priority !== undefined
                          ? (job as JobWithMetadata)
                          : await Queue.getInstance().getJobById(event.getQueueName(), job.id)

                      // Count a job error only once retries are exhausted.
                      if (dbJob && dbJob.retryCount >= dbJob.retryLimit) {
                        queueJobError.add(1, {
                          name: event.getQueueName(),
                        })
                      }
                    } catch (fetchError) {
                      logJobError(`[Queue Handler] fetching job ${event.name}`, fetchError)
                    }
                  } catch (failError) {
                    logJobError(
                      `[Queue Handler] Error while marking job as failed ${event.name}`,
                      failError
                    )
                  }

                  throw e
                }

                try {
                  await this.pgBoss?.complete(event.getQueueName(), job.id)
                  queueJobCompleted.add(1, {
                    name: event.getQueueName(),
                  })
                } catch (e) {
                  queueJobCompleteFailed.add(1, {
                    name: event.getQueueName(),
                  })
                  logJobError(`[Queue Handler] Error while completing job ${event.name}`, e)
                  throw e
                }
              } finally {
                // The job is finished (either way); free its slot in the batch.
                jobFetched = Math.max(0, jobFetched - 1)
              }
            })
          )
        )
      } catch (e) {
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
          type: 'queue',
          error: e,
          metadata: JSON.stringify({
            queueName: event.getQueueName(),
            batchSize,
            pollingInterval,
          }),
        })
      } finally {
        started = false
      }
    }, pollingInterval)

    queueOpts.signal?.addEventListener(
      'abort',
      () => {
        clearInterval(interval)
      },
      { once: true }
    )
  }
}
481

482
/**
 * Awaits `promise`, but gives up once the queue stop timeout has elapsed.
 *
 * @param promise - the shutdown step to wait for
 * @param label - name of the step, used in the timeout error message
 * @returns the value `promise` resolves with
 * @throws Error when the timeout elapses first; otherwise whatever `promise`
 *   rejects with
 */
async function withQueueStopTimeout<T>(promise: Promise<T>, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined

  const deadline = new Promise<never>((_resolve, reject) => {
    const onExpire = () => {
      reject(new Error(`${label} timed out after ${queueStopTimeoutMs}ms`))
    }

    timer = setTimeout(onExpire, queueStopTimeoutMs)
    // Where timers support it, do not let this one keep the process alive.
    timer.unref?.()
  })

  try {
    const outcome = await Promise.race([promise, deadline])
    return outcome
  } finally {
    // Whichever side won the race, the timer is no longer needed.
    if (timer) {
      clearTimeout(timer)
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc