• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moscajs / aedes-persistence-mongodb / 14953586204

11 May 2025 07:31AM UTC coverage: 98.074%. First build
14953586204

Pull #83

github

web-flow
Merge 477fcaa44 into 9e2eb6856
Pull Request #83: chore: upgrade to mongodb@6

87 of 92 branches covered (94.57%)

Branch coverage included in aggregate %.

522 of 529 new or added lines in 2 files covered. (98.68%)

524 of 531 relevant lines covered (98.68%)

307.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.05
/asyncPersistence.js
1
'use strict'
4✔
2

4✔
3
const regEscape = require('escape-string-regexp')
4✔
4
const CachedPersistence = require('aedes-cached-persistence')
4✔
5
const Packet = CachedPersistence.Packet
4✔
6
const { MongoClient } = require('mongodb')
4✔
7
const { Qlobber } = require('qlobber')
4✔
8
const qlobberOpts = {
4✔
9
  separator: '/',
4✔
10
  wildcard_one: '+',
4✔
11
  wildcard_some: '#',
4✔
12
  match_empty_levels: true
4✔
13
}
4✔
14

4✔
15
class AsyncMongoPersistence {
4✔
16
  constructor (opts = {}) {
4✔
17
    opts.ttl = opts.ttl || {}
118✔
18

118✔
19
    if (typeof opts.ttl.packets === 'number') {
118✔
20
      const ttl = opts.ttl.packets
10✔
21
      opts.ttl.packets = {
10✔
22
        retained: ttl,
10✔
23
        will: ttl,
10✔
24
        outgoing: ttl,
10✔
25
        incoming: ttl
10✔
26
      }
10✔
27
    }
10✔
28

118✔
29
    this._opts = opts
118✔
30
    this._db = null
118✔
31
    this._cl = null
118✔
32
    this.retainedBulkQueue = [] // used for storing retained packets with ordered bulks
118✔
33
    this.executing = false // used as lock while a bulk is executing
118✔
34
  }
118✔
35

4✔
36
  // setup is called by "set broker" in aedes-abstract-persistence
4✔
37
  async setup () {
4✔
38
    // database already connected
2,144✔
39
    if (this._db) {
2,144✔
40
      return
2,026✔
41
    }
2,026✔
42

118✔
43
    // database already provided in the options
118✔
44
    if (this._opts.db) {
2,144✔
45
      this._db = this._opts.db
8✔
46
    } else {
2,144✔
47
      // connect to the database
110✔
48
      const conn = this._opts.url || 'mongodb://127.0.0.1/aedes'
110!
49
      const options = this._opts.mongoOptions
110✔
50

110✔
51
      const mongoDBclient = new MongoClient(conn, options)
110✔
52
      this._mongoDBclient = mongoDBclient
110✔
53
      const urlParsed = URL.parse(this._opts.url)
110✔
54
      // skip the first / of the pathname if it exists
110✔
55
      const pathname = urlParsed.pathname ? urlParsed.pathname.substring(1) : undefined
110!
56
      const databaseName = this._opts.database || pathname
110✔
57
      this._db = mongoDBclient.db(databaseName)
110✔
58
    }
110✔
59
    const db = this._db
118✔
60
    const subscriptions = db.collection('subscriptions')
118✔
61
    const retained = db.collection('retained')
118✔
62
    const will = db.collection('will')
118✔
63
    const outgoing = db.collection('outgoing')
118✔
64
    const incoming = db.collection('incoming')
118✔
65
    this._cl = {
118✔
66
      subscriptions,
118✔
67
      retained,
118✔
68
      will,
118✔
69
      outgoing,
118✔
70
      incoming
118✔
71
    }
118✔
72

118✔
73
    // drop existing TTL indexes (if exist)
118✔
74
    if (this._opts.dropExistingIndexes) {
2,144✔
75
      const collections = await db.collections()
2✔
76
      for (const collection of collections) {
2✔
77
        const exists = await collection.indexExists('ttl')
12✔
78
        if (exists) {
12✔
79
          await collection.dropIndex('ttl')
10✔
80
        }
10✔
81
      }
12✔
82
    }
2✔
83

118✔
84
    // create indexes
118✔
85
    const createIndex = async (idx) => {
118✔
86
      const indexOpts = { name: idx.name }
524✔
87
      if (typeof idx.expireAfterSeconds === 'number') {
524✔
88
        indexOpts.expireAfterSeconds = idx.expireAfterSeconds
52✔
89
      }
52✔
90
      await this._cl[idx.collection].createIndex(idx.key, indexOpts)
524✔
91
    }
524✔
92

118✔
93
    const indexes = [
118✔
94
      {
118✔
95
        collection: 'outgoing',
118✔
96
        key: { clientId: 1, 'packet.brokerId': 1, 'packet.brokerCounter': 1 },
118✔
97
        name: 'query_clientId_brokerId'
118✔
98
      },
118✔
99
      {
118✔
100
        collection: 'outgoing',
118✔
101
        key: { clientId: 1, 'packet.messageId': 1 },
118✔
102
        name: 'query_clientId_messageId'
118✔
103
      },
118✔
104
      {
118✔
105
        collection: 'incoming',
118✔
106
        key: { clientId: 1, 'packet.brokerId': 1, 'packet.brokerCounter': 1 },
118✔
107
        name: 'query_clientId_brokerId'
118✔
108
      },
118✔
109
      {
118✔
110
        collection: 'incoming',
118✔
111
        key: { clientId: 1, 'packet.messageId': 1 },
118✔
112
        name: 'query_clientId_messageId'
118✔
113
      }
118✔
114
    ]
118✔
115

118✔
116
    if (this._opts.ttl.subscriptions >= 0) {
2,144✔
117
      indexes.push({
12✔
118
        collection: 'subscriptions',
12✔
119
        key: this._opts.ttlAfterDisconnected ? 'disconnected' : 'added',
12✔
120
        name: 'ttl',
12✔
121
        expireAfterSeconds: this._opts.ttl.subscriptions
12✔
122
      })
12✔
123
    }
12✔
124

118✔
125
    if (this._opts.ttl.packets) {
2,144✔
126
      if (this._opts.ttl.packets.retained >= 0) {
10✔
127
        indexes.push({
10✔
128
          collection: 'retained',
10✔
129
          key: 'added',
10✔
130
          name: 'ttl',
10✔
131
          expireAfterSeconds: this._opts.ttl.packets.retained
10✔
132
        })
10✔
133
      }
10✔
134

10✔
135
      if (this._opts.ttl.packets.will >= 0) {
10✔
136
        indexes.push({
10✔
137
          collection: 'will',
10✔
138
          key: 'packet.added',
10✔
139
          name: 'ttl',
10✔
140
          expireAfterSeconds: this._opts.ttl.packets.will
10✔
141
        })
10✔
142
      }
10✔
143

10✔
144
      if (this._opts.ttl.packets.outgoing >= 0) {
10✔
145
        indexes.push({
10✔
146
          collection: 'outgoing',
10✔
147
          key: 'packet.added',
10✔
148
          name: 'ttl',
10✔
149
          expireAfterSeconds: this._opts.ttl.packets.outgoing
10✔
150
        })
10✔
151
      }
10✔
152

10✔
153
      if (this._opts.ttl.packets.incoming >= 0) {
10✔
154
        indexes.push({
10✔
155
          collection: 'incoming',
10✔
156
          key: 'packet.added',
10✔
157
          name: 'ttl',
10✔
158
          expireAfterSeconds: this._opts.ttl.packets.incoming
10✔
159
        })
10✔
160
      }
10✔
161
    }
10✔
162
    // create all indexes in parallel
118✔
163
    await Promise.all(indexes.map(createIndex))
118✔
164

118✔
165
    if (this._opts.ttlAfterDisconnected) {
2,144✔
166
      // To avoid stale subscriptions that might be left behind by broker shutting
4✔
167
      // down while clients were connected, set all to disconnected on startup.
4✔
168
      await this._cl.subscriptions.updateMany({ disconnected: { $exists: false } }, { $currentDate: { disconnected: true } })
4✔
169

4✔
170
      // Handlers for setting and clearing the disconnected timestamp on subscriptions
4✔
171
      this.broker.on('clientReady', (client) => {
4✔
NEW
172
        this._cl.subscriptions.updateMany({ clientId: client.id }, { $unset: { disconnected: true } })
×
173
      })
4✔
174
      this.broker.on('clientDisconnect', (client) => {
4✔
175
        this._cl.subscriptions.updateMany({ clientId: client.id }, { $currentDate: { disconnected: true } })
2✔
176
      })
4✔
177
    }
4✔
178

118✔
179
    // add subscriptions to Trie
118✔
180
    for await (const subscription of subscriptions.find({
118✔
181
      qos: { $gte: 0 }
118✔
182
    })) {
2,144✔
183
      this._trie.add(subscription.topic, subscription)
8✔
184
    }
8✔
185
    // setup is done
118✔
186
  }
2,144✔
187

4✔
188
  async storeRetained (packet) {
4✔
189
    await this.setup()
2,026✔
190
    const { promise, resolve } = promiseWithResolvers()
2,026✔
191
    const queue = this.retainedBulkQueue
2,026✔
192
    const filter = { topic: packet.topic }
2,026✔
193
    const setTTL = this._opts.ttl.packets
2,026✔
194

2,026✔
195
    if (packet.payload.length > 0) {
2,026✔
196
      queue.push({
2,024✔
197
        operation: {
2,024✔
198
          updateOne: {
2,024✔
199
            filter,
2,024✔
200
            update: { $set: decoratePacket(packet, setTTL) },
2,024✔
201
            upsert: true
2,024✔
202
          }
2,024✔
203
        },
2,024✔
204
        resolve
2,024✔
205
      })
2,024✔
206
    } else {
2,026✔
207
      queue.push({
2✔
208
        operation: {
2✔
209
          deleteOne: {
2✔
210
            filter
2✔
211
          }
2✔
212
        },
2✔
213
        resolve
2✔
214
      })
2✔
215
    }
2✔
216
    processRetainedBulk(this)
2,026✔
217
    return promise
2,026✔
218
  }
2,026✔
219

4✔
220
  createRetainedStream (pattern) {
4✔
221
    return this.createRetainedStreamCombi([pattern])
14✔
222
  }
14✔
223

4✔
224
  async * createRetainedStreamCombi (patterns) {
4✔
225
    const regexes = []
16✔
226
    const matcher = new Qlobber(qlobberOpts)
16✔
227

16✔
228
    for (let i = 0; i < patterns.length; i++) {
16✔
229
      matcher.add(patterns[i], true)
18✔
230
      regexes.push(regEscape(patterns[i]).replace(/(\/*#|\\\+).*$/, ''))
18✔
231
    }
18✔
232

16✔
233
    const topic = new RegExp(regexes.join('|'))
16✔
234
    const filter = { topic }
16✔
235
    const exclude = { _id: 0 } // exclude the _id field
16✔
236
    for await (const result of this._cl.retained.find(filter).project(exclude)) {
16✔
237
      const packet = asPacket(result)
14✔
238
      if (matcher.match(packet.topic).length > 0) {
14✔
239
        yield packet
14✔
240
      }
14✔
241
    }
14✔
242
  }
16✔
243

4✔
244
  async addSubscriptions (client, subs) {
4✔
245
    const operations = []
64✔
246
    for (const sub of subs) {
64✔
247
      const subscription = Object.assign({}, sub)
114✔
248
      subscription.clientId = client.id
114✔
249
      operations.push({
114✔
250
        updateOne: {
114✔
251
          filter: {
114✔
252
            clientId: client.id,
114✔
253
            topic: sub.topic
114✔
254
          },
114✔
255
          update: {
114✔
256
            $set: decorateSubscription(subscription, this._opts)
114✔
257
          },
114✔
258
          upsert: true
114✔
259
        }
114✔
260
      })
114✔
261
    }
114✔
262

64✔
263
    await this._cl.subscriptions.bulkWrite(operations)
64✔
264
  }
64✔
265

4✔
266
  async removeSubscriptions (client, subs) {
4✔
267
    const operations = []
32✔
268

32✔
269
    for (const topic of subs) {
32✔
270
      operations.push({
34✔
271
        deleteOne: {
34✔
272
          filter: {
34✔
273
            clientId: client.id,
34✔
274
            topic
34✔
275
          }
34✔
276
        }
34✔
277
      })
34✔
278
    }
34✔
279
    await this._cl.subscriptions.bulkWrite(operations)
32✔
280
  }
32✔
281

4✔
282
  async subscriptionsByClient (client) {
4✔
283
    const filter = { clientId: client.id }
32✔
284
    const exclude = { clientId: false, _id: false } // exclude these fields
32✔
285
    const subs = await this._cl.subscriptions.find(filter).project(exclude).toArray()
32✔
286
    return subs
32✔
287
  }
32✔
288

4✔
289
  async countOffline () {
4✔
290
    const subsCount = this._trie.subscriptionsCount
44✔
291
    const result = await this._cl.subscriptions.aggregate([
44✔
292
      {
44✔
293
        $group: {
44✔
294
          _id: '$clientId'
44✔
295
        }
44✔
296
      }, {
44✔
297
        $count: 'clientsCount'
44✔
298
      }]).toArray()
44✔
299
    const clientsCount = result[0]?.clientsCount || 0
44✔
300
    return { subsCount, clientsCount }
44✔
301
  }
44✔
302

4✔
303
  async destroy () {
4✔
304
    if (this._opts.db) {
118✔
305
      return
8✔
306
    }
8✔
307
    await this._mongoDBclient.close()
110✔
308
  }
118✔
309

4✔
310
  async outgoingEnqueue (sub, packet) {
4✔
311
    return await this.outgoingEnqueueCombi([sub], packet)
12✔
312
  }
12✔
313

4✔
314
  async outgoingEnqueueCombi (subs, packet) {
4✔
315
    if (subs?.length === 0) {
30!
NEW
316
      return packet
×
NEW
317
    }
×
318

30✔
319
    const packets = []
30✔
320
    const newPacket = new Packet(packet)
30✔
321
    const setTTL = this._opts.ttl.packets
30✔
322

30✔
323
    for (const sub of subs) {
30✔
324
      packets.push({
32✔
325
        clientId: sub.clientId,
32✔
326
        packet: decoratePacket(newPacket, setTTL)
32✔
327
      })
32✔
328
    }
32✔
329

30✔
330
    await this._cl.outgoing.insertMany(packets)
30✔
331
  }
30✔
332

4✔
333
  async * outgoingStream (client) {
4✔
334
    for await (const result of this._cl.outgoing.find({ clientId: client.id })) {
30✔
335
      yield asPacket(result)
34✔
336
    }
34✔
337
  }
30✔
338

4✔
339
  async outgoingUpdate (client, packet) {
4✔
340
    if (packet.brokerId) {
26✔
341
      await updateWithMessageId(this._cl, client, packet)
24✔
342
    } else {
26✔
343
      await updatePacket(this._cl, client, packet)
2✔
344
    }
2✔
345
  }
26✔
346

4✔
347
  async outgoingClearMessageId (client, packet) {
4✔
348
    const outgoing = this._cl.outgoing
8✔
349

8✔
350
    const result = await outgoing.findOneAndDelete({
8✔
351
      clientId: client.id,
8✔
352
      'packet.messageId': packet.messageId
8✔
353
    })
8✔
354
    if (!result) {
8✔
355
      return null // packet not found
2✔
356
    }
2✔
357
    return asPacket(result)
6✔
358
  }
8✔
359

4✔
360
  async incomingStorePacket (client, packet) {
4✔
361
    const newPacket = new Packet(packet)
4✔
362
    newPacket.messageId = packet.messageId
4✔
363
    const setTTL = this._opts.ttl.packets
4✔
364

4✔
365
    await this._cl.incoming.insertOne({
4✔
366
      clientId: client.id,
4✔
367
      packet: decoratePacket(newPacket, setTTL)
4✔
368
    })
4✔
369
  }
4✔
370

4✔
371
  async incomingGetPacket (client, packet) {
4✔
372
    const result = await this._cl.incoming.findOne({
4✔
373
      clientId: client.id,
4✔
374
      'packet.messageId': packet.messageId
4✔
375
    })
4✔
376

4✔
377
    if (!result) {
4✔
378
      throw new Error(`packet not found for: ${client}`)
2✔
379
    }
2✔
380

2✔
381
    return asPacket(result)
2✔
382
  }
4✔
383

4✔
384
  async incomingDelPacket (client, packet) {
4✔
385
    await this._cl.incoming.deleteOne({
2✔
386
      clientId: client.id,
2✔
387
      'packet.messageId': packet.messageId
2✔
388
    })
2✔
389
  }
2✔
390

4✔
391
  async putWill (client, packet) {
4✔
392
    const setTTL = this._opts.ttl.packets
12✔
393
    packet.clientId = client.id
12✔
394
    packet.brokerId = this.broker.id
12✔
395
    await this._cl.will.insertOne({
12✔
396
      clientId: client.id,
12✔
397
      packet: decoratePacket(packet, setTTL)
12✔
398
    })
12✔
399
  }
12✔
400

4✔
401
  async getWill (client) {
4✔
402
    const result = await this._cl.will.findOne({
4✔
403
      clientId: client.id
4✔
404
    })
4✔
405
    if (!result) {
4✔
406
      return null // packet not found
2✔
407
    }
2✔
408
    return asPacket(result)
2✔
409
  }
4✔
410

4✔
411
  async delWill (client) {
4✔
412
    const result = await this._cl.will.findOneAndDelete({
8✔
413
      clientId: client.id
8✔
414
    })
8✔
415
    if (!result) {
8!
NEW
416
      return null // packet not found
×
NEW
417
    }
×
418
    return asPacket(result)
8✔
419
  }
8✔
420

4✔
421
  async * streamWill (brokers) {
4✔
422
    const filter = {}
4✔
423

4✔
424
    if (brokers) {
4✔
425
      filter['packet.brokerId'] = { $nin: Object.keys(brokers) }
2✔
426
    }
2✔
427
    for await (const will of this._cl.will.find(filter)) {
4✔
428
      yield asPacket(will)
4✔
429
    }
4✔
430
  }
4✔
431

4✔
432
  async * getClientList (topic) {
4✔
433
    const filter = {}
4✔
434
    if (topic) {
4✔
435
      filter.topic = topic
4✔
436
    }
4✔
437
    for await (const sub of this._cl.subscriptions.find(filter)) {
4✔
438
      yield sub.clientId
6✔
439
    }
6✔
440
  }
4✔
441
}
4✔
442

4✔
443
async function processRetainedBulk (ctx) {
4,052✔
444
  if (!ctx.executing && !ctx._destroyed && ctx.retainedBulkQueue.length > 0) {
4,052✔
445
    ctx.executing = true
2,026✔
446
    const operations = []
2,026✔
447
    const onEnd = []
2,026✔
448

2,026✔
449
    while (ctx.retainedBulkQueue.length) {
2,026✔
450
      const { operation, resolve } = ctx.retainedBulkQueue.shift()
2,026✔
451
      operations.push(operation)
2,026✔
452
      onEnd.push(resolve)
2,026✔
453
    }
2,026✔
454
    // execute operations and ignore the error
2,026✔
455
    await ctx._cl.retained.bulkWrite(operations).catch(() => {})
2,026✔
456
    // resolve all promises
2,026✔
457
    while (onEnd.length) onEnd.shift().call()
2,026✔
458
    // check if we have new packets in queue
2,026✔
459
    ctx.executing = false
2,026✔
460
    // do not await as we run this in background and ignore errors
2,026✔
461
    processRetainedBulk(ctx)
2,026✔
462
  }
2,026✔
463
}
4,052✔
464

4✔
465
function decoratePacket (packet, setTTL) {
2,072✔
466
  if (setTTL) {
2,072✔
467
    packet.added = new Date()
10✔
468
  }
10✔
469
  return packet
2,072✔
470
}
2,072✔
471

4✔
472
function decorateSubscription (sub, opts) {
114✔
473
  if (opts.ttl.subscriptions) {
114✔
474
    sub.added = new Date()
2✔
475
  }
2✔
476
  return sub
114✔
477
}
114✔
478

4✔
479
function asPacket (obj) {
70✔
480
  const packet = obj?.packet || obj
70✔
481
  if (!packet) {
70!
NEW
482
    throw new Error('Invalid packet')
×
NEW
483
  }
×
484
  if (Buffer.isBuffer(packet?.payload?.buffer)) {
70✔
485
    packet.payload = packet.payload.buffer
66✔
486
  }
66✔
487
  return packet
70✔
488
}
70✔
489

4✔
490
async function updateWithMessageId (db, client, packet) {
24✔
491
  await db.outgoing.updateOne({
24✔
492
    clientId: client.id,
24✔
493
    'packet.brokerCounter': packet.brokerCounter,
24✔
494
    'packet.brokerId': packet.brokerId
24✔
495
  }, {
24✔
496
    $set: {
24✔
497
      'packet.messageId': packet.messageId
24✔
498
    }
24✔
499
  })
24✔
500
}
24✔
501

4✔
502
async function updatePacket (db, client, packet) {
2✔
503
  await db.outgoing.updateOne({
2✔
504
    clientId: client.id,
2✔
505
    'packet.messageId': packet.messageId
2✔
506
  }, {
2✔
507
    $set: {
2✔
508
      clientId: client.id,
2✔
509
      packet
2✔
510
    }
2✔
511
  })
2✔
512
}
2✔
513

4✔
514
function promiseWithResolvers () {
2,026✔
515
  // this can be replaced by Promise.withResolvers()in NodeJS >= 22
2,026✔
516
  let res
2,026✔
517
  let rej
2,026✔
518
  const promise = new Promise((resolve, reject) => {
2,026✔
519
    res = resolve
2,026✔
520
    rej = reject
2,026✔
521
  })
2,026✔
522
  return { promise, resolve: res, reject: rej }
2,026✔
523
}
2,026✔
524

4✔
525
module.exports = AsyncMongoPersistence
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc