• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841644892

23 Apr 2026 02:44PM UTC coverage: 85.279% (-4.0%) from 89.306%
24841644892

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

115 existing lines in 18 files now uncovered.

44599 of 52298 relevant lines covered (85.28%)

127676.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.07
/pkg/pgsql/server/copy_handler.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
	"bytes"
	"fmt"
	"regexp"
	"strings"
	"time"

	"github.com/codenotary/immudb/embedded/sql"
	bm "github.com/codenotary/immudb/pkg/pgsql/server/bmessages"
	fm "github.com/codenotary/immudb/pkg/pgsql/server/fmessages"
)
29

30
var copyFromStdinRe = regexp.MustCompile(`(?i)^\s*COPY\s+(\S+)\s*\(([^)]+)\)\s+FROM\s+stdin\s*;?\s*$`)
31

32
// parseCopyStatement extracts table name and columns from a COPY ... FROM stdin statement.
33
// Returns table, columns, ok.
34
func parseCopyStatement(sql string) (string, []string, bool) {
223✔
35
        matches := copyFromStdinRe.FindStringSubmatch(sql)
223✔
36
        if len(matches) != 3 {
446✔
37
                return "", nil, false
223✔
38
        }
223✔
39

NEW
40
        table := strings.TrimSpace(matches[1])
×
NEW
41
        // Remove schema prefix (e.g. "public.actor" -> "actor")
×
NEW
42
        if idx := strings.LastIndex(table, "."); idx >= 0 {
×
NEW
43
                table = table[idx+1:]
×
NEW
44
        }
×
45

NEW
46
        colStr := matches[2]
×
NEW
47
        parts := strings.Split(colStr, ",")
×
NEW
48
        cols := make([]string, 0, len(parts))
×
NEW
49
        for _, p := range parts {
×
NEW
50
                col := strings.TrimSpace(p)
×
NEW
51
                // Strip double quotes from column names and handle reserved words
×
NEW
52
                col = strings.Trim(col, "\"")
×
NEW
53
                if col != "" {
×
NEW
54
                        col = sanitizeColumnName(col)
×
NEW
55
                        cols = append(cols, col)
×
NEW
56
                }
×
57
        }
58

NEW
59
        return table, cols, true
×
60
}
61

62
// handleCopyFromStdin implements the COPY sub-protocol.
63
// It sends CopyInResponse, reads CopyData messages, converts to INSERTs,
64
// and executes them.
NEW
65
func (s *session) handleCopyFromStdin(table string, cols []string) error {
×
NEW
66
        numCols := len(cols)
×
NEW
67

×
NEW
68
        s.log.Infof("pgcompat: COPY sending CopyInResponse for %d cols", numCols)
×
NEW
69

×
NEW
70
        // Send CopyInResponse to tell client to start sending data
×
NEW
71
        if _, err := s.writeMessage(bm.CopyInResponse(numCols)); err != nil {
×
NEW
72
                return err
×
NEW
73
        }
×
74

NEW
75
        s.log.Infof("pgcompat: COPY waiting for CopyData messages")
×
NEW
76

×
NEW
77
        // Collect all rows from CopyData messages
×
NEW
78
        var rows [][]string
×
NEW
79
        var currentData []byte
×
NEW
80

×
NEW
81
        for {
×
NEW
82
                msg, _, err := s.nextMessage()
×
NEW
83
                if err != nil {
×
NEW
84
                        s.log.Warningf("COPY %s: error reading message: %v", table, err)
×
NEW
85
                        return err
×
NEW
86
                }
×
NEW
87
                s.log.Infof("COPY %s: received protocol message", table)
×
NEW
88

×
NEW
89
                switch v := msg.(type) {
×
NEW
90
                case fm.CopyDataMsg:
×
NEW
91
                        // Accumulate data — may contain multiple lines or partial lines
×
NEW
92
                        currentData = append(currentData, v.Data...)
×
NEW
93

×
NEW
94
                        // Process complete lines
×
NEW
95
                        for {
×
NEW
96
                                idx := indexOf(currentData, '\n')
×
NEW
97
                                if idx < 0 {
×
NEW
98
                                        break
×
99
                                }
100

NEW
101
                                line := string(currentData[:idx])
×
NEW
102
                                currentData = currentData[idx+1:]
×
NEW
103

×
NEW
104
                                // Skip empty lines and the COPY terminator "\."
×
NEW
105
                                line = strings.TrimRight(line, "\r")
×
NEW
106
                                if line == "" || line == "\\." {
×
NEW
107
                                        continue
×
108
                                }
109

110
                                // Parse tab-separated values
NEW
111
                                fields := strings.Split(line, "\t")
×
NEW
112
                                if len(fields) != numCols {
×
NEW
113
                                        s.log.Warningf("COPY %s: row has %d field(s), expected %d — skipping malformed row",
×
NEW
114
                                                table, len(fields), numCols)
×
NEW
115
                                        continue
×
116
                                }
NEW
117
                                row := make([]string, numCols)
×
NEW
118
                                for i, f := range fields {
×
NEW
119
                                        row[i] = unescapeCopyValue(f)
×
NEW
120
                                }
×
NEW
121
                                rows = append(rows, row)
×
122
                        }
123

NEW
124
                case fm.CopyDoneMsg:
×
NEW
125
                        // Process any remaining data
×
NEW
126
                        if len(currentData) > 0 {
×
NEW
127
                                line := strings.TrimRight(string(currentData), "\r\n")
×
NEW
128
                                if line != "" && line != "\\." {
×
NEW
129
                                        fields := strings.Split(line, "\t")
×
NEW
130
                                        if len(fields) != numCols {
×
NEW
131
                                                s.log.Warningf("COPY %s: row has %d field(s), expected %d — skipping malformed row",
×
NEW
132
                                                        table, len(fields), numCols)
×
NEW
133
                                        } else {
×
NEW
134
                                                row := make([]string, numCols)
×
NEW
135
                                                for i, f := range fields {
×
NEW
136
                                                        row[i] = unescapeCopyValue(f)
×
NEW
137
                                                }
×
NEW
138
                                                rows = append(rows, row)
×
139
                                        }
140
                                }
141
                        }
142

143
                        // Execute INSERTs in batches
NEW
144
                        rowCount, err := s.executeCopyInserts(table, cols, rows)
×
NEW
145
                        if err != nil {
×
NEW
146
                                return err
×
NEW
147
                        }
×
148

149
                        // Send CommandComplete with row count
NEW
150
                        _, err = s.writeMessage(bm.CommandComplete([]byte(fmt.Sprintf("COPY %d", rowCount))))
×
NEW
151
                        return err
×
152

NEW
153
                case fm.CopyFailMsg:
×
NEW
154
                        s.log.Warningf("COPY failed: %s", v.Error)
×
NEW
155
                        return fmt.Errorf("COPY failed: %s", v.Error)
×
156

NEW
157
                default:
×
NEW
158
                        return fmt.Errorf("unexpected message during COPY: %T", v)
×
159
                }
160
        }
161
}
162

163
// executeCopyInserts converts COPY rows to INSERT statements and executes them.
NEW
164
func (s *session) executeCopyInserts(table string, cols []string, rows [][]string) (int, error) {
×
NEW
165
        s.log.Infof("COPY %s: executing %d rows into %d columns", table, len(rows), len(cols))
×
NEW
166
        if len(rows) == 0 {
×
NEW
167
                return 0, nil
×
NEW
168
        }
×
169

NEW
170
        colList := strings.Join(cols, ", ")
×
NEW
171
        batchSize := 100
×
NEW
172
        totalInserted := 0
×
NEW
173
        lastKeepAlive := time.Now()
×
NEW
174

×
NEW
175
        for i := 0; i < len(rows); i += batchSize {
×
NEW
176
                // Refresh session activity to prevent session timeout during bulk inserts
×
NEW
177
                if time.Since(lastKeepAlive) > 10*time.Second {
×
NEW
178
                        s.refreshSessionActivity()
×
NEW
179
                        lastKeepAlive = time.Now()
×
NEW
180
                }
×
NEW
181
                end := i + batchSize
×
NEW
182
                if end > len(rows) {
×
NEW
183
                        end = len(rows)
×
NEW
184
                }
×
NEW
185
                batch := rows[i:end]
×
NEW
186

×
NEW
187
                for _, row := range batch {
×
NEW
188
                        // Build VALUES clause
×
NEW
189
                        vals := make([]string, len(row))
×
NEW
190
                        for j, v := range row {
×
NEW
191
                                if v == "NULL" {
×
NEW
192
                                        vals[j] = "NULL"
×
NEW
193
                                } else if isTimestampValue(v) {
×
NEW
194
                                        // immudb requires CAST for timestamp string literals
×
NEW
195
                                        // Strip timezone offset (+00, -05:00) that immudb can't parse
×
NEW
196
                                        tsVal := stripTimestampTz(v)
×
NEW
197
                                        vals[j] = "CAST('" + strings.ReplaceAll(tsVal, "'", "''") + "' AS TIMESTAMP)"
×
NEW
198
                                } else if isBoolValue(v) {
×
NEW
199
                                        vals[j] = normalizeBool(v)
×
NEW
200
                                } else {
×
NEW
201
                                        vals[j] = "'" + strings.ReplaceAll(v, "'", "''") + "'"
×
NEW
202
                                }
×
203
                        }
204

NEW
205
                        insertSQL := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
×
NEW
206
                                table, colList, strings.Join(vals, ", "))
×
NEW
207

×
NEW
208
                        stmts, err := removePGCatalogReferencesAndParse(insertSQL)
×
NEW
209
                        if err != nil {
×
NEW
210
                                s.log.Warningf("COPY INSERT parse error (skipping row): %v — SQL: %.200s", err, insertSQL)
×
NEW
211
                                continue
×
212
                        }
213

NEW
214
                        for _, stmt := range stmts {
×
NEW
215
                                if err := s.exec(stmt, nil, nil, false); err != nil {
×
NEW
216
                                        s.log.Warningf("COPY INSERT exec error (skipping row): %v", err)
×
NEW
217
                                        continue
×
218
                                }
219
                        }
220

NEW
221
                        totalInserted++
×
222
                }
223
        }
224

NEW
225
        if totalInserted < len(rows) && len(rows) > 0 {
×
NEW
226
                s.log.Warningf("COPY %s: only %d/%d rows inserted (some failed)", table, totalInserted, len(rows))
×
NEW
227
        } else {
×
NEW
228
                s.log.Infof("COPY %s: inserted %d/%d rows", table, totalInserted, len(rows))
×
NEW
229
        }
×
NEW
230
        return totalInserted, nil
×
231
}
232

233
// removePGCatalogReferencesAndParse is a helper that strips pg_catalog references
234
// and parses the SQL.
NEW
235
func removePGCatalogReferencesAndParse(sqlStr string) ([]sql.SQLStmt, error) {
×
NEW
236
        cleaned := removePGCatalogReferences(sqlStr)
×
NEW
237
        return sql.ParseSQL(strings.NewReader(cleaned))
×
NEW
238
}
×
239

240
// unescapeCopyValue decodes one field of PostgreSQL's COPY text format.
//
// A field that is exactly \N denotes NULL and is returned as the string "NULL",
// which executeCopyInserts renders as SQL NULL. NOTE(review): a genuine text
// value "NULL" is therefore indistinguishable from \N after unescaping.
//
// Backslash escapes are decoded as PostgreSQL defines them for COPY text data:
//
//	\b \f \n \r \t \v  -> backspace, form feed, newline, carriage return, tab, vertical tab
//	\ddd (1-3 octal)   -> the byte with that value (masked to 8 bits)
//	\xhh (1-2 hex)     -> the byte with that value
//	\<other char>      -> that character itself (so \\ yields a single backslash)
//
// A lone trailing backslash is kept as-is.
func unescapeCopyValue(s string) string {
	if s == "\\N" {
		return "NULL"
	}
	// Fast path: nothing to decode.
	if strings.IndexByte(s, '\\') < 0 {
		return s
	}

	isOctal := func(c byte) bool { return c >= '0' && c <= '7' }
	// hexVal returns the value of a hex digit, or -1 if c is not one.
	hexVal := func(c byte) int {
		switch {
		case c >= '0' && c <= '9':
			return int(c - '0')
		case c >= 'a' && c <= 'f':
			return int(c-'a') + 10
		case c >= 'A' && c <= 'F':
			return int(c-'A') + 10
		}
		return -1
	}

	var b strings.Builder
	b.Grow(len(s))

	for i := 0; i < len(s); i++ {
		c := s[i]
		if c != '\\' || i+1 >= len(s) {
			b.WriteByte(c)
			continue
		}
		i++ // consume the backslash; s[i] is now the escaped character
		switch e := s[i]; {
		case e == 'b':
			b.WriteByte('\b')
		case e == 'f':
			b.WriteByte('\f')
		case e == 'n':
			b.WriteByte('\n')
		case e == 'r':
			b.WriteByte('\r')
		case e == 't':
			b.WriteByte('\t')
		case e == 'v':
			b.WriteByte('\v')
		case isOctal(e):
			val := int(e - '0')
			for k := 0; k < 2 && i+1 < len(s) && isOctal(s[i+1]); k++ {
				i++
				val = val*8 + int(s[i]-'0')
			}
			b.WriteByte(byte(val & 0xff))
		case e == 'x' && i+1 < len(s) && hexVal(s[i+1]) >= 0:
			i++
			val := hexVal(s[i])
			if i+1 < len(s) && hexVal(s[i+1]) >= 0 {
				i++
				val = val*16 + hexVal(s[i])
			}
			b.WriteByte(byte(val))
		default:
			// Any other escaped character, including '\\' and an 'x' that is
			// not followed by a hex digit, stands for itself.
			b.WriteByte(e)
		}
	}
	return b.String()
}
274

275
// timestampRe recognizes the ISO-style date / timestamp / timestamptz text
// that PostgreSQL writes in COPY output, for example:
//
//	2021-09-25
//	2021-09-25 10:15:00
//	2021-09-25 10:15:00.123456+00
//	2021-09-25T10:15:00-05:00
//
// The whole value must match. Free text that merely starts with a date
// (e.g. "2021-09-25 release notes") is not treated as a timestamp, so it is not
// wrapped in a CAST that would fail.
var timestampRe = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}(?:[ T]\d{2}:\d{2}(?::\d{2}(?:\.\d+)?)?(?:[+-]\d{2}(?::\d{2})?)?)?$`)

// timestampTzStripRe matches the trailing TZ offset (+HH, +HH:MM, -HH, -HH:MM)
// only when it follows a time component (HH:MM or HH:MM:SS[.fff]). The leading
// `\d{2}:\d{2}(...)?` capture is required so that bare DATE values like
// `2021-09-25` are not mistaken for `... -25` timezone offsets.
var timestampTzStripRe = regexp.MustCompile(`(\d{2}:\d{2}(?::\d{2}(?:\.\d+)?)?)[+-]\d{2}(?::\d{2})?$`)

// isTimestampValue reports whether v is, in its entirety, an ISO date,
// timestamp, or timestamp with a numeric timezone offset (see timestampRe).
func isTimestampValue(v string) bool {
	return timestampRe.MatchString(v)
}

// stripTimestampTz removes a trailing timezone offset (+00, +00:00, -05:00)
// from timestamptz strings. Plain dates and timestamps without a TZ offset
// are returned unchanged.
func stripTimestampTz(v string) string {
	return timestampTzStripRe.ReplaceAllString(v, "$1")
}
12✔
293

294
// sqlReservedWords is the set of lower-case tokens that immudb's SQL parser
// (embedded/sql/sql_grammar.y) will not accept as bare identifiers in DDL/DML.
// sanitizeColumnName renames a COPY column that collides with one of them to
// `_<word>` so the generated INSERT still parses.
//
// Tokens that are reserved in PostgreSQL but that immudb accepts as identifiers
// (checked live with `CREATE TABLE t(id INT NOT NULL, <word> VARCHAR(8),
// PRIMARY KEY(id))`) are deliberately left out. Including them would force
// needless renames of valid columns such as `type`, `date` or `year`, which is
// what broke loading the netflix sample dump.
var sqlReservedWords = map[string]bool{
	"all": true, "alter": true, "and": true, "any": true, "as": true, "asc": true,
	"case": true, "cast": true, "check": true, "column": true, "create": true, "cross": true,
	"database": true, "default": true, "delete": true, "desc": true, "distinct": true, "drop": true,
	"else": true, "end": true, "except": true, "exists": true,
	"foreign": true, "from": true, "full": true,
	"grant": true,
	"having": true,
	"in": true, "inner": true, "insert": true, "intersect": true, "is": true,
	"join": true,
	"left": true, "like": true, "limit": true,
	"natural": true, "not": true, "null": true,
	"on": true, "or": true, "outer": true,
	"password": true, "primary": true,
	"recursive": true, "returning": true, "right": true,
	"select": true,
	"table": true, "then": true, "transaction": true,
	"union": true, "update": true, "user": true, "using": true,
	"when": true, "where": true, "with": true,
}

// sanitizeColumnName returns col unchanged unless it is a reserved word
// (compared case-insensitively). A reserved word is returned with a leading
// underscore and its original casing kept, e.g. "DESC" -> "_DESC".
func sanitizeColumnName(col string) string {
	if !sqlReservedWords[strings.ToLower(col)] {
		return col
	}
	return "_" + col
}
328

NEW
329
// isBoolValue reports whether v is one of the boolean spellings recognized in
// COPY text data: PostgreSQL's "t"/"f", or "true"/"false" written entirely in
// lower or upper case.
func isBoolValue(v string) bool {
	switch v {
	case "t", "f", "true", "false", "TRUE", "FALSE":
		return true
	default:
		return false
	}
}
×
332

NEW
333
// normalizeBool expands PostgreSQL's single-letter booleans ("t" and "f") into
// the SQL literals "true" and "false". Any other input is returned unchanged.
func normalizeBool(v string) string {
	switch v {
	case "t":
		return "true"
	case "f":
		return "false"
	}
	return v
}
342

NEW
343
// indexOf returns the index of the first occurrence of b in data, or -1 when
// data does not contain b (including when data is nil or empty). It delegates
// to the standard library's bytes.IndexByte, which is optimized for this scan
// instead of comparing one byte at a time.
func indexOf(data []byte, b byte) int {
	return bytes.IndexByte(data, b)
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc