• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

timgit / pg-boss / 9949043480

16 Jul 2024 01:09AM UTC coverage: 94.41% (-5.6%) from 100.0%
9949043480

Pull #425

github

web-flow
Merge 6b98fbe61 into f1c1636ca
Pull Request #425: v10

466 of 546 branches covered (85.35%)

351 of 365 new or added lines in 10 files covered. (96.16%)

40 existing lines in 5 files now uncovered.

912 of 966 relevant lines covered (94.41%)

803.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.03
/src/timekeeper.js
1
const EventEmitter = require('events')
4✔
2
const plans = require('./plans')
4✔
3
const cronParser = require('cron-parser')
4✔
4
const Attorney = require('./attorney')
4✔
5
const pMap = require('p-map')
4✔
6

7
// Queue names used by the timekeeper itself; SEND_IT is the queue that
// scheduled jobs are sent to before being forwarded to their target queue.
const queues = { SEND_IT: '__pgboss__send-it' }
10

11
// Event names; each Timekeeper instance exposes this object as `this.events`
// and emits `events.error` when clock or cron monitoring fails.
const events = { error: 'error', schedule: 'schedule' }
15

16
/**
 * Sends jobs for stored cron schedules and tracks the clock skew between this
 * instance and the database server.
 *
 * Failures during clock or cron monitoring are reported by emitting the
 * `error` event (`this.events.error`) rather than by rejecting.
 */
class Timekeeper extends EventEmitter {
  /**
   * @param {object} db - database adapter; this class calls
   *   `db.executeSql(sql, values)` and `db.lock({ key })` on it.
   * @param {object} config - settings; reads `manager`, `schema`,
   *   `clockMonitorIntervalSeconds`, `cronMonitorIntervalSeconds`,
   *   `cronWorkerIntervalSeconds`, `archiveSeconds`, `archiveFailedSeconds`
   *   and the `__test__*` fault-injection flags.
   */
  constructor (db, config) {
    super()

    this.db = db
    this.config = config
    this.manager = config.manager
    this.skewMonitorIntervalMs = config.clockMonitorIntervalSeconds * 1000
    this.cronMonitorIntervalMs = config.cronMonitorIntervalSeconds * 1000
    // milliseconds to add to Date.now() to approximate the database clock
    this.clockSkew = 0

    this.events = events

    // SQL commands are built once per instance for the configured schema
    this.getTimeCommand = plans.getTime(config.schema)
    this.getQueueCommand = plans.getQueueByName(config.schema)
    this.getSchedulesCommand = plans.getSchedules(config.schema)
    this.scheduleCommand = plans.schedule(config.schema)
    this.unscheduleCommand = plans.unschedule(config.schema)
    this.getCronTimeCommand = plans.getCronTime(config.schema)
    this.setCronTimeCommand = plans.setCronTime(config.schema)

    // NOTE(review): these are unbound method references; presumably the
    // consumer of `functions` binds them to this instance - confirm at caller
    this.functions = [
      this.schedule,
      this.unschedule,
      this.getSchedules
    ]

    this.stopped = true
  }

  /**
   * Starts cron processing: caches the clock skew, registers the worker for
   * the send-it queue, triggers a first cron pass and installs the two
   * monitoring intervals.
   *
   * @returns {Promise<void>}
   */
  async start () {
    this.stopped = false

    // setting the archive config too low breaks the cron 60s debounce interval so don't even try
    if (this.config.archiveSeconds < 60 || this.config.archiveFailedSeconds < 60) {
      return
    }

    // cache the clock skew from the db server
    await this.cacheClockSkew()

    try {
      await this.manager.createQueue(queues.SEND_IT)
    } catch {
      // deliberate best-effort: presumably the queue already exists - TODO confirm
    }

    const workOptions = {
      newJobCheckIntervalSeconds: this.config.cronWorkerIntervalSeconds,
      teamSize: 50,
      teamConcurrency: 5
    }

    await this.manager.work(queues.SEND_IT, workOptions, (job) => this.onSendIt(job))

    setImmediate(() => this.onCron())

    // create monitoring interval to make sure cron hasn't crashed
    this.cronMonitorInterval = setInterval(() => this.onCron(), this.cronMonitorIntervalMs)
    // create monitoring interval to measure and adjust for drift in clock skew
    this.skewMonitorInterval = setInterval(() => this.cacheClockSkew(), this.skewMonitorIntervalMs)
  }

  /**
   * Stops cron processing: unregisters the send-it worker and clears both
   * monitoring intervals. Does nothing when already stopped.
   *
   * @returns {Promise<void>}
   */
  async stop () {
    if (this.stopped) {
      return
    }

    this.stopped = true

    await this.manager.offWork(queues.SEND_IT)

    if (this.skewMonitorInterval) {
      clearInterval(this.skewMonitorInterval)
      this.skewMonitorInterval = null
    }

    if (this.cronMonitorInterval) {
      clearInterval(this.cronMonitorInterval)
      this.cronMonitorInterval = null
    }
  }

  /**
   * Measures the difference between the database clock and the local clock
   * and stores it, in milliseconds, in `this.clockSkew`.
   *
   * On failure the `error` event is emitted and the skew is reset to 0.
   *
   * @returns {Promise<void>}
   */
  async cacheClockSkew () {
    let skew = 0

    try {
      if (this.config.__test__force_clock_monitoring_error) {
        throw new Error(this.config.__test__force_clock_monitoring_error)
      }

      const { rows } = await this.db.executeSql(this.getTimeCommand)

      const local = Date.now()

      const dbTime = parseFloat(rows[0].time)

      // positive skew: the database clock is ahead of this instance
      skew = dbTime - local

      const skewSeconds = Math.abs(skew) / 1000

      if (skewSeconds >= 60 || this.config.__test__force_clock_skew_warning) {
        Attorney.warnClockSkew(`Instance clock is ${skewSeconds}s ${skew > 0 ? 'slower' : 'faster'} than database.`)
      }
    } catch (err) {
      this.emit(this.events.error, err)
    } finally {
      // always assign, so a failed measurement falls back to "no skew"
      this.clockSkew = skew
    }
  }

  /**
   * Runs one cron pass under the `timekeeper` database lock if the last
   * recorded pass is older than `config.cronMonitorIntervalSeconds`.
   *
   * Does nothing when stopped or when another pass is already running on
   * this instance. Errors are emitted as the `error` event.
   *
   * @returns {Promise<void>}
   */
  async onCron () {
    // The guard must stay outside the try block: an overlapping call that
    // bails out here must not reach the `finally` below, which would clear
    // the `timekeeping` flag owned by the pass that is still running.
    if (this.stopped || this.timekeeping) return

    let locker

    try {
      if (this.config.__test__force_cron_monitoring_error) {
        throw new Error(this.config.__test__force_cron_monitoring_error)
      }

      this.timekeeping = true

      locker = await this.db.lock({ key: 'timekeeper' })

      const { secondsAgo } = await this.getCronTime()

      if (secondsAgo > this.config.cronMonitorIntervalSeconds) {
        await this.cron()
        await this.setCronTime()
      }
    } catch (err) {
      this.emit(this.events.error, err)
    } finally {
      this.timekeeping = false
      // locker is undefined when the failure happened before the lock was taken
      await locker?.unlock()
    }
  }

  /**
   * Loads all schedules and sends the ones that are currently due.
   *
   * @returns {Promise<void>}
   */
  async cron () {
    const items = await this.getSchedules()

    const sending = items.filter(i => this.shouldSendIt(i.cron, i.timezone))

    if (sending.length && !this.stopped) {
      await pMap(sending, it => this.send(it), { concurrency: 5 })
    }
  }

  /**
   * Decides whether a schedule is due.
   *
   * @param {string} cron - cron expression
   * @param {string} tz - time zone passed to the cron parser
   * @returns {boolean} true when the previous occurrence of the expression is
   *   less than 60 seconds before the estimated database time
   */
  shouldSendIt (cron, tz) {
    const interval = cronParser.parseExpression(cron, { tz })

    const prevTime = interval.prev()

    // local time corrected by the cached skew approximates the database clock
    const databaseTime = Date.now() + this.clockSkew

    const prevDiff = (databaseTime - prevTime.getTime()) / 1000

    return prevDiff < 60
  }

  /**
   * Sends a schedule row to the send-it queue, using the schedule name as the
   * singleton key with a 60 second singleton window.
   *
   * @param {object} job - schedule row; `job.name` is read here
   * @returns {Promise<void>}
   */
  async send (job) {
    await this.manager.send(queues.SEND_IT, job, { singletonKey: job.name, singletonSeconds: 60 })
  }

  /**
   * Worker handler for the send-it queue: forwards the wrapped schedule to
   * its target queue. Does nothing when stopped.
   *
   * @param {object} job - job whose `data` holds `{ name, data, options }`
   * @returns {Promise<void>}
   */
  async onSendIt (job) {
    if (this.stopped) return
    const { name, data, options } = job.data
    await this.manager.send(name, data, options)
  }

  /**
   * @returns {Promise<object[]>} rows returned by the schedules query
   */
  async getSchedules () {
    const { rows } = await this.db.executeSql(this.getSchedulesCommand)
    return rows
  }

  /**
   * Stores a cron schedule for a queue.
   *
   * @param {string} name - queue name; the queue must already exist
   * @param {string} cron - cron expression
   * @param {*} data - job data stored with the schedule
   * @param {object} [options] - send options; `options.tz` selects the time
   *   zone and defaults to 'UTC'
   * @returns {Promise<number|null>} `rowCount` of the schedule command, or
   *   null when the command returns no result
   * @throws {Error} when the queue does not exist; also propagates whatever
   *   the cron parser or argument check throws
   */
  async schedule (name, cron, data, options = {}) {
    const { tz = 'UTC' } = options

    // result is discarded; presumably this throws on an invalid expression - TODO confirm
    cronParser.parseExpression(cron, { tz })

    // validation pre-check
    Attorney.checkSendArgs([name, data, options], this.config)

    // make sure queue exists before scheduling
    const queue = await this.db.executeSql(this.getQueueCommand, [name])

    if (queue.rows.length === 0) {
      throw new Error(`Queue '${name}' not found`)
    }

    const values = [name, cron, tz, data, options]

    const result = await this.db.executeSql(this.scheduleCommand, values)

    return result ? result.rowCount : null
  }

  /**
   * Removes the schedule stored for a queue.
   *
   * @param {string} name - queue name
   * @returns {Promise<number|null>} `rowCount` of the unschedule command, or
   *   null when the command returns no result
   */
  async unschedule (name) {
    const result = await this.db.executeSql(this.unscheduleCommand, [name])
    return result ? result.rowCount : null
  }

  /**
   * Records that a cron pass has just run.
   *
   * @returns {Promise<void>}
   */
  async setCronTime () {
    await this.db.executeSql(this.setCronTimeCommand)
  }

  /**
   * Reads when the last cron pass ran.
   *
   * @returns {Promise<{cronOn: *, secondsAgo: number}>} `secondsAgo` is 61
   *   when the database reports null
   */
  async getCronTime () {
    const { rows } = await this.db.executeSql(this.getCronTimeCommand)

    let { cron_on: cronOn, seconds_ago: secondsAgo } = rows[0]

    // NOTE(review): 61 looks chosen to exceed a 60s monitor interval so that
    // a never-run cron is treated as overdue - confirm
    secondsAgo = secondsAgo !== null ? parseFloat(secondsAgo) : 61

    return { cronOn, secondsAgo }
  }
}
225

226
// The class is the module's export; the queue names hang off it as `QUEUES`.
Timekeeper.QUEUES = queues
module.exports = Timekeeper
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc