• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

go-sql-driver / mysql / 15624673805

13 Jun 2025 01:56AM UTC coverage: 82.906% (-0.004%) from 82.91%
15624673805

push

github

web-flow
[1.9] fix PING on compressed connections (#1723)

Add missing mc.syncSequence()

Fix #1718

15 of 18 new or added lines in 2 files covered. (83.33%)

7 existing lines in 3 files now uncovered.

3264 of 3937 relevant lines covered (82.91%)

2489344.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.29
/compress.go
1
// Go MySQL Driver - A MySQL-Driver for Go's database/sql package
2
//
3
// Copyright 2024 The Go-MySQL-Driver Authors. All rights reserved.
4
//
5
// This Source Code Form is subject to the terms of the Mozilla Public
6
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
7
// You can obtain one at http://mozilla.org/MPL/2.0/.
8

9
package mysql
10

11
import (
12
        "bytes"
13
        "compress/zlib"
14
        "fmt"
15
        "io"
16
        "sync"
17
)
18

19
var (
20
        zrPool *sync.Pool // Do not use directly. Use zDecompress() instead.
21
        zwPool *sync.Pool // Do not use directly. Use zCompress() instead.
22
)
23

24
func init() {
32✔
25
        zrPool = &sync.Pool{
32✔
26
                New: func() any { return nil },
2,943✔
27
        }
28
        zwPool = &sync.Pool{
32✔
29
                New: func() any {
308✔
30
                        zw, err := zlib.NewWriterLevel(new(bytes.Buffer), 2)
276✔
31
                        if err != nil {
276✔
32
                                panic(err) // compress/zlib return non-nil error only if level is invalid
×
33
                        }
34
                        return zw
276✔
35
                },
36
        }
37
}
38

39
func zDecompress(src []byte, dst *bytes.Buffer) (int, error) {
10,727✔
40
        br := bytes.NewReader(src)
10,727✔
41
        var zr io.ReadCloser
10,727✔
42
        var err error
10,727✔
43

10,727✔
44
        if a := zrPool.Get(); a == nil {
13,638✔
45
                if zr, err = zlib.NewReader(br); err != nil {
2,911✔
46
                        return 0, err
×
47
                }
×
48
        } else {
7,816✔
49
                zr = a.(io.ReadCloser)
7,816✔
50
                if err := zr.(zlib.Resetter).Reset(br, nil); err != nil {
7,816✔
51
                        return 0, err
×
52
                }
×
53
        }
54

55
        n, _ := dst.ReadFrom(zr) // ignore err because zr.Close() will return it again.
10,727✔
56
        err = zr.Close()         // zr.Close() may return chuecksum error.
10,727✔
57
        zrPool.Put(zr)
10,727✔
58
        return int(n), err
10,727✔
59
}
60

61
func zCompress(src []byte, dst io.Writer) error {
544✔
62
        zw := zwPool.Get().(*zlib.Writer)
544✔
63
        zw.Reset(dst)
544✔
64
        if _, err := zw.Write(src); err != nil {
544✔
65
                return err
×
66
        }
×
67
        err := zw.Close()
544✔
68
        zwPool.Put(zw)
544✔
69
        return err
544✔
70
}
71

72
type compIO struct {
73
        mc   *mysqlConn
74
        buff bytes.Buffer
75
}
76

77
func newCompIO(mc *mysqlConn) *compIO {
3,510✔
78
        return &compIO{
3,510✔
79
                mc: mc,
3,510✔
80
        }
3,510✔
81
}
3,510✔
82

83
func (c *compIO) reset() {
24,190✔
84
        c.buff.Reset()
24,190✔
85
}
24,190✔
86

87
func (c *compIO) readNext(need int) ([]byte, error) {
16,894,136✔
88
        for c.buff.Len() < need {
16,913,994✔
89
                if err := c.readCompressedPacket(); err != nil {
19,886✔
90
                        return nil, err
28✔
91
                }
28✔
92
        }
93
        data := c.buff.Next(need)
16,894,108✔
94
        return data[:need:need], nil // prevent caller writes into c.buff
16,894,108✔
95
}
96

97
func (c *compIO) readCompressedPacket() error {
19,858✔
98
        header, err := c.mc.readNext(7)
19,858✔
99
        if err != nil {
19,886✔
100
                return err
28✔
101
        }
28✔
102
        _ = header[6] // bounds check hint to compiler; guaranteed by readNext
19,830✔
103

19,830✔
104
        // compressed header structure
19,830✔
105
        comprLength := getUint24(header[0:3])
19,830✔
106
        compressionSequence := header[3]
19,830✔
107
        uncompressedLength := getUint24(header[4:7])
19,830✔
108
        if debug {
19,830✔
109
                fmt.Printf("uncompress cmplen=%v uncomplen=%v pkt_cmp_seq=%v expected_cmp_seq=%v\n",
×
110
                        comprLength, uncompressedLength, compressionSequence, c.mc.sequence)
×
111
        }
×
112
        // Do not return ErrPktSync here.
113
        // Server may return error packet (e.g. 1153 Got a packet bigger than 'max_allowed_packet' bytes)
114
        // before receiving all packets from client. In this case, seqnr is younger than expected.
115
        // NOTE: Both of mariadbclient and mysqlclient do not check seqnr. Only server checks it.
116
        if debug && compressionSequence != c.mc.compressSequence {
19,830✔
117
                fmt.Printf("WARN: unexpected cmpress seq nr: expected %v, got %v",
×
NEW
118
                        c.mc.compressSequence, compressionSequence)
×
119
        }
×
120
        c.mc.compressSequence = compressionSequence + 1
19,830✔
121

19,830✔
122
        comprData, err := c.mc.readNext(comprLength)
19,830✔
123
        if err != nil {
19,830✔
124
                return err
×
125
        }
×
126

127
        // if payload is uncompressed, its length will be specified as zero, and its
128
        // true length is contained in comprLength
129
        if uncompressedLength == 0 {
28,933✔
130
                c.buff.Write(comprData)
9,103✔
131
                return nil
9,103✔
132
        }
9,103✔
133

134
        // use existing capacity in bytesBuf if possible
135
        c.buff.Grow(uncompressedLength)
10,727✔
136
        nread, err := zDecompress(comprData, &c.buff)
10,727✔
137
        if err != nil {
10,727✔
138
                return err
×
139
        }
×
140
        if nread != uncompressedLength {
10,727✔
141
                return fmt.Errorf("invalid compressed packet: uncompressed length in header is %d, actual %d",
×
142
                        uncompressedLength, nread)
×
143
        }
×
144
        return nil
10,727✔
145
}
146

147
const minCompressLength = 150
148
const maxPayloadLen = maxPacketSize - 4
149

150
// writePackets sends one or some packets with compression.
151
// Use this instead of mc.netConn.Write() when mc.compress is true.
152
func (c *compIO) writePackets(packets []byte) (int, error) {
24,670✔
153
        totalBytes := len(packets)
24,670✔
154
        blankHeader := make([]byte, 7)
24,670✔
155
        buf := &c.buff
24,670✔
156

24,670✔
157
        for len(packets) > 0 {
49,436✔
158
                payloadLen := min(maxPayloadLen, len(packets))
24,766✔
159
                payload := packets[:payloadLen]
24,766✔
160
                uncompressedLen := payloadLen
24,766✔
161

24,766✔
162
                buf.Reset()
24,766✔
163
                buf.Write(blankHeader) // Buffer.Write() never returns error
24,766✔
164

24,766✔
165
                // If payload is less than minCompressLength, don't compress.
24,766✔
166
                if uncompressedLen < minCompressLength {
48,988✔
167
                        buf.Write(payload)
24,222✔
168
                        uncompressedLen = 0
24,222✔
169
                } else {
24,766✔
170
                        err := zCompress(payload, buf)
544✔
171
                        if debug && err != nil {
544✔
172
                                fmt.Printf("zCompress error: %v", err)
×
173
                        }
×
174
                        // do not compress if compressed data is larger than uncompressed data
175
                        // I intentionally miss 7 byte header in the buf; zCompress must compress more than 7 bytes.
176
                        if err != nil || buf.Len() >= uncompressedLen {
576✔
177
                                buf.Reset()
32✔
178
                                buf.Write(blankHeader)
32✔
179
                                buf.Write(payload)
32✔
180
                                uncompressedLen = 0
32✔
181
                        }
32✔
182
                }
183

184
                if n, err := c.writeCompressedPacket(buf.Bytes(), uncompressedLen); err != nil {
24,767✔
185
                        // To allow returning ErrBadConn when sending really 0 bytes, we sum
1✔
186
                        // up compressed bytes that is returned by underlying Write().
1✔
187
                        return totalBytes - len(packets) + n, err
1✔
188
                }
1✔
189
                packets = packets[payloadLen:]
24,765✔
190
        }
191

192
        return totalBytes, nil
24,669✔
193
}
194

195
// writeCompressedPacket writes a compressed packet with header.
196
// data should start with 7 size space for header followed by payload.
197
func (c *compIO) writeCompressedPacket(data []byte, uncompressedLen int) (int, error) {
24,766✔
198
        mc := c.mc
24,766✔
199
        comprLength := len(data) - 7
24,766✔
200
        if debug {
24,766✔
201
                fmt.Printf(
×
NEW
202
                        "writeCompressedPacket: comprLength=%v, uncompressedLen=%v, seq=%v\n",
×
203
                        comprLength, uncompressedLen, mc.compressSequence)
×
204
        }
×
205

206
        // compression header
207
        putUint24(data[0:3], comprLength)
24,766✔
208
        data[3] = mc.compressSequence
24,766✔
209
        putUint24(data[4:7], uncompressedLen)
24,766✔
210

24,766✔
211
        mc.compressSequence++
24,766✔
212
        return mc.writeWithTimeout(data)
24,766✔
213
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc