
codenotary / immudb / 24841571249

23 Apr 2026 02:44PM UTC coverage: 85.275% (-4.0%) from 89.306%

push · gh-ci · web-flow

feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
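
A minimal sketch of reading the trail back with the Go SDK, assuming a server started with --audit-log, default immudb/immudb credentials, and that audit events are readable from systemdb as plain KV entries (only the flag names and the audit: prefix come from this change; everything else is illustrative):

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/codenotary/immudb/pkg/api/schema"
	immudb "github.com/codenotary/immudb/pkg/client"
)

func main() {
	ctx := context.Background()

	c := immudb.NewClient()
	// Assumption: audit events live in systemdb, per the description above.
	if err := c.OpenSession(ctx, []byte("immudb"), []byte("immudb"), "systemdb"); err != nil {
		log.Fatal(err)
	}
	defer c.CloseSession(ctx)

	// List recorded events under the audit: key prefix.
	entries, err := c.Scan(ctx, &schema.ScanRequest{Prefix: []byte("audit:"), Limit: 10})
	if err != nil {
		log.Fatal(err)
	}

	for _, e := range entries.Entries {
		fmt.Printf("%s -> %s\n", e.Key, e.Value)

		// VerifiedGet is the SDK counterpart of the VerifiableGet RPC:
		// it checks the cryptographic inclusion proof on the client side.
		if _, err := c.VerifiedGet(ctx, e.Key); err != nil {
			log.Fatalf("verification failed: %v", err)
		}
	}
}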

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.
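
As a rough illustration (not code from this change), any stock PostgreSQL driver can now run these functions and introspection queries; the sketch below assumes immudb's pgsql server is enabled on the default port with default credentials:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // ordinary PostgreSQL driver; immudb speaks the pgsql wire protocol
)

func main() {
	db, err := sql.Open("postgres",
		"host=localhost port=5432 user=immudb password=immudb dbname=defaultdb sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Two of the compatibility functions listed above.
	var dbName, schemaName string
	if err := db.QueryRow("SELECT current_database(), current_schema()").Scan(&dbName, &schemaName); err != nil {
		log.Fatal(err)
	}
	fmt.Println(dbName, schemaName)

	// The kind of information_schema query an ORM issues during introspection.
	rows, err := db.Query("SELECT table_name FROM information_schema.tables")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			log.Fatal(err)
		}
		fmt.Println(name)
	}
}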

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)
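
A short sketch of the newly supported forms against hypothetical customers(id, active) and orders(id, customer_id, amount) tables, reusing the pgsql connection style from the previous sketch:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres",
		"host=localhost port=5432 user=immudb password=immudb dbname=defaultdb sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Non-correlated IN: the inner SELECT is resolved once, its result set
	// iterated, and each value compared against the outer expression.
	rows, err := db.Query(`
		SELECT id, amount FROM orders
		WHERE customer_id IN (SELECT id FROM customers WHERE active = true)`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var id, amount int64
		if err := rows.Scan(&id, &amount); err != nil {
			log.Fatal(err)
		}
		fmt.Println(id, amount)
	}

	// Non-correlated NOT EXISTS: evaluated once for the whole statement, so
	// it keeps either every row or none, depending on whether the inner
	// SELECT returns any rows.
	var n int64
	err = db.QueryRow(`
		SELECT COUNT(*) FROM orders
		WHERE NOT EXISTS (SELECT id FROM customers WHERE active = false)`).Scan(&n)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(n)
}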

7254 of 10471 new or added lines in 124 files covered. (69.28%)

119 existing lines in 18 files now uncovered.

44597 of 52298 relevant lines covered (85.27%)

127591.66 hits per line

Source File: /embedded/sql/file_sort.go (82.47% covered)
/*
Copyright 2026 Codenotary Inc. All rights reserved.

SPDX-License-Identifier: BUSL-1.1
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://mariadb.com/bsl11/

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sql

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"io"
	"os"
	"sort"
)

type sortedChunk struct {
	offset uint64
	size   uint64
}

type fileSorter struct {
	colPosBySelector map[string]int
	colTypes         []string
	cmp              func(r1, r2 *Row) (int, error)

	tx          *SQLTx
	sortBufSize int
	sortBuf     []*Row
	nextIdx     int

	tempFile     *os.File
	writer       *bufio.Writer
	tempFileSize uint64

	// allTempFiles tracks every os.File that fileSorter has opened so they
	// can be closed and removed when the consuming reader chain calls
	// Close(). Lifecycle is owned by fileSorter (not the surrounding SQLTx)
	// so the files survive a tx Cancel that fires mid-iteration of the
	// returned resultReader — see the JOIN+GROUP+ORDER race documented at
	// embedded/sql/joint_row_reader.go:198.
	allTempFiles []*os.File

	chunksToMerge []sortedChunk
}

func (s *fileSorter) update(r *Row) error {
	if s.nextIdx == s.sortBufSize {
		err := s.sortAndFlushBuffer()
		if err != nil {
			return err
		}
		s.nextIdx = 0
	}

	s.sortBuf[s.nextIdx] = r
	s.nextIdx++

	return nil
}

func (s *fileSorter) finalize() (resultReader, error) {
	if s.nextIdx > 0 {
		if err := s.sortBuffer(); err != nil {
			return nil, err
		}
	}

	// result rows are all in memory
	if len(s.chunksToMerge) == 0 {
		return &bufferResultReader{
			sortBuf: s.sortBuf[:s.nextIdx],
		}, nil
	}

	err := s.flushBuffer()
	if err != nil {
		return nil, err
	}

	err = s.writer.Flush()
	if err != nil {
		return nil, err
	}
	reader, err := s.mergeAllChunks()
	if err != nil {
		// merge failed — make sure we don't leak temp files we opened.
		s.Close()
		return nil, err
	}
	return reader, nil
}

// Close closes and removes every temp file opened by this sorter, after
// first deregistering them from the SQLTx so the tx's deferred
// removeTempFiles won't double-close. Safe to call multiple times.
func (s *fileSorter) Close() error {
	var firstErr error
	for _, f := range s.allTempFiles {
		if s.tx != nil {
			s.tx.deregisterTempFile(f)
		}
		if err := f.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
		if err := os.Remove(f.Name()); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	s.allTempFiles = nil
	return firstErr
}

func (s *fileSorter) mergeAllChunks() (resultReader, error) {
	currFile := s.tempFile

	outFile, err := s.tx.createTempFile()
	if err != nil {
		return nil, err
	}
	s.allTempFiles = append(s.allTempFiles, outFile)

	lbuf := &bufio.Reader{}
	rbuf := &bufio.Reader{}

	lr := &fileRowReader{
		colPosBySelector: s.colPosBySelector,
		colTypes:         s.colTypes,
		reader:           lbuf,
		reuseRow:         true,
	}
	rr := &fileRowReader{
		colPosBySelector: s.colPosBySelector,
		colTypes:         s.colTypes,
		reader:           rbuf,
		reuseRow:         true,
	}

	chunks := s.chunksToMerge
	for len(chunks) > 1 {
		s.writer.Reset(outFile)

		var offset uint64

		newChunks := make([]sortedChunk, (len(chunks)+1)/2)
		for i := 0; i < len(chunks)/2; i++ {
			c1 := chunks[i*2]
			c2 := chunks[i*2+1]

			lbuf.Reset(io.NewSectionReader(currFile, int64(c1.offset), int64(c1.size)))
			rbuf.Reset(io.NewSectionReader(currFile, int64(c2.offset), int64(c2.size)))

			err := s.mergeChunks(lr, rr, s.writer)
			if err != nil {
				return nil, err
			}

			newChunks[i] = sortedChunk{
				offset: offset,
				size:   c1.size + c2.size,
			}
			offset += c1.size + c2.size
		}

		err := s.writer.Flush()
		if err != nil {
			return nil, err
		}

		if len(chunks)%2 != 0 { // copy last sorted chunk
			lastChunk := chunks[len(chunks)-1]

			_, err := io.Copy(outFile, io.NewSectionReader(currFile, int64(lastChunk.offset), int64(lastChunk.size)))
			if err != nil {
				return nil, err
			}
			newChunks[len(chunks)/2] = lastChunk
		}

		temp := currFile
		currFile = outFile
		outFile = temp

		_, err = outFile.Seek(0, io.SeekStart)
		if err != nil {
			return nil, err
		}

		chunks = newChunks
	}

	return &fileRowReader{
		colTypes:         s.colTypes,
		colPosBySelector: s.colPosBySelector,
		reader:           bufio.NewReader(io.NewSectionReader(currFile, 0, int64(s.tempFileSize))),
		closer:           s.Close,
	}, nil
}

func (s *fileSorter) mergeChunks(lr, rr *fileRowReader, writer io.Writer) error {
	var err error
	var lrAtEOF bool
	var r1, r2 *Row

	for {
		if r1 == nil {
			r1, err = lr.Read()
			if err == ErrNoMoreRows {
				lrAtEOF = true
				break
			}

			if err != nil {
				return err
			}
		}

		if r2 == nil {
			r2, err = rr.Read()
			if err == ErrNoMoreRows {
				break
			}

			if err != nil {
				return err
			}
		}

		var rawData []byte
		res, err := s.cmp(r1, r2)
		if err != nil {
			return err
		}

		if res < 0 {
			rawData = lr.rowBuf.Bytes()
			r1 = nil
		} else {
			rawData = rr.rowBuf.Bytes()
			r2 = nil
		}

		_, err = writer.Write(rawData)
		if err != nil {
			return err
		}
	}

	readerToCopy := lr
	if lrAtEOF {
		readerToCopy = rr
	}

	_, err = writer.Write(readerToCopy.rowBuf.Bytes())
	if err != nil {
		return err
	}

	_, err = io.Copy(writer, readerToCopy.reader)
	return err
}

type resultReader interface {
	Read() (*Row, error)
	// Close releases any temp files or other resources owned by the reader.
	// Safe to call multiple times. Implementations that have nothing to
	// release return nil.
	Close() error
}

type bufferResultReader struct {
	sortBuf []*Row
	nextIdx int
}

func (r *bufferResultReader) Read() (*Row, error) {
	if r.nextIdx == len(r.sortBuf) {
		return nil, ErrNoMoreRows
	}

	row := r.sortBuf[r.nextIdx]
	r.nextIdx++
	return row, nil
}

func (r *bufferResultReader) Close() error {
	return nil
}

type fileRowReader struct {
	colPosBySelector map[string]int
	colTypes         []SQLValueType
	reader           io.Reader
	rowBuf           bytes.Buffer
	row              *Row
	reuseRow         bool
	closer           func() error
}

func (r *fileRowReader) Close() error {
	if r.closer == nil {
		return nil
	}
	c := r.closer
	r.closer = nil
	return c()
}

func (r *fileRowReader) readValues(out []TypedValue) error {
	var size uint16
	err := binary.Read(r.reader, binary.BigEndian, &size)
	if err != nil {
		return err
	}

	r.rowBuf.Reset()

	binary.Write(&r.rowBuf, binary.BigEndian, &size)

	_, err = io.CopyN(&r.rowBuf, r.reader, int64(size))
	if err != nil {
		return err
	}

	data := r.rowBuf.Bytes()
	return decodeValues(data[2:], r.colTypes, out)
}

func (r *fileRowReader) Read() (*Row, error) {
	row := r.getRow()

	err := r.readValues(row.ValuesByPosition)
	if err == io.EOF {
		return nil, ErrNoMoreRows
	}
	if err != nil {
		return nil, err
	}

	for sel, pos := range r.colPosBySelector {
		row.ValuesBySelector[sel] = row.ValuesByPosition[pos]
	}
	return row, nil
}

func (r *fileRowReader) getRow() *Row {
	row := r.row
	if row == nil || !r.reuseRow {
		row = &Row{
			ValuesByPosition: make([]TypedValue, len(r.colPosBySelector)),
			ValuesBySelector: make(map[string]TypedValue, len(r.colPosBySelector)),
		}
		r.row = row
	}
	return row
}

func decodeValues(data []byte, colTypes []SQLValueType, out []TypedValue) error {
	var voff int
	for i, col := range colTypes {
		v, n, err := DecodeNullableValue(data[voff:], col)
		if err != nil {
			return err
		}
		voff += n

		out[i] = v
	}
	return nil
}

func (s *fileSorter) sortAndFlushBuffer() error {
	if err := s.sortBuffer(); err != nil {
		return err
	}
	return s.flushBuffer()
}

func (s *fileSorter) sortBuffer() error {
	buf := s.sortBuf[:s.nextIdx]

	var outErr error
	sort.Slice(buf, func(i, j int) bool {
		r1 := buf[i]
		r2 := buf[j]

		res, err := s.cmp(r1, r2)
		if err != nil {
			outErr = err
		}
		return res < 0
	})
	return outErr
}

func (s *fileSorter) flushBuffer() error {
	writer, err := s.tempFileWriter()
	if err != nil {
		return err
	}

	var chunkSize uint64
	for _, row := range s.sortBuf[:s.nextIdx] {
		data, err := encodeRow(row)
		if err != nil {
			return err
		}

		_, err = writer.Write(data)
		if err != nil {
			return err
		}

		chunkSize += uint64(len(data))
	}

	s.chunksToMerge = append(s.chunksToMerge, sortedChunk{
		offset: s.tempFileSize,
		size:   chunkSize,
	})
	s.tempFileSize += chunkSize
	return nil
}

func (s *fileSorter) tempFileWriter() (*bufio.Writer, error) {
	if s.writer != nil {
		return s.writer, nil
	}
	file, err := s.tx.createTempFile()
	if err != nil {
		return nil, err
	}
	s.tempFile = file
	s.allTempFiles = append(s.allTempFiles, file)
	s.writer = bufio.NewWriter(file)
	return s.writer, nil
}

func encodeRow(r *Row) ([]byte, error) {
	var buf bytes.Buffer
	buf.Write([]byte{0, 0}) // make room for size field

	for _, v := range r.ValuesByPosition {
		rawValue, err := EncodeNullableValue(v, v.Type(), -1)
		if err != nil {
			return nil, err
		}
		buf.Write(rawValue)
	}

	data := buf.Bytes()
	size := uint16(len(data) - 2)
	binary.BigEndian.PutUint16(data, size)

	return data, nil
}
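
For readers tracing the temp-file format: encodeRow and readValues above agree on a simple length-prefixed framing, a 2-byte big-endian payload size followed by the encoded column values. A self-contained toy version of that framing (illustrative only, independent of immudb's types):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"log"
)

// writeFrame mimics encodeRow: reserve two bytes for the size field,
// append the payload, then back-patch the size.
func writeFrame(w io.Writer, payload []byte) error {
	buf := make([]byte, 2+len(payload))
	binary.BigEndian.PutUint16(buf, uint16(len(payload)))
	copy(buf[2:], payload)
	_, err := w.Write(buf)
	return err
}

// readFrame mimics readValues: read the size header, then exactly that many
// payload bytes. io.EOF on the header is how "no more rows" is detected.
func readFrame(r io.Reader) ([]byte, error) {
	var size uint16
	if err := binary.Read(r, binary.BigEndian, &size); err != nil {
		return nil, err // io.EOF here maps to ErrNoMoreRows in the sorter
	}
	payload := make([]byte, size)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

func main() {
	var file bytes.Buffer // stands in for the sorter's temp file

	for _, row := range []string{"row-1", "row-2"} {
		if err := writeFrame(&file, []byte(row)); err != nil {
			log.Fatal(err)
		}
	}

	for {
		payload, err := readFrame(&file)
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(string(payload))
	}
}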