• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2238

29 Sep 2025 04:41PM UTC coverage: 95.464% (-0.06%) from 95.521%
#2238

push

github

web-flow
Merge pull request #1438 from nats-io/improve-connection-listener

Improve ConnectionListener

52 of 64 new or added lines in 2 files covered. (81.25%)

7 existing lines in 3 files now uncovered.

12164 of 12742 relevant lines covered (95.46%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/src/main/java/io/nats/client/support/WebsocketInputStream.java
1
// Copyright 2022 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.support;
15

16
import io.nats.client.support.WebsocketFrameHeader.OpCode;
17

18
import java.io.IOException;
19
import java.io.InputStream;
20

21
public class WebsocketInputStream extends InputStream {
22
    private byte[] buffer = new byte[WebsocketFrameHeader.MAX_FRAME_HEADER_SIZE];
1✔
23
    private WebsocketFrameHeader header = new WebsocketFrameHeader();
1✔
24
    private InputStream in;
25
    private byte[] oneByte = new byte[1];
1✔
26

27
    public WebsocketInputStream(InputStream in) {
1✔
28
        this.in = in;
1✔
29
    }
1✔
30

31
    @Override
32
    public int available() throws IOException {
33
        return in.available();
1✔
34
    }
35

36
    @Override
37
    public void close() throws IOException {
38
        in.close();
1✔
39
    }
1✔
40

41
    @Override
42
    public void mark(int readLimit) {
43

44
    }
1✔
45

46
    @Override
47
    public boolean markSupported() {
48
        return false;
1✔
49
    }
50

51
    @Override
52
    public int read(byte[] buffer) throws IOException {
53
        return read(buffer, 0, buffer.length);
1✔
54
    }
55

56
    @Override
57
    public int read(byte[] buffer, int offset, int length) throws IOException {
58
        // Just in case we get headers with empty payloads:
59
        while (0 == header.getPayloadLength()) {
1✔
60
            if (!readHeader()) {
1✔
61
                return -1;
1✔
62
            }
63
        }
64
        if (header.getOpCode() == OpCode.CLOSE) {
1✔
65
            // Ignore the websocket close message body:
UNCOV
66
            in.skip(header.getPayloadLength());
×
UNCOV
67
            return -1;
×
68
        }
69
        final long headerPayloadLength = header.getPayloadLength();
1✔
70
        final int payloadLength = Math.min(length, headerPayloadLength > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)headerPayloadLength);
1✔
71
        length = in.read(buffer, offset, payloadLength);
1✔
72
        if (-1 == length) {
1✔
73
            return length;
×
74
        }
75
        return header.filterPayload(buffer, offset, length);
1✔
76
    }
77

78
    @Override
79
    public int read() throws IOException {
80
        int result = read(oneByte, 0, 1);
1✔
81
        if (-1 == result) {
1✔
82
            return result;
×
83
        }
84
        return oneByte[0];
1✔
85
    }
86

87
    private boolean readHeader() throws IOException {
88
        int len = 0;
1✔
89
        while (len < 2) {
1✔
90
            int result = in.read(buffer, len, 2 - len);
1✔
91
            if (result < 0) {
1✔
92
                return false;
1✔
93
            }
94
            len += result;
1✔
95
        }
1✔
96
        int headerSize = WebsocketFrameHeader.size(buffer, 0);
1✔
97
        if (headerSize > 2) {
1✔
98
            while (len < headerSize) {
1✔
99
                int result = in.read(buffer, len, headerSize - len);
1✔
100
                if (result < 0) {
1✔
101
                    return false;
×
102
                }
103
                len += result;
1✔
104
            }
1✔
105
        }
106
        header.write(buffer, 0, headerSize);
1✔
107
        return true;
1✔
108
    }
109
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc