• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2055

15 Jul 2025 11:44AM UTC coverage: 95.67% (+0.09%) from 95.581%
#2055

push

github

web-flow
Merge pull request #1354 from nats-io/ordered-consumer-name-argh

[Fix] Simplified Ordered Consumer - Getting name early can cause NPE

1 of 2 new or added lines in 1 file covered. (50.0%)

2 existing lines in 2 files now uncovered.

11844 of 12380 relevant lines covered (95.67%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.16
/src/main/java/io/nats/client/impl/NatsConnection.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.ConnectionListener.Events;
18
import io.nats.client.api.ServerInfo;
19
import io.nats.client.support.*;
20

21
import java.io.IOException;
22
import java.net.InetAddress;
23
import java.net.URISyntaxException;
24
import java.nio.ByteBuffer;
25
import java.nio.CharBuffer;
26
import java.time.Duration;
27
import java.time.Instant;
28
import java.util.*;
29
import java.util.concurrent.*;
30
import java.util.concurrent.atomic.AtomicBoolean;
31
import java.util.concurrent.atomic.AtomicLong;
32
import java.util.concurrent.atomic.AtomicReference;
33
import java.util.concurrent.locks.Condition;
34
import java.util.concurrent.locks.ReentrantLock;
35
import java.util.function.Predicate;
36

37
import static io.nats.client.support.NatsConstants.*;
38
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
39
import static io.nats.client.support.Validator.*;
40
import static java.nio.charset.StandardCharsets.UTF_8;
41

42
class NatsConnection implements Connection {
43

44
    public static final double NANOS_PER_SECOND = 1_000_000_000.0;
45

46
    private final Options options;
47
    final boolean forceFlushOnRequest;
48

49
    private final StatisticsCollector statistics;
50

51
    private boolean connecting; // you can only connect in one thread
52
    private boolean disconnecting; // you can only disconnect in one thread
53
    private boolean closing; // respect a close call regardless
54
    private Exception exceptionDuringConnectChange; // exception occurred in another thread while dis/connecting
55
    final ReentrantLock closeSocketLock;
56

57
    private Status status;
58
    private final ReentrantLock statusLock;
59
    private final Condition statusChanged;
60

61
    private CompletableFuture<DataPort> dataPortFuture;
62
    private DataPort dataPort;
63
    private NatsUri currentServer;
64
    private CompletableFuture<Boolean> reconnectWaiter;
65
    private final HashMap<NatsUri, String> serverAuthErrors;
66

67
    private NatsConnectionReader reader;
68
    private NatsConnectionWriter writer;
69

70
    private final AtomicReference<ServerInfo> serverInfo;
71

72
    private final Map<String, NatsSubscription> subscribers;
73
    private final Map<String, NatsDispatcher> dispatchers; // use a concurrent map so we get more consistent iteration behavior
74
    private final Collection<ConnectionListener> connectionListeners;
75
    private final Map<String, NatsRequestCompletableFuture> responsesAwaiting;
76
    private final Map<String, NatsRequestCompletableFuture> responsesRespondedTo;
77
    private final ConcurrentLinkedDeque<CompletableFuture<Boolean>> pongQueue;
78

79
    private final String mainInbox;
80
    private final AtomicReference<NatsDispatcher> inboxDispatcher;
81
    private final ReentrantLock inboxDispatcherLock;
82
    private ScheduledTask pingTask;
83
    private ScheduledTask cleanupTask;
84

85
    private final AtomicBoolean needPing;
86

87
    private final AtomicLong nextSid;
88
    private final NUID nuid;
89

90
    private final AtomicReference<String> connectError;
91
    private final AtomicReference<String> lastError;
92
    private final AtomicReference<CompletableFuture<Boolean>> draining;
93
    private final AtomicBoolean blockPublishForDrain;
94
    private final AtomicBoolean tryingToConnect;
95

96
    private final ExecutorService callbackRunner;
97
    private final ExecutorService executor;
98
    private final ExecutorService connectExecutor;
99
    private final ScheduledExecutorService scheduledExecutor;
100
    private final boolean advancedTracking;
101

102
    private final ServerPool serverPool;
103
    private final DispatcherFactory dispatcherFactory;
104
    final CancelAction cancelAction;
105

106
    private final boolean trace;
107
    private final TimeTraceLogger timeTraceLogger;
108

109
    /**
     * Builds a connection object from the supplied options and leaves it in the
     * {@link Status#DISCONNECTED} state. This constructor only wires up state
     * (locks, maps, executors, reader/writer, server pool); it does not call
     * {@code connect}.
     *
     * @param options source of every configurable collaborator read here: trace flag,
     *                time-trace logger, statistics collector, connection listener,
     *                executors, server pool, dispatcher factory and no-responders policy
     */
    NatsConnection(Options options) {
        trace = options.isTraceConnection();
        timeTraceLogger = options.getTimeTraceLogger();
        timeTraceLogger.trace("creating connection object");

        this.options = options;
        forceFlushOnRequest = options.forceFlushOnRequest();

        advancedTracking = options.isTrackAdvancedStats();
        // fall back to the built-in collector when the options do not supply one
        this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector();
        this.statistics.setAdvancedTracking(advancedTracking);

        this.closeSocketLock = new ReentrantLock();

        this.statusLock = new ReentrantLock();
        this.statusChanged = this.statusLock.newCondition();
        this.status = Status.DISCONNECTED;
        // the reconnect waiter starts out already completed
        this.reconnectWaiter = new CompletableFuture<>();
        this.reconnectWaiter.complete(Boolean.TRUE);

        this.connectionListeners = ConcurrentHashMap.newKeySet();
        if (options.getConnectionListener() != null) {
            addConnectionListener(options.getConnectionListener());
        }

        this.dispatchers = new ConcurrentHashMap<>();
        this.subscribers = new ConcurrentHashMap<>();
        this.responsesAwaiting = new ConcurrentHashMap<>();
        this.responsesRespondedTo = new ConcurrentHashMap<>();

        this.serverAuthErrors = new HashMap<>();

        this.nextSid = new AtomicLong(1);
        timeTraceLogger.trace("creating NUID");
        this.nuid = new NUID();
        // NOTE(review): createInbox() presumably relies on the NUID created just above,
        // so keep this assignment after it - confirm against createInbox()
        this.mainInbox = createInbox() + ".*";

        this.lastError = new AtomicReference<>();
        this.connectError = new AtomicReference<>();

        this.serverInfo = new AtomicReference<>();
        this.inboxDispatcher = new AtomicReference<>();
        this.inboxDispatcherLock = new ReentrantLock();
        this.pongQueue = new ConcurrentLinkedDeque<>();
        this.draining = new AtomicReference<>();
        this.blockPublishForDrain = new AtomicBoolean();
        this.tryingToConnect = new AtomicBoolean();

        timeTraceLogger.trace("creating executors");
        this.executor = options.getExecutor();
        this.callbackRunner = options.getCallbackExecutor();
        this.connectExecutor = options.getConnectExecutor();
        this.scheduledExecutor = options.getScheduledExecutor();

        timeTraceLogger.trace("creating reader and writer");
        this.reader = new NatsConnectionReader(this);
        // no previous writer exists yet, hence the null second argument
        this.writer = new NatsConnectionWriter(this, null);

        this.needPing = new AtomicBoolean(true);

        // default server pool / dispatcher factory are used when the options supply none
        serverPool = options.getServerPool() == null ? new NatsServerPool() : options.getServerPool();
        serverPool.initialize(options);
        dispatcherFactory = options.getDispatcherFactory() == null ? new DispatcherFactory() : options.getDispatcherFactory();

        cancelAction = options.isReportNoResponders() ? CancelAction.REPORT : CancelAction.CANCEL;

        timeTraceLogger.trace("connection object created");
    }
1✔
177

178
    // Connect is only called after creation
179
    void connect(boolean reconnectOnConnect) throws InterruptedException, IOException {
180
        if (!tryingToConnect.get()) {
1✔
181
            try {
182
                tryingToConnect.set(true);
1✔
183
                connectImpl(reconnectOnConnect);
1✔
184
            }
185
            finally {
186
                tryingToConnect.set(false);
1✔
187
            }
188
        }
189
    }
1✔
190

191
    /**
     * Makes a single pass over the server pool trying to establish the first connection.
     * For each pool entry the host is resolved and every resolved address is tried in turn
     * via {@code tryToConnect}. The pass stops at the first success, when the connection is
     * closed, when the pool yields the first server again, or when the pool returns null.
     *
     * <p>If the connection is neither connected nor closed afterwards, the outcome depends
     * on {@code reconnectOnConnect}: true hands over to {@code reconnectImpl()}; false closes
     * the connection and throws.
     *
     * @param reconnectOnConnect whether a failed initial connect should fall back to reconnecting
     * @throws IllegalArgumentException if the options contain no servers
     * @throws AuthenticationException  if connecting failed and the last recorded connect
     *                                  error is an authentication error
     * @throws IOException              if connecting failed for any other reason; the message
     *                                  lists the pool servers that failed
     * @throws InterruptedException     propagated from the calls made here (e.g. {@code reconnectImpl()})
     */
    void connectImpl(boolean reconnectOnConnect) throws InterruptedException, IOException {
        if (options.getServers().isEmpty()) {
            throw new IllegalArgumentException("No servers provided in options");
        }

        // local copy; shadows the field of the same name within this method
        boolean trace = options.isTraceConnection();
        long start = NatsSystemClock.nanoTime();

        this.lastError.set("");

        timeTraceLogger.trace("starting connect loop");

        Set<NatsUri> failList = new HashSet<>();
        boolean keepGoing = true;
        NatsUri first = null;
        NatsUri cur;
        while (keepGoing && (cur = serverPool.peekNextServer()) != null) {
            if (first == null) {
                first = cur;
            }
            else if (cur.equals(first)) {
                break;  // connect only goes through loop once
            }
            serverPool.nextServer(); // b/c we only peeked.

            // let server pool resolve hostnames, then loop through resolved
            List<NatsUri> resolvedList = resolveHost(cur);
            for (NatsUri resolved : resolvedList) {
                if (isClosed()) {
                    keepGoing = false;
                    break;
                }
                connectError.set(""); // new on each attempt

                timeTraceLogger.trace("setting status to connecting");
                updateStatus(Status.CONNECTING);

                timeTraceLogger.trace("trying to connect to %s", cur);
                tryToConnect(cur, resolved, NatsSystemClock.nanoTime());

                if (isConnected()) {
                    serverPool.connectSucceeded(cur);
                    keepGoing = false;
                    break;
                }

                timeTraceLogger.trace("setting status to disconnected");
                updateStatus(Status.DISCONNECTED);

                // failures are tracked per pool entry (cur), auth errors per resolved address
                failList.add(cur);
                serverPool.connectFailed(cur);

                String err = connectError.get();

                if (this.isAuthenticationError(err)) {
                    this.serverAuthErrors.put(resolved, err);
                }
            }
        }

        if (!isConnected() && !isClosed()) {
            if (reconnectOnConnect) {
                timeTraceLogger.trace("trying to reconnect on connect");
                reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior
            }
            else {
                timeTraceLogger.trace("connection failed, closing to cleanup");
                close();

                // only the error from the most recent attempt decides which exception is thrown
                String err = connectError.get();
                if (this.isAuthenticationError(err)) {
                    throw new AuthenticationException("Authentication error connecting to NATS server: " + err);
                }
                throw new IOException("Unable to connect to NATS servers: " + failList);
            }
        }
        else if (trace) {
            long end = NatsSystemClock.nanoTime();
            double seconds = ((double) (end - start)) / NANOS_PER_SECOND;
            timeTraceLogger.trace("connect complete in %.3f seconds", seconds);
        }
    }
1✔
273

274
    @Override
275
    public void forceReconnect() throws IOException, InterruptedException {
276
        forceReconnect(null);
1✔
277
    }
1✔
278

279
    @Override
280
    public void forceReconnect(ForceReconnectOptions options) throws IOException, InterruptedException {
281
        if (!tryingToConnect.get()) {
1✔
282
            try {
283
                tryingToConnect.set(true);
1✔
284
                forceReconnectImpl(options);
1✔
285
            }
286
            finally {
287
                tryingToConnect.set(false);
1✔
288
            }
289
        }
290
    }
1✔
291

292
    /**
     * Tears down the current socket and I/O workers and then runs the reconnect logic.
     *
     * <p>Sequence: optionally flush (a flush timeout is ignored); then, while holding
     * {@code closeSocketLock}: mark the status DISCONNECTED, cancel and drop the data port
     * future, hand the current data port to the executor to be closed, stop the reader and
     * writer (waiting up to 100 ms for each), and create a fresh reader and writer. Finally,
     * outside the lock, call {@code reconnectImpl()} and switch the writer out of reconnect mode.
     *
     * @param options optional settings; when non-null, {@code getFlushWait()} enables the
     *                initial flush and {@code isForceClose()} selects {@code forceClose()} over
     *                {@code close()} for the old data port. Note this parameter shadows the
     *                {@code options} field inside this method.
     * @throws InterruptedException propagated from the flush or from {@code reconnectImpl()}
     */
    void forceReconnectImpl(ForceReconnectOptions options) throws InterruptedException {
        if (options != null && options.getFlushWait() != null) {
            try {
                flush(options.getFlushWait());
            }
            catch (TimeoutException e) {
                // ignore, don't care, too bad;
            }
        }

        closeSocketLock.lock();
        try {
            updateStatus(Status.DISCONNECTED);

            // Close and reset the current data port and future
            if (dataPortFuture != null) {
                dataPortFuture.cancel(true);
                dataPortFuture = null;
            }

            // close the data port as a task so as not to block reconnect
            if (dataPort != null) {
                // capture the port before clearing the field so the task closes the old one
                final DataPort closeMe = dataPort;
                dataPort = null;
                executor.submit(() -> {
                    try {
                        if (options != null && options.isForceClose()) {
                            closeMe.forceClose();
                        }
                        else {
                            closeMe.close();
                        }
                    }
                    catch (IOException ignore) {
                    }
                });
            }

            // stop i/o
            // a failure or timeout while stopping is reported via processException and does not abort
            try {
                this.reader.stop(false).get(100, TimeUnit.MILLISECONDS);
            }
            catch (Exception ex) {
                processException(ex);
            }
            try {
                this.writer.stop().get(100, TimeUnit.MILLISECONDS);
            }
            catch (Exception ex) {
                processException(ex);
            }

            // new reader/writer
            reader = new NatsConnectionReader(this);
            // the old writer is passed to the new one's constructor
            writer = new NatsConnectionWriter(this, writer);
        }
        finally {
            closeSocketLock.unlock();
        }

        // calling connect just starts like a new connection versus reconnect
        // but we have to manually resubscribe like reconnect once it is connected
        reconnectImpl();
        writer.setReconnectMode(false);
    }
1✔
357

358
    void reconnect() throws InterruptedException {
359
        if (!tryingToConnect.get()) {
1✔
360
            try {
361
                tryingToConnect.set(true);
1✔
362
                reconnectImpl();
1✔
363
            }
364
            finally {
365
                tryingToConnect.set(false);
1✔
366
            }
367
        }
368
    }
1✔
369

370
    /**
     * Cycles through the server pool until a connection is re-established, then re-sends
     * subscriptions. Reconnect can only be called when the connection is disconnected.
     *
     * <p>Returns immediately if the connection is closed, and closes the connection if
     * {@code getMaxReconnect()} is 0. Unlike {@code connectImpl}, the loop here does not stop
     * after one pass over the pool: each time the first server comes around again the
     * reconnect delay handler is invoked and the loop continues. The loop ends when connected,
     * when the connection is closed/closing/disconnecting, when the same authentication error
     * is seen twice for the same resolved address, or when the pool returns null.
     *
     * <p>If still not connected afterwards the connection is closed. Otherwise subscriptions
     * without a dispatcher and all non-draining dispatchers are re-sent, the connection is
     * flushed, a RESUBSCRIBED event is raised and the writer leaves reconnect mode.
     *
     * @throws InterruptedException propagated from the calls made here
     */
    void reconnectImpl() throws InterruptedException {
        if (isClosed()) {
            return;
        }

        if (options.getMaxReconnect() == 0) {
            this.close();
            return;
        }

        writer.setReconnectMode(true);

        if (!isConnected() && !isClosed() && !this.isClosing()) {
            boolean keepGoing = true;
            int totalRounds = 0;
            NatsUri first = null;
            NatsUri cur;
            while (keepGoing && (cur = serverPool.nextServer()) != null) {
                if (first == null) {
                    first = cur;
                }
                else if (first.equals(cur)) {
                    // went around the pool an entire time
                    invokeReconnectDelayHandler(++totalRounds);
                }

                // let server list provider resolve hostnames
                // then loop through resolved
                List<NatsUri> resolvedList = resolveHost(cur);
                for (NatsUri resolved : resolvedList) {
                    if (isClosed()) {
                        keepGoing = false;
                        break;
                    }
                    connectError.set(""); // reset on each loop
                    if (isDisconnectingOrClosed() || this.isClosing()) {
                        keepGoing = false;
                        break;
                    }
                    updateStatus(Status.RECONNECTING);

                    timeTraceLogger.trace("reconnecting to server %s", cur);
                    tryToConnect(cur, resolved, NatsSystemClock.nanoTime());

                    if (isConnected()) {
                        serverPool.connectSucceeded(cur);
                        statistics.incrementReconnects();
                        keepGoing = false;
                        break;
                    }

                    serverPool.connectFailed(cur);
                    String err = connectError.get();
                    if (this.isAuthenticationError(err)) {
                        // give up when this address already failed with the identical auth error
                        if (err.equals(this.serverAuthErrors.get(resolved))) {
                            keepGoing = false; // double auth error
                            break;
                        }
                        serverAuthErrors.put(resolved, err);
                    }
                }
            }
        } // end-main-loop

        if (!isConnected()) {
            this.close();
            return;
        }

        // subscriptions that have a dispatcher are re-sent by the dispatcher loop below
        this.subscribers.forEach((sid, sub) -> {
            if (sub.getDispatcher() == null && !sub.isDraining()) {
                sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true);
            }
        });

        this.dispatchers.forEach((nuid, d) -> {
            if (!d.isDraining()) {
                d.resendSubscriptions();
            }
        });

        // a failed flush is reported via processException and does not abort the reconnect
        try {
            this.flush(this.options.getConnectionTimeout());
        } catch (Exception exp) {
            this.processException(exp);
        }

        processConnectionEvent(Events.RESUBSCRIBED);

        // When the flush returns we are done sending internal messages,
        // so we can switch to the non-reconnect queue
        this.writer.setReconnectMode(false);
    }
1✔
464

465
    long timeCheck(long endNanos, String message) throws TimeoutException {
466
        long remaining = endNanos - NatsSystemClock.nanoTime();
1✔
467
        if (trace) {
1✔
468
            traceTimeCheck(message, remaining);
1✔
469
        }
470
        if (remaining < 0) {
1✔
471
            throw new TimeoutException("connection timed out");
1✔
472
        }
473
        return remaining;
1✔
474
    }
475

476
    void traceTimeCheck(String message, long remaining) {
477
        if (remaining < 0) {
1✔
478
            if (remaining > -1_000_000) { // less than -1 ms
1✔
479
                timeTraceLogger.trace(message + String.format(", %d (ns) beyond timeout", -remaining));
1✔
480
            }
481
            else if (remaining > -1_000_000_000) { // less than -1 second
1✔
482
                long ms = -remaining / 1_000_000;
1✔
483
                timeTraceLogger.trace(message + String.format(", %d (ms) beyond timeout", ms));
1✔
484
            }
1✔
485
            else {
486
                double seconds = ((double)-remaining) / 1_000_000_000.0;
1✔
487
                timeTraceLogger.trace(message + String.format(", %.3f (s) beyond timeout", seconds));
1✔
488
            }
1✔
489
        }
490
        else if (remaining < 1_000_000) {
1✔
491
            timeTraceLogger.trace(message + String.format(", %d (ns) remaining", remaining));
1✔
492
        }
493
        else if (remaining < 1_000_000_000) {
1✔
494
            long ms = remaining / 1_000_000;
1✔
495
            timeTraceLogger.trace(message + String.format(", %d (ms) remaining", ms));
1✔
496
        }
1✔
497
        else {
498
            double seconds = ((double) remaining) / 1_000_000_000.0;
1✔
499
            timeTraceLogger.trace(message + String.format(", %.3f (s) remaining", seconds));
1✔
500
        }
501
    }
1✔
502

503
    /**
     * Performs one connection attempt against a single resolved address.
     * Is called from reconnect and connect; will wait for any previous attempt to
     * complete, using the reader.stop and writer.stop.
     *
     * <p>Every step is preceded by a {@code timeCheck} against a deadline of
     * {@code now + connection timeout}, so the whole attempt shares one time budget. Steps:
     * mark {@code connecting}, stop a running reader/writer, clean the pong queue, connect a
     * new data port, read the server INFO / check version / upgrade to TLS on the connect
     * executor, start reader and writer, send CONNECT, send a ping and wait for its pong,
     * create the ping and cleanup timers if not yet created, and finally set status CONNECTED.
     *
     * <p>This method does not throw: any exception is passed to {@code processException} and
     * the socket is closed. Callers detect the outcome through {@code isConnected()}.
     *
     * @param cur      the server pool entry being tried; becomes {@code currentServer} on success
     * @param resolved the resolved address actually connected to
     * @param now      start time of the attempt in nanoseconds, basis for the deadline
     */
    void tryToConnect(NatsUri cur, NatsUri resolved, long now) {
        currentServer = null;

        try {
            Duration connectTimeout = options.getConnectionTimeout();
            // local copy; shadows the field of the same name and is captured by connectTask below
            boolean trace = options.isTraceConnection();
            long end = now + connectTimeout.toNanos();
            timeCheck(end, "starting connection attempt");

            statusLock.lock();
            try {
                if (this.connecting) {
                    // NOTE(review): the outer finally block still runs on this return and sets
                    // connecting = false, although the flag belongs to the other attempt - confirm
                    // whether that is intended
                    return;
                }
                this.connecting = true;
                statusChanged.signalAll();
            } finally {
                statusLock.unlock();
            }

            // Create a new future for the dataport, the reader/writer will use this
            // to wait for the connect/failure.
            this.dataPortFuture = new CompletableFuture<>();

            // Make sure the reader and writer are stopped
            long timeoutNanos = timeCheck(end, "waiting for reader");
            if (reader.isRunning()) {
                this.reader.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
            }
            timeoutNanos = timeCheck(end, "waiting for writer");
            if (writer.isRunning()) {
                this.writer.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
            }

            timeCheck(end, "cleaning pong queue");
            cleanUpPongQueue();

            timeoutNanos = timeCheck(end, "connecting data port");
            DataPort newDataPort = this.options.buildDataPort();
            newDataPort.connect(resolved.toString(), this, timeoutNanos);

            // Notify any threads waiting on the sockets
            this.dataPort = newDataPort;
            this.dataPortFuture.complete(this.dataPort);

            // Wait for the INFO message manually
            // all other traffic will use the reader and writer
            // TLS First, don't read info until after upgrade
            Callable<Object> connectTask = () -> {
                if (!options.isTlsFirst()) {
                    readInitialInfo();
                    checkVersionRequirements();
                }
                long start = NatsSystemClock.nanoTime();
                upgradeToSecureIfNeeded(resolved);
                if (trace && options.isTLSRequired()) {
                    // If the time appears too long it might be related to
                    // https://github.com/nats-io/nats.java#linux-platform-note
                    timeTraceLogger.trace("TLS upgrade took: %.3f (s)",
                            ((double) (NatsSystemClock.nanoTime() - start)) / NANOS_PER_SECOND);
                }
                if (options.isTlsFirst()) {
                    readInitialInfo();
                    checkVersionRequirements();
                }
                return null;
            };

            // run the handshake on the connect executor so it can be bounded by the remaining time
            timeoutNanos = timeCheck(end, "reading info, version and upgrading to secure if necessary");
            Future<Object> future = this.connectExecutor.submit(connectTask);
            try {
                future.get(timeoutNanos, TimeUnit.NANOSECONDS);
            } finally {
                // cancel is called on every path, including after the task completed
                future.cancel(true);
            }

            // start the reader and writer after we secured the connection, if necessary
            timeCheck(end, "starting reader");
            this.reader.start(this.dataPortFuture);
            timeCheck(end, "starting writer");
            this.writer.start(this.dataPortFuture);

            timeCheck(end, "sending connect message");
            this.sendConnect(resolved);

            timeoutNanos = timeCheck(end, "sending initial ping");
            Future<Boolean> pongFuture = sendPing();

            if (pongFuture != null) {
                pongFuture.get(timeoutNanos, TimeUnit.NANOSECONDS);
            }

            // the timers are created only when no ping task exists yet
            if (pingTask == null) {
                timeCheck(end, "starting ping and cleanup timers");
                long pingMillis = this.options.getPingInterval().toMillis();

                if (pingMillis > 0) {
                    pingTask = new ScheduledTask(scheduledExecutor, pingMillis, () -> {
                        if (isConnected() && !isClosing()) {
                            try {
                                softPing(); // The timer always uses the standard queue
                            }
                            catch (Exception e) {
                                // it's running in a thread, there is no point throwing here
                            }
                        }
                    });
                }

                long cleanMillis = this.options.getRequestCleanupInterval().toMillis();

                if (cleanMillis > 0) {
                    cleanupTask = new ScheduledTask(scheduledExecutor, cleanMillis, () -> cleanResponses(false));
                }
            }

            // Set connected status
            timeCheck(end, "updating status to connected");
            statusLock.lock();
            try {
                this.connecting = false;

                // an exception noted by handleCommunicationIssue while connecting fails the attempt here
                if (this.exceptionDuringConnectChange != null) {
                    throw this.exceptionDuringConnectChange;
                }

                this.currentServer = cur;
                this.serverAuthErrors.clear(); // reset on successful connection
                updateStatus(Status.CONNECTED); // will signal status change, we also signal in finally
            } finally {
                statusLock.unlock();
            }
            timeTraceLogger.trace("status updated");
        } catch (Exception exp) {
            processException(exp);
            try {
                // allow force reconnect since this is pretty exceptional,
                // a connection failure while trying to connect
                this.closeSocket(false, true);
            } catch (InterruptedException e) {
                processException(e);
                Thread.currentThread().interrupt();
            }
        } finally {
            // always clear the flag and wake any thread waiting on a status change
            statusLock.lock();
            try {
                this.connecting = false;
                statusChanged.signalAll();
            } finally {
                statusLock.unlock();
            }
        }
    }
1✔
659

660
    void checkVersionRequirements() throws IOException {
661
        Options opts = getOptions();
1✔
662
        ServerInfo info = getInfo();
1✔
663

664
        if (opts.isNoEcho() && info.getProtocolVersion() < 1) {
1✔
665
            throw new IOException("Server does not support no echo.");
1✔
666
        }
667
    }
1✔
668

669
    void upgradeToSecureIfNeeded(NatsUri nuri) throws IOException {
670
        // When already communicating over "https" websocket, do NOT try to upgrade to secure.
671
        if (!nuri.isWebsocket()) {
1✔
672
            if (options.isTlsFirst()) {
1✔
673
                dataPort.upgradeToSecure();
1✔
674
            }
675
            else {
676
                // server    | client options      | result
677
                // --------- | ------------------- | --------
678
                // required  | not isTLSRequired() | mismatch
679
                // available | not isTLSRequired() | ok
680
                // neither   | not isTLSRequired() | ok
681
                // required  | isTLSRequired()     | ok
682
                // available | isTLSRequired()     | ok
683
                // neither   | isTLSRequired()     | mismatch
684
                ServerInfo serverInfo = getInfo();
1✔
685
                if (options.isTLSRequired()) {
1✔
686
                    if (!serverInfo.isTLSRequired() && !serverInfo.isTLSAvailable()) {
1✔
687
                        throw new IOException("SSL connection wanted by client.");
1✔
688
                    }
689
                    dataPort.upgradeToSecure();
1✔
690
                }
691
                else if (serverInfo.isTLSRequired()) {
1✔
692
                    throw new IOException("SSL required by server.");
1✔
693
                }
694
            }
695
        }
696
    }
1✔
697
    // Called from reader/writer thread
698
    void handleCommunicationIssue(Exception io) {
699
        // If we are connecting or disconnecting, note exception and leave
700
        statusLock.lock();
1✔
701
        try {
702
            if (this.connecting || this.disconnecting || this.status == Status.CLOSED || this.isDraining()) {
1✔
703
                this.exceptionDuringConnectChange = io;
1✔
704
                return;
1✔
705
            }
706
        } finally {
707
            statusLock.unlock();
1✔
708
        }
709

710
        processException(io);
1✔
711

712
        // Spawn a thread so we don't have timing issues with
713
        // waiting on read/write threads
714
        executor.submit(() -> {
1✔
715
            if (!tryingToConnect.get()) {
1✔
716
                try {
717
                    tryingToConnect.set(true);
1✔
718

719
                    // any issue that brings us here is pretty serious
720
                    // so we are comfortable forcing the close
721
                    this.closeSocket(true, true);
1✔
722
                } catch (InterruptedException e) {
×
723
                    processException(e);
×
724
                    Thread.currentThread().interrupt();
×
725
                } finally {
726
                    tryingToConnect.set(false);
1✔
727
                }
728
            }
729
        });
1✔
730
    }
1✔
731

732
    // Close socket is called when another connect attempt is possible
733
    // Close is called when the connection should shut down, period
734
    void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws InterruptedException {
735
        // Ensure we close the socket exclusively within one thread.
736
        closeSocketLock.lock();
1✔
737
        try {
738
            boolean wasConnected;
739
            statusLock.lock();
1✔
740
            try {
741
                if (isDisconnectingOrClosed()) {
1✔
742
                    waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
743
                    return;
1✔
744
                }
745
                this.disconnecting = true;
1✔
746
                this.exceptionDuringConnectChange = null;
1✔
747
                wasConnected = (this.status == Status.CONNECTED);
1✔
748
                statusChanged.signalAll();
1✔
749
            } finally {
750
                statusLock.unlock();
1✔
751
            }
752

753
            closeSocketImpl(forceClose);
1✔
754

755
            statusLock.lock();
1✔
756
            try {
757
                updateStatus(Status.DISCONNECTED);
1✔
758
                this.exceptionDuringConnectChange = null; // Ignore IOExceptions during closeSocketImpl()
1✔
759
                this.disconnecting = false;
1✔
760
                statusChanged.signalAll();
1✔
761
            } finally {
762
                statusLock.unlock();
1✔
763
            }
764

765
            if (isClosing()) { // isClosing() means we are in the close method or were asked to be
1✔
766
                close();
1✔
767
            } else if (wasConnected && tryReconnectIfConnected) {
1✔
768
                reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior
1✔
769
            }
770
        } finally {
771
            closeSocketLock.unlock();
1✔
772
        }
773
    }
1✔
774

775
    // Close socket is called when another connect attempt is possible
776
    // Close is called when the connection should shut down, period
777
    /**
778
     * {@inheritDoc}
779
     */
780
    @Override
781
    public void close() throws InterruptedException {
782
        // Delegates with checkDrainStatus=true and forceClose=false.
        this.close(true, false);
1✔
783
    }
1✔
784

785
    void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedException {
786
        statusLock.lock();
1✔
787
        try {
788
            if (checkDrainStatus && this.isDraining()) {
1✔
789
                waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
790
                return;
1✔
791
            }
792

793
            this.closing = true;// We were asked to close, so do it
1✔
794
            if (isDisconnectingOrClosed()) {
1✔
795
                waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
796
                return;
1✔
797
            } else {
798
                this.disconnecting = true;
1✔
799
                this.exceptionDuringConnectChange = null;
1✔
800
                statusChanged.signalAll();
1✔
801
            }
802
        } finally {
803
            statusLock.unlock();
1✔
804
        }
805

806
        // Stop the reconnect wait timer after we stop the writer/reader (only if we are
807
        // really closing, not on errors)
808
        if (this.reconnectWaiter != null) {
1✔
809
            this.reconnectWaiter.cancel(true);
1✔
810
        }
811

812
        closeSocketImpl(forceClose);
1✔
813

814
        this.dispatchers.forEach((nuid, d) -> d.stop(false));
1✔
815

816
        this.subscribers.forEach((sid, sub) -> sub.invalidate());
1✔
817

818
        this.dispatchers.clear();
1✔
819
        this.subscribers.clear();
1✔
820

821
        if (pingTask != null) {
1✔
822
            pingTask.shutdown();
1✔
823
            pingTask = null;
1✔
824
        }
825
        if (cleanupTask != null) {
1✔
826
            cleanupTask.shutdown();
1✔
827
            cleanupTask = null;
1✔
828
        }
829

830
        cleanResponses(true);
1✔
831

832
        cleanUpPongQueue();
1✔
833

834
        statusLock.lock();
1✔
835
        try {
836
            updateStatus(Status.CLOSED); // will signal, we also signal when we stop disconnecting
1✔
837

838
            /*
839
             * if (exceptionDuringConnectChange != null) {
840
             * processException(exceptionDuringConnectChange); exceptionDuringConnectChange
841
             * = null; }
842
             */
843
        } finally {
844
            statusLock.unlock();
1✔
845
        }
846

847
        // Stop the error handling and connect executors
848
        callbackRunner.shutdown();
1✔
849
        try {
850
            callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
1✔
851
        } finally {
852
            callbackRunner.shutdownNow();
1✔
853
        }
854

855
        // There's no need to wait for running tasks since we're told to close
856
        connectExecutor.shutdownNow();
1✔
857

858
        statusLock.lock();
1✔
859
        try {
860
            this.disconnecting = false;
1✔
861
            statusChanged.signalAll();
1✔
862
        } finally {
863
            statusLock.unlock();
1✔
864
        }
865
    }
1✔
866

867
    // Should only be called from closeSocket or close
868
    void closeSocketImpl(boolean forceClose) {
869
        this.currentServer = null;
1✔
870

871
        // Signal both to stop.
872
        final Future<Boolean> readStop = this.reader.stop();
1✔
873
        final Future<Boolean> writeStop = this.writer.stop();
1✔
874

875
        // Now wait until they both stop before closing the socket.
876
        try {
877
            readStop.get(1, TimeUnit.SECONDS);
1✔
878
        } catch (Exception ex) {
1✔
879
            //
880
        }
1✔
881
        try {
882
            writeStop.get(1, TimeUnit.SECONDS);
1✔
883
        } catch (Exception ex) {
1✔
884
            //
885
        }
1✔
886

887
        // Close and reset the current data port and future
888
        if (dataPortFuture != null) {
1✔
889
            dataPortFuture.cancel(true);
1✔
890
            dataPortFuture = null;
1✔
891
        }
892

893
        // Close the current socket and cancel anyone waiting for it
894
        try {
895
            if (dataPort != null) {
1✔
896
                if (forceClose) {
1✔
897
                    dataPort.forceClose();
1✔
898
                }
899
                else {
900
                    dataPort.close();
1✔
901
                }
902
            }
903

904
        } catch (IOException ex) {
×
905
            processException(ex);
×
906
        }
1✔
907
        cleanUpPongQueue();
1✔
908

909
        try {
910
            this.reader.stop().get(10, TimeUnit.SECONDS);
1✔
911
        } catch (Exception ex) {
×
912
            processException(ex);
×
913
        }
1✔
914
        try {
915
            this.writer.stop().get(10, TimeUnit.SECONDS);
1✔
916
        } catch (Exception ex) {
×
917
            processException(ex);
×
918
        }
1✔
919
    }
1✔
920

921
    void cleanUpPongQueue() {
922
        Future<Boolean> b;
923
        while ((b = pongQueue.poll()) != null) {
1✔
924
            b.cancel(true);
1✔
925
        }
926
    }
1✔
927

928
    /**
929
     * {@inheritDoc}
930
     */
931
    @Override
932
    public void publish(String subject, byte[] body) {
933
        // No replyTo, no headers; subject validation on, no immediate flush.
        publishInternal(subject, null, null, body, true, false);
1✔
934
    }
1✔
935

936
    /**
937
     * {@inheritDoc}
938
     */
939
    @Override
940
    public void publish(String subject, Headers headers, byte[] body) {
941
        // No replyTo; subject validation on, no immediate flush.
        publishInternal(subject, null, headers, body, true, false);
1✔
942
    }
1✔
943

944
    /**
945
     * {@inheritDoc}
946
     */
947
    @Override
948
    public void publish(String subject, String replyTo, byte[] body) {
949
        // No headers; subject/replyTo validation on, no immediate flush.
        publishInternal(subject, replyTo, null, body, true, false);
1✔
950
    }
1✔
951

952
    /**
953
     * {@inheritDoc}
954
     */
955
    @Override
956
    public void publish(String subject, String replyTo, Headers headers, byte[] body) {
957
        // Subject/replyTo validation on, no immediate flush.
        publishInternal(subject, replyTo, headers, body, true, false);
1✔
958
    }
1✔
959

960
    /**
961
     * {@inheritDoc}
962
     */
963
    @Override
964
    public void publish(Message message) {
965
        validateNotNull(message, "Message");
1✔
966
        // Unlike the String-based overloads, this one passes validateSubjectAndReplyTo=false.
        publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false, false);
1✔
967
    }
1✔
968

969
    void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
970
        checkPayloadSize(data);
1✔
971
        NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
972
        if (npm.hasHeaders && !serverInfo.get().isHeadersSupported()) {
1✔
973
            throw new IllegalArgumentException("Headers are not supported by the server, version: " + serverInfo.get().getVersion());
1✔
974
        }
975

976
        if (isClosed()) {
1✔
977
            throw new IllegalStateException("Connection is Closed");
1✔
978
        } else if (blockPublishForDrain.get()) {
1✔
979
            throw new IllegalStateException("Connection is Draining"); // Ok to publish while waiting on subs
×
980
        }
981

982
        if ((status == Status.RECONNECTING || status == Status.DISCONNECTED)
1✔
983
                && !this.writer.canQueueDuringReconnect(npm)) {
1✔
984
            throw new IllegalStateException(
1✔
985
                    "Unable to queue any more messages during reconnect, max buffer is " + options.getReconnectBufferSize());
1✔
986
        }
987

988
        queueOutgoing(npm);
1✔
989
    }
1✔
990

991
    private void checkPayloadSize(byte[] body) {
992
        if (options.clientSideLimitChecks() && body != null && body.length > this.getMaxPayload() && this.getMaxPayload() > 0) {
1✔
993
            throw new IllegalArgumentException(
1✔
994
                "Message payload size exceed server configuration " + body.length + " vs " + this.getMaxPayload());
1✔
995
        }
996
    }
1✔
997
    /**
998
     * {@inheritDoc}
999
     */
1000
    @Override
1001
    public Subscription subscribe(String subject) {
1002
        validateSubject(subject, true);
1✔
1003
        // No queue name, no dispatcher, default subscription factory.
        return createSubscription(subject, null, null, null);
1✔
1004
    }
1005

1006
    /**
1007
     * {@inheritDoc}
1008
     */
1009
    @Override
1010
    public Subscription subscribe(String subject, String queueName) {
1011
        validateSubject(subject, true);
1✔
1012
        validateQueueName(queueName, true);
1✔
1013
        // No dispatcher, default subscription factory.
        return createSubscription(subject, queueName, null, null);
1✔
1014
    }
1015

1016
    // Unregisters the subscription from this connection (see remove) and then invalidates it.
    void invalidate(NatsSubscription sub) {
1017
        remove(sub);
1✔
1018
        sub.invalidate();
1✔
1019
    }
1✔
1020

1021
    void remove(NatsSubscription sub) {
1022
        CharSequence sid = sub.getSID();
1✔
1023
        subscribers.remove(sid);
1✔
1024

1025
        if (sub.getNatsDispatcher() != null) {
1✔
1026
            sub.getNatsDispatcher().remove(sub);
1✔
1027
        }
1028
    }
1✔
1029

1030
    void unsubscribe(NatsSubscription sub, int after) {
1031
        if (isClosed()) { // last chance, usually sub will catch this
1✔
1032
            throw new IllegalStateException("Connection is Closed");
×
1033
        }
1034

1035
        if (after <= 0) {
1✔
1036
            this.invalidate(sub); // Will clean it up
1✔
1037
        } else {
1038
            sub.setUnsubLimit(after);
1✔
1039

1040
            if (sub.reachedUnsubLimit()) {
1✔
1041
                sub.invalidate();
1✔
1042
            }
1043
        }
1044

1045
        if (!isConnected()) {
1✔
1046
            return; // We will set up sub on reconnect or ignore
1✔
1047
        }
1048

1049
        sendUnsub(sub, after);
1✔
1050
    }
1✔
1051

1052
    void sendUnsub(NatsSubscription sub, int after) {
1053
        ByteArrayBuilder bab =
1✔
1054
            new ByteArrayBuilder().append(UNSUB_SP_BYTES).append(sub.getSID());
1✔
1055
        if (after > 0) {
1✔
1056
            bab.append(SP).append(after);
1✔
1057
        }
1058
        queueOutgoing(new ProtocolMessage(bab, true));
1✔
1059
    }
1✔
1060

1061
    // Assumes the null/empty checks were handled elsewhere
1062
    NatsSubscription createSubscription(String subject, String queueName, NatsDispatcher dispatcher, NatsSubscriptionFactory factory) {
1063
        if (isClosed()) {
1✔
1064
            throw new IllegalStateException("Connection is Closed");
1✔
1065
        } else if (isDraining() && (dispatcher == null || dispatcher != this.inboxDispatcher.get())) {
1✔
1066
            throw new IllegalStateException("Connection is Draining");
1✔
1067
        }
1068

1069
        NatsSubscription sub;
1070
        String sid = getNextSid();
1✔
1071

1072
        if (factory == null) {
1✔
1073
            sub = new NatsSubscription(sid, subject, queueName, this, dispatcher);
1✔
1074
        }
1075
        else {
1076
            sub = factory.createNatsSubscription(sid, subject, queueName, this, dispatcher);
1✔
1077
        }
1078
        subscribers.put(sid, sub);
1✔
1079

1080
        sendSubscriptionMessage(sid, subject, queueName, false);
1✔
1081
        return sub;
1✔
1082
    }
1083

1084
    String getNextSid() {
1085
        return Long.toString(nextSid.getAndIncrement());
1✔
1086
    }
1087

1088
    String reSubscribe(NatsSubscription sub, String subject, String queueName) {
1089
        String sid = getNextSid();
1✔
1090
        sendSubscriptionMessage(sid, subject, queueName, false);
1✔
1091
        subscribers.put(sid, sub);
1✔
1092
        return sid;
1✔
1093
    }
1094

1095
    void sendSubscriptionMessage(String sid, String subject, String queueName, boolean treatAsInternal) {
1096
        if (!isConnected()) {
1✔
1097
            return; // We will set up sub on reconnect or ignore
1✔
1098
        }
1099

1100
        ByteArrayBuilder bab = new ByteArrayBuilder(UTF_8).append(SUB_SP_BYTES).append(subject);
1✔
1101
        if (queueName != null) {
1✔
1102
            bab.append(SP).append(queueName);
1✔
1103
        }
1104
        bab.append(SP).append(sid);
1✔
1105

1106
        // setting this to filter on stop.
1107
        // if it's an "internal" message, it won't be filtered
1108
        // if it's a normal message, the subscription will already be registered
1109
        // and therefore will be re-subscribed after a stop anyway
1110
        ProtocolMessage subMsg = new ProtocolMessage(bab, true);
1✔
1111
        if (treatAsInternal) {
1✔
1112
            queueInternalOutgoing(subMsg);
1✔
1113
        } else {
1114
            queueOutgoing(subMsg);
1✔
1115
        }
1116
    }
1✔
1117

1118
    /**
1119
     * {@inheritDoc}
1120
     */
1121
    @Override
1122
    public String createInbox() {
1123
        // Configured inbox prefix followed by a fresh nuid.
        return options.getInboxPrefix() + nuid.next();
1✔
1124
    }
1125

1126
    // Length of the fixed part of a response inbox: inbox prefix + nuid + separator.
    int getRespInboxLength() {
1127
        return options.getInboxPrefix().length() + 22 + 1; // 22 for nuid, 1 for .
1✔
1128
    }
1129

1130
    // Builds a per-request reply subject: the first getRespInboxLength() characters
    // of the given inbox followed by a fresh nuid.
    String createResponseInbox(String inbox) {
1131
        // Substring gets rid of the * [trailing]
1132
        return inbox.substring(0, getRespInboxLength()) + nuid.next();
1✔
1133
    }
1134

1135
    // If the inbox is long enough, pull out the end part, otherwise, just use the
1136
    // full thing
1137
    String getResponseToken(String responseInbox) {
1138
        int len = getRespInboxLength();
1✔
1139
        if (responseInbox.length() <= len) {
1✔
1140
            return responseInbox;
1✔
1141
        }
1142
        return responseInbox.substring(len);
1✔
1143
    }
1144

1145
    void cleanResponses(boolean closing) {
1146
        ArrayList<String> toRemove = new ArrayList<>();
1✔
1147
        boolean wasInterrupted = false;
1✔
1148

1149
        for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesAwaiting.entrySet()) {
1✔
1150
            boolean remove = false;
1✔
1151
            NatsRequestCompletableFuture future = entry.getValue();
1✔
1152
            if (future.hasExceededTimeout()) {
1✔
1153
                remove = true;
1✔
1154
                future.cancelTimedOut();
1✔
1155
            }
1156
            else if (closing) {
1✔
1157
                remove = true;
1✔
1158
                future.cancelClosing();
1✔
1159
            }
1160
            else if (future.isDone()) {
1✔
1161
                // done should have already been removed, not sure if
1162
                // this even needs checking, but it won't hurt
1163
                remove = true;
1✔
1164
                try {
UNCOV
1165
                    future.get();
×
1166
                }
1167
                catch (InterruptedException e) {
×
1168
                    Thread.currentThread().interrupt();
×
1169
                    // we might have collected some entries already, but were interrupted
1170
                    // break out so we finish as quick as possible
1171
                    // cleanResponses will be called again anyway
1172
                    wasInterrupted = true;
×
1173
                    break;
×
1174
                }
1175
                catch (Throwable ignore) {}
1✔
1176
            }
1177

1178
            if (remove) {
1✔
1179
                toRemove.add(entry.getKey());
1✔
1180
                statistics.decrementOutstandingRequests();
1✔
1181
            }
1182
        }
1✔
1183

1184
        for (String key : toRemove) {
1✔
1185
            responsesAwaiting.remove(key);
1✔
1186
        }
1✔
1187

1188
        if (advancedTracking && !wasInterrupted) {
1✔
1189
            toRemove.clear(); // just reuse this
1✔
1190
            for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesRespondedTo.entrySet()) {
1✔
1191
                NatsRequestCompletableFuture future = entry.getValue();
1✔
1192
                if (future.hasExceededTimeout()) {
1✔
1193
                    toRemove.add(entry.getKey());
1✔
1194
                    future.cancelTimedOut();
1✔
1195
                }
1196
            }
1✔
1197

1198
            for (String token : toRemove) {
1✔
1199
                responsesRespondedTo.remove(token);
1✔
1200
            }
1✔
1201
        }
1202
    }
1✔
1203

1204
    /**
1205
     * {@inheritDoc}
1206
     */
1207
    @Override
1208
    public Message request(String subject, byte[] body, Duration timeout) throws InterruptedException {
1209
        // No headers; subject validation on; flush behavior taken from forceFlushOnRequest.
        return requestInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1210
    }
1211

1212
    /**
1213
     * {@inheritDoc}
1214
     */
1215
    @Override
1216
    public Message request(String subject, Headers headers, byte[] body, Duration timeout) throws InterruptedException {
1217
        // Subject validation on; flush behavior taken from forceFlushOnRequest.
        return requestInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1218
    }
1219

1220
    /**
1221
     * {@inheritDoc}
1222
     */
1223
    @Override
1224
    public Message request(Message message, Duration timeout) throws InterruptedException {
1225
        validateNotNull(message, "Message");
1✔
1226
        // Message-based overload passes validateSubjectAndReplyTo=false.
        return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
1✔
1227
    }
1228

1229
    Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout,
1230
                            CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) throws InterruptedException {
1231
        CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1232
        try {
1233
            return incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
1✔
1234
        } catch (TimeoutException | ExecutionException | CancellationException e) {
1✔
1235
            return null;
1✔
1236
        }
1237
    }
1238

1239
    /**
1240
     * {@inheritDoc}
1241
     */
1242
    @Override
1243
    public CompletableFuture<Message> request(String subject, byte[] body) {
1244
        // No headers and a null futureTimeout, so requestFutureInternal falls back to
        // options.getRequestCleanupInterval().
        return requestFutureInternal(subject, null, body, null, cancelAction, true, forceFlushOnRequest);
1✔
1245
    }
1246

1247
    /**
1248
     * {@inheritDoc}
1249
     */
1250
    @Override
1251
    public CompletableFuture<Message> request(String subject, Headers headers, byte[] body) {
1252
        // Null futureTimeout, so requestFutureInternal falls back to
        // options.getRequestCleanupInterval().
        return requestFutureInternal(subject, headers, body, null, cancelAction, true, forceFlushOnRequest);
1✔
1253
    }
1254

1255
    /**
1256
     * {@inheritDoc}
1257
     */
1258
    @Override
1259
    public CompletableFuture<Message> requestWithTimeout(String subject, byte[] body, Duration timeout) {
1260
        // No headers; the given timeout becomes the future's timeout.
        return requestFutureInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1261
    }
1262

1263
    /**
1264
     * {@inheritDoc}
1265
     */
1266
    @Override
1267
    public CompletableFuture<Message> requestWithTimeout(String subject, Headers headers, byte[] body, Duration timeout) {
1268
        // The given timeout becomes the future's timeout.
        return requestFutureInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1269
    }
1270

1271
    /**
1272
     * {@inheritDoc}
1273
     */
1274
    @Override
1275
    public CompletableFuture<Message> requestWithTimeout(Message message, Duration timeout) {
1276
        validateNotNull(message, "Message");
1✔
1277
        // Message-based overload passes validateSubjectAndReplyTo=false.
        return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
1✔
1278
    }
1279

1280
    /**
1281
     * {@inheritDoc}
1282
     */
1283
    @Override
1284
    public CompletableFuture<Message> request(Message message) {
1285
        validateNotNull(message, "Message");
1✔
1286
        // Null futureTimeout and validateSubjectAndReplyTo=false.
        return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false, forceFlushOnRequest);
1✔
1287
    }
1288

1289
    CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout,
1290
                                                     CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
1291
        checkPayloadSize(data);
1✔
1292

1293
        if (isClosed()) {
1✔
1294
            throw new IllegalStateException("Connection is Closed");
1✔
1295
        } else if (isDraining()) {
1✔
1296
            throw new IllegalStateException("Connection is Draining");
1✔
1297
        }
1298

1299
        if (inboxDispatcher.get() == null) {
1✔
1300
            inboxDispatcherLock.lock();
1✔
1301
            try {
1302
                if (inboxDispatcher.get() == null) {
1✔
1303
                    NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply);
1✔
1304

1305
                    // Ensure the dispatcher is started before publishing messages
1306
                    String id = this.nuid.next();
1✔
1307
                    this.dispatchers.put(id, d);
1✔
1308
                    d.start(id);
1✔
1309
                    d.subscribe(this.mainInbox);
1✔
1310
                    inboxDispatcher.set(d);
1✔
1311
                }
1312
            } finally {
1313
                inboxDispatcherLock.unlock();
1✔
1314
            }
1315
        }
1316

1317
        boolean oldStyle = options.isOldRequestStyle();
1✔
1318
        String responseInbox = oldStyle ? createInbox() : createResponseInbox(this.mainInbox);
1✔
1319
        String responseToken = getResponseToken(responseInbox);
1✔
1320
        NatsRequestCompletableFuture future =
1✔
1321
            new NatsRequestCompletableFuture(cancelAction,
1322
                futureTimeout == null ? options.getRequestCleanupInterval() : futureTimeout, options.useTimeoutException());
1✔
1323

1324
        if (!oldStyle) {
1✔
1325
            responsesAwaiting.put(responseToken, future);
1✔
1326
        }
1327
        statistics.incrementOutstandingRequests();
1✔
1328

1329
        if (oldStyle) {
1✔
1330
            NatsDispatcher dispatcher = this.inboxDispatcher.get();
1✔
1331
            NatsSubscription sub = dispatcher.subscribeReturningSubscription(responseInbox);
1✔
1332
            dispatcher.unsubscribe(responseInbox, 1);
1✔
1333
            // Unsubscribe when future is cancelled:
1334
            future.whenComplete((msg, exception) -> {
1✔
1335
                if (exception instanceof CancellationException) {
1✔
1336
                    dispatcher.unsubscribe(responseInbox);
×
1337
                }
1338
            });
1✔
1339
            responsesAwaiting.put(sub.getSID(), future);
1✔
1340
        }
1341

1342
        publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1343
        statistics.incrementRequestsSent();
1✔
1344

1345
        return future;
1✔
1346
    }
1347

1348
    void deliverReply(Message msg) {
1349
        boolean oldStyle = options.isOldRequestStyle();
1✔
1350
        String subject = msg.getSubject();
1✔
1351
        String token = getResponseToken(subject);
1✔
1352
        String key = oldStyle ? msg.getSID() : token;
1✔
1353
        NatsRequestCompletableFuture f = responsesAwaiting.remove(key);
1✔
1354
        if (f != null) {
1✔
1355
            if (advancedTracking) {
1✔
1356
                responsesRespondedTo.put(key, f);
1✔
1357
            }
1358
            statistics.decrementOutstandingRequests();
1✔
1359
            if (msg.isStatusMessage() && msg.getStatus().getCode() == 503) {
1✔
1360
                switch (f.getCancelAction()) {
1✔
1361
                    case COMPLETE:
1362
                        f.complete(msg);
1✔
1363
                        break;
1✔
1364
                    case REPORT:
1365
                        f.completeExceptionally(new JetStreamStatusException(msg.getStatus()));
1✔
1366
                        break;
1✔
1367
                    case CANCEL:
1368
                    default:
1369
                        f.cancel(true);
1✔
1370
                }
1371
            }
1372
            else {
1373
                f.complete(msg);
1✔
1374
            }
1375
            statistics.incrementRepliesReceived();
1✔
1376
        }
1377
        else if (!oldStyle && !subject.startsWith(mainInbox)) {
1✔
1378
            if (advancedTracking) {
1✔
1379
                if (responsesRespondedTo.get(key) != null) {
1✔
1380
                    statistics.incrementDuplicateRepliesReceived();
1✔
1381
                } else {
1382
                    statistics.incrementOrphanRepliesReceived();
1✔
1383
                }
1384
            }
1385
        }
1386
    }
1✔
1387

1388
    // Creates a dispatcher with a null message handler.
    public Dispatcher createDispatcher() {
1389
        return createDispatcher(null);
1✔
1390
    }
1391

1392
    public Dispatcher createDispatcher(MessageHandler handler) {
1393
        if (isClosed()) {
1✔
1394
            throw new IllegalStateException("Connection is Closed");
1✔
1395
        } else if (isDraining()) {
1✔
1396
            throw new IllegalStateException("Connection is Draining");
1✔
1397
        }
1398

1399
        NatsDispatcher dispatcher = dispatcherFactory.createDispatcher(this, handler);
1✔
1400
        String id = this.nuid.next();
1✔
1401
        this.dispatchers.put(id, dispatcher);
1✔
1402
        dispatcher.start(id);
1✔
1403
        return dispatcher;
1✔
1404
    }
1405

1406
    public void closeDispatcher(Dispatcher d) {
1407
        if (isClosed()) {
1✔
1408
            throw new IllegalStateException("Connection is Closed");
1✔
1409
        } else if (!(d instanceof NatsDispatcher)) {
1✔
1410
            throw new IllegalArgumentException("Connection can only manage its own dispatchers");
×
1411
        }
1412

1413
        NatsDispatcher nd = ((NatsDispatcher) d);
1✔
1414

1415
        if (nd.isDraining()) {
1✔
1416
            return; // No op while draining
1✔
1417
        }
1418

1419
        if (!this.dispatchers.containsKey(nd.getId())) {
1✔
1420
            throw new IllegalArgumentException("Dispatcher is already closed.");
1✔
1421
        }
1422

1423
        cleanupDispatcher(nd);
1✔
1424
    }
1✔
1425

1426
    // Stops the dispatcher (stop(true)) and removes it from the dispatcher map by its id.
    void cleanupDispatcher(NatsDispatcher nd) {
1427
        nd.stop(true);
1✔
1428
        this.dispatchers.remove(nd.getId());
1✔
1429
    }
1✔
1430

1431
    // Read-only view of the dispatcher map; callers cannot modify it through this view.
    Map<String, Dispatcher> getDispatchers() {
1432
        return Collections.unmodifiableMap(dispatchers);
1✔
1433
    }
1434

1435
    // Adds the listener to this connection's listener collection.
    public void addConnectionListener(ConnectionListener connectionListener) {
1436
        connectionListeners.add(connectionListener);
1✔
1437
    }
1✔
1438

1439
    // Removes the listener from this connection's listener collection.
    public void removeConnectionListener(ConnectionListener connectionListener) {
1440
        connectionListeners.remove(connectionListener);
1✔
1441
    }
1✔
1442

1443
    public void flush(Duration timeout) throws TimeoutException, InterruptedException {
1444

1445
        Instant start = Instant.now();
1✔
1446
        waitForConnectOrClose(timeout);
1✔
1447

1448
        if (isClosed()) {
1✔
1449
            throw new TimeoutException("Attempted to flush while closed");
1✔
1450
        }
1451

1452
        if (timeout == null) {
1✔
1453
            timeout = Duration.ZERO;
1✔
1454
        }
1455

1456
        Instant now = Instant.now();
1✔
1457
        Duration waitTime = Duration.between(start, now);
1✔
1458

1459
        if (!timeout.equals(Duration.ZERO) && waitTime.compareTo(timeout) >= 0) {
1✔
1460
            throw new TimeoutException("Timeout out waiting for connection before flush.");
1✔
1461
        }
1462

1463
        try {
1464
            Future<Boolean> waitForIt = sendPing();
1✔
1465

1466
            if (waitForIt == null) { // error in the send ping code
1✔
1467
                return;
×
1468
            }
1469

1470
            long nanos = timeout.toNanos();
1✔
1471

1472
            if (nanos > 0) {
1✔
1473

1474
                nanos -= waitTime.toNanos();
1✔
1475

1476
                if (nanos <= 0) {
1✔
1477
                    nanos = 1; // let the future timeout if it isn't resolved
×
1478
                }
1479

1480
                waitForIt.get(nanos, TimeUnit.NANOSECONDS);
1✔
1481
            } else {
1482
                waitForIt.get();
1✔
1483
            }
1484

1485
            this.statistics.incrementFlushCounter();
1✔
1486
        } catch (ExecutionException | CancellationException e) {
1✔
1487
            throw new TimeoutException(e.toString());
1✔
1488
        }
1✔
1489
    }
1✔
1490

1491
    void sendConnect(NatsUri nuri) throws IOException {
1492
        try {
1493
            ServerInfo info = this.serverInfo.get();
1✔
1494
            // This is changed - we used to use info.isAuthRequired(), but are changing it to
1495
            // better match older versions of the server. It may change again in the future.
1496
            CharBuffer connectOptions = options.buildProtocolConnectOptionsString(
1✔
1497
                nuri.toString(), true, info.getNonce());
1✔
1498
            ByteArrayBuilder bab =
1✔
1499
                new ByteArrayBuilder(OP_CONNECT_SP_LEN + connectOptions.limit(), UTF_8)
1✔
1500
                    .append(CONNECT_SP_BYTES).append(connectOptions);
1✔
1501
            queueInternalOutgoing(new ProtocolMessage(bab, false));
1✔
1502
        } catch (Exception exp) {
1✔
1503
            throw new IOException("Error sending connect string", exp);
1✔
1504
        }
1✔
1505
    }
1✔
1506

1507
    // Sends a ping with treatAsInternal=true.
    CompletableFuture<Boolean> sendPing() {
1508
        return this.sendPing(true);
1✔
1509
    }
1510

1511
    // Sends a ping with treatAsInternal=false; in that mode sendPing(boolean) also
    // consults the needPing flag.
    CompletableFuture<Boolean> softPing() {
1512
        return this.sendPing(false);
1✔
1513
    }
1514

1515
    /**
1516
     * {@inheritDoc}
1517
     */
1518
    @Override
1519
    public Duration RTT() throws IOException {
1520
        if (!isConnectedOrConnecting()) {
1✔
1521
            throw new IOException("Must be connected to do RTT.");
1✔
1522
        }
1523

1524
        long timeout = options.getConnectionTimeout().toMillis();
1✔
1525
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
1✔
1526
        pongQueue.add(pongFuture);
1✔
1527
        try {
1528
            long time = NatsSystemClock.nanoTime();
1✔
1529
            writer.queueInternalMessage(new ProtocolMessage(PING_PROTO));
1✔
1530
            pongFuture.get(timeout, TimeUnit.MILLISECONDS);
1✔
1531
            return Duration.ofNanos(NatsSystemClock.nanoTime() - time);
1✔
1532
        }
1533
        catch (ExecutionException e) {
×
1534
            throw new IOException(e.getCause());
×
1535
        }
1536
        catch (TimeoutException e) {
×
1537
            throw new IOException(e);
×
1538
        }
1539
        catch (InterruptedException e) {
×
1540
            Thread.currentThread().interrupt();
×
1541
            throw new IOException(e);
×
1542
        }
1543
    }
1544

1545
    // Send a ping request and push a pong future on the queue.
1546
    // futures are completed in order, keep this one if a thread wants to wait
1547
    // for a specific pong. Note, if no pong returns the wait will not return
1548
    // without setting a timeout.
1549
    CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
1550
        if (!isConnectedOrConnecting()) {
1✔
1551
            CompletableFuture<Boolean> retVal = new CompletableFuture<>();
1✔
1552
            retVal.complete(Boolean.FALSE);
1✔
1553
            return retVal;
1✔
1554
        }
1555

1556
        if (!treatAsInternal && !this.needPing.get()) {
1✔
1557
            CompletableFuture<Boolean> retVal = new CompletableFuture<>();
1✔
1558
            retVal.complete(Boolean.TRUE);
1✔
1559
            this.needPing.set(true);
1✔
1560
            return retVal;
1✔
1561
        }
1562

1563
        int max = options.getMaxPingsOut();
1✔
1564
        if (max > 0 && pongQueue.size() + 1 > max) {
1✔
1565
            handleCommunicationIssue(new IllegalStateException("Max outgoing Ping count exceeded."));
1✔
1566
            return null;
1✔
1567
        }
1568

1569
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
1✔
1570
        pongQueue.add(pongFuture);
1✔
1571

1572
        if (treatAsInternal) {
1✔
1573
            queueInternalOutgoing(new ProtocolMessage(PING_PROTO));
1✔
1574
        } else {
1575
            queueOutgoing(new ProtocolMessage(PING_PROTO));
1✔
1576
        }
1577

1578
        this.needPing.set(true);
1✔
1579
        this.statistics.incrementPingCount();
1✔
1580
        return pongFuture;
1✔
1581
    }
1582

1583
    // This is a minor speed / memory enhancement.
1584
    // We can't reuse the same instance of any NatsMessage b/c of the "NatsMessage next" state
1585
    // But it is safe to share the data bytes and the size since those fields are just being read
1586
    // This constructor "ProtocolMessage(ProtocolMessage pm)" shares the data and size
1587
    // reducing allocation of data for something that is often created and used
1588
    // These static instances are the ones that are used for copying, sendPing and sendPong
1589
    private static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES);
1✔
1590
    private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES);
1✔
1591

1592
    void sendPong() {
1593
        queueInternalOutgoing(new ProtocolMessage(PONG_PROTO));
1✔
1594
    }
1✔
1595

1596
    // Called by the reader
1597
    void handlePong() {
1598
        CompletableFuture<Boolean> pongFuture = pongQueue.pollFirst();
1✔
1599
        if (pongFuture != null) {
1✔
1600
            pongFuture.complete(Boolean.TRUE);
1✔
1601
        }
1602
    }
1✔
1603

1604
    void readInitialInfo() throws IOException {
1605
        byte[] readBuffer = new byte[options.getBufferSize()];
1✔
1606
        ByteBuffer protocolBuffer = ByteBuffer.allocate(options.getBufferSize());
1✔
1607
        boolean gotCRLF = false;
1✔
1608
        boolean gotCR = false;
1✔
1609

1610
        while (!gotCRLF) {
1✔
1611
            int read = this.dataPort.read(readBuffer, 0, readBuffer.length);
1✔
1612

1613
            if (read < 0) {
1✔
1614
                break;
1✔
1615
            }
1616

1617
            int i = 0;
1✔
1618
            while (i < read) {
1✔
1619
                byte b = readBuffer[i++];
1✔
1620

1621
                if (gotCR) {
1✔
1622
                    if (b != LF) {
1✔
1623
                        throw new IOException("Missed LF after CR waiting for INFO.");
1✔
1624
                    } else if (i < read) {
1✔
1625
                        throw new IOException("Read past initial info message.");
1✔
1626
                    }
1627

1628
                    gotCRLF = true;
1✔
1629
                    break;
1✔
1630
                }
1631

1632
                if (b == CR) {
1✔
1633
                    gotCR = true;
1✔
1634
                } else {
1635
                    if (!protocolBuffer.hasRemaining()) {
1✔
1636
                        protocolBuffer = enlargeBuffer(protocolBuffer); // just double it
1✔
1637
                    }
1638
                    protocolBuffer.put(b);
1✔
1639
                }
1640
            }
1✔
1641
        }
1✔
1642

1643
        if (!gotCRLF) {
1✔
1644
            throw new IOException("Failed to read initial info message.");
1✔
1645
        }
1646

1647
        protocolBuffer.flip();
1✔
1648

1649
        String infoJson = UTF_8.decode(protocolBuffer).toString();
1✔
1650
        infoJson = infoJson.trim();
1✔
1651
        String[] msg = infoJson.split("\\s");
1✔
1652
        String op = msg[0].toUpperCase();
1✔
1653

1654
        if (!OP_INFO.equals(op)) {
1✔
1655
            throw new IOException("Received non-info initial message.");
1✔
1656
        }
1657

1658
        handleInfo(infoJson);
1✔
1659
    }
1✔
1660

1661
    void handleInfo(String infoJson) {
1662
        ServerInfo serverInfo = new ServerInfo(infoJson);
1✔
1663
        this.serverInfo.set(serverInfo);
1✔
1664

1665
        List<String> urls = this.serverInfo.get().getConnectURLs();
1✔
1666
        if (urls != null && !urls.isEmpty()) {
1✔
1667
            if (serverPool.acceptDiscoveredUrls(urls)) {
1✔
1668
                processConnectionEvent(Events.DISCOVERED_SERVERS);
1✔
1669
            }
1670
        }
1671

1672
        if (serverInfo.isLameDuckMode()) {
1✔
1673
            processConnectionEvent(Events.LAME_DUCK);
1✔
1674
        }
1675
    }
1✔
1676

1677
    void queueOutgoing(NatsMessage msg) {
1678
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
1✔
1679
            throw new IllegalArgumentException("Control line is too long");
1✔
1680
        }
1681
        if (!writer.queue(msg)) {
1✔
1682
            options.getErrorListener().messageDiscarded(this, msg);
1✔
1683
        }
1684
    }
1✔
1685

1686
    void queueInternalOutgoing(NatsMessage msg) {
1687
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
1✔
1688
            throw new IllegalArgumentException("Control line is too long");
×
1689
        }
1690
        this.writer.queueInternalMessage(msg);
1✔
1691
    }
1✔
1692

1693
    void deliverMessage(NatsMessage msg) {
1694
        this.needPing.set(false);
1✔
1695
        this.statistics.incrementInMsgs();
1✔
1696
        this.statistics.incrementInBytes(msg.getSizeInBytes());
1✔
1697

1698
        NatsSubscription sub = subscribers.get(msg.getSID());
1✔
1699

1700
        if (sub != null) {
1✔
1701
            msg.setSubscription(sub);
1✔
1702

1703
            NatsDispatcher d = sub.getNatsDispatcher();
1✔
1704
            NatsConsumer c = (d == null) ? sub : d;
1✔
1705
            MessageQueue q = ((d == null) ? sub.getMessageQueue() : d.getMessageQueue());
1✔
1706

1707
            if (c.hasReachedPendingLimits()) {
1✔
1708
                // Drop the message and count it
1709
                this.statistics.incrementDroppedCount();
1✔
1710
                c.incrementDroppedCount();
1✔
1711

1712
                // Notify the first time
1713
                if (!c.isMarkedSlow()) {
1✔
1714
                    c.markSlow();
1✔
1715
                    processSlowConsumer(c);
1✔
1716
                }
1717
            } else if (q != null) {
1✔
1718
                c.markNotSlow();
1✔
1719

1720
                // beforeQueueProcessor returns true if the message is allowed to be queued
1721
                if (sub.getBeforeQueueProcessor().apply(msg)) {
1✔
1722
                    q.push(msg);
1✔
1723
                }
1724
            }
1725

1726
        }
1727
//        else {
1728
//            // Drop messages we don't have a subscriber for (could be extras on an
1729
//            // auto-unsub for example)
1730
//        }
1731
    }
1✔
1732

1733
    void processOK() {
1734
        this.statistics.incrementOkCount();
1✔
1735
    }
1✔
1736

1737
    void processSlowConsumer(Consumer consumer) {
1738
        if (!this.callbackRunner.isShutdown()) {
1✔
1739
            try {
1740
                this.callbackRunner.execute(() -> {
1✔
1741
                    try {
1742
                        options.getErrorListener().slowConsumerDetected(this, consumer);
1✔
1743
                    } catch (Exception ex) {
1✔
1744
                        this.statistics.incrementExceptionCount();
1✔
1745
                    }
1✔
1746
                });
1✔
1747
            } catch (RejectedExecutionException re) {
×
1748
                // Timing with shutdown, let it go
1749
            }
1✔
1750
        }
1751
    }
1✔
1752

1753
    void processException(Exception exp) {
1754
        this.statistics.incrementExceptionCount();
1✔
1755

1756
        if (!this.callbackRunner.isShutdown()) {
1✔
1757
            try {
1758
                this.callbackRunner.execute(() -> {
1✔
1759
                    try {
1760
                        options.getErrorListener().exceptionOccurred(this, exp);
1✔
1761
                    } catch (Exception ex) {
1✔
1762
                        this.statistics.incrementExceptionCount();
1✔
1763
                    }
1✔
1764
                });
1✔
1765
            } catch (RejectedExecutionException re) {
1✔
1766
                // Timing with shutdown, let it go
1767
            }
1✔
1768
        }
1769
    }
1✔
1770

1771
    void processError(String errorText) {
1772
        this.statistics.incrementErrCount();
1✔
1773

1774
        this.lastError.set(errorText);
1✔
1775
        this.connectError.set(errorText); // even if this isn't during connection, save it just in case
1✔
1776

1777
        // If we are connected && we get an authentication error, save it
1778
        if (this.isConnected() && this.isAuthenticationError(errorText) && currentServer != null) {
1✔
1779
            this.serverAuthErrors.put(currentServer, errorText);
1✔
1780
        }
1781

1782
        if (!this.callbackRunner.isShutdown()) {
1✔
1783
            try {
1784
                this.callbackRunner.execute(() -> {
1✔
1785
                    try {
1786
                        options.getErrorListener().errorOccurred(this, errorText);
1✔
1787
                    } catch (Exception ex) {
1✔
1788
                        this.statistics.incrementExceptionCount();
1✔
1789
                    }
1✔
1790
                });
1✔
1791
            } catch (RejectedExecutionException re) {
×
1792
                // Timing with shutdown, let it go
1793
            }
1✔
1794
        }
1795
    }
1✔
1796

1797
    interface ErrorListenerCaller {
1798
        void call(Connection conn, ErrorListener el);
1799
    }
1800

1801
    void executeCallback(ErrorListenerCaller elc) {
1802
        if (!this.callbackRunner.isShutdown()) {
1✔
1803
            try {
1804
                this.callbackRunner.execute(() -> elc.call(this, options.getErrorListener()));
1✔
1805
            } catch (RejectedExecutionException re) {
×
1806
                // Timing with shutdown, let it go
1807
            }
1✔
1808
        }
1809
    }
1✔
1810

1811
    void processConnectionEvent(Events type) {
1812
        if (!this.callbackRunner.isShutdown()) {
1✔
1813
            try {
1814
                for (ConnectionListener listener : connectionListeners) {
1✔
1815
                    this.callbackRunner.execute(() -> {
1✔
1816
                        try {
1817
                            listener.connectionEvent(this, type);
1✔
1818
                        } catch (Exception ex) {
1✔
1819
                            this.statistics.incrementExceptionCount();
1✔
1820
                        }
1✔
1821
                    });
1✔
1822
                }
1✔
1823
            } catch (RejectedExecutionException re) {
×
1824
                // Timing with shutdown, let it go
1825
            }
1✔
1826
        }
1827
    }
1✔
1828

1829
    /**
1830
     * {@inheritDoc}
1831
     */
1832
    @Override
1833
    public ServerInfo getServerInfo() {
1834
        return getInfo();
1✔
1835
    }
1836

1837
    /**
1838
     * {@inheritDoc}
1839
     */
1840
    @Override
1841
    public InetAddress getClientInetAddress() {
1842
        try {
1843
            return InetAddress.getByName(getInfo().getClientIp());
1✔
1844
        }
1845
        catch (Exception e) {
×
1846
            return null;
×
1847
        }
1848
    }
1849

1850
    ServerInfo getInfo() {
1851
        return this.serverInfo.get();
1✔
1852
    }
1853

1854
    /**
1855
     * {@inheritDoc}
1856
     */
1857
    @Override
1858
    public Options getOptions() {
1859
        return this.options;
1✔
1860
    }
1861

1862
    /**
1863
     * {@inheritDoc}
1864
     */
1865
    @Override
1866
    public Statistics getStatistics() {
1867
        return this.statistics;
1✔
1868
    }
1869

1870
    StatisticsCollector getNatsStatistics() {
1871
        return this.statistics;
1✔
1872
    }
1873

1874
    DataPort getDataPort() {
1875
        return this.dataPort;
1✔
1876
    }
1877

1878
    // Used for testing
1879
    int getConsumerCount() {
1880
        return this.subscribers.size() + this.dispatchers.size();
1✔
1881
    }
1882

1883
    public long getMaxPayload() {
1884
        ServerInfo info = this.serverInfo.get();
1✔
1885

1886
        if (info == null) {
1✔
1887
            return -1;
×
1888
        }
1889

1890
        return info.getMaxPayload();
1✔
1891
    }
1892

1893
    /**
1894
     * Return the list of known server urls, including additional servers discovered
1895
     * after a connection has been established.
1896
     * @return this connection's list of known server URLs
1897
     */
1898
    public Collection<String> getServers() {
1899
        return serverPool.getServerList();
1✔
1900
    }
1901

1902
    protected List<NatsUri> resolveHost(NatsUri nuri) {
1903
        // 1. If the nuri host is not already an ip address or the nuri is not for websocket or fast fallback is disabled,
1904
        //    let the pool resolve it.
1905
        List<NatsUri> results = new ArrayList<>();
1✔
1906
        if (!nuri.hostIsIpAddress() && !nuri.isWebsocket() && !options.isEnableFastFallback()) {
1✔
1907
            List<String> ips = serverPool.resolveHostToIps(nuri.getHost());
1✔
1908
            if (ips != null) {
1✔
1909
                for (String ip : ips) {
1✔
1910
                    try {
1911
                        results.add(nuri.reHost(ip));
1✔
1912
                    }
1913
                    catch (URISyntaxException u) {
1✔
1914
                        // ??? should never happen
1915
                    }
1✔
1916
                }
1✔
1917
            }
1918
        }
1919

1920
        // 2. If there were no results,
1921
        //    - host was already an ip address or
1922
        //    - host was for websocket or
1923
        //    - fast fallback is enabled
1924
        //    - pool returned nothing or
1925
        //    - resolving failed...
1926
        //    so the list just becomes the original host.
1927
        if (results.isEmpty()) {
1✔
1928
            results.add(nuri);
1✔
1929
        }
1930
        return results;
1✔
1931
    }
1932

1933
    /**
1934
     * {@inheritDoc}
1935
     */
1936
    @Override
1937
    public String getConnectedUrl() {
1938
        return currentServer == null ? null : currentServer.toString();
1✔
1939
    }
1940

1941
    /**
1942
     * {@inheritDoc}
1943
     */
1944
    @Override
1945
    public Status getStatus() {
1946
        return this.status;
1✔
1947
    }
1948

1949
    /**
1950
     * {@inheritDoc}
1951
     */
1952
    @Override
1953
    public String getLastError() {
1954
        return this.lastError.get();
1✔
1955
    }
1956

1957
    /**
1958
     * {@inheritDoc}
1959
     */
1960
    @Override
1961
    public void clearLastError() {
1962
        this.lastError.set("");
1✔
1963
    }
1✔
1964

1965
    ExecutorService getExecutor() {
1966
        return executor;
1✔
1967
    }
1968

1969
    ScheduledExecutorService getScheduledExecutor() {
1970
        return scheduledExecutor;
1✔
1971
    }
1972

1973
    void updateStatus(Status newStatus) {
1974
        Status oldStatus = this.status;
1✔
1975

1976
        statusLock.lock();
1✔
1977
        try {
1978
            if (oldStatus == Status.CLOSED || newStatus == oldStatus) {
1✔
1979
                return;
1✔
1980
            }
1981
            this.status = newStatus;
1✔
1982
        } finally {
1983
            statusChanged.signalAll();
1✔
1984
            statusLock.unlock();
1✔
1985
        }
1986

1987
        if (this.status == Status.DISCONNECTED) {
1✔
1988
            processConnectionEvent(Events.DISCONNECTED);
1✔
1989
        } else if (this.status == Status.CLOSED) {
1✔
1990
            processConnectionEvent(Events.CLOSED);
1✔
1991
        } else if (oldStatus == Status.RECONNECTING && this.status == Status.CONNECTED) {
1✔
1992
            processConnectionEvent(Events.RECONNECTED);
1✔
1993
        } else if (this.status == Status.CONNECTED) {
1✔
1994
            processConnectionEvent(Events.CONNECTED);
1✔
1995
        }
1996
    }
1✔
1997

1998
    boolean isClosing() {
1999
        return this.closing;
1✔
2000
    }
2001

2002
    boolean isClosed() {
2003
        return this.status == Status.CLOSED;
1✔
2004
    }
2005

2006
    boolean isConnected() {
2007
        return this.status == Status.CONNECTED;
1✔
2008
    }
2009

2010
    boolean isDisconnected() {
2011
        return this.status == Status.DISCONNECTED;
×
2012
    }
2013

2014
    boolean isConnectedOrConnecting() {
2015
        statusLock.lock();
1✔
2016
        try {
2017
            return this.status == Status.CONNECTED || this.connecting;
1✔
2018
        } finally {
2019
            statusLock.unlock();
1✔
2020
        }
2021
    }
2022

2023
    boolean isDisconnectingOrClosed() {
2024
        statusLock.lock();
1✔
2025
        try {
2026
            return this.status == Status.CLOSED || this.disconnecting;
1✔
2027
        } finally {
2028
            statusLock.unlock();
1✔
2029
        }
2030
    }
2031

2032
    boolean isDisconnecting() {
2033
        statusLock.lock();
1✔
2034
        try {
2035
            return this.disconnecting;
1✔
2036
        } finally {
2037
            statusLock.unlock();
1✔
2038
        }
2039
    }
2040

2041
    void waitForDisconnectOrClose(Duration timeout) throws InterruptedException {
2042
        waitFor(timeout, (Void) -> this.isDisconnecting() && !this.isClosed() );
1✔
2043
    }
1✔
2044

2045
    void waitForConnectOrClose(Duration timeout) throws InterruptedException {
2046
        waitFor(timeout, (Void) -> !this.isConnected() && !this.isClosed());
1✔
2047
    }
1✔
2048

2049
    void waitFor(Duration timeout, Predicate<Void> test) throws InterruptedException {
2050
        statusLock.lock();
1✔
2051
        try {
2052
            long currentWaitNanos = (timeout != null) ? timeout.toNanos() : -1;
1✔
2053
            long start = NatsSystemClock.nanoTime();
1✔
2054
            while (currentWaitNanos >= 0 && test.test(null)) {
1✔
2055
                if (currentWaitNanos > 0) {
1✔
2056
                    statusChanged.await(currentWaitNanos, TimeUnit.NANOSECONDS);
1✔
2057
                    long now = NatsSystemClock.nanoTime();
1✔
2058
                    currentWaitNanos = currentWaitNanos - (now - start);
1✔
2059
                    start = now;
1✔
2060

2061
                    if (currentWaitNanos <= 0) {
1✔
2062
                        break;
1✔
2063
                    }
2064
                } else {
1✔
2065
                    statusChanged.await();
×
2066
                }
2067
            }
2068
        } finally {
2069
            statusLock.unlock();
1✔
2070
        }
2071
    }
1✔
2072

2073
    void invokeReconnectDelayHandler(long totalRounds) {
2074
        long currentWaitNanos = 0;
1✔
2075

2076
        ReconnectDelayHandler handler = options.getReconnectDelayHandler();
1✔
2077
        if (handler == null) {
1✔
2078
            Duration dur = options.getReconnectWait();
1✔
2079
            if (dur != null) {
1✔
2080
                currentWaitNanos = dur.toNanos();
1✔
2081
                dur = serverPool.hasSecureServer() ? options.getReconnectJitterTls() : options.getReconnectJitter();
1✔
2082
                if (dur != null) {
1✔
2083
                    currentWaitNanos += ThreadLocalRandom.current().nextLong(dur.toNanos());
1✔
2084
                }
2085
            }
2086
        }
1✔
2087
        else {
2088
            Duration waitTime = handler.getWaitTime(totalRounds);
1✔
2089
            if (waitTime != null) {
1✔
2090
                currentWaitNanos = waitTime.toNanos();
1✔
2091
            }
2092
        }
2093

2094
        this.reconnectWaiter = new CompletableFuture<>();
1✔
2095

2096
        long start = NatsSystemClock.nanoTime();
1✔
2097
        while (currentWaitNanos > 0 && !isDisconnectingOrClosed() && !isConnected() && !this.reconnectWaiter.isDone()) {
1✔
2098
            try {
2099
                this.reconnectWaiter.get(currentWaitNanos, TimeUnit.NANOSECONDS);
×
2100
            } catch (Exception exp) {
1✔
2101
                // ignore, try to loop again
2102
            }
×
2103
            long now = NatsSystemClock.nanoTime();
1✔
2104
            currentWaitNanos = currentWaitNanos - (now - start);
1✔
2105
            start = now;
1✔
2106
        }
1✔
2107

2108
        this.reconnectWaiter.complete(Boolean.TRUE);
1✔
2109
    }
1✔
2110

2111
    ByteBuffer enlargeBuffer(ByteBuffer buffer) {
2112
        int current = buffer.capacity();
1✔
2113
        int newSize = current * 2;
1✔
2114
        ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
1✔
2115
        buffer.flip();
1✔
2116
        newBuffer.put(buffer);
1✔
2117
        return newBuffer;
1✔
2118
    }
2119

2120
    // For testing
2121
    NatsConnectionReader getReader() {
2122
        return this.reader;
1✔
2123
    }
2124

2125
    // For testing
2126
    NatsConnectionWriter getWriter() {
2127
        return this.writer;
1✔
2128
    }
2129

2130
    // For testing
2131
    Future<DataPort> getDataPortFuture() {
2132
        return this.dataPortFuture;
1✔
2133
    }
2134

2135
    boolean isDraining() {
2136
        return this.draining.get() != null;
1✔
2137
    }
2138

2139
    boolean isDrained() {
2140
        CompletableFuture<Boolean> tracker = this.draining.get();
1✔
2141

2142
        try {
2143
            if (tracker != null && tracker.getNow(false)) {
1✔
2144
                return true;
1✔
2145
            }
2146
        } catch (Exception e) {
×
2147
            // These indicate the tracker was cancelled/timed out
2148
        }
1✔
2149

2150
        return false;
1✔
2151
    }
2152

2153
    /**
2154
     * {@inheritDoc}
2155
     */
2156
    @Override
2157
    public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutException, InterruptedException {
2158

2159
        if (isClosing() || isClosed()) {
1✔
2160
            throw new IllegalStateException("A connection can't be drained during close.");
1✔
2161
        }
2162

2163
        this.statusLock.lock();
1✔
2164
        try {
2165
            if (isDraining()) {
1✔
2166
                return this.draining.get();
1✔
2167
            }
2168
            this.draining.set(new CompletableFuture<>());
1✔
2169
        } finally {
2170
            this.statusLock.unlock();
1✔
2171
        }
2172

2173
        final CompletableFuture<Boolean> tracker = this.draining.get();
1✔
2174
        Instant start = Instant.now();
1✔
2175

2176
        // Don't include subscribers with dispatchers
2177
        HashSet<NatsSubscription> pureSubscribers = new HashSet<>(this.subscribers.values());
1✔
2178
        pureSubscribers.removeIf((s) -> s.getDispatcher() != null);
1✔
2179

2180
        final HashSet<NatsConsumer> consumers = new HashSet<>();
1✔
2181
        consumers.addAll(pureSubscribers);
1✔
2182
        consumers.addAll(this.dispatchers.values());
1✔
2183

2184
        NatsDispatcher inboxer = this.inboxDispatcher.get();
1✔
2185

2186
        if (inboxer != null) {
1✔
2187
            consumers.add(inboxer);
1✔
2188
        }
2189

2190
        // Stop the consumers NOW so that when this method returns they are blocked
2191
        consumers.forEach((cons) -> {
1✔
2192
            cons.markDraining(tracker);
1✔
2193
            cons.sendUnsubForDrain();
1✔
2194
        });
1✔
2195

2196
        try {
2197
            this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know
1✔
2198
        } catch (Exception e) {
1✔
2199
            this.close(false, false);
1✔
2200
            throw e;
1✔
2201
        }
1✔
2202

2203
        consumers.forEach(NatsConsumer::markUnsubedForDrain);
1✔
2204

2205
        // Wait for the timeout or the pending count to go to 0
2206
        executor.submit(() -> {
1✔
2207
            try {
2208
                long stop = (timeout == null || timeout.equals(Duration.ZERO))
1✔
2209
                    ? Long.MAX_VALUE
2210
                    : NatsSystemClock.nanoTime() + timeout.toNanos();
1✔
2211
                while (NatsSystemClock.nanoTime() < stop && !Thread.interrupted())
1✔
2212
                {
2213
                    consumers.removeIf(NatsConsumer::isDrained);
1✔
2214
                    if (consumers.isEmpty()) {
1✔
2215
                        break;
1✔
2216
                    }
2217
                    //noinspection BusyWait
2218
                    Thread.sleep(1); // Sleep 1 milli
1✔
2219
                }
2220

2221
                // Stop publishing
2222
                this.blockPublishForDrain.set(true);
1✔
2223

2224
                // One last flush
2225
                if (timeout == null || timeout.equals(Duration.ZERO)) {
1✔
2226
                    this.flush(Duration.ZERO);
1✔
2227
                } else {
2228
                    Instant now = Instant.now();
1✔
2229
                    Duration passed = Duration.between(start, now);
1✔
2230
                    Duration newTimeout = timeout.minus(passed);
1✔
2231
                    if (newTimeout.toNanos() > 0) {
1✔
2232
                        this.flush(newTimeout);
1✔
2233
                    }
2234
                }
2235
                this.close(false, false); // close the connection after the last flush
1✔
2236
                tracker.complete(consumers.isEmpty());
1✔
2237
            } catch (TimeoutException e) {
×
2238
                this.processException(e);
×
2239
            } catch (InterruptedException e) {
×
2240
                this.processException(e);
×
2241
                Thread.currentThread().interrupt();
×
2242
            } finally {
2243
                try {
2244
                    this.close(false, false);// close the connection after the last flush
1✔
2245
                } catch (InterruptedException e) {
×
2246
                    processException(e);
×
2247
                    Thread.currentThread().interrupt();
×
2248
                }
1✔
2249
                tracker.complete(false);
1✔
2250
            }
2251
        });
1✔
2252

2253
        return tracker;
1✔
2254
    }
2255

2256
    boolean isAuthenticationError(String err) {
2257
        if (err == null) {
1✔
2258
            return false;
1✔
2259
        }
2260
        err = err.toLowerCase();
1✔
2261
        return err.startsWith("user authentication")
1✔
2262
            || err.contains("authorization violation")
1✔
2263
            || err.startsWith("account authentication expired");
1✔
2264
    }
2265

2266
    /**
2267
     * {@inheritDoc}
2268
     */
2269
    @Override
2270
    public void flushBuffer() throws IOException {
2271
        if (!isConnected()) {
1✔
2272
            throw new IllegalStateException("Connection is not active.");
1✔
2273
        }
2274
        writer.flushBuffer();
1✔
2275
    }
1✔
2276

2277
    /**
2278
     * {@inheritDoc}
2279
     */
2280
    @Override
2281
    public StreamContext getStreamContext(String streamName) throws IOException, JetStreamApiException {
2282
        Validator.validateStreamName(streamName, true);
1✔
2283
        ensureNotClosing();
1✔
2284
        return new NatsStreamContext(streamName, null, this, null);
1✔
2285
    }
2286

2287
    /**
2288
     * {@inheritDoc}
2289
     */
2290
    @Override
2291
    public StreamContext getStreamContext(String streamName, JetStreamOptions options) throws IOException, JetStreamApiException {
2292
        Validator.validateStreamName(streamName, true);
1✔
2293
        ensureNotClosing();
1✔
2294
        return new NatsStreamContext(streamName, null, this, options);
1✔
2295
    }
2296

2297
    /**
2298
     * {@inheritDoc}
2299
     */
2300
    @Override
2301
    public ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException {
2302
        return getStreamContext(streamName).getConsumerContext(consumerName);
1✔
2303
    }
2304

2305
    /**
2306
     * {@inheritDoc}
2307
     */
2308
    @Override
2309
    public ConsumerContext getConsumerContext(String streamName, String consumerName, JetStreamOptions options) throws IOException, JetStreamApiException {
2310
        return getStreamContext(streamName, options).getConsumerContext(consumerName);
1✔
2311
    }
2312

2313
    /**
2314
     * {@inheritDoc}
2315
     */
2316
    @Override
2317
    public JetStream jetStream() throws IOException {
2318
        ensureNotClosing();
1✔
2319
        return new NatsJetStream(this, null);
1✔
2320
    }
2321

2322
    /**
2323
     * {@inheritDoc}
2324
     */
2325
    @Override
2326
    public JetStream jetStream(JetStreamOptions options) throws IOException {
2327
        ensureNotClosing();
1✔
2328
        return new NatsJetStream(this, options);
1✔
2329
    }
2330

2331
    /**
2332
     * {@inheritDoc}
2333
     */
2334
    @Override
2335
    public JetStreamManagement jetStreamManagement() throws IOException {
2336
        ensureNotClosing();
1✔
2337
        return new NatsJetStreamManagement(this, null);
1✔
2338
    }
2339

2340
    /**
2341
     * {@inheritDoc}
2342
     */
2343
    @Override
2344
    public JetStreamManagement jetStreamManagement(JetStreamOptions options) throws IOException {
2345
        ensureNotClosing();
1✔
2346
        return new NatsJetStreamManagement(this, options);
1✔
2347
    }
2348

2349
    /**
2350
     * {@inheritDoc}
2351
     */
2352
    @Override
2353
    public KeyValue keyValue(String bucketName) throws IOException {
2354
        Validator.validateBucketName(bucketName, true);
1✔
2355
        ensureNotClosing();
1✔
2356
        return new NatsKeyValue(this, bucketName, null);
1✔
2357
    }
2358

2359
    /**
2360
     * {@inheritDoc}
2361
     */
2362
    @Override
2363
    public KeyValue keyValue(String bucketName, KeyValueOptions options) throws IOException {
2364
        Validator.validateBucketName(bucketName, true);
1✔
2365
        ensureNotClosing();
1✔
2366
        return new NatsKeyValue(this, bucketName, options);
1✔
2367
    }
2368

2369
    /**
2370
     * {@inheritDoc}
2371
     */
2372
    @Override
2373
    public KeyValueManagement keyValueManagement() throws IOException {
2374
        ensureNotClosing();
1✔
2375
        return new NatsKeyValueManagement(this, null);
1✔
2376
    }
2377

2378
    /**
2379
     * {@inheritDoc}
2380
     */
2381
    @Override
2382
    public KeyValueManagement keyValueManagement(KeyValueOptions options) throws IOException {
2383
        ensureNotClosing();
1✔
2384
        return new NatsKeyValueManagement(this, options);
1✔
2385
    }
2386

2387
    /**
2388
     * {@inheritDoc}
2389
     */
2390
    @Override
2391
    public ObjectStore objectStore(String bucketName) throws IOException {
2392
        Validator.validateBucketName(bucketName, true);
1✔
2393
        ensureNotClosing();
1✔
2394
        return new NatsObjectStore(this, bucketName, null);
1✔
2395
    }
2396

2397
    /**
2398
     * {@inheritDoc}
2399
     */
2400
    @Override
2401
    public ObjectStore objectStore(String bucketName, ObjectStoreOptions options) throws IOException {
2402
        Validator.validateBucketName(bucketName, true);
1✔
2403
        ensureNotClosing();
1✔
2404
        return new NatsObjectStore(this, bucketName, options);
1✔
2405
    }
2406

2407
    /**
2408
     * {@inheritDoc}
2409
     */
2410
    @Override
2411
    public ObjectStoreManagement objectStoreManagement() throws IOException {
2412
        ensureNotClosing();
1✔
2413
        return new NatsObjectStoreManagement(this, null);
1✔
2414
    }
2415

2416
    /**
2417
     * {@inheritDoc}
2418
     */
2419
    @Override
2420
    public ObjectStoreManagement objectStoreManagement(ObjectStoreOptions options) throws IOException {
2421
        ensureNotClosing();
1✔
2422
        return new NatsObjectStoreManagement(this, options);
1✔
2423
    }
2424

2425
    private void ensureNotClosing() throws IOException {
2426
        if (isClosing() || isClosed()) {
1✔
2427
            throw new IOException("A JetStream context can't be established during close.");
1✔
2428
        }
2429
    }
1✔
2430
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc