• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6850968753

13 Nov 2023 01:58PM UTC coverage: 89.235% (-0.3%) from 89.556%
6850968753

push

gh-ci

jeroiraz
feat(embedded/sql): show table stmt

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

82 of 169 new or added lines in 2 files covered. (48.52%)

643 existing lines in 10 files now uncovered.

33838 of 37920 relevant lines covered (89.24%)

144254.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.35
/embedded/sql/engine.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "encoding/binary"
22
        "errors"
23
        "fmt"
24
        "strings"
25

26
        "github.com/codenotary/immudb/embedded/store"
27
)
28

29
var ErrNoSupported = errors.New("not supported")
30
var ErrIllegalArguments = store.ErrIllegalArguments
31
var ErrMultiIndexingNotEnabled = fmt.Errorf("%w: multi-indexing must be enabled", store.ErrIllegalState)
32
var ErrParsingError = errors.New("parsing error")
33
var ErrDDLorDMLTxOnly = errors.New("transactions can NOT combine DDL and DML statements")
34
var ErrUnspecifiedMultiDBHandler = fmt.Errorf("%w: unspecified multidbHanlder", store.ErrIllegalState)
35
var ErrDatabaseDoesNotExist = errors.New("database does not exist")
36
var ErrDatabaseAlreadyExists = errors.New("database already exists")
37
var ErrTableAlreadyExists = errors.New("table already exists")
38
var ErrTableDoesNotExist = errors.New("table does not exist")
39
var ErrColumnDoesNotExist = errors.New("column does not exist")
40
var ErrColumnAlreadyExists = errors.New("column already exists")
41
var ErrCantDropIndexedColumn = errors.New("can not drop indexed column")
42
var ErrSameOldAndNewNames = errors.New("same old and new names")
43
var ErrColumnNotIndexed = errors.New("column is not indexed")
44
var ErrFunctionDoesNotExist = errors.New("function does not exist")
45
var ErrLimitedKeyType = errors.New("indexed key of unsupported type or exceeded length")
46
var ErrLimitedAutoIncrement = errors.New("only INTEGER single-column primary keys can be set as auto incremental")
47
var ErrLimitedMaxLen = errors.New("only VARCHAR and BLOB types support max length")
48
var ErrDuplicatedColumn = errors.New("duplicated column")
49
var ErrInvalidColumn = errors.New("invalid column")
50
var ErrReservedWord = errors.New("reserved word")
51
var ErrPKCanNotBeNull = errors.New("primary key can not be null")
52
var ErrPKCanNotBeUpdated = errors.New("primary key can not be updated")
53
var ErrNotNullableColumnCannotBeNull = errors.New("not nullable column can not be null")
54
var ErrNewColumnMustBeNullable = errors.New("new column must be nullable")
55
var ErrIndexAlreadyExists = errors.New("index already exists")
56
var ErrMaxNumberOfColumnsInIndexExceeded = errors.New("number of columns in multi-column index exceeded")
57
var ErrNoAvailableIndex = errors.New("no available index")
58
var ErrIndexNotFound = errors.New("index not found")
59
var ErrInvalidNumberOfValues = errors.New("invalid number of values provided")
60
var ErrInvalidValue = errors.New("invalid value provided")
61
var ErrInferredMultipleTypes = errors.New("inferred multiple types")
62
var ErrExpectingDQLStmt = errors.New("illegal statement. DQL statement expected")
63
var ErrLimitedOrderBy = errors.New("order is limit to one indexed column")
64
var ErrLimitedGroupBy = errors.New("group by requires ordering by the grouping column")
65
var ErrIllegalMappedKey = errors.New("error illegal mapped key")
66
var ErrCorruptedData = store.ErrCorruptedData
67
var ErrBrokenCatalogColSpecExpirable = fmt.Errorf("%w: catalog column entry set as expirable", ErrCorruptedData)
68
var ErrNoMoreRows = store.ErrNoMoreEntries
69
var ErrInvalidTypes = errors.New("invalid types")
70
var ErrUnsupportedJoinType = errors.New("unsupported join type")
71
var ErrInvalidCondition = errors.New("invalid condition")
72
var ErrHavingClauseRequiresGroupClause = errors.New("having clause requires group clause")
73
var ErrNotComparableValues = errors.New("values are not comparable")
74
var ErrNumericTypeExpected = errors.New("numeric type expected")
75
var ErrUnexpected = errors.New("unexpected error")
76
var ErrMaxKeyLengthExceeded = errors.New("max key length exceeded")
77
var ErrMaxLengthExceeded = errors.New("max length exceeded")
78
var ErrColumnIsNotAnAggregation = errors.New("column is not an aggregation")
79
var ErrLimitedCount = errors.New("only unbounded counting is supported i.e. COUNT(*)")
80
var ErrTxDoesNotExist = errors.New("tx does not exist")
81
var ErrNestedTxNotSupported = errors.New("nested tx are not supported")
82
var ErrNoOngoingTx = errors.New("no ongoing transaction")
83
var ErrNonTransactionalStmt = errors.New("non transactional statement")
84
var ErrDivisionByZero = errors.New("division by zero")
85
var ErrMissingParameter = errors.New("missing parameter")
86
var ErrUnsupportedParameter = errors.New("unsupported parameter")
87
var ErrDuplicatedParameters = errors.New("duplicated parameters")
88
var ErrLimitedIndexCreation = errors.New("unique index creation is only supported on empty tables")
89
var ErrTooManyRows = errors.New("too many rows")
90
var ErrAlreadyClosed = store.ErrAlreadyClosed
91
var ErrAmbiguousSelector = errors.New("ambiguous selector")
92
var ErrUnsupportedCast = fmt.Errorf("%w: unsupported cast", ErrInvalidValue)
93
var ErrColumnMismatchInUnionStmt = errors.New("column mismatch in union statement")
94

95
// MaxKeyLen is a package-level, mutable limit.
// NOTE(review): presumably the maximum length in bytes of an encoded index
// key (see ErrMaxKeyLengthExceeded) — confirm against its users.
var MaxKeyLen = 512

const (
	// EncIDLen is the number of bytes occupied by an encoded identifier;
	// identifiers are read as big-endian uint32 values.
	EncIDLen = 4

	// EncLenLen is the number of bytes occupied by an encoded length or
	// count; these are read as big-endian uint32 values.
	EncLenLen = 4

	// MaxNumberOfColumnsInIndex is presumably the maximum number of columns
	// a single index may span (see ErrMaxNumberOfColumnsInIndexExceeded).
	MaxNumberOfColumnsInIndex = 8
)
101

102
type Engine struct {
103
        store *store.ImmuStore
104

105
        prefix                        []byte
106
        distinctLimit                 int
107
        autocommit                    bool
108
        lazyIndexConstraintValidation bool
109

110
        multidbHandler MultiDBHandler
111
}
112

113
type MultiDBHandler interface {
114
        ListDatabases(ctx context.Context) ([]string, error)
115
        CreateDatabase(ctx context.Context, db string, ifNotExists bool) error
116
        UseDatabase(ctx context.Context, db string) error
117
        ListUsers(ctx context.Context) ([]User, error)
118
        CreateUser(ctx context.Context, username, password string, permission Permission) error
119
        AlterUser(ctx context.Context, username, password string, permission Permission) error
120
        DropUser(ctx context.Context, username string) error
121
        ExecPreparedStmts(ctx context.Context, opts *TxOptions, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error)
122
}
123

124
// User describes an account as reported by MultiDBHandler.ListUsers.
type User interface {
	// Username returns the name of the account.
	Username() string
	// Permission returns the permission of the account as a uint32 code.
	Permission() uint32
}
128

129
func NewEngine(st *store.ImmuStore, opts *Options) (*Engine, error) {
1,458✔
130
        if st == nil {
1,459✔
131
                return nil, ErrIllegalArguments
1✔
132
        }
1✔
133

134
        if !st.MultiIndexingEnabled() {
1,458✔
135
                return nil, ErrMultiIndexingNotEnabled
1✔
136
        }
1✔
137

138
        err := opts.Validate()
1,456✔
139
        if err != nil {
1,456✔
UNCOV
140
                return nil, err
×
UNCOV
141
        }
×
142

143
        e := &Engine{
1,456✔
144
                store:                         st,
1,456✔
145
                prefix:                        make([]byte, len(opts.prefix)),
1,456✔
146
                distinctLimit:                 opts.distinctLimit,
1,456✔
147
                autocommit:                    opts.autocommit,
1,456✔
148
                lazyIndexConstraintValidation: opts.lazyIndexConstraintValidation,
1,456✔
149
                multidbHandler:                opts.multidbHandler,
1,456✔
150
        }
1,456✔
151

1,456✔
152
        copy(e.prefix, opts.prefix)
1,456✔
153

1,456✔
154
        err = st.InitIndexing(&store.IndexSpec{
1,456✔
155
                SourcePrefix:     append(e.prefix, []byte(catalogPrefix)...),
1,456✔
156
                TargetPrefix:     append(e.prefix, []byte(catalogPrefix)...),
1,456✔
157
                InjectiveMapping: true,
1,456✔
158
        })
1,456✔
159
        if err != nil && !errors.Is(err, store.ErrIndexAlreadyInitialized) {
1,456✔
UNCOV
160
                return nil, err
×
UNCOV
161
        }
×
162

163
        // TODO: find a better way to handle parsing errors
164
        yyErrorVerbose = true
1,456✔
165

1,456✔
166
        return e, nil
1,456✔
167
}
168

169
func (e *Engine) NewTx(ctx context.Context, opts *TxOptions) (*SQLTx, error) {
1,915✔
170
        err := opts.Validate()
1,915✔
171
        if err != nil {
1,915✔
UNCOV
172
                return nil, err
×
UNCOV
173
        }
×
174

175
        var mode store.TxMode
1,915✔
176
        if opts.ReadOnly {
2,481✔
177
                mode = store.ReadOnlyTx
566✔
178
        } else {
1,915✔
179
                mode = store.ReadWriteTx
1,349✔
180
        }
1,349✔
181

182
        txOpts := &store.TxOptions{
1,915✔
183
                Mode:                    mode,
1,915✔
184
                SnapshotMustIncludeTxID: opts.SnapshotMustIncludeTxID,
1,915✔
185
                SnapshotRenewalPeriod:   opts.SnapshotRenewalPeriod,
1,915✔
186
                UnsafeMVCC:              opts.UnsafeMVCC,
1,915✔
187
        }
1,915✔
188

1,915✔
189
        tx, err := e.store.NewTx(ctx, txOpts)
1,915✔
190
        if err != nil {
1,916✔
191
                return nil, err
1✔
192
        }
1✔
193

194
        if len(opts.Extra) > 0 {
2,123✔
195
                txmd := store.NewTxMetadata()
209✔
196
                err := txmd.WithExtra(opts.Extra)
209✔
197
                if err != nil {
209✔
UNCOV
198
                        return nil, err
×
UNCOV
199
                }
×
200

201
                tx.WithMetadata(txmd)
209✔
202
        }
203

204
        catalog := newCatalog(e.prefix)
1,914✔
205

1,914✔
206
        err = catalog.load(ctx, tx)
1,914✔
207
        if err != nil {
1,915✔
208
                return nil, err
1✔
209
        }
1✔
210

211
        for _, table := range catalog.GetTables() {
4,177✔
212
                primaryIndex := table.primaryIndex
2,264✔
213

2,264✔
214
                rowEntryPrefix := MapKey(
2,264✔
215
                        e.prefix,
2,264✔
216
                        RowPrefix,
2,264✔
217
                        EncodeID(DatabaseID),
2,264✔
218
                        EncodeID(table.id),
2,264✔
219
                        EncodeID(primaryIndex.id),
2,264✔
220
                )
2,264✔
221

2,264✔
222
                mappedPKEntryPrefix := MapKey(
2,264✔
223
                        e.prefix,
2,264✔
224
                        MappedPrefix,
2,264✔
225
                        EncodeID(table.id),
2,264✔
226
                        EncodeID(primaryIndex.id),
2,264✔
227
                )
2,264✔
228

2,264✔
229
                err = e.store.InitIndexing(&store.IndexSpec{
2,264✔
230
                        SourcePrefix: rowEntryPrefix,
2,264✔
231

2,264✔
232
                        TargetEntryMapper: indexEntryMapperFor(primaryIndex, primaryIndex),
2,264✔
233
                        TargetPrefix:      mappedPKEntryPrefix,
2,264✔
234

2,264✔
235
                        InjectiveMapping: true,
2,264✔
236
                })
2,264✔
237
                if err != nil && !errors.Is(err, store.ErrIndexAlreadyInitialized) {
2,264✔
UNCOV
238
                        return nil, err
×
UNCOV
239
                }
×
240

241
                for _, index := range table.indexes {
6,103✔
242
                        if index.IsPrimary() {
6,103✔
243
                                continue
2,264✔
244
                        }
245

246
                        mappedEntryPrefix := MapKey(
1,575✔
247
                                e.prefix,
1,575✔
248
                                MappedPrefix,
1,575✔
249
                                EncodeID(table.id),
1,575✔
250
                                EncodeID(index.id),
1,575✔
251
                        )
1,575✔
252

1,575✔
253
                        err = e.store.InitIndexing(&store.IndexSpec{
1,575✔
254
                                SourcePrefix:      rowEntryPrefix,
1,575✔
255
                                SourceEntryMapper: indexEntryMapperFor(primaryIndex, primaryIndex),
1,575✔
256
                                TargetEntryMapper: indexEntryMapperFor(index, primaryIndex),
1,575✔
257
                                TargetPrefix:      mappedEntryPrefix,
1,575✔
258

1,575✔
259
                                InjectiveMapping: true,
1,575✔
260
                        })
1,575✔
261
                        if errors.Is(err, store.ErrIndexAlreadyInitialized) {
3,026✔
262
                                continue
1,451✔
263
                        }
264
                        if err != nil {
124✔
UNCOV
265
                                return nil, err
×
266
                        }
×
267
                }
268

269
                if table.autoIncrementPK {
3,536✔
270
                        encMaxPK, err := loadMaxPK(ctx, e.prefix, tx, table)
1,272✔
271
                        if errors.Is(err, store.ErrNoMoreEntries) {
1,508✔
272
                                continue
236✔
273
                        }
274
                        if err != nil {
1,036✔
275
                                return nil, err
×
UNCOV
276
                        }
×
277

278
                        if len(encMaxPK) != 9 {
1,036✔
UNCOV
279
                                return nil, ErrCorruptedData
×
UNCOV
280
                        }
×
281

282
                        if encMaxPK[0] != KeyValPrefixNotNull {
1,036✔
UNCOV
283
                                return nil, ErrCorruptedData
×
UNCOV
284
                        }
×
285

286
                        // map to signed integer space
287
                        encMaxPK[1] ^= 0x80
1,036✔
288

1,036✔
289
                        table.maxPK = int64(binary.BigEndian.Uint64(encMaxPK[1:]))
1,036✔
290
                }
291
        }
292

293
        return &SQLTx{
1,913✔
294
                engine:           e,
1,913✔
295
                opts:             opts,
1,913✔
296
                tx:               tx,
1,913✔
297
                catalog:          catalog,
1,913✔
298
                lastInsertedPKs:  make(map[string]int64),
1,913✔
299
                firstInsertedPKs: make(map[string]int64),
1,913✔
300
        }, nil
1,913✔
301
}
302

303
func indexEntryMapperFor(index, primaryIndex *Index) store.EntryMapper {
5,414✔
304
        // value={count (colID valLen val)+})
5,414✔
305
        // key=M.{tableID}{indexID}({null}({val}{padding}{valLen})?)+({pkVal}{padding}{pkValLen})+
5,414✔
306

5,414✔
307
        valueExtractor := func(value []byte, valuesByColID map[uint32]TypedValue) error {
9,018✔
308
                voff := 0
3,604✔
309

3,604✔
310
                cols := int(binary.BigEndian.Uint32(value[voff:]))
3,604✔
311
                voff += EncLenLen
3,604✔
312

3,604✔
313
                for i := 0; i < cols; i++ {
14,852✔
314
                        if len(value) < EncIDLen {
11,248✔
UNCOV
315
                                return fmt.Errorf("key is lower than required")
×
UNCOV
316
                        }
×
317

318
                        colID := binary.BigEndian.Uint32(value[voff:])
11,248✔
319
                        voff += EncIDLen
11,248✔
320

11,248✔
321
                        col, err := index.table.GetColumnByID(colID)
11,248✔
322
                        if errors.Is(err, ErrColumnDoesNotExist) {
11,328✔
323
                                vlen := int(binary.BigEndian.Uint32(value[voff:]))
80✔
324
                                voff += EncLenLen + vlen
80✔
325
                                continue
80✔
326
                        } else if err != nil {
11,168✔
UNCOV
327
                                return err
×
UNCOV
328
                        }
×
329

330
                        val, n, err := DecodeValue(value[voff:], col.colType)
11,168✔
331
                        if err != nil {
11,168✔
UNCOV
332
                                return err
×
UNCOV
333
                        }
×
334

335
                        voff += n
11,168✔
336

11,168✔
337
                        valuesByColID[colID] = val
11,168✔
338
                }
339

340
                return nil
3,604✔
341
        }
342

343
        return func(key, value []byte) ([]byte, error) {
9,018✔
344
                encodedValues := make([][]byte, 2+len(index.cols)+1)
3,604✔
345
                encodedValues[0] = EncodeID(index.table.id)
3,604✔
346
                encodedValues[1] = EncodeID(index.id)
3,604✔
347

3,604✔
348
                valuesByColID := make(map[uint32]TypedValue, len(index.cols))
3,604✔
349

3,604✔
350
                for _, col := range index.table.cols {
15,250✔
351
                        valuesByColID[col.id] = &NullValue{t: col.colType}
11,646✔
352
                }
11,646✔
353

354
                err := valueExtractor(value, valuesByColID)
3,604✔
355
                if err != nil {
3,604✔
UNCOV
356
                        return nil, err
×
UNCOV
357
                }
×
358

359
                for i, col := range index.cols {
7,270✔
360
                        encKey, _, err := EncodeValueAsKey(valuesByColID[col.id], col.Type(), col.MaxLen())
3,666✔
361
                        if err != nil {
3,666✔
362
                                return nil, err
×
UNCOV
363
                        }
×
364

365
                        encodedValues[2+i] = encKey
3,666✔
366
                }
367

368
                pkEncVals, err := encodedKey(primaryIndex, valuesByColID)
3,604✔
369
                if err != nil {
3,604✔
UNCOV
370
                        return nil, err
×
UNCOV
371
                }
×
372

373
                encodedValues[len(encodedValues)-1] = pkEncVals
3,604✔
374

3,604✔
375
                return MapKey(index.enginePrefix(), MappedPrefix, encodedValues...), nil
3,604✔
376
        }
377
}
378

379
func (e *Engine) Exec(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error) {
948✔
380
        stmts, err := Parse(strings.NewReader(sql))
948✔
381
        if err != nil {
951✔
382
                return nil, nil, fmt.Errorf("%w: %v", ErrParsingError, err)
3✔
383
        }
3✔
384

385
        return e.ExecPreparedStmts(ctx, tx, stmts, params)
945✔
386
}
387

388
func (e *Engine) ExecPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error) {
1,446✔
389
        ntx, ctxs, pendingStmts, err := e.execPreparedStmts(ctx, tx, stmts, params)
1,446✔
390
        if err != nil {
1,568✔
391
                return ntx, ctxs, err
122✔
392
        }
122✔
393

394
        if len(pendingStmts) > 0 {
1,326✔
395
                // a different database was selected
2✔
396

2✔
397
                if e.multidbHandler == nil || ntx != nil {
2✔
UNCOV
398
                        return ntx, ctxs, fmt.Errorf("%w: all statements should have been executed when not using a multidbHandler", ErrUnexpected)
×
UNCOV
399
                }
×
400

401
                var opts *TxOptions
2✔
402

2✔
403
                if tx != nil {
3✔
404
                        opts = tx.opts
1✔
405
                } else {
2✔
406
                        opts = DefaultTxOptions()
1✔
407
                }
1✔
408

409
                ntx, hctxs, err := e.multidbHandler.ExecPreparedStmts(ctx, opts, pendingStmts, params)
2✔
410

2✔
411
                return ntx, append(ctxs, hctxs...), err
2✔
412
        }
413

414
        return ntx, ctxs, nil
1,322✔
415
}
416

417
func (e *Engine) execPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, pendingStmts []SQLStmt, err error) {
1,446✔
418
        if len(stmts) == 0 {
1,446✔
UNCOV
419
                return nil, nil, stmts, ErrIllegalArguments
×
UNCOV
420
        }
×
421

422
        nparams, err := normalizeParams(params)
1,446✔
423
        if err != nil {
1,447✔
424
                return nil, nil, stmts, err
1✔
425
        }
1✔
426

427
        currTx := tx
1,445✔
428

1,445✔
429
        execStmts := 0
1,445✔
430

1,445✔
431
        for _, stmt := range stmts {
2,979✔
432
                if stmt == nil {
1,534✔
UNCOV
433
                        return nil, nil, stmts[execStmts:], ErrIllegalArguments
×
UNCOV
434
                }
×
435

436
                _, isDBSelectionStmt := stmt.(*UseDatabaseStmt)
1,534✔
437

1,534✔
438
                // handle the case when working in non-autocommit mode outside a transaction block
1,534✔
439
                if isDBSelectionStmt && (currTx != nil && !currTx.Closed()) && !currTx.IsExplicitCloseRequired() {
1,535✔
440
                        err = currTx.Commit(ctx)
1✔
441
                        if err == nil {
2✔
442
                                committedTxs = append(committedTxs, currTx)
1✔
443
                        }
1✔
444
                        if err != nil {
1✔
UNCOV
445
                                return nil, committedTxs, stmts[execStmts:], err
×
UNCOV
446
                        }
×
447
                }
448

449
                if currTx == nil || currTx.Closed() {
2,439✔
450
                        var opts *TxOptions
905✔
451

905✔
452
                        if currTx != nil {
911✔
453
                                opts = currTx.opts
6✔
454
                        } else if tx != nil {
906✔
455
                                opts = tx.opts
1✔
456
                        } else {
899✔
457
                                opts = DefaultTxOptions()
898✔
458
                        }
898✔
459

460
                        // begin tx with implicit commit
461
                        currTx, err = e.NewTx(ctx, opts)
905✔
462
                        if err != nil {
906✔
463
                                return nil, committedTxs, stmts[execStmts:], err
1✔
464
                        }
1✔
465
                }
466

467
                ntx, err := stmt.execAt(ctx, currTx, nparams)
1,533✔
468
                if err != nil {
1,653✔
469
                        currTx.Cancel()
120✔
470
                        return nil, committedTxs, stmts[execStmts:], err
120✔
471
                }
120✔
472

473
                if !currTx.Closed() && !currTx.IsExplicitCloseRequired() && e.autocommit {
1,421✔
474
                        err = currTx.Commit(ctx)
8✔
475
                        if err != nil {
8✔
UNCOV
476
                                return nil, committedTxs, stmts[execStmts:], err
×
UNCOV
477
                        }
×
478
                }
479

480
                if currTx.Closed() {
1,570✔
481
                        committedTxs = append(committedTxs, currTx)
157✔
482
                }
157✔
483

484
                currTx = ntx
1,413✔
485

1,413✔
486
                execStmts++
1,413✔
487

1,413✔
488
                if isDBSelectionStmt && e.multidbHandler != nil {
1,418✔
489
                        break
5✔
490
                }
491
        }
492

493
        if currTx != nil && !currTx.Closed() && !currTx.IsExplicitCloseRequired() {
2,275✔
494
                err = currTx.Commit(ctx)
951✔
495
                if err != nil {
951✔
UNCOV
496
                        return nil, committedTxs, stmts[execStmts:], err
×
UNCOV
497
                }
×
498

499
                committedTxs = append(committedTxs, currTx)
951✔
500
        }
501

502
        if currTx != nil && currTx.Closed() {
2,278✔
503
                currTx = nil
954✔
504
        }
954✔
505

506
        return currTx, committedTxs, stmts[execStmts:], nil
1,324✔
507
}
508

509
func (e *Engine) Query(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) (RowReader, error) {
286✔
510
        stmts, err := Parse(strings.NewReader(sql))
286✔
511
        if err != nil {
287✔
512
                return nil, fmt.Errorf("%w: %v", ErrParsingError, err)
1✔
513
        }
1✔
514
        if len(stmts) != 1 {
286✔
515
                return nil, ErrExpectingDQLStmt
1✔
516
        }
1✔
517

518
        stmt, ok := stmts[0].(DataSource)
284✔
519
        if !ok {
285✔
520
                return nil, ErrExpectingDQLStmt
1✔
521
        }
1✔
522

523
        return e.QueryPreparedStmt(ctx, tx, stmt, params)
283✔
524
}
525

526
func (e *Engine) QueryPreparedStmt(ctx context.Context, tx *SQLTx, stmt DataSource, params map[string]interface{}) (rowReader RowReader, err error) {
417✔
527
        if stmt == nil {
418✔
528
                return nil, ErrIllegalArguments
1✔
529
        }
1✔
530

531
        qtx := tx
416✔
532

416✔
533
        if qtx == nil {
708✔
534
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
292✔
535
                if err != nil {
293✔
536
                        return nil, err
1✔
537
                }
1✔
538
                defer func() {
582✔
539
                        if err != nil {
313✔
540
                                qtx.Cancel()
22✔
541
                        }
22✔
542
                }()
543
        }
544

545
        nparams, err := normalizeParams(params)
415✔
546
        if err != nil {
416✔
547
                return nil, err
1✔
548
        }
1✔
549

550
        _, err = stmt.execAt(ctx, qtx, nparams)
414✔
551
        if err != nil {
422✔
552
                return nil, err
8✔
553
        }
8✔
554

555
        r, err := stmt.Resolve(ctx, qtx, nparams, nil)
406✔
556
        if err != nil {
422✔
557
                return nil, err
16✔
558
        }
16✔
559

560
        if tx == nil {
659✔
561
                r.onClose(func() {
532✔
562
                        qtx.Cancel()
263✔
563
                })
263✔
564
        }
565

566
        return r, nil
390✔
567
}
568

569
func (e *Engine) Catalog(ctx context.Context, tx *SQLTx) (catalog *Catalog, err error) {
11✔
570
        qtx := tx
11✔
571

11✔
572
        if qtx == nil {
22✔
573
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
11✔
574
                if err != nil {
11✔
UNCOV
575
                        return nil, err
×
UNCOV
576
                }
×
577
                defer qtx.Cancel()
11✔
578
        }
579

580
        return qtx.Catalog(), nil
11✔
581
}
582

583
func (e *Engine) InferParameters(ctx context.Context, tx *SQLTx, sql string) (params map[string]SQLValueType, err error) {
65✔
584
        stmts, err := Parse(strings.NewReader(sql))
65✔
585
        if err != nil {
67✔
586
                return nil, fmt.Errorf("%w: %v", ErrParsingError, err)
2✔
587
        }
2✔
588

589
        return e.InferParametersPreparedStmts(ctx, tx, stmts)
63✔
590
}
591

592
func (e *Engine) InferParametersPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt) (params map[string]SQLValueType, err error) {
76✔
593
        if len(stmts) == 0 {
77✔
594
                return nil, ErrIllegalArguments
1✔
595
        }
1✔
596

597
        qtx := tx
75✔
598

75✔
599
        if qtx == nil {
150✔
600
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
75✔
601
                if err != nil {
75✔
UNCOV
602
                        return nil, err
×
UNCOV
603
                }
×
604
                defer qtx.Cancel()
75✔
605
        }
606

607
        params = make(map[string]SQLValueType)
75✔
608

75✔
609
        for _, stmt := range stmts {
155✔
610
                err = stmt.inferParameters(ctx, qtx, params)
80✔
611
                if err != nil {
94✔
612
                        return nil, err
14✔
613
                }
14✔
614
        }
615

616
        return params, nil
61✔
617
}
618

619
func normalizeParams(params map[string]interface{}) (map[string]interface{}, error) {
1,861✔
620
        nparams := make(map[string]interface{}, len(params))
1,861✔
621

1,861✔
622
        for name, value := range params {
2,498✔
623
                nname := strings.ToLower(name)
637✔
624

637✔
625
                _, exists := nparams[nname]
637✔
626
                if exists {
639✔
627
                        return nil, ErrDuplicatedParameters
2✔
628
                }
2✔
629

630
                nparams[nname] = value
635✔
631
        }
632

633
        return nparams, nil
1,859✔
634
}
635

636
// CopyCatalogToTx copies the current sql catalog to the ongoing transaction.
637
func (e *Engine) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error {
42✔
638
        if tx == nil {
57✔
639
                return ErrIllegalArguments
15✔
640
        }
15✔
641

642
        catalog := newCatalog(e.prefix)
27✔
643

27✔
644
        err := catalog.addSchemaToTx(ctx, e.prefix, tx)
27✔
645
        if err != nil {
27✔
UNCOV
646
                return err
×
UNCOV
647
        }
×
648

649
        return nil
27✔
650
}
651

652
func (e *Engine) GetStore() *store.ImmuStore {
175✔
653
        return e.store
175✔
654
}
175✔
655

656
func (e *Engine) GetPrefix() []byte {
17✔
657
        return e.prefix
17✔
658
}
17✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc