• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 22342901268

24 Feb 2026 08:34AM UTC coverage: 76.198% (-0.06%) from 76.262%
22342901268

Pull #865

github

web-flow
Merge d72e721d9 into 2a4fa0a84
Pull Request #865: feat: support pgbouncer for multitenant database

2154 of 3133 branches covered (68.75%)

Branch coverage included in aggregate %.

38 of 87 new or added lines in 6 files covered. (43.68%)

2 existing lines in 1 file now uncovered.

26322 of 34238 relevant lines covered (76.88%)

95.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.74
/src/internal/queue/queue.ts
1
import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss'
1✔
2
import { ERRORS } from '@internal/errors'
1✔
3
import { QueueDB } from '@internal/queue/database'
1✔
4
import { getConfig } from '../../config'
1✔
5
import { logger, logSchema } from '../monitoring'
1✔
6
import { queueJobRetryFailed, queueJobCompleted, queueJobError } from '../monitoring/metrics'
1✔
7
import { Event } from './event'
1✔
8
import { Semaphore } from '@shopify/semaphore'
1✔
9

1✔
10
//eslint-disable-next-line @typescript-eslint/no-explicit-any
1✔
11
type SubclassOfBaseClass = (new (payload: any) => Event<any>) & {
1✔
12
  [K in keyof typeof Event]: (typeof Event)[K]
1✔
13
}
1✔
14

1✔
15
export abstract class Queue {
1✔
16
  protected static events: SubclassOfBaseClass[] = []
1✔
17
  private static pgBoss?: PgBoss
1✔
18
  private static pgBossDb?: PgBoss.Db
1✔
19

1✔
20
  static createPgBoss(opts: { db: Db; enableWorkers: boolean }) {
1✔
21
    const {
×
22
      isMultitenant,
×
23
      databaseURL,
×
NEW
24
      multitenantDatabasePoolUrl,
×
25
      multitenantDatabaseUrl,
×
26
      pgQueueConnectionURL,
×
27
      pgQueueArchiveCompletedAfterSeconds,
×
28
      pgQueueDeleteAfterDays,
×
29
      pgQueueDeleteAfterHours,
×
30
      pgQueueRetentionDays,
×
31
    } = getConfig()
×
32

×
33
    let url = pgQueueConnectionURL ?? databaseURL
×
NEW
34
    let migrate = true
×
35

×
36
    if (isMultitenant && !pgQueueConnectionURL) {
×
37
      if (!multitenantDatabaseUrl) {
×
38
        throw new Error(
×
39
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
×
40
        )
×
41
      }
×
NEW
42
      url = multitenantDatabasePoolUrl || multitenantDatabaseUrl
×
NEW
43

×
NEW
44
      if (multitenantDatabasePoolUrl) {
×
NEW
45
        migrate = false
×
NEW
46
      }
×
47
    }
×
48

×
49
    return new PgBoss({
×
50
      connectionString: url,
×
NEW
51
      migrate: migrate,
×
52
      db: opts.db,
×
53
      schema: 'pgboss_v10',
×
54
      application_name: 'storage-pgboss',
×
55
      ...(pgQueueDeleteAfterHours
×
56
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
×
57
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
×
58
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
×
59
      retentionDays: pgQueueRetentionDays,
×
60
      retryBackoff: true,
×
61
      retryLimit: 20,
×
62
      expireInHours: 23,
×
63
      maintenanceIntervalSeconds: 60 * 5,
×
64
      schedule: opts.enableWorkers,
×
65
      supervise: opts.enableWorkers,
×
66
    })
×
67
  }
×
68

1✔
69
  static async start(opts: {
1✔
70
    signal?: AbortSignal
×
71
    onMessage?: (job: Job) => void
×
72
    registerWorkers?: () => void
×
73
  }) {
×
74
    if (Queue.pgBoss) {
×
75
      return Queue.pgBoss
×
76
    }
×
77

×
78
    if (opts.signal?.aborted) {
×
79
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
×
80
    }
×
81

×
82
    const {
×
83
      isMultitenant,
×
84
      databaseURL,
×
85
      multitenantDatabaseUrl,
×
NEW
86
      multitenantDatabasePoolUrl,
×
87
      pgQueueConnectionURL,
×
88
      pgQueueEnableWorkers,
×
89
      pgQueueReadWriteTimeout,
×
90
      pgQueueConcurrentTasksPerQueue,
×
91
      pgQueueMaxConnections,
×
92
    } = getConfig()
×
93

×
94
    let url = pgQueueConnectionURL || databaseURL
×
95

×
96
    if (isMultitenant && !pgQueueConnectionURL) {
×
NEW
97
      if (!multitenantDatabaseUrl && !multitenantDatabasePoolUrl) {
×
98
        throw new Error(
×
99
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
×
100
        )
×
101
      }
×
NEW
102
      url = (multitenantDatabasePoolUrl || multitenantDatabaseUrl) as string
×
103
    }
×
104

×
105
    Queue.pgBossDb = new QueueDB({
×
106
      min: 0,
×
107
      max: pgQueueMaxConnections,
×
108
      connectionString: url,
×
109
      statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
×
110
    })
×
111

×
112
    Queue.pgBoss = this.createPgBoss({
×
113
      db: Queue.pgBossDb,
×
114
      enableWorkers: pgQueueEnableWorkers !== false,
×
115
    })
×
116

×
117
    Queue.pgBoss.on('error', (error) => {
×
118
      logSchema.error(logger, '[Queue] error', {
×
119
        type: 'queue',
×
120
        error,
×
121
      })
×
122
    })
×
123

×
124
    await Queue.pgBoss.start()
×
125

×
126
    if (opts.registerWorkers && pgQueueEnableWorkers) {
×
127
      opts.registerWorkers()
×
128
    }
×
129

×
130
    await Queue.callStart()
×
131
    await Queue.startWorkers({
×
132
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
×
133
      onMessage: opts.onMessage,
×
134
      signal: opts.signal,
×
135
    })
×
136

×
137
    if (opts.signal) {
×
138
      opts.signal.addEventListener(
×
139
        'abort',
×
140
        async () => {
×
141
          logSchema.info(logger, '[Queue] Stopping', {
×
142
            type: 'queue',
×
143
          })
×
144
          return Queue.stop()
×
145
            .then(async () => {
×
146
              logSchema.info(logger, '[Queue] Exited', {
×
147
                type: 'queue',
×
148
              })
×
149
            })
×
150
            .catch((e) => {
×
151
              logSchema.error(logger, '[Queue] Error while stopping queue', {
×
152
                error: e,
×
153
                type: 'queue',
×
154
              })
×
155
            })
×
156
            .finally(async () => {
×
157
              await Queue.callClose().catch(() => {
×
158
                // no-op
×
159
              })
×
160
            })
×
161
        },
×
162
        { once: true }
×
163
      )
×
164
    }
×
165

×
166
    return Queue.pgBoss
×
167
  }
×
168

1✔
169
  static getInstance() {
1✔
170
    if (!this.pgBoss) {
×
171
      throw new Error('pg boss not initialised')
×
172
    }
×
173

×
174
    return this.pgBoss
×
175
  }
×
176

1✔
177
  static getDb() {
1✔
178
    if (!this.pgBossDb) {
×
179
      throw new Error('pg boss not initialised')
×
180
    }
×
181

×
182
    return this.pgBossDb
×
183
  }
×
184

1✔
185
  static register<T extends SubclassOfBaseClass>(event: T) {
1✔
186
    Queue.events.push(event)
×
187
  }
×
188

1✔
189
  static async stop() {
1✔
190
    if (!this.pgBoss) {
×
191
      return
×
192
    }
×
193

×
194
    const boss = this.pgBoss
×
195
    const { isProduction } = getConfig()
×
196

×
197
    await boss.stop({
×
198
      timeout: 20 * 1000,
×
199
      graceful: isProduction,
×
200
      wait: true,
×
201
    })
×
202

×
203
    await new Promise((resolve) => {
×
204
      boss.once('stopped', async () => {
×
205
        await this.callClose()
×
206
        resolve(null)
×
207
      })
×
208
    })
×
209

×
210
    Queue.pgBoss = undefined
×
211
  }
×
212

1✔
213
  protected static async startWorkers(opts: {
1✔
214
    maxConcurrentTasks: number
×
215
    signal?: AbortSignal
×
216
    onMessage?: (job: Job) => void
×
217
  }) {
×
218
    for (const event of Queue.events) {
×
219
      await Queue.registerTask(event, opts.maxConcurrentTasks, opts.onMessage, opts.signal)
×
220
    }
×
221
  }
×
222

1✔
223
  protected static callStart() {
1✔
224
    const events = Queue.events.map((event) => {
×
225
      return event.onStart()
×
226
    })
×
227

×
228
    return Promise.all(events)
×
229
  }
×
230

1✔
231
  protected static callClose() {
1✔
232
    const events = Queue.events.map((event) => {
×
233
      return event.onClose()
×
234
    })
×
235

×
236
    return Promise.all(events)
×
237
  }
×
238

1✔
239
  protected static async registerTask(
1✔
240
    event: SubclassOfBaseClass,
×
241
    maxConcurrentTasks: number,
×
242
    onMessage?: (job: Job) => void,
×
243
    signal?: AbortSignal
×
244
  ) {
×
245
    const queueName = event.getQueueName()
×
246
    const deadLetterName = event.deadLetterQueueName()
×
247

×
248
    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks
×
249
    try {
×
250
      // Create dead-letter queue and the normal queue
×
251
      const queueOptions = {
×
252
        policy: 'standard',
×
253
        ...event.getQueueOptions(),
×
254
      } as const
×
255

×
256
      // dead-letter
×
257
      await this.pgBoss?.createQueue(deadLetterName, {
×
258
        ...queueOptions,
×
259
        name: deadLetterName,
×
260
        retentionDays: 30,
×
261
        retryBackoff: true,
×
262
      })
×
263

×
264
      // // normal queue
×
265
      await this.pgBoss?.createQueue(queueName, {
×
266
        ...queueOptions,
×
267
        name: queueName,
×
268
        deadLetter: deadLetterName,
×
269
      })
×
270
    } catch {
×
271
      // no-op
×
272
    }
×
273

×
274
    return this.pollQueue(event, {
×
275
      concurrentTaskCount,
×
276
      onMessage,
×
277
      signal,
×
278
    })
×
279
  }
×
280

1✔
281
  protected static pollQueue(
1✔
282
    event: SubclassOfBaseClass,
×
283
    queueOpts: {
×
284
      concurrentTaskCount: number
×
285
      onMessage?: (job: Job) => void
×
286
      signal?: AbortSignal
×
287
    }
×
288
  ) {
×
289
    const semaphore = new Semaphore(queueOpts.concurrentTaskCount)
×
290
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
×
291
    const batchSize =
×
292
      event.getWorkerOptions().batchSize ||
×
293
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))
×
294

×
295
    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
×
296
      type: 'queue',
×
297
      metadata: JSON.stringify({
×
298
        queueName: event.getQueueName(),
×
299
        batchSize: batchSize,
×
300
        pollingInterval: pollingInterval,
×
301
      }),
×
302
    })
×
303

×
304
    let started = false
×
305
    let jobFetched = 0
×
306

×
307
    const interval = setInterval(async () => {
×
308
      if (started) {
×
309
        return
×
310
      }
×
311

×
312
      try {
×
313
        started = true
×
314
        const defaultFetch = {
×
315
          includeMetadata: true,
×
316
          batchSize,
×
317
        }
×
318

×
319
        const currentBatch = defaultFetch.batchSize - jobFetched
×
320

×
321
        if (currentBatch <= 0) {
×
322
          return
×
323
        }
×
324

×
325
        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
×
326
          ...event.getWorkerOptions(),
×
327
          ...defaultFetch,
×
328
          batchSize: currentBatch,
×
329
        })
×
330

×
331
        jobFetched += jobs?.length || 0
×
332

×
333
        if (jobFetched < defaultFetch.batchSize) {
×
334
          started = false
×
335
        }
×
336

×
337
        if (queueOpts.signal?.aborted) {
×
338
          started = false
×
339
          return
×
340
        }
×
341

×
342
        if (!jobs || (jobs && jobs.length === 0)) {
×
343
          started = false
×
344
          return
×
345
        }
×
346

×
347
        await Promise.allSettled(
×
348
          jobs.map(async (job) => {
×
349
            const lock = await semaphore.acquire()
×
350
            const opts = event.getQueueOptions()
×
351
            try {
×
352
              queueOpts.onMessage?.(job as Job)
×
353

×
354
              await event.handle(job, { signal: queueOpts.signal })
×
355

×
356
              await this.pgBoss?.complete(event.getQueueName(), job.id)
×
357
              queueJobCompleted.add(1, {
×
358
                name: event.getQueueName(),
×
359
              })
×
360
            } catch (e) {
×
361
              queueJobRetryFailed.add(1, {
×
362
                name: event.getQueueName(),
×
363
              })
×
364

×
365
              await this.pgBoss?.fail(event.getQueueName(), job.id)
×
366

×
367
              try {
×
368
                const dbJob: JobWithMetadata | null =
×
369
                  (job as JobWithMetadata).priority !== undefined
×
370
                    ? (job as JobWithMetadata)
×
371
                    : await Queue.getInstance().getJobById(event.getQueueName(), job.id)
×
372

×
373
                if (!dbJob) {
×
374
                  return
×
375
                }
×
376
                if (dbJob.retryCount >= dbJob.retryLimit) {
×
377
                  queueJobError.add(1, {
×
378
                    name: event.getQueueName(),
×
379
                  })
×
380
                }
×
381
              } catch (e) {
×
382
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
×
383
                  type: 'queue-task',
×
384
                  error: e,
×
385
                  metadata: JSON.stringify(job),
×
386
                })
×
387
              }
×
388

×
389
              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
×
390
                type: 'queue-task',
×
391
                error: e,
×
392
                metadata: JSON.stringify(job),
×
393
              })
×
394

×
395
              throw e
×
396
            } finally {
×
397
              jobFetched = Math.max(0, jobFetched - 1)
×
398
              await lock.release()
×
399
            }
×
400
          })
×
401
        )
×
402
      } catch (e) {
×
403
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
×
404
          type: 'queue',
×
405
          error: e,
×
406
          metadata: JSON.stringify({
×
407
            queueName: event.getQueueName(),
×
408
            batchSize,
×
409
            pollingInterval,
×
410
          }),
×
411
        })
×
412
      } finally {
×
413
        started = false
×
414
      }
×
415
    }, pollingInterval)
×
416

×
417
    queueOpts.signal?.addEventListener('abort', () => {
×
418
      clearInterval(interval)
×
419
    })
×
420
  }
×
421
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc