• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2109

15 Aug 2025 04:51PM UTC coverage: 95.404% (-0.05%) from 95.457%
#2109

push

github

web-flow
Merge pull request #1395 from nats-io/header-nullability

Header nullability

3 of 3 new or added lines in 1 file covered. (100.0%)

14 existing lines in 7 files now uncovered.

11915 of 12489 relevant lines covered (95.4%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.98
/src/main/java/io/nats/client/api/KeyValueConfiguration.java
1
// Copyright 2021 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13
package io.nats.client.api;
14

15
import io.nats.client.support.JsonValue;
16
import io.nats.client.support.JsonValueUtils;
17
import io.nats.client.support.NatsKeyValueUtil;
18
import org.jspecify.annotations.NonNull;
19
import org.jspecify.annotations.Nullable;
20

21
import java.time.Duration;
22
import java.util.*;
23

24
import static io.nats.client.support.NatsJetStreamConstants.SERVER_DEFAULT_DUPLICATE_WINDOW_MS;
25
import static io.nats.client.support.NatsKeyValueUtil.*;
26
import static io.nats.client.support.Validator.*;
27

28
/**
29
 * The KeyValueConfiguration class contains the configuration for of a Key Value bucket.
30
 */
31
public class KeyValueConfiguration extends FeatureConfiguration {
32
    KeyValueConfiguration(StreamConfiguration sc) {
33
        super(sc, extractBucketName(sc.getName()));
1✔
34
    }
1✔
35

36
    /**
37
     * Gets the maximum number of history for any one key. Includes the current value.
38
     * @return the maximum number of values for any one key.
39
     */
40
    public long getMaxHistoryPerKey() {
41
        return sc.getMaxMsgsPerSubject();
1✔
42
    }
43

44
    /**
45
     * Gets the maximum size for an individual value in the bucket.
46
     * @deprecated the server value is a 32-bit signed value. Use {@link #getMaximumValueSize()} instead.
47
     * @return the maximum size for a value.
48
     */
49
    @Deprecated
50
    public long getMaxValueSize() {
51
        return sc.getMaximumMessageSize();
1✔
52
    }
53

54
    /**
55
     * Gets the maximum size for an individual value in the bucket.
56
     * @return the maximum size for a value.
57
     */
58
    public int getMaximumValueSize() {
59
        return sc.getMaximumMessageSize();
1✔
60
    }
61

62
    /**
63
     * Get the republish configuration. Might be null.
64
     * @return the republish object
65
     */
66
    @Nullable
67
    public Republish getRepublish() {
68
        return sc.getRepublish();
1✔
69
    }
70

71
    /**
72
     * The mirror definition for this configuration
73
     * @return the mirror
74
     */
75
    @Nullable
76
    public Mirror getMirror() {
77
        return sc.getMirror();
1✔
78
    }
79

80
    /**
81
     * The sources for this configuration
82
     * @return the sources
83
     */
84
    @Nullable
85
    public List<Source> getSources() {
86
        return sc.getSources();
1✔
87
    }
88

89
    /**
90
     * The limit marker ttl if set
91
     * @return the duration
92
     */
93
    @Nullable
94
    public Duration getLimitMarkerTtl() {
95
        return sc.getSubjectDeleteMarkerTtl();
1✔
96
    }
97

98
    @Override
99
    public String toString() {
100
        return "KeyValueConfiguration" + toJson();
1✔
101
    }
102

103
    @Override
104
    @NonNull
105
    public JsonValue toJsonValue() {
106
        JsonValueUtils.MapBuilder mb = new JsonValueUtils.MapBuilder(super.toJsonValue());
1✔
107
        mb.jv.mapOrder.remove("metaData");
1✔
108
        mb.put("maxHistoryPerKey", getMaxHistoryPerKey());
1✔
109
        mb.put("maxValueSize", getMaxValueSize());
1✔
110
        mb.put("republish", getRepublish());
1✔
111
        mb.put("mirror", getMirror());
1✔
112
        mb.put("sources", getSources());
1✔
113
        mb.put("limitMarkerTtl", getLimitMarkerTtl());
1✔
114
        mb.jv.mapOrder.add("metaData");
1✔
115
        return mb.toJsonValue();
1✔
116
    }
117

118
    /**
119
     * Creates a builder for the Key Value Configuration.
120
     * @return a KeyValueConfiguration Builder
121
     */
122
    public static Builder builder() {
123
        return new Builder((KeyValueConfiguration)null);
1✔
124
    }
125

126
    /**
127
     * Creates a builder for the Key Value Configuration.
128
     * @param name the name of the key value bucket
129
     * @return a KeyValueConfiguration Builder
130
     */
131
    public static Builder builder(String name) {
132
        return new Builder(name);
×
133
    }
134

135
    /**
136
     * Creates a builder to copy the key value configuration.
137
     * @param kvc an existing KeyValueConfiguration
138
     * @return a KeyValueConfiguration Builder
139
     */
140
    public static Builder builder(KeyValueConfiguration kvc) {
141
        return new Builder(kvc);
1✔
142
    }
143

144
    /**
145
     * KeyValueConfiguration is created using a Builder. The builder supports chaining and will
146
     * create a default set of options if no methods are calls.
147
     *
148
     * <p>{@code new Builder().build()} will create a new KeyValueConfiguration.
149
     *
150
     */
151
    public static class Builder
152
        extends FeatureConfiguration.Builder<Builder, KeyValueConfiguration>
153
    {
154
        Mirror mirror;
155
        Duration limitMarkerTtl;
156
        final List<Source> sources = new ArrayList<>();
1✔
157

158
        @Override
159
        protected Builder getThis() {
160
            return this;
1✔
161
        }
162

163
        /**
164
         * Default Builder
165
         */
166
        public Builder() {
167
            this((KeyValueConfiguration)null);
×
168
        }
×
169

170
        /**
171
         * Builder accepting the key value bucket name.
172
         * @param name name of the key value bucket.
173
         */
174
        public Builder(String name) {
175
            this((KeyValueConfiguration)null);
1✔
176
            name(name);
×
177
        }
×
178

179
        /**
180
         * Construct the builder by copying another configuration
181
         * @param kvc the configuration to copy
182
         */
183
        public Builder(KeyValueConfiguration kvc) {
1✔
184
            if (kvc == null) {
1✔
185
                scBuilder = new StreamConfiguration.Builder();
1✔
186
                maxHistoryPerKey(1);
1✔
187
                replicas(1);
1✔
188
            }
189
            else {
190
                scBuilder = new StreamConfiguration.Builder(kvc.sc);
1✔
191
                name = NatsKeyValueUtil.extractBucketName(kvc.sc.getName());
1✔
192
            }
193
        }
1✔
194

195
        /**
196
         * Sets the name of the key value bucket.
197
         * @param name name of the key value bucket.
198
         * @return the builder
199
         */
200
        @Override
201
        public Builder name(String name) {
202
            return super.name(name);
1✔
203
        }
204

205
        /**
206
         * Sets the description of the store.
207
         * @param description description of the store.
208
         * @return the builder
209
         */
210
        @Override
211
        public Builder description(String description) {
212
            return super.description(description);
1✔
213
        }
214

215
        /**
216
         * Sets the maximum number of history for any one key. Includes the current value.
217
         * @param maxHistoryPerKey the maximum history
218
         * @return Builder
219
         */
220
        public Builder maxHistoryPerKey(int maxHistoryPerKey) {
221
            scBuilder.maxMessagesPerSubject(validateMaxHistory(maxHistoryPerKey));
1✔
222
            return this;
1✔
223
        }
224

225
        /**
226
         * Sets the maximum number of bytes in the KeyValueConfiguration.
227
         * @param maxBucketSize the maximum number of bytes
228
         * @return Builder
229
         */
230
        @Override
231
        public Builder maxBucketSize(long maxBucketSize) {
232
            return super.maxBucketSize(maxBucketSize);
1✔
233
        }
234

235
        /**
236
         * Sets the maximum size for an individual value in the KeyValueConfiguration.
237
         * @deprecated the server value is a 32-bit signed value. Use {@link #maximumValueSize(int)} instead.
238
         * @param maxValueSize the maximum size for a value
239
         * @return Builder
240
         */
241
        @Deprecated
242
        public Builder maxValueSize(long maxValueSize) {
243
            scBuilder.maximumMessageSize((int)validateMaxValueSize(maxValueSize));
1✔
244
            return this;
1✔
245
        }
246

247
        /**
248
         * Sets the maximum size for an individual value in the KeyValueConfiguration.
249
         * @param maxValueSize the maximum size for a value
250
         * @return Builder
251
         */
252
        public Builder maximumValueSize(int maxValueSize) {
253
            scBuilder.maximumMessageSize((int)validateMaxValueSize(maxValueSize));
1✔
254
            return this;
1✔
255
        }
256

257
        /**
258
         * Sets the maximum age for a value in this KeyValueConfiguration.
259
         * @param ttl the maximum age
260
         * @return Builder
261
         */
262
        @Override
263
        public Builder ttl(Duration ttl) {
264
            return super.ttl(ttl);
1✔
265
        }
266

267
        /**
268
         * Sets the storage type in the KeyValueConfiguration.
269
         * @param storageType the storage type
270
         * @return Builder
271
         */
272
        @Override
273
        public Builder storageType(StorageType storageType) {
274
            return super.storageType(storageType);
1✔
275
        }
276

277
        /**
278
         * Sets the number of replicas a message must be stored on in the KeyValueConfiguration.
279
         * @param replicas the number of replicas
280
         * @return Builder
281
         */
282
        @Override
283
        public Builder replicas(int replicas) {
284
            return super.replicas(replicas);
1✔
285
        }
286

287
        /**
288
         * Sets the placement directive object
289
         * @param placement the placement directive object
290
         * @return Builder
291
         */
292
        @Override
293
        public Builder placement(Placement placement) {
294
            return super.placement(placement);
1✔
295
        }
296

297
        /**
298
         * Sets whether to use compression for the KeyValueConfiguration.
299
         * If set, will use the default compression algorithm of the KV backing store.
300
         * @param compression whether to use compression in the KeyValueConfiguration
301
         * @return Builder
302
         */
303
        @Override
304
        public Builder compression(boolean compression) {
305
            return super.compression(compression);
1✔
306
        }
307

308
        /**
309
         * Sets the metadata for the KeyValueConfiguration
310
         * @param metadata the metadata map
311
         * @return Builder
312
         */
313
        @Override
314
        public Builder metadata(Map<String, String> metadata) {
315
            return super.metadata(metadata);
1✔
316
        }
317

318
        /**
319
         * Sets the Republish options
320
         * @param republish the Republish object
321
         * @return Builder
322
         */
323
        public Builder republish(Republish republish) {
324
            scBuilder.republish(republish);
1✔
325
            return this;
1✔
326
        }
327

328
        /**
329
         * Sets the mirror in the KeyValueConfiguration.
330
         * @param mirror the KeyValue's mirror
331
         * @return Builder
332
         */
333
        public Builder mirror(Mirror mirror) {
334
            this.mirror = mirror;
1✔
335
            return this;
1✔
336
        }
337

338
        /**
339
         * Sets the sources in the KeyValueConfiguration.
340
         * @param sources the KeyValue's sources
341
         * @return Builder
342
         */
343
        public Builder sources(Source... sources) {
344
            this.sources.clear();
1✔
345
            return addSources(sources);
1✔
346
        }
347

348
        /**
349
         * Sets the sources in the KeyValueConfiguration
350
         * @param sources the KeyValue's sources
351
         * @return Builder
352
         */
353
        public Builder sources(Collection<Source> sources) {
354
            this.sources.clear();
1✔
355
            return addSources(sources);
1✔
356
        }
357

358
        /**
359
         * Add a source into the KeyValueConfiguration.
360
         * @param source a KeyValue source
361
         * @return Builder
362
         */
363
        public Builder addSource(Source source) {
364
            if (source != null && !this.sources.contains(source)) {
1✔
365
                this.sources.add(source);
1✔
366
            }
367
            return this;
1✔
368
        }
369

370
        /**
371
         * Adds the sources into the KeyValueConfiguration
372
         * @param sources the KeyValue's sources to add
373
         * @return Builder
374
         */
375
        public Builder addSources(Source... sources) {
376
            if (sources != null) {
1✔
377
                return addSources(Arrays.asList(sources));
1✔
378
            }
379
            return this;
1✔
380
        }
381

382
        /**
383
         * Adds the sources into the KeyValueConfiguration
384
         * @param sources the KeyValue's sources to add
385
         * @return Builder
386
         */
387
        public Builder addSources(Collection<Source> sources) {
388
            if (sources != null) {
1✔
389
                for (Source source : sources) {
1✔
390
                    if (source != null && !this.sources.contains(source)) {
1✔
391
                        this.sources.add(source);
1✔
392
                    }
393
                }
1✔
394
            }
395
            return this;
1✔
396
        }
397

398
        /**
399
         * The limit marker TTL duration. Server accepts 1 second or more.
400
         * Null or empty has the effect of clearing the limit marker ttl
401
         * @param limitMarkerTtl the TTL duration
402
         * @return The Builder
403
         */
404
        public Builder limitMarker(Duration limitMarkerTtl) {
UNCOV
405
            this.limitMarkerTtl = validateDurationNotRequiredGtOrEqSeconds(1, limitMarkerTtl, null, "Limit Marker Ttl");
×
UNCOV
406
            return this;
×
407
        }
408

409
        /**
410
         * The limit marker TTL duration in milliseconds. Server accepts 1 second or more.
411
         * 0 or less has the effect of clearing the limit marker ttl
412
         * @param limitMarkerTtlMillis the TTL duration
413
         * @return The Builder
414
         */
415
        public Builder limitMarker(long limitMarkerTtlMillis) {
416
            if (limitMarkerTtlMillis <= 0) {
1✔
417
                this.limitMarkerTtl = null;
×
418
            }
419
            else {
420
                this.limitMarkerTtl = validateDurationGtOrEqSeconds(1, limitMarkerTtlMillis, "Limit Marker Ttl");
1✔
421
            }
422
            return this;
1✔
423
        }
424

425
        /**
426
         * Builds the KeyValueConfiguration
427
         * @return the KeyValueConfiguration.
428
         */
429
        public KeyValueConfiguration build() {
430
            name = required(name, "name");
1✔
431
            scBuilder.name(toStreamName(name))
1✔
432
                .allowRollup(true)
1✔
433
                .allowDirect(true) // by design
1✔
434
                .discardPolicy(DiscardPolicy.New)
1✔
435
                .denyDelete(true);
1✔
436

437
            if (mirror != null) {
1✔
438
                scBuilder.mirrorDirect(true);
1✔
439
                String name = mirror.getName();
1✔
440
                if (hasPrefix(name)) {
1✔
441
                    scBuilder.mirror(mirror);
1✔
442
                }
443
                else {
444
                    scBuilder.mirror(
1✔
445
                        Mirror.builder(mirror)
1✔
446
                            .name(toStreamName(name))
1✔
447
                            .build());
1✔
448
                }
449
            }
1✔
450
            else if (!sources.isEmpty()) {
1✔
451
                for (Source source : sources) {
1✔
452
                    String name = source.getName();
1✔
453
                    if (hasPrefix(name)) {
1✔
454
                        scBuilder.addSource(source);
1✔
455
                    }
456
                    else {
457
                        scBuilder.addSource(
1✔
458
                            Source.builder(source)
1✔
459
                                .name(toStreamName(name))
1✔
460
                                .build());
1✔
461
                    }
462
                }
1✔
463
            }
464
            else {
465
                scBuilder.subjects(toStreamSubject(name));
1✔
466
            }
467

468
            if (limitMarkerTtl != null) {
1✔
469
                scBuilder.subjectDeleteMarkerTtl(limitMarkerTtl).allowMessageTtl();
1✔
470
            }
471

472
            // When stream's MaxAge is not set, server uses 2 minutes as the default
473
            // for the duplicate window. If MaxAge is set, and lower than 2 minutes,
474
            // then the duplicate window will be set to that. If MaxAge is greater,
475
            // we will cap the duplicate window to 2 minutes (to be consistent with
476
            // previous behavior).
477
            long ttlMs = ttl.toMillis();
1✔
478
            long dupeMs = SERVER_DEFAULT_DUPLICATE_WINDOW_MS;
1✔
479
            if (ttlMs > 0 && ttlMs < SERVER_DEFAULT_DUPLICATE_WINDOW_MS) {
1✔
480
                dupeMs = ttlMs;
1✔
481
            }
482
            scBuilder.duplicateWindow(dupeMs);
1✔
483

484
            return new KeyValueConfiguration(scBuilder.build());
1✔
485
        }
486
    }
487
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc