• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2064

16 Jul 2025 10:21AM UTC coverage: 95.561% (-0.03%) from 95.593%
#2064

push

github

web-flow
Merge pull request #1360 from nats-io/sslSocket-removeHandshakeCompletedListener

SSL handshake listeners never removed, preventing garbage collection

3 of 3 new or added lines in 1 file covered. (100.0%)

4 existing lines in 2 files now uncovered.

11839 of 12389 relevant lines covered (95.56%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.85
/src/main/java/io/nats/client/impl/NatsConnection.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.ConnectionListener.Events;
18
import io.nats.client.api.ServerInfo;
19
import io.nats.client.support.*;
20

21
import java.io.IOException;
22
import java.net.InetAddress;
23
import java.net.URISyntaxException;
24
import java.nio.ByteBuffer;
25
import java.nio.CharBuffer;
26
import java.time.Duration;
27
import java.time.Instant;
28
import java.util.*;
29
import java.util.concurrent.*;
30
import java.util.concurrent.atomic.AtomicBoolean;
31
import java.util.concurrent.atomic.AtomicLong;
32
import java.util.concurrent.atomic.AtomicReference;
33
import java.util.concurrent.locks.Condition;
34
import java.util.concurrent.locks.ReentrantLock;
35
import java.util.function.Predicate;
36

37
import static io.nats.client.support.NatsConstants.*;
38
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
39
import static io.nats.client.support.Validator.*;
40
import static java.nio.charset.StandardCharsets.UTF_8;
41

42
class NatsConnection implements Connection {
43

44
    public static final double NANOS_PER_SECOND = 1_000_000_000.0;
45

46
    private final Options options;
47
    final boolean forceFlushOnRequest;
48

49
    private final StatisticsCollector statistics;
50

51
    private boolean connecting; // you can only connect in one thread
52
    private boolean disconnecting; // you can only disconnect in one thread
53
    private boolean closing; // respect a close call regardless
54
    private Exception exceptionDuringConnectChange; // exception occurred in another thread while dis/connecting
55
    final ReentrantLock closeSocketLock;
56

57
    private Status status;
58
    private final ReentrantLock statusLock;
59
    private final Condition statusChanged;
60

61
    private CompletableFuture<DataPort> dataPortFuture;
62
    private DataPort dataPort;
63
    private NatsUri currentServer;
64
    private CompletableFuture<Boolean> reconnectWaiter;
65
    private final ConcurrentHashMap<NatsUri, String> serverAuthErrors;
66

67
    private NatsConnectionReader reader;
68
    private NatsConnectionWriter writer;
69

70
    private final AtomicReference<ServerInfo> serverInfo;
71

72
    private final Map<String, NatsSubscription> subscribers;
73
    private final Map<String, NatsDispatcher> dispatchers; // use a concurrent map so we get more consistent iteration behavior
74
    private final Collection<ConnectionListener> connectionListeners;
75
    private final Map<String, NatsRequestCompletableFuture> responsesAwaiting;
76
    private final Map<String, NatsRequestCompletableFuture> responsesRespondedTo;
77
    private final ConcurrentLinkedDeque<CompletableFuture<Boolean>> pongQueue;
78

79
    private final String mainInbox;
80
    private final AtomicReference<NatsDispatcher> inboxDispatcher;
81
    private final ReentrantLock inboxDispatcherLock;
82
    private ScheduledTask pingTask;
83
    private ScheduledTask cleanupTask;
84

85
    private final AtomicBoolean needPing;
86

87
    private final AtomicLong nextSid;
88
    private final NUID nuid;
89

90
    private final AtomicReference<String> connectError;
91
    private final AtomicReference<String> lastError;
92
    private final AtomicReference<CompletableFuture<Boolean>> draining;
93
    private final AtomicBoolean blockPublishForDrain;
94
    private final AtomicBoolean tryingToConnect;
95

96
    private final ExecutorService callbackRunner;
97
    private final ExecutorService executor;
98
    private final ExecutorService connectExecutor;
99
    private final ScheduledExecutorService scheduledExecutor;
100
    private final boolean advancedTracking;
101

102
    private final ServerPool serverPool;
103
    private final DispatcherFactory dispatcherFactory;
104
    final CancelAction cancelAction;
105

106
    private final boolean trace;
107
    private final TimeTraceLogger timeTraceLogger;
108

109
    NatsConnection(Options options) {
1✔
110
        trace = options.isTraceConnection();
1✔
111
        timeTraceLogger = options.getTimeTraceLogger();
1✔
112
        timeTraceLogger.trace("creating connection object");
1✔
113

114
        this.options = options;
1✔
115
        forceFlushOnRequest = options.forceFlushOnRequest();
1✔
116

117
        advancedTracking = options.isTrackAdvancedStats();
1✔
118
        this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector();
1✔
119
        this.statistics.setAdvancedTracking(advancedTracking);
1✔
120

121
        this.closeSocketLock = new ReentrantLock();
1✔
122

123
        this.statusLock = new ReentrantLock();
1✔
124
        this.statusChanged = this.statusLock.newCondition();
1✔
125
        this.status = Status.DISCONNECTED;
1✔
126
        this.reconnectWaiter = new CompletableFuture<>();
1✔
127
        this.reconnectWaiter.complete(Boolean.TRUE);
1✔
128

129
        this.connectionListeners = ConcurrentHashMap.newKeySet();
1✔
130
        if (options.getConnectionListener() != null) {
1✔
131
            addConnectionListener(options.getConnectionListener());
1✔
132
        }
133

134
        this.dispatchers = new ConcurrentHashMap<>();
1✔
135
        this.subscribers = new ConcurrentHashMap<>();
1✔
136
        this.responsesAwaiting = new ConcurrentHashMap<>();
1✔
137
        this.responsesRespondedTo = new ConcurrentHashMap<>();
1✔
138
        this.serverAuthErrors = new ConcurrentHashMap<>();
1✔
139

140
        this.nextSid = new AtomicLong(1);
1✔
141
        timeTraceLogger.trace("creating NUID");
1✔
142
        this.nuid = new NUID();
1✔
143
        this.mainInbox = createInbox() + ".*";
1✔
144

145
        this.lastError = new AtomicReference<>();
1✔
146
        this.connectError = new AtomicReference<>();
1✔
147

148
        this.serverInfo = new AtomicReference<>();
1✔
149
        this.inboxDispatcher = new AtomicReference<>();
1✔
150
        this.inboxDispatcherLock = new ReentrantLock();
1✔
151
        this.pongQueue = new ConcurrentLinkedDeque<>();
1✔
152
        this.draining = new AtomicReference<>();
1✔
153
        this.blockPublishForDrain = new AtomicBoolean();
1✔
154
        this.tryingToConnect = new AtomicBoolean();
1✔
155

156
        timeTraceLogger.trace("creating executors");
1✔
157
        this.executor = options.getExecutor();
1✔
158
        this.callbackRunner = options.getCallbackExecutor();
1✔
159
        this.connectExecutor = options.getConnectExecutor();
1✔
160
        this.scheduledExecutor = options.getScheduledExecutor();
1✔
161

162
        timeTraceLogger.trace("creating reader and writer");
1✔
163
        this.reader = new NatsConnectionReader(this);
1✔
164
        this.writer = new NatsConnectionWriter(this, null);
1✔
165

166
        this.needPing = new AtomicBoolean(true);
1✔
167

168
        serverPool = options.getServerPool() == null ? new NatsServerPool() : options.getServerPool();
1✔
169
        serverPool.initialize(options);
1✔
170
        dispatcherFactory = options.getDispatcherFactory() == null ? new DispatcherFactory() : options.getDispatcherFactory();
1✔
171

172
        cancelAction = options.isReportNoResponders() ? CancelAction.REPORT : CancelAction.CANCEL;
1✔
173

174
        timeTraceLogger.trace("connection object created");
1✔
175
    }
1✔
176

177
    // Connect is only called after creation
178
    void connect(boolean reconnectOnConnect) throws InterruptedException, IOException {
179
        if (!tryingToConnect.get()) {
1✔
180
            try {
181
                tryingToConnect.set(true);
1✔
182
                connectImpl(reconnectOnConnect);
1✔
183
            }
184
            finally {
185
                tryingToConnect.set(false);
1✔
186
            }
187
        }
188
    }
1✔
189

190
    void connectImpl(boolean reconnectOnConnect) throws InterruptedException, IOException {
191
        if (options.getServers().isEmpty()) {
1✔
192
            throw new IllegalArgumentException("No servers provided in options");
×
193
        }
194

195
        boolean trace = options.isTraceConnection();
1✔
196
        long start = NatsSystemClock.nanoTime();
1✔
197

198
        this.lastError.set("");
1✔
199

200
        timeTraceLogger.trace("starting connect loop");
1✔
201

202
        Set<NatsUri> failList = new HashSet<>();
1✔
203
        boolean keepGoing = true;
1✔
204
        NatsUri first = null;
1✔
205
        NatsUri cur;
206
        while (keepGoing && (cur = serverPool.peekNextServer()) != null) {
1✔
207
            if (first == null) {
1✔
208
                first = cur;
1✔
209
            }
210
            else if (cur.equals(first)) {
1✔
211
                break;  // connect only goes through loop once
1✔
212
            }
213
            serverPool.nextServer(); // b/c we only peeked.
1✔
214

215
            // let server pool resolve hostnames, then loop through resolved
216
            List<NatsUri> resolvedList = resolveHost(cur);
1✔
217
            for (NatsUri resolved : resolvedList) {
1✔
218
                if (isClosed()) {
1✔
219
                    keepGoing = false;
1✔
220
                    break;
1✔
221
                }
222
                connectError.set(""); // new on each attempt
1✔
223

224
                timeTraceLogger.trace("setting status to connecting");
1✔
225
                updateStatus(Status.CONNECTING);
1✔
226

227
                timeTraceLogger.trace("trying to connect to %s", cur);
1✔
228
                tryToConnect(cur, resolved, NatsSystemClock.nanoTime());
1✔
229

230
                if (isConnected()) {
1✔
231
                    serverPool.connectSucceeded(cur);
1✔
232
                    keepGoing = false;
1✔
233
                    break;
1✔
234
                }
235

236
                timeTraceLogger.trace("setting status to disconnected");
1✔
237
                updateStatus(Status.DISCONNECTED);
1✔
238

239
                failList.add(cur);
1✔
240
                serverPool.connectFailed(cur);
1✔
241

242
                String err = connectError.get();
1✔
243

244
                if (this.isAuthenticationError(err)) {
1✔
245
                    this.serverAuthErrors.put(resolved, err);
1✔
246
                }
247
            }
1✔
248
        }
1✔
249

250
        if (!isConnected() && !isClosed()) {
1✔
251
            if (reconnectOnConnect) {
1✔
252
                timeTraceLogger.trace("trying to reconnect on connect");
1✔
253
                reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior
1✔
254
            }
255
            else {
256
                timeTraceLogger.trace("connection failed, closing to cleanup");
1✔
257
                close();
1✔
258

259
                String err = connectError.get();
1✔
260
                if (this.isAuthenticationError(err)) {
1✔
261
                    throw new AuthenticationException("Authentication error connecting to NATS server: " + err);
1✔
262
                }
263
                throw new IOException("Unable to connect to NATS servers: " + failList);
1✔
264
            }
265
        }
266
        else if (trace) {
1✔
267
            long end = NatsSystemClock.nanoTime();
1✔
268
            double seconds = ((double) (end - start)) / NANOS_PER_SECOND;
1✔
269
            timeTraceLogger.trace("connect complete in %.3f seconds", seconds);
1✔
270
        }
271
    }
1✔
272

273
    @Override
274
    public void forceReconnect() throws IOException, InterruptedException {
275
        forceReconnect(null);
1✔
276
    }
1✔
277

278
    @Override
279
    public void forceReconnect(ForceReconnectOptions options) throws IOException, InterruptedException {
280
        if (!tryingToConnect.get()) {
1✔
281
            try {
282
                tryingToConnect.set(true);
1✔
283
                forceReconnectImpl(options);
1✔
284
            }
285
            finally {
286
                tryingToConnect.set(false);
1✔
287
            }
288
        }
289
    }
1✔
290

291
    void forceReconnectImpl(ForceReconnectOptions options) throws InterruptedException {
292
        if (options != null && options.getFlushWait() != null) {
1✔
293
            try {
294
                flush(options.getFlushWait());
×
295
            }
296
            catch (TimeoutException e) {
×
297
                // ignore, don't care, too bad;
298
            }
×
299
        }
300

301
        closeSocketLock.lock();
1✔
302
        try {
303
            updateStatus(Status.DISCONNECTED);
1✔
304

305
            // Close and reset the current data port and future
306
            if (dataPortFuture != null) {
1✔
307
                dataPortFuture.cancel(true);
1✔
308
                dataPortFuture = null;
1✔
309
            }
310

311
            // close the data port as a task so as not to block reconnect
312
            if (dataPort != null) {
1✔
313
                final DataPort closeMe = dataPort;
1✔
314
                dataPort = null;
1✔
315
                executor.submit(() -> {
1✔
316
                    try {
317
                        if (options != null && options.isForceClose()) {
1✔
318
                            closeMe.forceClose();
×
319
                        }
320
                        else {
321
                            closeMe.close();
1✔
322
                        }
323
                    }
324
                    catch (IOException ignore) {
×
325
                    }
1✔
326
                });
1✔
327
            }
328

329
            // stop i/o
330
            try {
331
                this.reader.stop(false).get(100, TimeUnit.MILLISECONDS);
1✔
332
            }
333
            catch (Exception ex) {
×
334
                processException(ex);
×
335
            }
1✔
336
            try {
337
                this.writer.stop().get(100, TimeUnit.MILLISECONDS);
1✔
338
            }
339
            catch (Exception ex) {
×
340
                processException(ex);
×
341
            }
1✔
342

343
            // new reader/writer
344
            reader = new NatsConnectionReader(this);
1✔
345
            writer = new NatsConnectionWriter(this, writer);
1✔
346
        }
347
        finally {
348
            closeSocketLock.unlock();
1✔
349
        }
350

351
        // calling connect just starts like a new connection versus reconnect
352
        // but we have to manually resubscribe like reconnect once it is connected
353
        reconnectImpl();
1✔
354
        writer.setReconnectMode(false);
1✔
355
    }
1✔
356

357
    void reconnect() throws InterruptedException {
358
        if (!tryingToConnect.get()) {
1✔
359
            try {
360
                tryingToConnect.set(true);
1✔
361
                reconnectImpl();
1✔
362
            }
363
            finally {
364
                tryingToConnect.set(false);
1✔
365
            }
366
        }
367
    }
1✔
368

369
    // Reconnect can only be called when the connection is disconnected
370
    void reconnectImpl() throws InterruptedException {
371
        if (isClosed()) {
1✔
372
            return;
1✔
373
        }
374

375
        if (options.getMaxReconnect() == 0) {
1✔
376
            this.close();
1✔
377
            return;
1✔
378
        }
379

380
        writer.setReconnectMode(true);
1✔
381

382
        if (!isConnected() && !isClosed() && !this.isClosing()) {
1✔
383
            boolean keepGoing = true;
1✔
384
            int totalRounds = 0;
1✔
385
            NatsUri first = null;
1✔
386
            NatsUri cur;
387
            while (keepGoing && (cur = serverPool.nextServer()) != null) {
1✔
388
                if (first == null) {
1✔
389
                    first = cur;
1✔
390
                }
391
                else if (first.equals(cur)) {
1✔
392
                    // went around the pool an entire time
393
                    invokeReconnectDelayHandler(++totalRounds);
1✔
394
                }
395

396
                // let server list provider resolve hostnames
397
                // then loop through resolved
398
                List<NatsUri> resolvedList = resolveHost(cur);
1✔
399
                for (NatsUri resolved : resolvedList) {
1✔
400
                    if (isClosed()) {
1✔
401
                        keepGoing = false;
1✔
402
                        break;
1✔
403
                    }
404
                    connectError.set(""); // reset on each loop
1✔
405
                    if (isDisconnectingOrClosed() || this.isClosing()) {
1✔
UNCOV
406
                        keepGoing = false;
×
UNCOV
407
                        break;
×
408
                    }
409
                    updateStatus(Status.RECONNECTING);
1✔
410

411
                    timeTraceLogger.trace("reconnecting to server %s", cur);
1✔
412
                    tryToConnect(cur, resolved, NatsSystemClock.nanoTime());
1✔
413

414
                    if (isConnected()) {
1✔
415
                        serverPool.connectSucceeded(cur);
1✔
416
                        statistics.incrementReconnects();
1✔
417
                        keepGoing = false;
1✔
418
                        break;
1✔
419
                    }
420

421
                    serverPool.connectFailed(cur);
1✔
422
                    String err = connectError.get();
1✔
423
                    if (this.isAuthenticationError(err)) {
1✔
424
                        if (err.equals(this.serverAuthErrors.get(resolved))) {
1✔
425
                            keepGoing = false; // double auth error
1✔
426
                            break;
1✔
427
                        }
428
                        serverAuthErrors.put(resolved, err);
1✔
429
                    }
430
                }
1✔
431
            }
1✔
432
        } // end-main-loop
433

434
        if (!isConnected()) {
1✔
435
            this.close();
1✔
436
            return;
1✔
437
        }
438

439
        this.subscribers.forEach((sid, sub) -> {
1✔
440
            if (sub.getDispatcher() == null && !sub.isDraining()) {
1✔
441
                sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true);
1✔
442
            }
443
        });
1✔
444

445
        this.dispatchers.forEach((nuid, d) -> {
1✔
446
            if (!d.isDraining()) {
1✔
447
                d.resendSubscriptions();
1✔
448
            }
449
        });
1✔
450

451
        try {
452
            this.flush(this.options.getConnectionTimeout());
1✔
453
        } catch (Exception exp) {
1✔
454
            this.processException(exp);
1✔
455
        }
1✔
456

457
        processConnectionEvent(Events.RESUBSCRIBED);
1✔
458

459
        // When the flush returns we are done sending internal messages,
460
        // so we can switch to the non-reconnect queue
461
        this.writer.setReconnectMode(false);
1✔
462
    }
1✔
463

464
    long timeCheck(long endNanos, String message) throws TimeoutException {
465
        long remaining = endNanos - NatsSystemClock.nanoTime();
1✔
466
        if (trace) {
1✔
467
            traceTimeCheck(message, remaining);
1✔
468
        }
469
        if (remaining < 0) {
1✔
470
            throw new TimeoutException("connection timed out");
1✔
471
        }
472
        return remaining;
1✔
473
    }
474

475
    void traceTimeCheck(String message, long remaining) {
476
        if (remaining < 0) {
1✔
477
            if (remaining > -1_000_000) { // less than -1 ms
1✔
478
                timeTraceLogger.trace(message + String.format(", %d (ns) beyond timeout", -remaining));
1✔
479
            }
480
            else if (remaining > -1_000_000_000) { // less than -1 second
1✔
481
                long ms = -remaining / 1_000_000;
1✔
482
                timeTraceLogger.trace(message + String.format(", %d (ms) beyond timeout", ms));
1✔
483
            }
1✔
484
            else {
485
                double seconds = ((double)-remaining) / 1_000_000_000.0;
1✔
486
                timeTraceLogger.trace(message + String.format(", %.3f (s) beyond timeout", seconds));
1✔
487
            }
1✔
488
        }
489
        else if (remaining < 1_000_000) {
1✔
490
            timeTraceLogger.trace(message + String.format(", %d (ns) remaining", remaining));
1✔
491
        }
492
        else if (remaining < 1_000_000_000) {
1✔
493
            long ms = remaining / 1_000_000;
1✔
494
            timeTraceLogger.trace(message + String.format(", %d (ms) remaining", ms));
1✔
495
        }
1✔
496
        else {
497
            double seconds = ((double) remaining) / 1_000_000_000.0;
1✔
498
            timeTraceLogger.trace(message + String.format(", %.3f (s) remaining", seconds));
1✔
499
        }
500
    }
1✔
501

502
    // is called from reconnect and connect
503
    // will wait for any previous attempt to complete, using the reader.stop and
504
    // writer.stop
505
    void tryToConnect(NatsUri cur, NatsUri resolved, long now) {
506
        currentServer = null;
1✔
507

508
        try {
509
            Duration connectTimeout = options.getConnectionTimeout();
1✔
510
            boolean trace = options.isTraceConnection();
1✔
511
            long end = now + connectTimeout.toNanos();
1✔
512
            timeCheck(end, "starting connection attempt");
1✔
513

514
            statusLock.lock();
1✔
515
            try {
516
                if (this.connecting) {
1✔
517
                    return;
×
518
                }
519
                this.connecting = true;
1✔
520
                statusChanged.signalAll();
1✔
521
            } finally {
522
                statusLock.unlock();
1✔
523
            }
524

525
            // Create a new future for the dataport, the reader/writer will use this
526
            // to wait for the connect/failure.
527
            this.dataPortFuture = new CompletableFuture<>();
1✔
528

529
            // Make sure the reader and writer are stopped
530
            long timeoutNanos = timeCheck(end, "waiting for reader");
1✔
531
            if (reader.isRunning()) {
1✔
532
                this.reader.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
×
533
            }
534
            timeoutNanos = timeCheck(end, "waiting for writer");
1✔
535
            if (writer.isRunning()) {
1✔
536
                this.writer.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
×
537
            }
538

539
            timeCheck(end, "cleaning pong queue");
1✔
540
            cleanUpPongQueue();
1✔
541

542
            timeoutNanos = timeCheck(end, "connecting data port");
1✔
543
            DataPort newDataPort = this.options.buildDataPort();
1✔
544
            newDataPort.connect(resolved.toString(), this, timeoutNanos);
1✔
545

546
            // Notify any threads waiting on the sockets
547
            this.dataPort = newDataPort;
1✔
548
            this.dataPortFuture.complete(this.dataPort);
1✔
549

550
            // Wait for the INFO message manually
551
            // all other traffic will use the reader and writer
552
            // TLS First, don't read info until after upgrade
553
            Callable<Object> connectTask = () -> {
1✔
554
                if (!options.isTlsFirst()) {
1✔
555
                    readInitialInfo();
1✔
556
                    checkVersionRequirements();
1✔
557
                }
558
                long start = NatsSystemClock.nanoTime();
1✔
559
                upgradeToSecureIfNeeded(resolved);
1✔
560
                if (trace && options.isTLSRequired()) {
1✔
561
                    // If the time appears too long it might be related to
562
                    // https://github.com/nats-io/nats.java#linux-platform-note
563
                    timeTraceLogger.trace("TLS upgrade took: %.3f (s)",
×
564
                            ((double) (NatsSystemClock.nanoTime() - start)) / NANOS_PER_SECOND);
×
565
                }
566
                if (options.isTlsFirst()) {
1✔
567
                    readInitialInfo();
1✔
568
                    checkVersionRequirements();
1✔
569
                }
570
                return null;
1✔
571
            };
572

573
            timeoutNanos = timeCheck(end, "reading info, version and upgrading to secure if necessary");
1✔
574
            Future<Object> future = this.connectExecutor.submit(connectTask);
1✔
575
            try {
576
                future.get(timeoutNanos, TimeUnit.NANOSECONDS);
1✔
577
            } finally {
578
                future.cancel(true);
1✔
579
            }
580

581
            // start the reader and writer after we secured the connection, if necessary
582
            timeCheck(end, "starting reader");
1✔
583
            this.reader.start(this.dataPortFuture);
1✔
584
            timeCheck(end, "starting writer");
1✔
585
            this.writer.start(this.dataPortFuture);
1✔
586

587
            timeCheck(end, "sending connect message");
1✔
588
            this.sendConnect(resolved);
1✔
589

590
            timeoutNanos = timeCheck(end, "sending initial ping");
1✔
591
            Future<Boolean> pongFuture = sendPing();
1✔
592

593
            if (pongFuture != null) {
1✔
594
                pongFuture.get(timeoutNanos, TimeUnit.NANOSECONDS);
1✔
595
            }
596

597
            if (pingTask == null) {
1✔
598
                timeCheck(end, "starting ping and cleanup timers");
1✔
599
                long pingMillis = this.options.getPingInterval().toMillis();
1✔
600

601
                if (pingMillis > 0) {
1✔
602
                    pingTask = new ScheduledTask(scheduledExecutor, pingMillis, () -> {
1✔
603
                        if (isConnected() && !isClosing()) {
1✔
604
                            try {
605
                                softPing(); // The timer always uses the standard queue
1✔
606
                            }
607
                            catch (Exception e) {
1✔
608
                                // it's running in a thread, there is no point throwing here
609
                            }
1✔
610
                        }
611
                    });
1✔
612
                }
613

614
                long cleanMillis = this.options.getRequestCleanupInterval().toMillis();
1✔
615

616
                if (cleanMillis > 0) {
1✔
617
                    cleanupTask = new ScheduledTask(scheduledExecutor, cleanMillis, () -> cleanResponses(false));
1✔
618
                }
619
            }
620

621
            // Set connected status
622
            timeCheck(end, "updating status to connected");
1✔
623
            statusLock.lock();
1✔
624
            try {
625
                this.connecting = false;
1✔
626

627
                if (this.exceptionDuringConnectChange != null) {
1✔
628
                    throw this.exceptionDuringConnectChange;
×
629
                }
630

631
                this.currentServer = cur;
1✔
632
                this.serverAuthErrors.clear(); // reset on successful connection
1✔
633
                updateStatus(Status.CONNECTED); // will signal status change, we also signal in finally
1✔
634
            } finally {
635
                statusLock.unlock();
1✔
636
            }
637
            timeTraceLogger.trace("status updated");
1✔
638
        } catch (Exception exp) {
1✔
639
            processException(exp);
1✔
640
            try {
641
                // allow force reconnect since this is pretty exceptional,
642
                // a connection failure while trying to connect
643
                this.closeSocket(false, true);
1✔
644
            } catch (InterruptedException e) {
×
645
                processException(e);
×
646
                Thread.currentThread().interrupt();
×
647
            }
1✔
648
        } finally {
649
            statusLock.lock();
1✔
650
            try {
651
                this.connecting = false;
1✔
652
                statusChanged.signalAll();
1✔
653
            } finally {
654
                statusLock.unlock();
1✔
655
            }
656
        }
657
    }
1✔
658

659
    void checkVersionRequirements() throws IOException {
660
        Options opts = getOptions();
1✔
661
        ServerInfo info = getInfo();
1✔
662

663
        if (opts.isNoEcho() && info.getProtocolVersion() < 1) {
1✔
664
            throw new IOException("Server does not support no echo.");
1✔
665
        }
666
    }
1✔
667

668
    void upgradeToSecureIfNeeded(NatsUri nuri) throws IOException {
669
        // When already communicating over "https" websocket, do NOT try to upgrade to secure.
670
        if (!nuri.isWebsocket()) {
1✔
671
            if (options.isTlsFirst()) {
1✔
672
                dataPort.upgradeToSecure();
1✔
673
            }
674
            else {
675
                // server    | client options      | result
676
                // --------- | ------------------- | --------
677
                // required  | not isTLSRequired() | mismatch
678
                // available | not isTLSRequired() | ok
679
                // neither   | not isTLSRequired() | ok
680
                // required  | isTLSRequired()     | ok
681
                // available | isTLSRequired()     | ok
682
                // neither   | isTLSRequired()     | mismatch
683
                ServerInfo serverInfo = getInfo();
1✔
684
                if (options.isTLSRequired()) {
1✔
685
                    if (!serverInfo.isTLSRequired() && !serverInfo.isTLSAvailable()) {
1✔
686
                        throw new IOException("SSL connection wanted by client.");
1✔
687
                    }
688
                    dataPort.upgradeToSecure();
1✔
689
                }
690
                else if (serverInfo.isTLSRequired()) {
1✔
691
                    throw new IOException("SSL required by server.");
1✔
692
                }
693
            }
694
        }
695
    }
1✔
696
    // Called from reader/writer thread
697
    void handleCommunicationIssue(Exception io) {
698
        // If we are connecting or disconnecting, note exception and leave
699
        statusLock.lock();
1✔
700
        try {
701
            if (this.connecting || this.disconnecting || this.status == Status.CLOSED || this.isDraining()) {
1✔
702
                this.exceptionDuringConnectChange = io;
1✔
703
                return;
1✔
704
            }
705
        } finally {
706
            statusLock.unlock();
1✔
707
        }
708

709
        processException(io);
1✔
710

711
        // Spawn a thread so we don't have timing issues with
712
        // waiting on read/write threads
713
        executor.submit(() -> {
1✔
714
            if (!tryingToConnect.get()) {
1✔
715
                try {
716
                    tryingToConnect.set(true);
1✔
717

718
                    // any issue that brings us here is pretty serious
719
                    // so we are comfortable forcing the close
720
                    this.closeSocket(true, true);
1✔
721
                } catch (InterruptedException e) {
×
722
                    processException(e);
×
723
                    Thread.currentThread().interrupt();
×
724
                } finally {
725
                    tryingToConnect.set(false);
1✔
726
                }
727
            }
728
        });
1✔
729
    }
1✔
730

731
    // Close socket is called when another connect attempt is possible
732
    // Close is called when the connection should shut down, period
733
    void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws InterruptedException {
734
        // Ensure we close the socket exclusively within one thread.
735
        closeSocketLock.lock();
1✔
736
        try {
737
            boolean wasConnected;
738
            statusLock.lock();
1✔
739
            try {
740
                if (isDisconnectingOrClosed()) {
1✔
741
                    waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
742
                    return;
1✔
743
                }
744
                this.disconnecting = true;
1✔
745
                this.exceptionDuringConnectChange = null;
1✔
746
                wasConnected = (this.status == Status.CONNECTED);
1✔
747
                statusChanged.signalAll();
1✔
748
            } finally {
749
                statusLock.unlock();
1✔
750
            }
751

752
            closeSocketImpl(forceClose);
1✔
753

754
            statusLock.lock();
1✔
755
            try {
756
                updateStatus(Status.DISCONNECTED);
1✔
757
                this.exceptionDuringConnectChange = null; // Ignore IOExceptions during closeSocketImpl()
1✔
758
                this.disconnecting = false;
1✔
759
                statusChanged.signalAll();
1✔
760
            } finally {
761
                statusLock.unlock();
1✔
762
            }
763

764
            if (isClosing()) { // isClosing() means we are in the close method or were asked to be
1✔
765
                close();
1✔
766
            } else if (wasConnected && tryReconnectIfConnected) {
1✔
767
                reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior
1✔
768
            }
769
        } finally {
770
            closeSocketLock.unlock();
1✔
771
        }
772
    }
1✔
773

774
    // Close socket is called when another connect attempt is possible
775
    // Close is called when the connection should shut down, period
776
    /**
777
     * {@inheritDoc}
778
     */
779
    @Override
780
    public void close() throws InterruptedException {
781
        this.close(true, false);
1✔
782
    }
1✔
783

784
    void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedException {
785
        statusLock.lock();
1✔
786
        try {
787
            if (checkDrainStatus && this.isDraining()) {
1✔
788
                waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
789
                return;
1✔
790
            }
791

792
            this.closing = true;// We were asked to close, so do it
1✔
793
            if (isDisconnectingOrClosed()) {
1✔
794
                waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
795
                return;
1✔
796
            } else {
797
                this.disconnecting = true;
1✔
798
                this.exceptionDuringConnectChange = null;
1✔
799
                statusChanged.signalAll();
1✔
800
            }
801
        } finally {
802
            statusLock.unlock();
1✔
803
        }
804

805
        // Stop the reconnect wait timer after we stop the writer/reader (only if we are
806
        // really closing, not on errors)
807
        if (this.reconnectWaiter != null) {
1✔
808
            this.reconnectWaiter.cancel(true);
1✔
809
        }
810

811
        closeSocketImpl(forceClose);
1✔
812

813
        this.dispatchers.forEach((nuid, d) -> d.stop(false));
1✔
814

815
        this.subscribers.forEach((sid, sub) -> sub.invalidate());
1✔
816

817
        this.dispatchers.clear();
1✔
818
        this.subscribers.clear();
1✔
819

820
        if (pingTask != null) {
1✔
821
            pingTask.shutdown();
1✔
822
            pingTask = null;
1✔
823
        }
824
        if (cleanupTask != null) {
1✔
825
            cleanupTask.shutdown();
1✔
826
            cleanupTask = null;
1✔
827
        }
828

829
        cleanResponses(true);
1✔
830

831
        cleanUpPongQueue();
1✔
832

833
        statusLock.lock();
1✔
834
        try {
835
            updateStatus(Status.CLOSED); // will signal, we also signal when we stop disconnecting
1✔
836

837
            /*
838
             * if (exceptionDuringConnectChange != null) {
839
             * processException(exceptionDuringConnectChange); exceptionDuringConnectChange
840
             * = null; }
841
             */
842
        } finally {
843
            statusLock.unlock();
1✔
844
        }
845

846
        // Stop the error handling and connect executors
847
        callbackRunner.shutdown();
1✔
848
        try {
849
            callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
1✔
850
        } finally {
851
            callbackRunner.shutdownNow();
1✔
852
        }
853

854
        // There's no need to wait for running tasks since we're told to close
855
        connectExecutor.shutdownNow();
1✔
856

857
        // The callbackRunner and connectExecutor always come from a factory
858
        // so we always shut them down.
859
        // The executor and scheduledExecutor come from a factory iff
860
        // the user does not supply them, so we shut them down in that case.
861
        if (options.executorIsInternal()) {
1✔
862
            executor.shutdownNow();
1✔
863
        }
864
        if (options.scheduledExecutorIsInternal()) {
1✔
865
            scheduledExecutor.shutdownNow();
1✔
866
        }
867

868
        statusLock.lock();
1✔
869
        try {
870
            this.disconnecting = false;
1✔
871
            statusChanged.signalAll();
1✔
872
        } finally {
873
            statusLock.unlock();
1✔
874
        }
875
    }
1✔
876

877
    boolean callbackRunnerIsShutdown() {
878
        return callbackRunner == null || callbackRunner.isShutdown();
1✔
879
    }
880

881
    boolean executorIsShutdown() {
882
        return executor == null || executor.isShutdown();
1✔
883
    }
884

885
    boolean connectExecutorIsShutdown() {
886
        return connectExecutor == null || connectExecutor.isShutdown();
1✔
887
    }
888

889
    boolean scheduledExecutorIsShutdown() {
890
        return scheduledExecutor == null || scheduledExecutor.isShutdown();
1✔
891
    }
892

893
    // Should only be called from closeSocket or close
894
    void closeSocketImpl(boolean forceClose) {
895
        this.currentServer = null;
1✔
896

897
        // Signal both to stop.
898
        final Future<Boolean> readStop = this.reader.stop();
1✔
899
        final Future<Boolean> writeStop = this.writer.stop();
1✔
900

901
        // Now wait until they both stop before closing the socket.
902
        try {
903
            readStop.get(1, TimeUnit.SECONDS);
1✔
904
        } catch (Exception ex) {
1✔
905
            //
906
        }
1✔
907
        try {
908
            writeStop.get(1, TimeUnit.SECONDS);
1✔
909
        } catch (Exception ex) {
×
910
            //
911
        }
1✔
912

913
        // Close and reset the current data port and future
914
        if (dataPortFuture != null) {
1✔
915
            dataPortFuture.cancel(true);
1✔
916
            dataPortFuture = null;
1✔
917
        }
918

919
        // Close the current socket and cancel anyone waiting for it
920
        try {
921
            if (dataPort != null) {
1✔
922
                if (forceClose) {
1✔
923
                    dataPort.forceClose();
1✔
924
                }
925
                else {
926
                    dataPort.close();
1✔
927
                }
928
            }
929

930
        } catch (IOException ex) {
×
931
            processException(ex);
×
932
        }
1✔
933
        cleanUpPongQueue();
1✔
934

935
        try {
936
            this.reader.stop().get(10, TimeUnit.SECONDS);
1✔
937
        } catch (Exception ex) {
×
938
            processException(ex);
×
939
        }
1✔
940
        try {
941
            this.writer.stop().get(10, TimeUnit.SECONDS);
1✔
942
        } catch (Exception ex) {
×
943
            processException(ex);
×
944
        }
1✔
945
    }
1✔
946

947
    void cleanUpPongQueue() {
948
        Future<Boolean> b;
949
        while ((b = pongQueue.poll()) != null) {
1✔
950
            b.cancel(true);
1✔
951
        }
952
    }
1✔
953

954
    /**
955
     * {@inheritDoc}
956
     */
957
    @Override
958
    public void publish(String subject, byte[] body) {
959
        publishInternal(subject, null, null, body, true, false);
1✔
960
    }
1✔
961

962
    /**
963
     * {@inheritDoc}
964
     */
965
    @Override
966
    public void publish(String subject, Headers headers, byte[] body) {
967
        publishInternal(subject, null, headers, body, true, false);
1✔
968
    }
1✔
969

970
    /**
971
     * {@inheritDoc}
972
     */
973
    @Override
974
    public void publish(String subject, String replyTo, byte[] body) {
975
        publishInternal(subject, replyTo, null, body, true, false);
1✔
976
    }
1✔
977

978
    /**
979
     * {@inheritDoc}
980
     */
981
    @Override
982
    public void publish(String subject, String replyTo, Headers headers, byte[] body) {
983
        publishInternal(subject, replyTo, headers, body, true, false);
1✔
984
    }
1✔
985

986
    /**
987
     * {@inheritDoc}
988
     */
989
    @Override
990
    public void publish(Message message) {
991
        validateNotNull(message, "Message");
1✔
992
        publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false, false);
1✔
993
    }
1✔
994

995
    void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
996
        checkPayloadSize(data);
1✔
997
        NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
998
        if (npm.hasHeaders && !serverInfo.get().isHeadersSupported()) {
1✔
999
            throw new IllegalArgumentException("Headers are not supported by the server, version: " + serverInfo.get().getVersion());
1✔
1000
        }
1001

1002
        if (isClosed()) {
1✔
1003
            throw new IllegalStateException("Connection is Closed");
1✔
1004
        } else if (blockPublishForDrain.get()) {
1✔
1005
            throw new IllegalStateException("Connection is Draining"); // Ok to publish while waiting on subs
×
1006
        }
1007

1008
        if ((status == Status.RECONNECTING || status == Status.DISCONNECTED)
1✔
1009
                && !this.writer.canQueueDuringReconnect(npm)) {
1✔
1010
            throw new IllegalStateException(
1✔
1011
                    "Unable to queue any more messages during reconnect, max buffer is " + options.getReconnectBufferSize());
1✔
1012
        }
1013

1014
        queueOutgoing(npm);
1✔
1015
    }
1✔
1016

1017
    private void checkPayloadSize(byte[] body) {
1018
        if (options.clientSideLimitChecks() && body != null && body.length > this.getMaxPayload() && this.getMaxPayload() > 0) {
1✔
1019
            throw new IllegalArgumentException(
1✔
1020
                "Message payload size exceed server configuration " + body.length + " vs " + this.getMaxPayload());
1✔
1021
        }
1022
    }
1✔
1023
    /**
1024
     * {@inheritDoc}
1025
     */
1026
    @Override
1027
    public Subscription subscribe(String subject) {
1028
        validateSubject(subject, true);
1✔
1029
        return createSubscription(subject, null, null, null);
1✔
1030
    }
1031

1032
    /**
1033
     * {@inheritDoc}
1034
     */
1035
    @Override
1036
    public Subscription subscribe(String subject, String queueName) {
1037
        validateSubject(subject, true);
1✔
1038
        validateQueueName(queueName, true);
1✔
1039
        return createSubscription(subject, queueName, null, null);
1✔
1040
    }
1041

1042
    void invalidate(NatsSubscription sub) {
1043
        remove(sub);
1✔
1044
        sub.invalidate();
1✔
1045
    }
1✔
1046

1047
    void remove(NatsSubscription sub) {
1048
        CharSequence sid = sub.getSID();
1✔
1049
        subscribers.remove(sid);
1✔
1050

1051
        if (sub.getNatsDispatcher() != null) {
1✔
1052
            sub.getNatsDispatcher().remove(sub);
1✔
1053
        }
1054
    }
1✔
1055

1056
    void unsubscribe(NatsSubscription sub, int after) {
1057
        if (isClosed()) { // last chance, usually sub will catch this
1✔
1058
            throw new IllegalStateException("Connection is Closed");
×
1059
        }
1060

1061
        if (after <= 0) {
1✔
1062
            this.invalidate(sub); // Will clean it up
1✔
1063
        } else {
1064
            sub.setUnsubLimit(after);
1✔
1065

1066
            if (sub.reachedUnsubLimit()) {
1✔
1067
                sub.invalidate();
1✔
1068
            }
1069
        }
1070

1071
        if (!isConnected()) {
1✔
1072
            return; // We will set up sub on reconnect or ignore
1✔
1073
        }
1074

1075
        sendUnsub(sub, after);
1✔
1076
    }
1✔
1077

1078
    void sendUnsub(NatsSubscription sub, int after) {
1079
        ByteArrayBuilder bab =
1✔
1080
            new ByteArrayBuilder().append(UNSUB_SP_BYTES).append(sub.getSID());
1✔
1081
        if (after > 0) {
1✔
1082
            bab.append(SP).append(after);
1✔
1083
        }
1084
        queueOutgoing(new ProtocolMessage(bab, true));
1✔
1085
    }
1✔
1086

1087
    // Assumes the null/empty checks were handled elsewhere
1088
    NatsSubscription createSubscription(String subject, String queueName, NatsDispatcher dispatcher, NatsSubscriptionFactory factory) {
1089
        if (isClosed()) {
1✔
1090
            throw new IllegalStateException("Connection is Closed");
1✔
1091
        } else if (isDraining() && (dispatcher == null || dispatcher != this.inboxDispatcher.get())) {
1✔
1092
            throw new IllegalStateException("Connection is Draining");
1✔
1093
        }
1094

1095
        NatsSubscription sub;
1096
        String sid = getNextSid();
1✔
1097

1098
        if (factory == null) {
1✔
1099
            sub = new NatsSubscription(sid, subject, queueName, this, dispatcher);
1✔
1100
        }
1101
        else {
1102
            sub = factory.createNatsSubscription(sid, subject, queueName, this, dispatcher);
1✔
1103
        }
1104
        subscribers.put(sid, sub);
1✔
1105

1106
        sendSubscriptionMessage(sid, subject, queueName, false);
1✔
1107
        return sub;
1✔
1108
    }
1109

1110
    String getNextSid() {
1111
        return Long.toString(nextSid.getAndIncrement());
1✔
1112
    }
1113

1114
    String reSubscribe(NatsSubscription sub, String subject, String queueName) {
1115
        String sid = getNextSid();
1✔
1116
        sendSubscriptionMessage(sid, subject, queueName, false);
1✔
1117
        subscribers.put(sid, sub);
1✔
1118
        return sid;
1✔
1119
    }
1120

1121
    void sendSubscriptionMessage(String sid, String subject, String queueName, boolean treatAsInternal) {
1122
        if (!isConnected()) {
1✔
1123
            return; // We will set up sub on reconnect or ignore
1✔
1124
        }
1125

1126
        ByteArrayBuilder bab = new ByteArrayBuilder(UTF_8).append(SUB_SP_BYTES).append(subject);
1✔
1127
        if (queueName != null) {
1✔
1128
            bab.append(SP).append(queueName);
1✔
1129
        }
1130
        bab.append(SP).append(sid);
1✔
1131

1132
        // setting this to filter on stop.
1133
        // if it's an "internal" message, it won't be filtered
1134
        // if it's a normal message, the subscription will already be registered
1135
        // and therefore will be re-subscribed after a stop anyway
1136
        ProtocolMessage subMsg = new ProtocolMessage(bab, true);
1✔
1137
        if (treatAsInternal) {
1✔
1138
            queueInternalOutgoing(subMsg);
1✔
1139
        } else {
1140
            queueOutgoing(subMsg);
1✔
1141
        }
1142
    }
1✔
1143

1144
    /**
1145
     * {@inheritDoc}
1146
     */
1147
    @Override
1148
    public String createInbox() {
1149
        return options.getInboxPrefix() + nuid.next();
1✔
1150
    }
1151

1152
    int getRespInboxLength() {
1153
        return options.getInboxPrefix().length() + 22 + 1; // 22 for nuid, 1 for .
1✔
1154
    }
1155

1156
    String createResponseInbox(String inbox) {
1157
        // Substring gets rid of the * [trailing]
1158
        return inbox.substring(0, getRespInboxLength()) + nuid.next();
1✔
1159
    }
1160

1161
    // If the inbox is long enough, pull out the end part, otherwise, just use the
1162
    // full thing
1163
    String getResponseToken(String responseInbox) {
1164
        int len = getRespInboxLength();
1✔
1165
        if (responseInbox.length() <= len) {
1✔
1166
            return responseInbox;
1✔
1167
        }
1168
        return responseInbox.substring(len);
1✔
1169
    }
1170

1171
    void cleanResponses(boolean closing) {
1172
        ArrayList<String> toRemove = new ArrayList<>();
1✔
1173
        boolean wasInterrupted = false;
1✔
1174

1175
        for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesAwaiting.entrySet()) {
1✔
1176
            boolean remove = false;
1✔
1177
            NatsRequestCompletableFuture future = entry.getValue();
1✔
1178
            if (future.hasExceededTimeout()) {
1✔
1179
                remove = true;
1✔
1180
                future.cancelTimedOut();
1✔
1181
            }
1182
            else if (closing) {
1✔
1183
                remove = true;
1✔
1184
                future.cancelClosing();
1✔
1185
            }
1186
            else if (future.isDone()) {
1✔
1187
                // done should have already been removed, not sure if
1188
                // this even needs checking, but it won't hurt
1189
                remove = true;
1✔
1190
                try {
1191
                    future.get();
1✔
1192
                }
1193
                catch (InterruptedException e) {
×
1194
                    Thread.currentThread().interrupt();
×
1195
                    // we might have collected some entries already, but were interrupted
1196
                    // break out so we finish as quick as possible
1197
                    // cleanResponses will be called again anyway
1198
                    wasInterrupted = true;
×
1199
                    break;
×
1200
                }
1201
                catch (Throwable ignore) {}
1✔
1202
            }
1203

1204
            if (remove) {
1✔
1205
                toRemove.add(entry.getKey());
1✔
1206
                statistics.decrementOutstandingRequests();
1✔
1207
            }
1208
        }
1✔
1209

1210
        for (String key : toRemove) {
1✔
1211
            responsesAwaiting.remove(key);
1✔
1212
        }
1✔
1213

1214
        if (advancedTracking && !wasInterrupted) {
1✔
1215
            toRemove.clear(); // just reuse this
1✔
1216
            for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesRespondedTo.entrySet()) {
1✔
1217
                NatsRequestCompletableFuture future = entry.getValue();
1✔
1218
                if (future.hasExceededTimeout()) {
1✔
1219
                    toRemove.add(entry.getKey());
1✔
1220
                    future.cancelTimedOut();
1✔
1221
                }
1222
            }
1✔
1223

1224
            for (String token : toRemove) {
1✔
1225
                responsesRespondedTo.remove(token);
1✔
1226
            }
1✔
1227
        }
1228
    }
1✔
1229

1230
    /**
1231
     * {@inheritDoc}
1232
     */
1233
    @Override
1234
    public Message request(String subject, byte[] body, Duration timeout) throws InterruptedException {
1235
        return requestInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1236
    }
1237

1238
    /**
1239
     * {@inheritDoc}
1240
     */
1241
    @Override
1242
    public Message request(String subject, Headers headers, byte[] body, Duration timeout) throws InterruptedException {
1243
        return requestInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1244
    }
1245

1246
    /**
1247
     * {@inheritDoc}
1248
     */
1249
    @Override
1250
    public Message request(Message message, Duration timeout) throws InterruptedException {
1251
        validateNotNull(message, "Message");
1✔
1252
        return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
1✔
1253
    }
1254

1255
    Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout,
1256
                            CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) throws InterruptedException {
1257
        CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1258
        try {
1259
            return incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
1✔
1260
        } catch (TimeoutException | ExecutionException | CancellationException e) {
1✔
1261
            return null;
1✔
1262
        }
1263
    }
1264

1265
    /**
1266
     * {@inheritDoc}
1267
     */
1268
    @Override
1269
    public CompletableFuture<Message> request(String subject, byte[] body) {
1270
        return requestFutureInternal(subject, null, body, null, cancelAction, true, forceFlushOnRequest);
1✔
1271
    }
1272

1273
    /**
1274
     * {@inheritDoc}
1275
     */
1276
    @Override
1277
    public CompletableFuture<Message> request(String subject, Headers headers, byte[] body) {
1278
        return requestFutureInternal(subject, headers, body, null, cancelAction, true, forceFlushOnRequest);
1✔
1279
    }
1280

1281
    /**
1282
     * {@inheritDoc}
1283
     */
1284
    @Override
1285
    public CompletableFuture<Message> requestWithTimeout(String subject, byte[] body, Duration timeout) {
1286
        return requestFutureInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1287
    }
1288

1289
    /**
1290
     * {@inheritDoc}
1291
     */
1292
    @Override
1293
    public CompletableFuture<Message> requestWithTimeout(String subject, Headers headers, byte[] body, Duration timeout) {
1294
        return requestFutureInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1295
    }
1296

1297
    /**
1298
     * {@inheritDoc}
1299
     */
1300
    @Override
1301
    public CompletableFuture<Message> requestWithTimeout(Message message, Duration timeout) {
1302
        validateNotNull(message, "Message");
1✔
1303
        return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
1✔
1304
    }
1305

1306
    /**
1307
     * {@inheritDoc}
1308
     */
1309
    @Override
1310
    public CompletableFuture<Message> request(Message message) {
1311
        validateNotNull(message, "Message");
1✔
1312
        return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false, forceFlushOnRequest);
1✔
1313
    }
1314

1315
    CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout,
1316
                                                     CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
1317
        checkPayloadSize(data);
1✔
1318

1319
        if (isClosed()) {
1✔
1320
            throw new IllegalStateException("Connection is Closed");
1✔
1321
        } else if (isDraining()) {
1✔
1322
            throw new IllegalStateException("Connection is Draining");
1✔
1323
        }
1324

1325
        if (inboxDispatcher.get() == null) {
1✔
1326
            inboxDispatcherLock.lock();
1✔
1327
            try {
1328
                if (inboxDispatcher.get() == null) {
1✔
1329
                    NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply);
1✔
1330

1331
                    // Ensure the dispatcher is started before publishing messages
1332
                    String id = this.nuid.next();
1✔
1333
                    this.dispatchers.put(id, d);
1✔
1334
                    d.start(id);
1✔
1335
                    d.subscribe(this.mainInbox);
1✔
1336
                    inboxDispatcher.set(d);
1✔
1337
                }
1338
            } finally {
1339
                inboxDispatcherLock.unlock();
1✔
1340
            }
1341
        }
1342

1343
        boolean oldStyle = options.isOldRequestStyle();
1✔
1344
        String responseInbox = oldStyle ? createInbox() : createResponseInbox(this.mainInbox);
1✔
1345
        String responseToken = getResponseToken(responseInbox);
1✔
1346
        NatsRequestCompletableFuture future =
1✔
1347
            new NatsRequestCompletableFuture(cancelAction,
1348
                futureTimeout == null ? options.getRequestCleanupInterval() : futureTimeout, options.useTimeoutException());
1✔
1349

1350
        if (!oldStyle) {
1✔
1351
            responsesAwaiting.put(responseToken, future);
1✔
1352
        }
1353
        statistics.incrementOutstandingRequests();
1✔
1354

1355
        if (oldStyle) {
1✔
1356
            NatsDispatcher dispatcher = this.inboxDispatcher.get();
1✔
1357
            NatsSubscription sub = dispatcher.subscribeReturningSubscription(responseInbox);
1✔
1358
            dispatcher.unsubscribe(responseInbox, 1);
1✔
1359
            // Unsubscribe when future is cancelled:
1360
            future.whenComplete((msg, exception) -> {
1✔
1361
                if (exception instanceof CancellationException) {
1✔
1362
                    dispatcher.unsubscribe(responseInbox);
×
1363
                }
1364
            });
1✔
1365
            responsesAwaiting.put(sub.getSID(), future);
1✔
1366
        }
1367

1368
        publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1369
        statistics.incrementRequestsSent();
1✔
1370

1371
        return future;
1✔
1372
    }
1373

1374
    void deliverReply(Message msg) {
1375
        boolean oldStyle = options.isOldRequestStyle();
1✔
1376
        String subject = msg.getSubject();
1✔
1377
        String token = getResponseToken(subject);
1✔
1378
        String key = oldStyle ? msg.getSID() : token;
1✔
1379
        NatsRequestCompletableFuture f = responsesAwaiting.remove(key);
1✔
1380
        if (f != null) {
1✔
1381
            if (advancedTracking) {
1✔
1382
                responsesRespondedTo.put(key, f);
1✔
1383
            }
1384
            statistics.decrementOutstandingRequests();
1✔
1385
            if (msg.isStatusMessage() && msg.getStatus().getCode() == 503) {
1✔
1386
                switch (f.getCancelAction()) {
1✔
1387
                    case COMPLETE:
1388
                        f.complete(msg);
1✔
1389
                        break;
1✔
1390
                    case REPORT:
1391
                        f.completeExceptionally(new JetStreamStatusException(msg.getStatus()));
1✔
1392
                        break;
1✔
1393
                    case CANCEL:
1394
                    default:
1395
                        f.cancel(true);
1✔
1396
                }
1397
            }
1398
            else {
1399
                f.complete(msg);
1✔
1400
            }
1401
            statistics.incrementRepliesReceived();
1✔
1402
        }
1403
        else if (!oldStyle && !subject.startsWith(mainInbox)) {
1✔
1404
            if (advancedTracking) {
1✔
1405
                if (responsesRespondedTo.get(key) != null) {
1✔
1406
                    statistics.incrementDuplicateRepliesReceived();
1✔
1407
                } else {
1408
                    statistics.incrementOrphanRepliesReceived();
1✔
1409
                }
1410
            }
1411
        }
1412
    }
1✔
1413

1414
    public Dispatcher createDispatcher() {
1415
        return createDispatcher(null);
1✔
1416
    }
1417

1418
    public Dispatcher createDispatcher(MessageHandler handler) {
1419
        if (isClosed()) {
1✔
1420
            throw new IllegalStateException("Connection is Closed");
1✔
1421
        } else if (isDraining()) {
1✔
1422
            throw new IllegalStateException("Connection is Draining");
1✔
1423
        }
1424

1425
        NatsDispatcher dispatcher = dispatcherFactory.createDispatcher(this, handler);
1✔
1426
        String id = this.nuid.next();
1✔
1427
        this.dispatchers.put(id, dispatcher);
1✔
1428
        dispatcher.start(id);
1✔
1429
        return dispatcher;
1✔
1430
    }
1431

1432
    public void closeDispatcher(Dispatcher d) {
1433
        if (isClosed()) {
1✔
1434
            throw new IllegalStateException("Connection is Closed");
1✔
1435
        } else if (!(d instanceof NatsDispatcher)) {
1✔
1436
            throw new IllegalArgumentException("Connection can only manage its own dispatchers");
×
1437
        }
1438

1439
        NatsDispatcher nd = ((NatsDispatcher) d);
1✔
1440

1441
        if (nd.isDraining()) {
1✔
1442
            return; // No op while draining
1✔
1443
        }
1444

1445
        if (!this.dispatchers.containsKey(nd.getId())) {
1✔
1446
            throw new IllegalArgumentException("Dispatcher is already closed.");
1✔
1447
        }
1448

1449
        cleanupDispatcher(nd);
1✔
1450
    }
1✔
1451

1452
    void cleanupDispatcher(NatsDispatcher nd) {
1453
        nd.stop(true);
1✔
1454
        this.dispatchers.remove(nd.getId());
1✔
1455
    }
1✔
1456

1457
    Map<String, Dispatcher> getDispatchers() {
1458
        return Collections.unmodifiableMap(dispatchers);
1✔
1459
    }
1460

1461
    public void addConnectionListener(ConnectionListener connectionListener) {
1462
        connectionListeners.add(connectionListener);
1✔
1463
    }
1✔
1464

1465
    public void removeConnectionListener(ConnectionListener connectionListener) {
1466
        connectionListeners.remove(connectionListener);
1✔
1467
    }
1✔
1468

1469
    public void flush(Duration timeout) throws TimeoutException, InterruptedException {
1470

1471
        Instant start = Instant.now();
1✔
1472
        waitForConnectOrClose(timeout);
1✔
1473

1474
        if (isClosed()) {
1✔
1475
            throw new TimeoutException("Attempted to flush while closed");
1✔
1476
        }
1477

1478
        if (timeout == null) {
1✔
1479
            timeout = Duration.ZERO;
1✔
1480
        }
1481

1482
        Instant now = Instant.now();
1✔
1483
        Duration waitTime = Duration.between(start, now);
1✔
1484

1485
        if (!timeout.equals(Duration.ZERO) && waitTime.compareTo(timeout) >= 0) {
1✔
1486
            throw new TimeoutException("Timeout out waiting for connection before flush.");
1✔
1487
        }
1488

1489
        try {
1490
            Future<Boolean> waitForIt = sendPing();
1✔
1491

1492
            if (waitForIt == null) { // error in the send ping code
1✔
1493
                return;
×
1494
            }
1495

1496
            long nanos = timeout.toNanos();
1✔
1497

1498
            if (nanos > 0) {
1✔
1499

1500
                nanos -= waitTime.toNanos();
1✔
1501

1502
                if (nanos <= 0) {
1✔
1503
                    nanos = 1; // let the future timeout if it isn't resolved
×
1504
                }
1505

1506
                waitForIt.get(nanos, TimeUnit.NANOSECONDS);
1✔
1507
            } else {
1508
                waitForIt.get();
1✔
1509
            }
1510

1511
            this.statistics.incrementFlushCounter();
1✔
1512
        } catch (ExecutionException | CancellationException e) {
1✔
1513
            throw new TimeoutException(e.toString());
1✔
1514
        }
1✔
1515
    }
1✔
1516

1517
    void sendConnect(NatsUri nuri) throws IOException {
1518
        try {
1519
            ServerInfo info = this.serverInfo.get();
1✔
1520
            // This is changed - we used to use info.isAuthRequired(), but are changing it to
1521
            // better match older versions of the server. It may change again in the future.
1522
            CharBuffer connectOptions = options.buildProtocolConnectOptionsString(
1✔
1523
                nuri.toString(), true, info.getNonce());
1✔
1524
            ByteArrayBuilder bab =
1✔
1525
                new ByteArrayBuilder(OP_CONNECT_SP_LEN + connectOptions.limit(), UTF_8)
1✔
1526
                    .append(CONNECT_SP_BYTES).append(connectOptions);
1✔
1527
            queueInternalOutgoing(new ProtocolMessage(bab, false));
1✔
1528
        } catch (Exception exp) {
1✔
1529
            throw new IOException("Error sending connect string", exp);
1✔
1530
        }
1✔
1531
    }
1✔
1532

1533
    CompletableFuture<Boolean> sendPing() {
1534
        return this.sendPing(true);
1✔
1535
    }
1536

1537
    CompletableFuture<Boolean> softPing() {
1538
        return this.sendPing(false);
1✔
1539
    }
1540

1541
    /**
1542
     * {@inheritDoc}
1543
     */
1544
    @Override
1545
    public Duration RTT() throws IOException {
1546
        if (!isConnectedOrConnecting()) {
1✔
1547
            throw new IOException("Must be connected to do RTT.");
1✔
1548
        }
1549

1550
        long timeout = options.getConnectionTimeout().toMillis();
1✔
1551
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
1✔
1552
        pongQueue.add(pongFuture);
1✔
1553
        try {
1554
            long time = NatsSystemClock.nanoTime();
1✔
1555
            writer.queueInternalMessage(new ProtocolMessage(PING_PROTO));
1✔
1556
            pongFuture.get(timeout, TimeUnit.MILLISECONDS);
1✔
1557
            return Duration.ofNanos(NatsSystemClock.nanoTime() - time);
1✔
1558
        }
1559
        catch (ExecutionException e) {
×
1560
            throw new IOException(e.getCause());
×
1561
        }
1562
        catch (TimeoutException e) {
×
1563
            throw new IOException(e);
×
1564
        }
1565
        catch (InterruptedException e) {
×
1566
            Thread.currentThread().interrupt();
×
1567
            throw new IOException(e);
×
1568
        }
1569
    }
1570

1571
    // Send a ping request and push a pong future on the queue.
1572
    // futures are completed in order, keep this one if a thread wants to wait
1573
    // for a specific pong. Note, if no pong returns the wait will not return
1574
    // without setting a timeout.
1575
    CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
1576
        if (!isConnectedOrConnecting()) {
1✔
1577
            CompletableFuture<Boolean> retVal = new CompletableFuture<>();
1✔
1578
            retVal.complete(Boolean.FALSE);
1✔
1579
            return retVal;
1✔
1580
        }
1581

1582
        if (!treatAsInternal && !this.needPing.get()) {
1✔
1583
            CompletableFuture<Boolean> retVal = new CompletableFuture<>();
1✔
1584
            retVal.complete(Boolean.TRUE);
1✔
1585
            this.needPing.set(true);
1✔
1586
            return retVal;
1✔
1587
        }
1588

1589
        int max = options.getMaxPingsOut();
1✔
1590
        if (max > 0 && pongQueue.size() + 1 > max) {
1✔
1591
            handleCommunicationIssue(new IllegalStateException("Max outgoing Ping count exceeded."));
1✔
1592
            return null;
1✔
1593
        }
1594

1595
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
1✔
1596
        pongQueue.add(pongFuture);
1✔
1597

1598
        if (treatAsInternal) {
1✔
1599
            queueInternalOutgoing(new ProtocolMessage(PING_PROTO));
1✔
1600
        } else {
1601
            queueOutgoing(new ProtocolMessage(PING_PROTO));
1✔
1602
        }
1603

1604
        this.needPing.set(true);
1✔
1605
        this.statistics.incrementPingCount();
1✔
1606
        return pongFuture;
1✔
1607
    }
1608

1609
    // This is a minor speed / memory enhancement.
1610
    // We can't reuse the same instance of any NatsMessage b/c of the "NatsMessage next" state
1611
    // But it is safe to share the data bytes and the size since those fields are just being read
1612
    // This constructor "ProtocolMessage(ProtocolMessage pm)" shares the data and size
1613
    // reducing allocation of data for something that is often created and used
1614
    // These static instances are the once that are used for copying, sendPing and sendPong
1615
    private static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES);
1✔
1616
    private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES);
1✔
1617

1618
    void sendPong() {
1619
        queueInternalOutgoing(new ProtocolMessage(PONG_PROTO));
1✔
1620
    }
1✔
1621

1622
    // Called by the reader
1623
    void handlePong() {
1624
        CompletableFuture<Boolean> pongFuture = pongQueue.pollFirst();
1✔
1625
        if (pongFuture != null) {
1✔
1626
            pongFuture.complete(Boolean.TRUE);
1✔
1627
        }
1628
    }
1✔
1629

1630
    void readInitialInfo() throws IOException {
1631
        byte[] readBuffer = new byte[options.getBufferSize()];
1✔
1632
        ByteBuffer protocolBuffer = ByteBuffer.allocate(options.getBufferSize());
1✔
1633
        boolean gotCRLF = false;
1✔
1634
        boolean gotCR = false;
1✔
1635

1636
        while (!gotCRLF) {
1✔
1637
            int read = this.dataPort.read(readBuffer, 0, readBuffer.length);
1✔
1638

1639
            if (read < 0) {
1✔
1640
                break;
1✔
1641
            }
1642

1643
            int i = 0;
1✔
1644
            while (i < read) {
1✔
1645
                byte b = readBuffer[i++];
1✔
1646

1647
                if (gotCR) {
1✔
1648
                    if (b != LF) {
1✔
1649
                        throw new IOException("Missed LF after CR waiting for INFO.");
1✔
1650
                    } else if (i < read) {
1✔
1651
                        throw new IOException("Read past initial info message.");
1✔
1652
                    }
1653

1654
                    gotCRLF = true;
1✔
1655
                    break;
1✔
1656
                }
1657

1658
                if (b == CR) {
1✔
1659
                    gotCR = true;
1✔
1660
                } else {
1661
                    if (!protocolBuffer.hasRemaining()) {
1✔
1662
                        protocolBuffer = enlargeBuffer(protocolBuffer); // just double it
1✔
1663
                    }
1664
                    protocolBuffer.put(b);
1✔
1665
                }
1666
            }
1✔
1667
        }
1✔
1668

1669
        if (!gotCRLF) {
1✔
1670
            throw new IOException("Failed to read initial info message.");
1✔
1671
        }
1672

1673
        protocolBuffer.flip();
1✔
1674

1675
        String infoJson = UTF_8.decode(protocolBuffer).toString();
1✔
1676
        infoJson = infoJson.trim();
1✔
1677
        String[] msg = infoJson.split("\\s");
1✔
1678
        String op = msg[0].toUpperCase();
1✔
1679

1680
        if (!OP_INFO.equals(op)) {
1✔
1681
            throw new IOException("Received non-info initial message.");
1✔
1682
        }
1683

1684
        handleInfo(infoJson);
1✔
1685
    }
1✔
1686

1687
    void handleInfo(String infoJson) {
1688
        ServerInfo serverInfo = new ServerInfo(infoJson);
1✔
1689
        this.serverInfo.set(serverInfo);
1✔
1690

1691
        List<String> urls = this.serverInfo.get().getConnectURLs();
1✔
1692
        if (urls != null && !urls.isEmpty()) {
1✔
1693
            if (serverPool.acceptDiscoveredUrls(urls)) {
1✔
1694
                processConnectionEvent(Events.DISCOVERED_SERVERS);
1✔
1695
            }
1696
        }
1697

1698
        if (serverInfo.isLameDuckMode()) {
1✔
1699
            processConnectionEvent(Events.LAME_DUCK);
1✔
1700
        }
1701
    }
1✔
1702

1703
    void queueOutgoing(NatsMessage msg) {
1704
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
1✔
1705
            throw new IllegalArgumentException("Control line is too long");
1✔
1706
        }
1707
        if (!writer.queue(msg)) {
1✔
1708
            options.getErrorListener().messageDiscarded(this, msg);
1✔
1709
        }
1710
    }
1✔
1711

1712
    void queueInternalOutgoing(NatsMessage msg) {
1713
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
1✔
1714
            throw new IllegalArgumentException("Control line is too long");
×
1715
        }
1716
        this.writer.queueInternalMessage(msg);
1✔
1717
    }
1✔
1718

1719
    void deliverMessage(NatsMessage msg) {
1720
        this.needPing.set(false);
1✔
1721
        this.statistics.incrementInMsgs();
1✔
1722
        this.statistics.incrementInBytes(msg.getSizeInBytes());
1✔
1723

1724
        NatsSubscription sub = subscribers.get(msg.getSID());
1✔
1725

1726
        if (sub != null) {
1✔
1727
            msg.setSubscription(sub);
1✔
1728

1729
            NatsDispatcher d = sub.getNatsDispatcher();
1✔
1730
            NatsConsumer c = (d == null) ? sub : d;
1✔
1731
            MessageQueue q = ((d == null) ? sub.getMessageQueue() : d.getMessageQueue());
1✔
1732

1733
            if (c.hasReachedPendingLimits()) {
1✔
1734
                // Drop the message and count it
1735
                this.statistics.incrementDroppedCount();
1✔
1736
                c.incrementDroppedCount();
1✔
1737

1738
                // Notify the first time
1739
                if (!c.isMarkedSlow()) {
1✔
1740
                    c.markSlow();
1✔
1741
                    processSlowConsumer(c);
1✔
1742
                }
1743
            } else if (q != null) {
1✔
1744
                c.markNotSlow();
1✔
1745

1746
                // beforeQueueProcessor returns true if the message is allowed to be queued
1747
                if (sub.getBeforeQueueProcessor().apply(msg)) {
1✔
1748
                    q.push(msg);
1✔
1749
                }
1750
            }
1751

1752
        }
1753
//        else {
1754
//            // Drop messages we don't have a subscriber for (could be extras on an
1755
//            // auto-unsub for example)
1756
//        }
1757
    }
1✔
1758

1759
    void processOK() {
1760
        this.statistics.incrementOkCount();
1✔
1761
    }
1✔
1762

1763
    void processSlowConsumer(Consumer consumer) {
1764
        if (!this.callbackRunner.isShutdown()) {
1✔
1765
            try {
1766
                this.callbackRunner.execute(() -> {
1✔
1767
                    try {
1768
                        options.getErrorListener().slowConsumerDetected(this, consumer);
1✔
1769
                    } catch (Exception ex) {
1✔
1770
                        this.statistics.incrementExceptionCount();
1✔
1771
                    }
1✔
1772
                });
1✔
1773
            } catch (RejectedExecutionException re) {
×
1774
                // Timing with shutdown, let it go
1775
            }
1✔
1776
        }
1777
    }
1✔
1778

1779
    void processException(Exception exp) {
1780
        this.statistics.incrementExceptionCount();
1✔
1781

1782
        if (!this.callbackRunner.isShutdown()) {
1✔
1783
            try {
1784
                this.callbackRunner.execute(() -> {
1✔
1785
                    try {
1786
                        options.getErrorListener().exceptionOccurred(this, exp);
1✔
1787
                    } catch (Exception ex) {
1✔
1788
                        this.statistics.incrementExceptionCount();
1✔
1789
                    }
1✔
1790
                });
1✔
UNCOV
1791
            } catch (RejectedExecutionException re) {
×
1792
                // Timing with shutdown, let it go
1793
            }
1✔
1794
        }
1795
    }
1✔
1796

1797
    void processError(String errorText) {
1798
        this.statistics.incrementErrCount();
1✔
1799

1800
        this.lastError.set(errorText);
1✔
1801
        this.connectError.set(errorText); // even if this isn't during connection, save it just in case
1✔
1802

1803
        // If we are connected && we get an authentication error, save it
1804
        if (this.isConnected() && this.isAuthenticationError(errorText) && currentServer != null) {
1✔
1805
            this.serverAuthErrors.put(currentServer, errorText);
1✔
1806
        }
1807

1808
        if (!this.callbackRunner.isShutdown()) {
1✔
1809
            try {
1810
                this.callbackRunner.execute(() -> {
1✔
1811
                    try {
1812
                        options.getErrorListener().errorOccurred(this, errorText);
1✔
1813
                    } catch (Exception ex) {
1✔
1814
                        this.statistics.incrementExceptionCount();
1✔
1815
                    }
1✔
1816
                });
1✔
1817
            } catch (RejectedExecutionException re) {
×
1818
                // Timing with shutdown, let it go
1819
            }
1✔
1820
        }
1821
    }
1✔
1822

1823
    interface ErrorListenerCaller {
1824
        void call(Connection conn, ErrorListener el);
1825
    }
1826

1827
    void executeCallback(ErrorListenerCaller elc) {
1828
        if (!this.callbackRunner.isShutdown()) {
1✔
1829
            try {
1830
                this.callbackRunner.execute(() -> elc.call(this, options.getErrorListener()));
1✔
1831
            } catch (RejectedExecutionException re) {
×
1832
                // Timing with shutdown, let it go
1833
            }
1✔
1834
        }
1835
    }
1✔
1836

1837
    void processConnectionEvent(Events type) {
1838
        if (!this.callbackRunner.isShutdown()) {
1✔
1839
            try {
1840
                for (ConnectionListener listener : connectionListeners) {
1✔
1841
                    this.callbackRunner.execute(() -> {
1✔
1842
                        try {
1843
                            listener.connectionEvent(this, type);
1✔
1844
                        } catch (Exception ex) {
1✔
1845
                            this.statistics.incrementExceptionCount();
1✔
1846
                        }
1✔
1847
                    });
1✔
1848
                }
1✔
1849
            } catch (RejectedExecutionException re) {
×
1850
                // Timing with shutdown, let it go
1851
            }
1✔
1852
        }
1853
    }
1✔
1854

1855
    /**
1856
     * {@inheritDoc}
1857
     */
1858
    @Override
1859
    public ServerInfo getServerInfo() {
1860
        return getInfo();
1✔
1861
    }
1862

1863
    /**
1864
     * {@inheritDoc}
1865
     */
1866
    @Override
1867
    public InetAddress getClientInetAddress() {
1868
        try {
1869
            return InetAddress.getByName(getInfo().getClientIp());
1✔
1870
        }
1871
        catch (Exception e) {
×
1872
            return null;
×
1873
        }
1874
    }
1875

1876
    ServerInfo getInfo() {
1877
        return this.serverInfo.get();
1✔
1878
    }
1879

1880
    /**
1881
     * {@inheritDoc}
1882
     */
1883
    @Override
1884
    public Options getOptions() {
1885
        return this.options;
1✔
1886
    }
1887

1888
    /**
1889
     * {@inheritDoc}
1890
     */
1891
    @Override
1892
    public Statistics getStatistics() {
1893
        return this.statistics;
1✔
1894
    }
1895

1896
    StatisticsCollector getNatsStatistics() {
1897
        return this.statistics;
1✔
1898
    }
1899

1900
    DataPort getDataPort() {
1901
        return this.dataPort;
1✔
1902
    }
1903

1904
    // Used for testing
1905
    int getConsumerCount() {
1906
        return this.subscribers.size() + this.dispatchers.size();
1✔
1907
    }
1908

1909
    public long getMaxPayload() {
1910
        ServerInfo info = this.serverInfo.get();
1✔
1911

1912
        if (info == null) {
1✔
1913
            return -1;
×
1914
        }
1915

1916
        return info.getMaxPayload();
1✔
1917
    }
1918

1919
    /**
1920
     * Return the list of known server urls, including additional servers discovered
1921
     * after a connection has been established.
1922
     * @return this connection's list of known server URLs
1923
     */
1924
    public Collection<String> getServers() {
1925
        return serverPool.getServerList();
1✔
1926
    }
1927

1928
    protected List<NatsUri> resolveHost(NatsUri nuri) {
1929
        // 1. If the nuri host is not already an ip address or the nuri is not for websocket or fast fallback is disabled,
1930
        //    let the pool resolve it.
1931
        List<NatsUri> results = new ArrayList<>();
1✔
1932
        if (!nuri.hostIsIpAddress() && !nuri.isWebsocket() && !options.isEnableFastFallback()) {
1✔
1933
            List<String> ips = serverPool.resolveHostToIps(nuri.getHost());
1✔
1934
            if (ips != null) {
1✔
1935
                for (String ip : ips) {
1✔
1936
                    try {
1937
                        results.add(nuri.reHost(ip));
1✔
1938
                    }
1939
                    catch (URISyntaxException u) {
1✔
1940
                        // ??? should never happen
1941
                    }
1✔
1942
                }
1✔
1943
            }
1944
        }
1945

1946
        // 2. If there were no results,
1947
        //    - host was already an ip address or
1948
        //    - host was for websocket or
1949
        //    - fast fallback is enabled
1950
        //    - pool returned nothing or
1951
        //    - resolving failed...
1952
        //    so the list just becomes the original host.
1953
        if (results.isEmpty()) {
1✔
1954
            results.add(nuri);
1✔
1955
        }
1956
        return results;
1✔
1957
    }
1958

1959
    /**
1960
     * {@inheritDoc}
1961
     */
1962
    @Override
1963
    public String getConnectedUrl() {
1964
        return currentServer == null ? null : currentServer.toString();
1✔
1965
    }
1966

1967
    /**
1968
     * {@inheritDoc}
1969
     */
1970
    @Override
1971
    public Status getStatus() {
1972
        return this.status;
1✔
1973
    }
1974

1975
    /**
1976
     * {@inheritDoc}
1977
     */
1978
    @Override
1979
    public String getLastError() {
1980
        return this.lastError.get();
1✔
1981
    }
1982

1983
    /**
1984
     * {@inheritDoc}
1985
     */
1986
    @Override
1987
    public void clearLastError() {
1988
        this.lastError.set("");
1✔
1989
    }
1✔
1990

1991
    ExecutorService getExecutor() {
1992
        return executor;
1✔
1993
    }
1994

1995
    ScheduledExecutorService getScheduledExecutor() {
1996
        return scheduledExecutor;
1✔
1997
    }
1998

1999
    void updateStatus(Status newStatus) {
2000
        Status oldStatus = this.status;
1✔
2001

2002
        statusLock.lock();
1✔
2003
        try {
2004
            if (oldStatus == Status.CLOSED || newStatus == oldStatus) {
1✔
2005
                return;
1✔
2006
            }
2007
            this.status = newStatus;
1✔
2008
        } finally {
2009
            statusChanged.signalAll();
1✔
2010
            statusLock.unlock();
1✔
2011
        }
2012

2013
        if (this.status == Status.DISCONNECTED) {
1✔
2014
            processConnectionEvent(Events.DISCONNECTED);
1✔
2015
        } else if (this.status == Status.CLOSED) {
1✔
2016
            processConnectionEvent(Events.CLOSED);
1✔
2017
        } else if (oldStatus == Status.RECONNECTING && this.status == Status.CONNECTED) {
1✔
2018
            processConnectionEvent(Events.RECONNECTED);
1✔
2019
        } else if (this.status == Status.CONNECTED) {
1✔
2020
            processConnectionEvent(Events.CONNECTED);
1✔
2021
        }
2022
    }
1✔
2023

2024
    boolean isClosing() {
2025
        return this.closing;
1✔
2026
    }
2027

2028
    boolean isClosed() {
2029
        return this.status == Status.CLOSED;
1✔
2030
    }
2031

2032
    boolean isConnected() {
2033
        return this.status == Status.CONNECTED;
1✔
2034
    }
2035

2036
    boolean isDisconnected() {
2037
        return this.status == Status.DISCONNECTED;
×
2038
    }
2039

2040
    boolean isConnectedOrConnecting() {
2041
        statusLock.lock();
1✔
2042
        try {
2043
            return this.status == Status.CONNECTED || this.connecting;
1✔
2044
        } finally {
2045
            statusLock.unlock();
1✔
2046
        }
2047
    }
2048

2049
    boolean isDisconnectingOrClosed() {
2050
        statusLock.lock();
1✔
2051
        try {
2052
            return this.status == Status.CLOSED || this.disconnecting;
1✔
2053
        } finally {
2054
            statusLock.unlock();
1✔
2055
        }
2056
    }
2057

2058
    boolean isDisconnecting() {
2059
        statusLock.lock();
1✔
2060
        try {
2061
            return this.disconnecting;
1✔
2062
        } finally {
2063
            statusLock.unlock();
1✔
2064
        }
2065
    }
2066

2067
    void waitForDisconnectOrClose(Duration timeout) throws InterruptedException {
2068
        waitFor(timeout, (Void) -> this.isDisconnecting() && !this.isClosed() );
1✔
2069
    }
1✔
2070

2071
    void waitForConnectOrClose(Duration timeout) throws InterruptedException {
2072
        waitFor(timeout, (Void) -> !this.isConnected() && !this.isClosed());
1✔
2073
    }
1✔
2074

2075
    void waitFor(Duration timeout, Predicate<Void> test) throws InterruptedException {
2076
        statusLock.lock();
1✔
2077
        try {
2078
            long currentWaitNanos = (timeout != null) ? timeout.toNanos() : -1;
1✔
2079
            long start = NatsSystemClock.nanoTime();
1✔
2080
            while (currentWaitNanos >= 0 && test.test(null)) {
1✔
2081
                if (currentWaitNanos > 0) {
1✔
2082
                    statusChanged.await(currentWaitNanos, TimeUnit.NANOSECONDS);
1✔
2083
                    long now = NatsSystemClock.nanoTime();
1✔
2084
                    currentWaitNanos = currentWaitNanos - (now - start);
1✔
2085
                    start = now;
1✔
2086

2087
                    if (currentWaitNanos <= 0) {
1✔
2088
                        break;
1✔
2089
                    }
2090
                } else {
1✔
2091
                    statusChanged.await();
×
2092
                }
2093
            }
2094
        } finally {
2095
            statusLock.unlock();
1✔
2096
        }
2097
    }
1✔
2098

2099
    void invokeReconnectDelayHandler(long totalRounds) {
2100
        long currentWaitNanos = 0;
1✔
2101

2102
        ReconnectDelayHandler handler = options.getReconnectDelayHandler();
1✔
2103
        if (handler == null) {
1✔
2104
            Duration dur = options.getReconnectWait();
1✔
2105
            if (dur != null) {
1✔
2106
                currentWaitNanos = dur.toNanos();
1✔
2107
                dur = serverPool.hasSecureServer() ? options.getReconnectJitterTls() : options.getReconnectJitter();
1✔
2108
                if (dur != null) {
1✔
2109
                    currentWaitNanos += ThreadLocalRandom.current().nextLong(dur.toNanos());
1✔
2110
                }
2111
            }
2112
        }
1✔
2113
        else {
2114
            Duration waitTime = handler.getWaitTime(totalRounds);
1✔
2115
            if (waitTime != null) {
1✔
2116
                currentWaitNanos = waitTime.toNanos();
1✔
2117
            }
2118
        }
2119

2120
        this.reconnectWaiter = new CompletableFuture<>();
1✔
2121

2122
        long start = NatsSystemClock.nanoTime();
1✔
2123
        while (currentWaitNanos > 0 && !isDisconnectingOrClosed() && !isConnected() && !this.reconnectWaiter.isDone()) {
1✔
2124
            try {
2125
                this.reconnectWaiter.get(currentWaitNanos, TimeUnit.NANOSECONDS);
×
2126
            } catch (Exception exp) {
1✔
2127
                // ignore, try to loop again
2128
            }
×
2129
            long now = NatsSystemClock.nanoTime();
1✔
2130
            currentWaitNanos = currentWaitNanos - (now - start);
1✔
2131
            start = now;
1✔
2132
        }
1✔
2133

2134
        this.reconnectWaiter.complete(Boolean.TRUE);
1✔
2135
    }
1✔
2136

2137
    ByteBuffer enlargeBuffer(ByteBuffer buffer) {
2138
        int current = buffer.capacity();
1✔
2139
        int newSize = current * 2;
1✔
2140
        ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
1✔
2141
        buffer.flip();
1✔
2142
        newBuffer.put(buffer);
1✔
2143
        return newBuffer;
1✔
2144
    }
2145

2146
    // For testing
2147
    NatsConnectionReader getReader() {
2148
        return this.reader;
1✔
2149
    }
2150

2151
    // For testing
2152
    NatsConnectionWriter getWriter() {
2153
        return this.writer;
1✔
2154
    }
2155

2156
    // For testing
2157
    Future<DataPort> getDataPortFuture() {
2158
        return this.dataPortFuture;
1✔
2159
    }
2160

2161
    boolean isDraining() {
2162
        return this.draining.get() != null;
1✔
2163
    }
2164

2165
    boolean isDrained() {
2166
        CompletableFuture<Boolean> tracker = this.draining.get();
1✔
2167

2168
        try {
2169
            if (tracker != null && tracker.getNow(false)) {
1✔
2170
                return true;
1✔
2171
            }
2172
        } catch (Exception e) {
×
2173
            // These indicate the tracker was cancelled/timed out
2174
        }
1✔
2175

2176
        return false;
1✔
2177
    }
2178

2179
    /**
2180
     * {@inheritDoc}
2181
     */
2182
    @Override
2183
    public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutException, InterruptedException {
2184

2185
        if (isClosing() || isClosed()) {
1✔
2186
            throw new IllegalStateException("A connection can't be drained during close.");
1✔
2187
        }
2188

2189
        this.statusLock.lock();
1✔
2190
        try {
2191
            if (isDraining()) {
1✔
2192
                return this.draining.get();
1✔
2193
            }
2194
            this.draining.set(new CompletableFuture<>());
1✔
2195
        } finally {
2196
            this.statusLock.unlock();
1✔
2197
        }
2198

2199
        final CompletableFuture<Boolean> tracker = this.draining.get();
1✔
2200
        Instant start = Instant.now();
1✔
2201

2202
        // Don't include subscribers with dispatchers
2203
        HashSet<NatsSubscription> pureSubscribers = new HashSet<>(this.subscribers.values());
1✔
2204
        pureSubscribers.removeIf((s) -> s.getDispatcher() != null);
1✔
2205

2206
        final HashSet<NatsConsumer> consumers = new HashSet<>();
1✔
2207
        consumers.addAll(pureSubscribers);
1✔
2208
        consumers.addAll(this.dispatchers.values());
1✔
2209

2210
        NatsDispatcher inboxer = this.inboxDispatcher.get();
1✔
2211

2212
        if (inboxer != null) {
1✔
2213
            consumers.add(inboxer);
1✔
2214
        }
2215

2216
        // Stop the consumers NOW so that when this method returns they are blocked
2217
        consumers.forEach((cons) -> {
1✔
2218
            cons.markDraining(tracker);
1✔
2219
            cons.sendUnsubForDrain();
1✔
2220
        });
1✔
2221

2222
        try {
2223
            this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know
1✔
2224
        } catch (Exception e) {
1✔
2225
            this.close(false, false);
1✔
2226
            throw e;
1✔
2227
        }
1✔
2228

2229
        consumers.forEach(NatsConsumer::markUnsubedForDrain);
1✔
2230

2231
        // Wait for the timeout or the pending count to go to 0
2232
        executor.submit(() -> {
1✔
2233
            try {
2234
                long stop = (timeout == null || timeout.equals(Duration.ZERO))
1✔
2235
                    ? Long.MAX_VALUE
2236
                    : NatsSystemClock.nanoTime() + timeout.toNanos();
1✔
2237
                while (NatsSystemClock.nanoTime() < stop && !Thread.interrupted())
1✔
2238
                {
2239
                    consumers.removeIf(NatsConsumer::isDrained);
1✔
2240
                    if (consumers.isEmpty()) {
1✔
2241
                        break;
1✔
2242
                    }
2243
                    //noinspection BusyWait
2244
                    Thread.sleep(1); // Sleep 1 milli
1✔
2245
                }
2246

2247
                // Stop publishing
2248
                this.blockPublishForDrain.set(true);
1✔
2249

2250
                // One last flush
2251
                if (timeout == null || timeout.equals(Duration.ZERO)) {
1✔
2252
                    this.flush(Duration.ZERO);
1✔
2253
                } else {
2254
                    Instant now = Instant.now();
1✔
2255
                    Duration passed = Duration.between(start, now);
1✔
2256
                    Duration newTimeout = timeout.minus(passed);
1✔
2257
                    if (newTimeout.toNanos() > 0) {
1✔
2258
                        this.flush(newTimeout);
1✔
2259
                    }
2260
                }
2261
                this.close(false, false); // close the connection after the last flush
1✔
2262
                tracker.complete(consumers.isEmpty());
1✔
2263
            } catch (TimeoutException e) {
×
2264
                this.processException(e);
×
2265
            } catch (InterruptedException e) {
×
2266
                this.processException(e);
×
2267
                Thread.currentThread().interrupt();
×
2268
            } finally {
2269
                try {
2270
                    this.close(false, false);// close the connection after the last flush
1✔
2271
                } catch (InterruptedException e) {
×
2272
                    processException(e);
×
2273
                    Thread.currentThread().interrupt();
×
2274
                }
1✔
2275
                tracker.complete(false);
1✔
2276
            }
2277
        });
1✔
2278

2279
        return tracker;
1✔
2280
    }
2281

2282
    boolean isAuthenticationError(String err) {
2283
        if (err == null) {
1✔
2284
            return false;
1✔
2285
        }
2286
        err = err.toLowerCase();
1✔
2287
        return err.startsWith("user authentication")
1✔
2288
            || err.contains("authorization violation")
1✔
2289
            || err.startsWith("account authentication expired");
1✔
2290
    }
2291

2292
    /**
2293
     * {@inheritDoc}
2294
     */
2295
    @Override
2296
    public void flushBuffer() throws IOException {
2297
        if (!isConnected()) {
1✔
2298
            throw new IllegalStateException("Connection is not active.");
1✔
2299
        }
2300
        writer.flushBuffer();
1✔
2301
    }
1✔
2302

2303
    /**
2304
     * {@inheritDoc}
2305
     */
2306
    @Override
2307
    public StreamContext getStreamContext(String streamName) throws IOException, JetStreamApiException {
2308
        Validator.validateStreamName(streamName, true);
1✔
2309
        ensureNotClosing();
1✔
2310
        return new NatsStreamContext(streamName, null, this, null);
1✔
2311
    }
2312

2313
    /**
2314
     * {@inheritDoc}
2315
     */
2316
    @Override
2317
    public StreamContext getStreamContext(String streamName, JetStreamOptions options) throws IOException, JetStreamApiException {
2318
        Validator.validateStreamName(streamName, true);
1✔
2319
        ensureNotClosing();
1✔
2320
        return new NatsStreamContext(streamName, null, this, options);
1✔
2321
    }
2322

2323
    /**
2324
     * {@inheritDoc}
2325
     */
2326
    @Override
2327
    public ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException {
2328
        return getStreamContext(streamName).getConsumerContext(consumerName);
1✔
2329
    }
2330

2331
    /**
2332
     * {@inheritDoc}
2333
     */
2334
    @Override
2335
    public ConsumerContext getConsumerContext(String streamName, String consumerName, JetStreamOptions options) throws IOException, JetStreamApiException {
2336
        return getStreamContext(streamName, options).getConsumerContext(consumerName);
1✔
2337
    }
2338

2339
    /**
2340
     * {@inheritDoc}
2341
     */
2342
    @Override
2343
    public JetStream jetStream() throws IOException {
2344
        ensureNotClosing();
1✔
2345
        return new NatsJetStream(this, null);
1✔
2346
    }
2347

2348
    /**
2349
     * {@inheritDoc}
2350
     */
2351
    @Override
2352
    public JetStream jetStream(JetStreamOptions options) throws IOException {
2353
        ensureNotClosing();
1✔
2354
        return new NatsJetStream(this, options);
1✔
2355
    }
2356

2357
    /**
2358
     * {@inheritDoc}
2359
     */
2360
    @Override
2361
    public JetStreamManagement jetStreamManagement() throws IOException {
2362
        ensureNotClosing();
1✔
2363
        return new NatsJetStreamManagement(this, null);
1✔
2364
    }
2365

2366
    /**
2367
     * {@inheritDoc}
2368
     */
2369
    @Override
2370
    public JetStreamManagement jetStreamManagement(JetStreamOptions options) throws IOException {
2371
        ensureNotClosing();
1✔
2372
        return new NatsJetStreamManagement(this, options);
1✔
2373
    }
2374

2375
    /**
2376
     * {@inheritDoc}
2377
     */
2378
    @Override
2379
    public KeyValue keyValue(String bucketName) throws IOException {
2380
        Validator.validateBucketName(bucketName, true);
1✔
2381
        ensureNotClosing();
1✔
2382
        return new NatsKeyValue(this, bucketName, null);
1✔
2383
    }
2384

2385
    /**
2386
     * {@inheritDoc}
2387
     */
2388
    @Override
2389
    public KeyValue keyValue(String bucketName, KeyValueOptions options) throws IOException {
2390
        Validator.validateBucketName(bucketName, true);
1✔
2391
        ensureNotClosing();
1✔
2392
        return new NatsKeyValue(this, bucketName, options);
1✔
2393
    }
2394

2395
    /**
2396
     * {@inheritDoc}
2397
     */
2398
    @Override
2399
    public KeyValueManagement keyValueManagement() throws IOException {
2400
        ensureNotClosing();
1✔
2401
        return new NatsKeyValueManagement(this, null);
1✔
2402
    }
2403

2404
    /**
2405
     * {@inheritDoc}
2406
     */
2407
    @Override
2408
    public KeyValueManagement keyValueManagement(KeyValueOptions options) throws IOException {
2409
        ensureNotClosing();
1✔
2410
        return new NatsKeyValueManagement(this, options);
1✔
2411
    }
2412

2413
    /**
2414
     * {@inheritDoc}
2415
     */
2416
    @Override
2417
    public ObjectStore objectStore(String bucketName) throws IOException {
2418
        Validator.validateBucketName(bucketName, true);
1✔
2419
        ensureNotClosing();
1✔
2420
        return new NatsObjectStore(this, bucketName, null);
1✔
2421
    }
2422

2423
    /**
2424
     * {@inheritDoc}
2425
     */
2426
    @Override
2427
    public ObjectStore objectStore(String bucketName, ObjectStoreOptions options) throws IOException {
2428
        Validator.validateBucketName(bucketName, true);
1✔
2429
        ensureNotClosing();
1✔
2430
        return new NatsObjectStore(this, bucketName, options);
1✔
2431
    }
2432

2433
    /**
2434
     * {@inheritDoc}
2435
     */
2436
    @Override
2437
    public ObjectStoreManagement objectStoreManagement() throws IOException {
2438
        ensureNotClosing();
1✔
2439
        return new NatsObjectStoreManagement(this, null);
1✔
2440
    }
2441

2442
    /**
2443
     * {@inheritDoc}
2444
     */
2445
    @Override
2446
    public ObjectStoreManagement objectStoreManagement(ObjectStoreOptions options) throws IOException {
2447
        ensureNotClosing();
1✔
2448
        return new NatsObjectStoreManagement(this, options);
1✔
2449
    }
2450

2451
    private void ensureNotClosing() throws IOException {
2452
        if (isClosing() || isClosed()) {
1✔
2453
            throw new IOException("A JetStream context can't be established during close.");
1✔
2454
        }
2455
    }
1✔
2456
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc