• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 19362855920

14 Nov 2025 11:14AM UTC coverage: 75.913% (-1.8%) from 77.76%
19362855920

Pull #796

github

web-flow
Merge e4cf9b460 into b9acc7ca1
Pull Request #796: feat: add analytics bucket sharding

2074 of 3009 branches covered (68.93%)

Branch coverage included in aggregate %.

945 of 2098 new or added lines in 36 files covered. (45.04%)

50 existing lines in 7 files now uncovered.

25386 of 33164 relevant lines covered (76.55%)

94.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.96
/src/internal/queue/event.ts
1
import { Queue } from './queue'
1✔
2
import PgBoss, { Job, SendOptions, WorkOptions, Queue as PgBossQueue } from 'pg-boss'
1✔
3
import { getConfig } from '../../config'
1✔
4
import { QueueJobScheduled, QueueJobSchedulingTime } from '@internal/monitoring/metrics'
1✔
5
import { logger, logSchema } from '@internal/monitoring'
1✔
6
import { getTenantConfig } from '@internal/database'
1✔
7
import { ERRORS } from '@internal/errors'
1✔
8
import { KnexQueueDB } from '@internal/queue/database'
1✔
9
import { Knex } from 'knex'
1✔
10

1✔
11
export interface BasePayload {
1✔
12
  $version?: string
1✔
13
  singletonKey?: string
1✔
14
  scheduleAt?: Date
1✔
15
  reqId?: string
1✔
16
  tenant: {
1✔
17
    ref: string
1✔
18
    host: string
1✔
19
  }
1✔
20
}
1✔
21

1✔
22
const { pgQueueEnable, region, isMultitenant } = getConfig()
1✔
23

1✔
24
export type StaticThis<T extends Event<any>> = BaseEventConstructor<T>
1✔
25

1✔
26
interface BaseEventConstructor<Base extends Event<any>> {
1✔
27
  version: string
1✔
28

1✔
29
  new (...args: any): Base
1✔
30

1✔
31
  send(
1✔
32
    this: StaticThis<Base>,
1✔
33
    payload: Omit<Base['payload'], '$version'>
1✔
34
  ): Promise<string | void | null>
1✔
35

1✔
36
  eventName(): string
1✔
37
  getWorkerOptions(): WorkOptions
1✔
38
}
1✔
39

1✔
40
/**
1✔
41
 * Base class for all events that are sent to the queue
1✔
42
 */
1✔
43
export class Event<T extends Omit<BasePayload, '$version'>> {
1✔
44
  public static readonly version: string = 'v1'
1✔
45
  protected static queueName = ''
1✔
46
  protected static allowSync = true
1✔
47

1✔
48
  constructor(public readonly payload: T & BasePayload) {}
1✔
49

1✔
50
  static eventName() {
1✔
51
    return this.name
1✔
52
  }
1✔
53

1✔
54
  static deadLetterQueueName() {
1✔
55
    return this.queueName + '-dead-letter'
1✔
56
  }
1✔
57

1✔
58
  static getQueueName() {
1✔
59
    if (!this.queueName) {
10,048!
60
      throw new Error(`Queue name not set on ${this.constructor.name}`)
×
61
    }
×
62

10,048✔
63
    return this.queueName
10,048✔
64
  }
10,048✔
65

1✔
66
  static getQueueOptions(): PgBossQueue | undefined {
1✔
67
    return undefined
×
68
  }
×
69

1✔
70
  static getSendOptions<T extends Event<any>>(payload: T['payload']): SendOptions | undefined {
1✔
71
    return undefined
×
72
  }
×
73

1✔
74
  static getWorkerOptions(): WorkOptions & { concurrentTaskCount?: number } {
1✔
75
    return {}
×
76
  }
×
77

1✔
78
  static onClose() {
1✔
79
    // no-op
×
80
  }
×
81

1✔
82
  static onStart() {
1✔
83
    // no-op
×
84
  }
×
85

1✔
86
  static batchSend<T extends Event<any>[]>(messages: T) {
1✔
87
    if (!pgQueueEnable) {
1!
88
      if (this.allowSync) {
×
89
        return Promise.all(messages.map((message) => message.send()))
✔
90
      } else {
×
91
        logger.warn('[Queue] skipped sending batch messages', {
×
92
          type: 'queue',
×
93
          eventType: this.eventName(),
×
94
        })
×
95
        return
×
96
      }
×
97
    }
×
98

1✔
99
    return Queue.getInstance().insert(
1✔
100
      messages.map((message) => {
1✔
101
        const sendOptions = (this.getSendOptions(message.payload) as PgBoss.JobInsert) || {}
1!
102
        if (!message.payload.$version) {
1✔
103
          ;(message.payload as (typeof message)['payload']).$version = this.version
1✔
104
        }
1✔
105

1✔
106
        if (message.payload.scheduleAt) {
1!
107
          sendOptions.startAfter = new Date(message.payload.scheduleAt)
×
108
        }
×
109

1✔
110
        return {
1✔
111
          ...sendOptions,
1✔
112
          name: this.getQueueName(),
1✔
113
          data: message.payload,
1✔
114
          deadLetter: this.deadLetterQueueName(),
1✔
115
        }
1✔
116
      })
1✔
117
    )
1✔
118
  }
1✔
119

1✔
120
  static send<T extends Event<any>>(
1✔
121
    this: StaticThis<T>,
10,048✔
122
    payload: Omit<T['payload'], '$version'>,
10,048✔
123
    opts?: SendOptions & { tnx?: Knex }
10,048✔
124
  ) {
10,048✔
125
    if (!payload.$version) {
10,048✔
126
      ;(payload as T['payload']).$version = this.version
10,048✔
127
    }
10,048✔
128
    const that = new this(payload)
10,048✔
129
    return that.send(opts)
10,048✔
130
  }
10,048✔
131

1✔
132
  static invoke<T extends Event<any>>(
1✔
133
    this: StaticThis<T>,
1✔
134
    payload: Omit<T['payload'], '$version'>
1✔
135
  ) {
1✔
136
    if (!payload.$version) {
1✔
137
      ;(payload as T['payload']).$version = this.version
1✔
138
    }
1✔
139
    const that = new this(payload)
1✔
140
    return that.invoke()
1✔
141
  }
1✔
142

1✔
143
  static invokeOrSend<T extends Event<any>>(
1✔
144
    this: StaticThis<T>,
16✔
145
    payload: Omit<T['payload'], '$version'>,
16✔
146
    options?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
16✔
147
  ) {
16✔
148
    if (!payload.$version) {
16✔
149
      ;(payload as T['payload']).$version = this.version
16✔
150
    }
16✔
151
    const that = new this(payload)
16✔
152
    return that.invokeOrSend(options)
16✔
153
  }
16✔
154

1✔
155
  static handle(
1✔
156
    job: Job<Event<any>['payload']> | Job<Event<any>['payload']>[],
×
157
    opts?: { signal?: AbortSignal }
×
158
  ) {
×
159
    throw new Error('not implemented')
×
160
  }
×
161

1✔
162
  static async shouldSend(payload: any) {
1✔
163
    if (isMultitenant && payload?.tenant?.ref) {
16!
164
      // Do not send an event if disabled for this specific tenant
×
165
      const tenant = await getTenantConfig(payload.tenant.ref)
×
166
      const disabledEvents = tenant.disableEvents || []
×
167
      if (disabledEvents.includes(this.eventName())) {
×
168
        return false
×
169
      }
×
170
    }
×
171
    return true
16✔
172
  }
16✔
173

1✔
174
  /**
1✔
175
   * See issue https://github.com/timgit/pg-boss/issues/535
1✔
176
   * @param queueName
1✔
177
   * @param singletonKey
1✔
178
   * @param jobId
1✔
179
   */
1✔
180
  static async deleteIfActiveExists(queueName: string, singletonKey: string, jobId: string) {
1✔
181
    if (!pgQueueEnable) {
×
182
      return Promise.resolve()
×
183
    }
×
184

×
185
    await Queue.getDb().executeSql(
×
186
      `DELETE FROM pgboss_v10.job
×
187
       WHERE id = $1
×
188
       AND EXISTS(
×
189
          SELECT 1 FROM pgboss_v10.job
×
190
             WHERE id != $2
×
191
             AND state < 'active'
×
192
             AND name = $3
×
193
             AND singleton_key = $4
×
194
       )
×
195
      `,
×
196
      [jobId, jobId, queueName, singletonKey]
×
197
    )
×
198
  }
×
199

1✔
200
  async invokeOrSend(
1✔
201
    sendOptions?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
16✔
202
  ): Promise<string | void | null> {
16✔
203
    const constructor = this.constructor as typeof Event
16✔
204

16✔
205
    if (!constructor.allowSync) {
16!
206
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
×
207
    }
×
208

16✔
209
    try {
16✔
210
      await this.invoke()
16✔
211
    } catch (e) {
16!
NEW
212
      if (sendOptions?.sendWhenError && !sendOptions.sendWhenError(e)) {
×
NEW
213
        throw e
×
NEW
214
      }
×
215
      logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', {
×
216
        type: 'queue',
×
217
        project: this.payload.tenant?.ref,
×
218
        error: e,
×
219
        metadata: JSON.stringify(this.payload),
×
220
      })
×
221

×
222
      return this.send(sendOptions)
×
223
    }
×
224
  }
16✔
225

1✔
226
  async invoke(): Promise<string | void | null> {
1✔
227
    const constructor = this.constructor as typeof Event
17✔
228

17✔
229
    if (!constructor.allowSync) {
17!
230
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
×
231
    }
×
232

17✔
233
    await constructor.handle({
17✔
234
      id: '__sync',
17✔
235
      expireInSeconds: 0,
17✔
236
      name: constructor.getQueueName(),
17✔
237
      data: {
17✔
238
        region,
17✔
239
        ...this.payload,
17✔
240
        $version: constructor.version,
17✔
241
      },
17✔
242
    })
17✔
243
  }
17✔
244

1✔
245
  async send(customSendOptions?: SendOptions & { tnx?: Knex }): Promise<string | void | null> {
1✔
246
    const constructor = this.constructor as typeof Event
10,048✔
247

10,048✔
248
    const shouldSend = await constructor.shouldSend(this.payload)
10,048✔
249

10,048✔
250
    if (!shouldSend) {
10,048!
251
      return
×
252
    }
×
253

10,048✔
254
    if (!pgQueueEnable) {
10,048✔
255
      if (constructor.allowSync) {
10,048✔
256
        return constructor.handle({
10,048✔
257
          id: '__sync',
10,048✔
258
          expireInSeconds: 0,
10,048✔
259
          name: constructor.getQueueName(),
10,048✔
260
          data: {
10,048✔
261
            region,
10,048✔
262
            ...this.payload,
10,048✔
263
            $version: constructor.version,
10,048✔
264
          },
10,048✔
265
        })
10,048✔
266
      } else {
10,048!
267
        logger.warn('[Queue] skipped sending message', {
×
268
          type: 'queue',
×
269
          eventType: constructor.eventName(),
×
270
        })
×
271
        return
×
272
      }
×
273
    }
10,048✔
274

×
275
    const timer = QueueJobSchedulingTime.startTimer()
×
276
    const sendOptions = constructor.getSendOptions(this.payload) || {}
×
277

10,048✔
278
    if (this.payload.scheduleAt) {
10,048!
279
      sendOptions.startAfter = new Date(this.payload.scheduleAt)
×
280
    }
×
281

×
282
    sendOptions!.deadLetter = constructor.deadLetterQueueName()
×
283

×
284
    try {
×
NEW
285
      const queue = customSendOptions?.tnx
×
286
        ? Queue.createPgBoss({
10,048!
NEW
287
            enableWorkers: false,
×
NEW
288
            db: new KnexQueueDB(customSendOptions.tnx),
×
NEW
289
          })
×
290
        : Queue.getInstance()
10,048!
291

10,048✔
292
      const res = await queue.send({
10,048✔
293
        name: constructor.getQueueName(),
10,048✔
294
        data: {
10,048✔
295
          region,
10,048✔
296
          ...this.payload,
10,048✔
297
          $version: constructor.version,
10,048✔
298
        },
10,048✔
299
        options: {
10,048✔
300
          ...sendOptions,
10,048✔
301
          ...customSendOptions,
10,048✔
302
        },
10,048✔
303
      })
10,048✔
304

×
305
      QueueJobScheduled.inc({
×
306
        name: constructor.getQueueName(),
×
307
      })
×
308

×
309
      return res
×
310
    } catch (e) {
×
311
      // If we can't queue the message for some reason,
×
312
      // we run its handler right away.
×
313
      // This might create some latency with the benefit of being more fault-tolerant
×
314
      logSchema.warning(
×
315
        logger,
×
316
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
×
317
        {
×
318
          type: 'queue',
×
319
          error: e,
×
320
          metadata: JSON.stringify(this.payload),
×
321
        }
×
322
      )
×
NEW
323

×
NEW
324
      if (!constructor.allowSync) {
×
NEW
325
        throw e
×
NEW
326
      }
×
NEW
327

×
328
      return constructor.handle({
×
329
        id: '__sync',
×
330
        expireInSeconds: 0,
×
331
        name: constructor.getQueueName(),
×
332
        data: {
×
333
          region,
×
334
          ...this.payload,
×
335
          $version: constructor.version,
×
336
        },
×
337
      })
×
338
    } finally {
×
339
      timer({
×
340
        name: constructor.getQueueName(),
×
341
      })
×
342
    }
×
343
  }
10,048✔
344
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc