• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sensedeep / dynamodb-onetable / #66

pending completion
#66

push

Michael O'Brien
DEV: bump version

1128 of 1609 branches covered (70.11%)

Branch coverage included in aggregate %.

1808 of 2379 relevant lines covered (76.0%)

623.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.42
/src/Table.js
1
/*
2
    Table.js - DynamoDB table class
3

4
    A OneTable Table represents a single (connected) DynamoDB table
5
 */
6

7
import Crypto from 'crypto'
51✔
8
import UUID from './UUID.js'
51✔
9
import ULID from './ULID.js'
51✔
10
import UID from './UID.js'
51✔
11
import {Expression} from './Expression.js'
51✔
12
import {Schema} from './Schema.js'
51✔
13
import {Metrics} from './Metrics.js'
51✔
14
import {OneTableArgError, OneTableError} from './Error.js'
51✔
15
import {unmarshall} from '@aws-sdk/util-dynamodb'
51✔
16

17
/*
18
    AWS V2 DocumentClient methods
19
 */
20
const DocumentClientMethods = {
51✔
21
    delete: 'delete',
22
    get: 'get',
23
    find: 'query',
24
    put: 'put',
25
    scan: 'scan',
26
    update: 'update',
27
    batchGet: 'batchGet',
28
    batchWrite: 'batchWrite',
29
    transactGet: 'transactGet',
30
    transactWrite: 'transactWrite',
31
}
32

33
/*
34
    Safety string required on API to delete a table
35
*/
36
const ConfirmRemoveTable = 'DeleteTableForever'
51✔
37

38
/*
39
    Crypto IV length
40
*/
41
const IV_LENGTH = 16
51✔
42

43
const DynamoOps = {
51✔
44
    delete: 'deleteItem',
45
    get: 'getItem',
46
    find: 'query',
47
    put: 'putItem',
48
    scan: 'scan',
49
    update: 'updateItem',
50
    batchGet: 'batchGet',
51
    batchWrite: 'batchWrite',
52
    transactGet: 'transactGet',
53
    transactWrite: 'transactWrite',
54
}
55

56
const GenericModel = '_Generic'
51✔
57

58
const maxBatchSize = 25
51✔
59

60
/*
61
    Represent a single DynamoDB table
62
 */
63
export class Table {
51✔
64
    constructor(params = {}) {
×
65
        if (!params.name) {
57✔
66
            throw new OneTableArgError('Missing "name" property')
1✔
67
        }
68
        this.context = {}
56✔
69

70
        this.log = params.senselogs ? params.senselogs : new Log(params.logger)
56!
71
        this.log.trace(`Loading OneTable`)
56✔
72

73
        if (params.client) {
56✔
74
            this.setClient(params.client)
53✔
75
        }
76
        if (params.crypto) {
56✔
77
            this.initCrypto(params.crypto)
1✔
78
            this.crypto = Object.assign(params.crypto)
1✔
79
            for (let [name, crypto] of Object.entries(this.crypto)) {
1✔
80
                crypto.secret = Crypto.createHash('sha256').update(crypto.password, 'utf8').digest()
1✔
81
                this.crypto[name] = crypto
1✔
82
                this.crypto[name].name = name
1✔
83
            }
84
        }
85
        this.setParams(params)
56✔
86

87
        //  Set schema param defaults
88
        this.typeField = '_type'
56✔
89
        this.createdField = 'created'
56✔
90
        this.isoDates = false
56✔
91
        this.nulls = false
56✔
92
        this.timestamps = false
56✔
93
        this.updatedField = 'updated'
56✔
94

95
        this.schema = new Schema(this, params.schema)
56✔
96
        if (params.dataloader) {
56✔
97
            this.dataloader = new params.dataloader((cmds) => this.batchLoaderFunction(cmds), {maxBatchSize})
2✔
98
        }
99
    }
100

101
    setClient(client) {
102
        this.client = client
56✔
103
        this.V3 = client.V3
56✔
104
        this.service = this.V3 ? this.client : this.client.service
56✔
105
    }
106

107
    setParams(params) {
108
        if (
56!
109
            params.createdField != null ||
336✔
110
            this.isoDates != null ||
111
            this.nulls != null ||
112
            this.timestamps != null ||
113
            this.typeField != null ||
114
            this.updatedField != null
115
        ) {
116
            throw new OneTableArgError(
×
117
                'Using deprecated Table constructor parameters. Define in the Schema.params instead.'
118
            )
119
        }
120

121
        if (params.uuid) {
56!
122
            console.warn(
×
123
                'OneTable: Using deprecated Table constructor "uuid" parameter. Use a "generate" function instead or ' +
124
                    'Set schema models to use "generate: uuid|ulid" explicitly.'
125
            )
126
            params.generate = params.generate | params.uuid
×
127
        }
128

129
        if (params.partial == null) {
56!
130
            console.warn(
×
131
                'OneTable: Must set Table constructor "partial" param to true or false. ' +
132
                    'This param permits updating partial nested schemas. Currently defaults to false, ' +
133
                    'but in a future version will default to true. ' +
134
                    'Set to false to future proof or set to true for the new behavior.'
135
            )
136
            params.partial = true
×
137
        }
138
        //  Return hidden fields by default. Default is false.
139
        this.hidden = params.hidden != null ? params.hidden : false
56!
140
        this.partial = params.partial
56✔
141
        this.warn = params.warn || true
56✔
142

143
        if (typeof params.generate == 'function') {
56!
144
            this.generate = params.generate || this.uuid
×
145
        } else if (params.generate) {
56!
146
            throw new OneTableArgError('OneTable: Generate can only be a function')
×
147
        }
148

149
        this.name = params.name
56✔
150

151
        if (params.metrics) {
56!
152
            this.metrics = new Metrics(this, params.metrics, this.metrics)
×
153
        }
154
        if (params.monitor) {
56!
155
            this.monitor = params.monitor
×
156
        }
157
        this.params = params
56✔
158
    }
159

160
    setSchemaParams(params = {}) {
9✔
161
        this.createdField = params.createdField || 'created'
55✔
162
        this.isoDates = params.isoDates || false
55✔
163
        this.nulls = params.nulls || false
55✔
164
        this.timestamps = params.timestamps != null ? params.timestamps : false
55✔
165
        this.typeField = params.typeField || '_type'
55✔
166
        this.updatedField = params.updatedField || 'updated'
55✔
167

168
        if (params.hidden != null) {
55!
169
            console.warn(`Schema hidden params should be specified via the Table constructor params`)
×
170
        }
171
    }
172

173
    getSchemaParams() {
174
        return {
56✔
175
            createdField: this.createdField,
176
            isoDates: this.isoDates,
177
            nulls: this.nulls,
178
            timestamps: this.timestamps,
179
            typeField: this.typeField,
180
            updatedField: this.updatedField,
181
        }
182
    }
183

184
    async setSchema(schema) {
185
        return await this.schema.setSchema(schema)
×
186
    }
187

188
    getCurrentSchema() {
189
        return this.schema.getCurrentSchema()
5✔
190
    }
191

192
    async getKeys() {
193
        return await this.schema.getKeys()
×
194
    }
195

196
    async getPrimaryKeys() {
197
        let keys = await this.schema.getKeys()
×
198
        return keys.primary
×
199
    }
200

201
    async readSchema() {
202
        return this.schema.readSchema()
×
203
    }
204

205
    async readSchemas() {
206
        return this.schema.readSchemas()
×
207
    }
208

209
    async removeSchema(schema) {
210
        return this.schema.removeSchema(schema)
×
211
    }
212

213
    async saveSchema(schema) {
214
        return this.schema.saveSchema(schema)
×
215
    }
216

217
    /*
218
        Output the AWS table definition as a JSON structure to use in external tools such as CloudFormation
219
        or the AWS CLI to create your DynamoDB table. Uses the current schema index definition.
220
        Alternatively, params may contain standard DynamoDB createTable parameters.
221
    */
222
    getTableDefinition(params = {}) {
×
223
        let def = {
52✔
224
            AttributeDefinitions: [],
225
            KeySchema: [],
226
            LocalSecondaryIndexes: [],
227
            GlobalSecondaryIndexes: [],
228
            TableName: this.name,
229
        }
230
        let provisioned = params.provisioned || params.ProvisionedThroughput
52✔
231
        if (provisioned) {
52✔
232
            if (!provisioned.ReadCapacityUnits && !provisioned.WriteCapacityUnits) {
1!
233
                def.BillingMode = 'PAY_PER_REQUEST'
×
234
            } else {
235
                def.ProvisionedThroughput = provisioned
1✔
236
                def.BillingMode = 'PROVISIONED'
1✔
237
            }
238
        } else {
239
            def.BillingMode = 'PAY_PER_REQUEST'
51✔
240
        }
241
        if (params.StreamSpecification) {
52!
242
            def.StreamSpecification = params.StreamSpecification
×
243
        }
244
        let attributes = {}
52✔
245
        let {indexes} = this.schema
52✔
246

247
        if (!indexes) {
52!
248
            throw new OneTableArgError('Cannot create table without schema indexes')
×
249
        }
250
        for (let [name, index] of Object.entries(indexes)) {
52✔
251
            let keys
252
            if (name == 'primary') {
125✔
253
                keys = def.KeySchema
52✔
254
            } else {
255
                let collection = index.type == 'local' ? 'LocalSecondaryIndexes' : 'GlobalSecondaryIndexes'
73✔
256
                keys = []
73✔
257
                let project, projection
258
                if (Array.isArray(index.project)) {
73✔
259
                    projection = 'INCLUDE'
3✔
260
                    project = index.project.filter((a) => a != indexes.primary.hash && a != indexes.primary.sort)
9✔
261
                } else if (index.project == 'keys') {
70✔
262
                    projection = 'KEYS_ONLY'
1✔
263
                } else {
264
                    projection = 'ALL'
69✔
265
                }
266
                let projDef = {
73✔
267
                    IndexName: name,
268
                    KeySchema: keys,
269
                    Projection: {
270
                        ProjectionType: projection,
271
                    },
272
                }
273
                if (project) {
73✔
274
                    projDef.Projection.NonKeyAttributes = project
3✔
275
                }
276
                def[collection].push(projDef)
73✔
277
            }
278
            keys.push({AttributeName: index.hash || indexes.primary.hash, KeyType: 'HASH'})
125!
279

280
            if (index.hash && !attributes[index.hash]) {
125✔
281
                let type = this.getAttributeType(index.hash) == 'number' ? 'N' : 'S'
121!
282
                def.AttributeDefinitions.push({AttributeName: index.hash, AttributeType: type})
121✔
283
                attributes[index.hash] = true
121✔
284
            }
285
            if (index.sort) {
125✔
286
                if (!attributes[index.sort]) {
120✔
287
                    let type = this.getAttributeType(index.sort) == 'number' ? 'N' : 'S'
118✔
288
                    def.AttributeDefinitions.push({AttributeName: index.sort, AttributeType: type})
118✔
289
                    attributes[index.sort] = true
118✔
290
                }
291
                keys.push({AttributeName: index.sort, KeyType: 'RANGE'})
120✔
292
            }
293
        }
294
        if (def.GlobalSecondaryIndexes.length == 0) {
52✔
295
            delete def.GlobalSecondaryIndexes
17✔
296
        } else if (provisioned) {
35✔
297
            for (let index of def.GlobalSecondaryIndexes) {
1✔
298
                index.ProvisionedThroughput = provisioned
3✔
299
            }
300
        }
301
        if (def.LocalSecondaryIndexes.length == 0) {
52✔
302
            delete def.LocalSecondaryIndexes
49✔
303
        }
304
        return def
52✔
305
    }
306

307
    /*
308
        Create a DynamoDB table. Uses the current schema index definition.
309
        Alternatively, params may contain standard DynamoDB createTable parameters.
310
    */
311
    async createTable(params = {}) {
51✔
312
        const def = this.getTableDefinition(params)
52✔
313
        let result
314

315
        this.log.trace(`OneTable createTable for "${this.name}"`, {def})
52✔
316
        if (this.V3) {
52✔
317
            result = await this.service.createTable(def)
50✔
318
        } else {
319
            result = await this.service.createTable(def).promise()
2✔
320
        }
321

322
        /*
323
            Wait for table to become active. Must do if setting a TTL attribute
324
        */
325
        if (params.TimeToLiveSpecification) {
52!
326
            params.wait = 5 * 60
×
327
        }
328
        if (params.wait) {
52!
329
            let deadline = new Date(Date.now() + params.wait * 1000)
×
330
            let info
331
            do {
×
332
                info = await this.describeTable()
×
333
                if (info.Table.TableStatus == 'ACTIVE') {
×
334
                    break
×
335
                }
336
                if (deadline < Date.now()) {
×
337
                    throw new Error('Table has not become active')
×
338
                }
339
                await this.delay(1000)
×
340
            } while (Date.now() < deadline)
341
        }
342

343
        /*
344
            Define a TTL attribute
345
        */
346
        if (params.TimeToLiveSpecification) {
52!
347
            let def = {
×
348
                TableName: this.name,
349
                TimeToLiveSpecification: params.TimeToLiveSpecification,
350
            }
351
            if (this.V3) {
×
352
                await this.service.updateTimeToLive(def)
×
353
            } else {
354
                await this.service.updateTimeToLive(def).promise()
×
355
            }
356
        }
357
        return result
52✔
358
    }
359

360
    getAttributeType(name) {
361
        for (let model of Object.values(this.schema.models)) {
239✔
362
            let fields = model.block.fields
257✔
363
            if (fields[name]) {
257✔
364
                return fields[name].type
233✔
365
            }
366
        }
367
        return null
6✔
368
    }
369

370
    /*
371
        Delete the DynamoDB table forever. Be careful.
372
    */
373
    async deleteTable(confirmation) {
374
        if (confirmation == ConfirmRemoveTable) {
52✔
375
            this.log.trace(`OneTable deleteTable for "${this.name}"`)
51✔
376
            if (this.V3) {
51✔
377
                await this.service.deleteTable({TableName: this.name})
49✔
378
            } else {
379
                await this.service.deleteTable({TableName: this.name}).promise()
2✔
380
            }
381
        } else {
382
            throw new OneTableArgError(`Missing required confirmation "${ConfirmRemoveTable}"`)
1✔
383
        }
384
    }
385

386
    async updateTable(params = {}) {
×
387
        let def = {
×
388
            AttributeDefinitions: [],
389
            GlobalSecondaryIndexUpdates: [],
390
            TableName: this.name,
391
        }
392
        let {create, provisioned} = params
×
393

394
        if (provisioned) {
×
395
            if (!provisioned.ReadCapacityUnits && !provisioned.WriteCapacityUnits) {
×
396
                def.BillingMode = 'PAY_PER_REQUEST'
×
397
            } else {
398
                if (!create) {
×
399
                    def.ProvisionedThroughput = provisioned
×
400
                }
401
                def.BillingMode = 'PROVISIONED'
×
402
            }
403
        }
404
        let indexes = this.schema.indexes
×
405
        if (!indexes) {
×
406
            throw new OneTableArgError('Cannot update table without schema indexes')
×
407
        }
408
        if (create) {
×
409
            if (create.hash == null || create.hash == indexes.primary.hash || create.type == 'local') {
×
410
                throw new OneTableArgError('Cannot update table to create an LSI')
×
411
            }
412
            let keys = []
×
413
            let projection, project
414

415
            if (Array.isArray(create.project)) {
×
416
                projection = 'INCLUDE'
×
417
                project = create.project.filter((a) => a != create.hash && a != create.sort)
×
418
            } else if (create.project == 'keys') {
×
419
                projection = 'KEYS_ONLY'
×
420
            } else {
421
                projection = 'ALL'
×
422
            }
423
            let projDef = {
×
424
                IndexName: create.name,
425
                KeySchema: keys,
426
                Projection: {
427
                    ProjectionType: projection,
428
                },
429
            }
430
            if (project) {
×
431
                projDef.Projection.NonKeyAttributes = project
×
432
            }
433
            keys.push({AttributeName: create.hash, KeyType: 'HASH'})
×
434
            def.AttributeDefinitions.push({AttributeName: create.hash, AttributeType: 'S'})
×
435

436
            if (create.sort) {
×
437
                def.AttributeDefinitions.push({AttributeName: create.sort, AttributeType: 'S'})
×
438
                keys.push({AttributeName: create.sort, KeyType: 'RANGE'})
×
439
            }
440
            if (provisioned) {
×
441
                projDef.ProvisionedThroughput = provisioned
×
442
            }
443
            def.GlobalSecondaryIndexUpdates.push({Create: projDef})
×
444
        } else if (params.remove) {
×
445
            def.GlobalSecondaryIndexUpdates.push({Delete: {IndexName: params.remove.name}})
×
446
        } else if (params.update) {
×
447
            let update = {Update: {IndexName: params.update.name}}
×
448
            if (provisioned) {
×
449
                update.Update.ProvisionedThroughput = provisioned
×
450
            }
451
            def.GlobalSecondaryIndexUpdates.push(update)
×
452
        }
453
        if (def.GlobalSecondaryIndexUpdates.length == 0) {
×
454
            delete def.GlobalSecondaryIndexUpdates
×
455
        }
456
        if (params.TimeToLiveSpecification) {
×
457
            let def = {
×
458
                TableName: params.TableName,
459
                TimeToLiveSpecification: params.TimeToLiveSpecification,
460
            }
461
            if (this.V3) {
×
462
                await this.service.updateTimeToLive(def)
×
463
            } else {
464
                await this.service.updateTimeToLive(def).promise()
×
465
            }
466
        }
467
        this.log.trace(`OneTable updateTable for "${this.name}"`, {def})
×
468
        if (this.V3) {
×
469
            return await this.service.updateTable(def)
×
470
        } else {
471
            return await this.service.updateTable(def).promise()
×
472
        }
473
    }
474

475
    /*
476
        Return the raw AWS table description
477
    */
478
    async describeTable() {
479
        if (this.V3) {
4✔
480
            return await this.service.describeTable({TableName: this.name})
3✔
481
        } else {
482
            return await this.service.describeTable({TableName: this.name}).promise()
1✔
483
        }
484
    }
485

486
    /*
487
        Return true if the underlying DynamoDB table represented by this OneTable instance is present.
488
    */
489
    async exists() {
490
        let results = await this.listTables()
132✔
491
        return results && results.find((t) => t == this.name) != null ? true : false
345✔
492
    }
493

494
    /*
495
        Return a list of tables in the AWS region described by the Table instance
496
    */
497
    async listTables() {
498
        let results
499
        if (this.V3) {
135✔
500
            results = await this.service.listTables({})
129✔
501
        } else {
502
            results = await this.service.listTables({}).promise()
6✔
503
        }
504
        return results.TableNames
135✔
505
    }
506

507
    listModels() {
508
        return this.schema.listModels()
10✔
509
    }
510

511
    addModel(name, fields) {
512
        this.schema.addModel(name, fields)
1✔
513
    }
514

515
    getLog() {
516
        return this.log
×
517
    }
518

519
    setLog(log) {
520
        this.log = log
×
521
    }
522

523
    /*
524
        Thows exception if model cannot be found
525
     */
526
    getModel(name) {
527
        return this.schema.getModel(name)
127✔
528
    }
529

530
    removeModel(name) {
531
        return this.schema.removeModel(name)
2✔
532
    }
533

534
    getContext() {
535
        return this.context
11✔
536
    }
537

538
    addContext(context = {}) {
×
539
        this.context = Object.assign(this.context, context)
1✔
540
        return this
1✔
541
    }
542

543
    setContext(context = {}, merge = false) {
6!
544
        this.context = merge ? Object.assign(this.context, context) : context
7✔
545
        return this
7✔
546
    }
547

548
    clearContext() {
549
        this.context = {}
1✔
550
        return this
1✔
551
    }
552

553
    /*  PROTOTYPE
554
        Create a clone of the table with the same settings and replace the context
555
    */
556
    child(context) {
557
        let table = JSON.parse(JSON.stringify(this))
×
558
        table.context = context
×
559
        return table
×
560
    }
561

562
    /*
563
        High level model factory API
564
        The high level API is similar to the Model API except the model name is provided as the first parameter.
565
        This API is useful for factories
566
    */
567
    async create(modelName, properties, params) {
568
        let model = this.getModel(modelName)
19✔
569
        return await model.create(properties, params)
19✔
570
    }
571

572
    async find(modelName, properties, params) {
573
        let model = this.getModel(modelName)
3✔
574
        return await model.find(properties, params)
3✔
575
    }
576

577
    async get(modelName, properties, params) {
578
        let model = this.getModel(modelName)
21✔
579
        return await model.get(properties, params)
21✔
580
    }
581

582
    async load(modelName, properties, params) {
583
        let model = this.getModel(modelName)
6✔
584
        return await model.load(properties, params)
6✔
585
    }
586

587
    init(modelName, properties, params) {
588
        let model = this.getModel(modelName)
×
589
        return model.init(properties, params)
×
590
    }
591

592
    async remove(modelName, properties, params) {
593
        let model = this.getModel(modelName)
5✔
594
        return await model.remove(properties, params)
5✔
595
    }
596

597
    async scan(modelName, properties, params) {
598
        let model = this.getModel(modelName)
20✔
599
        return await model.scan(properties, params)
20✔
600
    }
601

602
    async update(modelName, properties, params) {
603
        let model = this.getModel(modelName)
6✔
604
        return await model.update(properties, params)
6✔
605
    }
606

607
    async upsert(modelName, properties, params = {}) {
×
608
        params.exists = null
×
609
        return this.update(modelName, properties, params)
×
610
    }
611

612
    async execute(model, op, cmd, properties = {}, params = {}) {
×
613
        let mark = new Date()
898✔
614
        let trace = {model, cmd, op, properties}
898✔
615
        let result
616
        try {
898✔
617
            let client = params.client || this.client
898✔
618
            if (params.stats || this.metrics || this.monitor) {
898✔
619
                cmd.ReturnConsumedCapacity = params.capacity || 'INDEXES'
1✔
620
                cmd.ReturnItemCollectionMetrics = 'SIZE'
1✔
621
            }
622
            this.log[params.log ? 'info' : 'trace'](`OneTable "${op}" "${model}"`, {trace})
898✔
623
            if (this.V3) {
898✔
624
                result = await client[op](cmd)
868✔
625
            } else {
626
                result = await client[DocumentClientMethods[op]](cmd).promise()
30✔
627
            }
628
        } catch (err) {
629
            //  V3 stores the error in 'name' (Ugh!)
630
            let code = err.code || err.name
6✔
631
            if (params.throw === false) {
6✔
632
                result = {}
1✔
633
            } else if (code == 'ConditionalCheckFailedException' && op == 'put') {
5!
634
                this.log.info(`Conditional check failed "${op}" on "${model}"`, {err, trace})
×
635
                throw new OneTableError(`Conditional create failed for "${model}"`, {
×
636
                    code,
637
                    err,
638
                    trace,
639
                })
640
            } else if (code == 'ProvisionedThroughputExceededException') {
5!
641
                throw new OneTableError('Provisioning Throughput Exception', {
×
642
                    code,
643
                    err,
644
                    trace,
645
                })
646
            } else if (code == 'TransactionCanceledException') {
5✔
647
                throw new OneTableError('Transaction Cancelled', {
3✔
648
                    code,
649
                    err,
650
                    trace,
651
                })
652
            } else {
653
                result = result || {}
2✔
654
                result.Error = 1
2✔
655
                if (params.log != false) {
2✔
656
                    this.log.error(`OneTable exception in "${op}" on "${model} ${err.message}"`, {err, trace})
2✔
657
                }
658
                throw new OneTableError(`OneTable execute failed "${op}" for "${model}", ${err.message}`, {
2✔
659
                    code,
660
                    err,
661
                    trace,
662
                })
663
            }
664
        } finally {
665
            if (result) {
898✔
666
                if (this.metrics) {
895!
667
                    this.metrics.add(model, op, result, params, mark)
×
668
                }
669
                if (this.monitor) {
895!
670
                    await this.monitor(model, op, result, params, mark)
×
671
                }
672
            }
673
        }
674
        if (typeof params.info == 'object') {
893!
675
            params.info.operation = DynamoOps[op]
×
676
            params.info.args = cmd
×
677
            params.info.properties = properties
×
678
        }
679
        return result
893✔
680
    }
681

682
    /*
683
        The low level API does not use models. It permits the reading / writing of any attribute.
684
    */
685
    async batchGet(batch, params = {}) {
2✔
686
        if (Object.getOwnPropertyNames(batch).length == 0) {
7!
687
            return []
×
688
        }
689
        let def = batch.RequestItems[this.name]
7✔
690

691
        if (params.fields) {
7✔
692
            if (params.fields.indexOf(this.typeField) < 0) {
1✔
693
                params.fields.push(this.typeField)
1✔
694
            }
695
            let expression = new Expression(this.schema.genericModel, 'batchGet', {}, params)
1✔
696
            let cmd = expression.command()
1✔
697
            def.ProjectionExpression = cmd.ProjectionExpression
1✔
698
            def.ExpressionAttributeNames = cmd.ExpressionAttributeNames
1✔
699
        }
700
        def.ConsistentRead = params.consistent ? true : false
7✔
701

702
        // let result = await this.execute(GenericModel, 'batchGet', batch, {}, params)
703

704
        let result,
705
            retries = 0,
6✔
706
            more
707
        result = params.parse ? [] : {Responses: {}}
6✔
708
        do {
6✔
709
            more = false
6✔
710
            let data = await this.execute(GenericModel, 'batchGet', batch, {}, params)
6✔
711
            if (data) {
6✔
712
                let responses = data.Responses
6✔
713
                if (responses) {
6✔
714
                    for (let [key, items] of Object.entries(responses)) {
6✔
715
                        for (let item of items) {
6✔
716
                            if (params.parse) {
12✔
717
                                item = this.unmarshall(item, params)
7✔
718
                                let type = item[this.typeField] || '_unknown'
7!
719
                                let model = this.schema.models[type]
7✔
720
                                if (model && model != this.schema.uniqueModel) {
7✔
721
                                    result.push(model.transformReadItem('get', item, {}, params))
7✔
722
                                }
723
                            } else {
724
                                let set = (result.Responses[key] = result.Responses[key] || [])
5✔
725
                                set.push(item)
5✔
726
                            }
727
                        }
728
                    }
729
                }
730
                let unprocessed = data.UnprocessedItems
6✔
731
                if (unprocessed && Object.keys(unprocessed).length) {
6!
732
                    batch.RequestItems = unprocessed
×
733
                    if (params.reprocess === false) {
×
734
                        return false
×
735
                    }
736
                    if (retries > 11) {
×
737
                        throw new Error(unprocessed)
×
738
                    }
739
                    await this.delay(10 * 2 ** retries++)
×
740
                    more = true
×
741
                }
742
            }
743
        } while (more)
744
        return result
6✔
745
    }
746

747
    /*
748
        AWS BatchWrite may throw an exception if no items can be processed.
749
        Otherwise it will retry (up to 11 times) and return partial results in UnprocessedItems.
750
        Those will be handled here if possible.
751
    */
752
    async batchWrite(batch, params = {}) {
5✔
753
        if (Object.getOwnPropertyNames(batch).length == 0) {
6!
754
            return {}
×
755
        }
756
        let retries = 0,
6✔
757
            more
758
        do {
6✔
759
            more = false
6✔
760
            let response = await this.execute(GenericModel, 'batchWrite', batch, {}, params)
6✔
761
            let data = response.data
5✔
762
            if (data && data.UnprocessedItems && Object.keys(data.UnprocessedItems).length) {
5!
763
                batch.RequestItems = data.UnprocessedItems
×
764
                if (params.reprocess === false) {
×
765
                    return false
×
766
                }
767
                if (retries > 11) {
×
768
                    throw new Error(response.UnprocessedItems)
×
769
                }
770
                await this.delay(10 * 2 ** retries++)
×
771
                more = true
×
772
            }
773
        } while (more)
774
        return true
5✔
775
    }
776

777
    async batchLoad(expression) {
778
        if (this.dataloader) {
6✔
779
            return await this.dataloader.load(expression)
6✔
780
        }
781
        throw new Error('params.dataloader DataLoader constructor is required to use load feature')
×
782
    }
783

784
    async deleteItem(properties, params) {
785
        return await this.schema.genericModel.deleteItem(properties, params)
1✔
786
    }
787

788
    async getItem(properties, params) {
789
        return await this.schema.genericModel.getItem(properties, params)
1✔
790
    }
791

792
    async putItem(properties, params) {
793
        return await this.schema.genericModel.putItem(properties, params)
2✔
794
    }
795

796
    async queryItems(properties, params) {
797
        return await this.schema.genericModel.queryItems(properties, params)
8✔
798
    }
799

800
    async scanItems(properties, params) {
801
        return await this.schema.genericModel.scanItems(properties, params)
20✔
802
    }
803

804
    async updateItem(properties, params) {
805
        return await this.schema.genericModel.updateItem(properties, params)
3✔
806
    }
807

808
    async fetch(models, properties, params) {
809
        return await this.schema.genericModel.fetch(models, properties, params)
2✔
810
    }
811

812
    /*
813
        Invoke a prepared transaction. Note: transactGet does not work on non-primary indexes.
814
     */
815
    async transact(op, transaction, params = {}) {
3✔
816
        if (params.execute === false) {
23!
817
            if (params.log !== false) {
×
818
                this.log[params.log ? 'info' : 'data'](`OneTable transaction for "${op}" (not executed)`, {
×
819
                    transaction,
820
                    op,
821
                    params,
822
                })
823
            }
824
            return transaction
×
825
        }
826
        let result = await this.execute(
23✔
827
            GenericModel,
828
            op == 'write' ? 'transactWrite' : 'transactGet',
23✔
829
            transaction,
830
            {},
831
            params
832
        )
833
        if (op == 'get') {
20✔
834
            if (params.parse) {
3✔
835
                let items = []
2✔
836
                for (let r of result.Responses) {
2✔
837
                    if (r.Item) {
6✔
838
                        let item = this.unmarshall(r.Item, params)
6✔
839
                        let type = item[this.typeField] || '_unknown'
6!
840
                        let model = this.schema.models[type]
6✔
841
                        if (model && model != this.schema.uniqueModel) {
6✔
842
                            items.push(model.transformReadItem('get', item, {}, params))
6✔
843
                        }
844
                    }
845
                }
846
                result = items
2✔
847
            }
848
        }
849
        return result
20✔
850
    }
851

852
    /*
853
        Convert items into a map of items by model type
854
    */
855
    groupByType(items, params = {}) {
5✔
856
        let result = {}
6✔
857
        for (let item of items) {
6✔
858
            let type = item[this.typeField] || '_unknown'
23!
859
            let list = (result[type] = result[type] || [])
23✔
860
            let model = this.schema.models[type]
23✔
861
            let preparedItem
862
            if (typeof params.hidden === 'boolean' && !params.hidden) {
23✔
863
                let fields = model.block.fields
4✔
864
                preparedItem = {}
4✔
865
                for (let [name, field] of Object.entries(fields)) {
4✔
866
                    if (!(field.hidden && params.hidden !== true)) {
37✔
867
                        preparedItem[name] = item[name]
17✔
868
                    }
869
                }
870
            } else {
871
                preparedItem = item
19✔
872
            }
873
            list.push(preparedItem)
23✔
874
        }
875
        return result
6✔
876
    }
877

878
    /**
879
        This is a function passed to the DataLoader that given an array of Expression
880
        will group all commands by TableName and make all Keys unique and then will perform
881
        a BatchGet request and process the response of the BatchRequest to return an array
882
        with the corresponding response in the same order as it was requested.
883
        @param expressions
884
        @returns {Promise<*>}
885
     */
886
    async batchLoaderFunction(expressions) {
887
        const commands = expressions.map((each) => each.command())
6✔
888

889
        const groupedByTableName = commands.reduce((groupedBy, item) => {
2✔
890
            const tableName = item.TableName
6✔
891
            if (!groupedBy[tableName]) groupedBy[tableName] = []
6✔
892
            groupedBy[tableName].push(item)
6✔
893
            return groupedBy
6✔
894
        }, {})
895

896
        // convert each of the get requests into a single RequestItem with unique Keys
897
        const requestItems = Object.keys(groupedByTableName).reduce((requestItems, tableName) => {
2✔
898
            // batch get does not support duplicate Keys, so we need to make them unique
899
            // it's complex because we have the unmarshalled values on the Keys when it's V3
900
            const allKeys = groupedByTableName[tableName].map((each) => each.Key)
6✔
901
            const uniqueKeys = allKeys.filter((key1, index1, self) => {
2✔
902
                const index2 = self.findIndex((key2) => {
6✔
903
                    return Object.keys(key2).every((prop) => {
9✔
904
                        if (this.V3) {
15✔
905
                            const type = Object.keys(key1[prop])[0] // { S: "XX" } => type is S
15✔
906
                            return key2[prop][type] === key1[prop][type]
15✔
907
                        }
908
                        return key2[prop] === key1[prop]
×
909
                    })
910
                })
911
                return index2 === index1
6✔
912
            })
913
            requestItems[tableName] = {Keys: uniqueKeys}
2✔
914
            return requestItems
2✔
915
        }, {})
916

917
        const results = await this.batchGet({RequestItems: requestItems})
2✔
918

919
        // return the exact mapping (on same order as input) of each get command request to the result from database
920
        // to do that we need to find in the Responses object the item that was request and return it in the same position
921
        return commands.map((command, index) => {
2✔
922
            const {model, params} = expressions[index]
6✔
923

924
            // each key is { pk: { S: "XX" } } when V3 or { pk: "XX" } when V2
925
            // on map function, key will be pk and unmarshalled will be { S: "XX" }, OR "XXX"
926
            const criteria = Object.entries(command.Key).map(([key, unmarshalled]) => {
6✔
927
                if (this.V3) {
12✔
928
                    const type = Object.keys(unmarshalled)[0] // the type will be S
12✔
929
                    return [[key, type], unmarshalled[type]] // return [[pk, S], "XX"]
12✔
930
                }
931
                return [[key], unmarshalled]
×
932
            })
933

934
            // finds the matching object in the unmarshalled Responses array with criteria Key above
935
            const findByKeyUnmarshalled = (items = []) =>
6!
936
                items.find((item) => {
6✔
937
                    return criteria.every(([[prop, type], value]) => {
9✔
938
                        if (type) return item[prop][type] === value // if it has a type it means it is V3
15✔
939
                        return item[prop] === value
×
940
                    })
941
                })
942

943
            const items = results.Responses[command.TableName]
6✔
944
            const item = findByKeyUnmarshalled(items)
6✔
945
            if (item) {
6✔
946
                const unmarshalled = this.unmarshall(item, params)
6✔
947
                return model.transformReadItem('get', unmarshalled, {}, params)
6✔
948
            }
949
        })
950
    }
951

952
    /*
953
        Simple non-crypto UUID. See node-uuid if you require crypto UUIDs.
954
        Consider ULIDs which are crypto sortable.
955
    */
956
    uuid() {
957
        return UUID()
9✔
958
    }
959

960
    // Simple time-based, sortable unique ID.
961
    ulid() {
962
        return new ULID().toString()
601✔
963
    }
964

965
    /*
966
        Crypto-grade ID of given length. If >= 10 in length, suitably unique for most use-cases.
967
     */
968
    uid(size = 10) {
×
969
        return UID(size)
×
970
    }
971

972
    setGenerate(fn) {
973
        this.generate = fn
×
974
    }
975

976
    /*
977
        Return the value template variable references in a list
978
     */
979
    getVars(v) {
980
        let list = []
476✔
981
        if (Array.isArray(v)) {
476!
982
            list = v
×
983
        } else if (typeof v == 'string') {
476✔
984
            v.replace(/\${(.*?)}/g, (match, varName) => {
475✔
985
                list.push(varName)
501✔
986
            })
987
        }
988
        return list
476✔
989
    }
990

991
    initCrypto(crypto) {
992
        this.crypto = Object.assign(crypto)
1✔
993
        for (let [name, crypto] of Object.entries(this.crypto)) {
1✔
994
            crypto.secret = Crypto.createHash('sha256').update(crypto.password, 'utf8').digest()
1✔
995
            this.crypto[name] = crypto
1✔
996
            this.crypto[name].name = name
1✔
997
        }
998
    }
999

1000
    encrypt(text, name = 'primary', inCode = 'utf8', outCode = 'base64') {
×
1001
        if (text) {
1✔
1002
            if (!this.crypto) {
1!
1003
                throw new OneTableArgError('No database secret or cipher defined')
×
1004
            }
1005
            let crypto = this.crypto[name]
1✔
1006
            if (!crypto) {
1!
1007
                throw new OneTableArgError(`Database crypto not defined for ${name}`)
×
1008
            }
1009
            let iv = Crypto.randomBytes(IV_LENGTH)
1✔
1010
            let crypt = Crypto.createCipheriv(crypto.cipher, crypto.secret, iv)
1✔
1011
            let crypted = crypt.update(text, inCode, outCode) + crypt.final(outCode)
1✔
1012
            let tag = crypto.cipher.indexOf('-gcm') > 0 ? crypt.getAuthTag().toString(outCode) : ''
1!
1013
            text = `${crypto.name}:${tag}:${iv.toString('hex')}:${crypted}`
1✔
1014
        }
1015
        return text
1✔
1016
    }
1017

1018
    decrypt(text, inCode = 'base64', outCode = 'utf8') {
×
1019
        if (text) {
2✔
1020
            let [name, tag, iv, data] = text.split(':')
2✔
1021
            if (!data || !iv || !tag || !name) {
2!
1022
                return text
×
1023
            }
1024
            if (!this.crypto) {
2!
1025
                throw new OneTableArgError('No database secret or cipher defined')
×
1026
            }
1027
            let crypto = this.crypto[name]
2✔
1028
            if (!crypto) {
2!
1029
                throw new OneTableArgError(`Database crypto not defined for ${name}`)
×
1030
            }
1031
            iv = Buffer.from(iv, 'hex')
2✔
1032
            let crypt = Crypto.createDecipheriv(crypto.cipher, crypto.secret, iv)
2✔
1033
            crypt.setAuthTag(Buffer.from(tag, inCode))
2✔
1034
            text = crypt.update(data, inCode, outCode) + crypt.final(outCode)
2✔
1035
        }
1036
        return text
2✔
1037
    }
1038

1039
    /*
1040
        Marshall data into and out of DynamoDB format
1041
    */
1042
    marshall(item, params) {
1043
        let client = params.client || this.client
2,561✔
1044
        if (client.V3) {
2,561✔
1045
            let options = client.params.marshall
2,476✔
1046
            if (Array.isArray(item)) {
2,476!
1047
                for (let i = 0; i < item.length; i++) {
×
1048
                    item[i] = client.marshall(item[i], options)
×
1049
                }
1050
            } else {
1051
                item = client.marshall(item, options)
2,476✔
1052
            }
1053
        } else {
1054
            if (Array.isArray(item)) {
85!
1055
                for (let i = 0; i < item.length; i++) {
×
1056
                    item = this.marshallv2(item, params)
×
1057
                }
1058
            } else {
1059
                item = this.marshallv2(item, params)
85✔
1060
            }
1061
        }
1062
        return item
2,561✔
1063
    }
1064

1065
    /*
1066
        Marshall data out of DynamoDB format
1067
    */
1068
    unmarshall(item, params) {
1069
        if (this.V3) {
310✔
1070
            let client = params.client ? params.client : this.client
282!
1071
            let options = client.params.unmarshall
282✔
1072
            if (Array.isArray(item)) {
282✔
1073
                for (let i = 0; i < item.length; i++) {
231✔
1074
                    item[i] = client.unmarshall(item[i], options)
1,824✔
1075
                }
1076
            } else {
1077
                item = client.unmarshall(item, options)
51✔
1078
            }
1079
        } else {
1080
            if (Array.isArray(item)) {
28✔
1081
                for (let i = 0; i < item.length; i++) {
22✔
1082
                    item[i] = this.unmarshallv2(item[i])
26✔
1083
                }
1084
            } else {
1085
                item = this.unmarshallv2(item)
6✔
1086
            }
1087
        }
1088
        return item
310✔
1089
    }
1090

1091
    marshallv2(item, params) {
1092
        let client = params.client ? params.client : this.client
85!
1093
        for (let [key, value] of Object.entries(item)) {
85✔
1094
            if (value instanceof Set) {
203✔
1095
                item[key] = client.createSet(Array.from(value))
7✔
1096
            }
1097
        }
1098
        return item
85✔
1099
    }
1100

1101
    unmarshallv2(item) {
1102
        for (let [key, value] of Object.entries(item)) {
32✔
1103
            if (
440✔
1104
                value != null &&
898✔
1105
                typeof value == 'object' &&
1106
                value.wrapperName == 'Set' &&
1107
                Array.isArray(value.values)
1108
            ) {
1109
                let list = value.values
9✔
1110
                if (value.type == 'Binary') {
9✔
1111
                    //  Match AWS SDK V3 behavior
1112
                    list = list.map((v) => new Uint8Array(v))
9✔
1113
                }
1114
                item[key] = new Set(list)
9✔
1115
            }
1116
        }
1117
        return item
32✔
1118
    }
1119

1120
    unmarshallStreamImage(image, params) {
1121
        if (!this.V3) {
10!
1122
            return unmarshall(image)
×
1123
        }
1124

1125
        let client = params.client ? params.client : this.client
10!
1126
        let options = client.params.unmarshall
10✔
1127
        return client.unmarshall(image, options)
10✔
1128
    }
1129

1130
    /*
1131
        Handle DynamoDb Stream Records
1132
     */
1133
    stream(records, params = {}) {
2✔
1134
        const tableModels = this.listModels()
2✔
1135

1136
        const result = {}
2✔
1137
        for (const record of records) {
2✔
1138
            if (!record.dynamodb.NewImage && !record.dynamodb.OldImage) {
6!
1139
                continue
×
1140
            }
1141

1142
            const model = {
6✔
1143
                type: record.eventName,
1144
            }
1145
            let typeNew
1146
            let typeOld
1147

1148
            // Unmarshall and transform the New Image if it exists
1149
            if (record.dynamodb.NewImage) {
6✔
1150
                const jsonNew = this.unmarshallStreamImage(record.dynamodb.NewImage, params)
6✔
1151
                typeNew = jsonNew[this.typeField]
6✔
1152

1153
                // If type not found then don't do anything
1154
                if (typeNew && tableModels.includes(typeNew)) {
6✔
1155
                    model.new = this.schema.models[typeNew].transformReadItem('get', jsonNew, {}, params)
4✔
1156
                }
1157
            }
1158

1159
            // Unmarshall and transform the Old Image if it exists
1160
            if (record.dynamodb.OldImage) {
6✔
1161
                const jsonOld = this.unmarshallStreamImage(record.dynamodb.OldImage, params)
4✔
1162
                typeOld = jsonOld[this.typeField]
4✔
1163

1164
                // If type not found then don't do anything
1165
                if (typeOld && tableModels.includes(typeOld)) {
4✔
1166
                    // If there was a new image of a different type then skip
1167
                    if (typeNew && typeNew !== typeOld) {
2!
1168
                        continue
×
1169
                    }
1170
                    model.old = this.schema.models[typeOld].transformReadItem('get', jsonOld, {}, params)
2✔
1171
                }
1172
            }
1173

1174
            const type = typeNew || typeOld
6!
1175
            let list = (result[type] = result[type] || [])
6✔
1176
            list.push(model)
6✔
1177
        }
1178

1179
        return result
2✔
1180
    }
1181

1182
    /*
1183
        Recursive Object.assign. Will clone dates, regexp, simple objects and arrays.
1184
        Other class instances and primitives are copied not cloned.
1185
        Max recursive depth of 20
1186
    */
1187
    assign(dest, ...sources) {
1188
        for (let src of sources) {
1,258✔
1189
            if (src) {
1,263✔
1190
                dest = this.assignInner(dest, src)
1,263✔
1191
            }
1192
        }
1193
        return dest
1,258✔
1194
    }
1195

1196
    assignInner(dest, src, recurse = 0) {
1,263✔
1197
        if (recurse++ > 1000) {
3,243!
1198
            throw new OneTableError('Recursive merge', {code: 'RuntimeError'})
×
1199
        }
1200
        if (!src || !dest || typeof src != 'object') {
3,243!
1201
            return
×
1202
        }
1203
        for (let [key, v] of Object.entries(src)) {
3,243✔
1204
            if (v === undefined) {
9,369!
1205
                continue
×
1206
            } else if (v instanceof Date) {
9,369✔
1207
                dest[key] = new Date(v)
21✔
1208
            } else if (v instanceof RegExp) {
9,348✔
1209
                dest[key] = new RegExp(v.source, v.flags)
12✔
1210
            } else if (Array.isArray(v)) {
9,336✔
1211
                if (!Array.isArray(dest[key])) {
17✔
1212
                    dest[key] = []
17✔
1213
                }
1214
                if (v.length) {
17✔
1215
                    dest[key] = this.assignInner([key], v, recurse)
17✔
1216
                }
1217
            } else if (typeof v == 'object' && v != null && v.constructor.name == 'Object') {
9,319✔
1218
                if (typeof dest[key] != 'object') {
1,963✔
1219
                    dest[key] = {}
1,958✔
1220
                }
1221
                dest[key] = this.assignInner(dest[key], v, recurse)
1,963✔
1222
            } else {
1223
                dest[key] = v
7,356✔
1224
            }
1225
        }
1226
        return dest
3,243✔
1227
    }
1228

1229
    async delay(time) {
1230
        return new Promise(function (resolve) {
×
1231
            setTimeout(() => resolve(true), time)
×
1232
        })
1233
    }
1234
}
1235

1236
/*
1237
    Emulate SenseLogs API
1238
*/
1239
class Log {
1240
    constructor(logger) {
1241
        if (logger === true) {
56✔
1242
            this.logger = this.defaultLogger
19✔
1243
        } else if (logger) {
37✔
1244
            this.logger = logger
2✔
1245
        }
1246
    }
1247

1248
    enabled() {
1249
        return true
×
1250
    }
1251

1252
    data(message, context) {
1253
        this.process('data', message, context)
854✔
1254
    }
1255

1256
    emit(chan, message, context) {
1257
        this.process(chan, message, context)
×
1258
    }
1259

1260
    error(message, context) {
1261
        this.process('error', message, context)
2✔
1262
    }
1263

1264
    info(message, context) {
1265
        this.process('info', message, context)
6✔
1266
    }
1267

1268
    trace(message, context) {
1269
        this.process('trace', message, context)
1,054✔
1270
    }
1271

1272
    process(level, message, context) {
1273
        if (this.logger) {
1,916✔
1274
            this.logger(level, message, context)
412✔
1275
        }
1276
    }
1277

1278
    defaultLogger(level, message, context) {
1279
        if (level == 'trace' || level == 'data') {
358✔
1280
            //  params.log: true will cause the level to be changed to 'info'
1281
            return
358✔
1282
        }
1283
        if (context) {
×
1284
            try {
×
1285
                console.log(level, message, JSON.stringify(context, null, 4))
×
1286
            } catch (err) {
1287
                let buf = ['{']
×
1288
                for (let [key, value] of Object.entries(context)) {
×
1289
                    try {
×
1290
                        buf.push(`    ${key}: ${JSON.stringify(value, null, 4)}`)
×
1291
                    } catch (err) {
1292
                        /* continue */
1293
                    }
1294
                }
1295
                buf.push('}')
×
1296
                console.log(level, message, buf.join('\n'))
×
1297
            }
1298
        } else {
1299
            console.log(level, message)
×
1300
        }
1301
    }
1302
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc