• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2059

15 Jul 2025 06:40PM UTC coverage: 95.593% (-0.06%) from 95.655%
#2059

push

github

web-flow
Merge pull request #1357 from nats-io/shutdown-internal-connection-executors

[FIX] Shutdown internal executors on connection close.

19 of 19 new or added lines in 2 files covered. (100.0%)

16 existing lines in 7 files now uncovered.

11844 of 12390 relevant lines covered (95.59%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.41
/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.ForceReconnectOptions;
17
import io.nats.client.NatsSystemClock;
18
import io.nats.client.Options;
19
import io.nats.client.support.NatsUri;
20
import io.nats.client.support.ScheduledTask;
21

22
import java.io.IOException;
23
import java.util.concurrent.atomic.AtomicLong;
24

25
/**
26
 * This class is not thread-safe.  Caller must ensure thread safety.
27
 */
28
public class SocketDataPortWithWriteTimeout extends SocketDataPort {
29

30
    private long writeTimeoutNanos;
31
    private long delayPeriodMillis;
32
    private ScheduledTask writeWatchTask;
33
    private final AtomicLong writeMustBeDoneBy;
34

35
    public SocketDataPortWithWriteTimeout() {
1✔
36
        writeMustBeDoneBy = new AtomicLong(Long.MAX_VALUE);
1✔
37
    }
1✔
38

39
    @Override
40
    public void afterConstruct(Options options) {
41
        super.afterConstruct(options);
1✔
42
        long writeTimeoutMillis;
43
        if (options.getSocketWriteTimeout() == null) {
1✔
44
            writeTimeoutMillis = Options.DEFAULT_SOCKET_WRITE_TIMEOUT.toMillis();
×
45
        }
46
        else {
47
            writeTimeoutMillis = options.getSocketWriteTimeout().toMillis();
1✔
48
        }
49
        delayPeriodMillis = writeTimeoutMillis * 51 / 100;
1✔
50
        writeTimeoutNanos = writeTimeoutMillis * 1_000_000;
1✔
51
    }
1✔
52

53
    @Override
54
    public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws IOException {
55
        super.connect(conn, nuri, timeoutNanos);
1✔
56
        writeWatchTask = new ScheduledTask(conn.getScheduledExecutor(), delayPeriodMillis,
1✔
57
            () -> {
58
                //  if now is after when it was supposed to be done by
59
                if (NatsSystemClock.nanoTime() > writeMustBeDoneBy.get()) {
1✔
UNCOV
60
                    writeWatchTask.shutdown(); // we don't need to repeat this, the connection is going to be closed
×
UNCOV
61
                    connection.executeCallback((c, el) -> el.socketWriteTimeout(c));
×
62
                    try {
UNCOV
63
                        connection.forceReconnect(ForceReconnectOptions.FORCE_CLOSE_INSTANCE);
×
64
                    }
65
                    catch (IOException e) {
×
66
                        // retry maybe?
67
                    }
68
                    catch (InterruptedException e) {
×
69
                        Thread.currentThread().interrupt();
×
70
                        // This task is going to re-run anyway, so no point in throwing
UNCOV
71
                    }
×
72
                }
73
            });
1✔
74
    }
1✔
75

76
    public void write(byte[] src, int toWrite) throws IOException {
77
        writeMustBeDoneBy.set(NatsSystemClock.nanoTime() + writeTimeoutNanos);
1✔
78
        out.write(src, 0, toWrite);
1✔
79
        writeMustBeDoneBy.set(Long.MAX_VALUE);
1✔
80
    }
1✔
81

82
    public void close() throws IOException {
83
        writeWatchTask.shutdown();
1✔
84
        super.close();
1✔
85
    }
1✔
86
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc