• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841571249

23 Apr 2026 02:44PM UTC coverage: 85.275% (-4.0%) from 89.306%
24841571249

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

119 existing lines in 18 files now uncovered.

44597 of 52298 relevant lines covered (85.27%)

127591.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.68
/pkg/pgsql/server/initialize_session.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "bufio"
21
        "bytes"
22
        "context"
23
        "crypto/tls"
24
        "encoding/binary"
25
        "errors"
26
        "fmt"
27

28
        "github.com/codenotary/immudb/pkg/client"
29
        "github.com/codenotary/immudb/pkg/database"
30
        pserr "github.com/codenotary/immudb/pkg/pgsql/errors"
31
        bm "github.com/codenotary/immudb/pkg/pgsql/server/bmessages"
32
        fm "github.com/codenotary/immudb/pkg/pgsql/server/fmessages"
33
        "github.com/codenotary/immudb/pkg/pgsql/server/pgmeta"
34
        "google.golang.org/grpc"
35
        "google.golang.org/grpc/credentials"
36
        "google.golang.org/grpc/credentials/insecure"
37
        "google.golang.org/grpc/metadata"
38
)
39

40
// InitializeSession
41
func (s *session) InitializeSession() (err error) {
96✔
42
        defer func() {
192✔
43
                if err != nil {
98✔
44
                        s.HandleError(err)
2✔
45
                        s.mr.CloseConnection()
2✔
46
                }
2✔
47
        }()
48

49
        lb := make([]byte, 4)
96✔
50
        if _, err := s.mr.Read(lb); err != nil {
96✔
51
                return err
×
52
        }
×
53
        pvb := make([]byte, 4)
96✔
54
        if _, err := s.mr.Read(pvb); err != nil {
96✔
55
                return err
×
56
        }
×
57

58
        s.protocolVersion = parseProtocolVersion(pvb)
96✔
59

96✔
60
        // SSL Request packet
96✔
61
        if s.protocolVersion == pgmeta.PgsqlSSLRequestProtocolVersion {
98✔
62
                if s.tlsConfig == nil || len(s.tlsConfig.Certificates) == 0 {
3✔
63
                        // Respond with 'N' (no SSL) and continue — the client will
1✔
64
                        // retry with a plaintext startup message per PG protocol spec.
1✔
65
                        if _, err = s.writeMessage([]byte(`N`)); err != nil {
1✔
66
                                return err
×
67
                        }
×
68
                } else {
1✔
69
                        if _, err = s.writeMessage([]byte(`S`)); err != nil {
1✔
NEW
70
                                return err
×
NEW
71
                        }
×
72

73
                        if err = s.handshake(); err != nil {
1✔
NEW
74
                                return err
×
NEW
75
                        }
×
76
                }
77

78
                // Read the next startup message (plaintext retry or post-TLS startup)
79
                lb = make([]byte, 4)
2✔
80
                if _, err := s.mr.Read(lb); err != nil {
3✔
81
                        return err
1✔
82
                }
1✔
83
                pvb = make([]byte, 4)
1✔
84
                if _, err := s.mr.Read(pvb); err != nil {
1✔
85
                        return err
×
86
                }
×
87

88
                s.protocolVersion = parseProtocolVersion(pvb)
1✔
89
        }
90

91
        if !isValidProtocolVersion(s.protocolVersion) {
96✔
92
                return fmt.Errorf("%w: %s", pgmeta.ErrInvalidPgsqlProtocolVersion, s.protocolVersion)
1✔
93
        }
1✔
94

95
        // startup message
96
        connStringLenght := int(binary.BigEndian.Uint32(lb) - 8)
94✔
97
        if connStringLenght < 0 {
94✔
98
                return pserr.ErrMalformedMessage
×
99
        }
×
100

101
        if connStringLenght > pgmeta.MaxMsgSize {
94✔
102
                return pserr.ErrMessageTooLarge
×
103
        }
×
104

105
        connString := make([]byte, connStringLenght)
94✔
106

94✔
107
        if _, err := s.mr.Read(connString); err != nil {
94✔
108
                return err
×
109
        }
×
110

111
        pr := bufio.NewScanner(bytes.NewBuffer(connString))
94✔
112

94✔
113
        split := func(data []byte, atEOF bool) (int, []byte, error) {
1,014✔
114
                if atEOF && len(data) == 0 {
1,108✔
115
                        return 0, nil, nil
188✔
116
                }
188✔
117
                if i := bytes.IndexByte(data, 0); i >= 0 {
1,464✔
118
                        return i + 1, data[0:i], nil
732✔
119
                }
732✔
120
                if atEOF {
×
121
                        return len(data), data, nil
×
122
                }
×
123
                return 0, nil, nil
×
124
        }
125

126
        pr.Split(split)
94✔
127

94✔
128
        pmap := make(map[string]string)
94✔
129

94✔
130
        for pr.Scan() {
507✔
131
                key := pr.Text()
413✔
132
                for pr.Scan() {
732✔
133
                        value := pr.Text()
319✔
134
                        if value != "" {
638✔
135
                                pmap[key] = value
319✔
136
                        }
319✔
137
                        break
319✔
138
                }
139
        }
140

141
        s.connParams = pmap
94✔
142

94✔
143
        return nil
94✔
144
}
145

146
// HandleStartup errors are returned and handled in the caller
147
func (s *session) HandleStartup(ctx context.Context) (err error) {
94✔
148
        defer func() {
188✔
149
                if err != nil {
99✔
150
                        s.HandleError(err)
5✔
151
                        s.mr.CloseConnection()
5✔
152
                }
5✔
153
        }()
154

155
        user, ok := s.connParams["user"]
94✔
156
        if !ok || user == "" {
94✔
157
                return pserr.ErrUsernameNotprovided
×
158
        }
×
159
        s.user = user
94✔
160

94✔
161
        db, ok := s.connParams["database"]
94✔
162
        if !ok {
95✔
163
                return pserr.ErrDBNotprovided
1✔
164
        }
1✔
165

166
        s.db, err = s.dbList.GetByName(db)
93✔
167
        if err != nil {
94✔
168
                if errors.Is(err, database.ErrDatabaseNotExists) {
2✔
169
                        return pserr.ErrDBNotExists
1✔
170
                }
1✔
171
                return err
×
172
        }
173

174
        if _, err = s.writeMessage(bm.AuthenticationCleartextPassword()); err != nil {
92✔
175
                return err
×
176
        }
×
177

178
        msg, _, err := s.nextMessage()
92✔
179
        if err != nil {
92✔
180
                return err
×
181
        }
×
182

183
        pw, ok := msg.(fm.PasswordMsg)
92✔
184
        if !ok || pw.GetSecret() == "" {
94✔
185
                return pserr.ErrPwNotprovided
2✔
186
        }
2✔
187

188
        var transportCredentials credentials.TransportCredentials
90✔
189

90✔
190
        if s.tlsConfig == nil || s.tlsConfig.RootCAs == nil {
179✔
191
                transportCredentials = insecure.NewCredentials()
89✔
192
        } else {
90✔
193
                config := &tls.Config{
1✔
194
                        RootCAs: s.tlsConfig.RootCAs,
1✔
195
                }
1✔
196

1✔
197
                transportCredentials = credentials.NewTLS(config)
1✔
198
        }
1✔
199

200
        opts := client.DefaultOptions().
90✔
201
                WithAddress(s.immudbHost).
90✔
202
                WithPort(s.immudbPort).
90✔
203
                WithDisableIdentityCheck(true).
90✔
204
                WithDialOptions([]grpc.DialOption{grpc.WithTransportCredentials(transportCredentials)})
90✔
205

90✔
206
        s.client = client.NewClient().WithOptions(opts)
90✔
207

90✔
208
        err = s.client.OpenSession(ctx, []byte(user), []byte(pw.GetSecret()), db)
90✔
209
        if err != nil {
91✔
210
                return err
1✔
211
        }
1✔
212

213
        s.client.CurrentState(context.Background())
89✔
214

89✔
215
        sessionID := s.client.GetSessionID()
89✔
216
        s.ctx = metadata.NewIncomingContext(context.Background(), metadata.Pairs("sessionid", sessionID))
89✔
217

89✔
218
        s.log.Debugf("authentication successful for %s", user)
89✔
219
        if _, err := s.writeMessage(bm.AuthenticationOk()); err != nil {
89✔
220
                return err
×
221
        }
×
222

223
        if _, err := s.writeMessage(bm.ParameterStatus([]byte("standard_conforming_strings"), []byte("on"))); err != nil {
89✔
224
                return err
×
225
        }
×
226

227
        if _, err := s.writeMessage(bm.ParameterStatus([]byte("client_encoding"), []byte("UTF8"))); err != nil {
89✔
228
                return err
×
229
        }
×
230

231
        // server_version is needed by jdbc driver and pgAdmin
232
        if _, err := s.writeMessage(bm.ParameterStatus([]byte("server_version"), []byte(pgmeta.PgsqlServerVersion))); err != nil {
89✔
233
                return err
×
234
        }
×
235

236
        // server_version_num is required by pgAdmin to determine feature support
237
        if _, err := s.writeMessage(bm.ParameterStatus([]byte("server_version_num"), []byte(pgmeta.PgsqlServerVersionNum))); err != nil {
89✔
NEW
238
                return err
×
NEW
239
        }
×
240

241
        if _, err := s.writeMessage(bm.ParameterStatus([]byte("server_encoding"), []byte("UTF8"))); err != nil {
89✔
NEW
242
                return err
×
NEW
243
        }
×
244

245
        if _, err := s.writeMessage(bm.ParameterStatus([]byte("integer_datetimes"), []byte("on"))); err != nil {
89✔
NEW
246
                return err
×
NEW
247
        }
×
248

249
        if _, err := s.writeMessage(bm.ParameterStatus([]byte("DateStyle"), []byte("ISO, MDY"))); err != nil {
89✔
NEW
250
                return err
×
NEW
251
        }
×
252

253
        return nil
89✔
254
}
255

256
func parseProtocolVersion(payload []byte) string {
97✔
257
        major := int(binary.BigEndian.Uint16(payload[0:2]))
97✔
258
        minor := int(binary.BigEndian.Uint16(payload[2:4]))
97✔
259
        return fmt.Sprintf("%d.%d", major, minor)
97✔
260
}
97✔
261

262
func isValidProtocolVersion(version string) bool {
95✔
263
        return version == pgmeta.PgsqlProtocolVersion || version == pgmeta.PgsqlSSLRequestProtocolVersion
95✔
264
}
95✔
265

266
func (s *session) Close() error {
80✔
267
        s.mr.CloseConnection()
80✔
268

80✔
269
        if s.client != nil {
154✔
270
                return s.client.CloseSession(s.ctx)
74✔
271
        }
74✔
272

273
        return nil
6✔
274
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc