• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2044

04 Jul 2025 02:04PM UTC coverage: 95.609% (+0.001%) from 95.608%
#2044

push

github

web-flow
Merge pull request #1344 from nats-io/kv-purge-per-message-ttl

KV Purge Per Message TTL

2 of 4 new or added lines in 1 file covered. (50.0%)

1 existing line in 1 file now uncovered.

11802 of 12344 relevant lines covered (95.61%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.81
/src/main/java/io/nats/client/impl/NatsKeyValue.java
1
// Copyright 2021 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.api.*;
18
import io.nats.client.support.DateTimeUtils;
19
import io.nats.client.support.Validator;
20

21
import java.io.IOException;
22
import java.nio.charset.StandardCharsets;
23
import java.time.ZonedDateTime;
24
import java.util.ArrayList;
25
import java.util.Collections;
26
import java.util.List;
27
import java.util.concurrent.LinkedBlockingQueue;
28

29
import static io.nats.client.support.NatsConstants.DOT;
30
import static io.nats.client.support.NatsConstants.GREATER_THAN;
31
import static io.nats.client.support.NatsJetStreamConstants.JS_WRONG_LAST_SEQUENCE;
32
import static io.nats.client.support.NatsKeyValueUtil.*;
33
import static io.nats.client.support.Validator.*;
34

35
public class NatsKeyValue extends NatsFeatureBase implements KeyValue {
36

37
    private final String bucketName;
38
    private final String streamSubject;
39
    private final String readPrefix;
40
    private final String writePrefix;
41

42
    NatsKeyValue(NatsConnection connection, String bucketName, KeyValueOptions kvo) throws IOException {
43
        super(connection, kvo);
1✔
44
        this.bucketName = Validator.validateBucketName(bucketName, true);
1✔
45
        streamName = toStreamName(bucketName);
1✔
46
        StreamInfo si;
47
        try {
48
             si = jsm.getStreamInfo(streamName);
1✔
49
        } catch (JetStreamApiException e) {
×
50
            // can't throw directly, that would be a breaking change
51
            throw new IOException(e);
×
52
        }
1✔
53

54
        streamSubject = toStreamSubject(bucketName);
1✔
55
        String readTemp = toKeyPrefix(bucketName);
1✔
56

57
        String writeTemp;
58
        Mirror m = si.getConfiguration().getMirror();
1✔
59
        if (m != null) {
1✔
60
            String bName = trimPrefix(m.getName());
1✔
61
            String mExtApi = m.getExternal() == null ? null : m.getExternal().getApi();
1✔
62
            if (mExtApi == null) {
1✔
63
                writeTemp = toKeyPrefix(bName);
1✔
64
            }
65
            else {
66
                readTemp = toKeyPrefix(bName);
1✔
67
                writeTemp = mExtApi + DOT + toKeyPrefix(bName);
1✔
68
            }
69
        }
1✔
70
        else if (kvo == null || kvo.getJetStreamOptions().isDefaultPrefix()) {
1✔
71
            writeTemp = readTemp;
1✔
72
        }
73
        else {
74
            writeTemp = kvo.getJetStreamOptions().getPrefix() + readTemp;
1✔
75
        }
76

77
        readPrefix = readTemp;
1✔
78
        writePrefix = writeTemp;
1✔
79
    }
1✔
80

81
    String readSubject(String key) {
82
        return readPrefix + key;
1✔
83
    }
84

85
    String writeSubject(String key) {
86
        return writePrefix + key;
1✔
87
    }
88

89
    /**
90
     * {@inheritDoc}
91
     */
92
    @Override
93
    public String getBucketName() {
94
        return bucketName;
1✔
95
    }
96

97
    /**
98
     * {@inheritDoc}
99
     */
100
    @Override
101
    public KeyValueEntry get(String key) throws IOException, JetStreamApiException {
102
        return existingOnly(_get(validateNonWildcardKvKeyRequired(key)));
1✔
103
    }
104

105
    /**
106
     * {@inheritDoc}
107
     */
108
    @Override
109
    public KeyValueEntry get(String key, long revision) throws IOException, JetStreamApiException {
110
        return existingOnly(_get(validateNonWildcardKvKeyRequired(key), revision));
1✔
111
    }
112

113
    KeyValueEntry existingOnly(KeyValueEntry kve) {
114
        return kve == null || kve.getOperation() != KeyValueOperation.PUT ? null : kve;
1✔
115
    }
116

117
    KeyValueEntry _get(String key) throws IOException, JetStreamApiException {
118
        MessageInfo mi = _getLast(readSubject(key));
1✔
119
        return mi == null ? null : new KeyValueEntry(mi);
1✔
120
    }
121

122
    KeyValueEntry _get(String key, long revision) throws IOException, JetStreamApiException {
123
        MessageInfo mi = _getBySeq(revision);
1✔
124
        if (mi != null) {
1✔
125
            KeyValueEntry kve = new KeyValueEntry(mi);
1✔
126
            if (key.equals(kve.getKey())) {
1✔
127
                return kve;
1✔
128
            }
129
        }
130
        return null;
1✔
131
    }
132

133
    /**
134
     * {@inheritDoc}
135
     */
136
    @Override
137
    public long put(String key, byte[] value) throws IOException, JetStreamApiException {
138
        return _write(key, value, null, null).getSeqno();
1✔
139
    }
140

141
    /**
142
     * {@inheritDoc}
143
     */
144
    @Override
145
    public long put(String key, String value) throws IOException, JetStreamApiException {
146
        return _write(key, value.getBytes(StandardCharsets.UTF_8), null, null).getSeqno();
1✔
147
    }
148

149
    /**
150
     * {@inheritDoc}
151
     */
152
    @Override
153
    public long put(String key, Number value) throws IOException, JetStreamApiException {
154
        return _write(key, value.toString().getBytes(StandardCharsets.US_ASCII), null, null).getSeqno();
1✔
155
    }
156

157
    /**
158
     * {@inheritDoc}
159
     */
160
    @Override
161
    public long create(String key, byte[] value) throws IOException, JetStreamApiException {
162
        return create(key, value, null);
1✔
163
    }
164

165
    @Override
166
    public long create(String key, byte[] value, MessageTtl messageTtl) throws IOException, JetStreamApiException {
167
        validateNonWildcardKvKeyRequired(key);
1✔
168
        try {
169
            return _update(key, value, 0, messageTtl);
1✔
170
        }
171
        catch (JetStreamApiException e) {
1✔
172
            if (e.getApiErrorCode() == JS_WRONG_LAST_SEQUENCE) {
1✔
173
                // must check if the last message for this subject is a delete or purge
174
                KeyValueEntry kve = _get(key);
1✔
175
                if (kve != null && kve.getOperation() != KeyValueOperation.PUT) {
1✔
176
                    return _update(key, value, kve.getRevision(), messageTtl);
1✔
177
                }
178
            }
179
            throw e;
1✔
180
        }
181
    }
182

183
    /**
184
     * {@inheritDoc}
185
     */
186
    @Override
187
    public long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException {
188
        validateNonWildcardKvKeyRequired(key);
1✔
189
        return _update(key, value, expectedRevision, null);
1✔
190
    }
191

192
    private long _update(String key, byte[] value, long expectedRevision, MessageTtl messageTtl) throws IOException, JetStreamApiException {
193
        return _write(key, value, null, getPublishOptions(expectedRevision, messageTtl)).getSeqno();
1✔
194
    }
195

196
    /**
197
     * {@inheritDoc}
198
     */
199
    @Override
200
    public long update(String key, String value, long expectedRevision) throws IOException, JetStreamApiException {
201
        return update(key, value.getBytes(StandardCharsets.UTF_8), expectedRevision);
×
202
    }
203

204
    /**
205
     * {@inheritDoc}
206
     */
207
    @Override
208
    public void delete(String key) throws IOException, JetStreamApiException {
209
        _write(key, null, getDeleteHeaders(), null);
1✔
210
    }
1✔
211

212
    /**
213
     * {@inheritDoc}
214
     */
215
    @Override
216
    public void delete(String key, long expectedRevision) throws IOException, JetStreamApiException {
217
        _write(key, null, getDeleteHeaders(), getPublishOptions(expectedRevision, null));
1✔
218
    }
1✔
219

220
    /**
221
     * {@inheritDoc}
222
     */
223
    @Override
224
    public void purge(String key) throws IOException, JetStreamApiException {
225
        _write(key, null, getPurgeHeaders(), null);
1✔
226
    }
1✔
227

228
    /**
229
     * {@inheritDoc}
230
     */
231
    @Override
232
    public void purge(String key, long expectedRevision) throws IOException, JetStreamApiException {
233
        _write(key, null, getPurgeHeaders(), getPublishOptions(expectedRevision, null));
1✔
234
    }
1✔
235

236
    /**
237
     * {@inheritDoc}
238
     */
239
    @Override
240
    public void purge(String key, MessageTtl messageTtl) throws IOException, JetStreamApiException {
241
        _write(key, null, getPurgeHeaders(), getPublishOptions(-1, messageTtl));
1✔
242
    }
1✔
243

244
    /**
245
     * {@inheritDoc}
246
     */
247
    @Override
248
    public void purge(String key, long expectedRevision, MessageTtl messageTtl) throws IOException, JetStreamApiException {
NEW
249
        _write(key, null, getPurgeHeaders(), getPublishOptions(expectedRevision, messageTtl));
×
NEW
250
    }
×
251

252
    private PublishAck _write(String key, byte[] data, Headers h, PublishOptions popts) throws IOException, JetStreamApiException {
253
        validateNonWildcardKvKeyRequired(key);
1✔
254
        return js.publish(NatsMessage.builder().subject(writeSubject(key)).data(data).headers(h).build(), popts);
1✔
255
    }
256

257
    @Override
258
    public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
259
        validateKvKeyWildcardAllowedRequired(key);
1✔
260
        validateNotNull(watcher, "Watcher is required");
1✔
261
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, -1, watchOptions);
1✔
262
    }
263

264
    @Override
265
    public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
266
        validateKvKeyWildcardAllowedRequired(key);
1✔
267
        validateNotNull(watcher, "Watcher is required");
1✔
268
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, fromRevision, watchOptions);
1✔
269
    }
270

271
    @Override
272
    public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
273
        validateKvKeysWildcardAllowedRequired(keys);
1✔
274
        validateNotNull(watcher, "Watcher is required");
1✔
275
        return new NatsKeyValueWatchSubscription(this, keys, watcher, -1, watchOptions);
1✔
276
    }
277

278
    @Override
279
    public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
280
        validateKvKeysWildcardAllowedRequired(keys);
1✔
281
        validateNotNull(watcher, "Watcher is required");
1✔
282
        return new NatsKeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions);
1✔
283
    }
284

285
    @Override
286
    public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
287
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(GREATER_THAN), watcher, -1, watchOptions);
1✔
288
    }
289

290
    @Override
291
    public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
292
        return new NatsKeyValueWatchSubscription(this, Collections.singletonList(GREATER_THAN), watcher, fromRevision, watchOptions);
1✔
293
    }
294

295
    /**
296
     * {@inheritDoc}
297
     */
298
    @Override
299
    public List<String> keys() throws IOException, JetStreamApiException, InterruptedException {
300
        return _keys(Collections.singletonList(readSubject(GREATER_THAN)));
1✔
301
    }
302

303
    @Override
304
    public List<String> keys(String filter) throws IOException, JetStreamApiException, InterruptedException {
305
        return _keys(Collections.singletonList(readSubject(filter)));
1✔
306
    }
307

308
    @Override
309
    public List<String> keys(List<String> filters) throws IOException, JetStreamApiException, InterruptedException {
310
        List<String> readSubjectFilters = new ArrayList<>(filters.size());
1✔
311
        for (String f : filters) {
1✔
312
            readSubjectFilters.add(readSubject(f));
1✔
313
        }
1✔
314
        return _keys(readSubjectFilters);
1✔
315
    }
316

317
    private List<String> _keys(List<String> readSubjectFilters) throws IOException, JetStreamApiException, InterruptedException {
318
        List<String> list = new ArrayList<>();
1✔
319
        visitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, m -> {
1✔
320
            KeyValueOperation op = getOperation(m.getHeaders());
1✔
321
            if (op == KeyValueOperation.PUT) {
1✔
322
                list.add(new BucketAndKey(m).key);
1✔
323
            }
324
        });
1✔
325
        return list;
1✔
326
    }
327

328
    /**
329
     * {@inheritDoc}
330
     */
331
    @Override
332
    public LinkedBlockingQueue<KeyResult> consumeKeys() {
333
        return _consumeKeys(Collections.singletonList(readSubject(GREATER_THAN)));
1✔
334
    }
335

336
    /**
337
     * {@inheritDoc}
338
     */
339
    @Override
340
    public LinkedBlockingQueue<KeyResult> consumeKeys(String filter) {
341
        return _consumeKeys(Collections.singletonList(readSubject(filter)));
1✔
342
    }
343

344
    /**
345
     * {@inheritDoc}
346
     */
347
    @Override
348
    public LinkedBlockingQueue<KeyResult> consumeKeys(List<String> filters) {
349
        List<String> readSubjectFilters = new ArrayList<>(filters.size());
1✔
350
        for (String f : filters) {
1✔
351
            readSubjectFilters.add(readSubject(f));
1✔
352
        }
1✔
353
        return _consumeKeys(readSubjectFilters);
1✔
354
    }
355

356
    private LinkedBlockingQueue<KeyResult> _consumeKeys(List<String> readSubjectFilters) {
357
        LinkedBlockingQueue<KeyResult> q = new LinkedBlockingQueue<>();
1✔
358
        js.conn.getOptions().getExecutor().submit( () -> {
1✔
359
            try {
360
                visitSubject(readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, m -> {
1✔
361
                    KeyValueOperation op = getOperation(m.getHeaders());
1✔
362
                    if (op == KeyValueOperation.PUT) {
1✔
363
                        q.offer(new KeyResult(new BucketAndKey(m).key));
1✔
364
                    }
365
                });
1✔
366
                q.offer(new KeyResult());
1✔
367
            }
368
            catch (IOException | JetStreamApiException e) {
×
369
                q.offer(new KeyResult(e));
×
370
            }
371
            catch (InterruptedException e) {
×
372
                q.offer(new KeyResult(e));
×
373
                Thread.currentThread().interrupt();
×
374
            }
1✔
375
        });
1✔
376

377
        return q;
1✔
378
    }
379

380
    /**
381
     * {@inheritDoc}
382
     */
383
    @Override
384
    public List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException {
385
        validateNonWildcardKvKeyRequired(key);
1✔
386
        List<KeyValueEntry> list = new ArrayList<>();
1✔
387
        visitSubject(readSubject(key), DeliverPolicy.All, false, true, m -> list.add(new KeyValueEntry(m)));
1✔
388
        return list;
1✔
389
    }
390

391
    /**
392
     * {@inheritDoc}
393
     */
394
    @Override
395
    public void purgeDeletes() throws IOException, JetStreamApiException, InterruptedException {
396
        purgeDeletes(null);
1✔
397
    }
1✔
398

399
    /**
400
     * {@inheritDoc}
401
     */
402
    @Override
403
    public void purgeDeletes(KeyValuePurgeOptions options) throws IOException, JetStreamApiException, InterruptedException {
404
        long dmThresh = options == null
1✔
405
            ? KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS
406
            : options.getDeleteMarkersThresholdMillis();
1✔
407

408
        ZonedDateTime limit;
409
        if (dmThresh < 0) {
1✔
410
            limit = DateTimeUtils.fromNow(600000); // long enough in the future to clear all
1✔
411
        }
412
        else if (dmThresh == 0) {
1✔
413
            limit = DateTimeUtils.fromNow(KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS);
×
414
        }
415
        else {
416
            limit = DateTimeUtils.fromNow(-dmThresh);
1✔
417
        }
418

419
        List<String> keep0List = new ArrayList<>();
1✔
420
        List<String> keep1List = new ArrayList<>();
1✔
421
        visitSubject(streamSubject, DeliverPolicy.LastPerSubject, true, false, m -> {
1✔
422
            KeyValueEntry kve = new KeyValueEntry(m);
1✔
423
            if (kve.getOperation() != KeyValueOperation.PUT) {
1✔
424
                if (kve.getCreated().isAfter(limit)) {
1✔
425
                    keep1List.add(new BucketAndKey(m).key);
1✔
426
                }
427
                else {
428
                    keep0List.add(new BucketAndKey(m).key);
1✔
429
                }
430
            }
431
        });
1✔
432

433
        for (String key : keep0List) {
1✔
434
            jsm.purgeStream(streamName, PurgeOptions.subject(readSubject(key)));
1✔
435
        }
1✔
436

437
        for (String key : keep1List) {
1✔
438
            PurgeOptions po = PurgeOptions.builder()
1✔
439
                .subject(readSubject(key))
1✔
440
                .keep(1)
1✔
441
                .build();
1✔
442
            jsm.purgeStream(streamName, po);
1✔
443
        }
1✔
444
    }
1✔
445

446
    /**
447
     * {@inheritDoc}
448
     */
449
    @Override
450
    public KeyValueStatus getStatus() throws IOException, JetStreamApiException, InterruptedException {
451
        return new KeyValueStatus(jsm.getStreamInfo(streamName));
1✔
452
    }
453
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc