• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1997

20 Jun 2025 12:27PM UTC coverage: 95.66% (-0.03%) from 95.69%
#1997

push

github

web-flow
Merge pull request #1334 from nats-io/timers-move-to-nano

Nano time for elapsed timings and Nats System Clock

78 of 84 new or added lines in 14 files covered. (92.86%)

1 existing line in 1 file now uncovered.

11748 of 12281 relevant lines covered (95.66%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.02
/src/main/java/io/nats/client/impl/MessageManager.java
1
// Copyright 2021 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Message;
17
import io.nats.client.NatsSystemClock;
18
import io.nats.client.PullRequestOptions;
19
import io.nats.client.SubscribeOptions;
20
import io.nats.client.support.NatsConstants;
21

22
import java.time.Duration;
23
import java.util.Timer;
24
import java.util.TimerTask;
25
import java.util.concurrent.atomic.AtomicBoolean;
26
import java.util.concurrent.atomic.AtomicLong;
27
import java.util.concurrent.locks.ReentrantLock;
28

29
abstract class MessageManager {
30
    /**
     * Possible results returned by {@code manage(Message)}.
     */
    public enum ManageResult {
        MESSAGE,
        STATUS_HANDLED,
        STATUS_TERMINUS,
        STATUS_ERROR
    }
1✔
31

32
    protected static final int THRESHOLD = 3;
33

34
    protected final ReentrantLock stateChangeLock;
35
    protected final NatsConnection conn;
36
    protected final SubscribeOptions so;
37
    protected final boolean syncMode;
38

39
    protected NatsJetStreamSubscription sub; // not final it is not set until after construction
40

41
    protected long lastStreamSeq;
42
    protected long lastConsumerSeq;
43
    protected AtomicLong lastMsgReceivedNanoTime;
44

45
    // heartbeat stuff
46
    protected boolean hb;
47
    protected long idleHeartbeatSetting;
48
    protected long alarmPeriodSettingNanos;
49
    protected MmTimerTask heartbeatTimerTask;
50
    protected Timer heartbeatTimer;
51

52
    /**
     * Creates a manager for the given connection and subscribe options.
     * Sequences start at zero, heartbeat handling starts disabled and the
     * last-message timestamp is initialized to the current nano time.
     *
     * @param conn     the connection this manager works with
     * @param so       the subscribe options
     * @param syncMode the value later reported by {@link #isSyncMode()}
     */
    protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
        this.conn = conn;
        this.so = so;
        this.syncMode = syncMode;
        this.stateChangeLock = new ReentrantLock();

        // nothing has been tracked yet
        this.lastStreamSeq = 0L;
        this.lastConsumerSeq = 0L;
        this.lastMsgReceivedNanoTime = new AtomicLong(NatsSystemClock.nanoTime());

        // heartbeats stay off until configureIdleHeartbeat turns them on
        this.hb = false;
        this.idleHeartbeatSetting = 0L;
        this.alarmPeriodSettingNanos = 0L;
    }
1✔
66

67
    protected boolean isSyncMode()              { return syncMode; }
1✔
68
    protected long getLastStreamSequence()      { return lastStreamSeq; }
1✔
69
    protected long getLastConsumerSequence()    { return lastConsumerSeq; }
1✔
70
    protected long getLastMsgReceivedNanoTime() { return lastMsgReceivedNanoTime.get(); }
1✔
71
    protected boolean isHb()                    { return hb; }
1✔
72
    protected long getIdleHeartbeatSetting()    { return idleHeartbeatSetting; }
1✔
73
    protected long getAlarmPeriodSettingNanos() { return alarmPeriodSettingNanos; }
1✔
74

75
    protected void startup(NatsJetStreamSubscription sub) {
76
        this.sub = sub;
1✔
77
    }
1✔
78

79
    protected void shutdown() {
80
        shutdownHeartbeatTimer();
1✔
81
    }
1✔
82

83
    protected void startPullRequest(String pullSubject, PullRequestOptions pullRequestOptions, boolean raiseStatusWarnings, PullManagerObserver pullManagerObserver) {
84
        // does nothing - only implemented for pulls, but in base class since instance is referenced as MessageManager, not subclass
85
    }
×
86

87
    protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
88
        return true;
×
89
    }
90
    /**
     * Processes a message and reports how it was handled.
     *
     * @param msg the message to manage
     * @return the result of managing the message
     */
    protected abstract ManageResult manage(Message msg);
91

92
    protected void trackJsMessage(Message msg) {
93
        stateChangeLock.lock();
1✔
94
        try {
95
            NatsJetStreamMetaData meta = msg.metaData();
1✔
96
            lastStreamSeq = meta.streamSequence();
1✔
97
            lastConsumerSeq++;
1✔
98
        }
99
        finally {
100
            stateChangeLock.unlock();
1✔
101
        }
102
    }
1✔
103

104
    protected void handleHeartbeatError() {
105
        conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq));
1✔
106
    }
1✔
107

108
    protected void configureIdleHeartbeat(Duration configIdleHeartbeat, long configMessageAlarmTime) {
109
        stateChangeLock.lock();
1✔
110
        try {
111
            idleHeartbeatSetting = configIdleHeartbeat == null ? 0 : configIdleHeartbeat.toMillis();
1✔
112
            if (idleHeartbeatSetting <= 0) {
1✔
113
                alarmPeriodSettingNanos = 0;
1✔
114
                hb = false;
1✔
115
            }
116
            else {
117
                long alarmPeriodSetting;
118
                if (configMessageAlarmTime < idleHeartbeatSetting) {
1✔
119
                    alarmPeriodSetting = idleHeartbeatSetting * THRESHOLD;
1✔
120
                }
121
                else {
122
                    alarmPeriodSetting = configMessageAlarmTime;
1✔
123
                }
124
                alarmPeriodSettingNanos = alarmPeriodSetting * NatsConstants.NANOS_PER_MILLI;
1✔
125
                hb = true;
1✔
126
            }
127
        }
128
        finally {
129
            stateChangeLock.unlock();
1✔
130
        }
131
    }
1✔
132

133
    protected void updateLastMessageReceived() {
134
        lastMsgReceivedNanoTime.set(NatsSystemClock.nanoTime());
1✔
135
    }
1✔
136

137
    class MmTimerTask extends TimerTask {
138
        long alarmPeriodNanos;
139
        final AtomicBoolean alive;
140

141
        public MmTimerTask(long alarmPeriodNanos) {
1✔
142
            this.alarmPeriodNanos = alarmPeriodNanos;
1✔
143
            alive = new AtomicBoolean(true);
1✔
144
        }
1✔
145

146
        public void reuse() {
147
            alive.getAndSet(true);
1✔
148
        }
1✔
149

150
        public void shutdown() {
151
            alive.getAndSet(false);
1✔
152
        }
1✔
153

154
        @Override
155
        public void run() {
156
            if (alive.get() && !Thread.interrupted()) {
1✔
157
                long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
1✔
158
                if (alive.get() && sinceLast > alarmPeriodNanos) {
1✔
159
                    handleHeartbeatError();
1✔
160
                }
161
            }
162
        }
1✔
163

164
        @Override
165
        public String toString() {
NEW
166
            long sinceLastMillis = (NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get()) / NatsConstants.NANOS_PER_MILLI;
×
167
            return "MmTimerTask{" +
×
168
                ", alarmPeriod=" + (alarmPeriodNanos / NatsConstants.NANOS_PER_MILLI) +
NEW
169
                "ms, alive=" + alive.get() +
×
170
                ", sinceLast=" + sinceLastMillis + "ms}";
171
        }
172
    }
173

174
    protected void initOrResetHeartbeatTimer() {
175
        stateChangeLock.lock();
1✔
176
        try {
177
            if (heartbeatTimer != null) {
1✔
178
                // Same settings, just reuse the existing timer
179
                if (heartbeatTimerTask.alarmPeriodNanos == alarmPeriodSettingNanos) {
1✔
180
                    heartbeatTimerTask.reuse();
1✔
181
                    updateLastMessageReceived();
1✔
182
                    return;
1✔
183
                }
184

185
                // Replace timer since settings have changed
186
                shutdownHeartbeatTimer();
×
187
            }
188
            // replacement or new comes here
189
            heartbeatTimer = new Timer();
1✔
190
            heartbeatTimerTask = new MmTimerTask(alarmPeriodSettingNanos);
1✔
191
            long alarmPeriodSettingMillis = alarmPeriodSettingNanos / NatsConstants.NANOS_PER_MILLI;
1✔
192
            heartbeatTimer.schedule(heartbeatTimerTask, alarmPeriodSettingMillis, alarmPeriodSettingMillis);
1✔
193
            updateLastMessageReceived();
1✔
194
        }
195
        finally {
196
            stateChangeLock.unlock();
1✔
197
        }
198
    }
1✔
199

200
    protected void shutdownHeartbeatTimer() {
201
        stateChangeLock.lock();
1✔
202
        try {
203
            if (heartbeatTimer != null) {
1✔
204
                heartbeatTimerTask.shutdown();
1✔
205
                heartbeatTimerTask = null;
1✔
206
                heartbeatTimer.cancel();
1✔
207
                heartbeatTimer = null;
1✔
208
            }
209
        }
210
        finally {
211
            stateChangeLock.unlock();
1✔
212
        }
213
    }
1✔
214
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc