• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2206

23 Sep 2025 06:36PM UTC coverage: 95.051% (+0.008%) from 95.043%
#2206

push

github

web-flow
Merge pull request #1430 from nats-io/write-timeout-sim

Update SocketDataPortBlockSimulator

11965 of 12588 relevant lines covered (95.05%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.16
/src/main/java/io/nats/client/impl/SocketDataPort.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Options;
17
import io.nats.client.support.NatsInetAddress;
18
import io.nats.client.support.NatsUri;
19
import io.nats.client.support.WebSocket;
20
import org.jspecify.annotations.NonNull;
21

22
import javax.net.ssl.HandshakeCompletedListener;
23
import javax.net.ssl.SSLContext;
24
import javax.net.ssl.SSLSocket;
25
import javax.net.ssl.SSLSocketFactory;
26
import java.io.IOException;
27
import java.io.InputStream;
28
import java.io.OutputStream;
29
import java.net.*;
30
import java.time.Duration;
31
import java.util.ArrayList;
32
import java.util.Arrays;
33
import java.util.List;
34
import java.util.concurrent.*;
35

36
import static io.nats.client.support.NatsConstants.SECURE_WEBSOCKET_PROTOCOL;
37

38
/**
39
 * This class is not thread-safe.  Caller must ensure thread safety.
40
 */
41
@SuppressWarnings("ClassEscapesDefinedScope") // NatsConnection
42
public class SocketDataPort implements DataPort {
1✔
43

44
    protected NatsConnection connection;
45

46
    protected String host;
47
    protected int port;
48
    protected Socket socket;
49
    protected boolean isSecure = false;
1✔
50
    protected int soLinger;
51

52
    protected InputStream in;
53
    protected OutputStream out;
54

55
    @Override
56
    public void afterConstruct(Options options) {
57
        soLinger = options.getSocketSoLinger();
1✔
58
    }
1✔
59

60
    @Override
61
    public void connect(@NonNull String serverURI, @NonNull NatsConnection conn, long timeoutNanos) throws IOException {
62
        try {
63
            connect(conn, new NatsUri(serverURI), timeoutNanos);
1✔
64
        }
65
        catch (URISyntaxException e) {
×
66
            throw new IOException(e);
×
67
        }
1✔
68
    }
1✔
69

70
    @Override
71
    public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long timeoutNanos) throws IOException {
72
        connection = conn;
1✔
73
        Options options = connection.getOptions();
1✔
74
        long timeout = timeoutNanos / 1_000_000; // convert to millis
1✔
75
        host = nuri.getHost();
1✔
76
        port = nuri.getPort();
1✔
77

78
        try {
79
            if (options.isEnableFastFallback()) {
1✔
80
                socket = connectToFastestIp(options, host, port, (int) timeout);
1✔
81
            } else {
82
                socket = createSocket(options);
1✔
83
                socket.connect(new InetSocketAddress(host, port), (int) timeout);
1✔
84
            }
85

86
            if (soLinger > -1) {
1✔
87
                socket.setSoLinger(true, soLinger);
×
88
            }
89
            if (options.getSocketReadTimeoutMillis() > 0) {
1✔
90
                socket.setSoTimeout(options.getSocketReadTimeoutMillis());
×
91
            }
92

93
            if (isWebsocketScheme(nuri.getScheme())) {
1✔
94
                if (SECURE_WEBSOCKET_PROTOCOL.equalsIgnoreCase(nuri.getScheme())) {
1✔
95
                    upgradeToSecure();
1✔
96
                }
97
                try {
98
                    socket = new WebSocket(socket, host, options.getHttpRequestInterceptors());
1✔
99
                } catch (Exception ex) {
1✔
100
                    socket.close();
1✔
101
                    throw ex;
1✔
102
                }
1✔
103
            }
104
            in = socket.getInputStream();
1✔
105
            out = socket.getOutputStream();
1✔
106
        }
107
        catch (Exception e) {
1✔
108
            if (socket != null) {
1✔
109
                try { socket.close(); } catch (Exception ignore) {}
1✔
110
            }
111
            socket = null;
1✔
112
            if (e instanceof IOException) {
1✔
113
                throw e;
1✔
114
            }
115
            throw new IOException(e);
1✔
116
        }
1✔
117
    }
1✔
118

119
    /**
120
     * Upgrade the port to SSL. If it is already secured, this is a no-op.
121
     * If the data port type doesn't support SSL it should throw an exception.
122
     */
123
    public void upgradeToSecure() throws IOException {
124
        Options options = connection.getOptions();
1✔
125
        SSLContext context = options.getSslContext();
1✔
126

127
        SSLSocketFactory factory = context.getSocketFactory();
1✔
128
        Duration timeout = options.getConnectionTimeout();
1✔
129

130
        SSLSocket sslSocket = (SSLSocket) factory.createSocket(socket, host, port, true);
1✔
131
        sslSocket.setUseClientMode(true);
1✔
132

133
        final CompletableFuture<Void> waitForHandshake = new CompletableFuture<>();
1✔
134
        final HandshakeCompletedListener hcl = (evt) -> waitForHandshake.complete(null);
1✔
135

136
        sslSocket.addHandshakeCompletedListener(hcl);
1✔
137
        sslSocket.startHandshake();
1✔
138

139
        try {
140
            waitForHandshake.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
1✔
141
        } catch (Exception ex) {
×
142
            connection.handleCommunicationIssue(ex);
×
143
            return;
×
144
        }
145
        finally {
146
            sslSocket.removeHandshakeCompletedListener(hcl);
1✔
147
        }
148

149
        socket = sslSocket;
1✔
150
        in = sslSocket.getInputStream();
1✔
151
        out = sslSocket.getOutputStream();
1✔
152
        isSecure = true;
1✔
153
    }
1✔
154

155
    public int read(byte[] dst, int off, int len) throws IOException {
156
        return in.read(dst, off, len);
1✔
157
    }
158

159
    public void write(byte[] src, int toWrite) throws IOException {
160
        out.write(src, 0, toWrite);
1✔
161
    }
1✔
162

163
    public void shutdownInput() throws IOException {
164
        // cannot call shutdownInput on sslSocket
165
        if (!isSecure && socket != null) {
1✔
166
            socket.shutdownInput();
1✔
167
        }
168
    }
1✔
169

170
    public void close() throws IOException {
171
        if (socket != null) {
1✔
172
            socket.close();
1✔
173
        }
174
    }
1✔
175

176
    @Override
177
    public void forceClose() throws IOException {
178
        // socket can technically be null, like between states
179
        // practically it never will be, but guard it anyway
180
        if (socket != null) {
1✔
181
            try {
182
                // If we are being asked to force close, there is no need to linger.
183
                socket.setSoLinger(true, 0);
1✔
184
            }
185
            catch (SocketException e) {
1✔
186
                // don't want to fail if I couldn't set linger
187
            }
1✔
188
            close();
1✔
189
        }
190
    }
1✔
191

192
    public void flush() throws IOException {
193
        out.flush();
1✔
194
    }
1✔
195

196
    protected static boolean isWebsocketScheme(String scheme) {
197
        return "ws".equalsIgnoreCase(scheme) ||
1✔
198
            "wss".equalsIgnoreCase(scheme);
1✔
199
    }
200

201
    /**
202
     * Implements the "Happy Eyeballs" algorithm as described in RFC 6555,
203
     * which attempts to connect to multiple IP addresses in parallel to reduce
204
     * connection setup delays.
205
     */
206
    private Socket connectToFastestIp(Options options, String hostname, int port,
207
                                      int timeoutMillis) throws IOException {
208
        // Get all IP addresses for the hostname
209
        List<InetAddress> ips = Arrays.asList(NatsInetAddress.getAllByName(hostname));
1✔
210

211
        ExecutorService executor = options.getExecutor();
1✔
212
        long CONNECT_DELAY_MILLIS = 250;
1✔
213
        // Create connection tasks for each address
214
        // with delays for each address (0ms, 250ms, 500ms, ...)
215
        List<Callable<Socket>> connectionTasks = new ArrayList<>();
1✔
216

217
        for (int i = 0; i < ips.size(); i++) {
1✔
218
            final InetAddress ip = ips.get(i);
1✔
219
            final int delayMillis = i * (int) CONNECT_DELAY_MILLIS;
1✔
220

221
            connectionTasks.add(() -> {
1✔
222
                if (delayMillis > 0) {
1✔
223
                    try {
224
                        Thread.sleep(delayMillis);
×
225
                    } catch (InterruptedException e) {
1✔
226
                        Thread.currentThread().interrupt();
1✔
227
                    }
×
228
                }
229

230
                Socket socket = createSocket(options);
1✔
231
                socket.connect(new InetSocketAddress(ip, port), timeoutMillis);
1✔
232
                return socket;
1✔
233
            });
234
        }
235

236
        try {
237
            // Use invokeAny to return the first successful connection and cancel other tasks
238
            return executor.invokeAny(connectionTasks);
1✔
239
        } catch (InterruptedException e) {
×
240
            Thread.currentThread().interrupt();
×
241
        } catch (ExecutionException ignored) {
×
242
        }
×
243
        // Could not connect to any IP address
244
        throw new IOException("No responsive IP found for " + hostname);
×
245
    }
246

247
    private Socket createSocket(Options options) throws SocketException {
248
        Socket socket;
249
        if (options.getProxy() != null) {
1✔
250
            socket = new Socket(options.getProxy());
1✔
251
        } else {
252
            socket = new Socket();
1✔
253
        }
254
        socket.setTcpNoDelay(true);
1✔
255
        socket.setReceiveBufferSize(2 * 1024 * 1024);
1✔
256
        socket.setSendBufferSize(2 * 1024 * 1024);
1✔
257
        return socket;
1✔
258
    }
259
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc