• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841571249

23 Apr 2026 02:44PM UTC coverage: 85.275% (-4.0%) from 89.306%
24841571249

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

119 existing lines in 18 files now uncovered.

44597 of 52298 relevant lines covered (85.27%)

127591.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.66
/pkg/pgsql/server/session.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "context"
21
        "crypto/tls"
22
        "strings"
23

24
        "net"
25

26
        "github.com/codenotary/immudb/embedded/logger"
27
        "github.com/codenotary/immudb/embedded/sql"
28
        "github.com/codenotary/immudb/pkg/api/schema"
29
        "github.com/codenotary/immudb/pkg/client"
30
        "github.com/codenotary/immudb/pkg/database"
31
        "github.com/codenotary/immudb/pkg/pgsql/errors"
32
        "github.com/codenotary/immudb/pkg/server/sessions"
33
        bm "github.com/codenotary/immudb/pkg/pgsql/server/bmessages"
34
        fm "github.com/codenotary/immudb/pkg/pgsql/server/fmessages"
35
        "github.com/codenotary/immudb/pkg/pgsql/server/pgmeta"
36
)
37

38
// stmtCacheSize caps how many parsed statement lists a single session keeps
// in its statement cache. Once the cap is reached, the entry that was
// inserted first is dropped to make room (FIFO eviction).
const stmtCacheSize = 64
41

42
type session struct {
43
        immudbHost         string
44
        immudbPort         int
45
        tlsConfig          *tls.Config
46
        log                logger.Logger
47
        logRequestMetadata bool
48

49
        dbList      database.DatabaseList
50
        sessManager sessions.Manager
51

52
        client client.ImmuClient
53

54
        ctx    context.Context
55
        user   string
56
        ipAddr string
57
        db     database.DB
58
        tx     *sql.SQLTx
59

60
        mr MessageReader
61

62
        // txStatus is the byte we report in the next ReadyForQuery message.
63
        // 'I' (idle) outside an explicit transaction; 'T' inside; 'E' inside
64
        // a transaction that's been aborted by an error. Clients (pq, JDBC,
65
        // XORM) inspect this to gate their commit/rollback logic — emitting
66
        // 'I' after a successful BEGIN trips
67
        //   "unexpected transaction status idle".
68
        // Default zero-value is 0 not 'I', so always init via session ctor.
69
        txStatus byte
70

71
        connParams      map[string]string
72
        protocolVersion string
73

74
        statements map[string]*statement
75
        portals    map[string]*portal
76

77
        // stmtCache is a bounded per-session cache of parsed SQL statement lists,
78
        // keyed on the post-normalization SQL string (after removePGCatalogReferences).
79
        // Avoids repeated lex+parse overhead for queries repeated by dashboards or ORMs.
80
        // Eviction is FIFO; stmtCacheKeys tracks insertion order for eviction.
81
        stmtCache     map[string][]sql.SQLStmt
82
        stmtCacheKeys []string
83
}
84

85
// Session is the exported contract of a pgsql client session.
type Session interface {
	// InitializeSession prepares the session and reports any failure.
	InitializeSession() error
	// HandleStartup runs the startup phase using the supplied context.
	HandleStartup(context.Context) error
	// QueryMachine processes client messages and returns the error that
	// ended processing, if any.
	QueryMachine() error
	// HandleError reports the given error to the client.
	HandleError(error)
	// Close releases the session.
	Close() error
}
92

93
func newSession(
94
        c net.Conn,
95
        immudbHost string,
96
        immudbPort int,
97
        log logger.Logger,
98
        tlsConfig *tls.Config,
99
        logRequestMetadata bool,
100
        dbList database.DatabaseList,
101
        sessManager sessions.Manager,
102
) *session {
96✔
103
        addr := c.RemoteAddr().String()
96✔
104
        i := strings.Index(addr, ":")
96✔
105
        if i >= 0 {
192✔
106
                addr = addr[:i]
96✔
107
        }
96✔
108

109
        return &session{
96✔
110
                immudbHost:         immudbHost,
96✔
111
                immudbPort:         immudbPort,
96✔
112
                tlsConfig:          tlsConfig,
96✔
113
                log:                log,
96✔
114
                logRequestMetadata: logRequestMetadata,
96✔
115
                dbList:             dbList,
96✔
116
                sessManager:        sessManager,
96✔
117
                ipAddr:             addr,
96✔
118
                mr:                 NewMessageReader(c),
96✔
119
                statements:         make(map[string]*statement),
96✔
120
                portals:            make(map[string]*portal),
96✔
121
                stmtCache:          make(map[string][]sql.SQLStmt, stmtCacheSize),
96✔
122
                stmtCacheKeys:      make([]string, 0, stmtCacheSize),
96✔
123
                txStatus:           bm.TxStatusIdle,
96✔
124
        }
96✔
125
}
126

127
func (s *session) HandleError(e error) {
31✔
128
        pgerr := errors.MapPgError(e)
31✔
129

31✔
130
        _, err := s.writeMessage(pgerr.Encode())
31✔
131
        if err != nil {
44✔
132
                // "broken pipe" / "use of closed network connection" — the
13✔
133
                // client gave up while we were preparing the ErrorResponse
13✔
134
                // (common in pq's connection-rotation paths). Not a server
13✔
135
                // fault; demote so it doesn't pollute logs.
13✔
136
                s.log.Debugf("unable to write error on wire: %v", err)
13✔
137
        }
13✔
138
}
139

140
func (s *session) nextMessage() (interface{}, bool, error) {
1,380✔
141
        msg, err := s.mr.ReadRawMessage()
1,380✔
142
        if err != nil {
1,403✔
143
                return nil, false, err
23✔
144
        }
23✔
145

146
        s.log.Debugf("received %s - %s message", string(msg.t), pgmeta.MTypes[msg.t])
1,341✔
147

1,341✔
148
        extQueryMode := false
1,341✔
149

1,341✔
150
        i, err := s.parseRawMessage(msg)
1,341✔
151
        if msg.t == 'P' ||
1,341✔
152
                msg.t == 'B' ||
1,341✔
153
                msg.t == 'D' ||
1,341✔
154
                msg.t == 'E' ||
1,341✔
155
                msg.t == 'H' {
1,983✔
156
                extQueryMode = true
642✔
157
        }
642✔
158

159
        return i, extQueryMode, err
1,341✔
160
}
161

162
func (s *session) parseRawMessage(msg *rawMessage) (interface{}, error) {
1,341✔
163
        switch msg.t {
1,341✔
164
        case 'p':
92✔
165
                return fm.ParsePasswordMsg(msg.payload)
92✔
166
        case 'Q':
223✔
167
                return fm.ParseQueryMsg(msg.payload)
223✔
168
        case 'X':
76✔
169
                return fm.ParseTerminateMsg(msg.payload)
76✔
170
        case 'P':
164✔
171
                return fm.ParseParseMsg(msg.payload)
164✔
172
        case 'B':
164✔
173
                return fm.ParseBindMsg(msg.payload)
164✔
174
        case 'D':
155✔
175
                return fm.ParseDescribeMsg(msg.payload)
155✔
176
        case 'S':
308✔
177
                return fm.ParseSyncMsg(msg.payload)
308✔
178
        case 'E':
158✔
179
                return fm.ParseExecuteMsg(msg.payload)
158✔
180
        case 'H':
1✔
181
                return fm.ParseFlushMsg(msg.payload)
1✔
NEW
182
        case 'd':
×
NEW
183
                return fm.ParseCopyDataMsg(msg.payload)
×
NEW
184
        case 'c':
×
NEW
185
                return fm.ParseCopyDoneMsg(msg.payload)
×
NEW
186
        case 'f':
×
NEW
187
                return fm.ParseCopyFailMsg(msg.payload)
×
188
        default:
×
189
                return nil, errors.ErrUnknowMessageType
×
190
        }
191
}
192

193
func (s *session) writeMessage(msg []byte) (int, error) {
2,706✔
194
        if len(msg) > 0 {
5,412✔
195
                s.log.Debugf("write %s - %s message", string(msg[0]), pgmeta.MTypes[msg[0]])
2,706✔
196
        }
2,706✔
197

198
        return s.mr.Write(msg)
2,706✔
199
}
200

201
// refreshSessionActivity updates the server-side session activity timestamp
202
// to prevent the session guard from killing long-running operations like COPY.
NEW
203
func (s *session) refreshSessionActivity() {
×
NEW
204
        if s.sessManager != nil && s.client != nil {
×
NEW
205
                sessionID := s.client.GetSessionID()
×
NEW
206
                if sessionID != "" {
×
NEW
207
                        s.sessManager.UpdateSessionActivityTime(sessionID)
×
NEW
208
                        s.log.Infof("pgcompat: refreshed session activity for %s", sessionID)
×
NEW
209
                }
×
NEW
210
        } else {
×
NEW
211
                s.log.Warningf("pgcompat: cannot refresh session: sessManager=%v client=%v", s.sessManager != nil, s.client != nil)
×
NEW
212
        }
×
213
}
214

215
func (s *session) sqlTx() (*sql.SQLTx, error) {
456✔
216
        if s.tx != nil || !s.logRequestMetadata {
912✔
217
                return s.tx, nil
456✔
218
        }
456✔
219

220
        md := schema.Metadata{
×
221
                schema.UserRequestMetadataKey: s.user,
×
222
                schema.IpRequestMetadataKey:   s.ipAddr,
×
223
        }
×
224

×
225
        // create transaction explicitly to inject request metadata
×
226
        ctx := schema.ContextWithMetadata(s.ctx, md)
×
227
        return s.db.NewSQLTx(ctx, sql.DefaultTxOptions())
×
228
}
229

230
// useDatabase rebinds this pgsql session to a different immudb database
231
// without forcing the client to reconnect. Permission enforcement is
232
// delegated to the embedded gRPC client (s.client.UseDatabase), which
233
// runs the same checks as the initial connect: the logged-in user must
234
// have Admin / R / RW on the target database, otherwise the gRPC layer
235
// returns PermissionDenied and we surface that to the pgsql client.
236
//
237
// Any in-flight SQL transaction is rolled back, and prepared statements
238
// and portals are dropped, since their plans reference the old catalog
239
// and would silently bind to wrong tables on the new database.
240
func (s *session) useDatabase(name string) error {
3✔
241
        dbHandle, err := s.dbList.GetByName(name)
3✔
242
        if err != nil {
4✔
243
                return err
1✔
244
        }
1✔
245

246
        if s.client != nil {
4✔
247
                if _, err := s.client.UseDatabase(s.ctx, &schema.Database{DatabaseName: name}); err != nil {
2✔
NEW
248
                        return err
×
NEW
249
                }
×
250
        }
251

252
        if s.tx != nil {
2✔
NEW
253
                _ = s.tx.Cancel()
×
NEW
254
                s.tx = nil
×
NEW
255
        }
×
256
        for k := range s.statements {
2✔
NEW
257
                delete(s.statements, k)
×
NEW
258
        }
×
259
        for k := range s.portals {
2✔
NEW
260
                delete(s.portals, k)
×
NEW
261
        }
×
262

263
        s.db = dbHandle
2✔
264
        s.log.Infof("pgcompat: session for user %q switched to database %q", s.user, name)
2✔
265
        return nil
2✔
266
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc