• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841644892

23 Apr 2026 02:44PM UTC coverage: 85.279% (-4.0%) from 89.306%
24841644892

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

115 existing lines in 18 files now uncovered.

44599 of 52298 relevant lines covered (85.28%)

127676.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.61
/embedded/sql/catalog.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "bytes"
21
        "context"
22
        "encoding/binary"
23
        "encoding/json"
24
        "errors"
25
        "fmt"
26
        "math"
27
        "strings"
28
        "time"
29

30
        "github.com/codenotary/immudb/embedded/store"
31
        "github.com/google/uuid"
32
)
33

34
// Catalog represents a database catalog containing metadata for all tables in the database.
type Catalog struct {
	enginePrefix []byte // key prefix under which this engine stores its catalog/data entries

	tables       []*Table          // tables in creation order
	tablesByID   map[uint32]*Table // lookup by table id
	tablesByName map[string]*Table // lookup by table name

	maxTableID uint32 // The maxTableID variable is used to assign unique ids to new tables as they are created.
}
44

45
// Constraint is a marker interface implemented by the table-constraint kinds below.
type Constraint interface{}

// PrimaryKeyConstraint lists the column names composing a table's primary key.
type PrimaryKeyConstraint []string

// ForeignKeyConstraint is parsed but not enforced — stored for ORM migration compatibility
type ForeignKeyConstraint struct {
	cols     []string // referencing columns in this table
	refTable string   // referenced table name
	refCols  []string // referenced columns in refTable
}

// CheckConstraint is a named CHECK constraint with its parsed boolean expression.
type CheckConstraint struct {
	id   uint32   // unique constraint id within the table
	name string   // constraint name (used by ALTER TABLE ... DROP CONSTRAINT)
	exp  ValueExp // expression that must evaluate true for every row
}
61

62
// Table holds the in-memory schema of a single table: columns, indexes,
// constraints, and the counters used to assign new column/index ids.
type Table struct {
	catalog          *Catalog            // back-reference to the owning catalog
	id               uint32              // unique table id within the catalog
	name             string              // table name
	cols             []*Column           // columns in declaration order (dropped columns excluded)
	colsByID         map[uint32]*Column  // lookup by column id
	colsByName       map[string]*Column  // lookup by column name
	indexes          []*Index            // all indexes, primary key included
	indexesByName    map[string]*Index   // lookup by canonical index name (see indexName)
	indexesByColID   map[uint32][]*Index // reverse lookup: column id → indexes covering that column
	checkConstraints map[string]CheckConstraint
	primaryIndex     *Index // the index whose id == PKIndexID
	autoIncrementPK  bool   // true when the PK is a single auto-increment column
	maxPK            int64  // presumably the last auto-increment PK value assigned — maintained elsewhere; confirm in engine code

	maxColID   uint32 // highest column id ever assigned (ids are never reused after drops)
	maxIndexID uint32 // next index id to assign

	// systemScan is non-nil only for tables installed via
	// RegisterSystemTable. When set, SELECTs against this table bypass
	// storage and iterate the rows returned by Scan. See system_tables.go.
	systemScan func(ctx context.Context, tx *SQLTx) ([]*Row, error)
}
85

86
// Index describes a (possibly unique, possibly partial) index over a subset
// of a table's columns. The index with id PKIndexID is the primary key.
type Index struct {
	table     *Table             // owning table
	id        uint32             // index id, unique within the table
	unique    bool               // whether the index enforces uniqueness
	cols      []*Column          // indexed columns, in index order
	colsByID  map[uint32]*Column // membership lookup by column id
	predicate ValueExp           // WHERE clause for partial indexes (nil = full index)
}
94

95
// Column holds the declared schema attributes of a single table column.
type Column struct {
	table         *Table       // owning table
	id            uint32       // column id, unique within the table (1-based)
	colName       string       // column name
	colType       SQLValueType // declared SQL type
	maxLen        int          // declared max length; 0 means type default/unlimited (see MaxLen)
	autoIncrement bool         // INTEGER-only auto-increment flag
	notNull       bool         // NOT NULL constraint
	defaultValue  ValueExp     // DEFAULT expression, nil when absent
}
105

106
func newCatalog(enginePrefix []byte) *Catalog {
5,149✔
107
        ctlg := &Catalog{
5,149✔
108
                enginePrefix: enginePrefix,
5,149✔
109
                tablesByID:   make(map[uint32]*Table),
5,149✔
110
                tablesByName: make(map[string]*Table),
5,149✔
111
        }
5,149✔
112

5,149✔
113
        // Install every registered system table (pg_type today; more in
5,149✔
114
        // future pg-compat phases). Each is a virtual catalog entry with
5,149✔
115
        // Go-provided row source — no storage backing.
5,149✔
116
        for _, def := range registeredSystemTables() {
28,735✔
117
                installSystemTable(ctlg, def)
23,586✔
118
        }
23,586✔
119

120
        return ctlg
5,149✔
121
}
122

123
func init() {
	// pg_type: the one system table immudb has shipped with for years.
	// Schema stays at the historic three-column shape (oid, typbasetype,
	// typname) — every existing test that hits pg_type assumes it —
	// but the Scan now returns the canonical PG type catalog so psql
	// \dT and Rails' type map work without the PG wire layer having to
	// fabricate rows separately. The list is intentionally small: just
	// the types immudb actually emits. Callers that want a composite
	// row per user table walk pg_class instead.
	RegisterSystemTable(&SystemTableDef{
		Name: "pg_type",
		Columns: []SystemTableColumn{
			{Name: "oid", Type: VarcharType, MaxLen: 10},
			{Name: "typbasetype", Type: VarcharType, MaxLen: 10},
			{Name: "typname", Type: VarcharType, MaxLen: 50},
		},
		PKColumn: "oid",
		// Scan materializes one row per pgTypeBaseRows entry.
		// typbasetype is always "0": none of the advertised types are
		// domains derived from another type.
		Scan: func(ctx context.Context, tx *SQLTx) ([]*Row, error) {
			rows := make([]*Row, 0, len(pgTypeBaseRows))
			for _, r := range pgTypeBaseRows {
				rows = append(rows, &Row{ValuesByPosition: []TypedValue{
					&Varchar{val: r.oid},
					&Varchar{val: "0"},
					&Varchar{val: r.name},
				}})
			}
			return rows, nil
		},
	})
}
153

154
// pgTypeBaseRows lists the PostgreSQL base types we advertise in
// pg_type. OIDs match real PG so clients that cache them (Rails type
// map) round-trip correctly. Kept here rather than in pkg/pgsql/sys to
// avoid a dependency inversion — embedded/sql can't import pkg/pgsql.
var pgTypeBaseRows = []struct {
	oid  string // PostgreSQL type OID, as a decimal string
	name string // PostgreSQL type name (typname)
}{
	{"16", "bool"},
	{"17", "bytea"},
	{"18", "char"},
	{"19", "name"},
	{"20", "int8"},
	{"21", "int2"},
	{"23", "int4"},
	{"25", "text"},
	{"26", "oid"},
	{"114", "json"},
	{"142", "xml"},
	{"700", "float4"},
	{"701", "float8"},
	{"790", "money"},
	{"829", "macaddr"},
	{"869", "inet"},
	{"1042", "bpchar"},
	{"1043", "varchar"},
	{"1082", "date"},
	{"1083", "time"},
	{"1114", "timestamp"},
	{"1184", "timestamptz"},
	{"1186", "interval"},
	{"1700", "numeric"},
	{"2950", "uuid"},
	{"3802", "jsonb"},
}
189

190
func (catlg *Catalog) ExistTable(table string) bool {
1,428✔
191
        _, exists := catlg.tablesByName[table]
1,428✔
192
        return exists
1,428✔
193
}
1,428✔
194

195
func (catlg *Catalog) GetTables() []*Table {
13,267✔
196
        ts := make([]*Table, 0, len(catlg.tables))
13,267✔
197

13,267✔
198
        ts = append(ts, catlg.tables...)
13,267✔
199

13,267✔
200
        return ts
13,267✔
201
}
13,267✔
202

203
func (catlg *Catalog) GetTableByName(name string) (*Table, error) {
9,350✔
204
        table, exists := catlg.tablesByName[name]
9,350✔
205
        if !exists {
9,762✔
206
                return nil, fmt.Errorf("%w (%s)", ErrTableDoesNotExist, name)
412✔
207
        }
412✔
208
        return table, nil
8,938✔
209
}
210

211
func (catlg *Catalog) GetTableByID(id uint32) (*Table, error) {
1,296✔
212
        table, exists := catlg.tablesByID[id]
1,296✔
213
        if !exists {
2,590✔
214
                return nil, ErrTableDoesNotExist
1,294✔
215
        }
1,294✔
216
        return table, nil
2✔
217
}
218

219
// ID returns the table's unique id within the catalog.
func (t *Table) ID() uint32 {
	return t.id
}

// Cols returns a copy of the column list (declaration order, dropped
// columns excluded) so callers cannot mutate table state.
func (t *Table) Cols() []*Column {
	cs := make([]*Column, 0, len(t.cols))

	cs = append(cs, t.cols...)

	return cs
}

// ColsByName returns a freshly built name → column map; callers may
// safely modify the returned map.
func (t *Table) ColsByName() map[string]*Column {
	cs := make(map[string]*Column, len(t.cols))

	for _, c := range t.cols {
		cs[c.colName] = c
	}

	return cs
}

// Name returns the table name.
func (t *Table) Name() string {
	return t.name
}

// PrimaryIndex returns the table's primary-key index (nil until one is created).
func (t *Table) PrimaryIndex() *Index {
	return t.primaryIndex
}
500✔
248

249
func (t *Table) IsIndexed(colName string) (indexed bool, err error) {
36✔
250
        col, err := t.GetColumnByName(colName)
36✔
251
        if err != nil {
37✔
252
                return false, err
1✔
253
        }
1✔
254
        return len(t.indexesByColID[col.id]) > 0, nil
35✔
255
}
256

257
func (t *Table) GetColumnByName(name string) (*Column, error) {
20,564✔
258
        col, exists := t.colsByName[name]
20,564✔
259
        if !exists {
20,620✔
260
                return nil, fmt.Errorf("%w (%s)", ErrColumnDoesNotExist, name)
56✔
261
        }
56✔
262
        return col, nil
20,508✔
263
}
264

265
func (t *Table) GetColumnByID(id uint32) (*Column, error) {
295,232✔
266
        col, exists := t.colsByID[id]
295,232✔
267
        if !exists {
295,339✔
268
                return nil, ErrColumnDoesNotExist
107✔
269
        }
107✔
270
        return col, nil
295,125✔
271
}
272

273
func (t *Table) ColumnsByID() map[uint32]*Column {
10✔
274
        return t.colsByID
10✔
275
}
10✔
276

277
func (t *Table) GetIndexes() []*Index {
122✔
278
        idxs := make([]*Index, 0, len(t.indexes))
122✔
279

122✔
280
        idxs = append(idxs, t.indexes...)
122✔
281

122✔
282
        return idxs
122✔
283
}
122✔
284

285
func (t *Table) GetIndexesByColID(colID uint32) []*Index {
14✔
286
        idxs := make([]*Index, 0, len(t.indexes))
14✔
287

14✔
288
        idxs = append(idxs, t.indexesByColID[colID]...)
14✔
289

14✔
290
        return idxs
14✔
291
}
14✔
292

293
func (t *Table) GetMaxColID() uint32 {
50✔
294
        return t.maxColID
50✔
295
}
50✔
296

297
// IsPrimary reports whether this is the table's primary-key index.
func (i *Index) IsPrimary() bool {
	return i.id == PKIndexID
}

// IsUnique reports whether the index enforces uniqueness.
func (i *Index) IsUnique() bool {
	return i.unique
}

// Cols returns the indexed columns in index order.
// NOTE(review): unlike Table.Cols this returns the internal slice, not a
// copy — callers must treat it as read-only.
func (i *Index) Cols() []*Column {
	return i.cols
}

// IncludesCol reports whether the column with the given id is part of the index.
func (i *Index) IncludesCol(colID uint32) bool {
	_, ok := i.colsByID[colID]
	return ok
}

// enginePrefix returns the owning engine's key prefix via the catalog back-reference.
func (i *Index) enginePrefix() []byte {
	return i.table.catalog.enginePrefix
}
21,735✔
317

318
func (i *Index) coversOrdCols(ordExps []*OrdExp, rangesByColID map[uint32]*typedValueRange) bool {
495✔
319
        if !ordExpsHaveSameDirection(ordExps) {
529✔
320
                return false
34✔
321
        }
34✔
322
        return i.hasPrefix(i.cols, ordExps) || i.sortableUsing(ordExps, rangesByColID)
461✔
323
}
324

325
// countEqualityCoveredCols returns the number of consecutive leading columns
326
// of the index that have a point-equality (unitary) range in rangesByColID.
327
// Used by selectINLJIndex to prefer more selective indexes: an index on (a, b)
328
// scores 2 when both a=x AND b=y are present, beating a single-column (a) index.
329
func (i *Index) countEqualityCoveredCols(rangesByColID map[uint32]*typedValueRange) int {
370✔
330
        count := 0
370✔
331
        for _, col := range i.cols {
742✔
332
                r, ok := rangesByColID[col.id]
372✔
333
                if !ok || !r.unitary() {
717✔
334
                        break
345✔
335
                }
336
                count++
27✔
337
        }
338
        return count
370✔
339
}
340

341
// coversEqualityRanges returns true when at least the index's leading column
// has a point-equality range in rangesByColID. It is a convenience wrapper
// around countEqualityCoveredCols used in tests and unit assertions.
func (i *Index) coversEqualityRanges(rangesByColID map[uint32]*typedValueRange) bool {
	return i.countEqualityCoveredCols(rangesByColID) > 0
}
8✔
347

348
func ordExpsHaveSameDirection(exps []*OrdExp) bool {
495✔
349
        if len(exps) == 0 {
495✔
350
                return true
×
351
        }
×
352

353
        desc := exps[0].descOrder
495✔
354
        for _, e := range exps[1:] {
634✔
355
                if e.descOrder != desc {
173✔
356
                        return false
34✔
357
                }
34✔
358
        }
359
        return true
461✔
360
}
361

362
func (i *Index) hasPrefix(columns []*Column, ordExps []*OrdExp) bool {
487✔
363
        if len(ordExps) > len(columns) {
533✔
364
                return false
46✔
365
        }
46✔
366

367
        for j, ordCol := range ordExps {
930✔
368
                sel := ordCol.AsSelector()
489✔
369
                if sel == nil {
511✔
370
                        return false
22✔
371
                }
22✔
372

373
                aggFn, _, colName := sel.resolve(i.table.Name())
467✔
374
                if len(aggFn) > 0 {
472✔
375
                        return false
5✔
376
                }
5✔
377

378
                col, err := i.table.GetColumnByName(colName)
462✔
379
                if err != nil || col.id != columns[j].id {
650✔
380
                        return false
188✔
381
                }
188✔
382
        }
383
        return true
226✔
384
}
385

386
// sortableUsing reports whether this index can deliver rows already ordered
// by the given ORDER BY expressions even when they do not start at the
// index's first column: every index column *before* the first ordering
// column must be pinned to a single value (a unitary range) by the query's
// WHERE clause.
func (i *Index) sortableUsing(columns []*OrdExp, rangesByColID map[uint32]*typedValueRange) bool {
	// all columns before colID must be fixedValues otherwise the index can not be used
	sel := columns[0].AsSelector()
	if sel == nil {
		// first ordering expression is not a plain column reference
		return false
	}

	aggFn, _, colName := sel.resolve(i.table.Name())
	if len(aggFn) > 0 {
		// aggregate functions can never be served by an index
		return false
	}

	firstCol, err := i.table.GetColumnByName(colName)
	if err != nil {
		return false
	}

	for j, col := range i.cols {
		if col.id == firstCol.id {
			// found where the ordering starts inside the index; the
			// remaining index columns must match the ORDER BY prefix
			return i.hasPrefix(i.cols[j:], columns)
		}

		// a preceding index column is acceptable only when fixed to one value
		colRange, ok := rangesByColID[col.id]
		if ok && colRange.unitary() {
			continue
		}
		return false
	}
	return false
}
416

417
// Name returns the canonical index name derived from the table and column
// names (see indexName). It doubles as the key in Table.indexesByName.
func (i *Index) Name() string {
	return indexName(i.table.name, i.cols)
}

// ID returns the index id (PKIndexID identifies the primary key).
func (i *Index) ID() uint32 {
	return i.id
}
22✔
424

425
func (t *Table) GetIndexByName(name string) (*Index, error) {
37✔
426
        idx, exists := t.indexesByName[name]
37✔
427
        if !exists {
38✔
428
                return nil, fmt.Errorf("%w (%s)", ErrIndexNotFound, name)
1✔
429
        }
1✔
430
        return idx, nil
36✔
431
}
432

433
func indexName(tableName string, cols []*Column) string {
34,121✔
434
        var buf strings.Builder
34,121✔
435

34,121✔
436
        buf.WriteString(tableName)
34,121✔
437

34,121✔
438
        buf.WriteString("(")
34,121✔
439

34,121✔
440
        for c, col := range cols {
68,549✔
441
                buf.WriteString(col.colName)
34,428✔
442

34,428✔
443
                if c < len(cols)-1 {
34,735✔
444
                        buf.WriteString(",")
307✔
445
                }
307✔
446
        }
447

448
        buf.WriteString(")")
34,121✔
449

34,121✔
450
        return buf.String()
34,121✔
451
}
452

453
func (catlg *Catalog) newTable(name string, colsSpec map[uint32]*ColSpec, checkConstraints map[string]CheckConstraint, maxColID uint32) (table *Table, err error) {
1,302✔
454
        if len(name) == 0 || len(colsSpec) == 0 {
1,305✔
455
                return nil, ErrIllegalArguments
3✔
456
        }
3✔
457

458
        for id := range colsSpec {
4,826✔
459
                if id <= 0 || id > maxColID {
3,527✔
460
                        return nil, ErrIllegalArguments
×
461
                }
×
462
        }
463

464
        exists := catlg.ExistTable(name)
1,299✔
465
        if exists {
1,306✔
466
                return nil, fmt.Errorf("%w (%s)", ErrTableAlreadyExists, name)
7✔
467
        }
7✔
468

469
        // Generate a new ID for the table by incrementing the 'maxTableID' variable of the 'catalog' instance.
470
        id := (catlg.maxTableID + 1)
1,292✔
471

1,292✔
472
        // This code is attempting to check if a table with the given id already exists in the Catalog.
1,292✔
473
        // If the function returns nil for err, it means that the table already exists and the function
1,292✔
474
        // should return an error indicating that the table cannot be created again.
1,292✔
475
        _, err = catlg.GetTableByID(id)
1,292✔
476
        if err == nil {
1,292✔
477
                return nil, fmt.Errorf("%w (%d)", ErrTableAlreadyExists, id)
×
478
        }
×
479

480
        table = &Table{
1,292✔
481
                id:               id,
1,292✔
482
                catalog:          catlg,
1,292✔
483
                name:             name,
1,292✔
484
                cols:             make([]*Column, 0, len(colsSpec)),
1,292✔
485
                colsByID:         make(map[uint32]*Column),
1,292✔
486
                colsByName:       make(map[string]*Column),
1,292✔
487
                indexesByName:    make(map[string]*Index),
1,292✔
488
                indexesByColID:   make(map[uint32][]*Index),
1,292✔
489
                checkConstraints: checkConstraints,
1,292✔
490
                maxColID:         maxColID,
1,292✔
491
        }
1,292✔
492

1,292✔
493
        for id := uint32(1); id <= maxColID; id++ {
4,849✔
494
                cs, found := colsSpec[id]
3,557✔
495
                if !found {
3,598✔
496
                        // dropped column
41✔
497
                        continue
41✔
498
                }
499

500
                if isReservedCol(cs.colName) {
3,517✔
501
                        return nil, fmt.Errorf("%w(%s)", ErrReservedWord, cs.colName)
1✔
502
                }
1✔
503

504
                _, colExists := table.colsByName[cs.colName]
3,515✔
505
                if colExists {
3,516✔
506
                        return nil, ErrDuplicatedColumn
1✔
507
                }
1✔
508

509
                if cs.autoIncrement && cs.colType != IntegerType {
3,516✔
510
                        return nil, ErrLimitedAutoIncrement
2✔
511
                }
2✔
512

513
                if !validMaxLenForType(cs.maxLen, cs.colType) {
3,512✔
514
                        return nil, ErrLimitedMaxLen
×
515
                }
×
516

517
                col := &Column{
3,512✔
518
                        id:            uint32(id),
3,512✔
519
                        table:         table,
3,512✔
520
                        colName:       cs.colName,
3,512✔
521
                        colType:       cs.colType,
3,512✔
522
                        maxLen:        cs.maxLen,
3,512✔
523
                        autoIncrement: cs.autoIncrement,
3,512✔
524
                        notNull:       cs.notNull,
3,512✔
525
                        defaultValue:  cs.defaultValue,
3,512✔
526
                }
3,512✔
527

3,512✔
528
                table.cols = append(table.cols, col)
3,512✔
529
                table.colsByID[col.id] = col
3,512✔
530
                table.colsByName[col.colName] = col
3,512✔
531
        }
532

533
        catlg.tables = append(catlg.tables, table)
1,288✔
534
        catlg.tablesByID[table.id] = table
1,288✔
535
        catlg.tablesByName[table.name] = table
1,288✔
536

1,288✔
537
        // increment table count on successfull table creation.
1,288✔
538
        // This ensures that each new table is assigned a unique ID
1,288✔
539
        // that has not been used before.
1,288✔
540
        catlg.maxTableID++
1,288✔
541

1,288✔
542
        return table, nil
1,288✔
543
}
544

545
func (catlg *Catalog) deleteTable(table *Table) error {
8✔
546
        _, exists := catlg.tablesByID[table.id]
8✔
547
        if !exists {
8✔
548
                return ErrTableDoesNotExist
×
549
        }
×
550

551
        newTables := make([]*Table, 0, len(catlg.tables)-1)
8✔
552

8✔
553
        for _, t := range catlg.tables {
29✔
554
                if t.id != table.id {
34✔
555
                        newTables = append(newTables, t)
13✔
556
                }
13✔
557
        }
558

559
        catlg.tables = newTables
8✔
560
        delete(catlg.tablesByID, table.id)
8✔
561
        delete(catlg.tablesByName, table.name)
8✔
562

8✔
563
        return nil
8✔
564
}
565

566
func (t *Table) newIndex(unique bool, colIDs []uint32) (index *Index, err error) {
1,739✔
567
        if len(colIDs) < 1 {
1,740✔
568
                return nil, ErrIllegalArguments
1✔
569
        }
1✔
570

571
        // validate column ids
572
        cols := make([]*Column, len(colIDs))
1,738✔
573
        colsByID := make(map[uint32]*Column, len(colIDs))
1,738✔
574

1,738✔
575
        for i, colID := range colIDs {
3,570✔
576
                col, err := t.GetColumnByID(colID)
1,832✔
577
                if err != nil {
1,833✔
578
                        return nil, err
1✔
579
                }
1✔
580

581
                _, ok := colsByID[colID]
1,831✔
582
                if ok {
1,832✔
583
                        return nil, ErrDuplicatedColumn
1✔
584
                }
1✔
585

586
                cols[i] = col
1,830✔
587
                colsByID[colID] = col
1,830✔
588
        }
589

590
        index = &Index{
1,736✔
591
                id:       uint32(t.maxIndexID),
1,736✔
592
                table:    t,
1,736✔
593
                unique:   unique,
1,736✔
594
                cols:     cols,
1,736✔
595
                colsByID: colsByID,
1,736✔
596
        }
1,736✔
597

1,736✔
598
        _, exists := t.indexesByName[index.Name()]
1,736✔
599
        if exists {
1,742✔
600
                return nil, ErrIndexAlreadyExists
6✔
601
        }
6✔
602

603
        t.indexes = append(t.indexes, index)
1,730✔
604
        t.indexesByName[index.Name()] = index
1,730✔
605

1,730✔
606
        // having a direct way to get the indexes by colID
1,730✔
607
        for _, col := range index.cols {
3,552✔
608
                t.indexesByColID[col.id] = append(t.indexesByColID[col.id], index)
1,822✔
609
        }
1,822✔
610

611
        if index.id == PKIndexID {
2,999✔
612
                t.primaryIndex = index
1,269✔
613
                t.autoIncrementPK = len(index.cols) == 1 && index.cols[0].autoIncrement
1,269✔
614
        }
1,269✔
615

616
        // increment table count on successfull table creation.
617
        // This ensures that each new table is assigned a unique ID
618
        // that has not been used before.
619
        t.maxIndexID++
1,730✔
620

1,730✔
621
        return index, nil
1,730✔
622
}
623

624
func (t *Table) newColumn(spec *ColSpec) (*Column, error) {
19✔
625
        if isReservedCol(spec.colName) {
20✔
626
                return nil, fmt.Errorf("%w(%s)", ErrReservedWord, spec.colName)
1✔
627
        }
1✔
628

629
        if spec.autoIncrement {
19✔
630
                return nil, fmt.Errorf("%w (%s)", ErrLimitedAutoIncrement, spec.colName)
1✔
631
        }
1✔
632

633
        if spec.notNull {
18✔
634
                return nil, fmt.Errorf("%w (%s)", ErrNewColumnMustBeNullable, spec.colName)
1✔
635
        }
1✔
636

637
        if !validMaxLenForType(spec.maxLen, spec.colType) {
17✔
638
                return nil, fmt.Errorf("%w (%s)", ErrLimitedMaxLen, spec.colName)
1✔
639
        }
1✔
640

641
        _, exists := t.colsByName[spec.colName]
15✔
642
        if exists {
18✔
643
                return nil, fmt.Errorf("%w (%s)", ErrColumnAlreadyExists, spec.colName)
3✔
644
        }
3✔
645

646
        t.maxColID++
12✔
647

12✔
648
        col := &Column{
12✔
649
                id:            t.maxColID,
12✔
650
                table:         t,
12✔
651
                colName:       spec.colName,
12✔
652
                colType:       spec.colType,
12✔
653
                maxLen:        spec.maxLen,
12✔
654
                autoIncrement: spec.autoIncrement,
12✔
655
                notNull:       spec.notNull,
12✔
656
                defaultValue:  spec.defaultValue,
12✔
657
        }
12✔
658

12✔
659
        t.cols = append(t.cols, col)
12✔
660
        t.colsByID[col.id] = col
12✔
661
        t.colsByName[col.colName] = col
12✔
662

12✔
663
        return col, nil
12✔
664
}
665

666
func (ctlg *Catalog) renameTable(oldName, newName string) (*Table, error) {
6✔
667
        if oldName == newName {
7✔
668
                return nil, fmt.Errorf("%w (%s)", ErrSameOldAndNewNames, oldName)
1✔
669
        }
1✔
670

671
        t, err := ctlg.GetTableByName(oldName)
5✔
672
        if err != nil {
7✔
673
                return nil, err
2✔
674
        }
2✔
675

676
        _, err = ctlg.GetTableByName(newName)
3✔
677
        if err == nil {
4✔
678
                return nil, fmt.Errorf("%w (%s)", ErrTableAlreadyExists, newName)
1✔
679
        }
1✔
680

681
        t.name = newName
2✔
682

2✔
683
        delete(ctlg.tablesByName, oldName)
2✔
684
        ctlg.tablesByName[newName] = t
2✔
685

2✔
686
        return t, nil
2✔
687
}
688

689
func (t *Table) renameColumn(oldName, newName string) (*Column, error) {
9✔
690
        if isReservedCol(newName) {
9✔
691
                return nil, fmt.Errorf("%w(%s)", ErrReservedWord, newName)
×
692
        }
×
693

694
        if oldName == newName {
10✔
695
                return nil, fmt.Errorf("%w (%s)", ErrSameOldAndNewNames, oldName)
1✔
696
        }
1✔
697

698
        col, exists := t.colsByName[oldName]
8✔
699
        if !exists {
9✔
700
                return nil, fmt.Errorf("%w (%s)", ErrColumnDoesNotExist, oldName)
1✔
701
        }
1✔
702

703
        _, exists = t.colsByName[newName]
7✔
704
        if exists {
8✔
705
                return nil, fmt.Errorf("%w (%s)", ErrColumnAlreadyExists, newName)
1✔
706
        }
1✔
707

708
        col.colName = newName
6✔
709

6✔
710
        delete(t.colsByName, oldName)
6✔
711
        t.colsByName[newName] = col
6✔
712

6✔
713
        return col, nil
6✔
714
}
715

716
func (t *Table) deleteColumn(col *Column) error {
12✔
717
        isIndexed, err := t.IsIndexed(col.colName)
12✔
718
        if err != nil {
12✔
719
                return err
×
720
        }
×
721

722
        if isIndexed {
16✔
723
                return fmt.Errorf("%w %s because one or more indexes require it", ErrCannotDropColumn, col.colName)
4✔
724
        }
4✔
725

726
        newCols := make([]*Column, 0, len(t.cols)-1)
8✔
727

8✔
728
        for _, c := range t.cols {
55✔
729
                if c.id != col.id {
86✔
730
                        newCols = append(newCols, c)
39✔
731
                }
39✔
732
        }
733

734
        t.cols = newCols
8✔
735
        delete(t.colsByName, col.colName)
8✔
736
        delete(t.colsByID, col.id)
8✔
737

8✔
738
        return nil
8✔
739
}
740

741
func (t *Table) deleteCheck(name string) (uint32, error) {
4✔
742
        c, exists := t.checkConstraints[name]
4✔
743
        if !exists {
5✔
744
                return 0, fmt.Errorf("%s.%s: %w", t.name, name, ErrConstraintNotFound)
1✔
745
        }
1✔
746

747
        delete(t.checkConstraints, name)
3✔
748
        return c.id, nil
3✔
749
}
750

751
func (t *Table) deleteIndex(index *Index) error {
5✔
752
        if index.IsPrimary() {
6✔
753
                return fmt.Errorf("%w: primary key index can NOT be deleted", ErrIllegalArguments)
1✔
754
        }
1✔
755

756
        newIndexes := make([]*Index, 0, len(t.indexes)-1)
4✔
757

4✔
758
        for _, i := range t.indexes {
13✔
759
                if i.id != index.id {
14✔
760
                        newIndexes = append(newIndexes, i)
5✔
761
                }
5✔
762
        }
763

764
        t.indexes = newIndexes
4✔
765
        delete(t.indexesByColID, index.id)
4✔
766
        delete(t.indexesByName, index.Name())
4✔
767

4✔
768
        return nil
4✔
769
}
770

771
// ID returns the column's unique id within its table.
func (c *Column) ID() uint32 {
	return c.id
}

// Name returns the column name.
func (c *Column) Name() string {
	return c.colName
}

// Type returns the column's declared SQL value type.
func (c *Column) Type() SQLValueType {
	return c.colType
}
26,954✔
782

783
func (c *Column) MaxLen() int {
77,022✔
784
        switch c.colType {
77,022✔
785
        case BooleanType:
818✔
786
                return 1
818✔
787
        case IntegerType:
57,487✔
788
                return 8
57,487✔
789
        case TimestampType:
1,509✔
790
                return 8
1,509✔
791
        case Float64Type:
3,540✔
792
                return 8
3,540✔
793
        case UUIDType:
39✔
794
                return 16
39✔
795
        }
796

797
        return c.maxLen
13,629✔
798
}
799

800
func (c *Column) IsNullable() bool {
107✔
801
        return !c.notNull
107✔
802
}
107✔
803

804
func (c *Column) IsAutoIncremental() bool {
42✔
805
        return c.autoIncrement
42✔
806
}
42✔
807

808
// HasDefault reports whether the column carries a DEFAULT expression.
func (c *Column) HasDefault() bool {
	return c.defaultValue != nil
}
1,415✔
811

NEW
812
// DefaultValue returns the column's DEFAULT expression, or nil when the
// column has no default.
func (c *Column) DefaultValue() ValueExp {
	return c.defaultValue
}
×
815

816
func validMaxLenForType(maxLen int, sqlType SQLValueType) bool {
3,528✔
817
        switch sqlType {
3,528✔
818
        case BooleanType:
252✔
819
                return maxLen <= 1
252✔
820
        case IntegerType:
1,770✔
821
                return maxLen == 0 || maxLen == 8
1,770✔
822
        case Float64Type:
107✔
823
                return maxLen == 0 || maxLen == 8
107✔
824
        case TimestampType:
50✔
825
                return maxLen == 0 || maxLen == 8
50✔
826
        case UUIDType:
29✔
827
                return maxLen == 0 || maxLen == 16
29✔
828
        }
829

830
        return maxLen >= 0
1,320✔
831
}
832

833
// load populates the catalog from the persisted schema visible through tx,
// without copying the scanned catalog entries back into the transaction.
func (catlg *Catalog) load(ctx context.Context, tx *store.OngoingTx) error {
	return catlg.loadCatalog(ctx, tx, false)
}
1,097✔
836

837
// Clone returns a deep copy of the catalog suitable for use as the in-memory
838
// schema of a fresh transaction. Per-tx DDL mutations operate on the clone
839
// and never touch the source, so a cache of the last known schema can be
840
// cloned for read-write transactions to avoid the cost of re-running
841
// loadCatalog (prefix scans + default-expression re-parse) on every NewTx.
842
//
843
// Registered system tables (pg_type today, more in future pg-compat
844
// phases) are re-installed from scratch by newCatalog — never copied —
845
// so their identity is stable across catalogs and the Scan function
846
// pointers stay bound to the package-level registry.
847
//
848
// Immutable substructures — ValueExp default expressions, Index predicates,
849
// and CheckConstraint expressions — are shared with the source by pointer
850
// because they are never mutated after parsing. All mutable state (tables,
851
// columns, indexes, and their lookup maps) is deep-copied. Back-references
852
// (Table.catalog, Column.table, Index.table) are re-targeted to the clones.
853
//
854
// table.maxPK is copied from the source but callers that need up-to-date
855
// auto-increment state must re-run loadMaxPK (which is snapshot-dependent).
856
func (catlg *Catalog) Clone() *Catalog {
4,022✔
857
        cp := newCatalog(catlg.enginePrefix)
4,022✔
858
        cp.maxTableID = catlg.maxTableID
4,022✔
859

4,022✔
860
        if len(catlg.tables) > 0 {
7,992✔
861
                cp.tables = make([]*Table, 0, len(catlg.tables))
3,970✔
862
        }
3,970✔
863

864
        for _, t := range catlg.tables {
8,805✔
865
                nt := cloneTable(t, cp)
4,783✔
866
                cp.tables = append(cp.tables, nt)
4,783✔
867
                cp.tablesByID[nt.id] = nt
4,783✔
868
                cp.tablesByName[nt.name] = nt
4,783✔
869
        }
4,783✔
870

871
        return cp
4,022✔
872
}
873

874
// cloneTable deep-copies t and reparents columns/indexes onto the returned
875
// clone. The new Catalog back-reference must already be available so the
876
// clone's Table.catalog field can be filled in.
877
func cloneTable(t *Table, newCatalog *Catalog) *Table {
	nt := &Table{
		id:               t.id,
		catalog:          newCatalog, // re-target the back-reference to the new catalog
		name:             t.name,
		autoIncrementPK:  t.autoIncrementPK,
		maxPK:            t.maxPK, // callers needing fresh auto-increment state must re-run loadMaxPK
		maxColID:         t.maxColID,
		maxIndexID:       t.maxIndexID,
		cols:             make([]*Column, 0, len(t.cols)),
		colsByID:         make(map[uint32]*Column, len(t.colsByID)),
		colsByName:       make(map[string]*Column, len(t.colsByName)),
		indexes:          make([]*Index, 0, len(t.indexes)),
		indexesByName:    make(map[string]*Index, len(t.indexesByName)),
		indexesByColID:   make(map[uint32][]*Index, len(t.indexesByColID)),
		checkConstraints: make(map[string]CheckConstraint, len(t.checkConstraints)),
	}

	for name, cc := range t.checkConstraints {
		// CheckConstraint is a value type; its .exp (ValueExp) is immutable
		// after parsing so the pointer can be shared.
		nt.checkConstraints[name] = cc
	}

	// Shallow-copy each Column value and reparent it; the same *Column
	// pointers are then reused when wiring up the cloned indexes below.
	for _, c := range t.cols {
		nc := *c
		nc.table = nt
		// nc.defaultValue is a ValueExp interface value; treated as immutable.
		nt.cols = append(nt.cols, &nc)
		nt.colsByID[nc.id] = &nc
		nt.colsByName[nc.colName] = &nc
	}

	for _, idx := range t.indexes {
		ni := &Index{
			id:        idx.id,
			table:     nt,
			unique:    idx.unique,
			predicate: idx.predicate, // immutable after parsing
			cols:      make([]*Column, len(idx.cols)),
			colsByID:  make(map[uint32]*Column, len(idx.colsByID)),
		}
		// point the index at the cloned columns, not the source's
		for i, c := range idx.cols {
			ni.cols[i] = nt.colsByID[c.id]
		}
		for id := range idx.colsByID {
			ni.colsByID[id] = nt.colsByID[id]
		}
		nt.indexes = append(nt.indexes, ni)
		nt.indexesByName[ni.Name()] = ni
		// pointer identity with the source's primary index marks the clone's
		if idx == t.primaryIndex {
			nt.primaryIndex = ni
		}
	}

	// Rebuild indexesByColID from the cloned indexes; it mirrors the source's
	// mapping from column-id → list of indexes that reference that column.
	for _, ni := range nt.indexes {
		for _, c := range ni.cols {
			nt.indexesByColID[c.id] = append(nt.indexesByColID[c.id], ni)
		}
	}

	return nt
}
942

943
// loadCatalog rebuilds the in-memory catalog by scanning the persisted table
// entries under catalogTablePrefix (database id 1). For each live entry it
// loads the column specs, check constraints and indexes, and registers the
// table via newTable. Deleted entries only advance maxTableID so table ids
// are never reused. When copyToTx is true each scanned entry is additionally
// written into tx so the transaction carries the schema it was built from.
func (catlg *Catalog) loadCatalog(ctx context.Context, tx *store.OngoingTx, copyToTx bool) error {
	prefix := MapKey(catlg.enginePrefix, catalogTablePrefix, EncodeID(1))

	return iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
		dbID, tableID, err := unmapTableID(catlg.enginePrefix, key)
		if err != nil {
			return err
		}

		// only the single hard-coded database id is valid
		if dbID != DatabaseID {
			return ErrCorruptedData
		}

		if deleted {
			// keep the id slot occupied so ids are not reused
			catlg.maxTableID++
			return nil
		}

		colSpecs, maxColID, err := loadColSpecs(ctx, tableID, tx, catlg.enginePrefix, copyToTx)
		if err != nil {
			return err
		}

		checks, err := loadCheckConstraints(ctx, dbID, tableID, tx, catlg.enginePrefix, copyToTx)
		if err != nil {
			return err
		}

		// the entry value holds the table name
		table, err := catlg.newTable(string(value), colSpecs, checks, maxColID)
		if err != nil {
			return err
		}

		// the id newTable assigned must match the persisted one
		if tableID != table.id {
			return ErrCorruptedData
		}

		if copyToTx {
			if err := tx.Set(key, nil, value); err != nil {
				return err
			}
		}
		return table.loadIndexes(ctx, catlg.enginePrefix, tx, copyToTx)
	})
}
988

989
// loadMaxPK returns the encoded primary-key value bytes of the row with the
// greatest primary key in table. It scans the mapped PK index in descending
// order so the first entry read is the maximum; any error from the reader
// (including the store's empty-index error) is returned unchanged.
func loadMaxPK(ctx context.Context, sqlPrefix []byte, tx *store.OngoingTx, table *Table) ([]byte, error) {
	pkReaderSpec := store.KeyReaderSpec{
		Prefix:    MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(table.primaryIndex.id)),
		DescOrder: true,
	}

	pkReader, err := tx.NewKeyReader(pkReaderSpec)
	if err != nil {
		return nil, err
	}
	defer pkReader.Close()

	mkey, _, err := pkReader.Read(ctx)
	if err != nil {
		return nil, err
	}

	// strip the index-key prefix, leaving only the encoded PK column values
	return unmapIndexEntry(table.primaryIndex, sqlPrefix, mkey)
}
1008

1009
// loadColSpecs scans the persisted column entries of table tableID and
// returns them keyed by column id, together with a count of all column
// entries seen — deleted ones included — which the caller uses as the
// table's max column id. (NOTE(review): this presumes column ids were
// assigned densely in key order — confirm against the schema writer.)
// When copyToTx is true each live entry is also re-written into tx.
func loadColSpecs(ctx context.Context, tableID uint32, tx *store.OngoingTx, sqlPrefix []byte, copyToTx bool) (map[uint32]*ColSpec, uint32, error) {
	prefix := MapKey(sqlPrefix, catalogColumnPrefix, EncodeID(1), EncodeID(tableID))

	var maxColID uint32
	specs := make(map[uint32]*ColSpec)

	err := iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
		if deleted {
			// a dropped column still consumes an id slot
			maxColID++
			return nil
		}

		colSpec, colID, err := loadColSpec(sqlPrefix, key, value, tableID)
		if err != nil {
			return err
		}

		maxColID++

		specs[colID] = colSpec

		if copyToTx {
			return tx.Set(key, nil, value)
		}
		return nil
	})
	return specs, maxColID, err
}
1037

1038
// loadColSpec decodes one persisted column definition. The column id and
// type come from the key (via unmapColSpec); the value carries flags, max
// length, name and — in the newer format — an optional DEFAULT expression:
//
//	old: {flags(1)}{maxLen(4)}{colName}
//	new: {flags(1)}{maxLen(4)}{colNameLen(2)}{colName}{defaultSQL}
//
// It returns the decoded ColSpec and the column id from the key.
func loadColSpec(sqlPrefix, key, value []byte, tableID uint32) (*ColSpec, uint32, error) {
	// minimum size: flags byte + 4-byte maxLen + at least one name byte
	if len(value) < 6 {
		return nil, 0, ErrCorruptedData
	}

	mdbID, mtableID, colID, colType, err := unmapColSpec(sqlPrefix, key)
	if err != nil {
		return nil, 0, err
	}

	// the key must belong to database 1 and the table being loaded
	if mdbID != 1 || tableID != mtableID {
		return nil, 0, ErrCorruptedData
	}

	flags := value[0]
	maxLen := int(binary.BigEndian.Uint32(value[1:]))

	var colName string
	var defaultValue ValueExp

	if flags&hasDefaultFlag != 0 && len(value) >= 7 {
		// New format: {flags(1)}{maxLen(4)}{colNameLen(2)}{colName}{defaultSQL}
		colNameLen := int(binary.BigEndian.Uint16(value[5:]))
		if len(value) < 7+colNameLen {
			return nil, 0, ErrCorruptedData
		}
		colName = string(value[7 : 7+colNameLen])

		// Parse default value SQL if present
		if defaultStart := 7 + colNameLen; defaultStart < len(value) {
			defaultSQL := string(value[defaultStart:])
			if defaultSQL != "" {
				// Parse the default expression.
				// NOTE(review): parse errors are silently ignored here, leaving
				// defaultValue nil — confirm this lenient fallback is intended.
				stmts, err := ParseSQL(strings.NewReader("SELECT " + defaultSQL))
				if err == nil && len(stmts) > 0 {
					if sel, ok := stmts[0].(*SelectStmt); ok && len(sel.targets) > 0 {
						defaultValue = sel.targets[0].Exp
					}
				}
			}
		}
	} else {
		// Old format: {flags(1)}{maxLen(4)}{colName}
		colName = string(value[5:])
	}

	return &ColSpec{
		colName:       colName,
		colType:       colType,
		maxLen:        maxLen,
		autoIncrement: flags&autoIncrementFlag != 0,
		// NOTE(review): a set nullableFlag maps to notNull=true — presumably
		// the flag actually records NOT NULL despite its name; confirm
		// against the serializer.
		notNull:      flags&nullableFlag != 0,
		defaultValue: defaultValue,
	}, colID, nil
}
1093

1094
func loadCheckConstraints(ctx context.Context, dbID, tableID uint32, tx *store.OngoingTx, sqlPrefix []byte, copyToTx bool) (map[string]CheckConstraint, error) {
840✔
1095
        prefix := MapKey(sqlPrefix, catalogCheckPrefix, EncodeID(dbID), EncodeID(tableID))
840✔
1096
        checks := make(map[string]CheckConstraint)
840✔
1097

840✔
1098
        err := iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
882✔
1099
                if deleted {
56✔
1100
                        return nil
14✔
1101
                }
14✔
1102

1103
                check, err := parseCheckConstraint(sqlPrefix, key, value)
28✔
1104
                if err != nil {
28✔
1105
                        return err
×
1106
                }
×
1107
                checks[check.name] = *check
28✔
1108

28✔
1109
                if copyToTx {
28✔
1110
                        return tx.Set(key, nil, value)
×
1111
                }
×
1112
                return nil
28✔
1113
        })
1114
        return checks, err
840✔
1115
}
1116

1117
// loadIndexes scans the persisted index definitions of table and registers
// each live one via newIndex. Deleted entries only advance maxIndexID so
// index ids are never reused.
//
// NOTE(review): the two modes are asymmetric — when copyToTx is true the raw
// entry is copied into tx but NOT decoded/registered on the in-memory table;
// when false it is decoded and registered but not copied. Confirm callers
// rely on exactly this behavior.
func (table *Table) loadIndexes(ctx context.Context, sqlPrefix []byte, tx *store.OngoingTx, copyToTx bool) error {
	prefix := MapKey(sqlPrefix, catalogIndexPrefix, EncodeID(1), EncodeID(table.id))

	return iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
		dbID, tableID, indexID, err := unmapIndex(sqlPrefix, key)
		if err != nil {
			return err
		}

		// the key must belong to database 1 and this table
		if table.id != tableID || dbID != 1 {
			return ErrCorruptedData
		}

		if deleted {
			table.maxIndexID++
			return nil
		}

		if copyToTx {
			if err := tx.Set(key, nil, value); err != nil {
				return err
			}
		} else {
			// v={unique {colID1}(ASC|DESC)...{colIDN}(ASC|DESC)}
			colSpecLen := EncIDLen + 1
			// one leading unique byte plus a whole number of column specs
			if len(value) < 1+colSpecLen || len(value)%colSpecLen != 1 {
				return ErrCorruptedData
			}

			var colIDs []uint32
			for i := 1; i < len(value); i += colSpecLen {
				colID := binary.BigEndian.Uint32(value[i:])

				// TODO: currently only ASC order is supported
				if value[i+EncIDLen] != 0 {
					return ErrCorruptedData
				}
				colIDs = append(colIDs, colID)
			}

			index, err := table.newIndex(value[0] > 0, colIDs)
			if err != nil {
				return err
			}

			// the id newIndex assigned must match the persisted one
			if indexID != index.id {
				return ErrCorruptedData
			}
		}
		return nil
	})
}
1169

1170
func trimPrefix(prefix, mkey []byte, mappingPrefix []byte) ([]byte, error) {
5,766✔
1171
        if len(prefix)+len(mappingPrefix) > len(mkey) ||
5,766✔
1172
                !bytes.Equal(prefix, mkey[:len(prefix)]) ||
5,766✔
1173
                !bytes.Equal(mappingPrefix, mkey[len(prefix):len(prefix)+len(mappingPrefix)]) {
5,774✔
1174
                return nil, ErrIllegalMappedKey
8✔
1175
        }
8✔
1176

1177
        return mkey[len(prefix)+len(mappingPrefix):], nil
5,758✔
1178
}
1179

1180
func unmapTableID(prefix, mkey []byte) (dbID, tableID uint32, err error) {
876✔
1181
        encID, err := trimPrefix(prefix, mkey, []byte(catalogTablePrefix))
876✔
1182
        if err != nil {
877✔
1183
                return 0, 0, err
1✔
1184
        }
1✔
1185

1186
        if len(encID) != EncIDLen*2 {
876✔
1187
                return 0, 0, ErrCorruptedData
1✔
1188
        }
1✔
1189

1190
        dbID = binary.BigEndian.Uint32(encID)
874✔
1191
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
874✔
1192

874✔
1193
        return
874✔
1194
}
1195

1196
func unmapCheckID(prefix, mkey []byte) (uint32, error) {
28✔
1197
        encID, err := trimPrefix(prefix, mkey, []byte(catalogCheckPrefix))
28✔
1198
        if err != nil {
28✔
1199
                return 0, err
×
1200
        }
×
1201

1202
        if len(encID) < 3*EncIDLen {
28✔
1203
                return 0, ErrCorruptedData
×
1204
        }
×
1205
        return binary.BigEndian.Uint32(encID[2*EncIDLen:]), nil
28✔
1206
}
1207

1208
func parseCheckConstraint(prefix, key, value []byte) (*CheckConstraint, error) {
28✔
1209
        id, err := unmapCheckID(prefix, key)
28✔
1210
        if err != nil {
28✔
1211
                return nil, err
×
1212
        }
×
1213

1214
        nameLen := value[0] + 1
28✔
1215
        name := string(value[1 : 1+nameLen])
28✔
1216

28✔
1217
        exp, err := ParseExpFromString(string(value[1+nameLen:]))
28✔
1218
        if err != nil {
28✔
1219
                return nil, err
×
1220
        }
×
1221

1222
        return &CheckConstraint{
28✔
1223
                id:   id,
28✔
1224
                name: name,
28✔
1225
                exp:  exp,
28✔
1226
        }, nil
28✔
1227
}
1228

1229
func unmapColSpec(prefix, mkey []byte) (dbID, tableID, colID uint32, colType SQLValueType, err error) {
2,325✔
1230
        encID, err := trimPrefix(prefix, mkey, []byte(catalogColumnPrefix))
2,325✔
1231
        if err != nil {
2,326✔
1232
                return 0, 0, 0, "", err
1✔
1233
        }
1✔
1234

1235
        if len(encID) < EncIDLen*3 {
2,325✔
1236
                return 0, 0, 0, "", ErrCorruptedData
1✔
1237
        }
1✔
1238

1239
        dbID = binary.BigEndian.Uint32(encID)
2,323✔
1240
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
2,323✔
1241
        colID = binary.BigEndian.Uint32(encID[2*EncIDLen:])
2,323✔
1242

2,323✔
1243
        colType, err = asType(string(encID[EncIDLen*3:]))
2,323✔
1244
        if err != nil {
2,325✔
1245
                return 0, 0, 0, "", ErrCorruptedData
2✔
1246
        }
2✔
1247

1248
        return
2,321✔
1249
}
1250

1251
func asType(t string) (SQLValueType, error) {
2,323✔
1252
        switch t {
2,323✔
1253
        case IntegerType,
1254
                Float64Type,
1255
                BooleanType,
1256
                VarcharType,
1257
                UUIDType,
1258
                BLOBType,
1259
                TimestampType,
1260
                JSONType:
2,321✔
1261
                return t, nil
2,321✔
1262
        }
1263
        return t, ErrCorruptedData
2✔
1264
}
1265

1266
func unmapIndex(sqlPrefix, mkey []byte) (dbID, tableID, indexID uint32, err error) {
1,183✔
1267
        encID, err := trimPrefix(sqlPrefix, mkey, []byte(catalogIndexPrefix))
1,183✔
1268
        if err != nil {
1,184✔
1269
                return 0, 0, 0, err
1✔
1270
        }
1✔
1271

1272
        if len(encID) != EncIDLen*3 {
1,183✔
1273
                return 0, 0, 0, ErrCorruptedData
1✔
1274
        }
1✔
1275

1276
        dbID = binary.BigEndian.Uint32(encID)
1,181✔
1277
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
1,181✔
1278
        indexID = binary.BigEndian.Uint32(encID[EncIDLen*2:])
1,181✔
1279

1,181✔
1280
        return
1,181✔
1281
}
1282

1283
// unmapIndexEntry strips the mapped-index prefix, the table/index ids and
// the encoded secondary-index column values from mkey, returning the
// trailing encoded primary-key values. ErrCorruptedData is returned when
// the key does not belong to index or is malformed/truncated.
func unmapIndexEntry(index *Index, sqlPrefix, mkey []byte) (encPKVals []byte, err error) {
	if index == nil {
		return nil, ErrIllegalArguments
	}

	enc, err := trimPrefix(sqlPrefix, mkey, []byte(MappedPrefix))
	if err != nil {
		return nil, ErrCorruptedData
	}

	// must hold at least the table id, index id and one value byte
	if len(enc) <= EncIDLen*2 {
		return nil, ErrCorruptedData
	}

	off := 0

	tableID := binary.BigEndian.Uint32(enc[off:])
	off += EncIDLen

	indexID := binary.BigEndian.Uint32(enc[off:])
	off += EncIDLen

	// the entry must belong to the index being unmapped
	if tableID != index.table.id || indexID != index.id {
		return nil, ErrCorruptedData
	}

	// read index values: each column is a null/not-null prefix byte
	// optionally followed by a fixed-width encoded value (variable-sized
	// types additionally carry an EncLenLen length suffix, see
	// EncodeRawValueAsKey)
	for _, col := range index.cols {
		if enc[off] == KeyValPrefixNull {
			// NULL occupies only the prefix byte
			off += 1
			continue
		}
		if enc[off] != KeyValPrefixNotNull {
			return nil, ErrCorruptedData
		}
		off += 1

		maxLen := col.MaxLen()
		if variableSizedType(col.colType) {
			maxLen += EncLenLen
		}
		if len(enc)-off < maxLen {
			return nil, ErrCorruptedData
		}

		off += maxLen
	}

	// PK cannot be nil
	if len(enc)-off < 1 {
		return nil, ErrCorruptedData
	}

	return enc[off:], nil
}
1338

1339
func variableSizedType(sqlType SQLValueType) bool {
1,962✔
1340
        return sqlType == VarcharType || sqlType == BLOBType
1,962✔
1341
}
1,962✔
1342

1343
// MapKey builds a mapped storage key by concatenating the engine prefix,
// the mapping prefix and every encoded value, in order. The result is a
// freshly allocated slice of exactly the combined length.
func MapKey(prefix []byte, mappingPrefix string, encValues ...[]byte) []byte {
	size := len(prefix) + len(mappingPrefix)
	for _, ev := range encValues {
		size += len(ev)
	}

	mkey := make([]byte, 0, size)
	mkey = append(mkey, prefix...)
	mkey = append(mkey, mappingPrefix...)
	for _, ev := range encValues {
		mkey = append(mkey, ev...)
	}

	return mkey
}
1367

1368
func EncodeID(id uint32) []byte {
171,386✔
1369
        var encID [EncIDLen]byte
171,386✔
1370
        binary.BigEndian.PutUint32(encID[:], id)
171,386✔
1371
        return encID[:]
171,386✔
1372
}
171,386✔
1373

1374
// Single-byte prefixes placed in front of column values inside index keys.
// KeyValPrefixNull (0x20) sorts before KeyValPrefixNotNull (0x80), so NULLs
// order before every non-null value; KeyValPrefixUpperBound (0xFF) sorts
// after both.
const (
	KeyValPrefixNull       byte = 0x20
	KeyValPrefixNotNull    byte = 0x80
	KeyValPrefixUpperBound byte = 0xFF
)
1379

1380
// EncodeValueAsKey encodes val's raw value into the order-preserving key
// representation; see EncodeRawValueAsKey for the format and return values.
func EncodeValueAsKey(val TypedValue, colType SQLValueType, maxLen int) ([]byte, int, error) {
	return EncodeRawValueAsKey(val.RawValue(), colType, maxLen)
}
55,165✔
1383

1384
// EncodeRawValueAsKey encodes a value in a b-tree meaningful way: the
// resulting byte slices compare lexicographically in the same order as the
// original values. The first byte is KeyValPrefixNull for NULL values and
// KeyValPrefixNotNull otherwise. It returns the encoded key, the number of
// meaningful value bytes within it, and an error when the value or maxLen
// is invalid for colType.
func EncodeRawValueAsKey(val interface{}, colType SQLValueType, maxLen int) ([]byte, int, error) {
	if maxLen <= 0 {
		return nil, 0, ErrInvalidValue
	}
	if maxLen > MaxKeyLen {
		return nil, 0, ErrMaxKeyLengthExceeded
	}

	convVal, err := mayApplyImplicitConversion(val, colType)
	if err != nil {
		return nil, 0, err
	}

	// NULL encodes as the single null-prefix byte
	if convVal == nil {
		return []byte{KeyValPrefixNull}, 0, nil
	}

	switch colType {
	case VarcharType:
		{
			strVal, ok := convVal.(string)
			if !ok {
				return nil, 0, fmt.Errorf("value is not a string: %w", ErrInvalidValue)
			}

			if len(strVal) > maxLen {
				return nil, 0, ErrMaxLengthExceeded
			}

			// notnull + value + padding + len(value)
			// zero-padding to maxLen keeps keys fixed-width; the trailing
			// big-endian length disambiguates values that differ only in
			// trailing zero bytes
			encv := make([]byte, 1+maxLen+EncLenLen)
			encv[0] = KeyValPrefixNotNull
			copy(encv[1:], []byte(strVal))
			binary.BigEndian.PutUint32(encv[len(encv)-EncLenLen:], uint32(len(strVal)))

			return encv, len(strVal), nil
		}
	case IntegerType:
		{
			if maxLen != 8 {
				return nil, 0, ErrCorruptedData
			}

			intVal, ok := convVal.(int64)
			if !ok {
				return nil, 0, fmt.Errorf("value is not an integer: %w", ErrInvalidValue)
			}

			// v
			var encv [9]byte
			encv[0] = KeyValPrefixNotNull
			binary.BigEndian.PutUint64(encv[1:], uint64(intVal))
			// map to unsigned integer space for lexical sorting order
			encv[1] ^= 0x80

			return encv[:], 8, nil
		}
	case BooleanType:
		{
			if maxLen != 1 {
				return nil, 0, ErrCorruptedData
			}

			boolVal, ok := convVal.(bool)
			if !ok {
				return nil, 0, fmt.Errorf("value is not a boolean: %w", ErrInvalidValue)
			}

			// v: false → 0x00, true → 0x01
			var encv [2]byte
			encv[0] = KeyValPrefixNotNull
			if boolVal {
				encv[1] = 1
			}

			return encv[:], 1, nil
		}
	case BLOBType:
		{
			blobVal, ok := convVal.([]byte)
			if !ok {
				return nil, 0, fmt.Errorf("value is not a blob: %w", ErrInvalidValue)
			}

			if len(blobVal) > maxLen {
				return nil, 0, ErrMaxLengthExceeded
			}

			// notnull + value + padding + len(value) — same layout as varchar
			encv := make([]byte, 1+maxLen+EncLenLen)
			encv[0] = KeyValPrefixNotNull
			copy(encv[1:], []byte(blobVal))
			binary.BigEndian.PutUint32(encv[len(encv)-EncLenLen:], uint32(len(blobVal)))

			return encv, len(blobVal), nil
		}
	case UUIDType:
		{
			uuidVal, ok := convVal.(uuid.UUID)
			if !ok {
				return nil, 0, fmt.Errorf("value is not an UUID: %w", ErrInvalidValue)
			}

			// notnull + value (UUIDs are fixed 16 bytes)
			encv := make([]byte, 17)
			encv[0] = KeyValPrefixNotNull
			copy(encv[1:], uuidVal[:])

			return encv, 16, nil
		}
	case TimestampType:
		{
			if maxLen != 8 {
				return nil, 0, ErrCorruptedData
			}

			timeVal, ok := convVal.(time.Time)
			if !ok {
				return nil, 0, fmt.Errorf("value is not a timestamp: %w", ErrInvalidValue)
			}

			// v: encoded like an integer using the nanosecond timestamp
			var encv [9]byte
			encv[0] = KeyValPrefixNotNull
			binary.BigEndian.PutUint64(encv[1:], uint64(timeVal.UnixNano()))
			// map to unsigned integer space for lexical sorting order
			encv[1] ^= 0x80

			return encv[:], 8, nil
		}
	case Float64Type:
		{
			floatVal, ok := convVal.(float64)
			if !ok {
				return nil, 0, fmt.Errorf("value is not a float: %w", ErrInvalidValue)
			}

			// Apart form the sign bit, bit representation of float64
			// can be sorted lexicographically
			floatBits := math.Float64bits(floatVal)

			var encv [9]byte
			encv[0] = KeyValPrefixNotNull
			binary.BigEndian.PutUint64(encv[1:], floatBits)

			if encv[1]&0x80 != 0 {
				// For negative numbers, the order must be reversed,
				// we also negate the sign bit so that all negative
				// numbers end up in the smaller half of values
				for i := 1; i < 9; i++ {
					encv[i] = ^encv[i]
				}
			} else {
				// For positive numbers, the order is already correct,
				// we only have to set the sign bit to 1 to ensure that
				// positive numbers end in the larger half of values
				encv[1] ^= 0x80
			}

			return encv[:], 8, nil
		}
	}

	// unsupported column type (e.g. JSON cannot be used as a key)
	return nil, 0, ErrInvalidValue
}
1550

1551
func getEncodeRawValue(val TypedValue, colType SQLValueType) (interface{}, error) {
105,903✔
1552
        if colType != JSONType || val.Type() == JSONType {
211,469✔
1553
                return val.RawValue(), nil
105,566✔
1554
        }
105,566✔
1555

1556
        if val.Type() != VarcharType {
338✔
1557
                return nil, fmt.Errorf("%w: invalid json value", ErrInvalidValue)
1✔
1558
        }
1✔
1559
        s, _ := val.RawValue().(string)
336✔
1560

336✔
1561
        raw := json.RawMessage(s)
336✔
1562
        if !json.Valid(raw) {
337✔
1563
                return nil, fmt.Errorf("%w: invalid json value", ErrInvalidValue)
1✔
1564
        }
1✔
1565
        return raw, nil
335✔
1566
}
1567

1568
func EncodeValue(val TypedValue, colType SQLValueType, maxLen int) ([]byte, error) {
20,168✔
1569
        v, err := getEncodeRawValue(val, colType)
20,168✔
1570
        if err != nil {
20,170✔
1571
                return nil, err
2✔
1572
        }
2✔
1573
        return EncodeRawValue(v, colType, maxLen, false)
20,166✔
1574
}
1575

1576
func EncodeNullableValue(val TypedValue, colType SQLValueType, maxLen int) ([]byte, error) {
85,735✔
1577
        v, err := getEncodeRawValue(val, colType)
85,735✔
1578
        if err != nil {
85,735✔
1579
                return nil, err
×
1580
        }
×
1581
        return EncodeRawValue(v, colType, maxLen, true)
85,735✔
1582
}
1583

1584
// EncodeRawValue encode a value in a byte format. This is the internal binary representation of a value. Can be decoded with DecodeValue.
//
// The layout for every type is a 4-byte (EncLenLen) big-endian length header
// followed by the payload. A NULL (nil) value is encoded as a bare zero-length
// header and is only accepted when nullable is true.
// maxLen, when positive, bounds the payload size of variable-length types
// (VARCHAR and BLOB); exceeding it returns ErrMaxLengthExceeded.
func EncodeRawValue(val interface{}, colType SQLValueType, maxLen int, nullable bool) ([]byte, error) {
	// Coerce val toward colType where an implicit conversion is defined
	// (e.g. numeric widening); convVal is what actually gets encoded.
	convVal, err := mayApplyImplicitConversion(val, colType)
	if err != nil {
		return nil, err
	}

	if convVal == nil && !nullable {
		return nil, ErrInvalidValue
	}

	if convVal == nil {
		// NULL is represented by a length header of 0 and no payload.
		encv := make([]byte, EncLenLen)
		binary.BigEndian.PutUint32(encv[:], uint32(0))
		return encv, nil
	}

	switch colType {
	case VarcharType:
		{
			strVal, ok := convVal.(string)
			if !ok {
				return nil, fmt.Errorf("value is not a string: %w", ErrInvalidValue)
			}

			// maxLen is enforced on the byte length of the string.
			if maxLen > 0 && len(strVal) > maxLen {
				return nil, ErrMaxLengthExceeded
			}

			// len(v) + v
			encv := make([]byte, EncLenLen+len(strVal))
			binary.BigEndian.PutUint32(encv[:], uint32(len(strVal)))
			copy(encv[EncLenLen:], []byte(strVal))

			return encv, nil
		}
	case IntegerType:
		{
			intVal, ok := convVal.(int64)
			if !ok {
				return nil, fmt.Errorf("value is not an integer: %w", ErrInvalidValue)
			}

			// map to unsigned integer space
			// len(v) + v
			var encv [EncLenLen + 8]byte
			binary.BigEndian.PutUint32(encv[:], uint32(8))
			binary.BigEndian.PutUint64(encv[EncLenLen:], uint64(intVal))

			return encv[:], nil
		}
	case BooleanType:
		{
			boolVal, ok := convVal.(bool)
			if !ok {
				return nil, fmt.Errorf("value is not a boolean: %w", ErrInvalidValue)
			}

			// len(v) + v — a single payload byte: 1 for true, 0 for false.
			var encv [EncLenLen + 1]byte
			binary.BigEndian.PutUint32(encv[:], uint32(1))
			if boolVal {
				encv[EncLenLen] = 1
			}

			return encv[:], nil
		}
	case BLOBType:
		{
			var blobVal []byte

			// NOTE(review): this guards on the original val, not convVal,
			// unlike the other cases — presumably to let a typed-nil blob
			// encode as an empty payload; confirm intent before changing.
			if val != nil {
				v, ok := convVal.([]byte)
				if !ok {
					return nil, fmt.Errorf("value is not a blob: %w", ErrInvalidValue)
				}
				blobVal = v
			}

			if maxLen > 0 && len(blobVal) > maxLen {
				return nil, ErrMaxLengthExceeded
			}

			// len(v) + v
			encv := make([]byte, EncLenLen+len(blobVal))
			binary.BigEndian.PutUint32(encv[:], uint32(len(blobVal)))
			copy(encv[EncLenLen:], blobVal)

			return encv[:], nil
		}
	case JSONType:
		// NOTE(review): the type assertion is made on the original val
		// rather than convVal; values that are not already json.RawMessage
		// are marshaled to JSON here.
		rawJson, ok := val.(json.RawMessage)
		if !ok {
			data, err := json.Marshal(val)
			if err != nil {
				return nil, err
			}
			rawJson = data
		}

		// len(v) + v
		encv := make([]byte, EncLenLen+len(rawJson))
		binary.BigEndian.PutUint32(encv[:], uint32(len(rawJson)))
		copy(encv[EncLenLen:], rawJson)

		return encv[:], nil
	case UUIDType:
		{
			uuidVal, ok := convVal.(uuid.UUID)
			if !ok {
				return nil, fmt.Errorf("value is not an UUID: %w", ErrInvalidValue)
			}

			// len(v) + v — the 16 raw UUID bytes.
			var encv [EncLenLen + 16]byte
			binary.BigEndian.PutUint32(encv[:], uint32(16))
			copy(encv[EncLenLen:], uuidVal[:])

			return encv[:], nil
		}
	case TimestampType:
		{
			timeVal, ok := convVal.(time.Time)
			if !ok {
				return nil, fmt.Errorf("value is not a timestamp: %w", ErrInvalidValue)
			}

			// len(v) + v — the timestamp reduced to an int64 via TimeToInt64.
			var encv [EncLenLen + 8]byte
			binary.BigEndian.PutUint32(encv[:], uint32(8))
			binary.BigEndian.PutUint64(encv[EncLenLen:], uint64(TimeToInt64(timeVal)))

			return encv[:], nil
		}
	case Float64Type:
		{
			floatVal, ok := convVal.(float64)
			if !ok {
				return nil, fmt.Errorf("value is not a float: %w", ErrInvalidValue)
			}

			// The raw IEEE-754 bit pattern is stored; DecodeValue restores
			// it with math.Float64frombits.
			var encv [EncLenLen + 8]byte
			floatBits := math.Float64bits(floatVal)
			binary.BigEndian.PutUint32(encv[:], uint32(8))
			binary.BigEndian.PutUint64(encv[EncLenLen:], floatBits)

			return encv[:], nil
		}
	}

	// Unknown column type.
	return nil, ErrInvalidValue
}
1736

1737
func DecodeValueLength(b []byte) (int, int, error) {
931,970✔
1738
        if len(b) < EncLenLen {
931,972✔
1739
                return 0, 0, ErrCorruptedData
2✔
1740
        }
2✔
1741

1742
        vlen := int(binary.BigEndian.Uint32(b[:]))
931,968✔
1743
        voff := EncLenLen
931,968✔
1744

931,968✔
1745
        if vlen < 0 || len(b) < voff+vlen {
931,971✔
1746
                return 0, 0, ErrCorruptedData
3✔
1747
        }
3✔
1748

1749
        return vlen, EncLenLen, nil
931,965✔
1750
}
1751

1752
// DecodeValue decodes a non-nullable value previously produced by
// EncodeRawValue, returning the typed value and the number of bytes consumed.
func DecodeValue(b []byte, colType SQLValueType) (TypedValue, int, error) {
	return decodeValue(b, colType, false)
}
285,516✔
1755

1756
// DecodeNullableValue decodes a value previously produced by EncodeRawValue,
// mapping a zero-length payload to a NullValue. It returns the typed value
// and the number of bytes consumed.
func DecodeNullableValue(b []byte, colType SQLValueType) (TypedValue, int, error) {
	return decodeValue(b, colType, true)
}
638,500✔
1759

1760
// decodeValue decodes a single value from b according to colType.
// The encoding is a 4-byte big-endian length header followed by the payload
// (see EncodeRawValue). It returns the typed value and the total number of
// bytes consumed (header + payload). When nullable is true, a zero-length
// payload decodes to a NullValue.
func decodeValue(b []byte, colType SQLValueType, nullable bool) (TypedValue, int, error) {
	vlen, voff, err := DecodeValueLength(b)
	if err != nil {
		return nil, 0, err
	}

	if vlen == 0 && nullable {
		return &NullValue{t: colType}, voff, nil
	}

	switch colType {
	case VarcharType:
		{
			// string() copies, so the result does not alias b.
			v := string(b[voff : voff+vlen])
			voff += vlen

			return &Varchar{val: v}, voff, nil
		}
	case IntegerType:
		{
			if vlen != 8 {
				return nil, 0, ErrCorruptedData
			}

			v := binary.BigEndian.Uint64(b[voff:])
			voff += vlen

			return &Integer{val: int64(v)}, voff, nil
		}
	case BooleanType:
		{
			if vlen != 1 {
				return nil, 0, ErrCorruptedData
			}

			v := b[voff] == 1
			voff += 1

			return &Bool{val: v}, voff, nil
		}
	case BLOBType:
		{
			// NOTE(review): the Blob keeps a sub-slice of b (no copy), so it
			// stays valid only as long as b's backing array is unmodified.
			v := b[voff : voff+vlen]
			voff += vlen

			return &Blob{val: v}, voff, nil
		}
	case JSONType:
		{
			v := b[voff : voff+vlen]
			voff += vlen

			var val interface{}
			err = json.Unmarshal(v, &val)

			// On unmarshal failure a JSON wrapper is still returned together
			// with the non-nil error; callers must check err first.
			return &JSON{val: val}, voff, err
		}
	case UUIDType:
		{
			if vlen != 16 {
				return nil, 0, ErrCorruptedData
			}

			u, err := uuid.FromBytes(b[voff : voff+16])
			if err != nil {
				return nil, 0, fmt.Errorf("%w: %s", ErrCorruptedData, err.Error())
			}

			voff += vlen

			return &UUID{val: u}, voff, nil
		}
	case TimestampType:
		{
			if vlen != 8 {
				return nil, 0, ErrCorruptedData
			}

			// Stored as the int64 produced by TimeToInt64 at encode time.
			v := binary.BigEndian.Uint64(b[voff:])
			voff += vlen

			return &Timestamp{val: TimeFromInt64(int64(v))}, voff, nil
		}
	case Float64Type:
		{
			if vlen != 8 {
				return nil, 0, ErrCorruptedData
			}
			// The payload is the raw IEEE-754 bit pattern.
			v := binary.BigEndian.Uint64(b[voff:])
			voff += vlen
			return &Float64{val: math.Float64frombits(v)}, voff, nil
		}
	}

	// Unknown column type.
	return nil, 0, ErrCorruptedData
}
1856

1857
// addSchemaToTx adds the schema of the catalog to the given transaction.
// NOTE(review): the boolean flag passed to loadCatalog is assumed to request
// a full schema load into tx — confirm against loadCatalog's signature.
func (catlg *Catalog) addSchemaToTx(ctx context.Context, tx *store.OngoingTx) error {
	return catlg.loadCatalog(ctx, tx, true)
}
29✔
1861

1862
func iteratePrefix(ctx context.Context, tx *store.OngoingTx, prefix []byte, onSpec func(key, value []byte, deleted bool) error) error {
13,862✔
1863
        dbReaderSpec := store.KeyReaderSpec{
13,862✔
1864
                Prefix: prefix,
13,862✔
1865
        }
13,862✔
1866

13,862✔
1867
        colSpecReader, err := tx.NewKeyReader(dbReaderSpec)
13,862✔
1868
        if err != nil {
13,862✔
UNCOV
1869
                return err
×
UNCOV
1870
        }
×
1871
        defer colSpecReader.Close()
13,862✔
1872

13,862✔
1873
        for {
32,174✔
1874
                mkey, vref, err := colSpecReader.Read(ctx)
18,312✔
1875
                if errors.Is(err, store.ErrNoMoreEntries) {
32,164✔
1876
                        break
13,852✔
1877
                }
1878
                if err != nil {
4,460✔
1879
                        return err
×
1880
                }
×
1881

1882
                md := vref.KVMetadata()
4,460✔
1883
                if md != nil && md.IsExpirable() {
4,461✔
1884
                        return ErrBrokenCatalogColSpecExpirable
1✔
1885
                }
1✔
1886

1887
                deleted := md != nil && md.Deleted()
4,459✔
1888
                var v []byte
4,459✔
1889
                if !deleted {
8,822✔
1890
                        v, err = vref.Resolve()
4,363✔
1891
                        if err != nil {
4,363✔
1892
                                return err
×
1893
                        }
×
1894
                }
1895

1896
                err = onSpec(mkey, v, deleted)
4,459✔
1897
                if err != nil {
4,468✔
1898
                        return err
9✔
1899
                }
9✔
1900
        }
1901
        return nil
13,852✔
1902
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc