• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moscajs / aedes-persistence-mongodb / 14638279046

24 Apr 2025 09:31AM UTC coverage: 64.762%. First build
14638279046

Pull #83

github

web-flow
Merge ee5de9f24 into 9e2eb6856
Pull Request #83: chore: upgrade to mongodb@6

65 of 79 branches covered (82.28%)

Branch coverage included in aggregate %.

402 of 618 new or added lines in 2 files covered. (65.05%)

479 of 761 relevant lines covered (62.94%)

8.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.81
/asyncPersistence.js
1
'use strict'
2✔
2

2✔
3
const regEscape = require('escape-string-regexp')
2✔
4
const CachedPersistence = require('aedes-cached-persistence')
2✔
5
const Packet = CachedPersistence.Packet
2✔
6
const { MongoClient } = require('mongodb')
2✔
7
const { Qlobber } = require('qlobber')
2✔
8
// Options handed to every Qlobber matcher built in this file
// (see createRetainedStreamCombi).
const qlobberOpts = {
  // character that splits a topic into levels
  separator: '/',
  // token matching exactly one level
  wildcard_one: '+',
  // token matching any number of levels
  wildcard_some: '#',
  match_empty_levels: true
}
2✔
14

2✔
15
/**
 * Promise-based MongoDB storage for subscriptions, retained messages, wills
 * and in-flight (incoming/outgoing) packets.
 *
 * The instance reads `this.broker` and `this._trie`, neither of which is
 * assigned in this class; they are presumably injected by the wrapping
 * persistence before `setup()` runs — TODO confirm against the caller.
 */
class AsyncMongoPersistence {
  /**
   * @param {object} [opts]
   * @param {object} [opts.db] an already connected database handle; when
   *   given, no client is created and `destroy()` leaves it open
   * @param {string} [opts.url] connection string, defaults to
   *   'mongodb://127.0.0.1/aedes'
   * @param {string} [opts.database] database name, overrides the one in the url
   * @param {object} [opts.mongoOptions] options passed to `MongoClient`
   * @param {object} [opts.ttl] `{ subscriptions, packets }`; `packets` may be
   *   a number (applied to every packet collection) or an object with
   *   `retained`, `will`, `outgoing` and `incoming` entries
   * @param {boolean} [opts.ttlAfterDisconnected] expire subscriptions from the
   *   `disconnected` timestamp instead of the `added` timestamp
   * @param {boolean} [opts.dropExistingIndexes] drop indexes named 'ttl'
   *   before (re)creating them
   */
  constructor (opts = {}) {
    opts.ttl = opts.ttl || {}

    // a bare number is shorthand for "same TTL on every packet collection"
    if (typeof opts.ttl.packets === 'number') {
      const ttl = opts.ttl.packets
      opts.ttl.packets = {
        retained: ttl,
        will: ttl,
        outgoing: ttl,
        incoming: ttl
      }
    }

    this._opts = opts
    this._db = null
    this._cl = null
    this.retainedBulkQueue = [] // used for storing retained packets with ordered bulks
    this.executing = false // used as lock while a bulk is executing
  }

  /**
   * Connects (unless a db was supplied), creates the indexes, registers the
   * disconnect-tracking handlers and loads stored subscriptions into the trie.
   * Returns immediately when a database handle is already set.
   *
   * setup is called by "set broker" in aedes-abstract-persistence
   *
   * @returns {Promise<void>}
   */
  async setup () {
    // database already connected
    if (this._db) {
      return
    }

    if (this._opts.db) {
      // database already provided in the options
      this._db = this._opts.db
    } else {
      // connect to the database
      const conn = this._opts.url || 'mongodb://127.0.0.1/aedes'
      const options = this._opts.mongoOptions

      const mongoDBclient = new MongoClient(conn, options)
      this._mongoDBclient = mongoDBclient

      // Derive the database name from the connection string that is actually
      // used (`conn`), so the default url works when `opts.url` is not set.
      let pathname
      try {
        // skip the first / of the pathname
        pathname = new URL(conn).pathname.substring(1)
      } catch {
        // Connection strings the WHATWG parser rejects leave `pathname`
        // undefined; `db()` is then called without a name and the driver is
        // expected to use the database from the connection string.
      }
      const databaseName = this._opts.database || pathname || undefined
      this._db = mongoDBclient.db(databaseName)
    }

    const db = this._db
    const subscriptions = db.collection('subscriptions')
    const retained = db.collection('retained')
    const will = db.collection('will')
    const outgoing = db.collection('outgoing')
    const incoming = db.collection('incoming')
    this._cl = {
      subscriptions,
      retained,
      will,
      outgoing,
      incoming
    }

    // drop existing TTL indexes (if exist)
    if (this._opts.dropExistingIndexes) {
      const collections = await db.collections()
      for (const collection of collections) {
        const exists = await collection.indexExists('ttl')
        if (exists) {
          await collection.dropIndex('ttl')
        }
      }
    }

    // create indexes
    const createIndex = async (idx) => {
      const indexOpts = { name: idx.name }
      if (typeof idx.expireAfterSeconds === 'number') {
        indexOpts.expireAfterSeconds = idx.expireAfterSeconds
      }
      await this._cl[idx.collection].createIndex(idx.key, indexOpts)
    }

    // lookup indexes used by the incoming/outgoing queries
    const indexes = [
      {
        collection: 'outgoing',
        key: { clientId: 1, 'packet.brokerId': 1, 'packet.brokerCounter': 1 },
        name: 'query_clientId_brokerId'
      },
      {
        collection: 'outgoing',
        key: { clientId: 1, 'packet.messageId': 1 },
        name: 'query_clientId_messageId'
      },
      {
        collection: 'incoming',
        key: { clientId: 1, 'packet.brokerId': 1, 'packet.brokerCounter': 1 },
        name: 'query_clientId_brokerId'
      },
      {
        collection: 'incoming',
        key: { clientId: 1, 'packet.messageId': 1 },
        name: 'query_clientId_messageId'
      }
    ]

    if (this._opts.ttl.subscriptions >= 0) {
      indexes.push({
        collection: 'subscriptions',
        key: this._opts.ttlAfterDisconnected ? 'disconnected' : 'added',
        name: 'ttl',
        expireAfterSeconds: this._opts.ttl.subscriptions
      })
    }

    const packetTTL = this._opts.ttl.packets
    if (packetTTL) {
      // retained documents carry `added` at the top level ...
      if (packetTTL.retained >= 0) {
        indexes.push({
          collection: 'retained',
          key: 'added',
          name: 'ttl',
          expireAfterSeconds: packetTTL.retained
        })
      }

      // ... the other collections nest the packet under `packet`
      for (const collection of ['will', 'outgoing', 'incoming']) {
        if (packetTTL[collection] >= 0) {
          indexes.push({
            collection,
            key: 'packet.added',
            name: 'ttl',
            expireAfterSeconds: packetTTL[collection]
          })
        }
      }
    }
    // create all indexes in parallel
    await Promise.all(indexes.map(createIndex))

    if (this._opts.ttlAfterDisconnected) {
      // To avoid stale subscriptions that might be left behind by broker shutting
      // down while clients were connected, set all to disconnected on startup.
      await this._cl.subscriptions.updateMany({ disconnected: { $exists: false } }, { $currentDate: { disconnected: true } })

      // Handlers for setting and clearing the disconnected timestamp on subscriptions.
      // NOTE(review): both updates are fire-and-forget (not awaited, no catch),
      // so a failing update surfaces as an unhandled rejection — confirm that
      // this is intended.
      this.broker.on('clientReady', (client) => {
        this._cl.subscriptions.updateMany({ clientId: client.id }, { $unset: { disconnected: true } })
      })
      this.broker.on('clientDisconnect', (client) => {
        this._cl.subscriptions.updateMany({ clientId: client.id }, { $currentDate: { disconnected: true } })
      })
    }

    // add subscriptions to Trie
    for await (const subscription of subscriptions.find({
      qos: { $gte: 0 }
    })) {
      this._trie.add(subscription.topic, subscription)
    }
    // setup is done
  }

  /**
   * Queues an upsert (non-empty payload) or a delete (empty payload) of the
   * retained message for `packet.topic` and triggers the bulk processor.
   *
   * @param {object} packet packet with `topic` and `payload`
   * @returns {Promise<void>} settles once the bulk containing the operation
   *   has been processed
   */
  async storeRetained (packet) {
    await this.setup()
    const { promise, resolve } = promiseWithResolvers()
    const filter = { topic: packet.topic }
    const setTTL = this._opts.ttl.packets

    const operation = packet.payload.length > 0
      ? {
          updateOne: {
            filter,
            update: { $set: decoratePacket(packet, setTTL) },
            upsert: true
          }
        }
      : { deleteOne: { filter } }

    this.retainedBulkQueue.push({ operation, resolve })
    processRetainedBulk(this)
    return promise
  }

  /**
   * @param {string} pattern topic pattern
   * @returns {AsyncGenerator<object>} retained packets matching `pattern`
   */
  createRetainedStream (pattern) {
    return this.createRetainedStreamCombi([pattern])
  }

  /**
   * Yields the retained packets whose topic matches at least one pattern.
   *
   * @param {string[]} patterns topic patterns
   * @returns {AsyncGenerator<object>}
   */
  async * createRetainedStreamCombi (patterns) {
    const regexes = []
    const matcher = new Qlobber(qlobberOpts)

    for (const pattern of patterns) {
      matcher.add(pattern, true)
      // the database query only uses the part in front of the first wildcard;
      // the exact match is done by the Qlobber matcher below
      regexes.push(regEscape(pattern).replace(/(\/*#|\\\+).*$/, ''))
    }

    const topic = new RegExp(regexes.join('|'))
    const filter = { topic }
    const exclude = { _id: 0 } // exclude the _id field
    for await (const result of this._cl.retained.find(filter).project(exclude)) {
      const packet = asPacket(result)
      if (matcher.match(packet.topic).length > 0) {
        yield packet
      }
    }
  }

  /**
   * Upserts one subscription document per entry in `subs` for `client`.
   *
   * @param {object} client object with an `id`
   * @param {object[]} subs subscriptions, each with a `topic`
   * @returns {Promise<void>}
   */
  async addSubscriptions (client, subs) {
    const operations = []
    for (const sub of subs) {
      const subscription = Object.assign({}, sub)
      subscription.clientId = client.id
      operations.push({
        updateOne: {
          filter: {
            clientId: client.id,
            topic: sub.topic
          },
          update: {
            $set: decorateSubscription(subscription, this._opts)
          },
          upsert: true
        }
      })
    }

    // nothing to write: an empty batch is rejected by bulkWrite
    if (operations.length === 0) {
      return
    }
    await this._cl.subscriptions.bulkWrite(operations)
  }

  /**
   * Deletes the subscriptions of `client` for the given topics.
   *
   * @param {object} client object with an `id`
   * @param {string[]} subs topics to unsubscribe
   * @returns {Promise<void>}
   */
  async removeSubscriptions (client, subs) {
    const operations = []

    for (const topic of subs) {
      operations.push({
        deleteOne: {
          filter: {
            clientId: client.id,
            topic
          }
        }
      })
    }

    // nothing to write: an empty batch is rejected by bulkWrite
    if (operations.length === 0) {
      return
    }
    await this._cl.subscriptions.bulkWrite(operations)
  }

  /**
   * @param {object} client object with an `id`
   * @returns {Promise<object[]>} stored subscriptions of the client, without
   *   the `clientId` and `_id` fields
   */
  async subscriptionsByClient (client) {
    const filter = { clientId: client.id }
    const exclude = { clientId: false, _id: false } // exclude these fields
    const subs = await this._cl.subscriptions.find(filter).project(exclude).toArray()
    return subs
  }

  /**
   * @returns {Promise<{subscriptionsCount: number, clientsCount: number}>}
   *   `subscriptionsCount` is read from the trie, `clientsCount` is the number
   *   of distinct client ids in the subscriptions collection
   */
  async countOffline () {
    const subscriptionsCount = this._trie.subscriptionsCount
    const result = await this._cl.subscriptions.aggregate([
      {
        $group: {
          _id: '$clientId'
        }
      }, {
        $count: 'clientsCount'
      }]).toArray()
    // the aggregation yields no document at all when the collection is empty
    const clientsCount = result[0]?.clientsCount || 0
    return { subscriptionsCount, clientsCount }
  }

  /**
   * Closes the client created by `setup()`. A database supplied through
   * `opts.db` is left untouched, and nothing happens when no client exists.
   *
   * @returns {Promise<void>}
   */
  async destroy () {
    if (this._opts.db) {
      return
    }
    // `_mongoDBclient` only exists after setup() took the connect branch
    await this._mongoDBclient?.close()
  }

  /**
   * Stores `packet` in the outgoing collection for a single subscription.
   *
   * @param {object} sub subscription with a `clientId`
   * @param {object} packet
   */
  async outgoingEnqueue (sub, packet) {
    return await this.outgoingEnqueueCombi([sub], packet)
  }

  /**
   * Stores `packet` in the outgoing collection once per subscription.
   *
   * @param {object[]} subs subscriptions, each with a `clientId`
   * @param {object} packet
   * @returns {Promise<object|undefined>} `packet` when there is nothing to
   *   enqueue, otherwise undefined
   */
  async outgoingEnqueueCombi (subs, packet) {
    // also covers a missing list, which would otherwise fail in the loop below
    if (!subs || subs.length === 0) {
      return packet
    }

    const packets = []
    const newPacket = new Packet(packet)
    const setTTL = this._opts.ttl.packets

    for (const sub of subs) {
      packets.push({
        clientId: sub.clientId,
        packet: decoratePacket(newPacket, setTTL)
      })
    }

    await this._cl.outgoing.insertMany(packets)
  }

  /**
   * @param {object} client object with an `id`
   * @returns {AsyncGenerator<object>} outgoing packets stored for the client
   */
  async * outgoingStream (client) {
    for await (const result of this._cl.outgoing.find({ clientId: client.id })) {
      yield asPacket(result)
    }
  }

  /**
   * Updates a stored outgoing packet: sets its message id when the packet has
   * a `brokerId`, otherwise replaces the packet found by message id.
   *
   * @param {object} client object with an `id`
   * @param {object} packet
   * @returns {Promise<void>}
   */
  async outgoingUpdate (client, packet) {
    if (packet.brokerId) {
      await updateWithMessageId(this._cl, client, packet)
    } else {
      await updatePacket(this._cl, client, packet)
    }
  }

  /**
   * Removes the outgoing packet with `packet.messageId` for `client`.
   *
   * @param {object} client object with an `id`
   * @param {object} packet object with a `messageId`
   * @returns {Promise<object|null>} the removed packet, or null if not found
   */
  async outgoingClearMessageId (client, packet) {
    const outgoing = this._cl.outgoing

    const result = await outgoing.findOneAndDelete({
      clientId: client.id,
      'packet.messageId': packet.messageId
    })
    if (!result) {
      return null // packet not found
    }
    return asPacket(result)
  }

  /**
   * Stores an incoming packet for `client`, keeping its message id.
   *
   * @param {object} client object with an `id`
   * @param {object} packet
   * @returns {Promise<void>}
   */
  async incomingStorePacket (client, packet) {
    const newPacket = new Packet(packet)
    newPacket.messageId = packet.messageId
    const setTTL = this._opts.ttl.packets

    await this._cl.incoming.insertOne({
      clientId: client.id,
      packet: decoratePacket(newPacket, setTTL)
    })
  }

  /**
   * @param {object} client object with an `id`
   * @param {object} packet object with a `messageId`
   * @returns {Promise<object>} the stored incoming packet
   * @throws {Error} when no such packet is stored
   */
  async incomingGetPacket (client, packet) {
    const result = await this._cl.incoming.findOne({
      clientId: client.id,
      'packet.messageId': packet.messageId
    })

    if (!result) {
      // report the client id; interpolating the object gives "[object Object]"
      throw new Error(`packet not found for: ${client.id}`)
    }

    return asPacket(result)
  }

  /**
   * Deletes the incoming packet with `packet.messageId` for `client`.
   *
   * @param {object} client object with an `id`
   * @param {object} packet object with a `messageId`
   * @returns {Promise<void>}
   */
  async incomingDelPacket (client, packet) {
    await this._cl.incoming.deleteOne({
      clientId: client.id,
      'packet.messageId': packet.messageId
    })
  }

  /**
   * Stores the will of `client`. The packet is stamped in place with the
   * client id and the id of the current broker.
   *
   * @param {object} client object with an `id`
   * @param {object} packet
   * @returns {Promise<void>}
   */
  async putWill (client, packet) {
    const setTTL = this._opts.ttl.packets
    packet.clientId = client.id
    packet.brokerId = this.broker.id
    await this._cl.will.insertOne({
      clientId: client.id,
      packet: decoratePacket(packet, setTTL)
    })
  }

  /**
   * @param {object} client object with an `id`
   * @returns {Promise<object|null>} the stored will, or null if there is none
   */
  async getWill (client) {
    const result = await this._cl.will.findOne({
      clientId: client.id
    })
    if (!result) {
      return null // packet not found
    }
    return asPacket(result)
  }

  /**
   * Removes the will of `client`.
   *
   * @param {object} client object with an `id`
   * @returns {Promise<object|null>} the removed will, or null if there was none
   */
  async delWill (client) {
    const result = await this._cl.will.findOneAndDelete({
      clientId: client.id
    })
    if (!result) {
      return null // packet not found
    }
    return asPacket(result)
  }

  /**
   * Yields stored wills. When `brokers` is given, wills whose `brokerId` is
   * one of its keys are left out.
   *
   * @param {object} [brokers] object keyed by broker id
   * @returns {AsyncGenerator<object>}
   */
  async * streamWill (brokers) {
    const filter = {}

    if (brokers) {
      filter['packet.brokerId'] = { $nin: Object.keys(brokers) }
    }
    for await (const will of this._cl.will.find(filter)) {
      yield asPacket(will)
    }
  }

  /**
   * Yields the client id of every stored subscription, optionally limited to
   * subscriptions on exactly `topic`.
   *
   * @param {string} [topic]
   * @returns {AsyncGenerator<string>}
   */
  async * getClientList (topic) {
    const filter = {}
    if (topic) {
      filter.topic = topic
    }
    for await (const sub of this._cl.subscriptions.find(filter)) {
      yield sub.clientId
    }
  }
}
2✔
442

2✔
443
/**
 * Drains `ctx.retainedBulkQueue` into `bulkWrite` calls on the retained
 * collection, one batch at a time, using `ctx.executing` as a lock so only one
 * invocation works on the queue. Does nothing while `ctx._destroyed` is set.
 *
 * Write errors are deliberately ignored: every queued `resolve` is called
 * whether or not its batch succeeded.
 *
 * @param {object} ctx object holding `retainedBulkQueue`, `executing` and `_cl`
 * @returns {Promise<void>}
 */
async function processRetainedBulk (ctx) {
  // another invocation holds the lock and will pick up whatever is queued
  if (ctx.executing) {
    return
  }

  // keep going while entries were queued during the previous write
  while (!ctx._destroyed && ctx.retainedBulkQueue.length > 0) {
    ctx.executing = true

    // take everything that is queued right now, emptying the queue in place
    const batch = ctx.retainedBulkQueue.splice(0)
    const operations = batch.map((entry) => entry.operation)

    try {
      await ctx._cl.retained.bulkWrite(operations)
    } catch {
      // ignore error
    }

    // resolve all promises
    for (const { resolve } of batch) {
      resolve()
    }

    ctx.executing = false
  }
}
12✔
467

2✔
468
/**
 * Stamps `packet.added` with the current time when a packet TTL is configured.
 * The packet is modified in place.
 *
 * @param {object} packet
 * @param {*} setTTL any truthy value enables the timestamp
 * @returns {object} the same `packet` object
 */
function decoratePacket (packet, setTTL) {
  if (!setTTL) {
    return packet
  }
  packet.added = new Date()
  return packet
}
12✔
474

2✔
475
/**
 * Stamps `sub.added` with the current time when `opts.ttl.subscriptions` is
 * truthy. The subscription is modified in place.
 *
 * @param {object} sub
 * @param {object} opts options object carrying a `ttl` object
 * @returns {object} the same `sub` object
 */
function decorateSubscription (sub, opts) {
  const ttlEnabled = Boolean(opts.ttl.subscriptions)
  if (!ttlEnabled) {
    return sub
  }
  sub.added = new Date()
  return sub
}
22✔
481

2✔
NEW
482
/**
 * Extracts the packet from a stored document: `obj.packet` when present,
 * otherwise `obj` itself. A payload wrapper whose `buffer` property is a
 * Buffer is replaced by that Buffer.
 *
 * @param {object} obj stored document or packet
 * @returns {object} the packet
 * @throws {Error} 'Invalid packet' when nothing usable was passed
 */
function asPacket (obj) {
  const packet = obj?.packet || obj
  if (!packet) {
    throw new Error('Invalid packet')
  }

  const inner = packet.payload?.buffer
  if (Buffer.isBuffer(inner)) {
    packet.payload = inner
  }
  return packet
}
×
492

2✔
NEW
493
/**
 * Sets the message id on the outgoing packet identified by client id,
 * broker id and broker counter.
 *
 * @param {object} db object exposing the `outgoing` collection
 * @param {object} client object with an `id`
 * @param {object} packet object with `brokerId`, `brokerCounter` and `messageId`
 * @returns {Promise<void>}
 */
async function updateWithMessageId (db, client, packet) {
  const { brokerCounter, brokerId, messageId } = packet
  const filter = {
    clientId: client.id,
    'packet.brokerCounter': brokerCounter,
    'packet.brokerId': brokerId
  }
  const update = { $set: { 'packet.messageId': messageId } }

  await db.outgoing.updateOne(filter, update)
}
×
504

2✔
NEW
505
/**
 * Replaces the outgoing packet identified by client id and message id with
 * `packet`.
 *
 * @param {object} db object exposing the `outgoing` collection
 * @param {object} client object with an `id`
 * @param {object} packet object with a `messageId`
 * @returns {Promise<void>}
 */
async function updatePacket (db, client, packet) {
  const clientId = client.id
  const filter = { clientId, 'packet.messageId': packet.messageId }
  const update = { $set: { clientId, packet } }

  await db.outgoing.updateOne(filter, update)
}
×
516

2✔
517
/**
 * Creates a promise together with the functions that settle it.
 *
 * this can be replaced by Promise.withResolvers() in NodeJS >= 22
 *
 * @returns {{promise: Promise, resolve: Function, reject: Function}}
 */
function promiseWithResolvers () {
  const settlers = {}
  // the executor runs synchronously, so both settlers are captured before
  // the object below is built
  const promise = new Promise((resolve, reject) => {
    settlers.resolve = resolve
    settlers.reject = reject
  })
  return { promise, resolve: settlers.resolve, reject: settlers.reject }
}
6✔
527

2✔
528
module.exports = AsyncMongoPersistence
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc