• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Foo-Foo-MQ / foo-foo-mq / 8610775382

09 Apr 2024 05:25AM UTC coverage: 89.638% (+0.1%) from 89.522%
8610775382

push

github

web-flow
Fix issue #26 and #55 and add missing fields in type definition + quorum support (#56)

* chore(package.json): bump version

* feat(index.d.ts): Add `messageTtl`, `passive` (existing fields) and `type` (x-queue-type, for quorum support) + protocol definition for #26

* chore(docs): update docs t reflect changes

* fix(integration): proper protocol (#26) and fix url encode error (#55)

* chore(spec): update connection and queue options with new protocol and queue type field

* refactor(connection.js): use `startsWith` instead of equal, just in case

* feat(index.d.ts): Add `autoDelete` key to QueueOptions definition (it is documented and shown in examples)

* feat(queue.js): Add proper support for quorum queues and silently omit incompatible fields

* feat(queue.spec.js): Add test case for quorum queue type and check if call is valid

* refactor(queue.js): Use ternary operation instead for omition to improve readability

* feat(queue.spec.js): Add behavior test case for quorum queue type (not set)

* chore(topology.md): update docs with queue type and classic queue deprecation warning

* feat(connection.js): make sure `protocol` field is found and backwards compatible (#26)

* docs(topology.md): Added known workaround for issue #23 using async/await example with explanation

* feat(queue.js): add dead letter strategy argument (supports at least once since v3.10 for quorum) + update typing to reflect it

* test(queue.spec.js): Check behavior for `x-dead-letter-strategy` and validate it is correct (only for quorum)

* docs(topology.md): Add `deadLetterStrategy` and `overflow` to `addQueue` parameter description + explanation of choices

* docs(topology.md): refactor example and formatting recent addition to fit norm

* docs(topology.md): shorten example

* docs(topology.md): update configure example to properly use promises

* docs: finalized toplogy example and changes all outdated `var` variable to `const`

* r... (continued)

656 of 839 branches covered (78.19%)

Branch coverage included in aggregate %.

77 of 78 new or added lines in 3 files covered. (98.72%)

3 existing lines in 2 files now uncovered.

3773 of 4102 relevant lines covered (91.98%)

427.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.48
/src/amqp/queue.js
1
const AckBatch = require('../ackBatch.js');
2✔
2
const postal = require('postal');
2✔
3
const dispatch = postal.channel('rabbit.dispatch');
2✔
4
const responses = postal.channel('rabbit.responses');
2✔
5
const info = require('../info');
2✔
6
const log = require('../log')('rabbot.queue');
2✔
7
const format = require('util').format;
2✔
8
const topLog = require('../log')('rabbot.topology');
2✔
9
const unhandledLog = require('../log')('rabbot.unhandled');
2✔
10
const noOp = function () {};
2✔
11

12
/* log
13
  * `rabbot.amqp-queue`
14
    * `debug`
15
      * for all message operations - ack, nack, reply & reject
16
    * `info`
17
      * subscribing
18
      * unsubscribing
19
    * `warn`
20
      * no message handlers for message received
21
    * `error`
22
      * no serializer defined for outgoing message
23
      * no serializer defined for incoming message
24
      * message nacked/rejected when consumer is set to no-ack
25
  * `rabbot.topology`
26
    * `info`
27
      * queue declaration
28
*/
29

30
function aliasOptions (options, aliases, ...omit) {
31
  const keys = Object.keys(options);
13,151✔
32
  return keys.reduce((result, key) => {
13,151✔
33
    const alias = aliases[key] || key;
39,731✔
34
    if (omit.indexOf(key) < 0) {
39,731✔
35
      result[alias] = options[key];
39,539✔
36
    }
37
    return result;
39,731✔
38
  }, {});
39
}
40

41
function argOptions (options) {
42
  const queueType = options.type || 'classic';
13,151✔
43
  const args = {
13,151✔
44
    'x-queue-type': queueType
45
  };
46
  if (queueType === 'quorum' && options.deadLetterStrategy) {
13,151✔
47
    args['x-dead-letter-strategy'] = options.deadLetterStrategy;
2✔
48
  } else if (queueType === 'classic' && options.queueVersion) {
13,149✔
49
    args['x-queue-version'] = options.queueVersion;
4✔
50
  }
51
  return args;
13,151✔
52
}
53

54
function define (channel, options, subscriber, connectionName) {
55
  // Quorum queues dropped support for message prioritiy, exclusivity and non-durable queues
56
  // See: https://www.rabbitmq.com/docs/quorum-queues#feature-matrix
57
  const quorumIncompatible = ['exclusive', 'autoDelete', 'maxPriority'];
13,151✔
58
  const optsFields = ['subscribe', 'limit', 'noBatch', 'unique', 'type', 'queueVersion'];
13,151✔
59
  const omition = options.type === 'quorum' ? [...optsFields, ...quorumIncompatible] : optsFields;
13,151✔
60

61
  const valid = aliasOptions(options, {
13,151✔
62
    queuelimit: 'maxLength',
63
    queueLimit: 'maxLength',
64
    deadletter: 'deadLetterExchange',
65
    deadLetter: 'deadLetterExchange',
66
    deadLetterRoutingKey: 'deadLetterRoutingKey'
67
  }, ...omition);
68
  valid.arguments = argOptions(options);
13,151✔
69

70
  topLog.info("Declaring queue '%s' on connection '%s' with the options: %s",
13,151✔
71
    options.uniqueName, connectionName, JSON.stringify(options));
72

73
  let queuePromise;
74
  if (options.passive) {
13,151✔
75
    queuePromise = channel.checkQueue(options.uniqueName);
12,985✔
76
  } else {
77
    queuePromise = channel.assertQueue(options.uniqueName, valid);
166✔
78
  }
79

80
  return queuePromise.then(function (q) {
13,151✔
81
    if (options.limit) {
168✔
82
      channel.prefetch(options.limit);
12✔
83
    }
84
    return q;
168✔
85
  });
86
}
87

88
function finalize (channel, messages) {
89
  messages.reset();
136✔
90
  messages.ignoreSignal();
136✔
91
  channel.release();
136✔
92
  channel = undefined;
136✔
93
}
94

95
function getContentType (body, options) {
96
  if (options && options.contentType) {
28!
97
    return options.contentType;
×
98
  } else if (typeof body === 'string') {
28✔
99
    return 'text/plain';
22✔
100
  } else if (typeof body === 'object' && !body.length) {
6!
101
    return 'application/json';
6✔
102
  } else {
103
    return 'application/octet-stream';
×
104
  }
105
}
106

107
function getCount (messages) {
108
  if (messages) {
198!
109
    return messages.messages.length;
198✔
110
  } else {
111
    return 0;
×
112
  }
113
}
114

115
function getNoBatchOps (channel, raw, messages, noAck) {
116
  messages.receivedCount += 1;
20✔
117

118
  let ack, nack, reject;
119
  if (noAck) {
20!
120
    ack = noOp;
×
121
    nack = function () {
×
122
      log.error("Tag %d on '%s' - '%s' cannot be nacked in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
123
    };
124
    reject = function () {
×
125
      log.error("Tag %d on '%s' - '%s' cannot be rejected in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
126
    };
127
  } else {
128
    ack = function () {
20✔
129
      log.debug("Acking tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
18✔
130
      channel.ack({ fields: { deliveryTag: raw.fields.deliveryTag } }, false);
18✔
131
    };
132
    nack = function () {
20✔
133
      log.debug("Nacking tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
134
      channel.nack({ fields: { deliveryTag: raw.fields.deliveryTag } }, false);
×
135
    };
136
    reject = function () {
20✔
137
      log.debug("Rejecting tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
138
      channel.reject({ fields: { deliveryTag: raw.fields.deliveryTag } }, false, false);
×
139
    };
140
  }
141

142
  return {
20✔
143
    ack: ack,
144
    nack: nack,
145
    reject: reject
146
  };
147
}
148

149
function getReply (channel, serializers, raw, replyQueue, connectionName) {
150
  let position = 0;
2,258✔
151
  return function (reply, options) {
2,258✔
152
    const defaultReplyType = raw.type + '.reply';
28✔
153
    const replyType = options ? (options.replyType || defaultReplyType) : defaultReplyType;
28✔
154
    const contentType = getContentType(reply, options);
28✔
155
    const serializer = serializers[contentType];
28✔
156
    if (!serializer) {
28!
157
      const message = format('Failed to publish message with contentType %s - no serializer defined', contentType);
×
158
      log.error(message);
×
159
      return Promise.reject(new Error(message));
×
160
    }
161
    const payload = serializer.serialize(reply);
28✔
162

163
    const replyTo = raw.properties.replyTo;
28✔
164
    raw.ack();
28✔
165
    if (replyTo) {
28!
166
      const publishOptions = {
28✔
167
        type: replyType,
168
        contentType: contentType,
169
        contentEncoding: 'utf8',
170
        correlationId: raw.properties.messageId,
171
        timestamp: options && options.timestamp ? options.timestamp : Date.now(),
62!
172
        replyTo: replyQueue === false ? undefined : replyQueue,
28!
173
        headers: options && options.headers ? options.headers : {}
62!
174
      };
175
      if (options && options.more) {
28✔
176
        publishOptions.headers.position = (position++);
6✔
177
      } else {
178
        publishOptions.headers.sequence_end = true; // jshint ignore:line
22✔
179
      }
180
      log.debug("Replying to message %s on '%s' - '%s' with type '%s'",
28✔
181
        raw.properties.messageId,
182
        replyTo,
183
        connectionName,
184
        publishOptions.type);
185
      if (raw.properties.headers && raw.properties.headers['direct-reply-to']) {
28✔
186
        return channel.publish(
6✔
187
          '',
188
          replyTo,
189
          payload,
190
          publishOptions
191
        );
192
      } else {
193
        return channel.sendToQueue(replyTo, payload, publishOptions);
22✔
194
      }
195
    } else {
196
      return Promise.reject(new Error('Cannot reply to a message that has no return address'));
×
197
    }
198
  };
199
}
200

201
function getResolutionOperations (channel, raw, messages, options) {
202
  if (options.noBatch) {
2,258✔
203
    return getNoBatchOps(channel, raw, messages, options.noAck);
20✔
204
  }
205

206
  if (options.noAck || options.noBatch) {
2,238✔
207
    return getUntrackedOps(channel, raw, messages);
36✔
208
  }
209

210
  return getTrackedOps(raw, messages);
2,202✔
211
}
212

213
function getTrackedOps (raw, messages) {
214
  return messages.getMessageOps(raw.fields.deliveryTag);
2,202✔
215
}
216

217
function getUntrackedOps (channel, raw, messages) {
218
  messages.receivedCount += 1;
36✔
219
  return {
36✔
220
    ack: noOp,
221
    nack: function () {
222
      log.error("Tag %d on '%s' - '%s' cannot be nacked in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
223
    },
224
    reject: function () {
225
      log.error("Tag %d on '%s' - '%s' cannot be rejected in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
226
    }
227
  };
228
}
229

230
// purging an auto-delete queue means unsubscribing is not
231
// an option as it will cause the queue, binding and possibly
232
// upstream auto-delete exchanges to be deleted as well
233
function purgeADQueue (channel, connectionName, options, messages) {
234
  const name = options.uniqueName || options.name;
4!
235
  return new Promise(function (resolve, reject) {
4✔
236
    const messageCount = messages.messages.length;
4✔
237
    if (messageCount > 0) {
4✔
238
      log.info(`Purge operation for queue '${options.name}' on '${connectionName}' is waiting for resolution on ${messageCount} messages`);
2✔
239
      messages.once('empty', function () {
2✔
240
        channel.purgeQueue(name)
2✔
241
          .then(
242
            result => resolve(result.messageCount),
2✔
243
            reject
244
          );
245
      });
246
    } else {
247
      channel.purgeQueue(name)
2✔
248
        .then(
249
          result => resolve(result.messageCount),
2✔
250
          reject
251
        );
252
    }
253
  });
254
}
255

256
// queues not marked auto-delete should be unsubscribed from
257
// in order to stop incoming messages while the purge is
258
// taking place and avoid arrival of additional new messages
259
function purgeQueue (channel, connectionName, options, messages) {
260
  const name = options.uniqueName || options.name;
2!
261
  return new Promise(function (resolve, reject) {
2✔
262
    function onUnsubscribed () {
263
      const messageCount = messages.messages.length;
2✔
264
      if (messageCount > 0) {
2!
265
        log.info(`Purge operation for queue '${options.name}' on '${connectionName}' is waiting for resolution on ${messageCount} messages`);
2✔
266
        messages.once('empty', function () {
2✔
267
          channel.purgeQueue(name)
2✔
268
            .then(
269
              result => resolve(result.messageCount),
2✔
270
              reject
271
            );
272
        });
273
      } else {
274
        channel.purgeQueue(name)
×
275
          .then(
276
            result => resolve(result.messageCount),
×
277
            reject
278
          );
279
      }
280
    }
281
    log.info(`Stopping subscription on '${options.name}' on '${connectionName}' before purging`);
2✔
282
    unsubscribe(channel, options)
2✔
283
      .then(onUnsubscribed, onUnsubscribed);
284
  });
285
}
286

287
function purge (channel, connectionName, options, messages, definer) {
288
  log.info(`Checking queue length on '${options.name}' on '${connectionName}' before purging`);
6✔
289
  return definer()
6✔
290
    .then(
291
      q => {
292
        if (q.messageCount > 0) {
6!
293
          const promise = options.autoDelete
6✔
294
            ? purgeADQueue(channel, connectionName, options, messages)
295
            : purgeQueue(channel, connectionName, options, messages);
296
          return promise
6✔
297
            .then(
298
              count => {
299
                log.info(`Purged ${count} messages from '${options.name}' on '${connectionName}'`);
6✔
300
                return count;
6✔
301
              }
302
            );
303
        } else {
304
          log.info(`'${options.name}' on '${connectionName}' was already empty when purge was called`);
×
305
          return Promise.resolve(0);
×
306
        }
307
      },
308
      Promise.reject
309
    );
310
}
311

312
function release (channel, options, messages, released) {
313
  function onUnsubscribed () {
314
    return new Promise(function (resolve) {
136✔
315
      const messageCount = messages.messages.length;
136✔
316
      if (messageCount > 0 && !released) {
136✔
317
        log.info(`Release operation for queue '${options.name}' is waiting for resolution on ${messageCount} messages`);
62✔
318
        messages.once('empty', function () {
62✔
319
          finalize(channel, messages);
62✔
320
          resolve();
62✔
321
        });
322
      } else {
323
        finalize(channel, messages);
74✔
324
        resolve();
74✔
325
      }
326
    });
327
  }
328
  return unsubscribe(channel, options)
136✔
329
    .then(onUnsubscribed, onUnsubscribed);
330
}
331

332
function resolveTags (channel, queue, connection) {
333
  return function (op, data) {
164✔
334
    switch (op) {
268!
335
      case 'ack':
336
        log.debug("Acking tag %d on '%s' - '%s'", data.tag, queue, connection);
68✔
337
        return channel.ack({ fields: { deliveryTag: data.tag } }, data.inclusive);
68✔
338
      case 'nack':
339
        log.debug("Nacking tag %d on '%s' - '%s'", data.tag, queue, connection);
×
340
        return channel.nack({ fields: { deliveryTag: data.tag } }, data.inclusive);
×
341
      case 'reject':
342
        log.debug("Rejecting tag %d on '%s' - '%s'", data.tag, queue, connection);
8✔
343
        return channel.nack({ fields: { deliveryTag: data.tag } }, data.inclusive, false);
8✔
344
      default:
345
        return Promise.resolve(true);
192✔
346
    }
347
  };
348
}
349

350
function subscribe (channelName, channel, topology, serializers, messages, options, exclusive) {
351
  const shouldAck = !options.noAck;
142✔
352
  const shouldBatch = !options.noBatch;
142✔
353
  const shouldCacheKeys = !options.noCacheKeys;
142✔
354
  // this is done to support rabbit-assigned queue names
355
  channelName = channelName || options.name;
142✔
356
  if (shouldAck && shouldBatch) {
142✔
357
    messages.listenForSignal();
130✔
358
  }
359

360
  options.consumerTag = info.createTag(channelName);
142✔
361
  if (Object.keys(channel.item.consumers).length > 0) {
142✔
362
    log.info('Duplicate subscription to queue %s ignored', channelName);
2✔
363
    return Promise.resolve(options.consumerTag);
2✔
364
  }
365
  log.info("Starting subscription to queue '%s' on '%s'", channelName, topology.connection.name);
140✔
366
  return channel.consume(channelName, function (raw) {
140✔
367
    if (!raw) {
2,258!
368
      // this happens when the consumer has been cancelled
369
      log.warn("Queue '%s' was sent a consumer cancel notification");
×
370
      throw new Error('Broker cancelled the consumer remotely');
×
371
    }
372
    const correlationId = raw.properties.correlationId;
2,258✔
373
    const ops = getResolutionOperations(channel, raw, messages, options);
2,258✔
374

375
    raw.ack = ops.ack.bind(ops);
2,258✔
376
    raw.reject = ops.reject.bind(ops);
2,258✔
377
    raw.nack = ops.nack.bind(ops);
2,258✔
378
    raw.reply = getReply(channel, serializers, raw, topology.replyQueue.name, topology.connection.name);
2,258✔
379
    raw.type = raw.properties.type || raw.fields.routingKey;
2,258✔
380
    if (exclusive) {
2,258!
381
      options.exclusive = true;
×
382
    }
383
    raw.queue = channelName;
2,258✔
384
    const parts = [options.name.replace(/[.]/g, '-')];
2,258✔
385
    if (raw.type) {
2,258✔
386
      parts.push(raw.type);
2,246✔
387
    }
388
    let topic = parts.join('.');
2,258✔
389
    const contentType = raw.properties.contentType || 'application/octet-stream';
2,258!
390
    const serializer = serializers[contentType];
2,258✔
391
    const track = () => {
2,258✔
392
      if (shouldAck && shouldBatch) {
2,258✔
393
        messages.addMessage(ops);
2,202✔
394
      }
395
    };
396
    if (!serializer) {
2,258!
397
      if (options.poison) {
×
398
        raw.body = raw.content;
×
399
        raw.contentEncoding = raw.properties.contentEncoding;
×
400
        raw.quarantined = true;
×
401
        topic = `${topic}.quarantined`;
×
402
      } else {
403
        log.error("Could not deserialize message id %s on queue '%s', connection '%s' - no serializer defined",
×
404
          raw.properties.messageId, channelName, topology.connection.name);
405
        track();
×
406
        ops.reject();
×
407
        return;
×
408
      }
409
    } else {
410
      try {
2,258✔
411
        raw.body = serializer.deserialize(raw.content, raw.properties.contentEncoding);
2,258✔
412
      } catch (err) {
413
        if (options.poison) {
4✔
414
          raw.quarantined = true;
2✔
415
          raw.body = raw.content;
2✔
416
          raw.contentEncoding = raw.properties.contentEncoding;
2✔
417
          topic = `${topic}.quarantined`;
2✔
418
        } else {
419
          track();
2✔
420
          ops.reject();
2✔
421
          return;
2✔
422
        }
423
      }
424
    }
425

426
    const onPublish = function (data) {
2,256✔
427
      let handled;
428

429
      if (data.activated) {
2,256✔
430
        handled = true;
2,198✔
431
      }
432
      track();
2,256✔
433

434
      if (!handled) {
2,256✔
435
        unhandledLog.warn("Message of %s on queue '%s', connection '%s' was not processed by any registered handlers",
58✔
436
          raw.type,
437
          channelName,
438
          topology.connection.name
439
        );
440
        topology.onUnhandled(raw);
58✔
441
      }
442
    };
443

444
    if (raw.fields.routingKey === topology.replyQueue.name) {
2,256✔
445
      responses.publish(
28✔
446
        {
447
          topic: correlationId,
448
          headers: {
449
            resolverNoCache: true
450
          },
451
          data: raw
452
        },
453
        onPublish
454
      );
455
    } else {
456
      dispatch.publish({
2,228✔
457
        topic: topic,
458
        headers: {
459
          resolverNoCache: !shouldCacheKeys
460
        },
461
        data: raw
462
      }, onPublish);
463
    }
464
  }, options)
465
    .then(function (result) {
466
      channel.tag = result.consumerTag;
140✔
467
      return result;
140✔
468
    }, function (err) {
UNCOV
469
      log.error('Error on channel consume', options);
×
UNCOV
470
      throw err;
×
471
    });
472
}
473

474
function unsubscribe (channel, options) {
475
  if (channel.tag) {
138✔
476
    log.info("Unsubscribing from queue '%s' with tag %s", options.name, channel.tag);
132✔
477
    return channel.cancel(channel.tag);
132✔
478
  } else {
479
    return Promise.resolve();
6✔
480
  }
481
}
482

483
module.exports = function (options, topology, serializers) {
2✔
484
  const channelName = ['queue', options.uniqueName].join(':');
170✔
485
  return topology.connection.getChannel(channelName, false, 'queue channel for ' + options.name)
170✔
486
    .then(function (channel) {
487
      const messages = new AckBatch(options.name, topology.connection.name, resolveTags(channel, options.name, topology.connection.name));
164✔
488
      const subscriber = subscribe.bind(undefined, options.uniqueName, channel, topology, serializers, messages, options);
164✔
489
      const definer = define.bind(undefined, channel, options, subscriber, topology.connection.name);
164✔
490
      return {
164✔
491
        channel: channel,
492
        messages: messages,
493
        define: definer,
494
        finalize: finalize.bind(undefined, channel, messages),
495
        getMessageCount: getCount.bind(undefined, messages),
496
        purge: purge.bind(undefined, channel, topology.connection.name, options, messages, definer),
497
        release: release.bind(undefined, channel, options, messages),
498
        subscribe: subscriber,
499
        unsubscribe: unsubscribe.bind(undefined, channel, options, messages)
500
      };
501
    });
502
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc