• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2042

04 Jul 2025 01:42PM UTC coverage: 95.608% (-0.003%) from 95.611%
#2042

push

github

web-flow
Merge pull request #1348 from nats-io/fix-what-i-broke-2-21-3

Fix heartbeat timer handling broken when replacing timer with scheduler.

44 of 50 new or added lines in 7 files covered. (88.0%)

6 existing lines in 4 files now uncovered.

11798 of 12340 relevant lines covered (95.61%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java
1
// Copyright 2020-2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.JetStreamApiException;
17
import io.nats.client.MessageConsumer;
18
import io.nats.client.api.ConsumerInfo;
19

20
import java.io.IOException;
21
import java.util.concurrent.atomic.AtomicBoolean;
22

23
class NatsMessageConsumerBase implements MessageConsumer {
24
    protected NatsJetStreamPullSubscription sub;
25
    protected PullMessageManager pmm;
26
    protected final AtomicBoolean stopped;
27
    protected final AtomicBoolean finished;
28
    protected ConsumerInfo cachedConsumerInfo;
29
    protected String consumerName;
30

31
    NatsMessageConsumerBase(ConsumerInfo cachedConsumerInfo) {
1✔
32
        this.cachedConsumerInfo = cachedConsumerInfo;
1✔
33
        if (cachedConsumerInfo != null) {
1✔
34
            this.consumerName = cachedConsumerInfo.getName();
1✔
35
        }
36
        this.stopped = new AtomicBoolean(false);
1✔
37
        this.finished = new AtomicBoolean(false);
1✔
38
    }
1✔
39

40
    void setConsumerName(String consumerName) {
41
        this.consumerName = consumerName;
1✔
42
    }
1✔
43

44
    void initSub(NatsJetStreamPullSubscription sub) {
45
        this.sub = sub;
1✔
46
        pmm = (PullMessageManager)sub.manager;
1✔
47
    }
1✔
48

49
    /**
50
     * {@inheritDoc}
51
     */
52
    public boolean isStopped() {
53
        return stopped.get();
×
54
    }
55

56
    /**
57
     * {@inheritDoc}
58
     */
59
    public boolean isFinished() {
60
        return finished.get();
1✔
61
    }
62

63
    /**
64
     * {@inheritDoc}
65
     */
66
    @Override
67
    public String getConsumerName() {
68
        if (consumerName == null) {
×
69
            consumerName = cachedConsumerInfo.getName();
×
70
        }
71
        return consumerName;
×
72
    }
73

74
    /**
75
     * {@inheritDoc}
76
     */
77
    @Override
78
    public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException {
79
        if (cachedConsumerInfo == null) {
1✔
80
            cachedConsumerInfo = sub.getConsumerInfo();
1✔
81
            consumerName = cachedConsumerInfo.getName();
1✔
82
        }
83
        return cachedConsumerInfo;
1✔
84
    }
85

86
    /**
87
     * {@inheritDoc}
88
     */
89
    @Override
90
    public ConsumerInfo getCachedConsumerInfo() {
91
        return cachedConsumerInfo;
×
92
    }
93

94
    /**
95
     * {@inheritDoc}
96
     */
97
    @Override
98
    public void stop() {
99
        stopped.set(true);
1✔
100
    }
1✔
101

102
    @Override
103
    public void close() throws Exception {
104
        stopped.set(true);
1✔
105
        shutdownSub();
1✔
106
    }
1✔
107

108
    protected void fullClose() {
109
        stopped.set(true);
1✔
110
        finished.set(true);
1✔
111
        shutdownSub();
1✔
112
    }
1✔
113

114
    protected void shutdownSub() {
115
        try {
116
            if (sub.isActive()) {
1✔
117
                if (sub.getNatsDispatcher() != null) {
1✔
118
                    sub.getDispatcher().unsubscribe(sub);
1✔
119
                }
120
                else {
121
                    sub.unsubscribe();
1✔
122
                }
123
            }
124
        }
125
        catch (Throwable ignore) {
×
126
            // nothing to do
127
        }
1✔
128
        if (pmm != null) {
1✔
129
            try {
130
                pmm.shutdownHeartbeatTimer();
1✔
131
            }
NEW
132
            catch (Throwable ignore) {
×
133
                // nothing to do
134
            }
1✔
135
        }
136
    }
1✔
137
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc