• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6888354313

16 Nov 2023 08:48AM UTC coverage: 89.492% (-0.02%) from 89.514%
6888354313

push

gh-ci

jeroiraz
chore(pkg/pgsql): handle odbc help

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

1 of 8 new or added lines in 1 file covered. (12.5%)

6 existing lines in 3 files now uncovered.

33929 of 37913 relevant lines covered (89.49%)

143191.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.36
/pkg/pgsql/server/query_machine.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "errors"
21
        "fmt"
22
        "io"
23
        "math"
24
        "sort"
25
        "strings"
26

27
        "github.com/codenotary/immudb/embedded/sql"
28
        "github.com/codenotary/immudb/pkg/api/schema"
29
        pserr "github.com/codenotary/immudb/pkg/pgsql/errors"
30
        bm "github.com/codenotary/immudb/pkg/pgsql/server/bmessages"
31
        fm "github.com/codenotary/immudb/pkg/pgsql/server/fmessages"
32
)
33

34
func (s *session) QueryMachine() error {
44✔
35
        var waitForSync = false
44✔
36

44✔
37
        _, err := s.writeMessage(bm.ReadyForQuery())
44✔
38
        if err != nil {
45✔
39
                return err
1✔
40
        }
1✔
41

42
        for {
220✔
43
                msg, extQueryMode, err := s.nextMessage()
177✔
44
                if err != nil {
198✔
45
                        if errors.Is(err, io.EOF) {
41✔
46
                                s.log.Warningf("connection is closed")
20✔
47
                                return nil
20✔
48
                        }
20✔
49
                        s.HandleError(err)
1✔
50
                        continue
1✔
51
                }
52

53
                // When an error is detected while processing any extended-query message, the backend issues ErrorResponse,
54
                // then reads and discards messages until a Sync is reached, then issues ReadyForQuery and returns to normal
55
                // message processing. (But note that no skipping occurs if an error is detected while processing Sync — this
56
                // ensures that there is one and only one ReadyForQuery sent for each Sync.)
57
                if waitForSync && extQueryMode {
142✔
58
                        if _, ok := msg.(fm.SyncMsg); !ok {
4✔
59
                                continue
2✔
60
                        }
61
                }
62

63
                switch v := msg.(type) {
138✔
64
                case fm.TerminateMsg:
7✔
65
                        return s.mr.CloseConnection()
7✔
66
                case fm.QueryMsg:
45✔
67
                        statements := v.GetStatements()
45✔
68

45✔
69
                        if statements == "select relname, nspname, relkind from pg_catalog.pg_class c, pg_catalog.pg_namespace n where relkind in ('r', 'v', 'm', 'f', 'p') and nspname not in ('pg_catalog', 'information_schema', 'pg_toast', 'pg_temp_1') and n.oid = relnamespace order by nspname, relname" {
45✔
70
                                statements = "show tables"
×
71
                        }
×
72

73
                        tableHelpPrefix := "select n.nspname, c.relname, a.attname, a.atttypid, t.typname, a.attnum, a.attlen, a.atttypmod, a.attnotnull, c.relhasrules, c.relkind, c.oid, pg_get_expr(d.adbin, d.adrelid), case t.typtype when 'd' then t.typbasetype else 0 end, t.typtypmod, c.relhasoids, '', c.relhassubclass from (((pg_catalog.pg_class c inner join pg_catalog.pg_namespace n on n.oid = c.relnamespace and c.relname like '"
45✔
74

45✔
75
                        if strings.HasPrefix(statements, tableHelpPrefix) {
45✔
76
                                tableName := strings.Split(strings.TrimPrefix(statements, tableHelpPrefix), "'")[0]
×
NEW
77
                                statements = fmt.Sprintf("select column_name as tq, column_name as tow, column_name as tn, column_name as COLUMN_NAME, type_name as DATA_TYPE, type_name as TYPE_NAME, type_name as p, type_name as l, type_name as s, type_name as r, is_nullable as NULLABLE, column_name as rk, column_name as cd, type_name as SQL_DATA_TYPE, type_name as sts, column_name as coll, type_name as orp, is_nullable as IS_NULLABLE, type_name as dz, type_name as ft, type_name as iau, type_name as pn, column_name as toi, column_name as btd, column_name as tmo, column_name as tin from table(%s)", tableName)
×
78
                        }
×
79

80
                        err := s.fetchAndWriteResults(statements, nil, nil, extQueryMode)
45✔
81
                        if err != nil {
52✔
82
                                waitForSync = extQueryMode
7✔
83
                                s.HandleError(err)
7✔
84
                        }
7✔
85

86
                        if _, err = s.writeMessage(bm.ReadyForQuery()); err != nil {
49✔
87
                                waitForSync = extQueryMode
4✔
88
                        }
4✔
89
                case fm.ParseMsg:
22✔
90
                        _, ok := s.statements[v.DestPreparedStatementName]
22✔
91
                        // unnamed prepared statement overrides previous
22✔
92
                        if ok && v.DestPreparedStatementName != "" {
23✔
93
                                waitForSync = extQueryMode
1✔
94
                                s.HandleError(fmt.Errorf("statement '%s' already present", v.DestPreparedStatementName))
1✔
95
                                continue
1✔
96
                        }
97

98
                        var paramCols []*schema.Column
21✔
99
                        var resCols []*schema.Column
21✔
100
                        var stmt sql.SQLStmt
21✔
101

21✔
102
                        if !s.isInBlackList(v.Statements) {
30✔
103
                                stmts, err := sql.Parse(strings.NewReader(v.Statements))
9✔
104
                                if err != nil {
10✔
105
                                        waitForSync = extQueryMode
1✔
106
                                        s.HandleError(err)
1✔
107
                                        continue
1✔
108
                                }
109

110
                                // Note: as stated in the pgsql spec, the query string contained in a Parse message cannot include more than one SQL statement;
111
                                // else a syntax error is reported. This restriction does not exist in the simple-query protocol, but it does exist
112
                                // in the extended protocol, because allowing prepared statements or portals to contain multiple commands would
113
                                // complicate the protocol unduly.
114
                                if len(stmts) > 1 {
9✔
115
                                        waitForSync = extQueryMode
1✔
116
                                        s.HandleError(pserr.ErrMaxStmtNumberExceeded)
1✔
117
                                        continue
1✔
118
                                }
119
                                if paramCols, resCols, err = s.inferParamAndResultCols(stmts[0]); err != nil {
7✔
120
                                        waitForSync = extQueryMode
×
121
                                        s.HandleError(err)
×
122
                                        continue
×
123
                                }
124
                        }
125

126
                        _, err = s.writeMessage(bm.ParseComplete())
19✔
127
                        if err != nil {
20✔
128
                                waitForSync = extQueryMode
1✔
129
                                continue
1✔
130
                        }
131

132
                        newStatement := &statement{
18✔
133
                                // if no name is provided empty string marks the unnamed prepared statement
18✔
134
                                Name:         v.DestPreparedStatementName,
18✔
135
                                Params:       paramCols,
18✔
136
                                SQLStatement: v.Statements,
18✔
137
                                PreparedStmt: stmt,
18✔
138
                                Results:      resCols,
18✔
139
                        }
18✔
140

18✔
141
                        s.statements[v.DestPreparedStatementName] = newStatement
18✔
142

143
                case fm.DescribeMsg:
18✔
144
                        // The Describe message (statement variant) specifies the name of an existing prepared statement
18✔
145
                        // (or an empty string for the unnamed prepared statement). The response is a ParameterDescription
18✔
146
                        // message describing the parameters needed by the statement, followed by a RowDescription message
18✔
147
                        // describing the rows that will be returned when the statement is eventually executed (or a NoData
18✔
148
                        // message if the statement will not return rows). ErrorResponse is issued if there is no such prepared
18✔
149
                        // statement. Note that since Bind has not yet been issued, the formats to be used for returned columns
18✔
150
                        // are not yet known to the backend; the format code fields in the RowDescription message will be zeroes
18✔
151
                        // in this case.
18✔
152
                        if v.DescType == "S" {
28✔
153
                                st, ok := s.statements[v.Name]
10✔
154
                                if !ok {
11✔
155
                                        waitForSync = extQueryMode
1✔
156
                                        s.HandleError(fmt.Errorf("statement '%s' not found", v.Name))
1✔
157
                                        continue
1✔
158
                                }
159

160
                                if _, err = s.writeMessage(bm.ParameterDescription(st.Params)); err != nil {
10✔
161
                                        waitForSync = extQueryMode
1✔
162
                                        continue
1✔
163
                                }
164

165
                                if _, err := s.writeMessage(bm.RowDescription(st.Results, nil)); err != nil {
9✔
166
                                        waitForSync = extQueryMode
1✔
167
                                        continue
1✔
168
                                }
169
                        }
170
                        // The Describe message (portal variant) specifies the name of an existing portal (or an empty string
171
                        // for the unnamed portal). The response is a RowDescription message describing the rows that will be
172
                        // returned by executing the portal; or a NoData message if the portal does not contain a query that
173
                        // will return rows; or ErrorResponse if there is no such portal.
174
                        if v.DescType == "P" {
23✔
175
                                portal, ok := s.portals[v.Name]
8✔
176
                                if !ok {
9✔
177
                                        waitForSync = extQueryMode
1✔
178
                                        s.HandleError(fmt.Errorf("portal '%s' not found", v.Name))
1✔
179
                                        continue
1✔
180
                                }
181

182
                                if _, err = s.writeMessage(bm.RowDescription(portal.Statement.Results, portal.ResultColumnFormatCodes)); err != nil {
8✔
183
                                        waitForSync = extQueryMode
1✔
184
                                        continue
1✔
185
                                }
186
                        }
187
                case fm.SyncMsg:
19✔
188
                        waitForSync = false
19✔
189
                        s.writeMessage(bm.ReadyForQuery())
19✔
190
                case fm.BindMsg:
16✔
191
                        _, ok := s.portals[v.DestPortalName]
16✔
192
                        // unnamed portal overrides previous
16✔
193
                        if ok && v.DestPortalName != "" {
17✔
194
                                waitForSync = extQueryMode
1✔
195
                                s.HandleError(fmt.Errorf("portal '%s' already present", v.DestPortalName))
1✔
196
                                continue
1✔
197
                        }
198

199
                        st, ok := s.statements[v.PreparedStatementName]
15✔
200
                        if !ok {
16✔
201
                                waitForSync = extQueryMode
1✔
202
                                s.HandleError(fmt.Errorf("statement '%s' not found", v.PreparedStatementName))
1✔
203
                                continue
1✔
204
                        }
205

206
                        encodedParams, err := buildNamedParams(st.Params, v.ParamVals)
14✔
207
                        if err != nil {
15✔
208
                                waitForSync = extQueryMode
1✔
209
                                s.HandleError(err)
1✔
210
                                continue
1✔
211
                        }
212

213
                        if _, err = s.writeMessage(bm.BindComplete()); err != nil {
14✔
214
                                waitForSync = extQueryMode
1✔
215
                                continue
1✔
216
                        }
217

218
                        newPortal := &portal{
12✔
219
                                Name:                    v.DestPortalName,
12✔
220
                                Statement:               st,
12✔
221
                                Parameters:              encodedParams,
12✔
222
                                ResultColumnFormatCodes: v.ResultColumnFormatCodes,
12✔
223
                        }
12✔
224

12✔
225
                        s.portals[v.DestPortalName] = newPortal
12✔
226
                case fm.Execute:
11✔
227
                        //query execution
11✔
228
                        portal, ok := s.portals[v.PortalName]
11✔
229
                        if !ok {
11✔
230
                                waitForSync = extQueryMode
×
231
                                s.HandleError(fmt.Errorf("portal '%s' not found", v.PortalName))
×
232
                                continue
×
233
                        }
234

235
                        delete(s.portals, v.PortalName)
11✔
236

11✔
237
                        err := s.fetchAndWriteResults(portal.Statement.SQLStatement,
11✔
238
                                portal.Parameters,
11✔
239
                                portal.ResultColumnFormatCodes,
11✔
240
                                extQueryMode,
11✔
241
                        )
11✔
242
                        if err != nil {
14✔
243
                                waitForSync = extQueryMode
3✔
244
                                s.HandleError(err)
3✔
245
                        }
3✔
246
                case fm.FlushMsg:
×
247
                        // there is no buffer to be flushed
248
                default:
×
249
                        waitForSync = extQueryMode
×
250
                        s.HandleError(pserr.ErrUnknowMessageType)
×
251
                }
252
        }
253
}
254

255
func (s *session) fetchAndWriteResults(statements string, parameters []*schema.NamedParam, resultColumnFormatCodes []int16, extQueryMode bool) error {
56✔
256
        if len(statements) == 0 {
56✔
257
                _, err := s.writeMessage(bm.EmptyQueryResponse())
×
258
                return err
×
259
        }
×
260

261
        if statements == "select current_schema()" {
56✔
NEW
262
                _, err := s.writeMessage(bm.DataRow(nil, 0, resultColumnFormatCodes))
×
NEW
263
                if err != nil {
×
NEW
264
                        return err
×
NEW
265
                }
×
266

NEW
267
                _, err = s.writeMessage(bm.CommandComplete([]byte("ok")))
×
NEW
268
                return err
×
269
        }
270

271
        if s.isInBlackList(statements) {
61✔
272
                _, err := s.writeMessage(bm.CommandComplete([]byte("ok")))
5✔
273
                return err
5✔
274
        }
5✔
275

276
        if i := s.isEmulableInternally(statements); i != nil {
54✔
277
                if err := s.tryToHandleInternally(i); err != nil && err != pserr.ErrMessageCannotBeHandledInternally {
4✔
278
                        return err
1✔
279
                }
1✔
280

281
                _, err := s.writeMessage(bm.CommandComplete([]byte("ok")))
2✔
282
                return err
2✔
283
        }
284

285
        stmts, err := sql.Parse(strings.NewReader(statements))
48✔
286
        if err != nil {
51✔
287
                return err
3✔
288
        }
3✔
289

290
        for _, stmt := range stmts {
91✔
291
                switch st := stmt.(type) {
46✔
292
                case *sql.UseDatabaseStmt:
1✔
293
                        {
2✔
294
                                return pserr.ErrUseDBStatementNotSupported
1✔
295
                        }
1✔
296
                case *sql.SelectStmt:
12✔
297
                        if err = s.query(st, parameters, resultColumnFormatCodes, extQueryMode); err != nil {
13✔
298
                                return err
1✔
299
                        }
1✔
300
                default:
33✔
301
                        if err = s.exec(st, parameters, resultColumnFormatCodes, extQueryMode); err != nil {
34✔
302
                                return err
1✔
303
                        }
1✔
304
                }
305
        }
306

307
        _, err = s.writeMessage(bm.CommandComplete([]byte("ok")))
42✔
308
        if err != nil {
42✔
309
                return err
×
310
        }
×
311

312
        return nil
42✔
313
}
314

315
func (s *session) query(st *sql.SelectStmt, parameters []*schema.NamedParam, resultColumnFormatCodes []int16, skipRowDesc bool) error {
12✔
316
        res, err := s.db.SQLQueryPrepared(s.ctx, s.tx, st, parameters)
12✔
317
        if err != nil {
13✔
318
                return err
1✔
319
        }
1✔
320

321
        if !skipRowDesc {
16✔
322
                if _, err = s.writeMessage(bm.RowDescription(res.Columns, nil)); err != nil {
5✔
323
                        return err
×
324
                }
×
325
        }
326

327
        _, err = s.writeMessage(bm.DataRow(res.Rows, len(res.Columns), resultColumnFormatCodes))
11✔
328
        return err
11✔
329
}
330

331
func (s *session) exec(st sql.SQLStmt, namedParams []*schema.NamedParam, resultColumnFormatCodes []int16, skipRowDesc bool) error {
33✔
332
        params := make(map[string]interface{}, len(namedParams))
33✔
333

33✔
334
        for _, p := range namedParams {
45✔
335
                params[p.Name] = schema.RawValue(p.Value)
12✔
336
        }
12✔
337

338
        ntx, _, err := s.db.SQLExecPrepared(s.ctx, s.tx, []sql.SQLStmt{st}, params)
33✔
339
        s.tx = ntx
33✔
340

33✔
341
        return err
33✔
342
}
343

344
type portal struct {
345
        Name                    string
346
        Statement               *statement
347
        Parameters              []*schema.NamedParam
348
        ResultColumnFormatCodes []int16
349
}
350

351
type statement struct {
352
        Name         string
353
        SQLStatement string
354
        PreparedStmt sql.SQLStmt
355
        Params       []*schema.Column
356
        Results      []*schema.Column
357
}
358

359
func (s *session) inferParamAndResultCols(stmt sql.SQLStmt) ([]*schema.Column, []*schema.Column, error) {
7✔
360
        resCols := make([]*schema.Column, 0)
7✔
361

7✔
362
        sel, ok := stmt.(*sql.SelectStmt)
7✔
363
        if ok {
13✔
364
                rr, err := s.db.SQLQueryRowReader(s.ctx, s.tx, sel, nil)
6✔
365
                if err != nil {
6✔
366
                        return nil, nil, err
×
367
                }
×
368
                cols, err := rr.Columns(s.ctx)
6✔
369
                if err != nil {
6✔
370
                        return nil, nil, err
×
371
                }
×
372
                for _, c := range cols {
30✔
373
                        resCols = append(resCols, &schema.Column{Name: c.Selector(), Type: c.Type})
24✔
374
                }
24✔
375
        }
376

377
        r, err := s.db.InferParametersPrepared(s.ctx, s.tx, stmt)
7✔
378
        if err != nil {
7✔
379
                return nil, nil, err
×
380
        }
×
381

382
        if len(r) > math.MaxInt16 {
7✔
383
                return nil, nil, pserr.ErrMaxParamsNumberExceeded
×
384
        }
×
385

386
        var paramsNameList []string
7✔
387
        for n := range r {
38✔
388
                paramsNameList = append(paramsNameList, n)
31✔
389
        }
31✔
390
        sort.Strings(paramsNameList)
7✔
391

7✔
392
        paramCols := make([]*schema.Column, 0)
7✔
393
        for _, n := range paramsNameList {
38✔
394
                paramCols = append(paramCols, &schema.Column{Name: n, Type: r[n]})
31✔
395
        }
31✔
396

397
        return paramCols, resCols, nil
7✔
398
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc