• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 22342374672

24 Feb 2026 08:18AM UTC coverage: 76.202% (-0.06%) from 76.262%
22342374672

Pull #865

github

web-flow
Merge a2fc6c26b into 2a4fa0a84
Pull Request #865: feat: support pgbouncer for multitenant database

2153 of 3133 branches covered (68.72%)

Branch coverage included in aggregate %.

37 of 75 new or added lines in 5 files covered. (49.33%)

2 existing lines in 1 file now uncovered.

26322 of 34235 relevant lines covered (76.89%)

95.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.33
/src/internal/queue/database.ts
1
import { Db } from 'pg-boss'
1✔
2
import pg from 'pg'
1✔
3
import { ERRORS } from '@internal/errors'
1✔
4
import { Knex } from 'knex'
1✔
5
import EventEmitter from 'node:events'
1✔
6

1✔
7
export class QueueDB extends EventEmitter implements Db {
1✔
8
  opened = false
1✔
9
  isOurs = true
1✔
10
  events = {
1✔
11
    error: 'error',
1✔
12
  }
1✔
13
  protected config: pg.PoolConfig
1✔
14
  protected pool?: pg.Pool
1✔
15

1✔
16
  constructor(config: pg.PoolConfig) {
1✔
17
    super()
×
18

×
19
    config.application_name = config.application_name || 'pgboss'
×
20

×
21
    this.config = config
×
22
  }
×
23

1✔
24
  async open() {
1✔
25
    this.pool = new pg.Pool({ ...this.config, min: 0 })
×
26
    this.pool.on('error', (error) => this.emit('error', error))
×
27

×
28
    this.opened = true
×
29
  }
×
30

1✔
31
  async close() {
1✔
32
    this.opened = false
×
33
    await this.pool?.end()
×
34
  }
×
35

1✔
36
  protected async useTransaction<T>(fn: (client: pg.PoolClient) => Promise<T>): Promise<T> {
1✔
NEW
37
    if (!this.opened || !this.pool) {
×
NEW
38
      throw ERRORS.InternalError(undefined, `QueueDB not opened ${this.opened}`)
×
NEW
39
    }
×
NEW
40

×
NEW
41
    const client = await this.pool.connect()
×
NEW
42
    try {
×
NEW
43
      await client.query('BEGIN')
×
NEW
44

×
NEW
45
      if (this.config.statement_timeout && this.config.statement_timeout > 0) {
×
NEW
46
        await client.query(`SET LOCAL statement_timeout = ${this.config.statement_timeout}`)
×
NEW
47
      }
×
NEW
48

×
NEW
49
      const result = await fn(client)
×
NEW
50
      await client.query('COMMIT')
×
NEW
51
      return result
×
NEW
52
    } catch (err) {
×
NEW
53
      await client.query('ROLLBACK').catch(() => {})
×
NEW
54
      throw err
×
NEW
55
    } finally {
×
NEW
56
      client.release()
×
NEW
57
    }
×
NEW
58
  }
×
59

1✔
60
  async executeSql(text: string, values: any[]) {
1✔
61
    if (this.opened && this.pool) {
×
NEW
62
      return this.useTransaction((client) => client.query(text, values))
×
63
    }
×
64

×
65
    throw ERRORS.InternalError(undefined, `QueueDB not opened ${this.opened} ${text}`)
×
66
  }
×
67
}
1✔
68

1✔
69
export class KnexQueueDB extends EventEmitter implements Db {
1✔
70
  events = {
1✔
71
    error: 'error',
1✔
72
  }
1✔
73

1✔
74
  constructor(protected readonly knex: Knex) {
1✔
75
    super()
×
76
  }
×
77

1✔
78
  async executeSql(text: string, values: any[]): Promise<{ rows: any[] }> {
1✔
79
    const knexQuery = text.replaceAll('$', ':')
×
80
    const params: Record<string, any> = {}
×
81

×
82
    values.forEach((value, index) => {
×
83
      const key = (index + 1).toString()
×
84
      params[key] = value === undefined ? null : value
×
85
    })
×
86
    const result = await this.knex.raw(knexQuery, params)
×
87
    return { rows: result.rows }
×
88
  }
×
89
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc