• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841571249

23 Apr 2026 02:44PM UTC coverage: 85.275% (-4.0%) from 89.306%
24841571249

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

119 existing lines in 18 files now uncovered.

44597 of 52298 relevant lines covered (85.27%)

127591.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.0
/pkg/pgsql/server/server.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "context"
21
        "crypto/tls"
22
        "errors"
23
        "fmt"
24
        "net"
25
        "os"
26
        "sync"
27
        "sync/atomic"
28

29
        "github.com/codenotary/immudb/embedded/logger"
30
        "github.com/codenotary/immudb/pkg/database"
31
        "github.com/codenotary/immudb/pkg/server/sessions"
32

33
        // Register pg_catalog system tables (pg_namespace, pg_am, pg_class,
34
        // pg_attribute, pg_index). Side-effect import — the init() chain in
35
        // pkg/pgsql/sys calls sql.RegisterSystemTable. Without this import
36
        // the registry stays at just pg_type (the legacy entry registered
37
        // directly by embedded/sql).
38
        _ "github.com/codenotary/immudb/pkg/pgsql/sys"
39

40
        "golang.org/x/net/netutil"
41
)
42

43
type pgsrv struct {
44
        m sync.RWMutex
45
        // running is read without taking m on the Accept hot loop, so it's kept
46
        // as an atomic.Bool. Stop writes via Store(false); Serve reads via Load.
47
        running            atomic.Bool
48
        maxConnections     int
49
        tlsConfig          *tls.Config
50
        logger             logger.Logger
51
        logRequestMetadata bool
52
        host               string
53
        port               int
54
        immudbPort         int
55
        dbList             database.DatabaseList
56
        sessManager        sessions.Manager
57
        listener           net.Listener
58
}
59

60
// PGSQLServer is the lifecycle contract of the pgsql wire-protocol server.
type PGSQLServer interface {
	// Initialize creates the network listener.
	Initialize() error
	// Serve runs the accept loop on the listener created by Initialize.
	Serve() error
	// Stop marks the server as not running and closes the listener.
	Stop() error
	// GetPort reports the port the listener is bound to, or 0 when there
	// is no listener.
	GetPort() int
}
66

67
func New(setters ...Option) *pgsrv {
93✔
68
        // Default Options
93✔
69
        srv := &pgsrv{
93✔
70
                maxConnections: 1000,
93✔
71
                tlsConfig:      &tls.Config{},
93✔
72
                logger:         logger.NewSimpleLogger("pgsqlSrv", os.Stderr),
93✔
73
                host:           "0.0.0.0",
93✔
74
                immudbPort:     3322,
93✔
75
                port:           5432,
93✔
76
        }
93✔
77
        srv.running.Store(true)
93✔
78

93✔
79
        for _, setter := range setters {
837✔
80
                setter(srv)
744✔
81
        }
744✔
82

83
        return srv
93✔
84
}
85

86
// Initialize initialize listener. If provided port is zero os auto assign a free one.
87
func (s *pgsrv) Initialize() (err error) {
94✔
88
        s.listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", s.host, s.port))
94✔
89
        if err != nil {
95✔
90
                return err
1✔
91
        }
1✔
92
        return nil
93✔
93
}
94

95
func (s *pgsrv) Serve() (err error) {
94✔
96
        s.m.Lock()
94✔
97
        if s.listener == nil {
95✔
98
                s.m.Unlock()
1✔
99
                // Previously returned without unlocking, leaking the mutex and
1✔
100
                // deadlocking any subsequent Stop/GetPort call.
1✔
101
                return errors.New("no listener found for pgsql server")
1✔
102
        }
1✔
103
        s.listener = netutil.LimitListener(s.listener, s.maxConnections)
93✔
104
        listener := s.listener
93✔
105
        s.m.Unlock()
93✔
106

93✔
107
        if s.tlsConfig == nil || len(s.tlsConfig.Certificates) == 0 {
185✔
108
                s.logger.Warningf("pgsql server is running WITHOUT TLS. " +
92✔
109
                        "Client passwords will be transmitted in cleartext. " +
92✔
110
                        "Configure TLS or disable the pgsql server (--pgsql-server=false) in production.")
92✔
111
        }
92✔
112

113
        for {
282✔
114
                if !s.running.Load() {
189✔
UNCOV
115
                        return nil
×
116
                }
×
117

118
                conn, err := listener.Accept()
189✔
119
                if err != nil {
189✔
NEW
120
                        // Stop() closes the listener, which is the expected path out of
×
NEW
121
                        // Accept. Report an error only if we were still supposed to be
×
NEW
122
                        // running — otherwise the error is simply the consequence of Stop.
×
NEW
123
                        if !s.running.Load() {
×
NEW
124
                                return nil
×
NEW
125
                        }
×
UNCOV
126
                        s.logger.Errorf("%v", err)
×
NEW
127
                        continue
×
128
                }
129
                go s.handleRequest(context.Background(), conn)
96✔
130
        }
131
}
132

133
func (s *pgsrv) newSession(conn net.Conn) Session {
96✔
134
        return newSession(conn, s.host, s.immudbPort, s.logger, s.tlsConfig, s.logRequestMetadata, s.dbList, s.sessManager)
96✔
135
}
96✔
136

137
func (s *pgsrv) Stop() (err error) {
1✔
138
        s.running.Store(false)
1✔
139

1✔
140
        s.m.Lock()
1✔
141
        defer s.m.Unlock()
1✔
142

1✔
143
        if s.listener != nil {
1✔
UNCOV
144
                return s.listener.Close()
×
145
        }
×
146

147
        return nil
1✔
148
}
149

150
func (s *pgsrv) GetPort() int {
94✔
151
        s.m.RLock()
94✔
152
        defer s.m.RUnlock()
94✔
153

94✔
154
        if s.listener != nil {
187✔
155
                return s.listener.Addr().(*net.TCPAddr).Port
93✔
156
        }
93✔
157

158
        return 0
1✔
159
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc