• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1925

20 Mar 2025 03:46PM UTC coverage: 95.662% (-0.3%) from 95.95%
#1925

push

github

web-flow
Merge pull request #1239 from nats-io/main-2-11

Main for server v2.11

179 of 219 new or added lines in 15 files covered. (81.74%)

2 existing lines in 2 files now uncovered.

11600 of 12126 relevant lines covered (95.66%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.01
/src/main/java/io/nats/client/api/ConsumerConfiguration.java
1
// Copyright 2020 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.api;
15

16
import io.nats.client.PullSubscribeOptions;
17
import io.nats.client.PushSubscribeOptions;
18
import io.nats.client.support.*;
19

20
import java.time.Duration;
21
import java.time.ZonedDateTime;
22
import java.util.*;
23

24
import static io.nats.client.support.ApiConstants.*;
25
import static io.nats.client.support.JsonUtils.beginJson;
26
import static io.nats.client.support.JsonUtils.endJson;
27
import static io.nats.client.support.JsonValueUtils.*;
28
import static io.nats.client.support.NatsJetStreamClientError.JsConsumerNameDurableMismatch;
29
import static io.nats.client.support.Validator.*;
30

31
/**
32
 * The ConsumerConfiguration class specifies the configuration for creating a JetStream consumer on the client and
33
 * if necessary the server.
34
 * Options are created using a ConsumerConfiguration.Builder.
35
 * <p>ConsumerConfiguration is intended to be used with  {@link io.nats.client.JetStreamManagement#createConsumer(String, ConsumerConfiguration) JetStreamManagement.createConsumer()}.
36
 * <P> By default this will create a <b>pull consumer</b> unless {@link ConsumerConfiguration.Builder#deliverSubject(String) ConsumerConfiguration.Builder.deliverSubject(String) } is set.
37
 */
38
public class ConsumerConfiguration implements JsonSerializable {
39
    @Deprecated
40
    public static final Duration DURATION_MIN = Duration.ofNanos(1);
1✔
41

42
    public static final DeliverPolicy DEFAULT_DELIVER_POLICY = DeliverPolicy.All;
1✔
43
    public static final AckPolicy DEFAULT_ACK_POLICY = AckPolicy.Explicit;
1✔
44
    public static final ReplayPolicy DEFAULT_REPLAY_POLICY = ReplayPolicy.Instant;
1✔
45
    public static final PriorityPolicy DEFAULT_PRIORITY_POLICY = PriorityPolicy.None;
1✔
46

47
    public static final Duration DURATION_UNSET = Duration.ZERO;
1✔
48
    public static final Duration MIN_IDLE_HEARTBEAT = Duration.ofMillis(100);
1✔
49

50
    public static final int INTEGER_UNSET = -1;
51
    public static final long LONG_UNSET = -1;
52
    public static final long ULONG_UNSET = 0;
53
    public static final long DURATION_UNSET_LONG = 0;
54
    public static final long DURATION_MIN_LONG = 1;
55
    public static final int STANDARD_MIN = 0;
56
    public static final int MAX_DELIVER_MIN = 1;
57

58
    public static final long MIN_IDLE_HEARTBEAT_NANOS = MIN_IDLE_HEARTBEAT.toNanos();
1✔
59
    public static final long MIN_IDLE_HEARTBEAT_MILLIS = MIN_IDLE_HEARTBEAT.toMillis();
1✔
60

61
    protected final DeliverPolicy deliverPolicy;
62
    protected final AckPolicy ackPolicy;
63
    protected final ReplayPolicy replayPolicy;
64
    protected final String description;
65
    protected final String durable;
66
    protected final String name;
67
    protected final String deliverSubject;
68
    protected final String deliverGroup;
69
    protected final String sampleFrequency;
70
    protected final ZonedDateTime startTime;
71
    protected final Duration ackWait;
72
    protected final Duration idleHeartbeat;
73
    protected final Duration maxExpires;
74
    protected final Duration inactiveThreshold;
75
    protected final Long startSeq; // server side this is unsigned
76
    protected final Integer maxDeliver;
77
    protected final Long rateLimit; // server side this is unsigned
78
    protected final Integer maxAckPending;
79
    protected final Integer maxPullWaiting;
80
    protected final Integer maxBatch;
81
    protected final Integer maxBytes;
82
    protected final Integer numReplicas;
83
    protected final ZonedDateTime pauseUntil;
84
    protected final Boolean flowControl;
85
    protected final Boolean headersOnly;
86
    protected final Boolean memStorage;
87
    protected final List<Duration> backoff;
88
    protected final Map<String, String> metadata;
89
    protected final List<String> filterSubjects;
90
    protected final List<String> priorityGroups;
91
    protected final PriorityPolicy priorityPolicy;
92

93
    protected ConsumerConfiguration(ConsumerConfiguration cc) {
1✔
94
        this.deliverPolicy = cc.deliverPolicy;
1✔
95
        this.ackPolicy = cc.ackPolicy;
1✔
96
        this.replayPolicy = cc.replayPolicy;
1✔
97
        this.description = cc.description;
1✔
98
        this.durable = cc.durable;
1✔
99
        this.name = cc.name;
1✔
100
        this.deliverSubject = cc.deliverSubject;
1✔
101
        this.deliverGroup = cc.deliverGroup;
1✔
102
        this.sampleFrequency = cc.sampleFrequency;
1✔
103
        this.startTime = cc.startTime;
1✔
104
        this.ackWait = cc.ackWait;
1✔
105
        this.idleHeartbeat = cc.idleHeartbeat;
1✔
106
        this.maxExpires = cc.maxExpires;
1✔
107
        this.inactiveThreshold = cc.inactiveThreshold;
1✔
108
        this.startSeq = cc.startSeq;
1✔
109
        this.maxDeliver = cc.maxDeliver;
1✔
110
        this.rateLimit = cc.rateLimit;
1✔
111
        this.maxAckPending = cc.maxAckPending;
1✔
112
        this.maxPullWaiting = cc.maxPullWaiting;
1✔
113
        this.maxBatch = cc.maxBatch;
1✔
114
        this.maxBytes = cc.maxBytes;
1✔
115
        this.numReplicas = cc.numReplicas;
1✔
116
        this.pauseUntil = cc.pauseUntil;
1✔
117
        this.flowControl = cc.flowControl;
1✔
118
        this.headersOnly = cc.headersOnly;
1✔
119
        this.memStorage = cc.memStorage;
1✔
120
        this.backoff = cc.backoff == null ? null : new ArrayList<>(cc.backoff);
1✔
121
        this.metadata = cc.metadata == null ? null : new HashMap<>(cc.metadata);
1✔
122
        this.filterSubjects = cc.filterSubjects == null ? null : new ArrayList<>(cc.filterSubjects);
1✔
123
        this.priorityGroups = cc.priorityGroups == null ? null : new ArrayList<>(cc.priorityGroups);
1✔
124
        this.priorityPolicy = cc.priorityPolicy;
1✔
125
    }
1✔
126

127
    // For the builder
128
    protected ConsumerConfiguration(Builder b)
129
    {
1✔
130
        this.deliverPolicy = b.deliverPolicy;
1✔
131
        this.ackPolicy = b.ackPolicy;
1✔
132
        this.replayPolicy = b.replayPolicy;
1✔
133

134
        this.description = b.description;
1✔
135
        this.durable = b.durable;
1✔
136
        this.name = b.name;
1✔
137
        this.startTime = b.startTime;
1✔
138
        this.ackWait = b.ackWait;
1✔
139
        this.sampleFrequency = b.sampleFrequency;
1✔
140
        this.deliverSubject = b.deliverSubject;
1✔
141
        this.deliverGroup = b.deliverGroup;
1✔
142
        this.idleHeartbeat = b.idleHeartbeat;
1✔
143
        this.maxExpires = b.maxExpires;
1✔
144
        this.inactiveThreshold = b.inactiveThreshold;
1✔
145

146
        this.startSeq = b.startSeq;
1✔
147
        this.maxDeliver = b.maxDeliver;
1✔
148
        this.rateLimit = b.rateLimit;
1✔
149
        this.maxAckPending = b.maxAckPending;
1✔
150
        this.maxPullWaiting = b.maxPullWaiting;
1✔
151
        this.maxBatch = b.maxBatch;
1✔
152
        this.maxBytes = b.maxBytes;
1✔
153
        this.numReplicas = b.numReplicas;
1✔
154
        this.pauseUntil = b.pauseUntil;
1✔
155

156
        this.flowControl = b.flowControl;
1✔
157
        this.headersOnly = b.headersOnly;
1✔
158
        this.memStorage = b.memStorage;
1✔
159

160
        this.backoff = b.backoff;
1✔
161
        this.metadata = b.metadata;
1✔
162
        this.filterSubjects = b.filterSubjects;
1✔
163

164
        this.priorityGroups = b.priorityGroups;
1✔
165
        this.priorityPolicy = b.priorityPolicy;
1✔
166
    }
1✔
167

168
    /**
169
     * Returns a JSON representation of this consumer configuration.
170
     * @return json consumer configuration json string
171
     */
172
    @Override
173
        public String toJson() {
174
        StringBuilder sb = beginJson();
1✔
175
        JsonUtils.addField(sb, DESCRIPTION, description);
1✔
176
        JsonUtils.addField(sb, DURABLE_NAME, durable);
1✔
177
        JsonUtils.addField(sb, NAME, name);
1✔
178
        JsonUtils.addField(sb, DELIVER_SUBJECT, deliverSubject);
1✔
179
        JsonUtils.addField(sb, DELIVER_GROUP, deliverGroup);
1✔
180
        JsonUtils.addField(sb, DELIVER_POLICY, GetOrDefault(deliverPolicy).toString());
1✔
181
        JsonUtils.addFieldWhenGtZero(sb, OPT_START_SEQ, startSeq);
1✔
182
        JsonUtils.addField(sb, OPT_START_TIME, startTime);
1✔
183
        JsonUtils.addField(sb, ACK_POLICY, GetOrDefault(ackPolicy).toString());
1✔
184
        JsonUtils.addFieldAsNanos(sb, ACK_WAIT, ackWait);
1✔
185
        JsonUtils.addFieldWhenGtZero(sb, MAX_DELIVER, maxDeliver);
1✔
186
        JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending);
1✔
187
        JsonUtils.addField(sb, REPLAY_POLICY, GetOrDefault(replayPolicy).toString());
1✔
188
        JsonUtils.addField(sb, SAMPLE_FREQ, sampleFrequency);
1✔
189
        JsonUtils.addFieldWhenGtZero(sb, RATE_LIMIT_BPS, rateLimit);
1✔
190
        JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);
1✔
191
        JsonUtils.addFldWhenTrue(sb, FLOW_CONTROL, flowControl);
1✔
192
        JsonUtils.addField(sb, ApiConstants.MAX_WAITING, maxPullWaiting);
1✔
193
        JsonUtils.addFldWhenTrue(sb, HEADERS_ONLY, headersOnly);
1✔
194
        JsonUtils.addField(sb, MAX_BATCH, maxBatch);
1✔
195
        JsonUtils.addField(sb, MAX_BYTES, maxBytes);
1✔
196
        JsonUtils.addFieldAsNanos(sb, MAX_EXPIRES, maxExpires);
1✔
197
        JsonUtils.addFieldAsNanos(sb, INACTIVE_THRESHOLD, inactiveThreshold);
1✔
198
        JsonUtils.addDurations(sb, BACKOFF, backoff);
1✔
199
        JsonUtils.addField(sb, NUM_REPLICAS, numReplicas);
1✔
200
        JsonUtils.addField(sb, PAUSE_UNTIL, pauseUntil);
1✔
201
        JsonUtils.addField(sb, MEM_STORAGE, memStorage);
1✔
202
        JsonUtils.addField(sb, METADATA, metadata);
1✔
203
        if (filterSubjects != null) {
1✔
204
            if (filterSubjects.size() > 1) {
1✔
205
                JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects);
1✔
206
            }
207
            else if (filterSubjects.size() == 1) {
1✔
208
                JsonUtils.addField(sb, FILTER_SUBJECT, filterSubjects.get(0));
1✔
209
            }
210
        }
211
        JsonUtils.addStrings(sb, PRIORITY_GROUPS, priorityGroups);
1✔
212
        if (priorityPolicy != null && priorityPolicy != DEFAULT_PRIORITY_POLICY) {
1✔
213
            JsonUtils.addField(sb, PRIORITY_POLICY, priorityPolicy.toString());
1✔
214
        }
215
        return endJson(sb).toString();
1✔
216
    }
217

218
    /**
219
     * Gets the name of the description of this consumer configuration.
220
     * @return name of the description.
221
     */
222
    public String getDescription() {
223
        return description;
1✔
224
    }
225

226
    /**
227
     * Gets the name of the durable name for this consumer configuration.
228
     * @return name of the durable.
229
     */
230
    public String getDurable() {
231
        return durable;
1✔
232
    }
233

234
    /**
235
     * Gets the name of the consumer name for this consumer configuration.
236
     * @return name of the consumer.
237
     */
238
    public String getName() {
239
        return name;
1✔
240
    }
241

242
    /**
243
     * Gets the deliver subject of this consumer configuration.
244
     * @return the deliver subject.
245
     */
246
    public String getDeliverSubject() {
247
        return deliverSubject;
1✔
248
    }
249

250
    /**
251
     * Gets the deliver group of this consumer configuration.
252
     * @return the deliver group.
253
     */
254
    public String getDeliverGroup() {
255
        return deliverGroup;
1✔
256
    }
257

258
    /**
259
     * Gets the deliver policy of this consumer configuration.
260
     * @return the deliver policy.
261
     */
262
    public DeliverPolicy getDeliverPolicy() {
263
        return GetOrDefault(deliverPolicy);
1✔
264
    }
265

266
    /**
267
     * Gets the start sequence of this consumer configuration.
268
     * @return the start sequence.
269
     */
270
    public long getStartSequence() {
271
        return getOrUnsetUlong(startSeq);
1✔
272
    }
273

274
    /**
275
     * Gets the start time of this consumer configuration.
276
     * @return the start time.
277
     */
278
    public ZonedDateTime getStartTime() {
279
        return startTime;
1✔
280
    }
281

282
    /**
283
     * Gets the acknowledgment policy of this consumer configuration.
284
     * @return the acknowledgment policy.
285
     */
286
    public AckPolicy getAckPolicy() {
287
        return GetOrDefault(ackPolicy);
1✔
288
    }
289

290
    /**
291
     * Gets the acknowledgment wait of this consumer configuration.
292
     * @return the acknowledgment wait duration.
293
     */
294
    public Duration getAckWait() {
295
        return ackWait;
1✔
296
    }
297

298
    /**
299
     * Gets the max delivery amount of this consumer configuration.
300
     * @return the max delivery amount.
301
     */
302
    public long getMaxDeliver() {
303
        return getOrUnset(maxDeliver);
1✔
304
    }
305

306
    /**
307
     * Gets the filter subject of this consumer configuration.
308
     * With the introduction of multiple filter subjects, this method will
309
     * return null if there are not exactly one filter subjects
310
     * @return the first filter subject.
311
     */
312
    public String getFilterSubject() {
313
        return filterSubjects == null || filterSubjects.size() != 1 ? null : filterSubjects.get(0);
1✔
314
    }
315

316
    /**
317
     * Gets the filter subjects as a list. May be null, otherwise won't be empty
318
     * @return the list
319
     */
320
    public List<String> getFilterSubjects() {
321
        return filterSubjects;
1✔
322
    }
323

324
    /**
325
     * Gets the priority groups as a list. May be null, otherwise won't be empty
326
     * @return the list
327
     */
328
    public List<String> getPriorityGroups() {
329
        return priorityGroups;
1✔
330
    }
331

332
    /**
333
     * Whether there are multiple filter subjects for this consumer configuration.
334
     * @return true if there are multiple filter subjects
335
     */
336
    public boolean hasMultipleFilterSubjects() {
337
        return filterSubjects != null && filterSubjects.size() > 1;
1✔
338
    }
339

340
    /**
341
     * Gets the replay policy of this consumer configuration.
342
     * @return the replay policy.
343
     */
344
    public ReplayPolicy getReplayPolicy() {
345
        return GetOrDefault(replayPolicy);
1✔
346
    }
347

348
    /**
349
     * Gets the rate limit for this consumer configuration.
350
     * @return the rate limit in bits per second
351
     */
352
    public long getRateLimit() {
353
        return getOrUnsetUlong(rateLimit);
1✔
354
    }
355

356
    /**
357
     * Gets the maximum ack pending configuration.
358
     * @return maximum ack pending.
359
     */
360
    public long getMaxAckPending() {
361
        return getOrUnset(maxAckPending);
1✔
362
    }
363

364
    /**
365
     * Gets the sample frequency.
366
     * @return sampleFrequency.
367
     */
368
    public String getSampleFrequency() {
369
        return sampleFrequency;
1✔
370
    }
371

372

373
    /**
374
     * Gets the idle heart beat wait time
375
     * @return the idle heart beat wait duration.
376
     */
377
    public Duration getIdleHeartbeat() {
378
        return idleHeartbeat;
1✔
379
    }
380

381
    /**
382
     * Get the flow control flag indicating whether it's on or off
383
     * @return the flow control mode
384
     */
385
    public boolean isFlowControl() {
386
        // The way the builder and json reading works it's never false if it's not null
387
        // but this way I can make code coverage happy and not assume.
388
        return Boolean.TRUE.equals(flowControl);
1✔
389
    }
390

391
    /**
392
     * Get the number of pulls that can be outstanding on a pull consumer
393
     * @return the max pull waiting
394
     */
395
    public long getMaxPullWaiting() {
396
        return getOrUnset(maxPullWaiting);
1✔
397
    }
398

399
    /**
400
     * Get the header only flag indicating whether it's on or off
401
     * @return the flow control mode
402
     */
403
    public boolean isHeadersOnly() {
404
        return headersOnly != null && headersOnly;
1✔
405
    }
406

407
    /**
408
     * Get the mem storage flag whether it's on or off.
409
     * @return the mem storage mode
410
     */
411
    public boolean isMemStorage() {
412
        return memStorage != null && memStorage;
1✔
413
    }
414

415
    /**
416
     * Get the max batch size for the server to allow on pull requests.
417
     * @return the max batch size
418
     */
419
    public long getMaxBatch() {
420
        return getOrUnset(maxBatch);
1✔
421
    }
422

423
    /**
424
     * Get the max bytes size for the server to allow on pull requests.
425
     * @return the max byte size
426
     */
427
    public long getMaxBytes() {
428
        return getOrUnset(maxBytes);
1✔
429
    }
430

431
    /**
432
     * Get the max amount of expire time for the server to allow on pull requests.
433
     * @return the max expire
434
     */
435
    public Duration getMaxExpires() {
436
        return maxExpires;
1✔
437
    }
438

439
    /**
440
     * Get the amount of time before the consumer is deemed inactive.
441
     * @return the inactive threshold
442
     */
443
    public Duration getInactiveThreshold() {
444
        return inactiveThreshold;
1✔
445
    }
446

447
    /**
448
     * Get the backoff list; may be empty, will never be null.
449
     * @return the list
450
     */
451
    public List<Duration> getBackoff() {
452
        return backoff == null ? Collections.emptyList() : backoff;
1✔
453
    }
454

455
    /**
456
     * Metadata for the consumer; may be empty, will never be null.
457
     * @return the metadata map
458
     */
459
    public Map<String, String> getMetadata() {
460
        return metadata == null ? Collections.emptyMap() : metadata;
1✔
461
    }
462

463
    /**
464
     * Get the number of consumer replicas.
465
     * @return the replicas count
466
     */
467
    public int getNumReplicas() { return getOrUnset(numReplicas); }
1✔
468

469
    /**
470
     * Get the time until the consumer is paused.
471
     * @return paused until time
472
     */
473
    public ZonedDateTime getPauseUntil() {
474
        return pauseUntil;
1✔
475
    }
476

477
    /**
478
     * Gets the priority policy of this consumer configuration.
479
     * @return the priority policy.
480
     */
481
    public PriorityPolicy getPriorityPolicy() {
482
        return GetOrDefault(priorityPolicy);
1✔
483
    }
484

485
    /**
486
     * Gets whether deliver policy of this consumer configuration was set or left unset
487
     * @return true if the policy was set, false if the policy was not set
488
     */
489
    public boolean deliverPolicyWasSet() {
490
        return deliverPolicy != null;
1✔
491
    }
492

493
    /**
494
     * Gets whether ack policy for this consumer configuration was set or left unset
495
     * @return true if the policy was set, false if the policy was not set
496
     */
497
    public boolean ackPolicyWasSet() {
498
        return ackPolicy != null;
1✔
499
    }
500

501
    /**
502
     * Gets whether replay policy for this consumer configuration was set or left unset
503
     * @return true if the policy was set, false if the policy was not set
504
     */
505
    public boolean replayPolicyWasSet() {
506
        return replayPolicy != null;
1✔
507
    }
508

509
    /**
510
     * Gets whether start sequence for this consumer configuration was set or left unset
511
     * @return true if the start sequence was set by the user
512
     */
513
    public boolean startSeqWasSet() {
514
        return startSeq != null;
1✔
515
    }
516

517
    /**
518
     * Gets whether max deliver for this consumer configuration was set or left unset
519
     * @return true if max deliver was set by the user
520
     */
521
    public boolean maxDeliverWasSet() {
522
        return maxDeliver != null;
1✔
523
    }
524

525
    /**
526
     * Gets whether rate limit for this consumer configuration was set or left unset
527
     * @return true if rate limit was set by the user
528
     */
529
    public boolean rateLimitWasSet() {
530
        return rateLimit != null;
1✔
531
    }
532

533
    /**
534
     * Gets whether max ack pending for this consumer configuration was set or left unset
535
     * @return true if mac ack pending was set by the user
536
     */
537
    public boolean maxAckPendingWasSet() {
538
        return maxAckPending != null;
1✔
539
    }
540

541
    /**
542
     * Gets whether max pull waiting for this consumer configuration was set or left unset
543
     * @return true if max pull waiting was set by the user
544
     */
545
    public boolean maxPullWaitingWasSet() {
546
        return maxPullWaiting != null;
1✔
547
    }
548

549
    /**
550
     * Gets whether max batch for this consumer configuration was set or left unset
551
     * @return true if max batch was set by the user
552
     */
553
    public boolean maxBatchWasSet() {
554
        return maxBatch != null;
1✔
555
    }
556

557
    /**
558
     * Gets whether max bytes for this consumer configuration was set or left unset
559
     * @return true if max bytes was set by the user
560
     */
561
    public boolean maxBytesWasSet() {
562
        return maxBytes != null;
1✔
563
    }
564

565
    /**
566
     * Gets whether flow control for this consumer configuration was set or left unset
567
     * @return true if the policy was set, false if the policy was not set
568
     */
569
    public boolean flowControlWasSet() {
570
        return flowControl != null;
1✔
571
    }
572

573
    /**
574
     * Gets whether headers only for this consumer configuration was set or left unset
575
     * @return true if the policy was set, false if the policy was not set
576
     */
577
    public boolean headersOnlyWasSet() {
578
        return headersOnly != null;
1✔
579
    }
580

581
    /**
582
     * Gets whether mem storage for this consumer configuration was set or left unset
583
     * @return true if the policy was set, false if the policy was not set
584
     */
585
    public boolean memStorageWasSet() {
586
        return memStorage != null;
1✔
587
    }
588

589
    /**
590
     * Gets whether num replicas for this consumer configuration was set or left unset
591
     * @return true if num replicas was set by the user
592
     */
593
    public boolean numReplicasWasSet() {
594
        return numReplicas != null;
1✔
595
    }
596

597
    /**
598
     * Gets whether backoff for this consumer configuration was set or left unset
599
     * @return true if num backoff was set by the user
600
     */
601
    public boolean backoffWasSet() {
602
        return backoff != null;
1✔
603
    }
604

605
    /**
606
     * Gets whether metadata for this consumer configuration was set or left unset
607
     * @return true if num metadata was set by the user
608
     */
609
    public boolean metadataWasSet() {
610
        return metadata != null;
1✔
611
    }
612

613
    /**
614
     * Gets whether priority policy for this consumer configuration was set or left unset
615
     * @return true if the policy was set, false if the policy was not set
616
     */
617
    public boolean priorityPolicyWasSet() {
618
        return priorityPolicy != null;
1✔
619
    }
620

621
    /**
622
     * Creates a builder for the options.
623
     * @return a publish options builder
624
     */
625
    public static Builder builder() {
626
        return new Builder();
1✔
627
    }
628

629
    /**
630
     * Creates a builder for the options.
631
     * @param cc the consumer configuration
632
     * @return a publish options builder
633
     */
634
    public static Builder builder(ConsumerConfiguration cc) {
635
        return cc == null ? new Builder() : new Builder(cc);
1✔
636
    }
637

638
    /**
639
     * ConsumerConfiguration is created using a Builder. The builder supports chaining and will
640
     * create a default set of options if no methods are calls.
641
     *
642
     * <p>{@code new ConsumerConfiguration.Builder().build()} will create a default ConsumerConfiguration.
643
     *
644
     */
645
    public static class Builder {
646
        private DeliverPolicy deliverPolicy;
647
        private AckPolicy ackPolicy;
648
        private ReplayPolicy replayPolicy;
649

650
        private String description;
651
        private String durable;
652
        private String name;
653
        private String deliverSubject;
654
        private String deliverGroup;
655
        private String sampleFrequency;
656

657
        private ZonedDateTime startTime;
658
        private Duration ackWait;
659
        private Duration idleHeartbeat;
660
        private Duration maxExpires;
661
        private Duration inactiveThreshold;
662

663
        private Long startSeq;
664
        private Integer maxDeliver;
665
        private Long rateLimit;
666
        private Integer maxAckPending;
667
        private Integer maxPullWaiting;
668
        private Integer maxBatch;
669
        private Integer maxBytes;
670
        private Integer numReplicas;
671
        private ZonedDateTime pauseUntil;
672

673
        private Boolean flowControl;
674
        private Boolean headersOnly;
675
        private Boolean memStorage;
676

677
        private List<Duration> backoff;
678
        private Map<String, String> metadata;
679
        private List<String> filterSubjects;
680

681
        private List<String> priorityGroups;
682
        private PriorityPolicy priorityPolicy;
683

684
        /**
685
         * Construct the builder
686
         */
687
        public Builder() {}
1✔
688

689
        /**
690
         * Construct the builder and initialize values with the existing ConsumerConfiguration
691
         * @param cc the consumer configuration to clone
692
         */
693
        public Builder(ConsumerConfiguration cc) {
1✔
694
            if (cc != null) {
1✔
695
                this.deliverPolicy = cc.deliverPolicy;
1✔
696
                this.ackPolicy = cc.ackPolicy;
1✔
697
                this.replayPolicy = cc.replayPolicy;
1✔
698

699
                this.description = cc.description;
1✔
700
                this.durable = cc.durable;
1✔
701
                this.name = cc.name;
1✔
702
                this.deliverSubject = cc.deliverSubject;
1✔
703
                this.deliverGroup = cc.deliverGroup;
1✔
704
                this.sampleFrequency = cc.sampleFrequency;
1✔
705

706
                this.startTime = cc.startTime;
1✔
707
                this.ackWait = cc.ackWait;
1✔
708
                this.idleHeartbeat = cc.idleHeartbeat;
1✔
709
                this.maxExpires = cc.maxExpires;
1✔
710
                this.inactiveThreshold = cc.inactiveThreshold;
1✔
711

712
                this.startSeq = cc.startSeq;
1✔
713
                this.maxDeliver = cc.maxDeliver;
1✔
714
                this.rateLimit = cc.rateLimit;
1✔
715
                this.maxAckPending = cc.maxAckPending;
1✔
716
                this.maxPullWaiting = cc.maxPullWaiting;
1✔
717
                this.maxBatch = cc.maxBatch;
1✔
718
                this.maxBytes = cc.maxBytes;
1✔
719
                this.numReplicas = cc.numReplicas;
1✔
720
                this.pauseUntil = cc.pauseUntil;
1✔
721

722
                this.flowControl = cc.flowControl;
1✔
723
                this.headersOnly = cc.headersOnly;
1✔
724
                this.memStorage = cc.memStorage;
1✔
725

726
                if (cc.backoff != null) {
1✔
727
                    this.backoff = new ArrayList<>(cc.backoff);
×
728
                }
729
                if (cc.metadata != null) {
1✔
730
                    this.metadata = new HashMap<>(cc.metadata);
1✔
731
                }
732
                if (cc.filterSubjects != null) {
1✔
733
                    this.filterSubjects = new ArrayList<>(cc.filterSubjects);
1✔
734
                }
735

736
                if (cc.priorityGroups != null) {
1✔
NEW
737
                    this.priorityGroups = new ArrayList<>(cc.priorityGroups);
×
738
                }
739
                this.priorityPolicy = cc.priorityPolicy;
1✔
740
            }
741
        }
1✔
742

743
        /**
744
         * Initialize values from the json string.
745
         * @param json the json string to parse
746
         * @return the builder
747
         * @throws JsonParseException if the json is invalid
748
         */
749
        public Builder json(String json) throws JsonParseException {
750
            return jsonValue(JsonParser.parse(json));
1✔
751
        }
752

753
        /**
754
         * Initialize values from the JsonValue object.
755
         * @param jsonValue the json value object
756
         * @return the builder
757
         */
758
        public Builder jsonValue(JsonValue jsonValue) {
759
            deliverPolicy(DeliverPolicy.get(readString(jsonValue, DELIVER_POLICY)));
1✔
760
            ackPolicy(AckPolicy.get(readString(jsonValue, ACK_POLICY)));
1✔
761

762
            replayPolicy(ReplayPolicy.get(readString(jsonValue, REPLAY_POLICY)));
1✔
763

764
            description(readString(jsonValue, DESCRIPTION));
1✔
765
            durable(readString(jsonValue, DURABLE_NAME));
1✔
766
            name(readString(jsonValue, NAME));
1✔
767
            deliverSubject(readString(jsonValue, DELIVER_SUBJECT));
1✔
768
            deliverGroup(readString(jsonValue, DELIVER_GROUP));
1✔
769
            sampleFrequency(readString(jsonValue, SAMPLE_FREQ));
1✔
770
            startTime(readDate(jsonValue, OPT_START_TIME));
1✔
771
            ackWait(readNanos(jsonValue, ACK_WAIT));
1✔
772
            maxExpires(readNanos(jsonValue, MAX_EXPIRES));
1✔
773
            inactiveThreshold(readNanos(jsonValue, INACTIVE_THRESHOLD));
1✔
774

775
            startSequence(readLong(jsonValue, OPT_START_SEQ));
1✔
776
            maxDeliver(readLong(jsonValue, MAX_DELIVER, INTEGER_UNSET));
1✔
777
            rateLimit(readLong(jsonValue, RATE_LIMIT_BPS));
1✔
778
            maxAckPending(readLong(jsonValue, MAX_ACK_PENDING));
1✔
779
            maxPullWaiting(readLong(jsonValue, MAX_WAITING));
1✔
780
            maxBatch(readLong(jsonValue, MAX_BATCH));
1✔
781
            maxBytes(readLong(jsonValue, MAX_BYTES));
1✔
782

783
            Integer r = readInteger(jsonValue, NUM_REPLICAS);
1✔
784
            if (r != null) {
1✔
785
                if (r == 0) {
1✔
786
                    numReplicas = 0;
1✔
787
                }
788
                else {
789
                    numReplicas(r);
1✔
790
                }
791
            }
792

793
            pauseUntil(readDate(jsonValue, PAUSE_UNTIL));
1✔
794

795
            Duration idleHeartbeat = readNanos(jsonValue, IDLE_HEARTBEAT);
1✔
796
            if (idleHeartbeat != null) {
1✔
797
                if (readBoolean(jsonValue, FLOW_CONTROL, false)) {
1✔
798
                    flowControl(idleHeartbeat);
1✔
799
                }
800
                else {
801
                    idleHeartbeat(idleHeartbeat);
1✔
802
                }
803
            }
804

805
            headersOnly(readBoolean(jsonValue, HEADERS_ONLY, null));
1✔
806
            memStorage(readBoolean(jsonValue, MEM_STORAGE, null));
1✔
807

808
            //noinspection DataFlowIssue readNanosList with false ensures not null;
809
            backoff(readNanosList(jsonValue, BACKOFF, false).toArray(new Duration[0]));
1✔
810

811
            metadata(readStringStringMap(jsonValue, METADATA));
1✔
812

813
            String fs = emptyAsNull(readString(jsonValue, FILTER_SUBJECT));
1✔
814
            if (fs == null) {
1✔
815
                filterSubjects(readOptionalStringList(jsonValue, FILTER_SUBJECTS));
1✔
816
            }
817
            else {
818
                filterSubject(fs);
1✔
819
            }
820

821
            priorityGroups(readOptionalStringList(jsonValue, PRIORITY_GROUPS));
1✔
822
            priorityPolicy(PriorityPolicy.get(readString(jsonValue, PRIORITY_POLICY)));
1✔
823

824
            return this;
1✔
825
        }
826

827
        /**
828
         * Sets the description
829
         * @param description the description
830
         * @return the builder
831
         */
832
        public Builder description(String description) {
833
            this.description = emptyAsNull(description);
1✔
834
            return this;
1✔
835
        }
836

837
        /**
838
         * Sets the name of the durable consumer.
839
         * Null or empty clears the field.
840
         * @param durable name of the durable consumer.
841
         * @return the builder
842
         */
843
        public Builder durable(String durable) {
844
            this.durable = validateDurable(durable, false);
1✔
845
            return this;
1✔
846
        }
847

848
        /**
849
         * Sets the name of the consumer.
850
         * Null or empty clears the field.
851
         * @param name name of the consumer.
852
         * @return the builder
853
         */
854
        public Builder name(String name) {
855
            this.name = validateConsumerName(name, false);
1✔
856
            return this;
1✔
857
        }
858

859
        /**
860
         * Sets the delivery policy of the ConsumerConfiguration.
861
         * @param policy the delivery policy.
862
         * @return Builder
863
         */
864
        public Builder deliverPolicy(DeliverPolicy policy) {
865
            this.deliverPolicy = policy;
1✔
866
            return this;
1✔
867
        }
868

869
        /**
870
         * Sets the subject to deliver messages to.
871
         * <p> By setting the deliverySubject this configuration will create a <b>push consumer</b>. When left empty or set to NULL a pull consumer will be created.
872
         * @param subject the subject.
873
         * @return the builder
874
         */
875
        public Builder deliverSubject(String subject) {
876
            this.deliverSubject = emptyAsNull(subject);
1✔
877
            return this;
1✔
878
        }
879

880
        /**
881
         * Sets the group to deliver messages to.
882
         * @param group the delivery group.
883
         * @return the builder
884
         */
885
        public Builder deliverGroup(String group) {
886
            this.deliverGroup = emptyAsNull(group);
1✔
887
            return this;
1✔
888
        }
889

890
        /**
891
         * Sets the start sequence of the ConsumerConfiguration or null to unset / clear.
892
         * @param sequence the start sequence
893
         * @return Builder
894
         */
895
        public Builder startSequence(Long sequence) {
896
            this.startSeq = normalizeUlong(sequence);
1✔
897
            return this;
1✔
898
        }
899

900
        /**
901
         * Sets the start sequence of the ConsumerConfiguration.
902
         * @param sequence the start sequence
903
         * @return Builder
904
         */
905
        public Builder startSequence(long sequence) {
906
            this.startSeq = normalizeUlong(sequence);
1✔
907
            return this;
1✔
908
        }
909

910
        /**
911
         * Sets the start time of the ConsumerConfiguration.
912
         * @param startTime the start time
913
         * @return Builder
914
         */
915
        public Builder startTime(ZonedDateTime startTime) {
916
            this.startTime = startTime;
1✔
917
            return this;
1✔
918
        }
919

920
        /**
921
         * Sets the acknowledgement policy of the ConsumerConfiguration.
922
         * @param policy the acknowledgement policy.
923
         * @return Builder
924
         */
925
        public Builder ackPolicy(AckPolicy policy) {
926
            this.ackPolicy = policy;
1✔
927
            return this;
1✔
928
        }
929

930
        /**
931
         * Sets the acknowledgement wait duration of the ConsumerConfiguration.
932
         * @param timeout the wait timeout
933
         * @return Builder
934
         */
935
        public Builder ackWait(Duration timeout) {
936
            this.ackWait = normalize(timeout);
1✔
937
            return this;
1✔
938
        }
939

940
        /**
941
         * Sets the acknowledgement wait duration of the ConsumerConfiguration.
942
         * @param timeoutMillis the wait timeout in milliseconds
943
         * @return Builder
944
         */
945
        public Builder ackWait(long timeoutMillis) {
946
            this.ackWait = normalizeDuration(timeoutMillis);
1✔
947
            return this;
1✔
948
        }
949

950
        /**
951
         * Sets the maximum delivery amount of the ConsumerConfiguration or null to unset / clear.
952
         * @param maxDeliver the maximum delivery amount
953
         * @return Builder
954
         */
955
        public Builder maxDeliver(Long maxDeliver) {
956
            this.maxDeliver = normalize(maxDeliver, MAX_DELIVER_MIN);
1✔
957
            return this;
1✔
958
        }
959

960
        /**
961
         * Sets the maximum delivery amount of the ConsumerConfiguration.
962
         * @param maxDeliver the maximum delivery amount
963
         * @return Builder
964
         */
965
        public Builder maxDeliver(long maxDeliver) {
966
            this.maxDeliver = normalize(maxDeliver, MAX_DELIVER_MIN);
1✔
967
            return this;
1✔
968
        }
969

970
        /**
971
         * Sets the filter subject of the ConsumerConfiguration.
972
         * Replaces any other filter subjects set in the builder
973
         * @param filterSubject the filter subject
974
         * @return Builder
975
         */
976
        public Builder filterSubject(String filterSubject) {
977
            if (nullOrEmpty(filterSubject)) {
1✔
978
                this.filterSubjects = null;
1✔
979
            }
980
            else {
981
                this.filterSubjects = Collections.singletonList(filterSubject);
1✔
982
            }
983
            return this;
1✔
984
        }
985

986
        /**
987
         * Sets the filter subjects of the ConsumerConfiguration.
988
         * Replaces any other filter subjects set in the builder
989
         * @param filterSubjects one or more filter subjects
990
         * @return Builder
991
         */
992
        public Builder filterSubjects(String... filterSubjects) {
993
            return filterSubjects(Arrays.asList(filterSubjects));
1✔
994
        }
995

996
        /**
997
         * Sets the filter subjects of the ConsumerConfiguration.
998
         * Replaces any other filter subjects set in the builder
999
         * @param filterSubjects the list of filter subjects
1000
         * @return Builder
1001
         */
1002
        public Builder filterSubjects(List<String> filterSubjects) {
1003
            this.filterSubjects = new ArrayList<>();
1✔
1004
            if (filterSubjects != null) {
1✔
1005
                for (String fs : filterSubjects) {
1✔
1006
                    if (!nullOrEmpty(fs)) {
1✔
1007
                        this.filterSubjects.add(fs);
1✔
1008
                    }
1009
                }
1✔
1010
            }
1011
            if (this.filterSubjects.isEmpty()) {
1✔
1012
                this.filterSubjects = null;
1✔
1013
            }
1014
            return this;
1✔
1015
        }
1016

1017
        /**
1018
         * Sets the replay policy of the ConsumerConfiguration.
1019
         * @param policy the replay policy.
1020
         * @return Builder
1021
         */
1022
        public Builder replayPolicy(ReplayPolicy policy) {
1023
            this.replayPolicy = policy;
1✔
1024
            return this;
1✔
1025
        }
1026

1027
        /**
1028
         * Sets the sample frequency of the ConsumerConfiguration.
1029
         * @param frequency the frequency
1030
         * @return Builder
1031
         */
1032
        public Builder sampleFrequency(String frequency) {
1033
            this.sampleFrequency = emptyAsNull(frequency);
1✔
1034
            return this;
1✔
1035
        }
1036

1037
        /**
1038
         * Set the rate limit of the ConsumerConfiguration or null to unset / clear.
1039
         * @param bitsPerSecond bits per second to deliver
1040
         * @return Builder
1041
         */
1042
        public Builder rateLimit(Long bitsPerSecond) {
1043
            this.rateLimit = normalizeUlong(bitsPerSecond);
1✔
1044
            return this;
1✔
1045
        }
1046

1047
        /**
1048
         * Set the rate limit of the ConsumerConfiguration.
1049
         * @param bitsPerSecond bits per second to deliver
1050
         * @return Builder
1051
         */
1052
        public Builder rateLimit(long bitsPerSecond) {
1053
            this.rateLimit = normalizeUlong(bitsPerSecond);
1✔
1054
            return this;
1✔
1055
        }
1056

1057
        /**
1058
         * Sets the maximum ack pending or null to unset / clear.
1059
         * @param maxAckPending maximum pending acknowledgements.
1060
         * @return Builder
1061
         */
1062
        public Builder maxAckPending(Long maxAckPending) {
1063
            this.maxAckPending = normalize(maxAckPending, STANDARD_MIN);
1✔
1064
            return this;
1✔
1065
        }
1066

1067
        /**
1068
         * Sets the maximum ack pending.
1069
         * @param maxAckPending maximum pending acknowledgements.
1070
         * @return Builder
1071
         */
1072
        public Builder maxAckPending(long maxAckPending) {
1073
            this.maxAckPending = normalize(maxAckPending, STANDARD_MIN);
1✔
1074
            return this;
1✔
1075
        }
1076

1077
        /**
1078
         * sets the idle heart beat wait time
1079
         * @param idleHeartbeat the idle heart beat duration
1080
         * @return Builder
1081
         */
1082
        public Builder idleHeartbeat(Duration idleHeartbeat) {
1083
            if (idleHeartbeat == null) {
1✔
1084
                this.idleHeartbeat = null;
1✔
1085
            }
1086
            else {
1087
                long nanos = idleHeartbeat.toNanos();
1✔
1088
                if (nanos <= DURATION_UNSET_LONG) {
1✔
1089
                    this.idleHeartbeat = DURATION_UNSET;
1✔
1090
                }
1091
                else if (nanos < MIN_IDLE_HEARTBEAT_NANOS) {
1✔
1092
                    throw new IllegalArgumentException("Duration must be greater than or equal to " + MIN_IDLE_HEARTBEAT_NANOS + " nanos.");
1✔
1093
                }
1094
                else {
1095
                    this.idleHeartbeat = idleHeartbeat;
1✔
1096
                }
1097
            }
1098
            return this;
1✔
1099
        }
1100

1101
        /**
1102
         * sets the idle heart beat wait time
1103
         * @param idleHeartbeatMillis the idle heart beat duration in milliseconds
1104
         * @return Builder
1105
         */
1106
        public Builder idleHeartbeat(long idleHeartbeatMillis) {
1107
            if (idleHeartbeatMillis <= DURATION_UNSET_LONG) {
1✔
1108
                this.idleHeartbeat = DURATION_UNSET;
1✔
1109
            }
1110
            else if (idleHeartbeatMillis < MIN_IDLE_HEARTBEAT_MILLIS) {
1✔
1111
                throw new IllegalArgumentException("Duration must be greater than or equal to " + MIN_IDLE_HEARTBEAT_MILLIS + " milliseconds.");
1✔
1112
            }
1113
            else {
1114
                this.idleHeartbeat = Duration.ofMillis(idleHeartbeatMillis);
1✔
1115
            }
1116
            return this;
1✔
1117
        }
1118

1119
        /**
1120
         * set the flow control on and set the idle heartbeat
1121
         * @param idleHeartbeat the idle heart beat duration
1122
         * @return Builder
1123
         */
1124
        public Builder flowControl(Duration idleHeartbeat) {
1125
            this.flowControl = true;
1✔
1126
            return idleHeartbeat(idleHeartbeat);
1✔
1127
        }
1128

1129
        /**
1130
         * set the flow control on and set the idle heartbeat
1131
         * @param idleHeartbeatMillis the idle heart beat duration in milliseconds
1132
         * @return Builder
1133
         */
1134
        public Builder flowControl(long idleHeartbeatMillis) {
1135
            this.flowControl = true;
1✔
1136
            return idleHeartbeat(idleHeartbeatMillis);
1✔
1137
        }
1138

1139
        /**
1140
         * sets the max amount of expire time for the server to allow on pull requests.
1141
         * @param maxExpires the max expire duration
1142
         * @return Builder
1143
         */
1144
        public Builder maxExpires(Duration maxExpires) {
1145
            this.maxExpires = normalize(maxExpires);
1✔
1146
            return this;
1✔
1147
        }
1148

1149
        /**
1150
         * sets the max amount of expire time for the server to allow on pull requests.
1151
         * @param maxExpires the max expire duration in milliseconds
1152
         * @return Builder
1153
         */
1154
        public Builder maxExpires(long maxExpires) {
1155
            this.maxExpires = normalizeDuration(maxExpires);
1✔
1156
            return this;
1✔
1157
        }
1158

1159
        /**
1160
         * sets the amount of time before the consumer is deemed inactive.
1161
         * @param inactiveThreshold the threshold duration
1162
         * @return Builder
1163
         */
1164
        public Builder inactiveThreshold(Duration inactiveThreshold) {
1165
            this.inactiveThreshold = normalize(inactiveThreshold);
1✔
1166
            return this;
1✔
1167
        }
1168

1169
        /**
1170
         * sets the amount of time before the consumer is deemed inactive.
1171
         * @param inactiveThreshold the threshold duration in milliseconds
1172
         * @return Builder
1173
         */
1174
        public Builder inactiveThreshold(long inactiveThreshold) {
1175
            this.inactiveThreshold = normalizeDuration(inactiveThreshold);
1✔
1176
            return this;
1✔
1177
        }
1178

1179
        /**
1180
         * sets the max pull waiting, the number of pulls that can be outstanding on a pull consumer, pulls received after this is reached are ignored.
1181
         * Use null to unset / clear.
1182
         * @param maxPullWaiting the max pull waiting
1183
         * @return Builder
1184
         */
1185
        public Builder maxPullWaiting(Long maxPullWaiting) {
1186
            this.maxPullWaiting = normalize(maxPullWaiting, STANDARD_MIN);
1✔
1187
            return this;
1✔
1188
        }
1189

1190
        /**
1191
         * sets the max pull waiting, the number of pulls that can be outstanding on a pull consumer, pulls received after this is reached are ignored.
1192
         * @param maxPullWaiting the max pull waiting
1193
         * @return Builder
1194
         */
1195
        public Builder maxPullWaiting(long maxPullWaiting) {
1196
            this.maxPullWaiting = normalize(maxPullWaiting, STANDARD_MIN);
1✔
1197
            return this;
1✔
1198
        }
1199

1200
        /**
1201
         * sets the max batch size for the server to allow on pull requests.
1202
         * @param maxBatch the max batch size
1203
         * @return Builder
1204
         */
1205
        public Builder maxBatch(Long maxBatch) {
1206
            this.maxBatch = normalize(maxBatch, STANDARD_MIN);
1✔
1207
            return this;
1✔
1208
        }
1209

1210
        /**
1211
         * sets the max batch size for the server to allow on pull requests.
1212
         * @param maxBatch the max batch size
1213
         * @return Builder
1214
         */
1215
        public Builder maxBatch(long maxBatch) {
1216
            this.maxBatch = normalize(maxBatch, STANDARD_MIN);
1✔
1217
            return this;
1✔
1218
        }
1219

1220
        /**
1221
         * sets the max bytes size for the server to allow on pull requests.
1222
         * @param maxBytes the max bytes size
1223
         * @return Builder
1224
         */
1225
        public Builder maxBytes(Long maxBytes) {
1226
            this.maxBytes = normalize(maxBytes, STANDARD_MIN);
1✔
1227
            return this;
1✔
1228
        }
1229

1230
        /**
1231
         * sets the max bytes size for the server to allow on pull requests.
1232
         * @param maxBytes the max bytes size
1233
         * @return Builder
1234
         */
1235
        public Builder maxBytes(long maxBytes) {
1236
            this.maxBytes = normalize(maxBytes, STANDARD_MIN);
1✔
1237
            return this;
1✔
1238
        }
1239

1240
        /**
1241
         * set the number of replicas for the consumer. When set do not inherit the
1242
         * replica count from the stream but specifically set it to this amount.
1243
         * @param numReplicas number of replicas for the consumer
1244
         * @return Builder
1245
         */
1246
        public Builder numReplicas(Integer numReplicas) {
1247
            this.numReplicas = numReplicas == null ? null : validateNumberOfReplicas(numReplicas);
1✔
1248
            return this;
1✔
1249
        }
1250

1251
        /**
1252
         * Sets the time to pause the consumer until.
1253
         * @param pauseUntil the time to pause
1254
         * @return Builder
1255
         */
1256
        public Builder pauseUntil(ZonedDateTime pauseUntil) {
1257
            this.pauseUntil = pauseUntil;
1✔
1258
            return this;
1✔
1259
        }
1260

1261
        /**
1262
         * set the headers only flag saying to deliver only the headers of
1263
         * messages in the stream and not the bodies
1264
         * @param headersOnly the flag
1265
         * @return Builder
1266
         */
1267
        public Builder headersOnly(Boolean headersOnly) {
1268
            this.headersOnly = headersOnly;
1✔
1269
            return this;
1✔
1270
        }
1271

1272
        /**
1273
         * set the mem storage flag to force the consumer state to be kept
1274
         * in memory rather than inherit the setting from the stream
1275
         * @param memStorage the flag
1276
         * @return Builder
1277
         */
1278
        public Builder memStorage(Boolean memStorage) {
1279
            this.memStorage = memStorage;
1✔
1280
            return this;
1✔
1281
        }
1282

1283
        /**
1284
         * Set the list of backoff. Will override ackwait setting.
1285
         * @see <a href="https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#delivery-reliability">Delivery Reliability</a>
1286
         * @param backoffs zero or more backoff durations or an array of backoffs
1287
         * @return Builder
1288
         */
1289
        public Builder backoff(Duration... backoffs) {
1290
            if (backoffs == null || (backoffs.length == 1 && backoffs[0] == null))
1✔
1291
            {
1292
                backoff = null;
1✔
1293
            }
1294
            else
1295
            {
1296
                backoff = new ArrayList<>();
1✔
1297
                for (Duration d : backoffs)
1✔
1298
                {
1299
                    if (d != null)
1✔
1300
                    {
1301
                        if (d.toNanos() < DURATION_MIN_LONG)
1✔
1302
                        {
1303
                            throw new IllegalArgumentException("Backoff cannot be less than " + DURATION_MIN_LONG);
1✔
1304
                        }
1305
                        backoff.add(d);
1✔
1306
                    }
1307
                }
1308
            }
1309
            return this;
1✔
1310
        }
1311

1312
        /**
1313
         * Set the list of backoff. Will override ackwait setting.
1314
         * @see <a href="https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#delivery-reliability">Delivery Reliability</a>
1315
         * @param backoffsMillis zero or more backoff in millis or an array of backoffsMillis
1316
         * @return Builder
1317
         */
1318
        public Builder backoff(long... backoffsMillis) {
1319
            if (backoffsMillis == null) {
1✔
1320
                backoff = null;
1✔
1321
            }
1322
            else {
1323
                backoff = new ArrayList<>();
1✔
1324
                for (long ms : backoffsMillis) {
1✔
1325
                    if (ms < DURATION_MIN_LONG) {
1✔
1326
                        throw new IllegalArgumentException("Backoff cannot be less than " + DURATION_MIN_LONG);
1✔
1327
                    }
1328
                    this.backoff.add(Duration.ofMillis(ms));
1✔
1329
                }
1330
            }
1331
            return this;
1✔
1332
        }
1333

1334
        /**
1335
         * Sets the metadata for the configuration
1336
         * @param metadata the metadata map
1337
         * @return Builder
1338
         */
1339
        public Builder metadata(Map<String, String> metadata) {
1340
            this.metadata = metadata == null || metadata.isEmpty() ? null : new HashMap<>(metadata);
1✔
1341
            return this;
1✔
1342
        }
1343

1344
        /**
1345
         * Sets the priority groups of the ConsumerConfiguration.
1346
         * Replaces any other priority groups set in the builder
1347
         * @param priorityGroups one or more priority groups
1348
         * @return Builder
1349
         */
1350
        public Builder priorityGroups(String... priorityGroups) {
1351
            return priorityGroups(Arrays.asList(priorityGroups));
1✔
1352
        }
1353

1354
        /**
1355
         * Sets the priority groups of the ConsumerConfiguration.
1356
         * Replaces any other priority groups set in the builder
1357
         * @param priorityGroups the list of priority groups
1358
         * @return Builder
1359
         */
1360
        public Builder priorityGroups(List<String> priorityGroups) {
1361
            this.priorityGroups = new ArrayList<>();
1✔
1362
            if (priorityGroups != null) {
1✔
1363
                for (String pg : priorityGroups) {
1✔
1364
                    if (!nullOrEmpty(pg)) {
1✔
1365
                        this.priorityGroups.add(pg);
1✔
1366
                    }
1367
                }
1✔
1368
            }
1369
            if (this.priorityGroups.isEmpty()) {
1✔
1370
                this.priorityGroups = null;
1✔
1371
            }
1372
            return this;
1✔
1373
        }
1374

1375
        /**
1376
         * Sets the priority policy of the ConsumerConfiguration.
1377
         * @param policy the priority policy.
1378
         * @return Builder
1379
         */
1380
        public Builder priorityPolicy(PriorityPolicy policy) {
1381
            this.priorityPolicy = policy;
1✔
1382
            return this;
1✔
1383
        }
1384

1385
        /**
1386
         * Builds the ConsumerConfiguration
1387
         * @return The consumer configuration.
1388
         */
1389
        public ConsumerConfiguration build() {
1390
            validateMustMatchIfBothSupplied(name, durable, JsConsumerNameDurableMismatch);
1✔
1391
            return new ConsumerConfiguration(this);
1✔
1392
        }
1393

1394
        /**
1395
         * Builds the PushSubscribeOptions with this configuration
1396
         * @return The PushSubscribeOptions.
1397
         */
1398
        public PushSubscribeOptions buildPushSubscribeOptions() {
1399
            return PushSubscribeOptions.builder().configuration(build()).build();
1✔
1400
        }
1401

1402
        /**
1403
         * Builds the PushSubscribeOptions with this configuration.
1404
         * Providing the stream is a hint for the subscription process that
1405
         * saves a call to the server. Assumes the stream is the correct stream
1406
         * for the subject filter, otherwise the server will return an error
1407
         * which the subscription call will raise to the user.
1408
         * @param stream the stream for this consumer
1409
         * @return The PushSubscribeOptions.
1410
         */
1411
        public PushSubscribeOptions buildPushSubscribeOptions(String stream) {
1412
            return PushSubscribeOptions.builder().configuration(build()).stream(stream).build();
×
1413
        }
1414

1415
        /**
1416
         * Builds the PullSubscribeOptions with this configuration
1417
         * @return The PullSubscribeOptions.
1418
         */
1419
        public PullSubscribeOptions buildPullSubscribeOptions() {
1420
            return PullSubscribeOptions.builder().configuration(build()).build();
1✔
1421
        }
1422

1423
        /**
1424
         * Builds the PullSubscribeOptions with this configuration
1425
         * Providing the stream is a hint for the subscription process that
1426
         * saves a call to the server. Assumes the stream is the correct stream
1427
         * for the subject filter, otherwise the server will return an error
1428
         * which the subscription call will raise to the user.
1429
         * @param stream the stream for this consumer
1430
         * @return The PullSubscribeOptions.
1431
         */
1432
        public PullSubscribeOptions buildPullSubscribeOptions(String stream) {
1433
            return PullSubscribeOptions.builder().configuration(build()).stream(stream).build();
×
1434
        }
1435
    }
1436

1437
    @Override
1438
    public String toString() {
1439
        return "ConsumerConfiguration " + toJson();
1✔
1440
    }
1441

1442
    protected static int getOrUnset(Integer val)
1443
    {
1444
        return val == null ? INTEGER_UNSET : val;
1✔
1445
    }
1446

1447
    protected static long getOrUnsetUlong(Long val)
1448
    {
1449
        return val == null || val < 0 ? ULONG_UNSET : val;
1✔
1450
    }
1451

1452
    protected static Duration getOrUnset(Duration val)
1453
    {
1454
        return val == null ? DURATION_UNSET : val;
1✔
1455
    }
1456

1457
    protected static Integer normalize(Long l, int min) {
1458
        if (l == null) {
1✔
1459
            return null;
1✔
1460
        }
1461

1462
        if (l < min) {
1✔
1463
            return INTEGER_UNSET;
1✔
1464
        }
1465

1466
        if (l > Integer.MAX_VALUE) {
1✔
1467
            return Integer.MAX_VALUE;
1✔
1468
        }
1469

1470
        return l.intValue();
1✔
1471
    }
1472

1473
    protected static Long normalizeUlong(Long u)
1474
    {
1475
        return u == null ? null : u <= ULONG_UNSET ? ULONG_UNSET : u;
1✔
1476
    }
1477

1478
    protected static Duration normalize(Duration d)
1479
    {
1480
        return d == null ? null : d.toNanos() <= DURATION_UNSET_LONG ? DURATION_UNSET : d;
1✔
1481
    }
1482

1483
    protected static Duration normalizeDuration(long millis)
1484
    {
1485
        return millis <= DURATION_UNSET_LONG ? DURATION_UNSET : Duration.ofMillis(millis);
1✔
1486
    }
1487

1488
    protected static DeliverPolicy GetOrDefault(DeliverPolicy p) { return p == null ? DEFAULT_DELIVER_POLICY : p; }
1✔
1489
    protected static AckPolicy GetOrDefault(AckPolicy p) { return p == null ? DEFAULT_ACK_POLICY : p; }
1✔
1490
    protected static ReplayPolicy GetOrDefault(ReplayPolicy p) { return p == null ? DEFAULT_REPLAY_POLICY : p; }
1✔
1491
    protected static PriorityPolicy GetOrDefault(PriorityPolicy p) { return p == null ? DEFAULT_PRIORITY_POLICY : p; }
1✔
1492
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc