• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 13363030201

15 Feb 2025 01:54AM UTC coverage: 84.426% (-1.1%) from 85.509%
13363030201

push

github

web-flow
NRG: Invalidate pending append entries cache (#6513)

The `n.pae` is an in-memory cache of pending but not yet applied
entries. When applying commits we can pull from this cache so we don't
need to pull them from disk for example. However, the cache has a
bounded size. So if the cache would be fully filled and we'd store a
different entry at an index that was cached, we'd apply the wrong
(cached) entry.

If we get an entry that we can't cache because it's full, we can simply
drop the entry from the cache if it exists. If an entry at this index
doesn't exist it's a noop, but if it did exist then it clears up room in
the cache for the next entries to be stored.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

68197 of 80777 relevant lines covered (84.43%)

863883.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.57
/server/parser.go
1
// Copyright 2012-2024 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "fmt"
20
        "net/http"
21
        "net/textproto"
22
)
23

24
type parserState int
25
type parseState struct {
26
        state   parserState
27
        op      byte
28
        as      int
29
        drop    int
30
        pa      pubArg
31
        argBuf  []byte
32
        msgBuf  []byte
33
        header  http.Header // access via getHeader
34
        scratch [MAX_CONTROL_LINE_SIZE]byte
35
}
36

37
type pubArg struct {
38
        arg     []byte
39
        pacache []byte
40
        origin  []byte
41
        account []byte
42
        subject []byte
43
        deliver []byte
44
        mapped  []byte
45
        reply   []byte
46
        szb     []byte
47
        hdb     []byte
48
        queues  [][]byte
49
        size    int
50
        hdr     int
51
        psi     []*serviceImport
52
        trace   *msgTrace
53
}
54

55
// Parser constants
56
const (
57
        OP_START parserState = iota
58
        OP_PLUS
59
        OP_PLUS_O
60
        OP_PLUS_OK
61
        OP_MINUS
62
        OP_MINUS_E
63
        OP_MINUS_ER
64
        OP_MINUS_ERR
65
        OP_MINUS_ERR_SPC
66
        MINUS_ERR_ARG
67
        OP_C
68
        OP_CO
69
        OP_CON
70
        OP_CONN
71
        OP_CONNE
72
        OP_CONNEC
73
        OP_CONNECT
74
        CONNECT_ARG
75
        OP_H
76
        OP_HP
77
        OP_HPU
78
        OP_HPUB
79
        OP_HPUB_SPC
80
        HPUB_ARG
81
        OP_HM
82
        OP_HMS
83
        OP_HMSG
84
        OP_HMSG_SPC
85
        HMSG_ARG
86
        OP_P
87
        OP_PU
88
        OP_PUB
89
        OP_PUB_SPC
90
        PUB_ARG
91
        OP_PI
92
        OP_PIN
93
        OP_PING
94
        OP_PO
95
        OP_PON
96
        OP_PONG
97
        MSG_PAYLOAD
98
        MSG_END_R
99
        MSG_END_N
100
        OP_S
101
        OP_SU
102
        OP_SUB
103
        OP_SUB_SPC
104
        SUB_ARG
105
        OP_A
106
        OP_ASUB
107
        OP_ASUB_SPC
108
        ASUB_ARG
109
        OP_AUSUB
110
        OP_AUSUB_SPC
111
        AUSUB_ARG
112
        OP_L
113
        OP_LS
114
        OP_R
115
        OP_RS
116
        OP_U
117
        OP_UN
118
        OP_UNS
119
        OP_UNSU
120
        OP_UNSUB
121
        OP_UNSUB_SPC
122
        UNSUB_ARG
123
        OP_M
124
        OP_MS
125
        OP_MSG
126
        OP_MSG_SPC
127
        MSG_ARG
128
        OP_I
129
        OP_IN
130
        OP_INF
131
        OP_INFO
132
        INFO_ARG
133
)
134

135
func (c *client) parse(buf []byte) error {
11,446,139✔
136
        // Branch out to mqtt clients. c.mqtt is immutable, but should it become
11,446,139✔
137
        // an issue (say data race detection), we could branch outside in readLoop
11,446,139✔
138
        if c.isMqtt() {
11,447,816✔
139
                return c.mqttParse(buf)
1,677✔
140
        }
1,677✔
141
        var i int
11,444,462✔
142
        var b byte
11,444,462✔
143
        var lmsg bool
11,444,462✔
144

11,444,462✔
145
        // Snapshots
11,444,462✔
146
        c.mu.Lock()
11,444,462✔
147
        // Snapshot and then reset when we receive a
11,444,462✔
148
        // proper CONNECT if needed.
11,444,462✔
149
        authSet := c.awaitingAuth()
11,444,462✔
150
        // Snapshot max control line as well.
11,444,462✔
151
        s, mcl, trace := c.srv, c.mcl, c.trace
11,444,462✔
152
        c.mu.Unlock()
11,444,462✔
153

11,444,462✔
154
        // Move to loop instead of range syntax to allow jumping of i
11,444,462✔
155
        for i = 0; i < len(buf); i++ {
1,816,740,889✔
156
                b = buf[i]
1,805,296,427✔
157

1,805,296,427✔
158
                switch c.state {
1,805,296,427✔
159
                case OP_START:
34,325,610✔
160
                        c.op = b
34,325,610✔
161
                        if b != 'C' && b != 'c' {
68,614,411✔
162
                                if authSet {
34,288,815✔
163
                                        if s == nil {
14✔
164
                                                goto authErr
×
165
                                        }
166
                                        var ok bool
14✔
167
                                        // Check here for NoAuthUser. If this is set allow non CONNECT protos as our first.
14✔
168
                                        // E.g. telnet proto demos.
14✔
169
                                        if noAuthUser := s.getOpts().NoAuthUser; noAuthUser != _EMPTY_ {
16✔
170
                                                s.mu.Lock()
2✔
171
                                                user, exists := s.users[noAuthUser]
2✔
172
                                                s.mu.Unlock()
2✔
173
                                                if exists {
4✔
174
                                                        c.RegisterUser(user)
2✔
175
                                                        c.mu.Lock()
2✔
176
                                                        c.clearAuthTimer()
2✔
177
                                                        c.flags.set(connectReceived)
2✔
178
                                                        c.mu.Unlock()
2✔
179
                                                        authSet, ok = false, true
2✔
180
                                                }
2✔
181
                                        }
182
                                        if !ok {
26✔
183
                                                goto authErr
12✔
184
                                        }
185
                                }
186
                                // If the connection is a gateway connection, make sure that
187
                                // if this is an inbound, it starts with a CONNECT.
188
                                if c.kind == GATEWAY && !c.gw.outbound && !c.gw.connected {
34,288,789✔
189
                                        // Use auth violation since no CONNECT was sent.
×
190
                                        // It could be a parseErr too.
×
191
                                        goto authErr
×
192
                                }
193
                        }
194
                        switch b {
34,325,598✔
195
                        case 'P', 'p':
11,953,244✔
196
                                c.state = OP_P
11,953,244✔
197
                        case 'H', 'h':
8,264,154✔
198
                                c.state = OP_H
8,264,154✔
199
                        case 'S', 's':
138,205✔
200
                                c.state = OP_S
138,205✔
201
                        case 'U', 'u':
15,002✔
202
                                c.state = OP_U
15,002✔
203
                        case 'R', 'r':
13,679,000✔
204
                                if c.kind == CLIENT {
13,679,000✔
205
                                        goto parseErr
×
206
                                } else {
13,679,000✔
207
                                        c.state = OP_R
13,679,000✔
208
                                }
13,679,000✔
209
                        case 'L', 'l':
164,524✔
210
                                if c.kind != LEAF && c.kind != ROUTER {
164,524✔
211
                                        goto parseErr
×
212
                                } else {
164,524✔
213
                                        c.state = OP_L
164,524✔
214
                                }
164,524✔
215
                        case 'A', 'a':
32✔
216
                                if c.kind == CLIENT {
32✔
217
                                        goto parseErr
×
218
                                } else {
32✔
219
                                        c.state = OP_A
32✔
220
                                }
32✔
221
                        case 'C', 'c':
36,809✔
222
                                c.state = OP_C
36,809✔
223
                        case 'I', 'i':
74,433✔
224
                                c.state = OP_I
74,433✔
225
                        case '+':
3✔
226
                                c.state = OP_PLUS
3✔
227
                        case '-':
180✔
228
                                c.state = OP_MINUS
180✔
229
                        default:
12✔
230
                                goto parseErr
12✔
231
                        }
232
                case OP_H:
8,264,154✔
233
                        switch b {
8,264,154✔
234
                        case 'P', 'p':
4,887,395✔
235
                                c.state = OP_HP
4,887,395✔
236
                        case 'M', 'm':
3,376,759✔
237
                                c.state = OP_HM
3,376,759✔
238
                        default:
×
239
                                goto parseErr
×
240
                        }
241
                case OP_HP:
4,887,395✔
242
                        switch b {
4,887,395✔
243
                        case 'U', 'u':
4,887,395✔
244
                                c.state = OP_HPU
4,887,395✔
245
                        default:
×
246
                                goto parseErr
×
247
                        }
248
                case OP_HPU:
4,887,395✔
249
                        switch b {
4,887,395✔
250
                        case 'B', 'b':
4,887,395✔
251
                                c.state = OP_HPUB
4,887,395✔
252
                        default:
×
253
                                goto parseErr
×
254
                        }
255
                case OP_HPUB:
4,887,395✔
256
                        switch b {
4,887,395✔
257
                        case ' ', '\t':
4,887,395✔
258
                                c.state = OP_HPUB_SPC
4,887,395✔
259
                        default:
×
260
                                goto parseErr
×
261
                        }
262
                case OP_HPUB_SPC:
4,887,395✔
263
                        switch b {
4,887,395✔
264
                        case ' ', '\t':
×
265
                                continue
×
266
                        default:
4,887,395✔
267
                                c.pa.hdr = 0
4,887,395✔
268
                                c.state = HPUB_ARG
4,887,395✔
269
                                c.as = i
4,887,395✔
270
                        }
271
                case HPUB_ARG:
403,532,596✔
272
                        switch b {
403,532,596✔
273
                        case '\r':
4,887,395✔
274
                                c.drop = 1
4,887,395✔
275
                        case '\n':
4,887,395✔
276
                                var arg []byte
4,887,395✔
277
                                if c.argBuf != nil {
4,887,445✔
278
                                        arg = c.argBuf
50✔
279
                                        c.argBuf = nil
50✔
280
                                } else {
4,887,395✔
281
                                        arg = buf[c.as : i-c.drop]
4,887,345✔
282
                                }
4,887,345✔
283
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
4,887,395✔
284
                                        return err
×
285
                                }
×
286
                                if trace {
4,887,401✔
287
                                        c.traceInOp("HPUB", arg)
6✔
288
                                }
6✔
289
                                var remaining []byte
4,887,395✔
290
                                if i < len(buf) {
9,774,790✔
291
                                        remaining = buf[i+1:]
4,887,395✔
292
                                }
4,887,395✔
293
                                if err := c.processHeaderPub(arg, remaining); err != nil {
4,887,399✔
294
                                        return err
4✔
295
                                }
4✔
296

297
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
4,887,391✔
298
                                // If we don't have a saved buffer then jump ahead with
4,887,391✔
299
                                // the index. If this overruns what is left we fall out
4,887,391✔
300
                                // and process split buffer.
4,887,391✔
301
                                if c.msgBuf == nil {
9,774,782✔
302
                                        i = c.as + c.pa.size - LEN_CR_LF
4,887,391✔
303
                                }
4,887,391✔
304
                        default:
393,757,806✔
305
                                if c.argBuf != nil {
393,758,739✔
306
                                        c.argBuf = append(c.argBuf, b)
933✔
307
                                }
933✔
308
                        }
309
                case OP_HM:
3,376,759✔
310
                        switch b {
3,376,759✔
311
                        case 'S', 's':
3,376,759✔
312
                                c.state = OP_HMS
3,376,759✔
313
                        default:
×
314
                                goto parseErr
×
315
                        }
316
                case OP_HMS:
3,376,759✔
317
                        switch b {
3,376,759✔
318
                        case 'G', 'g':
3,376,759✔
319
                                c.state = OP_HMSG
3,376,759✔
320
                        default:
×
321
                                goto parseErr
×
322
                        }
323
                case OP_HMSG:
3,376,759✔
324
                        switch b {
3,376,759✔
325
                        case ' ', '\t':
3,376,759✔
326
                                c.state = OP_HMSG_SPC
3,376,759✔
327
                        default:
×
328
                                goto parseErr
×
329
                        }
330
                case OP_HMSG_SPC:
3,376,759✔
331
                        switch b {
3,376,759✔
332
                        case ' ', '\t':
×
333
                                continue
×
334
                        default:
3,376,759✔
335
                                c.pa.hdr = 0
3,376,759✔
336
                                c.state = HMSG_ARG
3,376,759✔
337
                                c.as = i
3,376,759✔
338
                        }
339
                case HMSG_ARG:
325,513,202✔
340
                        switch b {
325,513,202✔
341
                        case '\r':
3,376,759✔
342
                                c.drop = 1
3,376,759✔
343
                        case '\n':
3,376,759✔
344
                                var arg []byte
3,376,759✔
345
                                if c.argBuf != nil {
3,418,407✔
346
                                        arg = c.argBuf
41,648✔
347
                                        c.argBuf = nil
41,648✔
348
                                } else {
3,376,759✔
349
                                        arg = buf[c.as : i-c.drop]
3,335,111✔
350
                                }
3,335,111✔
351
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
3,376,759✔
352
                                        return err
×
353
                                }
×
354
                                var err error
3,376,759✔
355
                                if c.kind == ROUTER || c.kind == GATEWAY {
6,753,136✔
356
                                        if trace {
3,378,518✔
357
                                                c.traceInOp("HMSG", arg)
2,141✔
358
                                        }
2,141✔
359
                                        err = c.processRoutedHeaderMsgArgs(arg)
3,376,377✔
360
                                } else if c.kind == LEAF {
764✔
361
                                        if trace {
532✔
362
                                                c.traceInOp("HMSG", arg)
150✔
363
                                        }
150✔
364
                                        err = c.processLeafHeaderMsgArgs(arg)
382✔
365
                                }
366
                                if err != nil {
3,376,760✔
367
                                        return err
1✔
368
                                }
1✔
369
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
3,376,758✔
370

3,376,758✔
371
                                // jump ahead with the index. If this overruns
3,376,758✔
372
                                // what is left we fall out and process split
3,376,758✔
373
                                // buffer.
3,376,758✔
374
                                i = c.as + c.pa.size - LEN_CR_LF
3,376,758✔
375
                        default:
318,759,684✔
376
                                if c.argBuf != nil {
320,655,487✔
377
                                        c.argBuf = append(c.argBuf, b)
1,895,803✔
378
                                }
1,895,803✔
379
                        }
380
                case OP_P:
11,953,243✔
381
                        switch b {
11,953,243✔
382
                        case 'U', 'u':
11,899,483✔
383
                                c.state = OP_PU
11,899,483✔
384
                        case 'I', 'i':
32,350✔
385
                                c.state = OP_PI
32,350✔
386
                        case 'O', 'o':
21,409✔
387
                                c.state = OP_PO
21,409✔
388
                        default:
1✔
389
                                goto parseErr
1✔
390
                        }
391
                case OP_PU:
11,899,479✔
392
                        switch b {
11,899,479✔
393
                        case 'B', 'b':
11,899,478✔
394
                                c.state = OP_PUB
11,899,478✔
395
                        default:
1✔
396
                                goto parseErr
1✔
397
                        }
398
                case OP_PUB:
11,899,478✔
399
                        switch b {
11,899,478✔
400
                        case ' ', '\t':
11,899,477✔
401
                                c.state = OP_PUB_SPC
11,899,477✔
402
                        default:
1✔
403
                                goto parseErr
1✔
404
                        }
405
                case OP_PUB_SPC:
11,899,477✔
406
                        switch b {
11,899,477✔
407
                        case ' ', '\t':
1✔
408
                                continue
1✔
409
                        default:
11,899,476✔
410
                                c.pa.hdr = -1
11,899,476✔
411
                                c.state = PUB_ARG
11,899,476✔
412
                                c.as = i
11,899,476✔
413
                        }
414
                case PUB_ARG:
232,813,834✔
415
                        switch b {
232,813,834✔
416
                        case '\r':
11,899,466✔
417
                                c.drop = 1
11,899,466✔
418
                        case '\n':
11,899,466✔
419
                                var arg []byte
11,899,466✔
420
                                if c.argBuf != nil {
11,935,722✔
421
                                        arg = c.argBuf
36,256✔
422
                                        c.argBuf = nil
36,256✔
423
                                } else {
11,899,466✔
424
                                        arg = buf[c.as : i-c.drop]
11,863,210✔
425
                                }
11,863,210✔
426
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
11,899,467✔
427
                                        return err
1✔
428
                                }
1✔
429
                                if trace {
12,169,740✔
430
                                        c.traceInOp("PUB", arg)
270,275✔
431
                                }
270,275✔
432
                                if err := c.processPub(arg); err != nil {
11,899,476✔
433
                                        return err
11✔
434
                                }
11✔
435

436
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
11,899,454✔
437
                                // If we don't have a saved buffer then jump ahead with
11,899,454✔
438
                                // the index. If this overruns what is left we fall out
11,899,454✔
439
                                // and process split buffer.
11,899,454✔
440
                                if c.msgBuf == nil {
23,798,908✔
441
                                        i = c.as + c.pa.size - LEN_CR_LF
11,899,454✔
442
                                }
11,899,454✔
443
                        default:
209,014,902✔
444
                                if c.argBuf != nil {
209,910,573✔
445
                                        c.argBuf = append(c.argBuf, b)
895,671✔
446
                                }
895,671✔
447
                        }
448
                case MSG_PAYLOAD:
33,013,099✔
449
                        if c.msgBuf != nil {
34,836,151✔
450
                                // copy as much as we can to the buffer and skip ahead.
1,823,052✔
451
                                toCopy := c.pa.size - len(c.msgBuf)
1,823,052✔
452
                                avail := len(buf) - i
1,823,052✔
453
                                if avail < toCopy {
1,970,630✔
454
                                        toCopy = avail
147,578✔
455
                                }
147,578✔
456
                                if toCopy > 0 {
3,646,104✔
457
                                        start := len(c.msgBuf)
1,823,052✔
458
                                        // This is needed for copy to work.
1,823,052✔
459
                                        c.msgBuf = c.msgBuf[:start+toCopy]
1,823,052✔
460
                                        copy(c.msgBuf[start:], buf[i:i+toCopy])
1,823,052✔
461
                                        // Update our index
1,823,052✔
462
                                        i = (i + toCopy) - 1
1,823,052✔
463
                                } else {
1,823,052✔
464
                                        // Fall back to append if needed.
×
465
                                        c.msgBuf = append(c.msgBuf, b)
×
466
                                }
×
467
                                if len(c.msgBuf) >= c.pa.size {
3,498,526✔
468
                                        c.state = MSG_END_R
1,675,474✔
469
                                }
1,675,474✔
470
                        } else if i-c.as+1 >= c.pa.size {
62,380,094✔
471
                                c.state = MSG_END_R
31,190,047✔
472
                        }
31,190,047✔
473
                case MSG_END_R:
32,865,521✔
474
                        if b != '\r' {
32,865,524✔
475
                                goto parseErr
3✔
476
                        }
477
                        if c.msgBuf != nil {
34,543,664✔
478
                                c.msgBuf = append(c.msgBuf, b)
1,678,146✔
479
                        }
1,678,146✔
480
                        c.state = MSG_END_N
32,865,518✔
481
                case MSG_END_N:
32,865,505✔
482
                        if b != '\n' {
32,865,506✔
483
                                goto parseErr
1✔
484
                        }
485
                        if c.msgBuf != nil {
34,545,854✔
486
                                c.msgBuf = append(c.msgBuf, b)
1,680,350✔
487
                        } else {
32,865,504✔
488
                                c.msgBuf = buf[c.as : i+1]
31,185,154✔
489
                        }
31,185,154✔
490

491
                        var mt *msgTrace
32,865,504✔
492
                        if c.pa.hdr > 0 {
41,130,069✔
493
                                mt = c.initMsgTrace()
8,264,565✔
494
                        }
8,264,565✔
495
                        // Check for mappings.
496
                        if (c.kind == CLIENT || c.kind == LEAF) && c.in.flags.isSet(hasMappings) {
32,889,885✔
497
                                changed := c.selectMappedSubject()
24,381✔
498
                                if changed {
36,936✔
499
                                        if trace {
12,627✔
500
                                                c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", c.pa.mapped, c.pa.subject)))
72✔
501
                                        }
72✔
502
                                        // c.pa.subject is the subject the original is now mapped to.
503
                                        mt.addSubjectMappingEvent(c.pa.subject)
12,555✔
504
                                }
505
                        }
506
                        if trace {
33,144,921✔
507
                                c.traceMsg(c.msgBuf)
279,417✔
508
                        }
279,417✔
509

510
                        c.processInboundMsg(c.msgBuf)
32,865,504✔
511

32,865,504✔
512
                        mt.sendEvent()
32,865,504✔
513
                        c.argBuf, c.msgBuf, c.header = nil, nil, nil
32,865,504✔
514
                        c.drop, c.as, c.state = 0, i+1, OP_START
32,865,504✔
515
                        // Drop all pub args
32,865,504✔
516
                        c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject, c.pa.mapped = nil, nil, nil, nil, nil, nil
32,865,504✔
517
                        c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil
32,865,504✔
518
                        c.pa.trace = nil
32,865,504✔
519
                        lmsg = false
32,865,504✔
520
                case OP_A:
32✔
521
                        switch b {
32✔
522
                        case '+':
4✔
523
                                c.state = OP_ASUB
4✔
524
                        case '-', 'u':
28✔
525
                                c.state = OP_AUSUB
28✔
526
                        default:
×
527
                                goto parseErr
×
528
                        }
529
                case OP_ASUB:
4✔
530
                        switch b {
4✔
531
                        case ' ', '\t':
4✔
532
                                c.state = OP_ASUB_SPC
4✔
533
                        default:
×
534
                                goto parseErr
×
535
                        }
536
                case OP_ASUB_SPC:
4✔
537
                        switch b {
4✔
538
                        case ' ', '\t':
×
539
                                continue
×
540
                        default:
4✔
541
                                c.state = ASUB_ARG
4✔
542
                                c.as = i
4✔
543
                        }
544
                case ASUB_ARG:
14✔
545
                        switch b {
14✔
546
                        case '\r':
4✔
547
                                c.drop = 1
4✔
548
                        case '\n':
4✔
549
                                var arg []byte
4✔
550
                                if c.argBuf != nil {
4✔
551
                                        arg = c.argBuf
×
552
                                        c.argBuf = nil
×
553
                                } else {
4✔
554
                                        arg = buf[c.as : i-c.drop]
4✔
555
                                }
4✔
556
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
4✔
557
                                        return err
×
558
                                }
×
559
                                if trace {
4✔
560
                                        c.traceInOp("A+", arg)
×
561
                                }
×
562
                                if err := c.processAccountSub(arg); err != nil {
4✔
563
                                        return err
×
564
                                }
×
565
                                c.drop, c.as, c.state = 0, i+1, OP_START
4✔
566
                        default:
6✔
567
                                if c.argBuf != nil {
6✔
568
                                        c.argBuf = append(c.argBuf, b)
×
569
                                }
×
570
                        }
571
                case OP_AUSUB:
28✔
572
                        switch b {
28✔
573
                        case ' ', '\t':
28✔
574
                                c.state = OP_AUSUB_SPC
28✔
575
                        default:
×
576
                                goto parseErr
×
577
                        }
578
                case OP_AUSUB_SPC:
28✔
579
                        switch b {
28✔
580
                        case ' ', '\t':
×
581
                                continue
×
582
                        default:
28✔
583
                                c.state = AUSUB_ARG
28✔
584
                                c.as = i
28✔
585
                        }
586
                case AUSUB_ARG:
142✔
587
                        switch b {
142✔
588
                        case '\r':
28✔
589
                                c.drop = 1
28✔
590
                        case '\n':
28✔
591
                                var arg []byte
28✔
592
                                if c.argBuf != nil {
28✔
593
                                        arg = c.argBuf
×
594
                                        c.argBuf = nil
×
595
                                } else {
28✔
596
                                        arg = buf[c.as : i-c.drop]
28✔
597
                                }
28✔
598
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
28✔
599
                                        return err
×
600
                                }
×
601
                                if trace {
45✔
602
                                        c.traceInOp("A-", arg)
17✔
603
                                }
17✔
604
                                c.processAccountUnsub(arg)
28✔
605
                                c.drop, c.as, c.state = 0, i+1, OP_START
28✔
606
                        default:
86✔
607
                                if c.argBuf != nil {
86✔
608
                                        c.argBuf = append(c.argBuf, b)
×
609
                                }
×
610
                        }
611
                case OP_S:
138,205✔
612
                        switch b {
138,205✔
613
                        case 'U', 'u':
138,204✔
614
                                c.state = OP_SU
138,204✔
615
                        default:
1✔
616
                                goto parseErr
1✔
617
                        }
618
                case OP_SU:
138,204✔
619
                        switch b {
138,204✔
620
                        case 'B', 'b':
138,203✔
621
                                c.state = OP_SUB
138,203✔
622
                        default:
1✔
623
                                goto parseErr
1✔
624
                        }
625
                case OP_SUB:
1,073,896✔
626
                        switch b {
1,073,896✔
627
                        case ' ', '\t':
1,073,894✔
628
                                c.state = OP_SUB_SPC
1,073,894✔
629
                        default:
2✔
630
                                goto parseErr
2✔
631
                        }
632
                case OP_SUB_SPC:
1,073,895✔
633
                        switch b {
1,073,895✔
634
                        case ' ', '\t':
1✔
635
                                continue
1✔
636
                        default:
1,073,894✔
637
                                c.state = SUB_ARG
1,073,894✔
638
                                c.as = i
1,073,894✔
639
                        }
640
                case SUB_ARG:
39,966,256✔
641
                        switch b {
39,966,256✔
642
                        case '\r':
1,073,879✔
643
                                c.drop = 1
1,073,879✔
644
                        case '\n':
1,073,878✔
645
                                var arg []byte
1,073,878✔
646
                                if c.argBuf != nil {
1,097,421✔
647
                                        arg = c.argBuf
23,543✔
648
                                        c.argBuf = nil
23,543✔
649
                                } else {
1,073,878✔
650
                                        arg = buf[c.as : i-c.drop]
1,050,335✔
651
                                }
1,050,335✔
652
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
1,073,878✔
653
                                        return err
×
654
                                }
×
655
                                var err error
1,073,878✔
656

1,073,878✔
657
                                switch c.kind {
1,073,878✔
658
                                case CLIENT:
138,199✔
659
                                        if trace {
150,765✔
660
                                                c.traceInOp("SUB", arg)
12,566✔
661
                                        }
12,566✔
662
                                        err = c.parseSub(arg, false)
138,199✔
663
                                case ROUTER:
747,009✔
664
                                        switch c.op {
747,009✔
665
                                        case 'R', 'r':
734,951✔
666
                                                if trace {
737,924✔
667
                                                        c.traceInOp("RS+", arg)
2,973✔
668
                                                }
2,973✔
669
                                                err = c.processRemoteSub(arg, false)
734,951✔
670
                                        case 'L', 'l':
12,058✔
671
                                                if trace {
12,252✔
672
                                                        c.traceInOp("LS+", arg)
194✔
673
                                                }
194✔
674
                                                err = c.processRemoteSub(arg, true)
12,058✔
675
                                        }
676
                                case GATEWAY:
157,005✔
677
                                        if trace {
157,314✔
678
                                                c.traceInOp("RS+", arg)
309✔
679
                                        }
309✔
680
                                        err = c.processGatewayRSub(arg)
157,005✔
681
                                case LEAF:
31,665✔
682
                                        if trace {
31,786✔
683
                                                c.traceInOp("LS+", arg)
121✔
684
                                        }
121✔
685
                                        err = c.processLeafSub(arg)
31,665✔
686
                                }
687
                                if err != nil {
1,073,882✔
688
                                        return err
4✔
689
                                }
4✔
690
                                c.drop, c.as, c.state = 0, i+1, OP_START
1,073,874✔
691
                        default:
37,818,499✔
692
                                if c.argBuf != nil {
38,413,006✔
693
                                        c.argBuf = append(c.argBuf, b)
594,507✔
694
                                }
594,507✔
695
                        }
696
                case OP_L:
164,524✔
697
                        switch b {
164,524✔
698
                        case 'S', 's':
58,233✔
699
                                c.state = OP_LS
58,233✔
700
                        case 'M', 'm':
106,291✔
701
                                c.state = OP_M
106,291✔
702
                        default:
×
703
                                goto parseErr
×
704
                        }
705
                case OP_LS:
58,233✔
706
                        switch b {
58,233✔
707
                        case '+':
43,723✔
708
                                c.state = OP_SUB
43,723✔
709
                        case '-':
14,510✔
710
                                c.state = OP_UNSUB
14,510✔
711
                        default:
×
712
                                goto parseErr
×
713
                        }
714
                case OP_R:
13,679,000✔
715
                        switch b {
13,679,000✔
716
                        case 'S', 's':
1,083,333✔
717
                                c.state = OP_RS
1,083,333✔
718
                        case 'M', 'm':
12,595,667✔
719
                                c.state = OP_M
12,595,667✔
720
                        default:
×
721
                                goto parseErr
×
722
                        }
723
                case OP_RS:
1,083,332✔
724
                        switch b {
1,083,332✔
725
                        case '+':
891,971✔
726
                                c.state = OP_SUB
891,971✔
727
                        case '-':
191,361✔
728
                                c.state = OP_UNSUB
191,361✔
729
                        default:
×
730
                                goto parseErr
×
731
                        }
732
                case OP_U:
15,002✔
733
                        switch b {
15,002✔
734
                        case 'N', 'n':
15,001✔
735
                                c.state = OP_UN
15,001✔
736
                        default:
1✔
737
                                goto parseErr
1✔
738
                        }
739
                case OP_UN:
15,001✔
740
                        switch b {
15,001✔
741
                        case 'S', 's':
15,000✔
742
                                c.state = OP_UNS
15,000✔
743
                        default:
1✔
744
                                goto parseErr
1✔
745
                        }
746
                case OP_UNS:
15,000✔
747
                        switch b {
15,000✔
748
                        case 'U', 'u':
14,999✔
749
                                c.state = OP_UNSU
14,999✔
750
                        default:
1✔
751
                                goto parseErr
1✔
752
                        }
753
                case OP_UNSU:
14,999✔
754
                        switch b {
14,999✔
755
                        case 'B', 'b':
14,998✔
756
                                c.state = OP_UNSUB
14,998✔
757
                        default:
1✔
758
                                goto parseErr
1✔
759
                        }
760
                case OP_UNSUB:
220,869✔
761
                        switch b {
220,869✔
762
                        case ' ', '\t':
220,863✔
763
                                c.state = OP_UNSUB_SPC
220,863✔
764
                        default:
6✔
765
                                goto parseErr
6✔
766
                        }
767
                case OP_UNSUB_SPC:
220,880✔
768
                        switch b {
220,880✔
769
                        case ' ', '\t':
17✔
770
                                continue
17✔
771
                        default:
220,863✔
772
                                c.state = UNSUB_ARG
220,863✔
773
                                c.as = i
220,863✔
774
                        }
775
                case UNSUB_ARG:
6,033,546✔
776
                        switch b {
6,033,546✔
777
                        case '\r':
220,856✔
778
                                c.drop = 1
220,856✔
779
                        case '\n':
220,858✔
780
                                var arg []byte
220,858✔
781
                                if c.argBuf != nil {
227,196✔
782
                                        arg = c.argBuf
6,338✔
783
                                        c.argBuf = nil
6,338✔
784
                                } else {
220,858✔
785
                                        arg = buf[c.as : i-c.drop]
214,520✔
786
                                }
214,520✔
787
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
220,858✔
788
                                        return err
×
789
                                }
×
790
                                var err error
220,858✔
791

220,858✔
792
                                switch c.kind {
220,858✔
793
                                case CLIENT:
14,992✔
794
                                        if trace {
27,316✔
795
                                                c.traceInOp("UNSUB", arg)
12,324✔
796
                                        }
12,324✔
797
                                        err = c.processUnsub(arg)
14,992✔
798
                                case ROUTER:
160,387✔
799
                                        if trace && c.srv != nil {
160,782✔
800
                                                switch c.op {
395✔
801
                                                case 'R', 'r':
201✔
802
                                                        c.traceInOp("RS-", arg)
201✔
803
                                                case 'L', 'l':
194✔
804
                                                        c.traceInOp("LS-", arg)
194✔
805
                                                }
806
                                        }
807
                                        leafUnsub := c.op == 'L' || c.op == 'l'
160,387✔
808
                                        err = c.processRemoteUnsub(arg, leafUnsub)
160,387✔
809
                                case GATEWAY:
42,324✔
810
                                        if trace {
43,514✔
811
                                                c.traceInOp("RS-", arg)
1,190✔
812
                                        }
1,190✔
813
                                        err = c.processGatewayRUnsub(arg)
42,324✔
814
                                case LEAF:
3,155✔
815
                                        if trace {
3,155✔
816
                                                c.traceInOp("LS-", arg)
×
817
                                        }
×
818
                                        err = c.processLeafUnsub(arg)
3,155✔
819
                                }
820
                                if err != nil {
220,860✔
821
                                        return err
2✔
822
                                }
2✔
823
                                c.drop, c.as, c.state = 0, i+1, OP_START
220,856✔
824
                        default:
5,591,832✔
825
                                if c.argBuf != nil {
5,709,367✔
826
                                        c.argBuf = append(c.argBuf, b)
117,535✔
827
                                }
117,535✔
828
                        }
829
                case OP_PI:
32,350✔
830
                        switch b {
32,350✔
831
                        case 'N', 'n':
32,349✔
832
                                c.state = OP_PIN
32,349✔
833
                        default:
1✔
834
                                goto parseErr
1✔
835
                        }
836
                case OP_PIN:
32,349✔
837
                        switch b {
32,349✔
838
                        case 'G', 'g':
32,348✔
839
                                c.state = OP_PING
32,348✔
840
                        default:
1✔
841
                                goto parseErr
1✔
842
                        }
843
                case OP_PING:
64,701✔
844
                        switch b {
64,701✔
845
                        case '\n':
32,347✔
846
                                if trace {
32,883✔
847
                                        c.traceInOp("PING", nil)
536✔
848
                                }
536✔
849
                                c.processPing()
32,347✔
850
                                c.drop, c.state = 0, OP_START
32,347✔
851
                        }
852
                case OP_PO:
21,409✔
853
                        switch b {
21,409✔
854
                        case 'N', 'n':
21,408✔
855
                                c.state = OP_PON
21,408✔
856
                        default:
1✔
857
                                goto parseErr
1✔
858
                        }
859
                case OP_PON:
21,408✔
860
                        switch b {
21,408✔
861
                        case 'G', 'g':
21,407✔
862
                                c.state = OP_PONG
21,407✔
863
                        default:
1✔
864
                                goto parseErr
1✔
865
                        }
866
                case OP_PONG:
42,819✔
867
                        switch b {
42,819✔
868
                        case '\n':
21,406✔
869
                                if trace {
21,432✔
870
                                        c.traceInOp("PONG", nil)
26✔
871
                                }
26✔
872
                                c.processPong()
21,406✔
873
                                c.drop, c.state = 0, OP_START
21,406✔
874
                        }
875
                case OP_C:
36,809✔
876
                        switch b {
36,809✔
877
                        case 'O', 'o':
36,808✔
878
                                c.state = OP_CO
36,808✔
879
                        default:
1✔
880
                                goto parseErr
1✔
881
                        }
882
                case OP_CO:
36,808✔
883
                        switch b {
36,808✔
884
                        case 'N', 'n':
36,807✔
885
                                c.state = OP_CON
36,807✔
886
                        default:
1✔
887
                                goto parseErr
1✔
888
                        }
889
                case OP_CON:
36,807✔
890
                        switch b {
36,807✔
891
                        case 'N', 'n':
36,806✔
892
                                c.state = OP_CONN
36,806✔
893
                        default:
1✔
894
                                goto parseErr
1✔
895
                        }
896
                case OP_CONN:
36,806✔
897
                        switch b {
36,806✔
898
                        case 'E', 'e':
36,805✔
899
                                c.state = OP_CONNE
36,805✔
900
                        default:
1✔
901
                                goto parseErr
1✔
902
                        }
903
                case OP_CONNE:
36,805✔
904
                        switch b {
36,805✔
905
                        case 'C', 'c':
36,804✔
906
                                c.state = OP_CONNEC
36,804✔
907
                        default:
1✔
908
                                goto parseErr
1✔
909
                        }
910
                case OP_CONNEC:
36,804✔
911
                        switch b {
36,804✔
912
                        case 'T', 't':
36,802✔
913
                                c.state = OP_CONNECT
36,802✔
914
                        default:
2✔
915
                                goto parseErr
2✔
916
                        }
917
                case OP_CONNECT:
73,604✔
918
                        switch b {
73,604✔
919
                        case ' ', '\t':
36,802✔
920
                                continue
36,802✔
921
                        default:
36,802✔
922
                                c.state = CONNECT_ARG
36,802✔
923
                                c.as = i
36,802✔
924
                        }
925
                case CONNECT_ARG:
7,385,287✔
926
                        switch b {
7,385,287✔
927
                        case '\r':
36,801✔
928
                                c.drop = 1
36,801✔
929
                        case '\n':
36,802✔
930
                                var arg []byte
36,802✔
931
                                if c.argBuf != nil {
38,199✔
932
                                        arg = c.argBuf
1,397✔
933
                                        c.argBuf = nil
1,397✔
934
                                } else {
36,802✔
935
                                        arg = buf[c.as : i-c.drop]
35,405✔
936
                                }
35,405✔
937
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
36,802✔
938
                                        return err
×
939
                                }
×
940
                                if trace {
37,476✔
941
                                        c.traceInOp("CONNECT", removePassFromTrace(arg))
674✔
942
                                }
674✔
943
                                if err := c.processConnect(arg); err != nil {
37,097✔
944
                                        return err
295✔
945
                                }
295✔
946
                                c.drop, c.state = 0, OP_START
36,507✔
947
                                // Reset notion on authSet
36,507✔
948
                                c.mu.Lock()
36,507✔
949
                                authSet = c.awaitingAuth()
36,507✔
950
                                c.mu.Unlock()
36,507✔
951
                        default:
7,311,684✔
952
                                if c.argBuf != nil {
7,774,287✔
953
                                        c.argBuf = append(c.argBuf, b)
462,603✔
954
                                }
462,603✔
955
                        }
956
                case OP_M:
12,701,958✔
957
                        switch b {
12,701,958✔
958
                        case 'S', 's':
12,701,958✔
959
                                c.state = OP_MS
12,701,958✔
960
                        default:
×
961
                                goto parseErr
×
962
                        }
963
                case OP_MS:
12,701,958✔
964
                        switch b {
12,701,958✔
965
                        case 'G', 'g':
12,701,958✔
966
                                c.state = OP_MSG
12,701,958✔
967
                        default:
×
968
                                goto parseErr
×
969
                        }
970
                case OP_MSG:
12,701,958✔
971
                        switch b {
12,701,958✔
972
                        case ' ', '\t':
12,701,958✔
973
                                c.state = OP_MSG_SPC
12,701,958✔
974
                        default:
×
975
                                goto parseErr
×
976
                        }
977
                case OP_MSG_SPC:
12,701,958✔
978
                        switch b {
12,701,958✔
979
                        case ' ', '\t':
×
980
                                continue
×
981
                        default:
12,701,958✔
982
                                c.pa.hdr = -1
12,701,958✔
983
                                c.state = MSG_ARG
12,701,958✔
984
                                c.as = i
12,701,958✔
985
                        }
986
                case MSG_ARG:
470,737,731✔
987
                        switch b {
470,737,731✔
988
                        case '\r':
12,701,958✔
989
                                c.drop = 1
12,701,958✔
990
                        case '\n':
12,701,958✔
991
                                var arg []byte
12,701,958✔
992
                                if c.argBuf != nil {
12,926,379✔
993
                                        arg = c.argBuf
224,421✔
994
                                        c.argBuf = nil
224,421✔
995
                                } else {
12,701,958✔
996
                                        arg = buf[c.as : i-c.drop]
12,477,537✔
997
                                }
12,477,537✔
998
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
12,701,958✔
999
                                        return err
×
1000
                                }
×
1001
                                var err error
12,701,958✔
1002
                                if c.kind == ROUTER || c.kind == GATEWAY {
25,306,160✔
1003
                                        switch c.op {
12,604,202✔
1004
                                        case 'R', 'r':
12,595,667✔
1005
                                                if trace {
12,602,265✔
1006
                                                        c.traceInOp("RMSG", arg)
6,598✔
1007
                                                }
6,598✔
1008
                                                err = c.processRoutedMsgArgs(arg)
12,595,667✔
1009
                                        case 'L', 'l':
8,535✔
1010
                                                if trace {
8,768✔
1011
                                                        c.traceInOp("LMSG", arg)
233✔
1012
                                                }
233✔
1013
                                                lmsg = true
8,535✔
1014
                                                err = c.processRoutedOriginClusterMsgArgs(arg)
8,535✔
1015
                                        }
1016
                                } else if c.kind == LEAF {
195,512✔
1017
                                        if trace {
97,770✔
1018
                                                c.traceInOp("LMSG", arg)
14✔
1019
                                        }
14✔
1020
                                        err = c.processLeafMsgArgs(arg)
97,756✔
1021
                                }
1022
                                if err != nil {
12,701,959✔
1023
                                        return err
1✔
1024
                                }
1✔
1025
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
12,701,957✔
1026

12,701,957✔
1027
                                // jump ahead with the index. If this overruns
12,701,957✔
1028
                                // what is left we fall out and process split
12,701,957✔
1029
                                // buffer.
12,701,957✔
1030
                                i = c.as + c.pa.size - LEN_CR_LF
12,701,957✔
1031
                        default:
445,333,815✔
1032
                                if c.argBuf != nil {
447,944,088✔
1033
                                        c.argBuf = append(c.argBuf, b)
2,610,273✔
1034
                                }
2,610,273✔
1035
                        }
1036
                case OP_I:
74,433✔
1037
                        switch b {
74,433✔
1038
                        case 'N', 'n':
74,432✔
1039
                                c.state = OP_IN
74,432✔
1040
                        default:
1✔
1041
                                goto parseErr
1✔
1042
                        }
1043
                case OP_IN:
74,432✔
1044
                        switch b {
74,432✔
1045
                        case 'F', 'f':
74,431✔
1046
                                c.state = OP_INF
74,431✔
1047
                        default:
1✔
1048
                                goto parseErr
1✔
1049
                        }
1050
                case OP_INF:
74,431✔
1051
                        switch b {
74,431✔
1052
                        case 'O', 'o':
74,430✔
1053
                                c.state = OP_INFO
74,430✔
1054
                        default:
1✔
1055
                                goto parseErr
1✔
1056
                        }
1057
                case OP_INFO:
148,861✔
1058
                        switch b {
148,861✔
1059
                        case ' ', '\t':
74,431✔
1060
                                continue
74,431✔
1061
                        default:
74,430✔
1062
                                c.state = INFO_ARG
74,430✔
1063
                                c.as = i
74,430✔
1064
                        }
1065
                case INFO_ARG:
27,662,566✔
1066
                        switch b {
27,662,566✔
1067
                        case '\r':
74,428✔
1068
                                c.drop = 1
74,428✔
1069
                        case '\n':
74,429✔
1070
                                var arg []byte
74,429✔
1071
                                if c.argBuf != nil {
81,930✔
1072
                                        arg = c.argBuf
7,501✔
1073
                                        c.argBuf = nil
7,501✔
1074
                                } else {
74,429✔
1075
                                        arg = buf[c.as : i-c.drop]
66,928✔
1076
                                }
66,928✔
1077
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
74,429✔
1078
                                        return err
×
1079
                                }
×
1080
                                if err := c.processInfo(arg); err != nil {
74,430✔
1081
                                        return err
1✔
1082
                                }
1✔
1083
                                c.drop, c.as, c.state = 0, i+1, OP_START
74,428✔
1084
                        default:
27,513,709✔
1085
                                if c.argBuf != nil {
28,378,080✔
1086
                                        c.argBuf = append(c.argBuf, b)
864,371✔
1087
                                }
864,371✔
1088
                        }
1089
                case OP_PLUS:
3✔
1090
                        switch b {
3✔
1091
                        case 'O', 'o':
2✔
1092
                                c.state = OP_PLUS_O
2✔
1093
                        default:
1✔
1094
                                goto parseErr
1✔
1095
                        }
1096
                case OP_PLUS_O:
2✔
1097
                        switch b {
2✔
1098
                        case 'K', 'k':
1✔
1099
                                c.state = OP_PLUS_OK
1✔
1100
                        default:
1✔
1101
                                goto parseErr
1✔
1102
                        }
1103
                case OP_PLUS_OK:
2✔
1104
                        switch b {
2✔
1105
                        case '\n':
1✔
1106
                                c.drop, c.state = 0, OP_START
1✔
1107
                        }
1108
                case OP_MINUS:
180✔
1109
                        switch b {
180✔
1110
                        case 'E', 'e':
179✔
1111
                                c.state = OP_MINUS_E
179✔
1112
                        default:
1✔
1113
                                goto parseErr
1✔
1114
                        }
1115
                case OP_MINUS_E:
179✔
1116
                        switch b {
179✔
1117
                        case 'R', 'r':
178✔
1118
                                c.state = OP_MINUS_ER
178✔
1119
                        default:
1✔
1120
                                goto parseErr
1✔
1121
                        }
1122
                case OP_MINUS_ER:
178✔
1123
                        switch b {
178✔
1124
                        case 'R', 'r':
177✔
1125
                                c.state = OP_MINUS_ERR
177✔
1126
                        default:
1✔
1127
                                goto parseErr
1✔
1128
                        }
1129
                case OP_MINUS_ERR:
177✔
1130
                        switch b {
177✔
1131
                        case ' ', '\t':
176✔
1132
                                c.state = OP_MINUS_ERR_SPC
176✔
1133
                        default:
1✔
1134
                                goto parseErr
1✔
1135
                        }
1136
                case OP_MINUS_ERR_SPC:
176✔
1137
                        switch b {
176✔
1138
                        case ' ', '\t':
×
1139
                                continue
×
1140
                        default:
176✔
1141
                                c.state = MINUS_ERR_ARG
176✔
1142
                                c.as = i
176✔
1143
                        }
1144
                case MINUS_ERR_ARG:
7,570✔
1145
                        switch b {
7,570✔
1146
                        case '\r':
176✔
1147
                                c.drop = 1
176✔
1148
                        case '\n':
176✔
1149
                                var arg []byte
176✔
1150
                                if c.argBuf != nil {
176✔
1151
                                        arg = c.argBuf
×
1152
                                        c.argBuf = nil
×
1153
                                } else {
176✔
1154
                                        arg = buf[c.as : i-c.drop]
176✔
1155
                                }
176✔
1156
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
176✔
1157
                                        return err
×
1158
                                }
×
1159
                                c.processErr(string(arg))
176✔
1160
                                c.drop, c.as, c.state = 0, i+1, OP_START
176✔
1161
                        default:
7,218✔
1162
                                if c.argBuf != nil {
7,218✔
1163
                                        c.argBuf = append(c.argBuf, b)
×
1164
                                }
×
1165
                        }
1166
                default:
×
1167
                        goto parseErr
×
1168
                }
1169
        }
1170

1171
        // Check for split buffer scenarios for any ARG state.
1172
        if c.state == SUB_ARG || c.state == UNSUB_ARG ||
11,444,077✔
1173
                c.state == PUB_ARG || c.state == HPUB_ARG ||
11,444,077✔
1174
                c.state == ASUB_ARG || c.state == AUSUB_ARG ||
11,444,077✔
1175
                c.state == MSG_ARG || c.state == HMSG_ARG ||
11,444,077✔
1176
                c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG || c.state == INFO_ARG {
11,785,543✔
1177

341,466✔
1178
                // Setup a holder buffer to deal with split buffer scenario.
341,466✔
1179
                if c.argBuf == nil {
682,652✔
1180
                        c.argBuf = c.scratch[:0]
341,186✔
1181
                        c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
341,186✔
1182
                }
341,186✔
1183
                // Check for violations of control line length here. Note that this is not
1184
                // exact at all but the performance hit is too great to be precise, and
1185
                // catching here should prevent memory exhaustion attacks.
1186
                if err := c.overMaxControlLineLimit(c.argBuf, mcl); err != nil {
341,468✔
1187
                        return err
2✔
1188
                }
2✔
1189
        }
1190

1191
        // Check for split msg
1192
        if (c.state == MSG_PAYLOAD || c.state == MSG_END_R || c.state == MSG_END_N) && c.msgBuf == nil {
13,124,477✔
1193
                // We need to clone the pubArg if it is still referencing the
1,680,402✔
1194
                // read buffer and we are not able to process the msg.
1,680,402✔
1195

1,680,402✔
1196
                if c.argBuf == nil {
3,360,804✔
1197
                        // Works also for MSG_ARG, when message comes from ROUTE or GATEWAY.
1,680,402✔
1198
                        if err := c.clonePubArg(lmsg); err != nil {
1,680,402✔
1199
                                goto parseErr
×
1200
                        }
1201
                }
1202

1203
                // If we will overflow the scratch buffer, just create a
1204
                // new buffer to hold the split message.
1205
                if c.pa.size > cap(c.scratch)-len(c.argBuf) {
1,756,320✔
1206
                        lrem := len(buf[c.as:])
75,918✔
1207
                        // Consider it a protocol error when the remaining payload
75,918✔
1208
                        // is larger than the reported size for PUB. It can happen
75,918✔
1209
                        // when processing incomplete messages from rogue clients.
75,918✔
1210
                        if lrem > c.pa.size+LEN_CR_LF {
75,918✔
1211
                                goto parseErr
×
1212
                        }
1213
                        c.msgBuf = make([]byte, lrem, c.pa.size+LEN_CR_LF)
75,918✔
1214
                        copy(c.msgBuf, buf[c.as:])
75,918✔
1215
                } else {
1,604,484✔
1216
                        c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
1,604,484✔
1217
                        c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
1,604,484✔
1218
                }
1,604,484✔
1219
        }
1220

1221
        return nil
11,444,075✔
1222

11,444,075✔
1223
authErr:
11,444,075✔
1224
        c.authViolation()
12✔
1225
        return ErrAuthentication
12✔
1226

12✔
1227
parseErr:
12✔
1228
        c.sendErr("Unknown Protocol Operation")
53✔
1229
        snip := protoSnippet(i, PROTO_SNIPPET_SIZE, buf)
53✔
1230
        err := fmt.Errorf("%s parser ERROR, state=%d, i=%d: proto='%s...'", c.kindString(), c.state, i, snip)
53✔
1231
        return err
53✔
1232
}
1233

1234
func protoSnippet(start, max int, buf []byte) string {
111✔
1235
        stop := start + max
111✔
1236
        bufSize := len(buf)
111✔
1237
        if start >= bufSize {
114✔
1238
                return `""`
3✔
1239
        }
3✔
1240
        if stop > bufSize {
191✔
1241
                stop = bufSize - 1
83✔
1242
        }
83✔
1243
        return fmt.Sprintf("%q", buf[start:stop])
108✔
1244
}
1245

1246
// Check if the length of buffer `arg` is over the max control line limit `mcl`.
1247
// If so, an error is sent to the client and the connection is closed.
1248
// The error ErrMaxControlLine is returned.
1249
func (c *client) overMaxControlLineLimit(arg []byte, mcl int32) error {
34,613,219✔
1250
        if c.kind != CLIENT {
52,238,638✔
1251
                return nil
17,625,419✔
1252
        }
17,625,419✔
1253
        if len(arg) > int(mcl) {
16,987,803✔
1254
                err := NewErrorCtx(ErrMaxControlLine, "State %d, max_control_line %d, Buffer len %d (snip: %s...)",
3✔
1255
                        c.state, int(mcl), len(c.argBuf), protoSnippet(0, MAX_CONTROL_LINE_SNIPPET_SIZE, arg))
3✔
1256
                c.sendErr(err.Error())
3✔
1257
                c.closeConnection(MaxControlLineExceeded)
3✔
1258
                return err
3✔
1259
        }
3✔
1260
        return nil
16,987,797✔
1261
}
1262

1263
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
1264
// we need to hold onto it into the next read.
1265
func (c *client) clonePubArg(lmsg bool) error {
1,680,402✔
1266
        // Just copy and re-process original arg buffer.
1,680,402✔
1267
        c.argBuf = c.scratch[:0]
1,680,402✔
1268
        c.argBuf = append(c.argBuf, c.pa.arg...)
1,680,402✔
1269

1,680,402✔
1270
        switch c.kind {
1,680,402✔
1271
        case ROUTER, GATEWAY:
902,630✔
1272
                if lmsg {
903,409✔
1273
                        return c.processRoutedOriginClusterMsgArgs(c.argBuf)
779✔
1274
                }
779✔
1275
                if c.pa.hdr < 0 {
1,621,361✔
1276
                        return c.processRoutedMsgArgs(c.argBuf)
719,510✔
1277
                } else {
901,851✔
1278
                        return c.processRoutedHeaderMsgArgs(c.argBuf)
182,341✔
1279
                }
182,341✔
1280
        case LEAF:
2,078✔
1281
                if c.pa.hdr < 0 {
4,052✔
1282
                        return c.processLeafMsgArgs(c.argBuf)
1,974✔
1283
                } else {
2,078✔
1284
                        return c.processLeafHeaderMsgArgs(c.argBuf)
104✔
1285
                }
104✔
1286
        default:
775,694✔
1287
                if c.pa.hdr < 0 {
819,642✔
1288
                        return c.processPub(c.argBuf)
43,948✔
1289
                } else {
775,694✔
1290
                        return c.processHeaderPub(c.argBuf, nil)
731,746✔
1291
                }
731,746✔
1292
        }
1293
}
1294

1295
func (ps *parseState) getHeader() http.Header {
845✔
1296
        if ps.header == nil {
1,690✔
1297
                if hdr := ps.pa.hdr; hdr > 0 {
918✔
1298
                        reader := bufio.NewReader(bytes.NewReader(ps.msgBuf[0:hdr]))
73✔
1299
                        tp := textproto.NewReader(reader)
73✔
1300
                        tp.ReadLine() // skip over first line, contains version
73✔
1301
                        if mimeHeader, err := tp.ReadMIMEHeader(); err == nil {
146✔
1302
                                ps.header = http.Header(mimeHeader)
73✔
1303
                        }
73✔
1304
                }
1305
        }
1306
        return ps.header
845✔
1307
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc