• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 24213058814

09 Apr 2026 08:59PM UTC coverage: 18.868% (-62.0%) from 80.843%
24213058814

Pull #1000

github

web-flow
Merge 2c815e22e into 41cfe7541
Pull Request #1000: feat: intro vitest & split units

524 of 3205 branches covered (16.35%)

Branch coverage included in aggregate %.

2 of 12 new or added lines in 1 file covered. (16.67%)

2701 existing lines in 93 files now uncovered.

1123 of 5524 relevant lines covered (20.33%)

3.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.25
/src/internal/queue/event.ts
1
import { getTenantConfig } from '@internal/database'
2
import { ERRORS } from '@internal/errors'
3
import { logger, logSchema } from '@internal/monitoring'
4
import { queueJobScheduled, queueJobSchedulingTime } from '@internal/monitoring/metrics'
5
import { KnexQueueDB } from '@internal/queue/database'
6
import { Knex } from 'knex'
7
import PgBoss, { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss'
8
import { getConfig } from '../../config'
9
import { PG_BOSS_SCHEMA, Queue } from './queue'
10

11
/**
 * Fields common to every event payload handled by the queue layer.
 */
export interface BasePayload {
  /** Tenant the event belongs to. */
  tenant: {
    /** Tenant reference; used in this file to look up the tenant config and to tag logs. */
    ref: string
    host: string
  }
  /** Payload version; filled in with the event class version when the caller omits it. */
  $version?: string
  // NOTE(review): presumably consumed by subclasses' send options — not read in this file.
  singletonKey?: string
  /** When set, the job's `startAfter` option is set to this date. */
  scheduleAt?: Date
  // NOTE(review): looks like a request correlation id — not read in this file; confirm.
  reqId?: string
}
21

22
// Configuration is read once, when this module is first loaded:
// - pgQueueEnable: when false, events are handled in-process (or skipped) instead of queued
// - region: stamped into the data of every job built in this file's send/invoke paths
// - isMultitenant: enables the per-tenant "disabled events" check in `shouldSend`
const { pgQueueEnable, region, isMultitenant } = getConfig()
1✔
23

24
/**
 * Produces a shallow copy of `payload` that is guaranteed to carry a `$version`.
 *
 * A `$version` already present on the payload is kept; `version` is only used
 * as the fallback when the payload's `$version` is `null` or `undefined`.
 *
 * @param payload - payload to copy; it is not mutated
 * @param version - fallback version
 * @returns the versioned copy
 */
function withPayloadVersion<TPayload extends BasePayload>(
  payload: TPayload,
  version: string
): TPayload {
  const resolvedVersion = payload.$version ?? version

  return { ...payload, $version: resolvedVersion }
}
33

34
/**
 * Constructor-side ("static") type of an event class producing instances of `T`.
 * Used as the `this` type of the static `send` / `invoke` / `invokeOrSend`
 * helpers so they can instantiate the concrete subclass they are called on.
 */
export type StaticThis<T extends Event<any>> = BaseEventConstructor<T>
35

36
/**
 * Shape of the static side of an event class: how to construct an instance
 * and which static members the queue helpers rely on.
 */
interface BaseEventConstructor<Base extends Event<any>> {
  /** Version stamped onto payloads that do not already carry a `$version`. */
  version: string

  /** Creates an event instance. */
  new (...args: any): Base

  eventName(): string
  getWorkerOptions(): WorkOptions

  /** Builds an instance from `payload` (version is added automatically) and sends it. */
  send(
    this: StaticThis<Base>,
    payload: Omit<Base['payload'], '$version'>
  ): Promise<string | void | null>
}
49

50
/**
 * Base class for all events that are sent to the queue.
 *
 * Subclasses are expected to set `queueName` and override `handle`; the other
 * static hooks (`getQueueOptions`, `getSendOptions`, `getWorkerOptions`,
 * `onStart`, `onClose`) have neutral defaults.
 */
export class Event<T extends Omit<BasePayload, '$version'>> {
  /** Version written to `$version` of outgoing job data. */
  public static readonly version: string = 'v1'
  /** Queue the event is published to; must be overridden by subclasses. */
  protected static queueName = ''
  /** Whether the event may be handled in-process instead of going through the queue. */
  protected static allowSync = true

  constructor(public readonly payload: T & BasePayload) {}

  /** Name of the event: the name of the class it is called on. */
  static eventName() {
    return this.name
  }

  /** Name of the dead-letter queue paired with this event's queue. */
  static deadLetterQueueName() {
    return this.queueName + '-dead-letter'
  }

  /**
   * Returns the queue name configured on the class.
   *
   * @throws Error when the subclass did not set `queueName`
   */
  static getQueueName() {
    if (!this.queueName) {
      // In a static method `this` is the class itself, so `this.name` is the
      // subclass name (`this.constructor.name` would always be "Function").
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName
  }

  /** Hook: queue creation options. The base implementation provides none. */
  static getQueueOptions(): PgBossQueue | undefined {
    return undefined
  }

  /** Hook: per-payload send options. The base implementation provides none. */
  static getSendOptions<T extends Event<any>>(payload: T['payload']): SendOptions | undefined {
    return undefined
  }

  /** Hook: worker options. The base implementation returns an empty object. */
  static getWorkerOptions(): WorkOptions & { concurrentTaskCount?: number } {
    return {}
  }

  static onClose() {
    // no-op
  }

  static onStart() {
    // no-op
  }

  /**
   * Sends several events in one call.
   *
   * With the queue disabled, each message is sent individually when sync
   * handling is allowed; otherwise the batch is skipped with a warning and
   * `undefined` is returned. With the queue enabled, all messages are inserted
   * through a single `Queue.getInstance().insert(...)` call.
   *
   * NOTE(review): unlike `send()`, the queued path neither consults
   * `shouldSend` nor adds `region` to the job data — confirm this is intended.
   *
   * @param messages - event instances to send
   */
  static batchSend<T extends Event<any>[]>(messages: T) {
    if (!pgQueueEnable) {
      if (this.allowSync) {
        return Promise.all(messages.map((message) => message.send()))
      }

      logger.warn(
        {
          type: 'queue',
          eventType: this.eventName(),
        },
        '[Queue] skipped sending batch messages'
      )
      return
    }

    return Queue.getInstance().insert(
      messages.map((message) => {
        const payloadWithVersion = withPayloadVersion(
          message.payload as (typeof message)['payload'],
          this.version
        )
        const baseOptions = (this.getSendOptions(payloadWithVersion) as PgBoss.JobInsert) || {}

        // Build a fresh object rather than writing `startAfter` onto whatever
        // `getSendOptions` returned: a subclass may hand back a shared object.
        return {
          ...baseOptions,
          ...(payloadWithVersion.scheduleAt
            ? { startAfter: new Date(payloadWithVersion.scheduleAt) }
            : {}),
          name: this.getQueueName(),
          data: payloadWithVersion,
          deadLetter: this.deadLetterQueueName(),
        }
      })
    )
  }

  /**
   * Creates an instance from `payload` (adding the class version) and sends it.
   *
   * @param payload - event payload without `$version`
   * @param opts - send options; `tnx` selects a Knex connection for the insert
   */
  static send<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>,
    opts?: SendOptions & { tnx?: Knex }
  ) {
    const that = new this(withPayloadVersion(payload as T['payload'], this.version))
    return that.send(opts)
  }

  /** Creates an instance from `payload` and handles it in-process. */
  static invoke<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>
  ) {
    const that = new this(withPayloadVersion(payload as T['payload'], this.version))
    return that.invoke()
  }

  /**
   * Creates an instance from `payload`, handles it in-process and falls back
   * to the queue when handling fails (see the instance `invokeOrSend`).
   */
  static invokeOrSend<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>,
    options?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
  ) {
    const that = new this(withPayloadVersion(payload as T['payload'], this.version))
    return that.invokeOrSend(options)
  }

  /**
   * Job handler. Must be overridden; the base implementation always throws.
   *
   * @throws Error always ("not implemented")
   */
  static handle(
    job: Job<Event<any>['payload']> | Job<Event<any>['payload']>[],
    opts?: { signal?: AbortSignal }
  ) {
    throw new Error('not implemented')
  }

  /**
   * Decides whether an event may be sent for the given payload.
   *
   * @returns `false` only in multitenant mode when the payload's tenant lists
   *          this event name in its `disableEvents` config; `true` otherwise
   */
  static async shouldSend(payload: any) {
    if (isMultitenant && payload?.tenant?.ref) {
      // Do not send an event if disabled for this specific tenant
      const tenant = await getTenantConfig(payload.tenant.ref)
      const disabledEvents = tenant.disableEvents || []
      if (disabledEvents.includes(this.eventName())) {
        return false
      }
    }
    return true
  }

  /**
   * Deletes job `jobId` if another job with the same queue name and singleton
   * key exists in a state before 'active'. Does nothing when the queue is disabled.
   *
   * See issue https://github.com/timgit/pg-boss/issues/535
   * @param queueName
   * @param singletonKey
   * @param jobId
   */
  static async deleteIfActiveExists(queueName: string, singletonKey: string, jobId: string) {
    if (!pgQueueEnable) {
      return
    }

    await Queue.getDb().executeSql(
      `DELETE FROM ${PG_BOSS_SCHEMA}.job
       WHERE id = $1
       AND EXISTS(
          SELECT 1 FROM ${PG_BOSS_SCHEMA}.job
             WHERE id != $2
             AND state < 'active'
             AND name = $3
             AND singleton_key = $4
       )
      `,
      [jobId, jobId, queueName, singletonKey]
    )
  }

  /**
   * Handles the event in-process; when that fails, logs the error and sends
   * the event to the queue instead.
   *
   * @param sendOptions - options for the fallback send. `sendWhenError`, when
   *        given, is asked whether the error should trigger the fallback; if it
   *        returns `false` the original error is rethrown.
   * @throws an internal error when the event class does not allow sync handling
   */
  async invokeOrSend(
    sendOptions?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
  ): Promise<string | void | null> {
    const eventClass = this.constructor as typeof Event

    if (!eventClass.allowSync) {
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
    }

    try {
      await this.invoke()
    } catch (e) {
      if (sendOptions?.sendWhenError && !sendOptions.sendWhenError(e)) {
        throw e
      }
      logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', {
        type: 'queue',
        project: this.payload.tenant?.ref,
        error: e,
        metadata: JSON.stringify(this.payload),
      })

      // `sendWhenError` only steers this method; keep the callback out of the
      // options that `send()` forwards to the queue.
      let queueOptions: SendOptions | undefined
      if (sendOptions) {
        const { sendWhenError: _sendWhenError, ...rest } = sendOptions
        queueOptions = rest
      }

      return this.send(queueOptions)
    }
  }

  /**
   * Handles the event in-process by calling the class's `handle` with a
   * synthetic job (id `__sync`).
   *
   * @throws an internal error when the event class does not allow sync handling
   */
  async invoke(): Promise<string | void | null> {
    const eventClass = this.constructor as typeof Event

    if (!eventClass.allowSync) {
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
    }

    await eventClass.handle(buildSyncJob(eventClass, this.payload))
  }

  /**
   * Sends the event to the queue.
   *
   * - Returns without sending when `shouldSend` rejects the payload.
   * - With the queue disabled: handles the event in-process when allowed,
   *   otherwise logs a warning and returns.
   * - When the queue send fails: logs a warning and handles the event
   *   in-process, or rethrows when sync handling is not allowed.
   *
   * @param customSendOptions - options that override the class-level send
   *        options; `tnx` makes the send go through a pg-boss instance backed
   *        by that Knex connection.
   * @returns the value returned by the queue's `send`, or the handler's return
   *          value on the in-process paths
   */
  async send(customSendOptions?: SendOptions & { tnx?: Knex }): Promise<string | void | null> {
    const eventClass = this.constructor as typeof Event

    const shouldSend = await eventClass.shouldSend(this.payload)

    if (!shouldSend) {
      return
    }

    if (!pgQueueEnable) {
      if (eventClass.allowSync) {
        return eventClass.handle(buildSyncJob(eventClass, this.payload))
      }

      logger.warn(
        {
          type: 'queue',
          eventType: eventClass.eventName(),
        },
        '[Queue] skipped sending message'
      )
      return
    }

    const startTime = process.hrtime.bigint()

    // Later entries win: class-level options, then `scheduleAt`, then the
    // dead-letter queue, then the caller's overrides. A new object is built so
    // the value returned by `getSendOptions` is never mutated.
    const options: SendOptions & { tnx?: Knex } = {
      ...(eventClass.getSendOptions(this.payload) ?? {}),
      ...(this.payload.scheduleAt ? { startAfter: new Date(this.payload.scheduleAt) } : {}),
      deadLetter: eventClass.deadLetterQueueName(),
      ...customSendOptions,
    }
    // `tnx` only selects the connection below; it is not a queue send option.
    delete options.tnx

    try {
      const queue = customSendOptions?.tnx
        ? Queue.createPgBoss({
            enableWorkers: false,
            db: new KnexQueueDB(customSendOptions.tnx),
          })
        : Queue.getInstance()

      const res = await queue.send({
        name: eventClass.getQueueName(),
        data: buildJobData(eventClass, this.payload),
        options,
      })

      queueJobScheduled.add(1, {
        name: eventClass.getQueueName(),
      })

      return res
    } catch (e) {
      // If we can't queue the message for some reason,
      // we run its handler right away.
      // This might create some latency with the benefit of being more fault-tolerant
      logSchema.warning(
        logger,
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
        {
          type: 'queue',
          error: e,
          metadata: JSON.stringify(this.payload),
        }
      )

      if (!eventClass.allowSync) {
        throw e
      }

      return eventClass.handle(buildSyncJob(eventClass, this.payload))
    } finally {
      // Recorded on both the success and the failure path, in seconds.
      const duration = Number(process.hrtime.bigint() - startTime) / 1e9
      queueJobSchedulingTime.record(duration, {
        name: eventClass.getQueueName(),
      })
    }
  }
}

/** Job data for an event: the payload plus `region`, with `$version` forced to the class version. */
function buildJobData(eventClass: typeof Event, payload: BasePayload) {
  return {
    region,
    ...payload,
    $version: eventClass.version,
  }
}

/** Synthetic job (id `__sync`) used when an event is handled in-process instead of via the queue. */
function buildSyncJob(eventClass: typeof Event, payload: BasePayload) {
  return {
    id: '__sync',
    expireInSeconds: 0,
    name: eventClass.getQueueName(),
    data: buildJobData(eventClass, payload),
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc