• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 22587723157

02 Mar 2026 05:29PM UTC coverage: 75.836% (-0.3%) from 76.153%
22587723157

push

github

web-flow
fix: add explicit metric shutdown for clean exit (#870)

There is no clean metric shutdown so jest doesn't
exit cleanly. Add shutdown in jest setup for clean exit.

Jest setup is using .env fallback so explicitly
disable metrics for .env.test sample.

Add missing vector bucket env var into test
config so that readme explanation works directly.

Drop old unused ENABLE_DEFAULT_METRICS env var.

Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>

3860 of 5528 branches covered (69.83%)

Branch coverage included in aggregate %.

22 of 68 new or added lines in 1 file covered. (32.35%)

369 existing lines in 33 files now uncovered.

26391 of 34362 relevant lines covered (76.8%)

189.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.17
/src/internal/queue/queue.ts
1
import { ERRORS } from '@internal/errors'
2✔
2
import { QueueDB } from '@internal/queue/database'
2✔
3
import { Semaphore } from '@shopify/semaphore'
2✔
4
import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss'
2✔
5
import { getConfig } from '../../config'
2✔
6
import { logger, logSchema } from '../monitoring'
2✔
7
import { queueJobCompleted, queueJobError, queueJobRetryFailed } from '../monitoring/metrics'
2✔
8
import { Event } from './event'
2✔
9

2✔
10
// biome-ignore lint/suspicious/noExplicitAny: Event payload types are intentionally generic.
2✔
11
type SubclassOfBaseClass = (new (
2✔
12
  payload: any
2✔
13
) => Event<any>) & {
2✔
14
  [K in keyof typeof Event]: (typeof Event)[K]
2✔
15
}
2✔
16

2✔
17
export abstract class Queue {
2✔
18
  protected static events: SubclassOfBaseClass[] = []
2✔
19
  private static pgBoss?: PgBoss
2✔
20
  private static pgBossDb?: PgBoss.Db
2✔
21

2✔
22
  static createPgBoss(opts: { db: Db; enableWorkers: boolean }) {
2✔
23
    const {
×
24
      isMultitenant,
×
25
      databaseURL,
×
26
      multitenantDatabasePoolUrl,
×
27
      multitenantDatabaseUrl,
×
28
      pgQueueConnectionURL,
×
29
      pgQueueArchiveCompletedAfterSeconds,
×
30
      pgQueueDeleteAfterDays,
×
31
      pgQueueDeleteAfterHours,
×
32
      pgQueueRetentionDays,
×
33
    } = getConfig()
×
34

×
35
    let url = pgQueueConnectionURL ?? databaseURL
×
36
    let migrate = true
×
37

×
38
    if (isMultitenant && !pgQueueConnectionURL) {
×
39
      if (!multitenantDatabaseUrl) {
×
40
        throw new Error(
×
41
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
×
42
        )
×
43
      }
×
44
      url = multitenantDatabasePoolUrl || multitenantDatabaseUrl
×
45

×
46
      if (multitenantDatabasePoolUrl) {
×
47
        migrate = false
×
48
      }
×
49
    }
×
50

×
51
    return new PgBoss({
×
52
      connectionString: url,
×
53
      migrate,
×
54
      db: opts.db,
×
55
      schema: 'pgboss_v10',
×
56
      application_name: 'storage-pgboss',
×
57
      ...(pgQueueDeleteAfterHours
×
58
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
×
59
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
×
60
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
×
61
      retentionDays: pgQueueRetentionDays,
×
62
      retryBackoff: true,
×
63
      retryLimit: 20,
×
64
      expireInHours: 23,
×
65
      maintenanceIntervalSeconds: 60 * 5,
×
66
      schedule: opts.enableWorkers,
×
67
      supervise: opts.enableWorkers,
×
UNCOV
68
    })
×
UNCOV
69
  }
×
70

2✔
71
  static async start(opts: {
2✔
72
    signal?: AbortSignal
×
73
    onMessage?: (job: Job) => void
×
74
    registerWorkers?: () => void
×
75
  }) {
×
76
    if (Queue.pgBoss) {
×
77
      return Queue.pgBoss
×
78
    }
×
79

×
80
    if (opts.signal?.aborted) {
×
81
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
×
82
    }
×
83

×
84
    const {
×
85
      isMultitenant,
×
86
      databaseURL,
×
87
      multitenantDatabaseUrl,
×
88
      multitenantDatabasePoolUrl,
×
89
      pgQueueConnectionURL,
×
90
      pgQueueEnableWorkers,
×
91
      pgQueueReadWriteTimeout,
×
92
      pgQueueConcurrentTasksPerQueue,
×
93
      pgQueueMaxConnections,
×
94
    } = getConfig()
×
95

×
96
    let url = pgQueueConnectionURL || databaseURL
×
97

×
98
    if (isMultitenant && !pgQueueConnectionURL) {
×
99
      if (!multitenantDatabaseUrl && !multitenantDatabasePoolUrl) {
×
100
        throw new Error(
×
101
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
×
102
        )
×
103
      }
×
104
      url = (multitenantDatabasePoolUrl || multitenantDatabaseUrl) as string
×
105
    }
×
106

×
107
    Queue.pgBossDb = new QueueDB({
×
108
      min: 0,
×
109
      max: pgQueueMaxConnections,
×
110
      connectionString: url,
×
111
      statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
×
112
    })
×
113

×
114
    Queue.pgBoss = this.createPgBoss({
×
115
      db: Queue.pgBossDb,
×
116
      enableWorkers: pgQueueEnableWorkers !== false,
×
117
    })
×
118

×
119
    Queue.pgBoss.on('error', (error) => {
×
120
      logSchema.error(logger, '[Queue] error', {
×
121
        type: 'queue',
×
122
        error,
×
123
      })
×
124
    })
×
125

×
126
    await Queue.pgBoss.start()
×
127

×
128
    if (opts.registerWorkers && pgQueueEnableWorkers) {
×
129
      opts.registerWorkers()
×
130
    }
×
131

×
132
    await Queue.callStart()
×
133
    await Queue.startWorkers({
×
134
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
×
135
      onMessage: opts.onMessage,
×
136
      signal: opts.signal,
×
137
    })
×
138

×
139
    if (opts.signal) {
×
140
      opts.signal.addEventListener(
×
141
        'abort',
×
142
        async () => {
×
143
          logSchema.info(logger, '[Queue] Stopping', {
×
144
            type: 'queue',
×
145
          })
×
146
          return Queue.stop()
×
147
            .then(async () => {
×
148
              logSchema.info(logger, '[Queue] Exited', {
×
149
                type: 'queue',
×
150
              })
×
151
            })
×
152
            .catch((e) => {
×
153
              logSchema.error(logger, '[Queue] Error while stopping queue', {
×
154
                error: e,
×
155
                type: 'queue',
×
156
              })
×
157
            })
×
158
            .finally(async () => {
×
159
              await Queue.callClose().catch(() => {
×
160
                // no-op
×
161
              })
×
162
            })
×
163
        },
×
164
        { once: true }
×
165
      )
×
166
    }
×
167

×
UNCOV
168
    return Queue.pgBoss
×
UNCOV
169
  }
×
170

2✔
171
  static getInstance() {
2✔
172
    if (!this.pgBoss) {
×
173
      throw new Error('pg boss not initialised')
×
174
    }
×
175

×
UNCOV
176
    return this.pgBoss
×
UNCOV
177
  }
×
178

2✔
179
  static getDb() {
2✔
180
    if (!this.pgBossDb) {
×
181
      throw new Error('pg boss not initialised')
×
182
    }
×
183

×
UNCOV
184
    return this.pgBossDb
×
UNCOV
185
  }
×
186

2✔
187
  static register<T extends SubclassOfBaseClass>(event: T) {
2✔
UNCOV
188
    Queue.events.push(event)
×
UNCOV
189
  }
×
190

2✔
191
  static async stop() {
2✔
192
    if (!this.pgBoss) {
×
193
      return
×
194
    }
×
195

×
196
    const boss = this.pgBoss
×
197
    const { isProduction } = getConfig()
×
198

×
199
    await boss.stop({
×
200
      timeout: 20 * 1000,
×
201
      graceful: isProduction,
×
202
      wait: true,
×
203
    })
×
204

×
205
    await new Promise((resolve) => {
×
206
      boss.once('stopped', async () => {
×
207
        await this.callClose()
×
208
        resolve(null)
×
209
      })
×
210
    })
×
211

×
UNCOV
212
    Queue.pgBoss = undefined
×
UNCOV
213
  }
×
214

2✔
215
  protected static async startWorkers(opts: {
2✔
216
    maxConcurrentTasks: number
×
217
    signal?: AbortSignal
×
218
    onMessage?: (job: Job) => void
×
219
  }) {
×
220
    for (const event of Queue.events) {
×
221
      await Queue.registerTask(event, opts.maxConcurrentTasks, opts.onMessage, opts.signal)
×
UNCOV
222
    }
×
UNCOV
223
  }
×
224

2✔
225
  protected static callStart() {
2✔
226
    const events = Queue.events.map((event) => {
×
227
      return event.onStart()
×
228
    })
×
229

×
UNCOV
230
    return Promise.all(events)
×
UNCOV
231
  }
×
232

2✔
233
  protected static callClose() {
2✔
234
    const events = Queue.events.map((event) => {
×
235
      return event.onClose()
×
236
    })
×
237

×
UNCOV
238
    return Promise.all(events)
×
UNCOV
239
  }
×
240

2✔
241
  protected static async registerTask(
2✔
242
    event: SubclassOfBaseClass,
×
243
    maxConcurrentTasks: number,
×
244
    onMessage?: (job: Job) => void,
×
245
    signal?: AbortSignal
×
246
  ) {
×
247
    const queueName = event.getQueueName()
×
248
    const deadLetterName = event.deadLetterQueueName()
×
249

×
250
    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks
×
251
    try {
×
252
      // Create dead-letter queue and the normal queue
×
253
      const queueOptions = {
×
254
        policy: 'standard',
×
255
        ...event.getQueueOptions(),
×
256
      } as const
×
257

×
258
      // dead-letter
×
259
      await this.pgBoss?.createQueue(deadLetterName, {
×
260
        ...queueOptions,
×
261
        name: deadLetterName,
×
262
        retentionDays: 30,
×
263
        retryBackoff: true,
×
264
      })
×
265

×
266
      // // normal queue
×
267
      await this.pgBoss?.createQueue(queueName, {
×
268
        ...queueOptions,
×
269
        name: queueName,
×
270
        deadLetter: deadLetterName,
×
271
      })
×
272
    } catch {
×
273
      // no-op
×
274
    }
×
275

×
276
    return this.pollQueue(event, {
×
277
      concurrentTaskCount,
×
278
      onMessage,
×
279
      signal,
×
UNCOV
280
    })
×
UNCOV
281
  }
×
282

2✔
283
  protected static pollQueue(
2✔
284
    event: SubclassOfBaseClass,
×
285
    queueOpts: {
×
286
      concurrentTaskCount: number
×
287
      onMessage?: (job: Job) => void
×
288
      signal?: AbortSignal
×
289
    }
×
290
  ) {
×
291
    const semaphore = new Semaphore(queueOpts.concurrentTaskCount)
×
292
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
×
293
    const batchSize =
×
294
      event.getWorkerOptions().batchSize ||
×
295
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))
×
296

×
297
    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
×
298
      type: 'queue',
×
299
      metadata: JSON.stringify({
×
300
        queueName: event.getQueueName(),
×
301
        batchSize,
×
302
        pollingInterval,
×
303
      }),
×
304
    })
×
305

×
306
    let started = false
×
307
    let jobFetched = 0
×
308

×
309
    const interval = setInterval(async () => {
×
310
      if (started) {
×
311
        return
×
312
      }
×
313

×
314
      try {
×
315
        started = true
×
316
        const defaultFetch = {
×
317
          includeMetadata: true,
×
318
          batchSize,
×
319
        }
×
320

×
321
        const currentBatch = defaultFetch.batchSize - jobFetched
×
322

×
323
        if (currentBatch <= 0) {
×
324
          return
×
325
        }
×
326

×
327
        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
×
328
          ...event.getWorkerOptions(),
×
329
          ...defaultFetch,
×
330
          batchSize: currentBatch,
×
331
        })
×
332

×
333
        jobFetched += jobs?.length || 0
×
334

×
335
        if (jobFetched < defaultFetch.batchSize) {
×
336
          started = false
×
337
        }
×
338

×
339
        if (queueOpts.signal?.aborted) {
×
340
          started = false
×
341
          return
×
342
        }
×
343

×
344
        if (!jobs || (jobs && jobs.length === 0)) {
×
345
          started = false
×
346
          return
×
347
        }
×
348

×
349
        await Promise.allSettled(
×
350
          jobs.map(async (job) => {
×
351
            const lock = await semaphore.acquire()
×
352
            const opts = event.getQueueOptions()
×
353
            try {
×
354
              queueOpts.onMessage?.(job as Job)
×
355

×
356
              await event.handle(job, { signal: queueOpts.signal })
×
357

×
358
              await this.pgBoss?.complete(event.getQueueName(), job.id)
×
359
              queueJobCompleted.add(1, {
×
360
                name: event.getQueueName(),
×
361
              })
×
362
            } catch (e) {
×
363
              queueJobRetryFailed.add(1, {
×
364
                name: event.getQueueName(),
×
365
              })
×
366

×
367
              await this.pgBoss?.fail(event.getQueueName(), job.id)
×
368

×
369
              try {
×
370
                const dbJob: JobWithMetadata | null =
×
371
                  (job as JobWithMetadata).priority !== undefined
×
372
                    ? (job as JobWithMetadata)
×
373
                    : await Queue.getInstance().getJobById(event.getQueueName(), job.id)
×
374

×
375
                if (!dbJob) {
×
376
                  return
×
377
                }
×
378
                if (dbJob.retryCount >= dbJob.retryLimit) {
×
379
                  queueJobError.add(1, {
×
380
                    name: event.getQueueName(),
×
381
                  })
×
382
                }
×
383
              } catch (e) {
×
384
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
×
385
                  type: 'queue-task',
×
386
                  error: e,
×
387
                  metadata: JSON.stringify(job),
×
388
                })
×
389
              }
×
390

×
391
              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
×
392
                type: 'queue-task',
×
393
                error: e,
×
394
                metadata: JSON.stringify(job),
×
395
              })
×
396

×
397
              throw e
×
398
            } finally {
×
399
              jobFetched = Math.max(0, jobFetched - 1)
×
400
              await lock.release()
×
401
            }
×
402
          })
×
403
        )
×
404
      } catch (e) {
×
405
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
×
406
          type: 'queue',
×
407
          error: e,
×
408
          metadata: JSON.stringify({
×
409
            queueName: event.getQueueName(),
×
410
            batchSize,
×
411
            pollingInterval,
×
412
          }),
×
413
        })
×
414
      } finally {
×
415
        started = false
×
416
      }
×
417
    }, pollingInterval)
×
418

×
419
    queueOpts.signal?.addEventListener('abort', () => {
×
420
      clearInterval(interval)
×
UNCOV
421
    })
×
UNCOV
422
  }
×
423
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc