• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Foo-Foo-MQ / foo-foo-mq / 6916573836

18 Nov 2023 10:14PM UTC coverage: 89.543% (+0.02%) from 89.522%
6916573836

push

github

web-flow
feat(.nvmrc, package.json, package-lock.json, test-coverage-actions.yml): add support for Node 20 (#49)

Add support for Node 20, which is now LTS

fix #48

644 of 826 branches covered (0.0%)

Branch coverage included in aggregate %.

3706 of 4032 relevant lines covered (91.91%)

790.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.21
/src/queueFsm.js
1
const machina = require('machina');
4✔
2
const format = require('util').format;
4✔
3
const Monologue = require('node-monologue');
4✔
4
Monologue.mixInto(machina.Fsm);
4✔
5
const log = require('./log.js')('rabbot.queue');
4✔
6
const defer = require('./defer');
4✔
7

8
/* log
9
  * `rabbot.queue`
10
    * `debug`
11
      * release called
12
    * `info`
13
      * subscription started
14
      * queue released
15
    * `warn`
16
      * queue released with pending messages
17
*/
18

19
function unhandle (handlers) {
20
  handlers.forEach((handle) =>
612✔
21
    handle.unsubscribe()
2,404✔
22
  );
23
}
24

25
const Factory = function (options, connection, topology, serializers, queueFn) {
4✔
26
  // allows us to optionally provide a mock
27
  queueFn = queueFn || require('./amqp/queue');
312✔
28

29
  const Fsm = machina.Fsm.extend({
312✔
30
    name: options.name,
31
    uniqueName: options.uniqueName,
32
    responseSubscriptions: {},
33
    signalSubscription: undefined,
34
    subscribed: false,
35
    subscriber: undefined,
36
    purger: undefined,
37
    unsubscribers: [],
38
    releasers: [],
39

40
    _define: function (queue) {
41
      const onError = function (err) {
25,376✔
42
        this.failedWith = err;
25,079✔
43
        this.transition('failed');
25,079✔
44
      }.bind(this);
45
      const onDefined = function (defined) {
25,376✔
46
        if (!this.name) {
296✔
47
          this.name = defined.queue;
4✔
48
          options.name = defined.queue;
4✔
49
          queue.messages.changeName(this.name);
4✔
50
          topology.renameQueue(defined.queue);
4✔
51
        }
52
        this.transition('ready');
296✔
53
      }.bind(this);
54
      queue.define()
25,376✔
55
        .then(onDefined, onError);
56
    },
57

58
    _listen: function (queue) {
59
      const handlers = [];
304✔
60
      const emit = this.emit.bind(this);
304✔
61

62
      const unsubscriber = function () {
304✔
63
        return queue.unsubscribe();
4✔
64
      };
65

66
      const onPurge = function (messageCount) {
304✔
67
        log.info(`Purged ${messageCount} queue ${options.name} - ${connection.name}`);
16✔
68
        this.handle('purged', messageCount);
16✔
69
      }.bind(this);
70

71
      const purger = function () {
304✔
72
        return queue
16✔
73
          .purge()
74
          .then(onPurge)
75
          .catch(function (err) {
76
            emit('purgeFailed', err);
×
77
          });
78
      };
79

80
      const onSubscribe = function () {
304✔
81
        log.info('Subscription to (%s) queue %s - %s started with consumer tag %s',
292✔
82
          options.noAck ? 'untracked' : 'tracked',
292✔
83
          options.name,
84
          connection.name,
85
          queue.channel.tag);
86
        this.unsubscribers.push(unsubscriber);
292✔
87
        this.handle('subscribed');
292✔
88
      }.bind(this);
89

90
      const subscriber = function (exclusive) {
304✔
91
        return queue
293✔
92
          .subscribe(!!exclusive)
93
          .then(onSubscribe)
94
          .catch(function (err) {
95
            emit('subscribeFailed', err);
1✔
96
          });
97
      };
98

99
      const releaser = function (closed) {
304✔
100
        // remove handlers established on queue
101
        unhandle(handlers);
280✔
102
        if (queue && queue.getMessageCount() > 0) {
280✔
103
          log.warn('!!! Queue %s - %s was released with %d pending messages !!!',
124✔
104
            options.name, connection.name, queue.getMessageCount());
105
        } else if (queue) {
156!
106
          log.info('Released queue %s - %s', options.name, connection.name);
156✔
107
        }
108

109
        if (!closed) {
280✔
110
          queue.release()
276✔
111
            .then(function () {
112
              this.handle('released');
276✔
113
            }.bind(this));
114
        }
115
      }.bind(this);
116

117
      this.subscriber = subscriber;
304✔
118
      this.releasers.push(releaser);
304✔
119
      this.purger = purger;
304✔
120

121
      handlers.push(queue.channel.on('acquired', function () {
304✔
122
        this._define(queue);
25,072✔
123
      }.bind(this))
124
      );
125
      handlers.push(queue.channel.on('released', function () {
304✔
126
        this.handle('released', queue);
×
127
      }.bind(this))
128
      );
129
      handlers.push(queue.channel.on('closed', function () {
304✔
130
        this.handle('closed', queue);
4✔
131
      }.bind(this))
132
      );
133
      handlers.push(connection.on('unreachable', function (err) {
304✔
134
        err = err || new Error('Could not establish a connection to any known nodes.');
×
135
        this.handle('unreachable', queue);
×
136
      }.bind(this))
137
      );
138

139
      if (options.subscribe) {
304✔
140
        this.handle('subscribe');
280✔
141
      }
142
    },
143

144
    _release: function (closed) {
145
      const release = this.releasers.shift();
284✔
146
      if (release) {
284✔
147
        release(closed);
280✔
148
      } else {
149
        return Promise.resolve();
4✔
150
      }
151
    },
152

153
    check: function () {
154
      const deferred = defer();
112✔
155
      this.handle('check', deferred);
112✔
156
      return deferred.promise;
112✔
157
    },
158

159
    purge: function () {
160
      return new Promise(function (resolve, reject) {
20✔
161
        let _handlers = null;
20✔
162
        function cleanResolve (result) {
163
          unhandle(_handlers);
16✔
164
          resolve(result);
16✔
165
        }
166
        function cleanReject (err) {
167
          unhandle(_handlers);
4✔
168
          this.transition('failed');
4✔
169
          reject(err);
4✔
170
        }
171
        _handlers = [
20✔
172
          this.once('purged', cleanResolve),
173
          this.once('purgeFailed', cleanReject.bind(this)),
174
          this.once('failed', cleanReject.bind(this))
175
        ];
176
        this.handle('purge');
20✔
177
      }.bind(this));
178
    },
179

180
    reconnect: function () {
181
      if (/releas/.test(this.state)) {
96!
182
        this.transition('initializing');
×
183
      }
184
      return this.check();
96✔
185
    },
186

187
    release: function () {
188
      return new Promise(function (resolve, reject) {
288✔
189
        let _handlers = null;
288✔
190
        function cleanResolve () {
191
          unhandle(_handlers);
288✔
192
          resolve();
288✔
193
        }
194
        function cleanReject (err) {
195
          unhandle(_handlers);
×
196
          reject(err);
×
197
        }
198
        _handlers = [
288✔
199
          this.once('released', cleanResolve),
200
          this.once('failed', cleanReject),
201
          this.once('unreachable', cleanReject),
202
          this.once('noqueue', cleanResolve)
203
        ];
204
        this.handle('release');
288✔
205
      }.bind(this));
206
    },
207

208
    retry: function () {
209
      this.transition('initializing');
×
210
    },
211

212
    subscribe: function (exclusive) {
213
      options.subscribe = true;
28✔
214
      options.exclusive = exclusive;
28✔
215
      return new Promise(function (resolve, reject) {
28✔
216
        let _handlers = null;
28✔
217
        function cleanResolve () {
218
          unhandle(_handlers);
20✔
219
          resolve();
20✔
220
        }
221
        function cleanReject (err) {
222
          unhandle(_handlers);
4✔
223
          this.transition('failed');
4✔
224
          reject(err);
4✔
225
        }
226
        _handlers = [
28✔
227
          this.once('subscribed', cleanResolve),
228
          this.once('subscribeFailed', cleanReject.bind(this)),
229
          this.once('failed', cleanReject.bind(this))
230
        ];
231
        this.handle('subscribe');
28✔
232
      }.bind(this));
233
    },
234

235
    unsubscribe: function () {
236
      options.subscribe = false;
4✔
237
      const unsubscriber = this.unsubscribers.shift();
4✔
238
      if (unsubscriber) {
4!
239
        return unsubscriber();
4✔
240
      } else {
241
        return Promise.reject(new Error('No active subscription presently exists on the queue'));
×
242
      }
243
    },
244

245
    initialState: 'initializing',
246
    states: {
247
      closed: {
248
        _onEnter: function () {
249
          this.subscribed = false;
4✔
250
          this._release(true);
4✔
251
          this.emit('closed');
4✔
252
        },
253
        check: function () {
254
          this.deferUntilTransition('ready');
4✔
255
          this.transition('initializing');
4✔
256
        },
257
        purge: function () {
258
          this.deferUntilTransition('ready');
×
259
        },
260
        subscribe: function () {
261
          this.deferUntilTransition('ready');
×
262
        }
263
      },
264
      failed: {
265
        _onEnter: function () {
266
          this.subscribed = false;
8✔
267
          this.emit('failed', this.failedWith);
8✔
268
        },
269
        check: function (deferred) {
270
          if (deferred) {
4!
271
            deferred.reject(this.failedWith);
4✔
272
          }
273
          this.emit('failed', this.failedWith);
4✔
274
        },
275
        release: function (queue) {
276
          if (queue) {
×
277
            this._removeHandlers();
×
278
            queue.release()
×
279
              .then(function () {
280
                this.handle('released', queue);
×
281
              });
282
          }
283
        },
284
        released: function () {
285
          this.transition('released');
×
286
        },
287
        purge: function () {
288
          this.emit('purgeFailed', this.failedWith);
4✔
289
        },
290
        subscribe: function () {
291
          this.emit('subscribeFailed', this.failedWith);
4✔
292
        }
293
      },
294
      initializing: {
295
        _onEnter: function () {
296
          queueFn(options, topology, serializers)
316✔
297
            .then(
298
              queue => {
299
                this.lastQueue = queue;
304✔
300
                this.handle('acquired', queue);
304✔
301
              },
302
              err => {
303
                this.failedWith = err;
×
304
                this.transition('failed');
×
305
              }
306
            );
307
        },
308
        acquired: function (queue) {
309
          this.receivedMessages = queue.messages;
304✔
310
          this._define(queue);
304✔
311
          this._listen(queue);
304✔
312
        },
313
        check: function () {
314
          this.deferUntilTransition('ready');
96✔
315
        },
316
        release: function () {
317
          this.deferUntilTransition('ready');
4✔
318
        },
319
        closed: function () {
320
          this.deferUntilTransition('ready');
×
321
        },
322
        purge: function () {
323
          this.deferUntilTransition('ready');
×
324
        },
325
        subscribe: function () {
326
          this.deferUntilTransition('ready');
280✔
327
        }
328
      },
329
      ready: {
330
        _onEnter: function () {
331
          this.emit('defined');
324✔
332
        },
333
        check: function (deferred) {
334
          deferred.resolve();
97✔
335
        },
336
        closed: function () {
337
          this.transition('closed');
×
338
        },
339
        purge: function () {
340
          if (this.purger) {
16!
341
            this.transition('purging');
16✔
342
            return this.purger();
16✔
343
          }
344
        },
345
        release: function () {
346
          this.transition('releasing');
15✔
347
          this.handle('release');
15✔
348
        },
349
        released: function () {
350
          this._release(true);
×
351
          this.transition('initializing');
×
352
        },
353
        subscribe: function () {
354
          if (this.subscriber) {
293!
355
            this.deferAndTransition('subscribing');
293✔
356
            return this.subscriber();
293✔
357
          }
358
        }
359
      },
360
      purging: {
361
        closed: function () {
362
          this.transition('closed');
×
363
        },
364
        purged: function () {
365
          this.deferAndTransition('purged');
16✔
366
        },
367
        release: function () {
368
          this.transition('releasing');
×
369
          this.handle('release');
×
370
        },
371
        released: function () {
372
          this._release(true);
×
373
          this.transition('initializing');
×
374
        },
375
        subscribe: function () {
376
          this.deferUntilTransition('subscribed');
×
377
        }
378
      },
379
      purged: {
380
        check: function (deferred) {
381
          deferred.resolve();
×
382
        },
383
        closed: function () {
384
          this.transition('closed');
×
385
        },
386
        release: function () {
387
          this.transition('releasing');
×
388
          this.handle('release');
×
389
        },
390
        released: function () {
391
          this._release(true);
×
392
          this.transition('initializing');
×
393
        },
394
        purged: function (result) {
395
          this.emit('purged', result);
16✔
396
          if (this.subscribed && this.subscriber) {
16✔
397
            this.subscribe()
12✔
398
              .then(
399
                null,
400
                () => {
401
                  this.subscribed = false;
×
402
                }
403
              );
404
          } else {
405
            this.transition('ready');
4✔
406
          }
407
        },
408
        subscribe: function () {
409
          this.deferAndTransition('ready');
12✔
410
        }
411
      },
412
      releasing: {
413
        release: function () {
414
          this._release(false);
280✔
415
        },
416
        released: function () {
417
          this.transition('released');
276✔
418
        }
419
      },
420
      released: {
421
        _onEnter: function () {
422
          this.subscribed = false;
276✔
423
          this.emit('released');
276✔
424
        },
425
        check: function (deferred) {
426
          deferred.reject(new Error(format("Cannot establish queue '%s' after intentionally closing its connection", this.name)));
4✔
427
        },
428
        purge: function () {
429
          this.emit('purgeFailed', new Error(format("Cannot purge to queue '%s' after intentionally closing its connection", this.name)));
×
430
        },
431
        release: function () {
432
          this.emit('released');
8✔
433
        },
434
        subscribe: function () {
435
          this.emit('subscribeFailed', new Error(format("Cannot subscribe to queue '%s' after intentionally closing its connection", this.name)));
×
436
        }
437
      },
438
      subscribing: {
439
        closed: function () {
440
          this.transition('closed');
×
441
        },
442
        purge: function () {
443
          this.deferUntilTransition('ready');
×
444
        },
445
        release: function () {
446
          this.transition('releasing');
×
447
          this.handle('release');
×
448
        },
449
        released: function () {
450
          this._release(true);
×
451
          this.transition('initializing');
×
452
        },
453
        subscribe: function () {
454
          this.transition('subscribed');
293✔
455
        }
456
      },
457
      subscribed: {
458
        check: function (deferred) {
459
          deferred.resolve();
7✔
460
        },
461
        closed: function () {
462
          this.transition('closed');
4✔
463
        },
464
        purge: function () {
465
          this.deferUntilTransition('ready');
12✔
466
          this.transition('ready');
12✔
467
        },
468
        release: function () {
469
          this.transition('releasing');
261✔
470
          this.handle('release');
261✔
471
        },
472
        released: function () {
473
          this._release(true);
×
474
          this.transition('initializing');
×
475
        },
476
        subscribed: function () {
477
          this.subscribed = true;
292✔
478
          this.emit('subscribed', {});
292✔
479
        }
480
      },
481
      unreachable: {
482
        check: function (deferred) {
483
          deferred.reject(new Error(format("Cannot establish queue '%s' when no nodes can be reached", this.name)));
×
484
        },
485
        purge: function () {
486
          this.emit('purgeFailed', new Error(format("Cannot purge queue '%s' when no nodes can be reached", this.name)));
×
487
        },
488
        subscribe: function (sub) {
489
          this.emit('subscribeFailed', new Error(format("Cannot subscribe to queue '%s' when no nodes can be reached", this.name)));
×
490
        }
491
      }
492
    }
493
  });
494

495
  const fsm = new Fsm();
312✔
496
  connection.addQueue(fsm);
312✔
497
  return fsm;
312✔
498
};
499

500
module.exports = Factory;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc