• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6407416232

04 Oct 2023 02:25PM UTC coverage: 89.271% (-0.3%) from 89.573%
6407416232

push

gh-ci

jeroiraz
test(embedded/document): adjust unit test to reproduce issue when removing and adding field

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

33630 of 37672 relevant lines covered (89.27%)

142446.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.25
/embedded/sql/row_reader.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "math"
25

26
        "github.com/codenotary/immudb/embedded/store"
27
)
28

29
type RowReader interface {
30
        Tx() *SQLTx
31
        TableAlias() string
32
        Parameters() map[string]interface{}
33
        Read(ctx context.Context) (*Row, error)
34
        Close() error
35
        Columns(ctx context.Context) ([]ColDescriptor, error)
36
        OrderBy() []ColDescriptor
37
        ScanSpecs() *ScanSpecs
38
        InferParameters(ctx context.Context, params map[string]SQLValueType) error
39
        colsBySelector(ctx context.Context) (map[string]ColDescriptor, error)
40
        onClose(func())
41
}
42

43
type ScanSpecs struct {
44
        Index         *Index
45
        rangesByColID map[uint32]*typedValueRange
46
        DescOrder     bool
47
}
48

49
type Row struct {
50
        ValuesByPosition []TypedValue
51
        ValuesBySelector map[string]TypedValue
52
}
53

54
// rows are selector-compatible if both rows have the same assigned value for all specified selectors
55
func (row *Row) compatible(aRow *Row, selectors []*ColSelector, table string) (bool, error) {
265✔
56
        for _, sel := range selectors {
308✔
57
                c := EncodeSelector(sel.resolve(table))
43✔
58

43✔
59
                val1, ok := row.ValuesBySelector[c]
43✔
60
                if !ok {
43✔
61
                        return false, ErrInvalidColumn
×
62
                }
×
63

64
                val2, ok := aRow.ValuesBySelector[c]
43✔
65
                if !ok {
43✔
66
                        return false, ErrInvalidColumn
×
67
                }
×
68

69
                cmp, err := val1.Compare(val2)
43✔
70
                if err != nil {
43✔
71
                        return false, err
×
72
                }
×
73

74
                if cmp != 0 {
47✔
75
                        return false, nil
4✔
76
                }
4✔
77
        }
78

79
        return true, nil
261✔
80
}
81

82
func (row *Row) digest(cols []ColDescriptor) (d [sha256.Size]byte, err error) {
65✔
83
        h := sha256.New()
65✔
84

65✔
85
        for i, v := range row.ValuesByPosition {
133✔
86
                var b [4]byte
68✔
87
                binary.BigEndian.PutUint32(b[:], uint32(i))
68✔
88
                h.Write(b[:])
68✔
89

68✔
90
                _, isNull := v.(*NullValue)
68✔
91
                if isNull {
70✔
92
                        continue
2✔
93
                }
94

95
                encVal, err := EncodeValue(v, v.Type(), 0)
66✔
96
                if err != nil {
66✔
97
                        return d, err
×
98
                }
×
99

100
                h.Write(encVal)
66✔
101
        }
102

103
        copy(d[:], h.Sum(nil))
65✔
104

65✔
105
        return
65✔
106
}
107

108
type rawRowReader struct {
109
        tx         *SQLTx
110
        table      *Table
111
        tableAlias string
112
        colsByPos  []ColDescriptor
113
        colsBySel  map[string]ColDescriptor
114
        scanSpecs  *ScanSpecs
115

116
        // defines a sub-range a transactions based on a combination of tx IDs and timestamps
117
        // the query is resolved only taking into consideration that range of transactioins
118
        period period
119

120
        // underlying store supports reading entries within a range of txs
121
        // the range is calculated based on the period stmt, which is included here to support
122
        // lazy evaluation when parameters are available
123
        txRange *txRange
124

125
        params map[string]interface{}
126

127
        reader          store.KeyReader
128
        onCloseCallback func()
129
}
130

131
// txRange is a pair of transaction IDs; rawRowReader passes them, in this
// order, to the key reader's ReadBetween.
type txRange struct {
	// initialTxID is the first bound of the range.
	initialTxID uint64

	// finalTxID is the second bound of the range.
	finalTxID uint64
}
135

136
type ColDescriptor struct {
137
        AggFn  string
138
        Table  string
139
        Column string
140
        Type   SQLValueType
141
}
142

143
func (d *ColDescriptor) Selector() string {
2,920✔
144
        return EncodeSelector(d.AggFn, d.Table, d.Column)
2,920✔
145
}
2,920✔
146

147
func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, period period, tableAlias string, scanSpecs *ScanSpecs) (*rawRowReader, error) {
580✔
148
        if table == nil || scanSpecs == nil || scanSpecs.Index == nil {
580✔
149
                return nil, ErrIllegalArguments
×
150
        }
×
151

152
        rSpec, err := keyReaderSpecFrom(tx.engine.prefix, table, scanSpecs)
580✔
153
        if err != nil {
580✔
154
                return nil, err
×
155
        }
×
156

157
        r, err := tx.newKeyReader(*rSpec)
580✔
158
        if err != nil {
580✔
159
                return nil, err
×
160
        }
×
161

162
        if tableAlias == "" {
1,052✔
163
                tableAlias = table.name
472✔
164
        }
472✔
165

166
        colsByPos := make([]ColDescriptor, len(table.cols))
580✔
167
        colsBySel := make(map[string]ColDescriptor, len(table.cols))
580✔
168

580✔
169
        for i, c := range table.cols {
2,476✔
170
                colDescriptor := ColDescriptor{
1,896✔
171
                        Table:  tableAlias,
1,896✔
172
                        Column: c.colName,
1,896✔
173
                        Type:   c.colType,
1,896✔
174
                }
1,896✔
175

1,896✔
176
                colsByPos[i] = colDescriptor
1,896✔
177
                colsBySel[colDescriptor.Selector()] = colDescriptor
1,896✔
178
        }
1,896✔
179

180
        return &rawRowReader{
580✔
181
                tx:         tx,
580✔
182
                table:      table,
580✔
183
                period:     period,
580✔
184
                tableAlias: tableAlias,
580✔
185
                colsByPos:  colsByPos,
580✔
186
                colsBySel:  colsBySel,
580✔
187
                scanSpecs:  scanSpecs,
580✔
188
                params:     params,
580✔
189
                reader:     r,
580✔
190
        }, nil
580✔
191
}
192

193
func keyReaderSpecFrom(sqlPrefix []byte, table *Table, scanSpecs *ScanSpecs) (spec *store.KeyReaderSpec, err error) {
582✔
194
        prefix := MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(scanSpecs.Index.id))
582✔
195

582✔
196
        var loKey []byte
582✔
197
        var loKeyReady bool
582✔
198

582✔
199
        var hiKey []byte
582✔
200
        var hiKeyReady bool
582✔
201

582✔
202
        loKey = make([]byte, len(prefix))
582✔
203
        copy(loKey, prefix)
582✔
204

582✔
205
        hiKey = make([]byte, len(prefix))
582✔
206
        copy(hiKey, prefix)
582✔
207

582✔
208
        // seekKey and endKey in the loop below are scan prefixes for beginning
582✔
209
        // and end of the index scanning range. On each index we try to make them more
582✔
210
        // concrete.
582✔
211
        for _, col := range scanSpecs.Index.cols {
1,161✔
212
                colRange, ok := scanSpecs.rangesByColID[col.id]
579✔
213
                if !ok {
1,038✔
214
                        break
459✔
215
                }
216

217
                if !hiKeyReady {
240✔
218
                        if colRange.hRange == nil {
131✔
219
                                hiKeyReady = true
11✔
220
                        } else {
120✔
221
                                encVal, _, err := EncodeValueAsKey(colRange.hRange.val, col.colType, col.MaxLen())
109✔
222
                                if err != nil {
110✔
223
                                        return nil, err
1✔
224
                                }
1✔
225
                                hiKey = append(hiKey, encVal...)
108✔
226
                        }
227
                }
228

229
                if !loKeyReady {
238✔
230
                        if colRange.lRange == nil {
132✔
231
                                loKeyReady = true
13✔
232
                        } else {
119✔
233
                                encVal, _, err := EncodeValueAsKey(colRange.lRange.val, col.colType, col.MaxLen())
106✔
234
                                if err != nil {
107✔
235
                                        return nil, err
1✔
236
                                }
1✔
237
                                loKey = append(loKey, encVal...)
105✔
238
                        }
239
                }
240
        }
241

242
        // Ensure the hiKey is inclusive regarding all values with that prefix
243
        hiKey = append(hiKey, KeyValPrefixUpperBound)
580✔
244

580✔
245
        seekKey := loKey
580✔
246
        endKey := hiKey
580✔
247

580✔
248
        if scanSpecs.DescOrder {
623✔
249
                seekKey, endKey = endKey, seekKey
43✔
250
        }
43✔
251

252
        return &store.KeyReaderSpec{
580✔
253
                SeekKey:       seekKey,
580✔
254
                InclusiveSeek: true,
580✔
255
                EndKey:        endKey,
580✔
256
                InclusiveEnd:  true,
580✔
257
                Prefix:        prefix,
580✔
258
                DescOrder:     scanSpecs.DescOrder,
580✔
259
                Filters:       []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
580✔
260
        }, nil
580✔
261
}
262

263
func (r *rawRowReader) onClose(callback func()) {
264✔
264
        r.onCloseCallback = callback
264✔
265
}
264✔
266

267
func (r *rawRowReader) Tx() *SQLTx {
1,531✔
268
        return r.tx
1,531✔
269
}
1,531✔
270

271
func (r *rawRowReader) TableAlias() string {
4,811✔
272
        return r.tableAlias
4,811✔
273
}
4,811✔
274

275
func (r *rawRowReader) OrderBy() []ColDescriptor {
33✔
276
        cols := make([]ColDescriptor, len(r.scanSpecs.Index.cols))
33✔
277

33✔
278
        for i, col := range r.scanSpecs.Index.cols {
69✔
279
                cols[i] = ColDescriptor{
36✔
280
                        Table:  r.tableAlias,
36✔
281
                        Column: col.colName,
36✔
282
                        Type:   col.colType,
36✔
283
                }
36✔
284
        }
36✔
285

286
        return cols
33✔
287
}
288

289
func (r *rawRowReader) ScanSpecs() *ScanSpecs {
49✔
290
        return r.scanSpecs
49✔
291
}
49✔
292

293
func (r *rawRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
183✔
294
        ret := make([]ColDescriptor, len(r.colsByPos))
183✔
295
        copy(ret, r.colsByPos)
183✔
296
        return ret, nil
183✔
297
}
183✔
298

299
func (r *rawRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
239✔
300
        ret := make(map[string]ColDescriptor, len(r.colsBySel))
239✔
301
        for sel := range r.colsBySel {
1,005✔
302
                ret[sel] = r.colsBySel[sel]
766✔
303
        }
766✔
304
        return ret, nil
239✔
305
}
306

307
func (r *rawRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
49✔
308
        cols, err := r.colsBySelector(ctx)
49✔
309
        if err != nil {
49✔
310
                return err
×
311
        }
×
312

313
        if r.period.start != nil {
59✔
314
                _, err = r.period.start.instant.exp.inferType(cols, params, r.TableAlias())
10✔
315
                if err != nil {
10✔
316
                        return err
×
317
                }
×
318
        }
319

320
        if r.period.end != nil {
59✔
321
                _, err = r.period.end.instant.exp.inferType(cols, params, r.TableAlias())
10✔
322
                if err != nil {
10✔
323
                        return err
×
324
                }
×
325
        }
326

327
        return nil
49✔
328
}
329

330
func (r *rawRowReader) Parameters() map[string]interface{} {
1,441✔
331
        return r.params
1,441✔
332
}
1,441✔
333

334
func (r *rawRowReader) reduceTxRange() (err error) {
2,438✔
335
        if r.txRange != nil || (r.period.start == nil && r.period.end == nil) {
4,807✔
336
                return nil
2,369✔
337
        }
2,369✔
338

339
        txRange := &txRange{
69✔
340
                initialTxID: uint64(0),
69✔
341
                finalTxID:   uint64(math.MaxUint64),
69✔
342
        }
69✔
343

69✔
344
        if r.period.start != nil {
123✔
345
                txRange.initialTxID, err = r.period.start.instant.resolve(r.tx, r.params, true, r.period.start.inclusive)
54✔
346
                if errors.Is(err, store.ErrTxNotFound) {
65✔
347
                        txRange.initialTxID = uint64(math.MaxUint64)
11✔
348
                }
11✔
349
                if err != nil && err != store.ErrTxNotFound {
61✔
350
                        return err
7✔
351
                }
7✔
352
        }
353

354
        if r.period.end != nil {
89✔
355
                txRange.finalTxID, err = r.period.end.instant.resolve(r.tx, r.params, false, r.period.end.inclusive)
27✔
356
                if errors.Is(err, store.ErrTxNotFound) {
27✔
357
                        txRange.finalTxID = uint64(0)
×
358
                }
×
359
                if err != nil && err != store.ErrTxNotFound {
30✔
360
                        return err
3✔
361
                }
3✔
362
        }
363

364
        r.txRange = txRange
59✔
365

59✔
366
        return nil
59✔
367
}
368

369
func (r *rawRowReader) Read(ctx context.Context) (row *Row, err error) {
2,438✔
370
        if ctx.Err() != nil {
2,438✔
371
                return nil, err
×
372
        }
×
373

374
        //var mkey []byte
375
        var vref store.ValueRef
2,438✔
376

2,438✔
377
        // evaluation of txRange is postponed to allow parameters to be provided after rowReader initialization
2,438✔
378
        err = r.reduceTxRange()
2,438✔
379
        if err != nil {
2,448✔
380
                return nil, err
10✔
381
        }
10✔
382

383
        if r.txRange == nil {
4,764✔
384
                _, vref, err = r.reader.Read(ctx) //mkey
2,336✔
385
        } else {
2,428✔
386
                _, vref, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID) //mkey
92✔
387
        }
92✔
388
        if err != nil {
2,737✔
389
                return nil, err
309✔
390
        }
309✔
391

392
        v, err := vref.Resolve()
2,119✔
393
        if err != nil {
2,119✔
394
                return nil, err
×
395
        }
×
396

397
        valuesByPosition := make([]TypedValue, len(r.table.cols))
2,119✔
398
        valuesBySelector := make(map[string]TypedValue, len(r.table.cols))
2,119✔
399

2,119✔
400
        for i, col := range r.table.cols {
8,864✔
401
                v := &NullValue{t: col.colType}
6,745✔
402

6,745✔
403
                valuesByPosition[i] = v
6,745✔
404
                valuesBySelector[EncodeSelector("", r.tableAlias, col.colName)] = v
6,745✔
405
        }
6,745✔
406

407
        if len(v) < EncLenLen {
2,119✔
408
                return nil, ErrCorruptedData
×
409
        }
×
410

411
        voff := 0
2,119✔
412

2,119✔
413
        cols := int(binary.BigEndian.Uint32(v[voff:]))
2,119✔
414
        voff += EncLenLen
2,119✔
415

2,119✔
416
        for i, pos := 0, 0; i < cols; i++ {
8,631✔
417
                if len(v) < EncIDLen {
6,512✔
418
                        return nil, ErrCorruptedData
×
419
                }
×
420

421
                colID := binary.BigEndian.Uint32(v[voff:])
6,512✔
422
                voff += EncIDLen
6,512✔
423

6,512✔
424
                col, err := r.table.GetColumnByID(colID)
6,512✔
425
                if errors.Is(err, ErrColumnDoesNotExist) && colID <= r.table.maxColID {
6,537✔
426
                        // Dropped column, skip it
25✔
427
                        vlen, n, err := DecodeValueLength(v[voff:])
25✔
428
                        if err != nil {
25✔
429
                                return nil, err
×
430
                        }
×
431
                        voff += n + vlen
25✔
432

25✔
433
                        continue
25✔
434
                }
435
                if err != nil {
6,487✔
436
                        return nil, ErrCorruptedData
×
437
                }
×
438

439
                val, n, err := DecodeValue(v[voff:], col.colType)
6,487✔
440
                if err != nil {
6,487✔
441
                        return nil, err
×
442
                }
×
443

444
                voff += n
6,487✔
445

6,487✔
446
                valuesByPosition[pos] = val
6,487✔
447
                pos++
6,487✔
448

6,487✔
449
                valuesBySelector[EncodeSelector("", r.tableAlias, col.colName)] = val
6,487✔
450
        }
451

452
        if len(v)-voff > 0 {
2,119✔
453
                return nil, ErrCorruptedData
×
454
        }
×
455

456
        return &Row{ValuesByPosition: valuesByPosition, ValuesBySelector: valuesBySelector}, nil
2,119✔
457
}
458

459
func (r *rawRowReader) Close() error {
580✔
460
        if r.onCloseCallback != nil {
838✔
461
                defer r.onCloseCallback()
258✔
462
        }
258✔
463

464
        return r.reader.Close()
580✔
465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc