• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2101

12 Aug 2025 11:26AM UTC coverage: 95.457% (+0.02%) from 95.433%
#2101

push

github

web-flow
Merge pull request #1387 from nats-io/info-nullability

Ensuring nullability contracts

92 of 92 new or added lines in 10 files covered. (100.0%)

108 existing lines in 12 files now uncovered.

11913 of 12480 relevant lines covered (95.46%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.26
/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
1
// Copyright 2021 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.JetStreamApiException;
17
import io.nats.client.JetStreamOptions;
18
import io.nats.client.Message;
19
import io.nats.client.api.*;
20
import io.nats.client.support.NatsJetStreamConstants;
21

22
import java.io.IOException;
23
import java.time.Duration;
24
import java.util.List;
25
import java.util.concurrent.ConcurrentHashMap;
26

27
import static io.nats.client.support.NatsConstants.GREATER_THAN;
28
import static io.nats.client.support.NatsJetStreamClientError.JsConsumerCreate290NotAvailable;
29
import static io.nats.client.support.NatsJetStreamClientError.JsMultipleFilterSubjects210NotAvailable;
30
import static io.nats.client.support.NatsJetStreamUtil.generateConsumerName;
31
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
32

33
class NatsJetStreamImpl implements NatsJetStreamConstants {
34

35
    // currently the only thing we care about caching is the allowDirect setting
36
    static class CachedStreamInfo {
37
        public final boolean allowDirect;
38

39
        public CachedStreamInfo(StreamInfo si) {
1✔
40
            allowDirect = si.getConfiguration().getAllowDirect();
1✔
41
        }
1✔
42
    }
43

44
    // Cache of per-stream settings keyed by stream name. It is static, so it is
    // shared by every instance of this class.
    private static final ConcurrentHashMap<String, CachedStreamInfo> CACHED_STREAM_INFO_MAP = new ConcurrentHashMap<>();
1✔
45

46
    // Connection that requests are sent on.
    final NatsConnection conn;
47
    // Effective JetStream options, built in the constructor with the resolved request timeout.
    final JetStreamOptions jso;
48
    // Request timeout: the one from the supplied options, else the connection options' connection timeout.
    final Duration timeout;
49
    // True when the server is 2.9.0 or newer and the options do not opt out of the 2.9.0 consumer create.
    final boolean consumerCreate290Available;
50
    // True when the server version is newer than "2.9.99".
    final boolean multipleSubjectFilter210Available;
51
    // True when the server version is newer than "2.10.99".
    final boolean directBatchGet211Available;
52

53
    // ----------------------------------------------------------------------------------------------------
54
    // Create / Init
55
    // ----------------------------------------------------------------------------------------------------
56
    NatsJetStreamImpl(NatsConnection connection, JetStreamOptions jsOptions) {
1✔
57
        conn = connection;
1✔
58

59
        // Get a working version of JetStream Options...
60
        // Clone the input jsOptions (JetStreamOptions.builder(...) handles null.
61
        // If jsOptions is not supplied or the jsOptions request timeout
62
        // was not set, use the connection options connect timeout.
63
        timeout = jsOptions == null || jsOptions.getRequestTimeout() == null ? conn.getOptions().getConnectionTimeout() : jsOptions.getRequestTimeout();
1✔
64
        jso = JetStreamOptions.builder(jsOptions).requestTimeout(timeout).build();
1✔
65

66
        ServerInfo si = conn.getServerInfo();
1✔
67
        consumerCreate290Available = si.isSameOrNewerThanVersion("2.9.0") && !jso.isOptOut290ConsumerCreate();
1✔
68
        multipleSubjectFilter210Available = si.isNewerVersionThan("2.9.99");
1✔
69
        directBatchGet211Available = si.isNewerVersionThan("2.10.99");
1✔
70
    }
1✔
71

72
    /**
     * Copy constructor: shares the connection, options, timeout and feature
     * flags of an existing instance.
     *
     * @param impl the instance to copy from
     */
    NatsJetStreamImpl(NatsJetStreamImpl impl) {
        this.conn = impl.conn;
        this.jso = impl.jso;
        this.timeout = impl.timeout;

        // version-dependent feature flags
        this.consumerCreate290Available = impl.consumerCreate290Available;
        this.multipleSubjectFilter210Available = impl.multipleSubjectFilter210Available;
        this.directBatchGet211Available = impl.directBatchGet211Available;
    }
×
80

81
    Duration getTimeout() {
82
        return timeout;
1✔
83
    }
84

85
    // ----------------------------------------------------------------------------------------------------
86
    // Management that is also needed by regular context
87
    // ----------------------------------------------------------------------------------------------------
88
    ConsumerInfo _getConsumerInfo(String streamName, String consumerName) throws IOException, JetStreamApiException {
89
        String subj = String.format(JSAPI_CONSUMER_INFO, streamName, consumerName);
1✔
90
        Message resp = makeRequestResponseRequired(subj, null, getTimeout());
1✔
91
        return new ConsumerInfo(resp).throwOnHasError();
1✔
92
    }
93

94
    ConsumerInfo _createConsumer(String streamName, ConsumerConfiguration config, ConsumerCreateRequest.Action action) throws IOException, JetStreamApiException {
95
        // ConsumerConfiguration validates that name and durable are the same if both are supplied.
96
        String consumerName = config.getName();
1✔
97
        if (consumerName != null && !consumerCreate290Available) {
1✔
98
            throw JsConsumerCreate290NotAvailable.instance();
1✔
99
        }
100

101
        boolean hasMultipleFilterSubjects = config.hasMultipleFilterSubjects();
1✔
102

103
        // seems strange that this could happen, but checking anyway...
104
        if (hasMultipleFilterSubjects && !multipleSubjectFilter210Available) {
1✔
UNCOV
105
            throw JsMultipleFilterSubjects210NotAvailable.instance();
×
106
        }
107

108
        String durable = config.getDurable();
1✔
109
        String subj;
110
        // new consumer create not available before 290 and can't be used with multiple filter subjects
111
        if (consumerCreate290Available && !hasMultipleFilterSubjects) {
1✔
112
            if (consumerName == null) {
1✔
113
                // if both consumerName and durable are null, generate a name
114
                consumerName = durable == null ? generateConsumerName() : durable;
1✔
115
            }
116
            String fs = config.getFilterSubject(); // we've already determined there are not more than 1 filter subjects, so this gives us one or null
1✔
117
            if (fs == null || fs.equals(GREATER_THAN)) {
1✔
118
                subj = String.format(JSAPI_CONSUMER_CREATE_V290, streamName, consumerName);
1✔
119
            }
120
            else {
121
                subj = String.format(JSAPI_CONSUMER_CREATE_V290_W_FILTER, streamName, consumerName, fs);
1✔
122
            }
123
        }
1✔
124
        else if (durable == null) {
1✔
125
            subj = String.format(JSAPI_CONSUMER_CREATE, streamName);
1✔
126
        }
127
        else {
128
            subj = String.format(JSAPI_DURABLE_CREATE, streamName, durable);
1✔
129
        }
130

131
        ConsumerCreateRequest ccr = new ConsumerCreateRequest(streamName, config, action);
1✔
132
        Message resp = makeRequestResponseRequired(subj, ccr.serialize(), getTimeout());
1✔
133
        return new ConsumerInfo(resp).throwOnHasError();
1✔
134
    }
135

136
    void _createConsumerUnsubscribeOnException(String stream, ConsumerConfiguration cc, NatsJetStreamSubscription sub) throws IOException, JetStreamApiException {
137
        try {
138
            ConsumerInfo ci = _createConsumer(stream, cc, ConsumerCreateRequest.Action.CreateOrUpdate);
1✔
139
            sub.setConsumerName(ci.getName());
1✔
140
        }
141
        catch (IOException | JetStreamApiException e) {
1✔
142
            // create consumer can fail, unsubscribe and then throw the exception to the user
143
            if (sub.getDispatcher() == null) {
1✔
144
                sub.unsubscribe();
1✔
145
            }
146
            else {
147
                sub.getDispatcher().unsubscribe(sub);
1✔
148
            }
149
            throw e;
1✔
150
        }
1✔
151
    }
1✔
152

153
    StreamInfo _getStreamInfo(String streamName, StreamInfoOptions options) throws IOException, JetStreamApiException {
154
        String subj = String.format(JSAPI_STREAM_INFO, streamName);
1✔
155
        StreamInfoReader sir = new StreamInfoReader();
1✔
156
        while (sir.hasMore()) {
1✔
157
            Message resp = makeRequestResponseRequired(subj, sir.nextJson(options), getTimeout());
1✔
158
            sir.process(resp);
1✔
159
        }
1✔
160
        return cacheStreamInfo(streamName, sir.getStreamInfo());
1✔
161
    }
162

163
    StreamInfo createAndCacheStreamInfoThrowOnError(String streamName, Message resp) throws JetStreamApiException {
164
        return cacheStreamInfo(streamName, new StreamInfo(resp).throwOnHasError());
1✔
165
    }
166

167
    StreamInfo cacheStreamInfo(String streamName, StreamInfo si) {
168
        CACHED_STREAM_INFO_MAP.put(streamName, new CachedStreamInfo(si));
1✔
169
        return si;
1✔
170
    }
171

172
    List<StreamInfo> cacheStreamInfo(List<StreamInfo> list) {
173
        list.forEach(si -> CACHED_STREAM_INFO_MAP.put(si.getConfiguration().getName(), new CachedStreamInfo(si)));
1✔
174
        return list;
1✔
175
    }
176

177
    List<String> _getStreamNames(String subjectFilter) throws IOException, JetStreamApiException {
178
        StreamNamesReader snr = new StreamNamesReader();
1✔
179
        while (snr.hasMore()) {
1✔
180
            Message resp = makeRequestResponseRequired(JSAPI_STREAM_NAMES, snr.nextJson(subjectFilter), getTimeout());
1✔
181
            snr.process(resp);
1✔
182
        }
1✔
183
        return snr.getStrings();
1✔
184
    }
185

186
    // ----------------------------------------------------------------------------------------------------
187
    // General Utils
188
    // ----------------------------------------------------------------------------------------------------
189
    ConsumerConfiguration.Builder consumerConfigurationForOrdered(
190
        ConsumerConfiguration initial,
191
        long lastStreamSeq,
192
        String newDeliverSubject,
193
        Long inactiveThreshold)
194
    {
195
        ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder(initial);
1✔
196

197
        // push will always give one, pull will always give null
198
        if (newDeliverSubject != null) {
1✔
199
            builder.deliverSubject(newDeliverSubject);
1✔
200
        }
201

202
        // if the last stream seq is > 0, this means this config is for an ordered restart at a sequence
203
        if (lastStreamSeq > 0) {
1✔
204
            builder
1✔
205
                .deliverPolicy(DeliverPolicy.ByStartSequence)
1✔
206
                .startSequence(Math.max(1, lastStreamSeq + 1))
1✔
207
                .startTime(null); // clear start time in case it was originally set
1✔
208
        }
209

210
        if (inactiveThreshold != null) {
1✔
211
            builder.inactiveThreshold(inactiveThreshold);
1✔
212
        }
213

214
        return builder;
1✔
215
    }
216

217
    ConsumerInfo lookupConsumerInfo(String streamName, String consumerName) throws IOException, JetStreamApiException {
218
        try {
219
            return _getConsumerInfo(streamName, consumerName);
1✔
220
        }
221
        catch (JetStreamApiException e) {
1✔
222
            // The right side of this condition (after the ||) is for backward compatibility with server versions that did not provide api error codes
223
            if (e.getApiErrorCode() == JS_CONSUMER_NOT_FOUND_ERR || (e.getErrorCode() == 404 && e.getErrorDescription().contains("consumer"))) {
1✔
224
                return null;
1✔
225
            }
226
            throw e;
1✔
227
        }
228
    }
229

230
    String lookupStreamBySubject(String subject) throws IOException, JetStreamApiException {
231
        List<String> list = _getStreamNames(subject);
1✔
232
        return list.size() == 1 ? list.get(0) : null;
1✔
233
    }
234

235
    // ----------------------------------------------------------------------------------------------------
236
    // Request Utils
237
    // ----------------------------------------------------------------------------------------------------
238
    Message makeRequestResponseRequired(String subject, byte[] bytes, Duration timeout) throws IOException {
239
        try {
240
            return responseRequired(conn.request(prependPrefix(subject), bytes, timeout));
1✔
241
        } catch (InterruptedException e) {
×
242
            Thread.currentThread().interrupt();
×
UNCOV
243
            throw new IOException(e);
×
244
        }
245
    }
246

247
    Message makeInternalRequestResponseRequired(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) throws IOException {
248
        try {
249
            return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish));
1✔
250
        } catch (InterruptedException e) {
×
251
            Thread.currentThread().interrupt();
×
UNCOV
252
            throw new IOException(e);
×
253
        }
254
    }
255

256
    Message responseRequired(Message respMessage) throws IOException {
257
        if (respMessage == null) {
1✔
258
            throw new IOException("Timeout or no response waiting for NATS JetStream server");
1✔
259
        }
260
        return respMessage;
1✔
261
    }
262

263
    String prependPrefix(String subject) {
264
        return jso.getPrefix() + subject;
1✔
265
    }
266

267
    CachedStreamInfo getCachedStreamInfo(String streamName) throws IOException, JetStreamApiException {
268
        CachedStreamInfo csi = CACHED_STREAM_INFO_MAP.get(streamName);
1✔
269
        if (csi != null) {
1✔
270
            return csi;
1✔
271
        }
272
        _getStreamInfo(streamName, null);
×
UNCOV
273
        return CACHED_STREAM_INFO_MAP.get(streamName);
×
274
    }
275
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc