• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6783902372

07 Nov 2023 11:34AM UTC coverage: 89.548% (-0.02%) from 89.571%
6783902372

push

gh-ci

jeroiraz
test(pkg/pgsql): unit testing for deallocate stmt

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

33645 of 37572 relevant lines covered (89.55%)

146027.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.39
/embedded/sql/catalog.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "bytes"
21
        "context"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "io"
26
        "math"
27
        "strings"
28
        "time"
29

30
        "github.com/codenotary/immudb/embedded/store"
31
        "github.com/google/uuid"
32
)
33

34
// Catalog represents a database catalog containing metadata for all tables in the database.
35
type Catalog struct {
36
        enginePrefix []byte
37

38
        tables       []*Table
39
        tablesByID   map[uint32]*Table
40
        tablesByName map[string]*Table
41

42
        maxTableID uint32 // The maxTableID variable is used to assign unique ids to new tables as they are created.
43
}
44

45
type Table struct {
46
        catalog         *Catalog
47
        id              uint32
48
        name            string
49
        cols            []*Column
50
        colsByID        map[uint32]*Column
51
        colsByName      map[string]*Column
52
        indexes         []*Index
53
        indexesByName   map[string]*Index
54
        indexesByColID  map[uint32][]*Index
55
        primaryIndex    *Index
56
        autoIncrementPK bool
57
        maxPK           int64
58

59
        maxColID   uint32
60
        maxIndexID uint32
61
}
62

63
type Index struct {
64
        table    *Table
65
        id       uint32
66
        unique   bool
67
        cols     []*Column
68
        colsByID map[uint32]*Column
69
}
70

71
type Column struct {
72
        table         *Table
73
        id            uint32
74
        colName       string
75
        colType       SQLValueType
76
        maxLen        int
77
        autoIncrement bool
78
        notNull       bool
79
}
80

81
func newCatalog(enginePrefix []byte) *Catalog {
1,932✔
82
        ctlg := &Catalog{
1,932✔
83
                enginePrefix: enginePrefix,
1,932✔
84
                tablesByID:   make(map[uint32]*Table),
1,932✔
85
                tablesByName: make(map[string]*Table),
1,932✔
86
        }
1,932✔
87

1,932✔
88
        pgTypeTable := &Table{
1,932✔
89
                catalog: ctlg,
1,932✔
90
                name:    "pg_type",
1,932✔
91
                cols: []*Column{
1,932✔
92
                        {
1,932✔
93
                                colName: "oid",
1,932✔
94
                                colType: VarcharType,
1,932✔
95
                                maxLen:  10,
1,932✔
96
                        },
1,932✔
97
                        {
1,932✔
98
                                colName: "typbasetype",
1,932✔
99
                                colType: VarcharType,
1,932✔
100
                                maxLen:  10,
1,932✔
101
                        },
1,932✔
102
                        {
1,932✔
103
                                colName: "typname",
1,932✔
104
                                colType: VarcharType,
1,932✔
105
                                maxLen:  50,
1,932✔
106
                        },
1,932✔
107
                },
1,932✔
108
        }
1,932✔
109

1,932✔
110
        pgTypeTable.colsByName = make(map[string]*Column, len(pgTypeTable.cols))
1,932✔
111

1,932✔
112
        for _, col := range pgTypeTable.cols {
7,728✔
113
                pgTypeTable.colsByName[col.colName] = col
5,796✔
114
        }
5,796✔
115

116
        pgTypeTable.indexes = []*Index{
1,932✔
117
                {
1,932✔
118
                        unique: true,
1,932✔
119
                        cols: []*Column{
1,932✔
120
                                pgTypeTable.colsByName["oid"],
1,932✔
121
                        },
1,932✔
122
                        colsByID: map[uint32]*Column{
1,932✔
123
                                0: pgTypeTable.colsByName["oid"],
1,932✔
124
                        },
1,932✔
125
                },
1,932✔
126
        }
1,932✔
127

1,932✔
128
        pgTypeTable.primaryIndex = pgTypeTable.indexes[0]
1,932✔
129
        ctlg.tablesByName[pgTypeTable.name] = pgTypeTable
1,932✔
130

1,932✔
131
        return ctlg
1,932✔
132
}
133

134
func (catlg *Catalog) ExistTable(table string) bool {
2,523✔
135
        _, exists := catlg.tablesByName[table]
2,523✔
136
        return exists
2,523✔
137
}
2,523✔
138

139
func (catlg *Catalog) GetTables() []*Table {
1,916✔
140
        ts := make([]*Table, 0, len(catlg.tables))
1,916✔
141

1,916✔
142
        ts = append(ts, catlg.tables...)
1,916✔
143

1,916✔
144
        return ts
1,916✔
145
}
1,916✔
146

147
func (catlg *Catalog) GetTableByName(name string) (*Table, error) {
2,755✔
148
        table, exists := catlg.tablesByName[name]
2,755✔
149
        if !exists {
2,788✔
150
                return nil, fmt.Errorf("%w (%s)", ErrTableDoesNotExist, name)
33✔
151
        }
33✔
152
        return table, nil
2,722✔
153
}
154

155
func (catlg *Catalog) GetTableByID(id uint32) (*Table, error) {
2,485✔
156
        table, exists := catlg.tablesByID[id]
2,485✔
157
        if !exists {
4,968✔
158
                return nil, ErrTableDoesNotExist
2,483✔
159
        }
2,483✔
160
        return table, nil
2✔
161
}
162

163
func (t *Table) ID() uint32 {
175✔
164
        return t.id
175✔
165
}
175✔
166

167
func (t *Table) Cols() []*Column {
747✔
168
        cs := make([]*Column, 0, len(t.cols))
747✔
169

747✔
170
        cs = append(cs, t.cols...)
747✔
171

747✔
172
        return cs
747✔
173
}
747✔
174

175
func (t *Table) ColsByName() map[string]*Column {
50✔
176
        cs := make(map[string]*Column, len(t.cols))
50✔
177

50✔
178
        for _, c := range t.cols {
201✔
179
                cs[c.colName] = c
151✔
180
        }
151✔
181

182
        return cs
50✔
183
}
184

185
func (t *Table) Name() string {
257✔
186
        return t.name
257✔
187
}
257✔
188

189
func (t *Table) PrimaryIndex() *Index {
427✔
190
        return t.primaryIndex
427✔
191
}
427✔
192

193
func (t *Table) IsIndexed(colName string) (indexed bool, err error) {
127✔
194
        col, err := t.GetColumnByName(colName)
127✔
195
        if err != nil {
129✔
196
                return false, err
2✔
197
        }
2✔
198

199
        return len(t.indexesByColID[col.id]) > 0, nil
125✔
200
}
201

202
func (t *Table) GetColumnByName(name string) (*Column, error) {
3,037✔
203
        col, exists := t.colsByName[name]
3,037✔
204
        if !exists {
3,054✔
205
                return nil, fmt.Errorf("%w (%s)", ErrColumnDoesNotExist, name)
17✔
206
        }
17✔
207
        return col, nil
3,020✔
208
}
209

210
func (t *Table) GetColumnByID(id uint32) (*Column, error) {
22,300✔
211
        col, exists := t.colsByID[id]
22,300✔
212
        if !exists {
22,407✔
213
                return nil, ErrColumnDoesNotExist
107✔
214
        }
107✔
215
        return col, nil
22,193✔
216
}
217

218
func (t *Table) ColumnsByID() map[uint32]*Column {
6✔
219
        return t.colsByID
6✔
220
}
6✔
221

222
func (t *Table) GetIndexes() []*Index {
25✔
223
        idxs := make([]*Index, 0, len(t.indexes))
25✔
224

25✔
225
        idxs = append(idxs, t.indexes...)
25✔
226

25✔
227
        return idxs
25✔
228
}
25✔
229

230
func (t *Table) GetIndexesByColID(colID uint32) []*Index {
11✔
231
        idxs := make([]*Index, 0, len(t.indexes))
11✔
232

11✔
233
        idxs = append(idxs, t.indexesByColID[colID]...)
11✔
234

11✔
235
        return idxs
11✔
236
}
11✔
237

238
func (t *Table) GetMaxColID() uint32 {
50✔
239
        return t.maxColID
50✔
240
}
50✔
241

242
func (i *Index) IsPrimary() bool {
5,803✔
243
        return i.id == PKIndexID
5,803✔
244
}
5,803✔
245

246
func (i *Index) IsUnique() bool {
1,204✔
247
        return i.unique
1,204✔
248
}
1,204✔
249

250
func (i *Index) Cols() []*Column {
510✔
251
        return i.cols
510✔
252
}
510✔
253

254
func (i *Index) IncludesCol(colID uint32) bool {
34✔
255
        _, ok := i.colsByID[colID]
34✔
256
        return ok
34✔
257
}
34✔
258

259
func (i *Index) enginePrefix() []byte {
3,595✔
260
        return i.table.catalog.enginePrefix
3,595✔
261
}
3,595✔
262

263
func (i *Index) sortableUsing(colID uint32, rangesByColID map[uint32]*typedValueRange) bool {
104✔
264
        // all columns before colID must be fixedValues otherwise the index can not be used
104✔
265
        for _, col := range i.cols {
215✔
266
                if col.id == colID {
207✔
267
                        return true
96✔
268
                }
96✔
269

270
                colRange, ok := rangesByColID[col.id]
15✔
271
                if ok && colRange.unitary() {
22✔
272
                        continue
7✔
273
                }
274

275
                return false
8✔
276
        }
277
        return false
×
278
}
279

280
func (i *Index) Name() string {
8,347✔
281
        return indexName(i.table.name, i.cols)
8,347✔
282
}
8,347✔
283

284
func (i *Index) ID() uint32 {
17✔
285
        return i.id
17✔
286
}
17✔
287

288
func (t *Table) GetIndexByName(name string) (*Index, error) {
31✔
289
        idx, exists := t.indexesByName[name]
31✔
290
        if !exists {
32✔
291
                return nil, fmt.Errorf("%w (%s)", ErrIndexNotFound, name)
1✔
292
        }
1✔
293
        return idx, nil
30✔
294
}
295

296
func indexName(tableName string, cols []*Column) string {
8,377✔
297
        var buf strings.Builder
8,377✔
298

8,377✔
299
        buf.WriteString(tableName)
8,377✔
300

8,377✔
301
        buf.WriteString("(")
8,377✔
302

8,377✔
303
        for c, col := range cols {
17,203✔
304
                buf.WriteString(col.colName)
8,826✔
305

8,826✔
306
                if c < len(cols)-1 {
9,275✔
307
                        buf.WriteString(",")
449✔
308
                }
449✔
309
        }
310

311
        buf.WriteString(")")
8,377✔
312

8,377✔
313
        return buf.String()
8,377✔
314
}
315

316
func (catlg *Catalog) newTable(name string, colsSpec map[uint32]*ColSpec, maxColID uint32) (table *Table, err error) {
2,491✔
317
        if len(name) == 0 || len(colsSpec) == 0 {
2,494✔
318
                return nil, ErrIllegalArguments
3✔
319
        }
3✔
320

321
        for id := range colsSpec {
9,803✔
322
                if id <= 0 || id > maxColID {
7,315✔
323
                        return nil, ErrIllegalArguments
×
324
                }
×
325
        }
326

327
        exists := catlg.ExistTable(name)
2,488✔
328
        if exists {
2,495✔
329
                return nil, fmt.Errorf("%w (%s)", ErrTableAlreadyExists, name)
7✔
330
        }
7✔
331

332
        // Generate a new ID for the table by incrementing the 'maxTableID' variable of the 'catalog' instance.
333
        id := (catlg.maxTableID + 1)
2,481✔
334

2,481✔
335
        // This code is attempting to check if a table with the given id already exists in the Catalog.
2,481✔
336
        // If the function returns nil for err, it means that the table already exists and the function
2,481✔
337
        // should return an error indicating that the table cannot be created again.
2,481✔
338
        _, err = catlg.GetTableByID(id)
2,481✔
339
        if err == nil {
2,481✔
340
                return nil, fmt.Errorf("%w (%d)", ErrTableAlreadyExists, id)
×
341
        }
×
342

343
        table = &Table{
2,481✔
344
                id:             id,
2,481✔
345
                catalog:        catlg,
2,481✔
346
                name:           name,
2,481✔
347
                cols:           make([]*Column, 0, len(colsSpec)),
2,481✔
348
                colsByID:       make(map[uint32]*Column),
2,481✔
349
                colsByName:     make(map[string]*Column),
2,481✔
350
                indexesByName:  make(map[string]*Index),
2,481✔
351
                indexesByColID: make(map[uint32][]*Index),
2,481✔
352
                maxColID:       maxColID,
2,481✔
353
        }
2,481✔
354

2,481✔
355
        for id := uint32(1); id <= maxColID; id++ {
9,849✔
356
                cs, found := colsSpec[id]
7,368✔
357
                if !found {
7,432✔
358
                        // dropped column
64✔
359
                        continue
64✔
360
                }
361

362
                if cs.colName == revCol {
7,305✔
363
                        return nil, fmt.Errorf("%w(%s)", ErrReservedWord, revCol)
1✔
364
                }
1✔
365

366
                _, colExists := table.colsByName[cs.colName]
7,303✔
367
                if colExists {
7,304✔
368
                        return nil, ErrDuplicatedColumn
1✔
369
                }
1✔
370

371
                if cs.autoIncrement && cs.colType != IntegerType {
7,304✔
372
                        return nil, ErrLimitedAutoIncrement
2✔
373
                }
2✔
374

375
                if !validMaxLenForType(cs.maxLen, cs.colType) {
7,300✔
376
                        return nil, ErrLimitedMaxLen
×
377
                }
×
378

379
                col := &Column{
7,300✔
380
                        id:            uint32(id),
7,300✔
381
                        table:         table,
7,300✔
382
                        colName:       cs.colName,
7,300✔
383
                        colType:       cs.colType,
7,300✔
384
                        maxLen:        cs.maxLen,
7,300✔
385
                        autoIncrement: cs.autoIncrement,
7,300✔
386
                        notNull:       cs.notNull,
7,300✔
387
                }
7,300✔
388

7,300✔
389
                table.cols = append(table.cols, col)
7,300✔
390
                table.colsByID[col.id] = col
7,300✔
391
                table.colsByName[col.colName] = col
7,300✔
392
        }
393

394
        catlg.tables = append(catlg.tables, table)
2,477✔
395
        catlg.tablesByID[table.id] = table
2,477✔
396
        catlg.tablesByName[table.name] = table
2,477✔
397

2,477✔
398
        // increment table count on successfull table creation.
2,477✔
399
        // This ensures that each new table is assigned a unique ID
2,477✔
400
        // that has not been used before.
2,477✔
401
        catlg.maxTableID++
2,477✔
402

2,477✔
403
        return table, nil
2,477✔
404
}
405

406
func (catlg *Catalog) deleteTable(table *Table) error {
6✔
407
        _, exists := catlg.tablesByID[table.id]
6✔
408
        if !exists {
6✔
409
                return ErrTableDoesNotExist
×
410
        }
×
411

412
        newTables := make([]*Table, 0, len(catlg.tables)-1)
6✔
413

6✔
414
        for _, t := range catlg.tables {
24✔
415
                if t.id != table.id {
30✔
416
                        newTables = append(newTables, t)
12✔
417
                }
12✔
418
        }
419

420
        catlg.tables = newTables
6✔
421
        delete(catlg.tablesByID, table.id)
6✔
422
        delete(catlg.tablesByName, table.name)
6✔
423

6✔
424
        return nil
6✔
425
}
426

427
func (t *Table) newIndex(unique bool, colIDs []uint32) (index *Index, err error) {
4,174✔
428
        if len(colIDs) < 1 {
4,175✔
429
                return nil, ErrIllegalArguments
1✔
430
        }
1✔
431

432
        // validate column ids
433
        cols := make([]*Column, len(colIDs))
4,173✔
434
        colsByID := make(map[uint32]*Column, len(colIDs))
4,173✔
435

4,173✔
436
        for i, colID := range colIDs {
8,565✔
437
                col, err := t.GetColumnByID(colID)
4,392✔
438
                if err != nil {
4,393✔
439
                        return nil, err
1✔
440
                }
1✔
441

442
                _, ok := colsByID[colID]
4,391✔
443
                if ok {
4,392✔
444
                        return nil, ErrDuplicatedColumn
1✔
445
                }
1✔
446

447
                cols[i] = col
4,390✔
448
                colsByID[colID] = col
4,390✔
449
        }
450

451
        index = &Index{
4,171✔
452
                id:       uint32(t.maxIndexID),
4,171✔
453
                table:    t,
4,171✔
454
                unique:   unique,
4,171✔
455
                cols:     cols,
4,171✔
456
                colsByID: colsByID,
4,171✔
457
        }
4,171✔
458

4,171✔
459
        _, exists := t.indexesByName[index.Name()]
4,171✔
460
        if exists {
4,177✔
461
                return nil, ErrIndexAlreadyExists
6✔
462
        }
6✔
463

464
        t.indexes = append(t.indexes, index)
4,165✔
465
        t.indexesByName[index.Name()] = index
4,165✔
466

4,165✔
467
        // having a direct way to get the indexes by colID
4,165✔
468
        for _, col := range index.cols {
8,547✔
469
                t.indexesByColID[col.id] = append(t.indexesByColID[col.id], index)
4,382✔
470
        }
4,382✔
471

472
        if index.id == PKIndexID {
6,625✔
473
                t.primaryIndex = index
2,460✔
474
                t.autoIncrementPK = len(index.cols) == 1 && index.cols[0].autoIncrement
2,460✔
475
        }
2,460✔
476

477
        // increment table count on successfull table creation.
478
        // This ensures that each new table is assigned a unique ID
479
        // that has not been used before.
480
        t.maxIndexID++
4,165✔
481

4,165✔
482
        return index, nil
4,165✔
483
}
484

485
func (t *Table) newColumn(spec *ColSpec) (*Column, error) {
19✔
486
        if spec.colName == revCol {
20✔
487
                return nil, fmt.Errorf("%w(%s)", ErrReservedWord, revCol)
1✔
488
        }
1✔
489

490
        if spec.autoIncrement {
19✔
491
                return nil, fmt.Errorf("%w (%s)", ErrLimitedAutoIncrement, spec.colName)
1✔
492
        }
1✔
493

494
        if spec.notNull {
18✔
495
                return nil, fmt.Errorf("%w (%s)", ErrNewColumnMustBeNullable, spec.colName)
1✔
496
        }
1✔
497

498
        if !validMaxLenForType(spec.maxLen, spec.colType) {
17✔
499
                return nil, fmt.Errorf("%w (%s)", ErrLimitedMaxLen, spec.colName)
1✔
500
        }
1✔
501

502
        _, exists := t.colsByName[spec.colName]
15✔
503
        if exists {
18✔
504
                return nil, fmt.Errorf("%w (%s)", ErrColumnAlreadyExists, spec.colName)
3✔
505
        }
3✔
506

507
        t.maxColID++
12✔
508

12✔
509
        col := &Column{
12✔
510
                id:            t.maxColID,
12✔
511
                table:         t,
12✔
512
                colName:       spec.colName,
12✔
513
                colType:       spec.colType,
12✔
514
                maxLen:        spec.maxLen,
12✔
515
                autoIncrement: spec.autoIncrement,
12✔
516
                notNull:       spec.notNull,
12✔
517
        }
12✔
518

12✔
519
        t.cols = append(t.cols, col)
12✔
520
        t.colsByID[col.id] = col
12✔
521
        t.colsByName[col.colName] = col
12✔
522

12✔
523
        return col, nil
12✔
524
}
525

526
func (ctlg *Catalog) renameTable(oldName, newName string) (*Table, error) {
5✔
527
        if oldName == newName {
6✔
528
                return nil, fmt.Errorf("%w (%s)", ErrSameOldAndNewNames, oldName)
1✔
529
        }
1✔
530

531
        t, err := ctlg.GetTableByName(oldName)
4✔
532
        if err != nil {
5✔
533
                return nil, err
1✔
534
        }
1✔
535

536
        _, err = ctlg.GetTableByName(newName)
3✔
537
        if err == nil {
4✔
538
                return nil, fmt.Errorf("%w (%s)", ErrTableAlreadyExists, newName)
1✔
539
        }
1✔
540

541
        t.name = newName
2✔
542

2✔
543
        delete(ctlg.tablesByName, oldName)
2✔
544
        ctlg.tablesByName[newName] = t
2✔
545

2✔
546
        return t, nil
2✔
547
}
548

549
func (t *Table) renameColumn(oldName, newName string) (*Column, error) {
9✔
550
        if newName == revCol {
9✔
551
                return nil, fmt.Errorf("%w(%s)", ErrReservedWord, revCol)
×
552
        }
×
553

554
        if oldName == newName {
10✔
555
                return nil, fmt.Errorf("%w (%s)", ErrSameOldAndNewNames, oldName)
1✔
556
        }
1✔
557

558
        col, exists := t.colsByName[oldName]
8✔
559
        if !exists {
9✔
560
                return nil, fmt.Errorf("%w (%s)", ErrColumnDoesNotExist, oldName)
1✔
561
        }
1✔
562

563
        _, exists = t.colsByName[newName]
7✔
564
        if exists {
8✔
565
                return nil, fmt.Errorf("%w (%s)", ErrColumnAlreadyExists, newName)
1✔
566
        }
1✔
567

568
        col.colName = newName
6✔
569

6✔
570
        delete(t.colsByName, oldName)
6✔
571
        t.colsByName[newName] = col
6✔
572

6✔
573
        return col, nil
6✔
574
}
575

576
func (t *Table) deleteColumn(col *Column) error {
11✔
577
        isIndexed, err := t.IsIndexed(col.colName)
11✔
578
        if err != nil {
11✔
579
                return err
×
580
        }
×
581

582
        if isIndexed {
15✔
583
                return fmt.Errorf("%w (%s)", ErrCantDropIndexedColumn, col.colName)
4✔
584
        }
4✔
585

586
        newCols := make([]*Column, 0, len(t.cols)-1)
7✔
587

7✔
588
        for _, c := range t.cols {
48✔
589
                if c.id != col.id {
75✔
590
                        newCols = append(newCols, c)
34✔
591
                }
34✔
592
        }
593

594
        t.cols = newCols
7✔
595
        delete(t.colsByName, col.colName)
7✔
596
        delete(t.colsByID, col.id)
7✔
597

7✔
598
        return nil
7✔
599
}
600

601
func (t *Table) deleteIndex(index *Index) error {
5✔
602
        if index.IsPrimary() {
6✔
603
                return fmt.Errorf("%w: primary key index can NOT be deleted", ErrIllegalArguments)
1✔
604
        }
1✔
605

606
        newIndexes := make([]*Index, 0, len(t.indexes)-1)
4✔
607

4✔
608
        for _, i := range t.indexes {
13✔
609
                if i.id != index.id {
14✔
610
                        newIndexes = append(newIndexes, i)
5✔
611
                }
5✔
612
        }
613

614
        t.indexes = newIndexes
4✔
615
        delete(t.indexesByColID, index.id)
4✔
616
        delete(t.indexesByName, index.Name())
4✔
617

4✔
618
        return nil
4✔
619
}
620

621
func (c *Column) ID() uint32 {
684✔
622
        return c.id
684✔
623
}
684✔
624

625
func (c *Column) Name() string {
3,147✔
626
        return c.colName
3,147✔
627
}
3,147✔
628

629
func (c *Column) Type() SQLValueType {
4,466✔
630
        return c.colType
4,466✔
631
}
4,466✔
632

633
func (c *Column) MaxLen() int {
14,499✔
634
        switch c.colType {
14,499✔
635
        case BooleanType:
490✔
636
                return 1
490✔
637
        case IntegerType:
9,085✔
638
                return 8
9,085✔
639
        case TimestampType:
404✔
640
                return 8
404✔
641
        case Float64Type:
535✔
642
                return 8
535✔
643
        case UUIDType:
22✔
644
                return 16
22✔
645
        }
646

647
        return c.maxLen
3,963✔
648
}
649

650
func (c *Column) IsNullable() bool {
19✔
651
        return !c.notNull
19✔
652
}
19✔
653

654
func (c *Column) IsAutoIncremental() bool {
11✔
655
        return c.autoIncrement
11✔
656
}
11✔
657

658
func validMaxLenForType(maxLen int, sqlType SQLValueType) bool {
7,316✔
659
        switch sqlType {
7,316✔
660
        case BooleanType:
650✔
661
                return maxLen <= 1
650✔
662
        case IntegerType:
3,472✔
663
                return maxLen == 0 || maxLen == 8
3,472✔
664
        case Float64Type:
287✔
665
                return maxLen == 0 || maxLen == 8
287✔
666
        case TimestampType:
228✔
667
                return maxLen == 0 || maxLen == 8
228✔
668
        case UUIDType:
57✔
669
                return maxLen == 0 || maxLen == 16
57✔
670
        }
671

672
        return maxLen >= 0
2,622✔
673
}
674

675
func (catlg *Catalog) load(ctx context.Context, tx *store.OngoingTx) error {
1,904✔
676
        dbReaderSpec := store.KeyReaderSpec{
1,904✔
677
                Prefix: MapKey(catlg.enginePrefix, catalogTablePrefix, EncodeID(1)),
1,904✔
678
        }
1,904✔
679

1,904✔
680
        tableReader, err := tx.NewKeyReader(dbReaderSpec)
1,904✔
681
        if err != nil {
1,905✔
682
                return err
1✔
683
        }
1✔
684
        defer tableReader.Close()
1,903✔
685

1,903✔
686
        for {
6,099✔
687
                mkey, vref, err := tableReader.Read(ctx)
4,196✔
688
                if errors.Is(err, store.ErrNoMoreEntries) {
6,094✔
689
                        break
1,898✔
690
                }
691
                if err != nil {
2,298✔
692
                        return err
×
693
                }
×
694

695
                dbID, tableID, err := unmapTableID(catlg.enginePrefix, mkey)
2,298✔
696
                if err != nil {
2,298✔
697
                        return err
×
698
                }
×
699

700
                if dbID != 1 {
2,298✔
701
                        return ErrCorruptedData
×
702
                }
×
703

704
                // Retrieve the key-value metadata (KVMetadata) of the current version reference (vref).
705
                // If the metadata is not nil and the "Deleted" flag of the metadata is set to true,
706
                // increment the catalog's table count by 1 and continue to the next iteration.
707
                // This implies this is a deleted table and we should not load it.
708
                md := vref.KVMetadata()
2,298✔
709
                if md != nil && md.Deleted() {
2,327✔
710
                        catlg.maxTableID++
29✔
711
                        continue
29✔
712
                }
713

714
                colSpecs, maxColID, err := loadColSpecs(ctx, dbID, tableID, tx, catlg.enginePrefix)
2,269✔
715
                if err != nil {
2,273✔
716
                        return err
4✔
717
                }
4✔
718

719
                v, err := vref.Resolve()
2,265✔
720
                if err != nil {
2,265✔
721
                        return err
×
722
                }
×
723

724
                table, err := catlg.newTable(string(v), colSpecs, maxColID)
2,265✔
725
                if err != nil {
2,265✔
726
                        return err
×
727
                }
×
728

729
                if tableID != table.id {
2,265✔
730
                        return ErrCorruptedData
×
731
                }
×
732

733
                err = table.loadIndexes(ctx, catlg.enginePrefix, tx)
2,265✔
734
                if err != nil {
2,266✔
735
                        return err
1✔
736
                }
1✔
737
        }
738

739
        return nil
1,898✔
740
}
741

742
func loadMaxPK(ctx context.Context, sqlPrefix []byte, tx *store.OngoingTx, table *Table) ([]byte, error) {
1,272✔
743
        pkReaderSpec := store.KeyReaderSpec{
1,272✔
744
                Prefix:    MapKey(sqlPrefix, MappedPrefix, EncodeID(table.id), EncodeID(table.primaryIndex.id)),
1,272✔
745
                DescOrder: true,
1,272✔
746
        }
1,272✔
747

1,272✔
748
        pkReader, err := tx.NewKeyReader(pkReaderSpec)
1,272✔
749
        if err != nil {
1,272✔
750
                return nil, err
×
751
        }
×
752
        defer pkReader.Close()
1,272✔
753

1,272✔
754
        mkey, _, err := pkReader.Read(ctx)
1,272✔
755
        if err != nil {
1,508✔
756
                return nil, err
236✔
757
        }
236✔
758

759
        return unmapIndexEntry(table.primaryIndex, sqlPrefix, mkey)
1,036✔
760
}
761

762
func loadColSpecs(ctx context.Context, dbID, tableID uint32, tx *store.OngoingTx, sqlPrefix []byte) (specs map[uint32]*ColSpec, maxColID uint32, err error) {
2,269✔
763
        initialKey := MapKey(sqlPrefix, catalogColumnPrefix, EncodeID(dbID), EncodeID(tableID))
2,269✔
764

2,269✔
765
        dbReaderSpec := store.KeyReaderSpec{
2,269✔
766
                Prefix: initialKey,
2,269✔
767
        }
2,269✔
768

2,269✔
769
        colSpecReader, err := tx.NewKeyReader(dbReaderSpec)
2,269✔
770
        if err != nil {
2,269✔
771
                return nil, 0, err
×
772
        }
×
773
        defer colSpecReader.Close()
2,269✔
774

2,269✔
775
        specs = make(map[uint32]*ColSpec, 0)
2,269✔
776

2,269✔
777
        for {
11,230✔
778
                mkey, vref, err := colSpecReader.Read(ctx)
8,961✔
779
                if errors.Is(err, store.ErrNoMoreEntries) {
11,226✔
780
                        break
2,265✔
781
                }
782
                if err != nil {
6,696✔
783
                        return nil, 0, err
×
784
                }
×
785

786
                md := vref.KVMetadata()
6,696✔
787
                if md != nil && md.IsExpirable() {
6,697✔
788
                        return nil, 0, ErrBrokenCatalogColSpecExpirable
1✔
789
                }
1✔
790

791
                mdbID, mtableID, colID, colType, err := unmapColSpec(sqlPrefix, mkey)
6,695✔
792
                if err != nil {
6,697✔
793
                        return nil, 0, err
2✔
794
                }
2✔
795

796
                if dbID != mdbID || tableID != mtableID {
6,693✔
797
                        return nil, 0, ErrCorruptedData
×
798
                }
×
799

800
                if colID != maxColID+1 {
6,693✔
801
                        return nil, 0, fmt.Errorf("%w: table columns not stored sequentially", ErrCorruptedData)
×
802
                }
×
803

804
                maxColID = colID
6,693✔
805

6,693✔
806
                if md != nil && md.Deleted() {
6,757✔
807
                        continue
64✔
808
                }
809

810
                v, err := vref.Resolve()
6,629✔
811
                if err != nil {
6,629✔
812
                        return nil, 0, err
×
813
                }
×
814

815
                if len(v) < 6 {
6,630✔
816
                        return nil, 0, fmt.Errorf("%w: mismatch on database or table ids", ErrCorruptedData)
1✔
817
                }
1✔
818

819
                specs[colID] = &ColSpec{
6,628✔
820
                        colName:       string(v[5:]),
6,628✔
821
                        colType:       colType,
6,628✔
822
                        maxLen:        int(binary.BigEndian.Uint32(v[1:])),
6,628✔
823
                        autoIncrement: v[0]&autoIncrementFlag != 0,
6,628✔
824
                        notNull:       v[0]&nullableFlag != 0,
6,628✔
825
                }
6,628✔
826
        }
827

828
        return specs, maxColID, nil
2,265✔
829
}
830

831
func (table *Table) loadIndexes(ctx context.Context, sqlPrefix []byte, tx *store.OngoingTx) error {
2,265✔
832
        initialKey := MapKey(sqlPrefix, catalogIndexPrefix, EncodeID(1), EncodeID(table.id))
2,265✔
833

2,265✔
834
        idxReaderSpec := store.KeyReaderSpec{
2,265✔
835
                Prefix: initialKey,
2,265✔
836
        }
2,265✔
837

2,265✔
838
        idxSpecReader, err := tx.NewKeyReader(idxReaderSpec)
2,265✔
839
        if err != nil {
2,265✔
840
                return err
×
841
        }
×
842
        defer idxSpecReader.Close()
2,265✔
843

2,265✔
844
        for {
8,389✔
845
                mkey, vref, err := idxSpecReader.Read(ctx)
6,124✔
846
                if errors.Is(err, store.ErrNoMoreEntries) {
8,388✔
847
                        break
2,264✔
848
                }
849
                if err != nil {
3,860✔
850
                        return err
×
851
                }
×
852

853
                // Retrieve the key-value metadata (KVMetadata) of the current version reference (vref).
854
                // If the metadata is not nil and the "Deleted" flag of the metadata is set to true,
855
                // increment the catalog's index count by 1 and continue to the next iteration.
856
                // This implies this is a deleted index and we should not load it.
857
                md := vref.KVMetadata()
3,860✔
858
                if md != nil && md.Deleted() {
3,880✔
859
                        table.maxIndexID++
20✔
860
                        continue
20✔
861
                }
862

863
                dbID, tableID, indexID, err := unmapIndex(sqlPrefix, mkey)
3,840✔
864
                if err != nil {
3,840✔
865
                        return err
×
866
                }
×
867

868
                if table.id != tableID || dbID != 1 {
3,840✔
869
                        return ErrCorruptedData
×
870
                }
×
871

872
                v, err := vref.Resolve()
3,840✔
873
                if err != nil {
3,840✔
874
                        return err
×
875
                }
×
876

877
                // v={unique {colID1}(ASC|DESC)...{colIDN}(ASC|DESC)}
878
                colSpecLen := EncIDLen + 1
3,840✔
879

3,840✔
880
                if len(v) < 1+colSpecLen || len(v)%colSpecLen != 1 {
3,840✔
881
                        return ErrCorruptedData
×
882
                }
×
883

884
                var colIDs []uint32
3,840✔
885

3,840✔
886
                for i := 1; i < len(v); i += colSpecLen {
7,881✔
887
                        colID := binary.BigEndian.Uint32(v[i:])
4,041✔
888

4,041✔
889
                        // TODO: currently only ASC order is supported
4,041✔
890
                        if v[i+EncIDLen] != 0 {
4,041✔
891
                                return ErrCorruptedData
×
892
                        }
×
893

894
                        colIDs = append(colIDs, colID)
4,041✔
895
                }
896

897
                index, err := table.newIndex(v[0] > 0, colIDs)
3,840✔
898
                if err != nil {
3,841✔
899
                        return err
1✔
900
                }
1✔
901

902
                if indexID != index.id {
3,839✔
903
                        return ErrCorruptedData
×
904
                }
×
905
        }
906

907
        return nil
2,264✔
908
}
909

910
func trimPrefix(prefix, mkey []byte, mappingPrefix []byte) ([]byte, error) {
13,974✔
911
        if len(prefix)+len(mappingPrefix) > len(mkey) ||
13,974✔
912
                !bytes.Equal(prefix, mkey[:len(prefix)]) ||
13,974✔
913
                !bytes.Equal(mappingPrefix, mkey[len(prefix):len(prefix)+len(mappingPrefix)]) {
13,982✔
914
                return nil, ErrIllegalMappedKey
8✔
915
        }
8✔
916

917
        return mkey[len(prefix)+len(mappingPrefix):], nil
13,966✔
918
}
919

920
func unmapTableID(prefix, mkey []byte) (dbID, tableID uint32, err error) {
2,310✔
921
        encID, err := trimPrefix(prefix, mkey, []byte(catalogTablePrefix))
2,310✔
922
        if err != nil {
2,311✔
923
                return 0, 0, err
1✔
924
        }
1✔
925

926
        if len(encID) != EncIDLen*2 {
2,310✔
927
                return 0, 0, ErrCorruptedData
1✔
928
        }
1✔
929

930
        dbID = binary.BigEndian.Uint32(encID)
2,308✔
931
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
2,308✔
932

2,308✔
933
        return
2,308✔
934
}
935

936
func unmapColSpec(prefix, mkey []byte) (dbID, tableID, colID uint32, colType SQLValueType, err error) {
6,731✔
937
        encID, err := trimPrefix(prefix, mkey, []byte(catalogColumnPrefix))
6,731✔
938
        if err != nil {
6,732✔
939
                return 0, 0, 0, "", err
1✔
940
        }
1✔
941

942
        if len(encID) < EncIDLen*3 {
6,731✔
943
                return 0, 0, 0, "", ErrCorruptedData
1✔
944
        }
1✔
945

946
        dbID = binary.BigEndian.Uint32(encID)
6,729✔
947
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
6,729✔
948
        colID = binary.BigEndian.Uint32(encID[2*EncIDLen:])
6,729✔
949

6,729✔
950
        colType, err = asType(string(encID[EncIDLen*3:]))
6,729✔
951
        if err != nil {
6,732✔
952
                return 0, 0, 0, "", ErrCorruptedData
3✔
953
        }
3✔
954

955
        return
6,726✔
956
}
957

958
func asType(t string) (SQLValueType, error) {
6,729✔
959
        if t == IntegerType ||
6,729✔
960
                t == Float64Type ||
6,729✔
961
                t == BooleanType ||
6,729✔
962
                t == VarcharType ||
6,729✔
963
                t == UUIDType ||
6,729✔
964
                t == BLOBType ||
6,729✔
965
                t == TimestampType {
13,455✔
966
                return t, nil
6,726✔
967
        }
6,726✔
968

969
        return t, ErrCorruptedData
3✔
970
}
971

972
func unmapIndex(sqlPrefix, mkey []byte) (dbID, tableID, indexID uint32, err error) {
3,866✔
973
        encID, err := trimPrefix(sqlPrefix, mkey, []byte(catalogIndexPrefix))
3,866✔
974
        if err != nil {
3,867✔
975
                return 0, 0, 0, err
1✔
976
        }
1✔
977

978
        if len(encID) != EncIDLen*3 {
3,866✔
979
                return 0, 0, 0, ErrCorruptedData
1✔
980
        }
1✔
981

982
        dbID = binary.BigEndian.Uint32(encID)
3,864✔
983
        tableID = binary.BigEndian.Uint32(encID[EncIDLen:])
3,864✔
984
        indexID = binary.BigEndian.Uint32(encID[EncIDLen*2:])
3,864✔
985

3,864✔
986
        return
3,864✔
987
}
988

989
func unmapIndexEntry(index *Index, sqlPrefix, mkey []byte) (encPKVals []byte, err error) {
1,061✔
990
        if index == nil {
1,061✔
991
                return nil, ErrIllegalArguments
×
992
        }
×
993

994
        enc, err := trimPrefix(sqlPrefix, mkey, []byte(MappedPrefix))
1,061✔
995
        if err != nil {
1,062✔
996
                return nil, ErrCorruptedData
1✔
997
        }
1✔
998

999
        if len(enc) <= EncIDLen*2 {
1,068✔
1000
                return nil, ErrCorruptedData
8✔
1001
        }
8✔
1002

1003
        off := 0
1,052✔
1004

1,052✔
1005
        tableID := binary.BigEndian.Uint32(enc[off:])
1,052✔
1006
        off += EncIDLen
1,052✔
1007

1,052✔
1008
        indexID := binary.BigEndian.Uint32(enc[off:])
1,052✔
1009
        off += EncIDLen
1,052✔
1010

1,052✔
1011
        if tableID != index.table.id || indexID != index.id {
1,052✔
1012
                return nil, ErrCorruptedData
×
1013
        }
×
1014

1015
        //read index values
1016
        for _, col := range index.cols {
2,104✔
1017
                if enc[off] == KeyValPrefixNull {
1,052✔
1018
                        off += 1
×
1019
                        continue
×
1020
                }
1021
                if enc[off] != KeyValPrefixNotNull {
1,052✔
1022
                        return nil, ErrCorruptedData
×
1023
                }
×
1024
                off += 1
1,052✔
1025

1,052✔
1026
                maxLen := col.MaxLen()
1,052✔
1027
                if variableSizedType(col.colType) {
1,068✔
1028
                        maxLen += EncLenLen
16✔
1029
                }
16✔
1030
                if len(enc)-off < maxLen {
1,066✔
1031
                        return nil, ErrCorruptedData
14✔
1032
                }
14✔
1033

1034
                off += maxLen
1,038✔
1035
        }
1036

1037
        //PK cannot be nil
1038
        if len(enc)-off < 1 {
1,039✔
1039
                return nil, ErrCorruptedData
1✔
1040
        }
1✔
1041

1042
        return enc[off:], nil
1,037✔
1043
}
1044

1045
func variableSizedType(sqlType SQLValueType) bool {
1,402✔
1046
        return sqlType == VarcharType || sqlType == BLOBType
1,402✔
1047
}
1,402✔
1048

1049
// MapKey builds a key by concatenating prefix, mappingPrefix and every element
// of encValues, in that order. The result is a freshly allocated slice sized
// exactly for the concatenation.
func MapKey(prefix []byte, mappingPrefix string, encValues ...[]byte) []byte {
	size := len(prefix) + len(mappingPrefix)
	for i := range encValues {
		size += len(encValues[i])
	}

	key := make([]byte, 0, size)
	key = append(key, prefix...)
	key = append(key, mappingPrefix...)

	for i := range encValues {
		key = append(key, encValues[i]...)
	}

	return key
}
1073

1074
func EncodeID(id uint32) []byte {
47,111✔
1075
        var encID [EncIDLen]byte
47,111✔
1076
        binary.BigEndian.PutUint32(encID[:], id)
47,111✔
1077
        return encID[:]
47,111✔
1078
}
47,111✔
1079

1080
// Marker bytes placed at the start of a value encoded as a key component.
const (
        // KeyValPrefixNull is the whole encoding of a NULL value. Being smaller
        // than KeyValPrefixNotNull, it compares before any non-NULL encoding in
        // byte order.
        KeyValPrefixNull       byte = 0x20
        // KeyValPrefixNotNull is the first byte of every non-NULL encoded value.
        KeyValPrefixNotNull    byte = 0x80
        // KeyValPrefixUpperBound is greater than both markers above.
        // NOTE(review): presumably used as an upper bound when scanning key
        // ranges; its uses are not visible here — confirm against callers.
        KeyValPrefixUpperBound byte = 0xFF
)
1085

1086
func EncodeValueAsKey(val TypedValue, colType SQLValueType, maxLen int) ([]byte, int, error) {
9,406✔
1087
        return EncodeRawValueAsKey(val.RawValue(), colType, maxLen)
9,406✔
1088
}
9,406✔
1089

1090
// EncodeRawValueAsKey encodes a value in a b-tree meaningful way.
1091
func EncodeRawValueAsKey(val interface{}, colType SQLValueType, maxLen int) ([]byte, int, error) {
9,540✔
1092
        if maxLen <= 0 {
9,543✔
1093
                return nil, 0, ErrInvalidValue
3✔
1094
        }
3✔
1095
        if maxLen > MaxKeyLen {
9,538✔
1096
                return nil, 0, ErrMaxKeyLengthExceeded
1✔
1097
        }
1✔
1098

1099
        convVal, err := mayApplyImplicitConversion(val, colType)
9,536✔
1100
        if err != nil {
9,537✔
1101
                return nil, 0, err
1✔
1102
        }
1✔
1103

1104
        if convVal == nil {
9,620✔
1105
                return []byte{KeyValPrefixNull}, 0, nil
85✔
1106
        }
85✔
1107

1108
        switch colType {
9,450✔
1109
        case VarcharType:
640✔
1110
                {
1,280✔
1111
                        strVal, ok := convVal.(string)
640✔
1112
                        if !ok {
641✔
1113
                                return nil, 0, fmt.Errorf("value is not a string: %w", ErrInvalidValue)
1✔
1114
                        }
1✔
1115

1116
                        if len(strVal) > maxLen {
640✔
1117
                                return nil, 0, ErrMaxLengthExceeded
1✔
1118
                        }
1✔
1119

1120
                        // notnull + value + padding + len(value)
1121
                        encv := make([]byte, 1+maxLen+EncLenLen)
638✔
1122
                        encv[0] = KeyValPrefixNotNull
638✔
1123
                        copy(encv[1:], []byte(strVal))
638✔
1124
                        binary.BigEndian.PutUint32(encv[len(encv)-EncLenLen:], uint32(len(strVal)))
638✔
1125

638✔
1126
                        return encv, len(strVal), nil
638✔
1127
                }
1128
        case IntegerType:
6,279✔
1129
                {
12,558✔
1130
                        if maxLen != 8 {
6,280✔
1131
                                return nil, 0, ErrCorruptedData
1✔
1132
                        }
1✔
1133

1134
                        intVal, ok := convVal.(int64)
6,278✔
1135
                        if !ok {
6,281✔
1136
                                return nil, 0, fmt.Errorf("value is not an integer: %w", ErrInvalidValue)
3✔
1137
                        }
3✔
1138

1139
                        // v
1140
                        var encv [9]byte
6,275✔
1141
                        encv[0] = KeyValPrefixNotNull
6,275✔
1142
                        binary.BigEndian.PutUint64(encv[1:], uint64(intVal))
6,275✔
1143
                        // map to unsigned integer space for lexical sorting order
6,275✔
1144
                        encv[1] ^= 0x80
6,275✔
1145

6,275✔
1146
                        return encv[:], 8, nil
6,275✔
1147
                }
1148
        case BooleanType:
187✔
1149
                {
374✔
1150
                        if maxLen != 1 {
188✔
1151
                                return nil, 0, ErrCorruptedData
1✔
1152
                        }
1✔
1153

1154
                        boolVal, ok := convVal.(bool)
186✔
1155
                        if !ok {
187✔
1156
                                return nil, 0, fmt.Errorf("value is not a boolean: %w", ErrInvalidValue)
1✔
1157
                        }
1✔
1158

1159
                        // v
1160
                        var encv [2]byte
185✔
1161
                        encv[0] = KeyValPrefixNotNull
185✔
1162
                        if boolVal {
300✔
1163
                                encv[1] = 1
115✔
1164
                        }
115✔
1165

1166
                        return encv[:], 1, nil
185✔
1167
                }
1168
        case BLOBType:
1,828✔
1169
                {
3,656✔
1170
                        blobVal, ok := convVal.([]byte)
1,828✔
1171
                        if !ok {
1,829✔
1172
                                return nil, 0, fmt.Errorf("value is not a blob: %w", ErrInvalidValue)
1✔
1173
                        }
1✔
1174

1175
                        if len(blobVal) > maxLen {
1,828✔
1176
                                return nil, 0, ErrMaxLengthExceeded
1✔
1177
                        }
1✔
1178

1179
                        // notnull + value + padding + len(value)
1180
                        encv := make([]byte, 1+maxLen+EncLenLen)
1,826✔
1181
                        encv[0] = KeyValPrefixNotNull
1,826✔
1182
                        copy(encv[1:], []byte(blobVal))
1,826✔
1183
                        binary.BigEndian.PutUint32(encv[len(encv)-EncLenLen:], uint32(len(blobVal)))
1,826✔
1184

1,826✔
1185
                        return encv, len(blobVal), nil
1,826✔
1186
                }
1187
        case UUIDType:
10✔
1188
                {
20✔
1189
                        uuidVal, ok := convVal.(uuid.UUID)
10✔
1190
                        if !ok {
10✔
1191
                                return nil, 0, fmt.Errorf("value is not an UUID: %w", ErrInvalidValue)
×
1192
                        }
×
1193

1194
                        // notnull + value
1195
                        encv := make([]byte, 17)
10✔
1196
                        encv[0] = KeyValPrefixNotNull
10✔
1197
                        copy(encv[1:], uuidVal[:])
10✔
1198

10✔
1199
                        return encv, 16, nil
10✔
1200
                }
1201
        case TimestampType:
202✔
1202
                {
404✔
1203
                        if maxLen != 8 {
203✔
1204
                                return nil, 0, ErrCorruptedData
1✔
1205
                        }
1✔
1206

1207
                        timeVal, ok := convVal.(time.Time)
201✔
1208
                        if !ok {
202✔
1209
                                return nil, 0, fmt.Errorf("value is not a timestamp: %w", ErrInvalidValue)
1✔
1210
                        }
1✔
1211

1212
                        // v
1213
                        var encv [9]byte
200✔
1214
                        encv[0] = KeyValPrefixNotNull
200✔
1215
                        binary.BigEndian.PutUint64(encv[1:], uint64(timeVal.UnixNano()))
200✔
1216
                        // map to unsigned integer space for lexical sorting order
200✔
1217
                        encv[1] ^= 0x80
200✔
1218

200✔
1219
                        return encv[:], 8, nil
200✔
1220
                }
1221
        case Float64Type:
303✔
1222
                {
606✔
1223
                        floatVal, ok := convVal.(float64)
303✔
1224
                        if !ok {
303✔
1225
                                return nil, 0, fmt.Errorf("value is not a float: %w", ErrInvalidValue)
×
1226
                        }
×
1227

1228
                        // Apart form the sign bit, bit representation of float64
1229
                        // can be sorted lexicographically
1230
                        floatBits := math.Float64bits(floatVal)
303✔
1231

303✔
1232
                        var encv [9]byte
303✔
1233
                        encv[0] = KeyValPrefixNotNull
303✔
1234
                        binary.BigEndian.PutUint64(encv[1:], floatBits)
303✔
1235

303✔
1236
                        if encv[1]&0x80 != 0 {
319✔
1237
                                // For negative numbers, the order must be reversed,
16✔
1238
                                // we also negate the sign bit so that all negative
16✔
1239
                                // numbers end up in the smaller half of values
16✔
1240
                                for i := 1; i < 9; i++ {
144✔
1241
                                        encv[i] = ^encv[i]
128✔
1242
                                }
128✔
1243
                        } else {
287✔
1244
                                // For positive numbers, the order is already correct,
287✔
1245
                                // we only have to set the sign bit to 1 to ensure that
287✔
1246
                                // positive numbers end in the larger half of values
287✔
1247
                                encv[1] ^= 0x80
287✔
1248
                        }
287✔
1249

1250
                        return encv[:], 8, nil
303✔
1251
                }
1252
        }
1253

1254
        return nil, 0, ErrInvalidValue
1✔
1255
}
1256

1257
func EncodeValue(val TypedValue, colType SQLValueType, maxLen int) ([]byte, error) {
2,831✔
1258
        return EncodeRawValue(val.RawValue(), colType, maxLen)
2,831✔
1259
}
2,831✔
1260

1261
// EncodeRawValue encode a value in a byte format. This is the internal binary representation of a value. Can be decoded with DecodeValue.
1262
func EncodeRawValue(val interface{}, colType SQLValueType, maxLen int) ([]byte, error) {
2,831✔
1263
        convVal, err := mayApplyImplicitConversion(val, colType)
2,831✔
1264
        if err != nil {
2,831✔
1265
                return nil, err
×
1266
        }
×
1267

1268
        if convVal == nil {
2,831✔
1269
                return nil, ErrInvalidValue
×
1270
        }
×
1271

1272
        switch colType {
2,831✔
1273
        case VarcharType:
579✔
1274
                {
1,158✔
1275
                        strVal, ok := convVal.(string)
579✔
1276
                        if !ok {
583✔
1277
                                return nil, fmt.Errorf("value is not a string: %w", ErrInvalidValue)
4✔
1278
                        }
4✔
1279

1280
                        if maxLen > 0 && len(strVal) > maxLen {
578✔
1281
                                return nil, ErrMaxLengthExceeded
3✔
1282
                        }
3✔
1283

1284
                        // len(v) + v
1285
                        encv := make([]byte, EncLenLen+len(strVal))
572✔
1286
                        binary.BigEndian.PutUint32(encv[:], uint32(len(strVal)))
572✔
1287
                        copy(encv[EncLenLen:], []byte(strVal))
572✔
1288

572✔
1289
                        return encv, nil
572✔
1290
                }
1291
        case IntegerType:
1,273✔
1292
                {
2,546✔
1293
                        intVal, ok := convVal.(int64)
1,273✔
1294
                        if !ok {
1,275✔
1295
                                return nil, fmt.Errorf("value is not an integer: %w", ErrInvalidValue)
2✔
1296
                        }
2✔
1297

1298
                        // map to unsigned integer space
1299
                        // len(v) + v
1300
                        var encv [EncLenLen + 8]byte
1,271✔
1301
                        binary.BigEndian.PutUint32(encv[:], uint32(8))
1,271✔
1302
                        binary.BigEndian.PutUint64(encv[EncLenLen:], uint64(intVal))
1,271✔
1303

1,271✔
1304
                        return encv[:], nil
1,271✔
1305
                }
1306
        case BooleanType:
197✔
1307
                {
394✔
1308
                        boolVal, ok := convVal.(bool)
197✔
1309
                        if !ok {
200✔
1310
                                return nil, fmt.Errorf("value is not a boolean: %w", ErrInvalidValue)
3✔
1311
                        }
3✔
1312

1313
                        // len(v) + v
1314
                        var encv [EncLenLen + 1]byte
194✔
1315
                        binary.BigEndian.PutUint32(encv[:], uint32(1))
194✔
1316
                        if boolVal {
311✔
1317
                                encv[EncLenLen] = 1
117✔
1318
                        }
117✔
1319

1320
                        return encv[:], nil
194✔
1321
                }
1322
        case BLOBType:
412✔
1323
                {
824✔
1324
                        var blobVal []byte
412✔
1325

412✔
1326
                        if val != nil {
824✔
1327
                                v, ok := convVal.([]byte)
412✔
1328
                                if !ok {
415✔
1329
                                        return nil, fmt.Errorf("value is not a blob: %w", ErrInvalidValue)
3✔
1330
                                }
3✔
1331
                                blobVal = v
409✔
1332
                        }
1333

1334
                        if maxLen > 0 && len(blobVal) > maxLen {
412✔
1335
                                return nil, ErrMaxLengthExceeded
3✔
1336
                        }
3✔
1337

1338
                        // len(v) + v
1339
                        encv := make([]byte, EncLenLen+len(blobVal))
406✔
1340
                        binary.BigEndian.PutUint32(encv[:], uint32(len(blobVal)))
406✔
1341
                        copy(encv[EncLenLen:], blobVal)
406✔
1342

406✔
1343
                        return encv[:], nil
406✔
1344
                }
1345
        case UUIDType:
5✔
1346
                {
10✔
1347
                        uuidVal, ok := convVal.(uuid.UUID)
5✔
1348
                        if !ok {
5✔
1349
                                return nil, fmt.Errorf("value is not an UUID: %w", ErrInvalidValue)
×
1350
                        }
×
1351

1352
                        // len(v) + v
1353
                        var encv [EncLenLen + 16]byte
5✔
1354
                        binary.BigEndian.PutUint32(encv[:], uint32(16))
5✔
1355
                        copy(encv[EncLenLen:], uuidVal[:])
5✔
1356

5✔
1357
                        return encv[:], nil
5✔
1358
                }
1359
        case TimestampType:
192✔
1360
                {
384✔
1361
                        timeVal, ok := convVal.(time.Time)
192✔
1362
                        if !ok {
193✔
1363
                                return nil, fmt.Errorf("value is not a timestamp: %w", ErrInvalidValue)
1✔
1364
                        }
1✔
1365

1366
                        // len(v) + v
1367
                        var encv [EncLenLen + 8]byte
191✔
1368
                        binary.BigEndian.PutUint32(encv[:], uint32(8))
191✔
1369
                        binary.BigEndian.PutUint64(encv[EncLenLen:], uint64(TimeToInt64(timeVal)))
191✔
1370

191✔
1371
                        return encv[:], nil
191✔
1372
                }
1373
        case Float64Type:
171✔
1374
                {
342✔
1375
                        floatVal, ok := convVal.(float64)
171✔
1376
                        if !ok {
171✔
1377
                                return nil, fmt.Errorf("value is not a float: %w", ErrInvalidValue)
×
1378
                        }
×
1379

1380
                        var encv [EncLenLen + 8]byte
171✔
1381
                        floatBits := math.Float64bits(floatVal)
171✔
1382
                        binary.BigEndian.PutUint32(encv[:], uint32(8))
171✔
1383
                        binary.BigEndian.PutUint64(encv[EncLenLen:], floatBits)
171✔
1384

171✔
1385
                        return encv[:], nil
171✔
1386
                }
1387
        }
1388

1389
        return nil, ErrInvalidValue
2✔
1390
}
1391

1392
func DecodeValueLength(b []byte) (int, int, error) {
17,978✔
1393
        if len(b) < EncLenLen {
17,980✔
1394
                return 0, 0, ErrCorruptedData
2✔
1395
        }
2✔
1396

1397
        vlen := int(binary.BigEndian.Uint32(b[:]))
17,976✔
1398
        voff := EncLenLen
17,976✔
1399

17,976✔
1400
        if vlen < 0 || len(b) < voff+vlen {
17,979✔
1401
                return 0, 0, ErrCorruptedData
3✔
1402
        }
3✔
1403

1404
        return vlen, EncLenLen, nil
17,973✔
1405
}
1406

1407
func DecodeValue(b []byte, colType SQLValueType) (TypedValue, int, error) {
17,951✔
1408
        vlen, voff, err := DecodeValueLength(b)
17,951✔
1409
        if err != nil {
17,955✔
1410
                return nil, 0, err
4✔
1411
        }
4✔
1412

1413
        switch colType {
17,947✔
1414
        case VarcharType:
3,731✔
1415
                {
7,462✔
1416
                        v := string(b[voff : voff+vlen])
3,731✔
1417
                        voff += vlen
3,731✔
1418

3,731✔
1419
                        return &Varchar{val: v}, voff, nil
3,731✔
1420
                }
3,731✔
1421
        case IntegerType:
7,878✔
1422
                {
15,756✔
1423
                        if vlen != 8 {
7,879✔
1424
                                return nil, 0, ErrCorruptedData
1✔
1425
                        }
1✔
1426

1427
                        v := binary.BigEndian.Uint64(b[voff:])
7,877✔
1428
                        voff += vlen
7,877✔
1429

7,877✔
1430
                        return &Integer{val: int64(v)}, voff, nil
7,877✔
1431
                }
1432
        case BooleanType:
1,174✔
1433
                {
2,348✔
1434
                        if vlen != 1 {
1,176✔
1435
                                return nil, 0, ErrCorruptedData
2✔
1436
                        }
2✔
1437

1438
                        v := b[voff] == 1
1,172✔
1439
                        voff += 1
1,172✔
1440

1,172✔
1441
                        return &Bool{val: v}, voff, nil
1,172✔
1442
                }
1443
        case BLOBType:
3,402✔
1444
                {
6,804✔
1445
                        v := b[voff : voff+vlen]
3,402✔
1446
                        voff += vlen
3,402✔
1447

3,402✔
1448
                        return &Blob{val: v}, voff, nil
3,402✔
1449
                }
3,402✔
1450
        case UUIDType:
18✔
1451
                {
36✔
1452
                        if vlen != 16 {
18✔
1453
                                return nil, 0, ErrCorruptedData
×
1454
                        }
×
1455

1456
                        u, err := uuid.FromBytes(b[voff : voff+16])
18✔
1457
                        if err != nil {
18✔
1458
                                return nil, 0, fmt.Errorf("%w: %s", ErrCorruptedData, err.Error())
×
1459
                        }
×
1460

1461
                        voff += vlen
18✔
1462

18✔
1463
                        return &UUID{val: u}, voff, nil
18✔
1464
                }
1465
        case TimestampType:
874✔
1466
                {
1,748✔
1467
                        if vlen != 8 {
875✔
1468
                                return nil, 0, ErrCorruptedData
1✔
1469
                        }
1✔
1470

1471
                        v := binary.BigEndian.Uint64(b[voff:])
873✔
1472
                        voff += vlen
873✔
1473

873✔
1474
                        return &Timestamp{val: TimeFromInt64(int64(v))}, voff, nil
873✔
1475
                }
1476
        case Float64Type:
869✔
1477
                {
1,738✔
1478
                        if vlen != 8 {
869✔
1479
                                return nil, 0, ErrCorruptedData
×
1480
                        }
×
1481
                        v := binary.BigEndian.Uint64(b[voff:])
869✔
1482
                        voff += vlen
869✔
1483
                        return &Float64{val: math.Float64frombits(v)}, voff, nil
869✔
1484
                }
1485
        }
1486

1487
        return nil, 0, ErrCorruptedData
1✔
1488
}
1489

1490
// addSchemaToTx adds the schema to the ongoing transaction.
1491
func (t *Table) addIndexesToTx(ctx context.Context, sqlPrefix []byte, tx *store.OngoingTx) error {
9✔
1492
        initialKey := MapKey(sqlPrefix, catalogIndexPrefix, EncodeID(1), EncodeID(t.id))
9✔
1493

9✔
1494
        idxReaderSpec := store.KeyReaderSpec{
9✔
1495
                Prefix: initialKey,
9✔
1496
        }
9✔
1497

9✔
1498
        idxSpecReader, err := tx.NewKeyReader(idxReaderSpec)
9✔
1499
        if err != nil {
9✔
1500
                return err
×
1501
        }
×
1502
        defer idxSpecReader.Close()
9✔
1503

9✔
1504
        for {
41✔
1505
                mkey, vref, err := idxSpecReader.Read(ctx)
32✔
1506
                if errors.Is(err, store.ErrNoMoreEntries) {
41✔
1507
                        break
9✔
1508
                }
1509
                if err != nil {
23✔
1510
                        return err
×
1511
                }
×
1512

1513
                dbID, tableID, _, err := unmapIndex(sqlPrefix, mkey)
23✔
1514
                if err != nil {
23✔
1515
                        return err
×
1516
                }
×
1517

1518
                if t.id != tableID || dbID != 1 {
23✔
1519
                        return ErrCorruptedData
×
1520
                }
×
1521

1522
                v, err := vref.Resolve()
23✔
1523
                if err == io.EOF {
23✔
1524
                        continue
×
1525
                }
1526
                if err != nil {
23✔
1527
                        return err
×
1528
                }
×
1529

1530
                err = tx.Set(mkey, nil, v)
23✔
1531
                if err != nil {
23✔
1532
                        return err
×
1533
                }
×
1534
        }
1535

1536
        return nil
9✔
1537
}
1538

1539
// addSchemaToTx adds the schema of the catalog to the given transaction.
1540
func (catlg *Catalog) addSchemaToTx(ctx context.Context, sqlPrefix []byte, tx *store.OngoingTx) error {
27✔
1541
        dbReaderSpec := store.KeyReaderSpec{
27✔
1542
                Prefix: MapKey(sqlPrefix, catalogTablePrefix, EncodeID(1)),
27✔
1543
        }
27✔
1544

27✔
1545
        tableReader, err := tx.NewKeyReader(dbReaderSpec)
27✔
1546
        if err != nil {
27✔
1547
                return err
×
1548
        }
×
1549
        defer tableReader.Close()
27✔
1550

27✔
1551
        for {
63✔
1552
                mkey, vref, err := tableReader.Read(ctx)
36✔
1553
                if errors.Is(err, store.ErrNoMoreEntries) {
63✔
1554
                        break
27✔
1555
                }
1556
                if err != nil {
9✔
1557
                        return err
×
1558
                }
×
1559

1560
                dbID, tableID, err := unmapTableID(sqlPrefix, mkey)
9✔
1561
                if err != nil {
9✔
1562
                        return err
×
1563
                }
×
1564

1565
                if dbID != 1 {
9✔
1566
                        return ErrCorruptedData
×
1567
                }
×
1568

1569
                // read col specs into tx
1570
                colSpecs, maxColID, err := addColSpecsToTx(ctx, tx, sqlPrefix, tableID)
9✔
1571
                if err != nil {
9✔
1572
                        return err
×
1573
                }
×
1574

1575
                v, err := vref.Resolve()
9✔
1576
                if err == io.EOF {
9✔
1577
                        continue
×
1578
                }
1579
                if err != nil {
9✔
1580
                        return err
×
1581
                }
×
1582

1583
                err = tx.Set(mkey, nil, v)
9✔
1584
                if err != nil {
9✔
1585
                        return err
×
1586
                }
×
1587

1588
                table, err := catlg.newTable(string(v), colSpecs, maxColID)
9✔
1589
                if err != nil {
9✔
1590
                        return err
×
1591
                }
×
1592

1593
                if tableID != table.id {
9✔
1594
                        return ErrCorruptedData
×
1595
                }
×
1596

1597
                // read index specs into tx
1598
                err = table.addIndexesToTx(ctx, sqlPrefix, tx)
9✔
1599
                if err != nil {
9✔
1600
                        return err
×
1601
                }
×
1602

1603
        }
1604

1605
        return nil
27✔
1606
}
1607

1608
// addColSpecsToTx adds the column specs of the given table to the given transaction.
1609
func addColSpecsToTx(ctx context.Context, tx *store.OngoingTx, sqlPrefix []byte, tableID uint32) (specs map[uint32]*ColSpec, maxColID uint32, err error) {
9✔
1610
        initialKey := MapKey(sqlPrefix, catalogColumnPrefix, EncodeID(1), EncodeID(tableID))
9✔
1611

9✔
1612
        dbReaderSpec := store.KeyReaderSpec{
9✔
1613
                Prefix: initialKey,
9✔
1614
        }
9✔
1615

9✔
1616
        colSpecReader, err := tx.NewKeyReader(dbReaderSpec)
9✔
1617
        if err != nil {
9✔
1618
                return nil, 0, err
×
1619
        }
×
1620
        defer colSpecReader.Close()
9✔
1621

9✔
1622
        specs = make(map[uint32]*ColSpec, 0)
9✔
1623

9✔
1624
        for {
50✔
1625
                mkey, vref, err := colSpecReader.Read(ctx)
41✔
1626
                if errors.Is(err, store.ErrNoMoreEntries) {
50✔
1627
                        break
9✔
1628
                }
1629
                if err != nil {
32✔
1630
                        return nil, 0, err
×
1631
                }
×
1632

1633
                md := vref.KVMetadata()
32✔
1634
                if md != nil && md.IsExpirable() {
32✔
1635
                        return nil, 0, ErrBrokenCatalogColSpecExpirable
×
1636
                }
×
1637

1638
                mdbID, mtableID, colID, colType, err := unmapColSpec(sqlPrefix, mkey)
32✔
1639
                if err != nil {
32✔
1640
                        return nil, 0, err
×
1641
                }
×
1642

1643
                if mdbID != 1 || tableID != mtableID {
32✔
1644
                        return nil, 0, ErrCorruptedData
×
1645
                }
×
1646

1647
                if colID != maxColID+1 {
32✔
1648
                        return nil, 0, fmt.Errorf("%w: table columns not stored sequentially", ErrCorruptedData)
×
1649
                }
×
1650

1651
                maxColID = colID
32✔
1652

32✔
1653
                if md != nil && md.Deleted() {
32✔
1654
                        continue
×
1655
                }
1656

1657
                v, err := vref.Resolve()
32✔
1658
                if err != nil {
32✔
1659
                        return nil, 0, err
×
1660
                }
×
1661
                if len(v) < 6 {
32✔
1662
                        return nil, 0, ErrCorruptedData
×
1663
                }
×
1664

1665
                err = tx.Set(mkey, nil, v)
32✔
1666
                if err != nil {
32✔
1667
                        return nil, 0, err
×
1668
                }
×
1669

1670
                specs[colID] = &ColSpec{
32✔
1671
                        colName:       string(v[5:]),
32✔
1672
                        colType:       colType,
32✔
1673
                        maxLen:        int(binary.BigEndian.Uint32(v[1:])),
32✔
1674
                        autoIncrement: v[0]&autoIncrementFlag != 0,
32✔
1675
                        notNull:       v[0]&nullableFlag != 0,
32✔
1676
                }
32✔
1677
        }
1678

1679
        return specs, maxColID, nil
9✔
1680
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc