• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2109

15 Aug 2025 04:51PM UTC coverage: 95.404% (-0.05%) from 95.457%
#2109

push

github

web-flow
Merge pull request #1395 from nats-io/header-nullability

Header nullability

3 of 3 new or added lines in 1 file covered. (100.0%)

14 existing lines in 7 files now uncovered.

11915 of 12489 relevant lines covered (95.4%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.61
/src/main/java/io/nats/client/impl/NatsKeyValue.java
1
// Copyright 2021 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.api.*;
18
import io.nats.client.support.DateTimeUtils;
19
import io.nats.client.support.Validator;
20

21
import java.io.IOException;
22
import java.nio.charset.StandardCharsets;
23
import java.time.ZonedDateTime;
24
import java.util.ArrayList;
25
import java.util.Collections;
26
import java.util.List;
27
import java.util.concurrent.LinkedBlockingQueue;
28

29
import static io.nats.client.support.NatsConstants.DOT;
30
import static io.nats.client.support.NatsConstants.GREATER_THAN;
31
import static io.nats.client.support.NatsJetStreamConstants.JS_WRONG_LAST_SEQUENCE;
32
import static io.nats.client.support.NatsKeyValueUtil.*;
33
import static io.nats.client.support.Validator.*;
34

35
public class NatsKeyValue extends NatsFeatureBase implements KeyValue {
36

37
    private final String bucketName;
38
    private final String streamSubject;
39
    private final String readPrefix;
40
    private final String writePrefix;
41

42
    NatsKeyValue(NatsConnection connection, String bucketName, KeyValueOptions kvo) throws IOException {
43
        super(connection, kvo);
1✔
44
        this.bucketName = Validator.validateBucketName(bucketName, true);
1✔
45
        streamName = toStreamName(bucketName);
1✔
46
        StreamInfo si;
47
        try {
48
             si = jsm.getStreamInfo(streamName);
1✔
49
        } catch (JetStreamApiException e) {
×
50
            // can't throw directly, that would be a breaking change
51
            throw new IOException(e);
×
52
        }
1✔
53

54
        streamSubject = toStreamSubject(bucketName);
1✔
55
        String readTemp = toKeyPrefix(bucketName);
1✔
56

57
        String writeTemp;
58
        Mirror m = si.getConfiguration().getMirror();
1✔
59
        if (m != null) {
1✔
60
            String bName = trimPrefix(m.getName());
1✔
61
            String mExtApi = m.getExternal() == null ? null : m.getExternal().getApi();
1✔
62
            if (mExtApi == null) {
1✔
63
                writeTemp = toKeyPrefix(bName);
1✔
64
            }
65
            else {
66
                readTemp = toKeyPrefix(bName);
1✔
67
                writeTemp = mExtApi + DOT + toKeyPrefix(bName);
1✔
68
            }
69
        }
1✔
70
        else if (kvo == null || kvo.getJetStreamOptions().isDefaultPrefix()) {
1✔
71
            writeTemp = readTemp;
1✔
72
        }
73
        else {
74
            writeTemp = kvo.getJetStreamOptions().getPrefix() + readTemp;
1✔
75
        }
76

77
        readPrefix = readTemp;
1✔
78
        writePrefix = writeTemp;
1✔
79
    }
1✔
80

81
    String readSubject(String key) {
82
        return readPrefix + key;
1✔
83
    }
84

85
    String writeSubject(String key) {
86
        return writePrefix + key;
1✔
87
    }
88

89
    /**
90
     * {@inheritDoc}
91
     */
92
    @Override
93
    public String getBucketName() {
94
        return bucketName;
1✔
95
    }
96

97
    /**
98
     * {@inheritDoc}
99
     */
100
    @Override
101
    public KeyValueEntry get(String key) throws IOException, JetStreamApiException {
102
        return existingOnly(_get(validateNonWildcardKvKeyRequired(key)));
1✔
103
    }
104

105
    /**
106
     * {@inheritDoc}
107
     */
108
    @Override
109
    public KeyValueEntry get(String key, long revision) throws IOException, JetStreamApiException {
110
        return existingOnly(_get(validateNonWildcardKvKeyRequired(key), revision));
1✔
111
    }
112

113
    KeyValueEntry existingOnly(KeyValueEntry kve) {
114
        return kve == null || kve.getOperation() != KeyValueOperation.PUT ? null : kve;
1✔
115
    }
116

117
    KeyValueEntry _get(String key) throws IOException, JetStreamApiException {
118
        MessageInfo mi = _getLast(readSubject(key));
1✔
119
        return mi == null ? null : new KeyValueEntry(mi);
1✔
120
    }
121

122
    KeyValueEntry _get(String key, long revision) throws IOException, JetStreamApiException {
123
        MessageInfo mi = _getBySeq(revision);
1✔
124
        if (mi != null) {
1✔
125
            KeyValueEntry kve = new KeyValueEntry(mi);
1✔
126
            if (key.equals(kve.getKey())) {
1✔
127
                return kve;
1✔
128
            }
129
        }
130
        return null;
1✔
131
    }
132

133
    /**
134
     * {@inheritDoc}
135
     */
136
    @Override
137
    public long put(String key, byte[] value) throws IOException, JetStreamApiException {
138
        return _write(key, value, null, null).getSeqno();
1✔
139
    }
140

141
    /**
142
     * {@inheritDoc}
143
     */
144
    @Override
145
    public long put(String key, String value) throws IOException, JetStreamApiException {
146
        return _write(key, value.getBytes(StandardCharsets.UTF_8), null, null).getSeqno();
1✔
147
    }
148

149
    /**
150
     * {@inheritDoc}
151
     */
152
    @Override
153
    public long put(String key, Number value) throws IOException, JetStreamApiException {
154
        return _write(key, value.toString().getBytes(StandardCharsets.US_ASCII), null, null).getSeqno();
1✔
155
    }
156

157
    /**
158
     * {@inheritDoc}
159
     */
160
    @Override
161
    public long create(String key, byte[] value) throws IOException, JetStreamApiException {
162
        return create(key, value, null);
1✔
163
    }
164

165
    @Override
166
    public long create(String key, byte[] value, MessageTtl messageTtl) throws IOException, JetStreamApiException {
167
        validateNonWildcardKvKeyRequired(key);
1✔
168
        try {
169
            return _update(key, value, 0, messageTtl);
1✔
170
        }
171
        catch (JetStreamApiException e) {
1✔
172
            if (e.getApiErrorCode() == JS_WRONG_LAST_SEQUENCE) {
1✔
173
                // must check if the last message for this subject is a delete or purge
174
                // if it was, it's okay to "create" it, as long as someone doesn't create in the meantime
175
                // which is why I use the revision, which must be greater than zero b/c I just tried zero
176
                KeyValueEntry kve = _get(key);
1✔
177
                if (kve != null && kve.getOperation() != KeyValueOperation.PUT) {
1✔
178
                    long revision = kve.getRevision();
1✔
179
                    if (revision > 0) {
1✔
180
                        return _update(key, value, kve.getRevision(), messageTtl);
1✔
181
                    }
182
                }
183
            }
184
            throw e;
1✔
185
        }
186
    }
187

188
    /**
189
     * {@inheritDoc}
190
     */
191
    @Override
192
    public long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException {
193
        validateNonWildcardKvKeyRequired(key);
1✔
194
        return _update(key, value, expectedRevision, null);
1✔
195
    }
196

197
    private long _update(String key, byte[] value, long expectedRevision, MessageTtl messageTtl) throws IOException, JetStreamApiException {
198
        return _write(key, value, null, getPublishOptions(expectedRevision, messageTtl)).getSeqno();
1✔
199
    }
200

201
    /**
202
     * {@inheritDoc}
203
     */
204
    @Override
205
    public long update(String key, String value, long expectedRevision) throws IOException, JetStreamApiException {
206
        return update(key, value.getBytes(StandardCharsets.UTF_8), expectedRevision);
×
207
    }
208

209
    /**
210
     * {@inheritDoc}
211
     */
212
    @Override
213
    public void delete(String key) throws IOException, JetStreamApiException {
214
        _write(key, null, getDeleteHeaders(), null);
1✔
215
    }
1✔
216

217
    /**
218
     * {@inheritDoc}
219
     */
220
    @Override
221
    public void delete(String key, long expectedRevision) throws IOException, JetStreamApiException {
222
        _write(key, null, getDeleteHeaders(), getPublishOptions(expectedRevision, null));
1✔
223
    }
1✔
224

225
    /**
226
     * {@inheritDoc}
227
     */
228
    @Override
229
    public void purge(String key) throws IOException, JetStreamApiException {
230
        _write(key, null, getPurgeHeaders(), null);
1✔
231
    }
1✔
232

233
    /**
234
     * {@inheritDoc}
235
     */
236
    @Override
237
    public void purge(String key, long expectedRevision) throws IOException, JetStreamApiException {
238
        _write(key, null, getPurgeHeaders(), getPublishOptions(expectedRevision, null));
1✔
239
    }
1✔
240

241
    /**
242
     * {@inheritDoc}
243
     */
244
    @Override
245
    public void purge(String key, MessageTtl messageTtl) throws IOException, JetStreamApiException {
UNCOV
246
        _write(key, null, getPurgeHeaders(), getPublishOptions(-1, messageTtl));
×
UNCOV
247
    }
×
248

249
    /**
250
     * {@inheritDoc}
251
     */
252
    @Override
253
    public void purge(String key, long expectedRevision, MessageTtl messageTtl) throws IOException, JetStreamApiException {
254
        _write(key, null, getPurgeHeaders(), getPublishOptions(expectedRevision, messageTtl));
×
255
    }
×
256

257
    private PublishAck _write(String key, byte[] data, Headers h, PublishOptions popts) throws IOException, JetStreamApiException {
258
        validateNonWildcardKvKeyRequired(key);
1✔
259
        return js.publish(NatsMessage.builder().subject(writeSubject(key)).data(data).headers(h).build(), popts);
1✔
260
    }
261

262
    @Override
263
    public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
264
        validateKvKeyWildcardAllowedRequired(key);
1✔
265
        validateNotNull(watcher, "Watcher is required");
1✔
266
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, -1, watchOptions);
1✔
267
    }
268

269
    @Override
270
    public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
271
        validateKvKeyWildcardAllowedRequired(key);
1✔
272
        validateNotNull(watcher, "Watcher is required");
1✔
273
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, fromRevision, watchOptions);
1✔
274
    }
275

276
    @Override
277
    public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
278
        validateKvKeysWildcardAllowedRequired(keys);
1✔
279
        validateNotNull(watcher, "Watcher is required");
1✔
280
        return new NatsKeyValueWatchSubscription(this, keys, watcher, -1, watchOptions);
1✔
281
    }
282

283
    @Override
284
    public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
285
        validateKvKeysWildcardAllowedRequired(keys);
1✔
286
        validateNotNull(watcher, "Watcher is required");
1✔
287
        return new NatsKeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions);
1✔
288
    }
289

290
    @Override
291
    public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
292
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(GREATER_THAN), watcher, -1, watchOptions);
1✔
293
    }
294

295
    @Override
296
    public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
297
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(GREATER_THAN), watcher, fromRevision, watchOptions);
1✔
298
    }
299

300
    /**
301
     * {@inheritDoc}
302
     */
303
    @Override
304
    public List<String> keys() throws IOException, JetStreamApiException, InterruptedException {
305
        return _keys(Collections.singletonList(readSubject(GREATER_THAN)));
1✔
306
    }
307

308
    @Override
309
    public List<String> keys(String filter) throws IOException, JetStreamApiException, InterruptedException {
310
        return _keys(Collections.singletonList(readSubject(filter)));
1✔
311
    }
312

313
    @Override
314
    public List<String> keys(List<String> filters) throws IOException, JetStreamApiException, InterruptedException {
315
        List<String> readSubjectFilters = new ArrayList<>(filters.size());
1✔
316
        for (String f : filters) {
1✔
317
            readSubjectFilters.add(readSubject(f));
1✔
318
        }
1✔
319
        return _keys(readSubjectFilters);
1✔
320
    }
321

322
    private List<String> _keys(List<String> readSubjectFilters) throws IOException, JetStreamApiException, InterruptedException {
323
        List<String> list = new ArrayList<>();
1✔
324
        visitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, m -> {
1✔
325
            KeyValueOperation op = getOperation(m.getHeaders());
1✔
326
            if (op == KeyValueOperation.PUT) {
1✔
327
                list.add(new BucketAndKey(m).key);
1✔
328
            }
329
        });
1✔
330
        return list;
1✔
331
    }
332

333
    /**
334
     * {@inheritDoc}
335
     */
336
    @Override
337
    public LinkedBlockingQueue<KeyResult> consumeKeys() {
338
        return _consumeKeys(Collections.singletonList(readSubject(GREATER_THAN)));
1✔
339
    }
340

341
    /**
342
     * {@inheritDoc}
343
     */
344
    @Override
345
    public LinkedBlockingQueue<KeyResult> consumeKeys(String filter) {
346
        return _consumeKeys(Collections.singletonList(readSubject(filter)));
1✔
347
    }
348

349
    /**
350
     * {@inheritDoc}
351
     */
352
    @Override
353
    public LinkedBlockingQueue<KeyResult> consumeKeys(List<String> filters) {
354
        List<String> readSubjectFilters = new ArrayList<>(filters.size());
1✔
355
        for (String f : filters) {
1✔
356
            readSubjectFilters.add(readSubject(f));
1✔
357
        }
1✔
358
        return _consumeKeys(readSubjectFilters);
1✔
359
    }
360

361
    private LinkedBlockingQueue<KeyResult> _consumeKeys(List<String> readSubjectFilters) {
362
        LinkedBlockingQueue<KeyResult> q = new LinkedBlockingQueue<>();
1✔
363
        js.conn.getOptions().getExecutor().submit( () -> {
1✔
364
            try {
365
                visitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, m -> {
1✔
366
                    KeyValueOperation op = getOperation(m.getHeaders());
1✔
367
                    if (op == KeyValueOperation.PUT) {
1✔
368
                        q.offer(new KeyResult(new BucketAndKey(m).key));
1✔
369
                    }
370
                });
1✔
371
                q.offer(new KeyResult());
1✔
372
            }
373
            catch (IOException | JetStreamApiException e) {
×
374
                q.offer(new KeyResult(e));
×
375
            }
376
            catch (InterruptedException e) {
×
377
                q.offer(new KeyResult(e));
×
378
                Thread.currentThread().interrupt();
×
379
            }
1✔
380
        });
1✔
381

382
        return q;
1✔
383
    }
384

385
    /**
386
     * {@inheritDoc}
387
     */
388
    @Override
389
    public List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException {
390
        validateNonWildcardKvKeyRequired(key);
1✔
391
        List<KeyValueEntry> list = new ArrayList<>();
1✔
392
        visitSubject(readSubject(key), DeliverPolicy.All, false, true, m -> list.add(new KeyValueEntry(m)));
1✔
393
        return list;
1✔
394
    }
395

396
    /**
397
     * {@inheritDoc}
398
     */
399
    @Override
400
    public void purgeDeletes() throws IOException, JetStreamApiException, InterruptedException {
401
        purgeDeletes(null);
1✔
402
    }
1✔
403

404
    /**
405
     * {@inheritDoc}
406
     */
407
    @Override
408
    public void purgeDeletes(KeyValuePurgeOptions options) throws IOException, JetStreamApiException, InterruptedException {
409
        long dmThresh = options == null
1✔
410
            ? KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS
411
            : options.getDeleteMarkersThresholdMillis();
1✔
412

413
        ZonedDateTime limit;
414
        if (dmThresh < 0) {
1✔
415
            limit = DateTimeUtils.fromNow(600000); // long enough in the future to clear all
1✔
416
        }
417
        else if (dmThresh == 0) {
1✔
418
            limit = DateTimeUtils.fromNow(KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS);
×
419
        }
420
        else {
421
            limit = DateTimeUtils.fromNow(-dmThresh);
1✔
422
        }
423

424
        List<String> keep0List = new ArrayList<>();
1✔
425
        List<String> keep1List = new ArrayList<>();
1✔
426
        visitSubject(streamSubject, DeliverPolicy.LastPerSubject, true, false, m -> {
1✔
427
            KeyValueEntry kve = new KeyValueEntry(m);
1✔
428
            if (kve.getOperation() != KeyValueOperation.PUT) {
1✔
429
                if (kve.getCreated().isAfter(limit)) {
1✔
430
                    keep1List.add(new BucketAndKey(m).key);
1✔
431
                }
432
                else {
433
                    keep0List.add(new BucketAndKey(m).key);
1✔
434
                }
435
            }
436
        });
1✔
437

438
        for (String key : keep0List) {
1✔
439
            jsm.purgeStream(streamName, PurgeOptions.subject(readSubject(key)));
1✔
440
        }
1✔
441

442
        for (String key : keep1List) {
1✔
443
            PurgeOptions po = PurgeOptions.builder()
1✔
444
                .subject(readSubject(key))
1✔
445
                .keep(1)
1✔
446
                .build();
1✔
447
            jsm.purgeStream(streamName, po);
1✔
448
        }
1✔
449
    }
1✔
450

451
    /**
452
     * {@inheritDoc}
453
     */
454
    @Override
455
    public KeyValueStatus getStatus() throws IOException, JetStreamApiException, InterruptedException {
456
        return new KeyValueStatus(jsm.getStreamInfo(streamName));
1✔
457
    }
458
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc