• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / storage / 24514857133

16 Apr 2026 02:06PM UTC coverage: 67.909% (-14.7%) from 82.565%
24514857133

Pull #1027

github

web-flow
Merge 5d56f382e into fef8da0ac
Pull Request #1027: fix: drop jest and its dependencies

2963 of 4866 branches covered (60.89%)

Branch coverage included in aggregate %.

30 of 31 new or added lines in 6 files covered. (96.77%)

33 existing lines in 23 files now uncovered.

6399 of 8920 relevant lines covered (71.74%)

404.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/internal/queue/database.ts
1
import EventEmitter from 'node:events'
2
import { ERRORS } from '@internal/errors'
3
import { Knex } from 'knex'
4
import pg from 'pg'
5
import { Db } from 'pg-boss'
6
import { getConfig } from '../../config'
7

8
/**
 * pg-boss `Db` implementation backed by its own `pg.Pool`.
 *
 * Each statement passed to `executeSql` runs in its own BEGIN/COMMIT
 * transaction on a pooled client. Errors emitted by the pool are re-emitted
 * on this instance as `'error'` events.
 */
export class QueueDB extends EventEmitter implements Db {
  /** True between a successful `open()` and the next `close()`. */
  opened = false
  // NOTE(review): presumably read by pg-boss to decide whether it owns the
  // connection lifecycle (open/close) — confirm against the pg-boss `Db` contract.
  isOurs = true
  events = {
    error: 'error',
  }
  protected config: pg.PoolConfig
  protected pool?: pg.Pool

  /**
   * @param config Pool configuration. When `application_name` is empty or
   *   missing it falls back to `getConfig().pgQueueApplicationName`.
   */
  constructor(config: pg.PoolConfig) {
    super()

    // Store a copy so the caller's object is not mutated by the defaulting below.
    this.config = {
      ...config,
      application_name: config.application_name || getConfig().pgQueueApplicationName,
    }
  }

  /**
   * Creates the pool (forcing `min: 0`) and forwards its `'error'` events.
   * Calling it again while already open is a no-op, so an existing pool is
   * never replaced without being ended.
   */
  async open() {
    if (this.opened && this.pool) {
      return
    }

    this.pool = new pg.Pool({ ...this.config, min: 0 })
    this.pool.on('error', (error) => this.emit('error', error))

    this.opened = true
  }

  /**
   * Marks the instance closed and ends the pool. The pool reference is
   * cleared first so a repeated `close()` does not call `end()` on the same
   * pool a second time.
   */
  async close() {
    this.opened = false

    const pool = this.pool
    this.pool = undefined
    await pool?.end()
  }

  /**
   * Runs `fn` inside a transaction on a freshly checked-out client.
   *
   * @param fn Work to perform with the transactional client.
   * @returns Whatever `fn` resolves to, after COMMIT succeeded.
   * @throws The internal "not opened" error when no pool is available; the
   *   single underlying error when only one distinct failure occurred; an
   *   `AggregateError` when several distinct errors (callback, client,
   *   rollback) were collected.
   */
  protected async useTransaction<T>(fn: (client: pg.PoolClient) => Promise<T>): Promise<T> {
    if (!this.opened || !this.pool) {
      throw ERRORS.InternalError(undefined, `QueueDB not opened ${this.opened}`)
    }

    const client = await this.pool.connect()

    // Record any error the client emits while we hold it (e.g. connection
    // lost) so the transaction is not committed and the client is released
    // with that error.
    let clientError: Error | undefined
    const onError = (e: Error) => {
      clientError = e
    }
    client.on('error', onError)

    try {
      await client.query('BEGIN')

      if (this.config.statement_timeout && this.config.statement_timeout > 0) {
        // SET LOCAL only lasts until the end of the current transaction.
        await client.query(`SET LOCAL statement_timeout = ${this.config.statement_timeout}`)
      }

      const result = await fn(client)

      if (clientError) {
        throw clientError
      }

      await client.query('COMMIT')
      return result
    } catch (err) {
      // Best-effort rollback; a rollback failure is collected, not thrown directly.
      const rollbackErr = await client.query('ROLLBACK').catch((e) => e as Error)

      // De-duplicate by identity: the thrown error is frequently the very
      // same instance as the recorded client error.
      const errors = Array.from(new Set<unknown>([err, clientError, rollbackErr])).filter(
        (e): e is Error => e instanceof Error
      )

      // A non-Error value was thrown and nothing else failed: rethrow it
      // unchanged instead of hiding it behind an empty AggregateError.
      if (errors.length === 0) throw err
      if (errors.length === 1) throw errors[0]
      throw new AggregateError(errors, 'Queue transaction failed')
    } finally {
      client.off('error', onError)
      // Passing the recorded error (if any) along with the release.
      client.release(clientError)
    }
  }

  /**
   * Executes one parameterised statement inside its own transaction.
   *
   * @param text SQL text.
   * @param values Positional parameter values.
   * @throws The internal "not opened" error (including the SQL text) when
   *   the instance has not been opened.
   */
  async executeSql(text: string, values: any[]) {
    if (this.opened && this.pool) {
      return this.useTransaction((client) => client.query(text, values))
    }

    throw ERRORS.InternalError(undefined, `QueueDB not opened ${this.opened} ${text}`)
  }
}
90

91
/**
 * pg-boss `Db` implementation that runs statements through an existing Knex
 * instance via `knex.raw`.
 */
export class KnexQueueDB extends EventEmitter implements Db {
  events = {
    error: 'error',
  }

  /**
   * @param knex Knex instance used to execute the statements.
   */
  constructor(protected readonly knex: Knex) {
    super()
  }

  /**
   * Executes a statement written with positional `$1, $2, …` placeholders by
   * rewriting them to Knex named bindings (`:1, :2, …`).
   *
   * @param text SQL text using `$n` placeholders.
   * @param values Positional values; `values[0]` is bound to `$1`.
   * @returns The `rows` property of the raw Knex result.
   */
  async executeSql(text: string, values: any[]): Promise<{ rows: any[] }> {
    // Only rewrite `$<digits>` placeholders. Other dollar signs (dollar-quoted
    // bodies, `$` inside literals) are left exactly as written.
    const knexQuery = text.replace(/\$(\d+)/g, ':$1')

    // Named bindings keyed "1", "2", …; `undefined` is sent as NULL.
    const params: Record<string, any> = Object.fromEntries(
      values.map((value, index) => [String(index + 1), value ?? null])
    )

    const result = await this.knex.raw(knexQuery, params)
    return { rows: result.rows }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc