• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Foo-Foo-MQ / foo-foo-mq / 7660807694

25 Jan 2024 09:41PM UTC coverage: 89.461% (-0.08%) from 89.543%
7660807694

push

github

zlintz
fix(src/connectionfsm.js): adding `channel.on('failed'` is breaking some tests,disabling for now

643 of 826 branches covered (0.0%)

Branch coverage included in aggregate %.

3703 of 4032 relevant lines covered (91.84%)

395.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.32
/src/amqp/queue.js
1
const AckBatch = require('../ackBatch.js');
2✔
2
const postal = require('postal');
2✔
3
const dispatch = postal.channel('rabbit.dispatch');
2✔
4
const responses = postal.channel('rabbit.responses');
2✔
5
const info = require('../info');
2✔
6
const log = require('../log')('rabbot.queue');
2✔
7
const format = require('util').format;
2✔
8
const topLog = require('../log')('rabbot.topology');
2✔
9
const unhandledLog = require('../log')('rabbot.unhandled');
2✔
10
const noOp = function () {};
2✔
11

12
/* log
13
  * `rabbot.queue`
14
    * `debug`
15
      * for all message operations - ack, nack, reply & reject
16
    * `info`
17
      * subscribing
18
      * unsubscribing
19
    * `warn`
20
      * no message handlers for message received
21
    * `error`
22
      * no serializer defined for outgoing message
23
      * no serializer defined for incoming message
24
      * message nacked/rejected when consumer is set to no-ack
25
  * `rabbot.topology`
26
    * `info`
27
      * queue declaration
28
*/
29

30
/**
 * Copies `options` into a new object, renaming keys according to `aliases`
 * and leaving out any key listed in `omit`.
 *
 * @param {Object} options - source options; not modified.
 * @param {Object} aliases - map of option name to the name it is copied under.
 * @param {...string} omit - option names (as they appear in `options`) to drop.
 * @returns {Object} a new object with the renamed, filtered options.
 */
function aliasOptions (options, aliases, ...omit) {
  const hasOwn = Object.prototype.hasOwnProperty;
  return Object.keys(options).reduce((result, key) => {
    if (!omit.includes(key)) {
      // Only honor aliases declared directly on the map; a plain `aliases[key]`
      // lookup would also match inherited members such as `toString` and use a
      // function as the target property name.
      const alias = (hasOwn.call(aliases, key) && aliases[key]) || key;
      result[alias] = options[key];
    }
    return result;
  }, {});
}
40

41
function define (channel, options, subscriber, connectionName) {
42
  const valid = aliasOptions(options, {
12,678✔
43
    queuelimit: 'maxLength',
44
    queueLimit: 'maxLength',
45
    deadletter: 'deadLetterExchange',
46
    deadLetter: 'deadLetterExchange',
47
    deadLetterRoutingKey: 'deadLetterRoutingKey'
48
  }, 'subscribe', 'limit', 'noBatch', 'unique');
49
  topLog.info("Declaring queue '%s' on connection '%s' with the options: %s",
12,678✔
50
    options.uniqueName, connectionName, JSON.stringify(options));
51

52
  let queuePromise;
53

54
  if (options.passive) {
12,678✔
55
    queuePromise = channel.checkQueue(options.uniqueName);
12,526✔
56
  } else {
57
    queuePromise = channel.assertQueue(options.uniqueName, valid);
152✔
58
  }
59

60
  return queuePromise.then(function (q) {
12,678✔
61
    if (options.limit) {
154✔
62
      channel.prefetch(options.limit);
12✔
63
    }
64
    return q;
154✔
65
  });
66
}
67

68
/**
 * Tears down the queue's local state: resets the ack batch, stops it from
 * listening for signals, and releases the channel.
 *
 * @param {Object} channel - channel exposing `release()`.
 * @param {Object} messages - ack batch exposing `reset()` and `ignoreSignal()`.
 */
function finalize (channel, messages) {
  messages.reset();
  messages.ignoreSignal();
  // The previous `channel = undefined` only rebound the local parameter and
  // had no effect on the caller's reference, so it has been dropped.
  channel.release();
}
74

75
/**
 * Picks the content type used to select a serializer for a reply body.
 *
 * @param {*} body - the reply payload.
 * @param {Object} [options] - reply options; a truthy `contentType` wins outright.
 * @returns {string} `options.contentType` when given, otherwise `text/plain`
 *   for strings, `application/json` for null and for objects without a truthy
 *   `length`, and `application/octet-stream` for everything else.
 */
function getContentType (body, options) {
  if (options && options.contentType) {
    return options.contentType;
  }
  if (typeof body === 'string') {
    return 'text/plain';
  }
  // `typeof null` is 'object', so null must be handled before `.length` is
  // read; it is treated as JSON here since it has a JSON representation.
  if (body === null) {
    return 'application/json';
  }
  if (typeof body === 'object' && !body.length) {
    return 'application/json';
  }
  return 'application/octet-stream';
}
86

87
/**
 * Reports how many messages the ack batch is currently holding.
 *
 * @param {Object} [messages] - ack batch exposing a `messages` array.
 * @returns {number} `messages.messages.length`, or 0 when no batch is supplied.
 */
function getCount (messages) {
  return messages ? messages.messages.length : 0;
}
94

95
/**
 * Builds ack/nack/reject operations that talk to the channel immediately
 * instead of going through the ack batch. Also bumps `messages.receivedCount`.
 *
 * @param {Object} channel - channel exposing `ack`, `nack` and `reject`.
 * @param {Object} raw - delivered message; `raw.fields.deliveryTag` is read
 *   each time an operation is invoked.
 * @param {Object} messages - ack batch; supplies `name`/`connectionName` for logging.
 * @param {boolean} noAck - when truthy, ack is a no-op and nack/reject only log an error.
 * @returns {{ack: Function, nack: Function, reject: Function}}
 */
function getNoBatchOps (channel, raw, messages, noAck) {
  messages.receivedCount += 1;

  if (noAck) {
    return {
      ack: noOp,
      nack: function () {
        log.error("Tag %d on '%s' - '%s' cannot be nacked in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
      },
      reject: function () {
        log.error("Tag %d on '%s' - '%s' cannot be rejected in noAck mode - message will be lost!", raw.fields.deliveryTag, messages.name, messages.connectionName);
      }
    };
  }

  // minimal message stand-in carrying only the delivery tag
  const tagOnly = () => ({ fields: { deliveryTag: raw.fields.deliveryTag } });

  return {
    ack: function () {
      log.debug("Acking tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
      channel.ack(tagOnly(), false);
    },
    nack: function () {
      log.debug("Nacking tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
      channel.nack(tagOnly(), false);
    },
    reject: function () {
      log.debug("Rejecting tag %d on '%s' - '%s'", raw.fields.deliveryTag, messages.name, messages.connectionName);
      channel.reject(tagOnly(), false, false);
    }
  };
}
128

129
/**
 * Creates the `reply` function attached to a delivered message.
 *
 * The returned function serializes `reply`, acks the original message and
 * publishes the payload to the address in `raw.properties.replyTo`. A counter
 * shared across calls stamps `headers.position` on replies sent with
 * `options.more`; any other reply is stamped with `headers.sequence_end`.
 *
 * @param {Object} channel - channel exposing `publish` and `sendToQueue`.
 * @param {Object} serializers - map of content type to `{ serialize }`.
 * @param {Object} raw - the delivered message being replied to.
 * @param {string|false} replyQueue - value for the outgoing `replyTo`; `false` omits it.
 * @param {string} connectionName - only used in the log line.
 * @returns {Function} `(reply, options) => Promise`; rejects when no serializer
 *   matches the content type or when the message carries no `replyTo`.
 */
function getReply (channel, serializers, raw, replyQueue, connectionName) {
  let position = 0;
  return function (reply, options) {
    const defaultReplyType = raw.type + '.reply';
    const replyType = options ? (options.replyType || defaultReplyType) : defaultReplyType;
    const contentType = getContentType(reply, options);
    const serializer = serializers[contentType];
    if (!serializer) {
      const message = format('Failed to publish message with contentType %s - no serializer defined', contentType);
      log.error(message);
      return Promise.reject(new Error(message));
    }
    const payload = serializer.serialize(reply);

    const replyTo = raw.properties.replyTo;
    // the original message is acked before the return address is checked
    raw.ack();
    if (!replyTo) {
      return Promise.reject(new Error('Cannot reply to a message that has no return address'));
    }

    const publishOptions = {
      type: replyType,
      contentType: contentType,
      contentEncoding: 'utf8',
      correlationId: raw.properties.messageId,
      timestamp: options && options.timestamp ? options.timestamp : Date.now(),
      replyTo: replyQueue === false ? undefined : replyQueue,
      // Copy the caller's headers: position/sequence_end are written below and
      // must not leak into an object the caller may reuse for the next reply.
      headers: options && options.headers ? Object.assign({}, options.headers) : {}
    };
    if (options && options.more) {
      publishOptions.headers.position = (position++);
    } else {
      publishOptions.headers.sequence_end = true; // jshint ignore:line
    }
    log.debug("Replying to message %s on '%s' - '%s' with type '%s'",
      raw.properties.messageId,
      replyTo,
      connectionName,
      publishOptions.type);
    if (raw.properties.headers && raw.properties.headers['direct-reply-to']) {
      return channel.publish(
        '',
        replyTo,
        payload,
        publishOptions
      );
    }
    return channel.sendToQueue(replyTo, payload, publishOptions);
  };
}
180

181
/**
 * Chooses the ack/nack/reject implementation for a delivered message.
 *
 * - `options.noBatch`: operations hit the channel directly (see getNoBatchOps).
 * - `options.noAck` (with batching): ack is a no-op, nack/reject only log.
 * - otherwise: operations are tracked through the ack batch.
 *
 * @param {Object} channel - channel the message arrived on.
 * @param {Object} raw - the delivered message.
 * @param {Object} messages - ack batch for the queue.
 * @param {Object} options - queue options (`noBatch`, `noAck`).
 * @returns {{ack: Function, nack: Function, reject: Function}}
 */
function getResolutionOperations (channel, raw, messages, options) {
  if (options.noBatch) {
    return getNoBatchOps(channel, raw, messages, options.noAck);
  }

  // noBatch has already returned above, so only noAck needs checking here
  if (options.noAck) {
    return getUntrackedOps(channel, raw, messages);
  }

  return getTrackedOps(raw, messages);
}
192

193
/**
 * Obtains batch-tracked ack/nack/reject operations for a delivery.
 *
 * @param {Object} raw - delivered message; its `fields.deliveryTag` is used.
 * @param {Object} messages - ack batch exposing `getMessageOps(tag)`.
 * @returns {*} whatever `messages.getMessageOps` returns for the tag.
 */
function getTrackedOps (raw, messages) {
  const { deliveryTag } = raw.fields;
  return messages.getMessageOps(deliveryTag);
}
196

197
/**
 * Builds the operations used when a message is not tracked by the ack batch:
 * ack does nothing, while nack and reject only log an error. Also bumps
 * `messages.receivedCount`.
 *
 * @param {Object} channel - not used by the returned operations.
 * @param {Object} raw - delivered message; `raw.fields.deliveryTag` is logged.
 * @param {Object} messages - ack batch; supplies `name`/`connectionName` for logging.
 * @returns {{ack: Function, nack: Function, reject: Function}}
 */
function getUntrackedOps (channel, raw, messages) {
  messages.receivedCount += 1;

  // produces an operation that only reports the given message through log.error
  const reportLoss = (template) => function () {
    log.error(template, raw.fields.deliveryTag, messages.name, messages.connectionName);
  };

  return {
    ack: noOp,
    nack: reportLoss("Tag %d on '%s' - '%s' cannot be nacked in noAck mode - message will be lost!"),
    reject: reportLoss("Tag %d on '%s' - '%s' cannot be rejected in noAck mode - message will be lost!")
  };
}
209

210
// purging an auto-delete queue means unsubscribing is not
211
// an option as it will cause the queue, binding and possibly
212
// upstream auto-delete exchanges to be deleted as well
213
/**
 * Purges a queue without unsubscribing from it first. If the ack batch still
 * holds messages, the purge is deferred until the batch emits `empty`.
 *
 * @param {Object} channel - channel exposing `purgeQueue(name)`.
 * @param {string} connectionName - only used in the log line.
 * @param {Object} options - queue options; `uniqueName` (or `name`) selects the queue.
 * @param {Object} messages - ack batch exposing `messages` and `once`.
 * @returns {Promise<number>} resolves with the `messageCount` from the purge result.
 */
function purgeADQueue (channel, connectionName, options, messages) {
  const name = options.uniqueName || options.name;
  return new Promise(function (resolve, reject) {
    const runPurge = function () {
      channel.purgeQueue(name)
        .then(
          result => resolve(result.messageCount),
          reject
        );
    };

    const messageCount = messages.messages.length;
    if (!(messageCount > 0)) {
      runPurge();
      return;
    }
    log.info(`Purge operation for queue '${options.name}' on '${connectionName}' is waiting for resolution on ${messageCount} messages`);
    messages.once('empty', runPurge);
  });
}
235

236
// queues not marked auto-delete should be unsubscribed from
237
// in order to stop incoming messages while the purge is
238
// taking place and avoid arrival of additional new messages
239
/**
 * Unsubscribes from the queue and then purges it. The purge goes ahead
 * whether the unsubscribe succeeds or fails. If the ack batch still holds
 * messages after unsubscribing, the purge waits for the batch's `empty` event.
 *
 * @param {Object} channel - channel exposing `purgeQueue(name)`.
 * @param {string} connectionName - only used in log lines.
 * @param {Object} options - queue options; `uniqueName` (or `name`) selects the queue.
 * @param {Object} messages - ack batch exposing `messages` and `once`.
 * @returns {Promise<number>} resolves with the `messageCount` from the purge result.
 */
function purgeQueue (channel, connectionName, options, messages) {
  const name = options.uniqueName || options.name;
  return new Promise(function (resolve, reject) {
    const runPurge = function () {
      channel.purgeQueue(name)
        .then(
          result => resolve(result.messageCount),
          reject
        );
    };

    const afterUnsubscribe = function () {
      const messageCount = messages.messages.length;
      if (!(messageCount > 0)) {
        runPurge();
        return;
      }
      log.info(`Purge operation for queue '${options.name}' on '${connectionName}' is waiting for resolution on ${messageCount} messages`);
      messages.once('empty', runPurge);
    };

    log.info(`Stopping subscription on '${options.name}' on '${connectionName}' before purging`);
    unsubscribe(channel, options)
      .then(afterUnsubscribe, afterUnsubscribe);
  });
}
266

267
/**
 * Purges the queue after checking its length through `definer`.
 *
 * When the queue reports no messages nothing is purged and 0 is returned.
 * Otherwise `options.autoDelete` selects purgeADQueue (stay subscribed) or
 * purgeQueue (unsubscribe first).
 *
 * @param {Object} channel - channel the queue lives on.
 * @param {string} connectionName - used in log lines and passed to the purge helpers.
 * @param {Object} options - queue options.
 * @param {Object} messages - ack batch for the queue.
 * @param {Function} definer - returns a promise for the queue description (`messageCount`).
 * @returns {Promise<number>} number of messages purged; rejects with the
 *   original error when `definer` or the purge fails.
 */
function purge (channel, connectionName, options, messages, definer) {
  log.info(`Checking queue length on '${options.name}' on '${connectionName}' before purging`);
  // No rejection handler here on purpose: the previous unbound `Promise.reject`
  // handler threw a TypeError when invoked, hiding the real failure.
  return definer()
    .then(q => {
      if (!(q.messageCount > 0)) {
        log.info(`'${options.name}' on '${connectionName}' was already empty when purge was called`);
        return 0;
      }
      const promise = options.autoDelete
        ? purgeADQueue(channel, connectionName, options, messages)
        : purgeQueue(channel, connectionName, options, messages);
      return promise.then(count => {
        log.info(`Purged ${count} messages from '${options.name}' on '${connectionName}'`);
        return count;
      });
    });
}
291

292
/**
 * Unsubscribes from the queue and finalizes it. Finalization goes ahead
 * whether the unsubscribe succeeds or fails. Unless `released` is truthy,
 * finalization waits for the ack batch's `empty` event while the batch still
 * holds messages.
 *
 * @param {Object} channel - channel to unsubscribe from and release.
 * @param {Object} options - queue options; `name` is used in the log line.
 * @param {Object} messages - ack batch exposing `messages` and `once`.
 * @param {boolean} [released] - when truthy, finalize without waiting for pending messages.
 * @returns {Promise<undefined>} resolves once finalize has run.
 */
function release (channel, options, messages, released) {
  const afterUnsubscribe = function () {
    return new Promise(function (resolve) {
      const finish = function () {
        finalize(channel, messages);
        resolve();
      };

      const messageCount = messages.messages.length;
      const mustWait = messageCount > 0 && !released;
      if (!mustWait) {
        finish();
        return;
      }
      log.info(`Release operation for queue '${options.name}' is waiting for resolution on ${messageCount} messages`);
      messages.once('empty', finish);
    });
  };

  return unsubscribe(channel, options)
    .then(afterUnsubscribe, afterUnsubscribe);
}
311

312
/**
 * Creates the resolver handed to the ack batch. The resolver turns a batch
 * operation name plus `{ tag, inclusive }` into the matching channel call.
 *
 * @param {Object} channel - channel exposing `ack` and `nack`.
 * @param {string} queue - queue name, used in log lines.
 * @param {string} connection - connection name, used in log lines.
 * @returns {Function} `(op, data)`; `ack` calls `channel.ack`, `nack` and
 *   `reject` both call `channel.nack` (reject passes `false` as the third
 *   argument), and any other op resolves to `true`.
 */
function resolveTags (channel, queue, connection) {
  return function (op, data) {
    if (op === 'ack') {
      log.debug("Acking tag %d on '%s' - '%s'", data.tag, queue, connection);
      return channel.ack({ fields: { deliveryTag: data.tag } }, data.inclusive);
    }
    if (op === 'nack') {
      log.debug("Nacking tag %d on '%s' - '%s'", data.tag, queue, connection);
      return channel.nack({ fields: { deliveryTag: data.tag } }, data.inclusive);
    }
    if (op === 'reject') {
      log.debug("Rejecting tag %d on '%s' - '%s'", data.tag, queue, connection);
      return channel.nack({ fields: { deliveryTag: data.tag } }, data.inclusive, false);
    }
    return Promise.resolve(true);
  };
}
329

330
/**
 * Starts consuming from the queue and routes every delivery to either the
 * `rabbit.responses` or the `rabbit.dispatch` postal channel.
 *
 * Each delivered message is decorated with `ack`/`nack`/`reject`/`reply`,
 * `type`, `queue` and a deserialized `body` before being published.
 * Messages that cannot be deserialized are rejected, unless `options.poison`
 * is set, in which case they are published on a `.quarantined` topic with the
 * raw content as the body.
 *
 * @param {string} channelName - queue name to consume; falls back to `options.name`.
 * @param {Object} channel - channel exposing `consume` and `item.consumers`.
 * @param {Object} topology - supplies `connection.name`, `replyQueue.name` and `onUnhandled`.
 * @param {Object} serializers - map of content type to `{ deserialize }`.
 * @param {Object} messages - ack batch for the queue.
 * @param {Object} options - queue options; `consumerTag` is written onto it.
 * @param {boolean} [exclusive] - see the review note in the handler below.
 * @returns {Promise} resolves with the consume result (or with the generated
 *   consumer tag when the channel already has a consumer); rejects when
 *   `channel.consume` fails.
 */
function subscribe (channelName, channel, topology, serializers, messages, options, exclusive) {
  const shouldAck = !options.noAck;
  const shouldBatch = !options.noBatch;
  const shouldCacheKeys = !options.noCacheKeys;
  // this is done to support rabbit-assigned queue names
  channelName = channelName || options.name;
  if (shouldAck && shouldBatch) {
    messages.listenForSignal();
  }

  options.consumerTag = info.createTag(channelName);
  if (Object.keys(channel.item.consumers).length > 0) {
    log.info('Duplicate subscription to queue %s ignored', channelName);
    return Promise.resolve(options.consumerTag);
  }
  log.info("Starting subscription to queue '%s' on '%s'", channelName, topology.connection.name);
  return channel.consume(channelName, function (raw) {
    if (!raw) {
      // this happens when the consumer has been cancelled
      // (the queue name was previously missing, leaving a literal '%s' in the log)
      log.warn("Queue '%s' was sent a consumer cancel notification", channelName);
      throw new Error('Broker cancelled the consumer remotely');
    }
    const correlationId = raw.properties.correlationId;
    const ops = getResolutionOperations(channel, raw, messages, options);

    raw.ack = ops.ack.bind(ops);
    raw.reject = ops.reject.bind(ops);
    raw.nack = ops.nack.bind(ops);
    raw.reply = getReply(channel, serializers, raw, topology.replyQueue.name, topology.connection.name);
    raw.type = raw.properties.type || raw.fields.routingKey;
    // NOTE(review): this runs per delivery, i.e. after `options` was already
    // handed to channel.consume — confirm whether it was meant to be set
    // before consuming.
    if (exclusive) {
      options.exclusive = true;
    }
    raw.queue = channelName;
    // topic is "<queue name with dots replaced by dashes>.<message type>"
    const parts = [options.name.replace(/[.]/g, '-')];
    if (raw.type) {
      parts.push(raw.type);
    }
    let topic = parts.join('.');
    const contentType = raw.properties.contentType || 'application/octet-stream';
    const serializer = serializers[contentType];
    // registers the message with the ack batch when acks are batched
    const track = () => {
      if (shouldAck && shouldBatch) {
        messages.addMessage(ops);
      }
    };
    if (!serializer) {
      if (options.poison) {
        raw.body = raw.content;
        raw.contentEncoding = raw.properties.contentEncoding;
        raw.quarantined = true;
        topic = `${topic}.quarantined`;
      } else {
        log.error("Could not deserialize message id %s on queue '%s', connection '%s' - no serializer defined",
          raw.properties.messageId, channelName, topology.connection.name);
        track();
        ops.reject();
        return;
      }
    } else {
      try {
        raw.body = serializer.deserialize(raw.content, raw.properties.contentEncoding);
      } catch (err) {
        if (options.poison) {
          raw.quarantined = true;
          raw.body = raw.content;
          raw.contentEncoding = raw.properties.contentEncoding;
          topic = `${topic}.quarantined`;
        } else {
          track();
          ops.reject();
          return;
        }
      }
    }

    const onPublish = function (data) {
      track();

      if (!data.activated) {
        unhandledLog.warn("Message of %s on queue '%s', connection '%s' was not processed by any registered handlers",
          raw.type,
          channelName,
          topology.connection.name
        );
        topology.onUnhandled(raw);
      }
    };

    // deliveries routed to the reply queue are published by correlation id
    if (raw.fields.routingKey === topology.replyQueue.name) {
      responses.publish(
        {
          topic: correlationId,
          headers: {
            resolverNoCache: true
          },
          data: raw
        },
        onPublish
      );
    } else {
      dispatch.publish({
        topic: topic,
        headers: {
          resolverNoCache: !shouldCacheKeys
        },
        data: raw
      }, onPublish);
    }
  }, options)
    .then(function (result) {
      // remembered so unsubscribe can cancel this consumer
      channel.tag = result.consumerTag;
      return result;
    }, function (err) {
      log.error('Error on channel consume', options);
      throw err;
    });
}
453

454
/**
 * Cancels the consumer registered on the channel, if there is one.
 *
 * @param {Object} channel - channel exposing `cancel`; `channel.tag` holds the consumer tag.
 * @param {Object} options - queue options; `name` is used in the log line.
 * @returns {Promise} the result of `channel.cancel(tag)`, or a resolved
 *   promise when the channel has no consumer tag.
 */
function unsubscribe (channel, options) {
  if (!channel.tag) {
    return Promise.resolve();
  }
  log.info("Unsubscribing from queue '%s' with tag %s", options.name, channel.tag);
  return channel.cancel(channel.tag);
}
462

463
module.exports = function (options, topology, serializers) {
2✔
464
  const channelName = ['queue', options.uniqueName].join(':');
156✔
465
  return topology.connection.getChannel(channelName, false, 'queue channel for ' + options.name)
156✔
466
    .then(function (channel) {
467
      const messages = new AckBatch(options.name, topology.connection.name, resolveTags(channel, options.name, topology.connection.name));
150✔
468
      const subscriber = subscribe.bind(undefined, options.uniqueName, channel, topology, serializers, messages, options);
150✔
469
      const definer = define.bind(undefined, channel, options, subscriber, topology.connection.name);
150✔
470
      return {
150✔
471
        channel: channel,
472
        messages: messages,
473
        define: definer,
474
        finalize: finalize.bind(undefined, channel, messages),
475
        getMessageCount: getCount.bind(undefined, messages),
476
        purge: purge.bind(undefined, channel, topology.connection.name, options, messages, definer),
477
        release: release.bind(undefined, channel, options, messages),
478
        subscribe: subscriber,
479
        unsubscribe: unsubscribe.bind(undefined, channel, options, messages)
480
      };
481
    });
482
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc