• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moscajs / aedes-persistence / 15283162130

27 May 2025 06:42PM UTC coverage: 96.798% (-2.7%) from 99.545%
15283162130

Pull #105

github

web-flow
Merge ae94d5e0a into 2d6950570
Pull Request #105: feat: asyncify

372 of 401 branches covered (92.77%)

Branch coverage included in aggregate %.

875 of 937 new or added lines in 6 files covered. (93.38%)

1 existing line in 1 file now uncovered.

2439 of 2503 relevant lines covered (97.44%)

324.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.06
/asyncPersistence.js
1
const { Readable } = require('node:stream')
15✔
2
const QlobberSub = require('qlobber/aedes/qlobber-sub')
15✔
3
const { QlobberTrue } = require('qlobber')
15✔
4
const Packet = require('aedes-packet')
15✔
5
const BroadcastPersistence = require('./broadcastPersistence.js')
15✔
6

15✔
7
const QLOBBER_OPTIONS = {
15✔
8
  wildcard_one: '+',
15✔
9
  wildcard_some: '#',
15✔
10
  separator: '/'
15✔
11
}
15✔
12
const CREATE_ON_EMPTY = true
15✔
13

15✔
14
function * multiIterables (iterables) {
30✔
15
  for (const iter of iterables) {
30✔
16
    yield * iter
75✔
17
  }
75✔
18
}
30✔
19

15✔
20
function * retainedMessagesByPattern (retained, pattern) {
225✔
21
  const qlobber = new QlobberTrue(QLOBBER_OPTIONS)
225✔
22
  qlobber.add(pattern)
225✔
23

225✔
24
  for (const [topic, packet] of retained) {
225✔
25
    if (qlobber.test(topic)) {
210✔
26
      yield packet
165✔
27
    }
165✔
28
  }
210✔
29
}
225✔
30

15✔
31
function * willsByBrokers (wills, brokers) {
30✔
32
  for (const will of wills.values()) {
30✔
33
    if (!brokers[will.brokerId]) {
45✔
34
      yield will
30✔
35
    }
30✔
36
  }
45✔
37
}
30✔
38

15✔
39
function * clientListbyTopic (subscriptions, topic) {
30✔
40
  for (const [clientId, topicMap] of subscriptions) {
30✔
41
    if (topicMap.has(topic)) {
45✔
42
      yield clientId
45✔
43
    }
45✔
44
  }
45✔
45
}
30✔
46

15✔
47
class MemoryPersistence {
240✔
48
  // private class members start with #
465✔
49
  #retained
465✔
50
  #subscriptions
690✔
51
  #outgoing
690✔
52
  #incoming
690✔
53
  #wills
690✔
54
  #clientsCount
690✔
55
  #destroyed
690✔
56
  #broadcastSubscriptions
690✔
57
  _trie
690✔
58
  broker
690✔
59

465✔
60
  constructor (opts = {}) {
465✔
61
    // using Maps for convenience and security (risk on prototype polution)
690✔
62
    // Map ( topic -> packet )
690✔
63
    this.#retained = new Map()
690✔
64
    // Map ( clientId -> Map( topic -> { qos, rh, rap, nl } ))
690✔
65
    this.#subscriptions = new Map()
690✔
66
    // Map ( clientId  > [ packet ] }
690✔
67
    this.#outgoing = new Map()
690✔
68
    // Map ( clientId -> { packetId -> Packet } )
690✔
69
    this.#incoming = new Map()
690✔
70
    // Map( clientId -> will )
690✔
71
    this.#wills = new Map()
690✔
72
    this.#clientsCount = 0
690✔
73
    this.#destroyed = false
690✔
74
    this.#broadcastSubscriptions = opts.broadcastSubscriptions
690✔
75
    this._trie = new QlobberSub(QLOBBER_OPTIONS)
690✔
76
  }
690✔
77

465✔
78
  async setup (broker) {
465✔
79
    this.broker = broker
690✔
80
    if (this.#broadcastSubscriptions) {
690✔
81
      this.broadcast = new BroadcastPersistence(broker, this._trie)
276✔
82
      await this.broadcast.brokerSubscribe()
276✔
83
    }
276✔
84
  }
690✔
85

465✔
86
  async storeRetained (pkt) {
465✔
87
    const packet = Object.assign({}, pkt)
15,210✔
88
    if (packet.payload.length === 0) {
15,210✔
89
      this.#retained.delete(packet.topic)
15✔
90
    } else {
15,210✔
91
      this.#retained.set(packet.topic, packet)
15,195✔
92
    }
15,195✔
93
  }
15,210✔
94

465✔
95
  createRetainedStreamCombi (patterns) {
465✔
96
    const iterables = patterns.map((p) => {
30✔
97
      return retainedMessagesByPattern(this.#retained, p)
75✔
98
    })
30✔
99
    return Readable.from(multiIterables(iterables))
30✔
100
  }
30✔
101

465✔
102
  createRetainedStream (pattern) {
465✔
103
    return Readable.from(retainedMessagesByPattern(this.#retained, pattern))
150✔
104
  }
150✔
105

465✔
106
  async addSubscriptions (client, subs) {
465✔
107
    let stored = this.#subscriptions.get(client.id)
405✔
108
    const trie = this._trie
405✔
109

405✔
110
    if (!stored) {
405✔
111
      stored = new Map()
300✔
112
      this.#subscriptions.set(client.id, stored)
300✔
113
      this.#clientsCount++
300✔
114
    }
300✔
115

405✔
116
    for (const sub of subs) {
405✔
117
      const storedSub = stored.get(sub.topic)
690✔
118
      if (sub.qos > 0) {
690✔
119
        trie.add(sub.topic, {
510✔
120
          clientId: client.id,
510✔
121
          topic: sub.topic,
510✔
122
          qos: sub.qos,
510✔
123
          rh: sub.rh,
510✔
124
          rap: sub.rap,
510✔
125
          nl: sub.nl
510✔
126
        })
510✔
127
      } else if (storedSub?.qos > 0) {
690✔
128
        trie.remove(sub.topic, {
30✔
129
          clientId: client.id,
30✔
130
          topic: sub.topic
30✔
131
        })
30✔
132
      }
30✔
133
      stored.set(sub.topic, { qos: sub.qos, rh: sub.rh, rap: sub.rap, nl: sub.nl })
690✔
134
    }
690✔
135
    if (this.#broadcastSubscriptions) {
405✔
136
      await this.broadcast.addedSubscriptions(client, subs)
162✔
137
    }
162✔
138
  }
405✔
139

465✔
140
  async removeSubscriptions (client, subs) {
465✔
141
    const stored = this.#subscriptions.get(client.id)
225✔
142
    const trie = this._trie
225✔
143

225✔
144
    if (stored) {
225✔
145
      for (const topic of subs) {
210✔
146
        const storedSub = stored.get(topic)
210✔
147
        if (storedSub !== undefined) {
210✔
148
          if (storedSub.qos > 0) {
180✔
149
            trie.remove(topic, { clientId: client.id, topic })
135✔
150
          }
135✔
151
          stored.delete(topic)
180✔
152
        }
180✔
153
      }
210✔
154

210✔
155
      if (stored.size === 0) {
210✔
156
        this.#clientsCount--
75✔
157
        this.#subscriptions.delete(client.id)
75✔
158
      }
75✔
159
    }
210✔
160
    if (this.#broadcastSubscriptions) {
225✔
161
      await this.broadcast.removedSubscriptions(client, subs)
90✔
162
    }
90✔
163
  }
225✔
164

465✔
165
  async subscriptionsByClient (client) {
465✔
166
    const subs = []
210✔
167
    const stored = this.#subscriptions.get(client.id)
210✔
168
    if (stored) {
210✔
169
      for (const [topic, storedSub] of stored) {
180✔
170
        subs.push({ topic, ...storedSub })
255✔
171
      }
255✔
172
    }
180✔
173
    return subs
210✔
174
  }
210✔
175

465✔
176
  async countOffline () {
465✔
177
    return { subsCount: this._trie.subscriptionsCount, clientsCount: this.#clientsCount }
330✔
178
  }
330✔
179

465✔
180
  async subscriptionsByTopic (pattern) {
465✔
181
    return this._trie.match(pattern)
195✔
182
  }
195✔
183

465✔
184
  async cleanSubscriptions (client) {
465✔
185
    const trie = this._trie
30✔
186
    const stored = this.#subscriptions.get(client.id)
30✔
187

30✔
188
    if (stored) {
30✔
189
      for (const [topic, storedSub] of stored) {
15✔
190
        if (storedSub.qos > 0) {
30✔
191
          trie.remove(topic, { clientId: client.id, topic })
30✔
192
        }
30✔
193
      }
30✔
194

15✔
195
      this.#clientsCount--
15✔
196
      this.#subscriptions.delete(client.id)
15✔
197
    }
15✔
198
  }
30✔
199

465✔
200
  #outgoingEnqueuePerSub (sub, packet) {
465✔
201
    const id = sub.clientId
225✔
202
    const queue = getMapRef(this.#outgoing, id, [], CREATE_ON_EMPTY)
225✔
203
    queue[queue.length] = new Packet(packet)
225✔
204
  }
225✔
205

465✔
206
  async outgoingEnqueue (sub, packet) {
465✔
207
    this.#outgoingEnqueuePerSub(sub, packet)
75✔
208
  }
75✔
209

465✔
210
  async outgoingEnqueueCombi (subs, packet) {
465✔
211
    for (let i = 0; i < subs.length; i++) {
135✔
212
      this.#outgoingEnqueuePerSub(subs[i], packet)
150✔
213
    }
150✔
214
  }
135✔
215

465✔
216
  async outgoingUpdate (client, packet) {
465✔
217
    const outgoing = getMapRef(this.#outgoing, client.id, [], CREATE_ON_EMPTY)
195✔
218

195✔
219
    let temp
195✔
220
    for (let i = 0; i < outgoing.length; i++) {
195✔
221
      temp = outgoing[i]
270✔
222
      if (temp.brokerId === packet.brokerId) {
270✔
223
        if (temp.brokerCounter === packet.brokerCounter) {
255✔
224
          temp.messageId = packet.messageId
180✔
225
          return
180✔
226
        }
180✔
227
        /*
255✔
228
                Maximum of messageId (packet identifier) is 65535 and will be rotated,
255✔
229
                brokerCounter is to ensure the packet identifier be unique.
255✔
230
                The for loop is going to search which packet messageId should be updated
255✔
231
                in the #outgoing queue.
255✔
232
                If there is a case that brokerCounter is different but messageId is same,
255✔
233
                we need to let the loop keep searching
255✔
234
                */
255✔
235
      } else if (temp.messageId === packet.messageId) {
270✔
236
        outgoing[i] = packet
15✔
237
        return
15✔
238
      }
15✔
239
    }
270✔
NEW
240
    throw new Error('no such packet here')
×
241
  }
195✔
242

465✔
243
  async outgoingClearMessageId (client, packet, cb) {
465✔
244
    const outgoing = getMapRef(this.#outgoing, client.id, [], CREATE_ON_EMPTY)
60✔
245

60✔
246
    let temp
60✔
247
    for (let i = 0; i < outgoing.length; i++) {
60✔
248
      temp = outgoing[i]
45✔
249
      if (temp.messageId === packet.messageId) {
45✔
250
        outgoing.splice(i, 1)
45✔
251
        return temp
45✔
252
      }
45✔
253
    }
45✔
254
  }
60✔
255

465✔
256
  outgoingStream (client) {
465✔
257
    // shallow clone the outgoing queue for this client to avoid race conditions
225✔
258
    const outgoing = [].concat(getMapRef(this.#outgoing, client.id, []))
225✔
259
    return Readable.from(outgoing)
225✔
260
  }
225✔
261

465✔
262
  async incomingStorePacket (client, packet) {
465✔
263
    const id = client.id
15✔
264
    const store = getMapRef(this.#incoming, id, {}, CREATE_ON_EMPTY)
15✔
265

15✔
266
    store[packet.messageId] = new Packet(packet)
15✔
267
    store[packet.messageId].messageId = packet.messageId
15✔
268
  }
15✔
269

465✔
270
  async incomingGetPacket (client, packet) {
465✔
271
    const id = client.id
30✔
272
    const store = getMapRef(this.#incoming, id, {})
30✔
273

30✔
274
    this.#incoming.set(id, store)
30✔
275

30✔
276
    if (!store[packet.messageId]) {
30✔
277
      throw new Error('no such packet')
15✔
278
    }
15✔
279
    return store[packet.messageId]
15✔
280
  }
30✔
281

465✔
282
  async incomingDelPacket (client, packet) {
465✔
283
    const id = client.id
15✔
284
    const store = getMapRef(this.#incoming, id, {})
15✔
285
    const toDelete = store[packet.messageId]
15✔
286

15✔
287
    if (!toDelete) {
15!
NEW
288
      throw new Error('no such packet')
×
NEW
289
    }
×
290
    delete store[packet.messageId]
15✔
291
  }
15✔
292

465✔
293
  async putWill (client, packet) {
465✔
294
    packet.brokerId = this.broker.id
75✔
295
    packet.clientId = client.id
75✔
296
    this.#wills.set(client.id, packet)
75✔
297
  }
75✔
298

465✔
299
  async getWill (client) {
465✔
300
    return this.#wills.get(client.id)
30✔
301
  }
30✔
302

465✔
303
  async delWill (client) {
465✔
304
    const will = this.#wills.get(client.id)
60✔
305
    this.#wills.delete(client.id)
60✔
306
    return will
60✔
307
  }
60✔
308

465✔
309
  streamWill (brokers = {}) {
465✔
310
    return Readable.from(willsByBrokers(this.#wills, brokers))
30✔
311
  }
30✔
312

465✔
313
  getClientList (topic) {
465✔
314
    return Readable.from(clientListbyTopic(this.#subscriptions, topic))
30✔
315
  }
30✔
316

465✔
317
  async destroy () {
465✔
318
    if (this.#destroyed) {
690!
NEW
319
      throw new Error('destroyed called twice!')
×
NEW
320
    }
×
321
    this.#destroyed = true
690✔
322
    if (this.#broadcastSubscriptions) {
690✔
323
      await this.broadcast.brokerUnsubscribe()
276✔
324
    }
276✔
325
    this.#retained = null
690✔
326
  }
690✔
327
}
465✔
328

15✔
329
function getMapRef (map, key, ifEmpty, createOnEmpty = false) {
765✔
330
  const value = map.get(key)
765✔
331
  if (value === undefined && createOnEmpty) {
765✔
332
    map.set(key, ifEmpty)
195✔
333
  }
195✔
334
  return value || ifEmpty
765✔
335
}
765✔
336

15✔
337
module.exports = MemoryPersistence
15✔
338
module.exports.Packet = Packet
15✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc