• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 25748746478

12 May 2026 04:45PM UTC coverage: 74.406% (+0.1%) from 74.259%
25748746478

Pull #1097

github

web-flow
Merge 222e0733e into defbbb616
Pull Request #1097: fix(auth): pass alg to importJWK for jose v6 compatibility with Auth0…

4027 of 5957 branches covered (67.6%)

Branch coverage included in aggregate %.

3 of 3 new or added lines in 1 file covered. (100.0%)

40 existing lines in 4 files now uncovered.

8067 of 10297 relevant lines covered (78.34%)

410.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.59
/src/internal/database/pool.ts
1
import { type CacheLookupOutcome, createTtlCache, TENANT_POOL_CACHE_NAME } from '@internal/cache'
2
import { wait } from '@internal/concurrency'
3
import { getSslSettings } from '@internal/database/ssl'
4
import { logger, logSchema } from '@internal/monitoring'
5
import {
6
  cacheEvictionsTotal,
7
  cacheRequestsTotal,
8
  dbActiveConnection,
9
  dbActivePool,
10
  dbInUseConnection,
11
  isMetricEnabled,
12
  meter,
13
} from '@internal/monitoring/metrics'
14
import { JWTPayload } from 'jose'
15
import { Knex, knex } from 'knex'
16
import { getConfig } from '../../config'
17

18
const {
19
  isMultitenant,
20
  databaseSSLRootCert,
21
  databaseMaxConnections,
22
  databaseFreePoolAfterInactivity,
23
  databaseConnectionTimeout,
24
  dbSearchPath,
25
  dbPostgresVersion,
26
  databaseApplicationName,
27
  tenantPoolCacheTtlMs,
28
  tenantPoolCacheHitLogSampleRate,
29
  tenantPoolCacheMissLogSampleRate,
73✔
30
} = getConfig()
31

32
export const TENANT_POOL_CACHE_LOOKUP_LOG_TYPE = 'cache'
73✔
33
export const TENANT_POOL_CACHE_LOOKUP_LOG_MESSAGE = '[Cache] Tenant pool lookup'
73✔
34

35
export interface TenantConnectionOptions {
36
  user: User
37
  superUser: User
38

39
  tenantId: string
40
  dbUrl: string
41
  isExternalPool?: boolean
42
  isSingleUse?: boolean
43
  idleTimeoutMillis?: number
44
  reapIntervalMillis?: number
45
  maxConnections: number
46
  clusterSize?: number
47
  numWorkers?: number
48
  headers?: Record<string, string | undefined | string[]>
49
  method?: string
50
  path?: string
51
  operation?: () => string | undefined
52
}
53

54
export interface User {
55
  jwt: string
56
  payload: { role?: string } & JWTPayload
57
}
58

59
export interface PoolStats {
60
  used: number
61
  total: number
62
}
63

64
export interface PoolRebalanceOptions {
65
  clusterSize?: number
66
  maxConnections?: number
67
}
68

69
export interface PoolStrategy {
70
  acquire(): Knex
71
  rebalance(options: PoolRebalanceOptions): void
72
  destroy(): Promise<void>
73
  getPoolStats(): PoolStats | null
74
}
75

76
export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.split(',')].filter(
73✔
77
  Boolean
78
)
79

80
const multiTenantTtlConfig = {
73✔
81
  ttl: tenantPoolCacheTtlMs,
82
  updateAgeOnGet: true,
83
  checkAgeOnGet: true,
84
}
85

86
const manuallyDestroyedPools = new WeakSet<PoolStrategy>()
73✔
87

88
function logPoolDestroyError(error: unknown): void {
UNCOV
89
  logSchema.error(logger, 'pool was not able to be destroyed', {
×
90
    type: 'db',
91
    error,
92
  })
93
}
94

95
async function destroyPool(pool: PoolStrategy): Promise<void> {
96
  await pool.destroy()
24✔
97
}
98

99
async function destroyPoolSafely(pool: PoolStrategy): Promise<void> {
100
  try {
7✔
101
    await destroyPool(pool)
7✔
102
  } catch (e) {
UNCOV
103
    logPoolDestroyError(e)
×
104
  }
105
}
106

107
function recordTenantPoolCacheEviction(reason: string): void {
108
  // Explicit destroy paths are filtered before this helper is called.
109
  if (reason === 'stale' || reason === 'evict' || reason === 'delete') {
7!
110
    cacheEvictionsTotal.add(1, {
7✔
111
      cache: TENANT_POOL_CACHE_NAME,
112
    })
113
  }
114
}
115

116
function recordTenantPoolCacheRequest(outcome: string): void {
117
  cacheRequestsTotal.add(1, {
35✔
118
    cache: TENANT_POOL_CACHE_NAME,
119
    outcome,
120
  })
121
}
122

123
function shouldLogTenantPoolCacheLookup(sampleRate: number): boolean {
124
  return sampleRate >= 1 || (sampleRate > 0 && Math.random() < sampleRate)
35!
125
}
126

127
function logTenantPoolCacheLookup(
128
  settings: TenantConnectionOptions,
129
  isCacheable: boolean,
130
  outcome: CacheLookupOutcome
131
): void {
132
  const sampleRate =
133
    outcome === 'hit' ? tenantPoolCacheHitLogSampleRate : tenantPoolCacheMissLogSampleRate
35✔
134

135
  if (!shouldLogTenantPoolCacheLookup(sampleRate)) {
35✔
136
    return
33✔
137
  }
138

139
  const log = {
2✔
140
    type: TENANT_POOL_CACHE_LOOKUP_LOG_TYPE,
141
    cache: TENANT_POOL_CACHE_NAME,
142
    tenantId: settings.tenantId,
143
    project: settings.tenantId,
144
    outcome,
145
    sampleRate,
146
    sampleWeight: 1 / sampleRate,
147
    isCacheable,
148
    isExternalPool: Boolean(settings.isExternalPool),
149
    isSingleUse: Boolean(settings.isSingleUse),
150
  }
151

152
  logSchema.info(logger, TENANT_POOL_CACHE_LOOKUP_LOG_MESSAGE, log)
2✔
153
}
154

155
const tenantPools = createTtlCache<string, PoolStrategy>({
73✔
156
  ...(isMultitenant ? multiTenantTtlConfig : { max: 1, ttl: Infinity }),
73✔
157
  dispose: async (pool, _tenantId, reason) => {
158
    if (!pool || manuallyDestroyedPools.has(pool)) {
24✔
159
      return
17✔
160
    }
161

162
    recordTenantPoolCacheEviction(reason)
7✔
163

164
    await destroyPoolSafely(pool)
7✔
165
  },
166
})
167

168
// ============================================================================
169
// Pool stats collection — chunked to avoid blocking the event loop
170
// ============================================================================
171
interface PoolStatsSnapshot {
172
  poolCount: number
173
  totalConnections: number
174
  totalInUse: number
175
}
176

177
const STATS_CHUNK_SIZE = 100
73✔
178
const STATS_INTERVAL_MS = 5_000
73✔
179

180
let cachedPoolStats: PoolStatsSnapshot = {
73✔
181
  poolCount: 0,
182
  totalConnections: 0,
183
  totalInUse: 0,
184
}
185
let collectInProgress = false
73✔
186

187
async function collectPoolStats() {
188
  if (collectInProgress) return
5!
189
  collectInProgress = true
5✔
190

191
  try {
5✔
192
    let poolCount = 0
5✔
193
    let totalConnections = 0
5✔
194
    let totalInUse = 0
5✔
195
    let chunkCount = 0
5✔
196

197
    for (const [, pool] of tenantPools.entries()) {
5✔
198
      poolCount++
2✔
199
      const stats = pool.getPoolStats()
2✔
200
      if (stats) {
2!
201
        totalConnections += stats.total
2✔
202
        totalInUse += stats.used
2✔
203
      }
204
      // Yield to the event loop between chunks
205
      if (++chunkCount % STATS_CHUNK_SIZE === 0) {
2!
UNCOV
206
        await new Promise<void>((resolve) => setImmediate(resolve))
×
207
      }
208
    }
209

210
    cachedPoolStats = {
5✔
211
      poolCount,
212
      totalConnections,
213
      totalInUse,
214
    }
215
  } finally {
216
    collectInProgress = false
5✔
217
  }
218
}
219

220
/**
221
 * PoolManager is a class that manages a pool of Knex connections.
222
 * It creates a new pool for each tenant and reuses existing pools.
223
 */
224
export class PoolManager {
225
  protected numWorkers: number = 1
77✔
226

227
  setNumWorkers(numWorkers: number) {
UNCOV
228
    this.numWorkers = Math.max(numWorkers ?? 1, 1)
×
229
  }
230

231
  monitor() {
232
    // Periodically collect stats in a non-blocking way
233
    const interval = setInterval(() => {
1✔
234
      void collectPoolStats()
5✔
235
    }, STATS_INTERVAL_MS)
236
    interval.unref()
1✔
237

238
    // Observable callback reads the cached snapshot — O(1)
239
    meter.addBatchObservableCallback(
1✔
240
      (observer) => {
241
        if (isMetricEnabled('db_active_local_pools')) {
1!
242
          observer.observe(dbActivePool, cachedPoolStats.poolCount)
1✔
243
        }
244
        if (isMetricEnabled('db_connections')) {
1!
245
          observer.observe(dbActiveConnection, cachedPoolStats.totalConnections)
1✔
246
        }
247
        if (isMetricEnabled('db_connections_in_use')) {
1!
248
          observer.observe(dbInUseConnection, cachedPoolStats.totalInUse)
1✔
249
        }
250
      },
251
      [dbActivePool, dbActiveConnection, dbInUseConnection]
252
    )
253
  }
254

255
  rebalanceAll(data: { clusterSize: number }) {
256
    for (const pool of tenantPools.values()) {
1✔
257
      pool.rebalance({
2✔
258
        clusterSize: data.clusterSize,
259
      })
260
    }
261
  }
262

263
  rebalance(tenantId: string, data: PoolRebalanceOptions) {
264
    const pool = tenantPools.get(tenantId)
1✔
265
    if (pool) {
1!
UNCOV
266
      pool.rebalance({ ...data })
×
267
    }
268
  }
269

270
  getPool(settings: TenantConnectionOptions) {
271
    const isCacheable = (settings.isSingleUse && !settings.isExternalPool) || !settings.isSingleUse
1,316✔
272
    const { value: existingPool, outcome } = tenantPools.getWithOutcome(settings.tenantId)
1,316✔
273

274
    if (existingPool) {
1,316✔
275
      recordTenantPoolCacheRequest(outcome)
7✔
276
      logTenantPoolCacheLookup(settings, isCacheable, outcome)
7✔
277

278
      return existingPool
7✔
279
    }
280

281
    if (!isCacheable) {
1,309✔
282
      return this.newPool({ ...settings, numWorkers: this.numWorkers })
1,281✔
283
    }
284

285
    recordTenantPoolCacheRequest(outcome)
28✔
286
    logTenantPoolCacheLookup(settings, isCacheable, outcome)
28✔
287

288
    const newPool = this.newPool({ ...settings, numWorkers: this.numWorkers })
28✔
289

290
    tenantPools.set(settings.tenantId, newPool)
28✔
291
    return newPool
28✔
292
  }
293

294
  destroy(tenantId: string) {
295
    const pool = tenantPools.get(tenantId)
18✔
296
    if (pool) {
18✔
297
      manuallyDestroyedPools.add(pool)
2✔
298
      tenantPools.delete(tenantId)
2✔
299
      return destroyPool(pool).finally(() => {
2✔
300
        manuallyDestroyedPools.delete(pool)
2✔
301
      })
302
    }
303
    return Promise.resolve()
16✔
304
  }
305

306
  destroyAll() {
307
    const promises: Promise<void>[] = []
17✔
308

309
    for (const [connectionString, pool] of tenantPools) {
17✔
310
      manuallyDestroyedPools.add(pool)
15✔
311
      tenantPools.delete(connectionString)
15✔
312
      promises.push(
15✔
313
        destroyPool(pool).finally(() => {
314
          manuallyDestroyedPools.delete(pool)
15✔
315
        })
316
      )
317
    }
318
    return Promise.allSettled(promises)
17✔
319
  }
320

321
  protected newPool(settings: TenantConnectionOptions): PoolStrategy {
322
    return new TenantPool(settings)
1,282✔
323
  }
324
}
325

326
/**
327
 * TenantPool create a new Knex pool for each tenant, with rebalance
328
 * functionality to adjust the number of connections based on the cluster size.
329
 */
330
class TenantPool implements PoolStrategy {
331
  protected pool?: Knex
332

333
  constructor(protected readonly options: TenantConnectionOptions) {}
1,282✔
334

335
  acquire() {
336
    if (this.pool) {
3,515✔
337
      return this.pool
2,276✔
338
    }
339

340
    this.pool = this.createKnexPool()
1,239✔
341
    return this.pool
1,239✔
342
  }
343

344
  destroy(): Promise<void> {
345
    const originalPool = this.pool
1,251✔
346

347
    if (!originalPool) {
1,251✔
348
      return Promise.resolve()
51✔
349
    }
350

351
    this.pool = undefined
1,200✔
352
    return this.drainPool(originalPool)
1,200✔
353
  }
354

355
  getPoolStats(): PoolStats | null {
UNCOV
356
    const tarnPool = this.pool?.client?.pool
×
UNCOV
357
    if (!tarnPool) return null
×
UNCOV
358
    return {
×
359
      used: tarnPool.numUsed(),
360
      total: tarnPool.numUsed() + tarnPool.numFree(),
361
    }
362
  }
363

364
  getSettings() {
365
    const isSingleUseExternalPool = this.options.isSingleUse && this.options.isExternalPool
1,239✔
366

367
    const numWorkers = Math.max(this.options.numWorkers ?? 1, 1)
1,239!
368
    const clusterSize = this.options.clusterSize || 0
1,239✔
369
    let maxConnection = this.options.maxConnections || databaseMaxConnections
1,239✔
370

371
    const divisor = Math.max(clusterSize, 1) * numWorkers
1,239✔
372
    if (divisor > 1) {
1,239!
UNCOV
373
      maxConnection = Math.ceil(maxConnection / divisor) || 1
×
374
    }
375

376
    if (isSingleUseExternalPool) {
1,239✔
377
      maxConnection = 1
1,234✔
378
    }
379

380
    return {
1,239✔
381
      ...this.options,
382
      searchPath: this.options.isExternalPool ? undefined : searchPath,
1,239✔
383
      idleTimeoutMillis: isSingleUseExternalPool ? 100 : databaseFreePoolAfterInactivity,
1,239✔
384
      reapIntervalMillis: isSingleUseExternalPool ? 50 : undefined,
1,239✔
385
      maxConnections: maxConnection,
386
    }
387
  }
388

389
  rebalance(options: PoolRebalanceOptions) {
390
    let shouldReplacePool = false
1✔
391

392
    if (options.clusterSize !== undefined && options.clusterSize !== 0) {
1!
393
      this.options.clusterSize = options.clusterSize
×
394
      shouldReplacePool = true
×
395
    }
396

397
    if (options.maxConnections !== undefined) {
1!
398
      this.options.maxConnections = options.maxConnections
1✔
399
      shouldReplacePool = true
1✔
400
    }
401

402
    if (!shouldReplacePool) {
1!
UNCOV
403
      return
×
404
    }
405

406
    const originalPool = this.pool
1✔
407
    this.pool = undefined
1✔
408

409
    if (originalPool) {
1!
410
      this.drainPool(originalPool).catch((e) => {
1✔
UNCOV
411
        logSchema.error(logger, 'Error draining tenant pool', {
×
412
          type: 'pool',
413
          error: e,
414
        })
415
      })
416
    }
417
  }
418

419
  protected async drainPool(pool: Knex) {
420
    for (; pool?.client?.pool; ) {
1,201✔
421
      let waiting = 0
1,201✔
422
      waiting += pool.client.pool.numPendingAcquires()
1,201✔
423
      waiting += pool.client.pool.numPendingValidations()
1,201✔
424
      waiting += pool.client.pool.numPendingCreates()
1,201✔
425

426
      if (waiting === 0) {
1,201!
427
        break
1,201✔
428
      }
429

UNCOV
430
      await wait(200)
×
431
    }
432

433
    return pool.destroy()
1,201✔
434
  }
435

436
  protected createKnexPool() {
437
    const settings = this.getSettings()
1,239✔
438
    const sslSettings = getSslSettings({
1,239✔
439
      connectionString: settings.dbUrl,
440
      databaseSSLRootCert,
441
    })
442

443
    const maxConnections = settings.maxConnections
1,239✔
444

445
    return knex({
1,239✔
446
      client: 'pg',
447
      version: dbPostgresVersion,
448
      searchPath: settings.searchPath,
449
      pool: {
450
        min: 0,
451
        max: maxConnections,
452
        acquireTimeoutMillis: databaseConnectionTimeout,
453
        idleTimeoutMillis: settings.idleTimeoutMillis,
454
        reapIntervalMillis: 1000,
455
      },
456
      connection: {
457
        connectionString: settings.dbUrl,
458
        connectionTimeoutMillis: databaseConnectionTimeout,
459
        ssl: sslSettings ? { ...sslSettings } : undefined,
1,239!
460
        application_name: databaseApplicationName,
461
      },
462
      acquireConnectionTimeout: databaseConnectionTimeout,
463
    })
464
  }
465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc