• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 24158627314

08 Apr 2026 09:04PM UTC coverage: 80.854% (+0.04%) from 80.819%
24158627314

Pull #922

github

web-flow
Merge a110561e0 into f6e193abb
Pull Request #922: fix: no any rule

3196 of 4184 branches covered (76.39%)

Branch coverage included in aggregate %.

526 of 627 new or added lines in 52 files covered. (83.89%)

1 existing line in 1 file now uncovered.

30545 of 37547 relevant lines covered (81.35%)

323.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.93
/src/internal/queue/event.ts
1
import { getTenantConfig } from '@internal/database'
1✔
2
import { ERRORS } from '@internal/errors'
1✔
3
import { logger, logSchema } from '@internal/monitoring'
1✔
4
import { queueJobScheduled, queueJobSchedulingTime } from '@internal/monitoring/metrics'
1✔
5
import { KnexQueueDB } from '@internal/queue/database'
1✔
6
import { Knex } from 'knex'
1✔
7
import PgBoss, { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss'
1✔
8
import { getConfig } from '../../config'
1✔
9
import { PG_BOSS_SCHEMA, Queue } from './queue'
1✔
10

1✔
11
export interface BasePayload {
1✔
12
  $version?: string
1✔
13
  singletonKey?: string
1✔
14
  scheduleAt?: Date
1✔
15
  reqId?: string
1✔
16
  tenant: {
1✔
17
    ref: string
1✔
18
    host: string
1✔
19
  }
1✔
20
}
1✔
21

1✔
22
const { pgQueueEnable, region, isMultitenant } = getConfig()
1✔
23

1✔
24
function withPayloadVersion<TPayload extends BasePayload>(
10,664✔
25
  payload: TPayload,
10,664✔
26
  version: string
10,664✔
27
): TPayload {
10,664✔
28
  return {
10,664✔
29
    ...payload,
10,664✔
30
    $version: payload.$version ?? version,
10,664✔
31
  }
10,664✔
32
}
10,664✔
33

1✔
34
export type EventPayload = Omit<BasePayload, '$version'>
1✔
35
type EventInstance = Event<EventPayload>
1✔
36
type EventClass = abstract new (...args: never[]) => EventInstance
1✔
37
type EventInput<T extends EventInstance> = Omit<T['payload'], '$version'> & {
1✔
38
  $version?: string
1✔
39
}
1✔
40
export type StaticThis<TPayload extends EventPayload> = BaseEventConstructor<TPayload>
1✔
41

1✔
42
interface BaseEventConstructor<TPayload extends EventPayload> {
1✔
43
  version: string
1✔
44

1✔
45
  new (...args: never[]): Event<TPayload>
1✔
46

1✔
47
  send(
1✔
48
    this: StaticThis<TPayload>,
1✔
49
    payload: EventInput<Event<TPayload>>
1✔
50
  ): Promise<string | void | null>
1✔
51

1✔
52
  eventName(): string
1✔
53
  getWorkerOptions(): WorkOptions
1✔
54
}
1✔
55

1✔
56
/**
1✔
57
 * Base class for all events that are sent to the queue
1✔
58
 */
1✔
59
export class Event<T extends EventPayload> {
1✔
60
  public static readonly version: string = 'v1'
1✔
61
  protected static queueName = ''
37✔
62
  protected static allowSync = true
37✔
63

1✔
64
  constructor(public readonly payload: T & BasePayload) {}
1✔
65

1✔
66
  static eventName() {
1✔
67
    return this.name
3✔
68
  }
3✔
69

1✔
70
  static deadLetterQueueName() {
1✔
71
    return this.queueName + '-dead-letter'
12✔
72
  }
12✔
73

1✔
74
  static getQueueName() {
1✔
75
    if (!this.queueName) {
10,749!
76
      throw new Error(`Queue name not set on ${this.constructor.name}`)
×
77
    }
×
78

10,749✔
79
    return this.queueName
10,749✔
80
  }
10,749✔
81

1✔
82
  static getQueueOptions(): PgBossQueue | undefined {
1✔
83
    return undefined
×
84
  }
×
85

1✔
86
  static getSendOptions(payload: unknown): SendOptions | undefined {
1✔
87
    return undefined
1✔
88
  }
1✔
89

1✔
90
  static getWorkerOptions(): WorkOptions & { concurrentTaskCount?: number } {
1✔
91
    return {}
×
92
  }
×
93

1✔
94
  static onClose() {
1✔
95
    // no-op
×
96
  }
×
97

1✔
98
  static onStart() {
1✔
99
    // no-op
×
100
  }
×
101

1✔
102
  static batchSend<TThis extends EventClass>(
1✔
103
    this: TThis & { version: string },
3✔
104
    messages: Array<InstanceType<TThis>>
3✔
105
  ) {
3✔
106
    const eventClass = this as unknown as typeof Event
3✔
107

3✔
108
    if (!pgQueueEnable) {
3✔
109
      if (eventClass.allowSync) {
1✔
110
        return Promise.all(messages.map((message) => message.send()))
1✔
111
      } else {
1!
112
        logger.warn(
×
113
          {
×
114
            type: 'queue',
×
NEW
115
            eventType: eventClass.eventName(),
×
116
          },
×
117
          '[Queue] skipped sending batch messages'
×
118
        )
×
119
        return
×
120
      }
×
121
    }
1✔
122

2✔
123
    return Queue.getInstance().insert(
2✔
124
      messages.map((message) => {
2✔
125
        const payloadWithVersion = withPayloadVersion(message.payload, this.version)
2✔
126
        const sendOptions =
2✔
127
          (eventClass.getSendOptions(payloadWithVersion) as PgBoss.JobInsert) || {}
2✔
128

2✔
129
        if (payloadWithVersion.scheduleAt) {
2✔
130
          sendOptions.startAfter = new Date(payloadWithVersion.scheduleAt)
1✔
131
        }
1✔
132

2✔
133
        return {
2✔
134
          ...sendOptions,
2✔
135
          name: eventClass.getQueueName(),
2✔
136
          data: payloadWithVersion,
2✔
137
          deadLetter: eventClass.deadLetterQueueName(),
2✔
138
        }
2✔
139
      })
2✔
140
    )
2✔
141
  }
2✔
142

1✔
143
  static send<TThis extends EventClass>(
1✔
144
    this: TThis & { version: string },
10,637✔
145
    payload: EventInput<InstanceType<TThis>>,
10,637✔
146
    opts?: SendOptions & { tnx?: Knex }
10,637✔
147
  ) {
10,637✔
148
    const payloadWithVersion = withPayloadVersion(
10,637✔
149
      payload as InstanceType<TThis>['payload'],
10,637✔
150
      this.version
10,637✔
151
    ) as InstanceType<TThis>['payload']
10,637✔
152
    const EventCtor = this as unknown as new (
10,637✔
153
      payload: InstanceType<TThis>['payload']
10,637✔
154
    ) => InstanceType<TThis>
10,637✔
155
    const that = new EventCtor(payloadWithVersion)
10,637✔
156
    return that.send(opts)
10,637✔
157
  }
10,637✔
158

1✔
159
  static invoke<TThis extends EventClass>(
1✔
160
    this: TThis & { version: string },
4✔
161
    payload: EventInput<InstanceType<TThis>>
4✔
162
  ) {
4✔
163
    const payloadWithVersion = withPayloadVersion(
4✔
164
      payload as InstanceType<TThis>['payload'],
4✔
165
      this.version
4✔
166
    ) as InstanceType<TThis>['payload']
4✔
167
    const EventCtor = this as unknown as new (
4✔
168
      payload: InstanceType<TThis>['payload']
4✔
169
    ) => InstanceType<TThis>
4✔
170
    const that = new EventCtor(payloadWithVersion)
4✔
171
    return that.invoke()
4✔
172
  }
4✔
173

1✔
174
  static invokeOrSend<TThis extends EventClass>(
1✔
175
    this: TThis & { version: string },
21✔
176
    payload: EventInput<InstanceType<TThis>>,
21✔
177
    options?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
21✔
178
  ) {
21✔
179
    const payloadWithVersion = withPayloadVersion(
21✔
180
      payload as InstanceType<TThis>['payload'],
21✔
181
      this.version
21✔
182
    ) as InstanceType<TThis>['payload']
21✔
183
    const EventCtor = this as unknown as new (
21✔
184
      payload: InstanceType<TThis>['payload']
21✔
185
    ) => InstanceType<TThis>
21✔
186
    const that = new EventCtor(payloadWithVersion)
21✔
187
    return that.invokeOrSend(options)
21✔
188
  }
21✔
189

1✔
190
  static handle(job: Job<unknown> | Job<unknown>[], opts?: { signal?: AbortSignal }) {
1✔
191
    throw new Error('not implemented')
×
192
  }
×
193

1✔
194
  static async shouldSend(payload: { tenant?: { ref?: string } } | null | undefined) {
1✔
195
    if (isMultitenant && payload?.tenant?.ref) {
49!
196
      // Do not send an event if disabled for this specific tenant
×
197
      const tenant = await getTenantConfig(payload.tenant.ref)
×
198
      const disabledEvents = tenant.disableEvents || []
×
199
      if (disabledEvents.includes(this.eventName())) {
×
200
        return false
×
201
      }
×
202
    }
×
203
    return true
49✔
204
  }
49✔
205

1✔
206
  /**
1✔
207
   * See issue https://github.com/timgit/pg-boss/issues/535
1✔
208
   * @param queueName
1✔
209
   * @param singletonKey
1✔
210
   * @param jobId
1✔
211
   */
1✔
212
  static async deleteIfActiveExists(queueName: string, singletonKey: string, jobId: string) {
1✔
213
    if (!pgQueueEnable) {
×
214
      return Promise.resolve()
×
215
    }
×
216

×
217
    await Queue.getDb().executeSql(
×
218
      `DELETE FROM ${PG_BOSS_SCHEMA}.job
×
219
       WHERE id = $1
×
220
       AND EXISTS(
×
221
          SELECT 1 FROM ${PG_BOSS_SCHEMA}.job
×
222
             WHERE id != $2
×
223
             AND state < 'active'
×
224
             AND name = $3
×
225
             AND singleton_key = $4
×
226
       )
×
227
      `,
×
228
      [jobId, jobId, queueName, singletonKey]
×
229
    )
×
230
  }
×
231

1✔
232
  async invokeOrSend(
1✔
233
    sendOptions?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
20✔
234
  ): Promise<string | void | null> {
20✔
235
    const eventClass = this.constructor as typeof Event
20✔
236

20✔
237
    if (!eventClass.allowSync) {
20!
238
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
×
239
    }
×
240

20✔
241
    try {
20✔
242
      await this.invoke()
20✔
243
    } catch (e) {
20!
244
      if (sendOptions?.sendWhenError && !sendOptions.sendWhenError(e)) {
×
245
        throw e
×
246
      }
×
247
      logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', {
×
248
        type: 'queue',
×
249
        project: this.payload.tenant?.ref,
×
250
        error: e,
×
251
        metadata: JSON.stringify(this.payload),
×
252
      })
×
253

×
254
      return this.send(sendOptions)
×
255
    }
×
256
  }
20✔
257

1✔
258
  async invoke(): Promise<string | void | null> {
1✔
259
    const eventClass = this.constructor as typeof Event
23✔
260

23✔
261
    if (!eventClass.allowSync) {
23!
262
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
×
263
    }
×
264

23✔
265
    await eventClass.handle({
23✔
266
      id: '__sync',
23✔
267
      expireInSeconds: 0,
23✔
268
      name: eventClass.getQueueName(),
23✔
269
      data: {
23✔
270
        region,
23✔
271
        ...this.payload,
23✔
272
        $version: eventClass.version,
23✔
273
      },
23✔
274
    })
23✔
275
  }
23✔
276

1✔
277
  async send(customSendOptions?: SendOptions & { tnx?: Knex }): Promise<string | void | null> {
1✔
278
    const eventClass = this.constructor as typeof Event
10,646✔
279

10,646✔
280
    const shouldSend = await eventClass.shouldSend(this.payload)
10,646✔
281

10,646✔
282
    if (!shouldSend) {
10,646!
283
      return
×
284
    }
×
285

10,646✔
286
    if (!pgQueueEnable) {
10,646✔
287
      if (eventClass.allowSync) {
10,636✔
288
        return eventClass.handle({
10,633✔
289
          id: '__sync',
10,633✔
290
          expireInSeconds: 0,
10,633✔
291
          name: eventClass.getQueueName(),
10,633✔
292
          data: {
10,633✔
293
            region,
10,633✔
294
            ...this.payload,
10,633✔
295
            $version: eventClass.version,
10,633✔
296
          },
10,633✔
297
        })
10,633✔
298
      } else {
10,636✔
299
        logger.warn(
3✔
300
          {
3✔
301
            type: 'queue',
3✔
302
            eventType: eventClass.eventName(),
3✔
303
          },
3✔
304
          '[Queue] skipped sending message'
3✔
305
        )
3✔
306
        return
3✔
307
      }
3✔
308
    }
10,636✔
309

10✔
310
    const startTime = process.hrtime.bigint()
10✔
311
    const sendOptions = eventClass.getSendOptions(this.payload) || {}
10!
312

10,646✔
313
    if (this.payload.scheduleAt) {
10,646!
314
      sendOptions.startAfter = new Date(this.payload.scheduleAt)
×
315
    }
×
316

10✔
317
    sendOptions!.deadLetter = eventClass.deadLetterQueueName()
10✔
318

10✔
319
    try {
10✔
320
      const queue = customSendOptions?.tnx
10!
321
        ? Queue.createPgBoss({
10,646!
322
            enableWorkers: false,
×
323
            db: new KnexQueueDB(customSendOptions.tnx),
×
324
          })
×
325
        : Queue.getInstance()
10,646✔
326

10,646✔
327
      const res = await queue.send({
10,646✔
328
        name: eventClass.getQueueName(),
10,646✔
329
        data: {
10,646✔
330
          region,
10,646✔
331
          ...this.payload,
10,646✔
332
          $version: eventClass.version,
10,646✔
333
        },
10,646✔
334
        options: {
10,646✔
335
          ...sendOptions,
10,646✔
336
          ...customSendOptions,
10,646✔
337
        },
10,646✔
338
      })
10,646✔
339

10✔
340
      queueJobScheduled.add(1, {
10✔
341
        name: eventClass.getQueueName(),
10✔
342
      })
10✔
343

10✔
344
      return res
10✔
345
    } catch (e) {
10!
346
      // If we can't queue the message for some reason,
×
347
      // we run its handler right away.
×
348
      // This might create some latency with the benefit of being more fault-tolerant
×
349
      logSchema.warning(
×
350
        logger,
×
351
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
×
352
        {
×
353
          type: 'queue',
×
354
          error: e,
×
355
          metadata: JSON.stringify(this.payload),
×
356
        }
×
357
      )
×
358

×
359
      if (!eventClass.allowSync) {
×
360
        throw e
×
361
      }
×
362

×
363
      return eventClass.handle({
×
364
        id: '__sync',
×
365
        expireInSeconds: 0,
×
366
        name: eventClass.getQueueName(),
×
367
        data: {
×
368
          region,
×
369
          ...this.payload,
×
370
          $version: eventClass.version,
×
371
        },
×
372
      })
×
373
    } finally {
×
374
      const duration = Number(process.hrtime.bigint() - startTime) / 1e9
10✔
375
      queueJobSchedulingTime.record(duration, {
10✔
376
        name: eventClass.getQueueName(),
10✔
377
      })
10✔
378
    }
10✔
379
  }
10,646✔
380
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc