• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

telefonicaid / fiware-sth-comet / 9171759696

21 May 2024 08:50AM UTC coverage: 81.86% (-0.05%) from 81.909%
9171759696

push

github

web-flow
Merge pull request #637 from telefonicaid/dependabot/pip/test/acceptance/requests-2.32.0

Bump requests from 2.31.0 to 2.32.0 in /test/acceptance

1235 of 1626 branches covered (75.95%)

Branch coverage included in aggregate %.

2127 of 2481 relevant lines covered (85.73%)

141.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.51
/lib/database/sthDatabase.js
1
/*
2
 * Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U
3
 *
4
 * This file is part of the Short Time Historic (STH) component
5
 *
6
 * STH is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the License,
9
 * or (at your option) any later version.
10
 *
11
 * STH is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14
 * See the GNU Affero General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public
17
 * License along with STH.
18
 * If not, see http://www.gnu.org/licenses/.
19
 *
20
 * For those usages not covered by the GNU Affero General Public License
21
 * please contact with: [german.torodelvalle@telefonica.com]
22
 */
23

24
/* eslint-disable consistent-return */
25

26
Object.assign = Object.assign || require('object-assign');
1!
27

28
const ROOT_PATH = require('app-root-path').toString();
1✔
29
const sthLogger = require('logops');
1✔
30
const sthConfig = require(ROOT_PATH + '/lib/configuration/sthConfiguration.js');
1✔
31
const sthUtils = require(ROOT_PATH + '/lib/utils/sthUtils.js');
1✔
32
const sthDatabaseNaming = require(ROOT_PATH + '/lib/database/model/sthDatabaseNaming');
1✔
33
const mongoClient = require('mongodb').MongoClient;
1✔
34
const boom = require('boom');
1✔
35
const jsoncsv = require('json-csv');
1✔
36
const fs = require('fs');
1✔
37
const path = require('path');
1✔
38
const mkdirp = require('mkdirp');
1✔
39
const async = require('async');
1✔
40
const _ = require('lodash');
1✔
41

42
let db;
1✔
43
let client;
1✔
44
let connectionURL;
1✔
45
let count = 0;
1✔
46

47
/**
48
 * Returns the options to use for the CSV file generation
49
 * @param attrName The attribute name
50
 * @return {{fields: *[], fieldSeparator: string}} The options to use
51
 */
52
function getJSONCSVOptions(attrName) {
1✔
53
    const jsonCSVOptions = {
36✔
54
        fields: [
55
            {
56
                name: 'attrName',
57
                label: 'attrName'
58
            },
59
            {
60
                name: 'attrType',
61
                label: 'attrType'
62
            },
63
            {
64
                name: 'attrValue',
65
                label: 'attrValue'
66
            },
67
            {
68
                name: 'recvTime',
69
                label: 'recvTime'
70
            }
71
        ],
72
        fieldSeparator: ','
73
    };
74
    jsonCSVOptions.fields[0].filter = function() {
36✔
75
        return attrName;
90✔
76
    };
77
    return jsonCSVOptions;
36✔
78
}
79

80
/**
81
 * Connects to a (MongoDB) database endpoint asynchronously
82
 * @param {object} params It is an object including the following properties:
83
 *  - {string} authentication The authentication schema to use for the connection
84
 *  - {string} dbURI The database URI
85
 *  - {string} replicaSet The replica set name, if any
86
 *  - {string} database The name of the database to connect to
87
 *  - {string} authSource The name of the database for the authorization
88
 *  - {Number} poolSize The size of the pool of connections to the database
89
 *  - {Number} reconnectTries Server attempt to reconnect #times
90
 *  - {Number} reconnectInterval Server will wait # milliseconds between retries.
91
 * @param {Function} callback A callback to inform about the result of the operation
92
 */
93
function connect(params, callback) {
1✔
94
    connectionURL =
42✔
95
        // prettier-ignore
96
        'mongodb://' + (params.authentication ? params.authentication + '@' : '') + params.dbURI + '/' +
42!
97
            (params.replicaSet ? '?replicaSet=' + params.replicaSet : '') +
42!
98
            (params.authSource ? (params.replicaSet ? '&authSource=' +
42!
99
                params.authSource : '?authSource=' + params.authSource) : '');
100

101
    sthLogger.info(
42✔
102
        sthConfig.LOGGING_CONTEXT.DB_CONN_OPEN,
103
        'Establishing connection to the database at %s',
104
        connectionURL
105
    );
106
    mongoClient.connect(
42✔
107
        connectionURL,
108
        {
109
            poolSize: params.poolSize,
110
            reconnectTries: params.reconnectTries,
111
            reconnectInterval: params.reconnectInterval
112
        },
113
        function(err, theClient) {
114
            if (!err) {
42✔
115
                db = theClient.db(params.database);
40✔
116
                client = theClient;
40✔
117
                sthLogger.info(
40✔
118
                    sthConfig.LOGGING_CONTEXT.DB_CONN_OPEN,
119
                    'Connection successfully established to the database %s at %s',
120
                    params.database,
121
                    connectionURL
122
                );
123
            }
124
            return process.nextTick(callback.bind(null, err, db));
42✔
125
        }
126
    );
127
}
128

129
/**
130
 * Closes a connection to the database asynchronously
131
 * @param {Function} callback Callback function to notify the result
132
 *  of the operation
133
 */
134
function closeConnection(callback) {
1✔
135
    sthLogger.info(sthConfig.LOGGING_CONTEXT.DB_CONN_CLOSE, 'Closing the connection to the database...');
3✔
136
    if (client) {
3!
137
        client.close(function(err) {
3✔
138
            if (err) {
3!
139
                sthLogger.error(
×
140
                    sthConfig.LOGGING_CONTEXT.DB_CONN_CLOSE,
141
                    'Error when closing the connection to the database: ' + err
142
                );
143
            } else {
144
                sthLogger.info(sthConfig.LOGGING_CONTEXT.DB_CONN_CLOSE, 'Connection to MongoDb succesfully closed');
3✔
145
                db = null;
3✔
146
            }
147
            return process.nextTick(callback.bind(null, err));
3✔
148
        });
149
    } else {
150
        sthLogger.info(sthConfig.LOGGING_CONTEXT.DB_CONN_CLOSE, 'No connection to the database available');
×
151
        return process.nextTick(callback);
×
152
    }
153
}
154

155
/**
156
 * Sets the unique index for the raw data collections
157
 * @param {object} collection The raw data collection
158
 */
159
function setRawDataUniqueIndex(collection) {
1✔
160
    let uniqueCompoundIndex;
21✔
161
    switch (sthConfig.DATA_MODEL) {
21✔
162
        case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
163
            uniqueCompoundIndex = {
5✔
164
                recvTime: 1,
165
                entityId: 1,
166
                entityType: 1,
167
                attrName: 1,
168
                attrType: 1,
169
                attrValue: 1
170
            };
171
            break;
5✔
172
        case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
173
            uniqueCompoundIndex = {
11✔
174
                recvTime: 1,
175
                attrName: 1,
176
                attrType: 1,
177
                attrValue: 1
178
            };
179
            break;
11✔
180
        case sthConfig.DATA_MODELS.COLLECTION_PER_ATTRIBUTE:
181
            uniqueCompoundIndex = {
5✔
182
                recvTime: 1,
183
                attrType: 1,
184
                attrValue: 1
185
            };
186
            break;
5✔
187
    }
188
    collection.ensureIndex(uniqueCompoundIndex, { unique: true }, function(err) {
21✔
189
        if (err) {
21!
190
            sthLogger.error(
×
191
                sthConfig.LOGGING_CONTEXT.DB_LOG,
192
                "Error when creating the unique compound index for collection '" +
193
                    collection.s.namespace.collection +
194
                    "': " +
195
                    err
196
            );
197
        }
198
    });
199
}
200

201
/**
202
 * Returns true is the collection name corresponds to an aggregated data collection. False otherwise.
203
 * @param collectionName The collection name
204
 * @return {boolean} True is the collection name corresponds to an aggregated data collection. False otherwise.
205
 */
206
function isAggregated(collectionName) {
1✔
207
    if (
360✔
208
        collectionName.lastIndexOf('.aggr') === -1 ||
556✔
209
        collectionName.lastIndexOf('.aggr') !== collectionName.length - '.aggr'.length
210
    ) {
211
        return false;
164✔
212
    }
213
    return true;
196✔
214
}
215

216
/**
217
 * Sets the time to live policy on a collection
218
 * @param {object} collection The collection
219
 */
220
function setTTLPolicy(collection) {
1✔
221
    // Set the TTL policy if required
222
    if (sthConfig.TRUNCATION_EXPIRE_AFTER_SECONDS > 0) {
2!
223
        if (!isAggregated(collection.collectionName)) {
×
224
            if (sthConfig.TRUNCATION_SIZE === 0) {
×
225
                collection.ensureIndex(
×
226
                    {
227
                        recvTime: 1
228
                    },
229
                    {
230
                        expireAfterSeconds: sthConfig.TRUNCATION_EXPIRE_AFTER_SECONDS
231
                    },
232
                    function(err) {
233
                        if (err) {
×
234
                            // prettier-ignore
235
                            sthLogger.error(
×
236
                                sthConfig.LOGGING_CONTEXT.DB_LOG,
237
                                "Error when creating the index for TTL for collection '" + collection.s.namespace.collection + "': " +
238
                                err
239
                            );
240
                        }
241
                    }
242
                );
243
            }
244
        } else {
245
            collection.ensureIndex(
×
246
                {
247
                    '_id.origin': 1
248
                },
249
                {
250
                    expireAfterSeconds: sthConfig.TRUNCATION_EXPIRE_AFTER_SECONDS
251
                },
252
                function(err) {
253
                    if (err) {
×
254
                        sthLogger.error(
×
255
                            sthConfig.LOGGING_CONTEXT.DB_LOG,
256
                            "Error when creating the index for TTL for collection '" +
257
                                collection.s.namespace.collection +
258
                                "': " +
259
                                err
260
                        );
261
                    }
262
                }
263
            );
264
        }
265
    }
266
}
267

268
/**
269
 * Returns asynchronously a reference to a collection of the database
270
 * @param  {object}   params   Params object including the following properties:
271
 *                               - service: The service
272
 *                               - servicePath: The service path
273
 *                               - entityId: The entity id
274
 *                               - entityType: The entity type
275
 *                               - attrName: The attribute name
276
 * @param  {object}   options  Options object including the following properties:
277
 *                               - isAggregated: Flag indicating if the aggregated collection is desired
278
 *                               - shouldCreate: Flag indicating if the collection should be created if it does not
279
 *                                 exist
280
 *                               - shouldTruncate: Flag indicating if the collection should be trucate in time or size
281
 * @param  {Function} callback The callback
282
 */
283
function getCollection(params, options, callback) {
1✔
284
    const isAggregated = options.isAggregated;
1,656✔
285
    const shouldCreate = options.shouldCreate;
1,656✔
286
    const shouldTruncate = options.shouldTruncate;
1,656✔
287

288
    const databaseName = sthDatabaseNaming.getDatabaseName(params.service);
1,656✔
289

290
    let collectionName;
1,656✔
291
    if (params.collection) {
1,656!
292
        collectionName = params.collection;
×
293
    } else {
294
        collectionName = isAggregated
1,656✔
295
            ? sthDatabaseNaming.getAggregatedCollectionName({
296
                  service: params.service,
297
                  servicePath: params.servicePath,
298
                  entityId: params.entityId,
299
                  entityType: params.entityType,
300
                  attrName: params.attrName
301
              })
302
            : sthDatabaseNaming.getRawCollectionName({
303
                  service: params.service,
304
                  servicePath: params.servicePath,
305
                  entityId: params.entityId,
306
                  entityType: params.entityType,
307
                  attrName: params.attrName
308
              });
309
    }
310

311
    if (!collectionName) {
1,656!
312
        const error = boom.badRequest('The collection name could not be generated');
×
313
        return process.nextTick(callback.bind(null, error));
×
314
    }
315

316
    //Call fetchCollection if DB is connected else raise error 500 DB is not connected
317
    if (isConnectionAlive()) {
1,656✔
318
        fetchCollection(databaseName, isAggregated, shouldTruncate, shouldCreate, collectionName, callback);
1,652✔
319
    } else {
320
        return callback({ name: 'MongoConnectionError' }, null);
4✔
321
    }
322
}
323

324
/**
325
 * Check whether database is connected or not
326
 */
327
function isConnectionAlive() {
1✔
328
    let dbConnected = true;
1,656✔
329
    if (db === null || (client && !client.isConnected())) {
1,656✔
330
        dbConnected = false;
4✔
331
    }
332
    return dbConnected;
1,656✔
333
}
334

335
/**
336
 * Returns asynchronously a reference to a collection of the database
337
 * @param {string} databaseName The name of the database
338
 * @param {boolean} isAggregated Flag indicating if the aggregated collection is desired
339
 * @param {boolean} shouldTruncate Flag indicating if the collection should be trucate in time or size
340
 * @param {boolean} shouldCreate Flag indicating if the collection should be created if it does not exist
341
 * @param {string} collectionName The name of the collection
342
 * @param {function} callback THe callback
343
 */
344
function fetchCollection(databaseName, isAggregated, shouldTruncate, shouldCreate, collectionName, callback) {
1✔
345
    // Switch to the right database
346
    const connection = client.db(databaseName);
1,652✔
347

348
    function createCollectionCB(err, collection) {
1✔
349
        if (err) {
42!
350
            if (err.message === 'collection already exists') {
×
351
                // We have observed that although leaving the strict option to the default value, sometimes
352
                //  we get a 'collection already exists' error when executing connection.db#createCollection()
353
                connection.collection(collectionName, { strict: true }, function(err, collection) {
×
354
                    return callback(err, collection);
×
355
                });
356
            } else {
357
                return callback(err, collection);
×
358
            }
359
        } else if (collection && !isAggregated) {
42✔
360
            setRawDataUniqueIndex(collection);
21✔
361
            if (shouldTruncate) {
21✔
362
                setTTLPolicy(collection);
2✔
363
            }
364
            return callback(err, collection);
21✔
365
        } else {
366
            return callback(err, collection);
21✔
367
        }
368
    }
369

370
    connection.collection(collectionName, { strict: true }, function(err, collection) {
1,652✔
371
        if (
1,652✔
372
            err &&
1,834✔
373
            err.message === 'Collection ' + collectionName + ' does not exist. Currently in strict mode.' &&
374
            shouldCreate
375
        ) {
376
            if (shouldTruncate && !isAggregated) {
42✔
377
                // Set the size removal policy if required
378
                if (sthConfig.TRUNCATION_SIZE > 0) {
2!
379
                    const collectionCreationOptions = {
×
380
                        capped: true,
381
                        size: sthConfig.TRUNCATION_SIZE,
382
                        max: sthConfig.TRUNCATION_MAX || null
×
383
                    };
384
                    return connection.createCollection(collectionName, collectionCreationOptions, createCollectionCB);
×
385
                }
386
            }
387
            connection.createCollection(collectionName, createCollectionCB);
42✔
388
        } else {
389
            return callback(err, collection);
1,610✔
390
        }
391
    });
392
}
393

394
/**
395
 * Saves the contents of a readable stream into a file in the local file system
396
 * @param {string} attrName The name of the attribute the stream contents refers to
397
 * @param {object} stream The stream
398
 * @param {string} fileName The file name where the stream contents should be stored
399
 * @param {function} callback THe callback
400
 */
401
function save2File(attrName, stream, fileName, callback) {
1✔
402
    const tempDir = ROOT_PATH + path.sep + sthConfig.TEMPORAL_DIR;
36✔
403
    if (!fs.existsSync(tempDir) || (fs.existsSync(tempDir) && !fs.statSync(tempDir).isDirectory())) {
36✔
404
        mkdirp.sync(tempDir);
1✔
405
    }
406

407
    const outputFile = fs.createWriteStream(tempDir + path.sep + fileName, { encoding: 'utf8' });
36✔
408
    stream
36✔
409
        .pipe(jsoncsv.csv(getJSONCSVOptions(attrName)))
410
        .pipe(outputFile)
411
        .on('finish', callback);
412
}
413

414
/**
415
 * Generates a CSV file from a stream containing raw data associated to certain attribute
416
 * @param {string} attrName The attribute name
417
 * @param {object} stream The stream
418
 * @param {function} callback The callback
419
 */
420
function generateCSV(attrName, stream, callback) {
1✔
421
    if (count === Number.MAX_SAFE_INTEGER) {
36!
422
        count = 0;
×
423
    }
424
    const fileName = attrName + '-' + Date.now() + '-' + count + '.csv';
36✔
425
    count++;
36✔
426
    save2File(
36✔
427
        attrName,
428
        stream,
429
        fileName,
430
        callback.bind(null, null, ROOT_PATH + path.sep + sthConfig.TEMPORAL_DIR + path.sep + fileName)
431
    );
432
}
433

434
/**
435
 * Returns the required raw data from the database asynchronously
436
 * @param {object} data The data for which return the raw data. It is an object including the following properties:
437
 *  - {object} collection: The collection from where the data should be extracted
438
 *  - {string} entityId: The entity id related to the event
439
 *  - {string} entityType: The type of entity related to the event
440
 *  - {string} attrName: The attribute id related to the event
441
 *  - {number} lastN: Only return the last n matching entries
442
 *  - {number} hLimit: Maximum number of results to retrieve when paginating
443
 *  - {number} hOffset: Offset to apply when paginating
444
 *  - {date} from: The date from which retrieve the aggregated data
445
 *  - {date} to: The date to which retrieve the aggregated data
446
 *  - {string} filetype: The file type to return the data in
447
 * @param {Function} callback Callback to inform about any possible error or results
448
 */
449
function getRawData(data, callback) {
1✔
450
    const collection = data.collection;
450✔
451
    const entityId = data.entityId;
450✔
452
    const entityType = data.entityType;
450✔
453
    const attrName = data.attrName;
450✔
454
    const lastN = data.lastN;
450✔
455
    const hLimit = data.hLimit;
450✔
456
    const hOffset = data.hOffset;
450✔
457
    const from = data.from;
450✔
458
    const to = data.to;
450✔
459
    const filetype = data.filetype;
450✔
460

461
    let findCondition;
450✔
462
    switch (sthConfig.DATA_MODEL) {
450✔
463
        case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
464
            findCondition = {
258✔
465
                entityId,
466
                entityType,
467
                attrName
468
            };
469
            break;
258✔
470
        case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
471
            findCondition = {
96✔
472
                attrName
473
            };
474
            break;
96✔
475
        case sthConfig.DATA_MODELS.COLLECTION_PER_ATTRIBUTE:
476
            findCondition = {};
96✔
477
            break;
96✔
478
    }
479

480
    let recvTimeFilter;
450✔
481
    if (from && to) {
450✔
482
        recvTimeFilter = {
58✔
483
            $lte: to,
484
            $gte: from
485
        };
486
    } else if (from) {
392✔
487
        recvTimeFilter = {
98✔
488
            $gte: from
489
        };
490
    } else if (to) {
294✔
491
        recvTimeFilter = {
98✔
492
            $lte: to
493
        };
494
    }
495
    if (recvTimeFilter) {
450✔
496
        findCondition.recvTime = recvTimeFilter;
254✔
497
    }
498

499
    let cursor;
450✔
500
    let totalCount = 0;
450✔
501
    if (lastN || lastN === 0) {
450✔
502
        cursor = collection
128✔
503
            .find(findCondition, {
504
                _id: 0,
505
                attrType: 1,
506
                attrValue: 1,
507
                recvTime: 1
508
            })
509
            .sort({ recvTime: -1 });
510
        cursor.count(function(err, count) {
128✔
511
            totalCount = count;
128✔
512
            cursor = cursor.limit(lastN);
128✔
513
            if (filetype === 'csv') {
128!
514
                generateCSV(attrName, cursor.stream(), callback);
×
515
            } else {
516
                cursor.toArray(function(err, results) {
128✔
517
                    if (!err) {
128!
518
                        results.reverse();
128✔
519
                    }
520
                    return process.nextTick(callback.bind(null, err, results, totalCount));
128✔
521
                });
522
            }
523
        });
524
    } else if (hOffset || hLimit) {
322✔
525
        cursor = collection
106✔
526
            .find(findCondition, {
527
                _id: 0,
528
                attrType: 1,
529
                attrValue: 1,
530
                recvTime: 1
531
            })
532
            .sort({ recvTime: 1 });
533
        cursor.count(function(err, count) {
106✔
534
            totalCount = count;
106✔
535
            cursor = cursor.skip(hOffset || 0).limit(hLimit || 0);
106!
536
            if (filetype === 'csv') {
106!
537
                generateCSV(attrName, cursor.stream(), callback);
×
538
            } else {
539
                cursor.toArray(function(err, results) {
106✔
540
                    return process.nextTick(callback.bind(null, err, results, totalCount));
106✔
541
                });
542
            }
543
        });
544
    } else {
545
        cursor = collection.find(findCondition, {
216✔
546
            _id: 0,
547
            attrType: 1,
548
            attrValue: 1,
549
            recvTime: 1
550
        });
551
        cursor.count(function(err, count) {
216✔
552
            totalCount = count;
216✔
553
            if (filetype === 'csv') {
216✔
554
                generateCSV(attrName, cursor.stream(), callback);
36✔
555
            } else {
556
                cursor.toArray(function(err, results) {
180✔
557
                    return process.nextTick(callback.bind(null, err, results, totalCount));
180✔
558
                });
559
            }
560
        });
561
    }
562
}
563

564
/**
565
 * Filters out a concrete point from the beginning of the results array
566
 * @param points The points array
567
 * @param i The point index to filter out
568
 * @param aggregatedFunction The aggregated function
569
 * @param shouldFilter A flag indicating if results have been filtered out
570
 */
571
function filterPointFromBeginning(points, i, aggregatedFunction, shouldFilter) {
1✔
572
    if (shouldFilter) {
×
573
        points.splice(i, 1);
×
574
        i--;
×
575
    } else {
576
        points[i].samples = 0;
×
577
        if (aggregatedFunction === 'occur') {
×
578
            points[i].occur = {};
×
579
        } else if (aggregatedFunction === 'min') {
×
580
            points[i].min = Number.POSITIVE_INFINITY;
×
581
        } else if (aggregatedFunction === 'max') {
×
582
            points[i].max = Number.NEGATIVE_INFINITY;
×
583
        } else {
584
            points[i][aggregatedFunction] = 0;
×
585
        }
586
    }
587
    return i;
×
588
}
589

590
/**
591
 * Filters out points from the beginning of the results array
592
 * @param results The results array from which the points should be removed
593
 * @param minOffset The minimum offset which should be included in the results
594
 * @param aggregatedFunction The aggregation function
595
 * @param shouldFilter A flag indicating if the results should be filtered
596
 */
597
function filterResultsFromBeginning(results, minOffset, aggregatedFunction, shouldFilter) {
1✔
598
    const points = results[0].points;
285✔
599
    for (let i = 0; i < points.length; i++) {
285✔
600
        if (points[i]) {
450!
601
            if (points[i].offset < minOffset) {
450✔
602
                if (points[i].samples) {
165!
603
                    i = filterPointFromBeginning(points, i, aggregatedFunction, shouldFilter);
×
604
                }
605
            } else {
606
                break;
285✔
607
            }
608
        }
609
    }
610
    if (!points.length) {
285!
611
        results.splice(0, 1);
×
612
    }
613
}
614

615
/**
616
 * Filters out a concrete point from the end of the results array
617
 * @param points The points array
618
 * @param i The point index to filter out
619
 * @param aggregatedFunction The aggregated function
620
 * @param shouldFilter A flag indicating if results have been filtered out
621
 */
622
function filterPointFromEnd(points, i, aggregatedFunction, shouldFilter) {
1✔
623
    if (shouldFilter) {
×
624
        points.splice(i, 1);
×
625
    } else {
626
        points[i].samples = 0;
×
627
        if (aggregatedFunction === 'occur') {
×
628
            points[i].occur = {};
×
629
        } else if (aggregatedFunction === 'min') {
×
630
            points[i].min = Number.POSITIVE_INFINITY;
×
631
        } else if (aggregatedFunction === 'max') {
×
632
            points[i].max = Number.NEGATIVE_INFINITY;
×
633
        } else {
634
            points[i][aggregatedFunction] = 0;
×
635
        }
636
    }
637
}
638

639
/**
640
 * Filters out points from the end of the results array
641
 * @param results The results array from which the points should be removed
642
 * @param maxOffset The maximum offset which should be included in the results
643
 * @param aggregatedFunction The aggregation function
644
 * @param shouldFilter A flag indicating if the results should be filtered
645
 */
646
function filterResultsFromEnd(results, maxOffset, aggregatedFunction, shouldFilter) {
1✔
647
    const points = results[results.length - 1].points;
150✔
648
    for (let i = points.length - 1; i >= 0; i--) {
150✔
649
        if (points[i]) {
150!
650
            if (points[i].offset > maxOffset) {
150!
651
                if (points[i].samples) {
×
652
                    filterPointFromEnd(points, i, aggregatedFunction, shouldFilter);
×
653
                }
654
            } else {
655
                break;
150✔
656
            }
657
        }
658
    }
659
    if (!points.length) {
150!
660
        results.splice(results.length - 1, 1);
×
661
    }
662
}
663

664
/**
665
 * Filters out the results based on the resolution and the optional from and to dates. For certain resolution, the
666
 *  from and to dates are considered and applied until the unit of time indicated by the resolution.
667
 * @param results The array of results
668
 * @param options Additional data to be considered in the filtering. It is an object including the following properties:
669
 *  - resolution: The resolution
670
 *  - from: The starting date
671
 *  - to: The ending date
672
 *  - aggregatedFunction: The aggregation function
673
 *  - shouldFilter: Flag indicating if null results should be filtered
674
 */
675
function filterResults(results, options) {
1✔
676
    const resolution = options.resolution;
918✔
677
    const from = options.from;
918✔
678
    const to = options.to;
918✔
679
    const aggregatedFunction = options.aggregatedFunction;
918✔
680
    const shouldFilter = options.shouldFilter;
918✔
681

682
    if (!results.length) {
918✔
683
        return;
363✔
684
    }
685

686
    if (from && results[0]._id.origin.getTime() === sthUtils.getOrigin(from, resolution).getTime()) {
555✔
687
        let minOffset;
285✔
688
        switch (resolution) {
285!
689
            case sthConfig.RESOLUTION.SECOND:
690
                minOffset = from.getUTCSeconds();
×
691
                break;
×
692
            case sthConfig.RESOLUTION.MINUTE:
693
                minOffset = from.getUTCMinutes();
95✔
694
                break;
95✔
695
            case sthConfig.RESOLUTION.HOUR:
696
                minOffset = from.getUTCHours();
95✔
697
                break;
95✔
698
            case sthConfig.RESOLUTION.DAY:
699
                minOffset = from.getUTCDate();
95✔
700
                break;
95✔
701
            case sthConfig.RESOLUTION.MONTH:
702
                minOffset = from.getUTCMonth();
×
703
                break;
×
704
        }
705
        filterResultsFromBeginning(results, minOffset, aggregatedFunction, shouldFilter);
285✔
706
    }
707
    if (
555✔
708
        results.length &&
1,305✔
709
        to &&
710
        results[results.length - 1]._id.origin.getTime() === sthUtils.getOrigin(to, resolution).getTime()
711
    ) {
712
        let maxOffset;
150✔
713
        switch (resolution) {
150!
714
            case sthConfig.RESOLUTION.SECOND:
715
                maxOffset = to.getUTCSeconds();
×
716
                break;
×
717
            case sthConfig.RESOLUTION.MINUTE:
718
                maxOffset = to.getUTCMinutes();
50✔
719
                break;
50✔
720
            case sthConfig.RESOLUTION.HOUR:
721
                maxOffset = to.getUTCHours();
50✔
722
                break;
50✔
723
            case sthConfig.RESOLUTION.DAY:
724
                maxOffset = to.getUTCDate();
50✔
725
                break;
50✔
726
            case sthConfig.RESOLUTION.MONTH:
727
                maxOffset = to.getUTCMonth();
×
728
                break;
×
729
        }
730
        filterResultsFromEnd(results, maxOffset, aggregatedFunction, shouldFilter);
150✔
731
    }
732
}
733

734
/**
735
 * Returns the required aggregated data from the database asynchronously
736
 * @param {object} data The data to get the aggregated data. It is an object including the following properties:
737
 *  - {object} collection: The collection from where the data should be extracted
738
 *  - {string} entityId: The entity id related to the event
739
 *  - {string} entityType: The type of entity related to the event
740
 *  - {string} attrName: The attribute id related to the event
741
 *  - {string} aggregatedFunction: The aggregated function or method to retrieve
742
 *  - {string} resolution: The resolution of the data to use
743
 *  - {date} from: The date from which retrieve the aggregated data
744
 *  - {date} to: The date to which retrieve the aggregated data
745
 *  - {boolean} shouldFilter: If true, the null results are filter out
746
 * @param {Function} callback Callback to inform about any possible error or results
747
 */
748
function getAggregatedData(data, callback) {
1✔
749
    const collection = data.collection;
918✔
750
    const entityId = data.entityId;
918✔
751
    const entityType = data.entityType;
918✔
752
    const attrName = data.attrName;
918✔
753
    const aggregatedFunction = data.aggregatedFunction;
918✔
754
    const resolution = data.resolution;
918✔
755
    const from = data.from;
918✔
756
    const to = data.to;
918✔
757
    const shouldFilter = data.shouldFilter;
918✔
758

759
    const fieldFilter = {
918✔
760
        'points.offset': 1,
761
        'points.samples': 1
762
    };
763
    fieldFilter['points.' + aggregatedFunction] = 1;
918✔
764

765
    let originFilter;
918✔
766
    if (from && to) {
918✔
767
        originFilter = {
270✔
768
            $lte: sthUtils.getOrigin(to, resolution),
769
            $gte: sthUtils.getOrigin(from, resolution)
770
        };
771
    } else if (from) {
648✔
772
        originFilter = {
234✔
773
            $gte: sthUtils.getOrigin(from, resolution)
774
        };
775
    } else if (to) {
414✔
776
        originFilter = {
54✔
777
            $lte: sthUtils.getOrigin(to, resolution)
778
        };
779
    }
780

781
    if (shouldFilter) {
918✔
782
        const pushAccumulator = {
540✔
783
            offset: '$points.offset',
784
            samples: '$points.samples'
785
        };
786
        pushAccumulator[aggregatedFunction] = '$points.' + aggregatedFunction;
540✔
787

788
        let matchCondition;
540✔
789
        switch (sthConfig.DATA_MODEL) {
540!
790
            case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
791
                matchCondition = {
540✔
792
                    '_id.entityId': entityId,
793
                    '_id.entityType': entityType,
794
                    '_id.attrName': attrName,
795
                    '_id.resolution': resolution
796
                };
797
                break;
540✔
798
            case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
799
                matchCondition = {
×
800
                    '_id.attrName': attrName,
801
                    '_id.resolution': resolution
802
                };
803
                break;
×
804
            case sthConfig.DATA_MODELS.COLLECTION_PER_ATTRIBUTE:
805
                matchCondition = {
×
806
                    '_id.resolution': resolution
807
                };
808
                break;
×
809
        }
810
        if (originFilter) {
540✔
811
            matchCondition['_id.origin'] = originFilter;
450✔
812
        }
813

814
        let groupId;
540✔
815
        switch (sthConfig.DATA_MODEL) {
540!
816
            case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
817
                groupId = {
540✔
818
                    entityId: '$_id.entityId',
819
                    entityType: '$_id.entityType',
820
                    attrName: '$_id.attrName',
821
                    origin: '$_id.origin',
822
                    resolution: '$_id.resolution'
823
                };
824
                break;
540✔
825
            case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
826
                groupId = {
×
827
                    attrName: '$_id.attrName',
828
                    origin: '$_id.origin',
829
                    resolution: '$_id.resolution'
830
                };
831
                break;
×
832
            case sthConfig.DATA_MODELS.COLLECTION_PER_ATTRIBUTE:
833
                groupId = {
×
834
                    origin: '$_id.origin',
835
                    resolution: '$_id.resolution'
836
                };
837
                break;
×
838
        }
839

840
        collection.aggregate(
540✔
841
            [
842
                {
843
                    $match: matchCondition
844
                },
845
                {
846
                    $project: fieldFilter
847
                },
848
                {
849
                    $unwind: '$points'
850
                },
851
                {
852
                    $match: {
853
                        'points.samples': {
854
                            $gt: 0
855
                        }
856
                    }
857
                },
858
                {
859
                    $group: {
860
                        _id: groupId,
861
                        points: {
862
                            $push: pushAccumulator
863
                        }
864
                    }
865
                },
866
                {
867
                    $sort: {
868
                        '_id.origin': 1
869
                    }
870
                }
871
            ],
872
            function(err, cursor) {
873
                cursor.toArray(function(err, resultsArr) {
540✔
874
                    filterResults(resultsArr, {
540✔
875
                        resolution,
876
                        from,
877
                        to,
878
                        aggregatedFunction,
879
                        shouldFilter
880
                    });
881
                    process.nextTick(callback.bind(null, err, resultsArr));
540✔
882
                });
883
            }
884
        );
885
    } else {
886
        // Get the aggregated data from the database
887
        // Return the data in ascending order based on the origin
888
        let findCondition;
378✔
889
        switch (sthConfig.DATA_MODEL) {
378✔
890
            case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
891
                findCondition = {
126✔
892
                    '_id.entityId': entityId,
893
                    '_id.entityType': entityType,
894
                    '_id.attrName': attrName,
895
                    '_id.resolution': resolution
896
                };
897
                break;
126✔
898
            case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
899
                findCondition = {
126✔
900
                    '_id.attrName': attrName,
901
                    '_id.resolution': resolution
902
                };
903
                break;
126✔
904
            case sthConfig.DATA_MODELS.COLLECTION_PER_ATTRIBUTE:
905
                findCondition = {
126✔
906
                    '_id.resolution': resolution
907
                };
908
                break;
126✔
909
        }
910
        if (originFilter) {
378✔
911
            findCondition['_id.origin'] = originFilter;
108✔
912
        }
913

914
        collection
378✔
915
            .find(findCondition, fieldFilter)
916
            .sort({ '_id.origin': 1 })
917
            .toArray(function(err, resultsArr) {
918
                filterResults(resultsArr, {
378✔
919
                    resolution,
920
                    from,
921
                    to,
922
                    aggregatedFunction,
923
                    shouldFilter
924
                });
925
                process.nextTick(callback.bind(null, err, resultsArr));
378✔
926
            });
927
    }
928
}
929

930
/**
931
 * Returns the condition to be used in the MongoDB update operation for aggregated data
932
 * @param {object} data The data from which the update condition should be generated. It is an object including the
933
 *  following properties:
934
 *    - {string} entityId: The entity id
935
 *    - {string} entityType: The entity type
936
 *    - {string} attrName: The attribute name
937
 *    - {string} resolution The resolution
938
 *    - {date} recvTime The date (or recvTime) of the notification (attribute value change)
939
 * @returns {Object} The update condition
940
 */
941
function getAggregateUpdateCondition(data) {
1✔
942
    const entityId = data.entityId;
497✔
943
    const entityType = data.entityType;
497✔
944
    const attrName = data.attrName;
497✔
945
    const resolution = data.resolution;
497✔
946
    const timestamp = data.timestamp;
497✔
947

948
    const offset = sthUtils.getOffset(resolution, timestamp);
497✔
949

950
    let aggregateUpdateCondition;
497✔
951
    switch (sthConfig.DATA_MODEL) {
497✔
952
        case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
953
            aggregateUpdateCondition = {
245✔
954
                '_id.entityId': entityId,
955
                '_id.entityType': entityType,
956
                '_id.attrName': attrName,
957
                '_id.origin': sthUtils.getOrigin(timestamp, resolution),
958
                '_id.resolution': resolution,
959
                'points.offset': offset
960
            };
961
            break;
245✔
962
        case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
963
            aggregateUpdateCondition = {
156✔
964
                '_id.attrName': attrName,
965
                '_id.origin': sthUtils.getOrigin(timestamp, resolution),
966
                '_id.resolution': resolution,
967
                'points.offset': offset
968
            };
969
            break;
156✔
970
        case sthConfig.DATA_MODELS.COLLECTION_PER_ATTRIBUTE:
971
            aggregateUpdateCondition = {
96✔
972
                '_id.origin': sthUtils.getOrigin(timestamp, resolution),
973
                '_id.resolution': resolution,
974
                'points.offset': offset
975
            };
976
            break;
96✔
977
    }
978
    return aggregateUpdateCondition;
497✔
979
}
980

981
/**
982
 * Returns the data to prepopulate the aggregated data collection with
983
 * @param {string} attrType The attribute type
984
 * @param {string} attrValue The attribute value
985
 * @param {string} resolution The resolution
986
 */
987
function getAggregatePrepopulatedData(attrType, attrValue, resolution) {
1✔
988
    const points = [];
244✔
989
    let totalValues;
244✔
990
    let offsetOrigin = 0;
244✔
991

992
    switch (resolution) {
244✔
993
        case sthConfig.RESOLUTION.SECOND:
994
            totalValues = 60;
2✔
995
            break;
2✔
996
        case sthConfig.RESOLUTION.MINUTE:
997
            totalValues = 60;
80✔
998
            break;
80✔
999
        case sthConfig.RESOLUTION.HOUR:
1000
            totalValues = 24;
80✔
1001
            break;
80✔
1002
        case sthConfig.RESOLUTION.DAY:
1003
            offsetOrigin = 1;
80✔
1004
            totalValues = 32;
80✔
1005
            break;
80✔
1006
        case sthConfig.RESOLUTION.MONTH:
1007
            offsetOrigin = 1;
2✔
1008
            totalValues = 13;
2✔
1009
            break;
2✔
1010
    }
1011

1012
    if (sthUtils.getAggregationType(attrValue) === sthConfig.AGGREGATIONS.NUMERIC) {
244✔
1013
        for (let i = offsetOrigin; i < totalValues; i++) {
122✔
1014
            points.push({
4,672✔
1015
                offset: i,
1016
                samples: 0,
1017
                sum: 0,
1018
                sum2: 0,
1019
                min: Number.POSITIVE_INFINITY,
1020
                max: Number.NEGATIVE_INFINITY
1021
            });
1022
        }
1023
    } else {
1024
        for (let j = offsetOrigin; j < totalValues; j++) {
122✔
1025
            const entry = {
4,672✔
1026
                offset: j,
1027
                samples: 0,
1028
                occur: {}
1029
            };
1030
            points.push(entry);
4,672✔
1031
        }
1032
    }
1033

1034
    return points;
244✔
1035
}
1036

1037
/**
1038
 * Returns the update to be used in the MongoDB update operation for aggregated data
1039
 * @param {string} attrType The type of the attribute to aggregate
1040
 * @param {string} attrValue The value of the attribute to aggregate
1041
 * @param {string} resolution The resolution
1042
 * @returns {Object} The update operation
1043
 */
1044
function getAggregateUpdate4Insert(attrType, attrValue, resolution) {
1✔
1045
    return {
244✔
1046
        $setOnInsert: {
1047
            attrType,
1048
            points: getAggregatePrepopulatedData(attrType, attrValue, resolution)
1049
        }
1050
    };
1051
}
1052

1053
/**
1054
 * Returns the update to be used in the MongoDB update operation for aggregated data
1055
 * @param {number} attrType The type of the attribute to aggregate
1056
 * @param {number} attrValue The value of the attribute to aggregate
1057
 * @param {string} resolution The resolution
1058
 * @param {date} recvTime The data reception time
1059
 * @returns {Object} The update operation
1060
 */
1061
function getAggregateUpdate4Update(attrType, attrValue, resolution, recvTime) {
1✔
1062
    let aggregateUpdate4Update;
244✔
1063
    let attrValueAsNumber;
244✔
1064
    let escapedAttrValue;
244✔
1065
    if (sthUtils.getAggregationType(attrValue) === sthConfig.AGGREGATIONS.NUMERIC) {
244✔
1066
        attrValueAsNumber = parseFloat(attrValue);
122✔
1067
        aggregateUpdate4Update = {
122✔
1068
            $set: {
1069
                attrType
1070
            },
1071
            $inc: {
1072
                'points.$.samples': 1,
1073
                'points.$.sum': attrValueAsNumber,
1074
                'points.$.sum2': Math.pow(attrValueAsNumber, 2)
1075
            },
1076
            $min: {
1077
                'points.$.min': attrValueAsNumber
1078
            },
1079
            $max: {
1080
                'points.$.max': attrValueAsNumber
1081
            }
1082
        };
1083
    } else {
1084
        const offset = sthUtils.getOffset(resolution, recvTime);
122✔
1085
        aggregateUpdate4Update = {
122✔
1086
            $set: {
1087
                attrType
1088
            },
1089
            $inc: {
1090
                'points.$.samples': 1
1091
            }
1092
        };
1093
        escapedAttrValue = attrValue.replace(/\$/g, '\uFF04').replace(/\./g, '\uFF0E');
122✔
1094
        // prettier-ignore
1095
        aggregateUpdate4Update.$inc[ 'points.' +
122✔
1096
            (offset - (resolution === 'day' || resolution === 'month' || resolution === 'year' ? 1 : 0)) +
407✔
1097
            '.occur.' + escapedAttrValue] = 1;
1098
    }
1099
    return aggregateUpdate4Update;
244✔
1100
}
1101

1102
/**
1103
 * Returns the update to be used in the MongoDB update operation for the removal of aggregated data
1104
 * @param {object} data Object including the following properties:
1105
 *  - {string} attrType The type of the attribute to aggregate
1106
 *  - {string} attrValue The value of the attribute to aggregate
1107
 *  - {object} notificationInfo Information about the notification
1108
 *  - {string} resolution The resolution
1109
 *  - {date} recvTime The data reception time
1110
 * @returns {Object} The update operation
1111
 */
1112
function getAggregateUpdate4Removal(data) {
1✔
1113
    const attrType = data.attrType;
9✔
1114
    const attrValue = data.attrValue;
9✔
1115
    const notificationInfo = data.notificationInfo;
9✔
1116
    const resolution = data.resolution;
9✔
1117
    const timestamp = data.timestamp;
9✔
1118
    let aggregateUpdate4Removal;
9✔
1119
    let escapedAttrValue = parseFloat(notificationInfo.updates.attrValue);
9✔
1120
    if (sthUtils.getAggregationType(notificationInfo.updates.attrValue) === sthConfig.AGGREGATIONS.NUMERIC) {
9✔
1121
        aggregateUpdate4Removal = {
6✔
1122
            $inc: {
1123
                'points.$.samples': -1,
1124
                'points.$.sum': -escapedAttrValue,
1125
                'points.$.sum2': -Math.pow(escapedAttrValue, 2)
1126
            },
1127
            $set: {
1128
                attrType,
1129
                'points.$.min':
1130
                    notificationInfo.newMinValues && notificationInfo.newMinValues[resolution]
18!
1131
                        ? Math.min(parseFloat(notificationInfo.newMinValues[resolution]), parseFloat(attrValue))
1132
                        : parseFloat(attrValue),
1133
                'points.$.max':
1134
                    notificationInfo.newMaxValues && notificationInfo.newMaxValues[resolution]
18!
1135
                        ? Math.max(parseFloat(notificationInfo.newMaxValues[resolution]), parseFloat(attrValue))
1136
                        : parseFloat(attrValue)
1137
            }
1138
        };
1139
    } else {
1140
        const offset = sthUtils.getOffset(resolution, timestamp);
3✔
1141
        aggregateUpdate4Removal = {
3✔
1142
            $set: {
1143
                attrType
1144
            },
1145
            $inc: {
1146
                'points.$.samples': -1
1147
            }
1148
        };
1149
        escapedAttrValue = notificationInfo.updates.attrValue.replace(/\$/g, '\uFF04').replace(/\./g, '\uFF0E');
3✔
1150
        aggregateUpdate4Removal.$inc[
3✔
1151
            'points.' +
1152
                (offset - (resolution === 'day' || resolution === 'month' || resolution === 'year' ? 1 : 0)) +
10✔
1153
                '.occur.' +
1154
                escapedAttrValue
1155
        ] = -1;
1156
    }
1157
    return aggregateUpdate4Removal;
9✔
1158
}
1159

1160
/**
1161
 * Removes previously aggregated data based on the data included in the received notification
1162
 * @param {object} data The data to be stored. It is an object including the following properties:
1163
 *  - {object} collection: The collection where the data should be stored
1164
 *  - {string} entityId The entity id
1165
 *  - {string} entityType: The entity type
1166
 *  - {string} attrName: The attribute name
1167
 *  - {string} attrType: The attribute type
1168
 *  - {string} attrValue: The attribute value
1169
 *  - {string} resolution: The resolution
1170
 *  - {date} timestamp: The attribute value timestamp
1171
 *  - {object} notificationInfo: Info about the notification
1172
 * @param {Function} callback Function to call once the operation completes
1173
 */
1174
function removePreviouslyAggregatedData(data, callback) {
1✔
1175
    // Undo the previously aggregated data if any
1176
    if (data.notificationInfo.updates) {
244✔
1177
        data.collection.update(
9✔
1178
            getAggregateUpdateCondition({
1179
                entityId: data.entityId,
1180
                entityType: data.entityType,
1181
                attrName: data.attrName,
1182
                resolution: data.resolution,
1183
                timestamp: data.timestamp
1184
            }),
1185
            getAggregateUpdate4Removal({
1186
                attrType: data.attrType,
1187
                attrValue: data.attrValue,
1188
                notificationInfo: data.notificationInfo,
1189
                resolution: data.resolution,
1190
                timestamp: data.timestamp
1191
            }),
1192
            {
1193
                writeConcern: {
1194
                    w: !isNaN(sthConfig.WRITE_CONCERN) ? parseInt(sthConfig.WRITE_CONCERN, 10) : sthConfig.WRITE_CONCERN
9!
1195
                }
1196
            },
1197
            function(err) {
1198
                if (callback) {
9!
1199
                    process.nextTick(callback.bind(null, err));
9✔
1200
                }
1201
            }
1202
        );
1203
    } else {
1204
        return process.nextTick(callback);
235✔
1205
    }
1206
}
1207

1208
/**
1209
 * Updates the aggregated data based on the included in the received notification
1210
 * @param {object} data The data to be stored. It is an object including the following properties:
1211
 *  - {object} collection: The collection where the data should be stored
1212
 *  - {string} entityId The entity id
1213
 *  - {string} entityType: The entity type
1214
 *  - {string} attrName: The attribute name
1215
 *  - {string} attrType: The attribute type
1216
 *  - {string} attrValue: The attribute value
1217
 *  - {string} resolution: The resolution
1218
 *  - {date} timestamp: The attribute value timestamp
1219
 *  - {object} notificationInfo: Info about the notification
1220
 * @param {Function} callback Function to call once the operation completes
1221
 */
1222
function updateAggregatedData(data, callback) {
1✔
1223
    data.collection.update(
244✔
1224
        getAggregateUpdateCondition({
1225
            entityId: data.entityId,
1226
            entityType: data.entityType,
1227
            attrName: data.attrName,
1228
            resolution: data.resolution,
1229
            timestamp: data.timestamp
1230
        }),
1231
        getAggregateUpdate4Update(data.attrType, data.attrValue, data.resolution, data.timestamp),
1232
        {
1233
            writeConcern: {
1234
                w: !isNaN(sthConfig.WRITE_CONCERN) ? parseInt(sthConfig.WRITE_CONCERN, 10) : sthConfig.WRITE_CONCERN
244!
1235
            }
1236
        },
1237
        function(err) {
1238
            if (err && callback) {
244!
1239
                return process.nextTick(callback.bind(null, err));
×
1240
            }
1241
            removePreviouslyAggregatedData(data, callback);
244✔
1242
        }
1243
    );
1244
}
1245

1246
/**
1247
 * Stores the aggregated data for a new event (attribute value)
1248
 * @param {object} data The data to be stored. It is an object including the following properties:
1249
 *  - {object} collection: The collection where the data should be stored
1250
 *  - {string} entityId The entity id
1251
 *  - {string} entityType: The entity type
1252
 *  - {string} attrName: The attribute name
1253
 *  - {string} attrType: The attribute type
1254
 *  - {string} attrValue: The attribute value
1255
 *  - {string} resolution: The resolution
1256
 *  - {date} timestamp: The attribute value timestamp
1257
 *  - {object} notificationInfo: Info about the notification
1258
 * @param {Function} callback Function to call once the operation completes
1259
 */
1260
function storeAggregatedData4Resolution(data, callback) {
1✔
1261
    const collection = data.collection;
244✔
1262
    const entityId = data.entityId;
244✔
1263
    const entityType = data.entityType;
244✔
1264
    const attrName = data.attrName;
244✔
1265
    const attrType = data.attrType;
244✔
1266
    const attrValue = data.attrValue;
244✔
1267
    const resolution = data.resolution;
244✔
1268
    const timestamp = data.timestamp;
244✔
1269

1270
    /*
1271
   Currently the MongoDB $ positional update operator cannot be combined with upserts
1272
     (see http://docs.mongodb.org/manual/reference/operator/update/positional/#upsert).
1273
   This issue is known and currently under study: https://jira.mongodb.org/browse/SERVER-3326
1274
   Once the issue is solved, it will be possible to prepopulate collections or update their docs
1275
     using just one update operation like this:
1276
     collection.update(
1277
       // Returning all the update operators currently returned by getAggregateUpdate4Insert
1278
       //  and getAggregateUpdate4Update in the same object
1279
       getAggregateUpdateCondition(entityId, entityType, attrName, resolution, timestamp),
1280
       getAggregateUpdate(attrValue),
1281
       {
1282
         upsert: true,
1283
         writeConcern: {
1284
           w: !isNaN(sthConfig.WRITE_CONCERN) ? parseInt(sthConfig.WRITE_CONCERN, 10) : sthConfig.WRITE_CONCERN
1285
         }
1286
       },
1287
       function (err) {
1288
         callback(err);
1289
       }
1290
     );
1291
  */
1292

1293
    // Prepopulate the aggregated data collection if there is no entry for the concrete
1294
    //  origin and resolution.
1295
    collection.update(
244✔
1296
        getAggregateUpdateCondition({
1297
            entityId,
1298
            entityType,
1299
            attrName,
1300
            resolution,
1301
            timestamp
1302
        }),
1303
        getAggregateUpdate4Insert(attrType, attrValue, resolution),
1304
        {
1305
            upsert: true,
1306
            writeConcern: {
1307
                w: !isNaN(sthConfig.WRITE_CONCERN) ? parseInt(sthConfig.WRITE_CONCERN, 10) : sthConfig.WRITE_CONCERN
244!
1308
            }
1309
        },
1310
        function(err) {
1311
            if (err && callback) {
244!
1312
                return process.nextTick(callback.bind(null, err));
×
1313
            }
1314
            updateAggregatedData(data, callback);
244✔
1315
        }
1316
    );
1317
}
1318

1319
/**
1320
 * Stores the aggregated data for a new event (attribute value)
1321
 * @param {object} data The data to be stored. It is an object including the following properties:
1322
 *  - {object} collection: The collection where the data should be stored in
1323
 *  - {date} recvTime The date the event arrived
1324
 *  - {string} entityId The entity id associated to updated attribute
1325
 *  - {string} entityType The entity type associated to the updated attribute
1326
 *  - {object} attribute The updated attribute
1327
 * @param {Function} callback Function to call once the operation completes
1328
 */
1329
function storeAggregatedData(data, callback) {
1✔
1330
    let counter = 0;
78✔
1331
    let error;
78✔
1332

1333
    const collection = data.collection;
78✔
1334
    const recvTime = data.recvTime;
78✔
1335
    const entityId = data.entityId;
78✔
1336
    const entityType = data.entityType;
78✔
1337
    const attribute = data.attribute;
78✔
1338
    const notificationInfo = data.notificationInfo;
78✔
1339

1340
    function onCompletion(err) {
1✔
1341
        error = error || err;
234✔
1342
        if (++counter === sthConfig.AGGREGATION_BY.length) {
234✔
1343
            callback(error);
78✔
1344
        }
1345
    }
1346

1347
    const timestamp = sthUtils.getAttributeTimestamp(attribute, recvTime);
78✔
1348

1349
    sthConfig.AGGREGATION_BY.forEach(function(entry) {
78✔
1350
        storeAggregatedData4Resolution(
234✔
1351
            {
1352
                collection,
1353
                entityId,
1354
                entityType,
1355
                attrName: attribute.name,
1356
                attrType: attribute.type,
1357
                attrValue: attribute.value,
1358
                resolution: entry,
1359
                timestamp,
1360
                notificationInfo
1361
            },
1362
            onCompletion
1363
        );
1364
    });
1365
}
1366

1367
/**
1368
 * Updates already registered raw data
1369
 * @param collection The collection where the raw data is stored
1370
 * @param oldRawData Old raw data to update
1371
 * @param newRawData The new raw data received
1372
 * @param callback The callback to notify once the processing completes
1373
 */
1374
function updateRawData(collection, oldRawData, newRawData, callback) {
1✔
1375
    collection.update(
3✔
1376
        oldRawData,
1377
        newRawData,
1378
        {
1379
            writeConcern: {
1380
                w: !isNaN(sthConfig.WRITE_CONCERN) ? parseInt(sthConfig.WRITE_CONCERN, 10) : sthConfig.WRITE_CONCERN
3!
1381
            }
1382
        },
1383
        function(err) {
1384
            if (callback) {
3!
1385
                process.nextTick(callback.bind(null, err));
3✔
1386
            }
1387
        }
1388
    );
1389
}
1390

1391
/**
1392
 * Insert new raw data in the database
1393
 * @param collection The collection where the raw data is stored
1394
 * @param newRawData The new raw data received
1395
 * @param callback The callback to notify once the processing completes
1396
 */
1397
function insertRawData(collection, newRawData, callback) {
1✔
1398
    collection.insert(
107✔
1399
        newRawData,
1400
        {
1401
            writeConcern: {
1402
                w: !isNaN(sthConfig.WRITE_CONCERN) ? parseInt(sthConfig.WRITE_CONCERN, 10) : sthConfig.WRITE_CONCERN
107!
1403
            }
1404
        },
1405
        function(err) {
1406
            if (callback) {
107!
1407
                process.nextTick(callback.bind(null, err));
107✔
1408
            }
1409
        }
1410
    );
1411
}
1412

1413
/**
1414
 * Stores the raw data for a new event (attribute value)
1415
 * @param {object} data The data to be stored. It is an object including the following properties:
1416
 *  - {object} collection: The collection where the data should be stored in
1417
 *  - {date} recvTime The date the event arrived
1418
 *  - {string} entityId The entity id associated to updated attribute
1419
 *  - {string} entityType The entity type associated to the updated attribute
1420
 *  - {object} attribute The updated attribute
1421
 *  - {object} notificationInfo Information about the notification including the following properties:
1422
 *  - {object} updates The database entry the notification updates, if any
1423
 * @param {Function} callback Function to call once the operation completes
1424
 */
1425
function storeRawData(data, callback) {
1✔
1426
    const collection = data.collection;
110✔
1427
    const recvTime = data.recvTime;
110✔
1428
    const entityId = data.entityId;
110✔
1429
    const entityType = data.entityType;
110✔
1430
    const attribute = data.attribute;
110✔
1431
    const notificationInfo = data.notificationInfo;
110✔
1432

1433
    const timestamp = sthUtils.getAttributeTimestamp(attribute, recvTime);
110✔
1434

1435
    let newRawData;
110✔
1436
    switch (sthConfig.DATA_MODEL) {
110✔
1437
        case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
1438
            newRawData = {
68✔
1439
                recvTime: timestamp,
1440
                entityId,
1441
                entityType,
1442
                attrName: attribute.name,
1443
                attrType: attribute.type,
1444
                attrValue: attribute.value
1445
            };
1446
            break;
68✔
1447
        case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
1448
            newRawData = {
26✔
1449
                recvTime: timestamp,
1450
                attrName: attribute.name,
1451
                attrType: attribute.type,
1452
                attrValue: attribute.value
1453
            };
1454
            break;
26✔
1455
        case sthConfig.DATA_MODELS.COLLECTION_PER_ATTRIBUTE:
1456
            newRawData = {
16✔
1457
                recvTime: timestamp,
1458
                attrType: attribute.type,
1459
                attrValue: attribute.value
1460
            };
1461
            break;
16✔
1462
    }
1463

1464
    if (notificationInfo && notificationInfo.updates) {
110✔
1465
        // The raw data to store is a raw data update
1466
        updateRawData(collection, notificationInfo.updates, newRawData, callback);
3✔
1467
    } else {
1468
        insertRawData(collection, newRawData, callback);
107✔
1469
    }
1470
}
1471

1472
/**
1473
 * Returns the find() condition to use to get the new minimum and maximum values after some raw data update
1474
 * @param {object} data The data to be stored. It is an object including the following properties:
1475
 *  - {string} entityId The entity id associated to updated attribute
1476
 *  - {string} entityType The entity type associated to the updated attribute
1477
 *  - {object} attribute The updated attribute
1478
 *  - {Date} timestamp The timestamp associated to the updated raw data
1479
 * @param {string} The resolution for which the find() condition should be returned
1480
 * @return {object} The find() condition to use to get the new minimum and maximum values after some raw data update
1481
 */
1482
function getNewMinMaxCondition(data, resolution) {
1✔
1483
    const entityId = data.entityId;
30✔
1484
    const entityType = data.entityType;
30✔
1485
    const attrName = data.attribute.name;
30✔
1486
    const timestamp = data.timestamp;
30✔
1487

1488
    let findCondition;
30✔
1489
    switch (sthConfig.DATA_MODEL) {
30✔
1490
        case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
1491
            findCondition = {
18✔
1492
                entityId,
1493
                entityType,
1494
                attrName,
1495
                recvTime: {
1496
                    $gte: sthUtils.getOriginStart(timestamp, resolution),
1497
                    $lte: sthUtils.getOriginEnd(timestamp, resolution),
1498
                    $ne: timestamp
1499
                }
1500
            };
1501
            break;
18✔
1502
        case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
1503
            findCondition = {
6✔
1504
                attrName,
1505
                recvTime: {
1506
                    $gte: sthUtils.getOriginStart(timestamp, resolution),
1507
                    $lte: sthUtils.getOriginEnd(timestamp, resolution),
1508
                    $ne: timestamp
1509
                }
1510
            };
1511
            break;
6✔
1512
        case sthConfig.DATA_MODELS.COLLECTION_PER_ATTRIBUTE:
1513
            findCondition = {
6✔
1514
                recvTime: {
1515
                    $ne: timestamp,
1516
                    $gte: sthUtils.getOriginStart(timestamp, resolution),
1517
                    $lte: sthUtils.getOriginEnd(timestamp, resolution)
1518
                }
1519
            };
1520
            break;
6✔
1521
    }
1522
    return findCondition;
30✔
1523
}
1524

1525
/**
1526
 * Asynchronously returns the new maximum values for the resolutions of interest after some raw data update
1527
 * @param {object} data The data to be stored. It is an object including the following properties:
1528
 *  - {string} entityId The entity id associated to the updated attribute
1529
 *  - {string} entityType The entity type associated to the updated attribute
1530
 *  - {object} attribute The updated attribute
1531
 *  - {Date} timestamp The timestamp associated to the updated raw data
1532
 * @param callback The callback to notify in case of error or once the new maximum value has been calculated
1533
 */
1534
function getNewMaxValues(data, callback) {
1✔
1535
    function getNewMaxValue(data, resolution, callback) {
1✔
1536
        // prettier-ignore
1537
        data.collection
15✔
1538
            .find(getNewMinMaxCondition(data, resolution))
1539
            .sort({ attrValue: -1 })
1540
            .toArray(function(err, restArray) {
1541
                if (callback) {
15!
1542
                    return process.nextTick(
15✔
1543
                        callback.bind(null, err, restArray && restArray.length ? restArray[0].attrValue : null)
45✔
1544
                    );
1545
                }
1546
            });
1547
    }
1548
    const getNewMaxValue4Resolutions = {};
5✔
1549
    for (let i = 0; i < sthConfig.AGGREGATION_BY.length; i++) {
5✔
1550
        getNewMaxValue4Resolutions[sthConfig.AGGREGATION_BY[i]] = getNewMaxValue.bind(
15✔
1551
            null,
1552
            data,
1553
            sthConfig.AGGREGATION_BY[i]
1554
        );
1555
    }
1556
    async.parallel(getNewMaxValue4Resolutions, function(err, result) {
5✔
1557
        if (callback) {
5!
1558
            return process.nextTick(callback.bind(null, err, result));
5✔
1559
        }
1560
    });
1561
}
1562

1563
/**
1564
 * Asynchronously returns the new minimum values for the resolutions of interest after some raw data update
1565
 * @param {object} data The data to be stored. It is an object including the following properties:
1566
 *  - {string} entityId The entity id associated to the updated attribute
1567
 *  - {string} entityType The entity type associated to the updated attribute
1568
 *  - {object} attribute The updated attribute
1569
 *  - {Date} timestamp The timestamp associated to the updated raw data
1570
 * @param callback The callback to notify in case of error or once the new maximum value has been calculated
1571
 */
1572
function getNewMinValues(data, callback) {
1✔
1573
    function getNewMinValue(data, resolution, callback) {
1✔
1574
        // prettier-ignore
1575
        data.collection
15✔
1576
            .find(getNewMinMaxCondition(data, resolution))
1577
            .sort({ attrValue: -1 })
1578
            .toArray(function(err, restArray) {
1579
                if (callback) {
15!
1580
                    return process.nextTick(
15✔
1581
                        callback.bind(null, err, restArray && restArray.length ? restArray[0].attrValue : null)
45✔
1582
                    );
1583
                }
1584
            });
1585
    }
1586
    const getNewMinValue4Resolutions = {};
5✔
1587
    for (let i = 0; i < sthConfig.AGGREGATION_BY.length; i++) {
5✔
1588
        getNewMinValue4Resolutions[sthConfig.AGGREGATION_BY[i]] = getNewMinValue.bind(
15✔
1589
            null,
1590
            data,
1591
            sthConfig.AGGREGATION_BY[i]
1592
        );
1593
    }
1594
    async.parallel(getNewMinValue4Resolutions, function(err, result) {
5✔
1595
        if (callback) {
5!
1596
            return process.nextTick(callback.bind(null, err, result));
5✔
1597
        }
1598
    });
1599
}
1600

1601
/**
1602
 * Generates the notification info from the data received in the received notification and the result of checking
1603
 *  if the data has already been registered
1604
 * @param {object} data The data to be stored. It is an object including the following properties:
1605
 *  - {object} collection: The collection where the data should be stored in
1606
 *  - {date} recvTime The date the event arrived
1607
 *  - {string} entityId The entity id associated to the updated attribute
1608
 *  - {string} entityType The entity type associated to the updated attribute
1609
 *  - {object} attribute The updated attribute
1610
 * @param {object} searchResult The result of searching for the received raw data in the database
1611
 * @param {Function} callback Function to call once the operation completes
1612
 */
1613
function generateNotificationInfo(data, searchResult, callback) {
1✔
1614
    if (searchResult) {
65✔
1615
        if (_.isEqual(searchResult.attrValue, data.attribute.value)) {
19✔
1616
            // The notification is already registered
1617
            return process.nextTick(callback.bind(null, null, { exists: searchResult }));
10✔
1618
        }
1619
        // The notification is an already existent data update
1620
        if (sthUtils.getAggregationType(searchResult.attrValue) === sthConfig.AGGREGATIONS.NUMERIC) {
9✔
1621
            getNewMaxValues(data, function(err, newMaxValues) {
5✔
1622
                if (err && callback) {
5!
1623
                    return process.nextTick(callback.bind(null, err, { updates: searchResult }));
×
1624
                }
1625
                if (newMaxValues) {
5!
1626
                    getNewMinValues(data, function(err, newMinValues) {
5✔
1627
                        if (err && callback) {
5!
1628
                            return process.nextTick(callback.bind(null, err, { updates: searchResult }));
×
1629
                        }
1630
                        return process.nextTick(
5✔
1631
                            callback.bind(null, err, {
1632
                                updates: searchResult,
1633
                                newMinValues,
1634
                                newMaxValues
1635
                            })
1636
                        );
1637
                    });
1638
                } else {
1639
                    return process.nextTick(callback.bind(null, err, { updates: searchResult }));
×
1640
                }
1641
            });
1642
        } else {
1643
            return process.nextTick(callback.bind(null, null, { updates: searchResult }));
4✔
1644
        }
1645
    } else {
1646
        // The notification is an new data insertion
1647
        return process.nextTick(callback.bind(null, null, { inserts: true }));
46✔
1648
    }
1649
}
1650

1651
/**
1652
 * Returns information about the notification such as if the notification aims to insert new data,
1653
 *  update already existent data or if it corresponds to already registered data
1654
 * @param {object} data The data to be stored. It is an object including the following properties:
1655
 *  - {object} collection: The collection where the data should be stored in
1656
 *  - {date} recvTime The date the event arrived
1657
 *  - {string} entityId The entity id associated to the updated attribute
1658
 *  - {string} entityType The entity type associated to the updated attribute
1659
 *  - {object} attribute The updated attribute
1660
 * @param {Function} callback Function to call once the operation completes
1661
 */
1662
function getNotificationInfo(data, callback) {
1✔
1663
    const collection = data.collection;
65✔
1664
    const recvTime = data.recvTime;
65✔
1665
    const entityId = data.entityId;
65✔
1666
    const entityType = data.entityType;
65✔
1667
    const attribute = data.attribute;
65✔
1668

1669
    const timestamp = sthUtils.getAttributeTimestamp(attribute, recvTime);
65✔
1670
    data.timestamp = timestamp;
65✔
1671

1672
    let findCondition;
65✔
1673
    switch (sthConfig.DATA_MODEL) {
65✔
1674
        case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
1675
            findCondition = {
53✔
1676
                recvTime: timestamp,
1677
                entityId,
1678
                entityType,
1679
                attrName: attribute.name,
1680
                attrType: attribute.type
1681
            };
1682
            break;
53✔
1683
        case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
1684
            findCondition = {
6✔
1685
                recvTime: timestamp,
1686
                attrName: attribute.name,
1687
                attrType: attribute.type
1688
            };
1689
            break;
6✔
1690
        case sthConfig.DATA_MODELS.COLLECTION_PER_ATTRIBUTE:
1691
            findCondition = {
6✔
1692
                recvTime: timestamp,
1693
                attrType: attribute.type
1694
            };
1695
            break;
6✔
1696
    }
1697

1698
    collection.findOne(findCondition, function(err, result) {
65✔
1699
        if (err && callback) {
65!
1700
            return process.nextTick(callback.bind(null, err));
×
1701
        }
1702
        generateNotificationInfo(data, result, callback);
65✔
1703
    });
1704
}
1705

1706
/**
1707
 * Drops a collection
1708
 * @param {string} collectionName The name of the collection to drop
1709
 * @param {string} service The service associated to the collection to drop
1710
 * @param {function} callback The callback to call with error or the result of the operation
1711
 */
1712
function dropCollection(collectionName, service, callback) {
1✔
1713
    client.db(sthDatabaseNaming.getDatabaseName(service)).dropCollection(collectionName, callback);
4✔
1714
}
1715

1716
/**
1717
 * Returns the find condition to apply for data removal
1718
 * @param {object} data It is an object including the following properties:
1719
 *  - {boolean} aggregated Flag indicating if the collection to drop refers to a aggregated data one or
1720
 *      not (to a raw data one)
1721
 *  - {string} service The service associated to the collection to drop
1722
 *  - {string} servicePath The service path to the collection to drop
1723
 *  - {string} entityId The entity id to the collection to drop
1724
 *  - {string} entityType The entity type to the collection to drop
1725
 *  - {string} attrName The attribute name to the collection to drop
1726
 * @param {object} options It is an object including the following properties:
1727
 *  - {boolean} isAggregated Flag indicating if the operation refers to aggregated data. Raw data otherwise
1728
 * @return {object} The find condition to apply for data removal
1729
 */
1730
function getFindCondition4DataRemoval(data, options) {
1✔
1731
    const entityId = data.entityId;
8✔
1732
    const entityType = data.entityType;
8✔
1733
    const attrName = data.attrName;
8✔
1734
    const isAggregated = options.isAggregated;
8✔
1735
    let findCondition;
8✔
1736

1737
    switch (sthConfig.DATA_MODEL) {
8!
1738
        case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
1739
            if (isAggregated) {
8✔
1740
                findCondition = {
4✔
1741
                    '_id.entityId': entityId,
1742
                    '_id.entityType': entityType
1743
                };
1744
                if (attrName) {
4✔
1745
                    findCondition['_id.attrName'] = attrName;
2✔
1746
                }
1747
            } else {
1748
                findCondition = {
4✔
1749
                    entityId,
1750
                    entityType
1751
                };
1752
                if (attrName) {
4✔
1753
                    findCondition.attrName = attrName;
2✔
1754
                }
1755
            }
1756
            break;
8✔
1757
        case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
1758
            if (isAggregated) {
×
1759
                findCondition = {
×
1760
                    '_id.attrName': attrName
1761
                };
1762
            } else {
1763
                findCondition = {
×
1764
                    attrName
1765
                };
1766
            }
1767
            break;
×
1768
    }
1769
    return findCondition;
8✔
1770
}
1771

1772
/**
1773
 * Removes data from a collection according to the provided data and options
1774
 * @param {object} data It is an object including the following properties:
1775
 *  - {boolean} aggregated Flag indicating if the collection to drop refers to a aggregated data one or
1776
 *      not (to a raw data one)
1777
 *  - {string} service The service associated to the collection to drop
1778
 *  - {string} servicePath The service path to the collection to drop
1779
 *  - {string} entityId The entity id to the collection to drop
1780
 *  - {string} entityType The entity type to the collection to drop
1781
 *  - {string} attrName The attribute name to the collection to drop
1782
 * @param {object} options It is an object including the following properties:
1783
 *  - {boolean} isAggregated Flag indicating if the operation refers to aggregated data. Raw data otherwise
1784
 * @param callback The callback to call with possible errors and the result of the operation
1785
 */
1786
function removeDataFromCollection(data, options, callback) {
1✔
1787
    const service = data.service;
10✔
1788
    const servicePath = data.servicePath;
10✔
1789
    const entityId = data.entityId;
10✔
1790
    const entityType = data.entityType;
10✔
1791
    const isAggregated = options.isAggregated;
10✔
1792

1793
    getCollection(
10✔
1794
        {
1795
            service,
1796
            servicePath,
1797
            entityId,
1798
            entityType
1799
        },
1800
        {
1801
            isAggregated,
1802
            shouldCreate: false,
1803
            shouldTruncate: false
1804
        },
1805
        function(err, collection) {
1806
            if (err) {
10✔
1807
                if (callback) {
2!
1808
                    return process.nextTick(callback.bind(null, err));
2✔
1809
                }
1810
            } else {
1811
                collection.remove(
8✔
1812
                    getFindCondition4DataRemoval(data, options),
1813
                    {
1814
                        writeConcern: {
1815
                            w: !isNaN(sthConfig.WRITE_CONCERN)
8!
1816
                                ? parseInt(sthConfig.WRITE_CONCERN, 10)
1817
                                : sthConfig.WRITE_CONCERN
1818
                        }
1819
                    },
1820
                    callback
1821
                );
1822
            }
1823
        }
1824
    );
1825
}
1826

1827
/**
1828
 * Drops collections from the passed ones if applicable according to the passed data and options
1829
 * @param collections
1830
 * @param {object} data It is an object including the following properties:
1831
 *  - {boolean} aggregated Flag indicating if the collection to drop refers to a aggregated data one or
1832
 *      not (to a raw data one)
1833
 *  - {string} service The service associated to the collection to drop
1834
 *  - {string} servicePath The service path to the collection to drop
1835
 *  - {string} entityId The entity id to the collection to drop
1836
 *  - {string} entityType The entity type to the collection to drop
1837
 *  - {string} attrName The attribute name to the collection to drop
1838
 * @param {object} options It is an object including the following properties:
1839
 *  - {boolean} isAggregated Flag indicating if the operation refers to aggregated data. Raw data otherwise
1840
 * @param callback The callback to call with possible errors and the result of the operation
1841
 */
1842
function dropCollectionsFromList(collections, data, options, callback) {
1✔
1843
    const service = data.service;
4✔
1844
    const servicePath = data.servicePath;
4✔
1845
    const entityId = data.entityId;
4✔
1846
    const entityType = data.entityType;
4✔
1847
    const attrName = data.attrName;
4✔
1848
    const isAggregated = options.isAggregated;
4✔
1849
    const collectionNameOptions = {
4✔
1850
        service,
1851
        servicePath,
1852
        entityId,
1853
        entityType,
1854
        attrName
1855
    };
1856

1857
    const collectionName = isAggregated
4✔
1858
        ? sthDatabaseNaming.getAggregatedCollectionName(collectionNameOptions)
1859
        : sthDatabaseNaming.getRawCollectionName(collectionNameOptions);
1860

1861
    const dropCollectionFunctions = [];
4✔
1862
    collections.forEach(function(collection) {
4✔
1863
        if (
8!
1864
            sthConfig.DATA_MODEL === sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH ||
8!
1865
            (sthConfig.DATA_MODEL === sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY && entityId) ||
1866
            (sthConfig.DATA_MODEL === sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY && attrName)
1867
        ) {
1868
            if (collection.collectionName === collectionName) {
8✔
1869
                dropCollectionFunctions.push(async.apply(dropCollection, collection.collectionName, service));
4✔
1870
            }
1871
        } else if (isAggregated) {
×
1872
            if (
×
1873
                collection.collectionName.indexOf('.aggr') >= 0 &&
×
1874
                collection.collectionName.indexOf(collectionName.slice(0, collectionName.indexOf('undefined'))) >= 0
1875
            ) {
1876
                dropCollectionFunctions.push(async.apply(dropCollection, collection.collectionName, service));
×
1877
            }
1878
        } else if (
×
1879
            collection.collectionName.indexOf('.aggr') < 0 &&
×
1880
            collection.collectionName.indexOf(collectionName.slice(0, collectionName.indexOf('undefined'))) >= 0
1881
        ) {
1882
            dropCollectionFunctions.push(async.apply(dropCollection, collection.collectionName, service));
×
1883
        }
1884
    });
1885

1886
    async.parallel(dropCollectionFunctions, callback);
4✔
1887
}
1888

1889
/**
1890
 * Removes data dropping the appropriate collections depending on the passed data and options
1891
 * @param {object} data It is an object including the following properties:
1892
 *  - {boolean} aggregated Flag indicating if the collection to drop refers to a aggregated data one or
1893
 *      not (to a raw data one)
1894
 *  - {string} service The service associated to the collection to drop
1895
 *  - {string} servicePath The service path to the collection to drop
1896
 *  - {string} entityId The entity id to the collection to drop
1897
 *  - {string} entityType The entity type to the collection to drop
1898
 *  - {string} attrName The attribute name to the collection to drop
1899
 * @param {object} options It is an object including the following properties:
1900
 *  - {boolean} isAggregated Flag indicating if the operation refers to aggregated data. Raw data otherwise
1901
 * @param callback The callback to call with possible errors and the result of the operation
1902
 */
1903
function removeDataDroppingCollections(data, options, callback) {
1✔
1904
    const service = data.service;
4✔
1905
    const databaseName = sthDatabaseNaming.getDatabaseName(service);
4✔
1906
    const database = client.db(databaseName);
4✔
1907

1908
    database.collections(function(err, collections) {
4✔
1909
        if (err) {
4!
1910
            if (callback) {
×
1911
                return process.nextTick(callback.bind(null, err));
×
1912
            }
1913
        } else {
1914
            dropCollectionsFromList(collections, data, options, callback);
4✔
1915
        }
1916
    });
1917
}
1918

1919
/**
1920
 * Removes the raw data associated to the attributes specified in the provided data
1921
 * @param {object} data It is an object including the following properties:
1922
 *  - {string} service The service
1923
 *  - {string} servicePath The service path
1924
 *  - {string} entityId The entity id
1925
 *  - {string} entityType The entity type
1926
 *  - {string} attrName The attribute name. In case no attribute name is provided, all the attributes of the
1927
 *      provided entity are removed
1928
 * @param callback The callback to call with error or the result of the operation
1929
 */
1930
function removeRawData(data, callback) {
1✔
1931
    if (
7✔
1932
        (sthConfig.DATA_MODEL === sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH &&
18!
1933
            (data.entityId || data.attrName)) ||
1934
        (sthConfig.DATA_MODEL === sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY && data.attrName)
1935
    ) {
1936
        removeDataFromCollection(data, { isAggregated: false }, callback);
5✔
1937
    } else {
1938
        removeDataDroppingCollections(data, { isAggregated: false }, callback);
2✔
1939
    }
1940
}
1941

1942
/**
1943
 * Removes the aggregated data associated to the attributes specified in the provided data
1944
 * @param {object} data It is an object including the following properties:
1945
 *  - {string} service The service
1946
 *  - {string} servicePath The service path
1947
 *  - {string} entityId The entity id
1948
 *  - {string} entityType The entity type
1949
 *  - {string} attrName The attribute name. In case no attribute name is provided, all the attributes of the
1950
 *      provided entity are removed
1951
 * @param callback The callback to call with error or the result of the operation
1952
 */
1953
function removeAggregatedData(data, callback) {
1✔
1954
    if (
7✔
1955
        (sthConfig.DATA_MODEL === sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH &&
18!
1956
            (data.entityId || data.attrName)) ||
1957
        (sthConfig.DATA_MODEL === sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY && data.attrName)
1958
    ) {
1959
        removeDataFromCollection(data, { isAggregated: true }, callback);
5✔
1960
    } else {
1961
        removeDataDroppingCollections(data, { isAggregated: true }, callback);
2✔
1962
    }
1963
}
1964

1965
/**
1966
 * Removes the attributes specified in the provided data
1967
 * @param {object} data It is an object including the following properties:
1968
 *  - {string} service The service
1969
 *  - {string} servicePath The service path
1970
 *  - {string} entityId The entity id
1971
 *  - {string} entityType The entity type
1972
 *  - {string} attrName The attribute name. In case no attribute name is provided, all the attributes of the
1973
 *    provided entity are removed
1974
 * @param callback The callback to call with error or the result of the operation
1975
 */
1976
function removeData(data, callback) {
1✔
1977
    const dataRemovalFunctions = [];
7✔
1978
    if (sthConfig.SHOULD_STORE === sthConfig.DATA_TO_STORE.BOTH) {
7!
1979
        dataRemovalFunctions.push(async.apply(removeRawData, data), async.apply(removeAggregatedData, data));
7✔
1980
    } else if (sthConfig.SHOULD_STORE === sthConfig.DATA_TO_STORE.ONLY_RAW) {
×
1981
        dataRemovalFunctions.push(async.apply(removeRawData, data));
×
1982
    } else if (sthConfig.SHOULD_STORE === sthConfig.DATA_TO_STORE.ONLY_AGGREGATED) {
×
1983
        dataRemovalFunctions.push(async.apply(removeAggregatedData, data));
×
1984
    }
1985
    async.parallel(dataRemovalFunctions, callback);
7✔
1986
}
1987

1988
module.exports = {
1✔
1989
    get driver() {
1990
        return mongoClient;
×
1991
    },
1992
    get connectionURL() {
1993
        return connectionURL;
×
1994
    },
1995
    get connection() {
1996
        return db;
492✔
1997
    },
1998
    get client() {
1999
        return client;
257✔
2000
    },
2001
    connect,
2002
    closeConnection,
2003
    getCollection,
2004
    getRawData,
2005
    getAggregatedData,
2006
    getAggregateUpdateCondition,
2007
    getAggregatePrepopulatedData,
2008
    storeAggregatedData,
2009
    storeAggregatedData4Resolution,
2010
    storeRawData,
2011
    getNotificationInfo,
2012
    removeData,
2013
    isAggregated
2014
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc