• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 19125103750

05 Nov 2025 02:14PM UTC coverage: 85.082% (-1.0%) from 86.085%
19125103750

push

github

web-flow
[IMPROVED] Implicit RePublish based on SubjectTransform (#7515)

If the stream configuration contains stream subject `a.>`, subject
transform `a.>` to `b.>` and republish `>` to `>`, this currently errors
containing a cycle. However, the intention behind this is that the
transformed `b.>` subject is RePublished. This PR now allows this, but
only if this link is preserved. Specifically only allowing if the stream
has a single subject and the transform maps only that subject. This is a
non-breaking change since this used to be an error before, and mostly
allows to not need to repeat the transformed subject of `b.>` two more
times in the source/destination of the RePublish.

Put simply, the `RePublish` of `>` to `>` gets transformed to a source
of `b.>` and destination of `b.>` given the prerequisites.

Resolves https://github.com/nats-io/nats-server/issues/7511

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

74059 of 87044 relevant lines covered (85.08%)

334642.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.15
/server/parser.go
1
// Copyright 2012-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "fmt"
20
        "net/http"
21
        "net/textproto"
22
)
23

24
type parserState int
25
type parseState struct {
26
        state   parserState
27
        op      byte
28
        as      int
29
        drop    int
30
        pa      pubArg
31
        argBuf  []byte
32
        msgBuf  []byte
33
        header  http.Header // access via getHeader
34
        scratch [MAX_CONTROL_LINE_SIZE]byte
35
}
36

37
type pubArg struct {
38
        arg       []byte
39
        pacache   []byte
40
        origin    []byte
41
        account   []byte
42
        subject   []byte
43
        deliver   []byte
44
        mapped    []byte
45
        reply     []byte
46
        szb       []byte
47
        hdb       []byte
48
        queues    [][]byte
49
        size      int
50
        hdr       int
51
        psi       []*serviceImport
52
        trace     *msgTrace
53
        delivered bool // Only used for service imports
54
}
55

56
// Parser constants
57
const (
58
        OP_START parserState = iota
59
        OP_PLUS
60
        OP_PLUS_O
61
        OP_PLUS_OK
62
        OP_MINUS
63
        OP_MINUS_E
64
        OP_MINUS_ER
65
        OP_MINUS_ERR
66
        OP_MINUS_ERR_SPC
67
        MINUS_ERR_ARG
68
        OP_C
69
        OP_CO
70
        OP_CON
71
        OP_CONN
72
        OP_CONNE
73
        OP_CONNEC
74
        OP_CONNECT
75
        CONNECT_ARG
76
        OP_H
77
        OP_HP
78
        OP_HPU
79
        OP_HPUB
80
        OP_HPUB_SPC
81
        HPUB_ARG
82
        OP_HM
83
        OP_HMS
84
        OP_HMSG
85
        OP_HMSG_SPC
86
        HMSG_ARG
87
        OP_P
88
        OP_PU
89
        OP_PUB
90
        OP_PUB_SPC
91
        PUB_ARG
92
        OP_PI
93
        OP_PIN
94
        OP_PING
95
        OP_PO
96
        OP_PON
97
        OP_PONG
98
        MSG_PAYLOAD
99
        MSG_END_R
100
        MSG_END_N
101
        OP_S
102
        OP_SU
103
        OP_SUB
104
        OP_SUB_SPC
105
        SUB_ARG
106
        OP_A
107
        OP_ASUB
108
        OP_ASUB_SPC
109
        ASUB_ARG
110
        OP_AUSUB
111
        OP_AUSUB_SPC
112
        AUSUB_ARG
113
        OP_L
114
        OP_LS
115
        OP_R
116
        OP_RS
117
        OP_U
118
        OP_UN
119
        OP_UNS
120
        OP_UNSU
121
        OP_UNSUB
122
        OP_UNSUB_SPC
123
        UNSUB_ARG
124
        OP_M
125
        OP_MS
126
        OP_MSG
127
        OP_MSG_SPC
128
        MSG_ARG
129
        OP_I
130
        OP_IN
131
        OP_INF
132
        OP_INFO
133
        INFO_ARG
134
)
135

136
func (c *client) parse(buf []byte) error {
6,621,544✔
137
        // Branch out to mqtt clients. c.mqtt is immutable, but should it become
6,621,544✔
138
        // an issue (say data race detection), we could branch outside in readLoop
6,621,544✔
139
        if c.isMqtt() {
6,623,177✔
140
                return c.mqttParse(buf)
1,633✔
141
        }
1,633✔
142
        var i int
6,619,911✔
143
        var b byte
6,619,911✔
144
        var lmsg bool
6,619,911✔
145

6,619,911✔
146
        // Snapshots
6,619,911✔
147
        c.mu.Lock()
6,619,911✔
148
        // Snapshot and then reset when we receive a
6,619,911✔
149
        // proper CONNECT if needed.
6,619,911✔
150
        authSet := c.awaitingAuth()
6,619,911✔
151
        // Snapshot max control line as well.
6,619,911✔
152
        s, mcl, trace := c.srv, c.mcl, c.trace
6,619,911✔
153
        c.mu.Unlock()
6,619,911✔
154

6,619,911✔
155
        // Move to loop instead of range syntax to allow jumping of i
6,619,911✔
156
        for i = 0; i < len(buf); i++ {
606,368,018✔
157
                b = buf[i]
599,748,107✔
158

599,748,107✔
159
                switch c.state {
599,748,107✔
160
                case OP_START:
15,917,802✔
161
                        c.op = b
15,917,802✔
162
                        if b != 'C' && b != 'c' {
31,791,295✔
163
                                if authSet {
15,873,507✔
164
                                        if s == nil {
14✔
165
                                                goto authErr
×
166
                                        }
167
                                        var ok bool
14✔
168
                                        // Check here for NoAuthUser. If this is set allow non CONNECT protos as our first.
14✔
169
                                        // E.g. telnet proto demos.
14✔
170
                                        if noAuthUser := s.getOpts().NoAuthUser; noAuthUser != _EMPTY_ {
16✔
171
                                                s.mu.Lock()
2✔
172
                                                user, exists := s.users[noAuthUser]
2✔
173
                                                s.mu.Unlock()
2✔
174
                                                if exists {
4✔
175
                                                        c.RegisterUser(user)
2✔
176
                                                        c.mu.Lock()
2✔
177
                                                        c.clearAuthTimer()
2✔
178
                                                        c.flags.set(connectReceived)
2✔
179
                                                        c.mu.Unlock()
2✔
180
                                                        authSet, ok = false, true
2✔
181
                                                }
2✔
182
                                        }
183
                                        if !ok {
26✔
184
                                                goto authErr
12✔
185
                                        }
186
                                }
187
                                // If the connection is a gateway connection, make sure that
188
                                // if this is an inbound, it starts with a CONNECT.
189
                                if c.kind == GATEWAY && !c.gw.outbound && !c.gw.connected {
15,873,481✔
190
                                        // Use auth violation since no CONNECT was sent.
×
191
                                        // It could be a parseErr too.
×
192
                                        goto authErr
×
193
                                }
194
                        }
195
                        switch b {
15,917,790✔
196
                        case 'P', 'p':
7,835,363✔
197
                                c.state = OP_P
7,835,363✔
198
                        case 'H', 'h':
604,954✔
199
                                c.state = OP_H
604,954✔
200
                        case 'S', 's':
135,412✔
201
                                c.state = OP_S
135,412✔
202
                        case 'U', 'u':
13,120✔
203
                                c.state = OP_U
13,120✔
204
                        case 'R', 'r':
7,015,989✔
205
                                if c.kind == CLIENT {
7,015,989✔
206
                                        goto parseErr
×
207
                                } else {
7,015,989✔
208
                                        c.state = OP_R
7,015,989✔
209
                                }
7,015,989✔
210
                        case 'L', 'l':
180,430✔
211
                                if c.kind != LEAF && c.kind != ROUTER {
180,430✔
212
                                        goto parseErr
×
213
                                } else {
180,430✔
214
                                        c.state = OP_L
180,430✔
215
                                }
180,430✔
216
                        case 'A', 'a':
32✔
217
                                if c.kind == CLIENT {
32✔
218
                                        goto parseErr
×
219
                                } else {
32✔
220
                                        c.state = OP_A
32✔
221
                                }
32✔
222
                        case 'C', 'c':
44,309✔
223
                                c.state = OP_C
44,309✔
224
                        case 'I', 'i':
87,994✔
225
                                c.state = OP_I
87,994✔
226
                        case '+':
3✔
227
                                c.state = OP_PLUS
3✔
228
                        case '-':
172✔
229
                                c.state = OP_MINUS
172✔
230
                        default:
12✔
231
                                goto parseErr
12✔
232
                        }
233
                case OP_H:
604,954✔
234
                        switch b {
604,954✔
235
                        case 'P', 'p':
228,598✔
236
                                c.state = OP_HP
228,598✔
237
                        case 'M', 'm':
376,356✔
238
                                c.state = OP_HM
376,356✔
239
                        default:
×
240
                                goto parseErr
×
241
                        }
242
                case OP_HP:
228,598✔
243
                        switch b {
228,598✔
244
                        case 'U', 'u':
228,598✔
245
                                c.state = OP_HPU
228,598✔
246
                        default:
×
247
                                goto parseErr
×
248
                        }
249
                case OP_HPU:
228,598✔
250
                        switch b {
228,598✔
251
                        case 'B', 'b':
228,598✔
252
                                c.state = OP_HPUB
228,598✔
253
                        default:
×
254
                                goto parseErr
×
255
                        }
256
                case OP_HPUB:
228,598✔
257
                        switch b {
228,598✔
258
                        case ' ', '\t':
228,598✔
259
                                c.state = OP_HPUB_SPC
228,598✔
260
                        default:
×
261
                                goto parseErr
×
262
                        }
263
                case OP_HPUB_SPC:
228,598✔
264
                        switch b {
228,598✔
265
                        case ' ', '\t':
×
266
                                continue
×
267
                        default:
228,598✔
268
                                c.pa.hdr = 0
228,598✔
269
                                c.state = HPUB_ARG
228,598✔
270
                                c.as = i
228,598✔
271
                        }
272
                case HPUB_ARG:
8,516,511✔
273
                        switch b {
8,516,511✔
274
                        case '\r':
228,598✔
275
                                c.drop = 1
228,598✔
276
                        case '\n':
228,598✔
277
                                var arg []byte
228,598✔
278
                                if c.argBuf != nil {
228,660✔
279
                                        arg = c.argBuf
62✔
280
                                        c.argBuf = nil
62✔
281
                                } else {
228,598✔
282
                                        arg = buf[c.as : i-c.drop]
228,536✔
283
                                }
228,536✔
284
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
228,598✔
285
                                        return err
×
286
                                }
×
287
                                if trace {
228,606✔
288
                                        c.traceInOp("HPUB", arg)
8✔
289
                                }
8✔
290
                                var remaining []byte
228,598✔
291
                                if i < len(buf) {
457,196✔
292
                                        remaining = buf[i+1:]
228,598✔
293
                                }
228,598✔
294
                                if err := c.processHeaderPub(arg, remaining); err != nil {
228,602✔
295
                                        return err
4✔
296
                                }
4✔
297

298
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
228,594✔
299
                                // If we don't have a saved buffer then jump ahead with
228,594✔
300
                                // the index. If this overruns what is left we fall out
228,594✔
301
                                // and process split buffer.
228,594✔
302
                                if c.msgBuf == nil {
457,188✔
303
                                        i = c.as + c.pa.size - LEN_CR_LF
228,594✔
304
                                }
228,594✔
305
                        default:
8,059,315✔
306
                                if c.argBuf != nil {
8,060,092✔
307
                                        c.argBuf = append(c.argBuf, b)
777✔
308
                                }
777✔
309
                        }
310
                case OP_HM:
376,356✔
311
                        switch b {
376,356✔
312
                        case 'S', 's':
376,356✔
313
                                c.state = OP_HMS
376,356✔
314
                        default:
×
315
                                goto parseErr
×
316
                        }
317
                case OP_HMS:
376,356✔
318
                        switch b {
376,356✔
319
                        case 'G', 'g':
376,356✔
320
                                c.state = OP_HMSG
376,356✔
321
                        default:
×
322
                                goto parseErr
×
323
                        }
324
                case OP_HMSG:
376,356✔
325
                        switch b {
376,356✔
326
                        case ' ', '\t':
376,356✔
327
                                c.state = OP_HMSG_SPC
376,356✔
328
                        default:
×
329
                                goto parseErr
×
330
                        }
331
                case OP_HMSG_SPC:
376,356✔
332
                        switch b {
376,356✔
333
                        case ' ', '\t':
×
334
                                continue
×
335
                        default:
376,356✔
336
                                c.pa.hdr = 0
376,356✔
337
                                c.state = HMSG_ARG
376,356✔
338
                                c.as = i
376,356✔
339
                        }
340
                case HMSG_ARG:
31,619,932✔
341
                        switch b {
31,619,932✔
342
                        case '\r':
376,356✔
343
                                c.drop = 1
376,356✔
344
                        case '\n':
376,356✔
345
                                var arg []byte
376,356✔
346
                                if c.argBuf != nil {
380,980✔
347
                                        arg = c.argBuf
4,624✔
348
                                        c.argBuf = nil
4,624✔
349
                                } else {
376,356✔
350
                                        arg = buf[c.as : i-c.drop]
371,732✔
351
                                }
371,732✔
352
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
376,356✔
353
                                        return err
×
354
                                }
×
355
                                var err error
376,356✔
356
                                if c.kind == ROUTER || c.kind == GATEWAY {
752,318✔
357
                                        if trace {
378,115✔
358
                                                c.traceInOp("HMSG", arg)
2,153✔
359
                                        }
2,153✔
360
                                        err = c.processRoutedHeaderMsgArgs(arg)
375,962✔
361
                                } else if c.kind == LEAF {
788✔
362
                                        if trace {
549✔
363
                                                c.traceInOp("HMSG", arg)
155✔
364
                                        }
155✔
365
                                        err = c.processLeafHeaderMsgArgs(arg)
394✔
366
                                }
367
                                if err != nil {
376,357✔
368
                                        return err
1✔
369
                                }
1✔
370
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
376,355✔
371

376,355✔
372
                                // jump ahead with the index. If this overruns
376,355✔
373
                                // what is left we fall out and process split
376,355✔
374
                                // buffer.
376,355✔
375
                                i = c.as + c.pa.size - LEN_CR_LF
376,355✔
376
                        default:
30,867,220✔
377
                                if c.argBuf != nil {
30,993,525✔
378
                                        c.argBuf = append(c.argBuf, b)
126,305✔
379
                                }
126,305✔
380
                        }
381
                case OP_P:
7,835,362✔
382
                        switch b {
7,835,362✔
383
                        case 'U', 'u':
7,765,741✔
384
                                c.state = OP_PU
7,765,741✔
385
                        case 'I', 'i':
41,073✔
386
                                c.state = OP_PI
41,073✔
387
                        case 'O', 'o':
28,547✔
388
                                c.state = OP_PO
28,547✔
389
                        default:
1✔
390
                                goto parseErr
1✔
391
                        }
392
                case OP_PU:
7,765,740✔
393
                        switch b {
7,765,740✔
394
                        case 'B', 'b':
7,765,739✔
395
                                c.state = OP_PUB
7,765,739✔
396
                        default:
1✔
397
                                goto parseErr
1✔
398
                        }
399
                case OP_PUB:
7,765,739✔
400
                        switch b {
7,765,739✔
401
                        case ' ', '\t':
7,765,738✔
402
                                c.state = OP_PUB_SPC
7,765,738✔
403
                        default:
1✔
404
                                goto parseErr
1✔
405
                        }
406
                case OP_PUB_SPC:
7,765,739✔
407
                        switch b {
7,765,739✔
408
                        case ' ', '\t':
1✔
409
                                continue
1✔
410
                        default:
7,765,738✔
411
                                c.pa.hdr = -1
7,765,738✔
412
                                c.state = PUB_ARG
7,765,738✔
413
                                c.as = i
7,765,738✔
414
                        }
415
                case PUB_ARG:
119,572,108✔
416
                        switch b {
119,572,108✔
417
                        case '\r':
7,765,726✔
418
                                c.drop = 1
7,765,726✔
419
                        case '\n':
7,765,727✔
420
                                var arg []byte
7,765,727✔
421
                                if c.argBuf != nil {
7,793,898✔
422
                                        arg = c.argBuf
28,171✔
423
                                        c.argBuf = nil
28,171✔
424
                                } else {
7,765,727✔
425
                                        arg = buf[c.as : i-c.drop]
7,737,556✔
426
                                }
7,737,556✔
427
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
7,765,728✔
428
                                        return err
1✔
429
                                }
1✔
430
                                if trace {
8,062,618✔
431
                                        c.traceInOp("PUB", arg)
296,892✔
432
                                }
296,892✔
433
                                if err := c.processPub(arg); err != nil {
7,765,737✔
434
                                        return err
11✔
435
                                }
11✔
436

437
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
7,765,715✔
438
                                // If we don't have a saved buffer then jump ahead with
7,765,715✔
439
                                // the index. If this overruns what is left we fall out
7,765,715✔
440
                                // and process split buffer.
7,765,715✔
441
                                if c.msgBuf == nil {
15,531,430✔
442
                                        i = c.as + c.pa.size - LEN_CR_LF
7,765,715✔
443
                                }
7,765,715✔
444
                        default:
104,040,655✔
445
                                if c.argBuf != nil {
104,560,131✔
446
                                        c.argBuf = append(c.argBuf, b)
519,476✔
447
                                }
519,476✔
448
                        }
449
                case MSG_PAYLOAD:
14,528,088✔
450
                        if c.msgBuf != nil {
15,378,967✔
451
                                // copy as much as we can to the buffer and skip ahead.
850,879✔
452
                                toCopy := c.pa.size - len(c.msgBuf)
850,879✔
453
                                avail := len(buf) - i
850,879✔
454
                                if avail < toCopy {
1,059,116✔
455
                                        toCopy = avail
208,237✔
456
                                }
208,237✔
457
                                if toCopy > 0 {
1,701,758✔
458
                                        start := len(c.msgBuf)
850,879✔
459
                                        // This is needed for copy to work.
850,879✔
460
                                        c.msgBuf = c.msgBuf[:start+toCopy]
850,879✔
461
                                        copy(c.msgBuf[start:], buf[i:i+toCopy])
850,879✔
462
                                        // Update our index
850,879✔
463
                                        i = (i + toCopy) - 1
850,879✔
464
                                } else {
850,879✔
465
                                        // Fall back to append if needed.
×
466
                                        c.msgBuf = append(c.msgBuf, b)
×
467
                                }
×
468
                                if len(c.msgBuf) >= c.pa.size {
1,493,521✔
469
                                        c.state = MSG_END_R
642,642✔
470
                                }
642,642✔
471
                        } else if i-c.as+1 >= c.pa.size {
27,354,418✔
472
                                c.state = MSG_END_R
13,677,209✔
473
                        }
13,677,209✔
474
                case MSG_END_R:
14,319,850✔
475
                        if b != '\r' {
14,319,853✔
476
                                goto parseErr
3✔
477
                        }
478
                        if c.msgBuf != nil {
14,980,444✔
479
                                c.msgBuf = append(c.msgBuf, b)
660,597✔
480
                        }
660,597✔
481
                        c.state = MSG_END_N
14,319,847✔
482
                case MSG_END_N:
14,319,831✔
483
                        if b != '\n' {
14,319,832✔
484
                                goto parseErr
1✔
485
                        }
486
                        if c.msgBuf != nil {
14,982,213✔
487
                                c.msgBuf = append(c.msgBuf, b)
662,383✔
488
                        } else {
14,319,830✔
489
                                c.msgBuf = buf[c.as : i+1]
13,657,447✔
490
                        }
13,657,447✔
491

492
                        var mt *msgTrace
14,319,830✔
493
                        if c.pa.hdr > 0 {
14,925,226✔
494
                                mt = c.initMsgTrace()
605,396✔
495
                        }
605,396✔
496
                        // Check for mappings.
497
                        if (c.kind == CLIENT || c.kind == LEAF) && c.in.flags.isSet(hasMappings) {
14,352,829✔
498
                                changed := c.selectMappedSubject()
32,999✔
499
                                if changed {
45,575✔
500
                                        if trace {
12,652✔
501
                                                c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", c.pa.mapped, c.pa.subject)))
76✔
502
                                        }
76✔
503
                                        // c.pa.subject is the subject the original is now mapped to.
504
                                        mt.addSubjectMappingEvent(c.pa.subject)
12,576✔
505
                                }
506
                        }
507
                        if trace {
14,626,064✔
508
                                c.traceMsg(c.msgBuf)
306,234✔
509
                        }
306,234✔
510

511
                        c.processInboundMsg(c.msgBuf)
14,319,830✔
512

14,319,830✔
513
                        mt.sendEvent()
14,319,830✔
514
                        c.argBuf, c.msgBuf, c.header = nil, nil, nil
14,319,830✔
515
                        c.drop, c.as, c.state = 0, i+1, OP_START
14,319,830✔
516
                        // Drop all pub args
14,319,830✔
517
                        c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject, c.pa.mapped = nil, nil, nil, nil, nil, nil
14,319,830✔
518
                        c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil
14,319,830✔
519
                        c.pa.trace = nil
14,319,830✔
520
                        c.pa.delivered = false
14,319,830✔
521
                        lmsg = false
14,319,830✔
522
                case OP_A:
32✔
523
                        switch b {
32✔
524
                        case '+':
4✔
525
                                c.state = OP_ASUB
4✔
526
                        case '-', 'u':
28✔
527
                                c.state = OP_AUSUB
28✔
528
                        default:
×
529
                                goto parseErr
×
530
                        }
531
                case OP_ASUB:
4✔
532
                        switch b {
4✔
533
                        case ' ', '\t':
4✔
534
                                c.state = OP_ASUB_SPC
4✔
535
                        default:
×
536
                                goto parseErr
×
537
                        }
538
                case OP_ASUB_SPC:
4✔
539
                        switch b {
4✔
540
                        case ' ', '\t':
×
541
                                continue
×
542
                        default:
4✔
543
                                c.state = ASUB_ARG
4✔
544
                                c.as = i
4✔
545
                        }
546
                case ASUB_ARG:
14✔
547
                        switch b {
14✔
548
                        case '\r':
4✔
549
                                c.drop = 1
4✔
550
                        case '\n':
4✔
551
                                var arg []byte
4✔
552
                                if c.argBuf != nil {
4✔
553
                                        arg = c.argBuf
×
554
                                        c.argBuf = nil
×
555
                                } else {
4✔
556
                                        arg = buf[c.as : i-c.drop]
4✔
557
                                }
4✔
558
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
4✔
559
                                        return err
×
560
                                }
×
561
                                if trace {
4✔
562
                                        c.traceInOp("A+", arg)
×
563
                                }
×
564
                                if err := c.processAccountSub(arg); err != nil {
4✔
565
                                        return err
×
566
                                }
×
567
                                c.drop, c.as, c.state = 0, i+1, OP_START
4✔
568
                        default:
6✔
569
                                if c.argBuf != nil {
6✔
570
                                        c.argBuf = append(c.argBuf, b)
×
571
                                }
×
572
                        }
573
                case OP_AUSUB:
28✔
574
                        switch b {
28✔
575
                        case ' ', '\t':
28✔
576
                                c.state = OP_AUSUB_SPC
28✔
577
                        default:
×
578
                                goto parseErr
×
579
                        }
580
                case OP_AUSUB_SPC:
28✔
581
                        switch b {
28✔
582
                        case ' ', '\t':
×
583
                                continue
×
584
                        default:
28✔
585
                                c.state = AUSUB_ARG
28✔
586
                                c.as = i
28✔
587
                        }
588
                case AUSUB_ARG:
142✔
589
                        switch b {
142✔
590
                        case '\r':
28✔
591
                                c.drop = 1
28✔
592
                        case '\n':
28✔
593
                                var arg []byte
28✔
594
                                if c.argBuf != nil {
28✔
595
                                        arg = c.argBuf
×
596
                                        c.argBuf = nil
×
597
                                } else {
28✔
598
                                        arg = buf[c.as : i-c.drop]
28✔
599
                                }
28✔
600
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
28✔
601
                                        return err
×
602
                                }
×
603
                                if trace {
45✔
604
                                        c.traceInOp("A-", arg)
17✔
605
                                }
17✔
606
                                c.processAccountUnsub(arg)
28✔
607
                                c.drop, c.as, c.state = 0, i+1, OP_START
28✔
608
                        default:
86✔
609
                                if c.argBuf != nil {
86✔
610
                                        c.argBuf = append(c.argBuf, b)
×
611
                                }
×
612
                        }
613
                case OP_S:
135,412✔
614
                        switch b {
135,412✔
615
                        case 'U', 'u':
135,411✔
616
                                c.state = OP_SU
135,411✔
617
                        default:
1✔
618
                                goto parseErr
1✔
619
                        }
620
                case OP_SU:
135,411✔
621
                        switch b {
135,411✔
622
                        case 'B', 'b':
135,410✔
623
                                c.state = OP_SUB
135,410✔
624
                        default:
1✔
625
                                goto parseErr
1✔
626
                        }
627
                case OP_SUB:
1,173,988✔
628
                        switch b {
1,173,988✔
629
                        case ' ', '\t':
1,173,986✔
630
                                c.state = OP_SUB_SPC
1,173,986✔
631
                        default:
2✔
632
                                goto parseErr
2✔
633
                        }
634
                case OP_SUB_SPC:
1,173,987✔
635
                        switch b {
1,173,987✔
636
                        case ' ', '\t':
1✔
637
                                continue
1✔
638
                        default:
1,173,986✔
639
                                c.state = SUB_ARG
1,173,986✔
640
                                c.as = i
1,173,986✔
641
                        }
642
                case SUB_ARG:
44,185,869✔
643
                        switch b {
44,185,869✔
644
                        case '\r':
1,173,966✔
645
                                c.drop = 1
1,173,966✔
646
                        case '\n':
1,173,966✔
647
                                var arg []byte
1,173,966✔
648
                                if c.argBuf != nil {
1,201,335✔
649
                                        arg = c.argBuf
27,369✔
650
                                        c.argBuf = nil
27,369✔
651
                                } else {
1,173,966✔
652
                                        arg = buf[c.as : i-c.drop]
1,146,597✔
653
                                }
1,146,597✔
654
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
1,173,966✔
655
                                        return err
×
656
                                }
×
657
                                var err error
1,173,966✔
658

1,173,966✔
659
                                switch c.kind {
1,173,966✔
660
                                case CLIENT:
135,406✔
661
                                        if trace {
146,049✔
662
                                                c.traceInOp("SUB", arg)
10,643✔
663
                                        }
10,643✔
664
                                        err = c.parseSub(arg, false)
135,406✔
665
                                case ROUTER:
844,503✔
666
                                        switch c.op {
844,503✔
667
                                        case 'R', 'r':
832,442✔
668
                                                if trace {
835,361✔
669
                                                        c.traceInOp("RS+", arg)
2,919✔
670
                                                }
2,919✔
671
                                                err = c.processRemoteSub(arg, false)
832,442✔
672
                                        case 'L', 'l':
12,061✔
673
                                                if trace {
12,255✔
674
                                                        c.traceInOp("LS+", arg)
194✔
675
                                                }
194✔
676
                                                err = c.processRemoteSub(arg, true)
12,061✔
677
                                        }
678
                                case GATEWAY:
160,896✔
679
                                        if trace {
161,364✔
680
                                                c.traceInOp("RS+", arg)
468✔
681
                                        }
468✔
682
                                        err = c.processGatewayRSub(arg)
160,896✔
683
                                case LEAF:
33,161✔
684
                                        if trace {
33,287✔
685
                                                c.traceInOp("LS+", arg)
126✔
686
                                        }
126✔
687
                                        err = c.processLeafSub(arg)
33,161✔
688
                                }
689
                                if err != nil {
1,173,970✔
690
                                        return err
4✔
691
                                }
4✔
692
                                c.drop, c.as, c.state = 0, i+1, OP_START
1,173,962✔
693
                        default:
41,837,937✔
694
                                if c.argBuf != nil {
42,522,946✔
695
                                        c.argBuf = append(c.argBuf, b)
685,009✔
696
                                }
685,009✔
697
                        }
698
                case OP_L:
180,430✔
699
                        switch b {
180,430✔
700
                        case 'S', 's':
60,015✔
701
                                c.state = OP_LS
60,015✔
702
                        case 'M', 'm':
120,415✔
703
                                c.state = OP_M
120,415✔
704
                        default:
×
705
                                goto parseErr
×
706
                        }
707
                case OP_LS:
60,015✔
708
                        switch b {
60,015✔
709
                        case '+':
45,222✔
710
                                c.state = OP_SUB
45,222✔
711
                        case '-':
14,793✔
712
                                c.state = OP_UNSUB
14,793✔
713
                        default:
×
714
                                goto parseErr
×
715
                        }
716
                case OP_R:
7,015,989✔
717
                        switch b {
7,015,989✔
718
                        case 'S', 's':
1,187,181✔
719
                                c.state = OP_RS
1,187,181✔
720
                        case 'M', 'm':
5,828,808✔
721
                                c.state = OP_M
5,828,808✔
722
                        default:
×
723
                                goto parseErr
×
724
                        }
725
                case OP_RS:
1,187,181✔
726
                        switch b {
1,187,181✔
727
                        case '+':
993,357✔
728
                                c.state = OP_SUB
993,357✔
729
                        case '-':
193,824✔
730
                                c.state = OP_UNSUB
193,824✔
731
                        default:
×
732
                                goto parseErr
×
733
                        }
734
                case OP_U:
13,120✔
735
                        switch b {
13,120✔
736
                        case 'N', 'n':
13,119✔
737
                                c.state = OP_UN
13,119✔
738
                        default:
1✔
739
                                goto parseErr
1✔
740
                        }
741
                case OP_UN:
13,119✔
742
                        switch b {
13,119✔
743
                        case 'S', 's':
13,118✔
744
                                c.state = OP_UNS
13,118✔
745
                        default:
1✔
746
                                goto parseErr
1✔
747
                        }
748
                case OP_UNS:
13,118✔
749
                        switch b {
13,118✔
750
                        case 'U', 'u':
13,117✔
751
                                c.state = OP_UNSU
13,117✔
752
                        default:
1✔
753
                                goto parseErr
1✔
754
                        }
755
                case OP_UNSU:
13,117✔
756
                        switch b {
13,117✔
757
                        case 'B', 'b':
13,116✔
758
                                c.state = OP_UNSUB
13,116✔
759
                        default:
1✔
760
                                goto parseErr
1✔
761
                        }
762
                case OP_UNSUB:
221,733✔
763
                        switch b {
221,733✔
764
                        case ' ', '\t':
221,727✔
765
                                c.state = OP_UNSUB_SPC
221,727✔
766
                        default:
6✔
767
                                goto parseErr
6✔
768
                        }
769
                case OP_UNSUB_SPC:
221,743✔
770
                        switch b {
221,743✔
771
                        case ' ', '\t':
17✔
772
                                continue
17✔
773
                        default:
221,726✔
774
                                c.state = UNSUB_ARG
221,726✔
775
                                c.as = i
221,726✔
776
                        }
777
                case UNSUB_ARG:
6,223,680✔
778
                        switch b {
6,223,680✔
779
                        case '\r':
221,722✔
780
                                c.drop = 1
221,722✔
781
                        case '\n':
221,724✔
782
                                var arg []byte
221,724✔
783
                                if c.argBuf != nil {
228,300✔
784
                                        arg = c.argBuf
6,576✔
785
                                        c.argBuf = nil
6,576✔
786
                                } else {
221,724✔
787
                                        arg = buf[c.as : i-c.drop]
215,148✔
788
                                }
215,148✔
789
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
221,724✔
790
                                        return err
×
791
                                }
×
792
                                var err error
221,724✔
793

221,724✔
794
                                switch c.kind {
221,724✔
795
                                case CLIENT:
13,110✔
796
                                        if trace {
23,503✔
797
                                                c.traceInOp("UNSUB", arg)
10,393✔
798
                                        }
10,393✔
799
                                        err = c.processUnsub(arg)
13,110✔
800
                                case ROUTER:
162,515✔
801
                                        if trace && c.srv != nil {
162,886✔
802
                                                switch c.op {
371✔
803
                                                case 'R', 'r':
177✔
804
                                                        c.traceInOp("RS-", arg)
177✔
805
                                                case 'L', 'l':
194✔
806
                                                        c.traceInOp("LS-", arg)
194✔
807
                                                }
808
                                        }
809
                                        leafUnsub := c.op == 'L' || c.op == 'l'
162,515✔
810
                                        err = c.processRemoteUnsub(arg, leafUnsub)
162,515✔
811
                                case GATEWAY:
42,626✔
812
                                        if trace {
43,808✔
813
                                                c.traceInOp("RS-", arg)
1,182✔
814
                                        }
1,182✔
815
                                        err = c.processGatewayRUnsub(arg)
42,626✔
816
                                case LEAF:
3,473✔
817
                                        if trace {
3,479✔
818
                                                c.traceInOp("LS-", arg)
6✔
819
                                        }
6✔
820
                                        err = c.processLeafUnsub(arg)
3,473✔
821
                                }
822
                                if err != nil {
221,726✔
823
                                        return err
2✔
824
                                }
2✔
825
                                c.drop, c.as, c.state = 0, i+1, OP_START
221,722✔
826
                        default:
5,780,234✔
827
                                if c.argBuf != nil {
5,909,998✔
828
                                        c.argBuf = append(c.argBuf, b)
129,764✔
829
                                }
129,764✔
830
                        }
831
                case OP_PI:
41,073✔
832
                        switch b {
41,073✔
833
                        case 'N', 'n':
41,072✔
834
                                c.state = OP_PIN
41,072✔
835
                        default:
1✔
836
                                goto parseErr
1✔
837
                        }
838
                case OP_PIN:
41,072✔
839
                        switch b {
41,072✔
840
                        case 'G', 'g':
41,071✔
841
                                c.state = OP_PING
41,071✔
842
                        default:
1✔
843
                                goto parseErr
1✔
844
                        }
845
                case OP_PING:
82,147✔
846
                        switch b {
82,147✔
847
                        case '\n':
41,070✔
848
                                if trace {
41,715✔
849
                                        c.traceInOp("PING", nil)
645✔
850
                                }
645✔
851
                                c.processPing()
41,070✔
852
                                c.drop, c.state = 0, OP_START
41,070✔
853
                        }
854
                case OP_PO:
28,547✔
855
                        switch b {
28,547✔
856
                        case 'N', 'n':
28,546✔
857
                                c.state = OP_PON
28,546✔
858
                        default:
1✔
859
                                goto parseErr
1✔
860
                        }
861
                case OP_PON:
28,546✔
862
                        switch b {
28,546✔
863
                        case 'G', 'g':
28,545✔
864
                                c.state = OP_PONG
28,545✔
865
                        default:
1✔
866
                                goto parseErr
1✔
867
                        }
868
                case OP_PONG:
57,095✔
869
                        switch b {
57,095✔
870
                        case '\n':
28,544✔
871
                                if trace {
28,601✔
872
                                        c.traceInOp("PONG", nil)
57✔
873
                                }
57✔
874
                                c.processPong()
28,544✔
875
                                c.drop, c.state = 0, OP_START
28,544✔
876
                        }
877
                case OP_C:
44,309✔
878
                        switch b {
44,309✔
879
                        case 'O', 'o':
44,308✔
880
                                c.state = OP_CO
44,308✔
881
                        default:
1✔
882
                                goto parseErr
1✔
883
                        }
884
                case OP_CO:
44,308✔
885
                        switch b {
44,308✔
886
                        case 'N', 'n':
44,307✔
887
                                c.state = OP_CON
44,307✔
888
                        default:
1✔
889
                                goto parseErr
1✔
890
                        }
891
                case OP_CON:
44,307✔
892
                        switch b {
44,307✔
893
                        case 'N', 'n':
44,306✔
894
                                c.state = OP_CONN
44,306✔
895
                        default:
1✔
896
                                goto parseErr
1✔
897
                        }
898
                case OP_CONN:
44,306✔
899
                        switch b {
44,306✔
900
                        case 'E', 'e':
44,305✔
901
                                c.state = OP_CONNE
44,305✔
902
                        default:
1✔
903
                                goto parseErr
1✔
904
                        }
905
                case OP_CONNE:
44,305✔
906
                        switch b {
44,305✔
907
                        case 'C', 'c':
44,304✔
908
                                c.state = OP_CONNEC
44,304✔
909
                        default:
1✔
910
                                goto parseErr
1✔
911
                        }
912
                case OP_CONNEC:
44,304✔
913
                        switch b {
44,304✔
914
                        case 'T', 't':
44,302✔
915
                                c.state = OP_CONNECT
44,302✔
916
                        default:
2✔
917
                                goto parseErr
2✔
918
                        }
919
                case OP_CONNECT:
88,604✔
920
                        switch b {
88,604✔
921
                        case ' ', '\t':
44,302✔
922
                                continue
44,302✔
923
                        default:
44,302✔
924
                                c.state = CONNECT_ARG
44,302✔
925
                                c.as = i
44,302✔
926
                        }
927
                case CONNECT_ARG:
9,360,717✔
928
                        switch b {
9,360,717✔
929
                        case '\r':
44,301✔
930
                                c.drop = 1
44,301✔
931
                        case '\n':
44,302✔
932
                                var arg []byte
44,302✔
933
                                if c.argBuf != nil {
46,731✔
934
                                        arg = c.argBuf
2,429✔
935
                                        c.argBuf = nil
2,429✔
936
                                } else {
44,302✔
937
                                        arg = buf[c.as : i-c.drop]
41,873✔
938
                                }
41,873✔
939
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
44,302✔
940
                                        return err
×
941
                                }
×
942
                                if trace {
45,013✔
943
                                        c.traceInOp("CONNECT", removeSecretsFromTrace(arg))
711✔
944
                                }
711✔
945
                                if err := c.processConnect(arg); err != nil {
44,619✔
946
                                        return err
317✔
947
                                }
317✔
948
                                c.drop, c.state = 0, OP_START
43,985✔
949
                                // Reset notion on authSet
43,985✔
950
                                c.mu.Lock()
43,985✔
951
                                authSet = c.awaitingAuth()
43,985✔
952
                                c.mu.Unlock()
43,985✔
953
                        default:
9,272,114✔
954
                                if c.argBuf != nil {
10,029,265✔
955
                                        c.argBuf = append(c.argBuf, b)
757,151✔
956
                                }
757,151✔
957
                        }
958
                case OP_M:
5,949,223✔
959
                        switch b {
5,949,223✔
960
                        case 'S', 's':
5,949,223✔
961
                                c.state = OP_MS
5,949,223✔
962
                        default:
×
963
                                goto parseErr
×
964
                        }
965
                case OP_MS:
5,949,223✔
966
                        switch b {
5,949,223✔
967
                        case 'G', 'g':
5,949,223✔
968
                                c.state = OP_MSG
5,949,223✔
969
                        default:
×
970
                                goto parseErr
×
971
                        }
972
                case OP_MSG:
5,949,223✔
973
                        switch b {
5,949,223✔
974
                        case ' ', '\t':
5,949,223✔
975
                                c.state = OP_MSG_SPC
5,949,223✔
976
                        default:
×
977
                                goto parseErr
×
978
                        }
979
                case OP_MSG_SPC:
5,949,223✔
980
                        switch b {
5,949,223✔
981
                        case ' ', '\t':
×
982
                                continue
×
983
                        default:
5,949,223✔
984
                                c.pa.hdr = -1
5,949,223✔
985
                                c.state = MSG_ARG
5,949,223✔
986
                                c.as = i
5,949,223✔
987
                        }
988
                case MSG_ARG:
217,383,950✔
989
                        switch b {
217,383,950✔
990
                        case '\r':
5,949,220✔
991
                                c.drop = 1
5,949,220✔
992
                        case '\n':
5,949,219✔
993
                                var arg []byte
5,949,219✔
994
                                if c.argBuf != nil {
6,112,323✔
995
                                        arg = c.argBuf
163,104✔
996
                                        c.argBuf = nil
163,104✔
997
                                } else {
5,949,219✔
998
                                        arg = buf[c.as : i-c.drop]
5,786,115✔
999
                                }
5,786,115✔
1000
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
5,949,219✔
1001
                                        return err
×
1002
                                }
×
1003
                                var err error
5,949,219✔
1004
                                if c.kind == ROUTER || c.kind == GATEWAY {
11,785,591✔
1005
                                        switch c.op {
5,836,372✔
1006
                                        case 'R', 'r':
5,828,805✔
1007
                                                if trace {
5,835,549✔
1008
                                                        c.traceInOp("RMSG", arg)
6,744✔
1009
                                                }
6,744✔
1010
                                                err = c.processRoutedMsgArgs(arg)
5,828,805✔
1011
                                        case 'L', 'l':
7,567✔
1012
                                                if trace {
7,836✔
1013
                                                        c.traceInOp("LMSG", arg)
269✔
1014
                                                }
269✔
1015
                                                lmsg = true
7,567✔
1016
                                                err = c.processRoutedOriginClusterMsgArgs(arg)
7,567✔
1017
                                        }
1018
                                } else if c.kind == LEAF {
225,694✔
1019
                                        if trace {
112,862✔
1020
                                                c.traceInOp("LMSG", arg)
15✔
1021
                                        }
15✔
1022
                                        err = c.processLeafMsgArgs(arg)
112,847✔
1023
                                }
1024
                                if err != nil {
5,949,220✔
1025
                                        return err
1✔
1026
                                }
1✔
1027
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
5,949,218✔
1028

5,949,218✔
1029
                                // jump ahead with the index. If this overruns
5,949,218✔
1030
                                // what is left we fall out and process split
5,949,218✔
1031
                                // buffer.
5,949,218✔
1032
                                i = c.as + c.pa.size - LEN_CR_LF
5,949,218✔
1033
                        default:
205,485,511✔
1034
                                if c.argBuf != nil {
207,364,429✔
1035
                                        c.argBuf = append(c.argBuf, b)
1,878,918✔
1036
                                }
1,878,918✔
1037
                        }
1038
                case OP_I:
87,994✔
1039
                        switch b {
87,994✔
1040
                        case 'N', 'n':
87,993✔
1041
                                c.state = OP_IN
87,993✔
1042
                        default:
1✔
1043
                                goto parseErr
1✔
1044
                        }
1045
                case OP_IN:
87,993✔
1046
                        switch b {
87,993✔
1047
                        case 'F', 'f':
87,992✔
1048
                                c.state = OP_INF
87,992✔
1049
                        default:
1✔
1050
                                goto parseErr
1✔
1051
                        }
1052
                case OP_INF:
87,992✔
1053
                        switch b {
87,992✔
1054
                        case 'O', 'o':
87,991✔
1055
                                c.state = OP_INFO
87,991✔
1056
                        default:
1✔
1057
                                goto parseErr
1✔
1058
                        }
1059
                case OP_INFO:
175,983✔
1060
                        switch b {
175,983✔
1061
                        case ' ', '\t':
87,992✔
1062
                                continue
87,992✔
1063
                        default:
87,991✔
1064
                                c.state = INFO_ARG
87,991✔
1065
                                c.as = i
87,991✔
1066
                        }
1067
                case INFO_ARG:
33,206,926✔
1068
                        switch b {
33,206,926✔
1069
                        case '\r':
87,990✔
1070
                                c.drop = 1
87,990✔
1071
                        case '\n':
87,991✔
1072
                                var arg []byte
87,991✔
1073
                                if c.argBuf != nil {
96,746✔
1074
                                        arg = c.argBuf
8,755✔
1075
                                        c.argBuf = nil
8,755✔
1076
                                } else {
87,991✔
1077
                                        arg = buf[c.as : i-c.drop]
79,236✔
1078
                                }
79,236✔
1079
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
87,991✔
1080
                                        return err
×
1081
                                }
×
1082
                                if err := c.processInfo(arg); err != nil {
87,992✔
1083
                                        return err
1✔
1084
                                }
1✔
1085
                                c.drop, c.as, c.state = 0, i+1, OP_START
87,990✔
1086
                        default:
33,030,945✔
1087
                                if c.argBuf != nil {
33,978,092✔
1088
                                        c.argBuf = append(c.argBuf, b)
947,147✔
1089
                                }
947,147✔
1090
                        }
1091
                case OP_PLUS:
3✔
1092
                        switch b {
3✔
1093
                        case 'O', 'o':
2✔
1094
                                c.state = OP_PLUS_O
2✔
1095
                        default:
1✔
1096
                                goto parseErr
1✔
1097
                        }
1098
                case OP_PLUS_O:
2✔
1099
                        switch b {
2✔
1100
                        case 'K', 'k':
1✔
1101
                                c.state = OP_PLUS_OK
1✔
1102
                        default:
1✔
1103
                                goto parseErr
1✔
1104
                        }
1105
                case OP_PLUS_OK:
2✔
1106
                        switch b {
2✔
1107
                        case '\n':
1✔
1108
                                c.drop, c.state = 0, OP_START
1✔
1109
                        }
1110
                case OP_MINUS:
172✔
1111
                        switch b {
172✔
1112
                        case 'E', 'e':
171✔
1113
                                c.state = OP_MINUS_E
171✔
1114
                        default:
1✔
1115
                                goto parseErr
1✔
1116
                        }
1117
                case OP_MINUS_E:
171✔
1118
                        switch b {
171✔
1119
                        case 'R', 'r':
170✔
1120
                                c.state = OP_MINUS_ER
170✔
1121
                        default:
1✔
1122
                                goto parseErr
1✔
1123
                        }
1124
                case OP_MINUS_ER:
170✔
1125
                        switch b {
170✔
1126
                        case 'R', 'r':
169✔
1127
                                c.state = OP_MINUS_ERR
169✔
1128
                        default:
1✔
1129
                                goto parseErr
1✔
1130
                        }
1131
                case OP_MINUS_ERR:
169✔
1132
                        switch b {
169✔
1133
                        case ' ', '\t':
168✔
1134
                                c.state = OP_MINUS_ERR_SPC
168✔
1135
                        default:
1✔
1136
                                goto parseErr
1✔
1137
                        }
1138
                case OP_MINUS_ERR_SPC:
168✔
1139
                        switch b {
168✔
1140
                        case ' ', '\t':
×
1141
                                continue
×
1142
                        default:
168✔
1143
                                c.state = MINUS_ERR_ARG
168✔
1144
                                c.as = i
168✔
1145
                        }
1146
                case MINUS_ERR_ARG:
6,244✔
1147
                        switch b {
6,244✔
1148
                        case '\r':
168✔
1149
                                c.drop = 1
168✔
1150
                        case '\n':
168✔
1151
                                var arg []byte
168✔
1152
                                if c.argBuf != nil {
170✔
1153
                                        arg = c.argBuf
2✔
1154
                                        c.argBuf = nil
2✔
1155
                                } else {
168✔
1156
                                        arg = buf[c.as : i-c.drop]
166✔
1157
                                }
166✔
1158
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
168✔
1159
                                        return err
×
1160
                                }
×
1161
                                c.processErr(string(arg))
168✔
1162
                                c.drop, c.as, c.state = 0, i+1, OP_START
168✔
1163
                        default:
5,908✔
1164
                                if c.argBuf != nil {
5,915✔
1165
                                        c.argBuf = append(c.argBuf, b)
7✔
1166
                                }
7✔
1167
                        }
1168
                default:
×
1169
                        goto parseErr
×
1170
                }
1171
        }
1172

1173
        // Check for split buffer scenarios for any ARG state.
1174
        if c.state == SUB_ARG || c.state == UNSUB_ARG ||
6,619,504✔
1175
                c.state == PUB_ARG || c.state == HPUB_ARG ||
6,619,504✔
1176
                c.state == ASUB_ARG || c.state == AUSUB_ARG ||
6,619,504✔
1177
                c.state == MSG_ARG || c.state == HMSG_ARG ||
6,619,504✔
1178
                c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG || c.state == INFO_ARG {
6,860,941✔
1179

241,437✔
1180
                // Setup a holder buffer to deal with split buffer scenario.
241,437✔
1181
                if c.argBuf == nil {
482,566✔
1182
                        c.argBuf = c.scratch[:0]
241,129✔
1183
                        c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
241,129✔
1184
                }
241,129✔
1185
                // Check for violations of control line length here. Note that this is not
1186
                // exact at all but the performance hit is too great to be precise, and
1187
                // catching here should prevent memory exhaustion attacks.
1188
                if err := c.overMaxControlLineLimit(c.argBuf, mcl); err != nil {
241,439✔
1189
                        return err
2✔
1190
                }
2✔
1191
        }
1192

1193
        // Check for split msg
1194
        if (c.state == MSG_PAYLOAD || c.state == MSG_END_R || c.state == MSG_END_N) && c.msgBuf == nil {
7,281,933✔
1195
                // We need to clone the pubArg if it is still referencing the
662,431✔
1196
                // read buffer and we are not able to process the msg.
662,431✔
1197

662,431✔
1198
                if c.argBuf == nil {
1,324,862✔
1199
                        // Works also for MSG_ARG, when message comes from ROUTE or GATEWAY.
662,431✔
1200
                        if err := c.clonePubArg(lmsg); err != nil {
662,431✔
1201
                                goto parseErr
×
1202
                        }
1203
                }
1204

1205
                // If we will overflow the scratch buffer, just create a
1206
                // new buffer to hold the split message.
1207
                if c.pa.size > cap(c.scratch)-len(c.argBuf) {
697,664✔
1208
                        lrem := len(buf[c.as:])
35,233✔
1209
                        // Consider it a protocol error when the remaining payload
35,233✔
1210
                        // is larger than the reported size for PUB. It can happen
35,233✔
1211
                        // when processing incomplete messages from rogue clients.
35,233✔
1212
                        if lrem > c.pa.size+LEN_CR_LF {
35,233✔
1213
                                goto parseErr
×
1214
                        }
1215
                        c.msgBuf = make([]byte, lrem, c.pa.size+LEN_CR_LF)
35,233✔
1216
                        copy(c.msgBuf, buf[c.as:])
35,233✔
1217
                } else {
627,198✔
1218
                        c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
627,198✔
1219
                        c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
627,198✔
1220
                }
627,198✔
1221
        }
1222

1223
        return nil
6,619,502✔
1224

6,619,502✔
1225
authErr:
6,619,502✔
1226
        c.authViolation()
12✔
1227
        return ErrAuthentication
12✔
1228

12✔
1229
parseErr:
12✔
1230
        c.sendErr("Unknown Protocol Operation")
53✔
1231
        snip := protoSnippet(i, PROTO_SNIPPET_SIZE, buf)
53✔
1232
        err := fmt.Errorf("%s parser ERROR, state=%d, i=%d: proto='%s...'", c.kindString(), c.state, i, snip)
53✔
1233
        return err
53✔
1234
}
1235

1236
func protoSnippet(start, max int, buf []byte) string {
111✔
1237
        stop := start + max
111✔
1238
        bufSize := len(buf)
111✔
1239
        if start >= bufSize {
114✔
1240
                return `""`
3✔
1241
        }
3✔
1242
        if stop > bufSize {
191✔
1243
                stop = bufSize - 1
83✔
1244
        }
83✔
1245
        return fmt.Sprintf("%q", buf[start:stop])
108✔
1246
}
1247

1248
// Check if the length of buffer `arg` is over the max control line limit `mcl`.
1249
// If so, an error is sent to the client and the connection is closed.
1250
// The error ErrMaxControlLine is returned.
1251
func (c *client) overMaxControlLineLimit(arg []byte, mcl int32) error {
16,089,520✔
1252
        if c.kind != CLIENT {
23,994,029✔
1253
                return nil
7,904,509✔
1254
        }
7,904,509✔
1255
        if len(arg) > int(mcl) {
8,185,014✔
1256
                err := NewErrorCtx(ErrMaxControlLine, "State %d, max_control_line %d, Buffer len %d (snip: %s...)",
3✔
1257
                        c.state, int(mcl), len(c.argBuf), protoSnippet(0, MAX_CONTROL_LINE_SNIPPET_SIZE, arg))
3✔
1258
                c.sendErr(err.Error())
3✔
1259
                c.closeConnection(MaxControlLineExceeded)
3✔
1260
                return err
3✔
1261
        }
3✔
1262
        return nil
8,185,008✔
1263
}
1264

1265
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
1266
// we need to hold onto it into the next read.
1267
func (c *client) clonePubArg(lmsg bool) error {
662,431✔
1268
        // Just copy and re-process original arg buffer.
662,431✔
1269
        c.argBuf = c.scratch[:0]
662,431✔
1270
        c.argBuf = append(c.argBuf, c.pa.arg...)
662,431✔
1271

662,431✔
1272
        switch c.kind {
662,431✔
1273
        case ROUTER, GATEWAY:
621,141✔
1274
                if lmsg {
621,865✔
1275
                        return c.processRoutedOriginClusterMsgArgs(c.argBuf)
724✔
1276
                }
724✔
1277
                if c.pa.hdr < 0 {
1,170,132✔
1278
                        return c.processRoutedMsgArgs(c.argBuf)
549,715✔
1279
                } else {
620,417✔
1280
                        return c.processRoutedHeaderMsgArgs(c.argBuf)
70,702✔
1281
                }
70,702✔
1282
        case LEAF:
2,200✔
1283
                if c.pa.hdr < 0 {
4,310✔
1284
                        return c.processLeafMsgArgs(c.argBuf)
2,110✔
1285
                } else {
2,200✔
1286
                        return c.processLeafHeaderMsgArgs(c.argBuf)
90✔
1287
                }
90✔
1288
        default:
39,090✔
1289
                if c.pa.hdr < 0 {
77,603✔
1290
                        return c.processPub(c.argBuf)
38,513✔
1291
                } else {
39,090✔
1292
                        return c.processHeaderPub(c.argBuf, nil)
577✔
1293
                }
577✔
1294
        }
1295
}
1296

1297
func (ps *parseState) getHeader() http.Header {
842✔
1298
        if ps.header == nil {
1,684✔
1299
                if hdr := ps.pa.hdr; hdr > 0 {
915✔
1300
                        reader := bufio.NewReader(bytes.NewReader(ps.msgBuf[0:hdr]))
73✔
1301
                        tp := textproto.NewReader(reader)
73✔
1302
                        tp.ReadLine() // skip over first line, contains version
73✔
1303
                        if mimeHeader, err := tp.ReadMIMEHeader(); err == nil {
146✔
1304
                                ps.header = http.Header(mimeHeader)
73✔
1305
                        }
73✔
1306
                }
1307
        }
1308
        return ps.header
842✔
1309
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc