• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Foo-Foo-MQ / foo-foo-mq / 8344946353

19 Mar 2024 02:36PM UTC coverage: 89.536% (+0.01%) from 89.522%
8344946353

Pull #56

github

web-flow
Merge 13ef88957 into 98e631a88
Pull Request #56: Fix issue #26 and #55 and add missing fields in type definition

648 of 830 branches covered (78.07%)

Branch coverage included in aggregate %.

23 of 23 new or added lines in 3 files covered. (100.0%)

8 existing lines in 3 files now uncovered.

3716 of 4044 relevant lines covered (91.89%)

413.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.74
/src/amqp/queue.js
1
const AckBatch = require('../ackBatch.js');
2✔
2
const postal = require('postal');
2✔
3
const dispatch = postal.channel('rabbit.dispatch');
2✔
4
const responses = postal.channel('rabbit.responses');
2✔
5
const info = require('../info');
2✔
6
const log = require('../log')('rabbot.queue');
2✔
7
const format = require('util').format;
2✔
8
const topLog = require('../log')('rabbot.topology');
2✔
9
const unhandledLog = require('../log')('rabbot.unhandled');
2✔
10
const noOp = function () {};
2✔
11

12
/* log
13
  * `rabbot.amqp-queue`
14
    * `debug`
15
      * for all message operations - ack, nack, reply & reject
16
    * `info`
17
      * subscribing
18
      * unsubscribing
19
    * `warn`
20
      * no message handlers for message received
21
    * `error`
22
      * no serializer defined for outgoing message
23
      * no serializer defined for incoming message
24
      * message nacked/rejected when consumer is set to no-ack
25
  * `rabbot.topology`
26
    * `info`
27
      * queue declaration
28
*/
29

30
function aliasOptions (options, aliases, ...omit) {
31
  const keys = Object.keys(options);
13,012✔
32
  return keys.reduce((result, key) => {
13,012✔
33
    const alias = aliases[key] || key;
39,294✔
34
    if (omit.indexOf(key) < 0) {
39,294✔
35
      result[alias] = options[key];
39,117✔
36
    }
37
    return result;
39,294✔
38
  }, {});
39
}
40

41
function define (channel, options, subscriber, connectionName) {
42
  // Quorum queues dropped support for message prioritiy, exclusivity and non-durable queues
43
  // See: https://www.rabbitmq.com/docs/quorum-queues#feature-matrix
44
  const quorumIncompatible = ['exclusive', 'autoDelete', 'maxPriority'];
13,012✔
45
  const optsFields = ['subscribe', 'limit', 'noBatch', 'unique', 'type'];
13,012✔
46
  const omition = options.type === 'quorum' ? [...optsFields, ...quorumIncompatible] : optsFields;
13,012✔
47

48
  const valid = aliasOptions(options, {
13,012✔
49
    queuelimit: 'maxLength',
50
    queueLimit: 'maxLength',
51
    deadletter: 'deadLetterExchange',
52
    deadLetter: 'deadLetterExchange',
53
    deadLetterRoutingKey: 'deadLetterRoutingKey'
54
  }, ...omition);
55
  valid.arguments = { 'x-queue-type': options.type || 'classic' };
13,012✔
56
  topLog.info("Declaring queue '%s' on connection '%s' with the options: %s",
13,012✔
57
    options.uniqueName, connectionName, JSON.stringify(options));
58

59
  let queuePromise;
60
  if (options.passive) {
13,012✔
61
    queuePromise = channel.checkQueue(options.uniqueName);
12,857✔
62
  } else {
63
    queuePromise = channel.assertQueue(options.uniqueName, valid);
155✔
64
  }
65

66
  return queuePromise.then(function (q) {
13,012✔
67
    if (options.limit) {
157✔
68
      channel.prefetch(options.limit);
12✔
69
    }
70
    return q;
157✔
71
  });
72
}
73

74
function finalize (channel, messages) {
75
  messages.reset();
137✔
76
  messages.ignoreSignal();
137✔
77
  channel.release();
137✔
78
  channel = undefined;
137✔
79
}
80

81
function getContentType (body, options) {
82
  if (options && options.contentType) {
28!
83
    return options.contentType;
×
84
  } else if (typeof body === 'string') {
28✔
85
    return 'text/plain';
22✔
86
  } else if (typeof body === 'object' && !body.length) {
6!
87
    return 'application/json';
6✔
88
  } else {
89
    return 'application/octet-stream';
×
90
  }
91
}
92

93
function getCount (messages) {
94
  if (messages) {
199!
95
    return messages.messages.length;
199✔
96
  } else {
97
    return 0;
×
98
  }
99
}
100

101
function getNoBatchOps (channel, raw, messages, noAck) {
102
  messages.receivedCount += 1;
20✔
103

104
  let ack, nack, reject;
105
  if (noAck) {
20!
106
    ack = noOp;
×
107
    nack = function () {
×
108
      log.error("Tag %d on '%s' - '%s' cannot be nacked in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
109
    };
110
    reject = function () {
×
111
      log.error("Tag %d on '%s' - '%s' cannot be rejected in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
112
    };
113
  } else {
114
    ack = function () {
20✔
115
      log.debug("Acking tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
18✔
116
      channel.ack({ fields: { deliveryTag: raw.fields.deliveryTag } }, false);
18✔
117
    };
118
    nack = function () {
20✔
119
      log.debug("Nacking tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
120
      channel.nack({ fields: { deliveryTag: raw.fields.deliveryTag } }, false);
×
121
    };
122
    reject = function () {
20✔
123
      log.debug("Rejecting tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
124
      channel.reject({ fields: { deliveryTag: raw.fields.deliveryTag } }, false, false);
×
125
    };
126
  }
127

128
  return {
20✔
129
    ack: ack,
130
    nack: nack,
131
    reject: reject
132
  };
133
}
134

135
function getReply (channel, serializers, raw, replyQueue, connectionName) {
136
  let position = 0;
2,258✔
137
  return function (reply, options) {
2,258✔
138
    const defaultReplyType = raw.type + '.reply';
28✔
139
    const replyType = options ? (options.replyType || defaultReplyType) : defaultReplyType;
28✔
140
    const contentType = getContentType(reply, options);
28✔
141
    const serializer = serializers[contentType];
28✔
142
    if (!serializer) {
28!
143
      const message = format('Failed to publish message with contentType %s - no serializer defined', contentType);
×
144
      log.error(message);
×
145
      return Promise.reject(new Error(message));
×
146
    }
147
    const payload = serializer.serialize(reply);
28✔
148

149
    const replyTo = raw.properties.replyTo;
28✔
150
    raw.ack();
28✔
151
    if (replyTo) {
28!
152
      const publishOptions = {
28✔
153
        type: replyType,
154
        contentType: contentType,
155
        contentEncoding: 'utf8',
156
        correlationId: raw.properties.messageId,
157
        timestamp: options && options.timestamp ? options.timestamp : Date.now(),
62!
158
        replyTo: replyQueue === false ? undefined : replyQueue,
28!
159
        headers: options && options.headers ? options.headers : {}
62!
160
      };
161
      if (options && options.more) {
28✔
162
        publishOptions.headers.position = (position++);
6✔
163
      } else {
164
        publishOptions.headers.sequence_end = true; // jshint ignore:line
22✔
165
      }
166
      log.debug("Replying to message %s on '%s' - '%s' with type '%s'",
28✔
167
        raw.properties.messageId,
168
        replyTo,
169
        connectionName,
170
        publishOptions.type);
171
      if (raw.properties.headers && raw.properties.headers['direct-reply-to']) {
28✔
172
        return channel.publish(
6✔
173
          '',
174
          replyTo,
175
          payload,
176
          publishOptions
177
        );
178
      } else {
179
        return channel.sendToQueue(replyTo, payload, publishOptions);
22✔
180
      }
181
    } else {
182
      return Promise.reject(new Error('Cannot reply to a message that has no return address'));
×
183
    }
184
  };
185
}
186

187
function getResolutionOperations (channel, raw, messages, options) {
188
  if (options.noBatch) {
2,258✔
189
    return getNoBatchOps(channel, raw, messages, options.noAck);
20✔
190
  }
191

192
  if (options.noAck || options.noBatch) {
2,238✔
193
    return getUntrackedOps(channel, raw, messages);
36✔
194
  }
195

196
  return getTrackedOps(raw, messages);
2,202✔
197
}
198

199
function getTrackedOps (raw, messages) {
200
  return messages.getMessageOps(raw.fields.deliveryTag);
2,202✔
201
}
202

203
function getUntrackedOps (channel, raw, messages) {
204
  messages.receivedCount += 1;
36✔
205
  return {
36✔
206
    ack: noOp,
207
    nack: function () {
208
      log.error("Tag %d on '%s' - '%s' cannot be nacked in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
209
    },
210
    reject: function () {
211
      log.error("Tag %d on '%s' - '%s' cannot be rejected in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
×
212
    }
213
  };
214
}
215

216
// purging an auto-delete queue means unsubscribing is not
217
// an option as it will cause the queue, binding and possibly
218
// upstream auto-delete exchanges to be deleted as well
219
function purgeADQueue (channel, connectionName, options, messages) {
220
  const name = options.uniqueName || options.name;
4!
221
  return new Promise(function (resolve, reject) {
4✔
222
    const messageCount = messages.messages.length;
4✔
223
    if (messageCount > 0) {
4✔
224
      log.info(`Purge operation for queue '${options.name}' on '${connectionName}' is waiting for resolution on ${messageCount} messages`);
2✔
225
      messages.once('empty', function () {
2✔
226
        channel.purgeQueue(name)
2✔
227
          .then(
228
            result => resolve(result.messageCount),
2✔
229
            reject
230
          );
231
      });
232
    } else {
233
      channel.purgeQueue(name)
2✔
234
        .then(
235
          result => resolve(result.messageCount),
2✔
236
          reject
237
        );
238
    }
239
  });
240
}
241

242
// queues not marked auto-delete should be unsubscribed from
243
// in order to stop incoming messages while the purge is
244
// taking place and avoid arrival of additional new messages
245
function purgeQueue (channel, connectionName, options, messages) {
246
  const name = options.uniqueName || options.name;
2!
247
  return new Promise(function (resolve, reject) {
2✔
248
    function onUnsubscribed () {
249
      const messageCount = messages.messages.length;
2✔
250
      if (messageCount > 0) {
2!
251
        log.info(`Purge operation for queue '${options.name}' on '${connectionName}' is waiting for resolution on ${messageCount} messages`);
2✔
252
        messages.once('empty', function () {
2✔
253
          channel.purgeQueue(name)
2✔
254
            .then(
255
              result => resolve(result.messageCount),
2✔
256
              reject
257
            );
258
        });
259
      } else {
260
        channel.purgeQueue(name)
×
261
          .then(
262
            result => resolve(result.messageCount),
×
263
            reject
264
          );
265
      }
266
    }
267
    log.info(`Stopping subscription on '${options.name}' on '${connectionName}' before purging`);
2✔
268
    unsubscribe(channel, options)
2✔
269
      .then(onUnsubscribed, onUnsubscribed);
270
  });
271
}
272

273
function purge (channel, connectionName, options, messages, definer) {
274
  log.info(`Checking queue length on '${options.name}' on '${connectionName}' before purging`);
6✔
275
  return definer()
6✔
276
    .then(
277
      q => {
278
        if (q.messageCount > 0) {
6!
279
          const promise = options.autoDelete
6✔
280
            ? purgeADQueue(channel, connectionName, options, messages)
281
            : purgeQueue(channel, connectionName, options, messages);
282
          return promise
6✔
283
            .then(
284
              count => {
285
                log.info(`Purged ${count} messages from '${options.name}' on '${connectionName}'`);
6✔
286
                return count;
6✔
287
              }
288
            );
289
        } else {
290
          log.info(`'${options.name}' on '${connectionName}' was already empty when purge was called`);
×
291
          return Promise.resolve(0);
×
292
        }
293
      },
294
      Promise.reject
295
    );
296
}
297

298
function release (channel, options, messages, released) {
299
  function onUnsubscribed () {
300
    return new Promise(function (resolve) {
137✔
301
      const messageCount = messages.messages.length;
137✔
302
      if (messageCount > 0 && !released) {
137✔
303
        log.info(`Release operation for queue '${options.name}' is waiting for resolution on ${messageCount} messages`);
62✔
304
        messages.once('empty', function () {
62✔
305
          finalize(channel, messages);
62✔
306
          resolve();
62✔
307
        });
308
      } else {
309
        finalize(channel, messages);
75✔
310
        resolve();
75✔
311
      }
312
    });
313
  }
314
  return unsubscribe(channel, options)
137✔
315
    .then(onUnsubscribed, onUnsubscribed);
316
}
317

318
function resolveTags (channel, queue, connection) {
319
  return function (op, data) {
153✔
320
    switch (op) {
268!
321
      case 'ack':
322
        log.debug("Acking tag %d on '%s' - '%s'", data.tag, queue, connection);
68✔
323
        return channel.ack({ fields: { deliveryTag: data.tag } }, data.inclusive);
68✔
324
      case 'nack':
325
        log.debug("Nacking tag %d on '%s' - '%s'", data.tag, queue, connection);
×
326
        return channel.nack({ fields: { deliveryTag: data.tag } }, data.inclusive);
×
327
      case 'reject':
328
        log.debug("Rejecting tag %d on '%s' - '%s'", data.tag, queue, connection);
8✔
329
        return channel.nack({ fields: { deliveryTag: data.tag } }, data.inclusive, false);
8✔
330
      default:
331
        return Promise.resolve(true);
192✔
332
    }
333
  };
334
}
335

336
function subscribe (channelName, channel, topology, serializers, messages, options, exclusive) {
337
  const shouldAck = !options.noAck;
143✔
338
  const shouldBatch = !options.noBatch;
143✔
339
  const shouldCacheKeys = !options.noCacheKeys;
143✔
340
  // this is done to support rabbit-assigned queue names
341
  channelName = channelName || options.name;
143✔
342
  if (shouldAck && shouldBatch) {
143✔
343
    messages.listenForSignal();
131✔
344
  }
345

346
  options.consumerTag = info.createTag(channelName);
143✔
347
  if (Object.keys(channel.item.consumers).length > 0) {
143✔
348
    log.info('Duplicate subscription to queue %s ignored', channelName);
2✔
349
    return Promise.resolve(options.consumerTag);
2✔
350
  }
351
  log.info("Starting subscription to queue '%s' on '%s'", channelName, topology.connection.name);
141✔
352
  return channel.consume(channelName, function (raw) {
141✔
353
    if (!raw) {
2,258!
354
      // this happens when the consumer has been cancelled
355
      log.warn("Queue '%s' was sent a consumer cancel notification");
×
356
      throw new Error('Broker cancelled the consumer remotely');
×
357
    }
358
    const correlationId = raw.properties.correlationId;
2,258✔
359
    const ops = getResolutionOperations(channel, raw, messages, options);
2,258✔
360

361
    raw.ack = ops.ack.bind(ops);
2,258✔
362
    raw.reject = ops.reject.bind(ops);
2,258✔
363
    raw.nack = ops.nack.bind(ops);
2,258✔
364
    raw.reply = getReply(channel, serializers, raw, topology.replyQueue.name, topology.connection.name);
2,258✔
365
    raw.type = raw.properties.type || raw.fields.routingKey;
2,258✔
366
    if (exclusive) {
2,258!
367
      options.exclusive = true;
×
368
    }
369
    raw.queue = channelName;
2,258✔
370
    const parts = [options.name.replace(/[.]/g, '-')];
2,258✔
371
    if (raw.type) {
2,258✔
372
      parts.push(raw.type);
2,246✔
373
    }
374
    let topic = parts.join('.');
2,258✔
375
    const contentType = raw.properties.contentType || 'application/octet-stream';
2,258!
376
    const serializer = serializers[contentType];
2,258✔
377
    const track = () => {
2,258✔
378
      if (shouldAck && shouldBatch) {
2,258✔
379
        messages.addMessage(ops);
2,202✔
380
      }
381
    };
382
    if (!serializer) {
2,258!
383
      if (options.poison) {
×
384
        raw.body = raw.content;
×
385
        raw.contentEncoding = raw.properties.contentEncoding;
×
386
        raw.quarantined = true;
×
387
        topic = `${topic}.quarantined`;
×
388
      } else {
389
        log.error("Could not deserialize message id %s on queue '%s', connection '%s' - no serializer defined",
×
390
          raw.properties.messageId, channelName, topology.connection.name);
391
        track();
×
392
        ops.reject();
×
393
        return;
×
394
      }
395
    } else {
396
      try {
2,258✔
397
        raw.body = serializer.deserialize(raw.content, raw.properties.contentEncoding);
2,258✔
398
      } catch (err) {
399
        if (options.poison) {
4✔
400
          raw.quarantined = true;
2✔
401
          raw.body = raw.content;
2✔
402
          raw.contentEncoding = raw.properties.contentEncoding;
2✔
403
          topic = `${topic}.quarantined`;
2✔
404
        } else {
405
          track();
2✔
406
          ops.reject();
2✔
407
          return;
2✔
408
        }
409
      }
410
    }
411

412
    const onPublish = function (data) {
2,256✔
413
      let handled;
414

415
      if (data.activated) {
2,256✔
416
        handled = true;
2,198✔
417
      }
418
      track();
2,256✔
419

420
      if (!handled) {
2,256✔
421
        unhandledLog.warn("Message of %s on queue '%s', connection '%s' was not processed by any registered handlers",
58✔
422
          raw.type,
423
          channelName,
424
          topology.connection.name
425
        );
426
        topology.onUnhandled(raw);
58✔
427
      }
428
    };
429

430
    if (raw.fields.routingKey === topology.replyQueue.name) {
2,256✔
431
      responses.publish(
28✔
432
        {
433
          topic: correlationId,
434
          headers: {
435
            resolverNoCache: true
436
          },
437
          data: raw
438
        },
439
        onPublish
440
      );
441
    } else {
442
      dispatch.publish({
2,228✔
443
        topic: topic,
444
        headers: {
445
          resolverNoCache: !shouldCacheKeys
446
        },
447
        data: raw
448
      }, onPublish);
449
    }
450
  }, options)
451
    .then(function (result) {
452
      channel.tag = result.consumerTag;
141✔
453
      return result;
141✔
454
    }, function (err) {
UNCOV
455
      log.error('Error on channel consume', options);
×
UNCOV
456
      throw err;
×
457
    });
458
}
459

460
function unsubscribe (channel, options) {
461
  if (channel.tag) {
139✔
462
    log.info("Unsubscribing from queue '%s' with tag %s", options.name, channel.tag);
133✔
463
    return channel.cancel(channel.tag);
133✔
464
  } else {
465
    return Promise.resolve();
6✔
466
  }
467
}
468

469
module.exports = function (options, topology, serializers) {
2✔
470
  const channelName = ['queue', options.uniqueName].join(':');
159✔
471
  return topology.connection.getChannel(channelName, false, 'queue channel for ' + options.name)
159✔
472
    .then(function (channel) {
473
      const messages = new AckBatch(options.name, topology.connection.name, resolveTags(channel, options.name, topology.connection.name));
153✔
474
      const subscriber = subscribe.bind(undefined, options.uniqueName, channel, topology, serializers, messages, options);
153✔
475
      const definer = define.bind(undefined, channel, options, subscriber, topology.connection.name);
153✔
476
      return {
153✔
477
        channel: channel,
478
        messages: messages,
479
        define: definer,
480
        finalize: finalize.bind(undefined, channel, messages),
481
        getMessageCount: getCount.bind(undefined, messages),
482
        purge: purge.bind(undefined, channel, topology.connection.name, options, messages, definer),
483
        release: release.bind(undefined, channel, options, messages),
484
        subscribe: subscriber,
485
        unsubscribe: unsubscribe.bind(undefined, channel, options, messages)
486
      };
487
    });
488
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc