• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 15387080396

02 Jun 2025 08:04AM UTC coverage: 77.893% (-0.3%) from 78.159%
15387080396

Pull #696

github

web-flow
Merge f3f847646 into d82ebecc1
Pull Request #696: feat: pgboss v10

1536 of 2137 branches covered (71.88%)

Branch coverage included in aggregate %.

166 of 436 new or added lines in 28 files covered. (38.07%)

60 existing lines in 4 files now uncovered.

17374 of 22140 relevant lines covered (78.47%)

110.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.56
/src/internal/queue/event.ts
1
import { Queue } from './queue'
1✔
2
import PgBoss, { Job, SendOptions, WorkOptions, Queue as PgBossQueue } from 'pg-boss'
1✔
3
import { getConfig } from '../../config'
1✔
4
import { QueueJobScheduled, QueueJobSchedulingTime } from '@internal/monitoring/metrics'
1✔
5
import { logger, logSchema } from '@internal/monitoring'
1✔
6
import { getTenantConfig } from '@internal/database'
1✔
7

1✔
8
export interface BasePayload {
1✔
9
  $version?: string
1✔
10
  singletonKey?: string
1✔
11
  scheduleAt?: Date
1✔
12
  reqId?: string
1✔
13
  tenant: {
1✔
14
    ref: string
1✔
15
    host: string
1✔
16
  }
1✔
17
}
1✔
18

1✔
19
const { pgQueueEnable, region, isMultitenant } = getConfig()
1✔
20

1✔
21
export type StaticThis<T extends Event<any>> = BaseEventConstructor<T>
1✔
22

1✔
23
interface BaseEventConstructor<Base extends Event<any>> {
1✔
24
  version: string
1✔
25

1✔
26
  new (...args: any): Base
1✔
27

1✔
28
  send(
1✔
29
    this: StaticThis<Base>,
1✔
30
    payload: Omit<Base['payload'], '$version'>
1✔
31
  ): Promise<string | void | null>
1✔
32

1✔
33
  eventName(): string
1✔
34
  getWorkerOptions(): WorkOptions
1✔
35
}
1✔
36

1✔
37
/**
1✔
38
 * Base class for all events that are sent to the queue
1✔
39
 */
1✔
40
export class Event<T extends Omit<BasePayload, '$version'>> {
1✔
41
  public static readonly version: string = 'v1'
1✔
42
  protected static queueName = ''
1✔
43
  protected static allowSync = true
1✔
44

1✔
45
  constructor(public readonly payload: T & BasePayload) {}
1✔
46

1✔
47
  static eventName() {
1✔
48
    return this.name
×
49
  }
×
50

1✔
51
  /**
   * Returns the queue name configured on the event class this is called on.
   *
   * @returns the class's static `queueName`
   * @throws Error when `queueName` is empty, naming the offending class
   */
  static getQueueName() {
    if (!this.queueName) {
      // Inside a static method `this` is the class itself, so `this.name` is the
      // class name; `this.constructor.name` would always evaluate to "Function".
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName
  }
10,042✔
58

1✔
59
  static getQueueOptions(): PgBossQueue | undefined {
1✔
UNCOV
60
    return undefined
×
UNCOV
61
  }
×
62

1✔
63
  static getSendOptions<T extends Event<any>>(payload: T['payload']): SendOptions | undefined {
1✔
64
    return undefined
5✔
65
  }
5✔
66

1✔
67
  static getWorkerOptions(): WorkOptions {
1✔
68
    return {}
×
69
  }
×
70

1✔
71
  static withSlowRetryQueue(): undefined {
1✔
NEW
72
    return undefined
×
UNCOV
73
  }
×
74

1✔
75
  /**
   * Returns the name of the slow-retry queue for this event class, which is the
   * class's `queueName` with a `-slow` suffix.
   *
   * @returns `queueName + '-slow'`
   * @throws Error when `queueName` is empty, naming the offending class
   */
  static getSlowRetryQueueName() {
    if (!this.queueName) {
      // Inside a static method `this` is the class itself, so `this.name` is the
      // class name; `this.constructor.name` would always evaluate to "Function".
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName + '-slow'
  }
×
82

1✔
83
  static onClose() {
1✔
UNCOV
84
    // no-op
×
85
  }
×
86

1✔
87
  static onStart() {
1✔
88
    // no-op
×
89
  }
×
90

1✔
91
  /**
   * Sends several events at once.
   *
   * When `pgQueueEnable` is falsy, each message is sent through its own `send()` if
   * `allowSync` is set; otherwise the batch is skipped with a warning and `undefined`
   * is returned. When the queue is enabled, all messages are handed to
   * `Queue.getInstance().insert(...)` in a single call.
   *
   * Payloads without a `$version` are stamped in place with this class's `version`.
   *
   * @param messages events to send
   * @returns the promise from `Promise.all` / `insert`, or `undefined` when skipped
   */
  static batchSend<T extends Event<any>[]>(messages: T) {
    if (!pgQueueEnable) {
      if (this.allowSync) {
        return Promise.all(messages.map((message) => message.send()))
      } else {
        logger.warn('[Queue] skipped sending batch messages', {
          type: 'queue',
          eventType: this.eventName(),
        })
        return
      }
    }

    return Queue.getInstance().insert(
      messages.map((message) => {
        // Work on a copy: `startAfter` is written below, and the object returned by
        // `getSendOptions` may be shared between calls by an overriding class.
        const sendOptions = { ...this.getSendOptions(message.payload) } as PgBoss.JobInsert
        if (!message.payload.$version) {
          ;(message.payload as (typeof message)['payload']).$version = this.version
        }

        if (message.payload.scheduleAt) {
          sendOptions.startAfter = new Date(message.payload.scheduleAt)
        }

        return {
          ...sendOptions,
          name: this.getQueueName(),
          data: message.payload,
        }
      })
    )
  }
1✔
123

1✔
124
  /**
   * Convenience entry point: stamps the payload with this class's `version` when it
   * has no `$version` yet, wraps it in a new event instance and sends it.
   *
   * @param payload event payload; it is stamped in place, not copied
   * @returns whatever the instance-level `send()` resolves to
   */
  static send<T extends Event<any>>(this: StaticThis<T>, payload: Omit<T['payload'], '$version'>) {
    const versioned = payload as T['payload']
    if (!versioned.$version) {
      versioned.$version = this.version
    }
    return new this(versioned).send()
  }
10,042✔
131

1✔
132
  static handle(job: Job<Event<any>['payload']> | Job<Event<any>['payload']>[]) {
1✔
133
    throw new Error('not implemented')
×
134
  }
×
135

1✔
136
  /**
   * Decides whether an event with the given payload should be sent.
   *
   * Outside multitenant mode the answer is always `true`. In multitenant mode the
   * tenant config is loaded via `getTenantConfig(payload.tenant.ref)` and the event is
   * rejected when its `eventName()` appears in the tenant's `disableEvents` list.
   *
   * @param payload event payload; `payload.tenant.ref` is read in multitenant mode
   * @returns `false` only when the event is disabled for the tenant
   */
  static async shouldSend(payload: any) {
    if (!isMultitenant) {
      return true
    }

    // Do not send an event if disabled for this specific tenant
    const tenantConfig = await getTenantConfig(payload.tenant.ref)
    const disabled = tenantConfig.disableEvents || []
    return !disabled.includes(this.eventName())
  }
14✔
147

1✔
148
  /**
   * Sends this event.
   *
   * 1. Asks the class's `shouldSend` whether the event may be sent; returns
   *    `undefined` if not.
   * 2. When `pgQueueEnable` is falsy, runs the class's `handle` in-process if
   *    `allowSync` is set, otherwise logs a warning and returns `undefined`.
   * 3. Otherwise sends the job through `Queue.getInstance().send(...)`, using
   *    `payload.scheduleAt` as `startAfter` when present. If that call throws, a
   *    warning is logged and the handler is run in-process instead.
   *
   * @returns the result of the queue send, the result of the in-process handler, or
   *          `undefined` when the event is skipped
   */
  async send(): Promise<string | void | null> {
    const constructor = this.constructor as typeof Event

    const shouldSend = await constructor.shouldSend(this.payload)

    if (!shouldSend) {
      return
    }

    // Runs the handler in-process with a synthetic job; shared by the
    // queue-disabled path and the queue-failure fallback.
    const handleSync = () =>
      constructor.handle({
        id: '__sync',
        expireInSeconds: 0,
        name: constructor.getQueueName(),
        data: {
          region,
          ...this.payload,
          $version: constructor.version,
        },
      })

    if (!pgQueueEnable) {
      if (constructor.allowSync) {
        return handleSync()
      } else {
        logger.warn('[Queue] skipped sending message', {
          type: 'queue',
          eventType: constructor.eventName(),
        })
        return
      }
    }

    const timer = QueueJobSchedulingTime.startTimer()
    let sendOptions = constructor.getSendOptions(this.payload)

    if (this.payload.scheduleAt) {
      // Build a new object rather than writing onto the one returned by
      // `getSendOptions`, which an overriding class may share between calls.
      sendOptions = { ...sendOptions, startAfter: new Date(this.payload.scheduleAt) }
    }

    try {
      const res = await Queue.getInstance().send({
        name: constructor.getQueueName(),
        data: {
          region,
          ...this.payload,
          $version: constructor.version,
        },
        options: sendOptions,
      })

      QueueJobScheduled.inc({
        name: constructor.getQueueName(),
      })

      return res
    } catch (e) {
      // If we can't queue the message for some reason,
      // we run its handler right away.
      // This might create some latency with the benefit of being more fault-tolerant
      logSchema.warning(
        logger,
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
        {
          type: 'queue',
          error: e,
          metadata: JSON.stringify(this.payload),
        }
      )
      return handleSync()
    } finally {
      // Stop the scheduling timer whether the queue send succeeded or failed.
      timer({
        name: constructor.getQueueName(),
      })
    }
  }
10,042✔
235
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc