• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 13085279382

31 Jan 2025 03:27PM UTC coverage: 85.493% (-0.1%) from 85.623%
13085279382

push

github

web-flow
Expose `raftz` and `ipqueuesz` via system account (#6439)

This PR adds `$SYS.REQ.SERVER.*.RAFTZ` and `$SYS.REQ.SERVER.*.IPQUEUESZ`
so that they can be queried via the system account as well as the
monitoring port.

Signed-off-by: Neil Twigg <neil@nats.io>

68921 of 80616 relevant lines covered (85.49%)

801962.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.95
/server/parser.go
1
// Copyright 2012-2024 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "fmt"
20
        "net/http"
21
        "net/textproto"
22
)
23

24
// parserState identifies one state of the protocol parser's state machine.
// Its values are the OP_*, *_ARG and MSG_* constants declared in this file.
type parserState int
25
// parseState holds the protocol parser's progress and working buffers.
// parse reads and writes these fields through the client (c.state, c.as,
// c.argBuf, ...), so they persist from one byte to the next and, when a
// protocol line or payload is not complete in the current buffer, from one
// call to the next.
//
//   - state: current state of the state machine (switch target in parse).
//   - op: first byte of the operation being parsed; set in OP_START.
//   - as: index into the buffer being parsed where the current argument
//     line or payload starts.
//   - drop: number of trailing bytes to trim from the argument line; set to 1
//     when a '\r' is seen and applied as buf[as : i-drop].
//   - pa: parsed arguments of the message currently in flight.
//   - argBuf: when non-nil, argument bytes are appended to it and it is used
//     as the argument line instead of a slice of the buffer being parsed.
//     NOTE(review): presumably set when an argument line is split across
//     reads; the code that populates it is not in this view.
//   - msgBuf: message payload. When non-nil, parse copies incoming payload
//     bytes into it; otherwise the payload is sliced directly out of the
//     buffer being parsed once the trailing CR LF has been seen.
//   - header: cleared together with argBuf and msgBuf after each message.
//   - scratch: fixed array of MAX_CONTROL_LINE_SIZE bytes.
//     NOTE(review): presumably backing storage for argBuf; confirm against
//     the split-buffer handling.
type parseState struct {
26
        state   parserState
27
        op      byte
28
        as      int
29
        drop    int
30
        pa      pubArg
31
        argBuf  []byte
32
        msgBuf  []byte
33
        header  http.Header // access via getHeader
34
        scratch [MAX_CONTROL_LINE_SIZE]byte
35
}
36

37
// pubArg holds the arguments of the publish/message operation currently being
// parsed. The fields are filled in by the process* helpers that parse calls
// once a full argument line is available (processPub, processHeaderPub,
// processRoutedHeaderMsgArgs, processLeafHeaderMsgArgs); those helpers are
// not shown here, so only the behavior visible in parse is described:
//
//   - size: payload size in bytes, excluding the trailing CR LF. parse uses it
//     to jump to, and to detect, the end of the payload.
//   - hdr: set to 0 when an HPUB/HMSG argument line starts and to -1 for PUB
//     and after every completed message; a value > 0 makes parse call
//     initMsgTrace. NOTE(review): presumably the header length in bytes;
//     confirm in the process* helpers.
//   - subject, mapped: when selectMappedSubject reports a change, subject is
//     the subject the original is now mapped to, and the pair is traced as
//     "mapped -> subject".
//
// After each completed message parse resets arg, pacache, origin, account,
// subject, mapped, reply, szb, hdb, queues and trace to nil, size to 0 and
// hdr to -1.
type pubArg struct {
38
        arg     []byte
39
        pacache []byte
40
        origin  []byte
41
        account []byte
42
        subject []byte
43
        deliver []byte
44
        mapped  []byte
45
        reply   []byte
46
        szb     []byte
47
        hdb     []byte
48
        queues  [][]byte
49
        size    int
50
        hdr     int
51
        psi     []*serviceImport
52
        trace   *msgTrace
53
}
54

55
// Parser constants: the states of the byte-at-a-time protocol state machine
// driven by (*client).parse.
//
// OP_<prefix> states record how much of an operation keyword has been matched
// so far (e.g. OP_H -> OP_HP -> OP_HPU -> OP_HPUB). <OP>_SPC states skip the
// spaces/tabs that separate the keyword from its arguments. <OP>_ARG states
// consume the argument line up to its terminating '\n'. MSG_PAYLOAD,
// MSG_END_R and MSG_END_N cover the message body and its trailing '\r' and
// '\n'.
//
// Values are assigned with iota, so the declaration order fixes each state's
// numeric value.
56
const (
57
        OP_START parserState = iota
58
        OP_PLUS
59
        OP_PLUS_O
60
        OP_PLUS_OK
61
        OP_MINUS
62
        OP_MINUS_E
63
        OP_MINUS_ER
64
        OP_MINUS_ERR
65
        OP_MINUS_ERR_SPC
66
        MINUS_ERR_ARG
67
        OP_C
68
        OP_CO
69
        OP_CON
70
        OP_CONN
71
        OP_CONNE
72
        OP_CONNEC
73
        OP_CONNECT
74
        CONNECT_ARG
75
        OP_H
76
        OP_HP
77
        OP_HPU
78
        OP_HPUB
79
        OP_HPUB_SPC
80
        HPUB_ARG
81
        OP_HM
82
        OP_HMS
83
        OP_HMSG
84
        OP_HMSG_SPC
85
        HMSG_ARG
86
        OP_P
87
        OP_PU
88
        OP_PUB
89
        OP_PUB_SPC
90
        PUB_ARG
91
        OP_PI
92
        OP_PIN
93
        OP_PING
94
        OP_PO
95
        OP_PON
96
        OP_PONG
97
        MSG_PAYLOAD
98
        MSG_END_R
99
        MSG_END_N
100
        OP_S
101
        OP_SU
102
        OP_SUB
103
        OP_SUB_SPC
104
        SUB_ARG
105
        OP_A
106
        OP_ASUB
107
        OP_ASUB_SPC
108
        ASUB_ARG
109
        OP_AUSUB
110
        OP_AUSUB_SPC
111
        AUSUB_ARG
112
        OP_L
113
        OP_LS
114
        OP_R
115
        OP_RS
116
        OP_U
117
        OP_UN
118
        OP_UNS
119
        OP_UNSU
120
        OP_UNSUB
121
        OP_UNSUB_SPC
122
        UNSUB_ARG
123
        OP_M
124
        OP_MS
125
        OP_MSG
126
        OP_MSG_SPC
127
        MSG_ARG
128
        OP_I
129
        OP_IN
130
        OP_INF
131
        OP_INFO
132
        INFO_ARG
133
)
134

135
func (c *client) parse(buf []byte) error {
11,715,115✔
136
        // Branch out to mqtt clients. c.mqtt is immutable, but should it become
11,715,115✔
137
        // an issue (say data race detection), we could branch outside in readLoop
11,715,115✔
138
        if c.isMqtt() {
11,716,809✔
139
                return c.mqttParse(buf)
1,694✔
140
        }
1,694✔
141
        var i int
11,713,421✔
142
        var b byte
11,713,421✔
143
        var lmsg bool
11,713,421✔
144

11,713,421✔
145
        // Snapshots
11,713,421✔
146
        c.mu.Lock()
11,713,421✔
147
        // Snapshot and then reset when we receive a
11,713,421✔
148
        // proper CONNECT if needed.
11,713,421✔
149
        authSet := c.awaitingAuth()
11,713,421✔
150
        // Snapshot max control line as well.
11,713,421✔
151
        s, mcl, trace := c.srv, c.mcl, c.trace
11,713,421✔
152
        c.mu.Unlock()
11,713,421✔
153

11,713,421✔
154
        // Move to loop instead of range syntax to allow jumping of i
11,713,421✔
155
        for i = 0; i < len(buf); i++ {
1,546,524,531✔
156
                b = buf[i]
1,534,811,110✔
157

1,534,811,110✔
158
                switch c.state {
1,534,811,110✔
159
                case OP_START:
23,480,632✔
160
                        c.op = b
23,480,632✔
161
                        if b != 'C' && b != 'c' {
46,925,257✔
162
                                if authSet {
23,444,639✔
163
                                        if s == nil {
14✔
164
                                                goto authErr
×
165
                                        }
166
                                        var ok bool
14✔
167
                                        // Check here for NoAuthUser. If this is set allow non CONNECT protos as our first.
14✔
168
                                        // E.g. telnet proto demos.
14✔
169
                                        if noAuthUser := s.getOpts().NoAuthUser; noAuthUser != _EMPTY_ {
16✔
170
                                                s.mu.Lock()
2✔
171
                                                user, exists := s.users[noAuthUser]
2✔
172
                                                s.mu.Unlock()
2✔
173
                                                if exists {
4✔
174
                                                        c.RegisterUser(user)
2✔
175
                                                        c.mu.Lock()
2✔
176
                                                        c.clearAuthTimer()
2✔
177
                                                        c.flags.set(connectReceived)
2✔
178
                                                        c.mu.Unlock()
2✔
179
                                                        authSet, ok = false, true
2✔
180
                                                }
2✔
181
                                        }
182
                                        if !ok {
26✔
183
                                                goto authErr
12✔
184
                                        }
185
                                }
186
                                // If the connection is a gateway connection, make sure that
187
                                // if this is an inbound, it starts with a CONNECT.
188
                                if c.kind == GATEWAY && !c.gw.outbound && !c.gw.connected {
23,444,613✔
189
                                        // Use auth violation since no CONNECT was sent.
×
190
                                        // It could be a parseErr too.
×
191
                                        goto authErr
×
192
                                }
193
                        }
194
                        switch b {
23,480,620✔
195
                        case 'P', 'p':
3,364,512✔
196
                                c.state = OP_P
3,364,512✔
197
                        case 'H', 'h':
6,812,148✔
198
                                c.state = OP_H
6,812,148✔
199
                        case 'S', 's':
137,881✔
200
                                c.state = OP_S
137,881✔
201
                        case 'U', 'u':
14,676✔
202
                                c.state = OP_U
14,676✔
203
                        case 'R', 'r':
12,875,165✔
204
                                if c.kind == CLIENT {
12,875,165✔
205
                                        goto parseErr
×
206
                                } else {
12,875,165✔
207
                                        c.state = OP_R
12,875,165✔
208
                                }
12,875,165✔
209
                        case 'L', 'l':
167,278✔
210
                                if c.kind != LEAF && c.kind != ROUTER {
167,278✔
211
                                        goto parseErr
×
212
                                } else {
167,278✔
213
                                        c.state = OP_L
167,278✔
214
                                }
167,278✔
215
                        case 'A', 'a':
32✔
216
                                if c.kind == CLIENT {
32✔
217
                                        goto parseErr
×
218
                                } else {
32✔
219
                                        c.state = OP_A
32✔
220
                                }
32✔
221
                        case 'C', 'c':
36,007✔
222
                                c.state = OP_C
36,007✔
223
                        case 'I', 'i':
72,751✔
224
                                c.state = OP_I
72,751✔
225
                        case '+':
3✔
226
                                c.state = OP_PLUS
3✔
227
                        case '-':
155✔
228
                                c.state = OP_MINUS
155✔
229
                        default:
12✔
230
                                goto parseErr
12✔
231
                        }
232
                case OP_H:
6,812,148✔
233
                        switch b {
6,812,148✔
234
                        case 'P', 'p':
3,727,836✔
235
                                c.state = OP_HP
3,727,836✔
236
                        case 'M', 'm':
3,084,312✔
237
                                c.state = OP_HM
3,084,312✔
238
                        default:
×
239
                                goto parseErr
×
240
                        }
241
                case OP_HP:
3,727,836✔
242
                        switch b {
3,727,836✔
243
                        case 'U', 'u':
3,727,836✔
244
                                c.state = OP_HPU
3,727,836✔
245
                        default:
×
246
                                goto parseErr
×
247
                        }
248
                case OP_HPU:
3,727,836✔
249
                        switch b {
3,727,836✔
250
                        case 'B', 'b':
3,727,836✔
251
                                c.state = OP_HPUB
3,727,836✔
252
                        default:
×
253
                                goto parseErr
×
254
                        }
255
                case OP_HPUB:
3,727,836✔
256
                        switch b {
3,727,836✔
257
                        case ' ', '\t':
3,727,836✔
258
                                c.state = OP_HPUB_SPC
3,727,836✔
259
                        default:
×
260
                                goto parseErr
×
261
                        }
262
                case OP_HPUB_SPC:
3,727,836✔
263
                        switch b {
3,727,836✔
264
                        case ' ', '\t':
×
265
                                continue
×
266
                        default:
3,727,836✔
267
                                c.pa.hdr = 0
3,727,836✔
268
                                c.state = HPUB_ARG
3,727,836✔
269
                                c.as = i
3,727,836✔
270
                        }
271
                case HPUB_ARG:
305,406,920✔
272
                        switch b {
305,406,920✔
273
                        case '\r':
3,727,836✔
274
                                c.drop = 1
3,727,836✔
275
                        case '\n':
3,727,836✔
276
                                var arg []byte
3,727,836✔
277
                                if c.argBuf != nil {
3,727,872✔
278
                                        arg = c.argBuf
36✔
279
                                        c.argBuf = nil
36✔
280
                                } else {
3,727,836✔
281
                                        arg = buf[c.as : i-c.drop]
3,727,800✔
282
                                }
3,727,800✔
283
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
3,727,836✔
284
                                        return err
×
285
                                }
×
286
                                if trace {
3,727,842✔
287
                                        c.traceInOp("HPUB", arg)
6✔
288
                                }
6✔
289
                                var remaining []byte
3,727,836✔
290
                                if i < len(buf) {
7,455,672✔
291
                                        remaining = buf[i+1:]
3,727,836✔
292
                                }
3,727,836✔
293
                                if err := c.processHeaderPub(arg, remaining); err != nil {
3,727,840✔
294
                                        return err
4✔
295
                                }
4✔
296

297
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
3,727,832✔
298
                                // If we don't have a saved buffer then jump ahead with
3,727,832✔
299
                                // the index. If this overruns what is left we fall out
3,727,832✔
300
                                // and process split buffer.
3,727,832✔
301
                                if c.msgBuf == nil {
7,455,664✔
302
                                        i = c.as + c.pa.size - LEN_CR_LF
3,727,832✔
303
                                }
3,727,832✔
304
                        default:
297,951,248✔
305
                                if c.argBuf != nil {
297,951,859✔
306
                                        c.argBuf = append(c.argBuf, b)
611✔
307
                                }
611✔
308
                        }
309
                case OP_HM:
3,084,312✔
310
                        switch b {
3,084,312✔
311
                        case 'S', 's':
3,084,312✔
312
                                c.state = OP_HMS
3,084,312✔
313
                        default:
×
314
                                goto parseErr
×
315
                        }
316
                case OP_HMS:
3,084,312✔
317
                        switch b {
3,084,312✔
318
                        case 'G', 'g':
3,084,312✔
319
                                c.state = OP_HMSG
3,084,312✔
320
                        default:
×
321
                                goto parseErr
×
322
                        }
323
                case OP_HMSG:
3,084,312✔
324
                        switch b {
3,084,312✔
325
                        case ' ', '\t':
3,084,312✔
326
                                c.state = OP_HMSG_SPC
3,084,312✔
327
                        default:
×
328
                                goto parseErr
×
329
                        }
330
                case OP_HMSG_SPC:
3,084,312✔
331
                        switch b {
3,084,312✔
332
                        case ' ', '\t':
×
333
                                continue
×
334
                        default:
3,084,312✔
335
                                c.pa.hdr = 0
3,084,312✔
336
                                c.state = HMSG_ARG
3,084,312✔
337
                                c.as = i
3,084,312✔
338
                        }
339
                case HMSG_ARG:
310,438,097✔
340
                        switch b {
310,438,097✔
341
                        case '\r':
3,084,312✔
342
                                c.drop = 1
3,084,312✔
343
                        case '\n':
3,084,312✔
344
                                var arg []byte
3,084,312✔
345
                                if c.argBuf != nil {
3,116,960✔
346
                                        arg = c.argBuf
32,648✔
347
                                        c.argBuf = nil
32,648✔
348
                                } else {
3,084,312✔
349
                                        arg = buf[c.as : i-c.drop]
3,051,664✔
350
                                }
3,051,664✔
351
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
3,084,312✔
352
                                        return err
×
353
                                }
×
354
                                var err error
3,084,312✔
355
                                if c.kind == ROUTER || c.kind == GATEWAY {
6,168,240✔
356
                                        if trace {
3,086,078✔
357
                                                c.traceInOp("HMSG", arg)
2,150✔
358
                                        }
2,150✔
359
                                        err = c.processRoutedHeaderMsgArgs(arg)
3,083,928✔
360
                                } else if c.kind == LEAF {
768✔
361
                                        if trace {
536✔
362
                                                c.traceInOp("HMSG", arg)
152✔
363
                                        }
152✔
364
                                        err = c.processLeafHeaderMsgArgs(arg)
384✔
365
                                }
366
                                if err != nil {
3,084,313✔
367
                                        return err
1✔
368
                                }
1✔
369
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
3,084,311✔
370

3,084,311✔
371
                                // jump ahead with the index. If this overruns
3,084,311✔
372
                                // what is left we fall out and process split
3,084,311✔
373
                                // buffer.
3,084,311✔
374
                                i = c.as + c.pa.size - LEN_CR_LF
3,084,311✔
375
                        default:
304,269,473✔
376
                                if c.argBuf != nil {
305,814,691✔
377
                                        c.argBuf = append(c.argBuf, b)
1,545,218✔
378
                                }
1,545,218✔
379
                        }
380
                case OP_P:
3,364,511✔
381
                        switch b {
3,364,511✔
382
                        case 'U', 'u':
3,312,233✔
383
                                c.state = OP_PU
3,312,233✔
384
                        case 'I', 'i':
31,586✔
385
                                c.state = OP_PI
31,586✔
386
                        case 'O', 'o':
20,691✔
387
                                c.state = OP_PO
20,691✔
388
                        default:
1✔
389
                                goto parseErr
1✔
390
                        }
391
                case OP_PU:
3,312,229✔
392
                        switch b {
3,312,229✔
393
                        case 'B', 'b':
3,312,228✔
394
                                c.state = OP_PUB
3,312,228✔
395
                        default:
1✔
396
                                goto parseErr
1✔
397
                        }
398
                case OP_PUB:
3,312,228✔
399
                        switch b {
3,312,228✔
400
                        case ' ', '\t':
3,312,227✔
401
                                c.state = OP_PUB_SPC
3,312,227✔
402
                        default:
1✔
403
                                goto parseErr
1✔
404
                        }
405
                case OP_PUB_SPC:
3,312,227✔
406
                        switch b {
3,312,227✔
407
                        case ' ', '\t':
1✔
408
                                continue
1✔
409
                        default:
3,312,226✔
410
                                c.pa.hdr = -1
3,312,226✔
411
                                c.state = PUB_ARG
3,312,226✔
412
                                c.as = i
3,312,226✔
413
                        }
414
                case PUB_ARG:
177,802,912✔
415
                        switch b {
177,802,912✔
416
                        case '\r':
3,312,216✔
417
                                c.drop = 1
3,312,216✔
418
                        case '\n':
3,312,217✔
419
                                var arg []byte
3,312,217✔
420
                                if c.argBuf != nil {
3,349,097✔
421
                                        arg = c.argBuf
36,880✔
422
                                        c.argBuf = nil
36,880✔
423
                                } else {
3,312,217✔
424
                                        arg = buf[c.as : i-c.drop]
3,275,337✔
425
                                }
3,275,337✔
426
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
3,312,218✔
427
                                        return err
1✔
428
                                }
1✔
429
                                if trace {
3,589,761✔
430
                                        c.traceInOp("PUB", arg)
277,545✔
431
                                }
277,545✔
432
                                if err := c.processPub(arg); err != nil {
3,312,227✔
433
                                        return err
11✔
434
                                }
11✔
435

436
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
3,312,205✔
437
                                // If we don't have a saved buffer then jump ahead with
3,312,205✔
438
                                // the index. If this overruns what is left we fall out
3,312,205✔
439
                                // and process split buffer.
3,312,205✔
440
                                if c.msgBuf == nil {
6,624,410✔
441
                                        i = c.as + c.pa.size - LEN_CR_LF
3,312,205✔
442
                                }
3,312,205✔
443
                        default:
171,178,479✔
444
                                if c.argBuf != nil {
172,111,967✔
445
                                        c.argBuf = append(c.argBuf, b)
933,488✔
446
                                }
933,488✔
447
                        }
448
                case MSG_PAYLOAD:
22,349,261✔
449
                        if c.msgBuf != nil {
24,236,880✔
450
                                // copy as much as we can to the buffer and skip ahead.
1,887,619✔
451
                                toCopy := c.pa.size - len(c.msgBuf)
1,887,619✔
452
                                avail := len(buf) - i
1,887,619✔
453
                                if avail < toCopy {
2,186,937✔
454
                                        toCopy = avail
299,318✔
455
                                }
299,318✔
456
                                if toCopy > 0 {
3,775,238✔
457
                                        start := len(c.msgBuf)
1,887,619✔
458
                                        // This is needed for copy to work.
1,887,619✔
459
                                        c.msgBuf = c.msgBuf[:start+toCopy]
1,887,619✔
460
                                        copy(c.msgBuf[start:], buf[i:i+toCopy])
1,887,619✔
461
                                        // Update our index
1,887,619✔
462
                                        i = (i + toCopy) - 1
1,887,619✔
463
                                } else {
1,887,619✔
464
                                        // Fall back to append if needed.
×
465
                                        c.msgBuf = append(c.msgBuf, b)
×
466
                                }
×
467
                                if len(c.msgBuf) >= c.pa.size {
3,475,920✔
468
                                        c.state = MSG_END_R
1,588,301✔
469
                                }
1,588,301✔
470
                        } else if i-c.as+1 >= c.pa.size {
40,923,284✔
471
                                c.state = MSG_END_R
20,461,642✔
472
                        }
20,461,642✔
473
                case MSG_END_R:
22,049,943✔
474
                        if b != '\r' {
22,049,946✔
475
                                goto parseErr
3✔
476
                        }
477
                        if c.msgBuf != nil {
23,641,749✔
478
                                c.msgBuf = append(c.msgBuf, b)
1,591,809✔
479
                        }
1,591,809✔
480
                        c.state = MSG_END_N
22,049,940✔
481
                case MSG_END_N:
22,049,924✔
482
                        if b != '\n' {
22,049,925✔
483
                                goto parseErr
1✔
484
                        }
485
                        if c.msgBuf != nil {
23,651,054✔
486
                                c.msgBuf = append(c.msgBuf, b)
1,601,131✔
487
                        } else {
22,049,923✔
488
                                c.msgBuf = buf[c.as : i+1]
20,448,792✔
489
                        }
20,448,792✔
490

491
                        var mt *msgTrace
22,049,923✔
492
                        if c.pa.hdr > 0 {
28,862,497✔
493
                                mt = c.initMsgTrace()
6,812,574✔
494
                        }
6,812,574✔
495
                        // Check for mappings.
496
                        if (c.kind == CLIENT || c.kind == LEAF) && c.in.flags.isSet(hasMappings) {
22,074,401✔
497
                                changed := c.selectMappedSubject()
24,478✔
498
                                if changed {
37,034✔
499
                                        if trace {
12,629✔
500
                                                c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", c.pa.mapped, c.pa.subject)))
73✔
501
                                        }
73✔
502
                                        // c.pa.subject is the subject the original is now mapped to.
503
                                        mt.addSubjectMappingEvent(c.pa.subject)
12,556✔
504
                                }
505
                        }
506
                        if trace {
22,336,609✔
507
                                c.traceMsg(c.msgBuf)
286,686✔
508
                        }
286,686✔
509

510
                        c.processInboundMsg(c.msgBuf)
22,049,923✔
511

22,049,923✔
512
                        mt.sendEvent()
22,049,923✔
513
                        c.argBuf, c.msgBuf, c.header = nil, nil, nil
22,049,923✔
514
                        c.drop, c.as, c.state = 0, i+1, OP_START
22,049,923✔
515
                        // Drop all pub args
22,049,923✔
516
                        c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject, c.pa.mapped = nil, nil, nil, nil, nil, nil
22,049,923✔
517
                        c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil
22,049,923✔
518
                        c.pa.trace = nil
22,049,923✔
519
                        lmsg = false
22,049,923✔
520
                case OP_A:
32✔
521
                        switch b {
32✔
522
                        case '+':
4✔
523
                                c.state = OP_ASUB
4✔
524
                        case '-', 'u':
28✔
525
                                c.state = OP_AUSUB
28✔
526
                        default:
×
527
                                goto parseErr
×
528
                        }
529
                case OP_ASUB:
4✔
530
                        switch b {
4✔
531
                        case ' ', '\t':
4✔
532
                                c.state = OP_ASUB_SPC
4✔
533
                        default:
×
534
                                goto parseErr
×
535
                        }
536
                case OP_ASUB_SPC:
4✔
537
                        switch b {
4✔
538
                        case ' ', '\t':
×
539
                                continue
×
540
                        default:
4✔
541
                                c.state = ASUB_ARG
4✔
542
                                c.as = i
4✔
543
                        }
544
                case ASUB_ARG:
14✔
545
                        switch b {
14✔
546
                        case '\r':
4✔
547
                                c.drop = 1
4✔
548
                        case '\n':
4✔
549
                                var arg []byte
4✔
550
                                if c.argBuf != nil {
4✔
551
                                        arg = c.argBuf
×
552
                                        c.argBuf = nil
×
553
                                } else {
4✔
554
                                        arg = buf[c.as : i-c.drop]
4✔
555
                                }
4✔
556
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
4✔
557
                                        return err
×
558
                                }
×
559
                                if trace {
4✔
560
                                        c.traceInOp("A+", arg)
×
561
                                }
×
562
                                if err := c.processAccountSub(arg); err != nil {
4✔
563
                                        return err
×
564
                                }
×
565
                                c.drop, c.as, c.state = 0, i+1, OP_START
4✔
566
                        default:
6✔
567
                                if c.argBuf != nil {
6✔
568
                                        c.argBuf = append(c.argBuf, b)
×
569
                                }
×
570
                        }
571
                case OP_AUSUB:
28✔
572
                        switch b {
28✔
573
                        case ' ', '\t':
28✔
574
                                c.state = OP_AUSUB_SPC
28✔
575
                        default:
×
576
                                goto parseErr
×
577
                        }
578
                case OP_AUSUB_SPC:
28✔
579
                        switch b {
28✔
580
                        case ' ', '\t':
×
581
                                continue
×
582
                        default:
28✔
583
                                c.state = AUSUB_ARG
28✔
584
                                c.as = i
28✔
585
                        }
586
                case AUSUB_ARG:
142✔
587
                        switch b {
142✔
588
                        case '\r':
28✔
589
                                c.drop = 1
28✔
590
                        case '\n':
28✔
591
                                var arg []byte
28✔
592
                                if c.argBuf != nil {
28✔
593
                                        arg = c.argBuf
×
594
                                        c.argBuf = nil
×
595
                                } else {
28✔
596
                                        arg = buf[c.as : i-c.drop]
28✔
597
                                }
28✔
598
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
28✔
599
                                        return err
×
600
                                }
×
601
                                if trace {
45✔
602
                                        c.traceInOp("A-", arg)
17✔
603
                                }
17✔
604
                                c.processAccountUnsub(arg)
28✔
605
                                c.drop, c.as, c.state = 0, i+1, OP_START
28✔
606
                        default:
86✔
607
                                if c.argBuf != nil {
86✔
608
                                        c.argBuf = append(c.argBuf, b)
×
609
                                }
×
610
                        }
611
                case OP_S:
137,881✔
612
                        switch b {
137,881✔
613
                        case 'U', 'u':
137,880✔
614
                                c.state = OP_SU
137,880✔
615
                        default:
1✔
616
                                goto parseErr
1✔
617
                        }
618
                case OP_SU:
137,880✔
619
                        switch b {
137,880✔
620
                        case 'B', 'b':
137,879✔
621
                                c.state = OP_SUB
137,879✔
622
                        default:
1✔
623
                                goto parseErr
1✔
624
                        }
625
                case OP_SUB:
1,049,706✔
626
                        switch b {
1,049,706✔
627
                        case ' ', '\t':
1,049,704✔
628
                                c.state = OP_SUB_SPC
1,049,704✔
629
                        default:
2✔
630
                                goto parseErr
2✔
631
                        }
632
                case OP_SUB_SPC:
1,049,705✔
633
                        switch b {
1,049,705✔
634
                        case ' ', '\t':
1✔
635
                                continue
1✔
636
                        default:
1,049,704✔
637
                                c.state = SUB_ARG
1,049,704✔
638
                                c.as = i
1,049,704✔
639
                        }
640
                case SUB_ARG:
38,832,437✔
641
                        switch b {
38,832,437✔
642
                        case '\r':
1,049,688✔
643
                                c.drop = 1
1,049,688✔
644
                        case '\n':
1,049,687✔
645
                                var arg []byte
1,049,687✔
646
                                if c.argBuf != nil {
1,073,357✔
647
                                        arg = c.argBuf
23,670✔
648
                                        c.argBuf = nil
23,670✔
649
                                } else {
1,049,687✔
650
                                        arg = buf[c.as : i-c.drop]
1,026,017✔
651
                                }
1,026,017✔
652
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
1,049,687✔
653
                                        return err
×
654
                                }
×
655
                                var err error
1,049,687✔
656

1,049,687✔
657
                                switch c.kind {
1,049,687✔
658
                                case CLIENT:
137,875✔
659
                                        if trace {
150,116✔
660
                                                c.traceInOp("SUB", arg)
12,241✔
661
                                        }
12,241✔
662
                                        err = c.parseSub(arg, false)
137,875✔
663
                                case ROUTER:
723,627✔
664
                                        switch c.op {
723,627✔
665
                                        case 'R', 'r':
711,713✔
666
                                                if trace {
714,683✔
667
                                                        c.traceInOp("RS+", arg)
2,970✔
668
                                                }
2,970✔
669
                                                err = c.processRemoteSub(arg, false)
711,713✔
670
                                        case 'L', 'l':
11,914✔
671
                                                if trace {
12,108✔
672
                                                        c.traceInOp("LS+", arg)
194✔
673
                                                }
194✔
674
                                                err = c.processRemoteSub(arg, true)
11,914✔
675
                                        }
676
                                case GATEWAY:
157,055✔
677
                                        if trace {
157,418✔
678
                                                c.traceInOp("RS+", arg)
363✔
679
                                        }
363✔
680
                                        err = c.processGatewayRSub(arg)
157,055✔
681
                                case LEAF:
31,130✔
682
                                        if trace {
31,256✔
683
                                                c.traceInOp("LS+", arg)
126✔
684
                                        }
126✔
685
                                        err = c.processLeafSub(arg)
31,130✔
686
                                }
687
                                if err != nil {
1,049,691✔
688
                                        return err
4✔
689
                                }
4✔
690
                                c.drop, c.as, c.state = 0, i+1, OP_START
1,049,683✔
691
                        default:
36,733,062✔
692
                                if c.argBuf != nil {
37,317,622✔
693
                                        c.argBuf = append(c.argBuf, b)
584,560✔
694
                                }
584,560✔
695
                        }
696
                case OP_L:
167,278✔
697
                        switch b {
167,278✔
698
                        case 'S', 's':
57,375✔
699
                                c.state = OP_LS
57,375✔
700
                        case 'M', 'm':
109,903✔
701
                                c.state = OP_M
109,903✔
702
                        default:
×
703
                                goto parseErr
×
704
                        }
705
                case OP_LS:
57,375✔
706
                        switch b {
57,375✔
707
                        case '+':
43,044✔
708
                                c.state = OP_SUB
43,044✔
709
                        case '-':
14,331✔
710
                                c.state = OP_UNSUB
14,331✔
711
                        default:
×
712
                                goto parseErr
×
713
                        }
714
                case OP_R:
12,875,165✔
715
                        switch b {
12,875,165✔
716
                        case 'S', 's':
1,059,442✔
717
                                c.state = OP_RS
1,059,442✔
718
                        case 'M', 'm':
11,815,723✔
719
                                c.state = OP_M
11,815,723✔
720
                        default:
×
721
                                goto parseErr
×
722
                        }
723
                case OP_RS:
1,059,440✔
724
                        switch b {
1,059,440✔
725
                        case '+':
868,783✔
726
                                c.state = OP_SUB
868,783✔
727
                        case '-':
190,657✔
728
                                c.state = OP_UNSUB
190,657✔
729
                        default:
×
730
                                goto parseErr
×
731
                        }
732
                case OP_U:
14,676✔
733
                        switch b {
14,676✔
734
                        case 'N', 'n':
14,675✔
735
                                c.state = OP_UN
14,675✔
736
                        default:
1✔
737
                                goto parseErr
1✔
738
                        }
739
                case OP_UN:
14,675✔
740
                        switch b {
14,675✔
741
                        case 'S', 's':
14,674✔
742
                                c.state = OP_UNS
14,674✔
743
                        default:
1✔
744
                                goto parseErr
1✔
745
                        }
746
                case OP_UNS:
14,674✔
747
                        switch b {
14,674✔
748
                        case 'U', 'u':
14,673✔
749
                                c.state = OP_UNSU
14,673✔
750
                        default:
1✔
751
                                goto parseErr
1✔
752
                        }
753
                case OP_UNSU:
14,673✔
754
                        switch b {
14,673✔
755
                        case 'B', 'b':
14,672✔
756
                                c.state = OP_UNSUB
14,672✔
757
                        default:
1✔
758
                                goto parseErr
1✔
759
                        }
760
                case OP_UNSUB:
219,660✔
761
                        switch b {
219,660✔
762
                        case ' ', '\t':
219,654✔
763
                                c.state = OP_UNSUB_SPC
219,654✔
764
                        default:
6✔
765
                                goto parseErr
6✔
766
                        }
767
                case OP_UNSUB_SPC:
219,671✔
768
                        switch b {
219,671✔
769
                        case ' ', '\t':
17✔
770
                                continue
17✔
771
                        default:
219,654✔
772
                                c.state = UNSUB_ARG
219,654✔
773
                                c.as = i
219,654✔
774
                        }
775
                case UNSUB_ARG:
5,986,600✔
776
                        switch b {
5,986,600✔
777
                        case '\r':
219,651✔
778
                                c.drop = 1
219,651✔
779
                        case '\n':
219,653✔
780
                                var arg []byte
219,653✔
781
                                if c.argBuf != nil {
225,959✔
782
                                        arg = c.argBuf
6,306✔
783
                                        c.argBuf = nil
6,306✔
784
                                } else {
219,653✔
785
                                        arg = buf[c.as : i-c.drop]
213,347✔
786
                                }
213,347✔
787
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
219,653✔
788
                                        return err
×
789
                                }
×
790
                                var err error
219,653✔
791

219,653✔
792
                                switch c.kind {
219,653✔
793
                                case CLIENT:
14,666✔
794
                                        if trace {
26,665✔
795
                                                c.traceInOp("UNSUB", arg)
11,999✔
796
                                        }
11,999✔
797
                                        err = c.processUnsub(arg)
14,666✔
798
                                case ROUTER:
159,199✔
799
                                        if trace && c.srv != nil {
159,583✔
800
                                                switch c.op {
384✔
801
                                                case 'R', 'r':
190✔
802
                                                        c.traceInOp("RS-", arg)
190✔
803
                                                case 'L', 'l':
194✔
804
                                                        c.traceInOp("LS-", arg)
194✔
805
                                                }
806
                                        }
807
                                        leafUnsub := c.op == 'L' || c.op == 'l'
159,199✔
808
                                        err = c.processRemoteUnsub(arg, leafUnsub)
159,199✔
809
                                case GATEWAY:
42,637✔
810
                                        if trace {
43,822✔
811
                                                c.traceInOp("RS-", arg)
1,185✔
812
                                        }
1,185✔
813
                                        err = c.processGatewayRUnsub(arg)
42,637✔
814
                                case LEAF:
3,151✔
815
                                        if trace {
3,151✔
816
                                                c.traceInOp("LS-", arg)
×
817
                                        }
×
818
                                        err = c.processLeafUnsub(arg)
3,151✔
819
                                }
820
                                if err != nil {
219,655✔
821
                                        return err
2✔
822
                                }
2✔
823
                                c.drop, c.as, c.state = 0, i+1, OP_START
219,651✔
824
                        default:
5,547,296✔
825
                                if c.argBuf != nil {
5,664,169✔
826
                                        c.argBuf = append(c.argBuf, b)
116,873✔
827
                                }
116,873✔
828
                        }
829
                case OP_PI:
31,586✔
830
                        switch b {
31,586✔
831
                        case 'N', 'n':
31,585✔
832
                                c.state = OP_PIN
31,585✔
833
                        default:
1✔
834
                                goto parseErr
1✔
835
                        }
836
                case OP_PIN:
31,585✔
837
                        switch b {
31,585✔
838
                        case 'G', 'g':
31,584✔
839
                                c.state = OP_PING
31,584✔
840
                        default:
1✔
841
                                goto parseErr
1✔
842
                        }
843
                case OP_PING:
63,173✔
844
                        switch b {
63,173✔
845
                        case '\n':
31,583✔
846
                                if trace {
32,104✔
847
                                        c.traceInOp("PING", nil)
521✔
848
                                }
521✔
849
                                c.processPing()
31,583✔
850
                                c.drop, c.state = 0, OP_START
31,583✔
851
                        }
852
                case OP_PO:
20,691✔
853
                        switch b {
20,691✔
854
                        case 'N', 'n':
20,690✔
855
                                c.state = OP_PON
20,690✔
856
                        default:
1✔
857
                                goto parseErr
1✔
858
                        }
859
                case OP_PON:
20,690✔
860
                        switch b {
20,690✔
861
                        case 'G', 'g':
20,689✔
862
                                c.state = OP_PONG
20,689✔
863
                        default:
1✔
864
                                goto parseErr
1✔
865
                        }
866
                case OP_PONG:
41,383✔
867
                        switch b {
41,383✔
868
                        case '\n':
20,688✔
869
                                if trace {
20,696✔
870
                                        c.traceInOp("PONG", nil)
8✔
871
                                }
8✔
872
                                c.processPong()
20,688✔
873
                                c.drop, c.state = 0, OP_START
20,688✔
874
                        }
875
                case OP_C:
36,007✔
876
                        switch b {
36,007✔
877
                        case 'O', 'o':
36,006✔
878
                                c.state = OP_CO
36,006✔
879
                        default:
1✔
880
                                goto parseErr
1✔
881
                        }
882
                case OP_CO:
36,006✔
883
                        switch b {
36,006✔
884
                        case 'N', 'n':
36,005✔
885
                                c.state = OP_CON
36,005✔
886
                        default:
1✔
887
                                goto parseErr
1✔
888
                        }
889
                case OP_CON:
36,005✔
890
                        switch b {
36,005✔
891
                        case 'N', 'n':
36,004✔
892
                                c.state = OP_CONN
36,004✔
893
                        default:
1✔
894
                                goto parseErr
1✔
895
                        }
896
                case OP_CONN:
36,004✔
897
                        switch b {
36,004✔
898
                        case 'E', 'e':
36,003✔
899
                                c.state = OP_CONNE
36,003✔
900
                        default:
1✔
901
                                goto parseErr
1✔
902
                        }
903
                case OP_CONNE:
36,003✔
904
                        switch b {
36,003✔
905
                        case 'C', 'c':
36,002✔
906
                                c.state = OP_CONNEC
36,002✔
907
                        default:
1✔
908
                                goto parseErr
1✔
909
                        }
910
                case OP_CONNEC:
36,002✔
911
                        switch b {
36,002✔
912
                        case 'T', 't':
36,000✔
913
                                c.state = OP_CONNECT
36,000✔
914
                        default:
2✔
915
                                goto parseErr
2✔
916
                        }
917
                case OP_CONNECT:
72,000✔
918
                        switch b {
72,000✔
919
                        case ' ', '\t':
36,000✔
920
                                continue
36,000✔
921
                        default:
36,000✔
922
                                c.state = CONNECT_ARG
36,000✔
923
                                c.as = i
36,000✔
924
                        }
925
                case CONNECT_ARG:
7,227,687✔
926
                        switch b {
7,227,687✔
927
                        case '\r':
35,999✔
928
                                c.drop = 1
35,999✔
929
                        case '\n':
36,000✔
930
                                var arg []byte
36,000✔
931
                                if c.argBuf != nil {
37,395✔
932
                                        arg = c.argBuf
1,395✔
933
                                        c.argBuf = nil
1,395✔
934
                                } else {
36,000✔
935
                                        arg = buf[c.as : i-c.drop]
34,605✔
936
                                }
34,605✔
937
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
36,000✔
938
                                        return err
×
939
                                }
×
940
                                if trace {
36,677✔
941
                                        c.traceInOp("CONNECT", removePassFromTrace(arg))
677✔
942
                                }
677✔
943
                                if err := c.processConnect(arg); err != nil {
36,293✔
944
                                        return err
293✔
945
                                }
293✔
946
                                c.drop, c.state = 0, OP_START
35,707✔
947
                                // Reset notion on authSet
35,707✔
948
                                c.mu.Lock()
35,707✔
949
                                authSet = c.awaitingAuth()
35,707✔
950
                                c.mu.Unlock()
35,707✔
951
                        default:
7,155,688✔
952
                                if c.argBuf != nil {
7,615,901✔
953
                                        c.argBuf = append(c.argBuf, b)
460,213✔
954
                                }
460,213✔
955
                        }
956
                case OP_M:
11,925,626✔
957
                        switch b {
11,925,626✔
958
                        case 'S', 's':
11,925,626✔
959
                                c.state = OP_MS
11,925,626✔
960
                        default:
×
961
                                goto parseErr
×
962
                        }
963
                case OP_MS:
11,925,626✔
964
                        switch b {
11,925,626✔
965
                        case 'G', 'g':
11,925,626✔
966
                                c.state = OP_MSG
11,925,626✔
967
                        default:
×
968
                                goto parseErr
×
969
                        }
970
                case OP_MSG:
11,925,626✔
971
                        switch b {
11,925,626✔
972
                        case ' ', '\t':
11,925,626✔
973
                                c.state = OP_MSG_SPC
11,925,626✔
974
                        default:
×
975
                                goto parseErr
×
976
                        }
977
                case OP_MSG_SPC:
11,925,626✔
978
                        switch b {
11,925,626✔
979
                        case ' ', '\t':
×
980
                                continue
×
981
                        default:
11,925,626✔
982
                                c.pa.hdr = -1
11,925,626✔
983
                                c.state = MSG_ARG
11,925,626✔
984
                                c.as = i
11,925,626✔
985
                        }
986
                case MSG_ARG:
459,207,155✔
987
                        switch b {
459,207,155✔
988
                        case '\r':
11,925,626✔
989
                                c.drop = 1
11,925,626✔
990
                        case '\n':
11,925,626✔
991
                                var arg []byte
11,925,626✔
992
                                if c.argBuf != nil {
12,146,704✔
993
                                        arg = c.argBuf
221,078✔
994
                                        c.argBuf = nil
221,078✔
995
                                } else {
11,925,626✔
996
                                        arg = buf[c.as : i-c.drop]
11,704,548✔
997
                                }
11,704,548✔
998
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
11,925,626✔
999
                                        return err
×
1000
                                }
×
1001
                                var err error
11,925,626✔
1002
                                if c.kind == ROUTER || c.kind == GATEWAY {
23,749,710✔
1003
                                        switch c.op {
11,824,084✔
1004
                                        case 'R', 'r':
11,815,723✔
1005
                                                if trace {
11,822,287✔
1006
                                                        c.traceInOp("RMSG", arg)
6,564✔
1007
                                                }
6,564✔
1008
                                                err = c.processRoutedMsgArgs(arg)
11,815,723✔
1009
                                        case 'L', 'l':
8,361✔
1010
                                                if trace {
8,615✔
1011
                                                        c.traceInOp("LMSG", arg)
254✔
1012
                                                }
254✔
1013
                                                lmsg = true
8,361✔
1014
                                                err = c.processRoutedOriginClusterMsgArgs(arg)
8,361✔
1015
                                        }
1016
                                } else if c.kind == LEAF {
203,084✔
1017
                                        if trace {
101,557✔
1018
                                                c.traceInOp("LMSG", arg)
15✔
1019
                                        }
15✔
1020
                                        err = c.processLeafMsgArgs(arg)
101,542✔
1021
                                }
1022
                                if err != nil {
11,925,627✔
1023
                                        return err
1✔
1024
                                }
1✔
1025
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
11,925,625✔
1026

11,925,625✔
1027
                                // jump ahead with the index. If this overruns
11,925,625✔
1028
                                // what is left we fall out and process split
11,925,625✔
1029
                                // buffer.
11,925,625✔
1030
                                i = c.as + c.pa.size - LEN_CR_LF
11,925,625✔
1031
                        default:
435,355,903✔
1032
                                if c.argBuf != nil {
438,105,941✔
1033
                                        c.argBuf = append(c.argBuf, b)
2,750,038✔
1034
                                }
2,750,038✔
1035
                        }
1036
                case OP_I:
72,751✔
1037
                        switch b {
72,751✔
1038
                        case 'N', 'n':
72,750✔
1039
                                c.state = OP_IN
72,750✔
1040
                        default:
1✔
1041
                                goto parseErr
1✔
1042
                        }
1043
                case OP_IN:
72,750✔
1044
                        switch b {
72,750✔
1045
                        case 'F', 'f':
72,749✔
1046
                                c.state = OP_INF
72,749✔
1047
                        default:
1✔
1048
                                goto parseErr
1✔
1049
                        }
1050
                case OP_INF:
72,749✔
1051
                        switch b {
72,749✔
1052
                        case 'O', 'o':
72,748✔
1053
                                c.state = OP_INFO
72,748✔
1054
                        default:
1✔
1055
                                goto parseErr
1✔
1056
                        }
1057
                case OP_INFO:
145,497✔
1058
                        switch b {
145,497✔
1059
                        case ' ', '\t':
72,749✔
1060
                                continue
72,749✔
1061
                        default:
72,748✔
1062
                                c.state = INFO_ARG
72,748✔
1063
                                c.as = i
72,748✔
1064
                        }
1065
                case INFO_ARG:
27,014,545✔
1066
                        switch b {
27,014,545✔
1067
                        case '\r':
72,745✔
1068
                                c.drop = 1
72,745✔
1069
                        case '\n':
72,746✔
1070
                                var arg []byte
72,746✔
1071
                                if c.argBuf != nil {
79,989✔
1072
                                        arg = c.argBuf
7,243✔
1073
                                        c.argBuf = nil
7,243✔
1074
                                } else {
72,746✔
1075
                                        arg = buf[c.as : i-c.drop]
65,503✔
1076
                                }
65,503✔
1077
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
72,746✔
1078
                                        return err
×
1079
                                }
×
1080
                                if err := c.processInfo(arg); err != nil {
72,747✔
1081
                                        return err
1✔
1082
                                }
1✔
1083
                                c.drop, c.as, c.state = 0, i+1, OP_START
72,745✔
1084
                        default:
26,869,054✔
1085
                                if c.argBuf != nil {
27,692,246✔
1086
                                        c.argBuf = append(c.argBuf, b)
823,192✔
1087
                                }
823,192✔
1088
                        }
1089
                case OP_PLUS:
3✔
1090
                        switch b {
3✔
1091
                        case 'O', 'o':
2✔
1092
                                c.state = OP_PLUS_O
2✔
1093
                        default:
1✔
1094
                                goto parseErr
1✔
1095
                        }
1096
                case OP_PLUS_O:
2✔
1097
                        switch b {
2✔
1098
                        case 'K', 'k':
1✔
1099
                                c.state = OP_PLUS_OK
1✔
1100
                        default:
1✔
1101
                                goto parseErr
1✔
1102
                        }
1103
                case OP_PLUS_OK:
2✔
1104
                        switch b {
2✔
1105
                        case '\n':
1✔
1106
                                c.drop, c.state = 0, OP_START
1✔
1107
                        }
1108
                case OP_MINUS:
155✔
1109
                        switch b {
155✔
1110
                        case 'E', 'e':
154✔
1111
                                c.state = OP_MINUS_E
154✔
1112
                        default:
1✔
1113
                                goto parseErr
1✔
1114
                        }
1115
                case OP_MINUS_E:
154✔
1116
                        switch b {
154✔
1117
                        case 'R', 'r':
153✔
1118
                                c.state = OP_MINUS_ER
153✔
1119
                        default:
1✔
1120
                                goto parseErr
1✔
1121
                        }
1122
                case OP_MINUS_ER:
153✔
1123
                        switch b {
153✔
1124
                        case 'R', 'r':
152✔
1125
                                c.state = OP_MINUS_ERR
152✔
1126
                        default:
1✔
1127
                                goto parseErr
1✔
1128
                        }
1129
                case OP_MINUS_ERR:
152✔
1130
                        switch b {
152✔
1131
                        case ' ', '\t':
151✔
1132
                                c.state = OP_MINUS_ERR_SPC
151✔
1133
                        default:
1✔
1134
                                goto parseErr
1✔
1135
                        }
1136
                case OP_MINUS_ERR_SPC:
151✔
1137
                        switch b {
151✔
1138
                        case ' ', '\t':
×
1139
                                continue
×
1140
                        default:
151✔
1141
                                c.state = MINUS_ERR_ARG
151✔
1142
                                c.as = i
151✔
1143
                        }
1144
                case MINUS_ERR_ARG:
6,193✔
1145
                        switch b {
6,193✔
1146
                        case '\r':
151✔
1147
                                c.drop = 1
151✔
1148
                        case '\n':
151✔
1149
                                var arg []byte
151✔
1150
                                if c.argBuf != nil {
152✔
1151
                                        arg = c.argBuf
1✔
1152
                                        c.argBuf = nil
1✔
1153
                                } else {
151✔
1154
                                        arg = buf[c.as : i-c.drop]
150✔
1155
                                }
150✔
1156
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
151✔
1157
                                        return err
×
1158
                                }
×
1159
                                c.processErr(string(arg))
151✔
1160
                                c.drop, c.as, c.state = 0, i+1, OP_START
151✔
1161
                        default:
5,891✔
1162
                                if c.argBuf != nil {
5,896✔
1163
                                        c.argBuf = append(c.argBuf, b)
5✔
1164
                                }
5✔
1165
                        }
1166
                default:
×
1167
                        goto parseErr
×
1168
                }
1169
        }
1170

1171
        // Check for split buffer scenarios for any ARG state.
1172
        if c.state == SUB_ARG || c.state == UNSUB_ARG ||
11,713,038✔
1173
                c.state == PUB_ARG || c.state == HPUB_ARG ||
11,713,038✔
1174
                c.state == ASUB_ARG || c.state == AUSUB_ARG ||
11,713,038✔
1175
                c.state == MSG_ARG || c.state == HMSG_ARG ||
11,713,038✔
1176
                c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG || c.state == INFO_ARG {
12,042,600✔
1177

329,562✔
1178
                // Setup a holder buffer to deal with split buffer scenario.
329,562✔
1179
                if c.argBuf == nil {
658,848✔
1180
                        c.argBuf = c.scratch[:0]
329,286✔
1181
                        c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
329,286✔
1182
                }
329,286✔
1183
                // Check for violations of control line length here. Note that this is not
1184
                // exact at all but the performance hit is too great to be precise, and
1185
                // catching here should prevent memory exhaustion attacks.
1186
                if err := c.overMaxControlLineLimit(c.argBuf, mcl); err != nil {
329,564✔
1187
                        return err
2✔
1188
                }
2✔
1189
        }
1190

1191
        // Check for split msg
1192
        if (c.state == MSG_PAYLOAD || c.state == MSG_END_R || c.state == MSG_END_N) && c.msgBuf == nil {
13,314,213✔
1193
                // We need to clone the pubArg if it is still referencing the
1,601,177✔
1194
                // read buffer and we are not able to process the msg.
1,601,177✔
1195

1,601,177✔
1196
                if c.argBuf == nil {
3,202,354✔
1197
                        // Works also for MSG_ARG, when message comes from ROUTE or GATEWAY.
1,601,177✔
1198
                        if err := c.clonePubArg(lmsg); err != nil {
1,601,177✔
1199
                                goto parseErr
×
1200
                        }
1201
                }
1202

1203
                // If we will overflow the scratch buffer, just create a
1204
                // new buffer to hold the split message.
1205
                if c.pa.size > cap(c.scratch)-len(c.argBuf) {
1,671,513✔
1206
                        lrem := len(buf[c.as:])
70,336✔
1207
                        // Consider it a protocol error when the remaining payload
70,336✔
1208
                        // is larger than the reported size for PUB. It can happen
70,336✔
1209
                        // when processing incomplete messages from rogue clients.
70,336✔
1210
                        if lrem > c.pa.size+LEN_CR_LF {
70,336✔
1211
                                goto parseErr
×
1212
                        }
1213
                        c.msgBuf = make([]byte, lrem, c.pa.size+LEN_CR_LF)
70,336✔
1214
                        copy(c.msgBuf, buf[c.as:])
70,336✔
1215
                } else {
1,530,841✔
1216
                        c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
1,530,841✔
1217
                        c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
1,530,841✔
1218
                }
1,530,841✔
1219
        }
1220

1221
        return nil
11,713,036✔
1222

11,713,036✔
1223
authErr:
11,713,036✔
1224
        c.authViolation()
12✔
1225
        return ErrAuthentication
12✔
1226

12✔
1227
parseErr:
12✔
1228
        c.sendErr("Unknown Protocol Operation")
53✔
1229
        snip := protoSnippet(i, PROTO_SNIPPET_SIZE, buf)
53✔
1230
        err := fmt.Errorf("%s parser ERROR, state=%d, i=%d: proto='%s...'", c.kindString(), c.state, i, snip)
53✔
1231
        return err
53✔
1232
}
1233

1234
// protoSnippet returns a quoted (%q) rendering of at most max bytes of buf,
// beginning at index start. It is used to embed a short excerpt of the
// offending protocol bytes in error messages.
//
// When start is at or beyond the end of buf, the literal `""` is returned.
// When the requested window extends past the end of buf, it is clamped so the
// snippet runs through the last byte of the buffer.
func protoSnippet(start, max int, buf []byte) string {
	bufSize := len(buf)
	if start >= bufSize {
		return `""`
	}
	stop := start + max
	if stop > bufSize {
		// The upper bound of a slice expression is exclusive, so clamping to
		// bufSize (not bufSize-1) is what keeps the final byte in the snippet.
		stop = bufSize
	}
	return fmt.Sprintf("%q", buf[start:stop])
}
1245

1246
// Check if the length of buffer `arg` is over the max control line limit `mcl`.
1247
// If so, an error is sent to the client and the connection is closed.
1248
// The error ErrMaxControlLine is returned.
1249
func (c *client) overMaxControlLineLimit(arg []byte, mcl int32) error {
23,757,822✔
1250
        if c.kind != CLIENT {
40,273,756✔
1251
                return nil
16,515,934✔
1252
        }
16,515,934✔
1253
        if len(arg) > int(mcl) {
7,241,891✔
1254
                err := NewErrorCtx(ErrMaxControlLine, "State %d, max_control_line %d, Buffer len %d (snip: %s...)",
3✔
1255
                        c.state, int(mcl), len(c.argBuf), protoSnippet(0, MAX_CONTROL_LINE_SNIPPET_SIZE, arg))
3✔
1256
                c.sendErr(err.Error())
3✔
1257
                c.closeConnection(MaxControlLineExceeded)
3✔
1258
                return err
3✔
1259
        }
3✔
1260
        return nil
7,241,885✔
1261
}
1262

1263
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
1264
// we need to hold onto it into the next read.
1265
func (c *client) clonePubArg(lmsg bool) error {
1,601,177✔
1266
        // Just copy and re-process original arg buffer.
1,601,177✔
1267
        c.argBuf = c.scratch[:0]
1,601,177✔
1268
        c.argBuf = append(c.argBuf, c.pa.arg...)
1,601,177✔
1269

1,601,177✔
1270
        switch c.kind {
1,601,177✔
1271
        case ROUTER, GATEWAY:
1,009,448✔
1272
                if lmsg {
1,010,231✔
1273
                        return c.processRoutedOriginClusterMsgArgs(c.argBuf)
783✔
1274
                }
783✔
1275
                if c.pa.hdr < 0 {
1,770,803✔
1276
                        return c.processRoutedMsgArgs(c.argBuf)
762,138✔
1277
                } else {
1,008,665✔
1278
                        return c.processRoutedHeaderMsgArgs(c.argBuf)
246,527✔
1279
                }
246,527✔
1280
        case LEAF:
1,936✔
1281
                if c.pa.hdr < 0 {
3,763✔
1282
                        return c.processLeafMsgArgs(c.argBuf)
1,827✔
1283
                } else {
1,936✔
1284
                        return c.processLeafHeaderMsgArgs(c.argBuf)
109✔
1285
                }
109✔
1286
        default:
589,793✔
1287
                if c.pa.hdr < 0 {
630,902✔
1288
                        return c.processPub(c.argBuf)
41,109✔
1289
                } else {
589,793✔
1290
                        return c.processHeaderPub(c.argBuf, nil)
548,684✔
1291
                }
548,684✔
1292
        }
1293
}
1294

1295
func (ps *parseState) getHeader() http.Header {
830✔
1296
        if ps.header == nil {
1,660✔
1297
                if hdr := ps.pa.hdr; hdr > 0 {
903✔
1298
                        reader := bufio.NewReader(bytes.NewReader(ps.msgBuf[0:hdr]))
73✔
1299
                        tp := textproto.NewReader(reader)
73✔
1300
                        tp.ReadLine() // skip over first line, contains version
73✔
1301
                        if mimeHeader, err := tp.ReadMIMEHeader(); err == nil {
146✔
1302
                                ps.header = http.Header(mimeHeader)
73✔
1303
                        }
73✔
1304
                }
1305
        }
1306
        return ps.header
830✔
1307
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc