• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2052

09 Jul 2025 07:26PM UTC coverage: 95.573% (-0.09%) from 95.659%
#2052

push

github

web-flow
Merge pull request #1351 from jitinsharma/fast_fallback_algorithm

Implement Fast fallback algorithm in SocketDataPort

34 of 41 new or added lines in 3 files covered. (82.93%)

5 existing lines in 2 files now uncovered.

11831 of 12379 relevant lines covered (95.57%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.85
/src/main/java/io/nats/client/impl/SocketDataPort.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Options;
17
import io.nats.client.support.NatsUri;
18
import io.nats.client.support.WebSocket;
19

20
import javax.net.ssl.SSLContext;
21
import javax.net.ssl.SSLSocket;
22
import javax.net.ssl.SSLSocketFactory;
23
import java.io.IOException;
24
import java.io.InputStream;
25
import java.io.OutputStream;
26
import java.net.*;
27
import java.time.Duration;
28
import java.util.ArrayList;
29
import java.util.Arrays;
30
import java.util.List;
31
import java.util.concurrent.*;
32

33
import static io.nats.client.support.NatsConstants.SECURE_WEBSOCKET_PROTOCOL;
34

35
/**
36
 * This class is not thread-safe.  Caller must ensure thread safety.
37
 */
38
public class SocketDataPort implements DataPort {
1✔
39

40
    protected NatsConnection connection;
41
    protected String host;
42
    protected int port;
43
    protected Socket socket;
44
    protected boolean isSecure = false;
1✔
45
    protected int soLinger;
46

47
    protected InputStream in;
48
    protected OutputStream out;
49

50
    @Override
51
    public void afterConstruct(Options options) {
52
        soLinger = options.getSocketSoLinger();
1✔
53
    }
1✔
54

55
    @Override
56
    public void connect(String serverURI, NatsConnection conn, long timeoutNanos) throws IOException {
57
        try {
58
            connect(conn, new NatsUri(serverURI), timeoutNanos);
1✔
59
        }
60
        catch (URISyntaxException e) {
×
61
            throw new IOException(e);
×
62
        }
1✔
63
    }
1✔
64

65
    @Override
66
    public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws IOException {
67
        connection = conn;
1✔
68
        Options options = connection.getOptions();
1✔
69
        long timeout = timeoutNanos / 1_000_000; // convert to millis
1✔
70
        host = nuri.getHost();
1✔
71
        port = nuri.getPort();
1✔
72

73
        try {
74
            if (options.isEnableFastFallback()) {
1✔
75
                socket = connectToFastestIp(options, host, port, (int) timeout);
1✔
76
            } else {
77
                socket = createSocket(options);
1✔
78
                socket.connect(new InetSocketAddress(host, port), (int) timeout);
1✔
79
            }
80

81
            if (soLinger > -1) {
1✔
82
                socket.setSoLinger(true, soLinger);
1✔
83
            }
84
            if (options.getSocketReadTimeoutMillis() > 0) {
1✔
85
                socket.setSoTimeout(options.getSocketReadTimeoutMillis());
×
86
            }
87

88
            if (isWebsocketScheme(nuri.getScheme())) {
1✔
89
                if (SECURE_WEBSOCKET_PROTOCOL.equalsIgnoreCase(nuri.getScheme())) {
1✔
90
                    upgradeToSecure();
1✔
91
                }
92
                try {
93
                    socket = new WebSocket(socket, host, options.getHttpRequestInterceptors());
1✔
94
                } catch (Exception ex) {
1✔
95
                    socket.close();
1✔
96
                    throw ex;
1✔
97
                }
1✔
98
            }
99
            in = socket.getInputStream();
1✔
100
            out = socket.getOutputStream();
1✔
101
        }
102
        catch (Exception e) {
1✔
103
            try { socket.close(); } catch (Exception ignore) {}
1✔
104
            socket = null;
1✔
105
            if (e instanceof IOException) {
1✔
106
                throw e;
1✔
107
            }
108
            throw new IOException(e);
1✔
109
        }
1✔
110
    }
1✔
111

112
    /**
113
     * Upgrade the port to SSL. If it is already secured, this is a no-op.
114
     * If the data port type doesn't support SSL it should throw an exception.
115
     */
116
    public void upgradeToSecure() throws IOException {
117
        Options options = connection.getOptions();
1✔
118
        SSLContext context = options.getSslContext();
1✔
119

120
        SSLSocketFactory factory = context.getSocketFactory();
1✔
121
        Duration timeout = options.getConnectionTimeout();
1✔
122

123
        SSLSocket sslSocket = (SSLSocket) factory.createSocket(socket, host, port, true);
1✔
124
        sslSocket.setUseClientMode(true);
1✔
125

126
        final CompletableFuture<Void> waitForHandshake = new CompletableFuture<>();
1✔
127

128
        sslSocket.addHandshakeCompletedListener((evt) -> {
1✔
129
            waitForHandshake.complete(null);
1✔
130
        });
1✔
131

132
        sslSocket.startHandshake();
1✔
133

134
        try {
135
            waitForHandshake.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
1✔
136
        } catch (Exception ex) {
×
137
            connection.handleCommunicationIssue(ex);
×
138
            return;
×
139
        }
1✔
140

141
        socket = sslSocket;
1✔
142
        in = sslSocket.getInputStream();
1✔
143
        out = sslSocket.getOutputStream();
1✔
144
        isSecure = true;
1✔
145
    }
1✔
146

147
    public int read(byte[] dst, int off, int len) throws IOException {
148
        return in.read(dst, off, len);
1✔
149
    }
150

151
    public void write(byte[] src, int toWrite) throws IOException {
152
        out.write(src, 0, toWrite);
1✔
153
    }
1✔
154

155
    public void shutdownInput() throws IOException {
156
        // cannot call shutdownInput on sslSocket
157
        if (!isSecure) {
1✔
158
            socket.shutdownInput();
1✔
159
        }
160
    }
1✔
161

162
    public void close() throws IOException {
163
        socket.close();
1✔
164
    }
1✔
165

166
    @Override
167
    public void forceClose() throws IOException {
168
        try {
169
            // If we are being asked to force close, there is no need to linger.
170
            socket.setSoLinger(true, 0);
1✔
171
        }
172
        catch (SocketException e) {
1✔
173
            // don't want to fail if I couldn't set linger
174
        }
1✔
175
        close();
1✔
176
    }
1✔
177

178
    public void flush() throws IOException {
179
        out.flush();
1✔
180
    }
1✔
181

182
    protected static boolean isWebsocketScheme(String scheme) {
183
        return "ws".equalsIgnoreCase(scheme) ||
1✔
184
            "wss".equalsIgnoreCase(scheme);
1✔
185
    }
186

187
    /**
188
     * Implements the "Happy Eyeballs" algorithm as described in RFC 6555,
189
     * which attempts to connect to multiple IP addresses in parallel to reduce
190
     * connection setup delays.
191
     */
192
    private Socket connectToFastestIp(Options options, String hostname, int port,
193
                                      int timeoutMillis) throws IOException {
194
        // Get all IP addresses for the hostname
195
        List<InetAddress> ips = Arrays.asList(InetAddress.getAllByName(hostname));
1✔
196

197
        ExecutorService executor = options.getExecutor();
1✔
198
        long CONNECT_DELAY_MILLIS = 250;
1✔
199
        // Create connection tasks for each address
200
        // with delays for each address (0ms, 250ms, 500ms, ...)
201
        List<Callable<Socket>> connectionTasks = new ArrayList<>();
1✔
202

203
        for (int i = 0; i < ips.size(); i++) {
1✔
204
            final InetAddress ip = ips.get(i);
1✔
205
            final int delayMillis = i * (int) CONNECT_DELAY_MILLIS;
1✔
206

207
            connectionTasks.add(() -> {
1✔
208
                if (delayMillis > 0) {
1✔
209
                    try {
NEW
210
                        Thread.sleep(delayMillis);
×
211
                    } catch (InterruptedException e) {
1✔
212
                        Thread.currentThread().interrupt();
1✔
NEW
213
                    }
×
214
                }
215

216
                Socket socket = createSocket(options);
1✔
217
                socket.connect(new InetSocketAddress(ip, port), timeoutMillis);
1✔
218
                return socket;
1✔
219
            });
220
        }
221

222
        try {
223
            // Use invokeAny to return the first successful connection and cancel other tasks
224
            return executor.invokeAny(connectionTasks);
1✔
NEW
225
        } catch (InterruptedException e) {
×
NEW
226
            Thread.currentThread().interrupt();
×
NEW
227
        } catch (ExecutionException ignored) {
×
NEW
228
        }
×
229
        // Could not connect to any IP address
NEW
230
        throw new IOException("No responsive IP found for " + hostname);
×
231
    }
232

233
    private Socket createSocket(Options options) throws SocketException {
234
        Socket socket;
235
        if (options.getProxy() != null) {
1✔
236
            socket = new Socket(options.getProxy());
1✔
237
        } else {
238
            socket = new Socket();
1✔
239
        }
240
        socket.setTcpNoDelay(true);
1✔
241
        socket.setReceiveBufferSize(2 * 1024 * 1024);
1✔
242
        socket.setSendBufferSize(2 * 1024 * 1024);
1✔
243
        return socket;
1✔
244
    }
245
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc