• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 22352456550

24 Feb 2026 01:15PM UTC coverage: 76.153% (-0.05%) from 76.205%
22352456550

push

github

web-flow
fix: handle error gracefully (#868)

2153 of 3133 branches covered (68.72%)

Branch coverage included in aggregate %.

0 of 24 new or added lines in 1 file covered. (0.0%)

2 existing lines in 1 file now uncovered.

26322 of 34259 relevant lines covered (76.83%)

94.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.82
/src/internal/queue/database.ts
1
import { Db } from 'pg-boss'
1✔
2
import pg from 'pg'
1✔
3
import { ERRORS } from '@internal/errors'
1✔
4
import { Knex } from 'knex'
1✔
5
import EventEmitter from 'node:events'
1✔
6

1✔
7
export class QueueDB extends EventEmitter implements Db {
1✔
8
  opened = false
1✔
9
  isOurs = true
1✔
10
  events = {
1✔
11
    error: 'error',
1✔
12
  }
1✔
13
  protected config: pg.PoolConfig
1✔
14
  protected pool?: pg.Pool
1✔
15

1✔
16
  constructor(config: pg.PoolConfig) {
1✔
17
    super()
×
18

×
19
    config.application_name = config.application_name || 'pgboss'
×
20

×
21
    this.config = config
×
22
  }
×
23

1✔
24
  async open() {
1✔
25
    this.pool = new pg.Pool({ ...this.config, min: 0 })
×
26
    this.pool.on('error', (error) => this.emit('error', error))
×
27

×
28
    this.opened = true
×
29
  }
×
30

1✔
31
  async close() {
1✔
32
    this.opened = false
×
33
    await this.pool?.end()
×
34
  }
×
35

1✔
36
  protected async useTransaction<T>(fn: (client: pg.PoolClient) => Promise<T>): Promise<T> {
1✔
37
    if (!this.opened || !this.pool) {
×
38
      throw ERRORS.InternalError(undefined, `QueueDB not opened ${this.opened}`)
×
39
    }
×
40

×
41
    const client = await this.pool.connect()
×
NEW
42

×
NEW
43
    // Create a promise that rejects if the client emits an error
×
NEW
44
    // (e.g. connection lost, statement_timeout at the backend level)
×
NEW
45
    let clientError: Error | undefined
×
NEW
46
    const onError = (e: Error) => {
×
NEW
47
      clientError = e
×
NEW
48
    }
×
NEW
49
    client.on('error', onError)
×
NEW
50

×
51
    try {
×
52
      await client.query('BEGIN')
×
53

×
54
      if (this.config.statement_timeout && this.config.statement_timeout > 0) {
×
55
        await client.query(`SET LOCAL statement_timeout = ${this.config.statement_timeout}`)
×
56
      }
×
57

×
58
      const result = await fn(client)
×
NEW
59

×
NEW
60
      if (clientError) {
×
NEW
61
        throw clientError
×
NEW
62
      }
×
NEW
63

×
64
      await client.query('COMMIT')
×
65
      return result
×
66
    } catch (err) {
×
NEW
67
      const rollbackErr = await client.query('ROLLBACK').catch((e) => e as Error)
×
NEW
68

×
NEW
69
      const errors = [err as Error, clientError, rollbackErr].filter(
×
NEW
70
        (e): e is Error => e instanceof Error
×
NEW
71
      )
×
NEW
72

×
NEW
73
      if (errors.length === 1) throw errors[0]
×
NEW
74
      throw new AggregateError(errors, 'Queue transaction failed')
×
75
    } finally {
×
NEW
76
      client.off('error', onError)
×
NEW
77
      client.release(clientError)
×
78
    }
×
79
  }
×
80

1✔
81
  async executeSql(text: string, values: any[]) {
1✔
82
    if (this.opened && this.pool) {
×
83
      return this.useTransaction((client) => client.query(text, values))
×
84
    }
×
85

×
86
    throw ERRORS.InternalError(undefined, `QueueDB not opened ${this.opened} ${text}`)
×
87
  }
×
88
}
1✔
89

1✔
90
export class KnexQueueDB extends EventEmitter implements Db {
1✔
91
  events = {
1✔
92
    error: 'error',
1✔
93
  }
1✔
94

1✔
95
  constructor(protected readonly knex: Knex) {
1✔
96
    super()
×
97
  }
×
98

1✔
99
  async executeSql(text: string, values: any[]): Promise<{ rows: any[] }> {
1✔
100
    const knexQuery = text.replaceAll('$', ':')
×
101
    const params: Record<string, any> = {}
×
102

×
103
    values.forEach((value, index) => {
×
104
      const key = (index + 1).toString()
×
105
      params[key] = value === undefined ? null : value
×
106
    })
×
107
    const result = await this.knex.raw(knexQuery, params)
×
108
    return { rows: result.rows }
×
109
  }
×
110
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc