• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841571249

23 Apr 2026 02:44PM UTC coverage: 85.275% (-4.0%) from 89.306%
24841571249

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

119 existing lines in 18 files now uncovered.

44597 of 52298 relevant lines covered (85.27%)

127591.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.55
/embedded/sql/grouped_row_reader.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23

24
        "github.com/codenotary/immudb/embedded/store"
25
)
26

27
type groupedRowReader struct {
28
        rowReader RowReader
29

30
        selectors       []*AggColSelector
31
        groupByCols     []*ColSelector
32
        cols            []ColDescriptor
33
        allAggregations bool
34

35
        currRow *Row
36
        empty   bool
37
}
38

39
func newGroupedRowReader(rowReader RowReader, allAggregations bool, selectors []*AggColSelector, groupBy []*ColSelector) (*groupedRowReader, error) {
126✔
40
        if rowReader == nil {
127✔
41
                return nil, ErrIllegalArguments
1✔
42
        }
1✔
43

44
        gr := &groupedRowReader{
125✔
45
                rowReader:       rowReader,
125✔
46
                selectors:       selectors,
125✔
47
                groupByCols:     groupBy,
125✔
48
                empty:           true,
125✔
49
                allAggregations: allAggregations,
125✔
50
        }
125✔
51

125✔
52
        cols, err := gr.columns()
125✔
53
        if err == nil {
248✔
54
                gr.cols = cols
123✔
55
        }
123✔
56
        return gr, err
125✔
57
}
58

59
func (gr *groupedRowReader) onClose(callback func()) {
68✔
60
        gr.rowReader.onClose(callback)
68✔
61
}
68✔
62

63
func (gr *groupedRowReader) Tx() *SQLTx {
4,983✔
64
        return gr.rowReader.Tx()
4,983✔
65
}
4,983✔
66

67
func (gr *groupedRowReader) TableAlias() string {
7,276✔
68
        return gr.rowReader.TableAlias()
7,276✔
69
}
7,276✔
70

71
func (gr *groupedRowReader) OrderBy() []ColDescriptor {
1✔
72
        return gr.rowReader.OrderBy()
1✔
73
}
1✔
74

75
func (gr *groupedRowReader) ScanSpecs() *ScanSpecs {
19✔
76
        return gr.rowReader.ScanSpecs()
19✔
77
}
19✔
78

79
func (gr *groupedRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
37✔
80
        return gr.cols, nil
37✔
81
}
37✔
82

83
func (gr *groupedRowReader) columns() ([]ColDescriptor, error) {
125✔
84
        colsBySel, err := gr.colsBySelector(context.Background())
125✔
85
        if err != nil {
127✔
86
                return nil, err
2✔
87
        }
2✔
88

89
        selectorMap := make(map[string]bool)
123✔
90
        colsByPos := make([]ColDescriptor, 0, len(gr.selectors))
123✔
91
        for _, sel := range gr.selectors {
279✔
92
                encSel := EncodeSelector(sel.resolve(gr.rowReader.TableAlias()))
156✔
93
                colsByPos = append(colsByPos, colsBySel[encSel])
156✔
94
                selectorMap[encSel] = true
156✔
95
        }
156✔
96

97
        for _, col := range gr.groupByCols {
177✔
98
                sel := EncodeSelector(col.resolve(gr.rowReader.TableAlias()))
54✔
99
                if !selectorMap[sel] {
108✔
100
                        colsByPos = append(colsByPos, colsBySel[sel])
54✔
101
                }
54✔
102
        }
103
        return colsByPos, nil
123✔
104
}
105

106
func (gr *groupedRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
218✔
107
        colDescriptors, err := gr.rowReader.colsBySelector(ctx)
218✔
108
        if err != nil {
218✔
109
                return nil, err
×
110
        }
×
111

112
        for _, sel := range gr.selectors {
503✔
113
                aggFn, table, col := sel.resolve(gr.rowReader.TableAlias())
285✔
114

285✔
115
                if aggFn == "" {
285✔
116
                        continue
×
117
                }
118

119
                des := ColDescriptor{
285✔
120
                        AggFn:  aggFn,
285✔
121
                        Table:  table,
285✔
122
                        Column: col,
285✔
123
                        Type:   IntegerType,
285✔
124
                }
285✔
125

285✔
126
                encSel := des.Selector()
285✔
127

285✔
128
                if aggFn == COUNT && !sel.distinct {
469✔
129
                        colDescriptors[encSel] = des
184✔
130
                        continue
184✔
131
                }
132

133
                if aggFn == COUNT && sel.distinct {
102✔
134
                        // COUNT(DISTINCT col) needs the column type for selector resolution
1✔
135
                        colDesc, ok := colDescriptors[EncodeSelector("", table, col)]
1✔
136
                        if !ok {
1✔
NEW
137
                                return nil, fmt.Errorf("%w (%s)", ErrColumnDoesNotExist, col)
×
NEW
138
                        }
×
139
                        des.Type = colDesc.Type
1✔
140
                        colDescriptors[encSel] = des
1✔
141
                        continue
1✔
142
                }
143

144
                colDesc, ok := colDescriptors[EncodeSelector("", table, col)]
100✔
145
                if !ok {
102✔
146
                        return nil, fmt.Errorf("%w (%s)", ErrColumnDoesNotExist, col)
2✔
147
                }
2✔
148

149
                des.Type = colDesc.Type
98✔
150
                colDescriptors[encSel] = des
98✔
151
        }
152
        return colDescriptors, nil
216✔
153
}
154

155
func allAggregations(targets []TargetEntry) bool {
124✔
156
        for _, t := range targets {
281✔
157
                _, isAggregation := t.Exp.(*AggColSelector)
157✔
158
                if !isAggregation {
181✔
159
                        return false
24✔
160
                }
24✔
161
        }
162
        return true
100✔
163
}
164

165
func zeroForType(t SQLValueType) TypedValue {
1,286✔
166
        switch t {
1,286✔
167
        case IntegerType:
659✔
168
                return &Integer{}
659✔
169
        case Float64Type:
43✔
170
                return &Float64{}
43✔
171
        case BooleanType:
83✔
172
                return &Bool{}
83✔
173
        case VarcharType:
340✔
174
                return &Varchar{}
340✔
175
        case JSONType:
9✔
176
                return &JSON{}
9✔
177
        case UUIDType:
7✔
178
                return &UUID{}
7✔
179
        case BLOBType:
128✔
180
                return &Blob{}
128✔
181
        case TimestampType:
17✔
182
                return &Timestamp{}
17✔
183
        }
184
        return nil
×
185
}
186

187
func (gr *groupedRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
22✔
188
        return gr.rowReader.InferParameters(ctx, params)
22✔
189
}
22✔
190

191
func (gr *groupedRowReader) Parameters() map[string]interface{} {
981✔
192
        return gr.rowReader.Parameters()
981✔
193
}
981✔
194

195
func (gr *groupedRowReader) Read(ctx context.Context) (*Row, error) {
221✔
196
        for {
808✔
197
                row, err := gr.rowReader.Read(ctx)
587✔
198
                if errors.Is(err, store.ErrNoMoreEntries) {
676✔
199
                        return gr.emitCurrentRow(ctx)
89✔
200
                }
89✔
201

202
                if err != nil {
499✔
203
                        return nil, err
1✔
204
                }
1✔
205

206
                gr.empty = false
497✔
207

497✔
208
                if gr.currRow == nil {
548✔
209
                        gr.currRow = row
51✔
210
                        err = gr.initAggregations(gr.currRow)
51✔
211
                        if err != nil {
51✔
UNCOV
212
                                return nil, err
×
UNCOV
213
                        }
×
214
                        continue
51✔
215
                }
216

217
                compatible, err := gr.currRow.compatible(row, gr.groupByCols, gr.rowReader.TableAlias())
446✔
218
                if err != nil {
446✔
219
                        return nil, err
×
220
                }
×
221

222
                if !compatible {
577✔
223
                        r := gr.currRow
131✔
224
                        gr.currRow = row
131✔
225

131✔
226
                        err = gr.initAggregations(gr.currRow)
131✔
227
                        if err != nil {
131✔
228
                                return nil, err
×
229
                        }
×
230
                        return r, nil
131✔
231
                }
232

233
                // Compatible rows get merged
234
                err = updateRow(gr.currRow, row)
315✔
235
                if err != nil {
315✔
236
                        return nil, err
×
237
                }
×
238
        }
239
}
240

241
func updateRow(currRow, newRow *Row) error {
857✔
242
        for _, v := range currRow.ValuesBySelector {
3,598✔
243
                aggV, isAggregatedValue := v.(AggregatedValue)
2,741✔
244

2,741✔
245
                if isAggregatedValue {
3,958✔
246
                        if aggV.ColBounded() {
1,677✔
247
                                val, exists := newRow.ValuesBySelector[aggV.Selector()]
460✔
248
                                if !exists {
460✔
249
                                        return ErrColumnDoesNotExist
×
250
                                }
×
251

252
                                err := aggV.updateWith(val)
460✔
253
                                if err != nil {
460✔
254
                                        return err
×
255
                                }
×
256
                        }
257

258
                        if !aggV.ColBounded() {
1,974✔
259
                                err := aggV.updateWith(nil)
757✔
260
                                if err != nil {
757✔
261
                                        return err
×
262
                                }
×
263
                        }
264
                }
265
        }
266
        return nil
857✔
267
}
268

269
func (gr *groupedRowReader) emitCurrentRow(ctx context.Context) (*Row, error) {
89✔
270
        if gr.empty && gr.allAggregations && len(gr.groupByCols) == 0 {
101✔
271
                zr, err := gr.zeroRow(ctx)
12✔
272
                if err != nil {
12✔
273
                        return nil, err
×
274
                }
×
275

276
                gr.empty = false
12✔
277
                return zr, nil
12✔
278
        }
279

280
        if gr.currRow == nil {
105✔
281
                return nil, ErrNoMoreRows
28✔
282
        }
28✔
283

284
        r := gr.currRow
49✔
285
        gr.currRow = nil
49✔
286

49✔
287
        return r, nil
49✔
288
}
289

290
func (gr *groupedRowReader) zeroRow(ctx context.Context) (*Row, error) {
12✔
291
        // special case when all selectors are aggregations
12✔
292
        zeroRow := &Row{
12✔
293
                ValuesByPosition: make([]TypedValue, len(gr.selectors)),
12✔
294
                ValuesBySelector: make(map[string]TypedValue, len(gr.selectors)),
12✔
295
        }
12✔
296

12✔
297
        colsBySelector, err := gr.colsBySelector(ctx)
12✔
298
        if err != nil {
12✔
299
                return nil, err
×
300
        }
×
301

302
        for i, sel := range gr.selectors {
36✔
303
                aggFn, table, col := sel.resolve(gr.rowReader.TableAlias())
24✔
304
                encSel := EncodeSelector(aggFn, table, col)
24✔
305

24✔
306
                var zero TypedValue
24✔
307
                if aggFn == COUNT {
32✔
308
                        zero = zeroForType(IntegerType)
8✔
309
                } else {
24✔
310
                        zero = zeroForType(colsBySelector[encSel].Type)
16✔
311
                }
16✔
312

313
                zeroRow.ValuesByPosition[i] = zero
24✔
314
                zeroRow.ValuesBySelector[encSel] = zero
24✔
315
        }
316
        return zeroRow, nil
12✔
317
}
318

319
func (gr *groupedRowReader) initAggregations(row *Row) error {
334✔
320
        // augment row with aggregated values
334✔
321
        for _, sel := range gr.selectors {
724✔
322
                aggFn, table, col := sel.resolve(gr.rowReader.TableAlias())
390✔
323

390✔
324
                v, err := initAggValue(aggFn, table, col, sel.distinct, sel.separator)
390✔
325
                if err != nil {
390✔
UNCOV
326
                        return err
×
UNCOV
327
                }
×
328

329
                if v == nil {
390✔
330
                        continue
×
331
                }
332

333
                encSel := EncodeSelector(aggFn, table, col)
390✔
334
                row.ValuesBySelector[encSel] = v
390✔
335
        }
336

337
        for i, col := range gr.cols {
1,305✔
338
                v := row.ValuesBySelector[col.Selector()]
971✔
339

971✔
340
                if i < len(row.ValuesByPosition) {
1,917✔
341
                        row.ValuesByPosition[i] = v
946✔
342
                } else {
971✔
343
                        row.ValuesByPosition = append(row.ValuesByPosition, v)
25✔
344
                }
25✔
345
        }
346
        row.ValuesByPosition = row.ValuesByPosition[:len(gr.cols)]
334✔
347
        return updateRow(row, row)
334✔
348
}
349

350
func initAggValue(aggFn, table, col string, opts ...interface{}) (TypedValue, error) {
390✔
351
        isDistinct := false
390✔
352
        separator := ", "
390✔
353
        for _, opt := range opts {
1,170✔
354
                switch v := opt.(type) {
780✔
355
                case bool:
390✔
356
                        isDistinct = v
390✔
357
                case string:
390✔
358
                        separator = v
390✔
359
                }
360
        }
361

362
        var v TypedValue
390✔
363
        switch aggFn {
390✔
364
        case COUNT:
317✔
365
                {
634✔
366
                        cv := &CountValue{sel: EncodeSelector("", table, col), distinct: isDistinct, allRows: col == "*"}
317✔
367
                        if isDistinct {
318✔
368
                                cv.seen = make(map[string]bool)
1✔
369
                        }
1✔
370
                        v = cv
317✔
371
                }
372
        case SUM:
25✔
373
                {
50✔
374
                        v = &SumValue{
25✔
375
                                val: &NullValue{t: AnyType},
25✔
376
                                sel: EncodeSelector("", table, col),
25✔
377
                        }
25✔
378
                }
25✔
379
        case MIN:
14✔
380
                {
28✔
381
                        v = &MinValue{
14✔
382
                                val: &NullValue{t: AnyType},
14✔
383
                                sel: EncodeSelector("", table, col),
14✔
384
                        }
14✔
385
                }
14✔
386
        case MAX:
20✔
387
                {
40✔
388
                        v = &MaxValue{
20✔
389
                                val: &NullValue{t: AnyType},
20✔
390
                                sel: EncodeSelector("", table, col),
20✔
391
                        }
20✔
392
                }
20✔
393
        case AVG:
14✔
394
                {
28✔
395
                        v = &AVGValue{
14✔
396
                                s:   &NullValue{t: AnyType},
14✔
397
                                sel: EncodeSelector("", table, col),
14✔
398
                        }
14✔
399
                }
14✔
NEW
400
        case STRING_AGG:
×
NEW
401
                {
×
NEW
402
                        v = &StringAggValue{
×
NEW
403
                                sep: separator,
×
NEW
404
                                sel: EncodeSelector("", table, col),
×
NEW
405
                        }
×
NEW
406
                }
×
407
        }
408
        return v, nil
390✔
409
}
410

411
func (gr *groupedRowReader) Close() error {
122✔
412
        return gr.rowReader.Close()
122✔
413
}
122✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc