• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 9299226861

30 May 2024 08:12AM UTC coverage: 89.451% (-0.04%) from 89.49%
9299226861

push

gh-ci

ostafen
Log request information as transaction metadata

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

120 of 152 new or added lines in 16 files covered. (78.95%)

6 existing lines in 2 files now uncovered.

34859 of 38970 relevant lines covered (89.45%)

161745.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.37
/pkg/pgsql/server/session.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "context"
21
        "crypto/tls"
22
        "strings"
23

24
        "net"
25

26
        "github.com/codenotary/immudb/embedded/logger"
27
        "github.com/codenotary/immudb/embedded/sql"
28
        "github.com/codenotary/immudb/pkg/api/schema"
29
        "github.com/codenotary/immudb/pkg/client"
30
        "github.com/codenotary/immudb/pkg/database"
31
        "github.com/codenotary/immudb/pkg/pgsql/errors"
32
        fm "github.com/codenotary/immudb/pkg/pgsql/server/fmessages"
33
        "github.com/codenotary/immudb/pkg/pgsql/server/pgmeta"
34
)
35

36
type session struct {
37
        immudbHost         string
38
        immudbPort         int
39
        tlsConfig          *tls.Config
40
        log                logger.Logger
41
        logRequestMetadata bool
42

43
        dbList database.DatabaseList
44

45
        client client.ImmuClient
46

47
        ctx    context.Context
48
        user   string
49
        ipAddr string
50
        db     database.DB
51
        tx     *sql.SQLTx
52

53
        mr MessageReader
54

55
        connParams      map[string]string
56
        protocolVersion string
57

58
        statements map[string]*statement
59
        portals    map[string]*portal
60
}
61

62
// Session is the set of operations exposed by a pgsql wire-protocol session.
type Session interface {
	// InitializeSession prepares the session and reports any failure.
	InitializeSession() error
	// HandleStartup runs the startup phase using the supplied context.
	HandleStartup(context.Context) error
	// QueryMachine runs the query-processing phase of the session.
	QueryMachine() error
	// HandleError reports the given error for this session.
	HandleError(error)
	// Close terminates the session.
	Close() error
}
69

70
func newSession(
71
        c net.Conn,
72
        immudbHost string,
73
        immudbPort int,
74
        log logger.Logger,
75
        tlsConfig *tls.Config,
76
        logRequestMetadata bool,
77
        dbList database.DatabaseList,
78
) *session {
27✔
79
        addr := c.RemoteAddr().String()
27✔
80
        i := strings.Index(addr, ":")
27✔
81
        if i >= 0 {
54✔
82
                addr = addr[:i]
27✔
83
        }
27✔
84

85
        return &session{
27✔
86
                immudbHost:         immudbHost,
27✔
87
                immudbPort:         immudbPort,
27✔
88
                tlsConfig:          tlsConfig,
27✔
89
                log:                log,
27✔
90
                logRequestMetadata: logRequestMetadata,
27✔
91
                dbList:             dbList,
27✔
92
                ipAddr:             addr,
27✔
93
                mr:                 NewMessageReader(c),
27✔
94
                statements:         make(map[string]*statement),
27✔
95
                portals:            make(map[string]*portal),
27✔
96
        }
27✔
97
}
98

99
func (s *session) HandleError(e error) {
28✔
100
        pgerr := errors.MapPgError(e)
28✔
101

28✔
102
        _, err := s.writeMessage(pgerr.Encode())
28✔
103
        if err != nil {
40✔
104
                s.log.Errorf("unable to write error on wire: %w", err)
12✔
105
        }
12✔
106
}
107

108
func (s *session) nextMessage() (interface{}, bool, error) {
204✔
109
        msg, err := s.mr.ReadRawMessage()
204✔
110
        if err != nil {
227✔
111
                return nil, false, err
23✔
112
        }
23✔
113

114
        s.log.Debugf("received %s - %s message", string(msg.t), pgmeta.MTypes[msg.t])
165✔
115

165✔
116
        extQueryMode := false
165✔
117

165✔
118
        i, err := s.parseRawMessage(msg)
165✔
119
        if msg.t == 'P' ||
165✔
120
                msg.t == 'B' ||
165✔
121
                msg.t == 'D' ||
165✔
122
                msg.t == 'E' ||
165✔
123
                msg.t == 'H' {
234✔
124
                extQueryMode = true
69✔
125
        }
69✔
126

127
        return i, extQueryMode, err
165✔
128
}
129

130
func (s *session) parseRawMessage(msg *rawMessage) (interface{}, error) {
165✔
131
        switch msg.t {
165✔
132
        case 'p':
23✔
133
                return fm.ParsePasswordMsg(msg.payload)
23✔
134
        case 'Q':
47✔
135
                return fm.ParseQueryMsg(msg.payload)
47✔
136
        case 'X':
7✔
137
                return fm.ParseTerminateMsg(msg.payload)
7✔
138
        case 'P':
22✔
139
                return fm.ParseParseMsg(msg.payload)
22✔
140
        case 'B':
17✔
141
                return fm.ParseBindMsg(msg.payload)
17✔
142
        case 'D':
19✔
143
                return fm.ParseDescribeMsg(msg.payload)
19✔
144
        case 'S':
19✔
145
                return fm.ParseSyncMsg(msg.payload)
19✔
146
        case 'E':
11✔
147
                return fm.ParseExecuteMsg(msg.payload)
11✔
148
        case 'H':
×
149
                return fm.ParseFlushMsg(msg.payload)
×
150
        default:
×
151
                return nil, errors.ErrUnknowMessageType
×
152
        }
153
}
154

155
func (s *session) writeMessage(msg []byte) (int, error) {
368✔
156
        if len(msg) > 0 {
736✔
157
                s.log.Debugf("write %s - %s message", string(msg[0]), pgmeta.MTypes[msg[0]])
368✔
158
        }
368✔
159

160
        return s.mr.Write(msg)
368✔
161
}
162

163
func (s *session) sqlTx() (*sql.SQLTx, error) {
45✔
164
        if s.tx != nil || !s.logRequestMetadata {
90✔
165
                return s.tx, nil
45✔
166
        }
45✔
167

NEW
168
        md := schema.Metadata{
×
NEW
169
                schema.UserRequestMetadataKey: s.user,
×
NEW
170
                schema.IpRequestMetadataKey:   s.ipAddr,
×
NEW
171
        }
×
NEW
172

×
NEW
173
        // create transaction explicitly to inject request metadata
×
NEW
174
        ctx := schema.ContextWithMetadata(s.ctx, md)
×
NEW
175
        return s.db.NewSQLTx(ctx, sql.DefaultTxOptions())
×
176
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc