• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

go-sql-driver / mysql / 14310852047

07 Apr 2025 01:44PM UTC coverage: 82.857% (-0.1%) from 82.959%
14310852047

Pull #1681

github

shogo82148
README.md: update Go version requirement to 1.22 or higher
Pull Request #1681: add Go 1.24 to the test matrix

3248 of 3920 relevant lines covered (82.86%)

2441799.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.71
/compress.go
1
// Go MySQL Driver - A MySQL-Driver for Go's database/sql package
2
//
3
// Copyright 2024 The Go-MySQL-Driver Authors. All rights reserved.
4
//
5
// This Source Code Form is subject to the terms of the Mozilla Public
6
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
7
// You can obtain one at http://mozilla.org/MPL/2.0/.
8

9
package mysql
10

11
import (
12
        "bytes"
13
        "compress/zlib"
14
        "fmt"
15
        "io"
16
        "sync"
17
)
18

19
// Pools of reusable zlib readers and writers, shared by all connections.
var (
	// zrPool holds idle zlib readers. It deliberately has no New function:
	// zlib.NewReader needs a stream to read the header from, so when the
	// pool is empty Get returns nil and zDecompress builds the reader itself.
	// Do not use directly. Use zDecompress() instead.
	zrPool = &sync.Pool{}

	// zwPool holds idle zlib writers created with compression level 2.
	// Do not use directly. Use zCompress() instead.
	zwPool = &sync.Pool{
		New: func() any {
			// The throwaway buffer is only a placeholder; zCompress calls
			// Reset with the real destination before every use.
			zw, err := zlib.NewWriterLevel(new(bytes.Buffer), 2)
			if err != nil {
				panic(err) // compress/zlib returns a non-nil error only if level is invalid
			}
			return zw
		},
	}
)

// zDecompress inflates the zlib stream in src and appends the result to dst.
//
// It returns the number of bytes appended to dst and the first error
// encountered. When the stream header is invalid nothing is appended and the
// count is 0. When the error is detected later (truncated or corrupt data,
// checksum mismatch) the bytes decoded so far have already been appended and
// are included in the count.
func zDecompress(src []byte, dst *bytes.Buffer) (int, error) {
	br := bytes.NewReader(src)

	var zr io.ReadCloser
	if pooled := zrPool.Get(); pooled != nil {
		zr = pooled.(io.ReadCloser)
		if err := zr.(zlib.Resetter).Reset(br, nil); err != nil {
			// Keep the reader: the next Reset reinitializes its state, so a
			// bad packet should not cost us the pooled decompressor.
			zrPool.Put(zr)
			return 0, err
		}
	} else {
		var err error
		if zr, err = zlib.NewReader(br); err != nil {
			return 0, err
		}
	}

	n, _ := dst.ReadFrom(zr) // ignore err because zr.Close() will return it again.
	err := zr.Close()        // zr.Close() may return checksum error.
	zrPool.Put(zr)
	return int(n), err
}

// zCompress writes src to dst as a single, complete zlib stream.
//
// It returns the first error reported while writing or finishing the stream.
// On error dst may already contain part of the stream.
func zCompress(src []byte, dst io.Writer) error {
	zw := zwPool.Get().(*zlib.Writer)
	// Return the writer on every path, including write errors; Reset clears
	// any error state before the writer is used again.
	defer zwPool.Put(zw)

	zw.Reset(dst)
	if _, err := zw.Write(src); err != nil {
		return err
	}
	return zw.Close()
}
71

72
type compIO struct {
73
        mc   *mysqlConn
74
        buff bytes.Buffer
75
}
76

77
func newCompIO(mc *mysqlConn) *compIO {
3,618✔
78
        return &compIO{
3,618✔
79
                mc: mc,
3,618✔
80
        }
3,618✔
81
}
3,618✔
82

83
func (c *compIO) reset() {
17,768✔
84
        c.buff.Reset()
17,768✔
85
}
17,768✔
86

87
func (c *compIO) readNext(need int, r readerFunc) ([]byte, error) {
16,894,866✔
88
        for c.buff.Len() < need {
16,914,490✔
89
                if err := c.readCompressedPacket(r); err != nil {
19,624✔
90
                        return nil, err
×
91
                }
×
92
        }
93
        data := c.buff.Next(need)
16,894,866✔
94
        return data[:need:need], nil // prevent caller writes into c.buff
16,894,866✔
95
}
96

97
func (c *compIO) readCompressedPacket(r readerFunc) error {
19,624✔
98
        header, err := c.mc.buf.readNext(7, r) // size of compressed header
19,624✔
99
        if err != nil {
19,624✔
100
                return err
×
101
        }
×
102
        _ = header[6] // bounds check hint to compiler; guaranteed by readNext
19,624✔
103

19,624✔
104
        // compressed header structure
19,624✔
105
        comprLength := getUint24(header[0:3])
19,624✔
106
        compressionSequence := uint8(header[3])
19,624✔
107
        uncompressedLength := getUint24(header[4:7])
19,624✔
108
        if debug {
19,624✔
109
                fmt.Printf("uncompress cmplen=%v uncomplen=%v pkt_cmp_seq=%v expected_cmp_seq=%v\n",
×
110
                        comprLength, uncompressedLength, compressionSequence, c.mc.sequence)
×
111
        }
×
112
        // Do not return ErrPktSync here.
113
        // Server may return error packet (e.g. 1153 Got a packet bigger than 'max_allowed_packet' bytes)
114
        // before receiving all packets from client. In this case, seqnr is younger than expected.
115
        // NOTE: Both of mariadbclient and mysqlclient do not check seqnr. Only server checks it.
116
        if debug && compressionSequence != c.mc.sequence {
19,624✔
117
                fmt.Printf("WARN: unexpected cmpress seq nr: expected %v, got %v",
×
118
                        c.mc.sequence, compressionSequence)
×
119
        }
×
120
        c.mc.sequence = compressionSequence + 1
19,624✔
121
        c.mc.compressSequence = c.mc.sequence
19,624✔
122

19,624✔
123
        comprData, err := c.mc.buf.readNext(comprLength, r)
19,624✔
124
        if err != nil {
19,624✔
125
                return err
×
126
        }
×
127

128
        // if payload is uncompressed, its length will be specified as zero, and its
129
        // true length is contained in comprLength
130
        if uncompressedLength == 0 {
28,457✔
131
                c.buff.Write(comprData)
8,833✔
132
                return nil
8,833✔
133
        }
8,833✔
134

135
        // use existing capacity in bytesBuf if possible
136
        c.buff.Grow(uncompressedLength)
10,791✔
137
        nread, err := zDecompress(comprData, &c.buff)
10,791✔
138
        if err != nil {
10,791✔
139
                return err
×
140
        }
×
141
        if nread != uncompressedLength {
10,791✔
142
                return fmt.Errorf("invalid compressed packet: uncompressed length in header is %d, actual %d",
×
143
                        uncompressedLength, nread)
×
144
        }
×
145
        return nil
10,791✔
146
}
147

148
const minCompressLength = 150
149
const maxPayloadLen = maxPacketSize - 4
150

151
// writePackets sends one or some packets with compression.
152
// Use this instead of mc.netConn.Write() when mc.compress is true.
153
func (c *compIO) writePackets(packets []byte) (int, error) {
24,899✔
154
        totalBytes := len(packets)
24,899✔
155
        blankHeader := make([]byte, 7)
24,899✔
156
        buf := &c.buff
24,899✔
157

24,899✔
158
        for len(packets) > 0 {
49,894✔
159
                payloadLen := min(maxPayloadLen, len(packets))
24,995✔
160
                payload := packets[:payloadLen]
24,995✔
161
                uncompressedLen := payloadLen
24,995✔
162

24,995✔
163
                buf.Reset()
24,995✔
164
                buf.Write(blankHeader) // Buffer.Write() never returns error
24,995✔
165

24,995✔
166
                // If payload is less than minCompressLength, don't compress.
24,995✔
167
                if uncompressedLen < minCompressLength {
49,446✔
168
                        buf.Write(payload)
24,451✔
169
                        uncompressedLen = 0
24,451✔
170
                } else {
24,995✔
171
                        err := zCompress(payload, buf)
544✔
172
                        if debug && err != nil {
544✔
173
                                fmt.Printf("zCompress error: %v", err)
×
174
                        }
×
175
                        // do not compress if compressed data is larger than uncompressed data
176
                        // I intentionally miss 7 byte header in the buf; zCompress must compress more than 7 bytes.
177
                        if err != nil || buf.Len() >= uncompressedLen {
576✔
178
                                buf.Reset()
32✔
179
                                buf.Write(blankHeader)
32✔
180
                                buf.Write(payload)
32✔
181
                                uncompressedLen = 0
32✔
182
                        }
32✔
183
                }
184

185
                if n, err := c.writeCompressedPacket(buf.Bytes(), uncompressedLen); err != nil {
24,995✔
186
                        // To allow returning ErrBadConn when sending really 0 bytes, we sum
×
187
                        // up compressed bytes that is returned by underlying Write().
×
188
                        return totalBytes - len(packets) + n, err
×
189
                }
×
190
                packets = packets[payloadLen:]
24,995✔
191
        }
192

193
        return totalBytes, nil
24,899✔
194
}
195

196
// writeCompressedPacket writes a compressed packet with header.
197
// data should start with 7 size space for header followed by payload.
198
func (c *compIO) writeCompressedPacket(data []byte, uncompressedLen int) (int, error) {
24,995✔
199
        mc := c.mc
24,995✔
200
        comprLength := len(data) - 7
24,995✔
201
        if debug {
24,995✔
202
                fmt.Printf(
×
203
                        "writeCompressedPacket: comprLength=%v, uncompressedLen=%v, seq=%v",
×
204
                        comprLength, uncompressedLen, mc.compressSequence)
×
205
        }
×
206

207
        // compression header
208
        putUint24(data[0:3], comprLength)
24,995✔
209
        data[3] = mc.compressSequence
24,995✔
210
        putUint24(data[4:7], uncompressedLen)
24,995✔
211

24,995✔
212
        mc.compressSequence++
24,995✔
213
        return mc.writeWithTimeout(data)
24,995✔
214
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc