• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841571249

23 Apr 2026 02:44PM UTC coverage: 85.275% (-4.0%) from 89.306%
24841571249

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

119 existing lines in 18 files now uncovered.

44597 of 52298 relevant lines covered (85.27%)

127591.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.54
/pkg/client/sql.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package client
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/sha256"
23
        "encoding/binary"
24
        "io"
25

26
        "github.com/codenotary/immudb/pkg/client/errors"
27

28
        "github.com/codenotary/immudb/embedded/sql"
29
        "github.com/codenotary/immudb/embedded/store"
30
        "github.com/codenotary/immudb/pkg/api/schema"
31
        "google.golang.org/protobuf/types/known/emptypb"
32
)
33

34
const SQLPrefix byte = 2
35

36
// SQLExec performs a modifying SQL query within the transaction.
37
// Such query does not return SQL result.
38
func (c *immuClient) SQLExec(ctx context.Context, sql string, params map[string]interface{}) (*schema.SQLExecResult, error) {
114✔
39
        if !c.IsConnected() {
119✔
40
                return nil, errors.FromError(ErrNotConnected)
5✔
41
        }
5✔
42

43
        namedParams, err := schema.EncodeParams(params)
109✔
44
        if err != nil {
110✔
45
                return nil, err
1✔
46
        }
1✔
47

48
        return c.ServiceClient.SQLExec(ctx, &schema.SQLExecRequest{Sql: sql, Params: namedParams})
108✔
49
}
50

51
// SQLQuery performs a query (read-only) operation.
52
//
53
// Deprecated: Use method SQLQueryReader instead.
54
//
55
// The renewSnapshot parameter is deprecated and is ignored by the server.
56
func (c *immuClient) SQLQuery(ctx context.Context, sql string, params map[string]interface{}, renewSnapshot bool) (*schema.SQLQueryResult, error) {
37✔
57
        if !c.IsConnected() {
38✔
58
                return nil, errors.FromError(ErrNotConnected)
1✔
59
        }
1✔
60

61
        stream, err := c.sqlQuery(ctx, sql, params, false)
36✔
62
        if err != nil {
38✔
63
                return nil, err
2✔
64
        }
2✔
65

66
        res, err := stream.Recv()
34✔
67
        if err != nil {
34✔
68
                return nil, errors.FromError(err)
×
69
        }
×
70

71
        if _, err := stream.Recv(); err != io.EOF {
35✔
72
                return res, errors.FromError(err)
1✔
73
        }
1✔
74
        return res, nil
33✔
75
}
76

77
// SQLQueryReader submits an SQL query to the server and returns a reader object for efficient retrieval of all rows in the result set.
78
func (c *immuClient) SQLQueryReader(ctx context.Context, sql string, params map[string]interface{}) (SQLQueryRowReader, error) {
15✔
79
        if !c.IsConnected() {
15✔
80
                return nil, errors.FromError(ErrNotConnected)
×
81
        }
×
82

83
        cancelCtx, cancel := context.WithCancel(ctx)
15✔
84
        stream, err := c.sqlQuery(cancelCtx, sql, params, true)
15✔
85
        if err != nil {
15✔
NEW
86
                cancel()
×
87
                return nil, err
×
88
        }
×
89
        return newSQLQueryRowReader(stream, cancel)
15✔
90
}
91

92
func (c *immuClient) sqlQuery(ctx context.Context, sql string, params map[string]interface{}, acceptStream bool) (schema.ImmuService_SQLQueryClient, error) {
51✔
93
        if !c.IsConnected() {
51✔
94
                return nil, errors.FromError(ErrNotConnected)
×
95
        }
×
96

97
        namedParams, err := schema.EncodeParams(params)
51✔
98
        if err != nil {
52✔
99
                return nil, errors.FromError(err)
1✔
100
        }
1✔
101

102
        stream, err := c.ServiceClient.SQLQuery(ctx, &schema.SQLQueryRequest{Sql: sql, Params: namedParams, AcceptStream: acceptStream})
50✔
103
        return stream, errors.FromError(err)
50✔
104
}
105

106
// ListTables returns a list of SQL tables.
107
func (c *immuClient) ListTables(ctx context.Context) (*schema.SQLQueryResult, error) {
2✔
108
        if !c.IsConnected() {
3✔
109
                return nil, errors.FromError(ErrNotConnected)
1✔
110
        }
1✔
111
        return c.ServiceClient.ListTables(ctx, &emptypb.Empty{})
1✔
112
}
113

114
// Describe table returns a description of a table structure.
115
func (c *immuClient) DescribeTable(ctx context.Context, tableName string) (*schema.SQLQueryResult, error) {
2✔
116
        if !c.IsConnected() {
3✔
117
                return nil, errors.FromError(ErrNotConnected)
1✔
118
        }
1✔
119
        return c.ServiceClient.DescribeTable(ctx, &schema.Table{TableName: tableName})
1✔
120
}
121

122
// VerifyRow reads a single row from the database with additional validation of server-provided proof.
123
//
124
// The row parameter should contain row from a single table, either returned from
125
// query or manually assembled. The table parameter contains the name of the table
126
// where the row comes from. The pkVals argument is an array containing values for
127
// the primary key of the row. The row parameter does not have to contain all
128
// columns of the table. Once the row itself is verified, only those columns that
129
// are in the row will be compared against the verified row retrieved from the database.
130
func (c *immuClient) VerifyRow(ctx context.Context, row *schema.Row, table string, pkVals []*schema.SQLValue) error {
49✔
131
        if row == nil || len(table) == 0 || len(pkVals) == 0 {
50✔
132
                return ErrIllegalArguments
1✔
133
        }
1✔
134

135
        if len(row.Columns) == 0 || len(row.Columns) != len(row.Values) {
49✔
136
                return sql.ErrCorruptedData
1✔
137
        }
1✔
138

139
        if !c.IsConnected() {
48✔
140
                return ErrNotConnected
1✔
141
        }
1✔
142

143
        err := c.StateService.CacheLock()
46✔
144
        if err != nil {
46✔
145
                return err
×
146
        }
×
147
        defer c.StateService.CacheUnlock()
46✔
148

46✔
149
        state, err := c.StateService.GetState(ctx, c.currentDatabase())
46✔
150
        if err != nil {
46✔
151
                return err
×
152
        }
×
153

154
        vEntry, err := c.ServiceClient.VerifiableSQLGet(ctx, &schema.VerifiableSQLGetRequest{
46✔
155
                SqlGetRequest: &schema.SQLGetRequest{Table: table, PkValues: pkVals},
46✔
156
                ProveSinceTx:  state.TxId,
46✔
157
        })
46✔
158
        if err != nil {
46✔
159
                return err
×
160
        }
×
161

162
        if len(vEntry.PKIDs) < len(pkVals) {
46✔
163
                return ErrIllegalArguments
×
164
        }
×
165

166
        entrySpecDigest, err := store.EntrySpecDigestFor(int(vEntry.VerifiableTx.Tx.Header.Version))
46✔
167
        if err != nil {
46✔
168
                return err
×
169
        }
×
170

171
        inclusionProof := schema.InclusionProofFromProto(vEntry.InclusionProof)
46✔
172
        dualProof := schema.DualProofFromProto(vEntry.VerifiableTx.DualProof)
46✔
173

46✔
174
        var eh [sha256.Size]byte
46✔
175

46✔
176
        var sourceID, targetID uint64
46✔
177
        var sourceAlh, targetAlh [sha256.Size]byte
46✔
178

46✔
179
        vTx := vEntry.SqlEntry.Tx
46✔
180

46✔
181
        dbID := vEntry.DatabaseId
46✔
182
        tableID := vEntry.TableId
46✔
183

46✔
184
        valbuf := bytes.Buffer{}
46✔
185

46✔
186
        for i, pkVal := range pkVals {
92✔
187
                pkID := vEntry.PKIDs[i]
46✔
188

46✔
189
                pkType, ok := vEntry.ColTypesById[pkID]
46✔
190
                if !ok {
46✔
191
                        return sql.ErrCorruptedData
×
192
                }
×
193

194
                pkLen, ok := vEntry.ColLenById[pkID]
46✔
195
                if !ok {
46✔
196
                        return sql.ErrCorruptedData
×
197
                }
×
198

199
                pkEncVal, _, err := sql.EncodeRawValueAsKey(schema.RawValue(pkVal), pkType, int(pkLen))
46✔
200
                if err != nil {
46✔
201
                        return err
×
202
                }
×
203

204
                _, err = valbuf.Write(pkEncVal)
46✔
205
                if err != nil {
46✔
206
                        return err
×
207
                }
×
208
        }
209

210
        pkKey := sql.MapKey(
46✔
211
                []byte{SQLPrefix},
46✔
212
                sql.RowPrefix,
46✔
213
                sql.EncodeID(dbID),
46✔
214
                sql.EncodeID(tableID),
46✔
215
                sql.EncodeID(sql.PKIndexID),
46✔
216
                valbuf.Bytes())
46✔
217

46✔
218
        decodedRow, err := decodeRow(vEntry.SqlEntry.Value, vEntry.ColTypesById, vEntry.MaxColId)
46✔
219
        if err != nil {
46✔
220
                return err
×
221
        }
×
222

223
        err = verifyRowAgainst(row, decodedRow, vEntry.ColIdsByName)
46✔
224
        if err != nil {
49✔
225
                return err
3✔
226
        }
3✔
227

228
        e := &store.EntrySpec{Key: pkKey, Value: vEntry.SqlEntry.Value}
43✔
229

43✔
230
        if state.TxId <= vTx {
77✔
231
                eh = schema.DigestFromProto(vEntry.VerifiableTx.DualProof.TargetTxHeader.EH)
34✔
232

34✔
233
                sourceID = state.TxId
34✔
234
                sourceAlh = schema.DigestFromProto(state.TxHash)
34✔
235
                targetID = vTx
34✔
236
                targetAlh = dualProof.TargetTxHeader.Alh()
34✔
237
        } else {
43✔
238
                eh = schema.DigestFromProto(vEntry.VerifiableTx.DualProof.SourceTxHeader.EH)
9✔
239

9✔
240
                sourceID = vTx
9✔
241
                sourceAlh = dualProof.SourceTxHeader.Alh()
9✔
242
                targetID = state.TxId
9✔
243
                targetAlh = schema.DigestFromProto(state.TxHash)
9✔
244
        }
9✔
245

246
        verifies := store.VerifyInclusion(
43✔
247
                inclusionProof,
43✔
248
                entrySpecDigest(e),
43✔
249
                eh)
43✔
250
        if !verifies {
43✔
251
                return store.ErrCorruptedData
×
252
        }
×
253

254
        if state.TxId > 0 {
86✔
255
                err := c.verifyDualProof(
43✔
256
                        ctx,
43✔
257
                        dualProof,
43✔
258
                        sourceID,
43✔
259
                        targetID,
43✔
260
                        sourceAlh,
43✔
261
                        targetAlh,
43✔
262
                )
43✔
263
                if err != nil {
43✔
264
                        return err
×
265
                }
×
266
        }
267

268
        newState := &schema.ImmutableState{
43✔
269
                Db:        c.currentDatabase(),
43✔
270
                TxId:      targetID,
43✔
271
                TxHash:    targetAlh[:],
43✔
272
                Signature: vEntry.VerifiableTx.Signature,
43✔
273
        }
43✔
274

43✔
275
        if c.serverSigningPubKey != nil {
62✔
276
                err := newState.CheckSignature(c.serverSigningPubKey)
19✔
277
                if err != nil {
19✔
278
                        return err
×
279
                }
×
280
        }
281

282
        err = c.StateService.SetState(c.currentDatabase(), newState)
43✔
283
        if err != nil {
43✔
284
                return err
×
285
        }
×
286

287
        return nil
43✔
288
}
289

290
func verifyRowAgainst(row *schema.Row, decodedRow map[uint32]*schema.SQLValue, colIdsByName map[string]uint32) error {
53✔
291
        for i, colName := range row.Columns {
169✔
292
                colID, ok := colIdsByName[colName]
116✔
293
                if !ok {
119✔
294
                        return sql.ErrColumnDoesNotExist
3✔
295
                }
3✔
296

297
                val := row.Values[i]
113✔
298

113✔
299
                if val == nil || val.Value == nil {
114✔
300
                        return sql.ErrCorruptedData
1✔
301
                }
1✔
302

303
                decodedVal, ok := decodedRow[colID]
112✔
304
                if !ok {
137✔
305
                        _, isNull := val.Value.(*schema.SQLValue_Null)
25✔
306
                        if isNull {
49✔
307
                                continue
24✔
308
                        }
309
                        return sql.ErrCorruptedData
1✔
310
                }
311

312
                if decodedVal == nil || decodedVal.Value == nil {
88✔
313
                        return sql.ErrCorruptedData
1✔
314
                }
1✔
315

316
                equals, err := val.Value.(schema.SqlValue).Equal(decodedVal.Value.(schema.SqlValue))
86✔
317
                if err != nil {
87✔
318
                        return err
1✔
319
                }
1✔
320
                if !equals {
87✔
321
                        return sql.ErrCorruptedData
2✔
322
                }
2✔
323
        }
324

325
        return nil
44✔
326
}
327

328
func decodeRow(encodedRow []byte, colTypes map[uint32]sql.SQLValueType, maxColID uint32) (map[uint32]*schema.SQLValue, error) {
51✔
329
        off := 0
51✔
330

51✔
331
        if len(encodedRow) < off+sql.EncLenLen {
53✔
332
                return nil, sql.ErrCorruptedData
2✔
333
        }
2✔
334

335
        colsCount := binary.BigEndian.Uint32(encodedRow[off:])
49✔
336
        off += sql.EncLenLen
49✔
337

49✔
338
        values := make(map[uint32]*schema.SQLValue, colsCount)
49✔
339

49✔
340
        for i := 0; i < int(colsCount); i++ {
161✔
341
                if len(encodedRow) < off+sql.EncIDLen {
113✔
342
                        return nil, sql.ErrCorruptedData
1✔
343
                }
1✔
344

345
                colID := binary.BigEndian.Uint32(encodedRow[off:])
111✔
346
                off += sql.EncIDLen
111✔
347

111✔
348
                colType, ok := colTypes[colID]
111✔
349
                if !ok {
114✔
350
                        // Support for dropped columns
3✔
351
                        if colID > maxColID {
4✔
352
                                return nil, sql.ErrCorruptedData
1✔
353
                        }
1✔
354

355
                        vlen, voff, err := sql.DecodeValueLength(encodedRow[off:])
2✔
356
                        if err != nil {
3✔
357
                                return nil, err
1✔
358
                        }
1✔
359

360
                        off += vlen
1✔
361
                        off += voff
1✔
362
                        continue
1✔
363
                }
364

365
                val, n, err := sql.DecodeValue(encodedRow[off:], colType)
108✔
366
                if err != nil {
108✔
367
                        return nil, err
×
368
                }
×
369

370
                values[colID] = schema.TypedValueToRowValue(val)
108✔
371
                off += n
108✔
372
        }
373

374
        return values, nil
46✔
375
}
376

377
type Row []interface{}
378

379
type Column struct {
380
        Type string
381
        Name string
382
}
383

384
type SQLQueryRowReader interface {
385
        // Columns returns the set of columns
386
        Columns() []Column
387

388
        // Next() prepares the subsequent row for retrieval, indicating availability with a returned value of true.
389
        // Any encountered IO errors will be deferred until subsequent calls to Read() or Close(), prompting the function to return false.
390
        Next() bool
391

392
        // Read retrieves the current row as a slice of values.
393
        //
394
        // It's important to note that successive calls to Read() may recycle the same slice, necessitating copying to retain its contents.
395
        Read() (Row, error)
396

397
        // Close closes the reader. Subsequent calls to Next() or Read() will return an error.
398
        Close() error
399
}
400

401
type rowReader struct {
402
        stream schema.ImmuService_SQLQueryClient
403
        cancel context.CancelFunc
404

405
        cols []Column
406
        rows []*schema.Row
407
        row  Row
408

409
        nextRow int
410
        closed  bool
411
        err     error
412
}
413

414
func newSQLQueryRowReader(stream schema.ImmuService_SQLQueryClient, cancel context.CancelFunc) (*rowReader, error) {
17✔
415
        res, err := stream.Recv()
17✔
416
        if err != nil {
21✔
417
                return nil, errors.FromError(err)
4✔
418
        }
4✔
419

420
        return &rowReader{
13✔
421
                stream:  stream,
13✔
422
                cancel:  cancel,
13✔
423
                rows:    res.Rows,
13✔
424
                row:     make(Row, len(res.Columns)),
13✔
425
                nextRow: -1,
13✔
426
                cols:    fromProtoCols(res.Columns),
13✔
427
        }, nil
13✔
428
}
429

430
func fromProtoCols(columns []*schema.Column) []Column {
13✔
431
        cols := make([]Column, len(columns))
13✔
432
        for i, col := range columns {
61✔
433
                cols[i] = Column{Type: col.Type, Name: col.Name}
48✔
434
        }
48✔
435
        return cols
13✔
436
}
437

438
func (it *rowReader) Columns() []Column {
9✔
439
        return it.cols
9✔
440
}
9✔
441

442
func (it *rowReader) Next() bool {
35✔
443
        if it.closed {
36✔
444
                return false
1✔
445
        }
1✔
446

447
        if it.nextRow+1 < len(it.rows) {
56✔
448
                it.nextRow++
22✔
449
                return true
22✔
450
        }
22✔
451

452
        if err := it.fetchRows(); err != nil {
16✔
453
                it.err = err
4✔
454
                return false
4✔
455
        }
4✔
456

457
        it.nextRow = 0
8✔
458
        return true
8✔
459
}
460

461
func (it *rowReader) Read() (Row, error) {
32✔
462
        if it.closed {
33✔
463
                return nil, sql.ErrAlreadyClosed
1✔
464
        }
1✔
465

466
        if it.err != nil {
32✔
467
                return nil, it.err
1✔
468
        }
1✔
469

470
        if it.nextRow < 0 {
31✔
471
                return nil, errors.New("Read called without calling Next")
1✔
472
        }
1✔
473

474
        protoRow := it.rows[it.nextRow]
29✔
475
        for i, protoVal := range protoRow.Values {
112✔
476
                val := schema.RawValue(protoVal)
83✔
477
                it.row[i] = val
83✔
478
        }
83✔
479
        return it.row, nil
29✔
480
}
481

482
func (it *rowReader) fetchRows() error {
12✔
483
        res, err := it.stream.Recv()
12✔
484
        if err == io.EOF {
16✔
485
                it.cancel()
4✔
486
                return sql.ErrNoMoreRows
4✔
487
        }
4✔
488

489
        if err != nil {
8✔
NEW
490
                it.cancel()
×
NEW
491
                return errors.FromError(err)
×
UNCOV
492
        }
×
493

494
        it.rows = res.Rows
8✔
495
        return nil
8✔
496
}
497

498
func (it *rowReader) Close() error {
11✔
499
        if it.closed {
12✔
500
                return sql.ErrAlreadyClosed
1✔
501
        }
1✔
502

503
        // Cancel the derived context so the underlying gRPC stream is signalled
504
        // even when the consumer abandons the reader before reaching EOF.
505
        // fetchRows already calls cancel() on EOF/error, so this is a no-op in
506
        // the normal completion path but essential for early-close callers.
507
        it.cancel()
10✔
508
        it.stream = nil
10✔
509
        it.closed = true
10✔
510
        it.rows = nil
10✔
511
        it.nextRow = 0
10✔
512

10✔
513
        if it.err == sql.ErrNoMoreRows {
12✔
514
                return nil
2✔
515
        }
2✔
516
        return it.err
8✔
517
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc