• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

moscajs / aedes-persistence-mongodb / 21433171523

28 Jan 2026 09:44AM UTC coverage: 95.184% (-2.2%) from 97.406%
21433171523

Pull #89

github

web-flow
Merge b78b15f90 into 1459a4f24
Pull Request #89: fix: prevent "regular expression is too large" mongodb error when fetching retained packets

94 of 102 branches covered (92.16%)

Branch coverage included in aggregate %.

78 of 93 new or added lines in 1 file covered. (83.87%)

12 existing lines in 1 file now uncovered.

578 of 604 relevant lines covered (95.7%)

261.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.12
/asyncPersistence.js
1
'use strict'
4✔
2

4✔
3
const regEscape = require('escape-string-regexp')
4✔
4
const Packet = require('aedes-persistence').Packet
4✔
5
const BroadcastPersistence = require('aedes-persistence/broadcastPersistence.js')
4✔
6
const { MongoClient } = require('mongodb')
4✔
7
const { Qlobber } = require('qlobber')
4✔
8
const QlobberSub = require('qlobber/aedes/qlobber-sub')
4✔
9
const QLOBBER_OPTIONS = {
4✔
10
  separator: '/',
4✔
11
  wildcard_one: '+',
4✔
12
  wildcard_some: '#',
4✔
13
  match_empty_levels: true
4✔
14
}
4✔
15

4✔
16
class AsyncMongoPersistence {
66✔
17
  // private class members start with #
128✔
18
  #trie
128✔
19
  #destroyed
128✔
20
  #broker
128✔
21
  #opts
128✔
22
  #db
128✔
23
  #mongoDBclient
128✔
24
  #cl
128✔
25
  #broadcast
128✔
26
  #retainedBulkQueue
128✔
27
  #executing
128✔
28

128✔
29
  constructor (opts = {}) {
128✔
30
    this.#trie = new QlobberSub(QLOBBER_OPTIONS) // used to match packets
128✔
31
    opts.ttl = opts.ttl || {}
128✔
32

128✔
33
    if (typeof opts.ttl.packets === 'number') {
128✔
34
      const ttl = opts.ttl.packets
10✔
35
      opts.ttl.packets = {
10✔
36
        retained: ttl,
10✔
37
        will: ttl,
10✔
38
        outgoing: ttl,
10✔
39
        incoming: ttl
10✔
40
      }
10✔
41
    }
10✔
42

128✔
43
    this.#opts = opts
128✔
44
    this.#db = null
128✔
45
    this.#cl = null
128✔
46
    this.#destroyed = false
128✔
47
    this.#retainedBulkQueue = [] // used for storing retained packets with ordered bulks
128✔
48
    this.#executing = false // used as lock while a bulk is executing
128✔
49
  }
128✔
50

128✔
51
  #addTTLIndexes (indexes) {
128✔
52
    const addTTLIndex = (collection, key, expireAfterSeconds) => {
128✔
53
      if (expireAfterSeconds >= 0) {
52✔
54
        indexes.push({ collection, key, name: 'ttl', expireAfterSeconds })
52✔
55
      }
52✔
56
    }
52✔
57

128✔
58
    if (this.#opts.ttl.subscriptions >= 0) {
128✔
59
      addTTLIndex(
12✔
60
        'subscriptions',
12✔
61
        this.#opts.ttlAfterDisconnected ? 'disconnected' : 'added',
12✔
62
        this.#opts.ttl.subscriptions
12✔
63
      )
12✔
64
    }
12✔
65

128✔
66
    if (this.#opts.ttl.packets) {
128✔
67
      addTTLIndex('retained', 'added', this.#opts.ttl.packets.retained)
10✔
68
      addTTLIndex('will', 'packet.added', this.#opts.ttl.packets.will)
10✔
69
      addTTLIndex('outgoing', 'packet.added', this.#opts.ttl.packets.outgoing)
10✔
70
      addTTLIndex('incoming', 'packet.added', this.#opts.ttl.packets.incoming)
10✔
71
    }
10✔
72
  }
128✔
73

128✔
74
  // access #broker, only for testing
128✔
75
  get broker () {
128✔
76
    return this.#broker
4,182✔
77
  }
4,182✔
78

128✔
79
  // access #db, only for testing
128✔
80
  get _db () {
128✔
81
    return this.#db
14✔
82
  }
14✔
83

128✔
84
  // access #mongoDBclient, only for testing
128✔
85
  get _mongoDBclient () {
128✔
86
    return this.#mongoDBclient
2✔
87
  }
2✔
88

128✔
89
  // setup is called by aedes-persistence/callbackPersistence.js
128✔
90
  async setup (broker) {
128✔
91
    this.#broker = broker
128✔
92

128✔
93
    // database already connected
128✔
94
    if (this.#db) {
128!
UNCOV
95
      return
×
UNCOV
96
    }
×
97

128✔
98
    // database already provided in the options
128✔
99
    if (this.#opts.db) {
128✔
100
      this.#db = this.#opts.db
8✔
101
    } else {
128✔
102
      // connect to the database
120✔
103
      const conn = this.#opts.url || 'mongodb://127.0.0.1/aedes'
120!
104
      const options = this.#opts.mongoOptions
120✔
105

120✔
106
      const mongoDBclient = new MongoClient(conn, options)
120✔
107
      this.#mongoDBclient = mongoDBclient
120✔
108
      const urlParsed = URL.parse(this.#opts.url)
120✔
109
      // skip the first / of the pathname if it exists
120✔
110
      const pathname = urlParsed.pathname ? urlParsed.pathname.substring(1) : undefined
120!
111
      const databaseName = this.#opts.database || pathname
120✔
112
      this.#db = mongoDBclient.db(databaseName)
120✔
113
    }
120✔
114

128✔
115
    const collectionPrefix = `${this.#opts.collectionPrefix || ''}`
128✔
116

128✔
117
    const db = this.#db
128✔
118
    const subscriptions = db.collection(`${collectionPrefix}subscriptions`)
128✔
119
    const retained = db.collection(`${collectionPrefix}retained`)
128✔
120
    const will = db.collection(`${collectionPrefix}will`)
128✔
121
    const outgoing = db.collection(`${collectionPrefix}outgoing`)
128✔
122
    const incoming = db.collection(`${collectionPrefix}incoming`)
128✔
123
    this.#cl = {
128✔
124
      subscriptions,
128✔
125
      retained,
128✔
126
      will,
128✔
127
      outgoing,
128✔
128
      incoming
128✔
129
    }
128✔
130

128✔
131
    // drop existing TTL indexes (if exist)
128✔
132
    if (this.#opts.dropExistingIndexes) {
128✔
133
      const collections = await db.collections()
2✔
134
      for (const collection of collections) {
2✔
135
        const exists = await collection.indexExists('ttl')
12✔
136
        if (exists) {
12✔
137
          await collection.dropIndex('ttl')
10✔
138
        }
10✔
139
      }
12✔
140
    }
2✔
141

128✔
142
    // create indexes
128✔
143
    const createIndex = async (idx) => {
128✔
144
      const indexOpts = { name: idx.name }
564✔
145
      if (typeof idx.expireAfterSeconds === 'number') {
564✔
146
        indexOpts.expireAfterSeconds = idx.expireAfterSeconds
52✔
147
      }
52✔
148
      await this.#cl[idx.collection].createIndex(idx.key, indexOpts)
564✔
149
    }
564✔
150

128✔
151
    const indexes = [
128✔
152
      {
128✔
153
        collection: 'outgoing',
128✔
154
        key: { clientId: 1, 'packet.brokerId': 1, 'packet.brokerCounter': 1 },
128✔
155
        name: 'query_clientId_brokerId'
128✔
156
      },
128✔
157
      {
128✔
158
        collection: 'outgoing',
128✔
159
        key: { clientId: 1, 'packet.messageId': 1 },
128✔
160
        name: 'query_clientId_messageId'
128✔
161
      },
128✔
162
      {
128✔
163
        collection: 'incoming',
128✔
164
        key: { clientId: 1, 'packet.brokerId': 1, 'packet.brokerCounter': 1 },
128✔
165
        name: 'query_clientId_brokerId'
128✔
166
      },
128✔
167
      {
128✔
168
        collection: 'incoming',
128✔
169
        key: { clientId: 1, 'packet.messageId': 1 },
128✔
170
        name: 'query_clientId_messageId'
128✔
171
      }
128✔
172
    ]
128✔
173

128✔
174
    // Add TTL indexes
128✔
175
    this.#addTTLIndexes(indexes)
128✔
176
    // create all indexes in parallel
128✔
177
    await Promise.all(indexes.map(createIndex))
128✔
178

128✔
179
    if (this.#opts.ttlAfterDisconnected) {
128✔
180
      // To avoid stale subscriptions that might be left behind by broker shutting
4✔
181
      // down while clients were connected, set all to disconnected on startup.
4✔
182
      await this.#cl.subscriptions.updateMany({ disconnected: { $exists: false } }, { $currentDate: { disconnected: true } })
4✔
183

4✔
184
      // Handlers for setting and clearing the disconnected timestamp on subscriptions
4✔
185
      this.#broker.on('clientReady', (client) => {
4✔
UNCOV
186
        this.#cl.subscriptions.updateMany({ clientId: client.id }, { $unset: { disconnected: true } })
×
187
      })
4✔
188
      this.#broker.on('clientDisconnect', (client) => {
4✔
189
        this.#cl.subscriptions.updateMany({ clientId: client.id }, { $currentDate: { disconnected: true } })
2✔
190
      })
4✔
191
    }
4✔
192

128✔
193
    // add subscriptions to Trie
128✔
194
    for await (const subscription of subscriptions.find({
128✔
195
      qos: { $gte: 0 }
128✔
196
    })) {
128✔
197
      this.#trie.add(subscription.topic, subscription)
2✔
198
    }
2✔
199
    // subscribe to the broker for subscription updates
128✔
200
    this.#broadcast = new BroadcastPersistence(broker, this.#trie)
128✔
201
    await this.#broadcast.brokerSubscribe()
128✔
202
    // setup is done
128✔
203
  }
128✔
204

128✔
205
  async processRetainedBulk () {
128✔
206
    if (!this.#executing && !this.#destroyed && this.#retainedBulkQueue.length > 0) {
4,066✔
207
      this.#executing = true
2,032✔
208
      const operations = []
2,032✔
209
      const onEnd = []
2,032✔
210

2,032✔
211
      while (this.#retainedBulkQueue.length) {
2,032✔
212
        const { operation, resolve } = this.#retainedBulkQueue.shift()
2,032✔
213
        operations.push(operation)
2,032✔
214
        onEnd.push(resolve)
2,032✔
215
      }
2,032✔
216
      // execute operations and ignore the error
2,032✔
217
      await this.#cl.retained.bulkWrite(operations).catch(() => {})
2,032✔
218
      // resolve all promises
2,032✔
219
      while (onEnd.length) onEnd.shift().call()
2,032✔
220
      // check if we have new packets in queue
2,032✔
221
      this.#executing = false
2,032✔
222
      // do not await as we run this in background and ignore errors
2,032✔
223
      this.processRetainedBulk()
2,032✔
224
    }
2,032✔
225
    if (this.#destroyed) {
4,066✔
226
      // cleanup dangling promises
2✔
227
      while (this.#retainedBulkQueue.length) {
2✔
228
        const { resolve } = this.#retainedBulkQueue.shift()
2✔
229
        resolve() // resolve all promises
2✔
230
      }
2✔
231
    }
2✔
232
  }
4,066✔
233

128✔
234
  async storeRetained (packet) {
128✔
235
    const { promise, resolve } = promiseWithResolvers()
2,034✔
236
    const queue = this.#retainedBulkQueue
2,034✔
237
    const filter = { topic: packet.topic }
2,034✔
238

2,034✔
239
    if (packet.payload.length > 0) {
2,034✔
240
      queue.push({
2,032✔
241
        operation: {
2,032✔
242
          updateOne: {
2,032✔
243
            filter,
2,032✔
244
            update: { $set: decoratePacket(packet, this.#opts.ttl.packets) },
2,032✔
245
            upsert: true
2,032✔
246
          }
2,032✔
247
        },
2,032✔
248
        resolve
2,032✔
249
      })
2,032✔
250
    } else {
2,034✔
251
      queue.push({
2✔
252
        operation: {
2✔
253
          deleteOne: {
2✔
254
            filter
2✔
255
          }
2✔
256
        },
2✔
257
        resolve
2✔
258
      })
2✔
259
    }
2✔
260
    this.processRetainedBulk()
2,034✔
261
    return promise
2,034✔
262
  }
2,034✔
263

128✔
264
  createRetainedStream (pattern) {
128✔
265
    return this.createRetainedStreamCombi([pattern])
20✔
266
  }
20✔
267

128✔
268
  async * createRetainedStreamCombi (patterns) {
128✔
269
    const matcher = new Qlobber(QLOBBER_OPTIONS)
24✔
270

24✔
271
    for (let i = 0; i < patterns.length; i++) {
24✔
272
      matcher.add(patterns[i], true)
30✔
273
    }
30✔
274

24✔
275
    // To avoid MongoDB "regular expression is too large" errors,
24✔
276
    // we need to batch patterns when they're numerous or long.
24✔
277
    // MongoDB has a BSON document size limit of ~16MB, but regex patterns
24✔
278
    // can hit practical limits around 32KB depending on the driver.
24✔
279
    const MAX_PATTERNS_PER_BATCH = 50
24✔
280
    const MAX_TOTAL_PATTERN_LENGTH = 5000
24✔
281

24✔
282
    // Calculate total pattern length
24✔
283
    const totalLength = patterns.reduce((sum, p) => sum + p.length, 0)
24✔
284

24✔
285
    // Determine if we need to batch
24✔
286
    const needsBatching =
24✔
287
      patterns.length > MAX_PATTERNS_PER_BATCH ||
24✔
288
      totalLength > MAX_TOTAL_PATTERN_LENGTH
24✔
289

24✔
290
    if (needsBatching) {
24!
NEW
UNCOV
291
      // Process patterns in batches to avoid creating regex that's too large
×
NEW
UNCOV
292
      const batchSize = MAX_PATTERNS_PER_BATCH
×
NEW
UNCOV
293
      const seenTopics = new Set() // Track yielded packets to avoid duplicates
×
NEW
UNCOV
294

×
NEW
295
      for (let i = 0; i < patterns.length; i += batchSize) {
×
NEW
296
        const batch = patterns.slice(i, i + batchSize)
×
NEW
297

×
NEW
298
        for await (const packet of this.#queryRetainedByPatterns(batch, matcher)) {
×
NEW
299
          // Avoid duplicates across batches
×
NEW
300
          if (!seenTopics.has(packet.topic)) {
×
NEW
301
            seenTopics.add(packet.topic)
×
NEW
302
            yield packet
×
NEW
303
          }
×
NEW
304
        }
×
NEW
305
      }
×
306
    } else {
24✔
307
      // Original logic for small pattern sets
24✔
308
      for await (const packet of this.#queryRetainedByPatterns(patterns, matcher)) {
24✔
309
        yield packet
22✔
310
      }
22✔
311
    }
24✔
312
  }
24✔
313

128✔
314
  async * #queryRetainedByPatterns (patterns, matcher) {
128✔
315
    const regexes = patterns.map(pattern =>
24✔
316
      regEscape(pattern).replace(/(\/*#|\\\+).*$/, '')
30✔
317
    )
24✔
318

24✔
319
    const topic = new RegExp(regexes.join('|'))
24✔
320
    const filter = { topic }
24✔
321
    const exclude = { _id: 0 }
24✔
322

24✔
323
    for await (const result of this.#cl.retained.find(filter).project(exclude)) {
24✔
324
      const packet = asPacket(result)
22✔
325
      if (matcher.match(packet.topic).length > 0) {
22✔
326
        yield packet
22✔
327
      }
22✔
328
    }
22✔
329
  }
24✔
330

128✔
331
  async addSubscriptions (client, subs) {
128✔
332
    const subscriptions = []
64✔
333
    const operations = subs.map(sub => {
64✔
334
      const subscription = { ...sub, clientId: client.id }
114✔
335
      subscriptions.push(subscription)
114✔
336
      return {
114✔
337
        updateOne: {
114✔
338
          filter: {
114✔
339
            clientId: client.id,
114✔
340
            topic: sub.topic
114✔
341
          },
114✔
342
          update: {
114✔
343
            $set: decorateSubscription(subscription, this.#opts)
114✔
344
          },
114✔
345
          upsert: true
114✔
346
        }
114✔
347
      }
114✔
348
    })
64✔
349

64✔
350
    await this.#cl.subscriptions.bulkWrite(operations)
64✔
351
    // inform the broker
64✔
352
    await this.#broadcast.addedSubscriptions(client, subs)
64✔
353
  }
64✔
354

128✔
355
  async removeSubscriptions (client, subs) {
128✔
356
    const operations = subs.map(topic => ({
32✔
357
      deleteOne: {
34✔
358
        filter: {
34✔
359
          clientId: client.id,
34✔
360
          topic
34✔
361
        }
34✔
362
      }
34✔
363
    }))
32✔
364
    await this.#cl.subscriptions.bulkWrite(operations)
32✔
365
    // inform the broker
32✔
366
    await this.#broadcast.removedSubscriptions(client, subs)
32✔
367
  }
32✔
368

128✔
369
  async subscriptionsByClient (client) {
128✔
370
    const filter = { clientId: client.id }
32✔
371
    const exclude = { clientId: false, _id: false } // exclude these fields
32✔
372
    const subs = await this.#cl.subscriptions.find(filter).project(exclude).toArray()
32✔
373
    return subs
32✔
374
  }
32✔
375

128✔
376
  async countOffline () {
128✔
377
    const subsCount = this.#trie.subscriptionsCount
44✔
378
    const result = await this.#cl.subscriptions.aggregate([
44✔
379
      {
44✔
380
        $group: {
44✔
381
          _id: '$clientId'
44✔
382
        }
44✔
383
      }, {
44✔
384
        $count: 'clientsCount'
44✔
385
      }]).toArray()
44✔
386
    const clientsCount = result[0]?.clientsCount || 0
44✔
387
    return { subsCount, clientsCount }
44✔
388
  }
44✔
389

128✔
390
  async destroy () {
128✔
391
    if (this.#destroyed) {
128!
392
      throw new Error('destroyed called twice!')
×
UNCOV
393
    }
×
394
    this.#destroyed = true
128✔
395
    // stop listening to subscription updates
128✔
396
    await this.#broadcast.brokerUnsubscribe()
128✔
397

128✔
398
    if (this.#opts.db) {
128✔
399
      return
8✔
400
    }
8✔
401
    await this.#mongoDBclient.close()
120✔
402
  }
128✔
403

128✔
404
  async subscriptionsByTopic (topic) {
128✔
405
    return this.#trie.match(topic)
34✔
406
  }
34✔
407

128✔
408
  async cleanSubscriptions (client) {
128✔
409
    const subs = await this.subscriptionsByClient(client)
4✔
410
    if (subs.length > 0) {
4✔
411
      const remSubs = subs.map(sub => sub.topic)
2✔
412
      await this.removeSubscriptions(client, remSubs)
2✔
413
    }
2✔
414
  }
4✔
415

128✔
416
  async outgoingEnqueue (sub, packet) {
128✔
417
    return await this.outgoingEnqueueCombi([sub], packet)
12✔
418
  }
12✔
419

128✔
420
  async outgoingEnqueueCombi (subs, packet) {
128✔
421
    if (subs?.length === 0) {
30!
UNCOV
422
      return packet
×
UNCOV
423
    }
×
424

30✔
425
    const newPacket = new Packet(packet)
30✔
426
    const decoratedPacket = decoratePacket(newPacket, this.#opts.ttl.packets)
30✔
427
    const packets = subs.map(sub => ({
30✔
428
      clientId: sub.clientId,
32✔
429
      packet: decoratedPacket
32✔
430
    }))
30✔
431

30✔
432
    await this.#cl.outgoing.insertMany(packets)
30✔
433
  }
30✔
434

128✔
435
  async * outgoingStream (client) {
128✔
436
    for await (const result of this.#cl.outgoing.find({ clientId: client.id })) {
30✔
437
      yield asPacket(result)
34✔
438
    }
34✔
439
  }
30✔
440

128✔
441
  async outgoingUpdate (client, packet) {
128✔
442
    if (packet.brokerId) {
26✔
443
      await updateWithMessageId(this.#cl, client, packet)
24✔
444
    } else {
26✔
445
      await updatePacket(this.#cl, client, packet)
2✔
446
    }
2✔
447
  }
26✔
448

128✔
449
  async outgoingClearMessageId (client, packet) {
128✔
450
    const result = await this.#cl.outgoing.findOneAndDelete({
8✔
451
      clientId: client.id,
8✔
452
      'packet.messageId': packet.messageId
8✔
453
    })
8✔
454
    return result ? asPacket(result) : null
8✔
455
  }
8✔
456

128✔
457
  async incomingStorePacket (client, packet) {
128✔
458
    const newPacket = new Packet(packet)
4✔
459
    newPacket.messageId = packet.messageId
4✔
460

4✔
461
    await this.#cl.incoming.insertOne({
4✔
462
      clientId: client.id,
4✔
463
      packet: decoratePacket(newPacket, this.#opts.ttl.packets)
4✔
464
    })
4✔
465
  }
4✔
466

128✔
467
  async incomingGetPacket (client, packet) {
128✔
468
    const result = await this.#cl.incoming.findOne({
4✔
469
      clientId: client.id,
4✔
470
      'packet.messageId': packet.messageId
4✔
471
    })
4✔
472

4✔
473
    if (!result) {
4✔
474
      throw new Error(`packet not found for: ${client}`)
2✔
475
    }
2✔
476

2✔
477
    return asPacket(result)
2✔
478
  }
4✔
479

128✔
480
  async incomingDelPacket (client, packet) {
128✔
481
    await this.#cl.incoming.deleteOne({
2✔
482
      clientId: client.id,
2✔
483
      'packet.messageId': packet.messageId
2✔
484
    })
2✔
485
  }
2✔
486

128✔
487
  async putWill (client, packet) {
128✔
488
    packet.clientId = client.id
12✔
489
    packet.brokerId = this.#broker.id
12✔
490
    await this.#cl.will.insertOne({
12✔
491
      clientId: client.id,
12✔
492
      packet: decoratePacket(packet, this.#opts.ttl.packets)
12✔
493
    })
12✔
494
  }
12✔
495

128✔
496
  async getWill (client) {
128✔
497
    const result = await this.#cl.will.findOne({
4✔
498
      clientId: client.id
4✔
499
    })
4✔
500
    if (!result) {
4✔
501
      return null // packet not found
2✔
502
    }
2✔
503
    return asPacket(result)
2✔
504
  }
4✔
505

128✔
506
  async delWill (client) {
128✔
507
    const result = await this.#cl.will.findOneAndDelete({
8✔
508
      clientId: client.id
8✔
509
    })
8✔
510
    if (!result) {
8!
UNCOV
511
      return null // packet not found
×
UNCOV
512
    }
×
513
    return asPacket(result)
8✔
514
  }
8✔
515

128✔
516
  async * streamWill (brokers) {
128✔
517
    const filter = {}
4✔
518

4✔
519
    if (brokers) {
4✔
520
      filter['packet.brokerId'] = { $nin: Object.keys(brokers) }
2✔
521
    }
2✔
522
    for await (const will of this.#cl.will.find(filter)) {
4✔
523
      yield asPacket(will)
4✔
524
    }
4✔
525
  }
4✔
526

128✔
527
  async * getClientList (topic) {
128✔
528
    const filter = {}
4✔
529
    if (topic) {
4✔
530
      filter.topic = topic
4✔
531
    }
4✔
532
    for await (const sub of this.#cl.subscriptions.find(filter)) {
4✔
533
      yield sub.clientId
6✔
534
    }
6✔
535
  }
4✔
536
}
128✔
537

4✔
538
function decoratePacket (packet, setTTL) {
2,078✔
539
  if (setTTL) {
2,078✔
540
    packet.added = new Date()
10✔
541
  }
10✔
542
  return packet
2,078✔
543
}
2,078✔
544

4✔
545
function decorateSubscription (sub, opts) {
114✔
546
  if (opts.ttl.subscriptions) {
114✔
547
    sub.added = new Date()
2✔
548
  }
2✔
549
  return sub
114✔
550
}
114✔
551

4✔
552
function asPacket (obj) {
78✔
553
  const packet = obj?.packet || obj
78✔
554
  if (!packet) {
78!
555
    throw new Error('Invalid packet')
×
556
  }
×
557
  if (Buffer.isBuffer(packet?.payload?.buffer)) {
78✔
558
    packet.payload = packet.payload.buffer
74✔
559
  }
74✔
560
  return packet
78✔
561
}
78✔
562

4✔
563
async function updateWithMessageId (db, client, packet) {
24✔
564
  await db.outgoing.updateOne({
24✔
565
    clientId: client.id,
24✔
566
    'packet.brokerCounter': packet.brokerCounter,
24✔
567
    'packet.brokerId': packet.brokerId
24✔
568
  }, {
24✔
569
    $set: {
24✔
570
      'packet.messageId': packet.messageId
24✔
571
    }
24✔
572
  })
24✔
573
}
24✔
574

4✔
575
async function updatePacket (db, client, packet) {
2✔
576
  await db.outgoing.updateOne({
2✔
577
    clientId: client.id,
2✔
578
    'packet.messageId': packet.messageId
2✔
579
  }, {
2✔
580
    $set: {
2✔
581
      clientId: client.id,
2✔
582
      packet
2✔
583
    }
2✔
584
  })
2✔
585
}
2✔
586

4✔
587
function promiseWithResolvers () {
2,034✔
588
  // this can be replaced by Promise.withResolvers()in NodeJS >= 22
2,034✔
589
  let res
2,034✔
590
  let rej
2,034✔
591
  const promise = new Promise((resolve, reject) => {
2,034✔
592
    res = resolve
2,034✔
593
    rej = reject
2,034✔
594
  })
2,034✔
595
  return { promise, resolve: res, reject: rej }
2,034✔
596
}
2,034✔
597

4✔
598
module.exports = AsyncMongoPersistence
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc