• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

conwetlab / ngsi-proxy / 4328917822

pending completion
4328917822

Pull #25

github

GitHub
Merge 175e2b508 into 3462fb2d0
Pull Request #25: Bump minimist from 1.2.5 to 1.2.8

51 of 73 branches covered (69.86%)

Branch coverage included in aggregate %.

249 of 277 relevant lines covered (89.89%)

12.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.82
/logic.js
1
/*
2
 *     Copyright (c) 2014-2017 CoNWeT Lab., Universidad Politécnica de Madrid
3
 *     Copyright (c) 2018-2021 Future Internet Consulting and Development Solutions S.L.
4
 *
5
 *     This file is part of ngsi-proxy.
6
 *
7
 *     Ngsi-proxy is free software: you can redistribute it and/or modify it
8
 *     under the terms of the GNU Affero General Public License as published by
9
 *     the Free Software Foundation, either version 3 of the License, or (at
10
 *     your option) any later version.
11
 *
12
 *     Ngsi-proxy is distributed in the hope that it will be useful, but
13
 *     WITHOUT ANY WARRANTY; without even the implied warranty of
14
 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
15
 *     General Public License for more details.
16
 *
17
 *     You should have received a copy of the GNU Affero General Public License
18
 *     along with ngsi-proxy. If not, see <http://www.gnu.org/licenses/>.
19
 *
20
 *     Linking this library statically or dynamically with other modules is
21
 *     making a combined work based on this library.  Thus, the terms and
22
 *     conditions of the GNU Affero General Public License cover the whole
23
 *     combination.
24
 *
25
 *     As a special exception, the copyright holders of this library give you
26
 *     permission to link this library with independent modules to produce an
27
 *     executable, regardless of the license terms of these independent
28
 *     modules, and to copy and distribute the resulting executable under
29
 *     terms of your choice, provided that you also meet, for each linked
30
 *     independent module, the terms and conditions of the license of that
31
 *     module.  An independent module is a module which is not derived from
32
 *     or based on this library.  If you modify this library, you may extend
33
 *     this exception to your version of the library, but you are not
34
 *     obligated to do so.  If you do not wish to do so, delete this
35
 *     exception statement from your version.
36
 *
37
 */
38

39
"use strict";
40

41
const uuid = require('uuid/v1');
4✔
42

43
// Time, in milliseconds, that a disconnected client is given to reconnect
// before heartbeat() discards its connection. Configurable through the
// RECONNECTION_TIMEOUT environment variable.
let RECONNECTION_TIMEOUT = Number(process.env.RECONNECTION_TIMEOUT);
if (Number.isNaN(RECONNECTION_TIMEOUT)) {
    // Default value: 24h (variable unset or not numeric)
    RECONNECTION_TIMEOUT = 24 * 60 * 60 * 1000;
} else if (RECONNECTION_TIMEOUT < 30000) {
    // Minimum value: 30s
    RECONNECTION_TIMEOUT = 30000;
}
51

52
// Registries indexed by generated uuid: every known eventsource connection
// and every registered callback. They are created without a prototype so that
// ids taken from requests (e.g. "constructor" or "__proto__") can never
// resolve to members inherited from Object.prototype.
const connections = Object.create(null);
const callbacks = Object.create(null);
4✔
54

55
/**
 * Registers a new eventsource connection under a freshly generated uuid.
 * The record starts without an attached client (`response` is null).
 *
 * @returns {Object} the connection record stored in `connections`
 */
const createConnection = function createConnection() {
    const connection = {
        id: uuid(),
        client_ip: null,
        close_timestamp: null,
        reconnection_count: 0,
        response: null,
        callbacks: {}
    };

    connections[connection.id] = connection;
    console.log(`Created connection with id: ${connection.id}`);

    return connection;
};
70

71
/**
 * Creates a callback record bound to `connection` and registers it under a
 * freshly generated uuid.
 *
 * @param {Object} connection connection record that will own the callback
 * @returns {Object} the new callback record
 */
const createCallback = function createCallback(connection) {
    const id = uuid();
    const callback_info = {
        id: id,
        connection: connection,
        notification_counter: 0
    };

    // The same record is reachable from its connection and from the global registry
    connection.callbacks[id] = callback_info;
    callbacks[id] = callback_info;

    console.log(`Created callback with id: ${id}`);
    return callback_info;
};
82

83
/**
 * Unregisters the callback identified by `id` from its connection and from
 * the global registry. The id must exist; callers check this beforehand.
 *
 * @param {string} id callback id
 */
const removeCallback = function removeCallback(id) {
    const entry = callbacks[id];
    const owner = entry.connection;

    delete owner.callbacks[id];
    delete callbacks[id];
    // Break the back-reference to the owning connection
    entry.connection = null;

    console.log(`Deleted callback with id: ${id}`);
};
92

93

94
const URL = require('url');
4✔
95
/**
 * Builds an absolute URL for `url`, resolved against the URL of the request
 * being processed.
 *
 * The port is taken from the `X-Forwarded-Port` header when the
 * TRUST_PROXY_HEADERS environment variable is set and the header holds a
 * valid integer; otherwise the local port of the socket is used. The port is
 * left out of the result when it is the default one for http (80) or
 * https (443).
 *
 * @param {Object} req request; `protocol`, `hostname`, `url`, `get()` and
 *     `socket.localPort` are read
 * @param {string} url URL to resolve against the request URL
 * @returns {string} the resolved absolute URL
 */
const build_absolute_url = function build_absolute_url(req, url) {
    const protocol = req.protocol;
    const domain = req.hostname;
    const path = req.url;

    // Header values are strings: convert to a number so the comparisons
    // against the default ports below can actually match.
    const forwarded_port = process.env.TRUST_PROXY_HEADERS ? Number.parseInt(req.get('X-Forwarded-Port'), 10) : NaN;
    const port = Number.isInteger(forwarded_port) ? forwarded_port : req.socket.localPort;

    const needs_port = (protocol === "http" && port !== 80) || (protocol === "https" && port !== 443);
    const authority = needs_port ? domain + ':' + port : domain;

    return URL.resolve(protocol + "://" + authority + path, url);
};
107

108
exports.options_eventsource = function options_eventsource(req, res) {
4✔
109
    const origin = req.header('Origin');
4✔
110
    if (origin != null) {
4!
111
        res.header('Access-Control-Allow-Origin', origin);
4✔
112
        res.header('Access-Control-Allow-Methods', 'POST, OPTIONS');
4✔
113
        res.header('Access-Control-Allow-Headers', 'X-Requested-With');
4✔
114
        res.header('Access-Control-Expose-Headers', 'Location');
4✔
115
    }
116
    res.header('Connection', 'keep-alive');
4✔
117
    res.header('Content-Length', '0');
4✔
118
    res.sendStatus(204);
4✔
119
};
120

121
exports.options_eventsource_entry = function options_eventsource_entry(req, res) {
4✔
122
    const origin = req.header('Origin');
4✔
123
    if (origin != null) {
4!
124
        res.header('Access-Control-Allow-Origin', origin);
4✔
125
        res.header('Access-Control-Allow-Methods', 'GET, DELETE');
4✔
126
        res.header('Access-Control-Allow-Headers', 'X-Requested-With');
4✔
127
    }
128
    res.header('Connection', 'keep-alive');
4✔
129
    res.header('Content-Length', '0');
4✔
130
    res.sendStatus(204);
4✔
131
};
132

133
exports.list_eventsources = function list_eventsources(req, res) {
4✔
134
    res.writeHead(200, {'Content-Type': 'application/xhtml+xml; charset=UTF-8'});
4✔
135
    let content = '<!DOCTYPE html>\n<html xmlns="http://www.w3.org/1999/xhtml"><head></head><body>';
4✔
136
    content += '<h1>Current connections</h1>'
4✔
137
    if (Object.keys(connections).length > 0) {
4!
138
        content += '<ul>';
4✔
139
        for (const connection_id in connections) {
4✔
140
            const connection = connections[connection_id];
29✔
141
            content += '<li><b>' + connection_id + '</b>. ';
29✔
142

143
            content += 'The client has started the connection ' + connection.reconnection_count + ' times and is currently '
29✔
144
            if (connection.response != null) {
29✔
145
                content += ' connected (' + connection.client_ip + '). ';
9✔
146
            } else {
147
                content += ' not connected. ';
20✔
148
            }
149

150
            const callback_count = Object.keys(connection.callbacks).length;
29✔
151

152
            if (callback_count > 0) {
29✔
153
                content += callback_count + ' callbacks:';
10✔
154

155
                content += '<ul>';
10✔
156
                for (const callback_id in connection.callbacks) {
10✔
157
                    content += '<li><b>' + callback_id + '</b> (' + connection.callbacks[callback_id].notification_counter + ' received notifications)</li>';
10✔
158
                }
159
                content += '</ul>';
10✔
160
            } else {
161
                content += "No callbacks";
19✔
162
            }
163

164
            content += '</li>';
29✔
165
        }
166
        content += '</ul>';
4✔
167
    } else {
168
        content += 'Currently there is not connection';
×
169
    }
170
    content += '</body></html>';
4✔
171
    res.write(content);
4✔
172
    res.end();
4✔
173
};
174

175
exports.create_eventsource = function create_eventsource(req, res) {
4✔
176

177
    const origin = req.header('Origin');
44✔
178
    const connection = createConnection(origin);
44✔
179

180
    const url = build_absolute_url(req, '/eventsource/' + connection.id);
44✔
181

182
    res.header('Cache-Control', 'no-cache');
44✔
183
    res.header('Connection', 'keep-alive');
44✔
184
    if (origin != null) {
44✔
185
        res.header('Access-Control-Allow-Origin', origin);
4✔
186
        res.header('Access-Control-Allow-Headers', 'X-Requested-With');
4✔
187
        res.header('Access-Control-Expose-Headers', 'Location');
4✔
188
    }
189
    res.header('Content-Type', 'application/json');
44✔
190
    res.location(url);
44✔
191
    res.status(201).send(JSON.stringify({
44✔
192
        connection_id: connection.id,
193
        url: url
194
    }));
195
};
196

197
exports.eventsource = function eventsource(req, res) {
4✔
198
    const connection = connections[req.params.id];
12✔
199
    const origin = req.header('Origin');
12✔
200

201
    if (origin != null) {
12✔
202
        res.header('Access-Control-Allow-Origin', origin);
4✔
203
    }
204
    if (connection == null) {
12✔
205
        // https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events-intro
206
        return res.sendStatus(204);
4✔
207
    }
208

209
    res.header('Cache-Control', 'no-cache');
8✔
210
    res.header('Connection', 'keep-alive');
8✔
211
    // Forbid Nginx buffering
212
    res.header('X-Accel-Buffering', 'no');
8✔
213
    req.socket.setTimeout(0);
8✔
214

215
    if (connection.response != null) {
8!
216
        console.log('A client is currently connected to this eventsource. Closing connection with the old client (' + connection.client_ip + ').');
×
217
        try {
×
218
            connection.response.removeListener('close', connection.close_listener);
×
219
        } catch (e) {}
220
        try {
×
221
            connection.response.end();
×
222
        } catch (e) {}
223
    }
224
    connection.response = res;
8✔
225
    connection.client_ip = req.connection.remoteAddress;
8✔
226
    connection.reconnection_count++;
8✔
227
    connection.close_listener = function close_listener() {
8✔
228
        console.log('Client closed connection with eventsource: ' + connection.id);
×
229
        connection.response = null;
×
230
        connection.client_ip = null;
×
231
        connection.close_timestamp = new Date();
×
232
    };
233

234
    res.header('Content-Type', 'text/event-stream');
8✔
235
    res.write('event: init\n');
8✔
236
    res.write('retry: 10\n');
8✔
237
    res.write('data: ' + JSON.stringify({
8✔
238
        id: connection.id,
239
        url: build_absolute_url(req, '/eventsource/' + connection.id)
240
    }).toString('utf8') + '\n\n');
241

242
    // Force sending init event
243
    res.flush();
8✔
244
    res.on('close', connection.close_listener);
8✔
245
};
246

247
exports.delete_eventsource = function delete_eventsource(req, res) {
4✔
248
    const connection = connections[req.params.id];
16✔
249
    const origin = req.header('Origin');
16✔
250

251
    if (origin != null) {
16✔
252
        res.header('Access-Control-Allow-Origin', origin);
4✔
253
        res.header('Access-Control-Allow-Headers', 'X-Requested-With');
4✔
254
    }
255
    if (connection == null) {
16✔
256
        return res.sendStatus(404);
4✔
257
    }
258

259
    console.log('Deleting subscription ' + req.params.id);
12✔
260
    delete connections[req.params.id];
12✔
261

262
    if (connection.response != null) {
12!
263
        console.log('A client is currently connected to this eventsource. Closing connection with (' + connection.client_ip + ').');
×
264
        try {
×
265
            connection.response.removeListener('close', connection.close_listener);
×
266
        } catch (e) {}
267
        try {
×
268
            connection.response.end();
×
269
        } catch (e) {}
270
    }
271

272
    for (const callback_id in connection.callbacks) {
12✔
273
        console.log('Deleting callback ' + callback_id);
4✔
274
        delete callbacks[callback_id];
4✔
275
    }
276

277
    res.header('Content-Length', '0');
12✔
278
    res.sendStatus(204);
12✔
279
};
280

281
exports.options_callbacks = function options_callbacks(req, res) {
4✔
282
    const origin = req.header('Origin');
4✔
283
    if (origin != null) {
4!
284
        res.header('Access-Control-Allow-Origin', origin);
4✔
285
        res.header('Access-Control-Allow-Methods', 'OPTIONS, POST');
4✔
286
        res.header('Access-Control-Allow-Headers', 'Content-Type, X-Requested-With');
4✔
287
        res.header('Access-Control-Expose-Headers', 'Location');
4✔
288
    }
289
    res.header('Cache-Control', 'no-cache');
4✔
290
    res.header('Connection', 'keep-alive');
4✔
291
    res.header('Content-Length', '0');
4✔
292
    res.sendStatus(204);
4✔
293
};
294

295
exports.create_callback = function create_callback(req, res) {
4✔
296
    res.header('Cache-Control', 'no-cache');
36✔
297
    res.header('Connection', 'keep-alive');
36✔
298
    res.header('Content-Length', '0');
36✔
299

300
    const origin = req.header('Origin');
36✔
301
    if (origin != null) {
36✔
302
        res.header('Access-Control-Allow-Origin', origin);
4✔
303
        res.header('Access-Control-Allow-Headers', 'Content-Type, X-Requested-With');
4✔
304
        res.header('Access-Control-Expose-Headers', 'Location');
4✔
305
    }
306

307
    let buf = '';
36✔
308
    req.setEncoding('utf8');
36✔
309
    req.on('data', function (chunck) { buf += chunck; });
36✔
310
    req.on('end', function () {
36✔
311
        buf = buf.trim();
36✔
312

313
        if (buf.length === 0) {
36✔
314
            res.status(400).send('invalid json: empty request body');
4✔
315
            return;
4✔
316
        }
317

318
        let data;
32✔
319
        try {
32✔
320
            data = JSON.parse(buf);
32✔
321
        } catch (e) {
322
            res.status(400).send('invalid json: ' + e);
4✔
323
            return;
4✔
324
        }
325

326
        const connection = connections[data.connection_id];
28✔
327

328
        if (connection == null) {
28✔
329
            res.sendStatus(404);
4✔
330
            return;
4✔
331
        }
332
        const callback_info = createCallback(connection);
24✔
333
        const url = build_absolute_url(req, '/callbacks/' + callback_info.id);
24✔
334
        res.header('Content-Type', 'application/json');
24✔
335
        res.location(url);
24✔
336
        res.status(201).send(JSON.stringify({
24✔
337
            callback_id: callback_info.id,
338
            url: url
339
        }));
340
    });
341
};
342

343
exports.process_callback = function process_callback(req, res) {
4✔
344

345
    if (!(req.params.id in callbacks)) {
8✔
346
        res.sendStatus(404);
4✔
347
        return;
4✔
348
    }
349

350
    console.log('Processing callback ' + req.params.id);
4✔
351
    const connection = callbacks[req.params.id].connection;
4✔
352

353
    let buf = '';
4✔
354
    req.on('data', function (chunck) { buf += chunck; });
4✔
355
    req.on('end', function () {
4✔
356
        const eventsource = connection.response;
4✔
357

358
        if (eventsource != null) {
4!
359
            const data = JSON.stringify({
4✔
360
                callback_id: req.params.id,
361
                payload: buf,
362
                headers: req.headers
363
            }).toString('utf8');
364
            eventsource.write('event: notification\n');
4✔
365
            eventsource.write('data: ' + data + '\n\n');
4✔
366
            // Send this event
367
            eventsource.flush();
4✔
368
        } else {
369
            console.log('Ignoring notification as the client is not connected');
×
370
        }
371

372
        res.header('Content-Length', '0');
4✔
373
        res.sendStatus(204);
4✔
374
        callbacks[req.params.id].notification_counter++;
4✔
375
    });
376
};
377

378
exports.options_callback_entry = function options_callback_entry(req, res) {
4✔
379
    const origin = req.header('Origin');
4✔
380
    if (origin != null) {
4!
381
        res.header('Access-Control-Allow-Origin', origin);
4✔
382
        res.header('Access-Control-Allow-Methods', 'DELETE, OPTIONS, POST');
4✔
383
        res.header('Access-Control-Allow-Headers', 'X-Requested-With');
4✔
384
    }
385
    res.header('Cache-Control', 'no-cache');
4✔
386
    res.header('Connection', 'keep-alive');
4✔
387
    res.header('Content-Length', '0');
4✔
388
    res.sendStatus(204);
4✔
389
};
390

391
exports.delete_callback = function delete_callback(req, res) {
4✔
392
    console.log('Deleting callback ' + req.params.id);
12✔
393

394
    const origin = req.header('Origin');
12✔
395
    if (origin != null) {
12✔
396
        res.header('Access-Control-Allow-Origin', origin);
4✔
397
        res.header('Access-Control-Allow-Headers', 'X-Requested-With');
4✔
398
    }
399

400
    if (!(req.params.id in callbacks)) {
12✔
401
        res.sendStatus(404);
4✔
402
        return;
4✔
403
    }
404

405
    removeCallback(req.params.id);
8✔
406
    res.header('Cache-Control', 'no-cache');
8✔
407
    res.header('Connection', 'keep-alive');
8✔
408
    res.header('Content-Length', '0');
8✔
409
    res.sendStatus(204);
8✔
410
};
411

412
exports.heartbeat = function heartbeat() {
4✔
413
    const now = new Date();
12✔
414

415
    console.log("Checking current connections:");
12✔
416
    Object.values(connections).forEach((connection) => {
12✔
417
        const eventsource = connection.response;
46✔
418

419
        if (eventsource != null) {
46✔
420
            console.log(`  - Sending heartbeat message to eventsource ${connection.id}`);
18✔
421
            // Send a heartbeat message
422
            eventsource.write('; heartbeat\n');
18✔
423
            eventsource.flush();
18✔
424
        } else if ((now - connection.close_timestamp) > RECONNECTION_TIMEOUT) {
28✔
425
            console.log(`  - Closing dead connection ${connection.id}`);
16✔
426
            delete connections[connection.id];
16✔
427
            for (const callback_id in connection.callbacks) {
16✔
428
                delete callbacks[callback_id];
4✔
429
            }
430
        }
431
    });
432
};
433

434
// Expose the connection factory alongside the request handlers.
exports.createConnection = createConnection;
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc