• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

telefonicaid / fiware-sth-comet / 9171759696

21 May 2024 08:50AM UTC coverage: 81.86% (-0.05%) from 81.909%
9171759696

push

github

web-flow
Merge pull request #637 from telefonicaid/dependabot/pip/test/acceptance/requests-2.32.0

Bump requests from 2.31.0 to 2.32.0 in /test/acceptance

1235 of 1626 branches covered (75.95%)

Branch coverage included in aggregate %.

2127 of 2481 relevant lines covered (85.73%)

141.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.39
/lib/database/model/sthDatabaseModelTool.js
1
/*
2
 * Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U
3
 *
4
 * This file is part of the Short Time Historic (STH) component
5
 *
6
 * STH is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the License,
9
 * or (at your option) any later version.
10
 *
11
 * STH is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
14
 * See the GNU Affero General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public
17
 * License along with STH.
18
 * If not, see http://www.gnu.org/licenses/.
19
 *
20
 * For those usages not covered by the GNU Affero General Public License
21
 * please contact with: [german.torodelvalle@telefonica.com]
22
 */
23

24
/* eslint-disable consistent-return */
25

26
const ROOT_PATH = require('app-root-path');
1✔
27
const fs = require('fs');
1✔
28
const async = require('async');
1✔
29
const events = require('events');
1✔
30
const sthLogger = require('logops');
1✔
31
const csvParser = require('csv-parser');
1✔
32
const sthDatabase = require(ROOT_PATH + '/lib/database/sthDatabase');
1✔
33
const sthDatabaseNaming = require(ROOT_PATH + '/lib/database/model/sthDatabaseNaming');
1✔
34
const sthDatabaseNameCodec = require(ROOT_PATH + '/lib/database/model/sthDatabaseNameCodec');
1✔
35
const STHWritableStream = require(ROOT_PATH + '/lib/database/model/sthDatabaseWritableStream');
1✔
36
const sthConfig = require(ROOT_PATH + '/lib/configuration/sthConfiguration');
1✔
37
const sthError = require(ROOT_PATH + '/lib/utils/sthError');
1✔
38

39
// Connection parameters passed to getDatabaseConnection() by getDataModelAnalysis(). Every value is read from
// sthConfig; the database name is derived from sthConfig.DEFAULT_SERVICE via sthDatabaseNaming.getDatabaseName().
const DATABASE_CONNECTION_PARAMS = {
1✔
40
    authentication: sthConfig.DB_AUTHENTICATION,
41
    dbURI: sthConfig.DB_URI,
42
    replicaSet: sthConfig.REPLICA_SET,
43
    database: sthDatabaseNaming.getDatabaseName(sthConfig.DEFAULT_SERVICE),
44
    poolSize: sthConfig.POOL_SIZE,
45
    authSource: sthConfig.DB_AUTH_SOURCE
46
};
47

48
/**
 * Frees the resources held by the tool by closing the database connection
 * @param  {Function} callback The callback, handed over as is to sthDatabase.closeConnection()
 */
function cleanResources(callback) {
    sthDatabase.closeConnection(callback);
}
55

56
/**
 * Returns all the databases managed by the MongoDB instance asynchronously
 * @param  {Object}   db       The database connection (its admin() interface is used to list the databases)
 * @param  {Function} callback The callback. Called with the error as first argument if the listing fails,
 *                             or with null and the `databases` array of the listing result otherwise
 */
function listDatabasesInfo(db, callback) {
    db.admin().listDatabases((err, result) => {
        if (err) {
            // No result can be assumed on failure, so it must not be dereferenced: just report the error
            return process.nextTick(callback, err);
        }
        sthLogger.info(sthConfig.LOGGING_CONTEXT.DB_CONN_OPEN, 'Obtained %s databases', result.databases.length);
        process.nextTick(callback, null, result.databases);
    });
}
70

71
/**
 * Decides whether a database has to be considered in the analysis
 * @param  {Object}   options      The options object including the following properties:
 *                                   - database: The database to restrict the analysis to
 *                                   - collection: The collection to restrict the analysis to
 * @param  {Object}   databaseData The data about the database (only its `name` is used)
 * @param  {Function} callback     The callback. Called with null and a boolean: when `options.database` is set,
 *                                 whether the database name equals it; otherwise, whether the (decoded, if name
 *                                 encoding is enabled) database name contains the configured database prefix
 */
function databaseFilter(options, databaseData, callback) {
    let isAccepted;
    if (options && options.database) {
        isAccepted = databaseData.name === options.database;
    } else {
        const readableName = sthConfig.NAME_ENCODING
            ? sthDatabaseNameCodec.decodeDatabaseName(databaseData.name)
            : databaseData.name;
        isAccepted = readableName.indexOf(sthConfig.DB_PREFIX) !== -1;
    }
    return process.nextTick(callback, null, isAccepted);
}
89

90
/**
 * Keeps only the databases accepted by databaseFilter()
 * @param  {Object}   options   The options object including the following properties:
 *                                - database: The database to restrict the analysis to
 *                                - collection: The collection to restrict the analysis to
 * @param  {Array}    databases The database array returned by Node.js MongoDB Driver's listDatabases()
 * @param  {Function} callback  The callback, receiving the filtered array
 */
function filterDatabases(options, databases, callback) {
    const isAcceptedDatabase = (databaseData, done) => databaseFilter(options, databaseData, done);
    async.filter(databases, isAcceptedDatabase, callback);
}
101

102
/**
 * Asynchronously retrieves the collections of a database
 * @param  {Object}   database The database connection, it must expose a collections() method
 * @param  {Function} callback The callback, handed over as is to database.collections()
 */
function listCollections(database, callback) {
    database.collections(callback);
}
110

111
/**
 * Test to check if a collection includes (or does not include) documents with certain property
 * @param  {Object}   collection   The collection
 * @param  {String}   propertyName The property name. If prefixed by '!', the test is inverted: it passes only
 *                                 if no document of the collection has the property
 * @param  {Function} callback     The callback. Called with the error reported by the count operation (if any)
 *                                 and a boolean with the result of the test
 */
function hasPropertyTest(collection, propertyName, callback) {
    const mustBeAbsent = propertyName.startsWith('!');
    // Drop the '!' marker to get the real property name (slice() instead of the deprecated substr())
    const field = mustBeAbsent ? propertyName.slice(1) : propertyName;
    collection.count({ [field]: { $exists: true } }, (err, count) => {
        process.nextTick(callback, err, mustBeAbsent ? count === 0 : count > 0);
    });
}
127

128
/**
 * Asynchronously checks if a collection follows the collection per service path data model, this is, if it
 *  includes documents with both the attribute name and the entity id (under `_id` for aggregated collections)
 * @param  {Object}   collection The collection
 * @param  {Function} callback   The callback
 */
function isCollectionPerServicePath(collection, callback) {
    const propertyTests = sthDatabase.isAggregated(collection.s.namespace.collection)
        ? ['_id.attrName', '_id.entityId']
        : ['attrName', 'entityId'];
    async.every(propertyTests, (propertyName, done) => hasPropertyTest(collection, propertyName, done), callback);
}
140

141
/**
 * Asynchronously checks if a collection follows the collection per entity data model, this is, if it includes
 *  documents with the attribute name but none with the entity id (under `_id` for aggregated collections)
 * @param  {Object}   collection The collection
 * @param  {Function} callback   The callback
 */
function isCollectionPerEntity(collection, callback) {
    const propertyTests = sthDatabase.isAggregated(collection.s.namespace.collection)
        ? ['_id.attrName', '!_id.entityId']
        : ['attrName', '!entityId'];
    async.every(propertyTests, (propertyName, done) => hasPropertyTest(collection, propertyName, done), callback);
}
153

154
/**
 * Asynchronously checks if a collection follows the collection per attribute data model, this is, if it
 *  includes no document with the attribute name nor the entity id (under `_id` for aggregated collections)
 * @param  {Object}   collection The collection
 * @param  {Function} callback   The callback
 */
function isCollectionPerAttribute(collection, callback) {
    const propertyTests = sthDatabase.isAggregated(collection.s.namespace.collection)
        ? ['!_id.attrName', '!_id.entityId']
        : ['!attrName', '!entityId'];
    async.every(propertyTests, (propertyName, done) => hasPropertyTest(collection, propertyName, done), callback);
}
166

167
/**
 * Tests the data model used in a collection
 * @param  {Object}   collection The collection
 * @param  {String}   dataModel  The data model, one of the values of sthConfig.DATA_MODELS
 * @param  {Function} callback   The callback. Called with the result of the test of the concrete data model or
 *                               with null and false if the data model is not one of the known ones
 */
function dataModelTest(collection, dataModel, callback) {
    switch (dataModel) {
        case sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH:
            isCollectionPerServicePath(collection, callback);
            break;
        case sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY:
            isCollectionPerEntity(collection, callback);
            break;
        case sthConfig.DATA_MODELS.COLLECTION_PER_ATTRIBUTE:
            isCollectionPerAttribute(collection, callback);
            break;
        default:
            // An unknown data model cannot match: answer anyway so the caller is never left waiting
            process.nextTick(callback, null, false);
    }
}
186

187
/**
 * Asynchronously detects the data model of a collection, trying the collection per service path, collection per
 *  entity and collection per attribute data models by means of dataModelTest()
 * @param  {Object}   collection The collection
 * @param  {Function} callback   The callback, receiving the detected data model
 */
function getDataModel(collection, callback) {
    const { COLLECTION_PER_SERVICE_PATH, COLLECTION_PER_ENTITY, COLLECTION_PER_ATTRIBUTE } = sthConfig.DATA_MODELS;
    const candidateDataModels = [COLLECTION_PER_SERVICE_PATH, COLLECTION_PER_ENTITY, COLLECTION_PER_ATTRIBUTE];
    async.detect(
        candidateDataModels,
        (dataModel, done) => dataModelTest(collection, dataModel, done),
        callback
    );
}
203

204
/**
 * Tells if a collection has NOT to be reported as needing migration (filterCollections() discards the
 *  collections for which this test passes)
 * @param  {Object}   options    The options object including the following properties:
 *                                 - database: The database to restrict the analysis to
 *                                 - collection: The collection to restrict the analysis to
 * @param  {Object}   collection The collection
 * @param  {Function} callback   The callback. Called with null and true if the collection is not the one the
 *                               analysis is restricted to, is a 'system.' collection, is empty or its documents
 *                               match the currently configured data model. Called with null and false if the
 *                               documents do not match it or the configured data model is unknown. Called with
 *                               an error if the documents cannot be counted
 */
function collectionFilter(options, collection, callback) {
    const collectionName = collection.s.namespace.collection;
    const isOutOfScope = Boolean(options && options.collection && options.collection !== collectionName);
    if (isOutOfScope || collectionName.indexOf('system.') === 0) {
        return process.nextTick(callback, null, true);
    }

    collection.count((err, count) => {
        if (err) {
            return process.nextTick(callback, err);
        }
        if (!(count > 0)) {
            return process.nextTick(callback, null, true);
        }

        // For each data model, whether documents are expected to include the entity id and the attribute name
        const { COLLECTION_PER_SERVICE_PATH, COLLECTION_PER_ENTITY, COLLECTION_PER_ATTRIBUTE } = sthConfig.DATA_MODELS;
        const expectedPresence = new Map([
            [COLLECTION_PER_SERVICE_PATH, { entityId: true, attrName: true }],
            [COLLECTION_PER_ENTITY, { entityId: false, attrName: true }],
            [COLLECTION_PER_ATTRIBUTE, { entityId: false, attrName: false }]
        ]);
        const expected = expectedPresence.get(sthConfig.DATA_MODEL);
        if (!expected) {
            // Unknown configured data model: nothing can match it, but the callback must be called anyway
            return process.nextTick(callback, null, false);
        }

        // Aggregated collections keep both properties under `_id`; a leading '!' asks for the property absence
        const fieldPrefix = sthDatabase.isAggregated(collectionName) ? '_id.' : '';
        const propertyTests = ['entityId', 'attrName'].map(
            (field) => (expected[field] ? '' : '!') + fieldPrefix + field
        );
        async.every(propertyTests, (propertyName, done) => hasPropertyTest(collection, propertyName, done), callback);
    });
}
263

264
/**
 * Discards the collections for which collectionFilter() passes, keeping the ones whose data model is distinct
 *  from the one the STH component is currently configured with
 * @param  {Object}   options     The options object including the following properties:
 *                                  - database: The database to restrict the analysis to
 *                                  - collection: The collection to restrict the analysis to
 * @param  {Object}   collections The collections
 * @param  {Function} callback    The callback, receiving the remaining collections
 */
function filterCollections(options, collections, callback) {
    const isToBeDiscarded = (collection, done) => collectionFilter(options, collection, done);
    async.reject(collections, isToBeDiscarded, callback);
}
276

277
/**
 * Maps a collection to its name and data model
 * @param  {Object}   collection The collection
 * @param  {Function} callback   The callback. Called with the error if the data model cannot be obtained or
 *                               with null and a `{ collectionName, dataModel }` object otherwise
 */
function collectionMapper(collection, callback) {
    getDataModel(collection, (err, dataModel) => {
        if (err) {
            // Report the failure instead of mapping the collection to an undefined data model
            return process.nextTick(callback, err);
        }
        process.nextTick(callback, null, {
            collectionName: collection.s.namespace.collection,
            dataModel
        });
    });
}
290

291
/**
 * Generates the analysis report of a database, including its collections needing migration mapped by
 *  collectionMapper()
 * @param  {String}   databaseName          The database name
 * @param  {Array}    migratableCollections The array of collections needing migration
 * @param  {Function} callback              The callback, receiving a `{ databaseName, collections2Migrate }`
 *                                          object
 */
function generateReport(databaseName, migratableCollections, callback) {
    const mapCollections = (next) => async.map(migratableCollections, collectionMapper, next);
    const buildReport = (mappedMigratableCollections, next) => {
        process.nextTick(next, null, {
            databaseName,
            collections2Migrate: mappedMigratableCollections
        });
    };
    async.waterfall([mapCollections, buildReport], callback);
}
311

312
/**
 * Wraps the generated report together with the currently configured data model
 * @param  {Object}   report   The data model analysis report
 * @param  {Function} callback The callback, receiving a `{ currentDataModel, result }` object
 */
function includeCurrentDataModel(report, callback) {
    process.nextTick(callback, null, {
        currentDataModel: sthConfig.DATA_MODEL,
        result: report
    });
}
323

324
/**
 * Analyses the data models of the collections of a database: lists its collections, keeps the ones needing
 *  migration and generates the report about them
 * @param  {Object}   options  The options object including the following properties:
 *                               - database: The database to restrict the analysis to
 *                               - collection: The collection to restrict the analysis to
 * @param  {Object}   database The data about the database (only its `name` is used)
 * @param  {Function} callback The callback
 */
function analyseDatabase(options, database, callback) {
    const databaseName = database.name;
    const db = sthDatabase.client.db(databaseName);
    const steps = [
        (next) => listCollections(db, next),
        (collections, next) => filterCollections(options, collections, next),
        (migratableCollections, next) => generateReport(databaseName, migratableCollections, next)
    ];
    async.waterfall(steps, callback);
}
343

344
/**
 * Analyses the data models of the collections of each one of the passed databases
 * @param  {Object}   options   The options object including the following properties:
 *                                - database: The database to restrict the analysis to
 *                                - collection: The collection to restrict the analysis to
 * @param  {Array}    databases The collection of databases to analyse
 * @param  {Function} callback  The callback, receiving one analyseDatabase() result per database
 */
function analyseDatabases(options, databases, callback) {
    const analyseOne = (database, done) => analyseDatabase(options, database, done);
    async.map(databases, analyseOne, callback);
}
355

356
/**
 * Asynchronously provides a connection to the database, reusing the one held by sthDatabase if any or
 *  connecting otherwise
 * @param  {Object}   params   The database connection parameters (only used if a new connection is needed)
 * @param  {Function} callback The callback
 */
function getDatabaseConnection(params, callback) {
    if (!sthDatabase.connection) {
        sthDatabase.connect(params, callback);
        return;
    }
    process.nextTick(callback, null, sthDatabase.connection);
}
371

372
/**
 * Returns the result of the analysis of the available databases' collections data models
 * @param  {Object}   options  The options object including the following properties:
 *                               - database: The database to restrict the analysis to
 *                               - collection: The collection to restrict the analysis to
 *                             It can be omitted, passing the callback as the only argument
 * @param  {Function} callback The callback
 */
function getDataModelAnalysis(options, callback) {
    let analysisOptions = options;
    let done = callback;
    if (!callback && typeof options === 'function') {
        // Invoked as getDataModelAnalysis(callback)
        done = options;
        analysisOptions = undefined;
    }
    const steps = [
        (next) => getDatabaseConnection(DATABASE_CONNECTION_PARAMS, next),
        listDatabasesInfo,
        (databases, next) => filterDatabases(analysisOptions, databases, next),
        (databases, next) => analyseDatabases(analysisOptions, databases, next),
        includeCurrentDataModel
    ];
    async.waterfall(steps, done);
}
395

396
/**
 * Checks if the passed collection exists in certain database asynchronously
 * @param  {String}   databaseName   The database name
 * @param  {String}   collectionName The collection name
 * @param  {Function} callback       The callback. Called with the error if the collections cannot be listed or
 *                                   with null and a boolean telling if the collection exists otherwise
 */
function collectionExists(databaseName, collectionName, callback) {
    sthDatabase.client
        .db(databaseName)
        .listCollections({ name: collectionName })
        .toArray((err, items) => {
            if (err) {
                // No items can be assumed on failure, so they must not be dereferenced: just report the error
                return process.nextTick(callback, err);
            }
            process.nextTick(callback, null, items.length > 0);
        });
}
408

409
/**
 * Asynchronously obtains the data model of a collection from its name and the name of the database it belongs to
 * @param  {String}   databaseName   The database name
 * @param  {String}   collectionName The collection name
 * @param  {Function} callback       The callback, receiving the data model detected by getDataModel()
 */
function getDataModelFromName(databaseName, collectionName, callback) {
    const db = sthDatabase.client.db(databaseName);
    const openCollection = (next) => db.collection(collectionName, next);
    async.waterfall([openCollection, getDataModel], callback);
}
420

421
/**
 * Returns (synchronously) the target collection name: the aggregated collection name if the origin collection
 *  is an aggregated one or the raw collection name otherwise
 * @param  {Object} originCollectionParams The origin collection params (its `isAggregated` property selects the
 *                                         naming function the params are passed to)
 * @return {String}                        The name given by sthDatabaseNaming
 */
function getTargetCollectionName(originCollectionParams) {
    return originCollectionParams.isAggregated
        ? sthDatabaseNaming.getAggregatedCollectionName(originCollectionParams)
        : sthDatabaseNaming.getRawCollectionName(originCollectionParams);
}
431

432
/**
 * Infers the data associated to a collection from its name, if possible. The (decoded, if name encoding is
 *  enabled) name, once the collection prefix and the '.aggr' suffix of aggregated collections are removed, is
 *  split by the configured name separator into service path, entity id and (optionally) entity type
 * @param  {String}   databaseName         The database name
 * @param  {String}   originCollectionName The collection name
 * @param  {Function} callback             The callback. Called with a CollectionDataInferenceError if the name
 *                                         is not composed of 2 or 3 parts, or with null and the inferred data
 *                                         otherwise
 */
function infereTargetCollectionDataFromCpE2CpSP(databaseName, originCollectionName, callback) {
    const isAggregated = sthDatabase.isAggregated(originCollectionName);
    let readableName = originCollectionName;
    if (sthConfig.NAME_ENCODING) {
        readableName = sthDatabaseNameCodec.decodeCollectionName(originCollectionName);
    }
    const suffixLength = isAggregated ? '.aggr'.length : 0;
    const nameParts = readableName
        .substring(sthConfig.COLLECTION_PREFIX.length, readableName.length - suffixLength)
        .split(sthConfig.NAME_SEPARATOR);

    if (nameParts.length !== 2 && nameParts.length !== 3) {
        return process.nextTick(
            callback,
            new sthError.CollectionDataInferenceError(databaseName, originCollectionName)
        );
    }

    // The entity type is undefined when the name only has 2 parts
    const [servicePath, entityId, entityType] = nameParts;
    const service = sthDatabaseNaming.getService(databaseName);
    const collectionName = getTargetCollectionName({ databaseName, service, servicePath, isAggregated });
    return process.nextTick(callback, null, {
        databaseName,
        service,
        servicePath,
        entityId,
        entityType,
        collectionName,
        isAggregated
    });
}
477

478
/**
 * Returns asynchronously the target collection data for a collection-per-entity to collection-per-service-path
 *  migration. The data is taken from the first row of the dictionary whose collection name is the origin
 *  collection one. If no dictionary is passed, it cannot be read or parsed, or it has no row for the collection,
 *  the data is inferred from the collection name by infereTargetCollectionDataFromCpE2CpSP()
 * @param  {String}   databaseName         The database name
 * @param  {String}   originCollectionName The origin collection name
 * @param  {String}   dictionary           The file name where the dictionary of collection names to collection
 *                                           associated data is stored (CSV with the columns: collectionName,
 *                                           servicePath, entityId, entityType, attributeName, attributeType)
 * @param  {Function} callback             The callback. It is called only once
 */
function getTargetCollectionDataFromCpE2CpSP(databaseName, originCollectionName, dictionary, callback) {
    const inferFromName = () => infereTargetCollectionDataFromCpE2CpSP(databaseName, originCollectionName, callback);
    if (!dictionary) {
        return inferFromName();
    }

    // Several stream events ('data', 'end', 'error') may try to answer: only the first one is honoured
    let settled = false;
    const fallBackOnce = () => {
        if (!settled) {
            settled = true;
            inferFromName();
        }
    };

    try {
        const dictionaryStream = fs.createReadStream(dictionary);
        dictionaryStream.on('error', fallBackOnce);
        dictionaryStream
            .pipe(
                csvParser({
                    headers: ['collectionName', 'servicePath', 'entityId', 'entityType', 'attributeName', 'attributeType']
                })
            )
            .on('error', fallBackOnce)
            .on('data', (data) => {
                if (settled || data.collectionName !== originCollectionName) {
                    return;
                }
                settled = true;
                const service = sthDatabaseNaming.getService(databaseName);
                const targetCollectionName = getTargetCollectionName({
                    databaseName,
                    service,
                    servicePath: data.servicePath,
                    isAggregated: sthDatabase.isAggregated(originCollectionName)
                });
                process.nextTick(callback, null, {
                    databaseName,
                    service,
                    servicePath: data.servicePath,
                    entityId: data.entityId,
                    entityType: data.entityType,
                    collectionName: targetCollectionName,
                    isAggregated: sthDatabase.isAggregated(targetCollectionName)
                });
            })
            .on('end', fallBackOnce);
    } catch (err) {
        // The streams could not even be set up: behave as if no dictionary had been passed
        fallBackOnce();
    }
}
543

544
/**
 * Adapts (in place) a document of a collection-per-entity data model collection to the
 *  collection-per-service-path data model by setting its entity id and entity type: in its `_id` if the origin
 *  collection is an aggregated one, or in the document itself otherwise
 * @param  {Object} originCollection     The origin collection
 * @param  {Object} targetCollectionData The data about the target collection
 * @param  {Object} doc                  The document to transform
 * @return {Object}                      The same document, once updated
 */
function transformFromCpE2CpSP(originCollection, targetCollectionData, doc) {
    const holder = sthDatabase.isAggregated(originCollection.s.namespace.collection) ? doc._id : doc;
    holder.entityId = targetCollectionData.entityId;
    holder.entityType = targetCollectionData.entityType;
    return doc;
}
560

561
/**
 * Pipes a collection-per-entity data model collection to a collection-per-service-path data model collection once
 *  the document count of the original collection is available
 * @param  {Object}   params      Object enclosing the following params taken by this function as input:
 *                                  - collectionMigrationProgress: EventEmitter to notify progress events
 *                                  - databaseName: The database name
 *                                  - originCollectionName: The origin collection name
 *                                  - targetCollectionData: The target collection data
 *                                  - originCollection: The origin collection
 * @param  {Object}   options     The migration options object (not used by this function)
 * @param  {Number}   originCount The origin collection count
 * @param  {Function} callback    The callback. Called once: with no arguments when the target stream finishes
 *                                or with the error if the origin or the target stream fails
 */
function doPipeFromCpE2CpSP(params, options, originCount, callback) {
    const { originCollection, targetCollectionData, collectionMigrationProgress } = params;
    const sthWritableStream = new STHWritableStream(
        sthDatabase.client,
        params.databaseName,
        targetCollectionData.collectionName,
        originCount
    );

    // 'error' and 'finish' may both be emitted: the callback must be called only for the first of them
    let isDone = false;
    const finishOnce = (...args) => {
        if (!isDone) {
            isDone = true;
            callback(...args);
        }
    };

    const documentStream = originCollection.find({}).stream({
        transform: (doc) => transformFromCpE2CpSP(originCollection, targetCollectionData, doc)
    });
    // pipe() does not forward the errors of the origin stream, so both streams are listened to
    documentStream.on('error', finishOnce);
    documentStream
        .pipe(sthWritableStream)
        .on('progress', (...args) => collectionMigrationProgress.emit('progress', ...args))
        .on('error', finishOnce)
        .on('finish', () => finishOnce());
}
595

596
/**
 * Pipes a collection-per-entity data model collection to a collection-per-service-path data model collection:
 *  stores the origin collection in `params`, counts its documents and hands the count over to
 *  doPipeFromCpE2CpSP()
 * @param  {Object}   params           Object enclosing the following params taken by this function as input:
 *                                       - collectionMigrationProgress: EventEmitter to notify progress events
 *                                       - databaseName: The database name
 *                                       - originCollectionName: The origin collection name
 *                                       - targetCollectionData: The target collection data
 *                                     Its `originCollection` property is set by this function
 * @param  {Object}   options          The migration options object including the following properties:
 *                                       - removeCollection: Boolean indicating if the origin collection should be
 *                                                             removed after the migration
 *                                       - updateCollection: Boolean indicating if the migration progress should
 *                                                             take place even if the target collection already
 *                                                             exists
 * @param  {Object}   originCollection The origin collection
 * @param  {Function} callback         The callback
 */
function pipeFromCpE2CpSP(params, options, originCollection, callback) {
    params.originCollection = originCollection;
    const countDocuments = (next) => originCollection.count(next);
    const pipeDocuments = (originCount, next) => doPipeFromCpE2CpSP(params, options, originCount, next);
    async.waterfall([countDocuments, pipeDocuments], callback);
}
619

620
/**
 * Migrates a collection from the collection-per-entity data model to the collection-per-service-path data model:
 *  stores the target collection data in `params`, opens the origin collection and pipes it by means of
 *  pipeFromCpE2CpSP()
 * @param  {Object}   params               Object enclosing the following params taken by this function as input:
 *                                           - collectionMigrationProgress: EventEmitter to notify progress events
 *                                           - databaseName: The database name
 *                                           - originCollectionName: The origin collection name
 *                                         Its `targetCollectionData` property is set by this function
 * @param  {Object}   options              The migration options object including the following properties:
 *                                           - removeCollection: Boolean indicating if the origin collection should be
 *                                                                 removed after the migration
 *                                           - updateCollection: Boolean indicating if the migration progress should
 *                                                                 take place even if the target collection already
 *                                                                 exists
 * @param  {Object}   targetCollectionData The data about the target collection
 * @param  {Function} callback             The callback
 */
function doMigrateFromCpE2CpSP(params, options, targetCollectionData, callback) {
    const db = sthDatabase.client.db(params.databaseName);
    const originCollectionName = params.originCollectionName;
    params.targetCollectionData = targetCollectionData;
    const steps = [
        (next) => db.collection(originCollectionName, next),
        (originCollection, next) => pipeFromCpE2CpSP(params, options, originCollection, next)
    ];
    async.waterfall(steps, callback);
}
645

646
/**
 * Checks that the migration can write into the target collection: unless the updateCollection option is set,
 *  the target collection must not exist yet
 * @param  {String}   databaseName         The database name
 * @param  {Object}   options              The migration options object including the following properties:
 *                                           - removeCollection: Boolean indicating if the origin collection should be
 *                                                                 removed after the migration
 *                                           - updateCollection: Boolean indicating if the migration progress
 *                                                                 should take place even if the target collection
 *                                                                 already exists
 * @param  {Object}   targetCollectionData The data about the target collection
 * @param  {Function} callback             The callback. It receives a CollectionExistsError if the target
 *                                           collection exists and should not be updated, or the target collection
 *                                           data as result otherwise
 */
function targetCollectionDataCheck(databaseName, options, targetCollectionData, callback) {
    // Updating an existing target collection is allowed: nothing to check
    if (options.updateCollection) {
        return process.nextTick(callback.bind(null, null, targetCollectionData));
    }

    collectionExists(databaseName, targetCollectionData.collectionName, (err, alreadyExists) => {
        if (err) {
            return process.nextTick(callback.bind(null, err));
        }
        const notifyOutcome = alreadyExists
            ? callback.bind(
                  null,
                  new sthError.CollectionExistsError(databaseName, targetCollectionData.collectionName)
              )
            : callback.bind(null, null, targetCollectionData);
        process.nextTick(notifyOutcome);
    });
}
678

679
/**
 * Migrates a collection of certain database from the collection-per-entity data model to the
 *  collection-per-service-path data model
 * @param  {Object}   params   The migration parameters including the following properties:
 *                               - collectionMigrationProgress: EventEmitter to notify progress events
 *                               - databaseName: The database name
 *                               - originCollectionName: The collection name
 * @param  {Object}   options  The migration options object including the following properties:
 *                               - dictionary:       A dictionary to resolve the collection name to the
 *                                                     associated data (service path, entity id, entity type,
 *                                                     attribute name and attribute type)
 *                               - removeCollection: Boolean indicating if the origin collection should be
 *                                                     removed after the migration
 *                               - updateCollection: Boolean indicating if the migration progress should take place
 *                                                     even if the target collection already exists
 * @param  {Function} callback The callback
 */
function migrateFromCpE2CpSP(params, options, callback) {
    const { databaseName, originCollectionName } = params;

    // Each step hands its result over to the next one: resolve the target collection data, validate it and migrate
    const migrationSteps = [
        async.apply(getTargetCollectionDataFromCpE2CpSP, databaseName, originCollectionName, options.dictionary),
        async.apply(targetCollectionDataCheck, databaseName, options),
        async.apply(doMigrateFromCpE2CpSP, params, options)
    ];

    async.waterfall(migrationSteps, callback);
}
711

712
/**
 * Migrates a collection from its current data model to the currently configured one. The only supported
 *  migration is from the collection-per-entity data model to the collection-per-service-path data model; any
 *  other combination is notified to the callback as a NotSupportedMigration error
 * @param  {Object}   params          The migration parameters including the following properties:
 *                                      - collectionMigrationProgress: EventEmitter to notify progress events
 *                                      - databaseName: The database name
 *                                      - originCollectionName: The collection name
 * @param  {Object}   options         The migration options object including the following properties:
 *                                      - dictionary:       A dictionary to resolve the collection name to the
 *                                                            associated data (service path, entity id, entity type,
 *                                                            attribute name and attribute type)
 *                                      - removeCollection: Boolean indicating if the origin collection should be
 *                                                            removed after the migration
 *                                      - updateCollection: Boolean indicating if the migration progress should take
 *                                                            place even if the target collection already exists
 * @param  {String}   originDataModel The data model
 * @param  {Function} callback        The callback
 */
function migrate2DataModel(params, options, originDataModel, callback) {
    const isSupportedMigration =
        originDataModel === sthConfig.DATA_MODELS.COLLECTION_PER_ENTITY &&
        sthConfig.DATA_MODEL === sthConfig.DATA_MODELS.COLLECTION_PER_SERVICE_PATH;

    if (isSupportedMigration) {
        migrateFromCpE2CpSP(params, options, callback);
        return;
    }

    // Any other origin/target data model combination is rejected asynchronously
    return process.nextTick(
        callback.bind(
            null,
            new sthError.NotSupportedMigration(
                params.databaseName,
                params.originCollectionName,
                originDataModel,
                sthConfig.DATA_MODEL
            )
        )
    );
}
764

765
/**
 * Checks that the origin collection exists
 * @param  {String}   databaseName       The database name
 * @param  {String}   collectionName     The collection name
 * @param  {Object}   databaseConnection The connection to the database (not used by this function)
 * @param  {Function} callback           The callback. It receives a NotExistentCollectionError if the collection
 *                                         does not exist
 */
function originCollectionCheck(databaseName, collectionName, databaseConnection, callback) {
    collectionExists(databaseName, collectionName, (err, exists) => {
        if (err) {
            return process.nextTick(callback.bind(null, err));
        }
        // No error (undefined) is passed to the callback when the collection is there
        const missingCollectionError = exists
            ? undefined
            : new sthError.NotExistentCollectionError(databaseName, collectionName);
        process.nextTick(callback.bind(null, missingCollectionError));
    });
}
784

785
/**
 * Drops a collection from a database if the removeCollection option is set. Otherwise, the callback is just
 *  invoked on the next tick
 * @param  {String}   databaseName   The database name
 * @param  {String}   collectionName The collection name
 * @param  {Object}   options        The migration options object including the following properties:
 *                                     - removeCollection: Boolean indicating if the origin collection should be removed
 *                                                           after the migration
 *                                     - updateCollection: Boolean indicating if the migration progress
 *                                                           should take place even if the target collection already
 *                                                           exists
 * @param  {Function} callback       The callback
 */
function dropCollection(databaseName, collectionName, options, callback) {
    // Nothing to remove: keep the origin collection and notify asynchronously
    if (!options.removeCollection) {
        process.nextTick(callback);
        return;
    }
    sthDatabase.client.db(databaseName).dropCollection(collectionName, callback);
}
805

806
/**
 * Migrates a collection from its current data model to the configured one
 * @param   {String}       databaseName   The database name
 * @param   {String}       collectionName The collection name
 * @param   {Object}       options        The migration options object including the following properties:
 *                                          - dictionary:       A dictionary to resolve the collection name to the
 *                                                                associated data (service path, entity id, entity type,
 *                                                                attribute name and attribute type)
 *                                          - removeCollection: Boolean indicating if the origin collection should be
 *                                                                removed after the migration
 *                                          - updateCollection: Boolean indicating if the migration progress
 *                                                                should take place even if the target collection
 *                                                                already exists
 *                                        It is optional: the callback may be passed in its place, and a null or
 *                                        undefined value is replaced by the default options (removeCollection and
 *                                        updateCollection set to false)
 * @param   {Function}     callback       The callback
 * @returns {EventEmitter}                An EventEmitter instance to notify the progress of the migration process
 */
function migrateCollection(databaseName, collectionName, options, callback) {
    const collectionMigrationProgress = new events.EventEmitter();

    // Support the (databaseName, collectionName, callback) call form
    if (typeof options === 'function') {
        callback = options;
        options = undefined;
    }

    // Without an options object the migration and drop steps would throw when reading its properties
    if (options === null || options === undefined) {
        options = {
            removeCollection: false,
            updateCollection: false
        };
    }

    async.waterfall(
        [
            async.apply(getDatabaseConnection, DATABASE_CONNECTION_PARAMS),
            async.apply(originCollectionCheck, databaseName, collectionName),
            async.apply(getDataModelFromName, databaseName, collectionName),
            async.apply(
                migrate2DataModel,
                {
                    collectionMigrationProgress,
                    databaseName,
                    originCollectionName: collectionName
                },
                options
            ),
            async.apply(dropCollection, databaseName, collectionName, options)
        ],
        callback
    );

    // The emitter is returned synchronously so the caller can subscribe before the migration makes progress
    return collectionMigrationProgress;
}
853

854
module.exports = {
1✔
855
    cleanResources,
856
    getDataModelAnalysis,
857
    migrateCollection
858
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc