• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

telefonicaid / iotagent-json / 13789149328

11 Mar 2025 01:24PM UTC coverage: 77.155% (-3.0%) from 80.121%
13789149328

Pull #866

github

web-flow
Merge 94c66260f into 11a46f9f3
Pull Request #866: [WIP] add throtting (by topic) using a topic based queue (MQTT, HTTP)

509 of 755 branches covered (67.42%)

Branch coverage included in aggregate %.

13 of 55 new or added lines in 3 files covered. (23.64%)

13 existing lines in 2 files now uncovered.

1102 of 1333 relevant lines covered (82.67%)

134.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.43
/lib/commonBindings.js
1
/*
2
 * Copyright 2016 Telefonica Investigación y Desarrollo, S.A.U
3
 *
4
 * This file is part of iotagent-ul
5
 *
6
 * iotagent-ul is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the License,
9
 * or (at your option) any later version.
10
 *
11
 * iotagent-ul is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14
 * See the GNU Affero General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public
17
 * License along with iotagent-ul.
18
 * If not, seehttp://www.gnu.org/licenses/.
19
 *
20
 * For those usages not covered by the GNU Affero General Public License
21
 * please contact with::[iot_support@tid.es]
22
 *
23
 * Modified by: Daniel Calvo - ATOS Research & Innovation
24
 */
25

26
/* eslint-disable no-prototype-builtins */
27

28
const iotAgentLib = require('iotagent-node-lib');
1✔
29
const regenerateTransid = iotAgentLib.regenerateTransid;
1✔
30
const intoTrans = iotAgentLib.intoTrans;
1✔
31
const finishSouthBoundTransaction = iotAgentLib.finishSouthBoundTransaction;
1✔
32
const fillService = iotAgentLib.fillService;
1✔
33
const _ = require('underscore');
1✔
34
const commandHandler = require('./commandHandler');
1✔
35
const transportSelector = require('./transportSelector');
1✔
36
const async = require('async');
1✔
37
const iotaUtils = require('./iotaUtils');
1✔
38
const constants = require('./constants');
1✔
39
let context = {
1✔
40
    op: 'IoTAgentJSON.commonBinding'
41
};
42
const config = require('./configService');
1✔
43

44
let lastProcessedTime = {};
1✔
45
const queues = {};
1✔
46

47
/**
48
 * Parse a message received from a Topic.
49
 *
50
 * @param {Buffer} message          Message to be parsed
51
 * @return {Object}                 Parsed message or null if an error has occurred.
52
 */
53
function parseMessage(message) {
54
    let parsedMessage;
55
    let messageArray;
56
    context = fillService(context, { service: 'n/a', subservice: 'n/a' });
56✔
57
    const stringMessage = message.toString();
56✔
58
    try {
56✔
59
        parsedMessage = JSON.parse(stringMessage);
56✔
60
    } catch (e) {
61
        parsedMessage = message.toString('hex');
3✔
62
    }
63
    config.getLogger().debug(context, 'stringMessage: %s parsedMessage: %s', stringMessage, parsedMessage);
56✔
64
    messageArray = [];
56✔
65
    if (Array.isArray(parsedMessage)) {
56✔
66
        if (parsedMessage.length === 1) {
16✔
67
            // Allow single array measures of 1 element not handled like single measures
68
            messageArray.push(parsedMessage);
2✔
69
        } else {
70
            messageArray = parsedMessage;
14✔
71
        }
72
    } else {
73
        messageArray.push(parsedMessage);
40✔
74
    }
75

76
    config.getLogger().debug(context, 'parserMessage array: %s', messageArray);
56✔
77
    return messageArray;
56✔
78
}
79

80
/**
81
 * Find the attribute given by its name between all the active attributes of the given device, returning its type, or
82
 * null otherwise.
83
 *
84
 * @param {String}      attribute   Name of the attribute to find.
85
 * @param {Object}      device      Device object containing all the information about a device.
86
 * @param {measureType} type        Type of measure attribute according with measure when available (ngsiv2 and ngsild measures)
87
 * @return {String}                 String identifier of the attribute type.
88
 */
89
function guessType(attribute, device, measureType) {
90
    if (device.active) {
762!
91
        for (let i = 0; i < device.active.length; i++) {
762✔
92
            if (device.active[i].name === attribute) {
3,549✔
93
                return device.active[i].type;
155✔
94
            }
95
        }
96
    }
97

98
    if (attribute === constants.TIMESTAMP_ATTRIBUTE) {
607✔
99
        return constants.TIMESTAMP_TYPE_NGSI2;
34✔
100
    }
101
    if (measureType) {
573✔
102
        return measureType;
225✔
103
    } else {
104
        return constants.DEFAULT_ATTRIBUTE_TYPE;
348✔
105
    }
106
}
107

108
function extractAttributes(device, current, payloadType) {
109
    let values = [];
221✔
110

111
    const ctxt = fillService(context, device);
221✔
112
    config.getLogger().debug(ctxt, 'extractAttributes current %j payloadType %j', current, payloadType);
221✔
113

114
    if (payloadType && [constants.PAYLOAD_NGSIv2, constants.PAYLOAD_NGSILD].includes(payloadType.toLowerCase())) {
221✔
115
        let arrayEntities = [];
26✔
116
        if (current.hasOwnProperty('actionType') && current.hasOwnProperty('entities')) {
26✔
117
            arrayEntities = current.entities;
6✔
118
        } else {
119
            arrayEntities = [current];
20✔
120
        }
121
        for (const entity of arrayEntities) {
26✔
122
            const valuesEntity = [];
29✔
123
            for (const k in entity) {
29✔
124
                if (entity.hasOwnProperty(k)) {
294!
125
                    if (['id', 'type'].includes(k)) {
294✔
126
                        // Include ngsi id and type as measures by inserting here as is
127
                        // and later in iota-node-lib sendUpdateValueNgsi2 rename as measure_X
128
                        valuesEntity.push({
58✔
129
                            name: k,
130
                            type: guessType(k, device, null),
131
                            value: entity[k]
132
                        });
133
                    } else {
134
                        if (payloadType.toLowerCase() === constants.PAYLOAD_NGSIv2) {
236✔
135
                            valuesEntity.push({
182✔
136
                                name: k,
137
                                type: guessType(k, device, entity[k].type),
138
                                value: entity[k].value,
139
                                metadata: entity[k].metadata ? entity[k].metadata : undefined
182✔
140
                            });
141
                        } else if (payloadType.toLowerCase() === constants.PAYLOAD_NGSILD) {
54!
142
                            const ent = {
54✔
143
                                name: k
144
                            };
145
                            if (k.toLowerCase() === '@context') {
54✔
146
                                ent.type = '@context';
9✔
147
                                ent.value = entity[k];
9✔
148
                            } else {
149
                                if (entity[k].type) {
45!
150
                                    ent.type = guessType(k, device, entity[k].type);
45✔
151
                                    if (['property', 'geoproperty'].includes(entity[k].type.toLowerCase())) {
45✔
152
                                        ent.value = entity[k].value;
36✔
153
                                    } else if (entity[k].type.toLowerCase() === 'relationship') {
9!
154
                                        ent.value = entity[k].object;
9✔
155
                                    }
156
                                }
157
                                // Add other stuff as metadata
158
                                for (const key in entity[k]) {
45✔
159
                                    if (!['type', 'value', 'object'].includes(key.toLowerCase())) {
99✔
160
                                        if (!ent.metadata) {
9!
161
                                            ent.metadata = {};
9✔
162
                                        }
163
                                        ent.metadata[key] = { value: entity[k][key] };
9✔
164
                                    }
165
                                }
166
                            }
167
                            valuesEntity.push(ent);
54✔
168
                        }
169
                    }
170
                }
171
            }
172
            if (arrayEntities.length > 1) {
29✔
173
                values.push(valuesEntity); // like a multimeasure
6✔
174
            } else {
175
                values = valuesEntity;
23✔
176
            }
177
        }
178
    } else {
179
        for (const k in current) {
195✔
180
            if (current.hasOwnProperty(k)) {
462!
181
                values.push({
462✔
182
                    name: k,
183
                    type: guessType(k, device, null),
184
                    value: current[k]
185
                });
186
            }
187
        }
188
    }
189
    return values;
221✔
190
}
191

192
function sendConfigurationToDevice(device, apiKey, group, deviceId, results, callback) {
193
    iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', apiKey, function (
3!
194
        error,
195
        foundGroup
196
    ) {
197
        if (!error) {
3!
198
            group = foundGroup;
×
199
        }
200
        transportSelector.applyFunctionFromBinding(
3✔
201
            [apiKey, group, deviceId, results],
202
            'sendConfigurationToDevice',
203
            device.transport || group.transport || config.getConfig().defaultTransport,
3!
204
            callback
205
        );
206
    });
207
}
208

209
/**
210
 * Deals with configuration requests coming from the device. Whenever a new configuration requests arrives with a list
211
 * of attributes to retrieve, this handler asks the Context Broker for the values of those attributes, and publish a
212
 * new message in the "/1234/MQTT_2/configuration/values" topic
213
 *
214
 * @param {String} apiKey           API Key corresponding to the Devices configuration.
215
 * @param {String} deviceId         Id of the device to be updated.
216
 * @param {Object} device           Device object containing all the information about a device.
217
 * @param {Object} objMessage          Array of JSON object received.
218
 */
219
function manageConfigurationRequest(apiKey, deviceId, device, objMessage) {
220
    const ctxt = fillService(context, device);
5✔
221
    for (let i = 0; i < objMessage.length; i++) {
5✔
222
        iotaUtils.manageConfiguration(
5✔
223
            apiKey,
224
            deviceId,
225
            device,
226
            objMessage[i],
227
            async.apply(sendConfigurationToDevice, device),
228
            function (error) {
229
                if (error) {
5!
230
                    iotAgentLib.alarms.raise(constants.MQTTB_ALARM, error);
×
231
                } else {
232
                    iotAgentLib.alarms.release(constants.MQTTB_ALARM);
5✔
233
                    config
5✔
234
                        .getLogger()
235
                        .debug(ctxt, 'Configuration request finished for APIKey %s and Device %s', apiKey, deviceId);
236
                }
237
                finishSouthBoundTransaction(null);
5✔
238
            }
239
        );
240
    }
241
}
242

243
/**
244
 * Adds a single measure to the context broker. The message for single measures contains the direct value to
245
 * be inserted in the attribute, given by its name.
246
 *
247
 * @param {String} apiKey           API Key corresponding to the Devices configuration.
248
 * @param {String} deviceId         Id of the device to be updated.
249
 * @param {String} attribute        Name of the attribute to update.
250
 * @param {Object} device           Device object containing all the information about a device.
251
 * @param {Object} parsedMessage    ParsedMessage (JSON or string) message coming from the client.
252
 */
253
function singleMeasure(apiKey, deviceId, attribute, device, parsedMessage) {
254
    context = fillService(context, device);
10✔
255
    config.getLogger().debug(context, 'Processing single measure for device %s with apiKey %s', deviceId, apiKey);
10✔
256

257
    const values = [
10✔
258
        {
259
            name: attribute,
260
            type: guessType(attribute, device, null),
261
            value: parsedMessage.length === 1 ? parsedMessage[0] : parsedMessage
10✔
262
        }
263
    ];
264
    config.getLogger().debug(context, 'values updates %s', JSON.stringify(values));
10✔
265
    iotAgentLib.update(device.name, device.type, '', values, device, function (error) {
10✔
266
        if (error) {
10!
267
            config.getLogger().error(
×
268
                context,
269
                /*jshint quotmark: double */
270
                "MEASURES-002: Couldn't send the updated values to the Context Broker due to an error: %j",
271
                /*jshint quotmark: single */
272
                error
273
            );
274
        } else {
275
            config
10✔
276
                .getLogger()
277
                .debug(context, 'Single measure for device %s with apiKey %s successfully updated', deviceId, apiKey);
278
        }
279
        finishSouthBoundTransaction(null);
10✔
280
    });
281
}
282

283
/**
284
 * Adds multiple measures to the Context Broker. Multiple measures come in the form of single-level JSON objects,
285
 * whose keys are the attribute names and whose values are the attribute values.
286
 *
287
 * @param {String} apiKey           API Key corresponding to the Devices configuration.
288
 * @param {String} deviceId         Id of the device to be updated.
289
 * @param {Object} device           Device object containing all the information about a device.
290
 * @param {Object} messageObj       Array of JSON object sent using.
291
 */
292
function multipleMeasures(apiKey, deviceId, device, messageObj) {
293
    let measure;
294
    let values;
295
    const ctxt = fillService(context, device);
36✔
296
    config.getLogger().debug(context, 'Processing multiple measures for device %s with apiKey %s', deviceId, apiKey);
36✔
297

298
    let attributesArray = [];
36✔
299
    for (let j = 0; j < messageObj.length; j++) {
36✔
300
        measure = messageObj[j];
48✔
301
        values = extractAttributes(device, measure, device.payloadType);
48✔
302
        if (values && values[0] && values[0][0]) {
48✔
303
            // Check multimeasure from a ngsiv2/ngsild entities array
304
            attributesArray = attributesArray.concat(values);
1✔
305
        } else {
306
            attributesArray.push(values);
47✔
307
        }
308
    }
309
    config
36✔
310
        .getLogger()
311
        .debug(
312
            context,
313
            'Processing multiple measures for device %s with apiKey %s values %j',
314
            deviceId,
315
            apiKey,
316
            attributesArray
317
        );
318
    iotAgentLib.update(device.name, device.type, '', attributesArray, device, function (error) {
36✔
319
        if (error) {
36✔
320
            config.getLogger().error(
1✔
321
                ctxt,
322
                /*jshint quotmark: double */
323
                "MEASURES-002: Couldn't send the updated values to the Context Broker due to an error: %j",
324
                /*jshint quotmark: single */
325
                error
326
            );
327
        } else {
328
            config
35✔
329
                .getLogger()
330
                .info(ctxt, 'Multiple measures for device %s with apiKey %s successfully updated', deviceId, apiKey);
331
        }
332
        finishSouthBoundTransaction(null);
36✔
333
    });
334
}
335

336
/**
337
 * Handles an incoming message, extracting the API Key, device Id and attribute to update (in the case of single
338
 * measures) from the topic.
339
 *
340
 * @param {String} topic        Topic of the form: '/<APIKey>/deviceId/attrs[/<attributeName>]'.
341
 * @param {Object} message      message body (Object or Buffer, depending on the value).
342
 */
343
function messageHandler(topic, message) {
344
    if (topic[0] !== '/') {
56✔
345
        topic = '/' + topic;
17✔
346
    }
347
    const topicInformation = topic.split('/');
56✔
348
    if (topicInformation[1].toLowerCase() === 'json') {
56✔
349
        topicInformation.splice(1, 1);
24✔
350
    }
351
    const apiKey = topicInformation[1];
56✔
352
    const deviceId = topicInformation[2];
56✔
353
    const parsedMessage = parseMessage(message);
56✔
354

355
    function processMessageForDevice(device, apiKey, topicInformation) {
356
        if (topicInformation[3] === 'configuration' && topicInformation[4] === 'commands' && parsedMessage) {
56✔
357
            manageConfigurationRequest(apiKey, deviceId, device, parsedMessage);
5✔
358
        } else if (topicInformation[4]) {
51✔
359
            singleMeasure(apiKey, deviceId, topicInformation[4], device, parsedMessage);
10✔
360
        } else if (topicInformation[3] === constants.CONFIGURATION_COMMAND_UPDATE) {
41✔
361
            for (let i = 0; i < parsedMessage.length; i++) {
3✔
362
                commandHandler.updateCommand(apiKey, deviceId, device, parsedMessage[i]);
3✔
363
            }
364
        } else if (parsedMessage && Array.isArray(parsedMessage) && parsedMessage.every((x) => typeof x === 'object')) {
50✔
365
            // it must be an array of object
366
            multipleMeasures(apiKey, deviceId, device, parsedMessage);
36✔
367
        } else {
368
            context = fillService(context, device);
2✔
369
            config.getLogger().error(
2✔
370
                context,
371
                /*jshint quotmark: double */
372
                "Couldn't process message %s for device %j due to format issues.",
373
                /*jshint quotmark: single */
374
                message,
375
                device
376
            );
377
        }
378
    }
379

380
    function processDeviceMeasure(error, device) {
381
        if (error) {
56!
382
            context = fillService(context, { service: 'n/a', subservice: 'n/a' });
×
383
            config.getLogger().warn(context, 'MEASURES-004: Device not found for topic %s', topic);
×
384
        } else {
385
            const localContext = _.clone(context);
56✔
386

387
            localContext.service = device.service;
56✔
388
            localContext.subservice = device.subservice;
56✔
389

390
            intoTrans(localContext, processMessageForDevice)(device, apiKey, topicInformation);
56✔
391
        }
392
    }
393

394
    iotAgentLib.alarms.release(constants.MQTTB_ALARM);
56✔
395
    iotaUtils.retrieveDevice(deviceId, apiKey, processDeviceMeasure);
56✔
396
}
397

398
function processQueue(topic) {
NEW
399
    if (queues[topic] && queues[topic].length > 0) {
×
NEW
400
        const now = Date.now();
×
NEW
401
        if (now - lastProcessedTime[topic] >= config.getConfig().mqtt.throttleInterval) {
×
NEW
402
            const message = queues[topic].shift();
×
NEW
403
            messageHandler(topic, message);
×
NEW
404
            lastProcessedTime[topic] = now;
×
405
        }
406
    }
NEW
407
    if (queues[topic].length > 0) {
×
NEW
408
        setTimeout(() => processQueue(topic), config.getConfig().mqtt.throttleInterval / 4);
×
409
    }
410
}
411

412
/**
413
 * Handles an incoming AMQP message, extracting the API Key, device Id and attribute to update (in the case of single
414
 * measures) from the AMQP topic.
415
 *
416
 * @param {String} topic        Topic of the form: '/<APIKey>/deviceId/attributes[/<attributeName>]'.
417
 * @param {Object} message      AMQP message body (Object or Buffer, depending on the value).
418
 */
419
function amqpMessageHandler(topic, message) {
420
    regenerateTransid(topic);
8✔
421
    messageHandler(topic, message);
8✔
422
}
423

424
/**
425
 * Handles an incoming MQTT message, extracting the API Key, device Id and attribute to update (in the case of single
426
 * measures) from the MQTT topic.
427
 *
428
 * @param {String} topic        Topic of the form: '/<APIKey>/deviceId/attributes[/<attributeName>]'.
429
 * @param {Object} message      MQTT message body (Object or Buffer, depending on the value).
430
 */
431
function mqttMessageHandler(topic, message) {
432
    regenerateTransid(topic);
48✔
433
    config.getLogger().debug(context, 'message topic: %s', topic);
48✔
434
    if (!config.getConfig().mqtt.throttleInterval || config.getConfig().mqtt.throttleInterval <= 0) {
48!
435
        messageHandler(topic, message);
48✔
436
    } else {
NEW
437
        if (!queues[topic]) {
×
NEW
438
            queues[topic] = [];
×
NEW
439
            lastProcessedTime[topic] = 0;
×
440
        }
NEW
UNCOV
441
        if (config.getConfig().mqtt.maxQueueSize && queues[topic].length >= config.getConfig().mqtt.maxQueueSize) {
×
NEW
UNCOV
442
            config
×
443
                .getLogger()
444
                .info(context, 'discarding measure %s for topic: %s due queue for topic is full', message, topic);
NEW
UNCOV
445
            return;
×
446
        } else {
NEW
UNCOV
447
            queues[topic].push(message);
×
448
        }
NEW
UNCOV
449
        if (queues[topic].length === 1) {
×
NEW
UNCOV
450
            setTimeout(() => processQueue(topic), config.getConfig().mqtt.throttleInterval / 4);
×
451
        }
452
    }
453
}
454

455
exports.amqpMessageHandler = amqpMessageHandler;
1✔
456
exports.mqttMessageHandler = mqttMessageHandler;
1✔
457
exports.messageHandler = messageHandler;
1✔
458
exports.extractAttributes = extractAttributes;
1✔
459
exports.guessType = guessType;
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc