• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2101

12 Aug 2025 11:26AM UTC coverage: 95.457% (+0.02%) from 95.433%
#2101

push

github

web-flow
Merge pull request #1387 from nats-io/info-nullability

Ensuring nullability contracts

92 of 92 new or added lines in 10 files covered. (100.0%)

108 existing lines in 12 files now uncovered.

11913 of 12480 relevant lines covered (95.46%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.41
/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.ForceReconnectOptions;
17
import io.nats.client.NatsSystemClock;
18
import io.nats.client.Options;
19
import io.nats.client.support.NatsUri;
20
import io.nats.client.support.ScheduledTask;
21
import org.jspecify.annotations.NonNull;
22

23
import java.io.IOException;
24
import java.util.concurrent.atomic.AtomicLong;
25

26
/**
27
 * This class is not thread-safe.  Caller must ensure thread safety.
28
 */
29
@SuppressWarnings("ClassEscapesDefinedScope") // NatsConnection
30
public class SocketDataPortWithWriteTimeout extends SocketDataPort {
31

32
    private long writeTimeoutNanos;
33
    private long delayPeriodMillis;
34
    private ScheduledTask writeWatchTask;
35
    private final AtomicLong writeMustBeDoneBy;
36

37
    public SocketDataPortWithWriteTimeout() {
1✔
38
        writeMustBeDoneBy = new AtomicLong(Long.MAX_VALUE);
1✔
39
    }
1✔
40

41
    @Override
42
    public void afterConstruct(Options options) {
43
        super.afterConstruct(options);
1✔
44
        long writeTimeoutMillis;
45
        if (options.getSocketWriteTimeout() == null) {
1✔
UNCOV
46
            writeTimeoutMillis = Options.DEFAULT_SOCKET_WRITE_TIMEOUT.toMillis();
×
47
        }
48
        else {
49
            writeTimeoutMillis = options.getSocketWriteTimeout().toMillis();
1✔
50
        }
51
        delayPeriodMillis = writeTimeoutMillis * 51 / 100;
1✔
52
        writeTimeoutNanos = writeTimeoutMillis * 1_000_000;
1✔
53
    }
1✔
54

55
    @Override
56
    public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long timeoutNanos) throws IOException {
57
        super.connect(conn, nuri, timeoutNanos);
1✔
58
        writeWatchTask = new ScheduledTask(conn.getScheduledExecutor(), delayPeriodMillis,
1✔
59
            () -> {
60
                //  if now is after when it was supposed to be done by
61
                if (NatsSystemClock.nanoTime() > writeMustBeDoneBy.get()) {
1✔
UNCOV
62
                    writeWatchTask.shutdown(); // we don't need to repeat this, the connection is going to be closed
×
63
                    connection.executeCallback((c, el) -> el.socketWriteTimeout(c));
×
64
                    try {
65
                        connection.forceReconnect(ForceReconnectOptions.FORCE_CLOSE_INSTANCE);
×
66
                    }
UNCOV
67
                    catch (IOException e) {
×
68
                        // retry maybe?
69
                    }
UNCOV
70
                    catch (InterruptedException e) {
×
71
                        Thread.currentThread().interrupt();
×
72
                        // This task is going to re-run anyway, so no point in throwing
UNCOV
73
                    }
×
74
                }
75
            });
1✔
76
    }
1✔
77

78
    public void write(byte[] src, int toWrite) throws IOException {
79
        writeMustBeDoneBy.set(NatsSystemClock.nanoTime() + writeTimeoutNanos);
1✔
80
        out.write(src, 0, toWrite);
1✔
81
        writeMustBeDoneBy.set(Long.MAX_VALUE);
1✔
82
    }
1✔
83

84
    public void close() throws IOException {
85
        writeWatchTask.shutdown();
1✔
86
        super.close();
1✔
87
    }
1✔
88
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc