• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 22692371830

04 Mar 2026 10:27PM UTC coverage: 76.025% (-0.01%) from 76.039%
22692371830

Pull #893

github

web-flow
Merge 41e7eb2da into d8dba5374
Pull Request #893: feat: upgrade fastify to v5

3941 of 5638 branches covered (69.9%)

Branch coverage included in aggregate %.

52 of 81 new or added lines in 10 files covered. (64.2%)

3 existing lines in 2 files now uncovered.

26539 of 34454 relevant lines covered (77.03%)

190.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.35
/src/internal/queue/event.ts
1
import { getTenantConfig } from '@internal/database'
2✔
2
import { ERRORS } from '@internal/errors'
2✔
3
import { logger, logSchema } from '@internal/monitoring'
2✔
4
import { queueJobScheduled, queueJobSchedulingTime } from '@internal/monitoring/metrics'
2✔
5
import { KnexQueueDB } from '@internal/queue/database'
2✔
6
import { Knex } from 'knex'
2✔
7
import PgBoss, { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss'
2✔
8
import { getConfig } from '../../config'
2✔
9
import { Queue } from './queue'
2✔
10

2✔
11
/**
 * Tenant a queued event belongs to.
 */
interface BasePayloadTenant {
  /** Tenant reference; used in this file to look up the tenant config and as the `project` log field. */
  ref: string
  /** Tenant host. Not read in this file — presumably consumed by event handlers (verify against callers). */
  host: string
}

/**
 * Fields shared by every event payload that goes through the queue.
 */
export interface BasePayload {
  /** Tenant the event is emitted for. */
  tenant: BasePayloadTenant
  /** Payload version; the event class fills it in with its static `version` when missing. */
  $version?: string
  /** When set, it is converted to a `Date` and used as the job's `startAfter` option. */
  scheduleAt?: Date
  /** Not read in this file — presumably a deduplication key for the queue (TODO confirm). */
  singletonKey?: string
  /** Not read in this file — presumably the id of the originating request (TODO confirm). */
  reqId?: string
}
2✔
21

2✔
22
// Settings used by the Event class below, read once when this module is loaded.
const queueConfig = getConfig()
const pgQueueEnable = queueConfig.pgQueueEnable
const region = queueConfig.region
const isMultitenant = queueConfig.isMultitenant
2✔
23

2✔
24
/**
 * Static side of an event class: what the static helpers on `Event`
 * (`send`, `invoke`, `invokeOrSend`) require from `this`.
 */
interface BaseEventConstructor<Base extends Event<any>> {
  /** Creates the event instance. */
  new (...args: any): Base

  /** Version stamped into `$version` of outgoing payloads. */
  version: string

  /** Name identifying the event type. */
  eventName(): string

  /** Options used when registering a worker for this event. */
  getWorkerOptions(): WorkOptions

  /** Builds an instance from `payload` and sends it. */
  send(
    this: StaticThis<Base>,
    payload: Omit<Base['payload'], '$version'>
  ): Promise<string | void | null>
}

/**
 * Type of `this` inside the static methods of an event class.
 */
export type StaticThis<T extends Event<any>> = BaseEventConstructor<T>
2✔
39

2✔
40
/**
 * Base class for all events that are sent to the queue.
 *
 * The static side describes the event type (queue name, options, handler);
 * an instance wraps one payload and knows how to deliver it, either through
 * the queue or by calling the static `handle` directly in-process.
 *
 * The base `handle` throws and the base `queueName` is empty, so subclasses
 * are expected to provide both.
 */
export class Event<T extends Omit<BasePayload, '$version'>> {
  /** Version stamped into `$version` of every payload delivered by this class. */
  public static readonly version: string = 'v1'
  /** Queue the event is published to; `getQueueName()` throws while it is empty. */
  protected static queueName = ''
  /** Whether the handler may be run in-process instead of going through the queue. */
  protected static allowSync = true

  constructor(public readonly payload: T & BasePayload) {}

  /**
   * @returns the name of the event class
   */
  static eventName() {
    return this.name
  }

  /**
   * @returns the dead-letter queue name, i.e. the queue name suffixed with `-dead-letter`
   */
  static deadLetterQueueName() {
    return this.queueName + '-dead-letter'
  }

  /**
   * @returns the configured queue name
   * @throws Error when the class has not set `queueName`
   */
  static getQueueName() {
    if (!this.queueName) {
      // In a static method `this` is the class itself, so `this.name` is the
      // class name; `this.constructor.name` would always be "Function".
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName
  }

  /**
   * Queue-level options for this event. The base implementation has none.
   */
  static getQueueOptions(): PgBossQueue | undefined {
    return undefined
  }

  /**
   * Per-message send options derived from the payload. The base implementation has none.
   *
   * @param payload payload of the message about to be sent
   */
  static getSendOptions<T extends Event<any>>(payload: T['payload']): SendOptions | undefined {
    return undefined
  }

  /**
   * Worker options for this event. The base implementation returns an empty object.
   */
  static getWorkerOptions(): WorkOptions & { concurrentTaskCount?: number } {
    return {}
  }

  /**
   * Lifecycle hook; no-op in the base class.
   */
  static onClose() {
    // no-op
  }

  /**
   * Lifecycle hook; no-op in the base class.
   */
  static onStart() {
    // no-op
  }

  /**
   * Sends several messages of this event type at once.
   *
   * With the queue disabled, each message is delivered through its own
   * `send()` when `allowSync` is true; otherwise the batch is skipped with a
   * warning. With the queue enabled, all messages are inserted in one call.
   *
   * @param messages event instances to deliver
   * @returns the pending delivery, or `undefined` when the batch is skipped
   */
  static batchSend<T extends Event<any>[]>(messages: T) {
    if (!pgQueueEnable) {
      if (this.allowSync) {
        return Promise.all(messages.map((message) => message.send()))
      }

      logger.warn(
        {
          type: 'queue',
          eventType: this.eventName(),
        },
        '[Queue] skipped sending batch messages'
      )
      return
    }

    return Queue.getInstance().insert(
      messages.map((message) => {
        // Copy before adding fields: getSendOptions() may hand back an object
        // that is shared between calls, and it must not be modified here.
        const sendOptions: PgBoss.JobInsert = {
          ...(this.getSendOptions(message.payload) as PgBoss.JobInsert),
        }

        if (!message.payload.$version) {
          ;(message.payload as (typeof message)['payload']).$version = this.version
        }

        if (message.payload.scheduleAt) {
          sendOptions.startAfter = new Date(message.payload.scheduleAt)
        }

        return {
          ...sendOptions,
          name: this.getQueueName(),
          data: message.payload,
          deadLetter: this.deadLetterQueueName(),
        }
      })
    )
  }

  /**
   * Builds an event from `payload` and sends it.
   * Sets `payload.$version` in place when it is missing.
   *
   * @param payload event payload
   * @param opts send options; `tnx` selects the transaction used for queueing
   */
  static send<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>,
    opts?: SendOptions & { tnx?: Knex }
  ) {
    if (!payload.$version) {
      ;(payload as T['payload']).$version = this.version
    }
    const that = new this(payload)
    return that.send(opts)
  }

  /**
   * Builds an event from `payload` and runs its handler in-process.
   * Sets `payload.$version` in place when it is missing.
   *
   * @param payload event payload
   */
  static invoke<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>
  ) {
    if (!payload.$version) {
      ;(payload as T['payload']).$version = this.version
    }
    const that = new this(payload)
    return that.invoke()
  }

  /**
   * Builds an event from `payload`, runs its handler in-process and falls
   * back to sending it when the handler fails.
   * Sets `payload.$version` in place when it is missing.
   *
   * @param payload event payload
   * @param options send options; `sendWhenError` decides which errors trigger the fallback
   */
  static invokeOrSend<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>,
    options?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
  ) {
    if (!payload.$version) {
      ;(payload as T['payload']).$version = this.version
    }
    const that = new this(payload)
    return that.invokeOrSend(options)
  }

  /**
   * Processes one job or a batch of jobs. Must be overridden by subclasses.
   *
   * @param job job(s) to process
   * @param opts optional abort signal
   * @throws Error always, in the base class
   */
  static handle(
    job: Job<Event<any>['payload']> | Job<Event<any>['payload']>[],
    opts?: { signal?: AbortSignal }
  ) {
    throw new Error('not implemented')
  }

  /**
   * Decides whether a payload should be delivered at all.
   *
   * @param payload payload about to be sent
   * @returns `false` only in multitenant mode when the tenant's `disableEvents`
   *          list contains this event's name; `true` otherwise
   */
  static async shouldSend(payload: any) {
    if (isMultitenant && payload?.tenant?.ref) {
      // Do not send an event if disabled for this specific tenant
      const tenant = await getTenantConfig(payload.tenant.ref)
      const disabledEvents = tenant.disableEvents || []
      if (disabledEvents.includes(this.eventName())) {
        return false
      }
    }
    return true
  }

  /**
   * Deletes job `jobId` when another job with the same queue name and
   * singleton key exists in a state lower than 'active'.
   * Does nothing when the queue is disabled.
   *
   * See issue https://github.com/timgit/pg-boss/issues/535
   * @param queueName
   * @param singletonKey
   * @param jobId
   */
  static async deleteIfActiveExists(queueName: string, singletonKey: string, jobId: string) {
    if (!pgQueueEnable) {
      return
    }

    await Queue.getDb().executeSql(
      `DELETE FROM pgboss_v10.job
       WHERE id = $1
       AND EXISTS(
          SELECT 1 FROM pgboss_v10.job
             WHERE id != $2
             AND state < 'active'
             AND name = $3
             AND singleton_key = $4
       )
      `,
      [jobId, jobId, queueName, singletonKey]
    )
  }

  /**
   * Runs the handler in-process; when that fails, logs the error and sends
   * the event through `send()` instead.
   *
   * @param sendOptions options forwarded to `send()`; when `sendWhenError` is
   *        given and returns false for the error, the error is rethrown
   * @returns the result of `send()` when the fallback ran, otherwise nothing
   * @throws when the event class does not allow synchronous handling
   */
  async invokeOrSend(
    sendOptions?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
  ): Promise<string | void | null> {
    const constructor = this.constructor as typeof Event

    if (!constructor.allowSync) {
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
    }

    try {
      await this.invoke()
    } catch (e) {
      if (sendOptions?.sendWhenError && !sendOptions.sendWhenError(e)) {
        throw e
      }
      logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', {
        type: 'queue',
        project: this.payload.tenant?.ref,
        error: e,
        metadata: JSON.stringify(this.payload),
      })

      return this.send(sendOptions)
    }
  }

  /**
   * Runs the handler in-process with a synthetic job built from the payload.
   *
   * @throws when the event class does not allow synchronous handling
   */
  async invoke(): Promise<string | void | null> {
    const constructor = this.constructor as typeof Event

    if (!constructor.allowSync) {
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
    }

    await constructor.handle(this.buildSyncJob())
  }

  /**
   * Delivers the event.
   *
   * - Returns without doing anything when `shouldSend()` rejects the payload.
   * - With the queue disabled: runs the handler in-process when `allowSync`
   *   is true, otherwise logs a warning and skips the message.
   * - With the queue enabled: publishes the job; when publishing fails the
   *   handler is run in-process if `allowSync` is true, otherwise the error
   *   is rethrown.
   *
   * @param customSendOptions options merged over the class-level send options;
   *        when `tnx` is set the job is queued through that transaction
   * @returns the value returned by the queue, or the handler result when the
   *          handler was run in-process
   */
  async send(customSendOptions?: SendOptions & { tnx?: Knex }): Promise<string | void | null> {
    const constructor = this.constructor as typeof Event

    const shouldSend = await constructor.shouldSend(this.payload)

    if (!shouldSend) {
      return
    }

    if (!pgQueueEnable) {
      if (constructor.allowSync) {
        return constructor.handle(this.buildSyncJob())
      }

      logger.warn(
        {
          type: 'queue',
          eventType: constructor.eventName(),
        },
        '[Queue] skipped sending message'
      )
      return
    }

    const startTime = process.hrtime.bigint()

    // Copy before adding fields: getSendOptions() may hand back an object
    // that is shared between calls, and it must not be modified here.
    const sendOptions: SendOptions = { ...constructor.getSendOptions(this.payload) }

    if (this.payload.scheduleAt) {
      sendOptions.startAfter = new Date(this.payload.scheduleAt)
    }

    sendOptions.deadLetter = constructor.deadLetterQueueName()

    try {
      const queue = customSendOptions?.tnx
        ? Queue.createPgBoss({
            enableWorkers: false,
            db: new KnexQueueDB(customSendOptions.tnx),
          })
        : Queue.getInstance()

      const res = await queue.send({
        name: constructor.getQueueName(),
        data: this.buildJobData(),
        options: {
          ...sendOptions,
          ...customSendOptions,
        },
      })

      queueJobScheduled.add(1, {
        name: constructor.getQueueName(),
      })

      return res
    } catch (e) {
      // If we can't queue the message for some reason,
      // we run its handler right away.
      // This might create some latency with the benefit of being more fault-tolerant
      logSchema.warning(
        logger,
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
        {
          type: 'queue',
          error: e,
          metadata: JSON.stringify(this.payload),
        }
      )

      if (!constructor.allowSync) {
        throw e
      }

      return constructor.handle(this.buildSyncJob())
    } finally {
      // Recorded on every path that reached the queue, success or failure.
      const duration = Number(process.hrtime.bigint() - startTime) / 1e9
      queueJobSchedulingTime.record(duration, {
        name: constructor.getQueueName(),
      })
    }
  }

  /**
   * Builds the job data: the payload with the configured region as a default
   * and `$version` forced to the class version.
   */
  private buildJobData() {
    const constructor = this.constructor as typeof Event

    return {
      region,
      ...this.payload,
      $version: constructor.version,
    }
  }

  /**
   * Builds the synthetic job handed to `handle` when the event is processed
   * in-process instead of being picked up from the queue.
   */
  private buildSyncJob() {
    const constructor = this.constructor as typeof Event

    return {
      id: '__sync',
      expireInSeconds: 0,
      name: constructor.getQueueName(),
      data: this.buildJobData(),
    }
  }
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc