• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841644892

23 Apr 2026 02:44PM UTC coverage: 85.279% (-4.0%) from 89.306%
24841644892

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

115 existing lines in 18 files now uncovered.

44599 of 52298 relevant lines covered (85.28%)

127676.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.83
/embedded/sql/window_row_reader.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "fmt"
22
        "sort"
23
)
24

25
// windowRowReader materializes all input rows, groups them by partition key,
26
// sorts within each partition, computes window function values, and emits
27
// rows with the window columns appended.
28
type windowRowReader struct {
29
        inner RowReader
30

31
        windowFns   []*WindowFnExp
32
        innerCols   []ColDescriptor
33
        maxRows     int // 0 = unlimited
34

35
        loaded    bool
36
        rows      []*Row
37
        rowIdx    int
38
}
39

40
func newWindowRowReader(ctx context.Context, inner RowReader, windowFns []*WindowFnExp, maxRows int) (*windowRowReader, error) {
54✔
41
        innerCols, err := inner.Columns(ctx)
54✔
42
        if err != nil {
54✔
NEW
43
                return nil, err
×
NEW
44
        }
×
45

46
        return &windowRowReader{
54✔
47
                inner:     inner,
54✔
48
                windowFns: windowFns,
54✔
49
                innerCols: innerCols,
54✔
50
                maxRows:   maxRows,
54✔
51
        }, nil
54✔
52
}
53

54
func (wr *windowRowReader) materialize(ctx context.Context) error {
375✔
55
        if wr.loaded {
718✔
56
                return nil
343✔
57
        }
343✔
58
        wr.loaded = true
32✔
59

32✔
60
        // Read all rows (with optional limit to prevent OOM)
32✔
61
        var allRows []*Row
32✔
62
        for {
428✔
63
                row, err := wr.inner.Read(ctx)
396✔
64
                if err == ErrNoMoreRows {
428✔
65
                        break
32✔
66
                }
67
                if err != nil {
364✔
NEW
68
                        return err
×
NEW
69
                }
×
70
                allRows = append(allRows, row)
364✔
71

364✔
72
                if wr.maxRows > 0 && len(allRows) > wr.maxRows {
364✔
NEW
73
                        return fmt.Errorf("%w: %d rows exceed limit of %d",
×
NEW
74
                                ErrWindowRowsLimitExceeded, len(allRows), wr.maxRows)
×
NEW
75
                }
×
76
        }
77

78
        if len(allRows) == 0 {
32✔
NEW
79
                return nil
×
NEW
80
        }
×
81

82
        tx := wr.inner.Tx()
32✔
83
        tableAlias := wr.inner.TableAlias()
32✔
84

32✔
85
        // Process each window function
32✔
86
        for _, wfn := range wr.windowFns {
70✔
87
                // Partition rows
38✔
88
                partitions := wr.partitionRows(allRows, wfn, tx, tableAlias)
38✔
89

38✔
90
                // Sort within each partition
38✔
91
                for _, partition := range partitions {
94✔
92
                        if len(wfn.orderBy) > 0 {
93✔
93
                                wr.sortPartition(partition, wfn, tx, tableAlias)
37✔
94
                        }
37✔
95
                }
96

97
                // Compute window values
98
                for _, partition := range partitions {
94✔
99
                        wr.computeWindowValues(partition, wfn, tx, tableAlias)
56✔
100
                }
56✔
101
        }
102

103
        wr.rows = allRows
32✔
104
        return nil
32✔
105
}
106

107
func (wr *windowRowReader) partitionRows(rows []*Row, wfn *WindowFnExp, tx *SQLTx, tableAlias string) [][]*Row {
38✔
108
        if len(wfn.partitionBy) == 0 {
64✔
109
                return [][]*Row{rows}
26✔
110
        }
26✔
111

112
        partitionMap := make(map[string][]*Row)
12✔
113
        var keys []string
12✔
114

12✔
115
        for _, row := range rows {
154✔
116
                key := wr.partitionKey(row, wfn, tx, tableAlias)
142✔
117
                if _, exists := partitionMap[key]; !exists {
172✔
118
                        keys = append(keys, key)
30✔
119
                }
30✔
120
                partitionMap[key] = append(partitionMap[key], row)
142✔
121
        }
122

123
        partitions := make([][]*Row, 0, len(keys))
12✔
124
        for _, k := range keys {
42✔
125
                partitions = append(partitions, partitionMap[k])
30✔
126
        }
30✔
127
        return partitions
12✔
128
}
129

130
func (wr *windowRowReader) partitionKey(row *Row, wfn *WindowFnExp, tx *SQLTx, tableAlias string) string {
142✔
131
        // Use the same binary encoding as hashGroupedRowReader.groupKey():
142✔
132
        // 0x00 byte for SQL NULL, 0x01 + EncodeValue bytes for non-NULL.
142✔
133
        // The previous pipe-delimited string approach had two correctness bugs:
142✔
134
        //   1. VARCHAR values containing "|" produced colliding partition keys.
142✔
135
        //   2. VARCHAR value "NULL" was indistinguishable from an actual SQL NULL.
142✔
136
        var key []byte
142✔
137
        for _, pexp := range wfn.partitionBy {
284✔
138
                val, err := pexp.reduce(tx, row, tableAlias)
142✔
139
                if err != nil || val == nil || val.IsNull() {
142✔
NEW
140
                        key = append(key, 0) // NULL sentinel
×
NEW
141
                        continue
×
142
                }
143
                key = append(key, 1) // non-NULL sentinel
142✔
144
                b, _ := EncodeValue(val, val.Type(), 0)
142✔
145
                key = append(key, b...)
142✔
146
        }
147
        return string(key)
142✔
148
}
149

150
func (wr *windowRowReader) sortPartition(partition []*Row, wfn *WindowFnExp, tx *SQLTx, tableAlias string) {
37✔
151
        sort.SliceStable(partition, func(i, j int) bool {
591✔
152
                for _, ord := range wfn.orderBy {
1,108✔
153
                        vi, erri := ord.exp.reduce(tx, partition[i], tableAlias)
554✔
154
                        vj, errj := ord.exp.reduce(tx, partition[j], tableAlias)
554✔
155
                        if erri != nil || errj != nil {
554✔
NEW
156
                                continue
×
157
                        }
158
                        cmp, err := vi.Compare(vj)
554✔
159
                        if err != nil || cmp == 0 {
560✔
160
                                continue
6✔
161
                        }
162
                        if ord.descOrder {
773✔
163
                                return cmp > 0
225✔
164
                        }
225✔
165
                        return cmp < 0
323✔
166
                }
167
                return false
6✔
168
        })
169
}
170

171
func (wr *windowRowReader) computeWindowValues(partition []*Row, wfn *WindowFnExp, tx *SQLTx, tableAlias string) {
56✔
172
        fnName := wfn.fnName
56✔
173
        colSel := wfn.selectorName()
56✔
174

56✔
175
        switch fnName {
56✔
176
        case "ROW_NUMBER":
10✔
177
                for i, row := range partition {
93✔
178
                        appendWindowValue(row, colSel, NewInteger(int64(i+1)))
83✔
179
                }
83✔
180

181
        case "RANK":
10✔
182
                rank := 1
10✔
183
                for i, row := range partition {
74✔
184
                        if i > 0 && !wr.orderByEqual(partition[i-1], row, wfn, tx, tableAlias) {
115✔
185
                                rank = i + 1
51✔
186
                        }
51✔
187
                        appendWindowValue(row, colSel, NewInteger(int64(rank)))
64✔
188
                }
189

190
        case "DENSE_RANK":
4✔
191
                rank := 1
4✔
192
                for i, row := range partition {
18✔
193
                        if i > 0 && !wr.orderByEqual(partition[i-1], row, wfn, tx, tableAlias) {
21✔
194
                                rank++
7✔
195
                        }
7✔
196
                        appendWindowValue(row, colSel, NewInteger(int64(rank)))
14✔
197
                }
198

199
        case "COUNT":
12✔
200
                count := int64(len(partition))
12✔
201
                for _, row := range partition {
80✔
202
                        appendWindowValue(row, colSel, NewInteger(count))
68✔
203
                }
68✔
204

205
        case "SUM":
6✔
206
                var sum float64
6✔
207
                for _, row := range partition {
19✔
208
                        if len(wfn.params) > 0 {
26✔
209
                                val, err := wfn.params[0].reduce(tx, row, tableAlias)
13✔
210
                                if err == nil && !val.IsNull() {
24✔
211
                                        switch v := val.RawValue().(type) {
11✔
212
                                        case int64:
11✔
213
                                                sum += float64(v)
11✔
NEW
214
                                        case float64:
×
NEW
215
                                                sum += v
×
216
                                        }
217
                                }
218
                        }
219
                }
220
                for _, row := range partition {
19✔
221
                        appendWindowValue(row, colSel, NewFloat64(sum))
13✔
222
                }
13✔
223

NEW
224
        case "MIN", "MAX":
×
NEW
225
                var result TypedValue
×
NEW
226
                for _, row := range partition {
×
NEW
227
                        if len(wfn.params) > 0 {
×
NEW
228
                                val, err := wfn.params[0].reduce(tx, row, tableAlias)
×
NEW
229
                                if err == nil && !val.IsNull() {
×
NEW
230
                                        if result == nil {
×
NEW
231
                                                result = val
×
NEW
232
                                        } else {
×
NEW
233
                                                cmp, err := val.Compare(result)
×
NEW
234
                                                if err == nil {
×
NEW
235
                                                        if (fnName == "MIN" && cmp < 0) || (fnName == "MAX" && cmp > 0) {
×
NEW
236
                                                                result = val
×
NEW
237
                                                        }
×
238
                                                }
239
                                        }
240
                                }
241
                        }
242
                }
NEW
243
                for _, row := range partition {
×
NEW
244
                        if result != nil {
×
NEW
245
                                appendWindowValue(row, colSel, result)
×
NEW
246
                        } else {
×
NEW
247
                                appendWindowValue(row, colSel, NewNull(IntegerType))
×
NEW
248
                        }
×
249
                }
250

NEW
251
        case "AVG":
×
NEW
252
                var sum float64
×
NEW
253
                var count int64
×
NEW
254
                for _, row := range partition {
×
NEW
255
                        if len(wfn.params) > 0 {
×
NEW
256
                                val, err := wfn.params[0].reduce(tx, row, tableAlias)
×
NEW
257
                                if err == nil && !val.IsNull() {
×
NEW
258
                                        switch v := val.RawValue().(type) {
×
NEW
259
                                        case int64:
×
NEW
260
                                                sum += float64(v)
×
NEW
261
                                                count++
×
NEW
262
                                        case float64:
×
NEW
263
                                                sum += v
×
NEW
264
                                                count++
×
265
                                        }
266
                                }
267
                        }
268
                }
NEW
269
                if count > 0 {
×
NEW
270
                        avg := sum / float64(count)
×
NEW
271
                        for _, row := range partition {
×
NEW
272
                                appendWindowValue(row, colSel, NewFloat64(avg))
×
NEW
273
                        }
×
NEW
274
                } else {
×
NEW
275
                        for _, row := range partition {
×
NEW
276
                                appendWindowValue(row, colSel, NewNull(Float64Type))
×
NEW
277
                        }
×
278
                }
279

280
        case "LAG":
4✔
281
                for i, row := range partition {
66✔
282
                        offset := 1
62✔
283
                        if len(wfn.params) > 1 {
62✔
NEW
284
                                if ov, ok := wfn.params[1].(*Integer); ok {
×
NEW
285
                                        offset = int(ov.val)
×
NEW
286
                                }
×
287
                        }
288
                        prev := i - offset
62✔
289
                        if prev >= 0 && prev < len(partition) && len(wfn.params) > 0 {
120✔
290
                                val, err := wfn.params[0].reduce(tx, partition[prev], tableAlias)
58✔
291
                                if err == nil {
116✔
292
                                        appendWindowValue(row, colSel, val)
58✔
293
                                        continue
58✔
294
                                }
295
                        }
296
                        appendWindowValue(row, colSel, NewNull(AnyType))
4✔
297
                }
298

299
        case "LEAD":
3✔
300
                for i, row := range partition {
61✔
301
                        offset := 1
58✔
302
                        if len(wfn.params) > 1 {
58✔
NEW
303
                                if ov, ok := wfn.params[1].(*Integer); ok {
×
NEW
304
                                        offset = int(ov.val)
×
NEW
305
                                }
×
306
                        }
307
                        next := i + offset
58✔
308
                        if next >= 0 && next < len(partition) && len(wfn.params) > 0 {
113✔
309
                                val, err := wfn.params[0].reduce(tx, partition[next], tableAlias)
55✔
310
                                if err == nil {
110✔
311
                                        appendWindowValue(row, colSel, val)
55✔
312
                                        continue
55✔
313
                                }
314
                        }
315
                        appendWindowValue(row, colSel, NewNull(AnyType))
3✔
316
                }
317

318
        case "FIRST_VALUE":
2✔
319
                if len(partition) > 0 && len(wfn.params) > 0 {
4✔
320
                        firstVal, err := wfn.params[0].reduce(tx, partition[0], tableAlias)
2✔
321
                        if err != nil {
2✔
NEW
322
                                firstVal = NewNull(AnyType)
×
NEW
323
                        }
×
324
                        for _, row := range partition {
10✔
325
                                appendWindowValue(row, colSel, firstVal)
8✔
326
                        }
8✔
327
                }
328

329
        case "LAST_VALUE":
2✔
330
                if len(partition) > 0 && len(wfn.params) > 0 {
4✔
331
                        lastVal, err := wfn.params[0].reduce(tx, partition[len(partition)-1], tableAlias)
2✔
332
                        if err != nil {
2✔
NEW
333
                                lastVal = NewNull(AnyType)
×
NEW
334
                        }
×
335
                        for _, row := range partition {
10✔
336
                                appendWindowValue(row, colSel, lastVal)
8✔
337
                        }
8✔
338
                }
339

340
        case "NTILE":
3✔
341
                buckets := int64(1)
3✔
342
                if len(wfn.params) > 0 {
6✔
343
                        if bv, ok := wfn.params[0].(*Integer); ok && bv.val > 0 {
6✔
344
                                buckets = bv.val
3✔
345
                        }
3✔
346
                }
347
                n := int64(len(partition))
3✔
348
                for i, row := range partition {
61✔
349
                        bucket := (int64(i) * buckets / n) + 1
58✔
350
                        appendWindowValue(row, colSel, NewInteger(bucket))
58✔
351
                }
58✔
352

NEW
353
        default:
×
NEW
354
                // Unknown window function — append NULL
×
NEW
355
                for _, row := range partition {
×
NEW
356
                        appendWindowValue(row, colSel, NewNull(IntegerType))
×
NEW
357
                }
×
358
        }
359
}
360

361
func appendWindowValue(row *Row, selector string, val TypedValue) {
436✔
362
        row.ValuesByPosition = append(row.ValuesByPosition, val)
436✔
363
        row.ValuesBySelector[selector] = val
436✔
364
}
436✔
365

366
func (wr *windowRowReader) orderByEqual(a, b *Row, wfn *WindowFnExp, tx *SQLTx, tableAlias string) bool {
64✔
367
        for _, ord := range wfn.orderBy {
128✔
368
                va, erra := ord.exp.reduce(tx, a, tableAlias)
64✔
369
                vb, errb := ord.exp.reduce(tx, b, tableAlias)
64✔
370
                if erra != nil || errb != nil {
64✔
NEW
371
                        return false
×
NEW
372
                }
×
373
                cmp, err := va.Compare(vb)
64✔
374
                if err != nil || cmp != 0 {
122✔
375
                        return false
58✔
376
                }
58✔
377
        }
378
        return true
6✔
379
}
380

381
func (wr *windowRowReader) onClose(callback func()) {
43✔
382
        wr.inner.onClose(callback)
43✔
383
}
43✔
384

385
func (wr *windowRowReader) Tx() *SQLTx {
939✔
386
        return wr.inner.Tx()
939✔
387
}
939✔
388

389
func (wr *windowRowReader) TableAlias() string {
2,623✔
390
        return wr.inner.TableAlias()
2,623✔
391
}
2,623✔
392

NEW
393
func (wr *windowRowReader) OrderBy() []ColDescriptor {
×
NEW
394
        return nil
×
NEW
395
}
×
396

NEW
397
func (wr *windowRowReader) ScanSpecs() *ScanSpecs {
×
NEW
398
        return nil
×
NEW
399
}
×
400

401
func (wr *windowRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
33✔
402
        cols := make([]ColDescriptor, len(wr.innerCols))
33✔
403
        copy(cols, wr.innerCols)
33✔
404

33✔
405
        for _, wfn := range wr.windowFns {
72✔
406
                cols = append(cols, ColDescriptor{
39✔
407
                        Table:  wr.inner.TableAlias(),
39✔
408
                        Column: wfn.alias,
39✔
409
                        Type:   wfn.resultType(),
39✔
410
                })
39✔
411
        }
39✔
412
        return cols, nil
33✔
413
}
414

415
func (wr *windowRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
33✔
416
        cols, err := wr.Columns(ctx)
33✔
417
        if err != nil {
33✔
NEW
418
                return nil, err
×
NEW
419
        }
×
420
        result := make(map[string]ColDescriptor, len(cols))
33✔
421
        for _, c := range cols {
168✔
422
                result[c.Selector()] = c
135✔
423
        }
135✔
424
        return result, nil
33✔
425
}
426

427
func (wr *windowRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
11✔
428
        return wr.inner.InferParameters(ctx, params)
11✔
429
}
11✔
430

431
func (wr *windowRowReader) Parameters() map[string]interface{} {
939✔
432
        return wr.inner.Parameters()
939✔
433
}
939✔
434

435
func (wr *windowRowReader) Read(ctx context.Context) (*Row, error) {
375✔
436
        if err := wr.materialize(ctx); err != nil {
375✔
NEW
437
                return nil, err
×
NEW
438
        }
×
439

440
        if wr.rowIdx >= len(wr.rows) {
402✔
441
                return nil, ErrNoMoreRows
27✔
442
        }
27✔
443

444
        row := wr.rows[wr.rowIdx]
348✔
445
        wr.rowIdx++
348✔
446
        return row, nil
348✔
447
}
448

449
func (wr *windowRowReader) Close() error {
54✔
450
        return wr.inner.Close()
54✔
451
}
54✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc