• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841571249

23 Apr 2026 02:44PM UTC coverage: 85.275% (-4.0%) from 89.306%
24841571249

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

119 existing lines in 18 files now uncovered.

44597 of 52298 relevant lines covered (85.27%)

127591.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.06
/embedded/sql/sql_tx.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23
        "os"
24
        "time"
25

26
        "github.com/codenotary/immudb/embedded/multierr"
27
        "github.com/codenotary/immudb/embedded/store"
28
)
29

30
// savepointState captures SQLTx state at a SAVEPOINT for later rollback.
31
type savepointState struct {
32
        updatedRows      int
33
        lastInsertedPKs  map[string]int64
34
        firstInsertedPKs map[string]int64
35
        mutatedCatalog   bool
36
}
37

38
// SQLTx (no-thread safe) represents an interactive or incremental transaction with support of RYOW
39
type SQLTx struct {
40
        engine *Engine
41

42
        opts *TxOptions
43

44
        tx        *store.OngoingTx
45
        tempFiles []*os.File
46

47
        catalog *Catalog // in-mem catalog
48

49
        // openCatalogVersion is engine.cachedCatalogVersion at the time this tx
50
        // was opened. Compared on commit (D2 Phase 2) to determine whether this
51
        // tx's catalog can safely populate the engine cache: if the version has
52
        // changed, an invalidation happened concurrently and our view is stale.
53
        openCatalogVersion uint64
54

55
        mutatedCatalog bool // set when a DDL stmt was executed within the current tx
56

57
        updatedRows      int
58
        lastInsertedPKs  map[string]int64 // last inserted PK by table name
59
        firstInsertedPKs map[string]int64 // first inserted PK by table name
60

61
        txHeader *store.TxHeader // header is set once tx is committed
62

63
        onCommittedCallbacks []onCommittedCallback
64

65
        savepoints map[string]*savepointState
66
}
67

68
type onCommittedCallback = func(sqlTx *SQLTx) error
69

70
func (sqlTx *SQLTx) Catalog() *Catalog {
396✔
71
        return sqlTx.catalog
396✔
72
}
396✔
73

74
func (sqlTx *SQLTx) IsExplicitCloseRequired() bool {
12,068✔
75
        return sqlTx.opts.ExplicitClose
12,068✔
76
}
12,068✔
77

78
func (sqlTx *SQLTx) RequireExplicitClose() error {
68✔
79
        if sqlTx.updatedRows != 0 {
69✔
80
                return store.ErrIllegalState
1✔
81
        }
1✔
82

83
        sqlTx.opts.ExplicitClose = true
67✔
84

67✔
85
        return nil
67✔
86
}
87

88
func (sqlTx *SQLTx) Timestamp() time.Time {
166✔
89
        return sqlTx.tx.Timestamp()
166✔
90
}
166✔
91

92
func (sqlTx *SQLTx) UpdatedRows() int {
226✔
93
        return sqlTx.updatedRows
226✔
94
}
226✔
95

96
func (sqlTx *SQLTx) LastInsertedPKs() map[string]int64 {
423✔
97
        return sqlTx.lastInsertedPKs
423✔
98
}
423✔
99

100
func (sqlTx *SQLTx) FirstInsertedPKs() map[string]int64 {
205✔
101
        return sqlTx.firstInsertedPKs
205✔
102
}
205✔
103

104
func (sqlTx *SQLTx) TxHeader() *store.TxHeader {
348✔
105
        return sqlTx.txHeader
348✔
106
}
348✔
107

108
func (sqlTx *SQLTx) sqlPrefix() []byte {
15,650✔
109
        return sqlTx.engine.prefix
15,650✔
110
}
15,650✔
111

112
func (sqlTx *SQLTx) distinctLimit() int {
74✔
113
        return sqlTx.engine.distinctLimit
74✔
114
}
74✔
115

116
func (sqlTx *SQLTx) distinctSpillThreshold() int {
13✔
117
        return sqlTx.engine.distinctSpillThreshold
13✔
118
}
13✔
119

120
func (sqlTx *SQLTx) newKeyReader(rSpec store.KeyReaderSpec) (store.KeyReader, error) {
1,264✔
121
        return sqlTx.tx.NewKeyReader(rSpec)
1,264✔
122
}
1,264✔
123

124
func (sqlTx *SQLTx) get(ctx context.Context, key []byte) (store.ValueRef, error) {
5,539✔
125
        return sqlTx.tx.Get(ctx, key)
5,539✔
126
}
5,539✔
127

128
func (sqlTx *SQLTx) set(key []byte, metadata *store.KVMetadata, value []byte) error {
7,921✔
129
        return sqlTx.tx.Set(key, metadata, value)
7,921✔
130
}
7,921✔
131

132
func (sqlTx *SQLTx) setTransient(key []byte, metadata *store.KVMetadata, value []byte) error {
2,068✔
133
        return sqlTx.tx.SetTransient(key, metadata, value)
2,068✔
134
}
2,068✔
135

136
func (sqlTx *SQLTx) getWithPrefix(ctx context.Context, prefix, neq []byte) (key []byte, valRef store.ValueRef, err error) {
107✔
137
        return sqlTx.tx.GetWithPrefix(ctx, prefix, neq)
107✔
138
}
107✔
139

NEW
140
func (sqlTx *SQLTx) Savepoint(name string) {
×
NEW
141
        if sqlTx.savepoints == nil {
×
NEW
142
                sqlTx.savepoints = make(map[string]*savepointState)
×
NEW
143
        }
×
144

145
        // Copy current state
NEW
146
        lastPKs := make(map[string]int64)
×
NEW
147
        for k, v := range sqlTx.lastInsertedPKs {
×
NEW
148
                lastPKs[k] = v
×
NEW
149
        }
×
NEW
150
        firstPKs := make(map[string]int64)
×
NEW
151
        for k, v := range sqlTx.firstInsertedPKs {
×
NEW
152
                firstPKs[k] = v
×
NEW
153
        }
×
154

NEW
155
        sqlTx.savepoints[name] = &savepointState{
×
NEW
156
                updatedRows:      sqlTx.updatedRows,
×
NEW
157
                lastInsertedPKs:  lastPKs,
×
NEW
158
                firstInsertedPKs: firstPKs,
×
NEW
159
                mutatedCatalog:   sqlTx.mutatedCatalog,
×
NEW
160
        }
×
161
}
162

NEW
163
func (sqlTx *SQLTx) RollbackToSavepoint(name string) error {
×
NEW
164
        if sqlTx.savepoints == nil {
×
NEW
165
                return fmt.Errorf("savepoint %s does not exist", name)
×
NEW
166
        }
×
167

NEW
168
        sp, ok := sqlTx.savepoints[name]
×
NEW
169
        if !ok {
×
NEW
170
                return fmt.Errorf("savepoint %s does not exist", name)
×
NEW
171
        }
×
172

173
        // Restore state
NEW
174
        sqlTx.updatedRows = sp.updatedRows
×
NEW
175
        sqlTx.lastInsertedPKs = sp.lastInsertedPKs
×
NEW
176
        sqlTx.firstInsertedPKs = sp.firstInsertedPKs
×
NEW
177
        sqlTx.mutatedCatalog = sp.mutatedCatalog
×
NEW
178

×
NEW
179
        // Remove this savepoint and any created after it
×
NEW
180
        // (PostgreSQL behavior: ROLLBACK TO destroys savepoints created after the named one)
×
NEW
181
        delete(sqlTx.savepoints, name)
×
NEW
182

×
NEW
183
        return nil
×
184
}
185

NEW
186
func (sqlTx *SQLTx) ReleaseSavepoint(name string) error {
×
NEW
187
        if sqlTx.savepoints == nil {
×
NEW
188
                return fmt.Errorf("savepoint %s does not exist", name)
×
NEW
189
        }
×
190

NEW
191
        if _, ok := sqlTx.savepoints[name]; !ok {
×
NEW
192
                return fmt.Errorf("savepoint %s does not exist", name)
×
NEW
193
        }
×
194

NEW
195
        delete(sqlTx.savepoints, name)
×
NEW
196
        return nil
×
197
}
198

199
func (sqlTx *SQLTx) Cancel() error {
1,953✔
200
        defer sqlTx.removeTempFiles()
1,953✔
201

1,953✔
202
        return sqlTx.tx.Cancel()
1,953✔
203
}
1,953✔
204

205
func (sqlTx *SQLTx) Commit(ctx context.Context) error {
4,693✔
206
        defer sqlTx.removeTempFiles()
4,693✔
207

4,693✔
208
        err := sqlTx.tx.RequireMVCCOnFollowingTxs(sqlTx.mutatedCatalog)
4,693✔
209
        if err != nil {
4,693✔
210
                return err
×
211
        }
×
212

213
        // no need to wait for indexing to be up to date during commit phase
214
        sqlTx.txHeader, err = sqlTx.tx.AsyncCommit(ctx)
4,693✔
215
        if err != nil && !errors.Is(err, store.ErrNoEntriesProvided) {
4,703✔
216
                return err
10✔
217
        }
10✔
218

219
        // DDL committed: the cached catalog is now stale; clear it so the next
220
        // read-only transaction reloads the schema from the store.
221
        if sqlTx.mutatedCatalog {
5,211✔
222
                sqlTx.engine.invalidateCatalogCache()
528✔
223
        } else {
4,683✔
224
                // D2 Phase 2: opportunistically populate the engine catalog cache
4,155✔
225
                // from this RW tx's view. Subsequent RW txs then take the Clone
4,155✔
226
                // fast path (saves the per-NewTx catalog.load); option (a) from
4,155✔
227
                // the prior comment is now satisfied because NewTx's Clone branch
4,155✔
228
                // calls seedCatalogReadSet, restoring the MVCC read-set entries
4,155✔
229
                // that catalog.load would have produced.
4,155✔
230
                //
4,155✔
231
                // tryPopulateCatalogCache is no-op when the cache is already
4,155✔
232
                // warm or when an invalidation happened during this tx's
4,155✔
233
                // lifetime (cachedCatalogVersion mismatch).
4,155✔
234
                sqlTx.engine.tryPopulateCatalogCache(sqlTx.catalog, sqlTx.openCatalogVersion)
4,155✔
235
        }
4,155✔
236

237
        merr := multierr.NewMultiErr()
4,683✔
238

4,683✔
239
        for _, onCommitCallback := range sqlTx.onCommittedCallbacks {
4,697✔
240
                err := onCommitCallback(sqlTx)
14✔
241
                merr.Append(err)
14✔
242
        }
14✔
243

244
        return merr.Reduce()
4,683✔
245
}
246

247
func (sqlTx *SQLTx) Closed() bool {
26,004✔
248
        return sqlTx.tx.Closed()
26,004✔
249
}
26,004✔
250

251
func (sqlTx *SQLTx) delete(ctx context.Context, key []byte) error {
68✔
252
        return sqlTx.tx.Delete(ctx, key)
68✔
253
}
68✔
254

255
func (sqlTx *SQLTx) addOnCommittedCallback(callback onCommittedCallback) error {
15✔
256
        if callback == nil {
15✔
257
                return ErrIllegalArguments
×
258
        }
×
259

260
        sqlTx.onCommittedCallbacks = append(sqlTx.onCommittedCallbacks, callback)
15✔
261

15✔
262
        return nil
15✔
263
}
264

265
// createTempFile returns an os.CreateTemp("", "immudb") file and registers
266
// it with the SQLTx for observability + defensive cleanup. Ownership of
267
// the close/remove sits with the caller (e.g. fileSorter.Close), which
268
// must call deregisterTempFile to drop the registration before the
269
// surrounding tx Cancel/Commit closes it. This split avoids a race where
270
// the tx's deferred removeTempFiles would force-close a file the caller's
271
// row reader is still consuming — see embedded/sql/file_sort.go and the
272
// JOIN+GROUP+ORDER regression in joint_row_reader.go:198.
273
func (sqlTx *SQLTx) createTempFile() (*os.File, error) {
56✔
274
        tempFile, err := os.CreateTemp("", "immudb")
56✔
275
        if err == nil {
112✔
276
                sqlTx.tempFiles = append(sqlTx.tempFiles, tempFile)
56✔
277
        }
56✔
278
        return tempFile, err
56✔
279
}
280

281
// deregisterTempFile removes f from sqlTx.tempFiles so the deferred
282
// removeTempFiles in Cancel/Commit will not touch it. The caller is then
283
// responsible for closing and removing the file. Safe to call with a file
284
// that was never registered.
285
func (sqlTx *SQLTx) deregisterTempFile(f *os.File) {
54✔
286
        for i, tf := range sqlTx.tempFiles {
108✔
287
                if tf == f {
107✔
288
                        sqlTx.tempFiles = append(sqlTx.tempFiles[:i], sqlTx.tempFiles[i+1:]...)
53✔
289
                        return
53✔
290
                }
53✔
291
        }
292
}
293

294
// removeTempFiles closes and removes any temp files still registered with
295
// the tx. Run from Cancel/Commit as a defensive safety net for files the
296
// caller never explicitly closed.
297
func (sqlTx *SQLTx) removeTempFiles() error {
6,646✔
298
        for _, file := range sqlTx.tempFiles {
6,649✔
299
                err := file.Close()
3✔
300
                if err != nil {
3✔
301
                        return err
×
302
                }
×
303

304
                err = os.Remove(file.Name())
3✔
305
                if err != nil {
3✔
306
                        return err
×
307
                }
×
308
        }
309
        sqlTx.tempFiles = nil
6,646✔
310
        return nil
6,646✔
311
}
312

313
func (sqlTx *SQLTx) ListUsers(ctx context.Context) ([]User, error) {
13✔
314
        if sqlTx.engine.multidbHandler == nil {
16✔
315
                return nil, ErrUnspecifiedMultiDBHandler
3✔
316
        }
3✔
317
        return sqlTx.engine.multidbHandler.ListUsers(ctx)
10✔
318
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc