• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 9367064922

04 Jun 2024 12:27PM UTC coverage: 89.43% (-0.02%) from 89.451%
9367064922

push

gh-ci

ostafen
Add support for JSON type

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

521 of 575 new or added lines in 14 files covered. (90.61%)

12 existing lines in 5 files now uncovered.

35172 of 39329 relevant lines covered (89.43%)

160547.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.14
/embedded/sql/row_reader.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "math"
26

27
        "github.com/codenotary/immudb/embedded/store"
28
)
29

30
type RowReader interface {
31
        Tx() *SQLTx
32
        TableAlias() string
33
        Parameters() map[string]interface{}
34
        Read(ctx context.Context) (*Row, error)
35
        Close() error
36
        Columns(ctx context.Context) ([]ColDescriptor, error)
37
        OrderBy() []ColDescriptor
38
        ScanSpecs() *ScanSpecs
39
        InferParameters(ctx context.Context, params map[string]SQLValueType) error
40
        colsBySelector(ctx context.Context) (map[string]ColDescriptor, error)
41
        onClose(func())
42
}
43

44
type ScanSpecs struct {
45
        Index              *Index
46
        rangesByColID      map[uint32]*typedValueRange
47
        IncludeHistory     bool
48
        IncludeTxMetadata  bool
49
        DescOrder          bool
50
        groupBySortColumns []*OrdCol
51
        orderBySortCols    []*OrdCol
52
}
53

54
func (s *ScanSpecs) extraCols() int {
17,757✔
55
        n := 0
17,757✔
56
        if s.IncludeHistory {
17,779✔
57
                n++
22✔
58
        }
22✔
59

60
        if s.IncludeTxMetadata {
17,772✔
61
                n++
15✔
62
        }
15✔
63
        return n
17,757✔
64
}
65

66
type Row struct {
67
        ValuesByPosition []TypedValue
68
        ValuesBySelector map[string]TypedValue
69
}
70

71
// rows are selector-compatible if both rows have the same assigned value for all specified selectors
72
func (row *Row) compatible(aRow *Row, selectors []*ColSelector, table string) (bool, error) {
890✔
73
        for _, sel := range selectors {
1,572✔
74
                c := EncodeSelector(sel.resolve(table))
682✔
75

682✔
76
                val1, ok := row.ValuesBySelector[c]
682✔
77
                if !ok {
682✔
78
                        return false, ErrInvalidColumn
×
79
                }
×
80

81
                val2, ok := aRow.ValuesBySelector[c]
682✔
82
                if !ok {
682✔
83
                        return false, ErrInvalidColumn
×
84
                }
×
85

86
                cmp, err := val1.Compare(val2)
682✔
87
                if err != nil {
682✔
88
                        return false, err
×
89
                }
×
90

91
                if cmp != 0 {
947✔
92
                        return false, nil
265✔
93
                }
265✔
94
        }
95

96
        return true, nil
625✔
97
}
98

99
func (row *Row) digest(cols []ColDescriptor) (d [sha256.Size]byte, err error) {
65✔
100
        h := sha256.New()
65✔
101

65✔
102
        for i, v := range row.ValuesByPosition {
133✔
103
                var b [4]byte
68✔
104
                binary.BigEndian.PutUint32(b[:], uint32(i))
68✔
105
                h.Write(b[:])
68✔
106

68✔
107
                _, isNull := v.(*NullValue)
68✔
108
                if isNull {
70✔
109
                        continue
2✔
110
                }
111

112
                encVal, err := EncodeValue(v, v.Type(), 0)
66✔
113
                if err != nil {
66✔
114
                        return d, err
×
115
                }
×
116

117
                h.Write(encVal)
66✔
118
        }
119

120
        copy(d[:], h.Sum(nil))
65✔
121

65✔
122
        return
65✔
123
}
124

125
type rawRowReader struct {
126
        tx         *SQLTx
127
        table      *Table
128
        tableAlias string
129
        colsByPos  []ColDescriptor
130
        colsBySel  map[string]ColDescriptor
131
        scanSpecs  *ScanSpecs
132

133
        // defines a sub-range a transactions based on a combination of tx IDs and timestamps
134
        // the query is resolved only taking into consideration that range of transactioins
135
        period period
136

137
        // underlying store supports reading entries within a range of txs
138
        // the range is calculated based on the period stmt, which is included here to support
139
        // lazy evaluation when parameters are available
140
        txRange *txRange
141

142
        params map[string]interface{}
143

144
        reader          store.KeyReader
145
        onCloseCallback func()
146
}
147

148
type txRange struct {
149
        initialTxID uint64
150
        finalTxID   uint64
151
}
152

153
type ColDescriptor struct {
154
        AggFn  string
155
        Table  string
156
        Column string
157
        Type   SQLValueType
158
}
159

160
func (d *ColDescriptor) Selector() string {
78,507✔
161
        return EncodeSelector(d.AggFn, d.Table, d.Column)
78,507✔
162
}
78,507✔
163

164
type emptyKeyReader struct {
165
}
166

167
func (r emptyKeyReader) Read(ctx context.Context) (key []byte, val store.ValueRef, err error) {
1✔
168
        return nil, nil, store.ErrNoMoreEntries
1✔
169
}
1✔
170

171
func (r emptyKeyReader) ReadBetween(ctx context.Context, initialTxID uint64, finalTxID uint64) (key []byte, val store.ValueRef, err error) {
×
172
        return nil, nil, store.ErrNoMoreEntries
×
173
}
×
174

175
func (r emptyKeyReader) Reset() error {
×
176
        return nil
×
177
}
×
178

179
func (r emptyKeyReader) Close() error {
1✔
180
        return nil
1✔
181
}
1✔
182

183
func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, period period, tableAlias string, scanSpecs *ScanSpecs) (*rawRowReader, error) {
774✔
184
        if table == nil || scanSpecs == nil || scanSpecs.Index == nil {
774✔
185
                return nil, ErrIllegalArguments
×
186
        }
×
187

188
        rSpec, err := keyReaderSpecFrom(tx.engine.prefix, table, scanSpecs)
774✔
189
        if err != nil {
774✔
190
                return nil, err
×
191
        }
×
192

193
        var r store.KeyReader
774✔
194

774✔
195
        if table.name == "pg_type" {
775✔
196
                r = &emptyKeyReader{}
1✔
197
        } else {
774✔
198
                r, err = tx.newKeyReader(*rSpec)
773✔
199
                if err != nil {
773✔
200
                        return nil, err
×
201
                }
×
202
        }
203

204
        if tableAlias == "" {
1,425✔
205
                tableAlias = table.name
651✔
206
        }
651✔
207

208
        nCols := len(table.cols) + scanSpecs.extraCols()
774✔
209

774✔
210
        colsByPos := make([]ColDescriptor, nCols)
774✔
211
        colsBySel := make(map[string]ColDescriptor, nCols)
774✔
212

774✔
213
        off := 0
774✔
214
        if scanSpecs.IncludeHistory {
776✔
215
                colDescriptor := ColDescriptor{
2✔
216
                        Table:  tableAlias,
2✔
217
                        Column: revCol,
2✔
218
                        Type:   IntegerType,
2✔
219
                }
2✔
220

2✔
221
                colsByPos[off] = colDescriptor
2✔
222
                colsBySel[colDescriptor.Selector()] = colDescriptor
2✔
223
                off++
2✔
224
        }
2✔
225

226
        if scanSpecs.IncludeTxMetadata {
778✔
227
                colDescriptor := ColDescriptor{
4✔
228
                        Table:  tableAlias,
4✔
229
                        Column: txMetadataCol,
4✔
230
                        Type:   JSONType,
4✔
231
                }
4✔
232

4✔
233
                colsByPos[off] = colDescriptor
4✔
234
                colsBySel[colDescriptor.Selector()] = colDescriptor
4✔
235
                off++
4✔
236
        }
4✔
237

238
        for i, c := range table.cols {
3,229✔
239
                colDescriptor := ColDescriptor{
2,455✔
240
                        Table:  tableAlias,
2,455✔
241
                        Column: c.colName,
2,455✔
242
                        Type:   c.colType,
2,455✔
243
                }
2,455✔
244

2,455✔
245
                colsByPos[off+i] = colDescriptor
2,455✔
246
                colsBySel[colDescriptor.Selector()] = colDescriptor
2,455✔
247
        }
2,455✔
248

249
        return &rawRowReader{
774✔
250
                tx:         tx,
774✔
251
                table:      table,
774✔
252
                period:     period,
774✔
253
                tableAlias: tableAlias,
774✔
254
                colsByPos:  colsByPos,
774✔
255
                colsBySel:  colsBySel,
774✔
256
                scanSpecs:  scanSpecs,
774✔
257
                params:     params,
774✔
258
                reader:     r,
774✔
259
        }, nil
774✔
260
}
261

262
func keyReaderSpecFrom(sqlPrefix []byte, table *Table, scanSpecs *ScanSpecs) (spec *store.KeyReaderSpec, err error) {
776✔
263
        prefix := MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(scanSpecs.Index.id))
776✔
264

776✔
265
        var loKey []byte
776✔
266
        var loKeyReady bool
776✔
267

776✔
268
        var hiKey []byte
776✔
269
        var hiKeyReady bool
776✔
270

776✔
271
        loKey = make([]byte, len(prefix))
776✔
272
        copy(loKey, prefix)
776✔
273

776✔
274
        hiKey = make([]byte, len(prefix))
776✔
275
        copy(hiKey, prefix)
776✔
276

776✔
277
        // seekKey and endKey in the loop below are scan prefixes for beginning
776✔
278
        // and end of the index scanning range. On each index we try to make them more
776✔
279
        // concrete.
776✔
280
        for _, col := range scanSpecs.Index.cols {
1,552✔
281
                colRange, ok := scanSpecs.rangesByColID[col.id]
776✔
282
                if !ok {
1,414✔
283
                        break
638✔
284
                }
285

286
                if !hiKeyReady {
276✔
287
                        if colRange.hRange == nil {
149✔
288
                                hiKeyReady = true
11✔
289
                        } else {
138✔
290
                                encVal, _, err := EncodeValueAsKey(colRange.hRange.val, col.colType, col.MaxLen())
127✔
291
                                if err != nil {
128✔
292
                                        return nil, err
1✔
293
                                }
1✔
294
                                hiKey = append(hiKey, encVal...)
126✔
295
                        }
296
                }
297

298
                if !loKeyReady {
274✔
299
                        if colRange.lRange == nil {
150✔
300
                                loKeyReady = true
13✔
301
                        } else {
137✔
302
                                encVal, _, err := EncodeValueAsKey(colRange.lRange.val, col.colType, col.MaxLen())
124✔
303
                                if err != nil {
125✔
304
                                        return nil, err
1✔
305
                                }
1✔
306
                                loKey = append(loKey, encVal...)
123✔
307
                        }
308
                }
309
        }
310

311
        // Ensure the hiKey is inclusive regarding all values with that prefix
312
        hiKey = append(hiKey, KeyValPrefixUpperBound)
774✔
313

774✔
314
        seekKey := loKey
774✔
315
        endKey := hiKey
774✔
316

774✔
317
        if scanSpecs.DescOrder {
825✔
318
                seekKey, endKey = endKey, seekKey
51✔
319
        }
51✔
320

321
        return &store.KeyReaderSpec{
774✔
322
                SeekKey:        seekKey,
774✔
323
                InclusiveSeek:  true,
774✔
324
                EndKey:         endKey,
774✔
325
                InclusiveEnd:   true,
774✔
326
                Prefix:         prefix,
774✔
327
                DescOrder:      scanSpecs.DescOrder,
774✔
328
                Filters:        []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
774✔
329
                IncludeHistory: scanSpecs.IncludeHistory,
774✔
330
        }, nil
774✔
331
}
332

333
func (r *rawRowReader) onClose(callback func()) {
330✔
334
        r.onCloseCallback = callback
330✔
335
}
330✔
336

337
func (r *rawRowReader) Tx() *SQLTx {
319,530✔
338
        return r.tx
319,530✔
339
}
319,530✔
340

341
func (r *rawRowReader) TableAlias() string {
385,905✔
342
        return r.tableAlias
385,905✔
343
}
385,905✔
344

345
func (r *rawRowReader) OrderBy() []ColDescriptor {
22✔
346
        cols := make([]ColDescriptor, len(r.scanSpecs.Index.cols))
22✔
347

22✔
348
        for i, col := range r.scanSpecs.Index.cols {
47✔
349
                cols[i] = ColDescriptor{
25✔
350
                        Table:  r.tableAlias,
25✔
351
                        Column: col.colName,
25✔
352
                        Type:   col.colType,
25✔
353
                }
25✔
354
        }
25✔
355

356
        return cols
22✔
357
}
358

359
func (r *rawRowReader) ScanSpecs() *ScanSpecs {
94✔
360
        return r.scanSpecs
94✔
361
}
94✔
362

363
func (r *rawRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
383✔
364
        ret := make([]ColDescriptor, len(r.colsByPos))
383✔
365
        copy(ret, r.colsByPos)
383✔
366
        return ret, nil
383✔
367
}
383✔
368

369
func (r *rawRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
370✔
370
        ret := make(map[string]ColDescriptor, len(r.colsBySel))
370✔
371
        for sel := range r.colsBySel {
1,665✔
372
                ret[sel] = r.colsBySel[sel]
1,295✔
373
        }
1,295✔
374
        return ret, nil
370✔
375
}
376

377
func (r *rawRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
49✔
378
        cols, err := r.colsBySelector(ctx)
49✔
379
        if err != nil {
49✔
380
                return err
×
381
        }
×
382

383
        if r.period.start != nil {
59✔
384
                _, err = r.period.start.instant.exp.inferType(cols, params, r.TableAlias())
10✔
385
                if err != nil {
10✔
386
                        return err
×
387
                }
×
388
        }
389

390
        if r.period.end != nil {
59✔
391
                _, err = r.period.end.instant.exp.inferType(cols, params, r.TableAlias())
10✔
392
                if err != nil {
10✔
393
                        return err
×
394
                }
×
395
        }
396

397
        return nil
49✔
398
}
399

400
func (r *rawRowReader) Parameters() map[string]interface{} {
3,299✔
401
        return r.params
3,299✔
402
}
3,299✔
403

404
func (r *rawRowReader) reduceTxRange() (err error) {
17,471✔
405
        if r.txRange != nil || (r.period.start == nil && r.period.end == nil) {
34,873✔
406
                return nil
17,402✔
407
        }
17,402✔
408

409
        txRange := &txRange{
69✔
410
                initialTxID: uint64(0),
69✔
411
                finalTxID:   uint64(math.MaxUint64),
69✔
412
        }
69✔
413

69✔
414
        if r.period.start != nil {
123✔
415
                txRange.initialTxID, err = r.period.start.instant.resolve(r.tx, r.params, true, r.period.start.inclusive)
54✔
416
                if err != nil {
72✔
417
                        return err
18✔
418
                }
18✔
419
        }
420

421
        if r.period.end != nil {
78✔
422
                txRange.finalTxID, err = r.period.end.instant.resolve(r.tx, r.params, false, r.period.end.inclusive)
27✔
423
                if err != nil {
30✔
424
                        return err
3✔
425
                }
3✔
426
        }
427

428
        r.txRange = txRange
48✔
429

48✔
430
        return nil
48✔
431
}
432

433
func (r *rawRowReader) Read(ctx context.Context) (*Row, error) {
17,471✔
434
        if err := ctx.Err(); err != nil {
17,471✔
435
                return nil, err
×
436
        }
×
437

438
        //var mkey []byte
439
        var vref store.ValueRef
17,471✔
440

17,471✔
441
        // evaluation of txRange is postponed to allow parameters to be provided after rowReader initialization
17,471✔
442
        err := r.reduceTxRange()
17,471✔
443
        if errors.Is(err, store.ErrTxNotFound) {
17,482✔
444
                return nil, ErrNoMoreRows
11✔
445
        }
11✔
446
        if err != nil {
17,470✔
447
                return nil, err
10✔
448
        }
10✔
449

450
        if r.txRange == nil {
34,819✔
451
                _, vref, err = r.reader.Read(ctx) //mkey
17,369✔
452
        } else {
17,450✔
453
                _, vref, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID) //mkey
81✔
454
        }
81✔
455
        if err != nil {
17,915✔
456
                return nil, err
465✔
457
        }
465✔
458

459
        v, err := vref.Resolve()
16,985✔
460
        if err != nil {
16,985✔
461
                return nil, err
×
462
        }
×
463

464
        valuesByPosition := make([]TypedValue, len(r.colsByPos))
16,985✔
465
        valuesBySelector := make(map[string]TypedValue, len(r.colsBySel))
16,985✔
466

16,985✔
467
        for i, col := range r.colsByPos {
90,390✔
468
                var val TypedValue
73,405✔
469

73,405✔
470
                switch col.Column {
73,405✔
471
                case revCol:
20✔
472
                        val = &Integer{val: int64(vref.HC())}
20✔
473
                case txMetadataCol:
13✔
474
                        val, err = r.parseTxMetadata(vref.TxMetadata())
13✔
475
                        if err != nil {
15✔
476
                                return nil, err
2✔
477
                        }
2✔
478
                default:
73,372✔
479
                        val = &NullValue{t: col.Type}
73,372✔
480
                }
481

482
                valuesByPosition[i] = val
73,403✔
483
                valuesBySelector[col.Selector()] = val
73,403✔
484
        }
485

486
        if len(v) < EncLenLen {
16,983✔
487
                return nil, ErrCorruptedData
×
488
        }
×
489

490
        extraCols := r.scanSpecs.extraCols()
16,983✔
491

16,983✔
492
        voff := 0
16,983✔
493

16,983✔
494
        cols := int(binary.BigEndian.Uint32(v[voff:]))
16,983✔
495
        voff += EncLenLen
16,983✔
496

16,983✔
497
        for i, pos := 0, 0; i < cols; i++ {
90,110✔
498
                if len(v) < EncIDLen {
73,127✔
499
                        return nil, ErrCorruptedData
×
500
                }
×
501

502
                colID := binary.BigEndian.Uint32(v[voff:])
73,127✔
503
                voff += EncIDLen
73,127✔
504

73,127✔
505
                col, err := r.table.GetColumnByID(colID)
73,127✔
506
                if errors.Is(err, ErrColumnDoesNotExist) && colID <= r.table.maxColID {
73,152✔
507
                        // Dropped column, skip it
25✔
508
                        vlen, n, err := DecodeValueLength(v[voff:])
25✔
509
                        if err != nil {
25✔
510
                                return nil, err
×
511
                        }
×
512
                        voff += n + vlen
25✔
513

25✔
514
                        continue
25✔
515
                }
516
                if err != nil {
73,102✔
517
                        return nil, ErrCorruptedData
×
518
                }
×
519

520
                val, n, err := DecodeValue(v[voff:], col.colType)
73,102✔
521
                if err != nil {
73,102✔
522
                        return nil, err
×
523
                }
×
524

525
                voff += n
73,102✔
526

73,102✔
527
                // make sure value is inserted in the correct position
73,102✔
528
                for pos < len(r.table.cols) && r.table.cols[pos].id < colID {
73,153✔
529
                        pos++
51✔
530
                }
51✔
531

532
                if pos == len(r.table.cols) || r.table.cols[pos].id != colID {
73,102✔
533
                        return nil, ErrCorruptedData
×
534
                }
×
535

536
                valuesByPosition[pos+extraCols] = val
73,102✔
537

73,102✔
538
                pos++
73,102✔
539

73,102✔
540
                valuesBySelector[EncodeSelector("", r.tableAlias, col.colName)] = val
73,102✔
541
        }
542

543
        if len(v)-voff > 0 {
16,983✔
544
                return nil, ErrCorruptedData
×
545
        }
×
546

547
        return &Row{ValuesByPosition: valuesByPosition, ValuesBySelector: valuesBySelector}, nil
16,983✔
548
}
549

550
func (r *rawRowReader) parseTxMetadata(txmd *store.TxMetadata) (TypedValue, error) {
13✔
551
        if txmd == nil {
13✔
NEW
552
                return &NullValue{t: JSONType}, nil
×
NEW
553
        }
×
554

555
        if extra := txmd.Extra(); extra != nil {
26✔
556
                if r.tx.engine.parseTxMetadata == nil {
14✔
557
                        return nil, fmt.Errorf("unable to parse tx metadata")
1✔
558
                }
1✔
559

560
                md, err := r.tx.engine.parseTxMetadata(extra)
12✔
561
                if err != nil {
13✔
562
                        return nil, fmt.Errorf("%w: %s", ErrInvalidTxMetadata, err)
1✔
563
                }
1✔
564
                return &JSON{val: md}, nil
11✔
565
        }
NEW
566
        return &NullValue{t: JSONType}, nil
×
567
}
568

569
func (r *rawRowReader) Close() error {
765✔
570
        if r.onCloseCallback != nil {
1,078✔
571
                defer r.onCloseCallback()
313✔
572
        }
313✔
573

574
        return r.reader.Close()
765✔
575
}
576

577
func ReadAllRows(ctx context.Context, reader RowReader) ([]*Row, error) {
53✔
578
        var rows []*Row
53✔
579
        err := ReadRowsBatch(ctx, reader, 100, func(rowBatch []*Row) error {
211✔
580
                if rows == nil {
204✔
581
                        rows = make([]*Row, 0, len(rowBatch))
46✔
582
                }
46✔
583
                rows = append(rows, rowBatch...)
158✔
584
                return nil
158✔
585
        })
586
        return rows, err
53✔
587
}
588

589
func ReadRowsBatch(ctx context.Context, reader RowReader, batchSize int, onBatch func([]*Row) error) error {
119✔
590
        rows := make([]*Row, batchSize)
119✔
591

119✔
592
        hasMoreRows := true
119✔
593
        for hasMoreRows {
371✔
594
                n, err := readNRows(ctx, reader, batchSize, rows)
252✔
595

252✔
596
                if n > 0 {
485✔
597
                        if err := onBatch(rows[:n]); err != nil {
233✔
598
                                return err
×
599
                        }
×
600
                }
601

602
                hasMoreRows = !errors.Is(err, ErrNoMoreRows)
252✔
603
                if err != nil && hasMoreRows {
259✔
604
                        return err
7✔
605
                }
7✔
606
        }
607
        return nil
112✔
608
}
609

610
func readNRows(ctx context.Context, reader RowReader, n int, outRows []*Row) (int, error) {
252✔
611
        for i := 0; i < n; i++ {
13,137✔
612
                r, err := reader.Read(ctx)
12,885✔
613
                if err != nil {
13,004✔
614
                        return i, err
119✔
615
                }
119✔
616
                outRows[i] = r
12,766✔
617
        }
618
        return n, nil
133✔
619
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc