• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 15187324276

22 May 2025 01:03PM UTC coverage: 78.01% (-0.1%) from 78.144%
15187324276

Pull #696

github

web-flow
Merge 63e223c68 into 69e4a4079
Pull Request #696: feat: pgboss v10

1532 of 2132 branches covered (71.86%)

Branch coverage included in aggregate %.

35 of 173 new or added lines in 9 files covered. (20.23%)

3 existing lines in 2 files now uncovered.

17309 of 22020 relevant lines covered (78.61%)

109.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

11.18
/src/internal/queue/queue.ts
1
import PgBoss, { Job, JobWithMetadata } from 'pg-boss'
1✔
2
import { ERRORS } from '@internal/errors'
1✔
3
import { QueueDB } from '@internal/queue/database'
1✔
4
import { getConfig } from '../../config'
1✔
5
import { logger, logSchema } from '../monitoring'
1✔
6
import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics'
1✔
7
import { Event } from './event'
1✔
8
import { Semaphore } from '@shopify/semaphore'
1✔
9

1✔
10
//eslint-disable-next-line @typescript-eslint/no-explicit-any
1✔
11
type SubclassOfBaseClass = (new (payload: any) => Event<any>) & {
1✔
12
  [K in keyof typeof Event]: (typeof Event)[K]
1✔
13
}
1✔
14

1✔
15
export abstract class Queue {
1✔
16
  protected static events: SubclassOfBaseClass[] = []
1✔
17
  private static pgBoss?: PgBoss
1✔
18

1✔
19
  static async start(opts: {
1✔
20
    signal?: AbortSignal
×
21
    onMessage?: (job: Job) => void
×
22
    registerWorkers?: () => void
×
23
  }) {
×
24
    if (Queue.pgBoss) {
×
25
      return Queue.pgBoss
×
26
    }
×
27

×
28
    if (opts.signal?.aborted) {
×
29
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
×
30
    }
×
31

×
32
    const {
×
33
      isMultitenant,
×
34
      databaseURL,
×
35
      multitenantDatabaseUrl,
×
36
      pgQueueConnectionURL,
×
NEW
37
      pgQueueArchiveCompletedAfterSeconds,
×
38
      pgQueueDeleteAfterDays,
×
39
      pgQueueDeleteAfterHours,
×
40
      pgQueueRetentionDays,
×
41
      pgQueueEnableWorkers,
×
42
      pgQueueReadWriteTimeout,
×
43
      pgQueueMaxConnections,
×
44
    } = getConfig()
×
45

×
46
    let url = pgQueueConnectionURL ?? databaseURL
×
47

×
48
    if (isMultitenant && !pgQueueConnectionURL) {
×
49
      if (!multitenantDatabaseUrl) {
×
50
        throw new Error(
×
51
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
×
52
        )
×
53
      }
×
54
      url = multitenantDatabaseUrl
×
55
    }
×
56

×
57
    Queue.pgBoss = new PgBoss({
×
58
      connectionString: url,
×
NEW
59

×
60
      db: new QueueDB({
×
61
        min: 0,
×
62
        max: pgQueueMaxConnections,
×
63
        connectionString: url,
×
64
        statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
×
65
      }),
×
NEW
66
      schema: 'pgboss_v10',
×
67
      application_name: 'storage-pgboss',
×
68
      ...(pgQueueDeleteAfterHours ? {} : { deleteAfterDays: pgQueueDeleteAfterDays }),
×
69
      ...(pgQueueDeleteAfterHours ? { deleteAfterHours: pgQueueDeleteAfterHours } : {}),
×
70
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
×
71
      retentionDays: pgQueueRetentionDays,
×
72
      retryBackoff: true,
×
73
      retryLimit: 20,
×
NEW
74
      expireInHours: 23,
×
NEW
75
      maintenanceIntervalSeconds: 60 * 5,
×
NEW
76
      schedule: pgQueueEnableWorkers !== false,
×
NEW
77
      supervise: pgQueueEnableWorkers !== false,
×
78
    })
×
79

×
80
    Queue.pgBoss.on('error', (error) => {
×
81
      logSchema.error(logger, '[Queue] error', {
×
82
        type: 'queue',
×
83
        error,
×
84
      })
×
85
    })
×
86

×
87
    await Queue.pgBoss.start()
×
88

×
89
    if (opts.registerWorkers && pgQueueEnableWorkers) {
×
90
      opts.registerWorkers()
×
91
    }
×
92

×
93
    await Queue.callStart()
×
NEW
94
    await Queue.startWorkers({
×
NEW
95
      onMessage: opts.onMessage,
×
NEW
96
      signal: opts.signal,
×
NEW
97
    })
×
98

×
99
    if (opts.signal) {
×
100
      opts.signal.addEventListener(
×
101
        'abort',
×
102
        async () => {
×
103
          logSchema.info(logger, '[Queue] Stopping', {
×
104
            type: 'queue',
×
105
          })
×
106
          return Queue.stop()
×
107
            .then(async () => {
×
108
              logSchema.info(logger, '[Queue] Exited', {
×
109
                type: 'queue',
×
110
              })
×
111
            })
×
112
            .catch((e) => {
×
113
              logSchema.error(logger, '[Queue] Error while stopping queue', {
×
114
                error: e,
×
115
                type: 'queue',
×
116
              })
×
117
            })
×
118
            .finally(async () => {
×
119
              await Queue.callClose().catch(() => {
×
120
                // no-op
×
121
              })
×
122
            })
×
123
        },
×
124
        { once: true }
×
125
      )
×
126
    }
×
127

×
128
    return Queue.pgBoss
×
129
  }
×
130

1✔
131
  static getInstance() {
1✔
132
    if (!this.pgBoss) {
×
133
      throw new Error('pg boss not initialised')
×
134
    }
×
135

×
136
    return this.pgBoss
×
137
  }
×
138

1✔
139
  static register<T extends SubclassOfBaseClass>(event: T) {
1✔
140
    Queue.events.push(event)
×
141
  }
×
142

1✔
143
  static async stop() {
1✔
144
    if (!this.pgBoss) {
×
145
      return
×
146
    }
×
147

×
148
    const boss = this.pgBoss
×
149
    const { isProduction } = getConfig()
×
150

×
151
    await boss.stop({
×
152
      timeout: 20 * 1000,
×
153
      graceful: isProduction,
×
NEW
154
      wait: true,
×
155
    })
×
156

×
157
    await new Promise((resolve) => {
×
158
      boss.once('stopped', async () => {
×
159
        await this.callClose()
×
160
        resolve(null)
×
161
      })
×
162
    })
×
163

×
164
    Queue.pgBoss = undefined
×
165
  }
×
166

1✔
167
  protected static startWorkers(opts: { signal?: AbortSignal; onMessage?: (job: Job) => void }) {
1✔
NEW
168
    const workers: Promise<any>[] = []
×
169

×
170
    Queue.events.forEach((event) => {
×
NEW
171
      workers.push(Queue.registerTask(event.getQueueName(), event, opts.onMessage, opts.signal))
×
172
    })
×
173

×
174
    return Promise.all(workers)
×
175
  }
×
176

1✔
177
  protected static callStart() {
1✔
178
    const events = Queue.events.map((event) => {
×
179
      return event.onStart()
×
180
    })
×
181

×
182
    return Promise.all(events)
×
183
  }
×
184

1✔
185
  protected static callClose() {
1✔
186
    const events = Queue.events.map((event) => {
×
187
      return event.onClose()
×
188
    })
×
189

×
190
    return Promise.all(events)
×
191
  }
×
192

1✔
193
  protected static async registerTask(
1✔
194
    queueName: string,
×
195
    event: SubclassOfBaseClass,
×
NEW
196
    onMessage?: (job: Job) => void,
×
NEW
197
    signal?: AbortSignal
×
198
  ) {
×
NEW
199
    const concurrentTaskCount = event.getWorkerOptions().batchSize || 50
×
NEW
200
    const pollingInterval = event.getWorkerOptions().pollingIntervalSeconds || 5 * 1000
×
NEW
201
    const semaphore = new Semaphore(concurrentTaskCount)
×
NEW
202

×
NEW
203
    try {
×
NEW
204
      await this.pgBoss?.createQueue(
×
NEW
205
        queueName,
×
NEW
206
        event.getQueueOptions() || {
×
NEW
207
          name: queueName,
×
NEW
208
          policy: 'standard',
×
NEW
209
        }
×
NEW
210
      )
×
NEW
211
    } catch {
×
NEW
212
      // no-op
×
NEW
213
    }
×
214

×
NEW
215
    let started = false
×
NEW
216
    const interval = setInterval(async () => {
×
NEW
217
      if (started) {
×
NEW
218
        return
×
NEW
219
      }
×
220

×
NEW
221
      started = true
×
NEW
222
      const defaultFetch = {
×
NEW
223
        includeMetadata: true,
×
NEW
224
        batchSize: concurrentTaskCount * 2,
×
NEW
225
      }
×
NEW
226
      const jobs = await this.pgBoss?.fetch(queueName, {
×
NEW
227
        ...event.getWorkerOptions(),
×
NEW
228
        ...defaultFetch,
×
NEW
229
      })
×
230

×
NEW
231
      if (signal?.aborted) {
×
NEW
232
        started = false
×
NEW
233
        return
×
NEW
234
      }
×
NEW
235

×
NEW
236
      if (!jobs || (jobs && jobs.length === 0)) {
×
NEW
237
        started = false
×
NEW
238
        return
×
NEW
239
      }
×
NEW
240

×
NEW
241
      try {
×
NEW
242
        await Promise.allSettled(
×
NEW
243
          jobs.map(async (job) => {
×
NEW
244
            const lock = await semaphore.acquire()
×
NEW
245
            try {
×
NEW
246
              onMessage?.(job as Job)
×
NEW
247

×
NEW
248
              await event.handle(job)
×
NEW
249

×
NEW
250
              await this.pgBoss?.complete(queueName, job.id)
×
NEW
251
              QueueJobCompleted.inc({
×
252
                name: event.getQueueName(),
×
253
              })
×
NEW
254
            } catch (e) {
×
NEW
255
              await this.pgBoss?.fail(queueName, job.id)
×
256

×
NEW
257
              QueueJobRetryFailed.inc({
×
NEW
258
                name: event.getQueueName(),
×
NEW
259
              })
×
NEW
260

×
NEW
261
              try {
×
NEW
262
                const dbJob: JobWithMetadata | null =
×
NEW
263
                  (job as JobWithMetadata).priority !== undefined
×
NEW
264
                    ? (job as JobWithMetadata)
×
NEW
265
                    : await Queue.getInstance().getJobById(queueName, job.id)
×
NEW
266

×
NEW
267
                if (!dbJob) {
×
NEW
268
                  return
×
NEW
269
                }
×
NEW
270
                if (dbJob.retryCount >= dbJob.retryLimit) {
×
NEW
271
                  QueueJobError.inc({
×
NEW
272
                    name: event.getQueueName(),
×
NEW
273
                  })
×
NEW
274
                }
×
NEW
275
              } catch (e) {
×
NEW
276
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
×
NEW
277
                  type: 'queue-task',
×
NEW
278
                  error: e,
×
NEW
279
                  metadata: JSON.stringify(job),
×
280
                })
×
281
              }
×
282

×
NEW
283
              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
×
NEW
284
                type: 'queue-task',
×
NEW
285
                error: e,
×
NEW
286
                metadata: JSON.stringify(job),
×
NEW
287
              })
×
288

×
NEW
289
              throw e
×
NEW
290
            } finally {
×
NEW
291
              await lock.release()
×
NEW
292
            }
×
NEW
293
          })
×
NEW
294
        )
×
NEW
295
      } finally {
×
NEW
296
        started = false
×
297
      }
×
NEW
298
    }, pollingInterval)
×
NEW
299

×
NEW
300
    signal?.addEventListener('abort', () => {
×
NEW
301
      clearInterval(interval)
×
NEW
302
    })
×
UNCOV
303
  }
×
304
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc