• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freeeve / tinykvs / 21187239412

20 Jan 2026 09:05PM UTC coverage: 71.335% (+0.2%) from 71.178%
21187239412

push

github

freeeve
refactor(reader): extract helpers to reduce cognitive complexity

Extract helper functions from findTableForPrefix, searchNextBlocks,
and scanPrefixLoop to reduce cognitive complexity below the threshold of 15.

41 of 50 new or added lines in 1 file covered. (82.0%)

242 existing lines in 4 files now uncovered.

5761 of 8076 relevant lines covered (71.33%)

404816.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.79
/cmd/tinykvs/shell_select.go
1
package main
2

3
import (
4
        "encoding/json"
5
        "fmt"
6
        "math"
7
        "os"
8
        "os/signal"
9
        "strconv"
10
        "strings"
11
        "sync/atomic"
12
        "syscall"
13
        "time"
14

15
        "github.com/blastrain/vitess-sqlparser/sqlparser"
16
        "github.com/freeeve/msgpck"
17
        "github.com/freeeve/tinykvs"
18
)
19

20
// aggType represents an aggregation function type.
21
type aggType int
22

23
const (
24
        aggCount aggType = iota
25
        aggSum
26
        aggAvg
27
        aggMin
28
        aggMax
29
)
30

31
// aggregator holds state for a streaming aggregation.
32
type aggregator struct {
33
        typ   aggType
34
        field string // field path for sum/avg/min/max (empty for count)
35
        alias string // display name
36

37
        // streaming state
38
        count    int64
39
        sum      float64
40
        min      float64
41
        max      float64
42
        hasValue bool
43
}
44

45
func (a *aggregator) update(val tinykvs.Value) {
83✔
46
        a.count++
83✔
47

83✔
48
        if a.field == "" {
107✔
49
                return // count() doesn't need field extraction
24✔
50
        }
24✔
51

52
        // Extract numeric value from field
53
        num, ok := a.extractNumeric(val)
59✔
54
        if !ok {
69✔
55
                return
10✔
56
        }
10✔
57

58
        a.sum += num
49✔
59
        if !a.hasValue {
68✔
60
                a.min = num
19✔
61
                a.max = num
19✔
62
                a.hasValue = true
19✔
63
        } else {
49✔
64
                if num < a.min {
40✔
65
                        a.min = num
10✔
66
                }
10✔
67
                if num > a.max {
48✔
68
                        a.max = num
18✔
69
                }
18✔
70
        }
71
}
72

73
func (a *aggregator) extractNumeric(val tinykvs.Value) (float64, bool) {
59✔
74
        // For non-record types, use the value directly
59✔
75
        if a.field == "" {
59✔
76
                switch val.Type {
×
77
                case tinykvs.ValueTypeInt64:
×
78
                        return float64(val.Int64), true
×
79
                case tinykvs.ValueTypeFloat64:
×
80
                        return val.Float64, true
×
81
                default:
×
82
                        return 0, false
×
83
                }
84
        }
85

86
        // For record/msgpack types, extract the field
87
        var record map[string]any
59✔
88
        var err error
59✔
89
        switch val.Type {
59✔
90
        case tinykvs.ValueTypeRecord:
×
91
                record = val.Record
×
92
        case tinykvs.ValueTypeMsgpack:
59✔
93
                record, err = msgpck.UnmarshalMapStringAny(val.Bytes, false)
59✔
94
                if err != nil {
59✔
95
                        return 0, false
×
96
                }
×
97
        default:
×
98
                return 0, false
×
99
        }
100
        if record == nil {
59✔
101
                return 0, false
×
102
        }
×
103

104
        fieldVal, ok := extractNestedField(record, a.field)
59✔
105
        if !ok {
69✔
106
                return 0, false
10✔
107
        }
10✔
108

109
        switch v := fieldVal.(type) {
49✔
110
        case int:
×
111
                return float64(v), true
×
112
        case int8:
×
113
                return float64(v), true
×
114
        case int16:
×
115
                return float64(v), true
×
116
        case int32:
×
117
                return float64(v), true
×
118
        case int64:
11✔
119
                return float64(v), true
11✔
120
        case uint:
×
121
                return float64(v), true
×
122
        case uint8:
×
123
                return float64(v), true
×
124
        case uint16:
×
125
                return float64(v), true
×
126
        case uint32:
×
127
                return float64(v), true
×
128
        case uint64:
×
129
                return float64(v), true
×
130
        case float32:
×
131
                return float64(v), true
×
132
        case float64:
38✔
133
                return v, true
38✔
134
        default:
×
135
                return 0, false
×
136
        }
137
}
138

139
func (a *aggregator) result() string {
32✔
140
        switch a.typ {
32✔
141
        case aggCount:
9✔
142
                return fmt.Sprintf("%d", a.count)
9✔
143
        case aggSum:
8✔
144
                if !a.hasValue {
10✔
145
                        return "NULL"
2✔
146
                }
2✔
147
                return formatNumber(a.sum)
6✔
148
        case aggAvg:
7✔
149
                if a.count == 0 || !a.hasValue {
9✔
150
                        return "NULL"
2✔
151
                }
2✔
152
                return formatNumber(a.sum / float64(a.count))
5✔
153
        case aggMin:
4✔
154
                if !a.hasValue {
4✔
155
                        return "NULL"
×
156
                }
×
157
                return formatNumber(a.min)
4✔
158
        case aggMax:
4✔
159
                if !a.hasValue {
4✔
160
                        return "NULL"
×
161
                }
×
162
                return formatNumber(a.max)
4✔
163
        default:
×
164
                return "NULL"
×
165
        }
166
}
167

168
func formatNumber(f float64) string {
19✔
169
        if f == math.Trunc(f) {
34✔
170
                return fmt.Sprintf("%.0f", f)
15✔
171
        }
15✔
172
        return fmt.Sprintf("%g", f)
4✔
173
}
174

175
// valueFilter represents a filter condition on a value field
176
type valueFilter struct {
177
        field    string // e.g., "ttl", "name.first"
178
        operator string // "=", "like", ">", "<", ">=", "<="
179
        value    string // the comparison value
180
}
181

182
func (vf *valueFilter) matches(val tinykvs.Value) bool {
×
183
        // Extract the record
×
184
        var record map[string]any
×
185
        var err error
×
186
        switch val.Type {
×
187
        case tinykvs.ValueTypeRecord:
×
188
                record = val.Record
×
189
        case tinykvs.ValueTypeMsgpack:
×
190
                record, err = msgpck.UnmarshalMapStringAny(val.Bytes, false)
×
191
                if err != nil {
×
192
                        return false
×
193
                }
×
194
        default:
×
195
                return false
×
196
        }
197

198
        // Get field value
199
        fieldVal, ok := extractNestedField(record, vf.field)
×
200
        if !ok {
×
201
                return false
×
202
        }
×
203

204
        fieldStr := fmt.Sprintf("%v", fieldVal)
×
205

×
206
        switch vf.operator {
×
207
        case "=":
×
208
                return fieldStr == vf.value
×
209
        case "!=", "<>":
×
210
                return fieldStr != vf.value
×
211
        case "like":
×
212
                // Only prefix matching supported
×
213
                if strings.HasSuffix(vf.value, "%") {
×
214
                        prefix := vf.value[:len(vf.value)-1]
×
215
                        return strings.HasPrefix(fieldStr, prefix)
×
216
                }
×
217
                return fieldStr == vf.value
×
218
        case ">":
×
219
                return fieldStr > vf.value
×
220
        case "<":
×
221
                return fieldStr < vf.value
×
222
        case ">=":
×
223
                return fieldStr >= vf.value
×
224
        case "<=":
×
225
                return fieldStr <= vf.value
×
226
        default:
×
227
                return true
×
228
        }
229
}
230

231
// selectContext holds state for a SELECT query execution.
232
type selectContext struct {
233
        keyEquals    string
234
        keyPrefix    string
235
        keyStart     string
236
        keyEnd       string
237
        valueFilters []*valueFilter
238
        limit        int
239
        fields       []string
240
        aggs         []*aggregator
241
        headers      []string
242
        orderBy      []SortOrder
243
        scanned      int64
244
        scanStats    tinykvs.ScanStats
245
        matchCount   int
246
        bufferedRows [][]string
247
        lastProgress time.Time
248
        startTime    time.Time
249
        interrupted  int32
250
        scanErr      error
251
}
252

253
func (s *Shell) handleSelect(stmt *sqlparser.Select, orderBy []SortOrder) {
132✔
254
        ctx := s.parseSelectStatement(stmt, orderBy)
132✔
255

132✔
256
        sigChan := setupInterruptHandler(&ctx.interrupted)
132✔
257
        defer signal.Stop(sigChan)
132✔
258

132✔
259
        progressCallback := ctx.createProgressCallback()
132✔
260
        processRow := ctx.createRowProcessor()
132✔
261
        safeProcessRow := ctx.wrapRowProcessor(processRow)
132✔
262

132✔
263
        err := s.executeScan(ctx, safeProcessRow, progressCallback)
132✔
264

132✔
265
        ctx.clearProgressLine()
132✔
266

132✔
267
        wasInterrupted := atomic.LoadInt32(&ctx.interrupted) != 0
132✔
268
        if wasInterrupted {
132✔
269
                fmt.Fprintf(os.Stderr, "\r%s\r", strings.Repeat(" ", 80))
×
270
                fmt.Println("^C")
×
271
        }
×
272

273
        if ctx.scanErr != nil {
132✔
274
                fmt.Printf("Scan error: %v\n", ctx.scanErr)
×
275
                return
×
276
        }
×
277
        if err != nil {
136✔
278
                fmt.Printf("Error: %v\n", err)
4✔
279
                return
4✔
280
        }
4✔
281

282
        ctx.renderResults()
128✔
283
        ctx.reportStatistics(wasInterrupted)
128✔
284
}
285

286
func (s *Shell) parseSelectStatement(stmt *sqlparser.Select, orderBy []SortOrder) *selectContext {
132✔
287
        ctx := &selectContext{
132✔
288
                limit:     100,
132✔
289
                orderBy:   orderBy,
132✔
290
                startTime: time.Now(),
132✔
291
        }
132✔
292

132✔
293
        ctx.fields, ctx.aggs = parseSelectExpressions(stmt.SelectExprs)
132✔
294

132✔
295
        if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
138✔
296
                if val, ok := stmt.Limit.Rowcount.(*sqlparser.SQLVal); ok {
12✔
297
                        if n, err := strconv.Atoi(string(val.Val)); err == nil {
12✔
298
                                ctx.limit = n
6✔
299
                        }
6✔
300
                }
301
        }
302

303
        if stmt.Where != nil {
231✔
304
                s.parseWhere(stmt.Where.Expr, &ctx.keyEquals, &ctx.keyPrefix, &ctx.keyStart, &ctx.keyEnd, &ctx.valueFilters)
99✔
305
        }
99✔
306

307
        if len(ctx.fields) > 0 {
163✔
308
                ctx.headers = append([]string{"k"}, ctx.fields...)
31✔
309
        } else {
132✔
310
                ctx.headers = []string{"k", "v"}
101✔
311
        }
101✔
312

313
        return ctx
132✔
314
}
315

316
func parseSelectExpressions(exprs sqlparser.SelectExprs) ([]string, []*aggregator) {
132✔
317
        var fields []string
132✔
318
        var aggs []*aggregator
132✔
319

132✔
320
        for _, expr := range exprs {
285✔
321
                switch e := expr.(type) {
153✔
322
                case *sqlparser.AliasedExpr:
71✔
323
                        if agg := tryParseAggregate(e); agg != nil {
100✔
324
                                aggs = append(aggs, agg)
29✔
325
                        } else if field := tryParseColName(e); field != "" {
113✔
326
                                fields = append(fields, field)
42✔
327
                        }
42✔
328
                case *sqlparser.StarExpr:
82✔
329
                        fields = nil
82✔
330
                }
331
        }
332
        return fields, aggs
132✔
333
}
334

335
// tryParseAggregate attempts to parse an aliased expression as an aggregate function.
336
func tryParseAggregate(e *sqlparser.AliasedExpr) *aggregator {
71✔
337
        funcExpr, ok := e.Expr.(*sqlparser.FuncExpr)
71✔
338
        if !ok {
113✔
339
                return nil
42✔
340
        }
42✔
341
        return parseAggregateFunc(funcExpr)
29✔
342
}
343

344
// tryParseColName attempts to parse an aliased expression as a column name.
345
// Returns the field name with proper qualification, or empty string if not a column.
346
func tryParseColName(e *sqlparser.AliasedExpr) string {
42✔
347
        col, ok := e.Expr.(*sqlparser.ColName)
42✔
348
        if !ok {
42✔
UNCOV
349
                return ""
×
UNCOV
350
        }
×
351
        qualifier := strings.ToLower(col.Qualifier.Name.String())
42✔
352
        fieldName := col.Name.String()
42✔
353
        if qualifier == "v" {
76✔
354
                return fieldName
34✔
355
        }
34✔
356
        if qualifier != "" && qualifier != "kv" {
16✔
357
                return qualifier + "." + fieldName
8✔
358
        }
8✔
359
        return ""
×
360
}
361

362
func setupInterruptHandler(interrupted *int32) chan os.Signal {
132✔
363
        sigChan := make(chan os.Signal, 1)
132✔
364
        signal.Notify(sigChan, syscall.SIGINT)
132✔
365
        go func() {
264✔
366
                <-sigChan
132✔
367
                atomic.StoreInt32(interrupted, 1)
132✔
368
        }()
132✔
369
        return sigChan
132✔
370
}
371

372
func (ctx *selectContext) createProgressCallback() tinykvs.ScanProgress {
132✔
373
        return func(stats tinykvs.ScanStats) bool {
132✔
UNCOV
374
                if atomic.LoadInt32(&ctx.interrupted) != 0 {
×
UNCOV
375
                        return false
×
UNCOV
376
                }
×
UNCOV
377
                ctx.scanStats = stats
×
UNCOV
378
                if time.Since(ctx.lastProgress) > time.Second {
×
UNCOV
379
                        elapsed := time.Since(ctx.startTime)
×
380
                        rate := int64(float64(stats.KeysExamined) / elapsed.Seconds())
×
381
                        fmt.Fprintf(os.Stderr, "\rScanned %s keys (%s blocks) in %s (%s keys/sec)...    ",
×
UNCOV
382
                                formatIntCommas(stats.KeysExamined), formatIntCommas(stats.BlocksLoaded),
×
UNCOV
383
                                formatDuration(elapsed), formatIntCommas(rate))
×
UNCOV
384
                        ctx.lastProgress = time.Now()
×
385
                }
×
386
                return true
×
387
        }
388
}
389

390
func (ctx *selectContext) createRowProcessor() func([]byte, tinykvs.Value) bool {
132✔
391
        isAggregate := len(ctx.aggs) > 0
132✔
392
        hasOrderBy := len(ctx.orderBy) > 0
132✔
393

132✔
394
        return func(key []byte, val tinykvs.Value) bool {
359✔
395
                if ctx.isInterrupted() {
227✔
UNCOV
396
                        return false
×
UNCOV
397
                }
×
398
                ctx.scanned++
227✔
399
                ctx.maybeShowFilterProgress()
227✔
400

227✔
401
                if !isAggregate && !hasOrderBy && ctx.matchCount >= ctx.limit {
229✔
402
                        return false
2✔
403
                }
2✔
404
                if !ctx.matchesFilters(val) {
225✔
UNCOV
405
                        return true
×
UNCOV
406
                }
×
407
                ctx.processMatch(key, val, isAggregate)
225✔
408
                return true
225✔
409
        }
410
}
411

412
// isInterrupted checks if query execution was interrupted.
413
func (ctx *selectContext) isInterrupted() bool {
227✔
414
        return atomic.LoadInt32(&ctx.interrupted) != 0
227✔
415
}
227✔
416

417
// maybeShowFilterProgress prints progress if filters are active and enough time passed.
418
func (ctx *selectContext) maybeShowFilterProgress() {
227✔
419
        if len(ctx.valueFilters) > 0 && time.Since(ctx.lastProgress) > time.Second {
227✔
UNCOV
420
                ctx.printFilterProgress()
×
UNCOV
421
        }
×
422
}
423

424
// matchesFilters checks if value passes all value filters.
425
func (ctx *selectContext) matchesFilters(val tinykvs.Value) bool {
225✔
426
        for _, vf := range ctx.valueFilters {
225✔
UNCOV
427
                if !vf.matches(val) {
×
UNCOV
428
                        return false
×
UNCOV
429
                }
×
430
        }
431
        return true
225✔
432
}
433

434
// processMatch handles a matching row by updating aggregates or buffering the row.
435
func (ctx *selectContext) processMatch(key []byte, val tinykvs.Value, isAggregate bool) {
225✔
436
        if isAggregate {
285✔
437
                for _, agg := range ctx.aggs {
143✔
438
                        agg.update(val)
83✔
439
                }
83✔
440
        } else {
165✔
441
                row := extractRowFields(key, val, ctx.fields)
165✔
442
                ctx.bufferedRows = append(ctx.bufferedRows, row)
165✔
443
                ctx.matchCount++
165✔
444
        }
165✔
445
}
446

UNCOV
447
func (ctx *selectContext) printFilterProgress() {
×
UNCOV
448
        elapsed := time.Since(ctx.startTime)
×
UNCOV
449
        rate := int64(float64(ctx.scanned) / elapsed.Seconds())
×
UNCOV
450
        fmt.Fprintf(os.Stderr, "\rScanned %s keys (%s blocks) in %s (%s keys/sec), found %d matches...    ",
×
UNCOV
451
                formatIntCommas(ctx.scanned), formatIntCommas(ctx.scanStats.BlocksLoaded),
×
UNCOV
452
                formatDuration(elapsed), formatIntCommas(rate), ctx.matchCount)
×
UNCOV
453
        ctx.lastProgress = time.Now()
×
UNCOV
454
}
×
455

456
func (ctx *selectContext) wrapRowProcessor(processRow func([]byte, tinykvs.Value) bool) func([]byte, tinykvs.Value) bool {
132✔
457
        return func(key []byte, val tinykvs.Value) bool {
359✔
458
                defer func() {
454✔
459
                        if r := recover(); r != nil {
227✔
UNCOV
460
                                ctx.scanErr = fmt.Errorf("panic processing key %x: %v", key[:min(8, len(key))], r)
×
UNCOV
461
                        }
×
462
                }()
463
                return processRow(key, val)
227✔
464
        }
465
}
466

467
func (s *Shell) executeScan(ctx *selectContext, processRow func([]byte, tinykvs.Value) bool, progress tinykvs.ScanProgress) error {
132✔
468
        if ctx.keyEquals != "" {
206✔
469
                return s.executePointLookup(ctx, processRow)
74✔
470
        }
74✔
471
        if ctx.keyPrefix != "" {
73✔
472
                var err error
15✔
473
                ctx.scanStats, err = s.store.ScanPrefixWithStats([]byte(ctx.keyPrefix), processRow, progress)
15✔
474
                return err
15✔
475
        }
15✔
476
        if ctx.keyStart != "" && ctx.keyEnd != "" {
48✔
477
                return s.store.ScanRange([]byte(ctx.keyStart), []byte(ctx.keyEnd), processRow)
5✔
478
        }
5✔
479
        var err error
38✔
480
        ctx.scanStats, err = s.store.ScanPrefixWithStats(nil, processRow, progress)
38✔
481
        return err
38✔
482
}
483

484
func (s *Shell) executePointLookup(ctx *selectContext, processRow func([]byte, tinykvs.Value) bool) error {
74✔
485
        val, err := s.store.Get([]byte(ctx.keyEquals))
74✔
486
        if err == tinykvs.ErrKeyNotFound {
89✔
487
                if len(ctx.aggs) > 0 {
16✔
488
                        printAggregateResults(ctx.aggs)
1✔
489
                } else {
15✔
490
                        printTable(ctx.headers, nil)
14✔
491
                        fmt.Printf("(0 rows)\n")
14✔
492
                }
14✔
493
                return nil
15✔
494
        }
495
        if err != nil {
60✔
496
                return err
1✔
497
        }
1✔
498
        processRow([]byte(ctx.keyEquals), val)
58✔
499
        return nil
58✔
500
}
501

502
func (ctx *selectContext) clearProgressLine() {
132✔
503
        hasValueFilters := len(ctx.valueFilters) > 0
132✔
504
        if (hasValueFilters || ctx.scanned > 10000) && ctx.scanned > 0 {
132✔
505
                fmt.Fprintf(os.Stderr, "\r%s\r", strings.Repeat(" ", 80))
×
UNCOV
506
        }
×
507
}
508

509
func (ctx *selectContext) renderResults() {
128✔
510
        if len(ctx.aggs) > 0 {
147✔
511
                printAggregateResults(ctx.aggs)
19✔
512
                return
19✔
513
        }
19✔
514

515
        if len(ctx.orderBy) > 0 {
119✔
516
                SortRows(ctx.headers, ctx.bufferedRows, ctx.orderBy)
10✔
517
        }
10✔
518

519
        if len(ctx.bufferedRows) > ctx.limit {
109✔
UNCOV
520
                ctx.bufferedRows = ctx.bufferedRows[:ctx.limit]
×
521
        }
×
522
        ctx.matchCount = len(ctx.bufferedRows)
109✔
523

109✔
524
        printTable(ctx.headers, ctx.bufferedRows)
109✔
525
}
526

527
func (ctx *selectContext) reportStatistics(wasInterrupted bool) {
128✔
528
        if ctx.scanned == 0 && ctx.scanStats.BlocksLoaded == 0 {
150✔
529
                fmt.Printf("(%d rows)\n", ctx.matchCount)
22✔
530
                return
22✔
531
        }
22✔
532

533
        elapsed := time.Since(ctx.startTime)
106✔
534
        rate := float64(ctx.scanned) / elapsed.Seconds()
106✔
535

106✔
536
        if wasInterrupted {
106✔
UNCOV
537
                fmt.Printf("(%d rows) - interrupted, scanned %s keys (%s blocks) in %s (%s keys/sec)\n",
×
UNCOV
538
                        ctx.matchCount, formatIntCommas(ctx.scanned),
×
UNCOV
539
                        formatIntCommas(ctx.scanStats.BlocksLoaded), formatDuration(elapsed), formatIntCommas(int64(rate)))
×
540
                return
×
541
        }
×
542

543
        blockDetails := fmt.Sprintf("%d blocks", ctx.scanStats.BlocksLoaded)
106✔
544
        if ctx.scanStats.BlocksCacheHit > 0 || ctx.scanStats.BlocksDiskRead > 0 {
107✔
545
                blockDetails = fmt.Sprintf("%d blocks (%d cache, %d disk)",
1✔
546
                        ctx.scanStats.BlocksLoaded, ctx.scanStats.BlocksCacheHit, ctx.scanStats.BlocksDiskRead)
1✔
547
        }
1✔
548
        tableDetails := ""
106✔
549
        if ctx.scanStats.TablesChecked > 0 {
107✔
550
                tableDetails = fmt.Sprintf(", %d/%d tables", ctx.scanStats.TablesAdded, ctx.scanStats.TablesChecked)
1✔
551
        }
1✔
552
        fmt.Printf("(%d rows) scanned %s keys, %s%s, %s\n",
106✔
553
                ctx.matchCount, formatIntCommas(ctx.scanned), blockDetails, tableDetails, formatDuration(elapsed))
106✔
554
}
555

556
// printStreamingHeader prints column headers for streaming output
UNCOV
557
func printStreamingHeader(headers []string) {
×
558
        for i, h := range headers {
×
559
                if i > 0 {
×
UNCOV
560
                        fmt.Print("\t")
×
UNCOV
561
                }
×
UNCOV
562
                fmt.Print(h)
×
563
        }
UNCOV
564
        fmt.Println()
×
UNCOV
565
        // Print separator
×
UNCOV
566
        for i, h := range headers {
×
UNCOV
567
                if i > 0 {
×
UNCOV
568
                        fmt.Print("\t")
×
UNCOV
569
                }
×
UNCOV
570
                fmt.Print(strings.Repeat("-", len(h)))
×
571
        }
UNCOV
572
        fmt.Println()
×
573
}
574

575
// printStreamingRow prints a single row for streaming output
UNCOV
576
func printStreamingRow(row []string) {
×
UNCOV
577
        for i, cell := range row {
×
UNCOV
578
                if i > 0 {
×
UNCOV
579
                        fmt.Print("\t")
×
UNCOV
580
                }
×
581
                // Truncate long values
UNCOV
582
                if len(cell) > 60 {
×
UNCOV
583
                        fmt.Print(cell[:57] + "...")
×
UNCOV
584
                } else {
×
UNCOV
585
                        fmt.Print(cell)
×
UNCOV
586
                }
×
587
        }
UNCOV
588
        fmt.Println()
×
589
}
590

591
// printTable prints rows in DuckDB-style box format
592
func printTable(headers []string, rows [][]string) {
123✔
593
        if len(headers) == 0 {
123✔
UNCOV
594
                return
×
UNCOV
595
        }
×
596

597
        widths := calculateColumnWidths(headers, rows)
123✔
598

123✔
599
        printBoxLine(widths, "┌", "┬", "┐")
123✔
600
        printTableRow(headers, widths)
123✔
601
        printBoxLine(widths, "├", "┼", "┤")
123✔
602
        for _, row := range rows {
288✔
603
                printTableRow(row, widths)
165✔
604
        }
165✔
605
        printBoxLine(widths, "└", "┴", "┘")
123✔
606
}
607

608
// calculateColumnWidths computes column widths based on headers and data rows.
609
func calculateColumnWidths(headers []string, rows [][]string) []int {
123✔
610
        widths := make([]int, len(headers))
123✔
611
        for i, h := range headers {
380✔
612
                widths[i] = len(h)
257✔
613
        }
257✔
614
        for _, row := range rows {
288✔
615
                updateRowWidths(widths, row)
165✔
616
        }
165✔
617
        capWidths(widths, 50)
123✔
618
        return widths
123✔
619
}
620

621
// updateRowWidths updates widths based on a single row's cell lengths.
622
func updateRowWidths(widths []int, row []string) {
165✔
623
        for i, cell := range row {
512✔
624
                if i < len(widths) && len(cell) > widths[i] {
495✔
625
                        widths[i] = len(cell)
148✔
626
                }
148✔
627
        }
628
}
629

630
// capWidths limits all widths to the specified maximum.
631
func capWidths(widths []int, maxWidth int) {
123✔
632
        for i := range widths {
380✔
633
                if widths[i] > maxWidth {
259✔
634
                        widths[i] = maxWidth
2✔
635
                }
2✔
636
        }
637
}
638

639
// printTableRow prints a single table row with proper formatting.
640
func printTableRow(cells []string, widths []int) {
288✔
641
        fmt.Print("│")
288✔
642
        for i, w := range widths {
892✔
643
                cell := ""
604✔
644
                if i < len(cells) {
1,208✔
645
                        cell = cells[i]
604✔
646
                }
604✔
647
                fmt.Printf(" %-*s │", w, truncate(cell, w))
604✔
648
        }
649
        fmt.Println()
288✔
650
}
651

652
func printBoxLine(widths []int, left, mid, right string) {
369✔
653
        fmt.Print(left)
369✔
654
        for i, w := range widths {
1,140✔
655
                fmt.Print(strings.Repeat("─", w+2))
771✔
656
                if i < len(widths)-1 {
1,173✔
657
                        fmt.Print(mid)
402✔
658
                }
402✔
659
        }
660
        fmt.Println(right)
369✔
661
}
662

663
func truncate(s string, maxLen int) string {
604✔
664
        if len(s) <= maxLen {
1,206✔
665
                return s
602✔
666
        }
602✔
667
        if maxLen <= 3 {
2✔
UNCOV
668
                return s[:maxLen]
×
UNCOV
669
        }
×
670
        return s[:maxLen-3] + "..."
2✔
671
}
672

673
func parseAggregateFunc(funcExpr *sqlparser.FuncExpr) *aggregator {
29✔
674
        funcName := strings.ToLower(funcExpr.Name.String())
29✔
675

29✔
676
        var typ aggType
29✔
677
        switch funcName {
29✔
678
        case "count":
8✔
679
                typ = aggCount
8✔
680
        case "sum":
7✔
681
                typ = aggSum
7✔
682
        case "avg":
6✔
683
                typ = aggAvg
6✔
684
        case "min":
4✔
685
                typ = aggMin
4✔
686
        case "max":
4✔
687
                typ = aggMax
4✔
UNCOV
688
        default:
×
UNCOV
689
                return nil
×
690
        }
691

692
        agg := &aggregator{typ: typ}
29✔
693

29✔
694
        // Extract field argument for sum/avg/min/max
29✔
695
        if len(funcExpr.Exprs) > 0 {
50✔
696
                if aliased, ok := funcExpr.Exprs[0].(*sqlparser.AliasedExpr); ok {
42✔
697
                        if col, ok := aliased.Expr.(*sqlparser.ColName); ok {
42✔
698
                                qualifier := strings.ToLower(col.Qualifier.Name.String())
21✔
699
                                fieldName := col.Name.String()
21✔
700

21✔
701
                                if qualifier == "v" {
40✔
702
                                        agg.field = fieldName
19✔
703
                                } else if qualifier != "" && qualifier != "kv" {
23✔
704
                                        agg.field = qualifier + "." + fieldName
2✔
705
                                }
2✔
706
                        }
707
                }
708
        }
709

710
        // Build alias for display
711
        if agg.field != "" {
50✔
712
                agg.alias = fmt.Sprintf("%s(v.%s)", funcName, agg.field)
21✔
713
        } else {
29✔
714
                agg.alias = fmt.Sprintf("%s()", funcName)
8✔
715
        }
8✔
716

717
        return agg
29✔
718
}
719

720
func printAggregateResults(aggs []*aggregator) {
20✔
721
        // Print header
20✔
722
        headers := make([]string, len(aggs))
20✔
723
        for i, agg := range aggs {
52✔
724
                headers[i] = agg.alias
32✔
725
        }
32✔
726
        fmt.Println(strings.Join(headers, " | "))
20✔
727

20✔
728
        // Print separator
20✔
729
        seps := make([]string, len(aggs))
20✔
730
        for i, agg := range aggs {
52✔
731
                seps[i] = strings.Repeat("-", len(agg.alias))
32✔
732
        }
32✔
733
        fmt.Println(strings.Join(seps, "-+-"))
20✔
734

20✔
735
        // Print values
20✔
736
        values := make([]string, len(aggs))
20✔
737
        for i, agg := range aggs {
52✔
738
                result := agg.result()
32✔
739
                // Pad to match header width
32✔
740
                if len(result) < len(agg.alias) {
61✔
741
                        result = strings.Repeat(" ", len(agg.alias)-len(result)) + result
29✔
742
                }
29✔
743
                values[i] = result
32✔
744
        }
745
        fmt.Println(strings.Join(values, " | "))
20✔
746
}
747

748
// extractRowFields extracts field values as strings for tabular display
749
func extractRowFields(key []byte, val tinykvs.Value, fields []string) []string {
166✔
750
        keyStr := formatKey(key)
166✔
751

166✔
752
        if len(fields) == 0 {
292✔
753
                // SELECT * - return key and full value
126✔
754
                return []string{keyStr, formatValue(val)}
126✔
755
        }
126✔
756

757
        // Extract specific fields
758
        row := []string{keyStr}
40✔
759

40✔
760
        var record map[string]any
40✔
761
        var err error
40✔
762
        switch val.Type {
40✔
763
        case tinykvs.ValueTypeRecord:
2✔
764
                record = val.Record
2✔
765
        case tinykvs.ValueTypeMsgpack:
36✔
766
                record, err = msgpck.UnmarshalMapStringAny(val.Bytes, false)
36✔
767
                if err != nil {
36✔
768
                        // Can't decode, return NULLs for all fields
×
769
                        for range fields {
×
UNCOV
770
                                row = append(row, "NULL")
×
UNCOV
771
                        }
×
UNCOV
772
                        return row
×
773
                }
774
        default:
2✔
775
                // Non-record type, return NULLs for all fields
2✔
776
                for range fields {
4✔
777
                        row = append(row, "NULL")
2✔
778
                }
2✔
779
                return row
2✔
780
        }
781

782
        for _, field := range fields {
93✔
783
                if v, ok := extractNestedField(record, field); ok {
99✔
784
                        row = append(row, fmt.Sprintf("%v", v))
44✔
785
                } else {
55✔
786
                        row = append(row, "NULL")
11✔
787
                }
11✔
788
        }
789
        return row
38✔
790
}
791

792
func formatValue(val tinykvs.Value) string {
126✔
793
        switch val.Type {
126✔
UNCOV
794
        case tinykvs.ValueTypeInt64:
×
UNCOV
795
                return fmt.Sprintf("%d", val.Int64)
×
796
        case tinykvs.ValueTypeFloat64:
1✔
797
                return fmt.Sprintf("%f", val.Float64)
1✔
798
        case tinykvs.ValueTypeBool:
1✔
799
                return fmt.Sprintf("%t", val.Bool)
1✔
800
        case tinykvs.ValueTypeString, tinykvs.ValueTypeBytes:
83✔
801
                if len(val.Bytes) > 100 {
84✔
802
                        return string(val.Bytes[:100]) + "..."
1✔
803
                }
1✔
804
                return string(val.Bytes)
82✔
805
        case tinykvs.ValueTypeRecord:
1✔
806
                if val.Record == nil {
2✔
807
                        return "{}"
1✔
808
                }
1✔
UNCOV
809
                jsonBytes, _ := json.Marshal(val.Record)
×
UNCOV
810
                return string(jsonBytes)
×
811
        case tinykvs.ValueTypeMsgpack:
39✔
812
                record, err := msgpck.UnmarshalMapStringAny(val.Bytes, false)
39✔
813
                if err != nil {
39✔
UNCOV
814
                        return "(msgpack)"
×
UNCOV
815
                }
×
816
                jsonBytes, _ := json.Marshal(record)
39✔
817
                return string(jsonBytes)
39✔
818
        default:
1✔
819
                return "(unknown)"
1✔
820
        }
821
}
822

823
func (s *Shell) parseWhere(expr sqlparser.Expr, keyEquals, keyPrefix, keyStart, keyEnd *string, valueFilters *[]*valueFilter) {
120✔
824
        switch e := expr.(type) {
120✔
825
        case *sqlparser.ComparisonExpr:
110✔
826
                s.parseComparisonExpr(e, keyEquals, keyPrefix, keyStart, keyEnd, valueFilters)
110✔
827
        case *sqlparser.RangeCond:
7✔
828
                parseRangeCond(e, keyStart, keyEnd)
7✔
829
        case *sqlparser.AndExpr:
2✔
830
                s.parseWhere(e.Left, keyEquals, keyPrefix, keyStart, keyEnd, valueFilters)
2✔
831
                s.parseWhere(e.Right, keyEquals, keyPrefix, keyStart, keyEnd, valueFilters)
2✔
832
        }
833
}
834

835
// parseComparisonExpr handles a WHERE comparison expression.
836
func (s *Shell) parseComparisonExpr(e *sqlparser.ComparisonExpr, keyEquals, keyPrefix, keyStart, keyEnd *string, valueFilters *[]*valueFilter) {
110✔
837
        col, ok := e.Left.(*sqlparser.ColName)
110✔
838
        if !ok {
110✔
UNCOV
839
                return
×
UNCOV
840
        }
×
841

842
        qualifier := strings.ToLower(col.Qualifier.Name.String())
110✔
843
        colName := strings.ToLower(col.Name.String())
110✔
844

110✔
845
        if isKeyColumn(colName, qualifier) {
220✔
846
                parseKeyFilter(e, keyEquals, keyPrefix, keyStart, keyEnd)
110✔
847
        } else {
110✔
848
                parseValueFieldFilter(e, qualifier, colName, valueFilters)
×
849
        }
×
850
}
851

852
// isKeyColumn returns true if the column represents the key (k).
853
func isKeyColumn(colName, qualifier string) bool {
110✔
854
        return (colName == "k" && qualifier == "") || (colName == "k" && qualifier == "kv")
110✔
855
}
110✔
856

857
// parseKeyFilter handles key-based WHERE conditions.
858
func parseKeyFilter(e *sqlparser.ComparisonExpr, keyEquals, keyPrefix, keyStart, keyEnd *string) {
110✔
859
        val, isHexPrefix := extractValueForLike(e.Right, e.Operator)
110✔
860
        switch e.Operator {
110✔
861
        case "=":
83✔
862
                *keyEquals = val
83✔
863
        case "like":
20✔
864
                parseKeyLikeFilter(val, isHexPrefix, keyPrefix)
20✔
865
        case ">=":
4✔
866
                *keyStart = val
4✔
867
        case "<=":
3✔
868
                *keyEnd = val
3✔
869
        }
870
}
871

872
// parseKeyLikeFilter handles LIKE conditions on keys.
873
func parseKeyLikeFilter(val string, isHexPrefix bool, keyPrefix *string) {
20✔
874
        if isHexPrefix {
20✔
UNCOV
875
                *keyPrefix = val
×
UNCOV
876
                return
×
UNCOV
877
        }
×
878
        if strings.HasSuffix(val, "%") && !strings.Contains(val[:len(val)-1], "%") {
39✔
879
                *keyPrefix = val[:len(val)-1]
19✔
880
                return
19✔
881
        }
19✔
882
        fmt.Println("Warning: LIKE only supports prefix matching (e.g., 'prefix%' or x'14%')")
1✔
883
}
884

885
// parseValueFieldFilter handles value field WHERE conditions.
UNCOV
886
func parseValueFieldFilter(e *sqlparser.ComparisonExpr, qualifier, colName string, valueFilters *[]*valueFilter) {
×
UNCOV
887
        fieldName := resolveFieldName(qualifier, colName)
×
UNCOV
888
        val := extractValue(e.Right)
×
UNCOV
889
        *valueFilters = append(*valueFilters, &valueFilter{
×
UNCOV
890
                field:    fieldName,
×
UNCOV
891
                operator: e.Operator,
×
UNCOV
892
                value:    val,
×
UNCOV
893
        })
×
UNCOV
894
}
×
895

896
// resolveFieldName determines the field name from qualifier and column name.
UNCOV
897
func resolveFieldName(qualifier, colName string) string {
×
UNCOV
898
        if qualifier == "v" {
×
UNCOV
899
                return colName
×
UNCOV
900
        }
×
UNCOV
901
        if qualifier != "" && qualifier != "kv" {
×
UNCOV
902
                return qualifier + "." + colName
×
UNCOV
903
        }
×
UNCOV
904
        return colName
×
905
}
906

907
// parseRangeCond handles BETWEEN conditions.
908
func parseRangeCond(e *sqlparser.RangeCond, keyStart, keyEnd *string) {
7✔
909
        col, ok := e.Left.(*sqlparser.ColName)
7✔
910
        if !ok {
7✔
UNCOV
911
                return
×
UNCOV
912
        }
×
913
        if strings.ToLower(col.Name.String()) == "k" {
14✔
914
                *keyStart = extractValue(e.From)
7✔
915
                *keyEnd = extractValue(e.To)
7✔
916
        }
7✔
917
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc