• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

timgit / pg-boss / 10016541428

20 Jul 2024 01:41AM UTC coverage: 93.17% (-6.8%) from 100.0%
10016541428

Pull #425

github

web-flow
Merge b0d7e7540 into f1c1636ca
Pull Request #425: v10

451 of 532 branches covered (84.77%)

357 of 381 new or added lines in 10 files covered. (93.7%)

40 existing lines in 5 files now uncovered.

873 of 937 relevant lines covered (93.17%)

805.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.92
/src/timekeeper.js
1
const EventEmitter = require('events')
4✔
2
const plans = require('./plans')
4✔
3
const cronParser = require('cron-parser')
4✔
4
const Attorney = require('./attorney')
4✔
5
const pMap = require('p-map')
4✔
6

7
const queues = {
4✔
8
  SEND_IT: '__pgboss__send-it'
9
}
10

11
const events = {
4✔
12
  error: 'error',
13
  schedule: 'schedule'
14
}
15

16
class Timekeeper extends EventEmitter {
17
  constructor (db, config) {
18
    super()
182✔
19

20
    this.db = db
182✔
21
    this.config = config
182✔
22
    this.manager = config.manager
182✔
23
    this.skewMonitorIntervalMs = config.clockMonitorIntervalSeconds * 1000
182✔
24
    this.cronMonitorIntervalMs = config.cronMonitorIntervalSeconds * 1000
182✔
25
    this.clockSkew = 0
182✔
26

27
    this.events = events
182✔
28

29
    this.getTimeCommand = plans.getTime(config.schema)
182✔
30
    this.getQueueCommand = plans.getQueueByName(config.schema)
182✔
31
    this.getSchedulesCommand = plans.getSchedules(config.schema)
182✔
32
    this.scheduleCommand = plans.schedule(config.schema)
182✔
33
    this.unscheduleCommand = plans.unschedule(config.schema)
182✔
34
    this.trySetCronTimeCommand = plans.trySetCronTime(config.schema)
182✔
35

36
    this.functions = [
182✔
37
      this.schedule,
38
      this.unschedule,
39
      this.getSchedules
40
    ]
41

42
    this.stopped = true
182✔
43
  }
44

45
  async start () {
46
    // setting the archive config too low breaks the cron 60s debounce interval so don't even try
47
    if (this.config.archiveSeconds < 60 || this.config.archiveFailedSeconds < 60) {
11✔
48
      return
1✔
49
    }
50

51
    this.stopped = false
10✔
52

53
    await this.cacheClockSkew()
10✔
54

55
    try {
10✔
56
      await this.manager.createQueue(queues.SEND_IT)
10✔
57
    } catch {}
58

59
    const options = {
10✔
60
      newJobCheckIntervalSeconds: this.config.cronWorkerIntervalSeconds,
61
      teamSize: 50,
62
      teamConcurrency: 5
63
    }
64

65
    await this.manager.work(queues.SEND_IT, options, (job) => this.onSendIt(job))
10✔
66

67
    setImmediate(() => this.onCron())
10✔
68

69
    this.cronMonitorInterval = setInterval(async () => await this.onCron(), this.cronMonitorIntervalMs)
14✔
70
    this.skewMonitorInterval = setInterval(async () => await this.cacheClockSkew(), this.skewMonitorIntervalMs)
10✔
71
  }
72

73
  async stop () {
74
    if (this.stopped) {
180✔
75
      return
171✔
76
    }
77

78
    this.stopped = true
9✔
79

80
    await this.manager.offWork(queues.SEND_IT)
9✔
81

82
    if (this.skewMonitorInterval) {
9!
83
      clearInterval(this.skewMonitorInterval)
9✔
84
      this.skewMonitorInterval = null
9✔
85
    }
86

87
    if (this.cronMonitorInterval) {
9!
88
      clearInterval(this.cronMonitorInterval)
9✔
89
      this.cronMonitorInterval = null
9✔
90
    }
91
  }
92

93
  async cacheClockSkew () {
94
    let skew = 0
15✔
95

96
    try {
15✔
97
      if (this.config.__test__force_clock_monitoring_error) {
15✔
98
        throw new Error(this.config.__test__force_clock_monitoring_error)
7✔
99
      }
100

101
      const { rows } = await this.db.executeSql(this.getTimeCommand)
8✔
102

103
      const local = Date.now()
8✔
104

105
      const dbTime = parseFloat(rows[0].time)
8✔
106

107
      skew = dbTime - local
8✔
108

109
      const skewSeconds = Math.abs(skew) / 1000
8✔
110

111
      if (skewSeconds >= 60 || this.config.__test__force_clock_skew_warning) {
8✔
112
        Attorney.warnClockSkew(`Instance clock is ${skewSeconds}s ${skew > 0 ? 'slower' : 'faster'} than database.`)
1!
113
      }
114
    } catch (err) {
115
      this.emit(this.events.error, err)
7✔
116
    } finally {
117
      this.clockSkew = skew
15✔
118
    }
119
  }
120

121
  async onCron () {
122
    try {
24✔
123
      if (this.stopped || this.timekeeping) return
24!
124

125
      if (this.config.__test__force_cron_monitoring_error) {
24✔
126
        throw new Error(this.config.__test__force_cron_monitoring_error)
3✔
127
      }
128

129
      this.timekeeping = true
21✔
130

131
      const { rows } = await this.db.executeSql(this.trySetCronTimeCommand, [this.config.cronMonitorIntervalSeconds])
21✔
132

133
      if (rows.length === 1 && !this.stopped) {
21✔
134
        await this.cron()
15✔
135
      }
136
    } catch (err) {
137
      this.emit(this.events.error, err)
3✔
138
    } finally {
139
      this.timekeeping = false
24✔
140
    }
141
  }
142

143
  async cron () {
144
    const items = await this.getSchedules()
15✔
145

146
    const sending = items.filter(i => this.shouldSendIt(i.cron, i.timezone))
15✔
147

148
    if (sending.length && !this.stopped) {
15✔
149
      await pMap(sending, it => this.send(it), { concurrency: 5 })
10✔
150
    }
151
  }
152

153
  shouldSendIt (cron, tz) {
154
    const interval = cronParser.parseExpression(cron, { tz })
10✔
155

156
    const prevTime = interval.prev()
10✔
157

158
    const databaseTime = Date.now() + this.clockSkew
10✔
159

160
    const prevDiff = (databaseTime - prevTime.getTime()) / 1000
10✔
161

162
    return prevDiff < 60
10✔
163
  }
164

165
  async send (job) {
166
    await this.manager.send(queues.SEND_IT, job, { singletonKey: job.name, singletonSeconds: 60 })
10✔
167
  }
168

169
  async onSendIt (job) {
170
    if (this.stopped) return
4!
171
    const { name, data, options } = job.data
4✔
172
    await this.manager.send(name, data, options)
4✔
173
  }
174

175
  async getSchedules () {
176
    const { rows } = await this.db.executeSql(this.getSchedulesCommand)
15✔
177
    return rows
15✔
178
  }
179

180
  async schedule (name, cron, data, options = {}) {
5✔
181
    const { tz = 'UTC' } = options
6✔
182

183
    cronParser.parseExpression(cron, { tz })
6✔
184

185
    // validation pre-check
186
    Attorney.checkSendArgs([name, data, options], this.config)
6✔
187

188
    // make sure queue exists before scheduling
189
    const queue = await this.db.executeSql(this.getQueueCommand, [name])
6✔
190

191
    if (!queue.rows.length === 0) {
6!
NEW
192
      throw new Error(`Queue '${name}' not found`)
×
193
    }
194

195
    const values = [name, cron, tz, data, options]
6✔
196

197
    await this.db.executeSql(this.scheduleCommand, values)
6✔
198
  }
199

200
  async unschedule (name) {
201
    await this.db.executeSql(this.unscheduleCommand, [name])
1✔
202
  }
203
}
204

205
module.exports = Timekeeper
4✔
206
module.exports.QUEUES = queues
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc