• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 14273943858

04 Apr 2025 08:50PM UTC coverage: 77.778% (+1.2%) from 76.626%
14273943858

Pull #649

github

web-flow
Merge ca6564f55 into 954644b9a
Pull Request #649: feat: Pre-Signed URL Signatures with Storage JWT Secret

1399 of 1962 branches covered (71.3%)

Branch coverage included in aggregate %.

545 of 608 new or added lines in 16 files covered. (89.64%)

4 existing lines in 3 files now uncovered.

16227 of 20700 relevant lines covered (78.39%)

157.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.54
/src/internal/queue/event.ts
1
import { Queue } from './queue'
1✔
2
import PgBoss, { BatchWorkOptions, Job, SendOptions, WorkOptions } from 'pg-boss'
1✔
3
import { getConfig } from '../../config'
1✔
4
import { QueueJobScheduled, QueueJobSchedulingTime } from '@internal/monitoring/metrics'
1✔
5
import { logger, logSchema } from '@internal/monitoring'
1✔
6
import { getTenantConfig } from '@internal/database'
1✔
7

1✔
8
/**
 * Fields shared by the payload of every queue event.
 */
export interface BasePayload {
  /** Payload version; the event class stamps its own version when this is missing. */
  $version?: string
  singletonKey?: string
  /** When present, it is converted to a Date and used as the job's `startAfter` option. */
  scheduleAt?: Date
  reqId?: string
  /** Tenant the event belongs to; `ref` is used to look up the tenant configuration. */
  tenant: {
    ref: string
    host: string
  }
}
1✔
18

1✔
19
/**
 * Retry settings an event class can return from `withSlowRetryQueue()`.
 * They are spread into the send options used for the slow retry queue.
 */
export interface SlowRetryQueueOptions {
  retryLimit: number
  retryDelay: number
}
1✔
23

1✔
24
// Read once when the module loads: `pgQueueEnable` decides whether jobs go through the queue
// or run in-process, `region` is added to every job's data, and `isMultitenant` enables the
// per-tenant disabled-events check.
const { pgQueueEnable, region, isMultitenant } = getConfig()
1✔
25

1✔
26
/** Type used for `this` in the static send helpers: the constructor (static side) of an event class. */
export type StaticThis<T extends Event<any>> = BaseEventConstructor<T>
1✔
27

1✔
28
/**
 * Static-side contract of an event class: what must be available on the
 * constructor itself so the static helpers can be typed through `this`.
 */
interface BaseEventConstructor<Base extends Event<any>> {
  /** Version stamped on payloads as `$version`. */
  version: string

  /** Builds an event instance wrapping a payload. */
  new (...args: any): Base

  /** Sends a payload; `$version` is omitted from the accepted payload type. */
  send(
    this: StaticThis<Base>,
    payload: Omit<Base['payload'], '$version'>
  ): Promise<string | void | null>

  eventName(): string
  getWorkerOptions(): WorkOptions | BatchWorkOptions
}
1✔
41

1✔
42
/**
 * Base class for all events that are sent to the queue.
 *
 * The static side carries per-event configuration (queue name, version,
 * send/worker options, lifecycle hooks) and each instance wraps one payload
 * to dispatch. Subclasses are expected to set `queueName` and override
 * `handle`; the remaining static hooks have neutral defaults.
 */
export class Event<T extends Omit<BasePayload, '$version'>> {
  /** Version stamped on outgoing payloads as `$version`. */
  public static readonly version: string = 'v1'
  /** Queue the event is published to; while empty, `getQueueName()` throws. */
  protected static queueName = ''

  constructor(public readonly payload: T & BasePayload) {}

  /** Name of the event class; `shouldSend` compares it with a tenant's disabled events. */
  static eventName() {
    return this.name
  }

  /**
   * @returns the queue name configured on the class
   * @throws Error when the class has not set `queueName`
   */
  static getQueueName() {
    if (!this.queueName) {
      // In a static method `this` is the class itself, so `this.name` is the class name
      // (`this.constructor.name` would always be "Function").
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName
  }

  /** Send options for a payload; the base implementation provides none. */
  static getQueueOptions<T extends Event<any>>(payload: T['payload']): SendOptions | undefined {
    return undefined
  }

  /** Worker options for this event; the base implementation returns an empty object. */
  static getWorkerOptions(): WorkOptions | BatchWorkOptions {
    return {}
  }

  /** Slow retry queue settings; `undefined` makes `sendSlowRetryQueue` a no-op. */
  static withSlowRetryQueue(): undefined | SlowRetryQueueOptions {
    return undefined
  }

  /**
   * @returns the queue name with a `-slow` suffix
   * @throws Error when the class has not set `queueName`
   */
  static getSlowRetryQueueName() {
    if (!this.queueName) {
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName + '-slow'
  }

  static onClose() {
    // no-op
  }

  static onStart() {
    // no-op
  }

  /**
   * Sends several events at once.
   *
   * With the queue disabled every message is sent individually through its own
   * `send()`. Otherwise all messages are inserted into this class's queue in a
   * single call.
   *
   * @param messages event instances to dispatch
   */
  static batchSend<T extends Event<any>[]>(messages: T) {
    if (!pgQueueEnable) {
      return Promise.all(messages.map((message) => message.send()))
    }

    return Queue.getInstance().insert(
      messages.map((message) => {
        const sendOptions = (this.getQueueOptions(message.payload) as PgBoss.JobInsert) || {}
        if (!message.payload.$version) {
          ;(message.payload as (typeof message)['payload']).$version = this.version
        }

        if (message.payload.scheduleAt) {
          sendOptions.startAfter = new Date(message.payload.scheduleAt)
        }

        return {
          ...sendOptions,
          name: this.getQueueName(),
          data: message.payload,
        }
      })
    )
  }

  /**
   * Wraps the payload in an event instance and sends it.
   * Sets `$version` on the given payload object when it is missing.
   */
  static send<T extends Event<any>>(this: StaticThis<T>, payload: Omit<T['payload'], '$version'>) {
    if (!payload.$version) {
      ;(payload as T['payload']).$version = this.version
    }
    const that = new this(payload)
    return that.send()
  }

  /**
   * Wraps the payload in an event instance and sends it to the slow retry queue.
   * Sets `$version` on the given payload object when it is missing.
   */
  static sendSlowRetryQueue<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>
  ) {
    if (!payload.$version) {
      ;(payload as T['payload']).$version = this.version
    }
    const that = new this(payload)
    return that.sendSlowRetryQueue()
  }

  /**
   * Processes one job or a batch of jobs. The base implementation always throws.
   *
   * @throws Error always ('not implemented')
   */
  static handle(job: Job<Event<any>['payload']> | Job<Event<any>['payload']>[]) {
    throw new Error('not implemented')
  }

  /**
   * Decides whether the event may be sent for the payload's tenant.
   *
   * @returns `false` only in multitenant mode when the tenant's `disableEvents`
   *          list contains this event's name; `true` otherwise
   */
  static async shouldSend(payload: any) {
    if (isMultitenant) {
      // Do not send an event if disabled for this specific tenant
      const tenant = await getTenantConfig(payload.tenant.ref)
      const disabledEvents = tenant.disableEvents || []
      if (disabledEvents.includes(this.eventName())) {
        return false
      }
    }
    return true
  }

  /**
   * Dispatches this event.
   *
   * Resolves without doing anything when `shouldSend` rejects the payload. With
   * the queue disabled the handler runs in-process. Otherwise the job is sent
   * to the queue, and if that fails the handler runs in-process as a fallback.
   *
   * @returns the result of the queue send, or of the in-process handler
   */
  async send(): Promise<string | void | null> {
    const constructor = this.constructor as typeof Event

    const shouldSend = await constructor.shouldSend(this.payload)

    if (!shouldSend) {
      return
    }

    // Runs the handler in-process with a synthetic job; shared by the
    // queue-disabled path and the queue-failure fallback.
    const handleSync = () =>
      constructor.handle({
        id: '__sync',
        name: constructor.getQueueName(),
        data: {
          region,
          ...this.payload,
          $version: constructor.version,
        },
      })

    if (!pgQueueEnable) {
      return handleSync()
    }

    const timer = QueueJobSchedulingTime.startTimer()
    let sendOptions = constructor.getQueueOptions(this.payload)

    if (this.payload.scheduleAt) {
      // Build a new object instead of mutating whatever getQueueOptions returned,
      // which a subclass may share between calls.
      sendOptions = { ...sendOptions, startAfter: new Date(this.payload.scheduleAt) }
    }

    try {
      const res = await Queue.getInstance().send({
        name: constructor.getQueueName(),
        data: {
          region,
          ...this.payload,
          $version: constructor.version,
        },
        options: sendOptions,
      })

      QueueJobScheduled.inc({
        name: constructor.getQueueName(),
      })

      return res
    } catch (e) {
      // If we can't queue the message for some reason,
      // we run its handler right away.
      // This might create some latency with the benefit of being more fault-tolerant
      logSchema.warning(
        logger,
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
        {
          type: 'queue',
          error: e,
          metadata: JSON.stringify(this.payload),
        }
      )
      return handleSync()
    } finally {
      timer({
        name: constructor.getQueueName(),
      })
    }
  }

  /**
   * Sends this event to the slow retry queue.
   *
   * Does nothing when the queue is disabled or the class has no slow retry
   * configuration (`withSlowRetryQueue()` returns `undefined`).
   *
   * @returns the result of the queue send, or `undefined` when skipped
   */
  async sendSlowRetryQueue() {
    const constructor = this.constructor as typeof Event
    const slowRetryQueue = constructor.withSlowRetryQueue()

    if (!pgQueueEnable || !slowRetryQueue) {
      return
    }

    const timer = QueueJobSchedulingTime.startTimer()
    const sendOptions = constructor.getQueueOptions(this.payload) || {}
    const slowQueueName = constructor.getSlowRetryQueueName()

    try {
      const res = await Queue.getInstance().send({
        name: slowQueueName,
        data: {
          region,
          ...this.payload,
          $version: constructor.version,
        },
        options: {
          retryBackoff: true,
          // pg-boss reads a numeric startAfter as seconds: 60 * 30 = 30 minutes
          startAfter: 60 * 30,
          ...sendOptions,
          ...slowRetryQueue,
        },
      })

      QueueJobScheduled.inc({
        name: slowQueueName,
      })

      return res
    } finally {
      // Stop the timer even when the send fails, as send() does
      timer({
        name: slowQueueName,
      })
    }
  }
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc