• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 25538703157

07 May 2026 03:25PM UTC coverage: 81.062% (-1.1%) from 82.15%
25538703157

push

github

web-flow
[FIXED] Dedupe map cleanup race on leader change (#8106)

`processStreamLeaderChange` cleaned up `seq=0` dedupe placeholders
without holding `clMu`, so a concurrent proposal could plant a
placeholder after the cleanup ran and leak it into `ddmap/ddarr`.

75402 of 93018 relevant lines covered (81.06%)

624787.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.86
/server/parser.go
1
// Copyright 2012-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "fmt"
20
        "net/http"
21
        "net/textproto"
22
)
23

24
// parserState identifies the current state of the protocol parser's state
// machine. Its values are the OP_* / *_ARG / MSG_* constants declared below.
type parserState int
25
// parseState carries the parser's position in the protocol state machine
// together with the buffers it fills while consuming input.
// NOTE(review): parse reaches these fields directly through *client
// (c.state, c.as, ...), so this struct is presumably embedded there — confirm.
type parseState struct {
26
        state   parserState // current state; parse switches on it for every input byte
27
        op      byte // byte seen while in OP_START, i.e. the first byte of the current operation
28
        as      int // index into the current buffer where the argument (or payload) begins
29
        drop    int // set to 1 on '\r' so the CR is trimmed when the argument is sliced out of buf
30
        pa      pubArg // arguments of the message currently being parsed
31
        argBuf  []byte // when non-nil, argument bytes are appended here and used instead of slicing buf
32
        msgBuf  []byte // when non-nil, payload bytes are copied here; otherwise the payload is sliced from buf
33
        header  http.Header // access via getHeader
34
        scratch [MAX_CONTROL_LINE_SIZE]byte // NOTE(review): fixed-size scratch space; its use is not visible in this chunk
35
        argsa   [MAX_HMSG_ARGS + 1][]byte // pre-allocated args array to avoid per-call heap escape
36
}
37

38
// pubArg holds the parsed arguments of the message currently in flight.
// parse clears most of these fields once the message has been handed to
// processInboundMsg. Fields without a comment are filled in by argument
// processors that are not visible in this chunk.
type pubArg struct {
39
        arg       []byte
40
        pacache   []byte
41
        origin    []byte
42
        account   []byte
43
        subject   []byte // message subject; after a subject mapping it holds the mapped-to subject
44
        deliver   []byte
45
        mapped    []byte // traced by parse as the left-hand side of "mapped -> subject" when a mapping changed the subject
46
        reply     []byte
47
        szb       []byte // NOTE(review): presumably the textual form of size — confirm
48
        hdb       []byte // NOTE(review): presumably the textual form of hdr — confirm
49
        queues    [][]byte
50
        size      int // number of body bytes parse consumes before it expects the trailing CRLF
51
        hdr       int // parse sets 0 for HPUB/HMSG and -1 for PUB before argument processing; a value > 0 makes parse call initMsgTrace
52
        psi       []*serviceImport
53
        trace     *msgTrace // reset to nil by parse after each message
54
        delivered bool // Only used for service imports
55
}
56

57
// Parser states. Each constant is one state of the byte-at-a-time protocol
// state machine driven by parse. OP_START, being the first iota value, is the
// zero value and the state parse returns to after each completed operation.
//
// As parse shows for the PUB, HPUB, HMSG, SUB, A+ and A- paths, a name spells
// the operation prefix consumed so far (OP_P, OP_PU, OP_PUB), a *_SPC state
// skips spaces/tabs after the verb, and a *_ARG state collects the argument
// line up to '\n'. MSG_PAYLOAD, MSG_END_R and MSG_END_N consume the message
// body and its trailing "\r\n".
//
// The numeric values come from iota, so they depend on declaration order.
58
const (
59
        OP_START parserState = iota
60
        OP_PLUS
61
        OP_PLUS_O
62
        OP_PLUS_OK
63
        OP_MINUS
64
        OP_MINUS_E
65
        OP_MINUS_ER
66
        OP_MINUS_ERR
67
        OP_MINUS_ERR_SPC
68
        MINUS_ERR_ARG
69
        OP_C
70
        OP_CO
71
        OP_CON
72
        OP_CONN
73
        OP_CONNE
74
        OP_CONNEC
75
        OP_CONNECT
76
        CONNECT_ARG
77
        OP_H
78
        OP_HP
79
        OP_HPU
80
        OP_HPUB
81
        OP_HPUB_SPC
82
        HPUB_ARG
83
        OP_HM
84
        OP_HMS
85
        OP_HMSG
86
        OP_HMSG_SPC
87
        HMSG_ARG
88
        OP_P
89
        OP_PU
90
        OP_PUB
91
        OP_PUB_SPC
92
        PUB_ARG
93
        OP_PI
94
        OP_PIN
95
        OP_PING
96
        OP_PO
97
        OP_PON
98
        OP_PONG
99
        MSG_PAYLOAD
100
        MSG_END_R
101
        MSG_END_N
102
        OP_S
103
        OP_SU
104
        OP_SUB
105
        OP_SUB_SPC
106
        SUB_ARG
107
        OP_A
108
        OP_ASUB
109
        OP_ASUB_SPC
110
        ASUB_ARG
111
        OP_AUSUB
112
        OP_AUSUB_SPC
113
        AUSUB_ARG
114
        OP_L
115
        OP_LS
116
        OP_R
117
        OP_RS
118
        OP_U
119
        OP_UN
120
        OP_UNS
121
        OP_UNSU
122
        OP_UNSUB
123
        OP_UNSUB_SPC
124
        UNSUB_ARG
125
        OP_M
126
        OP_MS
127
        OP_MSG
128
        OP_MSG_SPC
129
        MSG_ARG
130
        OP_I
131
        OP_IN
132
        OP_INF
133
        OP_INFO
134
        INFO_ARG
135
)
136

137
func (c *client) parse(buf []byte) error {
15,718,378✔
138
        // Branch out to mqtt clients. c.mqtt is immutable, but should it become
15,718,378✔
139
        // an issue (say data race detection), we could branch outside in readLoop
15,718,378✔
140
        if c.isMqtt() {
15,720,358✔
141
                return c.mqttParse(buf)
1,980✔
142
        }
1,980✔
143
        var i int
15,716,398✔
144
        var b byte
15,716,398✔
145
        var lmsg bool
15,716,398✔
146

15,716,398✔
147
        // Snapshots
15,716,398✔
148
        c.mu.Lock()
15,716,398✔
149
        // Snapshot and then reset when we receive a
15,716,398✔
150
        // proper CONNECT if needed.
15,716,398✔
151
        authSet := c.awaitingAuth()
15,716,398✔
152
        // Snapshot max control line as well.
15,716,398✔
153
        s, mcl, trace := c.srv, c.mcl, c.trace
15,716,398✔
154
        c.mu.Unlock()
15,716,398✔
155

15,716,398✔
156
        // Move to loop instead of range syntax to allow jumping of i
15,716,398✔
157
        for i = 0; i < len(buf); i++ {
1,172,522,938✔
158
                b = buf[i]
1,156,806,540✔
159

1,156,806,540✔
160
                switch c.state {
1,156,806,540✔
161
                case OP_START:
29,108,361✔
162
                        c.op = b
29,108,361✔
163
                        if b != 'C' && b != 'c' {
58,173,024✔
164
                                if authSet {
29,065,267✔
165
                                        if s == nil {
604✔
166
                                                goto authErr
×
167
                                        }
168
                                        var ok bool
604✔
169
                                        switch c.kind {
604✔
170
                                        case CLIENT:
3✔
171
                                                // Check here for NoAuthUser. If this is set allow non CONNECT protos as our first.
3✔
172
                                                // E.g. telnet proto demos.
3✔
173
                                                opts := s.getOpts()
3✔
174
                                                noAuthUser := opts.NoAuthUser
3✔
175
                                                if c.ws != nil {
3✔
176
                                                        if noAuthUserWS := opts.Websocket.NoAuthUser; noAuthUserWS != _EMPTY_ {
×
177
                                                                noAuthUser = noAuthUserWS
×
178
                                                        }
×
179
                                                }
180
                                                if noAuthUser != _EMPTY_ {
5✔
181
                                                        s.mu.Lock()
2✔
182
                                                        user, exists := s.users[noAuthUser]
2✔
183
                                                        s.mu.Unlock()
2✔
184
                                                        if exists {
4✔
185
                                                                c.RegisterUser(user)
2✔
186
                                                                c.mu.Lock()
2✔
187
                                                                c.clearAuthTimer()
2✔
188
                                                                c.flags.set(connectReceived)
2✔
189
                                                                c.mu.Unlock()
2✔
190
                                                                authSet, ok = false, true
2✔
191
                                                        }
2✔
192
                                                }
193
                                        case LEAF:
583✔
194
                                                // Compressed inbound leaf-node negotiation may require INFO
583✔
195
                                                // before CONNECT. Without compression, leaf connections must
583✔
196
                                                // still start with CONNECT.
583✔
197
                                                ok = (b == 'I' || b == 'i') && needsCompression(s.getOpts().LeafNode.Compression.Mode)
583✔
198
                                        }
199
                                        if !ok {
633✔
200
                                                goto authErr
29✔
201
                                        }
202
                                }
203
                        }
204
                        switch b {
29,108,332✔
205
                        case 'P', 'p':
11,047,896✔
206
                                c.state = OP_P
11,047,896✔
207
                        case 'H', 'h':
1,999,376✔
208
                                c.state = OP_H
1,999,376✔
209
                        case 'S', 's':
136,204✔
210
                                c.state = OP_S
136,204✔
211
                        case 'U', 'u':
15,545✔
212
                                c.state = OP_U
15,545✔
213
                        case 'R', 'r':
15,612,682✔
214
                                if c.kind == CLIENT {
15,612,682✔
215
                                        goto parseErr
×
216
                                } else {
15,612,682✔
217
                                        c.state = OP_R
15,612,682✔
218
                                }
15,612,682✔
219
                        case 'L', 'l':
167,097✔
220
                                if c.kind != LEAF && c.kind != ROUTER {
167,097✔
221
                                        goto parseErr
×
222
                                } else {
167,097✔
223
                                        c.state = OP_L
167,097✔
224
                                }
167,097✔
225
                        case 'A', 'a':
32✔
226
                                if c.kind == CLIENT {
32✔
227
                                        goto parseErr
×
228
                                } else {
32✔
229
                                        c.state = OP_A
32✔
230
                                }
32✔
231
                        case 'C', 'c':
43,698✔
232
                                c.state = OP_C
43,698✔
233
                        case 'I', 'i':
85,696✔
234
                                c.state = OP_I
85,696✔
235
                        case '+':
×
236
                                c.state = OP_PLUS
×
237
                        case '-':
105✔
238
                                c.state = OP_MINUS
105✔
239
                        default:
1✔
240
                                goto parseErr
1✔
241
                        }
242
                case OP_H:
1,999,376✔
243
                        switch b {
1,999,376✔
244
                        case 'P', 'p':
868,241✔
245
                                c.state = OP_HP
868,241✔
246
                        case 'M', 'm':
1,131,135✔
247
                                c.state = OP_HM
1,131,135✔
248
                        default:
×
249
                                goto parseErr
×
250
                        }
251
                case OP_HP:
868,241✔
252
                        switch b {
868,241✔
253
                        case 'U', 'u':
868,241✔
254
                                c.state = OP_HPU
868,241✔
255
                        default:
×
256
                                goto parseErr
×
257
                        }
258
                case OP_HPU:
868,241✔
259
                        switch b {
868,241✔
260
                        case 'B', 'b':
868,241✔
261
                                c.state = OP_HPUB
868,241✔
262
                        default:
×
263
                                goto parseErr
×
264
                        }
265
                case OP_HPUB:
868,241✔
266
                        switch b {
868,241✔
267
                        case ' ', '\t':
868,241✔
268
                                c.state = OP_HPUB_SPC
868,241✔
269
                        default:
×
270
                                goto parseErr
×
271
                        }
272
                case OP_HPUB_SPC:
868,241✔
273
                        switch b {
868,241✔
274
                        case ' ', '\t':
×
275
                                continue
×
276
                        default:
868,241✔
277
                                c.pa.hdr = 0
868,241✔
278
                                c.state = HPUB_ARG
868,241✔
279
                                c.as = i
868,241✔
280
                        }
281
                case HPUB_ARG:
31,486,706✔
282
                        switch b {
31,486,706✔
283
                        case '\r':
868,241✔
284
                                c.drop = 1
868,241✔
285
                        case '\n':
868,241✔
286
                                var arg []byte
868,241✔
287
                                if c.argBuf != nil {
868,335✔
288
                                        arg = c.argBuf
94✔
289
                                        c.argBuf = nil
94✔
290
                                } else {
868,241✔
291
                                        arg = buf[c.as : i-c.drop]
868,147✔
292
                                }
868,147✔
293
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
868,241✔
294
                                        return err
×
295
                                }
×
296
                                if trace {
937,893✔
297
                                        c.traceInOp("HPUB", arg)
69,652✔
298
                                }
69,652✔
299
                                var remaining []byte
868,241✔
300
                                if i < len(buf) {
1,736,482✔
301
                                        remaining = buf[i+1:]
868,241✔
302
                                }
868,241✔
303
                                if err := c.processHeaderPub(arg, remaining); err != nil {
868,243✔
304
                                        return err
2✔
305
                                }
2✔
306

307
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
868,239✔
308
                                // If we don't have a saved buffer then jump ahead with
868,239✔
309
                                // the index. If this overruns what is left we fall out
868,239✔
310
                                // and process split buffer.
868,239✔
311
                                if c.msgBuf == nil {
1,736,478✔
312
                                        i = c.as + c.pa.size - LEN_CR_LF
868,239✔
313
                                }
868,239✔
314
                        default:
29,750,224✔
315
                                if c.argBuf != nil {
29,751,261✔
316
                                        c.argBuf = append(c.argBuf, b)
1,037✔
317
                                }
1,037✔
318
                        }
319
                case OP_HM:
1,131,135✔
320
                        switch b {
1,131,135✔
321
                        case 'S', 's':
1,131,135✔
322
                                c.state = OP_HMS
1,131,135✔
323
                        default:
×
324
                                goto parseErr
×
325
                        }
326
                case OP_HMS:
1,131,135✔
327
                        switch b {
1,131,135✔
328
                        case 'G', 'g':
1,131,135✔
329
                                c.state = OP_HMSG
1,131,135✔
330
                        default:
×
331
                                goto parseErr
×
332
                        }
333
                case OP_HMSG:
1,131,135✔
334
                        switch b {
1,131,135✔
335
                        case ' ', '\t':
1,131,135✔
336
                                c.state = OP_HMSG_SPC
1,131,135✔
337
                        default:
×
338
                                goto parseErr
×
339
                        }
340
                case OP_HMSG_SPC:
1,131,135✔
341
                        switch b {
1,131,135✔
342
                        case ' ', '\t':
×
343
                                continue
×
344
                        default:
1,131,135✔
345
                                c.pa.hdr = 0
1,131,135✔
346
                                c.state = HMSG_ARG
1,131,135✔
347
                                c.as = i
1,131,135✔
348
                        }
349
                case HMSG_ARG:
106,142,824✔
350
                        switch b {
106,142,824✔
351
                        case '\r':
1,131,134✔
352
                                c.drop = 1
1,131,134✔
353
                        case '\n':
1,131,134✔
354
                                var arg []byte
1,131,134✔
355
                                if c.argBuf != nil {
1,136,499✔
356
                                        arg = c.argBuf
5,365✔
357
                                        c.argBuf = nil
5,365✔
358
                                } else {
1,131,134✔
359
                                        arg = buf[c.as : i-c.drop]
1,125,769✔
360
                                }
1,125,769✔
361
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
1,131,134✔
362
                                        return err
×
363
                                }
×
364
                                var err error
1,131,134✔
365
                                if c.kind == ROUTER || c.kind == GATEWAY {
2,261,871✔
366
                                        if trace {
1,132,884✔
367
                                                c.traceInOp("HMSG", arg)
2,147✔
368
                                        }
2,147✔
369
                                        err = c.processRoutedHeaderMsgArgs(arg)
1,130,737✔
370
                                } else if c.kind == LEAF {
794✔
371
                                        if trace {
549✔
372
                                                c.traceInOp("HMSG", arg)
152✔
373
                                        }
152✔
374
                                        err = c.processLeafHeaderMsgArgs(arg)
397✔
375
                                }
376
                                if err != nil {
1,131,134✔
377
                                        return err
×
378
                                }
×
379
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
1,131,134✔
380

1,131,134✔
381
                                // jump ahead with the index. If this overruns
1,131,134✔
382
                                // what is left we fall out and process split
1,131,134✔
383
                                // buffer.
1,131,134✔
384
                                i = c.as + c.pa.size - LEN_CR_LF
1,131,134✔
385
                        default:
103,880,556✔
386
                                if c.argBuf != nil {
103,988,666✔
387
                                        c.argBuf = append(c.argBuf, b)
108,110✔
388
                                }
108,110✔
389
                        }
390
                case OP_P:
11,047,896✔
391
                        switch b {
11,047,896✔
392
                        case 'U', 'u':
10,980,930✔
393
                                c.state = OP_PU
10,980,930✔
394
                        case 'I', 'i':
39,406✔
395
                                c.state = OP_PI
39,406✔
396
                        case 'O', 'o':
27,560✔
397
                                c.state = OP_PO
27,560✔
398
                        default:
×
399
                                goto parseErr
×
400
                        }
401
                case OP_PU:
10,980,930✔
402
                        switch b {
10,980,930✔
403
                        case 'B', 'b':
10,980,930✔
404
                                c.state = OP_PUB
10,980,930✔
405
                        default:
×
406
                                goto parseErr
×
407
                        }
408
                case OP_PUB:
10,980,930✔
409
                        switch b {
10,980,930✔
410
                        case ' ', '\t':
10,980,929✔
411
                                c.state = OP_PUB_SPC
10,980,929✔
412
                        default:
1✔
413
                                goto parseErr
1✔
414
                        }
415
                case OP_PUB_SPC:
10,980,927✔
416
                        switch b {
10,980,927✔
417
                        case ' ', '\t':
×
418
                                continue
×
419
                        default:
10,980,927✔
420
                                c.pa.hdr = -1
10,980,927✔
421
                                c.state = PUB_ARG
10,980,927✔
422
                                c.as = i
10,980,927✔
423
                        }
424
                case PUB_ARG:
188,630,619✔
425
                        switch b {
188,630,619✔
426
                        case '\r':
10,980,919✔
427
                                c.drop = 1
10,980,919✔
428
                        case '\n':
10,980,919✔
429
                                var arg []byte
10,980,919✔
430
                                if c.argBuf != nil {
11,006,485✔
431
                                        arg = c.argBuf
25,566✔
432
                                        c.argBuf = nil
25,566✔
433
                                } else {
10,980,919✔
434
                                        arg = buf[c.as : i-c.drop]
10,955,353✔
435
                                }
10,955,353✔
436
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
10,980,919✔
437
                                        return err
×
438
                                }
×
439
                                if trace {
11,293,185✔
440
                                        c.traceInOp("PUB", arg)
312,266✔
441
                                }
312,266✔
442
                                if err := c.processPub(arg); err != nil {
10,980,925✔
443
                                        return err
6✔
444
                                }
6✔
445

446
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
10,980,913✔
447
                                // If we don't have a saved buffer then jump ahead with
10,980,913✔
448
                                // the index. If this overruns what is left we fall out
10,980,913✔
449
                                // and process split buffer.
10,980,913✔
450
                                if c.msgBuf == nil {
21,961,826✔
451
                                        i = c.as + c.pa.size - LEN_CR_LF
10,980,913✔
452
                                }
10,980,913✔
453
                        default:
166,668,781✔
454
                                if c.argBuf != nil {
167,151,354✔
455
                                        c.argBuf = append(c.argBuf, b)
482,573✔
456
                                }
482,573✔
457
                        }
458
                case MSG_PAYLOAD:
28,272,827✔
459
                        if c.msgBuf != nil {
30,737,101✔
460
                                // copy as much as we can to the buffer and skip ahead.
2,464,274✔
461
                                toCopy := c.pa.size - len(c.msgBuf)
2,464,274✔
462
                                avail := len(buf) - i
2,464,274✔
463
                                if avail < toCopy {
3,243,461✔
464
                                        toCopy = avail
779,187✔
465
                                }
779,187✔
466
                                if toCopy > 0 {
4,928,548✔
467
                                        start := len(c.msgBuf)
2,464,274✔
468
                                        // This is needed for copy to work.
2,464,274✔
469
                                        c.msgBuf = c.msgBuf[:start+toCopy]
2,464,274✔
470
                                        copy(c.msgBuf[start:], buf[i:i+toCopy])
2,464,274✔
471
                                        // Update our index
2,464,274✔
472
                                        i = (i + toCopy) - 1
2,464,274✔
473
                                } else {
2,464,274✔
474
                                        // Fall back to append if needed.
×
475
                                        c.msgBuf = append(c.msgBuf, b)
×
476
                                }
×
477
                                if len(c.msgBuf) >= c.pa.size {
4,149,361✔
478
                                        c.state = MSG_END_R
1,685,087✔
479
                                }
1,685,087✔
480
                        } else if i-c.as+1 >= c.pa.size {
51,617,106✔
481
                                c.state = MSG_END_R
25,808,553✔
482
                        }
25,808,553✔
483
                case MSG_END_R:
27,493,640✔
484
                        if b != '\r' {
27,493,641✔
485
                                goto parseErr
1✔
486
                        }
487
                        if c.msgBuf != nil {
29,191,922✔
488
                                c.msgBuf = append(c.msgBuf, b)
1,698,283✔
489
                        }
1,698,283✔
490
                        c.state = MSG_END_N
27,493,639✔
491
                case MSG_END_N:
27,493,639✔
492
                        if b != '\n' {
27,493,639✔
493
                                goto parseErr
×
494
                        }
495
                        if c.msgBuf != nil {
29,194,106✔
496
                                c.msgBuf = append(c.msgBuf, b)
1,700,467✔
497
                        } else {
27,493,639✔
498
                                c.msgBuf = buf[c.as : i+1]
25,793,172✔
499
                        }
25,793,172✔
500

501
                        var mt *msgTrace
27,493,639✔
502
                        if c.pa.hdr > 0 {
29,493,454✔
503
                                mt = c.initMsgTrace()
1,999,815✔
504
                        }
1,999,815✔
505
                        // Check for mappings.
506
                        if (c.kind == CLIENT || c.kind == LEAF) && c.in.flags.isSet(hasMappings) {
27,526,363✔
507
                                changed := c.selectMappedSubject()
32,724✔
508
                                if changed {
45,286✔
509
                                        if trace {
12,639✔
510
                                                c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", c.pa.mapped, c.pa.subject)))
77✔
511
                                        }
77✔
512
                                        // c.pa.subject is the subject the original is now mapped to.
513
                                        mt.addSubjectMappingEvent(c.pa.subject)
12,562✔
514
                                }
515
                        }
516
                        if trace {
27,884,631✔
517
                                c.traceMsg(c.msgBuf)
390,992✔
518
                        }
390,992✔
519

520
                        c.processInboundMsg(c.msgBuf)
27,493,639✔
521

27,493,639✔
522
                        mt.sendEvent()
27,493,639✔
523
                        c.argBuf, c.msgBuf, c.header = nil, nil, nil
27,493,639✔
524
                        c.drop, c.as, c.state = 0, i+1, OP_START
27,493,639✔
525
                        // Drop all pub args
27,493,639✔
526
                        c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject, c.pa.mapped = nil, nil, nil, nil, nil, nil
27,493,639✔
527
                        c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil
27,493,639✔
528
                        c.pa.trace = nil
27,493,639✔
529
                        c.pa.delivered = false
27,493,639✔
530
                        lmsg = false
27,493,639✔
531
                case OP_A:
32✔
532
                        switch b {
32✔
533
                        case '+':
4✔
534
                                c.state = OP_ASUB
4✔
535
                        case '-', 'u':
28✔
536
                                c.state = OP_AUSUB
28✔
537
                        default:
×
538
                                goto parseErr
×
539
                        }
540
                case OP_ASUB:
4✔
541
                        switch b {
4✔
542
                        case ' ', '\t':
4✔
543
                                c.state = OP_ASUB_SPC
4✔
544
                        default:
×
545
                                goto parseErr
×
546
                        }
547
                case OP_ASUB_SPC:
4✔
548
                        switch b {
4✔
549
                        case ' ', '\t':
×
550
                                continue
×
551
                        default:
4✔
552
                                c.state = ASUB_ARG
4✔
553
                                c.as = i
4✔
554
                        }
555
                case ASUB_ARG:
14✔
556
                        switch b {
14✔
557
                        case '\r':
4✔
558
                                c.drop = 1
4✔
559
                        case '\n':
4✔
560
                                var arg []byte
4✔
561
                                if c.argBuf != nil {
4✔
562
                                        arg = c.argBuf
×
563
                                        c.argBuf = nil
×
564
                                } else {
4✔
565
                                        arg = buf[c.as : i-c.drop]
4✔
566
                                }
4✔
567
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
4✔
568
                                        return err
×
569
                                }
×
570
                                if trace {
4✔
571
                                        c.traceInOp("A+", arg)
×
572
                                }
×
573
                                if err := c.processAccountSub(arg); err != nil {
4✔
574
                                        return err
×
575
                                }
×
576
                                c.drop, c.as, c.state = 0, i+1, OP_START
4✔
577
                        default:
6✔
578
                                if c.argBuf != nil {
6✔
579
                                        c.argBuf = append(c.argBuf, b)
×
580
                                }
×
581
                        }
582
                case OP_AUSUB:
28✔
583
                        switch b {
28✔
584
                        case ' ', '\t':
28✔
585
                                c.state = OP_AUSUB_SPC
28✔
586
                        default:
×
587
                                goto parseErr
×
588
                        }
589
                case OP_AUSUB_SPC:
28✔
590
                        switch b {
28✔
591
                        case ' ', '\t':
×
592
                                continue
×
593
                        default:
28✔
594
                                c.state = AUSUB_ARG
28✔
595
                                c.as = i
28✔
596
                        }
597
                case AUSUB_ARG:
142✔
598
                        switch b {
142✔
599
                        case '\r':
28✔
600
                                c.drop = 1
28✔
601
                        case '\n':
28✔
602
                                var arg []byte
28✔
603
                                if c.argBuf != nil {
28✔
604
                                        arg = c.argBuf
×
605
                                        c.argBuf = nil
×
606
                                } else {
28✔
607
                                        arg = buf[c.as : i-c.drop]
28✔
608
                                }
28✔
609
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
28✔
610
                                        return err
×
611
                                }
×
612
                                if trace {
45✔
613
                                        c.traceInOp("A-", arg)
17✔
614
                                }
17✔
615
                                c.processAccountUnsub(arg)
28✔
616
                                c.drop, c.as, c.state = 0, i+1, OP_START
28✔
617
                        default:
86✔
618
                                if c.argBuf != nil {
86✔
619
                                        c.argBuf = append(c.argBuf, b)
×
620
                                }
×
621
                        }
622
                case OP_S:
136,204✔
623
                        switch b {
136,204✔
624
                        case 'U', 'u':
136,204✔
625
                                c.state = OP_SU
136,204✔
626
                        default:
×
627
                                goto parseErr
×
628
                        }
629
                case OP_SU:
136,204✔
630
                        switch b {
136,204✔
631
                        case 'B', 'b':
136,204✔
632
                                c.state = OP_SUB
136,204✔
633
                        default:
×
634
                                goto parseErr
×
635
                        }
636
                case OP_SUB:
1,178,712✔
637
                        switch b {
1,178,712✔
638
                        case ' ', '\t':
1,178,711✔
639
                                c.state = OP_SUB_SPC
1,178,711✔
640
                        default:
1✔
641
                                goto parseErr
1✔
642
                        }
643
                case OP_SUB_SPC:
1,178,711✔
644
                        switch b {
1,178,711✔
645
                        case ' ', '\t':
×
646
                                continue
×
647
                        default:
1,178,711✔
648
                                c.state = SUB_ARG
1,178,711✔
649
                                c.as = i
1,178,711✔
650
                        }
651
                case SUB_ARG:
44,243,663✔
652
                        switch b {
44,243,663✔
653
                        case '\r':
1,178,694✔
654
                                c.drop = 1
1,178,694✔
655
                        case '\n':
1,178,694✔
656
                                var arg []byte
1,178,694✔
657
                                if c.argBuf != nil {
1,207,051✔
658
                                        arg = c.argBuf
28,357✔
659
                                        c.argBuf = nil
28,357✔
660
                                } else {
1,178,694✔
661
                                        arg = buf[c.as : i-c.drop]
1,150,337✔
662
                                }
1,150,337✔
663
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
1,178,694✔
664
                                        return err
×
665
                                }
×
666
                                var err error
1,178,694✔
667

1,178,694✔
668
                                switch c.kind {
1,178,694✔
669
                                case CLIENT:
136,203✔
670
                                        if trace {
149,243✔
671
                                                c.traceInOp("SUB", arg)
13,040✔
672
                                        }
13,040✔
673
                                        err = c.parseSub(arg, false)
136,203✔
674
                                case ROUTER:
849,163✔
675
                                        switch c.op {
849,163✔
676
                                        case 'R', 'r':
836,787✔
677
                                                if trace {
839,749✔
678
                                                        c.traceInOp("RS+", arg)
2,962✔
679
                                                }
2,962✔
680
                                                err = c.processRemoteSub(arg, false)
836,787✔
681
                                        case 'L', 'l':
12,376✔
682
                                                if trace {
12,570✔
683
                                                        c.traceInOp("LS+", arg)
194✔
684
                                                }
194✔
685
                                                err = c.processRemoteSub(arg, true)
12,376✔
686
                                        }
687
                                case GATEWAY:
160,594✔
688
                                        if trace {
160,842✔
689
                                                c.traceInOp("RS+", arg)
248✔
690
                                        }
248✔
691
                                        err = c.processGatewayRSub(arg)
160,594✔
692
                                case LEAF:
32,734✔
693
                                        if trace {
32,855✔
694
                                                c.traceInOp("LS+", arg)
121✔
695
                                        }
121✔
696
                                        err = c.processLeafSub(arg)
32,734✔
697
                                }
698
                                if err != nil {
1,178,694✔
699
                                        return err
×
700
                                }
×
701
                                c.drop, c.as, c.state = 0, i+1, OP_START
1,178,694✔
702
                        default:
41,886,275✔
703
                                if c.argBuf != nil {
42,608,233✔
704
                                        c.argBuf = append(c.argBuf, b)
721,958✔
705
                                }
721,958✔
706
                        }
707
                case OP_L:
167,097✔
708
                        switch b {
167,097✔
709
                        case 'S', 's':
60,021✔
710
                                c.state = OP_LS
60,021✔
711
                        case 'M', 'm':
107,076✔
712
                                c.state = OP_M
107,076✔
713
                        default:
×
714
                                goto parseErr
×
715
                        }
716
                case OP_LS:
60,020✔
717
                        switch b {
60,020✔
718
                        case '+':
45,110✔
719
                                c.state = OP_SUB
45,110✔
720
                        case '-':
14,910✔
721
                                c.state = OP_UNSUB
14,910✔
722
                        default:
×
723
                                goto parseErr
×
724
                        }
725
                case OP_R:
15,612,682✔
726
                        switch b {
15,612,682✔
727
                        case 'S', 's':
1,206,372✔
728
                                c.state = OP_RS
1,206,372✔
729
                        case 'M', 'm':
14,406,310✔
730
                                c.state = OP_M
14,406,310✔
731
                        default:
×
732
                                goto parseErr
×
733
                        }
734
                case OP_RS:
1,206,372✔
735
                        switch b {
1,206,372✔
736
                        case '+':
997,398✔
737
                                c.state = OP_SUB
997,398✔
738
                        case '-':
208,974✔
739
                                c.state = OP_UNSUB
208,974✔
740
                        default:
×
741
                                goto parseErr
×
742
                        }
743
                case OP_U:
15,545✔
744
                        switch b {
15,545✔
745
                        case 'N', 'n':
15,545✔
746
                                c.state = OP_UN
15,545✔
747
                        default:
×
748
                                goto parseErr
×
749
                        }
750
                case OP_UN:
15,545✔
751
                        switch b {
15,545✔
752
                        case 'S', 's':
15,545✔
753
                                c.state = OP_UNS
15,545✔
754
                        default:
×
755
                                goto parseErr
×
756
                        }
757
                case OP_UNS:
15,545✔
758
                        switch b {
15,545✔
759
                        case 'U', 'u':
15,545✔
760
                                c.state = OP_UNSU
15,545✔
761
                        default:
×
762
                                goto parseErr
×
763
                        }
764
                case OP_UNSU:
15,545✔
765
                        switch b {
15,545✔
766
                        case 'B', 'b':
15,545✔
767
                                c.state = OP_UNSUB
15,545✔
768
                        default:
×
769
                                goto parseErr
×
770
                        }
771
                case OP_UNSUB:
239,429✔
772
                        switch b {
239,429✔
773
                        case ' ', '\t':
239,429✔
774
                                c.state = OP_UNSUB_SPC
239,429✔
775
                        default:
×
776
                                goto parseErr
×
777
                        }
778
                case OP_UNSUB_SPC:
239,429✔
779
                        switch b {
239,429✔
780
                        case ' ', '\t':
×
781
                                continue
×
782
                        default:
239,429✔
783
                                c.state = UNSUB_ARG
239,429✔
784
                                c.as = i
239,429✔
785
                        }
786
                case UNSUB_ARG:
7,067,110✔
787
                        switch b {
7,067,110✔
788
                        case '\r':
239,424✔
789
                                c.drop = 1
239,424✔
790
                        case '\n':
239,420✔
791
                                var arg []byte
239,420✔
792
                                if c.argBuf != nil {
247,533✔
793
                                        arg = c.argBuf
8,113✔
794
                                        c.argBuf = nil
8,113✔
795
                                } else {
239,420✔
796
                                        arg = buf[c.as : i-c.drop]
231,307✔
797
                                }
231,307✔
798
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
239,420✔
799
                                        return err
×
800
                                }
×
801
                                var err error
239,420✔
802

239,420✔
803
                                switch c.kind {
239,420✔
804
                                case CLIENT:
15,544✔
805
                                        if trace {
28,342✔
806
                                                c.traceInOp("UNSUB", arg)
12,798✔
807
                                        }
12,798✔
808
                                        err = c.processUnsub(arg)
15,544✔
809
                                case ROUTER:
176,746✔
810
                                        if trace && c.srv != nil {
177,146✔
811
                                                switch c.op {
400✔
812
                                                case 'R', 'r':
206✔
813
                                                        c.traceInOp("RS-", arg)
206✔
814
                                                case 'L', 'l':
194✔
815
                                                        c.traceInOp("LS-", arg)
194✔
816
                                                }
817
                                        }
818
                                        leafUnsub := c.op == 'L' || c.op == 'l'
176,746✔
819
                                        err = c.processRemoteUnsub(arg, leafUnsub)
176,746✔
820
                                case GATEWAY:
43,785✔
821
                                        if trace {
44,967✔
822
                                                c.traceInOp("RS-", arg)
1,182✔
823
                                        }
1,182✔
824
                                        err = c.processGatewayRUnsub(arg)
43,785✔
825
                                case LEAF:
3,345✔
826
                                        if trace {
3,349✔
827
                                                c.traceInOp("LS-", arg)
4✔
828
                                        }
4✔
829
                                        err = c.processLeafUnsub(arg)
3,345✔
830
                                }
831
                                if err != nil {
239,420✔
832
                                        return err
×
833
                                }
×
834
                                c.drop, c.as, c.state = 0, i+1, OP_START
239,420✔
835
                        default:
6,588,266✔
836
                                if c.argBuf != nil {
6,763,567✔
837
                                        c.argBuf = append(c.argBuf, b)
175,301✔
838
                                }
175,301✔
839
                        }
840
                case OP_PI:
39,406✔
841
                        switch b {
39,406✔
842
                        case 'N', 'n':
39,406✔
843
                                c.state = OP_PIN
39,406✔
844
                        default:
×
845
                                goto parseErr
×
846
                        }
847
                case OP_PIN:
39,406✔
848
                        switch b {
39,406✔
849
                        case 'G', 'g':
39,406✔
850
                                c.state = OP_PING
39,406✔
851
                        default:
×
852
                                goto parseErr
×
853
                        }
854
                case OP_PING:
78,812✔
855
                        switch b {
78,812✔
856
                        case '\n':
39,406✔
857
                                if trace {
40,047✔
858
                                        c.traceInOp("PING", nil)
641✔
859
                                }
641✔
860
                                c.processPing()
39,406✔
861
                                c.drop, c.state = 0, OP_START
39,406✔
862
                        }
863
                case OP_PO:
27,560✔
864
                        switch b {
27,560✔
865
                        case 'N', 'n':
27,560✔
866
                                c.state = OP_PON
27,560✔
867
                        default:
×
868
                                goto parseErr
×
869
                        }
870
                case OP_PON:
27,560✔
871
                        switch b {
27,560✔
872
                        case 'G', 'g':
27,560✔
873
                                c.state = OP_PONG
27,560✔
874
                        default:
×
875
                                goto parseErr
×
876
                        }
877
                case OP_PONG:
55,120✔
878
                        switch b {
55,120✔
879
                        case '\n':
27,560✔
880
                                if trace {
27,627✔
881
                                        c.traceInOp("PONG", nil)
67✔
882
                                }
67✔
883
                                c.processPong()
27,560✔
884
                                c.drop, c.state = 0, OP_START
27,560✔
885
                        }
886
                case OP_C:
43,698✔
887
                        switch b {
43,698✔
888
                        case 'O', 'o':
43,698✔
889
                                c.state = OP_CO
43,698✔
890
                        default:
×
891
                                goto parseErr
×
892
                        }
893
                case OP_CO:
43,698✔
894
                        switch b {
43,698✔
895
                        case 'N', 'n':
43,698✔
896
                                c.state = OP_CON
43,698✔
897
                        default:
×
898
                                goto parseErr
×
899
                        }
900
                case OP_CON:
43,698✔
901
                        switch b {
43,698✔
902
                        case 'N', 'n':
43,698✔
903
                                c.state = OP_CONN
43,698✔
904
                        default:
×
905
                                goto parseErr
×
906
                        }
907
                case OP_CONN:
43,698✔
908
                        switch b {
43,698✔
909
                        case 'E', 'e':
43,698✔
910
                                c.state = OP_CONNE
43,698✔
911
                        default:
×
912
                                goto parseErr
×
913
                        }
914
                case OP_CONNE:
43,698✔
915
                        switch b {
43,698✔
916
                        case 'C', 'c':
43,698✔
917
                                c.state = OP_CONNEC
43,698✔
918
                        default:
×
919
                                goto parseErr
×
920
                        }
921
                case OP_CONNEC:
43,698✔
922
                        switch b {
43,698✔
923
                        case 'T', 't':
43,698✔
924
                                c.state = OP_CONNECT
43,698✔
925
                        default:
×
926
                                goto parseErr
×
927
                        }
928
                case OP_CONNECT:
87,396✔
929
                        switch b {
87,396✔
930
                        case ' ', '\t':
43,698✔
931
                                continue
43,698✔
932
                        default:
43,698✔
933
                                c.state = CONNECT_ARG
43,698✔
934
                                c.as = i
43,698✔
935
                        }
936
                case CONNECT_ARG:
9,241,308✔
937
                        switch b {
9,241,308✔
938
                        case '\r':
43,698✔
939
                                c.drop = 1
43,698✔
940
                        case '\n':
43,698✔
941
                                var arg []byte
43,698✔
942
                                if c.argBuf != nil {
46,107✔
943
                                        arg = c.argBuf
2,409✔
944
                                        c.argBuf = nil
2,409✔
945
                                } else {
43,698✔
946
                                        arg = buf[c.as : i-c.drop]
41,289✔
947
                                }
41,289✔
948
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
43,698✔
949
                                        return err
×
950
                                }
×
951
                                if trace {
44,379✔
952
                                        c.traceInOp("CONNECT", removeSecretsFromTrace(arg))
681✔
953
                                }
681✔
954
                                if err := c.processConnect(arg); err != nil {
43,943✔
955
                                        return err
245✔
956
                                }
245✔
957
                                c.drop, c.state = 0, OP_START
43,453✔
958
                                // Reset notion on authSet
43,453✔
959
                                c.mu.Lock()
43,453✔
960
                                authSet = c.awaitingAuth()
43,453✔
961
                                c.mu.Unlock()
43,453✔
962
                        default:
9,153,912✔
963
                                if c.argBuf != nil {
9,905,781✔
964
                                        c.argBuf = append(c.argBuf, b)
751,869✔
965
                                }
751,869✔
966
                        }
967
                case OP_M:
14,513,386✔
968
                        switch b {
14,513,386✔
969
                        case 'S', 's':
14,513,386✔
970
                                c.state = OP_MS
14,513,386✔
971
                        default:
×
972
                                goto parseErr
×
973
                        }
974
                case OP_MS:
14,513,386✔
975
                        switch b {
14,513,386✔
976
                        case 'G', 'g':
14,513,386✔
977
                                c.state = OP_MSG
14,513,386✔
978
                        default:
×
979
                                goto parseErr
×
980
                        }
981
                case OP_MSG:
14,513,386✔
982
                        switch b {
14,513,386✔
983
                        case ' ', '\t':
14,513,386✔
984
                                c.state = OP_MSG_SPC
14,513,386✔
985
                        default:
×
986
                                goto parseErr
×
987
                        }
988
                case OP_MSG_SPC:
14,513,386✔
989
                        switch b {
14,513,386✔
990
                        case ' ', '\t':
×
991
                                continue
×
992
                        default:
14,513,386✔
993
                                c.pa.hdr = -1
14,513,386✔
994
                                c.state = MSG_ARG
14,513,386✔
995
                                c.as = i
14,513,386✔
996
                        }
997
                case MSG_ARG:
491,808,421✔
998
                        switch b {
491,808,421✔
999
                        case '\r':
14,513,383✔
1000
                                c.drop = 1
14,513,383✔
1001
                        case '\n':
14,513,382✔
1002
                                var arg []byte
14,513,382✔
1003
                                if c.argBuf != nil {
15,024,999✔
1004
                                        arg = c.argBuf
511,617✔
1005
                                        c.argBuf = nil
511,617✔
1006
                                } else {
14,513,382✔
1007
                                        arg = buf[c.as : i-c.drop]
14,001,765✔
1008
                                }
14,001,765✔
1009
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
14,513,382✔
1010
                                        return err
×
1011
                                }
×
1012
                                var err error
14,513,382✔
1013
                                if c.kind == ROUTER || c.kind == GATEWAY {
28,927,549✔
1014
                                        switch c.op {
14,414,167✔
1015
                                        case 'R', 'r':
14,406,306✔
1016
                                                if trace {
14,412,810✔
1017
                                                        c.traceInOp("RMSG", arg)
6,504✔
1018
                                                }
6,504✔
1019
                                                err = c.processRoutedMsgArgs(arg)
14,406,306✔
1020
                                        case 'L', 'l':
7,861✔
1021
                                                if trace {
8,120✔
1022
                                                        c.traceInOp("LMSG", arg)
259✔
1023
                                                }
259✔
1024
                                                lmsg = true
7,861✔
1025
                                                err = c.processRoutedOriginClusterMsgArgs(arg)
7,861✔
1026
                                        }
1027
                                } else if c.kind == LEAF {
198,430✔
1028
                                        if trace {
99,232✔
1029
                                                c.traceInOp("LMSG", arg)
17✔
1030
                                        }
17✔
1031
                                        err = c.processLeafMsgArgs(arg)
99,215✔
1032
                                }
1033
                                if err != nil {
14,513,383✔
1034
                                        return err
1✔
1035
                                }
1✔
1036
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
14,513,381✔
1037

14,513,381✔
1038
                                // jump ahead with the index. If this overruns
14,513,381✔
1039
                                // what is left we fall out and process split
14,513,381✔
1040
                                // buffer.
14,513,381✔
1041
                                i = c.as + c.pa.size - LEN_CR_LF
14,513,381✔
1042
                        default:
462,781,656✔
1043
                                if c.argBuf != nil {
468,135,665✔
1044
                                        c.argBuf = append(c.argBuf, b)
5,354,009✔
1045
                                }
5,354,009✔
1046
                        }
1047
                case OP_I:
85,696✔
1048
                        switch b {
85,696✔
1049
                        case 'N', 'n':
85,696✔
1050
                                c.state = OP_IN
85,696✔
1051
                        default:
×
1052
                                goto parseErr
×
1053
                        }
1054
                case OP_IN:
85,696✔
1055
                        switch b {
85,696✔
1056
                        case 'F', 'f':
85,696✔
1057
                                c.state = OP_INF
85,696✔
1058
                        default:
×
1059
                                goto parseErr
×
1060
                        }
1061
                case OP_INF:
85,696✔
1062
                        switch b {
85,696✔
1063
                        case 'O', 'o':
85,696✔
1064
                                c.state = OP_INFO
85,696✔
1065
                        default:
×
1066
                                goto parseErr
×
1067
                        }
1068
                case OP_INFO:
171,392✔
1069
                        switch b {
171,392✔
1070
                        case ' ', '\t':
85,696✔
1071
                                continue
85,696✔
1072
                        default:
85,696✔
1073
                                c.state = INFO_ARG
85,696✔
1074
                                c.as = i
85,696✔
1075
                        }
1076
                case INFO_ARG:
32,508,269✔
1077
                        switch b {
32,508,269✔
1078
                        case '\r':
85,695✔
1079
                                c.drop = 1
85,695✔
1080
                        case '\n':
85,695✔
1081
                                var arg []byte
85,695✔
1082
                                if c.argBuf != nil {
94,300✔
1083
                                        arg = c.argBuf
8,605✔
1084
                                        c.argBuf = nil
8,605✔
1085
                                } else {
85,695✔
1086
                                        arg = buf[c.as : i-c.drop]
77,090✔
1087
                                }
77,090✔
1088
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
85,695✔
1089
                                        return err
×
1090
                                }
×
1091
                                if err := c.processInfo(arg); err != nil {
85,695✔
1092
                                        return err
×
1093
                                }
×
1094
                                c.drop, c.as, c.state = 0, i+1, OP_START
85,695✔
1095
                        default:
32,336,879✔
1096
                                if c.argBuf != nil {
33,303,406✔
1097
                                        c.argBuf = append(c.argBuf, b)
966,527✔
1098
                                }
966,527✔
1099
                        }
1100
                case OP_PLUS:
×
1101
                        switch b {
×
1102
                        case 'O', 'o':
×
1103
                                c.state = OP_PLUS_O
×
1104
                        default:
×
1105
                                goto parseErr
×
1106
                        }
1107
                case OP_PLUS_O:
×
1108
                        switch b {
×
1109
                        case 'K', 'k':
×
1110
                                c.state = OP_PLUS_OK
×
1111
                        default:
×
1112
                                goto parseErr
×
1113
                        }
1114
                case OP_PLUS_OK:
×
1115
                        switch b {
×
1116
                        case '\n':
×
1117
                                c.drop, c.state = 0, OP_START
×
1118
                        }
1119
                case OP_MINUS:
105✔
1120
                        switch b {
105✔
1121
                        case 'E', 'e':
105✔
1122
                                c.state = OP_MINUS_E
105✔
1123
                        default:
×
1124
                                goto parseErr
×
1125
                        }
1126
                case OP_MINUS_E:
105✔
1127
                        switch b {
105✔
1128
                        case 'R', 'r':
105✔
1129
                                c.state = OP_MINUS_ER
105✔
1130
                        default:
×
1131
                                goto parseErr
×
1132
                        }
1133
                case OP_MINUS_ER:
105✔
1134
                        switch b {
105✔
1135
                        case 'R', 'r':
105✔
1136
                                c.state = OP_MINUS_ERR
105✔
1137
                        default:
×
1138
                                goto parseErr
×
1139
                        }
1140
                case OP_MINUS_ERR:
105✔
1141
                        switch b {
105✔
1142
                        case ' ', '\t':
105✔
1143
                                c.state = OP_MINUS_ERR_SPC
105✔
1144
                        default:
×
1145
                                goto parseErr
×
1146
                        }
1147
                case OP_MINUS_ERR_SPC:
105✔
1148
                        switch b {
105✔
1149
                        case ' ', '\t':
×
1150
                                continue
×
1151
                        default:
105✔
1152
                                c.state = MINUS_ERR_ARG
105✔
1153
                                c.as = i
105✔
1154
                        }
1155
                case MINUS_ERR_ARG:
4,301✔
1156
                        switch b {
4,301✔
1157
                        case '\r':
105✔
1158
                                c.drop = 1
105✔
1159
                        case '\n':
105✔
1160
                                var arg []byte
105✔
1161
                                if c.argBuf != nil {
106✔
1162
                                        arg = c.argBuf
1✔
1163
                                        c.argBuf = nil
1✔
1164
                                } else {
105✔
1165
                                        arg = buf[c.as : i-c.drop]
104✔
1166
                                }
104✔
1167
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
105✔
1168
                                        return err
×
1169
                                }
×
1170
                                c.processErr(string(arg))
105✔
1171
                                c.drop, c.as, c.state = 0, i+1, OP_START
105✔
1172
                        default:
4,091✔
1173
                                if c.argBuf != nil {
4,093✔
1174
                                        c.argBuf = append(c.argBuf, b)
2✔
1175
                                }
2✔
1176
                        }
1177
                default:
×
1178
                        goto parseErr
×
1179
                }
1180
        }
1181

1182
        // Check for split buffer scenarios for any ARG state.
1183
        if c.state == SUB_ARG || c.state == UNSUB_ARG ||
15,716,111✔
1184
                c.state == PUB_ARG || c.state == HPUB_ARG ||
15,716,111✔
1185
                c.state == ASUB_ARG || c.state == AUSUB_ARG ||
15,716,111✔
1186
                c.state == MSG_ARG || c.state == HMSG_ARG ||
15,716,111✔
1187
                c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG || c.state == INFO_ARG {
16,306,576✔
1188

590,465✔
1189
                // Setup a holder buffer to deal with split buffer scenario.
590,465✔
1190
                if c.argBuf == nil {
1,180,632✔
1191
                        c.argBuf = c.scratch[:0]
590,167✔
1192
                        c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
590,167✔
1193
                }
590,167✔
1194
                // Check for violations of control line length here. Note that this is not
1195
                // exact at all but the performance hit is too great to be precise, and
1196
                // catching here should prevent memory exhaustion attacks.
1197
                if err := c.overMaxControlLineLimit(c.argBuf, mcl); err != nil {
590,466✔
1198
                        return err
1✔
1199
                }
1✔
1200
        }
1201

1202
        // Check for split msg
1203
        if (c.state == MSG_PAYLOAD || c.state == MSG_END_R || c.state == MSG_END_N) && c.msgBuf == nil {
17,416,604✔
1204
                // We need to clone the pubArg if it is still referencing the
1,700,494✔
1205
                // read buffer and we are not able to process the msg.
1,700,494✔
1206

1,700,494✔
1207
                if c.argBuf == nil {
3,400,988✔
1208
                        // Works also for MSG_ARG, when message comes from ROUTE or GATEWAY.
1,700,494✔
1209
                        if err := c.clonePubArg(lmsg); err != nil {
1,700,494✔
1210
                                goto parseErr
×
1211
                        }
1212
                }
1213

1214
                // If we will overflow the scratch buffer, just create a
1215
                // new buffer to hold the split message.
1216
                if c.pa.size > cap(c.scratch)-len(c.argBuf) {
1,735,193✔
1217
                        lrem := len(buf[c.as:])
34,699✔
1218
                        // Consider it a protocol error when the remaining payload
34,699✔
1219
                        // is larger than the reported size for PUB. It can happen
34,699✔
1220
                        // when processing incomplete messages from rogue clients.
34,699✔
1221
                        if lrem > c.pa.size+LEN_CR_LF {
34,699✔
1222
                                goto parseErr
×
1223
                        }
1224
                        c.msgBuf = make([]byte, lrem, c.pa.size+LEN_CR_LF)
34,699✔
1225
                        copy(c.msgBuf, buf[c.as:])
34,699✔
1226
                } else {
1,665,795✔
1227
                        c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
1,665,795✔
1228
                        c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
1,665,795✔
1229
                }
1,665,795✔
1230
        }
1231

1232
        return nil
15,716,110✔
1233

15,716,110✔
1234
authErr:
15,716,110✔
1235
        c.authViolation()
29✔
1236
        return ErrAuthentication
29✔
1237

29✔
1238
parseErr:
29✔
1239
        c.sendErr("Unknown Protocol Operation")
4✔
1240
        snip := protoSnippet(i, PROTO_SNIPPET_SIZE, buf)
4✔
1241
        err := fmt.Errorf("%s parser ERROR, state=%d, i=%d: proto='%s...'", c.kindString(), c.state, i, snip)
4✔
1242
        return err
4✔
1243
}
1244

1245
// protoSnippet returns a quoted (%q) rendering of at most max bytes of buf,
// beginning at offset start. It is used to embed a short excerpt of the
// offending protocol bytes in error messages.
//
// If start is at or beyond the end of buf, the literal `""` is returned.
// If the window extends past the end of buf, it is clamped so that every
// remaining byte, including the last one, is part of the snippet.
func protoSnippet(start, max int, buf []byte) string {
	bufSize := len(buf)
	if start >= bufSize {
		return `""`
	}
	stop := start + max
	if stop > bufSize {
		// Clamp to the end of the buffer. A slice's upper bound is exclusive,
		// so bufSize (not bufSize-1) is what keeps the final byte.
		stop = bufSize
	}
	return fmt.Sprintf("%q", buf[start:stop])
}
1256

1257
// Check if the length of buffer `arg` is over the max control line limit `mcl`.
1258
// If so, an error is sent to the client and the connection is closed.
1259
// The error ErrMaxControlLine is returned.
1260
func (c *client) overMaxControlLineLimit(arg []byte, mcl int32) error {
29,631,785✔
1261
        // Widen to int64 so mcl*16 cannot overflow for large configured values.
29,631,785✔
1262
        effective := int64(mcl)
29,631,785✔
1263
        if c.kind != CLIENT {
47,223,324✔
1264
                // This is the upper bound on argBuf length for LEAF, ROUTER, and GATEWAY connections.
17,591,539✔
1265
                // These kinds need longer arg lines than CLIENT (which is capped at mcl=4096 by default)
17,591,539✔
1266
                // because cluster/leaf frames encode origin, account, reply, and queue groups.
17,591,539✔
1267
                // By default, this is 64 KB, which matches maxBufSize so a single oversized read
17,591,539✔
1268
                // is caught on the very next parse call.
17,591,539✔
1269
                effective *= 16
17,591,539✔
1270
        }
17,591,539✔
1271
        if int64(len(arg)) > effective {
29,631,786✔
1272
                err := NewErrorCtx(ErrMaxControlLine, "State %d, max_control_line %d, Buffer len %d (snip: %s...)",
1✔
1273
                        c.state, int(mcl), len(c.argBuf), protoSnippet(0, MAX_CONTROL_LINE_SNIPPET_SIZE, arg))
1✔
1274
                c.sendErr(err.Error())
1✔
1275
                c.closeConnection(MaxControlLineExceeded)
1✔
1276
                return err
1✔
1277
        }
1✔
1278
        return nil
29,631,784✔
1279
}
1280

1281
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
1282
// we need to hold onto it into the next read.
1283
func (c *client) clonePubArg(lmsg bool) error {
1,700,494✔
1284
        // Just copy and re-process original arg buffer.
1,700,494✔
1285
        c.argBuf = c.scratch[:0]
1,700,494✔
1286
        c.argBuf = append(c.argBuf, c.pa.arg...)
1,700,494✔
1287

1,700,494✔
1288
        switch c.kind {
1,700,494✔
1289
        case ROUTER, GATEWAY:
1,654,896✔
1290
                if lmsg {
1,655,540✔
1291
                        return c.processRoutedOriginClusterMsgArgs(c.argBuf)
644✔
1292
                }
644✔
1293
                if c.pa.hdr < 0 {
3,092,831✔
1294
                        return c.processRoutedMsgArgs(c.argBuf)
1,438,579✔
1295
                } else {
1,654,252✔
1296
                        return c.processRoutedHeaderMsgArgs(c.argBuf)
215,673✔
1297
                }
215,673✔
1298
        case LEAF:
1,461✔
1299
                if c.pa.hdr < 0 {
2,813✔
1300
                        return c.processLeafMsgArgs(c.argBuf)
1,352✔
1301
                } else {
1,461✔
1302
                        return c.processLeafHeaderMsgArgs(c.argBuf)
109✔
1303
                }
109✔
1304
        default:
44,137✔
1305
                if c.pa.hdr < 0 {
82,292✔
1306
                        return c.processPub(c.argBuf)
38,155✔
1307
                } else {
44,137✔
1308
                        return c.processHeaderPub(c.argBuf, nil)
5,982✔
1309
                }
5,982✔
1310
        }
1311
}
1312

1313
func (ps *parseState) getHeader() http.Header {
831✔
1314
        if ps.header == nil {
1,662✔
1315
                if hdr := ps.pa.hdr; hdr > 0 {
904✔
1316
                        reader := bufio.NewReader(bytes.NewReader(ps.msgBuf[0:hdr]))
73✔
1317
                        tp := textproto.NewReader(reader)
73✔
1318
                        tp.ReadLine() // skip over first line, contains version
73✔
1319
                        if mimeHeader, err := tp.ReadMIMEHeader(); err == nil {
146✔
1320
                                ps.header = http.Header(mimeHeader)
73✔
1321
                        }
73✔
1322
                }
1323
        }
1324
        return ps.header
831✔
1325
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc