• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1952

23 Apr 2025 11:32AM UTC coverage: 95.625% (+0.03%) from 95.591%
#1952

push

github

web-flow
Merge pull request #1308 from nats-io/update-consumeKeys-to-return-immediately

Update KV consumeKeys() to return the BlockingQueue immediately.

9 of 14 new or added lines in 1 file covered. (64.29%)

1 existing line in 1 file now uncovered.

11650 of 12183 relevant lines covered (95.63%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.0
/src/main/java/io/nats/client/impl/NatsKeyValue.java
1
// Copyright 2021 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.JetStreamApiException;
17
import io.nats.client.KeyValue;
18
import io.nats.client.KeyValueOptions;
19
import io.nats.client.PurgeOptions;
20
import io.nats.client.api.*;
21
import io.nats.client.support.DateTimeUtils;
22
import io.nats.client.support.Validator;
23

24
import java.io.IOException;
25
import java.nio.charset.StandardCharsets;
26
import java.time.ZonedDateTime;
27
import java.util.ArrayList;
28
import java.util.Collections;
29
import java.util.List;
30
import java.util.concurrent.LinkedBlockingQueue;
31

32
import static io.nats.client.support.NatsConstants.DOT;
33
import static io.nats.client.support.NatsConstants.GREATER_THAN;
34
import static io.nats.client.support.NatsJetStreamConstants.EXPECTED_LAST_SUB_SEQ_HDR;
35
import static io.nats.client.support.NatsJetStreamConstants.JS_WRONG_LAST_SEQUENCE;
36
import static io.nats.client.support.NatsKeyValueUtil.*;
37
import static io.nats.client.support.Validator.*;
38

39
public class NatsKeyValue extends NatsFeatureBase implements KeyValue {
40

41
    private final String bucketName;
42
    private final String streamSubject;
43
    private final String readPrefix;
44
    private final String writePrefix;
45

46
    NatsKeyValue(NatsConnection connection, String bucketName, KeyValueOptions kvo) throws IOException {
47
        super(connection, kvo);
1✔
48
        this.bucketName = Validator.validateBucketName(bucketName, true);
1✔
49
        streamName = toStreamName(bucketName);
1✔
50
        StreamInfo si;
51
        try {
52
             si = jsm.getStreamInfo(streamName);
1✔
53
        } catch (JetStreamApiException e) {
×
54
            // can't throw directly, that would be a breaking change
55
            throw new IOException(e);
×
56
        }
1✔
57

58
        streamSubject = toStreamSubject(bucketName);
1✔
59
        String readTemp = toKeyPrefix(bucketName);
1✔
60

61
        String writeTemp;
62
        Mirror m = si.getConfiguration().getMirror();
1✔
63
        if (m != null) {
1✔
64
            String bName = trimPrefix(m.getName());
1✔
65
            String mExtApi = m.getExternal() == null ? null : m.getExternal().getApi();
1✔
66
            if (mExtApi == null) {
1✔
67
                writeTemp = toKeyPrefix(bName);
1✔
68
            }
69
            else {
70
                readTemp = toKeyPrefix(bName);
1✔
71
                writeTemp = mExtApi + DOT + toKeyPrefix(bName);
1✔
72
            }
73
        }
1✔
74
        else if (kvo == null || kvo.getJetStreamOptions().isDefaultPrefix()) {
1✔
75
            writeTemp = readTemp;
1✔
76
        }
77
        else {
78
            writeTemp = kvo.getJetStreamOptions().getPrefix() + readTemp;
1✔
79
        }
80

81
        readPrefix = readTemp;
1✔
82
        writePrefix = writeTemp;
1✔
83
    }
1✔
84

85
    String readSubject(String key) {
86
        return readPrefix + key;
1✔
87
    }
88

89
    String writeSubject(String key) {
90
        return writePrefix + key;
1✔
91
    }
92

93
    /**
94
     * {@inheritDoc}
95
     */
96
    @Override
97
    public String getBucketName() {
98
        return bucketName;
1✔
99
    }
100

101
    /**
102
     * {@inheritDoc}
103
     */
104
    @Override
105
    public KeyValueEntry get(String key) throws IOException, JetStreamApiException {
106
        return existingOnly(_get(validateNonWildcardKvKeyRequired(key)));
1✔
107
    }
108

109
    /**
110
     * {@inheritDoc}
111
     */
112
    @Override
113
    public KeyValueEntry get(String key, long revision) throws IOException, JetStreamApiException {
114
        return existingOnly(_get(validateNonWildcardKvKeyRequired(key), revision));
1✔
115
    }
116

117
    KeyValueEntry existingOnly(KeyValueEntry kve) {
118
        return kve == null || kve.getOperation() != KeyValueOperation.PUT ? null : kve;
1✔
119
    }
120

121
    KeyValueEntry _get(String key) throws IOException, JetStreamApiException {
122
        MessageInfo mi = _getLast(readSubject(key));
1✔
123
        return mi == null ? null : new KeyValueEntry(mi);
1✔
124
    }
125

126
    KeyValueEntry _get(String key, long revision) throws IOException, JetStreamApiException {
127
        MessageInfo mi = _getBySeq(revision);
1✔
128
        if (mi != null) {
1✔
129
            KeyValueEntry kve = new KeyValueEntry(mi);
1✔
130
            if (key.equals(kve.getKey())) {
1✔
131
                return kve;
1✔
132
            }
133
        }
134
        return null;
1✔
135
    }
136

137
    /**
138
     * {@inheritDoc}
139
     */
140
    @Override
141
    public long put(String key, byte[] value) throws IOException, JetStreamApiException {
142
        return _write(key, value, null).getSeqno();
1✔
143
    }
144

145
    /**
146
     * {@inheritDoc}
147
     */
148
    @Override
149
    public long put(String key, String value) throws IOException, JetStreamApiException {
150
        return _write(key, value.getBytes(StandardCharsets.UTF_8), null).getSeqno();
1✔
151
    }
152

153
    /**
154
     * {@inheritDoc}
155
     */
156
    @Override
157
    public long put(String key, Number value) throws IOException, JetStreamApiException {
158
        return _write(key, value.toString().getBytes(StandardCharsets.US_ASCII), null).getSeqno();
1✔
159
    }
160

161
    /**
162
     * {@inheritDoc}
163
     */
164
    @Override
165
    public long create(String key, byte[] value) throws IOException, JetStreamApiException {
166
        validateNonWildcardKvKeyRequired(key);
1✔
167
        try {
168
            return update(key, value, 0);
1✔
169
        }
170
        catch (JetStreamApiException e) {
1✔
171
            if (e.getApiErrorCode() == JS_WRONG_LAST_SEQUENCE) {
1✔
172
                // must check if the last message for this subject is a delete or purge
173
                KeyValueEntry kve = _get(key);
1✔
174
                if (kve != null && kve.getOperation() != KeyValueOperation.PUT) {
1✔
175
                    return update(key, value, kve.getRevision());
1✔
176
                }
177
            }
178
            throw e;
1✔
179
        }
180
    }
181

182
    /**
183
     * {@inheritDoc}
184
     */
185
    @Override
186
    public long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException {
187
        validateNonWildcardKvKeyRequired(key);
1✔
188
        Headers h = new Headers().add(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision));
1✔
189
        return _write(key, value, h).getSeqno();
1✔
190
    }
191

192
    /**
193
     * {@inheritDoc}
194
     */
195
    @Override
196
    public long update(String key, String value, long expectedRevision) throws IOException, JetStreamApiException {
197
        return update(key, value.getBytes(StandardCharsets.UTF_8), expectedRevision);
×
198
    }
199

200
    /**
201
     * {@inheritDoc}
202
     */
203
    @Override
204
    public void delete(String key) throws IOException, JetStreamApiException {
205
        _write(key, null, getDeleteHeaders());
1✔
206
    }
1✔
207

208
    /**
209
     * {@inheritDoc}
210
     */
211
    @Override
212
    public void delete(String key, long expectedRevision) throws IOException, JetStreamApiException {
213
        Headers h = getDeleteHeaders().put(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision));
1✔
214
        _write(key, null, h);
1✔
215
    }
1✔
216

217
    /**
218
     * {@inheritDoc}
219
     */
220
    @Override
221
    public void purge(String key) throws IOException, JetStreamApiException {
222
        _write(key, null, getPurgeHeaders());
1✔
223
    }
1✔
224

225
    /**
226
     * {@inheritDoc}
227
     */
228
    @Override
229
    public void purge(String key, long expectedRevision) throws IOException, JetStreamApiException {
230
        Headers h = getPurgeHeaders().put(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision));
1✔
231
        _write(key, null, h);
1✔
232
    }
1✔
233

234
    private PublishAck _write(String key, byte[] data, Headers h) throws IOException, JetStreamApiException {
235
        validateNonWildcardKvKeyRequired(key);
1✔
236
        return js.publish(NatsMessage.builder().subject(writeSubject(key)).data(data).headers(h).build());
1✔
237
    }
238

239
    @Override
240
    public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
241
        validateKvKeyWildcardAllowedRequired(key);
1✔
242
        validateNotNull(watcher, "Watcher is required");
1✔
243
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, -1, watchOptions);
1✔
244
    }
245

246
    @Override
247
    public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
248
        validateKvKeyWildcardAllowedRequired(key);
1✔
249
        validateNotNull(watcher, "Watcher is required");
1✔
250
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, fromRevision, watchOptions);
1✔
251
    }
252

253
    @Override
254
    public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
255
        validateKvKeysWildcardAllowedRequired(keys);
1✔
256
        validateNotNull(watcher, "Watcher is required");
1✔
257
        return new NatsKeyValueWatchSubscription(this, keys, watcher, -1, watchOptions);
1✔
258
    }
259

260
    @Override
261
    public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
262
        validateKvKeysWildcardAllowedRequired(keys);
1✔
263
        validateNotNull(watcher, "Watcher is required");
1✔
264
        return new NatsKeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions);
1✔
265
    }
266

267
    @Override
268
    public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
269
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(GREATER_THAN), watcher, -1, watchOptions);
1✔
270
    }
271

272
    @Override
273
    public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
274
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(GREATER_THAN), watcher, fromRevision, watchOptions);
1✔
275
    }
276

277
    /**
278
     * {@inheritDoc}
279
     */
280
    @Override
281
    public List<String> keys() throws IOException, JetStreamApiException, InterruptedException {
282
        return _keys(Collections.singletonList(readSubject(GREATER_THAN)));
1✔
283
    }
284

285
    @Override
286
    public List<String> keys(String filter) throws IOException, JetStreamApiException, InterruptedException {
287
        return _keys(Collections.singletonList(readSubject(filter)));
1✔
288
    }
289

290
    @Override
291
    public List<String> keys(List<String> filters) throws IOException, JetStreamApiException, InterruptedException {
292
        List<String> readSubjectFilters = new ArrayList<>(filters.size());
1✔
293
        for (String f : filters) {
1✔
294
            readSubjectFilters.add(readSubject(f));
1✔
295
        }
1✔
296
        return _keys(readSubjectFilters);
1✔
297
    }
298

299
    private List<String> _keys(List<String> readSubjectFilters) throws IOException, JetStreamApiException, InterruptedException {
300
        List<String> list = new ArrayList<>();
1✔
301
        visitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, m -> {
1✔
302
            KeyValueOperation op = getOperation(m.getHeaders());
1✔
303
            if (op == KeyValueOperation.PUT) {
1✔
304
                list.add(new BucketAndKey(m).key);
1✔
305
            }
306
        });
1✔
307
        return list;
1✔
308
    }
309

310
    /**
311
     * {@inheritDoc}
312
     */
313
    @Override
314
    public LinkedBlockingQueue<KeyResult> consumeKeys() {
315
        return _consumeKeys(Collections.singletonList(readSubject(GREATER_THAN)));
1✔
316
    }
317

318
    /**
319
     * {@inheritDoc}
320
     */
321
    @Override
322
    public LinkedBlockingQueue<KeyResult> consumeKeys(String filter) {
323
        return _consumeKeys(Collections.singletonList(readSubject(filter)));
1✔
324
    }
325

326
    /**
327
     * {@inheritDoc}
328
     */
329
    @Override
330
    public LinkedBlockingQueue<KeyResult> consumeKeys(List<String> filters) {
331
        List<String> readSubjectFilters = new ArrayList<>(filters.size());
1✔
332
        for (String f : filters) {
1✔
333
            readSubjectFilters.add(readSubject(f));
1✔
334
        }
1✔
335
        return _consumeKeys(readSubjectFilters);
1✔
336
    }
337

338
    private LinkedBlockingQueue<KeyResult> _consumeKeys(List<String> readSubjectFilters) {
339
        LinkedBlockingQueue<KeyResult> q = new LinkedBlockingQueue<>();
1✔
340
        js.conn.getOptions().getExecutor().submit( () -> {
1✔
341
            try {
342
                visitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, m -> {
1✔
343
                    KeyValueOperation op = getOperation(m.getHeaders());
1✔
344
                    if (op == KeyValueOperation.PUT) {
1✔
345
                        q.offer(new KeyResult(new BucketAndKey(m).key));
1✔
346
                    }
347
                });
1✔
348
                q.offer(new KeyResult());
1✔
349
            }
NEW
350
            catch (IOException | JetStreamApiException e) {
×
NEW
351
                q.offer(new KeyResult(e));
×
352
            }
NEW
353
            catch (InterruptedException e) {
×
NEW
354
                q.offer(new KeyResult(e));
×
NEW
355
                Thread.currentThread().interrupt();
×
356
            }
1✔
357
        });
1✔
358

359
        return q;
1✔
360
    }
361

362
    /**
363
     * {@inheritDoc}
364
     */
365
    @Override
366
    public List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException {
367
        validateNonWildcardKvKeyRequired(key);
1✔
368
        List<KeyValueEntry> list = new ArrayList<>();
1✔
369
        visitSubject(readSubject(key), DeliverPolicy.All, false, true, m -> list.add(new KeyValueEntry(m)));
1✔
370
        return list;
1✔
371
    }
372

373
    /**
374
     * {@inheritDoc}
375
     */
376
    @Override
377
    public void purgeDeletes() throws IOException, JetStreamApiException, InterruptedException {
378
        purgeDeletes(null);
1✔
379
    }
1✔
380

381
    /**
382
     * {@inheritDoc}
383
     */
384
    @Override
385
    public void purgeDeletes(KeyValuePurgeOptions options) throws IOException, JetStreamApiException, InterruptedException {
386
        long dmThresh = options == null
1✔
387
            ? KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS
388
            : options.getDeleteMarkersThresholdMillis();
1✔
389

390
        ZonedDateTime limit;
391
        if (dmThresh < 0) {
1✔
392
            limit = DateTimeUtils.fromNow(600000); // long enough in the future to clear all
1✔
393
        }
394
        else if (dmThresh == 0) {
1✔
395
            limit = DateTimeUtils.fromNow(KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS);
×
396
        }
397
        else {
398
            limit = DateTimeUtils.fromNow(-dmThresh);
1✔
399
        }
400

401
        List<String> keep0List = new ArrayList<>();
1✔
402
        List<String> keep1List = new ArrayList<>();
1✔
403
        visitSubject(streamSubject, DeliverPolicy.LastPerSubject, true, false, m -> {
1✔
404
            KeyValueEntry kve = new KeyValueEntry(m);
1✔
405
            if (kve.getOperation() != KeyValueOperation.PUT) {
1✔
406
                if (kve.getCreated().isAfter(limit)) {
1✔
407
                    keep1List.add(new BucketAndKey(m).key);
1✔
408
                }
409
                else {
410
                    keep0List.add(new BucketAndKey(m).key);
1✔
411
                }
412
            }
413
        });
1✔
414

415
        for (String key : keep0List) {
1✔
416
            jsm.purgeStream(streamName, PurgeOptions.subject(readSubject(key)));
1✔
417
        }
1✔
418

419
        for (String key : keep1List) {
1✔
420
            PurgeOptions po = PurgeOptions.builder()
1✔
421
                .subject(readSubject(key))
1✔
422
                .keep(1)
1✔
423
                .build();
1✔
424
            jsm.purgeStream(streamName, po);
1✔
425
        }
1✔
426
    }
1✔
427

428
    /**
429
     * {@inheritDoc}
430
     */
431
    @Override
432
    public KeyValueStatus getStatus() throws IOException, JetStreamApiException, InterruptedException {
433
        return new KeyValueStatus(jsm.getStreamInfo(streamName));
1✔
434
    }
435
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc