• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moscajs / aedes-persistence-mongodb / 21433832162

28 Jan 2026 10:05AM UTC coverage: 97.574% (+0.2%) from 97.406%
21433832162

Pull #89

github

web-flow
Merge e1669f7db into 1459a4f24
Pull Request #89: fix: prevent "regular expression is too large" mongodb error when fetching retained packets

107 of 114 branches covered (93.86%)

Branch coverage included in aggregate %.

137 of 138 new or added lines in 1 file covered. (99.28%)

10 existing lines in 1 file now uncovered.

617 of 628 relevant lines covered (98.25%)

321.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.54
/asyncPersistence.js
1
'use strict'
4✔
2

4✔
3
const regEscape = require('escape-string-regexp')
4✔
4
const Packet = require('aedes-persistence').Packet
4✔
5
const BroadcastPersistence = require('aedes-persistence/broadcastPersistence.js')
4✔
6
const { MongoClient } = require('mongodb')
4✔
7
const { Qlobber } = require('qlobber')
4✔
8
const QlobberSub = require('qlobber/aedes/qlobber-sub')
4✔
9
// Topic-matching settings passed to every Qlobber / QlobberSub instance
// created in this file: '/' separates levels, '+' matches a single level
// and '#' matches any number of levels.
const QLOBBER_OPTIONS = {
  match_empty_levels: true,
  separator: '/',
  wildcard_one: '+',
  wildcard_some: '#'
}
4✔
15

4✔
16
class AsyncMongoPersistence {
72✔
17
  // private class members start with #
140✔
18
  #trie
140✔
19
  #destroyed
140✔
20
  #broker
140✔
21
  #opts
140✔
22
  #db
140✔
23
  #mongoDBclient
140✔
24
  #cl
140✔
25
  #broadcast
140✔
26
  #retainedBulkQueue
140✔
27
  #executing
140✔
28

140✔
29
  constructor (opts = {}) {
140✔
30
    this.#trie = new QlobberSub(QLOBBER_OPTIONS) // used to match packets
140✔
31
    opts.ttl = opts.ttl || {}
140✔
32

140✔
33
    if (typeof opts.ttl.packets === 'number') {
140✔
34
      const ttl = opts.ttl.packets
10✔
35
      opts.ttl.packets = {
10✔
36
        retained: ttl,
10✔
37
        will: ttl,
10✔
38
        outgoing: ttl,
10✔
39
        incoming: ttl
10✔
40
      }
10✔
41
    }
10✔
42

140✔
43
    this.#opts = opts
140✔
44
    this.#db = null
140✔
45
    this.#cl = null
140✔
46
    this.#destroyed = false
140✔
47
    this.#retainedBulkQueue = [] // used for storing retained packets with ordered bulks
140✔
48
    this.#executing = false // used as lock while a bulk is executing
140✔
49
  }
140✔
50

140✔
51
  #addTTLIndexes (indexes) {
140✔
52
    const addTTLIndex = (collection, key, expireAfterSeconds) => {
140✔
53
      if (expireAfterSeconds >= 0) {
52✔
54
        indexes.push({ collection, key, name: 'ttl', expireAfterSeconds })
52✔
55
      }
52✔
56
    }
52✔
57

140✔
58
    if (this.#opts.ttl.subscriptions >= 0) {
140✔
59
      addTTLIndex(
12✔
60
        'subscriptions',
12✔
61
        this.#opts.ttlAfterDisconnected ? 'disconnected' : 'added',
12✔
62
        this.#opts.ttl.subscriptions
12✔
63
      )
12✔
64
    }
12✔
65

140✔
66
    if (this.#opts.ttl.packets) {
140✔
67
      addTTLIndex('retained', 'added', this.#opts.ttl.packets.retained)
10✔
68
      addTTLIndex('will', 'packet.added', this.#opts.ttl.packets.will)
10✔
69
      addTTLIndex('outgoing', 'packet.added', this.#opts.ttl.packets.outgoing)
10✔
70
      addTTLIndex('incoming', 'packet.added', this.#opts.ttl.packets.incoming)
10✔
71
    }
10✔
72
  }
140✔
73

140✔
74
  // access #broker, only for testing
140✔
75
  get broker () {
140✔
76
    return this.#broker
4,182✔
77
  }
4,182✔
78

140✔
79
  // access #db, only for testing
140✔
80
  get _db () {
140✔
81
    return this.#db
14✔
82
  }
14✔
83

140✔
84
  // access #mongoDBclient, only for testing
140✔
85
  get _mongoDBclient () {
140✔
86
    return this.#mongoDBclient
2✔
87
  }
2✔
88

140✔
89
  // setup is called by aedes-persistence/callbackPersistence.js
140✔
90
  async setup (broker) {
140✔
91
    this.#broker = broker
140✔
92

140✔
93
    // database already connected
140✔
94
    if (this.#db) {
140!
UNCOV
95
      return
×
UNCOV
96
    }
×
97

140✔
98
    // database already provided in the options
140✔
99
    if (this.#opts.db) {
140✔
100
      this.#db = this.#opts.db
8✔
101
    } else {
140✔
102
      // connect to the database
132✔
103
      const conn = this.#opts.url || 'mongodb://127.0.0.1/aedes'
132!
104
      const options = this.#opts.mongoOptions
132✔
105

132✔
106
      const mongoDBclient = new MongoClient(conn, options)
132✔
107
      this.#mongoDBclient = mongoDBclient
132✔
108
      const urlParsed = URL.parse(this.#opts.url)
132✔
109
      // skip the first / of the pathname if it exists
132✔
110
      const pathname = urlParsed.pathname ? urlParsed.pathname.substring(1) : undefined
132!
111
      const databaseName = this.#opts.database || pathname
132✔
112
      this.#db = mongoDBclient.db(databaseName)
132✔
113
    }
132✔
114

140✔
115
    const collectionPrefix = `${this.#opts.collectionPrefix || ''}`
140✔
116

140✔
117
    const db = this.#db
140✔
118
    const subscriptions = db.collection(`${collectionPrefix}subscriptions`)
140✔
119
    const retained = db.collection(`${collectionPrefix}retained`)
140✔
120
    const will = db.collection(`${collectionPrefix}will`)
140✔
121
    const outgoing = db.collection(`${collectionPrefix}outgoing`)
140✔
122
    const incoming = db.collection(`${collectionPrefix}incoming`)
140✔
123
    this.#cl = {
140✔
124
      subscriptions,
140✔
125
      retained,
140✔
126
      will,
140✔
127
      outgoing,
140✔
128
      incoming
140✔
129
    }
140✔
130

140✔
131
    // drop existing TTL indexes (if exist)
140✔
132
    if (this.#opts.dropExistingIndexes) {
140✔
133
      const collections = await db.collections()
2✔
134
      for (const collection of collections) {
2✔
135
        const exists = await collection.indexExists('ttl')
12✔
136
        if (exists) {
12✔
137
          await collection.dropIndex('ttl')
10✔
138
        }
10✔
139
      }
12✔
140
    }
2✔
141

140✔
142
    // create indexes
140✔
143
    const createIndex = async (idx) => {
140✔
144
      const indexOpts = { name: idx.name }
612✔
145
      if (typeof idx.expireAfterSeconds === 'number') {
612✔
146
        indexOpts.expireAfterSeconds = idx.expireAfterSeconds
52✔
147
      }
52✔
148
      await this.#cl[idx.collection].createIndex(idx.key, indexOpts)
612✔
149
    }
612✔
150

140✔
151
    const indexes = [
140✔
152
      {
140✔
153
        collection: 'outgoing',
140✔
154
        key: { clientId: 1, 'packet.brokerId': 1, 'packet.brokerCounter': 1 },
140✔
155
        name: 'query_clientId_brokerId'
140✔
156
      },
140✔
157
      {
140✔
158
        collection: 'outgoing',
140✔
159
        key: { clientId: 1, 'packet.messageId': 1 },
140✔
160
        name: 'query_clientId_messageId'
140✔
161
      },
140✔
162
      {
140✔
163
        collection: 'incoming',
140✔
164
        key: { clientId: 1, 'packet.brokerId': 1, 'packet.brokerCounter': 1 },
140✔
165
        name: 'query_clientId_brokerId'
140✔
166
      },
140✔
167
      {
140✔
168
        collection: 'incoming',
140✔
169
        key: { clientId: 1, 'packet.messageId': 1 },
140✔
170
        name: 'query_clientId_messageId'
140✔
171
      }
140✔
172
    ]
140✔
173

140✔
174
    // Add TTL indexes
140✔
175
    this.#addTTLIndexes(indexes)
140✔
176
    // create all indexes in parallel
140✔
177
    await Promise.all(indexes.map(createIndex))
140✔
178

140✔
179
    if (this.#opts.ttlAfterDisconnected) {
140✔
180
      // To avoid stale subscriptions that might be left behind by broker shutting
4✔
181
      // down while clients were connected, set all to disconnected on startup.
4✔
182
      await this.#cl.subscriptions.updateMany({ disconnected: { $exists: false } }, { $currentDate: { disconnected: true } })
4✔
183

4✔
184
      // Handlers for setting and clearing the disconnected timestamp on subscriptions
4✔
185
      this.#broker.on('clientReady', (client) => {
4✔
UNCOV
186
        this.#cl.subscriptions.updateMany({ clientId: client.id }, { $unset: { disconnected: true } })
×
187
      })
4✔
188
      this.#broker.on('clientDisconnect', (client) => {
4✔
189
        this.#cl.subscriptions.updateMany({ clientId: client.id }, { $currentDate: { disconnected: true } })
2✔
190
      })
4✔
191
    }
4✔
192

140✔
193
    // add subscriptions to Trie
140✔
194
    for await (const subscription of subscriptions.find({
140✔
195
      qos: { $gte: 0 }
140✔
196
    })) {
140✔
197
      this.#trie.add(subscription.topic, subscription)
2✔
198
    }
2✔
199
    // subscribe to the broker for subscription updates
140✔
200
    this.#broadcast = new BroadcastPersistence(broker, this.#trie)
140✔
201
    await this.#broadcast.brokerSubscribe()
140✔
202
    // setup is done
140✔
203
  }
140✔
204

140✔
205
  async processRetainedBulk () {
140✔
206
    if (!this.#executing && !this.#destroyed && this.#retainedBulkQueue.length > 0) {
4,994✔
207
      this.#executing = true
2,496✔
208
      const operations = []
2,496✔
209
      const onEnd = []
2,496✔
210

2,496✔
211
      while (this.#retainedBulkQueue.length) {
2,496✔
212
        const { operation, resolve } = this.#retainedBulkQueue.shift()
2,496✔
213
        operations.push(operation)
2,496✔
214
        onEnd.push(resolve)
2,496✔
215
      }
2,496✔
216
      // execute operations and ignore the error
2,496✔
217
      await this.#cl.retained.bulkWrite(operations).catch(() => {})
2,496✔
218
      // resolve all promises
2,496✔
219
      while (onEnd.length) onEnd.shift().call()
2,496✔
220
      // check if we have new packets in queue
2,496✔
221
      this.#executing = false
2,496✔
222
      // do not await as we run this in background and ignore errors
2,496✔
223
      this.processRetainedBulk()
2,496✔
224
    }
2,496✔
225
    if (this.#destroyed) {
4,994✔
226
      // cleanup dangling promises
2✔
227
      while (this.#retainedBulkQueue.length) {
2✔
228
        const { resolve } = this.#retainedBulkQueue.shift()
2✔
229
        resolve() // resolve all promises
2✔
230
      }
2✔
231
    }
2✔
232
  }
4,994✔
233

140✔
234
  async storeRetained (packet) {
140✔
235
    const { promise, resolve } = promiseWithResolvers()
2,498✔
236
    const queue = this.#retainedBulkQueue
2,498✔
237
    const filter = { topic: packet.topic }
2,498✔
238

2,498✔
239
    if (packet.payload.length > 0) {
2,498✔
240
      queue.push({
2,496✔
241
        operation: {
2,496✔
242
          updateOne: {
2,496✔
243
            filter,
2,496✔
244
            update: { $set: decoratePacket(packet, this.#opts.ttl.packets) },
2,496✔
245
            upsert: true
2,496✔
246
          }
2,496✔
247
        },
2,496✔
248
        resolve
2,496✔
249
      })
2,496✔
250
    } else {
2,498✔
251
      queue.push({
2✔
252
        operation: {
2✔
253
          deleteOne: {
2✔
254
            filter
2✔
255
          }
2✔
256
        },
2✔
257
        resolve
2✔
258
      })
2✔
259
    }
2✔
260
    this.processRetainedBulk()
2,498✔
261
    return promise
2,498✔
262
  }
2,498✔
263

140✔
264
  createRetainedStream (pattern) {
140✔
265
    return this.createRetainedStreamCombi([pattern])
20✔
266
  }
20✔
267

140✔
268
  async * createRetainedStreamCombi (patterns) {
140✔
269
    const matcher = new Qlobber(QLOBBER_OPTIONS)
36✔
270

36✔
271
    for (let i = 0; i < patterns.length; i++) {
36✔
272
      matcher.add(patterns[i], true)
610✔
273
    }
610✔
274

36✔
275
    // To avoid MongoDB "regular expression is too large" errors,
36✔
276
    // we need to batch patterns when they're numerous or long.
36✔
277
    // MongoDB has a BSON document size limit of ~16MB, but regex patterns
36✔
278
    // can hit practical limits around 32KB depending on the driver.
36✔
279
    const MAX_PATTERNS_PER_BATCH = 50
36✔
280
    const MAX_TOTAL_PATTERN_LENGTH = 5000
36✔
281

36✔
282
    // Calculate total pattern length
36✔
283
    const totalLength = patterns.reduce((sum, p) => sum + p.length, 0)
36✔
284

36✔
285
    // Determine if we need to batch
36✔
286
    const needsBatching =
36✔
287
      patterns.length > MAX_PATTERNS_PER_BATCH ||
36✔
288
      totalLength > MAX_TOTAL_PATTERN_LENGTH
30✔
289

36✔
290
    if (needsBatching) {
36✔
291
      // Process patterns in batches to avoid creating regex that's too large
8✔
292
      // Use dynamic batching based on cumulative length
8✔
293
      const seenTopics = new Set() // Track yielded packets to avoid duplicates
8✔
294
      const batches = []
8✔
295
      let currentBatch = []
8✔
296
      let currentLength = 0
8✔
297

8✔
298
      for (const pattern of patterns) {
8✔
299
        const patternLength = pattern.length
500✔
300
        // Start a new batch if adding this pattern would exceed limits
500✔
301
        if (currentBatch.length >= MAX_PATTERNS_PER_BATCH ||
500✔
302
            (currentLength + patternLength > MAX_TOTAL_PATTERN_LENGTH && currentBatch.length > 0)) {
500✔
303
          batches.push(currentBatch)
8✔
304
          currentBatch = []
8✔
305
          currentLength = 0
8✔
306
        }
8✔
307
        currentBatch.push(pattern)
500✔
308
        currentLength += patternLength
500✔
309
      }
500✔
310
      // Add the last batch if not empty
8✔
311
      if (currentBatch.length > 0) {
8✔
312
        batches.push(currentBatch)
8✔
313
      }
8✔
314

8✔
315
      for (const batch of batches) {
8✔
316
        for await (const packet of this.#queryRetainedByPatterns(batch, matcher)) {
16✔
317
          // Avoid duplicates across batches
404✔
318
          if (!seenTopics.has(packet.topic)) {
404✔
319
            seenTopics.add(packet.topic)
382✔
320
            yield packet
382✔
321
          }
382✔
322
        }
404✔
323
      }
16✔
324
    } else {
36✔
325
      // Original logic for small pattern sets
28✔
326
      for await (const packet of this.#queryRetainedByPatterns(patterns, matcher)) {
28✔
327
        yield packet
102✔
328
      }
102✔
329
    }
28✔
330
  }
36✔
331

140✔
332
  async * #queryRetainedByPatterns (patterns, matcher) {
140✔
333
    // Early return for empty patterns to avoid creating an empty regex
44✔
334
    // that would match all documents in the collection
44✔
335
    if (patterns.length === 0) {
44✔
336
      return
2✔
337
    }
2✔
338

42✔
339
    const regexes = patterns.map(pattern =>
42✔
340
      regEscape(pattern).replace(/(\/*#|\\\+).*$/, '')
610✔
341
    )
42✔
342

42✔
343
    const topic = new RegExp(regexes.join('|'))
42✔
344
    const filter = { topic }
42✔
345
    const exclude = { _id: 0 }
42✔
346

42✔
347
    for await (const result of this.#cl.retained.find(filter).project(exclude)) {
44✔
348
      const packet = asPacket(result)
506✔
349
      if (matcher.match(packet.topic).length > 0) {
506✔
350
        yield packet
506✔
351
      }
506✔
352
    }
506✔
353
  }
44✔
354

140✔
355
  async addSubscriptions (client, subs) {
140✔
356
    const subscriptions = []
64✔
357
    const operations = subs.map(sub => {
64✔
358
      const subscription = { ...sub, clientId: client.id }
114✔
359
      subscriptions.push(subscription)
114✔
360
      return {
114✔
361
        updateOne: {
114✔
362
          filter: {
114✔
363
            clientId: client.id,
114✔
364
            topic: sub.topic
114✔
365
          },
114✔
366
          update: {
114✔
367
            $set: decorateSubscription(subscription, this.#opts)
114✔
368
          },
114✔
369
          upsert: true
114✔
370
        }
114✔
371
      }
114✔
372
    })
64✔
373

64✔
374
    await this.#cl.subscriptions.bulkWrite(operations)
64✔
375
    // inform the broker
64✔
376
    await this.#broadcast.addedSubscriptions(client, subs)
64✔
377
  }
64✔
378

140✔
379
  async removeSubscriptions (client, subs) {
140✔
380
    const operations = subs.map(topic => ({
32✔
381
      deleteOne: {
34✔
382
        filter: {
34✔
383
          clientId: client.id,
34✔
384
          topic
34✔
385
        }
34✔
386
      }
34✔
387
    }))
32✔
388
    await this.#cl.subscriptions.bulkWrite(operations)
32✔
389
    // inform the broker
32✔
390
    await this.#broadcast.removedSubscriptions(client, subs)
32✔
391
  }
32✔
392

140✔
393
  async subscriptionsByClient (client) {
140✔
394
    const filter = { clientId: client.id }
32✔
395
    const exclude = { clientId: false, _id: false } // exclude these fields
32✔
396
    const subs = await this.#cl.subscriptions.find(filter).project(exclude).toArray()
32✔
397
    return subs
32✔
398
  }
32✔
399

140✔
400
  async countOffline () {
140✔
401
    const subsCount = this.#trie.subscriptionsCount
44✔
402
    const result = await this.#cl.subscriptions.aggregate([
44✔
403
      {
44✔
404
        $group: {
44✔
405
          _id: '$clientId'
44✔
406
        }
44✔
407
      }, {
44✔
408
        $count: 'clientsCount'
44✔
409
      }]).toArray()
44✔
410
    const clientsCount = result[0]?.clientsCount || 0
44✔
411
    return { subsCount, clientsCount }
44✔
412
  }
44✔
413

140✔
414
  async destroy () {
140✔
415
    if (this.#destroyed) {
140!
UNCOV
416
      throw new Error('destroyed called twice!')
×
UNCOV
417
    }
×
418
    this.#destroyed = true
140✔
419
    // stop listening to subscription updates
140✔
420
    await this.#broadcast.brokerUnsubscribe()
140✔
421

140✔
422
    if (this.#opts.db) {
140✔
423
      return
8✔
424
    }
8✔
425
    await this.#mongoDBclient.close()
132✔
426
  }
140✔
427

140✔
428
  async subscriptionsByTopic (topic) {
140✔
429
    return this.#trie.match(topic)
34✔
430
  }
34✔
431

140✔
432
  async cleanSubscriptions (client) {
140✔
433
    const subs = await this.subscriptionsByClient(client)
4✔
434
    if (subs.length > 0) {
4✔
435
      const remSubs = subs.map(sub => sub.topic)
2✔
436
      await this.removeSubscriptions(client, remSubs)
2✔
437
    }
2✔
438
  }
4✔
439

140✔
440
  async outgoingEnqueue (sub, packet) {
140✔
441
    return await this.outgoingEnqueueCombi([sub], packet)
12✔
442
  }
12✔
443

140✔
444
  async outgoingEnqueueCombi (subs, packet) {
140✔
445
    if (subs?.length === 0) {
30!
UNCOV
446
      return packet
×
UNCOV
447
    }
×
448

30✔
449
    const newPacket = new Packet(packet)
30✔
450
    const decoratedPacket = decoratePacket(newPacket, this.#opts.ttl.packets)
30✔
451
    const packets = subs.map(sub => ({
30✔
452
      clientId: sub.clientId,
32✔
453
      packet: decoratedPacket
32✔
454
    }))
30✔
455

30✔
456
    await this.#cl.outgoing.insertMany(packets)
30✔
457
  }
30✔
458

140✔
459
  async * outgoingStream (client) {
140✔
460
    for await (const result of this.#cl.outgoing.find({ clientId: client.id })) {
30✔
461
      yield asPacket(result)
34✔
462
    }
34✔
463
  }
30✔
464

140✔
465
  async outgoingUpdate (client, packet) {
140✔
466
    if (packet.brokerId) {
26✔
467
      await updateWithMessageId(this.#cl, client, packet)
24✔
468
    } else {
26✔
469
      await updatePacket(this.#cl, client, packet)
2✔
470
    }
2✔
471
  }
26✔
472

140✔
473
  async outgoingClearMessageId (client, packet) {
140✔
474
    const result = await this.#cl.outgoing.findOneAndDelete({
8✔
475
      clientId: client.id,
8✔
476
      'packet.messageId': packet.messageId
8✔
477
    })
8✔
478
    return result ? asPacket(result) : null
8✔
479
  }
8✔
480

140✔
481
  async incomingStorePacket (client, packet) {
140✔
482
    const newPacket = new Packet(packet)
4✔
483
    newPacket.messageId = packet.messageId
4✔
484

4✔
485
    await this.#cl.incoming.insertOne({
4✔
486
      clientId: client.id,
4✔
487
      packet: decoratePacket(newPacket, this.#opts.ttl.packets)
4✔
488
    })
4✔
489
  }
4✔
490

140✔
491
  async incomingGetPacket (client, packet) {
140✔
492
    const result = await this.#cl.incoming.findOne({
4✔
493
      clientId: client.id,
4✔
494
      'packet.messageId': packet.messageId
4✔
495
    })
4✔
496

4✔
497
    if (!result) {
4✔
498
      throw new Error(`packet not found for: ${client}`)
2✔
499
    }
2✔
500

2✔
501
    return asPacket(result)
2✔
502
  }
4✔
503

140✔
504
  async incomingDelPacket (client, packet) {
140✔
505
    await this.#cl.incoming.deleteOne({
2✔
506
      clientId: client.id,
2✔
507
      'packet.messageId': packet.messageId
2✔
508
    })
2✔
509
  }
2✔
510

140✔
511
  async putWill (client, packet) {
140✔
512
    packet.clientId = client.id
12✔
513
    packet.brokerId = this.#broker.id
12✔
514
    await this.#cl.will.insertOne({
12✔
515
      clientId: client.id,
12✔
516
      packet: decoratePacket(packet, this.#opts.ttl.packets)
12✔
517
    })
12✔
518
  }
12✔
519

140✔
520
  async getWill (client) {
140✔
521
    const result = await this.#cl.will.findOne({
4✔
522
      clientId: client.id
4✔
523
    })
4✔
524
    if (!result) {
4✔
525
      return null // packet not found
2✔
526
    }
2✔
527
    return asPacket(result)
2✔
528
  }
4✔
529

140✔
530
  async delWill (client) {
140✔
531
    const result = await this.#cl.will.findOneAndDelete({
8✔
532
      clientId: client.id
8✔
533
    })
8✔
534
    if (!result) {
8!
UNCOV
535
      return null // packet not found
×
NEW
536
    }
×
537
    return asPacket(result)
8✔
538
  }
8✔
539

140✔
540
  async * streamWill (brokers) {
140✔
541
    const filter = {}
4✔
542

4✔
543
    if (brokers) {
4✔
544
      filter['packet.brokerId'] = { $nin: Object.keys(brokers) }
2✔
545
    }
2✔
546
    for await (const will of this.#cl.will.find(filter)) {
4✔
547
      yield asPacket(will)
4✔
548
    }
4✔
549
  }
4✔
550

140✔
551
  async * getClientList (topic) {
140✔
552
    const filter = {}
4✔
553
    if (topic) {
4✔
554
      filter.topic = topic
4✔
555
    }
4✔
556
    for await (const sub of this.#cl.subscriptions.find(filter)) {
4✔
557
      yield sub.clientId
6✔
558
    }
6✔
559
  }
4✔
560
}
140✔
561

4✔
562
/**
 * Stamps `packet.added` with the current time when a packet TTL is configured.
 *
 * @param {object} packet - packet to stamp, modified in place
 * @param {*} setTTL - any truthy value enables the timestamp
 * @returns {object} the same `packet` object
 */
function decoratePacket (packet, setTTL) {
  if (!setTTL) {
    return packet
  }
  packet.added = new Date()
  return packet
}
2,542✔
568

4✔
569
/**
 * Stamps `sub.added` with the current time when a subscription TTL is set.
 *
 * @param {object} sub - subscription to stamp, modified in place
 * @param {object} opts - options; only `opts.ttl.subscriptions` is read
 * @returns {object} the same `sub` object
 */
function decorateSubscription (sub, opts) {
  // truthiness check: a TTL of 0 counts as "not set" here
  const hasTTL = Boolean(opts.ttl.subscriptions)
  if (hasTTL) {
    sub.added = new Date()
  }
  return sub
}
114✔
575

4✔
576
/**
 * Extracts the packet from a database result.
 *
 * @param {object} obj - either a wrapper with a `packet` property or the
 *   packet itself
 * @returns {object} the packet; when `payload.buffer` is a Buffer the payload
 *   is replaced by that Buffer (in place)
 * @throws {Error} 'Invalid packet' when `obj` is falsy
 */
function asPacket (obj) {
  const packet = obj?.packet || obj
  if (!packet) {
    throw new Error('Invalid packet')
  }
  // unwrap payloads that carry the actual Buffer in a `buffer` property
  const inner = packet.payload?.buffer
  if (Buffer.isBuffer(inner)) {
    packet.payload = inner
  }
  return packet
}
562✔
586

4✔
587
/**
 * Sets the message id on the outgoing packet identified by client id,
 * broker id and broker counter.
 *
 * @param {object} db - object exposing the `outgoing` collection
 * @param {object} client - client with an `id`
 * @param {object} packet - supplies `brokerCounter`, `brokerId`, `messageId`
 * @returns {Promise<void>}
 */
async function updateWithMessageId (db, client, packet) {
  const { brokerCounter, brokerId, messageId } = packet
  const selector = {
    clientId: client.id,
    'packet.brokerCounter': brokerCounter,
    'packet.brokerId': brokerId
  }
  const change = { $set: { 'packet.messageId': messageId } }
  await db.outgoing.updateOne(selector, change)
}
24✔
598

4✔
599
/**
 * Replaces the outgoing packet identified by client id and message id.
 *
 * @param {object} db - object exposing the `outgoing` collection
 * @param {object} client - client with an `id`
 * @param {object} packet - new packet; its `messageId` selects the document
 * @returns {Promise<void>}
 */
async function updatePacket (db, client, packet) {
  const clientId = client.id
  const selector = { clientId, 'packet.messageId': packet.messageId }
  const change = { $set: { clientId, packet } }
  await db.outgoing.updateOne(selector, change)
}
2✔
610

4✔
611
/**
 * Creates a promise together with the functions that settle it.
 *
 * @returns {{promise: Promise<*>, resolve: Function, reject: Function}}
 */
function promiseWithResolvers () {
  // this can be replaced by Promise.withResolvers() in NodeJS >= 22
  const settlers = {}
  const promise = new Promise((resolve, reject) => {
    // the executor runs synchronously, so both are set before we return
    Object.assign(settlers, { resolve, reject })
  })
  return { promise, ...settlers }
}
2,498✔
621

4✔
622
module.exports = AsyncMongoPersistence
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc