• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 15387080396

02 Jun 2025 08:04AM UTC coverage: 77.893% (-0.3%) from 78.159%
15387080396

Pull #696

github

web-flow
Merge f3f847646 into d82ebecc1
Pull Request #696: feat: pgboss v10

1536 of 2137 branches covered (71.88%)

Branch coverage included in aggregate %.

166 of 436 new or added lines in 28 files covered. (38.07%)

60 existing lines in 4 files now uncovered.

17374 of 22140 relevant lines covered (78.47%)

110.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.08
/src/internal/database/pool.ts
1
import { getConfig } from '../../config'
1✔
2
import TTLCache from '@isaacs/ttlcache'
1✔
3
import { knex, Knex } from 'knex'
1✔
4
import { logger, logSchema } from '@internal/monitoring'
1✔
5
import { getSslSettings } from '@internal/database/util'
1✔
6
import { wait } from '@internal/concurrency'
1✔
7
import { JWTPayload } from 'jose'
1✔
8
import { DbActivePool } from '@internal/monitoring/metrics'
1✔
9

1✔
10
const {
1✔
11
  region,
1✔
12
  isMultitenant,
1✔
13
  databaseSSLRootCert,
1✔
14
  databaseMaxConnections,
1✔
15
  databaseFreePoolAfterInactivity,
1✔
16
  databaseConnectionTimeout,
1✔
17
  dbSearchPath,
1✔
18
  dbPostgresVersion,
1✔
19
} = getConfig()
1✔
20

1✔
21
export interface TenantConnectionOptions {
1✔
22
  user: User
1✔
23
  superUser: User
1✔
24

1✔
25
  tenantId: string
1✔
26
  dbUrl: string
1✔
27
  isExternalPool?: boolean
1✔
28
  isSingleUse?: boolean
1✔
29
  idleTimeoutMillis?: number
1✔
30
  reapIntervalMillis?: number
1✔
31
  maxConnections: number
1✔
32
  clusterSize?: number
1✔
33
  headers?: Record<string, string | undefined | string[]>
1✔
34
  method?: string
1✔
35
  path?: string
1✔
36
  operation?: () => string | undefined
1✔
37
}
1✔
38

1✔
39
export interface User {
1✔
40
  jwt: string
1✔
41
  payload: { role?: string } & JWTPayload
1✔
42
}
1✔
43

1✔
44
export interface PoolStrategy {
1✔
45
  acquire(): Knex
1✔
46
  rebalance(options: { clusterSize: number }): void
1✔
47
  destroy(): Promise<void>
1✔
48
}
1✔
49

1✔
50
export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.split(',')].filter(
1✔
51
  Boolean
1✔
52
)
1✔
53

1✔
54
const multiTenantLRUConfig = {
1✔
55
  ttl: 1000 * 10,
1✔
56
  updateAgeOnGet: true,
1✔
57
  checkAgeOnGet: true,
1✔
58
}
1✔
59

1✔
60
const tenantPools = new TTLCache<string, PoolStrategy>({
1✔
61
  ...(isMultitenant ? multiTenantLRUConfig : { max: 1, ttl: Infinity }),
1!
62
  dispose: async (pool) => {
1✔
63
    if (!pool) return
×
64
    try {
×
65
      await pool.destroy()
×
66
    } catch (e) {
×
67
      logSchema.error(logger, 'pool was not able to be destroyed', {
×
68
        type: 'db',
×
69
        error: e,
×
70
      })
×
71
    }
×
72
  },
1✔
73
})
1✔
74

1✔
75
/**
1✔
76
 * PoolManager is a class that manages a pool of Knex connections.
1✔
77
 * It creates a new pool for each tenant and reuses existing pools.
1✔
78
 */
1✔
79
export class PoolManager {
1✔
80
  monitor(signal: AbortSignal) {
1✔
81
    const monitorInterval = setInterval(() => {
×
82
      DbActivePool.set(
×
83
        {
×
84
          region,
×
85
        },
×
86
        tenantPools.size
×
87
      )
×
88
    }, 2000)
×
89

×
90
    signal.addEventListener(
×
91
      'abort',
×
92
      () => {
×
93
        clearInterval(monitorInterval)
×
94
      },
×
95
      { once: true }
×
96
    )
×
97
  }
×
98

1✔
99
  rebalanceAll(data: { clusterSize: number }) {
1✔
100
    for (const pool of tenantPools.values()) {
×
101
      pool.rebalance({
×
102
        clusterSize: data.clusterSize,
×
103
      })
×
104
    }
×
105
  }
×
106

1✔
107
  rebalance(tenantId: string, data: { clusterSize: number }) {
1✔
108
    const pool = tenantPools.get(tenantId)
×
109
    if (pool) {
×
110
      pool.rebalance({
×
111
        clusterSize: data.clusterSize,
×
112
      })
×
113
    }
×
114
  }
×
115

1✔
116
  getPool(settings: TenantConnectionOptions) {
1✔
117
    const existingPool = tenantPools.get(settings.tenantId)
135✔
118
    if (existingPool) {
135✔
119
      return existingPool
134✔
120
    }
134✔
121

1✔
122
    const newPool = this.newPool(settings)
1✔
123

1✔
124
    if ((settings.isSingleUse && !settings.isExternalPool) || !settings.isSingleUse) {
135!
125
      tenantPools.set(settings.tenantId, newPool)
1✔
126
    }
1✔
127
    return newPool
1✔
128
  }
1✔
129

1✔
130
  destroy(tenantId: string) {
1✔
131
    const pool = tenantPools.get(tenantId)
×
132
    if (pool) {
×
133
      tenantPools.delete(tenantId)
×
134
      return pool.destroy()
×
135
    }
×
136
    return Promise.resolve()
×
137
  }
×
138

1✔
139
  destroyAll() {
1✔
140
    const promises: Promise<void>[] = []
×
141

×
142
    for (const [connectionString, pool] of tenantPools) {
×
143
      promises.push(pool.destroy())
×
144
      tenantPools.delete(connectionString)
×
145
    }
×
146
    return Promise.allSettled(promises)
×
147
  }
×
148

1✔
149
  protected newPool(settings: TenantConnectionOptions) {
1✔
150
    return new TenantPool(settings)
1✔
151
  }
1✔
152
}
1✔
153

1✔
154
/**
1✔
155
 * TenantPool create a new Knex pool for each tenant, with rebalance
1✔
156
 * functionality to adjust the number of connections based on the cluster size.
1✔
157
 */
1✔
158
class TenantPool implements PoolStrategy {
1✔
159
  protected pool?: Knex
1✔
160

1✔
161
  constructor(protected readonly options: TenantConnectionOptions) {}
1✔
162

1✔
163
  acquire() {
1✔
164
    if (this.pool) {
341✔
165
      return this.pool
340✔
166
    }
340✔
167

1✔
168
    this.pool = this.createKnexPool()
1✔
169
    return this.pool
1✔
170
  }
1✔
171

1✔
172
  destroy(): Promise<void> {
1✔
173
    if (!this.pool) {
×
174
      return Promise.resolve()
×
175
    }
×
176
    return this.drainPool(this.pool)
×
177
  }
×
178

1✔
179
  getSettings() {
1✔
180
    const isSingleUseExternalPool = this.options.isSingleUse && this.options.isExternalPool
1✔
181

1✔
182
    const clusterSize = this.options.clusterSize || 0
1✔
183
    let maxConnection = this.options.maxConnections || databaseMaxConnections
1!
184

1✔
185
    if (clusterSize > 0) {
1!
186
      maxConnection = Math.ceil(maxConnection / clusterSize)
×
187
    }
×
188

1✔
189
    if (isSingleUseExternalPool) {
1!
190
      maxConnection = 1
×
191
    }
×
192

1✔
193
    return {
1✔
194
      ...this.options,
1✔
195
      searchPath: this.options.isExternalPool ? undefined : searchPath,
1!
196
      idleTimeoutMillis: isSingleUseExternalPool ? 100 : databaseFreePoolAfterInactivity,
1!
197
      reapIntervalMillis: isSingleUseExternalPool ? 50 : undefined,
1!
198
      maxConnections: maxConnection,
1✔
199
    }
1✔
200
  }
1✔
201

1✔
202
  rebalance(options: { clusterSize: number }) {
1✔
203
    if (options.clusterSize === 0) {
×
204
      return
×
205
    }
×
206

×
207
    const originalPool = this.pool
×
208

×
209
    this.options.clusterSize = options.clusterSize
×
210
    this.pool = undefined
×
211

×
212
    if (originalPool) {
×
213
      this.drainPool(originalPool).catch((e) => {
×
214
        logger.error({ type: 'pool', error: e })
×
215
      })
×
216
    }
×
217
  }
×
218

1✔
219
  protected async drainPool(pool: Knex) {
1✔
NEW
220
    while (true) {
×
NEW
221
      let waiting = 0
×
NEW
222
      waiting += pool.client.pool.numPendingAcquires()
×
NEW
223
      waiting += pool.client.pool.numPendingValidations()
×
NEW
224
      waiting += pool.client.pool.numPendingCreates()
×
225

×
226
      if (waiting === 0) {
×
227
        break
×
228
      }
×
229

×
230
      await wait(200)
×
231
    }
×
232

×
233
    return pool.destroy()
×
234
  }
×
235

1✔
236
  protected createKnexPool() {
1✔
237
    const settings = this.getSettings()
1✔
238
    const sslSettings = getSslSettings({
1✔
239
      connectionString: settings.dbUrl,
1✔
240
      databaseSSLRootCert,
1✔
241
    })
1✔
242

1✔
243
    const maxConnections = settings.maxConnections
1✔
244

1✔
245
    return knex({
1✔
246
      client: 'pg',
1✔
247
      version: dbPostgresVersion,
1✔
248
      searchPath: settings.searchPath,
1✔
249
      pool: {
1✔
250
        min: 0,
1✔
251
        max: maxConnections,
1✔
252
        acquireTimeoutMillis: databaseConnectionTimeout,
1✔
253
        idleTimeoutMillis: settings.idleTimeoutMillis,
1✔
254
        reapIntervalMillis: 1000,
1✔
255
      },
1✔
256
      connection: {
1✔
257
        connectionString: settings.dbUrl,
1✔
258
        connectionTimeoutMillis: databaseConnectionTimeout,
1✔
259
        ssl: sslSettings ? { ...sslSettings } : undefined,
1!
260
      },
1✔
261
      acquireConnectionTimeout: databaseConnectionTimeout,
1✔
262
    })
1✔
263
  }
1✔
264
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc