• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 23562155747

25 Mar 2026 08:20PM UTC coverage: 77.143% (+0.1%) from 77.04%
23562155747

push

github

web-flow
fix: admin handlers for migrations (#929)

4183 of 5921 branches covered (70.65%)

Branch coverage included in aggregate %.

20 of 25 new or added lines in 6 files covered. (80.0%)

4 existing lines in 1 file now uncovered.

27336 of 34937 relevant lines covered (78.24%)

185.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.43
/src/internal/queue/queue.ts
1
import { ERRORS } from '@internal/errors'
2✔
2
import { QueueDB } from '@internal/queue/database'
2✔
3
import { Semaphore } from '@shopify/semaphore'
2✔
4
import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss'
2✔
5
import { getConfig } from '../../config'
2✔
6
import { logger, logSchema } from '../monitoring'
2✔
7
import { queueJobCompleted, queueJobError, queueJobRetryFailed } from '../monitoring/metrics'
2✔
8
import { Event } from './event'
2✔
9

2✔
10
type SubclassOfBaseClass = (new (
2✔
11
  payload: any
2✔
12
) => Event<any>) & {
2✔
13
  [K in keyof typeof Event]: (typeof Event)[K]
2✔
14
}
2✔
15

2✔
16
export const PG_BOSS_SCHEMA = 'pgboss_v10'
2✔
17

2✔
18
export abstract class Queue {
2✔
19
  protected static events: SubclassOfBaseClass[] = []
2✔
20
  private static pgBoss?: PgBoss
2✔
21
  private static pgBossDb?: PgBoss.Db
2✔
22

2✔
23
  static createPgBoss(opts: { db: Db; enableWorkers: boolean }) {
2✔
24
    const {
×
25
      isMultitenant,
×
26
      databaseURL,
×
27
      multitenantDatabasePoolUrl,
×
28
      multitenantDatabaseUrl,
×
29
      pgQueueConnectionURL,
×
30
      pgQueueArchiveCompletedAfterSeconds,
×
31
      pgQueueDeleteAfterDays,
×
32
      pgQueueDeleteAfterHours,
×
33
      pgQueueRetentionDays,
×
34
    } = getConfig()
×
35

×
36
    let url = pgQueueConnectionURL ?? databaseURL
×
37
    let migrate = true
×
38

×
39
    if (isMultitenant && !pgQueueConnectionURL) {
×
40
      if (!multitenantDatabaseUrl) {
×
41
        throw new Error(
×
42
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
×
43
        )
×
44
      }
×
45
      url = multitenantDatabasePoolUrl || multitenantDatabaseUrl
×
46

×
47
      if (multitenantDatabasePoolUrl) {
×
48
        migrate = false
×
49
      }
×
50
    }
×
51

×
52
    return new PgBoss({
×
53
      connectionString: url,
×
54
      migrate,
×
55
      db: opts.db,
×
NEW
56
      schema: PG_BOSS_SCHEMA,
×
57
      ...(pgQueueDeleteAfterHours
×
58
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
×
59
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
×
60
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
×
61
      retentionDays: pgQueueRetentionDays,
×
62
      retryBackoff: true,
×
63
      retryLimit: 20,
×
64
      expireInHours: 23,
×
65
      maintenanceIntervalSeconds: 60 * 5,
×
66
      schedule: opts.enableWorkers,
×
67
      supervise: opts.enableWorkers,
×
68
    })
×
69
  }
×
70

2✔
71
  static async start(opts: {
2✔
72
    signal?: AbortSignal
×
73
    onMessage?: (job: Job) => void
×
74
    registerWorkers?: () => void
×
75
  }) {
×
76
    if (Queue.pgBoss) {
×
77
      return Queue.pgBoss
×
78
    }
×
79

×
80
    if (opts.signal?.aborted) {
×
81
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
×
82
    }
×
83

×
84
    const {
×
85
      isMultitenant,
×
86
      databaseURL,
×
87
      multitenantDatabaseUrl,
×
88
      multitenantDatabasePoolUrl,
×
89
      pgQueueConnectionURL,
×
90
      pgQueueEnableWorkers,
×
91
      pgQueueReadWriteTimeout,
×
92
      pgQueueConcurrentTasksPerQueue,
×
93
      pgQueueMaxConnections,
×
94
    } = getConfig()
×
95

×
96
    let url = pgQueueConnectionURL || databaseURL
×
97

×
98
    if (isMultitenant && !pgQueueConnectionURL) {
×
99
      if (!multitenantDatabaseUrl && !multitenantDatabasePoolUrl) {
×
100
        throw new Error(
×
101
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
×
102
        )
×
103
      }
×
104
      url = (multitenantDatabasePoolUrl || multitenantDatabaseUrl) as string
×
105
    }
×
106

×
107
    Queue.pgBossDb = new QueueDB({
×
108
      min: 0,
×
109
      max: pgQueueMaxConnections,
×
110
      connectionString: url,
×
111
      statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
×
112
    })
×
113

×
114
    Queue.pgBoss = this.createPgBoss({
×
115
      db: Queue.pgBossDb,
×
116
      enableWorkers: pgQueueEnableWorkers !== false,
×
117
    })
×
118

×
119
    Queue.pgBoss.on('error', (error) => {
×
120
      logSchema.error(logger, '[Queue] error', {
×
121
        type: 'queue',
×
122
        error,
×
123
      })
×
124
    })
×
125

×
126
    await Queue.pgBoss.start()
×
127

×
128
    if (opts.registerWorkers && pgQueueEnableWorkers) {
×
129
      opts.registerWorkers()
×
130
    }
×
131

×
132
    await Queue.callStart()
×
133
    await Queue.startWorkers({
×
134
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
×
135
      onMessage: opts.onMessage,
×
136
      signal: opts.signal,
×
137
    })
×
138

×
139
    if (opts.signal) {
×
140
      opts.signal.addEventListener(
×
141
        'abort',
×
142
        async () => {
×
143
          logSchema.info(logger, '[Queue] Stopping', {
×
144
            type: 'queue',
×
145
          })
×
146
          return Queue.stop()
×
147
            .then(async () => {
×
148
              logSchema.info(logger, '[Queue] Exited', {
×
149
                type: 'queue',
×
150
              })
×
151
            })
×
152
            .catch((e) => {
×
153
              logSchema.error(logger, '[Queue] Error while stopping queue', {
×
154
                error: e,
×
155
                type: 'queue',
×
156
              })
×
157
            })
×
158
            .finally(async () => {
×
159
              await Queue.callClose().catch(() => {
×
160
                // no-op
×
161
              })
×
162
            })
×
163
        },
×
164
        { once: true }
×
165
      )
×
166
    }
×
167

×
168
    return Queue.pgBoss
×
169
  }
×
170

2✔
171
  static getInstance() {
2✔
172
    if (!this.pgBoss) {
×
173
      throw new Error('pg boss not initialised')
×
174
    }
×
175

×
176
    return this.pgBoss
×
177
  }
×
178

2✔
179
  static getDb() {
2✔
180
    if (!this.pgBossDb) {
×
181
      throw new Error('pg boss not initialised')
×
182
    }
×
183

×
184
    return this.pgBossDb
×
185
  }
×
186

2✔
187
  static register<T extends SubclassOfBaseClass>(event: T) {
2✔
188
    Queue.events.push(event)
×
189
  }
×
190

2✔
191
  static async stop() {
2✔
192
    if (!this.pgBoss) {
×
193
      return
×
194
    }
×
195

×
196
    const boss = this.pgBoss
×
197
    const { isProduction } = getConfig()
×
198

×
199
    await boss.stop({
×
200
      timeout: 20 * 1000,
×
201
      graceful: isProduction,
×
202
      wait: true,
×
203
    })
×
204

×
205
    await new Promise((resolve) => {
×
206
      boss.once('stopped', async () => {
×
207
        await this.callClose()
×
208
        resolve(null)
×
209
      })
×
210
    })
×
211

×
212
    Queue.pgBoss = undefined
×
213
  }
×
214

2✔
215
  protected static async startWorkers(opts: {
2✔
216
    maxConcurrentTasks: number
×
217
    signal?: AbortSignal
×
218
    onMessage?: (job: Job) => void
×
219
  }) {
×
220
    for (const event of Queue.events) {
×
221
      await Queue.registerTask(event, opts.maxConcurrentTasks, opts.onMessage, opts.signal)
×
222
    }
×
223
  }
×
224

2✔
225
  protected static callStart() {
2✔
226
    const events = Queue.events.map((event) => {
×
227
      return event.onStart()
×
228
    })
×
229

×
230
    return Promise.all(events)
×
231
  }
×
232

2✔
233
  protected static callClose() {
2✔
234
    const events = Queue.events.map((event) => {
×
235
      return event.onClose()
×
236
    })
×
237

×
238
    return Promise.all(events)
×
239
  }
×
240

2✔
241
  protected static async registerTask(
2✔
242
    event: SubclassOfBaseClass,
×
243
    maxConcurrentTasks: number,
×
244
    onMessage?: (job: Job) => void,
×
245
    signal?: AbortSignal
×
246
  ) {
×
247
    const queueName = event.getQueueName()
×
248
    const deadLetterName = event.deadLetterQueueName()
×
249

×
250
    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks
×
251
    try {
×
252
      // Create dead-letter queue and the normal queue
×
253
      const queueOptions = {
×
254
        policy: 'standard',
×
255
        ...event.getQueueOptions(),
×
256
      } as const
×
257

×
258
      // dead-letter
×
259
      await this.pgBoss?.createQueue(deadLetterName, {
×
260
        ...queueOptions,
×
261
        name: deadLetterName,
×
262
        retentionDays: 30,
×
263
        retryBackoff: true,
×
264
      })
×
265

×
266
      // // normal queue
×
267
      await this.pgBoss?.createQueue(queueName, {
×
268
        ...queueOptions,
×
269
        name: queueName,
×
270
        deadLetter: deadLetterName,
×
271
      })
×
272
    } catch {
×
273
      // no-op
×
274
    }
×
275

×
276
    return this.pollQueue(event, {
×
277
      concurrentTaskCount,
×
278
      onMessage,
×
279
      signal,
×
280
    })
×
281
  }
×
282

2✔
283
  protected static pollQueue(
2✔
284
    event: SubclassOfBaseClass,
×
285
    queueOpts: {
×
286
      concurrentTaskCount: number
×
287
      onMessage?: (job: Job) => void
×
288
      signal?: AbortSignal
×
289
    }
×
290
  ) {
×
291
    const semaphore = new Semaphore(queueOpts.concurrentTaskCount)
×
292
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
×
293
    const batchSize =
×
294
      event.getWorkerOptions().batchSize ||
×
295
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))
×
296

×
297
    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
×
298
      type: 'queue',
×
299
      metadata: JSON.stringify({
×
300
        queueName: event.getQueueName(),
×
301
        batchSize,
×
302
        pollingInterval,
×
303
      }),
×
304
    })
×
305

×
306
    let started = false
×
307
    let jobFetched = 0
×
308

×
309
    const interval = setInterval(async () => {
×
310
      if (started) {
×
311
        return
×
312
      }
×
313

×
314
      try {
×
315
        started = true
×
316
        const defaultFetch = {
×
317
          includeMetadata: true,
×
318
          batchSize,
×
319
        }
×
320

×
321
        const currentBatch = defaultFetch.batchSize - jobFetched
×
322

×
323
        if (currentBatch <= 0) {
×
324
          return
×
325
        }
×
326

×
327
        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
×
328
          ...event.getWorkerOptions(),
×
329
          ...defaultFetch,
×
330
          batchSize: currentBatch,
×
331
        })
×
332

×
333
        jobFetched += jobs?.length || 0
×
334

×
335
        if (jobFetched < defaultFetch.batchSize) {
×
336
          started = false
×
337
        }
×
338

×
339
        if (queueOpts.signal?.aborted) {
×
340
          started = false
×
341
          return
×
342
        }
×
343

×
344
        if (!jobs || (jobs && jobs.length === 0)) {
×
345
          started = false
×
346
          return
×
347
        }
×
348

×
349
        await Promise.allSettled(
×
350
          jobs.map(async (job) => {
×
351
            const lock = await semaphore.acquire()
×
352
            try {
×
353
              queueOpts.onMessage?.(job as Job)
×
354

×
355
              await event.handle(job, { signal: queueOpts.signal })
×
356

×
357
              await this.pgBoss?.complete(event.getQueueName(), job.id)
×
358
              queueJobCompleted.add(1, {
×
359
                name: event.getQueueName(),
×
360
              })
×
361
            } catch (e) {
×
362
              queueJobRetryFailed.add(1, {
×
363
                name: event.getQueueName(),
×
364
              })
×
365

×
366
              await this.pgBoss?.fail(event.getQueueName(), job.id)
×
367

×
368
              try {
×
369
                const dbJob: JobWithMetadata | null =
×
370
                  (job as JobWithMetadata).priority !== undefined
×
371
                    ? (job as JobWithMetadata)
×
372
                    : await Queue.getInstance().getJobById(event.getQueueName(), job.id)
×
373

×
374
                if (!dbJob) {
×
375
                  return
×
376
                }
×
377
                if (dbJob.retryCount >= dbJob.retryLimit) {
×
378
                  queueJobError.add(1, {
×
379
                    name: event.getQueueName(),
×
380
                  })
×
381
                }
×
382
              } catch (e) {
×
383
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
×
384
                  type: 'queue-task',
×
385
                  error: e,
×
386
                  metadata: JSON.stringify(job),
×
387
                })
×
388
              }
×
389

×
390
              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
×
391
                type: 'queue-task',
×
392
                error: e,
×
393
                metadata: JSON.stringify(job),
×
394
              })
×
395

×
396
              throw e
×
397
            } finally {
×
398
              jobFetched = Math.max(0, jobFetched - 1)
×
399
              await lock.release()
×
400
            }
×
401
          })
×
402
        )
×
403
      } catch (e) {
×
404
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
×
405
          type: 'queue',
×
406
          error: e,
×
407
          metadata: JSON.stringify({
×
408
            queueName: event.getQueueName(),
×
409
            batchSize,
×
410
            pollingInterval,
×
411
          }),
×
412
        })
×
413
      } finally {
×
414
        started = false
×
415
      }
×
416
    }, pollingInterval)
×
417

×
418
    queueOpts.signal?.addEventListener('abort', () => {
×
419
      clearInterval(interval)
×
420
    })
×
421
  }
×
422
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc