• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2238

29 Sep 2025 04:41PM UTC coverage: 95.464% (-0.06%) from 95.521%
#2238

push

github

web-flow
Merge pull request #1438 from nats-io/improve-connection-listener

Improve ConnectionListener

52 of 64 new or added lines in 2 files covered. (81.25%)

7 existing lines in 3 files now uncovered.

12164 of 12742 relevant lines covered (95.46%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.86
/src/main/java/io/nats/client/impl/NatsMessageConsumer.java
1
// Copyright 2020-2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.api.ConsumerInfo;
18

19
import java.io.IOException;
20

21
class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManagerObserver {
22
    protected final ConsumeOptions consumeOpts;
23
    protected final int thresholdMessages;
24
    protected final long thresholdBytes;
25
    protected final SimplifiedSubscriptionMaker subscriptionMaker;
26
    protected final Dispatcher userDispatcher;
27
    protected final MessageHandler userMessageHandler;
28

29
    NatsMessageConsumer(SimplifiedSubscriptionMaker subscriptionMaker,
30
                        ConsumerInfo cachedConsumerInfo,
31
                        ConsumeOptions consumeOpts,
32
                        Dispatcher userDispatcher,
33
                        final MessageHandler userMessageHandler) throws IOException, JetStreamApiException
34
    {
35
        super(cachedConsumerInfo);
1✔
36

37
        this.subscriptionMaker = subscriptionMaker;
1✔
38
        this.consumeOpts = consumeOpts;
1✔
39
        this.userDispatcher = userDispatcher;
1✔
40
        this.userMessageHandler = userMessageHandler;
1✔
41

42
        int bm = consumeOpts.getBatchSize();
1✔
43
        long bb = consumeOpts.getBatchBytes();
1✔
44
        int rePullMessages = Math.max(1, bm * consumeOpts.getThresholdPercent() / 100);
1✔
45
        long rePullBytes = bb == 0 ? 0 : Math.max(1, bb * consumeOpts.getThresholdPercent() / 100);
1✔
46
        thresholdMessages = bm - rePullMessages;
1✔
47
        thresholdBytes = bb == 0 ? Integer.MIN_VALUE : bb - rePullBytes;
1✔
48

49
        doSub(true);
1✔
50
    }
1✔
51

52
    @Override
53
    public void heartbeatError() {
54
        try {
55
            if (stopped.get()) {
1✔
56
                fullClose();
×
57
            }
58
            else {
59
                shutdownSub();
1✔
60
                doSub(false);
1✔
61
            }
62
        }
63
        catch (JetStreamApiException | IOException e) {
×
64
            setupHbAlarmToTrigger();
×
65
        }
1✔
66
    }
1✔
67

68
    void doSub(boolean first) throws JetStreamApiException, IOException {
69
        MessageHandler mh = userMessageHandler == null ? null : msg -> {
1✔
70
            userMessageHandler.onMessage(msg);
1✔
71
            if (stopped.get() && pmm.noMorePending()) {
1✔
UNCOV
72
                finished.set(true);
×
73
            }
74
        };
1✔
75
        try {
76
            stopped.set(false);
1✔
77
            finished.set(false);
1✔
78
            super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null), !first);
1✔
79
            repull();
1✔
80
        }
81
        catch (JetStreamApiException | IOException e) {
1✔
82
            setupHbAlarmToTrigger();
1✔
83
        }
1✔
84
    }
1✔
85

86
    private void setupHbAlarmToTrigger() {
87
        pmm.resetTracking();
1✔
88
        pmm.initOrResetHeartbeatTimer();
1✔
89
    }
1✔
90

91
    @Override
92
    public void pendingUpdated() {
93
        if (stopped.get()) {
1✔
94
            if (pmm.noMorePending()) {
1✔
95
                fullClose();
1✔
96
            }
97
        }
98
        else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))
1✔
99
        {
100
            repull();
1✔
101
        }
102
    }
1✔
103

104
    private void repull() {
105
        int rePullMessages = Math.max(1, consumeOpts.getBatchSize() - pmm.pendingMessages);
1✔
106
        long rePullBytes = consumeOpts.getBatchBytes() == 0 ? 0 : consumeOpts.getBatchBytes() - pmm.pendingBytes;
1✔
107
// TODO - PINNED CONSUMER SUPPORT
108
//        PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
109
        PullRequestOptions pro = PullRequestOptions.builder(rePullMessages)
1✔
110
            .maxBytes(rePullBytes)
1✔
111
            .expiresIn(consumeOpts.getExpiresInMillis())
1✔
112
            .idleHeartbeat(consumeOpts.getIdleHeartbeat())
1✔
113
            .group(consumeOpts.getGroup())
1✔
114
            .priority(consumeOpts.getPriority())
1✔
115
            .minPending(consumeOpts.getMinPending())
1✔
116
            .minAckPending(consumeOpts.getMinAckPending())
1✔
117
            .build();
1✔
118
        sub._pull(pro, consumeOpts.raiseStatusWarnings(), this);
1✔
119
    }
1✔
120
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc