• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 24798577000

22 Apr 2026 07:34PM UTC coverage: 71.209% (+1.3%) from 69.944%
24798577000

Pull #1041

github

web-flow
Merge 5def5ea43 into 8e937109d
Pull Request #1041: feat: add sb-request-id logging

3465 of 5424 branches covered (63.88%)

Branch coverage included in aggregate %.

51 of 58 new or added lines in 23 files covered. (87.93%)

5 existing lines in 4 files now uncovered.

7321 of 9723 relevant lines covered (75.3%)

376.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.79
/src/internal/queue/event.ts
1
import { getTenantConfig } from '@internal/database'
2
import { ERRORS } from '@internal/errors'
3
import { logger, logSchema } from '@internal/monitoring'
4
import { queueJobScheduled, queueJobSchedulingTime } from '@internal/monitoring/metrics'
5
import { KnexQueueDB } from '@internal/queue/database'
6
import { Knex } from 'knex'
7
import PgBoss, { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss'
8
import { getConfig } from '../../config'
9
import { PG_BOSS_SCHEMA, Queue } from './queue'
10

11
/**
 * Fields shared by every event payload that travels through the queue.
 */
export interface BasePayload {
  /** Payload schema version; filled in from the event class when omitted. */
  $version?: string
  /** Optional singleton key carried with the payload. */
  singletonKey?: string
  /** When set, the job is scheduled to start no earlier than this date. */
  scheduleAt?: Date
  /** Request identifier carried along with the payload. */
  reqId?: string
  /** `sb-request-id` value carried along with the payload; included in queue log entries. */
  sbReqId?: string
  /** Tenant the event belongs to. */
  tenant: { ref: string; host: string }
}
22

23
// Settings read once when this module is loaded:
// - `pgQueueEnable`: when false, events are handled in-process (or skipped) instead of queued.
// - `region`: attached to the data of every job built by `send` / `invoke`.
// - `isMultitenant`: enables the per-tenant "disabled events" lookup in `shouldSend`.
const { pgQueueEnable, region, isMultitenant } = getConfig()
43✔
24

25
/**
 * Returns a shallow copy of `payload` whose `$version` is guaranteed to be set.
 *
 * @param payload - Payload to copy; it is not mutated.
 * @param version - Fallback used only when `payload.$version` is `null` or `undefined`.
 * @returns A new object with every field of `payload` plus the resolved `$version`.
 */
function withPayloadVersion<TPayload extends BasePayload>(
  payload: TPayload,
  version: string
): TPayload {
  const $version = payload.$version ?? version
  return { ...payload, $version }
}
34

35
/**
 * Static-side (constructor) type of an event class that produces instances of `T`.
 * Used as the `this` type of the static `send`, `invoke` and `invokeOrSend` helpers so
 * that they can instantiate the concrete subclass they are called on.
 */
export type StaticThis<T extends Event<any>> = BaseEventConstructor<T>
36

37
/**
 * Shape of an event class itself (its static side), as opposed to its instances.
 */
interface BaseEventConstructor<Base extends Event<any>> {
  /** Version stamped onto payloads as `$version` by the static send/invoke helpers. */
  version: string

  /** Creates an instance of the concrete event. */
  new (...args: any): Base

  /** Name identifying the event type. */
  eventName(): string

  /** Worker options for this event's queue. */
  getWorkerOptions(): WorkOptions

  /** Builds an event from `payload` (without `$version`) and sends it. */
  send(
    this: StaticThis<Base>,
    payload: Omit<Base['payload'], '$version'>
  ): Promise<string | void | null>
}
50

51
/**
 * Base class for all events that are sent to the queue.
 *
 * Subclasses set `queueName`, implement the static `handle`, and may override the
 * static option hooks (`getQueueOptions`, `getSendOptions`, `getWorkerOptions`).
 * An event is either queued through pg-boss or, when the queue is disabled or
 * unavailable and `allowSync` is true, handled in-process with a `__sync` pseudo-job.
 */
export class Event<T extends Omit<BasePayload, '$version'>> {
  /** Version stamped onto payloads as `$version`. */
  public static readonly version: string = 'v1'
  /** Queue name; must be overridden by subclasses (see `getQueueName`). */
  protected static queueName = ''
  /** Whether the event may be handled in-process instead of going through the queue. */
  protected static allowSync = true

  constructor(public readonly payload: T & BasePayload) {}

  /** Name identifying the event type (the class name). */
  static eventName() {
    return this.name
  }

  /** Name of the dead-letter queue paired with this event's queue. */
  static deadLetterQueueName() {
    return this.queueName + '-dead-letter'
  }

  /**
   * Returns the queue name configured on the event class.
   *
   * @throws Error when the subclass did not set `queueName`.
   */
  static getQueueName() {
    if (!this.queueName) {
      // In a static method `this` is the class itself, so `this.name` is the class name
      // (`this.constructor.name` would always be "Function").
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName
  }

  /** Queue options hook; the base implementation provides none. */
  static getQueueOptions(): PgBossQueue | undefined {
    return undefined
  }

  /** Per-payload send options hook; the base implementation provides none. */
  static getSendOptions<T extends Event<any>>(payload: T['payload']): SendOptions | undefined {
    return undefined
  }

  /** Worker options hook; the base implementation returns an empty object. */
  static getWorkerOptions(): WorkOptions & { concurrentTaskCount?: number } {
    return {}
  }

  static onClose() {
    // no-op
  }

  static onStart() {
    // no-op
  }

  /**
   * Sends several events at once.
   *
   * With the queue disabled, each message is sent individually via `message.send()`
   * when `allowSync` is true; otherwise a warning is logged and nothing is sent.
   * With the queue enabled, all messages are inserted in a single `insert` call.
   *
   * NOTE(review): unlike `send()`, the queued path neither consults `shouldSend` nor
   * attaches `region` to the job data — confirm this is intended.
   *
   * @param messages - Event instances to send.
   */
  static batchSend<T extends Event<any>[]>(messages: T) {
    if (!pgQueueEnable) {
      if (this.allowSync) {
        return Promise.all(messages.map((message) => message.send()))
      }

      logger.warn(
        {
          type: 'queue',
          eventType: this.eventName(),
        },
        '[Queue] skipped sending batch messages'
      )
      return
    }

    return Queue.getInstance().insert(
      messages.map((message) => {
        const payloadWithVersion = withPayloadVersion(
          message.payload as (typeof message)['payload'],
          this.version
        )
        // Copy so that setting `startAfter` never mutates an options object a subclass
        // may be sharing between calls.
        const sendOptions = { ...this.getSendOptions(payloadWithVersion) } as PgBoss.JobInsert

        if (payloadWithVersion.scheduleAt) {
          sendOptions.startAfter = new Date(payloadWithVersion.scheduleAt)
        }

        return {
          ...sendOptions,
          name: this.getQueueName(),
          data: payloadWithVersion,
          deadLetter: this.deadLetterQueueName(),
        }
      })
    )
  }

  /**
   * Builds an event from `payload` (stamping `$version`) and sends it.
   *
   * @param payload - Event payload without `$version`.
   * @param opts - Send options; `tnx` makes the job get written through that Knex handle.
   */
  static send<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>,
    opts?: SendOptions & { tnx?: Knex }
  ) {
    const that = new this(withPayloadVersion(payload as T['payload'], this.version))
    return that.send(opts)
  }

  /**
   * Builds an event from `payload` (stamping `$version`) and handles it in-process.
   *
   * @param payload - Event payload without `$version`.
   */
  static invoke<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>
  ) {
    const that = new this(withPayloadVersion(payload as T['payload'], this.version))
    return that.invoke()
  }

  /**
   * Builds an event from `payload` (stamping `$version`), handles it in-process and
   * falls back to sending it when handling fails.
   *
   * @param payload - Event payload without `$version`.
   * @param options - Send options used by the fallback; see the instance `invokeOrSend`.
   */
  static invokeOrSend<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>,
    options?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
  ) {
    const that = new this(withPayloadVersion(payload as T['payload'], this.version))
    return that.invokeOrSend(options)
  }

  /**
   * Processes one job or a batch of jobs. Must be implemented by subclasses.
   *
   * @throws Error always, in this base implementation.
   */
  static handle(
    job: Job<Event<any>['payload']> | Job<Event<any>['payload']>[],
    opts?: { signal?: AbortSignal }
  ) {
    throw new Error('not implemented')
  }

  /**
   * Decides whether an event with this payload should be sent at all.
   *
   * @param payload - Event payload; only `tenant.ref` is inspected.
   * @returns `false` when running multi-tenant and the tenant configuration lists this
   * event in `disableEvents`; `true` otherwise.
   */
  static async shouldSend(payload: any) {
    if (isMultitenant && payload?.tenant?.ref) {
      // Do not send an event if disabled for this specific tenant
      const tenant = await getTenantConfig(payload.tenant.ref)
      const disabledEvents = tenant.disableEvents ?? []
      if (disabledEvents.includes(this.eventName())) {
        return false
      }
    }
    return true
  }

  /**
   * Deletes job `jobId` when another job with the same queue name and singleton key
   * exists in a state that sorts before `'active'`. Does nothing when the queue is
   * disabled.
   *
   * See issue https://github.com/timgit/pg-boss/issues/535
   * @param queueName
   * @param singletonKey
   * @param jobId
   */
  static async deleteIfActiveExists(queueName: string, singletonKey: string, jobId: string) {
    if (!pgQueueEnable) {
      return
    }

    await Queue.getDb().executeSql(
      `DELETE FROM ${PG_BOSS_SCHEMA}.job
       WHERE id = $1
       AND EXISTS(
          SELECT 1 FROM ${PG_BOSS_SCHEMA}.job
             WHERE id != $2
             AND state < 'active'
             AND name = $3
             AND singleton_key = $4
       )
      `,
      [jobId, jobId, queueName, singletonKey]
    )
  }

  /**
   * Handles the event in-process and, if that fails, sends it to the queue instead.
   *
   * @param sendOptions - Options for the fallback send. `sendWhenError`, when given,
   * decides whether a particular error should trigger the fallback; returning `false`
   * rethrows the error.
   * @returns The result of the fallback `send`, or nothing when the in-process run succeeds.
   * @throws When the event does not allow synchronous handling, or when `sendWhenError`
   * rejects the error.
   */
  async invokeOrSend(
    sendOptions?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
  ): Promise<string | void | null> {
    const eventClass = this.constructor as typeof Event

    if (!eventClass.allowSync) {
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
    }

    // `sendWhenError` is a local policy hook, not a queue option: keep it out of what is
    // forwarded to `send`.
    const {
      sendWhenError,
      ...queueOptions
    }: SendOptions & { sendWhenError?: (error: unknown) => boolean } = sendOptions ?? {}

    try {
      await this.invoke()
    } catch (e) {
      if (sendWhenError && !sendWhenError(e)) {
        throw e
      }

      logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', {
        type: 'queue',
        project: this.payload.tenant?.ref,
        error: e,
        metadata: JSON.stringify(this.payload),
        sbReqId: this.payload.sbReqId,
      })

      return this.send(queueOptions)
    }
  }

  /**
   * Handles the event in-process by passing a `__sync` pseudo-job to the class handler.
   *
   * @throws When the event does not allow synchronous handling, or when the handler fails.
   */
  async invoke(): Promise<string | void | null> {
    const eventClass = this.constructor as typeof Event

    if (!eventClass.allowSync) {
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
    }

    await eventClass.handle(buildSyncJob(eventClass, this.payload))
  }

  /**
   * Sends the event to the queue.
   *
   * - Returns without doing anything when `shouldSend` rejects the payload.
   * - With the queue disabled, handles the event in-process if `allowSync` is true,
   *   otherwise logs a warning and skips it.
   * - If queuing fails, falls back to in-process handling when `allowSync` is true and
   *   rethrows otherwise.
   *
   * @param customSendOptions - Options overriding those from `getSendOptions`; `tnx`
   * makes the job get written through that Knex handle instead of the shared queue.
   * @returns The value returned by the queue's `send`, or the handler result when the
   * event was handled in-process.
   */
  async send(customSendOptions?: SendOptions & { tnx?: Knex }): Promise<string | void | null> {
    const eventClass = this.constructor as typeof Event

    const shouldSend = await eventClass.shouldSend(this.payload)

    if (!shouldSend) {
      return
    }

    if (!pgQueueEnable) {
      if (eventClass.allowSync) {
        return eventClass.handle(buildSyncJob(eventClass, this.payload))
      }

      logger.warn(
        {
          type: 'queue',
          eventType: eventClass.eventName(),
        },
        '[Queue] skipped sending message'
      )
      return
    }

    const startTime = process.hrtime.bigint()

    // `tnx` selects the connection; it is not a pg-boss option and must not be forwarded.
    const { tnx, ...sendOverrides }: SendOptions & { tnx?: Knex } = customSendOptions ?? {}

    // Copy so that the assignments below never mutate an options object a subclass may
    // be sharing between calls.
    const sendOptions: SendOptions = { ...eventClass.getSendOptions(this.payload) }

    if (this.payload.scheduleAt) {
      sendOptions.startAfter = new Date(this.payload.scheduleAt)
    }

    sendOptions.deadLetter = eventClass.deadLetterQueueName()

    try {
      const queue = tnx
        ? Queue.createPgBoss({
            enableWorkers: false,
            db: new KnexQueueDB(tnx),
          })
        : Queue.getInstance()

      const res = await queue.send({
        name: eventClass.getQueueName(),
        data: {
          region,
          ...this.payload,
          $version: eventClass.version,
        },
        options: {
          ...sendOptions,
          ...sendOverrides,
        },
      })

      queueJobScheduled.add(1, {
        name: eventClass.getQueueName(),
      })

      return res
    } catch (e) {
      // If we can't queue the message for some reason,
      // we run its handler right away.
      // This might create some latency with the benefit of being more fault-tolerant
      logSchema.warning(
        logger,
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
        {
          type: 'queue',
          error: e,
          metadata: JSON.stringify(this.payload),
          sbReqId: this.payload.sbReqId,
        }
      )

      if (!eventClass.allowSync) {
        throw e
      }

      return eventClass.handle(buildSyncJob(eventClass, this.payload))
    } finally {
      // Recorded on success and failure alike.
      const duration = Number(process.hrtime.bigint() - startTime) / 1e9
      queueJobSchedulingTime.record(duration, {
        name: eventClass.getQueueName(),
      })
    }
  }
}

/**
 * Builds the `__sync` pseudo-job handed to an event's handler when it runs in-process
 * rather than through the queue.
 */
function buildSyncJob<TPayload extends BasePayload>(eventClass: typeof Event, payload: TPayload) {
  return {
    id: '__sync',
    expireInSeconds: 0,
    name: eventClass.getQueueName(),
    data: {
      region,
      ...payload,
      $version: eventClass.version,
    },
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc