• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 19099233262

05 Nov 2025 10:38AM UTC coverage: 77.75% (+1.4%) from 76.389%
19099233262

Pull #774

github

web-flow
Merge 528b46699 into 76df298f8
Pull Request #774: feat: vector buckets

2021 of 2925 branches covered (69.09%)

Branch coverage included in aggregate %.

3428 of 3935 new or added lines in 58 files covered. (87.12%)

31 existing lines in 3 files now uncovered.

24588 of 31299 relevant lines covered (78.56%)

91.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.21
/src/internal/queue/event.ts
1
import { Queue } from './queue'
1✔
2
import PgBoss, { Job, SendOptions, WorkOptions, Queue as PgBossQueue } from 'pg-boss'
1✔
3
import { getConfig } from '../../config'
1✔
4
import { QueueJobScheduled, QueueJobSchedulingTime } from '@internal/monitoring/metrics'
1✔
5
import { logger, logSchema } from '@internal/monitoring'
1✔
6
import { getTenantConfig } from '@internal/database'
1✔
7
import { ERRORS } from '@internal/errors'
1✔
8

1✔
9
export interface BasePayload {
1✔
10
  $version?: string
1✔
11
  singletonKey?: string
1✔
12
  scheduleAt?: Date
1✔
13
  reqId?: string
1✔
14
  tenant: {
1✔
15
    ref: string
1✔
16
    host: string
1✔
17
  }
1✔
18
}
1✔
19

1✔
20
const { pgQueueEnable, region, isMultitenant } = getConfig()
1✔
21

1✔
22
export type StaticThis<T extends Event<any>> = BaseEventConstructor<T>
1✔
23

1✔
24
interface BaseEventConstructor<Base extends Event<any>> {
1✔
25
  version: string
1✔
26

1✔
27
  new (...args: any): Base
1✔
28

1✔
29
  send(
1✔
30
    this: StaticThis<Base>,
1✔
31
    payload: Omit<Base['payload'], '$version'>
1✔
32
  ): Promise<string | void | null>
1✔
33

1✔
34
  eventName(): string
1✔
35
  getWorkerOptions(): WorkOptions
1✔
36
}
1✔
37

1✔
38
/**
1✔
39
 * Base class for all events that are sent to the queue
1✔
40
 */
1✔
41
export class Event<T extends Omit<BasePayload, '$version'>> {
1✔
42
  public static readonly version: string = 'v1'
1✔
43
  protected static queueName = ''
1✔
44
  protected static allowSync = true
1✔
45

1✔
46
  constructor(public readonly payload: T & BasePayload) {}
1✔
47

1✔
48
  static eventName() {
1✔
49
    return this.name
×
50
  }
×
51

1✔
52
  static deadLetterQueueName() {
1✔
53
    return this.queueName + '-dead-letter'
1✔
54
  }
1✔
55

1✔
56
  static getQueueName() {
1✔
57
    if (!this.queueName) {
10,042!
58
      throw new Error(`Queue name not set on ${this.constructor.name}`)
×
59
    }
×
60

10,042✔
61
    return this.queueName
10,042✔
62
  }
10,042✔
63

1✔
64
  static getQueueOptions(): PgBossQueue | undefined {
1✔
65
    return undefined
×
66
  }
×
67

1✔
68
  static getSendOptions<T extends Event<any>>(payload: T['payload']): SendOptions | undefined {
1✔
69
    return undefined
×
70
  }
×
71

1✔
72
  static getWorkerOptions(): WorkOptions & { concurrentTaskCount?: number } {
1✔
73
    return {}
×
74
  }
×
75

1✔
76
  static onClose() {
1✔
77
    // no-op
×
78
  }
×
79

1✔
80
  static onStart() {
1✔
81
    // no-op
×
82
  }
×
83

1✔
84
  static batchSend<T extends Event<any>[]>(messages: T) {
1✔
85
    if (!pgQueueEnable) {
1!
86
      if (this.allowSync) {
×
87
        return Promise.all(messages.map((message) => message.send()))
✔
88
      } else {
×
89
        logger.warn('[Queue] skipped sending batch messages', {
×
90
          type: 'queue',
×
91
          eventType: this.eventName(),
×
92
        })
×
93
        return
×
94
      }
×
95
    }
×
96

1✔
97
    return Queue.getInstance().insert(
1✔
98
      messages.map((message) => {
1✔
99
        const sendOptions = (this.getSendOptions(message.payload) as PgBoss.JobInsert) || {}
1!
100
        if (!message.payload.$version) {
1✔
101
          ;(message.payload as (typeof message)['payload']).$version = this.version
1✔
102
        }
1✔
103

1✔
104
        if (message.payload.scheduleAt) {
1!
105
          sendOptions.startAfter = new Date(message.payload.scheduleAt)
×
106
        }
×
107

1✔
108
        return {
1✔
109
          ...sendOptions,
1✔
110
          name: this.getQueueName(),
1✔
111
          data: message.payload,
1✔
112
          deadLetter: this.deadLetterQueueName(),
1✔
113
        }
1✔
114
      })
1✔
115
    )
1✔
116
  }
1✔
117

1✔
118
  static send<T extends Event<any>>(this: StaticThis<T>, payload: Omit<T['payload'], '$version'>) {
1✔
119
    if (!payload.$version) {
10,042✔
120
      ;(payload as T['payload']).$version = this.version
10,042✔
121
    }
10,042✔
122
    const that = new this(payload)
10,042✔
123
    return that.send()
10,042✔
124
  }
10,042✔
125

1✔
126
  static invoke<T extends Event<any>>(
1✔
127
    this: StaticThis<T>,
16✔
128
    payload: Omit<T['payload'], '$version'>
16✔
129
  ) {
16✔
130
    if (!payload.$version) {
16✔
131
      ;(payload as T['payload']).$version = this.version
16✔
132
    }
16✔
133
    const that = new this(payload)
16✔
134
    return that.invoke()
16✔
135
  }
16✔
136

1✔
137
  static invokeOrSend<T extends Event<any>>(
1✔
138
    this: StaticThis<T>,
1✔
139
    payload: Omit<T['payload'], '$version'>,
1✔
140
    options?: SendOptions
1✔
141
  ) {
1✔
142
    if (!payload.$version) {
1✔
143
      ;(payload as T['payload']).$version = this.version
1✔
144
    }
1✔
145
    const that = new this(payload)
1✔
146
    return that.invokeOrSend(options)
1✔
147
  }
1✔
148

1✔
149
  static handle(
1✔
150
    job: Job<Event<any>['payload']> | Job<Event<any>['payload']>[],
×
151
    opts?: { signal?: AbortSignal }
×
152
  ) {
×
153
    throw new Error('not implemented')
×
154
  }
×
155

1✔
156
  static async shouldSend(payload: any) {
1✔
157
    if (isMultitenant && payload?.tenant?.ref) {
14!
158
      // Do not send an event if disabled for this specific tenant
×
159
      const tenant = await getTenantConfig(payload.tenant.ref)
×
160
      const disabledEvents = tenant.disableEvents || []
×
161
      if (disabledEvents.includes(this.eventName())) {
×
162
        return false
×
163
      }
×
164
    }
×
165
    return true
14✔
166
  }
14✔
167

1✔
168
  /**
1✔
169
   * See issue https://github.com/timgit/pg-boss/issues/535
1✔
170
   * @param queueName
1✔
171
   * @param singletonKey
1✔
172
   * @param jobId
1✔
173
   */
1✔
174
  static async deleteIfActiveExists(queueName: string, singletonKey: string, jobId: string) {
1✔
175
    if (!pgQueueEnable) {
×
176
      return Promise.resolve()
×
177
    }
×
178

×
179
    await Queue.getDb().executeSql(
×
180
      `DELETE FROM pgboss_v10.job
×
181
       WHERE id = $1
×
182
       AND EXISTS(
×
183
          SELECT 1 FROM pgboss_v10.job
×
184
             WHERE id != $2
×
185
             AND state < 'active'
×
186
             AND name = $3
×
187
             AND singleton_key = $4
×
188
       )
×
189
      `,
×
190
      [jobId, jobId, queueName, singletonKey]
×
191
    )
×
192
  }
×
193

1✔
194
  async invokeOrSend(sendOptions?: SendOptions): Promise<string | void | null> {
1✔
195
    const constructor = this.constructor as typeof Event
1✔
196

1✔
197
    if (!constructor.allowSync) {
1!
NEW
198
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
×
NEW
199
    }
×
200

1✔
201
    try {
1✔
202
      await this.invoke()
1✔
203
    } catch (e) {
1!
NEW
204
      logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', {
×
NEW
205
        type: 'queue',
×
NEW
206
        project: this.payload.tenant?.ref,
×
NEW
207
        error: e,
×
NEW
208
        metadata: JSON.stringify(this.payload),
×
NEW
209
      })
×
NEW
210

×
NEW
211
      return this.send(sendOptions)
×
NEW
212
    }
×
213
  }
1✔
214

1✔
215
  async invoke(): Promise<string | void | null> {
1✔
216
    const constructor = this.constructor as typeof Event
17✔
217

17✔
218
    if (!constructor.allowSync) {
17!
219
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
×
220
    }
×
221

17✔
222
    await constructor.handle({
17✔
223
      id: '__sync',
17✔
224
      expireInSeconds: 0,
17✔
225
      name: constructor.getQueueName(),
17✔
226
      data: {
17✔
227
        region,
17✔
228
        ...this.payload,
17✔
229
        $version: constructor.version,
17✔
230
      },
17✔
231
    })
17✔
232
  }
17✔
233

1✔
234
  async send(customSendOptions?: SendOptions): Promise<string | void | null> {
1✔
235
    const constructor = this.constructor as typeof Event
10,042✔
236

10,042✔
237
    const shouldSend = await constructor.shouldSend(this.payload)
10,042✔
238

10,042✔
239
    if (!shouldSend) {
10,042!
240
      return
×
241
    }
×
242

10,042✔
243
    if (!pgQueueEnable) {
10,042✔
244
      if (constructor.allowSync) {
10,042✔
245
        return constructor.handle({
10,042✔
246
          id: '__sync',
10,042✔
247
          expireInSeconds: 0,
10,042✔
248
          name: constructor.getQueueName(),
10,042✔
249
          data: {
10,042✔
250
            region,
10,042✔
251
            ...this.payload,
10,042✔
252
            $version: constructor.version,
10,042✔
253
          },
10,042✔
254
        })
10,042✔
255
      } else {
10,042!
256
        logger.warn('[Queue] skipped sending message', {
×
257
          type: 'queue',
×
258
          eventType: constructor.eventName(),
×
259
        })
×
260
        return
×
261
      }
×
262
    }
10,042✔
263

×
264
    const timer = QueueJobSchedulingTime.startTimer()
×
265
    const sendOptions = constructor.getSendOptions(this.payload) || {}
×
266

10,042✔
267
    if (this.payload.scheduleAt) {
10,042!
268
      sendOptions.startAfter = new Date(this.payload.scheduleAt)
×
269
    }
×
270

×
271
    sendOptions!.deadLetter = constructor.deadLetterQueueName()
×
272

×
273
    try {
×
274
      const res = await Queue.getInstance().send({
×
275
        name: constructor.getQueueName(),
×
276
        data: {
×
277
          region,
×
278
          ...this.payload,
×
279
          $version: constructor.version,
×
280
        },
×
NEW
281
        options: {
×
NEW
282
          ...sendOptions,
×
NEW
283
          ...customSendOptions,
×
NEW
284
        },
×
285
      })
×
286

×
287
      QueueJobScheduled.inc({
×
288
        name: constructor.getQueueName(),
×
289
      })
×
290

×
291
      return res
×
292
    } catch (e) {
×
293
      // If we can't queue the message for some reason,
×
294
      // we run its handler right away.
×
295
      // This might create some latency with the benefit of being more fault-tolerant
×
296
      logSchema.warning(
×
297
        logger,
×
298
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
×
299
        {
×
300
          type: 'queue',
×
301
          error: e,
×
302
          metadata: JSON.stringify(this.payload),
×
303
        }
×
304
      )
×
305
      return constructor.handle({
×
306
        id: '__sync',
×
307
        expireInSeconds: 0,
×
308
        name: constructor.getQueueName(),
×
309
        data: {
×
310
          region,
×
311
          ...this.payload,
×
312
          $version: constructor.version,
×
313
        },
×
314
      })
×
315
    } finally {
×
316
      timer({
×
317
        name: constructor.getQueueName(),
×
318
      })
×
319
    }
×
320
  }
10,042✔
321
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc