• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 26887874376

03 Jun 2026 01:27PM UTC coverage: 75.379% (+0.03%) from 75.354%
26887874376

push

github

web-flow
refactor: get rid of pg queue app name (#959)

4266 of 6233 branches covered (68.44%)

Branch coverage included in aggregate %.

0 of 1 new or added line in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

8449 of 10635 relevant lines covered (79.45%)

366.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/internal/queue/database.ts
1
import EventEmitter from 'node:events'
2
import { ERRORS } from '@internal/errors'
3
import { Knex } from 'knex'
4
import pg from 'pg'
5
import { Db } from 'pg-boss'
6

7
export class QueueDB extends EventEmitter implements Db {
8
  opened = false
×
9
  isOurs = true
×
10
  events = {
×
11
    error: 'error',
12
  }
13
  protected config: pg.PoolConfig
14
  protected pool?: pg.Pool
15

16
  constructor(config: pg.PoolConfig) {
17
    super()
×
UNCOV
18
    this.config = config
×
19
  }
20

21
  async open() {
22
    this.pool = new pg.Pool({ ...this.config, min: 0 })
×
23
    this.pool.on('error', (error) => this.emit('error', error))
×
24

25
    this.opened = true
×
26
  }
27

28
  async close() {
29
    this.opened = false
×
30
    await this.pool?.end()
×
31
  }
32

33
  protected async useTransaction<T>(fn: (client: pg.PoolClient) => Promise<T>): Promise<T> {
34
    if (!this.opened || !this.pool) {
×
35
      throw ERRORS.InternalError(undefined, `QueueDB not opened ${this.opened}`)
×
36
    }
37

38
    const client = await this.pool.connect()
×
39

40
    // Create a promise that rejects if the client emits an error
41
    // (e.g. connection lost, statement_timeout at the backend level)
42
    let clientError: Error | undefined
43
    const onError = (e: Error) => {
×
44
      clientError = e
×
45
    }
46
    client.on('error', onError)
×
47

48
    try {
×
49
      await client.query('BEGIN')
×
50

51
      if (this.config.statement_timeout && this.config.statement_timeout > 0) {
×
52
        await client.query(`SET LOCAL statement_timeout = ${this.config.statement_timeout}`)
×
53
      }
54

55
      const result = await fn(client)
×
56

57
      if (clientError) {
×
58
        throw clientError
×
59
      }
60

61
      await client.query('COMMIT')
×
62
      return result
×
63
    } catch (err) {
64
      const rollbackErr = await client.query('ROLLBACK').catch((e) => e as Error)
×
65

66
      const errors = [err as Error, clientError, rollbackErr].filter(
×
67
        (e): e is Error => e instanceof Error
×
68
      )
69

70
      if (errors.length === 1) throw errors[0]
×
71
      throw new AggregateError(errors, 'Queue transaction failed')
×
72
    } finally {
73
      client.off('error', onError)
×
74
      client.release(clientError)
×
75
    }
76
  }
77

78
  async executeSql(text: string, values: unknown[]): Promise<{ rows: unknown[] }> {
79
    if (this.opened && this.pool) {
×
80
      return this.useTransaction((client) => client.query(text, values))
×
81
    }
82

83
    throw ERRORS.InternalError(undefined, `QueueDB not opened ${this.opened} ${text}`)
×
84
  }
85
}
86

87
export class KnexQueueDB extends EventEmitter implements Db {
88
  events = {
×
89
    error: 'error',
90
  }
91

92
  constructor(protected readonly knex: Knex) {
×
93
    super()
×
94
  }
95

96
  async executeSql(text: string, values: unknown[]): Promise<{ rows: unknown[] }> {
97
    const knexQuery = text.replaceAll('$', ':')
×
98
    const params: Record<string, unknown> = {}
×
99

100
    values.forEach((value, index) => {
×
101
      const key = (index + 1).toString()
×
102
      params[key] = value === undefined ? null : value
×
103
    })
104
    const result = await this.knex.raw(knexQuery, params)
×
105
    return { rows: result.rows }
×
106
  }
107
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc