• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1957

30 Apr 2025 08:01PM UTC coverage: 95.654% (+0.01%) from 95.641%
#1957

push

github

web-flow
Merge pull request #1310 from nats-io/kv-limit-marker

KV Limit Marker

55 of 59 new or added lines in 8 files covered. (93.22%)

4 existing lines in 3 files now uncovered.

11687 of 12218 relevant lines covered (95.65%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.64
/src/main/java/io/nats/client/api/KeyValueConfiguration.java
1
// Copyright 2021 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13
package io.nats.client.api;
14

15
import io.nats.client.support.JsonValue;
16
import io.nats.client.support.JsonValueUtils;
17
import io.nats.client.support.NatsKeyValueUtil;
18

19
import java.time.Duration;
20
import java.util.*;
21

22
import static io.nats.client.support.NatsJetStreamConstants.SERVER_DEFAULT_DUPLICATE_WINDOW_MS;
23
import static io.nats.client.support.NatsKeyValueUtil.*;
24
import static io.nats.client.support.Validator.*;
25

26
/**
27
 * The KeyValueConfiguration class contains the configuration for of a Key Value bucket.
28
 */
29
public class KeyValueConfiguration extends FeatureConfiguration {
30
    KeyValueConfiguration(StreamConfiguration sc) {
31
        super(sc, extractBucketName(sc.getName()));
1✔
32
    }
1✔
33

34
    /**
35
     * Gets the maximum number of history for any one key. Includes the current value.
36
     * @return the maximum number of values for any one key.
37
     */
38
    public long getMaxHistoryPerKey() {
39
        return sc.getMaxMsgsPerSubject();
1✔
40
    }
41

42
    /**
43
     * Gets the maximum size for an individual value in the bucket.
44
     * @deprecated the server value is a 32-bit signed value. Use {@link #getMaximumValueSize()} instead.
45
     * @return the maximum size for a value.
46
     */
47
    @Deprecated
48
    public long getMaxValueSize() {
49
        return sc.getMaximumMessageSize();
1✔
50
    }
51

52
    /**
53
     * Gets the maximum size for an individual value in the bucket.
54
     * @return the maximum size for a value.
55
     */
56
    public int getMaximumValueSize() {
57
        return sc.getMaximumMessageSize();
1✔
58
    }
59

60
    /**
61
     * Get the republish configuration. Might be null.
62
     * @return the republish object
63
     */
64
    public Republish getRepublish() {
65
        return sc.getRepublish();
1✔
66
    }
67

68
    /**
69
     * The mirror definition for this configuration
70
     * @return the mirror
71
     */
72
    public Mirror getMirror() {
73
        return sc.getMirror();
1✔
74
    }
75

76
    /**
77
     * The sources for this configuration
78
     * @return the sources
79
     */
80
    public List<Source> getSources() {
81
        return sc.getSources();
1✔
82
    }
83

84
    @Override
85
    public String toString() {
86
        return "KeyValueConfiguration" + toJson();
1✔
87
    }
88

89
    @Override
90
    public JsonValue toJsonValue() {
91
        JsonValueUtils.MapBuilder mb = new JsonValueUtils.MapBuilder(super.toJsonValue());
1✔
92
        mb.jv.mapOrder.remove("metaData");
1✔
93
        mb.put("maxHistoryPerKey", getMaxHistoryPerKey());
1✔
94
        mb.put("maxValueSize", getMaxValueSize());
1✔
95
        mb.put("republish", getRepublish());
1✔
96
        mb.put("mirror", getMirror());
1✔
97
        mb.put("sources", getSources());
1✔
98
        mb.jv.mapOrder.add("metaData");
1✔
99
        return mb.toJsonValue();
1✔
100
    }
101

102
    /**
103
     * Creates a builder for the Key Value Configuration.
104
     * @return a KeyValueConfiguration Builder
105
     */
106
    public static Builder builder() {
107
        return new Builder((KeyValueConfiguration)null);
1✔
108
    }
109

110
    /**
111
     * Creates a builder for the Key Value Configuration.
112
     * @param name the name of the key value bucket
113
     * @return a KeyValueConfiguration Builder
114
     */
115
    public static Builder builder(String name) {
116
        return new Builder(name);
×
117
    }
118

119
    /**
120
     * Creates a builder to copy the key value configuration.
121
     * @param kvc an existing KeyValueConfiguration
122
     * @return a KeyValueConfiguration Builder
123
     */
124
    public static Builder builder(KeyValueConfiguration kvc) {
125
        return new Builder(kvc);
1✔
126
    }
127

128
    /**
129
     * KeyValueConfiguration is created using a Builder. The builder supports chaining and will
130
     * create a default set of options if no methods are calls.
131
     *
132
     * <p>{@code new Builder().build()} will create a new KeyValueConfiguration.
133
     *
134
     */
135
    public static class Builder
136
        extends FeatureConfiguration.Builder<Builder, KeyValueConfiguration>
137
    {
138
        Mirror mirror;
139
        Duration limitMarkerTtl;
140
        final List<Source> sources = new ArrayList<>();
1✔
141

142
        @Override
143
        protected Builder getThis() {
144
            return this;
1✔
145
        }
146

147
        /**
148
         * Default Builder
149
         */
150
        public Builder() {
151
            this((KeyValueConfiguration)null);
×
152
        }
×
153

154
        /**
155
         * Builder accepting the key value bucket name.
156
         * @param name name of the key value bucket.
157
         */
158
        public Builder(String name) {
159
            this((KeyValueConfiguration)null);
1✔
160
            name(name);
×
161
        }
×
162

163
        /**
164
         * Construct the builder by copying another configuration
165
         * @param kvc the configuration to copy
166
         */
167
        public Builder(KeyValueConfiguration kvc) {
1✔
168
            if (kvc == null) {
1✔
169
                scBuilder = new StreamConfiguration.Builder();
1✔
170
                maxHistoryPerKey(1);
1✔
171
                replicas(1);
1✔
172
            }
173
            else {
174
                scBuilder = new StreamConfiguration.Builder(kvc.sc);
1✔
175
                name = NatsKeyValueUtil.extractBucketName(kvc.sc.getName());
1✔
176
            }
177
        }
1✔
178

179
        /**
180
         * Sets the name of the key value bucket.
181
         * @param name name of the key value bucket.
182
         * @return the builder
183
         */
184
        @Override
185
        public Builder name(String name) {
186
            return super.name(name);
1✔
187
        }
188

189
        /**
190
         * Sets the description of the store.
191
         * @param description description of the store.
192
         * @return the builder
193
         */
194
        @Override
195
        public Builder description(String description) {
196
            return super.description(description);
1✔
197
        }
198

199
        /**
200
         * Sets the maximum number of history for any one key. Includes the current value.
201
         * @param maxHistoryPerKey the maximum history
202
         * @return Builder
203
         */
204
        public Builder maxHistoryPerKey(int maxHistoryPerKey) {
205
            scBuilder.maxMessagesPerSubject(validateMaxHistory(maxHistoryPerKey));
1✔
206
            return this;
1✔
207
        }
208

209
        /**
210
         * Sets the maximum number of bytes in the KeyValueConfiguration.
211
         * @param maxBucketSize the maximum number of bytes
212
         * @return Builder
213
         */
214
        @Override
215
        public Builder maxBucketSize(long maxBucketSize) {
216
            return super.maxBucketSize(maxBucketSize);
1✔
217
        }
218

219
        /**
220
         * Sets the maximum size for an individual value in the KeyValueConfiguration.
221
         * @deprecated the server value is a 32-bit signed value. Use {@link #maximumValueSize(int)} instead.
222
         * @param maxValueSize the maximum size for a value
223
         * @return Builder
224
         */
225
        @Deprecated
226
        public Builder maxValueSize(long maxValueSize) {
227
            scBuilder.maximumMessageSize((int)validateMaxValueSize(maxValueSize));
1✔
228
            return this;
1✔
229
        }
230

231
        /**
232
         * Sets the maximum size for an individual value in the KeyValueConfiguration.
233
         * @param maxValueSize the maximum size for a value
234
         * @return Builder
235
         */
236
        public Builder maximumValueSize(int maxValueSize) {
237
            scBuilder.maximumMessageSize((int)validateMaxValueSize(maxValueSize));
1✔
238
            return this;
1✔
239
        }
240

241
        /**
242
         * Sets the maximum age for a value in this KeyValueConfiguration.
243
         * @param ttl the maximum age
244
         * @return Builder
245
         */
246
        @Override
247
        public Builder ttl(Duration ttl) {
248
            return super.ttl(ttl);
1✔
249
        }
250

251
        /**
252
         * Sets the storage type in the KeyValueConfiguration.
253
         * @param storageType the storage type
254
         * @return Builder
255
         */
256
        @Override
257
        public Builder storageType(StorageType storageType) {
258
            return super.storageType(storageType);
1✔
259
        }
260

261
        /**
262
         * Sets the number of replicas a message must be stored on in the KeyValueConfiguration.
263
         * @param replicas the number of replicas
264
         * @return Builder
265
         */
266
        @Override
267
        public Builder replicas(int replicas) {
268
            return super.replicas(replicas);
1✔
269
        }
270

271
        /**
272
         * Sets the placement directive object
273
         * @param placement the placement directive object
274
         * @return Builder
275
         */
276
        @Override
277
        public Builder placement(Placement placement) {
278
            return super.placement(placement);
1✔
279
        }
280

281
        /**
282
         * Sets whether to use compression for the KeyValueConfiguration.
283
         * If set, will use the default compression algorithm of the KV backing store.
284
         * @param compression whether to use compression in the KeyValueConfiguration
285
         * @return Builder
286
         */
287
        @Override
288
        public Builder compression(boolean compression) {
289
            return super.compression(compression);
1✔
290
        }
291

292
        /**
293
         * Sets the metadata for the KeyValueConfiguration
294
         * @param metadata the metadata map
295
         * @return Builder
296
         */
297
        @Override
298
        public Builder metadata(Map<String, String> metadata) {
299
            return super.metadata(metadata);
1✔
300
        }
301

302
        /**
303
         * Sets the Republish options
304
         * @param republish the Republish object
305
         * @return Builder
306
         */
307
        public Builder republish(Republish republish) {
308
            scBuilder.republish(republish);
1✔
309
            return this;
1✔
310
        }
311

312
        /**
313
         * Sets the mirror in the KeyValueConfiguration.
314
         * @param mirror the KeyValue's mirror
315
         * @return Builder
316
         */
317
        public Builder mirror(Mirror mirror) {
318
            this.mirror = mirror;
1✔
319
            return this;
1✔
320
        }
321

322
        /**
323
         * Sets the sources in the KeyValueConfiguration.
324
         * @param sources the KeyValue's sources
325
         * @return Builder
326
         */
327
        public Builder sources(Source... sources) {
328
            this.sources.clear();
1✔
329
            return addSources(sources);
1✔
330
        }
331

332
        /**
333
         * Sets the sources in the KeyValueConfiguration
334
         * @param sources the KeyValue's sources
335
         * @return Builder
336
         */
337
        public Builder sources(Collection<Source> sources) {
338
            this.sources.clear();
1✔
339
            return addSources(sources);
1✔
340
        }
341

342
        /**
343
         * Add a source into the KeyValueConfiguration.
344
         * @param source a KeyValue source
345
         * @return Builder
346
         */
347
        public Builder addSource(Source source) {
348
            if (source != null && !this.sources.contains(source)) {
1✔
349
                this.sources.add(source);
1✔
350
            }
351
            return this;
1✔
352
        }
353

354
        /**
355
         * Adds the sources into the KeyValueConfiguration
356
         * @param sources the KeyValue's sources to add
357
         * @return Builder
358
         */
359
        public Builder addSources(Source... sources) {
360
            if (sources != null) {
1✔
361
                return addSources(Arrays.asList(sources));
1✔
362
            }
363
            return this;
1✔
364
        }
365

366
        /**
367
         * Adds the sources into the KeyValueConfiguration
368
         * @param sources the KeyValue's sources to add
369
         * @return Builder
370
         */
371
        public Builder addSources(Collection<Source> sources) {
372
            if (sources != null) {
1✔
373
                for (Source source : sources) {
1✔
374
                    if (source != null && !this.sources.contains(source)) {
1✔
375
                        this.sources.add(source);
1✔
376
                    }
377
                }
1✔
378
            }
379
            return this;
1✔
380
        }
381

382
        /**
383
         * The limit marker TTL duration. Server accepts 1 second or more.
384
         * @param limitMarkerTtl the TTL duration
385
         * @return The Builder
386
         */
387
        public Builder limitMarker(Duration limitMarkerTtl) {
388
            this.limitMarkerTtl = validateDurationNotRequiredGtOrEqSeconds(1, limitMarkerTtl, null);
1✔
389
            return this;
1✔
390
        }
391

392
        /**
393
         * The limit marker TTL duration. Server accepts 1 second or more.
394
         * @param limitMarkerTtlMillis the TTL duration
395
         * @return The Builder
396
         */
397
        public Builder limitMarker(long limitMarkerTtlMillis) {
398
            if (limitMarkerTtlMillis <= 0) {
1✔
NEW
399
                this.limitMarkerTtl = null;
×
400
            }
401
            else {
402
                this.limitMarkerTtl = validateDurationGtOrEqSeconds(1, limitMarkerTtlMillis);
1✔
403
            }
404
            return this;
1✔
405
        }
406

407
        /**
408
         * Builds the KeyValueConfiguration
409
         * @return the KeyValueConfiguration.
410
         */
411
        public KeyValueConfiguration build() {
412
            name = required(name, "name");
1✔
413
            scBuilder.name(toStreamName(name))
1✔
414
                .allowRollup(true)
1✔
415
                .allowDirect(true) // by design
1✔
416
                .discardPolicy(DiscardPolicy.New)
1✔
417
                .denyDelete(true);
1✔
418

419
            if (mirror != null) {
1✔
420
                scBuilder.mirrorDirect(true);
1✔
421
                String name = mirror.getName();
1✔
422
                if (hasPrefix(name)) {
1✔
423
                    scBuilder.mirror(mirror);
1✔
424
                }
425
                else {
426
                    scBuilder.mirror(
1✔
427
                        Mirror.builder(mirror)
1✔
428
                            .name(toStreamName(name))
1✔
429
                            .build());
1✔
430
                }
431
            }
1✔
432
            else if (!sources.isEmpty()) {
1✔
433
                for (Source source : sources) {
1✔
434
                    String name = source.getName();
1✔
435
                    if (hasPrefix(name)) {
1✔
436
                        scBuilder.addSource(source);
1✔
437
                    }
438
                    else {
439
                        scBuilder.addSource(
1✔
440
                            Source.builder(source)
1✔
441
                                .name(toStreamName(name))
1✔
442
                                .build());
1✔
443
                    }
444
                }
1✔
445
            }
446
            else {
447
                scBuilder.subjects(toStreamSubject(name));
1✔
448
            }
449

450
            if (limitMarkerTtl != null) {
1✔
451
                scBuilder.subjectDeleteMarkerTtl(limitMarkerTtl).allowMessageTtl();
1✔
452
            }
453

454
            // When stream's MaxAge is not set, server uses 2 minutes as the default
455
            // for the duplicate window. If MaxAge is set, and lower than 2 minutes,
456
            // then the duplicate window will be set to that. If MaxAge is greater,
457
            // we will cap the duplicate window to 2 minutes (to be consistent with
458
            // previous behavior).
459
            long ttlMs = ttl.toMillis();
1✔
460
            long dupeMs = SERVER_DEFAULT_DUPLICATE_WINDOW_MS;
1✔
461
            if (ttlMs > 0 && ttlMs < SERVER_DEFAULT_DUPLICATE_WINDOW_MS) {
1✔
462
                dupeMs = ttlMs;
1✔
463
            }
464
            scBuilder.duplicateWindow(dupeMs);
1✔
465

466
            return new KeyValueConfiguration(scBuilder.build());
1✔
467
        }
468
    }
469
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc