• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841571249

23 Apr 2026 02:44PM UTC coverage: 85.275% (-4.0%) from 89.306%
24841571249

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

119 existing lines in 18 files now uncovered.

44597 of 52298 relevant lines covered (85.27%)

127591.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.18
/embedded/sql/sort_reader.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "container/heap"
21
        "context"
22
        "fmt"
23
)
24

25
// topNSortThreshold caps the LIMIT values that are eligible for the bounded
// top-N heap optimisation; queries with a larger LIMIT go through the full
// file-sort path instead.
const topNSortThreshold = 1000
28

29
// sortDirection is the sign applied to a comparison result so that a single
// ascending comparison can serve both ASC and DESC ordering.
type sortDirection int8

const (
	// sortDirectionDesc inverts the comparison result (ORDER BY ... DESC).
	sortDirectionDesc sortDirection = -1
	// sortDirectionAsc keeps the comparison result as is (ORDER BY ... ASC).
	sortDirectionAsc sortDirection = 1
)
35

36
type sortRowReader struct {
37
        rowReader          RowReader
38
        ordExps            []*OrdExp
39
        orderByDescriptors []ColDescriptor
40
        sorter             fileSorter
41

42
        resultReader resultReader
43

44
        // topNLimit, when > 0, activates the bounded heap optimisation: only the
45
        // top-N rows (in sort order) are retained in memory instead of sorting
46
        // the full result set. Set from the query LIMIT when applicable.
47
        topNLimit int
48

49
        // onCloseCallback fires from this reader's own Close (not from inner
50
        // readers'). Stored locally rather than propagated so that the join
51
        // machinery's mid-iteration Close on inner readers does not trigger
52
        // qtx.Cancel before sortRowReader.finalize finishes merging temp files.
53
        onCloseCallback func()
54
}
55

56
func newSortRowReader(rowReader RowReader, ordExps []*OrdExp) (*sortRowReader, error) {
94✔
57
        if rowReader == nil || len(ordExps) == 0 {
95✔
58
                return nil, ErrIllegalArguments
1✔
59
        }
1✔
60

61
        descriptors, err := rowReader.Columns(context.Background())
93✔
62
        if err != nil {
93✔
63
                return nil, err
×
64
        }
×
65

66
        for _, col := range ordExps {
223✔
67
                colPos, isColRef := col.exp.(*Integer)
130✔
68
                if isColRef && (colPos.val <= 0 || colPos.val > int64(len(descriptors))) {
130✔
69
                        return nil, fmt.Errorf("position %d is not in select list", colPos.val)
×
70
                }
×
71
        }
72

73
        colPosBySelector, err := getColPositionsBySelector(descriptors)
93✔
74
        if err != nil {
93✔
75
                return nil, err
×
76
        }
×
77

78
        colTypes, err := getColTypes(rowReader)
93✔
79
        if err != nil {
93✔
80
                return nil, err
×
81
        }
×
82

83
        orderByDescriptors, err := getOrderByDescriptors(ordExps, rowReader)
93✔
84
        if err != nil {
94✔
85
                return nil, err
1✔
86
        }
1✔
87

88
        tx := rowReader.Tx()
92✔
89
        sr := &sortRowReader{
92✔
90
                rowReader:          rowReader,
92✔
91
                ordExps:            ordExps,
92✔
92
                orderByDescriptors: orderByDescriptors,
92✔
93
                sorter: fileSorter{
92✔
94
                        colPosBySelector: colPosBySelector,
92✔
95
                        colTypes:         colTypes,
92✔
96
                        tx:               tx,
92✔
97
                        sortBufSize:      tx.engine.sortBufferSize,
92✔
98
                        sortBuf:          make([]*Row, tx.engine.sortBufferSize),
92✔
99
                },
92✔
100
        }
92✔
101

92✔
102
        directions := make([]sortDirection, len(ordExps))
92✔
103
        for i, col := range ordExps {
221✔
104
                directions[i] = sortDirectionAsc
129✔
105
                if col.descOrder {
172✔
106
                        directions[i] = sortDirectionDesc
43✔
107
                }
43✔
108
        }
109

110
        t1 := make(Tuple, len(ordExps))
92✔
111
        t2 := make(Tuple, len(ordExps))
92✔
112

92✔
113
        nullsOrders := make([]NullsOrder, len(ordExps))
92✔
114
        for i, col := range ordExps {
221✔
115
                nullsOrders[i] = col.nullsOrder
129✔
116
        }
129✔
117

118
        sr.sorter.cmp = func(r1, r2 *Row) (int, error) {
265,902✔
119
                if err := sr.evalSortExps(r1, t1); err != nil {
265,810✔
120
                        return 0, err
×
121
                }
×
122

123
                if err := sr.evalSortExps(r2, t2); err != nil {
265,810✔
124
                        return 0, err
×
125
                }
×
126

127
                for i := range t1 {
558,354✔
128
                        v1Null := t1[i] == nil || t1[i].IsNull()
292,544✔
129
                        v2Null := t2[i] == nil || t2[i].IsNull()
292,544✔
130

292,544✔
131
                        if v1Null && v2Null {
292,555✔
132
                                continue
11✔
133
                        }
134
                        if v1Null || v2Null {
292,598✔
135
                                nullOrder := nullsOrders[i]
65✔
136
                                if nullOrder == NullsDefault {
116✔
137
                                        // immudb default: NULLS FIRST for ASC, NULLS LAST for DESC
51✔
138
                                        if directions[i] == sortDirectionAsc {
102✔
139
                                                nullOrder = NullsFirst
51✔
140
                                        } else {
51✔
NEW
141
                                                nullOrder = NullsLast
×
NEW
142
                                        }
×
143
                                }
144
                                if v1Null {
101✔
145
                                        if nullOrder == NullsFirst {
68✔
146
                                                return -1, nil
32✔
147
                                        }
32✔
148
                                        return 1, nil
4✔
149
                                }
150
                                if nullOrder == NullsFirst {
54✔
151
                                        return 1, nil
25✔
152
                                }
25✔
153
                                return -1, nil
4✔
154
                        }
155

156
                        res, err := t1[i].Compare(t2[i])
292,468✔
157
                        if err != nil {
292,579✔
158
                                return 0, err
111✔
159
                        }
111✔
160
                        if res != 0 {
549,038✔
161
                                return res * int(directions[i]), nil
256,681✔
162
                        }
256,681✔
163
                }
164
                return 0, nil
8,953✔
165
        }
166
        return sr, nil
92✔
167
}
168

169
func (s *sortRowReader) evalSortExps(inRow *Row, out Tuple) error {
531,620✔
170
        for i, col := range s.ordExps {
1,420,600✔
171
                colPos, isColRef := col.exp.(*Integer)
888,980✔
172
                if isColRef {
1,245,596✔
173
                        if colPos.val < 1 || colPos.val > int64(len(inRow.ValuesByPosition)) {
356,616✔
174
                                return fmt.Errorf("position %d is not in select list", colPos.val)
×
175
                        }
×
176
                        out[i] = inRow.ValuesByPosition[colPos.val-1]
356,616✔
177
                } else {
532,364✔
178
                        val, err := col.exp.reduce(s.Tx(), inRow, s.TableAlias())
532,364✔
179
                        if err != nil {
532,364✔
180
                                return err
×
181
                        }
×
182
                        out[i] = val
532,364✔
183
                }
184
        }
185
        return nil
531,620✔
186
}
187

188
func getOrderByDescriptors(ordExps []*OrdExp, rowReader RowReader) ([]ColDescriptor, error) {
93✔
189
        colsBySel, err := rowReader.colsBySelector(context.Background())
93✔
190
        if err != nil {
93✔
191
                return nil, err
×
192
        }
×
193

194
        params := make(map[string]string)
93✔
195
        orderByDescriptors := make([]ColDescriptor, len(ordExps))
93✔
196
        for i, col := range ordExps {
223✔
197
                sqlType, err := col.exp.inferType(colsBySel, params, rowReader.TableAlias())
130✔
198
                if err != nil {
131✔
199
                        return nil, err
1✔
200
                }
1✔
201

202
                if sel := col.AsSelector(); sel != nil {
227✔
203
                        aggFn, table, col := sel.resolve(rowReader.TableAlias())
98✔
204
                        orderByDescriptors[i] = ColDescriptor{
98✔
205
                                AggFn:  aggFn,
98✔
206
                                Table:  table,
98✔
207
                                Column: col,
98✔
208
                                Type:   sqlType,
98✔
209
                        }
98✔
210
                } else {
129✔
211
                        orderByDescriptors[i] = ColDescriptor{
31✔
212
                                Column: col.exp.String(),
31✔
213
                                Type:   sqlType,
31✔
214
                        }
31✔
215
                }
31✔
216
        }
217
        return orderByDescriptors, nil
92✔
218
}
219

220
func getColTypes(r RowReader) ([]string, error) {
93✔
221
        descriptors, err := r.Columns(context.Background())
93✔
222
        if err != nil {
93✔
223
                return nil, err
×
224
        }
×
225

226
        cols := make([]string, len(descriptors))
93✔
227
        for i, desc := range descriptors {
860✔
228
                cols[i] = desc.Type
767✔
229
        }
767✔
230
        return cols, err
93✔
231
}
232

233
func getColPositionsBySelector(desc []ColDescriptor) (map[string]int, error) {
93✔
234
        colPositionsBySelector := make(map[string]int)
93✔
235
        for i, desc := range desc {
860✔
236
                colPositionsBySelector[desc.Selector()] = i
767✔
237
        }
767✔
238
        return colPositionsBySelector, nil
93✔
239
}
240

241
// onClose registers a callback that fires exactly once when this reader's
242
// own Close runs. We deliberately do NOT propagate the callback down to
243
// sr.rowReader because inner readers (notably jointRowReader) Close
244
// nested sub-readers mid-iteration; propagating would let qtx.Cancel
245
// fire while sortRowReader.finalize is still merging temp files. See
246
// joint_row_reader.go:198 and the JOIN+GROUP+ORDER regression test.
247
func (sr *sortRowReader) onClose(callback func()) {
86✔
248
        sr.onCloseCallback = callback
86✔
249
}
86✔
250

251
func (sr *sortRowReader) Tx() *SQLTx {
688,359✔
252
        return sr.rowReader.Tx()
688,359✔
253
}
688,359✔
254

255
func (sr *sortRowReader) TableAlias() string {
1,000,364✔
256
        return sr.rowReader.TableAlias()
1,000,364✔
257
}
1,000,364✔
258

259
func (sr *sortRowReader) Parameters() map[string]interface{} {
155,981✔
260
        return sr.rowReader.Parameters()
155,981✔
261
}
155,981✔
262

263
func (sr *sortRowReader) OrderBy() []ColDescriptor {
5✔
264
        return sr.orderByDescriptors
5✔
265
}
5✔
266

267
func (sr *sortRowReader) ScanSpecs() *ScanSpecs {
32✔
268
        return sr.rowReader.ScanSpecs()
32✔
269
}
32✔
270

271
func (sr *sortRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
32✔
272
        return sr.rowReader.Columns(ctx)
32✔
273
}
32✔
274

275
func (sr *sortRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
11✔
276
        return sr.rowReader.colsBySelector(ctx)
11✔
277
}
11✔
278

279
func (sr *sortRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
5✔
280
        return sr.rowReader.InferParameters(ctx, params)
5✔
281
}
5✔
282

283
func (sr *sortRowReader) Read(ctx context.Context) (*Row, error) {
27,200✔
284
        if sr.resultReader == nil {
27,274✔
285
                reader, err := sr.readAndSort(ctx)
74✔
286
                if err != nil {
75✔
287
                        return nil, err
1✔
288
                }
1✔
289
                sr.resultReader = reader
73✔
290
        }
291
        return sr.resultReader.Read()
27,199✔
292
}
293

294
func (sr *sortRowReader) readAndSort(ctx context.Context) (resultReader, error) {
74✔
295
        if sr.topNLimit > 0 {
76✔
296
                return sr.readAndSortTopN(ctx)
2✔
297
        }
2✔
298
        err := sr.readAll(ctx)
72✔
299
        if err != nil {
72✔
300
                return nil, err
×
301
        }
×
302
        return sr.sorter.finalize()
72✔
303
}
304

305
// topNHeap is a max-heap of *Row values used by the top-N sort optimisation.
306
// It retains only the N rows with the smallest sort key (the rows that appear
307
// first in the final ORDER BY output). A max-heap lets us cheaply compare and
308
// evict the current worst candidate without touching the other rows.
309
type topNHeap struct {
310
        rows []*Row
311
        cmp  func(r1, r2 *Row) (int, error)
312
        err  error // first comparison error, if any
313
}
314

315
func (h *topNHeap) Len() int { return len(h.rows) }
35✔
316

317
// Less makes this a max-heap: the root holds the row that sorts LAST (worst).
318
func (h *topNHeap) Less(i, j int) bool {
16✔
319
        res, err := h.cmp(h.rows[i], h.rows[j])
16✔
320
        if err != nil && h.err == nil {
16✔
NEW
321
                h.err = err
×
NEW
322
        }
×
323
        return res > 0
16✔
324
}
325

326
func (h *topNHeap) Swap(i, j int) { h.rows[i], h.rows[j] = h.rows[j], h.rows[i] }
13✔
327

328
func (h *topNHeap) Push(x interface{}) { h.rows = append(h.rows, x.(*Row)) }
10✔
329

330
func (h *topNHeap) Pop() interface{} {
10✔
331
        old := h.rows
10✔
332
        n := len(old)
10✔
333
        x := old[n-1]
10✔
334
        old[n-1] = nil
10✔
335
        h.rows = old[:n-1]
10✔
336
        return x
10✔
337
}
10✔
338

339
// readAndSortTopN reads all inner rows into a bounded max-heap of size
340
// topNLimit, then extracts them in ascending sort order. This avoids the
341
// disk-spill path for queries like ORDER BY col LIMIT 10.
342
func (sr *sortRowReader) readAndSortTopN(ctx context.Context) (resultReader, error) {
2✔
343
        h := &topNHeap{
2✔
344
                cmp:  sr.sorter.cmp,
2✔
345
                rows: make([]*Row, 0, sr.topNLimit+1),
2✔
346
        }
2✔
347
        heap.Init(h)
2✔
348

2✔
349
        for {
15✔
350
                row, err := sr.rowReader.Read(ctx)
13✔
351
                if err == ErrNoMoreRows {
15✔
352
                        break
2✔
353
                }
354
                if err != nil {
11✔
NEW
355
                        return nil, err
×
NEW
356
                }
×
357

358
                if h.Len() < sr.topNLimit {
19✔
359
                        heap.Push(h, row)
8✔
360
                } else {
11✔
361
                        // Evict the current worst row if this one is better.
3✔
362
                        res, cmpErr := sr.sorter.cmp(row, h.rows[0])
3✔
363
                        if cmpErr != nil {
3✔
NEW
364
                                return nil, cmpErr
×
NEW
365
                        }
×
366
                        if res < 0 { // row sorts before the heap root (current worst)
5✔
367
                                heap.Pop(h)
2✔
368
                                heap.Push(h, row)
2✔
369
                        }
2✔
370
                }
371
                if h.err != nil {
11✔
NEW
372
                        return nil, h.err
×
NEW
373
                }
×
374
        }
375

376
        // Pop from max-heap: elements come out largest-first.  Reverse to get
377
        // ascending (correct final) order.
378
        n := h.Len()
2✔
379
        rows := make([]*Row, n)
2✔
380
        for i := n - 1; i >= 0; i-- {
10✔
381
                rows[i] = heap.Pop(h).(*Row)
8✔
382
        }
8✔
383
        return &bufferResultReader{sortBuf: rows}, nil
2✔
384
}
385

386
func (sr *sortRowReader) readAll(ctx context.Context) error {
72✔
387
        for {
27,411✔
388
                row, err := sr.rowReader.Read(ctx)
27,339✔
389
                if err == ErrNoMoreRows {
27,411✔
390
                        return nil
72✔
391
                }
72✔
392

393
                if err != nil {
27,267✔
394
                        return err
×
395
                }
×
396

397
                err = sr.sorter.update(row)
27,267✔
398
                if err != nil {
27,267✔
399
                        return err
×
400
                }
×
401
        }
402
}
403

404
func (sr *sortRowReader) Close() error {
91✔
405
        if cb := sr.onCloseCallback; cb != nil {
177✔
406
                // Fire after inner Close so the tx that owns any underlying
86✔
407
                // resources stays alive until everyone downstream is done.
86✔
408
                defer cb()
86✔
409
                sr.onCloseCallback = nil
86✔
410
        }
86✔
411
        var resultErr error
91✔
412
        if sr.resultReader != nil {
164✔
413
                resultErr = sr.resultReader.Close()
73✔
414
                sr.resultReader = nil
73✔
415
        }
73✔
416
        if err := sr.rowReader.Close(); err != nil {
91✔
NEW
417
                return err
×
NEW
418
        }
×
419
        return resultErr
91✔
420
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc