• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 25360351619

04 May 2026 04:37PM UTC coverage: 84.989% (-0.3%) from 85.255%
25360351619

Pull #2094

gh-ci

vchaindz
perf: hot-path allocation and lock-contention fixes

Seven independent improvements driven from a manual review of
allocations and lock-holding patterns in the read/write hot paths.
Each fix is self-contained and on-disk format-neutral.

P1 — sync() vLog lock convoy
embedded/store/immustore.go (sync)

  The for-range over s.vLogs called defer s.releaseVLog(i+1) inside
  the loop. Go stacks defers until the enclosing function returns,
  so all per-vLog locks were held for the full duration of every
  vLog.Flush + vLog.Sync of every other vLog, blocking concurrent
  readers on vLogsCond for the entire sync window. Replaced the
  defer with a per-iteration IIFE so each vLog drops its lock as
  soon as its own flush+sync completes. No reordering of the actual
  flush/sync calls; only the lock-release timing changes.

P2 — zero-copy snapshot reads
embedded/tbtree/{tbtree,snapshot}.go

  TBtree.Get and Snapshot.Get always finish with `return cp(v), …`
  where cp performs make+copy of the value bytes — even on cache
  hits served entirely from already-immutable in-memory leafValue
  slices. The defensive copy exists only to protect against caller
  mutation. Added GetReadonly companions next to each (additive
  API, no behaviour change for existing callers): same body minus
  the cp, with a doc-comment that the returned slice is borrowed
  and must not be mutated. Migrating callers is intentionally a
  separate audit pass — every callsite needs a hand-check before
  switching.

P4 — per-tx scratch buffer in commit loops
embedded/store/immustore.go (commitDurable, sync)

  Both commit loops allocated `cb := make([]byte, s.cLogEntrySize)`
  inside the loop body, once per transaction in a batch. cLog.Append
  ultimately routes to AppendableFile.write, which copies the input
  into its own writeBuffer (singleapp/single_app.go:477), so the
  input slice is not retained — safe to hoist a single 12- or
  44-byte buffer outside the loop and reuse it across iteration... (continued)
Pull Request #2094: perf(s3): wave 1-4 remote-storage performance + sql index-only COUNT + hot-path fixes

601 of 855 new or added lines in 16 files covered. (70.29%)

8 existing lines in 3 files now uncovered.

45131 of 53102 relevant lines covered (84.99%)

126741.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.12
/embedded/sql/catalog.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "bytes"
21
        "context"
22
        "encoding/binary"
23
        "encoding/json"
24
        "errors"
25
        "fmt"
26
        "math"
27
        "strings"
28
        "time"
29

30
        "github.com/codenotary/immudb/embedded/store"
31
        "github.com/google/uuid"
32
)
33

34
// Catalog represents a database catalog containing metadata for all tables in the database.
type Catalog struct {
	enginePrefix []byte // key-space prefix the owning engine prepends to catalog entries

	tables       []*Table          // tables in creation order
	tablesByID   map[uint32]*Table // id -> table lookup
	tablesByName map[string]*Table // name -> table lookup (kept in sync on rename)

	maxTableID uint32 // The maxTableID variable is used to assign unique ids to new tables as they are created.
}
44

45
// Constraint is a marker for table-constraint kinds (primary key,
// foreign key, ...); it carries no methods.
type Constraint interface{}

// PrimaryKeyConstraint lists the column names composing the primary key.
type PrimaryKeyConstraint []string
48

49
// ForeignKeyConstraint is parsed but not enforced — stored for ORM migration compatibility
type ForeignKeyConstraint struct {
	cols     []string // referencing columns in this table
	refTable string   // referenced table name
	refCols  []string // referenced columns in refTable
}
55

56
// CheckConstraint is a named CHECK constraint with the boolean expression
// rows must satisfy.
type CheckConstraint struct {
	id   uint32   // unique id within the owning table
	name string   // key into Table.checkConstraints
	exp  ValueExp // boolean expression to evaluate per row
}
61

62
// Table holds the in-memory schema of a single table: columns, indexes,
// constraints and id bookkeeping.
type Table struct {
	catalog          *Catalog
	id               uint32
	name             string
	cols             []*Column // declaration order; dropped columns removed
	colsByID         map[uint32]*Column
	colsByName       map[string]*Column
	indexes          []*Index
	indexesByName    map[string]*Index
	indexesByColID   map[uint32][]*Index // column id -> indexes covering that column
	checkConstraints map[string]CheckConstraint
	primaryIndex     *Index
	autoIncrementPK  bool  // true when the PK is a single auto-increment column
	maxPK            int64 // NOTE(review): presumably the highest PK value handed out — confirm against writers

	maxColID   uint32 // highest column id ever assigned; ids are not reused
	maxIndexID uint32 // next index id to assign (incremented after each newIndex)

	// systemScan is non-nil only for tables installed via
	// RegisterSystemTable. When set, SELECTs against this table bypass
	// storage and iterate the rows returned by Scan. See system_tables.go.
	systemScan func(ctx context.Context, tx *SQLTx) ([]*Row, error)
}
85

86
// Index describes a (possibly unique, possibly partial) index over one or
// more columns of a table. The index whose id equals PKIndexID is the
// table's primary key.
type Index struct {
	table     *Table
	id        uint32
	unique    bool
	cols      []*Column          // indexed columns, in index order
	colsByID  map[uint32]*Column // fast membership test by column id
	predicate ValueExp           // WHERE clause for partial indexes (nil = full index)
}
94

95
// Column describes a single table column and its type metadata.
type Column struct {
	table         *Table
	id            uint32
	colName       string
	colType       SQLValueType
	maxLen        int  // declared max length; fixed-size types are overridden by MaxLen()
	autoIncrement bool // only allowed on INTEGER columns (enforced at creation)
	notNull       bool
	defaultValue  ValueExp // nil when the column has no DEFAULT expression
}
105

106
func newCatalog(enginePrefix []byte) *Catalog {
5,461✔
107
        ctlg := &Catalog{
5,461✔
108
                enginePrefix: enginePrefix,
5,461✔
109
                tablesByID:   make(map[uint32]*Table),
5,461✔
110
                tablesByName: make(map[string]*Table),
5,461✔
111
        }
5,461✔
112

5,461✔
113
        // Install every registered system table (pg_type today; more in
5,461✔
114
        // future pg-compat phases). Each is a virtual catalog entry with
5,461✔
115
        // Go-provided row source — no storage backing.
5,461✔
116
        for _, def := range registeredSystemTables() {
29,515✔
117
                installSystemTable(ctlg, def)
24,054✔
118
        }
24,054✔
119

120
        return ctlg
5,461✔
121
}
122

123
// init registers the built-in system tables at package load time.
func init() {
	// pg_type: the one system table immudb has shipped with for years.
	// Schema stays at the historic three-column shape (oid, typbasetype,
	// typname) — every existing test that hits pg_type assumes it —
	// but the Scan now returns the canonical PG type catalog so psql
	// \dT and Rails' type map work without the PG wire layer having to
	// fabricate rows separately. The list is intentionally small: just
	// the types immudb actually emits. Callers that want a composite
	// row per user table walk pg_class instead.
	RegisterSystemTable(&SystemTableDef{
		Name: "pg_type",
		Columns: []SystemTableColumn{
			{Name: "oid", Type: VarcharType, MaxLen: 10},
			{Name: "typbasetype", Type: VarcharType, MaxLen: 10},
			{Name: "typname", Type: VarcharType, MaxLen: 50},
		},
		PKColumn: "oid",
		// Scan materializes one row per pgTypeBaseRows entry;
		// typbasetype is always "0" since every entry is a base type.
		Scan: func(ctx context.Context, tx *SQLTx) ([]*Row, error) {
			rows := make([]*Row, 0, len(pgTypeBaseRows))
			for _, r := range pgTypeBaseRows {
				rows = append(rows, &Row{ValuesByPosition: []TypedValue{
					&Varchar{val: r.oid},
					&Varchar{val: "0"},
					&Varchar{val: r.name},
				}})
			}
			return rows, nil
		},
	})
}
153

154
// pgTypeBaseRows lists the PostgreSQL base types we advertise in
// pg_type. OIDs match real PG so clients that cache them (Rails type
// map) round-trip correctly. Kept here rather than in pkg/pgsql/sys to
// avoid a dependency inversion — embedded/sql can't import pkg/pgsql.
var pgTypeBaseRows = []struct {
	oid  string
	name string
}{
	{"16", "bool"},
	{"17", "bytea"},
	{"18", "char"},
	{"19", "name"},
	{"20", "int8"},
	{"21", "int2"},
	{"23", "int4"},
	{"25", "text"},
	{"26", "oid"},
	{"114", "json"},
	{"142", "xml"},
	{"700", "float4"},
	{"701", "float8"},
	{"790", "money"},
	{"829", "macaddr"},
	{"869", "inet"},
	{"1042", "bpchar"},
	{"1043", "varchar"},
	{"1082", "date"},
	{"1083", "time"},
	{"1114", "timestamp"},
	{"1184", "timestamptz"},
	{"1186", "interval"},
	{"1700", "numeric"},
	{"2950", "uuid"},
	{"3802", "jsonb"},
}
189

190
func (catlg *Catalog) ExistTable(table string) bool {
1,450✔
191
        _, exists := catlg.tablesByName[table]
1,450✔
192
        return exists
1,450✔
193
}
1,450✔
194

195
func (catlg *Catalog) GetTables() []*Table {
14,135✔
196
        ts := make([]*Table, 0, len(catlg.tables))
14,135✔
197

14,135✔
198
        ts = append(ts, catlg.tables...)
14,135✔
199

14,135✔
200
        return ts
14,135✔
201
}
14,135✔
202

203
func (catlg *Catalog) GetTableByName(name string) (*Table, error) {
9,717✔
204
        table, exists := catlg.tablesByName[name]
9,717✔
205
        if !exists {
10,129✔
206
                return nil, fmt.Errorf("%w (%s)", ErrTableDoesNotExist, name)
412✔
207
        }
412✔
208
        return table, nil
9,305✔
209
}
210

211
func (catlg *Catalog) GetTableByID(id uint32) (*Table, error) {
1,316✔
212
        table, exists := catlg.tablesByID[id]
1,316✔
213
        if !exists {
2,630✔
214
                return nil, ErrTableDoesNotExist
1,314✔
215
        }
1,314✔
216
        return table, nil
2✔
217
}
218

219
// ID returns the table's unique, never-reused identifier.
func (t *Table) ID() uint32 {
	return t.id
}
175✔
222

223
func (t *Table) Cols() []*Column {
829✔
224
        cs := make([]*Column, 0, len(t.cols))
829✔
225

829✔
226
        cs = append(cs, t.cols...)
829✔
227

829✔
228
        return cs
829✔
229
}
829✔
230

231
func (t *Table) ColsByName() map[string]*Column {
50✔
232
        cs := make(map[string]*Column, len(t.cols))
50✔
233

50✔
234
        for _, c := range t.cols {
201✔
235
                cs[c.colName] = c
151✔
236
        }
151✔
237

238
        return cs
50✔
239
}
240

241
// Name returns the table's current name (may change via renameTable).
func (t *Table) Name() string {
	return t.name
}
1,215✔
244

245
// PrimaryIndex returns the table's primary key index, set when the index
// with id PKIndexID is created.
func (t *Table) PrimaryIndex() *Index {
	return t.primaryIndex
}
501✔
248

249
func (t *Table) IsIndexed(colName string) (indexed bool, err error) {
36✔
250
        col, err := t.GetColumnByName(colName)
36✔
251
        if err != nil {
37✔
252
                return false, err
1✔
253
        }
1✔
254
        return len(t.indexesByColID[col.id]) > 0, nil
35✔
255
}
256

257
func (t *Table) GetColumnByName(name string) (*Column, error) {
21,874✔
258
        col, exists := t.colsByName[name]
21,874✔
259
        if !exists {
21,930✔
260
                return nil, fmt.Errorf("%w (%s)", ErrColumnDoesNotExist, name)
56✔
261
        }
56✔
262
        return col, nil
21,818✔
263
}
264

265
func (t *Table) GetColumnByID(id uint32) (*Column, error) {
303,568✔
266
        col, exists := t.colsByID[id]
303,568✔
267
        if !exists {
303,675✔
268
                return nil, ErrColumnDoesNotExist
107✔
269
        }
107✔
270
        return col, nil
303,461✔
271
}
272

273
// ColumnsByID returns the table's internal id->column map. Unlike Cols(),
// the map is NOT copied — callers must treat it as read-only.
func (t *Table) ColumnsByID() map[uint32]*Column {
	return t.colsByID
}
10✔
276

277
func (t *Table) GetIndexes() []*Index {
122✔
278
        idxs := make([]*Index, 0, len(t.indexes))
122✔
279

122✔
280
        idxs = append(idxs, t.indexes...)
122✔
281

122✔
282
        return idxs
122✔
283
}
122✔
284

285
func (t *Table) GetIndexesByColID(colID uint32) []*Index {
14✔
286
        idxs := make([]*Index, 0, len(t.indexes))
14✔
287

14✔
288
        idxs = append(idxs, t.indexesByColID[colID]...)
14✔
289

14✔
290
        return idxs
14✔
291
}
14✔
292

293
// GetMaxColID returns the highest column id assigned in this table;
// dropped columns leave gaps but ids are never reused.
func (t *Table) GetMaxColID() uint32 {
	return t.maxColID
}
50✔
296

297
// IsPrimary reports whether this is the primary key index (id == PKIndexID).
func (i *Index) IsPrimary() bool {
	return i.id == PKIndexID
}
18,948✔
300

301
// IsUnique reports whether the index enforces uniqueness.
func (i *Index) IsUnique() bool {
	return i.unique
}
3,070✔
304

305
// Cols returns the index's column list in index order. Note: unlike
// Table.Cols, the internal slice is returned without copying — callers
// must not mutate it.
func (i *Index) Cols() []*Column {
	return i.cols
}
635✔
308

309
func (i *Index) IncludesCol(colID uint32) bool {
52✔
310
        _, ok := i.colsByID[colID]
52✔
311
        return ok
52✔
312
}
52✔
313

314
// enginePrefix returns the owning engine's key-space prefix, reached
// through the index's table and catalog.
func (i *Index) enginePrefix() []byte {
	return i.table.catalog.enginePrefix
}
22,964✔
317

318
func (i *Index) coversOrdCols(ordExps []*OrdExp, rangesByColID map[uint32]*typedValueRange) bool {
498✔
319
        if !ordExpsHaveSameDirection(ordExps) {
532✔
320
                return false
34✔
321
        }
34✔
322
        return i.hasPrefix(i.cols, ordExps) || i.sortableUsing(ordExps, rangesByColID)
464✔
323
}
324

325
// countEqualityCoveredCols returns the number of consecutive leading columns
326
// of the index that have a point-equality (unitary) range in rangesByColID.
327
// Used by selectINLJIndex to prefer more selective indexes: an index on (a, b)
328
// scores 2 when both a=x AND b=y are present, beating a single-column (a) index.
329
func (i *Index) countEqualityCoveredCols(rangesByColID map[uint32]*typedValueRange) int {
379✔
330
        count := 0
379✔
331
        for _, col := range i.cols {
766✔
332
                r, ok := rangesByColID[col.id]
387✔
333
                if !ok || !r.unitary() {
741✔
334
                        break
354✔
335
                }
336
                count++
33✔
337
        }
338
        return count
379✔
339
}
340

341
// coversEqualityRanges returns true when at least the index's leading column
// has a point-equality range in rangesByColID. It is a convenience wrapper
// around countEqualityCoveredCols used in tests and unit assertions.
func (i *Index) coversEqualityRanges(rangesByColID map[uint32]*typedValueRange) bool {
	return i.countEqualityCoveredCols(rangesByColID) > 0
}
8✔
347

348
func ordExpsHaveSameDirection(exps []*OrdExp) bool {
498✔
349
        if len(exps) == 0 {
498✔
350
                return true
×
351
        }
×
352

353
        desc := exps[0].descOrder
498✔
354
        for _, e := range exps[1:] {
637✔
355
                if e.descOrder != desc {
173✔
356
                        return false
34✔
357
                }
34✔
358
        }
359
        return true
464✔
360
}
361

362
func (i *Index) hasPrefix(columns []*Column, ordExps []*OrdExp) bool {
490✔
363
        if len(ordExps) > len(columns) {
536✔
364
                return false
46✔
365
        }
46✔
366

367
        for j, ordCol := range ordExps {
936✔
368
                sel := ordCol.AsSelector()
492✔
369
                if sel == nil {
514✔
370
                        return false
22✔
371
                }
22✔
372

373
                aggFn, _, colName := sel.resolve(i.table.Name())
470✔
374
                if len(aggFn) > 0 {
475✔
375
                        return false
5✔
376
                }
5✔
377

378
                col, err := i.table.GetColumnByName(colName)
465✔
379
                if err != nil || col.id != columns[j].id {
654✔
380
                        return false
189✔
381
                }
189✔
382
        }
383
        return true
228✔
384
}
385

386
// sortableUsing reports whether the index can serve the requested ordering
// even though the ordering columns are not a strict prefix of the index:
// every index column preceding the first ordering column must be pinned to
// a single value (unitary range), so the remaining index suffix determines
// the row order.
func (i *Index) sortableUsing(columns []*OrdExp, rangesByColID map[uint32]*typedValueRange) bool {
	// all columns before colID must be fixedValues otherwise the index can not be used
	sel := columns[0].AsSelector()
	if sel == nil {
		// first ordering expression is not a bare column reference
		return false
	}

	aggFn, _, colName := sel.resolve(i.table.Name())
	if len(aggFn) > 0 {
		// aggregated expressions can not be served by an index ordering
		return false
	}

	firstCol, err := i.table.GetColumnByName(colName)
	if err != nil {
		return false
	}

	for j, col := range i.cols {
		if col.id == firstCol.id {
			// reached the first ordering column: the index suffix from
			// here on must match the ordering expressions as a prefix
			return i.hasPrefix(i.cols[j:], columns)
		}

		colRange, ok := rangesByColID[col.id]
		if ok && colRange.unitary() {
			// this leading column is fixed to a point value; skip it
			continue
		}
		return false
	}
	// first ordering column is not part of the index at all
	return false
}
416

417
// Name returns the canonical index name, "table(col1,col2,...)".
func (i *Index) Name() string {
	return indexName(i.table.name, i.cols)
}
35,237✔
420

421
// ID returns the index's identifier, unique within its table.
func (i *Index) ID() uint32 {
	return i.id
}
22✔
424

425
func (t *Table) GetIndexByName(name string) (*Index, error) {
37✔
426
        idx, exists := t.indexesByName[name]
37✔
427
        if !exists {
38✔
428
                return nil, fmt.Errorf("%w (%s)", ErrIndexNotFound, name)
1✔
429
        }
1✔
430
        return idx, nil
36✔
431
}
432

433
func indexName(tableName string, cols []*Column) string {
35,273✔
434
        var buf strings.Builder
35,273✔
435

35,273✔
436
        buf.WriteString(tableName)
35,273✔
437

35,273✔
438
        buf.WriteString("(")
35,273✔
439

35,273✔
440
        for c, col := range cols {
71,461✔
441
                buf.WriteString(col.colName)
36,188✔
442

36,188✔
443
                if c < len(cols)-1 {
37,103✔
444
                        buf.WriteString(",")
915✔
445
                }
915✔
446
        }
447

448
        buf.WriteString(")")
35,273✔
449

35,273✔
450
        return buf.String()
35,273✔
451
}
452

453
func (catlg *Catalog) newTable(name string, colsSpec map[uint32]*ColSpec, checkConstraints map[string]CheckConstraint, maxColID uint32) (table *Table, err error) {
1,322✔
454
        if len(name) == 0 || len(colsSpec) == 0 {
1,325✔
455
                return nil, ErrIllegalArguments
3✔
456
        }
3✔
457

458
        for id := range colsSpec {
4,895✔
459
                if id <= 0 || id > maxColID {
3,576✔
460
                        return nil, ErrIllegalArguments
×
461
                }
×
462
        }
463

464
        exists := catlg.ExistTable(name)
1,319✔
465
        if exists {
1,326✔
466
                return nil, fmt.Errorf("%w (%s)", ErrTableAlreadyExists, name)
7✔
467
        }
7✔
468

469
        // Generate a new ID for the table by incrementing the 'maxTableID' variable of the 'catalog' instance.
470
        id := (catlg.maxTableID + 1)
1,312✔
471

1,312✔
472
        // This code is attempting to check if a table with the given id already exists in the Catalog.
1,312✔
473
        // If the function returns nil for err, it means that the table already exists and the function
1,312✔
474
        // should return an error indicating that the table cannot be created again.
1,312✔
475
        _, err = catlg.GetTableByID(id)
1,312✔
476
        if err == nil {
1,312✔
477
                return nil, fmt.Errorf("%w (%d)", ErrTableAlreadyExists, id)
×
478
        }
×
479

480
        table = &Table{
1,312✔
481
                id:               id,
1,312✔
482
                catalog:          catlg,
1,312✔
483
                name:             name,
1,312✔
484
                cols:             make([]*Column, 0, len(colsSpec)),
1,312✔
485
                colsByID:         make(map[uint32]*Column),
1,312✔
486
                colsByName:       make(map[string]*Column),
1,312✔
487
                indexesByName:    make(map[string]*Index),
1,312✔
488
                indexesByColID:   make(map[uint32][]*Index),
1,312✔
489
                checkConstraints: checkConstraints,
1,312✔
490
                maxColID:         maxColID,
1,312✔
491
        }
1,312✔
492

1,312✔
493
        for id := uint32(1); id <= maxColID; id++ {
4,918✔
494
                cs, found := colsSpec[id]
3,606✔
495
                if !found {
3,647✔
496
                        // dropped column
41✔
497
                        continue
41✔
498
                }
499

500
                if isReservedCol(cs.colName) {
3,566✔
501
                        return nil, fmt.Errorf("%w(%s)", ErrReservedWord, cs.colName)
1✔
502
                }
1✔
503

504
                _, colExists := table.colsByName[cs.colName]
3,564✔
505
                if colExists {
3,565✔
506
                        return nil, ErrDuplicatedColumn
1✔
507
                }
1✔
508

509
                if cs.autoIncrement && cs.colType != IntegerType {
3,565✔
510
                        return nil, ErrLimitedAutoIncrement
2✔
511
                }
2✔
512

513
                if !validMaxLenForType(cs.maxLen, cs.colType) {
3,561✔
514
                        return nil, ErrLimitedMaxLen
×
515
                }
×
516

517
                col := &Column{
3,561✔
518
                        id:            uint32(id),
3,561✔
519
                        table:         table,
3,561✔
520
                        colName:       cs.colName,
3,561✔
521
                        colType:       cs.colType,
3,561✔
522
                        maxLen:        cs.maxLen,
3,561✔
523
                        autoIncrement: cs.autoIncrement,
3,561✔
524
                        notNull:       cs.notNull,
3,561✔
525
                        defaultValue:  cs.defaultValue,
3,561✔
526
                }
3,561✔
527

3,561✔
528
                table.cols = append(table.cols, col)
3,561✔
529
                table.colsByID[col.id] = col
3,561✔
530
                table.colsByName[col.colName] = col
3,561✔
531
        }
532

533
        catlg.tables = append(catlg.tables, table)
1,308✔
534
        catlg.tablesByID[table.id] = table
1,308✔
535
        catlg.tablesByName[table.name] = table
1,308✔
536

1,308✔
537
        // increment table count on successfull table creation.
1,308✔
538
        // This ensures that each new table is assigned a unique ID
1,308✔
539
        // that has not been used before.
1,308✔
540
        catlg.maxTableID++
1,308✔
541

1,308✔
542
        return table, nil
1,308✔
543
}
544

545
func (catlg *Catalog) deleteTable(table *Table) error {
8✔
546
        _, exists := catlg.tablesByID[table.id]
8✔
547
        if !exists {
8✔
548
                return ErrTableDoesNotExist
×
549
        }
×
550

551
        newTables := make([]*Table, 0, len(catlg.tables)-1)
8✔
552

8✔
553
        for _, t := range catlg.tables {
29✔
554
                if t.id != table.id {
34✔
555
                        newTables = append(newTables, t)
13✔
556
                }
13✔
557
        }
558

559
        catlg.tables = newTables
8✔
560
        delete(catlg.tablesByID, table.id)
8✔
561
        delete(catlg.tablesByName, table.name)
8✔
562

8✔
563
        return nil
8✔
564
}
565

566
func (t *Table) newIndex(unique bool, colIDs []uint32) (index *Index, err error) {
1,763✔
567
        if len(colIDs) < 1 {
1,764✔
568
                return nil, ErrIllegalArguments
1✔
569
        }
1✔
570

571
        // validate column ids
572
        cols := make([]*Column, len(colIDs))
1,762✔
573
        colsByID := make(map[uint32]*Column, len(colIDs))
1,762✔
574

1,762✔
575
        for i, colID := range colIDs {
3,622✔
576
                col, err := t.GetColumnByID(colID)
1,860✔
577
                if err != nil {
1,861✔
578
                        return nil, err
1✔
579
                }
1✔
580

581
                _, ok := colsByID[colID]
1,859✔
582
                if ok {
1,860✔
583
                        return nil, ErrDuplicatedColumn
1✔
584
                }
1✔
585

586
                cols[i] = col
1,858✔
587
                colsByID[colID] = col
1,858✔
588
        }
589

590
        index = &Index{
1,760✔
591
                id:       uint32(t.maxIndexID),
1,760✔
592
                table:    t,
1,760✔
593
                unique:   unique,
1,760✔
594
                cols:     cols,
1,760✔
595
                colsByID: colsByID,
1,760✔
596
        }
1,760✔
597

1,760✔
598
        _, exists := t.indexesByName[index.Name()]
1,760✔
599
        if exists {
1,766✔
600
                return nil, ErrIndexAlreadyExists
6✔
601
        }
6✔
602

603
        t.indexes = append(t.indexes, index)
1,754✔
604
        t.indexesByName[index.Name()] = index
1,754✔
605

1,754✔
606
        // having a direct way to get the indexes by colID
1,754✔
607
        for _, col := range index.cols {
3,604✔
608
                t.indexesByColID[col.id] = append(t.indexesByColID[col.id], index)
1,850✔
609
        }
1,850✔
610

611
        if index.id == PKIndexID {
3,043✔
612
                t.primaryIndex = index
1,289✔
613
                t.autoIncrementPK = len(index.cols) == 1 && index.cols[0].autoIncrement
1,289✔
614
        }
1,289✔
615

616
        // increment table count on successfull table creation.
617
        // This ensures that each new table is assigned a unique ID
618
        // that has not been used before.
619
        t.maxIndexID++
1,754✔
620

1,754✔
621
        return index, nil
1,754✔
622
}
623

624
func (t *Table) newColumn(spec *ColSpec) (*Column, error) {
19✔
625
        if isReservedCol(spec.colName) {
20✔
626
                return nil, fmt.Errorf("%w(%s)", ErrReservedWord, spec.colName)
1✔
627
        }
1✔
628

629
        if spec.autoIncrement {
19✔
630
                return nil, fmt.Errorf("%w (%s)", ErrLimitedAutoIncrement, spec.colName)
1✔
631
        }
1✔
632

633
        if spec.notNull {
18✔
634
                return nil, fmt.Errorf("%w (%s)", ErrNewColumnMustBeNullable, spec.colName)
1✔
635
        }
1✔
636

637
        if !validMaxLenForType(spec.maxLen, spec.colType) {
17✔
638
                return nil, fmt.Errorf("%w (%s)", ErrLimitedMaxLen, spec.colName)
1✔
639
        }
1✔
640

641
        _, exists := t.colsByName[spec.colName]
15✔
642
        if exists {
18✔
643
                return nil, fmt.Errorf("%w (%s)", ErrColumnAlreadyExists, spec.colName)
3✔
644
        }
3✔
645

646
        t.maxColID++
12✔
647

12✔
648
        col := &Column{
12✔
649
                id:            t.maxColID,
12✔
650
                table:         t,
12✔
651
                colName:       spec.colName,
12✔
652
                colType:       spec.colType,
12✔
653
                maxLen:        spec.maxLen,
12✔
654
                autoIncrement: spec.autoIncrement,
12✔
655
                notNull:       spec.notNull,
12✔
656
                defaultValue:  spec.defaultValue,
12✔
657
        }
12✔
658

12✔
659
        t.cols = append(t.cols, col)
12✔
660
        t.colsByID[col.id] = col
12✔
661
        t.colsByName[col.colName] = col
12✔
662

12✔
663
        return col, nil
12✔
664
}
665

666
func (ctlg *Catalog) renameTable(oldName, newName string) (*Table, error) {
6✔
667
        if oldName == newName {
7✔
668
                return nil, fmt.Errorf("%w (%s)", ErrSameOldAndNewNames, oldName)
1✔
669
        }
1✔
670

671
        t, err := ctlg.GetTableByName(oldName)
5✔
672
        if err != nil {
7✔
673
                return nil, err
2✔
674
        }
2✔
675

676
        _, err = ctlg.GetTableByName(newName)
3✔
677
        if err == nil {
4✔
678
                return nil, fmt.Errorf("%w (%s)", ErrTableAlreadyExists, newName)
1✔
679
        }
1✔
680

681
        t.name = newName
2✔
682

2✔
683
        delete(ctlg.tablesByName, oldName)
2✔
684
        ctlg.tablesByName[newName] = t
2✔
685

2✔
686
        return t, nil
2✔
687
}
688

689
func (t *Table) renameColumn(oldName, newName string) (*Column, error) {
9✔
690
        if isReservedCol(newName) {
9✔
691
                return nil, fmt.Errorf("%w(%s)", ErrReservedWord, newName)
×
692
        }
×
693

694
        if oldName == newName {
10✔
695
                return nil, fmt.Errorf("%w (%s)", ErrSameOldAndNewNames, oldName)
1✔
696
        }
1✔
697

698
        col, exists := t.colsByName[oldName]
8✔
699
        if !exists {
9✔
700
                return nil, fmt.Errorf("%w (%s)", ErrColumnDoesNotExist, oldName)
1✔
701
        }
1✔
702

703
        _, exists = t.colsByName[newName]
7✔
704
        if exists {
8✔
705
                return nil, fmt.Errorf("%w (%s)", ErrColumnAlreadyExists, newName)
1✔
706
        }
1✔
707

708
        col.colName = newName
6✔
709

6✔
710
        delete(t.colsByName, oldName)
6✔
711
        t.colsByName[newName] = col
6✔
712

6✔
713
        return col, nil
6✔
714
}
715

716
func (t *Table) deleteColumn(col *Column) error {
12✔
717
        isIndexed, err := t.IsIndexed(col.colName)
12✔
718
        if err != nil {
12✔
719
                return err
×
720
        }
×
721

722
        if isIndexed {
16✔
723
                return fmt.Errorf("%w %s because one or more indexes require it", ErrCannotDropColumn, col.colName)
4✔
724
        }
4✔
725

726
        newCols := make([]*Column, 0, len(t.cols)-1)
8✔
727

8✔
728
        for _, c := range t.cols {
55✔
729
                if c.id != col.id {
86✔
730
                        newCols = append(newCols, c)
39✔
731
                }
39✔
732
        }
733

734
        t.cols = newCols
8✔
735
        delete(t.colsByName, col.colName)
8✔
736
        delete(t.colsByID, col.id)
8✔
737

8✔
738
        return nil
8✔
739
}
740

741
func (t *Table) deleteCheck(name string) (uint32, error) {
4✔
742
        c, exists := t.checkConstraints[name]
4✔
743
        if !exists {
5✔
744
                return 0, fmt.Errorf("%s.%s: %w", t.name, name, ErrConstraintNotFound)
1✔
745
        }
1✔
746

747
        delete(t.checkConstraints, name)
3✔
748
        return c.id, nil
3✔
749
}
750

751
func (t *Table) deleteIndex(index *Index) error {
5✔
752
        if index.IsPrimary() {
6✔
753
                return fmt.Errorf("%w: primary key index can NOT be deleted", ErrIllegalArguments)
1✔
754
        }
1✔
755

756
        newIndexes := make([]*Index, 0, len(t.indexes)-1)
4✔
757

4✔
758
        for _, i := range t.indexes {
13✔
759
                if i.id != index.id {
14✔
760
                        newIndexes = append(newIndexes, i)
5✔
761
                }
5✔
762
        }
763

764
        t.indexes = newIndexes
4✔
765
        delete(t.indexesByColID, index.id)
4✔
766
        delete(t.indexesByName, index.Name())
4✔
767

4✔
768
        return nil
4✔
769
}
770

771
// ID returns the column's identifier, unique within its table.
func (c *Column) ID() uint32 {
	return c.id
}

// Name returns the column's current name (may change via renameColumn).
func (c *Column) Name() string {
	return c.colName
}

// Type returns the column's SQL value type.
func (c *Column) Type() SQLValueType {
	return c.colType
}
28,792✔
782

783
func (c *Column) MaxLen() int {
83,668✔
784
        switch c.colType {
83,668✔
785
        case BooleanType:
818✔
786
                return 1
818✔
787
        case IntegerType:
61,609✔
788
                return 8
61,609✔
789
        case TimestampType:
1,512✔
790
                return 8
1,512✔
791
        case Float64Type:
3,540✔
792
                return 8
3,540✔
793
        case UUIDType:
39✔
794
                return 16
39✔
795
        }
796

797
        return c.maxLen
16,150✔
798
}
799

800
func (c *Column) IsNullable() bool {
107✔
801
        return !c.notNull
107✔
802
}
107✔
803

804
// IsAutoIncremental reports whether the column was declared AUTO_INCREMENT.
func (c *Column) IsAutoIncremental() bool {
	return c.autoIncrement
}
807

808
// HasDefault reports whether the column declares a default expression.
func (c *Column) HasDefault() bool {
	return c.defaultValue != nil
}
811

812
// DefaultValue returns the column's default expression, or nil when none
// was declared.
func (c *Column) DefaultValue() ValueExp {
	return c.defaultValue
}
815

816
func validMaxLenForType(maxLen int, sqlType SQLValueType) bool {
3,577✔
817
        switch sqlType {
3,577✔
818
        case BooleanType:
252✔
819
                return maxLen <= 1
252✔
820
        case IntegerType:
1,780✔
821
                return maxLen == 0 || maxLen == 8
1,780✔
822
        case Float64Type:
107✔
823
                return maxLen == 0 || maxLen == 8
107✔
824
        case TimestampType:
57✔
825
                return maxLen == 0 || maxLen == 8
57✔
826
        case UUIDType:
29✔
827
                return maxLen == 0 || maxLen == 16
29✔
828
        }
829

830
        return maxLen >= 0
1,352✔
831
}
832

833
// load populates the catalog from its persisted representation without
// copying the catalog entries into the transaction's write set
// (copyToTx=false).
func (catlg *Catalog) load(ctx context.Context, tx *store.OngoingTx) error {
	return catlg.loadCatalog(ctx, tx, false)
}
836

837
// Clone returns a deep copy of the catalog suitable for use as the in-memory
838
// schema of a fresh transaction. Per-tx DDL mutations operate on the clone
839
// and never touch the source, so a cache of the last known schema can be
840
// cloned for read-write transactions to avoid the cost of re-running
841
// loadCatalog (prefix scans + default-expression re-parse) on every NewTx.
842
//
843
// Registered system tables (pg_type today, more in future pg-compat
844
// phases) are re-installed from scratch by newCatalog — never copied —
845
// so their identity is stable across catalogs and the Scan function
846
// pointers stay bound to the package-level registry.
847
//
848
// Immutable substructures — ValueExp default expressions, Index predicates,
849
// and CheckConstraint expressions — are shared with the source by pointer
850
// because they are never mutated after parsing. All mutable state (tables,
851
// columns, indexes, and their lookup maps) is deep-copied. Back-references
852
// (Table.catalog, Column.table, Index.table) are re-targeted to the clones.
853
//
854
// table.maxPK is copied from the source but callers that need up-to-date
855
// auto-increment state must re-run loadMaxPK (which is snapshot-dependent).
856
func (catlg *Catalog) Clone() *Catalog {
4,300✔
857
        cp := newCatalog(catlg.enginePrefix)
4,300✔
858
        cp.maxTableID = catlg.maxTableID
4,300✔
859

4,300✔
860
        if len(catlg.tables) > 0 {
8,575✔
861
                cp.tables = make([]*Table, 0, len(catlg.tables))
4,275✔
862
        }
4,275✔
863

864
        for _, t := range catlg.tables {
9,415✔
865
                nt := cloneTable(t, cp)
5,115✔
866
                cp.tables = append(cp.tables, nt)
5,115✔
867
                cp.tablesByID[nt.id] = nt
5,115✔
868
                cp.tablesByName[nt.name] = nt
5,115✔
869
        }
5,115✔
870

871
        return cp
4,300✔
872
}
873

874
// cloneTable deep-copies t and reparents columns/indexes onto the returned
875
// clone. The new Catalog back-reference must already be available so the
876
// clone's Table.catalog field can be filled in.
877
func cloneTable(t *Table, newCatalog *Catalog) *Table {
	// Pre-size every slice and map to the source's sizes; the clone ends up
	// with exactly the same cardinality.
	nt := &Table{
		id:               t.id,
		catalog:          newCatalog,
		name:             t.name,
		autoIncrementPK:  t.autoIncrementPK,
		maxPK:            t.maxPK,
		maxColID:         t.maxColID,
		maxIndexID:       t.maxIndexID,
		cols:             make([]*Column, 0, len(t.cols)),
		colsByID:         make(map[uint32]*Column, len(t.colsByID)),
		colsByName:       make(map[string]*Column, len(t.colsByName)),
		indexes:          make([]*Index, 0, len(t.indexes)),
		indexesByName:    make(map[string]*Index, len(t.indexesByName)),
		indexesByColID:   make(map[uint32][]*Index, len(t.indexesByColID)),
		checkConstraints: make(map[string]CheckConstraint, len(t.checkConstraints)),
	}

	for name, cc := range t.checkConstraints {
		// CheckConstraint is a value type; its .exp (ValueExp) is immutable
		// after parsing so the pointer can be shared.
		nt.checkConstraints[name] = cc
	}

	// Columns are deep-copied (struct copy) and reparented; the same *Column
	// pointer is registered in the slice and both lookup maps.
	for _, c := range t.cols {
		nc := *c
		nc.table = nt
		// nc.defaultValue is a ValueExp interface value; treated as immutable.
		nt.cols = append(nt.cols, &nc)
		nt.colsByID[nc.id] = &nc
		nt.colsByName[nc.colName] = &nc
	}

	// Indexes are rebuilt so their cols/colsByID point at the cloned columns,
	// not the source's.
	for _, idx := range t.indexes {
		ni := &Index{
			id:        idx.id,
			table:     nt,
			unique:    idx.unique,
			predicate: idx.predicate, // immutable after parsing
			cols:      make([]*Column, len(idx.cols)),
			colsByID:  make(map[uint32]*Column, len(idx.colsByID)),
		}
		for i, c := range idx.cols {
			ni.cols[i] = nt.colsByID[c.id]
		}
		for id := range idx.colsByID {
			ni.colsByID[id] = nt.colsByID[id]
		}
		nt.indexes = append(nt.indexes, ni)
		nt.indexesByName[ni.Name()] = ni
		// pointer identity (not id comparison) selects the primary index,
		// matching how the source tracks it
		if idx == t.primaryIndex {
			nt.primaryIndex = ni
		}
	}

	// Rebuild indexesByColID from the cloned indexes; it mirrors the source's
	// mapping from column-id → list of indexes that reference that column.
	for _, ni := range nt.indexes {
		for _, c := range ni.cols {
			nt.indexesByColID[c.id] = append(nt.indexesByColID[c.id], ni)
		}
	}

	return nt
}
942

943
// loadCatalog rebuilds the in-memory catalog by scanning the persisted table
// entries, then loading each table's column specs, check constraints and
// indexes. When copyToTx is true every visited catalog entry is also written
// into the transaction's write set (used to carry the catalog forward).
func (catlg *Catalog) loadCatalog(ctx context.Context, tx *store.OngoingTx, copyToTx bool) error {
	prefix := MapKey(catlg.enginePrefix, catalogTablePrefix, EncodeID(1))

	return iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
		dbID, tableID, err := unmapTableID(catlg.enginePrefix, key)
		if err != nil {
			return err
		}

		if dbID != DatabaseID {
			return ErrCorruptedData
		}

		// a dropped table still consumes its id so newly created tables
		// keep getting fresh ids
		if deleted {
			catlg.maxTableID++
			return nil
		}

		colSpecs, maxColID, err := loadColSpecs(ctx, tableID, tx, catlg.enginePrefix, copyToTx)
		if err != nil {
			return err
		}

		checks, err := loadCheckConstraints(ctx, dbID, tableID, tx, catlg.enginePrefix, copyToTx)
		if err != nil {
			return err
		}

		// the table name is the stored value; newTable assigns the next id
		table, err := catlg.newTable(string(value), colSpecs, checks, maxColID)
		if err != nil {
			return err
		}

		// the sequentially assigned id must match the persisted one
		if tableID != table.id {
			return ErrCorruptedData
		}

		if copyToTx {
			if err := tx.Set(key, nil, value); err != nil {
				return err
			}
		}
		return table.loadIndexes(ctx, catlg.enginePrefix, tx, copyToTx)
	})
}
988

989
func loadMaxPK(ctx context.Context, sqlPrefix []byte, tx *store.OngoingTx, table *Table) ([]byte, error) {
1,956✔
990
        pkReaderSpec := store.KeyReaderSpec{
1,956✔
991
                Prefix:    MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(table.primaryIndex.id)),
1,956✔
992
                DescOrder: true,
1,956✔
993
        }
1,956✔
994

1,956✔
995
        pkReader, err := tx.NewKeyReader(pkReaderSpec)
1,956✔
996
        if err != nil {
1,956✔
997
                return nil, err
×
998
        }
×
999
        defer pkReader.Close()
1,956✔
1000

1,956✔
1001
        mkey, _, err := pkReader.Read(ctx)
1,956✔
1002
        if err != nil {
2,289✔
1003
                return nil, err
333✔
1004
        }
333✔
1005

1006
        return unmapIndexEntry(table.primaryIndex, sqlPrefix, mkey)
1,623✔
1007
}
1008

1009
func loadColSpecs(ctx context.Context, tableID uint32, tx *store.OngoingTx, sqlPrefix []byte, copyToTx bool) (map[uint32]*ColSpec, uint32, error) {
860✔
1010
        prefix := MapKey(sqlPrefix, catalogColumnPrefix, EncodeID(1), EncodeID(tableID))
860✔
1011

860✔
1012
        var maxColID uint32
860✔
1013
        specs := make(map[uint32]*ColSpec)
860✔
1014

860✔
1015
        err := iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
3,262✔
1016
                if deleted {
2,443✔
1017
                        maxColID++
41✔
1018
                        return nil
41✔
1019
                }
41✔
1020

1021
                colSpec, colID, err := loadColSpec(sqlPrefix, key, value, tableID)
2,361✔
1022
                if err != nil {
2,364✔
1023
                        return err
3✔
1024
                }
3✔
1025

1026
                maxColID++
2,358✔
1027

2,358✔
1028
                specs[colID] = colSpec
2,358✔
1029

2,358✔
1030
                if copyToTx {
2,390✔
1031
                        return tx.Set(key, nil, value)
32✔
1032
                }
32✔
1033
                return nil
2,326✔
1034
        })
1035
        return specs, maxColID, err
860✔
1036
}
1037

1038
// loadColSpec decodes one persisted column entry into a ColSpec. The key
// carries {dbID}{tableID}{colID}{colType}; the value carries flags, max
// length, and — in the newer on-disk format — the column name length, name
// and default-expression SQL. Returns the spec and the column id.
func loadColSpec(sqlPrefix, key, value []byte, tableID uint32) (*ColSpec, uint32, error) {
	// minimum: flags(1) + maxLen(4) + at least 1 byte of name
	if len(value) < 6 {
		return nil, 0, ErrCorruptedData
	}

	mdbID, mtableID, colID, colType, err := unmapColSpec(sqlPrefix, key)
	if err != nil {
		return nil, 0, err
	}

	if mdbID != 1 || tableID != mtableID {
		return nil, 0, ErrCorruptedData
	}

	flags := value[0]
	maxLen := int(binary.BigEndian.Uint32(value[1:]))

	var colName string
	var defaultValue ValueExp

	if flags&hasDefaultFlag != 0 && len(value) >= 7 {
		// New format: {flags(1)}{maxLen(4)}{colNameLen(2)}{colName}{defaultSQL}
		colNameLen := int(binary.BigEndian.Uint16(value[5:]))
		if len(value) < 7+colNameLen {
			return nil, 0, ErrCorruptedData
		}
		colName = string(value[7 : 7+colNameLen])

		// Parse default value SQL if present
		if defaultStart := 7 + colNameLen; defaultStart < len(value) {
			defaultSQL := string(value[defaultStart:])
			if defaultSQL != "" {
				// Parse the default expression. Parse failures are
				// deliberately swallowed (best effort): the column loads
				// without a default rather than failing the whole catalog.
				stmts, err := ParseSQL(strings.NewReader("SELECT " + defaultSQL))
				if err == nil && len(stmts) > 0 {
					if sel, ok := stmts[0].(*SelectStmt); ok && len(sel.targets) > 0 {
						defaultValue = sel.targets[0].Exp
					}
				}
			}
		}
	} else {
		// Old format: {flags(1)}{maxLen(4)}{colName}
		colName = string(value[5:])
	}

	return &ColSpec{
		colName:       colName,
		colType:       colType,
		maxLen:        maxLen,
		autoIncrement: flags&autoIncrementFlag != 0,
		// NOTE(review): the flag is named nullableFlag but a set bit maps to
		// notNull — presumably the flag marks "NOT NULL declared"; confirm
		// against the writer side.
		notNull:      flags&nullableFlag != 0,
		defaultValue: defaultValue,
	}, colID, nil
}
1093

1094
func loadCheckConstraints(ctx context.Context, dbID, tableID uint32, tx *store.OngoingTx, sqlPrefix []byte, copyToTx bool) (map[string]CheckConstraint, error) {
856✔
1095
        prefix := MapKey(sqlPrefix, catalogCheckPrefix, EncodeID(dbID), EncodeID(tableID))
856✔
1096
        checks := make(map[string]CheckConstraint)
856✔
1097

856✔
1098
        err := iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
898✔
1099
                if deleted {
56✔
1100
                        return nil
14✔
1101
                }
14✔
1102

1103
                check, err := parseCheckConstraint(sqlPrefix, key, value)
28✔
1104
                if err != nil {
28✔
1105
                        return err
×
1106
                }
×
1107
                checks[check.name] = *check
28✔
1108

28✔
1109
                if copyToTx {
28✔
1110
                        return tx.Set(key, nil, value)
×
1111
                }
×
1112
                return nil
28✔
1113
        })
1114
        return checks, err
856✔
1115
}
1116

1117
// loadIndexes rebuilds the table's indexes from their persisted entries.
// Each value encodes {unique(1)}{colID(EncIDLen) order(1)}... . When
// copyToTx is true the raw entries are copied into the transaction's write
// set instead of being decoded into in-memory Index objects.
func (table *Table) loadIndexes(ctx context.Context, sqlPrefix []byte, tx *store.OngoingTx, copyToTx bool) error {
	prefix := MapKey(sqlPrefix, catalogIndexPrefix, EncodeID(1), EncodeID(table.id))

	return iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
		dbID, tableID, indexID, err := unmapIndex(sqlPrefix, key)
		if err != nil {
			return err
		}

		if table.id != tableID || dbID != 1 {
			return ErrCorruptedData
		}

		// a dropped index still consumes its id
		if deleted {
			table.maxIndexID++
			return nil
		}

		if copyToTx {
			if err := tx.Set(key, nil, value); err != nil {
				return err
			}
		} else {
			// v={unique {colID1}(ASC|DESC)...{colIDN}(ASC|DESC)}
			colSpecLen := EncIDLen + 1
			// value must be the 1-byte unique flag plus a whole number of
			// column specs, at least one
			if len(value) < 1+colSpecLen || len(value)%colSpecLen != 1 {
				return ErrCorruptedData
			}

			var colIDs []uint32
			for i := 1; i < len(value); i += colSpecLen {
				colID := binary.BigEndian.Uint32(value[i:])

				// TODO: currently only ASC order is supported
				if value[i+EncIDLen] != 0 {
					return ErrCorruptedData
				}
				colIDs = append(colIDs, colID)
			}

			index, err := table.newIndex(value[0] > 0, colIDs)
			if err != nil {
				return err
			}

			// the sequentially assigned id must match the persisted one
			if indexID != index.id {
				return ErrCorruptedData
			}
		}
		return nil
	})
}
1169

1170
func trimPrefix(prefix, mkey []byte, mappingPrefix []byte) ([]byte, error) {
6,139✔
1171
        if len(prefix)+len(mappingPrefix) > len(mkey) ||
6,139✔
1172
                !bytes.Equal(prefix, mkey[:len(prefix)]) ||
6,139✔
1173
                !bytes.Equal(mappingPrefix, mkey[len(prefix):len(prefix)+len(mappingPrefix)]) {
6,147✔
1174
                return nil, ErrIllegalMappedKey
8✔
1175
        }
8✔
1176

1177
        return mkey[len(prefix)+len(mappingPrefix):], nil
6,131✔
1178
}
1179

1180
func unmapTableID(prefix, mkey []byte) (dbID, tableID uint32, err error) {
892✔
1181
        encID, err := trimPrefix(prefix, mkey, []byte(catalogTablePrefix))
892✔
1182
        if err != nil {
893✔
1183
                return 0, 0, err
1✔
1184
        }
1✔
1185

1186
        if len(encID) != EncIDLen*2 {
892✔
1187
                return 0, 0, ErrCorruptedData
1✔
1188
        }
1✔
1189

1190
        dbID = binary.BigEndian.Uint32(encID)
890✔
1191
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
890✔
1192

890✔
1193
        return
890✔
1194
}
1195

1196
func unmapCheckID(prefix, mkey []byte) (uint32, error) {
28✔
1197
        encID, err := trimPrefix(prefix, mkey, []byte(catalogCheckPrefix))
28✔
1198
        if err != nil {
28✔
1199
                return 0, err
×
1200
        }
×
1201

1202
        if len(encID) < 3*EncIDLen {
28✔
1203
                return 0, ErrCorruptedData
×
1204
        }
×
1205
        return binary.BigEndian.Uint32(encID[2*EncIDLen:]), nil
28✔
1206
}
1207

1208
func parseCheckConstraint(prefix, key, value []byte) (*CheckConstraint, error) {
28✔
1209
        id, err := unmapCheckID(prefix, key)
28✔
1210
        if err != nil {
28✔
1211
                return nil, err
×
1212
        }
×
1213

1214
        nameLen := value[0] + 1
28✔
1215
        name := string(value[1 : 1+nameLen])
28✔
1216

28✔
1217
        exp, err := ParseExpFromString(string(value[1+nameLen:]))
28✔
1218
        if err != nil {
28✔
1219
                return nil, err
×
1220
        }
×
1221

1222
        return &CheckConstraint{
28✔
1223
                id:   id,
28✔
1224
                name: name,
28✔
1225
                exp:  exp,
28✔
1226
        }, nil
28✔
1227
}
1228

1229
func unmapColSpec(prefix, mkey []byte) (dbID, tableID, colID uint32, colType SQLValueType, err error) {
2,363✔
1230
        encID, err := trimPrefix(prefix, mkey, []byte(catalogColumnPrefix))
2,363✔
1231
        if err != nil {
2,364✔
1232
                return 0, 0, 0, "", err
1✔
1233
        }
1✔
1234

1235
        if len(encID) < EncIDLen*3 {
2,363✔
1236
                return 0, 0, 0, "", ErrCorruptedData
1✔
1237
        }
1✔
1238

1239
        dbID = binary.BigEndian.Uint32(encID)
2,361✔
1240
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
2,361✔
1241
        colID = binary.BigEndian.Uint32(encID[2*EncIDLen:])
2,361✔
1242

2,361✔
1243
        colType, err = asType(string(encID[EncIDLen*3:]))
2,361✔
1244
        if err != nil {
2,363✔
1245
                return 0, 0, 0, "", ErrCorruptedData
2✔
1246
        }
2✔
1247

1248
        return
2,359✔
1249
}
1250

1251
func asType(t string) (SQLValueType, error) {
2,361✔
1252
        switch t {
2,361✔
1253
        case IntegerType,
1254
                Float64Type,
1255
                BooleanType,
1256
                VarcharType,
1257
                UUIDType,
1258
                BLOBType,
1259
                TimestampType,
1260
                JSONType:
2,359✔
1261
                return t, nil
2,359✔
1262
        }
1263
        return t, ErrCorruptedData
2✔
1264
}
1265

1266
func unmapIndex(sqlPrefix, mkey []byte) (dbID, tableID, indexID uint32, err error) {
1,202✔
1267
        encID, err := trimPrefix(sqlPrefix, mkey, []byte(catalogIndexPrefix))
1,202✔
1268
        if err != nil {
1,203✔
1269
                return 0, 0, 0, err
1✔
1270
        }
1✔
1271

1272
        if len(encID) != EncIDLen*3 {
1,202✔
1273
                return 0, 0, 0, ErrCorruptedData
1✔
1274
        }
1✔
1275

1276
        dbID = binary.BigEndian.Uint32(encID)
1,200✔
1277
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
1,200✔
1278
        indexID = binary.BigEndian.Uint32(encID[EncIDLen*2:])
1,200✔
1279

1,200✔
1280
        return
1,200✔
1281
}
1282

1283
func unmapIndexEntry(index *Index, sqlPrefix, mkey []byte) (encPKVals []byte, err error) {
1,648✔
1284
        if index == nil {
1,648✔
1285
                return nil, ErrIllegalArguments
×
1286
        }
×
1287

1288
        enc, err := trimPrefix(sqlPrefix, mkey, []byte(MappedPrefix))
1,648✔
1289
        if err != nil {
1,649✔
1290
                return nil, ErrCorruptedData
1✔
1291
        }
1✔
1292

1293
        if len(enc) <= EncIDLen*2 {
1,655✔
1294
                return nil, ErrCorruptedData
8✔
1295
        }
8✔
1296

1297
        off := 0
1,639✔
1298

1,639✔
1299
        tableID := binary.BigEndian.Uint32(enc[off:])
1,639✔
1300
        off += EncIDLen
1,639✔
1301

1,639✔
1302
        indexID := binary.BigEndian.Uint32(enc[off:])
1,639✔
1303
        off += EncIDLen
1,639✔
1304

1,639✔
1305
        if tableID != index.table.id || indexID != index.id {
1,639✔
1306
                return nil, ErrCorruptedData
×
1307
        }
×
1308

1309
        //read index values
1310
        for _, col := range index.cols {
3,278✔
1311
                if enc[off] == KeyValPrefixNull {
1,639✔
1312
                        off += 1
×
1313
                        continue
×
1314
                }
1315
                if enc[off] != KeyValPrefixNotNull {
1,639✔
1316
                        return nil, ErrCorruptedData
×
1317
                }
×
1318
                off += 1
1,639✔
1319

1,639✔
1320
                maxLen := col.MaxLen()
1,639✔
1321
                if variableSizedType(col.colType) {
1,655✔
1322
                        maxLen += EncLenLen
16✔
1323
                }
16✔
1324
                if len(enc)-off < maxLen {
1,653✔
1325
                        return nil, ErrCorruptedData
14✔
1326
                }
14✔
1327

1328
                off += maxLen
1,625✔
1329
        }
1330

1331
        //PK cannot be nil
1332
        if len(enc)-off < 1 {
1,626✔
1333
                return nil, ErrCorruptedData
1✔
1334
        }
1✔
1335

1336
        return enc[off:], nil
1,624✔
1337
}
1338

1339
func variableSizedType(sqlType SQLValueType) bool {
2,269✔
1340
        return sqlType == VarcharType || sqlType == BLOBType
2,269✔
1341
}
2,269✔
1342

1343
// MapKey builds a storage key by concatenating the engine prefix, the
// mapping prefix and every encoded value, in order, into one exactly-sized
// buffer.
func MapKey(prefix []byte, mappingPrefix string, encValues ...[]byte) []byte {
	total := len(prefix) + len(mappingPrefix)
	for _, ev := range encValues {
		total += len(ev)
	}

	mkey := make([]byte, 0, total)
	mkey = append(mkey, prefix...)
	mkey = append(mkey, mappingPrefix...)
	for _, ev := range encValues {
		mkey = append(mkey, ev...)
	}

	return mkey
}
1367

1368
func EncodeID(id uint32) []byte {
182,060✔
1369
        var encID [EncIDLen]byte
182,060✔
1370
        binary.BigEndian.PutUint32(encID[:], id)
182,060✔
1371
        return encID[:]
182,060✔
1372
}
182,060✔
1373

1374
// Tag bytes prepended to every value encoded inside an index key. Their
// numeric ordering (0x20 < 0x80 < 0xFF) makes NULLs sort before all
// non-null values and keeps the upper-bound sentinel above every real entry.
const (
	KeyValPrefixNull       byte = 0x20
	KeyValPrefixNotNull    byte = 0x80
	KeyValPrefixUpperBound byte = 0xFF
)
1379

1380
// EncodeValueAsKey encodes a TypedValue for use inside an index key by
// delegating to EncodeRawValueAsKey with the value's raw representation.
func EncodeValueAsKey(val TypedValue, colType SQLValueType, maxLen int) ([]byte, int, error) {
	return EncodeRawValueAsKey(val.RawValue(), colType, maxLen)
}
1383

1384
// EncodeRawValueAsKey encodes a value in a b-tree meaningful way: the
// returned bytes compare lexicographically in the same order as the values
// compare as SQL values. It returns the encoded bytes, the payload length
// in bytes (0 for NULL), and an error for invalid or oversized input.
func EncodeRawValueAsKey(val interface{}, colType SQLValueType, maxLen int) ([]byte, int, error) {
	if maxLen <= 0 {
		return nil, 0, ErrInvalidValue
	}
	if maxLen > MaxKeyLen {
		return nil, 0, ErrMaxKeyLengthExceeded
	}

	convVal, err := mayApplyImplicitConversion(val, colType)
	if err != nil {
		return nil, 0, err
	}

	// NULL encodes as the single NULL tag byte, sorting before all values
	if convVal == nil {
		return []byte{KeyValPrefixNull}, 0, nil
	}

	switch colType {
	case VarcharType:
		{
			strVal, ok := convVal.(string)
			if !ok {
				return nil, 0, fmt.Errorf("value is not a string: %w", ErrInvalidValue)
			}

			if len(strVal) > maxLen {
				return nil, 0, ErrMaxLengthExceeded
			}

			// notnull + value + padding + len(value)
			encv := make([]byte, 1+maxLen+EncLenLen)
			encv[0] = KeyValPrefixNotNull
			copy(encv[1:], []byte(strVal))
			binary.BigEndian.PutUint32(encv[len(encv)-EncLenLen:], uint32(len(strVal)))

			return encv, len(strVal), nil
		}
	case IntegerType:
		{
			if maxLen != 8 {
				return nil, 0, ErrCorruptedData
			}

			intVal, ok := convVal.(int64)
			if !ok {
				return nil, 0, fmt.Errorf("value is not an integer: %w", ErrInvalidValue)
			}

			// notnull tag + 8-byte big-endian value
			var encv [9]byte
			encv[0] = KeyValPrefixNotNull
			binary.BigEndian.PutUint64(encv[1:], uint64(intVal))
			// map to unsigned integer space for lexical sorting order
			encv[1] ^= 0x80

			return encv[:], 8, nil
		}
	case BooleanType:
		{
			if maxLen != 1 {
				return nil, 0, ErrCorruptedData
			}

			boolVal, ok := convVal.(bool)
			if !ok {
				return nil, 0, fmt.Errorf("value is not a boolean: %w", ErrInvalidValue)
			}

			// notnull tag + 1 byte (0 = false, 1 = true)
			var encv [2]byte
			encv[0] = KeyValPrefixNotNull
			if boolVal {
				encv[1] = 1
			}

			return encv[:], 1, nil
		}
	case BLOBType:
		{
			blobVal, ok := convVal.([]byte)
			if !ok {
				return nil, 0, fmt.Errorf("value is not a blob: %w", ErrInvalidValue)
			}

			if len(blobVal) > maxLen {
				return nil, 0, ErrMaxLengthExceeded
			}

			// notnull + value + padding + len(value)
			encv := make([]byte, 1+maxLen+EncLenLen)
			encv[0] = KeyValPrefixNotNull
			copy(encv[1:], []byte(blobVal))
			binary.BigEndian.PutUint32(encv[len(encv)-EncLenLen:], uint32(len(blobVal)))

			return encv, len(blobVal), nil
		}
	case UUIDType:
		{
			uuidVal, ok := convVal.(uuid.UUID)
			if !ok {
				return nil, 0, fmt.Errorf("value is not an UUID: %w", ErrInvalidValue)
			}

			// notnull + value
			encv := make([]byte, 17)
			encv[0] = KeyValPrefixNotNull
			copy(encv[1:], uuidVal[:])

			return encv, 16, nil
		}
	case TimestampType:
		{
			if maxLen != 8 {
				return nil, 0, ErrCorruptedData
			}

			timeVal, ok := convVal.(time.Time)
			if !ok {
				return nil, 0, fmt.Errorf("value is not a timestamp: %w", ErrInvalidValue)
			}

			// notnull tag + 8-byte big-endian UnixNano
			var encv [9]byte
			encv[0] = KeyValPrefixNotNull
			binary.BigEndian.PutUint64(encv[1:], uint64(timeVal.UnixNano()))
			// map to unsigned integer space for lexical sorting order
			encv[1] ^= 0x80

			return encv[:], 8, nil
		}
	case Float64Type:
		{
			floatVal, ok := convVal.(float64)
			if !ok {
				return nil, 0, fmt.Errorf("value is not a float: %w", ErrInvalidValue)
			}

			// Apart from the sign bit, the bit representation of float64
			// can be sorted lexicographically
			floatBits := math.Float64bits(floatVal)

			var encv [9]byte
			encv[0] = KeyValPrefixNotNull
			binary.BigEndian.PutUint64(encv[1:], floatBits)

			if encv[1]&0x80 != 0 {
				// For negative numbers, the order must be reversed,
				// we also negate the sign bit so that all negative
				// numbers end up in the smaller half of values
				for i := 1; i < 9; i++ {
					encv[i] = ^encv[i]
				}
			} else {
				// For positive numbers, the order is already correct,
				// we only have to set the sign bit to 1 to ensure that
				// positive numbers end in the larger half of values
				encv[1] ^= 0x80
			}

			return encv[:], 8, nil
		}
	}

	return nil, 0, ErrInvalidValue
}
1550

1551
// DecodeValueFromKey is the inverse of EncodeRawValueAsKey for the bytes that
1552
// follow the per-column tag in an index key. `buf` must start with the tag
1553
// byte (KeyValPrefixNull or KeyValPrefixNotNull). It returns the decoded
1554
// TypedValue and the number of bytes consumed from `buf`.
1555
//
1556
// The encoded form mirrors EncodeRawValueAsKey:
1557
//
1558
//        NULL:                 [tag]                                         (1 byte)
1559
//        NotNull, fixed-width: [tag][maxLen bytes]                           (1+maxLen bytes)
1560
//        NotNull, var-width:   [tag][maxLen bytes payload][len uint32 BE]    (1+maxLen+EncLenLen bytes)
1561
//
1562
// For Integer/Timestamp the lexical sign-flip is reversed; for Float64 the
1563
// lexical mangling is reversed; for Varchar/BLOB the trailing length suffix
1564
// trims the zero padding.
1565
func DecodeValueFromKey(buf []byte, colType SQLValueType, maxLen int) (TypedValue, int, error) {
543✔
1566
        if maxLen <= 0 {
543✔
NEW
1567
                return nil, 0, ErrInvalidValue
×
NEW
1568
        }
×
1569
        if len(buf) < 1 {
544✔
1570
                return nil, 0, ErrCorruptedData
1✔
1571
        }
1✔
1572

1573
        switch buf[0] {
542✔
1574
        case KeyValPrefixNull:
8✔
1575
                return &NullValue{t: colType}, 1, nil
8✔
1576
        case KeyValPrefixNotNull:
533✔
1577
                // fall through to type-specific decode
1578
        default:
1✔
1579
                return nil, 0, ErrCorruptedData
1✔
1580
        }
1581

1582
        switch colType {
533✔
1583
        case VarcharType:
332✔
1584
                need := 1 + maxLen + EncLenLen
332✔
1585
                if len(buf) < need {
332✔
NEW
1586
                        return nil, 0, ErrCorruptedData
×
NEW
1587
                }
×
1588
                strLen := int(binary.BigEndian.Uint32(buf[1+maxLen:]))
332✔
1589
                if strLen < 0 || strLen > maxLen {
332✔
NEW
1590
                        return nil, 0, ErrCorruptedData
×
NEW
1591
                }
×
1592
                return &Varchar{val: string(buf[1 : 1+strLen])}, need, nil
332✔
1593

1594
        case IntegerType:
183✔
1595
                if maxLen != 8 {
183✔
NEW
1596
                        return nil, 0, ErrCorruptedData
×
NEW
1597
                }
×
1598
                if len(buf) < 9 {
184✔
1599
                        return nil, 0, ErrCorruptedData
1✔
1600
                }
1✔
1601
                var raw [8]byte
182✔
1602
                copy(raw[:], buf[1:9])
182✔
1603
                raw[0] ^= 0x80
182✔
1604
                return &Integer{val: int64(binary.BigEndian.Uint64(raw[:]))}, 9, nil
182✔
1605

1606
        case BooleanType:
2✔
1607
                if maxLen != 1 {
2✔
NEW
1608
                        return nil, 0, ErrCorruptedData
×
NEW
1609
                }
×
1610
                if len(buf) < 2 {
2✔
NEW
1611
                        return nil, 0, ErrCorruptedData
×
NEW
1612
                }
×
1613
                return &Bool{val: buf[1] != 0}, 2, nil
2✔
1614

1615
        case BLOBType:
4✔
1616
                need := 1 + maxLen + EncLenLen
4✔
1617
                if len(buf) < need {
4✔
NEW
1618
                        return nil, 0, ErrCorruptedData
×
NEW
1619
                }
×
1620
                blobLen := int(binary.BigEndian.Uint32(buf[1+maxLen:]))
4✔
1621
                if blobLen < 0 || blobLen > maxLen {
4✔
NEW
1622
                        return nil, 0, ErrCorruptedData
×
NEW
1623
                }
×
1624
                // copy out so the result is independent of the input buffer
1625
                out := make([]byte, blobLen)
4✔
1626
                copy(out, buf[1:1+blobLen])
4✔
1627
                return &Blob{val: out}, need, nil
4✔
1628

1629
        case UUIDType:
1✔
1630
                if maxLen != 16 {
1✔
NEW
1631
                        return nil, 0, ErrCorruptedData
×
NEW
1632
                }
×
1633
                if len(buf) < 17 {
1✔
NEW
1634
                        return nil, 0, ErrCorruptedData
×
NEW
1635
                }
×
1636
                var u uuid.UUID
1✔
1637
                copy(u[:], buf[1:17])
1✔
1638
                return &UUID{val: u}, 17, nil
1✔
1639

1640
        case TimestampType:
3✔
1641
                if maxLen != 8 {
3✔
NEW
1642
                        return nil, 0, ErrCorruptedData
×
NEW
1643
                }
×
1644
                if len(buf) < 9 {
3✔
NEW
1645
                        return nil, 0, ErrCorruptedData
×
NEW
1646
                }
×
1647
                var raw [8]byte
3✔
1648
                copy(raw[:], buf[1:9])
3✔
1649
                raw[0] ^= 0x80
3✔
1650
                nanos := int64(binary.BigEndian.Uint64(raw[:]))
3✔
1651
                return &Timestamp{val: time.Unix(0, nanos).UTC()}, 9, nil
3✔
1652

1653
        case Float64Type:
8✔
1654
                if maxLen != 8 {
8✔
NEW
1655
                        return nil, 0, ErrCorruptedData
×
NEW
1656
                }
×
1657
                if len(buf) < 9 {
8✔
NEW
1658
                        return nil, 0, ErrCorruptedData
×
NEW
1659
                }
×
1660
                var raw [8]byte
8✔
1661
                copy(raw[:], buf[1:9])
8✔
1662
                // Reverse the encode-time mangling: positive numbers had only the
8✔
1663
                // leading sign bit flipped; negative numbers had every byte flipped.
8✔
1664
                if raw[0]&0x80 != 0 {
14✔
1665
                        // originally positive: just flip the sign bit back
6✔
1666
                        raw[0] ^= 0x80
6✔
1667
                } else {
8✔
1668
                        // originally negative: flip all bytes
2✔
1669
                        for i := range raw {
18✔
1670
                                raw[i] = ^raw[i]
16✔
1671
                        }
16✔
1672
                }
1673
                bits := binary.BigEndian.Uint64(raw[:])
8✔
1674
                return &Float64{val: math.Float64frombits(bits)}, 9, nil
8✔
1675
        }
1676

NEW
1677
        return nil, 0, ErrInvalidValue
×
1678
}
1679

1680
func getEncodeRawValue(val TypedValue, colType SQLValueType) (interface{}, error) {
107,411✔
1681
        if colType != JSONType || val.Type() == JSONType {
214,491✔
1682
                return val.RawValue(), nil
107,080✔
1683
        }
107,080✔
1684

1685
        if val.Type() != VarcharType {
332✔
1686
                return nil, fmt.Errorf("%w: invalid json value", ErrInvalidValue)
1✔
1687
        }
1✔
1688
        s, _ := val.RawValue().(string)
330✔
1689

330✔
1690
        raw := json.RawMessage(s)
330✔
1691
        if !json.Valid(raw) {
331✔
1692
                return nil, fmt.Errorf("%w: invalid json value", ErrInvalidValue)
1✔
1693
        }
1✔
1694
        return raw, nil
329✔
1695
}
1696

1697
func EncodeValue(val TypedValue, colType SQLValueType, maxLen int) ([]byte, error) {
21,676✔
1698
        v, err := getEncodeRawValue(val, colType)
21,676✔
1699
        if err != nil {
21,678✔
1700
                return nil, err
2✔
1701
        }
2✔
1702
        return EncodeRawValue(v, colType, maxLen, false)
21,674✔
1703
}
1704

1705
func EncodeNullableValue(val TypedValue, colType SQLValueType, maxLen int) ([]byte, error) {
85,735✔
1706
        v, err := getEncodeRawValue(val, colType)
85,735✔
1707
        if err != nil {
85,735✔
1708
                return nil, err
×
1709
        }
×
1710
        return EncodeRawValue(v, colType, maxLen, true)
85,735✔
1711
}
1712

1713
// EncodeRawValue encodes a value in a byte format. This is the internal binary
// representation of a value. It can be decoded with DecodeValue.
//
// The encoding is a big-endian uint32 length prefix (EncLenLen bytes)
// followed by the payload. A nil value (only legal when nullable is true)
// encodes as a zero length prefix with no payload.
func EncodeRawValue(val interface{}, colType SQLValueType, maxLen int, nullable bool) ([]byte, error) {
	convVal, err := mayApplyImplicitConversion(val, colType)
	if err != nil {
		return nil, err
	}

	// NULL is only representable when the column is nullable
	if convVal == nil && !nullable {
		return nil, ErrInvalidValue
	}

	if convVal == nil {
		// NULL encodes as a zero-length prefix with no payload
		encv := make([]byte, EncLenLen)
		binary.BigEndian.PutUint32(encv[:], uint32(0))
		return encv, nil
	}

	switch colType {
	case VarcharType:
		{
			strVal, ok := convVal.(string)
			if !ok {
				return nil, fmt.Errorf("value is not a string: %w", ErrInvalidValue)
			}

			// maxLen == 0 means unbounded
			if maxLen > 0 && len(strVal) > maxLen {
				return nil, ErrMaxLengthExceeded
			}

			// len(v) + v
			encv := make([]byte, EncLenLen+len(strVal))
			binary.BigEndian.PutUint32(encv[:], uint32(len(strVal)))
			copy(encv[EncLenLen:], []byte(strVal))

			return encv, nil
		}
	case IntegerType:
		{
			intVal, ok := convVal.(int64)
			if !ok {
				return nil, fmt.Errorf("value is not an integer: %w", ErrInvalidValue)
			}

			// map to unsigned integer space
			// len(v) + v
			var encv [EncLenLen + 8]byte
			binary.BigEndian.PutUint32(encv[:], uint32(8))
			binary.BigEndian.PutUint64(encv[EncLenLen:], uint64(intVal))

			return encv[:], nil
		}
	case BooleanType:
		{
			boolVal, ok := convVal.(bool)
			if !ok {
				return nil, fmt.Errorf("value is not a boolean: %w", ErrInvalidValue)
			}

			// len(v) + v; payload byte stays 0 for false
			var encv [EncLenLen + 1]byte
			binary.BigEndian.PutUint32(encv[:], uint32(1))
			if boolVal {
				encv[EncLenLen] = 1
			}

			return encv[:], nil
		}
	case BLOBType:
		{
			var blobVal []byte

			if val != nil {
				v, ok := convVal.([]byte)
				if !ok {
					return nil, fmt.Errorf("value is not a blob: %w", ErrInvalidValue)
				}
				blobVal = v
			}

			// maxLen == 0 means unbounded
			if maxLen > 0 && len(blobVal) > maxLen {
				return nil, ErrMaxLengthExceeded
			}

			// len(v) + v
			encv := make([]byte, EncLenLen+len(blobVal))
			binary.BigEndian.PutUint32(encv[:], uint32(len(blobVal)))
			copy(encv[EncLenLen:], blobVal)

			return encv[:], nil
		}
	case JSONType:
		// note: the type assertion is on val (not convVal); any non-raw
		// value is marshalled to JSON text first
		rawJson, ok := val.(json.RawMessage)
		if !ok {
			data, err := json.Marshal(val)
			if err != nil {
				return nil, err
			}
			rawJson = data
		}

		// len(v) + v
		encv := make([]byte, EncLenLen+len(rawJson))
		binary.BigEndian.PutUint32(encv[:], uint32(len(rawJson)))
		copy(encv[EncLenLen:], rawJson)

		return encv[:], nil
	case UUIDType:
		{
			uuidVal, ok := convVal.(uuid.UUID)
			if !ok {
				return nil, fmt.Errorf("value is not an UUID: %w", ErrInvalidValue)
			}

			// len(v) + v (16 raw UUID bytes)
			var encv [EncLenLen + 16]byte
			binary.BigEndian.PutUint32(encv[:], uint32(16))
			copy(encv[EncLenLen:], uuidVal[:])

			return encv[:], nil
		}
	case TimestampType:
		{
			timeVal, ok := convVal.(time.Time)
			if !ok {
				return nil, fmt.Errorf("value is not a timestamp: %w", ErrInvalidValue)
			}

			// len(v) + v (TimeToInt64 collapses the timestamp to a single int64)
			var encv [EncLenLen + 8]byte
			binary.BigEndian.PutUint32(encv[:], uint32(8))
			binary.BigEndian.PutUint64(encv[EncLenLen:], uint64(TimeToInt64(timeVal)))

			return encv[:], nil
		}
	case Float64Type:
		{
			floatVal, ok := convVal.(float64)
			if !ok {
				return nil, fmt.Errorf("value is not a float: %w", ErrInvalidValue)
			}

			// len(v) + v (raw IEEE-754 bit pattern, big-endian)
			var encv [EncLenLen + 8]byte
			floatBits := math.Float64bits(floatVal)
			binary.BigEndian.PutUint32(encv[:], uint32(8))
			binary.BigEndian.PutUint64(encv[EncLenLen:], floatBits)

			return encv[:], nil
		}
	}

	return nil, ErrInvalidValue
}
1865

1866
func DecodeValueLength(b []byte) (int, int, error) {
940,278✔
1867
        if len(b) < EncLenLen {
940,280✔
1868
                return 0, 0, ErrCorruptedData
2✔
1869
        }
2✔
1870

1871
        vlen := int(binary.BigEndian.Uint32(b[:]))
940,276✔
1872
        voff := EncLenLen
940,276✔
1873

940,276✔
1874
        if vlen < 0 || len(b) < voff+vlen {
940,279✔
1875
                return 0, 0, ErrCorruptedData
3✔
1876
        }
3✔
1877

1878
        return vlen, EncLenLen, nil
940,273✔
1879
}
1880

1881
func DecodeValue(b []byte, colType SQLValueType) (TypedValue, int, error) {
292,177✔
1882
        return decodeValue(b, colType, false)
292,177✔
1883
}
292,177✔
1884

1885
func DecodeNullableValue(b []byte, colType SQLValueType) (TypedValue, int, error) {
638,500✔
1886
        return decodeValue(b, colType, true)
638,500✔
1887
}
638,500✔
1888

1889
// decodeValue decodes a length-prefixed value encoded by EncodeRawValue.
// When nullable is true, a zero-length payload decodes to a NullValue.
// It returns the typed value and the total number of bytes consumed
// (length prefix plus payload).
//
// NOTE: Blob and JSON results alias b's backing array rather than copying
// the payload; callers must not mutate b while those values are in use.
func decodeValue(b []byte, colType SQLValueType, nullable bool) (TypedValue, int, error) {
	vlen, voff, err := DecodeValueLength(b)
	if err != nil {
		return nil, 0, err
	}

	// zero-length payload encodes NULL (only legal for nullable columns)
	if vlen == 0 && nullable {
		return &NullValue{t: colType}, voff, nil
	}

	switch colType {
	case VarcharType:
		{
			// string conversion copies, so the result is detached from b
			v := string(b[voff : voff+vlen])
			voff += vlen

			return &Varchar{val: v}, voff, nil
		}
	case IntegerType:
		{
			if vlen != 8 {
				return nil, 0, ErrCorruptedData
			}

			v := binary.BigEndian.Uint64(b[voff:])
			voff += vlen

			return &Integer{val: int64(v)}, voff, nil
		}
	case BooleanType:
		{
			if vlen != 1 {
				return nil, 0, ErrCorruptedData
			}

			v := b[voff] == 1
			voff += 1

			return &Bool{val: v}, voff, nil
		}
	case BLOBType:
		{
			// borrowed: v shares b's backing array (no copy)
			v := b[voff : voff+vlen]
			voff += vlen

			return &Blob{val: v}, voff, nil
		}
	case JSONType:
		{
			v := b[voff : voff+vlen]
			voff += vlen

			// a JSON value is returned even on unmarshal failure, together
			// with the non-nil error
			var val interface{}
			err = json.Unmarshal(v, &val)

			return &JSON{val: val}, voff, err
		}
	case UUIDType:
		{
			if vlen != 16 {
				return nil, 0, ErrCorruptedData
			}

			u, err := uuid.FromBytes(b[voff : voff+16])
			if err != nil {
				return nil, 0, fmt.Errorf("%w: %s", ErrCorruptedData, err.Error())
			}

			voff += vlen

			return &UUID{val: u}, voff, nil
		}
	case TimestampType:
		{
			if vlen != 8 {
				return nil, 0, ErrCorruptedData
			}

			v := binary.BigEndian.Uint64(b[voff:])
			voff += vlen

			return &Timestamp{val: TimeFromInt64(int64(v))}, voff, nil
		}
	case Float64Type:
		{
			if vlen != 8 {
				return nil, 0, ErrCorruptedData
			}
			// payload is the raw IEEE-754 bit pattern, big-endian
			v := binary.BigEndian.Uint64(b[voff:])
			voff += vlen
			return &Float64{val: math.Float64frombits(v)}, voff, nil
		}
	}

	return nil, 0, ErrCorruptedData
}
1985

1986
// addSchemaToTx adds the schema of the catalog to the given transaction.
// NOTE(review): it delegates to loadCatalog with its third argument set to
// true — presumably a schema-only/read-into-tx mode; confirm against the
// loadCatalog definition.
func (catlg *Catalog) addSchemaToTx(ctx context.Context, tx *store.OngoingTx) error {
	return catlg.loadCatalog(ctx, tx, true)
}
1990

1991
func iteratePrefix(ctx context.Context, tx *store.OngoingTx, prefix []byte, onSpec func(key, value []byte, deleted bool) error) error {
14,568✔
1992
        dbReaderSpec := store.KeyReaderSpec{
14,568✔
1993
                Prefix: prefix,
14,568✔
1994
        }
14,568✔
1995

14,568✔
1996
        colSpecReader, err := tx.NewKeyReader(dbReaderSpec)
14,568✔
1997
        if err != nil {
14,568✔
1998
                return err
×
1999
        }
×
2000
        defer colSpecReader.Close()
14,568✔
2001

14,568✔
2002
        for {
33,659✔
2003
                mkey, vref, err := colSpecReader.Read(ctx)
19,091✔
2004
                if errors.Is(err, store.ErrNoMoreEntries) {
33,649✔
2005
                        break
14,558✔
2006
                }
2007
                if err != nil {
4,533✔
2008
                        return err
×
2009
                }
×
2010

2011
                md := vref.KVMetadata()
4,533✔
2012
                if md != nil && md.IsExpirable() {
4,534✔
2013
                        return ErrBrokenCatalogColSpecExpirable
1✔
2014
                }
1✔
2015

2016
                deleted := md != nil && md.Deleted()
4,532✔
2017
                var v []byte
4,532✔
2018
                if !deleted {
8,968✔
2019
                        v, err = vref.Resolve()
4,436✔
2020
                        if err != nil {
4,436✔
2021
                                return err
×
2022
                        }
×
2023
                }
2024

2025
                err = onSpec(mkey, v, deleted)
4,532✔
2026
                if err != nil {
4,541✔
2027
                        return err
9✔
2028
                }
9✔
2029
        }
2030
        return nil
14,558✔
2031
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc