• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24236186926

10 Apr 2026 09:25AM UTC coverage: 89.169% (-0.09%) from 89.257%
24236186926

push

gh-ci

SimoneLazzaris
fix workflows

38207 of 42848 relevant lines covered (89.17%)

151869.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.32
/embedded/sql/engine.go
1
/*
2
Copyright 2025 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "encoding/binary"
22
        "errors"
23
        "fmt"
24
        "strings"
25

26
        "github.com/codenotary/immudb/embedded/store"
27
)
28

29
var (
30
        ErrNoSupported                            = errors.New("not supported")
31
        ErrIllegalArguments                       = store.ErrIllegalArguments
32
        ErrMultiIndexingNotEnabled                = fmt.Errorf("%w: multi-indexing must be enabled", store.ErrIllegalState)
33
        ErrParsingError                           = errors.New("parsing error")
34
        ErrDDLorDMLTxOnly                         = errors.New("transactions can NOT combine DDL and DML statements")
35
        ErrUnspecifiedMultiDBHandler              = fmt.Errorf("%w: unspecified multidbHanlder", store.ErrIllegalState)
36
        ErrDatabaseDoesNotExist                   = errors.New("database does not exist")
37
        ErrDatabaseAlreadyExists                  = errors.New("database already exists")
38
        ErrTableAlreadyExists                     = errors.New("table already exists")
39
        ErrTableDoesNotExist                      = errors.New("table does not exist")
40
        ErrColumnDoesNotExist                     = errors.New("column does not exist")
41
        ErrColumnAlreadyExists                    = errors.New("column already exists")
42
        ErrCannotDropColumn                       = errors.New("cannot drop column")
43
        ErrSameOldAndNewNames                     = errors.New("same old and new names")
44
        ErrColumnNotIndexed                       = errors.New("column is not indexed")
45
        ErrFunctionDoesNotExist                   = errors.New("function does not exist")
46
        ErrLimitedKeyType                         = errors.New("indexed key of unsupported type or exceeded length")
47
        ErrLimitedAutoIncrement                   = errors.New("only INTEGER single-column primary keys can be set as auto incremental")
48
        ErrLimitedMaxLen                          = errors.New("only VARCHAR and BLOB types support max length")
49
        ErrDuplicatedColumn                       = errors.New("duplicated column")
50
        ErrInvalidColumn                          = errors.New("invalid column")
51
        ErrInvalidCheckConstraint                 = errors.New("invalid check constraint")
52
        ErrCheckConstraintViolation               = errors.New("check constraint violation")
53
        ErrReservedWord                           = errors.New("reserved word")
54
        ErrNoPrimaryKey                           = errors.New("no primary key specified")
55
        ErrPKCanNotBeNull                         = errors.New("primary key can not be null")
56
        ErrPKCanNotBeUpdated                      = errors.New("primary key can not be updated")
57
        ErrMultiplePrimaryKeys                    = errors.New("multiple primary keys are not allowed")
58
        ErrNotNullableColumnCannotBeNull          = errors.New("not nullable column can not be null")
59
        ErrNewColumnMustBeNullable                = errors.New("new column must be nullable")
60
        ErrIndexAlreadyExists                     = errors.New("index already exists")
61
        ErrMaxNumberOfColumnsInIndexExceeded      = errors.New("number of columns in multi-column index exceeded")
62
        ErrIndexNotFound                          = errors.New("index not found")
63
        ErrConstraintNotFound                     = errors.New("constraint not found")
64
        ErrInvalidNumberOfValues                  = errors.New("invalid number of values provided")
65
        ErrInvalidValue                           = errors.New("invalid value provided")
66
        ErrInferredMultipleTypes                  = errors.New("inferred multiple types")
67
        ErrExpectingDQLStmt                       = errors.New("illegal statement. DQL statement expected")
68
        ErrColumnMustAppearInGroupByOrAggregation = errors.New("must appear in the group by clause or be used in an aggregated function")
69
        ErrIllegalMappedKey                       = errors.New("error illegal mapped key")
70
        ErrCorruptedData                          = store.ErrCorruptedData
71
        ErrBrokenCatalogColSpecExpirable          = fmt.Errorf("%w: catalog column entry set as expirable", ErrCorruptedData)
72
        ErrBrokenCatalogCheckConstraintExpirable  = fmt.Errorf("%w: catalog check constraint set as expirable", ErrCorruptedData)
73
        ErrNoMoreRows                             = store.ErrNoMoreEntries
74
        ErrInvalidTypes                           = errors.New("invalid types")
75
        ErrUnsupportedJoinType                    = errors.New("unsupported join type")
76
        ErrInvalidCondition                       = errors.New("invalid condition")
77
        ErrHavingClauseRequiresGroupClause        = errors.New("having clause requires group clause")
78
        ErrNotComparableValues                    = errors.New("values are not comparable")
79
        ErrNumericTypeExpected                    = errors.New("numeric type expected")
80
        ErrUnexpected                             = errors.New("unexpected error")
81
        ErrMaxKeyLengthExceeded                   = errors.New("max key length exceeded")
82
        ErrMaxLengthExceeded                      = errors.New("max length exceeded")
83
        ErrColumnIsNotAnAggregation               = errors.New("column is not an aggregation")
84
        ErrLimitedCount                           = errors.New("only unbounded counting is supported i.e. COUNT(*)")
85
        ErrTxDoesNotExist                         = errors.New("tx does not exist")
86
        ErrNestedTxNotSupported                   = errors.New("nested tx are not supported")
87
        ErrNoOngoingTx                            = errors.New("no ongoing transaction")
88
        ErrNonTransactionalStmt                   = errors.New("non transactional statement")
89
        ErrDivisionByZero                         = errors.New("division by zero")
90
        ErrMissingParameter                       = errors.New("missing parameter")
91
        ErrUnsupportedParameter                   = errors.New("unsupported parameter")
92
        ErrDuplicatedParameters                   = errors.New("duplicated parameters")
93
        ErrLimitedIndexCreation                   = errors.New("unique index creation is only supported on empty tables")
94
        ErrTooManyRows                            = errors.New("too many rows")
95
        ErrAlreadyClosed                          = store.ErrAlreadyClosed
96
        ErrAmbiguousSelector                      = errors.New("ambiguous selector")
97
        ErrUnsupportedCast                        = fmt.Errorf("%w: unsupported cast", ErrInvalidValue)
98
        ErrColumnMismatchInUnionStmt              = errors.New("column mismatch in union statement")
99
        ErrCannotIndexJson                        = errors.New("cannot index column of type JSON")
100
        ErrInvalidTxMetadata                      = errors.New("invalid transaction metadata")
101
        ErrAccessDenied                           = errors.New("access denied")
102
        ErrDiffRequiresPeriod                     = errors.New("DIFF requires both SINCE/AFTER and UNTIL/BEFORE clauses")
103
)
104

105
// MaxKeyLen is a package-level size limit. It is a variable rather than a
// constant, so it can be reassigned at runtime.
// NOTE(review): presumably the bound reported through ErrMaxKeyLengthExceeded;
// it is not referenced in this part of the file — confirm against its users.
var MaxKeyLen = 512

const (
	// EncIDLen is the width, in bytes, of an encoded identifier. Column IDs
	// inside an encoded row value are read as big-endian uint32 values and
	// the read offset is advanced by this amount.
	EncIDLen = 4

	// EncLenLen is the width, in bytes, of an encoded length or count (the
	// column count and each value length of an encoded row value).
	EncLenLen = 4

	// MaxNumberOfColumnsInIndex is a limit on the columns of a single index.
	// NOTE(review): presumably enforced where
	// ErrMaxNumberOfColumnsInIndexExceeded is returned — confirm.
	MaxNumberOfColumnsInIndex = 8
)
113

114
type Engine struct {
115
        store *store.ImmuStore
116

117
        prefix                        []byte
118
        distinctLimit                 int
119
        sortBufferSize                int
120
        autocommit                    bool
121
        lazyIndexConstraintValidation bool
122
        parseTxMetadata               func([]byte) (map[string]interface{}, error)
123
        multidbHandler                MultiDBHandler
124
        tableResolvers                map[string]TableResolver
125
}
126

127
type MultiDBHandler interface {
128
        ListDatabases(ctx context.Context) ([]string, error)
129
        CreateDatabase(ctx context.Context, db string, ifNotExists bool) error
130
        UseDatabase(ctx context.Context, db string) error
131
        GetLoggedUser(ctx context.Context) (User, error)
132
        ListUsers(ctx context.Context) ([]User, error)
133
        CreateUser(ctx context.Context, username, password string, permission Permission) error
134
        AlterUser(ctx context.Context, username, password string, permission Permission) error
135
        GrantSQLPrivileges(ctx context.Context, database, username string, privileges []SQLPrivilege) error
136
        RevokeSQLPrivileges(ctx context.Context, database, username string, privileges []SQLPrivilege) error
137
        DropUser(ctx context.Context, username string) error
138
        ExecPreparedStmts(ctx context.Context, opts *TxOptions, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error)
139
}
140

141
type TableResolver interface {
142
        Table() string
143
        Resolve(ctx context.Context, tx *SQLTx, alias string) (RowReader, error)
144
}
145

146
type User interface {
147
        Username() string
148
        Permission() Permission
149
        SQLPrivileges() []SQLPrivilege
150
}
151

152
func NewEngine(st *store.ImmuStore, opts *Options) (*Engine, error) {
1,482✔
153
        if st == nil {
1,483✔
154
                return nil, ErrIllegalArguments
1✔
155
        }
1✔
156

157
        if !st.MultiIndexingEnabled() {
1,482✔
158
                return nil, ErrMultiIndexingNotEnabled
1✔
159
        }
1✔
160

161
        err := opts.Validate()
1,480✔
162
        if err != nil {
1,480✔
163
                return nil, err
×
164
        }
×
165

166
        e := &Engine{
1,480✔
167
                store:                         st,
1,480✔
168
                prefix:                        make([]byte, len(opts.prefix)),
1,480✔
169
                distinctLimit:                 opts.distinctLimit,
1,480✔
170
                sortBufferSize:                opts.sortBufferSize,
1,480✔
171
                autocommit:                    opts.autocommit,
1,480✔
172
                lazyIndexConstraintValidation: opts.lazyIndexConstraintValidation,
1,480✔
173
                parseTxMetadata:               opts.parseTxMetadata,
1,480✔
174
                multidbHandler:                opts.multidbHandler,
1,480✔
175
        }
1,480✔
176

1,480✔
177
        copy(e.prefix, opts.prefix)
1,480✔
178

1,480✔
179
        err = st.InitIndexing(&store.IndexSpec{
1,480✔
180
                SourcePrefix:     append(e.prefix, []byte(catalogPrefix)...),
1,480✔
181
                TargetPrefix:     append(e.prefix, []byte(catalogPrefix)...),
1,480✔
182
                InjectiveMapping: true,
1,480✔
183
        })
1,480✔
184
        if err != nil && !errors.Is(err, store.ErrIndexAlreadyInitialized) {
1,480✔
185
                return nil, err
×
186
        }
×
187

188
        for _, r := range opts.tableResolvers {
3,542✔
189
                e.registerTableResolver(r.Table(), r)
2,062✔
190
        }
2,062✔
191

192
        // TODO: find a better way to handle parsing errors
193
        yyErrorVerbose = true
1,480✔
194

1,480✔
195
        return e, nil
1,480✔
196
}
197

198
func (e *Engine) NewTx(ctx context.Context, opts *TxOptions) (*SQLTx, error) {
3,652✔
199
        err := opts.Validate()
3,652✔
200
        if err != nil {
3,652✔
201
                return nil, err
×
202
        }
×
203

204
        var mode store.TxMode
3,652✔
205
        if opts.ReadOnly {
4,380✔
206
                mode = store.ReadOnlyTx
728✔
207
        } else {
3,652✔
208
                mode = store.ReadWriteTx
2,924✔
209
        }
2,924✔
210

211
        txOpts := &store.TxOptions{
3,652✔
212
                Mode:                    mode,
3,652✔
213
                SnapshotMustIncludeTxID: opts.SnapshotMustIncludeTxID,
3,652✔
214
                SnapshotRenewalPeriod:   opts.SnapshotRenewalPeriod,
3,652✔
215
                UnsafeMVCC:              opts.UnsafeMVCC,
3,652✔
216
        }
3,652✔
217

3,652✔
218
        tx, err := e.store.NewTx(ctx, txOpts)
3,652✔
219
        if err != nil {
3,653✔
220
                return nil, err
1✔
221
        }
1✔
222

223
        if len(opts.Extra) > 0 {
3,927✔
224
                txmd := store.NewTxMetadata()
276✔
225
                err := txmd.WithExtra(opts.Extra)
276✔
226
                if err != nil {
276✔
227
                        return nil, err
×
228
                }
×
229

230
                tx.WithMetadata(txmd)
276✔
231
        }
232

233
        catalog := newCatalog(e.prefix)
3,651✔
234

3,651✔
235
        err = catalog.load(ctx, tx)
3,651✔
236
        if err != nil {
3,652✔
237
                return nil, err
1✔
238
        }
1✔
239

240
        for _, table := range catalog.GetTables() {
7,970✔
241
                primaryIndex := table.primaryIndex
4,320✔
242

4,320✔
243
                rowEntryPrefix := MapKey(
4,320✔
244
                        e.prefix,
4,320✔
245
                        RowPrefix,
4,320✔
246
                        EncodeID(DatabaseID),
4,320✔
247
                        EncodeID(table.id),
4,320✔
248
                        EncodeID(primaryIndex.id),
4,320✔
249
                )
4,320✔
250

4,320✔
251
                mappedPKEntryPrefix := MapKey(
4,320✔
252
                        e.prefix,
4,320✔
253
                        MappedPrefix,
4,320✔
254
                        EncodeID(table.id),
4,320✔
255
                        EncodeID(primaryIndex.id),
4,320✔
256
                )
4,320✔
257

4,320✔
258
                err = e.store.InitIndexing(&store.IndexSpec{
4,320✔
259
                        SourcePrefix: rowEntryPrefix,
4,320✔
260

4,320✔
261
                        TargetEntryMapper: indexEntryMapperFor(primaryIndex, primaryIndex),
4,320✔
262
                        TargetPrefix:      mappedPKEntryPrefix,
4,320✔
263

4,320✔
264
                        InjectiveMapping: true,
4,320✔
265
                })
4,320✔
266
                if err != nil && !errors.Is(err, store.ErrIndexAlreadyInitialized) {
4,320✔
267
                        return nil, err
×
268
                }
×
269

270
                for _, index := range table.indexes {
10,359✔
271
                        if index.IsPrimary() {
10,359✔
272
                                continue
4,320✔
273
                        }
274

275
                        mappedEntryPrefix := MapKey(
1,719✔
276
                                e.prefix,
1,719✔
277
                                MappedPrefix,
1,719✔
278
                                EncodeID(table.id),
1,719✔
279
                                EncodeID(index.id),
1,719✔
280
                        )
1,719✔
281

1,719✔
282
                        err = e.store.InitIndexing(&store.IndexSpec{
1,719✔
283
                                SourcePrefix:      rowEntryPrefix,
1,719✔
284
                                SourceEntryMapper: indexEntryMapperFor(primaryIndex, primaryIndex),
1,719✔
285
                                TargetEntryMapper: indexEntryMapperFor(index, primaryIndex),
1,719✔
286
                                TargetPrefix:      mappedEntryPrefix,
1,719✔
287

1,719✔
288
                                InjectiveMapping: true,
1,719✔
289
                        })
1,719✔
290
                        if errors.Is(err, store.ErrIndexAlreadyInitialized) {
3,308✔
291
                                continue
1,589✔
292
                        }
293
                        if err != nil {
130✔
294
                                return nil, err
×
295
                        }
×
296
                }
297

298
                if table.autoIncrementPK {
6,387✔
299
                        encMaxPK, err := loadMaxPK(ctx, e.prefix, tx, table)
2,067✔
300
                        if errors.Is(err, store.ErrNoMoreEntries) {
2,422✔
301
                                continue
355✔
302
                        }
303
                        if err != nil {
1,712✔
304
                                return nil, err
×
305
                        }
×
306

307
                        if len(encMaxPK) != 9 {
1,712✔
308
                                return nil, ErrCorruptedData
×
309
                        }
×
310

311
                        if encMaxPK[0] != KeyValPrefixNotNull {
1,712✔
312
                                return nil, ErrCorruptedData
×
313
                        }
×
314

315
                        // map to signed integer space
316
                        encMaxPK[1] ^= 0x80
1,712✔
317

1,712✔
318
                        table.maxPK = int64(binary.BigEndian.Uint64(encMaxPK[1:]))
1,712✔
319
                }
320
        }
321

322
        return &SQLTx{
3,650✔
323
                engine:           e,
3,650✔
324
                opts:             opts,
3,650✔
325
                tx:               tx,
3,650✔
326
                catalog:          catalog,
3,650✔
327
                lastInsertedPKs:  make(map[string]int64),
3,650✔
328
                firstInsertedPKs: make(map[string]int64),
3,650✔
329
        }, nil
3,650✔
330
}
331

332
func indexEntryMapperFor(index, primaryIndex *Index) store.EntryMapper {
7,758✔
333
        // value={count (colID valLen val)+})
7,758✔
334
        // key=M.{tableID}{indexID}({null}({val}{padding}{valLen})?)+({pkVal}{padding}{pkValLen})+
7,758✔
335

7,758✔
336
        valueExtractor := func(value []byte, valuesByColID map[uint32]TypedValue) error {
21,179✔
337
                voff := 0
13,421✔
338

13,421✔
339
                cols := int(binary.BigEndian.Uint32(value[voff:]))
13,421✔
340
                voff += EncLenLen
13,421✔
341

13,421✔
342
                for i := 0; i < cols; i++ {
78,172✔
343
                        if len(value) < EncIDLen {
64,751✔
344
                                return fmt.Errorf("key is lower than required")
×
345
                        }
×
346

347
                        colID := binary.BigEndian.Uint32(value[voff:])
64,751✔
348
                        voff += EncIDLen
64,751✔
349

64,751✔
350
                        col, err := index.table.GetColumnByID(colID)
64,751✔
351
                        if errors.Is(err, ErrColumnDoesNotExist) {
64,831✔
352
                                vlen := int(binary.BigEndian.Uint32(value[voff:]))
80✔
353
                                voff += EncLenLen + vlen
80✔
354
                                continue
80✔
355
                        } else if err != nil {
64,671✔
356
                                return err
×
357
                        }
×
358

359
                        val, n, err := DecodeValue(value[voff:], col.colType)
64,671✔
360
                        if err != nil {
64,671✔
361
                                return err
×
362
                        }
×
363

364
                        voff += n
64,671✔
365

64,671✔
366
                        valuesByColID[colID] = val
64,671✔
367
                }
368

369
                return nil
13,421✔
370
        }
371

372
        return func(key, value []byte) ([]byte, error) {
21,179✔
373
                encodedValues := make([][]byte, 2+len(index.cols)+1)
13,421✔
374
                encodedValues[0] = EncodeID(index.table.id)
13,421✔
375
                encodedValues[1] = EncodeID(index.id)
13,421✔
376

13,421✔
377
                valuesByColID := make(map[uint32]TypedValue, len(index.cols))
13,421✔
378

13,421✔
379
                for _, col := range index.table.cols {
79,086✔
380
                        valuesByColID[col.id] = &NullValue{t: col.colType}
65,665✔
381
                }
65,665✔
382

383
                err := valueExtractor(value, valuesByColID)
13,421✔
384
                if err != nil {
13,421✔
385
                        return nil, err
×
386
                }
×
387

388
                for i, col := range index.cols {
30,036✔
389
                        encKey, _, err := EncodeValueAsKey(valuesByColID[col.id], col.Type(), col.MaxLen())
16,615✔
390
                        if err != nil {
16,615✔
391
                                return nil, err
×
392
                        }
×
393

394
                        encodedValues[2+i] = encKey
16,615✔
395
                }
396

397
                pkEncVals, err := encodedKey(primaryIndex, valuesByColID)
13,421✔
398
                if err != nil {
13,421✔
399
                        return nil, err
×
400
                }
×
401

402
                encodedValues[len(encodedValues)-1] = pkEncVals
13,421✔
403

13,421✔
404
                return MapKey(index.enginePrefix(), MappedPrefix, encodedValues...), nil
13,421✔
405
        }
406
}
407

408
func (e *Engine) Exec(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error) {
2,494✔
409
        stmts, err := ParseSQL(strings.NewReader(sql))
2,494✔
410
        if err != nil {
2,497✔
411
                return nil, nil, fmt.Errorf("%w: %v", ErrParsingError, err)
3✔
412
        }
3✔
413

414
        return e.ExecPreparedStmts(ctx, tx, stmts, params)
2,491✔
415
}
416

417
func (e *Engine) ExecPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error) {
3,019✔
418
        ntx, ctxs, pendingStmts, err := e.execPreparedStmts(ctx, tx, stmts, params)
3,019✔
419
        if err != nil {
3,173✔
420
                return ntx, ctxs, err
154✔
421
        }
154✔
422

423
        if len(pendingStmts) > 0 {
2,867✔
424
                // a different database was selected
2✔
425

2✔
426
                if e.multidbHandler == nil || ntx != nil {
2✔
427
                        return ntx, ctxs, fmt.Errorf("%w: all statements should have been executed when not using a multidbHandler", ErrUnexpected)
×
428
                }
×
429

430
                var opts *TxOptions
2✔
431

2✔
432
                if tx != nil {
3✔
433
                        opts = tx.opts
1✔
434
                } else {
2✔
435
                        opts = DefaultTxOptions()
1✔
436
                }
1✔
437

438
                ntx, hctxs, err := e.multidbHandler.ExecPreparedStmts(ctx, opts, pendingStmts, params)
2✔
439

2✔
440
                return ntx, append(ctxs, hctxs...), err
2✔
441
        }
442

443
        return ntx, ctxs, nil
2,863✔
444
}
445

446
func (e *Engine) execPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, pendingStmts []SQLStmt, err error) {
3,019✔
447
        if len(stmts) == 0 {
3,019✔
448
                return nil, nil, stmts, ErrIllegalArguments
×
449
        }
×
450

451
        nparams, err := normalizeParams(params)
3,019✔
452
        if err != nil {
3,020✔
453
                return nil, nil, stmts, err
1✔
454
        }
1✔
455

456
        currTx := tx
3,018✔
457

3,018✔
458
        execStmts := 0
3,018✔
459

3,018✔
460
        for _, stmt := range stmts {
6,140✔
461
                if stmt == nil {
3,122✔
462
                        return nil, nil, stmts[execStmts:], ErrIllegalArguments
×
463
                }
×
464

465
                _, isDBSelectionStmt := stmt.(*UseDatabaseStmt)
3,122✔
466

3,122✔
467
                // handle the case when working in non-autocommit mode outside a transaction block
3,122✔
468
                if isDBSelectionStmt && (currTx != nil && !currTx.Closed()) && !currTx.IsExplicitCloseRequired() {
3,123✔
469
                        err = currTx.Commit(ctx)
1✔
470
                        if err == nil {
2✔
471
                                committedTxs = append(committedTxs, currTx)
1✔
472
                        }
1✔
473
                        if err != nil {
1✔
474
                                return nil, committedTxs, stmts[execStmts:], err
×
475
                        }
×
476
                }
477

478
                if currTx == nil || currTx.Closed() {
5,561✔
479
                        var opts *TxOptions
2,439✔
480

2,439✔
481
                        if currTx != nil {
2,447✔
482
                                opts = currTx.opts
8✔
483
                        } else if tx != nil {
2,440✔
484
                                opts = tx.opts
1✔
485
                        } else {
2,431✔
486
                                opts = DefaultTxOptions()
2,430✔
487
                        }
2,430✔
488

489
                        // begin tx with implicit commit
490
                        currTx, err = e.NewTx(ctx, opts)
2,439✔
491
                        if err != nil {
2,440✔
492
                                return nil, committedTxs, stmts[execStmts:], err
1✔
493
                        }
1✔
494
                }
495

496
                if e.multidbHandler != nil {
3,452✔
497
                        if err := e.checkUserPermissions(ctx, stmt); err != nil {
333✔
498
                                currTx.Cancel()
2✔
499
                                return nil, committedTxs, stmts[execStmts:], err
2✔
500
                        }
2✔
501
                }
502

503
                ntx, err := stmt.execAt(ctx, currTx, nparams)
3,119✔
504
                if err != nil {
3,269✔
505
                        currTx.Cancel()
150✔
506
                        return nil, committedTxs, stmts[execStmts:], err
150✔
507
                }
150✔
508

509
                if !currTx.Closed() && !currTx.IsExplicitCloseRequired() && e.autocommit {
2,977✔
510
                        err = currTx.Commit(ctx)
8✔
511
                        if err != nil {
8✔
512
                                return nil, committedTxs, stmts[execStmts:], err
×
513
                        }
×
514
                }
515

516
                if currTx.Closed() {
3,129✔
517
                        committedTxs = append(committedTxs, currTx)
160✔
518
                }
160✔
519

520
                currTx = ntx
2,969✔
521

2,969✔
522
                execStmts++
2,969✔
523

2,969✔
524
                if isDBSelectionStmt && e.multidbHandler != nil {
2,974✔
525
                        break
5✔
526
                }
527
        }
528

529
        if currTx != nil && !currTx.Closed() && !currTx.IsExplicitCloseRequired() {
5,352✔
530
                err = currTx.Commit(ctx)
2,487✔
531
                if err != nil {
2,487✔
532
                        return nil, committedTxs, stmts[execStmts:], err
×
533
                }
×
534

535
                committedTxs = append(committedTxs, currTx)
2,487✔
536
        }
537

538
        if currTx != nil && currTx.Closed() {
5,355✔
539
                currTx = nil
2,490✔
540
        }
2,490✔
541

542
        return currTx, committedTxs, stmts[execStmts:], nil
2,865✔
543
}
544

545
func (e *Engine) checkUserPermissions(ctx context.Context, stmt SQLStmt) error {
424✔
546
        user, err := e.multidbHandler.GetLoggedUser(ctx)
424✔
547
        if err != nil {
424✔
548
                return err
×
549
        }
×
550

551
        if !stmt.readOnly() && user.Permission() == PermissionReadOnly {
426✔
552
                return fmt.Errorf("%w: statement requires %s permission", ErrAccessDenied, PermissionReadWrite)
2✔
553
        }
2✔
554

555
        requiredPrivileges := stmt.requiredPrivileges()
422✔
556
        if !hasAllPrivileges(user.SQLPrivileges(), requiredPrivileges) {
423✔
557
                return fmt.Errorf("%w: statement requires %v privileges", ErrAccessDenied, requiredPrivileges)
1✔
558
        }
1✔
559
        return nil
421✔
560
}
561

562
func hasAllPrivileges(userPrivileges, privileges []SQLPrivilege) bool {
422✔
563
        for _, p := range privileges {
724✔
564
                has := false
302✔
565
                for _, up := range userPrivileges {
994✔
566
                        if up == p {
993✔
567
                                has = true
301✔
568
                                break
301✔
569
                        }
570
                }
571

572
                if !has {
303✔
573
                        return false
1✔
574
                }
1✔
575
        }
576
        return true
421✔
577
}
578

579
func (e *Engine) queryAll(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) ([]*Row, error) {
79✔
580
        reader, err := e.Query(ctx, tx, sql, params)
79✔
581
        if err != nil {
86✔
582
                return nil, err
7✔
583
        }
7✔
584
        defer reader.Close()
72✔
585

72✔
586
        return ReadAllRows(ctx, reader)
72✔
587
}
588

589
func (e *Engine) Query(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) (RowReader, error) {
439✔
590
        stmts, err := ParseSQL(strings.NewReader(sql))
439✔
591
        if err != nil {
440✔
592
                return nil, fmt.Errorf("%w: %v", ErrParsingError, err)
1✔
593
        }
1✔
594
        if len(stmts) != 1 {
439✔
595
                return nil, ErrExpectingDQLStmt
1✔
596
        }
1✔
597

598
        stmt, ok := stmts[0].(DataSource)
437✔
599
        if !ok {
438✔
600
                return nil, ErrExpectingDQLStmt
1✔
601
        }
1✔
602

603
        return e.QueryPreparedStmt(ctx, tx, stmt, params)
436✔
604
}
605

606
func (e *Engine) QueryPreparedStmt(ctx context.Context, tx *SQLTx, stmt DataSource, params map[string]interface{}) (rowReader RowReader, err error) {
580✔
607
        if stmt == nil {
581✔
608
                return nil, ErrIllegalArguments
1✔
609
        }
1✔
610

611
        qtx := tx
579✔
612

579✔
613
        if qtx == nil {
1,026✔
614
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
447✔
615
                if err != nil {
448✔
616
                        return nil, err
1✔
617
                }
1✔
618
                defer func() {
892✔
619
                        if err != nil {
472✔
620
                                qtx.Cancel()
26✔
621
                        }
26✔
622
                }()
623
        }
624

625
        nparams, err := normalizeParams(params)
578✔
626
        if err != nil {
579✔
627
                return nil, err
1✔
628
        }
1✔
629

630
        if e.multidbHandler != nil {
670✔
631
                if err := e.checkUserPermissions(ctx, stmt); err != nil {
94✔
632
                        return nil, err
1✔
633
                }
1✔
634
        }
635

636
        _, err = stmt.execAt(ctx, qtx, nparams)
576✔
637
        if err != nil {
581✔
638
                return nil, err
5✔
639
        }
5✔
640

641
        r, err := stmt.Resolve(ctx, qtx, nparams, nil)
571✔
642
        if err != nil {
594✔
643
                return nil, err
23✔
644
        }
23✔
645

646
        if tx == nil {
968✔
647
                r.onClose(func() {
840✔
648
                        qtx.Cancel()
420✔
649
                })
420✔
650
        }
651

652
        return r, nil
548✔
653
}
654

655
func (e *Engine) Catalog(ctx context.Context, tx *SQLTx) (catalog *Catalog, err error) {
11✔
656
        qtx := tx
11✔
657

11✔
658
        if qtx == nil {
22✔
659
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
11✔
660
                if err != nil {
11✔
661
                        return nil, err
×
662
                }
×
663
                defer qtx.Cancel()
11✔
664
        }
665

666
        return qtx.Catalog(), nil
11✔
667
}
668

669
func (e *Engine) InferParameters(ctx context.Context, tx *SQLTx, sql string) (params map[string]SQLValueType, err error) {
66✔
670
        stmts, err := ParseSQL(strings.NewReader(sql))
66✔
671
        if err != nil {
68✔
672
                return nil, fmt.Errorf("%w: %v", ErrParsingError, err)
2✔
673
        }
2✔
674
        return e.InferParametersPreparedStmts(ctx, tx, stmts)
64✔
675
}
676

677
func (e *Engine) InferParametersPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt) (params map[string]SQLValueType, err error) {
77✔
678
        if len(stmts) == 0 {
78✔
679
                return nil, ErrIllegalArguments
1✔
680
        }
1✔
681

682
        qtx := tx
76✔
683

76✔
684
        if qtx == nil {
152✔
685
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
76✔
686
                if err != nil {
76✔
687
                        return nil, err
×
688
                }
×
689
                defer qtx.Cancel()
76✔
690
        }
691

692
        params = make(map[string]SQLValueType)
76✔
693

76✔
694
        for _, stmt := range stmts {
157✔
695
                err = stmt.inferParameters(ctx, qtx, params)
81✔
696
                if err != nil {
96✔
697
                        return nil, err
15✔
698
                }
15✔
699
        }
700

701
        return params, nil
61✔
702
}
703

704
func normalizeParams(params map[string]interface{}) (map[string]interface{}, error) {
3,597✔
705
        nparams := make(map[string]interface{}, len(params))
3,597✔
706

3,597✔
707
        for name, value := range params {
10,983✔
708
                nname := strings.ToLower(name)
7,386✔
709

7,386✔
710
                _, exists := nparams[nname]
7,386✔
711
                if exists {
7,388✔
712
                        return nil, ErrDuplicatedParameters
2✔
713
                }
2✔
714

715
                nparams[nname] = value
7,384✔
716
        }
717

718
        return nparams, nil
3,595✔
719
}
720

721
// CopyCatalogToTx copies the current sql catalog to the ongoing transaction.
722
func (e *Engine) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error {
44✔
723
        if tx == nil {
59✔
724
                return ErrIllegalArguments
15✔
725
        }
15✔
726

727
        catalog := newCatalog(e.prefix)
29✔
728

29✔
729
        err := catalog.addSchemaToTx(ctx, tx)
29✔
730
        if err != nil {
29✔
731
                return err
×
732
        }
×
733

734
        return nil
29✔
735
}
736

737
func (e *Engine) GetStore() *store.ImmuStore {
175✔
738
        return e.store
175✔
739
}
175✔
740

741
func (e *Engine) GetPrefix() []byte {
17✔
742
        return e.prefix
17✔
743
}
17✔
744

745
func (e *Engine) tableResolveFor(tableName string) TableResolver {
16✔
746
        if e.tableResolvers == nil {
23✔
747
                return nil
7✔
748
        }
7✔
749
        return e.tableResolvers[tableName]
9✔
750
}
751

752
func (e *Engine) registerTableResolver(tableName string, r TableResolver) {
2,062✔
753
        if e.tableResolvers == nil {
2,750✔
754
                e.tableResolvers = make(map[string]TableResolver)
688✔
755
        }
688✔
756
        e.tableResolvers[tableName] = r
2,062✔
757
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc