• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 9193594629

22 May 2024 02:00PM UTC coverage: 89.52% (+0.02%) from 89.496%
9193594629

push

gh-ci

ostafen
Fix issue with wrong position of values in raw_reader

10 of 12 new or added lines in 1 file covered. (83.33%)

2 existing lines in 1 file now uncovered.

34765 of 38835 relevant lines covered (89.52%)

162536.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.93
/embedded/sql/row_reader.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "math"
25

26
        "github.com/codenotary/immudb/embedded/store"
27
)
28

29
type RowReader interface {
30
        Tx() *SQLTx
31
        TableAlias() string
32
        Parameters() map[string]interface{}
33
        Read(ctx context.Context) (*Row, error)
34
        Close() error
35
        Columns(ctx context.Context) ([]ColDescriptor, error)
36
        OrderBy() []ColDescriptor
37
        ScanSpecs() *ScanSpecs
38
        InferParameters(ctx context.Context, params map[string]SQLValueType) error
39
        colsBySelector(ctx context.Context) (map[string]ColDescriptor, error)
40
        onClose(func())
41
}
42

43
type ScanSpecs struct {
44
        Index              *Index
45
        rangesByColID      map[uint32]*typedValueRange
46
        IncludeHistory     bool
47
        DescOrder          bool
48
        groupBySortColumns []*OrdCol
49
        orderBySortCols    []*OrdCol
50
}
51

52
type Row struct {
53
        ValuesByPosition []TypedValue
54
        ValuesBySelector map[string]TypedValue
55
}
56

57
// rows are selector-compatible if both rows have the same assigned value for all specified selectors
58
func (row *Row) compatible(aRow *Row, selectors []*ColSelector, table string) (bool, error) {
877✔
59
        for _, sel := range selectors {
1,559✔
60
                c := EncodeSelector(sel.resolve(table))
682✔
61

682✔
62
                val1, ok := row.ValuesBySelector[c]
682✔
63
                if !ok {
682✔
64
                        return false, ErrInvalidColumn
×
65
                }
×
66

67
                val2, ok := aRow.ValuesBySelector[c]
682✔
68
                if !ok {
682✔
69
                        return false, ErrInvalidColumn
×
70
                }
×
71

72
                cmp, err := val1.Compare(val2)
682✔
73
                if err != nil {
682✔
74
                        return false, err
×
75
                }
×
76

77
                if cmp != 0 {
947✔
78
                        return false, nil
265✔
79
                }
265✔
80
        }
81

82
        return true, nil
612✔
83
}
84

85
func (row *Row) digest(cols []ColDescriptor) (d [sha256.Size]byte, err error) {
65✔
86
        h := sha256.New()
65✔
87

65✔
88
        for i, v := range row.ValuesByPosition {
133✔
89
                var b [4]byte
68✔
90
                binary.BigEndian.PutUint32(b[:], uint32(i))
68✔
91
                h.Write(b[:])
68✔
92

68✔
93
                _, isNull := v.(*NullValue)
68✔
94
                if isNull {
70✔
95
                        continue
2✔
96
                }
97

98
                encVal, err := EncodeValue(v, v.Type(), 0)
66✔
99
                if err != nil {
66✔
100
                        return d, err
×
101
                }
×
102

103
                h.Write(encVal)
66✔
104
        }
105

106
        copy(d[:], h.Sum(nil))
65✔
107

65✔
108
        return
65✔
109
}
110

111
type rawRowReader struct {
112
        tx         *SQLTx
113
        table      *Table
114
        tableAlias string
115
        colsByPos  []ColDescriptor
116
        colsBySel  map[string]ColDescriptor
117
        scanSpecs  *ScanSpecs
118

119
        // defines a sub-range a transactions based on a combination of tx IDs and timestamps
120
        // the query is resolved only taking into consideration that range of transactioins
121
        period period
122

123
        // underlying store supports reading entries within a range of txs
124
        // the range is calculated based on the period stmt, which is included here to support
125
        // lazy evaluation when parameters are available
126
        txRange *txRange
127

128
        params map[string]interface{}
129

130
        reader          store.KeyReader
131
        onCloseCallback func()
132
}
133

134
type txRange struct {
135
        initialTxID uint64
136
        finalTxID   uint64
137
}
138

139
type ColDescriptor struct {
140
        AggFn  string
141
        Table  string
142
        Column string
143
        Type   SQLValueType
144
}
145

146
func (d *ColDescriptor) Selector() string {
93,061✔
147
        return EncodeSelector(d.AggFn, d.Table, d.Column)
93,061✔
148
}
93,061✔
149

150
type emptyKeyReader struct {
151
}
152

153
func (r emptyKeyReader) Read(ctx context.Context) (key []byte, val store.ValueRef, err error) {
1✔
154
        return nil, nil, store.ErrNoMoreEntries
1✔
155
}
1✔
156

157
func (r emptyKeyReader) ReadBetween(ctx context.Context, initialTxID uint64, finalTxID uint64) (key []byte, val store.ValueRef, err error) {
×
158
        return nil, nil, store.ErrNoMoreEntries
×
159
}
×
160

161
func (r emptyKeyReader) Reset() error {
×
162
        return nil
×
163
}
×
164

165
func (r emptyKeyReader) Close() error {
1✔
166
        return nil
1✔
167
}
1✔
168

169
func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, period period, tableAlias string, scanSpecs *ScanSpecs) (*rawRowReader, error) {
656✔
170
        if table == nil || scanSpecs == nil || scanSpecs.Index == nil {
656✔
171
                return nil, ErrIllegalArguments
×
172
        }
×
173

174
        rSpec, err := keyReaderSpecFrom(tx.engine.prefix, table, scanSpecs)
656✔
175
        if err != nil {
656✔
176
                return nil, err
×
177
        }
×
178

179
        var r store.KeyReader
656✔
180

656✔
181
        if table.name == "pg_type" {
657✔
182
                r = &emptyKeyReader{}
1✔
183
        } else {
656✔
184
                r, err = tx.newKeyReader(*rSpec)
655✔
185
                if err != nil {
655✔
186
                        return nil, err
×
187
                }
×
188
        }
189

190
        if tableAlias == "" {
1,190✔
191
                tableAlias = table.name
534✔
192
        }
534✔
193

194
        var colsByPos []ColDescriptor
656✔
195
        var colsBySel map[string]ColDescriptor
656✔
196

656✔
197
        var off int
656✔
198

656✔
199
        if scanSpecs.IncludeHistory {
658✔
200
                colsByPos = make([]ColDescriptor, 1+len(table.cols))
2✔
201
                colsBySel = make(map[string]ColDescriptor, 1+len(table.cols))
2✔
202

2✔
203
                colDescriptor := ColDescriptor{
2✔
204
                        Table:  tableAlias,
2✔
205
                        Column: revCol,
2✔
206
                        Type:   IntegerType,
2✔
207
                }
2✔
208

2✔
209
                colsByPos[0] = colDescriptor
2✔
210
                colsBySel[colDescriptor.Selector()] = colDescriptor
2✔
211

2✔
212
                off = 1
2✔
213
        } else {
656✔
214
                colsByPos = make([]ColDescriptor, len(table.cols))
654✔
215
                colsBySel = make(map[string]ColDescriptor, len(table.cols))
654✔
216
        }
654✔
217

218
        for i, c := range table.cols {
2,873✔
219
                colDescriptor := ColDescriptor{
2,217✔
220
                        Table:  tableAlias,
2,217✔
221
                        Column: c.colName,
2,217✔
222
                        Type:   c.colType,
2,217✔
223
                }
2,217✔
224

2,217✔
225
                colsByPos[off+i] = colDescriptor
2,217✔
226
                colsBySel[colDescriptor.Selector()] = colDescriptor
2,217✔
227
        }
2,217✔
228

229
        return &rawRowReader{
656✔
230
                tx:         tx,
656✔
231
                table:      table,
656✔
232
                period:     period,
656✔
233
                tableAlias: tableAlias,
656✔
234
                colsByPos:  colsByPos,
656✔
235
                colsBySel:  colsBySel,
656✔
236
                scanSpecs:  scanSpecs,
656✔
237
                params:     params,
656✔
238
                reader:     r,
656✔
239
        }, nil
656✔
240
}
241

242
func keyReaderSpecFrom(sqlPrefix []byte, table *Table, scanSpecs *ScanSpecs) (spec *store.KeyReaderSpec, err error) {
658✔
243
        prefix := MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(scanSpecs.Index.id))
658✔
244

658✔
245
        var loKey []byte
658✔
246
        var loKeyReady bool
658✔
247

658✔
248
        var hiKey []byte
658✔
249
        var hiKeyReady bool
658✔
250

658✔
251
        loKey = make([]byte, len(prefix))
658✔
252
        copy(loKey, prefix)
658✔
253

658✔
254
        hiKey = make([]byte, len(prefix))
658✔
255
        copy(hiKey, prefix)
658✔
256

658✔
257
        // seekKey and endKey in the loop below are scan prefixes for beginning
658✔
258
        // and end of the index scanning range. On each index we try to make them more
658✔
259
        // concrete.
658✔
260
        for _, col := range scanSpecs.Index.cols {
1,317✔
261
                colRange, ok := scanSpecs.rangesByColID[col.id]
659✔
262
                if !ok {
1,180✔
263
                        break
521✔
264
                }
265

266
                if !hiKeyReady {
276✔
267
                        if colRange.hRange == nil {
149✔
268
                                hiKeyReady = true
11✔
269
                        } else {
138✔
270
                                encVal, _, err := EncodeValueAsKey(colRange.hRange.val, col.colType, col.MaxLen())
127✔
271
                                if err != nil {
128✔
272
                                        return nil, err
1✔
273
                                }
1✔
274
                                hiKey = append(hiKey, encVal...)
126✔
275
                        }
276
                }
277

278
                if !loKeyReady {
274✔
279
                        if colRange.lRange == nil {
150✔
280
                                loKeyReady = true
13✔
281
                        } else {
137✔
282
                                encVal, _, err := EncodeValueAsKey(colRange.lRange.val, col.colType, col.MaxLen())
124✔
283
                                if err != nil {
125✔
284
                                        return nil, err
1✔
285
                                }
1✔
286
                                loKey = append(loKey, encVal...)
123✔
287
                        }
288
                }
289
        }
290

291
        // Ensure the hiKey is inclusive regarding all values with that prefix
292
        hiKey = append(hiKey, KeyValPrefixUpperBound)
656✔
293

656✔
294
        seekKey := loKey
656✔
295
        endKey := hiKey
656✔
296

656✔
297
        if scanSpecs.DescOrder {
707✔
298
                seekKey, endKey = endKey, seekKey
51✔
299
        }
51✔
300

301
        return &store.KeyReaderSpec{
656✔
302
                SeekKey:        seekKey,
656✔
303
                InclusiveSeek:  true,
656✔
304
                EndKey:         endKey,
656✔
305
                InclusiveEnd:   true,
656✔
306
                Prefix:         prefix,
656✔
307
                DescOrder:      scanSpecs.DescOrder,
656✔
308
                Filters:        []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
656✔
309
                IncludeHistory: scanSpecs.IncludeHistory,
656✔
310
        }, nil
656✔
311
}
312

313
func (r *rawRowReader) onClose(callback func()) {
316✔
314
        r.onCloseCallback = callback
316✔
315
}
316✔
316

317
func (r *rawRowReader) Tx() *SQLTx {
1,734✔
318
        return r.tx
1,734✔
319
}
1,734✔
320

321
func (r *rawRowReader) TableAlias() string {
85,525✔
322
        return r.tableAlias
85,525✔
323
}
85,525✔
324

325
func (r *rawRowReader) OrderBy() []ColDescriptor {
22✔
326
        cols := make([]ColDescriptor, len(r.scanSpecs.Index.cols))
22✔
327

22✔
328
        for i, col := range r.scanSpecs.Index.cols {
47✔
329
                cols[i] = ColDescriptor{
25✔
330
                        Table:  r.tableAlias,
25✔
331
                        Column: col.colName,
25✔
332
                        Type:   col.colType,
25✔
333
                }
25✔
334
        }
25✔
335

336
        return cols
22✔
337
}
338

339
func (r *rawRowReader) ScanSpecs() *ScanSpecs {
92✔
340
        return r.scanSpecs
92✔
341
}
92✔
342

343
func (r *rawRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
277✔
344
        ret := make([]ColDescriptor, len(r.colsByPos))
277✔
345
        copy(ret, r.colsByPos)
277✔
346
        return ret, nil
277✔
347
}
277✔
348

349
func (r *rawRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
320✔
350
        ret := make(map[string]ColDescriptor, len(r.colsBySel))
320✔
351
        for sel := range r.colsBySel {
1,395✔
352
                ret[sel] = r.colsBySel[sel]
1,075✔
353
        }
1,075✔
354
        return ret, nil
320✔
355
}
356

357
func (r *rawRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
49✔
358
        cols, err := r.colsBySelector(ctx)
49✔
359
        if err != nil {
49✔
360
                return err
×
361
        }
×
362

363
        if r.period.start != nil {
59✔
364
                _, err = r.period.start.instant.exp.inferType(cols, params, r.TableAlias())
10✔
365
                if err != nil {
10✔
366
                        return err
×
367
                }
×
368
        }
369

370
        if r.period.end != nil {
59✔
371
                _, err = r.period.end.instant.exp.inferType(cols, params, r.TableAlias())
10✔
372
                if err != nil {
10✔
373
                        return err
×
374
                }
×
375
        }
376

377
        return nil
49✔
378
}
379

380
func (r *rawRowReader) Parameters() map[string]interface{} {
1,589✔
381
        return r.params
1,589✔
382
}
1,589✔
383

384
func (r *rawRowReader) reduceTxRange() (err error) {
19,003✔
385
        if r.txRange != nil || (r.period.start == nil && r.period.end == nil) {
37,937✔
386
                return nil
18,934✔
387
        }
18,934✔
388

389
        txRange := &txRange{
69✔
390
                initialTxID: uint64(0),
69✔
391
                finalTxID:   uint64(math.MaxUint64),
69✔
392
        }
69✔
393

69✔
394
        if r.period.start != nil {
123✔
395
                txRange.initialTxID, err = r.period.start.instant.resolve(r.tx, r.params, true, r.period.start.inclusive)
54✔
396
                if err != nil {
72✔
397
                        return err
18✔
398
                }
18✔
399
        }
400

401
        if r.period.end != nil {
78✔
402
                txRange.finalTxID, err = r.period.end.instant.resolve(r.tx, r.params, false, r.period.end.inclusive)
27✔
403
                if err != nil {
30✔
404
                        return err
3✔
405
                }
3✔
406
        }
407

408
        r.txRange = txRange
48✔
409

48✔
410
        return nil
48✔
411
}
412

413
func (r *rawRowReader) Read(ctx context.Context) (*Row, error) {
19,003✔
414
        if err := ctx.Err(); err != nil {
19,003✔
415
                return nil, err
×
416
        }
×
417

418
        //var mkey []byte
419
        var vref store.ValueRef
19,003✔
420

19,003✔
421
        // evaluation of txRange is postponed to allow parameters to be provided after rowReader initialization
19,003✔
422
        err := r.reduceTxRange()
19,003✔
423
        if errors.Is(err, store.ErrTxNotFound) {
19,014✔
424
                return nil, ErrNoMoreRows
11✔
425
        }
11✔
426
        if err != nil {
19,002✔
427
                return nil, err
10✔
428
        }
10✔
429

430
        if r.txRange == nil {
37,883✔
431
                _, vref, err = r.reader.Read(ctx) //mkey
18,901✔
432
        } else {
18,982✔
433
                _, vref, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID) //mkey
81✔
434
        }
81✔
435
        if err != nil {
19,331✔
436
                return nil, err
349✔
437
        }
349✔
438

439
        v, err := vref.Resolve()
18,633✔
440
        if err != nil {
18,633✔
441
                return nil, err
×
442
        }
×
443

444
        valuesByPosition := make([]TypedValue, len(r.colsByPos))
18,633✔
445
        valuesBySelector := make(map[string]TypedValue, len(r.colsBySel))
18,633✔
446

18,633✔
447
        for i, col := range r.colsByPos {
106,898✔
448
                var val TypedValue
88,265✔
449

88,265✔
450
                if col.Column == revCol {
88,285✔
451
                        val = &Integer{val: int64(vref.HC())}
20✔
452
                } else {
88,265✔
453
                        val = &NullValue{t: col.Type}
88,245✔
454
                }
88,245✔
455

456
                valuesByPosition[i] = val
88,265✔
457
                valuesBySelector[col.Selector()] = val
88,265✔
458
        }
459

460
        if len(v) < EncLenLen {
18,633✔
461
                return nil, ErrCorruptedData
×
462
        }
×
463

464
        voff := 0
18,633✔
465

18,633✔
466
        cols := int(binary.BigEndian.Uint32(v[voff:]))
18,633✔
467
        voff += EncLenLen
18,633✔
468

18,633✔
469
        for i, pos := 0, 0; i < cols; i++ {
106,635✔
470
                if len(v) < EncIDLen {
88,002✔
471
                        return nil, ErrCorruptedData
×
472
                }
×
473

474
                colID := binary.BigEndian.Uint32(v[voff:])
88,002✔
475
                voff += EncIDLen
88,002✔
476

88,002✔
477
                col, err := r.table.GetColumnByID(colID)
88,002✔
478
                if errors.Is(err, ErrColumnDoesNotExist) && colID <= r.table.maxColID {
88,027✔
479
                        // Dropped column, skip it
25✔
480
                        vlen, n, err := DecodeValueLength(v[voff:])
25✔
481
                        if err != nil {
25✔
482
                                return nil, err
×
483
                        }
×
484
                        voff += n + vlen
25✔
485

25✔
486
                        continue
25✔
487
                }
488
                if err != nil {
87,977✔
489
                        return nil, ErrCorruptedData
×
490
                }
×
491

492
                val, n, err := DecodeValue(v[voff:], col.colType)
87,977✔
493
                if err != nil {
87,977✔
494
                        return nil, err
×
495
                }
×
496

497
                voff += n
87,977✔
498

87,977✔
499
                // make sure value is inserted in the correct position
87,977✔
500
                for pos < len(r.table.cols) && r.table.cols[pos].id < colID {
88,028✔
501
                        pos++
51✔
502
                }
51✔
503

504
                if pos == len(r.table.cols) || r.table.cols[pos].id != colID {
87,977✔
NEW
505
                        return nil, ErrCorruptedData
×
NEW
506
                }
×
507

508
                if r.scanSpecs.IncludeHistory {
88,017✔
509
                        valuesByPosition[pos+1] = val
40✔
510
                } else {
87,977✔
511
                        valuesByPosition[pos] = val
87,937✔
512
                }
87,937✔
513
                pos++
87,977✔
514

87,977✔
515
                valuesBySelector[EncodeSelector("", r.tableAlias, col.colName)] = val
87,977✔
516
        }
517

518
        if len(v)-voff > 0 {
18,633✔
519
                return nil, ErrCorruptedData
×
520
        }
×
521

522
        return &Row{ValuesByPosition: valuesByPosition, ValuesBySelector: valuesBySelector}, nil
18,633✔
523
}
524

525
func (r *rawRowReader) Close() error {
647✔
526
        if r.onCloseCallback != nil {
946✔
527
                defer r.onCloseCallback()
299✔
528
        }
299✔
529

530
        return r.reader.Close()
647✔
531
}
532

533
func ReadAllRows(ctx context.Context, reader RowReader) ([]*Row, error) {
39✔
534
        var rows []*Row
39✔
535
        err := ReadRowsBatch(ctx, reader, 100, func(rowBatch []*Row) error {
219✔
536
                if rows == nil {
216✔
537
                        rows = make([]*Row, 0, len(rowBatch))
36✔
538
                }
36✔
539
                rows = append(rows, rowBatch...)
180✔
540
                return nil
180✔
541
        })
542
        return rows, err
39✔
543
}
544

545
func ReadRowsBatch(ctx context.Context, reader RowReader, batchSize int, onBatch func([]*Row) error) error {
104✔
546
        rows := make([]*Row, batchSize)
104✔
547

104✔
548
        hasMoreRows := true
104✔
549
        for hasMoreRows {
369✔
550
                n, err := readNRows(ctx, reader, batchSize, rows)
265✔
551

265✔
552
                if n > 0 {
519✔
553
                        if err := onBatch(rows[:n]); err != nil {
254✔
554
                                return err
×
555
                        }
×
556
                }
557

558
                hasMoreRows = !errors.Is(err, ErrNoMoreRows)
265✔
559
                if err != nil && hasMoreRows {
268✔
560
                        return err
3✔
561
                }
3✔
562
        }
563
        return nil
101✔
564
}
565

566
func readNRows(ctx context.Context, reader RowReader, n int, outRows []*Row) (int, error) {
265✔
567
        for i := 0; i < n; i++ {
16,484✔
568
                r, err := reader.Read(ctx)
16,219✔
569
                if err != nil {
16,323✔
570
                        return i, err
104✔
571
                }
104✔
572
                outRows[i] = r
16,115✔
573
        }
574
        return n, nil
161✔
575
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc