• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2059

15 Jul 2025 06:40PM UTC coverage: 95.593% (-0.06%) from 95.655%
#2059

push

github

web-flow
Merge pull request #1357 from nats-io/shutdown-internal-connection-executors

[FIX] Shutdown internal executors on connection close.

19 of 19 new or added lines in 2 files covered. (100.0%)

16 existing lines in 7 files now uncovered.

11844 of 12390 relevant lines covered (95.59%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/src/main/java/io/nats/client/impl/OrderedMessageManager.java
1
// Copyright 2021 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Message;
17
import io.nats.client.SubscribeOptions;
18
import io.nats.client.api.ConsumerConfiguration;
19
import io.nats.client.api.ConsumerCreateRequest;
20
import io.nats.client.api.ConsumerInfo;
21

22
import java.util.concurrent.atomic.AtomicReference;
23

24
import static io.nats.client.impl.MessageManager.ManageResult.MESSAGE;
25
import static io.nats.client.impl.MessageManager.ManageResult.STATUS_HANDLED;
26
import static io.nats.client.support.NatsJetStreamUtil.generateConsumerName;
27

28
/**
 * Push message manager that enforces strictly sequential delivery for an ordered consumer.
 *
 * <p>It remembers the SID of the subscription it is currently accepting messages from and the
 * consumer sequence the next JetStream message must carry. When a message arrives with any other
 * consumer sequence, or when the heartbeat error hook fires, it re-subscribes on a fresh inbox,
 * creates a new consumer built from {@code initialCc} and {@code lastStreamSeq}, and starts over
 * expecting consumer sequence 1.
 */
class OrderedMessageManager extends PushMessageManager {
29

30
    /** Consumer sequence the next accepted JetStream message must have; reset to 1 on startup and on recovery. */
    protected long expectedExternalConsumerSeq;
31
    /** SID of the subscription whose messages are accepted; holds null until startup and while recovery is in progress. */
    protected final AtomicReference<String> targetSid;
32

33
    /**
     * Creates the manager. All arguments are forwarded unchanged to the superclass constructor.
     *
     * @param conn       forwarded to the superclass
     * @param js         forwarded to the superclass
     * @param stream     forwarded to the superclass
     * @param so         forwarded to the superclass
     * @param originalCc forwarded to the superclass
     * @param queueMode  forwarded to the superclass
     * @param syncMode   forwarded to the superclass
     */
    protected OrderedMessageManager(
34
        NatsConnection conn,
35
        NatsJetStream js,
36
        String stream,
37
        SubscribeOptions so,
38
        ConsumerConfiguration originalCc,
39
        boolean queueMode,
40
        boolean syncMode)
41
    {
42
        super(conn, js, stream, so, originalCc, queueMode, syncMode);
1✔
43
        expectedExternalConsumerSeq = 1; // always starts at 1
1✔
44
        targetSid = new AtomicReference<>();
1✔
45
    }
1✔
46

47
    /**
     * Resets the expected consumer sequence to 1, runs the superclass startup, then records the
     * subscription's SID as the only SID whose messages {@link #manage(Message)} will accept.
     *
     * @param sub the subscription this manager is being started for
     */
    @Override
48
    protected void startup(NatsJetStreamSubscription sub) {
49
        expectedExternalConsumerSeq = 1; // consumer always starts with consumer sequence 1
1✔
50
        super.startup(sub);
1✔
51
        targetSid.set(sub.getSID());
1✔
52
    }
1✔
53

54
    /**
     * Decides what to do with an incoming message.
     *
     * @param msg the incoming message
     * @return {@code STATUS_HANDLED} when the message's SID is not the current target SID or when
     *         its consumer sequence is not the expected one (recovery is triggered in the latter
     *         case); {@code MESSAGE} when a JetStream message arrives in sequence; otherwise
     *         whatever {@code manageStatus(msg)} returns for a non-JetStream message
     */
    @Override
55
    protected ManageResult manage(Message msg) {
56
        // targetSid holds null while recovery is in progress, so this check also drops
        // everything that arrives during that window.
        if (!msg.getSID().equals(targetSid.get())) {
1✔
57
            return STATUS_HANDLED; // wrong sid. message is a throwaway from previous consumer that errored
1✔
58
        }
59

60
        if (msg.isJetStream()) {
1✔
61
            long receivedConsumerSeq = msg.metaData().consumerSequence();
1✔
62
            // any mismatch (not only a gap) counts as out of order and restarts the consumer
            if (expectedExternalConsumerSeq != receivedConsumerSeq) {
1✔
63
                handleErrorCondition();
1✔
64
                return STATUS_HANDLED;
1✔
65
            }
66
            trackJsMessage(msg);
1✔
67
            expectedExternalConsumerSeq++;
1✔
68
            return MESSAGE;
1✔
69
        }
70

71
        // not a JetStream message: defer to the inherited status handling
        return manageStatus(msg);
×
72
    }
73

74
    /**
     * Runs the superclass heartbeat-error handling and then the same recovery that an
     * out-of-sequence message triggers.
     */
    @Override
75
    protected void handleHeartbeatError() {
76
        super.handleHeartbeatError();
1✔
77
        handleErrorCondition();
1✔
78
    }
1✔
79

80
    /**
     * Recovers from an out-of-sequence message or a heartbeat error: clears the target SID,
     * resets the expected sequence, re-subscribes on a new inbox, creates a new consumer and
     * restarts this manager. Never throws; any exception is handed to
     * {@code js.conn.processException} and the heartbeat timer is initialized or reset.
     */
    private void handleErrorCondition() {
81
        try {
82
            // null target SID makes manage() discard messages until the new subscription is in place
            targetSid.set(null);
1✔
83
            expectedExternalConsumerSeq = 1; // consumer always starts with consumer sequence 1
1✔
84

85
            // 1. re-subscribe. This means killing the sub then making a new one.
86
            //    New sub needs a new deliverSubject
87
            String newDeliverSubject = sub.connection.createInbox();
1✔
88
            sub.reSubscribe(newDeliverSubject);
1✔
89
            targetSid.set(sub.getSID());
1✔
90

91
            // 2a. make a new consumer using the same "deliver" subject but with a new starting point, and a new name
92
            ConsumerConfiguration.Builder b = js.consumerConfigurationForOrdered(initialCc, lastStreamSeq, newDeliverSubject, null);
1✔
93

94
            // 2b. because we bypass the normal create-subscription workflow,
95
            //     we have to handle the fact that ordered consumers must always have a unique name.
96
            //     if the user supplied a name, we'll call generateConsumerName with the original name as a prefix
97
            if (initialCc.getName() != null) {
1✔
98
                b.name(generateConsumerName(initialCc.getName()));
1✔
99
            }
100
            ConsumerConfiguration userCC = b.build();
1✔
101
            ConsumerInfo ci = js._createConsumer(stream, userCC, ConsumerCreateRequest.Action.Create); // this can fail when a server is down.
1✔
102
            sub.setConsumerName(ci.getName());
1✔
103

104
            // 3. restart the manager.
105
            startup(sub);
1✔
106
        }
UNCOV
107
        catch (Exception e) {
×
108
            // don't want this doubly failing for any reason
109
            try {
110
                js.conn.processException(e);
×
111
            }
UNCOV
112
            catch (Exception ignore) {}
×
UNCOV
113
            // NOTE(review): presumably re-arms heartbeat monitoring so a later heartbeat error
            // retries the recovery; confirm against PushMessageManager.
            initOrResetHeartbeatTimer();
×
114
        }
1✔
115
    }
1✔
116
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc