• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841644892

23 Apr 2026 02:44PM UTC coverage: 85.279% (-4.0%) from 89.306%
24841644892

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

115 existing lines in 18 files now uncovered.

44599 of 52298 relevant lines covered (85.28%)

127676.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.27
/embedded/sql/engine.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "encoding/binary"
22
        "errors"
23
        "fmt"
24
        "strings"
25
        "sync"
26
        "sync/atomic"
27

28
        "github.com/codenotary/immudb/embedded/store"
29
)
30

31
var (
32
        ErrNoSupported                            = errors.New("not supported")
33
        ErrIllegalArguments                       = store.ErrIllegalArguments
34
        ErrMultiIndexingNotEnabled                = fmt.Errorf("%w: multi-indexing must be enabled", store.ErrIllegalState)
35
        ErrParsingError                           = errors.New("parsing error")
36
        ErrDDLorDMLTxOnly                         = errors.New("transactions can NOT combine DDL and DML statements")
37
        ErrUnspecifiedMultiDBHandler              = fmt.Errorf("%w: unspecified multidbHanlder", store.ErrIllegalState)
38
        ErrDatabaseDoesNotExist                   = errors.New("database does not exist")
39
        ErrDatabaseAlreadyExists                  = errors.New("database already exists")
40
        ErrTableAlreadyExists                     = errors.New("table already exists")
41
        ErrTableDoesNotExist                      = errors.New("table does not exist")
42
        ErrColumnDoesNotExist                     = errors.New("column does not exist")
43
        ErrColumnAlreadyExists                    = errors.New("column already exists")
44
        ErrCannotDropColumn                       = errors.New("cannot drop column")
45
        ErrSameOldAndNewNames                     = errors.New("same old and new names")
46
        ErrColumnNotIndexed                       = errors.New("column is not indexed")
47
        ErrFunctionDoesNotExist                   = errors.New("function does not exist")
48
        ErrLimitedKeyType                         = errors.New("indexed key of unsupported type or exceeded length")
49
        ErrLimitedAutoIncrement                   = errors.New("only INTEGER single-column primary keys can be set as auto incremental")
50
        ErrLimitedMaxLen                          = errors.New("only VARCHAR and BLOB types support max length")
51
        ErrDuplicatedColumn                       = errors.New("duplicated column")
52
        ErrInvalidColumn                          = errors.New("invalid column")
53
        ErrInvalidCheckConstraint                 = errors.New("invalid check constraint")
54
        ErrCheckConstraintViolation               = errors.New("check constraint violation")
55
        ErrReservedWord                           = errors.New("reserved word")
56
        ErrNoPrimaryKey                           = errors.New("no primary key specified")
57
        ErrPKCanNotBeNull                         = errors.New("primary key can not be null")
58
        ErrPKCanNotBeUpdated                      = errors.New("primary key can not be updated")
59
        ErrMultiplePrimaryKeys                    = errors.New("multiple primary keys are not allowed")
60
        ErrNotNullableColumnCannotBeNull          = errors.New("not nullable column can not be null")
61
        ErrNewColumnMustBeNullable                = errors.New("new column must be nullable")
62
        ErrIndexAlreadyExists                     = errors.New("index already exists")
63
        ErrMaxNumberOfColumnsInIndexExceeded      = errors.New("number of columns in multi-column index exceeded")
64
        ErrIndexNotFound                          = errors.New("index not found")
65
        ErrConstraintNotFound                     = errors.New("constraint not found")
66
        ErrInvalidNumberOfValues                  = errors.New("invalid number of values provided")
67
        ErrInvalidValue                           = errors.New("invalid value provided")
68
        ErrInferredMultipleTypes                  = errors.New("inferred multiple types")
69
        ErrExpectingDQLStmt                       = errors.New("illegal statement. DQL statement expected")
70
        ErrColumnMustAppearInGroupByOrAggregation = errors.New("must appear in the group by clause or be used in an aggregated function")
71
        ErrIllegalMappedKey                       = errors.New("error illegal mapped key")
72
        ErrCorruptedData                          = store.ErrCorruptedData
73
        ErrBrokenCatalogColSpecExpirable          = fmt.Errorf("%w: catalog column entry set as expirable", ErrCorruptedData)
74
        ErrBrokenCatalogCheckConstraintExpirable  = fmt.Errorf("%w: catalog check constraint set as expirable", ErrCorruptedData)
75
        ErrNoMoreRows                             = store.ErrNoMoreEntries
76
        ErrInvalidTypes                           = errors.New("invalid types")
77
        ErrUnsupportedJoinType                    = errors.New("unsupported join type")
78
        ErrInvalidCondition                       = errors.New("invalid condition")
79
        ErrHavingClauseRequiresGroupClause        = errors.New("having clause requires group clause")
80
        ErrNotComparableValues                    = errors.New("values are not comparable")
81
        ErrNumericTypeExpected                    = errors.New("numeric type expected")
82
        ErrUnexpected                             = errors.New("unexpected error")
83
        ErrMaxKeyLengthExceeded                   = errors.New("max key length exceeded")
84
        ErrMaxLengthExceeded                      = errors.New("max length exceeded")
85
        ErrColumnIsNotAnAggregation               = errors.New("column is not an aggregation")
86
        ErrLimitedCount                           = errors.New("only unbounded counting is supported i.e. COUNT(*)")
87
        ErrWindowRowsLimitExceeded                = errors.New("window function result exceeds max_window_rows limit")
88
        ErrTxDoesNotExist                         = errors.New("tx does not exist")
89
        ErrNestedTxNotSupported                   = errors.New("nested tx are not supported")
90
        ErrNoOngoingTx                            = errors.New("no ongoing transaction")
91
        ErrNonTransactionalStmt                   = errors.New("non transactional statement")
92
        ErrDivisionByZero                         = errors.New("division by zero")
93
        ErrMissingParameter                       = errors.New("missing parameter")
94
        ErrUnsupportedParameter                   = errors.New("unsupported parameter")
95
        ErrDuplicatedParameters                   = errors.New("duplicated parameters")
96
        ErrLimitedIndexCreation                   = errors.New("unique index creation is only supported on empty tables")
97
        ErrTooManyRows                            = errors.New("too many rows")
98
        ErrAlreadyClosed                          = store.ErrAlreadyClosed
99
        ErrAmbiguousSelector                      = errors.New("ambiguous selector")
100
        ErrUnsupportedCast                        = fmt.Errorf("%w: unsupported cast", ErrInvalidValue)
101
        ErrColumnMismatchInUnionStmt              = errors.New("column mismatch in union statement")
102
        ErrCannotIndexJson                        = errors.New("cannot index column of type JSON")
103
        ErrInvalidTxMetadata                      = errors.New("invalid transaction metadata")
104
        ErrAccessDenied                           = errors.New("access denied")
105
        ErrDiffRequiresPeriod                     = errors.New("DIFF requires both SINCE/AFTER and UNTIL/BEFORE clauses")
106
)
107

108
// MaxKeyLen caps the length of variable-width indexed columns (the
// real on-disk constraint is in the store/btree layer, currently 1024
// bytes — see embedded/store/immustore.go MaxKeyLen and the btree
// DefaultMaxKeySize). Engine code historically used 512 bytes, which
// is well under the storage cap; raising it to 1024 unblocks dumps
// that declare longer text PKs (e.g. netflix's `show_id text`) without
// any on-disk format change. The variable stays mutable so that
// embedded/sql.Options can override it at engine init time (see the
// opts.maxKeyLen override in NewEngine).
var MaxKeyLen = 1024
117

118
// CatalogCacheHitObserver and CatalogCacheMissObserver are package-level
// hooks the upper layers (pkg/server/metrics.go) populate to translate
// catalog cache events into Prometheus counters. Kept as func() vars to
// avoid pulling a Prometheus dependency into embedded/sql, which is a
// pure storage primitive shared across all higher layers.
//
// Default no-op implementations make these safe to call from NewTx
// regardless of whether the server has wired observers in yet.
// NOTE(review): these vars are read without synchronization in NewTx —
// presumably they are assigned once at server startup before any tx
// opens; confirm against the wiring in pkg/server.
var (
	CatalogCacheHitObserver  = func() {}
	CatalogCacheMissObserver = func() {}
)
130

131
// Byte widths used when encoding identifiers and length prefixes into
// mapped keys (consumed by EncodeID/MapKey-style helpers elsewhere in
// this package).
const (
	EncIDLen  = 4
	EncLenLen = 4
)

// MaxNumberOfColumnsInIndex caps how many columns a multi-column index
// may span; exceeding it yields ErrMaxNumberOfColumnsInIndexExceeded.
const MaxNumberOfColumnsInIndex = 8
137

138
// Engine is the SQL layer over an ImmuStore. It owns engine-wide state
// shared by all transactions it opens: the key-prefix namespace, query
// tuning knobs, registered table resolvers (including loaded views),
// in-memory sequences, and the read-only catalog cache.
type Engine struct {
	store *store.ImmuStore

	prefix                        []byte // namespacing prefix prepended to every engine key in the store
	distinctLimit                 int
	distinctSpillThreshold        int
	sortBufferSize                int
	maxWindowRows                 int // max rows materialized for window functions (0 = unlimited)
	autocommit                    bool
	lazyIndexConstraintValidation bool
	parseTxMetadata               func([]byte) (map[string]interface{}, error)
	multidbHandler                MultiDBHandler
	tableResolvers                map[string]TableResolver
	// sequences holds named auto-increment counters, lazily populated by
	// loadSequences on first NewTx. NOTE(review): it is read and written
	// (NewTx, NextVal, CreateSequence) without holding catalogMu or any
	// other lock — confirm callers serialize engine access.
	sequences map[string]*Sequence

	// catalogMu guards cachedCatalog. The cached catalog is built once from the
	// store and reused across read-only transactions to avoid redundant B-tree
	// scans on every autocommit SELECT. It is invalidated whenever a DDL
	// statement commits (mutatedCatalog == true).
	catalogMu     sync.RWMutex
	cachedCatalog *Catalog

	// cachedCatalogVersion is bumped on every invalidateCatalogCache call.
	// Each SQLTx records this counter in openCatalogVersion at NewTx time;
	// on commit, tryPopulateCatalogCache only accepts a populate when the
	// version still matches — i.e. no DDL has invalidated the cache since
	// this tx opened. Without this guard, a non-DDL RW tx whose snapshot
	// pre-dates a concurrent DDL commit could overwrite the cache with a
	// stale schema view (the doc's "stale-view race").
	cachedCatalogVersion atomic.Uint64
}
169

170
// Sequence represents a named auto-incrementing counter.
type Sequence struct {
	name      string
	currValue int64 // last value handed out; initialized to startValue-increment so the first NextVal returns startValue
	increment int64 // step applied on each NextVal
	started   bool  // true once NextVal has been called (CurrVal errors until then)
}
177

178
func (e *Engine) CreateSequence(name string, startValue, increment int64) {
6✔
179
        if e.sequences == nil {
10✔
180
                e.sequences = make(map[string]*Sequence)
4✔
181
        }
4✔
182
        e.sequences[name] = &Sequence{
6✔
183
                name:      name,
6✔
184
                currValue: startValue - increment, // first NEXTVAL will return startValue
6✔
185
                increment: increment,
6✔
186
        }
6✔
187
}
188

189
func (e *Engine) DropSequence(name string) bool {
5✔
190
        if e.sequences == nil {
7✔
191
                return false
2✔
192
        }
2✔
193
        _, exists := e.sequences[name]
3✔
194
        if exists {
4✔
195
                delete(e.sequences, name)
1✔
196
        }
1✔
197
        return exists
3✔
198
}
199

200
func (e *Engine) NextVal(name string) (int64, error) {
11✔
201
        if e.sequences == nil {
11✔
NEW
202
                return 0, fmt.Errorf("sequence does not exist (%s)", name)
×
NEW
203
        }
×
204
        seq, exists := e.sequences[name]
11✔
205
        if !exists {
12✔
206
                return 0, fmt.Errorf("sequence does not exist (%s)", name)
1✔
207
        }
1✔
208
        seq.currValue += seq.increment
10✔
209
        seq.started = true
10✔
210
        return seq.currValue, nil
10✔
211
}
212

213
func (e *Engine) CurrVal(name string) (int64, error) {
4✔
214
        if e.sequences == nil {
4✔
NEW
215
                return 0, fmt.Errorf("sequence does not exist (%s)", name)
×
NEW
216
        }
×
217
        seq, exists := e.sequences[name]
4✔
218
        if !exists {
4✔
NEW
219
                return 0, fmt.Errorf("sequence does not exist (%s)", name)
×
NEW
220
        }
×
221
        if !seq.started {
5✔
222
                return 0, fmt.Errorf("sequence '%s' has not been used yet", name)
1✔
223
        }
1✔
224
        return seq.currValue, nil
3✔
225
}
226

227
// loadViews scans persisted view definitions (catalogViewPrefix entries
// under the current database) and registers a table resolver for each
// parseable one. It is deliberately best-effort: iteration and parse
// errors are swallowed so that a corrupt or unparseable view definition
// never blocks transaction open (called from NewTx on first use).
func (e *Engine) loadViews(ctx context.Context, tx *store.OngoingTx) {
	prefix := MapKey(e.prefix, catalogViewPrefix, EncodeID(DatabaseID))

	// The iteration error is intentionally discarded (best-effort load).
	_ = iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
		if deleted || len(value) == 0 {
			return nil
		}

		// Extract view name from key: everything past the fixed prefix.
		nameStart := len(prefix)
		if nameStart >= len(key) {
			return nil
		}
		viewName := string(key[nameStart:])
		sqlText := string(value)

		// Parse the stored SELECT text and register the view.
		stmts, err := ParseSQL(strings.NewReader(sqlText))
		if err != nil {
			return nil // skip unparseable views
		}

		// Register the first statement that is a DataSource; any trailing
		// statements in the stored text are ignored.
		for _, stmt := range stmts {
			if ds, ok := stmt.(DataSource); ok {
				e.registerTableResolver(viewName, &viewResolver{
					name:  viewName,
					query: ds,
				})
				break
			}
		}

		return nil
	})
}
262

263
// loadSequences restores persisted sequences (catalogSequencePrefix
// entries under the current database) into e.sequences. Each value is
// expected to hold two big-endian int64s: current value then increment;
// shorter or deleted entries are skipped silently.
func (e *Engine) loadSequences(ctx context.Context, tx *store.OngoingTx) error {
	prefix := MapKey(e.prefix, catalogSequencePrefix, EncodeID(DatabaseID))

	return iteratePrefix(ctx, tx, prefix, func(key, value []byte, deleted bool) error {
		// Need at least 16 bytes for currValue + increment.
		if deleted || len(value) < 16 {
			return nil
		}

		// Extract name from key: prefix + DatabaseID + name
		nameStart := len(prefix)
		if nameStart >= len(key) {
			return nil
		}
		name := string(key[nameStart:])

		currValue := int64(binary.BigEndian.Uint64(value[0:8]))
		increment := int64(binary.BigEndian.Uint64(value[8:16]))

		if e.sequences == nil {
			e.sequences = make(map[string]*Sequence)
		}

		e.sequences[name] = &Sequence{
			name:      name,
			currValue: currValue,
			increment: increment,
			// NOTE(review): "started" is inferred from currValue != 0, so a
			// sequence whose persisted current value is exactly 0 would be
			// restored as not-started — confirm the persisted format never
			// stores 0 for a started sequence.
			started: currValue != 0,
		}

		return nil
	})
}
295

296
// MultiDBHandler is implemented by the server layer to expose database-
// and user-management operations the embedded engine cannot perform on
// its own (it sees only a single store). When unset, statements that
// need it fail with ErrUnspecifiedMultiDBHandler.
type MultiDBHandler interface {
	ListDatabases(ctx context.Context) ([]string, error)
	CreateDatabase(ctx context.Context, db string, ifNotExists bool) error
	UseDatabase(ctx context.Context, db string) error
	GetLoggedUser(ctx context.Context) (User, error)
	ListUsers(ctx context.Context) ([]User, error)
	CreateUser(ctx context.Context, username, password string, permission Permission) error
	AlterUser(ctx context.Context, username, password string, permission Permission) error
	GrantSQLPrivileges(ctx context.Context, database, username string, privileges []SQLPrivilege) error
	RevokeSQLPrivileges(ctx context.Context, database, username string, privileges []SQLPrivilege) error
	DropUser(ctx context.Context, username string) error
	ExecPreparedStmts(ctx context.Context, opts *TxOptions, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error)
}
309

310
// TableResolver supplies rows for a virtual table (e.g. a view or a
// pg_catalog table): Table reports the name it answers for, and Resolve
// produces a RowReader for it within the given transaction.
type TableResolver interface {
	Table() string
	Resolve(ctx context.Context, tx *SQLTx, alias string) (RowReader, error)
}
314

315
// User describes an authenticated SQL user as seen by the engine:
// identity, coarse permission level, and fine-grained SQL privileges.
type User interface {
	Username() string
	Permission() Permission
	SQLPrivileges() []SQLPrivilege
}
320

321
// NewEngine creates a SQL Engine over st. The store must have
// multi-indexing enabled and opts must validate. Catalog indexing for
// the engine's key prefix is initialized eagerly; a pre-existing index
// (ErrIndexAlreadyInitialized) is tolerated so an engine can be
// re-created over an existing store. Table resolvers from opts are
// registered before returning.
func NewEngine(st *store.ImmuStore, opts *Options) (*Engine, error) {
	if st == nil {
		return nil, ErrIllegalArguments
	}

	if !st.MultiIndexingEnabled() {
		return nil, ErrMultiIndexingNotEnabled
	}

	err := opts.Validate()
	if err != nil {
		return nil, err
	}

	// Apply the engine-side variable-key cap override, if requested.
	// This mutates a package-level variable that other call sites
	// (EncodeRawValueAsKey, the document layer, etc.) consult — the
	// global is intentional, matching the existing MaxKeyLen design.
	// Zero leaves the package default in place.
	if opts.maxKeyLen != 0 {
		MaxKeyLen = opts.maxKeyLen
	}

	e := &Engine{
		store:                         st,
		prefix:                        make([]byte, len(opts.prefix)),
		distinctLimit:                 opts.distinctLimit,
		distinctSpillThreshold:        opts.distinctSpillThreshold,
		sortBufferSize:                opts.sortBufferSize,
		maxWindowRows:                 opts.maxWindowRows,
		autocommit:                    opts.autocommit,
		lazyIndexConstraintValidation: opts.lazyIndexConstraintValidation,
		parseTxMetadata:               opts.parseTxMetadata,
		multidbHandler:                opts.multidbHandler,
	}

	copy(e.prefix, opts.prefix)

	// Both appends below allocate fresh backing arrays because e.prefix
	// was made with len == cap, so the two IndexSpec prefixes cannot
	// alias each other or e.prefix.
	err = st.InitIndexing(&store.IndexSpec{
		SourcePrefix:     append(e.prefix, []byte(catalogPrefix)...),
		TargetPrefix:     append(e.prefix, []byte(catalogPrefix)...),
		InjectiveMapping: true,
	})
	if err != nil && !errors.Is(err, store.ErrIndexAlreadyInitialized) {
		return nil, err
	}

	for _, r := range opts.tableResolvers {
		e.registerTableResolver(r.Table(), r)
	}

	// TODO: find a better way to handle parsing errors
	yyErrorVerbose = true

	return e, nil
}
377

378
// NewTx opens a SQLTx over a fresh store transaction. Besides creating
// the underlying store tx (read-only or read-write per opts), it:
//   - reuses or rebuilds the engine-level catalog cache,
//   - lazily loads persisted sequences and views on first use,
//   - (re)initializes per-table indexing specs, and
//   - restores each auto-increment table's maxPK from the snapshot.
func (e *Engine) NewTx(ctx context.Context, opts *TxOptions) (*SQLTx, error) {
	err := opts.Validate()
	if err != nil {
		return nil, err
	}

	var mode store.TxMode
	if opts.ReadOnly {
		mode = store.ReadOnlyTx
	} else {
		mode = store.ReadWriteTx
	}

	txOpts := &store.TxOptions{
		Mode:                    mode,
		SnapshotMustIncludeTxID: opts.SnapshotMustIncludeTxID,
		SnapshotRenewalPeriod:   opts.SnapshotRenewalPeriod,
		UnsafeMVCC:              opts.UnsafeMVCC,
	}

	tx, err := e.store.NewTx(ctx, txOpts)
	if err != nil {
		return nil, err
	}

	// Attach caller-supplied extra metadata to the store tx, if any.
	if len(opts.Extra) > 0 {
		txmd := store.NewTxMetadata()
		err := txmd.WithExtra(opts.Extra)
		if err != nil {
			return nil, err
		}

		tx.WithMetadata(txmd)
	}

	// Try to reuse the engine-level catalog cache.
	// catalog.load() scans all table/column/index metadata from the B-tree on
	// every transaction open — skipping it eliminates the dominant per-query
	// overhead for autocommit SELECT workloads, and (via Clone) for DML-heavy
	// read-write workloads as well.
	//
	// openCatalogVersion is captured once here under the same RLock as
	// cachedCatalog so the (catalog, version) pair seen by this tx is
	// coherent. tryPopulateCatalogCache rejects populates whose version
	// disagrees with what subsequently lives on the engine.
	e.catalogMu.RLock()
	cached := e.cachedCatalog
	openVersion := e.cachedCatalogVersion.Load()
	e.catalogMu.RUnlock()

	if cached != nil && opts.ReadOnly {
		// Read-only transactions cannot mutate the schema, so they share the
		// cached catalog directly. Sequences and views are already loaded in
		// engine fields.
		CatalogCacheHitObserver()
		return &SQLTx{
			engine:             e,
			opts:               opts,
			tx:                 tx,
			catalog:            cached,
			openCatalogVersion: openVersion,
			lastInsertedPKs:    make(map[string]int64),
			firstInsertedPKs:   make(map[string]int64),
		}, nil
	}

	var catalog *Catalog
	if cached != nil {
		// Read-write tx with a warm cache: clone so any DDL this tx performs
		// lands on the local copy, not the shared cache. Reset maxPK on the
		// clones — the authoritative value comes from the snapshot-dependent
		// loadMaxPK loop below and must not leak the cache's stale value into
		// an empty-table case (where loadMaxPK returns ErrNoMoreEntries and
		// the existing code falls through without resetting).
		CatalogCacheHitObserver()
		catalog = cached.Clone()
		for _, table := range catalog.GetTables() {
			table.maxPK = 0
		}
		// D2 Phase 2 correctness: catalog.load was skipped, so the MVCC
		// read-set has no record of the catalog rows this tx implicitly
		// observed. Without this seed, two concurrent txs doing
		// conflicting DDL would both commit (no read-conflict tripwire).
		// See immudb-improvements.md "Open question #2".
		if err := e.seedCatalogReadSet(ctx, tx, catalog.GetTables()); err != nil {
			return nil, err
		}
	} else {
		CatalogCacheMissObserver()
		catalog = newCatalog(e.prefix)
		if err := catalog.load(ctx, tx); err != nil {
			return nil, err
		}
	}

	// Load persisted sequences (only once, on first tx).
	// NOTE(review): a loadSequences error is deliberately swallowed
	// (sequences treated as optional), but e.sequences may then remain
	// nil and this load will be retried on every subsequent NewTx —
	// confirm that is the intended behavior.
	if e.sequences == nil {
		if err := e.loadSequences(ctx, tx); err != nil {
			// Non-fatal: sequences are optional
			_ = err
		}
	}

	// Load persisted views (only once, on first tx open).
	// NOTE(review): the gate is len(e.tableResolvers) == 0, so if any
	// resolver was registered via Options at NewEngine time, persisted
	// views are never loaded — verify this interaction is intended.
	if len(e.tableResolvers) == 0 {
		e.loadViews(ctx, tx)
	}

	for _, table := range catalog.GetTables() {
		primaryIndex := table.primaryIndex

		rowEntryPrefix := MapKey(
			e.prefix,
			RowPrefix,
			EncodeID(DatabaseID),
			EncodeID(table.id),
			EncodeID(primaryIndex.id),
		)

		mappedPKEntryPrefix := MapKey(
			e.prefix,
			MappedPrefix,
			EncodeID(table.id),
			EncodeID(primaryIndex.id),
		)

		// Ensure the primary-key index mapping is initialized; an
		// already-initialized index is not an error.
		err = e.store.InitIndexing(&store.IndexSpec{
			SourcePrefix: rowEntryPrefix,

			TargetEntryMapper: indexEntryMapperFor(primaryIndex, primaryIndex),
			TargetPrefix:      mappedPKEntryPrefix,

			InjectiveMapping: true,
		})
		if err != nil && !errors.Is(err, store.ErrIndexAlreadyInitialized) {
			return nil, err
		}

		// Initialize every secondary index mapping for the table.
		for _, index := range table.indexes {
			if index.IsPrimary() {
				continue
			}

			mappedEntryPrefix := MapKey(
				e.prefix,
				MappedPrefix,
				EncodeID(table.id),
				EncodeID(index.id),
			)

			err = e.store.InitIndexing(&store.IndexSpec{
				SourcePrefix:      rowEntryPrefix,
				SourceEntryMapper: indexEntryMapperFor(primaryIndex, primaryIndex),
				TargetEntryMapper: indexEntryMapperFor(index, primaryIndex),
				TargetPrefix:      mappedEntryPrefix,

				InjectiveMapping: true,
			})
			if errors.Is(err, store.ErrIndexAlreadyInitialized) {
				continue
			}
			if err != nil {
				return nil, err
			}
		}

		// Restore the auto-increment high-water mark from the snapshot.
		// An empty table (ErrNoMoreEntries) leaves maxPK as set above.
		if table.autoIncrementPK {
			encMaxPK, err := loadMaxPK(ctx, e.prefix, tx, table)
			if errors.Is(err, store.ErrNoMoreEntries) {
				continue
			}
			if err != nil {
				return nil, err
			}

			// Expect a 1-byte not-null marker followed by 8 value bytes.
			if len(encMaxPK) != 9 {
				return nil, ErrCorruptedData
			}

			if encMaxPK[0] != KeyValPrefixNotNull {
				return nil, ErrCorruptedData
			}

			// map to signed integer space
			encMaxPK[1] ^= 0x80

			table.maxPK = int64(binary.BigEndian.Uint64(encMaxPK[1:]))
		}
	}

	// Populate the read-only catalog cache so subsequent autocommit SELECTs
	// bypass catalog.load() entirely. Only cache when the caller did not
	// mutate the schema (write transactions are never cached).
	if opts.ReadOnly {
		e.catalogMu.Lock()
		if e.cachedCatalog == nil {
			e.cachedCatalog = catalog
		}
		e.catalogMu.Unlock()
	}

	return &SQLTx{
		engine:             e,
		opts:               opts,
		tx:                 tx,
		catalog:            catalog,
		openCatalogVersion: openVersion,
		lastInsertedPKs:    make(map[string]int64),
		firstInsertedPKs:   make(map[string]int64),
	}, nil
}
589

590
// seedCatalogReadSet registers prefix-range fingerprints on the
591
// OngoingTx read-set for every catalog prefix that catalog.load
592
// would have walked. Used by NewTx's Clone fast path (D2 Phase 2)
593
// so DDL-vs-DDL conflict detection still fires even when the cached
594
// catalog skips the parse step.
595
//
596
// Uses OngoingTx.MarkPrefixScanned (R1 from the agent review) which
597
// records ONE fingerprint per prefix instead of per-entry
598
// expectedReads + expectedNoMoreEntries — same conflict guarantee,
599
// without the O(N) bookkeeping tax on every warm-cache NewTx.
600
func (e *Engine) seedCatalogReadSet(ctx context.Context, tx *store.OngoingTx, tables []*Table) error {
4,022✔
601
        prefixes := [][]byte{
4,022✔
602
                MapKey(e.prefix, catalogTablePrefix, EncodeID(DatabaseID)),
4,022✔
603
        }
4,022✔
604
        for _, t := range tables {
8,805✔
605
                prefixes = append(prefixes,
4,783✔
606
                        MapKey(e.prefix, catalogColumnPrefix, EncodeID(DatabaseID), EncodeID(t.id)),
4,783✔
607
                        MapKey(e.prefix, catalogIndexPrefix, EncodeID(DatabaseID), EncodeID(t.id)),
4,783✔
608
                        MapKey(e.prefix, catalogCheckPrefix, EncodeID(DatabaseID), EncodeID(t.id)),
4,783✔
609
                )
4,783✔
610
        }
4,783✔
611
        for _, p := range prefixes {
22,393✔
612
                if err := tx.MarkPrefixScanned(ctx, store.KeyReaderSpec{Prefix: p}); err != nil {
18,371✔
NEW
613
                        return err
×
NEW
614
                }
×
615
        }
616
        return nil
4,022✔
617
}
618

NEW
619
func drainKeyReader(ctx context.Context, tx *store.OngoingTx, prefix []byte) error {
×
NEW
620
        rdr, err := tx.NewKeyReader(store.KeyReaderSpec{Prefix: prefix})
×
NEW
621
        if err != nil {
×
NEW
622
                return err
×
NEW
623
        }
×
NEW
624
        defer rdr.Close()
×
NEW
625

×
NEW
626
        for {
×
NEW
627
                _, _, err := rdr.Read(ctx)
×
NEW
628
                if errors.Is(err, store.ErrNoMoreEntries) {
×
NEW
629
                        return nil
×
NEW
630
                }
×
NEW
631
                if err != nil {
×
NEW
632
                        return err
×
NEW
633
                }
×
634
        }
635
}
636

637
// invalidateCatalogCache clears the engine-level catalog cache. Called after
638
// any DDL transaction commits so the next read-only tx reloads from the store.
639
//
640
// The version counter is bumped under the same lock so a concurrent
641
// tryPopulateCatalogCache call sees a coherent (catalog, version) pair —
642
// either the old cached catalog with version V, or nil with version V+1.
643
// Never the inconsistent intermediate state where the catalog is nil but
644
// the version is still V.
645
func (e *Engine) invalidateCatalogCache() {
528✔
646
        e.catalogMu.Lock()
528✔
647
        e.cachedCatalog = nil
528✔
648
        e.cachedCatalogVersion.Add(1)
528✔
649
        e.catalogMu.Unlock()
528✔
650
}
528✔
651

652
// tryPopulateCatalogCache opportunistically promotes a freshly-committed
653
// (non-DDL) tx's catalog into the engine cache so subsequent NewTx calls
654
// can take the Clone fast path. No-op when:
655
//   - The cache is already warm (first-fill-wins, preserves coherence
656
//     even if the existing entry came from a different tx).
657
//   - cachedCatalogVersion has changed since this tx opened — meaning a
658
//     concurrent DDL invalidated the cache, and our snapshot is by now
659
//     stale relative to the next valid view.
660
//
661
// The catalog reference handed in is shared post-publication: callers
662
// MUST NOT mutate the catalog after handing it over.
663
func (e *Engine) tryPopulateCatalogCache(catalog *Catalog, openVersion uint64) {
4,155✔
664
        if catalog == nil {
4,155✔
NEW
665
                return
×
NEW
666
        }
×
667
        e.catalogMu.Lock()
4,155✔
668
        defer e.catalogMu.Unlock()
4,155✔
669

4,155✔
670
        if e.cachedCatalog != nil {
8,089✔
671
                return
3,934✔
672
        }
3,934✔
673
        if e.cachedCatalogVersion.Load() != openVersion {
221✔
NEW
674
                return
×
NEW
675
        }
×
676
        e.cachedCatalog = catalog
221✔
677
}
678

679
func indexEntryMapperFor(index, primaryIndex *Index) store.EntryMapper {
10,601✔
680
        // value={count (colID valLen val)+})
10,601✔
681
        // key=M.{tableID}{indexID}({null}({val}{padding}{valLen})?)+({pkVal}{padding}{pkValLen})+
10,601✔
682

10,601✔
683
        valueExtractor := func(value []byte, valuesByColID map[uint32]TypedValue) error {
32,336✔
684
                voff := 0
21,735✔
685

21,735✔
686
                cols := int(binary.BigEndian.Uint32(value[voff:]))
21,735✔
687
                voff += EncLenLen
21,735✔
688

21,735✔
689
                for i := 0; i < cols; i++ {
108,312✔
690
                        if len(value) < EncIDLen {
86,577✔
691
                                return fmt.Errorf("key is lower than required")
×
692
                        }
×
693

694
                        colID := binary.BigEndian.Uint32(value[voff:])
86,577✔
695
                        voff += EncIDLen
86,577✔
696

86,577✔
697
                        col, err := index.table.GetColumnByID(colID)
86,577✔
698
                        if errors.Is(err, ErrColumnDoesNotExist) {
86,657✔
699
                                vlen := int(binary.BigEndian.Uint32(value[voff:]))
80✔
700
                                voff += EncLenLen + vlen
80✔
701
                                continue
80✔
702
                        } else if err != nil {
86,497✔
703
                                return err
×
704
                        }
×
705

706
                        val, n, err := DecodeValue(value[voff:], col.colType)
86,497✔
707
                        if err != nil {
86,497✔
708
                                return err
×
709
                        }
×
710

711
                        voff += n
86,497✔
712

86,497✔
713
                        valuesByColID[colID] = val
86,497✔
714
                }
715

716
                return nil
21,735✔
717
        }
718

719
        return func(key, value []byte) ([]byte, error) {
32,336✔
720
                encodedValues := make([][]byte, 2+len(index.cols)+1)
21,735✔
721
                encodedValues[0] = EncodeID(index.table.id)
21,735✔
722
                encodedValues[1] = EncodeID(index.id)
21,735✔
723

21,735✔
724
                valuesByColID := make(map[uint32]TypedValue, len(index.cols))
21,735✔
725

21,735✔
726
                for _, col := range index.table.cols {
109,261✔
727
                        valuesByColID[col.id] = &NullValue{t: col.colType}
87,526✔
728
                }
87,526✔
729

730
                err := valueExtractor(value, valuesByColID)
21,735✔
731
                if err != nil {
21,735✔
732
                        return nil, err
×
733
                }
×
734

735
                for i, col := range index.cols {
46,666✔
736
                        encKey, _, err := EncodeValueAsKey(valuesByColID[col.id], col.Type(), col.MaxLen())
24,931✔
737
                        if err != nil {
24,931✔
738
                                return nil, err
×
739
                        }
×
740

741
                        encodedValues[2+i] = encKey
24,931✔
742
                }
743

744
                pkEncVals, err := encodedKey(primaryIndex, valuesByColID)
21,735✔
745
                if err != nil {
21,735✔
746
                        return nil, err
×
747
                }
×
748

749
                encodedValues[len(encodedValues)-1] = pkEncVals
21,735✔
750

21,735✔
751
                return MapKey(index.enginePrefix(), MappedPrefix, encodedValues...), nil
21,735✔
752
        }
753
}
754

755
func (e *Engine) Exec(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error) {
5,341✔
756
        stmts, err := ParseSQL(strings.NewReader(sql))
5,341✔
757
        if err != nil {
5,344✔
758
                return nil, nil, fmt.Errorf("%w: %v", ErrParsingError, err)
3✔
759
        }
3✔
760

761
        return e.ExecPreparedStmts(ctx, tx, stmts, params)
5,338✔
762
}
763

764
// ExecPreparedStmts executes the given statements within tx (or implicit
// transactions when tx is nil). When execution stops early because a
// different database was selected (USE DATABASE), the remaining statements
// are forwarded to the multidbHandler; without a handler that situation is
// reported as ErrUnexpected.
func (e *Engine) ExecPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, err error) {
	ntx, ctxs, pendingStmts, err := e.execPreparedStmts(ctx, tx, stmts, params)
	if err != nil {
		return ntx, ctxs, err
	}

	if len(pendingStmts) > 0 {
		// a different database was selected

		// pendingStmts can only remain when a multidb handler takes over,
		// and only after the local transaction was fully closed (ntx == nil)
		if e.multidbHandler == nil || ntx != nil {
			return ntx, ctxs, fmt.Errorf("%w: all statements should have been executed when not using a multidbHandler", ErrUnexpected)
		}

		var opts *TxOptions

		// preserve the caller's tx options across the handler dispatch
		if tx != nil {
			opts = tx.opts
		} else {
			opts = DefaultTxOptions()
		}

		// NOTE: ntx/err here shadow the named results; both are returned
		// explicitly below, so nothing is lost.
		ntx, hctxs, err := e.multidbHandler.ExecPreparedStmts(ctx, opts, pendingStmts, params)

		return ntx, append(ctxs, hctxs...), err
	}

	return ntx, ctxs, nil
}
792

793
// execPreparedStmts runs stmts sequentially, opening implicit transactions
// as needed and committing them according to autocommit/explicit-close
// rules. It returns the still-open transaction (if any), the transactions
// committed along the way, and the statements NOT executed — non-empty only
// when a USE DATABASE statement stops local execution so a multidbHandler
// can take over the remainder.
func (e *Engine) execPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt, params map[string]interface{}) (ntx *SQLTx, committedTxs []*SQLTx, pendingStmts []SQLStmt, err error) {
	if len(stmts) == 0 {
		return nil, nil, stmts, ErrIllegalArguments
	}

	// lower-case parameter names and reject duplicates up front
	nparams, err := normalizeParams(params)
	if err != nil {
		return nil, nil, stmts, err
	}

	currTx := tx

	// count of statements fully executed; stmts[execStmts:] is the pending tail
	execStmts := 0

	for _, stmt := range stmts {
		if stmt == nil {
			return nil, nil, stmts[execStmts:], ErrIllegalArguments
		}

		_, isDBSelectionStmt := stmt.(*UseDatabaseStmt)

		// handle the case when working in non-autocommit mode outside a transaction block
		if isDBSelectionStmt && (currTx != nil && !currTx.Closed()) && !currTx.IsExplicitCloseRequired() {
			err = currTx.Commit(ctx)
			if err == nil {
				committedTxs = append(committedTxs, currTx)
			}
			if err != nil {
				return nil, committedTxs, stmts[execStmts:], err
			}
		}

		if currTx == nil || currTx.Closed() {
			var opts *TxOptions

			// inherit options from the most recent transaction when available
			if currTx != nil {
				opts = currTx.opts
			} else if tx != nil {
				opts = tx.opts
			} else {
				opts = DefaultTxOptions()
			}

			// begin tx with implicit commit
			currTx, err = e.NewTx(ctx, opts)
			if err != nil {
				return nil, committedTxs, stmts[execStmts:], err
			}
		}

		// per-statement permission check only applies in multidb deployments
		if e.multidbHandler != nil {
			if err := e.checkUserPermissions(ctx, stmt); err != nil {
				currTx.Cancel()
				return nil, committedTxs, stmts[execStmts:], err
			}
		}

		// execAt may return a different (possibly nil) transaction, e.g.
		// BEGIN/COMMIT statements open or close the current one
		ntx, err := stmt.execAt(ctx, currTx, nparams)
		if err != nil {
			currTx.Cancel()
			return nil, committedTxs, stmts[execStmts:], err
		}

		// autocommit mode: commit after each statement unless the tx was
		// opened explicitly (BEGIN) or is already closed
		if !currTx.Closed() && !currTx.IsExplicitCloseRequired() && e.autocommit {
			err = currTx.Commit(ctx)
			if err != nil {
				return nil, committedTxs, stmts[execStmts:], err
			}
		}

		if currTx.Closed() {
			committedTxs = append(committedTxs, currTx)
		}

		currTx = ntx

		execStmts++

		// stop local execution: the remaining statements belong to the
		// newly selected database and are handed to the multidbHandler
		if isDBSelectionStmt && e.multidbHandler != nil {
			break
		}
	}

	// commit any trailing implicit transaction left open by the loop
	if currTx != nil && !currTx.Closed() && !currTx.IsExplicitCloseRequired() {
		err = currTx.Commit(ctx)
		if err != nil {
			return nil, committedTxs, stmts[execStmts:], err
		}

		committedTxs = append(committedTxs, currTx)
	}

	// never hand back a closed transaction to the caller
	if currTx != nil && currTx.Closed() {
		currTx = nil
	}

	return currTx, committedTxs, stmts[execStmts:], nil
}
891

892
func (e *Engine) checkUserPermissions(ctx context.Context, stmt SQLStmt) error {
971✔
893
        user, err := e.multidbHandler.GetLoggedUser(ctx)
971✔
894
        if err != nil {
971✔
895
                return err
×
896
        }
×
897

898
        if !stmt.readOnly() && user.Permission() == PermissionReadOnly {
973✔
899
                return fmt.Errorf("%w: statement requires %s permission", ErrAccessDenied, PermissionReadWrite)
2✔
900
        }
2✔
901

902
        requiredPrivileges := stmt.requiredPrivileges()
969✔
903
        if !hasAllPrivileges(user.SQLPrivileges(), requiredPrivileges) {
970✔
904
                return fmt.Errorf("%w: statement requires %v privileges", ErrAccessDenied, requiredPrivileges)
1✔
905
        }
1✔
906
        return nil
968✔
907
}
908

909
func hasAllPrivileges(userPrivileges, privileges []SQLPrivilege) bool {
969✔
910
        for _, p := range privileges {
1,818✔
911
                has := false
849✔
912
                for _, up := range userPrivileges {
2,504✔
913
                        if up == p {
2,503✔
914
                                has = true
848✔
915
                                break
848✔
916
                        }
917
                }
918

919
                if !has {
850✔
920
                        return false
1✔
921
                }
1✔
922
        }
923
        return true
968✔
924
}
925

926
func (e *Engine) queryAll(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) ([]*Row, error) {
87✔
927
        reader, err := e.Query(ctx, tx, sql, params)
87✔
928
        if err != nil {
94✔
929
                return nil, err
7✔
930
        }
7✔
931
        defer reader.Close()
80✔
932

80✔
933
        return ReadAllRows(ctx, reader)
80✔
934
}
935

936
func (e *Engine) Query(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) (RowReader, error) {
698✔
937
        stmts, err := ParseSQL(strings.NewReader(sql))
698✔
938
        if err != nil {
699✔
939
                return nil, fmt.Errorf("%w: %v", ErrParsingError, err)
1✔
940
        }
1✔
941
        if len(stmts) != 1 {
698✔
942
                return nil, ErrExpectingDQLStmt
1✔
943
        }
1✔
944

945
        stmt, ok := stmts[0].(DataSource)
696✔
946
        if !ok {
697✔
947
                return nil, ErrExpectingDQLStmt
1✔
948
        }
1✔
949

950
        return e.QueryPreparedStmt(ctx, tx, stmt, params)
695✔
951
}
952

953
// QueryPreparedStmt resolves stmt into a RowReader. When tx is nil an
// implicit transaction is opened whose lifetime is tied to the reader:
// read-only queries cancel it on Close, DML-with-RETURNING queries commit
// it on Close. The named err result lets the deferred cleanup cancel the
// implicit transaction on any error path.
func (e *Engine) QueryPreparedStmt(ctx context.Context, tx *SQLTx, stmt DataSource, params map[string]interface{}) (rowReader RowReader, err error) {
	if stmt == nil {
		return nil, ErrIllegalArguments
	}

	qtx := tx

	if qtx == nil {
		// implicit tx: read-only unless the statement may write (RETURNING)
		readOnly := stmt.readOnly()
		qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(readOnly))
		if err != nil {
			return nil, err
		}
		// cancel the implicit tx if any later step fails; relies on the
		// named err result being set by every return below
		defer func() {
			if err != nil {
				qtx.Cancel()
			}
		}()
	}

	nparams, err := normalizeParams(params)
	if err != nil {
		return nil, err
	}

	if e.multidbHandler != nil {
		if err := e.checkUserPermissions(ctx, stmt); err != nil {
			return nil, err
		}
	}

	// execAt performs the statement's side effects (if any) before resolution
	_, err = stmt.execAt(ctx, qtx, nparams)
	if err != nil {
		return nil, err
	}

	r, err := stmt.Resolve(ctx, qtx, nparams, nil)
	if err != nil {
		return nil, err
	}

	// only an implicit tx is tied to the reader's lifetime; an explicit tx
	// stays under the caller's control
	if tx == nil {
		if stmt.readOnly() {
			r.onClose(func() {
				qtx.Cancel()
			})
		} else {
			// DML with RETURNING: commit the transaction on close
			// NOTE(review): the Commit error is discarded here since onClose
			// takes no error — looks intentional (close-time best effort),
			// but worth confirming callers don't need that failure surfaced.
			r.onClose(func() {
				qtx.Commit(ctx)
			})
		}
	}

	return r, nil
}
1009

1010
func (e *Engine) Catalog(ctx context.Context, tx *SQLTx) (catalog *Catalog, err error) {
11✔
1011
        qtx := tx
11✔
1012

11✔
1013
        if qtx == nil {
22✔
1014
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
11✔
1015
                if err != nil {
11✔
1016
                        return nil, err
×
1017
                }
×
1018
                defer qtx.Cancel()
11✔
1019
        }
1020

1021
        return qtx.Catalog(), nil
11✔
1022
}
1023

1024
func (e *Engine) InferParameters(ctx context.Context, tx *SQLTx, sql string) (params map[string]SQLValueType, err error) {
74✔
1025
        stmts, err := ParseSQL(strings.NewReader(sql))
74✔
1026
        if err != nil {
76✔
1027
                return nil, fmt.Errorf("%w: %v", ErrParsingError, err)
2✔
1028
        }
2✔
1029
        return e.InferParametersPreparedStmts(ctx, tx, stmts)
72✔
1030
}
1031

1032
func (e *Engine) InferParametersPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLStmt) (params map[string]SQLValueType, err error) {
224✔
1033
        if len(stmts) == 0 {
225✔
1034
                return nil, ErrIllegalArguments
1✔
1035
        }
1✔
1036

1037
        qtx := tx
223✔
1038

223✔
1039
        if qtx == nil {
446✔
1040
                qtx, err = e.NewTx(ctx, DefaultTxOptions().WithReadOnly(true))
223✔
1041
                if err != nil {
223✔
1042
                        return nil, err
×
1043
                }
×
1044
                defer qtx.Cancel()
223✔
1045
        }
1046

1047
        params = make(map[string]SQLValueType)
223✔
1048

223✔
1049
        for _, stmt := range stmts {
451✔
1050
                err = stmt.inferParameters(ctx, qtx, params)
228✔
1051
                if err != nil {
243✔
1052
                        return nil, err
15✔
1053
                }
15✔
1054
        }
1055

1056
        return params, nil
208✔
1057
}
1058

1059
func normalizeParams(params map[string]interface{}) (map[string]interface{}, error) {
7,252✔
1060
        nparams := make(map[string]interface{}, len(params))
7,252✔
1061

7,252✔
1062
        for name, value := range params {
14,769✔
1063
                nname := strings.ToLower(name)
7,517✔
1064

7,517✔
1065
                _, exists := nparams[nname]
7,517✔
1066
                if exists {
7,519✔
1067
                        return nil, ErrDuplicatedParameters
2✔
1068
                }
2✔
1069

1070
                nparams[nname] = value
7,515✔
1071
        }
1072

1073
        return nparams, nil
7,250✔
1074
}
1075

1076
// CopyCatalogToTx copies the current sql catalog to the ongoing transaction.
1077
func (e *Engine) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error {
44✔
1078
        if tx == nil {
59✔
1079
                return ErrIllegalArguments
15✔
1080
        }
15✔
1081

1082
        catalog := newCatalog(e.prefix)
29✔
1083

29✔
1084
        err := catalog.addSchemaToTx(ctx, tx)
29✔
1085
        if err != nil {
29✔
1086
                return err
×
1087
        }
×
1088

1089
        return nil
29✔
1090
}
1091

1092
// GetStore returns the underlying key-value store backing this engine.
func (e *Engine) GetStore() *store.ImmuStore {
	return e.store
}
175✔
1095

1096
// GetPrefix returns the key prefix under which this engine stores its data.
func (e *Engine) GetPrefix() []byte {
	return e.prefix
}
17✔
1099

1100
func (e *Engine) tableResolveFor(tableName string) TableResolver {
410✔
1101
        if e.tableResolvers == nil {
438✔
1102
                return nil
28✔
1103
        }
28✔
1104
        return e.tableResolvers[tableName]
382✔
1105
}
1106

1107
func (e *Engine) registerTableResolver(tableName string, r TableResolver) {
176✔
1108
        if e.tableResolvers == nil {
193✔
1109
                e.tableResolvers = make(map[string]TableResolver)
17✔
1110
        }
17✔
1111
        e.tableResolvers[tableName] = r
176✔
1112
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc