• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6783902372

07 Nov 2023 11:34AM UTC coverage: 89.548% (-0.02%) from 89.571%
6783902372

push

gh-ci

jeroiraz
test(pkg/pgsql): unit testing for deallocate stmt

Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>

33645 of 37572 relevant lines covered (89.55%)

146027.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.84
/pkg/pgsql/server/query_machine.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "errors"
21
        "fmt"
22
        "io"
23
        "math"
24
        "sort"
25
        "strings"
26

27
        "github.com/codenotary/immudb/embedded/sql"
28
        "github.com/codenotary/immudb/pkg/api/schema"
29
        pserr "github.com/codenotary/immudb/pkg/pgsql/errors"
30
        bm "github.com/codenotary/immudb/pkg/pgsql/server/bmessages"
31
        fm "github.com/codenotary/immudb/pkg/pgsql/server/fmessages"
32
)
33

34
func (s *session) QueryMachine() error {
44✔
35
        var waitForSync = false
44✔
36

44✔
37
        _, err := s.writeMessage(bm.ReadyForQuery())
44✔
38
        if err != nil {
45✔
39
                return err
1✔
40
        }
1✔
41

42
        for {
220✔
43
                msg, extQueryMode, err := s.nextMessage()
177✔
44
                if err != nil {
198✔
45
                        if errors.Is(err, io.EOF) {
41✔
46
                                s.log.Warningf("connection is closed")
20✔
47
                                return nil
20✔
48
                        }
20✔
49
                        s.HandleError(err)
1✔
50
                        continue
1✔
51
                }
52

53
                // When an error is detected while processing any extended-query message, the backend issues ErrorResponse,
54
                // then reads and discards messages until a Sync is reached, then issues ReadyForQuery and returns to normal
55
                // message processing. (But note that no skipping occurs if an error is detected while processing Sync — this
56
                // ensures that there is one and only one ReadyForQuery sent for each Sync.)
57
                if waitForSync && extQueryMode {
142✔
58
                        if _, ok := msg.(fm.SyncMsg); !ok {
4✔
59
                                continue
2✔
60
                        }
61
                }
62

63
                switch v := msg.(type) {
138✔
64
                case fm.TerminateMsg:
7✔
65
                        return s.mr.CloseConnection()
7✔
66
                case fm.QueryMsg:
45✔
67
                        err := s.fetchAndWriteResults(v.GetStatements(), nil, nil, extQueryMode)
45✔
68
                        if err != nil {
52✔
69
                                waitForSync = extQueryMode
7✔
70
                                s.HandleError(err)
7✔
71
                        }
7✔
72

73
                        if _, err = s.writeMessage(bm.ReadyForQuery()); err != nil {
49✔
74
                                waitForSync = extQueryMode
4✔
75
                        }
4✔
76
                case fm.ParseMsg:
22✔
77
                        _, ok := s.statements[v.DestPreparedStatementName]
22✔
78
                        // unnamed prepared statement overrides previous
22✔
79
                        if ok && v.DestPreparedStatementName != "" {
23✔
80
                                waitForSync = extQueryMode
1✔
81
                                s.HandleError(fmt.Errorf("statement '%s' already present", v.DestPreparedStatementName))
1✔
82
                                continue
1✔
83
                        }
84

85
                        var paramCols []*schema.Column
21✔
86
                        var resCols []*schema.Column
21✔
87
                        var stmt sql.SQLStmt
21✔
88

21✔
89
                        if !s.isInBlackList(v.Statements) {
30✔
90
                                stmts, err := sql.Parse(strings.NewReader(v.Statements))
9✔
91
                                if err != nil {
10✔
92
                                        waitForSync = extQueryMode
1✔
93
                                        s.HandleError(err)
1✔
94
                                        continue
1✔
95
                                }
96

97
                                // Note: as stated in the pgsql spec, the query string contained in a Parse message cannot include more than one SQL statement;
98
                                // else a syntax error is reported. This restriction does not exist in the simple-query protocol, but it does exist
99
                                // in the extended protocol, because allowing prepared statements or portals to contain multiple commands would
100
                                // complicate the protocol unduly.
101
                                if len(stmts) > 1 {
9✔
102
                                        waitForSync = extQueryMode
1✔
103
                                        s.HandleError(pserr.ErrMaxStmtNumberExceeded)
1✔
104
                                        continue
1✔
105
                                }
106
                                if paramCols, resCols, err = s.inferParamAndResultCols(stmts[0]); err != nil {
7✔
107
                                        waitForSync = extQueryMode
×
108
                                        s.HandleError(err)
×
109
                                        continue
×
110
                                }
111
                        }
112

113
                        _, err = s.writeMessage(bm.ParseComplete())
19✔
114
                        if err != nil {
20✔
115
                                waitForSync = extQueryMode
1✔
116
                                continue
1✔
117
                        }
118

119
                        newStatement := &statement{
18✔
120
                                // if no name is provided empty string marks the unnamed prepared statement
18✔
121
                                Name:         v.DestPreparedStatementName,
18✔
122
                                Params:       paramCols,
18✔
123
                                SQLStatement: v.Statements,
18✔
124
                                PreparedStmt: stmt,
18✔
125
                                Results:      resCols,
18✔
126
                        }
18✔
127

18✔
128
                        s.statements[v.DestPreparedStatementName] = newStatement
18✔
129

130
                case fm.DescribeMsg:
18✔
131
                        // The Describe message (statement variant) specifies the name of an existing prepared statement
18✔
132
                        // (or an empty string for the unnamed prepared statement). The response is a ParameterDescription
18✔
133
                        // message describing the parameters needed by the statement, followed by a RowDescription message
18✔
134
                        // describing the rows that will be returned when the statement is eventually executed (or a NoData
18✔
135
                        // message if the statement will not return rows). ErrorResponse is issued if there is no such prepared
18✔
136
                        // statement. Note that since Bind has not yet been issued, the formats to be used for returned columns
18✔
137
                        // are not yet known to the backend; the format code fields in the RowDescription message will be zeroes
18✔
138
                        // in this case.
18✔
139
                        if v.DescType == "S" {
28✔
140
                                st, ok := s.statements[v.Name]
10✔
141
                                if !ok {
11✔
142
                                        waitForSync = extQueryMode
1✔
143
                                        s.HandleError(fmt.Errorf("statement '%s' not found", v.Name))
1✔
144
                                        continue
1✔
145
                                }
146

147
                                if _, err = s.writeMessage(bm.ParameterDescription(st.Params)); err != nil {
10✔
148
                                        waitForSync = extQueryMode
1✔
149
                                        continue
1✔
150
                                }
151

152
                                if _, err := s.writeMessage(bm.RowDescription(st.Results, nil)); err != nil {
9✔
153
                                        waitForSync = extQueryMode
1✔
154
                                        continue
1✔
155
                                }
156
                        }
157
                        // The Describe message (portal variant) specifies the name of an existing portal (or an empty string
158
                        // for the unnamed portal). The response is a RowDescription message describing the rows that will be
159
                        // returned by executing the portal; or a NoData message if the portal does not contain a query that
160
                        // will return rows; or ErrorResponse if there is no such portal.
161
                        if v.DescType == "P" {
23✔
162
                                portal, ok := s.portals[v.Name]
8✔
163
                                if !ok {
9✔
164
                                        waitForSync = extQueryMode
1✔
165
                                        s.HandleError(fmt.Errorf("portal '%s' not found", v.Name))
1✔
166
                                        continue
1✔
167
                                }
168

169
                                if _, err = s.writeMessage(bm.RowDescription(portal.Statement.Results, portal.ResultColumnFormatCodes)); err != nil {
8✔
170
                                        waitForSync = extQueryMode
1✔
171
                                        continue
1✔
172
                                }
173
                        }
174
                case fm.SyncMsg:
19✔
175
                        waitForSync = false
19✔
176
                        s.writeMessage(bm.ReadyForQuery())
19✔
177
                case fm.BindMsg:
16✔
178
                        _, ok := s.portals[v.DestPortalName]
16✔
179
                        // unnamed portal overrides previous
16✔
180
                        if ok && v.DestPortalName != "" {
17✔
181
                                waitForSync = extQueryMode
1✔
182
                                s.HandleError(fmt.Errorf("portal '%s' already present", v.DestPortalName))
1✔
183
                                continue
1✔
184
                        }
185

186
                        st, ok := s.statements[v.PreparedStatementName]
15✔
187
                        if !ok {
16✔
188
                                waitForSync = extQueryMode
1✔
189
                                s.HandleError(fmt.Errorf("statement '%s' not found", v.PreparedStatementName))
1✔
190
                                continue
1✔
191
                        }
192

193
                        encodedParams, err := buildNamedParams(st.Params, v.ParamVals)
14✔
194
                        if err != nil {
15✔
195
                                waitForSync = extQueryMode
1✔
196
                                s.HandleError(err)
1✔
197
                                continue
1✔
198
                        }
199

200
                        if _, err = s.writeMessage(bm.BindComplete()); err != nil {
14✔
201
                                waitForSync = extQueryMode
1✔
202
                                continue
1✔
203
                        }
204

205
                        newPortal := &portal{
12✔
206
                                Name:                    v.DestPortalName,
12✔
207
                                Statement:               st,
12✔
208
                                Parameters:              encodedParams,
12✔
209
                                ResultColumnFormatCodes: v.ResultColumnFormatCodes,
12✔
210
                        }
12✔
211

12✔
212
                        s.portals[v.DestPortalName] = newPortal
12✔
213
                case fm.Execute:
11✔
214
                        //query execution
11✔
215
                        portal, ok := s.portals[v.PortalName]
11✔
216
                        if !ok {
11✔
217
                                waitForSync = extQueryMode
×
218
                                s.HandleError(fmt.Errorf("portal '%s' not found", v.PortalName))
×
219
                                continue
×
220
                        }
221

222
                        delete(s.portals, v.PortalName)
11✔
223

11✔
224
                        err := s.fetchAndWriteResults(portal.Statement.SQLStatement,
11✔
225
                                portal.Parameters,
11✔
226
                                portal.ResultColumnFormatCodes,
11✔
227
                                extQueryMode,
11✔
228
                        )
11✔
229
                        if err != nil {
14✔
230
                                waitForSync = extQueryMode
3✔
231
                                s.HandleError(err)
3✔
232
                        }
3✔
233
                case fm.FlushMsg:
×
234
                        // there is no buffer to be flushed
235
                default:
×
236
                        waitForSync = extQueryMode
×
237
                        s.HandleError(pserr.ErrUnknowMessageType)
×
238
                }
239
        }
240
}
241

242
func (s *session) fetchAndWriteResults(statements string, parameters []*schema.NamedParam, resultColumnFormatCodes []int16, extQueryMode bool) error {
56✔
243
        if len(statements) == 0 {
56✔
244
                _, err := s.writeMessage(bm.EmptyQueryResponse())
×
245
                return err
×
246
        }
×
247

248
        if s.isInBlackList(statements) {
61✔
249
                _, err := s.writeMessage(bm.CommandComplete([]byte("ok")))
5✔
250
                return err
5✔
251
        }
5✔
252

253
        if i := s.isEmulableInternally(statements); i != nil {
54✔
254
                if err := s.tryToHandleInternally(i); err != nil && err != pserr.ErrMessageCannotBeHandledInternally {
4✔
255
                        return err
1✔
256
                }
1✔
257

258
                _, err := s.writeMessage(bm.CommandComplete([]byte("ok")))
2✔
259
                return err
2✔
260
        }
261

262
        stmts, err := sql.Parse(strings.NewReader(statements))
48✔
263
        if err != nil {
51✔
264
                return err
3✔
265
        }
3✔
266

267
        for _, stmt := range stmts {
91✔
268
                switch st := stmt.(type) {
46✔
269
                case *sql.UseDatabaseStmt:
1✔
270
                        {
2✔
271
                                return pserr.ErrUseDBStatementNotSupported
1✔
272
                        }
1✔
273
                case *sql.SelectStmt:
12✔
274
                        if err = s.query(st, parameters, resultColumnFormatCodes, extQueryMode); err != nil {
13✔
275
                                return err
1✔
276
                        }
1✔
277
                default:
33✔
278
                        if err = s.exec(st, parameters, resultColumnFormatCodes, extQueryMode); err != nil {
34✔
279
                                return err
1✔
280
                        }
1✔
281
                }
282
        }
283

284
        _, err = s.writeMessage(bm.CommandComplete([]byte("ok")))
42✔
285
        if err != nil {
42✔
286
                return err
×
287
        }
×
288

289
        return nil
42✔
290
}
291

292
func (s *session) query(st *sql.SelectStmt, parameters []*schema.NamedParam, resultColumnFormatCodes []int16, skipRowDesc bool) error {
12✔
293
        res, err := s.db.SQLQueryPrepared(s.ctx, s.tx, st, parameters)
12✔
294
        if err != nil {
13✔
295
                return err
1✔
296
        }
1✔
297

298
        if !skipRowDesc {
16✔
299
                if _, err = s.writeMessage(bm.RowDescription(res.Columns, nil)); err != nil {
5✔
300
                        return err
×
301
                }
×
302
        }
303

304
        _, err = s.writeMessage(bm.DataRow(res.Rows, len(res.Columns), resultColumnFormatCodes))
11✔
305
        return err
11✔
306
}
307

308
func (s *session) exec(st sql.SQLStmt, namedParams []*schema.NamedParam, resultColumnFormatCodes []int16, skipRowDesc bool) error {
33✔
309
        params := make(map[string]interface{}, len(namedParams))
33✔
310

33✔
311
        for _, p := range namedParams {
45✔
312
                params[p.Name] = schema.RawValue(p.Value)
12✔
313
        }
12✔
314

315
        ntx, _, err := s.db.SQLExecPrepared(s.ctx, s.tx, []sql.SQLStmt{st}, params)
33✔
316
        s.tx = ntx
33✔
317

33✔
318
        return err
33✔
319
}
320

321
type portal struct {
322
        Name                    string
323
        Statement               *statement
324
        Parameters              []*schema.NamedParam
325
        ResultColumnFormatCodes []int16
326
}
327

328
type statement struct {
329
        Name         string
330
        SQLStatement string
331
        PreparedStmt sql.SQLStmt
332
        Params       []*schema.Column
333
        Results      []*schema.Column
334
}
335

336
func (s *session) inferParamAndResultCols(stmt sql.SQLStmt) ([]*schema.Column, []*schema.Column, error) {
7✔
337
        resCols := make([]*schema.Column, 0)
7✔
338

7✔
339
        sel, ok := stmt.(*sql.SelectStmt)
7✔
340
        if ok {
13✔
341
                rr, err := s.db.SQLQueryRowReader(s.ctx, s.tx, sel, nil)
6✔
342
                if err != nil {
6✔
343
                        return nil, nil, err
×
344
                }
×
345
                cols, err := rr.Columns(s.ctx)
6✔
346
                if err != nil {
6✔
347
                        return nil, nil, err
×
348
                }
×
349
                for _, c := range cols {
30✔
350
                        resCols = append(resCols, &schema.Column{Name: c.Selector(), Type: c.Type})
24✔
351
                }
24✔
352
        }
353

354
        r, err := s.db.InferParametersPrepared(s.ctx, s.tx, stmt)
7✔
355
        if err != nil {
7✔
356
                return nil, nil, err
×
357
        }
×
358

359
        if len(r) > math.MaxInt16 {
7✔
360
                return nil, nil, pserr.ErrMaxParamsNumberExceeded
×
361
        }
×
362

363
        var paramsNameList []string
7✔
364
        for n := range r {
38✔
365
                paramsNameList = append(paramsNameList, n)
31✔
366
        }
31✔
367
        sort.Strings(paramsNameList)
7✔
368

7✔
369
        paramCols := make([]*schema.Column, 0)
7✔
370
        for _, n := range paramsNameList {
38✔
371
                paramCols = append(paramCols, &schema.Column{Name: n, Type: r[n]})
31✔
372
        }
31✔
373

374
        return paramCols, resCols, nil
7✔
375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc