• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 15461122357

05 Jun 2025 07:29AM UTC coverage: 77.79% (-0.4%) from 78.159%
15461122357

push

github

web-flow
feat: pgboss v10 (#696)

1542 of 2144 branches covered (71.92%)

Branch coverage included in aggregate %.

172 of 475 new or added lines in 28 files covered. (36.21%)

5 existing lines in 3 files now uncovered.

17389 of 22192 relevant lines covered (78.36%)

110.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.43
/src/internal/queue/queue.ts
1
import PgBoss, { Job, JobWithMetadata } from 'pg-boss'
1✔
2
import { ERRORS } from '@internal/errors'
1✔
3
import { QueueDB } from '@internal/queue/database'
1✔
4
import { getConfig } from '../../config'
1✔
5
import { logger, logSchema } from '../monitoring'
1✔
6
import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics'
1✔
7
import { Event } from './event'
1✔
8
import { Semaphore } from '@shopify/semaphore'
1✔
9

1✔
10
/**
 * Constructor type of a concrete `Event` subclass.
 *
 * It is the intersection of:
 *  - a constructor that takes a payload and produces an `Event`, and
 *  - every static member declared on the `Event` class itself, so static
 *    members of `Event` can be accessed through a value of this type.
 */
//eslint-disable-next-line @typescript-eslint/no-explicit-any
type SubclassOfBaseClass = (new (payload: any) => Event<any>) &
  Pick<typeof Event, keyof typeof Event>
1✔
14

1✔
15
export abstract class Queue {
1✔
16
  protected static events: SubclassOfBaseClass[] = []
1✔
17
  private static pgBoss?: PgBoss
1✔
18

1✔
19
  /**
   * Creates, starts and caches the shared pg-boss instance, then starts the
   * registered event workers.
   *
   * Calling it again while an instance is cached returns that instance.
   *
   * @param opts.signal          When aborted, the queue is stopped and every
   *                             registered event's `onClose` is invoked.
   * @param opts.onMessage       Forwarded to the workers; called for each
   *                             fetched job before it is handled.
   * @param opts.registerWorkers Invoked after pg-boss has started, only when
   *                             `pgQueueEnableWorkers` is truthy.
   * @returns The started pg-boss instance.
   * @throws `ERRORS.Aborted` when `opts.signal` is already aborted on entry.
   * @throws `Error` in multi-tenant mode when neither a queue connection URL
   *         nor the multi-tenant database URL is configured.
   */
  static async start(opts: {
    signal?: AbortSignal
    onMessage?: (job: Job) => void
    registerWorkers?: () => void
  }) {
    if (Queue.pgBoss) {
      return Queue.pgBoss
    }

    if (opts.signal?.aborted) {
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
    }

    const {
      isMultitenant,
      databaseURL,
      multitenantDatabaseUrl,
      pgQueueConnectionURL,
      pgQueueArchiveCompletedAfterSeconds,
      pgQueueDeleteAfterDays,
      pgQueueDeleteAfterHours,
      pgQueueRetentionDays,
      pgQueueEnableWorkers,
      pgQueueReadWriteTimeout,
      pgQueueConcurrentTasksPerQueue,
      pgQueueMaxConnections,
    } = getConfig()

    // Connection precedence: dedicated queue URL, then (multi-tenant only) the
    // multi-tenant database URL, otherwise the regular database URL.
    let url = pgQueueConnectionURL ?? databaseURL

    if (isMultitenant && !pgQueueConnectionURL) {
      if (!multitenantDatabaseUrl) {
        throw new Error(
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
        )
      }
      url = multitenantDatabaseUrl
    }

    const boss = new PgBoss({
      connectionString: url,

      db: new QueueDB({
        min: 0,
        max: pgQueueMaxConnections,
        connectionString: url,
        // A non-positive timeout means "no statement timeout".
        statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
      }),
      schema: 'pgboss_v10',
      application_name: 'storage-pgboss',
      // The hour-based retention setting wins over the day-based one when set.
      ...(pgQueueDeleteAfterHours
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
      retentionDays: pgQueueRetentionDays,
      retryBackoff: true,
      retryLimit: 20,
      expireInHours: 23,
      maintenanceIntervalSeconds: 60 * 5,
      // Scheduling and supervision stay on unless workers are explicitly disabled.
      schedule: pgQueueEnableWorkers !== false,
      supervise: pgQueueEnableWorkers !== false,
    })
    Queue.pgBoss = boss

    boss.on('error', (error) => {
      logSchema.error(logger, '[Queue] error', {
        type: 'queue',
        error,
      })
    })

    try {
      await boss.start()
    } catch (e) {
      // Do not keep a never-started instance cached, otherwise every later
      // start() call would return it instead of retrying.
      if (Queue.pgBoss === boss) {
        Queue.pgBoss = undefined
      }
      throw e
    }

    if (opts.registerWorkers && pgQueueEnableWorkers) {
      opts.registerWorkers()
    }

    await Queue.callStart()
    await Queue.startWorkers({
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
      onMessage: opts.onMessage,
      signal: opts.signal,
    })

    const { signal } = opts

    if (signal) {
      // Stops the queue and closes the events; never rejects.
      const shutdown = () => {
        logSchema.info(logger, '[Queue] Stopping', {
          type: 'queue',
        })
        return Queue.stop()
          .then(() => {
            logSchema.info(logger, '[Queue] Exited', {
              type: 'queue',
            })
          })
          .catch((e) => {
            logSchema.error(logger, '[Queue] Error while stopping queue', {
              error: e,
              type: 'queue',
            })
          })
          .finally(async () => {
            // Runs whether or not stop() succeeded; close errors are ignored.
            await Queue.callClose().catch(() => {
              // no-op
            })
          })
      }

      if (signal.aborted) {
        // The signal was aborted while the queue was starting. The 'abort'
        // event has already been dispatched and will not fire again, so shut
        // down right away instead of registering a listener that never runs.
        void shutdown()
      } else {
        signal.addEventListener('abort', shutdown, { once: true })
      }
    }

    return boss
  }
×
133

1✔
134
  /**
   * Returns the cached pg-boss instance.
   *
   * @throws Error when no instance is currently cached.
   */
  static getInstance() {
    const boss = this.pgBoss

    if (boss) {
      return boss
    }

    throw new Error('pg boss not initialised')
  }
×
141

1✔
142
  /**
   * Adds an event class to the list of events managed by the queue.
   *
   * Registration is idempotent: an event class that is already registered is
   * ignored, so it is not started, polled or closed more than once.
   *
   * @param event The event class to register.
   */
  static register<T extends SubclassOfBaseClass>(event: T) {
    if (Queue.events.includes(event)) {
      return
    }

    Queue.events.push(event)
  }
×
145

1✔
146
  /**
   * Stops the cached pg-boss instance, closes every registered event and
   * clears the cached instance. Does nothing when no instance is cached.
   *
   * The shutdown is graceful only when `isProduction` is set and is bounded by
   * a 20 second timeout.
   */
  static async stop() {
    const boss = this.pgBoss

    if (!boss) {
      return
    }

    const { isProduction } = getConfig()

    // Per the pg-boss v10 documentation, `wait: true` makes stop() resolve
    // only once shutdown has finished, i.e. after 'stopped' was emitted.
    // Subscribing to 'stopped' after this await would therefore wait for an
    // event that has already fired and never resolve.
    await boss.stop({
      timeout: 20 * 1000,
      graceful: isProduction,
      wait: true,
    })

    await this.callClose()

    Queue.pgBoss = undefined
  }
×
169

1✔
170
  /**
   * Registers a worker task for every registered event.
   *
   * @param opts.maxConcurrentTasks Concurrency passed to each task registration.
   * @param opts.signal             Abort signal passed to each task registration.
   * @param opts.onMessage          Job callback passed to each task registration.
   * @returns A promise that settles once every task registration has finished.
   */
  protected static startWorkers(opts: {
    maxConcurrentTasks: number
    signal?: AbortSignal
    onMessage?: (job: Job) => void
  }) {
    const { maxConcurrentTasks, onMessage, signal } = opts

    const workers: Promise<any>[] = Queue.events.map((event) =>
      Queue.registerTask(event, maxConcurrentTasks, onMessage, signal)
    )

    return Promise.all(workers)
  }
×
183

1✔
184
  /**
   * Invokes `onStart` on every registered event.
   *
   * @returns A promise combining the results of all `onStart` calls.
   */
  protected static callStart() {
    return Promise.all(Queue.events.map((event) => event.onStart()))
  }
×
191

1✔
192
  /**
   * Invokes `onClose` on every registered event.
   *
   * @returns A promise combining the results of all `onClose` calls.
   */
  protected static callClose() {
    return Promise.all(Queue.events.map((event) => event.onClose()))
  }
×
199

1✔
200
  /**
   * Creates the queue and its dead-letter queue for an event, then starts
   * polling the queue.
   *
   * Queue creation is best-effort: a failure is logged and polling is started
   * anyway.
   *
   * @param event              The event class whose queue is being set up.
   * @param maxConcurrentTasks Concurrency used when the event's worker options
   *                           do not provide a (non-zero) `concurrentTaskCount`.
   * @param onMessage          Called for each fetched job before it is handled.
   * @param signal             Abort signal forwarded to the poller.
   */
  protected static async registerTask(
    event: SubclassOfBaseClass,
    maxConcurrentTasks: number,
    onMessage?: (job: Job) => void,
    signal?: AbortSignal
  ) {
    const queueName = event.getQueueName()
    const deadLetterName = event.deadLetterQueueName()

    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks
    try {
      // Create dead-letter queue and the normal queue
      const queueOptions = {
        policy: 'standard',
        ...event.getQueueOptions(),
      } as const

      // dead-letter: created first because the normal queue refers to it by name
      await this.pgBoss?.createQueue(deadLetterName, {
        ...queueOptions,
        name: deadLetterName,
        retentionDays: 30,
        retryBackoff: true,
      })

      // normal queue
      await this.pgBoss?.createQueue(queueName, {
        name: queueName,
        ...queueOptions,
        deadLetter: deadLetterName,
      })
    } catch (e) {
      // Keep going (the queues may already be usable), but leave a trace so a
      // genuine set-up failure is not invisible.
      logSchema.error(logger, `[Queue] Error while creating queue ${queueName}`, {
        type: 'queue',
        error: e,
      })
    }

    return this.pollQueue(event, {
      concurrentTaskCount,
      onMessage,
      signal,
    })
  }
×
241

1✔
242
  /**
   * Polls an event's queue on a fixed interval and processes the fetched jobs.
   *
   * Each tick fetches up to `batchSize` jobs and handles them concurrently,
   * limited by a semaphore of `concurrentTaskCount` permits. A tick is skipped
   * while the previous batch is still being processed. Handled jobs are marked
   * completed; jobs whose handling throws are marked failed, counted and logged.
   *
   * Defaults: the polling interval is 5 seconds unless the event's worker
   * options set `pollingIntervalSeconds`; the batch size is
   * `concurrentTaskCount + max(1, floor(concurrentTaskCount * 1.2))` unless the
   * worker options set `batchSize`.
   *
   * Polling stops when `queueOpts.signal` is aborted and is never started when
   * the signal is already aborted.
   *
   * @param event     The event class whose queue is polled.
   * @param queueOpts Concurrency, optional per-job callback and abort signal.
   */
  protected static pollQueue(
    event: SubclassOfBaseClass,
    queueOpts: {
      concurrentTaskCount: number
      onMessage?: (job: Job) => void
      signal?: AbortSignal
    }
  ) {
    // An already-aborted signal never dispatches 'abort' again, so the interval
    // below could never be cleared and every tick would fetch jobs only to
    // abandon them.
    if (queueOpts.signal?.aborted) {
      return
    }

    const semaphore = new Semaphore(queueOpts.concurrentTaskCount)
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
    const batchSize =
      event.getWorkerOptions().batchSize ||
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))

    // True while a tick is fetching or processing; prevents overlapping ticks.
    let polling = false
    const interval = setInterval(async () => {
      if (polling) {
        return
      }

      polling = true
      try {
        const defaultFetch = {
          includeMetadata: true,
          batchSize,
        }
        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
          ...event.getWorkerOptions(),
          ...defaultFetch,
        })

        if (queueOpts.signal?.aborted) {
          return
        }

        if (!jobs || jobs.length === 0) {
          return
        }

        await Promise.allSettled(
          jobs.map(async (job) => {
            const lock = await semaphore.acquire()
            try {
              queueOpts.onMessage?.(job as Job)

              await event.handle(job, { signal: queueOpts.signal })

              await this.pgBoss?.complete(event.getQueueName(), job.id)
              QueueJobCompleted.inc({
                name: event.getQueueName(),
              })
            } catch (e) {
              // Marking the job as failed is itself a database call; if it
              // throws, still record the metrics and log the original error.
              try {
                await this.pgBoss?.fail(event.getQueueName(), job.id)
              } catch (failError) {
                logSchema.error(
                  logger,
                  `[Queue Handler] Error while marking job as failed ${event.name}`,
                  {
                    type: 'queue-task',
                    error: failError,
                    metadata: JSON.stringify(job),
                  }
                )
              }

              QueueJobRetryFailed.inc({
                name: event.getQueueName(),
              })

              try {
                // Jobs fetched with metadata already carry retry information;
                // otherwise load it from the database.
                const dbJob: JobWithMetadata | null =
                  (job as JobWithMetadata).priority !== undefined
                    ? (job as JobWithMetadata)
                    : await Queue.getInstance().getJobById(event.getQueueName(), job.id)

                // Count a terminal error only when the retries are exhausted.
                // A missing job record skips the metric but not the error log.
                if (dbJob && dbJob.retryCount >= dbJob.retryLimit) {
                  QueueJobError.inc({
                    name: event.getQueueName(),
                  })
                }
              } catch (lookupError) {
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
                  type: 'queue-task',
                  error: lookupError,
                  metadata: JSON.stringify(job),
                })
              }

              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
                type: 'queue-task',
                error: e,
                metadata: JSON.stringify(job),
              })

              throw e
            } finally {
              await lock.release()
            }
          })
        )
      } catch (e) {
        // The timer ignores the promise returned by this callback, so a failed
        // fetch would otherwise surface as an unhandled rejection. Log it and
        // try again on the next tick.
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
          type: 'queue',
          error: e,
        })
      } finally {
        polling = false
      }
    }, pollingInterval)

    queueOpts.signal?.addEventListener(
      'abort',
      () => {
        clearInterval(interval)
      },
      { once: true }
    )
  }
×
345
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc