• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 9299226861

30 May 2024 08:12AM UTC coverage: 89.451% (-0.04%) from 89.49%
9299226861

push

gh-ci

ostafen
Log request information as transaction metadata

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

120 of 152 new or added lines in 16 files covered. (78.95%)

6 existing lines in 2 files now uncovered.

34859 of 38970 relevant lines covered (89.45%)

161745.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.55
/pkg/pgsql/server/query_machine.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "errors"
21
        "fmt"
22
        "io"
23
        "math"
24
        "sort"
25
        "strings"
26

27
        "github.com/codenotary/immudb/embedded/sql"
28
        "github.com/codenotary/immudb/pkg/api/schema"
29
        pserr "github.com/codenotary/immudb/pkg/pgsql/errors"
30
        bm "github.com/codenotary/immudb/pkg/pgsql/server/bmessages"
31
        fm "github.com/codenotary/immudb/pkg/pgsql/server/fmessages"
32
)
33

34
const (
	// helpPrefix is the exact catalog query that QueryMachine recognises in a
	// simple-query message and rewrites to "show tables".
	helpPrefix = "select relname, nspname, relkind from pg_catalog.pg_class c, pg_catalog.pg_namespace n where relkind in ('r', 'v', 'm', 'f', 'p') and nspname not in ('pg_catalog', 'information_schema', 'pg_toast', 'pg_temp_1') and n.oid = relnamespace order by nspname, relname"

	// tableHelpPrefix is the leading part of a catalog query describing the
	// columns of a single table. It ends right before the quoted table name, so
	// QueryMachine can extract the name that follows it and rewrite the query
	// into a select over table(<name>).
	tableHelpPrefix = "select n.nspname, c.relname, a.attname, a.atttypid, t.typname, a.attnum, a.attlen, a.atttypmod, a.attnotnull, c.relhasrules, c.relkind, c.oid, pg_get_expr(d.adbin, d.adrelid), case t.typtype when 'd' then t.typbasetype else 0 end, t.typtypmod, c.relhasoids, '', c.relhassubclass from (((pg_catalog.pg_class c inner join pg_catalog.pg_namespace n on n.oid = c.relnamespace and c.relname like '"

	// maxRowsPerMessage is the batch size handed to sql.ReadRowsBatch when
	// query results are streamed back as DataRow messages.
	maxRowsPerMessage = 1024
)
40

41
func (s *session) QueryMachine() error {
46✔
42
        var waitForSync = false
46✔
43

46✔
44
        _, err := s.writeMessage(bm.ReadyForQuery())
46✔
45
        if err != nil {
47✔
46
                return err
1✔
47
        }
1✔
48

49
        for {
226✔
50
                msg, extQueryMode, err := s.nextMessage()
181✔
51
                if err != nil {
204✔
52
                        if errors.Is(err, io.EOF) {
45✔
53
                                s.log.Warningf("connection is closed")
22✔
54
                                return nil
22✔
55
                        }
22✔
56
                        s.HandleError(err)
1✔
57
                        continue
1✔
58
                }
59

60
                // When an error is detected while processing any extended-query message, the backend issues ErrorResponse,
61
                // then reads and discards messages until a Sync is reached, then issues ReadyForQuery and returns to normal
62
                // message processing. (But note that no skipping occurs if an error is detected while processing Sync — this
63
                // ensures that there is one and only one ReadyForQuery sent for each Sync.)
64
                if waitForSync && extQueryMode {
144✔
65
                        if _, ok := msg.(fm.SyncMsg); !ok {
4✔
66
                                continue
2✔
67
                        }
68
                }
69

70
                switch v := msg.(type) {
140✔
71
                case fm.TerminateMsg:
7✔
72
                        return s.mr.CloseConnection()
7✔
73
                case fm.QueryMsg:
47✔
74
                        statements := v.GetStatements()
47✔
75

47✔
76
                        if statements == helpPrefix {
47✔
77
                                statements = "show tables"
×
78
                        }
×
79

80
                        if strings.HasPrefix(statements, tableHelpPrefix) {
48✔
81
                                tableName := strings.Split(strings.TrimPrefix(statements, tableHelpPrefix), "'")[0]
1✔
82
                                statements = fmt.Sprintf("select column_name as tq, column_name as tow, column_name as tn, column_name as COLUMN_NAME, type_name as DATA_TYPE, type_name as TYPE_NAME, type_name as p, type_name as l, type_name as s, type_name as r, is_nullable as NULLABLE, column_name as rk, column_name as cd, type_name as SQL_DATA_TYPE, type_name as sts, column_name as coll, type_name as orp, is_nullable as IS_NULLABLE, type_name as dz, type_name as ft, type_name as iau, type_name as pn, column_name as toi, column_name as btd, column_name as tmo, column_name as tin from table(%s)", tableName)
1✔
83
                        }
1✔
84

85
                        err := s.fetchAndWriteResults(statements, nil, nil, extQueryMode)
47✔
86
                        if err != nil {
56✔
87
                                waitForSync = extQueryMode
9✔
88
                                s.HandleError(err)
9✔
89
                        }
9✔
90

91
                        if _, err = s.writeMessage(bm.ReadyForQuery()); err != nil {
53✔
92
                                waitForSync = extQueryMode
6✔
93
                        }
6✔
94
                case fm.ParseMsg:
22✔
95
                        _, ok := s.statements[v.DestPreparedStatementName]
22✔
96
                        // unnamed prepared statement overrides previous
22✔
97
                        if ok && v.DestPreparedStatementName != "" {
23✔
98
                                waitForSync = extQueryMode
1✔
99
                                s.HandleError(fmt.Errorf("statement '%s' already present", v.DestPreparedStatementName))
1✔
100
                                continue
1✔
101
                        }
102

103
                        var paramCols []sql.ColDescriptor
21✔
104
                        var resCols []sql.ColDescriptor
21✔
105
                        var stmt sql.SQLStmt
21✔
106

21✔
107
                        if !s.isInBlackList(v.Statements) {
30✔
108
                                stmts, err := sql.Parse(strings.NewReader(v.Statements))
9✔
109
                                if err != nil {
10✔
110
                                        waitForSync = extQueryMode
1✔
111
                                        s.HandleError(err)
1✔
112
                                        continue
1✔
113
                                }
114

115
                                // Note: as stated in the pgsql spec, the query string contained in a Parse message cannot include more than one SQL statement;
116
                                // else a syntax error is reported. This restriction does not exist in the simple-query protocol, but it does exist
117
                                // in the extended protocol, because allowing prepared statements or portals to contain multiple commands would
118
                                // complicate the protocol unduly.
119
                                if len(stmts) > 1 {
9✔
120
                                        waitForSync = extQueryMode
1✔
121
                                        s.HandleError(pserr.ErrMaxStmtNumberExceeded)
1✔
122
                                        continue
1✔
123
                                }
124
                                if paramCols, resCols, err = s.inferParamAndResultCols(stmts[0]); err != nil {
7✔
125
                                        waitForSync = extQueryMode
×
126
                                        s.HandleError(err)
×
127
                                        continue
×
128
                                }
129
                        }
130

131
                        _, err = s.writeMessage(bm.ParseComplete())
19✔
132
                        if err != nil {
20✔
133
                                waitForSync = extQueryMode
1✔
134
                                continue
1✔
135
                        }
136

137
                        newStatement := &statement{
18✔
138
                                // if no name is provided empty string marks the unnamed prepared statement
18✔
139
                                Name:         v.DestPreparedStatementName,
18✔
140
                                Params:       paramCols,
18✔
141
                                SQLStatement: v.Statements,
18✔
142
                                PreparedStmt: stmt,
18✔
143
                                Results:      resCols,
18✔
144
                        }
18✔
145

18✔
146
                        s.statements[v.DestPreparedStatementName] = newStatement
18✔
147

148
                case fm.DescribeMsg:
18✔
149
                        // The Describe message (statement variant) specifies the name of an existing prepared statement
18✔
150
                        // (or an empty string for the unnamed prepared statement). The response is a ParameterDescription
18✔
151
                        // message describing the parameters needed by the statement, followed by a RowDescription message
18✔
152
                        // describing the rows that will be returned when the statement is eventually executed (or a NoData
18✔
153
                        // message if the statement will not return rows). ErrorResponse is issued if there is no such prepared
18✔
154
                        // statement. Note that since Bind has not yet been issued, the formats to be used for returned columns
18✔
155
                        // are not yet known to the backend; the format code fields in the RowDescription message will be zeroes
18✔
156
                        // in this case.
18✔
157
                        if v.DescType == "S" {
28✔
158
                                st, ok := s.statements[v.Name]
10✔
159
                                if !ok {
11✔
160
                                        waitForSync = extQueryMode
1✔
161
                                        s.HandleError(fmt.Errorf("statement '%s' not found", v.Name))
1✔
162
                                        continue
1✔
163
                                }
164

165
                                if _, err = s.writeMessage(bm.ParameterDescription(st.Params)); err != nil {
10✔
166
                                        waitForSync = extQueryMode
1✔
167
                                        continue
1✔
168
                                }
169

170
                                if _, err := s.writeMessage(bm.RowDescription(st.Results, nil)); err != nil {
9✔
171
                                        waitForSync = extQueryMode
1✔
172
                                        continue
1✔
173
                                }
174
                        }
175
                        // The Describe message (portal variant) specifies the name of an existing portal (or an empty string
176
                        // for the unnamed portal). The response is a RowDescription message describing the rows that will be
177
                        // returned by executing the portal; or a NoData message if the portal does not contain a query that
178
                        // will return rows; or ErrorResponse if there is no such portal.
179
                        if v.DescType == "P" {
23✔
180
                                portal, ok := s.portals[v.Name]
8✔
181
                                if !ok {
9✔
182
                                        waitForSync = extQueryMode
1✔
183
                                        s.HandleError(fmt.Errorf("portal '%s' not found", v.Name))
1✔
184
                                        continue
1✔
185
                                }
186

187
                                if _, err = s.writeMessage(bm.RowDescription(portal.Statement.Results, portal.ResultColumnFormatCodes)); err != nil {
8✔
188
                                        waitForSync = extQueryMode
1✔
189
                                        continue
1✔
190
                                }
191
                        }
192
                case fm.SyncMsg:
19✔
193
                        waitForSync = false
19✔
194
                        s.writeMessage(bm.ReadyForQuery())
19✔
195
                case fm.BindMsg:
16✔
196
                        _, ok := s.portals[v.DestPortalName]
16✔
197
                        // unnamed portal overrides previous
16✔
198
                        if ok && v.DestPortalName != "" {
17✔
199
                                waitForSync = extQueryMode
1✔
200
                                s.HandleError(fmt.Errorf("portal '%s' already present", v.DestPortalName))
1✔
201
                                continue
1✔
202
                        }
203

204
                        st, ok := s.statements[v.PreparedStatementName]
15✔
205
                        if !ok {
16✔
206
                                waitForSync = extQueryMode
1✔
207
                                s.HandleError(fmt.Errorf("statement '%s' not found", v.PreparedStatementName))
1✔
208
                                continue
1✔
209
                        }
210

211
                        encodedParams, err := buildNamedParams(st.Params, v.ParamVals)
14✔
212
                        if err != nil {
15✔
213
                                waitForSync = extQueryMode
1✔
214
                                s.HandleError(err)
1✔
215
                                continue
1✔
216
                        }
217

218
                        if _, err = s.writeMessage(bm.BindComplete()); err != nil {
14✔
219
                                waitForSync = extQueryMode
1✔
220
                                continue
1✔
221
                        }
222

223
                        newPortal := &portal{
12✔
224
                                Name:                    v.DestPortalName,
12✔
225
                                Statement:               st,
12✔
226
                                Parameters:              encodedParams,
12✔
227
                                ResultColumnFormatCodes: v.ResultColumnFormatCodes,
12✔
228
                        }
12✔
229

12✔
230
                        s.portals[v.DestPortalName] = newPortal
12✔
231
                case fm.Execute:
11✔
232
                        //query execution
11✔
233
                        portal, ok := s.portals[v.PortalName]
11✔
234
                        if !ok {
11✔
235
                                waitForSync = extQueryMode
×
236
                                s.HandleError(fmt.Errorf("portal '%s' not found", v.PortalName))
×
237
                                continue
×
238
                        }
239

240
                        delete(s.portals, v.PortalName)
11✔
241

11✔
242
                        err := s.fetchAndWriteResults(portal.Statement.SQLStatement,
11✔
243
                                portal.Parameters,
11✔
244
                                portal.ResultColumnFormatCodes,
11✔
245
                                extQueryMode,
11✔
246
                        )
11✔
247
                        if err != nil {
14✔
248
                                waitForSync = extQueryMode
3✔
249
                                s.HandleError(err)
3✔
250
                        }
3✔
251
                case fm.FlushMsg:
×
252
                        // there is no buffer to be flushed
253
                default:
×
254
                        waitForSync = extQueryMode
×
255
                        s.HandleError(pserr.ErrUnknowMessageType)
×
256
                }
257
        }
258
}
259

260
func (s *session) fetchAndWriteResults(statements string, parameters []*schema.NamedParam, resultColumnFormatCodes []int16, extQueryMode bool) error {
58✔
261
        if s.isInBlackList(statements) {
63✔
262
                _, err := s.writeMessage(bm.CommandComplete([]byte("ok")))
5✔
263
                return err
5✔
264
        }
5✔
265

266
        if i := s.isEmulableInternally(statements); i != nil {
56✔
267
                if err := s.tryToHandleInternally(i); err != nil && err != pserr.ErrMessageCannotBeHandledInternally {
4✔
268
                        return err
1✔
269
                }
1✔
270

271
                _, err := s.writeMessage(bm.CommandComplete([]byte("ok")))
2✔
272
                return err
2✔
273
        }
274

275
        stmts, err := sql.Parse(strings.NewReader(statements))
50✔
276
        if err != nil {
55✔
277
                return err
5✔
278
        }
5✔
279

280
        for _, stmt := range stmts {
91✔
281
                switch st := stmt.(type) {
46✔
282
                case *sql.UseDatabaseStmt:
1✔
283
                        {
2✔
284
                                return pserr.ErrUseDBStatementNotSupported
1✔
285
                        }
1✔
286
                case *sql.SelectStmt:
12✔
287
                        if err = s.query(st, parameters, resultColumnFormatCodes, extQueryMode); err != nil {
13✔
288
                                return err
1✔
289
                        }
1✔
290
                default:
33✔
291
                        if err = s.exec(st, parameters, resultColumnFormatCodes, extQueryMode); err != nil {
34✔
292
                                return err
1✔
293
                        }
1✔
294
                }
295
        }
296

297
        _, err = s.writeMessage(bm.CommandComplete([]byte("ok")))
42✔
298
        if err != nil {
42✔
299
                return err
×
300
        }
×
301

302
        return nil
42✔
303
}
304

305
func (s *session) query(st *sql.SelectStmt, parameters []*schema.NamedParam, resultColumnFormatCodes []int16, skipRowDesc bool) error {
12✔
306
        tx, err := s.sqlTx()
12✔
307
        if err != nil {
12✔
NEW
308
                return err
×
NEW
309
        }
×
310

311
        reader, err := s.db.SQLQueryPrepared(s.ctx, tx, st, schema.NamedParamsFromProto(parameters))
12✔
312
        if err != nil {
13✔
313
                return err
1✔
314
        }
1✔
315

316
        cols, err := reader.Columns(s.ctx)
11✔
317
        if err != nil {
11✔
318
                return err
×
319
        }
×
320

321
        if !skipRowDesc {
16✔
322
                if _, err = s.writeMessage(bm.RowDescription(cols, nil)); err != nil {
5✔
323
                        return err
×
324
                }
×
325
        }
326

327
        return sql.ReadRowsBatch(s.ctx, reader, maxRowsPerMessage, func(rowBatch []*sql.Row) error {
21✔
328
                _, err := s.writeMessage(bm.DataRow(rowBatch, len(cols), resultColumnFormatCodes))
10✔
329
                return err
10✔
330
        })
10✔
331
}
332

333
func (s *session) exec(st sql.SQLStmt, namedParams []*schema.NamedParam, resultColumnFormatCodes []int16, skipRowDesc bool) error {
33✔
334
        params := make(map[string]interface{}, len(namedParams))
33✔
335

33✔
336
        for _, p := range namedParams {
45✔
337
                params[p.Name] = schema.RawValue(p.Value)
12✔
338
        }
12✔
339

340
        tx, err := s.sqlTx()
33✔
341
        if err != nil {
33✔
NEW
342
                return err
×
NEW
343
        }
×
344

345
        ntx, _, err := s.db.SQLExecPrepared(s.ctx, tx, []sql.SQLStmt{st}, params)
33✔
346
        s.tx = ntx
33✔
347

33✔
348
        return err
33✔
349
}
350

351
type portal struct {
352
        Name                    string
353
        Statement               *statement
354
        Parameters              []*schema.NamedParam
355
        ResultColumnFormatCodes []int16
356
}
357

358
type statement struct {
359
        Name         string
360
        SQLStatement string
361
        PreparedStmt sql.SQLStmt
362
        Params       []sql.ColDescriptor
363
        Results      []sql.ColDescriptor
364
}
365

366
func (s *session) inferParamAndResultCols(stmt sql.SQLStmt) ([]sql.ColDescriptor, []sql.ColDescriptor, error) {
7✔
367
        var resCols []sql.ColDescriptor
7✔
368

7✔
369
        sel, ok := stmt.(*sql.SelectStmt)
7✔
370
        if ok {
13✔
371
                rr, err := s.db.SQLQueryPrepared(s.ctx, s.tx, sel, nil)
6✔
372
                if err != nil {
6✔
373
                        return nil, nil, err
×
374
                }
×
375
                resCols, err = rr.Columns(s.ctx)
6✔
376
                if err != nil {
6✔
377
                        return nil, nil, err
×
378
                }
×
379
        }
380

381
        r, err := s.db.InferParametersPrepared(s.ctx, s.tx, stmt)
7✔
382
        if err != nil {
7✔
383
                return nil, nil, err
×
384
        }
×
385

386
        if len(r) > math.MaxInt16 {
7✔
387
                return nil, nil, pserr.ErrMaxParamsNumberExceeded
×
388
        }
×
389

390
        var paramsNameList []string
7✔
391
        for n := range r {
38✔
392
                paramsNameList = append(paramsNameList, n)
31✔
393
        }
31✔
394
        sort.Strings(paramsNameList)
7✔
395

7✔
396
        paramCols := make([]sql.ColDescriptor, 0)
7✔
397
        for _, n := range paramsNameList {
38✔
398
                paramCols = append(paramCols, sql.ColDescriptor{Column: n, Type: r[n]})
31✔
399
        }
31✔
400
        return paramCols, resCols, nil
7✔
401
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc