• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 20087658921

09 Dec 2025 02:35PM UTC coverage: 84.763% (+0.07%) from 84.691%
20087658921

push

github

web-flow
[FIXED] Filestore compaction issues (#7627)

This PR fixes a couple issues relating to `Compact(seq)`:
1. If an (almost) empty stream was compacted or purged to `seq=2` and
the server was hard killed afterward, no tombstone would have been
written from which this sequence could be recovered.
2. A particular sequence of `SkipMsg(seq)`, `Compact(seq)`,
`SkipMsg(seq+1)` would preserve the block and tombstones after the
compact, but the subsequent `SkipMsg(seq+1)` on the empty block would
see the server starting to overwrite block data at the head of the file.
When read back from disk afterward, that file would appear corrupted and
the server would log: `indexCacheBuf corrupt record state`.
3. Compacting a block when doing so would reclaim over half of the
storage taken up by its file would overwrite the file in place.
This could lose messages if the server was hard killed after the file
truncation, but before it fully wrote the compacted bytes. We now write
the file separately and rename it to overwrite the original file
atomically.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

74120 of 87444 relevant lines covered (84.76%)

359268.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.96
/server/parser.go
1
// Copyright 2012-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "fmt"
20
        "net/http"
21
        "net/textproto"
22
)
23

24
// parserState identifies one state of the protocol parser's state machine.
// Its values are the OP_* / *_ARG / MSG_* constants declared in this file.
type parserState int
25
// parseState carries the parser's position and working buffers. parse reads
// and updates these fields through the client (c.state, c.as, c.argBuf, ...),
// so they persist from one parse call to the next.
type parseState struct {
26
        // state is the current state-machine position; parse switches on it for every byte.
        state   parserState
27
        // op is the first byte of the operation being parsed (assigned in OP_START).
        op      byte
28
        // as is the index in the current input buffer where the argument or payload starts.
        as      int
29
        // drop is the number of trailing bytes excluded from the argument (set to 1 on '\r').
        drop    int
30
        // pa holds the arguments of the publish/message operation in progress.
        pa      pubArg
31
        // argBuf, when non-nil, accumulates argument bytes and is used instead of
        // slicing the argument out of the input buffer.
        argBuf  []byte
32
        // msgBuf, when non-nil, accumulates payload bytes; otherwise the payload is
        // sliced directly from the input buffer once the trailing '\n' is seen.
        msgBuf  []byte
33
        header  http.Header // access via getHeader
34
        // scratch is a fixed MAX_CONTROL_LINE_SIZE-byte array. Its use is outside this
        // view — presumably backing storage for argBuf on split buffers; TODO confirm.
        scratch [MAX_CONTROL_LINE_SIZE]byte
35
}
36

37
// pubArg holds the parsed arguments of a publish or message operation
// (PUB/HPUB/HMSG). After a complete message has been handed to
// processInboundMsg, parse resets most of these fields for the next operation.
type pubArg struct {
38
        arg       []byte
39
        pacache   []byte
40
        origin    []byte
41
        account   []byte
42
        // subject is the subject in effect; after a subject mapping has been
        // applied it is the subject the original was mapped to.
        subject   []byte
43
        deliver   []byte
44
        // mapped is traced as the left-hand side of "MAPPING" entries
        // (mapped -> subject); presumably the original subject — TODO confirm.
        mapped    []byte
45
        reply     []byte
46
        // szb and hdb are presumably the raw byte forms of size and hdr — TODO confirm.
        szb       []byte
47
        hdb       []byte
48
        queues    [][]byte
49
        // size is the number of payload bytes parse expects before the trailing CRLF.
        size      int
50
        // hdr is set to 0 when a header operation (HPUB/HMSG) starts and to -1 for
        // PUB and after each message; parse only initializes message tracing when
        // it is greater than zero.
        hdr       int
51
        psi       []*serviceImport
52
        trace     *msgTrace
53
        delivered bool // Only used for service imports
54
}
55

56
// Parser states.
//
// Each constant names one position in the state machine driven by parse.
// OP_START is the zero value (iota), so it is the state of a fresh parser
// and the state parse returns to after each complete message.
// Keyword states such as OP_P, OP_PU and OP_PUB advance one letter at a
// time (letters are matched case-insensitively in parse), *_SPC states skip
// spaces and tabs before the argument, and *_ARG states collect the argument
// line up to '\n'. MSG_PAYLOAD, MSG_END_R and MSG_END_N consume the payload
// and its closing "\r\n".
//
// The values come from iota, so the declaration order below determines
// each constant's numeric value.
56
const (
58
        OP_START parserState = iota
59
        OP_PLUS
60
        OP_PLUS_O
61
        OP_PLUS_OK
62
        OP_MINUS
63
        OP_MINUS_E
64
        OP_MINUS_ER
65
        OP_MINUS_ERR
66
        OP_MINUS_ERR_SPC
67
        MINUS_ERR_ARG
68
        OP_C
69
        OP_CO
70
        OP_CON
71
        OP_CONN
72
        OP_CONNE
73
        OP_CONNEC
74
        OP_CONNECT
75
        CONNECT_ARG
76
        OP_H
77
        OP_HP
78
        OP_HPU
79
        OP_HPUB
80
        OP_HPUB_SPC
81
        HPUB_ARG
82
        OP_HM
83
        OP_HMS
84
        OP_HMSG
85
        OP_HMSG_SPC
86
        HMSG_ARG
87
        OP_P
88
        OP_PU
89
        OP_PUB
90
        OP_PUB_SPC
91
        PUB_ARG
92
        OP_PI
93
        OP_PIN
94
        OP_PING
95
        OP_PO
96
        OP_PON
97
        OP_PONG
98
        MSG_PAYLOAD
99
        MSG_END_R
100
        MSG_END_N
101
        OP_S
102
        OP_SU
103
        OP_SUB
104
        OP_SUB_SPC
105
        SUB_ARG
106
        OP_A
107
        OP_ASUB
108
        OP_ASUB_SPC
109
        ASUB_ARG
110
        OP_AUSUB
111
        OP_AUSUB_SPC
112
        AUSUB_ARG
113
        OP_L
114
        OP_LS
115
        OP_R
116
        OP_RS
117
        OP_U
118
        OP_UN
119
        OP_UNS
120
        OP_UNSU
121
        OP_UNSUB
122
        OP_UNSUB_SPC
123
        UNSUB_ARG
124
        OP_M
125
        OP_MS
126
        OP_MSG
127
        OP_MSG_SPC
128
        MSG_ARG
129
        OP_I
130
        OP_IN
131
        OP_INF
132
        OP_INFO
133
        INFO_ARG
134
)
135

136
func (c *client) parse(buf []byte) error {
7,183,285✔
137
        // Branch out to mqtt clients. c.mqtt is immutable, but should it become
7,183,285✔
138
        // an issue (say data race detection), we could branch outside in readLoop
7,183,285✔
139
        if c.isMqtt() {
7,185,549✔
140
                return c.mqttParse(buf)
2,264✔
141
        }
2,264✔
142
        var i int
7,181,021✔
143
        var b byte
7,181,021✔
144
        var lmsg bool
7,181,021✔
145

7,181,021✔
146
        // Snapshots
7,181,021✔
147
        c.mu.Lock()
7,181,021✔
148
        // Snapshot and then reset when we receive a
7,181,021✔
149
        // proper CONNECT if needed.
7,181,021✔
150
        authSet := c.awaitingAuth()
7,181,021✔
151
        // Snapshot max control line as well.
7,181,021✔
152
        s, mcl, trace := c.srv, c.mcl, c.trace
7,181,021✔
153
        c.mu.Unlock()
7,181,021✔
154

7,181,021✔
155
        // Move to loop instead of range syntax to allow jumping of i
7,181,021✔
156
        for i = 0; i < len(buf); i++ {
667,091,560✔
157
                b = buf[i]
659,910,539✔
158

659,910,539✔
159
                switch c.state {
659,910,539✔
160
                case OP_START:
18,606,051✔
161
                        c.op = b
18,606,051✔
162
                        if b != 'C' && b != 'c' {
37,168,063✔
163
                                if authSet {
18,562,026✔
164
                                        if s == nil {
14✔
165
                                                goto authErr
×
166
                                        }
167
                                        var ok bool
14✔
168
                                        // Check here for NoAuthUser. If this is set allow non CONNECT protos as our first.
14✔
169
                                        // E.g. telnet proto demos.
14✔
170
                                        if noAuthUser := s.getOpts().NoAuthUser; noAuthUser != _EMPTY_ {
16✔
171
                                                s.mu.Lock()
2✔
172
                                                user, exists := s.users[noAuthUser]
2✔
173
                                                s.mu.Unlock()
2✔
174
                                                if exists {
4✔
175
                                                        c.RegisterUser(user)
2✔
176
                                                        c.mu.Lock()
2✔
177
                                                        c.clearAuthTimer()
2✔
178
                                                        c.flags.set(connectReceived)
2✔
179
                                                        c.mu.Unlock()
2✔
180
                                                        authSet, ok = false, true
2✔
181
                                                }
2✔
182
                                        }
183
                                        if !ok {
26✔
184
                                                goto authErr
12✔
185
                                        }
186
                                }
187
                                // If the connection is a gateway connection, make sure that
188
                                // if this is an inbound, it starts with a CONNECT.
189
                                if c.kind == GATEWAY && !c.gw.outbound && !c.gw.connected {
18,562,000✔
190
                                        // Use auth violation since no CONNECT was sent.
×
191
                                        // It could be a parseErr too.
×
192
                                        goto authErr
×
193
                                }
194
                        }
195
                        switch b {
18,606,039✔
196
                        case 'P', 'p':
9,837,324✔
197
                                c.state = OP_P
9,837,324✔
198
                        case 'H', 'h':
827,528✔
199
                                c.state = OP_H
827,528✔
200
                        case 'S', 's':
135,355✔
201
                                c.state = OP_S
135,355✔
202
                        case 'U', 'u':
13,043✔
203
                                c.state = OP_U
13,043✔
204
                        case 'R', 'r':
7,493,106✔
205
                                if c.kind == CLIENT {
7,493,106✔
206
                                        goto parseErr
×
207
                                } else {
7,493,106✔
208
                                        c.state = OP_R
7,493,106✔
209
                                }
7,493,106✔
210
                        case 'L', 'l':
167,835✔
211
                                if c.kind != LEAF && c.kind != ROUTER {
167,835✔
212
                                        goto parseErr
×
213
                                } else {
167,835✔
214
                                        c.state = OP_L
167,835✔
215
                                }
167,835✔
216
                        case 'A', 'a':
32✔
217
                                if c.kind == CLIENT {
32✔
218
                                        goto parseErr
×
219
                                } else {
32✔
220
                                        c.state = OP_A
32✔
221
                                }
32✔
222
                        case 'C', 'c':
44,039✔
223
                                c.state = OP_C
44,039✔
224
                        case 'I', 'i':
87,623✔
225
                                c.state = OP_I
87,623✔
226
                        case '+':
3✔
227
                                c.state = OP_PLUS
3✔
228
                        case '-':
139✔
229
                                c.state = OP_MINUS
139✔
230
                        default:
12✔
231
                                goto parseErr
12✔
232
                        }
233
                case OP_H:
827,528✔
234
                        switch b {
827,528✔
235
                        case 'P', 'p':
323,013✔
236
                                c.state = OP_HP
323,013✔
237
                        case 'M', 'm':
504,515✔
238
                                c.state = OP_HM
504,515✔
239
                        default:
×
240
                                goto parseErr
×
241
                        }
242
                case OP_HP:
323,012✔
243
                        switch b {
323,012✔
244
                        case 'U', 'u':
323,012✔
245
                                c.state = OP_HPU
323,012✔
246
                        default:
×
247
                                goto parseErr
×
248
                        }
249
                case OP_HPU:
323,012✔
250
                        switch b {
323,012✔
251
                        case 'B', 'b':
323,012✔
252
                                c.state = OP_HPUB
323,012✔
253
                        default:
×
254
                                goto parseErr
×
255
                        }
256
                case OP_HPUB:
323,012✔
257
                        switch b {
323,012✔
258
                        case ' ', '\t':
323,012✔
259
                                c.state = OP_HPUB_SPC
323,012✔
260
                        default:
×
261
                                goto parseErr
×
262
                        }
263
                case OP_HPUB_SPC:
323,012✔
264
                        switch b {
323,012✔
265
                        case ' ', '\t':
×
266
                                continue
×
267
                        default:
323,012✔
268
                                c.pa.hdr = 0
323,012✔
269
                                c.state = HPUB_ARG
323,012✔
270
                                c.as = i
323,012✔
271
                        }
272
                case HPUB_ARG:
11,458,812✔
273
                        switch b {
11,458,812✔
274
                        case '\r':
323,010✔
275
                                c.drop = 1
323,010✔
276
                        case '\n':
323,010✔
277
                                var arg []byte
323,010✔
278
                                if c.argBuf != nil {
323,088✔
279
                                        arg = c.argBuf
78✔
280
                                        c.argBuf = nil
78✔
281
                                } else {
323,010✔
282
                                        arg = buf[c.as : i-c.drop]
322,932✔
283
                                }
322,932✔
284
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
323,010✔
285
                                        return err
×
286
                                }
×
287
                                if trace {
373,096✔
288
                                        c.traceInOp("HPUB", arg)
50,086✔
289
                                }
50,086✔
290
                                var remaining []byte
323,010✔
291
                                if i < len(buf) {
646,020✔
292
                                        remaining = buf[i+1:]
323,010✔
293
                                }
323,010✔
294
                                if err := c.processHeaderPub(arg, remaining); err != nil {
323,014✔
295
                                        return err
4✔
296
                                }
4✔
297

298
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
323,006✔
299
                                // If we don't have a saved buffer then jump ahead with
323,006✔
300
                                // the index. If this overruns what is left we fall out
323,006✔
301
                                // and process split buffer.
323,006✔
302
                                if c.msgBuf == nil {
646,012✔
303
                                        i = c.as + c.pa.size - LEN_CR_LF
323,006✔
304
                                }
323,006✔
305
                        default:
10,812,792✔
306
                                if c.argBuf != nil {
10,813,790✔
307
                                        c.argBuf = append(c.argBuf, b)
998✔
308
                                }
998✔
309
                        }
310
                case OP_HM:
504,515✔
311
                        switch b {
504,515✔
312
                        case 'S', 's':
504,515✔
313
                                c.state = OP_HMS
504,515✔
314
                        default:
×
315
                                goto parseErr
×
316
                        }
317
                case OP_HMS:
504,515✔
318
                        switch b {
504,515✔
319
                        case 'G', 'g':
504,515✔
320
                                c.state = OP_HMSG
504,515✔
321
                        default:
×
322
                                goto parseErr
×
323
                        }
324
                case OP_HMSG:
504,515✔
325
                        switch b {
504,515✔
326
                        case ' ', '\t':
504,515✔
327
                                c.state = OP_HMSG_SPC
504,515✔
328
                        default:
×
329
                                goto parseErr
×
330
                        }
331
                case OP_HMSG_SPC:
504,515✔
332
                        switch b {
504,515✔
333
                        case ' ', '\t':
×
334
                                continue
×
335
                        default:
504,515✔
336
                                c.pa.hdr = 0
504,515✔
337
                                c.state = HMSG_ARG
504,515✔
338
                                c.as = i
504,515✔
339
                        }
340
                case HMSG_ARG:
38,404,755✔
341
                        switch b {
38,404,755✔
342
                        case '\r':
504,515✔
343
                                c.drop = 1
504,515✔
344
                        case '\n':
504,515✔
345
                                var arg []byte
504,515✔
346
                                if c.argBuf != nil {
507,421✔
347
                                        arg = c.argBuf
2,906✔
348
                                        c.argBuf = nil
2,906✔
349
                                } else {
504,515✔
350
                                        arg = buf[c.as : i-c.drop]
501,609✔
351
                                }
501,609✔
352
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
504,515✔
353
                                        return err
×
354
                                }
×
355
                                var err error
504,515✔
356
                                if c.kind == ROUTER || c.kind == GATEWAY {
1,008,632✔
357
                                        if trace {
506,267✔
358
                                                c.traceInOp("HMSG", arg)
2,150✔
359
                                        }
2,150✔
360
                                        err = c.processRoutedHeaderMsgArgs(arg)
504,117✔
361
                                } else if c.kind == LEAF {
796✔
362
                                        if trace {
549✔
363
                                                c.traceInOp("HMSG", arg)
151✔
364
                                        }
151✔
365
                                        err = c.processLeafHeaderMsgArgs(arg)
398✔
366
                                }
367
                                if err != nil {
504,516✔
368
                                        return err
1✔
369
                                }
1✔
370
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
504,514✔
371

504,514✔
372
                                // jump ahead with the index. If this overruns
504,514✔
373
                                // what is left we fall out and process split
504,514✔
374
                                // buffer.
504,514✔
375
                                i = c.as + c.pa.size - LEN_CR_LF
504,514✔
376
                        default:
37,395,725✔
377
                                if c.argBuf != nil {
37,451,213✔
378
                                        c.argBuf = append(c.argBuf, b)
55,488✔
379
                                }
55,488✔
380
                        }
381
                case OP_P:
9,837,324✔
382
                        switch b {
9,837,324✔
383
                        case 'U', 'u':
9,769,460✔
384
                                c.state = OP_PU
9,769,460✔
385
                        case 'I', 'i':
39,990✔
386
                                c.state = OP_PI
39,990✔
387
                        case 'O', 'o':
27,873✔
388
                                c.state = OP_PO
27,873✔
389
                        default:
1✔
390
                                goto parseErr
1✔
391
                        }
392
                case OP_PU:
9,769,460✔
393
                        switch b {
9,769,460✔
394
                        case 'B', 'b':
9,769,459✔
395
                                c.state = OP_PUB
9,769,459✔
396
                        default:
1✔
397
                                goto parseErr
1✔
398
                        }
399
                case OP_PUB:
9,769,459✔
400
                        switch b {
9,769,459✔
401
                        case ' ', '\t':
9,769,458✔
402
                                c.state = OP_PUB_SPC
9,769,458✔
403
                        default:
1✔
404
                                goto parseErr
1✔
405
                        }
406
                case OP_PUB_SPC:
9,769,456✔
407
                        switch b {
9,769,456✔
408
                        case ' ', '\t':
1✔
409
                                continue
1✔
410
                        default:
9,769,455✔
411
                                c.pa.hdr = -1
9,769,455✔
412
                                c.state = PUB_ARG
9,769,455✔
413
                                c.as = i
9,769,455✔
414
                        }
415
                case PUB_ARG:
134,913,607✔
416
                        switch b {
134,913,607✔
417
                        case '\r':
9,769,442✔
418
                                c.drop = 1
9,769,442✔
419
                        case '\n':
9,769,442✔
420
                                var arg []byte
9,769,442✔
421
                                if c.argBuf != nil {
9,797,885✔
422
                                        arg = c.argBuf
28,443✔
423
                                        c.argBuf = nil
28,443✔
424
                                } else {
9,769,442✔
425
                                        arg = buf[c.as : i-c.drop]
9,740,999✔
426
                                }
9,740,999✔
427
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
9,769,443✔
428
                                        return err
1✔
429
                                }
1✔
430
                                if trace {
9,854,033✔
431
                                        c.traceInOp("PUB", arg)
84,592✔
432
                                }
84,592✔
433
                                if err := c.processPub(arg); err != nil {
9,769,452✔
434
                                        return err
11✔
435
                                }
11✔
436

437
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
9,769,430✔
438
                                // If we don't have a saved buffer then jump ahead with
9,769,430✔
439
                                // the index. If this overruns what is left we fall out
9,769,430✔
440
                                // and process split buffer.
9,769,430✔
441
                                if c.msgBuf == nil {
19,538,860✔
442
                                        i = c.as + c.pa.size - LEN_CR_LF
9,769,430✔
443
                                }
9,769,430✔
444
                        default:
115,374,723✔
445
                                if c.argBuf != nil {
115,901,175✔
446
                                        c.argBuf = append(c.argBuf, b)
526,452✔
447
                                }
526,452✔
448
                        }
449
                case MSG_PAYLOAD:
17,289,824✔
450
                        if c.msgBuf != nil {
18,278,840✔
451
                                // copy as much as we can to the buffer and skip ahead.
989,016✔
452
                                toCopy := c.pa.size - len(c.msgBuf)
989,016✔
453
                                avail := len(buf) - i
989,016✔
454
                                if avail < toCopy {
1,269,694✔
455
                                        toCopy = avail
280,678✔
456
                                }
280,678✔
457
                                if toCopy > 0 {
1,978,032✔
458
                                        start := len(c.msgBuf)
989,016✔
459
                                        // This is needed for copy to work.
989,016✔
460
                                        c.msgBuf = c.msgBuf[:start+toCopy]
989,016✔
461
                                        copy(c.msgBuf[start:], buf[i:i+toCopy])
989,016✔
462
                                        // Update our index
989,016✔
463
                                        i = (i + toCopy) - 1
989,016✔
464
                                } else {
989,016✔
465
                                        // Fall back to append if needed.
×
466
                                        c.msgBuf = append(c.msgBuf, b)
×
467
                                }
×
468
                                if len(c.msgBuf) >= c.pa.size {
1,697,354✔
469
                                        c.state = MSG_END_R
708,338✔
470
                                }
708,338✔
471
                        } else if i-c.as+1 >= c.pa.size {
32,601,616✔
472
                                c.state = MSG_END_R
16,300,808✔
473
                        }
16,300,808✔
474
                case MSG_END_R:
17,009,146✔
475
                        if b != '\r' {
17,009,149✔
476
                                goto parseErr
3✔
477
                        }
478
                        if c.msgBuf != nil {
17,730,697✔
479
                                c.msgBuf = append(c.msgBuf, b)
721,554✔
480
                        }
721,554✔
481
                        c.state = MSG_END_N
17,009,143✔
482
                case MSG_END_N:
17,009,130✔
483
                        if b != '\n' {
17,009,131✔
484
                                goto parseErr
1✔
485
                        }
486
                        if c.msgBuf != nil {
17,732,693✔
487
                                c.msgBuf = append(c.msgBuf, b)
723,564✔
488
                        } else {
17,009,129✔
489
                                c.msgBuf = buf[c.as : i+1]
16,285,565✔
490
                        }
16,285,565✔
491

492
                        var mt *msgTrace
17,009,129✔
493
                        if c.pa.hdr > 0 {
17,837,083✔
494
                                mt = c.initMsgTrace()
827,954✔
495
                        }
827,954✔
496
                        // Check for mappings.
497
                        if (c.kind == CLIENT || c.kind == LEAF) && c.in.flags.isSet(hasMappings) {
17,041,823✔
498
                                changed := c.selectMappedSubject()
32,694✔
499
                                if changed {
45,286✔
500
                                        if trace {
12,667✔
501
                                                c.traceInOp("MAPPING", []byte(fmt.Sprintf("%s -> %s", c.pa.mapped, c.pa.subject)))
75✔
502
                                        }
75✔
503
                                        // c.pa.subject is the subject the original is now mapped to.
504
                                        mt.addSubjectMappingEvent(c.pa.subject)
12,592✔
505
                                }
506
                        }
507
                        if trace {
17,152,977✔
508
                                c.traceMsg(c.msgBuf)
143,848✔
509
                        }
143,848✔
510

511
                        c.processInboundMsg(c.msgBuf)
17,009,129✔
512

17,009,129✔
513
                        mt.sendEvent()
17,009,129✔
514
                        c.argBuf, c.msgBuf, c.header = nil, nil, nil
17,009,129✔
515
                        c.drop, c.as, c.state = 0, i+1, OP_START
17,009,129✔
516
                        // Drop all pub args
17,009,129✔
517
                        c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject, c.pa.mapped = nil, nil, nil, nil, nil, nil
17,009,129✔
518
                        c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil
17,009,129✔
519
                        c.pa.trace = nil
17,009,129✔
520
                        c.pa.delivered = false
17,009,129✔
521
                        lmsg = false
17,009,129✔
522
                case OP_A:
32✔
523
                        switch b {
32✔
524
                        case '+':
4✔
525
                                c.state = OP_ASUB
4✔
526
                        case '-', 'u':
28✔
527
                                c.state = OP_AUSUB
28✔
528
                        default:
×
529
                                goto parseErr
×
530
                        }
531
                case OP_ASUB:
4✔
532
                        switch b {
4✔
533
                        case ' ', '\t':
4✔
534
                                c.state = OP_ASUB_SPC
4✔
535
                        default:
×
536
                                goto parseErr
×
537
                        }
538
                case OP_ASUB_SPC:
4✔
539
                        switch b {
4✔
540
                        case ' ', '\t':
×
541
                                continue
×
542
                        default:
4✔
543
                                c.state = ASUB_ARG
4✔
544
                                c.as = i
4✔
545
                        }
546
                case ASUB_ARG:
14✔
547
                        switch b {
14✔
548
                        case '\r':
4✔
549
                                c.drop = 1
4✔
550
                        case '\n':
4✔
551
                                var arg []byte
4✔
552
                                if c.argBuf != nil {
4✔
553
                                        arg = c.argBuf
×
554
                                        c.argBuf = nil
×
555
                                } else {
4✔
556
                                        arg = buf[c.as : i-c.drop]
4✔
557
                                }
4✔
558
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
4✔
559
                                        return err
×
560
                                }
×
561
                                if trace {
4✔
562
                                        c.traceInOp("A+", arg)
×
563
                                }
×
564
                                if err := c.processAccountSub(arg); err != nil {
4✔
565
                                        return err
×
566
                                }
×
567
                                c.drop, c.as, c.state = 0, i+1, OP_START
4✔
568
                        default:
6✔
569
                                if c.argBuf != nil {
6✔
570
                                        c.argBuf = append(c.argBuf, b)
×
571
                                }
×
572
                        }
573
                case OP_AUSUB:
28✔
574
                        switch b {
28✔
575
                        case ' ', '\t':
28✔
576
                                c.state = OP_AUSUB_SPC
28✔
577
                        default:
×
578
                                goto parseErr
×
579
                        }
580
                case OP_AUSUB_SPC:
28✔
581
                        switch b {
28✔
582
                        case ' ', '\t':
×
583
                                continue
×
584
                        default:
28✔
585
                                c.state = AUSUB_ARG
28✔
586
                                c.as = i
28✔
587
                        }
588
                case AUSUB_ARG:
142✔
589
                        switch b {
142✔
590
                        case '\r':
28✔
591
                                c.drop = 1
28✔
592
                        case '\n':
28✔
593
                                var arg []byte
28✔
594
                                if c.argBuf != nil {
28✔
595
                                        arg = c.argBuf
×
596
                                        c.argBuf = nil
×
597
                                } else {
28✔
598
                                        arg = buf[c.as : i-c.drop]
28✔
599
                                }
28✔
600
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
28✔
601
                                        return err
×
602
                                }
×
603
                                if trace {
45✔
604
                                        c.traceInOp("A-", arg)
17✔
605
                                }
17✔
606
                                c.processAccountUnsub(arg)
28✔
607
                                c.drop, c.as, c.state = 0, i+1, OP_START
28✔
608
                        default:
86✔
609
                                if c.argBuf != nil {
86✔
610
                                        c.argBuf = append(c.argBuf, b)
×
611
                                }
×
612
                        }
613
                case OP_S:
135,355✔
614
                        switch b {
135,355✔
615
                        case 'U', 'u':
135,354✔
616
                                c.state = OP_SU
135,354✔
617
                        default:
1✔
618
                                goto parseErr
1✔
619
                        }
620
                case OP_SU:
135,354✔
621
                        switch b {
135,354✔
622
                        case 'B', 'b':
135,353✔
623
                                c.state = OP_SUB
135,353✔
624
                        default:
1✔
625
                                goto parseErr
1✔
626
                        }
627
                case OP_SUB:
1,175,835✔
628
                        switch b {
1,175,835✔
629
                        case ' ', '\t':
1,175,833✔
630
                                c.state = OP_SUB_SPC
1,175,833✔
631
                        default:
2✔
632
                                goto parseErr
2✔
633
                        }
634
                case OP_SUB_SPC:
1,175,833✔
635
                        switch b {
1,175,833✔
636
                        case ' ', '\t':
1✔
637
                                continue
1✔
638
                        default:
1,175,832✔
639
                                c.state = SUB_ARG
1,175,832✔
640
                                c.as = i
1,175,832✔
641
                        }
642
                case SUB_ARG:
44,035,813✔
643
                        switch b {
44,035,813✔
644
                        case '\r':
1,175,821✔
645
                                c.drop = 1
1,175,821✔
646
                        case '\n':
1,175,820✔
647
                                var arg []byte
1,175,820✔
648
                                if c.argBuf != nil {
1,203,204✔
649
                                        arg = c.argBuf
27,384✔
650
                                        c.argBuf = nil
27,384✔
651
                                } else {
1,175,820✔
652
                                        arg = buf[c.as : i-c.drop]
1,148,436✔
653
                                }
1,148,436✔
654
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
1,175,820✔
655
                                        return err
×
656
                                }
×
657
                                var err error
1,175,820✔
658

1,175,820✔
659
                                switch c.kind {
1,175,820✔
660
                                case CLIENT:
135,350✔
661
                                        if trace {
145,912✔
662
                                                c.traceInOp("SUB", arg)
10,562✔
663
                                        }
10,562✔
664
                                        err = c.parseSub(arg, false)
135,350✔
665
                                case ROUTER:
846,437✔
666
                                        switch c.op {
846,437✔
667
                                        case 'R', 'r':
834,431✔
668
                                                if trace {
837,297✔
669
                                                        c.traceInOp("RS+", arg)
2,866✔
670
                                                }
2,866✔
671
                                                err = c.processRemoteSub(arg, false)
834,431✔
672
                                        case 'L', 'l':
12,006✔
673
                                                if trace {
12,200✔
674
                                                        c.traceInOp("LS+", arg)
194✔
675
                                                }
194✔
676
                                                err = c.processRemoteSub(arg, true)
12,006✔
677
                                        }
678
                                case GATEWAY:
161,132✔
679
                                        if trace {
161,547✔
680
                                                c.traceInOp("RS+", arg)
415✔
681
                                        }
415✔
682
                                        err = c.processGatewayRSub(arg)
161,132✔
683
                                case LEAF:
32,901✔
684
                                        if trace {
33,022✔
685
                                                c.traceInOp("LS+", arg)
121✔
686
                                        }
121✔
687
                                        err = c.processLeafSub(arg)
32,901✔
688
                                }
689
                                if err != nil {
1,175,824✔
690
                                        return err
4✔
691
                                }
4✔
692
                                c.drop, c.as, c.state = 0, i+1, OP_START
1,175,816✔
693
                        default:
41,684,172✔
694
                                if c.argBuf != nil {
42,366,490✔
695
                                        c.argBuf = append(c.argBuf, b)
682,318✔
696
                                }
682,318✔
697
                        }
698
                case OP_L:
167,835✔
699
                        switch b {
167,835✔
700
                        case 'S', 's':
59,723✔
701
                                c.state = OP_LS
59,723✔
702
                        case 'M', 'm':
108,112✔
703
                                c.state = OP_M
108,112✔
704
                        default:
×
705
                                goto parseErr
×
706
                        }
707
                case OP_LS:
59,723✔
708
                        switch b {
59,723✔
709
                        case '+':
44,907✔
710
                                c.state = OP_SUB
44,907✔
711
                        case '-':
14,816✔
712
                                c.state = OP_UNSUB
14,816✔
713
                        default:
×
714
                                goto parseErr
×
715
                        }
716
                case OP_R:
7,493,105✔
717
                        switch b {
7,493,105✔
718
                        case 'S', 's':
1,188,987✔
719
                                c.state = OP_RS
1,188,987✔
720
                        case 'M', 'm':
6,304,118✔
721
                                c.state = OP_M
6,304,118✔
722
                        default:
×
723
                                goto parseErr
×
724
                        }
725
                case OP_RS:
1,188,987✔
726
                        switch b {
1,188,987✔
727
                        case '+':
995,575✔
728
                                c.state = OP_SUB
995,575✔
729
                        case '-':
193,412✔
730
                                c.state = OP_UNSUB
193,412✔
731
                        default:
×
732
                                goto parseErr
×
733
                        }
734
                case OP_U:
13,043✔
735
                        switch b {
13,043✔
736
                        case 'N', 'n':
13,042✔
737
                                c.state = OP_UN
13,042✔
738
                        default:
1✔
739
                                goto parseErr
1✔
740
                        }
741
                case OP_UN:
13,042✔
742
                        switch b {
13,042✔
743
                        case 'S', 's':
13,041✔
744
                                c.state = OP_UNS
13,041✔
745
                        default:
1✔
746
                                goto parseErr
1✔
747
                        }
748
                case OP_UNS:
13,041✔
749
                        switch b {
13,041✔
750
                        case 'U', 'u':
13,040✔
751
                                c.state = OP_UNSU
13,040✔
752
                        default:
1✔
753
                                goto parseErr
1✔
754
                        }
755
                case OP_UNSU:
13,039✔
756
                        switch b {
13,039✔
757
                        case 'B', 'b':
13,038✔
758
                                c.state = OP_UNSUB
13,038✔
759
                        default:
1✔
760
                                goto parseErr
1✔
761
                        }
762
                case OP_UNSUB:
221,266✔
763
                        switch b {
221,266✔
764
                        case ' ', '\t':
221,260✔
765
                                c.state = OP_UNSUB_SPC
221,260✔
766
                        default:
6✔
767
                                goto parseErr
6✔
768
                        }
769
                case OP_UNSUB_SPC:
221,277✔
770
                        switch b {
221,277✔
771
                        case ' ', '\t':
17✔
772
                                continue
17✔
773
                        default:
221,260✔
774
                                c.state = UNSUB_ARG
221,260✔
775
                                c.as = i
221,260✔
776
                        }
777
                case UNSUB_ARG:
6,149,072✔
778
                        switch b {
6,149,072✔
779
                        case '\r':
221,257✔
780
                                c.drop = 1
221,257✔
781
                        case '\n':
221,259✔
782
                                var arg []byte
221,259✔
783
                                if c.argBuf != nil {
227,869✔
784
                                        arg = c.argBuf
6,610✔
785
                                        c.argBuf = nil
6,610✔
786
                                } else {
221,259✔
787
                                        arg = buf[c.as : i-c.drop]
214,649✔
788
                                }
214,649✔
789
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
221,259✔
790
                                        return err
×
791
                                }
×
792
                                var err error
221,259✔
793

221,259✔
794
                                switch c.kind {
221,259✔
795
                                case CLIENT:
13,032✔
796
                                        if trace {
23,344✔
797
                                                c.traceInOp("UNSUB", arg)
10,312✔
798
                                        }
10,312✔
799
                                        err = c.processUnsub(arg)
13,032✔
800
                                case ROUTER:
163,751✔
801
                                        if trace && c.srv != nil {
164,118✔
802
                                                switch c.op {
367✔
803
                                                case 'R', 'r':
173✔
804
                                                        c.traceInOp("RS-", arg)
173✔
805
                                                case 'L', 'l':
194✔
806
                                                        c.traceInOp("LS-", arg)
194✔
807
                                                }
808
                                        }
809
                                        leafUnsub := c.op == 'L' || c.op == 'l'
163,751✔
810
                                        err = c.processRemoteUnsub(arg, leafUnsub)
163,751✔
811
                                case GATEWAY:
41,051✔
812
                                        if trace {
42,239✔
813
                                                c.traceInOp("RS-", arg)
1,188✔
814
                                        }
1,188✔
815
                                        err = c.processGatewayRUnsub(arg)
41,051✔
816
                                case LEAF:
3,425✔
817
                                        if trace {
3,427✔
818
                                                c.traceInOp("LS-", arg)
2✔
819
                                        }
2✔
820
                                        err = c.processLeafUnsub(arg)
3,425✔
821
                                }
822
                                if err != nil {
221,261✔
823
                                        return err
2✔
824
                                }
2✔
825
                                c.drop, c.as, c.state = 0, i+1, OP_START
221,257✔
826
                        default:
5,706,556✔
827
                                if c.argBuf != nil {
5,831,276✔
828
                                        c.argBuf = append(c.argBuf, b)
124,720✔
829
                                }
124,720✔
830
                        }
831
                case OP_PI:
39,990✔
832
                        switch b {
39,990✔
833
                        case 'N', 'n':
39,989✔
834
                                c.state = OP_PIN
39,989✔
835
                        default:
1✔
836
                                goto parseErr
1✔
837
                        }
838
                case OP_PIN:
39,989✔
839
                        switch b {
39,989✔
840
                        case 'G', 'g':
39,988✔
841
                                c.state = OP_PING
39,988✔
842
                        default:
1✔
843
                                goto parseErr
1✔
844
                        }
845
                case OP_PING:
79,981✔
846
                        switch b {
79,981✔
847
                        case '\n':
39,987✔
848
                                if trace {
40,623✔
849
                                        c.traceInOp("PING", nil)
636✔
850
                                }
636✔
851
                                c.processPing()
39,987✔
852
                                c.drop, c.state = 0, OP_START
39,987✔
853
                        }
854
                case OP_PO:
27,873✔
855
                        switch b {
27,873✔
856
                        case 'N', 'n':
27,872✔
857
                                c.state = OP_PON
27,872✔
858
                        default:
1✔
859
                                goto parseErr
1✔
860
                        }
861
                case OP_PON:
27,872✔
862
                        switch b {
27,872✔
863
                        case 'G', 'g':
27,871✔
864
                                c.state = OP_PONG
27,871✔
865
                        default:
1✔
866
                                goto parseErr
1✔
867
                        }
868
                case OP_PONG:
55,747✔
869
                        switch b {
55,747✔
870
                        case '\n':
27,870✔
871
                                if trace {
27,920✔
872
                                        c.traceInOp("PONG", nil)
50✔
873
                                }
50✔
874
                                c.processPong()
27,870✔
875
                                c.drop, c.state = 0, OP_START
27,870✔
876
                        }
877
                case OP_C:
44,039✔
878
                        switch b {
44,039✔
879
                        case 'O', 'o':
44,038✔
880
                                c.state = OP_CO
44,038✔
881
                        default:
1✔
882
                                goto parseErr
1✔
883
                        }
884
                case OP_CO:
44,038✔
885
                        switch b {
44,038✔
886
                        case 'N', 'n':
44,037✔
887
                                c.state = OP_CON
44,037✔
888
                        default:
1✔
889
                                goto parseErr
1✔
890
                        }
891
                case OP_CON:
44,037✔
892
                        switch b {
44,037✔
893
                        case 'N', 'n':
44,036✔
894
                                c.state = OP_CONN
44,036✔
895
                        default:
1✔
896
                                goto parseErr
1✔
897
                        }
898
                case OP_CONN:
44,036✔
899
                        switch b {
44,036✔
900
                        case 'E', 'e':
44,035✔
901
                                c.state = OP_CONNE
44,035✔
902
                        default:
1✔
903
                                goto parseErr
1✔
904
                        }
905
                case OP_CONNE:
44,035✔
906
                        switch b {
44,035✔
907
                        case 'C', 'c':
44,034✔
908
                                c.state = OP_CONNEC
44,034✔
909
                        default:
1✔
910
                                goto parseErr
1✔
911
                        }
912
                case OP_CONNEC:
44,034✔
913
                        switch b {
44,034✔
914
                        case 'T', 't':
44,032✔
915
                                c.state = OP_CONNECT
44,032✔
916
                        default:
2✔
917
                                goto parseErr
2✔
918
                        }
919
                case OP_CONNECT:
88,064✔
920
                        switch b {
88,064✔
921
                        case ' ', '\t':
44,032✔
922
                                continue
44,032✔
923
                        default:
44,032✔
924
                                c.state = CONNECT_ARG
44,032✔
925
                                c.as = i
44,032✔
926
                        }
927
                case CONNECT_ARG:
9,321,535✔
928
                        switch b {
9,321,535✔
929
                        case '\r':
44,031✔
930
                                c.drop = 1
44,031✔
931
                        case '\n':
44,032✔
932
                                var arg []byte
44,032✔
933
                                if c.argBuf != nil {
46,459✔
934
                                        arg = c.argBuf
2,427✔
935
                                        c.argBuf = nil
2,427✔
936
                                } else {
44,032✔
937
                                        arg = buf[c.as : i-c.drop]
41,605✔
938
                                }
41,605✔
939
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
44,032✔
940
                                        return err
×
941
                                }
×
942
                                if trace {
44,720✔
943
                                        c.traceInOp("CONNECT", removeSecretsFromTrace(arg))
688✔
944
                                }
688✔
945
                                if err := c.processConnect(arg); err != nil {
44,315✔
946
                                        return err
283✔
947
                                }
283✔
948
                                c.drop, c.state = 0, OP_START
43,749✔
949
                                // Reset notion on authSet
43,749✔
950
                                c.mu.Lock()
43,749✔
951
                                authSet = c.awaitingAuth()
43,749✔
952
                                c.mu.Unlock()
43,749✔
953
                        default:
9,233,472✔
954
                                if c.argBuf != nil {
9,990,471✔
955
                                        c.argBuf = append(c.argBuf, b)
756,999✔
956
                                }
756,999✔
957
                        }
958
                case OP_M:
6,412,230✔
959
                        switch b {
6,412,230✔
960
                        case 'S', 's':
6,412,230✔
961
                                c.state = OP_MS
6,412,230✔
962
                        default:
×
963
                                goto parseErr
×
964
                        }
965
                case OP_MS:
6,412,230✔
966
                        switch b {
6,412,230✔
967
                        case 'G', 'g':
6,412,230✔
968
                                c.state = OP_MSG
6,412,230✔
969
                        default:
×
970
                                goto parseErr
×
971
                        }
972
                case OP_MSG:
6,412,230✔
973
                        switch b {
6,412,230✔
974
                        case ' ', '\t':
6,412,230✔
975
                                c.state = OP_MSG_SPC
6,412,230✔
976
                        default:
×
977
                                goto parseErr
×
978
                        }
979
                case OP_MSG_SPC:
6,412,230✔
980
                        switch b {
6,412,230✔
981
                        case ' ', '\t':
×
982
                                continue
×
983
                        default:
6,412,230✔
984
                                c.pa.hdr = -1
6,412,230✔
985
                                c.state = MSG_ARG
6,412,230✔
986
                                c.as = i
6,412,230✔
987
                        }
988
                case MSG_ARG:
230,540,497✔
989
                        switch b {
230,540,497✔
990
                        case '\r':
6,412,227✔
991
                                c.drop = 1
6,412,227✔
992
                        case '\n':
6,412,227✔
993
                                var arg []byte
6,412,227✔
994
                                if c.argBuf != nil {
6,601,502✔
995
                                        arg = c.argBuf
189,275✔
996
                                        c.argBuf = nil
189,275✔
997
                                } else {
6,412,227✔
998
                                        arg = buf[c.as : i-c.drop]
6,222,952✔
999
                                }
6,222,952✔
1000
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
6,412,227✔
1001
                                        return err
×
1002
                                }
×
1003
                                var err error
6,412,227✔
1004
                                if c.kind == ROUTER || c.kind == GATEWAY {
12,723,364✔
1005
                                        switch c.op {
6,311,137✔
1006
                                        case 'R', 'r':
6,304,116✔
1007
                                                if trace {
6,310,716✔
1008
                                                        c.traceInOp("RMSG", arg)
6,600✔
1009
                                                }
6,600✔
1010
                                                err = c.processRoutedMsgArgs(arg)
6,304,116✔
1011
                                        case 'L', 'l':
7,021✔
1012
                                                if trace {
7,276✔
1013
                                                        c.traceInOp("LMSG", arg)
255✔
1014
                                                }
255✔
1015
                                                lmsg = true
7,021✔
1016
                                                err = c.processRoutedOriginClusterMsgArgs(arg)
7,021✔
1017
                                        }
1018
                                } else if c.kind == LEAF {
202,180✔
1019
                                        if trace {
101,106✔
1020
                                                c.traceInOp("LMSG", arg)
16✔
1021
                                        }
16✔
1022
                                        err = c.processLeafMsgArgs(arg)
101,090✔
1023
                                }
1024
                                if err != nil {
6,412,228✔
1025
                                        return err
1✔
1026
                                }
1✔
1027
                                c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
6,412,226✔
1028

6,412,226✔
1029
                                // jump ahead with the index. If this overruns
6,412,226✔
1030
                                // what is left we fall out and process split
6,412,226✔
1031
                                // buffer.
6,412,226✔
1032
                                i = c.as + c.pa.size - LEN_CR_LF
6,412,226✔
1033
                        default:
217,716,043✔
1034
                                if c.argBuf != nil {
219,839,159✔
1035
                                        c.argBuf = append(c.argBuf, b)
2,123,116✔
1036
                                }
2,123,116✔
1037
                        }
1038
                case OP_I:
87,623✔
1039
                        switch b {
87,623✔
1040
                        case 'N', 'n':
87,622✔
1041
                                c.state = OP_IN
87,622✔
1042
                        default:
1✔
1043
                                goto parseErr
1✔
1044
                        }
1045
                case OP_IN:
87,622✔
1046
                        switch b {
87,622✔
1047
                        case 'F', 'f':
87,621✔
1048
                                c.state = OP_INF
87,621✔
1049
                        default:
1✔
1050
                                goto parseErr
1✔
1051
                        }
1052
                case OP_INF:
87,621✔
1053
                        switch b {
87,621✔
1054
                        case 'O', 'o':
87,620✔
1055
                                c.state = OP_INFO
87,620✔
1056
                        default:
1✔
1057
                                goto parseErr
1✔
1058
                        }
1059
                case OP_INFO:
175,241✔
1060
                        switch b {
175,241✔
1061
                        case ' ', '\t':
87,621✔
1062
                                continue
87,621✔
1063
                        default:
87,620✔
1064
                                c.state = INFO_ARG
87,620✔
1065
                                c.as = i
87,620✔
1066
                        }
1067
                case INFO_ARG:
33,144,199✔
1068
                        switch b {
33,144,199✔
1069
                        case '\r':
87,618✔
1070
                                c.drop = 1
87,618✔
1071
                        case '\n':
87,619✔
1072
                                var arg []byte
87,619✔
1073
                                if c.argBuf != nil {
95,934✔
1074
                                        arg = c.argBuf
8,315✔
1075
                                        c.argBuf = nil
8,315✔
1076
                                } else {
87,619✔
1077
                                        arg = buf[c.as : i-c.drop]
79,304✔
1078
                                }
79,304✔
1079
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
87,619✔
1080
                                        return err
×
1081
                                }
×
1082
                                if err := c.processInfo(arg); err != nil {
87,620✔
1083
                                        return err
1✔
1084
                                }
1✔
1085
                                c.drop, c.as, c.state = 0, i+1, OP_START
87,618✔
1086
                        default:
32,968,962✔
1087
                                if c.argBuf != nil {
33,916,403✔
1088
                                        c.argBuf = append(c.argBuf, b)
947,441✔
1089
                                }
947,441✔
1090
                        }
1091
                case OP_PLUS:
3✔
1092
                        switch b {
3✔
1093
                        case 'O', 'o':
2✔
1094
                                c.state = OP_PLUS_O
2✔
1095
                        default:
1✔
1096
                                goto parseErr
1✔
1097
                        }
1098
                case OP_PLUS_O:
2✔
1099
                        switch b {
2✔
1100
                        case 'K', 'k':
1✔
1101
                                c.state = OP_PLUS_OK
1✔
1102
                        default:
1✔
1103
                                goto parseErr
1✔
1104
                        }
1105
                case OP_PLUS_OK:
2✔
1106
                        switch b {
2✔
1107
                        case '\n':
1✔
1108
                                c.drop, c.state = 0, OP_START
1✔
1109
                        }
1110
                case OP_MINUS:
139✔
1111
                        switch b {
139✔
1112
                        case 'E', 'e':
138✔
1113
                                c.state = OP_MINUS_E
138✔
1114
                        default:
1✔
1115
                                goto parseErr
1✔
1116
                        }
1117
                case OP_MINUS_E:
138✔
1118
                        switch b {
138✔
1119
                        case 'R', 'r':
137✔
1120
                                c.state = OP_MINUS_ER
137✔
1121
                        default:
1✔
1122
                                goto parseErr
1✔
1123
                        }
1124
                case OP_MINUS_ER:
137✔
1125
                        switch b {
137✔
1126
                        case 'R', 'r':
136✔
1127
                                c.state = OP_MINUS_ERR
136✔
1128
                        default:
1✔
1129
                                goto parseErr
1✔
1130
                        }
1131
                case OP_MINUS_ERR:
136✔
1132
                        switch b {
136✔
1133
                        case ' ', '\t':
135✔
1134
                                c.state = OP_MINUS_ERR_SPC
135✔
1135
                        default:
1✔
1136
                                goto parseErr
1✔
1137
                        }
1138
                case OP_MINUS_ERR_SPC:
135✔
1139
                        switch b {
135✔
1140
                        case ' ', '\t':
×
1141
                                continue
×
1142
                        default:
135✔
1143
                                c.state = MINUS_ERR_ARG
135✔
1144
                                c.as = i
135✔
1145
                        }
1146
                case MINUS_ERR_ARG:
6,322✔
1147
                        switch b {
6,322✔
1148
                        case '\r':
135✔
1149
                                c.drop = 1
135✔
1150
                        case '\n':
135✔
1151
                                var arg []byte
135✔
1152
                                if c.argBuf != nil {
136✔
1153
                                        arg = c.argBuf
1✔
1154
                                        c.argBuf = nil
1✔
1155
                                } else {
135✔
1156
                                        arg = buf[c.as : i-c.drop]
134✔
1157
                                }
134✔
1158
                                if err := c.overMaxControlLineLimit(arg, mcl); err != nil {
135✔
1159
                                        return err
×
1160
                                }
×
1161
                                c.processErr(string(arg))
135✔
1162
                                c.drop, c.as, c.state = 0, i+1, OP_START
135✔
1163
                        default:
6,052✔
1164
                                if c.argBuf != nil {
6,052✔
1165
                                        c.argBuf = append(c.argBuf, b)
×
1166
                                }
×
1167
                        }
1168
                default:
×
1169
                        goto parseErr
×
1170
                }
1171
        }
1172

1173
        // Check for split buffer scenarios for any ARG state.
1174
        if c.state == SUB_ARG || c.state == UNSUB_ARG ||
7,180,648✔
1175
                c.state == PUB_ARG || c.state == HPUB_ARG ||
7,180,648✔
1176
                c.state == ASUB_ARG || c.state == AUSUB_ARG ||
7,180,648✔
1177
                c.state == MSG_ARG || c.state == HMSG_ARG ||
7,180,648✔
1178
                c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG || c.state == INFO_ARG {
7,446,439✔
1179

265,791✔
1180
                // Setup a holder buffer to deal with split buffer scenario.
265,791✔
1181
                if c.argBuf == nil {
531,262✔
1182
                        c.argBuf = c.scratch[:0]
265,471✔
1183
                        c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
265,471✔
1184
                }
265,471✔
1185
                // Check for violations of control line length here. Note that this is not
1186
                // exact at all but the performance hit is too great to be precise, and
1187
                // catching here should prevent memory exhaustion attacks.
1188
                if err := c.overMaxControlLineLimit(c.argBuf, mcl); err != nil {
265,793✔
1189
                        return err
2✔
1190
                }
2✔
1191
        }
1192

1193
        // Check for split msg
1194
        if (c.state == MSG_PAYLOAD || c.state == MSG_END_R || c.state == MSG_END_N) && c.msgBuf == nil {
7,904,253✔
1195
                // We need to clone the pubArg if it is still referencing the
723,607✔
1196
                // read buffer and we are not able to process the msg.
723,607✔
1197

723,607✔
1198
                if c.argBuf == nil {
1,447,214✔
1199
                        // Works also for MSG_ARG, when message comes from ROUTE or GATEWAY.
723,607✔
1200
                        if err := c.clonePubArg(lmsg); err != nil {
723,607✔
1201
                                goto parseErr
×
1202
                        }
1203
                }
1204

1205
                // If we will overflow the scratch buffer, just create a
1206
                // new buffer to hold the split message.
1207
                if c.pa.size > cap(c.scratch)-len(c.argBuf) {
758,907✔
1208
                        lrem := len(buf[c.as:])
35,300✔
1209
                        // Consider it a protocol error when the remaining payload
35,300✔
1210
                        // is larger than the reported size for PUB. It can happen
35,300✔
1211
                        // when processing incomplete messages from rogue clients.
35,300✔
1212
                        if lrem > c.pa.size+LEN_CR_LF {
35,300✔
1213
                                goto parseErr
×
1214
                        }
1215
                        c.msgBuf = make([]byte, lrem, c.pa.size+LEN_CR_LF)
35,300✔
1216
                        copy(c.msgBuf, buf[c.as:])
35,300✔
1217
                } else {
688,307✔
1218
                        c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
688,307✔
1219
                        c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
688,307✔
1220
                }
688,307✔
1221
        }
1222

1223
        return nil
7,180,646✔
1224

7,180,646✔
1225
authErr:
7,180,646✔
1226
        c.authViolation()
12✔
1227
        return ErrAuthentication
12✔
1228

12✔
1229
parseErr:
12✔
1230
        c.sendErr("Unknown Protocol Operation")
53✔
1231
        snip := protoSnippet(i, PROTO_SNIPPET_SIZE, buf)
53✔
1232
        err := fmt.Errorf("%s parser ERROR, state=%d, i=%d: proto='%s...'", c.kindString(), c.state, i, snip)
53✔
1233
        return err
53✔
1234
}
1235

1236
// protoSnippet returns a quoted (%q) rendering of at most max bytes of buf,
// beginning at offset start. It is used to embed a short excerpt of the
// protocol input in error messages.
//
// If start is at or beyond the end of buf, the literal two-character string
// `""` is returned. If fewer than max bytes remain after start, everything
// from start through the end of buf is returned.
func protoSnippet(start, max int, buf []byte) string {
	bufSize := len(buf)
	if start >= bufSize {
		return `""`
	}
	stop := start + max
	if stop > bufSize {
		// The upper bound of a slice expression is exclusive, so clamp to
		// bufSize (not bufSize-1) to keep the final byte in the snippet.
		stop = bufSize
	}
	return fmt.Sprintf("%q", buf[start:stop])
}
1247

1248
// Check if the length of buffer `arg` is over the max control line limit `mcl`.
1249
// If so, an error is sent to the client and the connection is closed.
1250
// The error ErrMaxControlLine is returned.
1251
func (c *client) overMaxControlLineLimit(arg []byte, mcl int32) error {
18,803,882✔
1252
        if c.kind != CLIENT {
27,324,554✔
1253
                return nil
8,520,672✔
1254
        }
8,520,672✔
1255
        if len(arg) > int(mcl) {
10,283,213✔
1256
                err := NewErrorCtx(ErrMaxControlLine, "State %d, max_control_line %d, Buffer len %d (snip: %s...)",
3✔
1257
                        c.state, int(mcl), len(c.argBuf), protoSnippet(0, MAX_CONTROL_LINE_SNIPPET_SIZE, arg))
3✔
1258
                c.sendErr(err.Error())
3✔
1259
                c.closeConnection(MaxControlLineExceeded)
3✔
1260
                return err
3✔
1261
        }
3✔
1262
        return nil
10,283,207✔
1263
}
1264

1265
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
1266
// we need to hold onto it into the next read.
1267
func (c *client) clonePubArg(lmsg bool) error {
723,607✔
1268
        // Just copy and re-process original arg buffer.
723,607✔
1269
        c.argBuf = c.scratch[:0]
723,607✔
1270
        c.argBuf = append(c.argBuf, c.pa.arg...)
723,607✔
1271

723,607✔
1272
        switch c.kind {
723,607✔
1273
        case ROUTER, GATEWAY:
683,481✔
1274
                if lmsg {
684,127✔
1275
                        return c.processRoutedOriginClusterMsgArgs(c.argBuf)
646✔
1276
                }
646✔
1277
                if c.pa.hdr < 0 {
1,272,366✔
1278
                        return c.processRoutedMsgArgs(c.argBuf)
589,531✔
1279
                } else {
682,835✔
1280
                        return c.processRoutedHeaderMsgArgs(c.argBuf)
93,304✔
1281
                }
93,304✔
1282
        case LEAF:
2,063✔
1283
                if c.pa.hdr < 0 {
4,044✔
1284
                        return c.processLeafMsgArgs(c.argBuf)
1,981✔
1285
                } else {
2,063✔
1286
                        return c.processLeafHeaderMsgArgs(c.argBuf)
82✔
1287
                }
82✔
1288
        default:
38,063✔
1289
                if c.pa.hdr < 0 {
75,269✔
1290
                        return c.processPub(c.argBuf)
37,206✔
1291
                } else {
38,063✔
1292
                        return c.processHeaderPub(c.argBuf, nil)
857✔
1293
                }
857✔
1294
        }
1295
}
1296

1297
func (ps *parseState) getHeader() http.Header {
840✔
1298
        if ps.header == nil {
1,680✔
1299
                if hdr := ps.pa.hdr; hdr > 0 {
913✔
1300
                        reader := bufio.NewReader(bytes.NewReader(ps.msgBuf[0:hdr]))
73✔
1301
                        tp := textproto.NewReader(reader)
73✔
1302
                        tp.ReadLine() // skip over first line, contains version
73✔
1303
                        if mimeHeader, err := tp.ReadMIMEHeader(); err == nil {
146✔
1304
                                ps.header = http.Header(mimeHeader)
73✔
1305
                        }
73✔
1306
                }
1307
        }
1308
        return ps.header
840✔
1309
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc