• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2042

04 Jul 2025 01:42PM UTC coverage: 95.608% (-0.003%) from 95.611%
#2042

push

github

web-flow
Merge pull request #1348 from nats-io/fix-what-i-broke-2-21-3

Fix heartbeat timer handling broken when replacing timer with scheduler.

44 of 50 new or added lines in 7 files covered. (88.0%)

6 existing lines in 4 files now uncovered.

11798 of 12340 relevant lines covered (95.61%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.04
/src/main/java/io/nats/client/impl/NatsConsumerContext.java
1
// Copyright 2020-2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.api.ConsumerConfiguration;
18
import io.nats.client.api.ConsumerInfo;
19
import io.nats.client.api.OrderedConsumerConfiguration;
20
import io.nats.client.support.Validator;
21

22
import java.io.IOException;
23
import java.time.Duration;
24
import java.util.concurrent.atomic.AtomicLong;
25
import java.util.concurrent.atomic.AtomicReference;
26
import java.util.concurrent.locks.ReentrantLock;
27

28
import static io.nats.client.BaseConsumeOptions.DEFAULT_EXPIRES_IN_MILLIS;
29
import static io.nats.client.BaseConsumeOptions.MIN_EXPIRES_MILLS;
30
import static io.nats.client.ConsumeOptions.DEFAULT_CONSUME_OPTIONS;
31
import static io.nats.client.impl.NatsJetStreamSubscription.EXPIRE_ADJUSTMENT;
32

33
/**
34
 * Implementation of Consumer Context
35
 */
36
public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscriptionMaker {
37
    private final ReentrantLock stateLock;
38
    private final NatsStreamContext streamCtx;
39
    private final boolean ordered;
40
    private final ConsumerConfiguration initialOrderedConsumerConfig;
41
    private final PullSubscribeOptions unorderedBindPso;
42

43
    private final AtomicReference<ConsumerInfo> cachedConsumerInfo;
44
    private final AtomicReference<String> consumerName;
45
    private final AtomicLong highestSeq;
46
    private final AtomicReference<Dispatcher> defaultDispatcher;
47
    private final AtomicReference<NatsMessageConsumerBase> lastConsumer;
48

49
    NatsConsumerContext(NatsStreamContext sc, ConsumerInfo unorderedConsumerInfo, OrderedConsumerConfiguration occ) {
1✔
50
        stateLock = new ReentrantLock();
1✔
51
        streamCtx = sc;
1✔
52
        cachedConsumerInfo = new AtomicReference<>();
1✔
53
        consumerName = new AtomicReference<>();
1✔
54
        highestSeq = new AtomicLong();
1✔
55
        defaultDispatcher = new AtomicReference<>();
1✔
56
        lastConsumer = new AtomicReference<>();
1✔
57
        if (unorderedConsumerInfo != null) {
1✔
58
            ordered = false;
1✔
59
            initialOrderedConsumerConfig = null;
1✔
60
            cachedConsumerInfo.set(unorderedConsumerInfo);
1✔
61
            consumerName.set(unorderedConsumerInfo.getName());
1✔
62
            unorderedBindPso = PullSubscribeOptions.fastBind(sc.streamName, unorderedConsumerInfo.getName());
1✔
63
        }
64
        else {
65
            ordered = true;
1✔
66
            initialOrderedConsumerConfig = ConsumerConfiguration.builder()
1✔
67
                .name(occ.getConsumerNamePrefix())
1✔
68
                .filterSubjects(occ.getFilterSubjects())
1✔
69
                .deliverPolicy(occ.getDeliverPolicy())
1✔
70
                .startSequence(occ.getStartSequence())
1✔
71
                .startTime(occ.getStartTime())
1✔
72
                .replayPolicy(occ.getReplayPolicy())
1✔
73
                .headersOnly(occ.getHeadersOnly())
1✔
74
                .build();
1✔
75
            unorderedBindPso = null;
1✔
76
        }
77
    }
1✔
78

79
    static class OrderedPullSubscribeOptionsBuilder extends PullSubscribeOptions.Builder {
80
        OrderedPullSubscribeOptionsBuilder(String streamName, ConsumerConfiguration cc) {
1✔
81
            stream(streamName);
1✔
82
            configuration(cc);
1✔
83
            ordered = true;
1✔
84
        }
1✔
85
    }
86

87
    @Override
88
    public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Dispatcher userDispatcher, PullMessageManager optionalPmm, Long optionalInactiveThreshold) throws IOException, JetStreamApiException {
89
        PullSubscribeOptions pso;
90
        if (ordered) {
1✔
91
            NatsMessageConsumerBase lastCon = lastConsumer.get();
1✔
92
            if (lastCon != null) {
1✔
93
                highestSeq.set(Math.max(highestSeq.get(), lastCon.pmm.lastStreamSeq));
1✔
94
            }
95
            ConsumerConfiguration cc = streamCtx.js.consumerConfigurationForOrdered(initialOrderedConsumerConfig, highestSeq.get(), null, optionalInactiveThreshold).build();
1✔
96
            pso = new OrderedPullSubscribeOptionsBuilder(streamCtx.streamName, cc).build();
1✔
97
        }
1✔
98
        else {
99
            pso = unorderedBindPso;
1✔
100
        }
101

102
        NatsJetStreamPullSubscription sub;
103
        if (messageHandler == null) {
1✔
104
            sub = (NatsJetStreamPullSubscription) streamCtx.js.createSubscription(
1✔
105
                null, null, pso, null, null, null, false, optionalPmm);
106
        }
107
        else {
108
            Dispatcher d = userDispatcher;
1✔
109
            if (d == null) {
1✔
110
                d = defaultDispatcher.get();
1✔
111
                if (d == null) {
1✔
112
                    d = streamCtx.js.conn.createDispatcher();
1✔
113
                    defaultDispatcher.set(d);
1✔
114
                }
115
            }
116
            sub = (NatsJetStreamPullSubscription) streamCtx.js.createSubscription(
1✔
117
                null, null, pso, null, (NatsDispatcher) d, messageHandler, false, optionalPmm);
118
        }
119
        consumerName.set(sub.getConsumerName());
1✔
120
        return sub;
1✔
121
    }
122

123
    private void checkState() throws IOException {
124
        NatsMessageConsumerBase lastCon = lastConsumer.get();
1✔
125
        if (lastCon != null) {
1✔
126
            if (ordered) {
1✔
127
                if (!lastCon.finished.get()) {
1✔
128
                    throw new IOException("The ordered consumer is already receiving messages. Ordered Consumer does not allow multiple instances at time.");
1✔
129
                }
130
            }
131
            if (lastCon.finished.get() && !lastCon.stopped.get()) {
1✔
NEW
132
                lastCon.shutdownSub(); // finished, might as well make sure the sub is closed.
×
133
            }
134
        }
135
    }
1✔
136

137
    private NatsMessageConsumerBase trackConsume(NatsMessageConsumerBase con) {
138
        lastConsumer.set(con);
1✔
139
        return con;
1✔
140
    }
141

142
    /**
143
     * {@inheritDoc}
144
     */
145
    @Override
146
    public String getConsumerName() {
147
        return consumerName.get();
1✔
148
    }
149

150
    /**
151
     * {@inheritDoc}
152
     */
153
    @Override
154
    public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException {
155
        ConsumerInfo ci = streamCtx.jsm.getConsumerInfo(streamCtx.streamName, consumerName.get());
1✔
156
        cachedConsumerInfo.set(ci);
1✔
157
        consumerName.set(ci.getName());
1✔
158
        return ci;
1✔
159
    }
160

161
    /**
162
     * {@inheritDoc}
163
     */
164
    @Override
165
    public ConsumerInfo getCachedConsumerInfo() {
166
        return cachedConsumerInfo.get();
1✔
167
    }
168

169
    /**
170
     * {@inheritDoc}
171
     */
172
    @Override
173
    public Message next() throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
174
        return next(DEFAULT_EXPIRES_IN_MILLIS);
1✔
175
    }
176

177
    /**
178
     * {@inheritDoc}
179
     */
180
    @Override
181
    public Message next(Duration maxWait) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
182
        return maxWait == null ? next(DEFAULT_EXPIRES_IN_MILLIS) : next(maxWait.toMillis());
1✔
183
    }
184

185
    /**
186
     * {@inheritDoc}
187
     */
188
    @Override
189
    public Message next(long maxWaitMillis) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
190
        if (maxWaitMillis < MIN_EXPIRES_MILLS) {
1✔
191
            throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds.");
1✔
192
        }
193

194
        NatsMessageConsumerBase nmcb = null;
1✔
195
        try {
196
            stateLock.lock();
1✔
197
            checkState();
1✔
198

199
            try {
200
                long inactiveThreshold = maxWaitMillis * 110 / 100; // 10% longer than the wait
1✔
201
                nmcb = new NatsMessageConsumerBase(cachedConsumerInfo.get());
1✔
202
                nmcb.initSub(subscribe(null, null, null, inactiveThreshold));
1✔
203
                nmcb.setConsumerName(consumerName.get()); // the call to subscribe sets this
1✔
204
                trackConsume(nmcb); // this has to be done after the nmcb is fully set up
1✔
205
                nmcb.sub._pull(PullRequestOptions.builder(1)
1✔
206
                    .expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT)
1✔
207
                    .build(), false, null);
1✔
208
            }
209
            catch (Exception e) {
×
210
                if (nmcb != null) {
×
211
                    try {
212
                        nmcb.close();
×
213
                    }
214
                    catch (Exception ignore) {}
×
215
                }
216
                return null;
×
217
            }
1✔
218
        }
219
        finally {
220
            stateLock.unlock();
1✔
221
        }
222

223
        // intentionally outside the lock
224
        try {
225
            return nmcb.sub.nextMessage(maxWaitMillis);
1✔
226
        }
227
        finally {
228
            try {
229
                nmcb.finished.set(true);
1✔
230
                nmcb.close();
1✔
231
            }
232
            catch (Exception e) {
×
233
                // from close/autocloseable, but we know it doesn't actually throw
234
            }
1✔
235
        }
236
    }
237

238
    /**
239
     * {@inheritDoc}
240
     */
241
    @Override
242
    public FetchConsumer fetchMessages(int maxMessages) throws IOException, JetStreamApiException {
243
        return fetch(FetchConsumeOptions.builder().maxMessages(maxMessages).build());
1✔
244
    }
245

246
    /**
247
     * {@inheritDoc}
248
     */
249
    @Override
250
    public FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiException {
251
        return fetch(FetchConsumeOptions.builder().maxBytes(maxBytes).build());
1✔
252
    }
253

254
    /**
255
     * {@inheritDoc}
256
     */
257
    @Override
258
    public FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException {
259
        try {
260
            stateLock.lock();
1✔
261
            checkState();
1✔
262
            Validator.required(fetchConsumeOptions, "Fetch Consume Options");
1✔
263
            return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, cachedConsumerInfo.get(), fetchConsumeOptions));
1✔
264
        }
265
        finally {
266
            stateLock.unlock();
1✔
267
        }
268
    }
269

270
    /**
271
     * {@inheritDoc}
272
     */
273
    @Override
274
    public IterableConsumer iterate() throws IOException, JetStreamApiException {
275
        return iterate(DEFAULT_CONSUME_OPTIONS);
1✔
276
    }
277

278
    /**
279
     * {@inheritDoc}
280
     */
281
    @Override
282
    public IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException {
283
        try {
284
            stateLock.lock();
1✔
285
            checkState();
1✔
286
            Validator.required(consumeOptions, "Consume Options");
1✔
287
            return (IterableConsumer) trackConsume(new NatsIterableConsumer(this, cachedConsumerInfo.get(), consumeOptions));
1✔
288
        }
289
        finally {
290
            stateLock.unlock();
1✔
291
        }
292
    }
293

294
    /**
295
     * {@inheritDoc}
296
     */
297
    @Override
298
    public MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException {
299
        return consume(DEFAULT_CONSUME_OPTIONS, null, handler);
1✔
300
    }
301

302
    /**
303
     * {@inheritDoc}
304
     */
305
    @Override
306
    public MessageConsumer consume(Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException {
307
        return consume(DEFAULT_CONSUME_OPTIONS, dispatcher, handler);
×
308
    }
309

310
    /**
311
     * {@inheritDoc}
312
     */
313
    @Override
314
    public MessageConsumer consume(ConsumeOptions consumeOptions, MessageHandler handler) throws IOException, JetStreamApiException {
315
        return consume(consumeOptions, null, handler);
1✔
316
    }
317

318
    /**
319
     * {@inheritDoc}
320
     */
321
    @Override
322
    public MessageConsumer consume(ConsumeOptions consumeOptions, Dispatcher userDispatcher, MessageHandler handler) throws IOException, JetStreamApiException {
323
        try {
324
            stateLock.lock();
1✔
325
            checkState();
1✔
326
            Validator.required(handler, "Message Handler");
1✔
327
            Validator.required(consumeOptions, "Consume Options");
1✔
328
            return trackConsume(new NatsMessageConsumer(this, cachedConsumerInfo.get(), consumeOptions, userDispatcher, handler));
1✔
329
        }
330
        finally {
331
            stateLock.unlock();
1✔
332
        }
333
    }
334
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc