• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1959

02 May 2025 03:26PM UTC coverage: 95.638% (-0.02%) from 95.654%
#1959

push

github

web-flow
Merge pull request #1311 from nats-io/kv-ttl-more

KV LimitMarker add missing getter, additional docs and tests

14 of 15 new or added lines in 5 files covered. (93.33%)

2 existing lines in 2 files now uncovered.

11687 of 12220 relevant lines covered (95.64%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.38
/src/main/java/io/nats/client/api/StreamConfiguration.java
1
// Copyright 2020 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.api;
15

16
import io.nats.client.support.*;
17

18
import java.time.Duration;
19
import java.util.*;
20

21
import static io.nats.client.support.ApiConstants.*;
22
import static io.nats.client.support.JsonUtils.*;
23
import static io.nats.client.support.JsonValueUtils.readBoolean;
24
import static io.nats.client.support.JsonValueUtils.readInteger;
25
import static io.nats.client.support.JsonValueUtils.readLong;
26
import static io.nats.client.support.JsonValueUtils.readNanos;
27
import static io.nats.client.support.JsonValueUtils.readString;
28
import static io.nats.client.support.JsonValueUtils.*;
29
import static io.nats.client.support.Validator.*;
30

31
/**
32
 * The StreamConfiguration class specifies the configuration for creating a JetStream stream on the server.
33
 * Options are created using a {@link StreamConfiguration.Builder Builder}.
34
 */
35
public class StreamConfiguration implements JsonSerializable {
36

37
    // see builder for defaults
38
    private final String name;
39
    private final String description;
40
    private final List<String> subjects;
41
    private final RetentionPolicy retentionPolicy;
42
    private final CompressionOption compressionOption;
43
    private final long maxConsumers;
44
    private final long maxMsgs;
45
    private final long maxMsgsPerSubject;
46
    private final long maxBytes;
47
    private final Duration maxAge;
48
    private final int maxMsgSize;
49
    private final StorageType storageType;
50
    private final int replicas;
51
    private final boolean noAck;
52
    private final String templateOwner;
53
    private final DiscardPolicy discardPolicy;
54
    private final Duration duplicateWindow;
55
    private final Placement placement;
56
    private final Republish republish;
57
    private final SubjectTransform subjectTransform;
58
    private final ConsumerLimits consumerLimits;
59
    private final Mirror mirror;
60
    private final List<Source> sources;
61
    private final boolean sealed;
62
    private final boolean allowRollup;
63
    private final boolean allowDirect;
64
    private final boolean mirrorDirect;
65
    private final boolean denyDelete;
66
    private final boolean denyPurge;
67
    private final boolean discardNewPerSubject;
68
    private final Map<String, String> metadata;
69
    private final long firstSequence;
70
    private final boolean allowMessageTtl;
71
    private final Duration subjectDeleteMarkerTtl;
72

73
    static StreamConfiguration instance(JsonValue v) {
74
        return new Builder()
1✔
75
            .retentionPolicy(RetentionPolicy.get(readString(v, RETENTION)))
1✔
76
            .compressionOption(CompressionOption.get(readString(v, COMPRESSION)))
1✔
77
            .storageType(StorageType.get(readString(v, STORAGE)))
1✔
78
            .discardPolicy(DiscardPolicy.get(readString(v, DISCARD)))
1✔
79
            .name(readString(v, NAME))
1✔
80
            .description(readString(v, DESCRIPTION))
1✔
81
            .maxConsumers(readLong(v, MAX_CONSUMERS, -1))
1✔
82
            .maxMessages(readLong(v, MAX_MSGS, -1))
1✔
83
            .maxMessagesPerSubject(readLong(v, MAX_MSGS_PER_SUB, -1))
1✔
84
            .maxBytes(readLong(v, MAX_BYTES, -1))
1✔
85
            .maxAge(readNanos(v, MAX_AGE))
1✔
86
            .maximumMessageSize(readInteger(v, MAX_MSG_SIZE, -1))
1✔
87
            .replicas(readInteger(v, NUM_REPLICAS, 1))
1✔
88
            .noAck(readBoolean(v, NO_ACK))
1✔
89
            .templateOwner(readString(v, TEMPLATE_OWNER))
1✔
90
            .duplicateWindow(readNanos(v, DUPLICATE_WINDOW))
1✔
91
            .subjects(readStringList(v, SUBJECTS))
1✔
92
            .placement(Placement.optionalInstance(readValue(v, PLACEMENT)))
1✔
93
            .republish(Republish.optionalInstance(readValue(v, REPUBLISH)))
1✔
94
            .subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM)))
1✔
95
            .consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS)))
1✔
96
            .mirror(Mirror.optionalInstance(readValue(v, MIRROR)))
1✔
97
            .sources(Source.optionalListOf(readValue(v, SOURCES)))
1✔
98
            .sealed(readBoolean(v, SEALED))
1✔
99
            .allowRollup(readBoolean(v, ALLOW_ROLLUP_HDRS))
1✔
100
            .allowDirect(readBoolean(v, ALLOW_DIRECT))
1✔
101
            .mirrorDirect(readBoolean(v, MIRROR_DIRECT))
1✔
102
            .denyDelete(readBoolean(v, DENY_DELETE))
1✔
103
            .denyPurge(readBoolean(v, DENY_PURGE))
1✔
104
            .discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT))
1✔
105
            .metadata(readStringStringMap(v, METADATA))
1✔
106
            .firstSequence(readLong(v, FIRST_SEQ, 1))
1✔
107
            .allowMessageTtl(readBoolean(v, ALLOW_MSG_TTL))
1✔
108
            .subjectDeleteMarkerTtl(readNanos(v, SUBJECT_DELETE_MARKER_TTL))
1✔
109
            .build();
1✔
110
    }
111

112
    // For the builder, assumes all validations are already done in builder
113
    /**
     * Creates a configuration from the current state of the given builder.
     * No validation happens here; values are taken as the builder holds them.
     *
     * <p>The subject and source lists are copied. The builder keeps both in
     * {@code final} lists that it modifies in place on later calls (for example
     * {@code subjects(...)} clears and refills its list), so sharing the list
     * instances would let a reused builder change an already created
     * configuration.
     *
     * @param b the builder to take the values from
     */
    StreamConfiguration(Builder b) {
        this.name = b.name;
        this.description = b.description;
        // defensive copy: do not alias the builder's mutable list
        this.subjects = new ArrayList<>(b.subjects);
        this.retentionPolicy = b.retentionPolicy;
        this.compressionOption = b.compressionOption;
        this.maxConsumers = b.maxConsumers;
        this.maxMsgs = b.maxMsgs;
        this.maxMsgsPerSubject = b.maxMsgsPerSubject;
        this.maxBytes = b.maxBytes;
        this.maxAge = b.maxAge;
        this.maxMsgSize = b.maxMsgSize;
        this.storageType = b.storageType;
        this.replicas = b.replicas;
        this.noAck = b.noAck;
        this.templateOwner = b.templateOwner;
        this.discardPolicy = b.discardPolicy;
        this.duplicateWindow = b.duplicateWindow;
        this.placement = b.placement;
        this.republish = b.republish;
        this.subjectTransform = b.subjectTransform;
        this.consumerLimits = b.consumerLimits;
        this.mirror = b.mirror;
        // defensive copy: do not alias the builder's mutable list
        this.sources = new ArrayList<>(b.sources);
        this.sealed = b.sealed;
        this.allowRollup = b.allowRollup;
        this.allowDirect = b.allowDirect;
        this.mirrorDirect = b.mirrorDirect;
        this.denyDelete = b.denyDelete;
        this.denyPurge = b.denyPurge;
        this.discardNewPerSubject = b.discardNewPerSubject;
        // NOTE(review): metadata is still shared with the builder; it is left as is
        // because copying into a different Map type could change iteration order
        // of the serialized output.
        this.metadata = b.metadata;
        this.firstSequence = b.firstSequence;
        this.allowMessageTtl = b.allowMessageTtl;
        this.subjectDeleteMarkerTtl = b.subjectDeleteMarkerTtl;
    }
1✔
149

150
    /**
151
     * Returns a StreamConfiguration deserialized from its JSON form.
152
     *
153
     * @see #toJson()
154
     * @param json the json representing the Stream Configuration
155
     * @return StreamConfiguration for the given json
156
     * @throws JsonParseException thrown if the parsing fails for invalid json
157
     */
158
    public static StreamConfiguration instance(String json) throws JsonParseException {
159
        return instance(JsonParser.parse(json));
1✔
160
    }
161

162
    /**
163
     * Returns a JSON representation of this stream configuration.
164
     *
165
     * @return json stream configuration to send to the server.
166
     */
167
    public String toJson() {
168

169
        StringBuilder sb = beginJson();
1✔
170

171
        addField(sb, NAME, name);
1✔
172
        JsonUtils.addField(sb, DESCRIPTION, description);
1✔
173
        addStrings(sb, SUBJECTS, subjects);
1✔
174
        addField(sb, RETENTION, retentionPolicy.toString());
1✔
175
        addEnumWhenNot(sb, COMPRESSION, compressionOption, CompressionOption.None);
1✔
176
        addField(sb, MAX_CONSUMERS, maxConsumers);
1✔
177
        addField(sb, MAX_MSGS, maxMsgs);
1✔
178
        addField(sb, MAX_MSGS_PER_SUB, maxMsgsPerSubject);
1✔
179
        addField(sb, MAX_BYTES, maxBytes);
1✔
180
        addFieldAsNanos(sb, MAX_AGE, maxAge);
1✔
181
        addField(sb, MAX_MSG_SIZE, maxMsgSize);
1✔
182
        addField(sb, STORAGE, storageType.toString());
1✔
183
        addField(sb, NUM_REPLICAS, replicas);
1✔
184
        addFldWhenTrue(sb, NO_ACK, noAck);
1✔
185
        addField(sb, TEMPLATE_OWNER, templateOwner);
1✔
186
        addField(sb, DISCARD, discardPolicy.toString());
1✔
187
        addFieldAsNanos(sb, DUPLICATE_WINDOW, duplicateWindow);
1✔
188
        if (placement != null && placement.hasData()) {
1✔
189
            addField(sb, PLACEMENT, placement);
1✔
190
        }
191
        addField(sb, REPUBLISH, republish);
1✔
192
        addField(sb, SUBJECT_TRANSFORM, subjectTransform);
1✔
193
        addField(sb, CONSUMER_LIMITS, consumerLimits);
1✔
194
        addField(sb, MIRROR, mirror);
1✔
195
        addJsons(sb, SOURCES, sources);
1✔
196
        addFldWhenTrue(sb, SEALED, sealed);
1✔
197
        addFldWhenTrue(sb, ALLOW_ROLLUP_HDRS, allowRollup);
1✔
198
        addFldWhenTrue(sb, ALLOW_DIRECT, allowDirect);
1✔
199
        addFldWhenTrue(sb, MIRROR_DIRECT, mirrorDirect);
1✔
200
        addFldWhenTrue(sb, DENY_DELETE, denyDelete);
1✔
201
        addFldWhenTrue(sb, DENY_PURGE, denyPurge);
1✔
202
        addFldWhenTrue(sb, DISCARD_NEW_PER_SUBJECT, discardNewPerSubject);
1✔
203
        addField(sb, METADATA, metadata);
1✔
204
        addFieldWhenGreaterThan(sb, FIRST_SEQ, firstSequence, 1);
1✔
205
        addFldWhenTrue(sb, ALLOW_MSG_TTL, allowMessageTtl);
1✔
206
        addFieldAsNanos(sb, SUBJECT_DELETE_MARKER_TTL, subjectDeleteMarkerTtl);
1✔
207

208
        return endJson(sb).toString();
1✔
209
    }
210

211
    /**
212
     * Gets the name of this stream configuration.
213
     * @return the name of the stream.
214
     */
215
    public String getName() {
216
        return name;
1✔
217
    }
218

219
    /**
220
     * Gets the description of this stream configuration.
221
     * @return the description of the stream.
222
     */
223
    public String getDescription() {
224
        return description;
1✔
225
    }
226

227
    /**
228
     * Gets the subjects for this stream configuration.
229
     * @return the subjects of the stream.
230
     */
231
    public List<String> getSubjects() {
232
        return subjects;
1✔
233
    }
234

235
    /**
236
     * Gets the discard policy for this stream configuration.
237
     * @return the discard policy of the stream.
238
     */
239
    public DiscardPolicy getDiscardPolicy() {
240
        return discardPolicy;
1✔
241
    }
242

243
    /**
244
     * Gets the retention policy for this stream configuration.
245
     * @return the retention policy for this stream.
246
     */
247
    public RetentionPolicy getRetentionPolicy() {
248
        return retentionPolicy;
1✔
249
    }
250

251
    /**
252
     * Gets the compression option for this stream configuration.
253
     * @return the compression option for this stream.
254
     */
255
    public CompressionOption getCompressionOption() {
256
        return compressionOption;
1✔
257
    }
258

259
    /**
260
     * Gets the maximum number of consumers for this stream configuration.
261
     * @return the maximum number of consumers for this stream.
262
     */
263
    public long getMaxConsumers() {
264
        return maxConsumers;
1✔
265
    }
266

267
    /**
268
     * Gets the maximum messages for this stream configuration.
269
     * @return the maximum number of messages for this stream.
270
     */
271
    public long getMaxMsgs() {
272
        return maxMsgs;
1✔
273
    }
274

275
    /**
276
     * Gets the maximum messages per subject for this stream configuration.
277
     * @return the maximum number of messages per subject for this stream.
278
     */
279
    public long getMaxMsgsPerSubject() {
280
        return maxMsgsPerSubject;
1✔
281
    }
282

283
    /**
284
     * Gets the maximum number of bytes for this stream configuration.
285
     * @return the maximum number of bytes for this stream.
286
     */
287
    public long getMaxBytes() {
288
        return maxBytes;
1✔
289
    }
290

291
    /**
292
     * Gets the maximum message age for this stream configuration.
293
     * @return the maximum message age for this stream.
294
     */  
295
    public Duration getMaxAge() {
296
        return maxAge;
1✔
297
    }
298

299
    /**
300
     * Gets the maximum message size for this stream configuration.
301
     * @deprecated the server value is a 32-bit signed value. Use {@link #getMaximumMessageSize()} instead.
302
     * @return the maximum message size for this stream.
303
     */
304
    @Deprecated
305
    public long getMaxMsgSize() {
306
        return maxMsgSize;
1✔
307
    }
308

309
    /**
310
     * Gets the maximum message size for this stream configuration.
311
     * @return the maximum message size for this stream.
312
     */
313
    public int getMaximumMessageSize() {
314
        return maxMsgSize;
1✔
315
    }
316

317
    /**
318
     * Gets the storage type for this stream configuration.
319
     * @return the storage type for this stream.
320
     */
321
    public StorageType getStorageType() {
322
        return storageType;
1✔
323
    }
324

325
    /**
326
     * Gets the number of replicas for this stream configuration.
327
     * @return the number of replicas
328
     */    
329
    public int getReplicas() {
330
        return replicas;
1✔
331
    }
332

333
    /**
334
     * Gets whether acknowledgements are required in this stream configuration.
335
     * @return true if acknowledgements are not required.
336
     */
337
    public boolean getNoAck() {
338
        return noAck;
1✔
339
    }
340

341
    /**
342
     * Gets the template owner for this stream configuration.
343
     * @return the template owner for this stream.
344
     */    
345
    public String getTemplateOwner() {
346
        return templateOwner;
1✔
347
    }
348

349
    /**
350
     * Gets the duplicate checking window stream configuration.  Duration.ZERO
351
     * means duplicate checking is not enabled.
352
     * @return the duration of the window.
353
     */    
354
    public Duration getDuplicateWindow() {
355
        return duplicateWindow;
1✔
356
    }
357

358
    /**
359
     * Get the placement directives to consider when placing replicas of this stream,
360
     * random placement when unset. May be null.
361
     * @return the placement object
362
     */
363
    public Placement getPlacement() {
364
        return placement;
1✔
365
    }
366

367
    /**
368
     * Get the republish configuration. May be null.
369
     * @return the republish object
370
     */
371
    public Republish getRepublish() {
372
        return republish;
1✔
373
    }
374

375
    /**
376
     * Get the subjectTransform configuration. May be null.
377
     * @return the subjectTransform object
378
     */
379
    public SubjectTransform getSubjectTransform() {
380
        return subjectTransform;
1✔
381
    }
382

383
    /**
384
     * Get the consumerLimits configuration. May be null.
385
     * @return the consumerLimits object
386
     */
387
    public ConsumerLimits getConsumerLimits() {
388
        return consumerLimits;
1✔
389
    }
390

391
    /**
392
     * The mirror definition for this stream
393
     * @return the mirror
394
     */
395
    public Mirror getMirror() {
396
        return mirror;
1✔
397
    }
398

399
    /**
400
     * The sources for this stream
401
     * @return the sources
402
     */
403
    public List<Source> getSources() {
404
        return sources;
1✔
405
    }
406

407
    /**
408
     * Get the flag indicating if the stream is sealed.
409
     * @return the sealed flag
410
     */
411
    public boolean getSealed() {
412
        return sealed;
1✔
413
    }
414

415
    /**
416
     * Get the flag indicating if the stream allows rollup.
417
     * @return the allows rollup flag
418
     */
419
    public boolean getAllowRollup() {
420
        return allowRollup;
1✔
421
    }
422

423
    /**
424
     * Get the flag indicating if the stream allows direct message access.
425
     * @return the allows direct flag
426
     */
427
    public boolean getAllowDirect() {
428
        return allowDirect;
1✔
429
    }
430

431
    /**
432
     * Get the flag indicating if the stream allows
433
     * higher performance and unified direct access for mirrors as well.
434
     * @return the mirror direct flag
435
     */
436
    public boolean getMirrorDirect() {
437
        return mirrorDirect;
1✔
438
    }
439

440
    /**
441
     * Get the flag indicating if deny delete is set for the stream
442
     * @return the deny delete flag
443
     */
444
    public boolean getDenyDelete() {
445
        return denyDelete;
1✔
446
    }
447

448
    /**
449
     * Get the flag indicating if deny purge is set for the stream
450
     * @return the deny purge flag
451
     */
452
    public boolean getDenyPurge() {
453
        return denyPurge;
1✔
454
    }
455

456
    /**
457
     * Whether discard policy with max message per subject is applied per subject.
458
     * @return the discard new per subject flag
459
     */
460
    public boolean isDiscardNewPerSubject() {
461
        return discardNewPerSubject;
1✔
462
    }
463

464
    /**
465
     * Metadata for the stream
466
     * @return the metadata map. Might be null.
467
     */
468
    public Map<String, String> getMetadata() {
469
        return metadata;
1✔
470
    }
471

472
    /**
473
     * The first sequence used in the stream.
474
     * @return the first sequence
475
     */
476
    public long getFirstSequence() {
477
        return firstSequence;
1✔
478
    }
479

480
    /**
481
     * Whether Allow Message TTL is set
482
     * @return the flag
483
     */
484
    public boolean isAllowMessageTtl() {
485
        return allowMessageTtl;
1✔
486
    }
487

488
    /**
489
     * Get the Subject Delete Marker TTL duration. May be null.
490
     * @return The duration
491
     */
492
    public Duration getSubjectDeleteMarkerTtl() {
493
        return subjectDeleteMarkerTtl;
1✔
494
    }
495

496
    @Override
497
    public String toString() {
498
        return toJson();
1✔
499
    }
500

501
    /**
502
     * Creates a builder for the stream configuration.
503
     * @return a stream configuration builder
504
     */
505
    public static Builder builder() {
506
        return new Builder();
1✔
507
    }
508

509
    /**
510
     * Creates a builder to copy the stream configuration.
511
     * @param sc an existing StreamConfiguration
512
     * @return a stream configuration builder
513
     */
514
    public static Builder builder(StreamConfiguration sc) {
515
        return new Builder(sc);
1✔
516
    }
517

518
    /**
519
     * StreamConfiguration is created using a Builder. The builder supports chaining and will
520
     * create a default set of options if no methods are called.
521
     * 
522
     * <p>{@code new StreamConfiguration.Builder().build()} will create a new StreamConfiguration.
523
     * 
524
     */
525
    public static class Builder {
526

527
        private String name = null;
1✔
528
        private String description = null;
1✔
529
        private final List<String> subjects = new ArrayList<>();
1✔
530
        private RetentionPolicy retentionPolicy = RetentionPolicy.Limits;
1✔
531
        private CompressionOption compressionOption = CompressionOption.None;
1✔
532
        private long maxConsumers = -1;
1✔
533
        private long maxMsgs = -1;
1✔
534
        private long maxMsgsPerSubject = -1;
1✔
535
        private long maxBytes = -1;
1✔
536
        private Duration maxAge = Duration.ZERO;
1✔
537
        private int maxMsgSize = -1;
1✔
538
        private StorageType storageType = StorageType.File;
1✔
539
        private int replicas = 1;
1✔
540
        private boolean noAck = false;
1✔
541
        private String templateOwner = null;
1✔
542
        private DiscardPolicy discardPolicy = DiscardPolicy.Old;
1✔
543
        private Duration duplicateWindow = Duration.ZERO;
1✔
544
        private Placement placement = null;
1✔
545
        private Republish republish = null;
1✔
546
        private SubjectTransform subjectTransform = null;
1✔
547
        private ConsumerLimits consumerLimits = null;
1✔
548
        private Mirror mirror = null;
1✔
549
        private final List<Source> sources = new ArrayList<>();
1✔
550
        private boolean sealed = false;
1✔
551
        private boolean allowRollup = false;
1✔
552
        private boolean allowDirect = false;
1✔
553
        private boolean mirrorDirect = false;
1✔
554
        private boolean denyDelete = false;
1✔
555
        private boolean denyPurge = false;
1✔
556
        private boolean discardNewPerSubject = false;
1✔
557
        private Map<String, String> metadata;
558
        private long firstSequence = 1;
1✔
559
        private boolean allowMessageTtl = false;
1✔
560
        private Duration subjectDeleteMarkerTtl;
561

562
        /**
563
         * Default Builder
564
         */
565
        public Builder() {}
1✔
566

567
        /**
568
         * Update Builder, useful if you need to update a configuration
569
         * @param sc the configuration to copy
570
         */
571
        public Builder(StreamConfiguration sc) {
            if (sc == null) {
                // nothing to copy, the field defaults stay in place
                return;
            }

            name = sc.name;
            description = sc.description;
            // goes through the setter so the builder keeps its own list
            subjects(sc.subjects);
            retentionPolicy = sc.retentionPolicy;
            compressionOption = sc.compressionOption;
            maxConsumers = sc.maxConsumers;
            maxMsgs = sc.maxMsgs;
            maxMsgsPerSubject = sc.maxMsgsPerSubject;
            maxBytes = sc.maxBytes;
            maxAge = sc.maxAge;
            maxMsgSize = sc.maxMsgSize;
            storageType = sc.storageType;
            replicas = sc.replicas;
            noAck = sc.noAck;
            templateOwner = sc.templateOwner;
            discardPolicy = sc.discardPolicy;
            duplicateWindow = sc.duplicateWindow;
            placement = sc.placement;
            republish = sc.republish;
            subjectTransform = sc.subjectTransform;
            consumerLimits = sc.consumerLimits;
            mirror = sc.mirror;
            // same as subjects: routed through the setter
            sources(sc.sources);
            sealed = sc.sealed;
            allowRollup = sc.allowRollup;
            allowDirect = sc.allowDirect;
            mirrorDirect = sc.mirrorDirect;
            denyDelete = sc.denyDelete;
            denyPurge = sc.denyPurge;
            discardNewPerSubject = sc.discardNewPerSubject;
            // metadata is copied into a fresh map; it stays unset when the source has none
            if (sc.metadata != null) {
                metadata = new HashMap<>(sc.metadata);
            }
            firstSequence = sc.firstSequence;
            allowMessageTtl = sc.allowMessageTtl;
            subjectDeleteMarkerTtl = sc.subjectDeleteMarkerTtl;
        }
1✔
611

612
        /**
613
         * Sets the name of the stream.
614
         * @param name name of the stream.
615
         * @return the builder
616
         */
617
        public Builder name(String name) {
618
            this.name =  validateStreamName(name, false);
1✔
619
            return this;
1✔
620
        }
621

622
        /**
623
         * Sets the description
624
         * @param description the description
625
         * @return the builder
626
         */
627
        public Builder description(String description) {
628
            this.description = description;
1✔
629
            return this;
1✔
630
        }
631

632
        /**
633
         * Sets the subjects in the StreamConfiguration.
634
         * @param subjects the stream's subjects
635
         * @return The Builder
636
         */
637
        public Builder subjects(String... subjects) {
638
            this.subjects.clear();
1✔
639
            return addSubjects(subjects);
1✔
640
        }
641

642
        /**
643
         * Sets the subjects in the StreamConfiguration.
644
         * @param subjects the stream's subjects
645
         * @return The Builder
646
         */
647
        public Builder subjects(Collection<String> subjects) {
648
            this.subjects.clear();
1✔
649
            return addSubjects(subjects);
1✔
650
        }
651

652
        /**
653
         * Adds unique subjects into the StreamConfiguration.
654
         * @param subjects the stream's subjects to add
655
         * @return The Builder
656
         */
657
        public Builder addSubjects(String... subjects) {
658
            if (subjects != null) {
1✔
659
                return addSubjects(Arrays.asList(subjects));
1✔
660
            }
661
            return this;
1✔
662
        }
663

664
        /**
665
         * Adds unique subjects into the StreamConfiguration.
666
         * @param subjects the stream's subjects to add
667
         * @return The Builder
668
         */
669
        public Builder addSubjects(Collection<String> subjects) {
670
            if (subjects != null) {
1✔
671
                for (String sub : subjects) {
1✔
672
                    if (sub != null && !this.subjects.contains(sub)) {
1✔
673
                        this.subjects.add(sub);
1✔
674
                    }
675
                }
1✔
676
            }
677
            return this;
1✔
678
        }
679

680
        /**
681
         * Sets the retention policy in the StreamConfiguration.
682
         * @param policy the retention policy of the StreamConfiguration
683
         * @return The Builder
684
         */
685
        public Builder retentionPolicy(RetentionPolicy policy) {
686
            this.retentionPolicy = policy == null ? RetentionPolicy.Limits : policy;
1✔
687
            return this;
1✔
688
        }
689

690
        /**
691
         * Sets the compression option in the StreamConfiguration.
692
         * @param compressionOption the compression option of the StreamConfiguration
693
         * @return The Builder
694
         */
695
        public Builder compressionOption(CompressionOption compressionOption) {
696
            this.compressionOption = compressionOption == null ? CompressionOption.None : compressionOption;
1✔
697
            return this;
1✔
698
        }
699

700
        /**
701
         * Sets the maximum number of consumers in the StreamConfiguration.
702
         * @param maxConsumers the maximum number of consumers
703
         * @return The Builder
704
         */        
705
        public Builder maxConsumers(long maxConsumers) {
706
            this.maxConsumers = validateMaxConsumers(maxConsumers);
1✔
707
            return this;
1✔
708
        }
709

710
        /**
711
         * Sets the maximum number of messages in the StreamConfiguration.
712
         * @param maxMsgs the maximum number of messages
713
         * @return The Builder
714
         */
715
        public Builder maxMessages(long maxMsgs) {
716
            this.maxMsgs = validateMaxMessages(maxMsgs);
1✔
717
            return this;
1✔
718
        }
719

720
        /**
721
         * Sets the maximum number of message per subject in the StreamConfiguration.
722
         * @param maxMsgsPerSubject the maximum number of messages
723
         * @return The Builder
724
         */
725
        public Builder maxMessagesPerSubject(long maxMsgsPerSubject) {
726
            this.maxMsgsPerSubject = validateMaxMessagesPerSubject(maxMsgsPerSubject);
1✔
727
            return this;
1✔
728
        }
729

730
        /**
731
         * Sets the maximum number of bytes in the StreamConfiguration.
732
         * @param maxBytes the maximum number of bytes
733
         * @return The Builder
734
         */
735
        public Builder maxBytes(long maxBytes) {
736
            this.maxBytes = validateMaxBytes(maxBytes);
1✔
737
            return this;
1✔
738
        }
739

740
        /**
741
         * Sets the maximum age in the StreamConfiguration.
742
         * @param maxAge the maximum message age
743
         * @return The Builder
744
         */
745
        public Builder maxAge(Duration maxAge) {
746
            this.maxAge = validateDurationNotRequiredGtOrEqZero(maxAge, Duration.ZERO);
1✔
747
            return this;
1✔
748
        }
749

750
        /**
751
         * Sets the maximum age in the StreamConfiguration.
752
         * @param maxAgeMillis the maximum message age in milliseconds
753
         * @return The Builder
754
         */
755
        public Builder maxAge(long maxAgeMillis) {
756
            this.maxAge = validateDurationNotRequiredGtOrEqZero(maxAgeMillis);
1✔
757
            return this;
1✔
758
        }
759

760
        /**
761
         * Sets the maximum message size in the StreamConfiguration.
762
         * @deprecated the server value is a 32-bit signed value. Use {@link #maximumMessageSize(int)} instead.
763
         * @param maxMsgSize the maximum message size
764
         * @return The Builder
765
         */
766
        @Deprecated
767
        public Builder maxMsgSize(long maxMsgSize) {
768
            this.maxMsgSize = (int)validateMaxMessageSize(maxMsgSize);
1✔
769
            return this;
1✔
770
        }
771

772
        /**
773
         * Sets the maximum message size in the StreamConfiguration.
774
         * @param maxMsgSize the maximum message size
775
         * @return The Builder
776
         */
777
        public Builder maximumMessageSize(int maxMsgSize) {
778
            this.maxMsgSize = (int)validateMaxMessageSize(maxMsgSize);
1✔
779
            return this;
1✔
780
        }
781

782
        /**
783
         * Sets the storage type in the StreamConfiguration.
784
         * @param storageType the storage type
785
         * @return The Builder
786
         */        
787
        public Builder storageType(StorageType storageType) {
788
            this.storageType = storageType == null ? StorageType.File : storageType;
1✔
789
            return this;
1✔
790
        }
791

792
        /**
793
         * Sets the number of replicas a message must be stored on in the StreamConfiguration.
794
         * Must be 1 to 5 inclusive
795
         * @param replicas the number of replicas to store this message on
796
         * @return The Builder
797
         */
798
        public Builder replicas(int replicas) {
799
            this.replicas = validateNumberOfReplicas(replicas);
1✔
800
            return this;
1✔
801
        }
802

803
        /**
804
         * Sets the acknowledgement mode of the StreamConfiguration.  if no acknowledgements are
805
         * set, then acknowledgements are not sent back to the client.  The default is false.
806
         * @param noAck true to disable acknowledgements.
807
         * @return The Builder
808
         */        
809
        public Builder noAck(boolean noAck) {
810
            this.noAck = noAck;
1✔
811
            return this;
1✔
812
        }
813

814
        /**
815
         * Sets the template a stream in the form of raw JSON.
816
         * @param templateOwner the stream template of the stream.
817
         * @return the builder
818
         */
819
        public Builder templateOwner(String templateOwner) {
820
            this.templateOwner = emptyAsNull(templateOwner);
1✔
821
            return this;
1✔
822
        }
823

824
        /**
825
         * Sets the discard policy in the StreamConfiguration.
826
         * @param policy the discard policy of the StreamConfiguration
827
         * @return The Builder
828
         */
829
        public Builder discardPolicy(DiscardPolicy policy) {
830
            this.discardPolicy = policy == null ? DiscardPolicy.Old : policy;
1✔
831
            return this;
1✔
832
        }
833

834
        /**
835
         * Sets the duplicate checking window in the StreamConfiguration.  A Duration.Zero
836
         * disables duplicate checking.  Duplicate checking is disabled by default.
837
         * @param window duration to hold message ids for duplicate checking.
838
         * @return The Builder
839
         */
840
        public Builder duplicateWindow(Duration window) {
841
            this.duplicateWindow = validateDurationNotRequiredGtOrEqZero(window, Duration.ZERO);
1✔
842
            return this;
1✔
843
        }
844

845
        /**
846
         * Sets the duplicate checking window in the StreamConfiguration.  A Duration.Zero
847
         * disables duplicate checking.  Duplicate checking is disabled by default.
848
         * @param windowMillis duration to hold message ids for duplicate checking.
849
         * @return The Builder
850
         */
851
        public Builder duplicateWindow(long windowMillis) {
852
            this.duplicateWindow = validateDurationNotRequiredGtOrEqZero(windowMillis);
1✔
853
            return this;
1✔
854
        }
855

856
        /**
857
         * Sets the placement directive object
858
         * @param placement the placement directive object
859
         * @return The Builder
860
         */
861
        public Builder placement(Placement placement) {
862
            this.placement = placement;
1✔
863
            return this;
1✔
864
        }
865

866
        /**
867
         * Sets the republish config object
868
         * @param republish the republish config object
869
         * @return The Builder
870
         */
871
        public Builder republish(Republish republish) {
872
            this.republish = republish;
1✔
873
            return this;
1✔
874
        }
875

876
        /**
877
         * Sets the subjectTransform config object
878
         * @param subjectTransform the subjectTransform config object
879
         * @return The Builder
880
         */
881
        public Builder subjectTransform(SubjectTransform subjectTransform) {
882
            this.subjectTransform = subjectTransform;
1✔
883
            return this;
1✔
884
        }
885

886
        /**
887
         * Sets the consumerLimits config object
888
         * @param consumerLimits the consumerLimits config object
889
         * @return The Builder
890
         */
891
        public Builder consumerLimits(ConsumerLimits consumerLimits) {
892
            this.consumerLimits = consumerLimits;
1✔
893
            return this;
1✔
894
        }
895

896
        /**
897
         * Sets the mirror  object
898
         * @param mirror the mirror object
899
         * @return The Builder
900
         */
901
        public Builder mirror(Mirror mirror) {
902
            this.mirror = mirror;
1✔
903
            return this;
1✔
904
        }
905

906
        /**
907
         * Sets the sources in the StreamConfiguration.
908
         * @param sources the stream's sources
909
         * @return The Builder
910
         */
911
        public Builder sources(Source... sources) {
912
            this.sources.clear();
1✔
913
            return addSources(sources);
1✔
914
        }
915

916
        /**
917
         * Add the sources into the StreamConfiguration.
918
         * @param sources the stream's sources
919
         * @return The Builder
920
         */
921
        public Builder sources(Collection<Source> sources) {
922
            this.sources.clear();
1✔
923
            return addSources(sources);
1✔
924
        }
925

926
        /**
927
         * Add the sources into the StreamConfiguration.
928
         * @param sources the stream's sources
929
         * @return The Builder
930
         */
931
        public Builder addSources(Source... sources) {
932
            return addSources(Arrays.asList(sources));
1✔
933
        }
934

935
        /**
936
         * Sets the sources in the StreamConfiguration.
937
         * @param sources the stream's sources
938
         * @return The Builder
939
         */
940
        public Builder addSources(Collection<Source> sources) {
941
            if (sources != null) {
1✔
942
                for (Source source : sources) {
1✔
943
                    if (source != null && !this.sources.contains(source)) {
1✔
944
                        this.sources.add(source);
1✔
945
                    }
946
                }
1✔
947
            }
948
            return this;
1✔
949
        }
950

951
        /**
952
         * Add a source into the StreamConfiguration.
953
         * @param source a stream source
954
         * @return The Builder
955
         */
956
        public Builder addSource(Source source) {
957
            if (source != null && !this.sources.contains(source)) {
1✔
958
                this.sources.add(source);
1✔
959
            }
960
            return this;
1✔
961
        }
962

963
        /**
964
         * Set whether to seal the stream.
965
         * INTERNAL USE ONLY. Scoped protected for test purposes.
966
         * @param sealed the sealed setting
967
         * @return The Builder
968
         */
969
        protected Builder sealed(boolean sealed) {
970
            this.sealed = sealed;
1✔
971
            return this;
1✔
972
        }
973

974
        /**
975
         * Set whether to allow the rollup feature for a stream
976
         * @param allowRollup the allow rollup setting
977
         * @return The Builder
978
         */
979
        public Builder allowRollup(boolean allowRollup) {
980
            this.allowRollup = allowRollup;
1✔
981
            return this;
1✔
982
        }
983

984
        /**
985
         * Set whether to allow direct message access for a stream
986
         * @param allowDirect the allow direct setting
987
         * @return The Builder
988
         */
989
        public Builder allowDirect(boolean allowDirect) {
990
            this.allowDirect = allowDirect;
1✔
991
            return this;
1✔
992
        }
993

994
        /**
995
         * Set whether to allow unified direct access for mirrors
996
         * @param mirrorDirect the allow direct setting
997
         * @return The Builder
998
         */
999
        public Builder mirrorDirect(boolean mirrorDirect) {
1000
            this.mirrorDirect = mirrorDirect;
1✔
1001
            return this;
1✔
1002
        }
1003

1004
        /**
1005
         * Set whether to deny deleting messages from the stream
1006
         * @param denyDelete the deny delete setting
1007
         * @return The Builder
1008
         */
1009
        public Builder denyDelete(boolean denyDelete) {
1010
            this.denyDelete = denyDelete;
1✔
1011
            return this;
1✔
1012
        }
1013

1014
        /**
1015
         * Set whether to deny purging messages from the stream
1016
         * @param denyPurge the deny purge setting
1017
         * @return The Builder
1018
         */
1019
        public Builder denyPurge(boolean denyPurge) {
1020
            this.denyPurge = denyPurge;
1✔
1021
            return this;
1✔
1022
        }
1023

1024
        /**
1025
         * Set whether discard policy new with max message per subject applies to existing subjects, not just new subjects.
1026
         * @param discardNewPerSubject the setting
1027
         * @return The Builder
1028
         */
1029
        public Builder discardNewPerSubject(boolean discardNewPerSubject) {
1030
            this.discardNewPerSubject = discardNewPerSubject;
1✔
1031
            return this;
1✔
1032
        }
1033

1034
        /**
1035
         * Set this stream to be sealed. This is irreversible.
1036
         * @return The Builder
1037
         */
1038
        public Builder seal() {
1039
            this.sealed = true;
1✔
1040
            return this;
1✔
1041
        }
1042

1043
        /**
1044
         * Sets the metadata for the configuration
1045
         * @param metadata the metadata map
1046
         * @return The Builder
1047
         */
1048
        public Builder metadata(Map<String, String> metadata) {
1049
            this.metadata = metadata;
1✔
1050
            return this;
1✔
1051
        }
1052

1053
        /**
1054
         * Sets the first sequence to be used. 1 is the default. All values less than 2 are treated as 1.
1055
         * @param firstSeq specify the first_seq in the stream config when creating the stream.
1056
         * @return The Builder
1057
         */
1058
        public Builder firstSequence(long firstSeq) {
1059
            this.firstSequence = firstSeq > 1 ? firstSeq : 1;
1✔
1060
            return this;
1✔
1061
        }
1062

1063
        /**
1064
         * Set to allow per message TTL to true
1065
         * @return The Builder
1066
         */
1067
        public Builder allowMessageTtl() {
1068
            this.allowMessageTtl = true;
1✔
1069
            return this;
1✔
1070
        }
1071

1072
        /**
1073
         * Set allow per message TTL flag
1074
         * @param allowMessageTtl the flag
1075
         * @return The Builder
1076
         */
1077
        public Builder allowMessageTtl(boolean allowMessageTtl) {
1078
            this.allowMessageTtl = allowMessageTtl;
1✔
1079
            return this;
1✔
1080
        }
1081

1082
        /**
1083
         * Set the subject delete marker TTL duration. Server accepts 1 second or more.
1084
         * null has the effect of clearing the subject delete marker TTL
1085
         * @param subjectDeleteMarkerTtl the TTL duration
1086
         * @return The Builder
1087
         */
1088
        public Builder subjectDeleteMarkerTtl(Duration subjectDeleteMarkerTtl) {
1089
            this.subjectDeleteMarkerTtl = validateDurationNotRequiredGtOrEqSeconds(1, subjectDeleteMarkerTtl, null, "Subject Delete Marker Ttl");
1✔
1090
            return this;
1✔
1091
        }
1092

1093
        /**
1094
         * Set the subject delete marker TTL duration in milliseconds. Server accepts 1 second or more.
1095
         * 0 or less has the effect of clearing the subject delete marker TTL
1096
         * @param subjectDeleteMarkerTtlMillis the TTL duration in milliseconds
1097
         * @return The Builder
1098
         */
1099
        public Builder subjectDeleteMarkerTtl(long subjectDeleteMarkerTtlMillis) {
1100
            this.subjectDeleteMarkerTtl = subjectDeleteMarkerTtlMillis <= 0 ? null
1✔
NEW
1101
                : validateDurationGtOrEqSeconds(1, subjectDeleteMarkerTtlMillis, "Subject Delete Marker Ttl");
×
1102
            return this;
×
1103
        }
1104

1105
        /**
1106
         * Builds the StreamConfiguration
1107
         * @return a stream configuration.
1108
         */
1109
        public StreamConfiguration build() {
1110
            return new StreamConfiguration(this);
1✔
1111
        }
1112
    }
1113
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc