• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841644892

23 Apr 2026 02:44PM UTC coverage: 85.279% (-4.0%) from 89.306%
24841644892

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

115 existing lines in 18 files now uncovered.

44599 of 52298 relevant lines covered (85.28%)

127676.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.3
/pkg/pgsql/server/pgadmin_compat.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "bytes"
21
        "encoding/binary"
22
        "regexp"
23
        "strings"
24

25
        "github.com/codenotary/immudb/embedded/sql"
26
        "github.com/codenotary/immudb/pkg/api/schema"
27
        bm "github.com/codenotary/immudb/pkg/pgsql/server/bmessages"
28
        "github.com/codenotary/immudb/pkg/pgsql/server/pgmeta"
29
)
30

31
// Known column values for pg_catalog canned responses.
32
// When pgAdmin asks for these columns, we return sensible defaults.
33
var knownColumnValues = map[string]sql.TypedValue{
34
        // pg_database
35
        "did":            sql.NewInteger(16384),
36
        "oid":            sql.NewInteger(16384),
37
        "datname":        nil, // filled dynamically with current db name
38
        "datallowconn":   sql.NewBool(true),
39
        "datistemplate":  sql.NewBool(false),
40
        "is_template":    sql.NewBool(false),
41
        "datlastsysoid":  sql.NewInteger(12000),
42
        "datconnlimit":   sql.NewInteger(-1),
43
        "encoding":       sql.NewInteger(6), // UTF8
44
        "serverencoding": sql.NewVarchar("UTF8"),
45
        "datacl":         sql.NewNull(sql.VarcharType),
46
        "owner":          sql.NewInteger(10),
47
        "datdba":         sql.NewInteger(10),
48

49
        // pg_roles / user info
50
        "id":                 sql.NewInteger(10),
51
        "rolname":            sql.NewVarchar("immudb"),
52
        "name":               nil, // filled dynamically based on query context
53
        "rolsuper":           sql.NewBool(true),
54
        "is_superuser":       sql.NewBool(true),
55
        "rolinherit":         sql.NewBool(true),
56
        "rolcreaterole":      sql.NewBool(true),
57
        "can_create_role":    sql.NewBool(true),
58
        "rolcreatedb":        sql.NewBool(true),
59
        "can_create_db":      sql.NewBool(true),
60
        "rolcanlogin":        sql.NewBool(true),
61
        "rolreplication":     sql.NewBool(false),
62
        "rolbypassrls":       sql.NewBool(false),
63
        "can_signal_backend": sql.NewBool(true),
64
        "rolvaliduntil":      sql.NewNull(sql.VarcharType),
65
        "rolconfig":          sql.NewNull(sql.VarcharType),
66

67
        // pg_settings
68
        "setting":    sql.NewVarchar(""),
69
        "unit":       sql.NewNull(sql.VarcharType),
70
        "category":   sql.NewVarchar(""),
71
        "short_desc": sql.NewVarchar(""),
72
        "vartype":    sql.NewVarchar("string"),
73
        "context":    sql.NewVarchar("internal"),
74
        "source":     sql.NewVarchar("default"),
75

76
        // pg_tablespace
77
        "spcname": sql.NewVarchar("pg_default"),
78
        "spcacl":  sql.NewNull(sql.VarcharType),
79

80
        // general boolean columns pgAdmin probes
81
        "cancreate":        sql.NewBool(true),
82
        "connected":        sql.NewBool(true),
83
        "gss_authenticated": sql.NewBool(false),
84
        "encrypted":        sql.NewBool(false),
85

86
        // replication type
87
        "type": sql.NewNull(sql.VarcharType),
88

89
        // general
90
        "description": sql.NewNull(sql.VarcharType),
91
        "comment":     sql.NewNull(sql.VarcharType),
92
}
93

94
// asAliasRe matches an "AS alias" pattern anywhere in a query (case-insensitive) and captures the alias.
95
var asAliasRe = regexp.MustCompile(`(?i)\bAS\s+(\w+)`)
96

97
// handlePgSystemQuery returns canned responses for pg_catalog queries.
98
// It extracts column aliases from the SQL and returns a single row with
99
// known default values for each column.
NEW
100
func (s *session) handlePgSystemQuery(query string) error {
×
NEW
101
        cols := extractColumnNames(query)
×
NEW
102
        if len(cols) == 0 {
×
NEW
103
                // Can't determine columns — return empty result with single "result" column
×
NEW
104
                cols = []string{"result"}
×
NEW
105
        }
×
106

107
        // Debug logging
NEW
108
        s.log.Infof("pgcompat: intercepted query: %q", query)
×
NEW
109
        s.log.Infof("pgcompat: extracted columns: %v", cols)
×
NEW
110
        s.log.Infof("pgcompat: query length: %d bytes", len(query))
×
NEW
111

×
NEW
112
        // pg_type lookups: emit a real row per standard PG type so Rails's
×
NEW
113
        // load_additional_types populates its OID-to-Ruby-Type map. Without
×
NEW
114
        // this, every column value comes back with a "unknown OID NNNN"
×
NEW
115
        // warning and the cast type becomes Type::Value (the default
×
NEW
116
        // fallback), which then breaks `enum :col, ...` at model-load time.
×
NEW
117
        // pg_range stays as a 0-row response.
×
NEW
118
        lq := strings.ToLower(query)
×
NEW
119
        if strings.Contains(lq, "pg_type") {
×
NEW
120
                return s.handlePgTypeRows(query, cols)
×
NEW
121
        }
×
NEW
122
        if strings.Contains(lq, "pg_range") {
×
NEW
123
                colDescs := make([]sql.ColDescriptor, len(cols))
×
NEW
124
                for i, name := range cols {
×
NEW
125
                        colDescs[i] = sql.ColDescriptor{Column: name, Type: sql.VarcharType}
×
NEW
126
                }
×
NEW
127
                if _, err := s.writeMessage(buildMultiColRowDescription(colDescs)); err != nil {
×
NEW
128
                        return err
×
NEW
129
                }
×
NEW
130
                s.log.Infof("pgcompat: pg_range query — returning 0 rows")
×
NEW
131
                return nil
×
132
        }
NEW
133
        for i, name := range cols {
×
NEW
134
                if val, ok := knownColumnValues[name]; ok && val != nil {
×
NEW
135
                        s.log.Infof("pgcompat:   col[%d] %s -> type=%s val=%v", i, name, val.Type(), val.RawValue())
×
NEW
136
                } else {
×
NEW
137
                        s.log.Infof("pgcompat:   col[%d] %s -> dynamic/null", i, name)
×
NEW
138
                }
×
139
        }
140

141
        // Build column descriptors
NEW
142
        colDescs := make([]sql.ColDescriptor, len(cols))
×
NEW
143
        for i, name := range cols {
×
NEW
144
                colType := sql.VarcharType
×
NEW
145
                if val, ok := knownColumnValues[name]; ok && val != nil {
×
NEW
146
                        colType = val.Type()
×
NEW
147
                }
×
NEW
148
                colDescs[i] = sql.ColDescriptor{Column: name, Type: colType}
×
149
        }
150

151
        // Write row description with plain column names
NEW
152
        if _, err := s.writeMessage(buildMultiColRowDescription(colDescs)); err != nil {
×
NEW
153
                return err
×
NEW
154
        }
×
155

156
        // Build row values
NEW
157
        dbName := "defaultdb"
×
NEW
158
        if s.db != nil {
×
NEW
159
                dbName = s.db.GetName()
×
NEW
160
        }
×
161

NEW
162
        values := make([]sql.TypedValue, len(cols))
×
NEW
163
        for i, name := range cols {
×
NEW
164
                if val, ok := knownColumnValues[name]; ok {
×
NEW
165
                        if val == nil {
×
NEW
166
                                // dynamic value
×
NEW
167
                                switch name {
×
NEW
168
                                case "datname":
×
NEW
169
                                        values[i] = sql.NewVarchar(dbName)
×
NEW
170
                                case "name":
×
NEW
171
                                        if strings.Contains(strings.ToLower(query), "pg_database") {
×
NEW
172
                                                values[i] = sql.NewVarchar(dbName)
×
NEW
173
                                        } else {
×
NEW
174
                                                values[i] = sql.NewVarchar("immudb")
×
NEW
175
                                        }
×
NEW
176
                                default:
×
NEW
177
                                        values[i] = sql.NewNull(sql.VarcharType)
×
178
                                }
NEW
179
                        } else {
×
NEW
180
                                values[i] = val
×
NEW
181
                        }
×
NEW
182
                } else {
×
NEW
183
                        // Unknown column — return NULL
×
NEW
184
                        values[i] = sql.NewNull(sql.VarcharType)
×
NEW
185
                }
×
186
        }
187

NEW
188
        row := &sql.Row{ValuesByPosition: values}
×
NEW
189
        if _, err := s.writeMessage(bm.DataRow([]*sql.Row{row}, len(cols), nil)); err != nil {
×
NEW
190
                return err
×
NEW
191
        }
×
192

NEW
193
        return nil
×
194
}
195

196
// extractColumnNames extracts column names from a SQL query.
197
// Strategy: find all "AS alias" patterns, plus bare column names from
198
// simple "SELECT col1, col2 FROM" patterns.
199
//
200
// IMPORTANT: returns one entry per SELECT-list item. psql's backslash
201
// commands (\d, \l, …) use hard-coded column indexes on the result, so
202
// dropping an item makes libpq crash with
203
// "column number N is out of range 0..M" followed by a segfault. When a
204
// name cannot be derived (bare literal, CASE…END, other complex exprs)
205
// we emit "?column?" — the same placeholder real PostgreSQL uses.
206
func extractColumnNames(query string) []string {
10✔
207
        upper := strings.ToUpper(query)
10✔
208
        selIdx := strings.Index(upper, "SELECT")
10✔
209
        if selIdx < 0 {
10✔
NEW
210
                return nil
×
NEW
211
        }
×
212

213
        rest := query[selIdx+6:]
10✔
214

10✔
215
        // Find FROM at depth 0
10✔
216
        depth := 0
10✔
217
        fromIdx := -1
10✔
218
        for i := 0; i < len(rest)-4; i++ {
586✔
219
                switch rest[i] {
576✔
NEW
220
                case '(':
×
NEW
221
                        depth++
×
NEW
222
                case ')':
×
NEW
223
                        depth--
×
224
                default:
576✔
225
                        if depth == 0 && i+5 <= len(rest) && strings.EqualFold(rest[i:i+4], "FROM") {
586✔
226
                                before := byte(' ')
10✔
227
                                if i > 0 {
20✔
228
                                        before = rest[i-1]
10✔
229
                                }
10✔
230
                                after := byte(' ')
10✔
231
                                if i+4 < len(rest) {
20✔
232
                                        after = rest[i+4]
10✔
233
                                }
10✔
234
                                if (before == ' ' || before == '\n' || before == '\t') &&
10✔
235
                                        (after == ' ' || after == '\n' || after == '\t') {
20✔
236
                                        fromIdx = i
10✔
237
                                }
10✔
238
                        }
239
                }
240
                if fromIdx >= 0 {
586✔
241
                        break
10✔
242
                }
243
        }
244

245
        var selectList string
10✔
246
        if fromIdx < 0 {
10✔
NEW
247
                // No FROM clause (e.g. "SELECT CASE ... END as type;")
×
NEW
248
                // Use everything after SELECT, stripping trailing semicolons
×
NEW
249
                selectList = strings.TrimSpace(strings.TrimRight(rest, "; \n\t"))
×
250
        } else {
10✔
251
                selectList = strings.TrimSpace(rest[:fromIdx])
10✔
252
        }
10✔
253
        if selectList == "*" {
11✔
254
                return nil
1✔
255
        }
1✔
256

257
        parts := splitAtDepthZero(selectList, ',')
9✔
258

9✔
259
        var cols []string
9✔
260
        for _, part := range parts {
47✔
261
                part = strings.TrimSpace(part)
38✔
262
                if part == "" {
38✔
NEW
263
                        // Only genuinely empty (e.g. trailing comma artifact) — no
×
NEW
264
                        // corresponding SELECT-list item, so don't count one.
×
NEW
265
                        continue
×
266
                }
267

268
                colName := ""
38✔
269

38✔
270
                // Check for "... AS alias"
38✔
271
                asMatch := regexp.MustCompile(`(?i)\bAS\s+(\w+)\s*$`).FindStringSubmatch(part)
38✔
272
                if len(asMatch) == 2 {
41✔
273
                        colName = strings.ToLower(asMatch[1])
3✔
274
                } else {
38✔
275
                        // No AS — get the last token as bare column name.
35✔
276
                        // Handle "table.column" or just "column".
35✔
277
                        words := strings.Fields(part)
35✔
278
                        if len(words) > 0 {
70✔
279
                                last := words[len(words)-1]
35✔
280
                                if idx := strings.LastIndex(last, "."); idx >= 0 {
54✔
281
                                        last = last[idx+1:]
19✔
282
                                }
19✔
283
                                last = strings.Trim(last, "\"'`()")
35✔
284
                                colName = strings.ToLower(last)
35✔
285
                        }
286
                }
287

288
                if colName == "" || isReserved(colName) {
42✔
289
                        colName = "?column?"
4✔
290
                }
4✔
291
                cols = append(cols, colName)
38✔
292
        }
293

294
        return cols
9✔
295
}
296

297
// splitAtDepthZero splits s at every occurrence of sep that is at
// parenthesis depth zero and outside any quoted section, and returns the
// pieces without the separators. Pieces are not trimmed; the result always
// has at least one element (the whole of s when nothing was split).
//
// Quoted sections are delimited by ' (SQL string literals) or " (quoted
// identifiers). Parentheses and separators inside them are ignored, so an
// item such as 'a,b' or '(' no longer breaks the split. A doubled quote
// inside a literal ('it”s') closes and immediately reopens the section,
// which keeps the state correct. Backslash-escaped quotes and dollar quoting
// are not recognised.
//
// sep is expected to be a single-byte character; '(' and ')' never act as
// separators, and a quote character passed as sep keeps acting as a
// separator rather than as a quote.
func splitAtDepthZero(s string, sep rune) []string {
	var parts []string
	depth := 0
	start := 0
	var quote rune // 0 when outside quotes, else the opening quote character
	for i, c := range s {
		switch {
		case quote != 0:
			if c == quote {
				quote = 0
			}
		case (c == '\'' || c == '"') && c != sep:
			quote = c
		case c == '(':
			depth++
		case c == ')':
			depth--
		case c == sep && depth == 0:
			parts = append(parts, s[start:i])
			start = i + 1
		}
	}
	return append(parts, s[start:])
}
317

318
// pgCompatReservedWords is the set of lower-case SQL keywords that must not
// be reported as a column name.
var pgCompatReservedWords = map[string]struct{}{
	"select": {}, "from": {}, "where": {}, "and": {}, "or": {}, "not": {}, "in": {}, "is": {},
	"null": {}, "true": {}, "false": {}, "case": {}, "when": {}, "then": {}, "else": {}, "end": {},
	"as": {}, "on": {}, "join": {}, "left": {}, "right": {}, "inner": {}, "outer": {}, "cross": {},
	"order": {}, "by": {}, "group": {}, "having": {}, "limit": {}, "offset": {}, "union": {},
	"all": {}, "distinct": {}, "exists": {}, "between": {}, "like": {}, "ilike": {},
}

// isReserved reports whether s is one of the SQL keywords listed in
// pgCompatReservedWords. The comparison is exact, so callers must pass the
// word already lower-cased.
func isReserved(s string) bool {
	_, found := pgCompatReservedWords[s]
	return found
}
329

330
// stdPgTypes is the minimal set of Postgres types that Rails's
331
// load_additional_types pass needs to populate its OID type map.
332
// Without these rows Rails treats every value as Type::Value (default),
333
// which breaks model-side `enum :col, ...` declarations.
334
var stdPgTypes = []struct {
335
        oid     int64
336
        typname string
337
        typtype string
338
}{
339
        {16, "bool", "b"},
340
        {17, "bytea", "b"},
341
        {18, "char", "b"},
342
        {19, "name", "b"},
343
        {20, "int8", "b"},
344
        {21, "int2", "b"},
345
        {23, "int4", "b"},
346
        {25, "text", "b"},
347
        {26, "oid", "b"},
348
        {114, "json", "b"},
349
        {142, "xml", "b"},
350
        {700, "float4", "b"},
351
        {701, "float8", "b"},
352
        {1042, "bpchar", "b"},
353
        {1043, "varchar", "b"},
354
        {1082, "date", "b"},
355
        {1083, "time", "b"},
356
        {1114, "timestamp", "b"},
357
        {1184, "timestamptz", "b"},
358
        {1186, "interval", "b"},
359
        {1700, "numeric", "b"},
360
        {2950, "uuid", "b"},
361
        {3802, "jsonb", "b"},
362
}
363

364
// handlePgTypeRows answers Rails's pg_type catalog query with one row per
365
// standard Postgres type. The query column list varies (Rails 7 selects
366
// oid, typname, typelem, typdelim, typinput, rngsubtype, typtype,
367
// typbasetype) — emit values for the columns the query asks for, NULL
368
// for any unrecognised name.
NEW
369
func (s *session) handlePgTypeRows(query string, cols []string) error {
×
NEW
370
        colDescs := make([]sql.ColDescriptor, len(cols))
×
NEW
371
        for i, name := range cols {
×
NEW
372
                switch strings.ToLower(name) {
×
NEW
373
                case "oid":
×
NEW
374
                        colDescs[i] = sql.ColDescriptor{Column: name, Type: sql.IntegerType}
×
NEW
375
                default:
×
NEW
376
                        colDescs[i] = sql.ColDescriptor{Column: name, Type: sql.VarcharType}
×
377
                }
378
        }
NEW
379
        if _, err := s.writeMessage(buildMultiColRowDescription(colDescs)); err != nil {
×
NEW
380
                return err
×
NEW
381
        }
×
382

NEW
383
        rows := make([]*sql.Row, 0, len(stdPgTypes))
×
NEW
384
        for _, t := range stdPgTypes {
×
NEW
385
                vals := make([]sql.TypedValue, len(cols))
×
NEW
386
                for i, name := range cols {
×
NEW
387
                        switch strings.ToLower(name) {
×
NEW
388
                        case "oid":
×
NEW
389
                                vals[i] = sql.NewInteger(t.oid)
×
NEW
390
                        case "typname":
×
NEW
391
                                vals[i] = sql.NewVarchar(t.typname)
×
NEW
392
                        case "typtype":
×
NEW
393
                                vals[i] = sql.NewVarchar(t.typtype)
×
NEW
394
                        case "typelem":
×
NEW
395
                                vals[i] = sql.NewVarchar("0")
×
NEW
396
                        case "typdelim":
×
NEW
397
                                vals[i] = sql.NewVarchar(",")
×
NEW
398
                        case "typinput":
×
NEW
399
                                vals[i] = sql.NewVarchar(t.typname + "in")
×
NEW
400
                        case "rngsubtype":
×
NEW
401
                                vals[i] = sql.NewNull(sql.VarcharType)
×
NEW
402
                        case "typbasetype":
×
NEW
403
                                vals[i] = sql.NewVarchar("0")
×
NEW
404
                        case "typcategory":
×
NEW
405
                                vals[i] = sql.NewVarchar("U")
×
NEW
406
                        case "typnotnull":
×
NEW
407
                                vals[i] = sql.NewVarchar("false")
×
NEW
408
                        case "typdefault":
×
NEW
409
                                vals[i] = sql.NewNull(sql.VarcharType)
×
NEW
410
                        default:
×
NEW
411
                                vals[i] = sql.NewNull(sql.VarcharType)
×
412
                        }
413
                }
NEW
414
                rows = append(rows, &sql.Row{ValuesByPosition: vals})
×
415
        }
NEW
416
        if _, err := s.writeMessage(bm.DataRow(rows, len(cols), nil)); err != nil {
×
NEW
417
                return err
×
NEW
418
        }
×
NEW
419
        s.log.Infof("pgcompat: pg_type query — returned %d standard type rows", len(rows))
×
NEW
420
        return nil
×
421
}
422

423
// handlePgSystemQueryDataOnly is like handlePgSystemQuery but skips
424
// RowDescription (for extended query protocol where Describe already sent it).
NEW
425
func (s *session) handlePgSystemQueryDataOnly(query string) error {
×
NEW
426
        cols := extractColumnNames(query)
×
NEW
427
        if len(cols) == 0 {
×
NEW
428
                return nil
×
NEW
429
        }
×
430

431
        // pg_type rows are needed by Rails' type registry. In Extended Query
432
        // mode the Describe has already sent RowDescription, so emit only the
433
        // DataRow set.
NEW
434
        lq := strings.ToLower(query)
×
NEW
435
        if strings.Contains(lq, "pg_type") {
×
NEW
436
                rows := make([]*sql.Row, 0, len(stdPgTypes))
×
NEW
437
                for _, t := range stdPgTypes {
×
NEW
438
                        vals := make([]sql.TypedValue, len(cols))
×
NEW
439
                        for i, name := range cols {
×
NEW
440
                                switch strings.ToLower(name) {
×
NEW
441
                                case "oid":
×
NEW
442
                                        vals[i] = sql.NewInteger(t.oid)
×
NEW
443
                                case "typname":
×
NEW
444
                                        vals[i] = sql.NewVarchar(t.typname)
×
NEW
445
                                case "typtype":
×
NEW
446
                                        vals[i] = sql.NewVarchar(t.typtype)
×
NEW
447
                                case "typelem":
×
NEW
448
                                        vals[i] = sql.NewVarchar("0")
×
NEW
449
                                case "typdelim":
×
NEW
450
                                        vals[i] = sql.NewVarchar(",")
×
NEW
451
                                case "typinput":
×
NEW
452
                                        vals[i] = sql.NewVarchar(t.typname + "in")
×
NEW
453
                                case "rngsubtype":
×
NEW
454
                                        vals[i] = sql.NewNull(sql.VarcharType)
×
NEW
455
                                case "typbasetype":
×
NEW
456
                                        vals[i] = sql.NewVarchar("0")
×
NEW
457
                                default:
×
NEW
458
                                        vals[i] = sql.NewNull(sql.VarcharType)
×
459
                                }
460
                        }
NEW
461
                        rows = append(rows, &sql.Row{ValuesByPosition: vals})
×
462
                }
NEW
463
                if _, err := s.writeMessage(bm.DataRow(rows, len(cols), nil)); err != nil {
×
NEW
464
                        return err
×
NEW
465
                }
×
NEW
466
                s.log.Infof("pgcompat: pg_type query (ext mode) — emitted %d type rows", len(rows))
×
NEW
467
                return nil
×
468
        }
NEW
469
        if strings.Contains(lq, "pg_range") {
×
NEW
470
                s.log.Infof("pgcompat: pg_range query (ext mode) — returning 0 rows")
×
NEW
471
                return nil
×
NEW
472
        }
×
473

NEW
474
        dbName := "defaultdb"
×
NEW
475
        if s.db != nil {
×
NEW
476
                dbName = s.db.GetName()
×
NEW
477
        }
×
478

NEW
479
        values := make([]sql.TypedValue, len(cols))
×
NEW
480
        for i, name := range cols {
×
NEW
481
                if val, ok := knownColumnValues[name]; ok {
×
NEW
482
                        if val == nil {
×
NEW
483
                                switch name {
×
NEW
484
                                case "datname":
×
NEW
485
                                        values[i] = sql.NewVarchar(dbName)
×
NEW
486
                                case "name":
×
NEW
487
                                        if strings.Contains(strings.ToLower(query), "pg_database") {
×
NEW
488
                                                values[i] = sql.NewVarchar(dbName)
×
NEW
489
                                        } else {
×
NEW
490
                                                values[i] = sql.NewVarchar("immudb")
×
NEW
491
                                        }
×
NEW
492
                                default:
×
NEW
493
                                        values[i] = sql.NewNull(sql.VarcharType)
×
494
                                }
NEW
495
                        } else {
×
NEW
496
                                values[i] = val
×
NEW
497
                        }
×
NEW
498
                } else {
×
NEW
499
                        values[i] = sql.NewNull(sql.VarcharType)
×
NEW
500
                }
×
501
        }
502

NEW
503
        row := &sql.Row{ValuesByPosition: values}
×
NEW
504

×
NEW
505
        // Log the actual values being sent
×
NEW
506
        for i, v := range values {
×
NEW
507
                if v != nil {
×
NEW
508
                        s.log.Infof("pgcompat: DataRow col[%d]=%s rawValue=%v type=%s", i, cols[i], v.RawValue(), v.Type())
×
NEW
509
                }
×
510
        }
511

NEW
512
        if _, err := s.writeMessage(bm.DataRow([]*sql.Row{row}, len(cols), nil)); err != nil {
×
NEW
513
                return err
×
NEW
514
        }
×
515

NEW
516
        return nil
×
517
}
518

519
// extractResultCols returns ColDescriptor slice for emulable queries,
520
// used by the Parse handler to populate statement.Results for Describe.
521
func extractResultCols(query string) []sql.ColDescriptor {
1✔
522
        names := extractColumnNames(query)
1✔
523
        if len(names) == 0 {
1✔
NEW
524
                return nil
×
NEW
525
        }
×
526

527
        cols := make([]sql.ColDescriptor, len(names))
1✔
528
        for i, name := range names {
5✔
529
                colType := sql.VarcharType
4✔
530
                if val, ok := knownColumnValues[name]; ok && val != nil {
4✔
NEW
531
                        colType = val.Type()
×
NEW
532
                }
×
533
                cols[i] = sql.ColDescriptor{Column: name, Type: colType}
4✔
534
        }
535
        return cols
1✔
536
}
537

538
// buildMultiColRowDescription creates a RowDescription message with plain
539
// column names from ColDescriptor.Column, bypassing the Selector() encoding.
540
func buildMultiColRowDescription(cols []sql.ColDescriptor) []byte {
4✔
541
        messageType := []byte(`T`)
4✔
542
        fieldNumb := make([]byte, 2)
4✔
543
        binary.BigEndian.PutUint16(fieldNumb, uint16(len(cols)))
4✔
544

4✔
545
        fieldData := make([]byte, 0)
4✔
546
        for n, col := range cols {
11✔
547
                fieldName := append([]byte(col.Column), 0)
7✔
548
                tableOID := make([]byte, 4)
7✔
549
                attrNum := make([]byte, 2)
7✔
550
                binary.BigEndian.PutUint16(attrNum, uint16(n+1))
7✔
551
                oid := make([]byte, 4)
7✔
552
                binary.BigEndian.PutUint32(oid, uint32(pgmeta.PgTypeMap[col.Type][pgmeta.PgTypeMapOid]))
7✔
553
                typeLen := make([]byte, 2)
7✔
554
                binary.BigEndian.PutUint16(typeLen, uint16(pgmeta.PgTypeMap[col.Type][pgmeta.PgTypeMapLength]))
7✔
555
                typeMod := make([]byte, 4)
7✔
556
                binary.BigEndian.PutUint32(typeMod, 0xFFFFFFFF)
7✔
557
                formatCode := make([]byte, 2)
7✔
558

7✔
559
                fieldData = append(fieldData, bytes.Join([][]byte{fieldName, tableOID, attrNum, oid, typeLen, typeMod, formatCode}, nil)...)
7✔
560
        }
7✔
561

562
        msgLen := make([]byte, 4)
4✔
563
        binary.BigEndian.PutUint32(msgLen, uint32(4+2+len(fieldData)))
4✔
564

4✔
565
        return bytes.Join([][]byte{messageType, msgLen, fieldNumb, fieldData}, nil)
4✔
566
}
567

568

569
// handleXormColumnsQuery emulates XORM's column-introspection query
570
// (see xormColumnsRe in stmts_handler.go). XORM walks pg_attribute /
571
// pg_class / pg_type / information_schema.columns and reads:
572
//
573
//   column_name, column_default, is_nullable, data_type,
574
//   character_maximum_length, description, primarykey, uniquekey
575
//
576
// Bind values: $1 = c.relname (table name), $2 = s.table_schema (== public).
577
//
578
// We synthesise one row per column from immudb's catalog. Without this,
579
// the query fell to the generic 1-row pgAdminProbe handler, which filled
580
// `column_name` with NULL, and XORM's subsequent
581
//   sql: Scan error on column index 0, name "column_name":
582
//        converting NULL to string is unsupported
583
// stopped initialisation cold.
584
func (s *session) handleXormColumnsQuery(query string, params []*schema.NamedParam, extMode bool) error {
1✔
585
        cols := extractColumnNames(query)
1✔
586
        // Build column descriptors. XORM expects strings for the textual
1✔
587
        // columns and bools for primarykey/uniquekey.
1✔
588
        colDescs := make([]sql.ColDescriptor, len(cols))
1✔
589
        for i, name := range cols {
5✔
590
                colType := sql.VarcharType
4✔
591
                switch strings.ToLower(name) {
4✔
NEW
592
                case "primarykey", "uniquekey":
×
NEW
593
                        colType = sql.BooleanType
×
NEW
594
                case "character_maximum_length":
×
NEW
595
                        colType = sql.IntegerType
×
596
                }
597
                colDescs[i] = sql.ColDescriptor{Column: name, Type: colType}
4✔
598
        }
599
        if !extMode {
1✔
NEW
600
                if _, err := s.writeMessage(buildMultiColRowDescription(colDescs)); err != nil {
×
NEW
601
                        return err
×
NEW
602
                }
×
603
        }
604

605
        // Resolve $1 (table name). paramAsString tolerates both string- and
606
        // bytes-shaped parameter values, so it doesn't matter whether the
607
        // client encoded the bind as text or binary format.
608
        tableName := ""
1✔
609
        if len(params) >= 1 {
2✔
610
                tableName = paramAsString(params[0])
1✔
611
        }
1✔
612
        for _, p := range params {
2✔
613
                if p.Name == "param1" {
2✔
614
                        tableName = paramAsString(p)
1✔
615
                }
1✔
616
        }
617

618
        tx, err := s.db.NewSQLTx(s.ctx, sql.DefaultTxOptions().WithReadOnly(true))
1✔
619
        if err != nil {
1✔
NEW
620
                s.log.Infof("pgcompat: xormColumns: begin tx: %v", err)
×
NEW
621
                return nil
×
NEW
622
        }
×
623
        defer tx.Cancel()
1✔
624

1✔
625
        catalog := tx.Catalog()
1✔
626
        table, err := catalog.GetTableByName(tableName)
1✔
627
        if err != nil {
1✔
NEW
628
                s.log.Infof("pgcompat: xormColumns: table %q absent — emitting 0 rows", tableName)
×
NEW
629
                return nil
×
NEW
630
        }
×
631

632
        // Identify primary-key columns (single-column PK assumed; immudb's
633
        // PK is a single composite index rooted at table.PrimaryIndex()).
634
        pkCols := map[string]bool{}
1✔
635
        if pk := table.PrimaryIndex(); pk != nil {
2✔
636
                for _, c := range pk.Cols() {
2✔
637
                        pkCols[c.Name()] = true
1✔
638
                }
1✔
639
        }
640
        uniqueCols := map[string]bool{}
1✔
641
        for _, idx := range table.GetIndexes() {
2✔
642
                if !idx.IsUnique() {
1✔
NEW
643
                        continue
×
644
                }
645
                for _, c := range idx.Cols() {
2✔
646
                        uniqueCols[c.Name()] = true
1✔
647
                }
1✔
648
        }
649

650
        rows := make([]*sql.Row, 0, len(table.Cols()))
1✔
651
        for _, c := range table.Cols() {
4✔
652
                pgTypeName, _ := immudbToPGType(c.Type())
3✔
653
                var maxLen sql.TypedValue = sql.NewNull(sql.IntegerType)
3✔
654
                if c.Type() == sql.VarcharType && c.MaxLen() > 0 {
4✔
655
                        maxLen = sql.NewInteger(int64(c.MaxLen()))
1✔
656
                }
1✔
657
                isNullable := "YES"
3✔
658
                if c.IsNullable() == false {
4✔
659
                        isNullable = "NO"
1✔
660
                }
1✔
661
                var defaultExpr sql.TypedValue = sql.NewNull(sql.VarcharType)
3✔
662
                if c.HasDefault() {
3✔
NEW
663
                        defaultExpr = sql.NewVarchar(c.DefaultValue().String())
×
NEW
664
                }
×
665

666
                vals := make([]sql.TypedValue, len(cols))
3✔
667
                for i, name := range cols {
15✔
668
                        switch strings.ToLower(name) {
12✔
669
                        case "column_name":
3✔
670
                                vals[i] = sql.NewVarchar(c.Name())
3✔
671
                        case "column_default":
3✔
672
                                vals[i] = defaultExpr
3✔
673
                        case "is_nullable":
3✔
674
                                vals[i] = sql.NewVarchar(isNullable)
3✔
675
                        case "data_type":
3✔
676
                                vals[i] = sql.NewVarchar(pgTypeName)
3✔
NEW
677
                        case "character_maximum_length":
×
NEW
678
                                vals[i] = maxLen
×
NEW
679
                        case "description":
×
NEW
680
                                vals[i] = sql.NewNull(sql.VarcharType)
×
NEW
681
                        case "primarykey":
×
NEW
682
                                vals[i] = sql.NewBool(pkCols[c.Name()])
×
NEW
683
                        case "uniquekey":
×
NEW
684
                                vals[i] = sql.NewBool(uniqueCols[c.Name()])
×
NEW
685
                        default:
×
NEW
686
                                vals[i] = sql.NewNull(sql.VarcharType)
×
687
                        }
688
                }
689
                rows = append(rows, &sql.Row{ValuesByPosition: vals})
3✔
690
        }
691

692
        s.log.Infof("pgcompat: xormColumns: table=%q cols=%d", tableName, len(rows))
1✔
693
        if len(rows) > 0 {
2✔
694
                if _, err := s.writeMessage(bm.DataRow(rows, len(cols), nil)); err != nil {
1✔
NEW
695
                        return err
×
NEW
696
                }
×
697
        }
698
        return nil
1✔
699
}
700

701

702
// paramAsString unwraps a NamedParam into a Go string regardless of
703
// whether the wire delivered it as text (SQLValue_S) or as raw bytes
704
// (SQLValue_Bs — happens when the parameter type was reported as
705
// AnyType/bytea and the client encoded a string as raw UTF-8 bytes).
706
// Used by the surviving XORM column-introspection handler to recover
707
// the bound table name.
708
func paramAsString(p *schema.NamedParam) string {
2✔
709
        switch v := schema.RawValue(p.Value).(type) {
2✔
710
        case string:
2✔
711
                return strings.Trim(v, `"`)
2✔
NEW
712
        case []byte:
×
NEW
713
                return strings.Trim(string(v), `"`)
×
714
        }
NEW
715
        return ""
×
716
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc