• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2218

25 Sep 2025 01:31PM UTC coverage: 95.571% (-0.02%) from 95.586%
#2218

push

github

web-flow
Merge pull request #1432 from nats-io/socket-danger

Options to set underlying socket configuration of SO_SNDBUF and SO_RCVBUF

21 of 28 new or added lines in 2 files covered. (75.0%)

3 existing lines in 2 files now uncovered.

12106 of 12667 relevant lines covered (95.57%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.09
/src/main/java/io/nats/client/impl/SocketDataPort.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Options;
17
import io.nats.client.support.NatsInetAddress;
18
import io.nats.client.support.NatsUri;
19
import io.nats.client.support.WebSocket;
20
import org.jspecify.annotations.NonNull;
21

22
import javax.net.ssl.HandshakeCompletedListener;
23
import javax.net.ssl.SSLContext;
24
import javax.net.ssl.SSLSocket;
25
import javax.net.ssl.SSLSocketFactory;
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.io.OutputStream;
29
import java.net.*;
30
import java.time.Duration;
31
import java.util.ArrayList;
32
import java.util.Arrays;
33
import java.util.List;
34
import java.util.concurrent.*;
35

36
import static io.nats.client.support.NatsConstants.SECURE_WEBSOCKET_PROTOCOL;
37

38
/**
39
 * This class is not thread-safe.  Caller must ensure thread safety.
40
 */
41
@SuppressWarnings("ClassEscapesDefinedScope") // NatsConnection
42
public class SocketDataPort implements DataPort {
1✔
43

44
    protected NatsConnection connection;
45

46
    protected String host;
47
    protected int port;
48
    protected Socket socket;
49
    protected boolean isSecure = false;
1✔
50
    protected int soLinger;
51
    protected int receiveBufferSize;
52
    protected int sendBufferSize;
53

54
    protected InputStream in;
55
    protected OutputStream out;
56

57
    @Override
58
    public void afterConstruct(Options options) {
59
        soLinger = options.getSocketSoLinger();
1✔
60
        receiveBufferSize = options.getReceiveBufferSize();
1✔
61
        sendBufferSize = options.getSendBufferSize();
1✔
62
    }
1✔
63

64
    @Override
65
    public void connect(@NonNull String serverURI, @NonNull NatsConnection conn, long timeoutNanos) throws IOException {
66
        try {
67
            connect(conn, new NatsUri(serverURI), timeoutNanos);
1✔
68
        }
69
        catch (URISyntaxException e) {
×
70
            throw new IOException(e);
×
71
        }
1✔
72
    }
1✔
73

74
    @Override
75
    public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long timeoutNanos) throws IOException {
76
        connection = conn;
1✔
77
        Options options = connection.getOptions();
1✔
78
        long timeout = timeoutNanos / 1_000_000; // convert to millis
1✔
79
        host = nuri.getHost();
1✔
80
        port = nuri.getPort();
1✔
81

82
        try {
83
            if (options.isEnableFastFallback()) {
1✔
84
                socket = connectToFastestIp(options, host, port, (int) timeout);
1✔
85
            } else {
86
                socket = createSocket(options);
1✔
87
                socket.connect(new InetSocketAddress(host, port), (int) timeout);
1✔
88
            }
89
            if (options.getSocketReadTimeoutMillis() > 0) {
1✔
NEW
90
                socket.setSoTimeout(options.getSocketReadTimeoutMillis());
×
91
            }
92

93
            if (soLinger > 0) {
1✔
94
                socket.setSoLinger(true, soLinger);
×
95
            }
96

97
            if (receiveBufferSize > 0) {
1✔
NEW
98
                socket.setReceiveBufferSize(receiveBufferSize);
×
99
            }
100

101
            if (sendBufferSize > 0) {
1✔
NEW
102
                socket.setSendBufferSize(sendBufferSize);
×
103
            }
104

105
            if (isWebsocketScheme(nuri.getScheme())) {
1✔
106
                if (SECURE_WEBSOCKET_PROTOCOL.equalsIgnoreCase(nuri.getScheme())) {
1✔
107
                    upgradeToSecure();
1✔
108
                }
109
                try {
110
                    socket = new WebSocket(socket, host, options.getHttpRequestInterceptors());
1✔
111
                } catch (Exception ex) {
1✔
112
                    socket.close();
1✔
113
                    throw ex;
1✔
114
                }
1✔
115
            }
116
            in = socket.getInputStream();
1✔
117
            out = socket.getOutputStream();
1✔
118
        }
119
        catch (Exception e) {
1✔
120
            if (socket != null) {
1✔
121
                try { socket.close(); } catch (Exception ignore) {}
1✔
122
            }
123
            socket = null;
1✔
124
            if (e instanceof IOException) {
1✔
125
                throw e;
1✔
126
            }
127
            throw new IOException(e);
1✔
128
        }
1✔
129
    }
1✔
130

131
    /**
132
     * Upgrade the port to SSL. If it is already secured, this is a no-op.
133
     * If the data port type doesn't support SSL it should throw an exception.
134
     */
135
    public void upgradeToSecure() throws IOException {
136
        Options options = connection.getOptions();
1✔
137
        SSLContext context = options.getSslContext();
1✔
138

139
        SSLSocketFactory factory = context.getSocketFactory();
1✔
140
        Duration timeout = options.getConnectionTimeout();
1✔
141

142
        SSLSocket sslSocket = (SSLSocket) factory.createSocket(socket, host, port, true);
1✔
143
        sslSocket.setUseClientMode(true);
1✔
144

145
        final CompletableFuture<Void> waitForHandshake = new CompletableFuture<>();
1✔
146
        final HandshakeCompletedListener hcl = (evt) -> waitForHandshake.complete(null);
1✔
147

148
        sslSocket.addHandshakeCompletedListener(hcl);
1✔
149
        sslSocket.startHandshake();
1✔
150

151
        try {
152
            waitForHandshake.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
1✔
153
        } catch (Exception ex) {
×
154
            connection.handleCommunicationIssue(ex);
×
155
            return;
×
156
        }
157
        finally {
158
            sslSocket.removeHandshakeCompletedListener(hcl);
1✔
159
        }
160

161
        socket = sslSocket;
1✔
162
        in = sslSocket.getInputStream();
1✔
163
        out = sslSocket.getOutputStream();
1✔
164
        isSecure = true;
1✔
165
    }
1✔
166

167
    public int read(byte[] dst, int off, int len) throws IOException {
168
        return in.read(dst, off, len);
1✔
169
    }
170

171
    public void write(byte[] src, int toWrite) throws IOException {
172
        out.write(src, 0, toWrite);
1✔
173
    }
1✔
174

175
    public void shutdownInput() throws IOException {
176
        // cannot call shutdownInput on sslSocket
177
        if (!isSecure && socket != null) {
1✔
178
            socket.shutdownInput();
1✔
179
        }
180
    }
1✔
181

182
    public void close() throws IOException {
183
        if (socket != null) {
1✔
184
            socket.close();
1✔
185
        }
186
    }
1✔
187

188
    @Override
189
    public void forceClose() throws IOException {
190
        // socket can technically be null, like between states
191
        // practically it never will be, but guard it anyway
192
        if (socket != null) {
1✔
193
            try {
194
                // If we are being asked to force close, there is no need to linger.
195
                socket.setSoLinger(true, 0);
1✔
196
            }
197
            catch (SocketException e) {
1✔
198
                // don't want to fail if I couldn't set linger
199
            }
1✔
200
            close();
1✔
201
        }
202
    }
1✔
203

204
    public void flush() throws IOException {
205
        out.flush();
1✔
206
    }
1✔
207

208
    protected static boolean isWebsocketScheme(String scheme) {
209
        return "ws".equalsIgnoreCase(scheme) ||
1✔
210
            "wss".equalsIgnoreCase(scheme);
1✔
211
    }
212

213
    /**
214
     * Implements the "Happy Eyeballs" algorithm as described in RFC 6555,
215
     * which attempts to connect to multiple IP addresses in parallel to reduce
216
     * connection setup delays.
217
     */
218
    private Socket connectToFastestIp(Options options, String hostname, int port,
219
                                      int timeoutMillis) throws IOException {
220
        // Get all IP addresses for the hostname
221
        List<InetAddress> ips = Arrays.asList(NatsInetAddress.getAllByName(hostname));
1✔
222

223
        ExecutorService executor = options.getExecutor();
1✔
224
        long CONNECT_DELAY_MILLIS = 250;
1✔
225
        // Create connection tasks for each address
226
        // with delays for each address (0ms, 250ms, 500ms, ...)
227
        List<Callable<Socket>> connectionTasks = new ArrayList<>();
1✔
228

229
        for (int i = 0; i < ips.size(); i++) {
1✔
230
            final InetAddress ip = ips.get(i);
1✔
231
            final int delayMillis = i * (int) CONNECT_DELAY_MILLIS;
1✔
232

233
            connectionTasks.add(() -> {
1✔
234
                if (delayMillis > 0) {
1✔
235
                    try {
236
                        Thread.sleep(delayMillis);
×
237
                    } catch (InterruptedException e) {
1✔
238
                        Thread.currentThread().interrupt();
1✔
239
                    }
×
240
                }
241

242
                Socket socket = createSocket(options);
1✔
243
                socket.connect(new InetSocketAddress(ip, port), timeoutMillis);
1✔
244
                return socket;
1✔
245
            });
246
        }
247

248
        try {
249
            // Use invokeAny to return the first successful connection and cancel other tasks
250
            return executor.invokeAny(connectionTasks);
1✔
251
        } catch (InterruptedException e) {
×
252
            Thread.currentThread().interrupt();
×
253
        } catch (ExecutionException ignored) {
×
254
        }
×
255
        // Could not connect to any IP address
256
        throw new IOException("No responsive IP found for " + hostname);
×
257
    }
258

259
    private Socket createSocket(Options options) throws SocketException {
260
        Socket socket;
261
        if (options.getProxy() != null) {
1✔
262
            socket = new Socket(options.getProxy());
1✔
263
        } else {
264
            socket = new Socket();
1✔
265
        }
266
        socket.setTcpNoDelay(true);
1✔
267
        socket.setReceiveBufferSize(2 * 1024 * 1024);
1✔
268
        socket.setSendBufferSize(2 * 1024 * 1024);
1✔
269
        return socket;
1✔
270
    }
271
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc