• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841571249

23 Apr 2026 02:44PM UTC coverage: 85.275% (-4.0%) from 89.306%
24841571249

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

119 existing lines in 18 files now uncovered.

44597 of 52298 relevant lines covered (85.27%)

127591.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.16
/embedded/sql/joint_row_reader.go
1
/*
2
Copyright 2025 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "fmt"
22

23
        "github.com/codenotary/immudb/embedded/multierr"
24
)
25

26
type jointRowReader struct {
27
        rowReader RowReader
28

29
        joins []*JoinSpec
30

31
        rowReaders                 []RowReader
32
        rowReadersValuesByPosition [][]TypedValue
33
        rowReadersValuesBySelector []map[string]TypedValue
34

35
        // outerPoppedDuringRead is set when Read() pops jointr.rowReader off
36
        // the active rowReaders stack at end-of-iteration. We deliberately
37
        // leave the outer reader open until Close() runs: wrapping readers
38
        // such as groupedRowReader.emitCurrentRow → zeroRow → colsBySelector
39
        // reach back into jointr.rowReader after iteration ends to introspect
40
        // the join's column list, and closing the outer mid-iteration trips
41
        // "already closed" on that path for empty-result COUNT(*) over JOIN.
42
        outerPoppedDuringRead bool
43

44
        // hashJoin caches per-join hash-table state. Populated lazily on the
45
        // first outer row reaching each join level. nil entry means hash join
46
        // is not in use for that join (either disabled, unsupported shape, or
47
        // build failed — fall back to the per-row Resolve path).
48
        hashTables      []*hashJoinTable
49
        hashJoinChecked []bool
50
}
51

52
func newJointRowReader(rowReader RowReader, joins []*JoinSpec) (*jointRowReader, error) {
75✔
53
        if rowReader == nil || len(joins) == 0 {
76✔
54
                return nil, ErrIllegalArguments
1✔
55
        }
1✔
56

57
        for _, jspec := range joins {
156✔
58
                switch jspec.joinType {
82✔
59
                case InnerJoin, LeftJoin, CrossJoin, FullOuterJoin:
80✔
60
                default:
2✔
61
                        return nil, ErrUnsupportedJoinType
2✔
62
                }
63
        }
64

65
        return &jointRowReader{
72✔
66
                rowReader:                  rowReader,
72✔
67
                joins:                      joins,
72✔
68
                rowReaders:                 []RowReader{rowReader},
72✔
69
                rowReadersValuesByPosition: make([][]TypedValue, 1+len(joins)),
72✔
70
                rowReadersValuesBySelector: make([]map[string]TypedValue, 1+len(joins)),
72✔
71
                hashTables:                 make([]*hashJoinTable, len(joins)),
72✔
72
                hashJoinChecked:            make([]bool, len(joins)),
72✔
73
        }, nil
72✔
74
}
75

76
func (jointr *jointRowReader) onClose(callback func()) {
36✔
77
        jointr.rowReader.onClose(callback)
36✔
78
}
36✔
79

80
func (jointr *jointRowReader) Tx() *SQLTx {
1,674✔
81
        return jointr.rowReader.Tx()
1,674✔
82
}
1,674✔
83

84
func (jointr *jointRowReader) TableAlias() string {
3,579✔
85
        return jointr.rowReader.TableAlias()
3,579✔
86
}
3,579✔
87

88
func (jointr *jointRowReader) OrderBy() []ColDescriptor {
1✔
89
        return jointr.rowReader.OrderBy()
1✔
90
}
1✔
91

92
func (jointr *jointRowReader) ScanSpecs() *ScanSpecs {
1✔
93
        return jointr.rowReader.ScanSpecs()
1✔
94
}
1✔
95

96
func (jointr *jointRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
25✔
97
        return jointr.colsByPos(ctx)
25✔
98
}
25✔
99

100
func (jointr *jointRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
59✔
101
        colDescriptors, err := jointr.rowReader.colsBySelector(ctx)
59✔
102
        if err != nil {
61✔
103
                return nil, err
2✔
104
        }
2✔
105

106
        jointDescriptors := make(map[string]ColDescriptor, len(colDescriptors))
57✔
107
        for sel, desc := range colDescriptors {
345✔
108
                jointDescriptors[sel] = desc
288✔
109
        }
288✔
110

111
        for _, jspec := range jointr.joins {
119✔
112
                // TODO (byo) optimize this by getting selector list only or opening all joint readers
62✔
113
                //            on jointRowReader creation,
62✔
114
                // Note: We're using a dummy ScanSpec object that is only used during read, we're only interested
62✔
115
                //       in column list though
62✔
116
                rr, err := jspec.ds.Resolve(ctx, jointr.Tx(), nil, &ScanSpecs{Index: &Index{}})
62✔
117
                if err != nil {
63✔
118
                        return nil, err
1✔
119
                }
1✔
120
                defer rr.Close()
61✔
121

61✔
122
                cd, err := rr.colsBySelector(ctx)
61✔
123
                if err != nil {
62✔
124
                        return nil, err
1✔
125
                }
1✔
126

127
                for sel, des := range cd {
280✔
128
                        if _, exists := jointDescriptors[sel]; exists {
221✔
129
                                return nil, fmt.Errorf(
1✔
130
                                        "error resolving '%s' in a join: %w, "+
1✔
131
                                                "use aliasing to assign unique names "+
1✔
132
                                                "for all tables, sub-queries and columns",
1✔
133
                                        sel,
1✔
134
                                        ErrAmbiguousSelector,
1✔
135
                                )
1✔
136
                        }
1✔
137
                        jointDescriptors[sel] = des
219✔
138
                }
139
        }
140
        return jointDescriptors, nil
54✔
141
}
142

143
func (jointr *jointRowReader) colsByPos(ctx context.Context) ([]ColDescriptor, error) {
25✔
144
        colDescriptors, err := jointr.rowReader.Columns(ctx)
25✔
145
        if err != nil {
26✔
146
                return nil, err
1✔
147
        }
1✔
148

149
        for _, jspec := range jointr.joins {
52✔
150

28✔
151
                // TODO (byo) optimize this by getting selector list only or opening all joint readers
28✔
152
                //            on jointRowReader creation,
28✔
153
                // Note: We're using a dummy ScanSpec object that is only used during read, we're only interested
28✔
154
                //       in column list though
28✔
155
                rr, err := jspec.ds.Resolve(ctx, jointr.Tx(), nil, &ScanSpecs{Index: &Index{}})
28✔
156
                if err != nil {
28✔
157
                        return nil, err
×
158
                }
×
159
                defer rr.Close()
28✔
160

28✔
161
                cd, err := rr.Columns(ctx)
28✔
162
                if err != nil {
28✔
163
                        return nil, err
×
164
                }
×
165

166
                colDescriptors = append(colDescriptors, cd...)
28✔
167
        }
168

169
        return colDescriptors, nil
24✔
170
}
171

172
func (jointr *jointRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
8✔
173
        err := jointr.rowReader.InferParameters(ctx, params)
8✔
174
        if err != nil {
9✔
175
                return err
1✔
176
        }
1✔
177

178
        cols, err := jointr.colsBySelector(ctx)
7✔
179
        if err != nil {
8✔
180
                return err
1✔
181
        }
1✔
182

183
        for _, join := range jointr.joins {
12✔
184
                err = join.ds.inferParameters(ctx, jointr.Tx(), params)
6✔
185
                if err != nil {
6✔
186
                        return err
×
187
                }
×
188

189
                _, err = join.cond.inferType(cols, params, jointr.TableAlias())
6✔
190
                if err != nil {
6✔
191
                        return err
×
192
                }
×
193
        }
194
        return err
6✔
195
}
196

197
func (jointr *jointRowReader) Parameters() map[string]interface{} {
1,470✔
198
        return jointr.rowReader.Parameters()
1,470✔
199
}
1,470✔
200

201
func (jointr *jointRowReader) Read(ctx context.Context) (row *Row, err error) {
452✔
202
        for {
1,078✔
203
                row := &Row{
626✔
204
                        ValuesByPosition: make([]TypedValue, 0),
626✔
205
                        ValuesBySelector: make(map[string]TypedValue),
626✔
206
                }
626✔
207

626✔
208
                for len(jointr.rowReaders) > 0 {
1,635✔
209
                        lastReader := jointr.rowReaders[len(jointr.rowReaders)-1]
1,009✔
210

1,009✔
211
                        r, err := lastReader.Read(ctx)
1,009✔
212
                        if err == ErrNoMoreRows {
1,446✔
213
                                // previous reader will need to read next row
437✔
214
                                jointr.rowReaders = jointr.rowReaders[:len(jointr.rowReaders)-1]
437✔
215

437✔
216
                                // Close inner join readers immediately to release their
437✔
217
                                // resources, but leave the outer reader (== jointr.rowReader)
437✔
218
                                // open until jointRowReader.Close runs. Wrapping readers
437✔
219
                                // (groupedRowReader.emitCurrentRow → zeroRow → colsBySelector)
437✔
220
                                // read back the outer's columns after iteration exhausts,
437✔
221
                                // and an early Close there trips "already closed" on the
437✔
222
                                // empty-result COUNT(*) over JOIN path that Gitea's
437✔
223
                                // GetIssueStats hits on the issues page.
437✔
224
                                if lastReader == jointr.rowReader {
485✔
225
                                        jointr.outerPoppedDuringRead = true
48✔
226
                                } else {
437✔
227
                                        err = lastReader.Close()
389✔
228
                                        if err != nil {
389✔
NEW
229
                                                return nil, err
×
NEW
230
                                        }
×
231
                                }
232

233
                                continue
437✔
234
                        }
235
                        if err != nil {
572✔
236
                                return nil, err
×
237
                        }
×
238

239
                        // override row data
240
                        jointr.rowReadersValuesByPosition[len(jointr.rowReaders)-1] = r.ValuesByPosition
572✔
241
                        jointr.rowReadersValuesBySelector[len(jointr.rowReaders)-1] = r.ValuesBySelector
572✔
242

572✔
243
                        break
572✔
244
                }
245

246
                if len(jointr.rowReaders) == 0 {
680✔
247
                        return nil, ErrNoMoreRows
54✔
248
                }
54✔
249

250
                // append values from readers
251
                for i := 0; i < len(jointr.rowReaders); i++ {
1,174✔
252
                        row.ValuesByPosition = append(row.ValuesByPosition, jointr.rowReadersValuesByPosition[i]...)
602✔
253

602✔
254
                        for c, v := range jointr.rowReadersValuesBySelector[i] {
4,504✔
255
                                row.ValuesBySelector[c] = v
3,902✔
256
                        }
3,902✔
257
                }
258

259
                unsolvedFK := false
572✔
260

572✔
261
                for i := len(jointr.rowReaders) - 1; i < len(jointr.joins); i++ {
1,147✔
262
                        jspec := jointr.joins[i]
575✔
263

575✔
264
                        ds := jspec.ds
575✔
265
                        where := jspec.cond.reduceSelectors(row, jointr.TableAlias())
575✔
266

575✔
267
                        // For LATERAL joins, reduce the subquery's internal WHERE with outer row values
575✔
268
                        if jspec.lateral {
575✔
NEW
269
                                if selStmt, ok := ds.(*SelectStmt); ok {
×
NEW
270
                                        lateralDS := *selStmt
×
NEW
271
                                        if lateralDS.where != nil {
×
NEW
272
                                                lateralDS.where = lateralDS.where.reduceSelectors(row, jointr.TableAlias())
×
NEW
273
                                        }
×
NEW
274
                                        ds = &lateralDS
×
275
                                }
276
                        }
277

278
                        // Hash-join fast path: for non-correlated INNER equi-joins, build the
279
                        // inner hash table once on first outer row, then probe per outer row
280
                        // instead of opening a fresh inner scan. Falls back transparently to
281
                        // the per-row Resolve path on any unsupported shape, build error, or
282
                        // if jointr.tryHashProbe returns false.
283
                        if probed, reader, r, err := jointr.tryHashProbe(ctx, i, jspec, ds, where); err != nil {
575✔
NEW
284
                                return nil, err
×
285
                        } else if probed {
1,112✔
286
                                if r == nil {
711✔
287
                                        // Hash table existed but no inner row matched this outer.
174✔
288
                                        // INNER JOIN: backtrack so the outer reader advances.
174✔
289
                                        unsolvedFK = true
174✔
290
                                        break
174✔
291
                                }
292

293
                                jointr.rowReaders = append(jointr.rowReaders, reader)
363✔
294
                                jointr.rowReadersValuesByPosition[i+1] = r.ValuesByPosition
363✔
295
                                jointr.rowReadersValuesBySelector[i+1] = r.ValuesBySelector
363✔
296

363✔
297
                                row.ValuesByPosition = append(row.ValuesByPosition, r.ValuesByPosition...)
363✔
298
                                for c, v := range r.ValuesBySelector {
1,873✔
299
                                        row.ValuesBySelector[c] = v
1,510✔
300
                                }
1,510✔
301
                                continue
363✔
302
                        }
303

304
                        jointq := &SelectStmt{
38✔
305
                                ds:      ds,
38✔
306
                                where:   where,
38✔
307
                                indexOn: jspec.indexOn,
38✔
308
                        }
38✔
309

38✔
310
                        reader, err := jointq.Resolve(ctx, jointr.Tx(), jointr.Parameters(), nil)
38✔
311
                        if err != nil {
39✔
312
                                return nil, err
1✔
313
                        }
1✔
314

315
                        r, err := reader.Read(ctx)
37✔
316
                        if err == ErrNoMoreRows {
38✔
317
                                if jspec.joinType == InnerJoin {
1✔
UNCOV
318
                                        // previous reader will need to read next row
×
UNCOV
319
                                        unsolvedFK = true
×
UNCOV
320

×
UNCOV
321
                                        err = reader.Close()
×
UNCOV
322
                                        if err != nil {
×
323
                                                return nil, err
×
324
                                        }
×
325

UNCOV
326
                                        break
×
327
                                } else { // LEFT JOIN: fill column values with NULLs
1✔
328
                                        cols, err := reader.Columns(ctx)
1✔
329
                                        if err != nil {
1✔
330
                                                return nil, err
×
331
                                        }
×
332

333
                                        r = &Row{
1✔
334
                                                ValuesByPosition: make([]TypedValue, len(cols)),
1✔
335
                                                ValuesBySelector: make(map[string]TypedValue, len(cols)),
1✔
336
                                        }
1✔
337

1✔
338
                                        for i, col := range cols {
2✔
339
                                                nullValue := NewNull(col.Type)
1✔
340

1✔
341
                                                r.ValuesByPosition[i] = nullValue
1✔
342
                                                r.ValuesBySelector[col.Selector()] = nullValue
1✔
343
                                        }
1✔
344
                                }
345
                        } else if err != nil {
36✔
346
                                reader.Close()
×
347
                                return nil, err
×
348
                        }
×
349

350
                        // progress with the joint readers
351
                        // append the reader and kept the values for following rows
352
                        jointr.rowReaders = append(jointr.rowReaders, reader)
37✔
353
                        jointr.rowReadersValuesByPosition[i+1] = r.ValuesByPosition
37✔
354
                        jointr.rowReadersValuesBySelector[i+1] = r.ValuesBySelector
37✔
355

37✔
356
                        row.ValuesByPosition = append(row.ValuesByPosition, r.ValuesByPosition...)
37✔
357

37✔
358
                        for c, v := range r.ValuesBySelector {
134✔
359
                                row.ValuesBySelector[c] = v
97✔
360
                        }
97✔
361
                }
362

363
                // all readers have a valid read
364
                if !unsolvedFK {
968✔
365
                        return row, nil
397✔
366
                }
397✔
367
        }
368
}
369

370
// tryHashProbe attempts the hash-join fast path for join index i with the
371
// already-reduced join condition for the current outer row. It returns
372
// (probed=true, reader, row, nil) when a hashProbeReader was built and a
373
// first row pulled, (probed=true, nil, nil, nil) when the outer row has no
374
// matching inner rows AND the join is INNER (caller backtracks), and
375
// (probed=false, nil, nil, nil) when the join is not eligible (LATERAL,
376
// NATURAL, FULL OUTER, non-equi cond, etc.) so the caller falls back to
377
// the existing Resolve-per-row path.
378
//
379
// LEFT JOIN: when matched is empty, synthesises a single NULL-extended row
380
// for the inner side (mirrors the slow path's NULL-fill behaviour at
381
// joint_row_reader.go:327).
382
//
383
// The hash table is built once per join level on first eligible call and
384
// cached on jointr.hashTables[i]. Build errors degrade to the slow path
385
// without surfacing — correctness must always win over the optimization.
386
//
387
// When the join cond was enriched by D7 with inner-only residual predicates,
388
// extractEquiJoinPlan separates them out and they are applied as a WHERE
389
// filter at hash-build time so D1 + D7 compose correctly.
390
func (jointr *jointRowReader) tryHashProbe(
391
        ctx context.Context,
392
        i int,
393
        jspec *JoinSpec,
394
        ds DataSource,
395
        reducedWhere ValueExp,
396
) (probed bool, reader RowReader, r *Row, err error) {
575✔
397
        if jspec.lateral || jspec.natural {
579✔
398
                return false, nil, nil, nil
4✔
399
        }
4✔
400
        if jspec.joinType != InnerJoin && jspec.joinType != LeftJoin {
578✔
401
                return false, nil, nil, nil
7✔
402
        }
7✔
403

404
        // We need the inner table's alias to classify ColSelector references in
405
        // extractEquiJoinPlan. Currently we only support simple *tableRef
406
        // sources; subqueries / values rows fall back to the slow path.
407
        tref, ok := ds.(*tableRef)
564✔
408
        if !ok {
565✔
409
                return false, nil, nil, nil
1✔
410
        }
1✔
411
        innerAlias := tref.Alias()
563✔
412

563✔
413
        if !jointr.hashJoinChecked[i] {
618✔
414
                jointr.hashJoinChecked[i] = true
55✔
415

55✔
416
                _, innerSels, innerResidual, planOk := extractEquiJoinPlan(reducedWhere, innerAlias)
55✔
417
                if !planOk {
61✔
418
                        return false, nil, nil, nil
6✔
419
                }
6✔
420

421
                ht, buildErr := buildJoinHashTable(ctx, jointr.Tx(), ds, innerSels, innerResidual, jointr.Parameters())
49✔
422
                if buildErr != nil {
50✔
423
                        // Build failure is non-fatal; fall back to the per-row path.
1✔
424
                        return false, nil, nil, nil
1✔
425
                }
1✔
426
                jointr.hashTables[i] = ht
48✔
427
        }
428

429
        ht := jointr.hashTables[i]
556✔
430
        if ht == nil {
575✔
431
                return false, nil, nil, nil
19✔
432
        }
19✔
433

434
        outerVals, _, _, planOk := extractEquiJoinPlan(reducedWhere, innerAlias)
537✔
435
        if !planOk {
537✔
NEW
436
                // Should not happen if first call succeeded, but stay defensive.
×
NEW
437
                return false, nil, nil, nil
×
NEW
438
        }
×
439

440
        matched := ht.rows[compositeHashKey(outerVals)]
537✔
441
        if len(matched) == 0 {
715✔
442
                if jspec.joinType != LeftJoin {
352✔
443
                        return true, nil, nil, nil
174✔
444
                }
174✔
445
                // LEFT JOIN with no inner match: synthesise a single NULL-extended
446
                // row over the inner column set. The probe reader returns it once,
447
                // then ErrNoMoreRows on the next Read (popping cleanly so the
448
                // outer reader advances to the next row).
449
                nullRow := &Row{
4✔
450
                        ValuesByPosition: make([]TypedValue, len(ht.cols)),
4✔
451
                        ValuesBySelector: make(map[string]TypedValue, len(ht.cols)),
4✔
452
                }
4✔
453
                for k, col := range ht.cols {
41✔
454
                        nullValue := NewNull(col.Type)
37✔
455
                        nullRow.ValuesByPosition[k] = nullValue
37✔
456
                        nullRow.ValuesBySelector[col.Selector()] = nullValue
37✔
457
                }
37✔
458
                matched = []*Row{nullRow}
4✔
459
        }
460

461
        probe := newHashProbeReader(matched, ht.cols, jointr.Tx(), jointr.Parameters())
363✔
462

363✔
463
        first, readErr := probe.Read(ctx)
363✔
464
        if readErr != nil {
363✔
NEW
465
                probe.Close()
×
NEW
466
                return false, nil, nil, readErr
×
NEW
467
        }
×
468
        return true, probe, first, nil
363✔
469
}
470

471
func (jointr *jointRowReader) Close() error {
66✔
472
        merr := multierr.NewMultiErr()
66✔
473

66✔
474
        // Closing joint readers backwards - the first reader executes the onClose callback
66✔
475
        // thus it must be closed at the end
66✔
476
        for i := len(jointr.rowReaders) - 1; i >= 0; i-- {
95✔
477
                err := jointr.rowReaders[i].Close()
29✔
478
                merr.Append(err)
29✔
479
        }
29✔
480

481
        // Read() leaves jointr.rowReader open (not Close'd) when popping the
482
        // outermost reader from the active stack so wrapping readers can
483
        // still introspect its columns post-exhaustion. Close it here so
484
        // the tx state is released.
485
        if jointr.outerPoppedDuringRead {
114✔
486
                merr.Append(jointr.rowReader.Close())
48✔
487
        }
48✔
488

489
        return merr.Reduce()
66✔
490
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc