• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2238

29 Sep 2025 04:41PM UTC coverage: 95.464% (-0.06%) from 95.521%
#2238

push

github

web-flow
Merge pull request #1438 from nats-io/improve-connection-listener

Improve ConnectionListener

52 of 64 new or added lines in 2 files covered. (81.25%)

7 existing lines in 3 files now uncovered.

12164 of 12742 relevant lines covered (95.46%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.34
/src/main/java/io/nats/client/impl/NatsConnection.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.ConnectionListener.Events;
18
import io.nats.client.api.ServerInfo;
19
import io.nats.client.support.*;
20
import org.jspecify.annotations.NonNull;
21
import org.jspecify.annotations.Nullable;
22

23
import java.io.IOException;
24
import java.net.InetAddress;
25
import java.net.URISyntaxException;
26
import java.nio.ByteBuffer;
27
import java.nio.CharBuffer;
28
import java.time.Duration;
29
import java.time.Instant;
30
import java.util.*;
31
import java.util.concurrent.*;
32
import java.util.concurrent.atomic.AtomicBoolean;
33
import java.util.concurrent.atomic.AtomicLong;
34
import java.util.concurrent.atomic.AtomicReference;
35
import java.util.concurrent.locks.Condition;
36
import java.util.concurrent.locks.ReentrantLock;
37
import java.util.function.Predicate;
38

39
import static io.nats.client.support.NatsConstants.*;
40
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
41
import static io.nats.client.support.Validator.*;
42
import static java.nio.charset.StandardCharsets.UTF_8;
43

44
class NatsConnection implements Connection {
45

46
    public static final double NANOS_PER_SECOND = 1_000_000_000.0;
47

48
    private final Options options;
49
    final boolean forceFlushOnRequest;
50

51
    private final StatisticsCollector statistics;
52

53
    private boolean connecting; // you can only connect in one thread
54
    private boolean disconnecting; // you can only disconnect in one thread
55
    private boolean closing; // respect a close call regardless
56
    private Exception exceptionDuringConnectChange; // exception occurred in another thread while dis/connecting
57
    final ReentrantLock closeSocketLock; // this is not private so it can be tested
58

59
    private Status status;
60
    private final ReentrantLock statusLock;
61
    private final Condition statusChanged;
62

63
    private CompletableFuture<DataPort> dataPortFuture;
64
    private DataPort dataPort;
65
    private NatsUri currentServer;
66
    private NatsUri lastServer;
67
    private CompletableFuture<Boolean> reconnectWaiter;
68
    private final ConcurrentHashMap<NatsUri, String> serverAuthErrors;
69

70
    private NatsConnectionReader reader;
71
    private NatsConnectionWriter writer;
72

73
    private final AtomicReference<ServerInfo> serverInfo;
74

75
    private final Map<String, NatsSubscription> subscribers;
76
    private final Map<String, NatsDispatcher> dispatchers; // use a concurrent map so we get more consistent iteration behavior
77
    private final Collection<ConnectionListener> connectionListeners;
78
    private final Map<String, NatsRequestCompletableFuture> responsesAwaiting;
79
    private final Map<String, NatsRequestCompletableFuture> responsesRespondedTo;
80
    private final ConcurrentLinkedDeque<CompletableFuture<Boolean>> pongQueue;
81

82
    private final String mainInbox;
83
    private final AtomicReference<NatsDispatcher> inboxDispatcher;
84
    private final ReentrantLock inboxDispatcherLock;
85
    private ScheduledTask pingTask;
86
    private ScheduledTask cleanupTask;
87

88
    private final AtomicBoolean needPing;
89

90
    private final AtomicLong nextSid;
91
    private final NUID nuid;
92

93
    private final AtomicReference<String> connectError;
94
    private final AtomicReference<String> lastError;
95
    private final AtomicReference<CompletableFuture<Boolean>> draining;
96
    private final AtomicBoolean blockPublishForDrain;
97
    private final AtomicBoolean tryingToConnect;
98

99
    private final ExecutorService callbackRunner;
100
    private final ExecutorService executor;
101
    private final ExecutorService connectExecutor;
102
    private final ScheduledExecutorService scheduledExecutor;
103
    private final boolean advancedTracking;
104

105
    private final ServerPool serverPool;
106
    private final DispatcherFactory dispatcherFactory;
107
    private final @NonNull CancelAction cancelAction;
108

109
    private final boolean trace;
110
    private final TimeTraceLogger timeTraceLogger;
111

112
    /**
     * Builds the connection object from the supplied options.
     * <p>
     * Copies the trace flag, time-trace logger, flush and statistics settings out of
     * {@code options}, sets the initial status to {@code Status.DISCONNECTED}, creates the
     * locks, maps, queues and atomic holders used by the rest of the class, takes the
     * executors from {@code options}, creates the first reader and writer, and initializes
     * the server pool. Where {@code options} returns {@code null} for the statistics
     * collector, server pool or dispatcher factory, a default implementation is created.
     *
     * @param options the connection options; also stored in the {@code options} field
     */
    NatsConnection(@NonNull Options options) {
1✔
113
        trace = options.isTraceConnection();
1✔
114
        timeTraceLogger = options.getTimeTraceLogger();
1✔
115
        timeTraceLogger.trace("creating connection object");
1✔
116

117
        this.options = options;
1✔
118
        forceFlushOnRequest = options.forceFlushOnRequest();
1✔
119

120
        advancedTracking = options.isTrackAdvancedStats();
1✔
121
        // fall back to the built-in collector when the options do not supply one
        this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector();
1✔
122
        this.statistics.setAdvancedTracking(advancedTracking);
1✔
123

124
        this.closeSocketLock = new ReentrantLock();
1✔
125

126
        this.statusLock = new ReentrantLock();
1✔
127
        this.statusChanged = this.statusLock.newCondition();
1✔
128
        this.status = Status.DISCONNECTED;
1✔
129
        // the waiter starts out already completed with TRUE
        this.reconnectWaiter = new CompletableFuture<>();
1✔
130
        this.reconnectWaiter.complete(Boolean.TRUE);
1✔
131

132
        this.connectionListeners = ConcurrentHashMap.newKeySet();
1✔
133
        // register the listener configured in the options, if there is one
        if (options.getConnectionListener() != null) {
1✔
134
            addConnectionListener(options.getConnectionListener());
1✔
135
        }
136

137
        this.dispatchers = new ConcurrentHashMap<>();
1✔
138
        this.subscribers = new ConcurrentHashMap<>();
1✔
139
        this.responsesAwaiting = new ConcurrentHashMap<>();
1✔
140
        this.responsesRespondedTo = new ConcurrentHashMap<>();
1✔
141
        this.serverAuthErrors = new ConcurrentHashMap<>();
1✔
142

143
        this.nextSid = new AtomicLong(1);
1✔
144
        timeTraceLogger.trace("creating NUID");
1✔
145
        this.nuid = new NUID();
1✔
146
        // main inbox subject is a freshly created inbox with a ".*" wildcard suffix
        this.mainInbox = createInbox() + ".*";
1✔
147

148
        this.lastError = new AtomicReference<>();
1✔
149
        this.connectError = new AtomicReference<>();
1✔
150

151
        this.serverInfo = new AtomicReference<>(ServerInfo.EMPTY_INFO); // we want serverInfo.get to never return a null
1✔
152
        this.inboxDispatcher = new AtomicReference<>();
1✔
153
        this.inboxDispatcherLock = new ReentrantLock();
1✔
154
        this.pongQueue = new ConcurrentLinkedDeque<>();
1✔
155
        this.draining = new AtomicReference<>();
1✔
156
        this.blockPublishForDrain = new AtomicBoolean();
1✔
157
        this.tryingToConnect = new AtomicBoolean();
1✔
158

159
        timeTraceLogger.trace("creating executors");
1✔
160
        this.executor = options.getExecutor();
1✔
161
        this.callbackRunner = options.getCallbackExecutor();
1✔
162
        this.connectExecutor = options.getConnectExecutor();
1✔
163
        this.scheduledExecutor = options.getScheduledExecutor();
1✔
164

165
        timeTraceLogger.trace("creating reader and writer");
1✔
166
        this.reader = new NatsConnectionReader(this);
1✔
167
        // no previous writer exists yet, so null is passed as the second argument
        this.writer = new NatsConnectionWriter(this, null);
1✔
168

169
        this.needPing = new AtomicBoolean(true);
1✔
170

171
        serverPool = options.getServerPool() == null ? new NatsServerPool() : options.getServerPool();
1✔
172
        serverPool.initialize(options);
1✔
173
        dispatcherFactory = options.getDispatcherFactory() == null ? new DispatcherFactory() : options.getDispatcherFactory();
1✔
174

175
        // REPORT when the options ask for no-responders reporting, otherwise CANCEL
        cancelAction = options.isReportNoResponders() ? CancelAction.REPORT : CancelAction.CANCEL;
1✔
176

177
        timeTraceLogger.trace("connection object created");
1✔
178
    }
1✔
179

180
    // Connect is only called after creation
181
    void connect(boolean reconnectOnConnect) throws InterruptedException, IOException {
182
        if (!tryingToConnect.get()) {
1✔
183
            try {
184
                tryingToConnect.set(true);
1✔
185
                connectImpl(reconnectOnConnect);
1✔
186
            }
187
            finally {
188
                tryingToConnect.set(false);
1✔
189
            }
190
        }
191
    }
1✔
192

193
    /**
     * Performs the initial connect: walks the server pool once, resolving each entry and
     * trying every resolved address until one attempt leaves the connection connected.
     * <p>
     * For each failed attempt the status is set back to {@code DISCONNECTED}, the pool entry
     * is added to the fail list and reported to the pool, and an authentication error (if any)
     * is remembered per resolved address in {@code serverAuthErrors}.
     * <p>
     * If the loop ends without a connection and the connection is not closed, the method either
     * hands over to {@link #reconnectImpl()} (when {@code reconnectOnConnect} is true) or
     * closes the connection and throws.
     *
     * @param reconnectOnConnect whether a failed initial connect should fall into the reconnect logic
     * @throws IllegalArgumentException if the options contain no servers
     * @throws AuthenticationException  if the connect failed and the last connect error is an authentication error
     * @throws IOException              if the connect failed for any other reason; the message lists the failed servers
     * @throws InterruptedException     declared; propagated from the calls made here
     */
    void connectImpl(boolean reconnectOnConnect) throws InterruptedException, IOException {
194
        if (options.getServers().isEmpty()) {
1✔
195
            throw new IllegalArgumentException("No servers provided in options");
×
196
        }
197

198
        // NOTE(review): this local shadows the 'trace' field, which the constructor reads from the same option
        boolean trace = options.isTraceConnection();
1✔
199
        long start = NatsSystemClock.nanoTime();
1✔
200

201
        this.lastError.set("");
1✔
202

203
        timeTraceLogger.trace("starting connect loop");
1✔
204

205
        Set<NatsUri> failList = new HashSet<>();
1✔
206
        boolean keepGoing = true;
1✔
207
        NatsUri first = null;
1✔
208
        NatsUri cur;
209
        while (keepGoing && (cur = serverPool.peekNextServer()) != null) {
1✔
210
            if (first == null) {
1✔
211
                first = cur;
1✔
212
            }
213
            else if (cur.equals(first)) {
1✔
214
                break;  // connect only goes through loop once
1✔
215
            }
216
            serverPool.nextServer(); // b/c we only peeked.
1✔
217

218
            // let server pool resolve hostnames, then loop through resolved
219
            List<NatsUri> resolvedList = resolveHost(cur);
1✔
220
            for (NatsUri resolved : resolvedList) {
1✔
221
                if (isClosed()) {
1✔
222
                    keepGoing = false;
1✔
223
                    break;
1✔
224
                }
225
                connectError.set(""); // new on each attempt
1✔
226

227
                timeTraceLogger.trace("setting status to connecting");
1✔
228
                updateStatus(Status.CONNECTING, resolved, cur);
1✔
229

230
                timeTraceLogger.trace("trying to connect to %s", cur);
1✔
231
                tryToConnect(cur, resolved, NatsSystemClock.nanoTime());
1✔
232

233
                // tryToConnect reports failure through state, not a return value, so check the status
                if (isConnected()) {
1✔
234
                    serverPool.connectSucceeded(cur);
1✔
235
                    keepGoing = false;
1✔
236
                    break;
1✔
237
                }
238

239
                timeTraceLogger.trace("setting status to disconnected");
1✔
240
                updateStatus(Status.DISCONNECTED, resolved, cur);
1✔
241

242
                failList.add(cur);
1✔
243
                serverPool.connectFailed(cur);
1✔
244

245
                String err = connectError.get();
1✔
246

247
                if (this.isAuthenticationError(err)) {
1✔
248
                    this.serverAuthErrors.put(resolved, err);
1✔
249
                }
250
            }
1✔
251
        }
1✔
252

253
        if (!isConnected() && !isClosed()) {
1✔
254
            if (reconnectOnConnect) {
1✔
255
                timeTraceLogger.trace("trying to reconnect on connect");
1✔
256
                reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior
1✔
257
            }
258
            else {
259
                timeTraceLogger.trace("connection failed, closing to cleanup");
1✔
260
                close();
1✔
261

262
                // connectError still holds the error from the last attempt made in the loop
                String err = connectError.get();
1✔
263
                if (this.isAuthenticationError(err)) {
1✔
264
                    throw new AuthenticationException("Authentication error connecting to NATS server: " + err);
1✔
265
                }
266
                throw new IOException("Unable to connect to NATS servers: " + failList);
1✔
267
            }
268
        }
269
        else if (trace) {
1✔
270
            long end = NatsSystemClock.nanoTime();
1✔
271
            double seconds = ((double) (end - start)) / NANOS_PER_SECOND;
1✔
272
            timeTraceLogger.trace("connect complete in %.3f seconds", seconds);
1✔
273
        }
274
    }
1✔
275

276
    @Override
277
    public void forceReconnect() throws IOException, InterruptedException {
278
        forceReconnect(ForceReconnectOptions.DEFAULT_INSTANCE);
1✔
279
    }
1✔
280

281
    @Override
282
    public void forceReconnect(ForceReconnectOptions options) throws IOException, InterruptedException {
283
        if (!tryingToConnect.get()) {
1✔
284
            try {
285
                tryingToConnect.set(true);
1✔
286
                forceReconnectImpl(options == null ? ForceReconnectOptions.DEFAULT_INSTANCE : options);
1✔
287
            }
288
            finally {
289
                tryingToConnect.set(false);
1✔
290
            }
291
        }
292
    }
1✔
293

294
    /**
     * Tears down the current socket and I/O workers and then runs the reconnect logic.
     * <p>
     * Steps, in order: optionally flush for {@code frOpts.getFlushWait()} (a timeout is ignored);
     * then, while holding {@code closeSocketLock}: set the status to {@code DISCONNECTED}, cancel
     * and clear the data port future, hand the current data port to the executor to be closed,
     * stop the reader and the writer (waiting up to 100 ms for each), and replace both with new
     * instances; finally call {@link #reconnectImpl()} and take the writer out of reconnect mode.
     *
     * @param frOpts the force-reconnect options: flush wait and whether to force-close the data port
     * @throws InterruptedException declared; propagated from the flush or from {@code reconnectImpl}
     */
    void forceReconnectImpl(@NonNull ForceReconnectOptions frOpts) throws InterruptedException {
295
        if (frOpts.getFlushWait() != null) {
1✔
296
            try {
297
                flush(frOpts.getFlushWait());
×
298
            }
299
            catch (TimeoutException e) {
×
300
                // Ignored. Manual test demonstrates that if the connection is dropped
301
                // in the middle of the flush, the most likely reason for a TimeoutException,
302
                // the socket is closed.
303
            }
×
304
        }
305

306
        closeSocketLock.lock();
1✔
307
        try {
308
            updateStatus(Status.DISCONNECTED);
1✔
309

310
            // Close and reset the current data port and future
311
            if (dataPortFuture != null) {
1✔
312
                dataPortFuture.cancel(true);
1✔
313
                dataPortFuture = null;
1✔
314
            }
315

316
            // close the data port as a task so as not to block reconnecting
317
            if (dataPort != null) {
1✔
318
                // capture the port in a final local so the task still has it after the field is cleared
                final DataPort dataPortToClose = dataPort;
1✔
319
                dataPort = null;
1✔
320
                executor.submit(() -> {
1✔
321
                    try {
322
                        if (frOpts.isForceClose()) {
1✔
323
                            dataPortToClose.forceClose();
×
324
                        }
325
                        else {
326
                            dataPortToClose.close();
1✔
327
                        }
328
                    }
329
                    catch (IOException ignore) {
×
330
                        // ignored since running as a task and nothing we can do.
331
                    }
1✔
332
                });
1✔
333
            }
334

335
            // stop i/o
336
            try {
337
                this.reader.stop(false).get(100, TimeUnit.MILLISECONDS);
1✔
338
            }
339
            catch (Exception ex) {
×
340
                processException(ex);
×
341
            }
1✔
342
            try {
343
                this.writer.stop().get(100, TimeUnit.MILLISECONDS);
1✔
344
            }
345
            catch (Exception ex) {
×
346
                processException(ex);
×
347
            }
1✔
348

349
            // new reader/writer
350
            reader = new NatsConnectionReader(this);
1✔
351
            // the previous writer is passed to the constructor of its replacement
            writer = new NatsConnectionWriter(this, writer);
1✔
352
        }
353
        finally {
354
            closeSocketLock.unlock();
1✔
355
        }
356

357
        reconnectImpl();
1✔
358
        writer.setReconnectMode(false);
1✔
359
    }
1✔
360

361
    void reconnect() throws InterruptedException {
362
        if (!tryingToConnect.get()) {
1✔
363
            try {
364
                tryingToConnect.set(true);
1✔
365
                reconnectImpl();
1✔
366
            }
367
            finally {
368
                tryingToConnect.set(false);
1✔
369
            }
370
        }
371
    }
1✔
372

373
    // Reconnect can only be called when the connection is disconnected
374
    /**
     * Reconnect loop without the {@code tryingToConnect} guard.
     * <p>
     * Returns immediately when the connection is closed, and closes the connection when
     * {@code options.getMaxReconnect()} is 0. Otherwise the writer is put into reconnect mode and
     * the server pool is cycled: each entry is resolved and every resolved address is tried until
     * the connection is connected, closed, closing or disconnecting, or until the same
     * authentication error is seen twice for the same resolved address. Each time the pool hands
     * back the first server again, the reconnect delay handler is invoked with the round count.
     * <p>
     * If the loop ends without a connection the connection is closed. On success the
     * subscriptions are re-sent (plain subscriptions directly, dispatcher-owned ones through
     * their dispatcher; draining ones are skipped), the connection is flushed, a
     * {@code RESUBSCRIBED} event is raised and the writer leaves reconnect mode.
     *
     * @throws InterruptedException declared; propagated from the calls made here
     */
    void reconnectImpl() throws InterruptedException {
375
        if (isClosed()) {
1✔
376
            return;
1✔
377
        }
378

379
        if (options.getMaxReconnect() == 0) {
1✔
380
            this.close();
1✔
381
            return;
1✔
382
        }
383

384
        writer.setReconnectMode(true);
1✔
385

386
        if (!isConnected() && !isClosed() && !this.isClosing()) {
1✔
387
            boolean keepGoing = true;
1✔
388
            int totalRounds = 0;
1✔
389
            NatsUri first = null;
1✔
390
            NatsUri cur;
391
            while (keepGoing && (cur = serverPool.nextServer()) != null) {
1✔
392
                if (first == null) {
1✔
393
                    first = cur;
1✔
394
                }
395
                else if (first.equals(cur)) {
1✔
396
                    // went around the pool an entire time
397
                    invokeReconnectDelayHandler(++totalRounds);
1✔
398
                }
399

400
                // let server list provider resolve hostnames
401
                // then loop through resolved
402
                List<NatsUri> resolvedList = resolveHost(cur);
1✔
403
                for (NatsUri resolved : resolvedList) {
1✔
404
                    if (isClosed()) {
1✔
405
                        keepGoing = false;
1✔
406
                        break;
1✔
407
                    }
408
                    connectError.set(""); // reset on each loop
1✔
409
                    if (isDisconnectingOrClosed() || this.isClosing()) {
1✔
410
                        keepGoing = false;
1✔
411
                        break;
1✔
412
                    }
413
                    updateStatus(Status.RECONNECTING, resolved, cur);
1✔
414

415
                    timeTraceLogger.trace("reconnecting to server %s", cur);
1✔
416
                    tryToConnect(cur, resolved, NatsSystemClock.nanoTime());
1✔
417

418
                    if (isConnected()) {
1✔
419
                        serverPool.connectSucceeded(cur);
1✔
420
                        statistics.incrementReconnects();
1✔
421
                        keepGoing = false;
1✔
422
                        break;
1✔
423
                    }
424

425
                    serverPool.connectFailed(cur);
1✔
426
                    String err = connectError.get();
1✔
427
                    if (this.isAuthenticationError(err)) {
1✔
428
                        // stop when this address already failed with exactly the same auth error
                        if (err.equals(this.serverAuthErrors.get(resolved))) {
1✔
429
                            keepGoing = false; // double auth error
1✔
430
                            break;
1✔
431
                        }
432
                        serverAuthErrors.put(resolved, err);
1✔
433
                    }
434
                }
1✔
435
            }
1✔
436
        } // end-main-loop
437

438
        if (!isConnected()) {
1✔
439
            this.close();
1✔
440
            return;
1✔
441
        }
442

443
        // subscriptions without a dispatcher are re-sent directly from here
        this.subscribers.forEach((sid, sub) -> {
1✔
444
            if (sub.getDispatcher() == null && !sub.isDraining()) {
1✔
445
                sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true);
1✔
446
            }
447
        });
1✔
448

449
        // dispatcher-owned subscriptions are re-sent by the dispatcher itself
        this.dispatchers.forEach((nuid, d) -> {
1✔
450
            if (!d.isDraining()) {
1✔
451
                d.resendSubscriptions();
1✔
452
            }
453
        });
1✔
454

455
        try {
456
            this.flush(this.options.getConnectionTimeout());
1✔
457
        }
458
        catch (Exception exp) {
1✔
459
            this.processException(exp);
1✔
460
        }
1✔
461

462
        processConnectionEvent(Events.RESUBSCRIBED, uriDetail(currentServer));
1✔
463

464
        // When the flush returns, we are done sending internal messages,
465
        // so we can switch to the non-reconnect queue
466
        this.writer.setReconnectMode(false);
1✔
467
    }
1✔
468

469
    long timeCheck(long endNanos, String message) throws TimeoutException {
470
        long remainingNanos = endNanos - NatsSystemClock.nanoTime();
1✔
471
        if (trace) {
1✔
472
            traceTimeCheck(message, remainingNanos);
1✔
473
        }
474
        if (remainingNanos < 0) {
1✔
475
            throw new TimeoutException("connection timed out");
1✔
476
        }
477
        return remainingNanos;
1✔
478
    }
479

480
    void traceTimeCheck(String message, long remainingNanos) {
481
        if (remainingNanos < 0) {
1✔
482
            if (remainingNanos > -1_000_000) { // less than -1 ms
1✔
483
                timeTraceLogger.trace(message + String.format(", %d (ns) beyond timeout", -remainingNanos));
1✔
484
            }
485
            else if (remainingNanos > -1_000_000_000) { // less than -1 second
1✔
486
                long ms = -remainingNanos / 1_000_000;
1✔
487
                timeTraceLogger.trace(message + String.format(", %d (ms) beyond timeout", ms));
1✔
488
            }
1✔
489
            else {
490
                double seconds = ((double) -remainingNanos) / 1_000_000_000.0;
1✔
491
                timeTraceLogger.trace(message + String.format(", %.3f (s) beyond timeout", seconds));
1✔
492
            }
1✔
493
        }
494
        else if (remainingNanos < 1_000_000) {
1✔
495
            timeTraceLogger.trace(message + String.format(", %d (ns) remaining", remainingNanos));
1✔
496
        }
497
        else if (remainingNanos < 1_000_000_000) {
1✔
498
            long ms = remainingNanos / 1_000_000;
1✔
499
            timeTraceLogger.trace(message + String.format(", %d (ms) remaining", ms));
1✔
500
        }
1✔
501
        else {
502
            double seconds = ((double) remainingNanos) / 1_000_000_000.0;
1✔
503
            timeTraceLogger.trace(message + String.format(", %.3f (s) remaining", seconds));
1✔
504
        }
505
    }
1✔
506

507
    // is called from reconnect and connect
508
    // will wait for any previous attempt to complete, using the reader.stop and
509
    // writer.stop
510
    /**
     * Makes one connection attempt against a single resolved address.
     * <p>
     * The attempt has a deadline of {@code now} plus the configured connection timeout, and the
     * deadline is re-checked (via {@code timeCheck}) before each step. Steps, in order: mark the
     * connection as connecting, stop a still-running reader/writer, clean the pong queue, build
     * and connect a new data port, read the server INFO / check version requirements / upgrade to
     * TLS on the connect executor, start the reader and writer, send CONNECT and an initial PING,
     * create the ping and cleanup timers if they do not exist yet, and finally set the status to
     * {@code CONNECTED}.
     * <p>
     * No exception escapes: any failure is passed to {@code processException} and the socket is
     * closed with {@code closeSocket(false, true)}. Callers find out whether the attempt worked
     * by checking the connection status afterwards.
     *
     * @param cur      the server pool entry being tried; becomes {@code currentServer} on success
     * @param resolved the resolved address actually connected to
     * @param now      start of the attempt, on the {@code NatsSystemClock.nanoTime()} time line
     */
    void tryToConnect(NatsUri cur, NatsUri resolved, long now) {
511
        clearCurrentServer();
1✔
512

513
        try {
514
            Duration connectTimeout = options.getConnectionTimeout();
1✔
515
            boolean trace = options.isTraceConnection();
1✔
516
            // absolute deadline for the whole attempt
            long end = now + connectTimeout.toNanos();
1✔
517
            timeCheck(end, "starting connection attempt");
1✔
518

519
            statusLock.lock();
1✔
520
            try {
521
                // NOTE(review): this early return still runs the outer finally below, which
                // resets 'connecting' to false - confirm that is intended when another
                // attempt owns the flag.
                if (this.connecting) {
1✔
522
                    return;
×
523
                }
524
                this.connecting = true;
1✔
525
                statusChanged.signalAll();
1✔
526
            }
527
            finally {
528
                statusLock.unlock();
1✔
529
            }
530

531
            // Create a new future for the DataPort, the reader/writer will use this
532
            // to wait for the connect/failure.
533
            this.dataPortFuture = new CompletableFuture<>();
1✔
534

535
            // Make sure the reader and writer are stopped
536
            long timeoutNanos = timeCheck(end, "waiting for reader");
1✔
537
            if (reader.isRunning()) {
1✔
538
                this.reader.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
×
539
            }
540
            timeoutNanos = timeCheck(end, "waiting for writer");
1✔
541
            if (writer.isRunning()) {
1✔
542
                this.writer.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
×
543
            }
544

545
            timeCheck(end, "cleaning pong queue");
1✔
546
            cleanUpPongQueue();
1✔
547

548
            timeoutNanos = timeCheck(end, "connecting data port");
1✔
549
            DataPort newDataPort = this.options.buildDataPort();
1✔
550
            newDataPort.connect(resolved.toString(), this, timeoutNanos);
1✔
551

552
            // Notify any threads waiting on the sockets
553
            this.dataPort = newDataPort;
1✔
554
            this.dataPortFuture.complete(this.dataPort);
1✔
555

556
            // Wait for the INFO message manually.
557
            // All other traffic will use the reader and writer
558
            // TLS First, don't read info until after upgrade
559
            // ---
560
            // Also this task does not have any exception catching
561
            // Since it is submitted as an async task, the future
562
            // will be aware of any exception thrown, and the future.get()
563
            // will throw an ExecutionException which is handled further down
564
            Callable<Object> connectTask = () -> {
1✔
565
                if (!options.isTlsFirst()) {
1✔
566
                    readInitialInfo();
1✔
567
                    checkVersionRequirements();
1✔
568
                }
569
                long start = NatsSystemClock.nanoTime();
1✔
570
                upgradeToSecureIfNeeded(resolved);
1✔
571
                if (trace && options.isTLSRequired()) {
1✔
572
                    // If the time appears too long, it might be related to
573
                    // https://github.com/nats-io/nats.java#linux-platform-note
574
                    timeTraceLogger.trace("TLS upgrade took: %.3f (s)",
×
NEW
575
                        ((double) (NatsSystemClock.nanoTime() - start)) / NANOS_PER_SECOND);
×
576
                }
577
                if (options.isTlsFirst()) {
1✔
578
                    readInitialInfo();
1✔
579
                    checkVersionRequirements();
1✔
580
                }
581
                return null;
1✔
582
            };
583

584
            timeoutNanos = timeCheck(end, "reading info, version and upgrading to secure if necessary");
1✔
585
            Future<Object> future = this.connectExecutor.submit(connectTask);
1✔
586
            try {
587
                future.get(timeoutNanos, TimeUnit.NANOSECONDS);
1✔
588
            }
589
            finally {
590
                // always cancel, so a task that is still running after a timeout or failure is interrupted
                future.cancel(true);
1✔
591
            }
592

593
            // start the reader and writer after we secured the connection, if necessary
594
            timeCheck(end, "starting reader");
1✔
595
            this.reader.start(this.dataPortFuture);
1✔
596
            timeCheck(end, "starting writer");
1✔
597
            this.writer.start(this.dataPortFuture);
1✔
598

599
            timeCheck(end, "sending connect message");
1✔
600
            this.sendConnect(resolved);
1✔
601

602
            timeoutNanos = timeCheck(end, "sending initial ping");
1✔
603
            Future<Boolean> pongFuture = sendPing();
1✔
604

605
            if (pongFuture != null) {
1✔
606
                pongFuture.get(timeoutNanos, TimeUnit.NANOSECONDS);
1✔
607
            }
608

609
            // the timers are only created when no ping task exists yet
            if (pingTask == null) {
1✔
610
                timeCheck(end, "starting ping and cleanup timers");
1✔
611
                long pingMillis = this.options.getPingInterval().toMillis();
1✔
612

613
                if (pingMillis > 0) {
1✔
614
                    pingTask = new ScheduledTask(scheduledExecutor, pingMillis, () -> {
1✔
615
                        if (isConnected() && !isClosing()) {
1✔
616
                            try {
617
                                softPing(); // The timer always uses the standard queue
1✔
618
                            }
619
                            catch (Exception e) {
1✔
620
                                // it's running in a thread, there is no point throwing here
621
                            }
1✔
622
                        }
623
                    });
1✔
624
                }
625

626
                long cleanMillis = this.options.getRequestCleanupInterval().toMillis();
1✔
627

628
                if (cleanMillis > 0) {
1✔
629
                    cleanupTask = new ScheduledTask(scheduledExecutor, cleanMillis, () -> cleanResponses(false));
1✔
630
                }
631
            }
632

633
            // Set connected status
634
            timeCheck(end, "updating status to connected");
1✔
635
            statusLock.lock();
1✔
636
            try {
637
                this.connecting = false;
1✔
638

639
                // an exception noted by handleCommunicationIssue while connecting fails this attempt
                if (this.exceptionDuringConnectChange != null) {
1✔
640
                    throw this.exceptionDuringConnectChange;
×
641
                }
642

643
                this.currentServer = cur;
1✔
644
                this.serverAuthErrors.clear(); // reset on successful connection
1✔
645
                updateStatus(Status.CONNECTED); // will signal status change, we also signal in finally
1✔
646
            }
647
            finally {
648
                statusLock.unlock();
1✔
649
            }
650
            timeTraceLogger.trace("status updated");
1✔
651
        }
652
        catch (Exception exp) {
1✔
653
            processException(exp);
1✔
654
            try {
655
                // allow force reconnect since this is pretty exceptional,
656
                // a connection failure while trying to connect
657
                this.closeSocket(false, true);
1✔
658
            }
NEW
659
            catch (InterruptedException e) {
×
660
                processException(e);
×
661
                Thread.currentThread().interrupt();
×
662
            }
1✔
663
        }
664
        finally {
665
            statusLock.lock();
1✔
666
            try {
667
                this.connecting = false;
1✔
668
                statusChanged.signalAll();
1✔
669
            }
670
            finally {
671
                statusLock.unlock();
1✔
672
            }
673
        }
674
    }
1✔
675

676
    private void clearCurrentServer() {
677
        if (currentServer != null) {
1✔
678
            lastServer = currentServer;
1✔
679
        }
680
        currentServer = null;
1✔
681
    }
1✔
682

683
    void checkVersionRequirements() throws IOException {
684
        Options opts = getOptions();
1✔
685
        ServerInfo info = getServerInfo();
1✔
686

687
        if (opts.isNoEcho() && info.getProtocolVersion() < 1) {
1✔
688
            throw new IOException("Server does not support no echo.");
1✔
689
        }
690
    }
1✔
691

692
    void upgradeToSecureIfNeeded(NatsUri nuri) throws IOException {
693
        // When already communicating over "https" websocket, do NOT try to upgrade to secure.
694
        if (!nuri.isWebsocket()) {
1✔
695
            if (options.isTlsFirst()) {
1✔
696
                dataPort.upgradeToSecure();
1✔
697
            }
698
            else {
699
                // server    | client options      | result
700
                // --------- | ------------------- | --------
701
                // required  | not isTLSRequired() | mismatch
702
                // available | not isTLSRequired() | ok
703
                // neither   | not isTLSRequired() | ok
704
                // required  | isTLSRequired()     | ok
705
                // available | isTLSRequired()     | ok
706
                // neither   | isTLSRequired()     | mismatch
707
                ServerInfo serverInfo = getServerInfo();
1✔
708
                if (options.isTLSRequired()) {
1✔
709
                    if (!serverInfo.isTLSRequired() && !serverInfo.isTLSAvailable()) {
1✔
710
                        throw new IOException("SSL connection wanted by client.");
1✔
711
                    }
712
                    dataPort.upgradeToSecure();
1✔
713
                }
714
                else if (serverInfo.isTLSRequired()) {
1✔
715
                    throw new IOException("SSL required by server.");
1✔
716
                }
717
            }
718
        }
719
    }
1✔
720

721
    // Called from reader/writer thread
722
    void handleCommunicationIssue(Exception io) {
723
        // If we are connecting or disconnecting, note exception and leave
724
        statusLock.lock();
1✔
725
        try {
726
            if (this.connecting || this.disconnecting || this.status == Status.CLOSED || this.isDraining()) {
1✔
727
                this.exceptionDuringConnectChange = io;
1✔
728
                return;
1✔
729
            }
730
        }
731
        finally {
732
            statusLock.unlock();
1✔
733
        }
734

735
        processException(io);
1✔
736

737
        // Spawn a thread so we don't have timing issues with
738
        // waiting on read/write threads
739
        executor.submit(() -> {
1✔
740
            if (!tryingToConnect.get()) {
1✔
741
                try {
742
                    tryingToConnect.set(true);
1✔
743

744
                    // any issue that brings us here is pretty serious
745
                    // so we are comfortable forcing the close
746
                    this.closeSocket(true, true);
1✔
747
                }
NEW
748
                catch (InterruptedException e) {
×
749
                    processException(e);
×
750
                    Thread.currentThread().interrupt();
×
751
                }
752
                finally {
753
                    tryingToConnect.set(false);
1✔
754
                }
755
            }
756
        });
1✔
757
    }
1✔
758

759
    // Close socket is called when another connect attempt is possible
760
    // Close is called when the connection should shut down, period
761
    void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws InterruptedException {
762
        // Ensure we close the socket exclusively within one thread.
763
        closeSocketLock.lock();
1✔
764
        try {
765
            boolean wasConnected;
766
            statusLock.lock();
1✔
767
            try {
768
                if (isDisconnectingOrClosed()) {
1✔
769
                    waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
770
                    return;
1✔
771
                }
772
                this.disconnecting = true;
1✔
773
                this.exceptionDuringConnectChange = null;
1✔
774
                wasConnected = (this.status == Status.CONNECTED);
1✔
775
                statusChanged.signalAll();
1✔
776
            }
777
            finally {
778
                statusLock.unlock();
1✔
779
            }
780

781
            closeSocketImpl(forceClose);
1✔
782

783
            statusLock.lock();
1✔
784
            try {
785
                updateStatus(Status.DISCONNECTED);
1✔
786
                this.exceptionDuringConnectChange = null; // Ignore IOExceptions during closeSocketImpl()
1✔
787
                this.disconnecting = false;
1✔
788
                statusChanged.signalAll();
1✔
789
            }
790
            finally {
791
                statusLock.unlock();
1✔
792
            }
793

794
            if (isClosing()) { // isClosing() means we are in the close method or were asked to be
1✔
795
                close();
1✔
796
            }
797
            else if (wasConnected && tryReconnectIfConnected) {
1✔
798
                reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior
1✔
799
            }
800
        }
801
        finally {
802
            closeSocketLock.unlock();
1✔
803
        }
804
    }
1✔
805

806
    // Close socket is called when another connect attempt is possible
807
    // Close is called when the connection should shut down, period
808

809
    /**
810
     * {@inheritDoc}
811
     */
812
    @Override
813
    public void close() throws InterruptedException {
814
        this.close(true, false);
1✔
815
    }
1✔
816

817
    // This method was originally built assuming there might be multiple paths to this method,
818
    // but it turns out there isn't. Not refactoring the code though, hence the warning suppression
819
    @SuppressWarnings("SameParameterValue")
820
    void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedException {
821
        statusLock.lock();
1✔
822
        try {
823
            if (checkDrainStatus && this.isDraining()) {
1✔
824
                waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
825
                return;
1✔
826
            }
827

828
            this.closing = true;// We were asked to close, so do it
1✔
829
            if (isDisconnectingOrClosed()) {
1✔
830
                waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
831
                return;
1✔
832
            }
833
            else {
834
                this.disconnecting = true;
1✔
835
                this.exceptionDuringConnectChange = null;
1✔
836
                statusChanged.signalAll();
1✔
837
            }
838
        }
839
        finally {
840
            statusLock.unlock();
1✔
841
        }
842

843
        // Stop the reconnect wait timer after we stop the writer/reader (only if we are
844
        // really closing, not on errors)
845
        if (this.reconnectWaiter != null) {
1✔
846
            this.reconnectWaiter.cancel(true);
1✔
847
        }
848

849
        closeSocketImpl(forceClose);
1✔
850

851
        this.dispatchers.forEach((nuid, d) -> d.stop(false));
1✔
852

853
        this.subscribers.forEach((sid, sub) -> sub.invalidate());
1✔
854

855
        this.dispatchers.clear();
1✔
856
        this.subscribers.clear();
1✔
857

858
        if (pingTask != null) {
1✔
859
            pingTask.shutdown();
1✔
860
            pingTask = null;
1✔
861
        }
862
        if (cleanupTask != null) {
1✔
863
            cleanupTask.shutdown();
1✔
864
            cleanupTask = null;
1✔
865
        }
866

867
        cleanResponses(true);
1✔
868

869
        cleanUpPongQueue();
1✔
870

871
        statusLock.lock();
1✔
872
        try {
873
            updateStatus(Status.CLOSED); // will signal, we also signal when we stop disconnecting
1✔
874

875
            /*
876
             * if (exceptionDuringConnectChange != null) {
877
             * processException(exceptionDuringConnectChange); exceptionDuringConnectChange
878
             * = null; }
879
             */
880
        }
881
        finally {
882
            statusLock.unlock();
1✔
883
        }
884

885
        // Stop the error handling and connect executors
886
        callbackRunner.shutdown();
1✔
887
        try {
888
            // At this point in the flow, the connection is shutting down.
889
            // There is really no use in giving this information to the developer,
890
            // It's fair to say that an exception here anyway will practically never happen
891
            // and if it did, the app is probably already frozen.
892
            //noinspection ResultOfMethodCallIgnored
893
            callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
1✔
894
        }
895
        finally {
896
            callbackRunner.shutdownNow();
1✔
897
        }
898

899
        // There's no need to wait for running tasks since we're told to close
900
        connectExecutor.shutdownNow();
1✔
901

902
        // The callbackRunner and connectExecutor always come from a factory,
903
        // so we always shut them down.
904
        // The executor and scheduledExecutor come from a factory only if
905
        // the user does NOT supply them, so we shut them down in that case.
906
        if (options.executorIsInternal()) {
1✔
907
            executor.shutdownNow();
1✔
908
        }
909
        if (options.scheduledExecutorIsInternal()) {
1✔
910
            scheduledExecutor.shutdownNow();
1✔
911
        }
912

913
        statusLock.lock();
1✔
914
        try {
915
            this.disconnecting = false;
1✔
916
            statusChanged.signalAll();
1✔
917
        }
918
        finally {
919
            statusLock.unlock();
1✔
920
        }
921
    }
1✔
922

923
    boolean callbackRunnerIsShutdown() {
924
        return callbackRunner == null || callbackRunner.isShutdown();
1✔
925
    }
926

927
    boolean executorIsShutdown() {
928
        return executor == null || executor.isShutdown();
1✔
929
    }
930

931
    boolean connectExecutorIsShutdown() {
932
        return connectExecutor == null || connectExecutor.isShutdown();
1✔
933
    }
934

935
    boolean scheduledExecutorIsShutdown() {
936
        return scheduledExecutor == null || scheduledExecutor.isShutdown();
1✔
937
    }
938

939
    // Should only be called from closeSocket or close
940
    void closeSocketImpl(boolean forceClose) {
941
        clearCurrentServer();
1✔
942

943
        // Signal both to stop.
944
        final Future<Boolean> readStop = this.reader.stop();
1✔
945
        final Future<Boolean> writeStop = this.writer.stop();
1✔
946

947
        // Now wait until they both stop before closing the socket.
948
        try {
949
            readStop.get(1, TimeUnit.SECONDS);
1✔
950
        }
951
        catch (Exception ex) {
1✔
952
            //
953
        }
1✔
954
        try {
955
            writeStop.get(1, TimeUnit.SECONDS);
1✔
956
        }
NEW
957
        catch (Exception ex) {
×
958
            //
959
        }
1✔
960

961
        // Close and reset the current data port and future
962
        if (dataPortFuture != null) {
1✔
963
            dataPortFuture.cancel(true);
1✔
964
            dataPortFuture = null;
1✔
965
        }
966

967
        // Close the current socket and cancel anyone waiting for it
968
        try {
969
            if (dataPort != null) {
1✔
970
                if (forceClose) {
1✔
971
                    dataPort.forceClose();
1✔
972
                }
973
                else {
974
                    dataPort.close();
1✔
975
                }
976
            }
977

978
        }
NEW
979
        catch (IOException ex) {
×
UNCOV
980
            processException(ex);
×
981
        }
1✔
982
        cleanUpPongQueue();
1✔
983

984
        try {
985
            this.reader.stop().get(10, TimeUnit.SECONDS);
1✔
986
        }
NEW
987
        catch (Exception ex) {
×
UNCOV
988
            processException(ex);
×
989
        }
1✔
990
        try {
991
            this.writer.stop().get(10, TimeUnit.SECONDS);
1✔
992
        }
NEW
993
        catch (Exception ex) {
×
UNCOV
994
            processException(ex);
×
995
        }
1✔
996
    }
1✔
997

998
    void cleanUpPongQueue() {
999
        Future<Boolean> b;
1000
        while ((b = pongQueue.poll()) != null) {
1✔
1001
            b.cancel(true);
1✔
1002
        }
1003
    }
1✔
1004

1005
    /**
1006
     * {@inheritDoc}
1007
     */
1008
    @Override
1009
    public void publish(@NonNull String subject, byte @Nullable [] body) {
1010
        publishInternal(subject, null, null, body, true, false);
1✔
1011
    }
1✔
1012

1013
    /**
1014
     * {@inheritDoc}
1015
     */
1016
    @Override
1017
    public void publish(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body) {
1018
        publishInternal(subject, null, headers, body, true, false);
1✔
1019
    }
1✔
1020

1021
    /**
1022
     * {@inheritDoc}
1023
     */
1024
    @Override
1025
    public void publish(@NonNull String subject, @Nullable String replyTo, byte @Nullable [] body) {
1026
        publishInternal(subject, replyTo, null, body, true, false);
1✔
1027
    }
1✔
1028

1029
    /**
1030
     * {@inheritDoc}
1031
     */
1032
    @Override
1033
    public void publish(@NonNull String subject, @Nullable String replyTo, @Nullable Headers headers, byte @Nullable [] body) {
1034
        publishInternal(subject, replyTo, headers, body, true, false);
1✔
1035
    }
1✔
1036

1037
    /**
1038
     * {@inheritDoc}
1039
     */
1040
    @Override
1041
    public void publish(@NonNull Message message) {
1042
        validateNotNull(message, "Message");
1✔
1043
        publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false, false);
1✔
1044
    }
1✔
1045

1046
    void publishInternal(@NonNull String subject, @Nullable String replyTo, @Nullable Headers headers, byte @Nullable [] data, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
1047
        checkPayloadSize(data);
1✔
1048
        NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1049
        if (npm.hasHeaders && !serverInfo.get().isHeadersSupported()) {
1✔
1050
            throw new IllegalArgumentException("Headers are not supported by the server, version: " + serverInfo.get().getVersion());
1✔
1051
        }
1052

1053
        if (isClosed()) {
1✔
1054
            throw new IllegalStateException("Connection is Closed");
1✔
1055
        }
1056
        else if (blockPublishForDrain.get()) {
1✔
UNCOV
1057
            throw new IllegalStateException("Connection is Draining"); // Ok to publish while waiting on subs
×
1058
        }
1059

1060
        if ((status == Status.RECONNECTING || status == Status.DISCONNECTED)
1✔
1061
            && !this.writer.canQueueDuringReconnect(npm)) {
1✔
1062
            throw new IllegalStateException(
1✔
1063
                "Unable to queue any more messages during reconnect, max buffer is " + options.getReconnectBufferSize());
1✔
1064
        }
1065

1066
        queueOutgoing(npm);
1✔
1067
    }
1✔
1068

1069
    private void checkPayloadSize(byte @Nullable [] body) {
1070
        if (body != null && options.clientSideLimitChecks() && body.length > this.getMaxPayload() && this.getMaxPayload() > 0) {
1✔
1071
            throw new IllegalArgumentException(
1✔
1072
                "Message payload size exceed server configuration " + body.length + " vs " + this.getMaxPayload());
1✔
1073
        }
1074
    }
1✔
1075

1076
    /**
1077
     * {@inheritDoc}
1078
     */
1079
    @Override
1080
    @NonNull
1081
    public Subscription subscribe(@NonNull String subject) {
1082
        validateSubject(subject, true);
1✔
1083
        return createSubscription(subject, null, null, null);
1✔
1084
    }
1085

1086
    /**
1087
     * {@inheritDoc}
1088
     */
1089
    @Override
1090
    @NonNull
1091
    public Subscription subscribe(@NonNull String subject, @NonNull String queueName) {
1092
        validateSubject(subject, true);
1✔
1093
        validateQueueName(queueName, true);
1✔
1094
        return createSubscription(subject, queueName, null, null);
1✔
1095
    }
1096

1097
    void invalidate(NatsSubscription sub) {
1098
        remove(sub);
1✔
1099
        sub.invalidate();
1✔
1100
    }
1✔
1101

1102
    void remove(NatsSubscription sub) {
1103
        CharSequence sid = sub.getSID();
1✔
1104
        subscribers.remove(sid);
1✔
1105

1106
        if (sub.getNatsDispatcher() != null) {
1✔
1107
            sub.getNatsDispatcher().remove(sub);
1✔
1108
        }
1109
    }
1✔
1110

1111
    void unsubscribe(NatsSubscription sub, int after) {
1112
        if (isClosed()) { // last chance, usually sub will catch this
1✔
1113
            throw new IllegalStateException("Connection is Closed");
×
1114
        }
1115

1116
        if (after <= 0) {
1✔
1117
            this.invalidate(sub); // Will clean it up
1✔
1118
        }
1119
        else {
1120
            sub.setUnsubLimit(after);
1✔
1121

1122
            if (sub.reachedUnsubLimit()) {
1✔
1123
                sub.invalidate();
1✔
1124
            }
1125
        }
1126

1127
        if (!isConnected()) {
1✔
1128
            return; // We will set up sub on reconnect or ignore
1✔
1129
        }
1130

1131
        sendUnsub(sub, after);
1✔
1132
    }
1✔
1133

1134
    void sendUnsub(@NonNull NatsSubscription sub, int after) {
1135
        ByteArrayBuilder bab =
1✔
1136
            new ByteArrayBuilder().append(UNSUB_SP_BYTES).append(sub.getSID());
1✔
1137
        if (after > 0) {
1✔
1138
            bab.append(SP).append(after);
1✔
1139
        }
1140
        queueOutgoing(new ProtocolMessage(bab, true));
1✔
1141
    }
1✔
1142

1143
    // Assumes the null/empty checks were handled elsewhere
1144
    @NonNull
1145
    NatsSubscription createSubscription(@NonNull String subject,
1146
                                        @Nullable String queueName,
1147
                                        @Nullable NatsDispatcher dispatcher,
1148
                                        @Nullable NatsSubscriptionFactory factory) {
1149
        if (isClosed()) {
1✔
1150
            throw new IllegalStateException("Connection is Closed");
1✔
1151
        }
1152
        else if (isDraining() && (dispatcher == null || dispatcher != this.inboxDispatcher.get())) {
1✔
1153
            throw new IllegalStateException("Connection is Draining");
1✔
1154
        }
1155

1156
        NatsSubscription sub;
1157
        String sid = getNextSid();
1✔
1158

1159
        if (factory == null) {
1✔
1160
            sub = new NatsSubscription(sid, subject, queueName, this, dispatcher);
1✔
1161
        }
1162
        else {
1163
            sub = factory.createNatsSubscription(sid, subject, queueName, this, dispatcher);
1✔
1164
        }
1165
        subscribers.put(sid, sub);
1✔
1166

1167
        sendSubscriptionMessage(sid, subject, queueName, false);
1✔
1168
        return sub;
1✔
1169
    }
1170

1171
    String getNextSid() {
1172
        return Long.toString(nextSid.getAndIncrement());
1✔
1173
    }
1174

1175
    String reSubscribe(NatsSubscription sub, String subject, String queueName) {
1176
        String sid = getNextSid();
1✔
1177
        sendSubscriptionMessage(sid, subject, queueName, false);
1✔
1178
        subscribers.put(sid, sub);
1✔
1179
        return sid;
1✔
1180
    }
1181

1182
    void sendSubscriptionMessage(String sid, String subject, String queueName, boolean treatAsInternal) {
1183
        if (!isConnected()) {
1✔
1184
            return; // We will set up sub on reconnect or ignore
1✔
1185
        }
1186

1187
        ByteArrayBuilder bab = new ByteArrayBuilder(UTF_8).append(SUB_SP_BYTES).append(subject);
1✔
1188
        if (queueName != null) {
1✔
1189
            bab.append(SP).append(queueName);
1✔
1190
        }
1191
        bab.append(SP).append(sid);
1✔
1192

1193
        // setting this to filter on stop.
1194
        // if it's an "internal" message, it won't be filtered
1195
        // if it's a normal message, the subscription will already be registered
1196
        // and therefore will be re-subscribed after a stop anyway
1197
        ProtocolMessage subMsg = new ProtocolMessage(bab, true);
1✔
1198
        if (treatAsInternal) {
1✔
1199
            queueInternalOutgoing(subMsg);
1✔
1200
        }
1201
        else {
1202
            queueOutgoing(subMsg);
1✔
1203
        }
1204
    }
1✔
1205

1206
    /**
1207
     * {@inheritDoc}
1208
     */
1209
    @Override
1210
    @NonNull
1211
    public String createInbox() {
1212
        return options.getInboxPrefix() + nuid.next();
1✔
1213
    }
1214

1215
    int getRespInboxLength() {
1216
        return options.getInboxPrefix().length() + 22 + 1; // 22 for nuid, 1 for .
1✔
1217
    }
1218

1219
    String createResponseInbox(String inbox) {
1220
        // Substring gets rid of the * [trailing]
1221
        return inbox.substring(0, getRespInboxLength()) + nuid.next();
1✔
1222
    }
1223

1224
    // If the inbox is long enough, pull out the end part, otherwise, just use the
1225
    // full thing
1226
    String getResponseToken(String responseInbox) {
1227
        int len = getRespInboxLength();
1✔
1228
        if (responseInbox.length() <= len) {
1✔
1229
            return responseInbox;
1✔
1230
        }
1231
        return responseInbox.substring(len);
1✔
1232
    }
1233

1234
    void cleanResponses(boolean closing) {
1235
        ArrayList<String> toRemove = new ArrayList<>();
1✔
1236
        boolean wasInterrupted = false;
1✔
1237

1238
        for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesAwaiting.entrySet()) {
1✔
1239
            boolean remove = false;
1✔
1240
            NatsRequestCompletableFuture future = entry.getValue();
1✔
1241
            if (future.hasExceededTimeout()) {
1✔
1242
                remove = true;
1✔
1243
                future.cancelTimedOut();
1✔
1244
            }
1245
            else if (closing) {
1✔
1246
                remove = true;
1✔
1247
                future.cancelClosing();
1✔
1248
            }
1249
            else if (future.isDone()) {
1✔
1250
                // done should have already been removed, not sure if
1251
                // this even needs checking, but it won't hurt
1252
                remove = true;
1✔
1253
                try {
1254
                    future.get();
×
1255
                }
1256
                catch (InterruptedException e) {
×
1257
                    Thread.currentThread().interrupt();
×
1258
                    // We might have collected some entries already, but were interrupted.
1259
                    // Break out so we finish as quick as possible,
1260
                    // cleanResponses will be called again anyway
1261
                    wasInterrupted = true;
×
1262
                    break;
×
1263
                }
1264
                catch (Throwable ignore) {
1✔
NEW
1265
                }
×
1266
            }
1267

1268
            if (remove) {
1✔
1269
                toRemove.add(entry.getKey());
1✔
1270
                statistics.decrementOutstandingRequests();
1✔
1271
            }
1272
        }
1✔
1273

1274
        for (String key : toRemove) {
1✔
1275
            responsesAwaiting.remove(key);
1✔
1276
        }
1✔
1277

1278
        if (advancedTracking && !wasInterrupted) {
1✔
1279
            toRemove.clear(); // we can reuse this but it needs to be cleared
1✔
1280
            for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesRespondedTo.entrySet()) {
1✔
1281
                NatsRequestCompletableFuture future = entry.getValue();
1✔
1282
                if (future.hasExceededTimeout()) {
1✔
1283
                    toRemove.add(entry.getKey());
1✔
1284
                    future.cancelTimedOut();
1✔
1285
                }
1286
            }
1✔
1287

1288
            for (String token : toRemove) {
1✔
1289
                responsesRespondedTo.remove(token);
1✔
1290
            }
1✔
1291
        }
1292
    }
1✔
1293

1294
    /**
1295
     * {@inheritDoc}
1296
     */
1297
    @Override
1298
    @Nullable
1299
    public Message request(@NonNull String subject, byte @Nullable [] body, @Nullable Duration timeout) throws InterruptedException {
1300
        return requestInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1301
    }
1302

1303
    /**
1304
     * {@inheritDoc}
1305
     */
1306
    @Override
1307
    @Nullable
1308
    public Message request(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body, @Nullable Duration timeout) throws InterruptedException {
1309
        return requestInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1310
    }
1311

1312
    /**
1313
     * {@inheritDoc}
1314
     */
1315
    @Override
1316
    @Nullable
1317
    public Message request(@NonNull Message message, @Nullable Duration timeout) throws InterruptedException {
1318
        validateNotNull(message, "Message");
1✔
1319
        return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
1✔
1320
    }
1321

1322
    @Nullable
1323
    Message requestInternal(@NonNull String subject,
1324
                            @Nullable Headers headers,
1325
                            byte @Nullable [] data,
1326
                            @Nullable Duration timeout,
1327
                            @NonNull CancelAction cancelAction,
1328
                            boolean validateSubjectAndReplyTo,
1329
                            boolean flushImmediatelyAfterPublish) throws InterruptedException
1330
    {
1331
        CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1332
        try {
1333
            if (timeout == null) {
1✔
1334
                timeout = getOptions().getConnectionTimeout();
×
1335
            }
1336
            return incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
1✔
1337
        }
1338
        catch (TimeoutException | ExecutionException | CancellationException e) {
1✔
1339
            return null;
1✔
1340
        }
1341
    }
1342

1343
    /**
1344
     * {@inheritDoc}
1345
     */
1346
    @Override
1347
    @NonNull
1348
    public CompletableFuture<Message> request(@NonNull String subject, byte @Nullable [] body) {
1349
        return requestFutureInternal(subject, null, body, null, cancelAction, true, forceFlushOnRequest);
1✔
1350
    }
1351

1352
    /**
1353
     * {@inheritDoc}
1354
     */
1355
    @Override
1356
    @NonNull
1357
    public CompletableFuture<Message> request(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body) {
1358
        return requestFutureInternal(subject, headers, body, null, cancelAction, true, forceFlushOnRequest);
1✔
1359
    }
1360

1361
    /**
1362
     * {@inheritDoc}
1363
     */
1364
    @Override
1365
    @NonNull
1366
    public CompletableFuture<Message> requestWithTimeout(@NonNull String subject, byte @Nullable [] body, @Nullable Duration timeout) {
1367
        return requestFutureInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1368
    }
1369

1370
    /**
1371
     * {@inheritDoc}
1372
     */
1373
    @Override
1374
    @NonNull
1375
    public CompletableFuture<Message> requestWithTimeout(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body, Duration timeout) {
1376
        return requestFutureInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1377
    }
1378

1379
    /**
1380
     * {@inheritDoc}
1381
     */
1382
    @Override
1383
    @NonNull
1384
    public CompletableFuture<Message> requestWithTimeout(@NonNull Message message, @Nullable Duration timeout) {
1385
        validateNotNull(message, "Message");
1✔
1386
        return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
1✔
1387
    }
1388

1389
    /**
1390
     * {@inheritDoc}
1391
     */
1392
    @Override
1393
    @NonNull
1394
    public CompletableFuture<Message> request(@NonNull Message message) {
1395
        validateNotNull(message, "Message");
1✔
1396
        return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false, forceFlushOnRequest);
1✔
1397
    }
1398

1399
    @NonNull
1400
    CompletableFuture<Message> requestFutureInternal(@NonNull String subject,
1401
                                                     @Nullable Headers headers,
1402
                                                     byte @Nullable [] body,
1403
                                                     @Nullable Duration futureTimeout,
1404
                                                     @NonNull CancelAction cancelAction,
1405
                                                     boolean validateSubjectAndReplyTo,
1406
                                                     boolean flushImmediatelyAfterPublish) {
1407
        checkPayloadSize(body);
1✔
1408

1409
        if (isClosed()) {
1✔
1410
            throw new IllegalStateException("Connection is Closed");
1✔
1411
        }
1412
        else if (isDraining()) {
1✔
1413
            throw new IllegalStateException("Connection is Draining");
1✔
1414
        }
1415

1416
        if (inboxDispatcher.get() == null) {
1✔
1417
            inboxDispatcherLock.lock();
1✔
1418
            try {
1419
                if (inboxDispatcher.get() == null) {
1✔
1420
                    NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply);
1✔
1421

1422
                    // Ensure the dispatcher is started before publishing messages
1423
                    String id = this.nuid.next();
1✔
1424
                    this.dispatchers.put(id, d);
1✔
1425
                    d.start(id);
1✔
1426
                    d.subscribe(this.mainInbox);
1✔
1427
                    inboxDispatcher.set(d);
1✔
1428
                }
1429
            }
1430
            finally {
1431
                inboxDispatcherLock.unlock();
1✔
1432
            }
1433
        }
1434

1435
        boolean oldStyle = options.isOldRequestStyle();
1✔
1436
        String responseInbox = oldStyle ? createInbox() : createResponseInbox(this.mainInbox);
1✔
1437
        String responseToken = getResponseToken(responseInbox);
1✔
1438
        NatsRequestCompletableFuture future =
1✔
1439
            new NatsRequestCompletableFuture(cancelAction,
1440
                futureTimeout == null ? options.getRequestCleanupInterval() : futureTimeout, options.useTimeoutException());
1✔
1441

1442
        if (!oldStyle) {
1✔
1443
            responsesAwaiting.put(responseToken, future);
1✔
1444
        }
1445
        statistics.incrementOutstandingRequests();
1✔
1446

1447
        if (oldStyle) {
1✔
1448
            NatsDispatcher dispatcher = this.inboxDispatcher.get();
1✔
1449
            NatsSubscription sub = dispatcher.subscribeReturningSubscription(responseInbox);
1✔
1450
            dispatcher.unsubscribe(responseInbox, 1);
1✔
1451
            // Unsubscribe when future is cancelled:
1452
            future.whenComplete((msg, exception) -> {
1✔
1453
                if (exception instanceof CancellationException) {
1✔
1454
                    dispatcher.unsubscribe(responseInbox);
×
1455
                }
1456
            });
1✔
1457
            responsesAwaiting.put(sub.getSID(), future);
1✔
1458
        }
1459

1460
        publishInternal(subject, responseInbox, headers, body, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1461
        statistics.incrementRequestsSent();
1✔
1462

1463
        return future;
1✔
1464
    }
1465

1466
    void deliverReply(Message msg) {
1467
        boolean oldStyle = options.isOldRequestStyle();
1✔
1468
        String subject = msg.getSubject();
1✔
1469
        String token = getResponseToken(subject);
1✔
1470
        String key = oldStyle ? msg.getSID() : token;
1✔
1471
        NatsRequestCompletableFuture f = responsesAwaiting.remove(key);
1✔
1472
        if (f != null) {
1✔
1473
            if (advancedTracking) {
1✔
1474
                responsesRespondedTo.put(key, f);
1✔
1475
            }
1476
            statistics.decrementOutstandingRequests();
1✔
1477
            if (msg.isStatusMessage() && msg.getStatus().getCode() == 503) {
1✔
1478
                switch (f.getCancelAction()) {
1✔
1479
                    case COMPLETE:
1480
                        f.complete(msg);
1✔
1481
                        break;
1✔
1482
                    case REPORT:
1483
                        f.completeExceptionally(new JetStreamStatusException(msg.getStatus()));
1✔
1484
                        break;
1✔
1485
                    case CANCEL:
1486
                    default:
1487
                        f.cancel(true);
1✔
1488
                }
1489
            }
1490
            else {
1491
                f.complete(msg);
1✔
1492
            }
1493
            statistics.incrementRepliesReceived();
1✔
1494
        }
1495
        else if (!oldStyle && !subject.startsWith(mainInbox)) {
1✔
1496
            if (advancedTracking) {
1✔
1497
                if (responsesRespondedTo.get(key) != null) {
1✔
1498
                    statistics.incrementDuplicateRepliesReceived();
1✔
1499
                }
1500
                else {
1501
                    statistics.incrementOrphanRepliesReceived();
1✔
1502
                }
1503
            }
1504
        }
1505
    }
1✔
1506

1507
    /**
1508
     * {@inheritDoc}
1509
     */
1510
    @NonNull
1511
    public Dispatcher createDispatcher() {
1512
        return createDispatcher(null);
1✔
1513
    }
1514

1515
    /**
1516
     * {@inheritDoc}
1517
     */
1518
    @NonNull
1519
    public Dispatcher createDispatcher(@Nullable MessageHandler handler) {
1520
        if (isClosed()) {
1✔
1521
            throw new IllegalStateException("Connection is Closed");
1✔
1522
        }
1523
        else if (isDraining()) {
1✔
1524
            throw new IllegalStateException("Connection is Draining");
1✔
1525
        }
1526

1527
        NatsDispatcher dispatcher = dispatcherFactory.createDispatcher(this, handler);
1✔
1528
        String id = this.nuid.next();
1✔
1529
        this.dispatchers.put(id, dispatcher);
1✔
1530
        dispatcher.start(id);
1✔
1531
        return dispatcher;
1✔
1532
    }
1533

1534
    /**
1535
     * {@inheritDoc}
1536
     */
1537
    public void closeDispatcher(@NonNull Dispatcher d) {
1538
        if (isClosed()) {
1✔
1539
            throw new IllegalStateException("Connection is Closed");
1✔
1540
        }
1541
        else if (!(d instanceof NatsDispatcher)) {
1✔
1542
            throw new IllegalArgumentException("Connection can only manage its own dispatchers");
×
1543
        }
1544

1545
        NatsDispatcher nd = (NatsDispatcher) d;
1✔
1546

1547
        if (nd.isDraining()) {
1✔
1548
            return; // No op while draining
1✔
1549
        }
1550

1551
        if (!this.dispatchers.containsKey(nd.getId())) {
1✔
1552
            throw new IllegalArgumentException("Dispatcher is already closed.");
1✔
1553
        }
1554

1555
        cleanupDispatcher(nd);
1✔
1556
    }
1✔
1557

1558
    void cleanupDispatcher(NatsDispatcher nd) {
1559
        nd.stop(true);
1✔
1560
        this.dispatchers.remove(nd.getId());
1✔
1561
    }
1✔
1562

1563
    Map<String, Dispatcher> getDispatchers() {
1564
        return Collections.unmodifiableMap(dispatchers);
1✔
1565
    }
1566

1567
    /**
1568
     * {@inheritDoc}
1569
     */
1570
    public void addConnectionListener(@NonNull ConnectionListener connectionListener) {
1571
        connectionListeners.add(connectionListener);
1✔
1572
    }
1✔
1573

1574
    /**
1575
     * {@inheritDoc}
1576
     */
1577
    public void removeConnectionListener(@NonNull ConnectionListener connectionListener) {
1578
        connectionListeners.remove(connectionListener);
1✔
1579
    }
1✔
1580

1581
    /**
1582
     * {@inheritDoc}
1583
     */
1584
    public void flush(@Nullable Duration timeout) throws TimeoutException, InterruptedException {
1585

1586
        Instant start = Instant.now();
1✔
1587
        waitForConnectOrClose(timeout);
1✔
1588

1589
        if (isClosed()) {
1✔
1590
            throw new TimeoutException("Attempted to flush while closed");
1✔
1591
        }
1592

1593
        if (timeout == null || timeout.isNegative()) {
1✔
1594
            timeout = Duration.ZERO;
1✔
1595
        }
1596

1597
        Instant now = Instant.now();
1✔
1598
        Duration waitTime = Duration.between(start, now);
1✔
1599

1600
        if (!timeout.equals(Duration.ZERO) && waitTime.compareTo(timeout) >= 0) {
1✔
1601
            throw new TimeoutException("Timeout out waiting for connection before flush.");
1✔
1602
        }
1603

1604
        try {
1605
            Future<Boolean> waitForIt = sendPing();
1✔
1606

1607
            if (waitForIt == null) { // error in the send ping code
1✔
1608
                return;
×
1609
            }
1610

1611
            long nanos = timeout.toNanos();
1✔
1612

1613
            if (nanos > 0) {
1✔
1614

1615
                nanos -= waitTime.toNanos();
1✔
1616

1617
                if (nanos <= 0) {
1✔
1618
                    nanos = 1; // let the future timeout if it isn't resolved
×
1619
                }
1620

1621
                waitForIt.get(nanos, TimeUnit.NANOSECONDS);
1✔
1622
            }
1623
            else {
1624
                waitForIt.get();
1✔
1625
            }
1626

1627
            this.statistics.incrementFlushCounter();
1✔
1628
        }
1629
        catch (ExecutionException | CancellationException e) {
1✔
1630
            throw new TimeoutException(e.toString());
1✔
1631
        }
1✔
1632
    }
1✔
1633

1634
    void sendConnect(NatsUri nuri) throws IOException {
1635
        try {
1636
            ServerInfo info = this.serverInfo.get();
1✔
1637
            // This is changed - we used to use info.isAuthRequired(), but are changing it to
1638
            // better match older versions of the server. It may change again in the future.
1639
            CharBuffer connectOptions = options.buildProtocolConnectOptionsString(
1✔
1640
                nuri.toString(), true, info.getNonce());
1✔
1641
            ByteArrayBuilder bab =
1✔
1642
                new ByteArrayBuilder(OP_CONNECT_SP_LEN + connectOptions.limit(), UTF_8)
1✔
1643
                    .append(CONNECT_SP_BYTES).append(connectOptions);
1✔
1644
            queueInternalOutgoing(new ProtocolMessage(bab, false));
1✔
1645
        }
1646
        catch (Exception exp) {
1✔
1647
            throw new IOException("Error sending connect string", exp);
1✔
1648
        }
1✔
1649
    }
1✔
1650

1651
    CompletableFuture<Boolean> sendPing() {
1652
        return this.sendPing(true);
1✔
1653
    }
1654

1655
    void softPing() {
1656
        this.sendPing(false);
1✔
1657
    }
1✔
1658

1659
    /**
1660
     * {@inheritDoc}
1661
     */
1662
    @Override
1663
    @NonNull
1664
    public Duration RTT() throws IOException {
1665
        if (!isConnected()) {
1✔
1666
            throw new IOException("Must be connected to do RTT.");
1✔
1667
        }
1668

1669
        long timeout = options.getConnectionTimeout().toMillis();
1✔
1670
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
1✔
1671
        pongQueue.add(pongFuture);
1✔
1672
        try {
1673
            long time = NatsSystemClock.nanoTime();
1✔
1674
            writer.queueInternalMessage(new ProtocolMessage(PING_PROTO));
1✔
1675
            pongFuture.get(timeout, TimeUnit.MILLISECONDS);
1✔
1676
            return Duration.ofNanos(NatsSystemClock.nanoTime() - time);
1✔
1677
        }
1678
        catch (ExecutionException e) {
×
1679
            throw new IOException(e.getCause());
×
1680
        }
1681
        catch (TimeoutException e) {
×
1682
            throw new IOException(e);
×
1683
        }
1684
        catch (InterruptedException e) {
×
1685
            Thread.currentThread().interrupt();
×
1686
            throw new IOException(e);
×
1687
        }
1688
    }
1689

1690
    // Send a ping request and push a pong future on the queue.
1691
    // Futures are completed in order, keep this one if a thread wants to wait
1692
    // for a specific pong. Note, if no pong returns, the wait will not return
1693
    // without setting a timeout.
1694
    @Nullable
1695
    CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
1696
        if (!isConnectedOrConnecting()) {
1✔
1697
            CompletableFuture<Boolean> retVal = new CompletableFuture<>();
1✔
1698
            retVal.complete(Boolean.FALSE);
1✔
1699
            return retVal;
1✔
1700
        }
1701

1702
        if (!treatAsInternal && !this.needPing.get()) {
1✔
1703
            CompletableFuture<Boolean> retVal = new CompletableFuture<>();
1✔
1704
            retVal.complete(Boolean.TRUE);
1✔
1705
            this.needPing.set(true);
1✔
1706
            return retVal;
1✔
1707
        }
1708

1709
        int max = options.getMaxPingsOut();
1✔
1710
        if (max > 0 && pongQueue.size() + 1 > max) {
1✔
1711
            handleCommunicationIssue(new IllegalStateException("Max outgoing Ping count exceeded."));
1✔
1712
            return null;
1✔
1713
        }
1714

1715
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
1✔
1716
        pongQueue.add(pongFuture);
1✔
1717

1718
        if (treatAsInternal) {
1✔
1719
            queueInternalOutgoing(new ProtocolMessage(PING_PROTO));
1✔
1720
        }
1721
        else {
1722
            queueOutgoing(new ProtocolMessage(PING_PROTO));
1✔
1723
        }
1724

1725
        this.needPing.set(true);
1✔
1726
        this.statistics.incrementPingCount();
1✔
1727
        return pongFuture;
1✔
1728
    }
1729

1730
    // This is a minor speed / memory enhancement.
1731
    // We can't reuse the same instance of any NatsMessage b/c of the "NatsMessage next" state,
1732
    // but it is safe to share the data bytes and the size since those fields are just being read
1733
    // This constructor "ProtocolMessage(ProtocolMessage pm)" shares the data and size
1734
    // reducing allocation of data for something that is often created and used.
1735
    // These static instances are the ones that are used for copying in sendPing and sendPong
1736
    private static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES);
1✔
1737
    private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES);
1✔
1738

1739
    void sendPong() {
1740
        queueInternalOutgoing(new ProtocolMessage(PONG_PROTO));
1✔
1741
    }
1✔
1742

1743
    // Called by the reader
1744
    void handlePong() {
1745
        CompletableFuture<Boolean> pongFuture = pongQueue.pollFirst();
1✔
1746
        if (pongFuture != null) {
1✔
1747
            pongFuture.complete(Boolean.TRUE);
1✔
1748
        }
1749
    }
1✔
1750

1751
    void readInitialInfo() throws IOException {
1752
        byte[] readBuffer = new byte[options.getBufferSize()];
1✔
1753
        ByteBuffer protocolBuffer = ByteBuffer.allocate(options.getBufferSize());
1✔
1754
        boolean gotCRLF = false;
1✔
1755
        boolean gotCR = false;
1✔
1756

1757
        while (!gotCRLF) {
1✔
1758
            int read = this.dataPort.read(readBuffer, 0, readBuffer.length);
1✔
1759

1760
            if (read < 0) {
1✔
1761
                break;
1✔
1762
            }
1763

1764
            int i = 0;
1✔
1765
            while (i < read) {
1✔
1766
                byte b = readBuffer[i++];
1✔
1767

1768
                if (gotCR) {
1✔
1769
                    if (b != LF) {
1✔
1770
                        throw new IOException("Missed LF after CR waiting for INFO.");
1✔
1771
                    }
1772
                    else if (i < read) {
1✔
1773
                        throw new IOException("Read past initial info message.");
1✔
1774
                    }
1775

1776
                    gotCRLF = true;
1✔
1777
                    break;
1✔
1778
                }
1779

1780
                if (b == CR) {
1✔
1781
                    gotCR = true;
1✔
1782
                }
1783
                else {
1784
                    if (!protocolBuffer.hasRemaining()) {
1✔
1785
                        protocolBuffer = enlargeBuffer(protocolBuffer); // just double it
1✔
1786
                    }
1787
                    protocolBuffer.put(b);
1✔
1788
                }
1789
            }
1✔
1790
        }
1✔
1791

1792
        if (!gotCRLF) {
1✔
1793
            throw new IOException("Failed to read initial info message.");
1✔
1794
        }
1795

1796
        protocolBuffer.flip();
1✔
1797

1798
        String infoJson = UTF_8.decode(protocolBuffer).toString();
1✔
1799
        infoJson = infoJson.trim();
1✔
1800
        String[] msg = infoJson.split("\\s");
1✔
1801
        String op = msg[0].toUpperCase();
1✔
1802

1803
        if (!OP_INFO.equals(op)) {
1✔
1804
            throw new IOException("Received non-info initial message.");
1✔
1805
        }
1806

1807
        handleInfo(infoJson);
1✔
1808
    }
1✔
1809

1810
    void handleInfo(String infoJson) {
1811
        ServerInfo serverInfo = new ServerInfo(infoJson);
1✔
1812
        this.serverInfo.set(serverInfo);
1✔
1813

1814
        List<String> urls = this.serverInfo.get().getConnectURLs();
1✔
1815
        if (!urls.isEmpty()) {
1✔
1816
            if (serverPool.acceptDiscoveredUrls(urls)) {
1✔
1817
                processConnectionEvent(Events.DISCOVERED_SERVERS, urls.toString());
1✔
1818
            }
1819
        }
1820

1821
        if (serverInfo.isLameDuckMode()) {
1✔
1822
            processConnectionEvent(Events.LAME_DUCK, uriDetail(currentServer));
1✔
1823
        }
1824
    }
1✔
1825

1826
    void queueOutgoing(NatsMessage msg) {
1827
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
1✔
1828
            throw new IllegalArgumentException("Control line is too long");
1✔
1829
        }
1830
        if (!writer.queue(msg)) {
1✔
1831
            options.getErrorListener().messageDiscarded(this, msg);
1✔
1832
        }
1833
    }
1✔
1834

1835
    void queueInternalOutgoing(NatsMessage msg) {
1836
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
1✔
1837
            throw new IllegalArgumentException("Control line is too long");
×
1838
        }
1839
        this.writer.queueInternalMessage(msg);
1✔
1840
    }
1✔
1841

1842
    void deliverMessage(NatsMessage msg) {
1843
        this.needPing.set(false);
1✔
1844
        this.statistics.incrementInMsgs();
1✔
1845
        this.statistics.incrementInBytes(msg.getSizeInBytes());
1✔
1846

1847
        NatsSubscription sub = subscribers.get(msg.getSID());
1✔
1848

1849
        if (sub != null) {
1✔
1850
            msg.setSubscription(sub);
1✔
1851

1852
            NatsDispatcher d = sub.getNatsDispatcher();
1✔
1853
            NatsConsumer c = (d == null) ? sub : d;
1✔
1854
            MessageQueue q = ((d == null) ? sub.getMessageQueue() : d.getMessageQueue());
1✔
1855

1856
            if (c.hasReachedPendingLimits()) {
1✔
1857
                // Drop the message and count it
1858
                this.statistics.incrementDroppedCount();
1✔
1859
                c.incrementDroppedCount();
1✔
1860

1861
                // Notify the first time
1862
                if (!c.isMarkedSlow()) {
1✔
1863
                    c.markSlow();
1✔
1864
                    processSlowConsumer(c);
1✔
1865
                }
1866
            }
1867
            else if (q != null) {
1✔
1868
                c.markNotSlow();
1✔
1869

1870
                // beforeQueueProcessor returns true if the message is allowed to be queued
1871
                if (sub.getBeforeQueueProcessor().apply(msg)) {
1✔
1872
                    q.push(msg);
1✔
1873
                }
1874
            }
1875

1876
        }
1877
//        else {
1878
//            // Drop messages we don't have a subscriber for (could be extras on an
1879
//            // auto-unsub for example)
1880
//        }
1881
    }
1✔
1882

1883
    void processOK() {
1884
        this.statistics.incrementOkCount();
1✔
1885
    }
1✔
1886

1887
    void processSlowConsumer(Consumer consumer) {
1888
        if (!this.callbackRunner.isShutdown()) {
1✔
1889
            try {
1890
                this.callbackRunner.execute(() -> {
1✔
1891
                    try {
1892
                        options.getErrorListener().slowConsumerDetected(this, consumer);
1✔
1893
                    }
1894
                    catch (Exception ex) {
1✔
1895
                        this.statistics.incrementExceptionCount();
1✔
1896
                    }
1✔
1897
                });
1✔
1898
            }
NEW
1899
            catch (RejectedExecutionException re) {
×
1900
                // Timing with shutdown, let it go
1901
            }
1✔
1902
        }
1903
    }
1✔
1904

1905
    void processException(Exception exp) {
1906
        this.statistics.incrementExceptionCount();
1✔
1907

1908
        if (!this.callbackRunner.isShutdown()) {
1✔
1909
            try {
1910
                this.callbackRunner.execute(() -> {
1✔
1911
                    try {
1912
                        options.getErrorListener().exceptionOccurred(this, exp);
1✔
1913
                    }
1914
                    catch (Exception ex) {
1✔
1915
                        this.statistics.incrementExceptionCount();
1✔
1916
                    }
1✔
1917
                });
1✔
1918
            }
1919
            catch (RejectedExecutionException re) {
1✔
1920
                // Timing with shutdown, let it go
1921
            }
1✔
1922
        }
1923
    }
1✔
1924

1925
    void processError(String errorText) {
1926
        this.statistics.incrementErrCount();
1✔
1927

1928
        this.lastError.set(errorText);
1✔
1929
        this.connectError.set(errorText); // even if this isn't during connection, save it just in case
1✔
1930

1931
        // If we are connected && we get an authentication error, save it
1932
        if (this.isConnected() && this.isAuthenticationError(errorText) && currentServer != null) {
1✔
1933
            this.serverAuthErrors.put(currentServer, errorText);
1✔
1934
        }
1935

1936
        if (!this.callbackRunner.isShutdown()) {
1✔
1937
            try {
1938
                this.callbackRunner.execute(() -> {
1✔
1939
                    try {
1940
                        options.getErrorListener().errorOccurred(this, errorText);
1✔
1941
                    }
1942
                    catch (Exception ex) {
1✔
1943
                        this.statistics.incrementExceptionCount();
1✔
1944
                    }
1✔
1945
                });
1✔
1946
            }
NEW
1947
            catch (RejectedExecutionException re) {
×
1948
                // Timing with shutdown, let it go
1949
            }
1✔
1950
        }
1951
    }
1✔
1952

1953
    interface ErrorListenerCaller {
1954
        void call(Connection conn, ErrorListener el);
1955
    }
1956

1957
    void executeCallback(ErrorListenerCaller elc) {
1958
        if (!this.callbackRunner.isShutdown()) {
1✔
1959
            try {
1960
                this.callbackRunner.execute(() -> elc.call(this, options.getErrorListener()));
1✔
1961
            }
NEW
1962
            catch (RejectedExecutionException re) {
×
1963
                // Timing with shutdown, let it go
1964
            }
1✔
1965
        }
1966
    }
1✔
1967

1968
    String uriDetail(NatsUri uri) {
1969
        return uri == null ? null : uri.toString();
1✔
1970
    }
1971

1972
    String uriDetail(NatsUri uri, NatsUri hostOrlast) {
1973
        if (uri != null) {
1✔
1974
            if (hostOrlast == null || uri.equals(hostOrlast)) {
1✔
1975
                return uri.toString();
1✔
1976
            }
1977
            return uri + " [" + hostOrlast + "]";
1✔
1978
        }
NEW
1979
        return hostOrlast == null ? null : hostOrlast.toString();
×
1980
    }
1981

1982
    void processConnectionEvent(Events type, String uriDetails) {
1983
        if (!this.callbackRunner.isShutdown()) {
1✔
1984
            try {
1985
                for (ConnectionListener listener : connectionListeners) {
1✔
1986
                    this.callbackRunner.execute(() -> {
1✔
1987
                        try {
1988
                            listener.connectionEvent(this, type, uriDetails);
1✔
1989
                        } catch (Exception ex) {
1✔
1990
                            this.statistics.incrementExceptionCount();
1✔
1991
                        }
1✔
1992
                    });
1✔
1993
                }
1✔
1994
            } catch (RejectedExecutionException re) {
×
1995
                // Timing with shutdown, let it go
1996
            }
1✔
1997
        }
1998
    }
1✔
1999

2000
    /**
2001
     * {@inheritDoc}
2002
     */
2003
    @Override
2004
    @NonNull
2005
    public ServerInfo getServerInfo() {
2006
        return serverInfo.get();
1✔
2007
    }
2008

2009
    /**
2010
     * {@inheritDoc}
2011
     */
2012
    @Override
2013
    @Nullable
2014
    public InetAddress getClientInetAddress() {
2015
        try {
2016
            ServerInfo si = getServerInfo();
1✔
2017
            return si == ServerInfo.EMPTY_INFO ? null : NatsInetAddress.getByName(si.getClientIp());
1✔
2018
        }
2019
        catch (Exception e) {
×
2020
            return null;
×
2021
        }
2022
    }
2023

2024
    /**
2025
     * {@inheritDoc}
2026
     */
2027
    @Override
2028
    @NonNull
2029
    public Options getOptions() {
2030
        return this.options;
1✔
2031
    }
2032

2033
    /**
2034
     * {@inheritDoc}
2035
     */
2036
    @Override
2037
    @NonNull
2038
    public Statistics getStatistics() {
2039
        return this.statistics;
1✔
2040
    }
2041

2042
    StatisticsCollector getStatisticsCollector() {
2043
        return this.statistics;
1✔
2044
    }
2045

2046
    DataPort getDataPort() {
2047
        return this.dataPort;
1✔
2048
    }
2049

2050
    // Used for testing
2051
    int getConsumerCount() {
2052
        return this.subscribers.size() + this.dispatchers.size();
1✔
2053
    }
2054

2055
    /**
2056
     * {@inheritDoc}
2057
     */
2058
    @Override
2059
    public long getMaxPayload() {
2060
        ServerInfo info = this.serverInfo.get();
1✔
2061

2062
        if (info == null) {
1✔
2063
            return -1;
×
2064
        }
2065

2066
        return info.getMaxPayload();
1✔
2067
    }
2068

2069
    /**
2070
     * {@inheritDoc}
2071
     */
2072
    @Override
2073
    @NonNull
2074
    public Collection<String> getServers() {
2075
        return serverPool.getServerList();
1✔
2076
    }
2077

2078
    protected List<NatsUri> resolveHost(NatsUri nuri) {
2079
        // 1. If the nuri host is not already an ip address or the nuri is not for websocket or fast fallback is disabled,
2080
        //    let the pool resolve it.
2081
        List<NatsUri> results = new ArrayList<>();
1✔
2082
        if (!nuri.hostIsIpAddress() && !nuri.isWebsocket() && !options.isEnableFastFallback()) {
1✔
2083
            List<String> ips = serverPool.resolveHostToIps(nuri.getHost());
1✔
2084
            if (ips != null) {
1✔
2085
                for (String ip : ips) {
1✔
2086
                    try {
2087
                        results.add(nuri.reHost(ip));
1✔
2088
                    }
2089
                    catch (URISyntaxException u) {
1✔
2090
                        // ??? should never happen
2091
                    }
1✔
2092
                }
1✔
2093
            }
2094
        }
2095

2096
        // 2. If there were no results,
2097
        //    - host was already an ip address or
2098
        //    - host was for websocket or
2099
        //    - fast fallback is enabled
2100
        //    - pool returned nothing or
2101
        //    - resolving failed...
2102
        //    so the list just becomes the original host.
2103
        if (results.isEmpty()) {
1✔
2104
            results.add(nuri);
1✔
2105
        }
2106
        return results;
1✔
2107
    }
2108

2109
    /**
2110
     * {@inheritDoc}
2111
     */
2112
    @Override
2113
    @Nullable
2114
    public String getConnectedUrl() {
2115
        return currentServer == null ? null : currentServer.toString();
1✔
2116
    }
2117

2118
    /**
2119
     * {@inheritDoc}
2120
     */
2121
    @Override
2122
    @NonNull
2123
    public Status getStatus() {
2124
        return this.status;
1✔
2125
    }
2126

2127
    /**
2128
     * {@inheritDoc}
2129
     */
2130
    @Override
2131
    @Nullable
2132
    public String getLastError() {
2133
        return lastError.get();
1✔
2134
    }
2135

2136
    /**
2137
     * {@inheritDoc}
2138
     */
2139
    @Override
2140
    public void clearLastError() {
2141
        lastError.set(null);
1✔
2142
    }
1✔
2143

2144
    ExecutorService getExecutor() {
2145
        return executor;
1✔
2146
    }
2147

2148
    ScheduledExecutorService getScheduledExecutor() {
2149
        return scheduledExecutor;
1✔
2150
    }
2151

2152
    void updateStatus(Status newStatus) {
2153
        updateStatus(newStatus, uriDetail(currentServer == null ? lastServer : currentServer));
1✔
2154
    }
1✔
2155

2156
    void updateStatus(Status newStatus, NatsUri resolvedUri, NatsUri hostUri) {
2157
        updateStatus(newStatus, uriDetail(resolvedUri, hostUri));
1✔
2158
    }
1✔
2159

2160
    void updateStatus(Status newStatus, String uriDetail) {
2161
        Status oldStatus = this.status;
1✔
2162

2163
        statusLock.lock();
1✔
2164
        try {
2165
            if (oldStatus == Status.CLOSED || newStatus == oldStatus) {
1✔
2166
                return;
1✔
2167
            }
2168
            this.status = newStatus;
1✔
2169
        } finally {
2170
            statusChanged.signalAll();
1✔
2171
            statusLock.unlock();
1✔
2172
        }
2173

2174
        if (this.status == Status.DISCONNECTED) {
1✔
2175
            processConnectionEvent(Events.DISCONNECTED, uriDetail);
1✔
2176
        }
2177
        else if (this.status == Status.CLOSED) {
1✔
2178
            processConnectionEvent(Events.CLOSED, uriDetail);
1✔
2179
        }
2180
        else if (oldStatus == Status.RECONNECTING && this.status == Status.CONNECTED) {
1✔
2181
            processConnectionEvent(Events.RECONNECTED, uriDetail);
1✔
2182
        }
2183
        else if (this.status == Status.CONNECTED) {
1✔
2184
            processConnectionEvent(Events.CONNECTED, uriDetail);
1✔
2185
        }
2186
    }
1✔
2187

2188
    boolean isClosing() {
2189
        return this.closing;
1✔
2190
    }
2191

2192
    boolean isClosed() {
2193
        return this.status == Status.CLOSED;
1✔
2194
    }
2195

2196
    boolean isConnected() {
2197
        return this.status == Status.CONNECTED;
1✔
2198
    }
2199

2200
    boolean isDisconnected() {
2201
        return this.status == Status.DISCONNECTED;
×
2202
    }
2203

2204
    boolean isConnectedOrConnecting() {
2205
        statusLock.lock();
1✔
2206
        try {
2207
            return this.status == Status.CONNECTED || this.connecting;
1✔
2208
        } finally {
2209
            statusLock.unlock();
1✔
2210
        }
2211
    }
2212

2213
    boolean isDisconnectingOrClosed() {
2214
        statusLock.lock();
1✔
2215
        try {
2216
            return this.status == Status.CLOSED || this.disconnecting;
1✔
2217
        } finally {
2218
            statusLock.unlock();
1✔
2219
        }
2220
    }
2221

2222
    boolean isDisconnecting() {
2223
        statusLock.lock();
1✔
2224
        try {
2225
            return this.disconnecting;
1✔
2226
        } finally {
2227
            statusLock.unlock();
1✔
2228
        }
2229
    }
2230

2231
    void waitForDisconnectOrClose(Duration timeout) throws InterruptedException {
2232
        waitWhile(timeout, (Void) -> this.isDisconnecting() && !this.isClosed() );
1✔
2233
    }
1✔
2234

2235
    void waitForConnectOrClose(Duration timeout) throws InterruptedException {
2236
        waitWhile(timeout, (Void) -> !this.isConnected() && !this.isClosed());
1✔
2237
    }
1✔
2238

2239
    void waitWhile(Duration timeout, Predicate<Void> waitWhileTrue) throws InterruptedException {
2240
        statusLock.lock();
1✔
2241
        try {
2242
            long currentWaitNanos = (timeout != null) ? timeout.toNanos() : -1;
1✔
2243
            long start = NatsSystemClock.nanoTime();
1✔
2244
            while (currentWaitNanos >= 0 && waitWhileTrue.test(null)) {
1✔
2245
                if (currentWaitNanos > 0) {
1✔
2246
                    if (statusChanged.await(currentWaitNanos, TimeUnit.NANOSECONDS) && !waitWhileTrue.test(null)) {
1✔
2247
                        break;
1✔
2248
                    }
2249
                    long now = NatsSystemClock.nanoTime();
1✔
2250
                    currentWaitNanos = currentWaitNanos - (now - start);
1✔
2251
                    start = now;
1✔
2252

2253
                    if (currentWaitNanos <= 0) {
1✔
2254
                        break;
1✔
2255
                    }
2256
                }
1✔
2257
                else {
2258
                    statusChanged.await();
×
2259
                }
2260
            }
2261
        }
2262
        finally {
2263
            statusLock.unlock();
1✔
2264
        }
2265
    }
1✔
2266

2267
    void invokeReconnectDelayHandler(long totalRounds) {
2268
        long currentWaitNanos = 0;
1✔
2269

2270
        ReconnectDelayHandler handler = options.getReconnectDelayHandler();
1✔
2271
        if (handler == null) {
1✔
2272
            Duration dur = options.getReconnectWait();
1✔
2273
            if (dur != null) {
1✔
2274
                currentWaitNanos = dur.toNanos();
1✔
2275
                dur = serverPool.hasSecureServer() ? options.getReconnectJitterTls() : options.getReconnectJitter();
1✔
2276
                if (dur != null) {
1✔
2277
                    currentWaitNanos += ThreadLocalRandom.current().nextLong(dur.toNanos());
1✔
2278
                }
2279
            }
2280
        }
1✔
2281
        else {
2282
            Duration waitTime = handler.getWaitTime(totalRounds);
1✔
2283
            if (waitTime != null) {
1✔
2284
                currentWaitNanos = waitTime.toNanos();
1✔
2285
            }
2286
        }
2287

2288
        this.reconnectWaiter = new CompletableFuture<>();
1✔
2289

2290
        long start = NatsSystemClock.nanoTime();
1✔
2291
        while (currentWaitNanos > 0 && !isDisconnectingOrClosed() && !isConnected() && !this.reconnectWaiter.isDone()) {
1✔
2292
            try {
2293
                this.reconnectWaiter.get(currentWaitNanos, TimeUnit.NANOSECONDS);
×
2294
            } catch (Exception exp) {
1✔
2295
                // ignore, try to loop again
2296
            }
×
2297
            long now = NatsSystemClock.nanoTime();
1✔
2298
            currentWaitNanos = currentWaitNanos - (now - start);
1✔
2299
            start = now;
1✔
2300
        }
1✔
2301

2302
        this.reconnectWaiter.complete(Boolean.TRUE);
1✔
2303
    }
1✔
2304

2305
    ByteBuffer enlargeBuffer(ByteBuffer buffer) {
2306
        int current = buffer.capacity();
1✔
2307
        int newSize = current * 2;
1✔
2308
        ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
1✔
2309
        buffer.flip();
1✔
2310
        newBuffer.put(buffer);
1✔
2311
        return newBuffer;
1✔
2312
    }
2313

2314
    // For testing
2315
    NatsConnectionReader getReader() {
2316
        return this.reader;
1✔
2317
    }
2318

2319
    // For testing
2320
    NatsConnectionWriter getWriter() {
2321
        return this.writer;
1✔
2322
    }
2323

2324
    // For testing
2325
    Future<DataPort> getDataPortFuture() {
2326
        return this.dataPortFuture;
1✔
2327
    }
2328

2329
    boolean isDraining() {
2330
        return this.draining.get() != null;
1✔
2331
    }
2332

2333
    boolean isDrained() {
2334
        CompletableFuture<Boolean> tracker = this.draining.get();
1✔
2335

2336
        try {
2337
            if (tracker != null && tracker.getNow(false)) {
1✔
2338
                return true;
1✔
2339
            }
2340
        } catch (Exception e) {
×
2341
            // These indicate the tracker was cancelled/timed out
2342
        }
1✔
2343

2344
        return false;
1✔
2345
    }
2346

2347
    /**
2348
     * {@inheritDoc}
2349
     */
2350
    @Override
2351
    @NonNull
2352
    public CompletableFuture<Boolean> drain(@Nullable Duration timeout) throws TimeoutException, InterruptedException {
2353

2354
        if (isClosing() || isClosed()) {
1✔
2355
            throw new IllegalStateException("A connection can't be drained during close.");
1✔
2356
        }
2357

2358
        this.statusLock.lock();
1✔
2359
        try {
2360
            if (isDraining()) {
1✔
2361
                return this.draining.get();
1✔
2362
            }
2363
            this.draining.set(new CompletableFuture<>());
1✔
2364
        } finally {
2365
            this.statusLock.unlock();
1✔
2366
        }
2367

2368
        final CompletableFuture<Boolean> tracker = this.draining.get();
1✔
2369
        Instant start = Instant.now();
1✔
2370

2371
        // Don't include subscribers with dispatchers
2372
        HashSet<NatsSubscription> pureSubscribers = new HashSet<>(this.subscribers.values());
1✔
2373
        pureSubscribers.removeIf((s) -> s.getDispatcher() != null);
1✔
2374

2375
        final HashSet<NatsConsumer> consumers = new HashSet<>();
1✔
2376
        consumers.addAll(pureSubscribers);
1✔
2377
        consumers.addAll(this.dispatchers.values());
1✔
2378

2379
        NatsDispatcher inboxer = this.inboxDispatcher.get();
1✔
2380

2381
        if (inboxer != null) {
1✔
2382
            consumers.add(inboxer);
1✔
2383
        }
2384

2385
        // Stop the consumers NOW so that when this method returns they are blocked
2386
        consumers.forEach((cons) -> {
1✔
2387
            cons.markDraining(tracker);
1✔
2388
            cons.sendUnsubForDrain();
1✔
2389
        });
1✔
2390

2391
        try {
2392
            this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know
1✔
2393
        } catch (Exception e) {
1✔
2394
            this.close(false, false);
1✔
2395
            throw e;
1✔
2396
        }
1✔
2397

2398
        consumers.forEach(NatsConsumer::markUnsubedForDrain);
1✔
2399

2400
        // Wait for the timeout or all consumers are drained
2401
        executor.submit(() -> {
1✔
2402
            try {
2403
                long timeoutNanos = (timeout == null || timeout.toNanos() <= 0)
1✔
2404
                    ? Long.MAX_VALUE : timeout.toNanos();
1✔
2405
                long startTime = System.nanoTime();
1✔
2406
                while (NatsSystemClock.nanoTime() - startTime < timeoutNanos && !Thread.interrupted()) {
1✔
2407
                    consumers.removeIf(NatsConsumer::isDrained);
1✔
2408
                    if (consumers.isEmpty()) {
1✔
2409
                        break;
1✔
2410
                    }
2411
                    //noinspection BusyWait
2412
                    Thread.sleep(1); // Sleep 1 milli
1✔
2413
                }
2414

2415
                // Stop publishing
2416
                this.blockPublishForDrain.set(true);
1✔
2417

2418
                // One last flush
2419
                if (timeout == null || timeout.equals(Duration.ZERO)) {
1✔
2420
                    this.flush(Duration.ZERO);
1✔
2421
                } else {
2422
                    Instant now = Instant.now();
1✔
2423
                    Duration passed = Duration.between(start, now);
1✔
2424
                    Duration newTimeout = timeout.minus(passed);
1✔
2425
                    if (newTimeout.toNanos() > 0) {
1✔
2426
                        this.flush(newTimeout);
1✔
2427
                    }
2428
                }
2429
                this.close(false, false); // close the connection after the last flush
1✔
2430
                tracker.complete(consumers.isEmpty());
1✔
2431
            } catch (TimeoutException e) {
×
2432
                this.processException(e);
×
2433
            } catch (InterruptedException e) {
×
2434
                this.processException(e);
×
2435
                Thread.currentThread().interrupt();
×
2436
            } finally {
2437
                try {
2438
                    this.close(false, false);// close the connection after the last flush
1✔
2439
                } catch (InterruptedException e) {
×
2440
                    processException(e);
×
2441
                    Thread.currentThread().interrupt();
×
2442
                }
1✔
2443
                tracker.complete(false);
1✔
2444
            }
2445
        });
1✔
2446

2447
        return tracker;
1✔
2448
    }
2449

2450
    boolean isAuthenticationError(String err) {
2451
        if (err == null) {
1✔
2452
            return false;
1✔
2453
        }
2454
        err = err.toLowerCase();
1✔
2455
        return err.startsWith("user authentication")
1✔
2456
            || err.contains("authorization violation")
1✔
2457
            || err.startsWith("account authentication expired");
1✔
2458
    }
2459

2460
    /**
2461
     * {@inheritDoc}
2462
     */
2463
    @Override
2464
    public void flushBuffer() throws IOException {
2465
        if (!isConnected()) {
1✔
2466
            throw new IllegalStateException("Connection is not active.");
1✔
2467
        }
2468
        writer.flushBuffer();
1✔
2469
    }
1✔
2470

2471
    /**
2472
     * {@inheritDoc}
2473
     */
2474
    @Override
2475
    @NonNull public StreamContext getStreamContext(@NonNull String streamName) throws IOException, JetStreamApiException {
2476
        Validator.validateStreamName(streamName, true);
1✔
2477
        ensureNotClosing();
1✔
2478
        return new NatsStreamContext(streamName, null, this, null);
1✔
2479
    }
2480

2481
    /**
2482
     * {@inheritDoc}
2483
     */
2484
    @Override
2485
    @NonNull
2486
    public StreamContext getStreamContext(@NonNull String streamName, @Nullable JetStreamOptions options) throws IOException, JetStreamApiException {
2487
        Validator.validateStreamName(streamName, true);
1✔
2488
        ensureNotClosing();
1✔
2489
        return new NatsStreamContext(streamName, null, this, options);
1✔
2490
    }
2491

2492
    /**
2493
     * {@inheritDoc}
2494
     */
2495
    @Override
2496
    @NonNull
2497
    public ConsumerContext getConsumerContext(@NonNull String streamName, @NonNull String consumerName) throws IOException, JetStreamApiException {
2498
        return getStreamContext(streamName).getConsumerContext(consumerName);
1✔
2499
    }
2500

2501
    /**
2502
     * {@inheritDoc}
2503
     */
2504
    @Override
2505
    @NonNull
2506
    public ConsumerContext getConsumerContext(@NonNull String streamName, @NonNull String consumerName, @Nullable JetStreamOptions options) throws IOException, JetStreamApiException {
2507
        return getStreamContext(streamName, options).getConsumerContext(consumerName);
1✔
2508
    }
2509

2510
    /**
2511
     * {@inheritDoc}
2512
     */
2513
    @Override
2514
    @NonNull
2515
    public JetStream jetStream() throws IOException {
2516
        return jetStream(null);
1✔
2517
    }
2518

2519
    /**
2520
     * {@inheritDoc}
2521
     */
2522
    @Override
2523
    @NonNull
2524
    public JetStream jetStream(JetStreamOptions options) throws IOException {
2525
        ensureNotClosing();
1✔
2526
        return new NatsJetStream(this, options);
1✔
2527
    }
2528

2529
    /**
2530
     * {@inheritDoc}
2531
     */
2532
    @Override
2533
    @NonNull
2534
    public JetStreamManagement jetStreamManagement() throws IOException {
2535
        return jetStreamManagement(null);
1✔
2536
    }
2537

2538
    /**
2539
     * {@inheritDoc}
2540
     */
2541
    @Override
2542
    @NonNull
2543
    public JetStreamManagement jetStreamManagement(JetStreamOptions options) throws IOException {
2544
        ensureNotClosing();
1✔
2545
        return new NatsJetStreamManagement(this, options);
1✔
2546
    }
2547

2548
    /**
2549
     * {@inheritDoc}
2550
     */
2551
    @Override
2552
    @NonNull
2553
    public KeyValue keyValue(@NonNull String bucketName) throws IOException {
2554
        return keyValue(bucketName, null);
1✔
2555
    }
2556

2557
    /**
2558
     * {@inheritDoc}
2559
     */
2560
    @Override
2561
    @NonNull
2562
    public KeyValue keyValue(@NonNull String bucketName, @Nullable KeyValueOptions options) throws IOException {
2563
        Validator.validateBucketName(bucketName, true);
1✔
2564
        ensureNotClosing();
1✔
2565
        return new NatsKeyValue(this, bucketName, options);
1✔
2566
    }
2567

2568
    /**
2569
     * {@inheritDoc}
2570
     */
2571
    @Override
2572
    @NonNull
2573
    public KeyValueManagement keyValueManagement() throws IOException {
2574
        return keyValueManagement(null);
1✔
2575
    }
2576

2577
    /**
2578
     * {@inheritDoc}
2579
     */
2580
    @Override
2581
    @NonNull
2582
    public KeyValueManagement keyValueManagement(@Nullable KeyValueOptions options) throws IOException {
2583
        ensureNotClosing();
1✔
2584
        return new NatsKeyValueManagement(this, options);
1✔
2585
    }
2586

2587
    /**
2588
     * {@inheritDoc}
2589
     */
2590
    @Override
2591
    @NonNull
2592
    public ObjectStore objectStore(@NonNull String bucketName) throws IOException {
2593
        return objectStore(bucketName, null);
1✔
2594
    }
2595

2596
    /**
2597
     * {@inheritDoc}
2598
     */
2599
    @Override
2600
    @NonNull
2601
    public ObjectStore objectStore(@NonNull String bucketName, @Nullable ObjectStoreOptions options) throws IOException {
2602
        Validator.validateBucketName(bucketName, true);
1✔
2603
        ensureNotClosing();
1✔
2604
        return new NatsObjectStore(this, bucketName, options);
1✔
2605
    }
2606

2607
    /**
2608
     * {@inheritDoc}
2609
     */
2610
    @Override
2611
    @NonNull
2612
    public ObjectStoreManagement objectStoreManagement() throws IOException {
2613
        ensureNotClosing();
1✔
2614
        return new NatsObjectStoreManagement(this, null);
1✔
2615
    }
2616

2617
    /**
2618
     * {@inheritDoc}
2619
     */
2620
    @Override
2621
    @NonNull
2622
    public ObjectStoreManagement objectStoreManagement(@Nullable ObjectStoreOptions options) throws IOException {
2623
        ensureNotClosing();
1✔
2624
        return new NatsObjectStoreManagement(this, options);
1✔
2625
    }
2626

2627
    private void ensureNotClosing() throws IOException {
2628
        if (isClosing() || isClosed()) {
1✔
2629
            throw new IOException("A JetStream context can't be established during close.");
1✔
2630
        }
2631
    }
1✔
2632

2633
    /**
2634
     * {@inheritDoc}
2635
     */
2636
    @Override
2637
    public long outgoingPendingMessageCount() {
2638
        closeSocketLock.lock();
×
2639
        try {
2640
            return writer == null ? -1 : writer.outgoingPendingMessageCount();
×
2641
        }
2642
        finally {
2643
            closeSocketLock.unlock();
×
2644
        }
2645
    }
2646

2647
    /**
2648
     * {@inheritDoc}
2649
     */
2650
    @Override
2651
    public long outgoingPendingBytes() {
2652
        closeSocketLock.lock();
×
2653
        try {
2654
            return writer == null ? -1 : writer.outgoingPendingBytes();
×
2655
        }
2656
        finally {
2657
            closeSocketLock.unlock();
×
2658
        }
2659
    }
2660
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc