• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24236186926

10 Apr 2026 09:25AM UTC coverage: 89.169% (-0.09%) from 89.257%
24236186926

push

gh-ci

SimoneLazzaris
fix workflows

38207 of 42848 relevant lines covered (89.17%)

151869.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.74
/embedded/sql/row_reader.go
1
/*
2
Copyright 2025 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "math"
26

27
        "github.com/codenotary/immudb/embedded/store"
28
)
29

30
type RowReader interface {
31
        Tx() *SQLTx
32
        TableAlias() string
33
        Parameters() map[string]interface{}
34
        Read(ctx context.Context) (*Row, error)
35
        Close() error
36
        Columns(ctx context.Context) ([]ColDescriptor, error)
37
        OrderBy() []ColDescriptor
38
        ScanSpecs() *ScanSpecs
39
        InferParameters(ctx context.Context, params map[string]SQLValueType) error
40
        colsBySelector(ctx context.Context) (map[string]ColDescriptor, error)
41
        onClose(func())
42
}
43

44
type ScanSpecs struct {
45
        Index             *Index
46
        rangesByColID     map[uint32]*typedValueRange
47
        IncludeHistory    bool
48
        IncludeDiff       bool
49
        IncludeTxMetadata bool
50
        DescOrder         bool
51
        groupBySortExps   []*OrdExp
52
        orderBySortExps   []*OrdExp
53
}
54

55
func (s *ScanSpecs) extraCols() int {
48,065✔
56
        n := 0
48,065✔
57
        if s.IncludeHistory {
48,087✔
58
                n++
22✔
59
        }
22✔
60

61
        if s.IncludeDiff {
48,065✔
62
                n++
×
63
        }
×
64

65
        if s.IncludeTxMetadata {
48,080✔
66
                n++
15✔
67
        }
15✔
68
        return n
48,065✔
69
}
70

71
type Row struct {
72
        ValuesByPosition []TypedValue
73
        ValuesBySelector map[string]TypedValue
74
}
75

76
// rows are selector-compatible if both rows have the same assigned value for all specified selectors
77
func (row *Row) compatible(aRow *Row, selectors []*ColSelector, table string) (bool, error) {
894✔
78
        for _, sel := range selectors {
1,576✔
79
                c := EncodeSelector(sel.resolve(table))
682✔
80

682✔
81
                val1, ok := row.ValuesBySelector[c]
682✔
82
                if !ok {
682✔
83
                        return false, ErrInvalidColumn
×
84
                }
×
85

86
                val2, ok := aRow.ValuesBySelector[c]
682✔
87
                if !ok {
682✔
88
                        return false, ErrInvalidColumn
×
89
                }
×
90

91
                cmp, err := val1.Compare(val2)
682✔
92
                if err != nil {
682✔
93
                        return false, err
×
94
                }
×
95

96
                if cmp != 0 {
947✔
97
                        return false, nil
265✔
98
                }
265✔
99
        }
100

101
        return true, nil
629✔
102
}
103

104
func (row *Row) digest(cols []ColDescriptor) (d [sha256.Size]byte, err error) {
77✔
105
        h := sha256.New()
77✔
106

77✔
107
        for i, v := range row.ValuesByPosition {
181✔
108
                var b [4]byte
104✔
109
                binary.BigEndian.PutUint32(b[:], uint32(i))
104✔
110
                h.Write(b[:])
104✔
111

104✔
112
                _, isNull := v.(*NullValue)
104✔
113
                if isNull {
106✔
114
                        continue
2✔
115
                }
116

117
                encVal, err := EncodeValue(v, v.Type(), 0)
102✔
118
                if err != nil {
102✔
119
                        return d, err
×
120
                }
×
121

122
                h.Write(encVal)
102✔
123
        }
124

125
        copy(d[:], h.Sum(nil))
77✔
126

77✔
127
        return
77✔
128
}
129

130
type rawRowReader struct {
131
        tx         *SQLTx
132
        table      *Table
133
        tableAlias string
134
        colsByPos  []ColDescriptor
135
        colsBySel  map[string]ColDescriptor
136
        scanSpecs  *ScanSpecs
137

138
        // defines a sub-range a transactions based on a combination of tx IDs and timestamps
139
        // the query is resolved only taking into consideration that range of transactioins
140
        period period
141

142
        // underlying store supports reading entries within a range of txs
143
        // the range is calculated based on the period stmt, which is included here to support
144
        // lazy evaluation when parameters are available
145
        txRange *txRange
146

147
        params map[string]interface{}
148

149
        reader          store.KeyReader
150
        onCloseCallback func()
151
}
152

153
// txRange is the pair of transaction IDs handed to the key reader's
// ReadBetween when a period restricts the scan.
type txRange struct {
	initialTxID, finalTxID uint64
}
157

158
type ColDescriptor struct {
159
        AggFn  string
160
        Table  string
161
        Column string
162
        Type   SQLValueType
163
}
164

165
func (d *ColDescriptor) Selector() string {
279,472✔
166
        return EncodeSelector(d.AggFn, d.Table, d.Column)
279,472✔
167
}
279,472✔
168

169
// emptyKeyReader is a key reader without entries: every read reports
// store.ErrNoMoreEntries. newRawRowReader uses it for the "pg_type" table.
type emptyKeyReader struct{}
171

172
func (r emptyKeyReader) Read(ctx context.Context) (key []byte, val store.ValueRef, err error) {
1✔
173
        return nil, nil, store.ErrNoMoreEntries
1✔
174
}
1✔
175

176
func (r emptyKeyReader) ReadBetween(ctx context.Context, initialTxID uint64, finalTxID uint64) (key []byte, val store.ValueRef, err error) {
×
177
        return nil, nil, store.ErrNoMoreEntries
×
178
}
×
179

180
func (r emptyKeyReader) Reset() error {
×
181
        return nil
×
182
}
×
183

184
func (r emptyKeyReader) Close() error {
1✔
185
        return nil
1✔
186
}
1✔
187

188
func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, period period, tableAlias string, scanSpecs *ScanSpecs) (*rawRowReader, error) {
979✔
189
        if table == nil || scanSpecs == nil || scanSpecs.Index == nil {
979✔
190
                return nil, ErrIllegalArguments
×
191
        }
×
192

193
        rSpec, err := keyReaderSpecFrom(tx.engine.prefix, table, scanSpecs)
979✔
194
        if err != nil {
979✔
195
                return nil, err
×
196
        }
×
197

198
        var r store.KeyReader
979✔
199

979✔
200
        if table.name == "pg_type" {
980✔
201
                r = &emptyKeyReader{}
1✔
202
        } else {
979✔
203
                r, err = tx.newKeyReader(*rSpec)
978✔
204
                if err != nil {
978✔
205
                        return nil, err
×
206
                }
×
207
        }
208

209
        if tableAlias == "" {
1,695✔
210
                tableAlias = table.name
716✔
211
        }
716✔
212

213
        nCols := len(table.cols) + scanSpecs.extraCols()
979✔
214

979✔
215
        colsByPos := make([]ColDescriptor, nCols)
979✔
216
        colsBySel := make(map[string]ColDescriptor, nCols)
979✔
217

979✔
218
        off := 0
979✔
219
        if scanSpecs.IncludeHistory {
981✔
220
                colDescriptor := ColDescriptor{
2✔
221
                        Table:  tableAlias,
2✔
222
                        Column: revCol,
2✔
223
                        Type:   IntegerType,
2✔
224
                }
2✔
225

2✔
226
                colsByPos[off] = colDescriptor
2✔
227
                colsBySel[colDescriptor.Selector()] = colDescriptor
2✔
228
                off++
2✔
229
        }
2✔
230

231
        if scanSpecs.IncludeTxMetadata {
983✔
232
                colDescriptor := ColDescriptor{
4✔
233
                        Table:  tableAlias,
4✔
234
                        Column: txMetadataCol,
4✔
235
                        Type:   JSONType,
4✔
236
                }
4✔
237

4✔
238
                colsByPos[off] = colDescriptor
4✔
239
                colsBySel[colDescriptor.Selector()] = colDescriptor
4✔
240
                off++
4✔
241
        }
4✔
242

243
        for i, c := range table.cols {
4,291✔
244
                colDescriptor := ColDescriptor{
3,312✔
245
                        Table:  tableAlias,
3,312✔
246
                        Column: c.colName,
3,312✔
247
                        Type:   c.colType,
3,312✔
248
                }
3,312✔
249

3,312✔
250
                colsByPos[off+i] = colDescriptor
3,312✔
251
                colsBySel[colDescriptor.Selector()] = colDescriptor
3,312✔
252
        }
3,312✔
253

254
        return &rawRowReader{
979✔
255
                tx:         tx,
979✔
256
                table:      table,
979✔
257
                period:     period,
979✔
258
                tableAlias: tableAlias,
979✔
259
                colsByPos:  colsByPos,
979✔
260
                colsBySel:  colsBySel,
979✔
261
                scanSpecs:  scanSpecs,
979✔
262
                params:     params,
979✔
263
                reader:     r,
979✔
264
        }, nil
979✔
265
}
266

267
func keyReaderSpecFrom(sqlPrefix []byte, table *Table, scanSpecs *ScanSpecs) (spec *store.KeyReaderSpec, err error) {
981✔
268
        prefix := MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(scanSpecs.Index.id))
981✔
269

981✔
270
        var loKey []byte
981✔
271
        var loKeyReady bool
981✔
272

981✔
273
        var hiKey []byte
981✔
274
        var hiKeyReady bool
981✔
275

981✔
276
        loKey = make([]byte, len(prefix))
981✔
277
        copy(loKey, prefix)
981✔
278

981✔
279
        hiKey = make([]byte, len(prefix))
981✔
280
        copy(hiKey, prefix)
981✔
281

981✔
282
        // seekKey and endKey in the loop below are scan prefixes for beginning
981✔
283
        // and end of the index scanning range. On each index we try to make them more
981✔
284
        // concrete.
981✔
285
        for _, col := range scanSpecs.Index.cols {
1,949✔
286
                colRange, ok := scanSpecs.rangesByColID[col.id]
968✔
287
                if !ok {
1,787✔
288
                        break
819✔
289
                }
290

291
                if !hiKeyReady {
298✔
292
                        if colRange.hRange == nil {
162✔
293
                                hiKeyReady = true
13✔
294
                        } else {
149✔
295
                                encVal, _, err := EncodeValueAsKey(colRange.hRange.val, col.colType, col.MaxLen())
136✔
296
                                if err != nil {
137✔
297
                                        return nil, err
1✔
298
                                }
1✔
299
                                hiKey = append(hiKey, encVal...)
135✔
300
                        }
301
                }
302

303
                if !loKeyReady {
296✔
304
                        if colRange.lRange == nil {
161✔
305
                                loKeyReady = true
13✔
306
                        } else {
148✔
307
                                encVal, _, err := EncodeValueAsKey(colRange.lRange.val, col.colType, col.MaxLen())
135✔
308
                                if err != nil {
136✔
309
                                        return nil, err
1✔
310
                                }
1✔
311
                                loKey = append(loKey, encVal...)
134✔
312
                        }
313
                }
314
        }
315

316
        // Ensure the hiKey is inclusive regarding all values with that prefix
317
        hiKey = append(hiKey, KeyValPrefixUpperBound)
979✔
318

979✔
319
        seekKey := loKey
979✔
320
        endKey := hiKey
979✔
321

979✔
322
        if scanSpecs.DescOrder {
1,030✔
323
                seekKey, endKey = endKey, seekKey
51✔
324
        }
51✔
325

326
        return &store.KeyReaderSpec{
979✔
327
                SeekKey:        seekKey,
979✔
328
                InclusiveSeek:  true,
979✔
329
                EndKey:         endKey,
979✔
330
                InclusiveEnd:   true,
979✔
331
                Prefix:         prefix,
979✔
332
                DescOrder:      scanSpecs.DescOrder,
979✔
333
                Filters:        []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
979✔
334
                IncludeHistory: scanSpecs.IncludeHistory,
979✔
335
        }, nil
979✔
336
}
337

338
func (r *rawRowReader) onClose(callback func()) {
389✔
339
        r.onCloseCallback = callback
389✔
340
}
389✔
341

342
func (r *rawRowReader) Tx() *SQLTx {
744,457✔
343
        return r.tx
744,457✔
344
}
744,457✔
345

346
func (r *rawRowReader) TableAlias() string {
1,134,191✔
347
        return r.tableAlias
1,134,191✔
348
}
1,134,191✔
349

350
func (r *rawRowReader) OrderBy() []ColDescriptor {
22✔
351
        cols := make([]ColDescriptor, len(r.scanSpecs.Index.cols))
22✔
352

22✔
353
        for i, col := range r.scanSpecs.Index.cols {
47✔
354
                cols[i] = ColDescriptor{
25✔
355
                        Table:  r.tableAlias,
25✔
356
                        Column: col.colName,
25✔
357
                        Type:   col.colType,
25✔
358
                }
25✔
359
        }
25✔
360

361
        return cols
22✔
362
}
363

364
func (r *rawRowReader) ScanSpecs() *ScanSpecs {
108✔
365
        return r.scanSpecs
108✔
366
}
108✔
367

368
func (r *rawRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
565✔
369
        ret := make([]ColDescriptor, len(r.colsByPos))
565✔
370
        copy(ret, r.colsByPos)
565✔
371
        return ret, nil
565✔
372
}
565✔
373

374
func (r *rawRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
441✔
375
        ret := make(map[string]ColDescriptor, len(r.colsBySel))
441✔
376
        for sel := range r.colsBySel {
2,027✔
377
                ret[sel] = r.colsBySel[sel]
1,586✔
378
        }
1,586✔
379
        return ret, nil
441✔
380
}
381

382
func (r *rawRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
49✔
383
        cols, err := r.colsBySelector(ctx)
49✔
384
        if err != nil {
49✔
385
                return err
×
386
        }
×
387

388
        if r.period.start != nil {
59✔
389
                _, err = r.period.start.instant.exp.inferType(cols, params, r.TableAlias())
10✔
390
                if err != nil {
10✔
391
                        return err
×
392
                }
×
393
        }
394

395
        if r.period.end != nil {
59✔
396
                _, err = r.period.end.instant.exp.inferType(cols, params, r.TableAlias())
10✔
397
                if err != nil {
10✔
398
                        return err
×
399
                }
×
400
        }
401

402
        return nil
49✔
403
}
404

405
func (r *rawRowReader) Parameters() map[string]interface{} {
207,006✔
406
        return r.params
207,006✔
407
}
207,006✔
408

409
func (r *rawRowReader) reduceTxRange() (err error) {
47,744✔
410
        if r.txRange != nil || (r.period.start == nil && r.period.end == nil) {
95,411✔
411
                return nil
47,667✔
412
        }
47,667✔
413

414
        txRange := &txRange{
77✔
415
                initialTxID: uint64(0),
77✔
416
                finalTxID:   uint64(math.MaxUint64),
77✔
417
        }
77✔
418

77✔
419
        if r.period.start != nil {
131✔
420
                txRange.initialTxID, err = r.period.start.instant.resolve(r.tx, r.params, true, r.period.start.inclusive)
54✔
421
                if err != nil {
72✔
422
                        return err
18✔
423
                }
18✔
424
        }
425

426
        if r.period.end != nil {
94✔
427
                txRange.finalTxID, err = r.period.end.instant.resolve(r.tx, r.params, false, r.period.end.inclusive)
35✔
428
                if err != nil {
38✔
429
                        return err
3✔
430
                }
3✔
431
        }
432

433
        r.txRange = txRange
56✔
434

56✔
435
        return nil
56✔
436
}
437

438
func (r *rawRowReader) Read(ctx context.Context) (*Row, error) {
47,744✔
439
        if err := ctx.Err(); err != nil {
47,744✔
440
                return nil, err
×
441
        }
×
442

443
        //var mkey []byte
444
        var vref store.ValueRef
47,744✔
445

47,744✔
446
        // evaluation of txRange is postponed to allow parameters to be provided after rowReader initialization
47,744✔
447
        err := r.reduceTxRange()
47,744✔
448
        if errors.Is(err, store.ErrTxNotFound) {
47,755✔
449
                return nil, ErrNoMoreRows
11✔
450
        }
11✔
451
        if err != nil {
47,743✔
452
                return nil, err
10✔
453
        }
10✔
454

455
        if r.txRange == nil {
95,340✔
456
                _, vref, err = r.reader.Read(ctx) //mkey
47,617✔
457
        } else {
47,723✔
458
                _, vref, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID) //mkey
106✔
459
        }
106✔
460
        if err != nil {
48,358✔
461
                return nil, err
635✔
462
        }
635✔
463

464
        v, err := vref.Resolve()
47,088✔
465
        if err != nil {
47,088✔
466
                return nil, err
×
467
        }
×
468

469
        valuesByPosition := make([]TypedValue, len(r.colsByPos))
47,088✔
470
        valuesBySelector := make(map[string]TypedValue, len(r.colsBySel))
47,088✔
471

47,088✔
472
        for i, col := range r.colsByPos {
301,956✔
473
                var val TypedValue
254,868✔
474

254,868✔
475
                switch col.Column {
254,868✔
476
                case revCol:
20✔
477
                        val = &Integer{val: int64(vref.HC())}
20✔
478
                case txMetadataCol:
13✔
479
                        val, err = r.parseTxMetadata(vref.TxMetadata())
13✔
480
                        if err != nil {
15✔
481
                                return nil, err
2✔
482
                        }
2✔
483
                default:
254,835✔
484
                        val = &NullValue{t: col.Type}
254,835✔
485
                }
486

487
                valuesByPosition[i] = val
254,866✔
488
                valuesBySelector[col.Selector()] = val
254,866✔
489
        }
490

491
        if len(v) < EncLenLen {
47,086✔
492
                return nil, ErrCorruptedData
×
493
        }
×
494

495
        extraCols := r.scanSpecs.extraCols()
47,086✔
496

47,086✔
497
        voff := 0
47,086✔
498

47,086✔
499
        cols := int(binary.BigEndian.Uint32(v[voff:]))
47,086✔
500
        voff += EncLenLen
47,086✔
501

47,086✔
502
        for i, pos := 0, 0; i < cols; i++ {
301,441✔
503
                if len(v) < EncIDLen {
254,355✔
504
                        return nil, ErrCorruptedData
×
505
                }
×
506

507
                colID := binary.BigEndian.Uint32(v[voff:])
254,355✔
508
                voff += EncIDLen
254,355✔
509

254,355✔
510
                col, err := r.table.GetColumnByID(colID)
254,355✔
511
                if errors.Is(err, ErrColumnDoesNotExist) && colID <= r.table.maxColID {
254,380✔
512
                        // Dropped column, skip it
25✔
513
                        vlen, n, err := DecodeValueLength(v[voff:])
25✔
514
                        if err != nil {
25✔
515
                                return nil, err
×
516
                        }
×
517
                        voff += n + vlen
25✔
518

25✔
519
                        continue
25✔
520
                }
521
                if err != nil {
254,330✔
522
                        return nil, ErrCorruptedData
×
523
                }
×
524

525
                val, n, err := DecodeValue(v[voff:], col.colType)
254,330✔
526
                if err != nil {
254,330✔
527
                        return nil, err
×
528
                }
×
529

530
                voff += n
254,330✔
531

254,330✔
532
                // make sure value is inserted in the correct position
254,330✔
533
                for pos < len(r.table.cols) && r.table.cols[pos].id < colID {
254,584✔
534
                        pos++
254✔
535
                }
254✔
536

537
                if pos == len(r.table.cols) || r.table.cols[pos].id != colID {
254,330✔
538
                        return nil, ErrCorruptedData
×
539
                }
×
540

541
                valuesByPosition[pos+extraCols] = val
254,330✔
542

254,330✔
543
                pos++
254,330✔
544

254,330✔
545
                valuesBySelector[EncodeSelector("", r.tableAlias, col.colName)] = val
254,330✔
546
        }
547

548
        if len(v)-voff > 0 {
47,086✔
549
                return nil, ErrCorruptedData
×
550
        }
×
551

552
        return &Row{ValuesByPosition: valuesByPosition, ValuesBySelector: valuesBySelector}, nil
47,086✔
553
}
554

555
func (r *rawRowReader) parseTxMetadata(txmd *store.TxMetadata) (TypedValue, error) {
13✔
556
        if txmd == nil {
13✔
557
                return &NullValue{t: JSONType}, nil
×
558
        }
×
559

560
        if extra := txmd.Extra(); extra != nil {
26✔
561
                if r.tx.engine.parseTxMetadata == nil {
14✔
562
                        return nil, fmt.Errorf("unable to parse tx metadata")
1✔
563
                }
1✔
564

565
                md, err := r.tx.engine.parseTxMetadata(extra)
12✔
566
                if err != nil {
13✔
567
                        return nil, fmt.Errorf("%w: %s", ErrInvalidTxMetadata, err)
1✔
568
                }
1✔
569
                return &JSON{val: md}, nil
11✔
570
        }
571
        return &NullValue{t: JSONType}, nil
×
572
}
573

574
func (r *rawRowReader) Close() error {
987✔
575
        if r.onCloseCallback != nil {
1,376✔
576
                defer r.onCloseCallback()
389✔
577
        }
389✔
578

579
        return r.reader.Close()
987✔
580
}
581

582
func ReadAllRows(ctx context.Context, reader RowReader) ([]*Row, error) {
113✔
583
        var rows []*Row
113✔
584
        err := ReadRowsBatch(ctx, reader, 100, func(rowBatch []*Row) error {
490✔
585
                if rows == nil {
466✔
586
                        rows = make([]*Row, 0, len(rowBatch))
89✔
587
                }
89✔
588
                rows = append(rows, rowBatch...)
377✔
589
                return nil
377✔
590
        })
591
        return rows, err
113✔
592
}
593

594
func ReadRowsBatch(ctx context.Context, reader RowReader, batchSize int, onBatch func([]*Row) error) error {
179✔
595
        rows := make([]*Row, batchSize)
179✔
596

179✔
597
        hasMoreRows := true
179✔
598
        for hasMoreRows {
670✔
599
                n, err := readNRows(ctx, reader, batchSize, rows)
491✔
600

491✔
601
                if n > 0 {
943✔
602
                        if err := onBatch(rows[:n]); err != nil {
452✔
603
                                return err
×
604
                        }
×
605
                }
606

607
                hasMoreRows = !errors.Is(err, ErrNoMoreRows)
491✔
608
                if err != nil && hasMoreRows {
515✔
609
                        return err
24✔
610
                }
24✔
611
        }
612
        return nil
155✔
613
}
614

615
func readNRows(ctx context.Context, reader RowReader, n int, outRows []*Row) (int, error) {
491✔
616
        for i := 0; i < n; i++ {
33,353✔
617
                r, err := reader.Read(ctx)
32,862✔
618
                if err != nil {
33,041✔
619
                        return i, err
179✔
620
                }
179✔
621
                outRows[i] = r
32,683✔
622
        }
623
        return n, nil
312✔
624
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc