• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moscajs / aedes-cached-persistence / 14779209625

01 May 2025 04:36PM UTC coverage: 88.889% (+1.0%) from 87.925%
14779209625

Pull #59

github

web-flow
Merge 57b3ba7cd into 47bcbdc98
Pull Request #59: feature: addition of call back persistence

104 of 123 branches covered (84.55%)

Branch coverage included in aggregate %.

229 of 253 new or added lines in 2 files covered. (90.51%)

16 existing lines in 1 file now uncovered.

424 of 471 relevant lines covered (90.02%)

190.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.51
/callBackPersistence.js
1
'use strict'
3✔
2

3✔
3
/* This module provides a callback layer for async persistence implementations */
3✔
4
const { Readable } = require('node:stream')
3✔
5
const CachedPersistence = require('./index.js')
3✔
6

3✔
7
/**
 * Unwraps a property from a possibly wrapped result.
 *
 * When `obj` is a non-null object on which `prop` exists (own or inherited,
 * as tested by the `in` operator), the value of that property is returned.
 * In every other case (primitives, `null`, `undefined`, functions, or
 * objects lacking `prop`) `obj` itself is handed back untouched.
 *
 * @param {*} obj - value that may or may not wrap the wanted property
 * @param {string|symbol} prop - property name to look for on `obj`
 * @returns {*} `obj[prop]` when available, otherwise `obj`
 */
function toValue (obj, prop) {
  // typeof null is 'object', hence the explicit null test first
  const isContainer = obj !== null && typeof obj === 'object'
  return isContainer && prop in obj ? obj[prop] : obj
}
18✔
13

3✔
14
/**
 * Callback-style facade over a promise-based ("async") persistence.
 *
 * Every method forwards to the same-named method on `this.asyncPersistence`
 * and translates the returned promise into a Node-style callback, or wraps
 * the returned iterable in a `Readable` stream.
 *
 * Success callbacks are always dispatched with `process.nextTick`, so an
 * exception thrown by a callback can never land in the promise chain's
 * `.catch` and trigger a second invocation of the same callback.
 *
 * NOTE(review): `this.ready`, `this.broker`, `this._trie`,
 * `_addedSubscriptions` and `_removedSubscriptions` are not defined in this
 * class; presumably they come from CachedPersistence - confirm.
 */
class CallBackPersistence extends CachedPersistence {
  /**
   * @param {Function} asyncInstanceFactory - invoked with `opts`; its return
   *   value is stored as `this.asyncPersistence`
   * @param {object} [opts={}] - passed to both the parent constructor and the factory
   */
  constructor (asyncInstanceFactory, opts = {}) {
    super(opts)
    this.asyncPersistence = asyncInstanceFactory(opts)
  }

  /**
   * Hands the broker and trie to the async persistence and runs its `setup()`.
   * Emits 'ready' once setup resolves and 'error' when it rejects.
   * Does nothing when the instance is already ready.
   */
  _setup () {
    if (this.ready) {
      return
    }
    this.asyncPersistence.broker = this.broker
    this.asyncPersistence._trie = this._trie
    this.asyncPersistence.setup()
      .then(() => {
        this.emit('ready')
      })
      .catch(err => {
        this.emit('error', err)
      })
  }

  /**
   * Stores a retained packet.
   * @param {object} packet
   * @param {Function} cb - called as `cb(null)` on success, `cb(err)` on failure
   */
  storeRetained (packet, cb) {
    if (!this.ready) {
      // replay this exact call once setup has finished
      this.once('ready', this.storeRetained.bind(this, packet, cb))
      return
    }
    this.asyncPersistence.storeRetained(packet)
      .then(() => process.nextTick(cb, null))
      .catch(cb)
  }

  /**
   * @param {*} pattern - forwarded unchanged to the async persistence
   * @returns {Readable} stream built with `Readable.from` over the async result
   */
  createRetainedStream (pattern) {
    return Readable.from(this.asyncPersistence.createRetainedStream(pattern))
  }

  /**
   * @param {*} patterns - forwarded unchanged to the async persistence
   * @returns {Readable} stream built with `Readable.from` over the async result
   */
  createRetainedStreamCombi (patterns) {
    return Readable.from(this.asyncPersistence.createRetainedStreamCombi(patterns))
  }

  /**
   * Adds subscriptions both in the async persistence and through
   * `_addedSubscriptions`; the callback fires after both have completed.
   * @param {object} client
   * @param {Array} subs
   * @param {Function} cb - called as `cb(null, client)` or `cb(err, client)`
   */
  addSubscriptions (client, subs, cb) {
    if (!this.ready) {
      this.once('ready', this.addSubscriptions.bind(this, client, subs, cb))
      return
    }

    const stored = this.asyncPersistence.addSubscriptions(client, subs)
    // promisify the callback-based notification
    const notified = new Promise((resolve, reject) => {
      this._addedSubscriptions(client, subs, (err) => {
        if (err) {
          reject(err)
        } else {
          resolve()
        }
      })
    })
    Promise.all([stored, notified])
      .then(() => process.nextTick(cb, null, client))
      .catch(err => cb(err, client))
  }

  /**
   * Removes subscriptions both in the async persistence and through
   * `_removedSubscriptions`; the callback fires after both have completed.
   * @param {object} client
   * @param {Array} subs - each entry is mapped to `{ topic: entry }` for `_removedSubscriptions`
   * @param {Function} cb - called as `cb(null, client)` or `cb(err, client)`
   */
  removeSubscriptions (client, subs, cb) {
    if (!this.ready) {
      this.once('ready', this.removeSubscriptions.bind(this, client, subs, cb))
      return
    }

    const removed = this.asyncPersistence.removeSubscriptions(client, subs)
    const mappedSubs = subs.map(sub => ({ topic: sub }))
    // promisify the callback-based notification
    const notified = new Promise((resolve, reject) => {
      this._removedSubscriptions(client, mappedSubs, (err) => {
        if (err) {
          reject(err)
        } else {
          resolve()
        }
      })
    })
    Promise.all([removed, notified])
      .then(() => process.nextTick(cb, null, client))
      .catch(err => cb(err, client))
  }

  /**
   * Looks up the subscriptions of a client.
   * @param {object} client
   * @param {Function} cb - called as `cb(null, resubs, client)` where `resubs`
   *   is `null` when there are no subscriptions, or `cb(err)` on failure
   */
  subscriptionsByClient (client, cb) {
    if (!this.ready) {
      this.once('ready', this.subscriptionsByClient.bind(this, client, cb))
      return
    }

    this.asyncPersistence.subscriptionsByClient(client)
      .then(results => {
        // promisified shim returns an object, true async only the resubs
        const resubs = results?.resubs || results
        // a nullish result is reported as "no subscriptions" instead of a TypeError
        process.nextTick(cb, null, resubs?.length > 0 ? resubs : null, client)
      })
      .catch(cb)
  }

  /**
   * @param {Function} cb - called as `cb(null, subsCount, clientsCount)`,
   *   both read from the resolved result, or `cb(err)` on failure
   */
  countOffline (cb) {
    this.asyncPersistence.countOffline()
      .then(res => process.nextTick(cb, null, res.subsCount, res.clientsCount))
      .catch(cb)
  }

  /**
   * Destroys the async persistence.
   * @param {Function} [cb=noop] - called without arguments once the async
   *   destroy has settled, whether it resolved or rejected
   * @throws {Error} when called again after a destroy has already started
   */
  destroy (cb = noop) {
    if (!this.ready) {
      this.once('ready', this.destroy.bind(this, cb))
      return
    }

    if (this._destroyed) {
      throw new Error('destroyed called twice!')
    }

    this._destroyed = true

    // Deliberately best-effort: a failure of the async destroy is swallowed.
    // `.then(done, done)` is used instead of `.finally(cb)` because `finally`
    // re-propagates the rejection, which would surface as an unhandled rejection.
    const done = () => process.nextTick(cb)
    this.asyncPersistence.destroy().then(done, done)
  }

  /**
   * @param {object} sub
   * @param {object} packet
   * @param {Function} cb - called as `cb(null, packet)` or `cb(err)`
   */
  outgoingEnqueue (sub, packet, cb) {
    if (!this.ready) {
      this.once('ready', this.outgoingEnqueue.bind(this, sub, packet, cb))
      return
    }
    this.asyncPersistence.outgoingEnqueue(sub, packet)
      .then(() => process.nextTick(cb, null, packet))
      .catch(cb)
  }

  /**
   * @param {Array} subs
   * @param {object} packet
   * @param {Function} cb - called as `cb(null, packet)` or `cb(err)`
   */
  outgoingEnqueueCombi (subs, packet, cb) {
    if (!this.ready) {
      this.once('ready', this.outgoingEnqueueCombi.bind(this, subs, packet, cb))
      return
    }
    this.asyncPersistence.outgoingEnqueueCombi(subs, packet)
      .then(() => process.nextTick(cb, null, packet))
      .catch(cb)
  }

  /**
   * @param {object} client
   * @returns {Readable} stream built with `Readable.from` over the async result
   */
  outgoingStream (client) {
    return Readable.from(this.asyncPersistence.outgoingStream(client))
  }

  /**
   * @param {object} client
   * @param {object} packet
   * @param {Function} cb - called as `cb(null, client, packet)` or `cb(err)`
   */
  outgoingUpdate (client, packet, cb) {
    if (!this.ready) {
      this.once('ready', this.outgoingUpdate.bind(this, client, packet, cb))
      return
    }
    this.asyncPersistence.outgoingUpdate(client, packet)
      .then(() => process.nextTick(cb, null, client, packet))
      .catch(cb)
  }

  /**
   * @param {object} client
   * @param {object} packet
   * @param {Function} cb - called as `cb(null, result)` with the value the
   *   async call resolved to, or `cb(err)`
   */
  outgoingClearMessageId (client, packet, cb) {
    if (!this.ready) {
      this.once('ready', this.outgoingClearMessageId.bind(this, client, packet, cb))
      return
    }
    this.asyncPersistence.outgoingClearMessageId(client, packet)
      .then(cleared => process.nextTick(cb, null, cleared))
      .catch(cb)
  }

  /**
   * @param {object} client
   * @param {object} packet
   * @param {Function} cb - called as `cb(null)` or `cb(err)`
   */
  incomingStorePacket (client, packet, cb) {
    if (!this.ready) {
      this.once('ready', this.incomingStorePacket.bind(this, client, packet, cb))
      return
    }
    this.asyncPersistence.incomingStorePacket(client, packet)
      .then(() => process.nextTick(cb, null))
      .catch(cb)
  }

  /**
   * @param {object} client
   * @param {object} packet
   * @param {Function} cb - called as `cb(null, result, client)` with the value
   *   the async call resolved to, or `cb(err)`
   */
  incomingGetPacket (client, packet, cb) {
    if (!this.ready) {
      this.once('ready', this.incomingGetPacket.bind(this, client, packet, cb))
      return
    }
    this.asyncPersistence.incomingGetPacket(client, packet)
      .then(stored => process.nextTick(cb, null, stored, client))
      .catch(cb)
  }

  /**
   * @param {object} client
   * @param {object} packet
   * @param {Function} cb - called as `cb(null)` or `cb(err)`
   */
  incomingDelPacket (client, packet, cb) {
    if (!this.ready) {
      this.once('ready', this.incomingDelPacket.bind(this, client, packet, cb))
      return
    }
    this.asyncPersistence.incomingDelPacket(client, packet)
      .then(() => process.nextTick(cb, null))
      .catch(cb)
  }

  /**
   * @param {object} client
   * @param {object} packet
   * @param {Function} cb - called as `cb(null, client)` or `cb(err)`
   */
  putWill (client, packet, cb) {
    if (!this.ready) {
      this.once('ready', this.putWill.bind(this, client, packet, cb))
      return
    }
    this.asyncPersistence.putWill(client, packet)
      .then(() => process.nextTick(cb, null, client))
      .catch(cb)
  }

  /**
   * NOTE(review): unlike putWill, no ready guard here - confirm this is intended.
   * @param {object} client
   * @param {Function} cb - called as `cb(null, packet, client)` or `cb(err)`
   */
  getWill (client, cb) {
    this.asyncPersistence.getWill(client)
      .then(result => {
        // the result is either an object carrying a `packet` property or the packet itself
        const packet = toValue(result, 'packet')
        process.nextTick(cb, null, packet, client)
      })
      .catch(cb)
  }

  /**
   * NOTE(review): unlike putWill, no ready guard here - confirm this is intended.
   * @param {object} client
   * @param {Function} cb - called as `cb(null, packet, client)` or `cb(err)`
   */
  delWill (client, cb) {
    this.asyncPersistence.delWill(client)
      .then(result => {
        // the result is either an object carrying a `packet` property or the packet itself
        const packet = toValue(result, 'packet')
        process.nextTick(cb, null, packet, client)
      })
      .catch(cb)
  }

  /**
   * @param {*} brokers - forwarded unchanged to the async persistence
   * @returns {Readable} stream built with `Readable.from` over the async result
   */
  streamWill (brokers) {
    return Readable.from(this.asyncPersistence.streamWill(brokers))
  }

  /**
   * @param {*} topic - forwarded unchanged to the async persistence
   * @returns {Readable} stream built with `Readable.from` over the async result
   */
  getClientList (topic) {
    return Readable.from(this.asyncPersistence.getClientList(topic))
  }
}
3✔
247

3✔
NEW
248
/**
 * Does nothing and returns `undefined`; serves as the default callback
 * where the caller did not supply one.
 */
function noop () {
  // intentionally empty
}
×
249

3✔
250
module.exports = { CallBackPersistence }
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc