• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 24848158243

23 Apr 2026 05:03PM UTC coverage: 71.255% (+1.3%) from 69.952%
24848158243

push

github

web-flow
feat: add sb-request-id logging (#1041)

* feat: add sb-request-id logging

Also propagate the existing `reqId` where missed.

Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>

* chore: system tenant

Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>

---------

Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>

3480 of 5443 branches covered (63.94%)

Branch coverage included in aggregate %.

81 of 89 new or added lines in 25 files covered. (91.01%)

6 existing lines in 4 files now uncovered.

7333 of 9732 relevant lines covered (75.35%)

377.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.16
/src/internal/queue/event.ts
1
import { getTenantConfig } from '@internal/database'
2
import { ERRORS } from '@internal/errors'
3
import { logger, logSchema } from '@internal/monitoring'
4
import { queueJobScheduled, queueJobSchedulingTime } from '@internal/monitoring/metrics'
5
import { KnexQueueDB } from '@internal/queue/database'
6
import { Knex } from 'knex'
7
import PgBoss, { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss'
8
import { getConfig } from '../../config'
9
import { SYSTEM_TENANT_REF } from './constants'
10
import { PG_BOSS_SCHEMA, Queue } from './queue'
11

12
export interface BasePayload {
13
  $version?: string
14
  singletonKey?: string
15
  scheduleAt?: Date
16
  reqId?: string
17
  sbReqId?: string
18
  tenant: {
19
    ref: string
20
    host: string
21
  }
22
}
23

24
const { pgQueueEnable, region, isMultitenant } = getConfig()
43✔
25

26
function withPayloadVersion<TPayload extends BasePayload>(
27
  payload: TPayload,
28
  version: string
29
): TPayload {
30
  return {
10,760✔
31
    ...payload,
32
    $version: payload.$version ?? version,
21,520✔
33
  }
34
}
35

36
export type EventInputPayload = Omit<BasePayload, '$version'>
37
export type QueueEvent<T extends EventInputPayload = EventInputPayload> = Event<T>
38
export type StaticThis<TPayload extends BasePayload> = BaseEventConstructor<TPayload>
39

40
interface BaseEventConstructor<TPayload extends BasePayload> {
41
  version: string
42

43
  new (payload: TPayload): QueueEvent<Omit<TPayload, '$version'>>
44
}
45

46
/**
47
 * Base class for all events that are sent to the queue
48
 */
49
export class Event<T extends Omit<BasePayload, '$version'>> {
50
  public static readonly version: string = 'v1'
43✔
51
  protected static queueName = ''
43✔
52
  protected static allowSync = true
43✔
53

54
  constructor(public readonly payload: T & BasePayload) {}
10,770✔
55

56
  static eventName() {
57
    return this.name
3✔
58
  }
59

60
  static deadLetterQueueName() {
61
    return this.queueName + '-dead-letter'
10,066✔
62
  }
63

64
  static getQueueName() {
65
    if (!this.queueName) {
30,957!
66
      throw new Error(`Queue name not set on ${this.constructor.name}`)
×
67
    }
68

69
    return this.queueName
30,957✔
70
  }
71

72
  static getQueueOptions(): PgBossQueue | undefined {
73
    return undefined
×
74
  }
75

76
  static getSendOptions(payload: BasePayload): SendOptions | undefined {
77
    return undefined
1✔
78
  }
79

80
  static getWorkerOptions(): WorkOptions & { concurrentTaskCount?: number } {
81
    return {}
×
82
  }
83

84
  static onClose() {
85
    // no-op
86
  }
87

88
  static onStart() {
89
    // no-op
90
  }
91

92
  static batchSend<TPayload extends BasePayload>(
93
    this: StaticThis<TPayload>,
94
    messages: Array<{ payload: TPayload; send(): Promise<string | void | null> }>
95
  ) {
96
    const eventClass = this as typeof Event
3✔
97

98
    if (!pgQueueEnable) {
3✔
99
      if (eventClass.allowSync) {
1!
100
        return Promise.all(messages.map((message) => message.send()))
10✔
101
      } else {
102
        logger.warn(
×
103
          {
104
            type: 'queue',
105
            eventType: eventClass.eventName(),
106
          },
107
          '[Queue] skipped sending batch messages'
108
        )
109
        return
×
110
      }
111
    }
112

113
    return Queue.getInstance().insert(
2✔
114
      messages.map((message) => {
115
        const payloadWithVersion = withPayloadVersion(message.payload, eventClass.version)
2✔
116
        const sendOptions =
117
          (eventClass.getSendOptions(payloadWithVersion) as PgBoss.JobInsert) || {}
2✔
118

119
        if (payloadWithVersion.scheduleAt) {
2✔
120
          sendOptions.startAfter = new Date(payloadWithVersion.scheduleAt)
1✔
121
        }
122

123
        return {
2✔
124
          ...sendOptions,
125
          name: eventClass.getQueueName(),
126
          data: payloadWithVersion,
127
          deadLetter: eventClass.deadLetterQueueName(),
128
        }
129
      })
130
    )
131
  }
132

133
  static send<TPayload extends BasePayload>(
134
    this: StaticThis<TPayload>,
135
    payload: Omit<TPayload, '$version'>,
136
    opts?: SendOptions & { tnx?: Knex }
137
  ) {
138
    const that = new this(withPayloadVersion(payload as TPayload, this.version))
10,733✔
139
    return that.send(opts)
10,733✔
140
  }
141

142
  static invoke<TPayload extends BasePayload>(
143
    this: StaticThis<TPayload>,
144
    payload: Omit<TPayload, '$version'>
145
  ) {
146
    const that = new this(withPayloadVersion(payload as TPayload, this.version))
4✔
147
    return that.invoke()
4✔
148
  }
149

150
  static invokeOrSend<TPayload extends BasePayload>(
151
    this: StaticThis<TPayload>,
152
    payload: Omit<TPayload, '$version'>,
153
    options?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
154
  ) {
155
    const that = new this(withPayloadVersion(payload as TPayload, this.version))
21✔
156
    return that.invokeOrSend(options)
21✔
157
  }
158

159
  static handle(job: Job<BasePayload> | Job<BasePayload>[], opts?: { signal?: AbortSignal }) {
UNCOV
160
    throw new Error('not implemented')
×
161
  }
162

163
  static async shouldSend(payload: BasePayload) {
164
    if (isMultitenant && payload?.tenant?.ref) {
50!
165
      // Do not send an event if disabled for this specific tenant
166
      const tenant = await getTenantConfig(payload.tenant.ref)
×
167
      const disabledEvents = tenant.disableEvents || []
×
168
      if (disabledEvents.includes(this.eventName())) {
×
169
        return false
×
170
      }
171
    }
172
    return true
50✔
173
  }
174

175
  /**
176
   * See issue https://github.com/timgit/pg-boss/issues/535
177
   * @param queueName
178
   * @param singletonKey
179
   * @param jobId
180
   */
181
  static async deleteIfActiveExists(queueName: string, singletonKey: string, jobId: string) {
182
    if (!pgQueueEnable) {
×
183
      return Promise.resolve()
×
184
    }
185

186
    await Queue.getDb().executeSql(
×
187
      `DELETE FROM ${PG_BOSS_SCHEMA}.job
188
       WHERE id = $1
189
       AND EXISTS(
190
          SELECT 1 FROM ${PG_BOSS_SCHEMA}.job
191
             WHERE id != $2
192
             AND state < 'active'
193
             AND name = $3
194
             AND singleton_key = $4
195
       )
196
      `,
197
      [jobId, jobId, queueName, singletonKey]
198
    )
199
  }
200

201
  async invokeOrSend(
202
    sendOptions?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
203
  ): Promise<string | void | null> {
204
    const eventClass = this.constructor as typeof Event
20✔
205

206
    if (!eventClass.allowSync) {
20!
207
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
×
208
    }
209

210
    try {
20✔
211
      await this.invoke()
20✔
212
    } catch (e) {
213
      if (sendOptions?.sendWhenError && !sendOptions.sendWhenError(e)) {
×
214
        throw e
×
215
      }
216

UNCOV
217
      logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', {
×
218
        type: 'queue',
219
        project: this.payload.tenant?.ref || SYSTEM_TENANT_REF,
×
220
        error: e,
221
        metadata: JSON.stringify(this.payload),
222
        sbReqId: this.payload.sbReqId,
223
      })
224

225
      return this.send(sendOptions)
×
226
    }
227
  }
228

229
  async invoke(): Promise<string | void | null> {
230
    const eventClass = this.constructor as typeof Event
23✔
231

232
    if (!eventClass.allowSync) {
23!
233
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
×
234
    }
235

236
    await eventClass.handle({
23✔
237
      id: '__sync',
238
      expireInSeconds: 0,
239
      name: eventClass.getQueueName(),
240
      data: {
241
        region,
242
        ...this.payload,
243
        $version: eventClass.version,
244
      },
245
    })
246
  }
247

248
  async send(customSendOptions?: SendOptions & { tnx?: Knex }): Promise<string | void | null> {
249
    const eventClass = this.constructor as typeof Event
10,742✔
250

251
    const shouldSend = await eventClass.shouldSend(this.payload)
10,742✔
252

253
    if (!shouldSend) {
10,742!
254
      return
×
255
    }
256

257
    if (!pgQueueEnable) {
10,742✔
258
      if (eventClass.allowSync) {
678✔
259
        return eventClass.handle({
675✔
260
          id: '__sync',
261
          expireInSeconds: 0,
262
          name: eventClass.getQueueName(),
263
          data: {
264
            region,
265
            ...this.payload,
266
            $version: eventClass.version,
267
          },
268
        })
269
      } else {
270
        logger.warn(
3✔
271
          {
272
            type: 'queue',
273
            eventType: eventClass.eventName(),
274
          },
275
          '[Queue] skipped sending message'
276
        )
277
        return
3✔
278
      }
279
    }
280

281
    const startTime = process.hrtime.bigint()
10,064✔
282
    const sendOptions = eventClass.getSendOptions(this.payload) || {}
10,064!
283

284
    if (this.payload.scheduleAt) {
10,742!
285
      sendOptions.startAfter = new Date(this.payload.scheduleAt)
×
286
    }
287

288
    sendOptions!.deadLetter = eventClass.deadLetterQueueName()
10,064✔
289

290
    try {
10,064✔
291
      const queue = customSendOptions?.tnx
10,064!
292
        ? Queue.createPgBoss({
293
            enableWorkers: false,
294
            db: new KnexQueueDB(customSendOptions.tnx),
295
          })
296
        : Queue.getInstance()
297

298
      const res = await queue.send({
10,742✔
299
        name: eventClass.getQueueName(),
300
        data: {
301
          region,
302
          ...this.payload,
303
          $version: eventClass.version,
304
        },
305
        options: {
306
          ...sendOptions,
307
          ...customSendOptions,
308
        },
309
      })
310

311
      queueJobScheduled.add(1, {
10,064✔
312
        name: eventClass.getQueueName(),
313
      })
314

315
      return res
10,064✔
316
    } catch (e) {
317
      // If we can't queue the message for some reason,
318
      // we run its handler right away.
319
      // This might create some latency with the benefit of being more fault-tolerant
320
      logSchema.warning(
×
321
        logger,
322
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
323
        {
324
          type: 'queue',
325
          project: this.payload.tenant?.ref || SYSTEM_TENANT_REF,
×
326
          error: e,
327
          metadata: JSON.stringify(this.payload),
328
          sbReqId: this.payload.sbReqId,
329
        }
330
      )
331

332
      if (!eventClass.allowSync) {
×
333
        throw e
×
334
      }
335

336
      return eventClass.handle({
×
337
        id: '__sync',
338
        expireInSeconds: 0,
339
        name: eventClass.getQueueName(),
340
        data: {
341
          region,
342
          ...this.payload,
343
          $version: eventClass.version,
344
        },
345
      })
346
    } finally {
347
      const duration = Number(process.hrtime.bigint() - startTime) / 1e9
10,064✔
348
      queueJobSchedulingTime.record(duration, {
10,064✔
349
        name: eventClass.getQueueName(),
350
      })
351
    }
352
  }
353
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc