• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1925

20 Mar 2025 03:46PM UTC coverage: 95.662% (-0.3%) from 95.95%
#1925

push

github

web-flow
Merge pull request #1239 from nats-io/main-2-11

Main for server v2.11

179 of 219 new or added lines in 15 files covered. (81.74%)

2 existing lines in 2 files now uncovered.

11600 of 12126 relevant lines covered (95.66%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.69
/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
1
// Copyright 2021 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.api.Error;
18
import io.nats.client.api.*;
19

20
import java.io.IOException;
21
import java.nio.charset.StandardCharsets;
22
import java.time.ZonedDateTime;
23
import java.util.List;
24

25
import static io.nats.client.support.Validator.*;
26

27
public class NatsJetStreamManagement extends NatsJetStreamImpl implements JetStreamManagement {
28
    private NatsJetStream js; // this is lazy init'ed
29

30
    public NatsJetStreamManagement(NatsConnection connection, JetStreamOptions jsOptions) throws IOException {
31
        super(connection, jsOptions);
1✔
32
    }
1✔
33

34
    /**
35
     * {@inheritDoc}
36
     */
37
    @Override
38
    public AccountStatistics getAccountStatistics() throws IOException, JetStreamApiException {
39
        Message resp = makeRequestResponseRequired(JSAPI_ACCOUNT_INFO, null, getTimeout());
1✔
40
        return new AccountStatistics(resp).throwOnHasError();
1✔
41
    }
42

43
    /**
44
     * {@inheritDoc}
45
     */
46
    @Override
47
    public StreamInfo addStream(StreamConfiguration config) throws IOException, JetStreamApiException {
48
        return addOrUpdateStream(config, JSAPI_STREAM_CREATE);
1✔
49
    }
50

51
    /**
52
     * {@inheritDoc}
53
     */
54
    @Override
55
    public StreamInfo updateStream(StreamConfiguration config) throws IOException, JetStreamApiException {
56
        return addOrUpdateStream(config, JSAPI_STREAM_UPDATE);
1✔
57
    }
58

59
    private StreamInfo addOrUpdateStream(StreamConfiguration config, String template) throws IOException, JetStreamApiException {
60
        validateNotNull(config, "Configuration");
1✔
61
        String streamName = config.getName();
1✔
62
        if (nullOrEmpty(streamName)) {
1✔
63
            throw new IllegalArgumentException("Configuration must have a valid stream name");
1✔
64
        }
65

66
        String subj = String.format(template, streamName);
1✔
67
        Message resp = makeRequestResponseRequired(subj, config.toJson().getBytes(StandardCharsets.UTF_8), getTimeout());
1✔
68
        return createAndCacheStreamInfoThrowOnError(streamName, resp);
1✔
69
    }
70

71
    /**
72
     * {@inheritDoc}
73
     */
74
    @Override
75
    public boolean deleteStream(String streamName) throws IOException, JetStreamApiException {
76
        validateNotNull(streamName, "Stream Name");
1✔
77
        String subj = String.format(JSAPI_STREAM_DELETE, streamName);
1✔
78
        Message resp = makeRequestResponseRequired(subj, null, getTimeout());
1✔
79
        return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
1✔
80
    }
81

82
    /**
83
     * {@inheritDoc}
84
     */
85
    @Override
86
    public StreamInfo getStreamInfo(String streamName) throws IOException, JetStreamApiException {
87
        validateNotNull(streamName, "Stream Name");
1✔
88
        return _getStreamInfo(streamName, null);
1✔
89
    }
90

91
    /**
92
     * {@inheritDoc}
93
     */
94
    @Override
95
    public StreamInfo getStreamInfo(String streamName, StreamInfoOptions options) throws IOException, JetStreamApiException {
96
        validateNotNull(streamName, "Stream Name");
1✔
97
        return _getStreamInfo(streamName, options);
1✔
98
    }
99

100
    /**
101
     * {@inheritDoc}
102
     */
103
    @Override
104
    public PurgeResponse purgeStream(String streamName) throws IOException, JetStreamApiException {
105
        validateNotNull(streamName, "Stream Name");
1✔
106
        String subj = String.format(JSAPI_STREAM_PURGE, streamName);
1✔
107
        Message resp = makeRequestResponseRequired(subj, null, getTimeout());
1✔
108
        return new PurgeResponse(resp).throwOnHasError();
1✔
109
    }
110

111
    /**
112
     * {@inheritDoc}
113
     */
114
    @Override
115
    public PurgeResponse purgeStream(String streamName, PurgeOptions options) throws IOException, JetStreamApiException {
116
        validateNotNull(streamName, "Stream Name");
1✔
117
        validateNotNull(options, "Purge Options");
1✔
118
        String subj = String.format(JSAPI_STREAM_PURGE, streamName);
1✔
119
        byte[] body = options.toJson().getBytes(StandardCharsets.UTF_8);
1✔
120
        Message resp = makeRequestResponseRequired(subj, body, getTimeout());
1✔
121
        return new PurgeResponse(resp).throwOnHasError();
1✔
122
    }
123

124
    /**
125
     * {@inheritDoc}
126
     */
127
    @Override
128
    public ConsumerInfo addOrUpdateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException {
129
        validateStreamName(streamName, true);
1✔
130
        validateNotNull(config, "Config");
1✔
131
        return _createConsumer(streamName, config, ConsumerCreateRequest.Action.CreateOrUpdate);
1✔
132
    }
133

134
    /**
135
     * {@inheritDoc}
136
     */
137
    @Override
138
    public ConsumerInfo createConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException {
139
        validateStreamName(streamName, true);
1✔
140
        validateNotNull(config, "Config");
1✔
141
        return _createConsumer(streamName, config, ConsumerCreateRequest.Action.Create);
1✔
142
    }
143

144
    /**
145
     * {@inheritDoc}
146
     */
147
    @Override
148
    public ConsumerInfo updateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException {
149
        validateStreamName(streamName, true);
1✔
150
        validateNotNull(config, "Config");
1✔
151
        return _createConsumer(streamName, config, ConsumerCreateRequest.Action.Update);
1✔
152
    }
153

154
    /**
155
     * {@inheritDoc}
156
     */
157
    @Override
158
    public boolean deleteConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException {
159
        validateNotNull(streamName, "Stream Name");
1✔
160
        validateNotNull(consumerName, "Consumer Name");
1✔
161
        String subj = String.format(JSAPI_CONSUMER_DELETE, streamName, consumerName);
1✔
162
        Message resp = makeRequestResponseRequired(subj, null, getTimeout());
1✔
163
        return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
1✔
164
    }
165

166
    /**
167
     * {@inheritDoc}
168
     */
169
    @Override
170
    public ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException {
171
        validateNotNull(streamName, "Stream Name");
1✔
172
        validateNotNull(consumerName, "Consumer Name");
1✔
173
        String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
1✔
174
        ConsumerPauseRequest pauseRequest = new ConsumerPauseRequest(pauseUntil);
1✔
175
        Message resp = makeRequestResponseRequired(subj, pauseRequest.serialize(), getTimeout());
1✔
176
        return new ConsumerPauseResponse(resp).throwOnHasError();
1✔
177
    }
178

179
    /**
180
     * {@inheritDoc}
181
     */
182
    @Override
183
    public boolean resumeConsumer(String streamName, String consumerName) throws IOException, JetStreamApiException {
184
        validateNotNull(streamName, "Stream Name");
1✔
185
        validateNotNull(consumerName, "Consumer Name");
1✔
186
        String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
1✔
187
        Message resp = makeRequestResponseRequired(subj, null, getTimeout());
1✔
188
        ConsumerPauseResponse response = new ConsumerPauseResponse(resp).throwOnHasError();
1✔
189
        return !response.isPaused();
1✔
190
    }
191

192
    /**
193
     * {@inheritDoc}
194
     */
195
    @Override
196
    public ConsumerInfo getConsumerInfo(String streamName, String consumerName) throws IOException, JetStreamApiException {
197
        return super._getConsumerInfo(streamName, consumerName);
1✔
198
    }
199

200
    /**
201
     * {@inheritDoc}
202
     */
203
    @Override
204
    public List<String> getConsumerNames(String streamName) throws IOException, JetStreamApiException {
205
        return getConsumerNames(streamName, null);
1✔
206
    }
207

208
    // TODO FUTURE resurface this api publicly when server supports
209
    // @Override
210
    private List<String> getConsumerNames(String streamName, String filter) throws IOException, JetStreamApiException {
211
        String subj = String.format(JSAPI_CONSUMER_NAMES, streamName);
1✔
212
        ConsumerNamesReader cnr = new ConsumerNamesReader();
1✔
213
        while (cnr.hasMore()) {
1✔
214
            Message resp = makeRequestResponseRequired(subj, cnr.nextJson(filter), getTimeout());
1✔
215
            cnr.process(resp);
1✔
216
        }
1✔
217
        return cnr.getStrings();
1✔
218
    }
219

220
    /**
221
     * {@inheritDoc}
222
     */
223
    @Override
224
    public List<ConsumerInfo> getConsumers(String streamName) throws IOException, JetStreamApiException {
225
        String subj = String.format(JSAPI_CONSUMER_LIST, streamName);
1✔
226
        ConsumerListReader clg = new ConsumerListReader();
1✔
227
        while (clg.hasMore()) {
1✔
228
            Message resp = makeRequestResponseRequired(subj, clg.nextJson(), getTimeout());
1✔
229
            clg.process(resp);
1✔
230
        }
1✔
231
        return clg.getConsumers();
1✔
232
    }
233

234
    /**
235
     * {@inheritDoc}
236
     */
237
    @Override
238
    public List<String> getStreamNames() throws IOException, JetStreamApiException {
239
        return _getStreamNames(null);
1✔
240
    }
241

242
    /**
243
     * {@inheritDoc}
244
     */
245
    @Override
246
    public List<String> getStreamNames(String subjectFilter) throws IOException, JetStreamApiException {
247
        return _getStreamNames(subjectFilter);
1✔
248
    }
249

250
    /**
251
     * {@inheritDoc}
252
     */
253
    @Override
254
    public List<StreamInfo> getStreams() throws IOException, JetStreamApiException {
255
        return getStreams(null);
1✔
256
    }
257

258
    @Override
259
    public List<StreamInfo> getStreams(String subjectFilter) throws IOException, JetStreamApiException {
260
        StreamListReader slr = new StreamListReader();
1✔
261
        while (slr.hasMore()) {
1✔
262
            Message resp = makeRequestResponseRequired(JSAPI_STREAM_LIST, slr.nextJson(subjectFilter), getTimeout());
1✔
263
            slr.process(resp);
1✔
264
        }
1✔
265
        return cacheStreamInfo(slr.getStreams());
1✔
266
    }
267

268
    /**
269
     * {@inheritDoc}
270
     */
271
    @Override
272
    public MessageInfo getMessage(String streamName, long seq) throws IOException, JetStreamApiException {
273
        return _getMessage(streamName, MessageGetRequest.forSequence(seq));
1✔
274
    }
275

276
    @Override
277
    public MessageInfo getMessage(String streamName, MessageGetRequest messageGetRequest) throws IOException, JetStreamApiException {
NEW
278
        return _getMessage(streamName, messageGetRequest);
×
279
    }
280

281
    /**
282
     * {@inheritDoc}
283
     */
284
    @Override
285
    public MessageInfo getLastMessage(String streamName, String subject) throws IOException, JetStreamApiException {
286
        return _getMessage(streamName, MessageGetRequest.lastForSubject(subject));
1✔
287
    }
288

289
    /**
290
     * {@inheritDoc}
291
     */
292
    @Override
293
    public MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException {
294
        return _getMessage(streamName, MessageGetRequest.firstForSubject(subject));
1✔
295
    }
296

297
    /**
298
     * {@inheritDoc}
299
     */
300
    @Override
301
    public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException {
NEW
302
        return _getMessage(streamName, MessageGetRequest.firstForStartTime(startTime));
×
303
    }
304

305
    /**
306
     * {@inheritDoc}
307
     */
308
    @Override
309
    public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException {
NEW
310
        return _getMessage(streamName, MessageGetRequest.firstForStartTimeAndSubject(startTime, subject));
×
311
    }
312

313
    /**
314
     * {@inheritDoc}
315
     */
316
    @Override
317
    public MessageInfo getNextMessage(String streamName, long seq, String subject) throws IOException, JetStreamApiException {
318
        return _getMessage(streamName, MessageGetRequest.nextForSubject(seq, subject));
1✔
319
    }
320

321
    private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetRequest) throws IOException, JetStreamApiException {
322
        validateNotNull(messageGetRequest, "Message Get Request");
1✔
323
        CachedStreamInfo csi = getCachedStreamInfo(streamName);
1✔
324
        if (csi.allowDirect) {
1✔
325
            String subject;
326
            byte[] payload;
327
            if (messageGetRequest.isLastBySubject()) {
1✔
328
                subject = String.format(JSAPI_DIRECT_GET_LAST, streamName, messageGetRequest.getLastBySubject());
1✔
329
                payload = null;
1✔
330
            }
331
            else{
332
                subject = String.format(JSAPI_DIRECT_GET, streamName);
1✔
333
                payload = messageGetRequest.serialize();
1✔
334
            }
335
            Message resp = makeRequestResponseRequired(subject, payload, getTimeout());
1✔
336
            if (resp.isStatusMessage()) {
1✔
337
                throw new JetStreamApiException(Error.convert(resp.getStatus()));
1✔
338
            }
339
            return new MessageInfo(resp, streamName, true);
1✔
340
        }
341
        else {
342
            String getSubject = String.format(JSAPI_MSG_GET, streamName);
1✔
343
            Message resp = makeRequestResponseRequired(getSubject, messageGetRequest.serialize(), getTimeout());
1✔
344
            return new MessageInfo(resp, streamName, false).throwOnHasError();
1✔
345
        }
346
    }
347

348
    /**
349
     * {@inheritDoc}
350
     */
351
    @Override
352
    public boolean deleteMessage(String streamName, long seq) throws IOException, JetStreamApiException {
353
        return deleteMessage(streamName, seq, true);
1✔
354
    }
355

356
    /**
357
     * {@inheritDoc}
358
     */
359
    @Override
360
    public boolean deleteMessage(String streamName, long seq, boolean erase) throws IOException, JetStreamApiException {
361
        validateNotNull(streamName, "Stream Name");
1✔
362
        String subj = String.format(JSAPI_MSG_DELETE, streamName);
1✔
363
        MessageDeleteRequest mdr = new MessageDeleteRequest(seq, erase);
1✔
364
        Message resp = makeRequestResponseRequired(subj, mdr.serialize(), getTimeout());
1✔
365
        return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
1✔
366
    }
367

368
    /**
369
     * {@inheritDoc}
370
     */
371
    @Override
372
    public JetStream jetStream() {
373
        if (js == null) {
×
374
            js = new NatsJetStream(this);
×
375
        }
376
        return js;
×
377
    }
378
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc