• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 25497188798

06 May 2026 07:42PM UTC coverage: 84.952% (-0.08%) from 85.027%
25497188798

push

gh-ci

vchaindz
perf(sql): add reduceBool fast path to skip *Bool boxing in COUNT(*)

CmpBoolExp.reduce, BinBoolExp.reduce, and NotBoolExp.reduce each return
&Bool{...} per call. On the issue #2093 shape (4 comparisons + 3 ANDs in
WHERE), that is 7 boxed-Bool allocations per matched row.

Add a parallel reduceBool method to Bool, NullValue, CmpBoolExp, BinBoolExp,
and NotBoolExp that returns (val, isNull, err) directly. Composition through
AND/OR trees is handled by reduceBoolValueExp, which dispatches to the fast
path for known boolean exps and falls back to reduce + type-check for
unknown ones (LikeBoolExp, ExistsBoolExp, InListExp, ColSelector...). Slow
path semantics are preserved exactly: the fallback path is the same code
that previously lived inline in CountAllWithKeyFilter.

CountAllWithKeyFilter now calls reduceBoolValueExp instead of cond.reduce +
type assertion. Other reduce call sites are unchanged.

30 of 82 new or added lines in 2 files covered. (36.59%)

15 existing lines in 1 file now uncovered.

45204 of 53211 relevant lines covered (84.95%)

126091.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.23
/embedded/sql/row_reader.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "math"
26

27
        "github.com/codenotary/immudb/embedded/store"
28
)
29

30
type RowReader interface {
31
        Tx() *SQLTx
32
        TableAlias() string
33
        Parameters() map[string]interface{}
34
        Read(ctx context.Context) (*Row, error)
35
        Close() error
36
        Columns(ctx context.Context) ([]ColDescriptor, error)
37
        OrderBy() []ColDescriptor
38
        ScanSpecs() *ScanSpecs
39
        InferParameters(ctx context.Context, params map[string]SQLValueType) error
40
        colsBySelector(ctx context.Context) (map[string]ColDescriptor, error)
41
        onClose(func())
42
}
43

44
type ScanSpecs struct {
45
        Index             *Index
46
        rangesByColID     map[uint32]*typedValueRange
47
        IncludeHistory    bool
48
        IncludeDiff       bool
49
        IncludeTxMetadata bool
50
        DescOrder         bool
51
        groupBySortExps   []*OrdExp
52
        orderBySortExps   []*OrdExp
53
        // neededColIDs, when non-nil, restricts column decoding to the listed IDs.
54
        // Columns absent from the map are skipped (offset advanced, no allocation).
55
        // nil means decode all columns (backward-compatible default).
56
        neededColIDs map[uint32]bool
57
}
58

59
func (s *ScanSpecs) extraCols() int {
39,843✔
60
        n := 0
39,843✔
61
        if s.IncludeHistory {
39,865✔
62
                n++
22✔
63
        }
22✔
64

65
        if s.IncludeDiff {
39,843✔
66
                n++
×
67
        }
×
68

69
        if s.IncludeTxMetadata {
39,860✔
70
                n++
17✔
71
        }
17✔
72
        return n
39,843✔
73
}
74

75
type Row struct {
76
        ValuesByPosition []TypedValue
77
        ValuesBySelector map[string]TypedValue
78
}
79

80
// rows are selector-compatible if both rows have the same assigned value for all specified selectors
81
func (row *Row) compatible(aRow *Row, selectors []*ColSelector, table string) (bool, error) {
549✔
82
        for _, sel := range selectors {
966✔
83
                c := EncodeSelector(sel.resolve(table))
417✔
84

417✔
85
                val1, ok := row.ValuesBySelector[c]
417✔
86
                if !ok {
417✔
87
                        return false, ErrInvalidColumn
×
88
                }
×
89

90
                val2, ok := aRow.ValuesBySelector[c]
417✔
91
                if !ok {
417✔
92
                        return false, ErrInvalidColumn
×
93
                }
×
94

95
                cmp, err := val1.Compare(val2)
417✔
96
                if err != nil {
417✔
97
                        return false, err
×
98
                }
×
99

100
                if cmp != 0 {
548✔
101
                        return false, nil
131✔
102
                }
131✔
103
        }
104

105
        return true, nil
418✔
106
}
107

108
func (row *Row) digest(cols []ColDescriptor) (d [sha256.Size]byte, err error) {
1,177✔
109
        h := sha256.New()
1,177✔
110

1,177✔
111
        for i, v := range row.ValuesByPosition {
2,381✔
112
                var b [4]byte
1,204✔
113
                binary.BigEndian.PutUint32(b[:], uint32(i))
1,204✔
114
                h.Write(b[:])
1,204✔
115

1,204✔
116
                _, isNull := v.(*NullValue)
1,204✔
117
                if isNull {
1,206✔
118
                        continue
2✔
119
                }
120

121
                encVal, err := EncodeValue(v, v.Type(), 0)
1,202✔
122
                if err != nil {
1,202✔
123
                        return d, err
×
124
                }
×
125

126
                h.Write(encVal)
1,202✔
127
        }
128

129
        copy(d[:], h.Sum(nil))
1,177✔
130

1,177✔
131
        return
1,177✔
132
}
133

134
type rawRowReader struct {
135
        tx         *SQLTx
136
        table      *Table
137
        tableAlias string
138
        colsByPos  []ColDescriptor
139
        colsBySel  map[string]ColDescriptor
140
        scanSpecs  *ScanSpecs
141

142
        // defines a sub-range a transactions based on a combination of tx IDs and timestamps
143
        // the query is resolved only taking into consideration that range of transactioins
144
        period period
145

146
        // underlying store supports reading entries within a range of txs
147
        // the range is calculated based on the period stmt, which is included here to support
148
        // lazy evaluation when parameters are available
149
        txRange *txRange
150

151
        params map[string]interface{}
152

153
        reader          store.KeyReader
154
        onCloseCallback func()
155
}
156

157
type txRange struct {
158
        initialTxID uint64
159
        finalTxID   uint64
160
}
161

162
type ColDescriptor struct {
163
        AggFn  string
164
        Table  string
165
        Column string
166
        Type   SQLValueType
167
}
168

169
func (d *ColDescriptor) Selector() string {
261,138✔
170
        return EncodeSelector(d.AggFn, d.Table, d.Column)
261,138✔
171
}
261,138✔
172

173
type emptyKeyReader struct {
174
}
175

176
func (r emptyKeyReader) Read(ctx context.Context) (key []byte, val store.ValueRef, err error) {
×
177
        return nil, nil, store.ErrNoMoreEntries
×
178
}
×
179

180
func (r emptyKeyReader) ReadBetween(ctx context.Context, initialTxID uint64, finalTxID uint64) (key []byte, val store.ValueRef, err error) {
×
181
        return nil, nil, store.ErrNoMoreEntries
×
182
}
×
183

184
func (r emptyKeyReader) Reset() error {
×
185
        return nil
×
186
}
×
187

188
func (r emptyKeyReader) Close() error {
×
189
        return nil
×
190
}
×
191

192
func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, period period, tableAlias string, scanSpecs *ScanSpecs) (*rawRowReader, error) {
1,306✔
193
        if table == nil || scanSpecs == nil || scanSpecs.Index == nil {
1,306✔
194
                return nil, ErrIllegalArguments
×
195
        }
×
196

197
        rSpec, err := keyReaderSpecFrom(tx.engine.prefix, table, scanSpecs)
1,306✔
198
        if err != nil {
1,306✔
199
                return nil, err
×
200
        }
×
201

202
        var r store.KeyReader
1,306✔
203

1,306✔
204
        // System tables have no storage backing — their rows come from
1,306✔
205
        // Table.systemScan via tableRef.Resolve. If anything still funnels
1,306✔
206
        // a rawRowReader at one (e.g. a direct internal call), fall back to
1,306✔
207
        // an empty reader rather than scanning storage with a key prefix
1,306✔
208
        // that has no entries.
1,306✔
209
        if table.systemScan != nil {
1,306✔
210
                r = &emptyKeyReader{}
×
211
        } else {
1,306✔
212
                r, err = tx.newKeyReader(*rSpec)
1,306✔
213
                if err != nil {
1,307✔
214
                        return nil, err
1✔
215
                }
1✔
216
        }
217

218
        if tableAlias == "" {
2,273✔
219
                tableAlias = table.name
968✔
220
        }
968✔
221

222
        nCols := len(table.cols) + scanSpecs.extraCols()
1,305✔
223

1,305✔
224
        colsByPos := make([]ColDescriptor, nCols)
1,305✔
225
        colsBySel := make(map[string]ColDescriptor, nCols)
1,305✔
226

1,305✔
227
        off := 0
1,305✔
228
        if scanSpecs.IncludeHistory {
1,307✔
229
                colDescriptor := ColDescriptor{
2✔
230
                        Table:  tableAlias,
2✔
231
                        Column: revCol,
2✔
232
                        Type:   IntegerType,
2✔
233
                }
2✔
234

2✔
235
                colsByPos[off] = colDescriptor
2✔
236
                colsBySel[colDescriptor.Selector()] = colDescriptor
2✔
237
                off++
2✔
238
        }
2✔
239

240
        if scanSpecs.IncludeTxMetadata {
1,309✔
241
                colDescriptor := ColDescriptor{
4✔
242
                        Table:  tableAlias,
4✔
243
                        Column: txMetadataCol,
4✔
244
                        Type:   JSONType,
4✔
245
                }
4✔
246

4✔
247
                colsByPos[off] = colDescriptor
4✔
248
                colsBySel[colDescriptor.Selector()] = colDescriptor
4✔
249
                off++
4✔
250
        }
4✔
251

252
        for i, c := range table.cols {
5,191✔
253
                colDescriptor := ColDescriptor{
3,886✔
254
                        Table:  tableAlias,
3,886✔
255
                        Column: c.colName,
3,886✔
256
                        Type:   c.colType,
3,886✔
257
                }
3,886✔
258

3,886✔
259
                colsByPos[off+i] = colDescriptor
3,886✔
260
                colsBySel[colDescriptor.Selector()] = colDescriptor
3,886✔
261
        }
3,886✔
262

263
        return &rawRowReader{
1,305✔
264
                tx:         tx,
1,305✔
265
                table:      table,
1,305✔
266
                period:     period,
1,305✔
267
                tableAlias: tableAlias,
1,305✔
268
                colsByPos:  colsByPos,
1,305✔
269
                colsBySel:  colsBySel,
1,305✔
270
                scanSpecs:  scanSpecs,
1,305✔
271
                params:     params,
1,305✔
272
                reader:     r,
1,305✔
273
        }, nil
1,305✔
274
}
275

276
func keyReaderSpecFrom(sqlPrefix []byte, table *Table, scanSpecs *ScanSpecs) (spec *store.KeyReaderSpec, err error) {
1,308✔
277
        prefix := MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(scanSpecs.Index.id))
1,308✔
278

1,308✔
279
        var loKey []byte
1,308✔
280
        var loKeyReady bool
1,308✔
281

1,308✔
282
        var hiKey []byte
1,308✔
283
        var hiKeyReady bool
1,308✔
284

1,308✔
285
        loKey = make([]byte, len(prefix))
1,308✔
286
        copy(loKey, prefix)
1,308✔
287

1,308✔
288
        hiKey = make([]byte, len(prefix))
1,308✔
289
        copy(hiKey, prefix)
1,308✔
290

1,308✔
291
        // seekKey and endKey in the loop below are scan prefixes for beginning
1,308✔
292
        // and end of the index scanning range. On each index we try to make them more
1,308✔
293
        // concrete.
1,308✔
294
        for _, col := range scanSpecs.Index.cols {
2,582✔
295
                colRange, ok := scanSpecs.rangesByColID[col.id]
1,274✔
296
                if !ok {
2,277✔
297
                        break
1,003✔
298
                }
299

300
                if !hiKeyReady {
542✔
301
                        if colRange.hRange == nil {
292✔
302
                                hiKeyReady = true
21✔
303
                        } else {
271✔
304
                                encVal, _, err := EncodeValueAsKey(colRange.hRange.val, col.colType, col.MaxLen())
250✔
305
                                if err != nil {
251✔
306
                                        return nil, err
1✔
307
                                }
1✔
308
                                hiKey = append(hiKey, encVal...)
249✔
309
                        }
310
                }
311

312
                if !loKeyReady {
540✔
313
                        if colRange.lRange == nil {
283✔
314
                                loKeyReady = true
13✔
315
                        } else {
270✔
316
                                encVal, _, err := EncodeValueAsKey(colRange.lRange.val, col.colType, col.MaxLen())
257✔
317
                                if err != nil {
258✔
318
                                        return nil, err
1✔
319
                                }
1✔
320
                                loKey = append(loKey, encVal...)
256✔
321
                        }
322
                }
323
        }
324

325
        // Ensure the hiKey is inclusive regarding all values with that prefix
326
        hiKey = append(hiKey, KeyValPrefixUpperBound)
1,306✔
327

1,306✔
328
        seekKey := loKey
1,306✔
329
        endKey := hiKey
1,306✔
330

1,306✔
331
        if scanSpecs.DescOrder {
1,357✔
332
                seekKey, endKey = endKey, seekKey
51✔
333
        }
51✔
334

335
        return &store.KeyReaderSpec{
1,306✔
336
                SeekKey:        seekKey,
1,306✔
337
                InclusiveSeek:  true,
1,306✔
338
                EndKey:         endKey,
1,306✔
339
                InclusiveEnd:   true,
1,306✔
340
                Prefix:         prefix,
1,306✔
341
                DescOrder:      scanSpecs.DescOrder,
1,306✔
342
                Filters:        []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
1,306✔
343
                IncludeHistory: scanSpecs.IncludeHistory,
1,306✔
344
        }, nil
1,306✔
345
}
346

347
func (r *rawRowReader) onClose(callback func()) {
581✔
348
        r.onCloseCallback = callback
581✔
349
}
581✔
350

351
func (r *rawRowReader) Tx() *SQLTx {
732,339✔
352
        return r.tx
732,339✔
353
}
732,339✔
354

355
func (r *rawRowReader) TableAlias() string {
1,131,086✔
356
        return r.tableAlias
1,131,086✔
357
}
1,131,086✔
358

359
func (r *rawRowReader) OrderBy() []ColDescriptor {
22✔
360
        cols := make([]ColDescriptor, len(r.scanSpecs.Index.cols))
22✔
361

22✔
362
        for i, col := range r.scanSpecs.Index.cols {
47✔
363
                cols[i] = ColDescriptor{
25✔
364
                        Table:  r.tableAlias,
25✔
365
                        Column: col.colName,
25✔
366
                        Type:   col.colType,
25✔
367
                }
25✔
368
        }
25✔
369

370
        return cols
22✔
371
}
372

373
func (r *rawRowReader) ScanSpecs() *ScanSpecs {
131✔
374
        return r.scanSpecs
131✔
375
}
131✔
376

377
func (r *rawRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
439✔
378
        ret := make([]ColDescriptor, len(r.colsByPos))
439✔
379
        copy(ret, r.colsByPos)
439✔
380
        return ret, nil
439✔
381
}
439✔
382

383
func (r *rawRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
1,028✔
384
        ret := make(map[string]ColDescriptor, len(r.colsBySel))
1,028✔
385
        for sel := range r.colsBySel {
3,951✔
386
                ret[sel] = r.colsBySel[sel]
2,923✔
387
        }
2,923✔
388
        return ret, nil
1,028✔
389
}
390

391
func (r *rawRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
152✔
392
        cols, err := r.colsBySelector(ctx)
152✔
393
        if err != nil {
152✔
394
                return err
×
395
        }
×
396

397
        if r.period.start != nil {
162✔
398
                _, err = r.period.start.instant.exp.inferType(cols, params, r.TableAlias())
10✔
399
                if err != nil {
10✔
400
                        return err
×
401
                }
×
402
        }
403

404
        if r.period.end != nil {
162✔
405
                _, err = r.period.end.instant.exp.inferType(cols, params, r.TableAlias())
10✔
406
                if err != nil {
10✔
407
                        return err
×
408
                }
×
409
        }
410

411
        return nil
152✔
412
}
413

414
func (r *rawRowReader) Parameters() map[string]interface{} {
200,040✔
415
        return r.params
200,040✔
416
}
200,040✔
417

418
func (r *rawRowReader) reduceTxRange() (err error) {
39,253✔
419
        if r.txRange != nil || (r.period.start == nil && r.period.end == nil) {
78,429✔
420
                return nil
39,176✔
421
        }
39,176✔
422

423
        txRange := &txRange{
77✔
424
                initialTxID: uint64(0),
77✔
425
                finalTxID:   uint64(math.MaxUint64),
77✔
426
        }
77✔
427

77✔
428
        if r.period.start != nil {
131✔
429
                txRange.initialTxID, err = r.period.start.instant.resolve(r.tx, r.params, true, r.period.start.inclusive)
54✔
430
                if err != nil {
72✔
431
                        return err
18✔
432
                }
18✔
433
        }
434

435
        if r.period.end != nil {
94✔
436
                txRange.finalTxID, err = r.period.end.instant.resolve(r.tx, r.params, false, r.period.end.inclusive)
35✔
437
                if err != nil {
38✔
438
                        return err
3✔
439
                }
3✔
440
        }
441

442
        r.txRange = txRange
56✔
443

56✔
444
        return nil
56✔
445
}
446

447
func (r *rawRowReader) Read(ctx context.Context) (*Row, error) {
39,221✔
448
        if err := ctx.Err(); err != nil {
39,221✔
449
                return nil, err
×
450
        }
×
451

452
        //var mkey []byte
453
        var vref store.ValueRef
39,221✔
454

39,221✔
455
        // evaluation of txRange is postponed to allow parameters to be provided after rowReader initialization
39,221✔
456
        err := r.reduceTxRange()
39,221✔
457
        if errors.Is(err, store.ErrTxNotFound) {
39,232✔
458
                return nil, ErrNoMoreRows
11✔
459
        }
11✔
460
        if err != nil {
39,220✔
461
                return nil, err
10✔
462
        }
10✔
463

464
        if r.txRange == nil {
78,327✔
465
                _, vref, err = r.reader.Read(ctx) //mkey
39,127✔
466
        } else {
39,200✔
467
                _, vref, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID) //mkey
73✔
468
        }
73✔
469
        if err != nil {
39,862✔
470
                return nil, err
662✔
471
        }
662✔
472

473
        v, err := vref.Resolve()
38,538✔
474
        if err != nil {
38,538✔
475
                return nil, err
×
476
        }
×
477

478
        valuesByPosition := make([]TypedValue, len(r.colsByPos))
38,538✔
479
        valuesBySelector := make(map[string]TypedValue, len(r.colsBySel))
38,538✔
480

38,538✔
481
        // extraCols is the number of synthetic leading columns (rev, txMetadata).
38,538✔
482
        // Table columns occupy r.colsByPos[extraCols:], corresponding 1-to-1 with
38,538✔
483
        // r.table.cols. Moved up so the pre-fill loop can use it for the skip check.
38,538✔
484
        extraCols := r.scanSpecs.extraCols()
38,538✔
485

38,538✔
486
        for i, col := range r.colsByPos {
248,321✔
487
                // Skip NullValue pre-fill for table columns that the query does not
209,783✔
488
                // need. The nil zero-value is semantically NULL and is never accessed
209,783✔
489
                // for columns excluded by neededColIDs. Extra synthetic columns (rev,
209,783✔
490
                // txMetadata) are always included regardless of neededColIDs.
209,783✔
491
                if r.scanSpecs.neededColIDs != nil {
236,247✔
492
                        if tableIdx := i - extraCols; tableIdx >= 0 && tableIdx < len(r.table.cols) {
52,928✔
493
                                if !r.scanSpecs.neededColIDs[r.table.cols[tableIdx].id] {
36,364✔
494
                                        continue
9,900✔
495
                                }
496
                        }
497
                }
498

499
                var val TypedValue
199,883✔
500

199,883✔
501
                switch col.Column {
199,883✔
502
                case revCol:
20✔
503
                        val = &Integer{val: int64(vref.HC())}
20✔
504
                case txMetadataCol:
13✔
505
                        val, err = r.parseTxMetadata(vref.TxMetadata())
13✔
506
                        if err != nil {
15✔
507
                                return nil, err
2✔
508
                        }
2✔
509
                default:
199,850✔
510
                        val = &NullValue{t: col.Type}
199,850✔
511
                }
512

513
                valuesByPosition[i] = val
199,881✔
514
                valuesBySelector[col.Selector()] = val
199,881✔
515
        }
516

517
        if len(v) < EncLenLen {
38,536✔
518
                return nil, ErrCorruptedData
×
519
        }
×
520

521

522
        voff := 0
38,536✔
523

38,536✔
524
        cols := int(binary.BigEndian.Uint32(v[voff:]))
38,536✔
525
        voff += EncLenLen
38,536✔
526

38,536✔
527
        for i, pos := 0, 0; i < cols; i++ {
247,771✔
528
                if len(v) < EncIDLen {
209,235✔
529
                        return nil, ErrCorruptedData
×
530
                }
×
531

532
                colID := binary.BigEndian.Uint32(v[voff:])
209,235✔
533
                voff += EncIDLen
209,235✔
534

209,235✔
535
                col, err := r.table.GetColumnByID(colID)
209,235✔
536
                if errors.Is(err, ErrColumnDoesNotExist) && colID <= r.table.maxColID {
209,260✔
537
                        // Dropped column, skip it
25✔
538
                        vlen, n, err := DecodeValueLength(v[voff:])
25✔
539
                        if err != nil {
25✔
540
                                return nil, err
×
541
                        }
×
542
                        voff += n + vlen
25✔
543

25✔
544
                        continue
25✔
545
                }
546
                if err != nil {
209,210✔
547
                        return nil, ErrCorruptedData
×
548
                }
×
549

550
                // Projection pushdown: skip columns not needed by the query.
551
                // We still advance voff so the byte stream stays in sync.
552
                // pos advancement is handled by the loop at the decode site below,
553
                // since that same loop also advances pos past any skipped entries.
554
                if r.scanSpecs.neededColIDs != nil && !r.scanSpecs.neededColIDs[colID] {
218,809✔
555
                        vlen, n, err := DecodeValueLength(v[voff:])
9,599✔
556
                        if err != nil {
9,599✔
557
                                return nil, err
×
558
                        }
×
559
                        voff += n + vlen
9,599✔
560
                        continue
9,599✔
561
                }
562

563
                val, n, err := DecodeValue(v[voff:], col.colType)
199,611✔
564
                if err != nil {
199,611✔
565
                        return nil, err
×
566
                }
×
567

568
                voff += n
199,611✔
569

199,611✔
570
                // make sure value is inserted in the correct position
199,611✔
571
                for pos < len(r.table.cols) && r.table.cols[pos].id < colID {
203,755✔
572
                        pos++
4,144✔
573
                }
4,144✔
574

575
                if pos == len(r.table.cols) || r.table.cols[pos].id != colID {
199,611✔
576
                        return nil, ErrCorruptedData
×
577
                }
×
578

579
                valuesByPosition[pos+extraCols] = val
199,611✔
580

199,611✔
581
                pos++
199,611✔
582

199,611✔
583
                valuesBySelector[EncodeSelector("", r.tableAlias, col.colName)] = val
199,611✔
584
        }
585

586
        if len(v)-voff > 0 {
38,536✔
587
                return nil, ErrCorruptedData
×
588
        }
×
589

590
        return &Row{ValuesByPosition: valuesByPosition, ValuesBySelector: valuesBySelector}, nil
38,536✔
591
}
592

593
// CountAll iterates the scan without decoding column values, returning the
594
// number of matching index entries. Used by the COUNT(*) fast-path.
595
func (r *rawRowReader) CountAll(ctx context.Context) (int64, error) {
26✔
596
        if err := ctx.Err(); err != nil {
26✔
597
                return 0, err
×
598
        }
×
599
        if err := r.reduceTxRange(); errors.Is(err, store.ErrTxNotFound) {
26✔
600
                return 0, nil
×
601
        } else if err != nil {
26✔
602
                return 0, err
×
603
        }
×
604
        var n int64
26✔
605
        for {
1,817✔
606
                var err error
1,791✔
607
                if r.txRange == nil {
3,549✔
608
                        _, _, err = r.reader.Read(ctx)
1,758✔
609
                } else {
1,791✔
610
                        _, _, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID)
33✔
611
                }
33✔
612
                if errors.Is(err, store.ErrNoMoreEntries) {
1,817✔
613
                        return n, nil
26✔
614
                }
26✔
615
                if err != nil {
1,765✔
616
                        return 0, err
×
617
                }
×
618
                n++
1,765✔
619
        }
620
}
621

622
// CountAllWithKeyFilter iterates the index scan and counts entries that
623
// satisfy `where`, evaluating the predicate against values decoded from the
624
// index key alone (no value-reference resolution, no row payload decode).
625
//
626
// The caller must guarantee that every column referenced by `where` belongs
627
// to r.scanSpecs.Index.cols; see SelectStmt.canCountWithKeyOnly. The encoded
628
// key layout is documented in unmapIndexEntry (catalog.go) — sqlPrefix +
629
// MappedPrefix + tableID + indexID + (per-column tag+payload) + PK bytes.
630
func (r *rawRowReader) CountAllWithKeyFilter(ctx context.Context, where ValueExp) (int64, error) {
6✔
631
        if where == nil {
6✔
632
                return r.CountAll(ctx)
×
633
        }
×
634
        if err := ctx.Err(); err != nil {
6✔
635
                return 0, err
×
636
        }
×
637
        if err := r.reduceTxRange(); errors.Is(err, store.ErrTxNotFound) {
6✔
638
                return 0, nil
×
639
        } else if err != nil {
6✔
640
                return 0, err
×
641
        }
×
642

643
        index := r.scanSpecs.Index
6✔
644
        // Bytes preceding the per-column data: sqlPrefix + MappedPrefix +
6✔
645
        // tableID(4) + indexID(4). Anything after this offset is the ordered
6✔
646
        // sequence of indexed-column tag+payload runs, terminated by the PK.
6✔
647
        headerLen := len(r.tx.engine.prefix) + len(MappedPrefix) + 2*EncIDLen
6✔
648

6✔
649
        // Reusable per-iteration buffers; sparse Row keyed only by index columns.
6✔
650
        valuesBySelector := make(map[string]TypedValue, len(index.cols))
6✔
651
        row := &Row{ValuesBySelector: valuesBySelector}
6✔
652

6✔
653
        // Substitute params once: substitute is non-mutating (stmt.go:6755-6759,
6✔
654
        // per issue #1153) and the resulting AST is reusable across reduce calls,
6✔
655
        // so per-row substitution would only churn allocations.
6✔
656
        cond, err := where.substitute(r.params)
6✔
657
        if err != nil {
6✔
UNCOV
658
                return 0, fmt.Errorf("%w: when evaluating WHERE clause", err)
×
UNCOV
659
        }
×
660

661
        // Encoded selector strings are pure functions of (alias, colName); cache
662
        // them by index-column position to avoid per-row string concatenation.
663
        selectors := make([]string, len(index.cols))
6✔
664
        for i, col := range index.cols {
22✔
665
                selectors[i] = EncodeSelector("", r.tableAlias, col.colName)
16✔
666
        }
16✔
667

668
        var n int64
6✔
669
        for {
187✔
670
                var (
181✔
671
                        mkey []byte
181✔
672
                        err  error
181✔
673
                )
181✔
674
                if r.txRange == nil {
362✔
675
                        mkey, _, err = r.reader.Read(ctx)
181✔
676
                } else {
181✔
UNCOV
677
                        mkey, _, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID)
×
UNCOV
678
                }
×
679
                if errors.Is(err, store.ErrNoMoreEntries) {
187✔
680
                        return n, nil
6✔
681
                }
6✔
682
                if err != nil {
175✔
UNCOV
683
                        return 0, err
×
UNCOV
684
                }
×
685

686
                if len(mkey) < headerLen {
175✔
UNCOV
687
                        return 0, ErrCorruptedData
×
UNCOV
688
                }
×
689

690
                off := headerLen
175✔
691
                // Reset map entries from the previous iteration. Re-using the map
175✔
692
                // avoids per-row allocation on a hot count loop.
175✔
693
                for k := range valuesBySelector {
665✔
694
                        delete(valuesBySelector, k)
490✔
695
                }
490✔
696

697
                for i, col := range index.cols {
678✔
698
                        val, consumed, derr := DecodeValueFromKey(mkey[off:], col.colType, col.MaxLen())
503✔
699
                        if derr != nil {
503✔
700
                                return 0, derr
×
701
                        }
×
702
                        off += consumed
503✔
703

503✔
704
                        valuesBySelector[selectors[i]] = val
503✔
705
                }
706

707
                match, isNull, err := reduceBoolValueExp(r.tx, row, r.tableAlias, cond)
175✔
708
                if err != nil {
175✔
709
                        return 0, fmt.Errorf("%w: when evaluating WHERE clause", err)
×
UNCOV
710
                }
×
711
                if isNull {
175✔
UNCOV
712
                        continue
×
713
                }
714
                if match {
276✔
715
                        n++
101✔
716
                }
101✔
717
        }
718
}
719

720
func (r *rawRowReader) parseTxMetadata(txmd *store.TxMetadata) (TypedValue, error) {
13✔
721
        if txmd == nil {
13✔
UNCOV
722
                return &NullValue{t: JSONType}, nil
×
UNCOV
723
        }
×
724

725
        if extra := txmd.Extra(); extra != nil {
26✔
726
                if r.tx.engine.parseTxMetadata == nil {
14✔
727
                        return nil, fmt.Errorf("unable to parse tx metadata")
1✔
728
                }
1✔
729

730
                md, err := r.tx.engine.parseTxMetadata(extra)
12✔
731
                if err != nil {
13✔
732
                        return nil, fmt.Errorf("%w: %s", ErrInvalidTxMetadata, err)
1✔
733
                }
1✔
734
                return &JSON{val: md}, nil
11✔
735
        }
UNCOV
736
        return &NullValue{t: JSONType}, nil
×
737
}
738

739
func (r *rawRowReader) Close() error {
1,313✔
740
        if r.onCloseCallback != nil {
1,893✔
741
                defer r.onCloseCallback()
580✔
742
        }
580✔
743

744
        return r.reader.Close()
1,313✔
745
}
746

747
func ReadAllRows(ctx context.Context, reader RowReader) ([]*Row, error) {
143✔
748
        var rows []*Row
143✔
749
        err := ReadRowsBatch(ctx, reader, 100, func(rowBatch []*Row) error {
549✔
750
                if rows == nil {
524✔
751
                        rows = make([]*Row, 0, len(rowBatch))
118✔
752
                }
118✔
753
                rows = append(rows, rowBatch...)
406✔
754
                return nil
406✔
755
        })
756
        return rows, err
143✔
757
}
758

759
func ReadRowsBatch(ctx context.Context, reader RowReader, batchSize int, onBatch func([]*Row) error) error {
383✔
760
        rows := make([]*Row, batchSize)
383✔
761

383✔
762
        hasMoreRows := true
383✔
763
        for hasMoreRows {
1,078✔
764
                n, err := readNRows(ctx, reader, batchSize, rows)
695✔
765

695✔
766
                if n > 0 {
1,344✔
767
                        if err := onBatch(rows[:n]); err != nil {
649✔
UNCOV
768
                                return err
×
UNCOV
769
                        }
×
770
                }
771

772
                hasMoreRows = !errors.Is(err, ErrNoMoreRows)
695✔
773
                if err != nil && hasMoreRows {
719✔
774
                        return err
24✔
775
                }
24✔
776
        }
777
        return nil
359✔
778
}
779

780
func readNRows(ctx context.Context, reader RowReader, n int, outRows []*Row) (int, error) {
695✔
781
        for i := 0; i < n; i++ {
34,165✔
782
                r, err := reader.Read(ctx)
33,470✔
783
                if err != nil {
33,853✔
784
                        return i, err
383✔
785
                }
383✔
786
                outRows[i] = r
33,087✔
787
        }
788
        return n, nil
312✔
789
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc