• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDT3213 / godis / 4797094592

25 Apr 2023 11:50AM UTC coverage: 75.847% (+0.03%) from 75.813%
4797094592

push

github

finley
fix Error()

6224 of 8206 relevant lines covered (75.85%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.32
/redis/parser/parser.go
1
package parser
2

3
import (
4
        "bufio"
5
        "bytes"
6
        "errors"
7
        "github.com/hdt3213/godis/interface/redis"
8
        "github.com/hdt3213/godis/lib/logger"
9
        "github.com/hdt3213/godis/redis/protocol"
10
        "io"
11
        "runtime/debug"
12
        "strconv"
13
        "strings"
14
)
15

16
// Payload stores redis.Reply or error
17
type Payload struct {
18
        Data redis.Reply
19
        Err  error
20
}
21

22
// ParseStream reads data from io.Reader and send payloads through channel
23
func ParseStream(reader io.Reader) <-chan *Payload {
1✔
24
        ch := make(chan *Payload)
1✔
25
        go parse0(reader, ch)
1✔
26
        return ch
1✔
27
}
1✔
28

29
// ParseBytes reads data from []byte and return all replies
30
func ParseBytes(data []byte) ([]redis.Reply, error) {
×
31
        ch := make(chan *Payload)
×
32
        reader := bytes.NewReader(data)
×
33
        go parse0(reader, ch)
×
34
        var results []redis.Reply
×
35
        for payload := range ch {
×
36
                if payload == nil {
×
37
                        return nil, errors.New("no protocol")
×
38
                }
×
39
                if payload.Err != nil {
×
40
                        if payload.Err == io.EOF {
×
41
                                break
×
42
                        }
43
                        return nil, payload.Err
×
44
                }
45
                results = append(results, payload.Data)
×
46
        }
47
        return results, nil
×
48
}
49

50
// ParseOne reads data from []byte and return the first payload
51
func ParseOne(data []byte) (redis.Reply, error) {
1✔
52
        ch := make(chan *Payload)
1✔
53
        reader := bytes.NewReader(data)
1✔
54
        go parse0(reader, ch)
1✔
55
        payload := <-ch // parse0 will close the channel
1✔
56
        if payload == nil {
1✔
57
                return nil, errors.New("no protocol")
×
58
        }
×
59
        return payload.Data, payload.Err
1✔
60
}
61

62
func parse0(rawReader io.Reader, ch chan<- *Payload) {
1✔
63
        defer func() {
2✔
64
                if err := recover(); err != nil {
1✔
65
                        logger.Error(err, string(debug.Stack()))
×
66
                }
×
67
        }()
68
        reader := bufio.NewReader(rawReader)
1✔
69
        for {
2✔
70
                line, err := reader.ReadBytes('\n')
1✔
71
                if err != nil {
2✔
72
                        ch <- &Payload{Err: err}
1✔
73
                        close(ch)
1✔
74
                        return
1✔
75
                }
1✔
76
                length := len(line)
1✔
77
                if length <= 2 || line[length-2] != '\r' {
1✔
78
                        // there are some empty lines within replication traffic, ignore this error
×
79
                        //protocolError(ch, "empty line")
×
80
                        continue
×
81
                }
82
                line = bytes.TrimSuffix(line, []byte{'\r', '\n'})
1✔
83
                switch line[0] {
1✔
84
                case '+':
1✔
85
                        content := string(line[1:])
1✔
86
                        ch <- &Payload{
1✔
87
                                Data: protocol.MakeStatusReply(content),
1✔
88
                        }
1✔
89
                        if strings.HasPrefix(content, "FULLRESYNC") {
1✔
90
                                err = parseRDBBulkString(reader, ch)
×
91
                                if err != nil {
×
92
                                        ch <- &Payload{Err: err}
×
93
                                        close(ch)
×
94
                                        return
×
95
                                }
×
96
                        }
97
                case '-':
1✔
98
                        ch <- &Payload{
1✔
99
                                Data: protocol.MakeErrReply(string(line[1:])),
1✔
100
                        }
1✔
101
                case ':':
1✔
102
                        value, err := strconv.ParseInt(string(line[1:]), 10, 64)
1✔
103
                        if err != nil {
1✔
104
                                protocolError(ch, "illegal number "+string(line[1:]))
×
105
                                continue
×
106
                        }
107
                        ch <- &Payload{
1✔
108
                                Data: protocol.MakeIntReply(value),
1✔
109
                        }
1✔
110
                case '$':
1✔
111
                        err = parseBulkString(line, reader, ch)
1✔
112
                        if err != nil {
1✔
113
                                ch <- &Payload{Err: err}
×
114
                                close(ch)
×
115
                                return
×
116
                        }
×
117
                case '*':
1✔
118
                        err = parseArray(line, reader, ch)
1✔
119
                        if err != nil {
1✔
120
                                ch <- &Payload{Err: err}
×
121
                                close(ch)
×
122
                                return
×
123
                        }
×
124
                default:
1✔
125
                        args := bytes.Split(line, []byte{' '})
1✔
126
                        ch <- &Payload{
1✔
127
                                Data: protocol.MakeMultiBulkReply(args),
1✔
128
                        }
1✔
129
                }
130
        }
131
}
132

133
func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {
1✔
134
        strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)
1✔
135
        if err != nil || strLen < -1 {
1✔
136
                protocolError(ch, "illegal bulk string header: "+string(header))
×
137
                return nil
×
138
        } else if strLen == -1 {
2✔
139
                ch <- &Payload{
1✔
140
                        Data: protocol.MakeNullBulkReply(),
1✔
141
                }
1✔
142
                return nil
1✔
143
        }
1✔
144
        body := make([]byte, strLen+2)
1✔
145
        _, err = io.ReadFull(reader, body)
1✔
146
        if err != nil {
1✔
147
                return err
×
148
        }
×
149
        ch <- &Payload{
1✔
150
                Data: protocol.MakeBulkReply(body[:len(body)-2]),
1✔
151
        }
1✔
152
        return nil
1✔
153
}
154

155
// there is no CRLF between RDB and following AOF, therefore it needs to be treated differently
156
func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error {
×
157
        header, err := reader.ReadBytes('\n')
×
158
        header = bytes.TrimSuffix(header, []byte{'\r', '\n'})
×
159
        if len(header) == 0 {
×
160
                return errors.New("empty header")
×
161
        }
×
162
        strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)
×
163
        if err != nil || strLen <= 0 {
×
164
                return errors.New("illegal bulk header: " + string(header))
×
165
        }
×
166
        body := make([]byte, strLen)
×
167
        _, err = io.ReadFull(reader, body)
×
168
        if err != nil {
×
169
                return err
×
170
        }
×
171
        ch <- &Payload{
×
172
                Data: protocol.MakeBulkReply(body[:len(body)]),
×
173
        }
×
174
        return nil
×
175
}
176

177
func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {
1✔
178
        nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64)
1✔
179
        if err != nil || nStrs < 0 {
1✔
180
                protocolError(ch, "illegal array header "+string(header[1:]))
×
181
                return nil
×
182
        } else if nStrs == 0 {
2✔
183
                ch <- &Payload{
1✔
184
                        Data: protocol.MakeEmptyMultiBulkReply(),
1✔
185
                }
1✔
186
                return nil
1✔
187
        }
1✔
188
        lines := make([][]byte, 0, nStrs)
1✔
189
        for i := int64(0); i < nStrs; i++ {
2✔
190
                var line []byte
1✔
191
                line, err = reader.ReadBytes('\n')
1✔
192
                if err != nil {
1✔
193
                        return err
×
194
                }
×
195
                length := len(line)
1✔
196
                if length < 4 || line[length-2] != '\r' || line[0] != '$' {
1✔
197
                        protocolError(ch, "illegal bulk string header "+string(line))
×
198
                        break
×
199
                }
200
                strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64)
1✔
201
                if err != nil || strLen < -1 {
1✔
202
                        protocolError(ch, "illegal bulk string length "+string(line))
×
203
                        break
×
204
                } else if strLen == -1 {
1✔
205
                        lines = append(lines, []byte{})
×
206
                } else {
1✔
207
                        body := make([]byte, strLen+2)
1✔
208
                        _, err := io.ReadFull(reader, body)
1✔
209
                        if err != nil {
1✔
210
                                return err
×
211
                        }
×
212
                        lines = append(lines, body[:len(body)-2])
1✔
213
                }
214
        }
215
        ch <- &Payload{
1✔
216
                Data: protocol.MakeMultiBulkReply(lines),
1✔
217
        }
1✔
218
        return nil
1✔
219
}
220

221
func protocolError(ch chan<- *Payload, msg string) {
×
222
        err := errors.New("protocol error: " + msg)
×
223
        ch <- &Payload{Err: err}
×
224
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc