• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2101

12 Aug 2025 11:26AM UTC coverage: 95.457% (+0.02%) from 95.433%
#2101

push

github

web-flow
Merge pull request #1387 from nats-io/info-nullability

Ensuring nullability contracts

92 of 92 new or added lines in 10 files covered. (100.0%)

108 existing lines in 12 files now uncovered.

11913 of 12480 relevant lines covered (95.46%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.73
/src/main/java/io/nats/client/impl/NatsServerPool.java
1
// Copyright 2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Options;
17
import io.nats.client.ServerPool;
18
import io.nats.client.support.NatsConstants;
19
import io.nats.client.support.NatsInetAddress;
20
import io.nats.client.support.NatsUri;
21
import org.jspecify.annotations.NonNull;
22
import org.jspecify.annotations.Nullable;
23

24
import java.net.InetAddress;
25
import java.net.URISyntaxException;
26
import java.net.UnknownHostException;
27
import java.util.ArrayList;
28
import java.util.Collections;
29
import java.util.List;
30
import java.util.concurrent.ThreadLocalRandom;
31
import java.util.concurrent.locks.ReentrantLock;
32

33
public class NatsServerPool implements ServerPool {
34

35
    protected final ReentrantLock listLock;
36
    protected List<ServerPoolEntry> entryList;
37
    protected Options options;
38
    protected int maxConnectAttempts;
39
    protected boolean hasSecureServer;
40
    protected NatsUri lastConnected;
41
    protected String defaultScheme;
42

43
    /**
     * Creates a pool with its own lock, an empty entry list and a default-built
     * {@link Options} instance that stands in until {@code initialize} is called.
     */
    public NatsServerPool() {
        this.listLock = new ReentrantLock();
        // the entry list is mutated or swapped out as servers come and go
        this.entryList = new ArrayList<>();
        // placeholder only; initialize replaces it with the caller's options
        this.options = Options.builder().build();
    }
1✔
48

49
    /**
50
     * {@inheritDoc}
51
     */
52
    public void initialize(@NonNull Options opts) {
53
        // 1. Hold on to options as we need them for settings
54
        options = opts;
1✔
55

56
        // 2. maxConnectAttempts accounts for the first connect attempt and also reconnect attempts
57
        maxConnectAttempts = options.getMaxReconnect() < 0 ? Integer.MAX_VALUE : options.getMaxReconnect() + 1;
1✔
58

59
        // 3. Add all the bootstrap to the server list and prepare list for next
60
        //    FYI bootstrap will always have at least the default url
61
        listLock.lock();
1✔
62
        try {
63
            for (NatsUri nuri : options.getNatsServerUris()) {
1✔
64
                // 1. If item is not found in the list being built, add to the list
65
                boolean notAlreadyInList = true;
1✔
66
                for (ServerPoolEntry entry : entryList) {
1✔
67
                    if (nuri.equivalent(entry.nuri)) {
1✔
UNCOV
68
                        notAlreadyInList = false;
×
UNCOV
69
                        break;
×
70
                    }
71
                }
1✔
72
                if (notAlreadyInList) {
1✔
73
                    if (defaultScheme == null && !nuri.getScheme().equals(NatsConstants.NATS_PROTOCOL)) {
1✔
74
                        defaultScheme = nuri.getScheme();
1✔
75
                    }
76
                    entryList.add(new ServerPoolEntry(nuri, false));
1✔
77
                }
78
            }
1✔
79

80
            // 6. prepare list for next
81
            afterListChanged();
1✔
82
        }
83
        finally {
84
            listLock.unlock();
1✔
85
        }
86
    }
1✔
87

88
    /**
89
     * {@inheritDoc}
90
     */
91
    @Override
92
    public boolean acceptDiscoveredUrls(@NonNull List<@NonNull String> discoveredServers) {
93
        // 1. If ignored discovered servers, don't do anything b/c never want
94
        //    anything but the explicit, which is already loaded.
95
        // 2. return false == no new servers discovered
96
        if (options.isIgnoreDiscoveredServers()) {
1✔
97
            return false;
1✔
98
        }
99

100
        listLock.lock();
1✔
101
        try {
102
            // 2. Build a list for discovered
103
            //    - since we'll need the NatsUris later
104
            //    - and to have a list to use to prune removed gossiped servers
105
            List<NatsUri> discovered = new ArrayList<>();
1✔
106
            for (String d : discoveredServers) {
1✔
107
                try {
108
                    discovered.add(new NatsUri(d, defaultScheme));
1✔
UNCOV
109
                } catch (URISyntaxException ignore) {
×
110
                    // should never actually happen
111
                }
1✔
112
            }
1✔
113

114
            // 3. Start a new server list, loading in current order from the current list, and keeping
115
            //    - the last connected
116
            //    - all non-gossiped
117
            //    - any found in the new discovered list
118
            //      - for any new discovered, we also remove them from
119
            //        that list so step there are no dupes for step #4
120
            //      - This also maintains the Srv state of an already known discovered
121
            List<ServerPoolEntry> newEntryList = new ArrayList<>();
1✔
122
            for (ServerPoolEntry entry : entryList) {
1✔
123
                int ix = findEquivalent(discovered, entry.nuri);
1✔
124
                if (ix != -1 || entry.nuri.equals(lastConnected) || !entry.isGossiped) {
1✔
125
                    newEntryList.add(entry);
1✔
126
                    if (ix != -1) {
1✔
127
                        discovered.remove(ix);
1✔
128
                    }
129
                }
130
            }
1✔
131

132
            // 4. Add all left over from the new discovered list
133
            boolean discoveryContainedUnknowns = false;
1✔
134
            if (!discovered.isEmpty()) {
1✔
135
                discoveryContainedUnknowns = true;
1✔
136
                for (NatsUri d : discovered) {
1✔
137
                    newEntryList.add(new ServerPoolEntry(d, true));
1✔
138
                }
1✔
139
            }
140

141
            // 5. replace the list with the new one
142
            entryList = newEntryList;
1✔
143

144
            // 6. prepare list for next
145
            afterListChanged();
1✔
146

147
            // 7.
148
            return discoveryContainedUnknowns;
1✔
149
        }
150
        finally {
151
            listLock.unlock();
1✔
152
        }
153
    }
154

155
    protected void afterListChanged() {
156
        // 1. randomize if needed and allowed
157
        if (entryList.size() > 1 && !options.isNoRandomize()) {
1✔
158
            Collections.shuffle(entryList, ThreadLocalRandom.current());
1✔
159
        }
160

161
        // 2. calculate hasSecureServer and find the index of lastConnected
162
        hasSecureServer = false;
1✔
163
        int lastConnectedIx = -1;
1✔
164
        for (int ix = 0; ix < entryList.size(); ix++) {
1✔
165
            NatsUri nuri = entryList.get(ix).nuri;
1✔
166
            hasSecureServer |= nuri.isSecure();
1✔
167
            if (nuri.equals(lastConnected)) {
1✔
168
                lastConnectedIx = ix;
1✔
169
            }
170
        }
171

172
        // C. put the last connected server at the end of the list
173
        if (lastConnectedIx != -1) {
1✔
174
            entryList.add(entryList.remove(lastConnectedIx));
1✔
175
        }
176
    }
1✔
177

178
    @Override
179
    @Nullable
180
    public NatsUri peekNextServer() {
181
        listLock.lock();
1✔
182
        try {
183
            return entryList.isEmpty() ? null : entryList.get(0).nuri;
1✔
184
        }
185
        finally {
186
            listLock.unlock();
1✔
187
        }
188
    }
189

190
    @Override
191
    @Nullable
192
    public NatsUri nextServer() {
193
        // 0. The list is already managed for qualified by connectFailed
194
        // 1. Get the first item in the list, update it's time, add back to the end of list
195
        listLock.lock();
1✔
196
        try {
197
            if (!entryList.isEmpty()) {
1✔
198
                ServerPoolEntry entry = entryList.remove(0);
1✔
199
                entry.lastAttempt = System.currentTimeMillis();
1✔
200
                entryList.add(entry);
1✔
201
                return entry.nuri;
1✔
202
            }
203
            return null;
1✔
204
        }
205
        finally {
206
            listLock.unlock();
1✔
207
        }
208
    }
209

210
    @Override
211
    @Nullable
212
    public List<String> resolveHostToIps(@NonNull String host) {
213
        // 1. if options.isNoResolveHostnames(), return empty list
214
        if (options.isNoResolveHostnames()) {
1✔
215
            return null;
1✔
216
        }
217

218
        // 2. else, try to resolve the hostname, adding results to list
219
        List<String> results = new ArrayList<>();
1✔
220
        try {
221
            InetAddress[] addresses = NatsInetAddress.getAllByName(host);
1✔
222
            for (InetAddress a : addresses) {
1✔
223
                results.add(a.getHostAddress());
1✔
224
            }
225
        }
226
        catch (UnknownHostException ignore) {
1✔
227
            // A user might have supplied a bad host, but the server shouldn't.
228
            // Either way, nothing much we can do.
229
        }
1✔
230

231
        // 3. no results, return null.
232
        if (results.isEmpty()) {
1✔
233
            return null;
1✔
234
        }
235

236
        // 4. if results has more than 1 and allowed to randomize, shuffle the list
237
        if (results.size() > 1 && !options.isNoRandomize()) {
1✔
238
            Collections.shuffle(results, ThreadLocalRandom.current());
1✔
239
        }
240
        return results;
1✔
241
    }
242

243
    @Override
244
    public void connectSucceeded(@NonNull NatsUri nuri) {
245
        // 1. Work from the end because nextServer moved the one being tried to the end
246
        // 2. If we find the server in the list...
247
        //    2.1. remember it and
248
        //    2.2. reset failed attempts
249
        listLock.lock();
1✔
250
        try {
251
            for (int x = entryList.size() - 1; x >= 0 ; x--) {
1✔
252
                ServerPoolEntry entry = entryList.get(x);
1✔
253
                if (entry.nuri.equals(nuri)) {
1✔
254
                    lastConnected = nuri;
1✔
255
                    entry.failedAttempts = 0;
1✔
256
                    return;
1✔
257
                }
258
            }
259
        }
260
        finally {
261
            listLock.unlock();
1✔
262
        }
UNCOV
263
    }
×
264

265
    @Override
266
    public void connectFailed(@NonNull NatsUri nuri) {
267
        // 1. Work from the end because nextServer moved the one being tried to the end
268
        // 2. If we find the server in the list...
269
        //    2.1. increment failed attempts
270
        //    2.2. if failed attempts reaches max, remove it from the list
271
        listLock.lock();
1✔
272
        try {
273
            for (int x = entryList.size() - 1; x >= 0 ; x--) {
1✔
274
                ServerPoolEntry entry = entryList.get(x);
1✔
275
                if (entry.nuri.equals(nuri)) {
1✔
276
                    if (++entry.failedAttempts >= maxConnectAttempts) {
1✔
277
                        entryList.remove(x);
1✔
278
                    }
279
                    return;
1✔
280
                }
281
            }
282
        }
283
        finally {
284
            listLock.unlock();
1✔
285
        }
UNCOV
286
    }
×
287

288
    @Override
289
    @NonNull
290
    public List<String> getServerList() {
291
        listLock.lock();
1✔
292
        try {
293
            List<String> list = new ArrayList<>();
1✔
294
            for (ServerPoolEntry entry : entryList) {
1✔
295
                list.add(entry.nuri.toString());
1✔
296
            }
1✔
297
            return list;
1✔
298
        }
299
        finally {
300
            listLock.unlock();
1✔
301
        }
302
    }
303

304
    @Override
305
    public boolean hasSecureServer() {
306
        return hasSecureServer;
1✔
307
    }
308

309
    protected int findEquivalent(List<NatsUri> list, NatsUri toFind) {
310
        for (int i = 0; i < list.size(); i++) {
1✔
311
            NatsUri nuri = list.get(i);
1✔
312
            if (nuri.equivalent(toFind)) {
1✔
313
                return i;
1✔
314
            }
315
        }
316
        return -1;
1✔
317
    }
318
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc