• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

silvermine / dynamodb-table-sync / 9822079146

06 Jul 2024 09:03PM UTC coverage: 0.0%. Remained the same
9822079146

Pull #26

github

web-flow
Merge 95d4595de into 80edaf9a1
Pull Request #26: feat: Add support for synchronizing to a local slave instance

0 of 132 branches covered (0.0%)

Branch coverage included in aggregate %.

0 of 7 new or added lines in 2 files covered. (0.0%)

1 existing line in 1 file now uncovered.

0 of 278 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/Synchronizer.js
1
'use strict';
2

3
var readline = require('readline'),
×
4
    _ = require('underscore'),
×
5
    Q = require('q'),
×
6
    AWS = require('aws-sdk'),
×
7
    Class = require('class.extend'),
×
8
    Counter = require('./lib/Counter'),
×
9
    REPLICATION_FIELDS = [ 'aws:rep:updateregion', 'aws:rep:updatetime', 'aws:rep:deleting' ];
×
10

11
module.exports = Class.extend({
×
12

13
   /**
14
    * The possible options supplied in `opts` are:
15
    *
16
    * ```
17
    * {
18
    *    writeMissing: false,
19
    *    writeDiffering: false,
20
    *    scanForExtra: false,
21
    *    deleteExtra: false,
22
    *    ignoreAtts: [ 'attributes', 'to', 'ignore' ],
23
    *    startingKey: { hashKey: 'abc', rangeKey: 'xyz' },
24
    *    scanLimit: 100,
25
    *    batchReadLimit: 50,
26
    *    maxRetries: 10,
27
    *    retryDelayBase: 50,
28
    *    parallel: 4,
29
    *    slaveCredentials: AWSCredentialsProvider(...),
30
    * }
31
    * ```
32
    *
33
    * @class Synchronizer
34
    * @param master {object} table definition { region: 'region', name: 'table-name' }
35
    * @param slaves {object[]} array of table definitions
36
    * @param opts {object} options to determine how this class operates
37
    */
38
   init: function(master, slaves, opts) {
39
      this._opts = _.extend({ batchReadLimit: 50, maxRetries: 10, retryDelayBase: 50 }, opts);
×
40

41
      this._master = _.extend({}, master, { id: (master.region + ':' + master.name), docs: this._makeDocClient(master) });
×
42

43
      this._slaves = _.map(slaves, function(def) {
×
NEW
44
         var client = this._makeDocClient(def, opts.slaveCredentials, opts.slaveEndpoint);
×
45

NEW
46
         return _.extend({}, def, { id: (def.region + ':' + def.name), docs: client });
×
47
      }.bind(this));
48

49
      this._abortScanning = false;
×
50

51
      this._stats = _.reduce(this._slaves, function(stats, slave) {
×
52
         stats[slave.id] = { extra: 0, sameAs: 0, differing: 0, missing: 0 };
×
53
         return stats;
×
54
      }, {});
55
   },
56

57
   /**
58
    * Main entry function that runs a complete synchronization (regardless of whether that
59
    * is a "dry run" or an actual sync job that will write/delete items).
60
    *
61
    * The statistics that are returned contain stats for each slave table, comparing it to
62
    * the master, and are in the format:
63
    *
64
    * ```
65
    * {
66
    *    'region-1:table': {
67
    *       extra: 0, // number of items only in the slave, not in the master
68
    *       sameAs: 0, // number of items that matched
69
    *       differing: 0, // number of items the slave had, but that differed
70
    *       missing: 0, // number of items the slave was missing
71
    *    },
72
    *    'region-2:table': {
73
    *       extra: 0,
74
    *       sameAs: 0,
75
    *       differing: 0,
76
    *       missing: 0,
77
    *    },
78
    * }
79
    * ```
80
    *
81
    * @return {object} statistics on what the operation completed
82
    */
83
   run: function() {
84
      var self = this;
×
85

86
      return this._compareTableDescriptions()
×
87
         .then(this.compareSlavesToMasterScan.bind(this))
88
         .then(function() {
89
            if (self._opts.scanForExtra || self._opts.deleteExtra) {
×
90
               return _.reduce(self._slaves, function(prev, slaveDef) {
×
91
                  return prev.then(self.scanSlaveForExtraItems.bind(self, slaveDef));
×
92
               }, Q.when());
93
            }
94
         })
95
         .catch(function(err) {
96
            self._abortScanning = true;
×
97
            console.error('Encountered an error while comparing tables', err, err.stack);
×
98
         })
99
         .then(this._outputStats.bind(this))
100
         .then(function() {
101
            if (self._abortScanning) {
×
102
               process.exit(1); // eslint-disable-line no-process-exit
×
103
            }
104
         });
105
   },
106

107
   trackScanProgress: function(enabled, approxItems, f) {
108
      var tick = approxItems > 100 ? 1 : 5,
×
109
          currentPercent = 0,
×
110
          nextProgressPercent = 0,
×
111
          ret;
112

113
      ret = f(function(currentItems) {
×
114
         if (!enabled) {
×
115
            return;
×
116
         }
117

118
         currentPercent = Math.round(currentItems / approxItems * 100.0);
×
119

120
         if (currentPercent < 100 && currentPercent >= (nextProgressPercent + tick)) {
×
121
            readline.clearLine(process.stdout);
×
122
            readline.cursorTo(process.stdout, 0);
×
123
            process.stdout.write('(' + currentPercent + '%)');
×
124

125
            nextProgressPercent = currentPercent >= (nextProgressPercent + tick) ?
×
126
               (nextProgressPercent + tick) :
127
               (currentPercent + tick);
128
         } else {
129
            process.stdout.write('.');
×
130
         }
131
      });
132

133
      return ret.then(function() {
×
134
         readline.clearLine(process.stdout);
×
135
         readline.cursorTo(process.stdout, 0);
×
136
         process.stdout.write('(100%)\n');
×
137
      });
138
   },
139

140
   /**
141
    * Scans the master table, and for each batch of items that the master table has in it,
142
    * queries the slaves to see if they contain the exact same items. If an item is
143
    * missing from a slave table, or an item in a slave table differs, this will call the
144
    * appropriate function to handle that event.
145
    *
146
    * @see Synchronizer#slaveMissingItem
147
    * @see Synchronizer#slaveItemDiffers
148
    * @see Synchronizer#slaveItemMatchesMaster
149
    */
150
   compareSlavesToMasterScan: function() {
151
      var self = this;
×
152

153
      // TODO: BatchGetItem will allow up to 100 items per request, but you may have to
154
      // iterate on UnprocessedKeys if the response would be too large.
155
      // See http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html
156
      return this.trackScanProgress(!self._opts.verbose, self._master.approxItems, function(trackProgress) {
×
157
         return self.scanTable(self._master, self._opts.batchReadLimit, function(batch, counter) {
×
158
            var keys = _.map(batch, self._makeKeyFromItem.bind(self));
×
159

160
            return Q.all(_.map(self._slaves, self._batchGetItems.bind(self, keys, false)))
×
161
               .then(function(slaveBatches) {
162
                  return Q.all(_.map(slaveBatches, function(slaveBatch, i) {
×
163
                     return self._compareBatch(batch, self._slaves[i], slaveBatch);
×
164
                  }));
165
               })
166
               .then(function() {
167
                  if (self._opts.verbose) {
×
168
                     console.log(
×
169
                        'Status: have compared %d of approximately %d items from the master table to its slaves',
170
                        counter.get() + batch.length,
171
                        self._master.approxItems
172
                     );
173
                  } else {
174
                     if (counter.get() === 0) {
×
175
                        console.log(
×
176
                           'Status: Comparing approximately %d items from the master table to its slaves',
177
                           self._master.approxItems
178
                        );
179
                     }
180

181
                     trackProgress(counter.get() + batch.length);
×
182
                  }
183
               });
184
         });
185
      });
186
   },
187

188
   /**
189
    * Scans the provided slave table comparing every item that it contains to the master
190
    * table. If items are found in the slave that do not exist in the master table,
191
    * `slaveExtraItem` will be called to handle the extra item (possibly deleting it from
192
    * the slave).
193
    *
194
    * @see Synchronizer#slaveExtraItem
195
    */
196
   scanSlaveForExtraItems: function(slaveDef) {
197
      var self = this;
×
198

199
      console.log('\nStarting to scan slave %s for extra items', slaveDef.id);
×
200

201
      // Remember that in this function we are only comparing keys (we pass `true` to both
202
      // `scanTable` and `_batchGetItems`) because we are simply looking for items on the
203
      // slave that do not exist in the master.
204

205
      return this.trackScanProgress(!self._opts.verbose, slaveDef.approxItems, function(trackProgress) {
×
206
         return self.scanTable(slaveDef, self._opts.batchReadLimit, function(slaveBatch, counter) {
×
207
            return self._batchGetItems(slaveBatch, true, self._master)
×
208
               .then(self._compareForExtraItems.bind(self, slaveDef, slaveBatch))
209
               .then(function() {
210
                  if (self._opts.verbose) {
×
211
                     console.log(
×
212
                        'Status: have compared %d of approximately %d items from the slave table to the master',
213
                        counter.get() + slaveBatch.length,
214
                        slaveDef.approxItems
215
                     );
216
                  } else {
217
                     // Simple . increment w/% logging
218
                     if (counter.get() === 0) {
×
219
                        console.log(
×
220
                           'Status: Comparing approximately %d items from the slave table to the master',
221
                           slaveDef.approxItems
222
                        );
223
                     }
224

225
                     trackProgress(counter.get() + slaveBatch.length);
×
226
                  }
227
               });
228
         }, true);
229
      });
230
   },
231

232
   _compareForExtraItems: function(slaveDef, slaveBatch, masterBatch) {
233
      var self = this;
×
234

235
      return Q.all(_.map(slaveBatch, function(slaveKey) {
×
236
         var masterKey = _.findWhere(masterBatch, slaveKey);
×
237

238
         if (!masterKey) {
×
239
            self._stats[slaveDef.id].extra = self._stats[slaveDef.id].extra + 1;
×
240
            return self.slaveExtraItem(slaveKey, slaveDef);
×
241
         }
242
      }));
243
   },
244

245
   /**
246
    * Comparator that determines if a master item and a slave item match or differ.
247
    * Default implementation takes into account the attributes that are to be ignored
248
    * during comparison, leaving them out of the compared objects.
249
    *
250
    * In some cases you may want to override this to provide custom comparison. See the
251
    * README for one example of where you might do this.
252
    *
253
    * @param masterItem {object} the item from the master table
254
    * @param slaveItem {object} the item from the slave table that will be compared to the
255
    * master item
256
    * @returns {boolean} **true** if the items are **different**, false if they are the
257
    * same
258
    */
259
   isItemDifferent: function(masterItem, slaveItem) {
260
      var atts = _.union(this._opts.ignoreAtts, REPLICATION_FIELDS);
×
261

262
      return !_.isEqual(_.omit(masterItem, atts), _.omit(slaveItem, atts));
×
263
   },
264

265
   /**
266
    * This method is called each time the comparison operation finds an item that exists
267
    * on the master and does not exist on the slave. In its default implementation it will
268
    * write the item to the slave if the `writeMissing` option is enabled.
269
    *
270
    * @param masterItem {object} the item from the master table
271
    * @param slaveDef {object} the table definition for the slave table
272
    * @param key {object} the item's DynamoDB key (hash and range if applicable)
273
    * @returns {Promise} that is fulfilled when it is done processing the missing item (it
274
    * is not necessary to return a promise if you are not doing any processing)
275
    */
276
   slaveMissingItem: function(masterItem, slaveDef, key) {
277
      if (this._opts.verbose) {
×
278
         console.warn('%s is missing item present in master table: %j', slaveDef.id, key);
×
279
      }
280

281
      if (this._opts.writeMissing) {
×
282
         return this.writeItem(masterItem, slaveDef);
×
283
      }
284
   },
285

286
   /**
287
    * This method is called each time the comparison operation finds an item that exists
288
    * on the master and slave, but the slave's version of the item is different from the
289
    * master's. In its default implementation it will write the item to the slave if the
290
    * `writeDiffering` option is enabled.
291
    *
292
    * @param masterItem {object} the item from the master table
293
    * @param slaveItem {object} the item from the slave table
294
    * @param slaveDef {object} the table definition for the slave table
295
    * @param key {object} the item's DynamoDB key (hash and range if applicable)
296
    * @returns {Promise} that is fulfilled when it is done processing the differing item
297
    * (it is not necessary to return a promise if you are not doing any processing)
298
    */
299
   slaveItemDiffers: function(masterItem, slaveItem, slaveDef, key) {
300
      if (this._opts.verbose) {
×
301
         console.warn('Item in %s differs from same item in master table: %j', slaveDef.id, key);
×
302
      }
303

304
      // TODO: output the differences
305
      // console.log('master', masterItem);
306
      // console.log('slave', slaveItem);
307
      if (this._opts.writeDiffering) {
×
308
         return this.writeItem(masterItem, slaveDef);
×
309
      }
310
   },
311

312
   /**
313
    * This method is called each time the comparison operation finds an item that exists
314
    * on the master and slave, and both items match each other. In its default
315
    * implementation it does nothing.
316
    *
317
    * @param masterItem {object} the item from the master table
318
    * @param slaveItem {object} the item from the slave table
319
    * @param slaveDef {object} the table definition for the slave table
320
    * @param key {object} the item's DynamoDB key (hash and range if applicable)
321
    * @returns {Promise} that is fulfilled when it is done processing the items (it is not
322
    * necessary to return a promise if you are not doing any processing)
323
    */
324
   slaveItemMatchesMaster: function() {
325
      // defaults to no-op, can be overridden
326
   },
327

328
   /**
329
    * This method is called each time the comparison operation finds an item that exists
330
    * on the slave that does not exist on the master. If the `deleteExtra` option is set
331
    * to true, this will delete the item from the slave.
332
    *
333
    * @param key {object} the DynamoDB key (hash and range if applicable) of the item that
334
    * exists in the slave table but not in the master table
335
    * @param slaveDef {object} the table definition for the slave table
336
    * @returns {Promise} that is fulfilled when it is done processing the items (it is not
337
    * necessary to return a promise if you are not doing any processing)
338
    */
339
   slaveExtraItem: function(key, slaveDef) {
340
      if (this._opts.verbose) {
×
341
         console.warn('Slave %s had an item that was not in the master table: %j', slaveDef.id, key);
×
342
      }
343

344
      if (this._opts.deleteExtra) {
×
345
         return this.deleteItem(key, slaveDef);
×
346
      }
347
   },
348

349
   /**
350
    * Writes an item to a table.
351
    *
352
    * @param item {object} the item to write
353
    * @param tableDef {object} the table definition for the table to write to
354
    */
355
   writeItem: function(item, tableDef) {
356
      if (this._opts.verbose) {
×
357
         console.log('Writing item to %s: %j', tableDef.id, this._makeKeyFromItem(item));
×
358
      }
359

360
      return Q.ninvoke(tableDef.docs, 'put', { TableName: tableDef.name, Item: _.omit(item, REPLICATION_FIELDS) });
×
361
   },
362

363
   /**
364
    * Deletes an item from a table.
365
    *
366
    * @param key {object} the key of the item to delete - should contain hash and sort
367
    * keys if the table uses both, otherwise just the hash key
368
    * @param tableDef {object} the table definition for the table to delete from
369
    */
370
   deleteItem: function(key, tableDef) {
371
      if (this._opts.verbose) {
×
372
         console.log('Deleting item from %s: %j', tableDef.id, key);
×
373
      }
374

375
      return Q.ninvoke(tableDef.docs, 'delete', { TableName: tableDef.name, Key: key });
×
376
   },
377

378
   /**
379
    * Scans a table, iterating over every item in the table. The number of items returned
380
    * by each call to DynamoDB's scan operation can be controlled by the `scanLimit`
381
    * option. Otherwise, will allow DynamoDB to dictate the number of items returned in
382
    * each call to `scan`.
383
    *
384
    * Each time scan is called, the items that are returned are "chunked" into batches
385
    * matching `callbackBatchSize`. Then the `callback` is invoked, being passed the
386
    * current batch (an array of items less than or equal to the `callbackBatchSize`) and
387
    * a counter that is tracking the number of items that have been processed prior to
388
    * this invocation of the callback.
389
    *
390
    * @param tableDef {object} table definition for the table to scan
391
    * @param callbackBatchSize {integer} how many items to pass the callback in each batch
392
    * @param callback {function} the callback to invoke for each batch of items (see
393
    * above for details on the args passed to the callback)
394
    * @param [keysOnly] {boolean} if true, returns only the keys of the items
395
    * @see Counter
396
    */
397
   scanTable: function(tableDef, callbackBatchSize, callback, keysOnly) {
398
      var self = this,
×
399
          counter = new Counter();
×
400

401
      if (this._opts.parallel) {
×
402
         return Q.all(_.times(this._opts.parallel, function(i) {
×
403
            return self._doScan(tableDef, keysOnly, callbackBatchSize, callback, i, counter)
×
404
               .catch(function(err) {
405
                  self._abortScanning = true;
×
406
                  throw err;
×
407
               });
408
         }));
409
      }
410

411
      return this._doScan(tableDef, keysOnly, callbackBatchSize, callback);
×
412
   },
413

414
   _doScan: function(tableDef, keysOnly, callbackBatchSize, callback, segment, counter) { // eslint-disable-line max-params
415
      var self = this,
×
416
          lastKey = this._opts.startingKey,
×
417
          deferred = Q.defer();
×
418

419
      counter = counter || new Counter();
×
420

421
      console.log('Scanning %s', tableDef.id);
×
422

423
      function loopOnce() {
424
         var params = { TableName: tableDef.name, ExclusiveStartKey: lastKey };
×
425

426
         if (keysOnly) {
×
427
            params.AttributesToGet = _.values(tableDef.schema);
×
428
         }
429

430
         if (self._abortScanning) {
×
431
            console.error('Segment %d is stopping because of an error in another segment scanner', segment);
×
432
            return deferred.resolve(counter.get());
×
433
         }
434

435
         if (segment !== undefined) {
×
436
            params.Segment = segment;
×
437
            params.TotalSegments = self._opts.parallel;
×
438
         }
439

440
         if (self._opts.scanLimit !== undefined) {
×
441
            params.Limit = self._opts.scanLimit;
×
442
         }
443

444
         // trace-level logging: console.log('scan', params);
445
         return Q.ninvoke(tableDef.docs, 'scan', params)
×
446
            .then(function(resp) {
447
               // trace-level logging: console.log('resp', _.omit(resp, 'Items'));
448
               return _.chain(resp.Items)
×
449
                  .groupBy(function(item, i) {
450
                     return Math.floor(i / callbackBatchSize);
×
451
                  })
452
                  .values()
453
                  .reduce(function(prev, batch) {
454
                     return prev
×
455
                        .then(function() {
456
                           return callback(batch, counter);
×
457
                        })
458
                        .then(function() {
459
                           counter.increment(batch.length);
×
460
                        });
461
                  }, Q.when())
462
                  .value()
463
                  .then(function() {
464
                     if (resp.LastEvaluatedKey) {
×
465
                        lastKey = resp.LastEvaluatedKey;
×
466
                        Q.nextTick(loopOnce);
×
467
                     } else {
468
                        if (segment !== undefined) {
×
469
                           if (self._opts.verbose) {
×
470
                              console.log('Segment %d of %d has completed', segment, self._opts.parallel);
×
471
                           }
472
                        }
473
                        deferred.resolve(counter.get());
×
474
                     }
475
                  });
476
            })
477
            .catch(deferred.reject);
478
      }
479

480
      Q.nextTick(function() {
×
481
         loopOnce().catch(deferred.reject);
×
482
      });
483

484
      return deferred.promise;
×
485
   },
486

487
   _makeKeyFromItem: function(item) {
488
      return _.pick(item, _.values(this._master.schema));
×
489
   },
490

491
   /**
492
    * Describes the master and all slave tables, comparing their descriptions to make sure
493
    * that they have the same key schema. The comparison will not work on tables that have
494
    * different key schemas.
495
    *
496
    * Also updates the table definition that we hold in memory to have an object
497
    * containing information about the key schema. The object will look like this:
498
    *
499
    * ```
500
    * {
501
    *    hash: 'NameOfHashKeyField',
502
    *    range: 'NameOfRangeKeyFieldIfThereIsOne', // undefined if there is not one
503
    * }
504
    * ```
505
    *
506
    * Additionally updates the table definition in memory to have an `approxItems` field
507
    * that has the approximate number of items that DynamoDB reports for the table.
508
    */
509
   _compareTableDescriptions: function() {
510
      var def = Q.defer(),
×
511
          describeMaster = this._describeTable(this._master),
×
NEW
512
          slaveDescribeFn = _.partial(this._describeTable.bind(this), _, this._opts.slaveCredentials, this._opts.slaveEndpoint),
×
NEW
513
          describeSlaves = Q.all(_.map(this._slaves, slaveDescribeFn));
×
514

515
      function logDescription(title, tableDef, tableDesc) {
516
         console.log('%s table %s', title, tableDef.id);
×
517
         console.log('Approx. item count:', tableDesc.ItemCount);
×
518
         console.log('Key schema:', tableDef.schema);
×
519
         console.log();
×
520
      }
521

522
      function addTableInfoToDefinition(tableDef, desc) {
523
         var hash = _.findWhere(desc.KeySchema, { KeyType: 'HASH' }),
×
524
             range = _.findWhere(desc.KeySchema, { KeyType: 'RANGE' });
×
525

526
         tableDef.schema = { hash: hash.AttributeName };
×
527
         if (range) {
×
528
            tableDef.schema.range = range.AttributeName;
×
529
         }
530

531
         tableDef.approxItems = desc.ItemCount;
×
532
      }
533

534
      Q.all([ describeMaster, describeSlaves ])
×
535
         .spread(function(master, slaves) {
536
            var unlikeTables = [];
×
537

538
            addTableInfoToDefinition(this._master, master);
×
539
            logDescription('Master', this._master, master);
×
540

541
            _.each(slaves, function(slaveDesc, i) {
×
542
               addTableInfoToDefinition(this._slaves[i], slaveDesc);
×
543
               logDescription('Slave[' + (i + 1) + ']', this._slaves[i], slaveDesc);
×
544

545
               if (!_.isEqual(master.KeySchema, slaveDesc.KeySchema)) {
×
546
                  unlikeTables.push(this._slaves[i].id);
×
547
               }
548
            }.bind(this));
549

550
            return unlikeTables;
×
551
         }.bind(this))
552
         .then(function(unlikeTables) {
553
            if (!_.isEmpty(unlikeTables)) {
×
554
               console.error('The following slave tables have key schemas that do not match the master table:', unlikeTables);
×
555
               return def.reject(unlikeTables);
×
556
            }
557

558
            def.resolve();
×
559
         })
560
         .catch(def.reject)
561
         .done();
562

563
      return def.promise;
×
564
   },
565

566
   _describeTable: function(tableDef, creds, endpoint) {
567
      var dyn;
568

NEW
569
      dyn = new AWS.DynamoDB({
×
570
         region: tableDef.region,
571
         endpoint: endpoint,
572
         credentials: (endpoint ? undefined : (creds || AWS.config.credentials)),
×
573
      });
574

575
      return Q.ninvoke(dyn, 'describeTable', { TableName: tableDef.name })
×
576
         .then(function(resp) {
577
            return resp.Table;
×
578
         });
579
   },
580

581
   _makeDocClient: function(def, creds, endpoint) {
582
      return new AWS.DynamoDB.DocumentClient({
×
583
         region: def.region,
584
         endpoint: endpoint,
585
         credentials: (endpoint ? undefined : (creds || AWS.config.credentials)),
×
586
         maxRetries: this._opts.maxRetries,
587
         retryDelayOptions: {
588
            base: this._opts.retryDelayBase,
589
         },
590
      });
591
   },
592

593
   _outputStats: function() {
594
      console.log('\nSynchronization completed. Stats:');
×
595

596
      _.each(this._slaves, function(slave) {
×
597
         var stats = this._stats[slave.id];
×
598

599
         console.log('\n%s', slave.id);
×
600
         console.log('Had %d items that were the same as the master', stats.sameAs);
×
601
         if (this._opts.deleteExtra || this._opts.scanForExtra) {
×
602
            console.log('Had %d items more than master', stats.extra);
×
603
         } else {
604
            console.log('(We did not scan the slave to find if it had "extra" items that the master does not have)');
×
605
         }
606
         console.log('Had %d items that were different from the master', stats.differing);
×
607
         console.log('Was missing %d items that the master had', stats.missing);
×
608
      }.bind(this));
609

610
      return this._stats;
×
611
   },
612

613
   _batchGetItems: function(keys, keysOnly, tableDef) {
614
      var params = { RequestItems: {} };
×
615

616
      params.RequestItems[tableDef.name] = { Keys: keys };
×
617

618
      if (keysOnly) {
×
619
         params.RequestItems[tableDef.name].AttributesToGet = _.values(tableDef.schema);
×
620
      }
621

622
      return Q.ninvoke(tableDef.docs, 'batchGet', params)
×
623
         .then(function(resp) {
624
            if (!_.isEmpty(resp.UnprocessedKeys)) {
×
625
               throw new Error('Unprocessed keys in batchGet'); // TODO: recursive querying as needed
×
626
            }
627

628
            return resp.Responses[tableDef.name];
×
629
         });
630
   },
631

632
   _compareBatch: function(masterBatch, slaveDef, slaveBatch) {
633
      var self = this;
×
634

635
      if (this._opts.verbose) {
×
636
         console.log('Comparing batch of %d from master to %d from slave %s', masterBatch.length, slaveBatch.length, slaveDef.id);
×
637
      }
638

639
      return Q.all(_.map(masterBatch, function(masterItem) {
×
640
         var key = self._makeKeyFromItem(masterItem),
×
641
             slaveItem = _.findWhere(slaveBatch, key);
×
642

643
         if (!slaveItem) {
×
644
            self._stats[slaveDef.id].missing = self._stats[slaveDef.id].missing + 1;
×
645
            return self.slaveMissingItem(masterItem, slaveDef, key);
×
646
         } else if (self.isItemDifferent(masterItem, slaveItem)) {
×
647
            self._stats[slaveDef.id].differing = self._stats[slaveDef.id].differing + 1;
×
648
            return self.slaveItemDiffers(masterItem, slaveItem, slaveDef, key);
×
649
         }
650

651
         self._stats[slaveDef.id].sameAs = self._stats[slaveDef.id].sameAs + 1;
×
652
         return self.slaveItemMatchesMaster(masterItem, slaveItem, slaveDef, key);
×
653
      }));
654
   },
655

656
});
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc