• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sensedeep / dynamodb-onetable / #67

pending completion
#67

push

Michael O'Brien
CLEAN

1116 of 1616 branches covered (69.06%)

Branch coverage included in aggregate %.

1790 of 2386 relevant lines covered (75.02%)

623.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.56
/src/Table.js
1
/*
2
    Table.js - DynamoDB table class
3

4
    A OneTable Table represents a single (connected) DynamoDB table
5
 */
6

7
import {Buffer} from 'buffer'
50✔
8
import Crypto from 'crypto'
50✔
9
import UUID from './UUID.js'
50✔
10
import ULID from './ULID.js'
50✔
11
import UID from './UID.js'
50✔
12
import Dynamo from './Dynamo.js'
50✔
13
import {Expression} from './Expression.js'
50✔
14
import {Schema} from './Schema.js'
50✔
15
import {Metrics} from './Metrics.js'
50✔
16
import {OneTableArgError, OneTableError} from './Error.js'
50✔
17

18
/*
19
    AWS V2 DocumentClient methods
20
 */
21
const DocumentClientMethods = {
    //  Single item operations
    delete: 'delete',
    get: 'get',
    //  A OneTable "find" is issued as a DocumentClient "query"
    find: 'query',
    put: 'put',
    scan: 'scan',
    update: 'update',

    //  Batch operations
    batchGet: 'batchGet',
    batchWrite: 'batchWrite',

    //  Transactions
    transactGet: 'transactGet',
    transactWrite: 'transactWrite',
}
33

34
/*
35
    Safety string required on API to delete a table
36
*/
37
const ConfirmRemoveTable = 'DeleteTableForever'
50✔
38

39
/*
40
    Crypto IV length
41
*/
42
const IV_LENGTH = 16
50✔
43

44
/*
    Operation names reported via params.info.operation, keyed by OneTable operation
 */
const DynamoOps = {
    //  Single item operations
    delete: 'deleteItem',
    get: 'getItem',
    find: 'query',
    put: 'putItem',
    scan: 'scan',
    update: 'updateItem',

    //  Batch operations
    batchGet: 'batchGet',
    batchWrite: 'batchWrite',

    //  Transactions
    transactGet: 'transactGet',
    transactWrite: 'transactWrite',
}
56

57
const GenericModel = '_Generic'
50✔
58

59
const maxBatchSize = 25
50✔
60

61
/*
62
    Represent a single DynamoDB table
63
 */
64
export class Table {
50✔
65
    constructor(params = {}) {
×
66
        if (!params.name) {
56✔
67
            throw new OneTableArgError('Missing "name" property')
1✔
68
        }
69
        this.context = {}
55✔
70

71
        this.log = params.senselogs ? params.senselogs : new Log(params.logger)
55!
72
        this.log.trace(`Loading OneTable`)
55✔
73

74
        if (params.client) {
55✔
75
            this.setClient(params.client)
52✔
76
        }
77
        if (params.crypto) {
55✔
78
            this.initCrypto(params.crypto)
1✔
79
            this.crypto = Object.assign(params.crypto)
1✔
80
            for (let [name, crypto] of Object.entries(this.crypto)) {
1✔
81
                crypto.secret = Crypto.createHash('sha256').update(crypto.password, 'utf8').digest()
1✔
82
                this.crypto[name] = crypto
1✔
83
                this.crypto[name].name = name
1✔
84
            }
85
        }
86
        this.setParams(params)
55✔
87

88
        //  Set schema param defaults
89
        this.typeField = '_type'
55✔
90
        this.createdField = 'created'
55✔
91
        this.isoDates = false
55✔
92
        this.nulls = false
55✔
93
        this.timestamps = false
55✔
94
        this.updatedField = 'updated'
55✔
95

96
        this.schema = new Schema(this, params.schema)
55✔
97
        if (params.dataloader) {
55✔
98
            this.dataloader = new params.dataloader((cmds) => this.batchLoaderFunction(cmds), {maxBatchSize})
2✔
99
        }
100
    }
101

102
    setClient(client) {
103
        if (client.send && !client.V3) {
55✔
104
            //  V3 SDK and not yet wrapped by Dynamo
105
            client = new Dynamo({client})
52✔
106
        }
107
        this.client = client
55✔
108
        this.V3 = client.V3
55✔
109
        this.service = this.V3 ? this.client : this.client.service
55✔
110
    }
111

112
    setParams(params) {
113
        if (
55!
114
            params.createdField != null ||
330✔
115
            this.isoDates != null ||
116
            this.nulls != null ||
117
            this.timestamps != null ||
118
            this.typeField != null ||
119
            this.updatedField != null
120
        ) {
121
            throw new OneTableArgError(
×
122
                'Using deprecated Table constructor parameters. Define in the Schema.params instead.'
123
            )
124
        }
125

126
        if (params.uuid) {
55!
127
            console.warn(
×
128
                'OneTable: Using deprecated Table constructor "uuid" parameter. Use a "generate" function instead or ' +
129
                    'Set schema models to use "generate: uuid|ulid" explicitly.'
130
            )
131
            params.generate = params.generate | params.uuid
×
132
        }
133

134
        if (params.partial == null) {
55!
135
            console.warn(
×
136
                'OneTable: Must set Table constructor "partial" param to true or false. ' +
137
                    'This param permits updating partial nested schemas. Currently defaults to false, ' +
138
                    'but in a future version will default to true. ' +
139
                    'Set to false to future proof or set to true for the new behavior.'
140
            )
141
            params.partial = true
×
142
        }
143
        //  Return hidden fields by default. Default is false.
144
        this.hidden = params.hidden != null ? params.hidden : false
55!
145
        this.partial = params.partial
55✔
146
        this.warn = params.warn || true
55✔
147

148
        if (typeof params.generate == 'function') {
55!
149
            this.generate = params.generate || this.uuid
×
150
        } else if (params.generate) {
55!
151
            throw new OneTableArgError('OneTable: Generate can only be a function')
×
152
        }
153

154
        this.name = params.name
55✔
155

156
        if (params.metrics) {
55!
157
            this.metrics = new Metrics(this, params.metrics, this.metrics)
×
158
        }
159
        if (params.monitor) {
55!
160
            this.monitor = params.monitor
×
161
        }
162
        this.params = params
55✔
163
    }
164

165
    setSchemaParams(params = {}) {
9✔
166
        this.createdField = params.createdField || 'created'
54✔
167
        this.isoDates = params.isoDates || false
54✔
168
        this.nulls = params.nulls || false
54✔
169
        this.timestamps = params.timestamps != null ? params.timestamps : false
54✔
170
        this.typeField = params.typeField || '_type'
54✔
171
        this.updatedField = params.updatedField || 'updated'
54✔
172

173
        if (params.hidden != null) {
54!
174
            console.warn(`Schema hidden params should be specified via the Table constructor params`)
×
175
        }
176
    }
177

178
    getSchemaParams() {
179
        return {
55✔
180
            createdField: this.createdField,
181
            isoDates: this.isoDates,
182
            nulls: this.nulls,
183
            timestamps: this.timestamps,
184
            typeField: this.typeField,
185
            updatedField: this.updatedField,
186
        }
187
    }
188

189
    async setSchema(schema) {
190
        return await this.schema.setSchema(schema)
×
191
    }
192

193
    getCurrentSchema() {
194
        return this.schema.getCurrentSchema()
5✔
195
    }
196

197
    async getKeys() {
198
        return await this.schema.getKeys()
×
199
    }
200

201
    async getPrimaryKeys() {
202
        let keys = await this.schema.getKeys()
×
203
        return keys.primary
×
204
    }
205

206
    async readSchema() {
207
        return this.schema.readSchema()
×
208
    }
209

210
    async readSchemas() {
211
        return this.schema.readSchemas()
×
212
    }
213

214
    async removeSchema(schema) {
215
        return this.schema.removeSchema(schema)
×
216
    }
217

218
    async saveSchema(schema) {
219
        return this.schema.saveSchema(schema)
×
220
    }
221

222
    /*
223
        Output the AWS table definition as a JSON structure to use in external tools such as CloudFormation
224
        or the AWS CLI to create your DynamoDB table. Uses the current schema index definition.
225
        Alternatively, params may contain standard DynamoDB createTable parameters.
226
    */
227
    getTableDefinition(params = {}) {
×
228
        let def = {
52✔
229
            AttributeDefinitions: [],
230
            KeySchema: [],
231
            LocalSecondaryIndexes: [],
232
            GlobalSecondaryIndexes: [],
233
            TableName: this.name,
234
        }
235
        let provisioned = params.provisioned || params.ProvisionedThroughput
52✔
236
        if (provisioned) {
52✔
237
            if (!provisioned.ReadCapacityUnits && !provisioned.WriteCapacityUnits) {
1!
238
                def.BillingMode = 'PAY_PER_REQUEST'
×
239
            } else {
240
                def.ProvisionedThroughput = provisioned
1✔
241
                def.BillingMode = 'PROVISIONED'
1✔
242
            }
243
        } else {
244
            def.BillingMode = 'PAY_PER_REQUEST'
51✔
245
        }
246
        if (params.StreamSpecification) {
52!
247
            def.StreamSpecification = params.StreamSpecification
×
248
        }
249
        let attributes = {}
52✔
250
        let {indexes} = this.schema
52✔
251

252
        if (!indexes) {
52!
253
            throw new OneTableArgError('Cannot create table without schema indexes')
×
254
        }
255
        for (let [name, index] of Object.entries(indexes)) {
52✔
256
            let keys
257
            if (name == 'primary') {
125✔
258
                keys = def.KeySchema
52✔
259
            } else {
260
                let collection = index.type == 'local' ? 'LocalSecondaryIndexes' : 'GlobalSecondaryIndexes'
73✔
261
                keys = []
73✔
262
                let project, projection
263
                if (Array.isArray(index.project)) {
73✔
264
                    projection = 'INCLUDE'
3✔
265
                    project = index.project.filter((a) => a != indexes.primary.hash && a != indexes.primary.sort)
9✔
266
                } else if (index.project == 'keys') {
70✔
267
                    projection = 'KEYS_ONLY'
1✔
268
                } else {
269
                    projection = 'ALL'
69✔
270
                }
271
                let projDef = {
73✔
272
                    IndexName: name,
273
                    KeySchema: keys,
274
                    Projection: {
275
                        ProjectionType: projection,
276
                    },
277
                }
278
                if (project) {
73✔
279
                    projDef.Projection.NonKeyAttributes = project
3✔
280
                }
281
                def[collection].push(projDef)
73✔
282
            }
283
            keys.push({AttributeName: index.hash || indexes.primary.hash, KeyType: 'HASH'})
125!
284

285
            if (index.hash && !attributes[index.hash]) {
125✔
286
                let type = this.getAttributeType(index.hash) == 'number' ? 'N' : 'S'
121!
287
                def.AttributeDefinitions.push({AttributeName: index.hash, AttributeType: type})
121✔
288
                attributes[index.hash] = true
121✔
289
            }
290
            if (index.sort) {
125✔
291
                if (!attributes[index.sort]) {
120✔
292
                    let type = this.getAttributeType(index.sort) == 'number' ? 'N' : 'S'
118✔
293
                    def.AttributeDefinitions.push({AttributeName: index.sort, AttributeType: type})
118✔
294
                    attributes[index.sort] = true
118✔
295
                }
296
                keys.push({AttributeName: index.sort, KeyType: 'RANGE'})
120✔
297
            }
298
        }
299
        if (def.GlobalSecondaryIndexes.length == 0) {
52✔
300
            delete def.GlobalSecondaryIndexes
17✔
301
        } else if (provisioned) {
35✔
302
            for (let index of def.GlobalSecondaryIndexes) {
1✔
303
                index.ProvisionedThroughput = provisioned
3✔
304
            }
305
        }
306
        if (def.LocalSecondaryIndexes.length == 0) {
52✔
307
            delete def.LocalSecondaryIndexes
49✔
308
        }
309
        return def
52✔
310
    }
311

312
    /*
313
        Create a DynamoDB table. Uses the current schema index definition.
314
        Alternatively, params may contain standard DynamoDB createTable parameters.
315
    */
316
    async createTable(params = {}) {
51✔
317
        const def = this.getTableDefinition(params)
52✔
318
        let result
319

320
        this.log.trace(`OneTable createTable for "${this.name}"`, {def})
52✔
321
        if (this.V3) {
52✔
322
            result = await this.service.createTable(def)
50✔
323
        } else {
324
            result = await this.service.createTable(def).promise()
2✔
325
        }
326

327
        /*
328
            Wait for table to become active. Must do if setting a TTL attribute
329
        */
330
        if (params.TimeToLiveSpecification) {
52!
331
            params.wait = 5 * 60
×
332
        }
333
        if (params.wait) {
52!
334
            let deadline = new Date(Date.now() + params.wait * 1000)
×
335
            let info
336
            do {
×
337
                info = await this.describeTable()
×
338
                if (info.Table.TableStatus == 'ACTIVE') {
×
339
                    break
×
340
                }
341
                if (deadline < Date.now()) {
×
342
                    throw new Error('Table has not become active')
×
343
                }
344
                await this.delay(1000)
×
345
            } while (Date.now() < deadline)
346
        }
347

348
        /*
349
            Define a TTL attribute
350
        */
351
        if (params.TimeToLiveSpecification) {
52!
352
            let def = {
×
353
                TableName: this.name,
354
                TimeToLiveSpecification: params.TimeToLiveSpecification,
355
            }
356
            if (this.V3) {
×
357
                await this.service.updateTimeToLive(def)
×
358
            } else {
359
                await this.service.updateTimeToLive(def).promise()
×
360
            }
361
        }
362
        return result
52✔
363
    }
364

365
    getAttributeType(name) {
366
        for (let model of Object.values(this.schema.models)) {
239✔
367
            let fields = model.block.fields
257✔
368
            if (fields[name]) {
257✔
369
                return fields[name].type
233✔
370
            }
371
        }
372
        return null
6✔
373
    }
374

375
    /*
376
        Delete the DynamoDB table forever. Be careful.
377
    */
378
    async deleteTable(confirmation) {
379
        if (confirmation == ConfirmRemoveTable) {
52✔
380
            this.log.trace(`OneTable deleteTable for "${this.name}"`)
51✔
381
            if (this.V3) {
51✔
382
                await this.service.deleteTable({TableName: this.name})
49✔
383
            } else {
384
                await this.service.deleteTable({TableName: this.name}).promise()
2✔
385
            }
386
        } else {
387
            throw new OneTableArgError(`Missing required confirmation "${ConfirmRemoveTable}"`)
1✔
388
        }
389
    }
390

391
    async updateTable(params = {}) {
×
392
        let def = {
×
393
            AttributeDefinitions: [],
394
            GlobalSecondaryIndexUpdates: [],
395
            TableName: this.name,
396
        }
397
        let {create, provisioned} = params
×
398

399
        if (provisioned) {
×
400
            if (!provisioned.ReadCapacityUnits && !provisioned.WriteCapacityUnits) {
×
401
                def.BillingMode = 'PAY_PER_REQUEST'
×
402
            } else {
403
                if (!create) {
×
404
                    def.ProvisionedThroughput = provisioned
×
405
                }
406
                def.BillingMode = 'PROVISIONED'
×
407
            }
408
        }
409
        let indexes = this.schema.indexes
×
410
        if (!indexes) {
×
411
            throw new OneTableArgError('Cannot update table without schema indexes')
×
412
        }
413
        if (create) {
×
414
            if (create.hash == null || create.hash == indexes.primary.hash || create.type == 'local') {
×
415
                throw new OneTableArgError('Cannot update table to create an LSI')
×
416
            }
417
            let keys = []
×
418
            let projection, project
419

420
            if (Array.isArray(create.project)) {
×
421
                projection = 'INCLUDE'
×
422
                project = create.project.filter((a) => a != create.hash && a != create.sort)
×
423
            } else if (create.project == 'keys') {
×
424
                projection = 'KEYS_ONLY'
×
425
            } else {
426
                projection = 'ALL'
×
427
            }
428
            let projDef = {
×
429
                IndexName: create.name,
430
                KeySchema: keys,
431
                Projection: {
432
                    ProjectionType: projection,
433
                },
434
            }
435
            if (project) {
×
436
                projDef.Projection.NonKeyAttributes = project
×
437
            }
438
            keys.push({AttributeName: create.hash, KeyType: 'HASH'})
×
439
            def.AttributeDefinitions.push({AttributeName: create.hash, AttributeType: 'S'})
×
440

441
            if (create.sort) {
×
442
                def.AttributeDefinitions.push({AttributeName: create.sort, AttributeType: 'S'})
×
443
                keys.push({AttributeName: create.sort, KeyType: 'RANGE'})
×
444
            }
445
            if (provisioned) {
×
446
                projDef.ProvisionedThroughput = provisioned
×
447
            }
448
            def.GlobalSecondaryIndexUpdates.push({Create: projDef})
×
449
        } else if (params.remove) {
×
450
            def.GlobalSecondaryIndexUpdates.push({Delete: {IndexName: params.remove.name}})
×
451
        } else if (params.update) {
×
452
            let update = {Update: {IndexName: params.update.name}}
×
453
            if (provisioned) {
×
454
                update.Update.ProvisionedThroughput = provisioned
×
455
            }
456
            def.GlobalSecondaryIndexUpdates.push(update)
×
457
        }
458
        if (def.GlobalSecondaryIndexUpdates.length == 0) {
×
459
            delete def.GlobalSecondaryIndexUpdates
×
460
        }
461
        if (params.TimeToLiveSpecification) {
×
462
            let def = {
×
463
                TableName: params.TableName,
464
                TimeToLiveSpecification: params.TimeToLiveSpecification,
465
            }
466
            if (this.V3) {
×
467
                await this.service.updateTimeToLive(def)
×
468
            } else {
469
                await this.service.updateTimeToLive(def).promise()
×
470
            }
471
        }
472
        this.log.trace(`OneTable updateTable for "${this.name}"`, {def})
×
473
        if (this.V3) {
×
474
            return await this.service.updateTable(def)
×
475
        } else {
476
            return await this.service.updateTable(def).promise()
×
477
        }
478
    }
479

480
    /*
481
        Return the raw AWS table description
482
    */
483
    async describeTable() {
484
        if (this.V3) {
4✔
485
            return await this.service.describeTable({TableName: this.name})
3✔
486
        } else {
487
            return await this.service.describeTable({TableName: this.name}).promise()
1✔
488
        }
489
    }
490

491
    /*
492
        Return true if the underlying DynamoDB table represented by this OneTable instance is present.
493
    */
494
    async exists() {
495
        let results = await this.listTables()
132✔
496
        return results && results.find((t) => t == this.name) != null ? true : false
351✔
497
    }
498

499
    /*
500
        Return a list of tables in the AWS region described by the Table instance
501
    */
502
    async listTables() {
503
        let results
504
        if (this.V3) {
135✔
505
            results = await this.service.listTables({})
129✔
506
        } else {
507
            results = await this.service.listTables({}).promise()
6✔
508
        }
509
        return results.TableNames
135✔
510
    }
511

512
    listModels() {
513
        return this.schema.listModels()
8✔
514
    }
515

516
    addModel(name, fields) {
517
        this.schema.addModel(name, fields)
1✔
518
    }
519

520
    getLog() {
521
        return this.log
×
522
    }
523

524
    setLog(log) {
525
        this.log = log
×
526
    }
527

528
    /*
529
        Throws an exception if the model cannot be found
530
     */
531
    getModel(name, options = {nothrow: false}) {
128✔
532
        return this.schema.getModel(name, options)
128✔
533
    }
534

535
    removeModel(name) {
536
        return this.schema.removeModel(name)
2✔
537
    }
538

539
    getContext() {
540
        return this.context
11✔
541
    }
542

543
    addContext(context = {}) {
×
544
        this.context = Object.assign(this.context, context)
1✔
545
        return this
1✔
546
    }
547

548
    setContext(context = {}, merge = false) {
6!
549
        this.context = merge ? Object.assign(this.context, context) : context
7✔
550
        return this
7✔
551
    }
552

553
    clearContext() {
554
        this.context = {}
1✔
555
        return this
1✔
556
    }
557

558
    /*  PROTOTYPE
559
        Create a clone of the table with the same settings and replace the context
560
    */
561
    child(context) {
562
        let table = JSON.parse(JSON.stringify(this))
×
563
        table.context = context
×
564
        return table
×
565
    }
566

567
    /*
568
        High level model factory API
569
        The high level API is similar to the Model API except the model name is provided as the first parameter.
570
        This API is useful for factories
571
    */
572
    async create(modelName, properties, params) {
573
        let model = this.getModel(modelName)
19✔
574
        return await model.create(properties, params)
19✔
575
    }
576

577
    async find(modelName, properties, params) {
578
        let model = this.getModel(modelName)
3✔
579
        return await model.find(properties, params)
3✔
580
    }
581

582
    async get(modelName, properties, params) {
583
        let model = this.getModel(modelName)
21✔
584
        return await model.get(properties, params)
21✔
585
    }
586

587
    async load(modelName, properties, params) {
588
        let model = this.getModel(modelName)
6✔
589
        return await model.load(properties, params)
6✔
590
    }
591

592
    init(modelName, properties, params) {
593
        let model = this.getModel(modelName)
×
594
        return model.init(properties, params)
×
595
    }
596

597
    async remove(modelName, properties, params) {
598
        let model = this.getModel(modelName)
5✔
599
        return await model.remove(properties, params)
5✔
600
    }
601

602
    async scan(modelName, properties, params) {
603
        let model = this.getModel(modelName)
20✔
604
        return await model.scan(properties, params)
20✔
605
    }
606

607
    async update(modelName, properties, params) {
608
        let model = this.getModel(modelName)
6✔
609
        return await model.update(properties, params)
6✔
610
    }
611

612
    async upsert(modelName, properties, params = {}) {
×
613
        params.exists = null
×
614
        return this.update(modelName, properties, params)
×
615
    }
616

617
    async execute(model, op, cmd, properties = {}, params = {}) {
×
618
        let mark = new Date()
899✔
619
        let trace = {model, cmd, op, properties}
899✔
620
        let result
621
        try {
899✔
622
            let client = params.client || this.client
899✔
623
            if (params.stats || this.metrics || this.monitor) {
899✔
624
                cmd.ReturnConsumedCapacity = params.capacity || 'INDEXES'
1✔
625
                cmd.ReturnItemCollectionMetrics = 'SIZE'
1✔
626
            }
627
            this.log[params.log ? 'info' : 'trace'](`OneTable "${op}" "${model}"`, {trace})
899✔
628
            if (this.V3) {
899✔
629
                result = await client[op](cmd)
869✔
630
            } else {
631
                result = await client[DocumentClientMethods[op]](cmd).promise()
30✔
632
            }
633
        } catch (err) {
634
            //  V3 stores the error in 'name' (Ugh!)
635
            let code = err.code || err.name
6✔
636
            if (params.throw === false) {
6✔
637
                result = {}
1✔
638
            } else if (code == 'ConditionalCheckFailedException' && op == 'put') {
5!
639
                this.log.info(`Conditional check failed "${op}" on "${model}"`, {err, trace})
×
640
                throw new OneTableError(`Conditional create failed for "${model}"`, {
×
641
                    code,
642
                    err,
643
                    trace,
644
                })
645
            } else if (code == 'ProvisionedThroughputExceededException') {
5!
646
                throw new OneTableError('Provisioning Throughput Exception', {
×
647
                    code,
648
                    err,
649
                    trace,
650
                })
651
            } else if (code == 'TransactionCanceledException') {
5✔
652
                throw new OneTableError('Transaction Cancelled', {
3✔
653
                    code,
654
                    err,
655
                    trace,
656
                })
657
            } else {
658
                result = result || {}
2✔
659
                result.Error = 1
2✔
660
                if (params.log != false) {
2✔
661
                    this.log.error(`OneTable exception in "${op}" on "${model} ${err.message}"`, {err, trace})
2✔
662
                }
663
                throw new OneTableError(`OneTable execute failed "${op}" for "${model}", ${err.message}`, {
2✔
664
                    code,
665
                    err,
666
                    trace,
667
                })
668
            }
669
        } finally {
670
            if (result) {
899✔
671
                if (this.metrics) {
896!
672
                    this.metrics.add(model, op, result, params, mark)
×
673
                }
674
                if (this.monitor) {
896!
675
                    await this.monitor(model, op, result, params, mark)
×
676
                }
677
            }
678
        }
679
        if (typeof params.info == 'object') {
894!
680
            params.info.operation = DynamoOps[op]
×
681
            params.info.args = cmd
×
682
            params.info.properties = properties
×
683
        }
684
        return result
894✔
685
    }
686

687
    /*
688
        The low level API does not use models. It permits the reading / writing of any attribute.
689
    */
690
    async batchGet(batch, params = {}) {
2✔
691
        if (Object.getOwnPropertyNames(batch).length == 0) {
7!
692
            return []
×
693
        }
694
        let def = batch.RequestItems[this.name]
7✔
695

696
        if (params.fields) {
7✔
697
            if (params.fields.indexOf(this.typeField) < 0) {
1✔
698
                params.fields.push(this.typeField)
1✔
699
            }
700
            let expression = new Expression(this.schema.genericModel, 'batchGet', {}, params)
1✔
701
            let cmd = expression.command()
1✔
702
            def.ProjectionExpression = cmd.ProjectionExpression
1✔
703
            def.ExpressionAttributeNames = cmd.ExpressionAttributeNames
1✔
704
        }
705
        def.ConsistentRead = params.consistent ? true : false
7✔
706

707
        // let result = await this.execute(GenericModel, 'batchGet', batch, {}, params)
708

709
        let result,
710
            retries = 0,
6✔
711
            more
712
        result = params.parse ? [] : {Responses: {}}
6✔
713
        do {
6✔
714
            more = false
6✔
715
            let data = await this.execute(GenericModel, 'batchGet', batch, {}, params)
6✔
716
            if (data) {
6✔
717
                let responses = data.Responses
6✔
718
                if (responses) {
6✔
719
                    for (let [key, items] of Object.entries(responses)) {
6✔
720
                        for (let item of items) {
6✔
721
                            if (params.parse) {
12✔
722
                                item = this.unmarshall(item, params)
7✔
723
                                let type = item[this.typeField] || '_unknown'
7!
724
                                let model = this.schema.models[type]
7✔
725
                                if (model && model != this.schema.uniqueModel) {
7✔
726
                                    result.push(model.transformReadItem('get', item, {}, params))
7✔
727
                                }
728
                            } else {
729
                                let set = (result.Responses[key] = result.Responses[key] || [])
5✔
730
                                set.push(item)
5✔
731
                            }
732
                        }
733
                    }
734
                }
735
                let unprocessed = data.UnprocessedItems
6✔
736
                if (unprocessed && Object.keys(unprocessed).length) {
6!
737
                    batch.RequestItems = unprocessed
×
738
                    if (params.reprocess === false) {
×
739
                        return false
×
740
                    }
741
                    if (retries > 11) {
×
742
                        throw new Error(unprocessed)
×
743
                    }
744
                    await this.delay(10 * 2 ** retries++)
×
745
                    more = true
×
746
                }
747
            }
748
        } while (more)
749
        return result
6✔
750
    }
751

752
    /*
753
        AWS BatchWrite may throw an exception if no items can be processed.
754
        Otherwise it will retry (up to 11 times) and return partial results in UnprocessedItems.
755
        Those will be handled here if possible.
756
    */
757
    async batchWrite(batch, params = {}) {
5✔
758
        if (Object.getOwnPropertyNames(batch).length == 0) {
6!
759
            return {}
×
760
        }
761
        let retries = 0,
6✔
762
            more
763
        do {
6✔
764
            more = false
6✔
765
            let response = await this.execute(GenericModel, 'batchWrite', batch, {}, params)
6✔
766
            let data = response.data
5✔
767
            if (data && data.UnprocessedItems && Object.keys(data.UnprocessedItems).length) {
5!
768
                batch.RequestItems = data.UnprocessedItems
×
769
                if (params.reprocess === false) {
×
770
                    return false
×
771
                }
772
                if (retries > 11) {
×
773
                    throw new Error(response.UnprocessedItems)
×
774
                }
775
                await this.delay(10 * 2 ** retries++)
×
776
                more = true
×
777
            }
778
        } while (more)
779
        return true
5✔
780
    }
781

782
    async batchLoad(expression) {
783
        if (this.dataloader) {
6✔
784
            return await this.dataloader.load(expression)
6✔
785
        }
786
        throw new Error('params.dataloader DataLoader constructor is required to use load feature')
×
787
    }
788

789
    async deleteItem(properties, params) {
790
        return await this.schema.genericModel.deleteItem(properties, params)
1✔
791
    }
792

793
    async getItem(properties, params) {
794
        return await this.schema.genericModel.getItem(properties, params)
1✔
795
    }
796

797
    async putItem(properties, params) {
798
        return await this.schema.genericModel.putItem(properties, params)
2✔
799
    }
800

801
    async queryItems(properties, params) {
802
        return await this.schema.genericModel.queryItems(properties, params)
8✔
803
    }
804

805
    async scanItems(properties, params) {
806
        return await this.schema.genericModel.scanItems(properties, params)
20✔
807
    }
808

809
    async updateItem(properties, params) {
810
        return await this.schema.genericModel.updateItem(properties, params)
3✔
811
    }
812

813
    async fetch(models, properties, params) {
814
        return await this.schema.genericModel.fetch(models, properties, params)
2✔
815
    }
816

817
    /*
818
        Invoke a prepared transaction. Note: transactGet does not work on non-primary indexes.
819
     */
820
    async transact(op, transaction, params = {}) {
3✔
821
        if (params.execute === false) {
23!
822
            if (params.log !== false) {
×
823
                this.log[params.log ? 'info' : 'data'](`OneTable transaction for "${op}" (not executed)`, {
×
824
                    transaction,
825
                    op,
826
                    params,
827
                })
828
            }
829
            return transaction
×
830
        }
831
        let result = await this.execute(
23✔
832
            GenericModel,
833
            op == 'write' ? 'transactWrite' : 'transactGet',
23✔
834
            transaction,
835
            {},
836
            params
837
        )
838
        if (op == 'get') {
20✔
839
            if (params.parse) {
3✔
840
                let items = []
2✔
841
                for (let r of result.Responses) {
2✔
842
                    if (r.Item) {
6✔
843
                        let item = this.unmarshall(r.Item, params)
6✔
844
                        let type = item[this.typeField] || '_unknown'
6!
845
                        let model = this.schema.models[type]
6✔
846
                        if (model && model != this.schema.uniqueModel) {
6✔
847
                            items.push(model.transformReadItem('get', item, {}, params))
6✔
848
                        }
849
                    }
850
                }
851
                result = items
2✔
852
            }
853
        }
854
        return result
20✔
855
    }
856

857
    /*
858
        Convert items into a map of items by model type
859
    */
860
    groupByType(items, params = {}) {
5✔
861
        let result = {}
6✔
862
        for (let item of items) {
6✔
863
            let type = item[this.typeField] || '_unknown'
23!
864
            let list = (result[type] = result[type] || [])
23✔
865
            let model = this.schema.models[type]
23✔
866
            let preparedItem
867
            if (typeof params.hidden === 'boolean' && !params.hidden) {
23✔
868
                let fields = model.block.fields
4✔
869
                preparedItem = {}
4✔
870
                for (let [name, field] of Object.entries(fields)) {
4✔
871
                    if (!(field.hidden && params.hidden !== true)) {
37✔
872
                        preparedItem[name] = item[name]
17✔
873
                    }
874
                }
875
            } else {
876
                preparedItem = item
19✔
877
            }
878
            list.push(preparedItem)
23✔
879
        }
880
        return result
6✔
881
    }
882

883
    /**
884
        This is a function passed to the DataLoader that given an array of Expression
885
        will group all commands by TableName and make all Keys unique and then will perform
886
        a BatchGet request and process the response of the BatchRequest to return an array
887
        with the corresponding response in the same order as it was requested.
888
        @param expressions
889
        @returns {Promise<*>}
890
     */
891
    async batchLoaderFunction(expressions) {
892
        const commands = expressions.map((each) => each.command())
6✔
893

894
        const groupedByTableName = commands.reduce((groupedBy, item) => {
2✔
895
            const tableName = item.TableName
6✔
896
            if (!groupedBy[tableName]) groupedBy[tableName] = []
6✔
897
            groupedBy[tableName].push(item)
6✔
898
            return groupedBy
6✔
899
        }, {})
900

901
        // convert each of the get requests into a single RequestItem with unique Keys
902
        const requestItems = Object.keys(groupedByTableName).reduce((requestItems, tableName) => {
2✔
903
            // batch get does not support duplicate Keys, so we need to make them unique
904
            // it's complex because we have the unmarshalled values on the Keys when it's V3
905
            const allKeys = groupedByTableName[tableName].map((each) => each.Key)
6✔
906
            const uniqueKeys = allKeys.filter((key1, index1, self) => {
2✔
907
                const index2 = self.findIndex((key2) => {
6✔
908
                    return Object.keys(key2).every((prop) => {
9✔
909
                        if (this.V3) {
15✔
910
                            const type = Object.keys(key1[prop])[0] // { S: "XX" } => type is S
15✔
911
                            return key2[prop][type] === key1[prop][type]
15✔
912
                        }
913
                        return key2[prop] === key1[prop]
×
914
                    })
915
                })
916
                return index2 === index1
6✔
917
            })
918
            requestItems[tableName] = {Keys: uniqueKeys}
2✔
919
            return requestItems
2✔
920
        }, {})
921

922
        const results = await this.batchGet({RequestItems: requestItems})
2✔
923

924
        // return the exact mapping (on same order as input) of each get command request to the result from database
925
        // to do that we need to find in the Responses object the item that was request and return it in the same position
926
        return commands.map((command, index) => {
2✔
927
            const {model, params} = expressions[index]
6✔
928

929
            // each key is { pk: { S: "XX" } } when V3 or { pk: "XX" } when V2
930
            // on map function, key will be pk and unmarshalled will be { S: "XX" }, OR "XXX"
931
            const criteria = Object.entries(command.Key).map(([key, unmarshalled]) => {
6✔
932
                if (this.V3) {
12✔
933
                    const type = Object.keys(unmarshalled)[0] // the type will be S
12✔
934
                    return [[key, type], unmarshalled[type]] // return [[pk, S], "XX"]
12✔
935
                }
936
                return [[key], unmarshalled]
×
937
            })
938

939
            // finds the matching object in the unmarshalled Responses array with criteria Key above
940
            const findByKeyUnmarshalled = (items = []) =>
6!
941
                items.find((item) => {
6✔
942
                    return criteria.every(([[prop, type], value]) => {
9✔
943
                        if (type) return item[prop][type] === value // if it has a type it means it is V3
15✔
944
                        return item[prop] === value
×
945
                    })
946
                })
947

948
            const items = results.Responses[command.TableName]
6✔
949
            const item = findByKeyUnmarshalled(items)
6✔
950
            if (item) {
6✔
951
                const unmarshalled = this.unmarshall(item, params)
6✔
952
                return model.transformReadItem('get', unmarshalled, {}, params)
6✔
953
            }
954
        })
955
    }
956

957
    /*
958
        Simple non-crypto UUID. See node-uuid if you require crypto UUIDs.
959
        Consider ULIDs which are crypto sortable.
960
    */
961
    uuid() {
962
        return UUID()
9✔
963
    }
964

965
    // Simple time-based, sortable unique ID.
966
    ulid() {
967
        return new ULID().toString()
601✔
968
    }
969

970
    /*
971
        Crypto-grade ID of given length. If >= 10 in length, suitably unique for most use-cases.
972
     */
973
    uid(size = 10) {
×
974
        return UID(size)
×
975
    }
976

977
    setGenerate(fn) {
978
        this.generate = fn
×
979
    }
980

981
    /*
982
        Return the value template variable references in a list
983
     */
984
    getVars(v) {
985
        let list = []
470✔
986
        if (Array.isArray(v)) {
470!
987
            list = v
×
988
        } else if (typeof v == 'string') {
470✔
989
            v.replace(/\${(.*?)}/g, (match, varName) => {
469✔
990
                list.push(varName)
496✔
991
            })
992
        }
993
        return list
470✔
994
    }
995

996
    initCrypto(crypto) {
997
        this.crypto = Object.assign(crypto)
1✔
998
        for (let [name, crypto] of Object.entries(this.crypto)) {
1✔
999
            crypto.secret = Crypto.createHash('sha256').update(crypto.password, 'utf8').digest()
1✔
1000
            this.crypto[name] = crypto
1✔
1001
            this.crypto[name].name = name
1✔
1002
        }
1003
    }
1004

1005
    encrypt(text, name = 'primary', inCode = 'utf8', outCode = 'base64') {
×
1006
        if (text) {
1✔
1007
            if (!this.crypto) {
1!
1008
                throw new OneTableArgError('No database secret or cipher defined')
×
1009
            }
1010
            let crypto = this.crypto[name]
1✔
1011
            if (!crypto) {
1!
1012
                throw new OneTableArgError(`Database crypto not defined for ${name}`)
×
1013
            }
1014
            let iv = Crypto.randomBytes(IV_LENGTH)
1✔
1015
            let crypt = Crypto.createCipheriv(crypto.cipher, crypto.secret, iv)
1✔
1016
            let crypted = crypt.update(text, inCode, outCode) + crypt.final(outCode)
1✔
1017
            let tag = crypto.cipher.indexOf('-gcm') > 0 ? crypt.getAuthTag().toString(outCode) : ''
1!
1018
            text = `${crypto.name}:${tag}:${iv.toString('hex')}:${crypted}`
1✔
1019
        }
1020
        return text
1✔
1021
    }
1022

1023
    decrypt(text, inCode = 'base64', outCode = 'utf8') {
×
1024
        if (text) {
2✔
1025
            let [name, tag, iv, data] = text.split(':')
2✔
1026
            if (!data || !iv || !tag || !name) {
2!
1027
                return text
×
1028
            }
1029
            if (!this.crypto) {
2!
1030
                throw new OneTableArgError('No database secret or cipher defined')
×
1031
            }
1032
            let crypto = this.crypto[name]
2✔
1033
            if (!crypto) {
2!
1034
                throw new OneTableArgError(`Database crypto not defined for ${name}`)
×
1035
            }
1036
            iv = Buffer.from(iv, 'hex')
2✔
1037
            let crypt = Crypto.createDecipheriv(crypto.cipher, crypto.secret, iv)
2✔
1038
            crypt.setAuthTag(Buffer.from(tag, inCode))
2✔
1039
            text = crypt.update(data, inCode, outCode) + crypt.final(outCode)
2✔
1040
        }
1041
        return text
2✔
1042
    }
1043

1044
    /*
1045
        Marshall data into and out of DynamoDB format
1046
    */
1047
    marshall(item, params) {
1048
        let client = params.client || this.client
2,563✔
1049
        if (client.V3) {
2,563✔
1050
            let options = client.params.marshall
2,478✔
1051
            if (Array.isArray(item)) {
2,478!
1052
                for (let i = 0; i < item.length; i++) {
×
1053
                    item[i] = client.marshall(item[i], options)
×
1054
                }
1055
            } else {
1056
                item = client.marshall(item, options)
2,478✔
1057
            }
1058
        } else {
1059
            if (Array.isArray(item)) {
85!
1060
                for (let i = 0; i < item.length; i++) {
×
1061
                    item = this.marshallv2(item, params)
×
1062
                }
1063
            } else {
1064
                item = this.marshallv2(item, params)
85✔
1065
            }
1066
        }
1067
        return item
2,563✔
1068
    }
1069

1070
    /*
1071
        Marshall data out of DynamoDB format
1072
    */
1073
    unmarshall(item, params) {
1074
        if (this.V3) {
311✔
1075
            let client = params.client ? params.client : this.client
283!
1076
            let options = client.params.unmarshall
283✔
1077
            if (Array.isArray(item)) {
283✔
1078
                for (let i = 0; i < item.length; i++) {
232✔
1079
                    item[i] = client.unmarshall(item[i], options)
1,824✔
1080
                }
1081
            } else {
1082
                item = client.unmarshall(item, options)
51✔
1083
            }
1084
        } else {
1085
            if (Array.isArray(item)) {
28✔
1086
                for (let i = 0; i < item.length; i++) {
22✔
1087
                    item[i] = this.unmarshallv2(item[i])
26✔
1088
                }
1089
            } else {
1090
                item = this.unmarshallv2(item)
6✔
1091
            }
1092
        }
1093
        return item
311✔
1094
    }
1095

1096
    marshallv2(item, params) {
1097
        let client = params.client ? params.client : this.client
85!
1098
        for (let [key, value] of Object.entries(item)) {
85✔
1099
            if (value instanceof Set) {
203✔
1100
                item[key] = client.createSet(Array.from(value))
7✔
1101
            }
1102
        }
1103
        return item
85✔
1104
    }
1105

1106
    unmarshallv2(item) {
1107
        for (let [key, value] of Object.entries(item)) {
32✔
1108
            if (
440✔
1109
                value != null &&
898✔
1110
                typeof value == 'object' &&
1111
                value.wrapperName == 'Set' &&
1112
                Array.isArray(value.values)
1113
            ) {
1114
                let list = value.values
9✔
1115
                if (value.type == 'Binary') {
9✔
1116
                    //  Match AWS SDK V3 behavior
1117
                    list = list.map((v) => new Uint8Array(v))
9✔
1118
                }
1119
                item[key] = new Set(list)
9✔
1120
            }
1121
        }
1122
        return item
32✔
1123
    }
1124

1125
    unmarshallStreamImage(image, params) {
1126
        let client = params.client ? params.client : this.client
×
1127
        let options = client.params.unmarshall
×
1128
        return client.unmarshall(image, options)
×
1129
    }
1130

1131
    /*
1132
        Handle DynamoDb Stream Records
1133
     */
1134
    stream(records, params = {}) {
×
1135
        const tableModels = this.listModels()
×
1136

1137
        const result = {}
×
1138
        for (const record of records) {
×
1139
            if (!record.dynamodb.NewImage && !record.dynamodb.OldImage) {
×
1140
                continue
×
1141
            }
1142
            const model = {type: record.eventName}
×
1143
            let typeNew
1144
            let typeOld
1145

1146
            // Unmarshall and transform the New Image if it exists
1147
            if (record.dynamodb.NewImage) {
×
1148
                const jsonNew = this.unmarshallStreamImage(record.dynamodb.NewImage, params)
×
1149
                typeNew = jsonNew[this.typeField]
×
1150

1151
                // If type not found then don't do anything
1152
                if (typeNew && tableModels.includes(typeNew)) {
×
1153
                    model.new = this.schema.models[typeNew].transformReadItem('get', jsonNew, {}, params)
×
1154
                }
1155
            }
1156
            // Unmarshall and transform the Old Image if it exists
1157
            if (record.dynamodb.OldImage) {
×
1158
                const jsonOld = this.unmarshallStreamImage(record.dynamodb.OldImage, params)
×
1159
                typeOld = jsonOld[this.typeField]
×
1160

1161
                // If type not found then don't do anything
1162
                if (typeOld && tableModels.includes(typeOld)) {
×
1163
                    // If there was a new image of a different type then skip
1164
                    if (typeNew && typeNew !== typeOld) {
×
1165
                        continue
×
1166
                    }
1167
                    model.old = this.schema.models[typeOld].transformReadItem('get', jsonOld, {}, params)
×
1168
                }
1169
            }
1170
            const type = typeNew || typeOld
×
1171
            let list = (result[type] = result[type] || [])
×
1172
            list.push(model)
×
1173
        }
1174
        return result
×
1175
    }
1176

1177
    /*
1178
        Recursive Object.assign. Will clone dates, regexp, simple objects and arrays.
1179
        Other class instances and primitives are copied not cloned.
1180
        Max recursive depth of 1000
1181
    */
1182
    assign(dest, ...sources) {
1183
        for (let src of sources) {
1,253✔
1184
            if (src) {
1,258✔
1185
                dest = this.assignInner(dest, src)
1,258✔
1186
            }
1187
        }
1188
        return dest
1,253✔
1189
    }
1190

1191
    assignInner(dest, src, recurse = 0) {
1,258✔
1192
        if (recurse++ > 1000) {
3,209!
1193
            throw new OneTableError('Recursive merge', {code: 'RuntimeError'})
×
1194
        }
1195
        if (!src || !dest || typeof src != 'object') {
3,209!
1196
            return
×
1197
        }
1198
        for (let [key, v] of Object.entries(src)) {
3,209✔
1199
            if (v === undefined) {
9,297!
1200
                continue
×
1201
            } else if (v instanceof Date) {
9,297✔
1202
                dest[key] = new Date(v)
21✔
1203
            } else if (v instanceof RegExp) {
9,276✔
1204
                dest[key] = new RegExp(v.source, v.flags)
12✔
1205
            } else if (Array.isArray(v)) {
9,264✔
1206
                if (!Array.isArray(dest[key])) {
17✔
1207
                    dest[key] = []
17✔
1208
                }
1209
                if (v.length) {
17✔
1210
                    dest[key] = this.assignInner([key], v, recurse)
17✔
1211
                }
1212
            } else if (typeof v == 'object' && v != null && v.constructor.name == 'Object') {
9,247✔
1213
                if (typeof dest[key] != 'object') {
1,934✔
1214
                    dest[key] = {}
1,929✔
1215
                }
1216
                dest[key] = this.assignInner(dest[key], v, recurse)
1,934✔
1217
            } else {
1218
                dest[key] = v
7,313✔
1219
            }
1220
        }
1221
        return dest
3,209✔
1222
    }
1223

1224
    async delay(time) {
1225
        return new Promise(function (resolve) {
×
1226
            setTimeout(() => resolve(true), time)
×
1227
        })
1228
    }
1229
}
1230

1231
/*
1232
    Emulate SenseLogs API
1233
*/
1234
class Log {
    /*
        logger === true selects the built-in defaultLogger; any other truthy value is used
        as the logger function (level, message, context). Otherwise logging is a no-op.
     */
    constructor(logger) {
        if (logger === true) {
            this.logger = this.defaultLogger
            return
        }
        if (logger) {
            this.logger = logger
        }
    }

    enabled() {
        return true
    }

    data(message, context) {
        this.process('data', message, context)
    }

    emit(chan, message, context) {
        this.process(chan, message, context)
    }

    error(message, context) {
        this.process('error', message, context)
    }

    info(message, context) {
        this.process('info', message, context)
    }

    trace(message, context) {
        this.process('trace', message, context)
    }

    //  Forward to the configured logger, if any
    process(level, message, context) {
        if (!this.logger) {
            return
        }
        this.logger(level, message, context)
    }

    /*
        Console logger. 'trace' and 'data' messages are dropped. The context, when present,
        is rendered as indented JSON; if that fails, each top-level property is rendered
        separately and properties that cannot be serialized are omitted.
     */
    defaultLogger(level, message, context) {
        if (level == 'trace' || level == 'data') {
            //  params.log: true will cause the level to be changed to 'info'
            return
        }
        if (!context) {
            console.log(level, message)
            return
        }
        try {
            console.log(level, message, JSON.stringify(context, null, 4))
        } catch (err) {
            const lines = ['{']
            for (const [key, value] of Object.entries(context)) {
                let rendered
                try {
                    rendered = `    ${key}: ${JSON.stringify(value, null, 4)}`
                } catch (err) {
                    /* continue */
                    continue
                }
                lines.push(rendered)
            }
            lines.push('}')
            console.log(level, message, lines.join('\n'))
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc