• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 19363040917

14 Nov 2025 11:21AM UTC coverage: 75.913% (-1.8%) from 77.76%
19363040917

Pull #796

github

web-flow
Merge 0bc327f4f into b9acc7ca1
Pull Request #796: feat: add analytics bucket sharding

2074 of 3009 branches covered (68.93%)

Branch coverage included in aggregate %.

945 of 2098 new or added lines in 36 files covered. (45.04%)

50 existing lines in 7 files now uncovered.

25386 of 33164 relevant lines covered (76.55%)

94.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.9
/src/internal/queue/queue.ts
1
import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss'
1✔
2
import { ERRORS } from '@internal/errors'
1✔
3
import { QueueDB } from '@internal/queue/database'
1✔
4
import { getConfig } from '../../config'
1✔
5
import { logger, logSchema } from '../monitoring'
1✔
6
import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics'
1✔
7
import { Event } from './event'
1✔
8
import { Semaphore } from '@shopify/semaphore'
1✔
9

1✔
10
/**
 * Constructor type for concrete event classes: something that can be
 * instantiated with a payload to produce an `Event`, and that also exposes
 * every static member declared on the `Event` base class.
 */
//eslint-disable-next-line @typescript-eslint/no-explicit-any
type SubclassOfBaseClass = (new (payload: any) => Event<any>) &
  Pick<typeof Event, keyof typeof Event>
1✔
14

1✔
15
export abstract class Queue {
1✔
16
  protected static events: SubclassOfBaseClass[] = []
1✔
17
  private static pgBoss?: PgBoss
1✔
18
  private static pgBossDb?: PgBoss.Db
1✔
19

1✔
20
  /**
   * Builds (but does not start) a PgBoss instance configured from `getConfig()`.
   *
   * @param opts.db - database adapter handed to PgBoss as its `db` option
   * @param opts.enableWorkers - forwarded to PgBoss as both `schedule` and `supervise`
   * @returns the newly constructed PgBoss instance
   * @throws Error when running multi-tenant without a dedicated queue URL and
   *   without a multi-tenant database URL
   */
  static createPgBoss(opts: { db: Db; enableWorkers: boolean }) {
    const {
      isMultitenant,
      databaseURL,
      multitenantDatabaseUrl,
      pgQueueConnectionURL,
      pgQueueArchiveCompletedAfterSeconds,
      pgQueueDeleteAfterDays,
      pgQueueDeleteAfterHours,
      pgQueueRetentionDays,
    } = getConfig()

    // `||` rather than `??`: an empty-string queue URL must count as "not set",
    // in agreement with the `!pgQueueConnectionURL` guard below and with `start()`.
    let url = pgQueueConnectionURL || databaseURL

    if (isMultitenant && !pgQueueConnectionURL) {
      if (!multitenantDatabaseUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      url = multitenantDatabaseUrl
    }

    return new PgBoss({
      connectionString: url,
      migrate: true,
      db: opts.db,
      schema: 'pgboss_v10',
      application_name: 'storage-pgboss',
      // An hour-based deletion window, when configured, takes precedence over the day-based one.
      ...(pgQueueDeleteAfterHours
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
      retentionDays: pgQueueRetentionDays,
      retryBackoff: true,
      retryLimit: 20,
      expireInHours: 23,
      maintenanceIntervalSeconds: 60 * 5,
      schedule: opts.enableWorkers,
      supervise: opts.enableWorkers,
    })
  }
×
62

1✔
63
  /**
   * Starts the shared queue, or returns the existing PgBoss instance when one
   * has already been created.
   *
   * Steps: open the queue database handle, create and start PgBoss, optionally
   * let the caller register workers, run every registered event's `onStart`,
   * begin polling, and finally hook the abort signal up to `Queue.stop()`.
   *
   * @param opts.signal - when aborted, the queue is stopped and events are closed
   * @param opts.onMessage - forwarded to the pollers; invoked for every fetched job
   * @param opts.registerWorkers - invoked after PgBoss has started, only when
   *   workers are enabled in config
   * @returns the PgBoss instance held by `Queue`
   * @throws the `ERRORS.Aborted` error when `opts.signal` is already aborted
   * @throws Error when multi-tenant mode has no usable database URL
   */
  static async start(opts: {
    signal?: AbortSignal
    onMessage?: (job: Job) => void
    registerWorkers?: () => void
  }) {
    if (Queue.pgBoss) {
      return Queue.pgBoss
    }

    const abortSignal = opts.signal

    if (abortSignal?.aborted) {
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
    }

    const {
      isMultitenant,
      databaseURL,
      multitenantDatabaseUrl,
      pgQueueConnectionURL,
      pgQueueEnableWorkers,
      pgQueueReadWriteTimeout,
      pgQueueConcurrentTasksPerQueue,
      pgQueueMaxConnections,
    } = getConfig()

    // A dedicated queue URL wins; otherwise fall back to the multi-tenant
    // database (multi-tenant mode) or the regular database URL.
    let connectionString = pgQueueConnectionURL || databaseURL

    if (!pgQueueConnectionURL && isMultitenant) {
      if (!multitenantDatabaseUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      connectionString = multitenantDatabaseUrl
    }

    const statementTimeout = pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined

    Queue.pgBossDb = new QueueDB({
      min: 0,
      max: pgQueueMaxConnections,
      connectionString,
      statement_timeout: statementTimeout,
    })

    const boss = this.createPgBoss({
      db: Queue.pgBossDb,
      enableWorkers: pgQueueEnableWorkers !== false,
    })
    Queue.pgBoss = boss

    boss.on('error', (error) => {
      logSchema.error(logger, '[Queue] error', {
        type: 'queue',
        error,
      })
    })

    await boss.start()

    if (opts.registerWorkers && pgQueueEnableWorkers) {
      opts.registerWorkers()
    }

    await Queue.callStart()
    await Queue.startWorkers({
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
      onMessage: opts.onMessage,
      signal: abortSignal,
    })

    if (abortSignal) {
      // Shut down once on abort; failures are logged, never rethrown.
      const onAbort = async () => {
        logSchema.info(logger, '[Queue] Stopping', {
          type: 'queue',
        })

        try {
          await Queue.stop()
          logSchema.info(logger, '[Queue] Exited', {
            type: 'queue',
          })
        } catch (e) {
          logSchema.error(logger, '[Queue] Error while stopping queue', {
            error: e,
            type: 'queue',
          })
        } finally {
          await Queue.callClose().catch(() => {
            // no-op
          })
        }
      }

      abortSignal.addEventListener('abort', onAbort, { once: true })
    }

    return Queue.pgBoss
  }
×
161

1✔
162
  /**
   * Returns the PgBoss instance currently held by the class.
   *
   * @throws Error when no instance has been created yet
   */
  static getInstance() {
    const instance = this.pgBoss

    if (instance) {
      return instance
    }

    throw new Error('pg boss not initialised')
  }
×
169

1✔
170
  /**
   * Returns the database handle that was created for PgBoss.
   *
   * @throws Error when the handle has not been created yet
   */
  static getDb() {
    const db = this.pgBossDb

    if (db) {
      return db
    }

    throw new Error('pg boss not initialised')
  }
×
177

1✔
178
  /**
   * Adds an event class to the list of events handled by the queue.
   *
   * @param event - the event class to append to `Queue.events`
   */
  static register<T extends SubclassOfBaseClass>(event: T) {
    const { events } = Queue
    events.push(event)
  }
×
181

1✔
182
  /**
   * Stops PgBoss, runs every registered event's `onClose`, and forgets the
   * instance. Does nothing when no instance exists.
   *
   * Shutdown is graceful only when `isProduction` is set in config, and is
   * given a 20 second timeout.
   *
   * @throws whatever `boss.stop()` or an event's `onClose` rejects with
   */
  static async stop() {
    const boss = this.pgBoss

    if (!boss) {
      return
    }

    const { isProduction } = getConfig()

    // Subscribe BEFORE calling stop(): pg-boss may emit `stopped` before the
    // promise returned by `stop({ wait: true })` settles, in which case a
    // listener attached afterwards would never fire and this method would hang.
    const closed = new Promise<void>((resolve, reject) => {
      boss.once('stopped', () => {
        // Surface onClose failures instead of leaving the promise pending.
        this.callClose().then(() => resolve(), reject)
      })
    })

    // Promise.all keeps a rejection of either side handled by the caller.
    await Promise.all([
      boss.stop({
        timeout: 20 * 1000,
        graceful: isProduction,
        wait: true,
      }),
      closed,
    ])

    Queue.pgBoss = undefined
  }
×
205

1✔
206
  /**
   * Registers a poller for each registered event, one after another.
   *
   * @param opts.maxConcurrentTasks - fallback concurrency passed to `registerTask`
   * @param opts.signal - abort signal passed to `registerTask`
   * @param opts.onMessage - per-job callback passed to `registerTask`
   */
  protected static async startWorkers(opts: {
    maxConcurrentTasks: number
    signal?: AbortSignal
    onMessage?: (job: Job) => void
  }) {
    const { maxConcurrentTasks, onMessage, signal } = opts

    // Sequential on purpose: each registration is awaited before the next starts.
    for (const registeredEvent of Queue.events) {
      await Queue.registerTask(registeredEvent, maxConcurrentTasks, onMessage, signal)
    }
  }
×
215

1✔
216
  /**
   * Invokes `onStart` on every registered event.
   *
   * @returns a promise that settles once all `onStart` results have resolved
   */
  protected static callStart() {
    return Promise.all(Queue.events.map((registeredEvent) => registeredEvent.onStart()))
  }
×
223

1✔
224
  /**
   * Invokes `onClose` on every registered event.
   *
   * @returns a promise that settles once all `onClose` results have resolved
   */
  protected static callClose() {
    return Promise.all(Queue.events.map((registeredEvent) => registeredEvent.onClose()))
  }
×
231

1✔
232
  /**
   * Ensures the event's queue and its dead-letter queue exist, then starts
   * polling the queue.
   *
   * Queue creation is best-effort: any error raised while creating either
   * queue is ignored and polling is started regardless.
   *
   * @param event - event class whose queue should be served
   * @param maxConcurrentTasks - concurrency used when the event's worker
   *   options do not provide a (truthy) `concurrentTaskCount`
   * @param onMessage - optional callback invoked for every fetched job
   * @param signal - optional abort signal forwarded to the poller
   * @returns the result of `pollQueue`
   */
  protected static async registerTask(
    event: SubclassOfBaseClass,
    maxConcurrentTasks: number,
    onMessage?: (job: Job) => void,
    signal?: AbortSignal
  ) {
    const queueName = event.getQueueName()
    const deadLetterName = event.deadLetterQueueName()

    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks

    try {
      // Options shared by both queues; event-specific options override the default policy.
      const sharedOptions = {
        policy: 'standard',
        ...event.getQueueOptions(),
      } as const

      // The dead-letter queue is created first because the main queue refers to it by name.
      await this.pgBoss?.createQueue(deadLetterName, {
        ...sharedOptions,
        name: deadLetterName,
        retentionDays: 30,
        retryBackoff: true,
      })

      // Main queue, pointing at the dead-letter queue.
      await this.pgBoss?.createQueue(queueName, {
        ...sharedOptions,
        name: queueName,
        deadLetter: deadLetterName,
      })
    } catch {
      // no-op
    }

    const pollerOptions = { concurrentTaskCount, onMessage, signal }

    return this.pollQueue(event, pollerOptions)
  }
×
273

1✔
274
  /**
   * Starts a timer that repeatedly fetches jobs from the event's queue and
   * runs them through `event.handle`.
   *
   * - Polling interval: the event's `pollingIntervalSeconds` worker option, or 5 seconds.
   * - Batch size: the event's `batchSize` worker option, or
   *   `concurrentTaskCount + max(1, floor(concurrentTaskCount * 1.2))`.
   * - Handlers are gated by a semaphore sized to `concurrentTaskCount`.
   * - A job is completed on success and failed on error; metrics are updated
   *   accordingly and the error is logged.
   * - The timer is cleared when `queueOpts.signal` aborts.
   *
   * @param event - event class whose queue is polled
   * @param queueOpts.concurrentTaskCount - maximum number of handlers running at once
   * @param queueOpts.onMessage - optional callback invoked with each job before it is handled
   * @param queueOpts.signal - optional abort signal; also passed to `event.handle`
   */
  protected static pollQueue(
    event: SubclassOfBaseClass,
    queueOpts: {
      concurrentTaskCount: number
      onMessage?: (job: Job) => void
      signal?: AbortSignal
    }
  ) {
    const semaphore = new Semaphore(queueOpts.concurrentTaskCount)
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
    const batchSize =
      event.getWorkerOptions().batchSize ||
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))

    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
      type: 'queue',
      metadata: JSON.stringify({
        queueName: event.getQueueName(),
        batchSize: batchSize,
        pollingInterval: pollingInterval,
      }),
    })

    // `started` blocks overlapping ticks; `jobFetched` counts jobs that were
    // fetched but whose handler has not finished yet.
    let started = false
    let jobFetched = 0

    const interval = setInterval(async () => {
      if (started) {
        return
      }

      try {
        started = true
        const defaultFetch = {
          includeMetadata: true,
          batchSize,
        }

        // Only ask for as many jobs as there is room for in the current batch.
        const currentBatch = defaultFetch.batchSize - jobFetched

        if (currentBatch <= 0) {
          return
        }

        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
          ...event.getWorkerOptions(),
          ...defaultFetch,
          batchSize: currentBatch,
        })

        jobFetched += jobs?.length || 0

        // Batch not full yet: let later ticks fetch more while these jobs run.
        if (jobFetched < defaultFetch.batchSize) {
          started = false
        }

        if (queueOpts.signal?.aborted) {
          started = false
          return
        }

        if (!jobs || (jobs && jobs.length === 0)) {
          started = false
          return
        }

        await Promise.allSettled(
          jobs.map(async (job) => {
            const lock = await semaphore.acquire()
            try {
              queueOpts.onMessage?.(job as Job)

              await event.handle(job, { signal: queueOpts.signal })

              await this.pgBoss?.complete(event.getQueueName(), job.id)
              QueueJobCompleted.inc({
                name: event.getQueueName(),
              })
            } catch (e) {
              QueueJobRetryFailed.inc({
                name: event.getQueueName(),
              })

              await this.pgBoss?.fail(event.getQueueName(), job.id)

              try {
                // Reuse the fetched job when it already carries metadata;
                // otherwise look it up to read its retry counters.
                const dbJob: JobWithMetadata | null =
                  (job as JobWithMetadata).priority !== undefined
                    ? (job as JobWithMetadata)
                    : await Queue.getInstance().getJobById(event.getQueueName(), job.id)

                // A missing job only skips the metric; the processing error
                // below is still logged and rethrown.
                if (dbJob && dbJob.retryCount >= dbJob.retryLimit) {
                  QueueJobError.inc({
                    name: event.getQueueName(),
                  })
                }
              } catch (lookupError) {
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
                  type: 'queue-task',
                  error: lookupError,
                  metadata: JSON.stringify(job),
                })
              }

              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
                type: 'queue-task',
                error: e,
                metadata: JSON.stringify(job),
              })

              throw e
            } finally {
              jobFetched = Math.max(0, jobFetched - 1)
              await lock.release()
            }
          })
        )
      } catch (e) {
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
          type: 'queue',
          error: e,
          metadata: JSON.stringify({
            queueName: event.getQueueName(),
            batchSize,
            pollingInterval,
          }),
        })
      } finally {
        started = false
      }
    }, pollingInterval)

    // Abort fires at most once, so the listener can be dropped after it runs.
    queueOpts.signal?.addEventListener(
      'abort',
      () => {
        clearInterval(interval)
      },
      { once: true }
    )
  }
×
414
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc