• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 28660863445

03 Jul 2026 12:34PM UTC coverage: 78.856% (+0.03%) from 78.822%
28660863445

push

github

web-flow
fix: replace mutex with single flight pattern (#1193)

Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>

5103 of 7034 branches covered (72.55%)

Branch coverage included in aggregate %.

72 of 87 new or added lines in 9 files covered. (82.76%)

2 existing lines in 2 files now uncovered.

10095 of 12239 relevant lines covered (82.48%)

420.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.29
/src/internal/queue/event.ts
1
import { getTenantConfig } from '@internal/database'
2
import { PgExecutor } from '@internal/database/pg-connection'
3
import { ERRORS } from '@internal/errors'
4
import { logger, logSchema } from '@internal/monitoring'
5
import { queueJobScheduled, queueJobSchedulingTime } from '@internal/monitoring/metrics'
6
import { PgQueueDB } from '@internal/queue/database'
7
import PgBoss, { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss'
8
import { getConfig } from '../../config'
9
import { SYSTEM_TENANT_REF } from './constants'
10
import { PG_BOSS_SCHEMA, Queue } from './queue'
11

12
/**
 * Fields shared by every queue event payload.
 */
export interface BasePayload {
  /** Payload version; stamped from the event class when the caller did not set one. */
  $version?: string
  singletonKey?: string
  /** When set, the queued job's `startAfter` option is derived from this date. */
  scheduleAt?: Date
  reqId?: string
  /** Request id that is attached to queue log entries. */
  sbReqId?: string
  /** Tenant the event belongs to; `ref` is used for tenant lookups and logging. */
  tenant: { ref: string; host: string }
}
23

24
// Queue-related settings, read once when this module is loaded:
// - pgQueueEnable: when false, events are handled in-process instead of being queued
// - region: added to the data of every job built in this file
// - isMultitenant: enables the per-tenant "disabled events" check in shouldSend
const { pgQueueEnable, region, isMultitenant } = getConfig()
70✔
25
// Executor callers may pass as `tnx` in the send options; `send` wraps it in a PgQueueDB
// and hands that to a dedicated pg-boss instance as its `db`.
type TransactionalQueueDb = PgExecutor
26

27
/**
 * Produces a shallow copy of `payload` that always carries a `$version`.
 *
 * @param payload - payload to copy; it is not modified
 * @param version - fallback used only when `payload.$version` is null or undefined
 * @returns a new object with the payload's fields plus the resolved `$version`
 */
function withPayloadVersion<TPayload extends BasePayload>(
  payload: TPayload,
  version: string
): TPayload {
  // A version already present on the payload takes precedence over the fallback.
  const resolvedVersion = payload.$version ?? version

  return Object.assign({}, payload, { $version: resolvedVersion })
}
36

37
/** Base payload shape without `$version`. */
export type EventInputPayload = Omit<BasePayload, '$version'>
38
/** Alias for an `Event` instance carrying payload `T` (defaults to the base input payload). */
export type QueueEvent<T extends EventInputPayload = EventInputPayload> = Event<T>
39
/** `this` type of the static send/invoke helpers, so they can call `new this(payload)`. */
export type StaticThis<TPayload extends BasePayload> = BaseEventConstructor<TPayload>
40

41
/**
 * Static side of an event class: it can be constructed from a payload and
 * exposes the payload `version` that is stamped onto outgoing payloads.
 */
interface BaseEventConstructor<TPayload extends BasePayload> {
  new (payload: TPayload): QueueEvent<Omit<TPayload, '$version'>>

  version: string
}
46

47
/**
 * Base class for all events that are sent to the queue.
 *
 * Subclasses are expected to set `queueName` (otherwise `getQueueName` throws)
 * and to override `handle` (the base implementation throws). The remaining
 * static hooks return neutral defaults here.
 */
export class Event<T extends Omit<BasePayload, '$version'>> {
  /** Version written into `$version` of every job payload built by this class. */
  public static readonly version: string = 'v1'
  /** Name of the queue; must be overridden with a non-empty value. */
  protected static queueName = ''
  /** Whether the event may be handled in-process instead of through the queue. */
  protected static allowSync = true

  constructor(public readonly payload: T & BasePayload) {}

  /** Name of the event class; compared against a tenant's disabled events. */
  static eventName() {
    return this.name
  }

  /** Dead-letter queue name derived from `queueName`. */
  static deadLetterQueueName() {
    return this.queueName + '-dead-letter'
  }

  /**
   * Returns the queue name configured on the event class.
   *
   * @throws Error when the class did not set `queueName`
   */
  static getQueueName() {
    if (!this.queueName) {
      // In a static method `this` is the class itself, so `this.name` is the class name
      // (`this.constructor.name` would always be "Function").
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName
  }

  /** Queue options hook; the base class provides none. */
  static getQueueOptions(): PgBossQueue | undefined {
    return undefined
  }

  /** Per-payload send options hook; the base class provides none. */
  static getSendOptions(payload: BasePayload): SendOptions | undefined {
    return undefined
  }

  /** Worker options hook; the base class returns an empty object. */
  static getWorkerOptions(): WorkOptions & { concurrentTaskCount?: number } {
    return {}
  }

  /** Hook with no behavior in the base class. */
  static onClose() {
    // no-op
  }

  /** Hook with no behavior in the base class. */
  static onStart() {
    // no-op
  }

  /**
   * Sends several messages at once.
   *
   * With the queue disabled, each message's own `send()` is invoked (when sync handling
   * is allowed) or the batch is skipped with a warning. With the queue enabled, all
   * payloads are inserted in one `insert` call.
   *
   * @param messages - payloads together with a `send` callback used in the non-queue path
   * @returns the result of `Promise.all` / `insert`, or `undefined` when the batch is skipped
   */
  static batchSend<TPayload extends BasePayload>(
    this: StaticThis<TPayload>,
    messages: Array<{ payload: TPayload; send(): Promise<string | void | null> }>
  ) {
    const eventClass = this as typeof Event

    if (!pgQueueEnable) {
      if (eventClass.allowSync) {
        return Promise.all(messages.map((message) => message.send()))
      }

      logger.warn(
        {
          type: 'queue',
          eventType: eventClass.eventName(),
        },
        '[Queue] skipped sending batch messages'
      )
      return
    }

    return Queue.getInstance().insert(
      messages.map((message) => {
        const payloadWithVersion = withPayloadVersion(message.payload, eventClass.version)
        const baseOptions =
          (eventClass.getSendOptions(payloadWithVersion) as PgBoss.JobInsert) || {}

        // Build a fresh object instead of mutating whatever getSendOptions returned, so a
        // subclass returning a shared options object cannot leak `startAfter` between jobs.
        return {
          ...baseOptions,
          ...(payloadWithVersion.scheduleAt
            ? { startAfter: new Date(payloadWithVersion.scheduleAt) }
            : {}),
          name: eventClass.getQueueName(),
          data: payloadWithVersion,
          deadLetter: eventClass.deadLetterQueueName(),
        }
      })
    )
  }

  /**
   * Builds an event from `payload` (stamping the class version) and sends it.
   *
   * @param payload - event payload without `$version`
   * @param opts - send options; `tnx` makes the job be written through that executor
   */
  static send<TPayload extends BasePayload>(
    this: StaticThis<TPayload>,
    payload: Omit<TPayload, '$version'>,
    opts?: SendOptions & { tnx?: TransactionalQueueDb }
  ) {
    const that = new this(withPayloadVersion(payload as TPayload, this.version))
    return that.send(opts)
  }

  /** Builds an event from `payload` and handles it in-process. */
  static invoke<TPayload extends BasePayload>(
    this: StaticThis<TPayload>,
    payload: Omit<TPayload, '$version'>
  ) {
    const that = new this(withPayloadVersion(payload as TPayload, this.version))
    return that.invoke()
  }

  /** Builds an event from `payload`, handles it in-process and falls back to the queue. */
  static invokeOrSend<TPayload extends BasePayload>(
    this: StaticThis<TPayload>,
    payload: Omit<TPayload, '$version'>,
    options?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
  ) {
    const that = new this(withPayloadVersion(payload as TPayload, this.version))
    return that.invokeOrSend(options)
  }

  /**
   * Processes one job or a batch of jobs. Must be overridden.
   *
   * @throws Error always, in the base class
   */
  static handle(job: Job<BasePayload> | Job<BasePayload>[], opts?: { signal?: AbortSignal }) {
    throw new Error('not implemented')
  }

  /**
   * Decides whether an event should be sent at all.
   *
   * @returns `false` only in multitenant mode when the tenant's config lists this
   *          event's name among its disabled events; `true` otherwise
   */
  static async shouldSend(payload: BasePayload) {
    if (isMultitenant && payload?.tenant?.ref) {
      // Do not send an event if disabled for this specific tenant
      const tenant = await getTenantConfig(payload.tenant.ref)
      const disabledEvents = tenant.disableEvents || []
      if (disabledEvents.includes(this.eventName())) {
        return false
      }
    }
    return true
  }

  /**
   * Deletes job `jobId` when another job with the same queue name and singleton key
   * exists whose state sorts below 'active'. Does nothing when the queue is disabled.
   *
   * See issue https://github.com/timgit/pg-boss/issues/535
   * @param queueName
   * @param singletonKey
   * @param jobId
   */
  static async deleteIfActiveExists(queueName: string, singletonKey: string, jobId: string) {
    if (!pgQueueEnable) {
      return
    }

    await Queue.getDb().executeSql(
      `DELETE FROM ${PG_BOSS_SCHEMA}.job
       WHERE id = $1
       AND EXISTS(
          SELECT 1 FROM ${PG_BOSS_SCHEMA}.job
             WHERE id != $2
             AND state < 'active'
             AND name = $3
             AND singleton_key = $4
       )
      `,
      [jobId, jobId, queueName, singletonKey]
    )
  }

  /**
   * Handles the event in-process; if that fails, logs the error and sends the event
   * to the queue instead.
   *
   * @param sendOptions - options forwarded to `send`; when `sendWhenError` is given and
   *                      returns `false` for the error, the error is rethrown instead
   * @throws when the event class does not allow synchronous handling
   */
  async invokeOrSend(
    sendOptions?: SendOptions & { sendWhenError?: (error: unknown) => boolean }
  ): Promise<string | void | null> {
    const eventClass = this.constructor as typeof Event

    if (!eventClass.allowSync) {
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
    }

    try {
      await this.invoke()
    } catch (e) {
      if (sendOptions?.sendWhenError && !sendOptions.sendWhenError(e)) {
        throw e
      }

      logSchema.error(logger, '[Queue] Error invoking event synchronously, sending to queue', {
        type: 'queue',
        project: this.payload.tenant?.ref || SYSTEM_TENANT_REF,
        error: e,
        metadata: JSON.stringify(this.payload),
        sbReqId: this.payload.sbReqId,
      })

      return this.send(sendOptions)
    }
  }

  /**
   * Handles the event in-process by passing a synthetic `__sync` job to `handle`.
   *
   * @throws when the event class does not allow synchronous handling
   */
  async invoke(): Promise<string | void | null> {
    const eventClass = this.constructor as typeof Event

    if (!eventClass.allowSync) {
      throw ERRORS.InternalError(undefined, 'Cannot send this event synchronously')
    }

    await eventClass.handle(buildSyncJob(eventClass, this.payload))
  }

  /**
   * Sends the event to the queue.
   *
   * - Returns early when `shouldSend` rejects the payload.
   * - With the queue disabled, handles the event in-process (or skips it with a warning
   *   when sync handling is not allowed).
   * - If queueing throws, falls back to in-process handling when allowed, else rethrows.
   *
   * @param customSendOptions - merged over the class's send options; `tnx` selects a
   *                            pg-boss instance bound to that executor
   * @returns the value returned by `queue.send` (`null` when pg-boss dropped the job),
   *          or the handler's result on the synchronous paths
   */
  async send(
    customSendOptions?: SendOptions & { tnx?: TransactionalQueueDb }
  ): Promise<string | void | null> {
    const eventClass = this.constructor as typeof Event

    const shouldSend = await eventClass.shouldSend(this.payload)

    if (!shouldSend) {
      return
    }

    if (!pgQueueEnable) {
      if (eventClass.allowSync) {
        return eventClass.handle(buildSyncJob(eventClass, this.payload))
      }

      logger.warn(
        {
          type: 'queue',
          eventType: eventClass.eventName(),
        },
        '[Queue] skipped sending message'
      )
      return
    }

    const startTime = process.hrtime.bigint()
    // Work on a copy: the object returned by getSendOptions may be shared by the subclass.
    const sendOptions = { ...(eventClass.getSendOptions(this.payload) || {}) }

    if (this.payload.scheduleAt) {
      sendOptions.startAfter = new Date(this.payload.scheduleAt)
    }

    sendOptions.deadLetter = eventClass.deadLetterQueueName()

    try {
      const queue = customSendOptions?.tnx
        ? Queue.createPgBoss({
            enableWorkers: false,
            db: createTransactionQueueDB(customSendOptions.tnx),
          })
        : Queue.getInstance()

      const res = await queue.send({
        name: eventClass.getQueueName(),
        data: buildJobData(eventClass, this.payload),
        options: {
          ...sendOptions,
          ...customSendOptions,
        },
      })

      // pg-boss returns null when the insert was dropped by a queue policy
      // (e.g. an exactly_once job with the same singleton key is still queued or active).
      if (res === null) {
        logSchema.info(logger, `[Queue Sender] Job not queued, dropped by queue policy`, {
          type: 'queue',
          project: this.payload.tenant?.ref || SYSTEM_TENANT_REF,
          metadata: JSON.stringify({ queue: eventClass.getQueueName() }),
          sbReqId: this.payload.sbReqId,
        })

        return res
      }

      queueJobScheduled.add(1, {
        name: eventClass.getQueueName(),
      })

      return res
    } catch (e) {
      // If we can't queue the message for some reason,
      // we run its handler right away.
      // This might create some latency with the benefit of being more fault-tolerant
      logSchema.warning(
        logger,
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
        {
          type: 'queue',
          project: this.payload.tenant?.ref || SYSTEM_TENANT_REF,
          error: e,
          metadata: JSON.stringify(this.payload),
          sbReqId: this.payload.sbReqId,
        }
      )

      if (!eventClass.allowSync) {
        throw e
      }

      return eventClass.handle(buildSyncJob(eventClass, this.payload))
    } finally {
      // Recorded on every queue-path outcome (success, drop, fallback or rethrow).
      const duration = Number(process.hrtime.bigint() - startTime) / 1e9
      queueJobSchedulingTime.record(duration, {
        name: eventClass.getQueueName(),
      })
    }
  }
}

/** Job data for `payload`: the configured region, the payload, and the class version. */
function buildJobData<TPayload extends BasePayload>(eventClass: typeof Event, payload: TPayload) {
  return {
    region,
    ...payload,
    $version: eventClass.version,
  }
}

/** Synthetic job (id `__sync`) handed to `handle` when an event is processed in-process. */
function buildSyncJob(eventClass: typeof Event, payload: BasePayload): Job<BasePayload> {
  return {
    id: '__sync',
    expireInSeconds: 0,
    name: eventClass.getQueueName(),
    data: buildJobData(eventClass, payload),
  }
}
370

371
/**
 * Wraps a caller-supplied executor in a `PgQueueDB`.
 *
 * @param tnx - executor to wrap
 * @returns a new `PgQueueDB` constructed from `tnx`
 */
function createTransactionQueueDB(tnx: TransactionalQueueDb) {
  const transactionalDb = new PgQueueDB(tnx)
  return transactionalDb
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc