• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

timgit / pg-boss / 16127615119

07 Jul 2025 08:51PM UTC coverage: 94.475% (-5.5%) from 100.0%
16127615119

Pull #495

github

web-flow
Merge ec219eae0 into 6d4c88633
Pull Request #495: v11

368 of 430 branches covered (85.58%)

304 of 305 new or added lines in 8 files covered. (99.67%)

49 existing lines in 4 files now uncovered.

855 of 905 relevant lines covered (94.48%)

103.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.19
/src/index.js
1
const EventEmitter = require('node:events')
4✔
2
const plans = require('./plans')
4✔
3
const Attorney = require('./attorney')
4✔
4
const Contractor = require('./contractor')
4✔
5
const Manager = require('./manager')
4✔
6
const Timekeeper = require('./timekeeper')
4✔
7
const Boss = require('./boss')
4✔
8
const Db = require('./db')
4✔
9
const { delay } = require('./tools')
4✔
10

11
// Names of the events PgBoss emits on its own behalf: `stopped` once shutdown
// completes and `error` when shutdown (or the stop monitor) fails.
const events = {
4✔
12
  error: 'error',
13
  stopped: 'stopped'
14
}
15
/**
 * Facade that wires a Contractor, Manager, Boss and Timekeeper to one shared
 * db object, re-emits their events on itself and exposes every function they
 * list in `functions` as a method of the PgBoss instance.
 */
class PgBoss extends EventEmitter {
  // Timestamp (Date.now()) taken when stop() began, or null when not stopping.
  #stoppingOn
  #stopped
  #starting
  #started
  #config
  #db
  #boss
  #contractor
  #manager
  #timekeeper

  /**
   * @param {string} schema
   * @returns whatever Contractor.constructionPlans(schema) returns
   */
  static getConstructionPlans (schema) {
    return Contractor.constructionPlans(schema)
  }

  /**
   * @param {string} schema
   * @param {*} version
   * @returns whatever Contractor.migrationPlans(schema, version) returns
   */
  static getMigrationPlans (schema, version) {
    return Contractor.migrationPlans(schema, version)
  }

  /**
   * @param {string} schema
   * @param {*} version
   * @returns whatever Contractor.rollbackPlans(schema, version) returns
   */
  static getRollbackPlans (schema, version) {
    return Contractor.rollbackPlans(schema, version)
  }

  static states = plans.JOB_STATES
  static policies = plans.QUEUE_POLICIES

  /**
   * Builds all collaborators but performs no I/O; call start() to begin work.
   *
   * @param {*} value - raw configuration, normalized by Attorney.getConfig
   *   (which may throw on invalid input - NOTE(review): confirm in attorney.js).
   */
  constructor (value) {
    super()

    this.#stoppingOn = null
    this.#stopped = true

    const config = Attorney.getConfig(value)
    this.#config = config

    // getDb() reads #config, so it must run after the assignment above.
    const db = this.getDb()
    this.#db = db

    // Only the built-in Db wrapper is flagged with `_pgbdb`; a caller-supplied
    // db is not treated as an event source.
    if (db._pgbdb) {
      this.#promoteEvents(db)
    }

    const contractor = new Contractor(db, config)

    const manager = new Manager(db, config)
    const bossConfig = { ...config, manager }

    const boss = new Boss(db, bossConfig)

    const timekeeper = new Timekeeper(db, bossConfig)
    manager.timekeeper = timekeeper

    this.#promoteEvents(manager)
    this.#promoteEvents(boss)
    this.#promoteEvents(timekeeper)

    this.#promoteFunctions(boss)
    this.#promoteFunctions(contractor)
    this.#promoteFunctions(manager)
    this.#promoteFunctions(timekeeper)

    this.#boss = boss
    this.#contractor = contractor
    this.#manager = manager
    this.#timekeeper = timekeeper
  }

  /**
   * Returns the db object in use: the one already resolved, else the
   * caller-supplied `config.db`, else a new Db built from the config.
   */
  getDb () {
    if (this.#db) {
      return this.#db
    }

    if (this.#config.db) {
      return this.#config.db
    }

    return new Db(this.#config)
  }

  /**
   * Re-emits every event named in `emitter.events` on this instance.
   * An emitter without an `events` map is skipped instead of throwing.
   */
  #promoteEvents (emitter) {
    for (const event of Object.values(emitter?.events ?? {})) {
      emitter.on(event, arg => this.emit(event, arg))
    }
  }

  /**
   * Exposes every function in `obj.functions` as an instance method of the
   * same name, bound to `obj`. An object without `functions` is skipped.
   */
  #promoteFunctions (obj) {
    for (const func of obj?.functions ?? []) {
      this[func.name] = (...args) => func.apply(obj, args)
    }
  }

  /**
   * Opens the db (when it is the built-in wrapper and not yet open), runs the
   * contractor (start() when `config.migrate`, otherwise check()), starts the
   * manager and, depending on `config.supervise` / `config.schedule`, the boss
   * and the timekeeper.
   *
   * Calling start() while starting or already started is a no-op.
   *
   * @returns {Promise<PgBoss>} this instance
   * @throws rethrows whatever a start step throws; the instance is then left
   *   not-started so start() can be called again.
   */
  async start () {
    if (this.#starting || this.#started) {
      return this
    }

    this.#starting = true

    try {
      if (this.#db._pgbdb && !this.#db.opened) {
        await this.#db.open()
      }

      if (this.#config.migrate) {
        await this.#contractor.start()
      } else {
        await this.#contractor.check()
      }

      await this.#manager.start()

      if (this.#config.supervise) {
        await this.#boss.start()
      }

      if (this.#config.schedule) {
        await this.#timekeeper.start()
      }

      this.#started = true
      this.#stopped = false
    } finally {
      // Always clear the in-progress flag. Without this a failed step left
      // #starting stuck at true and every later start() silently did nothing.
      // NOTE(review): a retry re-runs steps that already succeeded (e.g.
      // manager.start()) - confirm those are safe to repeat.
      this.#starting = false
    }

    return this
  }

  /**
   * Stops the manager, timekeeper and boss, then shuts down.
   *
   * @param {object} [options]
   * @param {boolean} [options.close=true] - close the built-in db if it is open
   * @param {boolean} [options.graceful=true] - when false, shut down at once
   *   instead of waiting for work in progress
   * @param {number} [options.timeout=30000] - ms to wait for work in progress
   *   before shutting down anyway; values below 1000 are raised to 1000
   * @param {boolean} [options.wait=true] - when false (and graceful), resolve
   *   immediately and finish shutting down in the background
   * @returns {Promise<void>} rejects when shutdown fails; the failure is also
   *   emitted as an `error` event
   */
  async stop (options = {}) {
    if (this.#stoppingOn || this.#stopped) {
      return
    }

    const { close = true, graceful = true, timeout: requestedTimeout = 30000, wait = true } = options

    const timeout = Math.max(requestedTimeout, 1000)

    this.#stoppingOn = Date.now()

    await this.#manager.stop()
    await this.#timekeeper.stop()
    await this.#boss.stop()

    await new Promise((resolve, reject) => {
      const shutdown = async () => {
        try {
          if (this.#config.__test__throw_shutdown) {
            throw new Error(this.#config.__test__throw_shutdown)
          }

          await this.#manager.failWip()

          if (this.#db._pgbdb && this.#db.opened && close) {
            await this.#db.close()
          }

          this.#stopped = true
          this.#stoppingOn = null
          this.#started = false

          this.emit(events.stopped)
          resolve()
        } catch (err) {
          // Reject first: emitting 'error' with no listener throws, and the
          // stop() promise must still settle in that case.
          reject(err)
          this.emit(events.error, err)
        }
      }

      if (!graceful) {
        return shutdown()
      }

      if (!wait) {
        // Caller does not want to wait; later reject() calls become no-ops and
        // failures are reported through the `error` event only.
        resolve()
      }

      setImmediate(async () => {
        try {
          if (this.#config.__test__throw_stop_monitor) {
            throw new Error(this.#config.__test__throw_stop_monitor)
          }

          const isWip = () => this.#manager.getWipData({ includeInternal: false }).length > 0

          // Poll until work in progress drains or the timeout elapses.
          while ((Date.now() - this.#stoppingOn) < timeout && isWip()) {
            await delay(500)
          }

          await shutdown()
        } catch (err) {
          reject(err)
          this.emit(events.error, err)
        }
      })
    })
  }
}
210

211
module.exports = PgBoss
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc