• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Akamaozu / tibbarkcaj / #85

14 Jan 2025 11:09AM UTC coverage: 72.414% (-0.3%) from 72.759%
#85

push

Akamaozu
test: cleanup exchanges created for tests

111 of 179 branches covered (62.01%)

Branch coverage included in aggregate %.

309 of 401 relevant lines covered (77.06%)

22.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.97
/lib/queue.js
1
'use strict';
2

3
const Extend = require('lodash.assignin');
1✔
4
const EventEmitter = require('events').EventEmitter;
1✔
5

6
const DEFAULT_QUEUE_OPTIONS = {
1✔
7
    exclusive: false,
8
    durable: true,
9
    prefetch: 1,              // can be set on the queue because we use a per-queue channel
10
    messageTtl: undefined,
11
    maxLength: undefined
12
};
13

14
const DEFAULT_CONSUME_OPTIONS = {
1✔
15
    consumerTag: undefined,
16
    noAck: false,
17
    exclusive: false,
18
    priority: undefined
19
};
20

21
const queue = (options) => {
1✔
22

23
    const consumers = [];
14✔
24

25
    const connect = (connection) => {
14✔
26

27
        connection.createChannel(onChannel);
13✔
28
    };
29

30
    const consume = (callback, consumeOptions) => {
14✔
31

32
        const onMessage = (msg) => {
8✔
33

34
            const data = parseMessage(msg);
6✔
35

36
            const ack = (reply) => {
6✔
37

38
                const replyTo = msg.properties.replyTo;
1✔
39
                const id = msg.properties.correlationId;
1✔
40
                if (replyTo && id) {
1!
41
                    const buffer = encodeMessage(reply, msg.properties.contentType);
×
42
                    channel.publish('', replyTo, buffer, {
×
43
                        correlationId: id,
44
                        contentType: msg.properties.contentType
45
                    });
46
                }
47

48
                channel.ack(msg);
1✔
49
            };
50

51
            const nack = (opts) => {
6✔
52

53
                opts = opts || {};
×
54
                opts.allUpTo = opts.allUpTo !== undefined ? opts.allUpTo : false;
×
55
                opts.requeue = opts.requeue !== undefined ? opts.requeue : true;
×
56
                channel.nack(msg, opts.allUpTo, opts.requeue);
×
57
            };
58

59
            callback(data, ack, nack, msg);
6✔
60
        };
61

62
        if ( ready ){
8✔
63

64
            const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions);
1✔
65
            channel.consume(emitter.amqLabel, onMessage, opts, onConsume);
1✔
66
            consumers.push({ onMessage, opts });
1✔
67
            return;
1✔
68
        }
69

70
        const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions);
7✔
71
        consumers.push({ onMessage, opts });
7✔
72
    };
73

74
    const cancel = (done) => {
14✔
75

76
        if (!consumerTag) {
7!
77
            return;
×
78
        }
79

80
        if (!channel) {
7!
81
            return;
×
82
        }
83

84
        channel.cancel(consumerTag, done);
7✔
85
    };
86

87
    const purge = (done) => {
14✔
88

89
        const onPurged = (err, obj) => {
6✔
90

91
            if (err) {
6!
92
                return done(err);
×
93
            }
94

95
            done(undefined, obj.messageCount);
6✔
96
        };
97

98
        if (channel) {
6!
99
            channel.purgeQueue(emitter.amqLabel, onPurged);
6✔
100
        }
101
        else {
102
            emitter.once('ready', () => {
×
103

104
                channel.purgeQueue(emitter.amqLabel, onPurged);
×
105
            });
106
        }
107
    };
108

109
    const encodeMessage = (message, contentType) => {
14✔
110

111
        if (contentType === 'application/json') {
×
112
            return Buffer.from(JSON.stringify(message));
×
113
        }
114

115
        return Buffer.from(message.toString());
×
116
    };
117

118
    const parseMessage = (msg) => {
14✔
119

120
        msg = msg || {};
6!
121

122
        if (msg.properties && msg.properties.contentType === 'application/json') {
6!
123
            try {
6✔
124
                return JSON.parse(msg.content.toString());
6✔
125
            }
126
            catch (e) {
127
                emitter.emit('error', new Error('unable to parse message as JSON'));
×
128
                return;
×
129
            }
130
        }
131

132
        return msg.content;
×
133
    };
134

135
    const onConsume = (err, info) => {
14✔
136

137
        if (err) {
8!
138
            return bail(err);
×
139
        }
140

141
        consumerTag = info.consumerTag; // required to stop consuming
8✔
142
        emitter.emit('consuming');
8✔
143
    };
144

145
    const bail = (err) => {
14✔
146
    // TODO: close the channel if still open
147
        channel = undefined;
13✔
148
        emitter.amqLabel = undefined;
13✔
149
        consumerTag = undefined;
13✔
150
        emitter.emit('close', err);
13✔
151
    };
152

153
    const onChannel = (err, chan) => {
14✔
154

155
        if (err) {
13✔
156
            return bail(err);
2✔
157
        }
158

159
        channel = chan;
11✔
160
        channel.prefetch(emitter.options.prefetch);
11✔
161
        channel.once('close', bail.bind(this, new Error('channel closed')));
11✔
162
        emitter.emit('connected');
11✔
163
        channel.assertQueue(emitter.name, emitter.options, onQueue);
11✔
164

165
        emitter.once('ready', () => {
11✔
166

167
            consumers.forEach((consumer) => channel.consume(emitter.amqLabel, consumer.onMessage, consumer.opts, onConsume));
10✔
168
        });
169
    };
170

171
    const onQueue = (err, info) => {
14✔
172

173
        if (err) {
11✔
174
            return bail(err);
1✔
175
        }
176

177
        emitter.amqLabel = info.queue;
10✔
178
        ready = true;
10✔
179
        emitter.emit('ready');
10✔
180
    };
181

182
    options = options || {};
14!
183
    let channel; let consumerTag; let ready;
184
    const emitter = Extend(new EventEmitter(), {
14✔
185
        name: options.name,
186
        amqLabel: undefined, // Holds the current connection's name
187
        options: Extend({}, DEFAULT_QUEUE_OPTIONS, options),
188
        connect,
189
        consume,
190
        cancel,
191
        purge
192
    });
193

194
    return emitter;
14✔
195
};
196

197
module.exports = queue;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc