• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

telefonicaid / iotagent-json / 13830702860

13 Mar 2025 08:58AM UTC coverage: 76.393% (-3.8%) from 80.221%
13830702860

Pull #866

github

web-flow
Merge b939e8fa4 into e53d58c86
Pull Request #866: add throtting (by topic) using a topic based queue (MQTT, HTTP, AMQP)

513 of 771 branches covered (66.54%)

Branch coverage included in aggregate %.

17 of 70 new or added lines in 3 files covered. (24.29%)

1 existing line in 1 file now uncovered.

1105 of 1347 relevant lines covered (82.03%)

134.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.72
/lib/bindings/HTTPBinding.js
1
/*
2
 * Copyright 2016 Telefonica Investigación y Desarrollo, S.A.U
3
 *
4
 * This file is part of iotagent-json
5
 *
6
 * iotagent-json is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the License,
9
 * or (at your option) any later version.
10
 *
11
 * iotagent-json is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14
 * See the GNU Affero General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public
17
 * License along with iotagent-json.
18
 * If not, see http://www.gnu.org/licenses/.
19
 *
20
 * For those usages not covered by the GNU Affero General Public License
21
 * please contact with::[contacto@tid.es]
22
 */
23

24
/* eslint-disable no-unused-vars */
25

26
const fs = require('fs');
1✔
27
const iotAgentLib = require('iotagent-node-lib');
1✔
28
const regenerateTransid = iotAgentLib.regenerateTransid;
1✔
29
const finishSouthBoundTransaction = iotAgentLib.finishSouthBoundTransaction;
1✔
30
const fillService = iotAgentLib.fillService;
1✔
31
const intoTrans = iotAgentLib.intoTrans;
1✔
32
const _ = require('underscore');
1✔
33
const commandHandler = require('../commandHandler');
1✔
34
const async = require('async');
1✔
35
const apply = async.apply;
1✔
36
const request = iotAgentLib.request;
1✔
37
const errors = require('../errors');
1✔
38
const express = require('express');
1✔
39
const iotaUtils = require('../iotaUtils');
1✔
40
const http = require('http');
1✔
41
const https = require('https');
1✔
42
const commonBindings = require('../commonBindings');
1✔
43
const bodyParser = require('body-parser');
1✔
44
require('body-parser-xml')(bodyParser);
1✔
45
const constants = require('../constants');
1✔
46
// Logging context passed as first argument to every logger call in this binding.
// It is reassigned by the handlers that call fillService().
let context = {
    op: 'IOTAJSON.HTTP.Binding'
};

const config = require('../configService');

// Holds { server, app, router } once start() has run.
let httpBindingServer;
const transport = 'HTTP';

const { promisify } = require('util');

// Promisified body-parser middlewares, one per supported content type.
// They are invoked as parser(req, res) and settle when parsing finishes.
// `strict: false` makes the JSON parser accept anything JSON.parse accepts.
const json = promisify(bodyParser.json({ strict: false, limit: config.getConfig().iota.expressLimit }));
const text = promisify(bodyParser.text({ limit: config.getConfig().iota.expressLimit }));
const raw = promisify(bodyParser.raw({ limit: config.getConfig().iota.expressLimit }));

const xml2js = require('xml2js');
const xmlStripPrefix = xml2js.processors.stripPrefix;
const xml = promisify(
    bodyParser.xml({
        xmlParseOptions: {
            // XML namespaces might change from one request to the next.
            // Stripping them from tag and attribute names lets later JEXL
            // transformations refer to tags by a stable name.
            // See https://github.com/Leonidas-from-XIV/node-xml2js/issues/87
            tagNameProcessors: [xmlStripPrefix],
            attrNameProcessors: [xmlStripPrefix]
        }
    })
);

// Throttling state used by handleIncomingMeasureProxy, keyed by "<deviceId>/<apiKey>":
// timestamp (ms) of the last processed measure and the pending request queue.
let lastProcessedTime = {};
const queues = {};
1✔
75

76
/**
 * Creates the generic body-parsing middleware.
 *
 * The returned middleware selects one of the promisified parsers (`text`, `raw`, `xml` or `json`)
 * from the request content type, runs it, and then calls `next()` on success or `next(error)`
 * when the parser rejects.
 *
 * @return {Function}               Express middleware with signature (req, res, next).
 */
function parserBody() {
    return function (req, res, next) {
        let parse;

        if (req.is('text/plain')) {
            parse = text;
        } else if (req.is('application/octet-stream')) {
            parse = raw;
        } else if (req.is('application/soap+xml')) {
            parse = xml;
        } else {
            // Every other content type is handed to the JSON parser.
            parse = json;
        }

        parse(req, res).then(() => next(), next);
    };
}
91

92
/**
 * Creates a middleware that copies the identification parameters of the request into `req`
 * (`req.apiKey` from query `k`, `req.deviceId` from query `i`, `req.attr` from the `attrValue`
 * route parameter) and validates them.
 *
 * Missing parameters are reported with a MandatoryParamsNotFound error, which takes precedence
 * over the UnsupportedType error raised for POST requests with an unsupported content type.
 *
 * @param {Boolean} queryPayload    When true, the `d` query parameter is mandatory unless `getCmd` is '1'.
 * @return {Function}               Express middleware with signature (req, res, next).
 */
function checkMandatoryParams(queryPayload) {
    // Content types accepted for POST requests, checked in this order.
    const supportedTypes = ['json', 'text/plain', 'application/octet-stream', 'application/soap+xml'];

    return function (req, res, next) {
        const missing = [];

        req.apiKey = req.query.k;
        req.deviceId = req.query.i;
        req.attr = req.params ? req.params.attrValue : undefined;

        if (!req.apiKey) {
            missing.push('API Key');
        }
        if (!req.deviceId) {
            missing.push('Device Id');
        }
        // The payload is not required when the device is only polling for commands.
        if (queryPayload && !req.query.d && req.query.getCmd !== '1') {
            missing.push('Payload');
        }

        let typeError;
        if (req.method === 'POST' && !supportedTypes.some((type) => req.is(type))) {
            typeError = new errors.UnsupportedType(
                'application/json, text/plain, application/octet-stream, application/soap+xml'
            );
        }

        if (missing.length !== 0) {
            next(new errors.MandatoryParamsNotFound(missing));
            return;
        }
        next(typeError);
    };
}
132

133
/**
 * Middleware that stores the measure carried by the request in `req.jsonPayload`.
 *
 * A truthy `req.body` is used as is; otherwise the `d` query parameter is parsed as JSON
 * (leaving `req.jsonPayload` undefined when it is absent). A JSON syntax error is passed to `next`.
 */
function parseData(req, res, next) {
    let measure;

    if (req.body) {
        config.getLogger().debug(context, 'Using body %s', req.body);
        measure = req.body;
        regenerateTransid(measure);
    } else {
        const queryPayload = req.query.d;
        regenerateTransid(queryPayload);
        config.getLogger().debug(context, 'Parsing payload %s', queryPayload);

        if (queryPayload) {
            try {
                measure = JSON.parse(queryPayload);
            } catch (parseError) {
                next(parseError);
                return;
            }
        }
    }

    req.jsonPayload = measure;

    // Value used only for the log line below; the payload stored in req is left untouched.
    let loggable = measure;
    if (req.body !== undefined) {
        try {
            loggable = measure.toString('hex');
        } catch (e) {
            // no error should be reported
        }
    }
    config.getLogger().debug(context, 'Parsed data: %j', loggable);
    next();
}
172

173
/**
 * Middleware that stores the measures carried by the request in `req.jsonPayload`, always as an array.
 *
 * A truthy `req.body` is used directly when it is an array and wrapped in a one-element array
 * otherwise. Without a body, the `d` query parameter is parsed as JSON and becomes the only
 * element (the array stays empty when `d` is absent). A JSON syntax error is passed to `next`.
 */
function parseDataMultipleMeasure(req, res, next) {
    let measures = [];

    if (req.body) {
        config.getLogger().debug(context, 'Using body %s', req.body);
        measures = Array.isArray(req.body) ? req.body : [req.body];
        regenerateTransid(measures);
    } else {
        const queryPayload = req.query.d;
        regenerateTransid(queryPayload);
        config.getLogger().debug(context, 'Parsing payload %s', queryPayload);

        if (queryPayload) {
            try {
                measures.push(JSON.parse(queryPayload));
            } catch (parseError) {
                next(parseError);
                return;
            }
        }
    }

    req.jsonPayload = measures;
    config.getLogger().debug(context, 'Parsed data array: %j', measures);
    next();
}
208

209
/**
 * Sends a command to a device with an HTTP POST request.
 *
 * The target URL is the device endpoint or, failing that, the group endpoint. When a URL exists it
 * is evaluated as an expression against a context built from the device; a truthy result
 * replaces the URL, and evaluation errors are ignored.
 *
 * @param {String} apiKey             API key of the device; when truthy the command is updated with the response body.
 * @param {Object} group              Group of the device (may be undefined).
 * @param {Object} device             Device the command is sent to.
 * @param {String} cmdName            Name of the command (used for logging).
 * @param {String} serializedPayload  Request body.
 * @param {String} contentType        Value of the content-type header.
 * @param {Function} callback         Called without arguments on a 200/201 response, or with an
 *                                    HTTPCommandResponseError otherwise.
 */
function executeCommand(apiKey, group, device, cmdName, serializedPayload, contentType, callback) {
    const options = {
        url: device.endpoint || (group && group.endpoint ? group.endpoint : undefined),
        method: 'POST',
        body: serializedPayload,
        headers: {
            'fiware-service': device.service,
            'fiware-servicepath': device.subservice,
            'content-type': contentType
        }
    };

    if (options.url) {
        // endpoint could be an expression
        const parser = iotAgentLib.dataPlugins.expressionTransformation;
        const baseAttrs = iotAgentLib.dataPlugins.utils.getIdTypeServSubServiceFromDevice(device);
        const attrList = baseAttrs.concat(device.staticAttributes ? device.staticAttributes : []);
        const ctxt = parser.extractContext(attrList, device);
        config.getLogger().debug(context, 'attrList %j for device %j', attrList, device);

        let resolvedUrl = null;
        try {
            resolvedUrl = parser.applyExpression(options.url, ctxt, device);
        } catch (e) {
            // no error should be reported
        }
        if (resolvedUrl) {
            options.url = resolvedUrl;
        }
    }

    const timeout = config.getConfig().http.timeout;
    if (timeout) {
        options.timeout = timeout;
    }

    config
        .getLogger()
        .debug(
            context,
            'Sending command %s with payload %s and with http options %j',
            cmdName,
            serializedPayload,
            options
        );

    request(options, function (error, response, body) {
        const delivered = !error && response && (response.statusCode === 200 || response.statusCode === 201);

        if (!delivered) {
            callback(new errors.HTTPCommandResponseError(response ? response.statusCode : 400, error));
            return;
        }

        config
            .getLogger()
            .info(
                context,
                'Cmd: %j was sent to the device %s with http options %j',
                serializedPayload,
                cmdName,
                options
            );

        if (apiKey) {
            // The command update is deferred to the next tick, after the caller has been notified.
            process.nextTick(commandHandler.updateCommand.bind(null, apiKey, device.id, device, body));
        }
        callback();
    });
}
277

278
/**
 * Middleware that copies the `t` query parameter, when present, into the
 * `constants.TIMESTAMP_ATTRIBUTE` property of every truthy entry of `req.jsonPayload`.
 */
function addTimestamp(req, res, next) {
    const timestamp = req.query.t;

    if (timestamp) {
        const entries = req.jsonPayload;
        // for...in is kept on purpose: the payload is not always an array.
        for (const key in entries) {
            const entry = entries[key];
            if (entry) {
                entry[constants.TIMESTAMP_ATTRIBUTE] = timestamp;
            }
        }
    }

    next();
}
289

290
/**
 * Middleware that processes the measures found in `req.jsonPayload` for the device identified by
 * `req.deviceId` and `req.apiKey`.
 *
 * The device is retrieved first and stored in `req.device`. The payload is then turned into a list of
 * attribute groups, which is sent with `iotAgentLib.update` or, when `req.isCommand` is set,
 * reported attribute by attribute through `iotAgentLib.setCommandResult`.
 * An empty list skips the update altogether.
 */
function handleIncomingMeasure(req, res, next) {
    context = fillService(context, { service: 'n/a', subservice: 'n/a' });
    config
        .getLogger()
        .debug(context, 'Processing multiple HTTP measures for device %s with apiKey %s', req.deviceId, req.apiKey);

    // Completion callback shared by the measure update and the command result update.
    function onUpdateFinished(error) {
        if (error) {
            next(error);
            config.getLogger().error(
                context,
                /*jshint quotmark: double */
                "MEASURES-002: Couldn't send the updated values to the Context Broker due to an error: %j",
                /*jshint quotmark: single */
                error
            );
            return;
        }

        config
            .getLogger()
            .info(
                context,
                'Multiple measures for device %s with apiKey %s successfully updated',
                req.deviceId,
                req.apiKey
            );

        finishSouthBoundTransaction(next);
    }

    // Builds the list of attribute groups described by the request payload.
    function buildAttributeGroups(device) {
        if (req.attr && req.jsonPayload) {
            // Single attribute route: the whole payload is the value of the attribute.
            config.getLogger().debug(context, 'Parsing attr %s with value %s', req.attr, req.jsonPayload);
            try {
                req.jsonPayload = req.jsonPayload.toString('hex');
            } catch (e) {
                // no error should be reported
            }
            return [[{ name: req.attr, value: req.jsonPayload, type: 'None' }]];
        }

        if (!req.jsonPayload) {
            return [];
        }

        const measures = Array.isArray(req.jsonPayload) ? req.jsonPayload : [req.jsonPayload];
        config.getLogger().debug(context, 'Parsing payloadDataArr %j for device %j', measures, device);

        let groups = [];
        for (const measure of measures) {
            const extracted = commonBindings.extractAttributes(device, measure, device.payloadType);
            if (extracted && extracted[0] && extracted[0][0]) {
                // Check multimeasure from a ngsiv2/ngsild entities array
                groups = groups.concat(extracted);
            } else {
                groups.push(extracted);
            }
        }
        return groups;
    }

    function processHTTPWithDevice(device) {
        const attributeGroups = buildAttributeGroups(device);

        if (attributeGroups.length === 0) {
            finishSouthBoundTransaction(next);
            return;
        }

        config
            .getLogger()
            .debug(context, 'Processing measure device %s with attributeArr %j', device.name, attributeGroups);

        if (!req.isCommand) {
            iotAgentLib.update(device.name, device.type, '', attributeGroups, device, onUpdateFinished);
            return;
        }

        // One command result update per attribute, all of them run in parallel.
        const executions = [];
        for (const group of attributeGroups) {
            for (const key in group) {
                executions.push(
                    iotAgentLib.setCommandResult.bind(
                        null,
                        device.name,
                        config.getConfig().iota.defaultResource,
                        req.apiKey,
                        group[key].name,
                        group[key].value,
                        constants.COMMAND_STATUS_COMPLETED,
                        device
                    )
                );
            }
        }
        async.parallel(executions, onUpdateFinished);
    }

    iotaUtils.retrieveDevice(req.deviceId, req.apiKey, function (error, device) {
        if (error) {
            next(error);
            return;
        }

        // The processing runs inside a transaction whose context carries the device service data.
        const localContext = _.clone(context);
        req.device = device;
        localContext.service = device.service;
        localContext.subservice = device.subservice;
        intoTrans(localContext, processHTTPWithDevice)(device);
    });
}
405

406
/**
 * Middleware that throttles measure processing per topic, where the topic is "<deviceId>/<apiKey>".
 *
 * When `config.http.throttleInterval` is unset or not positive, the request goes straight to
 * handleIncomingMeasure. Otherwise the request is queued, and a timer hands queued requests to
 * handleIncomingMeasure one at a time, at least `throttleInterval` ms apart per topic.
 *
 * When `config.http.maxQueueSize` is set and the queue of the topic is full, the measure is
 * discarded and an error with code 429 is passed to `next`, so the client gets a response
 * instead of a request that never completes.
 */
function handleIncomingMeasureProxy(req, res, next) {
    // Processes at most one queued request of the topic and reschedules itself while requests remain.
    function processQueue(topic) {
        const pending = queues[topic];
        if (!pending || pending.length === 0) {
            return;
        }

        const interval = config.getConfig().http.throttleInterval;
        const now = Date.now();

        if (now - lastProcessedTime[topic] >= interval) {
            const item = pending.shift();
            handleIncomingMeasure(item.req, item.res, item.next);
            lastProcessedTime[topic] = now;
        }

        if (pending.length > 0) {
            // Poll four times per interval so a request never waits much longer than needed.
            setTimeout(() => processQueue(topic), interval / 4);
        }
    }

    const httpConfig = config.getConfig().http;

    if (!httpConfig.throttleInterval || httpConfig.throttleInterval <= 0) {
        handleIncomingMeasure(req, res, next);
        return;
    }

    const topic = req.deviceId + '/' + req.apiKey;
    if (!queues[topic]) {
        queues[topic] = [];
        lastProcessedTime[topic] = 0;
    }

    if (httpConfig.maxQueueSize && queues[topic].length >= httpConfig.maxQueueSize) {
        config
            .getLogger()
            .info(
                context,
                'discarding measure for deviceId + apikey: %s due queue for deviceId + apiKey is full',
                topic
            );
        // Every middleware has to answer or call next: report the discarded measure to the client.
        const discarded = new Error('Measure discarded: throttling queue is full for ' + topic);
        discarded.name = 'TOO_MANY_REQUESTS';
        discarded.code = 429;
        next(discarded);
        return;
    }

    queues[topic].push({ req: req, res: res, next: next });

    // A timer is only needed when the queue was empty; otherwise one is already pending.
    if (queues[topic].length === 1) {
        setTimeout(() => processQueue(topic), httpConfig.throttleInterval / 4);
    }
}
449

450
/**
 * Middleware that sets `req.isCommand` to true when the request path is the commands path,
 * that is, the default resource (or `constants.HTTP_MEASURE_PATH` when there is none) followed by
 * `constants.HTTP_COMMANDS_PATH`.
 */
function isCommand(req, res, next) {
    const basePath = config.getConfig().iota.defaultResource || constants.HTTP_MEASURE_PATH;
    const commandsPath = basePath + constants.HTTP_COMMANDS_PATH;

    if (req.path === commandsPath) {
        req.isCommand = true;
    }

    next();
}
460

461
/**
 * Sends a configuration notification to a device with an HTTP POST request to its endpoint
 * followed by `constants.HTTP_CONFIGURATION_PATH`.
 *
 * @param {String} apiKey           API key of the device.
 * @param {Object} group            Group of the device (may be undefined).
 * @param {String} deviceId         ID of the device.
 * @param {Object} results          Data passed to iotaUtils.createConfigurationNotification to build the body.
 * @param {Function} callback       Called without arguments when the device answers 200 (or when there is
 *                                  no response object); otherwise called with the retrieval/request error,
 *                                  an EndpointNotFound error or a DeviceEndpointError.
 */
function sendConfigurationToDevice(apiKey, group, deviceId, results, callback) {
    iotaUtils.retrieveDevice(deviceId, apiKey, function (error, device) {
        if (error) {
            callback(error);
            return;
        }
        if (!device.endpoint) {
            callback(new errors.EndpointNotFound(device.id));
            return;
        }

        const baseEndpoint = device.endpoint || (group && group.endpoint ? group.endpoint : undefined);
        const notification = {
            url: baseEndpoint + constants.HTTP_CONFIGURATION_PATH,

            method: 'POST',
            json: iotaUtils.createConfigurationNotification(results),
            headers: {
                'fiware-service': device.service,
                'fiware-servicepath': device.subservice,
                'content-type': 'application/json'
            }
        };

        request(notification, function (requestError, response, body) {
            if (requestError) {
                callback(requestError);
            } else if (response && response.statusCode !== 200) {
                callback(new errors.DeviceEndpointError(response.statusCode, body));
            } else {
                callback();
            }
        });
    });
}
501

502
/**
 * Middleware that handles a configuration request from a device: it retrieves the device and
 * delegates to iotaUtils.manageConfiguration, answering the HTTP request when that finishes.
 *
 * The answer is 200 with an empty JSON object on success, or the error serialized as JSON with
 * `error.code` as status. An error retrieving the device is passed to `next` instead.
 */
function handleConfigurationRequest(req, res, next) {
    // Final callback of manageConfiguration: writes the HTTP response for the device.
    const replyToDevice = (error) => {
        res.set(constants.X_PROCESSING_TIME, Date.now() - req.startTime);
        if (error) {
            res.status(error.code).json(error);
            return;
        }
        res.status(200).json({});
    };

    iotaUtils.retrieveDevice(req.deviceId, req.apiKey, function (error, device) {
        if (error) {
            next(error);
            return;
        }

        iotaUtils.manageConfiguration(
            req.apiKey,
            req.deviceId,
            device,
            req.jsonPayload,
            sendConfigurationToDevice,
            replyToDevice
        );
    });
}
526

527
/**
 * Middleware that records in `req.startTime` the time (ms since the epoch) at which the request
 * started being processed. Later handlers subtract it from the current time to report the
 * processing time.
 */
function reqTiming(req, res, next) {
    const receivedAt = Date.now();
    req.startTime = receivedAt;
    next();
}
531

532
/**
 * Express error handler: answers with a JSON object holding the name and message of the error.
 *
 * The status is 500 unless `error.code` or `error.status` looks like an HTTP status
 * (three digits starting with 2, 3, 4 or 5); `error.status` wins when both qualify.
 * The `next` parameter is unused but kept so the function has the four-argument signature of
 * Express error handlers.
 */
function handleError(error, req, res, next) {
    const httpStatusPattern = /^[2345]\d\d$/;

    config.getLogger().debug(context, 'Error %s handling request: %s', error.name, error.message);

    let code = 500;
    // Checked in this order so that a valid status overrides a valid code.
    for (const candidate of [error.code, error.status]) {
        if (candidate && String(candidate).match(httpStatusPattern)) {
            code = candidate;
        }
    }

    res.set(constants.X_PROCESSING_TIME, Date.now() - req.startTime);
    res.status(code).json({
        name: error.name,
        message: error.message
    });
}
550

551
/**
 * Fills in the transport protocol of the device in case there is none (taking it from the group or,
 * failing that, from the default transport of the configuration). For HTTP devices it also sets
 * the polling flag: polling is enabled only when neither the device nor the group has an endpoint.
 *
 * @param {Object} device           Device object containing all the information about the device.
 * @param {Object} group            Group object containing all the information about the group.
 * @param {Function} callback       Called with (null, device) once the device has been updated.
 */
function setPollingAndDefaultTransport(device, group, callback) {
    config.getLogger().debug(context, 'httpbinding.setPollingAndDefaultTransport device %j group %j', device, group);

    if (!device.transport) {
        const groupTransport = group && group.transport;
        device.transport = groupTransport ? group.transport : config.getConfig().defaultTransport;
    }

    if (device.transport === 'HTTP') {
        device.polling = !(device.endpoint || (group && group.endpoint));
    }

    callback(null, device);
}
572

573
/**
 * Device provisioning handler. Looks up the group of the device by its API key and then fills in
 * the transport and polling data of the device. A failed lookup is not an error: an empty group
 * is used instead.
 *
 * @param {Object} device           Device object containing all the information about the provisioned device.
 * @param {Function} callback       Called by setPollingAndDefaultTransport with (null, device).
 */
function deviceProvisioningHandler(device, callback) {
    config.getLogger().debug(context, 'httpbinding.deviceProvisioningHandler device %j', device);

    const resource = config.getConfig().iota.defaultResource || '';

    iotAgentLib.getConfigurationSilently(resource, device.apikey, function (error, foundGroup) {
        const group = error ? {} : foundGroup;

        config.getLogger().debug(context, 'httpbinding.deviceProvisioningHandler group %j', group);
        setPollingAndDefaultTransport(device, group, callback);
    });
}
592

593
/**
 * Device updating handler. Looks up the group by the API key of the old device and then fills in
 * the transport and polling data of the new device. A failed lookup is not an error: an empty
 * group is used instead.
 *
 * @param {Object} newDevice        Device object containing all the information about the updated device.
 * @param {Object} oldDevice        Device object as it was before the update.
 * @param {Function} callback       Called by setPollingAndDefaultTransport with (null, newDevice).
 */
function deviceUpdatingHandler(newDevice, oldDevice, callback) {
    config
        .getLogger()
        .debug(context, 'httpbinding.deviceUpdatingHandler newDevice %j oldDevice %j', newDevice, oldDevice);

    const resource = config.getConfig().iota.defaultResource || '';

    iotAgentLib.getConfigurationSilently(resource, oldDevice.apikey, function (error, foundGroup) {
        const group = error ? {} : foundGroup;

        config.getLogger().debug(context, 'httpbinding.deviceUpdatingHandler group %j', group);
        setPollingAndDefaultTransport(newDevice, group, callback);
    });
}
614

615
/**
 * This middleware checks whether there is any polling command pending to be sent to the device. If there is some,
 * add the command information to the return payload. Otherwise it returns an empty payload.
 *
 * Commands are only looked up when the `getCmd` query parameter is '1'. The answer is always a 200:
 * an object mapping command names to values (serialized with JSON.stringify when the client does
 * not accept JSON), or an empty object / empty string when there is nothing to deliver or the
 * command queue could not be read. After answering, the delivered commands are marked as DELIVERED
 * and removed from the queue.
 *
 * This is the last handler of the measure routes, so `next` is never called.
 */
function returnCommands(req, res, next) {
    // Marks every delivered command as DELIVERED and removes it from the queue; failures are only logged.
    function updateCommandStatus(device, commandList) {
        context = fillService(context, device);

        const updates = commandList.map((command) =>
            apply(
                iotAgentLib.setCommandResult,
                device.name,
                device.resource,
                req.query.k,
                command.name,
                ' ',
                'DELIVERED',
                device
            )
        );
        const cleanCommands = commandList.map((command) =>
            apply(iotAgentLib.removeCommand, device.service, device.subservice, device.id, command.name)
        );

        async.parallel(updates.concat(cleanCommands), function (error) {
            if (error) {
                config
                    .getLogger()
                    .error(
                        context,
                        'Error updating command status after delivering commands for device %s',
                        device.id
                    );
            } else {
                config
                    .getLogger()
                    .debug(
                        context,
                        'Command status updated successfully after delivering command list to device %s',
                        device.id
                    );
            }
        });
    }

    // Merges the commands into one object, skipping those whose value is blank once stringified.
    function buildCommandPayload(commands) {
        const payload = {};
        for (const command of commands) {
            if (String(command.value).trim() !== '') {
                payload[command.name] = command.value;
            }
        }
        return payload;
    }

    // Sets the processing time header and sends the body with a 200 status.
    function reply(body) {
        res.set(constants.X_PROCESSING_TIME, Date.now() - req.startTime);
        res.status(200).send(body);
    }

    if (!(req.query && req.query.getCmd === '1')) {
        reply(req.accepts('json') ? {} : '');
        return;
    }

    iotAgentLib.commandQueue(req.device.service, req.device.subservice, req.deviceId, function (error, list) {
        if (error || !list || list.count === 0) {
            reply(req.accepts('json') ? {} : '');
            return;
        }

        const payload = buildCommandPayload(list.commands);
        reply(req.accepts('json') ? payload : JSON.stringify(payload));

        // The status update is deferred so that it never delays the response to the device.
        process.nextTick(updateCommandStatus.bind(null, req.device, list.commands));
    });
}
706

707
/**
 * Starts the HTTP binding: creates the Express application, registers the measure, command and
 * configuration routes and starts listening on the configured host and port.
 *
 * An HTTPS server is created when both `config.http.key` and `config.http.cert` are set; a plain
 * HTTP server is used otherwise.
 *
 * @param {Function} callback       Passed to `server.listen`, or called with a ConfigurationError
 *                                  when the `config.http` object is missing.
 */
function start(callback) {
    const baseRoot = '/';

    httpBindingServer = {
        server: null,
        app: express(),
        router: express.Router()
    };

    if (!config.getConfig().http) {
        config
            .getLogger()
            .fatal(context, 'GLOBAL-002: Configuration error. Configuration object [config.http] is missing');
        callback(new errors.ConfigurationError('config.http'));
        return;
    }

    const app = httpBindingServer.app;
    const router = httpBindingServer.router;
    const httpConfig = config.getConfig().http;
    const measurePath = config.getConfig().iota.defaultResource || constants.HTTP_MEASURE_PATH;
    // accept anything JSON.parse accepts
    const jsonBody = () => bodyParser.json({ strict: false, limit: config.getConfig().iota.expressLimit });

    app.set('port', httpConfig.port);
    app.set('host', httpConfig.host || '0.0.0.0');
    app.use(reqTiming);

    // Measures sent in the query string.
    router.get(
        measurePath,
        checkMandatoryParams(true),
        parseData,
        addTimestamp,
        handleIncomingMeasureProxy,
        returnCommands
    );

    // Measures (one or several) sent in a JSON body.
    router.post(
        measurePath,
        jsonBody(),
        checkMandatoryParams(false),
        parseDataMultipleMeasure,
        addTimestamp,
        handleIncomingMeasureProxy,
        returnCommands
    );

    // Single attribute whose value is the whole body, in any of the supported content types.
    router.post(
        measurePath + '/' + constants.MEASURES_SUFIX + '/:attrValue',
        parserBody(),
        checkMandatoryParams(false),
        parseData, // non multiple measures are expected in this route
        addTimestamp,
        handleIncomingMeasureProxy,
        returnCommands
    );

    // Command results.
    router.post(
        measurePath + constants.HTTP_COMMANDS_PATH,
        jsonBody(),
        checkMandatoryParams(false),
        parseData,
        addTimestamp,
        isCommand,
        handleIncomingMeasureProxy,
        returnCommands
    );

    // Configuration requests.
    router.post(
        measurePath + constants.HTTP_CONFIGURATION_PATH,
        jsonBody(),
        checkMandatoryParams(false),
        parseData,
        handleConfigurationRequest
    );

    app.use(baseRoot, router);
    app.use(handleError);

    if (config.getConfig().http && config.getConfig().http.key && config.getConfig().http.cert) {
        const credentials = {
            key: fs.readFileSync(config.getConfig().http.key, 'utf8'),
            cert: fs.readFileSync(config.getConfig().http.cert, 'utf8')
        };

        config.getLogger().info(context, 'HTTPS Binding listening on port %s', config.getConfig().http.port);
        httpBindingServer.server = https.createServer(credentials, app);
    } else {
        config.getLogger().info(context, 'HTTP Binding listening on port %s', config.getConfig().http.port);
        httpBindingServer.server = http.createServer(app);
    }

    httpBindingServer.server.listen(app.get('port'), app.get('host'), callback);
}
796

797
/**
 * Stops the HTTP binding, closing the server when there is one.
 *
 * start() leaves `httpBindingServer.server` set to null when the `config.http` object is missing,
 * so both the binding object and its server are checked before closing.
 *
 * @param {Function} callback       Called without arguments once the server has been closed, or
 *                                  immediately when there is no server to close.
 */
function stop(callback) {
    config.getLogger().info(context, 'Stopping JSON HTTP Binding: ');

    if (httpBindingServer && httpBindingServer.server) {
        httpBindingServer.server.close(function () {
            config.getLogger().info(context, 'HTTP Binding Stopped');
            callback();
        });
    } else {
        callback();
    }
}
809

810
/**
 * Sends the notified values to the device as commands, one after another.
 *
 * @param {Object} device           Device the notification is addressed to.
 * @param {Array} values            Notified values; each one is turned into command executions.
 * @param {Function} callback       Called with the error of the series, if any.
 */
function sendPushNotifications(device, values, callback) {
    // Bound with a null API key and the device; map() supplies the remaining arguments.
    const toExecutions = commandHandler.generateCommandExecution.bind(null, null, device);
    const executions = _.flatten(values.map(toExecutions));

    async.series(executions, (error) => {
        callback(error);
    });
}
817

818
/**
 * Stores every notified value as a command of the device with `iotAgentLib.addCommand`.
 *
 * @param {Object} device           Device the notification is addressed to.
 * @param {Array} values            Notified values, each one stored as a command.
 * @param {Function} callback       Completion callback handed to `async.map`.
 */
function storePollNotifications(device, values, callback) {
    const addPollNotification = (item, innerCallback) => {
        iotAgentLib.addCommand(device.service, device.subservice, device.id, item, innerCallback);
    };

    async.map(values, addPollNotification, callback);
}
825

826
/**
 * Handles a notification for a device: the values are pushed to the device when it has an endpoint
 * and stored for polling otherwise.
 *
 * @param {Object} device           Device the notification is addressed to.
 * @param {Array} values            Notified values.
 * @param {Function} callback       Completion callback of the selected strategy.
 */
function notificationHandler(device, values, callback) {
    const deliver = device.endpoint ? sendPushNotifications : storePollNotifications;

    deliver(device, values, callback);
}
833

834
// Binding lifecycle.
exports.start = start;
exports.stop = stop;

// Handlers used for device configuration, provisioning, updates, notifications and commands.
exports.sendConfigurationToDevice = sendConfigurationToDevice;
exports.deviceProvisioningHandler = deviceProvisioningHandler;
exports.deviceUpdatingHandler = deviceUpdatingHandler;
exports.notificationHandler = notificationHandler;
exports.executeCommand = executeCommand;

// Protocol identifier of this binding.
exports.protocol = 'HTTP';
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc