• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

telefonicaid / iotagent-json / 6432399895

06 Oct 2023 01:39PM UTC coverage: 77.093% (-0.2%) from 77.279%
6432399895

Pull #760

github

web-flow
Merge e7c81881d into 555a350a4
Pull Request #760: Prelanding/fix binary data

364 of 540 branches covered (0.0%)

Branch coverage included in aggregate %.

3 of 3 new or added lines in 1 file covered. (100.0%)

824 of 1001 relevant lines covered (82.32%)

70.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.57
/lib/bindings/MQTTBinding.js
1
/*
2
 * Copyright 2016 Telefonica Investigación y Desarrollo, S.A.U
3
 *
4
 * This file is part of iotagent-json
5
 *
6
 * iotagent-json is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the License,
9
 * or (at your option) any later version.
10
 *
11
 * iotagent-json is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14
 * See the GNU Affero General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public
17
 * License along with iotagent-json.
18
 * If not, seehttp://www.gnu.org/licenses/.
19
 *
20
 * For those usages not covered by the GNU Affero General Public License
21
 * please contact with::[contacto@tid.es]
22
 */
23

24
/* eslint-disable consistent-return */
25
/* eslint-disable no-unused-vars */
26

27
const fs = require('fs');
1✔
28
const iotAgentLib = require('iotagent-node-lib');
1✔
29
const mqtt = require('mqtt');
1✔
30
const commonBindings = require('../commonBindings');
1✔
31
const async = require('async');
1✔
32
const iotaUtils = require('../iotaUtils');
1✔
33
const constants = require('../constants');
1✔
34
const context = {
1✔
35
    op: 'IOTAJSON.MQTT.Binding'
36
};
37
let mqttClient;
38
let mqttConn;
39
const config = require('../configService');
1✔
40

41
/**
42
 * Generate the list of global topics to listen to.
43
 */
44
function generateTopics(callback) {
45
    const topics = [];
325✔
46

47
    config.getLogger().debug(context, 'Generating topics');
325✔
48

49
    // With leading slashes
50
    topics.push(constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.MEASURES_SUFIX + '/+');
325✔
51
    topics.push(
325✔
52
        constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
53
            '/' +
54
            constants.MQTT_TOPIC_PROTOCOL +
55
            '/+/+/' +
56
            constants.MEASURES_SUFIX +
57
            '/+'
58
    );
59
    topics.push(constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.MEASURES_SUFIX);
325✔
60
    topics.push(
325✔
61
        constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
62
            '/' +
63
            constants.MQTT_TOPIC_PROTOCOL +
64
            '/+/+/' +
65
            constants.MEASURES_SUFIX
66
    );
67
    topics.push(
325✔
68
        constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
69
            '/+/+/' +
70
            constants.CONFIGURATION_SUFIX +
71
            '/' +
72
            constants.CONFIGURATION_COMMAND_SUFIX
73
    );
74
    topics.push(
325✔
75
        constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
76
            '/' +
77
            constants.MQTT_TOPIC_PROTOCOL +
78
            '/+/+/' +
79
            constants.CONFIGURATION_SUFIX +
80
            '/' +
81
            constants.CONFIGURATION_COMMAND_SUFIX
82
    );
83
    topics.push(constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
325✔
84
    topics.push(
325✔
85
        constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
86
            '/' +
87
            constants.MQTT_TOPIC_PROTOCOL +
88
            '/+/+/' +
89
            constants.CONFIGURATION_COMMAND_UPDATE
90
    );
91

92
    //Without leading slashes
93
    topics.push(constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.MEASURES_SUFIX + '/+');
325✔
94
    topics.push(
325✔
95
        constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
96
            constants.MQTT_TOPIC_PROTOCOL +
97
            '/+/+/' +
98
            constants.MEASURES_SUFIX +
99
            '/+'
100
    );
101
    topics.push(constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.MEASURES_SUFIX);
325✔
102
    topics.push(
325✔
103
        constants.MQTT_SHARE_SUBSCRIPTION_GROUP + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX
104
    );
105
    topics.push(
325✔
106
        constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
107
            '/+/+/' +
108
            constants.CONFIGURATION_SUFIX +
109
            '/' +
110
            constants.CONFIGURATION_COMMAND_SUFIX
111
    );
112
    topics.push(
325✔
113
        constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
114
            constants.MQTT_TOPIC_PROTOCOL +
115
            '/+/+/' +
116
            constants.CONFIGURATION_SUFIX +
117
            '/' +
118
            constants.CONFIGURATION_COMMAND_SUFIX
119
    );
120
    topics.push(constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
325✔
121
    topics.push(
325✔
122
        constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
123
            constants.MQTT_TOPIC_PROTOCOL +
124
            '/+/+/' +
125
            constants.CONFIGURATION_COMMAND_UPDATE
126
    );
127

128
    callback(null, topics);
325✔
129
}
130

131
/**
132
 * Recreate the MQTT subscriptions for all the registered devices.
133
 */
134
function recreateSubscriptions(callback) {
135
    config.getLogger().debug(context, 'Recreating subscriptions for all devices');
206✔
136

137
    function subscribeToTopics(topics, callback) {
138
        config.getLogger().debug(context, 'Subscribing to topics: %j', topics);
206✔
139
        const options = {};
206✔
140
        mqttClient.subscribe(topics, options, function (error) {
206✔
141
            if (error) {
206!
142
                iotAgentLib.alarms.raise(constants.MQTTB_ALARM, error);
×
143
                config.getLogger().error(context, 'GLOBAL-001: Error subscribing to topics: %s', error);
×
144
                callback(error);
×
145
            } else {
146
                iotAgentLib.alarms.release(constants.MQTTB_ALARM);
206✔
147
                config.getLogger().info(context, 'Successfully subscribed to the following topics:\n%j\n', topics);
206✔
148
                if (callback) {
206!
149
                    callback(null);
206✔
150
                }
151
            }
152
        });
153
    }
154

155
    async.waterfall([generateTopics, subscribeToTopics], callback);
206✔
156
}
157

158
/**
159
 * Extract all the information from a Context Broker response and send it to the topic indicated by the APIKey and
160
 * DeviceId.
161
 *
162
 * @param {String} apiKey           API Key for the Device Group
163
 * @param {String} deviceId         ID of the Device.
164
 * @param {Object} results          Context Broker response.
165
 */
166
function sendConfigurationToDevice(apiKey, deviceId, results, callback) {
167
    const configurations = iotaUtils.createConfigurationNotification(results);
4✔
168
    const options = {};
4✔
169
    if (config.getConfig().mqtt.qos) {
4!
170
        options.qos = parseInt(config.getConfig().mqtt.qos) || 0;
×
171
    }
172
    if (config.getConfig().mqtt.retain === true) {
4!
173
        options.retain = config.getConfig().mqtt.retain;
×
174
    }
175
    config.getLogger().debug(context, 'Sending requested configuration to the device:\n %j', configurations);
4✔
176
    const leadingSlash = config.getConfig().mqtt.avoidLeadingSlash ? '' : '/';
4!
177

178
    const commandTopic =
179
        leadingSlash +
4✔
180
        apiKey +
181
        '/' +
182
        deviceId +
183
        '/' +
184
        constants.CONFIGURATION_SUFIX +
185
        '/' +
186
        constants.CONFIGURATION_VALUES_SUFIX;
187
    //prettier-ignore
188
    if (mqttClient.connected) {
4!
189
        mqttClient.publish(
4✔
190
            commandTopic,
191
            JSON.stringify(configurations),
192
            options,
193
            (error) => {
194
                if (error) {
4!
195
                    config
×
196
                        .getLogger()
197
                        .error(
198
                            context,
199
                            'Error %j in Configuration:\n %j sent to the device %s with mqtt options %j',
200
                            error,
201
                            JSON.stringify(configurations),
202
                            commandTopic,
203
                            options
204
                        );
205
                }
206
            });
207
        config.getLogger().info(context, 'Configuration:\n %j was sent to the device: %s', configurations, commandTopic);
4✔
208
    } else {
209
        config.getLogger().error(context, 'Configuration:\n %j was not set to the device: %s due to not connected', configurations, commandTopic);
×
210
    }
211
    callback();
4✔
212
}
213

214
/**
215
 * Unsubscribe the MQTT Client from all the topics.
216
 */
217
function unsubscribeAll(callback) {
218
    function unsubscribeFromTopics(topics, callback) {
219
        mqttClient.unsubscribe(topics, null);
119✔
220

221
        callback();
119✔
222
    }
223

224
    async.waterfall([generateTopics, unsubscribeFromTopics], callback);
119✔
225
}
226

227
/**
228
 * Start the binding.
229
 */
230
function start(callback) {
231
    const mqttConfig = config.getConfig().mqtt;
119✔
232
    if (!mqttConfig) {
119!
233
        return config.getLogger().error(context, 'Error MQTT is not configured');
×
234
    }
235
    if (mqttConfig.disabled) {
119!
236
        return config.getLogger().warn(context, 'MQTT is disabled');
×
237
    }
238
    const rejectUnauthorized =
239
        typeof mqttConfig.rejectUnauthorized === 'boolean' ? mqttConfig.rejectUnauthorized : true;
119!
240
    let rndSuffix = '_' + Math.random().toString(16).substr(2, 8);
119✔
241
    const options = {
119✔
242
        protocol: mqttConfig.protocol ? mqttConfig.protocol : 'mqtt',
119!
243
        host: mqttConfig.host ? mqttConfig.host : 'localhost',
119!
244
        port: mqttConfig.port ? mqttConfig.port : 1883,
119!
245
        key: mqttConfig.key ? fs.readFileSync(mqttConfig.key, 'utf8') : null,
119!
246
        ca: mqttConfig.ca ? fs.readFileSync(mqttConfig.ca, 'utf8') : null,
119!
247
        cert: mqttConfig.cert ? fs.readFileSync(mqttConfig.cert, 'utf8') : null,
119!
248
        rejectUnauthorized,
249
        username: mqttConfig.username ? mqttConfig.username : null,
119!
250
        password: mqttConfig.password ? mqttConfig.password : null,
119!
251
        clean: typeof mqttConfig.clean === 'boolean' ? mqttConfig.clean : true,
119!
252
        clientId: mqttConfig.clientId ? mqttConfig.clientId + rndSuffix : 'iotajson' + rndSuffix,
119!
253
        keepalive: mqttConfig.keepalive ? parseInt(mqttConfig.keepalive) : 60,
119!
254
        connectTimeout: 60 * 60 * 1000
255
    };
256

257
    const retries = mqttConfig.retries ? mqttConfig.retries : constants.MQTT_DEFAULT_RETRIES;
119!
258
    const retryTime = mqttConfig.retryTime ? mqttConfig.retryTime : constants.MQTT_DEFAULT_RETRY_TIME;
119!
259
    let isConnecting = false;
119✔
260
    let numRetried = 1; // retries will be disabled when MQTT_DEFAULT_RETRIES=0
119✔
261
    config
119✔
262
        .getLogger()
263
        .info(context, 'Starting MQTT binding with options %j retries %s retryTIme %s', options, retries, retryTime);
264

265
    function createConnection(callback) {
266
        config.getLogger().debug(context, 'creating connection');
207✔
267
        if (isConnecting) {
207!
268
            return;
×
269
        }
270
        isConnecting = true;
207✔
271
        // Ensure clientId is unique when reconnect to avoid loop closing old connection which the same name
272
        rndSuffix = '_' + Math.random().toString(16).substr(2, 8);
207✔
273
        options.clientId = mqttConfig.clientId ? mqttConfig.clientId + rndSuffix : 'iotajson' + rndSuffix;
207!
274
        mqttClient = mqtt.connect(options.protocol + '://' + mqttConfig.host + ':' + mqttConfig.port, options);
207✔
275
        isConnecting = false;
207✔
276
        if (!mqttClient) {
207!
277
            config.getLogger().error(context, 'error mqttClient not created');
×
278
            if (numRetried <= retries) {
×
279
                numRetried++;
×
280
                return setTimeout(createConnection, retryTime * 1000, callback);
×
281
            }
282
        }
283
        mqttClient.on('error', function (e) {
207✔
284
            /*jshint quotmark: double */
285
            config.getLogger().fatal("GLOBAL-002: Couldn't connect with MQTT broker: %j", e);
×
286
            /*jshint quotmark: single */
287
            mqttClient.end();
×
288
        });
289
        mqttClient.on('message', commonBindings.mqttMessageHandler);
207✔
290
        mqttClient.on('connect', function (ack) {
207✔
291
            config.getLogger().info(context, 'MQTT Client connected');
206✔
292
            recreateSubscriptions();
206✔
293
            numRetried = 1;
206✔
294
        });
295
        mqttClient.on('reconnect', function () {
207✔
296
            config.getLogger().debug(context, 'MQTT Client reconnect');
×
297
        });
298
        mqttClient.on('offline', function () {
207✔
299
            config.getLogger().debug(context, 'MQTT Client offline');
×
300
        });
301
        mqttClient.on('close', function () {
207✔
302
            config.getLogger().info(context, 'MQTT Client closed');
119✔
303
            // If mqttConn is null, the connection has been closed on purpose
304
            if (mqttConn) {
119✔
305
                config.getLogger().debug(context, 'MQTT Client closed connected? %s', mqttClient.connected);
118✔
306
                if (!mqttClient.connected && numRetried <= retries) {
118!
307
                    config.getLogger().warn(context, 'reconnecting #%s...', numRetried);
118✔
308
                    numRetried++;
118✔
309
                    return setTimeout(createConnection, retryTime * 1000);
118✔
310
                }
311
            } else {
312
                // Do nothing
313
            }
314
        });
315
        config.getLogger().info(context, 'connected');
207✔
316
        mqttConn = mqttClient;
207✔
317
        if (callback) {
207✔
318
            callback();
119✔
319
        }
320
    } // function createConnection
321

322
    async.waterfall([createConnection], function (error) {
119✔
323
        if (error) {
119!
324
            config.getLogger().info('MQTT error %j', error);
×
325
        }
326
        callback();
119✔
327
    });
328
}
329

330
/**
331
 * Device provisioning handler.
332
 *
333
 * @param {Object} device           Device object containing all the information about the provisioned device.
334
 */
335
function deviceProvisioningHandler(device, callback) {
336
    callback(null, device);
113✔
337
}
338

339
/**
340
 * Device updating handler.
341
 *
342
 * @param {Object} device           Device object containing all the information about the provisioned device.
343
 */
344
function deviceUpdatingHandler(device, callback) {
345
    callback(null, device);
2✔
346
}
347

348
/**
349
 * Stop the binding, releasing its resources.
350
 */
351
function stop(callback) {
352
    config.getLogger().info('Stopping MQTT Binding');
119✔
353

354
    async.series([unsubscribeAll, mqttClient.end.bind(mqttClient, true)], function () {
119✔
355
        config.getLogger().info('MQTT Binding Stopped');
119✔
356
        if (mqttConn) {
119!
357
            mqttConn = null;
119✔
358
        }
359
        callback();
119✔
360
    });
361
}
362

363
/**
364
 * Execute a command for the device represented by the device object and the given APIKey, sending the serialized
365
 * JSON payload (already containing the command information).
366
 *
367
 * @param {String} apiKey                   APIKey of the device that will be receiving the command.
368
 * @param {Object} device                   Data object for the device receiving the command.
369
 * @param {String} serializedPayload        String payload in JSON format for the command.
370
 */
371
function executeCommand(apiKey, device, cmdName, serializedPayload, contentType, callback) {
372
    const options = {};
10✔
373
    // retrieve command mqtt options from device
374
    var commands = Object.assign({}, ...device.commands.map((c) => ({ [c.name]: c })));
40✔
375

376
    options.qos =
10✔
377
        commands[cmdName].mqtt && commands[cmdName].mqtt.qos
20!
378
            ? parseInt(commands[cmdName].mqtt.qos)
379
            : config.getConfig().mqtt.qos
10!
380
            ? parseInt(config.getConfig().mqtt.qos)
381
            : 0;
382
    options.retain =
10✔
383
        commands[cmdName].mqtt && commands[cmdName].mqtt.retain
20!
384
            ? commands[cmdName].mqtt.retain
385
            : config.getConfig().mqtt.retain
10!
386
            ? config.getConfig().mqtt.retain
387
            : false;
388

389
    const commandTopic = '/' + apiKey + '/' + device.id + '/cmd';
10✔
390
    config
10✔
391
        .getLogger()
392
        .debug(
393
            context,
394
            'Sending command execution to [%s] with payload [%s] and with mqtt options [%j]',
395
            commandTopic,
396
            serializedPayload,
397
            options
398
        );
399
    if (mqttClient.connected) {
10!
400
        mqttClient.publish(commandTopic, serializedPayload, options, (error) => {
10✔
401
            if (error) {
10!
402
                config
×
403
                    .getLogger()
404
                    .error(
405
                        context,
406
                        'Error %j in Cmd:\n %j sent to the device %s with mqtt options %j',
407
                        error,
408
                        serializedPayload,
409
                        commandTopic,
410
                        options
411
                    );
412
            }
413
        });
414
        config
10✔
415
            .getLogger()
416
            .info(
417
                context,
418
                'Cmd:\n %j was sent to the device %s with mqtt options %j',
419
                serializedPayload,
420
                commandTopic,
421
                options
422
            );
423
    } else {
424
        config.getLogger().error(context, 'Cmd: not set due to not connected');
×
425
    }
426
    callback();
10✔
427
}
428

429
exports.start = start;
1✔
430
exports.stop = stop;
1✔
431
exports.sendConfigurationToDevice = sendConfigurationToDevice;
1✔
432
exports.deviceProvisioningHandler = deviceProvisioningHandler;
1✔
433
exports.deviceUpdatingHandler = deviceUpdatingHandler;
1✔
434
exports.executeCommand = executeCommand;
1✔
435
exports.protocol = 'MQTT';
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc