• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2017

27 Jun 2025 04:11PM UTC coverage: 95.598% (-0.07%) from 95.671%
#2017

push

github

web-flow
Merge pull request #1340 from nats-io/flapper-test-overflow-fetch

Fix flapping test: testOverflowFetch

11749 of 12290 relevant lines covered (95.6%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.3
/src/main/java/io/nats/client/impl/PullMessageManager.java
1
// Copyright 2021 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Message;
17
import io.nats.client.PullRequestOptions;
18
import io.nats.client.SubscribeOptions;
19
import io.nats.client.support.Status;
20

21
import static io.nats.client.impl.MessageManager.ManageResult.*;
22
import static io.nats.client.support.NatsJetStreamConstants.NATS_PENDING_BYTES;
23
import static io.nats.client.support.NatsJetStreamConstants.NATS_PENDING_MESSAGES;
24
import static io.nats.client.support.Status.*;
25

26
class PullMessageManager extends MessageManager {
27

28
    protected int pendingMessages;
29
    protected long pendingBytes;
30
    protected boolean trackingBytes;
31
    protected boolean raiseStatusWarnings;
32
    protected PullManagerObserver pullManagerObserver;
33

34
    protected PullMessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
35
        super(conn, so, syncMode);
1✔
36
        resetTracking();
1✔
37
    }
1✔
38

39
    @Override
40
    protected void startup(NatsJetStreamSubscription sub) {
41
        super.startup(sub);
1✔
42
        sub.setBeforeQueueProcessor(this::beforeQueueProcessorImpl);
1✔
43
    }
1✔
44

45
    @Override
46
    protected void startPullRequest(String pullSubject, PullRequestOptions pro, boolean raiseStatusWarnings, PullManagerObserver pullManagerObserver) {
47
        stateChangeLock.lock();
1✔
48
        try {
49
            this.raiseStatusWarnings = raiseStatusWarnings;
1✔
50
            this.pullManagerObserver = pullManagerObserver;
1✔
51
            pendingMessages += pro.getBatchSize();
1✔
52
            pendingBytes += pro.getMaxBytes();
1✔
53
            trackingBytes = (pendingBytes > 0);
1✔
54
            configureIdleHeartbeat(pro.getIdleHeartbeat(), -1);
1✔
55
            if (hb) {
1✔
56
                initOrResetHeartbeatTimer();
1✔
57
            }
58
            else {
59
                shutdownHeartbeatTimer(); // just in case the pull was changed from hb to non-hb
1✔
60
            }
61
        }
62
        finally {
63
            stateChangeLock.unlock();
1✔
64
        }
65
    }
1✔
66

67
    @Override
68
    protected void handleHeartbeatError() {
69
        super.handleHeartbeatError();
1✔
70
        resetTracking();
1✔
71
        if (pullManagerObserver != null) {
1✔
72
            pullManagerObserver.heartbeatError();
×
73
        }
74
    }
1✔
75

76
    private void trackIncoming(int m, long b) {
77
        stateChangeLock.lock();
1✔
78
        try {
79
            // message time used for heartbeat tracking
80
            updateLastMessageReceived();
1✔
81

82
            if (m != Integer.MIN_VALUE) {
1✔
83
                pendingMessages -= m;
1✔
84
                boolean zero = pendingMessages < 1;
1✔
85
                if (trackingBytes) {
1✔
86
                    pendingBytes -= b;
1✔
87
                    zero |= pendingBytes < 1;
1✔
88
                }
89
                if (zero) {
1✔
90
                    resetTracking();
1✔
91
                }
92
                if (pullManagerObserver != null) {
1✔
93
                    pullManagerObserver.pendingUpdated();
1✔
94
                }
95
            }
96
        }
97
        finally {
98
            stateChangeLock.unlock();
1✔
99
        }
100
    }
1✔
101

102
    protected void resetTracking() {
103
        pendingMessages = 0;
1✔
104
        pendingBytes = 0;
1✔
105
        trackingBytes = false;
1✔
106
        updateLastMessageReceived();
1✔
107
    }
1✔
108

109
    @Override
110
    protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
111
        Status status = msg.getStatus();
1✔
112

113
        // normal js message
114
        if (status == null) {
1✔
115
            trackIncoming(1, msg.consumeByteCount());
1✔
116
            return true;
1✔
117
        }
118

119
        // heartbeat just needed to be recorded
120
        if (status.isHeartbeat()) {
1✔
121
            updateLastMessageReceived(); // no need to call track incoming, this is all it does
1✔
122
            return false;
1✔
123
        }
124

125
        int m = Integer.MIN_VALUE;
1✔
126
        long b = Long.MIN_VALUE;
1✔
127
        Headers h = msg.getHeaders();
1✔
128
        if (h != null) {
1✔
129
            try {
130
                m = Integer.parseInt(h.getFirst(NATS_PENDING_MESSAGES));
1✔
131
                b = Long.parseLong(h.getFirst(NATS_PENDING_BYTES));
1✔
132
            }
133
            catch (NumberFormatException ignore) {
×
134
                m = Integer.MIN_VALUE; // shouldn't happen but don't fail; make sure don't track m/b
×
135
            }
1✔
136
        }
137
        trackIncoming(m, b);
1✔
138
        return true;
1✔
139
    }
140

141
    @Override
142
    protected ManageResult manage(Message msg) {
143
        // normal js message
144
        if (msg.getStatus() == null) {
1✔
145
            trackJsMessage(msg);
1✔
146
            return MESSAGE;
1✔
147
        }
148
        return manageStatus(msg);
1✔
149
    }
150

151
    protected ManageResult manageStatus(Message msg) {
152
        Status status = msg.getStatus();
1✔
153
        switch (status.getCode()) {
1✔
154
            case NOT_FOUND_CODE:
155
            case REQUEST_TIMEOUT_CODE:
156
            case NO_RESPONDERS_CODE:
157
                if (raiseStatusWarnings) {
1✔
158
                    conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status));
1✔
159
                }
160
                return STATUS_TERMINUS;
1✔
161

162
            case CONFLICT_CODE:
163
                // sometimes just a warning
164
                String statMsg = status.getMessage();
1✔
165
                if (statMsg.startsWith(EXCEEDED_MAX_PREFIX) || statMsg.equals(SERVER_SHUTDOWN))
1✔
166
                {
167
                    if (raiseStatusWarnings) {
1✔
168
                        conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status));
1✔
169
                    }
170
                    return STATUS_HANDLED;
1✔
171
                }
172

173
                if (statMsg.equals(BATCH_COMPLETED)
1✔
174
                    || statMsg.equals(LEADERSHIP_CHANGE)
1✔
175
                    || statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES))
1✔
176
                {
177
                    return STATUS_TERMINUS;
1✔
178
                }
179
                break;
180
        }
181

182
        // All unknown 409s are errors, since that basically means the client is not aware of them.
183
        // These known ones are also errors: "Consumer Deleted" and "Consumer is push based"
184
        conn.executeCallback((c, el) -> el.pullStatusError(c, sub, status));
1✔
185
        return STATUS_ERROR;
1✔
186
    }
187

188
    protected boolean noMorePending() {
189
        return pendingMessages < 1 || (trackingBytes && pendingBytes < 1);
1✔
190
    }
191
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc