• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1966

15 May 2025 12:45PM UTC coverage: 95.646% (-0.03%) from 95.679%
#1966

push

github

web-flow
Merge pull request #1316 from francoisprunier/header-validation-fix

Changed header value validation to accept any ascii except CR & LF

1 of 1 new or added line in 1 file covered. (100.0%)

6 existing lines in 3 files now uncovered.

11688 of 12220 relevant lines covered (95.65%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.94
/src/main/java/io/nats/client/impl/NatsConnectionReader.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.ReadListener;
17
import io.nats.client.support.IncomingHeadersProcessor;
18

19
import java.io.IOException;
20
import java.nio.ByteBuffer;
21
import java.nio.CharBuffer;
22
import java.nio.charset.StandardCharsets;
23
import java.util.concurrent.CancellationException;
24
import java.util.concurrent.CompletableFuture;
25
import java.util.concurrent.ExecutionException;
26
import java.util.concurrent.Future;
27
import java.util.concurrent.atomic.AtomicBoolean;
28

29
import static io.nats.client.support.NatsConstants.*;
30

31
class NatsConnectionReader implements Runnable {
32

33
    enum Mode {
1✔
34
        GATHER_OP,
1✔
35
        GATHER_PROTO,
1✔
36
        GATHER_MSG_HMSG_PROTO,
1✔
37
        PARSE_PROTO,
1✔
38
        GATHER_HEADERS,
1✔
39
        GATHER_DATA
1✔
40
    };
41

42
    private final NatsConnection connection;
43

44
    private ByteBuffer protocolBuffer; // use a byte buffer to assist character decoding
45

46
    private boolean gotCR;
47
    
48
    private String op;
49
    private final char[] opArray;
50
    private int opPos;
51

52
    private final char[] msgLineChars;
53
    private int msgLinePosition;
54

55
    private Mode mode;
56

57
    private IncomingMessageFactory incoming;
58
    private byte[] msgHeaders;
59
    private byte[] msgData;
60
    private int msgHeadersPosition;
61
    private int msgDataPosition;
62

63
    private final byte[] buffer;
64
    private int bufferPosition;
65

66
    private Future<Boolean> stopped;
67
    private Future<DataPort> dataPortFuture;
68
    private DataPort dataPort;
69
    private final AtomicBoolean running;
70

71
    private final boolean utf8Mode;
72
    private final ReadListener readListener;
73

74
    /**
     * Creates a reader for the given connection. Buffers are sized from the
     * connection options and the reader starts out in the stopped state.
     *
     * @param connection the connection this reader belongs to
     */
    NatsConnectionReader(NatsConnection connection) {
        this.connection = connection;

        // Nothing is running yet, so the "stopped" future is handed out already completed.
        this.running = new AtomicBoolean(false);
        this.stopped = CompletableFuture.completedFuture(Boolean.TRUE);

        // Working buffers.
        this.protocolBuffer = ByteBuffer.allocate(this.connection.getOptions().getMaxControlLine());
        this.msgLineChars = new char[this.connection.getOptions().getMaxControlLine()];
        this.opArray = new char[MAX_PROTOCOL_RECEIVE_OP_LENGTH];
        this.buffer = new byte[connection.getOptions().getBufferSize()];
        this.bufferPosition = 0;

        // Option snapshots.
        this.utf8Mode = connection.getOptions().supportUTF8Subjects();
        this.readListener = connection.getOptions().getReadListener();
    }
1✔
90

91
    // Should only be called if the current thread has exited.
92
    // Use the Future from stop() to determine if it is ok to call this.
93
    // This method resets that future so mistiming can result in badness.
94
    void start(Future<DataPort> dataPortFuture) {
95
        this.dataPortFuture = dataPortFuture;
1✔
96
        this.running.set(true);
1✔
97
        this.stopped = connection.getExecutor().submit(this, Boolean.TRUE);
1✔
98
    }
1✔
99

100
    Future<Boolean> stop() {
101
        return stop(true);
1✔
102
    }
103

104
    // May be called several times on an error.
105
    // Returns a future that is completed when the thread completes, not when this
106
    // method does.
107
    Future<Boolean> stop(boolean shutdownDataPort) {
108
        if (running.get()) {
1✔
109
            running.set(false);
1✔
110
            if (shutdownDataPort && dataPort != null) {
1✔
111
                try {
112
                    dataPort.shutdownInput();
1✔
113
                }
114
                catch (IOException e) {
×
115
                    // we don't care, we are shutting down anyway
116
                }
1✔
117
            }
118
        }
119
        return stopped;
1✔
120
    }
121

122
    boolean isRunning() {
123
        return running.get();
1✔
124
    }
125

126
    @Override
127
    public void run() {
128
        try {
129
            dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
1✔
130
            this.mode = Mode.GATHER_OP;
1✔
131
            this.gotCR = false;
1✔
132
            this.opPos = 0;
1✔
133

134
            while (running.get() && !Thread.interrupted()) {
1✔
135
                this.bufferPosition = 0;
1✔
136
                int bytesRead = dataPort.read(this.buffer, 0, this.buffer.length);
1✔
137

138
                if (bytesRead > 0) {
1✔
139
                    connection.getNatsStatistics().registerRead(bytesRead);
1✔
140

141
                    while (this.bufferPosition < bytesRead) {
1✔
142
                        if (this.mode == Mode.GATHER_OP) {
1✔
143
                            this.gatherOp(bytesRead);
1✔
144
                        }
145
                        else if (this.mode == Mode.GATHER_MSG_HMSG_PROTO) {
1✔
146
                            if (this.utf8Mode) {
1✔
147
                                this.gatherProtocol(bytesRead);
1✔
148
                            } else {
149
                                this.gatherMessageProtocol(bytesRead);
1✔
150
                            }
151
                        }
152
                        else if (this.mode == Mode.GATHER_PROTO) {
1✔
153
                            this.gatherProtocol(bytesRead);
1✔
154
                        }
155
                        else if (this.mode == Mode.GATHER_HEADERS) {
1✔
156
                            this.gatherHeaders(bytesRead);
1✔
157
                        }
158
                        else {  // Mode.GATHER_DATA
159
                            this.gatherMessageData(bytesRead);
1✔
160
                        }
161

162
                        if (this.mode == Mode.PARSE_PROTO) { // Could be the end of the read
1✔
163
                            this.parseProtocolMessage();
1✔
164
                            this.protocolBuffer.clear();
1✔
165
                        }
166
                    }
167
                } else if (bytesRead < 0) {
1✔
168
                    throw new IOException("Read channel closed.");
1✔
169
                } else {
170
                    this.connection.getNatsStatistics().registerRead(bytesRead); // track the 0
×
171
                }
172
            }
1✔
173
        } catch (IOException io) {
1✔
174
            // if already not running, an IOE is not unreasonable in a transition state
175
            if (running.get()) {
1✔
176
                this.connection.handleCommunicationIssue(io);
1✔
177
            }
178
        } catch (CancellationException | ExecutionException ex) {
×
179
            // Exit
180
        } catch (InterruptedException ex) {
×
181
            // Exit
182
            Thread.currentThread().interrupt();
×
183
        } finally {
184
            this.running.set(false);
1✔
185
            // Clear the buffers, since they are only used inside this try/catch
186
            // We will reuse later
187
            this.protocolBuffer.clear();
1✔
188
        }
189
    }
1✔
190

191
    // Gather the op, either up to the first space or the first carriage return.
192
    void gatherOp(int maxPos) throws IOException {
193
        try {
194
            while(this.bufferPosition < maxPos) {
1✔
195
                byte b = this.buffer[this.bufferPosition];
1✔
196
                this.bufferPosition++;
1✔
197

198
                if (gotCR) {
1✔
199
                    if (b == LF) { // Got CRLF, jump to parsing
1✔
200
                        this.op = opFor(opArray, opPos);
1✔
201
                        this.gotCR = false;
1✔
202
                        this.opPos = 0;
1✔
203
                        this.mode = Mode.PARSE_PROTO;
1✔
204
                        break;
1✔
205
                    } else {
206
                        throw new IllegalStateException("Bad socket data, no LF after CR");
1✔
207
                    }
208
                } else if (b == SP || b == TAB) { // Got a space, get the rest of the protocol line
1✔
209
                    this.op = opFor(opArray, opPos);
1✔
210
                    this.opPos = 0;
1✔
211
                    if (this.op.equals(OP_MSG) || this.op.equals(OP_HMSG)) {
1✔
212
                        this.msgLinePosition = 0;
1✔
213
                        this.mode = Mode.GATHER_MSG_HMSG_PROTO;
1✔
214
                    } else {
215
                        this.mode = Mode.GATHER_PROTO;
1✔
216
                    }
217
                    break;
1✔
218
                } else if (b == CR) {
1✔
219
                    this.gotCR = true;
1✔
220
                } else {
221
                    this.opArray[opPos] = (char) b;
1✔
222
                    this.opPos++;
1✔
223
                }
224
            }
1✔
225
        } catch (ArrayIndexOutOfBoundsException | IllegalStateException | NumberFormatException | NullPointerException ex) {
1✔
226
            this.encounteredProtocolError(ex);
×
227
        }
1✔
228
    }
1✔
229

230
    // Stores the message protocol line in a char buffer that will be read for subject, reply
231
    void gatherMessageProtocol(int maxPos) throws IOException {
232
        try {
233
            while(this.bufferPosition < maxPos) {
1✔
234
                byte b = this.buffer[this.bufferPosition];
1✔
235
                this.bufferPosition++;
1✔
236

237
                if (gotCR) {
1✔
238
                    if (b == LF) {
1✔
239
                        this.mode = Mode.PARSE_PROTO;
1✔
240
                        this.gotCR = false;
1✔
241
                        break;
1✔
242
                    } else {
243
                        throw new IllegalStateException("Bad socket data, no LF after CR");
1✔
244
                    }
245
                } else if (b == CR) {
1✔
246
                    this.gotCR = true;
1✔
247
                } else {
248
                    if (this.msgLinePosition >= this.msgLineChars.length) {
1✔
249
                        throw new IllegalStateException("Protocol line is too long");
×
250
                    }
251
                    this.msgLineChars[this.msgLinePosition] = (char) b; // Assumes ascii, as per protocol doc
1✔
252
                    this.msgLinePosition++;
1✔
253
                }
254
            }
1✔
255
        } catch (IllegalStateException | NumberFormatException | NullPointerException ex) {
1✔
256
            this.encounteredProtocolError(ex);
×
257
        }
1✔
258
    }
1✔
259

260
    // Gather bytes for a protocol line
261
    void gatherProtocol(int maxPos) throws IOException {
262
        // protocol buffer has max capacity, shouldn't need resizing
263
        try {
264
            while(this.bufferPosition < maxPos) {
1✔
265
                byte b = this.buffer[this.bufferPosition];
1✔
266
                this.bufferPosition++;
1✔
267

268
                if (gotCR) {
1✔
269
                    if (b == LF) {
1✔
270
                        this.protocolBuffer.flip();
1✔
271
                        this.mode = Mode.PARSE_PROTO;
1✔
272
                        this.gotCR = false;
1✔
273
                        break;
1✔
274
                    } else {
275
                        throw new IllegalStateException("Bad socket data, no LF after CR");
×
276
                    }
277
                } else if (b == CR) {
1✔
278
                    this.gotCR = true;
1✔
279
                } else {
280
                    if (!protocolBuffer.hasRemaining()) {
1✔
281
                        this.protocolBuffer = this.connection.enlargeBuffer(this.protocolBuffer); // just double it
1✔
282
                    }
283
                    this.protocolBuffer.put(b);
1✔
284
                }
285
            }
1✔
286
        } catch (IllegalStateException | NumberFormatException | NullPointerException ex) {
×
287
            this.encounteredProtocolError(ex);
×
288
        }
1✔
289
    }
1✔
290

291
    void gatherHeaders(int maxPos) throws IOException {
292
        try {
293
            while(this.bufferPosition < maxPos) {
1✔
294
                int possible = maxPos - this.bufferPosition;
1✔
295
                int want = msgHeaders.length - msgHeadersPosition;
1✔
296

297
                // Grab all we can, until we get the necessary number of bytes
298
                if (want > 0 && want <= possible) {
1✔
299
                    System.arraycopy(this.buffer, this.bufferPosition, this.msgHeaders, this.msgHeadersPosition, want);
1✔
300
                    msgHeadersPosition += want;
1✔
301
                    this.bufferPosition += want;
1✔
302
                    continue;
1✔
303
                } else if (want > 0) {
1✔
UNCOV
304
                    System.arraycopy(this.buffer, this.bufferPosition, this.msgHeaders, this.msgHeadersPosition, possible);
×
UNCOV
305
                    msgHeadersPosition += possible;
×
UNCOV
306
                    this.bufferPosition += possible;
×
UNCOV
307
                    continue;
×
308
                }
309

310
                if (msgHeadersPosition == msgHeaders.length) {
1✔
311
                    incoming.setHeaders(new IncomingHeadersProcessor(msgHeaders));
1✔
312
                    msgHeaders = null;
1✔
313
                    msgHeadersPosition = -1;
1✔
314
                    this.mode = Mode.GATHER_DATA;
1✔
315
                    break;
1✔
316
                } else {
317
                    throw new IllegalStateException("Bad socket data, headers do not match expected length");
×
318
                }
319
            }
320
        } catch (IllegalStateException | NullPointerException ex) {
×
321
            this.encounteredProtocolError(ex);
×
322
        }
1✔
323
    }
1✔
324

325
    // Gather bytes for a message body into a byte array that is then
326
    // given to the message object
327
    void gatherMessageData(int maxPos) throws IOException {
328
        try {
329
            while(this.bufferPosition < maxPos) {
1✔
330
                int possible = maxPos - this.bufferPosition;
1✔
331
                int want = msgData.length - msgDataPosition;
1✔
332

333
                // Grab all we can, until we get to the CR/LF
334
                if (want > 0 && want <= possible) {
1✔
335
                    System.arraycopy(this.buffer, this.bufferPosition, this.msgData, this.msgDataPosition, want);
1✔
336
                    msgDataPosition += want;
1✔
337
                    this.bufferPosition += want;
1✔
338
                    continue;
1✔
339
                } else if (want > 0) {
1✔
340
                    System.arraycopy(this.buffer, this.bufferPosition, this.msgData, this.msgDataPosition, possible);
1✔
341
                    msgDataPosition += possible;
1✔
342
                    this.bufferPosition += possible;
1✔
343
                    continue;
1✔
344
                }
345

346
                byte b = this.buffer[this.bufferPosition];
1✔
347
                this.bufferPosition++;
1✔
348

349
                if (gotCR) {
1✔
350
                    if (b == LF) {
1✔
351
                        incoming.setData(msgData);
1✔
352
                        NatsMessage m = incoming.getMessage();
1✔
353
                        this.connection.deliverMessage(m);
1✔
354
                        if (readListener != null) {
1✔
355
                            readListener.message(op, m);
×
356
                        }
357
                        msgData = null;
1✔
358
                        msgDataPosition = 0;
1✔
359
                        incoming = null;
1✔
360
                        gotCR = false;
1✔
361
                        this.op = UNKNOWN_OP;
1✔
362
                        this.mode = Mode.GATHER_OP;
1✔
363
                        break;
1✔
364
                    } else {
365
                        throw new IllegalStateException("Bad socket data, no LF after CR");
1✔
366
                    }
367
                } else if (b == CR) {
1✔
368
                    gotCR = true;
1✔
369
                } else {
370
                    throw new IllegalStateException("Bad socket data, no CRLF after data");
1✔
371
                }
372
            }
1✔
373
        } catch (IllegalStateException | NullPointerException ex) {
1✔
374
            this.encounteredProtocolError(ex);
×
375
        }
1✔
376
    }
1✔
377

378
    public String grabNextMessageLineElement(int max) {
379
        if (this.msgLinePosition >= max) {
1✔
380
            return null;
1✔
381
        }
382

383
        int start = this.msgLinePosition;
1✔
384

385
        while (this.msgLinePosition < max) {
1✔
386
            char c = this.msgLineChars[this.msgLinePosition];
1✔
387
            this.msgLinePosition++;
1✔
388

389
            if (c == SP || c == TAB) {
1✔
390
                return new String(this.msgLineChars, start, this.msgLinePosition - start -1); //don't grab the space, avoid an intermediate char sequence
1✔
391
            }
392
        }
1✔
393

394
        return new String(this.msgLineChars, start, this.msgLinePosition-start);
1✔
395
    }
396

397
    static String opFor(char[] chars, int length) {
398
        if (length == 3) {
1✔
399
            if ((chars[0] == 'M' || chars[0] == 'm') &&
1✔
400
                        (chars[1] == 'S' || chars[1] == 's') && 
401
                        (chars[2] == 'G' || chars[2] == 'g')) {
402
                return OP_MSG;
1✔
403
            } else if (chars[0] == '+' && 
1✔
404
                (chars[1] == 'O' || chars[1] == 'o') && 
405
                (chars[2] == 'K' || chars[2] == 'k')) {
406
                return OP_OK;
1✔
407
            } else {
408
                return UNKNOWN_OP;
1✔
409
            }
410
        } else if (length == 4) { // do them in a unique order for uniqueness when possible to branch asap
1✔
411
            if ((chars[1] == 'I' || chars[1] == 'i') && 
1✔
412
                    (chars[0] == 'P' || chars[0] == 'p') && 
413
                    (chars[2] == 'N' || chars[2] == 'n') &&
414
                    (chars[3] == 'G' || chars[3] == 'g')) {
415
                return OP_PING;
1✔
416
            } else if ((chars[1] == 'O' || chars[1] == 'o') && 
1✔
417
                        (chars[0] == 'P' || chars[0] == 'p') && 
418
                        (chars[2] == 'N' || chars[2] == 'n') &&
419
                        (chars[3] == 'G' || chars[3] == 'g')) {
420
                return OP_PONG;
1✔
421
            } else if (chars[0] == '-' && 
1✔
422
                        (chars[1] == 'E' || chars[1] == 'e') &&
423
                        (chars[2] == 'R' || chars[2] == 'r') && 
424
                        (chars[3] == 'R' || chars[3] == 'r')) {
425
                return OP_ERR;
1✔
426
            } else if ((chars[0] == 'I' || chars[0] == 'i') &&
1✔
427
                    (chars[1] == 'N' || chars[1] == 'n') &&
428
                    (chars[2] == 'F' || chars[2] == 'f') &&
429
                    (chars[3] == 'O' || chars[3] == 'o')) {
430
                return OP_INFO;
1✔
431
            } else if ((chars[0] == 'H' || chars[0] == 'h') &&
1✔
432
                    (chars[1] == 'M' || chars[1] == 'm') &&
433
                    (chars[2] == 'S' || chars[2] == 's') &&
434
                    (chars[3] == 'G' || chars[3] == 'g')) {
435
                return OP_HMSG;
1✔
436
            }  else {
437
                return UNKNOWN_OP;
1✔
438
            }
439
        } else {
440
            return UNKNOWN_OP;
1✔
441
        }
442
    }
443

444
    private static final int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000};
1✔
445

446
    public static int parseLength(String s) throws NumberFormatException {
447
        int length = s.length();
1✔
448
        int retVal = 0;
1✔
449

450
        if (length > TENS.length) {
1✔
451
            throw new NumberFormatException("Long in message length \"" + s + "\" "+length+" > "+TENS.length);
1✔
452
        }
453
        
454
        for (int i=length-1;i>=0;i--) {
1✔
455
            char c = s.charAt(i);
1✔
456
            int d = (c - '0');
1✔
457

458
            if (d>9) {
1✔
459
                throw new NumberFormatException("Invalid char in message length '" + c + "'");
1✔
460
            }
461

462
            retVal += d * TENS[length - i - 1];
1✔
463
        }
464

465
        return retVal;
1✔
466
    }
467

468
    void parseProtocolMessage() throws IOException {
469
        try {
470
            switch (this.op) {
1✔
471
                case OP_MSG:
472
                    int protocolLength = this.msgLinePosition; //This is just after the last character
1✔
473
                    int protocolLineLength = protocolLength + 4; // 4 for the "MSG "
1✔
474

475
                    if (this.utf8Mode) {
1✔
476
                        protocolLineLength = protocolBuffer.remaining() + 4;
1✔
477

478
                        CharBuffer buff = StandardCharsets.UTF_8.decode(protocolBuffer);
1✔
479
                        protocolLength = buff.remaining();
1✔
480
                        buff.get(this.msgLineChars, 0, protocolLength);
1✔
481
                    }
482

483
                    this.msgLinePosition = 0;
1✔
484
                    String subject = grabNextMessageLineElement(protocolLength);
1✔
485
                    String sid = grabNextMessageLineElement(protocolLength);
1✔
486
                    String replyTo = grabNextMessageLineElement(protocolLength);
1✔
487
                    String lengthChars = null;
1✔
488

489
                    if (this.msgLinePosition < protocolLength) {
1✔
490
                        lengthChars = grabNextMessageLineElement(protocolLength);
1✔
491
                    } else {
492
                        lengthChars = replyTo;
1✔
493
                        replyTo = null;
1✔
494
                    }
495

496
                    if (subject == null || subject.isEmpty() || sid == null || sid.isEmpty() || lengthChars == null) {
1✔
497
                        throw new IllegalStateException("Bad MSG control line, missing required fields");
1✔
498
                    }
499

500
                    int incomingLength = parseLength(lengthChars);
1✔
501

502
                    this.incoming = new IncomingMessageFactory(sid, subject, replyTo, protocolLineLength, utf8Mode);
1✔
503
                    this.mode = Mode.GATHER_DATA;
1✔
504
                    this.msgData = new byte[incomingLength];
1✔
505
                    this.msgDataPosition = 0;
1✔
506
                    this.msgLinePosition = 0;
1✔
507
                    break;
1✔
508
                case OP_HMSG:
509
                    int hProtocolLength = this.msgLinePosition; //This is just after the last character
1✔
510
                    int hProtocolLineLength = hProtocolLength + 5; // 5 for the "HMSG "
1✔
511

512
                    if (this.utf8Mode) {
1✔
513
                        hProtocolLineLength = protocolBuffer.remaining() + 5;
×
514

515
                        CharBuffer buff = StandardCharsets.UTF_8.decode(protocolBuffer);
×
516
                        hProtocolLength = buff.remaining();
×
517
                        buff.get(this.msgLineChars, 0, hProtocolLength);
×
518
                    }
519

520
                    this.msgLinePosition = 0;
1✔
521
                    String hSubject = grabNextMessageLineElement(hProtocolLength);
1✔
522
                    String hSid = grabNextMessageLineElement(hProtocolLength);
1✔
523
                    String replyToOrHdrLen = grabNextMessageLineElement(hProtocolLength);
1✔
524
                    String hdrLenOrTotLen = grabNextMessageLineElement(hProtocolLength);
1✔
525

526
                    String hReplyTo = null;
1✔
527
                    int hdrLen = -1;
1✔
528
                    int totLen = -1;
1✔
529

530
                    // if there is more it must be replyTo hdrLen totLen instead of just hdrLen totLen
531
                    if (this.msgLinePosition < hProtocolLength) {
1✔
532
                        hReplyTo = replyToOrHdrLen;
1✔
533
                        hdrLen = parseLength(hdrLenOrTotLen);
1✔
534
                        totLen = parseLength(grabNextMessageLineElement(hProtocolLength));
1✔
535
                    } else {
536
                        hdrLen = parseLength(replyToOrHdrLen);
1✔
537
                        totLen = parseLength(hdrLenOrTotLen);
1✔
538
                    }
539

540
                    if(hSubject==null || hSubject.isEmpty() || hSid==null || hSid.isEmpty()) {
1✔
541
                        throw new IllegalStateException("Bad HMSG control line, missing required fields");
×
542
                    }
543

544
                    this.incoming = new IncomingMessageFactory(hSid, hSubject, hReplyTo, hProtocolLineLength, utf8Mode);
1✔
545
                    this.msgHeaders = new byte[hdrLen];
1✔
546
                    this.msgData = new byte[totLen - hdrLen];
1✔
547
                    this.mode = Mode.GATHER_HEADERS;
1✔
548
                    this.msgHeadersPosition = 0;
1✔
549
                    this.msgDataPosition = 0;
1✔
550
                    this.msgLinePosition = 0;
1✔
551
                    break;
1✔
552
                case OP_OK:
553
                    this.connection.processOK();
1✔
554
                    if (readListener != null) {
1✔
555
                        readListener.protocol(op, null);
×
556
                    }
557
                    this.op = UNKNOWN_OP;
1✔
558
                    this.mode = Mode.GATHER_OP;
1✔
559
                    break;
1✔
560
                case OP_ERR:
561
                    String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", "");
1✔
562
                    this.connection.processError(errorText);
1✔
563
                    if (readListener != null) {
1✔
564
                        readListener.protocol(op, errorText);
×
565
                    }
566
                    this.op = UNKNOWN_OP;
1✔
567
                    this.mode = Mode.GATHER_OP;
1✔
568
                    break;
1✔
569
                case OP_PING:
570
                    this.connection.sendPong();
1✔
571
                    if (readListener != null) {
1✔
572
                        readListener.protocol(op, null);
×
573
                    }
574
                    this.op = UNKNOWN_OP;
1✔
575
                    this.mode = Mode.GATHER_OP;
1✔
576
                    break;
1✔
577
                case OP_PONG:
578
                    this.connection.handlePong();
1✔
579
                    if (readListener != null) {
1✔
580
                        readListener.protocol(op, null);
×
581
                    }
582
                    this.op = UNKNOWN_OP;
1✔
583
                    this.mode = Mode.GATHER_OP;
1✔
584
                    break;
1✔
585
                case OP_INFO:
586
                    String info = StandardCharsets.UTF_8.decode(protocolBuffer).toString();
1✔
587
                    this.connection.handleInfo(info);
1✔
588
                    if (readListener != null) {
1✔
589
                        readListener.protocol(op, info);
×
590
                    }
591
                    this.op = UNKNOWN_OP;
1✔
592
                    this.mode = Mode.GATHER_OP;
1✔
593
                    break;
1✔
594
                default:
595
                    throw new IllegalStateException("Unknown protocol operation "+op);
1✔
596
            }
597

598
        } catch (IllegalStateException | NumberFormatException | NullPointerException ex) {
1✔
599
            this.encounteredProtocolError(ex);
×
600
        }
1✔
601
    }
1✔
602

603
    void encounteredProtocolError(Exception ex) throws IOException {
604
        throw new IOException(ex);
1✔
605
    }
606

607
    //For testing
608
    void fakeReadForTest(byte[] bytes) {
609
        System.arraycopy(bytes, 0, this.buffer, 0, bytes.length);
1✔
610
        this.bufferPosition = 0;
1✔
611
        this.op = UNKNOWN_OP;
1✔
612
        this.mode = Mode.GATHER_OP;
1✔
613
    }
1✔
614

615
    String currentOp() {
616
        return this.op;
1✔
617
    }
618
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc