• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 27760253447

18 Jun 2026 12:43PM UTC coverage: 60.005% (-18.2%) from 78.232%
27760253447

push

github

web-flow
fix: limit for delete objects and sign urls (#1160)

Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>

3351 of 6375 branches covered (52.56%)

Branch coverage included in aggregate %.

25 of 31 new or added lines in 4 files covered. (80.65%)

2041 existing lines in 86 files now uncovered.

7151 of 11127 relevant lines covered (64.27%)

354.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.43
/src/internal/queue/queue.ts
1
import { ERRORS } from '@internal/errors'
2
import { QueueDB } from '@internal/queue/database'
3
import { Semaphore } from '@shopify/semaphore'
4
import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss'
5
import { getConfig } from '../../config'
6
import { getSbReqIdFromPayload, logger, logSchema } from '../monitoring'
7
import { queueJobCompleted, queueJobError, queueJobRetryFailed } from '../monitoring/metrics'
8
import { Event } from './event'
9

10
type RegisteredEvent = {
11
  deadLetterQueueName(): string
12
  getQueueName(): string
13
  getQueueOptions(): ReturnType<typeof Event.getQueueOptions>
14
  getWorkerOptions(): ReturnType<typeof Event.getWorkerOptions>
15
  handle(job: Job<unknown> | Job<unknown>[], opts?: { signal?: AbortSignal }): unknown
16
  onClose(): unknown
17
  onStart(): unknown
18
  name: string
19
}
20

21
export const PG_BOSS_SCHEMA = 'pgboss_v10'
43✔
22
const queueStopTimeoutMs = 25_000
43✔
23

24
export abstract class Queue {
25
  protected static events: RegisteredEvent[] = []
43✔
26
  private static pgBoss?: PgBoss
27
  private static pgBossDb?: PgBoss.Db
28

29
  static createPgBoss(opts: { db: Db; enableWorkers: boolean }) {
30
    const {
31
      isMultitenant,
32
      databaseURL,
33
      multitenantDatabasePoolUrl,
34
      multitenantDatabaseUrl,
35
      pgQueueConnectionURL,
36
      pgQueueArchiveCompletedAfterSeconds,
37
      pgQueueDeleteAfterDays,
38
      pgQueueDeleteAfterHours,
39
      pgQueueRetentionDays,
×
40
    } = getConfig()
41

42
    let url = pgQueueConnectionURL ?? databaseURL
×
43
    let migrate = true
×
44

45
    if (isMultitenant && !pgQueueConnectionURL) {
×
46
      if (!multitenantDatabaseUrl) {
×
47
        throw new Error(
×
48
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
49
        )
50
      }
51
      url = multitenantDatabasePoolUrl || multitenantDatabaseUrl
×
52

53
      if (multitenantDatabasePoolUrl) {
×
54
        migrate = false
×
55
      }
56
    }
57

58
    return new PgBoss({
×
59
      connectionString: url,
60
      migrate,
61
      db: opts.db,
62
      schema: PG_BOSS_SCHEMA,
63
      ...(pgQueueDeleteAfterHours
×
64
        ? { deleteAfterHours: pgQueueDeleteAfterHours }
65
        : { deleteAfterDays: pgQueueDeleteAfterDays }),
66
      archiveCompletedAfterSeconds: pgQueueArchiveCompletedAfterSeconds,
67
      retentionDays: pgQueueRetentionDays,
68
      retryBackoff: true,
69
      retryLimit: 20,
70
      expireInHours: 23,
71
      maintenanceIntervalSeconds: 60 * 5,
72
      schedule: opts.enableWorkers,
73
      supervise: opts.enableWorkers,
74
    })
75
  }
76

77
  static async start(opts: {
78
    signal?: AbortSignal
79
    onMessage?: (job: Job) => void
80
    registerWorkers?: () => void
81
  }) {
82
    if (Queue.pgBoss) {
×
83
      return Queue.pgBoss
×
84
    }
85

86
    if (opts.signal?.aborted) {
×
87
      throw ERRORS.Aborted('Cannot start queue with aborted signal')
×
88
    }
89

90
    const {
91
      isMultitenant,
92
      databaseURL,
93
      multitenantDatabaseUrl,
94
      multitenantDatabasePoolUrl,
95
      pgQueueConnectionURL,
96
      pgQueueEnableWorkers,
97
      pgQueueReadWriteTimeout,
98
      pgQueueConcurrentTasksPerQueue,
99
      pgQueueMaxConnections,
100
      databaseApplicationName,
×
101
    } = getConfig()
102

103
    let url = pgQueueConnectionURL || databaseURL
×
104

105
    if (isMultitenant && !pgQueueConnectionURL) {
×
106
      if (!multitenantDatabaseUrl && !multitenantDatabasePoolUrl) {
×
107
        throw new Error(
×
108
          'running storage in multi-tenant but DB_MULTITENANT_DATABASE_URL is not set'
109
        )
110
      }
111
      url = (multitenantDatabasePoolUrl || multitenantDatabaseUrl) as string
×
112
    }
113

114
    Queue.pgBossDb = new QueueDB({
×
115
      min: 0,
116
      max: pgQueueMaxConnections,
117
      connectionString: url,
118
      application_name: databaseApplicationName,
119
      statement_timeout: pgQueueReadWriteTimeout > 0 ? pgQueueReadWriteTimeout : undefined,
×
120
    })
121

122
    Queue.pgBoss = this.createPgBoss({
×
123
      db: Queue.pgBossDb,
124
      enableWorkers: pgQueueEnableWorkers !== false,
125
    })
126

127
    Queue.pgBoss.on('error', (error) => {
×
128
      logSchema.error(logger, '[Queue] error', {
×
129
        type: 'queue',
130
        error,
131
      })
132
    })
133

134
    await Queue.pgBoss.start()
×
135

136
    if (opts.registerWorkers && pgQueueEnableWorkers) {
×
137
      opts.registerWorkers()
×
138
    }
139

140
    await Queue.callStart()
×
141
    await Queue.startWorkers({
×
142
      maxConcurrentTasks: pgQueueConcurrentTasksPerQueue,
143
      onMessage: opts.onMessage,
144
      signal: opts.signal,
145
    })
146

147
    if (opts.signal) {
×
148
      opts.signal.addEventListener(
×
149
        'abort',
150
        async () => {
151
          logSchema.info(logger, '[Queue] Stopping', {
×
152
            type: 'queue',
153
          })
154
          return Queue.stop()
×
155
            .then(async () => {
156
              logSchema.info(logger, '[Queue] Exited', {
×
157
                type: 'queue',
158
              })
159
            })
160
            .catch((e) => {
161
              logSchema.error(logger, '[Queue] Error while stopping queue', {
×
162
                error: e,
163
                type: 'queue',
164
              })
165
            })
166
        },
167
        { once: true }
168
      )
169
    }
170

171
    return Queue.pgBoss
×
172
  }
173

174
  static getInstance() {
175
    if (!this.pgBoss) {
×
176
      throw new Error('pg boss not initialised')
×
177
    }
178

179
    return this.pgBoss
×
180
  }
181

182
  static getDb() {
183
    if (!this.pgBossDb) {
×
184
      throw new Error('pg boss not initialised')
×
185
    }
186

187
    return this.pgBossDb
×
188
  }
189

190
  static register<T extends RegisteredEvent>(event: T) {
191
    Queue.events.push(event)
×
192
  }
193

194
  static async stop() {
UNCOV
195
    if (!this.pgBoss) {
×
196
      return
×
197
    }
198

UNCOV
199
    const boss = this.pgBoss
×
UNCOV
200
    const db = this.pgBossDb
×
UNCOV
201
    const { isProduction } = getConfig()
×
202

UNCOV
203
    try {
×
UNCOV
204
      await withQueueStopTimeout(
×
205
        boss.stop({
206
          timeout: 20 * 1000,
207
          graceful: isProduction,
208
          wait: true,
209
        }),
210
        'Queue stop'
211
      )
212
    } finally {
UNCOV
213
      try {
×
UNCOV
214
        await withQueueStopTimeout(this.callClose(), 'Queue close')
×
215
      } finally {
UNCOV
216
        if (Queue.pgBoss === boss) {
×
UNCOV
217
          Queue.pgBoss = undefined
×
218
        }
219

UNCOV
220
        if (Queue.pgBossDb === db) {
×
UNCOV
221
          Queue.pgBossDb = undefined
×
222
        }
223
      }
224
    }
225
  }
226

227
  protected static async startWorkers(opts: {
228
    maxConcurrentTasks: number
229
    signal?: AbortSignal
230
    onMessage?: (job: Job) => void
231
  }) {
232
    for (const event of Queue.events) {
×
233
      await Queue.registerTask(event, opts.maxConcurrentTasks, opts.onMessage, opts.signal)
×
234
    }
235
  }
236

237
  protected static callStart() {
238
    const events = Queue.events.map((event) => {
×
239
      return event.onStart()
×
240
    })
241

242
    return Promise.all(events)
×
243
  }
244

245
  protected static callClose() {
UNCOV
246
    const events = Queue.events.map((event) => {
×
UNCOV
247
      return event.onClose()
×
248
    })
249

UNCOV
250
    return Promise.all(events)
×
251
  }
252

253
  protected static async registerTask(
254
    event: RegisteredEvent,
255
    maxConcurrentTasks: number,
256
    onMessage?: (job: Job) => void,
257
    signal?: AbortSignal
258
  ) {
259
    const queueName = event.getQueueName()
×
260
    const deadLetterName = event.deadLetterQueueName()
×
261

262
    const concurrentTaskCount = event.getWorkerOptions().concurrentTaskCount || maxConcurrentTasks
×
263
    try {
×
264
      // Create dead-letter queue and the normal queue
265
      const queueOptions = {
×
266
        policy: 'standard',
267
        ...event.getQueueOptions(),
268
      } as const
269

270
      // dead-letter
271
      await this.pgBoss?.createQueue(deadLetterName, {
×
272
        ...queueOptions,
273
        name: deadLetterName,
274
        retentionDays: 30,
275
        retryBackoff: true,
276
      })
277

278
      // // normal queue
279
      await this.pgBoss?.createQueue(queueName, {
×
280
        ...queueOptions,
281
        name: queueName,
282
        deadLetter: deadLetterName,
283
      })
284
    } catch {
285
      // no-op
286
    }
287

288
    return this.pollQueue(event, {
×
289
      concurrentTaskCount,
290
      onMessage,
291
      signal,
292
    })
293
  }
294

295
  protected static pollQueue(
296
    event: RegisteredEvent,
297
    queueOpts: {
298
      concurrentTaskCount: number
299
      onMessage?: (job: Job) => void
300
      signal?: AbortSignal
301
    }
302
  ) {
UNCOV
303
    const semaphore = new Semaphore(queueOpts.concurrentTaskCount)
×
UNCOV
304
    const pollingInterval = (event.getWorkerOptions().pollingIntervalSeconds || 5) * 1000
×
305
    const batchSize =
UNCOV
306
      event.getWorkerOptions().batchSize ||
×
307
      queueOpts.concurrentTaskCount + Math.max(1, Math.floor(queueOpts.concurrentTaskCount * 1.2))
308

UNCOV
309
    logSchema.info(logger, `[Queue] Polling queue ${event.getQueueName()}`, {
×
310
      type: 'queue',
311
      metadata: JSON.stringify({
312
        queueName: event.getQueueName(),
313
        batchSize,
314
        pollingInterval,
315
      }),
316
    })
317

UNCOV
318
    let started = false
×
UNCOV
319
    let jobFetched = 0
×
320

UNCOV
321
    const interval = setInterval(async () => {
×
UNCOV
322
      if (started) {
×
323
        return
×
324
      }
325

UNCOV
326
      try {
×
UNCOV
327
        started = true
×
UNCOV
328
        const defaultFetch = {
×
329
          includeMetadata: true,
330
          batchSize,
331
        }
332

UNCOV
333
        const currentBatch = defaultFetch.batchSize - jobFetched
×
334

UNCOV
335
        if (currentBatch <= 0) {
×
336
          return
×
337
        }
338

UNCOV
339
        const jobs = await this.pgBoss?.fetch(event.getQueueName(), {
×
340
          ...event.getWorkerOptions(),
341
          ...defaultFetch,
342
          batchSize: currentBatch,
343
        })
344

UNCOV
345
        jobFetched += jobs?.length || 0
×
346

UNCOV
347
        if (jobFetched < defaultFetch.batchSize) {
×
UNCOV
348
          started = false
×
349
        }
350

UNCOV
351
        if (queueOpts.signal?.aborted) {
×
352
          started = false
×
353
          return
×
354
        }
355

UNCOV
356
        if (!jobs || (jobs && jobs.length === 0)) {
×
357
          started = false
×
358
          return
×
359
        }
360

UNCOV
361
        await Promise.allSettled(
×
362
          jobs.map(async (job) => {
UNCOV
363
            const lock = await semaphore.acquire()
×
UNCOV
364
            const sbReqId = getSbReqIdFromPayload(job.data)
×
365

UNCOV
366
            try {
×
UNCOV
367
              queueOpts.onMessage?.(job as Job)
×
368

UNCOV
369
              await event.handle(job, { signal: queueOpts.signal })
×
370

371
              await this.pgBoss?.complete(event.getQueueName(), job.id)
×
372
              queueJobCompleted.add(1, {
×
373
                name: event.getQueueName(),
374
              })
375
            } catch (e) {
UNCOV
376
              queueJobRetryFailed.add(1, {
×
377
                name: event.getQueueName(),
378
              })
379

UNCOV
380
              await this.pgBoss?.fail(event.getQueueName(), job.id)
×
381

UNCOV
382
              try {
×
383
                const dbJob: JobWithMetadata | null =
UNCOV
384
                  (job as JobWithMetadata).priority !== undefined
×
385
                    ? (job as JobWithMetadata)
386
                    : await Queue.getInstance().getJobById(event.getQueueName(), job.id)
387

388
                if (!dbJob) {
×
389
                  return
×
390
                }
UNCOV
391
                if (dbJob.retryCount >= dbJob.retryLimit) {
×
392
                  queueJobError.add(1, {
×
393
                    name: event.getQueueName(),
394
                  })
395
                }
396
              } catch (e) {
397
                logSchema.error(logger, `[Queue Handler] fetching job ${event.name}`, {
×
398
                  type: 'queue-task',
399
                  error: e,
400
                  metadata: JSON.stringify(job),
401
                  sbReqId,
402
                })
403
              }
404

UNCOV
405
              logSchema.error(logger, `[Queue Handler] Error while processing job ${event.name}`, {
×
406
                type: 'queue-task',
407
                error: e,
408
                metadata: JSON.stringify(job),
409
                sbReqId,
410
              })
411

UNCOV
412
              throw e
×
413
            } finally {
UNCOV
414
              jobFetched = Math.max(0, jobFetched - 1)
×
UNCOV
415
              await lock.release()
×
416
            }
417
          })
418
        )
419
      } catch (e) {
420
        logSchema.error(logger, `[Queue] Error while polling queue ${event.name}`, {
×
421
          type: 'queue',
422
          error: e,
423
          metadata: JSON.stringify({
424
            queueName: event.getQueueName(),
425
            batchSize,
426
            pollingInterval,
427
          }),
428
        })
429
      } finally {
UNCOV
430
        started = false
×
431
      }
432
    }, pollingInterval)
433

UNCOV
434
    queueOpts.signal?.addEventListener('abort', () => {
×
UNCOV
435
      clearInterval(interval)
×
436
    })
437
  }
438
}
439

440
async function withQueueStopTimeout<T>(promise: Promise<T>, label: string): Promise<T> {
441
  let timeout: ReturnType<typeof setTimeout> | undefined
442

UNCOV
443
  const timeoutPromise = new Promise<never>((_, reject) => {
×
UNCOV
444
    timeout = setTimeout(() => {
×
UNCOV
445
      reject(new Error(`${label} timed out after ${queueStopTimeoutMs}ms`))
×
446
    }, queueStopTimeoutMs)
UNCOV
447
    timeout.unref?.()
×
448
  })
449

UNCOV
450
  try {
×
UNCOV
451
    return await Promise.race([promise, timeoutPromise])
×
452
  } finally {
UNCOV
453
    if (timeout) {
×
UNCOV
454
      clearTimeout(timeout)
×
455
    }
456
  }
457
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc