• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6850968753

13 Nov 2023 01:58PM UTC coverage: 89.235% (-0.3%) from 89.556%
6850968753

push

gh-ci

jeroiraz
feat(embedded/sql): show table stmt

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

82 of 169 new or added lines in 2 files covered. (48.52%)

643 existing lines in 10 files now uncovered.

33838 of 37920 relevant lines covered (89.24%)

144254.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.33
/pkg/pgsql/server/query_machine.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "errors"
21
        "fmt"
22
        "io"
23
        "math"
24
        "sort"
25
        "strings"
26

27
        "github.com/codenotary/immudb/embedded/sql"
28
        "github.com/codenotary/immudb/pkg/api/schema"
29
        pserr "github.com/codenotary/immudb/pkg/pgsql/errors"
30
        bm "github.com/codenotary/immudb/pkg/pgsql/server/bmessages"
31
        fm "github.com/codenotary/immudb/pkg/pgsql/server/fmessages"
32
)
33

34
func (s *session) QueryMachine() error {
44✔
35
        var waitForSync = false
44✔
36

44✔
37
        _, err := s.writeMessage(bm.ReadyForQuery())
44✔
38
        if err != nil {
45✔
39
                return err
1✔
40
        }
1✔
41

42
        for {
220✔
43
                msg, extQueryMode, err := s.nextMessage()
177✔
44
                if err != nil {
198✔
45
                        if errors.Is(err, io.EOF) {
41✔
46
                                s.log.Warningf("connection is closed")
20✔
47
                                return nil
20✔
48
                        }
20✔
49
                        s.HandleError(err)
1✔
50
                        continue
1✔
51
                }
52

53
                // When an error is detected while processing any extended-query message, the backend issues ErrorResponse,
54
                // then reads and discards messages until a Sync is reached, then issues ReadyForQuery and returns to normal
55
                // message processing. (But note that no skipping occurs if an error is detected while processing Sync — this
56
                // ensures that there is one and only one ReadyForQuery sent for each Sync.)
57
                if waitForSync && extQueryMode {
142✔
58
                        if _, ok := msg.(fm.SyncMsg); !ok {
4✔
59
                                continue
2✔
60
                        }
61
                }
62

63
                switch v := msg.(type) {
138✔
64
                case fm.TerminateMsg:
7✔
65
                        return s.mr.CloseConnection()
7✔
66
                case fm.QueryMsg:
45✔
67
                        statements := v.GetStatements()
45✔
68

45✔
69
                        if statements == "select relname, nspname, relkind from pg_catalog.pg_class c, pg_catalog.pg_namespace n where relkind in ('r', 'v', 'm', 'f', 'p') and nspname not in ('pg_catalog', 'information_schema', 'pg_toast', 'pg_temp_1') and n.oid = relnamespace order by nspname, relname" {
45✔
UNCOV
70
                                statements = "show tables"
×
UNCOV
71
                        }
×
72

73
                        tableHelpPrefix := "select n.nspname, c.relname, a.attname, a.atttypid, t.typname, a.attnum, a.attlen, a.atttypmod, a.attnotnull, c.relhasrules, c.relkind, c.oid, pg_get_expr(d.adbin, d.adrelid), case t.typtype when 'd' then t.typbasetype else 0 end, t.typtypmod, c.relhasoids, '', c.relhassubclass from (((pg_catalog.pg_class c inner join pg_catalog.pg_namespace n on n.oid = c.relnamespace and c.relname like '"
45✔
74

45✔
75
                        if strings.HasPrefix(statements, tableHelpPrefix) {
45✔
UNCOV
76
                                tableName := strings.Split(strings.TrimPrefix(statements, tableHelpPrefix), "'")[0]
×
UNCOV
77
                                statements = fmt.Sprintf("select column_name, type_name, is_nullable from table(%s)", tableName)
×
UNCOV
78
                        }
×
79

80
                        err := s.fetchAndWriteResults(statements, nil, nil, extQueryMode)
45✔
81
                        if err != nil {
52✔
82
                                waitForSync = extQueryMode
7✔
83
                                s.HandleError(err)
7✔
84
                        }
7✔
85

86
                        if _, err = s.writeMessage(bm.ReadyForQuery()); err != nil {
49✔
87
                                waitForSync = extQueryMode
4✔
88
                        }
4✔
89
                case fm.ParseMsg:
22✔
90
                        _, ok := s.statements[v.DestPreparedStatementName]
22✔
91
                        // unnamed prepared statement overrides previous
22✔
92
                        if ok && v.DestPreparedStatementName != "" {
23✔
93
                                waitForSync = extQueryMode
1✔
94
                                s.HandleError(fmt.Errorf("statement '%s' already present", v.DestPreparedStatementName))
1✔
95
                                continue
1✔
96
                        }
97

98
                        var paramCols []*schema.Column
21✔
99
                        var resCols []*schema.Column
21✔
100
                        var stmt sql.SQLStmt
21✔
101

21✔
102
                        if !s.isInBlackList(v.Statements) {
30✔
103
                                stmts, err := sql.Parse(strings.NewReader(v.Statements))
9✔
104
                                if err != nil {
10✔
105
                                        waitForSync = extQueryMode
1✔
106
                                        s.HandleError(err)
1✔
107
                                        continue
1✔
108
                                }
109

110
                                // Note: as stated in the pgsql spec, the query string contained in a Parse message cannot include more than one SQL statement;
111
                                // else a syntax error is reported. This restriction does not exist in the simple-query protocol, but it does exist
112
                                // in the extended protocol, because allowing prepared statements or portals to contain multiple commands would
113
                                // complicate the protocol unduly.
114
                                if len(stmts) > 1 {
9✔
115
                                        waitForSync = extQueryMode
1✔
116
                                        s.HandleError(pserr.ErrMaxStmtNumberExceeded)
1✔
117
                                        continue
1✔
118
                                }
119
                                if paramCols, resCols, err = s.inferParamAndResultCols(stmts[0]); err != nil {
7✔
UNCOV
120
                                        waitForSync = extQueryMode
×
UNCOV
121
                                        s.HandleError(err)
×
UNCOV
122
                                        continue
×
123
                                }
124
                        }
125

126
                        _, err = s.writeMessage(bm.ParseComplete())
19✔
127
                        if err != nil {
20✔
128
                                waitForSync = extQueryMode
1✔
129
                                continue
1✔
130
                        }
131

132
                        newStatement := &statement{
18✔
133
                                // if no name is provided empty string marks the unnamed prepared statement
18✔
134
                                Name:         v.DestPreparedStatementName,
18✔
135
                                Params:       paramCols,
18✔
136
                                SQLStatement: v.Statements,
18✔
137
                                PreparedStmt: stmt,
18✔
138
                                Results:      resCols,
18✔
139
                        }
18✔
140

18✔
141
                        s.statements[v.DestPreparedStatementName] = newStatement
18✔
142

143
                case fm.DescribeMsg:
18✔
144
                        // The Describe message (statement variant) specifies the name of an existing prepared statement
18✔
145
                        // (or an empty string for the unnamed prepared statement). The response is a ParameterDescription
18✔
146
                        // message describing the parameters needed by the statement, followed by a RowDescription message
18✔
147
                        // describing the rows that will be returned when the statement is eventually executed (or a NoData
18✔
148
                        // message if the statement will not return rows). ErrorResponse is issued if there is no such prepared
18✔
149
                        // statement. Note that since Bind has not yet been issued, the formats to be used for returned columns
18✔
150
                        // are not yet known to the backend; the format code fields in the RowDescription message will be zeroes
18✔
151
                        // in this case.
18✔
152
                        if v.DescType == "S" {
28✔
153
                                st, ok := s.statements[v.Name]
10✔
154
                                if !ok {
11✔
155
                                        waitForSync = extQueryMode
1✔
156
                                        s.HandleError(fmt.Errorf("statement '%s' not found", v.Name))
1✔
157
                                        continue
1✔
158
                                }
159

160
                                if _, err = s.writeMessage(bm.ParameterDescription(st.Params)); err != nil {
10✔
161
                                        waitForSync = extQueryMode
1✔
162
                                        continue
1✔
163
                                }
164

165
                                if _, err := s.writeMessage(bm.RowDescription(st.Results, nil)); err != nil {
9✔
166
                                        waitForSync = extQueryMode
1✔
167
                                        continue
1✔
168
                                }
169
                        }
170
                        // The Describe message (portal variant) specifies the name of an existing portal (or an empty string
171
                        // for the unnamed portal). The response is a RowDescription message describing the rows that will be
172
                        // returned by executing the portal; or a NoData message if the portal does not contain a query that
173
                        // will return rows; or ErrorResponse if there is no such portal.
174
                        if v.DescType == "P" {
23✔
175
                                portal, ok := s.portals[v.Name]
8✔
176
                                if !ok {
9✔
177
                                        waitForSync = extQueryMode
1✔
178
                                        s.HandleError(fmt.Errorf("portal '%s' not found", v.Name))
1✔
179
                                        continue
1✔
180
                                }
181

182
                                if _, err = s.writeMessage(bm.RowDescription(portal.Statement.Results, portal.ResultColumnFormatCodes)); err != nil {
8✔
183
                                        waitForSync = extQueryMode
1✔
184
                                        continue
1✔
185
                                }
186
                        }
187
                case fm.SyncMsg:
19✔
188
                        waitForSync = false
19✔
189
                        s.writeMessage(bm.ReadyForQuery())
19✔
190
                case fm.BindMsg:
16✔
191
                        _, ok := s.portals[v.DestPortalName]
16✔
192
                        // unnamed portal overrides previous
16✔
193
                        if ok && v.DestPortalName != "" {
17✔
194
                                waitForSync = extQueryMode
1✔
195
                                s.HandleError(fmt.Errorf("portal '%s' already present", v.DestPortalName))
1✔
196
                                continue
1✔
197
                        }
198

199
                        st, ok := s.statements[v.PreparedStatementName]
15✔
200
                        if !ok {
16✔
201
                                waitForSync = extQueryMode
1✔
202
                                s.HandleError(fmt.Errorf("statement '%s' not found", v.PreparedStatementName))
1✔
203
                                continue
1✔
204
                        }
205

206
                        encodedParams, err := buildNamedParams(st.Params, v.ParamVals)
14✔
207
                        if err != nil {
15✔
208
                                waitForSync = extQueryMode
1✔
209
                                s.HandleError(err)
1✔
210
                                continue
1✔
211
                        }
212

213
                        if _, err = s.writeMessage(bm.BindComplete()); err != nil {
14✔
214
                                waitForSync = extQueryMode
1✔
215
                                continue
1✔
216
                        }
217

218
                        newPortal := &portal{
12✔
219
                                Name:                    v.DestPortalName,
12✔
220
                                Statement:               st,
12✔
221
                                Parameters:              encodedParams,
12✔
222
                                ResultColumnFormatCodes: v.ResultColumnFormatCodes,
12✔
223
                        }
12✔
224

12✔
225
                        s.portals[v.DestPortalName] = newPortal
12✔
226
                case fm.Execute:
11✔
227
                        //query execution
11✔
228
                        portal, ok := s.portals[v.PortalName]
11✔
229
                        if !ok {
11✔
UNCOV
230
                                waitForSync = extQueryMode
×
UNCOV
231
                                s.HandleError(fmt.Errorf("portal '%s' not found", v.PortalName))
×
UNCOV
232
                                continue
×
233
                        }
234

235
                        delete(s.portals, v.PortalName)
11✔
236

11✔
237
                        err := s.fetchAndWriteResults(portal.Statement.SQLStatement,
11✔
238
                                portal.Parameters,
11✔
239
                                portal.ResultColumnFormatCodes,
11✔
240
                                extQueryMode,
11✔
241
                        )
11✔
242
                        if err != nil {
14✔
243
                                waitForSync = extQueryMode
3✔
244
                                s.HandleError(err)
3✔
245
                        }
3✔
246
                case fm.FlushMsg:
×
247
                        // there is no buffer to be flushed
UNCOV
248
                default:
×
UNCOV
249
                        waitForSync = extQueryMode
×
UNCOV
250
                        s.HandleError(pserr.ErrUnknowMessageType)
×
251
                }
252
        }
253
}
254

255
func (s *session) fetchAndWriteResults(statements string, parameters []*schema.NamedParam, resultColumnFormatCodes []int16, extQueryMode bool) error {
56✔
256
        if len(statements) == 0 {
56✔
UNCOV
257
                _, err := s.writeMessage(bm.EmptyQueryResponse())
×
UNCOV
258
                return err
×
UNCOV
259
        }
×
260

261
        if s.isInBlackList(statements) {
61✔
262
                _, err := s.writeMessage(bm.CommandComplete([]byte("ok")))
5✔
263
                return err
5✔
264
        }
5✔
265

266
        if i := s.isEmulableInternally(statements); i != nil {
54✔
267
                if err := s.tryToHandleInternally(i); err != nil && err != pserr.ErrMessageCannotBeHandledInternally {
4✔
268
                        return err
1✔
269
                }
1✔
270

271
                _, err := s.writeMessage(bm.CommandComplete([]byte("ok")))
2✔
272
                return err
2✔
273
        }
274

275
        stmts, err := sql.Parse(strings.NewReader(statements))
48✔
276
        if err != nil {
51✔
277
                return err
3✔
278
        }
3✔
279

280
        for _, stmt := range stmts {
91✔
281
                switch st := stmt.(type) {
46✔
282
                case *sql.UseDatabaseStmt:
1✔
283
                        {
2✔
284
                                return pserr.ErrUseDBStatementNotSupported
1✔
285
                        }
1✔
286
                case *sql.SelectStmt:
12✔
287
                        if err = s.query(st, parameters, resultColumnFormatCodes, extQueryMode); err != nil {
13✔
288
                                return err
1✔
289
                        }
1✔
290
                default:
33✔
291
                        if err = s.exec(st, parameters, resultColumnFormatCodes, extQueryMode); err != nil {
34✔
292
                                return err
1✔
293
                        }
1✔
294
                }
295
        }
296

297
        _, err = s.writeMessage(bm.CommandComplete([]byte("ok")))
42✔
298
        if err != nil {
42✔
UNCOV
299
                return err
×
300
        }
×
301

302
        return nil
42✔
303
}
304

305
func (s *session) query(st *sql.SelectStmt, parameters []*schema.NamedParam, resultColumnFormatCodes []int16, skipRowDesc bool) error {
12✔
306
        res, err := s.db.SQLQueryPrepared(s.ctx, s.tx, st, parameters)
12✔
307
        if err != nil {
13✔
308
                return err
1✔
309
        }
1✔
310

311
        if !skipRowDesc {
16✔
312
                if _, err = s.writeMessage(bm.RowDescription(res.Columns, nil)); err != nil {
5✔
UNCOV
313
                        return err
×
UNCOV
314
                }
×
315
        }
316

317
        _, err = s.writeMessage(bm.DataRow(res.Rows, len(res.Columns), resultColumnFormatCodes))
11✔
318
        return err
11✔
319
}
320

321
func (s *session) exec(st sql.SQLStmt, namedParams []*schema.NamedParam, resultColumnFormatCodes []int16, skipRowDesc bool) error {
33✔
322
        params := make(map[string]interface{}, len(namedParams))
33✔
323

33✔
324
        for _, p := range namedParams {
45✔
325
                params[p.Name] = schema.RawValue(p.Value)
12✔
326
        }
12✔
327

328
        ntx, _, err := s.db.SQLExecPrepared(s.ctx, s.tx, []sql.SQLStmt{st}, params)
33✔
329
        s.tx = ntx
33✔
330

33✔
331
        return err
33✔
332
}
333

334
type portal struct {
335
        Name                    string
336
        Statement               *statement
337
        Parameters              []*schema.NamedParam
338
        ResultColumnFormatCodes []int16
339
}
340

341
type statement struct {
342
        Name         string
343
        SQLStatement string
344
        PreparedStmt sql.SQLStmt
345
        Params       []*schema.Column
346
        Results      []*schema.Column
347
}
348

349
func (s *session) inferParamAndResultCols(stmt sql.SQLStmt) ([]*schema.Column, []*schema.Column, error) {
7✔
350
        resCols := make([]*schema.Column, 0)
7✔
351

7✔
352
        sel, ok := stmt.(*sql.SelectStmt)
7✔
353
        if ok {
13✔
354
                rr, err := s.db.SQLQueryRowReader(s.ctx, s.tx, sel, nil)
6✔
355
                if err != nil {
6✔
356
                        return nil, nil, err
×
357
                }
×
358
                cols, err := rr.Columns(s.ctx)
6✔
359
                if err != nil {
6✔
360
                        return nil, nil, err
×
361
                }
×
362
                for _, c := range cols {
30✔
363
                        resCols = append(resCols, &schema.Column{Name: c.Selector(), Type: c.Type})
24✔
364
                }
24✔
365
        }
366

367
        r, err := s.db.InferParametersPrepared(s.ctx, s.tx, stmt)
7✔
368
        if err != nil {
7✔
UNCOV
369
                return nil, nil, err
×
UNCOV
370
        }
×
371

372
        if len(r) > math.MaxInt16 {
7✔
UNCOV
373
                return nil, nil, pserr.ErrMaxParamsNumberExceeded
×
UNCOV
374
        }
×
375

376
        var paramsNameList []string
7✔
377
        for n := range r {
38✔
378
                paramsNameList = append(paramsNameList, n)
31✔
379
        }
31✔
380
        sort.Strings(paramsNameList)
7✔
381

7✔
382
        paramCols := make([]*schema.Column, 0)
7✔
383
        for _, n := range paramsNameList {
38✔
384
                paramCols = append(paramCols, &schema.Column{Name: n, Type: r[n]})
31✔
385
        }
31✔
386

387
        return paramCols, resCols, nil
7✔
388
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc