• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 24802340028

22 Apr 2026 08:58PM UTC coverage: 71.253% (+1.3%) from 69.944%
24802340028

Pull #1041

github

web-flow
Merge 7fee90809 into 8e937109d
Pull Request #1041: feat: add sb-request-id logging

3472 of 5431 branches covered (63.93%)

Branch coverage included in aggregate %.

80 of 88 new or added lines in 25 files covered. (90.91%)

6 existing lines in 4 files now uncovered.

7330 of 9729 relevant lines covered (75.34%)

377.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.1
/src/internal/queue/event.ts
1
import { getTenantConfig } from '@internal/database'
2
import { ERRORS } from '@internal/errors'
3
import { logger, logSchema } from '@internal/monitoring'
4
import { queueJobScheduled, queueJobSchedulingTime } from '@internal/monitoring/metrics'
5
import { KnexQueueDB } from '@internal/queue/database'
6
import { Knex } from 'knex'
7
import PgBoss, { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss'
8
import { getConfig } from '../../config'
9
import { PG_BOSS_SCHEMA, Queue } from './queue'
10

11
export interface BasePayload {
12
  $version?: string
13
  singletonKey?: string
14
  scheduleAt?: Date
15
  reqId?: string
16
  sbReqId?: string
17
  tenant: {
18
    ref: string
19
    host: string
20
  }
21
}
22

23
const { pgQueueEnable, region, isMultitenant } = getConfig()
43✔
24

25
function withPayloadVersion<TPayload extends BasePayload>(
26
  payload: TPayload,
27
  version: string
28
): TPayload {
29
  return {
10,760✔
30
    ...payload,
31
    $version: payload.$version ?? version,
21,520✔
32
  }
33
}
34

35
export type EventInputPayload = Omit<BasePayload, '$version'>
36
export type QueueEvent<T extends EventInputPayload = EventInputPayload> = Event<T>
37
export type StaticThis<TPayload extends BasePayload> = BaseEventConstructor<TPayload>
38

39
interface BaseEventConstructor<TPayload extends BasePayload> {
40
  version: string
41

42
  new (payload: TPayload): QueueEvent<Omit<TPayload, '$version'>>
43
}
44

45
/**
46
 * Base class for all events that are sent to the queue
47
 */
48
export class Event<T extends Omit<BasePayload, '$version'>> {
49
  public static readonly version: string = 'v1'
43✔
50
  protected static queueName = ''
43✔
51
  protected static allowSync = true
43✔
52

53
  constructor(public readonly payload: T & BasePayload) {}
10,770✔
54

55
  static eventName() {
56
    return this.name
3✔
57
  }
58

59
  static deadLetterQueueName() {
60
    return this.queueName + '-dead-letter'
10,066✔
61
  }
62

63
  static getQueueName() {
64
    if (!this.queueName) {
30,957!
65
      throw new Error(`Queue name not set on ${this.constructor.name}`)
×
66
    }
67

68
    return this.queueName
30,957✔
69
  }
70

71
  static getQueueOptions(): PgBossQueue | undefined {
72
    return undefined
×
73
  }
74

75
  static getSendOptions(payload: BasePayload): SendOptions | undefined {
76
    return undefined
1✔
77
  }
78

79
  static getWorkerOptions(): WorkOptions & { concurrentTaskCount?: number } {
80
    return {}
×
81
  }
82

83
  static onClose() {
84
    // no-op
85
  }
86

87
  static onStart() {
88
    // no-op
89
  }
90

91
  static batchSend<TPayload extends BasePayload>(
92
    this: StaticThis<TPayload>,
93
    messages: Array<{ payload: TPayload; send(): Promise<string | void | null> }>
94
  ) {
95
    const eventClass = this as typeof Event
3✔
96

97
    if (!pgQueueEnable) {
3✔
98
      if (eventClass.allowSync) {
1!
99
        return Promise.all(messages.map((message) => message.send()))
10✔
100
      } else {
101
        logger.warn(
×
102
          {
103
            type: 'queue',
104
            eventType: eventClass.eventName(),
105
          },
106
          '[Queue] skipped sending batch messages'
107
        )
108
        return
×
109
      }
110
    }
111

112
    return Queue.getInstance().insert(
2✔
113
      messages.map((message) => {
114
        const payloadWithVersion = withPayloadVersion(message.payload, eventClass.version)
2✔
115
        const sendOptions =
116
          (eventClass.getSendOptions(payloadWithVersion) as PgBoss.JobInsert) || {}
2✔
117

118
        if (payloadWithVersion.scheduleAt) {
2✔
119
          sendOptions.startAfter = new Date(payloadWithVersion.scheduleAt)
1✔
120
        }
121

122
        return {
2✔
123
          ...sendOptions,
124
          name: eventClass.getQueueName(),
125
          data: payloadWithVersion,
126
          deadLetter: eventClass.deadLetterQueueName(),
127
        }
128
      })
129
    )
130
  }
131

132
  static send<TPayload extends BasePayload>(
133
    this: StaticThis<TPayload>,
134
    payload: Omit<TPayload, '$version'>,
135
    opts?: SendOptions & { tnx?: Knex }
136
  ) {
137
    const that = new this(withPayloadVersion(payload as TPayload, this.version))
10,733✔
138
    return that.send(opts)
10,733✔
139
  }
140

141
  static invoke<TPayload extends BasePayload>(
142
    this: StaticThis<TPayload>,
143
    payload: Omit<TPayload, '$version'>
144
  ) {
145
    const that = new this(withPayloadVersion(payload as TPayload, this.version))
4✔
146
    return that.invoke()
4✔
147
  }
148

149
  static invokeOrSend<TPayload extends BasePayload>(
150
    this: StaticThis<TPayload>,
151
    payload: Omit<TPayload, '$version'>,
152
    options?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
153
  ) {
154
    const that = new this(withPayloadVersion(payload as TPayload, this.version))
21✔
155
    return that.invokeOrSend(options)
21✔
156
  }
157

158
  static handle(job: Job<BasePayload> | Job<BasePayload>[], opts?: { signal?: AbortSignal }) {
UNCOV
159
    throw new Error('not implemented')
×
160
  }
161

162
  static async shouldSend(payload: BasePayload) {
163
    if (isMultitenant && payload?.tenant?.ref) {
50!
164
      // Do not send an event if disabled for this specific tenant
165
      const tenant = await getTenantConfig(payload.tenant.ref)
×
166
      const disabledEvents = tenant.disableEvents || []
×
167
      if (disabledEvents.includes(this.eventName())) {
×
168
        return false
×
169
      }
170
    }
171
    return true
50✔
172
  }
173

174
  /**
175
   * See issue https://github.com/timgit/pg-boss/issues/535
176
   * @param queueName
177
   * @param singletonKey
178
   * @param jobId
179
   */
180
  static async deleteIfActiveExists(queueName: string, singletonKey: string, jobId: string) {
181
    if (!pgQueueEnable) {
×
182
      return Promise.resolve()
×
183
    }
184

185
    await Queue.getDb().executeSql(
×
186
      `DELETE FROM ${PG_BOSS_SCHEMA}.job
187
       WHERE id = $1
188
       AND EXISTS(
189
          SELECT 1 FROM ${PG_BOSS_SCHEMA}.job
190
             WHERE id != $2
191
             AND state < 'active'
192
             AND name = $3
193
             AND singleton_key = $4
194
       )
195
      `,
196
      [jobId, jobId, queueName, singletonKey]
197
    )
198
  }
199

200
  async invokeOrSend(
201
    sendOptions?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
202
  ): Promise<string | void | null> {
203
    const eventClass = this.constructor as typeof Event
20✔
204

205
    if (!eventClass.allowSync) {
20!
206
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
×
207
    }
208

209
    try {
20✔
210
      await this.invoke()
20✔
211
    } catch (e) {
212
      if (sendOptions?.sendWhenError && !sendOptions.sendWhenError(e)) {
×
213
        throw e
×
214
      }
215

UNCOV
216
      logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', {
×
217
        type: 'queue',
218
        project: this.payload.tenant?.ref,
219
        error: e,
220
        metadata: JSON.stringify(this.payload),
221
        sbReqId: this.payload.sbReqId,
222
      })
223

224
      return this.send(sendOptions)
×
225
    }
226
  }
227

228
  async invoke(): Promise<string | void | null> {
229
    const eventClass = this.constructor as typeof Event
23✔
230

231
    if (!eventClass.allowSync) {
23!
232
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
×
233
    }
234

235
    await eventClass.handle({
23✔
236
      id: '__sync',
237
      expireInSeconds: 0,
238
      name: eventClass.getQueueName(),
239
      data: {
240
        region,
241
        ...this.payload,
242
        $version: eventClass.version,
243
      },
244
    })
245
  }
246

247
  async send(customSendOptions?: SendOptions & { tnx?: Knex }): Promise<string | void | null> {
248
    const eventClass = this.constructor as typeof Event
10,742✔
249

250
    const shouldSend = await eventClass.shouldSend(this.payload)
10,742✔
251

252
    if (!shouldSend) {
10,742!
253
      return
×
254
    }
255

256
    if (!pgQueueEnable) {
10,742✔
257
      if (eventClass.allowSync) {
678✔
258
        return eventClass.handle({
675✔
259
          id: '__sync',
260
          expireInSeconds: 0,
261
          name: eventClass.getQueueName(),
262
          data: {
263
            region,
264
            ...this.payload,
265
            $version: eventClass.version,
266
          },
267
        })
268
      } else {
269
        logger.warn(
3✔
270
          {
271
            type: 'queue',
272
            eventType: eventClass.eventName(),
273
          },
274
          '[Queue] skipped sending message'
275
        )
276
        return
3✔
277
      }
278
    }
279

280
    const startTime = process.hrtime.bigint()
10,064✔
281
    const sendOptions = eventClass.getSendOptions(this.payload) || {}
10,064!
282

283
    if (this.payload.scheduleAt) {
10,742!
284
      sendOptions.startAfter = new Date(this.payload.scheduleAt)
×
285
    }
286

287
    sendOptions!.deadLetter = eventClass.deadLetterQueueName()
10,064✔
288

289
    try {
10,064✔
290
      const queue = customSendOptions?.tnx
10,064!
291
        ? Queue.createPgBoss({
292
            enableWorkers: false,
293
            db: new KnexQueueDB(customSendOptions.tnx),
294
          })
295
        : Queue.getInstance()
296

297
      const res = await queue.send({
10,742✔
298
        name: eventClass.getQueueName(),
299
        data: {
300
          region,
301
          ...this.payload,
302
          $version: eventClass.version,
303
        },
304
        options: {
305
          ...sendOptions,
306
          ...customSendOptions,
307
        },
308
      })
309

310
      queueJobScheduled.add(1, {
10,064✔
311
        name: eventClass.getQueueName(),
312
      })
313

314
      return res
10,064✔
315
    } catch (e) {
316
      // If we can't queue the message for some reason,
317
      // we run its handler right away.
318
      // This might create some latency with the benefit of being more fault-tolerant
319
      logSchema.warning(
×
320
        logger,
321
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
322
        {
323
          type: 'queue',
324
          error: e,
325
          metadata: JSON.stringify(this.payload),
326
          sbReqId: this.payload.sbReqId,
327
        }
328
      )
329

330
      if (!eventClass.allowSync) {
×
331
        throw e
×
332
      }
333

334
      return eventClass.handle({
×
335
        id: '__sync',
336
        expireInSeconds: 0,
337
        name: eventClass.getQueueName(),
338
        data: {
339
          region,
340
          ...this.payload,
341
          $version: eventClass.version,
342
        },
343
      })
344
    } finally {
345
      const duration = Number(process.hrtime.bigint() - startTime) / 1e9
10,064✔
346
      queueJobSchedulingTime.record(duration, {
10,064✔
347
        name: eventClass.getQueueName(),
348
      })
349
    }
350
  }
351
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc