• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2014

27 Jun 2025 02:19PM UTC coverage: 95.671% (-0.006%) from 95.677%
#2014

push

github

web-flow
Merge pull request #1339 from nats-io/fix-1337

[BUG] MessageConsumer.isFinished() not set properly in certain conditions

17 of 19 new or added lines in 4 files covered. (89.47%)

2 existing lines in 2 files now uncovered.

11758 of 12290 relevant lines covered (95.67%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/src/main/java/io/nats/client/impl/NatsMessageConsumer.java
1
// Copyright 2020-2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.api.ConsumerInfo;
18

19
import java.io.IOException;
20

21
class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManagerObserver {
22
    protected final ConsumeOptions consumeOpts;
23
    protected final int thresholdMessages;
24
    protected final long thresholdBytes;
25
    protected final SimplifiedSubscriptionMaker subscriptionMaker;
26
    protected final Dispatcher userDispatcher;
27
    protected final MessageHandler userMessageHandler;
28

29
    NatsMessageConsumer(SimplifiedSubscriptionMaker subscriptionMaker,
30
                        ConsumerInfo cachedConsumerInfo,
31
                        ConsumeOptions consumeOpts,
32
                        Dispatcher userDispatcher,
33
                        final MessageHandler userMessageHandler) throws IOException, JetStreamApiException
34
    {
35
        super(cachedConsumerInfo);
1✔
36

37
        this.subscriptionMaker = subscriptionMaker;
1✔
38
        this.consumeOpts = consumeOpts;
1✔
39
        this.userDispatcher = userDispatcher;
1✔
40
        this.userMessageHandler = userMessageHandler;
1✔
41

42
        int bm = consumeOpts.getBatchSize();
1✔
43
        long bb = consumeOpts.getBatchBytes();
1✔
44
        int rePullMessages = Math.max(1, bm * consumeOpts.getThresholdPercent() / 100);
1✔
45
        long rePullBytes = bb == 0 ? 0 : Math.max(1, bb * consumeOpts.getThresholdPercent() / 100);
1✔
46
        thresholdMessages = bm - rePullMessages;
1✔
47
        thresholdBytes = bb == 0 ? Integer.MIN_VALUE : bb - rePullBytes;
1✔
48

49
        doSub();
1✔
50
    }
1✔
51

52
    @Override
53
    public void heartbeatError() {
54
        try {
55
            // just close the current sub and make another one.
56
            // this could go on endlessly - unless the user had called stop
57
            if (stopped.get()) {
1✔
NEW
58
                finishAndClose();
×
59
            }
60
            else {
61
                lenientClose();
1✔
62
                doSub();
1✔
63
            }
64
        }
65
        catch (JetStreamApiException | IOException e) {
×
66
            pmm.resetTracking();
×
67
            pmm.initOrResetHeartbeatTimer();
×
68
        }
1✔
69
    }
1✔
70

71
    void doSub() throws JetStreamApiException, IOException {
72
        MessageHandler mh = userMessageHandler == null ? null : msg -> {
1✔
73
            userMessageHandler.onMessage(msg);
1✔
74
            if (stopped.get() && pmm.noMorePending()) {
1✔
NEW
75
                finishAndClose();
×
76
            }
77
        };
1✔
78
        super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null));
1✔
79
        repull();
1✔
80
        stopped.set(false);
1✔
81
        finished.set(false);
1✔
82
    }
1✔
83

84
    @Override
85
    public void pendingUpdated() {
86
        if (stopped.get()) {
1✔
87
            if (pmm.noMorePending()) {
1✔
88
                finishAndClose();
1✔
89
            }
90
        }
91
        else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))
1✔
92
        {
93
            repull();
1✔
94
        }
95
    }
1✔
96

97
    private void repull() {
98
        int rePullMessages = Math.max(1, consumeOpts.getBatchSize() - pmm.pendingMessages);
1✔
99
        long rePullBytes = consumeOpts.getBatchBytes() == 0 ? 0 : consumeOpts.getBatchBytes() - pmm.pendingBytes;
1✔
100
        PullRequestOptions pro = PullRequestOptions.builder(rePullMessages)
1✔
101
            .maxBytes(rePullBytes)
1✔
102
            .expiresIn(consumeOpts.getExpiresInMillis())
1✔
103
            .idleHeartbeat(consumeOpts.getIdleHeartbeat())
1✔
104
            .group(consumeOpts.getGroup())
1✔
105
            .minPending(consumeOpts.getMinPending())
1✔
106
            .minAckPending(consumeOpts.getMinAckPending())
1✔
107
            .build();
1✔
108
        sub._pull(pro, consumeOpts.raiseStatusWarnings(), this);
1✔
109
    }
1✔
110
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc