
codenotary / immudb / 24841571249

23 Apr 2026 02:44PM UTC coverage: 85.275% (-4.0%) from 89.306%

push · gh-ci · web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.
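The async buffered writer mentioned above can be sketched with a buffered channel feeding a background goroutine, so the gRPC hot path only pays for a channel send. This is an illustrative stand-in, not immudb's implementation: the `auditEvent` fields and the in-memory `stored` slice (standing in for the tamper-proof KV store) are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// auditEvent is a hypothetical shape for one structured audit record;
// the real immudb event schema may differ.
type auditEvent struct {
	Timestamp time.Time `json:"timestamp"`
	Method    string    `json:"method"`
	User      string    `json:"user"`
}

// auditWriter buffers events on a channel; a background goroutine does
// the JSON encoding and storage, keeping latency off the caller's path.
type auditWriter struct {
	events chan auditEvent
	done   chan struct{}
	stored [][]byte // stand-in for Set() under an "audit:"-prefixed key
}

func newAuditWriter(buf int) *auditWriter {
	w := &auditWriter{events: make(chan auditEvent, buf), done: make(chan struct{})}
	go func() {
		defer close(w.done)
		for ev := range w.events {
			b, err := json.Marshal(ev)
			if err != nil {
				continue // drop malformed events rather than block callers
			}
			w.stored = append(w.stored, b)
		}
	}()
	return w
}

func (w *auditWriter) Record(ev auditEvent) { w.events <- ev }

// Close stops accepting events and waits for the drain to finish.
func (w *auditWriter) Close() {
	close(w.events)
	<-w.done
}

func main() {
	w := newAuditWriter(64)
	w.Record(auditEvent{Timestamp: time.Unix(0, 0).UTC(), Method: "Set", User: "immudb"})
	w.Close()
	fmt.Println(len(w.stored)) // prints 1
}
```

Closing the channel and waiting on `done` gives a happens-before edge, so reading `stored` after `Close` is race-free.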

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.
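The schema-qualifier stripping can be pictured as a small identifier rewrite. This is a hedged sketch, not immudb's actual code: `stripSchemaQualifier` is a hypothetical helper, and it assumes the engine resolves bare identifiers for the `public` and `information_schema` cases the text mentions.

```go
package main

import (
	"fmt"
	"strings"
)

// stripSchemaQualifier is an illustrative helper (not immudb's actual
// implementation): ORMs often emit "public."-qualified names, while a
// single-schema engine resolves the bare identifier.
func stripSchemaQualifier(ident string) string {
	for _, schema := range []string{"public.", "information_schema."} {
		if strings.HasPrefix(strings.ToLower(ident), schema) {
			return ident[len(schema):]
		}
	}
	return ident
}

func main() {
	fmt.Println(stripSchemaQualifier("public.users")) // prints users
	fmt.Println(stripSchemaQualifier("orders"))       // prints orders
}
```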

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)
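The evaluation strategy described above can be sketched as follows. This is a simplified illustration of the semantics, not immudb's engine code: the helpers are hypothetical, the inner result set is modeled as a plain slice, and SQL's three-valued NULL logic for IN is deliberately ignored.

```go
package main

import "fmt"

// evalExists: a non-correlated EXISTS is true iff the inner SELECT
// returned at least one row; NOT EXISTS inverts that.
func evalExists(innerRows []int64, negated bool) bool {
	if negated {
		return len(innerRows) == 0
	}
	return len(innerRows) > 0
}

// evalInSubquery resolves the inner SELECT once into a value set, then
// tests the outer value against it (a set instead of the linear scan
// described above, but with the same result).
func evalInSubquery(outer int64, innerRows []int64, negated bool) bool {
	set := make(map[int64]struct{}, len(innerRows))
	for _, v := range innerRows {
		set[v] = struct{}{}
	}
	_, found := set[outer]
	if negated {
		return !found
	}
	return found
}

func main() {
	inner := []int64{1, 3, 5} // rows returned by the inner SELECT
	fmt.Println(evalExists(inner, false))        // prints true
	fmt.Println(evalInSubquery(3, inner, false)) // 3 IN (...): prints true
	fmt.Println(evalInSubquery(2, inner, true))  // 2 NOT IN (...): prints true
}
```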

7254 of 10471 new or added lines in 124 files covered. (69.28%)

119 existing lines in 18 files now uncovered.

44597 of 52298 relevant lines covered (85.27%)

127591.66 hits per line

Source File: /embedded/sql/distinct_row_reader.go (78.26% covered)
/*
Copyright 2026 Codenotary Inc. All rights reserved.

SPDX-License-Identifier: BUSL-1.1
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://mariadb.com/bsl11/

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sql

import (
        "bufio"
        "bytes"
        "context"
        "crypto/sha256"
        "io"
        "os"
        "sort"
)

// distinctRowReader emits each distinct (by sha256 over rendered cols)
// row from rowReader exactly once.
//
// Two modes:
//
//   - Memory-only (default): the in-memory dedup map can grow up to
//     SQLTx.distinctLimit() entries, after which the next unique row
//     returns ErrTooManyRows. Same behaviour as before D5.
//
//   - Spill-enabled (engine option WithDistinctSpillThreshold > 0): when
//     the in-memory map reaches the threshold, its digests are sorted
//     and merged with any existing spill file into a fresh sorted spill
//     on disk; the in-memory map is then reset. Subsequent dedup checks
//     consult both the in-memory map and the spill via binary search,
//     keeping resident memory bounded regardless of total result size.
type distinctRowReader struct {
        rowReader RowReader
        cols      []ColDescriptor

        // in-memory dedup set; resets to empty after each spill flush.
        readRows map[[sha256.Size]byte]struct{}

        // spillThreshold is the engine's WithDistinctSpillThreshold (0 disables).
        spillThreshold int

        // Sorted spill file: sequence of sha256.Size-byte digests in ascending
        // byte order. spillCount is the number of entries currently in it.
        spillFile  *os.File
        spillCount int
}

func newDistinctRowReader(ctx context.Context, rowReader RowReader) (*distinctRowReader, error) {
        cols, err := rowReader.Columns(ctx)
        if err != nil {
                return nil, err
        }

        // rowReader.Tx() may be nil under test fixtures (dummyRowReader);
        // gracefully fall back to spill-disabled in that case so existing
        // unit tests don't see a nil-deref regression.
        threshold := 0
        if tx := rowReader.Tx(); tx != nil {
                threshold = tx.distinctSpillThreshold()
        }

        return &distinctRowReader{
                rowReader:      rowReader,
                cols:           cols,
                readRows:       make(map[[sha256.Size]byte]struct{}),
                spillThreshold: threshold,
        }, nil
}

func (dr *distinctRowReader) onClose(callback func()) {
        dr.rowReader.onClose(callback)
}

func (dr *distinctRowReader) Tx() *SQLTx {
        return dr.rowReader.Tx()
}

func (dr *distinctRowReader) TableAlias() string {
        return dr.rowReader.TableAlias()
}

func (dr *distinctRowReader) Parameters() map[string]interface{} {
        return dr.rowReader.Parameters()
}

func (dr *distinctRowReader) OrderBy() []ColDescriptor {
        return dr.rowReader.OrderBy()
}

func (dr *distinctRowReader) ScanSpecs() *ScanSpecs {
        return dr.rowReader.ScanSpecs()
}

func (dr *distinctRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
        return dr.rowReader.Columns(ctx)
}

func (dr *distinctRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
        return dr.rowReader.colsBySelector(ctx)
}

func (dr *distinctRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
        return dr.rowReader.InferParameters(ctx, params)
}

func (dr *distinctRowReader) Read(ctx context.Context) (*Row, error) {
        for {
                // Memory cap only applies when spill is disabled — with spill, the
                // in-memory map resets on flush so the cap loses meaning.
                if dr.spillThreshold == 0 && len(dr.readRows) == dr.rowReader.Tx().distinctLimit() {
                        return nil, ErrTooManyRows
                }

                row, err := dr.rowReader.Read(ctx)
                if err != nil {
                        return nil, err
                }

                digest, err := row.digest(dr.cols)
                if err != nil {
                        return nil, err
                }

                if _, ok := dr.readRows[digest]; ok {
                        continue
                }

                if dr.spillFile != nil {
                        present, err := dr.spillContains(digest)
                        if err != nil {
                                return nil, err
                        }
                        if present {
                                continue
                        }
                }

                dr.readRows[digest] = struct{}{}

                if dr.spillThreshold > 0 && len(dr.readRows) >= dr.spillThreshold {
                        if err := dr.flushToSpill(); err != nil {
                                return nil, err
                        }
                }

                return row, nil
        }
}

// flushToSpill sorts the in-memory digest set and merges it with the
// existing spill file (if any) into a fresh sorted spill file. Resets
// the in-memory map. Old spill file is deregistered + removed.
//
// Memory cost: O(threshold) for the sort buffer.
// I/O cost per flush: O(spillCount + threshold) sequential read + write.
// Amortized cost per emitted row over many flushes: O(N / threshold)
// where N is the eventual total spill size — append-only with periodic
// merge yields the best lookup latency at the price of write amplification.
func (dr *distinctRowReader) flushToSpill() error {
        sortedNew := make([][sha256.Size]byte, 0, len(dr.readRows))
        for d := range dr.readRows {
                sortedNew = append(sortedNew, d)
        }
        sort.Slice(sortedNew, func(i, j int) bool {
                return bytes.Compare(sortedNew[i][:], sortedNew[j][:]) < 0
        })

        tx := dr.rowReader.Tx()
        newFile, err := tx.createTempFile()
        if err != nil {
                return err
        }

        w := bufio.NewWriter(newFile)
        mergedCount, err := mergeSortedDigests(w, dr.spillFile, dr.spillCount, sortedNew)
        if err != nil {
                newFile.Close()
                return err
        }
        if err := w.Flush(); err != nil {
                newFile.Close()
                return err
        }

        if dr.spillFile != nil {
                // Deregister + close + remove the old spill so the surrounding
                // SQLTx doesn't keep a stale handle.
                oldFile := dr.spillFile
                tx.deregisterTempFile(oldFile)
                _ = oldFile.Close()
                _ = os.Remove(oldFile.Name())
        }

        dr.spillFile = newFile
        dr.spillCount = mergedCount
        dr.readRows = make(map[[sha256.Size]byte]struct{})
        return nil
}

// mergeSortedDigests writes the merged union of an existing on-disk
// sorted spill (existingFile, existingCount entries each sha256.Size
// bytes) and an in-memory sorted slice (newDigests) to w. Returns the
// total number of entries written. Duplicates between the two streams
// are emitted exactly once.
func mergeSortedDigests(w *bufio.Writer, existingFile *os.File, existingCount int, newDigests [][sha256.Size]byte) (int, error) {
        written := 0

        if existingFile == nil {
                for _, d := range newDigests {
                        if _, err := w.Write(d[:]); err != nil {
                                return written, err
                        }
                        written++
                }
                return written, nil
        }

        if _, err := existingFile.Seek(0, io.SeekStart); err != nil {
                return 0, err
        }
        r := bufio.NewReader(existingFile)

        var existing [sha256.Size]byte
        haveExisting := false

        advanceExisting := func() error {
                if existingCount == 0 {
                        haveExisting = false
                        return nil
                }
                if _, err := io.ReadFull(r, existing[:]); err != nil {
                        return err
                }
                existingCount--
                haveExisting = true
                return nil
        }

        if err := advanceExisting(); err != nil {
                return 0, err
        }

        i := 0
        for haveExisting && i < len(newDigests) {
                c := bytes.Compare(existing[:], newDigests[i][:])
                switch {
                case c < 0:
                        if _, err := w.Write(existing[:]); err != nil {
                                return written, err
                        }
                        written++
                        if err := advanceExisting(); err != nil {
                                return written, err
                        }
                case c > 0:
                        if _, err := w.Write(newDigests[i][:]); err != nil {
                                return written, err
                        }
                        written++
                        i++
                default: // equal
                        if _, err := w.Write(existing[:]); err != nil {
                                return written, err
                        }
                        written++
                        if err := advanceExisting(); err != nil {
                                return written, err
                        }
                        i++
                }
        }

        for haveExisting {
                if _, err := w.Write(existing[:]); err != nil {
                        return written, err
                }
                written++
                if err := advanceExisting(); err != nil {
                        return written, err
                }
        }
        for ; i < len(newDigests); i++ {
                if _, err := w.Write(newDigests[i][:]); err != nil {
                        return written, err
                }
                written++
        }
        return written, nil
}

// spillContains binary-searches the on-disk sorted spill file for digest.
// Each entry is sha256.Size bytes; offset = idx * sha256.Size.
func (dr *distinctRowReader) spillContains(digest [sha256.Size]byte) (bool, error) {
        lo, hi := 0, dr.spillCount
        var buf [sha256.Size]byte

        for lo < hi {
                mid := int(uint(lo+hi) >> 1)
                if _, err := dr.spillFile.ReadAt(buf[:], int64(mid)*sha256.Size); err != nil {
                        return false, err
                }
                c := bytes.Compare(buf[:], digest[:])
                switch {
                case c == 0:
                        return true, nil
                case c < 0:
                        lo = mid + 1
                default:
                        hi = mid
                }
        }
        return false, nil
}

func (dr *distinctRowReader) Close() error {
        // The spill file (if present) is registered with the SQLTx via
        // createTempFile so the surrounding tx Cancel/Commit will clean it up
        // alongside our own deregister-on-flush pattern. No extra work needed
        // here — leaving the close to the tx avoids a double-close on the
        // remove-then-close ordering between flushToSpill and Close.
        return dr.rowReader.Close()
}

© 2026 Coveralls, Inc