• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 21516573288

30 Jan 2026 12:56PM UTC coverage: 75.548% (-0.4%) from 75.944%
21516573288

push

github

web-flow
feat: otel metrics (#819)

2112 of 3066 branches covered (68.88%)

Branch coverage included in aggregate %.

673 of 1285 new or added lines in 33 files covered. (52.37%)

2 existing lines in 2 files now uncovered.

25902 of 34015 relevant lines covered (76.15%)

94.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.34
/src/internal/queue/queue.ts
1
import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss'
1✔
2
import { ERRORS } from '@internal/errors'
1✔
3
import { QueueDB } from '@internal/queue/database'
1✔
4
import { getConfig } from '../../config'
1✔
5
import { logger, logSchema } from '../monitoring'
1✔
6
import { queueJobRetryFailed, queueJobCompleted, queueJobError } from '../monitoring/metrics'
1✔
7
import { Event } from './event'
1✔
8
import { Semaphore } from '@shopify/semaphore'
1✔
9

1✔
10
const { region } = getConfig()
1✔
11

1✔
12
/**
 * Static side of the `Event` base class: each static key reachable through
 * `keyof typeof Event`, typed exactly as it is declared on `Event`.
 */
type EventStatics = {
  [StaticKey in keyof typeof Event]: (typeof Event)[StaticKey]
}

/**
 * A constructor that builds an `Event` from a payload and also carries the
 * static members of `Event`. `Queue` only interacts with events through this
 * static surface plus `new`.
 */
//eslint-disable-next-line @typescript-eslint/no-explicit-any
type SubclassOfBaseClass = (new (payload: any) => Event<any>) & EventStatics
1✔
16

1✔
17
/**
 * Static facade around a single pg-boss instance.
 *
 * Event classes are added with {@link Queue.register}; {@link Queue.start}
 * then boots pg-boss, creates one queue (plus a dead-letter queue) per
 * registered event and starts an interval-based poller for each of them.
 */
export abstract class Queue {
  /** Event classes whose queues are created and polled on start. */
  protected static events: SubclassOfBaseClass[] = []
  /** The running pg-boss instance; `undefined` until `start()` and after `stop()`. */
  private static pgBoss?: PgBoss
  /** Database adapter handed to pg-boss; set by `start()`. */
  private static pgBossDb?: PgBoss.Db

  /**
   * Picks the connection string used by the queue.
   *
   * An explicit `pgQueueConnectionURL` always wins. Without it, multi-tenant
   * deployments use `multitenantDatabaseUrl` and everything else uses
   * `databaseURL`. The override is tested for truthiness everywhere, so an
   * empty string counts as "not set".
   *
   * @throws Error when running multi-tenant with neither an override nor a
   *   multi-tenant database URL configured.
   */
  private static resolveConnectionUrl() {
    const { isMultitenant, databaseURL, multitenantDatabaseUrl, pgQueueConnectionURL } =
      getConfig()

    let url = pgQueueConnectionURL || databaseURL

    if (isMultitenant && !pgQueueConnectionURL) {
      if (!multitenantDatabaseUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      url = multitenantDatabaseUrl
    }

    return url
  }

  /**
   * Builds (but does not start) a pg-boss instance from the current config.
   *
   * @param opts.db - Database adapter pg-boss should run its queries through.
   * @param opts.enableWorkers - Forwarded to pg-boss as both `schedule` and `supervise`.
   * @returns A configured, not yet started, `PgBoss` instance.
   * @throws Error when the connection URL cannot be resolved (see `resolveConnectionUrl`).
   */
  static createPgBoss(opts: { db: Db; enableWorkers: boolean }) {
    const {
      pgQueueArchiveCompletedAfterSeconds,
      pgQueueDeleteAfterDays,
      pgQueueDeleteAfterHours,
      pgQueueRetentionDays,
    } = getConfig()

    const url = Queue.resolveConnectionUrl()

    return new PgBoss({
      connectionString: url,
      migrate: true,
      db: opts.db,
      schema: 'pgboss_v10',
      application_name: 'storage-pgboss',
      // An hours-based deletion window, when configured, takes precedence over the days-based one.
      ...(pgQueueDeleteAfterHours
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
      retentionDays: pgQueueRetentionDays,
      retryBackoff: true,
      retryLimit: 20,
      expireInHours: 23,
      maintenanceIntervalSeconds: 60 * 5,
      schedule: opts.enableWorkers,
      supervise: opts.enableWorkers,
    })
  }

  /**
   * Starts the queue: creates the database adapter and pg-boss instance,
   * starts pg-boss, runs every registered event's `onStart`, then creates the
   * queues and starts polling them.
   *
   * Calling it again while an instance exists returns that instance unchanged.
   *
   * @param opts.signal - When aborted, the queue is stopped and events are closed.
   * @param opts.onMessage - Invoked with each fetched job before it is handled.
   * @param opts.registerWorkers - Hook invoked after pg-boss has started, only
   *   when `pgQueueEnableWorkers` is truthy.
   * @returns The started `PgBoss` instance.
   * @throws `ERRORS.Aborted` when `opts.signal` is already aborted.
   * @throws Error when the connection URL cannot be resolved.
   */
  static async start(opts: {
    signal?: AbortSignal
    onMessage?: (job: Job) => void
    registerWorkers?: () => void
  }) {
    if (Queue.pgBoss) {
      return Queue.pgBoss
    }

    if (opts.signal?.aborted) {
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
    }

    const {
      pgQueueEnableWorkers,
      pgQueueReadWriteTimeout,
      pgQueueConcurrentTasksPerQueue,
      pgQueueMaxConnections,
    } = getConfig()

    const url = Queue.resolveConnectionUrl()

    Queue.pgBossDb = new QueueDB({
      min: 0,
      max: pgQueueMaxConnections,
      connectionString: url,
      // A non-positive timeout means "no statement timeout".
      statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
    })

    // NOTE(review): workers are enabled here unless the flag is exactly `false`,
    // while `registerWorkers` below requires a truthy flag — confirm the
    // difference for an undefined flag is intended.
    Queue.pgBoss = this.createPgBoss({
      db: Queue.pgBossDb,
      enableWorkers: pgQueueEnableWorkers !== false,
    })

    Queue.pgBoss.on('error', (error) => {
      logSchema.error(logger, '[Queue] error', {
        type: 'queue',
        error,
      })
    })

    await Queue.pgBoss.start()

    if (opts.registerWorkers && pgQueueEnableWorkers) {
      opts.registerWorkers()
    }

    await Queue.callStart()
    await Queue.startWorkers({
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
      onMessage: opts.onMessage,
      signal: opts.signal,
    })

    if (opts.signal) {
      opts.signal.addEventListener(
        'abort',
        async () => {
          logSchema.info(logger, '[Queue] Stopping', {
            type: 'queue',
          })
          return Queue.stop()
            .then(async () => {
              logSchema.info(logger, '[Queue] Exited', {
                type: 'queue',
              })
            })
            .catch((e) => {
              logSchema.error(logger, '[Queue] Error while stopping queue', {
                error: e,
                type: 'queue',
              })
            })
            .finally(async () => {
              // Best-effort close of every event even when stop() failed.
              await Queue.callClose().catch(() => {
                // no-op
              })
            })
        },
        { once: true }
      )
    }

    return Queue.pgBoss
  }

  /**
   * Returns the running pg-boss instance.
   *
   * @throws Error when the queue has not been started.
   */
  static getInstance() {
    if (!this.pgBoss) {
      throw new Error('pg boss not initialised')
    }

    return this.pgBoss
  }

  /**
   * Returns the database adapter used by pg-boss.
   *
   * @throws Error when the queue has not been started.
   */
  static getDb() {
    if (!this.pgBossDb) {
      throw new Error('pg boss not initialised')
    }

    return this.pgBossDb
  }

  /**
   * Adds an event class to the list whose queues are created and polled by `start()`.
   *
   * @param event - The event class (not an instance) to register.
   */
  static register<T extends SubclassOfBaseClass>(event: T) {
    Queue.events.push(event)
  }

  /**
   * Stops pg-boss, waits for its `'stopped'` event, closes every registered
   * event and clears the cached instance. Does nothing when the queue was
   * never started.
   *
   * Shutdown is graceful only when `isProduction` is set.
   */
  static async stop() {
    const boss = this.pgBoss

    if (!boss) {
      return
    }

    const { isProduction } = getConfig()

    // Subscribe BEFORE calling stop(): with `wait: true` pg-boss may emit
    // 'stopped' before the stop() promise settles, in which case a listener
    // attached afterwards would never fire and this method would hang.
    const stopped = new Promise<void>((resolve) => {
      boss.once('stopped', () => resolve())
    })

    await boss.stop({
      timeout: 20 * 1000,
      graceful: isProduction,
      wait: true,
    })

    await stopped

    // Awaited here (not inside the event handler) so a failing onClose
    // rejects stop() instead of surfacing as an unhandled rejection.
    await this.callClose()

    Queue.pgBoss = undefined
  }

  /**
   * Registers every known event's queue and starts polling it, one event at a time.
   *
   * @param opts.maxConcurrentTasks - Concurrency used for events that do not
   *   declare their own `concurrentTaskCount`.
   * @param opts.signal - Abort signal forwarded to each poller.
   * @param opts.onMessage - Per-job callback forwarded to each poller.
   */
  protected static async startWorkers(opts: {
    maxConcurrentTasks: number
    signal?: AbortSignal
    onMessage?: (job: Job) => void
  }) {
    for (const event of Queue.events) {
      await Queue.registerTask(event, opts.maxConcurrentTasks, opts.onMessage, opts.signal)
    }
  }

  /** Runs `onStart` on every registered event; the calls are awaited together. */
  protected static callStart() {
    const events = Queue.events.map((event) => {
      return event.onStart()
    })

    return Promise.all(events)
  }

  /** Runs `onClose` on every registered event; the calls are awaited together. */
  protected static callClose() {
    const events = Queue.events.map((event) => {
      return event.onClose()
    })

    return Promise.all(events)
  }

  /**
   * Creates the dead-letter queue and the main queue for an event, then starts polling.
   *
   * @param event - Event class providing queue names and options.
   * @param maxConcurrentTasks - Fallback concurrency when the event's worker
   *   options do not set a (truthy) `concurrentTaskCount`.
   * @param onMessage - Per-job callback forwarded to the poller.
   * @param signal - Abort signal forwarded to the poller.
   */
  protected static async registerTask(
    event: SubclassOfBaseClass,
    maxConcurrentTasks: number,
    onMessage?: (job: Job) => void,
    signal?: AbortSignal
  ) {
    const queueName = event.getQueueName()
    const deadLetterName = event.deadLetterQueueName()

    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks
    try {
      // Create dead-letter queue and the normal queue
      const queueOptions = {
        policy: 'standard',
        ...event.getQueueOptions(),
      } as const

      // dead-letter
      await this.pgBoss?.createQueue(deadLetterName, {
        ...queueOptions,
        name: deadLetterName,
        retentionDays: 30,
        retryBackoff: true,
      })

      // normal queue
      await this.pgBoss?.createQueue(queueName, {
        ...queueOptions,
        name: queueName,
        deadLetter: deadLetterName,
      })
    } catch {
      // Queue creation failures are deliberately ignored and polling starts anyway.
      // NOTE(review): presumably this tolerates already-existing queues — confirm.
    }

    return this.pollQueue(event, {
      concurrentTaskCount,
      onMessage,
      signal,
    })
  }

  /**
   * Starts an interval that fetches jobs for one event's queue and runs them
   * through `event.handle`, completing or failing each job in pg-boss.
   *
   * At most `batchSize` jobs are held at once; each tick only asks for the
   * remaining capacity. Handler concurrency is capped by a semaphore sized to
   * `concurrentTaskCount`. The interval is cleared when `signal` aborts, and
   * no interval is created at all when the signal is already aborted.
   *
   * @param event - Event class providing the queue name, worker options and handler.
   * @param queueOpts.concurrentTaskCount - Maximum number of handlers running at once.
   * @param queueOpts.onMessage - Invoked with each job right before it is handled.
   * @param queueOpts.signal - Stops polling when aborted; also passed to the handler.
   */
  protected static pollQueue(
    event: SubclassOfBaseClass,
    queueOpts: {
      concurrentTaskCount: number
      onMessage?: (job: Job) => void
      signal?: AbortSignal
    }
  ) {
    // An already-aborted signal never dispatches 'abort' again, so an interval
    // created now could never be cleared by the listener registered below.
    if (queueOpts.signal?.aborted) {
      return
    }

    const semaphore = new Semaphore(queueOpts.concurrentTaskCount)
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
    const batchSize =
      event.getWorkerOptions().batchSize ||
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))

    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
      type: 'queue',
      metadata: JSON.stringify({
        queueName: event.getQueueName(),
        batchSize: batchSize,
        pollingInterval: pollingInterval,
      }),
    })

    // `started` guards against overlapping ticks; `jobFetched` counts jobs
    // fetched but not yet finished.
    let started = false
    let jobFetched = 0

    const interval = setInterval(async () => {
      if (started) {
        return
      }

      try {
        started = true
        const defaultFetch = {
          includeMetadata: true,
          batchSize,
        }

        // Only request as many jobs as there is capacity left.
        const currentBatch = defaultFetch.batchSize - jobFetched

        if (currentBatch <= 0) {
          return
        }

        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
          ...event.getWorkerOptions(),
          ...defaultFetch,
          batchSize: currentBatch,
        })

        jobFetched += jobs?.length || 0

        // Capacity left: let later ticks fetch more while these jobs are still running.
        if (jobFetched < defaultFetch.batchSize) {
          started = false
        }

        if (queueOpts.signal?.aborted) {
          started = false
          return
        }

        if (!jobs || jobs.length === 0) {
          started = false
          return
        }

        await Promise.allSettled(
          jobs.map(async (job) => {
            const lock = await semaphore.acquire()
            const opts = event.getQueueOptions()
            try {
              queueOpts.onMessage?.(job as Job)

              await event.handle(job, { signal: queueOpts.signal })

              await this.pgBoss?.complete(event.getQueueName(), job.id)
              queueJobCompleted.add(1, {
                name: event.getQueueName(),
              })
            } catch (e) {
              queueJobRetryFailed.add(1, {
                name: event.getQueueName(),
              })

              await this.pgBoss?.fail(event.getQueueName(), job.id)

              try {
                // Reuse the fetched job when it already carries metadata;
                // otherwise load it to read the retry counters.
                const dbJob: JobWithMetadata | null =
                  (job as JobWithMetadata).priority !== undefined
                    ? (job as JobWithMetadata)
                    : await Queue.getInstance().getJobById(event.getQueueName(), job.id)

                if (!dbJob) {
                  return
                }
                // Retries exhausted: count the job as a terminal error.
                if (dbJob.retryCount >= dbJob.retryLimit) {
                  queueJobError.add(1, {
                    name: event.getQueueName(),
                  })
                }
              } catch (e) {
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
                  type: 'queue-task',
                  error: e,
                  metadata: JSON.stringify(job),
                })
              }

              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
                type: 'queue-task',
                error: e,
                metadata: JSON.stringify(job),
              })

              throw e
            } finally {
              // Free one slot of batch capacity and one semaphore permit.
              jobFetched = Math.max(0, jobFetched - 1)
              await lock.release()
            }
          })
        )
      } catch (e) {
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
          type: 'queue',
          error: e,
          metadata: JSON.stringify({
            queueName: event.getQueueName(),
            batchSize,
            pollingInterval,
          }),
        })
      } finally {
        started = false
      }
    }, pollingInterval)

    queueOpts.signal?.addEventListener(
      'abort',
      () => {
        clearInterval(interval)
      },
      { once: true }
    )
  }
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc