• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 24327339784

10 Apr 2026 02:22PM UTC coverage: 81.808% (-1.2%) from 82.972%
24327339784

push

github

web-flow
(2.14) [ADDED] Config reload: add/remove remote leafnodes (#7937)

The configuration reload now supports adding and/or removing remote
leafnodes. A remote is identified with the combination of its URLs list,
local account and credentials file name. This is what is used by the
server to detect changes for the remote leafnodes list.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

75037 of 91723 relevant lines covered (81.81%)

483084.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.6
/server/parser.go
1
// Copyright 2012-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "fmt"
20
        "net/http"
21
        "net/textproto"
22
)
23

24
// parserState enumerates the states of the protocol parser's state machine.
// Its valid values are the OP_*, *_ARG and MSG_* constants declared in this
// file.
type parserState int
25
// parseState carries the parser's working state between bytes and between
// successive calls to parse, so that a protocol line or payload may span more
// than one read buffer.
// NOTE(review): parse reaches these fields directly through the client
// (c.state, c.as, ...), so this struct is presumably embedded in client; the
// client definition is not visible here - confirm.
type parseState struct {
26
        // state is the current state of the state machine; parse switches on it.
        state   parserState
27
        // op is the first byte of the operation being parsed (recorded in OP_START).
        op      byte
28
        // as is the index in the buffer being parsed at which the current
        // argument or payload starts.
        as      int
29
        // drop is the number of trailing bytes excluded when slicing the
        // argument out of the buffer; it is set to 1 when a '\r' is seen.
        drop    int
30
        // pa holds the arguments of the publish/message operation in flight.
        pa      pubArg
31
        // argBuf, when non-nil, accumulates argument bytes and is used in place
        // of a slice of the read buffer once the terminating '\n' arrives.
        // NOTE(review): presumably populated when an argument is split across
        // reads; the code that sets it is outside this chunk - confirm.
        argBuf  []byte
32
        // msgBuf, when non-nil during MSG_PAYLOAD, receives copied payload
        // bytes; otherwise it is pointed at the payload inside the read buffer
        // once the trailing CRLF has been seen.
        msgBuf  []byte
33
        header  http.Header // access via getHeader
34
        // NOTE(review): scratch is not referenced in the visible part of the
        // file; presumably fixed backing storage for the buffers above - confirm.
        scratch [MAX_CONTROL_LINE_SIZE]byte
35
        argsa   [MAX_HMSG_ARGS + 1][]byte // pre-allocated args array to avoid per-call heap escape
36
}
37

38
// pubArg holds the parsed arguments of the publish/message operation that is
// currently being processed. parse clears most of these fields after the
// message has been handed to processInboundMsg.
type pubArg struct {
39
        arg       []byte
40
        pacache   []byte
41
        origin    []byte
42
        account   []byte
43
        // subject is the message subject. After a subject mapping has been
        // applied it is the subject the original was mapped to.
        subject   []byte
44
        deliver   []byte
45
        // mapped is traced as the left-hand side of "mapped -> subject" when a
        // mapping changed the subject.
        // NOTE(review): presumably the original, pre-mapping subject - confirm.
        mapped    []byte
46
        reply     []byte
47
        // NOTE(review): szb and hdb are presumably the raw byte forms of size
        // and hdr; their producers are outside this chunk - confirm.
        szb       []byte
48
        hdb       []byte
49
        queues    [][]byte
50
        // size is the payload length parse uses to skip ahead over the payload
        // and to detect that the whole payload has been received.
        size      int
51
        // hdr is set to 0 when an HPUB/HMSG argument starts and to -1 for PUB
        // and on reset; parse initializes message tracing only when hdr > 0.
        // NOTE(review): presumably the header length in bytes - confirm.
        hdr       int
52
        psi       []*serviceImport
53
        trace     *msgTrace
54
        delivered bool // Only used for service imports
55
}
56

57
// Parser constants.
//
// Each constant is one state of the state machine driven by parse:
//   - OP_<prefix> states record how much of a protocol verb has been read
//     so far (for example OP_H -> OP_HP -> OP_HPU -> OP_HPUB);
//   - <verb>_SPC states skip the spaces/tabs that follow the verb;
//   - <verb>_ARG states collect the argument bytes up to the terminating '\n';
//   - MSG_PAYLOAD consumes the message body, after which MSG_END_R and
//     MSG_END_N require the trailing '\r' and '\n' respectively.
//
// Values are assigned with iota, so the declaration order determines each
// state's numeric value; OP_START is the zero value.
58
const (
59
        OP_START parserState = iota
60
        OP_PLUS
61
        OP_PLUS_O
62
        OP_PLUS_OK
63
        OP_MINUS
64
        OP_MINUS_E
65
        OP_MINUS_ER
66
        OP_MINUS_ERR
67
        OP_MINUS_ERR_SPC
68
        MINUS_ERR_ARG
69
        OP_C
70
        OP_CO
71
        OP_CON
72
        OP_CONN
73
        OP_CONNE
74
        OP_CONNEC
75
        OP_CONNECT
76
        CONNECT_ARG
77
        OP_H
78
        OP_HP
79
        OP_HPU
80
        OP_HPUB
81
        OP_HPUB_SPC
82
        HPUB_ARG
83
        OP_HM
84
        OP_HMS
85
        OP_HMSG
86
        OP_HMSG_SPC
87
        HMSG_ARG
88
        OP_P
89
        OP_PU
90
        OP_PUB
91
        OP_PUB_SPC
92
        PUB_ARG
93
        OP_PI
94
        OP_PIN
95
        OP_PING
96
        OP_PO
97
        OP_PON
98
        OP_PONG
99
        MSG_PAYLOAD
100
        MSG_END_R
101
        MSG_END_N
102
        OP_S
103
        OP_SU
104
        OP_SUB
105
        OP_SUB_SPC
106
        SUB_ARG
107
        OP_A
108
        OP_ASUB
109
        OP_ASUB_SPC
110
        ASUB_ARG
111
        OP_AUSUB
112
        OP_AUSUB_SPC
113
        AUSUB_ARG
114
        OP_L
115
        OP_LS
116
        OP_R
117
        OP_RS
118
        OP_U
119
        OP_UN
120
        OP_UNS
121
        OP_UNSU
122
        OP_UNSUB
123
        OP_UNSUB_SPC
124
        UNSUB_ARG
125
        OP_M
126
        OP_MS
127
        OP_MSG
128
        OP_MSG_SPC
129
        MSG_ARG
130
        OP_I
131
        OP_IN
132
        OP_INF
133
        OP_INFO
134
        INFO_ARG
135
)
136

137
func (c *client) parse(buf []byte) error {
11,287,316✔
138
        // Branch out to mqtt clients. c.mqtt is immutable, but should it become
11,287,316✔
139
        // an issue (say data race detection), we could branch outside in readLoop
11,287,316✔
140
        if c.isMqtt() {
11,289,721✔
141
                return c.mqttParse(buf)
2,405✔
142
        }
2,405✔
143
        var i int
11,284,911✔
144
        var b byte
11,284,911✔
145
        var lmsg bool
11,284,911✔
146

11,284,911✔
147
        // Snapshots
11,284,911✔
148
        c.mu.Lock()
11,284,911✔
149
        // Snapshot and then reset when we receive a
11,284,911✔
150
        // proper CONNECT if needed.
11,284,911✔
151
        authSet := c.awaitingAuth()
11,284,911✔
152
        // Snapshot max control line as well.
11,284,911✔
153
        s, mcl, trace := c.srv, c.mcl, c.trace
11,284,911✔
154
        c.mu.Unlock()
11,284,911✔
155

11,284,911✔
156
        // Move to loop instead of range syntax to allow jumping of i
11,284,911✔
157
        for i = 0; i < len(buf); i++ {
859,493,544✔
158
                b = buf[i]
848,208,633✔
159

848,208,633✔
160
                switch c.state {
848,208,633✔
161
                case OP_START:
20,402,756✔
162
                        c.op = b
20,402,756✔
163
                        if b != 'C' && b != 'c' {
40,762,593✔
164
                                if authSet {
20,359,853✔
165
                                        if s == nil {
16✔
166
                                                goto authErr
×
167
                                        }
168
                                        var ok bool
16✔
169
                                        // Check here for NoAuthUser. If this is set allow non CONNECT protos as our first.
16✔
170
                                        // E.g. telnet proto demos.
16✔
171
                                        if noAuthUser := s.getOpts().NoAuthUser; noAuthUser != _EMPTY_ {
18✔
172
                                                s.mu.Lock()
2✔
173
                                                user, exists := s.users[noAuthUser]
2✔
174
                                                s.mu.Unlock()
2✔
175
                                                if exists {
4✔
176
                                                        c.RegisterUser(user)
2✔
177
                                                        c.mu.Lock()
2✔
178
                                                        c.clearAuthTimer()
2✔
179
                                                        c.flags.set(connectReceived)
2✔
180
                                                        c.mu.Unlock()
2✔
181
                                                        authSet, ok = false, true
2✔
182
                                                }
2✔
183
                                        }
184
                                        if !ok {
30✔
185
                                                goto authErr
14✔
186
                                        }
187
                                }
188
                                // If the connection is a gateway connection, make sure that
189
                                // if this is an inbound, it starts with a CONNECT.
190
                                if c.kind == GATEWAY && !c.gw.outbound && !c.gw.connected {
20,359,823✔
191
                                        // Use auth violation since no CONNECT was sent.
×
192
                                        // It could be a parseErr too.
×
193
                                        goto authErr
×
194
                                }
195
                        }
196
                        switch b {
20,402,742✔
197
                        case 'P', 'p':
7,200,683✔
198
                                c.state = OP_P
7,200,683✔
199
                        case 'H', 'h':
1,046,360✔
200
                                c.state = OP_H
1,046,360✔
201
                        case 'S', 's':
136,891✔
202
                                c.state = OP_S
136,891✔
203
                        case 'U', 'u':
16,268✔
204
                                c.state = OP_U
16,268✔
205
                        case 'R', 'r':
11,723,173✔
206
                                if c.kind == CLIENT {
11,723,173✔
207
                                        goto parseErr
×
208
                                } else {
11,723,173✔
209
                                        c.state = OP_R
11,723,173✔
210
                                }
11,723,173✔
211
                        case 'L', 'l':
151,366✔
212
                                if c.kind != LEAF && c.kind != ROUTER {
151,366✔
213
                                        goto parseErr
×
214
                                } else {
151,366✔
215
                                        c.state = OP_L
151,366✔
216
                                }
151,366✔
217
                        case 'A', 'a':
32✔
218
                                if c.kind == CLIENT {
32✔
219
                                        goto parseErr
×
220
                                } else {
32✔
221
                                        c.state = OP_A
32✔
222
                                }
32✔
223
                        case 'C', 'c':
42,919✔
224
                                c.state = OP_C
42,919✔
225
                        case 'I', 'i':
84,939✔
226
                                c.state = OP_I
84,939✔
227
                        case '+':
×
228
                                c.state = OP_PLUS
×
229
                        case '-':
108✔
230
                                c.state = OP_MINUS
108✔
231
                        default:
3✔
232
                                goto parseErr
3✔
233
                        }
234
                case OP_H:
1,046,360✔
235
                        switch b {
1,046,360✔
236
                        case 'P', 'p':
565,618✔
237
                                c.state = OP_HP
565,618✔
238
                        case 'M', 'm':
480,742✔
239
                                c.state = OP_HM
480,742✔
240
                        default:
×
241
                                goto parseErr
×
242
                        }
243
                case OP_HP:
565,618✔
244
                        switch b {
565,618✔
245
                        case 'U', 'u':
565,618✔
246
                                c.state = OP_HPU
565,618✔
247
                        default:
×
248
                                goto parseErr
×
249
                        }
250
                case OP_HPU:
565,618✔
251
                        switch b {
565,618✔
252
                        case 'B', 'b':
565,618✔
253
                                c.state = OP_HPUB
565,618✔
254
                        default:
×
255
                                goto parseErr
×
256
                        }
257
                case OP_HPUB:
565,618✔
258
                        switch b {
565,618✔
259
                        case ' ', '\t':
565,618✔
260
                                c.state = OP_HPUB_SPC
565,618✔
261
                        default:
×
262
                                goto parseErr
×
263
                        }
264
                case OP_HPUB_SPC:
565,618✔
265
                        switch b {
565,618✔
266
                        case ' ', '\t':
×
267
                                continue
×
268
                        default:
565,618✔
269
                                c.pa.hdr = 0
565,618✔
270
                                c.state = HPUB_ARG
565,618✔
271
                                c.as = i
565,618✔
272
                        }
273
                case HPUB_ARG:
20,329,605✔
274
                        switch b {
20,329,605✔
275
                        case '\r':
565,617✔
276
                                c.drop = 1
565,617✔
277
                        case '\n':
565,617✔
278
                                var arg []byte
565,617✔
279
                                if c.argBuf != nil {
565,700✔
280
                                        arg = c.argBuf
83✔
281
                                        c.argBuf = nil
83✔
282
                                } else {
565,617✔
283
                                        arg = buf[c.as : i-c.drop]
565,534✔
284
                                }
565,534✔
285
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
565,617✔
286
                                        return err
×
287
                                }
×
288
                                if trace {
631,395✔
289
                                        c.traceInOp("HPUB", arg)
65,778✔
290
                                }
65,778✔
291
                                var remaining []byte
565,617✔
292
                                if i < len(buf) {
1,131,234✔
293
                                        remaining = buf[i+1:]
565,617✔
294
                                }
565,617✔
295
                                if err := c.processHeaderPub(arg, remaining); err != nil {
565,623✔
296
                                        return err
6✔
297
                                }
6✔
298

299
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
565,611✔
300
                                // If we don't have a saved buffer then jump ahead with
565,611✔
301
                                // the index. If this overruns what is left we fall out
565,611✔
302
                                // and process split buffer.
565,611✔
303
                                if c.msgBuf == nil {
1,131,222✔
304
                                        i = c.as + c.pa.size - LEN_CR_LF
565,611✔
305
                                }
565,611✔
306
                        default:
19,198,371✔
307
                                if c.argBuf != nil {
19,199,422✔
308
                                        c.argBuf = append(c.argBuf, b)
1,051✔
309
                                }
1,051✔
310
                        }
311
                case OP_HM:
480,742✔
312
                        switch b {
480,742✔
313
                        case 'S', 's':
480,742✔
314
                                c.state = OP_HMS
480,742✔
315
                        default:
×
316
                                goto parseErr
×
317
                        }
318
                case OP_HMS:
480,742✔
319
                        switch b {
480,742✔
320
                        case 'G', 'g':
480,742✔
321
                                c.state = OP_HMSG
480,742✔
322
                        default:
×
323
                                goto parseErr
×
324
                        }
325
                case OP_HMSG:
480,742✔
326
                        switch b {
480,742✔
327
                        case ' ', '\t':
480,742✔
328
                                c.state = OP_HMSG_SPC
480,742✔
329
                        default:
×
330
                                goto parseErr
×
331
                        }
332
                case OP_HMSG_SPC:
480,742✔
333
                        switch b {
480,742✔
334
                        case ' ', '\t':
×
335
                                continue
×
336
                        default:
480,742✔
337
                                c.pa.hdr = 0
480,742✔
338
                                c.state = HMSG_ARG
480,742✔
339
                                c.as = i
480,742✔
340
                        }
341
                case HMSG_ARG:
46,539,097✔
342
                        switch b {
46,539,097✔
343
                        case '\r':
480,742✔
344
                                c.drop = 1
480,742✔
345
                        case '\n':
480,742✔
346
                                var arg []byte
480,742✔
347
                                if c.argBuf != nil {
484,112✔
348
                                        arg = c.argBuf
3,370✔
349
                                        c.argBuf = nil
3,370✔
350
                                } else {
480,742✔
351
                                        arg = buf[c.as : i-c.drop]
477,372✔
352
                                }
477,372✔
353
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
480,742✔
354
                                        return err
×
355
                                }
×
356
                                var err error
480,742✔
357
                                if c.kind == ROUTER || c.kind == GATEWAY {
961,099✔
358
                                        if trace {
482,501✔
359
                                                c.traceInOp("HMSG", arg)
2,144✔
360
                                        }
2,144✔
361
                                        err = c.processRoutedHeaderMsgArgs(arg)
480,357✔
362
                                } else if c.kind == LEAF {
770✔
363
                                        if trace {
535✔
364
                                                c.traceInOp("HMSG", arg)
150✔
365
                                        }
150✔
366
                                        err = c.processLeafHeaderMsgArgs(arg)
385✔
367
                                }
368
                                if err != nil {
480,742✔
369
                                        return err
×
370
                                }
×
371
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
480,742✔
372

480,742✔
373
                                // jump ahead with the index. If this overruns
480,742✔
374
                                // what is left we fall out and process split
480,742✔
375
                                // buffer.
480,742✔
376
                                i = c.as + c.pa.size - LEN_CR_LF
480,742✔
377
                        default:
45,577,613✔
378
                                if c.argBuf != nil {
45,745,374✔
379
                                        c.argBuf = append(c.argBuf, b)
167,761✔
380
                                }
167,761✔
381
                        }
382
                case OP_P:
7,200,683✔
383
                        switch b {
7,200,683✔
384
                        case 'U', 'u':
7,131,094✔
385
                                c.state = OP_PU
7,131,094✔
386
                        case 'I', 'i':
40,782✔
387
                                c.state = OP_PI
40,782✔
388
                        case 'O', 'o':
28,807✔
389
                                c.state = OP_PO
28,807✔
390
                        default:
×
391
                                goto parseErr
×
392
                        }
393
                case OP_PU:
7,131,092✔
394
                        switch b {
7,131,092✔
395
                        case 'B', 'b':
7,131,092✔
396
                                c.state = OP_PUB
7,131,092✔
397
                        default:
×
398
                                goto parseErr
×
399
                        }
400
                case OP_PUB:
7,131,092✔
401
                        switch b {
7,131,092✔
402
                        case ' ', '\t':
7,131,091✔
403
                                c.state = OP_PUB_SPC
7,131,091✔
404
                        default:
1✔
405
                                goto parseErr
1✔
406
                        }
407
                case OP_PUB_SPC:
7,131,089✔
408
                        switch b {
7,131,089✔
409
                        case ' ', '\t':
×
410
                                continue
×
411
                        default:
7,131,089✔
412
                                c.pa.hdr = -1
7,131,089✔
413
                                c.state = PUB_ARG
7,131,089✔
414
                                c.as = i
7,131,089✔
415
                        }
416
                case PUB_ARG:
137,498,950✔
417
                        switch b {
137,498,950✔
418
                        case '\r':
7,131,084✔
419
                                c.drop = 1
7,131,084✔
420
                        case '\n':
7,131,084✔
421
                                var arg []byte
7,131,084✔
422
                                if c.argBuf != nil {
7,156,591✔
423
                                        arg = c.argBuf
25,507✔
424
                                        c.argBuf = nil
25,507✔
425
                                } else {
7,131,084✔
426
                                        arg = buf[c.as : i-c.drop]
7,105,577✔
427
                                }
7,105,577✔
428
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
7,131,084✔
429
                                        return err
×
430
                                }
×
431
                                if trace {
7,460,223✔
432
                                        c.traceInOp("PUB", arg)
329,139✔
433
                                }
329,139✔
434
                                if err := c.processPub(arg); err != nil {
7,131,090✔
435
                                        return err
6✔
436
                                }
6✔
437

438
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
7,131,078✔
439
                                // If we don't have a saved buffer then jump ahead with
7,131,078✔
440
                                // the index. If this overruns what is left we fall out
7,131,078✔
441
                                // and process split buffer.
7,131,078✔
442
                                if c.msgBuf == nil {
14,262,156✔
443
                                        i = c.as + c.pa.size - LEN_CR_LF
7,131,078✔
444
                                }
7,131,078✔
445
                        default:
123,236,782✔
446
                                if c.argBuf != nil {
123,727,473✔
447
                                        c.argBuf = append(c.argBuf, b)
490,691✔
448
                                }
490,691✔
449
                        }
450
                case MSG_PAYLOAD:
19,298,331✔
451
                        if c.msgBuf != nil {
20,925,694✔
452
                                // copy as much as we can to the buffer and skip ahead.
1,627,363✔
453
                                toCopy := c.pa.size - len(c.msgBuf)
1,627,363✔
454
                                avail := len(buf) - i
1,627,363✔
455
                                if avail < toCopy {
2,125,524✔
456
                                        toCopy = avail
498,161✔
457
                                }
498,161✔
458
                                if toCopy > 0 {
3,254,726✔
459
                                        start := len(c.msgBuf)
1,627,363✔
460
                                        // This is needed for copy to work.
1,627,363✔
461
                                        c.msgBuf = c.msgBuf[:start+toCopy]
1,627,363✔
462
                                        copy(c.msgBuf[start:], buf[i:i+toCopy])
1,627,363✔
463
                                        // Update our index
1,627,363✔
464
                                        i = (i + toCopy) - 1
1,627,363✔
465
                                } else {
1,627,363✔
466
                                        // Fall back to append if needed.
×
467
                                        c.msgBuf = append(c.msgBuf, b)
×
468
                                }
×
469
                                if len(c.msgBuf) >= c.pa.size {
2,756,565✔
470
                                        c.state = MSG_END_R
1,129,202✔
471
                                }
1,129,202✔
472
                        } else if i-c.as+1 >= c.pa.size {
35,341,936✔
473
                                c.state = MSG_END_R
17,670,968✔
474
                        }
17,670,968✔
475
                case MSG_END_R:
18,800,169✔
476
                        if b != '\r' {
18,800,170✔
477
                                goto parseErr
1✔
478
                        }
479
                        if c.msgBuf != nil {
19,936,620✔
480
                                c.msgBuf = append(c.msgBuf, b)
1,136,452✔
481
                        }
1,136,452✔
482
                        c.state = MSG_END_N
18,800,168✔
483
                case MSG_END_N:
18,800,168✔
484
                        if b != '\n' {
18,800,168✔
485
                                goto parseErr
×
486
                        }
487
                        if c.msgBuf != nil {
19,938,594✔
488
                                c.msgBuf = append(c.msgBuf, b)
1,138,426✔
489
                        } else {
18,800,168✔
490
                                c.msgBuf = buf[c.as : i+1]
17,661,742✔
491
                        }
17,661,742✔
492

493
                        var mt *msgTrace
18,800,168✔
494
                        if c.pa.hdr > 0 {
19,846,952✔
495
                                mt = c.initMsgTrace()
1,046,784✔
496
                        }
1,046,784✔
497
                        // Check for mappings.
498
                        if (c.kind == CLIENT || c.kind == LEAF) && c.in.flags.isSet(hasMappings) {
18,832,930✔
499
                                changed := c.selectMappedSubject()
32,762✔
500
                                if changed {
45,338✔
501
                                        if trace {
12,650✔
502
                                                c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", c.pa.mapped, c.pa.subject)))
74✔
503
                                        }
74✔
504
                                        // c.pa.subject is the subject the original is now mapped to.
505
                                        mt.addSubjectMappingEvent(c.pa.subject)
12,576✔
506
                                }
507
                        }
508
                        if trace {
19,204,303✔
509
                                c.traceMsg(c.msgBuf)
404,135✔
510
                        }
404,135✔
511

512
                        c.processInboundMsg(c.msgBuf)
18,800,168✔
513

18,800,168✔
514
                        mt.sendEvent()
18,800,168✔
515
                        c.argBuf, c.msgBuf, c.header = nil, nil, nil
18,800,168✔
516
                        c.drop, c.as, c.state = 0, i+1, OP_START
18,800,168✔
517
                        // Drop all pub args
18,800,168✔
518
                        c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject, c.pa.mapped = nil, nil, nil, nil, nil, nil
18,800,168✔
519
                        c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil
18,800,168✔
520
                        c.pa.trace = nil
18,800,168✔
521
                        c.pa.delivered = false
18,800,168✔
522
                        lmsg = false
18,800,168✔
523
                case OP_A:
32✔
524
                        switch b {
32✔
525
                        case '+':
4✔
526
                                c.state = OP_ASUB
4✔
527
                        case '-', 'u':
28✔
528
                                c.state = OP_AUSUB
28✔
529
                        default:
×
530
                                goto parseErr
×
531
                        }
532
                case OP_ASUB:
4✔
533
                        switch b {
4✔
534
                        case ' ', '\t':
4✔
535
                                c.state = OP_ASUB_SPC
4✔
536
                        default:
×
537
                                goto parseErr
×
538
                        }
539
                case OP_ASUB_SPC:
4✔
540
                        switch b {
4✔
541
                        case ' ', '\t':
×
542
                                continue
×
543
                        default:
4✔
544
                                c.state = ASUB_ARG
4✔
545
                                c.as = i
4✔
546
                        }
547
                case ASUB_ARG:
14✔
548
                        switch b {
14✔
549
                        case '\r':
4✔
550
                                c.drop = 1
4✔
551
                        case '\n':
4✔
552
                                var arg []byte
4✔
553
                                if c.argBuf != nil {
4✔
554
                                        arg = c.argBuf
×
555
                                        c.argBuf = nil
×
556
                                } else {
4✔
557
                                        arg = buf[c.as : i-c.drop]
4✔
558
                                }
4✔
559
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
4✔
560
                                        return err
×
561
                                }
×
562
                                if trace {
4✔
563
                                        c.traceInOp("A+", arg)
×
564
                                }
×
565
                                if err := c.processAccountSub(arg); err != nil {
4✔
566
                                        return err
×
567
                                }
×
568
                                c.drop, c.as, c.state = 0, i+1, OP_START
4✔
569
                        default:
6✔
570
                                if c.argBuf != nil {
6✔
571
                                        c.argBuf = append(c.argBuf, b)
×
572
                                }
×
573
                        }
574
                case OP_AUSUB:
28✔
575
                        switch b {
28✔
576
                        case ' ', '\t':
28✔
577
                                c.state = OP_AUSUB_SPC
28✔
578
                        default:
×
579
                                goto parseErr
×
580
                        }
581
                case OP_AUSUB_SPC:
28✔
582
                        switch b {
28✔
583
                        case ' ', '\t':
×
584
                                continue
×
585
                        default:
28✔
586
                                c.state = AUSUB_ARG
28✔
587
                                c.as = i
28✔
588
                        }
589
                case AUSUB_ARG:
142✔
590
                        switch b {
142✔
591
                        case '\r':
28✔
592
                                c.drop = 1
28✔
593
                        case '\n':
28✔
594
                                var arg []byte
28✔
595
                                if c.argBuf != nil {
28✔
596
                                        arg = c.argBuf
×
597
                                        c.argBuf = nil
×
598
                                } else {
28✔
599
                                        arg = buf[c.as : i-c.drop]
28✔
600
                                }
28✔
601
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
28✔
602
                                        return err
×
603
                                }
×
604
                                if trace {
45✔
605
                                        c.traceInOp("A-", arg)
17✔
606
                                }
17✔
607
                                c.processAccountUnsub(arg)
28✔
608
                                c.drop, c.as, c.state = 0, i+1, OP_START
28✔
609
                        default:
86✔
610
                                if c.argBuf != nil {
86✔
611
                                        c.argBuf = append(c.argBuf, b)
×
612
                                }
×
613
                        }
614
                case OP_S:
136,891✔
615
                        switch b {
136,891✔
616
                        case 'U', 'u':
136,891✔
617
                                c.state = OP_SU
136,891✔
618
                        default:
×
619
                                goto parseErr
×
620
                        }
621
                case OP_SU:
136,891✔
622
                        switch b {
136,891✔
623
                        case 'B', 'b':
136,891✔
624
                                c.state = OP_SUB
136,891✔
625
                        default:
×
626
                                goto parseErr
×
627
                        }
628
                case OP_SUB:
1,166,590✔
629
                        switch b {
1,166,590✔
630
                        case ' ', '\t':
1,166,589✔
631
                                c.state = OP_SUB_SPC
1,166,589✔
632
                        default:
1✔
633
                                goto parseErr
1✔
634
                        }
635
                case OP_SUB_SPC:
1,166,588✔
636
                        switch b {
1,166,588✔
637
                        case ' ', '\t':
×
638
                                continue
×
639
                        default:
1,166,588✔
640
                                c.state = SUB_ARG
1,166,588✔
641
                                c.as = i
1,166,588✔
642
                        }
643
                case SUB_ARG:
43,778,514✔
644
                        switch b {
43,778,514✔
645
                        case '\r':
1,166,548✔
646
                                c.drop = 1
1,166,548✔
647
                        case '\n':
1,166,548✔
648
                                var arg []byte
1,166,548✔
649
                                if c.argBuf != nil {
1,195,064✔
650
                                        arg = c.argBuf
28,516✔
651
                                        c.argBuf = nil
28,516✔
652
                                } else {
1,166,548✔
653
                                        arg = buf[c.as : i-c.drop]
1,138,032✔
654
                                }
1,138,032✔
655
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
1,166,548✔
656
                                        return err
×
657
                                }
×
658
                                var err error
1,166,548✔
659

1,166,548✔
660
                                switch c.kind {
1,166,548✔
661
                                case CLIENT:
136,890✔
662
                                        if trace {
150,674✔
663
                                                c.traceInOp("SUB", arg)
13,784✔
664
                                        }
13,784✔
665
                                        err = c.parseSub(arg, false)
136,890✔
666
                                case ROUTER:
835,489✔
667
                                        switch c.op {
835,489✔
668
                                        case 'R', 'r':
823,487✔
669
                                                if trace {
826,449✔
670
                                                        c.traceInOp("RS+", arg)
2,962✔
671
                                                }
2,962✔
672
                                                err = c.processRemoteSub(arg, false)
823,487✔
673
                                        case 'L', 'l':
12,002✔
674
                                                if trace {
12,196✔
675
                                                        c.traceInOp("LS+", arg)
194✔
676
                                                }
194✔
677
                                                err = c.processRemoteSub(arg, true)
12,002✔
678
                                        }
679
                                case GATEWAY:
161,192✔
680
                                        if trace {
161,554✔
681
                                                c.traceInOp("RS+", arg)
362✔
682
                                        }
362✔
683
                                        err = c.processGatewayRSub(arg)
161,192✔
684
                                case LEAF:
32,977✔
685
                                        if trace {
33,098✔
686
                                                c.traceInOp("LS+", arg)
121✔
687
                                        }
121✔
688
                                        err = c.processLeafSub(arg)
32,977✔
689
                                }
690
                                if err != nil {
1,166,548✔
691
                                        return err
×
692
                                }
×
693
                                c.drop, c.as, c.state = 0, i+1, OP_START
1,166,548✔
694
                        default:
41,445,418✔
695
                                if c.argBuf != nil {
42,155,107✔
696
                                        c.argBuf = append(c.argBuf, b)
709,689✔
697
                                }
709,689✔
698
                        }
699
                case OP_L:
151,366✔
700
                        switch b {
151,366✔
701
                        case 'S', 's':
59,781✔
702
                                c.state = OP_LS
59,781✔
703
                        case 'M', 'm':
91,585✔
704
                                c.state = OP_M
91,585✔
705
                        default:
×
706
                                goto parseErr
×
707
                        }
708
                case OP_LS:
59,781✔
709
                        switch b {
59,781✔
710
                        case '+':
44,979✔
711
                                c.state = OP_SUB
44,979✔
712
                        case '-':
14,802✔
713
                                c.state = OP_UNSUB
14,802✔
714
                        default:
×
715
                                goto parseErr
×
716
                        }
717
                case OP_R:
11,723,172✔
718
                        switch b {
11,723,172✔
719
                        case 'S', 's':
1,191,992✔
720
                                c.state = OP_RS
1,191,992✔
721
                        case 'M', 'm':
10,531,180✔
722
                                c.state = OP_M
10,531,180✔
723
                        default:
×
724
                                goto parseErr
×
725
                        }
726
                case OP_RS:
1,191,991✔
727
                        switch b {
1,191,991✔
728
                        case '+':
984,721✔
729
                                c.state = OP_SUB
984,721✔
730
                        case '-':
207,270✔
731
                                c.state = OP_UNSUB
207,270✔
732
                        default:
×
733
                                goto parseErr
×
734
                        }
735
                case OP_U:
16,267✔
736
                        switch b {
16,267✔
737
                        case 'N', 'n':
16,267✔
738
                                c.state = OP_UN
16,267✔
739
                        default:
×
740
                                goto parseErr
×
741
                        }
742
                case OP_UN:
16,267✔
743
                        switch b {
16,267✔
744
                        case 'S', 's':
16,267✔
745
                                c.state = OP_UNS
16,267✔
746
                        default:
×
747
                                goto parseErr
×
748
                        }
749
                case OP_UNS:
16,267✔
750
                        switch b {
16,267✔
751
                        case 'U', 'u':
16,267✔
752
                                c.state = OP_UNSU
16,267✔
753
                        default:
×
754
                                goto parseErr
×
755
                        }
756
                case OP_UNSU:
16,267✔
757
                        switch b {
16,267✔
758
                        case 'B', 'b':
16,267✔
759
                                c.state = OP_UNSUB
16,267✔
760
                        default:
×
761
                                goto parseErr
×
762
                        }
763
                case OP_UNSUB:
238,338✔
764
                        switch b {
238,338✔
765
                        case ' ', '\t':
238,338✔
766
                                c.state = OP_UNSUB_SPC
238,338✔
767
                        default:
×
768
                                goto parseErr
×
769
                        }
770
                case OP_UNSUB_SPC:
238,338✔
771
                        switch b {
238,338✔
772
                        case ' ', '\t':
×
773
                                continue
×
774
                        default:
238,338✔
775
                                c.state = UNSUB_ARG
238,338✔
776
                                c.as = i
238,338✔
777
                        }
778
                case UNSUB_ARG:
7,026,399✔
779
                        switch b {
7,026,399✔
780
                        case '\r':
238,332✔
781
                                c.drop = 1
238,332✔
782
                        case '\n':
238,331✔
783
                                var arg []byte
238,331✔
784
                                if c.argBuf != nil {
245,390✔
785
                                        arg = c.argBuf
7,059✔
786
                                        c.argBuf = nil
7,059✔
787
                                } else {
238,331✔
788
                                        arg = buf[c.as : i-c.drop]
231,272✔
789
                                }
231,272✔
790
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
238,331✔
791
                                        return err
×
792
                                }
×
793
                                var err error
238,331✔
794

238,331✔
795
                                switch c.kind {
238,331✔
796
                                case CLIENT:
16,267✔
797
                                        if trace {
29,803✔
798
                                                c.traceInOp("UNSUB", arg)
13,536✔
799
                                        }
13,536✔
800
                                        err = c.processUnsub(arg)
16,267✔
801
                                case ROUTER:
173,698✔
802
                                        if trace && c.srv != nil {
174,104✔
803
                                                switch c.op {
406✔
804
                                                case 'R', 'r':
212✔
805
                                                        c.traceInOp("RS-", arg)
212✔
806
                                                case 'L', 'l':
194✔
807
                                                        c.traceInOp("LS-", arg)
194✔
808
                                                }
809
                                        }
810
                                        leafUnsub := c.op == 'L' || c.op == 'l'
173,698✔
811
                                        err = c.processRemoteUnsub(arg, leafUnsub)
173,698✔
812
                                case GATEWAY:
44,977✔
813
                                        if trace {
46,156✔
814
                                                c.traceInOp("RS-", arg)
1,179✔
815
                                        }
1,179✔
816
                                        err = c.processGatewayRUnsub(arg)
44,977✔
817
                                case LEAF:
3,389✔
818
                                        if trace {
3,393✔
819
                                                c.traceInOp("LS-", arg)
4✔
820
                                        }
4✔
821
                                        err = c.processLeafUnsub(arg)
3,389✔
822
                                }
823
                                if err != nil {
238,331✔
824
                                        return err
×
825
                                }
×
826
                                c.drop, c.as, c.state = 0, i+1, OP_START
238,331✔
827
                        default:
6,549,736✔
828
                                if c.argBuf != nil {
6,709,518✔
829
                                        c.argBuf = append(c.argBuf, b)
159,782✔
830
                                }
159,782✔
831
                        }
832
                case OP_PI:
40,782✔
833
                        switch b {
40,782✔
834
                        case 'N', 'n':
40,782✔
835
                                c.state = OP_PIN
40,782✔
836
                        default:
×
837
                                goto parseErr
×
838
                        }
839
                case OP_PIN:
40,782✔
840
                        switch b {
40,782✔
841
                        case 'G', 'g':
40,782✔
842
                                c.state = OP_PING
40,782✔
843
                        default:
×
844
                                goto parseErr
×
845
                        }
846
                case OP_PING:
81,564✔
847
                        switch b {
81,564✔
848
                        case '\n':
40,782✔
849
                                if trace {
41,426✔
850
                                        c.traceInOp("PING", nil)
644✔
851
                                }
644✔
852
                                c.processPing()
40,782✔
853
                                c.drop, c.state = 0, OP_START
40,782✔
854
                        }
855
                case OP_PO:
28,807✔
856
                        switch b {
28,807✔
857
                        case 'N', 'n':
28,807✔
858
                                c.state = OP_PON
28,807✔
859
                        default:
×
860
                                goto parseErr
×
861
                        }
862
                case OP_PON:
28,807✔
863
                        switch b {
28,807✔
864
                        case 'G', 'g':
28,807✔
865
                                c.state = OP_PONG
28,807✔
866
                        default:
×
867
                                goto parseErr
×
868
                        }
869
                case OP_PONG:
57,614✔
870
                        switch b {
57,614✔
871
                        case '\n':
28,807✔
872
                                if trace {
28,870✔
873
                                        c.traceInOp("PONG", nil)
63✔
874
                                }
63✔
875
                                c.processPong()
28,807✔
876
                                c.drop, c.state = 0, OP_START
28,807✔
877
                        }
878
                case OP_C:
42,919✔
879
                        switch b {
42,919✔
880
                        case 'O', 'o':
42,919✔
881
                                c.state = OP_CO
42,919✔
882
                        default:
×
883
                                goto parseErr
×
884
                        }
885
                case OP_CO:
42,919✔
886
                        switch b {
42,919✔
887
                        case 'N', 'n':
42,919✔
888
                                c.state = OP_CON
42,919✔
889
                        default:
×
890
                                goto parseErr
×
891
                        }
892
                case OP_CON:
42,919✔
893
                        switch b {
42,919✔
894
                        case 'N', 'n':
42,919✔
895
                                c.state = OP_CONN
42,919✔
896
                        default:
×
897
                                goto parseErr
×
898
                        }
899
                case OP_CONN:
42,919✔
900
                        switch b {
42,919✔
901
                        case 'E', 'e':
42,919✔
902
                                c.state = OP_CONNE
42,919✔
903
                        default:
×
904
                                goto parseErr
×
905
                        }
906
                case OP_CONNE:
42,919✔
907
                        switch b {
42,919✔
908
                        case 'C', 'c':
42,919✔
909
                                c.state = OP_CONNEC
42,919✔
910
                        default:
×
911
                                goto parseErr
×
912
                        }
913
                case OP_CONNEC:
42,919✔
914
                        switch b {
42,919✔
915
                        case 'T', 't':
42,919✔
916
                                c.state = OP_CONNECT
42,919✔
917
                        default:
×
918
                                goto parseErr
×
919
                        }
920
                case OP_CONNECT:
85,838✔
921
                        switch b {
85,838✔
922
                        case ' ', '\t':
42,919✔
923
                                continue
42,919✔
924
                        default:
42,919✔
925
                                c.state = CONNECT_ARG
42,919✔
926
                                c.as = i
42,919✔
927
                        }
928
                case CONNECT_ARG:
9,104,653✔
929
                        switch b {
9,104,653✔
930
                        case '\r':
42,919✔
931
                                c.drop = 1
42,919✔
932
                        case '\n':
42,919✔
933
                                var arg []byte
42,919✔
934
                                if c.argBuf != nil {
45,322✔
935
                                        arg = c.argBuf
2,403✔
936
                                        c.argBuf = nil
2,403✔
937
                                } else {
42,919✔
938
                                        arg = buf[c.as : i-c.drop]
40,516✔
939
                                }
40,516✔
940
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
42,919✔
941
                                        return err
×
942
                                }
×
943
                                if trace {
43,604✔
944
                                        c.traceInOp("CONNECT", removeSecretsFromTrace(arg))
685✔
945
                                }
685✔
946
                                if err := c.processConnect(arg); err != nil {
43,162✔
947
                                        return err
243✔
948
                                }
243✔
949
                                c.drop, c.state = 0, OP_START
42,676✔
950
                                // Reset notion on authSet
42,676✔
951
                                c.mu.Lock()
42,676✔
952
                                authSet = c.awaitingAuth()
42,676✔
953
                                c.mu.Unlock()
42,676✔
954
                        default:
9,018,815✔
955
                                if c.argBuf != nil {
9,769,285✔
956
                                        c.argBuf = append(c.argBuf, b)
750,470✔
957
                                }
750,470✔
958
                        }
959
                case OP_M:
10,622,765✔
960
                        switch b {
10,622,765✔
961
                        case 'S', 's':
10,622,765✔
962
                                c.state = OP_MS
10,622,765✔
963
                        default:
×
964
                                goto parseErr
×
965
                        }
966
                case OP_MS:
10,622,765✔
967
                        switch b {
10,622,765✔
968
                        case 'G', 'g':
10,622,765✔
969
                                c.state = OP_MSG
10,622,765✔
970
                        default:
×
971
                                goto parseErr
×
972
                        }
973
                case OP_MSG:
10,622,765✔
974
                        switch b {
10,622,765✔
975
                        case ' ', '\t':
10,622,765✔
976
                                c.state = OP_MSG_SPC
10,622,765✔
977
                        default:
×
978
                                goto parseErr
×
979
                        }
980
                case OP_MSG_SPC:
10,622,765✔
981
                        switch b {
10,622,765✔
982
                        case ' ', '\t':
×
983
                                continue
×
984
                        default:
10,622,765✔
985
                                c.pa.hdr = -1
10,622,765✔
986
                                c.state = MSG_ARG
10,622,765✔
987
                                c.as = i
10,622,765✔
988
                        }
989
                case MSG_ARG:
380,805,791✔
990
                        switch b {
380,805,791✔
991
                        case '\r':
10,622,762✔
992
                                c.drop = 1
10,622,762✔
993
                        case '\n':
10,622,762✔
994
                                var arg []byte
10,622,762✔
995
                                if c.argBuf != nil {
10,898,806✔
996
                                        arg = c.argBuf
276,044✔
997
                                        c.argBuf = nil
276,044✔
998
                                } else {
10,622,762✔
999
                                        arg = buf[c.as : i-c.drop]
10,346,718✔
1000
                                }
10,346,718✔
1001
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
10,622,762✔
1002
                                        return err
×
1003
                                }
×
1004
                                var err error
10,622,762✔
1005
                                if c.kind == ROUTER || c.kind == GATEWAY {
21,162,356✔
1006
                                        switch c.op {
10,539,594✔
1007
                                        case 'R', 'r':
10,531,178✔
1008
                                                if trace {
10,537,844✔
1009
                                                        c.traceInOp("RMSG", arg)
6,666✔
1010
                                                }
6,666✔
1011
                                                err = c.processRoutedMsgArgs(arg)
10,531,178✔
1012
                                        case 'L', 'l':
8,416✔
1013
                                                if trace {
8,662✔
1014
                                                        c.traceInOp("LMSG", arg)
246✔
1015
                                                }
246✔
1016
                                                lmsg = true
8,416✔
1017
                                                err = c.processRoutedOriginClusterMsgArgs(arg)
8,416✔
1018
                                        }
1019
                                } else if c.kind == LEAF {
166,336✔
1020
                                        if trace {
83,184✔
1021
                                                c.traceInOp("LMSG", arg)
16✔
1022
                                        }
16✔
1023
                                        err = c.processLeafMsgArgs(arg)
83,168✔
1024
                                }
1025
                                if err != nil {
10,622,763✔
1026
                                        return err
1✔
1027
                                }
1✔
1028
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
10,622,761✔
1029

10,622,761✔
1030
                                // jump ahead with the index. If this overruns
10,622,761✔
1031
                                // what is left we fall out and process split
10,622,761✔
1032
                                // buffer.
10,622,761✔
1033
                                i = c.as + c.pa.size - LEN_CR_LF
10,622,761✔
1034
                        default:
359,560,267✔
1035
                                if c.argBuf != nil {
362,605,010✔
1036
                                        c.argBuf = append(c.argBuf, b)
3,044,743✔
1037
                                }
3,044,743✔
1038
                        }
1039
                case OP_I:
84,939✔
1040
                        switch b {
84,939✔
1041
                        case 'N', 'n':
84,939✔
1042
                                c.state = OP_IN
84,939✔
1043
                        default:
×
1044
                                goto parseErr
×
1045
                        }
1046
                case OP_IN:
84,939✔
1047
                        switch b {
84,939✔
1048
                        case 'F', 'f':
84,939✔
1049
                                c.state = OP_INF
84,939✔
1050
                        default:
×
1051
                                goto parseErr
×
1052
                        }
1053
                case OP_INF:
84,939✔
1054
                        switch b {
84,939✔
1055
                        case 'O', 'o':
84,939✔
1056
                                c.state = OP_INFO
84,939✔
1057
                        default:
×
1058
                                goto parseErr
×
1059
                        }
1060
                case OP_INFO:
169,878✔
1061
                        switch b {
169,878✔
1062
                        case ' ', '\t':
84,939✔
1063
                                continue
84,939✔
1064
                        default:
84,939✔
1065
                                c.state = INFO_ARG
84,939✔
1066
                                c.as = i
84,939✔
1067
                        }
1068
                case INFO_ARG:
32,180,421✔
1069
                        switch b {
32,180,421✔
1070
                        case '\r':
84,933✔
1071
                                c.drop = 1
84,933✔
1072
                        case '\n':
84,933✔
1073
                                var arg []byte
84,933✔
1074
                                if c.argBuf != nil {
93,695✔
1075
                                        arg = c.argBuf
8,762✔
1076
                                        c.argBuf = nil
8,762✔
1077
                                } else {
84,933✔
1078
                                        arg = buf[c.as : i-c.drop]
76,171✔
1079
                                }
76,171✔
1080
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
84,933✔
1081
                                        return err
×
1082
                                }
×
1083
                                if err := c.processInfo(arg); err != nil {
84,933✔
1084
                                        return err
×
1085
                                }
×
1086
                                c.drop, c.as, c.state = 0, i+1, OP_START
84,933✔
1087
                        default:
32,010,555✔
1088
                                if c.argBuf != nil {
33,002,956✔
1089
                                        c.argBuf = append(c.argBuf, b)
992,401✔
1090
                                }
992,401✔
1091
                        }
1092
                case OP_PLUS:
×
1093
                        switch b {
×
1094
                        case 'O', 'o':
×
1095
                                c.state = OP_PLUS_O
×
1096
                        default:
×
1097
                                goto parseErr
×
1098
                        }
1099
                case OP_PLUS_O:
×
1100
                        switch b {
×
1101
                        case 'K', 'k':
×
1102
                                c.state = OP_PLUS_OK
×
1103
                        default:
×
1104
                                goto parseErr
×
1105
                        }
1106
                case OP_PLUS_OK:
×
1107
                        switch b {
×
1108
                        case '\n':
×
1109
                                c.drop, c.state = 0, OP_START
×
1110
                        }
1111
                case OP_MINUS:
108✔
1112
                        switch b {
108✔
1113
                        case 'E', 'e':
108✔
1114
                                c.state = OP_MINUS_E
108✔
1115
                        default:
×
1116
                                goto parseErr
×
1117
                        }
1118
                case OP_MINUS_E:
108✔
1119
                        switch b {
108✔
1120
                        case 'R', 'r':
108✔
1121
                                c.state = OP_MINUS_ER
108✔
1122
                        default:
×
1123
                                goto parseErr
×
1124
                        }
1125
                case OP_MINUS_ER:
108✔
1126
                        switch b {
108✔
1127
                        case 'R', 'r':
108✔
1128
                                c.state = OP_MINUS_ERR
108✔
1129
                        default:
×
1130
                                goto parseErr
×
1131
                        }
1132
                case OP_MINUS_ERR:
108✔
1133
                        switch b {
108✔
1134
                        case ' ', '\t':
108✔
1135
                                c.state = OP_MINUS_ERR_SPC
108✔
1136
                        default:
×
1137
                                goto parseErr
×
1138
                        }
1139
                case OP_MINUS_ERR_SPC:
108✔
1140
                        switch b {
108✔
1141
                        case ' ', '\t':
×
1142
                                continue
×
1143
                        default:
108✔
1144
                                c.state = MINUS_ERR_ARG
108✔
1145
                                c.as = i
108✔
1146
                        }
1147
                case MINUS_ERR_ARG:
4,754✔
1148
                        switch b {
4,754✔
1149
                        case '\r':
108✔
1150
                                c.drop = 1
108✔
1151
                        case '\n':
108✔
1152
                                var arg []byte
108✔
1153
                                if c.argBuf != nil {
109✔
1154
                                        arg = c.argBuf
1✔
1155
                                        c.argBuf = nil
1✔
1156
                                } else {
108✔
1157
                                        arg = buf[c.as : i-c.drop]
107✔
1158
                                }
107✔
1159
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
108✔
1160
                                        return err
×
1161
                                }
×
1162
                                c.processErr(string(arg))
108✔
1163
                                c.drop, c.as, c.state = 0, i+1, OP_START
108✔
1164
                        default:
4,538✔
1165
                                if c.argBuf != nil {
4,540✔
1166
                                        c.argBuf = append(c.argBuf, b)
2✔
1167
                                }
2✔
1168
                        }
1169
                default:
×
1170
                        goto parseErr
×
1171
                }
1172
        }
1173

1174
        // Check for split buffer scenarios for any ARG state.
1175
        if c.state == SUB_ARG || c.state == UNSUB_ARG ||
11,284,635✔
1176
                c.state == PUB_ARG || c.state == HPUB_ARG ||
11,284,635✔
1177
                c.state == ASUB_ARG || c.state == AUSUB_ARG ||
11,284,635✔
1178
                c.state == MSG_ARG || c.state == HMSG_ARG ||
11,284,635✔
1179
                c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG || c.state == INFO_ARG {
11,636,746✔
1180

352,111✔
1181
                // Setup a holder buffer to deal with split buffer scenario.
352,111✔
1182
                if c.argBuf == nil {
703,918✔
1183
                        c.argBuf = c.scratch[:0]
351,807✔
1184
                        c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
351,807✔
1185
                }
351,807✔
1186
                // Check for violations of control line length here. Note that this is not
1187
                // exact at all but the performance hit is too great to be precise, and
1188
                // catching here should prevent memory exhaustion attacks.
1189
                if err := c.overMaxControlLineLimit(c.argBuf, mcl); err != nil {
352,112✔
1190
                        return err
1✔
1191
                }
1✔
1192
        }
1193

1194
        // Check for split msg
1195
        if (c.state == MSG_PAYLOAD || c.state == MSG_END_R || c.state == MSG_END_N) && c.msgBuf == nil {
12,423,083✔
1196
                // We need to clone the pubArg if it is still referencing the
1,138,449✔
1197
                // read buffer and we are not able to process the msg.
1,138,449✔
1198

1,138,449✔
1199
                if c.argBuf == nil {
2,276,898✔
1200
                        // Works also for MSG_ARG, when message comes from ROUTE or GATEWAY.
1,138,449✔
1201
                        if err := c.clonePubArg(lmsg); err != nil {
1,138,449✔
1202
                                goto parseErr
×
1203
                        }
1204
                }
1205

1206
                // If we will overflow the scratch buffer, just create a
1207
                // new buffer to hold the split message.
1208
                if c.pa.size > cap(c.scratch)-len(c.argBuf) {
1,173,278✔
1209
                        lrem := len(buf[c.as:])
34,829✔
1210
                        // Consider it a protocol error when the remaining payload
34,829✔
1211
                        // is larger than the reported size for PUB. It can happen
34,829✔
1212
                        // when processing incomplete messages from rogue clients.
34,829✔
1213
                        if lrem > c.pa.size+LEN_CR_LF {
34,829✔
1214
                                goto parseErr
×
1215
                        }
1216
                        c.msgBuf = make([]byte, lrem, c.pa.size+LEN_CR_LF)
34,829✔
1217
                        copy(c.msgBuf, buf[c.as:])
34,829✔
1218
                } else {
1,103,620✔
1219
                        c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
1,103,620✔
1220
                        c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
1,103,620✔
1221
                }
1,103,620✔
1222
        }
1223

1224
        return nil
11,284,634✔
1225

11,284,634✔
1226
authErr:
11,284,634✔
1227
        c.authViolation()
14✔
1228
        return ErrAuthentication
14✔
1229

14✔
1230
parseErr:
14✔
1231
        c.sendErr("Unknown Protocol Operation")
6✔
1232
        snip := protoSnippet(i, PROTO_SNIPPET_SIZE, buf)
6✔
1233
        err := fmt.Errorf("%s parser ERROR, state=%d, i=%d: proto='%s...'", c.kindString(), c.state, i, snip)
6✔
1234
        return err
6✔
1235
}
1236

1237
// protoSnippet returns a quoted (%q) rendering of at most `max` bytes of
// `buf`, beginning at offset `start`. It is used to embed a short excerpt of
// the offending protocol bytes in error messages.
//
// If `start` is at or beyond the end of `buf`, the literal `""` is returned.
// If the requested window extends past the end of `buf`, it is truncated to
// the end of the buffer.
func protoSnippet(start, max int, buf []byte) string {
	bufSize := len(buf)
	if start >= bufSize {
		return `""`
	}
	stop := start + max
	if stop > bufSize {
		// The upper bound of a slice expression is exclusive, so clamp to
		// bufSize (not bufSize-1) to keep the last byte in the snippet.
		stop = bufSize
	}
	return fmt.Sprintf("%q", buf[start:stop])
}
1248

1249
// Check if the length of buffer `arg` is over the max control line limit `mcl`.
1250
// If so, an error is sent to the client and the connection is closed.
1251
// The error ErrMaxControlLine is returned.
1252
func (c *client) overMaxControlLineLimit(arg []byte, mcl int32) error {
20,685,187✔
1253
        if c.kind != CLIENT {
33,481,408✔
1254
                return nil
12,796,221✔
1255
        }
12,796,221✔
1256
        if len(arg) > int(mcl) {
7,888,967✔
1257
                err := NewErrorCtx(ErrMaxControlLine, "State %d, max_control_line %d, Buffer len %d (snip: %s...)",
1✔
1258
                        c.state, int(mcl), len(c.argBuf), protoSnippet(0, MAX_CONTROL_LINE_SNIPPET_SIZE, arg))
1✔
1259
                c.sendErr(err.Error())
1✔
1260
                c.closeConnection(MaxControlLineExceeded)
1✔
1261
                return err
1✔
1262
        }
1✔
1263
        return nil
7,888,965✔
1264
}
1265

1266
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
1267
// we need to hold onto it into the next read.
1268
func (c *client) clonePubArg(lmsg bool) error {
1,138,449✔
1269
        // Just copy and re-process original arg buffer.
1,138,449✔
1270
        c.argBuf = c.scratch[:0]
1,138,449✔
1271
        c.argBuf = append(c.argBuf, c.pa.arg...)
1,138,449✔
1272

1,138,449✔
1273
        switch c.kind {
1,138,449✔
1274
        case ROUTER, GATEWAY:
1,097,922✔
1275
                if lmsg {
1,098,515✔
1276
                        return c.processRoutedOriginClusterMsgArgs(c.argBuf)
593✔
1277
                }
593✔
1278
                if c.pa.hdr < 0 {
2,144,144✔
1279
                        return c.processRoutedMsgArgs(c.argBuf)
1,046,815✔
1280
                } else {
1,097,329✔
1281
                        return c.processRoutedHeaderMsgArgs(c.argBuf)
50,514✔
1282
                }
50,514✔
1283
        case LEAF:
1,847✔
1284
                if c.pa.hdr < 0 {
3,596✔
1285
                        return c.processLeafMsgArgs(c.argBuf)
1,749✔
1286
                } else {
1,847✔
1287
                        return c.processLeafHeaderMsgArgs(c.argBuf)
98✔
1288
                }
98✔
1289
        default:
38,680✔
1290
                if c.pa.hdr < 0 {
75,535✔
1291
                        return c.processPub(c.argBuf)
36,855✔
1292
                } else {
38,680✔
1293
                        return c.processHeaderPub(c.argBuf, nil)
1,825✔
1294
                }
1,825✔
1295
        }
1296
}
1297

1298
func (ps *parseState) getHeader() http.Header {
857✔
1299
        if ps.header == nil {
1,714✔
1300
                if hdr := ps.pa.hdr; hdr > 0 {
930✔
1301
                        reader := bufio.NewReader(bytes.NewReader(ps.msgBuf[0:hdr]))
73✔
1302
                        tp := textproto.NewReader(reader)
73✔
1303
                        tp.ReadLine() // skip over first line, contains version
73✔
1304
                        if mimeHeader, err := tp.ReadMIMEHeader(); err == nil {
146✔
1305
                                ps.header = http.Header(mimeHeader)
73✔
1306
                        }
73✔
1307
                }
1308
        }
1309
        return ps.header
857✔
1310
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc