• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

silvermine / dynamodb-table-sync / 9819479197

06 Jul 2024 01:03PM UTC coverage: 0.0%. Remained the same
9819479197

Pull #26

github

web-flow
Merge 7ef89f21c into 80edaf9a1
Pull Request #26: feat: Add support for synchronizing to a local slave instance

0 of 132 branches covered (0.0%)

Branch coverage included in aggregate %.

0 of 18 new or added lines in 2 files covered. (0.0%)

157 existing lines in 2 files now uncovered.

0 of 287 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/Synchronizer.js
1
'use strict';
2

3
var readline = require('readline'),
×
4
    _ = require('underscore'),
×
5
    Q = require('q'),
×
6
    AWS = require('aws-sdk'),
×
7
    Class = require('class.extend'),
×
8
    Counter = require('./lib/Counter'),
×
9
    REPLICATION_FIELDS = [ 'aws:rep:updateregion', 'aws:rep:updatetime', 'aws:rep:deleting' ];
×
10

11
module.exports = Class.extend({
×
12

13
   /**
14
    * The possible options supplied in `opts` are:
15
    *
16
    * ```
17
    * {
18
    *    writeMissing: false,
19
    *    writeDiffering: false,
20
    *    scanForExtra: false,
21
    *    deleteExtra: false,
22
    *    ignoreAtts: [ 'attributes', 'to', 'ignore' ],
23
    *    startingKey: { hashKey: 'abc', rangeKey: 'xyz' },
24
    *    scanLimit: 100,
25
    *    batchReadLimit: 50,
26
    *    maxRetries: 10,
27
    *    retryDelayBase: 50,
28
    *    parallel: 4,
29
    *    slaveCredentials: AWSCredentialsProvider(...),
30
    * }
31
    * ```
32
    *
33
    * @class Synchronizer
34
    * @param master {object} table definition { region: 'region', name: 'table-name' }
35
    * @param slaves {object[]} array of table definitions
36
    * @param opts {object} options to determine how this class operates
37
    */
38
   init: function(master, slaves, opts) {
39
      this._opts = _.extend({ batchReadLimit: 50, maxRetries: 10, retryDelayBase: 50 }, opts);
×
40

41
      this._master = _.extend({}, master, { id: (master.region + ':' + master.name), docs: this._makeDocClient(master) });
×
42

43
      this._slaves = _.map(slaves, function(def) {
×
44
         var client;
45

NEW
46
         if (opts.localhostTarget) {
×
NEW
UNCOV
47
            client = this._makeLocalDocClient(def, opts.localhostTarget);
×
48
         } else {
NEW
49
            client = this._makeDocClient(def, opts.slaveCredentials);
×
50
         }
51

NEW
52
         return _.extend({}, def, { id: (def.region + ':' + def.name), docs: client });
×
53
      }.bind(this));
54

UNCOV
55
      this._abortScanning = false;
×
56

UNCOV
57
      this._stats = _.reduce(this._slaves, function(stats, slave) {
×
UNCOV
58
         stats[slave.id] = { extra: 0, sameAs: 0, differing: 0, missing: 0 };
×
UNCOV
59
         return stats;
×
60
      }, {});
61
   },
62

63
   /**
64
    * Main entry function that runs a complete synchronization (regardless of whether that
65
    * is a "dry run" or an actual sync job that will write/delete items).
66
    *
67
    * The statistics that are returned contain stats for each slave table, comparing it to
68
    * the master, and are in the format:
69
    *
70
    * ```
71
    * {
72
    *    'region-1:table': {
73
    *       extra: 0, // number of items only in the slave, not in the master
74
    *       sameAs: 0, // number of items that matched
75
    *       differing: 0, // number of items the slave had, but that differed
76
    *       missing: 0, // number of items the slave was missing
77
    *    },
78
    *    'region-2:table': {
79
    *       extra: 0,
80
    *       sameAs: 0,
81
    *       differing: 0,
82
    *       missing: 0,
83
    *    },
84
    * }
85
    * ```
86
    *
87
    * @return {object} statistics on what the operation completed
88
    */
89
   run: function() {
90
      var self = this;
×
91

UNCOV
92
      return this._compareTableDescriptions()
×
93
         .then(this.compareSlavesToMasterScan.bind(this))
94
         .then(function() {
UNCOV
95
            if (self._opts.scanForExtra || self._opts.deleteExtra) {
×
96
               return _.reduce(self._slaves, function(prev, slaveDef) {
×
97
                  return prev.then(self.scanSlaveForExtraItems.bind(self, slaveDef));
×
98
               }, Q.when());
99
            }
100
         })
101
         .catch(function(err) {
102
            self._abortScanning = true;
×
UNCOV
103
            console.error('Encountered an error while comparing tables', err, err.stack);
×
104
         })
105
         .then(this._outputStats.bind(this))
106
         .then(function() {
UNCOV
107
            if (self._abortScanning) {
×
108
               process.exit(1); // eslint-disable-line no-process-exit
×
109
            }
110
         });
111
   },
112

113
   trackScanProgress: function(enabled, approxItems, f) {
114
      var tick = approxItems > 100 ? 1 : 5,
×
115
          currentPercent = 0,
×
UNCOV
116
          nextProgressPercent = 0,
×
117
          ret;
118

UNCOV
119
      ret = f(function(currentItems) {
×
120
         if (!enabled) {
×
121
            return;
×
122
         }
123

UNCOV
124
         currentPercent = Math.round(currentItems / approxItems * 100.0);
×
125

UNCOV
126
         if (currentPercent < 100 && currentPercent >= (nextProgressPercent + tick)) {
×
UNCOV
127
            readline.clearLine(process.stdout);
×
UNCOV
128
            readline.cursorTo(process.stdout, 0);
×
129
            process.stdout.write('(' + currentPercent + '%)');
×
130

UNCOV
131
            nextProgressPercent = currentPercent >= (nextProgressPercent + tick) ?
×
132
               (nextProgressPercent + tick) :
133
               (currentPercent + tick);
134
         } else {
135
            process.stdout.write('.');
×
136
         }
137
      });
138

UNCOV
139
      return ret.then(function() {
×
UNCOV
140
         readline.clearLine(process.stdout);
×
UNCOV
141
         readline.cursorTo(process.stdout, 0);
×
UNCOV
142
         process.stdout.write('(100%)\n');
×
143
      });
144
   },
145

146
   /**
147
    * Scans the master table, and for each batch of items that the master table has in it,
148
    * queries the slaves to see if they contain the exact same items. If an item is
149
    * missing from a slave table, or an item in a slave table differs, this will call the
150
    * appropriate function to handle that event.
151
    *
152
    * @see Synchronizer#slaveMissingItem
153
    * @see Synchronizer#slaveItemDiffers
154
    * @see Synchronizer#slaveItemMatchesMaster
155
    */
156
   compareSlavesToMasterScan: function() {
157
      var self = this;
×
158

159
      // TODO: BatchGetItem will allow up to 100 items per request, but you may have to
160
      // iterate on UnprocessedKeys if the response would be too large.
161
      // See http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html
162
      return this.trackScanProgress(!self._opts.verbose, self._master.approxItems, function(trackProgress) {
×
163
         return self.scanTable(self._master, self._opts.batchReadLimit, function(batch, counter) {
×
UNCOV
164
            var keys = _.map(batch, self._makeKeyFromItem.bind(self));
×
165

UNCOV
166
            return Q.all(_.map(self._slaves, self._batchGetItems.bind(self, keys, false)))
×
167
               .then(function(slaveBatches) {
168
                  return Q.all(_.map(slaveBatches, function(slaveBatch, i) {
×
UNCOV
169
                     return self._compareBatch(batch, self._slaves[i], slaveBatch);
×
170
                  }));
171
               })
172
               .then(function() {
UNCOV
173
                  if (self._opts.verbose) {
×
174
                     console.log(
×
175
                        'Status: have compared %d of approximately %d items from the master table to its slaves',
176
                        counter.get() + batch.length,
177
                        self._master.approxItems
178
                     );
179
                  } else {
UNCOV
180
                     if (counter.get() === 0) {
×
181
                        console.log(
×
182
                           'Status: Comparing approximately %d items from the master table to its slaves',
183
                           self._master.approxItems
184
                        );
185
                     }
186

UNCOV
187
                     trackProgress(counter.get() + batch.length);
×
188
                  }
189
               });
190
         });
191
      });
192
   },
193

194
   /**
195
    * Scans the provided slave table comparing every item that it contains to the master
196
    * table. If items are found in the slave that do not exist in the master table,
197
    * `slaveExtraItem` will be called to handle the extra item (possibly deleting it from
198
    * the slave).
199
    *
200
    * @see Synchronizer#slaveExtraItem
201
    */
202
   scanSlaveForExtraItems: function(slaveDef) {
UNCOV
203
      var self = this;
×
204

205
      console.log('\nStarting to scan slave %s for extra items', slaveDef.id);
×
206

207
      // Remember that in this function we are only comparing keys (we pass `true` to both
208
      // `scanTable` and `_batchGetItems`) because we are simply looking for items on the
209
      // slave that do not exist in the master.
210

211
      return this.trackScanProgress(!self._opts.verbose, slaveDef.approxItems, function(trackProgress) {
×
UNCOV
212
         return self.scanTable(slaveDef, self._opts.batchReadLimit, function(slaveBatch, counter) {
×
UNCOV
213
            return self._batchGetItems(slaveBatch, true, self._master)
×
214
               .then(self._compareForExtraItems.bind(self, slaveDef, slaveBatch))
215
               .then(function() {
UNCOV
216
                  if (self._opts.verbose) {
×
UNCOV
217
                     console.log(
×
218
                        'Status: have compared %d of approximately %d items from the slave table to the master',
219
                        counter.get() + slaveBatch.length,
220
                        slaveDef.approxItems
221
                     );
222
                  } else {
223
                     // Simple . increment w/% logging
UNCOV
224
                     if (counter.get() === 0) {
×
225
                        console.log(
×
226
                           'Status: Comparing approximately %d items from the slave table to the master',
227
                           slaveDef.approxItems
228
                        );
229
                     }
230

UNCOV
231
                     trackProgress(counter.get() + slaveBatch.length);
×
232
                  }
233
               });
234
         }, true);
235
      });
236
   },
237

238
   _compareForExtraItems: function(slaveDef, slaveBatch, masterBatch) {
239
      var self = this;
×
240

UNCOV
241
      return Q.all(_.map(slaveBatch, function(slaveKey) {
×
UNCOV
242
         var masterKey = _.findWhere(masterBatch, slaveKey);
×
243

UNCOV
244
         if (!masterKey) {
×
UNCOV
245
            self._stats[slaveDef.id].extra = self._stats[slaveDef.id].extra + 1;
×
UNCOV
246
            return self.slaveExtraItem(slaveKey, slaveDef);
×
247
         }
248
      }));
249
   },
250

251
   /**
252
    * Comparator that determines if a master item and a slave item match or differ.
253
    * Default implementation takes into account the attributes that are to be ignored
254
    * during comparison, leaving them out of the compared objects.
255
    *
256
    * In some cases you may want to override this to provide custom comparison. See the
257
    * README for one example of where you might do this.
258
    *
259
    * @param masterItem {object} the item from the master table
260
    * @param slaveItem {object} the item from the slave table that will be compared to the
261
    * master item
262
    * @returns {boolean} **true** if the items are **different**, false if they are the
263
    * same
264
    */
265
   isItemDifferent: function(masterItem, slaveItem) {
UNCOV
266
      var atts = _.union(this._opts.ignoreAtts, REPLICATION_FIELDS);
×
267

UNCOV
268
      return !_.isEqual(_.omit(masterItem, atts), _.omit(slaveItem, atts));
×
269
   },
270

271
   /**
272
    * This method is called each time the comparison operation finds an item that exists
273
    * on the master and does not exist on the slave. In its default implementation it will
274
    * write the item to the slave if the `writeMissing` option is enabled.
275
    *
276
    * @param masterItem {object} the item from the master table
277
    * @param slaveDef {object} the table definition for the slave table
278
    * @param key {object} the item's DynamoDB key (hash and range if applicable)
279
    * @returns {Promise} that is fulfilled when it is done processing the missing item (it
280
    * is not necessary to return a promise if you are not doing any processing)
281
    */
282
   slaveMissingItem: function(masterItem, slaveDef, key) {
UNCOV
283
      if (this._opts.verbose) {
×
UNCOV
284
         console.warn('%s is missing item present in master table: %j', slaveDef.id, key);
×
285
      }
286

UNCOV
287
      if (this._opts.writeMissing) {
×
UNCOV
288
         return this.writeItem(masterItem, slaveDef);
×
289
      }
290
   },
291

292
   /**
293
    * This method is called each time the comparison operation finds an item that exists
294
    * on the master and slave, but the slave's version of the item is different from the
295
    * master's. In its default implementation it will write the item to the slave if the
296
    * `writeDiffering` option is enabled.
297
    *
298
    * @param masterItem {object} the item from the master table
299
    * @param slaveItem {object} the item from the slave table
300
    * @param slaveDef {object} the table definition for the slave table
301
    * @param key {object} the item's DynamoDB key (hash and range if applicable)
302
    * @returns {Promise} that is fulfilled when it is done processing the differing item
303
    * (it is not necessary to return a promise if you are not doing any processing)
304
    */
305
   slaveItemDiffers: function(masterItem, slaveItem, slaveDef, key) {
UNCOV
306
      if (this._opts.verbose) {
×
307
         console.warn('Item in %s differs from same item in master table: %j', slaveDef.id, key);
×
308
      }
309

310
      // TODO: output the differences
311
      // console.log('master', masterItem);
312
      // console.log('slave', slaveItem);
UNCOV
313
      if (this._opts.writeDiffering) {
×
UNCOV
314
         return this.writeItem(masterItem, slaveDef);
×
315
      }
316
   },
317

318
   /**
319
    * This method is called each time the comparison operation finds an item that exists
320
    * on the master and slave, and both items match each other. In its default
321
    * implementation it does nothing.
322
    *
323
    * @param masterItem {object} the item from the master table
324
    * @param slaveItem {object} the item from the slave table
325
    * @param slaveDef {object} the table definition for the slave table
326
    * @param key {object} the item's DynamoDB key (hash and range if applicable)
327
    * @returns {Promise} that is fulfilled when it is done processing the items (it is not
328
    * necessary to return a promise if you are not doing any processing)
329
    */
330
   slaveItemMatchesMaster: function() {
331
      // defaults to no-op, can be overridden
332
   },
333

334
   /**
335
    * This method is called each time the comparison operation finds an item that exists
336
    * on the slave that does not exist on the master. If the `deleteExtra` option is set
337
    * to true, this will delete the item from the slave.
338
    *
339
    * @param key {object} the DynamoDB key (hash and range if applicable) of the item that
340
    * exists in the slave table but not in the master table
341
    * @param slaveDef {object} the table definition for the slave table
342
    * @returns {Promise} that is fulfilled when it is done processing the items (it is not
343
    * necessary to return a promise if you are not doing any processing)
344
    */
345
   slaveExtraItem: function(key, slaveDef) {
UNCOV
346
      if (this._opts.verbose) {
×
UNCOV
347
         console.warn('Slave %s had an item that was not in the master table: %j', slaveDef.id, key);
×
348
      }
349

UNCOV
350
      if (this._opts.deleteExtra) {
×
UNCOV
351
         return this.deleteItem(key, slaveDef);
×
352
      }
353
   },
354

355
   /**
356
    * Writes an item to a table.
357
    *
358
    * @param item {object} the item to write
359
    * @param tableDef {object} the table definition for the table to write to
360
    */
361
   writeItem: function(item, tableDef) {
UNCOV
362
      if (this._opts.verbose) {
×
UNCOV
363
         console.log('Writing item to %s: %j', tableDef.id, this._makeKeyFromItem(item));
×
364
      }
365

UNCOV
366
      return Q.ninvoke(tableDef.docs, 'put', { TableName: tableDef.name, Item: _.omit(item, REPLICATION_FIELDS) });
×
367
   },
368

369
   /**
370
    * Deletes an item from a table.
371
    *
372
    * @param key {object} the key of the item to delete - should contain hash and sort
373
    * keys if the table uses both, otherwise just the hash key
374
    * @param tableDef {object} the table definition for the table to delete from
375
    */
376
   deleteItem: function(key, tableDef) {
UNCOV
377
      if (this._opts.verbose) {
×
UNCOV
378
         console.log('Deleting item from %s: %j', tableDef.id, key);
×
379
      }
380

UNCOV
381
      return Q.ninvoke(tableDef.docs, 'delete', { TableName: tableDef.name, Key: key });
×
382
   },
383

384
   /**
385
    * Scans a table, iterating over every item in the table. The number of items returned
386
    * by each call to DynamoDB's scan operation can be controlled by the `scanLimit`
387
    * option. Otherwise, will allow DynamoDB to dictate the number of items returned in
388
    * each call to `scan`.
389
    *
390
    * Each time scan is called, the items that are returned are "chunked" into batches
391
    * matching `callbackBatchSize`. Then the `callback` is invoked, being passed the
392
    * current batch (an array of items less than or equal to the `callbackBatchSize`) and
393
    * a counter that is tracking the number of items that have been processed prior to
394
    * this invocation of the callback.
395
    *
396
    * @param tableDef {object} table definition for the table to scan
397
    * @param callbackBatchSize {integer} how many items to pass the callback in each batch
398
    * @param callback {function} the callback to invoke for each batch of items (see
399
    * above for details on the args passed to the callback)
400
    * @param [keysOnly] {boolean} if true, returns only the keys of the items
401
    * @see Counter
402
    */
403
   scanTable: function(tableDef, callbackBatchSize, callback, keysOnly) {
UNCOV
404
      var self = this,
×
405
          counter = new Counter();
×
406

UNCOV
407
      if (this._opts.parallel) {
×
UNCOV
408
         return Q.all(_.times(this._opts.parallel, function(i) {
×
UNCOV
409
            return self._doScan(tableDef, keysOnly, callbackBatchSize, callback, i, counter)
×
410
               .catch(function(err) {
411
                  self._abortScanning = true;
×
UNCOV
412
                  throw err;
×
413
               });
414
         }));
415
      }
416

417
      return this._doScan(tableDef, keysOnly, callbackBatchSize, callback);
×
418
   },
419

420
   _doScan: function(tableDef, keysOnly, callbackBatchSize, callback, segment, counter) { // eslint-disable-line max-params
421
      var self = this,
×
UNCOV
422
          lastKey = this._opts.startingKey,
×
UNCOV
423
          deferred = Q.defer();
×
424

UNCOV
425
      counter = counter || new Counter();
×
426

427
      console.log('Scanning %s', tableDef.id);
×
428

429
      function loopOnce() {
430
         var params = { TableName: tableDef.name, ExclusiveStartKey: lastKey };
×
431

432
         if (keysOnly) {
×
UNCOV
433
            params.AttributesToGet = _.values(tableDef.schema);
×
434
         }
435

436
         if (self._abortScanning) {
×
437
            console.error('Segment %d is stopping because of an error in another segment scanner', segment);
×
UNCOV
438
            return deferred.resolve(counter.get());
×
439
         }
440

441
         if (segment !== undefined) {
×
UNCOV
442
            params.Segment = segment;
×
UNCOV
443
            params.TotalSegments = self._opts.parallel;
×
444
         }
445

UNCOV
446
         if (self._opts.scanLimit !== undefined) {
×
UNCOV
447
            params.Limit = self._opts.scanLimit;
×
448
         }
449

450
         // trace-level logging: console.log('scan', params);
UNCOV
451
         return Q.ninvoke(tableDef.docs, 'scan', params)
×
452
            .then(function(resp) {
453
               // trace-level logging: console.log('resp', _.omit(resp, 'Items'));
454
               return _.chain(resp.Items)
×
455
                  .groupBy(function(item, i) {
456
                     return Math.floor(i / callbackBatchSize);
×
457
                  })
458
                  .values()
459
                  .reduce(function(prev, batch) {
UNCOV
460
                     return prev
×
461
                        .then(function() {
UNCOV
462
                           return callback(batch, counter);
×
463
                        })
464
                        .then(function() {
465
                           counter.increment(batch.length);
×
466
                        });
467
                  }, Q.when())
468
                  .value()
469
                  .then(function() {
470
                     if (resp.LastEvaluatedKey) {
×
UNCOV
471
                        lastKey = resp.LastEvaluatedKey;
×
UNCOV
472
                        Q.nextTick(loopOnce);
×
473
                     } else {
UNCOV
474
                        if (segment !== undefined) {
×
UNCOV
475
                           if (self._opts.verbose) {
×
UNCOV
476
                              console.log('Segment %d of %d has completed', segment, self._opts.parallel);
×
477
                           }
478
                        }
UNCOV
479
                        deferred.resolve(counter.get());
×
480
                     }
481
                  });
482
            })
483
            .catch(deferred.reject);
484
      }
485

UNCOV
486
      Q.nextTick(function() {
×
UNCOV
487
         loopOnce().catch(deferred.reject);
×
488
      });
489

UNCOV
490
      return deferred.promise;
×
491
   },
492

493
   _makeKeyFromItem: function(item) {
UNCOV
494
      return _.pick(item, _.values(this._master.schema));
×
495
   },
496

497
   /**
498
    * Describes the master and all slave tables, comparing their descriptions to make sure
499
    * that they have the same key schema. The comparison will not work on tables that have
500
    * different key schemas.
501
    *
502
    * Also updates the table definition that we hold in memory to have an object
503
    * containing information about the key schema. The object will look like this:
504
    *
505
    * ```
506
    * {
507
    *    hash: 'NameOfHashKeyField',
508
    *    range: 'NameOfRangeKeyFieldIfThereIsOne', // undefined if there is not one
509
    * }
510
    * ```
511
    *
512
    * Additionally updates the table definition in memory to have an `approxItems` field
513
    * that has the approximate number of items that DynamoDB reports for the table.
514
    */
515
   _compareTableDescriptions: function() {
516
      var def = Q.defer(),
×
517
          describeMaster = this._describeTable(this._master),
×
NEW
518
          slaveCreds = this._opts.slaveCredentials,
×
NEW
519
          localTarget = this._opts.localhostTarget,
×
NEW
UNCOV
520
          describeSlaves = Q.all(_.map(this._slaves, _.partial(this._describeTable.bind(this), _, slaveCreds, localTarget)));
×
521

522
      function logDescription(title, tableDef, tableDesc) {
523
         console.log('%s table %s', title, tableDef.id);
×
524
         console.log('Approx. item count:', tableDesc.ItemCount);
×
UNCOV
525
         console.log('Key schema:', tableDef.schema);
×
526
         console.log();
×
527
      }
528

529
      function addTableInfoToDefinition(tableDef, desc) {
UNCOV
530
         var hash = _.findWhere(desc.KeySchema, { KeyType: 'HASH' }),
×
531
             range = _.findWhere(desc.KeySchema, { KeyType: 'RANGE' });
×
532

UNCOV
533
         tableDef.schema = { hash: hash.AttributeName };
×
534
         if (range) {
×
UNCOV
535
            tableDef.schema.range = range.AttributeName;
×
536
         }
537

538
         tableDef.approxItems = desc.ItemCount;
×
539
      }
540

541
      Q.all([ describeMaster, describeSlaves ])
×
542
         .spread(function(master, slaves) {
543
            var unlikeTables = [];
×
544

545
            addTableInfoToDefinition(this._master, master);
×
546
            logDescription('Master', this._master, master);
×
547

UNCOV
548
            _.each(slaves, function(slaveDesc, i) {
×
UNCOV
549
               addTableInfoToDefinition(this._slaves[i], slaveDesc);
×
550
               logDescription('Slave[' + (i + 1) + ']', this._slaves[i], slaveDesc);
×
551

UNCOV
552
               if (!_.isEqual(master.KeySchema, slaveDesc.KeySchema)) {
×
553
                  unlikeTables.push(this._slaves[i].id);
×
554
               }
555
            }.bind(this));
556

UNCOV
557
            return unlikeTables;
×
558
         }.bind(this))
559
         .then(function(unlikeTables) {
UNCOV
560
            if (!_.isEmpty(unlikeTables)) {
×
UNCOV
561
               console.error('The following slave tables have key schemas that do not match the master table:', unlikeTables);
×
UNCOV
562
               return def.reject(unlikeTables);
×
563
            }
564

UNCOV
565
            def.resolve();
×
566
         })
567
         .catch(def.reject)
568
         .done();
569

570
      return def.promise;
×
571
   },
572

573
   _describeTable: function(tableDef, creds, localhostTarget) {
NEW
574
      var options = { region: tableDef.region },
×
575
          dyn;
576

NEW
577
      if (localhostTarget) {
×
NEW
UNCOV
578
         options.endpoint = localhostTarget;
×
579
      } else {
NEW
UNCOV
580
         options.credentials = creds || AWS.config.credentials;
×
581
      }
582

NEW
UNCOV
583
      dyn = new AWS.DynamoDB(options);
×
584

585
      return Q.ninvoke(dyn, 'describeTable', { TableName: tableDef.name })
×
586
         .then(function(resp) {
UNCOV
587
            return resp.Table;
×
588
         });
589
   },
590

591
   _makeDocClient: function(def, creds) {
UNCOV
592
      return new AWS.DynamoDB.DocumentClient({
×
593
         region: def.region,
594
         credentials: creds || AWS.config.credentials,
×
595
         maxRetries: this._opts.maxRetries,
596
         retryDelayOptions: {
597
            base: this._opts.retryDelayBase,
598
         },
599
      });
600
   },
601

602
   _makeLocalDocClient: function(def, localhostTarget) {
NEW
UNCOV
603
      return new AWS.DynamoDB.DocumentClient({
×
604
         region: def.region,
605
         endpoint: localhostTarget,
606
         maxRetries: this._opts.maxRetries,
607
         retryDelayOptions: {
608
            base: this._opts.retryDelayBase,
609
         },
610
      });
611
   },
612

613
   _outputStats: function() {
614
      console.log('\nSynchronization completed. Stats:');
×
615

616
      _.each(this._slaves, function(slave) {
×
UNCOV
617
         var stats = this._stats[slave.id];
×
618

619
         console.log('\n%s', slave.id);
×
UNCOV
620
         console.log('Had %d items that were the same as the master', stats.sameAs);
×
UNCOV
621
         if (this._opts.deleteExtra || this._opts.scanForExtra) {
×
622
            console.log('Had %d items more than master', stats.extra);
×
623
         } else {
624
            console.log('(We did not scan the slave to find if it had "extra" items that the master does not have)');
×
625
         }
UNCOV
626
         console.log('Had %d items that were different from the master', stats.differing);
×
UNCOV
627
         console.log('Was missing %d items that the master had', stats.missing);
×
628
      }.bind(this));
629

UNCOV
630
      return this._stats;
×
631
   },
632

633
   _batchGetItems: function(keys, keysOnly, tableDef) {
UNCOV
634
      var params = { RequestItems: {} };
×
635

636
      params.RequestItems[tableDef.name] = { Keys: keys };
×
637

UNCOV
638
      if (keysOnly) {
×
639
         params.RequestItems[tableDef.name].AttributesToGet = _.values(tableDef.schema);
×
640
      }
641

UNCOV
642
      return Q.ninvoke(tableDef.docs, 'batchGet', params)
×
643
         .then(function(resp) {
644
            if (!_.isEmpty(resp.UnprocessedKeys)) {
×
645
               throw new Error('Unprocessed keys in batchGet'); // TODO: recursive querying as needed
×
646
            }
647

648
            return resp.Responses[tableDef.name];
×
649
         });
650
   },
651

652
   _compareBatch: function(masterBatch, slaveDef, slaveBatch) {
UNCOV
653
      var self = this;
×
654

UNCOV
655
      if (this._opts.verbose) {
×
UNCOV
656
         console.log('Comparing batch of %d from master to %d from slave %s', masterBatch.length, slaveBatch.length, slaveDef.id);
×
657
      }
658

UNCOV
659
      return Q.all(_.map(masterBatch, function(masterItem) {
×
UNCOV
660
         var key = self._makeKeyFromItem(masterItem),
×
UNCOV
661
             slaveItem = _.findWhere(slaveBatch, key);
×
662

UNCOV
663
         if (!slaveItem) {
×
UNCOV
664
            self._stats[slaveDef.id].missing = self._stats[slaveDef.id].missing + 1;
×
UNCOV
665
            return self.slaveMissingItem(masterItem, slaveDef, key);
×
UNCOV
666
         } else if (self.isItemDifferent(masterItem, slaveItem)) {
×
UNCOV
667
            self._stats[slaveDef.id].differing = self._stats[slaveDef.id].differing + 1;
×
UNCOV
668
            return self.slaveItemDiffers(masterItem, slaveItem, slaveDef, key);
×
669
         }
670

UNCOV
671
         self._stats[slaveDef.id].sameAs = self._stats[slaveDef.id].sameAs + 1;
×
UNCOV
672
         return self.slaveItemMatchesMaster(masterItem, slaveItem, slaveDef, key);
×
673
      }));
674
   },
675

676
});
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc