• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sensedeep / dynamodb-onetable / #81

06 Jul 2024 11:46PM UTC coverage: 75.377% (+0.1%) from 75.239%
#81

push

Michael O'Brien
CLEAN: eslint disable

1197 of 1665 branches covered (71.89%)

Branch coverage included in aggregate %.

1901 of 2445 relevant lines covered (77.75%)

747.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.31
/src/Table.js
1
/*
2
    Table.js - DynamoDB table class
3

4
    A OneTable Table represents a single (connected) DynamoDB table
5
 */
6

7
import process from 'process'
58✔
8
import {Buffer} from 'buffer'
58✔
9
import Crypto from 'crypto'
58✔
10
import {UID, ULID, UUID} from './UID.js'
58✔
11
import Dynamo from './Dynamo.js'
58✔
12
import {Expression} from './Expression.js'
58✔
13
import {Schema} from './Schema.js'
58✔
14
import {Metrics} from './Metrics.js'
58✔
15
import {OneTableArgError, OneTableError} from './Error.js'
58✔
16

17
/*
18
    AWS V2 DocumentClient methods
19
 */
20
const DocumentClientMethods = {
58✔
21
    delete: 'delete',
22
    get: 'get',
23
    find: 'query',
24
    put: 'put',
25
    scan: 'scan',
26
    update: 'update',
27
    batchGet: 'batchGet',
28
    batchWrite: 'batchWrite',
29
    transactGet: 'transactGet',
30
    transactWrite: 'transactWrite',
31
}
32

33
/*
34
    Safety string required on API to delete a table
35
*/
36
const ConfirmRemoveTable = 'DeleteTableForever'
58✔
37

38
/*
39
    Crypto IV length
40
*/
41
const IV_LENGTH = 16
58✔
42

43
const DynamoOps = {
58✔
44
    delete: 'deleteItem',
45
    get: 'getItem',
46
    find: 'query',
47
    put: 'putItem',
48
    scan: 'scan',
49
    update: 'updateItem',
50
    batchGet: 'batchGet',
51
    batchWrite: 'batchWrite',
52
    transactGet: 'transactGet',
53
    transactWrite: 'transactWrite',
54
}
55

56
const GenericModel = '_Generic'
58✔
57

58
const maxBatchSize = 25
58✔
59

60
/*
61
    On exit, flush buffered metrics. This requires any Lambda layer to receive this signal.
62
    Without lambda layers, users can call flushMetrics() from time to time.
63
 */
64
process.on(
58✔
65
    'SIGTERM',
66
    /* istanbul ignore next */
67
    async () => {
68
        /* istanbul ignore next */
69
        await Table.terminate()
70
    }
71
)
72

73
/*
74
    Represent a single DynamoDB table
75
 */
76
export class Table {
58✔
77
    constructor(params = {}) {
×
78
        if (!params.name) {
64✔
79
            throw new OneTableArgError('Missing "name" property')
1✔
80
        }
81
        this.context = {}
63✔
82

83
        this.log = params.senselogs ? params.senselogs : new Log(params.logger)
63!
84
        this.log.trace(`Loading OneTable`)
63✔
85

86
        if (params.client) {
63✔
87
            this.setClient(params.client)
60✔
88
        }
89
        if (params.crypto) {
63✔
90
            this.initCrypto(params.crypto)
1✔
91
            this.crypto = Object.assign(params.crypto)
1✔
92
            for (let [name, crypto] of Object.entries(this.crypto)) {
1✔
93
                crypto.secret = Crypto.createHash('sha256').update(crypto.password, 'utf8').digest()
1✔
94
                this.crypto[name] = crypto
1✔
95
                this.crypto[name].name = name
1✔
96
            }
97
        }
98
        this.setParams(params)
63✔
99

100
        //  Set schema param defaults
101
        this.typeField = '_type'
63✔
102
        this.createdField = 'created'
63✔
103
        this.isoDates = false
63✔
104
        this.nulls = false
63✔
105
        this.separator = '#'
63✔
106
        this.timestamps = false
63✔
107
        this.updatedField = 'updated'
63✔
108

109
        this.schema = new Schema(this, params.schema)
63✔
110

111
        if (params.metrics) {
63✔
112
            this.metrics = new Metrics(this, params.metrics)
1✔
113
        }
114
        if (params.dataloader) {
63✔
115
            this.dataloader = new params.dataloader((cmds) => this.batchLoaderFunction(cmds), {maxBatchSize})
2✔
116
        }
117
    }
118

119
    setClient(client) {
120
        if (client.send && !client.V3) {
63✔
121
            //  V3 SDK and not yet wrapped by Dynamo
122
            client = new Dynamo({client})
60✔
123
        }
124
        this.client = client
63✔
125
        this.V3 = client.V3
63✔
126
        this.service = this.V3 ? this.client : this.client.service
63✔
127
    }
128

129
    setParams(params) {
130
        if (
63!
131
            params.createdField != null ||
441✔
132
            this.isoDates != null ||
133
            this.nulls != null ||
134
            this.separator != null ||
135
            this.timestamps != null ||
136
            this.typeField != null ||
137
            this.updatedField != null
138
        ) {
139
            throw new OneTableArgError(
×
140
                'Using deprecated Table constructor parameters. Define in the Schema.params instead.'
141
            )
142
        }
143

144
        if (params.uuid) {
63!
145
            console.warn(
×
146
                'OneTable: Using deprecated Table constructor "uuid" parameter. Use a "generate" function instead or ' +
147
                    'Set schema models to use "generate: uuid|ulid" explicitly.'
148
            )
149
            params.generate = params.generate | params.uuid
×
150
        }
151

152
        if (params.partial == null) {
63!
153
            console.warn(
×
154
                'OneTable: Must set Table constructor "partial" param to true or false. ' +
155
                    'This param permits updating partial nested schemas. Defaults to true.'
156
            )
157
            params.partial = true
×
158
        }
159
        //  Return hidden fields by default. Default is false.
160
        this.hidden = params.hidden != null ? params.hidden : false
63!
161
        this.partial = params.partial
63✔
162
        this.warn = params.warn || true
63✔
163

164
        if (typeof params.generate == 'function') {
63!
165
            this.generate = params.generate || this.uuid
×
166
        } else if (params.generate) {
63!
167
            throw new OneTableArgError('OneTable: Generate can only be a function')
×
168
        }
169

170
        this.name = params.name
63✔
171

172
        if (params.monitor) {
63!
173
            this.monitor = params.monitor
×
174
        }
175
        this.params = params
63✔
176
    }
177

178
    setSchemaParams(params = {}) {
11✔
179
        this.createdField = params.createdField || 'created'
62✔
180
        this.isoDates = params.isoDates || false
62✔
181
        this.nulls = params.nulls || false
62✔
182
        this.separator = params.separator != null ? params.separator : '#'
62✔
183
        this.timestamps = params.timestamps != null ? params.timestamps : false
62✔
184
        this.typeField = params.typeField || '_type'
62✔
185
        this.updatedField = params.updatedField || 'updated'
62✔
186

187
        if (params.hidden != null) {
62!
188
            this.log.warn(`Schema hidden params should be specified via the Table constructor params`, {'@stack': true})
×
189
        }
190
    }
191

192
    getSchemaParams() {
193
        return {
63✔
194
            createdField: this.createdField,
195
            isoDates: this.isoDates,
196
            nulls: this.nulls,
197
            separator: this.separator,
198
            timestamps: this.timestamps,
199
            typeField: this.typeField,
200
            updatedField: this.updatedField,
201
        }
202
    }
203

204
    async setSchema(schema) {
205
        return await this.schema.setSchema(schema)
×
206
    }
207

208
    getCurrentSchema() {
209
        return this.schema.getCurrentSchema()
5✔
210
    }
211

212
    async getKeys() {
213
        return await this.schema.getKeys()
×
214
    }
215

216
    async getPrimaryKeys() {
217
        let keys = await this.schema.getKeys()
×
218
        return keys.primary
×
219
    }
220

221
    async readSchema() {
222
        return this.schema.readSchema()
×
223
    }
224

225
    async readSchemas() {
226
        return this.schema.readSchemas()
×
227
    }
228

229
    async removeSchema(schema) {
230
        return this.schema.removeSchema(schema)
×
231
    }
232

233
    async saveSchema(schema) {
234
        return this.schema.saveSchema(schema)
×
235
    }
236

237
    /*
238
        Output the AWS table definition as a JSON structure to use in external tools such as CloudFormation
239
        or the AWS CLI to create your DynamoDB table. Uses the current schema index definition.
240
        Alternatively, params may contain standard DynamoDB createTable parameters.
241
    */
242
    getTableDefinition(params = {}) {
×
243
        let def = {
60✔
244
            AttributeDefinitions: [],
245
            KeySchema: [],
246
            LocalSecondaryIndexes: [],
247
            GlobalSecondaryIndexes: [],
248
            TableName: this.name,
249
        }
250
        let provisioned = params.provisioned || params.ProvisionedThroughput
60✔
251
        if (provisioned) {
60✔
252
            if (!provisioned.ReadCapacityUnits && !provisioned.WriteCapacityUnits) {
1!
253
                def.BillingMode = 'PAY_PER_REQUEST'
×
254
            } else {
255
                def.ProvisionedThroughput = provisioned
1✔
256
                def.BillingMode = 'PROVISIONED'
1✔
257
            }
258
        } else {
259
            def.BillingMode = 'PAY_PER_REQUEST'
59✔
260
        }
261
        if (params.StreamSpecification) {
60!
262
            def.StreamSpecification = params.StreamSpecification
×
263
        }
264
        let attributes = {}
60✔
265
        let {indexes} = this.schema
60✔
266

267
        if (!indexes) {
60!
268
            throw new OneTableArgError('Cannot create table without schema indexes')
×
269
        }
270
        for (let [name, index] of Object.entries(indexes)) {
60✔
271
            let keys
272
            if (name == 'primary') {
139✔
273
                keys = def.KeySchema
60✔
274
            } else {
275
                let collection = index.type == 'local' ? 'LocalSecondaryIndexes' : 'GlobalSecondaryIndexes'
79✔
276
                keys = []
79✔
277
                let project, projection
278
                if (Array.isArray(index.project)) {
79✔
279
                    projection = 'INCLUDE'
3✔
280
                    project = index.project.filter((a) => a != indexes.primary.hash && a != indexes.primary.sort)
9✔
281
                } else if (index.project == 'keys') {
76✔
282
                    projection = 'KEYS_ONLY'
1✔
283
                } else {
284
                    projection = 'ALL'
75✔
285
                }
286
                let projDef = {
79✔
287
                    IndexName: name,
288
                    KeySchema: keys,
289
                    Projection: {
290
                        ProjectionType: projection,
291
                    },
292
                }
293
                if (project) {
79✔
294
                    projDef.Projection.NonKeyAttributes = project
3✔
295
                }
296
                def[collection].push(projDef)
79✔
297
            }
298
            keys.push({AttributeName: index.hash || indexes.primary.hash, KeyType: 'HASH'})
139!
299

300
            if (index.hash && !attributes[index.hash]) {
139✔
301
                let type = this.getAttributeType(index.hash) == 'number' ? 'N' : 'S'
135!
302
                def.AttributeDefinitions.push({AttributeName: index.hash, AttributeType: type})
135✔
303
                attributes[index.hash] = true
135✔
304
            }
305
            if (index.sort) {
139✔
306
                if (!attributes[index.sort]) {
133✔
307
                    let type = this.getAttributeType(index.sort) == 'number' ? 'N' : 'S'
131✔
308
                    def.AttributeDefinitions.push({AttributeName: index.sort, AttributeType: type})
131✔
309
                    attributes[index.sort] = true
131✔
310
                }
311
                keys.push({AttributeName: index.sort, KeyType: 'RANGE'})
133✔
312
            }
313
        }
314
        if (def.GlobalSecondaryIndexes.length == 0) {
60✔
315
            delete def.GlobalSecondaryIndexes
19✔
316
        } else if (provisioned) {
41✔
317
            for (let index of def.GlobalSecondaryIndexes) {
1✔
318
                index.ProvisionedThroughput = provisioned
3✔
319
            }
320
        }
321
        if (def.LocalSecondaryIndexes.length == 0) {
60✔
322
            delete def.LocalSecondaryIndexes
57✔
323
        }
324
        return def
60✔
325
    }
326

327
    /*
328
        Create a DynamoDB table. Uses the current schema index definition.
329
        Alternatively, params may contain standard DynamoDB createTable parameters.
330
    */
331
    async createTable(params = {}) {
59✔
332
        const def = this.getTableDefinition(params)
60✔
333
        let result
334

335
        this.log.trace(`OneTable createTable for "${this.name}"`, {def})
60✔
336
        if (this.V3) {
60✔
337
            result = await this.service.createTable(def)
58✔
338
        } else {
339
            result = await this.service.createTable(def).promise()
2✔
340
        }
341

342
        /*
343
            Wait for table to become active. Must do if setting a TTL attribute
344
        */
345
        if (params.TimeToLiveSpecification) {
60!
346
            params.wait = 5 * 60
×
347
        }
348
        if (params.wait) {
60!
349
            let deadline = new Date(Date.now() + params.wait * 1000)
×
350
            let info
351
            do {
×
352
                info = await this.describeTable()
×
353
                if (info.Table.TableStatus == 'ACTIVE') {
×
354
                    break
×
355
                }
356
                if (deadline < Date.now()) {
×
357
                    throw new Error('Table has not become active')
×
358
                }
359
                await this.delay(1000)
×
360
            } while (Date.now() < deadline)
361
        }
362

363
        /*
364
            Define a TTL attribute
365
        */
366
        if (params.TimeToLiveSpecification) {
60!
367
            let def = {
×
368
                TableName: this.name,
369
                TimeToLiveSpecification: params.TimeToLiveSpecification,
370
            }
371
            if (this.V3) {
×
372
                await this.service.updateTimeToLive(def)
×
373
            } else {
374
                await this.service.updateTimeToLive(def).promise()
×
375
            }
376
        }
377
        return result
60✔
378
    }
379

380
    getAttributeType(name) {
381
        for (let model of Object.values(this.schema.models)) {
266✔
382
            let fields = model.block.fields
308✔
383
            if (fields[name]) {
308✔
384
                return fields[name].type
248✔
385
            }
386
        }
387
        return null
18✔
388
    }
389

390
    /*
391
        Delete the DynamoDB table forever. Be careful.
392
    */
393
    async deleteTable(confirmation) {
394
        if (confirmation == ConfirmRemoveTable) {
60✔
395
            this.log.trace(`OneTable deleteTable for "${this.name}"`)
59✔
396
            if (this.V3) {
59✔
397
                await this.service.deleteTable({TableName: this.name})
57✔
398
            } else {
399
                await this.service.deleteTable({TableName: this.name}).promise()
2✔
400
            }
401
        } else {
402
            throw new OneTableArgError(`Missing required confirmation "${ConfirmRemoveTable}"`)
1✔
403
        }
404
    }
405

406
    async updateTable(params = {}) {
×
407
        let def = {
×
408
            AttributeDefinitions: [],
409
            GlobalSecondaryIndexUpdates: [],
410
            TableName: this.name,
411
        }
412
        let {create, provisioned} = params
×
413

414
        if (provisioned) {
×
415
            if (!provisioned.ReadCapacityUnits && !provisioned.WriteCapacityUnits) {
×
416
                def.BillingMode = 'PAY_PER_REQUEST'
×
417
            } else {
418
                if (!create) {
×
419
                    def.ProvisionedThroughput = provisioned
×
420
                }
421
                def.BillingMode = 'PROVISIONED'
×
422
            }
423
        }
424
        let indexes = this.schema.indexes
×
425
        if (!indexes) {
×
426
            throw new OneTableArgError('Cannot update table without schema indexes')
×
427
        }
428
        if (create) {
×
429
            if (create.hash == null || create.hash == indexes.primary.hash || create.type == 'local') {
×
430
                throw new OneTableArgError('Cannot update table to create an LSI')
×
431
            }
432
            let keys = []
×
433
            let projection, project
434

435
            if (Array.isArray(create.project)) {
×
436
                projection = 'INCLUDE'
×
437
                project = create.project.filter((a) => a != create.hash && a != create.sort)
×
438
            } else if (create.project == 'keys') {
×
439
                projection = 'KEYS_ONLY'
×
440
            } else {
441
                projection = 'ALL'
×
442
            }
443
            let projDef = {
×
444
                IndexName: create.name,
445
                KeySchema: keys,
446
                Projection: {
447
                    ProjectionType: projection,
448
                },
449
            }
450
            if (project) {
×
451
                projDef.Projection.NonKeyAttributes = project
×
452
            }
453
            keys.push({AttributeName: create.hash, KeyType: 'HASH'})
×
454
            def.AttributeDefinitions.push({AttributeName: create.hash, AttributeType: 'S'})
×
455

456
            if (create.sort) {
×
457
                def.AttributeDefinitions.push({AttributeName: create.sort, AttributeType: 'S'})
×
458
                keys.push({AttributeName: create.sort, KeyType: 'RANGE'})
×
459
            }
460
            if (provisioned) {
×
461
                projDef.ProvisionedThroughput = provisioned
×
462
            }
463
            def.GlobalSecondaryIndexUpdates.push({Create: projDef})
×
464
        } else if (params.remove) {
×
465
            def.GlobalSecondaryIndexUpdates.push({Delete: {IndexName: params.remove.name}})
×
466
        } else if (params.update) {
×
467
            let update = {Update: {IndexName: params.update.name}}
×
468
            if (provisioned) {
×
469
                update.Update.ProvisionedThroughput = provisioned
×
470
            }
471
            def.GlobalSecondaryIndexUpdates.push(update)
×
472
        }
473
        if (def.GlobalSecondaryIndexUpdates.length == 0) {
×
474
            delete def.GlobalSecondaryIndexUpdates
×
475
        }
476
        if (params.TimeToLiveSpecification) {
×
477
            let def = {
×
478
                TableName: params.TableName,
479
                TimeToLiveSpecification: params.TimeToLiveSpecification,
480
            }
481
            if (this.V3) {
×
482
                await this.service.updateTimeToLive(def)
×
483
            } else {
484
                await this.service.updateTimeToLive(def).promise()
×
485
            }
486
        }
487
        this.log.trace(`OneTable updateTable for "${this.name}"`, {def})
×
488
        if (this.V3) {
×
489
            return await this.service.updateTable(def)
×
490
        } else {
491
            return await this.service.updateTable(def).promise()
×
492
        }
493
    }
494

495
    /*
496
        Return the raw AWS table description
497
    */
498
    async describeTable() {
499
        if (this.V3) {
4✔
500
            return await this.service.describeTable({TableName: this.name})
3✔
501
        } else {
502
            return await this.service.describeTable({TableName: this.name}).promise()
1✔
503
        }
504
    }
505

506
    /*
507
        Return true if the underlying DynamoDB table represented by this OneTable instance is present.
508
    */
509
    async exists() {
510
        let results = await this.listTables()
154✔
511
        return results && results.find((t) => t == this.name) != null ? true : false
259✔
512
    }
513

514
    /*
515
        Return a list of tables in the AWS region described by the Table instance
516
    */
517
    async listTables() {
518
        let results
519
        if (this.V3) {
157✔
520
            results = await this.service.listTables({})
151✔
521
        } else {
522
            results = await this.service.listTables({}).promise()
6✔
523
        }
524
        return results.TableNames
157✔
525
    }
526

527
    listModels() {
528
        return this.schema.listModels()
8✔
529
    }
530

531
    addModel(name, fields) {
532
        this.schema.addModel(name, fields)
1✔
533
    }
534

535
    getLog() {
536
        return this.log
×
537
    }
538

539
    setLog(log) {
540
        this.log = log
×
541
        if (this.metrics) {
×
542
            this.metrics.setLog(log)
×
543
        }
544
    }
545

546
    /*
547
        Thows exception if model cannot be found
548
     */
549
    getModel(name, options = {nothrow: false}) {
145✔
550
        return this.schema.getModel(name, options)
145✔
551
    }
552

553
    removeModel(name) {
554
        return this.schema.removeModel(name)
2✔
555
    }
556

557
    getContext() {
558
        return this.context
11✔
559
    }
560

561
    addContext(context = {}) {
×
562
        this.context = Object.assign(this.context, context)
1✔
563
        return this
1✔
564
    }
565

566
    setContext(context = {}, merge = false) {
6!
567
        this.context = merge ? Object.assign(this.context, context) : context
7✔
568
        return this
7✔
569
    }
570

571
    clearContext() {
572
        this.context = {}
1✔
573
        return this
1✔
574
    }
575

576
    /*  PROTOTYPE
577
        Create a clone of the table with the same settings and replace the context
578
    */
579
    child(context) {
580
        let table = JSON.parse(JSON.stringify(this))
×
581
        table.context = context
×
582
        return table
×
583
    }
584

585
    /*
586
        High level model factory API
587
        The high level API is similar to the Model API except the model name is provided as the first parameter.
588
        This API is useful for factories
589
    */
590
    async create(modelName, properties, params) {
591
        let model = this.getModel(modelName)
20✔
592
        return await model.create(properties, params)
20✔
593
    }
594

595
    async find(modelName, properties, params) {
596
        let model = this.getModel(modelName)
3✔
597
        return await model.find(properties, params)
3✔
598
    }
599

600
    async get(modelName, properties, params) {
601
        let model = this.getModel(modelName)
24✔
602
        return await model.get(properties, params)
24✔
603
    }
604

605
    async load(modelName, properties, params) {
606
        let model = this.getModel(modelName)
6✔
607
        return await model.load(properties, params)
6✔
608
    }
609

610
    init(modelName, properties, params) {
611
        let model = this.getModel(modelName)
×
612
        return model.init(properties, params)
×
613
    }
614

615
    async remove(modelName, properties, params) {
616
        let model = this.getModel(modelName)
5✔
617
        return await model.remove(properties, params)
5✔
618
    }
619

620
    async scan(modelName, properties, params) {
621
        let model = this.getModel(modelName)
21✔
622
        return await model.scan(properties, params)
21✔
623
    }
624

625
    async update(modelName, properties, params) {
626
        let model = this.getModel(modelName)
8✔
627
        return await model.update(properties, params)
8✔
628
    }
629

630
    async upsert(modelName, properties, params = {}) {
×
631
        params.exists = null
×
632
        return this.update(modelName, properties, params)
×
633
    }
634

635
    async execute(model, op, cmd, properties = {}, params = {}) {
×
636
        let mark = new Date()
1,197✔
637
        let trace = {model, cmd, op, properties}
1,197✔
638
        let result
639
        try {
1,197✔
640
            let client = params.client || this.client
1,197✔
641
            if (params.stats || this.metrics || this.monitor) {
1,197✔
642
                cmd.ReturnConsumedCapacity = params.capacity || 'INDEXES'
4✔
643
                cmd.ReturnItemCollectionMetrics = 'SIZE'
4✔
644
            }
645
            this.log[params.log ? 'info' : 'trace'](`OneTable "${op}" "${model}"`, {trace})
1,197✔
646
            if (this.V3) {
1,197✔
647
                result = await client[op](cmd)
1,167✔
648
            } else {
649
                result = await client[DocumentClientMethods[op]](cmd).promise()
30✔
650
            }
651
        } catch (err) {
652
            //  V3 stores the error in 'name' (Ugh!)
653
            let code = err.code || err.name
6✔
654
            if (params.throw === false) {
6✔
655
                result = {}
1✔
656
            } else if (code == 'ConditionalCheckFailedException' && op == 'put') {
5!
657
                this.log.info(`Conditional check failed "${op}" on "${model}"`, {err, trace})
×
658
                throw new OneTableError(`Conditional create failed for "${model}"`, {
×
659
                    code,
660
                    err,
661
                    trace,
662
                })
663
            } else if (code == 'ProvisionedThroughputExceededException') {
5!
664
                throw new OneTableError('Provisioning Throughput Exception', {
×
665
                    code,
666
                    err,
667
                    trace,
668
                })
669
            } else if (code == 'TransactionCanceledException') {
5✔
670
                throw new OneTableError('Transaction Cancelled', {
3✔
671
                    code,
672
                    err,
673
                    trace,
674
                })
675
            } else {
676
                result = result || {}
2✔
677
                result.Error = 1
2✔
678
                if (params.log != false) {
2✔
679
                    this.log.error(`OneTable exception in "${op}" on "${model} ${err.message}"`, {err, trace})
2✔
680
                }
681
                throw new OneTableError(`OneTable execute failed "${op}" for "${model}", ${err.message}`, {
2✔
682
                    code,
683
                    err,
684
                    trace,
685
                })
686
            }
687
        } finally {
688
            if (result) {
1,197✔
689
                if (this.metrics) {
1,194✔
690
                    await this.metrics.add(model, op, result, params, mark)
3✔
691
                }
692
                if (this.monitor) {
1,194!
693
                    await this.monitor(model, op, result, params, mark)
×
694
                }
695
            }
696
        }
697
        if (typeof params.info == 'object') {
1,192!
698
            params.info.operation = DynamoOps[op]
×
699
            params.info.args = cmd
×
700
            params.info.properties = properties
×
701
        }
702
        return result
1,192✔
703
    }
704

705
    /*
706
        The low level API does not use models. It permits the reading / writing of any attribute.
707
    */
708
    async batchGet(batch, params = {}) {
2✔
709
        if (Object.getOwnPropertyNames(batch).length == 0) {
7!
710
            return []
×
711
        }
712
        let def = batch.RequestItems[this.name]
7✔
713

714
        if (params.fields) {
7✔
715
            if (params.fields.indexOf(this.typeField) < 0) {
1✔
716
                params.fields.push(this.typeField)
1✔
717
            }
718
            let expression = new Expression(this.schema.genericModel, 'batchGet', {}, params)
1✔
719
            let cmd = expression.command()
1✔
720
            def.ProjectionExpression = cmd.ProjectionExpression
1✔
721
            def.ExpressionAttributeNames = cmd.ExpressionAttributeNames
1✔
722
        }
723
        def.ConsistentRead = params.consistent ? true : false
7✔
724

725
        // let result = await this.execute(GenericModel, 'batchGet', batch, {}, params)
726

727
        let result,
728
            retries = 0,
6✔
729
            more
730
        result = params.parse ? [] : {Responses: {}}
6✔
731
        do {
6✔
732
            more = false
6✔
733
            let data = await this.execute(GenericModel, 'batchGet', batch, {}, params)
6✔
734
            if (data) {
6✔
735
                let responses = data.Responses
6✔
736
                if (responses) {
6✔
737
                    for (let [key, items] of Object.entries(responses)) {
6✔
738
                        for (let item of items) {
6✔
739
                            if (params.parse) {
12✔
740
                                item = this.unmarshall(item, params)
7✔
741
                                let type = item[this.typeField] || '_unknown'
7!
742
                                let model = this.schema.models[type]
7✔
743
                                if (model && model != this.schema.uniqueModel) {
7✔
744
                                    result.push(model.transformReadItem('get', item, {}, params))
7✔
745
                                }
746
                            } else {
747
                                let set = (result.Responses[key] = result.Responses[key] || [])
5✔
748
                                set.push(item)
5✔
749
                            }
750
                        }
751
                    }
752
                }
753
                let unprocessed = data.UnprocessedItems
6✔
754
                if (unprocessed && Object.keys(unprocessed).length) {
6!
755
                    batch.RequestItems = unprocessed
×
756
                    if (params.reprocess === false) {
×
757
                        return false
×
758
                    }
759
                    if (retries > 11) {
×
760
                        throw new Error(unprocessed)
×
761
                    }
762
                    await this.delay(10 * 2 ** retries++)
×
763
                    more = true
×
764
                }
765
            }
766
        } while (more)
767
        return result
6✔
768
    }
769

770
    /*
771
        AWS BatchWrite may throw an exception if no items can be processed.
772
        Otherwise it will retry (up to 11 times) and return partial results in UnprocessedItems.
773
        Those will be handled here if possible.
774
    */
775
    async batchWrite(batch, params = {}) {
5✔
776
        if (Object.getOwnPropertyNames(batch).length === 0) {
6!
777
            return {}
×
778
        }
779
        let retries = 0,
6✔
780
            more
781
        do {
6✔
782
            more = false
6✔
783
            let response = await this.execute(GenericModel, 'batchWrite', batch, {}, params)
6✔
784
            if (response && response.UnprocessedItems && Object.keys(response.UnprocessedItems).length) {
5!
785
                batch.RequestItems = response.UnprocessedItems
×
786
                if (params.reprocess === false) {
×
787
                    return false
×
788
                }
789
                if (retries > 11) {
×
790
                    throw new Error(response.UnprocessedItems)
×
791
                }
792
                await this.delay(10 * 2 ** retries++)
×
793
                more = true
×
794
            }
795
        } while (more)
796
        return true
5✔
797
    }
798

799
    async batchLoad(expression) {
800
        if (this.dataloader) {
6✔
801
            return await this.dataloader.load(expression)
6✔
802
        }
803
        throw new Error('params.dataloader DataLoader constructor is required to use load feature')
×
804
    }
805

806
    async deleteItem(properties, params) {
807
        return await this.schema.genericModel.deleteItem(properties, params)
1✔
808
    }
809

810
    async getItem(properties, params) {
811
        return await this.schema.genericModel.getItem(properties, params)
2✔
812
    }
813

814
    async putItem(properties, params) {
815
        return await this.schema.genericModel.putItem(properties, params)
2✔
816
    }
817

818
    async queryItems(properties, params) {
819
        return await this.schema.genericModel.queryItems(properties, params)
10✔
820
    }
821

822
    async scanItems(properties, params) {
823
        return await this.schema.genericModel.scanItems(properties, params)
20✔
824
    }
825

826
    async updateItem(properties, params) {
827
        return await this.schema.genericModel.updateItem(properties, params)
3✔
828
    }
829

830
    async fetch(models, properties, params) {
831
        return await this.schema.genericModel.fetch(models, properties, params)
2✔
832
    }
833

834
    /*
835
        Invoke a prepared transaction. Note: transactGet does not work on non-primary indexes.
836
     */
837
    async transact(op, transaction, params = {}) {
4✔
838
        if (params.execute === false) {
27!
839
            if (params.log !== false) {
×
840
                this.log[params.log ? 'info' : 'data'](`OneTable transaction for "${op}" (not executed)`, {
×
841
                    transaction,
842
                    op,
843
                    params,
844
                })
845
            }
846
            return transaction
×
847
        }
848
        let result = await this.execute(
27✔
849
            GenericModel,
850
            op == 'write' ? 'transactWrite' : 'transactGet',
27✔
851
            transaction,
852
            {},
853
            params
854
        )
855
        if (op == 'get') {
24✔
856
            if (params.parse) {
4✔
857
                let items = []
3✔
858
                for (let r of result.Responses) {
3✔
859
                    if (r.Item) {
7✔
860
                        let item = this.unmarshall(r.Item, params)
7✔
861
                        let type = item[this.typeField] || '_unknown'
7!
862
                        let model = this.schema.models[type]
7✔
863
                        if (model && model != this.schema.uniqueModel) {
7✔
864
                            items.push(model.transformReadItem('get', item, {}, params))
7✔
865
                        }
866
                    }
867
                }
868
                result = items
3✔
869
            }
870
        }
871
        return result
24✔
872
    }
873

874
    /*
875
        Convert items into a map of items by model type
876
    */
877
    groupByType(items, params = {}) {
5✔
878
        let result = {}
6✔
879
        for (let item of items) {
6✔
880
            let type = item[this.typeField] || '_unknown'
23!
881
            let list = (result[type] = result[type] || [])
23✔
882
            let model = this.schema.models[type]
23✔
883
            let preparedItem
884
            if (typeof params.hidden === 'boolean' && !params.hidden) {
23✔
885
                let fields = model.block.fields
4✔
886
                preparedItem = {}
4✔
887
                for (let [name, field] of Object.entries(fields)) {
4✔
888
                    if (!(field.hidden && params.hidden !== true)) {
37✔
889
                        preparedItem[name] = item[name]
17✔
890
                    }
891
                }
892
            } else {
893
                preparedItem = item
19✔
894
            }
895
            list.push(preparedItem)
23✔
896
        }
897
        return result
6✔
898
    }
899

900
    /**
901
        This is a function passed to the DataLoader that given an array of Expression
902
        will group all commands by TableName and make all Keys unique and then will perform
903
        a BatchGet request and process the response of the BatchRequest to return an array
904
        with the corresponding response in the same order as it was requested.
905
        @param expressions
906
        @returns {Promise<*>}
907
     */
908
    async batchLoaderFunction(expressions) {
909
        const commands = expressions.map((each) => each.command())
6✔
910

911
        const groupedByTableName = commands.reduce((groupedBy, item) => {
2✔
912
            const tableName = item.TableName
6✔
913
            if (!groupedBy[tableName]) groupedBy[tableName] = []
6✔
914
            groupedBy[tableName].push(item)
6✔
915
            return groupedBy
6✔
916
        }, {})
917

918
        // convert each of the get requests into a single RequestItem with unique Keys
919
        const requestItems = Object.keys(groupedByTableName).reduce((requestItems, tableName) => {
2✔
920
            // batch get does not support duplicate Keys, so we need to make them unique
921
            // it's complex because we have the unmarshalled values on the Keys when it's V3
922
            const allKeys = groupedByTableName[tableName].map((each) => each.Key)
6✔
923
            const uniqueKeys = allKeys.filter((key1, index1, self) => {
2✔
924
                const index2 = self.findIndex((key2) => {
6✔
925
                    return Object.keys(key2).every((prop) => {
9✔
926
                        if (this.V3) {
15✔
927
                            const type = Object.keys(key1[prop])[0] // { S: "XX" } => type is S
15✔
928
                            return key2[prop][type] === key1[prop][type]
15✔
929
                        }
930
                        return key2[prop] === key1[prop]
×
931
                    })
932
                })
933
                return index2 === index1
6✔
934
            })
935
            requestItems[tableName] = {Keys: uniqueKeys}
2✔
936
            return requestItems
2✔
937
        }, {})
938

939
        const results = await this.batchGet({RequestItems: requestItems})
2✔
940

941
        // return the exact mapping (on same order as input) of each get command request to the result from database
942
        // to do that we need to find in the Responses object the item that was request and return it in the same position
943
        return commands.map((command, index) => {
2✔
944
            const {model, params} = expressions[index]
6✔
945

946
            // each key is { pk: { S: "XX" } } when V3 or { pk: "XX" } when V2
947
            // on map function, key will be pk and unmarshalled will be { S: "XX" }, OR "XXX"
948
            const criteria = Object.entries(command.Key).map(([key, unmarshalled]) => {
6✔
949
                if (this.V3) {
12✔
950
                    const type = Object.keys(unmarshalled)[0] // the type will be S
12✔
951
                    return [[key, type], unmarshalled[type]] // return [[pk, S], "XX"]
12✔
952
                }
953
                return [[key], unmarshalled]
×
954
            })
955

956
            // finds the matching object in the unmarshalled Responses array with criteria Key above
957
            const findByKeyUnmarshalled = (items = []) =>
6!
958
                items.find((item) => {
6✔
959
                    return criteria.every(([[prop, type], value]) => {
9✔
960
                        if (type) return item[prop][type] === value // if it has a type it means it is V3
15✔
961
                        return item[prop] === value
×
962
                    })
963
                })
964

965
            const items = results.Responses[command.TableName]
6✔
966
            const item = findByKeyUnmarshalled(items)
6✔
967
            if (item) {
6✔
968
                const unmarshalled = this.unmarshall(item, params)
6✔
969
                return model.transformReadItem('get', unmarshalled, {}, params)
6✔
970
            }
971
        })
972
    }
973

974
    /*
975
        Simple non-crypto UUID. See node-uuid if you require crypto UUIDs.
976
        Consider ULIDs which are crypto sortable.
977
    */
978
    uuid() {
979
        return UUID()
8✔
980
    }
981

982
    // Simple time-based, sortable unique ID.
983
    ulid() {
984
        return new ULID().toString()
854✔
985
    }
986

987
    /*
988
        Crypto-grade ID of given length. If >= 10 in length, suitably unique for most use-cases.
989
     */
990
    uid(size = 10) {
×
991
        return UID(size)
×
992
    }
993

994
    setGenerate(fn) {
995
        this.generate = fn
×
996
    }
997

998
    /*
999
        Return the value template variable references in a list
1000
     */
1001
    getVars(v) {
1002
        let list = []
550✔
1003
        if (Array.isArray(v)) {
550!
1004
            list = v
×
1005
        } else if (typeof v == 'string') {
550✔
1006
            v.replace(/\${(.*?)}/g, (match, varName) => {
549✔
1007
                list.push(varName)
644✔
1008
            })
1009
        }
1010
        return list
550✔
1011
    }
1012

1013
    initCrypto(crypto) {
1014
        this.crypto = Object.assign(crypto)
1✔
1015
        for (let [name, crypto] of Object.entries(this.crypto)) {
1✔
1016
            crypto.secret = Crypto.createHash('sha256').update(crypto.password, 'utf8').digest()
1✔
1017
            this.crypto[name] = crypto
1✔
1018
            this.crypto[name].name = name
1✔
1019
        }
1020
    }
1021

1022
    encrypt(text, name = 'primary', inCode = 'utf8', outCode = 'base64') {
×
1023
        if (text) {
1✔
1024
            if (!this.crypto) {
1!
1025
                throw new OneTableArgError('No database secret or cipher defined')
×
1026
            }
1027
            let crypto = this.crypto[name]
1✔
1028
            if (!crypto) {
1!
1029
                throw new OneTableArgError(`Database crypto not defined for ${name}`)
×
1030
            }
1031
            let iv = Crypto.randomBytes(IV_LENGTH)
1✔
1032
            let crypt = Crypto.createCipheriv(crypto.cipher, crypto.secret, iv)
1✔
1033
            let crypted = crypt.update(text, inCode, outCode) + crypt.final(outCode)
1✔
1034
            let tag = crypto.cipher.indexOf('-gcm') > 0 ? crypt.getAuthTag().toString(outCode) : ''
1!
1035
            text = `${crypto.name}:${tag}:${iv.toString('hex')}:${crypted}`
1✔
1036
        }
1037
        return text
1✔
1038
    }
1039

1040
    decrypt(text, inCode = 'base64', outCode = 'utf8') {
×
1041
        if (text) {
2✔
1042
            let [name, tag, iv, data] = text.split(':')
2✔
1043
            if (!data || !iv || !tag || !name) {
2!
1044
                return text
×
1045
            }
1046
            if (!this.crypto) {
2!
1047
                throw new OneTableArgError('No database secret or cipher defined')
×
1048
            }
1049
            let crypto = this.crypto[name]
2✔
1050
            if (!crypto) {
2!
1051
                throw new OneTableArgError(`Database crypto not defined for ${name}`)
×
1052
            }
1053
            iv = Buffer.from(iv, 'hex')
2✔
1054
            let crypt = Crypto.createDecipheriv(crypto.cipher, crypto.secret, iv)
2✔
1055
            crypt.setAuthTag(Buffer.from(tag, inCode))
2✔
1056
            text = crypt.update(data, inCode, outCode) + crypt.final(outCode)
2✔
1057
        }
1058
        return text
2✔
1059
    }
1060

1061
    /*
1062
        Marshall data into and out of DynamoDB format
1063
    */
1064
    marshall(item, params) {
1065
        let client = params.client || this.client
3,431✔
1066
        if (client.V3) {
3,431✔
1067
            let options = client.params.marshall
3,346✔
1068
            if (Array.isArray(item)) {
3,346!
1069
                for (let i = 0; i < item.length; i++) {
×
1070
                    item[i] = client.marshall(item[i], options)
×
1071
                }
1072
            } else {
1073
                item = client.marshall(item, options)
3,346✔
1074
            }
1075
        } else {
1076
            if (Array.isArray(item)) {
85!
1077
                for (let i = 0; i < item.length; i++) {
×
1078
                    item = this.marshallv2(item, params)
×
1079
                }
1080
            } else {
1081
                item = this.marshallv2(item, params)
85✔
1082
            }
1083
        }
1084
        return item
3,431✔
1085
    }
1086

1087
    /*
1088
        Marshall data out of DynamoDB format
1089
    */
1090
    unmarshall(item, params) {
1091
        if (this.V3) {
362✔
1092
            let client = params.client ? params.client : this.client
334!
1093
            let options = client.params.unmarshall
334✔
1094
            if (Array.isArray(item)) {
334✔
1095
                for (let i = 0; i < item.length; i++) {
273✔
1096
                    item[i] = client.unmarshall(item[i], options)
2,385✔
1097
                }
1098
            } else {
1099
                item = client.unmarshall(item, options)
61✔
1100
            }
1101
        } else {
1102
            if (Array.isArray(item)) {
28✔
1103
                for (let i = 0; i < item.length; i++) {
22✔
1104
                    item[i] = this.unmarshallv2(item[i])
26✔
1105
                }
1106
            } else {
1107
                item = this.unmarshallv2(item)
6✔
1108
            }
1109
        }
1110
        return item
362✔
1111
    }
1112

1113
    marshallv2(item, params) {
1114
        let client = params.client ? params.client : this.client
85!
1115
        for (let [key, value] of Object.entries(item)) {
85✔
1116
            if (value instanceof Set) {
203✔
1117
                item[key] = client.createSet(Array.from(value))
7✔
1118
            }
1119
        }
1120
        return item
85✔
1121
    }
1122

1123
    unmarshallv2(item) {
1124
        for (let [key, value] of Object.entries(item)) {
32✔
1125
            if (
440✔
1126
                value != null &&
898✔
1127
                typeof value == 'object' &&
1128
                value.wrapperName == 'Set' &&
1129
                Array.isArray(value.values)
1130
            ) {
1131
                let list = value.values
9✔
1132
                if (value.type == 'Binary') {
9✔
1133
                    //  Match AWS SDK V3 behavior
1134
                    list = list.map((v) => new Uint8Array(v))
9✔
1135
                }
1136
                item[key] = new Set(list)
9✔
1137
            }
1138
        }
1139
        return item
32✔
1140
    }
1141

1142
    unmarshallStreamImage(image, params) {
1143
        let client = params.client ? params.client : this.client
×
1144
        let options = client.params.unmarshall
×
1145
        return client.unmarshall(image, options)
×
1146
    }
1147

1148
    /*
1149
        Handle DynamoDb Stream Records
1150
     */
1151
    stream(records, params = {}) {
×
1152
        const tableModels = this.listModels()
×
1153

1154
        const result = {}
×
1155
        for (const record of records) {
×
1156
            if (!record.dynamodb.NewImage && !record.dynamodb.OldImage) {
×
1157
                continue
×
1158
            }
1159
            const model = {type: record.eventName}
×
1160
            let typeNew
1161
            let typeOld
1162

1163
            // Unmarshall and transform the New Image if it exists
1164
            if (record.dynamodb.NewImage) {
×
1165
                const jsonNew = this.unmarshallStreamImage(record.dynamodb.NewImage, params)
×
1166
                typeNew = jsonNew[this.typeField]
×
1167

1168
                // If type not found then don't do anything
1169
                if (typeNew && tableModels.includes(typeNew)) {
×
1170
                    model.new = this.schema.models[typeNew].transformReadItem('get', jsonNew, {}, params)
×
1171
                }
1172
            }
1173
            // Unmarshall and transform the Old Image if it exists
1174
            if (record.dynamodb.OldImage) {
×
1175
                const jsonOld = this.unmarshallStreamImage(record.dynamodb.OldImage, params)
×
1176
                typeOld = jsonOld[this.typeField]
×
1177

1178
                // If type not found then don't do anything
1179
                if (typeOld && tableModels.includes(typeOld)) {
×
1180
                    // If there was a new image of a different type then skip
1181
                    if (typeNew && typeNew !== typeOld) {
×
1182
                        continue
×
1183
                    }
1184
                    model.old = this.schema.models[typeOld].transformReadItem('get', jsonOld, {}, params)
×
1185
                }
1186
            }
1187
            const type = typeNew || typeOld
×
1188
            let list = (result[type] = result[type] || [])
×
1189
            list.push(model)
×
1190
        }
1191
        return result
×
1192
    }
1193

1194
    /*
1195
        Recursive Object.assign. Will clone dates, regexp, simple objects and arrays.
1196
        Other class instances and primitives are copied not cloned.
1197
        Max recursive depth of 20
1198
    */
1199
    assign(dest, ...sources) {
1200
        for (let src of sources) {
1,614✔
1201
            if (src) {
1,619✔
1202
                dest = this.assignInner(dest, src)
1,619✔
1203
            }
1204
        }
1205
        return dest
1,614✔
1206
    }
1207

1208
    assignInner(dest, src, recurse = 0) {
1,619✔
1209
        if (recurse++ > 1000) {
3,989!
1210
            throw new OneTableError('Recursive merge', {code: 'RuntimeError'})
×
1211
        }
1212
        if (!src || !dest || typeof src != 'object') {
3,989!
1213
            return
×
1214
        }
1215
        for (let [key, v] of Object.entries(src)) {
3,989✔
1216
            if (v === undefined) {
11,045!
1217
                continue
×
1218
            } else if (v instanceof Date) {
11,045✔
1219
                dest[key] = new Date(v)
21✔
1220
            } else if (v instanceof RegExp) {
11,024✔
1221
                dest[key] = new RegExp(v.source, v.flags)
12✔
1222
            } else if (Array.isArray(v)) {
11,012✔
1223
                if (!Array.isArray(dest[key])) {
48✔
1224
                    dest[key] = []
48✔
1225
                }
1226
                if (v.length) {
48✔
1227
                    dest[key] = this.assignInner([key], v, recurse)
43✔
1228
                }
1229
            } else if (typeof v == 'object' && v != null && v.constructor.name == 'Object') {
10,964✔
1230
                if (typeof dest[key] != 'object') {
2,327✔
1231
                    dest[key] = {}
2,322✔
1232
                }
1233
                dest[key] = this.assignInner(dest[key], v, recurse)
2,327✔
1234
            } else {
1235
                dest[key] = v
8,637✔
1236
            }
1237
        }
1238
        return dest
3,989✔
1239
    }
1240

1241
    async delay(time) {
1242
        return new Promise(function (resolve) {
×
1243
            setTimeout(() => resolve(true), time)
×
1244
        })
1245
    }
1246

1247
    async flushMetrics() {
1248
        await this.metrics.flush()
3✔
1249
    }
1250

1251
    static async terminate() {
1252
        await Metrics.terminate()
1✔
1253
    }
1254
}
1255

1256
/*
1257
    Emulate SenseLogs API
1258
*/
1259
class Log {
1260
    constructor(logger) {
1261
        if (logger === true) {
63✔
1262
            this.logger = this.defaultLogger
25✔
1263
        } else if (logger) {
38✔
1264
            this.logger = logger
3✔
1265
        }
1266
    }
1267

1268
    enabled() {
1269
        return true
3✔
1270
    }
1271

1272
    data(message, context) {
1273
        this.process('data', message, context)
1,149✔
1274
    }
1275

1276
    emit(chan, message, context) {
1277
        this.process(chan, message, context)
×
1278
    }
1279

1280
    error(message, context) {
1281
        this.process('error', message, context)
6✔
1282
    }
1283

1284
    info(message, context) {
1285
        this.process('info', message, context)
8✔
1286
    }
1287

1288
    trace(message, context) {
1289
        this.process('trace', message, context)
1,455✔
1290
    }
1291

1292
    process(level, message, context) {
1293
        if (this.logger) {
2,618✔
1294
            this.logger(level, message, context)
761✔
1295
        }
1296
    }
1297

1298
    defaultLogger(level, message, context) {
1299
        if (level == 'trace' || level == 'data') {
515✔
1300
            //  params.log: true will cause the level to be changed to 'info'
1301
            return
515✔
1302
        }
1303
        if (context) {
×
1304
            try {
×
1305
                console.log(level, message, JSON.stringify(context, null, 4))
×
1306
            } catch (err) {
1307
                let buf = ['{']
×
1308
                for (let [key, value] of Object.entries(context)) {
×
1309
                    try {
×
1310
                        buf.push(`    ${key}: ${JSON.stringify(value, null, 4)}`)
×
1311
                    } catch (err) {
1312
                        /* continue */
1313
                    }
1314
                }
1315
                buf.push('}')
×
1316
                console.log(level, message, buf.join('\n'))
×
1317
            }
1318
        } else {
1319
            console.log(level, message)
×
1320
        }
1321
    }
1322
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc