• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6458421232

09 Oct 2023 03:04PM UTC coverage: 89.499% (+0.2%) from 89.257%
6458421232

push

gh-ci

jeroiraz
test(embedded/tbtree): nodeRef coverage

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

33451 of 37376 relevant lines covered (89.5%)

144180.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.24
/embedded/sql/row_reader.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "math"
25

26
        "github.com/codenotary/immudb/embedded/store"
27
)
28

29
type RowReader interface {
30
        Tx() *SQLTx
31
        TableAlias() string
32
        Parameters() map[string]interface{}
33
        Read(ctx context.Context) (*Row, error)
34
        Close() error
35
        Columns(ctx context.Context) ([]ColDescriptor, error)
36
        OrderBy() []ColDescriptor
37
        ScanSpecs() *ScanSpecs
38
        InferParameters(ctx context.Context, params map[string]SQLValueType) error
39
        colsBySelector(ctx context.Context) (map[string]ColDescriptor, error)
40
        onClose(func())
41
}
42

43
type ScanSpecs struct {
44
        Index          *Index
45
        rangesByColID  map[uint32]*typedValueRange
46
        IncludeHistory bool
47
        DescOrder      bool
48
}
49

50
type Row struct {
51
        ValuesByPosition []TypedValue
52
        ValuesBySelector map[string]TypedValue
53
}
54

55
// rows are selector-compatible if both rows have the same assigned value for all specified selectors
56
func (row *Row) compatible(aRow *Row, selectors []*ColSelector, table string) (bool, error) {
265✔
57
        for _, sel := range selectors {
308✔
58
                c := EncodeSelector(sel.resolve(table))
43✔
59

43✔
60
                val1, ok := row.ValuesBySelector[c]
43✔
61
                if !ok {
43✔
62
                        return false, ErrInvalidColumn
×
63
                }
×
64

65
                val2, ok := aRow.ValuesBySelector[c]
43✔
66
                if !ok {
43✔
67
                        return false, ErrInvalidColumn
×
68
                }
×
69

70
                cmp, err := val1.Compare(val2)
43✔
71
                if err != nil {
43✔
72
                        return false, err
×
73
                }
×
74

75
                if cmp != 0 {
47✔
76
                        return false, nil
4✔
77
                }
4✔
78
        }
79

80
        return true, nil
261✔
81
}
82

83
func (row *Row) digest(cols []ColDescriptor) (d [sha256.Size]byte, err error) {
65✔
84
        h := sha256.New()
65✔
85

65✔
86
        for i, v := range row.ValuesByPosition {
133✔
87
                var b [4]byte
68✔
88
                binary.BigEndian.PutUint32(b[:], uint32(i))
68✔
89
                h.Write(b[:])
68✔
90

68✔
91
                _, isNull := v.(*NullValue)
68✔
92
                if isNull {
70✔
93
                        continue
2✔
94
                }
95

96
                encVal, err := EncodeValue(v, v.Type(), 0)
66✔
97
                if err != nil {
66✔
98
                        return d, err
×
99
                }
×
100

101
                h.Write(encVal)
66✔
102
        }
103

104
        copy(d[:], h.Sum(nil))
65✔
105

65✔
106
        return
65✔
107
}
108

109
type rawRowReader struct {
110
        tx         *SQLTx
111
        table      *Table
112
        tableAlias string
113
        colsByPos  []ColDescriptor
114
        colsBySel  map[string]ColDescriptor
115
        scanSpecs  *ScanSpecs
116

117
        // defines a sub-range a transactions based on a combination of tx IDs and timestamps
118
        // the query is resolved only taking into consideration that range of transactioins
119
        period period
120

121
        // underlying store supports reading entries within a range of txs
122
        // the range is calculated based on the period stmt, which is included here to support
123
        // lazy evaluation when parameters are available
124
        txRange *txRange
125

126
        params map[string]interface{}
127

128
        reader          store.KeyReader
129
        onCloseCallback func()
130
}
131

132
type txRange struct {
133
        initialTxID uint64
134
        finalTxID   uint64
135
}
136

137
type ColDescriptor struct {
138
        AggFn  string
139
        Table  string
140
        Column string
141
        Type   SQLValueType
142
}
143

144
func (d *ColDescriptor) Selector() string {
9,769✔
145
        return EncodeSelector(d.AggFn, d.Table, d.Column)
9,769✔
146
}
9,769✔
147

148
func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, period period, tableAlias string, scanSpecs *ScanSpecs) (*rawRowReader, error) {
592✔
149
        if table == nil || scanSpecs == nil || scanSpecs.Index == nil {
592✔
150
                return nil, ErrIllegalArguments
×
151
        }
×
152

153
        rSpec, err := keyReaderSpecFrom(tx.engine.prefix, table, scanSpecs)
592✔
154
        if err != nil {
592✔
155
                return nil, err
×
156
        }
×
157

158
        r, err := tx.newKeyReader(*rSpec)
592✔
159
        if err != nil {
592✔
160
                return nil, err
×
161
        }
×
162

163
        if tableAlias == "" {
1,066✔
164
                tableAlias = table.name
474✔
165
        }
474✔
166

167
        var colsByPos []ColDescriptor
592✔
168
        var colsBySel map[string]ColDescriptor
592✔
169

592✔
170
        var off int
592✔
171

592✔
172
        if scanSpecs.IncludeHistory {
594✔
173
                colsByPos = make([]ColDescriptor, 1+len(table.cols))
2✔
174
                colsBySel = make(map[string]ColDescriptor, 1+len(table.cols))
2✔
175

2✔
176
                colDescriptor := ColDescriptor{
2✔
177
                        Table:  tableAlias,
2✔
178
                        Column: revCol,
2✔
179
                        Type:   IntegerType,
2✔
180
                }
2✔
181

2✔
182
                colsByPos[0] = colDescriptor
2✔
183
                colsBySel[colDescriptor.Selector()] = colDescriptor
2✔
184

2✔
185
                off = 1
2✔
186
        } else {
592✔
187
                colsByPos = make([]ColDescriptor, len(table.cols))
590✔
188
                colsBySel = make(map[string]ColDescriptor, len(table.cols))
590✔
189
        }
590✔
190

191
        for i, c := range table.cols {
2,512✔
192
                colDescriptor := ColDescriptor{
1,920✔
193
                        Table:  tableAlias,
1,920✔
194
                        Column: c.colName,
1,920✔
195
                        Type:   c.colType,
1,920✔
196
                }
1,920✔
197

1,920✔
198
                colsByPos[off+i] = colDescriptor
1,920✔
199
                colsBySel[colDescriptor.Selector()] = colDescriptor
1,920✔
200
        }
1,920✔
201

202
        return &rawRowReader{
592✔
203
                tx:         tx,
592✔
204
                table:      table,
592✔
205
                period:     period,
592✔
206
                tableAlias: tableAlias,
592✔
207
                colsByPos:  colsByPos,
592✔
208
                colsBySel:  colsBySel,
592✔
209
                scanSpecs:  scanSpecs,
592✔
210
                params:     params,
592✔
211
                reader:     r,
592✔
212
        }, nil
592✔
213
}
214

215
func keyReaderSpecFrom(sqlPrefix []byte, table *Table, scanSpecs *ScanSpecs) (spec *store.KeyReaderSpec, err error) {
594✔
216
        prefix := MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(scanSpecs.Index.id))
594✔
217

594✔
218
        var loKey []byte
594✔
219
        var loKeyReady bool
594✔
220

594✔
221
        var hiKey []byte
594✔
222
        var hiKeyReady bool
594✔
223

594✔
224
        loKey = make([]byte, len(prefix))
594✔
225
        copy(loKey, prefix)
594✔
226

594✔
227
        hiKey = make([]byte, len(prefix))
594✔
228
        copy(hiKey, prefix)
594✔
229

594✔
230
        // seekKey and endKey in the loop below are scan prefixes for beginning
594✔
231
        // and end of the index scanning range. On each index we try to make them more
594✔
232
        // concrete.
594✔
233
        for _, col := range scanSpecs.Index.cols {
1,185✔
234
                colRange, ok := scanSpecs.rangesByColID[col.id]
591✔
235
                if !ok {
1,052✔
236
                        break
461✔
237
                }
238

239
                if !hiKeyReady {
260✔
240
                        if colRange.hRange == nil {
141✔
241
                                hiKeyReady = true
11✔
242
                        } else {
130✔
243
                                encVal, _, err := EncodeValueAsKey(colRange.hRange.val, col.colType, col.MaxLen())
119✔
244
                                if err != nil {
120✔
245
                                        return nil, err
1✔
246
                                }
1✔
247
                                hiKey = append(hiKey, encVal...)
118✔
248
                        }
249
                }
250

251
                if !loKeyReady {
258✔
252
                        if colRange.lRange == nil {
142✔
253
                                loKeyReady = true
13✔
254
                        } else {
129✔
255
                                encVal, _, err := EncodeValueAsKey(colRange.lRange.val, col.colType, col.MaxLen())
116✔
256
                                if err != nil {
117✔
257
                                        return nil, err
1✔
258
                                }
1✔
259
                                loKey = append(loKey, encVal...)
115✔
260
                        }
261
                }
262
        }
263

264
        // Ensure the hiKey is inclusive regarding all values with that prefix
265
        hiKey = append(hiKey, KeyValPrefixUpperBound)
592✔
266

592✔
267
        seekKey := loKey
592✔
268
        endKey := hiKey
592✔
269

592✔
270
        if scanSpecs.DescOrder {
636✔
271
                seekKey, endKey = endKey, seekKey
44✔
272
        }
44✔
273

274
        return &store.KeyReaderSpec{
592✔
275
                SeekKey:        seekKey,
592✔
276
                InclusiveSeek:  true,
592✔
277
                EndKey:         endKey,
592✔
278
                InclusiveEnd:   true,
592✔
279
                Prefix:         prefix,
592✔
280
                DescOrder:      scanSpecs.DescOrder,
592✔
281
                Filters:        []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
592✔
282
                IncludeHistory: scanSpecs.IncludeHistory,
592✔
283
        }, nil
592✔
284
}
285

286
func (r *rawRowReader) onClose(callback func()) {
266✔
287
        r.onCloseCallback = callback
266✔
288
}
266✔
289

290
func (r *rawRowReader) Tx() *SQLTx {
1,531✔
291
        return r.tx
1,531✔
292
}
1,531✔
293

294
func (r *rawRowReader) TableAlias() string {
4,851✔
295
        return r.tableAlias
4,851✔
296
}
4,851✔
297

298
func (r *rawRowReader) OrderBy() []ColDescriptor {
33✔
299
        cols := make([]ColDescriptor, len(r.scanSpecs.Index.cols))
33✔
300

33✔
301
        for i, col := range r.scanSpecs.Index.cols {
69✔
302
                cols[i] = ColDescriptor{
36✔
303
                        Table:  r.tableAlias,
36✔
304
                        Column: col.colName,
36✔
305
                        Type:   col.colType,
36✔
306
                }
36✔
307
        }
36✔
308

309
        return cols
33✔
310
}
311

312
func (r *rawRowReader) ScanSpecs() *ScanSpecs {
49✔
313
        return r.scanSpecs
49✔
314
}
49✔
315

316
func (r *rawRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
183✔
317
        ret := make([]ColDescriptor, len(r.colsByPos))
183✔
318
        copy(ret, r.colsByPos)
183✔
319
        return ret, nil
183✔
320
}
183✔
321

322
func (r *rawRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
239✔
323
        ret := make(map[string]ColDescriptor, len(r.colsBySel))
239✔
324
        for sel := range r.colsBySel {
1,005✔
325
                ret[sel] = r.colsBySel[sel]
766✔
326
        }
766✔
327
        return ret, nil
239✔
328
}
329

330
func (r *rawRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
49✔
331
        cols, err := r.colsBySelector(ctx)
49✔
332
        if err != nil {
49✔
333
                return err
×
334
        }
×
335

336
        if r.period.start != nil {
59✔
337
                _, err = r.period.start.instant.exp.inferType(cols, params, r.TableAlias())
10✔
338
                if err != nil {
10✔
339
                        return err
×
340
                }
×
341
        }
342

343
        if r.period.end != nil {
59✔
344
                _, err = r.period.end.instant.exp.inferType(cols, params, r.TableAlias())
10✔
345
                if err != nil {
10✔
346
                        return err
×
347
                }
×
348
        }
349

350
        return nil
49✔
351
}
352

353
func (r *rawRowReader) Parameters() map[string]interface{} {
1,441✔
354
        return r.params
1,441✔
355
}
1,441✔
356

357
func (r *rawRowReader) reduceTxRange() (err error) {
2,470✔
358
        if r.txRange != nil || (r.period.start == nil && r.period.end == nil) {
4,871✔
359
                return nil
2,401✔
360
        }
2,401✔
361

362
        txRange := &txRange{
69✔
363
                initialTxID: uint64(0),
69✔
364
                finalTxID:   uint64(math.MaxUint64),
69✔
365
        }
69✔
366

69✔
367
        if r.period.start != nil {
123✔
368
                txRange.initialTxID, err = r.period.start.instant.resolve(r.tx, r.params, true, r.period.start.inclusive)
54✔
369
                if errors.Is(err, store.ErrTxNotFound) {
65✔
370
                        txRange.initialTxID = uint64(math.MaxUint64)
11✔
371
                }
11✔
372
                if err != nil && err != store.ErrTxNotFound {
61✔
373
                        return err
7✔
374
                }
7✔
375
        }
376

377
        if r.period.end != nil {
89✔
378
                txRange.finalTxID, err = r.period.end.instant.resolve(r.tx, r.params, false, r.period.end.inclusive)
27✔
379
                if errors.Is(err, store.ErrTxNotFound) {
27✔
380
                        txRange.finalTxID = uint64(0)
×
381
                }
×
382
                if err != nil && err != store.ErrTxNotFound {
30✔
383
                        return err
3✔
384
                }
3✔
385
        }
386

387
        r.txRange = txRange
59✔
388

59✔
389
        return nil
59✔
390
}
391

392
func (r *rawRowReader) Read(ctx context.Context) (row *Row, err error) {
2,470✔
393
        if ctx.Err() != nil {
2,470✔
394
                return nil, err
×
395
        }
×
396

397
        //var mkey []byte
398
        var vref store.ValueRef
2,470✔
399

2,470✔
400
        // evaluation of txRange is postponed to allow parameters to be provided after rowReader initialization
2,470✔
401
        err = r.reduceTxRange()
2,470✔
402
        if err != nil {
2,480✔
403
                return nil, err
10✔
404
        }
10✔
405

406
        if r.txRange == nil {
4,828✔
407
                _, vref, err = r.reader.Read(ctx) //mkey
2,368✔
408
        } else {
2,460✔
409
                _, vref, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID) //mkey
92✔
410
        }
92✔
411
        if err != nil {
2,772✔
412
                return nil, err
312✔
413
        }
312✔
414

415
        v, err := vref.Resolve()
2,148✔
416
        if err != nil {
2,148✔
417
                return nil, err
×
418
        }
×
419

420
        valuesByPosition := make([]TypedValue, len(r.colsByPos))
2,148✔
421
        valuesBySelector := make(map[string]TypedValue, len(r.colsBySel))
2,148✔
422

2,148✔
423
        for i, col := range r.colsByPos {
8,971✔
424
                var val TypedValue
6,823✔
425

6,823✔
426
                if col.Column == revCol {
6,843✔
427
                        val = &Integer{val: int64(vref.HC())}
20✔
428
                } else {
6,823✔
429
                        val = &NullValue{t: col.Type}
6,803✔
430
                }
6,803✔
431

432
                valuesByPosition[i] = val
6,823✔
433
                valuesBySelector[col.Selector()] = val
6,823✔
434
        }
435

436
        if len(v) < EncLenLen {
2,148✔
437
                return nil, ErrCorruptedData
×
438
        }
×
439

440
        voff := 0
2,148✔
441

2,148✔
442
        cols := int(binary.BigEndian.Uint32(v[voff:]))
2,148✔
443
        voff += EncLenLen
2,148✔
444

2,148✔
445
        for i, pos := 0, 0; i < cols; i++ {
8,718✔
446
                if len(v) < EncIDLen {
6,570✔
447
                        return nil, ErrCorruptedData
×
448
                }
×
449

450
                colID := binary.BigEndian.Uint32(v[voff:])
6,570✔
451
                voff += EncIDLen
6,570✔
452

6,570✔
453
                col, err := r.table.GetColumnByID(colID)
6,570✔
454
                if errors.Is(err, ErrColumnDoesNotExist) && colID <= r.table.maxColID {
6,595✔
455
                        // Dropped column, skip it
25✔
456
                        vlen, n, err := DecodeValueLength(v[voff:])
25✔
457
                        if err != nil {
25✔
458
                                return nil, err
×
459
                        }
×
460
                        voff += n + vlen
25✔
461

25✔
462
                        continue
25✔
463
                }
464
                if err != nil {
6,545✔
465
                        return nil, ErrCorruptedData
×
466
                }
×
467

468
                val, n, err := DecodeValue(v[voff:], col.colType)
6,545✔
469
                if err != nil {
6,545✔
470
                        return nil, err
×
471
                }
×
472

473
                voff += n
6,545✔
474

6,545✔
475
                valuesByPosition[pos] = val
6,545✔
476
                pos++
6,545✔
477

6,545✔
478
                valuesBySelector[EncodeSelector("", r.tableAlias, col.colName)] = val
6,545✔
479
        }
480

481
        if len(v)-voff > 0 {
2,148✔
482
                return nil, ErrCorruptedData
×
483
        }
×
484

485
        return &Row{ValuesByPosition: valuesByPosition, ValuesBySelector: valuesBySelector}, nil
2,148✔
486
}
487

488
func (r *rawRowReader) Close() error {
592✔
489
        if r.onCloseCallback != nil {
852✔
490
                defer r.onCloseCallback()
260✔
491
        }
260✔
492

493
        return r.reader.Close()
592✔
494
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc