• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moscajs / aedes-persistence-mongodb / 21434178024

28 Jan 2026 10:15AM UTC coverage: 97.635% (+0.2%) from 97.406%
21434178024

Pull #89

github

web-flow
Merge 8993fdbca into 1459a4f24
Pull Request #89: fix: prevent "regular expression is too large" mongodb error when fetching retained packets

107 of 114 branches covered (93.86%)

Branch coverage included in aggregate %.

138 of 138 new or added lines in 1 file covered. (100.0%)

6 existing lines in 1 file now uncovered.

636 of 647 relevant lines covered (98.3%)

316.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.61
/asyncPersistence.js
1
'use strict'
4✔
2

4✔
3
const regEscape = require('escape-string-regexp')
4✔
4
const Packet = require('aedes-persistence').Packet
4✔
5
const BroadcastPersistence = require('aedes-persistence/broadcastPersistence.js')
4✔
6
const { MongoClient } = require('mongodb')
4✔
7
const { Qlobber } = require('qlobber')
4✔
8
const QlobberSub = require('qlobber/aedes/qlobber-sub')
4✔
9
const QLOBBER_OPTIONS = {
4✔
10
  separator: '/',
4✔
11
  wildcard_one: '+',
4✔
12
  wildcard_some: '#',
4✔
13
  match_empty_levels: true
4✔
14
}
4✔
15

4✔
16
// Batching limits for retained message pattern queries
4✔
17
// MongoDB has a BSON document size limit of 16MB, but regex patterns can hit
4✔
18
// practical compilation/execution limits around 32KB depending on the driver.
4✔
19
// These conservative values prevent "regular expression is too large" errors
4✔
20
// while still allowing efficient batch processing of large pattern sets.
4✔
21
//
4✔
22
// Research notes:
4✔
23
// - MongoDB regex size is limited by BSON document size and regex compilation
4✔
24
// - Practical regex limit in MongoDB is typically around 32KB
4✔
25
// - After escaping, MQTT wildcards (#, +) are replaced, reducing final regex size
4✔
26
// - Joining patterns with '|' adds minimal overhead ((n-1) characters)
4✔
27
//
4✔
28
// Current values are conservative to ensure compatibility across different
4✔
29
// MongoDB versions and deployment configurations. They can be increased if needed:
4✔
30
// - MAX_PATTERNS_PER_BATCH could be 100-200 for most use cases
4✔
31
// - MAX_TOTAL_PATTERN_LENGTH could be 15000-20000 (still well under 32KB limit)
4✔
32
const MAX_PATTERNS_PER_BATCH = 50
4✔
33
const MAX_TOTAL_PATTERN_LENGTH = 5000
4✔
34

4✔
35
class AsyncMongoPersistence {
72✔
36
  // private class members start with #
140✔
37
  #trie
140✔
38
  #destroyed
140✔
39
  #broker
140✔
40
  #opts
140✔
41
  #db
140✔
42
  #mongoDBclient
140✔
43
  #cl
140✔
44
  #broadcast
140✔
45
  #retainedBulkQueue
140✔
46
  #executing
140✔
47

140✔
48
  constructor (opts = {}) {
140✔
49
    this.#trie = new QlobberSub(QLOBBER_OPTIONS) // used to match packets
140✔
50
    opts.ttl = opts.ttl || {}
140✔
51

140✔
52
    if (typeof opts.ttl.packets === 'number') {
140✔
53
      const ttl = opts.ttl.packets
10✔
54
      opts.ttl.packets = {
10✔
55
        retained: ttl,
10✔
56
        will: ttl,
10✔
57
        outgoing: ttl,
10✔
58
        incoming: ttl
10✔
59
      }
10✔
60
    }
10✔
61

140✔
62
    this.#opts = opts
140✔
63
    this.#db = null
140✔
64
    this.#cl = null
140✔
65
    this.#destroyed = false
140✔
66
    this.#retainedBulkQueue = [] // used for storing retained packets with ordered bulks
140✔
67
    this.#executing = false // used as lock while a bulk is executing
140✔
68
  }
140✔
69

140✔
70
  #addTTLIndexes (indexes) {
140✔
71
    const addTTLIndex = (collection, key, expireAfterSeconds) => {
140✔
72
      if (expireAfterSeconds >= 0) {
52✔
73
        indexes.push({ collection, key, name: 'ttl', expireAfterSeconds })
52✔
74
      }
52✔
75
    }
52✔
76

140✔
77
    if (this.#opts.ttl.subscriptions >= 0) {
140✔
78
      addTTLIndex(
12✔
79
        'subscriptions',
12✔
80
        this.#opts.ttlAfterDisconnected ? 'disconnected' : 'added',
12✔
81
        this.#opts.ttl.subscriptions
12✔
82
      )
12✔
83
    }
12✔
84

140✔
85
    if (this.#opts.ttl.packets) {
140✔
86
      addTTLIndex('retained', 'added', this.#opts.ttl.packets.retained)
10✔
87
      addTTLIndex('will', 'packet.added', this.#opts.ttl.packets.will)
10✔
88
      addTTLIndex('outgoing', 'packet.added', this.#opts.ttl.packets.outgoing)
10✔
89
      addTTLIndex('incoming', 'packet.added', this.#opts.ttl.packets.incoming)
10✔
90
    }
10✔
91
  }
140✔
92

140✔
93
  // access #broker, only for testing
140✔
94
  get broker () {
140✔
95
    return this.#broker
4,182✔
96
  }
4,182✔
97

140✔
98
  // access #db, only for testing
140✔
99
  get _db () {
140✔
100
    return this.#db
14✔
101
  }
14✔
102

140✔
103
  // access #mongoDBclient, only for testing
140✔
104
  get _mongoDBclient () {
140✔
105
    return this.#mongoDBclient
2✔
106
  }
2✔
107

140✔
108
  // setup is called by aedes-persistence/callbackPersistence.js
140✔
109
  async setup (broker) {
140✔
110
    this.#broker = broker
140✔
111

140✔
112
    // database already connected
140✔
113
    if (this.#db) {
140!
UNCOV
114
      return
×
115
    }
×
116

140✔
117
    // database already provided in the options
140✔
118
    if (this.#opts.db) {
140✔
119
      this.#db = this.#opts.db
8✔
120
    } else {
140✔
121
      // connect to the database
132✔
122
      const conn = this.#opts.url || 'mongodb://127.0.0.1/aedes'
132!
123
      const options = this.#opts.mongoOptions
132✔
124

132✔
125
      const mongoDBclient = new MongoClient(conn, options)
132✔
126
      this.#mongoDBclient = mongoDBclient
132✔
127
      const urlParsed = URL.parse(this.#opts.url)
132✔
128
      // skip the first / of the pathname if it exists
132✔
129
      const pathname = urlParsed.pathname ? urlParsed.pathname.substring(1) : undefined
132!
130
      const databaseName = this.#opts.database || pathname
132✔
131
      this.#db = mongoDBclient.db(databaseName)
132✔
132
    }
132✔
133

140✔
134
    const collectionPrefix = `${this.#opts.collectionPrefix || ''}`
140✔
135

140✔
136
    const db = this.#db
140✔
137
    const subscriptions = db.collection(`${collectionPrefix}subscriptions`)
140✔
138
    const retained = db.collection(`${collectionPrefix}retained`)
140✔
139
    const will = db.collection(`${collectionPrefix}will`)
140✔
140
    const outgoing = db.collection(`${collectionPrefix}outgoing`)
140✔
141
    const incoming = db.collection(`${collectionPrefix}incoming`)
140✔
142
    this.#cl = {
140✔
143
      subscriptions,
140✔
144
      retained,
140✔
145
      will,
140✔
146
      outgoing,
140✔
147
      incoming
140✔
148
    }
140✔
149

140✔
150
    // drop existing TTL indexes (if exist)
140✔
151
    if (this.#opts.dropExistingIndexes) {
140✔
152
      const collections = await db.collections()
2✔
153
      for (const collection of collections) {
2✔
154
        const exists = await collection.indexExists('ttl')
12✔
155
        if (exists) {
12✔
156
          await collection.dropIndex('ttl')
10✔
157
        }
10✔
158
      }
12✔
159
    }
2✔
160

140✔
161
    // create indexes
140✔
162
    const createIndex = async (idx) => {
140✔
163
      const indexOpts = { name: idx.name }
612✔
164
      if (typeof idx.expireAfterSeconds === 'number') {
612✔
165
        indexOpts.expireAfterSeconds = idx.expireAfterSeconds
52✔
166
      }
52✔
167
      await this.#cl[idx.collection].createIndex(idx.key, indexOpts)
612✔
168
    }
612✔
169

140✔
170
    const indexes = [
140✔
171
      {
140✔
172
        collection: 'outgoing',
140✔
173
        key: { clientId: 1, 'packet.brokerId': 1, 'packet.brokerCounter': 1 },
140✔
174
        name: 'query_clientId_brokerId'
140✔
175
      },
140✔
176
      {
140✔
177
        collection: 'outgoing',
140✔
178
        key: { clientId: 1, 'packet.messageId': 1 },
140✔
179
        name: 'query_clientId_messageId'
140✔
180
      },
140✔
181
      {
140✔
182
        collection: 'incoming',
140✔
183
        key: { clientId: 1, 'packet.brokerId': 1, 'packet.brokerCounter': 1 },
140✔
184
        name: 'query_clientId_brokerId'
140✔
185
      },
140✔
186
      {
140✔
187
        collection: 'incoming',
140✔
188
        key: { clientId: 1, 'packet.messageId': 1 },
140✔
189
        name: 'query_clientId_messageId'
140✔
190
      }
140✔
191
    ]
140✔
192

140✔
193
    // Add TTL indexes
140✔
194
    this.#addTTLIndexes(indexes)
140✔
195
    // create all indexes in parallel
140✔
196
    await Promise.all(indexes.map(createIndex))
140✔
197

140✔
198
    if (this.#opts.ttlAfterDisconnected) {
140✔
199
      // To avoid stale subscriptions that might be left behind by broker shutting
4✔
200
      // down while clients were connected, set all to disconnected on startup.
4✔
201
      await this.#cl.subscriptions.updateMany({ disconnected: { $exists: false } }, { $currentDate: { disconnected: true } })
4✔
202

4✔
203
      // Handlers for setting and clearing the disconnected timestamp on subscriptions
4✔
204
      this.#broker.on('clientReady', (client) => {
4✔
UNCOV
205
        this.#cl.subscriptions.updateMany({ clientId: client.id }, { $unset: { disconnected: true } })
×
206
      })
4✔
207
      this.#broker.on('clientDisconnect', (client) => {
4✔
208
        this.#cl.subscriptions.updateMany({ clientId: client.id }, { $currentDate: { disconnected: true } })
2✔
209
      })
4✔
210
    }
4✔
211

140✔
212
    // add subscriptions to Trie
140✔
213
    for await (const subscription of subscriptions.find({
140✔
214
      qos: { $gte: 0 }
140✔
215
    })) {
140✔
216
      this.#trie.add(subscription.topic, subscription)
2✔
217
    }
2✔
218
    // subscribe to the broker for subscription updates
140✔
219
    this.#broadcast = new BroadcastPersistence(broker, this.#trie)
140✔
220
    await this.#broadcast.brokerSubscribe()
140✔
221
    // setup is done
140✔
222
  }
140✔
223

140✔
224
  async processRetainedBulk () {
140✔
225
    if (!this.#executing && !this.#destroyed && this.#retainedBulkQueue.length > 0) {
4,994✔
226
      this.#executing = true
2,496✔
227
      const operations = []
2,496✔
228
      const onEnd = []
2,496✔
229

2,496✔
230
      while (this.#retainedBulkQueue.length) {
2,496✔
231
        const { operation, resolve } = this.#retainedBulkQueue.shift()
2,496✔
232
        operations.push(operation)
2,496✔
233
        onEnd.push(resolve)
2,496✔
234
      }
2,496✔
235
      // execute operations and ignore the error
2,496✔
236
      await this.#cl.retained.bulkWrite(operations).catch(() => { })
2,496✔
237
      // resolve all promises
2,496✔
238
      while (onEnd.length) onEnd.shift().call()
2,496✔
239
      // check if we have new packets in queue
2,496✔
240
      this.#executing = false
2,496✔
241
      // do not await as we run this in background and ignore errors
2,496✔
242
      this.processRetainedBulk()
2,496✔
243
    }
2,496✔
244
    if (this.#destroyed) {
4,994✔
245
      // cleanup dangling promises
2✔
246
      while (this.#retainedBulkQueue.length) {
2✔
247
        const { resolve } = this.#retainedBulkQueue.shift()
2✔
248
        resolve() // resolve all promises
2✔
249
      }
2✔
250
    }
2✔
251
  }
4,994✔
252

140✔
253
  async storeRetained (packet) {
140✔
254
    const { promise, resolve } = promiseWithResolvers()
2,498✔
255
    const queue = this.#retainedBulkQueue
2,498✔
256
    const filter = { topic: packet.topic }
2,498✔
257

2,498✔
258
    if (packet.payload.length > 0) {
2,498✔
259
      queue.push({
2,496✔
260
        operation: {
2,496✔
261
          updateOne: {
2,496✔
262
            filter,
2,496✔
263
            update: { $set: decoratePacket(packet, this.#opts.ttl.packets) },
2,496✔
264
            upsert: true
2,496✔
265
          }
2,496✔
266
        },
2,496✔
267
        resolve
2,496✔
268
      })
2,496✔
269
    } else {
2,498✔
270
      queue.push({
2✔
271
        operation: {
2✔
272
          deleteOne: {
2✔
273
            filter
2✔
274
          }
2✔
275
        },
2✔
276
        resolve
2✔
277
      })
2✔
278
    }
2✔
279
    this.processRetainedBulk()
2,498✔
280
    return promise
2,498✔
281
  }
2,498✔
282

140✔
283
  createRetainedStream (pattern) {
140✔
284
    return this.createRetainedStreamCombi([pattern])
20✔
285
  }
20✔
286

140✔
287
  async * createRetainedStreamCombi (patterns) {
140✔
288
    const matcher = new Qlobber(QLOBBER_OPTIONS)
36✔
289

36✔
290
    for (let i = 0; i < patterns.length; i++) {
36✔
291
      matcher.add(patterns[i], true)
610✔
292
    }
610✔
293

36✔
294
    // Calculate total pattern length
36✔
295
    const totalLength = patterns.reduce((sum, p) => sum + p.length, 0)
36✔
296

36✔
297
    // Determine if we need to batch
36✔
298
    const needsBatching =
36✔
299
      patterns.length > MAX_PATTERNS_PER_BATCH ||
36✔
300
      totalLength > MAX_TOTAL_PATTERN_LENGTH
30✔
301

36✔
302
    if (needsBatching) {
36✔
303
      // Process patterns in batches to avoid creating regex that's too large
8✔
304
      // Use dynamic batching based on cumulative length
8✔
305
      const seenTopics = new Set() // Track yielded packets to avoid duplicates
8✔
306
      const batches = []
8✔
307
      let currentBatch = []
8✔
308
      let currentLength = 0
8✔
309

8✔
310
      for (const pattern of patterns) {
8✔
311
        const patternLength = pattern.length
500✔
312

500✔
313
        // Edge case: if a single pattern exceeds MAX_TOTAL_PATTERN_LENGTH,
500✔
314
        // it will be placed in its own batch. This is intentional behavior
500✔
315
        // to ensure the pattern is still processed (MongoDB will handle it
500✔
316
        // or fail with a clear error). Very long patterns (>32KB after escaping)
500✔
317
        // may still cause MongoDB "regular expression is too large" errors.
500✔
318

500✔
319
        // Start a new batch if adding this pattern would exceed limits
500✔
320
        if (currentBatch.length >= MAX_PATTERNS_PER_BATCH ||
500✔
321
            (currentLength + patternLength > MAX_TOTAL_PATTERN_LENGTH && currentBatch.length > 0)) {
500✔
322
          batches.push(currentBatch)
8✔
323
          currentBatch = []
8✔
324
          currentLength = 0
8✔
325
        }
8✔
326
        currentBatch.push(pattern)
500✔
327
        currentLength += patternLength
500✔
328
      }
500✔
329
      // Add the last batch if not empty
8✔
330
      if (currentBatch.length > 0) {
8✔
331
        batches.push(currentBatch)
8✔
332
      }
8✔
333

8✔
334
      for (const batch of batches) {
8✔
335
        for await (const packet of this.#queryRetainedByPatterns(batch, matcher)) {
16✔
336
          // Avoid duplicates across batches
404✔
337
          if (!seenTopics.has(packet.topic)) {
404✔
338
            seenTopics.add(packet.topic)
382✔
339
            yield packet
382✔
340
          }
382✔
341
        }
404✔
342
      }
16✔
343
    } else {
36✔
344
      // Original logic for small pattern sets
28✔
345
      for await (const packet of this.#queryRetainedByPatterns(patterns, matcher)) {
28✔
346
        yield packet
102✔
347
      }
102✔
348
    }
28✔
349
  }
36✔
350

140✔
351
  async * #queryRetainedByPatterns (patterns, matcher) {
140✔
352
    // Early return for empty patterns to avoid creating an empty regex
44✔
353
    // that would match all documents in the collection
44✔
354
    if (patterns.length === 0) {
44✔
355
      return
2✔
356
    }
2✔
357

42✔
358
    const regexes = patterns.map(pattern =>
42✔
359
      regEscape(pattern).replace(/(\/*#|\\\+).*$/, '')
610✔
360
    )
42✔
361

42✔
362
    const topic = new RegExp(regexes.join('|'))
42✔
363
    const filter = { topic }
42✔
364
    const exclude = { _id: 0 }
42✔
365

42✔
366
    for await (const result of this.#cl.retained.find(filter).project(exclude)) {
44✔
367
      const packet = asPacket(result)
506✔
368
      if (matcher.match(packet.topic).length > 0) {
506✔
369
        yield packet
506✔
370
      }
506✔
371
    }
506✔
372
  }
44✔
373

140✔
374
  async addSubscriptions (client, subs) {
140✔
375
    const subscriptions = []
64✔
376
    const operations = subs.map(sub => {
64✔
377
      const subscription = { ...sub, clientId: client.id }
114✔
378
      subscriptions.push(subscription)
114✔
379
      return {
114✔
380
        updateOne: {
114✔
381
          filter: {
114✔
382
            clientId: client.id,
114✔
383
            topic: sub.topic
114✔
384
          },
114✔
385
          update: {
114✔
386
            $set: decorateSubscription(subscription, this.#opts)
114✔
387
          },
114✔
388
          upsert: true
114✔
389
        }
114✔
390
      }
114✔
391
    })
64✔
392

64✔
393
    await this.#cl.subscriptions.bulkWrite(operations)
64✔
394
    // inform the broker
64✔
395
    await this.#broadcast.addedSubscriptions(client, subs)
64✔
396
  }
64✔
397

140✔
398
  async removeSubscriptions (client, subs) {
140✔
399
    const operations = subs.map(topic => ({
32✔
400
      deleteOne: {
34✔
401
        filter: {
34✔
402
          clientId: client.id,
34✔
403
          topic
34✔
404
        }
34✔
405
      }
34✔
406
    }))
32✔
407
    await this.#cl.subscriptions.bulkWrite(operations)
32✔
408
    // inform the broker
32✔
409
    await this.#broadcast.removedSubscriptions(client, subs)
32✔
410
  }
32✔
411

140✔
412
  async subscriptionsByClient (client) {
140✔
413
    const filter = { clientId: client.id }
32✔
414
    const exclude = { clientId: false, _id: false } // exclude these fields
32✔
415
    const subs = await this.#cl.subscriptions.find(filter).project(exclude).toArray()
32✔
416
    return subs
32✔
417
  }
32✔
418

140✔
419
  async countOffline () {
140✔
420
    const subsCount = this.#trie.subscriptionsCount
44✔
421
    const result = await this.#cl.subscriptions.aggregate([
44✔
422
      {
44✔
423
        $group: {
44✔
424
          _id: '$clientId'
44✔
425
        }
44✔
426
      }, {
44✔
427
        $count: 'clientsCount'
44✔
428
      }]).toArray()
44✔
429
    const clientsCount = result[0]?.clientsCount || 0
44✔
430
    return { subsCount, clientsCount }
44✔
431
  }
44✔
432

140✔
433
  async destroy () {
140✔
434
    if (this.#destroyed) {
140!
UNCOV
435
      throw new Error('destroyed called twice!')
×
436
    }
×
437
    this.#destroyed = true
140✔
438
    // stop listening to subscription updates
140✔
439
    await this.#broadcast.brokerUnsubscribe()
140✔
440

140✔
441
    if (this.#opts.db) {
140✔
442
      return
8✔
443
    }
8✔
444
    await this.#mongoDBclient.close()
132✔
445
  }
140✔
446

140✔
447
  async subscriptionsByTopic (topic) {
140✔
448
    return this.#trie.match(topic)
34✔
449
  }
34✔
450

140✔
451
  async cleanSubscriptions (client) {
140✔
452
    const subs = await this.subscriptionsByClient(client)
4✔
453
    if (subs.length > 0) {
4✔
454
      const remSubs = subs.map(sub => sub.topic)
2✔
455
      await this.removeSubscriptions(client, remSubs)
2✔
456
    }
2✔
457
  }
4✔
458

140✔
459
  async outgoingEnqueue (sub, packet) {
140✔
460
    return await this.outgoingEnqueueCombi([sub], packet)
12✔
461
  }
12✔
462

140✔
463
  async outgoingEnqueueCombi (subs, packet) {
140✔
464
    if (subs?.length === 0) {
30!
UNCOV
465
      return packet
×
466
    }
×
467

30✔
468
    const newPacket = new Packet(packet)
30✔
469
    const decoratedPacket = decoratePacket(newPacket, this.#opts.ttl.packets)
30✔
470
    const packets = subs.map(sub => ({
30✔
471
      clientId: sub.clientId,
32✔
472
      packet: decoratedPacket
32✔
473
    }))
30✔
474

30✔
475
    await this.#cl.outgoing.insertMany(packets)
30✔
476
  }
30✔
477

140✔
478
  async * outgoingStream (client) {
140✔
479
    for await (const result of this.#cl.outgoing.find({ clientId: client.id })) {
30✔
480
      yield asPacket(result)
34✔
481
    }
34✔
482
  }
30✔
483

140✔
484
  async outgoingUpdate (client, packet) {
140✔
485
    if (packet.brokerId) {
26✔
486
      await updateWithMessageId(this.#cl, client, packet)
24✔
487
    } else {
26✔
488
      await updatePacket(this.#cl, client, packet)
2✔
489
    }
2✔
490
  }
26✔
491

140✔
492
  async outgoingClearMessageId (client, packet) {
140✔
493
    const result = await this.#cl.outgoing.findOneAndDelete({
8✔
494
      clientId: client.id,
8✔
495
      'packet.messageId': packet.messageId
8✔
496
    })
8✔
497
    return result ? asPacket(result) : null
8✔
498
  }
8✔
499

140✔
500
  async incomingStorePacket (client, packet) {
140✔
501
    const newPacket = new Packet(packet)
4✔
502
    newPacket.messageId = packet.messageId
4✔
503

4✔
504
    await this.#cl.incoming.insertOne({
4✔
505
      clientId: client.id,
4✔
506
      packet: decoratePacket(newPacket, this.#opts.ttl.packets)
4✔
507
    })
4✔
508
  }
4✔
509

140✔
510
  async incomingGetPacket (client, packet) {
140✔
511
    const result = await this.#cl.incoming.findOne({
4✔
512
      clientId: client.id,
4✔
513
      'packet.messageId': packet.messageId
4✔
514
    })
4✔
515

4✔
516
    if (!result) {
4✔
517
      throw new Error(`packet not found for: ${client}`)
2✔
518
    }
2✔
519

2✔
520
    return asPacket(result)
2✔
521
  }
4✔
522

140✔
523
  async incomingDelPacket (client, packet) {
140✔
524
    await this.#cl.incoming.deleteOne({
2✔
525
      clientId: client.id,
2✔
526
      'packet.messageId': packet.messageId
2✔
527
    })
2✔
528
  }
2✔
529

140✔
530
  async putWill (client, packet) {
140✔
531
    packet.clientId = client.id
12✔
532
    packet.brokerId = this.#broker.id
12✔
533
    await this.#cl.will.insertOne({
12✔
534
      clientId: client.id,
12✔
535
      packet: decoratePacket(packet, this.#opts.ttl.packets)
12✔
536
    })
12✔
537
  }
12✔
538

140✔
539
  async getWill (client) {
140✔
540
    const result = await this.#cl.will.findOne({
4✔
541
      clientId: client.id
4✔
542
    })
4✔
543
    if (!result) {
4✔
544
      return null // packet not found
2✔
545
    }
2✔
546
    return asPacket(result)
2✔
547
  }
4✔
548

140✔
549
  async delWill (client) {
140✔
550
    const result = await this.#cl.will.findOneAndDelete({
8✔
551
      clientId: client.id
8✔
552
    })
8✔
553
    if (!result) {
8!
UNCOV
554
      return null // packet not found
×
555
    }
×
556
    return asPacket(result)
8✔
557
  }
8✔
558

140✔
559
  async * streamWill (brokers) {
140✔
560
    const filter = {}
4✔
561

4✔
562
    if (brokers) {
4✔
563
      filter['packet.brokerId'] = { $nin: Object.keys(brokers) }
2✔
564
    }
2✔
565
    for await (const will of this.#cl.will.find(filter)) {
4✔
566
      yield asPacket(will)
4✔
567
    }
4✔
568
  }
4✔
569

140✔
570
  async * getClientList (topic) {
140✔
571
    const filter = {}
4✔
572
    if (topic) {
4✔
573
      filter.topic = topic
4✔
574
    }
4✔
575
    for await (const sub of this.#cl.subscriptions.find(filter)) {
4✔
576
      yield sub.clientId
6✔
577
    }
6✔
578
  }
4✔
579
}
140✔
580

4✔
581
function decoratePacket (packet, setTTL) {
2,542✔
582
  if (setTTL) {
2,542✔
583
    packet.added = new Date()
10✔
584
  }
10✔
585
  return packet
2,542✔
586
}
2,542✔
587

4✔
588
function decorateSubscription (sub, opts) {
114✔
589
  if (opts.ttl.subscriptions) {
114✔
590
    sub.added = new Date()
2✔
591
  }
2✔
592
  return sub
114✔
593
}
114✔
594

4✔
595
function asPacket (obj) {
562✔
596
  const packet = obj?.packet || obj
562✔
597
  if (!packet) {
562!
UNCOV
598
    throw new Error('Invalid packet')
×
599
  }
×
600
  if (Buffer.isBuffer(packet?.payload?.buffer)) {
562✔
601
    packet.payload = packet.payload.buffer
558✔
602
  }
558✔
603
  return packet
562✔
604
}
562✔
605

4✔
606
async function updateWithMessageId (db, client, packet) {
24✔
607
  await db.outgoing.updateOne({
24✔
608
    clientId: client.id,
24✔
609
    'packet.brokerCounter': packet.brokerCounter,
24✔
610
    'packet.brokerId': packet.brokerId
24✔
611
  }, {
24✔
612
    $set: {
24✔
613
      'packet.messageId': packet.messageId
24✔
614
    }
24✔
615
  })
24✔
616
}
24✔
617

4✔
618
async function updatePacket (db, client, packet) {
2✔
619
  await db.outgoing.updateOne({
2✔
620
    clientId: client.id,
2✔
621
    'packet.messageId': packet.messageId
2✔
622
  }, {
2✔
623
    $set: {
2✔
624
      clientId: client.id,
2✔
625
      packet
2✔
626
    }
2✔
627
  })
2✔
628
}
2✔
629

4✔
630
function promiseWithResolvers () {
2,498✔
631
  // this can be replaced by Promise.withResolvers()in NodeJS >= 22
2,498✔
632
  let res
2,498✔
633
  let rej
2,498✔
634
  const promise = new Promise((resolve, reject) => {
2,498✔
635
    res = resolve
2,498✔
636
    rej = reject
2,498✔
637
  })
2,498✔
638
  return { promise, resolve: res, reject: rej }
2,498✔
639
}
2,498✔
640

4✔
641
module.exports = AsyncMongoPersistence
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc