• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2014

27 Jun 2025 02:19PM UTC coverage: 95.671% (-0.006%) from 95.677%
#2014

push

github

web-flow
Merge pull request #1339 from nats-io/fix-1337

[BUG] MessageConsumer.isFinished() not set properly in certain conditions

17 of 19 new or added lines in 4 files covered. (89.47%)

2 existing lines in 2 files now uncovered.

11758 of 12290 relevant lines covered (95.67%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/src/main/java/io/nats/client/impl/PullOrderedMessageManager.java
1
// Copyright 2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Message;
17
import io.nats.client.SubscribeOptions;
18
import io.nats.client.api.ConsumerConfiguration;
19

20
import java.util.concurrent.atomic.AtomicReference;
21

22
import static io.nats.client.impl.MessageManager.ManageResult.MESSAGE;
23
import static io.nats.client.impl.MessageManager.ManageResult.STATUS_HANDLED;
24

25
class PullOrderedMessageManager extends PullMessageManager {
26

27
    protected final ConsumerConfiguration originalCc;
28
    protected final NatsJetStream js;
29
    protected final String stream;
30
    protected long expectedExternalConsumerSeq;
31
    protected final AtomicReference<String> targetSid;
32

33
    protected PullOrderedMessageManager(NatsConnection conn,
34
                                        NatsJetStream js,
35
                                        String stream,
36
                                        SubscribeOptions so, ConsumerConfiguration originalCc, boolean syncMode) {
37
        super(conn, so, syncMode);
1✔
38
        this.js = js;
1✔
39
        this.stream = stream;
1✔
40
        this.originalCc = originalCc;
1✔
41
        expectedExternalConsumerSeq = 1; // always starts at 1
1✔
42
        targetSid = new AtomicReference<>();
1✔
43
    }
1✔
44

45
    @Override
46
    protected void startup(NatsJetStreamSubscription sub) {
47
        super.startup(sub);
1✔
48
        targetSid.set(sub.getSID());
1✔
49
    }
1✔
50

51
    @Override
52
    protected ManageResult manage(Message msg) {
53
        if (!msg.getSID().equals(targetSid.get())) {
1✔
UNCOV
54
            return STATUS_HANDLED; // wrong sid. message is a throwaway from previous consumer that errored
×
55
        }
56

57
        if (msg.isJetStream()) {
1✔
58
            long receivedConsumerSeq = msg.metaData().consumerSequence();
1✔
59
            if (expectedExternalConsumerSeq != receivedConsumerSeq) {
1✔
60
                targetSid.set(null);
1✔
61
                expectedExternalConsumerSeq = 1; // consumer always starts with consumer sequence 1
1✔
62
                resetTracking();
1✔
63
                if (pullManagerObserver != null) {
1✔
64
                    pullManagerObserver.heartbeatError();
1✔
65
                }
66
                return STATUS_HANDLED;
1✔
67
            }
68
            trackJsMessage(msg);
1✔
69
            expectedExternalConsumerSeq++;
1✔
70
            return MESSAGE;
1✔
71
        }
72

73
        return manageStatus(msg);
1✔
74
    }
75
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc