• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Foo-Foo-MQ / foo-foo-mq / 8610775382

09 Apr 2024 05:25AM UTC coverage: 89.638% (+0.1%) from 89.522%
8610775382

push

github

web-flow
Fix issue #26 and #55 and add missing fields in type definition + quorum support (#56)

* chore(package.json): bump version

* feat(index.d.ts): Add `messageTtl`, `passive` (existing fields) and `type` (x-queue-type, for quorum support) + protocol definition for #26

* chore(docs): update docs t reflect changes

* fix(integration): proper protocol (#26) and fix url encode error (#55)

* chore(spec): update connection and queue options with new protocol and queue type field

* refactor(connection.js): use `startsWith` instead of equal, just in case

* feat(index.d.ts): Add `autoDelete` key to QueueOptions definition (it is documented and shown in examples)

* feat(queue.js): Add proper support for quorum queues and silently omit incompatible fields

* feat(queue.spec.js): Add test case for quorum queue type and check if call is valid

* refactor(queue.js): Use ternary operation instead for omition to improve readability

* feat(queue.spec.js): Add behavior test case for quorum queue type (not set)

* chore(topology.md): update docs with queue type and classic queue deprecation warning

* feat(connection.js): make sure `protocol` field is found and backwards compatible (#26)

* docs(topology.md): Added known workaround for issue #23 using async/await example with explanation

* feat(queue.js): add dead letter strategy argument (supports at least once since v3.10 for quorum) + update typing to reflect it

* test(queue.spec.js): Check behavior for `x-dead-letter-strategy` and validate it is correct (only for quorum)

* docs(topology.md): Add `deadLetterStrategy` and `overflow` to `addQueue` parameter description + explanation of choices

* docs(topology.md): refactor example and formatting recent addition to fit norm

* docs(topology.md): shorten example

* docs(topology.md): update configure example to properly use promises

* docs: finalized toplogy example and changes all outdated `var` variable to `const`

* r... (continued)

656 of 839 branches covered (78.19%)

Branch coverage included in aggregate %.

77 of 78 new or added lines in 3 files covered. (98.72%)

3 existing lines in 2 files now uncovered.

3773 of 4102 relevant lines covered (91.98%)

427.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.79
/src/queueFsm.js
1
const machina = require('machina');
2✔
2
const format = require('util').format;
2✔
3
const Monologue = require('node-monologue');
2✔
4
Monologue.mixInto(machina.Fsm);
2✔
5
const log = require('./log.js')('rabbot.queue');
2✔
6
const defer = require('./defer');
2✔
7

8
/* log
9
  * `rabbot.queue`
10
    * `debug`
11
      * release called
12
    * `info`
13
      * subscription started
14
      * queue released
15
    * `warn`
16
      * queue released with pending messages
17
*/
18

19
function unhandle (handlers) {
20
  handlers.forEach((handle) =>
306✔
21
    handle.unsubscribe()
1,202✔
22
  );
23
}
24

25
const Factory = function (options, connection, topology, serializers, queueFn) {
2✔
26
  // allows us to optionally provide a mock
27
  queueFn = queueFn || require('./amqp/queue');
156✔
28

29
  const Fsm = machina.Fsm.extend({
156✔
30
    name: options.name,
31
    uniqueName: options.uniqueName,
32
    responseSubscriptions: {},
33
    signalSubscription: undefined,
34
    subscribed: false,
35
    subscriber: undefined,
36
    purger: undefined,
37
    unsubscribers: [],
38
    releasers: [],
39

40
    _define: function (queue) {
41
      const onError = function (err) {
13,133✔
42
        this.failedWith = err;
12,985✔
43
        this.transition('failed');
12,985✔
44
      }.bind(this);
45
      const onDefined = function (defined) {
13,133✔
46
        if (!this.name) {
148✔
47
          this.name = defined.queue;
2✔
48
          options.name = defined.queue;
2✔
49
          queue.messages.changeName(this.name);
2✔
50
          topology.renameQueue(defined.queue);
2✔
51
        }
52
        this.transition('ready');
148✔
53
      }.bind(this);
54
      queue.define()
13,133✔
55
        .then(onDefined, onError);
56
    },
57

58
    _listen: function (queue) {
59
      const handlers = [];
152✔
60
      const emit = this.emit.bind(this);
152✔
61

62
      const unsubscriber = function () {
152✔
63
        return queue.unsubscribe();
2✔
64
      };
65

66
      const onPurge = function (messageCount) {
152✔
67
        log.info(`Purged ${messageCount} queue ${options.name} - ${connection.name}`);
8✔
68
        this.handle('purged', messageCount);
8✔
69
      }.bind(this);
70

71
      const purger = function () {
152✔
72
        return queue
8✔
73
          .purge()
74
          .then(onPurge)
75
          .catch(function (err) {
76
            emit('purgeFailed', err);
×
77
          });
78
      };
79

80
      const onSubscribe = function () {
152✔
81
        log.info('Subscription to (%s) queue %s - %s started with consumer tag %s',
146✔
82
          options.noAck ? 'untracked' : 'tracked',
146✔
83
          options.name,
84
          connection.name,
85
          queue.channel.tag);
86
        this.unsubscribers.push(unsubscriber);
146✔
87
        this.handle('subscribed');
146✔
88
      }.bind(this);
89

90
      const subscriber = function (exclusive) {
152✔
91
        return queue
146✔
92
          .subscribe(!!exclusive)
93
          .then(onSubscribe)
94
          .catch(function (err) {
UNCOV
95
            emit('subscribeFailed', err);
×
96
          });
97
      };
98

99
      const releaser = function (closed) {
152✔
100
        // remove handlers established on queue
101
        unhandle(handlers);
140✔
102
        if (queue && queue.getMessageCount() > 0) {
140✔
103
          log.warn('!!! Queue %s - %s was released with %d pending messages !!!',
62✔
104
            options.name, connection.name, queue.getMessageCount());
105
        } else if (queue) {
78!
106
          log.info('Released queue %s - %s', options.name, connection.name);
78✔
107
        }
108

109
        if (!closed) {
140✔
110
          queue.release()
138✔
111
            .then(function () {
112
              this.handle('released');
138✔
113
            }.bind(this));
114
        }
115
      }.bind(this);
116

117
      this.subscriber = subscriber;
152✔
118
      this.releasers.push(releaser);
152✔
119
      this.purger = purger;
152✔
120

121
      handlers.push(queue.channel.on('acquired', function () {
152✔
122
        this._define(queue);
12,981✔
123
      }.bind(this))
124
      );
125
      handlers.push(queue.channel.on('released', function () {
152✔
126
        this.handle('released', queue);
×
127
      }.bind(this))
128
      );
129
      handlers.push(queue.channel.on('closed', function () {
152✔
130
        this.handle('closed', queue);
2✔
131
      }.bind(this))
132
      );
133
      handlers.push(connection.on('unreachable', function (err) {
152✔
134
        err = err || new Error('Could not establish a connection to any known nodes.');
×
135
        this.handle('unreachable', queue);
×
136
      }.bind(this))
137
      );
138

139
      if (options.subscribe) {
152✔
140
        this.handle('subscribe');
140✔
141
      }
142
    },
143

144
    _release: function (closed) {
145
      const release = this.releasers.shift();
142✔
146
      if (release) {
142✔
147
        release(closed);
140✔
148
      } else {
149
        return Promise.resolve();
2✔
150
      }
151
    },
152

153
    check: function () {
154
      const deferred = defer();
56✔
155
      this.handle('check', deferred);
56✔
156
      return deferred.promise;
56✔
157
    },
158

159
    purge: function () {
160
      return new Promise(function (resolve, reject) {
10✔
161
        let _handlers = null;
10✔
162
        function cleanResolve (result) {
163
          unhandle(_handlers);
8✔
164
          resolve(result);
8✔
165
        }
166
        function cleanReject (err) {
167
          unhandle(_handlers);
2✔
168
          this.transition('failed');
2✔
169
          reject(err);
2✔
170
        }
171
        _handlers = [
10✔
172
          this.once('purged', cleanResolve),
173
          this.once('purgeFailed', cleanReject.bind(this)),
174
          this.once('failed', cleanReject.bind(this))
175
        ];
176
        this.handle('purge');
10✔
177
      }.bind(this));
178
    },
179

180
    reconnect: function () {
181
      if (/releas/.test(this.state)) {
48!
182
        this.transition('initializing');
×
183
      }
184
      return this.check();
48✔
185
    },
186

187
    release: function () {
188
      return new Promise(function (resolve, reject) {
144✔
189
        let _handlers = null;
144✔
190
        function cleanResolve () {
191
          unhandle(_handlers);
144✔
192
          resolve();
144✔
193
        }
194
        function cleanReject (err) {
195
          unhandle(_handlers);
×
196
          reject(err);
×
197
        }
198
        _handlers = [
144✔
199
          this.once('released', cleanResolve),
200
          this.once('failed', cleanReject),
201
          this.once('unreachable', cleanReject),
202
          this.once('noqueue', cleanResolve)
203
        ];
204
        this.handle('release');
144✔
205
      }.bind(this));
206
    },
207

208
    retry: function () {
209
      this.transition('initializing');
×
210
    },
211

212
    subscribe: function (exclusive) {
213
      options.subscribe = true;
14✔
214
      options.exclusive = exclusive;
14✔
215
      return new Promise(function (resolve, reject) {
14✔
216
        let _handlers = null;
14✔
217
        function cleanResolve () {
218
          unhandle(_handlers);
10✔
219
          resolve();
10✔
220
        }
221
        function cleanReject (err) {
222
          unhandle(_handlers);
2✔
223
          this.transition('failed');
2✔
224
          reject(err);
2✔
225
        }
226
        _handlers = [
14✔
227
          this.once('subscribed', cleanResolve),
228
          this.once('subscribeFailed', cleanReject.bind(this)),
229
          this.once('failed', cleanReject.bind(this))
230
        ];
231
        this.handle('subscribe');
14✔
232
      }.bind(this));
233
    },
234

235
    unsubscribe: function () {
236
      options.subscribe = false;
2✔
237
      const unsubscriber = this.unsubscribers.shift();
2✔
238
      if (unsubscriber) {
2!
239
        return unsubscriber();
2✔
240
      } else {
241
        return Promise.reject(new Error('No active subscription presently exists on the queue'));
×
242
      }
243
    },
244

245
    initialState: 'initializing',
246
    states: {
247
      closed: {
248
        _onEnter: function () {
249
          this.subscribed = false;
2✔
250
          this._release(true);
2✔
251
          this.emit('closed');
2✔
252
        },
253
        check: function () {
254
          this.deferUntilTransition('ready');
2✔
255
          this.transition('initializing');
2✔
256
        },
257
        purge: function () {
258
          this.deferUntilTransition('ready');
×
259
        },
260
        subscribe: function () {
261
          this.deferUntilTransition('ready');
×
262
        }
263
      },
264
      failed: {
265
        _onEnter: function () {
266
          this.subscribed = false;
4✔
267
          this.emit('failed', this.failedWith);
4✔
268
        },
269
        check: function (deferred) {
270
          if (deferred) {
2!
271
            deferred.reject(this.failedWith);
2✔
272
          }
273
          this.emit('failed', this.failedWith);
2✔
274
        },
275
        release: function (queue) {
276
          if (queue) {
×
277
            this._removeHandlers();
×
278
            queue.release()
×
279
              .then(function () {
280
                this.handle('released', queue);
×
281
              });
282
          }
283
        },
284
        released: function () {
285
          this.transition('released');
×
286
        },
287
        purge: function () {
288
          this.emit('purgeFailed', this.failedWith);
2✔
289
        },
290
        subscribe: function () {
291
          this.emit('subscribeFailed', this.failedWith);
2✔
292
        }
293
      },
294
      initializing: {
295
        _onEnter: function () {
296
          queueFn(options, topology, serializers)
158✔
297
            .then(
298
              queue => {
299
                this.lastQueue = queue;
152✔
300
                this.handle('acquired', queue);
152✔
301
              },
302
              err => {
303
                this.failedWith = err;
×
304
                this.transition('failed');
×
305
              }
306
            );
307
        },
308
        acquired: function (queue) {
309
          this.receivedMessages = queue.messages;
152✔
310
          this._define(queue);
152✔
311
          this._listen(queue);
152✔
312
        },
313
        check: function () {
314
          this.deferUntilTransition('ready');
48✔
315
        },
316
        release: function () {
317
          this.deferUntilTransition('ready');
2✔
318
        },
319
        closed: function () {
320
          this.deferUntilTransition('ready');
×
321
        },
322
        purge: function () {
323
          this.deferUntilTransition('ready');
×
324
        },
325
        subscribe: function () {
326
          this.deferUntilTransition('ready');
140✔
327
        }
328
      },
329
      ready: {
330
        _onEnter: function () {
331
          this.emit('defined');
162✔
332
        },
333
        check: function (deferred) {
334
          deferred.resolve();
48✔
335
        },
336
        closed: function () {
337
          this.transition('closed');
×
338
        },
339
        purge: function () {
340
          if (this.purger) {
8!
341
            this.transition('purging');
8✔
342
            return this.purger();
8✔
343
          }
344
        },
345
        release: function () {
346
          this.transition('releasing');
8✔
347
          this.handle('release');
8✔
348
        },
349
        released: function () {
350
          this._release(true);
×
351
          this.transition('initializing');
×
352
        },
353
        subscribe: function () {
354
          if (this.subscriber) {
146!
355
            this.deferAndTransition('subscribing');
146✔
356
            return this.subscriber();
146✔
357
          }
358
        }
359
      },
360
      purging: {
361
        closed: function () {
362
          this.transition('closed');
×
363
        },
364
        purged: function () {
365
          this.deferAndTransition('purged');
8✔
366
        },
367
        release: function () {
368
          this.transition('releasing');
×
369
          this.handle('release');
×
370
        },
371
        released: function () {
372
          this._release(true);
×
373
          this.transition('initializing');
×
374
        },
375
        subscribe: function () {
376
          this.deferUntilTransition('subscribed');
×
377
        }
378
      },
379
      purged: {
380
        check: function (deferred) {
381
          deferred.resolve();
×
382
        },
383
        closed: function () {
384
          this.transition('closed');
×
385
        },
386
        release: function () {
387
          this.transition('releasing');
×
388
          this.handle('release');
×
389
        },
390
        released: function () {
391
          this._release(true);
×
392
          this.transition('initializing');
×
393
        },
394
        purged: function (result) {
395
          this.emit('purged', result);
8✔
396
          if (this.subscribed && this.subscriber) {
8✔
397
            this.subscribe()
6✔
398
              .then(
399
                null,
400
                () => {
401
                  this.subscribed = false;
×
402
                }
403
              );
404
          } else {
405
            this.transition('ready');
2✔
406
          }
407
        },
408
        subscribe: function () {
409
          this.deferAndTransition('ready');
6✔
410
        }
411
      },
412
      releasing: {
413
        release: function () {
414
          this._release(false);
140✔
415
        },
416
        released: function () {
417
          this.transition('released');
138✔
418
        }
419
      },
420
      released: {
421
        _onEnter: function () {
422
          this.subscribed = false;
138✔
423
          this.emit('released');
138✔
424
        },
425
        check: function (deferred) {
426
          deferred.reject(new Error(format("Cannot establish queue '%s' after intentionally closing its connection", this.name)));
2✔
427
        },
428
        purge: function () {
429
          this.emit('purgeFailed', new Error(format("Cannot purge to queue '%s' after intentionally closing its connection", this.name)));
×
430
        },
431
        release: function () {
432
          this.emit('released');
4✔
433
        },
434
        subscribe: function () {
435
          this.emit('subscribeFailed', new Error(format("Cannot subscribe to queue '%s' after intentionally closing its connection", this.name)));
×
436
        }
437
      },
438
      subscribing: {
439
        closed: function () {
440
          this.transition('closed');
×
441
        },
442
        purge: function () {
443
          this.deferUntilTransition('ready');
×
444
        },
445
        release: function () {
446
          this.transition('releasing');
×
447
          this.handle('release');
×
448
        },
449
        released: function () {
450
          this._release(true);
×
451
          this.transition('initializing');
×
452
        },
453
        subscribe: function () {
454
          this.transition('subscribed');
146✔
455
        }
456
      },
457
      subscribed: {
458
        check: function (deferred) {
459
          deferred.resolve();
4✔
460
        },
461
        closed: function () {
462
          this.transition('closed');
2✔
463
        },
464
        purge: function () {
465
          this.deferUntilTransition('ready');
6✔
466
          this.transition('ready');
6✔
467
        },
468
        release: function () {
469
          this.transition('releasing');
130✔
470
          this.handle('release');
130✔
471
        },
472
        released: function () {
473
          this._release(true);
×
474
          this.transition('initializing');
×
475
        },
476
        subscribed: function () {
477
          this.subscribed = true;
146✔
478
          this.emit('subscribed', {});
146✔
479
        }
480
      },
481
      unreachable: {
482
        check: function (deferred) {
483
          deferred.reject(new Error(format("Cannot establish queue '%s' when no nodes can be reached", this.name)));
×
484
        },
485
        purge: function () {
486
          this.emit('purgeFailed', new Error(format("Cannot purge queue '%s' when no nodes can be reached", this.name)));
×
487
        },
488
        subscribe: function (sub) {
489
          this.emit('subscribeFailed', new Error(format("Cannot subscribe to queue '%s' when no nodes can be reached", this.name)));
×
490
        }
491
      }
492
    }
493
  });
494

495
  const fsm = new Fsm();
156✔
496
  connection.addQueue(fsm);
156✔
497
  return fsm;
156✔
498
};
499

500
module.exports = Factory;
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc