• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2231

26 Sep 2025 04:51PM UTC coverage: 95.521% (-0.001%) from 95.522%
#2231

push

github

web-flow
Merge pull request #1437 from nats-io/fix_size_in_bytes

Properly return size in bytes

3 of 7 new or added lines in 2 files covered. (42.86%)

6 existing lines in 2 files now uncovered.

12155 of 12725 relevant lines covered (95.52%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/src/main/java/io/nats/client/impl/SocketDataPort.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Options;
17
import io.nats.client.support.NatsInetAddress;
18
import io.nats.client.support.NatsUri;
19
import io.nats.client.support.WebSocket;
20
import org.jspecify.annotations.NonNull;
21

22
import javax.net.ssl.HandshakeCompletedListener;
23
import javax.net.ssl.SSLContext;
24
import javax.net.ssl.SSLSocket;
25
import javax.net.ssl.SSLSocketFactory;
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.io.OutputStream;
29
import java.net.*;
30
import java.time.Duration;
31
import java.util.ArrayList;
32
import java.util.Arrays;
33
import java.util.List;
34
import java.util.concurrent.*;
35

36
import static io.nats.client.support.NatsConstants.SECURE_WEBSOCKET_PROTOCOL;
37

38
/**
39
 * This class is not thread-safe.  Caller must ensure thread safety.
40
 */
41
@SuppressWarnings("ClassEscapesDefinedScope") // NatsConnection
42
public class SocketDataPort implements DataPort {
1✔
43

44
    protected NatsConnection connection;
45

46
    protected String host;
47
    protected int port;
48
    protected Socket socket;
49
    protected boolean isSecure = false;
1✔
50

51
    protected InputStream in;
52
    protected OutputStream out;
53

54
    @Override
55
    public void afterConstruct(Options options) {
56
    }
1✔
57

58
    @Override
59
    public void connect(@NonNull String serverURI, @NonNull NatsConnection conn, long timeoutNanos) throws IOException {
60
        try {
61
            connect(conn, new NatsUri(serverURI), timeoutNanos);
1✔
62
        }
63
        catch (URISyntaxException e) {
×
64
            throw new IOException(e);
×
65
        }
1✔
66
    }
1✔
67

68
    @Override
69
    public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long timeoutNanos) throws IOException {
70
        connection = conn;
1✔
71
        Options options = connection.getOptions();
1✔
72
        long timeout = timeoutNanos / 1_000_000; // convert to millis
1✔
73
        host = nuri.getHost();
1✔
74
        port = nuri.getPort();
1✔
75

76
        try {
77
            if (options.isEnableFastFallback()) {
1✔
78
                socket = connectToFastestIp(options, host, port, (int) timeout);
1✔
79
            } else {
80
                socket = createSocket(options);
1✔
81
                socket.connect(new InetSocketAddress(host, port), (int) timeout);
1✔
82
            }
83

84
            if (options.getSocketReadTimeoutMillis() > 0) {
1✔
85
                socket.setSoTimeout(options.getSocketReadTimeoutMillis());
×
86
            }
87

88
            if (options.getSocketSoLinger() > 0) {
1✔
NEW
89
                socket.setSoLinger(true, options.getSocketSoLinger());
×
90
            }
91

92
            if (options.getReceiveBufferSize() > 0) {
1✔
NEW
93
                socket.setReceiveBufferSize(options.getReceiveBufferSize());
×
94
            }
95

96
            if (options.getSendBufferSize() > 0) {
1✔
NEW
97
                socket.setSendBufferSize(options.getSendBufferSize());
×
98
            }
99

100
            if (isWebsocketScheme(nuri.getScheme())) {
1✔
101
                if (SECURE_WEBSOCKET_PROTOCOL.equalsIgnoreCase(nuri.getScheme())) {
1✔
102
                    upgradeToSecure();
1✔
103
                }
104
                try {
105
                    socket = new WebSocket(socket, host, options.getHttpRequestInterceptors());
1✔
106
                } catch (Exception ex) {
1✔
107
                    socket.close();
1✔
108
                    throw ex;
1✔
109
                }
1✔
110
            }
111
            in = socket.getInputStream();
1✔
112
            out = socket.getOutputStream();
1✔
113
        }
114
        catch (Exception e) {
1✔
115
            if (socket != null) {
1✔
116
                try { socket.close(); } catch (Exception ignore) {}
1✔
117
            }
118
            socket = null;
1✔
119
            if (e instanceof IOException) {
1✔
120
                throw e;
1✔
121
            }
122
            throw new IOException(e);
1✔
123
        }
1✔
124
    }
1✔
125

126
    /**
127
     * Upgrade the port to SSL. If it is already secured, this is a no-op.
128
     * If the data port type doesn't support SSL it should throw an exception.
129
     */
130
    public void upgradeToSecure() throws IOException {
131
        Options options = connection.getOptions();
1✔
132
        SSLContext context = options.getSslContext();
1✔
133

134
        SSLSocketFactory factory = context.getSocketFactory();
1✔
135
        Duration timeout = options.getConnectionTimeout();
1✔
136

137
        SSLSocket sslSocket = (SSLSocket) factory.createSocket(socket, host, port, true);
1✔
138
        sslSocket.setUseClientMode(true);
1✔
139

140
        final CompletableFuture<Void> waitForHandshake = new CompletableFuture<>();
1✔
141
        final HandshakeCompletedListener hcl = (evt) -> waitForHandshake.complete(null);
1✔
142

143
        sslSocket.addHandshakeCompletedListener(hcl);
1✔
144
        sslSocket.startHandshake();
1✔
145

146
        try {
147
            waitForHandshake.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
1✔
148
        } catch (Exception ex) {
×
149
            connection.handleCommunicationIssue(ex);
×
150
            return;
×
151
        }
152
        finally {
153
            sslSocket.removeHandshakeCompletedListener(hcl);
1✔
154
        }
155

156
        socket = sslSocket;
1✔
157
        in = sslSocket.getInputStream();
1✔
158
        out = sslSocket.getOutputStream();
1✔
159
        isSecure = true;
1✔
160
    }
1✔
161

162
    public int read(byte[] dst, int off, int len) throws IOException {
163
        return in.read(dst, off, len);
1✔
164
    }
165

166
    public void write(byte[] src, int toWrite) throws IOException {
167
        out.write(src, 0, toWrite);
1✔
168
    }
1✔
169

170
    public void shutdownInput() throws IOException {
171
        // cannot call shutdownInput on sslSocket
172
        if (!isSecure && socket != null) {
1✔
173
            socket.shutdownInput();
1✔
174
        }
175
    }
1✔
176

177
    public void close() throws IOException {
178
        if (socket != null) {
1✔
179
            socket.close();
1✔
180
        }
181
    }
1✔
182

183
    @Override
184
    public void forceClose() throws IOException {
185
        // socket can technically be null, like between states
186
        // practically it never will be, but guard it anyway
187
        if (socket != null) {
1✔
188
            try {
189
                // If we are being asked to force close, there is no need to linger.
190
                socket.setSoLinger(true, 0);
1✔
191
            }
192
            catch (SocketException e) {
1✔
193
                // don't want to fail if I couldn't set linger
194
            }
1✔
195
            close();
1✔
196
        }
197
    }
1✔
198

199
    public void flush() throws IOException {
200
        out.flush();
1✔
201
    }
1✔
202

203
    protected static boolean isWebsocketScheme(String scheme) {
204
        return "ws".equalsIgnoreCase(scheme) ||
1✔
205
            "wss".equalsIgnoreCase(scheme);
1✔
206
    }
207

208
    /**
209
     * Implements the "Happy Eyeballs" algorithm as described in RFC 6555,
210
     * which attempts to connect to multiple IP addresses in parallel to reduce
211
     * connection setup delays.
212
     */
213
    private Socket connectToFastestIp(Options options, String hostname, int port,
214
                                      int timeoutMillis) throws IOException {
215
        // Get all IP addresses for the hostname
216
        List<InetAddress> ips = Arrays.asList(NatsInetAddress.getAllByName(hostname));
1✔
217

218
        ExecutorService executor = options.getExecutor();
1✔
219
        long CONNECT_DELAY_MILLIS = 250;
1✔
220
        // Create connection tasks for each address
221
        // with delays for each address (0ms, 250ms, 500ms, ...)
222
        List<Callable<Socket>> connectionTasks = new ArrayList<>();
1✔
223

224
        for (int i = 0; i < ips.size(); i++) {
1✔
225
            final InetAddress ip = ips.get(i);
1✔
226
            final int delayMillis = i * (int) CONNECT_DELAY_MILLIS;
1✔
227

228
            connectionTasks.add(() -> {
1✔
229
                if (delayMillis > 0) {
1✔
230
                    try {
231
                        Thread.sleep(delayMillis);
×
232
                    } catch (InterruptedException e) {
1✔
233
                        Thread.currentThread().interrupt();
1✔
234
                    }
×
235
                }
236

237
                Socket socket = createSocket(options);
1✔
238
                socket.connect(new InetSocketAddress(ip, port), timeoutMillis);
1✔
239
                return socket;
1✔
240
            });
241
        }
242

243
        try {
244
            // Use invokeAny to return the first successful connection and cancel other tasks
245
            return executor.invokeAny(connectionTasks);
1✔
246
        } catch (InterruptedException e) {
×
247
            Thread.currentThread().interrupt();
×
248
        } catch (ExecutionException ignored) {
×
249
        }
×
250
        // Could not connect to any IP address
251
        throw new IOException("No responsive IP found for " + hostname);
×
252
    }
253

254
    private Socket createSocket(Options options) throws SocketException {
255
        Socket socket;
256
        if (options.getProxy() != null) {
1✔
257
            socket = new Socket(options.getProxy());
1✔
258
        } else {
259
            socket = new Socket();
1✔
260
        }
261
        socket.setTcpNoDelay(true);
1✔
262
        socket.setReceiveBufferSize(2 * 1024 * 1024);
1✔
263
        socket.setSendBufferSize(2 * 1024 * 1024);
1✔
264
        return socket;
1✔
265
    }
266
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc