• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 26899901438

03 Jun 2026 04:55PM UTC coverage: 42.425% (-33.0%) from 75.379%
26899901438

Pull #1132

github

web-flow
Merge 9c68b15ac into 0afb09325
Pull Request #1132: fix: add acceptance matrix for vectors

2460 of 6402 branches covered (38.43%)

Branch coverage included in aggregate %.

257 of 446 new or added lines in 27 files covered. (57.62%)

3664 existing lines in 167 files now uncovered.

4857 of 10845 relevant lines covered (44.79%)

34.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.85
/src/internal/database/pool.ts
1
import { type CacheLookupOutcome, createTtlCache, TENANT_POOL_CACHE_NAME } from '@internal/cache'
2
import { wait } from '@internal/concurrency'
3
import { getSslSettings } from '@internal/database/ssl'
4
import { logger, logSchema } from '@internal/monitoring'
5
import {
6
  cacheEvictionsTotal,
7
  cacheRequestsTotal,
8
  dbActiveConnection,
9
  dbActivePool,
10
  dbInUseConnection,
11
  isMetricEnabled,
12
  meter,
13
} from '@internal/monitoring/metrics'
14
import { JWTPayload } from 'jose'
15
import { Knex, knex } from 'knex'
16
import { getConfig } from '../../config'
17

18
const {
19
  isMultitenant,
20
  databaseSSLRootCert,
21
  databaseMaxConnections,
22
  databaseFreePoolAfterInactivity,
23
  databaseConnectionTimeout,
24
  dbSearchPath,
25
  dbPostgresVersion,
26
  databaseApplicationName,
27
  tenantPoolCacheTtlMs,
28
  tenantPoolCacheHitLogSampleRate,
29
  tenantPoolCacheMissLogSampleRate,
57✔
30
} = getConfig()
31

32
export const TENANT_POOL_CACHE_LOOKUP_LOG_TYPE = 'cache'
57✔
33
export const TENANT_POOL_CACHE_LOOKUP_LOG_MESSAGE = '[Cache] Tenant pool lookup'
57✔
34

35
/**
36
 * Pool-level settings: everything required to build the underlying Knex/tarn
37
 * pool. No request-scoped fields (JWT, headers, route). Internal callers that
38
 * (re)build a pool outside of a request — e.g. recycle from a tenant-config
39
 * change — operate on this narrower shape.
40
 */
41
export interface PoolOptions {
42
  tenantId: string
43
  dbUrl: string
44
  isExternalPool?: boolean
45
  isSingleUse?: boolean
46
  idleTimeoutMillis?: number
47
  reapIntervalMillis?: number
48
  maxConnections: number
49
  clusterSize?: number
50
  numWorkers?: number
51
}
52

53
/**
54
 * Full settings for a tenant connection: pool options plus the request-scoped
55
 * JWT/route context that `TenantConnection` consumes when running queries.
56
 */
57
export interface TenantConnectionOptions extends PoolOptions {
58
  user: User
59
  superUser: User
60
  headers?: Record<string, string | undefined | string[]>
61
  method?: string
62
  path?: string
63
  operation?: () => string | undefined
64
}
65

66
export interface User {
67
  jwt: string
68
  payload: { role?: string } & JWTPayload
69
}
70

71
export interface PoolStats {
72
  used: number
73
  total: number
74
}
75

76
export interface PoolRebalanceOptions {
77
  clusterSize?: number
78
  maxConnections?: number
79
}
80

81
export interface PoolStrategy {
82
  acquire(): Knex
83
  rebalance(options: PoolRebalanceOptions): void
84
  destroy(): Promise<void>
85
  getPoolStats(): PoolStats | null
86
}
87

88
export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.split(',')].filter(
57✔
89
  Boolean
90
)
91

92
const multiTenantTtlConfig = {
57✔
93
  ttl: tenantPoolCacheTtlMs,
94
  updateAgeOnGet: true,
95
  checkAgeOnGet: true,
96
}
97

98
const manuallyDestroyedPools = new WeakSet<PoolStrategy>()
57✔
99

100
type RebalanceableTarnPool = {
101
  max: number
102
  _tryAcquireOrCreate?: () => void
103
}
104

105
function logPoolDestroyError(error: unknown): void {
106
  logSchema.error(logger, 'pool was not able to be destroyed', {
×
107
    type: 'db',
108
    error,
109
  })
110
}
111

112
async function destroyPool(pool: PoolStrategy): Promise<void> {
113
  await pool.destroy()
51✔
114
}
115

116
async function destroyPoolSafely(pool: PoolStrategy): Promise<void> {
117
  try {
13✔
118
    await destroyPool(pool)
13✔
119
  } catch (e) {
120
    logPoolDestroyError(e)
×
121
  }
122
}
123

124
function recordTenantPoolCacheEviction(reason: string): void {
125
  // Explicit destroy paths are filtered before this helper is called.
126
  if (reason === 'stale' || reason === 'evict' || reason === 'delete') {
7!
127
    cacheEvictionsTotal.add(1, {
7✔
128
      cache: TENANT_POOL_CACHE_NAME,
129
    })
130
  }
131
}
132

133
function recordTenantPoolCacheRequest(outcome: string): void {
134
  cacheRequestsTotal.add(1, {
54✔
135
    cache: TENANT_POOL_CACHE_NAME,
136
    outcome,
137
  })
138
}
139

140
function shouldLogTenantPoolCacheLookup(sampleRate: number): boolean {
141
  return sampleRate >= 1 || (sampleRate > 0 && Math.random() < sampleRate)
54!
142
}
143

144
function logTenantPoolCacheLookup(
145
  settings: TenantConnectionOptions,
146
  isCacheable: boolean,
147
  outcome: CacheLookupOutcome
148
): void {
149
  const sampleRate =
150
    outcome === 'hit' ? tenantPoolCacheHitLogSampleRate : tenantPoolCacheMissLogSampleRate
54✔
151

152
  if (!shouldLogTenantPoolCacheLookup(sampleRate)) {
54✔
153
    return
52✔
154
  }
155

156
  const log = {
2✔
157
    type: TENANT_POOL_CACHE_LOOKUP_LOG_TYPE,
158
    cache: TENANT_POOL_CACHE_NAME,
159
    tenantId: settings.tenantId,
160
    project: settings.tenantId,
161
    outcome,
162
    sampleRate,
163
    sampleWeight: 1 / sampleRate,
164
    isCacheable,
165
    isExternalPool: Boolean(settings.isExternalPool),
166
    isSingleUse: Boolean(settings.isSingleUse),
167
  }
168

169
  logSchema.info(logger, TENANT_POOL_CACHE_LOOKUP_LOG_MESSAGE, log)
2✔
170
}
171

172
const tenantPools = createTtlCache<string, PoolStrategy>({
57✔
173
  ...(isMultitenant ? multiTenantTtlConfig : { max: 1, ttl: Infinity }),
57✔
174
  dispose: async (pool, _tenantId, reason) => {
175
    if (!pool || manuallyDestroyedPools.has(pool)) {
51✔
176
      return
44✔
177
    }
178

179
    recordTenantPoolCacheEviction(reason)
7✔
180

181
    await destroyPoolSafely(pool)
7✔
182
  },
183
})
184

185
// ============================================================================
186
// Pool stats collection — chunked to avoid blocking the event loop
187
// ============================================================================
188
interface PoolStatsSnapshot {
189
  poolCount: number
190
  totalConnections: number
191
  totalInUse: number
192
}
193

194
const STATS_CHUNK_SIZE = 100
57✔
195
const STATS_INTERVAL_MS = 5_000
57✔
196

197
let cachedPoolStats: PoolStatsSnapshot = {
57✔
198
  poolCount: 0,
199
  totalConnections: 0,
200
  totalInUse: 0,
201
}
202
let collectInProgress = false
57✔
203

204
async function collectPoolStats() {
205
  if (collectInProgress) return
5!
206
  collectInProgress = true
5✔
207

208
  try {
5✔
209
    let poolCount = 0
5✔
210
    let totalConnections = 0
5✔
211
    let totalInUse = 0
5✔
212
    let chunkCount = 0
5✔
213

214
    for (const [, pool] of tenantPools.entries()) {
5✔
215
      poolCount++
2✔
216
      const stats = pool.getPoolStats()
2✔
217
      if (stats) {
2!
218
        totalConnections += stats.total
2✔
219
        totalInUse += stats.used
2✔
220
      }
221
      // Yield to the event loop between chunks
222
      if (++chunkCount % STATS_CHUNK_SIZE === 0) {
2!
223
        await new Promise<void>((resolve) => setImmediate(resolve))
×
224
      }
225
    }
226

227
    cachedPoolStats = {
5✔
228
      poolCount,
229
      totalConnections,
230
      totalInUse,
231
    }
232
  } finally {
233
    collectInProgress = false
5✔
234
  }
235
}
236

237
/**
238
 * PoolManager is a class that manages a pool of Knex connections.
239
 * It creates a new pool for each tenant and reuses existing pools.
240
 */
241
export class PoolManager {
242
  protected numWorkers: number = 1
57✔
243

244
  setNumWorkers(numWorkers: number) {
245
    this.numWorkers = Math.max(numWorkers ?? 1, 1)
×
246
  }
247

248
  monitor() {
249
    // Periodically collect stats in a non-blocking way
250
    const interval = setInterval(() => {
1✔
251
      void collectPoolStats()
5✔
252
    }, STATS_INTERVAL_MS)
253
    interval.unref()
1✔
254

255
    // Observable callback reads the cached snapshot — O(1)
256
    meter.addBatchObservableCallback(
1✔
257
      (observer) => {
258
        if (isMetricEnabled('db_active_local_pools')) {
1!
259
          observer.observe(dbActivePool, cachedPoolStats.poolCount)
1✔
260
        }
261
        if (isMetricEnabled('db_connections')) {
1!
262
          observer.observe(dbActiveConnection, cachedPoolStats.totalConnections)
1✔
263
        }
264
        if (isMetricEnabled('db_connections_in_use')) {
1!
265
          observer.observe(dbInUseConnection, cachedPoolStats.totalInUse)
1✔
266
        }
267
      },
268
      [dbActivePool, dbActiveConnection, dbInUseConnection]
269
    )
270
  }
271

272
  rebalanceAll(data: { clusterSize: number }) {
273
    for (const pool of tenantPools.values()) {
2✔
274
      pool.rebalance({
4✔
275
        clusterSize: data.clusterSize,
276
      })
277
    }
278
  }
279

280
  getPool(settings: TenantConnectionOptions) {
281
    const isCacheable = (settings.isSingleUse && !settings.isExternalPool) || !settings.isSingleUse
57✔
282
    const { value: existingPool, outcome } = tenantPools.getWithOutcome(settings.tenantId)
57✔
283

284
    if (existingPool) {
57✔
285
      recordTenantPoolCacheRequest(outcome)
9✔
286
      logTenantPoolCacheLookup(settings, isCacheable, outcome)
9✔
287

288
      return existingPool
9✔
289
    }
290

291
    if (!isCacheable) {
48✔
292
      return this.newPool({ ...settings, numWorkers: this.numWorkers })
3✔
293
    }
294

295
    recordTenantPoolCacheRequest(outcome)
45✔
296
    logTenantPoolCacheLookup(settings, isCacheable, outcome)
45✔
297

298
    const newPool = this.newPool({ ...settings, numWorkers: this.numWorkers })
45✔
299

300
    tenantPools.set(settings.tenantId, newPool)
45✔
301
    return newPool
45✔
302
  }
303

304
  destroy(tenantId: string) {
305
    const pool = tenantPools.get(tenantId)
3✔
306
    if (pool) {
3!
307
      manuallyDestroyedPools.add(pool)
3✔
308
      tenantPools.delete(tenantId)
3✔
309
      return destroyPool(pool).finally(() => {
3✔
310
        manuallyDestroyedPools.delete(pool)
3✔
311
      })
312
    }
UNCOV
313
    return Promise.resolve()
×
314
  }
315

316
  /**
317
   * Replace the cached pool for a tenant with a fresh one built from `settings`.
318
   *
319
   * The new pool is swapped into the cache synchronously so any subsequent
320
   * `getPool()` call observes it immediately. The old pool (if any) is drained
321
   * and destroyed in the background via `TenantPool.destroy()` → `drainPool`,
322
   * which waits for in-flight acquires/queries to settle before tearing down.
323
   *
324
   * Use this for tenant-config changes where the underlying DB endpoint is
325
   * unchanged but the pool needs to be rebuilt with new settings (e.g.
326
   * `maxConnections`). For endpoint changes (dbUrl/credentials), use
327
   * `destroy()` instead — the next `getPool()` will rebuild from current
328
   * settings rather than reusing the cached connection string.
329
   */
330
  recycle(tenantId: string, settings: PoolOptions): PoolStrategy {
331
    const oldPool = tenantPools.get(tenantId)
7✔
332
    const newPool = this.newPool({ ...settings, numWorkers: this.numWorkers })
7✔
333

334
    // Mark the old pool so the cache `dispose` hook (fired by the `set` below
335
    // for the replaced entry) doesn't race our explicit destroy.
336
    if (oldPool) {
7✔
337
      manuallyDestroyedPools.add(oldPool)
6✔
338
    }
339

340
    tenantPools.set(tenantId, newPool)
7✔
341

342
    if (oldPool) {
7✔
343
      void destroyPoolSafely(oldPool).finally(() => {
6✔
344
        manuallyDestroyedPools.delete(oldPool)
6✔
345
      })
346
    }
347

348
    return newPool
7✔
349
  }
350

351
  destroyAll() {
352
    const promises: Promise<void>[] = []
36✔
353

354
    for (const [connectionString, pool] of tenantPools) {
36✔
355
      manuallyDestroyedPools.add(pool)
35✔
356
      tenantPools.delete(connectionString)
35✔
357
      promises.push(
35✔
358
        destroyPool(pool).finally(() => {
359
          manuallyDestroyedPools.delete(pool)
35✔
360
        })
361
      )
362
    }
363
    return Promise.allSettled(promises)
36✔
364
  }
365

366
  protected newPool(settings: PoolOptions): PoolStrategy {
367
    return new TenantPool(settings)
16✔
368
  }
369
}
370

371
/**
372
 * TenantPool create a new Knex pool for each tenant, with rebalance
373
 * functionality to adjust the number of connections based on the cluster size.
374
 */
375
class TenantPool implements PoolStrategy {
376
  protected pool?: Knex
377

378
  constructor(protected readonly options: PoolOptions) {}
16✔
379

380
  acquire() {
381
    if (this.pool) {
33✔
382
      return this.pool
17✔
383
    }
384

385
    this.pool = this.createKnexPool()
16✔
386
    return this.pool
16✔
387
  }
388

389
  destroy(): Promise<void> {
390
    const originalPool = this.pool
16✔
391

392
    if (!originalPool) {
16!
UNCOV
393
      return Promise.resolve()
×
394
    }
395

396
    this.pool = undefined
16✔
397
    return this.drainPool(originalPool)
16✔
398
  }
399

400
  getPoolStats(): PoolStats | null {
401
    const tarnPool = this.pool?.client?.pool
×
402
    if (!tarnPool) return null
×
403
    return {
×
404
      used: tarnPool.numUsed(),
405
      total: tarnPool.numUsed() + tarnPool.numFree(),
406
    }
407
  }
408

409
  getSettings() {
410
    const isSingleUseExternalPool = this.options.isSingleUse && this.options.isExternalPool
43!
411

412
    const numWorkers = Math.max(this.options.numWorkers ?? 1, 1)
43!
413
    const clusterSize = this.options.clusterSize || 0
43✔
414
    let maxConnection = this.options.maxConnections || databaseMaxConnections
43✔
415

416
    const divisor = Math.max(clusterSize, 1) * numWorkers
43✔
417
    if (divisor > 1) {
43✔
418
      maxConnection = Math.ceil(maxConnection / divisor) || 1
10!
419
    }
420

421
    if (isSingleUseExternalPool) {
43!
UNCOV
422
      maxConnection = 1
×
423
    }
424

425
    return {
43✔
426
      ...this.options,
427
      searchPath: this.options.isExternalPool ? undefined : searchPath,
43!
428
      idleTimeoutMillis: isSingleUseExternalPool ? 100 : databaseFreePoolAfterInactivity,
43!
429
      reapIntervalMillis: isSingleUseExternalPool ? 50 : undefined,
43!
430
      maxConnections: maxConnection,
431
    }
432
  }
433

434
  rebalance(options: PoolRebalanceOptions) {
435
    let shouldUpdatePoolMax = false
32✔
436

437
    if (options.clusterSize !== undefined && options.clusterSize !== 0) {
32✔
438
      this.options.clusterSize = options.clusterSize
7✔
439
      shouldUpdatePoolMax = true
7✔
440
    }
441

442
    if (options.maxConnections !== undefined) {
32✔
443
      this.options.maxConnections = options.maxConnections
22✔
444
      shouldUpdatePoolMax = true
22✔
445
    }
446

447
    if (!shouldUpdatePoolMax) {
32✔
448
      return
4✔
449
    }
450

451
    const tarnPool = this.pool?.client?.pool as RebalanceableTarnPool | undefined
28✔
452
    if (tarnPool) {
32✔
453
      tarnPool.max = this.getSettings().maxConnections
27✔
454
      tarnPool._tryAcquireOrCreate?.()
27✔
455
    }
456
  }
457

458
  protected async drainPool(pool: Knex) {
459
    for (; pool?.client?.pool; ) {
16✔
460
      let waiting = 0
16✔
461
      waiting += pool.client.pool.numPendingAcquires()
16✔
462
      waiting += pool.client.pool.numPendingValidations()
16✔
463
      waiting += pool.client.pool.numPendingCreates()
16✔
464

465
      if (waiting === 0) {
16!
466
        break
16✔
467
      }
468

469
      await wait(200)
×
470
    }
471

472
    return pool.destroy()
16✔
473
  }
474

475
  protected createKnexPool() {
476
    const settings = this.getSettings()
16✔
477
    const sslSettings = getSslSettings({
16✔
478
      connectionString: settings.dbUrl,
479
      databaseSSLRootCert,
480
    })
481

482
    const maxConnections = settings.maxConnections
16✔
483

484
    return knex({
16✔
485
      client: 'pg',
486
      version: dbPostgresVersion,
487
      searchPath: settings.searchPath,
488
      pool: {
489
        min: 0,
490
        max: maxConnections,
491
        acquireTimeoutMillis: databaseConnectionTimeout,
492
        idleTimeoutMillis: settings.idleTimeoutMillis,
493
        reapIntervalMillis: 1000,
494
      },
495
      connection: {
496
        connectionString: settings.dbUrl,
497
        connectionTimeoutMillis: databaseConnectionTimeout,
498
        ssl: sslSettings ? { ...sslSettings } : undefined,
16!
499
        application_name: databaseApplicationName,
500
      },
501
      acquireConnectionTimeout: databaseConnectionTimeout,
502
    })
503
  }
504
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc