• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moscajs / aedes-persistence / 14692580379

27 Apr 2025 01:28PM UTC coverage: 97.491% (+2.9%) from 94.578%
14692580379

push

github

web-flow
Merge pull request #100 from seriousme/c8-for-nyc

chore: update dependencies

204 of 228 branches covered (89.47%)

Branch coverage included in aggregate %.

1894 of 1924 relevant lines covered (98.44%)

134.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.77
/persistence.js
1
const { Readable } = require('stream')
3✔
2
const QlobberSub = require('qlobber/aedes/qlobber-sub')
3✔
3
const { QlobberTrue } = require('qlobber')
3✔
4
const Packet = require('aedes-packet')
3✔
5
const QlobberOpts = {
3✔
6
  wildcard_one: '+',
3✔
7
  wildcard_some: '#',
3✔
8
  separator: '/'
3✔
9
}
3✔
10
const CREATE_ON_EMPTY = true
3✔
11

3✔
12
function * multiIterables (iterables) {
6✔
13
  for (const iter of iterables) {
6✔
14
    yield * iter
12✔
15
  }
12✔
16
}
6✔
17

3✔
18
function * retainedMessagesByPattern (retained, pattern) {
54✔
19
  const qlobber = new QlobberTrue(QlobberOpts)
54✔
20
  qlobber.add(pattern)
54✔
21

54✔
22
  for (const [topic, packet] of retained) {
54✔
23
    if (qlobber.test(topic)) {
48✔
24
      yield packet
42✔
25
    }
42✔
26
  }
48✔
27
}
54✔
28

3✔
29
function * willsByBrokers (wills, brokers) {
12✔
30
  for (const will of wills.values()) {
12✔
31
    if (!brokers[will.brokerId]) {
18✔
32
      yield will
12✔
33
    }
12✔
34
  }
18✔
35
}
12✔
36

3✔
37
function * clientListbyTopic (subscriptions, topic) {
12✔
38
  for (const [clientId, topicMap] of subscriptions) {
12✔
39
    if (topicMap.has(topic)) {
18✔
40
      yield clientId
18✔
41
    }
18✔
42
  }
18✔
43
}
12✔
44

3✔
45
class MemoryPersistence {
86✔
46
  // private class members start with #
169✔
47
  #retained
169✔
48
  #subscriptions
252✔
49
  #outgoing
252✔
50
  #incoming
252✔
51
  #wills
252✔
52
  #clientsCount
252✔
53
  #trie
252✔
54

169✔
55
  constructor () {
169✔
56
    // using Maps for convenience and security (risk on prototype polution)
252✔
57
    // Map ( topic -> packet )
252✔
58
    this.#retained = new Map()
252✔
59
    // Map ( clientId -> Map( topic -> { qos, rh, rap, nl } ))
252✔
60
    this.#subscriptions = new Map()
252✔
61
    // Map ( clientId  > [ packet ] }
252✔
62
    this.#outgoing = new Map()
252✔
63
    // Map ( clientId -> { packetId -> Packet } )
252✔
64
    this.#incoming = new Map()
252✔
65
    // Map( clientId -> will )
252✔
66
    this.#wills = new Map()
252✔
67
    this.#clientsCount = 0
252✔
68
    this.#trie = new QlobberSub(QlobberOpts)
252✔
69
  }
252✔
70

169✔
71
  storeRetained (pkt, cb) {
169✔
72
    const packet = Object.assign({}, pkt)
6,060✔
73
    if (packet.payload.length === 0) {
6,060✔
74
      this.#retained.delete(packet.topic)
6✔
75
    } else {
6,060✔
76
      this.#retained.set(packet.topic, packet)
6,054✔
77
    }
6,054✔
78
    cb(null)
6,060✔
79
  }
6,060✔
80

169✔
81
  createRetainedStreamCombi (patterns) {
169✔
82
    const iterables = patterns.map((p) => {
6✔
83
      return retainedMessagesByPattern(this.#retained, p)
12✔
84
    })
6✔
85
    return Readable.from(multiIterables(iterables))
6✔
86
  }
6✔
87

169✔
88
  createRetainedStream (pattern) {
169✔
89
    return Readable.from(retainedMessagesByPattern(this.#retained, pattern))
42✔
90
  }
42✔
91

169✔
92
  addSubscriptions (client, subs, cb) {
169✔
93
    let stored = this.#subscriptions.get(client.id)
162✔
94
    const trie = this.#trie
162✔
95

162✔
96
    if (!stored) {
162✔
97
      stored = new Map()
120✔
98
      this.#subscriptions.set(client.id, stored)
120✔
99
      this.#clientsCount++
120✔
100
    }
120✔
101

162✔
102
    for (const sub of subs) {
162✔
103
      const storedSub = stored.get(sub.topic)
276✔
104
      if (sub.qos > 0) {
276✔
105
        trie.add(sub.topic, {
204✔
106
          clientId: client.id,
204✔
107
          topic: sub.topic,
204✔
108
          qos: sub.qos,
204✔
109
          rh: sub.rh,
204✔
110
          rap: sub.rap,
204✔
111
          nl: sub.nl
204✔
112
        })
204✔
113
      } else if (storedSub?.qos > 0) {
276✔
114
        trie.remove(sub.topic, {
12✔
115
          clientId: client.id,
12✔
116
          topic: sub.topic
12✔
117
        })
12✔
118
      }
12✔
119
      stored.set(sub.topic, { qos: sub.qos, rh: sub.rh, rap: sub.rap, nl: sub.nl })
276✔
120
    }
276✔
121

162✔
122
    cb(null, client)
162✔
123
  }
162✔
124

169✔
125
  removeSubscriptions (client, subs, cb) {
169✔
126
    const stored = this.#subscriptions.get(client.id)
90✔
127
    const trie = this.#trie
90✔
128

90✔
129
    if (stored) {
90✔
130
      for (const topic of subs) {
84✔
131
        const storedSub = stored.get(topic)
84✔
132
        if (storedSub !== undefined) {
84✔
133
          if (storedSub.qos > 0) {
72✔
134
            trie.remove(topic, { clientId: client.id, topic })
54✔
135
          }
54✔
136
          stored.delete(topic)
72✔
137
        }
72✔
138
      }
84✔
139

84✔
140
      if (stored.size === 0) {
84✔
141
        this.#clientsCount--
30✔
142
        this.#subscriptions.delete(client.id)
30✔
143
      }
30✔
144
    }
84✔
145

90✔
146
    cb(null, client)
90✔
147
  }
90✔
148

169✔
149
  subscriptionsByClient (client, cb) {
169✔
150
    let subs = null
84✔
151
    const stored = this.#subscriptions.get(client.id)
84✔
152
    if (stored) {
84✔
153
      subs = []
72✔
154
      for (const [topic, storedSub] of stored) {
72✔
155
        subs.push({ topic, ...storedSub })
102✔
156
      }
102✔
157
    }
72✔
158
    cb(null, subs, client)
84✔
159
  }
84✔
160

169✔
161
  countOffline (cb) {
169✔
162
    return cb(null, this.#trie.subscriptionsCount, this.#clientsCount)
132✔
163
  }
132✔
164

169✔
165
  subscriptionsByTopic (pattern, cb) {
169✔
166
    cb(null, this.#trie.match(pattern))
78✔
167
  }
78✔
168

169✔
169
  cleanSubscriptions (client, cb) {
169✔
170
    const trie = this.#trie
12✔
171
    const stored = this.#subscriptions.get(client.id)
12✔
172

12✔
173
    if (stored) {
12✔
174
      for (const [topic, storedSub] of stored) {
6✔
175
        if (storedSub.qos > 0) {
12✔
176
          trie.remove(topic, { clientId: client.id, topic })
12✔
177
        }
12✔
178
      }
12✔
179

6✔
180
      this.#clientsCount--
6✔
181
      this.#subscriptions.delete(client.id)
6✔
182
    }
6✔
183

12✔
184
    cb(null, client)
12✔
185
  }
12✔
186

169✔
187
  #outgoingEnqueuePerSub (sub, packet) {
169✔
188
    const id = sub.clientId
90✔
189
    const queue = getMapRef(this.#outgoing, id, [], CREATE_ON_EMPTY)
90✔
190
    queue[queue.length] = new Packet(packet)
90✔
191
  }
90✔
192

169✔
193
  outgoingEnqueue (sub, packet, cb) {
169✔
194
    this.#outgoingEnqueuePerSub(sub, packet)
30✔
195
    process.nextTick(cb)
30✔
196
  }
30✔
197

169✔
198
  outgoingEnqueueCombi (subs, packet, cb) {
169✔
199
    for (let i = 0; i < subs.length; i++) {
54✔
200
      this.#outgoingEnqueuePerSub(subs[i], packet)
60✔
201
    }
60✔
202
    process.nextTick(cb)
54✔
203
  }
54✔
204

169✔
205
  outgoingUpdate (client, packet, cb) {
169✔
206
    const outgoing = getMapRef(this.#outgoing, client.id, [], CREATE_ON_EMPTY)
78✔
207

78✔
208
    let temp
78✔
209
    for (let i = 0; i < outgoing.length; i++) {
78✔
210
      temp = outgoing[i]
108✔
211
      if (temp.brokerId === packet.brokerId) {
108✔
212
        if (temp.brokerCounter === packet.brokerCounter) {
102✔
213
          temp.messageId = packet.messageId
72✔
214
          return cb(null, client, packet)
72✔
215
        }
72✔
216
        /*
102✔
217
                Maximum of messageId (packet identifier) is 65535 and will be rotated,
102✔
218
                brokerCounter is to ensure the packet identifier be unique.
102✔
219
                The for loop is going to search which packet messageId should be updated
102✔
220
                in the #outgoing queue.
102✔
221
                If there is a case that brokerCounter is different but messageId is same,
102✔
222
                we need to let the loop keep searching
102✔
223
                */
102✔
224
      } else if (temp.messageId === packet.messageId) {
108✔
225
        outgoing[i] = packet
6✔
226
        return cb(null, client, packet)
6✔
227
      }
6✔
228
    }
108✔
229

×
230
    cb(new Error('no such packet'), client, packet)
×
231
  }
78✔
232

169✔
233
  outgoingClearMessageId (client, packet, cb) {
169✔
234
    const outgoing = getMapRef(this.#outgoing, client.id, [], CREATE_ON_EMPTY)
24✔
235

24✔
236
    let temp
24✔
237
    for (let i = 0; i < outgoing.length; i++) {
24✔
238
      temp = outgoing[i]
18✔
239
      if (temp.messageId === packet.messageId) {
18✔
240
        outgoing.splice(i, 1)
18✔
241
        return cb(null, temp)
18✔
242
      }
18✔
243
    }
18✔
244

6✔
245
    cb()
6✔
246
  }
24✔
247

169✔
248
  outgoingStream (client) {
169✔
249
    // shallow clone the outgoing queue for this client to avoid race conditions
90✔
250
    const outgoing = [].concat(getMapRef(this.#outgoing, client.id, []))
90✔
251
    return Readable.from(outgoing)
90✔
252
  }
90✔
253

169✔
254
  incomingStorePacket (client, packet, cb) {
169✔
255
    const id = client.id
6✔
256
    const store = getMapRef(this.#incoming, id, {}, CREATE_ON_EMPTY)
6✔
257

6✔
258
    store[packet.messageId] = new Packet(packet)
6✔
259
    store[packet.messageId].messageId = packet.messageId
6✔
260

6✔
261
    cb(null)
6✔
262
  }
6✔
263

169✔
264
  incomingGetPacket (client, packet, cb) {
169✔
265
    const id = client.id
12✔
266
    const store = getMapRef(this.#incoming, id, {})
12✔
267
    let err = null
12✔
268

12✔
269
    this.#incoming.set(id, store)
12✔
270

12✔
271
    if (!store[packet.messageId]) {
12✔
272
      err = new Error('no such packet')
6✔
273
    }
6✔
274

12✔
275
    cb(err, store[packet.messageId])
12✔
276
  }
12✔
277

169✔
278
  incomingDelPacket (client, packet, cb) {
169✔
279
    const id = client.id
6✔
280
    const store = getMapRef(this.#incoming, id, {})
6✔
281
    const toDelete = store[packet.messageId]
6✔
282
    let err = null
6✔
283

6✔
284
    if (!toDelete) {
6!
285
      err = new Error('no such packet')
×
286
    } else {
6✔
287
      delete store[packet.messageId]
6✔
288
    }
6✔
289

6✔
290
    cb(err)
6✔
291
  }
6✔
292

169✔
293
  putWill (client, packet, cb) {
169✔
294
    packet.brokerId = this.broker.id
30✔
295
    packet.clientId = client.id
30✔
296
    this.#wills.set(client.id, packet)
30✔
297
    cb(null, client)
30✔
298
  }
30✔
299

169✔
300
  getWill (client, cb) {
169✔
301
    cb(null, this.#wills.get(client.id), client)
12✔
302
  }
12✔
303

169✔
304
  delWill (client, cb) {
169✔
305
    const will = this.#wills.get(client.id)
24✔
306
    this.#wills.delete(client.id)
24✔
307
    cb(null, will, client)
24✔
308
  }
24✔
309

169✔
310
  streamWill (brokers = {}) {
169✔
311
    return Readable.from(willsByBrokers(this.#wills, brokers))
12✔
312
  }
12✔
313

169✔
314
  getClientList (topic) {
169✔
315
    return Readable.from(clientListbyTopic(this.#subscriptions, topic))
12✔
316
  }
12✔
317

169✔
318
  destroy (cb) {
169✔
319
    this.#retained = null
252✔
320
    if (cb) {
252✔
321
      cb(null)
252✔
322
    }
252✔
323
  }
252✔
324
}
169✔
325

3✔
326
function getMapRef (map, key, ifEmpty, createOnEmpty = false) {
306✔
327
  const value = map.get(key)
306✔
328
  if (value === undefined && createOnEmpty) {
306✔
329
    map.set(key, ifEmpty)
78✔
330
  }
78✔
331
  return value || ifEmpty
306✔
332
}
306✔
333

3✔
334
module.exports = () => { return new MemoryPersistence() }
3✔
335
module.exports.Packet = Packet
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc