• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2094

05 Aug 2025 01:45PM UTC coverage: 95.507% (-0.008%) from 95.515%
#2094

push

github

web-flow
Merge pull request #1381 from nats-io/replace-vs-all

Use String.replace instead of String.replaceAll

5 of 5 new or added lines in 2 files covered. (100.0%)

3 existing lines in 3 files now uncovered.

11860 of 12418 relevant lines covered (95.51%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.95
/src/main/java/io/nats/client/impl/NatsConnection.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.ConnectionListener.Events;
18
import io.nats.client.api.ServerInfo;
19
import io.nats.client.support.*;
20
import org.jspecify.annotations.NonNull;
21

22
import java.io.IOException;
23
import java.net.InetAddress;
24
import java.net.URISyntaxException;
25
import java.nio.ByteBuffer;
26
import java.nio.CharBuffer;
27
import java.time.Duration;
28
import java.time.Instant;
29
import java.util.*;
30
import java.util.concurrent.*;
31
import java.util.concurrent.atomic.AtomicBoolean;
32
import java.util.concurrent.atomic.AtomicLong;
33
import java.util.concurrent.atomic.AtomicReference;
34
import java.util.concurrent.locks.Condition;
35
import java.util.concurrent.locks.ReentrantLock;
36
import java.util.function.Predicate;
37

38
import static io.nats.client.support.NatsConstants.*;
39
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
40
import static io.nats.client.support.Validator.*;
41
import static java.nio.charset.StandardCharsets.UTF_8;
42

43
class NatsConnection implements Connection {
44

45
    public static final double NANOS_PER_SECOND = 1_000_000_000.0;
46

47
    private final Options options;
48
    final boolean forceFlushOnRequest;
49

50
    private final StatisticsCollector statistics;
51

52
    private boolean connecting; // you can only connect in one thread
53
    private boolean disconnecting; // you can only disconnect in one thread
54
    private boolean closing; // respect a close call regardless
55
    private Exception exceptionDuringConnectChange; // exception occurred in another thread while dis/connecting
56
    final ReentrantLock closeSocketLock;
57

58
    private Status status;
59
    private final ReentrantLock statusLock;
60
    private final Condition statusChanged;
61

62
    private CompletableFuture<DataPort> dataPortFuture;
63
    private DataPort dataPort;
64
    private NatsUri currentServer;
65
    private CompletableFuture<Boolean> reconnectWaiter;
66
    private final ConcurrentHashMap<NatsUri, String> serverAuthErrors;
67

68
    private NatsConnectionReader reader;
69
    private NatsConnectionWriter writer;
70

71
    private final AtomicReference<ServerInfo> serverInfo;
72

73
    private final Map<String, NatsSubscription> subscribers;
74
    private final Map<String, NatsDispatcher> dispatchers; // use a concurrent map so we get more consistent iteration behavior
75
    private final Collection<ConnectionListener> connectionListeners;
76
    private final Map<String, NatsRequestCompletableFuture> responsesAwaiting;
77
    private final Map<String, NatsRequestCompletableFuture> responsesRespondedTo;
78
    private final ConcurrentLinkedDeque<CompletableFuture<Boolean>> pongQueue;
79

80
    private final String mainInbox;
81
    private final AtomicReference<NatsDispatcher> inboxDispatcher;
82
    private final ReentrantLock inboxDispatcherLock;
83
    private ScheduledTask pingTask;
84
    private ScheduledTask cleanupTask;
85

86
    private final AtomicBoolean needPing;
87

88
    private final AtomicLong nextSid;
89
    private final NUID nuid;
90

91
    private final AtomicReference<String> connectError;
92
    private final AtomicReference<String> lastError;
93
    private final AtomicReference<CompletableFuture<Boolean>> draining;
94
    private final AtomicBoolean blockPublishForDrain;
95
    private final AtomicBoolean tryingToConnect;
96

97
    private final ExecutorService callbackRunner;
98
    private final ExecutorService executor;
99
    private final ExecutorService connectExecutor;
100
    private final ScheduledExecutorService scheduledExecutor;
101
    private final boolean advancedTracking;
102

103
    private final ServerPool serverPool;
104
    private final DispatcherFactory dispatcherFactory;
105
    final CancelAction cancelAction;
106

107
    private final boolean trace;
108
    private final TimeTraceLogger timeTraceLogger;
109

110
    NatsConnection(Options options) {
1✔
111
        trace = options.isTraceConnection();
1✔
112
        timeTraceLogger = options.getTimeTraceLogger();
1✔
113
        timeTraceLogger.trace("creating connection object");
1✔
114

115
        this.options = options;
1✔
116
        forceFlushOnRequest = options.forceFlushOnRequest();
1✔
117

118
        advancedTracking = options.isTrackAdvancedStats();
1✔
119
        this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector();
1✔
120
        this.statistics.setAdvancedTracking(advancedTracking);
1✔
121

122
        this.closeSocketLock = new ReentrantLock();
1✔
123

124
        this.statusLock = new ReentrantLock();
1✔
125
        this.statusChanged = this.statusLock.newCondition();
1✔
126
        this.status = Status.DISCONNECTED;
1✔
127
        this.reconnectWaiter = new CompletableFuture<>();
1✔
128
        this.reconnectWaiter.complete(Boolean.TRUE);
1✔
129

130
        this.connectionListeners = ConcurrentHashMap.newKeySet();
1✔
131
        if (options.getConnectionListener() != null) {
1✔
132
            addConnectionListener(options.getConnectionListener());
1✔
133
        }
134

135
        this.dispatchers = new ConcurrentHashMap<>();
1✔
136
        this.subscribers = new ConcurrentHashMap<>();
1✔
137
        this.responsesAwaiting = new ConcurrentHashMap<>();
1✔
138
        this.responsesRespondedTo = new ConcurrentHashMap<>();
1✔
139
        this.serverAuthErrors = new ConcurrentHashMap<>();
1✔
140

141
        this.nextSid = new AtomicLong(1);
1✔
142
        timeTraceLogger.trace("creating NUID");
1✔
143
        this.nuid = new NUID();
1✔
144
        this.mainInbox = createInbox() + ".*";
1✔
145

146
        this.lastError = new AtomicReference<>();
1✔
147
        this.connectError = new AtomicReference<>();
1✔
148

149
        this.serverInfo = new AtomicReference<>();
1✔
150
        this.inboxDispatcher = new AtomicReference<>();
1✔
151
        this.inboxDispatcherLock = new ReentrantLock();
1✔
152
        this.pongQueue = new ConcurrentLinkedDeque<>();
1✔
153
        this.draining = new AtomicReference<>();
1✔
154
        this.blockPublishForDrain = new AtomicBoolean();
1✔
155
        this.tryingToConnect = new AtomicBoolean();
1✔
156

157
        timeTraceLogger.trace("creating executors");
1✔
158
        this.executor = options.getExecutor();
1✔
159
        this.callbackRunner = options.getCallbackExecutor();
1✔
160
        this.connectExecutor = options.getConnectExecutor();
1✔
161
        this.scheduledExecutor = options.getScheduledExecutor();
1✔
162

163
        timeTraceLogger.trace("creating reader and writer");
1✔
164
        this.reader = new NatsConnectionReader(this);
1✔
165
        this.writer = new NatsConnectionWriter(this, null);
1✔
166

167
        this.needPing = new AtomicBoolean(true);
1✔
168

169
        serverPool = options.getServerPool() == null ? new NatsServerPool() : options.getServerPool();
1✔
170
        serverPool.initialize(options);
1✔
171
        dispatcherFactory = options.getDispatcherFactory() == null ? new DispatcherFactory() : options.getDispatcherFactory();
1✔
172

173
        cancelAction = options.isReportNoResponders() ? CancelAction.REPORT : CancelAction.CANCEL;
1✔
174

175
        timeTraceLogger.trace("connection object created");
1✔
176
    }
1✔
177

178
    // Connect is only called after creation
179
    void connect(boolean reconnectOnConnect) throws InterruptedException, IOException {
180
        if (!tryingToConnect.get()) {
1✔
181
            try {
182
                tryingToConnect.set(true);
1✔
183
                connectImpl(reconnectOnConnect);
1✔
184
            }
185
            finally {
186
                tryingToConnect.set(false);
1✔
187
            }
188
        }
189
    }
1✔
190

191
    void connectImpl(boolean reconnectOnConnect) throws InterruptedException, IOException {
192
        if (options.getServers().isEmpty()) {
1✔
193
            throw new IllegalArgumentException("No servers provided in options");
×
194
        }
195

196
        boolean trace = options.isTraceConnection();
1✔
197
        long start = NatsSystemClock.nanoTime();
1✔
198

199
        this.lastError.set("");
1✔
200

201
        timeTraceLogger.trace("starting connect loop");
1✔
202

203
        Set<NatsUri> failList = new HashSet<>();
1✔
204
        boolean keepGoing = true;
1✔
205
        NatsUri first = null;
1✔
206
        NatsUri cur;
207
        while (keepGoing && (cur = serverPool.peekNextServer()) != null) {
1✔
208
            if (first == null) {
1✔
209
                first = cur;
1✔
210
            }
211
            else if (cur.equals(first)) {
1✔
212
                break;  // connect only goes through loop once
1✔
213
            }
214
            serverPool.nextServer(); // b/c we only peeked.
1✔
215

216
            // let server pool resolve hostnames, then loop through resolved
217
            List<NatsUri> resolvedList = resolveHost(cur);
1✔
218
            for (NatsUri resolved : resolvedList) {
1✔
219
                if (isClosed()) {
1✔
220
                    keepGoing = false;
1✔
221
                    break;
1✔
222
                }
223
                connectError.set(""); // new on each attempt
1✔
224

225
                timeTraceLogger.trace("setting status to connecting");
1✔
226
                updateStatus(Status.CONNECTING);
1✔
227

228
                timeTraceLogger.trace("trying to connect to %s", cur);
1✔
229
                tryToConnect(cur, resolved, NatsSystemClock.nanoTime());
1✔
230

231
                if (isConnected()) {
1✔
232
                    serverPool.connectSucceeded(cur);
1✔
233
                    keepGoing = false;
1✔
234
                    break;
1✔
235
                }
236

237
                timeTraceLogger.trace("setting status to disconnected");
1✔
238
                updateStatus(Status.DISCONNECTED);
1✔
239

240
                failList.add(cur);
1✔
241
                serverPool.connectFailed(cur);
1✔
242

243
                String err = connectError.get();
1✔
244

245
                if (this.isAuthenticationError(err)) {
1✔
246
                    this.serverAuthErrors.put(resolved, err);
1✔
247
                }
248
            }
1✔
249
        }
1✔
250

251
        if (!isConnected() && !isClosed()) {
1✔
252
            if (reconnectOnConnect) {
1✔
253
                timeTraceLogger.trace("trying to reconnect on connect");
1✔
254
                reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior
1✔
255
            }
256
            else {
257
                timeTraceLogger.trace("connection failed, closing to cleanup");
1✔
258
                close();
1✔
259

260
                String err = connectError.get();
1✔
261
                if (this.isAuthenticationError(err)) {
1✔
262
                    throw new AuthenticationException("Authentication error connecting to NATS server: " + err);
1✔
263
                }
264
                throw new IOException("Unable to connect to NATS servers: " + failList);
1✔
265
            }
266
        }
267
        else if (trace) {
1✔
268
            long end = NatsSystemClock.nanoTime();
1✔
269
            double seconds = ((double) (end - start)) / NANOS_PER_SECOND;
1✔
270
            timeTraceLogger.trace("connect complete in %.3f seconds", seconds);
1✔
271
        }
272
    }
1✔
273

274
    @Override
275
    public void forceReconnect() throws IOException, InterruptedException {
276
        forceReconnect(ForceReconnectOptions.DEFAULT_INSTANCE);
1✔
277
    }
1✔
278

279
    @Override
280
    public void forceReconnect(ForceReconnectOptions options) throws IOException, InterruptedException {
281
        if (!tryingToConnect.get()) {
1✔
282
            try {
283
                tryingToConnect.set(true);
1✔
284
                forceReconnectImpl(options == null ? ForceReconnectOptions.DEFAULT_INSTANCE : options);
1✔
285
            }
286
            finally {
287
                tryingToConnect.set(false);
1✔
288
            }
289
        }
290
    }
1✔
291

292
    void forceReconnectImpl(@NonNull ForceReconnectOptions frOpts) throws InterruptedException {
293
        if (frOpts.getFlushWait() != null) {
1✔
294
            try {
295
                flush(frOpts.getFlushWait());
×
296
            }
297
            catch (TimeoutException e) {
×
298
                // Ignored. Manual test demonstrates that if the connection is dropped
299
                // in the middle of the flush, the most likely reason for a TimeoutException,
300
                // the socket is closed.
301
            }
×
302
        }
303

304
        closeSocketLock.lock();
1✔
305
        try {
306
            updateStatus(Status.DISCONNECTED);
1✔
307

308
            // Close and reset the current data port and future
309
            if (dataPortFuture != null) {
1✔
310
                dataPortFuture.cancel(true);
1✔
311
                dataPortFuture = null;
1✔
312
            }
313

314
            // close the data port as a task so as not to block reconnecting
315
            if (dataPort != null) {
1✔
316
                final DataPort dataPortToClose = dataPort;
1✔
317
                dataPort = null;
1✔
318
                executor.submit(() -> {
1✔
319
                    try {
320
                        if (frOpts.isForceClose()) {
1✔
321
                            dataPortToClose.forceClose();
×
322
                        }
323
                        else {
324
                            dataPortToClose.close();
1✔
325
                        }
326
                    }
327
                    catch (IOException ignore) {
×
328
                        // ignored since running as a task and nothing we can do.
329
                    }
1✔
330
                });
1✔
331
            }
332

333
            // stop i/o
334
            try {
335
                this.reader.stop(false).get(100, TimeUnit.MILLISECONDS);
1✔
336
            }
337
            catch (Exception ex) {
×
338
                processException(ex);
×
339
            }
1✔
340
            try {
341
                this.writer.stop().get(100, TimeUnit.MILLISECONDS);
1✔
342
            }
343
            catch (Exception ex) {
×
344
                processException(ex);
×
345
            }
1✔
346

347
            // new reader/writer
348
            reader = new NatsConnectionReader(this);
1✔
349
            writer = new NatsConnectionWriter(this, writer);
1✔
350
        }
351
        finally {
352
            closeSocketLock.unlock();
1✔
353
        }
354

355
        reconnectImpl();
1✔
356
        writer.setReconnectMode(false);
1✔
357
    }
1✔
358

359
    void reconnect() throws InterruptedException {
360
        if (!tryingToConnect.get()) {
1✔
361
            try {
362
                tryingToConnect.set(true);
1✔
363
                reconnectImpl();
1✔
364
            }
365
            finally {
366
                tryingToConnect.set(false);
1✔
367
            }
368
        }
369
    }
1✔
370

371
    // Reconnect can only be called when the connection is disconnected
372
    void reconnectImpl() throws InterruptedException {
373
        if (isClosed()) {
1✔
374
            return;
1✔
375
        }
376

377
        if (options.getMaxReconnect() == 0) {
1✔
378
            this.close();
1✔
379
            return;
1✔
380
        }
381

382
        writer.setReconnectMode(true);
1✔
383

384
        if (!isConnected() && !isClosed() && !this.isClosing()) {
1✔
385
            boolean keepGoing = true;
1✔
386
            int totalRounds = 0;
1✔
387
            NatsUri first = null;
1✔
388
            NatsUri cur;
389
            while (keepGoing && (cur = serverPool.nextServer()) != null) {
1✔
390
                if (first == null) {
1✔
391
                    first = cur;
1✔
392
                }
393
                else if (first.equals(cur)) {
1✔
394
                    // went around the pool an entire time
395
                    invokeReconnectDelayHandler(++totalRounds);
1✔
396
                }
397

398
                // let server list provider resolve hostnames
399
                // then loop through resolved
400
                List<NatsUri> resolvedList = resolveHost(cur);
1✔
401
                for (NatsUri resolved : resolvedList) {
1✔
402
                    if (isClosed()) {
1✔
403
                        keepGoing = false;
1✔
404
                        break;
1✔
405
                    }
406
                    connectError.set(""); // reset on each loop
1✔
407
                    if (isDisconnectingOrClosed() || this.isClosing()) {
1✔
408
                        keepGoing = false;
1✔
409
                        break;
1✔
410
                    }
411
                    updateStatus(Status.RECONNECTING);
1✔
412

413
                    timeTraceLogger.trace("reconnecting to server %s", cur);
1✔
414
                    tryToConnect(cur, resolved, NatsSystemClock.nanoTime());
1✔
415

416
                    if (isConnected()) {
1✔
417
                        serverPool.connectSucceeded(cur);
1✔
418
                        statistics.incrementReconnects();
1✔
419
                        keepGoing = false;
1✔
420
                        break;
1✔
421
                    }
422

423
                    serverPool.connectFailed(cur);
1✔
424
                    String err = connectError.get();
1✔
425
                    if (this.isAuthenticationError(err)) {
1✔
426
                        if (err.equals(this.serverAuthErrors.get(resolved))) {
1✔
427
                            keepGoing = false; // double auth error
1✔
428
                            break;
1✔
429
                        }
430
                        serverAuthErrors.put(resolved, err);
1✔
431
                    }
432
                }
1✔
433
            }
1✔
434
        } // end-main-loop
435

436
        if (!isConnected()) {
1✔
437
            this.close();
1✔
438
            return;
1✔
439
        }
440

441
        this.subscribers.forEach((sid, sub) -> {
1✔
442
            if (sub.getDispatcher() == null && !sub.isDraining()) {
1✔
443
                sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true);
1✔
444
            }
445
        });
1✔
446

447
        this.dispatchers.forEach((nuid, d) -> {
1✔
448
            if (!d.isDraining()) {
1✔
449
                d.resendSubscriptions();
1✔
450
            }
451
        });
1✔
452

453
        try {
454
            this.flush(this.options.getConnectionTimeout());
1✔
455
        } catch (Exception exp) {
1✔
456
            this.processException(exp);
1✔
457
        }
1✔
458

459
        processConnectionEvent(Events.RESUBSCRIBED);
1✔
460

461
        // When the flush returns, we are done sending internal messages,
462
        // so we can switch to the non-reconnect queue
463
        this.writer.setReconnectMode(false);
1✔
464
    }
1✔
465

466
    long timeCheck(long endNanos, String message) throws TimeoutException {
467
        long remainingNanos = endNanos - NatsSystemClock.nanoTime();
1✔
468
        if (trace) {
1✔
469
            traceTimeCheck(message, remainingNanos);
1✔
470
        }
471
        if (remainingNanos < 0) {
1✔
472
            throw new TimeoutException("connection timed out");
1✔
473
        }
474
        return remainingNanos;
1✔
475
    }
476

477
    void traceTimeCheck(String message, long remainingNanos) {
478
        if (remainingNanos < 0) {
1✔
479
            if (remainingNanos > -1_000_000) { // less than -1 ms
1✔
480
                timeTraceLogger.trace(message + String.format(", %d (ns) beyond timeout", -remainingNanos));
1✔
481
            }
482
            else if (remainingNanos > -1_000_000_000) { // less than -1 second
1✔
483
                long ms = -remainingNanos / 1_000_000;
1✔
484
                timeTraceLogger.trace(message + String.format(", %d (ms) beyond timeout", ms));
1✔
485
            }
1✔
486
            else {
487
                double seconds = ((double)-remainingNanos) / 1_000_000_000.0;
1✔
488
                timeTraceLogger.trace(message + String.format(", %.3f (s) beyond timeout", seconds));
1✔
489
            }
1✔
490
        }
491
        else if (remainingNanos < 1_000_000) {
1✔
492
            timeTraceLogger.trace(message + String.format(", %d (ns) remaining", remainingNanos));
1✔
493
        }
494
        else if (remainingNanos < 1_000_000_000) {
1✔
495
            long ms = remainingNanos / 1_000_000;
1✔
496
            timeTraceLogger.trace(message + String.format(", %d (ms) remaining", ms));
1✔
497
        }
1✔
498
        else {
499
            double seconds = ((double) remainingNanos) / 1_000_000_000.0;
1✔
500
            timeTraceLogger.trace(message + String.format(", %.3f (s) remaining", seconds));
1✔
501
        }
502
    }
1✔
503

504
    // is called from reconnect and connect
505
    // will wait for any previous attempt to complete, using the reader.stop and
506
    // writer.stop
507
    void tryToConnect(NatsUri cur, NatsUri resolved, long now) {
508
        currentServer = null;
1✔
509

510
        try {
511
            Duration connectTimeout = options.getConnectionTimeout();
1✔
512
            boolean trace = options.isTraceConnection();
1✔
513
            long end = now + connectTimeout.toNanos();
1✔
514
            timeCheck(end, "starting connection attempt");
1✔
515

516
            statusLock.lock();
1✔
517
            try {
518
                if (this.connecting) {
1✔
519
                    return;
×
520
                }
521
                this.connecting = true;
1✔
522
                statusChanged.signalAll();
1✔
523
            } finally {
524
                statusLock.unlock();
1✔
525
            }
526

527
            // Create a new future for the DataPort, the reader/writer will use this
528
            // to wait for the connect/failure.
529
            this.dataPortFuture = new CompletableFuture<>();
1✔
530

531
            // Make sure the reader and writer are stopped
532
            long timeoutNanos = timeCheck(end, "waiting for reader");
1✔
533
            if (reader.isRunning()) {
1✔
534
                this.reader.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
×
535
            }
536
            timeoutNanos = timeCheck(end, "waiting for writer");
1✔
537
            if (writer.isRunning()) {
1✔
538
                this.writer.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
×
539
            }
540

541
            timeCheck(end, "cleaning pong queue");
1✔
542
            cleanUpPongQueue();
1✔
543

544
            timeoutNanos = timeCheck(end, "connecting data port");
1✔
545
            DataPort newDataPort = this.options.buildDataPort();
1✔
546
            newDataPort.connect(resolved.toString(), this, timeoutNanos);
1✔
547

548
            // Notify any threads waiting on the sockets
549
            this.dataPort = newDataPort;
1✔
550
            this.dataPortFuture.complete(this.dataPort);
1✔
551

552
            // Wait for the INFO message manually.
553
            // All other traffic will use the reader and writer
554
            // TLS First, don't read info until after upgrade
555
            // ---
556
            // Also this task does not have any exception catching
557
            // Since it is submitted as an async task, the future
558
            // will be aware of any exception thrown, and the future.get()
559
            // will throw an ExecutionException which is handled futher down
560
            Callable<Object> connectTask = () -> {
1✔
561
                if (!options.isTlsFirst()) {
1✔
562
                    readInitialInfo();
1✔
563
                    checkVersionRequirements();
1✔
564
                }
565
                long start = NatsSystemClock.nanoTime();
1✔
566
                upgradeToSecureIfNeeded(resolved);
1✔
567
                if (trace && options.isTLSRequired()) {
1✔
568
                    // If the time appears too long, it might be related to
569
                    // https://github.com/nats-io/nats.java#linux-platform-note
570
                    timeTraceLogger.trace("TLS upgrade took: %.3f (s)",
×
571
                            ((double) (NatsSystemClock.nanoTime() - start)) / NANOS_PER_SECOND);
×
572
                }
573
                if (options.isTlsFirst()) {
1✔
574
                    readInitialInfo();
1✔
575
                    checkVersionRequirements();
1✔
576
                }
577
                return null;
1✔
578
            };
579

580
            timeoutNanos = timeCheck(end, "reading info, version and upgrading to secure if necessary");
1✔
581
            Future<Object> future = this.connectExecutor.submit(connectTask);
1✔
582
            try {
583
                future.get(timeoutNanos, TimeUnit.NANOSECONDS);
1✔
584
            } finally {
585
                future.cancel(true);
1✔
586
            }
587

588
            // start the reader and writer after we secured the connection, if necessary
589
            timeCheck(end, "starting reader");
1✔
590
            this.reader.start(this.dataPortFuture);
1✔
591
            timeCheck(end, "starting writer");
1✔
592
            this.writer.start(this.dataPortFuture);
1✔
593

594
            timeCheck(end, "sending connect message");
1✔
595
            this.sendConnect(resolved);
1✔
596

597
            timeoutNanos = timeCheck(end, "sending initial ping");
1✔
598
            Future<Boolean> pongFuture = sendPing();
1✔
599

600
            if (pongFuture != null) {
1✔
601
                pongFuture.get(timeoutNanos, TimeUnit.NANOSECONDS);
1✔
602
            }
603

604
            if (pingTask == null) {
1✔
605
                timeCheck(end, "starting ping and cleanup timers");
1✔
606
                long pingMillis = this.options.getPingInterval().toMillis();
1✔
607

608
                if (pingMillis > 0) {
1✔
609
                    pingTask = new ScheduledTask(scheduledExecutor, pingMillis, () -> {
1✔
610
                        if (isConnected() && !isClosing()) {
1✔
611
                            try {
612
                                softPing(); // The timer always uses the standard queue
1✔
613
                            }
614
                            catch (Exception e) {
1✔
615
                                // it's running in a thread, there is no point throwing here
616
                            }
1✔
617
                        }
618
                    });
1✔
619
                }
620

621
                long cleanMillis = this.options.getRequestCleanupInterval().toMillis();
1✔
622

623
                if (cleanMillis > 0) {
1✔
624
                    cleanupTask = new ScheduledTask(scheduledExecutor, cleanMillis, () -> cleanResponses(false));
1✔
625
                }
626
            }
627

628
            // Set connected status
629
            timeCheck(end, "updating status to connected");
1✔
630
            statusLock.lock();
1✔
631
            try {
632
                this.connecting = false;
1✔
633

634
                if (this.exceptionDuringConnectChange != null) {
1✔
635
                    throw this.exceptionDuringConnectChange;
×
636
                }
637

638
                this.currentServer = cur;
1✔
639
                this.serverAuthErrors.clear(); // reset on successful connection
1✔
640
                updateStatus(Status.CONNECTED); // will signal status change, we also signal in finally
1✔
641
            } finally {
642
                statusLock.unlock();
1✔
643
            }
644
            timeTraceLogger.trace("status updated");
1✔
645
        } catch (Exception exp) {
1✔
646
            processException(exp);
1✔
647
            try {
648
                // allow force reconnect since this is pretty exceptional,
649
                // a connection failure while trying to connect
650
                this.closeSocket(false, true);
1✔
651
            } catch (InterruptedException e) {
×
652
                processException(e);
×
653
                Thread.currentThread().interrupt();
×
654
            }
1✔
655
        } finally {
656
            statusLock.lock();
1✔
657
            try {
658
                this.connecting = false;
1✔
659
                statusChanged.signalAll();
1✔
660
            } finally {
661
                statusLock.unlock();
1✔
662
            }
663
        }
664
    }
1✔
665

666
    void checkVersionRequirements() throws IOException {
667
        Options opts = getOptions();
1✔
668
        ServerInfo info = getInfo();
1✔
669

670
        if (opts.isNoEcho() && info.getProtocolVersion() < 1) {
1✔
671
            throw new IOException("Server does not support no echo.");
1✔
672
        }
673
    }
1✔
674

675
    void upgradeToSecureIfNeeded(NatsUri nuri) throws IOException {
676
        // When already communicating over "https" websocket, do NOT try to upgrade to secure.
677
        if (!nuri.isWebsocket()) {
1✔
678
            if (options.isTlsFirst()) {
1✔
679
                dataPort.upgradeToSecure();
1✔
680
            }
681
            else {
682
                // server    | client options      | result
683
                // --------- | ------------------- | --------
684
                // required  | not isTLSRequired() | mismatch
685
                // available | not isTLSRequired() | ok
686
                // neither   | not isTLSRequired() | ok
687
                // required  | isTLSRequired()     | ok
688
                // available | isTLSRequired()     | ok
689
                // neither   | isTLSRequired()     | mismatch
690
                ServerInfo serverInfo = getInfo();
1✔
691
                if (options.isTLSRequired()) {
1✔
692
                    if (!serverInfo.isTLSRequired() && !serverInfo.isTLSAvailable()) {
1✔
693
                        throw new IOException("SSL connection wanted by client.");
1✔
694
                    }
695
                    dataPort.upgradeToSecure();
1✔
696
                }
697
                else if (serverInfo.isTLSRequired()) {
1✔
698
                    throw new IOException("SSL required by server.");
1✔
699
                }
700
            }
701
        }
702
    }
1✔
703
    // Called from reader/writer thread
704
    void handleCommunicationIssue(Exception io) {
705
        // If we are connecting or disconnecting, note exception and leave
706
        statusLock.lock();
1✔
707
        try {
708
            if (this.connecting || this.disconnecting || this.status == Status.CLOSED || this.isDraining()) {
1✔
709
                this.exceptionDuringConnectChange = io;
1✔
710
                return;
1✔
711
            }
712
        } finally {
713
            statusLock.unlock();
1✔
714
        }
715

716
        processException(io);
1✔
717

718
        // Spawn a thread so we don't have timing issues with
719
        // waiting on read/write threads
720
        executor.submit(() -> {
1✔
721
            if (!tryingToConnect.get()) {
1✔
722
                try {
723
                    tryingToConnect.set(true);
1✔
724

725
                    // any issue that brings us here is pretty serious
726
                    // so we are comfortable forcing the close
727
                    this.closeSocket(true, true);
1✔
728
                } catch (InterruptedException e) {
×
729
                    processException(e);
×
730
                    Thread.currentThread().interrupt();
×
731
                } finally {
732
                    tryingToConnect.set(false);
1✔
733
                }
734
            }
735
        });
1✔
736
    }
1✔
737

738
    // Close socket is called when another connect attempt is possible
739
    // Close is called when the connection should shut down, period
740
    void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws InterruptedException {
741
        // Ensure we close the socket exclusively within one thread.
742
        closeSocketLock.lock();
1✔
743
        try {
744
            boolean wasConnected;
745
            statusLock.lock();
1✔
746
            try {
747
                if (isDisconnectingOrClosed()) {
1✔
748
                    waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
749
                    return;
1✔
750
                }
751
                this.disconnecting = true;
1✔
752
                this.exceptionDuringConnectChange = null;
1✔
753
                wasConnected = (this.status == Status.CONNECTED);
1✔
754
                statusChanged.signalAll();
1✔
755
            } finally {
756
                statusLock.unlock();
1✔
757
            }
758

759
            closeSocketImpl(forceClose);
1✔
760

761
            statusLock.lock();
1✔
762
            try {
763
                updateStatus(Status.DISCONNECTED);
1✔
764
                this.exceptionDuringConnectChange = null; // Ignore IOExceptions during closeSocketImpl()
1✔
765
                this.disconnecting = false;
1✔
766
                statusChanged.signalAll();
1✔
767
            } finally {
768
                statusLock.unlock();
1✔
769
            }
770

771
            if (isClosing()) { // isClosing() means we are in the close method or were asked to be
1✔
772
                close();
1✔
773
            } else if (wasConnected && tryReconnectIfConnected) {
1✔
774
                reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior
1✔
775
            }
776
        } finally {
777
            closeSocketLock.unlock();
1✔
778
        }
779
    }
1✔
780

781
    // Close socket is called when another connect attempt is possible
782
    // Close is called when the connection should shut down, period
783
    /**
784
     * {@inheritDoc}
785
     */
786
    @Override
787
    public void close() throws InterruptedException {
788
        this.close(true, false);
1✔
789
    }
1✔
790

791
    // This method was originally built assuming there might be multiple paths to this method,
792
    // but it turns out there isn't. Not refactoring the code though, hence the warning suppression
793
    @SuppressWarnings("SameParameterValue")
794
    void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedException {
795
        statusLock.lock();
1✔
796
        try {
797
            if (checkDrainStatus && this.isDraining()) {
1✔
798
                waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
799
                return;
1✔
800
            }
801

802
            this.closing = true;// We were asked to close, so do it
1✔
803
            if (isDisconnectingOrClosed()) {
1✔
804
                waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
805
                return;
1✔
806
            } else {
807
                this.disconnecting = true;
1✔
808
                this.exceptionDuringConnectChange = null;
1✔
809
                statusChanged.signalAll();
1✔
810
            }
811
        } finally {
812
            statusLock.unlock();
1✔
813
        }
814

815
        // Stop the reconnect wait timer after we stop the writer/reader (only if we are
816
        // really closing, not on errors)
817
        if (this.reconnectWaiter != null) {
1✔
818
            this.reconnectWaiter.cancel(true);
1✔
819
        }
820

821
        closeSocketImpl(forceClose);
1✔
822

823
        this.dispatchers.forEach((nuid, d) -> d.stop(false));
1✔
824

825
        this.subscribers.forEach((sid, sub) -> sub.invalidate());
1✔
826

827
        this.dispatchers.clear();
1✔
828
        this.subscribers.clear();
1✔
829

830
        if (pingTask != null) {
1✔
831
            pingTask.shutdown();
1✔
832
            pingTask = null;
1✔
833
        }
834
        if (cleanupTask != null) {
1✔
835
            cleanupTask.shutdown();
1✔
836
            cleanupTask = null;
1✔
837
        }
838

839
        cleanResponses(true);
1✔
840

841
        cleanUpPongQueue();
1✔
842

843
        statusLock.lock();
1✔
844
        try {
845
            updateStatus(Status.CLOSED); // will signal, we also signal when we stop disconnecting
1✔
846

847
            /*
848
             * if (exceptionDuringConnectChange != null) {
849
             * processException(exceptionDuringConnectChange); exceptionDuringConnectChange
850
             * = null; }
851
             */
852
        } finally {
853
            statusLock.unlock();
1✔
854
        }
855

856
        // Stop the error handling and connect executors
857
        callbackRunner.shutdown();
1✔
858
        try {
859
            // At this point in the flow, the connection is shutting down.
860
            // There is really no use in giving this information to the developer,
861
            // It's fair to say that an exception here anyway will practically never happen
862
            // and if it did, the app is probably already frozen.
863
            //noinspection ResultOfMethodCallIgnored
864
            callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
1✔
865
        } finally {
866
            callbackRunner.shutdownNow();
1✔
867
        }
868

869
        // There's no need to wait for running tasks since we're told to close
870
        connectExecutor.shutdownNow();
1✔
871

872
        // The callbackRunner and connectExecutor always come from a factory,
873
        // so we always shut them down.
874
        // The executor and scheduledExecutor come from a factory only if
875
        // the user does NOT supply them, so we shut them down in that case.
876
        if (options.executorIsInternal()) {
1✔
877
            executor.shutdownNow();
1✔
878
        }
879
        if (options.scheduledExecutorIsInternal()) {
1✔
880
            scheduledExecutor.shutdownNow();
1✔
881
        }
882

883
        statusLock.lock();
1✔
884
        try {
885
            this.disconnecting = false;
1✔
886
            statusChanged.signalAll();
1✔
887
        } finally {
888
            statusLock.unlock();
1✔
889
        }
890
    }
1✔
891

892
    boolean callbackRunnerIsShutdown() {
893
        return callbackRunner == null || callbackRunner.isShutdown();
1✔
894
    }
895

896
    boolean executorIsShutdown() {
897
        return executor == null || executor.isShutdown();
1✔
898
    }
899

900
    boolean connectExecutorIsShutdown() {
901
        return connectExecutor == null || connectExecutor.isShutdown();
1✔
902
    }
903

904
    boolean scheduledExecutorIsShutdown() {
905
        return scheduledExecutor == null || scheduledExecutor.isShutdown();
1✔
906
    }
907

908
    // Should only be called from closeSocket or close
909
    void closeSocketImpl(boolean forceClose) {
910
        this.currentServer = null;
1✔
911

912
        // Signal both to stop.
913
        final Future<Boolean> readStop = this.reader.stop();
1✔
914
        final Future<Boolean> writeStop = this.writer.stop();
1✔
915

916
        // Now wait until they both stop before closing the socket.
917
        try {
918
            readStop.get(1, TimeUnit.SECONDS);
1✔
919
        } catch (Exception ex) {
1✔
920
            //
921
        }
1✔
922
        try {
923
            writeStop.get(1, TimeUnit.SECONDS);
1✔
924
        } catch (Exception ex) {
×
925
            //
926
        }
1✔
927

928
        // Close and reset the current data port and future
929
        if (dataPortFuture != null) {
1✔
930
            dataPortFuture.cancel(true);
1✔
931
            dataPortFuture = null;
1✔
932
        }
933

934
        // Close the current socket and cancel anyone waiting for it
935
        try {
936
            if (dataPort != null) {
1✔
937
                if (forceClose) {
1✔
938
                    dataPort.forceClose();
1✔
939
                }
940
                else {
941
                    dataPort.close();
1✔
942
                }
943
            }
944

945
        } catch (IOException ex) {
×
946
            processException(ex);
×
947
        }
1✔
948
        cleanUpPongQueue();
1✔
949

950
        try {
951
            this.reader.stop().get(10, TimeUnit.SECONDS);
1✔
952
        } catch (Exception ex) {
×
953
            processException(ex);
×
954
        }
1✔
955
        try {
956
            this.writer.stop().get(10, TimeUnit.SECONDS);
1✔
957
        } catch (Exception ex) {
×
958
            processException(ex);
×
959
        }
1✔
960
    }
1✔
961

962
    void cleanUpPongQueue() {
963
        Future<Boolean> b;
964
        while ((b = pongQueue.poll()) != null) {
1✔
965
            b.cancel(true);
1✔
966
        }
967
    }
1✔
968

969
    /**
970
     * {@inheritDoc}
971
     */
972
    @Override
973
    public void publish(String subject, byte[] body) {
974
        publishInternal(subject, null, null, body, true, false);
1✔
975
    }
1✔
976

977
    /**
978
     * {@inheritDoc}
979
     */
980
    @Override
981
    public void publish(String subject, Headers headers, byte[] body) {
982
        publishInternal(subject, null, headers, body, true, false);
1✔
983
    }
1✔
984

985
    /**
986
     * {@inheritDoc}
987
     */
988
    @Override
989
    public void publish(String subject, String replyTo, byte[] body) {
990
        publishInternal(subject, replyTo, null, body, true, false);
1✔
991
    }
1✔
992

993
    /**
994
     * {@inheritDoc}
995
     */
996
    @Override
997
    public void publish(String subject, String replyTo, Headers headers, byte[] body) {
998
        publishInternal(subject, replyTo, headers, body, true, false);
1✔
999
    }
1✔
1000

1001
    /**
1002
     * {@inheritDoc}
1003
     */
1004
    @Override
1005
    public void publish(Message message) {
1006
        validateNotNull(message, "Message");
1✔
1007
        publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false, false);
1✔
1008
    }
1✔
1009

1010
    void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
1011
        checkPayloadSize(data);
1✔
1012
        NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1013
        if (npm.hasHeaders && !serverInfo.get().isHeadersSupported()) {
1✔
1014
            throw new IllegalArgumentException("Headers are not supported by the server, version: " + serverInfo.get().getVersion());
1✔
1015
        }
1016

1017
        if (isClosed()) {
1✔
1018
            throw new IllegalStateException("Connection is Closed");
1✔
1019
        } else if (blockPublishForDrain.get()) {
1✔
1020
            throw new IllegalStateException("Connection is Draining"); // Ok to publish while waiting on subs
×
1021
        }
1022

1023
        if ((status == Status.RECONNECTING || status == Status.DISCONNECTED)
1✔
1024
                && !this.writer.canQueueDuringReconnect(npm)) {
1✔
1025
            throw new IllegalStateException(
1✔
1026
                    "Unable to queue any more messages during reconnect, max buffer is " + options.getReconnectBufferSize());
1✔
1027
        }
1028

1029
        queueOutgoing(npm);
1✔
1030
    }
1✔
1031

1032
    private void checkPayloadSize(byte[] body) {
1033
        if (options.clientSideLimitChecks() && body != null && body.length > this.getMaxPayload() && this.getMaxPayload() > 0) {
1✔
1034
            throw new IllegalArgumentException(
1✔
1035
                "Message payload size exceed server configuration " + body.length + " vs " + this.getMaxPayload());
1✔
1036
        }
1037
    }
1✔
1038
    /**
1039
     * {@inheritDoc}
1040
     */
1041
    @Override
1042
    public Subscription subscribe(String subject) {
1043
        validateSubject(subject, true);
1✔
1044
        return createSubscription(subject, null, null, null);
1✔
1045
    }
1046

1047
    /**
1048
     * {@inheritDoc}
1049
     */
1050
    @Override
1051
    public Subscription subscribe(String subject, String queueName) {
1052
        validateSubject(subject, true);
1✔
1053
        validateQueueName(queueName, true);
1✔
1054
        return createSubscription(subject, queueName, null, null);
1✔
1055
    }
1056

1057
    void invalidate(NatsSubscription sub) {
1058
        remove(sub);
1✔
1059
        sub.invalidate();
1✔
1060
    }
1✔
1061

1062
    void remove(NatsSubscription sub) {
1063
        CharSequence sid = sub.getSID();
1✔
1064
        subscribers.remove(sid);
1✔
1065

1066
        if (sub.getNatsDispatcher() != null) {
1✔
1067
            sub.getNatsDispatcher().remove(sub);
1✔
1068
        }
1069
    }
1✔
1070

1071
    void unsubscribe(NatsSubscription sub, int after) {
1072
        if (isClosed()) { // last chance, usually sub will catch this
1✔
1073
            throw new IllegalStateException("Connection is Closed");
×
1074
        }
1075

1076
        if (after <= 0) {
1✔
1077
            this.invalidate(sub); // Will clean it up
1✔
1078
        } else {
1079
            sub.setUnsubLimit(after);
1✔
1080

1081
            if (sub.reachedUnsubLimit()) {
1✔
1082
                sub.invalidate();
1✔
1083
            }
1084
        }
1085

1086
        if (!isConnected()) {
1✔
1087
            return; // We will set up sub on reconnect or ignore
1✔
1088
        }
1089

1090
        sendUnsub(sub, after);
1✔
1091
    }
1✔
1092

1093
    void sendUnsub(NatsSubscription sub, int after) {
1094
        ByteArrayBuilder bab =
1✔
1095
            new ByteArrayBuilder().append(UNSUB_SP_BYTES).append(sub.getSID());
1✔
1096
        if (after > 0) {
1✔
1097
            bab.append(SP).append(after);
1✔
1098
        }
1099
        queueOutgoing(new ProtocolMessage(bab, true));
1✔
1100
    }
1✔
1101

1102
    // Assumes the null/empty checks were handled elsewhere
1103
    NatsSubscription createSubscription(String subject, String queueName, NatsDispatcher dispatcher, NatsSubscriptionFactory factory) {
1104
        if (isClosed()) {
1✔
1105
            throw new IllegalStateException("Connection is Closed");
1✔
1106
        } else if (isDraining() && (dispatcher == null || dispatcher != this.inboxDispatcher.get())) {
1✔
1107
            throw new IllegalStateException("Connection is Draining");
1✔
1108
        }
1109

1110
        NatsSubscription sub;
1111
        String sid = getNextSid();
1✔
1112

1113
        if (factory == null) {
1✔
1114
            sub = new NatsSubscription(sid, subject, queueName, this, dispatcher);
1✔
1115
        }
1116
        else {
1117
            sub = factory.createNatsSubscription(sid, subject, queueName, this, dispatcher);
1✔
1118
        }
1119
        subscribers.put(sid, sub);
1✔
1120

1121
        sendSubscriptionMessage(sid, subject, queueName, false);
1✔
1122
        return sub;
1✔
1123
    }
1124

1125
    String getNextSid() {
1126
        return Long.toString(nextSid.getAndIncrement());
1✔
1127
    }
1128

1129
    String reSubscribe(NatsSubscription sub, String subject, String queueName) {
1130
        String sid = getNextSid();
1✔
1131
        sendSubscriptionMessage(sid, subject, queueName, false);
1✔
1132
        subscribers.put(sid, sub);
1✔
1133
        return sid;
1✔
1134
    }
1135

1136
    void sendSubscriptionMessage(String sid, String subject, String queueName, boolean treatAsInternal) {
1137
        if (!isConnected()) {
1✔
1138
            return; // We will set up sub on reconnect or ignore
1✔
1139
        }
1140

1141
        ByteArrayBuilder bab = new ByteArrayBuilder(UTF_8).append(SUB_SP_BYTES).append(subject);
1✔
1142
        if (queueName != null) {
1✔
1143
            bab.append(SP).append(queueName);
1✔
1144
        }
1145
        bab.append(SP).append(sid);
1✔
1146

1147
        // setting this to filter on stop.
1148
        // if it's an "internal" message, it won't be filtered
1149
        // if it's a normal message, the subscription will already be registered
1150
        // and therefore will be re-subscribed after a stop anyway
1151
        ProtocolMessage subMsg = new ProtocolMessage(bab, true);
1✔
1152
        if (treatAsInternal) {
1✔
1153
            queueInternalOutgoing(subMsg);
1✔
1154
        } else {
1155
            queueOutgoing(subMsg);
1✔
1156
        }
1157
    }
1✔
1158

1159
    /**
1160
     * {@inheritDoc}
1161
     */
1162
    @Override
1163
    public String createInbox() {
1164
        return options.getInboxPrefix() + nuid.next();
1✔
1165
    }
1166

1167
    int getRespInboxLength() {
1168
        return options.getInboxPrefix().length() + 22 + 1; // 22 for nuid, 1 for .
1✔
1169
    }
1170

1171
    String createResponseInbox(String inbox) {
1172
        // Substring gets rid of the * [trailing]
1173
        return inbox.substring(0, getRespInboxLength()) + nuid.next();
1✔
1174
    }
1175

1176
    // If the inbox is long enough, pull out the end part, otherwise, just use the
1177
    // full thing
1178
    String getResponseToken(String responseInbox) {
1179
        int len = getRespInboxLength();
1✔
1180
        if (responseInbox.length() <= len) {
1✔
1181
            return responseInbox;
1✔
1182
        }
1183
        return responseInbox.substring(len);
1✔
1184
    }
1185

1186
    void cleanResponses(boolean closing) {
1187
        ArrayList<String> toRemove = new ArrayList<>();
1✔
1188
        boolean wasInterrupted = false;
1✔
1189

1190
        for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesAwaiting.entrySet()) {
1✔
1191
            boolean remove = false;
1✔
1192
            NatsRequestCompletableFuture future = entry.getValue();
1✔
1193
            if (future.hasExceededTimeout()) {
1✔
1194
                remove = true;
1✔
1195
                future.cancelTimedOut();
1✔
1196
            }
1197
            else if (closing) {
1✔
1198
                remove = true;
1✔
1199
                future.cancelClosing();
1✔
1200
            }
1201
            else if (future.isDone()) {
1✔
1202
                // done should have already been removed, not sure if
1203
                // this even needs checking, but it won't hurt
1204
                remove = true;
1✔
1205
                try {
1206
                    future.get();
×
1207
                }
1208
                catch (InterruptedException e) {
×
1209
                    Thread.currentThread().interrupt();
×
1210
                    // We might have collected some entries already, but were interrupted.
1211
                    // Break out so we finish as quick as possible,
1212
                    // cleanResponses will be called again anyway
1213
                    wasInterrupted = true;
×
1214
                    break;
×
1215
                }
1216
                catch (Throwable ignore) {}
1✔
1217
            }
1218

1219
            if (remove) {
1✔
1220
                toRemove.add(entry.getKey());
1✔
1221
                statistics.decrementOutstandingRequests();
1✔
1222
            }
1223
        }
1✔
1224

1225
        for (String key : toRemove) {
1✔
1226
            responsesAwaiting.remove(key);
1✔
1227
        }
1✔
1228

1229
        if (advancedTracking && !wasInterrupted) {
1✔
1230
            toRemove.clear(); // we can reuse this but it needs to be cleared
1✔
1231
            for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesRespondedTo.entrySet()) {
1✔
1232
                NatsRequestCompletableFuture future = entry.getValue();
1✔
1233
                if (future.hasExceededTimeout()) {
1✔
1234
                    toRemove.add(entry.getKey());
1✔
1235
                    future.cancelTimedOut();
1✔
1236
                }
1237
            }
1✔
1238

1239
            for (String token : toRemove) {
1✔
1240
                responsesRespondedTo.remove(token);
1✔
1241
            }
1✔
1242
        }
1243
    }
1✔
1244

1245
    /**
1246
     * {@inheritDoc}
1247
     */
1248
    @Override
1249
    public Message request(String subject, byte[] body, Duration timeout) throws InterruptedException {
1250
        return requestInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1251
    }
1252

1253
    /**
1254
     * {@inheritDoc}
1255
     */
1256
    @Override
1257
    public Message request(String subject, Headers headers, byte[] body, Duration timeout) throws InterruptedException {
1258
        return requestInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1259
    }
1260

1261
    /**
1262
     * {@inheritDoc}
1263
     */
1264
    @Override
1265
    public Message request(Message message, Duration timeout) throws InterruptedException {
1266
        validateNotNull(message, "Message");
1✔
1267
        return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
1✔
1268
    }
1269

1270
    Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout,
1271
                            CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) throws InterruptedException {
1272
        CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1273
        try {
1274
            return incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
1✔
1275
        } catch (TimeoutException | ExecutionException | CancellationException e) {
1✔
1276
            return null;
1✔
1277
        }
1278
    }
1279

1280
    /**
1281
     * {@inheritDoc}
1282
     */
1283
    @Override
1284
    public CompletableFuture<Message> request(String subject, byte[] body) {
1285
        return requestFutureInternal(subject, null, body, null, cancelAction, true, forceFlushOnRequest);
1✔
1286
    }
1287

1288
    /**
1289
     * {@inheritDoc}
1290
     */
1291
    @Override
1292
    public CompletableFuture<Message> request(String subject, Headers headers, byte[] body) {
1293
        return requestFutureInternal(subject, headers, body, null, cancelAction, true, forceFlushOnRequest);
1✔
1294
    }
1295

1296
    /**
1297
     * {@inheritDoc}
1298
     */
1299
    @Override
1300
    public CompletableFuture<Message> requestWithTimeout(String subject, byte[] body, Duration timeout) {
1301
        return requestFutureInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1302
    }
1303

1304
    /**
1305
     * {@inheritDoc}
1306
     */
1307
    @Override
1308
    public CompletableFuture<Message> requestWithTimeout(String subject, Headers headers, byte[] body, Duration timeout) {
1309
        return requestFutureInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1310
    }
1311

1312
    /**
1313
     * {@inheritDoc}
1314
     */
1315
    @Override
1316
    public CompletableFuture<Message> requestWithTimeout(Message message, Duration timeout) {
1317
        validateNotNull(message, "Message");
1✔
1318
        return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
1✔
1319
    }
1320

1321
    /**
1322
     * {@inheritDoc}
1323
     */
1324
    @Override
1325
    public CompletableFuture<Message> request(Message message) {
1326
        validateNotNull(message, "Message");
1✔
1327
        return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false, forceFlushOnRequest);
1✔
1328
    }
1329

1330
    CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout,
1331
                                                     CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
1332
        checkPayloadSize(data);
1✔
1333

1334
        if (isClosed()) {
1✔
1335
            throw new IllegalStateException("Connection is Closed");
1✔
1336
        } else if (isDraining()) {
1✔
1337
            throw new IllegalStateException("Connection is Draining");
1✔
1338
        }
1339

1340
        if (inboxDispatcher.get() == null) {
1✔
1341
            inboxDispatcherLock.lock();
1✔
1342
            try {
1343
                if (inboxDispatcher.get() == null) {
1✔
1344
                    NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply);
1✔
1345

1346
                    // Ensure the dispatcher is started before publishing messages
1347
                    String id = this.nuid.next();
1✔
1348
                    this.dispatchers.put(id, d);
1✔
1349
                    d.start(id);
1✔
1350
                    d.subscribe(this.mainInbox);
1✔
1351
                    inboxDispatcher.set(d);
1✔
1352
                }
1353
            } finally {
1354
                inboxDispatcherLock.unlock();
1✔
1355
            }
1356
        }
1357

1358
        boolean oldStyle = options.isOldRequestStyle();
1✔
1359
        String responseInbox = oldStyle ? createInbox() : createResponseInbox(this.mainInbox);
1✔
1360
        String responseToken = getResponseToken(responseInbox);
1✔
1361
        NatsRequestCompletableFuture future =
1✔
1362
            new NatsRequestCompletableFuture(cancelAction,
1363
                futureTimeout == null ? options.getRequestCleanupInterval() : futureTimeout, options.useTimeoutException());
1✔
1364

1365
        if (!oldStyle) {
1✔
1366
            responsesAwaiting.put(responseToken, future);
1✔
1367
        }
1368
        statistics.incrementOutstandingRequests();
1✔
1369

1370
        if (oldStyle) {
1✔
1371
            NatsDispatcher dispatcher = this.inboxDispatcher.get();
1✔
1372
            NatsSubscription sub = dispatcher.subscribeReturningSubscription(responseInbox);
1✔
1373
            dispatcher.unsubscribe(responseInbox, 1);
1✔
1374
            // Unsubscribe when future is cancelled:
1375
            future.whenComplete((msg, exception) -> {
1✔
1376
                if (exception instanceof CancellationException) {
1✔
1377
                    dispatcher.unsubscribe(responseInbox);
×
1378
                }
1379
            });
1✔
1380
            responsesAwaiting.put(sub.getSID(), future);
1✔
1381
        }
1382

1383
        publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1384
        statistics.incrementRequestsSent();
1✔
1385

1386
        return future;
1✔
1387
    }
1388

1389
    void deliverReply(Message msg) {
1390
        boolean oldStyle = options.isOldRequestStyle();
1✔
1391
        String subject = msg.getSubject();
1✔
1392
        String token = getResponseToken(subject);
1✔
1393
        String key = oldStyle ? msg.getSID() : token;
1✔
1394
        NatsRequestCompletableFuture f = responsesAwaiting.remove(key);
1✔
1395
        if (f != null) {
1✔
1396
            if (advancedTracking) {
1✔
1397
                responsesRespondedTo.put(key, f);
1✔
1398
            }
1399
            statistics.decrementOutstandingRequests();
1✔
1400
            if (msg.isStatusMessage() && msg.getStatus().getCode() == 503) {
1✔
1401
                switch (f.getCancelAction()) {
1✔
1402
                    case COMPLETE:
1403
                        f.complete(msg);
1✔
1404
                        break;
1✔
1405
                    case REPORT:
1406
                        f.completeExceptionally(new JetStreamStatusException(msg.getStatus()));
1✔
1407
                        break;
1✔
1408
                    case CANCEL:
1409
                    default:
1410
                        f.cancel(true);
1✔
1411
                }
1412
            }
1413
            else {
1414
                f.complete(msg);
1✔
1415
            }
1416
            statistics.incrementRepliesReceived();
1✔
1417
        }
1418
        else if (!oldStyle && !subject.startsWith(mainInbox)) {
1✔
1419
            if (advancedTracking) {
1✔
1420
                if (responsesRespondedTo.get(key) != null) {
1✔
1421
                    statistics.incrementDuplicateRepliesReceived();
1✔
1422
                } else {
1423
                    statistics.incrementOrphanRepliesReceived();
1✔
1424
                }
1425
            }
1426
        }
1427
    }
1✔
1428

1429
    public Dispatcher createDispatcher() {
1430
        return createDispatcher(null);
1✔
1431
    }
1432

1433
    public Dispatcher createDispatcher(MessageHandler handler) {
1434
        if (isClosed()) {
1✔
1435
            throw new IllegalStateException("Connection is Closed");
1✔
1436
        } else if (isDraining()) {
1✔
1437
            throw new IllegalStateException("Connection is Draining");
1✔
1438
        }
1439

1440
        NatsDispatcher dispatcher = dispatcherFactory.createDispatcher(this, handler);
1✔
1441
        String id = this.nuid.next();
1✔
1442
        this.dispatchers.put(id, dispatcher);
1✔
1443
        dispatcher.start(id);
1✔
1444
        return dispatcher;
1✔
1445
    }
1446

1447
    public void closeDispatcher(Dispatcher d) {
1448
        if (isClosed()) {
1✔
1449
            throw new IllegalStateException("Connection is Closed");
1✔
1450
        } else if (!(d instanceof NatsDispatcher)) {
1✔
1451
            throw new IllegalArgumentException("Connection can only manage its own dispatchers");
×
1452
        }
1453

1454
        NatsDispatcher nd = ((NatsDispatcher) d);
1✔
1455

1456
        if (nd.isDraining()) {
1✔
1457
            return; // No op while draining
1✔
1458
        }
1459

1460
        if (!this.dispatchers.containsKey(nd.getId())) {
1✔
1461
            throw new IllegalArgumentException("Dispatcher is already closed.");
1✔
1462
        }
1463

1464
        cleanupDispatcher(nd);
1✔
1465
    }
1✔
1466

1467
    void cleanupDispatcher(NatsDispatcher nd) {
1468
        nd.stop(true);
1✔
1469
        this.dispatchers.remove(nd.getId());
1✔
1470
    }
1✔
1471

1472
    Map<String, Dispatcher> getDispatchers() {
1473
        return Collections.unmodifiableMap(dispatchers);
1✔
1474
    }
1475

1476
    public void addConnectionListener(ConnectionListener connectionListener) {
1477
        connectionListeners.add(connectionListener);
1✔
1478
    }
1✔
1479

1480
    public void removeConnectionListener(ConnectionListener connectionListener) {
1481
        connectionListeners.remove(connectionListener);
1✔
1482
    }
1✔
1483

1484
    public void flush(Duration timeout) throws TimeoutException, InterruptedException {
1485

1486
        Instant start = Instant.now();
1✔
1487
        waitForConnectOrClose(timeout);
1✔
1488

1489
        if (isClosed()) {
1✔
1490
            throw new TimeoutException("Attempted to flush while closed");
1✔
1491
        }
1492

1493
        if (timeout == null) {
1✔
1494
            timeout = Duration.ZERO;
1✔
1495
        }
1496

1497
        Instant now = Instant.now();
1✔
1498
        Duration waitTime = Duration.between(start, now);
1✔
1499

1500
        if (!timeout.equals(Duration.ZERO) && waitTime.compareTo(timeout) >= 0) {
1✔
1501
            throw new TimeoutException("Timeout out waiting for connection before flush.");
1✔
1502
        }
1503

1504
        try {
1505
            Future<Boolean> waitForIt = sendPing();
1✔
1506

1507
            if (waitForIt == null) { // error in the send ping code
1✔
1508
                return;
×
1509
            }
1510

1511
            long nanos = timeout.toNanos();
1✔
1512

1513
            if (nanos > 0) {
1✔
1514

1515
                nanos -= waitTime.toNanos();
1✔
1516

1517
                if (nanos <= 0) {
1✔
1518
                    nanos = 1; // let the future timeout if it isn't resolved
×
1519
                }
1520

1521
                waitForIt.get(nanos, TimeUnit.NANOSECONDS);
1✔
1522
            } else {
1523
                waitForIt.get();
1✔
1524
            }
1525

1526
            this.statistics.incrementFlushCounter();
1✔
1527
        } catch (ExecutionException | CancellationException e) {
1✔
1528
            throw new TimeoutException(e.toString());
1✔
1529
        }
1✔
1530
    }
1✔
1531

1532
    void sendConnect(NatsUri nuri) throws IOException {
1533
        try {
1534
            ServerInfo info = this.serverInfo.get();
1✔
1535
            // This is changed - we used to use info.isAuthRequired(), but are changing it to
1536
            // better match older versions of the server. It may change again in the future.
1537
            CharBuffer connectOptions = options.buildProtocolConnectOptionsString(
1✔
1538
                nuri.toString(), true, info.getNonce());
1✔
1539
            ByteArrayBuilder bab =
1✔
1540
                new ByteArrayBuilder(OP_CONNECT_SP_LEN + connectOptions.limit(), UTF_8)
1✔
1541
                    .append(CONNECT_SP_BYTES).append(connectOptions);
1✔
1542
            queueInternalOutgoing(new ProtocolMessage(bab, false));
1✔
1543
        } catch (Exception exp) {
1✔
1544
            throw new IOException("Error sending connect string", exp);
1✔
1545
        }
1✔
1546
    }
1✔
1547

1548
    CompletableFuture<Boolean> sendPing() {
1549
        return this.sendPing(true);
1✔
1550
    }
1551

1552
    void softPing() {
1553
        this.sendPing(false);
1✔
1554
    }
1✔
1555

1556
    /**
1557
     * {@inheritDoc}
1558
     */
1559
    @Override
1560
    public Duration RTT() throws IOException {
1561
        if (!isConnected()) {
1✔
1562
            throw new IOException("Must be connected to do RTT.");
1✔
1563
        }
1564

1565
        long timeout = options.getConnectionTimeout().toMillis();
1✔
1566
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
1✔
1567
        pongQueue.add(pongFuture);
1✔
1568
        try {
1569
            long time = NatsSystemClock.nanoTime();
1✔
1570
            writer.queueInternalMessage(new ProtocolMessage(PING_PROTO));
1✔
1571
            pongFuture.get(timeout, TimeUnit.MILLISECONDS);
1✔
1572
            return Duration.ofNanos(NatsSystemClock.nanoTime() - time);
1✔
1573
        }
1574
        catch (ExecutionException e) {
×
1575
            throw new IOException(e.getCause());
×
1576
        }
1577
        catch (TimeoutException e) {
×
1578
            throw new IOException(e);
×
1579
        }
1580
        catch (InterruptedException e) {
×
1581
            Thread.currentThread().interrupt();
×
1582
            throw new IOException(e);
×
1583
        }
1584
    }
1585

1586
    // Send a ping request and push a pong future on the queue.
1587
    // Futures are completed in order, keep this one if a thread wants to wait
1588
    // for a specific pong. Note, if no pong returns, the wait will not return
1589
    // without setting a timeout.
1590
    CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
1591
        if (!isConnectedOrConnecting()) {
1✔
1592
            CompletableFuture<Boolean> retVal = new CompletableFuture<>();
1✔
1593
            retVal.complete(Boolean.FALSE);
1✔
1594
            return retVal;
1✔
1595
        }
1596

1597
        if (!treatAsInternal && !this.needPing.get()) {
1✔
1598
            CompletableFuture<Boolean> retVal = new CompletableFuture<>();
1✔
1599
            retVal.complete(Boolean.TRUE);
1✔
1600
            this.needPing.set(true);
1✔
1601
            return retVal;
1✔
1602
        }
1603

1604
        int max = options.getMaxPingsOut();
1✔
1605
        if (max > 0 && pongQueue.size() + 1 > max) {
1✔
1606
            handleCommunicationIssue(new IllegalStateException("Max outgoing Ping count exceeded."));
1✔
1607
            return null;
1✔
1608
        }
1609

1610
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
1✔
1611
        pongQueue.add(pongFuture);
1✔
1612

1613
        if (treatAsInternal) {
1✔
1614
            queueInternalOutgoing(new ProtocolMessage(PING_PROTO));
1✔
1615
        } else {
1616
            queueOutgoing(new ProtocolMessage(PING_PROTO));
1✔
1617
        }
1618

1619
        this.needPing.set(true);
1✔
1620
        this.statistics.incrementPingCount();
1✔
1621
        return pongFuture;
1✔
1622
    }
1623

1624
    // This is a minor speed / memory enhancement.
1625
    // We can't reuse the same instance of any NatsMessage b/c of the "NatsMessage next" state,
1626
    // but it is safe to share the data bytes and the size since those fields are just being read
1627
    // This constructor "ProtocolMessage(ProtocolMessage pm)" shares the data and size
1628
    // reducing allocation of data for something that is often created and used.
1629
    // These static instances are the ones that are used for copying in sendPing and sendPong
1630
    private static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES);
1✔
1631
    private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES);
1✔
1632

1633
    void sendPong() {
1634
        queueInternalOutgoing(new ProtocolMessage(PONG_PROTO));
1✔
1635
    }
1✔
1636

1637
    // Called by the reader
1638
    void handlePong() {
1639
        CompletableFuture<Boolean> pongFuture = pongQueue.pollFirst();
1✔
1640
        if (pongFuture != null) {
1✔
1641
            pongFuture.complete(Boolean.TRUE);
1✔
1642
        }
1643
    }
1✔
1644

1645
    void readInitialInfo() throws IOException {
1646
        byte[] readBuffer = new byte[options.getBufferSize()];
1✔
1647
        ByteBuffer protocolBuffer = ByteBuffer.allocate(options.getBufferSize());
1✔
1648
        boolean gotCRLF = false;
1✔
1649
        boolean gotCR = false;
1✔
1650

1651
        while (!gotCRLF) {
1✔
1652
            int read = this.dataPort.read(readBuffer, 0, readBuffer.length);
1✔
1653

1654
            if (read < 0) {
1✔
1655
                break;
1✔
1656
            }
1657

1658
            int i = 0;
1✔
1659
            while (i < read) {
1✔
1660
                byte b = readBuffer[i++];
1✔
1661

1662
                if (gotCR) {
1✔
1663
                    if (b != LF) {
1✔
1664
                        throw new IOException("Missed LF after CR waiting for INFO.");
1✔
1665
                    } else if (i < read) {
1✔
1666
                        throw new IOException("Read past initial info message.");
1✔
1667
                    }
1668

1669
                    gotCRLF = true;
1✔
1670
                    break;
1✔
1671
                }
1672

1673
                if (b == CR) {
1✔
1674
                    gotCR = true;
1✔
1675
                } else {
1676
                    if (!protocolBuffer.hasRemaining()) {
1✔
1677
                        protocolBuffer = enlargeBuffer(protocolBuffer); // just double it
1✔
1678
                    }
1679
                    protocolBuffer.put(b);
1✔
1680
                }
1681
            }
1✔
1682
        }
1✔
1683

1684
        if (!gotCRLF) {
1✔
1685
            throw new IOException("Failed to read initial info message.");
1✔
1686
        }
1687

1688
        protocolBuffer.flip();
1✔
1689

1690
        String infoJson = UTF_8.decode(protocolBuffer).toString();
1✔
1691
        infoJson = infoJson.trim();
1✔
1692
        String[] msg = infoJson.split("\\s");
1✔
1693
        String op = msg[0].toUpperCase();
1✔
1694

1695
        if (!OP_INFO.equals(op)) {
1✔
1696
            throw new IOException("Received non-info initial message.");
1✔
1697
        }
1698

1699
        handleInfo(infoJson);
1✔
1700
    }
1✔
1701

1702
    void handleInfo(String infoJson) {
1703
        ServerInfo serverInfo = new ServerInfo(infoJson);
1✔
1704
        this.serverInfo.set(serverInfo);
1✔
1705

1706
        List<String> urls = this.serverInfo.get().getConnectURLs();
1✔
1707
        if (!urls.isEmpty()) {
1✔
1708
            if (serverPool.acceptDiscoveredUrls(urls)) {
1✔
1709
                processConnectionEvent(Events.DISCOVERED_SERVERS);
1✔
1710
            }
1711
        }
1712

1713
        if (serverInfo.isLameDuckMode()) {
1✔
1714
            processConnectionEvent(Events.LAME_DUCK);
1✔
1715
        }
1716
    }
1✔
1717

1718
    void queueOutgoing(NatsMessage msg) {
1719
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
1✔
1720
            throw new IllegalArgumentException("Control line is too long");
1✔
1721
        }
1722
        if (!writer.queue(msg)) {
1✔
1723
            options.getErrorListener().messageDiscarded(this, msg);
1✔
1724
        }
1725
    }
1✔
1726

1727
    void queueInternalOutgoing(NatsMessage msg) {
1728
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
1✔
1729
            throw new IllegalArgumentException("Control line is too long");
×
1730
        }
1731
        this.writer.queueInternalMessage(msg);
1✔
1732
    }
1✔
1733

1734
    void deliverMessage(NatsMessage msg) {
1735
        this.needPing.set(false);
1✔
1736
        this.statistics.incrementInMsgs();
1✔
1737
        this.statistics.incrementInBytes(msg.getSizeInBytes());
1✔
1738

1739
        NatsSubscription sub = subscribers.get(msg.getSID());
1✔
1740

1741
        if (sub != null) {
1✔
1742
            msg.setSubscription(sub);
1✔
1743

1744
            NatsDispatcher d = sub.getNatsDispatcher();
1✔
1745
            NatsConsumer c = (d == null) ? sub : d;
1✔
1746
            MessageQueue q = ((d == null) ? sub.getMessageQueue() : d.getMessageQueue());
1✔
1747

1748
            if (c.hasReachedPendingLimits()) {
1✔
1749
                // Drop the message and count it
1750
                this.statistics.incrementDroppedCount();
1✔
1751
                c.incrementDroppedCount();
1✔
1752

1753
                // Notify the first time
1754
                if (!c.isMarkedSlow()) {
1✔
1755
                    c.markSlow();
1✔
1756
                    processSlowConsumer(c);
1✔
1757
                }
1758
            } else if (q != null) {
1✔
1759
                c.markNotSlow();
1✔
1760

1761
                // beforeQueueProcessor returns true if the message is allowed to be queued
1762
                if (sub.getBeforeQueueProcessor().apply(msg)) {
1✔
1763
                    q.push(msg);
1✔
1764
                }
1765
            }
1766

1767
        }
1768
//        else {
1769
//            // Drop messages we don't have a subscriber for (could be extras on an
1770
//            // auto-unsub for example)
1771
//        }
1772
    }
1✔
1773

1774
    void processOK() {
1775
        this.statistics.incrementOkCount();
1✔
1776
    }
1✔
1777

1778
    void processSlowConsumer(Consumer consumer) {
1779
        if (!this.callbackRunner.isShutdown()) {
1✔
1780
            try {
1781
                this.callbackRunner.execute(() -> {
1✔
1782
                    try {
1783
                        options.getErrorListener().slowConsumerDetected(this, consumer);
1✔
1784
                    } catch (Exception ex) {
1✔
1785
                        this.statistics.incrementExceptionCount();
1✔
1786
                    }
1✔
1787
                });
1✔
1788
            } catch (RejectedExecutionException re) {
×
1789
                // Timing with shutdown, let it go
1790
            }
1✔
1791
        }
1792
    }
1✔
1793

1794
    void processException(Exception exp) {
1795
        this.statistics.incrementExceptionCount();
1✔
1796

1797
        if (!this.callbackRunner.isShutdown()) {
1✔
1798
            try {
1799
                this.callbackRunner.execute(() -> {
1✔
1800
                    try {
1801
                        options.getErrorListener().exceptionOccurred(this, exp);
1✔
1802
                    } catch (Exception ex) {
1✔
1803
                        this.statistics.incrementExceptionCount();
1✔
1804
                    }
1✔
1805
                });
1✔
UNCOV
1806
            } catch (RejectedExecutionException re) {
×
1807
                // Timing with shutdown, let it go
1808
            }
1✔
1809
        }
1810
    }
1✔
1811

1812
    void processError(String errorText) {
1813
        this.statistics.incrementErrCount();
1✔
1814

1815
        this.lastError.set(errorText);
1✔
1816
        this.connectError.set(errorText); // even if this isn't during connection, save it just in case
1✔
1817

1818
        // If we are connected && we get an authentication error, save it
1819
        if (this.isConnected() && this.isAuthenticationError(errorText) && currentServer != null) {
1✔
1820
            this.serverAuthErrors.put(currentServer, errorText);
1✔
1821
        }
1822

1823
        if (!this.callbackRunner.isShutdown()) {
1✔
1824
            try {
1825
                this.callbackRunner.execute(() -> {
1✔
1826
                    try {
1827
                        options.getErrorListener().errorOccurred(this, errorText);
1✔
1828
                    } catch (Exception ex) {
1✔
1829
                        this.statistics.incrementExceptionCount();
1✔
1830
                    }
1✔
1831
                });
1✔
1832
            } catch (RejectedExecutionException re) {
×
1833
                // Timing with shutdown, let it go
1834
            }
1✔
1835
        }
1836
    }
1✔
1837

1838
    interface ErrorListenerCaller {
1839
        void call(Connection conn, ErrorListener el);
1840
    }
1841

1842
    void executeCallback(ErrorListenerCaller elc) {
1843
        if (!this.callbackRunner.isShutdown()) {
1✔
1844
            try {
1845
                this.callbackRunner.execute(() -> elc.call(this, options.getErrorListener()));
1✔
1846
            } catch (RejectedExecutionException re) {
×
1847
                // Timing with shutdown, let it go
1848
            }
1✔
1849
        }
1850
    }
1✔
1851

1852
    void processConnectionEvent(Events type) {
1853
        if (!this.callbackRunner.isShutdown()) {
1✔
1854
            try {
1855
                for (ConnectionListener listener : connectionListeners) {
1✔
1856
                    this.callbackRunner.execute(() -> {
1✔
1857
                        try {
1858
                            listener.connectionEvent(this, type);
1✔
1859
                        } catch (Exception ex) {
1✔
1860
                            this.statistics.incrementExceptionCount();
1✔
1861
                        }
1✔
1862
                    });
1✔
1863
                }
1✔
1864
            } catch (RejectedExecutionException re) {
×
1865
                // Timing with shutdown, let it go
1866
            }
1✔
1867
        }
1868
    }
1✔
1869

1870
    /**
1871
     * {@inheritDoc}
1872
     */
1873
    @Override
1874
    public ServerInfo getServerInfo() {
1875
        return getInfo();
1✔
1876
    }
1877

1878
    /**
1879
     * {@inheritDoc}
1880
     */
1881
    @Override
1882
    public InetAddress getClientInetAddress() {
1883
        try {
1884
            return NatsInetAddress.getByName(getInfo().getClientIp());
1✔
1885
        }
1886
        catch (Exception e) {
×
1887
            return null;
×
1888
        }
1889
    }
1890

1891
    ServerInfo getInfo() {
1892
        return this.serverInfo.get();
1✔
1893
    }
1894

1895
    /**
1896
     * {@inheritDoc}
1897
     */
1898
    @Override
1899
    public Options getOptions() {
1900
        return this.options;
1✔
1901
    }
1902

1903
    /**
1904
     * {@inheritDoc}
1905
     */
1906
    @Override
1907
    public Statistics getStatistics() {
1908
        return this.statistics;
1✔
1909
    }
1910

1911
    StatisticsCollector getNatsStatistics() {
1912
        return this.statistics;
1✔
1913
    }
1914

1915
    DataPort getDataPort() {
1916
        return this.dataPort;
1✔
1917
    }
1918

1919
    // Used for testing
1920
    int getConsumerCount() {
1921
        return this.subscribers.size() + this.dispatchers.size();
1✔
1922
    }
1923

1924
    public long getMaxPayload() {
1925
        ServerInfo info = this.serverInfo.get();
1✔
1926

1927
        if (info == null) {
1✔
1928
            return -1;
×
1929
        }
1930

1931
        return info.getMaxPayload();
1✔
1932
    }
1933

1934
    /**
1935
     * Return the list of known server urls, including additional servers discovered
1936
     * after a connection has been established.
1937
     * @return this connection's list of known server URLs
1938
     */
1939
    public Collection<String> getServers() {
1940
        return serverPool.getServerList();
1✔
1941
    }
1942

1943
    protected List<NatsUri> resolveHost(NatsUri nuri) {
1944
        // 1. If the nuri host is not already an ip address or the nuri is not for websocket or fast fallback is disabled,
1945
        //    let the pool resolve it.
1946
        List<NatsUri> results = new ArrayList<>();
1✔
1947
        if (!nuri.hostIsIpAddress() && !nuri.isWebsocket() && !options.isEnableFastFallback()) {
1✔
1948
            List<String> ips = serverPool.resolveHostToIps(nuri.getHost());
1✔
1949
            if (ips != null) {
1✔
1950
                for (String ip : ips) {
1✔
1951
                    try {
1952
                        results.add(nuri.reHost(ip));
1✔
1953
                    }
1954
                    catch (URISyntaxException u) {
1✔
1955
                        // ??? should never happen
1956
                    }
1✔
1957
                }
1✔
1958
            }
1959
        }
1960

1961
        // 2. If there were no results,
1962
        //    - host was already an ip address or
1963
        //    - host was for websocket or
1964
        //    - fast fallback is enabled
1965
        //    - pool returned nothing or
1966
        //    - resolving failed...
1967
        //    so the list just becomes the original host.
1968
        if (results.isEmpty()) {
1✔
1969
            results.add(nuri);
1✔
1970
        }
1971
        return results;
1✔
1972
    }
1973

1974
    /**
1975
     * {@inheritDoc}
1976
     */
1977
    @Override
1978
    public String getConnectedUrl() {
1979
        return currentServer == null ? null : currentServer.toString();
1✔
1980
    }
1981

1982
    /**
1983
     * {@inheritDoc}
1984
     */
1985
    @Override
1986
    public Status getStatus() {
1987
        return this.status;
1✔
1988
    }
1989

1990
    /**
1991
     * {@inheritDoc}
1992
     */
1993
    @Override
1994
    public String getLastError() {
1995
        return this.lastError.get();
1✔
1996
    }
1997

1998
    /**
1999
     * {@inheritDoc}
2000
     */
2001
    @Override
2002
    public void clearLastError() {
2003
        this.lastError.set("");
1✔
2004
    }
1✔
2005

2006
    ExecutorService getExecutor() {
2007
        return executor;
1✔
2008
    }
2009

2010
    ScheduledExecutorService getScheduledExecutor() {
2011
        return scheduledExecutor;
1✔
2012
    }
2013

2014
    void updateStatus(Status newStatus) {
2015
        Status oldStatus = this.status;
1✔
2016

2017
        statusLock.lock();
1✔
2018
        try {
2019
            if (oldStatus == Status.CLOSED || newStatus == oldStatus) {
1✔
2020
                return;
1✔
2021
            }
2022
            this.status = newStatus;
1✔
2023
        } finally {
2024
            statusChanged.signalAll();
1✔
2025
            statusLock.unlock();
1✔
2026
        }
2027

2028
        if (this.status == Status.DISCONNECTED) {
1✔
2029
            processConnectionEvent(Events.DISCONNECTED);
1✔
2030
        } else if (this.status == Status.CLOSED) {
1✔
2031
            processConnectionEvent(Events.CLOSED);
1✔
2032
        } else if (oldStatus == Status.RECONNECTING && this.status == Status.CONNECTED) {
1✔
2033
            processConnectionEvent(Events.RECONNECTED);
1✔
2034
        } else if (this.status == Status.CONNECTED) {
1✔
2035
            processConnectionEvent(Events.CONNECTED);
1✔
2036
        }
2037
    }
1✔
2038

2039
    boolean isClosing() {
2040
        return this.closing;
1✔
2041
    }
2042

2043
    boolean isClosed() {
2044
        return this.status == Status.CLOSED;
1✔
2045
    }
2046

2047
    boolean isConnected() {
2048
        return this.status == Status.CONNECTED;
1✔
2049
    }
2050

2051
    boolean isDisconnected() {
2052
        return this.status == Status.DISCONNECTED;
×
2053
    }
2054

2055
    boolean isConnectedOrConnecting() {
2056
        statusLock.lock();
1✔
2057
        try {
2058
            return this.status == Status.CONNECTED || this.connecting;
1✔
2059
        } finally {
2060
            statusLock.unlock();
1✔
2061
        }
2062
    }
2063

2064
    boolean isDisconnectingOrClosed() {
2065
        statusLock.lock();
1✔
2066
        try {
2067
            return this.status == Status.CLOSED || this.disconnecting;
1✔
2068
        } finally {
2069
            statusLock.unlock();
1✔
2070
        }
2071
    }
2072

2073
    boolean isDisconnecting() {
2074
        statusLock.lock();
1✔
2075
        try {
2076
            return this.disconnecting;
1✔
2077
        } finally {
2078
            statusLock.unlock();
1✔
2079
        }
2080
    }
2081

2082
    void waitForDisconnectOrClose(Duration timeout) throws InterruptedException {
2083
        waitWhile(timeout, (Void) -> this.isDisconnecting() && !this.isClosed() );
1✔
2084
    }
1✔
2085

2086
    void waitForConnectOrClose(Duration timeout) throws InterruptedException {
2087
        waitWhile(timeout, (Void) -> !this.isConnected() && !this.isClosed());
1✔
2088
    }
1✔
2089

2090
    void waitWhile(Duration timeout, Predicate<Void> waitWhileTrue) throws InterruptedException {
2091
        statusLock.lock();
1✔
2092
        try {
2093
            long currentWaitNanos = (timeout != null) ? timeout.toNanos() : -1;
1✔
2094
            long start = NatsSystemClock.nanoTime();
1✔
2095
            while (currentWaitNanos >= 0 && waitWhileTrue.test(null)) {
1✔
2096
                if (currentWaitNanos > 0) {
1✔
2097
                    if (statusChanged.await(currentWaitNanos, TimeUnit.NANOSECONDS) && !waitWhileTrue.test(null)) {
1✔
2098
                        break;
1✔
2099
                    }
2100
                    long now = NatsSystemClock.nanoTime();
1✔
2101
                    currentWaitNanos = currentWaitNanos - (now - start);
1✔
2102
                    start = now;
1✔
2103

2104
                    if (currentWaitNanos <= 0) {
1✔
2105
                        break;
1✔
2106
                    }
2107
                }
1✔
2108
                else {
2109
                    statusChanged.await();
×
2110
                }
2111
            }
2112
        }
2113
        finally {
2114
            statusLock.unlock();
1✔
2115
        }
2116
    }
1✔
2117

2118
    void invokeReconnectDelayHandler(long totalRounds) {
2119
        long currentWaitNanos = 0;
1✔
2120

2121
        ReconnectDelayHandler handler = options.getReconnectDelayHandler();
1✔
2122
        if (handler == null) {
1✔
2123
            Duration dur = options.getReconnectWait();
1✔
2124
            if (dur != null) {
1✔
2125
                currentWaitNanos = dur.toNanos();
1✔
2126
                dur = serverPool.hasSecureServer() ? options.getReconnectJitterTls() : options.getReconnectJitter();
1✔
2127
                if (dur != null) {
1✔
2128
                    currentWaitNanos += ThreadLocalRandom.current().nextLong(dur.toNanos());
1✔
2129
                }
2130
            }
2131
        }
1✔
2132
        else {
2133
            Duration waitTime = handler.getWaitTime(totalRounds);
1✔
2134
            if (waitTime != null) {
1✔
2135
                currentWaitNanos = waitTime.toNanos();
1✔
2136
            }
2137
        }
2138

2139
        this.reconnectWaiter = new CompletableFuture<>();
1✔
2140

2141
        long start = NatsSystemClock.nanoTime();
1✔
2142
        while (currentWaitNanos > 0 && !isDisconnectingOrClosed() && !isConnected() && !this.reconnectWaiter.isDone()) {
1✔
2143
            try {
2144
                this.reconnectWaiter.get(currentWaitNanos, TimeUnit.NANOSECONDS);
×
2145
            } catch (Exception exp) {
1✔
2146
                // ignore, try to loop again
2147
            }
×
2148
            long now = NatsSystemClock.nanoTime();
1✔
2149
            currentWaitNanos = currentWaitNanos - (now - start);
1✔
2150
            start = now;
1✔
2151
        }
1✔
2152

2153
        this.reconnectWaiter.complete(Boolean.TRUE);
1✔
2154
    }
1✔
2155

2156
    ByteBuffer enlargeBuffer(ByteBuffer buffer) {
2157
        int current = buffer.capacity();
1✔
2158
        int newSize = current * 2;
1✔
2159
        ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
1✔
2160
        buffer.flip();
1✔
2161
        newBuffer.put(buffer);
1✔
2162
        return newBuffer;
1✔
2163
    }
2164

2165
    // For testing
2166
    NatsConnectionReader getReader() {
2167
        return this.reader;
1✔
2168
    }
2169

2170
    // For testing
2171
    NatsConnectionWriter getWriter() {
2172
        return this.writer;
1✔
2173
    }
2174

2175
    // For testing
2176
    Future<DataPort> getDataPortFuture() {
2177
        return this.dataPortFuture;
1✔
2178
    }
2179

2180
    boolean isDraining() {
2181
        return this.draining.get() != null;
1✔
2182
    }
2183

2184
    boolean isDrained() {
2185
        CompletableFuture<Boolean> tracker = this.draining.get();
1✔
2186

2187
        try {
2188
            if (tracker != null && tracker.getNow(false)) {
1✔
2189
                return true;
1✔
2190
            }
2191
        } catch (Exception e) {
×
2192
            // These indicate the tracker was cancelled/timed out
2193
        }
1✔
2194

2195
        return false;
1✔
2196
    }
2197

2198
    /**
2199
     * {@inheritDoc}
2200
     */
2201
    @Override
2202
    public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutException, InterruptedException {
2203

2204
        if (isClosing() || isClosed()) {
1✔
2205
            throw new IllegalStateException("A connection can't be drained during close.");
1✔
2206
        }
2207

2208
        this.statusLock.lock();
1✔
2209
        try {
2210
            if (isDraining()) {
1✔
2211
                return this.draining.get();
1✔
2212
            }
2213
            this.draining.set(new CompletableFuture<>());
1✔
2214
        } finally {
2215
            this.statusLock.unlock();
1✔
2216
        }
2217

2218
        final CompletableFuture<Boolean> tracker = this.draining.get();
1✔
2219
        Instant start = Instant.now();
1✔
2220

2221
        // Don't include subscribers with dispatchers
2222
        HashSet<NatsSubscription> pureSubscribers = new HashSet<>(this.subscribers.values());
1✔
2223
        pureSubscribers.removeIf((s) -> s.getDispatcher() != null);
1✔
2224

2225
        final HashSet<NatsConsumer> consumers = new HashSet<>();
1✔
2226
        consumers.addAll(pureSubscribers);
1✔
2227
        consumers.addAll(this.dispatchers.values());
1✔
2228

2229
        NatsDispatcher inboxer = this.inboxDispatcher.get();
1✔
2230

2231
        if (inboxer != null) {
1✔
2232
            consumers.add(inboxer);
1✔
2233
        }
2234

2235
        // Stop the consumers NOW so that when this method returns they are blocked
2236
        consumers.forEach((cons) -> {
1✔
2237
            cons.markDraining(tracker);
1✔
2238
            cons.sendUnsubForDrain();
1✔
2239
        });
1✔
2240

2241
        try {
2242
            this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know
1✔
2243
        } catch (Exception e) {
1✔
2244
            this.close(false, false);
1✔
2245
            throw e;
1✔
2246
        }
1✔
2247

2248
        consumers.forEach(NatsConsumer::markUnsubedForDrain);
1✔
2249

2250
        // Wait for the timeout or all consumers are drained
2251
        executor.submit(() -> {
1✔
2252
            try {
2253
                long timeoutNanos = (timeout == null || timeout.toNanos() <= 0)
1✔
2254
                    ? Long.MAX_VALUE : timeout.toNanos();
1✔
2255
                long startTime = System.nanoTime();
1✔
2256
                while (NatsSystemClock.nanoTime() - startTime < timeoutNanos && !Thread.interrupted()) {
1✔
2257
                    consumers.removeIf(NatsConsumer::isDrained);
1✔
2258
                    if (consumers.isEmpty()) {
1✔
2259
                        break;
1✔
2260
                    }
2261
                    //noinspection BusyWait
2262
                    Thread.sleep(1); // Sleep 1 milli
1✔
2263
                }
2264

2265
                // Stop publishing
2266
                this.blockPublishForDrain.set(true);
1✔
2267

2268
                // One last flush
2269
                if (timeout == null || timeout.equals(Duration.ZERO)) {
1✔
2270
                    this.flush(Duration.ZERO);
1✔
2271
                } else {
2272
                    Instant now = Instant.now();
1✔
2273
                    Duration passed = Duration.between(start, now);
1✔
2274
                    Duration newTimeout = timeout.minus(passed);
1✔
2275
                    if (newTimeout.toNanos() > 0) {
1✔
2276
                        this.flush(newTimeout);
1✔
2277
                    }
2278
                }
2279
                this.close(false, false); // close the connection after the last flush
1✔
2280
                tracker.complete(consumers.isEmpty());
1✔
2281
            } catch (TimeoutException e) {
×
2282
                this.processException(e);
×
2283
            } catch (InterruptedException e) {
×
2284
                this.processException(e);
×
2285
                Thread.currentThread().interrupt();
×
2286
            } finally {
2287
                try {
2288
                    this.close(false, false);// close the connection after the last flush
1✔
2289
                } catch (InterruptedException e) {
×
2290
                    processException(e);
×
2291
                    Thread.currentThread().interrupt();
×
2292
                }
1✔
2293
                tracker.complete(false);
1✔
2294
            }
2295
        });
1✔
2296

2297
        return tracker;
1✔
2298
    }
2299

2300
    boolean isAuthenticationError(String err) {
2301
        if (err == null) {
1✔
2302
            return false;
1✔
2303
        }
2304
        err = err.toLowerCase();
1✔
2305
        return err.startsWith("user authentication")
1✔
2306
            || err.contains("authorization violation")
1✔
2307
            || err.startsWith("account authentication expired");
1✔
2308
    }
2309

2310
    /**
2311
     * {@inheritDoc}
2312
     */
2313
    @Override
2314
    public void flushBuffer() throws IOException {
2315
        if (!isConnected()) {
1✔
2316
            throw new IllegalStateException("Connection is not active.");
1✔
2317
        }
2318
        writer.flushBuffer();
1✔
2319
    }
1✔
2320

2321
    /**
2322
     * {@inheritDoc}
2323
     */
2324
    @Override
2325
    public StreamContext getStreamContext(String streamName) throws IOException, JetStreamApiException {
2326
        Validator.validateStreamName(streamName, true);
1✔
2327
        ensureNotClosing();
1✔
2328
        return new NatsStreamContext(streamName, null, this, null);
1✔
2329
    }
2330

2331
    /**
2332
     * {@inheritDoc}
2333
     */
2334
    @Override
2335
    public StreamContext getStreamContext(String streamName, JetStreamOptions options) throws IOException, JetStreamApiException {
2336
        Validator.validateStreamName(streamName, true);
1✔
2337
        ensureNotClosing();
1✔
2338
        return new NatsStreamContext(streamName, null, this, options);
1✔
2339
    }
2340

2341
    /**
2342
     * {@inheritDoc}
2343
     */
2344
    @Override
2345
    public ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException {
2346
        return getStreamContext(streamName).getConsumerContext(consumerName);
1✔
2347
    }
2348

2349
    /**
2350
     * {@inheritDoc}
2351
     */
2352
    @Override
2353
    public ConsumerContext getConsumerContext(String streamName, String consumerName, JetStreamOptions options) throws IOException, JetStreamApiException {
2354
        return getStreamContext(streamName, options).getConsumerContext(consumerName);
1✔
2355
    }
2356

2357
    /**
2358
     * {@inheritDoc}
2359
     */
2360
    @Override
2361
    public JetStream jetStream() throws IOException {
2362
        ensureNotClosing();
1✔
2363
        return new NatsJetStream(this, null);
1✔
2364
    }
2365

2366
    /**
2367
     * {@inheritDoc}
2368
     */
2369
    @Override
2370
    public JetStream jetStream(JetStreamOptions options) throws IOException {
2371
        ensureNotClosing();
1✔
2372
        return new NatsJetStream(this, options);
1✔
2373
    }
2374

2375
    /**
2376
     * {@inheritDoc}
2377
     */
2378
    @Override
2379
    public JetStreamManagement jetStreamManagement() throws IOException {
2380
        ensureNotClosing();
1✔
2381
        return new NatsJetStreamManagement(this, null);
1✔
2382
    }
2383

2384
    /**
2385
     * {@inheritDoc}
2386
     */
2387
    @Override
2388
    public JetStreamManagement jetStreamManagement(JetStreamOptions options) throws IOException {
2389
        ensureNotClosing();
1✔
2390
        return new NatsJetStreamManagement(this, options);
1✔
2391
    }
2392

2393
    /**
2394
     * {@inheritDoc}
2395
     */
2396
    @Override
2397
    public KeyValue keyValue(String bucketName) throws IOException {
2398
        Validator.validateBucketName(bucketName, true);
1✔
2399
        ensureNotClosing();
1✔
2400
        return new NatsKeyValue(this, bucketName, null);
1✔
2401
    }
2402

2403
    /**
2404
     * {@inheritDoc}
2405
     */
2406
    @Override
2407
    public KeyValue keyValue(String bucketName, KeyValueOptions options) throws IOException {
2408
        Validator.validateBucketName(bucketName, true);
1✔
2409
        ensureNotClosing();
1✔
2410
        return new NatsKeyValue(this, bucketName, options);
1✔
2411
    }
2412

2413
    /**
2414
     * {@inheritDoc}
2415
     */
2416
    @Override
2417
    public KeyValueManagement keyValueManagement() throws IOException {
2418
        ensureNotClosing();
1✔
2419
        return new NatsKeyValueManagement(this, null);
1✔
2420
    }
2421

2422
    /**
2423
     * {@inheritDoc}
2424
     */
2425
    @Override
2426
    public KeyValueManagement keyValueManagement(KeyValueOptions options) throws IOException {
2427
        ensureNotClosing();
1✔
2428
        return new NatsKeyValueManagement(this, options);
1✔
2429
    }
2430

2431
    /**
2432
     * {@inheritDoc}
2433
     */
2434
    @Override
2435
    public ObjectStore objectStore(String bucketName) throws IOException {
2436
        Validator.validateBucketName(bucketName, true);
1✔
2437
        ensureNotClosing();
1✔
2438
        return new NatsObjectStore(this, bucketName, null);
1✔
2439
    }
2440

2441
    /**
2442
     * {@inheritDoc}
2443
     */
2444
    @Override
2445
    public ObjectStore objectStore(String bucketName, ObjectStoreOptions options) throws IOException {
2446
        Validator.validateBucketName(bucketName, true);
1✔
2447
        ensureNotClosing();
1✔
2448
        return new NatsObjectStore(this, bucketName, options);
1✔
2449
    }
2450

2451
    /**
2452
     * {@inheritDoc}
2453
     */
2454
    @Override
2455
    public ObjectStoreManagement objectStoreManagement() throws IOException {
2456
        ensureNotClosing();
1✔
2457
        return new NatsObjectStoreManagement(this, null);
1✔
2458
    }
2459

2460
    /**
2461
     * {@inheritDoc}
2462
     */
2463
    @Override
2464
    public ObjectStoreManagement objectStoreManagement(ObjectStoreOptions options) throws IOException {
2465
        ensureNotClosing();
1✔
2466
        return new NatsObjectStoreManagement(this, options);
1✔
2467
    }
2468

2469
    private void ensureNotClosing() throws IOException {
2470
        if (isClosing() || isClosed()) {
1✔
2471
            throw new IOException("A JetStream context can't be established during close.");
1✔
2472
        }
2473
    }
1✔
2474
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc