• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 12886525928

21 Jan 2025 12:03PM UTC coverage: 77.302% (-0.7%) from 78.032%
12886525928

push

github

web-flow
feat: reconcile orphan objects from admin endpoint (#606)

1278 of 1809 branches covered (70.65%)

Branch coverage included in aggregate %.

912 of 1340 new or added lines in 26 files covered. (68.06%)

1 existing line in 1 file now uncovered.

15175 of 19475 relevant lines covered (77.92%)

159.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.77
/src/internal/queue/event.ts
1
import { Queue } from './queue'
1✔
2
import PgBoss, { BatchWorkOptions, Job, SendOptions, WorkOptions } from 'pg-boss'
1✔
3
import { getConfig } from '../../config'
1✔
4
import { QueueJobScheduled, QueueJobSchedulingTime } from '@internal/monitoring/metrics'
1✔
5
import { logger, logSchema } from '@internal/monitoring'
1✔
6
import { getTenantConfig } from '@internal/database'
1✔
7

1✔
8
/**
 * Fields shared by every event payload that goes through the queue.
 */
export interface BasePayload {
  /** Payload schema version; the event class fills it from its static `version` when it is missing. */
  $version?: string
  /** NOTE(review): not read in this file — presumably a de-duplication key for the queue; confirm. */
  singletonKey?: string
  /** NOTE(review): not read in this file — presumably the id of the originating request; confirm. */
  reqId?: string
  /** Tenant the event belongs to; `ref` is the key handed to `getTenantConfig`. */
  tenant: { ref: string; host: string }
}
1✔
17

1✔
18
/**
 * Retry settings an event class returns from `withSlowRetryQueue()` to opt in
 * to the slow retry queue. They are spread into the send options of every job
 * published by `sendSlowRetryQueue()`.
 */
export interface SlowRetryQueueOptions {
  retryDelay: number
  retryLimit: number
}
1✔
22

1✔
23
// Settings read once when this module is loaded:
//  - pgQueueEnable: when false, events are handled in-process instead of being queued
//  - region: added to the data of every job built by `send()` / `sendSlowRetryQueue()`
//  - isMultitenant: enables the per-tenant "disabled events" check in `shouldSend()`
const { pgQueueEnable, region, isMultitenant } = getConfig()
1✔
24

1✔
25
// Type of `this` inside the static methods of an event class: the constructor of event type `T`.
export type StaticThis<T extends Event<any>> = BaseEventConstructor<T>
1✔
26

1✔
27
/**
 * Static side of an event class: what the class object itself (not an
 * instance) must provide so that the static helpers can be typed via `this`.
 */
interface BaseEventConstructor<Base extends Event<any>> {
  /** Constructs an event instance. */
  new (...args: any): Base

  /** Payload schema version stamped on outgoing payloads as `$version`. */
  version: string

  eventName(): string
  getWorkerOptions(): WorkOptions | BatchWorkOptions

  /** Builds an event from `payload` and sends it. */
  send(
    this: StaticThis<Base>,
    payload: Omit<Base['payload'], '$version'>
  ): Promise<string | void | null>
}
1✔
40

1✔
41
/**
 * Base class for all events that are sent to the queue.
 *
 * The static side describes the event type (queue name, version, queue and
 * worker options, handler); an instance wraps one payload and knows how to
 * publish it. Subclasses are expected to set `queueName` and override
 * `handle`, and may override the other static hooks.
 */
export class Event<T extends Omit<BasePayload, '$version'>> {
  /** Payload schema version stamped on every job as `$version`. */
  public static readonly version: string = 'v1'
  /** Queue the event is published to; the empty default means "not set". */
  protected static queueName = ''

  constructor(public readonly payload: T & BasePayload) {}

  /**
   * Name used to identify this event type (the class name).
   */
  static eventName() {
    return this.name
  }

  /**
   * Returns the queue name configured on the class.
   *
   * @throws Error when the class did not set `queueName`.
   */
  static getQueueName() {
    if (!this.queueName) {
      // In a static method `this` is the class itself, so `this.name` is the
      // class name (`this.constructor.name` would always be "Function").
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName
  }

  /**
   * Per-payload send options for the queue. The base implementation has none.
   */
  static getQueueOptions<T extends Event<any>>(payload: T['payload']): SendOptions | undefined {
    return undefined
  }

  /**
   * Worker options for this event type. The base implementation returns an
   * empty options object.
   */
  static getWorkerOptions(): WorkOptions | BatchWorkOptions {
    return {}
  }

  /**
   * Slow-retry settings for this event type, or `undefined` (the default) when
   * the event does not use a slow retry queue.
   */
  static withSlowRetryQueue(): undefined | SlowRetryQueueOptions {
    return undefined
  }

  /**
   * Returns the name of the slow retry queue: the queue name suffixed with `-slow`.
   *
   * @throws Error when the class did not set `queueName`.
   */
  static getSlowRetryQueueName() {
    if (!this.queueName) {
      // See getQueueName(): `this.name` is the class name in a static method.
      throw new Error(`Queue name not set on ${this.name}`)
    }

    return this.queueName + '-slow'
  }

  /**
   * Lifecycle hook; does nothing by default.
   * NOTE(review): the caller is outside this file — presumably invoked when the queue shuts down.
   */
  static onClose() {
    // no-op
  }

  /**
   * Lifecycle hook; does nothing by default.
   * NOTE(review): the caller is outside this file — presumably invoked when the queue starts.
   */
  static onStart() {
    // no-op
  }

  /**
   * Sends several events at once.
   *
   * When the queue is disabled, every message goes through its own `send()`.
   * Otherwise all messages are inserted with a single `Queue.insert` call,
   * using the queue name and queue options of the class this method is called
   * on. Unlike `send()`, the insert path does not consult `shouldSend()` and
   * does not add `region` to the job data.
   *
   * Side effect: a missing `$version` is written onto each message's payload.
   *
   * @param messages event instances to send
   */
  static batchSend<T extends Event<any>[]>(messages: T) {
    if (!pgQueueEnable) {
      return Promise.all(messages.map((message) => message.send()))
    }

    return Queue.getInstance().insert(
      messages.map((message) => {
        const sendOptions = (this.getQueueOptions(message.payload) as PgBoss.JobInsert) || {}
        if (!message.payload.$version) {
          ;(message.payload as (typeof message)['payload']).$version = this.version
        }
        return {
          ...sendOptions,
          name: this.getQueueName(),
          data: message.payload,
        }
      })
    )
  }

  /**
   * Builds an event from `payload` and sends it (see the instance `send()`).
   *
   * Side effect: a missing `$version` is written onto the given payload object.
   *
   * @param payload event payload without `$version`
   * @returns whatever the instance `send()` resolves to
   */
  static send<T extends Event<any>>(this: StaticThis<T>, payload: Omit<T['payload'], '$version'>) {
    if (!payload.$version) {
      ;(payload as T['payload']).$version = this.version
    }
    const that = new this(payload)
    return that.send()
  }

  /**
   * Builds an event from `payload` and publishes it to the slow retry queue
   * (see the instance `sendSlowRetryQueue()`).
   *
   * Side effect: a missing `$version` is written onto the given payload object.
   *
   * @param payload event payload without `$version`
   */
  static sendSlowRetryQueue<T extends Event<any>>(
    this: StaticThis<T>,
    payload: Omit<T['payload'], '$version'>
  ) {
    if (!payload.$version) {
      ;(payload as T['payload']).$version = this.version
    }
    const that = new this(payload)
    return that.sendSlowRetryQueue()
  }

  /**
   * Processes one job or a batch of jobs. Subclasses must override this.
   *
   * @throws Error always, in the base implementation.
   */
  static handle(job: Job<Event<any>['payload']> | Job<Event<any>['payload']>[]) {
    throw new Error('not implemented')
  }

  /**
   * Decides whether an event with this payload should be sent.
   *
   * In multitenant mode the tenant configuration is loaded and the event is
   * suppressed when its `eventName()` is listed in the tenant's
   * `disableEvents`. Otherwise the answer is always `true`.
   *
   * @param payload event payload; `payload.tenant.ref` identifies the tenant
   */
  static async shouldSend(payload: any) {
    if (isMultitenant) {
      // Do not send an event if disabled for this specific tenant
      const tenant = await getTenantConfig(payload.tenant.ref)
      const disabledEvents = tenant.disableEvents || []
      if (disabledEvents.includes(this.eventName())) {
        return false
      }
    }
    return true
  }

  /**
   * Sends this event.
   *
   * - Returns without doing anything when `shouldSend()` says no.
   * - When the queue is disabled, runs the class handler in-process.
   * - Otherwise publishes a job to the queue; if publishing fails, a warning
   *   is logged and the handler is run in-process instead.
   *
   * @returns the queue's result for a published job, or the handler's result
   *   when the event was handled in-process
   */
  async send(): Promise<string | void | null> {
    const constructor = this.constructor as typeof Event

    const shouldSend = await constructor.shouldSend(this.payload)

    if (!shouldSend) {
      return
    }

    // Runs the handler in-process with a synthetic job whose id is '__sync'.
    // The job object is rebuilt on every call, exactly like a queued job's data.
    const handleSync = () =>
      constructor.handle({
        id: '__sync',
        name: constructor.getQueueName(),
        data: {
          region,
          ...this.payload,
          $version: constructor.version,
        },
      })

    if (!pgQueueEnable) {
      return handleSync()
    }

    const timer = QueueJobSchedulingTime.startTimer()
    const sendOptions = constructor.getQueueOptions(this.payload)

    try {
      const res = await Queue.getInstance().send({
        name: constructor.getQueueName(),
        data: {
          region,
          ...this.payload,
          $version: constructor.version,
        },
        options: sendOptions,
      })

      QueueJobScheduled.inc({
        name: constructor.getQueueName(),
      })

      return res
    } catch (e) {
      // If we can't queue the message for some reason,
      // we run its handler right away.
      // This might create some latency with the benefit of being more fault-tolerant
      logSchema.warning(
        logger,
        `[Queue Sender] Error while sending job to queue, sending synchronously`,
        {
          type: 'queue',
          error: e,
          metadata: JSON.stringify(this.payload),
        }
      )
      return handleSync()
    } finally {
      // Record the scheduling time whether publishing succeeded or fell back.
      timer({
        name: constructor.getQueueName(),
      })
    }
  }

  /**
   * Publishes this event to the slow retry queue.
   *
   * Does nothing when the queue is disabled or the class does not opt in via
   * `withSlowRetryQueue()`. The job defaults to `retryBackoff: true` and a
   * 30-minute start delay; the class's queue options and then its slow-retry
   * options are spread on top and may override those defaults.
   *
   * @returns the queue's result for the published job, or `undefined` when skipped
   */
  async sendSlowRetryQueue() {
    const constructor = this.constructor as typeof Event
    const slowRetryQueue = constructor.withSlowRetryQueue()

    if (!pgQueueEnable || !slowRetryQueue) {
      return
    }

    const timer = QueueJobSchedulingTime.startTimer()
    const sendOptions = constructor.getQueueOptions(this.payload) || {}
    const slowQueueName = constructor.getSlowRetryQueueName()

    try {
      const res = await Queue.getInstance().send({
        name: slowQueueName,
        data: {
          region,
          ...this.payload,
          $version: constructor.version,
        },
        options: {
          retryBackoff: true,
          // pg-boss reads a numeric startAfter as seconds: 30 * 60 s = 30 minutes.
          // (The previous value, 60 * 60 * 30, was 30 hours.)
          startAfter: 30 * 60,
          ...sendOptions,
          ...slowRetryQueue,
        },
      })

      QueueJobScheduled.inc({
        name: slowQueueName,
      })

      return res
    } finally {
      // Observe the scheduling time even when publishing throws, as send() does.
      timer({
        name: slowQueueName,
      })
    }
  }
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc