• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 15187324276

22 May 2025 01:03PM UTC coverage: 78.01% (-0.1%) from 78.144%
15187324276

Pull #696

github

web-flow
Merge 63e223c68 into 69e4a4079
Pull Request #696: feat: pgboss v10

1532 of 2132 branches covered (71.86%)

Branch coverage included in aggregate %.

35 of 173 new or added lines in 9 files covered. (20.23%)

3 existing lines in 2 files now uncovered.

17309 of 22020 relevant lines covered (78.61%)

109.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.51
/src/internal/queue/event.ts
1
import { Queue } from './queue'
1✔
2
import PgBoss, { Job, SendOptions, WorkOptions, Queue as PgBossQueue } from 'pg-boss'
1✔
3
import { getConfig } from '../../config'
1✔
4
import { QueueJobScheduled, QueueJobSchedulingTime } from '@internal/monitoring/metrics'
1✔
5
import { logger, logSchema } from '@internal/monitoring'
1✔
6
import { getTenantConfig } from '@internal/database'
1✔
7

1✔
8
/**
 * Fields shared by every event payload placed on the queue.
 */
export interface BasePayload {
  /**
   * Tenant the event belongs to. In multitenant mode `tenant.ref` is the key
   * used to look up the tenant configuration before sending.
   * NOTE(review): `host` is not read in this file — presumably used by handlers; confirm.
   */
  tenant: {
    ref: string
    host: string
  }

  /**
   * Payload version. Populated from the event class's static `version`
   * when the caller did not provide one.
   */
  $version?: string

  /** When set, the job is enqueued with `startAfter` set to this time. */
  scheduleAt?: Date

  /**
   * NOTE(review): not read by the base event class in this file — presumably
   * consumed by subclasses when building their send options; confirm.
   */
  singletonKey?: string

  /**
   * NOTE(review): not read in this file — presumably the originating request id
   * used for log correlation; confirm.
   */
  reqId?: string
}
1✔
18

1✔
19
/**
 * Retry settings for an event's slow-retry queue. The object is spread into
 * the pg-boss send options when a job is placed on that queue, so the fields
 * carry pg-boss's meaning for `retryLimit` / `retryDelay`.
 * NOTE(review): pg-boss documents `retryDelay` in seconds — confirm against the pinned version.
 */
export interface SlowRetryQueueOptions {
  retryDelay: number
  retryLimit: number
}
1✔
23

1✔
24
// Read once at module load:
//  - pgQueueEnable: when false, events are handled in-process instead of being queued
//  - region: added to the data of every job built by Event#send / Event#sendSlowRetryQueue
//  - isMultitenant: when true, the tenant config is consulted before sending an event
const { pgQueueEnable, region, isMultitenant } = getConfig()
1✔
25

1✔
26
/**
 * Type used for the `this` parameter of static methods on event classes:
 * the constructor of event type `T` together with its static members.
 */
export type StaticThis<T extends Event<any>> = BaseEventConstructor<T>
1✔
27

1✔
28
/**
 * Static side of an event class: what must be available on the constructor
 * itself (as opposed to on instances) of an event of type `Base`.
 */
interface BaseEventConstructor<Base extends Event<any>> {
  /** Creates an event instance. */
  new (...args: any): Base

  /** Version written to a payload's `$version` when the caller supplied none. */
  version: string

  /** Name identifying the event type. */
  eventName(): string

  /** Options for the pg-boss worker processing this event. */
  getWorkerOptions(): WorkOptions

  /**
   * Builds an event from `payload` and sends it.
   * `$version` is omitted from the accepted payload type.
   */
  send(
    this: StaticThis<Base>,
    payload: Omit<Base['payload'], '$version'>
  ): Promise<string | void | null>
}
1✔
41

1✔
42
/**
 * Base class for all events that are sent to the queue.
 *
 * Subclasses customise behaviour through the static members (`queueName`,
 * `version`, `allowSync`, the `get*Options` hooks and `handle`).
 *
 * When `pgQueueEnable` is false nothing is queued: events are handled
 * in-process if `allowSync` is true, otherwise they are skipped with a warning.
 */
export class Event<T extends Omit<BasePayload, '$version'>> {
  /** Version stamped on job data as `$version`. */
  public static readonly version: string = 'v1'
  /** Name of the queue jobs are sent to; subclasses must set it. */
  protected static queueName = ''
  /** Whether the event may be handled in-process when the queue is disabled. */
  protected static allowSync = true

  constructor(public readonly payload: T & BasePayload) {}

  /** Name of the event type: the name of the class this is called on. */
  static eventName() {
    return this.name
  }

  /**
   * Returns the queue name configured on the class.
   *
   * @throws Error when `queueName` is empty.
   */
  static getQueueName() {
    if (!this.queueName) {
      // In a static method `this` is the class itself, so `this.name` is the
      // class name (`this.constructor.name` would always be "Function").
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName
  }

  /**
   * Hook for subclasses to describe their queue; `undefined` by default.
   * NOTE(review): the consumer of this value is outside this file.
   */
  static getQueueOptions(): PgBossQueue | undefined {
    return undefined
  }

  /** Hook for subclasses to supply per-job send options; `undefined` by default. */
  static getSendOptions<T extends Event<any>>(payload: T['payload']): SendOptions | undefined {
    return undefined
  }

  /** Hook for subclasses to supply worker options; empty by default. */
  static getWorkerOptions(): WorkOptions {
    return {}
  }

  /**
   * Hook for subclasses to opt in to a slow-retry queue. While this returns
   * `undefined`, `sendSlowRetryQueue` does nothing.
   */
  static withSlowRetryQueue(): undefined | SlowRetryQueueOptions {
    return undefined
  }

  /**
   * Returns the slow-retry queue name: the queue name suffixed with `-slow`.
   *
   * @throws Error when `queueName` is empty.
   */
  static getSlowRetryQueueName() {
    if (!this.queueName) {
      // See getQueueName: `this.name` is the class name in a static context.
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName + '-slow'
  }

  /** Lifecycle hook; no-op by default. */
  static onClose() {
    // no-op
  }

  /** Lifecycle hook; no-op by default. */
  static onStart() {
    // no-op
  }

  /**
   * Sends several events at once.
   *
   * With the queue disabled, each message is sent individually when
   * `allowSync` is true; otherwise the batch is skipped with a warning and
   * `undefined` is returned. With the queue enabled, all messages are inserted
   * in a single `Queue.insert` call.
   *
   * NOTE(review): unlike `send()`, the inserted job data is the payload as-is
   * (no `region` is added and `shouldSend` is not consulted) — confirm this is
   * intentional.
   */
  static batchSend<T extends Event<any>[]>(messages: T) {
    if (!pgQueueEnable) {
      if (this.allowSync) {
        return Promise.all(messages.map((message) => message.send()))
      }

      logger.warn('[Queue] skipped sending batch messages', {
        type: 'queue',
        eventType: this.eventName(),
      })
      return
    }

    const jobs = messages.map((message) => {
      // Copy the options so that setting `startAfter` below never writes to an
      // object a subclass may share between calls (spreading `undefined` is a no-op).
      const sendOptions = { ...(this.getSendOptions(message.payload) as PgBoss.JobInsert) }

      if (!message.payload.$version) {
        ;(message.payload as (typeof message)['payload']).$version = this.version
      }

      if (message.payload.scheduleAt) {
        sendOptions.startAfter = new Date(message.payload.scheduleAt)
      }

      return {
        ...sendOptions,
        name: this.getQueueName(),
        data: message.payload,
      }
    })

    return Queue.getInstance().insert(jobs)
  }

  /**
   * Builds an event from `payload` (stamping `$version` on it when missing)
   * and sends it.
   */
  static send<T extends Event<any>>(this: StaticThis<T>, payload: Omit<T['payload'], '$version'>) {
    if (!payload.$version) {
      ;(payload as T['payload']).$version = this.version
    }

    return new this(payload).send()
  }

  /**
   * Builds an event from `payload` (stamping `$version` on it when missing)
   * and sends it to the slow-retry queue.
   */
  static sendSlowRetryQueue<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>
  ) {
    if (!payload.$version) {
      ;(payload as T['payload']).$version = this.version
    }

    return new this(payload).sendSlowRetryQueue()
  }

  /**
   * Processes one job or a batch of jobs. Subclasses must override it.
   *
   * @throws Error always, in the base implementation.
   */
  static handle(job: Job<Event<any>['payload']> | Job<Event<any>['payload']>[]) {
    throw new Error('not implemented')
  }

  /**
   * Decides whether an event should be sent. Outside multitenant mode this is
   * always true; in multitenant mode it is false when the tenant's
   * `disableEvents` list contains this event's name.
   */
  static async shouldSend(payload: any) {
    if (!isMultitenant) {
      return true
    }

    // Do not send an event if disabled for this specific tenant
    const tenant = await getTenantConfig(payload.tenant.ref)
    const disabledEvents = tenant.disableEvents || []
    return !disabledEvents.includes(this.eventName())
  }

  /**
   * Sends this event.
   *
   * - Returns without doing anything when `shouldSend` resolves to false.
   * - With the queue disabled, handles the event in-process when `allowSync`
   *   is true, otherwise logs a warning and returns.
   * - With the queue enabled, enqueues the job and returns the result of
   *   `Queue.send`; if enqueueing throws, the error is logged and the event
   *   is handled in-process instead.
   */
  async send(): Promise<string | void | null> {
    const constructor = this.constructor as typeof Event

    if (!(await constructor.shouldSend(this.payload))) {
      return
    }

    if (!pgQueueEnable) {
      if (constructor.allowSync) {
        return this.handleInProcess()
      }

      logger.warn('[Queue] skipped sending message', {
        type: 'queue',
        eventType: constructor.eventName(),
      })
      return
    }

    const timer = QueueJobSchedulingTime.startTimer()
    const baseOptions = constructor.getSendOptions(this.payload)
    // Build a new options object rather than writing `startAfter` onto the one
    // returned by getSendOptions, which a subclass may reuse across calls.
    const sendOptions: SendOptions | undefined = this.payload.scheduleAt
      ? { ...baseOptions, startAfter: new Date(this.payload.scheduleAt) }
      : baseOptions

    try {
      const res = await Queue.getInstance().send({
        name: constructor.getQueueName(),
        data: this.buildJobData(),
        options: sendOptions,
      })

      QueueJobScheduled.inc({
        name: constructor.getQueueName(),
      })

      return res
    } catch (e) {
      // If we can't queue the message for some reason,
      // we run its handler right away.
      // This might create some latency with the benefit of being more fault-tolerant
      logSchema.warning(
        logger,
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
        {
          type: 'queue',
          error: e,
          metadata: JSON.stringify(this.payload),
        }
      )
      return this.handleInProcess()
    } finally {
      timer({
        name: constructor.getQueueName(),
      })
    }
  }

  /**
   * Sends this event to the slow-retry queue.
   *
   * Does nothing (resolves to `undefined`) when the queue is disabled or the
   * class has not opted in via `withSlowRetryQueue()`. Otherwise returns the
   * result of `Queue.send`. Option precedence, lowest to highest: the defaults
   * below, the class's send options, the slow-retry options.
   */
  async sendSlowRetryQueue() {
    const constructor = this.constructor as typeof Event
    const slowRetryQueue = constructor.withSlowRetryQueue()

    if (!pgQueueEnable || !slowRetryQueue) {
      return
    }

    const timer = QueueJobSchedulingTime.startTimer()
    const sendOptions = constructor.getSendOptions(this.payload) || {}

    try {
      const res = await Queue.getInstance().send({
        name: constructor.getSlowRetryQueueName(),
        data: this.buildJobData(),
        options: {
          retryBackoff: true,
          // 30 minutes. pg-boss reads a numeric startAfter as seconds, so the
          // previous `60 * 60 * 30` delayed the job by 30 hours.
          startAfter: 30 * 60,
          ...sendOptions,
          ...slowRetryQueue,
        },
      })

      QueueJobScheduled.inc({
        name: constructor.getSlowRetryQueueName(),
      })

      return res
    } finally {
      // Stop the timer even when enqueueing throws, as send() does.
      timer({
        name: constructor.getSlowRetryQueueName(),
      })
    }
  }

  /** Job data placed on the queue: the payload plus `region` and the class `version`. */
  private buildJobData() {
    const constructor = this.constructor as typeof Event

    return {
      region,
      ...this.payload,
      $version: constructor.version,
    }
  }

  /** Runs the class's `handle` directly with a synthetic job whose id is `__sync`. */
  private handleInProcess() {
    const constructor = this.constructor as typeof Event

    return constructor.handle({
      id: '__sync',
      expireInSeconds: 0,
      name: constructor.getQueueName(),
      data: this.buildJobData(),
    })
  }
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc