• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 9791636125

04 Jul 2024 09:10AM UTC coverage: 89.523% (+0.1%) from 89.416%
9791636125

push

gh-ci

ostafen
Implement CHECK constraints

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

554 of 628 new or added lines in 10 files covered. (88.22%)

14 existing lines in 7 files now uncovered.

35374 of 39514 relevant lines covered (89.52%)

160396.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.81
/embedded/sql/catalog.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "bytes"
21
        "context"
22
        "encoding/binary"
23
        "encoding/json"
24
        "errors"
25
        "fmt"
26
        "math"
27
        "strings"
28
        "time"
29

30
        "github.com/codenotary/immudb/embedded/store"
31
        "github.com/google/uuid"
32
)
33

34
// Catalog represents a database catalog containing metadata for all tables in the database.
35
type Catalog struct {
36
        enginePrefix []byte
37

38
        tables       []*Table
39
        tablesByID   map[uint32]*Table
40
        tablesByName map[string]*Table
41

42
        maxTableID uint32 // The maxTableID variable is used to assign unique ids to new tables as they are created.
43
}
44

45
type CheckConstraint struct {
46
        id   uint32
47
        name string
48
        exp  ValueExp
49
}
50

51
type Table struct {
52
        catalog          *Catalog
53
        id               uint32
54
        name             string
55
        cols             []*Column
56
        colsByID         map[uint32]*Column
57
        colsByName       map[string]*Column
58
        indexes          []*Index
59
        indexesByName    map[string]*Index
60
        indexesByColID   map[uint32][]*Index
61
        checkConstraints map[string]CheckConstraint
62
        primaryIndex     *Index
63
        autoIncrementPK  bool
64
        maxPK            int64
65

66
        maxColID   uint32
67
        maxIndexID uint32
68
}
69

70
type Index struct {
71
        table    *Table
72
        id       uint32
73
        unique   bool
74
        cols     []*Column
75
        colsByID map[uint32]*Column
76
}
77

78
type Column struct {
79
        table         *Table
80
        id            uint32
81
        colName       string
82
        colType       SQLValueType
83
        maxLen        int
84
        autoIncrement bool
85
        notNull       bool
86
}
87

88
func newCatalog(enginePrefix []byte) *Catalog {
3,113✔
89
        ctlg := &Catalog{
3,113✔
90
                enginePrefix: enginePrefix,
3,113✔
91
                tablesByID:   make(map[uint32]*Table),
3,113✔
92
                tablesByName: make(map[string]*Table),
3,113✔
93
        }
3,113✔
94

3,113✔
95
        pgTypeTable := &Table{
3,113✔
96
                catalog: ctlg,
3,113✔
97
                name:    "pg_type",
3,113✔
98
                cols: []*Column{
3,113✔
99
                        {
3,113✔
100
                                colName: "oid",
3,113✔
101
                                colType: VarcharType,
3,113✔
102
                                maxLen:  10,
3,113✔
103
                        },
3,113✔
104
                        {
3,113✔
105
                                colName: "typbasetype",
3,113✔
106
                                colType: VarcharType,
3,113✔
107
                                maxLen:  10,
3,113✔
108
                        },
3,113✔
109
                        {
3,113✔
110
                                colName: "typname",
3,113✔
111
                                colType: VarcharType,
3,113✔
112
                                maxLen:  50,
3,113✔
113
                        },
3,113✔
114
                },
3,113✔
115
        }
3,113✔
116

3,113✔
117
        pgTypeTable.colsByName = make(map[string]*Column, len(pgTypeTable.cols))
3,113✔
118

3,113✔
119
        for _, col := range pgTypeTable.cols {
12,452✔
120
                pgTypeTable.colsByName[col.colName] = col
9,339✔
121
        }
9,339✔
122

123
        pgTypeTable.indexes = []*Index{
3,113✔
124
                {
3,113✔
125
                        unique: true,
3,113✔
126
                        cols: []*Column{
3,113✔
127
                                pgTypeTable.colsByName["oid"],
3,113✔
128
                        },
3,113✔
129
                        colsByID: map[uint32]*Column{
3,113✔
130
                                0: pgTypeTable.colsByName["oid"],
3,113✔
131
                        },
3,113✔
132
                },
3,113✔
133
        }
3,113✔
134

3,113✔
135
        pgTypeTable.primaryIndex = pgTypeTable.indexes[0]
3,113✔
136
        ctlg.tablesByName[pgTypeTable.name] = pgTypeTable
3,113✔
137

3,113✔
138
        return ctlg
3,113✔
139
}
140

141
func (catlg *Catalog) ExistTable(table string) bool {
3,811✔
142
        _, exists := catlg.tablesByName[table]
3,811✔
143
        return exists
3,811✔
144
}
3,811✔
145

146
func (catlg *Catalog) GetTables() []*Table {
3,097✔
147
        ts := make([]*Table, 0, len(catlg.tables))
3,097✔
148

3,097✔
149
        ts = append(ts, catlg.tables...)
3,097✔
150

3,097✔
151
        return ts
3,097✔
152
}
3,097✔
153

154
func (catlg *Catalog) GetTableByName(name string) (*Table, error) {
4,098✔
155
        table, exists := catlg.tablesByName[name]
4,098✔
156
        if !exists {
4,133✔
157
                return nil, fmt.Errorf("%w (%s)", ErrTableDoesNotExist, name)
35✔
158
        }
35✔
159
        return table, nil
4,063✔
160
}
161

162
func (catlg *Catalog) GetTableByID(id uint32) (*Table, error) {
3,770✔
163
        table, exists := catlg.tablesByID[id]
3,770✔
164
        if !exists {
7,538✔
165
                return nil, ErrTableDoesNotExist
3,768✔
166
        }
3,768✔
167
        return table, nil
2✔
168
}
169

170
func (t *Table) ID() uint32 {
175✔
171
        return t.id
175✔
172
}
175✔
173

174
func (t *Table) Cols() []*Column {
760✔
175
        cs := make([]*Column, 0, len(t.cols))
760✔
176

760✔
177
        cs = append(cs, t.cols...)
760✔
178

760✔
179
        return cs
760✔
180
}
760✔
181

182
func (t *Table) ColsByName() map[string]*Column {
50✔
183
        cs := make(map[string]*Column, len(t.cols))
50✔
184

50✔
185
        for _, c := range t.cols {
201✔
186
                cs[c.colName] = c
151✔
187
        }
151✔
188

189
        return cs
50✔
190
}
191

192
func (t *Table) Name() string {
806✔
193
        return t.name
806✔
194
}
806✔
195

196
func (t *Table) PrimaryIndex() *Index {
434✔
197
        return t.primaryIndex
434✔
198
}
434✔
199

200
func (t *Table) IsIndexed(colName string) (indexed bool, err error) {
33✔
201
        col, err := t.GetColumnByName(colName)
33✔
202
        if err != nil {
34✔
203
                return false, err
1✔
204
        }
1✔
205
        return len(t.indexesByColID[col.id]) > 0, nil
32✔
206
}
207

208
func (t *Table) GetColumnByName(name string) (*Column, error) {
7,744✔
209
        col, exists := t.colsByName[name]
7,744✔
210
        if !exists {
7,768✔
211
                return nil, fmt.Errorf("%w (%s)", ErrColumnDoesNotExist, name)
24✔
212
        }
24✔
213
        return col, nil
7,720✔
214
}
215

216
func (t *Table) GetColumnByID(id uint32) (*Column, error) {
123,519✔
217
        col, exists := t.colsByID[id]
123,519✔
218
        if !exists {
123,626✔
219
                return nil, ErrColumnDoesNotExist
107✔
220
        }
107✔
221
        return col, nil
123,412✔
222
}
223

224
func (t *Table) ColumnsByID() map[uint32]*Column {
6✔
225
        return t.colsByID
6✔
226
}
6✔
227

228
func (t *Table) GetIndexes() []*Index {
25✔
229
        idxs := make([]*Index, 0, len(t.indexes))
25✔
230

25✔
231
        idxs = append(idxs, t.indexes...)
25✔
232

25✔
233
        return idxs
25✔
234
}
25✔
235

236
func (t *Table) GetIndexesByColID(colID uint32) []*Index {
11✔
237
        idxs := make([]*Index, 0, len(t.indexes))
11✔
238

11✔
239
        idxs = append(idxs, t.indexesByColID[colID]...)
11✔
240

11✔
241
        return idxs
11✔
242
}
11✔
243

244
func (t *Table) GetMaxColID() uint32 {
50✔
245
        return t.maxColID
50✔
246
}
50✔
247

248
func (i *Index) IsPrimary() bool {
8,413✔
249
        return i.id == PKIndexID
8,413✔
250
}
8,413✔
251

252
func (i *Index) IsUnique() bool {
1,267✔
253
        return i.unique
1,267✔
254
}
1,267✔
255

256
func (i *Index) Cols() []*Column {
510✔
257
        return i.cols
510✔
258
}
510✔
259

260
func (i *Index) IncludesCol(colID uint32) bool {
41✔
261
        _, ok := i.colsByID[colID]
41✔
262
        return ok
41✔
263
}
41✔
264

265
func (i *Index) enginePrefix() []byte {
10,774✔
266
        return i.table.catalog.enginePrefix
10,774✔
267
}
10,774✔
268

269
func (i *Index) coversOrdCols(ordCols []*OrdCol, rangesByColID map[uint32]*typedValueRange) bool {
357✔
270
        if !ordColumnsHaveSameDirection(ordCols) {
371✔
271
                return false
14✔
272
        }
14✔
273
        return i.hasPrefix(i.cols, ordCols) || i.sortableUsing(ordCols, rangesByColID)
343✔
274
}
275

276
func ordColumnsHaveSameDirection(cols []*OrdCol) bool {
357✔
277
        if len(cols) == 0 {
357✔
278
                return true
×
279
        }
×
280

281
        desc := cols[0].descOrder
357✔
282
        for _, ordCol := range cols[1:] {
464✔
283
                if ordCol.descOrder != desc {
121✔
284
                        return false
14✔
285
                }
14✔
286
        }
287
        return true
343✔
288
}
289

290
func (i *Index) hasPrefix(columns []*Column, ordCols []*OrdCol) bool {
365✔
291
        if len(ordCols) > len(columns) {
395✔
292
                return false
30✔
293
        }
30✔
294

295
        for j, ordCol := range ordCols {
718✔
296
                aggFn, _, colName := ordCol.sel.resolve(i.table.Name())
383✔
297
                if len(aggFn) > 0 {
388✔
298
                        return false
5✔
299
                }
5✔
300

301
                col, err := i.table.GetColumnByName(colName)
378✔
302
                if err != nil || col.id != columns[j].id {
502✔
303
                        return false
124✔
304
                }
124✔
305
        }
306
        return true
206✔
307
}
308

309
func (i *Index) sortableUsing(columns []*OrdCol, rangesByColID map[uint32]*typedValueRange) bool {
153✔
310
        // all columns before colID must be fixedValues otherwise the index can not be used
153✔
311
        aggFn, _, colName := columns[0].sel.resolve(i.table.Name())
153✔
312
        if len(aggFn) > 0 {
156✔
313
                return false
3✔
314
        }
3✔
315

316
        firstCol, err := i.table.GetColumnByName(colName)
150✔
317
        if err != nil {
154✔
318
                return false
4✔
319
        }
4✔
320

321
        for j, col := range i.cols {
311✔
322
                if col.id == firstCol.id {
187✔
323
                        return i.hasPrefix(i.cols[j:], columns)
22✔
324
                }
22✔
325

326
                colRange, ok := rangesByColID[col.id]
143✔
327
                if ok && colRange.unitary() {
162✔
328
                        continue
19✔
329
                }
330

331
                return false
124✔
332
        }
333
        return false
×
334
}
335

336
func (i *Index) Name() string {
11,179✔
337
        return indexName(i.table.name, i.cols)
11,179✔
338
}
11,179✔
339

340
func (i *Index) ID() uint32 {
17✔
341
        return i.id
17✔
342
}
17✔
343

344
func (t *Table) GetIndexByName(name string) (*Index, error) {
36✔
345
        idx, exists := t.indexesByName[name]
36✔
346
        if !exists {
37✔
347
                return nil, fmt.Errorf("%w (%s)", ErrIndexNotFound, name)
1✔
348
        }
1✔
349
        return idx, nil
35✔
350
}
351

352
func indexName(tableName string, cols []*Column) string {
11,214✔
353
        var buf strings.Builder
11,214✔
354

11,214✔
355
        buf.WriteString(tableName)
11,214✔
356

11,214✔
357
        buf.WriteString("(")
11,214✔
358

11,214✔
359
        for c, col := range cols {
23,177✔
360
                buf.WriteString(col.colName)
11,963✔
361

11,963✔
362
                if c < len(cols)-1 {
12,712✔
363
                        buf.WriteString(",")
749✔
364
                }
749✔
365
        }
366

367
        buf.WriteString(")")
11,214✔
368

11,214✔
369
        return buf.String()
11,214✔
370
}
371

372
func (catlg *Catalog) newTable(name string, colsSpec map[uint32]*ColSpec, checkConstraints map[string]CheckConstraint, maxColID uint32) (table *Table, err error) {
3,776✔
373
        if len(name) == 0 || len(colsSpec) == 0 {
3,779✔
374
                return nil, ErrIllegalArguments
3✔
375
        }
3✔
376

377
        for id := range colsSpec {
16,270✔
378
                if id <= 0 || id > maxColID {
12,497✔
379
                        return nil, ErrIllegalArguments
×
380
                }
×
381
        }
382

383
        exists := catlg.ExistTable(name)
3,773✔
384
        if exists {
3,780✔
385
                return nil, fmt.Errorf("%w (%s)", ErrTableAlreadyExists, name)
7✔
386
        }
7✔
387

388
        // Generate a new ID for the table by incrementing the 'maxTableID' variable of the 'catalog' instance.
389
        id := (catlg.maxTableID + 1)
3,766✔
390

3,766✔
391
        // This code is attempting to check if a table with the given id already exists in the Catalog.
3,766✔
392
        // If the function returns nil for err, it means that the table already exists and the function
3,766✔
393
        // should return an error indicating that the table cannot be created again.
3,766✔
394
        _, err = catlg.GetTableByID(id)
3,766✔
395
        if err == nil {
3,766✔
396
                return nil, fmt.Errorf("%w (%d)", ErrTableAlreadyExists, id)
×
397
        }
×
398

399
        table = &Table{
3,766✔
400
                id:               id,
3,766✔
401
                catalog:          catlg,
3,766✔
402
                name:             name,
3,766✔
403
                cols:             make([]*Column, 0, len(colsSpec)),
3,766✔
404
                colsByID:         make(map[uint32]*Column),
3,766✔
405
                colsByName:       make(map[string]*Column),
3,766✔
406
                indexesByName:    make(map[string]*Index),
3,766✔
407
                indexesByColID:   make(map[uint32][]*Index),
3,766✔
408
                checkConstraints: checkConstraints,
3,766✔
409
                maxColID:         maxColID,
3,766✔
410
        }
3,766✔
411

3,766✔
412
        for id := uint32(1); id <= maxColID; id++ {
16,321✔
413
                cs, found := colsSpec[id]
12,555✔
414
                if !found {
12,624✔
415
                        // dropped column
69✔
416
                        continue
69✔
417
                }
418

419
                if isReservedCol(cs.colName) {
12,487✔
420
                        return nil, fmt.Errorf("%w(%s)", ErrReservedWord, cs.colName)
1✔
421
                }
1✔
422

423
                _, colExists := table.colsByName[cs.colName]
12,485✔
424
                if colExists {
12,486✔
425
                        return nil, ErrDuplicatedColumn
1✔
426
                }
1✔
427

428
                if cs.autoIncrement && cs.colType != IntegerType {
12,486✔
429
                        return nil, ErrLimitedAutoIncrement
2✔
430
                }
2✔
431

432
                if !validMaxLenForType(cs.maxLen, cs.colType) {
12,482✔
433
                        return nil, ErrLimitedMaxLen
×
434
                }
×
435

436
                col := &Column{
12,482✔
437
                        id:            uint32(id),
12,482✔
438
                        table:         table,
12,482✔
439
                        colName:       cs.colName,
12,482✔
440
                        colType:       cs.colType,
12,482✔
441
                        maxLen:        cs.maxLen,
12,482✔
442
                        autoIncrement: cs.autoIncrement,
12,482✔
443
                        notNull:       cs.notNull,
12,482✔
444
                }
12,482✔
445

12,482✔
446
                table.cols = append(table.cols, col)
12,482✔
447
                table.colsByID[col.id] = col
12,482✔
448
                table.colsByName[col.colName] = col
12,482✔
449
        }
450

451
        catlg.tables = append(catlg.tables, table)
3,762✔
452
        catlg.tablesByID[table.id] = table
3,762✔
453
        catlg.tablesByName[table.name] = table
3,762✔
454

3,762✔
455
        // increment table count on successfull table creation.
3,762✔
456
        // This ensures that each new table is assigned a unique ID
3,762✔
457
        // that has not been used before.
3,762✔
458
        catlg.maxTableID++
3,762✔
459

3,762✔
460
        return table, nil
3,762✔
461
}
462

463
func (catlg *Catalog) deleteTable(table *Table) error {
6✔
464
        _, exists := catlg.tablesByID[table.id]
6✔
465
        if !exists {
6✔
466
                return ErrTableDoesNotExist
×
467
        }
×
468

469
        newTables := make([]*Table, 0, len(catlg.tables)-1)
6✔
470

6✔
471
        for _, t := range catlg.tables {
24✔
472
                if t.id != table.id {
30✔
473
                        newTables = append(newTables, t)
12✔
474
                }
12✔
475
        }
476

477
        catlg.tables = newTables
6✔
478
        delete(catlg.tablesByID, table.id)
6✔
479
        delete(catlg.tablesByName, table.name)
6✔
480

6✔
481
        return nil
6✔
482
}
483

484
func (t *Table) newIndex(unique bool, colIDs []uint32) (index *Index, err error) {
5,590✔
485
        if len(colIDs) < 1 {
5,591✔
486
                return nil, ErrIllegalArguments
1✔
487
        }
1✔
488

489
        // validate column ids
490
        cols := make([]*Column, len(colIDs))
5,589✔
491
        colsByID := make(map[uint32]*Column, len(colIDs))
5,589✔
492

5,589✔
493
        for i, colID := range colIDs {
11,544✔
494
                col, err := t.GetColumnByID(colID)
5,955✔
495
                if err != nil {
5,956✔
496
                        return nil, err
1✔
497
                }
1✔
498

499
                _, ok := colsByID[colID]
5,954✔
500
                if ok {
5,955✔
501
                        return nil, ErrDuplicatedColumn
1✔
502
                }
1✔
503

504
                cols[i] = col
5,953✔
505
                colsByID[colID] = col
5,953✔
506
        }
507

508
        index = &Index{
5,587✔
509
                id:       uint32(t.maxIndexID),
5,587✔
510
                table:    t,
5,587✔
511
                unique:   unique,
5,587✔
512
                cols:     cols,
5,587✔
513
                colsByID: colsByID,
5,587✔
514
        }
5,587✔
515

5,587✔
516
        _, exists := t.indexesByName[index.Name()]
5,587✔
517
        if exists {
5,593✔
518
                return nil, ErrIndexAlreadyExists
6✔
519
        }
6✔
520

521
        t.indexes = append(t.indexes, index)
5,581✔
522
        t.indexesByName[index.Name()] = index
5,581✔
523

5,581✔
524
        // having a direct way to get the indexes by colID
5,581✔
525
        for _, col := range index.cols {
11,526✔
526
                t.indexesByColID[col.id] = append(t.indexesByColID[col.id], index)
5,945✔
527
        }
5,945✔
528

529
        if index.id == PKIndexID {
9,325✔
530
                t.primaryIndex = index
3,744✔
531
                t.autoIncrementPK = len(index.cols) == 1 && index.cols[0].autoIncrement
3,744✔
532
        }
3,744✔
533

534
        // increment table count on successfull table creation.
535
        // This ensures that each new table is assigned a unique ID
536
        // that has not been used before.
537
        t.maxIndexID++
5,581✔
538

5,581✔
539
        return index, nil
5,581✔
540
}
541

542
func (t *Table) newColumn(spec *ColSpec) (*Column, error) {
19✔
543
        if isReservedCol(spec.colName) {
20✔
544
                return nil, fmt.Errorf("%w(%s)", ErrReservedWord, spec.colName)
1✔
545
        }
1✔
546

547
        if spec.autoIncrement {
19✔
548
                return nil, fmt.Errorf("%w (%s)", ErrLimitedAutoIncrement, spec.colName)
1✔
549
        }
1✔
550

551
        if spec.notNull {
18✔
552
                return nil, fmt.Errorf("%w (%s)", ErrNewColumnMustBeNullable, spec.colName)
1✔
553
        }
1✔
554

555
        if !validMaxLenForType(spec.maxLen, spec.colType) {
17✔
556
                return nil, fmt.Errorf("%w (%s)", ErrLimitedMaxLen, spec.colName)
1✔
557
        }
1✔
558

559
        _, exists := t.colsByName[spec.colName]
15✔
560
        if exists {
18✔
561
                return nil, fmt.Errorf("%w (%s)", ErrColumnAlreadyExists, spec.colName)
3✔
562
        }
3✔
563

564
        t.maxColID++
12✔
565

12✔
566
        col := &Column{
12✔
567
                id:            t.maxColID,
12✔
568
                table:         t,
12✔
569
                colName:       spec.colName,
12✔
570
                colType:       spec.colType,
12✔
571
                maxLen:        spec.maxLen,
12✔
572
                autoIncrement: spec.autoIncrement,
12✔
573
                notNull:       spec.notNull,
12✔
574
        }
12✔
575

12✔
576
        t.cols = append(t.cols, col)
12✔
577
        t.colsByID[col.id] = col
12✔
578
        t.colsByName[col.colName] = col
12✔
579

12✔
580
        return col, nil
12✔
581
}
582

583
func (ctlg *Catalog) renameTable(oldName, newName string) (*Table, error) {
6✔
584
        if oldName == newName {
7✔
585
                return nil, fmt.Errorf("%w (%s)", ErrSameOldAndNewNames, oldName)
1✔
586
        }
1✔
587

588
        t, err := ctlg.GetTableByName(oldName)
5✔
589
        if err != nil {
7✔
590
                return nil, err
2✔
591
        }
2✔
592

593
        _, err = ctlg.GetTableByName(newName)
3✔
594
        if err == nil {
4✔
595
                return nil, fmt.Errorf("%w (%s)", ErrTableAlreadyExists, newName)
1✔
596
        }
1✔
597

598
        t.name = newName
2✔
599

2✔
600
        delete(ctlg.tablesByName, oldName)
2✔
601
        ctlg.tablesByName[newName] = t
2✔
602

2✔
603
        return t, nil
2✔
604
}
605

606
func (t *Table) renameColumn(oldName, newName string) (*Column, error) {
9✔
607
        if isReservedCol(newName) {
9✔
608
                return nil, fmt.Errorf("%w(%s)", ErrReservedWord, newName)
×
609
        }
×
610

611
        if oldName == newName {
10✔
612
                return nil, fmt.Errorf("%w (%s)", ErrSameOldAndNewNames, oldName)
1✔
613
        }
1✔
614

615
        col, exists := t.colsByName[oldName]
8✔
616
        if !exists {
9✔
617
                return nil, fmt.Errorf("%w (%s)", ErrColumnDoesNotExist, oldName)
1✔
618
        }
1✔
619

620
        _, exists = t.colsByName[newName]
7✔
621
        if exists {
8✔
622
                return nil, fmt.Errorf("%w (%s)", ErrColumnAlreadyExists, newName)
1✔
623
        }
1✔
624

625
        col.colName = newName
6✔
626

6✔
627
        delete(t.colsByName, oldName)
6✔
628
        t.colsByName[newName] = col
6✔
629

6✔
630
        return col, nil
6✔
631
}
632

633
func (t *Table) deleteColumn(col *Column) error {
12✔
634
        isIndexed, err := t.IsIndexed(col.colName)
12✔
635
        if err != nil {
12✔
636
                return err
×
637
        }
×
638

639
        if isIndexed {
16✔
640
                return fmt.Errorf("%w %s because one or more indexes require it", ErrCannotDropColumn, col.colName)
4✔
641
        }
4✔
642

643
        newCols := make([]*Column, 0, len(t.cols)-1)
8✔
644

8✔
645
        for _, c := range t.cols {
55✔
646
                if c.id != col.id {
86✔
647
                        newCols = append(newCols, c)
39✔
648
                }
39✔
649
        }
650

651
        t.cols = newCols
8✔
652
        delete(t.colsByName, col.colName)
8✔
653
        delete(t.colsByID, col.id)
8✔
654

8✔
655
        return nil
8✔
656
}
657

658
func (t *Table) deleteCheck(name string) (uint32, error) {
4✔
659
        c, exists := t.checkConstraints[name]
4✔
660
        if !exists {
5✔
661
                return 0, fmt.Errorf("%s.%s: %w", t.name, name, ErrConstraintNotFound)
1✔
662
        }
1✔
663

664
        delete(t.checkConstraints, name)
3✔
665
        return c.id, nil
3✔
666
}
667

668
func (t *Table) deleteIndex(index *Index) error {
5✔
669
        if index.IsPrimary() {
6✔
670
                return fmt.Errorf("%w: primary key index can NOT be deleted", ErrIllegalArguments)
1✔
671
        }
1✔
672

673
        newIndexes := make([]*Index, 0, len(t.indexes)-1)
4✔
674

4✔
675
        for _, i := range t.indexes {
13✔
676
                if i.id != index.id {
14✔
677
                        newIndexes = append(newIndexes, i)
5✔
678
                }
5✔
679
        }
680

681
        t.indexes = newIndexes
4✔
682
        delete(t.indexesByColID, index.id)
4✔
683
        delete(t.indexesByName, index.Name())
4✔
684

4✔
685
        return nil
4✔
686
}
687

688
func (c *Column) ID() uint32 {
684✔
689
        return c.id
684✔
690
}
684✔
691

692
func (c *Column) Name() string {
3,317✔
693
        return c.colName
3,317✔
694
}
3,317✔
695

696
func (c *Column) Type() SQLValueType {
14,518✔
697
        return c.colType
14,518✔
698
}
14,518✔
699

700
func (c *Column) MaxLen() int {
37,783✔
701
        switch c.colType {
37,783✔
702
        case BooleanType:
658✔
703
                return 1
658✔
704
        case IntegerType:
27,198✔
705
                return 8
27,198✔
706
        case TimestampType:
1,144✔
707
                return 8
1,144✔
708
        case Float64Type:
2,037✔
709
                return 8
2,037✔
710
        case UUIDType:
39✔
711
                return 16
39✔
712
        }
713

714
        return c.maxLen
6,707✔
715
}
716

717
func (c *Column) IsNullable() bool {
19✔
718
        return !c.notNull
19✔
719
}
19✔
720

721
func (c *Column) IsAutoIncremental() bool {
11✔
722
        return c.autoIncrement
11✔
723
}
11✔
724

725
func validMaxLenForType(maxLen int, sqlType SQLValueType) bool {
12,498✔
726
        switch sqlType {
12,498✔
727
        case BooleanType:
733✔
728
                return maxLen <= 1
733✔
729
        case IntegerType:
5,708✔
730
                return maxLen == 0 || maxLen == 8
5,708✔
731
        case Float64Type:
1,114✔
732
                return maxLen == 0 || maxLen == 8
1,114✔
733
        case TimestampType:
994✔
734
                return maxLen == 0 || maxLen == 8
994✔
735
        case UUIDType:
69✔
736
                return maxLen == 0 || maxLen == 16
69✔
737
        }
738

739
        return maxLen >= 0
3,880✔
740
}
741

742
func (catlg *Catalog) load(ctx context.Context, tx *store.OngoingTx) error {
3,085✔
743
        return catlg.loadCatalog(ctx, tx, false)
3,085✔
744
}
3,085✔
745

746
func (catlg *Catalog) loadCatalog(ctx context.Context, tx *store.OngoingTx, copyToTx bool) error {
3,112✔
747
        prefix := MapKey(catlg.enginePrefix, catalogTablePrefix, EncodeID(1))
3,112✔
748

3,112✔
749
        return iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
6,691✔
750
                dbID, tableID, err := unmapTableID(catlg.enginePrefix, key)
3,579✔
751
                if err != nil {
3,579✔
752
                        return err
×
753
                }
×
754

755
                if dbID != DatabaseID {
3,579✔
756
                        return ErrCorruptedData
×
757
                }
×
758

759
                if deleted {
3,608✔
760
                        catlg.maxTableID++
29✔
761
                        return nil
29✔
762
                }
29✔
763

764
                colSpecs, maxColID, err := loadColSpecs(ctx, tableID, tx, catlg.enginePrefix, copyToTx)
3,550✔
765
                if err != nil {
3,554✔
766
                        return err
4✔
767
                }
4✔
768

769
                checks, err := loadCheckConstraints(ctx, dbID, tableID, tx, catlg.enginePrefix, copyToTx)
3,546✔
770
                if err != nil {
3,546✔
771
                        return err
×
772
                }
×
773

774
                table, err := catlg.newTable(string(value), colSpecs, checks, maxColID)
3,546✔
775
                if err != nil {
3,546✔
776
                        return err
×
777
                }
×
778

779
                if tableID != table.id {
3,546✔
780
                        return ErrCorruptedData
×
781
                }
×
782

783
                if copyToTx {
3,555✔
784
                        if err := tx.Set(key, nil, value); err != nil {
9✔
NEW
785
                                return err
×
NEW
786
                        }
×
787
                }
788
                return table.loadIndexes(ctx, catlg.enginePrefix, tx, copyToTx)
3,546✔
789
        })
790
}
791

792
func loadMaxPK(ctx context.Context, sqlPrefix []byte, tx *store.OngoingTx, table *Table) ([]byte, error) {
1,749✔
793
        pkReaderSpec := store.KeyReaderSpec{
1,749✔
794
                Prefix:    MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(table.primaryIndex.id)),
1,749✔
795
                DescOrder: true,
1,749✔
796
        }
1,749✔
797

1,749✔
798
        pkReader, err := tx.NewKeyReader(pkReaderSpec)
1,749✔
799
        if err != nil {
1,749✔
800
                return nil, err
×
801
        }
×
802
        defer pkReader.Close()
1,749✔
803

1,749✔
804
        mkey, _, err := pkReader.Read(ctx)
1,749✔
805
        if err != nil {
2,000✔
806
                return nil, err
251✔
807
        }
251✔
808

809
        return unmapIndexEntry(table.primaryIndex, sqlPrefix, mkey)
1,498✔
810
}
811

812
func loadColSpecs(ctx context.Context, tableID uint32, tx *store.OngoingTx, sqlPrefix []byte, copyToTx bool) (map[uint32]*ColSpec, uint32, error) {
3,550✔
813
        prefix := MapKey(sqlPrefix, catalogColumnPrefix, EncodeID(1), EncodeID(tableID))
3,550✔
814

3,550✔
815
        var maxColID uint32
3,550✔
816
        specs := make(map[uint32]*ColSpec)
3,550✔
817

3,550✔
818
        err := iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
15,426✔
819
                if deleted {
11,945✔
820
                        maxColID++
69✔
821
                        return nil
69✔
822
                }
69✔
823

824
                colSpec, colID, err := loadColSpec(sqlPrefix, key, value, tableID)
11,807✔
825
                if err != nil {
11,810✔
826
                        return err
3✔
827
                }
3✔
828

829
                maxColID++
11,804✔
830

11,804✔
831
                specs[colID] = colSpec
11,804✔
832

11,804✔
833
                if copyToTx {
11,836✔
834
                        return tx.Set(key, nil, value)
32✔
835
                }
32✔
836
                return nil
11,772✔
837
        })
838
        return specs, maxColID, err
3,550✔
839
}
840

841
func loadColSpec(sqlPrefix, key, value []byte, tableID uint32) (*ColSpec, uint32, error) {
11,807✔
842
        if len(value) < 6 {
11,809✔
843
                return nil, 0, ErrCorruptedData
2✔
844
        }
2✔
845

846
        mdbID, mtableID, colID, colType, err := unmapColSpec(sqlPrefix, key)
11,805✔
847
        if err != nil {
11,806✔
848
                return nil, 0, err
1✔
849
        }
1✔
850

851
        if mdbID != 1 || tableID != mtableID {
11,804✔
NEW
852
                return nil, 0, ErrCorruptedData
×
UNCOV
853
        }
×
854

855
        return &ColSpec{
11,804✔
856
                colName:       string(value[5:]),
11,804✔
857
                colType:       colType,
11,804✔
858
                maxLen:        int(binary.BigEndian.Uint32(value[1:])),
11,804✔
859
                autoIncrement: value[0]&autoIncrementFlag != 0,
11,804✔
860
                notNull:       value[0]&nullableFlag != 0,
11,804✔
861
        }, colID, nil
11,804✔
862
}
863

864
func loadCheckConstraints(ctx context.Context, dbID, tableID uint32, tx *store.OngoingTx, sqlPrefix []byte, copyToTx bool) (map[string]CheckConstraint, error) {
3,546✔
865
        prefix := MapKey(sqlPrefix, catalogCheckPrefix, EncodeID(dbID), EncodeID(tableID))
3,546✔
866
        checks := make(map[string]CheckConstraint)
3,546✔
867

3,546✔
868
        err := iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
3,666✔
869
                if deleted {
142✔
870
                        return nil
22✔
871
                }
22✔
872

873
                check, err := parseCheckConstraint(sqlPrefix, key, value)
98✔
874
                if err != nil {
98✔
875
                        return err
×
876
                }
×
877
                checks[check.name] = *check
98✔
878

98✔
879
                if copyToTx {
98✔
NEW
880
                        return tx.Set(key, nil, value)
×
UNCOV
881
                }
×
882
                return nil
98✔
883
        })
884
        return checks, err
3,546✔
885
}
886

887
func (table *Table) loadIndexes(ctx context.Context, sqlPrefix []byte, tx *store.OngoingTx, copyToTx bool) error {
3,546✔
888
        prefix := MapKey(sqlPrefix, catalogIndexPrefix, EncodeID(1), EncodeID(table.id))
3,546✔
889

3,546✔
890
        return iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
8,827✔
891
                dbID, tableID, indexID, err := unmapIndex(sqlPrefix, key)
5,281✔
892
                if err != nil {
5,281✔
893
                        return err
×
894
                }
×
895

896
                if table.id != tableID || dbID != 1 {
5,281✔
897
                        return ErrCorruptedData
×
898
                }
×
899

900
                if deleted {
5,301✔
901
                        table.maxIndexID++
20✔
902
                        return nil
20✔
903
                }
20✔
904

905
                if copyToTx {
5,284✔
906
                        if err := tx.Set(key, nil, value); err != nil {
23✔
NEW
907
                                return err
×
NEW
908
                        }
×
909
                } else {
5,238✔
910
                        // v={unique {colID1}(ASC|DESC)...{colIDN}(ASC|DESC)}
5,238✔
911
                        colSpecLen := EncIDLen + 1
5,238✔
912
                        if len(value) < 1+colSpecLen || len(value)%colSpecLen != 1 {
5,238✔
NEW
913
                                return ErrCorruptedData
×
NEW
914
                        }
×
915

916
                        var colIDs []uint32
5,238✔
917
                        for i := 1; i < len(value); i += colSpecLen {
10,813✔
918
                                colID := binary.BigEndian.Uint32(value[i:])
5,575✔
919

5,575✔
920
                                // TODO: currently only ASC order is supported
5,575✔
921
                                if value[i+EncIDLen] != 0 {
5,575✔
NEW
922
                                        return ErrCorruptedData
×
NEW
923
                                }
×
924
                                colIDs = append(colIDs, colID)
5,575✔
925
                        }
926

927
                        index, err := table.newIndex(value[0] > 0, colIDs)
5,238✔
928
                        if err != nil {
5,239✔
929
                                return err
1✔
930
                        }
1✔
931

932
                        if indexID != index.id {
5,237✔
UNCOV
933
                                return ErrCorruptedData
×
934
                        }
×
935
                }
936
                return nil
5,260✔
937
        })
938
}
939

940
func trimPrefix(prefix, mkey []byte, mappingPrefix []byte) ([]byte, error) {
22,302✔
941
        if len(prefix)+len(mappingPrefix) > len(mkey) ||
22,302✔
942
                !bytes.Equal(prefix, mkey[:len(prefix)]) ||
22,302✔
943
                !bytes.Equal(mappingPrefix, mkey[len(prefix):len(prefix)+len(mappingPrefix)]) {
22,310✔
944
                return nil, ErrIllegalMappedKey
8✔
945
        }
8✔
946

947
        return mkey[len(prefix)+len(mappingPrefix):], nil
22,294✔
948
}
949

950
func unmapTableID(prefix, mkey []byte) (dbID, tableID uint32, err error) {
3,582✔
951
        encID, err := trimPrefix(prefix, mkey, []byte(catalogTablePrefix))
3,582✔
952
        if err != nil {
3,583✔
953
                return 0, 0, err
1✔
954
        }
1✔
955

956
        if len(encID) != EncIDLen*2 {
3,582✔
957
                return 0, 0, ErrCorruptedData
1✔
958
        }
1✔
959

960
        dbID = binary.BigEndian.Uint32(encID)
3,580✔
961
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
3,580✔
962

3,580✔
963
        return
3,580✔
964
}
965

966
func unmapCheckID(prefix, mkey []byte) (uint32, error) {
98✔
967
        encID, err := trimPrefix(prefix, mkey, []byte(catalogCheckPrefix))
98✔
968
        if err != nil {
98✔
NEW
969
                return 0, err
×
NEW
970
        }
×
971

972
        if len(encID) < 3*EncIDLen {
98✔
NEW
973
                return 0, ErrCorruptedData
×
NEW
974
        }
×
975
        return binary.BigEndian.Uint32(encID[2*EncIDLen:]), nil
98✔
976
}
977

978
func parseCheckConstraint(prefix, key, value []byte) (*CheckConstraint, error) {
98✔
979
        id, err := unmapCheckID(prefix, key)
98✔
980
        if err != nil {
98✔
NEW
981
                return nil, err
×
NEW
982
        }
×
983

984
        nameLen := value[0] + 1
98✔
985
        name := string(value[1 : 1+nameLen])
98✔
986

98✔
987
        exp, err := ParseExpFromString(string(value[1+nameLen:]))
98✔
988
        if err != nil {
98✔
NEW
989
                return nil, err
×
NEW
990
        }
×
991

992
        return &CheckConstraint{
98✔
993
                id:   id,
98✔
994
                name: name,
98✔
995
                exp:  exp,
98✔
996
        }, nil
98✔
997
}
998

999
func unmapColSpec(prefix, mkey []byte) (dbID, tableID, colID uint32, colType SQLValueType, err error) {
11,809✔
1000
        encID, err := trimPrefix(prefix, mkey, []byte(catalogColumnPrefix))
11,809✔
1001
        if err != nil {
11,810✔
1002
                return 0, 0, 0, "", err
1✔
1003
        }
1✔
1004

1005
        if len(encID) < EncIDLen*3 {
11,809✔
1006
                return 0, 0, 0, "", ErrCorruptedData
1✔
1007
        }
1✔
1008

1009
        dbID = binary.BigEndian.Uint32(encID)
11,807✔
1010
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
11,807✔
1011
        colID = binary.BigEndian.Uint32(encID[2*EncIDLen:])
11,807✔
1012

11,807✔
1013
        colType, err = asType(string(encID[EncIDLen*3:]))
11,807✔
1014
        if err != nil {
11,809✔
1015
                return 0, 0, 0, "", ErrCorruptedData
2✔
1016
        }
2✔
1017

1018
        return
11,805✔
1019
}
1020

1021
func asType(t string) (SQLValueType, error) {
11,807✔
1022
        switch t {
11,807✔
1023
        case IntegerType,
1024
                Float64Type,
1025
                BooleanType,
1026
                VarcharType,
1027
                UUIDType,
1028
                BLOBType,
1029
                TimestampType,
1030
                JSONType:
11,805✔
1031
                return t, nil
11,805✔
1032
        }
1033
        return t, ErrCorruptedData
2✔
1034
}
1035

1036
func unmapIndex(sqlPrefix, mkey []byte) (dbID, tableID, indexID uint32, err error) {
5,284✔
1037
        encID, err := trimPrefix(sqlPrefix, mkey, []byte(catalogIndexPrefix))
5,284✔
1038
        if err != nil {
5,285✔
1039
                return 0, 0, 0, err
1✔
1040
        }
1✔
1041

1042
        if len(encID) != EncIDLen*3 {
5,284✔
1043
                return 0, 0, 0, ErrCorruptedData
1✔
1044
        }
1✔
1045

1046
        dbID = binary.BigEndian.Uint32(encID)
5,282✔
1047
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
5,282✔
1048
        indexID = binary.BigEndian.Uint32(encID[EncIDLen*2:])
5,282✔
1049

5,282✔
1050
        return
5,282✔
1051
}
1052

1053
func unmapIndexEntry(index *Index, sqlPrefix, mkey []byte) (encPKVals []byte, err error) {
1,523✔
1054
        if index == nil {
1,523✔
1055
                return nil, ErrIllegalArguments
×
1056
        }
×
1057

1058
        enc, err := trimPrefix(sqlPrefix, mkey, []byte(MappedPrefix))
1,523✔
1059
        if err != nil {
1,524✔
1060
                return nil, ErrCorruptedData
1✔
1061
        }
1✔
1062

1063
        if len(enc) <= EncIDLen*2 {
1,530✔
1064
                return nil, ErrCorruptedData
8✔
1065
        }
8✔
1066

1067
        off := 0
1,514✔
1068

1,514✔
1069
        tableID := binary.BigEndian.Uint32(enc[off:])
1,514✔
1070
        off += EncIDLen
1,514✔
1071

1,514✔
1072
        indexID := binary.BigEndian.Uint32(enc[off:])
1,514✔
1073
        off += EncIDLen
1,514✔
1074

1,514✔
1075
        if tableID != index.table.id || indexID != index.id {
1,514✔
1076
                return nil, ErrCorruptedData
×
1077
        }
×
1078

1079
        //read index values
1080
        for _, col := range index.cols {
3,028✔
1081
                if enc[off] == KeyValPrefixNull {
1,514✔
1082
                        off += 1
×
1083
                        continue
×
1084
                }
1085
                if enc[off] != KeyValPrefixNotNull {
1,514✔
1086
                        return nil, ErrCorruptedData
×
1087
                }
×
1088
                off += 1
1,514✔
1089

1,514✔
1090
                maxLen := col.MaxLen()
1,514✔
1091
                if variableSizedType(col.colType) {
1,530✔
1092
                        maxLen += EncLenLen
16✔
1093
                }
16✔
1094
                if len(enc)-off < maxLen {
1,528✔
1095
                        return nil, ErrCorruptedData
14✔
1096
                }
14✔
1097

1098
                off += maxLen
1,500✔
1099
        }
1100

1101
        //PK cannot be nil
1102
        if len(enc)-off < 1 {
1,501✔
1103
                return nil, ErrCorruptedData
1✔
1104
        }
1✔
1105

1106
        return enc[off:], nil
1,499✔
1107
}
1108

1109
func variableSizedType(sqlType SQLValueType) bool {
1,893✔
1110
        return sqlType == VarcharType || sqlType == BLOBType
1,893✔
1111
}
1,893✔
1112

1113
// MapKey builds a key by concatenating prefix, mappingPrefix and every
// element of encValues, in that order, into a newly allocated slice.
func MapKey(prefix []byte, mappingPrefix string, encValues ...[]byte) []byte {
	size := len(prefix) + len(mappingPrefix)
	for _, ev := range encValues {
		size += len(ev)
	}

	// allocated once with the exact final size, then filled by appending
	mkey := make([]byte, 0, size)

	mkey = append(mkey, prefix...)
	mkey = append(mkey, mappingPrefix...)

	for _, ev := range encValues {
		mkey = append(mkey, ev...)
	}

	return mkey
}
1137

1138
func EncodeID(id uint32) []byte {
88,722✔
1139
        var encID [EncIDLen]byte
88,722✔
1140
        binary.BigEndian.PutUint32(encID[:], id)
88,722✔
1141
        return encID[:]
88,722✔
1142
}
88,722✔
1143

1144
// Marker bytes placed at the start of a value encoded as part of a key.
const (
	// KeyValPrefixNull is the single byte emitted for a NULL value.
	KeyValPrefixNull byte = 0x20
	// KeyValPrefixNotNull is the first byte of every non-NULL encoded value.
	KeyValPrefixNotNull byte = 0x80
	// KeyValPrefixUpperBound is not referenced in this part of the file;
	// presumably it serves as an upper bound for key ranges — TODO confirm.
	KeyValPrefixUpperBound byte = 0xFF
)
1149

1150
func EncodeValueAsKey(val TypedValue, colType SQLValueType, maxLen int) ([]byte, int, error) {
27,488✔
1151
        return EncodeRawValueAsKey(val.RawValue(), colType, maxLen)
27,488✔
1152
}
27,488✔
1153

1154
// EncodeRawValueAsKey encodes a value in a b-tree meaningful way.
1155
func EncodeRawValueAsKey(val interface{}, colType SQLValueType, maxLen int) ([]byte, int, error) {
27,622✔
1156
        if maxLen <= 0 {
27,625✔
1157
                return nil, 0, ErrInvalidValue
3✔
1158
        }
3✔
1159
        if maxLen > MaxKeyLen {
27,620✔
1160
                return nil, 0, ErrMaxKeyLengthExceeded
1✔
1161
        }
1✔
1162

1163
        convVal, err := mayApplyImplicitConversion(val, colType)
27,618✔
1164
        if err != nil {
27,619✔
1165
                return nil, 0, err
1✔
1166
        }
1✔
1167

1168
        if convVal == nil {
27,722✔
1169
                return []byte{KeyValPrefixNull}, 0, nil
105✔
1170
        }
105✔
1171

1172
        switch colType {
27,512✔
1173
        case VarcharType:
2,204✔
1174
                {
4,408✔
1175
                        strVal, ok := convVal.(string)
2,204✔
1176
                        if !ok {
2,205✔
1177
                                return nil, 0, fmt.Errorf("value is not a string: %w", ErrInvalidValue)
1✔
1178
                        }
1✔
1179

1180
                        if len(strVal) > maxLen {
2,204✔
1181
                                return nil, 0, ErrMaxLengthExceeded
1✔
1182
                        }
1✔
1183

1184
                        // notnull + value + padding + len(value)
1185
                        encv := make([]byte, 1+maxLen+EncLenLen)
2,202✔
1186
                        encv[0] = KeyValPrefixNotNull
2,202✔
1187
                        copy(encv[1:], []byte(strVal))
2,202✔
1188
                        binary.BigEndian.PutUint32(encv[len(encv)-EncLenLen:], uint32(len(strVal)))
2,202✔
1189

2,202✔
1190
                        return encv, len(strVal), nil
2,202✔
1191
                }
1192
        case IntegerType:
21,910✔
1193
                {
43,820✔
1194
                        if maxLen != 8 {
21,911✔
1195
                                return nil, 0, ErrCorruptedData
1✔
1196
                        }
1✔
1197

1198
                        intVal, ok := convVal.(int64)
21,909✔
1199
                        if !ok {
21,912✔
1200
                                return nil, 0, fmt.Errorf("value is not an integer: %w", ErrInvalidValue)
3✔
1201
                        }
3✔
1202

1203
                        // v
1204
                        var encv [9]byte
21,906✔
1205
                        encv[0] = KeyValPrefixNotNull
21,906✔
1206
                        binary.BigEndian.PutUint64(encv[1:], uint64(intVal))
21,906✔
1207
                        // map to unsigned integer space for lexical sorting order
21,906✔
1208
                        encv[1] ^= 0x80
21,906✔
1209

21,906✔
1210
                        return encv[:], 8, nil
21,906✔
1211
                }
1212
        case BooleanType:
297✔
1213
                {
594✔
1214
                        if maxLen != 1 {
298✔
1215
                                return nil, 0, ErrCorruptedData
1✔
1216
                        }
1✔
1217

1218
                        boolVal, ok := convVal.(bool)
296✔
1219
                        if !ok {
297✔
1220
                                return nil, 0, fmt.Errorf("value is not a boolean: %w", ErrInvalidValue)
1✔
1221
                        }
1✔
1222

1223
                        // v
1224
                        var encv [2]byte
295✔
1225
                        encv[0] = KeyValPrefixNotNull
295✔
1226
                        if boolVal {
470✔
1227
                                encv[1] = 1
175✔
1228
                        }
175✔
1229

1230
                        return encv[:], 1, nil
295✔
1231
                }
1232
        case BLOBType:
1,834✔
1233
                {
3,668✔
1234
                        blobVal, ok := convVal.([]byte)
1,834✔
1235
                        if !ok {
1,835✔
1236
                                return nil, 0, fmt.Errorf("value is not a blob: %w", ErrInvalidValue)
1✔
1237
                        }
1✔
1238

1239
                        if len(blobVal) > maxLen {
1,834✔
1240
                                return nil, 0, ErrMaxLengthExceeded
1✔
1241
                        }
1✔
1242

1243
                        // notnull + value + padding + len(value)
1244
                        encv := make([]byte, 1+maxLen+EncLenLen)
1,832✔
1245
                        encv[0] = KeyValPrefixNotNull
1,832✔
1246
                        copy(encv[1:], []byte(blobVal))
1,832✔
1247
                        binary.BigEndian.PutUint32(encv[len(encv)-EncLenLen:], uint32(len(blobVal)))
1,832✔
1248

1,832✔
1249
                        return encv, len(blobVal), nil
1,832✔
1250
                }
1251
        case UUIDType:
20✔
1252
                {
40✔
1253
                        uuidVal, ok := convVal.(uuid.UUID)
20✔
1254
                        if !ok {
20✔
1255
                                return nil, 0, fmt.Errorf("value is not an UUID: %w", ErrInvalidValue)
×
1256
                        }
×
1257

1258
                        // notnull + value
1259
                        encv := make([]byte, 17)
20✔
1260
                        encv[0] = KeyValPrefixNotNull
20✔
1261
                        copy(encv[1:], uuidVal[:])
20✔
1262

20✔
1263
                        return encv, 16, nil
20✔
1264
                }
1265
        case TimestampType:
202✔
1266
                {
404✔
1267
                        if maxLen != 8 {
203✔
1268
                                return nil, 0, ErrCorruptedData
1✔
1269
                        }
1✔
1270

1271
                        timeVal, ok := convVal.(time.Time)
201✔
1272
                        if !ok {
202✔
1273
                                return nil, 0, fmt.Errorf("value is not a timestamp: %w", ErrInvalidValue)
1✔
1274
                        }
1✔
1275

1276
                        // v
1277
                        var encv [9]byte
200✔
1278
                        encv[0] = KeyValPrefixNotNull
200✔
1279
                        binary.BigEndian.PutUint64(encv[1:], uint64(timeVal.UnixNano()))
200✔
1280
                        // map to unsigned integer space for lexical sorting order
200✔
1281
                        encv[1] ^= 0x80
200✔
1282

200✔
1283
                        return encv[:], 8, nil
200✔
1284
                }
1285
        case Float64Type:
1,044✔
1286
                {
2,088✔
1287
                        floatVal, ok := convVal.(float64)
1,044✔
1288
                        if !ok {
1,044✔
1289
                                return nil, 0, fmt.Errorf("value is not a float: %w", ErrInvalidValue)
×
1290
                        }
×
1291

1292
                        // Apart form the sign bit, bit representation of float64
1293
                        // can be sorted lexicographically
1294
                        floatBits := math.Float64bits(floatVal)
1,044✔
1295

1,044✔
1296
                        var encv [9]byte
1,044✔
1297
                        encv[0] = KeyValPrefixNotNull
1,044✔
1298
                        binary.BigEndian.PutUint64(encv[1:], floatBits)
1,044✔
1299

1,044✔
1300
                        if encv[1]&0x80 != 0 {
1,060✔
1301
                                // For negative numbers, the order must be reversed,
16✔
1302
                                // we also negate the sign bit so that all negative
16✔
1303
                                // numbers end up in the smaller half of values
16✔
1304
                                for i := 1; i < 9; i++ {
144✔
1305
                                        encv[i] = ^encv[i]
128✔
1306
                                }
128✔
1307
                        } else {
1,028✔
1308
                                // For positive numbers, the order is already correct,
1,028✔
1309
                                // we only have to set the sign bit to 1 to ensure that
1,028✔
1310
                                // positive numbers end in the larger half of values
1,028✔
1311
                                encv[1] ^= 0x80
1,028✔
1312
                        }
1,028✔
1313

1314
                        return encv[:], 8, nil
1,044✔
1315
                }
1316
        }
1317

1318
        return nil, 0, ErrInvalidValue
1✔
1319
}
1320

1321
func getEncodeRawValue(val TypedValue, colType SQLValueType) (interface{}, error) {
31,898✔
1322
        if colType != JSONType || val.Type() == JSONType {
63,564✔
1323
                return val.RawValue(), nil
31,666✔
1324
        }
31,666✔
1325

1326
        if val.Type() != VarcharType {
233✔
1327
                return nil, fmt.Errorf("%w: invalid json value", ErrInvalidValue)
1✔
1328
        }
1✔
1329
        s, _ := val.RawValue().(string)
231✔
1330

231✔
1331
        raw := json.RawMessage(s)
231✔
1332
        if !json.Valid(raw) {
232✔
1333
                return nil, fmt.Errorf("%w: invalid json value", ErrInvalidValue)
1✔
1334
        }
1✔
1335
        return raw, nil
230✔
1336
}
1337

1338
func EncodeValue(val TypedValue, colType SQLValueType, maxLen int) ([]byte, error) {
7,501✔
1339
        v, err := getEncodeRawValue(val, colType)
7,501✔
1340
        if err != nil {
7,503✔
1341
                return nil, err
2✔
1342
        }
2✔
1343
        return EncodeRawValue(v, colType, maxLen, false)
7,499✔
1344
}
1345

1346
func EncodeNullableValue(val TypedValue, colType SQLValueType, maxLen int) ([]byte, error) {
24,397✔
1347
        v, err := getEncodeRawValue(val, colType)
24,397✔
1348
        if err != nil {
24,397✔
1349
                return nil, err
×
1350
        }
×
1351
        return EncodeRawValue(v, colType, maxLen, true)
24,397✔
1352
}
1353

1354
// EncodeRawValue encode a value in a byte format. This is the internal binary representation of a value. Can be decoded with DecodeValue.
1355
func EncodeRawValue(val interface{}, colType SQLValueType, maxLen int, nullable bool) ([]byte, error) {
31,896✔
1356
        convVal, err := mayApplyImplicitConversion(val, colType)
31,896✔
1357
        if err != nil {
31,896✔
1358
                return nil, err
×
1359
        }
×
1360

1361
        if convVal == nil && !nullable {
31,896✔
1362
                return nil, ErrInvalidValue
×
1363
        }
×
1364

1365
        if convVal == nil {
31,904✔
1366
                encv := make([]byte, EncLenLen)
8✔
1367
                binary.BigEndian.PutUint32(encv[:], uint32(0))
8✔
1368
                return encv, nil
8✔
1369
        }
8✔
1370

1371
        switch colType {
31,888✔
1372
        case VarcharType:
6,569✔
1373
                {
13,138✔
1374
                        strVal, ok := convVal.(string)
6,569✔
1375
                        if !ok {
6,573✔
1376
                                return nil, fmt.Errorf("value is not a string: %w", ErrInvalidValue)
4✔
1377
                        }
4✔
1378

1379
                        if maxLen > 0 && len(strVal) > maxLen {
6,568✔
1380
                                return nil, ErrMaxLengthExceeded
3✔
1381
                        }
3✔
1382

1383
                        // len(v) + v
1384
                        encv := make([]byte, EncLenLen+len(strVal))
6,562✔
1385
                        binary.BigEndian.PutUint32(encv[:], uint32(len(strVal)))
6,562✔
1386
                        copy(encv[EncLenLen:], []byte(strVal))
6,562✔
1387

6,562✔
1388
                        return encv, nil
6,562✔
1389
                }
1390
        case IntegerType:
13,587✔
1391
                {
27,174✔
1392
                        intVal, ok := convVal.(int64)
13,587✔
1393
                        if !ok {
13,589✔
1394
                                return nil, fmt.Errorf("value is not an integer: %w", ErrInvalidValue)
2✔
1395
                        }
2✔
1396

1397
                        // map to unsigned integer space
1398
                        // len(v) + v
1399
                        var encv [EncLenLen + 8]byte
13,585✔
1400
                        binary.BigEndian.PutUint32(encv[:], uint32(8))
13,585✔
1401
                        binary.BigEndian.PutUint64(encv[EncLenLen:], uint64(intVal))
13,585✔
1402

13,585✔
1403
                        return encv[:], nil
13,585✔
1404
                }
1405
        case BooleanType:
253✔
1406
                {
506✔
1407
                        boolVal, ok := convVal.(bool)
253✔
1408
                        if !ok {
256✔
1409
                                return nil, fmt.Errorf("value is not a boolean: %w", ErrInvalidValue)
3✔
1410
                        }
3✔
1411

1412
                        // len(v) + v
1413
                        var encv [EncLenLen + 1]byte
250✔
1414
                        binary.BigEndian.PutUint32(encv[:], uint32(1))
250✔
1415
                        if boolVal {
397✔
1416
                                encv[EncLenLen] = 1
147✔
1417
                        }
147✔
1418

1419
                        return encv[:], nil
250✔
1420
                }
1421
        case BLOBType:
413✔
1422
                {
826✔
1423
                        var blobVal []byte
413✔
1424

413✔
1425
                        if val != nil {
826✔
1426
                                v, ok := convVal.([]byte)
413✔
1427
                                if !ok {
416✔
1428
                                        return nil, fmt.Errorf("value is not a blob: %w", ErrInvalidValue)
3✔
1429
                                }
3✔
1430
                                blobVal = v
410✔
1431
                        }
1432

1433
                        if maxLen > 0 && len(blobVal) > maxLen {
413✔
1434
                                return nil, ErrMaxLengthExceeded
3✔
1435
                        }
3✔
1436

1437
                        // len(v) + v
1438
                        encv := make([]byte, EncLenLen+len(blobVal))
407✔
1439
                        binary.BigEndian.PutUint32(encv[:], uint32(len(blobVal)))
407✔
1440
                        copy(encv[EncLenLen:], blobVal)
407✔
1441

407✔
1442
                        return encv[:], nil
407✔
1443
                }
1444
        case JSONType:
332✔
1445
                rawJson, ok := val.(json.RawMessage)
332✔
1446
                if !ok {
434✔
1447
                        data, err := json.Marshal(val)
102✔
1448
                        if err != nil {
102✔
1449
                                return nil, err
×
1450
                        }
×
1451
                        rawJson = data
102✔
1452
                }
1453

1454
                // len(v) + v
1455
                encv := make([]byte, EncLenLen+len(rawJson))
332✔
1456
                binary.BigEndian.PutUint32(encv[:], uint32(len(rawJson)))
332✔
1457
                copy(encv[EncLenLen:], rawJson)
332✔
1458

332✔
1459
                return encv[:], nil
332✔
1460
        case UUIDType:
11✔
1461
                {
22✔
1462
                        uuidVal, ok := convVal.(uuid.UUID)
11✔
1463
                        if !ok {
11✔
1464
                                return nil, fmt.Errorf("value is not an UUID: %w", ErrInvalidValue)
×
1465
                        }
×
1466

1467
                        // len(v) + v
1468
                        var encv [EncLenLen + 16]byte
11✔
1469
                        binary.BigEndian.PutUint32(encv[:], uint32(16))
11✔
1470
                        copy(encv[EncLenLen:], uuidVal[:])
11✔
1471

11✔
1472
                        return encv[:], nil
11✔
1473
                }
1474
        case TimestampType:
5,365✔
1475
                {
10,730✔
1476
                        timeVal, ok := convVal.(time.Time)
5,365✔
1477
                        if !ok {
5,366✔
1478
                                return nil, fmt.Errorf("value is not a timestamp: %w", ErrInvalidValue)
1✔
1479
                        }
1✔
1480

1481
                        // len(v) + v
1482
                        var encv [EncLenLen + 8]byte
5,364✔
1483
                        binary.BigEndian.PutUint32(encv[:], uint32(8))
5,364✔
1484
                        binary.BigEndian.PutUint64(encv[EncLenLen:], uint64(TimeToInt64(timeVal)))
5,364✔
1485

5,364✔
1486
                        return encv[:], nil
5,364✔
1487
                }
1488
        case Float64Type:
5,356✔
1489
                {
10,712✔
1490
                        floatVal, ok := convVal.(float64)
5,356✔
1491
                        if !ok {
5,356✔
1492
                                return nil, fmt.Errorf("value is not a float: %w", ErrInvalidValue)
×
1493
                        }
×
1494

1495
                        var encv [EncLenLen + 8]byte
5,356✔
1496
                        floatBits := math.Float64bits(floatVal)
5,356✔
1497
                        binary.BigEndian.PutUint32(encv[:], uint32(8))
5,356✔
1498
                        binary.BigEndian.PutUint64(encv[EncLenLen:], floatBits)
5,356✔
1499

5,356✔
1500
                        return encv[:], nil
5,356✔
1501
                }
1502
        }
1503

1504
        return nil, ErrInvalidValue
2✔
1505
}
1506

1507
func DecodeValueLength(b []byte) (int, int, error) {
297,358✔
1508
        if len(b) < EncLenLen {
297,360✔
1509
                return 0, 0, ErrCorruptedData
2✔
1510
        }
2✔
1511

1512
        vlen := int(binary.BigEndian.Uint32(b[:]))
297,356✔
1513
        voff := EncLenLen
297,356✔
1514

297,356✔
1515
        if vlen < 0 || len(b) < voff+vlen {
297,359✔
1516
                return 0, 0, ErrCorruptedData
3✔
1517
        }
3✔
1518

1519
        return vlen, EncLenLen, nil
297,353✔
1520
}
1521

1522
func DecodeValue(b []byte, colType SQLValueType) (TypedValue, int, error) {
117,607✔
1523
        return decodeValue(b, colType, false)
117,607✔
1524
}
117,607✔
1525

1526
func DecodeNullableValue(b []byte, colType SQLValueType) (TypedValue, int, error) {
179,724✔
1527
        return decodeValue(b, colType, true)
179,724✔
1528
}
179,724✔
1529

1530
func decodeValue(b []byte, colType SQLValueType, nullable bool) (TypedValue, int, error) {
297,331✔
1531
        vlen, voff, err := DecodeValueLength(b)
297,331✔
1532
        if err != nil {
297,335✔
1533
                return nil, 0, err
4✔
1534
        }
4✔
1535

1536
        if vlen == 0 && nullable {
297,367✔
1537
                return &NullValue{t: colType}, voff, nil
40✔
1538
        }
40✔
1539

1540
        switch colType {
297,287✔
1541
        case VarcharType:
61,976✔
1542
                {
123,952✔
1543
                        v := string(b[voff : voff+vlen])
61,976✔
1544
                        voff += vlen
61,976✔
1545

61,976✔
1546
                        return &Varchar{val: v}, voff, nil
61,976✔
1547
                }
61,976✔
1548
        case IntegerType:
125,315✔
1549
                {
250,630✔
1550
                        if vlen != 8 {
125,316✔
1551
                                return nil, 0, ErrCorruptedData
1✔
1552
                        }
1✔
1553

1554
                        v := binary.BigEndian.Uint64(b[voff:])
125,314✔
1555
                        voff += vlen
125,314✔
1556

125,314✔
1557
                        return &Integer{val: int64(v)}, voff, nil
125,314✔
1558
                }
1559
        case BooleanType:
2,398✔
1560
                {
4,796✔
1561
                        if vlen != 1 {
2,400✔
1562
                                return nil, 0, ErrCorruptedData
2✔
1563
                        }
2✔
1564

1565
                        v := b[voff] == 1
2,396✔
1566
                        voff += 1
2,396✔
1567

2,396✔
1568
                        return &Bool{val: v}, voff, nil
2,396✔
1569
                }
1570
        case BLOBType:
3,434✔
1571
                {
6,868✔
1572
                        v := b[voff : voff+vlen]
3,434✔
1573
                        voff += vlen
3,434✔
1574

3,434✔
1575
                        return &Blob{val: v}, voff, nil
3,434✔
1576
                }
3,434✔
1577
        case JSONType:
1,903✔
1578
                {
3,806✔
1579
                        v := b[voff : voff+vlen]
1,903✔
1580
                        voff += vlen
1,903✔
1581

1,903✔
1582
                        var val interface{}
1,903✔
1583
                        err = json.Unmarshal(v, &val)
1,903✔
1584

1,903✔
1585
                        return &JSON{val: val}, voff, err
1,903✔
1586
                }
1,903✔
1587
        case UUIDType:
36✔
1588
                {
72✔
1589
                        if vlen != 16 {
36✔
1590
                                return nil, 0, ErrCorruptedData
×
1591
                        }
×
1592

1593
                        u, err := uuid.FromBytes(b[voff : voff+16])
36✔
1594
                        if err != nil {
36✔
1595
                                return nil, 0, fmt.Errorf("%w: %s", ErrCorruptedData, err.Error())
×
1596
                        }
×
1597

1598
                        voff += vlen
36✔
1599

36✔
1600
                        return &UUID{val: u}, voff, nil
36✔
1601
                }
1602
        case TimestampType:
51,099✔
1603
                {
102,198✔
1604
                        if vlen != 8 {
51,100✔
1605
                                return nil, 0, ErrCorruptedData
1✔
1606
                        }
1✔
1607

1608
                        v := binary.BigEndian.Uint64(b[voff:])
51,098✔
1609
                        voff += vlen
51,098✔
1610

51,098✔
1611
                        return &Timestamp{val: TimeFromInt64(int64(v))}, voff, nil
51,098✔
1612
                }
1613
        case Float64Type:
51,125✔
1614
                {
102,250✔
1615
                        if vlen != 8 {
51,125✔
1616
                                return nil, 0, ErrCorruptedData
×
1617
                        }
×
1618
                        v := binary.BigEndian.Uint64(b[voff:])
51,125✔
1619
                        voff += vlen
51,125✔
1620
                        return &Float64{val: math.Float64frombits(v)}, voff, nil
51,125✔
1621
                }
1622
        }
1623

1624
        return nil, 0, ErrCorruptedData
1✔
1625
}
1626

1627
// addSchemaToTx adds the schema of the catalog to the given transaction.
1628
func (catlg *Catalog) addSchemaToTx(ctx context.Context, tx *store.OngoingTx) error {
27✔
1629
        return catlg.loadCatalog(ctx, tx, true)
27✔
1630
}
27✔
1631

1632
func iteratePrefix(ctx context.Context, tx *store.OngoingTx, prefix []byte, onSpec func(key, value []byte, deleted bool) error) error {
13,754✔
1633
        dbReaderSpec := store.KeyReaderSpec{
13,754✔
1634
                Prefix: prefix,
13,754✔
1635
        }
13,754✔
1636

13,754✔
1637
        colSpecReader, err := tx.NewKeyReader(dbReaderSpec)
13,754✔
1638
        if err != nil {
13,755✔
1639
                return err
1✔
1640
        }
1✔
1641
        defer colSpecReader.Close()
13,753✔
1642

13,753✔
1643
        for {
48,353✔
1644
                mkey, vref, err := colSpecReader.Read(ctx)
34,600✔
1645
                if errors.Is(err, store.ErrNoMoreEntries) {
48,343✔
1646
                        break
13,743✔
1647
                }
1648
                if err != nil {
20,857✔
NEW
1649
                        return err
×
1650
                }
×
1651

1652
                md := vref.KVMetadata()
20,857✔
1653
                if md != nil && md.IsExpirable() {
20,858✔
1654
                        return ErrBrokenCatalogColSpecExpirable
1✔
1655
                }
1✔
1656

1657
                deleted := md != nil && md.Deleted()
20,856✔
1658
                var v []byte
20,856✔
1659
                if !deleted {
41,572✔
1660
                        v, err = vref.Resolve()
20,716✔
1661
                        if err != nil {
20,716✔
NEW
1662
                                return err
×
NEW
1663
                        }
×
1664
                }
1665

1666
                err = onSpec(mkey, v, deleted)
20,856✔
1667
                if err != nil {
20,865✔
1668
                        return err
9✔
1669
                }
9✔
1670
        }
1671
        return nil
13,743✔
1672
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc