• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

telefonicaid / iotagent-json / 13789149328

11 Mar 2025 01:24PM UTC coverage: 77.155% (-3.0%) from 80.121%
13789149328

Pull #866

github

web-flow
Merge 94c66260f into 11a46f9f3
Pull Request #866: [WIP] add throtting (by topic) using a topic based queue (MQTT, HTTP)

509 of 755 branches covered (67.42%)

Branch coverage included in aggregate %.

13 of 55 new or added lines in 3 files covered. (23.64%)

13 existing lines in 2 files now uncovered.

1102 of 1333 relevant lines covered (82.67%)

134.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.57
/lib/configService.js
1
/*
2
 * Copyright 2016 Telefonica Investigación y Desarrollo, S.A.U
3
 *
4
 * This file is part of iotagent-json
5
 *
6
 * iotagent-json is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the License,
9
 * or (at your option) any later version.
10
 *
11
 * iotagent-json is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14
 * See the GNU Affero General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public
17
 * License along with iotagent-json.
18
 * If not, see http://www.gnu.org/licenses/.
19
 *
20
 * For those usages not covered by the GNU Affero General Public License
21
 * please contact with::[contacto@tid.es]
22
 */
23

24
// Module-level state shared by the accessors below.
// `config` is replaced wholesale by setConfig(); `logger` by setLogger().
let config = {};
const fs = require('fs');
let logger = require('logops');
const iotAgentLib = require('iotagent-node-lib');
28

29
/**
 * Tell whether at least one of the named environment variables has a
 * non-empty value.
 *
 * @param {string[]} variableSet   Names of the environment variables to probe
 * @return {boolean}               true if any of them is set to a truthy
 *                                 (i.e. non-empty) string, false otherwise
 */
function anyIsSet(variableSet) {
    // An empty string counts as "not set", exactly like an absent variable.
    return variableSet.some((name) => Boolean(process.env[name]));
}
38

39
/**
 * For a parameter pointing to a file, check the file exists.
 *
 * The check is a synchronous `fs.statSync`; any failure of that call is
 * reported as the file not existing.
 *
 * @param {string} path        Path to the file
 * @throws {Error}             If the path cannot be stat'ed
 */
function fileExists(path) {
    try {
        fs.statSync(path);
        logger.debug(path + ' - File exists.');
    } catch (e) {
        const message = path + ' - File does not exist.';
        logger.fatal(message);
        throw new Error(message);
    }
}
53

54
/**
 * Overlay the settings found in the IOTA_* environment variables on top of the
 * module-level `config` object.
 *
 * - Every recognised variable that is set is logged at info level; values of
 *   variables whose name ends in USERNAME, PASSWORD or KEY are masked.
 * - MQTT and AMQP settings are merged into an existing `config.mqtt` /
 *   `config.amqp` section (created when missing). HTTP settings, in contrast,
 *   start from a fresh empty `config.http` object whenever any HTTP variable
 *   is set.
 * - Values are copied as the raw environment strings, except the flags that
 *   are explicitly compared against 'true' below.
 * - Variables naming a file (CA, certificate, key) are validated with
 *   `fileExists`, which throws when the path cannot be stat'ed.
 *
 * @throws {Error} If a configured CA/certificate/key file does not exist
 */
function processEnvironmentVariables() {
    const environmentVariables = [
        'IOTA_MQTT_PROTOCOL',
        'IOTA_MQTT_HOST',
        'IOTA_MQTT_PORT',
        'IOTA_MQTT_CA',
        'IOTA_MQTT_CERT',
        'IOTA_MQTT_KEY',
        'IOTA_MQTT_REJECT_UNAUTHORIZED',
        'IOTA_MQTT_USERNAME',
        'IOTA_MQTT_PASSWORD',
        'IOTA_MQTT_QOS',
        'IOTA_MQTT_RETAIN',
        'IOTA_MQTT_RETRIES',
        'IOTA_MQTT_RETRY_TIME',
        'IOTA_MQTT_KEEPALIVE',
        'IOTA_MQTT_AVOID_LEADING_SLASH',
        'IOTA_MQTT_CLEAN',
        'IOTA_MQTT_CLIENT_ID',
        'IOTA_MQTT_DISABLED',
        'IOTA_MQTT_THROTTLE_INTERVAL',
        'IOTA_MQTT_MAX_QUEUE_SIZE',
        'IOTA_AMQP_HOST',
        'IOTA_AMQP_PORT',
        'IOTA_AMQP_USERNAME',
        'IOTA_AMQP_PASSWORD',
        'IOTA_AMQP_EXCHANGE',
        'IOTA_AMQP_QUEUE',
        'IOTA_AMQP_DURABLE',
        'IOTA_AMQP_RETRIES',
        'IOTA_AMQP_RETRY_TIME',
        'IOTA_AMQP_DISABLED',
        'IOTA_HTTP_HOST',
        'IOTA_HTTP_PORT',
        'IOTA_HTTP_TIMEOUT',
        'IOTA_HTTP_KEY',
        'IOTA_HTTP_CERT',
        'IOTA_HTTP_DISABLED',
        'IOTA_HTTP_THROTTLE_INTERVAL',
        'IOTA_HTTP_MAX_QUEUE_SIZE',
        'IOTA_CONFIG_RETRIEVAL',
        'IOTA_DEFAULT_KEY',
        'IOTA_DEFAULT_TRANSPORT'
    ];
    const mqttVariables = [
        'IOTA_MQTT_PROTOCOL',
        'IOTA_MQTT_HOST',
        'IOTA_MQTT_PORT',
        'IOTA_MQTT_CA',
        'IOTA_MQTT_CERT',
        'IOTA_MQTT_KEY',
        'IOTA_MQTT_REJECT_UNAUTHORIZED',
        'IOTA_MQTT_USERNAME',
        'IOTA_MQTT_PASSWORD',
        'IOTA_MQTT_QOS',
        'IOTA_MQTT_RETAIN',
        'IOTA_MQTT_RETRIES',
        'IOTA_MQTT_RETRY_TIME',
        'IOTA_MQTT_KEEPALIVE',
        'IOTA_MQTT_AVOID_LEADING_SLASH',
        'IOTA_MQTT_CLEAN',
        'IOTA_MQTT_CLIENT_ID',
        'IOTA_MQTT_DISABLED',
        'IOTA_MQTT_THROTTLE_INTERVAL',
        'IOTA_MQTT_MAX_QUEUE_SIZE'
    ];
    const amqpVariables = [
        'IOTA_AMQP_HOST',
        'IOTA_AMQP_PORT',
        'IOTA_AMQP_USERNAME',
        'IOTA_AMQP_PASSWORD',
        'IOTA_AMQP_EXCHANGE',
        'IOTA_AMQP_QUEUE',
        'IOTA_AMQP_DURABLE',
        'IOTA_AMQP_RETRIES',
        'IOTA_AMQP_RETRY_TIME',
        'IOTA_AMQP_DISABLED'
    ];
    const httpVariables = [
        'IOTA_HTTP_HOST',
        'IOTA_HTTP_PORT',
        'IOTA_HTTP_TIMEOUT',
        'IOTA_HTTP_KEY',
        'IOTA_HTTP_CERT',
        'IOTA_HTTP_THROTTLE_INTERVAL',
        'IOTA_HTTP_MAX_QUEUE_SIZE'
    ];

    const protectedVariables = [
        'IOTA_MQTT_KEY',
        'IOTA_MQTT_USERNAME',
        'IOTA_MQTT_PASSWORD',
        'IOTA_AMQP_USERNAME',
        'IOTA_AMQP_PASSWORD'
    ];
    // Substitute Docker Secret Variables where set.
    protectedVariables.forEach((key) => {
        iotAgentLib.configModule.getSecretData(key);
    });
    environmentVariables.forEach((key) => {
        let value = process.env[key];
        if (value) {
            // Never write credentials or key locations to the log.
            if (key.endsWith('USERNAME') || key.endsWith('PASSWORD') || key.endsWith('KEY')) {
                value = '********';
            }
            logger.info('Setting %s to environment value: %s', key, value);
        }
    });

    if (process.env.IOTA_CONFIG_RETRIEVAL) {
        config.configRetrieval = process.env.IOTA_CONFIG_RETRIEVAL;
    }
    if (process.env.IOTA_DEFAULT_KEY) {
        config.defaultKey = process.env.IOTA_DEFAULT_KEY;
    }
    if (process.env.IOTA_DEFAULT_TRANSPORT) {
        config.defaultTransport = process.env.IOTA_DEFAULT_TRANSPORT;
    }

    // Keep any MQTT section already present in the configuration.
    if (anyIsSet(mqttVariables)) {
        config.mqtt = config.mqtt || {};
    }

    if (process.env.IOTA_MQTT_PROTOCOL) {
        config.mqtt.protocol = process.env.IOTA_MQTT_PROTOCOL;
    }

    if (process.env.IOTA_MQTT_HOST) {
        config.mqtt.host = process.env.IOTA_MQTT_HOST;
    }

    if (process.env.IOTA_MQTT_PORT) {
        config.mqtt.port = process.env.IOTA_MQTT_PORT;
    }

    if (process.env.IOTA_MQTT_CA) {
        fileExists(process.env.IOTA_MQTT_CA);
        config.mqtt.ca = process.env.IOTA_MQTT_CA;
    }

    if (process.env.IOTA_MQTT_CERT) {
        fileExists(process.env.IOTA_MQTT_CERT);
        config.mqtt.cert = process.env.IOTA_MQTT_CERT;
    }

    if (process.env.IOTA_MQTT_KEY) {
        fileExists(process.env.IOTA_MQTT_KEY);
        config.mqtt.key = process.env.IOTA_MQTT_KEY;
    }

    // Since default is true, need to be able accept "false" as
    // a valid Environment variable
    if (process.env.IOTA_MQTT_REJECT_UNAUTHORIZED !== undefined) {
        config.mqtt.rejectUnauthorized = process.env.IOTA_MQTT_REJECT_UNAUTHORIZED.trim().toLowerCase() === 'true';
    }

    if (process.env.IOTA_MQTT_USERNAME) {
        config.mqtt.username = process.env.IOTA_MQTT_USERNAME;
    }

    if (process.env.IOTA_MQTT_PASSWORD) {
        config.mqtt.password = process.env.IOTA_MQTT_PASSWORD;
    }

    if (process.env.IOTA_MQTT_QOS) {
        config.mqtt.qos = process.env.IOTA_MQTT_QOS;
    }

    if (process.env.IOTA_MQTT_RETAIN) {
        config.mqtt.retain = process.env.IOTA_MQTT_RETAIN.trim().toLowerCase() === 'true';
    }

    if (process.env.IOTA_MQTT_RETRIES) {
        config.mqtt.retries = process.env.IOTA_MQTT_RETRIES;
    }

    if (process.env.IOTA_MQTT_RETRY_TIME) {
        config.mqtt.retryTime = process.env.IOTA_MQTT_RETRY_TIME;
    }

    if (process.env.IOTA_MQTT_KEEPALIVE) {
        config.mqtt.keepalive = process.env.IOTA_MQTT_KEEPALIVE;
    }

    // NOTE(review): stored as the raw string, so the value "false" is still
    // truthy for consumers that only test truthiness - confirm against readers.
    if (process.env.IOTA_MQTT_AVOID_LEADING_SLASH) {
        config.mqtt.avoidLeadingSlash = process.env.IOTA_MQTT_AVOID_LEADING_SLASH;
    }

    if (process.env.IOTA_MQTT_CLEAN) {
        config.mqtt.clean = process.env.IOTA_MQTT_CLEAN.trim().toLowerCase() === 'true';
    }

    if (process.env.IOTA_MQTT_CLIENT_ID) {
        config.mqtt.clientId = process.env.IOTA_MQTT_CLIENT_ID;
    }

    // Only an explicit "true" disables the transport; any other value leaves
    // the existing setting untouched.
    if (process.env.IOTA_MQTT_DISABLED && process.env.IOTA_MQTT_DISABLED.trim().toLowerCase() === 'true') {
        config.mqtt.disabled = true;
    }

    if (process.env.IOTA_MQTT_THROTTLE_INTERVAL) {
        config.mqtt.throttleInterval = process.env.IOTA_MQTT_THROTTLE_INTERVAL;
    }

    if (process.env.IOTA_MQTT_MAX_QUEUE_SIZE) {
        config.mqtt.maxQueueSize = process.env.IOTA_MQTT_MAX_QUEUE_SIZE;
    }

    // Keep any AMQP section already present in the configuration.
    if (anyIsSet(amqpVariables)) {
        config.amqp = config.amqp || {};
    }

    if (process.env.IOTA_AMQP_HOST) {
        config.amqp.host = process.env.IOTA_AMQP_HOST;
    }

    if (process.env.IOTA_AMQP_PORT) {
        config.amqp.port = process.env.IOTA_AMQP_PORT;
    }

    if (process.env.IOTA_AMQP_USERNAME) {
        config.amqp.username = process.env.IOTA_AMQP_USERNAME;
    }

    if (process.env.IOTA_AMQP_PASSWORD) {
        config.amqp.password = process.env.IOTA_AMQP_PASSWORD;
    }

    if (process.env.IOTA_AMQP_EXCHANGE) {
        config.amqp.exchange = process.env.IOTA_AMQP_EXCHANGE;
    }

    if (process.env.IOTA_AMQP_QUEUE) {
        config.amqp.queue = process.env.IOTA_AMQP_QUEUE;
    }

    if (process.env.IOTA_AMQP_DURABLE) {
        // Replaces any previously configured amqp.options object.
        config.amqp.options = {};
        config.amqp.options.durable = process.env.IOTA_AMQP_DURABLE.trim().toLowerCase() === 'true';
    }

    if (process.env.IOTA_AMQP_RETRIES) {
        config.amqp.retries = process.env.IOTA_AMQP_RETRIES;
    }

    if (process.env.IOTA_AMQP_RETRY_TIME) {
        config.amqp.retryTime = process.env.IOTA_AMQP_RETRY_TIME;
    }

    if (process.env.IOTA_AMQP_DISABLED && process.env.IOTA_AMQP_DISABLED.trim().toLowerCase() === 'true') {
        config.amqp.disabled = true;
    }

    // Unlike MQTT/AMQP, the HTTP section is rebuilt from scratch here.
    if (anyIsSet(httpVariables)) {
        config.http = {};
    }

    if (process.env.IOTA_HTTP_HOST) {
        config.http.host = process.env.IOTA_HTTP_HOST;
    }

    if (process.env.IOTA_HTTP_PORT) {
        config.http.port = process.env.IOTA_HTTP_PORT;
    }

    if (process.env.IOTA_HTTP_TIMEOUT) {
        config.http.timeout = process.env.IOTA_HTTP_TIMEOUT;
    }

    if (process.env.IOTA_HTTP_KEY) {
        fileExists(process.env.IOTA_HTTP_KEY);
        config.http.key = process.env.IOTA_HTTP_KEY;
    }

    if (process.env.IOTA_HTTP_CERT) {
        // Validate the certificate path itself (not the key path).
        fileExists(process.env.IOTA_HTTP_CERT);
        config.http.cert = process.env.IOTA_HTTP_CERT;
    }

    if (process.env.IOTA_HTTP_THROTTLE_INTERVAL) {
        config.http.throttleInterval = process.env.IOTA_HTTP_THROTTLE_INTERVAL;
    }

    if (process.env.IOTA_HTTP_MAX_QUEUE_SIZE) {
        config.http.maxQueueSize = process.env.IOTA_HTTP_MAX_QUEUE_SIZE;
    }
}
341

342
/**
 * Install a new configuration object and immediately overlay the values
 * coming from the environment on top of it.
 *
 * The object is stored by reference, so the environment overrides are written
 * into the object passed by the caller.
 *
 * @param {Object} newConfig   Configuration object to use from now on
 */
function setConfig(newConfig) {
    config = newConfig;

    processEnvironmentVariables();
}
347

348
/**
 * Return the configuration object currently held by this module.
 *
 * @return {Object}   The live configuration object (not a copy)
 */
function getConfig() {
    return config;
}
351

352
/**
 * Replace the logger used by this module.
 *
 * @param {Object} newLogger   Logger to use from now on; this module calls its
 *                             `debug`, `info` and `fatal` methods
 */
function setLogger(newLogger) {
    logger = newLogger;
}
355

356
/**
 * Return the logger currently held by this module.
 *
 * @return {Object}   The logger last installed with setLogger, or the default
 *                    one when none was installed
 */
function getLogger() {
    return logger;
}
359

360
// Public API of the configuration service.
exports.setConfig = setConfig;
exports.getConfig = getConfig;
exports.setLogger = setLogger;
exports.getLogger = getLogger;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc