• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sensedeep / dynamodb-onetable / #65

pending completion
#65

push

Michael O'Brien
DEV: temporarily disable stream unit tests

1107 of 1601 branches covered (69.14%)

Branch coverage included in aggregate %.

1780 of 2373 relevant lines covered (75.01%)

623.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.27
/src/Table.js
1
/*
2
    Table.js - DynamoDB table class
3

4
    A OneTable Table represents a single (connected) DynamoDB table
5
 */
6

7
import Crypto from 'crypto'
50✔
8
import UUID from './UUID.js'
50✔
9
import ULID from './ULID.js'
50✔
10
import UID from './UID.js'
50✔
11
import {Expression} from './Expression.js'
50✔
12
import {Schema} from './Schema.js'
50✔
13
import {Metrics} from './Metrics.js'
50✔
14
import {OneTableArgError, OneTableError} from './Error.js'
50✔
15

16
/*
17
    AWS V2 DocumentClient methods
18
 */
19
const DocumentClientMethods = {
50✔
20
    delete: 'delete',
21
    get: 'get',
22
    find: 'query',
23
    put: 'put',
24
    scan: 'scan',
25
    update: 'update',
26
    batchGet: 'batchGet',
27
    batchWrite: 'batchWrite',
28
    transactGet: 'transactGet',
29
    transactWrite: 'transactWrite',
30
}
31

32
/*
33
    Safety string required on API to delete a table
34
*/
35
const ConfirmRemoveTable = 'DeleteTableForever'
50✔
36

37
/*
38
    Crypto IV length
39
*/
40
const IV_LENGTH = 16
50✔
41

42
const DynamoOps = {
50✔
43
    delete: 'deleteItem',
44
    get: 'getItem',
45
    find: 'query',
46
    put: 'putItem',
47
    scan: 'scan',
48
    update: 'updateItem',
49
    batchGet: 'batchGet',
50
    batchWrite: 'batchWrite',
51
    transactGet: 'transactGet',
52
    transactWrite: 'transactWrite',
53
}
54

55
const GenericModel = '_Generic'
50✔
56

57
const maxBatchSize = 25
50✔
58

59
/*
60
    Represent a single DynamoDB table
61
 */
62
export class Table {
50✔
63
    constructor(params = {}) {
×
64
        if (!params.name) {
56✔
65
            throw new OneTableArgError('Missing "name" property')
1✔
66
        }
67
        this.context = {}
55✔
68

69
        this.log = params.senselogs ? params.senselogs : new Log(params.logger)
55!
70
        this.log.trace(`Loading OneTable`)
55✔
71

72
        if (params.client) {
55✔
73
            this.setClient(params.client)
52✔
74
        }
75
        if (params.crypto) {
55✔
76
            this.initCrypto(params.crypto)
1✔
77
            this.crypto = Object.assign(params.crypto)
1✔
78
            for (let [name, crypto] of Object.entries(this.crypto)) {
1✔
79
                crypto.secret = Crypto.createHash('sha256').update(crypto.password, 'utf8').digest()
1✔
80
                this.crypto[name] = crypto
1✔
81
                this.crypto[name].name = name
1✔
82
            }
83
        }
84
        this.setParams(params)
55✔
85

86
        //  Set schema param defaults
87
        this.typeField = '_type'
55✔
88
        this.createdField = 'created'
55✔
89
        this.isoDates = false
55✔
90
        this.nulls = false
55✔
91
        this.timestamps = false
55✔
92
        this.updatedField = 'updated'
55✔
93

94
        this.schema = new Schema(this, params.schema)
55✔
95
        if (params.dataloader) {
55✔
96
            this.dataloader = new params.dataloader((cmds) => this.batchLoaderFunction(cmds), {maxBatchSize})
2✔
97
        }
98
    }
99

100
    setClient(client) {
101
        this.client = client
55✔
102
        this.V3 = client.V3
55✔
103
        this.service = this.V3 ? this.client : this.client.service
55✔
104
    }
105

106
    setParams(params) {
107
        if (
55!
108
            params.createdField != null ||
330✔
109
            this.isoDates != null ||
110
            this.nulls != null ||
111
            this.timestamps != null ||
112
            this.typeField != null ||
113
            this.updatedField != null
114
        ) {
115
            throw new OneTableArgError(
×
116
                'Using deprecated Table constructor parameters. Define in the Schema.params instead.'
117
            )
118
        }
119

120
        if (params.uuid) {
55!
121
            console.warn(
×
122
                'OneTable: Using deprecated Table constructor "uuid" parameter. Use a "generate" function instead or ' +
123
                    'Set schema models to use "generate: uuid|ulid" explicitly.'
124
            )
125
            params.generate = params.generate | params.uuid
×
126
        }
127

128
        if (params.partial == null) {
55!
129
            console.warn(
×
130
                'OneTable: Must set Table constructor "partial" param to true or false. ' +
131
                    'This param permits updating partial nested schemas. Currently defaults to false, ' +
132
                    'but in a future version will default to true. ' +
133
                    'Set to false to future proof or set to true for the new behavior.'
134
            )
135
            params.partial = true
×
136
        }
137
        //  Return hidden fields by default. Default is false.
138
        this.hidden = params.hidden != null ? params.hidden : false
55!
139
        this.partial = params.partial
55✔
140
        this.warn = params.warn || true
55✔
141

142
        if (typeof params.generate == 'function') {
55!
143
            this.generate = params.generate || this.uuid
×
144
        } else if (params.generate) {
55!
145
            throw new OneTableArgError('OneTable: Generate can only be a function')
×
146
        }
147

148
        this.name = params.name
55✔
149

150
        if (params.metrics) {
55!
151
            this.metrics = new Metrics(this, params.metrics, this.metrics)
×
152
        }
153
        if (params.monitor) {
55!
154
            this.monitor = params.monitor
×
155
        }
156
        this.params = params
55✔
157
    }
158

159
    setSchemaParams(params = {}) {
9✔
160
        this.createdField = params.createdField || 'created'
54✔
161
        this.isoDates = params.isoDates || false
54✔
162
        this.nulls = params.nulls || false
54✔
163
        this.timestamps = params.timestamps != null ? params.timestamps : false
54✔
164
        this.typeField = params.typeField || '_type'
54✔
165
        this.updatedField = params.updatedField || 'updated'
54✔
166

167
        if (params.hidden != null) {
54!
168
            console.warn(`Schema hidden params should be specified via the Table constructor params`)
×
169
        }
170
    }
171

172
    getSchemaParams() {
173
        return {
55✔
174
            createdField: this.createdField,
175
            isoDates: this.isoDates,
176
            nulls: this.nulls,
177
            timestamps: this.timestamps,
178
            typeField: this.typeField,
179
            updatedField: this.updatedField,
180
        }
181
    }
182

183
    async setSchema(schema) {
184
        return await this.schema.setSchema(schema)
×
185
    }
186

187
    getCurrentSchema() {
188
        return this.schema.getCurrentSchema()
5✔
189
    }
190

191
    async getKeys() {
192
        return await this.schema.getKeys()
×
193
    }
194

195
    async getPrimaryKeys() {
196
        let keys = await this.schema.getKeys()
×
197
        return keys.primary
×
198
    }
199

200
    async readSchema() {
201
        return this.schema.readSchema()
×
202
    }
203

204
    async readSchemas() {
205
        return this.schema.readSchemas()
×
206
    }
207

208
    async removeSchema(schema) {
209
        return this.schema.removeSchema(schema)
×
210
    }
211

212
    async saveSchema(schema) {
213
        return this.schema.saveSchema(schema)
×
214
    }
215

216
    /*
217
        Output the AWS table definition as a JSON structure to use in external tools such as CloudFormation
218
        or the AWS CLI to create your DynamoDB table. Uses the current schema index definition.
219
        Alternatively, params may contain standard DynamoDB createTable parameters.
220
    */
221
    getTableDefinition(params = {}) {
×
222
        let def = {
52✔
223
            AttributeDefinitions: [],
224
            KeySchema: [],
225
            LocalSecondaryIndexes: [],
226
            GlobalSecondaryIndexes: [],
227
            TableName: this.name,
228
        }
229
        let provisioned = params.provisioned || params.ProvisionedThroughput
52✔
230
        if (provisioned) {
52✔
231
            if (!provisioned.ReadCapacityUnits && !provisioned.WriteCapacityUnits) {
1!
232
                def.BillingMode = 'PAY_PER_REQUEST'
×
233
            } else {
234
                def.ProvisionedThroughput = provisioned
1✔
235
                def.BillingMode = 'PROVISIONED'
1✔
236
            }
237
        } else {
238
            def.BillingMode = 'PAY_PER_REQUEST'
51✔
239
        }
240
        if (params.StreamSpecification) {
52!
241
            def.StreamSpecification = params.StreamSpecification
×
242
        }
243
        let attributes = {}
52✔
244
        let {indexes} = this.schema
52✔
245

246
        if (!indexes) {
52!
247
            throw new OneTableArgError('Cannot create table without schema indexes')
×
248
        }
249
        for (let [name, index] of Object.entries(indexes)) {
52✔
250
            let keys
251
            if (name == 'primary') {
125✔
252
                keys = def.KeySchema
52✔
253
            } else {
254
                let collection = index.type == 'local' ? 'LocalSecondaryIndexes' : 'GlobalSecondaryIndexes'
73✔
255
                keys = []
73✔
256
                let project, projection
257
                if (Array.isArray(index.project)) {
73✔
258
                    projection = 'INCLUDE'
3✔
259
                    project = index.project.filter((a) => a != indexes.primary.hash && a != indexes.primary.sort)
9✔
260
                } else if (index.project == 'keys') {
70✔
261
                    projection = 'KEYS_ONLY'
1✔
262
                } else {
263
                    projection = 'ALL'
69✔
264
                }
265
                let projDef = {
73✔
266
                    IndexName: name,
267
                    KeySchema: keys,
268
                    Projection: {
269
                        ProjectionType: projection,
270
                    },
271
                }
272
                if (project) {
73✔
273
                    projDef.Projection.NonKeyAttributes = project
3✔
274
                }
275
                def[collection].push(projDef)
73✔
276
            }
277
            keys.push({AttributeName: index.hash || indexes.primary.hash, KeyType: 'HASH'})
125!
278

279
            if (index.hash && !attributes[index.hash]) {
125✔
280
                let type = this.getAttributeType(index.hash) == 'number' ? 'N' : 'S'
121!
281
                def.AttributeDefinitions.push({AttributeName: index.hash, AttributeType: type})
121✔
282
                attributes[index.hash] = true
121✔
283
            }
284
            if (index.sort) {
125✔
285
                if (!attributes[index.sort]) {
120✔
286
                    let type = this.getAttributeType(index.sort) == 'number' ? 'N' : 'S'
118✔
287
                    def.AttributeDefinitions.push({AttributeName: index.sort, AttributeType: type})
118✔
288
                    attributes[index.sort] = true
118✔
289
                }
290
                keys.push({AttributeName: index.sort, KeyType: 'RANGE'})
120✔
291
            }
292
        }
293
        if (def.GlobalSecondaryIndexes.length == 0) {
52✔
294
            delete def.GlobalSecondaryIndexes
17✔
295
        } else if (provisioned) {
35✔
296
            for (let index of def.GlobalSecondaryIndexes) {
1✔
297
                index.ProvisionedThroughput = provisioned
3✔
298
            }
299
        }
300
        if (def.LocalSecondaryIndexes.length == 0) {
52✔
301
            delete def.LocalSecondaryIndexes
49✔
302
        }
303
        return def
52✔
304
    }
305

306
    /*
307
        Create a DynamoDB table. Uses the current schema index definition.
308
        Alternatively, params may contain standard DynamoDB createTable parameters.
309
    */
310
    async createTable(params = {}) {
51✔
311
        const def = this.getTableDefinition(params)
52✔
312
        let result
313

314
        this.log.trace(`OneTable createTable for "${this.name}"`, {def})
52✔
315
        if (this.V3) {
52✔
316
            result = await this.service.createTable(def)
50✔
317
        } else {
318
            result = await this.service.createTable(def).promise()
2✔
319
        }
320

321
        /*
322
            Wait for table to become active. Must do if setting a TTL attribute
323
        */
324
        if (params.TimeToLiveSpecification) {
52!
325
            params.wait = 5 * 60
×
326
        }
327
        if (params.wait) {
52!
328
            let deadline = new Date(Date.now() + params.wait * 1000)
×
329
            let info
330
            do {
×
331
                info = await this.describeTable()
×
332
                if (info.Table.TableStatus == 'ACTIVE') {
×
333
                    break
×
334
                }
335
                if (deadline < Date.now()) {
×
336
                    throw new Error('Table has not become active')
×
337
                }
338
                await this.delay(1000)
×
339
            } while (Date.now() < deadline)
340
        }
341

342
        /*
343
            Define a TTL attribute
344
        */
345
        if (params.TimeToLiveSpecification) {
52!
346
            let def = {
×
347
                TableName: this.name,
348
                TimeToLiveSpecification: params.TimeToLiveSpecification,
349
            }
350
            if (this.V3) {
×
351
                await this.service.updateTimeToLive(def)
×
352
            } else {
353
                await this.service.updateTimeToLive(def).promise()
×
354
            }
355
        }
356
        return result
52✔
357
    }
358

359
    getAttributeType(name) {
360
        for (let model of Object.values(this.schema.models)) {
239✔
361
            let fields = model.block.fields
257✔
362
            if (fields[name]) {
257✔
363
                return fields[name].type
233✔
364
            }
365
        }
366
        return null
6✔
367
    }
368

369
    /*
370
        Delete the DynamoDB table forever. Be careful.
371
    */
372
    async deleteTable(confirmation) {
373
        if (confirmation == ConfirmRemoveTable) {
52✔
374
            this.log.trace(`OneTable deleteTable for "${this.name}"`)
51✔
375
            if (this.V3) {
51✔
376
                await this.service.deleteTable({TableName: this.name})
49✔
377
            } else {
378
                await this.service.deleteTable({TableName: this.name}).promise()
2✔
379
            }
380
        } else {
381
            throw new OneTableArgError(`Missing required confirmation "${ConfirmRemoveTable}"`)
1✔
382
        }
383
    }
384

385
    async updateTable(params = {}) {
×
386
        let def = {
×
387
            AttributeDefinitions: [],
388
            GlobalSecondaryIndexUpdates: [],
389
            TableName: this.name,
390
        }
391
        let {create, provisioned} = params
×
392

393
        if (provisioned) {
×
394
            if (!provisioned.ReadCapacityUnits && !provisioned.WriteCapacityUnits) {
×
395
                def.BillingMode = 'PAY_PER_REQUEST'
×
396
            } else {
397
                if (!create) {
×
398
                    def.ProvisionedThroughput = provisioned
×
399
                }
400
                def.BillingMode = 'PROVISIONED'
×
401
            }
402
        }
403
        let indexes = this.schema.indexes
×
404
        if (!indexes) {
×
405
            throw new OneTableArgError('Cannot update table without schema indexes')
×
406
        }
407
        if (create) {
×
408
            if (create.hash == null || create.hash == indexes.primary.hash || create.type == 'local') {
×
409
                throw new OneTableArgError('Cannot update table to create an LSI')
×
410
            }
411
            let keys = []
×
412
            let projection, project
413

414
            if (Array.isArray(create.project)) {
×
415
                projection = 'INCLUDE'
×
416
                project = create.project.filter((a) => a != create.hash && a != create.sort)
×
417
            } else if (create.project == 'keys') {
×
418
                projection = 'KEYS_ONLY'
×
419
            } else {
420
                projection = 'ALL'
×
421
            }
422
            let projDef = {
×
423
                IndexName: create.name,
424
                KeySchema: keys,
425
                Projection: {
426
                    ProjectionType: projection,
427
                },
428
            }
429
            if (project) {
×
430
                projDef.Projection.NonKeyAttributes = project
×
431
            }
432
            keys.push({AttributeName: create.hash, KeyType: 'HASH'})
×
433
            def.AttributeDefinitions.push({AttributeName: create.hash, AttributeType: 'S'})
×
434

435
            if (create.sort) {
×
436
                def.AttributeDefinitions.push({AttributeName: create.sort, AttributeType: 'S'})
×
437
                keys.push({AttributeName: create.sort, KeyType: 'RANGE'})
×
438
            }
439
            if (provisioned) {
×
440
                projDef.ProvisionedThroughput = provisioned
×
441
            }
442
            def.GlobalSecondaryIndexUpdates.push({Create: projDef})
×
443
        } else if (params.remove) {
×
444
            def.GlobalSecondaryIndexUpdates.push({Delete: {IndexName: params.remove.name}})
×
445
        } else if (params.update) {
×
446
            let update = {Update: {IndexName: params.update.name}}
×
447
            if (provisioned) {
×
448
                update.Update.ProvisionedThroughput = provisioned
×
449
            }
450
            def.GlobalSecondaryIndexUpdates.push(update)
×
451
        }
452
        if (def.GlobalSecondaryIndexUpdates.length == 0) {
×
453
            delete def.GlobalSecondaryIndexUpdates
×
454
        }
455
        if (params.TimeToLiveSpecification) {
×
456
            let def = {
×
457
                TableName: params.TableName,
458
                TimeToLiveSpecification: params.TimeToLiveSpecification,
459
            }
460
            if (this.V3) {
×
461
                await this.service.updateTimeToLive(def)
×
462
            } else {
463
                await this.service.updateTimeToLive(def).promise()
×
464
            }
465
        }
466
        this.log.trace(`OneTable updateTable for "${this.name}"`, {def})
×
467
        if (this.V3) {
×
468
            return await this.service.updateTable(def)
×
469
        } else {
470
            return await this.service.updateTable(def).promise()
×
471
        }
472
    }
473

474
    /*
475
        Return the raw AWS table description
476
    */
477
    async describeTable() {
478
        if (this.V3) {
4✔
479
            return await this.service.describeTable({TableName: this.name})
3✔
480
        } else {
481
            return await this.service.describeTable({TableName: this.name}).promise()
1✔
482
        }
483
    }
484

485
    /*
486
        Return true if the underlying DynamoDB table represented by this OneTable instance is present.
487
    */
488
    async exists() {
489
        let results = await this.listTables()
132✔
490
        return results && results.find((t) => t == this.name) != null ? true : false
305✔
491
    }
492

493
    /*
494
        Return a list of tables in the AWS region described by the Table instance
495
    */
496
    async listTables() {
497
        let results
498
        if (this.V3) {
135✔
499
            results = await this.service.listTables({})
129✔
500
        } else {
501
            results = await this.service.listTables({}).promise()
6✔
502
        }
503
        return results.TableNames
135✔
504
    }
505

506
    listModels() {
507
        return this.schema.listModels()
8✔
508
    }
509

510
    addModel(name, fields) {
511
        this.schema.addModel(name, fields)
1✔
512
    }
513

514
    getLog() {
515
        return this.log
×
516
    }
517

518
    setLog(log) {
519
        this.log = log
×
520
    }
521

522
    /*
523
        Thows exception if model cannot be found
524
     */
525
    getModel(name) {
526
        return this.schema.getModel(name)
127✔
527
    }
528

529
    removeModel(name) {
530
        return this.schema.removeModel(name)
2✔
531
    }
532

533
    getContext() {
534
        return this.context
11✔
535
    }
536

537
    addContext(context = {}) {
×
538
        this.context = Object.assign(this.context, context)
1✔
539
        return this
1✔
540
    }
541

542
    setContext(context = {}, merge = false) {
6!
543
        this.context = merge ? Object.assign(this.context, context) : context
7✔
544
        return this
7✔
545
    }
546

547
    clearContext() {
548
        this.context = {}
1✔
549
        return this
1✔
550
    }
551

552
    /*  PROTOTYPE
553
        Create a clone of the table with the same settings and replace the context
554
    */
555
    child(context) {
556
        let table = JSON.parse(JSON.stringify(this))
×
557
        table.context = context
×
558
        return table
×
559
    }
560

561
    /*
562
        High level model factory API
563
        The high level API is similar to the Model API except the model name is provided as the first parameter.
564
        This API is useful for factories
565
    */
566
    async create(modelName, properties, params) {
567
        let model = this.getModel(modelName)
19✔
568
        return await model.create(properties, params)
19✔
569
    }
570

571
    async find(modelName, properties, params) {
572
        let model = this.getModel(modelName)
3✔
573
        return await model.find(properties, params)
3✔
574
    }
575

576
    async get(modelName, properties, params) {
577
        let model = this.getModel(modelName)
21✔
578
        return await model.get(properties, params)
21✔
579
    }
580

581
    async load(modelName, properties, params) {
582
        let model = this.getModel(modelName)
6✔
583
        return await model.load(properties, params)
6✔
584
    }
585

586
    init(modelName, properties, params) {
587
        let model = this.getModel(modelName)
×
588
        return model.init(properties, params)
×
589
    }
590

591
    async remove(modelName, properties, params) {
592
        let model = this.getModel(modelName)
5✔
593
        return await model.remove(properties, params)
5✔
594
    }
595

596
    async scan(modelName, properties, params) {
597
        let model = this.getModel(modelName)
20✔
598
        return await model.scan(properties, params)
20✔
599
    }
600

601
    async update(modelName, properties, params) {
602
        let model = this.getModel(modelName)
6✔
603
        return await model.update(properties, params)
6✔
604
    }
605

606
    async upsert(modelName, properties, params = {}) {
×
607
        params.exists = null
×
608
        return this.update(modelName, properties, params)
×
609
    }
610

611
    async execute(model, op, cmd, properties = {}, params = {}) {
×
612
        let mark = new Date()
898✔
613
        let trace = {model, cmd, op, properties}
898✔
614
        let result
615
        try {
898✔
616
            let client = params.client || this.client
898✔
617
            if (params.stats || this.metrics || this.monitor) {
898✔
618
                cmd.ReturnConsumedCapacity = params.capacity || 'INDEXES'
1✔
619
                cmd.ReturnItemCollectionMetrics = 'SIZE'
1✔
620
            }
621
            this.log[params.log ? 'info' : 'trace'](`OneTable "${op}" "${model}"`, {trace})
898✔
622
            if (this.V3) {
898✔
623
                result = await client[op](cmd)
868✔
624
            } else {
625
                result = await client[DocumentClientMethods[op]](cmd).promise()
30✔
626
            }
627
        } catch (err) {
628
            //  V3 stores the error in 'name' (Ugh!)
629
            let code = err.code || err.name
6✔
630
            if (params.throw === false) {
6✔
631
                result = {}
1✔
632
            } else if (code == 'ConditionalCheckFailedException' && op == 'put') {
5!
633
                this.log.info(`Conditional check failed "${op}" on "${model}"`, {err, trace})
×
634
                throw new OneTableError(`Conditional create failed for "${model}"`, {
×
635
                    code,
636
                    err,
637
                    trace,
638
                })
639
            } else if (code == 'ProvisionedThroughputExceededException') {
5!
640
                throw new OneTableError('Provisioning Throughput Exception', {
×
641
                    code,
642
                    err,
643
                    trace,
644
                })
645
            } else if (code == 'TransactionCanceledException') {
5✔
646
                throw new OneTableError('Transaction Cancelled', {
3✔
647
                    code,
648
                    err,
649
                    trace,
650
                })
651
            } else {
652
                result = result || {}
2✔
653
                result.Error = 1
2✔
654
                if (params.log != false) {
2✔
655
                    this.log.error(`OneTable exception in "${op}" on "${model}"`, {err, trace})
2✔
656
                }
657
                throw new OneTableError(`OneTable execute failed "${op}" for "${model}. ${err.message}`, {
2✔
658
                    code,
659
                    err,
660
                    trace,
661
                })
662
            }
663
        } finally {
664
            if (result) {
898✔
665
                if (this.metrics) {
895!
666
                    this.metrics.add(model, op, result, params, mark)
×
667
                }
668
                if (this.monitor) {
895!
669
                    await this.monitor(model, op, result, params, mark)
×
670
                }
671
            }
672
        }
673
        if (typeof params.info == 'object') {
893!
674
            params.info.operation = DynamoOps[op]
×
675
            params.info.args = cmd
×
676
            params.info.properties = properties
×
677
        }
678
        return result
893✔
679
    }
680

681
    /*
682
        The low level API does not use models. It permits the reading / writing of any attribute.
683
    */
684
    async batchGet(batch, params = {}) {
2✔
685
        if (Object.getOwnPropertyNames(batch).length == 0) {
7!
686
            return []
×
687
        }
688
        let def = batch.RequestItems[this.name]
7✔
689

690
        if (params.fields) {
7✔
691
            if (params.fields.indexOf(this.typeField) < 0) {
1✔
692
                params.fields.push(this.typeField)
1✔
693
            }
694
            let expression = new Expression(this.schema.genericModel, 'batchGet', {}, params)
1✔
695
            let cmd = expression.command()
1✔
696
            def.ProjectionExpression = cmd.ProjectionExpression
1✔
697
            def.ExpressionAttributeNames = cmd.ExpressionAttributeNames
1✔
698
        }
699
        def.ConsistentRead = params.consistent ? true : false
7✔
700

701
        // let result = await this.execute(GenericModel, 'batchGet', batch, {}, params)
702

703
        let result,
704
            retries = 0,
6✔
705
            more
706
        result = params.parse ? [] : {Responses: {}}
6✔
707
        do {
6✔
708
            more = false
6✔
709
            let data = await this.execute(GenericModel, 'batchGet', batch, {}, params)
6✔
710
            if (data) {
6✔
711
                let responses = data.Responses
6✔
712
                if (responses) {
6✔
713
                    for (let [key, items] of Object.entries(responses)) {
6✔
714
                        for (let item of items) {
6✔
715
                            if (params.parse) {
12✔
716
                                item = this.unmarshall(item, params)
7✔
717
                                let type = item[this.typeField] || '_unknown'
7!
718
                                let model = this.schema.models[type]
7✔
719
                                if (model && model != this.schema.uniqueModel) {
7✔
720
                                    result.push(model.transformReadItem('get', item, {}, params))
7✔
721
                                }
722
                            } else {
723
                                let set = (result.Responses[key] = result.Responses[key] || [])
5✔
724
                                set.push(item)
5✔
725
                            }
726
                        }
727
                    }
728
                }
729
                let unprocessed = data.UnprocessedItems
6✔
730
                if (unprocessed && Object.keys(unprocessed).length) {
6!
731
                    batch.RequestItems = unprocessed
×
732
                    if (params.reprocess === false) {
×
733
                        return false
×
734
                    }
735
                    if (retries > 11) {
×
736
                        throw new Error(unprocessed)
×
737
                    }
738
                    await this.delay(10 * 2 ** retries++)
×
739
                    more = true
×
740
                }
741
            }
742
        } while (more)
743
        return result
6✔
744
    }
745

746
    /*
747
        AWS BatchWrite may throw an exception if no items can be processed.
748
        Otherwise it will retry (up to 11 times) and return partial results in UnprocessedItems.
749
        Those will be handled here if possible.
750
    */
751
    async batchWrite(batch, params = {}) {
5✔
752
        if (Object.getOwnPropertyNames(batch).length == 0) {
6!
753
            return {}
×
754
        }
755
        let retries = 0,
6✔
756
            more
757
        do {
6✔
758
            more = false
6✔
759
            let response = await this.execute(GenericModel, 'batchWrite', batch, {}, params)
6✔
760
            let data = response.data
5✔
761
            if (data && data.UnprocessedItems && Object.keys(data.UnprocessedItems).length) {
5!
762
                batch.RequestItems = data.UnprocessedItems
×
763
                if (params.reprocess === false) {
×
764
                    return false
×
765
                }
766
                if (retries > 11) {
×
767
                    throw new Error(response.UnprocessedItems)
×
768
                }
769
                await this.delay(10 * 2 ** retries++)
×
770
                more = true
×
771
            }
772
        } while (more)
773
        return true
5✔
774
    }
775

776
    async batchLoad(expression) {
777
        if (this.dataloader) {
6✔
778
            return await this.dataloader.load(expression)
6✔
779
        }
780
        throw new Error('params.dataloader DataLoader constructor is required to use load feature')
×
781
    }
782

783
    async deleteItem(properties, params) {
784
        return await this.schema.genericModel.deleteItem(properties, params)
1✔
785
    }
786

787
    async getItem(properties, params) {
788
        return await this.schema.genericModel.getItem(properties, params)
1✔
789
    }
790

791
    async putItem(properties, params) {
792
        return await this.schema.genericModel.putItem(properties, params)
2✔
793
    }
794

795
    async queryItems(properties, params) {
796
        return await this.schema.genericModel.queryItems(properties, params)
8✔
797
    }
798

799
    async scanItems(properties, params) {
800
        return await this.schema.genericModel.scanItems(properties, params)
20✔
801
    }
802

803
    async updateItem(properties, params) {
804
        return await this.schema.genericModel.updateItem(properties, params)
3✔
805
    }
806

807
    async fetch(models, properties, params) {
808
        return await this.schema.genericModel.fetch(models, properties, params)
2✔
809
    }
810

811
    /*
812
        Invoke a prepared transaction. Note: transactGet does not work on non-primary indexes.
813
     */
814
    async transact(op, transaction, params = {}) {
3✔
815
        if (params.execute === false) {
23!
816
            if (params.log !== false) {
×
817
                this.log[params.log ? 'info' : 'data'](`OneTable transaction for "${op}" (not executed)`, {
×
818
                    transaction,
819
                    op,
820
                    params,
821
                })
822
            }
823
            return transaction
×
824
        }
825
        let result = await this.execute(
23✔
826
            GenericModel,
827
            op == 'write' ? 'transactWrite' : 'transactGet',
23✔
828
            transaction,
829
            {},
830
            params
831
        )
832
        if (op == 'get') {
20✔
833
            if (params.parse) {
3✔
834
                let items = []
2✔
835
                for (let r of result.Responses) {
2✔
836
                    if (r.Item) {
6✔
837
                        let item = this.unmarshall(r.Item, params)
6✔
838
                        let type = item[this.typeField] || '_unknown'
6!
839
                        let model = this.schema.models[type]
6✔
840
                        if (model && model != this.schema.uniqueModel) {
6✔
841
                            items.push(model.transformReadItem('get', item, {}, params))
6✔
842
                        }
843
                    }
844
                }
845
                result = items
2✔
846
            }
847
        }
848
        return result
20✔
849
    }
850

851
    /*
852
        Convert items into a map of items by model type
853
    */
854
    groupByType(items, params = {}) {
5✔
855
        let result = {}
6✔
856
        for (let item of items) {
6✔
857
            let type = item[this.typeField] || '_unknown'
23!
858
            let list = (result[type] = result[type] || [])
23✔
859
            let model = this.schema.models[type]
23✔
860
            let preparedItem
861
            if (typeof params.hidden === 'boolean' && !params.hidden) {
23✔
862
                let fields = model.block.fields
4✔
863
                preparedItem = {}
4✔
864
                for (let [name, field] of Object.entries(fields)) {
4✔
865
                    if (!(field.hidden && params.hidden !== true)) {
37✔
866
                        preparedItem[name] = item[name]
17✔
867
                    }
868
                }
869
            } else {
870
                preparedItem = item
19✔
871
            }
872
            list.push(preparedItem)
23✔
873
        }
874
        return result
6✔
875
    }
876

877
    /**
878
        This is a function passed to the DataLoader that given an array of Expression
879
        will group all commands by TableName and make all Keys unique and then will perform
880
        a BatchGet request and process the response of the BatchRequest to return an array
881
        with the corresponding response in the same order as it was requested.
882
        @param expressions
883
        @returns {Promise<*>}
884
     */
885
    async batchLoaderFunction(expressions) {
886
        const commands = expressions.map((each) => each.command())
6✔
887

888
        const groupedByTableName = commands.reduce((groupedBy, item) => {
2✔
889
            const tableName = item.TableName
6✔
890
            if (!groupedBy[tableName]) groupedBy[tableName] = []
6✔
891
            groupedBy[tableName].push(item)
6✔
892
            return groupedBy
6✔
893
        }, {})
894

895
        // convert each of the get requests into a single RequestItem with unique Keys
896
        const requestItems = Object.keys(groupedByTableName).reduce((requestItems, tableName) => {
2✔
897
            // batch get does not support duplicate Keys, so we need to make them unique
898
            // it's complex because we have the unmarshalled values on the Keys when it's V3
899
            const allKeys = groupedByTableName[tableName].map((each) => each.Key)
6✔
900
            const uniqueKeys = allKeys.filter((key1, index1, self) => {
2✔
901
                const index2 = self.findIndex((key2) => {
6✔
902
                    return Object.keys(key2).every((prop) => {
9✔
903
                        if (this.V3) {
15✔
904
                            const type = Object.keys(key1[prop])[0] // { S: "XX" } => type is S
15✔
905
                            return key2[prop][type] === key1[prop][type]
15✔
906
                        }
907
                        return key2[prop] === key1[prop]
×
908
                    })
909
                })
910
                return index2 === index1
6✔
911
            })
912
            requestItems[tableName] = {Keys: uniqueKeys}
2✔
913
            return requestItems
2✔
914
        }, {})
915

916
        const results = await this.batchGet({RequestItems: requestItems})
2✔
917

918
        // return the exact mapping (on same order as input) of each get command request to the result from database
919
        // to do that we need to find in the Responses object the item that was request and return it in the same position
920
        return commands.map((command, index) => {
2✔
921
            const {model, params} = expressions[index]
6✔
922

923
            // each key is { pk: { S: "XX" } } when V3 or { pk: "XX" } when V2
924
            // on map function, key will be pk and unmarshalled will be { S: "XX" }, OR "XXX"
925
            const criteria = Object.entries(command.Key).map(([key, unmarshalled]) => {
6✔
926
                if (this.V3) {
12✔
927
                    const type = Object.keys(unmarshalled)[0] // the type will be S
12✔
928
                    return [[key, type], unmarshalled[type]] // return [[pk, S], "XX"]
12✔
929
                }
930
                return [[key], unmarshalled]
×
931
            })
932

933
            // finds the matching object in the unmarshalled Responses array with criteria Key above
934
            const findByKeyUnmarshalled = (items = []) =>
6!
935
                items.find((item) => {
6✔
936
                    return criteria.every(([[prop, type], value]) => {
9✔
937
                        if (type) return item[prop][type] === value // if it has a type it means it is V3
15✔
938
                        return item[prop] === value
×
939
                    })
940
                })
941

942
            const items = results.Responses[command.TableName]
6✔
943
            const item = findByKeyUnmarshalled(items)
6✔
944
            if (item) {
6✔
945
                const unmarshalled = this.unmarshall(item, params)
6✔
946
                return model.transformReadItem('get', unmarshalled, {}, params)
6✔
947
            }
948
        })
949
    }
950

951
    /*
952
        Simple non-crypto UUID. See node-uuid if you require crypto UUIDs.
953
        Consider ULIDs which are crypto sortable.
954
    */
955
    uuid() {
956
        return UUID()
9✔
957
    }
958

959
    // Simple time-based, sortable unique ID.
960
    ulid() {
961
        return new ULID().toString()
601✔
962
    }
963

964
    /*
965
        Crypto-grade ID of given length. If >= 10 in length, suitably unique for most use-cases.
966
     */
967
    uid(size = 10) {
×
968
        return UID(size)
×
969
    }
970

971
    setGenerate(fn) {
972
        this.generate = fn
×
973
    }
974

975
    /*
976
        Return the value template variable references in a list
977
     */
978
    getVars(v) {
979
        let list = []
470✔
980
        if (Array.isArray(v)) {
470!
981
            list = v
×
982
        } else if (typeof v == 'string') {
470✔
983
            v.replace(/\${(.*?)}/g, (match, varName) => {
469✔
984
                list.push(varName)
496✔
985
            })
986
        }
987
        return list
470✔
988
    }
989

990
    initCrypto(crypto) {
991
        this.crypto = Object.assign(crypto)
1✔
992
        for (let [name, crypto] of Object.entries(this.crypto)) {
1✔
993
            crypto.secret = Crypto.createHash('sha256').update(crypto.password, 'utf8').digest()
1✔
994
            this.crypto[name] = crypto
1✔
995
            this.crypto[name].name = name
1✔
996
        }
997
    }
998

999
    encrypt(text, name = 'primary', inCode = 'utf8', outCode = 'base64') {
×
1000
        if (text) {
1✔
1001
            if (!this.crypto) {
1!
1002
                throw new OneTableArgError('No database secret or cipher defined')
×
1003
            }
1004
            let crypto = this.crypto[name]
1✔
1005
            if (!crypto) {
1!
1006
                throw new OneTableArgError(`Database crypto not defined for ${name}`)
×
1007
            }
1008
            let iv = Crypto.randomBytes(IV_LENGTH)
1✔
1009
            let crypt = Crypto.createCipheriv(crypto.cipher, crypto.secret, iv)
1✔
1010
            let crypted = crypt.update(text, inCode, outCode) + crypt.final(outCode)
1✔
1011
            let tag = crypto.cipher.indexOf('-gcm') > 0 ? crypt.getAuthTag().toString(outCode) : ''
1!
1012
            text = `${crypto.name}:${tag}:${iv.toString('hex')}:${crypted}`
1✔
1013
        }
1014
        return text
1✔
1015
    }
1016

1017
    decrypt(text, inCode = 'base64', outCode = 'utf8') {
×
1018
        if (text) {
2✔
1019
            let [name, tag, iv, data] = text.split(':')
2✔
1020
            if (!data || !iv || !tag || !name) {
2!
1021
                return text
×
1022
            }
1023
            if (!this.crypto) {
2!
1024
                throw new OneTableArgError('No database secret or cipher defined')
×
1025
            }
1026
            let crypto = this.crypto[name]
2✔
1027
            if (!crypto) {
2!
1028
                throw new OneTableArgError(`Database crypto not defined for ${name}`)
×
1029
            }
1030
            iv = Buffer.from(iv, 'hex')
2✔
1031
            let crypt = Crypto.createDecipheriv(crypto.cipher, crypto.secret, iv)
2✔
1032
            crypt.setAuthTag(Buffer.from(tag, inCode))
2✔
1033
            text = crypt.update(data, inCode, outCode) + crypt.final(outCode)
2✔
1034
        }
1035
        return text
2✔
1036
    }
1037

1038
    /*
1039
        Marshall data into and out of DynamoDB format
1040
    */
1041
    marshall(item, params) {
1042
        let client = params.client || this.client
2,561✔
1043
        if (client.V3) {
2,561✔
1044
            let options = client.params.marshall
2,476✔
1045
            if (Array.isArray(item)) {
2,476!
1046
                for (let i = 0; i < item.length; i++) {
×
1047
                    item[i] = client.marshall(item[i], options)
×
1048
                }
1049
            } else {
1050
                item = client.marshall(item, options)
2,476✔
1051
            }
1052
        } else {
1053
            if (Array.isArray(item)) {
85!
1054
                for (let i = 0; i < item.length; i++) {
×
1055
                    item = this.marshallv2(item, params)
×
1056
                }
1057
            } else {
1058
                item = this.marshallv2(item, params)
85✔
1059
            }
1060
        }
1061
        return item
2,561✔
1062
    }
1063

1064
    /*
1065
        Marshall data out of DynamoDB format
1066
    */
1067
    unmarshall(item, params) {
1068
        if (this.V3) {
310✔
1069
            let client = params.client ? params.client : this.client
282!
1070
            let options = client.params.unmarshall
282✔
1071
            if (Array.isArray(item)) {
282✔
1072
                for (let i = 0; i < item.length; i++) {
231✔
1073
                    item[i] = client.unmarshall(item[i], options)
1,824✔
1074
                }
1075
            } else {
1076
                item = client.unmarshall(item, options)
51✔
1077
            }
1078
        } else {
1079
            if (Array.isArray(item)) {
28✔
1080
                for (let i = 0; i < item.length; i++) {
22✔
1081
                    item[i] = this.unmarshallv2(item[i])
26✔
1082
                }
1083
            } else {
1084
                item = this.unmarshallv2(item)
6✔
1085
            }
1086
        }
1087
        return item
310✔
1088
    }
1089

1090
    marshallv2(item, params) {
1091
        let client = params.client ? params.client : this.client
85!
1092
        for (let [key, value] of Object.entries(item)) {
85✔
1093
            if (value instanceof Set) {
203✔
1094
                item[key] = client.createSet(Array.from(value))
7✔
1095
            }
1096
        }
1097
        return item
85✔
1098
    }
1099

1100
    unmarshallv2(item) {
1101
        for (let [key, value] of Object.entries(item)) {
32✔
1102
            if (
440✔
1103
                value != null &&
898✔
1104
                typeof value == 'object' &&
1105
                value.wrapperName == 'Set' &&
1106
                Array.isArray(value.values)
1107
            ) {
1108
                let list = value.values
9✔
1109
                if (value.type == 'Binary') {
9✔
1110
                    //  Match AWS SDK V3 behavior
1111
                    list = list.map((v) => new Uint8Array(v))
9✔
1112
                }
1113
                item[key] = new Set(list)
9✔
1114
            }
1115
        }
1116
        return item
32✔
1117
    }
1118

1119
    /*
1120
        Handle DynamoDb Stream Records
1121
     */
1122
    stream(records, params = {}) {
×
1123
        const unmarshallStreamImage = (image, params) => {
×
1124
            if (this.V3) {
×
1125
                return this.unmarshall(image, params)
×
1126
            }
1127
            // Built in unmarshaller for SDK v2 isn't compatible with Stream Record Images
1128
            //  Temporarily disabled as it creates a hard V2 dependency
1129
            throw new Error('Cannot unmarshall AWS V2 SDK streams')
×
1130
            // return Converter.unmarshall(image)
1131
        }
1132

1133
        const tableModels = this.listModels()
×
1134

1135
        const result = {}
×
1136
        for (const record of records) {
×
1137
            if (!record.dynamodb.NewImage && !record.dynamodb.OldImage) {
×
1138
                continue
×
1139
            }
1140

1141
            const model = {
×
1142
                type: record.eventName,
1143
            }
1144
            let typeNew
1145
            let typeOld
1146

1147
            // Unmarshall and transform the New Image if it exists
1148
            if (record.dynamodb.NewImage) {
×
1149
                const jsonNew = unmarshallStreamImage(record.dynamodb.NewImage, params)
×
1150
                typeNew = jsonNew[this.typeField]
×
1151

1152
                // If type not found then don't do anything
1153
                if (typeNew && tableModels.includes(typeNew)) {
×
1154
                    model.new = this.schema.models[typeNew].transformReadItem('get', jsonNew, {}, params)
×
1155
                }
1156
            }
1157

1158
            // Unmarshall and transform the Old Image if it exists
1159
            if (record.dynamodb.OldImage) {
×
1160
                const jsonOld = unmarshallStreamImage(record.dynamodb.OldImage, params)
×
1161
                typeOld = jsonOld[this.typeField]
×
1162

1163
                // If type not found then don't do anything
1164
                if (typeOld && tableModels.includes(typeOld)) {
×
1165
                    // If there was a new image of a different type then skip
1166
                    if (typeNew && typeNew !== typeOld) {
×
1167
                        continue
×
1168
                    }
1169
                    model.old = this.schema.models[typeOld].transformReadItem('get', jsonOld, {}, params)
×
1170
                }
1171
            }
1172

1173
            const type = typeNew || typeOld
×
1174
            let list = (result[type] = result[type] || [])
×
1175
            list.push(model)
×
1176
        }
1177

1178
        return result
×
1179
    }
1180

1181
    /*
1182
        Recursive Object.assign. Will clone dates, regexp, simple objects and arrays.
1183
        Other class instances and primitives are copied not cloned.
1184
        Max recursive depth of 20
1185
    */
1186
    assign(dest, ...sources) {
1187
        for (let src of sources) {
1,252✔
1188
            if (src) {
1,257✔
1189
                dest = this.assignInner(dest, src)
1,257✔
1190
            }
1191
        }
1192
        return dest
1,252✔
1193
    }
1194

1195
    assignInner(dest, src, recurse = 0) {
1,257✔
1196
        if (recurse++ > 1000) {
3,208!
1197
            throw new OneTableError('Recursive merge', {code: 'RuntimeError'})
×
1198
        }
1199
        if (!src || !dest || typeof src != 'object') {
3,208!
1200
            return
×
1201
        }
1202
        for (let [key, v] of Object.entries(src)) {
3,208✔
1203
            if (v === undefined) {
9,293!
1204
                continue
×
1205
            } else if (v instanceof Date) {
9,293✔
1206
                dest[key] = new Date(v)
21✔
1207
            } else if (v instanceof RegExp) {
9,272✔
1208
                dest[key] = new RegExp(v.source, v.flags)
12✔
1209
            } else if (Array.isArray(v)) {
9,260✔
1210
                if (!Array.isArray(dest[key])) {
17✔
1211
                    dest[key] = []
17✔
1212
                }
1213
                if (v.length) {
17✔
1214
                    dest[key] = this.assignInner([key], v, recurse)
17✔
1215
                }
1216
            } else if (typeof v == 'object' && v != null && v.constructor.name == 'Object') {
9,243✔
1217
                if (typeof dest[key] != 'object') {
1,934✔
1218
                    dest[key] = {}
1,929✔
1219
                }
1220
                dest[key] = this.assignInner(dest[key], v, recurse)
1,934✔
1221
            } else {
1222
                dest[key] = v
7,309✔
1223
            }
1224
        }
1225
        return dest
3,208✔
1226
    }
1227

1228
    async delay(time) {
1229
        return new Promise(function (resolve) {
×
1230
            setTimeout(() => resolve(true), time)
×
1231
        })
1232
    }
1233
}
1234

1235
/*
1236
    Emulate SenseLogs API
1237
*/
1238
class Log {
1239
    constructor(logger) {
1240
        if (logger === true) {
55✔
1241
            this.logger = this.defaultLogger
19✔
1242
        } else if (logger) {
36✔
1243
            this.logger = logger
2✔
1244
        }
1245
    }
1246

1247
    enabled() {
1248
        return true
×
1249
    }
1250

1251
    data(message, context) {
1252
        this.process('data', message, context)
854✔
1253
    }
1254

1255
    emit(chan, message, context) {
1256
        this.process(chan, message, context)
×
1257
    }
1258

1259
    error(message, context) {
1260
        this.process('error', message, context)
2✔
1261
    }
1262

1263
    info(message, context) {
1264
        this.process('info', message, context)
6✔
1265
    }
1266

1267
    trace(message, context) {
1268
        this.process('trace', message, context)
1,053✔
1269
    }
1270

1271
    process(level, message, context) {
1272
        if (this.logger) {
1,915✔
1273
            this.logger(level, message, context)
412✔
1274
        }
1275
    }
1276

1277
    defaultLogger(level, message, context) {
1278
        if (level == 'trace' || level == 'data') {
358✔
1279
            //  params.log: true will cause the level to be changed to 'info'
1280
            return
358✔
1281
        }
1282
        if (context) {
×
1283
            try {
×
1284
                console.log(level, message, JSON.stringify(context, null, 4))
×
1285
            } catch (err) {
1286
                let buf = ['{']
×
1287
                for (let [key, value] of Object.entries(context)) {
×
1288
                    try {
×
1289
                        buf.push(`    ${key}: ${JSON.stringify(value, null, 4)}`)
×
1290
                    } catch (err) {
1291
                        /* continue */
1292
                    }
1293
                }
1294
                buf.push('}')
×
1295
                console.log(level, message, buf.join('\n'))
×
1296
            }
1297
        } else {
1298
            console.log(level, message)
×
1299
        }
1300
    }
1301
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc