• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 25364743170

05 May 2026 08:00AM UTC coverage: 85.038% (-0.2%) from 85.255%
25364743170

push

gh-ci

vchaindz
chore(deps): bump jackc/pgx/v5 v5.9.1 -> v5.9.2 (CVE-2026-41889)

Patches a low-severity SQL-injection edge case in pgx's simple-protocol
codepath when a dollar-quoted string literal embeds attacker-controllable
placeholder text (GHSA-j88v-2chj-qfwx).

Not exploitable in immudb: pgx is a test-only dependency used by
pkg/pgsql/server/pgsql_{hardened,compat_integration,integration}_test.go
to drive the wire-compat layer with a real Postgres client. It is not
in the production package graph (`go list -deps ./cmd/... ./pkg/...
./embedded/...` returns 0 for jackc/pgx). Bumped purely to keep the
Dependabot alert clean.

Verified:
  go build ./...                                ok
  go test -count=1 ./pkg/pgsql/server/          ok

45157 of 53102 relevant lines covered (85.04%)

126482.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.8
/embedded/sql/row_reader.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "math"
26

27
        "github.com/codenotary/immudb/embedded/store"
28
)
29

30
type RowReader interface {
31
        Tx() *SQLTx
32
        TableAlias() string
33
        Parameters() map[string]interface{}
34
        Read(ctx context.Context) (*Row, error)
35
        Close() error
36
        Columns(ctx context.Context) ([]ColDescriptor, error)
37
        OrderBy() []ColDescriptor
38
        ScanSpecs() *ScanSpecs
39
        InferParameters(ctx context.Context, params map[string]SQLValueType) error
40
        colsBySelector(ctx context.Context) (map[string]ColDescriptor, error)
41
        onClose(func())
42
}
43

44
type ScanSpecs struct {
45
        Index             *Index
46
        rangesByColID     map[uint32]*typedValueRange
47
        IncludeHistory    bool
48
        IncludeDiff       bool
49
        IncludeTxMetadata bool
50
        DescOrder         bool
51
        groupBySortExps   []*OrdExp
52
        orderBySortExps   []*OrdExp
53
        // neededColIDs, when non-nil, restricts column decoding to the listed IDs.
54
        // Columns absent from the map are skipped (offset advanced, no allocation).
55
        // nil means decode all columns (backward-compatible default).
56
        neededColIDs map[uint32]bool
57
}
58

59
func (s *ScanSpecs) extraCols() int {
39,793✔
60
        n := 0
39,793✔
61
        if s.IncludeHistory {
39,815✔
62
                n++
22✔
63
        }
22✔
64

65
        if s.IncludeDiff {
39,793✔
66
                n++
×
67
        }
×
68

69
        if s.IncludeTxMetadata {
39,810✔
70
                n++
17✔
71
        }
17✔
72
        return n
39,793✔
73
}
74

75
type Row struct {
76
        ValuesByPosition []TypedValue
77
        ValuesBySelector map[string]TypedValue
78
}
79

80
// rows are selector-compatible if both rows have the same assigned value for all specified selectors
81
func (row *Row) compatible(aRow *Row, selectors []*ColSelector, table string) (bool, error) {
547✔
82
        for _, sel := range selectors {
964✔
83
                c := EncodeSelector(sel.resolve(table))
417✔
84

417✔
85
                val1, ok := row.ValuesBySelector[c]
417✔
86
                if !ok {
417✔
87
                        return false, ErrInvalidColumn
×
88
                }
×
89

90
                val2, ok := aRow.ValuesBySelector[c]
417✔
91
                if !ok {
417✔
92
                        return false, ErrInvalidColumn
×
93
                }
×
94

95
                cmp, err := val1.Compare(val2)
417✔
96
                if err != nil {
417✔
97
                        return false, err
×
98
                }
×
99

100
                if cmp != 0 {
548✔
101
                        return false, nil
131✔
102
                }
131✔
103
        }
104

105
        return true, nil
416✔
106
}
107

108
func (row *Row) digest(cols []ColDescriptor) (d [sha256.Size]byte, err error) {
1,177✔
109
        h := sha256.New()
1,177✔
110

1,177✔
111
        for i, v := range row.ValuesByPosition {
2,381✔
112
                var b [4]byte
1,204✔
113
                binary.BigEndian.PutUint32(b[:], uint32(i))
1,204✔
114
                h.Write(b[:])
1,204✔
115

1,204✔
116
                _, isNull := v.(*NullValue)
1,204✔
117
                if isNull {
1,206✔
118
                        continue
2✔
119
                }
120

121
                encVal, err := EncodeValue(v, v.Type(), 0)
1,202✔
122
                if err != nil {
1,202✔
123
                        return d, err
×
124
                }
×
125

126
                h.Write(encVal)
1,202✔
127
        }
128

129
        copy(d[:], h.Sum(nil))
1,177✔
130

1,177✔
131
        return
1,177✔
132
}
133

134
type rawRowReader struct {
135
        tx         *SQLTx
136
        table      *Table
137
        tableAlias string
138
        colsByPos  []ColDescriptor
139
        colsBySel  map[string]ColDescriptor
140
        scanSpecs  *ScanSpecs
141

142
        // defines a sub-range a transactions based on a combination of tx IDs and timestamps
143
        // the query is resolved only taking into consideration that range of transactioins
144
        period period
145

146
        // underlying store supports reading entries within a range of txs
147
        // the range is calculated based on the period stmt, which is included here to support
148
        // lazy evaluation when parameters are available
149
        txRange *txRange
150

151
        params map[string]interface{}
152

153
        reader          store.KeyReader
154
        onCloseCallback func()
155
}
156

157
type txRange struct {
158
        initialTxID uint64
159
        finalTxID   uint64
160
}
161

162
type ColDescriptor struct {
163
        AggFn  string
164
        Table  string
165
        Column string
166
        Type   SQLValueType
167
}
168

169
func (d *ColDescriptor) Selector() string {
260,481✔
170
        return EncodeSelector(d.AggFn, d.Table, d.Column)
260,481✔
171
}
260,481✔
172

173
type emptyKeyReader struct {
174
}
175

176
func (r emptyKeyReader) Read(ctx context.Context) (key []byte, val store.ValueRef, err error) {
×
177
        return nil, nil, store.ErrNoMoreEntries
×
178
}
×
179

180
func (r emptyKeyReader) ReadBetween(ctx context.Context, initialTxID uint64, finalTxID uint64) (key []byte, val store.ValueRef, err error) {
×
181
        return nil, nil, store.ErrNoMoreEntries
×
182
}
×
183

184
func (r emptyKeyReader) Reset() error {
×
185
        return nil
×
186
}
×
187

188
func (r emptyKeyReader) Close() error {
×
189
        return nil
×
190
}
×
191

192
func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, period period, tableAlias string, scanSpecs *ScanSpecs) (*rawRowReader, error) {
1,293✔
193
        if table == nil || scanSpecs == nil || scanSpecs.Index == nil {
1,293✔
194
                return nil, ErrIllegalArguments
×
195
        }
×
196

197
        rSpec, err := keyReaderSpecFrom(tx.engine.prefix, table, scanSpecs)
1,293✔
198
        if err != nil {
1,293✔
199
                return nil, err
×
200
        }
×
201

202
        var r store.KeyReader
1,293✔
203

1,293✔
204
        // System tables have no storage backing — their rows come from
1,293✔
205
        // Table.systemScan via tableRef.Resolve. If anything still funnels
1,293✔
206
        // a rawRowReader at one (e.g. a direct internal call), fall back to
1,293✔
207
        // an empty reader rather than scanning storage with a key prefix
1,293✔
208
        // that has no entries.
1,293✔
209
        if table.systemScan != nil {
1,293✔
210
                r = &emptyKeyReader{}
×
211
        } else {
1,293✔
212
                r, err = tx.newKeyReader(*rSpec)
1,293✔
213
                if err != nil {
1,294✔
214
                        return nil, err
1✔
215
                }
1✔
216
        }
217

218
        if tableAlias == "" {
2,249✔
219
                tableAlias = table.name
957✔
220
        }
957✔
221

222
        nCols := len(table.cols) + scanSpecs.extraCols()
1,292✔
223

1,292✔
224
        colsByPos := make([]ColDescriptor, nCols)
1,292✔
225
        colsBySel := make(map[string]ColDescriptor, nCols)
1,292✔
226

1,292✔
227
        off := 0
1,292✔
228
        if scanSpecs.IncludeHistory {
1,294✔
229
                colDescriptor := ColDescriptor{
2✔
230
                        Table:  tableAlias,
2✔
231
                        Column: revCol,
2✔
232
                        Type:   IntegerType,
2✔
233
                }
2✔
234

2✔
235
                colsByPos[off] = colDescriptor
2✔
236
                colsBySel[colDescriptor.Selector()] = colDescriptor
2✔
237
                off++
2✔
238
        }
2✔
239

240
        if scanSpecs.IncludeTxMetadata {
1,296✔
241
                colDescriptor := ColDescriptor{
4✔
242
                        Table:  tableAlias,
4✔
243
                        Column: txMetadataCol,
4✔
244
                        Type:   JSONType,
4✔
245
                }
4✔
246

4✔
247
                colsByPos[off] = colDescriptor
4✔
248
                colsBySel[colDescriptor.Selector()] = colDescriptor
4✔
249
                off++
4✔
250
        }
4✔
251

252
        for i, c := range table.cols {
5,127✔
253
                colDescriptor := ColDescriptor{
3,835✔
254
                        Table:  tableAlias,
3,835✔
255
                        Column: c.colName,
3,835✔
256
                        Type:   c.colType,
3,835✔
257
                }
3,835✔
258

3,835✔
259
                colsByPos[off+i] = colDescriptor
3,835✔
260
                colsBySel[colDescriptor.Selector()] = colDescriptor
3,835✔
261
        }
3,835✔
262

263
        return &rawRowReader{
1,292✔
264
                tx:         tx,
1,292✔
265
                table:      table,
1,292✔
266
                period:     period,
1,292✔
267
                tableAlias: tableAlias,
1,292✔
268
                colsByPos:  colsByPos,
1,292✔
269
                colsBySel:  colsBySel,
1,292✔
270
                scanSpecs:  scanSpecs,
1,292✔
271
                params:     params,
1,292✔
272
                reader:     r,
1,292✔
273
        }, nil
1,292✔
274
}
275

276
func keyReaderSpecFrom(sqlPrefix []byte, table *Table, scanSpecs *ScanSpecs) (spec *store.KeyReaderSpec, err error) {
1,295✔
277
        prefix := MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(scanSpecs.Index.id))
1,295✔
278

1,295✔
279
        var loKey []byte
1,295✔
280
        var loKeyReady bool
1,295✔
281

1,295✔
282
        var hiKey []byte
1,295✔
283
        var hiKeyReady bool
1,295✔
284

1,295✔
285
        loKey = make([]byte, len(prefix))
1,295✔
286
        copy(loKey, prefix)
1,295✔
287

1,295✔
288
        hiKey = make([]byte, len(prefix))
1,295✔
289
        copy(hiKey, prefix)
1,295✔
290

1,295✔
291
        // seekKey and endKey in the loop below are scan prefixes for beginning
1,295✔
292
        // and end of the index scanning range. On each index we try to make them more
1,295✔
293
        // concrete.
1,295✔
294
        for _, col := range scanSpecs.Index.cols {
2,554✔
295
                colRange, ok := scanSpecs.rangesByColID[col.id]
1,259✔
296
                if !ok {
2,257✔
297
                        break
998✔
298
                }
299

300
                if !hiKeyReady {
522✔
301
                        if colRange.hRange == nil {
282✔
302
                                hiKeyReady = true
21✔
303
                        } else {
261✔
304
                                encVal, _, err := EncodeValueAsKey(colRange.hRange.val, col.colType, col.MaxLen())
240✔
305
                                if err != nil {
241✔
306
                                        return nil, err
1✔
307
                                }
1✔
308
                                hiKey = append(hiKey, encVal...)
239✔
309
                        }
310
                }
311

312
                if !loKeyReady {
520✔
313
                        if colRange.lRange == nil {
273✔
314
                                loKeyReady = true
13✔
315
                        } else {
260✔
316
                                encVal, _, err := EncodeValueAsKey(colRange.lRange.val, col.colType, col.MaxLen())
247✔
317
                                if err != nil {
248✔
318
                                        return nil, err
1✔
319
                                }
1✔
320
                                loKey = append(loKey, encVal...)
246✔
321
                        }
322
                }
323
        }
324

325
        // Ensure the hiKey is inclusive regarding all values with that prefix
326
        hiKey = append(hiKey, KeyValPrefixUpperBound)
1,293✔
327

1,293✔
328
        seekKey := loKey
1,293✔
329
        endKey := hiKey
1,293✔
330

1,293✔
331
        if scanSpecs.DescOrder {
1,344✔
332
                seekKey, endKey = endKey, seekKey
51✔
333
        }
51✔
334

335
        return &store.KeyReaderSpec{
1,293✔
336
                SeekKey:        seekKey,
1,293✔
337
                InclusiveSeek:  true,
1,293✔
338
                EndKey:         endKey,
1,293✔
339
                InclusiveEnd:   true,
1,293✔
340
                Prefix:         prefix,
1,293✔
341
                DescOrder:      scanSpecs.DescOrder,
1,293✔
342
                Filters:        []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
1,293✔
343
                IncludeHistory: scanSpecs.IncludeHistory,
1,293✔
344
        }, nil
1,293✔
345
}
346

347
func (r *rawRowReader) onClose(callback func()) {
577✔
348
        r.onCloseCallback = callback
577✔
349
}
577✔
350

351
func (r *rawRowReader) Tx() *SQLTx {
732,162✔
352
        return r.tx
732,162✔
353
}
732,162✔
354

355
func (r *rawRowReader) TableAlias() string {
1,130,554✔
356
        return r.tableAlias
1,130,554✔
357
}
1,130,554✔
358

359
func (r *rawRowReader) OrderBy() []ColDescriptor {
22✔
360
        cols := make([]ColDescriptor, len(r.scanSpecs.Index.cols))
22✔
361

22✔
362
        for i, col := range r.scanSpecs.Index.cols {
47✔
363
                cols[i] = ColDescriptor{
25✔
364
                        Table:  r.tableAlias,
25✔
365
                        Column: col.colName,
25✔
366
                        Type:   col.colType,
25✔
367
                }
25✔
368
        }
25✔
369

370
        return cols
22✔
371
}
372

373
func (r *rawRowReader) ScanSpecs() *ScanSpecs {
120✔
374
        return r.scanSpecs
120✔
375
}
120✔
376

377
func (r *rawRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
428✔
378
        ret := make([]ColDescriptor, len(r.colsByPos))
428✔
379
        copy(ret, r.colsByPos)
428✔
380
        return ret, nil
428✔
381
}
428✔
382

383
func (r *rawRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
1,021✔
384
        ret := make(map[string]ColDescriptor, len(r.colsBySel))
1,021✔
385
        for sel := range r.colsBySel {
3,921✔
386
                ret[sel] = r.colsBySel[sel]
2,900✔
387
        }
2,900✔
388
        return ret, nil
1,021✔
389
}
390

391
func (r *rawRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
152✔
392
        cols, err := r.colsBySelector(ctx)
152✔
393
        if err != nil {
152✔
394
                return err
×
395
        }
×
396

397
        if r.period.start != nil {
162✔
398
                _, err = r.period.start.instant.exp.inferType(cols, params, r.TableAlias())
10✔
399
                if err != nil {
10✔
400
                        return err
×
401
                }
×
402
        }
403

404
        if r.period.end != nil {
162✔
405
                _, err = r.period.end.instant.exp.inferType(cols, params, r.TableAlias())
10✔
406
                if err != nil {
10✔
407
                        return err
×
408
                }
×
409
        }
410

411
        return nil
152✔
412
}
413

414
func (r *rawRowReader) Parameters() map[string]interface{} {
199,863✔
415
        return r.params
199,863✔
416
}
199,863✔
417

418
func (r *rawRowReader) reduceTxRange() (err error) {
39,205✔
419
        if r.txRange != nil || (r.period.start == nil && r.period.end == nil) {
78,333✔
420
                return nil
39,128✔
421
        }
39,128✔
422

423
        txRange := &txRange{
77✔
424
                initialTxID: uint64(0),
77✔
425
                finalTxID:   uint64(math.MaxUint64),
77✔
426
        }
77✔
427

77✔
428
        if r.period.start != nil {
131✔
429
                txRange.initialTxID, err = r.period.start.instant.resolve(r.tx, r.params, true, r.period.start.inclusive)
54✔
430
                if err != nil {
72✔
431
                        return err
18✔
432
                }
18✔
433
        }
434

435
        if r.period.end != nil {
94✔
436
                txRange.finalTxID, err = r.period.end.instant.resolve(r.tx, r.params, false, r.period.end.inclusive)
35✔
437
                if err != nil {
38✔
438
                        return err
3✔
439
                }
3✔
440
        }
441

442
        r.txRange = txRange
56✔
443

56✔
444
        return nil
56✔
445
}
446

447
func (r *rawRowReader) Read(ctx context.Context) (*Row, error) {
39,173✔
448
        if err := ctx.Err(); err != nil {
39,173✔
449
                return nil, err
×
450
        }
×
451

452
        //var mkey []byte
453
        var vref store.ValueRef
39,173✔
454

39,173✔
455
        // evaluation of txRange is postponed to allow parameters to be provided after rowReader initialization
39,173✔
456
        err := r.reduceTxRange()
39,173✔
457
        if errors.Is(err, store.ErrTxNotFound) {
39,184✔
458
                return nil, ErrNoMoreRows
11✔
459
        }
11✔
460
        if err != nil {
39,172✔
461
                return nil, err
10✔
462
        }
10✔
463

464
        if r.txRange == nil {
78,231✔
465
                _, vref, err = r.reader.Read(ctx) //mkey
39,079✔
466
        } else {
39,152✔
467
                _, vref, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID) //mkey
73✔
468
        }
73✔
469
        if err != nil {
39,803✔
470
                return nil, err
651✔
471
        }
651✔
472

473
        v, err := vref.Resolve()
38,501✔
474
        if err != nil {
38,501✔
475
                return nil, err
×
476
        }
×
477

478
        valuesByPosition := make([]TypedValue, len(r.colsByPos))
38,501✔
479
        valuesBySelector := make(map[string]TypedValue, len(r.colsBySel))
38,501✔
480

38,501✔
481
        // extraCols is the number of synthetic leading columns (rev, txMetadata).
38,501✔
482
        // Table columns occupy r.colsByPos[extraCols:], corresponding 1-to-1 with
38,501✔
483
        // r.table.cols. Moved up so the pre-fill loop can use it for the skip check.
38,501✔
484
        extraCols := r.scanSpecs.extraCols()
38,501✔
485

38,501✔
486
        for i, col := range r.colsByPos {
248,103✔
487
                // Skip NullValue pre-fill for table columns that the query does not
209,602✔
488
                // need. The nil zero-value is semantically NULL and is never accessed
209,602✔
489
                // for columns excluded by neededColIDs. Extra synthetic columns (rev,
209,602✔
490
                // txMetadata) are always included regardless of neededColIDs.
209,602✔
491
                if r.scanSpecs.neededColIDs != nil {
236,031✔
492
                        if tableIdx := i - extraCols; tableIdx >= 0 && tableIdx < len(r.table.cols) {
52,858✔
493
                                if !r.scanSpecs.neededColIDs[r.table.cols[tableIdx].id] {
36,299✔
494
                                        continue
9,870✔
495
                                }
496
                        }
497
                }
498

499
                var val TypedValue
199,732✔
500

199,732✔
501
                switch col.Column {
199,732✔
502
                case revCol:
20✔
503
                        val = &Integer{val: int64(vref.HC())}
20✔
504
                case txMetadataCol:
13✔
505
                        val, err = r.parseTxMetadata(vref.TxMetadata())
13✔
506
                        if err != nil {
15✔
507
                                return nil, err
2✔
508
                        }
2✔
509
                default:
199,699✔
510
                        val = &NullValue{t: col.Type}
199,699✔
511
                }
512

513
                valuesByPosition[i] = val
199,730✔
514
                valuesBySelector[col.Selector()] = val
199,730✔
515
        }
516

517
        if len(v) < EncLenLen {
38,499✔
518
                return nil, ErrCorruptedData
×
519
        }
×
520

521

522
        voff := 0
38,499✔
523

38,499✔
524
        cols := int(binary.BigEndian.Uint32(v[voff:]))
38,499✔
525
        voff += EncLenLen
38,499✔
526

38,499✔
527
        for i, pos := 0, 0; i < cols; i++ {
247,558✔
528
                if len(v) < EncIDLen {
209,059✔
529
                        return nil, ErrCorruptedData
×
530
                }
×
531

532
                colID := binary.BigEndian.Uint32(v[voff:])
209,059✔
533
                voff += EncIDLen
209,059✔
534

209,059✔
535
                col, err := r.table.GetColumnByID(colID)
209,059✔
536
                if errors.Is(err, ErrColumnDoesNotExist) && colID <= r.table.maxColID {
209,084✔
537
                        // Dropped column, skip it
25✔
538
                        vlen, n, err := DecodeValueLength(v[voff:])
25✔
539
                        if err != nil {
25✔
540
                                return nil, err
×
541
                        }
×
542
                        voff += n + vlen
25✔
543

25✔
544
                        continue
25✔
545
                }
546
                if err != nil {
209,034✔
547
                        return nil, ErrCorruptedData
×
548
                }
×
549

550
                // Projection pushdown: skip columns not needed by the query.
551
                // We still advance voff so the byte stream stays in sync.
552
                // pos advancement is handled by the loop at the decode site below,
553
                // since that same loop also advances pos past any skipped entries.
554
                if r.scanSpecs.neededColIDs != nil && !r.scanSpecs.neededColIDs[colID] {
218,608✔
555
                        vlen, n, err := DecodeValueLength(v[voff:])
9,574✔
556
                        if err != nil {
9,574✔
557
                                return nil, err
×
558
                        }
×
559
                        voff += n + vlen
9,574✔
560
                        continue
9,574✔
561
                }
562

563
                val, n, err := DecodeValue(v[voff:], col.colType)
199,460✔
564
                if err != nil {
199,460✔
565
                        return nil, err
×
566
                }
×
567

568
                voff += n
199,460✔
569

199,460✔
570
                // make sure value is inserted in the correct position
199,460✔
571
                for pos < len(r.table.cols) && r.table.cols[pos].id < colID {
203,589✔
572
                        pos++
4,129✔
573
                }
4,129✔
574

575
                if pos == len(r.table.cols) || r.table.cols[pos].id != colID {
199,460✔
576
                        return nil, ErrCorruptedData
×
577
                }
×
578

579
                valuesByPosition[pos+extraCols] = val
199,460✔
580

199,460✔
581
                pos++
199,460✔
582

199,460✔
583
                valuesBySelector[EncodeSelector("", r.tableAlias, col.colName)] = val
199,460✔
584
        }
585

586
        if len(v)-voff > 0 {
38,499✔
587
                return nil, ErrCorruptedData
×
588
        }
×
589

590
        return &Row{ValuesByPosition: valuesByPosition, ValuesBySelector: valuesBySelector}, nil
38,499✔
591
}
592

593
// CountAll iterates the scan without decoding column values, returning the
594
// number of matching index entries. Used by the COUNT(*) fast-path.
595
func (r *rawRowReader) CountAll(ctx context.Context) (int64, error) {
26✔
596
        if err := ctx.Err(); err != nil {
26✔
597
                return 0, err
×
598
        }
×
599
        if err := r.reduceTxRange(); errors.Is(err, store.ErrTxNotFound) {
26✔
600
                return 0, nil
×
601
        } else if err != nil {
26✔
602
                return 0, err
×
603
        }
×
604
        var n int64
26✔
605
        for {
1,817✔
606
                var err error
1,791✔
607
                if r.txRange == nil {
3,549✔
608
                        _, _, err = r.reader.Read(ctx)
1,758✔
609
                } else {
1,791✔
610
                        _, _, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID)
33✔
611
                }
33✔
612
                if errors.Is(err, store.ErrNoMoreEntries) {
1,817✔
613
                        return n, nil
26✔
614
                }
26✔
615
                if err != nil {
1,765✔
616
                        return 0, err
×
617
                }
×
618
                n++
1,765✔
619
        }
620
}
621

622
// CountAllWithKeyFilter iterates the index scan and counts entries that
623
// satisfy `where`, evaluating the predicate against values decoded from the
624
// index key alone (no value-reference resolution, no row payload decode).
625
//
626
// The caller must guarantee that every column referenced by `where` belongs
627
// to r.scanSpecs.Index.cols; see SelectStmt.canCountWithKeyOnly. The encoded
628
// key layout is documented in unmapIndexEntry (catalog.go) — sqlPrefix +
629
// MappedPrefix + tableID + indexID + (per-column tag+payload) + PK bytes.
630
func (r *rawRowReader) CountAllWithKeyFilter(ctx context.Context, where ValueExp) (int64, error) {
6✔
631
        if where == nil {
6✔
632
                return r.CountAll(ctx)
×
633
        }
×
634
        if err := ctx.Err(); err != nil {
6✔
635
                return 0, err
×
636
        }
×
637
        if err := r.reduceTxRange(); errors.Is(err, store.ErrTxNotFound) {
6✔
638
                return 0, nil
×
639
        } else if err != nil {
6✔
640
                return 0, err
×
641
        }
×
642

643
        index := r.scanSpecs.Index
6✔
644
        // Bytes preceding the per-column data: sqlPrefix + MappedPrefix +
6✔
645
        // tableID(4) + indexID(4). Anything after this offset is the ordered
6✔
646
        // sequence of indexed-column tag+payload runs, terminated by the PK.
6✔
647
        headerLen := len(r.tx.engine.prefix) + len(MappedPrefix) + 2*EncIDLen
6✔
648

6✔
649
        // Reusable per-iteration buffers; sparse Row keyed only by index columns.
6✔
650
        valuesBySelector := make(map[string]TypedValue, len(index.cols))
6✔
651
        row := &Row{ValuesBySelector: valuesBySelector}
6✔
652

6✔
653
        var n int64
6✔
654
        for {
187✔
655
                var (
181✔
656
                        mkey []byte
181✔
657
                        err  error
181✔
658
                )
181✔
659
                if r.txRange == nil {
362✔
660
                        mkey, _, err = r.reader.Read(ctx)
181✔
661
                } else {
181✔
662
                        mkey, _, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID)
×
663
                }
×
664
                if errors.Is(err, store.ErrNoMoreEntries) {
187✔
665
                        return n, nil
6✔
666
                }
6✔
667
                if err != nil {
175✔
668
                        return 0, err
×
669
                }
×
670

671
                if len(mkey) < headerLen {
175✔
672
                        return 0, ErrCorruptedData
×
673
                }
×
674

675
                off := headerLen
175✔
676
                // Reset map entries from the previous iteration. Re-using the map
175✔
677
                // avoids per-row allocation on a hot count loop.
175✔
678
                for k := range valuesBySelector {
665✔
679
                        delete(valuesBySelector, k)
490✔
680
                }
490✔
681

682
                for _, col := range index.cols {
678✔
683
                        val, consumed, derr := DecodeValueFromKey(mkey[off:], col.colType, col.MaxLen())
503✔
684
                        if derr != nil {
503✔
685
                                return 0, derr
×
686
                        }
×
687
                        off += consumed
503✔
688

503✔
689
                        sel := EncodeSelector("", r.tableAlias, col.colName)
503✔
690
                        valuesBySelector[sel] = val
503✔
691
                }
692

693
                cond, err := where.substitute(r.params)
175✔
694
                if err != nil {
175✔
695
                        return 0, fmt.Errorf("%w: when evaluating WHERE clause", err)
×
696
                }
×
697

698
                res, err := cond.reduce(r.tx, row, r.tableAlias)
175✔
699
                if err != nil {
175✔
700
                        return 0, fmt.Errorf("%w: when evaluating WHERE clause", err)
×
701
                }
×
702

703
                if nv, isNull := res.(*NullValue); isNull && nv.Type() == BooleanType {
175✔
704
                        continue
×
705
                }
706
                bv, isBool := res.(*Bool)
175✔
707
                if !isBool {
175✔
708
                        return 0, fmt.Errorf("%w: expected '%s' in WHERE clause, but '%s' was provided", ErrInvalidCondition, BooleanType, res.Type())
×
709
                }
×
710
                if bv.val {
276✔
711
                        n++
101✔
712
                }
101✔
713
        }
714
}
715

716
func (r *rawRowReader) parseTxMetadata(txmd *store.TxMetadata) (TypedValue, error) {
13✔
717
        if txmd == nil {
13✔
718
                return &NullValue{t: JSONType}, nil
×
719
        }
×
720

721
        if extra := txmd.Extra(); extra != nil {
26✔
722
                if r.tx.engine.parseTxMetadata == nil {
14✔
723
                        return nil, fmt.Errorf("unable to parse tx metadata")
1✔
724
                }
1✔
725

726
                md, err := r.tx.engine.parseTxMetadata(extra)
12✔
727
                if err != nil {
13✔
728
                        return nil, fmt.Errorf("%w: %s", ErrInvalidTxMetadata, err)
1✔
729
                }
1✔
730
                return &JSON{val: md}, nil
11✔
731
        }
732
        return &NullValue{t: JSONType}, nil
×
733
}
734

735
func (r *rawRowReader) Close() error {
1,300✔
736
        if r.onCloseCallback != nil {
1,876✔
737
                defer r.onCloseCallback()
576✔
738
        }
576✔
739

740
        return r.reader.Close()
1,300✔
741
}
742

743
func ReadAllRows(ctx context.Context, reader RowReader) ([]*Row, error) {
143✔
744
        var rows []*Row
143✔
745
        err := ReadRowsBatch(ctx, reader, 100, func(rowBatch []*Row) error {
549✔
746
                if rows == nil {
524✔
747
                        rows = make([]*Row, 0, len(rowBatch))
118✔
748
                }
118✔
749
                rows = append(rows, rowBatch...)
406✔
750
                return nil
406✔
751
        })
752
        return rows, err
143✔
753
}
754

755
func ReadRowsBatch(ctx context.Context, reader RowReader, batchSize int, onBatch func([]*Row) error) error {
383✔
756
        rows := make([]*Row, batchSize)
383✔
757

383✔
758
        hasMoreRows := true
383✔
759
        for hasMoreRows {
1,078✔
760
                n, err := readNRows(ctx, reader, batchSize, rows)
695✔
761

695✔
762
                if n > 0 {
1,344✔
763
                        if err := onBatch(rows[:n]); err != nil {
649✔
764
                                return err
×
765
                        }
×
766
                }
767

768
                hasMoreRows = !errors.Is(err, ErrNoMoreRows)
695✔
769
                if err != nil && hasMoreRows {
719✔
770
                        return err
24✔
771
                }
24✔
772
        }
773
        return nil
359✔
774
}
775

776
func readNRows(ctx context.Context, reader RowReader, n int, outRows []*Row) (int, error) {
695✔
777
        for i := 0; i < n; i++ {
34,165✔
778
                r, err := reader.Read(ctx)
33,470✔
779
                if err != nil {
33,853✔
780
                        return i, err
383✔
781
                }
383✔
782
                outRows[i] = r
33,087✔
783
        }
784
        return n, nil
312✔
785
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc