• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 24849782094

23 Apr 2026 05:40PM UTC coverage: 32.29% (-39.0%) from 71.255%
24849782094

Pull #1046

github

web-flow
Merge bbd764db2 into 82d3f6383
Pull Request #1046: refactor: drop axios from tests and webhooks

1530 of 5420 branches covered (28.23%)

Branch coverage included in aggregate %.

4 of 20 new or added lines in 1 file covered. (20.0%)

3875 existing lines in 165 files now uncovered.

3317 of 9591 relevant lines covered (34.58%)

10.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.36
/src/internal/database/pool.ts
1
import { type CacheLookupOutcome, createTtlCache, TENANT_POOL_CACHE_NAME } from '@internal/cache'
2
import { wait } from '@internal/concurrency'
3
import { getSslSettings } from '@internal/database/ssl'
4
import { logger, logSchema } from '@internal/monitoring'
5
import {
6
  cacheEvictionsTotal,
7
  cacheRequestsTotal,
8
  dbActiveConnection,
9
  dbActivePool,
10
  dbInUseConnection,
11
  isMetricEnabled,
12
  meter,
13
} from '@internal/monitoring/metrics'
14
import { JWTPayload } from 'jose'
15
import { Knex, knex } from 'knex'
16
import { getConfig } from '../../config'
17

18
// Configuration values used by this module, resolved once at module load.
const {
  isMultitenant,
  dbSearchPath,
  dbPostgresVersion,
  databaseApplicationName,
  databaseSSLRootCert,
  databaseMaxConnections,
  databaseConnectionTimeout,
  databaseFreePoolAfterInactivity,
  tenantPoolCacheHitLogSampleRate,
  tenantPoolCacheMissLogSampleRate,
} = getConfig()
30

31
/** Value of the `type` field on every tenant pool cache lookup log entry. */
export const TENANT_POOL_CACHE_LOOKUP_LOG_TYPE = 'cache'

/** Message used for every tenant pool cache lookup log entry. */
export const TENANT_POOL_CACHE_LOOKUP_LOG_MESSAGE = '[Cache] Tenant pool lookup'
29✔
33

34
/**
 * Settings describing the database connection pool for one tenant.
 */
export interface TenantConnectionOptions {
  // Credentials carried with the connection request (not read in this file).
  user: User
  superUser: User

  /** Key under which the tenant's pool is cached by `PoolManager`. */
  tenantId: string
  /** Connection string handed to knex and to the SSL settings lookup. */
  dbUrl: string
  /** When truthy, the pool is created without the storage search path. */
  isExternalPool?: boolean
  /**
   * Together with `isExternalPool`, marks a pool that is not cached and is
   * limited to a single connection.
   */
  isSingleUse?: boolean
  // NOTE(review): both timing fields are overwritten by TenantPool.getSettings(),
  // so values supplied here do not reach knex in this file.
  idleTimeoutMillis?: number
  reapIntervalMillis?: number
  /** Connection budget; a falsy value falls back to `databaseMaxConnections`. */
  maxConnections: number
  /** The connection budget is divided by `clusterSize * numWorkers`. */
  clusterSize?: number
  numWorkers?: number
  // Request metadata; not read anywhere in this file — presumably consumed by
  // callers, TODO confirm.
  headers?: Record<string, string | undefined | string[]>
  method?: string
  path?: string
  operation?: () => string | undefined
}
52

53
/**
 * A JWT together with its payload.
 */
export interface User {
  /** The token string. */
  jwt: string
  /** JWT payload, optionally carrying a `role` claim. */
  payload: { role?: string } & JWTPayload
}
57

58
/**
 * Connection counts reported by a single pool.
 */
export interface PoolStats {
  /** Connections currently in use. */
  used: number
  /** Connections currently held by the pool (in use plus free). */
  total: number
}
62

63
/**
 * Contract implemented by every pool stored in the tenant pool cache.
 */
export interface PoolStrategy {
  /** Returns the Knex instance to run queries with. */
  acquire(): Knex
  /** Adjusts the pool to a new cluster size. */
  rebalance(options: { clusterSize: number }): void
  /** Tears the pool down. */
  destroy(): Promise<void>
  /** Returns current connection counts, or `null` when none are available. */
  getPoolStats(): PoolStats | null
}
69

70
/**
 * Schema search path applied to non-external tenant pools: the built-in
 * schemas followed by the comma-separated entries of `dbSearchPath`.
 *
 * Each entry is trimmed before empty ones are dropped, so a value such as
 * `"a, b"` yields `a` and `b` instead of a schema named `" b"`.
 */
export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.split(',')]
  .map((schema) => schema.trim())
  .filter(Boolean)
73

74
// Cache options used in multitenant mode: entries live for 10 seconds and the
// age is both checked and refreshed on every read.
const multiTenantTtlConfig = {
  ttl: 10_000,
  updateAgeOnGet: true,
  checkAgeOnGet: true,
}
79

80
// Pools currently being torn down explicitly through PoolManager; the cache's
// dispose hook skips any pool found in this set.
const manuallyDestroyedPools: WeakSet<PoolStrategy> = new WeakSet()
29✔
81

82
/**
 * Logs a failed pool teardown as a `db` error.
 *
 * @param error - whatever the teardown rejected with
 */
function logPoolDestroyError(error: unknown): void {
  const details = { type: 'db', error }
  logSchema.error(logger, 'pool was not able to be destroyed', details)
}
88

89
/**
 * Destroys the given pool; a failure rejects the returned promise.
 */
async function destroyPool(pool: PoolStrategy): Promise<void> {
  const teardown = pool.destroy()
  await teardown
}
92

93
/**
 * Destroys the given pool, logging a failure instead of propagating it.
 */
async function destroyPoolSafely(pool: PoolStrategy): Promise<void> {
  await destroyPool(pool).catch((cause: unknown) => {
    logPoolDestroyError(cause)
  })
}
100

101
/**
 * Counts one tenant pool cache eviction for the dispose reasons `stale`,
 * `evict` and `delete`; any other reason is ignored.
 *
 * Explicit destroy paths are filtered out before this helper is called.
 */
function recordTenantPoolCacheEviction(reason: string): void {
  switch (reason) {
    case 'stale':
    case 'evict':
    case 'delete':
      cacheEvictionsTotal.add(1, { cache: TENANT_POOL_CACHE_NAME })
      break
    default:
      break
  }
}
109

110
/**
 * Counts one tenant pool cache request, labelled with its lookup outcome.
 */
function recordTenantPoolCacheRequest(outcome: string): void {
  const attributes = { cache: TENANT_POOL_CACHE_NAME, outcome }
  cacheRequestsTotal.add(1, attributes)
}
116

117
/**
 * Decides whether a cache lookup should be logged.
 *
 * @param sampleRate - rates of 1 or more always log; zero, negative and NaN
 *   rates never log; anything in between logs with that probability
 */
function shouldLogTenantPoolCacheLookup(sampleRate: number): boolean {
  if (sampleRate >= 1) {
    return true
  }

  // Written as a negated comparison so NaN is rejected as well.
  if (!(sampleRate > 0)) {
    return false
  }

  return Math.random() < sampleRate
}
120

121
/**
 * Emits a sampled log entry describing a tenant pool cache lookup.
 *
 * Hits and misses use separate sample rates. When the lookup is not selected
 * by `shouldLogTenantPoolCacheLookup`, nothing is logged.
 *
 * @param settings - connection options of the tenant being looked up
 * @param isCacheable - whether a pool for these settings may be cached
 * @param outcome - result of the cache lookup
 */
function logTenantPoolCacheLookup(
  settings: TenantConnectionOptions,
  isCacheable: boolean,
  outcome: CacheLookupOutcome
): void {
  const sampleRate =
    outcome === 'hit' ? tenantPoolCacheHitLogSampleRate : tenantPoolCacheMissLogSampleRate

  if (!shouldLogTenantPoolCacheLookup(sampleRate)) {
    return
  }

  // A rate of 1 or more logs every lookup, so one entry stands for exactly
  // one lookup; without the clamp a rate above 1 would report a weight < 1.
  const sampleWeight = 1 / Math.min(sampleRate, 1)

  logSchema.info(logger, TENANT_POOL_CACHE_LOOKUP_LOG_MESSAGE, {
    type: TENANT_POOL_CACHE_LOOKUP_LOG_TYPE,
    cache: TENANT_POOL_CACHE_NAME,
    tenantId: settings.tenantId,
    project: settings.tenantId,
    outcome,
    sampleRate,
    sampleWeight,
    isCacheable,
    isExternalPool: Boolean(settings.isExternalPool),
    isSingleUse: Boolean(settings.isSingleUse),
  })
}
148

149
// Cache of pools keyed by tenant id. Multitenant mode uses the short TTL
// config; otherwise a single entry is kept with no expiry.
const tenantPools = createTtlCache<string, PoolStrategy>({
  ...(isMultitenant ? multiTenantTtlConfig : { max: 1, ttl: Infinity }),
  dispose: async (pool, _tenantId, reason) => {
    // Pools being destroyed explicitly are handled by PoolManager itself.
    if (pool && !manuallyDestroyedPools.has(pool)) {
      recordTenantPoolCacheEviction(reason)

      await destroyPoolSafely(pool)
    }
  },
})
161

162
// ============================================================================
163
// Pool stats collection — chunked to avoid blocking the event loop
164
// ============================================================================
165
/**
 * Aggregate over all cached pools, as produced by one stats collection pass.
 */
interface PoolStatsSnapshot {
  /** Number of pools seen in the cache. */
  poolCount: number
  /** Sum of `total` over the pools that reported stats. */
  totalConnections: number
  /** Sum of `used` over the pools that reported stats. */
  totalInUse: number
}
170

171
// Number of pools inspected before the collector yields to the event loop.
const STATS_CHUNK_SIZE = 100
// Delay between two stats collection passes, in milliseconds.
const STATS_INTERVAL_MS = 5_000

// Latest completed snapshot; this is what the metrics callback reports.
let cachedPoolStats: PoolStatsSnapshot = { poolCount: 0, totalConnections: 0, totalInUse: 0 }
// Set while a collection pass is running so that passes never overlap.
let collectInProgress = false
29✔
180

181
/**
 * Walks every cached tenant pool and stores the aggregated counts in
 * `cachedPoolStats`.
 *
 * Does nothing when a previous pass is still running. The walk yields to the
 * event loop after every `STATS_CHUNK_SIZE` pools. A failure is logged and the
 * previous snapshot is kept, so the returned promise never rejects — the
 * caller fires this without awaiting it, and a rejection would otherwise
 * surface as an unhandled rejection.
 */
async function collectPoolStats(): Promise<void> {
  if (collectInProgress) return
  collectInProgress = true

  try {
    let poolCount = 0
    let totalConnections = 0
    let totalInUse = 0

    for (const [, pool] of tenantPools.entries()) {
      poolCount++
      const stats = pool.getPoolStats()
      if (stats) {
        totalConnections += stats.total
        totalInUse += stats.used
      }
      // Yield to the event loop between chunks
      if (poolCount % STATS_CHUNK_SIZE === 0) {
        await new Promise<void>((resolve) => setImmediate(resolve))
      }
    }

    cachedPoolStats = {
      poolCount,
      totalConnections,
      totalInUse,
    }
  } catch (error) {
    logSchema.error(logger, 'pool stats collection failed', {
      type: 'db',
      error,
    })
  } finally {
    collectInProgress = false
  }
}
213

214
/**
 * PoolManager is a class that manages a pool of Knex connections.
 * It creates a new pool for each tenant and reuses existing pools, which are
 * kept in the module-level tenant pool cache.
 */
export class PoolManager {
  protected numWorkers: number = 1

  /**
   * Sets the worker count passed to pools created afterwards.
   * Values below 1 (or nullish) are raised to 1.
   */
  setNumWorkers(numWorkers: number) {
    this.numWorkers = Math.max(numWorkers ?? 1, 1)
  }

  /**
   * Starts periodic pool stats collection and registers the metrics callback
   * that reports the latest snapshot.
   */
  monitor() {
    // Periodically collect stats in a non-blocking way
    const interval = setInterval(() => {
      // Fire-and-forget: a rejection is logged here so it cannot surface as
      // an unhandled rejection.
      collectPoolStats().catch((error: unknown) => {
        logSchema.error(logger, 'pool stats collection failed', {
          type: 'db',
          error,
        })
      })
    }, STATS_INTERVAL_MS)
    // Do not keep the process alive just for stats collection.
    interval.unref()

    // Observable callback reads the cached snapshot — O(1)
    meter.addBatchObservableCallback(
      (observer) => {
        if (isMetricEnabled('db_active_local_pools')) {
          observer.observe(dbActivePool, cachedPoolStats.poolCount)
        }
        if (isMetricEnabled('db_connections')) {
          observer.observe(dbActiveConnection, cachedPoolStats.totalConnections)
        }
        if (isMetricEnabled('db_connections_in_use')) {
          observer.observe(dbInUseConnection, cachedPoolStats.totalInUse)
        }
      },
      [dbActivePool, dbActiveConnection, dbInUseConnection]
    )
  }

  /** Rebalances every cached pool to the given cluster size. */
  rebalanceAll(data: { clusterSize: number }) {
    for (const pool of tenantPools.values()) {
      pool.rebalance({
        clusterSize: data.clusterSize,
      })
    }
  }

  /** Rebalances the cached pool of one tenant, if there is one. */
  rebalance(tenantId: string, data: { clusterSize: number }) {
    const pool = tenantPools.get(tenantId)
    if (pool) {
      pool.rebalance({
        clusterSize: data.clusterSize,
      })
    }
  }

  /**
   * Returns the pool for the given tenant settings.
   *
   * A pool already cached under `settings.tenantId` is always returned.
   * Otherwise a new pool is created; it is stored in the cache unless the
   * settings describe a single-use external pool.
   */
  getPool(settings: TenantConnectionOptions) {
    // Everything except a single-use external pool may be cached.
    const isCacheable = !(settings.isSingleUse && settings.isExternalPool)
    const { value: existingPool, outcome } = tenantPools.getWithOutcome(settings.tenantId)

    if (existingPool) {
      recordTenantPoolCacheRequest(outcome)
      logTenantPoolCacheLookup(settings, isCacheable, outcome)

      return existingPool
    }

    if (!isCacheable) {
      return this.newPool({ ...settings, numWorkers: this.numWorkers })
    }

    recordTenantPoolCacheRequest(outcome)
    logTenantPoolCacheLookup(settings, isCacheable, outcome)

    const newPool = this.newPool({ ...settings, numWorkers: this.numWorkers })

    tenantPools.set(settings.tenantId, newPool)
    return newPool
  }

  /**
   * Removes the tenant's pool from the cache and destroys it.
   * Resolves immediately when no pool is cached for the tenant.
   */
  destroy(tenantId: string) {
    const pool = tenantPools.get(tenantId)
    if (pool) {
      // Marked first so the cache's dispose hook leaves the teardown to us.
      manuallyDestroyedPools.add(pool)
      tenantPools.delete(tenantId)
      return destroyPool(pool).finally(() => {
        manuallyDestroyedPools.delete(pool)
      })
    }
    return Promise.resolve()
  }

  /**
   * Removes every pool from the cache and destroys them all.
   * The returned promise settles once every teardown has settled.
   */
  destroyAll() {
    const promises: Promise<void>[] = []

    // Snapshot the entries first so deleting does not mutate the cache while
    // it is still being iterated.
    for (const [tenantId, pool] of Array.from(tenantPools)) {
      manuallyDestroyedPools.add(pool)
      tenantPools.delete(tenantId)
      promises.push(
        destroyPool(pool).finally(() => {
          manuallyDestroyedPools.delete(pool)
        })
      )
    }
    return Promise.allSettled(promises)
  }

  /** Creates the pool implementation; subclasses may override this. */
  protected newPool(settings: TenantConnectionOptions): PoolStrategy {
    return new TenantPool(settings)
  }
}
321

322
/**
 * TenantPool lazily creates one Knex pool per tenant and can be rebalanced so
 * that its connection limit follows the cluster size.
 */
class TenantPool implements PoolStrategy {
  protected pool?: Knex

  constructor(protected readonly options: TenantConnectionOptions) {}

  /** Returns the Knex pool, creating it on first use. */
  acquire() {
    if (!this.pool) {
      this.pool = this.createKnexPool()
    }

    return this.pool
  }

  /** Detaches the current pool, if any, and drains it. */
  destroy(): Promise<void> {
    const detached = this.pool

    if (detached) {
      this.pool = undefined
      return this.drainPool(detached)
    }

    return Promise.resolve()
  }

  /** Reports the used and total connection counts, or `null` without a pool. */
  getPoolStats(): PoolStats | null {
    const connections = this.pool?.client?.pool

    if (connections) {
      return {
        used: connections.numUsed(),
        total: connections.numUsed() + connections.numFree(),
      }
    }

    return null
  }

  /**
   * Derives the effective pool settings from the options.
   *
   * The connection limit is divided by `clusterSize * numWorkers` (each at
   * least 1), rounded up, and forced to 1 for single-use external pools.
   */
  getSettings() {
    const { isSingleUse, isExternalPool } = this.options
    const singleUseExternal = isSingleUse && isExternalPool

    const workers = Math.max(this.options.numWorkers ?? 1, 1)
    const nodes = this.options.clusterSize || 0
    const share = Math.max(nodes, 1) * workers

    let limit = this.options.maxConnections || databaseMaxConnections
    if (share > 1) {
      limit = Math.ceil(limit / share) || 1
    }
    if (singleUseExternal) {
      limit = 1
    }

    return {
      ...this.options,
      searchPath: isExternalPool ? undefined : searchPath,
      idleTimeoutMillis: singleUseExternal ? 100 : databaseFreePoolAfterInactivity,
      reapIntervalMillis: singleUseExternal ? 50 : undefined,
      maxConnections: limit,
    }
  }

  /**
   * Stores the new cluster size and retires the current pool; the next
   * `acquire()` builds a fresh one. A cluster size of 0 is ignored.
   */
  rebalance(options: { clusterSize: number }) {
    if (options.clusterSize === 0) {
      return
    }

    const retired = this.pool

    this.options.clusterSize = options.clusterSize
    this.pool = undefined

    if (!retired) {
      return
    }

    // Draining runs in the background; a failure is only logged.
    this.drainPool(retired).catch((e) => {
      logSchema.error(logger, 'Error draining tenant pool', {
        type: 'pool',
        error: e,
      })
    })
  }

  /**
   * Waits, polling every 200 ms, until the pool has no pending acquires,
   * validations or creates, then destroys it.
   */
  protected async drainPool(pool: Knex) {
    while (pool?.client?.pool) {
      const connections = pool.client.pool
      let pending = connections.numPendingAcquires()
      pending += connections.numPendingValidations()
      pending += connections.numPendingCreates()

      if (pending === 0) {
        break
      }

      await wait(200)
    }

    return pool.destroy()
  }

  /** Builds the Knex instance from the derived settings. */
  protected createKnexPool() {
    const settings = this.getSettings()
    const { dbUrl, maxConnections, idleTimeoutMillis } = settings
    const sslSettings = getSslSettings({
      connectionString: dbUrl,
      databaseSSLRootCert,
    })

    return knex({
      client: 'pg',
      version: dbPostgresVersion,
      searchPath: settings.searchPath,
      pool: {
        min: 0,
        max: maxConnections,
        acquireTimeoutMillis: databaseConnectionTimeout,
        idleTimeoutMillis,
        // NOTE(review): getSettings() also computes reapIntervalMillis, but
        // that value is not used here — confirm the fixed 1000 is intended.
        reapIntervalMillis: 1000,
      },
      connection: {
        connectionString: dbUrl,
        connectionTimeoutMillis: databaseConnectionTimeout,
        ssl: sslSettings ? { ...sslSettings } : undefined,
        application_name: databaseApplicationName,
      },
      acquireConnectionTimeout: databaseConnectionTimeout,
    })
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc