• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 25721665641

12 May 2026 08:03AM UTC coverage: 74.298% (+0.008%) from 74.29%
25721665641

push

github

web-flow
fix: make progressive batch send cleanup unconditional (#1099)

Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>

4009 of 5943 branches covered (67.46%)

Branch coverage included in aggregate %.

6 of 6 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

8051 of 10289 relevant lines covered (78.25%)

410.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.55
/src/internal/database/migrations/progressive.ts
1
import { areMigrationsUpToDate } from '@internal/database/migrations/migrate'
2
import { ErrorCode, isStorageError } from '@internal/errors'
3
import { RunMigrationsOnTenants } from '@storage/events'
4
import { getConfig } from '../../../config'
5
import { logger, logSchema } from '../../monitoring'
6
import { getTenantConfig, TenantMigrationStatus } from '../tenant'
7

8
const { dbMigrationFreezeAt } = getConfig()
43✔
9

10
export class ProgressiveMigrations {
11
  protected tenants: string[] = []
51✔
12
  protected emittingJobs = false
51✔
13
  protected inFlightCreateJobs?: Promise<void>
14
  protected pendingCreateJobsMax = 0
51✔
15
  protected watchInterval: NodeJS.Timeout | undefined
16

17
  constructor(protected readonly options: { maxSize: number; interval: number; watch?: boolean }) {
51✔
18
    if (typeof options.watch === 'undefined') {
51!
19
      this.options.watch = true
×
20
    }
21
  }
22

23
  start(signal: AbortSignal) {
24
    this.watchTenants(signal)
×
25

26
    signal.addEventListener('abort', () => {
×
27
      if (this.watchInterval) {
×
28
        clearInterval(this.watchInterval)
×
29
        logSchema.info(logger, '[Migrations] Stopping', {
×
30
          type: 'migrations',
31
        })
32
        this.drain().catch((e) => {
×
33
          logSchema.error(logger, '[Migrations] Error creating migration jobs', {
×
34
            type: 'migrations',
35
            error: e,
36
            metadata: JSON.stringify({
37
              strategy: 'progressive',
38
            }),
39
          })
40
        })
41
      }
42
    })
43
  }
44

45
  async drain() {
46
    return this.createJobs(this.tenants.length).catch((e) => {
3✔
UNCOV
47
      logSchema.error(logger, '[Migrations] Error creating migration jobs', {
×
48
        type: 'migrations',
49
        error: e,
50
        metadata: JSON.stringify({
51
          strategy: 'progressive',
52
        }),
53
      })
54
    })
55
  }
56

57
  addTenant(tenant: string) {
58
    const tenantIndex = this.tenants.indexOf(tenant)
3✔
59

60
    if (tenantIndex !== -1) {
3✔
61
      return
1✔
62
    }
63

64
    this.tenants.push(tenant)
2✔
65

66
    if (this.tenants.length < this.options.maxSize || this.emittingJobs) {
2!
67
      return
2✔
68
    }
69

70
    this.createJobs(this.options.maxSize).catch((e) => {
×
71
      logSchema.error(logger, '[Migrations] Error creating migration jobs', {
×
72
        type: 'migrations',
73
        error: e,
74
        metadata: JSON.stringify({
75
          strategy: 'progressive',
76
        }),
77
      })
78
    })
79
  }
80

81
  protected watchTenants(signal: AbortSignal) {
82
    if (signal.aborted || !this.options.watch) {
×
83
      return
×
84
    }
85
    this.watchInterval = setInterval(() => {
×
86
      if (this.emittingJobs) {
×
87
        return
×
88
      }
89

90
      this.createJobs(this.options.maxSize).catch((e) => {
×
91
        logSchema.error(logger, '[Migrations] Error creating migration jobs', {
×
92
          type: 'migrations',
93
          error: e,
94
          metadata: JSON.stringify({
95
            strategy: 'progressive',
96
          }),
97
        })
98
      })
99
    }, this.options.interval)
100
  }
101

102
  protected createJobs(maxJobs: number) {
103
    this.pendingCreateJobsMax = Math.max(this.pendingCreateJobsMax, maxJobs)
13✔
104

105
    if (this.inFlightCreateJobs) {
13✔
106
      return this.inFlightCreateJobs
2✔
107
    }
108

109
    this.emittingJobs = true
11✔
110
    this.inFlightCreateJobs = this.runCreateJobs().finally(() => {
11✔
111
      this.emittingJobs = false
11✔
112
      this.inFlightCreateJobs = undefined
11✔
113
      this.pendingCreateJobsMax = 0
11✔
114
    })
115

116
    return this.inFlightCreateJobs
11✔
117
  }
118

119
  protected async runCreateJobs() {
120
    while (this.pendingCreateJobsMax > 0) {
11✔
121
      const maxJobs = this.pendingCreateJobsMax
13✔
122
      this.pendingCreateJobsMax = 0
13✔
123
      await this.createJobsBatch(maxJobs)
13✔
124
    }
125
  }
126

127
  protected async createJobsBatch(maxJobs: number) {
128
    const tenantsBatch = this.tenants.slice(0, maxJobs)
13✔
129
    const jobs = await Promise.allSettled(
13✔
130
      tenantsBatch.map(async (tenant) => {
131
        const tenantConfig = await getTenantConfig(tenant)
17✔
132
        const migrationsUpToDate = await areMigrationsUpToDate(tenant)
12✔
133

134
        if (migrationsUpToDate || tenantConfig.syncMigrationsDone) {
12!
135
          return
×
136
        }
137

138
        const scheduleAt = new Date()
12✔
139
        scheduleAt.setMinutes(scheduleAt.getMinutes() + 5)
12✔
140
        const scheduleForLater =
141
          tenantConfig.migrationStatus === TenantMigrationStatus.FAILED_STALE
12!
142
            ? scheduleAt
143
            : undefined
144

145
        return new RunMigrationsOnTenants({
17✔
146
          tenantId: tenant,
147
          scheduleAt: scheduleForLater,
148
          upToMigration: dbMigrationFreezeAt,
149
          tenant: {
150
            host: '',
151
            ref: tenant,
152
          },
153
        })
154
      })
155
    )
156

157
    const completedTenants = new Set<string>()
13✔
158
    const droppedTenants = new Set<string>()
13✔
159
    const retryableFailedTenants = new Set<string>()
13✔
160
    const validJobs = jobs
13✔
161
      .map((job, index) => {
162
        const tenant = tenantsBatch[index]
17✔
163

164
        if (job.status === 'rejected') {
17✔
165
          // If there are more terminal errors later, we need to extend this check.
166
          if (isStorageError(ErrorCode.TenantNotFound, job.reason)) {
5✔
167
            droppedTenants.add(tenant)
2✔
168
            logSchema.warning(
2✔
169
              logger,
170
              `[Migrations] Failed to prepare migration job for tenant ${tenant}; dropping tenant from queue because it no longer exists`,
171
              {
172
                type: 'migrations',
173
                error: job.reason,
174
                project: tenant,
175
                metadata: JSON.stringify({
176
                  strategy: 'progressive',
177
                }),
178
              }
179
            )
180
            return
2✔
181
          }
182

183
          retryableFailedTenants.add(tenant)
3✔
184
          logSchema.warning(
3✔
185
            logger,
186
            `[Migrations] Failed to prepare migration job for tenant ${tenant}; keeping tenant queued for retry`,
187
            {
188
              type: 'migrations',
189
              error: job.reason,
190
              project: tenant,
191
              metadata: JSON.stringify({
192
                strategy: 'progressive',
193
              }),
194
            }
195
          )
196
          return
3✔
197
        }
198

199
        completedTenants.add(tenant)
12✔
200
        return job.value
12✔
201
      })
202
      .filter((job) => job)
17✔
203

204
    if (validJobs.length > 0) {
13✔
205
      try {
12✔
206
        await RunMigrationsOnTenants.batchSend(validJobs as RunMigrationsOnTenants[])
12✔
207
      } catch (e) {
208
        // batchSend failure: treat the would-be-completed tenants as retryable,
209
        // but still honor terminal drops (TenantNotFound).
210
        for (const tenant of completedTenants) {
3✔
211
          retryableFailedTenants.add(tenant)
3✔
212
        }
213
        completedTenants.clear()
3✔
214
        logSchema.error(logger, '[Migrations] Error sending migration jobs batch', {
3✔
215
          type: 'migrations',
216
          error: e,
217
          metadata: JSON.stringify({
218
            strategy: 'progressive',
219
          }),
220
        })
221
      }
222
    }
223

224
    // Cleanup runs whether batchSend succeeded or not.
225
    if (completedTenants.size > 0 || droppedTenants.size > 0 || retryableFailedTenants.size > 0) {
13!
226
      const remainingTenants = this.tenants.filter(
13✔
227
        (tenant) =>
228
          !completedTenants.has(tenant) &&
23✔
229
          !droppedTenants.has(tenant) &&
230
          !retryableFailedTenants.has(tenant)
231
      )
232
      const failedTenantsInQueue = this.tenants.filter((tenant) =>
13✔
233
        retryableFailedTenants.has(tenant)
23✔
234
      )
235
      this.tenants = remainingTenants.concat(failedTenantsInQueue)
13✔
236
    }
237
  }
238
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc