• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Foo-Foo-MQ / foo-foo-mq / 6922504481

19 Nov 2023 07:28PM UTC coverage: 89.481% (-0.06%) from 89.543%
6922504481

push

github

zlintz
feat(package.json): node 14 support removed

Node 14 support removed as it is no longer LTS

BREAKING CHANGE: Removed support for node 14

644 of 826 branches covered (0.0%)

Branch coverage included in aggregate %.

3703 of 4032 relevant lines covered (91.84%)

600.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.79
/src/queueFsm.js
1
const machina = require('machina');
3✔
2
const format = require('util').format;
3✔
3
const Monologue = require('node-monologue');
3✔
4
Monologue.mixInto(machina.Fsm);
3✔
5
const log = require('./log.js')('rabbot.queue');
3✔
6
const defer = require('./defer');
3✔
7

8
/* log
9
  * `rabbot.queue`
10
    * `debug`
11
      * release called
12
    * `info`
13
      * subscription started
14
      * queue released
15
    * `warn`
16
      * queue released with pending messages
17
*/
18

19
/**
 * Detach every subscription in the given list by calling its `unsubscribe()`.
 *
 * @param {Array<{unsubscribe: Function}>} handlers - subscription objects to detach
 */
function unhandle (handlers) {
  for (const subscription of handlers) {
    subscription.unsubscribe();
  }
}
24

25
const Factory = function (options, connection, topology, serializers, queueFn) {
3✔
26
  // allows us to optionally provide a mock
27
  queueFn = queueFn || require('./amqp/queue');
234✔
28

29
  const Fsm = machina.Fsm.extend({
234✔
30
    name: options.name,
31
    uniqueName: options.uniqueName,
32
    responseSubscriptions: {},
33
    signalSubscription: undefined,
34
    subscribed: false,
35
    subscriber: undefined,
36
    purger: undefined,
37
    unsubscribers: [],
38
    releasers: [],
39

40
    _define: function (queue) {
41
      const onError = function (err) {
19,367✔
42
        this.failedWith = err;
19,145✔
43
        this.transition('failed');
19,145✔
44
      }.bind(this);
45
      const onDefined = function (defined) {
19,367✔
46
        if (!this.name) {
222✔
47
          this.name = defined.queue;
3✔
48
          options.name = defined.queue;
3✔
49
          queue.messages.changeName(this.name);
3✔
50
          topology.renameQueue(defined.queue);
3✔
51
        }
52
        this.transition('ready');
222✔
53
      }.bind(this);
54
      queue.define()
19,367✔
55
        .then(onDefined, onError);
56
    },
57

58
    _listen: function (queue) {
59
      const handlers = [];
228✔
60
      const emit = this.emit.bind(this);
228✔
61

62
      const unsubscriber = function () {
228✔
63
        return queue.unsubscribe();
3✔
64
      };
65

66
      const onPurge = function (messageCount) {
228✔
67
        log.info(`Purged ${messageCount} queue ${options.name} - ${connection.name}`);
12✔
68
        this.handle('purged', messageCount);
12✔
69
      }.bind(this);
70

71
      const purger = function () {
228✔
72
        return queue
12✔
73
          .purge()
74
          .then(onPurge)
75
          .catch(function (err) {
76
            emit('purgeFailed', err);
×
77
          });
78
      };
79

80
      const onSubscribe = function () {
228✔
81
        log.info('Subscription to (%s) queue %s - %s started with consumer tag %s',
219✔
82
          options.noAck ? 'untracked' : 'tracked',
219✔
83
          options.name,
84
          connection.name,
85
          queue.channel.tag);
86
        this.unsubscribers.push(unsubscriber);
219✔
87
        this.handle('subscribed');
219✔
88
      }.bind(this);
89

90
      const subscriber = function (exclusive) {
228✔
91
        return queue
219✔
92
          .subscribe(!!exclusive)
93
          .then(onSubscribe)
94
          .catch(function (err) {
95
            emit('subscribeFailed', err);
×
96
          });
97
      };
98

99
      const releaser = function (closed) {
228✔
100
        // remove handlers established on queue
101
        unhandle(handlers);
210✔
102
        if (queue && queue.getMessageCount() > 0) {
210✔
103
          log.warn('!!! Queue %s - %s was released with %d pending messages !!!',
93✔
104
            options.name, connection.name, queue.getMessageCount());
105
        } else if (queue) {
117!
106
          log.info('Released queue %s - %s', options.name, connection.name);
117✔
107
        }
108

109
        if (!closed) {
210✔
110
          queue.release()
207✔
111
            .then(function () {
112
              this.handle('released');
207✔
113
            }.bind(this));
114
        }
115
      }.bind(this);
116

117
      this.subscriber = subscriber;
228✔
118
      this.releasers.push(releaser);
228✔
119
      this.purger = purger;
228✔
120

121
      handlers.push(queue.channel.on('acquired', function () {
228✔
122
        this._define(queue);
19,139✔
123
      }.bind(this))
124
      );
125
      handlers.push(queue.channel.on('released', function () {
228✔
126
        this.handle('released', queue);
×
127
      }.bind(this))
128
      );
129
      handlers.push(queue.channel.on('closed', function () {
228✔
130
        this.handle('closed', queue);
3✔
131
      }.bind(this))
132
      );
133
      handlers.push(connection.on('unreachable', function (err) {
228✔
134
        err = err || new Error('Could not establish a connection to any known nodes.');
×
135
        this.handle('unreachable', queue);
×
136
      }.bind(this))
137
      );
138

139
      if (options.subscribe) {
228✔
140
        this.handle('subscribe');
210✔
141
      }
142
    },
143

144
    _release: function (closed) {
145
      const release = this.releasers.shift();
213✔
146
      if (release) {
213✔
147
        release(closed);
210✔
148
      } else {
149
        return Promise.resolve();
3✔
150
      }
151
    },
152

153
    check: function () {
154
      const deferred = defer();
84✔
155
      this.handle('check', deferred);
84✔
156
      return deferred.promise;
84✔
157
    },
158

159
    purge: function () {
160
      return new Promise(function (resolve, reject) {
15✔
161
        let _handlers = null;
15✔
162
        function cleanResolve (result) {
163
          unhandle(_handlers);
12✔
164
          resolve(result);
12✔
165
        }
166
        function cleanReject (err) {
167
          unhandle(_handlers);
3✔
168
          this.transition('failed');
3✔
169
          reject(err);
3✔
170
        }
171
        _handlers = [
15✔
172
          this.once('purged', cleanResolve),
173
          this.once('purgeFailed', cleanReject.bind(this)),
174
          this.once('failed', cleanReject.bind(this))
175
        ];
176
        this.handle('purge');
15✔
177
      }.bind(this));
178
    },
179

180
    reconnect: function () {
181
      if (/releas/.test(this.state)) {
72!
182
        this.transition('initializing');
×
183
      }
184
      return this.check();
72✔
185
    },
186

187
    release: function () {
188
      return new Promise(function (resolve, reject) {
216✔
189
        let _handlers = null;
216✔
190
        function cleanResolve () {
191
          unhandle(_handlers);
216✔
192
          resolve();
216✔
193
        }
194
        function cleanReject (err) {
195
          unhandle(_handlers);
×
196
          reject(err);
×
197
        }
198
        _handlers = [
216✔
199
          this.once('released', cleanResolve),
200
          this.once('failed', cleanReject),
201
          this.once('unreachable', cleanReject),
202
          this.once('noqueue', cleanResolve)
203
        ];
204
        this.handle('release');
216✔
205
      }.bind(this));
206
    },
207

208
    retry: function () {
209
      this.transition('initializing');
×
210
    },
211

212
    subscribe: function (exclusive) {
213
      options.subscribe = true;
21✔
214
      options.exclusive = exclusive;
21✔
215
      return new Promise(function (resolve, reject) {
21✔
216
        let _handlers = null;
21✔
217
        function cleanResolve () {
218
          unhandle(_handlers);
15✔
219
          resolve();
15✔
220
        }
221
        function cleanReject (err) {
222
          unhandle(_handlers);
3✔
223
          this.transition('failed');
3✔
224
          reject(err);
3✔
225
        }
226
        _handlers = [
21✔
227
          this.once('subscribed', cleanResolve),
228
          this.once('subscribeFailed', cleanReject.bind(this)),
229
          this.once('failed', cleanReject.bind(this))
230
        ];
231
        this.handle('subscribe');
21✔
232
      }.bind(this));
233
    },
234

235
    unsubscribe: function () {
236
      options.subscribe = false;
3✔
237
      const unsubscriber = this.unsubscribers.shift();
3✔
238
      if (unsubscriber) {
3!
239
        return unsubscriber();
3✔
240
      } else {
241
        return Promise.reject(new Error('No active subscription presently exists on the queue'));
×
242
      }
243
    },
244

245
    initialState: 'initializing',
246
    states: {
247
      closed: {
248
        _onEnter: function () {
249
          this.subscribed = false;
3✔
250
          this._release(true);
3✔
251
          this.emit('closed');
3✔
252
        },
253
        check: function () {
254
          this.deferUntilTransition('ready');
3✔
255
          this.transition('initializing');
3✔
256
        },
257
        purge: function () {
258
          this.deferUntilTransition('ready');
×
259
        },
260
        subscribe: function () {
261
          this.deferUntilTransition('ready');
×
262
        }
263
      },
264
      failed: {
265
        _onEnter: function () {
266
          this.subscribed = false;
6✔
267
          this.emit('failed', this.failedWith);
6✔
268
        },
269
        check: function (deferred) {
270
          if (deferred) {
3!
271
            deferred.reject(this.failedWith);
3✔
272
          }
273
          this.emit('failed', this.failedWith);
3✔
274
        },
275
        release: function (queue) {
276
          if (queue) {
×
277
            this._removeHandlers();
×
278
            queue.release()
×
279
              .then(function () {
280
                this.handle('released', queue);
×
281
              });
282
          }
283
        },
284
        released: function () {
285
          this.transition('released');
×
286
        },
287
        purge: function () {
288
          this.emit('purgeFailed', this.failedWith);
3✔
289
        },
290
        subscribe: function () {
291
          this.emit('subscribeFailed', this.failedWith);
3✔
292
        }
293
      },
294
      initializing: {
295
        _onEnter: function () {
296
          queueFn(options, topology, serializers)
237✔
297
            .then(
298
              queue => {
299
                this.lastQueue = queue;
228✔
300
                this.handle('acquired', queue);
228✔
301
              },
302
              err => {
303
                this.failedWith = err;
×
304
                this.transition('failed');
×
305
              }
306
            );
307
        },
308
        acquired: function (queue) {
309
          this.receivedMessages = queue.messages;
228✔
310
          this._define(queue);
228✔
311
          this._listen(queue);
228✔
312
        },
313
        check: function () {
314
          this.deferUntilTransition('ready');
72✔
315
        },
316
        release: function () {
317
          this.deferUntilTransition('ready');
3✔
318
        },
319
        closed: function () {
320
          this.deferUntilTransition('ready');
×
321
        },
322
        purge: function () {
323
          this.deferUntilTransition('ready');
×
324
        },
325
        subscribe: function () {
326
          this.deferUntilTransition('ready');
210✔
327
        }
328
      },
329
      ready: {
330
        _onEnter: function () {
331
          this.emit('defined');
243✔
332
        },
333
        check: function (deferred) {
334
          deferred.resolve();
74✔
335
        },
336
        closed: function () {
337
          this.transition('closed');
×
338
        },
339
        purge: function () {
340
          if (this.purger) {
12!
341
            this.transition('purging');
12✔
342
            return this.purger();
12✔
343
          }
344
        },
345
        release: function () {
346
          this.transition('releasing');
12✔
347
          this.handle('release');
12✔
348
        },
349
        released: function () {
350
          this._release(true);
×
351
          this.transition('initializing');
×
352
        },
353
        subscribe: function () {
354
          if (this.subscriber) {
219!
355
            this.deferAndTransition('subscribing');
219✔
356
            return this.subscriber();
219✔
357
          }
358
        }
359
      },
360
      purging: {
361
        closed: function () {
362
          this.transition('closed');
×
363
        },
364
        purged: function () {
365
          this.deferAndTransition('purged');
12✔
366
        },
367
        release: function () {
368
          this.transition('releasing');
×
369
          this.handle('release');
×
370
        },
371
        released: function () {
372
          this._release(true);
×
373
          this.transition('initializing');
×
374
        },
375
        subscribe: function () {
376
          this.deferUntilTransition('subscribed');
×
377
        }
378
      },
379
      purged: {
380
        check: function (deferred) {
381
          deferred.resolve();
×
382
        },
383
        closed: function () {
384
          this.transition('closed');
×
385
        },
386
        release: function () {
387
          this.transition('releasing');
×
388
          this.handle('release');
×
389
        },
390
        released: function () {
391
          this._release(true);
×
392
          this.transition('initializing');
×
393
        },
394
        purged: function (result) {
395
          this.emit('purged', result);
12✔
396
          if (this.subscribed && this.subscriber) {
12✔
397
            this.subscribe()
9✔
398
              .then(
399
                null,
400
                () => {
401
                  this.subscribed = false;
×
402
                }
403
              );
404
          } else {
405
            this.transition('ready');
3✔
406
          }
407
        },
408
        subscribe: function () {
409
          this.deferAndTransition('ready');
9✔
410
        }
411
      },
412
      releasing: {
413
        release: function () {
414
          this._release(false);
210✔
415
        },
416
        released: function () {
417
          this.transition('released');
207✔
418
        }
419
      },
420
      released: {
421
        _onEnter: function () {
422
          this.subscribed = false;
207✔
423
          this.emit('released');
207✔
424
        },
425
        check: function (deferred) {
426
          deferred.reject(new Error(format("Cannot establish queue '%s' after intentionally closing its connection", this.name)));
3✔
427
        },
428
        purge: function () {
429
          this.emit('purgeFailed', new Error(format("Cannot purge to queue '%s' after intentionally closing its connection", this.name)));
×
430
        },
431
        release: function () {
432
          this.emit('released');
6✔
433
        },
434
        subscribe: function () {
435
          this.emit('subscribeFailed', new Error(format("Cannot subscribe to queue '%s' after intentionally closing its connection", this.name)));
×
436
        }
437
      },
438
      subscribing: {
439
        closed: function () {
440
          this.transition('closed');
×
441
        },
442
        purge: function () {
443
          this.deferUntilTransition('ready');
×
444
        },
445
        release: function () {
446
          this.transition('releasing');
×
447
          this.handle('release');
×
448
        },
449
        released: function () {
450
          this._release(true);
×
451
          this.transition('initializing');
×
452
        },
453
        subscribe: function () {
454
          this.transition('subscribed');
219✔
455
        }
456
      },
457
      subscribed: {
458
        check: function (deferred) {
459
          deferred.resolve();
4✔
460
        },
461
        closed: function () {
462
          this.transition('closed');
3✔
463
        },
464
        purge: function () {
465
          this.deferUntilTransition('ready');
9✔
466
          this.transition('ready');
9✔
467
        },
468
        release: function () {
469
          this.transition('releasing');
195✔
470
          this.handle('release');
195✔
471
        },
472
        released: function () {
473
          this._release(true);
×
474
          this.transition('initializing');
×
475
        },
476
        subscribed: function () {
477
          this.subscribed = true;
219✔
478
          this.emit('subscribed', {});
219✔
479
        }
480
      },
481
      unreachable: {
482
        check: function (deferred) {
483
          deferred.reject(new Error(format("Cannot establish queue '%s' when no nodes can be reached", this.name)));
×
484
        },
485
        purge: function () {
486
          this.emit('purgeFailed', new Error(format("Cannot purge queue '%s' when no nodes can be reached", this.name)));
×
487
        },
488
        subscribe: function (sub) {
489
          this.emit('subscribeFailed', new Error(format("Cannot subscribe to queue '%s' when no nodes can be reached", this.name)));
×
490
        }
491
      }
492
    }
493
  });
494

495
  const fsm = new Fsm();
234✔
496
  connection.addQueue(fsm);
234✔
497
  return fsm;
234✔
498
};
499

500
module.exports = Factory;
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc