• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 25051172095

28 Apr 2026 11:51AM UTC coverage: 85.275% (-0.004%) from 85.279%
25051172095

push

gh-ci

vchaindz
chore(deps): bump grpc and logrus to patched versions

- google.golang.org/grpc v1.58.3 -> v1.79.3 in linear-fake PoC fixtures
  (closes Dependabot #182, #183 - authorization bypass via missing leading slash)
- github.com/sirupsen/logrus v1.8.1 -> v1.9.3 in root module
  (closes Dependabot #186 - DoS via Entry.Writer())

pgx v5.9.1 already includes the fix for the dollar-quoted-string SQL
injection (GHSA-mrww-27vc-gghv, patched in v5.5.4); Dependabot #187
can be dismissed in the Security tab.

44597 of 52298 relevant lines covered (85.27%)

127290.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.96
/embedded/sql/row_reader.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "math"
26

27
        "github.com/codenotary/immudb/embedded/store"
28
)
29

30
type RowReader interface {
31
        Tx() *SQLTx
32
        TableAlias() string
33
        Parameters() map[string]interface{}
34
        Read(ctx context.Context) (*Row, error)
35
        Close() error
36
        Columns(ctx context.Context) ([]ColDescriptor, error)
37
        OrderBy() []ColDescriptor
38
        ScanSpecs() *ScanSpecs
39
        InferParameters(ctx context.Context, params map[string]SQLValueType) error
40
        colsBySelector(ctx context.Context) (map[string]ColDescriptor, error)
41
        onClose(func())
42
}
43

44
type ScanSpecs struct {
45
        Index             *Index
46
        rangesByColID     map[uint32]*typedValueRange
47
        IncludeHistory    bool
48
        IncludeDiff       bool
49
        IncludeTxMetadata bool
50
        DescOrder         bool
51
        groupBySortExps   []*OrdExp
52
        orderBySortExps   []*OrdExp
53
        // neededColIDs, when non-nil, restricts column decoding to the listed IDs.
54
        // Columns absent from the map are skipped (offset advanced, no allocation).
55
        // nil means decode all columns (backward-compatible default).
56
        neededColIDs map[uint32]bool
57
}
58

59
func (s *ScanSpecs) extraCols() int {
39,304✔
60
        n := 0
39,304✔
61
        if s.IncludeHistory {
39,326✔
62
                n++
22✔
63
        }
22✔
64

65
        if s.IncludeDiff {
39,304✔
66
                n++
×
67
        }
×
68

69
        if s.IncludeTxMetadata {
39,321✔
70
                n++
17✔
71
        }
17✔
72
        return n
39,304✔
73
}
74

75
type Row struct {
76
        ValuesByPosition []TypedValue
77
        ValuesBySelector map[string]TypedValue
78
}
79

80
// rows are selector-compatible if both rows have the same assigned value for all specified selectors
81
func (row *Row) compatible(aRow *Row, selectors []*ColSelector, table string) (bool, error) {
444✔
82
        for _, sel := range selectors {
762✔
83
                c := EncodeSelector(sel.resolve(table))
318✔
84

318✔
85
                val1, ok := row.ValuesBySelector[c]
318✔
86
                if !ok {
318✔
87
                        return false, ErrInvalidColumn
×
88
                }
×
89

90
                val2, ok := aRow.ValuesBySelector[c]
318✔
91
                if !ok {
318✔
92
                        return false, ErrInvalidColumn
×
93
                }
×
94

95
                cmp, err := val1.Compare(val2)
318✔
96
                if err != nil {
318✔
97
                        return false, err
×
98
                }
×
99

100
                if cmp != 0 {
449✔
101
                        return false, nil
131✔
102
                }
131✔
103
        }
104

105
        return true, nil
313✔
106
}
107

108
func (row *Row) digest(cols []ColDescriptor) (d [sha256.Size]byte, err error) {
1,177✔
109
        h := sha256.New()
1,177✔
110

1,177✔
111
        for i, v := range row.ValuesByPosition {
2,381✔
112
                var b [4]byte
1,204✔
113
                binary.BigEndian.PutUint32(b[:], uint32(i))
1,204✔
114
                h.Write(b[:])
1,204✔
115

1,204✔
116
                _, isNull := v.(*NullValue)
1,204✔
117
                if isNull {
1,206✔
118
                        continue
2✔
119
                }
120

121
                encVal, err := EncodeValue(v, v.Type(), 0)
1,202✔
122
                if err != nil {
1,202✔
123
                        return d, err
×
124
                }
×
125

126
                h.Write(encVal)
1,202✔
127
        }
128

129
        copy(d[:], h.Sum(nil))
1,177✔
130

1,177✔
131
        return
1,177✔
132
}
133

134
type rawRowReader struct {
135
        tx         *SQLTx
136
        table      *Table
137
        tableAlias string
138
        colsByPos  []ColDescriptor
139
        colsBySel  map[string]ColDescriptor
140
        scanSpecs  *ScanSpecs
141

142
        // defines a sub-range a transactions based on a combination of tx IDs and timestamps
143
        // the query is resolved only taking into consideration that range of transactioins
144
        period period
145

146
        // underlying store supports reading entries within a range of txs
147
        // the range is calculated based on the period stmt, which is included here to support
148
        // lazy evaluation when parameters are available
149
        txRange *txRange
150

151
        params map[string]interface{}
152

153
        reader          store.KeyReader
154
        onCloseCallback func()
155
}
156

157
type txRange struct {
158
        initialTxID uint64
159
        finalTxID   uint64
160
}
161

162
type ColDescriptor struct {
163
        AggFn  string
164
        Table  string
165
        Column string
166
        Type   SQLValueType
167
}
168

169
func (d *ColDescriptor) Selector() string {
257,344✔
170
        return EncodeSelector(d.AggFn, d.Table, d.Column)
257,344✔
171
}
257,344✔
172

173
type emptyKeyReader struct {
174
}
175

176
func (r emptyKeyReader) Read(ctx context.Context) (key []byte, val store.ValueRef, err error) {
×
177
        return nil, nil, store.ErrNoMoreEntries
×
178
}
×
179

180
func (r emptyKeyReader) ReadBetween(ctx context.Context, initialTxID uint64, finalTxID uint64) (key []byte, val store.ValueRef, err error) {
×
181
        return nil, nil, store.ErrNoMoreEntries
×
182
}
×
183

184
func (r emptyKeyReader) Reset() error {
×
185
        return nil
×
186
}
×
187

188
func (r emptyKeyReader) Close() error {
×
189
        return nil
×
190
}
×
191

192
func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, period period, tableAlias string, scanSpecs *ScanSpecs) (*rawRowReader, error) {
1,264✔
193
        if table == nil || scanSpecs == nil || scanSpecs.Index == nil {
1,264✔
194
                return nil, ErrIllegalArguments
×
195
        }
×
196

197
        rSpec, err := keyReaderSpecFrom(tx.engine.prefix, table, scanSpecs)
1,264✔
198
        if err != nil {
1,264✔
199
                return nil, err
×
200
        }
×
201

202
        var r store.KeyReader
1,264✔
203

1,264✔
204
        // System tables have no storage backing — their rows come from
1,264✔
205
        // Table.systemScan via tableRef.Resolve. If anything still funnels
1,264✔
206
        // a rawRowReader at one (e.g. a direct internal call), fall back to
1,264✔
207
        // an empty reader rather than scanning storage with a key prefix
1,264✔
208
        // that has no entries.
1,264✔
209
        if table.systemScan != nil {
1,264✔
210
                r = &emptyKeyReader{}
×
211
        } else {
1,264✔
212
                r, err = tx.newKeyReader(*rSpec)
1,264✔
213
                if err != nil {
1,265✔
214
                        return nil, err
1✔
215
                }
1✔
216
        }
217

218
        if tableAlias == "" {
2,191✔
219
                tableAlias = table.name
928✔
220
        }
928✔
221

222
        nCols := len(table.cols) + scanSpecs.extraCols()
1,263✔
223

1,263✔
224
        colsByPos := make([]ColDescriptor, nCols)
1,263✔
225
        colsBySel := make(map[string]ColDescriptor, nCols)
1,263✔
226

1,263✔
227
        off := 0
1,263✔
228
        if scanSpecs.IncludeHistory {
1,265✔
229
                colDescriptor := ColDescriptor{
2✔
230
                        Table:  tableAlias,
2✔
231
                        Column: revCol,
2✔
232
                        Type:   IntegerType,
2✔
233
                }
2✔
234

2✔
235
                colsByPos[off] = colDescriptor
2✔
236
                colsBySel[colDescriptor.Selector()] = colDescriptor
2✔
237
                off++
2✔
238
        }
2✔
239

240
        if scanSpecs.IncludeTxMetadata {
1,267✔
241
                colDescriptor := ColDescriptor{
4✔
242
                        Table:  tableAlias,
4✔
243
                        Column: txMetadataCol,
4✔
244
                        Type:   JSONType,
4✔
245
                }
4✔
246

4✔
247
                colsByPos[off] = colDescriptor
4✔
248
                colsBySel[colDescriptor.Selector()] = colDescriptor
4✔
249
                off++
4✔
250
        }
4✔
251

252
        for i, c := range table.cols {
5,010✔
253
                colDescriptor := ColDescriptor{
3,747✔
254
                        Table:  tableAlias,
3,747✔
255
                        Column: c.colName,
3,747✔
256
                        Type:   c.colType,
3,747✔
257
                }
3,747✔
258

3,747✔
259
                colsByPos[off+i] = colDescriptor
3,747✔
260
                colsBySel[colDescriptor.Selector()] = colDescriptor
3,747✔
261
        }
3,747✔
262

263
        return &rawRowReader{
1,263✔
264
                tx:         tx,
1,263✔
265
                table:      table,
1,263✔
266
                period:     period,
1,263✔
267
                tableAlias: tableAlias,
1,263✔
268
                colsByPos:  colsByPos,
1,263✔
269
                colsBySel:  colsBySel,
1,263✔
270
                scanSpecs:  scanSpecs,
1,263✔
271
                params:     params,
1,263✔
272
                reader:     r,
1,263✔
273
        }, nil
1,263✔
274
}
275

276
func keyReaderSpecFrom(sqlPrefix []byte, table *Table, scanSpecs *ScanSpecs) (spec *store.KeyReaderSpec, err error) {
1,266✔
277
        prefix := MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(scanSpecs.Index.id))
1,266✔
278

1,266✔
279
        var loKey []byte
1,266✔
280
        var loKeyReady bool
1,266✔
281

1,266✔
282
        var hiKey []byte
1,266✔
283
        var hiKeyReady bool
1,266✔
284

1,266✔
285
        loKey = make([]byte, len(prefix))
1,266✔
286
        copy(loKey, prefix)
1,266✔
287

1,266✔
288
        hiKey = make([]byte, len(prefix))
1,266✔
289
        copy(hiKey, prefix)
1,266✔
290

1,266✔
291
        // seekKey and endKey in the loop below are scan prefixes for beginning
1,266✔
292
        // and end of the index scanning range. On each index we try to make them more
1,266✔
293
        // concrete.
1,266✔
294
        for _, col := range scanSpecs.Index.cols {
2,485✔
295
                colRange, ok := scanSpecs.rangesByColID[col.id]
1,219✔
296
                if !ok {
2,198✔
297
                        break
979✔
298
                }
299

300
                if !hiKeyReady {
480✔
301
                        if colRange.hRange == nil {
260✔
302
                                hiKeyReady = true
20✔
303
                        } else {
240✔
304
                                encVal, _, err := EncodeValueAsKey(colRange.hRange.val, col.colType, col.MaxLen())
220✔
305
                                if err != nil {
221✔
306
                                        return nil, err
1✔
307
                                }
1✔
308
                                hiKey = append(hiKey, encVal...)
219✔
309
                        }
310
                }
311

312
                if !loKeyReady {
478✔
313
                        if colRange.lRange == nil {
252✔
314
                                loKeyReady = true
13✔
315
                        } else {
239✔
316
                                encVal, _, err := EncodeValueAsKey(colRange.lRange.val, col.colType, col.MaxLen())
226✔
317
                                if err != nil {
227✔
318
                                        return nil, err
1✔
319
                                }
1✔
320
                                loKey = append(loKey, encVal...)
225✔
321
                        }
322
                }
323
        }
324

325
        // Ensure the hiKey is inclusive regarding all values with that prefix
326
        hiKey = append(hiKey, KeyValPrefixUpperBound)
1,264✔
327

1,264✔
328
        seekKey := loKey
1,264✔
329
        endKey := hiKey
1,264✔
330

1,264✔
331
        if scanSpecs.DescOrder {
1,315✔
332
                seekKey, endKey = endKey, seekKey
51✔
333
        }
51✔
334

335
        return &store.KeyReaderSpec{
1,264✔
336
                SeekKey:        seekKey,
1,264✔
337
                InclusiveSeek:  true,
1,264✔
338
                EndKey:         endKey,
1,264✔
339
                InclusiveEnd:   true,
1,264✔
340
                Prefix:         prefix,
1,264✔
341
                DescOrder:      scanSpecs.DescOrder,
1,264✔
342
                Filters:        []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
1,264✔
343
                IncludeHistory: scanSpecs.IncludeHistory,
1,264✔
344
        }, nil
1,264✔
345
}
346

347
func (r *rawRowReader) onClose(callback func()) {
554✔
348
        r.onCloseCallback = callback
554✔
349
}
554✔
350

351
func (r *rawRowReader) Tx() *SQLTx {
731,676✔
352
        return r.tx
731,676✔
353
}
731,676✔
354

355
func (r *rawRowReader) TableAlias() string {
1,129,840✔
356
        return r.tableAlias
1,129,840✔
357
}
1,129,840✔
358

359
func (r *rawRowReader) OrderBy() []ColDescriptor {
22✔
360
        cols := make([]ColDescriptor, len(r.scanSpecs.Index.cols))
22✔
361

22✔
362
        for i, col := range r.scanSpecs.Index.cols {
47✔
363
                cols[i] = ColDescriptor{
25✔
364
                        Table:  r.tableAlias,
25✔
365
                        Column: col.colName,
25✔
366
                        Type:   col.colType,
25✔
367
                }
25✔
368
        }
25✔
369

370
        return cols
22✔
371
}
372

373
func (r *rawRowReader) ScanSpecs() *ScanSpecs {
115✔
374
        return r.scanSpecs
115✔
375
}
115✔
376

377
func (r *rawRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
426✔
378
        ret := make([]ColDescriptor, len(r.colsByPos))
426✔
379
        copy(ret, r.colsByPos)
426✔
380
        return ret, nil
426✔
381
}
426✔
382

383
func (r *rawRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
994✔
384
        ret := make(map[string]ColDescriptor, len(r.colsBySel))
994✔
385
        for sel := range r.colsBySel {
3,834✔
386
                ret[sel] = r.colsBySel[sel]
2,840✔
387
        }
2,840✔
388
        return ret, nil
994✔
389
}
390

391
func (r *rawRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
146✔
392
        cols, err := r.colsBySelector(ctx)
146✔
393
        if err != nil {
146✔
394
                return err
×
395
        }
×
396

397
        if r.period.start != nil {
156✔
398
                _, err = r.period.start.instant.exp.inferType(cols, params, r.TableAlias())
10✔
399
                if err != nil {
10✔
400
                        return err
×
401
                }
×
402
        }
403

404
        if r.period.end != nil {
156✔
405
                _, err = r.period.end.instant.exp.inferType(cols, params, r.TableAlias())
10✔
406
                if err != nil {
10✔
407
                        return err
×
408
                }
×
409
        }
410

411
        return nil
146✔
412
}
413

414
func (r *rawRowReader) Parameters() map[string]interface{} {
199,377✔
415
        return r.params
199,377✔
416
}
199,377✔
417

418
func (r *rawRowReader) reduceTxRange() (err error) {
38,720✔
419
        if r.txRange != nil || (r.period.start == nil && r.period.end == nil) {
77,363✔
420
                return nil
38,643✔
421
        }
38,643✔
422

423
        txRange := &txRange{
77✔
424
                initialTxID: uint64(0),
77✔
425
                finalTxID:   uint64(math.MaxUint64),
77✔
426
        }
77✔
427

77✔
428
        if r.period.start != nil {
131✔
429
                txRange.initialTxID, err = r.period.start.instant.resolve(r.tx, r.params, true, r.period.start.inclusive)
54✔
430
                if err != nil {
72✔
431
                        return err
18✔
432
                }
18✔
433
        }
434

435
        if r.period.end != nil {
94✔
436
                txRange.finalTxID, err = r.period.end.instant.resolve(r.tx, r.params, false, r.period.end.inclusive)
35✔
437
                if err != nil {
38✔
438
                        return err
3✔
439
                }
3✔
440
        }
441

442
        r.txRange = txRange
56✔
443

56✔
444
        return nil
56✔
445
}
446

447
func (r *rawRowReader) Read(ctx context.Context) (*Row, error) {
38,695✔
448
        if err := ctx.Err(); err != nil {
38,695✔
449
                return nil, err
×
450
        }
×
451

452
        //var mkey []byte
453
        var vref store.ValueRef
38,695✔
454

38,695✔
455
        // evaluation of txRange is postponed to allow parameters to be provided after rowReader initialization
38,695✔
456
        err := r.reduceTxRange()
38,695✔
457
        if errors.Is(err, store.ErrTxNotFound) {
38,706✔
458
                return nil, ErrNoMoreRows
11✔
459
        }
11✔
460
        if err != nil {
38,694✔
461
                return nil, err
10✔
462
        }
10✔
463

464
        if r.txRange == nil {
77,275✔
465
                _, vref, err = r.reader.Read(ctx) //mkey
38,601✔
466
        } else {
38,674✔
467
                _, vref, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID) //mkey
73✔
468
        }
73✔
469
        if err != nil {
39,307✔
470
                return nil, err
633✔
471
        }
633✔
472

473
        v, err := vref.Resolve()
38,041✔
474
        if err != nil {
38,041✔
475
                return nil, err
×
476
        }
×
477

478
        valuesByPosition := make([]TypedValue, len(r.colsByPos))
38,041✔
479
        valuesBySelector := make(map[string]TypedValue, len(r.colsBySel))
38,041✔
480

38,041✔
481
        // extraCols is the number of synthetic leading columns (rev, txMetadata).
38,041✔
482
        // Table columns occupy r.colsByPos[extraCols:], corresponding 1-to-1 with
38,041✔
483
        // r.table.cols. Moved up so the pre-fill loop can use it for the skip check.
38,041✔
484
        extraCols := r.scanSpecs.extraCols()
38,041✔
485

38,041✔
486
        for i, col := range r.colsByPos {
245,400✔
487
                // Skip NullValue pre-fill for table columns that the query does not
207,359✔
488
                // need. The nil zero-value is semantically NULL and is never accessed
207,359✔
489
                // for columns excluded by neededColIDs. Extra synthetic columns (rev,
207,359✔
490
                // txMetadata) are always included regardless of neededColIDs.
207,359✔
491
                if r.scanSpecs.neededColIDs != nil {
231,547✔
492
                        if tableIdx := i - extraCols; tableIdx >= 0 && tableIdx < len(r.table.cols) {
48,376✔
493
                                if !r.scanSpecs.neededColIDs[r.table.cols[tableIdx].id] {
32,409✔
494
                                        continue
8,221✔
495
                                }
496
                        }
497
                }
498

499
                var val TypedValue
199,138✔
500

199,138✔
501
                switch col.Column {
199,138✔
502
                case revCol:
20✔
503
                        val = &Integer{val: int64(vref.HC())}
20✔
504
                case txMetadataCol:
13✔
505
                        val, err = r.parseTxMetadata(vref.TxMetadata())
13✔
506
                        if err != nil {
15✔
507
                                return nil, err
2✔
508
                        }
2✔
509
                default:
199,105✔
510
                        val = &NullValue{t: col.Type}
199,105✔
511
                }
512

513
                valuesByPosition[i] = val
199,136✔
514
                valuesBySelector[col.Selector()] = val
199,136✔
515
        }
516

517
        if len(v) < EncLenLen {
38,039✔
518
                return nil, ErrCorruptedData
×
519
        }
×
520

521

522
        voff := 0
38,039✔
523

38,039✔
524
        cols := int(binary.BigEndian.Uint32(v[voff:]))
38,039✔
525
        voff += EncLenLen
38,039✔
526

38,039✔
527
        for i, pos := 0, 0; i < cols; i++ {
244,860✔
528
                if len(v) < EncIDLen {
206,821✔
529
                        return nil, ErrCorruptedData
×
530
                }
×
531

532
                colID := binary.BigEndian.Uint32(v[voff:])
206,821✔
533
                voff += EncIDLen
206,821✔
534

206,821✔
535
                col, err := r.table.GetColumnByID(colID)
206,821✔
536
                if errors.Is(err, ErrColumnDoesNotExist) && colID <= r.table.maxColID {
206,846✔
537
                        // Dropped column, skip it
25✔
538
                        vlen, n, err := DecodeValueLength(v[voff:])
25✔
539
                        if err != nil {
25✔
540
                                return nil, err
×
541
                        }
×
542
                        voff += n + vlen
25✔
543

25✔
544
                        continue
25✔
545
                }
546
                if err != nil {
206,796✔
547
                        return nil, ErrCorruptedData
×
548
                }
×
549

550
                // Projection pushdown: skip columns not needed by the query.
551
                // We still advance voff so the byte stream stays in sync.
552
                // pos advancement is handled by the loop at the decode site below,
553
                // since that same loop also advances pos past any skipped entries.
554
                if r.scanSpecs.neededColIDs != nil && !r.scanSpecs.neededColIDs[colID] {
214,723✔
555
                        vlen, n, err := DecodeValueLength(v[voff:])
7,927✔
556
                        if err != nil {
7,927✔
557
                                return nil, err
×
558
                        }
×
559
                        voff += n + vlen
7,927✔
560
                        continue
7,927✔
561
                }
562

563
                val, n, err := DecodeValue(v[voff:], col.colType)
198,869✔
564
                if err != nil {
198,869✔
565
                        return nil, err
×
566
                }
×
567

568
                voff += n
198,869✔
569

198,869✔
570
                // make sure value is inserted in the correct position
198,869✔
571
                for pos < len(r.table.cols) && r.table.cols[pos].id < colID {
202,356✔
572
                        pos++
3,487✔
573
                }
3,487✔
574

575
                if pos == len(r.table.cols) || r.table.cols[pos].id != colID {
198,869✔
576
                        return nil, ErrCorruptedData
×
577
                }
×
578

579
                valuesByPosition[pos+extraCols] = val
198,869✔
580

198,869✔
581
                pos++
198,869✔
582

198,869✔
583
                valuesBySelector[EncodeSelector("", r.tableAlias, col.colName)] = val
198,869✔
584
        }
585

586
        if len(v)-voff > 0 {
38,039✔
587
                return nil, ErrCorruptedData
×
588
        }
×
589

590
        return &Row{ValuesByPosition: valuesByPosition, ValuesBySelector: valuesBySelector}, nil
38,039✔
591
}
592

593
// CountAll iterates the scan without decoding column values, returning the
594
// number of matching index entries. Used by the COUNT(*) fast-path.
595
func (r *rawRowReader) CountAll(ctx context.Context) (int64, error) {
25✔
596
        if err := ctx.Err(); err != nil {
25✔
597
                return 0, err
×
598
        }
×
599
        if err := r.reduceTxRange(); errors.Is(err, store.ErrTxNotFound) {
25✔
600
                return 0, nil
×
601
        } else if err != nil {
25✔
602
                return 0, err
×
603
        }
×
604
        var n int64
25✔
605
        for {
1,515✔
606
                var err error
1,490✔
607
                if r.txRange == nil {
2,947✔
608
                        _, _, err = r.reader.Read(ctx)
1,457✔
609
                } else {
1,490✔
610
                        _, _, err = r.reader.ReadBetween(ctx, r.txRange.initialTxID, r.txRange.finalTxID)
33✔
611
                }
33✔
612
                if errors.Is(err, store.ErrNoMoreEntries) {
1,515✔
613
                        return n, nil
25✔
614
                }
25✔
615
                if err != nil {
1,465✔
616
                        return 0, err
×
617
                }
×
618
                n++
1,465✔
619
        }
620
}
621

622
func (r *rawRowReader) parseTxMetadata(txmd *store.TxMetadata) (TypedValue, error) {
13✔
623
        if txmd == nil {
13✔
624
                return &NullValue{t: JSONType}, nil
×
625
        }
×
626

627
        if extra := txmd.Extra(); extra != nil {
26✔
628
                if r.tx.engine.parseTxMetadata == nil {
14✔
629
                        return nil, fmt.Errorf("unable to parse tx metadata")
1✔
630
                }
1✔
631

632
                md, err := r.tx.engine.parseTxMetadata(extra)
12✔
633
                if err != nil {
13✔
634
                        return nil, fmt.Errorf("%w: %s", ErrInvalidTxMetadata, err)
1✔
635
                }
1✔
636
                return &JSON{val: md}, nil
11✔
637
        }
638
        return &NullValue{t: JSONType}, nil
×
639
}
640

641
func (r *rawRowReader) Close() error {
1,271✔
642
        if r.onCloseCallback != nil {
1,824✔
643
                defer r.onCloseCallback()
553✔
644
        }
553✔
645

646
        return r.reader.Close()
1,271✔
647
}
648

649
func ReadAllRows(ctx context.Context, reader RowReader) ([]*Row, error) {
127✔
650
        var rows []*Row
127✔
651
        err := ReadRowsBatch(ctx, reader, 100, func(rowBatch []*Row) error {
517✔
652
                if rows == nil {
492✔
653
                        rows = make([]*Row, 0, len(rowBatch))
102✔
654
                }
102✔
655
                rows = append(rows, rowBatch...)
390✔
656
                return nil
390✔
657
        })
658
        return rows, err
127✔
659
}
660

661
func ReadRowsBatch(ctx context.Context, reader RowReader, batchSize int, onBatch func([]*Row) error) error {
364✔
662
        rows := make([]*Row, batchSize)
364✔
663

364✔
664
        hasMoreRows := true
364✔
665
        for hasMoreRows {
1,040✔
666
                n, err := readNRows(ctx, reader, batchSize, rows)
676✔
667

676✔
668
                if n > 0 {
1,307✔
669
                        if err := onBatch(rows[:n]); err != nil {
631✔
670
                                return err
×
671
                        }
×
672
                }
673

674
                hasMoreRows = !errors.Is(err, ErrNoMoreRows)
676✔
675
                if err != nil && hasMoreRows {
700✔
676
                        return err
24✔
677
                }
24✔
678
        }
679
        return nil
340✔
680
}
681

682
func readNRows(ctx context.Context, reader RowReader, n int, outRows []*Row) (int, error) {
676✔
683
        for i := 0; i < n; i++ {
34,090✔
684
                r, err := reader.Read(ctx)
33,414✔
685
                if err != nil {
33,778✔
686
                        return i, err
364✔
687
                }
364✔
688
                outRows[i] = r
33,050✔
689
        }
690
        return n, nil
312✔
691
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc