• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2231

26 Sep 2025 04:51PM UTC coverage: 95.521% (-0.001%) from 95.522%
#2231

push

github

web-flow
Merge pull request #1437 from nats-io/fix_size_in_bytes

Properly return size in bytes

3 of 7 new or added lines in 2 files covered. (42.86%)

6 existing lines in 2 files now uncovered.

12155 of 12725 relevant lines covered (95.52%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.7
/src/main/java/io/nats/client/impl/NatsConnection.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.ConnectionListener.Events;
18
import io.nats.client.api.ServerInfo;
19
import io.nats.client.support.*;
20
import org.jspecify.annotations.NonNull;
21
import org.jspecify.annotations.Nullable;
22

23
import java.io.IOException;
24
import java.net.InetAddress;
25
import java.net.URISyntaxException;
26
import java.nio.ByteBuffer;
27
import java.nio.CharBuffer;
28
import java.time.Duration;
29
import java.time.Instant;
30
import java.util.*;
31
import java.util.concurrent.*;
32
import java.util.concurrent.atomic.AtomicBoolean;
33
import java.util.concurrent.atomic.AtomicLong;
34
import java.util.concurrent.atomic.AtomicReference;
35
import java.util.concurrent.locks.Condition;
36
import java.util.concurrent.locks.ReentrantLock;
37
import java.util.function.Predicate;
38

39
import static io.nats.client.support.NatsConstants.*;
40
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
41
import static io.nats.client.support.Validator.*;
42
import static java.nio.charset.StandardCharsets.UTF_8;
43

44
class NatsConnection implements Connection {
45

46
    public static final double NANOS_PER_SECOND = 1_000_000_000.0;
47

48
    private final Options options;
49
    final boolean forceFlushOnRequest;
50

51
    private final StatisticsCollector statistics;
52

53
    private boolean connecting; // you can only connect in one thread
54
    private boolean disconnecting; // you can only disconnect in one thread
55
    private boolean closing; // respect a close call regardless
56
    private Exception exceptionDuringConnectChange; // exception occurred in another thread while dis/connecting
57
    final ReentrantLock closeSocketLock; // this is not private so it can be tested
58

59
    private Status status;
60
    private final ReentrantLock statusLock;
61
    private final Condition statusChanged;
62

63
    private CompletableFuture<DataPort> dataPortFuture;
64
    private DataPort dataPort;
65
    private NatsUri currentServer;
66
    private CompletableFuture<Boolean> reconnectWaiter;
67
    private final ConcurrentHashMap<NatsUri, String> serverAuthErrors;
68

69
    private NatsConnectionReader reader;
70
    private NatsConnectionWriter writer;
71

72
    private final AtomicReference<ServerInfo> serverInfo;
73

74
    private final Map<String, NatsSubscription> subscribers;
75
    private final Map<String, NatsDispatcher> dispatchers; // use a concurrent map so we get more consistent iteration behavior
76
    private final Collection<ConnectionListener> connectionListeners;
77
    private final Map<String, NatsRequestCompletableFuture> responsesAwaiting;
78
    private final Map<String, NatsRequestCompletableFuture> responsesRespondedTo;
79
    private final ConcurrentLinkedDeque<CompletableFuture<Boolean>> pongQueue;
80

81
    private final String mainInbox;
82
    private final AtomicReference<NatsDispatcher> inboxDispatcher;
83
    private final ReentrantLock inboxDispatcherLock;
84
    private ScheduledTask pingTask;
85
    private ScheduledTask cleanupTask;
86

87
    private final AtomicBoolean needPing;
88

89
    private final AtomicLong nextSid;
90
    private final NUID nuid;
91

92
    private final AtomicReference<String> connectError;
93
    private final AtomicReference<String> lastError;
94
    private final AtomicReference<CompletableFuture<Boolean>> draining;
95
    private final AtomicBoolean blockPublishForDrain;
96
    private final AtomicBoolean tryingToConnect;
97

98
    private final ExecutorService callbackRunner;
99
    private final ExecutorService executor;
100
    private final ExecutorService connectExecutor;
101
    private final ScheduledExecutorService scheduledExecutor;
102
    private final boolean advancedTracking;
103

104
    private final ServerPool serverPool;
105
    private final DispatcherFactory dispatcherFactory;
106
    private final @NonNull CancelAction cancelAction;
107

108
    private final boolean trace;
109
    private final TimeTraceLogger timeTraceLogger;
110

111
    NatsConnection(@NonNull Options options) {
1✔
112
        trace = options.isTraceConnection();
1✔
113
        timeTraceLogger = options.getTimeTraceLogger();
1✔
114
        timeTraceLogger.trace("creating connection object");
1✔
115

116
        this.options = options;
1✔
117
        forceFlushOnRequest = options.forceFlushOnRequest();
1✔
118

119
        advancedTracking = options.isTrackAdvancedStats();
1✔
120
        this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector();
1✔
121
        this.statistics.setAdvancedTracking(advancedTracking);
1✔
122

123
        this.closeSocketLock = new ReentrantLock();
1✔
124

125
        this.statusLock = new ReentrantLock();
1✔
126
        this.statusChanged = this.statusLock.newCondition();
1✔
127
        this.status = Status.DISCONNECTED;
1✔
128
        this.reconnectWaiter = new CompletableFuture<>();
1✔
129
        this.reconnectWaiter.complete(Boolean.TRUE);
1✔
130

131
        this.connectionListeners = ConcurrentHashMap.newKeySet();
1✔
132
        if (options.getConnectionListener() != null) {
1✔
133
            addConnectionListener(options.getConnectionListener());
1✔
134
        }
135

136
        this.dispatchers = new ConcurrentHashMap<>();
1✔
137
        this.subscribers = new ConcurrentHashMap<>();
1✔
138
        this.responsesAwaiting = new ConcurrentHashMap<>();
1✔
139
        this.responsesRespondedTo = new ConcurrentHashMap<>();
1✔
140
        this.serverAuthErrors = new ConcurrentHashMap<>();
1✔
141

142
        this.nextSid = new AtomicLong(1);
1✔
143
        timeTraceLogger.trace("creating NUID");
1✔
144
        this.nuid = new NUID();
1✔
145
        this.mainInbox = createInbox() + ".*";
1✔
146

147
        this.lastError = new AtomicReference<>();
1✔
148
        this.connectError = new AtomicReference<>();
1✔
149

150
        this.serverInfo = new AtomicReference<>(ServerInfo.EMPTY_INFO); // we want serverInfo.get to never return a null
1✔
151
        this.inboxDispatcher = new AtomicReference<>();
1✔
152
        this.inboxDispatcherLock = new ReentrantLock();
1✔
153
        this.pongQueue = new ConcurrentLinkedDeque<>();
1✔
154
        this.draining = new AtomicReference<>();
1✔
155
        this.blockPublishForDrain = new AtomicBoolean();
1✔
156
        this.tryingToConnect = new AtomicBoolean();
1✔
157

158
        timeTraceLogger.trace("creating executors");
1✔
159
        this.executor = options.getExecutor();
1✔
160
        this.callbackRunner = options.getCallbackExecutor();
1✔
161
        this.connectExecutor = options.getConnectExecutor();
1✔
162
        this.scheduledExecutor = options.getScheduledExecutor();
1✔
163

164
        timeTraceLogger.trace("creating reader and writer");
1✔
165
        this.reader = new NatsConnectionReader(this);
1✔
166
        this.writer = new NatsConnectionWriter(this, null);
1✔
167

168
        this.needPing = new AtomicBoolean(true);
1✔
169

170
        serverPool = options.getServerPool() == null ? new NatsServerPool() : options.getServerPool();
1✔
171
        serverPool.initialize(options);
1✔
172
        dispatcherFactory = options.getDispatcherFactory() == null ? new DispatcherFactory() : options.getDispatcherFactory();
1✔
173

174
        cancelAction = options.isReportNoResponders() ? CancelAction.REPORT : CancelAction.CANCEL;
1✔
175

176
        timeTraceLogger.trace("connection object created");
1✔
177
    }
1✔
178

179
    // Connect is only called after creation
180
    void connect(boolean reconnectOnConnect) throws InterruptedException, IOException {
181
        if (!tryingToConnect.get()) {
1✔
182
            try {
183
                tryingToConnect.set(true);
1✔
184
                connectImpl(reconnectOnConnect);
1✔
185
            }
186
            finally {
187
                tryingToConnect.set(false);
1✔
188
            }
189
        }
190
    }
1✔
191

192
    void connectImpl(boolean reconnectOnConnect) throws InterruptedException, IOException {
193
        if (options.getServers().isEmpty()) {
1✔
194
            throw new IllegalArgumentException("No servers provided in options");
×
195
        }
196

197
        boolean trace = options.isTraceConnection();
1✔
198
        long start = NatsSystemClock.nanoTime();
1✔
199

200
        this.lastError.set("");
1✔
201

202
        timeTraceLogger.trace("starting connect loop");
1✔
203

204
        Set<NatsUri> failList = new HashSet<>();
1✔
205
        boolean keepGoing = true;
1✔
206
        NatsUri first = null;
1✔
207
        NatsUri cur;
208
        while (keepGoing && (cur = serverPool.peekNextServer()) != null) {
1✔
209
            if (first == null) {
1✔
210
                first = cur;
1✔
211
            }
212
            else if (cur.equals(first)) {
1✔
213
                break;  // connect only goes through loop once
1✔
214
            }
215
            serverPool.nextServer(); // b/c we only peeked.
1✔
216

217
            // let server pool resolve hostnames, then loop through resolved
218
            List<NatsUri> resolvedList = resolveHost(cur);
1✔
219
            for (NatsUri resolved : resolvedList) {
1✔
220
                if (isClosed()) {
1✔
221
                    keepGoing = false;
1✔
222
                    break;
1✔
223
                }
224
                connectError.set(""); // new on each attempt
1✔
225

226
                timeTraceLogger.trace("setting status to connecting");
1✔
227
                updateStatus(Status.CONNECTING);
1✔
228

229
                timeTraceLogger.trace("trying to connect to %s", cur);
1✔
230
                tryToConnect(cur, resolved, NatsSystemClock.nanoTime());
1✔
231

232
                if (isConnected()) {
1✔
233
                    serverPool.connectSucceeded(cur);
1✔
234
                    keepGoing = false;
1✔
235
                    break;
1✔
236
                }
237

238
                timeTraceLogger.trace("setting status to disconnected");
1✔
239
                updateStatus(Status.DISCONNECTED);
1✔
240

241
                failList.add(cur);
1✔
242
                serverPool.connectFailed(cur);
1✔
243

244
                String err = connectError.get();
1✔
245

246
                if (this.isAuthenticationError(err)) {
1✔
247
                    this.serverAuthErrors.put(resolved, err);
1✔
248
                }
249
            }
1✔
250
        }
1✔
251

252
        if (!isConnected() && !isClosed()) {
1✔
253
            if (reconnectOnConnect) {
1✔
254
                timeTraceLogger.trace("trying to reconnect on connect");
1✔
255
                reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior
1✔
256
            }
257
            else {
258
                timeTraceLogger.trace("connection failed, closing to cleanup");
1✔
259
                close();
1✔
260

261
                String err = connectError.get();
1✔
262
                if (this.isAuthenticationError(err)) {
1✔
263
                    throw new AuthenticationException("Authentication error connecting to NATS server: " + err);
1✔
264
                }
265
                throw new IOException("Unable to connect to NATS servers: " + failList);
1✔
266
            }
267
        }
268
        else if (trace) {
1✔
269
            long end = NatsSystemClock.nanoTime();
1✔
270
            double seconds = ((double) (end - start)) / NANOS_PER_SECOND;
1✔
271
            timeTraceLogger.trace("connect complete in %.3f seconds", seconds);
1✔
272
        }
273
    }
1✔
274

275
    @Override
276
    public void forceReconnect() throws IOException, InterruptedException {
277
        forceReconnect(ForceReconnectOptions.DEFAULT_INSTANCE);
1✔
278
    }
1✔
279

280
    @Override
281
    public void forceReconnect(ForceReconnectOptions options) throws IOException, InterruptedException {
282
        if (!tryingToConnect.get()) {
1✔
283
            try {
284
                tryingToConnect.set(true);
1✔
285
                forceReconnectImpl(options == null ? ForceReconnectOptions.DEFAULT_INSTANCE : options);
1✔
286
            }
287
            finally {
288
                tryingToConnect.set(false);
1✔
289
            }
290
        }
291
    }
1✔
292

293
    void forceReconnectImpl(@NonNull ForceReconnectOptions frOpts) throws InterruptedException {
294
        if (frOpts.getFlushWait() != null) {
1✔
295
            try {
296
                flush(frOpts.getFlushWait());
×
297
            }
298
            catch (TimeoutException e) {
×
299
                // Ignored. Manual test demonstrates that if the connection is dropped
300
                // in the middle of the flush, the most likely reason for a TimeoutException,
301
                // the socket is closed.
302
            }
×
303
        }
304

305
        closeSocketLock.lock();
1✔
306
        try {
307
            updateStatus(Status.DISCONNECTED);
1✔
308

309
            // Close and reset the current data port and future
310
            if (dataPortFuture != null) {
1✔
311
                dataPortFuture.cancel(true);
1✔
312
                dataPortFuture = null;
1✔
313
            }
314

315
            // close the data port as a task so as not to block reconnecting
316
            if (dataPort != null) {
1✔
317
                final DataPort dataPortToClose = dataPort;
1✔
318
                dataPort = null;
1✔
319
                executor.submit(() -> {
1✔
320
                    try {
321
                        if (frOpts.isForceClose()) {
1✔
322
                            dataPortToClose.forceClose();
×
323
                        }
324
                        else {
325
                            dataPortToClose.close();
1✔
326
                        }
327
                    }
328
                    catch (IOException ignore) {
×
329
                        // ignored since running as a task and nothing we can do.
330
                    }
1✔
331
                });
1✔
332
            }
333

334
            // stop i/o
335
            try {
336
                this.reader.stop(false).get(100, TimeUnit.MILLISECONDS);
1✔
337
            }
338
            catch (Exception ex) {
×
339
                processException(ex);
×
340
            }
1✔
341
            try {
342
                this.writer.stop().get(100, TimeUnit.MILLISECONDS);
1✔
343
            }
344
            catch (Exception ex) {
×
345
                processException(ex);
×
346
            }
1✔
347

348
            // new reader/writer
349
            reader = new NatsConnectionReader(this);
1✔
350
            writer = new NatsConnectionWriter(this, writer);
1✔
351
        }
352
        finally {
353
            closeSocketLock.unlock();
1✔
354
        }
355

356
        reconnectImpl();
1✔
357
        writer.setReconnectMode(false);
1✔
358
    }
1✔
359

360
    void reconnect() throws InterruptedException {
361
        if (!tryingToConnect.get()) {
1✔
362
            try {
363
                tryingToConnect.set(true);
1✔
364
                reconnectImpl();
1✔
365
            }
366
            finally {
367
                tryingToConnect.set(false);
1✔
368
            }
369
        }
370
    }
1✔
371

372
    // Reconnect can only be called when the connection is disconnected
373
    void reconnectImpl() throws InterruptedException {
374
        if (isClosed()) {
1✔
375
            return;
1✔
376
        }
377

378
        if (options.getMaxReconnect() == 0) {
1✔
379
            this.close();
1✔
380
            return;
1✔
381
        }
382

383
        writer.setReconnectMode(true);
1✔
384

385
        if (!isConnected() && !isClosed() && !this.isClosing()) {
1✔
386
            boolean keepGoing = true;
1✔
387
            int totalRounds = 0;
1✔
388
            NatsUri first = null;
1✔
389
            NatsUri cur;
390
            while (keepGoing && (cur = serverPool.nextServer()) != null) {
1✔
391
                if (first == null) {
1✔
392
                    first = cur;
1✔
393
                }
394
                else if (first.equals(cur)) {
1✔
395
                    // went around the pool an entire time
396
                    invokeReconnectDelayHandler(++totalRounds);
1✔
397
                }
398

399
                // let server list provider resolve hostnames
400
                // then loop through resolved
401
                List<NatsUri> resolvedList = resolveHost(cur);
1✔
402
                for (NatsUri resolved : resolvedList) {
1✔
403
                    if (isClosed()) {
1✔
404
                        keepGoing = false;
1✔
405
                        break;
1✔
406
                    }
407
                    connectError.set(""); // reset on each loop
1✔
408
                    if (isDisconnectingOrClosed() || this.isClosing()) {
1✔
409
                        keepGoing = false;
1✔
410
                        break;
1✔
411
                    }
412
                    updateStatus(Status.RECONNECTING);
1✔
413

414
                    timeTraceLogger.trace("reconnecting to server %s", cur);
1✔
415
                    tryToConnect(cur, resolved, NatsSystemClock.nanoTime());
1✔
416

417
                    if (isConnected()) {
1✔
418
                        serverPool.connectSucceeded(cur);
1✔
419
                        statistics.incrementReconnects();
1✔
420
                        keepGoing = false;
1✔
421
                        break;
1✔
422
                    }
423

424
                    serverPool.connectFailed(cur);
1✔
425
                    String err = connectError.get();
1✔
426
                    if (this.isAuthenticationError(err)) {
1✔
427
                        if (err.equals(this.serverAuthErrors.get(resolved))) {
1✔
428
                            keepGoing = false; // double auth error
1✔
429
                            break;
1✔
430
                        }
431
                        serverAuthErrors.put(resolved, err);
1✔
432
                    }
433
                }
1✔
434
            }
1✔
435
        } // end-main-loop
436

437
        if (!isConnected()) {
1✔
438
            this.close();
1✔
439
            return;
1✔
440
        }
441

442
        this.subscribers.forEach((sid, sub) -> {
1✔
443
            if (sub.getDispatcher() == null && !sub.isDraining()) {
1✔
444
                sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true);
1✔
445
            }
446
        });
1✔
447

448
        this.dispatchers.forEach((nuid, d) -> {
1✔
449
            if (!d.isDraining()) {
1✔
450
                d.resendSubscriptions();
1✔
451
            }
452
        });
1✔
453

454
        try {
455
            this.flush(this.options.getConnectionTimeout());
1✔
456
        } catch (Exception exp) {
1✔
457
            this.processException(exp);
1✔
458
        }
1✔
459

460
        processConnectionEvent(Events.RESUBSCRIBED);
1✔
461

462
        // When the flush returns, we are done sending internal messages,
463
        // so we can switch to the non-reconnect queue
464
        this.writer.setReconnectMode(false);
1✔
465
    }
1✔
466

467
    long timeCheck(long endNanos, String message) throws TimeoutException {
468
        long remainingNanos = endNanos - NatsSystemClock.nanoTime();
1✔
469
        if (trace) {
1✔
470
            traceTimeCheck(message, remainingNanos);
1✔
471
        }
472
        if (remainingNanos < 0) {
1✔
473
            throw new TimeoutException("connection timed out");
1✔
474
        }
475
        return remainingNanos;
1✔
476
    }
477

478
    void traceTimeCheck(String message, long remainingNanos) {
479
        if (remainingNanos < 0) {
1✔
480
            if (remainingNanos > -1_000_000) { // less than -1 ms
1✔
481
                timeTraceLogger.trace(message + String.format(", %d (ns) beyond timeout", -remainingNanos));
1✔
482
            }
483
            else if (remainingNanos > -1_000_000_000) { // less than -1 second
1✔
484
                long ms = -remainingNanos / 1_000_000;
1✔
485
                timeTraceLogger.trace(message + String.format(", %d (ms) beyond timeout", ms));
1✔
486
            }
1✔
487
            else {
488
                double seconds = ((double)-remainingNanos) / 1_000_000_000.0;
1✔
489
                timeTraceLogger.trace(message + String.format(", %.3f (s) beyond timeout", seconds));
1✔
490
            }
1✔
491
        }
492
        else if (remainingNanos < 1_000_000) {
1✔
493
            timeTraceLogger.trace(message + String.format(", %d (ns) remaining", remainingNanos));
1✔
494
        }
495
        else if (remainingNanos < 1_000_000_000) {
1✔
496
            long ms = remainingNanos / 1_000_000;
1✔
497
            timeTraceLogger.trace(message + String.format(", %d (ms) remaining", ms));
1✔
498
        }
1✔
499
        else {
500
            double seconds = ((double) remainingNanos) / 1_000_000_000.0;
1✔
501
            timeTraceLogger.trace(message + String.format(", %.3f (s) remaining", seconds));
1✔
502
        }
503
    }
1✔
504

505
    // is called from reconnect and connect
506
    // will wait for any previous attempt to complete, using the reader.stop and
507
    // writer.stop
508
    void tryToConnect(NatsUri cur, NatsUri resolved, long now) {
509
        currentServer = null;
1✔
510

511
        try {
512
            Duration connectTimeout = options.getConnectionTimeout();
1✔
513
            boolean trace = options.isTraceConnection();
1✔
514
            long end = now + connectTimeout.toNanos();
1✔
515
            timeCheck(end, "starting connection attempt");
1✔
516

517
            statusLock.lock();
1✔
518
            try {
519
                if (this.connecting) {
1✔
520
                    return;
×
521
                }
522
                this.connecting = true;
1✔
523
                statusChanged.signalAll();
1✔
524
            } finally {
525
                statusLock.unlock();
1✔
526
            }
527

528
            // Create a new future for the DataPort, the reader/writer will use this
529
            // to wait for the connect/failure.
530
            this.dataPortFuture = new CompletableFuture<>();
1✔
531

532
            // Make sure the reader and writer are stopped
533
            long timeoutNanos = timeCheck(end, "waiting for reader");
1✔
534
            if (reader.isRunning()) {
1✔
535
                this.reader.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
×
536
            }
537
            timeoutNanos = timeCheck(end, "waiting for writer");
1✔
538
            if (writer.isRunning()) {
1✔
539
                this.writer.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
×
540
            }
541

542
            timeCheck(end, "cleaning pong queue");
1✔
543
            cleanUpPongQueue();
1✔
544

545
            timeoutNanos = timeCheck(end, "connecting data port");
1✔
546
            DataPort newDataPort = this.options.buildDataPort();
1✔
547
            newDataPort.connect(resolved.toString(), this, timeoutNanos);
1✔
548

549
            // Notify any threads waiting on the sockets
550
            this.dataPort = newDataPort;
1✔
551
            this.dataPortFuture.complete(this.dataPort);
1✔
552

553
            // Wait for the INFO message manually.
554
            // All other traffic will use the reader and writer
555
            // TLS First, don't read info until after upgrade
556
            // ---
557
            // Also this task does not have any exception catching
558
            // Since it is submitted as an async task, the future
559
            // will be aware of any exception thrown, and the future.get()
560
            // will throw an ExecutionException which is handled futher down
561
            Callable<Object> connectTask = () -> {
1✔
562
                if (!options.isTlsFirst()) {
1✔
563
                    readInitialInfo();
1✔
564
                    checkVersionRequirements();
1✔
565
                }
566
                long start = NatsSystemClock.nanoTime();
1✔
567
                upgradeToSecureIfNeeded(resolved);
1✔
568
                if (trace && options.isTLSRequired()) {
1✔
569
                    // If the time appears too long, it might be related to
570
                    // https://github.com/nats-io/nats.java#linux-platform-note
571
                    timeTraceLogger.trace("TLS upgrade took: %.3f (s)",
×
572
                            ((double) (NatsSystemClock.nanoTime() - start)) / NANOS_PER_SECOND);
×
573
                }
574
                if (options.isTlsFirst()) {
1✔
575
                    readInitialInfo();
1✔
576
                    checkVersionRequirements();
1✔
577
                }
578
                return null;
1✔
579
            };
580

581
            timeoutNanos = timeCheck(end, "reading info, version and upgrading to secure if necessary");
1✔
582
            Future<Object> future = this.connectExecutor.submit(connectTask);
1✔
583
            try {
584
                future.get(timeoutNanos, TimeUnit.NANOSECONDS);
1✔
585
            } finally {
586
                future.cancel(true);
1✔
587
            }
588

589
            // start the reader and writer after we secured the connection, if necessary
590
            timeCheck(end, "starting reader");
1✔
591
            this.reader.start(this.dataPortFuture);
1✔
592
            timeCheck(end, "starting writer");
1✔
593
            this.writer.start(this.dataPortFuture);
1✔
594

595
            timeCheck(end, "sending connect message");
1✔
596
            this.sendConnect(resolved);
1✔
597

598
            timeoutNanos = timeCheck(end, "sending initial ping");
1✔
599
            Future<Boolean> pongFuture = sendPing();
1✔
600

601
            if (pongFuture != null) {
1✔
602
                pongFuture.get(timeoutNanos, TimeUnit.NANOSECONDS);
1✔
603
            }
604

605
            if (pingTask == null) {
1✔
606
                timeCheck(end, "starting ping and cleanup timers");
1✔
607
                long pingMillis = this.options.getPingInterval().toMillis();
1✔
608

609
                if (pingMillis > 0) {
1✔
610
                    pingTask = new ScheduledTask(scheduledExecutor, pingMillis, () -> {
1✔
611
                        if (isConnected() && !isClosing()) {
1✔
612
                            try {
613
                                softPing(); // The timer always uses the standard queue
1✔
614
                            }
615
                            catch (Exception e) {
1✔
616
                                // it's running in a thread, there is no point throwing here
617
                            }
1✔
618
                        }
619
                    });
1✔
620
                }
621

622
                long cleanMillis = this.options.getRequestCleanupInterval().toMillis();
1✔
623

624
                if (cleanMillis > 0) {
1✔
625
                    cleanupTask = new ScheduledTask(scheduledExecutor, cleanMillis, () -> cleanResponses(false));
1✔
626
                }
627
            }
628

629
            // Set connected status
630
            timeCheck(end, "updating status to connected");
1✔
631
            statusLock.lock();
1✔
632
            try {
633
                this.connecting = false;
1✔
634

635
                if (this.exceptionDuringConnectChange != null) {
1✔
636
                    throw this.exceptionDuringConnectChange;
×
637
                }
638

639
                this.currentServer = cur;
1✔
640
                this.serverAuthErrors.clear(); // reset on successful connection
1✔
641
                updateStatus(Status.CONNECTED); // will signal status change, we also signal in finally
1✔
642
            } finally {
643
                statusLock.unlock();
1✔
644
            }
645
            timeTraceLogger.trace("status updated");
1✔
646
        } catch (Exception exp) {
1✔
647
            processException(exp);
1✔
648
            try {
649
                // allow force reconnect since this is pretty exceptional,
650
                // a connection failure while trying to connect
651
                this.closeSocket(false, true);
1✔
652
            } catch (InterruptedException e) {
×
653
                processException(e);
×
654
                Thread.currentThread().interrupt();
×
655
            }
1✔
656
        } finally {
657
            statusLock.lock();
1✔
658
            try {
659
                this.connecting = false;
1✔
660
                statusChanged.signalAll();
1✔
661
            } finally {
662
                statusLock.unlock();
1✔
663
            }
664
        }
665
    }
1✔
666

667
    void checkVersionRequirements() throws IOException {
668
        Options opts = getOptions();
1✔
669
        ServerInfo info = getServerInfo();
1✔
670

671
        if (opts.isNoEcho() && info.getProtocolVersion() < 1) {
1✔
672
            throw new IOException("Server does not support no echo.");
1✔
673
        }
674
    }
1✔
675

676
    void upgradeToSecureIfNeeded(NatsUri nuri) throws IOException {
677
        // When already communicating over "https" websocket, do NOT try to upgrade to secure.
678
        if (!nuri.isWebsocket()) {
1✔
679
            if (options.isTlsFirst()) {
1✔
680
                dataPort.upgradeToSecure();
1✔
681
            }
682
            else {
683
                // server    | client options      | result
684
                // --------- | ------------------- | --------
685
                // required  | not isTLSRequired() | mismatch
686
                // available | not isTLSRequired() | ok
687
                // neither   | not isTLSRequired() | ok
688
                // required  | isTLSRequired()     | ok
689
                // available | isTLSRequired()     | ok
690
                // neither   | isTLSRequired()     | mismatch
691
                ServerInfo serverInfo = getServerInfo();
1✔
692
                if (options.isTLSRequired()) {
1✔
693
                    if (!serverInfo.isTLSRequired() && !serverInfo.isTLSAvailable()) {
1✔
694
                        throw new IOException("SSL connection wanted by client.");
1✔
695
                    }
696
                    dataPort.upgradeToSecure();
1✔
697
                }
698
                else if (serverInfo.isTLSRequired()) {
1✔
699
                    throw new IOException("SSL required by server.");
1✔
700
                }
701
            }
702
        }
703
    }
1✔
704
    // Called from reader/writer thread
705
    void handleCommunicationIssue(Exception io) {
706
        // If we are connecting or disconnecting, note exception and leave
707
        statusLock.lock();
1✔
708
        try {
709
            if (this.connecting || this.disconnecting || this.status == Status.CLOSED || this.isDraining()) {
1✔
710
                this.exceptionDuringConnectChange = io;
1✔
711
                return;
1✔
712
            }
713
        } finally {
714
            statusLock.unlock();
1✔
715
        }
716

717
        processException(io);
1✔
718

719
        // Spawn a thread so we don't have timing issues with
720
        // waiting on read/write threads
721
        executor.submit(() -> {
1✔
722
            if (!tryingToConnect.get()) {
1✔
723
                try {
724
                    tryingToConnect.set(true);
1✔
725

726
                    // any issue that brings us here is pretty serious
727
                    // so we are comfortable forcing the close
728
                    this.closeSocket(true, true);
1✔
729
                } catch (InterruptedException e) {
×
730
                    processException(e);
×
731
                    Thread.currentThread().interrupt();
×
732
                } finally {
733
                    tryingToConnect.set(false);
1✔
734
                }
735
            }
736
        });
1✔
737
    }
1✔
738

739
    // Close socket is called when another connect attempt is possible
740
    // Close is called when the connection should shut down, period
741
    void closeSocket(boolean tryReconnectIfConnected, boolean forceClose) throws InterruptedException {
742
        // Ensure we close the socket exclusively within one thread.
743
        closeSocketLock.lock();
1✔
744
        try {
745
            boolean wasConnected;
746
            statusLock.lock();
1✔
747
            try {
748
                if (isDisconnectingOrClosed()) {
1✔
749
                    waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
750
                    return;
1✔
751
                }
752
                this.disconnecting = true;
1✔
753
                this.exceptionDuringConnectChange = null;
1✔
754
                wasConnected = (this.status == Status.CONNECTED);
1✔
755
                statusChanged.signalAll();
1✔
756
            } finally {
757
                statusLock.unlock();
1✔
758
            }
759

760
            closeSocketImpl(forceClose);
1✔
761

762
            statusLock.lock();
1✔
763
            try {
764
                updateStatus(Status.DISCONNECTED);
1✔
765
                this.exceptionDuringConnectChange = null; // Ignore IOExceptions during closeSocketImpl()
1✔
766
                this.disconnecting = false;
1✔
767
                statusChanged.signalAll();
1✔
768
            } finally {
769
                statusLock.unlock();
1✔
770
            }
771

772
            if (isClosing()) { // isClosing() means we are in the close method or were asked to be
1✔
UNCOV
773
                close();
×
774
            } else if (wasConnected && tryReconnectIfConnected) {
1✔
775
                reconnectImpl(); // call the impl here otherwise the tryingToConnect guard will block the behavior
1✔
776
            }
777
        } finally {
778
            closeSocketLock.unlock();
1✔
779
        }
780
    }
1✔
781

782
    // Close socket is called when another connect attempt is possible
783
    // Close is called when the connection should shut down, period
784
    /**
785
     * {@inheritDoc}
786
     */
787
    @Override
788
    public void close() throws InterruptedException {
789
        this.close(true, false);
1✔
790
    }
1✔
791

792
    // This method was originally built assuming there might be multiple paths to this method,
793
    // but it turns out there isn't. Not refactoring the code though, hence the warning suppression
794
    @SuppressWarnings("SameParameterValue")
795
    void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedException {
796
        statusLock.lock();
1✔
797
        try {
798
            if (checkDrainStatus && this.isDraining()) {
1✔
799
                waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
800
                return;
1✔
801
            }
802

803
            this.closing = true;// We were asked to close, so do it
1✔
804
            if (isDisconnectingOrClosed()) {
1✔
805
                waitForDisconnectOrClose(this.options.getConnectionTimeout());
1✔
806
                return;
1✔
807
            } else {
808
                this.disconnecting = true;
1✔
809
                this.exceptionDuringConnectChange = null;
1✔
810
                statusChanged.signalAll();
1✔
811
            }
812
        } finally {
813
            statusLock.unlock();
1✔
814
        }
815

816
        // Stop the reconnect wait timer after we stop the writer/reader (only if we are
817
        // really closing, not on errors)
818
        if (this.reconnectWaiter != null) {
1✔
819
            this.reconnectWaiter.cancel(true);
1✔
820
        }
821

822
        closeSocketImpl(forceClose);
1✔
823

824
        this.dispatchers.forEach((nuid, d) -> d.stop(false));
1✔
825

826
        this.subscribers.forEach((sid, sub) -> sub.invalidate());
1✔
827

828
        this.dispatchers.clear();
1✔
829
        this.subscribers.clear();
1✔
830

831
        if (pingTask != null) {
1✔
832
            pingTask.shutdown();
1✔
833
            pingTask = null;
1✔
834
        }
835
        if (cleanupTask != null) {
1✔
836
            cleanupTask.shutdown();
1✔
837
            cleanupTask = null;
1✔
838
        }
839

840
        cleanResponses(true);
1✔
841

842
        cleanUpPongQueue();
1✔
843

844
        statusLock.lock();
1✔
845
        try {
846
            updateStatus(Status.CLOSED); // will signal, we also signal when we stop disconnecting
1✔
847

848
            /*
849
             * if (exceptionDuringConnectChange != null) {
850
             * processException(exceptionDuringConnectChange); exceptionDuringConnectChange
851
             * = null; }
852
             */
853
        } finally {
854
            statusLock.unlock();
1✔
855
        }
856

857
        // Stop the error handling and connect executors
858
        callbackRunner.shutdown();
1✔
859
        try {
860
            // At this point in the flow, the connection is shutting down.
861
            // There is really no use in giving this information to the developer,
862
            // It's fair to say that an exception here anyway will practically never happen
863
            // and if it did, the app is probably already frozen.
864
            //noinspection ResultOfMethodCallIgnored
865
            callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
1✔
866
        } finally {
867
            callbackRunner.shutdownNow();
1✔
868
        }
869

870
        // There's no need to wait for running tasks since we're told to close
871
        connectExecutor.shutdownNow();
1✔
872

873
        // The callbackRunner and connectExecutor always come from a factory,
874
        // so we always shut them down.
875
        // The executor and scheduledExecutor come from a factory only if
876
        // the user does NOT supply them, so we shut them down in that case.
877
        if (options.executorIsInternal()) {
1✔
878
            executor.shutdownNow();
1✔
879
        }
880
        if (options.scheduledExecutorIsInternal()) {
1✔
881
            scheduledExecutor.shutdownNow();
1✔
882
        }
883

884
        statusLock.lock();
1✔
885
        try {
886
            this.disconnecting = false;
1✔
887
            statusChanged.signalAll();
1✔
888
        } finally {
889
            statusLock.unlock();
1✔
890
        }
891
    }
1✔
892

893
    boolean callbackRunnerIsShutdown() {
894
        return callbackRunner == null || callbackRunner.isShutdown();
1✔
895
    }
896

897
    boolean executorIsShutdown() {
898
        return executor == null || executor.isShutdown();
1✔
899
    }
900

901
    boolean connectExecutorIsShutdown() {
902
        return connectExecutor == null || connectExecutor.isShutdown();
1✔
903
    }
904

905
    boolean scheduledExecutorIsShutdown() {
906
        return scheduledExecutor == null || scheduledExecutor.isShutdown();
1✔
907
    }
908

909
    // Should only be called from closeSocket or close
910
    void closeSocketImpl(boolean forceClose) {
911
        this.currentServer = null;
1✔
912

913
        // Signal both to stop.
914
        final Future<Boolean> readStop = this.reader.stop();
1✔
915
        final Future<Boolean> writeStop = this.writer.stop();
1✔
916

917
        // Now wait until they both stop before closing the socket.
918
        try {
919
            readStop.get(1, TimeUnit.SECONDS);
1✔
920
        } catch (Exception ex) {
1✔
921
            //
922
        }
1✔
923
        try {
924
            writeStop.get(1, TimeUnit.SECONDS);
1✔
925
        } catch (Exception ex) {
1✔
926
            //
927
        }
1✔
928

929
        // Close and reset the current data port and future
930
        if (dataPortFuture != null) {
1✔
931
            dataPortFuture.cancel(true);
1✔
932
            dataPortFuture = null;
1✔
933
        }
934

935
        // Close the current socket and cancel anyone waiting for it
936
        try {
937
            if (dataPort != null) {
1✔
938
                if (forceClose) {
1✔
939
                    dataPort.forceClose();
1✔
940
                }
941
                else {
942
                    dataPort.close();
1✔
943
                }
944
            }
945

946
        } catch (IOException ex) {
×
947
            processException(ex);
×
948
        }
1✔
949
        cleanUpPongQueue();
1✔
950

951
        try {
952
            this.reader.stop().get(10, TimeUnit.SECONDS);
1✔
953
        } catch (Exception ex) {
×
954
            processException(ex);
×
955
        }
1✔
956
        try {
957
            this.writer.stop().get(10, TimeUnit.SECONDS);
1✔
958
        } catch (Exception ex) {
1✔
959
            processException(ex);
1✔
960
        }
1✔
961
    }
1✔
962

963
    void cleanUpPongQueue() {
964
        Future<Boolean> b;
965
        while ((b = pongQueue.poll()) != null) {
1✔
966
            b.cancel(true);
1✔
967
        }
968
    }
1✔
969

970
    /**
971
     * {@inheritDoc}
972
     */
973
    @Override
974
    public void publish(@NonNull String subject, byte @Nullable [] body) {
975
        publishInternal(subject, null, null, body, true, false);
1✔
976
    }
1✔
977

978
    /**
979
     * {@inheritDoc}
980
     */
981
    @Override
982
    public void publish(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body) {
983
        publishInternal(subject, null, headers, body, true, false);
1✔
984
    }
1✔
985

986
    /**
987
     * {@inheritDoc}
988
     */
989
    @Override
990
    public void publish(@NonNull String subject, @Nullable String replyTo, byte @Nullable [] body) {
991
        publishInternal(subject, replyTo, null, body, true, false);
1✔
992
    }
1✔
993

994
    /**
995
     * {@inheritDoc}
996
     */
997
    @Override
998
    public void publish(@NonNull String subject, @Nullable String replyTo, @Nullable Headers headers, byte @Nullable [] body) {
999
        publishInternal(subject, replyTo, headers, body, true, false);
1✔
1000
    }
1✔
1001

1002
    /**
1003
     * {@inheritDoc}
1004
     */
1005
    @Override
1006
    public void publish(@NonNull Message message) {
1007
        validateNotNull(message, "Message");
1✔
1008
        publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false, false);
1✔
1009
    }
1✔
1010

1011
    void publishInternal(@NonNull String subject, @Nullable String replyTo, @Nullable Headers headers, byte @Nullable [] data, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
1012
        checkPayloadSize(data);
1✔
1013
        NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1014
        if (npm.hasHeaders && !serverInfo.get().isHeadersSupported()) {
1✔
1015
            throw new IllegalArgumentException("Headers are not supported by the server, version: " + serverInfo.get().getVersion());
1✔
1016
        }
1017

1018
        if (isClosed()) {
1✔
1019
            throw new IllegalStateException("Connection is Closed");
1✔
1020
        } else if (blockPublishForDrain.get()) {
1✔
1021
            throw new IllegalStateException("Connection is Draining"); // Ok to publish while waiting on subs
×
1022
        }
1023

1024
        if ((status == Status.RECONNECTING || status == Status.DISCONNECTED)
1✔
1025
                && !this.writer.canQueueDuringReconnect(npm)) {
1✔
1026
            throw new IllegalStateException(
1✔
1027
                    "Unable to queue any more messages during reconnect, max buffer is " + options.getReconnectBufferSize());
1✔
1028
        }
1029

1030
        queueOutgoing(npm);
1✔
1031
    }
1✔
1032

1033
    private void checkPayloadSize(byte @Nullable [] body) {
1034
        if (body != null && options.clientSideLimitChecks() && body.length > this.getMaxPayload() && this.getMaxPayload() > 0) {
1✔
1035
            throw new IllegalArgumentException(
1✔
1036
                "Message payload size exceed server configuration " + body.length + " vs " + this.getMaxPayload());
1✔
1037
        }
1038
    }
1✔
1039

1040
    /**
1041
     * {@inheritDoc}
1042
     */
1043
    @Override
1044
    @NonNull
1045
    public Subscription subscribe(@NonNull String subject) {
1046
        validateSubject(subject, true);
1✔
1047
        return createSubscription(subject, null, null, null);
1✔
1048
    }
1049

1050
    /**
1051
     * {@inheritDoc}
1052
     */
1053
    @Override
1054
    @NonNull
1055
    public Subscription subscribe(@NonNull String subject, @NonNull String queueName) {
1056
        validateSubject(subject, true);
1✔
1057
        validateQueueName(queueName, true);
1✔
1058
        return createSubscription(subject, queueName, null, null);
1✔
1059
    }
1060

1061
    void invalidate(NatsSubscription sub) {
1062
        remove(sub);
1✔
1063
        sub.invalidate();
1✔
1064
    }
1✔
1065

1066
    void remove(NatsSubscription sub) {
1067
        CharSequence sid = sub.getSID();
1✔
1068
        subscribers.remove(sid);
1✔
1069

1070
        if (sub.getNatsDispatcher() != null) {
1✔
1071
            sub.getNatsDispatcher().remove(sub);
1✔
1072
        }
1073
    }
1✔
1074

1075
    void unsubscribe(NatsSubscription sub, int after) {
1076
        if (isClosed()) { // last chance, usually sub will catch this
1✔
1077
            throw new IllegalStateException("Connection is Closed");
×
1078
        }
1079

1080
        if (after <= 0) {
1✔
1081
            this.invalidate(sub); // Will clean it up
1✔
1082
        } else {
1083
            sub.setUnsubLimit(after);
1✔
1084

1085
            if (sub.reachedUnsubLimit()) {
1✔
1086
                sub.invalidate();
1✔
1087
            }
1088
        }
1089

1090
        if (!isConnected()) {
1✔
1091
            return; // We will set up sub on reconnect or ignore
1✔
1092
        }
1093

1094
        sendUnsub(sub, after);
1✔
1095
    }
1✔
1096

1097
    void sendUnsub(@NonNull NatsSubscription sub, int after) {
1098
        ByteArrayBuilder bab =
1✔
1099
            new ByteArrayBuilder().append(UNSUB_SP_BYTES).append(sub.getSID());
1✔
1100
        if (after > 0) {
1✔
1101
            bab.append(SP).append(after);
1✔
1102
        }
1103
        queueOutgoing(new ProtocolMessage(bab, true));
1✔
1104
    }
1✔
1105

1106
    // Assumes the null/empty checks were handled elsewhere
1107
    @NonNull
1108
    NatsSubscription createSubscription(@NonNull String subject,
1109
                                        @Nullable String queueName,
1110
                                        @Nullable NatsDispatcher dispatcher,
1111
                                        @Nullable NatsSubscriptionFactory factory) {
1112
        if (isClosed()) {
1✔
1113
            throw new IllegalStateException("Connection is Closed");
1✔
1114
        } else if (isDraining() && (dispatcher == null || dispatcher != this.inboxDispatcher.get())) {
1✔
1115
            throw new IllegalStateException("Connection is Draining");
1✔
1116
        }
1117

1118
        NatsSubscription sub;
1119
        String sid = getNextSid();
1✔
1120

1121
        if (factory == null) {
1✔
1122
            sub = new NatsSubscription(sid, subject, queueName, this, dispatcher);
1✔
1123
        }
1124
        else {
1125
            sub = factory.createNatsSubscription(sid, subject, queueName, this, dispatcher);
1✔
1126
        }
1127
        subscribers.put(sid, sub);
1✔
1128

1129
        sendSubscriptionMessage(sid, subject, queueName, false);
1✔
1130
        return sub;
1✔
1131
    }
1132

1133
    String getNextSid() {
1134
        return Long.toString(nextSid.getAndIncrement());
1✔
1135
    }
1136

1137
    String reSubscribe(NatsSubscription sub, String subject, String queueName) {
1138
        String sid = getNextSid();
1✔
1139
        sendSubscriptionMessage(sid, subject, queueName, false);
1✔
1140
        subscribers.put(sid, sub);
1✔
1141
        return sid;
1✔
1142
    }
1143

1144
    void sendSubscriptionMessage(String sid, String subject, String queueName, boolean treatAsInternal) {
1145
        if (!isConnected()) {
1✔
1146
            return; // We will set up sub on reconnect or ignore
1✔
1147
        }
1148

1149
        ByteArrayBuilder bab = new ByteArrayBuilder(UTF_8).append(SUB_SP_BYTES).append(subject);
1✔
1150
        if (queueName != null) {
1✔
1151
            bab.append(SP).append(queueName);
1✔
1152
        }
1153
        bab.append(SP).append(sid);
1✔
1154

1155
        // setting this to filter on stop.
1156
        // if it's an "internal" message, it won't be filtered
1157
        // if it's a normal message, the subscription will already be registered
1158
        // and therefore will be re-subscribed after a stop anyway
1159
        ProtocolMessage subMsg = new ProtocolMessage(bab, true);
1✔
1160
        if (treatAsInternal) {
1✔
1161
            queueInternalOutgoing(subMsg);
1✔
1162
        } else {
1163
            queueOutgoing(subMsg);
1✔
1164
        }
1165
    }
1✔
1166

1167
    /**
1168
     * {@inheritDoc}
1169
     */
1170
    @Override
1171
    @NonNull
1172
    public String createInbox() {
1173
        return options.getInboxPrefix() + nuid.next();
1✔
1174
    }
1175

1176
    int getRespInboxLength() {
1177
        return options.getInboxPrefix().length() + 22 + 1; // 22 for nuid, 1 for .
1✔
1178
    }
1179

1180
    String createResponseInbox(String inbox) {
1181
        // Substring gets rid of the * [trailing]
1182
        return inbox.substring(0, getRespInboxLength()) + nuid.next();
1✔
1183
    }
1184

1185
    // If the inbox is long enough, pull out the end part, otherwise, just use the
1186
    // full thing
1187
    String getResponseToken(String responseInbox) {
1188
        int len = getRespInboxLength();
1✔
1189
        if (responseInbox.length() <= len) {
1✔
1190
            return responseInbox;
1✔
1191
        }
1192
        return responseInbox.substring(len);
1✔
1193
    }
1194

1195
    void cleanResponses(boolean closing) {
1196
        ArrayList<String> toRemove = new ArrayList<>();
1✔
1197
        boolean wasInterrupted = false;
1✔
1198

1199
        for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesAwaiting.entrySet()) {
1✔
1200
            boolean remove = false;
1✔
1201
            NatsRequestCompletableFuture future = entry.getValue();
1✔
1202
            if (future.hasExceededTimeout()) {
1✔
1203
                remove = true;
1✔
1204
                future.cancelTimedOut();
1✔
1205
            }
1206
            else if (closing) {
1✔
1207
                remove = true;
1✔
1208
                future.cancelClosing();
1✔
1209
            }
1210
            else if (future.isDone()) {
1✔
1211
                // done should have already been removed, not sure if
1212
                // this even needs checking, but it won't hurt
1213
                remove = true;
1✔
1214
                try {
1215
                    future.get();
×
1216
                }
1217
                catch (InterruptedException e) {
×
1218
                    Thread.currentThread().interrupt();
×
1219
                    // We might have collected some entries already, but were interrupted.
1220
                    // Break out so we finish as quick as possible,
1221
                    // cleanResponses will be called again anyway
1222
                    wasInterrupted = true;
×
1223
                    break;
×
1224
                }
1225
                catch (Throwable ignore) {}
1✔
1226
            }
1227

1228
            if (remove) {
1✔
1229
                toRemove.add(entry.getKey());
1✔
1230
                statistics.decrementOutstandingRequests();
1✔
1231
            }
1232
        }
1✔
1233

1234
        for (String key : toRemove) {
1✔
1235
            responsesAwaiting.remove(key);
1✔
1236
        }
1✔
1237

1238
        if (advancedTracking && !wasInterrupted) {
1✔
1239
            toRemove.clear(); // we can reuse this but it needs to be cleared
1✔
1240
            for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesRespondedTo.entrySet()) {
1✔
1241
                NatsRequestCompletableFuture future = entry.getValue();
1✔
1242
                if (future.hasExceededTimeout()) {
1✔
1243
                    toRemove.add(entry.getKey());
1✔
1244
                    future.cancelTimedOut();
1✔
1245
                }
1246
            }
1✔
1247

1248
            for (String token : toRemove) {
1✔
1249
                responsesRespondedTo.remove(token);
1✔
1250
            }
1✔
1251
        }
1252
    }
1✔
1253

1254
    /**
1255
     * {@inheritDoc}
1256
     */
1257
    @Override
1258
    @Nullable
1259
    public Message request(@NonNull String subject, byte @Nullable [] body, @Nullable Duration timeout) throws InterruptedException {
1260
        return requestInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1261
    }
1262

1263
    /**
1264
     * {@inheritDoc}
1265
     */
1266
    @Override
1267
    @Nullable
1268
    public Message request(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body, @Nullable Duration timeout) throws InterruptedException {
1269
        return requestInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1270
    }
1271

1272
    /**
1273
     * {@inheritDoc}
1274
     */
1275
    @Override
1276
    @Nullable
1277
    public Message request(@NonNull Message message, @Nullable Duration timeout) throws InterruptedException {
1278
        validateNotNull(message, "Message");
1✔
1279
        return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
1✔
1280
    }
1281

1282
    @Nullable
1283
    Message requestInternal(@NonNull String subject,
1284
                            @Nullable Headers headers,
1285
                            byte @Nullable [] data,
1286
                            @Nullable Duration timeout,
1287
                            @NonNull CancelAction cancelAction,
1288
                            boolean validateSubjectAndReplyTo,
1289
                            boolean flushImmediatelyAfterPublish) throws InterruptedException
1290
    {
1291
        CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1292
        try {
1293
            if (timeout == null) {
1✔
1294
                timeout = getOptions().getConnectionTimeout();
×
1295
            }
1296
            return incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
1✔
1297
        } catch (TimeoutException | ExecutionException | CancellationException e) {
1✔
1298
            return null;
1✔
1299
        }
1300
    }
1301

1302
    /**
1303
     * {@inheritDoc}
1304
     */
1305
    @Override
1306
    @NonNull
1307
    public CompletableFuture<Message> request(@NonNull String subject, byte @Nullable [] body) {
1308
        return requestFutureInternal(subject, null, body, null, cancelAction, true, forceFlushOnRequest);
1✔
1309
    }
1310

1311
    /**
1312
     * {@inheritDoc}
1313
     */
1314
    @Override
1315
    @NonNull
1316
    public CompletableFuture<Message> request(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body) {
1317
        return requestFutureInternal(subject, headers, body, null, cancelAction, true, forceFlushOnRequest);
1✔
1318
    }
1319

1320
    /**
1321
     * {@inheritDoc}
1322
     */
1323
    @Override
1324
    @NonNull
1325
    public CompletableFuture<Message> requestWithTimeout(@NonNull String subject, byte @Nullable [] body, @Nullable Duration timeout) {
1326
        return requestFutureInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1327
    }
1328

1329
    /**
1330
     * {@inheritDoc}
1331
     */
1332
    @Override
1333
    @NonNull
1334
    public CompletableFuture<Message> requestWithTimeout(@NonNull String subject, @Nullable Headers headers, byte @Nullable [] body, Duration timeout) {
1335
        return requestFutureInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
1✔
1336
    }
1337

1338
    /**
1339
     * {@inheritDoc}
1340
     */
1341
    @Override
1342
    @NonNull
1343
    public CompletableFuture<Message> requestWithTimeout(@NonNull Message message, @Nullable Duration timeout) {
1344
        validateNotNull(message, "Message");
1✔
1345
        return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
1✔
1346
    }
1347

1348
    /**
1349
     * {@inheritDoc}
1350
     */
1351
    @Override
1352
    @NonNull
1353
    public CompletableFuture<Message> request(@NonNull Message message) {
1354
        validateNotNull(message, "Message");
1✔
1355
        return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false, forceFlushOnRequest);
1✔
1356
    }
1357

1358
    @NonNull
1359
    CompletableFuture<Message> requestFutureInternal(@NonNull String subject,
1360
                                                     @Nullable Headers headers,
1361
                                                     byte @Nullable [] body,
1362
                                                     @Nullable Duration futureTimeout,
1363
                                                     @NonNull CancelAction cancelAction,
1364
                                                     boolean validateSubjectAndReplyTo,
1365
                                                     boolean flushImmediatelyAfterPublish)
1366
    {
1367
        checkPayloadSize(body);
1✔
1368

1369
        if (isClosed()) {
1✔
1370
            throw new IllegalStateException("Connection is Closed");
1✔
1371
        } else if (isDraining()) {
1✔
1372
            throw new IllegalStateException("Connection is Draining");
1✔
1373
        }
1374

1375
        if (inboxDispatcher.get() == null) {
1✔
1376
            inboxDispatcherLock.lock();
1✔
1377
            try {
1378
                if (inboxDispatcher.get() == null) {
1✔
1379
                    NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply);
1✔
1380

1381
                    // Ensure the dispatcher is started before publishing messages
1382
                    String id = this.nuid.next();
1✔
1383
                    this.dispatchers.put(id, d);
1✔
1384
                    d.start(id);
1✔
1385
                    d.subscribe(this.mainInbox);
1✔
1386
                    inboxDispatcher.set(d);
1✔
1387
                }
1388
            } finally {
1389
                inboxDispatcherLock.unlock();
1✔
1390
            }
1391
        }
1392

1393
        boolean oldStyle = options.isOldRequestStyle();
1✔
1394
        String responseInbox = oldStyle ? createInbox() : createResponseInbox(this.mainInbox);
1✔
1395
        String responseToken = getResponseToken(responseInbox);
1✔
1396
        NatsRequestCompletableFuture future =
1✔
1397
            new NatsRequestCompletableFuture(cancelAction,
1398
                futureTimeout == null ? options.getRequestCleanupInterval() : futureTimeout, options.useTimeoutException());
1✔
1399

1400
        if (!oldStyle) {
1✔
1401
            responsesAwaiting.put(responseToken, future);
1✔
1402
        }
1403
        statistics.incrementOutstandingRequests();
1✔
1404

1405
        if (oldStyle) {
1✔
1406
            NatsDispatcher dispatcher = this.inboxDispatcher.get();
1✔
1407
            NatsSubscription sub = dispatcher.subscribeReturningSubscription(responseInbox);
1✔
1408
            dispatcher.unsubscribe(responseInbox, 1);
1✔
1409
            // Unsubscribe when future is cancelled:
1410
            future.whenComplete((msg, exception) -> {
1✔
1411
                if (exception instanceof CancellationException) {
1✔
1412
                    dispatcher.unsubscribe(responseInbox);
×
1413
                }
1414
            });
1✔
1415
            responsesAwaiting.put(sub.getSID(), future);
1✔
1416
        }
1417

1418
        publishInternal(subject, responseInbox, headers, body, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
1✔
1419
        statistics.incrementRequestsSent();
1✔
1420

1421
        return future;
1✔
1422
    }
1423

1424
    void deliverReply(Message msg) {
1425
        boolean oldStyle = options.isOldRequestStyle();
1✔
1426
        String subject = msg.getSubject();
1✔
1427
        String token = getResponseToken(subject);
1✔
1428
        String key = oldStyle ? msg.getSID() : token;
1✔
1429
        NatsRequestCompletableFuture f = responsesAwaiting.remove(key);
1✔
1430
        if (f != null) {
1✔
1431
            if (advancedTracking) {
1✔
1432
                responsesRespondedTo.put(key, f);
1✔
1433
            }
1434
            statistics.decrementOutstandingRequests();
1✔
1435
            if (msg.isStatusMessage() && msg.getStatus().getCode() == 503) {
1✔
1436
                switch (f.getCancelAction()) {
1✔
1437
                    case COMPLETE:
1438
                        f.complete(msg);
1✔
1439
                        break;
1✔
1440
                    case REPORT:
1441
                        f.completeExceptionally(new JetStreamStatusException(msg.getStatus()));
1✔
1442
                        break;
1✔
1443
                    case CANCEL:
1444
                    default:
1445
                        f.cancel(true);
1✔
1446
                }
1447
            }
1448
            else {
1449
                f.complete(msg);
1✔
1450
            }
1451
            statistics.incrementRepliesReceived();
1✔
1452
        }
1453
        else if (!oldStyle && !subject.startsWith(mainInbox)) {
1✔
1454
            if (advancedTracking) {
1✔
1455
                if (responsesRespondedTo.get(key) != null) {
1✔
1456
                    statistics.incrementDuplicateRepliesReceived();
1✔
1457
                } else {
1458
                    statistics.incrementOrphanRepliesReceived();
1✔
1459
                }
1460
            }
1461
        }
1462
    }
1✔
1463

1464
    /**
1465
     * {@inheritDoc}
1466
     */
1467
    @NonNull
1468
    public Dispatcher createDispatcher() {
1469
        return createDispatcher(null);
1✔
1470
    }
1471

1472
    /**
1473
     * {@inheritDoc}
1474
     */
1475
    @NonNull
1476
    public Dispatcher createDispatcher(@Nullable MessageHandler handler) {
1477
        if (isClosed()) {
1✔
1478
            throw new IllegalStateException("Connection is Closed");
1✔
1479
        } else if (isDraining()) {
1✔
1480
            throw new IllegalStateException("Connection is Draining");
1✔
1481
        }
1482

1483
        NatsDispatcher dispatcher = dispatcherFactory.createDispatcher(this, handler);
1✔
1484
        String id = this.nuid.next();
1✔
1485
        this.dispatchers.put(id, dispatcher);
1✔
1486
        dispatcher.start(id);
1✔
1487
        return dispatcher;
1✔
1488
    }
1489

1490
    /**
1491
     * {@inheritDoc}
1492
     */
1493
    public void closeDispatcher(@NonNull Dispatcher d) {
1494
        if (isClosed()) {
1✔
1495
            throw new IllegalStateException("Connection is Closed");
1✔
1496
        }
1497
        else if (!(d instanceof NatsDispatcher)) {
1✔
1498
            throw new IllegalArgumentException("Connection can only manage its own dispatchers");
×
1499
        }
1500

1501
        NatsDispatcher nd = (NatsDispatcher) d;
1✔
1502

1503
        if (nd.isDraining()) {
1✔
1504
            return; // No op while draining
1✔
1505
        }
1506

1507
        if (!this.dispatchers.containsKey(nd.getId())) {
1✔
1508
            throw new IllegalArgumentException("Dispatcher is already closed.");
1✔
1509
        }
1510

1511
        cleanupDispatcher(nd);
1✔
1512
    }
1✔
1513

1514
    void cleanupDispatcher(NatsDispatcher nd) {
1515
        nd.stop(true);
1✔
1516
        this.dispatchers.remove(nd.getId());
1✔
1517
    }
1✔
1518

1519
    Map<String, Dispatcher> getDispatchers() {
1520
        return Collections.unmodifiableMap(dispatchers);
1✔
1521
    }
1522

1523
    /**
1524
     * {@inheritDoc}
1525
     */
1526
    public void addConnectionListener(@NonNull ConnectionListener connectionListener) {
1527
        connectionListeners.add(connectionListener);
1✔
1528
    }
1✔
1529

1530
    /**
1531
     * {@inheritDoc}
1532
     */
1533
    public void removeConnectionListener(@NonNull ConnectionListener connectionListener) {
1534
        connectionListeners.remove(connectionListener);
1✔
1535
    }
1✔
1536

1537
    /**
1538
     * {@inheritDoc}
1539
     */
1540
    public void flush(@Nullable Duration timeout) throws TimeoutException, InterruptedException {
1541

1542
        Instant start = Instant.now();
1✔
1543
        waitForConnectOrClose(timeout);
1✔
1544

1545
        if (isClosed()) {
1✔
1546
            throw new TimeoutException("Attempted to flush while closed");
1✔
1547
        }
1548

1549
        if (timeout == null || timeout.isNegative()) {
1✔
1550
            timeout = Duration.ZERO;
1✔
1551
        }
1552

1553
        Instant now = Instant.now();
1✔
1554
        Duration waitTime = Duration.between(start, now);
1✔
1555

1556
        if (!timeout.equals(Duration.ZERO) && waitTime.compareTo(timeout) >= 0) {
1✔
1557
            throw new TimeoutException("Timeout out waiting for connection before flush.");
1✔
1558
        }
1559

1560
        try {
1561
            Future<Boolean> waitForIt = sendPing();
1✔
1562

1563
            if (waitForIt == null) { // error in the send ping code
1✔
1564
                return;
×
1565
            }
1566

1567
            long nanos = timeout.toNanos();
1✔
1568

1569
            if (nanos > 0) {
1✔
1570

1571
                nanos -= waitTime.toNanos();
1✔
1572

1573
                if (nanos <= 0) {
1✔
1574
                    nanos = 1; // let the future timeout if it isn't resolved
×
1575
                }
1576

1577
                waitForIt.get(nanos, TimeUnit.NANOSECONDS);
1✔
1578
            } else {
1579
                waitForIt.get();
1✔
1580
            }
1581

1582
            this.statistics.incrementFlushCounter();
1✔
1583
        } catch (ExecutionException | CancellationException e) {
1✔
1584
            throw new TimeoutException(e.toString());
1✔
1585
        }
1✔
1586
    }
1✔
1587

1588
    void sendConnect(NatsUri nuri) throws IOException {
1589
        try {
1590
            ServerInfo info = this.serverInfo.get();
1✔
1591
            // This is changed - we used to use info.isAuthRequired(), but are changing it to
1592
            // better match older versions of the server. It may change again in the future.
1593
            CharBuffer connectOptions = options.buildProtocolConnectOptionsString(
1✔
1594
                nuri.toString(), true, info.getNonce());
1✔
1595
            ByteArrayBuilder bab =
1✔
1596
                new ByteArrayBuilder(OP_CONNECT_SP_LEN + connectOptions.limit(), UTF_8)
1✔
1597
                    .append(CONNECT_SP_BYTES).append(connectOptions);
1✔
1598
            queueInternalOutgoing(new ProtocolMessage(bab, false));
1✔
1599
        } catch (Exception exp) {
1✔
1600
            throw new IOException("Error sending connect string", exp);
1✔
1601
        }
1✔
1602
    }
1✔
1603

1604
    CompletableFuture<Boolean> sendPing() {
1605
        return this.sendPing(true);
1✔
1606
    }
1607

1608
    void softPing() {
1609
        this.sendPing(false);
1✔
1610
    }
1✔
1611

1612
    /**
1613
     * {@inheritDoc}
1614
     */
1615
    @Override
1616
    @NonNull public Duration RTT() throws IOException {
1617
        if (!isConnected()) {
1✔
1618
            throw new IOException("Must be connected to do RTT.");
1✔
1619
        }
1620

1621
        long timeout = options.getConnectionTimeout().toMillis();
1✔
1622
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
1✔
1623
        pongQueue.add(pongFuture);
1✔
1624
        try {
1625
            long time = NatsSystemClock.nanoTime();
1✔
1626
            writer.queueInternalMessage(new ProtocolMessage(PING_PROTO));
1✔
1627
            pongFuture.get(timeout, TimeUnit.MILLISECONDS);
1✔
1628
            return Duration.ofNanos(NatsSystemClock.nanoTime() - time);
1✔
1629
        }
1630
        catch (ExecutionException e) {
×
1631
            throw new IOException(e.getCause());
×
1632
        }
1633
        catch (TimeoutException e) {
×
1634
            throw new IOException(e);
×
1635
        }
1636
        catch (InterruptedException e) {
×
1637
            Thread.currentThread().interrupt();
×
1638
            throw new IOException(e);
×
1639
        }
1640
    }
1641

1642
    // Send a ping request and push a pong future on the queue.
1643
    // Futures are completed in order, keep this one if a thread wants to wait
1644
    // for a specific pong. Note, if no pong returns, the wait will not return
1645
    // without setting a timeout.
1646
    @Nullable CompletableFuture<Boolean> sendPing(boolean treatAsInternal) {
1647
        if (!isConnectedOrConnecting()) {
1✔
1648
            CompletableFuture<Boolean> retVal = new CompletableFuture<>();
1✔
1649
            retVal.complete(Boolean.FALSE);
1✔
1650
            return retVal;
1✔
1651
        }
1652

1653
        if (!treatAsInternal && !this.needPing.get()) {
1✔
1654
            CompletableFuture<Boolean> retVal = new CompletableFuture<>();
1✔
1655
            retVal.complete(Boolean.TRUE);
1✔
1656
            this.needPing.set(true);
1✔
1657
            return retVal;
1✔
1658
        }
1659

1660
        int max = options.getMaxPingsOut();
1✔
1661
        if (max > 0 && pongQueue.size() + 1 > max) {
1✔
1662
            handleCommunicationIssue(new IllegalStateException("Max outgoing Ping count exceeded."));
1✔
1663
            return null;
1✔
1664
        }
1665

1666
        CompletableFuture<Boolean> pongFuture = new CompletableFuture<>();
1✔
1667
        pongQueue.add(pongFuture);
1✔
1668

1669
        if (treatAsInternal) {
1✔
1670
            queueInternalOutgoing(new ProtocolMessage(PING_PROTO));
1✔
1671
        } else {
1672
            queueOutgoing(new ProtocolMessage(PING_PROTO));
1✔
1673
        }
1674

1675
        this.needPing.set(true);
1✔
1676
        this.statistics.incrementPingCount();
1✔
1677
        return pongFuture;
1✔
1678
    }
1679

1680
    // This is a minor speed / memory enhancement.
1681
    // We can't reuse the same instance of any NatsMessage b/c of the "NatsMessage next" state,
1682
    // but it is safe to share the data bytes and the size since those fields are just being read
1683
    // This constructor "ProtocolMessage(ProtocolMessage pm)" shares the data and size
1684
    // reducing allocation of data for something that is often created and used.
1685
    // These static instances are the ones that are used for copying in sendPing and sendPong
1686
    private static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES);
1✔
1687
    private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES);
1✔
1688

1689
    void sendPong() {
1690
        queueInternalOutgoing(new ProtocolMessage(PONG_PROTO));
1✔
1691
    }
1✔
1692

1693
    // Called by the reader
1694
    void handlePong() {
1695
        CompletableFuture<Boolean> pongFuture = pongQueue.pollFirst();
1✔
1696
        if (pongFuture != null) {
1✔
1697
            pongFuture.complete(Boolean.TRUE);
1✔
1698
        }
1699
    }
1✔
1700

1701
    void readInitialInfo() throws IOException {
1702
        byte[] readBuffer = new byte[options.getBufferSize()];
1✔
1703
        ByteBuffer protocolBuffer = ByteBuffer.allocate(options.getBufferSize());
1✔
1704
        boolean gotCRLF = false;
1✔
1705
        boolean gotCR = false;
1✔
1706

1707
        while (!gotCRLF) {
1✔
1708
            int read = this.dataPort.read(readBuffer, 0, readBuffer.length);
1✔
1709

1710
            if (read < 0) {
1✔
1711
                break;
1✔
1712
            }
1713

1714
            int i = 0;
1✔
1715
            while (i < read) {
1✔
1716
                byte b = readBuffer[i++];
1✔
1717

1718
                if (gotCR) {
1✔
1719
                    if (b != LF) {
1✔
1720
                        throw new IOException("Missed LF after CR waiting for INFO.");
1✔
1721
                    } else if (i < read) {
1✔
1722
                        throw new IOException("Read past initial info message.");
1✔
1723
                    }
1724

1725
                    gotCRLF = true;
1✔
1726
                    break;
1✔
1727
                }
1728

1729
                if (b == CR) {
1✔
1730
                    gotCR = true;
1✔
1731
                } else {
1732
                    if (!protocolBuffer.hasRemaining()) {
1✔
1733
                        protocolBuffer = enlargeBuffer(protocolBuffer); // just double it
1✔
1734
                    }
1735
                    protocolBuffer.put(b);
1✔
1736
                }
1737
            }
1✔
1738
        }
1✔
1739

1740
        if (!gotCRLF) {
1✔
1741
            throw new IOException("Failed to read initial info message.");
1✔
1742
        }
1743

1744
        protocolBuffer.flip();
1✔
1745

1746
        String infoJson = UTF_8.decode(protocolBuffer).toString();
1✔
1747
        infoJson = infoJson.trim();
1✔
1748
        String[] msg = infoJson.split("\\s");
1✔
1749
        String op = msg[0].toUpperCase();
1✔
1750

1751
        if (!OP_INFO.equals(op)) {
1✔
1752
            throw new IOException("Received non-info initial message.");
1✔
1753
        }
1754

1755
        handleInfo(infoJson);
1✔
1756
    }
1✔
1757

1758
    void handleInfo(String infoJson) {
1759
        ServerInfo serverInfo = new ServerInfo(infoJson);
1✔
1760
        this.serverInfo.set(serverInfo);
1✔
1761

1762
        List<String> urls = this.serverInfo.get().getConnectURLs();
1✔
1763
        if (!urls.isEmpty()) {
1✔
1764
            if (serverPool.acceptDiscoveredUrls(urls)) {
1✔
1765
                processConnectionEvent(Events.DISCOVERED_SERVERS);
1✔
1766
            }
1767
        }
1768

1769
        if (serverInfo.isLameDuckMode()) {
1✔
1770
            processConnectionEvent(Events.LAME_DUCK);
1✔
1771
        }
1772
    }
1✔
1773

1774
    void queueOutgoing(NatsMessage msg) {
1775
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
1✔
1776
            throw new IllegalArgumentException("Control line is too long");
1✔
1777
        }
1778
        if (!writer.queue(msg)) {
1✔
1779
            options.getErrorListener().messageDiscarded(this, msg);
1✔
1780
        }
1781
    }
1✔
1782

1783
    void queueInternalOutgoing(NatsMessage msg) {
1784
        if (msg.getControlLineLength() > this.options.getMaxControlLine()) {
1✔
1785
            throw new IllegalArgumentException("Control line is too long");
×
1786
        }
1787
        this.writer.queueInternalMessage(msg);
1✔
1788
    }
1✔
1789

1790
    void deliverMessage(NatsMessage msg) {
1791
        this.needPing.set(false);
1✔
1792
        this.statistics.incrementInMsgs();
1✔
1793
        this.statistics.incrementInBytes(msg.getSizeInBytes());
1✔
1794

1795
        NatsSubscription sub = subscribers.get(msg.getSID());
1✔
1796

1797
        if (sub != null) {
1✔
1798
            msg.setSubscription(sub);
1✔
1799

1800
            NatsDispatcher d = sub.getNatsDispatcher();
1✔
1801
            NatsConsumer c = (d == null) ? sub : d;
1✔
1802
            MessageQueue q = ((d == null) ? sub.getMessageQueue() : d.getMessageQueue());
1✔
1803

1804
            if (c.hasReachedPendingLimits()) {
1✔
1805
                // Drop the message and count it
1806
                this.statistics.incrementDroppedCount();
1✔
1807
                c.incrementDroppedCount();
1✔
1808

1809
                // Notify the first time
1810
                if (!c.isMarkedSlow()) {
1✔
1811
                    c.markSlow();
1✔
1812
                    processSlowConsumer(c);
1✔
1813
                }
1814
            } else if (q != null) {
1✔
1815
                c.markNotSlow();
1✔
1816

1817
                // beforeQueueProcessor returns true if the message is allowed to be queued
1818
                if (sub.getBeforeQueueProcessor().apply(msg)) {
1✔
1819
                    q.push(msg);
1✔
1820
                }
1821
            }
1822

1823
        }
1824
//        else {
1825
//            // Drop messages we don't have a subscriber for (could be extras on an
1826
//            // auto-unsub for example)
1827
//        }
1828
    }
1✔
1829

1830
    void processOK() {
1831
        this.statistics.incrementOkCount();
1✔
1832
    }
1✔
1833

1834
    void processSlowConsumer(Consumer consumer) {
1835
        if (!this.callbackRunner.isShutdown()) {
1✔
1836
            try {
1837
                this.callbackRunner.execute(() -> {
1✔
1838
                    try {
1839
                        options.getErrorListener().slowConsumerDetected(this, consumer);
1✔
1840
                    } catch (Exception ex) {
1✔
1841
                        this.statistics.incrementExceptionCount();
1✔
1842
                    }
1✔
1843
                });
1✔
1844
            } catch (RejectedExecutionException re) {
×
1845
                // Timing with shutdown, let it go
1846
            }
1✔
1847
        }
1848
    }
1✔
1849

1850
    void processException(Exception exp) {
1851
        this.statistics.incrementExceptionCount();
1✔
1852

1853
        if (!this.callbackRunner.isShutdown()) {
1✔
1854
            try {
1855
                this.callbackRunner.execute(() -> {
1✔
1856
                    try {
1857
                        options.getErrorListener().exceptionOccurred(this, exp);
1✔
1858
                    } catch (Exception ex) {
1✔
1859
                        this.statistics.incrementExceptionCount();
1✔
1860
                    }
1✔
1861
                });
1✔
1862
            } catch (RejectedExecutionException re) {
1✔
1863
                // Timing with shutdown, let it go
1864
            }
1✔
1865
        }
1866
    }
1✔
1867

1868
    void processError(String errorText) {
1869
        this.statistics.incrementErrCount();
1✔
1870

1871
        this.lastError.set(errorText);
1✔
1872
        this.connectError.set(errorText); // even if this isn't during connection, save it just in case
1✔
1873

1874
        // If we are connected && we get an authentication error, save it
1875
        if (this.isConnected() && this.isAuthenticationError(errorText) && currentServer != null) {
1✔
1876
            this.serverAuthErrors.put(currentServer, errorText);
1✔
1877
        }
1878

1879
        if (!this.callbackRunner.isShutdown()) {
1✔
1880
            try {
1881
                this.callbackRunner.execute(() -> {
1✔
1882
                    try {
1883
                        options.getErrorListener().errorOccurred(this, errorText);
1✔
1884
                    } catch (Exception ex) {
1✔
1885
                        this.statistics.incrementExceptionCount();
1✔
1886
                    }
1✔
1887
                });
1✔
1888
            } catch (RejectedExecutionException re) {
×
1889
                // Timing with shutdown, let it go
1890
            }
1✔
1891
        }
1892
    }
1✔
1893

1894
    interface ErrorListenerCaller {
1895
        void call(Connection conn, ErrorListener el);
1896
    }
1897

1898
    void executeCallback(ErrorListenerCaller elc) {
1899
        if (!this.callbackRunner.isShutdown()) {
1✔
1900
            try {
1901
                this.callbackRunner.execute(() -> elc.call(this, options.getErrorListener()));
1✔
1902
            } catch (RejectedExecutionException re) {
1✔
1903
                // Timing with shutdown, let it go
1904
            }
1✔
1905
        }
1906
    }
1✔
1907

1908
    void processConnectionEvent(Events type) {
1909
        if (!this.callbackRunner.isShutdown()) {
1✔
1910
            try {
1911
                for (ConnectionListener listener : connectionListeners) {
1✔
1912
                    this.callbackRunner.execute(() -> {
1✔
1913
                        try {
1914
                            listener.connectionEvent(this, type);
1✔
1915
                        } catch (Exception ex) {
1✔
1916
                            this.statistics.incrementExceptionCount();
1✔
1917
                        }
1✔
1918
                    });
1✔
1919
                }
1✔
UNCOV
1920
            } catch (RejectedExecutionException re) {
×
1921
                // Timing with shutdown, let it go
1922
            }
1✔
1923
        }
1924
    }
1✔
1925

1926
    /**
1927
     * {@inheritDoc}
1928
     */
1929
    @Override
1930
    @NonNull
1931
    public ServerInfo getServerInfo() {
1932
        return serverInfo.get();
1✔
1933
    }
1934

1935
    /**
1936
     * {@inheritDoc}
1937
     */
1938
    @Override
1939
    @Nullable
1940
    public InetAddress getClientInetAddress() {
1941
        try {
1942
            ServerInfo si = getServerInfo();
1✔
1943
            return si == ServerInfo.EMPTY_INFO ? null : NatsInetAddress.getByName(si.getClientIp());
1✔
1944
        }
1945
        catch (Exception e) {
×
1946
            return null;
×
1947
        }
1948
    }
1949

1950
    /**
1951
     * {@inheritDoc}
1952
     */
1953
    @Override
1954
    @NonNull
1955
    public Options getOptions() {
1956
        return this.options;
1✔
1957
    }
1958

1959
    /**
1960
     * {@inheritDoc}
1961
     */
1962
    @Override
1963
    @NonNull
1964
    public Statistics getStatistics() {
1965
        return this.statistics;
1✔
1966
    }
1967

1968
    StatisticsCollector getStatisticsCollector() {
1969
        return this.statistics;
1✔
1970
    }
1971

1972
    DataPort getDataPort() {
1973
        return this.dataPort;
1✔
1974
    }
1975

1976
    // Used for testing
1977
    int getConsumerCount() {
1978
        return this.subscribers.size() + this.dispatchers.size();
1✔
1979
    }
1980

1981
    /**
1982
     * {@inheritDoc}
1983
     */
1984
    @Override
1985
    public long getMaxPayload() {
1986
        ServerInfo info = this.serverInfo.get();
1✔
1987

1988
        if (info == null) {
1✔
1989
            return -1;
×
1990
        }
1991

1992
        return info.getMaxPayload();
1✔
1993
    }
1994

1995
    /**
1996
     * {@inheritDoc}
1997
     */
1998
    @Override
1999
    @NonNull
2000
    public Collection<String> getServers() {
2001
        return serverPool.getServerList();
1✔
2002
    }
2003

2004
    protected List<NatsUri> resolveHost(NatsUri nuri) {
2005
        // 1. If the nuri host is not already an ip address or the nuri is not for websocket or fast fallback is disabled,
2006
        //    let the pool resolve it.
2007
        List<NatsUri> results = new ArrayList<>();
1✔
2008
        if (!nuri.hostIsIpAddress() && !nuri.isWebsocket() && !options.isEnableFastFallback()) {
1✔
2009
            List<String> ips = serverPool.resolveHostToIps(nuri.getHost());
1✔
2010
            if (ips != null) {
1✔
2011
                for (String ip : ips) {
1✔
2012
                    try {
2013
                        results.add(nuri.reHost(ip));
1✔
2014
                    }
2015
                    catch (URISyntaxException u) {
1✔
2016
                        // ??? should never happen
2017
                    }
1✔
2018
                }
1✔
2019
            }
2020
        }
2021

2022
        // 2. If there were no results,
2023
        //    - host was already an ip address or
2024
        //    - host was for websocket or
2025
        //    - fast fallback is enabled
2026
        //    - pool returned nothing or
2027
        //    - resolving failed...
2028
        //    so the list just becomes the original host.
2029
        if (results.isEmpty()) {
1✔
2030
            results.add(nuri);
1✔
2031
        }
2032
        return results;
1✔
2033
    }
2034

2035
    /**
2036
     * {@inheritDoc}
2037
     */
2038
    @Override
2039
    @Nullable
2040
    public String getConnectedUrl() {
2041
        return currentServer == null ? null : currentServer.toString();
1✔
2042
    }
2043

2044
    /**
2045
     * {@inheritDoc}
2046
     */
2047
    @Override
2048
    @NonNull
2049
    public Status getStatus() {
2050
        return this.status;
1✔
2051
    }
2052

2053
    /**
2054
     * {@inheritDoc}
2055
     */
2056
    @Override
2057
    @Nullable
2058
    public String getLastError() {
2059
        return lastError.get();
1✔
2060
    }
2061

2062
    /**
2063
     * {@inheritDoc}
2064
     */
2065
    @Override
2066
    public void clearLastError() {
2067
        lastError.set(null);
1✔
2068
    }
1✔
2069

2070
    ExecutorService getExecutor() {
2071
        return executor;
1✔
2072
    }
2073

2074
    ScheduledExecutorService getScheduledExecutor() {
2075
        return scheduledExecutor;
1✔
2076
    }
2077

2078
    void updateStatus(Status newStatus) {
2079
        Status oldStatus = this.status;
1✔
2080

2081
        statusLock.lock();
1✔
2082
        try {
2083
            if (oldStatus == Status.CLOSED || newStatus == oldStatus) {
1✔
2084
                return;
1✔
2085
            }
2086
            this.status = newStatus;
1✔
2087
        } finally {
2088
            statusChanged.signalAll();
1✔
2089
            statusLock.unlock();
1✔
2090
        }
2091

2092
        if (this.status == Status.DISCONNECTED) {
1✔
2093
            processConnectionEvent(Events.DISCONNECTED);
1✔
2094
        } else if (this.status == Status.CLOSED) {
1✔
2095
            processConnectionEvent(Events.CLOSED);
1✔
2096
        } else if (oldStatus == Status.RECONNECTING && this.status == Status.CONNECTED) {
1✔
2097
            processConnectionEvent(Events.RECONNECTED);
1✔
2098
        } else if (this.status == Status.CONNECTED) {
1✔
2099
            processConnectionEvent(Events.CONNECTED);
1✔
2100
        }
2101
    }
1✔
2102

2103
    boolean isClosing() {
2104
        return this.closing;
1✔
2105
    }
2106

2107
    boolean isClosed() {
2108
        return this.status == Status.CLOSED;
1✔
2109
    }
2110

2111
    boolean isConnected() {
2112
        return this.status == Status.CONNECTED;
1✔
2113
    }
2114

2115
    boolean isDisconnected() {
2116
        return this.status == Status.DISCONNECTED;
×
2117
    }
2118

2119
    boolean isConnectedOrConnecting() {
2120
        statusLock.lock();
1✔
2121
        try {
2122
            return this.status == Status.CONNECTED || this.connecting;
1✔
2123
        } finally {
2124
            statusLock.unlock();
1✔
2125
        }
2126
    }
2127

2128
    boolean isDisconnectingOrClosed() {
2129
        statusLock.lock();
1✔
2130
        try {
2131
            return this.status == Status.CLOSED || this.disconnecting;
1✔
2132
        } finally {
2133
            statusLock.unlock();
1✔
2134
        }
2135
    }
2136

2137
    boolean isDisconnecting() {
2138
        statusLock.lock();
1✔
2139
        try {
2140
            return this.disconnecting;
1✔
2141
        } finally {
2142
            statusLock.unlock();
1✔
2143
        }
2144
    }
2145

2146
    void waitForDisconnectOrClose(Duration timeout) throws InterruptedException {
2147
        waitWhile(timeout, (Void) -> this.isDisconnecting() && !this.isClosed() );
1✔
2148
    }
1✔
2149

2150
    void waitForConnectOrClose(Duration timeout) throws InterruptedException {
2151
        waitWhile(timeout, (Void) -> !this.isConnected() && !this.isClosed());
1✔
2152
    }
1✔
2153

2154
    void waitWhile(Duration timeout, Predicate<Void> waitWhileTrue) throws InterruptedException {
2155
        statusLock.lock();
1✔
2156
        try {
2157
            long currentWaitNanos = (timeout != null) ? timeout.toNanos() : -1;
1✔
2158
            long start = NatsSystemClock.nanoTime();
1✔
2159
            while (currentWaitNanos >= 0 && waitWhileTrue.test(null)) {
1✔
2160
                if (currentWaitNanos > 0) {
1✔
2161
                    if (statusChanged.await(currentWaitNanos, TimeUnit.NANOSECONDS) && !waitWhileTrue.test(null)) {
1✔
2162
                        break;
1✔
2163
                    }
2164
                    long now = NatsSystemClock.nanoTime();
1✔
2165
                    currentWaitNanos = currentWaitNanos - (now - start);
1✔
2166
                    start = now;
1✔
2167

2168
                    if (currentWaitNanos <= 0) {
1✔
2169
                        break;
1✔
2170
                    }
2171
                }
1✔
2172
                else {
2173
                    statusChanged.await();
×
2174
                }
2175
            }
2176
        }
2177
        finally {
2178
            statusLock.unlock();
1✔
2179
        }
2180
    }
1✔
2181

2182
    void invokeReconnectDelayHandler(long totalRounds) {
2183
        long currentWaitNanos = 0;
1✔
2184

2185
        ReconnectDelayHandler handler = options.getReconnectDelayHandler();
1✔
2186
        if (handler == null) {
1✔
2187
            Duration dur = options.getReconnectWait();
1✔
2188
            if (dur != null) {
1✔
2189
                currentWaitNanos = dur.toNanos();
1✔
2190
                dur = serverPool.hasSecureServer() ? options.getReconnectJitterTls() : options.getReconnectJitter();
1✔
2191
                if (dur != null) {
1✔
2192
                    currentWaitNanos += ThreadLocalRandom.current().nextLong(dur.toNanos());
1✔
2193
                }
2194
            }
2195
        }
1✔
2196
        else {
2197
            Duration waitTime = handler.getWaitTime(totalRounds);
1✔
2198
            if (waitTime != null) {
1✔
2199
                currentWaitNanos = waitTime.toNanos();
1✔
2200
            }
2201
        }
2202

2203
        this.reconnectWaiter = new CompletableFuture<>();
1✔
2204

2205
        long start = NatsSystemClock.nanoTime();
1✔
2206
        while (currentWaitNanos > 0 && !isDisconnectingOrClosed() && !isConnected() && !this.reconnectWaiter.isDone()) {
1✔
2207
            try {
2208
                this.reconnectWaiter.get(currentWaitNanos, TimeUnit.NANOSECONDS);
×
2209
            } catch (Exception exp) {
1✔
2210
                // ignore, try to loop again
2211
            }
×
2212
            long now = NatsSystemClock.nanoTime();
1✔
2213
            currentWaitNanos = currentWaitNanos - (now - start);
1✔
2214
            start = now;
1✔
2215
        }
1✔
2216

2217
        this.reconnectWaiter.complete(Boolean.TRUE);
1✔
2218
    }
1✔
2219

2220
    ByteBuffer enlargeBuffer(ByteBuffer buffer) {
2221
        int current = buffer.capacity();
1✔
2222
        int newSize = current * 2;
1✔
2223
        ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
1✔
2224
        buffer.flip();
1✔
2225
        newBuffer.put(buffer);
1✔
2226
        return newBuffer;
1✔
2227
    }
2228

2229
    // For testing
2230
    NatsConnectionReader getReader() {
2231
        return this.reader;
1✔
2232
    }
2233

2234
    // For testing
2235
    NatsConnectionWriter getWriter() {
2236
        return this.writer;
1✔
2237
    }
2238

2239
    // For testing
2240
    Future<DataPort> getDataPortFuture() {
2241
        return this.dataPortFuture;
1✔
2242
    }
2243

2244
    boolean isDraining() {
2245
        return this.draining.get() != null;
1✔
2246
    }
2247

2248
    boolean isDrained() {
2249
        CompletableFuture<Boolean> tracker = this.draining.get();
1✔
2250

2251
        try {
2252
            if (tracker != null && tracker.getNow(false)) {
1✔
2253
                return true;
1✔
2254
            }
2255
        } catch (Exception e) {
×
2256
            // These indicate the tracker was cancelled/timed out
2257
        }
1✔
2258

2259
        return false;
1✔
2260
    }
2261

2262
    /**
2263
     * {@inheritDoc}
2264
     */
2265
    @Override
2266
    @NonNull
2267
    public CompletableFuture<Boolean> drain(@Nullable Duration timeout) throws TimeoutException, InterruptedException {
2268

2269
        if (isClosing() || isClosed()) {
1✔
2270
            throw new IllegalStateException("A connection can't be drained during close.");
1✔
2271
        }
2272

2273
        this.statusLock.lock();
1✔
2274
        try {
2275
            if (isDraining()) {
1✔
2276
                return this.draining.get();
1✔
2277
            }
2278
            this.draining.set(new CompletableFuture<>());
1✔
2279
        } finally {
2280
            this.statusLock.unlock();
1✔
2281
        }
2282

2283
        final CompletableFuture<Boolean> tracker = this.draining.get();
1✔
2284
        Instant start = Instant.now();
1✔
2285

2286
        // Don't include subscribers with dispatchers
2287
        HashSet<NatsSubscription> pureSubscribers = new HashSet<>(this.subscribers.values());
1✔
2288
        pureSubscribers.removeIf((s) -> s.getDispatcher() != null);
1✔
2289

2290
        final HashSet<NatsConsumer> consumers = new HashSet<>();
1✔
2291
        consumers.addAll(pureSubscribers);
1✔
2292
        consumers.addAll(this.dispatchers.values());
1✔
2293

2294
        NatsDispatcher inboxer = this.inboxDispatcher.get();
1✔
2295

2296
        if (inboxer != null) {
1✔
2297
            consumers.add(inboxer);
1✔
2298
        }
2299

2300
        // Stop the consumers NOW so that when this method returns they are blocked
2301
        consumers.forEach((cons) -> {
1✔
2302
            cons.markDraining(tracker);
1✔
2303
            cons.sendUnsubForDrain();
1✔
2304
        });
1✔
2305

2306
        try {
2307
            this.flush(timeout); // Flush and wait up to the timeout, if this fails, let the caller know
1✔
2308
        } catch (Exception e) {
1✔
2309
            this.close(false, false);
1✔
2310
            throw e;
1✔
2311
        }
1✔
2312

2313
        consumers.forEach(NatsConsumer::markUnsubedForDrain);
1✔
2314

2315
        // Wait for the timeout or all consumers are drained
2316
        executor.submit(() -> {
1✔
2317
            try {
2318
                long timeoutNanos = (timeout == null || timeout.toNanos() <= 0)
1✔
2319
                    ? Long.MAX_VALUE : timeout.toNanos();
1✔
2320
                long startTime = System.nanoTime();
1✔
2321
                while (NatsSystemClock.nanoTime() - startTime < timeoutNanos && !Thread.interrupted()) {
1✔
2322
                    consumers.removeIf(NatsConsumer::isDrained);
1✔
2323
                    if (consumers.isEmpty()) {
1✔
2324
                        break;
1✔
2325
                    }
2326
                    //noinspection BusyWait
2327
                    Thread.sleep(1); // Sleep 1 milli
1✔
2328
                }
2329

2330
                // Stop publishing
2331
                this.blockPublishForDrain.set(true);
1✔
2332

2333
                // One last flush
2334
                if (timeout == null || timeout.equals(Duration.ZERO)) {
1✔
2335
                    this.flush(Duration.ZERO);
1✔
2336
                } else {
2337
                    Instant now = Instant.now();
1✔
2338
                    Duration passed = Duration.between(start, now);
1✔
2339
                    Duration newTimeout = timeout.minus(passed);
1✔
2340
                    if (newTimeout.toNanos() > 0) {
1✔
2341
                        this.flush(newTimeout);
1✔
2342
                    }
2343
                }
2344
                this.close(false, false); // close the connection after the last flush
1✔
2345
                tracker.complete(consumers.isEmpty());
1✔
2346
            } catch (TimeoutException e) {
×
2347
                this.processException(e);
×
2348
            } catch (InterruptedException e) {
×
2349
                this.processException(e);
×
2350
                Thread.currentThread().interrupt();
×
2351
            } finally {
2352
                try {
2353
                    this.close(false, false);// close the connection after the last flush
1✔
2354
                } catch (InterruptedException e) {
×
2355
                    processException(e);
×
2356
                    Thread.currentThread().interrupt();
×
2357
                }
1✔
2358
                tracker.complete(false);
1✔
2359
            }
2360
        });
1✔
2361

2362
        return tracker;
1✔
2363
    }
2364

2365
    boolean isAuthenticationError(String err) {
2366
        if (err == null) {
1✔
2367
            return false;
1✔
2368
        }
2369
        err = err.toLowerCase();
1✔
2370
        return err.startsWith("user authentication")
1✔
2371
            || err.contains("authorization violation")
1✔
2372
            || err.startsWith("account authentication expired");
1✔
2373
    }
2374

2375
    /**
2376
     * {@inheritDoc}
2377
     */
2378
    @Override
2379
    public void flushBuffer() throws IOException {
2380
        if (!isConnected()) {
1✔
2381
            throw new IllegalStateException("Connection is not active.");
1✔
2382
        }
2383
        writer.flushBuffer();
1✔
2384
    }
1✔
2385

2386
    /**
2387
     * {@inheritDoc}
2388
     */
2389
    @Override
2390
    @NonNull public StreamContext getStreamContext(@NonNull String streamName) throws IOException, JetStreamApiException {
2391
        Validator.validateStreamName(streamName, true);
1✔
2392
        ensureNotClosing();
1✔
2393
        return new NatsStreamContext(streamName, null, this, null);
1✔
2394
    }
2395

2396
    /**
2397
     * {@inheritDoc}
2398
     */
2399
    @Override
2400
    @NonNull
2401
    public StreamContext getStreamContext(@NonNull String streamName, @Nullable JetStreamOptions options) throws IOException, JetStreamApiException {
2402
        Validator.validateStreamName(streamName, true);
1✔
2403
        ensureNotClosing();
1✔
2404
        return new NatsStreamContext(streamName, null, this, options);
1✔
2405
    }
2406

2407
    /**
2408
     * {@inheritDoc}
2409
     */
2410
    @Override
2411
    @NonNull
2412
    public ConsumerContext getConsumerContext(@NonNull String streamName, @NonNull String consumerName) throws IOException, JetStreamApiException {
2413
        return getStreamContext(streamName).getConsumerContext(consumerName);
1✔
2414
    }
2415

2416
    /**
2417
     * {@inheritDoc}
2418
     */
2419
    @Override
2420
    @NonNull
2421
    public ConsumerContext getConsumerContext(@NonNull String streamName, @NonNull String consumerName, @Nullable JetStreamOptions options) throws IOException, JetStreamApiException {
2422
        return getStreamContext(streamName, options).getConsumerContext(consumerName);
1✔
2423
    }
2424

2425
    /**
2426
     * {@inheritDoc}
2427
     */
2428
    @Override
2429
    @NonNull
2430
    public JetStream jetStream() throws IOException {
2431
        return jetStream(null);
1✔
2432
    }
2433

2434
    /**
2435
     * {@inheritDoc}
2436
     */
2437
    @Override
2438
    @NonNull
2439
    public JetStream jetStream(JetStreamOptions options) throws IOException {
2440
        ensureNotClosing();
1✔
2441
        return new NatsJetStream(this, options);
1✔
2442
    }
2443

2444
    /**
2445
     * {@inheritDoc}
2446
     */
2447
    @Override
2448
    @NonNull
2449
    public JetStreamManagement jetStreamManagement() throws IOException {
2450
        return jetStreamManagement(null);
1✔
2451
    }
2452

2453
    /**
2454
     * {@inheritDoc}
2455
     */
2456
    @Override
2457
    @NonNull
2458
    public JetStreamManagement jetStreamManagement(JetStreamOptions options) throws IOException {
2459
        ensureNotClosing();
1✔
2460
        return new NatsJetStreamManagement(this, options);
1✔
2461
    }
2462

2463
    /**
2464
     * {@inheritDoc}
2465
     */
2466
    @Override
2467
    @NonNull
2468
    public KeyValue keyValue(@NonNull String bucketName) throws IOException {
2469
        return keyValue(bucketName, null);
1✔
2470
    }
2471

2472
    /**
2473
     * {@inheritDoc}
2474
     */
2475
    @Override
2476
    @NonNull
2477
    public KeyValue keyValue(@NonNull String bucketName, @Nullable KeyValueOptions options) throws IOException {
2478
        Validator.validateBucketName(bucketName, true);
1✔
2479
        ensureNotClosing();
1✔
2480
        return new NatsKeyValue(this, bucketName, options);
1✔
2481
    }
2482

2483
    /**
2484
     * {@inheritDoc}
2485
     */
2486
    @Override
2487
    @NonNull
2488
    public KeyValueManagement keyValueManagement() throws IOException {
2489
        return keyValueManagement(null);
1✔
2490
    }
2491

2492
    /**
2493
     * {@inheritDoc}
2494
     */
2495
    @Override
2496
    @NonNull
2497
    public KeyValueManagement keyValueManagement(@Nullable KeyValueOptions options) throws IOException {
2498
        ensureNotClosing();
1✔
2499
        return new NatsKeyValueManagement(this, options);
1✔
2500
    }
2501

2502
    /**
2503
     * {@inheritDoc}
2504
     */
2505
    @Override
2506
    @NonNull
2507
    public ObjectStore objectStore(@NonNull String bucketName) throws IOException {
2508
        return objectStore(bucketName, null);
1✔
2509
    }
2510

2511
    /**
2512
     * {@inheritDoc}
2513
     */
2514
    @Override
2515
    @NonNull
2516
    public ObjectStore objectStore(@NonNull String bucketName, @Nullable ObjectStoreOptions options) throws IOException {
2517
        Validator.validateBucketName(bucketName, true);
1✔
2518
        ensureNotClosing();
1✔
2519
        return new NatsObjectStore(this, bucketName, options);
1✔
2520
    }
2521

2522
    /**
2523
     * {@inheritDoc}
2524
     */
2525
    @Override
2526
    @NonNull
2527
    public ObjectStoreManagement objectStoreManagement() throws IOException {
2528
        ensureNotClosing();
1✔
2529
        return new NatsObjectStoreManagement(this, null);
1✔
2530
    }
2531

2532
    /**
2533
     * {@inheritDoc}
2534
     */
2535
    @Override
2536
    @NonNull
2537
    public ObjectStoreManagement objectStoreManagement(@Nullable ObjectStoreOptions options) throws IOException {
2538
        ensureNotClosing();
1✔
2539
        return new NatsObjectStoreManagement(this, options);
1✔
2540
    }
2541

2542
    private void ensureNotClosing() throws IOException {
2543
        if (isClosing() || isClosed()) {
1✔
2544
            throw new IOException("A JetStream context can't be established during close.");
1✔
2545
        }
2546
    }
1✔
2547

2548
    /**
2549
     * {@inheritDoc}
2550
     */
2551
    @Override
2552
    public long outgoingPendingMessageCount() {
2553
        closeSocketLock.lock();
×
2554
        try {
2555
            return writer == null ? -1 : writer.outgoingPendingMessageCount();
×
2556
        }
2557
        finally {
2558
            closeSocketLock.unlock();
×
2559
        }
2560
    }
2561

2562
    /**
2563
     * {@inheritDoc}
2564
     */
2565
    @Override
2566
    public long outgoingPendingBytes() {
2567
        closeSocketLock.lock();
×
2568
        try {
2569
            return writer == null ? -1 : writer.outgoingPendingBytes();
×
2570
        }
2571
        finally {
2572
            closeSocketLock.unlock();
×
2573
        }
2574
    }
2575
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc