• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 23640559358

27 Mar 2026 09:49AM UTC coverage: 79.417% (+0.06%) from 79.36%
23640559358

push

github

web-flow
fix: bump build target (#937)

5107 of 6725 branches covered (75.94%)

Branch coverage included in aggregate %.

0 of 3 new or added lines in 1 file covered. (0.0%)

17 existing lines in 6 files now uncovered.

28747 of 35903 relevant lines covered (80.07%)

588.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.93
/src/internal/queue/database.ts
1
import EventEmitter from 'node:events'
2✔
2
import { ERRORS } from '@internal/errors'
2✔
3
import { Knex } from 'knex'
2✔
4
import pg from 'pg'
2✔
5
import { Db } from 'pg-boss'
2✔
6
import { getConfig } from '../../config'
2✔
7

2✔
8
export class QueueDB extends EventEmitter implements Db {
2✔
9
  opened = false
2✔
UNCOV
10
  isOurs = true
×
UNCOV
11
  events = {
×
UNCOV
12
    error: 'error',
×
UNCOV
13
  }
×
UNCOV
14
  protected config: pg.PoolConfig
×
15
  protected pool?: pg.Pool
2✔
16

2✔
17
  constructor(config: pg.PoolConfig) {
2✔
18
    super()
×
19

×
20
    config.application_name = config.application_name || getConfig().pgQueueApplicationName
×
21

×
22
    this.config = config
×
23
  }
×
24

2✔
25
  async open() {
2✔
26
    this.pool = new pg.Pool({ ...this.config, min: 0 })
×
27
    this.pool.on('error', (error) => this.emit('error', error))
×
28

×
29
    this.opened = true
×
30
  }
×
31

2✔
32
  async close() {
2✔
33
    this.opened = false
×
34
    await this.pool?.end()
×
35
  }
×
36

2✔
37
  protected async useTransaction<T>(fn: (client: pg.PoolClient) => Promise<T>): Promise<T> {
2✔
38
    if (!this.opened || !this.pool) {
×
39
      throw ERRORS.InternalError(undefined, `QueueDB not opened ${this.opened}`)
×
40
    }
×
41

×
42
    const client = await this.pool.connect()
×
43

×
44
    // Create a promise that rejects if the client emits an error
×
45
    // (e.g. connection lost, statement_timeout at the backend level)
×
46
    let clientError: Error | undefined
×
47
    const onError = (e: Error) => {
×
48
      clientError = e
×
49
    }
×
50
    client.on('error', onError)
×
51

×
52
    try {
×
53
      await client.query('BEGIN')
×
54

×
55
      if (this.config.statement_timeout && this.config.statement_timeout > 0) {
×
56
        await client.query(`SET LOCAL statement_timeout = ${this.config.statement_timeout}`)
×
57
      }
×
58

×
59
      const result = await fn(client)
×
60

×
61
      if (clientError) {
×
62
        throw clientError
×
63
      }
×
64

×
65
      await client.query('COMMIT')
×
66
      return result
×
67
    } catch (err) {
×
68
      const rollbackErr = await client.query('ROLLBACK').catch((e) => e as Error)
×
69

×
70
      const errors = [err as Error, clientError, rollbackErr].filter(
×
71
        (e): e is Error => e instanceof Error
×
72
      )
×
73

×
74
      if (errors.length === 1) throw errors[0]
×
75
      throw new AggregateError(errors, 'Queue transaction failed')
×
76
    } finally {
×
77
      client.off('error', onError)
×
78
      client.release(clientError)
×
79
    }
×
80
  }
×
81

2✔
82
  async executeSql(text: string, values: any[]) {
2✔
83
    if (this.opened && this.pool) {
×
84
      return this.useTransaction((client) => client.query(text, values))
×
85
    }
×
86

×
87
    throw ERRORS.InternalError(undefined, `QueueDB not opened ${this.opened} ${text}`)
×
88
  }
×
89
}
2✔
90

2✔
91
export class KnexQueueDB extends EventEmitter implements Db {
2✔
92
  events = {
2✔
93
    error: 'error',
2✔
94
  }
2✔
95

2✔
96
  constructor(protected readonly knex: Knex) {
2✔
97
    super()
×
98
  }
×
99

2✔
100
  async executeSql(text: string, values: any[]): Promise<{ rows: any[] }> {
2✔
101
    const knexQuery = text.replaceAll('$', ':')
×
102
    const params: Record<string, any> = {}
×
103

×
104
    values.forEach((value, index) => {
×
105
      const key = (index + 1).toString()
×
106
      params[key] = value === undefined ? null : value
×
107
    })
×
108
    const result = await this.knex.raw(knexQuery, params)
×
109
    return { rows: result.rows }
×
110
  }
×
111
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc