• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6507869340

13 Oct 2023 11:54AM UTC coverage: 89.618% (-0.003%) from 89.621%
6507869340

push

gh-ci

jeroiraz
chore(pkg/api): set optional parameters

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

33518 of 37401 relevant lines covered (89.62%)

143223.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.86
/embedded/document/engine.go
1
/*
2
Copyright 2023 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16
package document
17

18
import (
19
        "bytes"
20
        "context"
21
        "errors"
22
        "fmt"
23
        "regexp"
24
        "strings"
25

26
        "github.com/codenotary/immudb/embedded/sql"
27
        "github.com/codenotary/immudb/embedded/store"
28
        "github.com/codenotary/immudb/pkg/api/protomodel"
29

30
        "google.golang.org/protobuf/proto"
31
        "google.golang.org/protobuf/types/known/structpb"
32
)
33

34
const (
35
        DefaultDocumentIDField     = "_id"
36
        DocumentBLOBField          = "_doc"
37
        documentFieldPathSeparator = "."
38
)
39

40
var reservedWords = map[string]struct{}{
41
        "collection": {},
42
        "field":      {},
43
        "index":      {},
44
        "document":   {},
45
}
46

47
var collectionNameValidation = regexp.MustCompile(`^[a-zA-Z_]+[a-zA-Z0-9_\-]*$`)
48
var documentIDFieldNameValidation = regexp.MustCompile(`^[a-zA-Z_]+[a-zA-Z0-9_\-]*$`)
49
var fieldNameValidation = regexp.MustCompile(`^[a-zA-Z_]+[a-zA-Z0-9_\-.]*$`)
50

51
type Engine struct {
52
        sqlEngine *sql.Engine
53

54
        maxNestedFields int
55
}
56

57
type EncodedDocument struct {
58
        TxID            uint64
59
        Revision        uint64 // revision is only set when txID == 0 and info is fetch from the index
60
        KVMetadata      *store.KVMetadata
61
        EncodedDocument []byte
62
}
63

64
func NewEngine(store *store.ImmuStore, opts *Options) (*Engine, error) {
690✔
65
        err := opts.Validate()
690✔
66
        if err != nil {
691✔
67
                return nil, err
1✔
68
        }
1✔
69

70
        sqlOpts := sql.DefaultOptions().
689✔
71
                WithPrefix(opts.prefix).
689✔
72
                WithLazyIndexConstraintValidation(true)
689✔
73

689✔
74
        engine, err := sql.NewEngine(store, sqlOpts)
689✔
75
        if err != nil {
690✔
76
                return nil, err
1✔
77
        }
1✔
78

79
        return &Engine{
688✔
80
                sqlEngine:       engine,
688✔
81
                maxNestedFields: opts.maxNestedFields,
688✔
82
        }, nil
688✔
83
}
84

85
func validateCollectionName(collectionName string) error {
335✔
86
        _, isReservedWord := reservedWords[strings.ToLower(collectionName)]
335✔
87
        if isReservedWord {
336✔
88
                return fmt.Errorf("%w: invalid collection name '%s'", ErrReservedName, collectionName)
1✔
89
        }
1✔
90

91
        if !collectionNameValidation.MatchString(collectionName) {
348✔
92
                return fmt.Errorf("%w: invalid collection name '%s'", ErrIllegalArguments, collectionName)
14✔
93
        }
14✔
94

95
        return nil
320✔
96
}
97

98
func validateDocumentIdFieldName(documentIdFieldName string) error {
49✔
99
        _, isReservedWord := reservedWords[strings.ToLower(documentIdFieldName)]
49✔
100
        if isReservedWord {
51✔
101
                return fmt.Errorf("%w: invalid id field name '%s'", ErrReservedName, documentIdFieldName)
2✔
102
        }
2✔
103

104
        if documentIdFieldName == DocumentBLOBField {
48✔
105
                return fmt.Errorf("%w: invalid id field name '%s'", ErrReservedName, documentIdFieldName)
1✔
106
        }
1✔
107

108
        if !documentIDFieldNameValidation.MatchString(documentIdFieldName) {
47✔
109
                return fmt.Errorf("%w: invalid id field name '%s'", ErrIllegalArguments, documentIdFieldName)
1✔
110
        }
1✔
111

112
        return nil
45✔
113
}
114

115
func validateFieldName(fieldName string) error {
272✔
116
        _, isReservedWord := reservedWords[strings.ToLower(fieldName)]
272✔
117
        if isReservedWord {
274✔
118
                return fmt.Errorf("%w: invalid field name '%s'", ErrReservedName, fieldName)
2✔
119
        }
2✔
120

121
        if fieldName == DocumentBLOBField {
271✔
122
                return fmt.Errorf("%w: invalid field name '%s'", ErrReservedName, fieldName)
1✔
123
        }
1✔
124

125
        if !fieldNameValidation.MatchString(fieldName) {
276✔
126
                return fmt.Errorf("%w: invalid field name '%s'", ErrIllegalArguments, fieldName)
7✔
127
        }
7✔
128

129
        return nil
262✔
130
}
131

132
func (e *Engine) CreateCollection(ctx context.Context, username, name, documentIdFieldName string, fields []*protomodel.Field, indexes []*protomodel.Index) error {
47✔
133
        err := validateCollectionName(name)
47✔
134
        if err != nil {
50✔
135
                return err
3✔
136
        }
3✔
137

138
        if documentIdFieldName == "" {
84✔
139
                documentIdFieldName = DefaultDocumentIDField
40✔
140
        }
40✔
141

142
        err = validateDocumentIdFieldName(documentIdFieldName)
44✔
143
        if err != nil {
46✔
144
                return err
2✔
145
        }
2✔
146

147
        // only catalog needs to be up to date
148
        opts := sql.DefaultTxOptions().
42✔
149
                WithUnsafeMVCC(true).
42✔
150
                WithExtra([]byte(username)).
42✔
151
                WithSnapshotMustIncludeTxID(func(lastPrecommittedTxID uint64) uint64 { return 0 }).
84✔
152
                WithSnapshotRenewalPeriod(0).
153
                WithExplicitClose(true)
154

155
        sqlTx, err := e.sqlEngine.NewTx(ctx, opts)
42✔
156
        if err != nil {
42✔
157
                return mayTranslateError(err)
×
158
        }
×
159
        defer sqlTx.Cancel()
42✔
160

42✔
161
        columns := make([]*sql.ColSpec, 2+len(fields))
42✔
162

42✔
163
        // add primary key for document id
42✔
164
        columns[0] = sql.NewColSpec(documentIdFieldName, sql.BLOBType, MaxDocumentIDLength, false, true)
42✔
165

42✔
166
        // add columnn for blob, which stores the document as a whole
42✔
167
        columns[1] = sql.NewColSpec(DocumentBLOBField, sql.BLOBType, 0, false, false)
42✔
168

42✔
169
        for i, field := range fields {
144✔
170
                err = validateFieldName(field.Name)
102✔
171
                if err != nil {
105✔
172
                        return err
3✔
173
                }
3✔
174

175
                if field.Name == documentIdFieldName {
100✔
176
                        return fmt.Errorf("%w: id field name '%s' should not be specified", ErrIllegalArguments, field.Name)
1✔
177
                }
1✔
178

179
                sqlType, err := protomodelValueTypeToSQLValueType(field.Type)
98✔
180
                if err != nil {
98✔
181
                        return err
×
182
                }
×
183

184
                colLen, err := sqlValueTypeDefaultLength(sqlType)
98✔
185
                if err != nil {
98✔
186
                        return err
×
187
                }
×
188

189
                columns[i+2] = sql.NewColSpec(field.Name, sqlType, colLen, false, false)
98✔
190
        }
191

192
        _, _, err = e.sqlEngine.ExecPreparedStmts(
38✔
193
                ctx,
38✔
194
                sqlTx,
38✔
195
                []sql.SQLStmt{sql.NewCreateTableStmt(
38✔
196
                        name,
38✔
197
                        false,
38✔
198
                        columns,
38✔
199
                        []string{documentIdFieldName},
38✔
200
                )},
38✔
201
                nil,
38✔
202
        )
38✔
203
        if err != nil {
39✔
204
                return mayTranslateError(err)
1✔
205
        }
1✔
206

207
        var indexStmts []sql.SQLStmt
37✔
208

37✔
209
        for _, index := range indexes {
108✔
210
                if len(index.Fields) == 0 {
72✔
211
                        return fmt.Errorf("%w: no fields specified", ErrIllegalArguments)
1✔
212
                }
1✔
213

214
                if len(index.Fields) == 1 && index.Fields[0] == documentIdFieldName {
72✔
215
                        if !index.IsUnique {
3✔
216
                                return fmt.Errorf("%w: index on id field must be unique", ErrIllegalArguments)
1✔
217
                        }
1✔
218
                        // idField is the primary key and so the index is automatically created
219
                        continue
1✔
220
                }
221

222
                for _, field := range index.Fields {
138✔
223
                        err := validateFieldName(field)
70✔
224
                        if err != nil {
71✔
225
                                return err
1✔
226
                        }
1✔
227
                }
228

229
                indexStmts = append(indexStmts, sql.NewCreateIndexStmt(name, index.Fields, index.IsUnique))
67✔
230
        }
231

232
        // add indexes to collection
233
        if len(indexStmts) > 0 {
58✔
234
                _, _, err = e.sqlEngine.ExecPreparedStmts(
24✔
235
                        ctx,
24✔
236
                        sqlTx,
24✔
237
                        indexStmts,
24✔
238
                        nil,
24✔
239
                )
24✔
240
                if err != nil {
25✔
241
                        return mayTranslateError(err)
1✔
242
                }
1✔
243
        }
244

245
        err = sqlTx.Commit(ctx)
33✔
246
        return mayTranslateError(err)
33✔
247
}
248

249
func (e *Engine) GetCollection(ctx context.Context, collectionName string) (*protomodel.Collection, error) {
18✔
250
        opts := sql.DefaultTxOptions().
18✔
251
                WithReadOnly(true).
18✔
252
                WithExplicitClose(true)
18✔
253

18✔
254
        sqlTx, err := e.sqlEngine.NewTx(ctx, opts)
18✔
255
        if err != nil {
18✔
256
                return nil, mayTranslateError(err)
×
257
        }
×
258
        defer sqlTx.Cancel()
18✔
259

18✔
260
        table, err := getTableForCollection(sqlTx, collectionName)
18✔
261
        if err != nil {
20✔
262
                return nil, err
2✔
263
        }
2✔
264

265
        return collectionFromTable(table), nil
16✔
266
}
267

268
func (e *Engine) GetCollections(ctx context.Context) ([]*protomodel.Collection, error) {
7✔
269
        opts := sql.DefaultTxOptions().
7✔
270
                WithReadOnly(true).
7✔
271
                WithExplicitClose(true)
7✔
272

7✔
273
        sqlTx, err := e.sqlEngine.NewTx(ctx, opts)
7✔
274
        if err != nil {
7✔
275
                return nil, mayTranslateError(err)
×
276
        }
×
277
        defer sqlTx.Cancel()
7✔
278

7✔
279
        tables := sqlTx.Catalog().GetTables()
7✔
280

7✔
281
        collections := make([]*protomodel.Collection, len(tables))
7✔
282

7✔
283
        for i, table := range tables {
16✔
284
                collections[i] = collectionFromTable(table)
9✔
285
        }
9✔
286

287
        return collections, nil
7✔
288
}
289

290
func docIDFieldName(table *sql.Table) string {
165✔
291
        return table.PrimaryIndex().Cols()[0].Name()
165✔
292
}
165✔
293

294
func getTableForCollection(sqlTx *sql.SQLTx, collectionName string) (*sql.Table, error) {
233✔
295
        err := validateCollectionName(collectionName)
233✔
296
        if err != nil {
237✔
297
                return nil, err
4✔
298
        }
4✔
299

300
        table, err := sqlTx.Catalog().GetTableByName(collectionName)
229✔
301
        if errors.Is(err, sql.ErrTableDoesNotExist) {
232✔
302
                return nil, fmt.Errorf("%w (%s)", mayTranslateError(err), collectionName)
3✔
303
        }
3✔
304

305
        return table, mayTranslateError(err)
226✔
306
}
307

308
func getColumnForField(table *sql.Table, field string) (*sql.Column, error) {
71✔
309
        err := validateFieldName(field)
71✔
310
        if err != nil {
72✔
311
                return nil, err
1✔
312
        }
1✔
313

314
        column, err := table.GetColumnByName(field)
70✔
315
        if errors.Is(err, sql.ErrColumnDoesNotExist) {
71✔
316
                return nil, fmt.Errorf("%w (%s)", mayTranslateError(err), field)
1✔
317
        }
1✔
318

319
        return column, mayTranslateError(err)
69✔
320
}
321

322
func collectionFromTable(table *sql.Table) *protomodel.Collection {
25✔
323
        documentIdFieldName := docIDFieldName(table)
25✔
324

25✔
325
        indexes := table.GetIndexes()
25✔
326

25✔
327
        collection := &protomodel.Collection{
25✔
328
                Name:                table.Name(),
25✔
329
                DocumentIdFieldName: documentIdFieldName,
25✔
330
                Indexes:             make([]*protomodel.Index, len(indexes)),
25✔
331
        }
25✔
332

25✔
333
        for _, col := range table.Cols() {
167✔
334
                if col.Name() == DocumentBLOBField {
167✔
335
                        continue
25✔
336
                }
337

338
                var colType protomodel.FieldType
117✔
339

117✔
340
                if col.Name() == documentIdFieldName {
142✔
341
                        colType = protomodel.FieldType_STRING
25✔
342
                } else {
117✔
343
                        switch col.Type() {
92✔
344
                        case sql.BooleanType:
1✔
345
                                colType = protomodel.FieldType_BOOLEAN
1✔
346
                        case sql.VarcharType:
42✔
347
                                colType = protomodel.FieldType_STRING
42✔
348
                        case sql.UUIDType:
9✔
349
                                colType = protomodel.FieldType_UUID
9✔
350
                        case sql.IntegerType:
33✔
351
                                colType = protomodel.FieldType_INTEGER
33✔
352
                        case sql.Float64Type:
7✔
353
                                colType = protomodel.FieldType_DOUBLE
7✔
354
                        }
355
                }
356

357
                collection.Fields = append(collection.Fields, &protomodel.Field{
117✔
358
                        Name: col.Name(),
117✔
359
                        Type: colType,
117✔
360
                })
117✔
361
        }
362

363
        for i, index := range indexes {
88✔
364
                fields := make([]string, len(index.Cols()))
63✔
365

63✔
366
                for i, c := range index.Cols() {
126✔
367
                        fields[i] = c.Name()
63✔
368
                }
63✔
369

370
                collection.Indexes[i] = &protomodel.Index{
63✔
371
                        Fields:   fields,
63✔
372
                        IsUnique: index.IsUnique(),
63✔
373
                }
63✔
374
        }
375

376
        return collection
25✔
377
}
378

379
func (e *Engine) UpdateCollection(ctx context.Context, username, collectionName string, documentIdFieldName string) error {
8✔
380
        err := validateCollectionName(collectionName)
8✔
381
        if err != nil {
9✔
382
                return err
1✔
383
        }
1✔
384

385
        if documentIdFieldName != "" {
12✔
386
                err := validateDocumentIdFieldName(documentIdFieldName)
5✔
387
                if err != nil {
7✔
388
                        return err
2✔
389
                }
2✔
390
        }
391

392
        opts := sql.DefaultTxOptions().
5✔
393
                WithUnsafeMVCC(true).
5✔
394
                WithExtra([]byte(username)).
5✔
395
                WithSnapshotMustIncludeTxID(func(lastPrecommittedTxID uint64) uint64 { return 0 }).
10✔
396
                WithSnapshotRenewalPeriod(0).
397
                WithExplicitClose(true)
398

399
        sqlTx, err := e.sqlEngine.NewTx(ctx, opts)
5✔
400
        if err != nil {
5✔
401
                return mayTranslateError(err)
×
402
        }
×
403
        defer sqlTx.Cancel()
5✔
404

5✔
405
        table, err := getTableForCollection(sqlTx, collectionName)
5✔
406
        if err != nil {
6✔
407
                return err
1✔
408
        }
1✔
409

410
        currIDFieldName := docIDFieldName(table)
4✔
411

4✔
412
        if documentIdFieldName != "" && documentIdFieldName != currIDFieldName {
7✔
413
                _, _, err := e.sqlEngine.ExecPreparedStmts(
3✔
414
                        ctx,
3✔
415
                        sqlTx,
3✔
416
                        []sql.SQLStmt{
3✔
417
                                sql.NewRenameColumnStmt(
3✔
418
                                        collectionName,
3✔
419
                                        currIDFieldName,
3✔
420
                                        documentIdFieldName,
3✔
421
                                ),
3✔
422
                        },
3✔
423
                        nil,
3✔
424
                )
3✔
425
                if err != nil {
3✔
426
                        return mayTranslateError(err)
×
427
                }
×
428
        }
429

430
        err = sqlTx.Commit(ctx)
4✔
431
        return mayTranslateError(err)
4✔
432
}
433

434
// DeleteCollection deletes a collection.
435
func (e *Engine) DeleteCollection(ctx context.Context, username, collectionName string) error {
4✔
436
        err := validateCollectionName(collectionName)
4✔
437
        if err != nil {
5✔
438
                return err
1✔
439
        }
1✔
440

441
        opts := sql.DefaultTxOptions().
3✔
442
                WithUnsafeMVCC(true).
3✔
443
                WithExtra([]byte(username)).
3✔
444
                WithSnapshotMustIncludeTxID(func(lastPrecommittedTxID uint64) uint64 { return 0 }).
6✔
445
                WithSnapshotRenewalPeriod(0).
446
                WithExplicitClose(true)
447

448
        sqlTx, err := e.sqlEngine.NewTx(ctx, opts)
3✔
449
        if err != nil {
3✔
450
                return mayTranslateError(err)
×
451
        }
×
452
        defer sqlTx.Cancel()
3✔
453

3✔
454
        _, _, err = e.sqlEngine.ExecPreparedStmts(
3✔
455
                ctx,
3✔
456
                sqlTx,
3✔
457
                []sql.SQLStmt{
3✔
458
                        sql.NewDropTableStmt(collectionName), // delete collection from catalog
3✔
459
                },
3✔
460
                nil,
3✔
461
        )
3✔
462
        if err != nil {
3✔
463
                return mayTranslateError(err)
×
464
        }
×
465

466
        err = sqlTx.Commit(ctx)
3✔
467
        return mayTranslateError(err)
3✔
468
}
469

470
func (e *Engine) AddField(ctx context.Context, username, collectionName string, field *protomodel.Field) error {
12✔
471
        err := validateCollectionName(collectionName)
12✔
472
        if err != nil {
14✔
473
                return err
2✔
474
        }
2✔
475

476
        if field == nil {
11✔
477
                return fmt.Errorf("%w: no field specified", ErrIllegalArguments)
1✔
478
        }
1✔
479

480
        err = validateFieldName(field.Name)
9✔
481
        if err != nil {
11✔
482
                return err
2✔
483
        }
2✔
484

485
        sqlType, err := protomodelValueTypeToSQLValueType(field.Type)
7✔
486
        if err != nil {
8✔
487
                return err
1✔
488
        }
1✔
489

490
        colLen, err := sqlValueTypeDefaultLength(sqlType)
6✔
491
        if err != nil {
6✔
492
                return err
×
493
        }
×
494

495
        opts := sql.DefaultTxOptions().
6✔
496
                WithUnsafeMVCC(true).
6✔
497
                WithExtra([]byte(username)).
6✔
498
                WithSnapshotMustIncludeTxID(func(lastPrecommittedTxID uint64) uint64 { return 0 }).
12✔
499
                WithSnapshotRenewalPeriod(0).
500
                WithExplicitClose(true)
501

502
        sqlTx, err := e.sqlEngine.NewTx(ctx, opts)
6✔
503
        if err != nil {
6✔
504
                return mayTranslateError(err)
×
505
        }
×
506
        defer sqlTx.Cancel()
6✔
507

6✔
508
        colSpec := sql.NewColSpec(field.Name, sqlType, colLen, false, false)
6✔
509

6✔
510
        addColumnStmt := sql.NewAddColumnStmt(collectionName, colSpec)
6✔
511

6✔
512
        _, _, err = e.sqlEngine.ExecPreparedStmts(
6✔
513
                ctx,
6✔
514
                sqlTx,
6✔
515
                []sql.SQLStmt{addColumnStmt},
6✔
516
                nil,
6✔
517
        )
6✔
518
        if err != nil {
8✔
519
                return mayTranslateError(err)
2✔
520
        }
2✔
521

522
        err = sqlTx.Commit(ctx)
4✔
523
        return mayTranslateError(err)
4✔
524
}
525

526
func (e *Engine) RemoveField(ctx context.Context, username, collectionName string, fieldName string) error {
10✔
527
        err := validateCollectionName(collectionName)
10✔
528
        if err != nil {
11✔
529
                return err
1✔
530
        }
1✔
531

532
        err = validateFieldName(fieldName)
9✔
533
        if err != nil {
10✔
534
                return err
1✔
535
        }
1✔
536

537
        opts := sql.DefaultTxOptions().
8✔
538
                WithUnsafeMVCC(true).
8✔
539
                WithExtra([]byte(username)).
8✔
540
                WithSnapshotMustIncludeTxID(func(lastPrecommittedTxID uint64) uint64 { return 0 }).
16✔
541
                WithSnapshotRenewalPeriod(0).
542
                WithExplicitClose(true)
543

544
        sqlTx, err := e.sqlEngine.NewTx(ctx, opts)
8✔
545
        if err != nil {
8✔
546
                return mayTranslateError(err)
×
547
        }
×
548
        defer sqlTx.Cancel()
8✔
549

8✔
550
        dropColumnStmt := sql.NewDropColumnStmt(collectionName, fieldName)
8✔
551

8✔
552
        _, _, err = e.sqlEngine.ExecPreparedStmts(
8✔
553
                ctx,
8✔
554
                sqlTx,
8✔
555
                []sql.SQLStmt{dropColumnStmt},
8✔
556
                nil,
8✔
557
        )
8✔
558
        if err != nil {
12✔
559
                return mayTranslateError(err)
4✔
560
        }
4✔
561

562
        err = sqlTx.Commit(ctx)
4✔
563
        return mayTranslateError(err)
4✔
564
}
565

566
func (e *Engine) CreateIndex(ctx context.Context, username, collectionName string, fields []string, isUnique bool) error {
8✔
567
        err := validateCollectionName(collectionName)
8✔
568
        if err != nil {
9✔
569
                return err
1✔
570
        }
1✔
571

572
        if len(fields) == 0 {
8✔
573
                return fmt.Errorf("%w: no fields specified", ErrIllegalArguments)
1✔
574
        }
1✔
575

576
        opts := sql.DefaultTxOptions().
6✔
577
                WithUnsafeMVCC(true).
6✔
578
                WithExtra([]byte(username)).
6✔
579
                WithSnapshotMustIncludeTxID(func(lastPrecommittedTxID uint64) uint64 { return 0 }).
12✔
580
                WithSnapshotRenewalPeriod(0).
581
                WithExplicitClose(true)
582

583
        sqlTx, err := e.sqlEngine.NewTx(ctx, opts)
6✔
584
        if err != nil {
6✔
585
                return mayTranslateError(err)
×
586
        }
×
587
        defer sqlTx.Cancel()
6✔
588

6✔
589
        for _, field := range fields {
12✔
590
                err := validateFieldName(field)
6✔
591
                if err != nil {
7✔
592
                        return err
1✔
593
                }
1✔
594
        }
595

596
        createIndexStmt := sql.NewCreateIndexStmt(collectionName, fields, isUnique)
5✔
597

5✔
598
        _, _, err = e.sqlEngine.ExecPreparedStmts(
5✔
599
                ctx,
5✔
600
                sqlTx,
5✔
601
                []sql.SQLStmt{createIndexStmt},
5✔
602
                nil,
5✔
603
        )
5✔
604
        if err != nil {
6✔
605
                return mayTranslateError(err)
1✔
606
        }
1✔
607

608
        err = sqlTx.Commit(ctx)
4✔
609
        return mayTranslateError(err)
4✔
610
}
611

612
func (e *Engine) DeleteIndex(ctx context.Context, username, collectionName string, fields []string) error {
7✔
613
        err := validateCollectionName(collectionName)
7✔
614
        if err != nil {
8✔
615
                return err
1✔
616
        }
1✔
617

618
        if len(fields) == 0 {
7✔
619
                return fmt.Errorf("%w: no fields specified", ErrIllegalArguments)
1✔
620
        }
1✔
621

622
        opts := sql.DefaultTxOptions().
5✔
623
                WithUnsafeMVCC(true).
5✔
624
                WithExtra([]byte(username)).
5✔
625
                WithSnapshotMustIncludeTxID(func(lastPrecommittedTxID uint64) uint64 { return 0 }).
10✔
626
                WithSnapshotRenewalPeriod(0).
627
                WithExplicitClose(true)
628

629
        sqlTx, err := e.sqlEngine.NewTx(ctx, opts)
5✔
630
        if err != nil {
5✔
631
                return mayTranslateError(err)
×
632
        }
×
633
        defer sqlTx.Cancel()
5✔
634

5✔
635
        for _, field := range fields {
10✔
636
                err := validateFieldName(field)
5✔
637
                if err != nil {
6✔
638
                        return err
1✔
639
                }
1✔
640
        }
641

642
        dropIndexStmt := sql.NewDropIndexStmt(collectionName, fields)
4✔
643

4✔
644
        _, _, err = e.sqlEngine.ExecPreparedStmts(
4✔
645
                ctx,
4✔
646
                sqlTx,
4✔
647
                []sql.SQLStmt{dropIndexStmt},
4✔
648
                nil,
4✔
649
        )
4✔
650
        if err != nil {
4✔
651
                return mayTranslateError(err)
×
652
        }
×
653

654
        err = sqlTx.Commit(ctx)
4✔
655
        return mayTranslateError(err)
4✔
656
}
657

658
func (e *Engine) InsertDocument(ctx context.Context, username, collectionName string, doc *structpb.Struct) (txID uint64, docID DocumentID, err error) {
56✔
659
        txID, docIDs, err := e.InsertDocuments(ctx, username, collectionName, []*structpb.Struct{doc})
56✔
660
        if err != nil {
59✔
661
                return 0, nil, err
3✔
662
        }
3✔
663

664
        return txID, docIDs[0], nil
53✔
665
}
666

667
func (e *Engine) InsertDocuments(ctx context.Context, username, collectionName string, docs []*structpb.Struct) (txID uint64, docIDs []DocumentID, err error) {
120✔
668
        opts := sql.DefaultTxOptions().
120✔
669
                WithUnsafeMVCC(true).
120✔
670
                WithExtra([]byte(username)).
120✔
671
                WithSnapshotMustIncludeTxID(func(lastPrecommittedTxID uint64) uint64 { return 0 }).
739✔
672
                WithSnapshotRenewalPeriod(0)
673

674
        sqlTx, err := e.sqlEngine.NewTx(ctx, opts)
120✔
675
        if err != nil {
120✔
676
                return 0, nil, mayTranslateError(err)
×
677
        }
×
678
        defer sqlTx.Cancel()
120✔
679

120✔
680
        return e.upsertDocuments(ctx, sqlTx, collectionName, docs, true)
120✔
681
}
682

683
func (e *Engine) upsertDocuments(ctx context.Context, sqlTx *sql.SQLTx, collectionName string, docs []*structpb.Struct, isInsert bool) (txID uint64, docIDs []DocumentID, err error) {
126✔
684
        if len(docs) == 0 {
129✔
685
                return 0, nil, fmt.Errorf("%w: no document specified", ErrIllegalArguments)
3✔
686
        }
3✔
687

688
        table, err := getTableForCollection(sqlTx, collectionName)
123✔
689
        if err != nil {
124✔
690
                return 0, nil, err
1✔
691
        }
1✔
692

693
        docIDFieldName := docIDFieldName(table)
122✔
694

122✔
695
        colNames := make([]string, len(table.Cols()))
122✔
696

122✔
697
        for i, col := range table.Cols() {
663✔
698
                colNames[i] = col.Name()
541✔
699
        }
541✔
700

701
        docIDs = make([]DocumentID, len(docs))
122✔
702

122✔
703
        rows := make([]*sql.RowSpec, len(docs))
122✔
704

122✔
705
        for i, doc := range docs {
253✔
706
                if doc == nil || len(doc.Fields) == 0 {
132✔
707
                        doc = &structpb.Struct{Fields: make(map[string]*structpb.Value)}
1✔
708
                }
1✔
709

710
                _, blobFieldProvisioned := doc.Fields[DocumentBLOBField]
131✔
711
                if blobFieldProvisioned {
132✔
712
                        return 0, nil, fmt.Errorf("%w(%s)", ErrReservedName, DocumentBLOBField)
1✔
713
                }
1✔
714

715
                var docID DocumentID
130✔
716

130✔
717
                provisionedDocID, docIDProvisioned := doc.Fields[docIDFieldName]
130✔
718
                if docIDProvisioned {
137✔
719
                        if isInsert {
8✔
720
                                return 0, nil, fmt.Errorf("%w: field (%s) should NOT be specified when inserting a document", ErrIllegalArguments, docIDFieldName)
1✔
721
                        }
1✔
722

723
                        docID, err = NewDocumentIDFromHexEncodedString(provisionedDocID.GetStringValue())
6✔
724
                        if err != nil {
6✔
725
                                return 0, nil, err
×
726
                        }
×
727
                } else {
123✔
728
                        if !isInsert {
123✔
729
                                return 0, nil, fmt.Errorf("%w: field (%s) should be specified when updating a document", ErrIllegalArguments, docIDFieldName)
×
730
                        }
×
731

732
                        // generate document id
733
                        docID = NewDocumentIDFromTx(e.sqlEngine.GetStore().LastPrecommittedTxID())
123✔
734
                        doc.Fields[docIDFieldName] = structpb.NewStringValue(docID.EncodeToHexString())
123✔
735
                }
736

737
                rowSpec, err := e.generateRowSpecForDocument(table, doc)
129✔
738
                if err != nil {
129✔
739
                        return 0, nil, err
×
740
                }
×
741

742
                docIDs[i] = docID
129✔
743
                rows[i] = rowSpec
129✔
744
        }
745

746
        // add documents to collection
747
        _, ctxs, err := e.sqlEngine.ExecPreparedStmts(
120✔
748
                ctx,
120✔
749
                sqlTx,
120✔
750
                []sql.SQLStmt{
120✔
751
                        sql.NewUpserIntoStmt(
120✔
752
                                collectionName,
120✔
753
                                colNames,
120✔
754
                                rows,
120✔
755
                                isInsert,
120✔
756
                                nil,
120✔
757
                        ),
120✔
758
                },
120✔
759
                nil,
120✔
760
        )
120✔
761
        if err != nil {
120✔
762
                return 0, nil, mayTranslateError(err)
×
763
        }
×
764

765
        txID = ctxs[0].TxHeader().ID
120✔
766

120✔
767
        return txID, docIDs, nil
120✔
768
}
769

770
func (e *Engine) generateRowSpecForDocument(table *sql.Table, doc *structpb.Struct) (*sql.RowSpec, error) {
129✔
771
        values := make([]sql.ValueExp, len(table.Cols()))
129✔
772

129✔
773
        for i, col := range table.Cols() {
694✔
774
                if col.Name() == DocumentBLOBField {
694✔
775
                        bs, err := proto.Marshal(doc)
129✔
776
                        if err != nil {
129✔
777
                                return nil, err
×
778
                        }
×
779

780
                        values[i] = sql.NewBlob(bs)
129✔
781
                        continue
129✔
782
                }
783

784
                rval, err := e.structValueFromFieldPath(doc, col.Name())
436✔
785
                if err != nil && !errors.Is(err, ErrFieldDoesNotExist) {
436✔
786
                        return nil, fmt.Errorf("%w: field: %s", err, col.Name())
×
787
                }
×
788

789
                if rval == nil {
451✔
790
                        values[i] = &sql.NullValue{}
15✔
791
                } else {
436✔
792
                        val, err := structValueToSqlValue(rval, col.Type())
421✔
793
                        if err != nil {
421✔
794
                                return nil, fmt.Errorf("%w: field: %s", err, col.Name())
×
795
                        }
×
796
                        values[i] = val
421✔
797
                }
798
        }
799

800
        return sql.NewRowSpec(values), nil
129✔
801
}
802

803
func (e *Engine) structValueFromFieldPath(doc *structpb.Struct, fieldPath string) (*structpb.Value, error) {
436✔
804
        nestedStruct := doc
436✔
805
        nestedFields := strings.SplitN(fieldPath, documentFieldPathSeparator, e.maxNestedFields)
436✔
806

436✔
807
        for i, field := range nestedFields {
886✔
808
                rval, ok := nestedStruct.Fields[field]
450✔
809
                if !ok {
465✔
810
                        return nil, fmt.Errorf("%w('%s'): while reading nested field '%s'", ErrFieldDoesNotExist, fieldPath, field)
15✔
811
                }
15✔
812

813
                if i == len(nestedFields)-1 {
856✔
814
                        return rval, nil
421✔
815
                }
421✔
816

817
                nestedStruct = rval.GetStructValue()
14✔
818
                if nestedStruct == nil {
14✔
819
                        return nil, fmt.Errorf("%w('%s'): while reading nested field '%s'", ErrFieldDoesNotExist, fieldPath, field)
×
820
                }
×
821
        }
822

823
        return nil, fmt.Errorf("%w('%s')", ErrFieldDoesNotExist, fieldPath)
×
824
}
825

826
func (e *Engine) ReplaceDocuments(ctx context.Context, username string, query *protomodel.Query, doc *structpb.Struct) (revisions []*protomodel.DocumentAtRevision, err error) {
10✔
827
        if query == nil {
11✔
828
                return nil, ErrIllegalArguments
1✔
829
        }
1✔
830

831
        if doc == nil || len(doc.Fields) == 0 {
10✔
832
                doc = &structpb.Struct{
1✔
833
                        Fields: make(map[string]*structpb.Value),
1✔
834
                }
1✔
835
        }
1✔
836

837
        sqlTx, err := e.sqlEngine.NewTx(ctx, sql.DefaultTxOptions().WithExtra([]byte(username)))
9✔
838
        if err != nil {
9✔
839
                return nil, mayTranslateError(err)
×
840
        }
×
841
        defer sqlTx.Cancel()
9✔
842

9✔
843
        table, err := getTableForCollection(sqlTx, query.CollectionName)
9✔
844
        if err != nil {
10✔
845
                return nil, err
1✔
846
        }
1✔
847

848
        documentIdFieldName := docIDFieldName(table)
8✔
849

8✔
850
        provisionedDocID, docIDProvisioned := doc.Fields[documentIdFieldName]
8✔
851
        if docIDProvisioned {
11✔
852
                // inject id comparisson into query
3✔
853
                idFieldComparisson := &protomodel.FieldComparison{
3✔
854
                        Field:    documentIdFieldName,
3✔
855
                        Operator: protomodel.ComparisonOperator_EQ,
3✔
856
                        Value:    provisionedDocID,
3✔
857
                }
3✔
858

3✔
859
                if len(query.Expressions) == 0 {
4✔
860
                        query.Expressions = []*protomodel.QueryExpression{
1✔
861
                                {
1✔
862
                                        FieldComparisons: []*protomodel.FieldComparison{
1✔
863
                                                idFieldComparisson,
1✔
864
                                        },
1✔
865
                                },
1✔
866
                        }
1✔
867
                } else {
3✔
868
                        // id comparisson as a first comparisson might result in faster evaluation
2✔
869
                        // note it mas be added into every expression
2✔
870
                        for _, exp := range query.Expressions {
4✔
871
                                exp.FieldComparisons = append([]*protomodel.FieldComparison{idFieldComparisson}, exp.FieldComparisons...)
2✔
872
                        }
2✔
873
                }
874
        }
875

876
        queryCondition, err := generateSQLFilteringExpression(query.Expressions, table)
8✔
877
        if err != nil {
8✔
878
                return nil, err
×
879
        }
×
880

881
        queryStmt := sql.NewSelectStmt(
8✔
882
                []sql.Selector{sql.NewColSelector(query.CollectionName, documentIdFieldName)},
8✔
883
                sql.NewTableRef(query.CollectionName, ""),
8✔
884
                queryCondition,
8✔
885
                generateSQLOrderByClauses(table, query.OrderBy),
8✔
886
                sql.NewInteger(int64(query.Limit)),
8✔
887
                nil,
8✔
888
        )
8✔
889

8✔
890
        r, err := e.sqlEngine.QueryPreparedStmt(ctx, sqlTx, queryStmt, nil)
8✔
891
        if err != nil {
8✔
892
                return nil, mayTranslateError(err)
×
893
        }
×
894

895
        var docs []*structpb.Struct
8✔
896

8✔
897
        for {
22✔
898
                row, err := r.Read(ctx)
14✔
899
                if err != nil {
22✔
900
                        r.Close()
8✔
901

8✔
902
                        if errors.Is(err, sql.ErrNoMoreRows) {
16✔
903
                                break
8✔
904
                        }
905

906
                        return nil, mayTranslateError(err)
×
907
                }
908

909
                val := row.ValuesByPosition[0].RawValue().([]byte)
6✔
910
                docID, err := NewDocumentIDFromRawBytes(val)
6✔
911
                if err != nil {
6✔
912
                        return nil, err
×
913
                }
×
914

915
                newDoc, err := structpb.NewStruct(doc.AsMap())
6✔
916
                if err != nil {
6✔
917
                        return nil, err
×
918
                }
×
919

920
                if !docIDProvisioned {
10✔
921
                        // add id field to updated document
4✔
922
                        newDoc.Fields[documentIdFieldName] = structpb.NewStringValue(docID.EncodeToHexString())
4✔
923
                }
4✔
924

925
                docs = append(docs, newDoc)
6✔
926
        }
927

928
        r.Close()
8✔
929

8✔
930
        if len(docs) == 0 {
10✔
931
                return nil, nil
2✔
932
        }
2✔
933

934
        txID, docIDs, err := e.upsertDocuments(ctx, sqlTx, query.CollectionName, docs, false)
6✔
935
        if err != nil {
6✔
936
                return nil, err
×
937
        }
×
938

939
        for _, docID := range docIDs {
12✔
940
                // fetch revision
6✔
941
                searchKey, err := e.getKeyForDocument(ctx, sqlTx, query.CollectionName, docID)
6✔
942
                if err != nil {
6✔
943
                        return nil, err
×
944
                }
×
945

946
                encDoc, err := e.getEncodedDocument(ctx, searchKey, txID)
6✔
947
                if err != nil {
6✔
948
                        return nil, err
×
949
                }
×
950

951
                revisions = append(revisions, &protomodel.DocumentAtRevision{
6✔
952
                        TransactionId: txID,
6✔
953
                        DocumentId:    docID.EncodeToHexString(),
6✔
954
                        Revision:      encDoc.Revision,
6✔
955
                        Metadata:      kvMetadataToProto(encDoc.KVMetadata),
6✔
956
                })
6✔
957
        }
958

959
        return revisions, nil
6✔
960
}
961

962
func (e *Engine) GetDocuments(ctx context.Context, query *protomodel.Query, offset int64) (DocumentReader, error) {
34✔
963
        if query == nil {
35✔
964
                return nil, ErrIllegalArguments
1✔
965
        }
1✔
966

967
        sqlTx, err := e.sqlEngine.NewTx(ctx, sql.DefaultTxOptions().WithReadOnly(true))
33✔
968
        if err != nil {
33✔
969
                return nil, mayTranslateError(err)
×
970
        }
×
971

972
        table, err := getTableForCollection(sqlTx, query.CollectionName)
33✔
973
        if err != nil {
33✔
974
                defer sqlTx.Cancel()
×
975
                return nil, err
×
976
        }
×
977

978
        queryCondition, err := generateSQLFilteringExpression(query.Expressions, table)
33✔
979
        if err != nil {
35✔
980
                defer sqlTx.Cancel()
2✔
981
                return nil, err
2✔
982
        }
2✔
983

984
        op := sql.NewSelectStmt(
31✔
985
                []sql.Selector{sql.NewColSelector(query.CollectionName, DocumentBLOBField)},
31✔
986
                sql.NewTableRef(query.CollectionName, ""),
31✔
987
                queryCondition,
31✔
988
                generateSQLOrderByClauses(table, query.OrderBy),
31✔
989
                sql.NewInteger(int64(query.Limit)),
31✔
990
                sql.NewInteger(offset),
31✔
991
        )
31✔
992

31✔
993
        // returning an open reader here, so the caller HAS to close it
31✔
994
        r, err := e.sqlEngine.QueryPreparedStmt(ctx, sqlTx, op, nil)
31✔
995
        if err != nil {
31✔
996
                defer sqlTx.Cancel()
×
997
                return nil, err
×
998
        }
×
999

1000
        return newDocumentReader(r, func(_ DocumentReader) { sqlTx.Cancel() }), nil
62✔
1001
}
1002

1003
func (e *Engine) CountDocuments(ctx context.Context, query *protomodel.Query, offset int64) (int64, error) {
18✔
1004
        if query == nil {
19✔
1005
                return 0, ErrIllegalArguments
1✔
1006
        }
1✔
1007

1008
        sqlTx, err := e.sqlEngine.NewTx(ctx, sql.DefaultTxOptions().WithReadOnly(true))
17✔
1009
        if err != nil {
17✔
1010
                return 0, mayTranslateError(err)
×
1011
        }
×
1012

1013
        defer sqlTx.Cancel()
17✔
1014

17✔
1015
        table, err := getTableForCollection(sqlTx, query.CollectionName)
17✔
1016
        if err != nil {
18✔
1017
                return 0, err
1✔
1018
        }
1✔
1019

1020
        queryCondition, err := generateSQLFilteringExpression(query.Expressions, table)
16✔
1021
        if err != nil {
16✔
1022
                return 0, err
×
1023
        }
×
1024

1025
        ds := sql.NewSelectStmt(
16✔
1026
                []sql.Selector{sql.NewColSelector(query.CollectionName, table.Cols()[0].Name())},
16✔
1027
                sql.NewTableRef(query.CollectionName, ""),
16✔
1028
                queryCondition,
16✔
1029
                generateSQLOrderByClauses(table, query.OrderBy),
16✔
1030
                sql.NewInteger(int64(query.Limit)),
16✔
1031
                sql.NewInteger(offset),
16✔
1032
        )
16✔
1033

16✔
1034
        op := sql.NewSelectStmt(
16✔
1035
                []sql.Selector{sql.NewAggColSelector(sql.COUNT, query.CollectionName, "*")},
16✔
1036
                ds,
16✔
1037
                nil,
16✔
1038
                nil,
16✔
1039
                nil,
16✔
1040
                nil,
16✔
1041
        )
16✔
1042

16✔
1043
        r, err := e.sqlEngine.QueryPreparedStmt(ctx, sqlTx, op, nil)
16✔
1044
        if err != nil {
16✔
1045
                return 0, err
×
1046
        }
×
1047

1048
        defer r.Close()
16✔
1049

16✔
1050
        row, err := r.Read(ctx)
16✔
1051
        if err != nil {
16✔
1052
                return 0, err
×
1053
        }
×
1054

1055
        return row.ValuesByPosition[0].RawValue().(int64), nil
16✔
1056
}
1057

1058
func (e *Engine) GetEncodedDocument(ctx context.Context, collectionName string, docID DocumentID, txID uint64) (collectionID uint32, documentIdFieldName string, encodedDoc *EncodedDocument, err error) {
6✔
1059
        sqlTx, err := e.sqlEngine.NewTx(ctx, sql.DefaultTxOptions().WithReadOnly(true))
6✔
1060
        if err != nil {
6✔
1061
                return 0, "", nil, mayTranslateError(err)
×
1062
        }
×
1063
        defer sqlTx.Cancel()
6✔
1064

6✔
1065
        table, err := getTableForCollection(sqlTx, collectionName)
6✔
1066
        if err != nil {
6✔
1067
                return 0, "", nil, err
×
1068
        }
×
1069

1070
        searchKey, err := e.getKeyForDocument(ctx, sqlTx, collectionName, docID)
6✔
1071
        if err != nil {
6✔
1072
                return 0, "", nil, err
×
1073
        }
×
1074

1075
        encodedDoc, err = e.getEncodedDocument(ctx, searchKey, txID)
6✔
1076
        if err != nil {
6✔
1077
                return 0, "", nil, err
×
1078
        }
×
1079

1080
        return table.ID(), docIDFieldName(table), encodedDoc, nil
6✔
1081
}
1082

1083
// AuditDocument returns the audit history of a document.
1084
func (e *Engine) AuditDocument(ctx context.Context, collectionName string, docID DocumentID, desc bool, offset uint64, limit int, includePayload bool) ([]*protomodel.DocumentAtRevision, error) {
6✔
1085
        err := validateCollectionName(collectionName)
6✔
1086
        if err != nil {
7✔
1087
                return nil, err
1✔
1088
        }
1✔
1089

1090
        sqlTx, err := e.sqlEngine.NewTx(ctx, sql.DefaultTxOptions().WithReadOnly(true))
5✔
1091
        if err != nil {
5✔
1092
                return nil, mayTranslateError(err)
×
1093
        }
×
1094
        defer sqlTx.Cancel()
5✔
1095

5✔
1096
        searchKey, err := e.getKeyForDocument(ctx, sqlTx, collectionName, docID)
5✔
1097
        if err != nil {
5✔
1098
                return nil, err
×
1099
        }
×
1100

1101
        valRefs, _, err := e.sqlEngine.GetStore().History(searchKey, uint64(offset), desc, limit)
5✔
1102
        if err != nil {
5✔
1103
                return nil, err
×
1104
        }
×
1105

1106
        results := make([]*protomodel.DocumentAtRevision, 0)
5✔
1107

5✔
1108
        for _, valRef := range valRefs {
15✔
1109
                docAtRevision, err := e.getDocument(searchKey, valRef, includePayload)
10✔
1110
                if err != nil {
10✔
1111
                        return nil, err
×
1112
                }
×
1113

1114
                hdr, err := e.sqlEngine.GetStore().ReadTxHeader(valRef.Tx(), false, false)
10✔
1115
                if err != nil {
10✔
1116
                        return nil, err
×
1117
                }
×
1118

1119
                docAtRevision.DocumentId = docID.EncodeToHexString()
10✔
1120
                docAtRevision.Ts = hdr.Ts
10✔
1121
                docAtRevision.Revision = valRef.HC()
10✔
1122

10✔
1123
                results = append(results, docAtRevision)
10✔
1124
        }
1125

1126
        return results, nil
5✔
1127
}
1128

1129
// generateSQLFilteringExpression generates a boolean expression in Disjunctive Normal Form from a list of expressions
1130
func generateSQLFilteringExpression(expressions []*protomodel.QueryExpression, table *sql.Table) (sql.ValueExp, error) {
61✔
1131
        var outerExp sql.ValueExp
61✔
1132

61✔
1133
        for i, exp := range expressions {
114✔
1134
                if len(exp.FieldComparisons) == 0 {
53✔
1135
                        return nil, fmt.Errorf("%w: query expression without any field comparisson", ErrIllegalArguments)
×
1136
                }
×
1137

1138
                var innerExp sql.ValueExp
53✔
1139

53✔
1140
                for i, exp := range exp.FieldComparisons {
124✔
1141
                        column, err := getColumnForField(table, exp.Field)
71✔
1142
                        if err != nil {
73✔
1143
                                return nil, err
2✔
1144
                        }
2✔
1145

1146
                        value, err := structValueToSqlValue(exp.Value, column.Type())
69✔
1147
                        if err != nil {
69✔
1148
                                return nil, err
×
1149
                        }
×
1150

1151
                        colSelector := sql.NewColSelector(table.Name(), exp.Field)
69✔
1152

69✔
1153
                        var fieldExp sql.ValueExp
69✔
1154

69✔
1155
                        switch exp.Operator {
69✔
1156
                        case protomodel.ComparisonOperator_LIKE:
1✔
1157
                                {
2✔
1158
                                        fieldExp = sql.NewLikeBoolExp(colSelector, false, value)
1✔
1159
                                }
1✔
1160
                        case protomodel.ComparisonOperator_NOT_LIKE:
2✔
1161
                                {
4✔
1162
                                        fieldExp = sql.NewLikeBoolExp(colSelector, true, value)
2✔
1163
                                }
2✔
1164
                        default:
66✔
1165
                                {
132✔
1166
                                        sqlCmpOp, err := sqlCmpOperatorFor(exp.Operator)
66✔
1167
                                        if err != nil {
66✔
1168
                                                return nil, err
×
1169
                                        }
×
1170

1171
                                        fieldExp = sql.NewCmpBoolExp(sqlCmpOp, colSelector, value)
66✔
1172
                                }
1173
                        }
1174

1175
                        if i == 0 {
120✔
1176
                                innerExp = fieldExp
51✔
1177
                        } else {
69✔
1178
                                innerExp = sql.NewBinBoolExp(sql.AND, innerExp, fieldExp)
18✔
1179
                        }
18✔
1180
                }
1181

1182
                if i == 0 {
102✔
1183
                        outerExp = innerExp
51✔
1184
                } else {
51✔
1185
                        outerExp = sql.NewBinBoolExp(sql.OR, outerExp, innerExp)
×
1186
                }
×
1187
        }
1188

1189
        return outerExp, nil
59✔
1190
}
1191

1192
func sqlCmpOperatorFor(op protomodel.ComparisonOperator) (sql.CmpOperator, error) {
66✔
1193
        switch op {
66✔
1194
        case protomodel.ComparisonOperator_EQ:
31✔
1195
                {
62✔
1196
                        return sql.EQ, nil
31✔
1197
                }
31✔
1198
        case protomodel.ComparisonOperator_NE:
10✔
1199
                {
20✔
1200
                        return sql.NE, nil
10✔
1201
                }
10✔
1202
        case protomodel.ComparisonOperator_LT:
4✔
1203
                {
8✔
1204
                        return sql.LT, nil
4✔
1205
                }
4✔
1206
        case protomodel.ComparisonOperator_LE:
3✔
1207
                {
6✔
1208
                        return sql.LE, nil
3✔
1209
                }
3✔
1210
        case protomodel.ComparisonOperator_GT:
6✔
1211
                {
12✔
1212
                        return sql.GT, nil
6✔
1213
                }
6✔
1214
        case protomodel.ComparisonOperator_GE:
12✔
1215
                {
24✔
1216
                        return sql.GE, nil
12✔
1217
                }
12✔
1218
        default:
×
1219
                {
×
1220
                        return 0, fmt.Errorf("%w: unsupported operator ('%s')", ErrIllegalArguments, op)
×
1221
                }
×
1222
        }
1223
}
1224

1225
func (e *Engine) getKeyForDocument(ctx context.Context, sqlTx *sql.SQLTx, collectionName string, documentID DocumentID) ([]byte, error) {
17✔
1226
        table, err := getTableForCollection(sqlTx, collectionName)
17✔
1227
        if err != nil {
17✔
1228
                return nil, err
×
1229
        }
×
1230

1231
        var searchKey []byte
17✔
1232

17✔
1233
        valbuf := bytes.Buffer{}
17✔
1234

17✔
1235
        rval := sql.NewBlob(documentID[:])
17✔
1236
        encVal, _, err := sql.EncodeRawValueAsKey(rval.RawValue(), sql.BLOBType, MaxDocumentIDLength)
17✔
1237
        if err != nil {
17✔
1238
                return nil, err
×
1239
        }
×
1240
        _, err = valbuf.Write(encVal)
17✔
1241
        if err != nil {
17✔
1242
                return nil, err
×
1243
        }
×
1244

1245
        pkEncVals := valbuf.Bytes()
17✔
1246

17✔
1247
        searchKey = sql.MapKey(
17✔
1248
                e.sqlEngine.GetPrefix(),
17✔
1249
                sql.MappedPrefix,
17✔
1250
                sql.EncodeID(table.ID()),
17✔
1251
                sql.EncodeID(table.PrimaryIndex().ID()),
17✔
1252
                pkEncVals,
17✔
1253
                pkEncVals,
17✔
1254
        )
17✔
1255

17✔
1256
        return searchKey, nil
17✔
1257
}
1258

1259
func (e *Engine) getDocument(key []byte, valRef store.ValueRef, includePayload bool) (docAtRevision *protomodel.DocumentAtRevision, err error) {
10✔
1260
        var encodedDocVal []byte
10✔
1261

10✔
1262
        if includePayload {
18✔
1263
                encodedDocVal, err = valRef.Resolve()
8✔
1264
                if err != nil {
8✔
1265
                        return nil, mayTranslateError(err)
×
1266
                }
×
1267
        }
1268

1269
        encDoc := &EncodedDocument{
10✔
1270
                TxID:            valRef.Tx(),
10✔
1271
                Revision:        valRef.HC(),
10✔
1272
                KVMetadata:      valRef.KVMetadata(),
10✔
1273
                EncodedDocument: encodedDocVal,
10✔
1274
        }
10✔
1275

10✔
1276
        var username string
10✔
1277

10✔
1278
        if valRef.TxMetadata() != nil {
20✔
1279
                username = string(valRef.TxMetadata().Extra())
10✔
1280
        }
10✔
1281

1282
        if encDoc.KVMetadata != nil && encDoc.KVMetadata.Deleted() {
11✔
1283
                return &protomodel.DocumentAtRevision{
1✔
1284
                        TransactionId: encDoc.TxID,
1✔
1285
                        Username:      username,
1✔
1286
                        Metadata:      kvMetadataToProto(encDoc.KVMetadata),
1✔
1287
                }, nil
1✔
1288
        }
1✔
1289

1290
        var doc *structpb.Struct
9✔
1291

9✔
1292
        if includePayload {
16✔
1293
                voff := sql.EncLenLen + sql.EncIDLen
7✔
1294

7✔
1295
                // DocumentIDField
7✔
1296
                _, n, err := sql.DecodeValue(encDoc.EncodedDocument[voff:], sql.BLOBType)
7✔
1297
                if err != nil {
7✔
1298
                        return nil, mayTranslateError(err)
×
1299
                }
×
1300

1301
                voff += n + sql.EncIDLen
7✔
1302

7✔
1303
                // DocumentBLOBField
7✔
1304
                encodedDoc, _, err := sql.DecodeValue(encDoc.EncodedDocument[voff:], sql.BLOBType)
7✔
1305
                if err != nil {
7✔
1306
                        return nil, mayTranslateError(err)
×
1307
                }
×
1308

1309
                docBytes := encodedDoc.RawValue().([]byte)
7✔
1310

7✔
1311
                doc = &structpb.Struct{}
7✔
1312
                err = proto.Unmarshal(docBytes, doc)
7✔
1313
                if err != nil {
7✔
1314
                        return nil, err
×
1315
                }
×
1316
        }
1317

1318
        return &protomodel.DocumentAtRevision{
9✔
1319
                TransactionId: encDoc.TxID,
9✔
1320
                Username:      username,
9✔
1321
                Metadata:      kvMetadataToProto(encDoc.KVMetadata),
9✔
1322
                Document:      doc,
9✔
1323
        }, err
9✔
1324
}
1325

1326
func (e *Engine) getEncodedDocument(ctx context.Context, key []byte, atTx uint64) (encDoc *EncodedDocument, err error) {
12✔
1327
        if atTx > e.sqlEngine.GetStore().LastPrecommittedTxID() {
12✔
1328
                return nil, store.ErrTxNotFound
×
1329
        }
×
1330

1331
        err = e.sqlEngine.GetStore().WaitForIndexingUpto(ctx, atTx)
12✔
1332
        if err != nil {
12✔
1333
                return nil, err
×
1334
        }
×
1335

1336
        var valRef store.ValueRef
12✔
1337

12✔
1338
        if atTx == 0 {
16✔
1339
                valRef, err = e.sqlEngine.GetStore().Get(ctx, key)
4✔
1340
        } else {
12✔
1341
                valRef, err = e.sqlEngine.GetStore().GetBetween(ctx, key, atTx, atTx)
8✔
1342
        }
8✔
1343
        if errors.Is(err, store.ErrKeyNotFound) {
12✔
1344
                return nil, ErrDocumentNotFound
×
1345
        }
×
1346
        if err != nil {
12✔
1347
                return nil, mayTranslateError(err)
×
1348
        }
×
1349

1350
        encodedDoc, err := valRef.Resolve()
12✔
1351
        if err != nil {
12✔
1352
                return nil, mayTranslateError(err)
×
1353
        }
×
1354

1355
        return &EncodedDocument{
12✔
1356
                TxID:            valRef.Tx(),
12✔
1357
                Revision:        valRef.HC(),
12✔
1358
                KVMetadata:      valRef.KVMetadata(),
12✔
1359
                EncodedDocument: encodedDoc,
12✔
1360
        }, err
12✔
1361
}
1362

1363
// DeleteDocuments deletes documents matching the query
1364
func (e *Engine) DeleteDocuments(ctx context.Context, username string, query *protomodel.Query) error {
6✔
1365
        if query == nil {
7✔
1366
                return ErrIllegalArguments
1✔
1367
        }
1✔
1368

1369
        sqlTx, err := e.sqlEngine.NewTx(ctx, sql.DefaultTxOptions().WithExtra([]byte(username)))
5✔
1370
        if err != nil {
5✔
1371
                return mayTranslateError(err)
×
1372
        }
×
1373
        defer sqlTx.Cancel()
5✔
1374

5✔
1375
        table, err := getTableForCollection(sqlTx, query.CollectionName)
5✔
1376
        if err != nil {
6✔
1377
                return err
1✔
1378
        }
1✔
1379

1380
        queryCondition, err := generateSQLFilteringExpression(query.Expressions, table)
4✔
1381
        if err != nil {
4✔
1382
                return err
×
1383
        }
×
1384

1385
        // Delete a single document matching the query
1386
        deleteStmt := sql.NewDeleteFromStmt(
4✔
1387
                table.Name(),
4✔
1388
                queryCondition,
4✔
1389
                generateSQLOrderByClauses(table, query.OrderBy),
4✔
1390
                sql.NewInteger(int64(query.Limit)),
4✔
1391
        )
4✔
1392

4✔
1393
        _, _, err = e.sqlEngine.ExecPreparedStmts(
4✔
1394
                ctx,
4✔
1395
                sqlTx,
4✔
1396
                []sql.SQLStmt{deleteStmt},
4✔
1397
                nil,
4✔
1398
        )
4✔
1399
        if err != nil {
4✔
1400
                return mayTranslateError(err)
×
1401
        }
×
1402

1403
        return nil
4✔
1404
}
1405

1406
// CopyCatalogToTx copies the current sql catalog to the ongoing transaction.
1407
func (e *Engine) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error {
28✔
1408
        return e.sqlEngine.CopyCatalogToTx(ctx, tx)
28✔
1409
}
28✔
1410

1411
func generateSQLOrderByClauses(table *sql.Table, orderBy []*protomodel.OrderByClause) (ordCols []*sql.OrdCol) {
59✔
1412
        for _, col := range orderBy {
60✔
1413
                ordCols = append(ordCols, sql.NewOrdCol(table.Name(), col.Field, col.Desc))
1✔
1414
        }
1✔
1415
        return ordCols
59✔
1416
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc