• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 16229612421

11 Jul 2025 08:57PM UTC coverage: 76.92% (-0.2%) from 77.08%
16229612421

push

github

web-flow
fix: migration check

* fix: migration check

1623 of 2339 branches covered (69.39%)

Branch coverage included in aggregate %.

14 of 90 new or added lines in 7 files covered. (15.56%)

1 existing line in 1 file now uncovered.

20333 of 26205 relevant lines covered (77.59%)

102.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.51
/src/internal/queue/queue.ts
1
import PgBoss, { Job, JobWithMetadata } from 'pg-boss'
1✔
2
import { ERRORS } from '@internal/errors'
1✔
3
import { QueueDB } from '@internal/queue/database'
1✔
4
import { getConfig } from '../../config'
1✔
5
import { logger, logSchema } from '../monitoring'
1✔
6
import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics'
1✔
7
import { Event } from './event'
1✔
8
import { Semaphore } from '@shopify/semaphore'
1✔
9

1✔
10
//eslint-disable-next-line @typescript-eslint/no-explicit-any
1✔
11
type SubclassOfBaseClass = (new (payload: any) => Event<any>) & {
1✔
12
  [K in keyof typeof Event]: (typeof Event)[K]
1✔
13
}
1✔
14

1✔
15
export abstract class Queue {
1✔
16
  protected static events: SubclassOfBaseClass[] = []
1✔
17
  private static pgBoss?: PgBoss
1✔
18
  private static pgBossDb?: PgBoss.Db
1✔
19

1✔
20
  static async start(opts: {
1✔
21
    signal?: AbortSignal
×
22
    onMessage?: (job: Job) => void
×
23
    registerWorkers?: () => void
×
24
  }) {
×
25
    if (Queue.pgBoss) {
×
26
      return Queue.pgBoss
×
27
    }
×
28

×
29
    if (opts.signal?.aborted) {
×
30
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
×
31
    }
×
32

×
33
    const {
×
34
      isMultitenant,
×
35
      databaseURL,
×
36
      multitenantDatabaseUrl,
×
37
      pgQueueConnectionURL,
×
38
      pgQueueArchiveCompletedAfterSeconds,
×
39
      pgQueueDeleteAfterDays,
×
40
      pgQueueDeleteAfterHours,
×
41
      pgQueueRetentionDays,
×
42
      pgQueueEnableWorkers,
×
43
      pgQueueReadWriteTimeout,
×
44
      pgQueueConcurrentTasksPerQueue,
×
45
      pgQueueMaxConnections,
×
46
    } = getConfig()
×
47

×
48
    let url = pgQueueConnectionURL ?? databaseURL
×
49

×
50
    if (isMultitenant && !pgQueueConnectionURL) {
×
51
      if (!multitenantDatabaseUrl) {
×
52
        throw new Error(
×
53
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
×
54
        )
×
55
      }
×
56
      url = multitenantDatabaseUrl
×
57
    }
×
58

×
NEW
59
    Queue.pgBossDb = new QueueDB({
×
NEW
60
      min: 0,
×
NEW
61
      max: pgQueueMaxConnections,
×
NEW
62
      connectionString: url,
×
NEW
63
      statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
×
NEW
64
    })
×
NEW
65

×
66
    Queue.pgBoss = new PgBoss({
×
67
      connectionString: url,
×
68

×
NEW
69
      db: Queue.pgBossDb,
×
70
      schema: 'pgboss_v10',
×
71
      application_name: 'storage-pgboss',
×
72
      ...(pgQueueDeleteAfterHours
×
73
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
×
74
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
×
75
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
×
76
      retentionDays: pgQueueRetentionDays,
×
77
      retryBackoff: true,
×
78
      retryLimit: 20,
×
79
      expireInHours: 23,
×
80
      maintenanceIntervalSeconds: 60 * 5,
×
81
      schedule: pgQueueEnableWorkers !== false,
×
82
      supervise: pgQueueEnableWorkers !== false,
×
83
    })
×
84

×
85
    Queue.pgBoss.on('error', (error) => {
×
86
      logSchema.error(logger, '[Queue] error', {
×
87
        type: 'queue',
×
88
        error,
×
89
      })
×
90
    })
×
91

×
92
    await Queue.pgBoss.start()
×
93

×
94
    if (opts.registerWorkers && pgQueueEnableWorkers) {
×
95
      opts.registerWorkers()
×
96
    }
×
97

×
98
    await Queue.callStart()
×
99
    await Queue.startWorkers({
×
100
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
×
101
      onMessage: opts.onMessage,
×
102
      signal: opts.signal,
×
103
    })
×
104

×
105
    if (opts.signal) {
×
106
      opts.signal.addEventListener(
×
107
        'abort',
×
108
        async () => {
×
109
          logSchema.info(logger, '[Queue] Stopping', {
×
110
            type: 'queue',
×
111
          })
×
112
          return Queue.stop()
×
113
            .then(async () => {
×
114
              logSchema.info(logger, '[Queue] Exited', {
×
115
                type: 'queue',
×
116
              })
×
117
            })
×
118
            .catch((e) => {
×
119
              logSchema.error(logger, '[Queue] Error while stopping queue', {
×
120
                error: e,
×
121
                type: 'queue',
×
122
              })
×
123
            })
×
124
            .finally(async () => {
×
125
              await Queue.callClose().catch(() => {
×
126
                // no-op
×
127
              })
×
128
            })
×
129
        },
×
130
        { once: true }
×
131
      )
×
132
    }
×
133

×
134
    return Queue.pgBoss
×
135
  }
×
136

1✔
137
  static getInstance() {
1✔
138
    if (!this.pgBoss) {
×
139
      throw new Error('pg boss not initialised')
×
140
    }
×
141

×
142
    return this.pgBoss
×
143
  }
×
144

1✔
145
  static getDb() {
1✔
NEW
146
    if (!this.pgBossDb) {
×
NEW
147
      throw new Error('pg boss not initialised')
×
NEW
148
    }
×
NEW
149

×
NEW
150
    return this.pgBossDb
×
NEW
151
  }
×
152

1✔
153
  static register<T extends SubclassOfBaseClass>(event: T) {
1✔
154
    Queue.events.push(event)
×
155
  }
×
156

1✔
157
  static async stop() {
1✔
158
    if (!this.pgBoss) {
×
159
      return
×
160
    }
×
161

×
162
    const boss = this.pgBoss
×
163
    const { isProduction } = getConfig()
×
164

×
165
    await boss.stop({
×
166
      timeout: 20 * 1000,
×
167
      graceful: isProduction,
×
168
      wait: true,
×
169
    })
×
170

×
171
    await new Promise((resolve) => {
×
172
      boss.once('stopped', async () => {
×
173
        await this.callClose()
×
174
        resolve(null)
×
175
      })
×
176
    })
×
177

×
178
    Queue.pgBoss = undefined
×
179
  }
×
180

1✔
181
  protected static async startWorkers(opts: {
1✔
182
    maxConcurrentTasks: number
×
183
    signal?: AbortSignal
×
184
    onMessage?: (job: Job) => void
×
185
  }) {
×
186
    for (const event of Queue.events) {
×
187
      await Queue.registerTask(event, opts.maxConcurrentTasks, opts.onMessage, opts.signal)
×
188
    }
×
189
  }
×
190

1✔
191
  protected static callStart() {
1✔
192
    const events = Queue.events.map((event) => {
×
193
      return event.onStart()
×
194
    })
×
195

×
196
    return Promise.all(events)
×
197
  }
×
198

1✔
199
  protected static callClose() {
1✔
200
    const events = Queue.events.map((event) => {
×
201
      return event.onClose()
×
202
    })
×
203

×
204
    return Promise.all(events)
×
205
  }
×
206

1✔
207
  protected static async registerTask(
1✔
208
    event: SubclassOfBaseClass,
×
209
    maxConcurrentTasks: number,
×
210
    onMessage?: (job: Job) => void,
×
211
    signal?: AbortSignal
×
212
  ) {
×
213
    const queueName = event.getQueueName()
×
214
    const deadLetterName = event.deadLetterQueueName()
×
215

×
216
    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks
×
217
    try {
×
218
      // Create dead-letter queue and the normal queue
×
219
      const queueOptions = {
×
220
        policy: 'standard',
×
221
        ...event.getQueueOptions(),
×
222
      } as const
×
223

×
224
      // dead-letter
×
225
      await this.pgBoss?.createQueue(deadLetterName, {
×
226
        ...queueOptions,
×
227
        name: deadLetterName,
×
228
        retentionDays: 30,
×
229
        retryBackoff: true,
×
230
      })
×
231

×
232
      // // normal queue
×
233
      await this.pgBoss?.createQueue(queueName, {
×
234
        ...queueOptions,
×
NEW
235
        name: queueName,
×
236
        deadLetter: deadLetterName,
×
237
      })
×
238
    } catch {
×
239
      // no-op
×
240
    }
×
241

×
242
    return this.pollQueue(event, {
×
243
      concurrentTaskCount,
×
244
      onMessage,
×
245
      signal,
×
246
    })
×
247
  }
×
248

1✔
249
  protected static pollQueue(
1✔
250
    event: SubclassOfBaseClass,
×
251
    queueOpts: {
×
252
      concurrentTaskCount: number
×
253
      onMessage?: (job: Job) => void
×
254
      signal?: AbortSignal
×
255
    }
×
256
  ) {
×
257
    const semaphore = new Semaphore(queueOpts.concurrentTaskCount)
×
258
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
×
259
    const batchSize =
×
260
      event.getWorkerOptions().batchSize ||
×
261
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))
×
262

×
NEW
263
    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
×
NEW
264
      type: 'queue',
×
NEW
265
      metadata: JSON.stringify({
×
NEW
266
        queueName: event.getQueueName(),
×
NEW
267
        batchSize: batchSize,
×
NEW
268
        pollingInterval: pollingInterval,
×
NEW
269
      }),
×
NEW
270
    })
×
NEW
271

×
272
    let started = false
×
273
    const interval = setInterval(async () => {
×
274
      if (started) {
×
275
        return
×
276
      }
×
277

×
278
      try {
×
279
        started = true
×
280
        const defaultFetch = {
×
281
          includeMetadata: true,
×
282
          batchSize,
×
283
        }
×
284
        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
×
285
          ...event.getWorkerOptions(),
×
286
          ...defaultFetch,
×
287
        })
×
288

×
289
        if (queueOpts.signal?.aborted) {
×
290
          started = false
×
291
          return
×
292
        }
×
293

×
294
        if (!jobs || (jobs && jobs.length === 0)) {
×
295
          started = false
×
296
          return
×
297
        }
×
298

×
299
        await Promise.allSettled(
×
300
          jobs.map(async (job) => {
×
301
            const lock = await semaphore.acquire()
×
302
            try {
×
303
              queueOpts.onMessage?.(job as Job)
×
304

×
305
              await event.handle(job, { signal: queueOpts.signal })
×
306

×
307
              await this.pgBoss?.complete(event.getQueueName(), job.id)
×
308
              QueueJobCompleted.inc({
×
309
                name: event.getQueueName(),
×
310
              })
×
311
            } catch (e) {
×
312
              QueueJobRetryFailed.inc({
×
313
                name: event.getQueueName(),
×
314
              })
×
315

×
NEW
316
              await this.pgBoss?.fail(event.getQueueName(), job.id)
×
NEW
317

×
318
              try {
×
319
                const dbJob: JobWithMetadata | null =
×
320
                  (job as JobWithMetadata).priority !== undefined
×
321
                    ? (job as JobWithMetadata)
×
322
                    : await Queue.getInstance().getJobById(event.getQueueName(), job.id)
×
323

×
324
                if (!dbJob) {
×
325
                  return
×
326
                }
×
327
                if (dbJob.retryCount >= dbJob.retryLimit) {
×
328
                  QueueJobError.inc({
×
329
                    name: event.getQueueName(),
×
330
                  })
×
331
                }
×
332
              } catch (e) {
×
333
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
×
334
                  type: 'queue-task',
×
335
                  error: e,
×
336
                  metadata: JSON.stringify(job),
×
337
                })
×
338
              }
×
339

×
340
              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
×
341
                type: 'queue-task',
×
342
                error: e,
×
343
                metadata: JSON.stringify(job),
×
344
              })
×
345

×
346
              throw e
×
347
            } finally {
×
348
              await lock.release()
×
349
            }
×
350
          })
×
351
        )
×
NEW
352
      } catch (e) {
×
NEW
353
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
×
NEW
354
          type: 'queue',
×
NEW
355
          error: e,
×
NEW
356
          metadata: JSON.stringify({
×
NEW
357
            queueName: event.getQueueName(),
×
NEW
358
            batchSize,
×
NEW
359
            pollingInterval,
×
NEW
360
          }),
×
NEW
361
        })
×
362
      } finally {
×
363
        started = false
×
364
      }
×
365
    }, pollingInterval)
×
366

×
367
    queueOpts.signal?.addEventListener('abort', () => {
×
368
      clearInterval(interval)
×
369
    })
×
370
  }
×
371
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc