• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2101

12 Aug 2025 11:26AM UTC coverage: 95.457% (+0.02%) from 95.433%
#2101

push

github

web-flow
Merge pull request #1387 from nats-io/info-nullability

Ensuring nullability contracts

92 of 92 new or added lines in 10 files covered. (100.0%)

108 existing lines in 12 files now uncovered.

11913 of 12480 relevant lines covered (95.46%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.44
/src/main/java/io/nats/client/impl/NatsConsumerContext.java
1
// Copyright 2020-2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.api.ConsumerConfiguration;
18
import io.nats.client.api.ConsumerInfo;
19
import io.nats.client.api.OrderedConsumerConfiguration;
20
import io.nats.client.support.Validator;
21
import org.jspecify.annotations.NonNull;
22
import org.jspecify.annotations.Nullable;
23

24
import java.io.IOException;
25
import java.time.Duration;
26
import java.util.concurrent.atomic.AtomicLong;
27
import java.util.concurrent.atomic.AtomicReference;
28
import java.util.concurrent.locks.ReentrantLock;
29

30
import static io.nats.client.BaseConsumeOptions.DEFAULT_EXPIRES_IN_MILLIS;
31
import static io.nats.client.BaseConsumeOptions.MIN_EXPIRES_MILLS;
32
import static io.nats.client.ConsumeOptions.DEFAULT_CONSUME_OPTIONS;
33
import static io.nats.client.impl.NatsJetStreamSubscription.EXPIRE_ADJUSTMENT;
34

35
/**
36
 * Implementation of {@link ConsumerContext} that also serves as the {@link SimplifiedSubscriptionMaker} for the consumers it creates.
37
 */
38
public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscriptionMaker {
39
    private final ReentrantLock stateLock;
40
    private final NatsStreamContext streamCtx;
41
    private final boolean ordered;
42
    private final ConsumerConfiguration initialOrderedConsumerConfig;
43
    private final PullSubscribeOptions unorderedBindPso;
44

45
    private final AtomicReference<ConsumerInfo> cachedConsumerInfo;
46
    private final AtomicReference<String> consumerName;
47
    private final AtomicLong highestSeq;
48
    private final AtomicReference<Dispatcher> defaultDispatcher;
49
    private final AtomicReference<NatsMessageConsumerBase> lastConsumer;
50

51
    /**
     * Builds a context bound either to an existing (unordered) consumer or to an ordered consumer
     * described by an {@link OrderedConsumerConfiguration}.
     * <p>If {@code unorderedConsumerInfo} is non-null it wins and {@code occ} is ignored.</p>
     *
     * @param sc the stream context this consumer context belongs to
     * @param unorderedConsumerInfo info of the existing consumer to bind to, or null for an ordered consumer
     * @param occ the ordered consumer configuration, only consulted when {@code unorderedConsumerInfo} is null
     * @throws IllegalArgumentException if both {@code unorderedConsumerInfo} and {@code occ} are null
     */
    NatsConsumerContext(@NonNull NatsStreamContext sc, @Nullable ConsumerInfo unorderedConsumerInfo, @Nullable OrderedConsumerConfiguration occ) {
        // Reject the impossible "neither ordered nor unordered" case up front.
        if (unorderedConsumerInfo == null && occ == null) {
            throw new IllegalArgumentException("Internal Error, must be ordered or unordered.");
        }

        streamCtx = sc;
        stateLock = new ReentrantLock();
        cachedConsumerInfo = new AtomicReference<>();
        consumerName = new AtomicReference<>();
        highestSeq = new AtomicLong();
        defaultDispatcher = new AtomicReference<>();
        lastConsumer = new AtomicReference<>();

        if (unorderedConsumerInfo != null) {
            // Unordered: remember the consumer and prepare the bind options for it.
            String boundName = unorderedConsumerInfo.getName();
            ordered = false;
            initialOrderedConsumerConfig = null;
            cachedConsumerInfo.set(unorderedConsumerInfo);
            consumerName.set(boundName);
            unorderedBindPso = PullSubscribeOptions.fastBind(sc.streamName, boundName);
        }
        else {
            // Ordered: copy the user's ordered settings into the initial consumer configuration.
            ordered = true;
            initialOrderedConsumerConfig = ConsumerConfiguration.builder()
                .name(occ.getConsumerNamePrefix())
                .filterSubjects(occ.getFilterSubjects())
                .deliverPolicy(occ.getDeliverPolicy())
                .startSequence(occ.getStartSequence())
                .startTime(occ.getStartTime())
                .replayPolicy(occ.getReplayPolicy())
                .headersOnly(occ.getHeadersOnly())
                .build();
            unorderedBindPso = null;
        }
    }
1✔
83

84
    static class OrderedPullSubscribeOptionsBuilder extends PullSubscribeOptions.Builder {
85
        OrderedPullSubscribeOptionsBuilder(String streamName, ConsumerConfiguration cc) {
1✔
86
            stream(streamName);
1✔
87
            configuration(cc);
1✔
88
            ordered = true;
1✔
89
        }
1✔
90
    }
91

92
    @Override
93
    public NatsJetStreamPullSubscription subscribe(@Nullable MessageHandler messageHandler,
94
                                                   @Nullable Dispatcher userDispatcher,
95
                                                   @SuppressWarnings("ClassEscapesDefinedScope") @Nullable PullMessageManager optionalPmm,
96
                                                   @Nullable Long optionalInactiveThreshold)
97
        throws IOException, JetStreamApiException
98
    {
99
        PullSubscribeOptions pso;
100
        if (ordered) {
1✔
101
            NatsMessageConsumerBase lastCon = lastConsumer.get();
1✔
102
            if (lastCon != null) {
1✔
103
                highestSeq.set(Math.max(highestSeq.get(), lastCon.pmm.lastStreamSeq));
1✔
104
            }
105
            ConsumerConfiguration cc = streamCtx.js.consumerConfigurationForOrdered(initialOrderedConsumerConfig, highestSeq.get(), null, optionalInactiveThreshold).build();
1✔
106
            pso = new OrderedPullSubscribeOptionsBuilder(streamCtx.streamName, cc).build();
1✔
107
        }
1✔
108
        else {
109
            pso = unorderedBindPso;
1✔
110
        }
111

112
        NatsJetStreamPullSubscription sub;
113
        if (messageHandler == null) {
1✔
114
            sub = (NatsJetStreamPullSubscription) streamCtx.js.createSubscription(
1✔
115
                null, null, pso, null, null, null, false, optionalPmm);
116
        }
117
        else {
118
            Dispatcher d = userDispatcher;
1✔
119
            if (d == null) {
1✔
120
                d = defaultDispatcher.get();
1✔
121
                if (d == null) {
1✔
122
                    d = streamCtx.js.conn.createDispatcher();
1✔
123
                    defaultDispatcher.set(d);
1✔
124
                }
125
            }
126
            sub = (NatsJetStreamPullSubscription) streamCtx.js.createSubscription(
1✔
127
                null, null, pso, null, (NatsDispatcher) d, messageHandler, false, optionalPmm);
128
        }
129
        consumerName.set(sub.getConsumerName());
1✔
130
        return sub;
1✔
131
    }
132

133
    private void checkState() throws IOException {
134
        NatsMessageConsumerBase lastCon = lastConsumer.get();
1✔
135
        if (lastCon != null) {
1✔
136
            if (ordered) {
1✔
137
                if (!lastCon.finished.get()) {
1✔
138
                    throw new IOException("The ordered consumer is already receiving messages. Ordered Consumer does not allow multiple instances at time.");
1✔
139
                }
140
            }
141
            if (lastCon.finished.get() && !lastCon.stopped.get()) {
1✔
UNCOV
142
                lastCon.shutdownSub(); // finished, might as well make sure the sub is closed.
×
143
            }
144
        }
145
    }
1✔
146

147
    private NatsMessageConsumerBase trackConsume(NatsMessageConsumerBase con) {
148
        lastConsumer.set(con);
1✔
149
        return con;
1✔
150
    }
151

152
    /**
153
     * {@inheritDoc}
154
     */
155
    @Override
156
    public String getConsumerName() {
157
        return consumerName.get();
1✔
158
    }
159

160
    /**
161
     * {@inheritDoc}
162
     */
163
    @Override
164
    @NonNull
165
    public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException {
166
        ConsumerInfo ci = streamCtx.jsm.getConsumerInfo(streamCtx.streamName, consumerName.get());
1✔
167
        cachedConsumerInfo.set(ci);
1✔
168
        consumerName.set(ci.getName());
1✔
169
        return ci;
1✔
170
    }
171

172
    /**
173
     * {@inheritDoc}
174
     */
175
    @Override
176
    @Nullable
177
    public ConsumerInfo getCachedConsumerInfo() {
178
        return cachedConsumerInfo.get();
1✔
179
    }
180

181
    /**
182
     * {@inheritDoc}
183
     */
184
    @Override
185
    @Nullable
186
    public Message next() throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
187
        return next(DEFAULT_EXPIRES_IN_MILLIS);
1✔
188
    }
189

190
    /**
191
     * {@inheritDoc}
192
     */
193
    @Override
194
    @Nullable
195
    public Message next(@Nullable Duration maxWait) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
196
        return maxWait == null || maxWait.isZero() || maxWait.isNegative()
1✔
197
            ? next(DEFAULT_EXPIRES_IN_MILLIS)
1✔
198
            : next(maxWait.toMillis());
1✔
199
    }
200

201
    /**
202
     * {@inheritDoc}
203
     */
204
    @Override
205
    @Nullable
206
    public Message next(long maxWaitMillis) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
207
        if (maxWaitMillis < MIN_EXPIRES_MILLS) {
1✔
208
            throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds.");
1✔
209
        }
210

211
        NatsMessageConsumerBase nmcb = null;
1✔
212
        try {
213
            stateLock.lock();
1✔
214
            checkState();
1✔
215

216
            try {
217
                long inactiveThreshold = maxWaitMillis * 110 / 100; // 10% longer than the wait
1✔
218
                nmcb = new NatsMessageConsumerBase(cachedConsumerInfo.get());
1✔
219
                nmcb.initSub(subscribe(null, null, null, inactiveThreshold));
1✔
220
                nmcb.setConsumerName(consumerName.get()); // the call to subscribe sets this
1✔
221
                trackConsume(nmcb); // this has to be done after the nmcb is fully set up
1✔
222
                nmcb.sub._pull(PullRequestOptions.builder(1)
1✔
223
                    .expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT)
1✔
224
                    .build(), false, null);
1✔
225
            }
UNCOV
226
            catch (Exception e) {
×
UNCOV
227
                if (nmcb != null) {
×
228
                    try {
UNCOV
229
                        nmcb.close();
×
230
                    }
UNCOV
231
                    catch (Exception ignore) {}
×
232
                }
UNCOV
233
                return null;
×
234
            }
1✔
235
        }
236
        finally {
237
            stateLock.unlock();
1✔
238
        }
239

240
        // intentionally outside the lock
241
        try {
242
            return nmcb.sub.nextMessage(maxWaitMillis);
1✔
243
        }
244
        finally {
245
            try {
246
                nmcb.finished.set(true);
1✔
247
                nmcb.close();
1✔
248
            }
UNCOV
249
            catch (Exception e) {
×
250
                // from close/autocloseable, but we know it doesn't actually throw
251
            }
1✔
252
        }
253
    }
254

255
    /**
256
     * {@inheritDoc}
257
     */
258
    @Override
259
    @NonNull
260
    public FetchConsumer fetchMessages(int maxMessages) throws IOException, JetStreamApiException {
261
        return fetch(FetchConsumeOptions.builder().maxMessages(maxMessages).build());
1✔
262
    }
263

264
    /**
265
     * {@inheritDoc}
266
     */
267
    @Override
268
    @NonNull
269
    public FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiException {
270
        return fetch(FetchConsumeOptions.builder().maxBytes(maxBytes).build());
1✔
271
    }
272

273
    /**
274
     * {@inheritDoc}
275
     */
276
    @Override
277
    @NonNull
278
    public FetchConsumer fetch(@NonNull FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException {
279
        Validator.required(fetchConsumeOptions, "Fetch Consume Options");
1✔
280
        try {
281
            stateLock.lock();
1✔
282
            checkState();
1✔
283
            return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, cachedConsumerInfo.get(), fetchConsumeOptions));
1✔
284
        }
285
        finally {
286
            stateLock.unlock();
1✔
287
        }
288
    }
289

290
    /**
291
     * {@inheritDoc}
292
     */
293
    @Override
294
    @NonNull
295
    public IterableConsumer iterate() throws IOException, JetStreamApiException {
296
        return iterate(DEFAULT_CONSUME_OPTIONS);
1✔
297
    }
298

299
    /**
300
     * {@inheritDoc}
301
     */
302
    @Override
303
    @NonNull
304
    public IterableConsumer iterate(@NonNull ConsumeOptions consumeOptions) throws IOException, JetStreamApiException {
305
        Validator.required(consumeOptions, "Consume Options");
1✔
306
        try {
307
            stateLock.lock();
1✔
308
            checkState();
1✔
309
            return (IterableConsumer) trackConsume(new NatsIterableConsumer(this, cachedConsumerInfo.get(), consumeOptions));
1✔
310
        }
311
        finally {
312
            stateLock.unlock();
1✔
313
        }
314
    }
315

316
    /**
317
     * {@inheritDoc}
318
     */
319
    @Override
320
    @NonNull
321
    public MessageConsumer consume(@NonNull MessageHandler handler) throws IOException, JetStreamApiException {
322
        return consume(DEFAULT_CONSUME_OPTIONS, null, handler);
1✔
323
    }
324

325
    /**
326
     * {@inheritDoc}
327
     */
328
    @Override
329
    @NonNull
330
    public MessageConsumer consume(@Nullable Dispatcher dispatcher,
331
                                   @NonNull MessageHandler handler) throws IOException, JetStreamApiException {
UNCOV
332
        return consume(DEFAULT_CONSUME_OPTIONS, dispatcher, handler);
×
333
    }
334

335
    /**
336
     * {@inheritDoc}
337
     */
338
    @Override
339
    @NonNull
340
    public MessageConsumer consume(@NonNull ConsumeOptions consumeOptions,
341
                                   @NonNull MessageHandler handler) throws IOException, JetStreamApiException {
342
        return consume(consumeOptions, null, handler);
1✔
343
    }
344

345
    /**
346
     * {@inheritDoc}
347
     */
348
    @Override
349
    @NonNull
350
    public MessageConsumer consume(@NonNull ConsumeOptions consumeOptions,
351
                                   @Nullable Dispatcher userDispatcher,
352
                                   @NonNull MessageHandler handler)
353
        throws IOException, JetStreamApiException
354
    {
355
        Validator.required(consumeOptions, "Consume Options");
1✔
356
        Validator.required(handler, "Message Handler");
1✔
357
        try {
358
            stateLock.lock();
1✔
359
            checkState();
1✔
360
            return trackConsume(new NatsMessageConsumer(this, cachedConsumerInfo.get(), consumeOptions, userDispatcher, handler));
1✔
361
        }
362
        finally {
363
            stateLock.unlock();
1✔
364
        }
365
    }
366
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc