• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6783902372

07 Nov 2023 11:34AM UTC coverage: 89.548% (-0.02%) from 89.571%
6783902372

push

gh-ci

jeroiraz
test(pkg/pgsql): unit testing for deallocate stmt

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

33645 of 37572 relevant lines covered (89.55%)

146027.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.54
/embedded/sql/row_reader.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "math"
25

26
        "github.com/codenotary/immudb/embedded/store"
27
)
28

29
// RowReader is the contract shared by the row sources of this package;
// rawRowReader in this file is one implementation. The per-method notes
// below describe what rawRowReader does; other implementations may differ.
type RowReader interface {
30
        // Tx returns the SQL transaction the reader operates on.
        Tx() *SQLTx
31
        // TableAlias returns the alias used to qualify the reader's columns.
        TableAlias() string
32
        // Parameters returns the query parameters held by the reader.
        Parameters() map[string]interface{}
33
        // Read returns the next row or an error when none can be produced.
        Read(ctx context.Context) (*Row, error)
34
        // Close releases the reader; a callback registered through onClose
        // is invoked as part of closing.
        Close() error
35
        // Columns returns the descriptors of the columns present in each row.
        Columns(ctx context.Context) ([]ColDescriptor, error)
36
        // OrderBy returns the descriptors of the columns rows are ordered by.
        OrderBy() []ColDescriptor
37
        // ScanSpecs returns the scan specification in use.
        ScanSpecs() *ScanSpecs
38
        // InferParameters infers parameter types and records them in params.
        InferParameters(ctx context.Context, params map[string]SQLValueType) error
39
        // colsBySelector returns the column descriptors keyed by encoded selector.
        colsBySelector(ctx context.Context) (map[string]ColDescriptor, error)
40
        // onClose registers a callback to be invoked when the reader is closed.
        onClose(func())
41
}
42

43
// ScanSpecs describes how a table is scanned: which index is traversed,
// which per-column ranges bound the traversal, and in which direction.
type ScanSpecs struct {
44
        // Index is the index being scanned; its id is part of the key prefix
        // and its columns are the ones reported by rawRowReader.OrderBy.
        Index          *Index
45
        // rangesByColID holds value ranges keyed by column id; keyReaderSpecFrom
        // uses them to narrow the low and high keys of the scan.
        rangesByColID  map[uint32]*typedValueRange
46
        // IncludeHistory is forwarded to the key reader spec and makes
        // newRawRowReader prepend a revision column to every row.
        IncludeHistory bool
47
        // DescOrder swaps the seek and end keys and is forwarded to the key
        // reader spec.
        DescOrder      bool
48
}
49

50
// Row is a single row produced by a RowReader, exposing the same values
// both by position and by encoded column selector.
type Row struct {
51
        // ValuesByPosition holds the row values in positional order.
        ValuesByPosition []TypedValue
52
        // ValuesBySelector holds the row values keyed by the string returned
        // by EncodeSelector for each column.
        ValuesBySelector map[string]TypedValue
53
}
54

55
// rows are selector-compatible if both rows have the same assigned value for all specified selectors
56
func (row *Row) compatible(aRow *Row, selectors []*ColSelector, table string) (bool, error) {
265✔
57
        for _, sel := range selectors {
308✔
58
                c := EncodeSelector(sel.resolve(table))
43✔
59

43✔
60
                val1, ok := row.ValuesBySelector[c]
43✔
61
                if !ok {
43✔
62
                        return false, ErrInvalidColumn
×
63
                }
×
64

65
                val2, ok := aRow.ValuesBySelector[c]
43✔
66
                if !ok {
43✔
67
                        return false, ErrInvalidColumn
×
68
                }
×
69

70
                cmp, err := val1.Compare(val2)
43✔
71
                if err != nil {
43✔
72
                        return false, err
×
73
                }
×
74

75
                if cmp != 0 {
47✔
76
                        return false, nil
4✔
77
                }
4✔
78
        }
79

80
        return true, nil
261✔
81
}
82

83
func (row *Row) digest(cols []ColDescriptor) (d [sha256.Size]byte, err error) {
65✔
84
        h := sha256.New()
65✔
85

65✔
86
        for i, v := range row.ValuesByPosition {
133✔
87
                var b [4]byte
68✔
88
                binary.BigEndian.PutUint32(b[:], uint32(i))
68✔
89
                h.Write(b[:])
68✔
90

68✔
91
                _, isNull := v.(*NullValue)
68✔
92
                if isNull {
70✔
93
                        continue
2✔
94
                }
95

96
                encVal, err := EncodeValue(v, v.Type(), 0)
66✔
97
                if err != nil {
66✔
98
                        return d, err
×
99
                }
×
100

101
                h.Write(encVal)
66✔
102
        }
103

104
        copy(d[:], h.Sum(nil))
65✔
105

65✔
106
        return
65✔
107
}
108

109
// rawRowReader reads the rows of a single table straight from a store key
// reader, following the index and ranges given by its scan specification.
type rawRowReader struct {
110
        // tx is the transaction used to create the key reader and to resolve
        // the period instants.
        tx         *SQLTx
111
        // table is the table being read; it is used to look up columns by id
        // while decoding stored rows.
        table      *Table
112
        // tableAlias qualifies column selectors; newRawRowReader defaults it
        // to the table name when no alias is given.
        tableAlias string
113
        // colsByPos lists the column descriptors in row order (revision column
        // first when history is included).
        colsByPos  []ColDescriptor
114
        // colsBySel holds the same descriptors keyed by encoded selector.
        colsBySel  map[string]ColDescriptor
115
        // scanSpecs is the scan specification the reader was created with.
        scanSpecs  *ScanSpecs
116

117
        // defines a sub-range of transactions based on a combination of tx IDs and timestamps
118
        // the query is resolved only taking into consideration that range of transactions
119
        period period
120

121
        // underlying store supports reading entries within a range of txs
122
        // the range is calculated based on the period stmt, which is included here to support
123
        // lazy evaluation when parameters are available
124
        txRange *txRange
125

126
        // params are the query parameters, also used when resolving the period.
        params map[string]interface{}
127

128
        // reader is the underlying key reader rows are pulled from.
        reader          store.KeyReader
129
        // onCloseCallback, when set, is invoked as Close returns.
        onCloseCallback func()
130
}
131

132
// txRange is the pair of transaction IDs handed to the key reader's
// ReadBetween call once the reader's period has been resolved.
type txRange struct {
133
        // initialTxID is the first bound passed to ReadBetween (0 when the
        // period has no start).
        initialTxID uint64
134
        // finalTxID is the second bound passed to ReadBetween
        // (math.MaxUint64 when the period has no end).
        finalTxID   uint64
135
}
136

137
// ColDescriptor identifies a column of a row and carries its value type.
type ColDescriptor struct {
138
        // AggFn is the first component of the encoded selector; the raw row
        // reader leaves it empty.
        // NOTE(review): presumably the aggregation function name — confirm.
        AggFn  string
139
        // Table is the table name or alias the column is qualified with.
        Table  string
140
        // Column is the column name.
        Column string
141
        // Type is the SQL type of the column's values.
        Type   SQLValueType
142
}
143

144
func (d *ColDescriptor) Selector() string {
9,907✔
145
        return EncodeSelector(d.AggFn, d.Table, d.Column)
9,907✔
146
}
9,907✔
147

148
type emptyKeyReader struct {
149
}
150

151
func (r emptyKeyReader) Read(ctx context.Context) (key []byte, val store.ValueRef, err error) {
1✔
152
        return nil, nil, store.ErrNoMoreEntries
1✔
153
}
1✔
154

155
func (r emptyKeyReader) ReadBetween(ctx context.Context, initialTxID uint64, finalTxID uint64) (key []byte, val store.ValueRef, err error) {
×
156
        return nil, nil, store.ErrNoMoreEntries
×
157
}
×
158

159
func (r emptyKeyReader) Reset() error {
×
160
        return nil
×
161
}
×
162

163
func (r emptyKeyReader) Close() error {
1✔
164
        return nil
1✔
165
}
1✔
166

167
func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, period period, tableAlias string, scanSpecs *ScanSpecs) (*rawRowReader, error) {
593✔
168
        if table == nil || scanSpecs == nil || scanSpecs.Index == nil {
593✔
169
                return nil, ErrIllegalArguments
×
170
        }
×
171

172
        rSpec, err := keyReaderSpecFrom(tx.engine.prefix, table, scanSpecs)
593✔
173
        if err != nil {
593✔
174
                return nil, err
×
175
        }
×
176

177
        var r store.KeyReader
593✔
178

593✔
179
        if table.name == "pg_type" {
594✔
180
                r = &emptyKeyReader{}
1✔
181
        } else {
593✔
182
                r, err = tx.newKeyReader(*rSpec)
592✔
183
                if err != nil {
592✔
184
                        return nil, err
×
185
                }
×
186
        }
187

188
        if tableAlias == "" {
1,068✔
189
                tableAlias = table.name
475✔
190
        }
475✔
191

192
        var colsByPos []ColDescriptor
593✔
193
        var colsBySel map[string]ColDescriptor
593✔
194

593✔
195
        var off int
593✔
196

593✔
197
        if scanSpecs.IncludeHistory {
595✔
198
                colsByPos = make([]ColDescriptor, 1+len(table.cols))
2✔
199
                colsBySel = make(map[string]ColDescriptor, 1+len(table.cols))
2✔
200

2✔
201
                colDescriptor := ColDescriptor{
2✔
202
                        Table:  tableAlias,
2✔
203
                        Column: revCol,
2✔
204
                        Type:   IntegerType,
2✔
205
                }
2✔
206

2✔
207
                colsByPos[0] = colDescriptor
2✔
208
                colsBySel[colDescriptor.Selector()] = colDescriptor
2✔
209

2✔
210
                off = 1
2✔
211
        } else {
593✔
212
                colsByPos = make([]ColDescriptor, len(table.cols))
591✔
213
                colsBySel = make(map[string]ColDescriptor, len(table.cols))
591✔
214
        }
591✔
215

216
        for i, c := range table.cols {
2,516✔
217
                colDescriptor := ColDescriptor{
1,923✔
218
                        Table:  tableAlias,
1,923✔
219
                        Column: c.colName,
1,923✔
220
                        Type:   c.colType,
1,923✔
221
                }
1,923✔
222

1,923✔
223
                colsByPos[off+i] = colDescriptor
1,923✔
224
                colsBySel[colDescriptor.Selector()] = colDescriptor
1,923✔
225
        }
1,923✔
226

227
        return &rawRowReader{
593✔
228
                tx:         tx,
593✔
229
                table:      table,
593✔
230
                period:     period,
593✔
231
                tableAlias: tableAlias,
593✔
232
                colsByPos:  colsByPos,
593✔
233
                colsBySel:  colsBySel,
593✔
234
                scanSpecs:  scanSpecs,
593✔
235
                params:     params,
593✔
236
                reader:     r,
593✔
237
        }, nil
593✔
238
}
239

240
func keyReaderSpecFrom(sqlPrefix []byte, table *Table, scanSpecs *ScanSpecs) (spec *store.KeyReaderSpec, err error) {
595✔
241
        prefix := MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(scanSpecs.Index.id))
595✔
242

595✔
243
        var loKey []byte
595✔
244
        var loKeyReady bool
595✔
245

595✔
246
        var hiKey []byte
595✔
247
        var hiKeyReady bool
595✔
248

595✔
249
        loKey = make([]byte, len(prefix))
595✔
250
        copy(loKey, prefix)
595✔
251

595✔
252
        hiKey = make([]byte, len(prefix))
595✔
253
        copy(hiKey, prefix)
595✔
254

595✔
255
        // seekKey and endKey in the loop below are scan prefixes for beginning
595✔
256
        // and end of the index scanning range. On each index we try to make them more
595✔
257
        // concrete.
595✔
258
        for _, col := range scanSpecs.Index.cols {
1,187✔
259
                colRange, ok := scanSpecs.rangesByColID[col.id]
592✔
260
                if !ok {
1,054✔
261
                        break
462✔
262
                }
263

264
                if !hiKeyReady {
260✔
265
                        if colRange.hRange == nil {
141✔
266
                                hiKeyReady = true
11✔
267
                        } else {
130✔
268
                                encVal, _, err := EncodeValueAsKey(colRange.hRange.val, col.colType, col.MaxLen())
119✔
269
                                if err != nil {
120✔
270
                                        return nil, err
1✔
271
                                }
1✔
272
                                hiKey = append(hiKey, encVal...)
118✔
273
                        }
274
                }
275

276
                if !loKeyReady {
258✔
277
                        if colRange.lRange == nil {
142✔
278
                                loKeyReady = true
13✔
279
                        } else {
129✔
280
                                encVal, _, err := EncodeValueAsKey(colRange.lRange.val, col.colType, col.MaxLen())
116✔
281
                                if err != nil {
117✔
282
                                        return nil, err
1✔
283
                                }
1✔
284
                                loKey = append(loKey, encVal...)
115✔
285
                        }
286
                }
287
        }
288

289
        // Ensure the hiKey is inclusive regarding all values with that prefix
290
        hiKey = append(hiKey, KeyValPrefixUpperBound)
593✔
291

593✔
292
        seekKey := loKey
593✔
293
        endKey := hiKey
593✔
294

593✔
295
        if scanSpecs.DescOrder {
637✔
296
                seekKey, endKey = endKey, seekKey
44✔
297
        }
44✔
298

299
        return &store.KeyReaderSpec{
593✔
300
                SeekKey:        seekKey,
593✔
301
                InclusiveSeek:  true,
593✔
302
                EndKey:         endKey,
593✔
303
                InclusiveEnd:   true,
593✔
304
                Prefix:         prefix,
593✔
305
                DescOrder:      scanSpecs.DescOrder,
593✔
306
                Filters:        []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
593✔
307
                IncludeHistory: scanSpecs.IncludeHistory,
593✔
308
        }, nil
593✔
309
}
310

311
func (r *rawRowReader) onClose(callback func()) {
264✔
312
        r.onCloseCallback = callback
264✔
313
}
264✔
314

315
func (r *rawRowReader) Tx() *SQLTx {
1,594✔
316
        return r.tx
1,594✔
317
}
1,594✔
318

319
func (r *rawRowReader) TableAlias() string {
5,066✔
320
        return r.tableAlias
5,066✔
321
}
5,066✔
322

323
func (r *rawRowReader) OrderBy() []ColDescriptor {
33✔
324
        cols := make([]ColDescriptor, len(r.scanSpecs.Index.cols))
33✔
325

33✔
326
        for i, col := range r.scanSpecs.Index.cols {
69✔
327
                cols[i] = ColDescriptor{
36✔
328
                        Table:  r.tableAlias,
36✔
329
                        Column: col.colName,
36✔
330
                        Type:   col.colType,
36✔
331
                }
36✔
332
        }
36✔
333

334
        return cols
33✔
335
}
336

337
func (r *rawRowReader) ScanSpecs() *ScanSpecs {
49✔
338
        return r.scanSpecs
49✔
339
}
49✔
340

341
func (r *rawRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
183✔
342
        ret := make([]ColDescriptor, len(r.colsByPos))
183✔
343
        copy(ret, r.colsByPos)
183✔
344
        return ret, nil
183✔
345
}
183✔
346

347
func (r *rawRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
239✔
348
        ret := make(map[string]ColDescriptor, len(r.colsBySel))
239✔
349
        for sel := range r.colsBySel {
1,005✔
350
                ret[sel] = r.colsBySel[sel]
766✔
351
        }
766✔
352
        return ret, nil
239✔
353
}
354

355
func (r *rawRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
49✔
356
        cols, err := r.colsBySelector(ctx)
49✔
357
        if err != nil {
49✔
358
                return err
×
359
        }
×
360

361
        if r.period.start != nil {
59✔
362
                _, err = r.period.start.instant.exp.inferType(cols, params, r.TableAlias())
10✔
363
                if err != nil {
10✔
364
                        return err
×
365
                }
×
366
        }
367

368
        if r.period.end != nil {
59✔
369
                _, err = r.period.end.instant.exp.inferType(cols, params, r.TableAlias())
10✔
370
                if err != nil {
10✔
371
                        return err
×
372
                }
×
373
        }
374

375
        return nil
49✔
376
}
377

378
func (r *rawRowReader) Parameters() map[string]interface{} {
1,504✔
379
        return r.params
1,504✔
380
}
1,504✔
381

382
func (r *rawRowReader) reduceTxRange() (err error) {
2,534✔
383
        if r.txRange != nil || (r.period.start == nil && r.period.end == nil) {
4,999✔
384
                return nil
2,465✔
385
        }
2,465✔
386

387
        txRange := &txRange{
69✔
388
                initialTxID: uint64(0),
69✔
389
                finalTxID:   uint64(math.MaxUint64),
69✔
390
        }
69✔
391

69✔
392
        if r.period.start != nil {
123✔
393
                txRange.initialTxID, err = r.period.start.instant.resolve(r.tx, r.params, true, r.period.start.inclusive)
54✔
394
                if err != nil {
72✔
395
                        return err
18✔
396
                }
18✔
397
        }
398

399
        if r.period.end != nil {
78✔
400
                txRange.finalTxID, err = r.period.end.instant.resolve(r.tx, r.params, false, r.period.end.inclusive)
27✔
401
                if err != nil {
30✔
402
                        return err
3✔
403
                }
3✔
404
        }
405

406
        r.txRange = txRange
48✔
407

48✔
408
        return nil
48✔
409
}
410

411
func (r *rawRowReader) Read(ctx context.Context) (row *Row, err error) {
2,534✔
412
        if ctx.Err() != nil {
2,534✔
413
                return nil, err
×
414
        }
×
415

416
        //var mkey []byte
417
        var vref store.ValueRef
2,534✔
418

2,534✔
419
        // evaluation of txRange is postponed to allow parameters to be provided after rowReader initialization
2,534✔
420
        err = r.reduceTxRange()
2,534✔
421
        if errors.Is(err, store.ErrTxNotFound) {
2,545✔
422
                return nil, ErrNoMoreRows
11✔
423
        }
11✔
424
        if err != nil {
2,533✔
425
                return nil, err
10✔
426
        }
10✔
427

428
        if r.txRange == nil {
4,945✔
429
                _, vref, err = r.reader.Read(ctx) //mkey
2,432✔
430
        } else {
2,513✔
431
                _, vref, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID) //mkey
81✔
432
        }
81✔
433
        if err != nil {
2,816✔
434
                return nil, err
303✔
435
        }
303✔
436

437
        v, err := vref.Resolve()
2,210✔
438
        if err != nil {
2,210✔
439
                return nil, err
×
440
        }
×
441

442
        valuesByPosition := make([]TypedValue, len(r.colsByPos))
2,210✔
443
        valuesBySelector := make(map[string]TypedValue, len(r.colsBySel))
2,210✔
444

2,210✔
445
        for i, col := range r.colsByPos {
9,161✔
446
                var val TypedValue
6,951✔
447

6,951✔
448
                if col.Column == revCol {
6,971✔
449
                        val = &Integer{val: int64(vref.HC())}
20✔
450
                } else {
6,951✔
451
                        val = &NullValue{t: col.Type}
6,931✔
452
                }
6,931✔
453

454
                valuesByPosition[i] = val
6,951✔
455
                valuesBySelector[col.Selector()] = val
6,951✔
456
        }
457

458
        if len(v) < EncLenLen {
2,210✔
459
                return nil, ErrCorruptedData
×
460
        }
×
461

462
        voff := 0
2,210✔
463

2,210✔
464
        cols := int(binary.BigEndian.Uint32(v[voff:]))
2,210✔
465
        voff += EncLenLen
2,210✔
466

2,210✔
467
        for i, pos := 0, 0; i < cols; i++ {
8,905✔
468
                if len(v) < EncIDLen {
6,695✔
469
                        return nil, ErrCorruptedData
×
470
                }
×
471

472
                colID := binary.BigEndian.Uint32(v[voff:])
6,695✔
473
                voff += EncIDLen
6,695✔
474

6,695✔
475
                col, err := r.table.GetColumnByID(colID)
6,695✔
476
                if errors.Is(err, ErrColumnDoesNotExist) && colID <= r.table.maxColID {
6,720✔
477
                        // Dropped column, skip it
25✔
478
                        vlen, n, err := DecodeValueLength(v[voff:])
25✔
479
                        if err != nil {
25✔
480
                                return nil, err
×
481
                        }
×
482
                        voff += n + vlen
25✔
483

25✔
484
                        continue
25✔
485
                }
486
                if err != nil {
6,670✔
487
                        return nil, ErrCorruptedData
×
488
                }
×
489

490
                val, n, err := DecodeValue(v[voff:], col.colType)
6,670✔
491
                if err != nil {
6,670✔
492
                        return nil, err
×
493
                }
×
494

495
                voff += n
6,670✔
496

6,670✔
497
                valuesByPosition[pos] = val
6,670✔
498
                pos++
6,670✔
499

6,670✔
500
                valuesBySelector[EncodeSelector("", r.tableAlias, col.colName)] = val
6,670✔
501
        }
502

503
        if len(v)-voff > 0 {
2,210✔
504
                return nil, ErrCorruptedData
×
505
        }
×
506

507
        return &Row{ValuesByPosition: valuesByPosition, ValuesBySelector: valuesBySelector}, nil
2,210✔
508
}
509

510
func (r *rawRowReader) Close() error {
596✔
511
        if r.onCloseCallback != nil {
854✔
512
                defer r.onCloseCallback()
258✔
513
        }
258✔
514

515
        return r.reader.Close()
596✔
516
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc