• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6458421232

09 Oct 2023 03:04PM UTC coverage: 89.499% (+0.2%) from 89.257%
6458421232

push

gh-ci

jeroiraz
test(embedded/tbtree): nodeRef coverage

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

33451 of 37376 relevant lines covered (89.5%)

144180.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.35
/embedded/sql/engine.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "encoding/binary"
22
        "errors"
23
        "fmt"
24
        "strings"
25

26
        "github.com/codenotary/immudb/embedded/store"
27
)
28

29
var ErrNoSupported = errors.New("not supported")
30
var ErrIllegalArguments = store.ErrIllegalArguments
31
var ErrMultiIndexingNotEnabled = fmt.Errorf("%w: multi-indexing must be enabled", store.ErrIllegalState)
32
var ErrParsingError = errors.New("parsing error")
33
var ErrDDLorDMLTxOnly = errors.New("transactions can NOT combine DDL and DML statements")
34
var ErrUnspecifiedMultiDBHandler = fmt.Errorf("%w: unspecified multidbHanlder", store.ErrIllegalState)
35
var ErrDatabaseDoesNotExist = errors.New("database does not exist")
36
var ErrDatabaseAlreadyExists = errors.New("database already exists")
37
var ErrTableAlreadyExists = errors.New("table already exists")
38
var ErrTableDoesNotExist = errors.New("table does not exist")
39
var ErrColumnDoesNotExist = errors.New("column does not exist")
40
var ErrColumnAlreadyExists = errors.New("column already exists")
41
var ErrCantDropIndexedColumn = errors.New("can not drop indexed column")
42
var ErrSameOldAndNewColumnName = errors.New("same old and new column names")
43
var ErrColumnNotIndexed = errors.New("column is not indexed")
44
var ErrFunctionDoesNotExist = errors.New("function does not exist")
45
var ErrLimitedKeyType = errors.New("indexed key of unsupported type or exceeded length")
46
var ErrLimitedAutoIncrement = errors.New("only INTEGER single-column primary keys can be set as auto incremental")
47
var ErrLimitedMaxLen = errors.New("only VARCHAR and BLOB types support max length")
48
var ErrDuplicatedColumn = errors.New("duplicated column")
49
var ErrInvalidColumn = errors.New("invalid column")
50
var ErrReservedWord = errors.New("reserved word")
51
var ErrPKCanNotBeNull = errors.New("primary key can not be null")
52
var ErrPKCanNotBeUpdated = errors.New("primary key can not be updated")
53
var ErrNotNullableColumnCannotBeNull = errors.New("not nullable column can not be null")
54
var ErrNewColumnMustBeNullable = errors.New("new column must be nullable")
55
var ErrIndexAlreadyExists = errors.New("index already exists")
56
var ErrMaxNumberOfColumnsInIndexExceeded = errors.New("number of columns in multi-column index exceeded")
57
var ErrNoAvailableIndex = errors.New("no available index")
58
var ErrIndexNotFound = errors.New("index not found")
59
var ErrInvalidNumberOfValues = errors.New("invalid number of values provided")
60
var ErrInvalidValue = errors.New("invalid value provided")
61
var ErrInferredMultipleTypes = errors.New("inferred multiple types")
62
var ErrExpectingDQLStmt = errors.New("illegal statement. DQL statement expected")
63
var ErrLimitedOrderBy = errors.New("order is limit to one indexed column")
64
var ErrLimitedGroupBy = errors.New("group by requires ordering by the grouping column")
65
var ErrIllegalMappedKey = errors.New("error illegal mapped key")
66
var ErrCorruptedData = store.ErrCorruptedData
67
var ErrBrokenCatalogColSpecExpirable = fmt.Errorf("%w: catalog column entry set as expirable", ErrCorruptedData)
68
var ErrNoMoreRows = store.ErrNoMoreEntries
69
var ErrInvalidTypes = errors.New("invalid types")
70
var ErrUnsupportedJoinType = errors.New("unsupported join type")
71
var ErrInvalidCondition = errors.New("invalid condition")
72
var ErrHavingClauseRequiresGroupClause = errors.New("having clause requires group clause")
73
var ErrNotComparableValues = errors.New("values are not comparable")
74
var ErrNumericTypeExpected = errors.New("numeric type expected")
75
var ErrUnexpected = errors.New("unexpected error")
76
var ErrMaxKeyLengthExceeded = errors.New("max key length exceeded")
77
var ErrMaxLengthExceeded = errors.New("max length exceeded")
78
var ErrColumnIsNotAnAggregation = errors.New("column is not an aggregation")
79
var ErrLimitedCount = errors.New("only unbounded counting is supported i.e. COUNT(*)")
80
var ErrTxDoesNotExist = errors.New("tx does not exist")
81
var ErrNestedTxNotSupported = errors.New("nested tx are not supported")
82
var ErrNoOngoingTx = errors.New("no ongoing transaction")
83
var ErrNonTransactionalStmt = errors.New("non transactional statement")
84
var ErrDivisionByZero = errors.New("division by zero")
85
var ErrMissingParameter = errors.New("missing parameter")
86
var ErrUnsupportedParameter = errors.New("unsupported parameter")
87
var ErrDuplicatedParameters = errors.New("duplicated parameters")
88
var ErrLimitedIndexCreation = errors.New("unique index creation is only supported on empty tables")
89
var ErrTooManyRows = errors.New("too many rows")
90
var ErrAlreadyClosed = store.ErrAlreadyClosed
91
var ErrAmbiguousSelector = errors.New("ambiguous selector")
92
var ErrUnsupportedCast = fmt.Errorf("%w: unsupported cast", ErrInvalidValue)
93
var ErrColumnMismatchInUnionStmt = errors.New("column mismatch in union statement")
94

95
// MaxKeyLen is a package-level, mutable limit.
// NOTE(review): presumably the maximum length in bytes of an encoded index
// key (see ErrMaxKeyLengthExceeded) — confirm against its users.
var MaxKeyLen = 512

const (
	// EncIDLen is the number of bytes occupied by an encoded identifier
	// (read as a big-endian uint32 when decoding row values).
	EncIDLen = 4

	// EncLenLen is the number of bytes occupied by an encoded length or
	// count (read as a big-endian uint32 when decoding row values).
	EncLenLen = 4

	// MaxNumberOfColumnsInIndex is presumably the maximum number of columns
	// a multi-column index may span (see ErrMaxNumberOfColumnsInIndexExceeded).
	MaxNumberOfColumnsInIndex = 8
)
101

102
type Engine struct {
103
        store *store.ImmuStore
104

105
        prefix                        []byte
106
        distinctLimit                 int
107
        autocommit                    bool
108
        lazyIndexConstraintValidation bool
109

110
        multidbHandler MultiDBHandler
111
}
112

113
type MultiDBHandler interface {
114
        ListDatabases(ctx context.Context) ([]string, error)
115
        CreateDatabase(ctx context.Context, db string, ifNotExists bool) error
116
        UseDatabase(ctx context.Context, db string) error
117
        ExecPreparedStmts(ctx context.Context, opts *TxOptions, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error)
118
}
119

120
func NewEngine(st *store.ImmuStore, opts *Options) (*Engine, error) {
1,440✔
121
        if st == nil {
1,441✔
122
                return nil, ErrIllegalArguments
1✔
123
        }
1✔
124

125
        if !st.MultiIndexingEnabled() {
1,440✔
126
                return nil, ErrMultiIndexingNotEnabled
1✔
127
        }
1✔
128

129
        err := opts.Validate()
1,438✔
130
        if err != nil {
1,438✔
131
                return nil, err
×
132
        }
×
133

134
        e := &Engine{
1,438✔
135
                store:                         st,
1,438✔
136
                prefix:                        make([]byte, len(opts.prefix)),
1,438✔
137
                distinctLimit:                 opts.distinctLimit,
1,438✔
138
                autocommit:                    opts.autocommit,
1,438✔
139
                lazyIndexConstraintValidation: opts.lazyIndexConstraintValidation,
1,438✔
140
                multidbHandler:                opts.multidbHandler,
1,438✔
141
        }
1,438✔
142

1,438✔
143
        copy(e.prefix, opts.prefix)
1,438✔
144

1,438✔
145
        err = st.InitIndexing(&store.IndexSpec{
1,438✔
146
                SourcePrefix:     append(e.prefix, []byte(catalogPrefix)...),
1,438✔
147
                TargetPrefix:     append(e.prefix, []byte(catalogPrefix)...),
1,438✔
148
                InjectiveMapping: true,
1,438✔
149
        })
1,438✔
150
        if err != nil && !errors.Is(err, store.ErrIndexAlreadyInitialized) {
1,438✔
151
                return nil, err
×
152
        }
×
153

154
        // TODO: find a better way to handle parsing errors
155
        yyErrorVerbose = true
1,438✔
156

1,438✔
157
        return e, nil
1,438✔
158
}
159

160
func (e *Engine) NewTx(ctx context.Context, opts *TxOptions) (*SQLTx, error) {
1,894✔
161
        err := opts.Validate()
1,894✔
162
        if err != nil {
1,894✔
163
                return nil, err
×
164
        }
×
165

166
        var mode store.TxMode
1,894✔
167
        if opts.ReadOnly {
2,453✔
168
                mode = store.ReadOnlyTx
559✔
169
        } else {
1,894✔
170
                mode = store.ReadWriteTx
1,335✔
171
        }
1,335✔
172

173
        txOpts := &store.TxOptions{
1,894✔
174
                Mode:                    mode,
1,894✔
175
                SnapshotMustIncludeTxID: opts.SnapshotMustIncludeTxID,
1,894✔
176
                SnapshotRenewalPeriod:   opts.SnapshotRenewalPeriod,
1,894✔
177
                UnsafeMVCC:              opts.UnsafeMVCC,
1,894✔
178
        }
1,894✔
179

1,894✔
180
        tx, err := e.store.NewTx(ctx, txOpts)
1,894✔
181
        if err != nil {
1,895✔
182
                return nil, err
1✔
183
        }
1✔
184

185
        if len(opts.Extra) > 0 {
2,102✔
186
                txmd := store.NewTxMetadata()
209✔
187
                err := txmd.WithExtra(opts.Extra)
209✔
188
                if err != nil {
209✔
189
                        return nil, err
×
190
                }
×
191

192
                tx.WithMetadata(txmd)
209✔
193
        }
194

195
        catalog := newCatalog(e.prefix)
1,893✔
196

1,893✔
197
        err = catalog.load(ctx, tx)
1,893✔
198
        if err != nil {
1,894✔
199
                return nil, err
1✔
200
        }
1✔
201

202
        for _, table := range catalog.GetTables() {
4,152✔
203
                primaryIndex := table.primaryIndex
2,260✔
204

2,260✔
205
                rowEntryPrefix := MapKey(
2,260✔
206
                        e.prefix,
2,260✔
207
                        RowPrefix,
2,260✔
208
                        EncodeID(DatabaseID),
2,260✔
209
                        EncodeID(table.id),
2,260✔
210
                        EncodeID(primaryIndex.id),
2,260✔
211
                )
2,260✔
212

2,260✔
213
                mappedPKEntryPrefix := MapKey(
2,260✔
214
                        e.prefix,
2,260✔
215
                        MappedPrefix,
2,260✔
216
                        EncodeID(table.id),
2,260✔
217
                        EncodeID(primaryIndex.id),
2,260✔
218
                )
2,260✔
219

2,260✔
220
                err = e.store.InitIndexing(&store.IndexSpec{
2,260✔
221
                        SourcePrefix: rowEntryPrefix,
2,260✔
222

2,260✔
223
                        TargetEntryMapper: indexEntryMapperFor(primaryIndex, primaryIndex),
2,260✔
224
                        TargetPrefix:      mappedPKEntryPrefix,
2,260✔
225

2,260✔
226
                        InjectiveMapping: true,
2,260✔
227
                })
2,260✔
228
                if err != nil && !errors.Is(err, store.ErrIndexAlreadyInitialized) {
2,260✔
229
                        return nil, err
×
230
                }
×
231

232
                for _, index := range table.indexes {
6,091✔
233
                        if index.IsPrimary() {
6,091✔
234
                                continue
2,260✔
235
                        }
236

237
                        mappedEntryPrefix := MapKey(
1,571✔
238
                                e.prefix,
1,571✔
239
                                MappedPrefix,
1,571✔
240
                                EncodeID(table.id),
1,571✔
241
                                EncodeID(index.id),
1,571✔
242
                        )
1,571✔
243

1,571✔
244
                        err = e.store.InitIndexing(&store.IndexSpec{
1,571✔
245
                                SourcePrefix:      rowEntryPrefix,
1,571✔
246
                                SourceEntryMapper: indexEntryMapperFor(primaryIndex, primaryIndex),
1,571✔
247
                                TargetEntryMapper: indexEntryMapperFor(index, primaryIndex),
1,571✔
248
                                TargetPrefix:      mappedEntryPrefix,
1,571✔
249

1,571✔
250
                                InjectiveMapping: true,
1,571✔
251
                        })
1,571✔
252
                        if errors.Is(err, store.ErrIndexAlreadyInitialized) {
3,018✔
253
                                continue
1,447✔
254
                        }
255
                        if err != nil {
124✔
256
                                return nil, err
×
257
                        }
×
258
                }
259

260
                if table.autoIncrementPK {
3,532✔
261
                        encMaxPK, err := loadMaxPK(ctx, e.prefix, tx, table)
1,272✔
262
                        if errors.Is(err, store.ErrNoMoreEntries) {
1,507✔
263
                                continue
235✔
264
                        }
265
                        if err != nil {
1,037✔
266
                                return nil, err
×
267
                        }
×
268

269
                        if len(encMaxPK) != 9 {
1,037✔
270
                                return nil, ErrCorruptedData
×
271
                        }
×
272

273
                        if encMaxPK[0] != KeyValPrefixNotNull {
1,037✔
274
                                return nil, ErrCorruptedData
×
275
                        }
×
276

277
                        // map to signed integer space
278
                        encMaxPK[1] ^= 0x80
1,037✔
279

1,037✔
280
                        table.maxPK = int64(binary.BigEndian.Uint64(encMaxPK[1:]))
1,037✔
281
                }
282
        }
283

284
        return &SQLTx{
1,892✔
285
                engine:           e,
1,892✔
286
                opts:             opts,
1,892✔
287
                tx:               tx,
1,892✔
288
                catalog:          catalog,
1,892✔
289
                lastInsertedPKs:  make(map[string]int64),
1,892✔
290
                firstInsertedPKs: make(map[string]int64),
1,892✔
291
        }, nil
1,892✔
292
}
293

294
func indexEntryMapperFor(index, primaryIndex *Index) store.EntryMapper {
5,402✔
295
        // value={count (colID valLen val)+})
5,402✔
296
        // key=M.{tableID}{indexID}({null}({val}{padding}{valLen})?)+({pkVal}{padding}{pkValLen})+
5,402✔
297

5,402✔
298
        valueExtractor := func(value []byte, valuesByColID map[uint32]TypedValue) error {
8,078✔
299
                voff := 0
2,676✔
300

2,676✔
301
                cols := int(binary.BigEndian.Uint32(value[voff:]))
2,676✔
302
                voff += EncLenLen
2,676✔
303

2,676✔
304
                for i := 0; i < cols; i++ {
11,325✔
305
                        if len(value) < EncIDLen {
8,649✔
306
                                return fmt.Errorf("key is lower than required")
×
307
                        }
×
308

309
                        colID := binary.BigEndian.Uint32(value[voff:])
8,649✔
310
                        voff += EncIDLen
8,649✔
311

8,649✔
312
                        col, err := index.table.GetColumnByID(colID)
8,649✔
313
                        if errors.Is(err, ErrColumnDoesNotExist) {
8,713✔
314
                                vlen := int(binary.BigEndian.Uint32(value[voff:]))
64✔
315
                                voff += EncLenLen + vlen
64✔
316
                                continue
64✔
317
                        } else if err != nil {
8,585✔
318
                                return err
×
319
                        }
×
320

321
                        val, n, err := DecodeValue(value[voff:], col.colType)
8,585✔
322
                        if err != nil {
8,585✔
323
                                return err
×
324
                        }
×
325

326
                        voff += n
8,585✔
327

8,585✔
328
                        valuesByColID[colID] = val
8,585✔
329
                }
330

331
                return nil
2,676✔
332
        }
333

334
        encodedValues := make([][]byte, 2+len(index.cols)+1)
5,402✔
335
        encodedValues[0] = EncodeID(index.table.id)
5,402✔
336
        encodedValues[1] = EncodeID(index.id)
5,402✔
337

5,402✔
338
        return func(key, value []byte) ([]byte, error) {
8,078✔
339
                valuesByColID := make(map[uint32]TypedValue, len(index.cols))
2,676✔
340

2,676✔
341
                for _, col := range index.table.cols {
11,600✔
342
                        valuesByColID[col.id] = &NullValue{t: col.colType}
8,924✔
343
                }
8,924✔
344

345
                err := valueExtractor(value, valuesByColID)
2,676✔
346
                if err != nil {
2,676✔
347
                        return nil, err
×
348
                }
×
349

350
                for i, col := range index.cols {
5,410✔
351
                        encKey, _, err := EncodeValueAsKey(valuesByColID[col.id], col.Type(), col.MaxLen())
2,734✔
352
                        if err != nil {
2,734✔
353
                                return nil, err
×
354
                        }
×
355

356
                        encodedValues[2+i] = encKey
2,734✔
357
                }
358

359
                pkEncVals, err := encodedKey(primaryIndex, valuesByColID)
2,676✔
360
                if err != nil {
2,676✔
361
                        return nil, err
×
362
                }
×
363

364
                encodedValues[len(encodedValues)-1] = pkEncVals
2,676✔
365

2,676✔
366
                return MapKey(index.enginePrefix(), MappedPrefix, encodedValues...), nil
2,676✔
367
        }
368
}
369

370
func (e *Engine) Exec(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error) {
928✔
371
        stmts, err := Parse(strings.NewReader(sql))
928✔
372
        if err != nil {
931✔
373
                return nil, nil, fmt.Errorf("%w: %v", ErrParsingError, err)
3✔
374
        }
3✔
375

376
        return e.ExecPreparedStmts(ctx, tx, stmts, params)
925✔
377
}
378

379
func (e *Engine) ExecPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error) {
1,417✔
380
        ntx, ctxs, pendingStmts, err := e.execPreparedStmts(ctx, tx, stmts, params)
1,417✔
381
        if err != nil {
1,534✔
382
                return ntx, ctxs, err
117✔
383
        }
117✔
384

385
        if len(pendingStmts) > 0 {
1,302✔
386
                // a different database was selected
2✔
387

2✔
388
                if e.multidbHandler == nil || ntx != nil {
2✔
389
                        return ntx, ctxs, fmt.Errorf("%w: all statements should have been executed when not using a multidbHandler", ErrUnexpected)
×
390
                }
×
391

392
                var opts *TxOptions
2✔
393

2✔
394
                if tx != nil {
3✔
395
                        opts = tx.opts
1✔
396
                } else {
2✔
397
                        opts = DefaultTxOptions()
1✔
398
                }
1✔
399

400
                ntx, hctxs, err := e.multidbHandler.ExecPreparedStmts(ctx, opts, pendingStmts, params)
2✔
401

2✔
402
                return ntx, append(ctxs, hctxs...), err
2✔
403
        }
404

405
        return ntx, ctxs, nil
1,298✔
406
}
407

408
func (e *Engine) execPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, pendingStmts []SQLStmt, err error) {
1,417✔
409
        if len(stmts) == 0 {
1,417✔
410
                return nil, nil, stmts, ErrIllegalArguments
×
411
        }
×
412

413
        nparams, err := normalizeParams(params)
1,417✔
414
        if err != nil {
1,418✔
415
                return nil, nil, stmts, err
1✔
416
        }
1✔
417

418
        currTx := tx
1,416✔
419

1,416✔
420
        execStmts := 0
1,416✔
421

1,416✔
422
        for _, stmt := range stmts {
2,921✔
423
                if stmt == nil {
1,505✔
424
                        return nil, nil, stmts[execStmts:], ErrIllegalArguments
×
425
                }
×
426

427
                _, isDBSelectionStmt := stmt.(*UseDatabaseStmt)
1,505✔
428

1,505✔
429
                // handle the case when working in non-autocommit mode outside a transaction block
1,505✔
430
                if isDBSelectionStmt && (currTx != nil && !currTx.Closed()) && !currTx.IsExplicitCloseRequired() {
1,506✔
431
                        err = currTx.Commit(ctx)
1✔
432
                        if err == nil {
2✔
433
                                committedTxs = append(committedTxs, currTx)
1✔
434
                        }
1✔
435
                        if err != nil {
1✔
436
                                return nil, committedTxs, stmts[execStmts:], err
×
437
                        }
×
438
                }
439

440
                if currTx == nil || currTx.Closed() {
2,404✔
441
                        var opts *TxOptions
899✔
442

899✔
443
                        if currTx != nil {
905✔
444
                                opts = currTx.opts
6✔
445
                        } else if tx != nil {
900✔
446
                                opts = tx.opts
1✔
447
                        } else {
893✔
448
                                opts = DefaultTxOptions()
892✔
449
                        }
892✔
450

451
                        // begin tx with implicit commit
452
                        currTx, err = e.NewTx(ctx, opts)
899✔
453
                        if err != nil {
900✔
454
                                return nil, committedTxs, stmts[execStmts:], err
1✔
455
                        }
1✔
456
                }
457

458
                ntx, err := stmt.execAt(ctx, currTx, nparams)
1,504✔
459
                if err != nil {
1,619✔
460
                        currTx.Cancel()
115✔
461
                        return nil, committedTxs, stmts[execStmts:], err
115✔
462
                }
115✔
463

464
                if !currTx.Closed() && !currTx.IsExplicitCloseRequired() && e.autocommit {
1,397✔
465
                        err = currTx.Commit(ctx)
8✔
466
                        if err != nil {
8✔
467
                                return nil, committedTxs, stmts[execStmts:], err
×
468
                        }
×
469
                }
470

471
                if currTx.Closed() {
1,546✔
472
                        committedTxs = append(committedTxs, currTx)
157✔
473
                }
157✔
474

475
                currTx = ntx
1,389✔
476

1,389✔
477
                execStmts++
1,389✔
478

1,389✔
479
                if isDBSelectionStmt && e.multidbHandler != nil {
1,394✔
480
                        break
5✔
481
                }
482
        }
483

484
        if currTx != nil && !currTx.Closed() && !currTx.IsExplicitCloseRequired() {
2,249✔
485
                err = currTx.Commit(ctx)
949✔
486
                if err != nil {
949✔
487
                        return nil, committedTxs, stmts[execStmts:], err
×
488
                }
×
489

490
                committedTxs = append(committedTxs, currTx)
949✔
491
        }
492

493
        if currTx != nil && currTx.Closed() {
2,252✔
494
                currTx = nil
952✔
495
        }
952✔
496

497
        return currTx, committedTxs, stmts[execStmts:], nil
1,300✔
498
}
499

500
func (e *Engine) Query(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) (RowReader, error) {
281✔
501
        stmts, err := Parse(strings.NewReader(sql))
281✔
502
        if err != nil {
282✔
503
                return nil, fmt.Errorf("%w: %v", ErrParsingError, err)
1✔
504
        }
1✔
505
        if len(stmts) != 1 {
281✔
506
                return nil, ErrExpectingDQLStmt
1✔
507
        }
1✔
508

509
        stmt, ok := stmts[0].(DataSource)
279✔
510
        if !ok {
280✔
511
                return nil, ErrExpectingDQLStmt
1✔
512
        }
1✔
513

514
        return e.QueryPreparedStmt(ctx, tx, stmt, params)
278✔
515
}
516

517
func (e *Engine) QueryPreparedStmt(ctx context.Context, tx *SQLTx, stmt DataSource, params map[string]interface{}) (rowReader RowReader, err error) {
407✔
518
        if stmt == nil {
408✔
519
                return nil, ErrIllegalArguments
1✔
520
        }
1✔
521

522
        qtx := tx
406✔
523

406✔
524
        if qtx == nil {
697✔
525
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
291✔
526
                if err != nil {
292✔
527
                        return nil, err
1✔
528
                }
1✔
529
                defer func() {
580✔
530
                        if err != nil {
312✔
531
                                qtx.Cancel()
22✔
532
                        }
22✔
533
                }()
534
        }
535

536
        nparams, err := normalizeParams(params)
405✔
537
        if err != nil {
406✔
538
                return nil, err
1✔
539
        }
1✔
540

541
        _, err = stmt.execAt(ctx, qtx, nparams)
404✔
542
        if err != nil {
412✔
543
                return nil, err
8✔
544
        }
8✔
545

546
        r, err := stmt.Resolve(ctx, qtx, nparams, nil)
396✔
547
        if err != nil {
412✔
548
                return nil, err
16✔
549
        }
16✔
550

551
        if tx == nil {
648✔
552
                r.onClose(func() {
530✔
553
                        qtx.Cancel()
262✔
554
                })
262✔
555
        }
556

557
        return r, nil
380✔
558
}
559

560
func (e *Engine) Catalog(ctx context.Context, tx *SQLTx) (catalog *Catalog, err error) {
11✔
561
        qtx := tx
11✔
562

11✔
563
        if qtx == nil {
22✔
564
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
11✔
565
                if err != nil {
11✔
566
                        return nil, err
×
567
                }
×
568
                defer qtx.Cancel()
11✔
569
        }
570

571
        return qtx.Catalog(), nil
11✔
572
}
573

574
func (e *Engine) InferParameters(ctx context.Context, tx *SQLTx, sql string) (params map[string]SQLValueType, err error) {
65✔
575
        stmts, err := Parse(strings.NewReader(sql))
65✔
576
        if err != nil {
67✔
577
                return nil, fmt.Errorf("%w: %v", ErrParsingError, err)
2✔
578
        }
2✔
579

580
        return e.InferParametersPreparedStmts(ctx, tx, stmts)
63✔
581
}
582

583
func (e *Engine) InferParametersPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt) (params map[string]SQLValueType, err error) {
76✔
584
        if len(stmts) == 0 {
77✔
585
                return nil, ErrIllegalArguments
1✔
586
        }
1✔
587

588
        qtx := tx
75✔
589

75✔
590
        if qtx == nil {
150✔
591
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
75✔
592
                if err != nil {
75✔
593
                        return nil, err
×
594
                }
×
595
                defer qtx.Cancel()
75✔
596
        }
597

598
        params = make(map[string]SQLValueType)
75✔
599

75✔
600
        for _, stmt := range stmts {
155✔
601
                err = stmt.inferParameters(ctx, qtx, params)
80✔
602
                if err != nil {
94✔
603
                        return nil, err
14✔
604
                }
14✔
605
        }
606

607
        return params, nil
61✔
608
}
609

610
func normalizeParams(params map[string]interface{}) (map[string]interface{}, error) {
1,822✔
611
        nparams := make(map[string]interface{}, len(params))
1,822✔
612

1,822✔
613
        for name, value := range params {
2,459✔
614
                nname := strings.ToLower(name)
637✔
615

637✔
616
                _, exists := nparams[nname]
637✔
617
                if exists {
639✔
618
                        return nil, ErrDuplicatedParameters
2✔
619
                }
2✔
620

621
                nparams[nname] = value
635✔
622
        }
623

624
        return nparams, nil
1,820✔
625
}
626

627
// CopyCatalogToTx copies the current sql catalog to the ongoing transaction.
628
func (e *Engine) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error {
42✔
629
        if tx == nil {
57✔
630
                return ErrIllegalArguments
15✔
631
        }
15✔
632

633
        catalog := newCatalog(e.prefix)
27✔
634

27✔
635
        err := catalog.addSchemaToTx(ctx, e.prefix, tx)
27✔
636
        if err != nil {
27✔
637
                return err
×
638
        }
×
639

640
        return nil
27✔
641
}
642

643
func (e *Engine) GetStore() *store.ImmuStore {
164✔
644
        return e.store
164✔
645
}
164✔
646

647
func (e *Engine) GetPrefix() []byte {
16✔
648
        return e.prefix
16✔
649
}
16✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc