• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Foo-Foo-MQ / foo-foo-mq / 6922504481

19 Nov 2023 07:28PM UTC coverage: 89.481% (-0.06%) from 89.543%
6922504481

push

github

zlintz
feat(package.json): node 14 support removed

Node 14 support removed as it is no longer LTS

BREAKING CHANGE: Removed support for node 14

644 of 826 branches covered (0.0%)

Branch coverage included in aggregate %.

3703 of 4032 relevant lines covered (91.84%)

600.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.32
/src/amqp/queue.js
1
const AckBatch = require('../ackBatch.js');
3✔
2
const postal = require('postal');
3✔
3
const dispatch = postal.channel('rabbit.dispatch');
3✔
4
const responses = postal.channel('rabbit.responses');
3✔
5
const info = require('../info');
3✔
6
const log = require('../log')('rabbot.queue');
3✔
7
const format = require('util').format;
3✔
8
const topLog = require('../log')('rabbot.topology');
3✔
9
const unhandledLog = require('../log')('rabbot.unhandled');
3✔
10
const noOp = function () {};
3✔
11

12
/* log
13
  * `rabbot.queue`
14
    * `debug`
15
      * for all message operations - ack, nack, reply & reject
16
    * `info`
17
      * subscribing
18
      * unsubscribing
19
    * `warn`
20
      * no message handlers for message received
21
    * `error`
22
      * no serializer defined for outgoing message
23
      * no serializer defined for incoming message
24
      * message nacked/rejected when consumer is set to no-ack
25
  * `rabbot.topology`
26
    * `info`
27
      * queue declaration
28
*/
29

30
/**
 * Copies `options` into a new object, renaming keys according to `aliases`
 * and leaving out every key listed in `omit`.
 *
 * @param {Object} options - source options; only its own enumerable keys are read
 * @param {Object} aliases - map of option key -> replacement key
 * @param {...string} omit - option keys (original names) to leave out
 * @returns {Object} a new object; `options` itself is not modified
 */
function aliasOptions (options, aliases, ...omit) {
  const hasOwn = Object.prototype.hasOwnProperty;
  return Object.keys(options).reduce((result, key) => {
    if (!omit.includes(key)) {
      // Only own alias entries count: a plain `aliases[key]` lookup would also
      // find inherited members, turning an option named `constructor` or
      // `toString` into a function-valued key.
      const alias = (hasOwn.call(aliases, key) && aliases[key]) || key;
      result[alias] = options[key];
    }
    return result;
  }, {});
}
40

41
/**
 * Declares the queue on the channel or, when `options.passive` is set, only
 * checks for it.
 *
 * @param {Object} channel - must expose `checkQueue`, `assertQueue` and `prefetch`
 * @param {Object} options - queue options; `uniqueName` is the name passed to the channel
 * @param {Function} subscriber - not used by this function
 * @param {string} connectionName - only used in the log line
 * @returns {Promise} resolves with whatever the check/assert call resolved with
 */
function define (channel, options, subscriber, connectionName) {
  const optionAliases = {
    queuelimit: 'maxLength',
    queueLimit: 'maxLength',
    deadletter: 'deadLetterExchange',
    deadLetter: 'deadLetterExchange',
    deadLetterRoutingKey: 'deadLetterRoutingKey'
  };
  // rename the aliased options and strip the ones listed after the alias map
  const declareOptions = aliasOptions(options, optionAliases, 'subscribe', 'limit', 'noBatch', 'unique');

  topLog.info("Declaring queue '%s' on connection '%s' with the options: %s",
    options.uniqueName, connectionName, JSON.stringify(options));

  const declared = options.passive
    ? channel.checkQueue(options.uniqueName)
    : channel.assertQueue(options.uniqueName, declareOptions);

  return declared.then((queueInfo) => {
    // the prefetch limit is applied once the queue call has succeeded
    if (options.limit) {
      channel.prefetch(options.limit);
    }
    return queueInfo;
  });
}
67

68
/**
 * Tears down message tracking and gives the channel back.
 *
 * Calls, in order: `messages.reset()`, `messages.ignoreSignal()` and
 * `channel.release()`.
 *
 * @param {Object} channel - must expose `release()`
 * @param {Object} messages - must expose `reset()` and `ignoreSignal()`
 */
function finalize (channel, messages) {
  messages.reset();
  messages.ignoreSignal();
  channel.release();
}
74

75
/**
 * Picks the content type used to look up a serializer for `body`.
 *
 * Order of precedence:
 *   1. an explicit `options.contentType`
 *   2. strings -> 'text/plain'
 *   3. non-null objects without a truthy `length` -> 'application/json'
 *   4. everything else (including `null` and `undefined`) -> 'application/octet-stream'
 *
 * @param {*} body - the payload about to be serialized
 * @param {Object} [options] - may carry `contentType`
 * @returns {string} the content type
 */
function getContentType (body, options) {
  if (options && options.contentType) {
    return options.contentType;
  }
  if (typeof body === 'string') {
    return 'text/plain';
  }
  // `typeof null` is 'object', so null has to be excluded before touching
  // `.length`; otherwise a null body throws a TypeError here.
  if (body !== null && typeof body === 'object' && !body.length) {
    return 'application/json';
  }
  return 'application/octet-stream';
}
86

87
/**
 * Number of entries currently held in `messages.messages`.
 *
 * @param {Object} [messages] - tracker exposing a `messages` array
 * @returns {number} the array length, or 0 when no tracker is given
 */
function getCount (messages) {
  return messages ? messages.messages.length : 0;
}
94

95
/**
 * Builds ack/nack/reject operations that talk to the channel directly
 * instead of going through the batch tracker.
 *
 * Increments `messages.receivedCount` as a side effect.
 *
 * @param {Object} channel - must expose `ack`, `nack` and `reject`
 * @param {Object} raw - incoming message; `raw.fields.deliveryTag` is read when an operation runs
 * @param {Object} messages - tracker; `name` and `connectionName` are used in log lines
 * @param {boolean} noAck - when truthy, ack is a no-op and nack/reject only log an error
 * @returns {{ack: Function, nack: Function, reject: Function}}
 */
function getNoBatchOps (channel, raw, messages, noAck) {
  messages.receivedCount += 1;

  if (noAck) {
    // nothing can be sent to the channel in noAck mode: ack does nothing and
    // the other two only report the problem
    return {
      ack: noOp,
      nack: () => {
        log.error("Tag %d on '%s' - '%s' cannot be nacked in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
      },
      reject: () => {
        log.error("Tag %d on '%s' - '%s' cannot be rejected in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
      }
    };
  }

  // minimal message stand-in carrying only the delivery tag
  const tagOnly = () => ({ fields: { deliveryTag: raw.fields.deliveryTag } });

  return {
    ack: () => {
      log.debug("Acking tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
      channel.ack(tagOnly(), false);
    },
    nack: () => {
      log.debug("Nacking tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
      channel.nack(tagOnly(), false);
    },
    reject: () => {
      log.debug("Rejecting tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
      channel.reject(tagOnly(), false, false);
    }
  };
}
128

129
/**
 * Creates the `reply` function attached to an incoming message.
 *
 * The returned function serializes `reply`, acks the original message and
 * publishes the payload to the address in `raw.properties.replyTo`.
 *
 * @param {Object} channel - must expose `publish` and `sendToQueue`
 * @param {Object} serializers - map of content type -> `{ serialize }`
 * @param {Object} raw - the incoming message being replied to
 * @param {string|false} replyQueue - sent as `replyTo` on the reply; `false` sends none
 * @param {string} connectionName - only used in the log line
 * @returns {Function} `(reply, options) => Promise`; `options` may carry
 *   `replyType`, `contentType`, `timestamp`, `headers` and `more`.
 *   The promise rejects when no serializer matches the content type or when
 *   the original message has no `replyTo`.
 */
function getReply (channel, serializers, raw, replyQueue, connectionName) {
  // counts the replies sent with `options.more` set
  let position = 0;
  return function (reply, options) {
    const defaultReplyType = raw.type + '.reply';
    const replyType = options ? (options.replyType || defaultReplyType) : defaultReplyType;
    const contentType = getContentType(reply, options);
    const serializer = serializers[contentType];
    if (!serializer) {
      const message = format('Failed to publish message with contentType %s - no serializer defined', contentType);
      log.error(message);
      return Promise.reject(new Error(message));
    }
    const payload = serializer.serialize(reply);

    const replyTo = raw.properties.replyTo;
    // the original message is acked even when it turns out below that there
    // is nowhere to send the reply
    raw.ack();
    if (!replyTo) {
      return Promise.reject(new Error('Cannot reply to a message that has no return address'));
    }

    const publishOptions = {
      type: replyType,
      contentType: contentType,
      contentEncoding: 'utf8',
      correlationId: raw.properties.messageId,
      timestamp: options && options.timestamp ? options.timestamp : Date.now(),
      replyTo: replyQueue === false ? undefined : replyQueue,
      // Copy the caller's headers: position/sequence_end are added below, and
      // writing them onto the caller's own object would leak them into later
      // replies that reuse the same options.
      headers: options && options.headers ? Object.assign({}, options.headers) : {}
    };
    if (options && options.more) {
      publishOptions.headers.position = (position++);
    } else {
      publishOptions.headers.sequence_end = true; // jshint ignore:line
    }
    log.debug("Replying to message %s on '%s' - '%s' with type '%s'",
      raw.properties.messageId,
      replyTo,
      connectionName,
      publishOptions.type);
    if (raw.properties.headers && raw.properties.headers['direct-reply-to']) {
      // direct replies are published with an empty exchange name
      return channel.publish(
        '',
        replyTo,
        payload,
        publishOptions
      );
    }
    return channel.sendToQueue(replyTo, payload, publishOptions);
  };
}
180

181
/**
 * Chooses the ack/nack/reject implementation for an incoming message.
 *
 *   - `options.noBatch` -> direct channel operations (see getNoBatchOps)
 *   - `options.noAck`   -> untracked operations (see getUntrackedOps)
 *   - otherwise         -> operations tracked by `messages` (see getTrackedOps)
 *
 * @param {Object} channel - channel the message arrived on
 * @param {Object} raw - the incoming message
 * @param {Object} messages - message tracker
 * @param {Object} options - subscription options; `noBatch` and `noAck` are read
 * @returns {{ack: Function, nack: Function, reject: Function}}
 */
function getResolutionOperations (channel, raw, messages, options) {
  if (options.noBatch) {
    return getNoBatchOps(channel, raw, messages, options.noAck);
  }

  // noBatch has already been handled above, so only noAck is left to test
  if (options.noAck) {
    return getUntrackedOps(channel, raw, messages);
  }

  return getTrackedOps(raw, messages);
}
192

193
/**
 * Asks the message tracker for the operations tied to this message's
 * delivery tag.
 *
 * @param {Object} raw - incoming message; `raw.fields.deliveryTag` is read
 * @param {Object} messages - tracker exposing `getMessageOps(tag)`
 * @returns {*} whatever `messages.getMessageOps` returns
 */
function getTrackedOps (raw, messages) {
  const { deliveryTag } = raw.fields;
  return messages.getMessageOps(deliveryTag);
}
196

197
/**
 * Builds the operations used when nothing is acknowledged: ack does nothing,
 * nack and reject only log an error.
 *
 * Increments `messages.receivedCount` as a side effect.
 *
 * @param {Object} channel - not used by this function
 * @param {Object} raw - incoming message; `raw.fields.deliveryTag` appears in the log lines
 * @param {Object} messages - tracker; `name` and `connectionName` appear in the log lines
 * @returns {{ack: Function, nack: Function, reject: Function}}
 */
function getUntrackedOps (channel, raw, messages) {
  messages.receivedCount += 1;

  const nack = () => {
    log.error("Tag %d on '%s' - '%s' cannot be nacked in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
  };
  const reject = () => {
    log.error("Tag %d on '%s' - '%s' cannot be rejected in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
  };

  return { ack: noOp, nack, reject };
}
209

210
// purging an auto-delete queue means unsubscribing is not
211
// an option as it will cause the queue, binding and possibly
212
// upstream auto-delete exchanges to be deleted as well
213
/**
 * Purges the queue without unsubscribing first. If messages are still
 * awaiting resolution, the purge is postponed until the tracker emits
 * 'empty'.
 *
 * @param {Object} channel - must expose `purgeQueue(name)` returning a promise
 * @param {string} connectionName - only used in the log line
 * @param {Object} options - `uniqueName` (falling back to `name`) is the queue purged
 * @param {Object} messages - tracker exposing a `messages` array and `once(event, fn)`
 * @returns {Promise<number>} resolves with the `messageCount` reported by the purge
 */
function purgeADQueue (channel, connectionName, options, messages) {
  const queueName = options.uniqueName || options.name;
  return new Promise((resolve, reject) => {
    const purgeNow = () => {
      channel.purgeQueue(queueName)
        .then(
          outcome => resolve(outcome.messageCount),
          reject
        );
    };

    const pending = messages.messages.length;
    if (pending > 0) {
      log.info(`Purge operation for queue '${options.name}' on '${connectionName}' is waiting for resolution on ${pending} messages`);
      messages.once('empty', () => {
        purgeNow();
      });
      return;
    }
    purgeNow();
  });
}
235

236
// queues not marked auto-delete should be unsubscribed from
237
// in order to stop incoming messages while the purge is
238
// taking place and avoid arrival of additional new messages
239
/**
 * Unsubscribes from the queue, waits for outstanding messages to resolve and
 * then purges it. The purge goes ahead whether or not the unsubscribe
 * succeeded.
 *
 * @param {Object} channel - must expose `purgeQueue(name)` returning a promise
 * @param {string} connectionName - only used in log lines
 * @param {Object} options - `uniqueName` (falling back to `name`) is the queue purged
 * @param {Object} messages - tracker exposing a `messages` array and `once(event, fn)`
 * @returns {Promise<number>} resolves with the `messageCount` reported by the purge
 */
function purgeQueue (channel, connectionName, options, messages) {
  const queueName = options.uniqueName || options.name;
  return new Promise((resolve, reject) => {
    const purgeNow = () => {
      channel.purgeQueue(queueName)
        .then(
          outcome => resolve(outcome.messageCount),
          reject
        );
    };

    // runs after the unsubscribe attempt settles, successfully or not
    const afterUnsubscribe = () => {
      const pending = messages.messages.length;
      if (pending > 0) {
        log.info(`Purge operation for queue '${options.name}' on '${connectionName}' is waiting for resolution on ${pending} messages`);
        messages.once('empty', () => {
          purgeNow();
        });
        return;
      }
      purgeNow();
    };

    log.info(`Stopping subscription on '${options.name}' on '${connectionName}' before purging`);
    unsubscribe(channel, options)
      .then(afterUnsubscribe, afterUnsubscribe);
  });
}
266

267
/**
 * Purges the queue if it currently holds messages.
 *
 * `definer()` is called first to learn the queue's `messageCount`. Queues
 * with `options.autoDelete` are purged while still subscribed
 * (purgeADQueue); all others are unsubscribed first (purgeQueue).
 *
 * @param {Object} channel - channel the queue lives on
 * @param {string} connectionName - only used in log lines
 * @param {Object} options - queue options; `name` and `autoDelete` are read here
 * @param {Object} messages - message tracker handed to the purge helpers
 * @param {Function} definer - returns a promise for an object with `messageCount`
 * @returns {Promise<number>} number of messages purged, 0 when the queue was
 *   already empty; rejects with the original error when `definer()` or the
 *   purge fails
 */
function purge (channel, connectionName, options, messages, definer) {
  log.info(`Checking queue length on '${options.name}' on '${connectionName}' before purging`);
  // No rejection handler on purpose: an unbound `Promise.reject` passed as the
  // handler throws a TypeError when invoked, which hides the real failure.
  // Without a handler the original rejection simply propagates.
  return definer()
    .then(q => {
      if (q.messageCount > 0) {
        const promise = options.autoDelete
          ? purgeADQueue(channel, connectionName, options, messages)
          : purgeQueue(channel, connectionName, options, messages);
        return promise
          .then(
            count => {
              log.info(`Purged ${count} messages from '${options.name}' on '${connectionName}'`);
              return count;
            }
          );
      }
      log.info(`'${options.name}' on '${connectionName}' was already empty when purge was called`);
      return 0;
    });
}
291

292
/**
 * Unsubscribes from the queue and finalizes the channel. Unless `released`
 * is truthy, finalization waits for outstanding messages to resolve.
 * Finalization happens whether or not the unsubscribe succeeded.
 *
 * @param {Object} channel - channel to unsubscribe from and finalize
 * @param {Object} options - queue options; `name` is used in the log line
 * @param {Object} messages - tracker exposing a `messages` array and `once(event, fn)`
 * @param {boolean} [released] - when truthy, do not wait for outstanding messages
 * @returns {Promise<undefined>} resolves once `finalize` has run
 */
function release (channel, options, messages, released) {
  const wrapUp = () => new Promise((resolve) => {
    const finish = () => {
      finalize(channel, messages);
      resolve();
    };

    const pending = messages.messages.length;
    const mustWait = pending > 0 && !released;
    if (!mustWait) {
      finish();
      return;
    }
    log.info(`Release operation for queue '${options.name}' is waiting for resolution on ${pending} messages`);
    messages.once('empty', () => {
      finish();
    });
  });

  return unsubscribe(channel, options)
    .then(wrapUp, wrapUp);
}
311

312
/**
 * Creates the resolver handed to the message tracker: it maps an operation
 * name plus `{ tag, inclusive }` onto the matching channel call.
 *
 *   - 'ack'    -> channel.ack(msg, inclusive)
 *   - 'nack'   -> channel.nack(msg, inclusive)
 *   - 'reject' -> channel.nack(msg, inclusive, false)
 *   - anything else resolves to `true` without touching the channel
 *
 * @param {Object} channel - must expose `ack` and `nack`
 * @param {string} queue - only used in log lines
 * @param {string} connection - only used in log lines
 * @returns {Function} `(op, data) => *`
 */
function resolveTags (channel, queue, connection) {
  // minimal message stand-in carrying only the delivery tag
  const tagOnly = (tag) => ({ fields: { deliveryTag: tag } });

  return function (op, data) {
    if (op === 'ack') {
      log.debug("Acking tag %d on '%s' - '%s'", data.tag, queue, connection);
      return channel.ack(tagOnly(data.tag), data.inclusive);
    }
    if (op === 'nack') {
      log.debug("Nacking tag %d on '%s' - '%s'", data.tag, queue, connection);
      return channel.nack(tagOnly(data.tag), data.inclusive);
    }
    if (op === 'reject') {
      log.debug("Rejecting tag %d on '%s' - '%s'", data.tag, queue, connection);
      return channel.nack(tagOnly(data.tag), data.inclusive, false);
    }
    return Promise.resolve(true);
  };
}
329

330
/**
 * Starts consuming from the queue and routes every incoming message to the
 * `rabbit.dispatch` channel, or to `rabbit.responses` when the message's
 * routing key equals the reply queue name.
 *
 * Each message gets `ack`, `nack`, `reject` and `reply` functions plus
 * `type`, `queue` and `body` before it is published. Messages that cannot be
 * deserialized are rejected, unless `options.poison` is set; in that case
 * they are delivered with the raw content as body, `quarantined = true` and
 * a `.quarantined` suffix on the topic.
 *
 * Side effects: sets `options.consumerTag` and, once the consume call
 * succeeds, `channel.tag`.
 *
 * @param {string} channelName - queue name to consume from; falls back to `options.name`
 * @param {Object} channel - must expose `consume` and `item.consumers`
 * @param {Object} topology - provides `connection.name`, `replyQueue.name` and `onUnhandled`
 * @param {Object} serializers - map of content type -> `{ deserialize }`
 * @param {Object} messages - message tracker
 * @param {Object} options - subscription options (`noAck`, `noBatch`, `noCacheKeys`, `poison`, `name`)
 * @param {boolean} [exclusive] - when truthy, `options.exclusive` is set to true as messages arrive
 * @returns {Promise} resolves with the result of `channel.consume`, or with the
 *   consumer tag alone when the channel already has a consumer
 */
function subscribe (channelName, channel, topology, serializers, messages, options, exclusive) {
  const shouldAck = !options.noAck;
  const shouldBatch = !options.noBatch;
  const shouldCacheKeys = !options.noCacheKeys;
  // this is done to support rabbit-assigned queue names
  channelName = channelName || options.name;
  if (shouldAck && shouldBatch) {
    messages.listenForSignal();
  }

  options.consumerTag = info.createTag(channelName);
  if (Object.keys(channel.item.consumers).length > 0) {
    log.info('Duplicate subscription to queue %s ignored', channelName);
    return Promise.resolve(options.consumerTag);
  }
  log.info("Starting subscription to queue '%s' on '%s'", channelName, topology.connection.name);
  return channel.consume(channelName, function (raw) {
    if (!raw) {
      // this happens when the consumer has been cancelled
      log.warn("Queue '%s' was sent a consumer cancel notification", channelName);
      throw new Error('Broker cancelled the consumer remotely');
    }
    const correlationId = raw.properties.correlationId;
    const ops = getResolutionOperations(channel, raw, messages, options);

    raw.ack = ops.ack.bind(ops);
    raw.reject = ops.reject.bind(ops);
    raw.nack = ops.nack.bind(ops);
    raw.reply = getReply(channel, serializers, raw, topology.replyQueue.name, topology.connection.name);
    raw.type = raw.properties.type || raw.fields.routingKey;
    // NOTE(review): this runs per message, after channel.consume was already
    // called with `options` - confirm that setting it here is intended.
    if (exclusive) {
      options.exclusive = true;
    }
    raw.queue = channelName;
    // dots in the queue name are replaced so they cannot be mistaken for
    // topic separators
    const parts = [options.name.replace(/[.]/g, '-')];
    if (raw.type) {
      parts.push(raw.type);
    }
    let topic = parts.join('.');
    const contentType = raw.properties.contentType || 'application/octet-stream';
    const serializer = serializers[contentType];
    // only batched, acknowledged subscriptions register with the tracker
    const track = () => {
      if (shouldAck && shouldBatch) {
        messages.addMessage(ops);
      }
    };
    if (!serializer) {
      if (options.poison) {
        raw.body = raw.content;
        raw.contentEncoding = raw.properties.contentEncoding;
        raw.quarantined = true;
        topic = `${topic}.quarantined`;
      } else {
        log.error("Could not deserialize message id %s on queue '%s', connection '%s' - no serializer defined",
          raw.properties.messageId, channelName, topology.connection.name);
        track();
        ops.reject();
        return;
      }
    } else {
      try {
        raw.body = serializer.deserialize(raw.content, raw.properties.contentEncoding);
      } catch (err) {
        if (options.poison) {
          raw.quarantined = true;
          raw.body = raw.content;
          raw.contentEncoding = raw.properties.contentEncoding;
          topic = `${topic}.quarantined`;
        } else {
          track();
          ops.reject();
          return;
        }
      }
    }

    // publish callback: tracks the message, then reports it as unhandled
    // when `data.activated` is falsy
    const onPublish = function (data) {
      let handled;

      if (data.activated) {
        handled = true;
      }
      track();

      if (!handled) {
        unhandledLog.warn("Message of %s on queue '%s', connection '%s' was not processed by any registered handlers",
          raw.type,
          channelName,
          topology.connection.name
        );
        topology.onUnhandled(raw);
      }
    };

    if (raw.fields.routingKey === topology.replyQueue.name) {
      // replies are published under the correlation id of the request
      responses.publish(
        {
          topic: correlationId,
          headers: {
            resolverNoCache: true
          },
          data: raw
        },
        onPublish
      );
    } else {
      dispatch.publish({
        topic: topic,
        headers: {
          resolverNoCache: !shouldCacheKeys
        },
        data: raw
      }, onPublish);
    }
  }, options)
    .then(function (result) {
      channel.tag = result.consumerTag;
      return result;
    }, function (err) {
      log.error('Error on channel consume', options);
      throw err;
    });
}
453

454
/**
 * Cancels the channel's consumer, if it has one.
 *
 * @param {Object} channel - `tag` holds the consumer tag; must expose `cancel(tag)`
 * @param {Object} options - queue options; `name` is used in the log line
 * @returns {Promise} the result of `channel.cancel`, or a promise resolved
 *   with `undefined` when the channel has no tag
 */
function unsubscribe (channel, options) {
  if (!channel.tag) {
    return Promise.resolve();
  }
  log.info("Unsubscribing from queue '%s' with tag %s", options.name, channel.tag);
  return channel.cancel(channel.tag);
}
462

463
module.exports = function (options, topology, serializers) {
3✔
464
  const channelName = ['queue', options.uniqueName].join(':');
234✔
465
  return topology.connection.getChannel(channelName, false, 'queue channel for ' + options.name)
234✔
466
    .then(function (channel) {
467
      const messages = new AckBatch(options.name, topology.connection.name, resolveTags(channel, options.name, topology.connection.name));
225✔
468
      const subscriber = subscribe.bind(undefined, options.uniqueName, channel, topology, serializers, messages, options);
225✔
469
      const definer = define.bind(undefined, channel, options, subscriber, topology.connection.name);
225✔
470
      return {
225✔
471
        channel: channel,
472
        messages: messages,
473
        define: definer,
474
        finalize: finalize.bind(undefined, channel, messages),
475
        getMessageCount: getCount.bind(undefined, messages),
476
        purge: purge.bind(undefined, channel, topology.connection.name, options, messages, definer),
477
        release: release.bind(undefined, channel, options, messages),
478
        subscribe: subscriber,
479
        unsubscribe: unsubscribe.bind(undefined, channel, options, messages)
480
      };
481
    });
482
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc