• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moscajs / aedes-persistence / 15283162130

27 May 2025 06:42PM UTC coverage: 96.798% (-2.7%) from 99.545%
15283162130

Pull #105

github

web-flow
Merge ae94d5e0a into 2d6950570
Pull Request #105: feat: asyncify

372 of 401 branches covered (92.77%)

Branch coverage included in aggregate %.

875 of 937 new or added lines in 6 files covered. (93.38%)

1 existing line in 1 file now uncovered.

2439 of 2503 relevant lines covered (97.44%)

324.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.33
/callBackPersistence.js
1
'use strict'
12✔
2

12✔
3
/* This module provides a callback layer for async persistence implementations */
12✔
4
const { Readable } = require('node:stream')
12✔
5
const { EventEmitter } = require('node:events')
12✔
6

12✔
7
/**
 * Callback-style wrapper around a promise-based ("async") persistence.
 *
 * Methods that accept a trailing `cb` work in two modes:
 *  - with `cb`: the call is forwarded to `this.asyncPersistence` and the
 *    outcome is reported as `cb(err, ...results)`. Most of these methods
 *    postpone the call until the 'ready' event when `this.ready` is false.
 *  - without `cb`: the promise returned by `this.asyncPersistence` is
 *    returned unchanged.
 *
 * Success callbacks are always scheduled with `process.nextTick` instead of
 * being called inside the promise's `then` handler. Otherwise an exception
 * thrown by `cb` would be routed into the following `.catch(cb)` and `cb`
 * would be invoked a second time with its own error.
 *
 * Stream methods wrap whatever the async persistence returns in
 * `Readable.from()`.
 *
 * Events: 'ready' once `asyncPersistence.setup()` has resolved, 'error' when
 * it rejects.
 */
class CallBackPersistence extends EventEmitter {
  /**
   * @param {Function} asyncInstanceFactory - called once as
   *   `asyncInstanceFactory(opts)`; its return value is stored as
   *   `this.asyncPersistence`.
   * @param {object} [opts={}] - handed to the factory untouched.
   */
  constructor (asyncInstanceFactory, opts = {}) {
    super()

    // set to true by setup() after asyncPersistence.setup() has resolved
    this.ready = false
    this.asyncPersistence = asyncInstanceFactory(opts)
  }

  /** The `broker` property of the wrapped async persistence. */
  get broker () {
    return this.asyncPersistence.broker
  }

  /** Assigning a broker is a shorthand for `setup(broker)`. */
  set broker (broker) {
    this.setup(broker)
  }

  /**
   * Runs `asyncPersistence.setup(broker)` and emits 'ready' when it resolves.
   * Does nothing when the instance is already ready.
   * A rejection is emitted as 'error'; the same handler also receives
   * anything thrown synchronously by a 'ready' listener.
   *
   * @param {*} broker - forwarded to `asyncPersistence.setup`.
   */
  setup (broker) {
    if (this.ready) {
      return
    }

    this.asyncPersistence.setup(broker)
      .then(() => {
        this.ready = true
        this.emit('ready')
      })
      .catch(err => {
        this.emit('error', err)
      })
  }

  /**
   * @param {*} topic - forwarded to `asyncPersistence.subscriptionsByTopic`.
   * @param {Function} [cb] - `cb(err, resubs)`.
   * @returns {Promise|CallBackPersistence|undefined} the underlying promise
   *   without `cb`; `this` when the call had to be queued until 'ready';
   *   otherwise undefined.
   */
  subscriptionsByTopic (topic, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.subscriptionsByTopic.bind(this, topic, cb))
        return this
      }
      this.asyncPersistence.subscriptionsByTopic(topic)
        .then(resubs => {
          process.nextTick(cb, null, resubs)
        })
        .catch(cb)
    } else {
      return this.asyncPersistence.subscriptionsByTopic(topic)
    }
  }

  /**
   * NOTE(review): unlike most callback methods here this one does not wait
   * for 'ready' - confirm that is intentional.
   *
   * @param {*} client - forwarded to `asyncPersistence.cleanSubscriptions`.
   * @param {Function} [cb] - `cb(err, result)` where `result` is the value
   *   the async call resolved with.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  cleanSubscriptions (client, cb) {
    if (cb) {
      this.asyncPersistence.cleanSubscriptions(client)
        .then(client => {
          process.nextTick(cb, null, client)
        })
        .catch(cb)
    } else {
      return this.asyncPersistence.cleanSubscriptions(client)
    }
  }

  /**
   * @param {*} packet - forwarded to `asyncPersistence.storeRetained`.
   * @param {Function} [cb] - `cb(err)`; `err` is `null` on success.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  storeRetained (packet, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.storeRetained.bind(this, packet, cb))
        return
      }
      this.asyncPersistence.storeRetained(packet)
        .then(() => process.nextTick(cb, null))
        .catch(cb)
    } else {
      return this.asyncPersistence.storeRetained(packet)
    }
  }

  /**
   * @param {*} pattern - forwarded to `asyncPersistence.createRetainedStream`.
   * @returns {Readable} `Readable.from()` over the async persistence's result.
   */
  createRetainedStream (pattern) {
    return Readable.from(this.asyncPersistence.createRetainedStream(pattern))
  }

  /**
   * @param {*} patterns - forwarded to
   *   `asyncPersistence.createRetainedStreamCombi`.
   * @returns {Readable} `Readable.from()` over the async persistence's result.
   */
  createRetainedStreamCombi (patterns) {
    return Readable.from(this.asyncPersistence.createRetainedStreamCombi(patterns))
  }

  /**
   * @param {*} client - forwarded to `asyncPersistence.addSubscriptions`.
   * @param {*} subs - forwarded to `asyncPersistence.addSubscriptions`.
   * @param {Function} [cb] - `cb(err, client)`.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  addSubscriptions (client, subs, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.addSubscriptions.bind(this, client, subs, cb))
        return
      }
      this.asyncPersistence.addSubscriptions(client, subs)
        .then(() => process.nextTick(cb, null, client))
        .catch(cb)
    } else {
      return this.asyncPersistence.addSubscriptions(client, subs)
    }
  }

  /**
   * @param {*} client - forwarded to `asyncPersistence.removeSubscriptions`.
   * @param {*} subs - forwarded to `asyncPersistence.removeSubscriptions`.
   * @param {Function} [cb] - `cb(err, client)`.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  removeSubscriptions (client, subs, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.removeSubscriptions.bind(this, client, subs, cb))
        return
      }

      this.asyncPersistence.removeSubscriptions(client, subs)
        .then(() => process.nextTick(cb, null, client))
        .catch(cb)
    } else {
      return this.asyncPersistence.removeSubscriptions(client, subs)
    }
  }

  /**
   * @param {*} client - forwarded to `asyncPersistence.subscriptionsByClient`.
   * @param {Function} [cb] - `cb(err, resubs, client)`; `resubs` is `null`
   *   when the async call resolved with an empty list.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  subscriptionsByClient (client, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.subscriptionsByClient.bind(this, client, cb))
        return
      }

      this.asyncPersistence.subscriptionsByClient(client)
        .then(resubs => {
          process.nextTick(cb, null, resubs.length > 0 ? resubs : null, client)
        })
        .catch(cb)
    } else {
      return this.asyncPersistence.subscriptionsByClient(client)
    }
  }

  /**
   * NOTE(review): unlike most callback methods here this one does not wait
   * for 'ready' - confirm that is intentional.
   *
   * @param {Function} [cb] - `cb(err, subsCount, clientsCount)`, taken from
   *   the `subsCount` / `clientsCount` properties of the resolved value.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  countOffline (cb) {
    if (cb) {
      this.asyncPersistence.countOffline()
        .then(res => process.nextTick(cb, null, res.subsCount, res.clientsCount))
        .catch(cb)
    } else {
      return this.asyncPersistence.countOffline()
    }
  }

  /**
   * Destroys the wrapped persistence.
   *
   * @param {Function} [cb] - called with no arguments once
   *   `asyncPersistence.destroy()` has settled, whether it resolved or
   *   rejected (a failure is deliberately not reported).
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  destroy (cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.destroy.bind(this, cb))
        return
      }
      // Handle both outcomes explicitly: `.finally(cb)` would re-propagate a
      // rejection and leave it unhandled instead of swallowing it.
      const done = () => process.nextTick(cb)
      this.asyncPersistence.destroy().then(done, done)
    } else {
      return this.asyncPersistence.destroy()
    }
  }

  /**
   * @param {*} sub - forwarded to `asyncPersistence.outgoingEnqueue`.
   * @param {*} packet - forwarded to `asyncPersistence.outgoingEnqueue`.
   * @param {Function} [cb] - `cb(err, packet)`.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  outgoingEnqueue (sub, packet, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.outgoingEnqueue.bind(this, sub, packet, cb))
        return
      }
      this.asyncPersistence.outgoingEnqueue(sub, packet)
        .then(() => process.nextTick(cb, null, packet))
        .catch(cb)
    } else {
      return this.asyncPersistence.outgoingEnqueue(sub, packet)
    }
  }

  /**
   * @param {*} subs - forwarded to `asyncPersistence.outgoingEnqueueCombi`.
   * @param {*} packet - forwarded to `asyncPersistence.outgoingEnqueueCombi`.
   * @param {Function} [cb] - `cb(err, packet)`.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  outgoingEnqueueCombi (subs, packet, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.outgoingEnqueueCombi.bind(this, subs, packet, cb))
        return
      }
      this.asyncPersistence.outgoingEnqueueCombi(subs, packet)
        .then(() => process.nextTick(cb, null, packet))
        .catch(cb)
    } else {
      return this.asyncPersistence.outgoingEnqueueCombi(subs, packet)
    }
  }

  /**
   * @param {*} client - forwarded to `asyncPersistence.outgoingStream`.
   * @returns {Readable} `Readable.from()` over the async persistence's result.
   */
  outgoingStream (client) {
    return Readable.from(this.asyncPersistence.outgoingStream(client))
  }

  /**
   * @param {*} client - forwarded to `asyncPersistence.outgoingUpdate`.
   * @param {*} packet - forwarded to `asyncPersistence.outgoingUpdate`.
   * @param {Function} [cb] - `cb(err, client, packet)`.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  outgoingUpdate (client, packet, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.outgoingUpdate.bind(this, client, packet, cb))
        return
      }
      this.asyncPersistence.outgoingUpdate(client, packet)
        .then(() => process.nextTick(cb, null, client, packet))
        .catch(cb)
    } else {
      return this.asyncPersistence.outgoingUpdate(client, packet)
    }
  }

  /**
   * @param {*} client - forwarded to `asyncPersistence.outgoingClearMessageId`.
   * @param {*} packet - forwarded to `asyncPersistence.outgoingClearMessageId`.
   * @param {Function} [cb] - `cb(err, resolvedPacket)` where `resolvedPacket`
   *   is the value the async call resolved with (not the `packet` argument).
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  outgoingClearMessageId (client, packet, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.outgoingClearMessageId.bind(this, client, packet, cb))
        return
      }
      this.asyncPersistence.outgoingClearMessageId(client, packet)
        .then(resolvedPacket => process.nextTick(cb, null, resolvedPacket))
        .catch(cb)
    } else {
      return this.asyncPersistence.outgoingClearMessageId(client, packet)
    }
  }

  /**
   * @param {*} client - forwarded to `asyncPersistence.incomingStorePacket`.
   * @param {*} packet - forwarded to `asyncPersistence.incomingStorePacket`.
   * @param {Function} [cb] - `cb(err)`; `err` is `null` on success.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  incomingStorePacket (client, packet, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.incomingStorePacket.bind(this, client, packet, cb))
        return
      }
      this.asyncPersistence.incomingStorePacket(client, packet)
        .then(() => process.nextTick(cb, null))
        .catch(cb)
    } else {
      return this.asyncPersistence.incomingStorePacket(client, packet)
    }
  }

  /**
   * @param {*} client - forwarded to `asyncPersistence.incomingGetPacket`.
   * @param {*} packet - forwarded to `asyncPersistence.incomingGetPacket`.
   * @param {Function} [cb] - `cb(err, resolvedPacket, client)` where
   *   `resolvedPacket` is the value the async call resolved with.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  incomingGetPacket (client, packet, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.incomingGetPacket.bind(this, client, packet, cb))
        return
      }
      this.asyncPersistence.incomingGetPacket(client, packet)
        .then(resolvedPacket => process.nextTick(cb, null, resolvedPacket, client))
        .catch(cb)
    } else {
      return this.asyncPersistence.incomingGetPacket(client, packet)
    }
  }

  /**
   * @param {*} client - forwarded to `asyncPersistence.incomingDelPacket`.
   * @param {*} packet - forwarded to `asyncPersistence.incomingDelPacket`.
   * @param {Function} [cb] - `cb(err)`; `err` is `null` on success.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  incomingDelPacket (client, packet, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.incomingDelPacket.bind(this, client, packet, cb))
        return
      }
      this.asyncPersistence.incomingDelPacket(client, packet)
        .then(() => process.nextTick(cb, null))
        .catch(cb)
    } else {
      return this.asyncPersistence.incomingDelPacket(client, packet)
    }
  }

  /**
   * @param {*} client - forwarded to `asyncPersistence.putWill`.
   * @param {*} packet - forwarded to `asyncPersistence.putWill`.
   * @param {Function} [cb] - `cb(err, client)`.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  putWill (client, packet, cb) {
    if (cb) {
      if (!this.ready) {
        this.once('ready', this.putWill.bind(this, client, packet, cb))
        return
      }
      this.asyncPersistence.putWill(client, packet)
        .then(() => process.nextTick(cb, null, client))
        .catch(cb)
    } else {
      return this.asyncPersistence.putWill(client, packet)
    }
  }

  /**
   * NOTE(review): unlike most callback methods here this one does not wait
   * for 'ready' - confirm that is intentional.
   *
   * @param {*} client - forwarded to `asyncPersistence.getWill`.
   * @param {Function} [cb] - `cb(err, packet, client)` where `packet` is the
   *   value the async call resolved with.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  getWill (client, cb) {
    if (cb) {
      this.asyncPersistence.getWill(client)
        .then(packet => process.nextTick(cb, null, packet, client))
        .catch(cb)
    } else {
      return this.asyncPersistence.getWill(client)
    }
  }

  /**
   * NOTE(review): unlike most callback methods here this one does not wait
   * for 'ready' - confirm that is intentional.
   *
   * @param {*} client - forwarded to `asyncPersistence.delWill`.
   * @param {Function} [cb] - `cb(err, packet, client)` where `packet` is the
   *   value the async call resolved with.
   * @returns {Promise|undefined} the underlying promise when `cb` is omitted.
   */
  delWill (client, cb) {
    if (cb) {
      this.asyncPersistence.delWill(client)
        .then(packet => process.nextTick(cb, null, packet, client))
        .catch(cb)
    } else {
      return this.asyncPersistence.delWill(client)
    }
  }

  /**
   * @param {*} brokers - forwarded to `asyncPersistence.streamWill`.
   * @returns {Readable} `Readable.from()` over the async persistence's result.
   */
  streamWill (brokers) {
    return Readable.from(this.asyncPersistence.streamWill(brokers))
  }

  /**
   * @param {*} topic - forwarded to `asyncPersistence.getClientList`.
   * @returns {Readable} `Readable.from()` over the async persistence's result.
   */
  getClientList (topic) {
    return Readable.from(this.asyncPersistence.getClientList(topic))
  }
}
12✔
305

12✔
306
// Expose the class as a named property of the module's exports object.
module.exports = { CallBackPersistence }
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc