• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2222

25 Sep 2025 02:30PM UTC coverage: 95.522% (-0.05%) from 95.571%
#2222

push

github

web-flow
Merge pull request #1433 from nats-io/prioritized

(2.12) Prioritized Consumer Support

36 of 41 new or added lines in 10 files covered. (87.8%)

4 existing lines in 2 files now uncovered.

12137 of 12706 relevant lines covered (95.52%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.31
/src/main/java/io/nats/client/api/ConsumerConfiguration.java
1
// Copyright 2020 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.api;
15

16
import io.nats.client.PullSubscribeOptions;
17
import io.nats.client.PushSubscribeOptions;
18
import io.nats.client.support.*;
19
import org.jspecify.annotations.NonNull;
20
import org.jspecify.annotations.Nullable;
21

22
import java.time.Duration;
23
import java.time.ZonedDateTime;
24
import java.util.*;
25

26
import static io.nats.client.support.ApiConstants.*;
27
import static io.nats.client.support.JsonUtils.beginJson;
28
import static io.nats.client.support.JsonUtils.endJson;
29
import static io.nats.client.support.JsonValueUtils.*;
30
import static io.nats.client.support.NatsJetStreamClientError.JsConsumerNameDurableMismatch;
31
import static io.nats.client.support.Validator.*;
32

33
/**
34
 * The ConsumerConfiguration class specifies the configuration for creating a JetStream consumer on the client and
35
 * if necessary the server.
36
 * Options are created using a ConsumerConfiguration.Builder.
37
 * <p>ConsumerConfiguration is intended to be used with  {@link io.nats.client.JetStreamManagement#createConsumer(String, ConsumerConfiguration) JetStreamManagement.createConsumer()}.
38
 * <P> By default this will create a <b>pull consumer</b> unless {@link ConsumerConfiguration.Builder#deliverSubject(String) ConsumerConfiguration.Builder.deliverSubject(String) } is set.
39
 */
40
public class ConsumerConfiguration implements JsonSerializable {
41
    @Deprecated
42
    public static final Duration DURATION_MIN = Duration.ofNanos(1);
1✔
43

44
    public static final DeliverPolicy DEFAULT_DELIVER_POLICY = DeliverPolicy.All;
1✔
45
    public static final AckPolicy DEFAULT_ACK_POLICY = AckPolicy.Explicit;
1✔
46
    public static final ReplayPolicy DEFAULT_REPLAY_POLICY = ReplayPolicy.Instant;
1✔
47
    public static final PriorityPolicy DEFAULT_PRIORITY_POLICY = PriorityPolicy.None;
1✔
48

49
    public static final Duration DURATION_UNSET = Duration.ZERO;
1✔
50
    public static final Duration MIN_IDLE_HEARTBEAT = Duration.ofMillis(100);
1✔
51

52
    public static final int INTEGER_UNSET = -1;
53
    public static final long LONG_UNSET = -1;
54
    public static final long ULONG_UNSET = 0;
55
    public static final long DURATION_UNSET_LONG = 0;
56
    public static final long DURATION_MIN_LONG = 1;
57
    public static final int STANDARD_MIN = 0;
58
    public static final int MAX_DELIVER_MIN = 1;
59

60
    public static final long MIN_IDLE_HEARTBEAT_NANOS = MIN_IDLE_HEARTBEAT.toNanos();
1✔
61
    public static final long MIN_IDLE_HEARTBEAT_MILLIS = MIN_IDLE_HEARTBEAT.toMillis();
1✔
62

63
    protected final DeliverPolicy deliverPolicy;
64
    protected final AckPolicy ackPolicy;
65
    protected final ReplayPolicy replayPolicy;
66
    protected final String description;
67
    protected final String durable;
68
    protected final String name;
69
    protected final String deliverSubject;
70
    protected final String deliverGroup;
71
    protected final String sampleFrequency;
72
    protected final ZonedDateTime startTime;
73
    protected final Duration ackWait;
74
    protected final Duration idleHeartbeat;
75
    protected final Duration maxExpires;
76
    protected final Duration inactiveThreshold;
77
    protected final Long startSeq; // server side this is unsigned
78
    protected final Integer maxDeliver;
79
    protected final Long rateLimit; // server side this is unsigned
80
    protected final Integer maxAckPending;
81
    protected final Integer maxPullWaiting;
82
    protected final Integer maxBatch;
83
    protected final Integer maxBytes;
84
    protected final Integer numReplicas;
85
    protected final ZonedDateTime pauseUntil;
86
    protected final Boolean flowControl;
87
    protected final Boolean headersOnly;
88
    protected final Boolean memStorage;
89
    protected final List<Duration> backoff;
90
    protected final Map<String, String> metadata;
91
    protected final List<String> filterSubjects;
92
    protected final List<String> priorityGroups;
93
    protected final PriorityPolicy priorityPolicy;
94
    protected final Duration priorityTimeout;
95

96
    protected ConsumerConfiguration(ConsumerConfiguration cc) {
1✔
97
        this.deliverPolicy = cc.deliverPolicy;
1✔
98
        this.ackPolicy = cc.ackPolicy;
1✔
99
        this.replayPolicy = cc.replayPolicy;
1✔
100
        this.description = cc.description;
1✔
101
        this.durable = cc.durable;
1✔
102
        this.name = cc.name;
1✔
103
        this.deliverSubject = cc.deliverSubject;
1✔
104
        this.deliverGroup = cc.deliverGroup;
1✔
105
        this.sampleFrequency = cc.sampleFrequency;
1✔
106
        this.startTime = cc.startTime;
1✔
107
        this.ackWait = cc.ackWait;
1✔
108
        this.idleHeartbeat = cc.idleHeartbeat;
1✔
109
        this.maxExpires = cc.maxExpires;
1✔
110
        this.inactiveThreshold = cc.inactiveThreshold;
1✔
111
        this.startSeq = cc.startSeq;
1✔
112
        this.maxDeliver = cc.maxDeliver;
1✔
113
        this.rateLimit = cc.rateLimit;
1✔
114
        this.maxAckPending = cc.maxAckPending;
1✔
115
        this.maxPullWaiting = cc.maxPullWaiting;
1✔
116
        this.maxBatch = cc.maxBatch;
1✔
117
        this.maxBytes = cc.maxBytes;
1✔
118
        this.numReplicas = cc.numReplicas;
1✔
119
        this.pauseUntil = cc.pauseUntil;
1✔
120
        this.flowControl = cc.flowControl;
1✔
121
        this.headersOnly = cc.headersOnly;
1✔
122
        this.memStorage = cc.memStorage;
1✔
123
        this.backoff = cc.backoff == null ? null : new ArrayList<>(cc.backoff);
1✔
124
        this.metadata = cc.metadata == null ? null : new HashMap<>(cc.metadata);
1✔
125
        this.filterSubjects = cc.filterSubjects == null ? null : new ArrayList<>(cc.filterSubjects);
1✔
126
        this.priorityGroups = cc.priorityGroups == null ? null : new ArrayList<>(cc.priorityGroups);
1✔
127
        this.priorityPolicy = cc.priorityPolicy;
1✔
128
        this.priorityTimeout = cc.priorityTimeout;
1✔
129
    }
1✔
130

131
    // For the builder
132
    protected ConsumerConfiguration(Builder b)
133
    {
1✔
134
        this.deliverPolicy = b.deliverPolicy;
1✔
135
        this.ackPolicy = b.ackPolicy;
1✔
136
        this.replayPolicy = b.replayPolicy;
1✔
137

138
        this.description = b.description;
1✔
139
        this.durable = b.durable;
1✔
140
        this.name = b.name;
1✔
141
        this.startTime = b.startTime;
1✔
142
        this.ackWait = b.ackWait;
1✔
143
        this.sampleFrequency = b.sampleFrequency;
1✔
144
        this.deliverSubject = b.deliverSubject;
1✔
145
        this.deliverGroup = b.deliverGroup;
1✔
146
        this.idleHeartbeat = b.idleHeartbeat;
1✔
147
        this.maxExpires = b.maxExpires;
1✔
148
        this.inactiveThreshold = b.inactiveThreshold;
1✔
149

150
        this.startSeq = b.startSeq;
1✔
151
        this.maxDeliver = b.maxDeliver;
1✔
152
        this.rateLimit = b.rateLimit;
1✔
153
        this.maxAckPending = b.maxAckPending;
1✔
154
        this.maxPullWaiting = b.maxPullWaiting;
1✔
155
        this.maxBatch = b.maxBatch;
1✔
156
        this.maxBytes = b.maxBytes;
1✔
157
        this.numReplicas = b.numReplicas;
1✔
158
        this.pauseUntil = b.pauseUntil;
1✔
159

160
        this.flowControl = b.flowControl;
1✔
161
        this.headersOnly = b.headersOnly;
1✔
162
        this.memStorage = b.memStorage;
1✔
163

164
        this.backoff = b.backoff;
1✔
165
        this.metadata = b.metadata;
1✔
166
        this.filterSubjects = b.filterSubjects;
1✔
167

168
        this.priorityGroups = b.priorityGroups;
1✔
169
        this.priorityPolicy = b.priorityPolicy;
1✔
170
        this.priorityTimeout = b.priorityTimeout;
1✔
171
    }
1✔
172

173
    /**
174
     * Returns a JSON representation of this consumer configuration.
175
     * @return the consumer configuration as a JSON string
176
     */
177
    @Override
178
    @NonNull
179
        public String toJson() {
180
        StringBuilder sb = beginJson();
1✔
181
        JsonUtils.addField(sb, DESCRIPTION, description);
1✔
182
        JsonUtils.addField(sb, DURABLE_NAME, durable);
1✔
183
        JsonUtils.addField(sb, NAME, name);
1✔
184
        JsonUtils.addField(sb, DELIVER_SUBJECT, deliverSubject);
1✔
185
        JsonUtils.addField(sb, DELIVER_GROUP, deliverGroup);
1✔
186
        JsonUtils.addField(sb, DELIVER_POLICY, GetOrDefault(deliverPolicy).toString());
1✔
187
        JsonUtils.addFieldWhenGtZero(sb, OPT_START_SEQ, startSeq);
1✔
188
        JsonUtils.addField(sb, OPT_START_TIME, startTime);
1✔
189
        JsonUtils.addField(sb, ACK_POLICY, GetOrDefault(ackPolicy).toString());
1✔
190
        JsonUtils.addFieldAsNanos(sb, ACK_WAIT, ackWait);
1✔
191
        JsonUtils.addFieldWhenGtZero(sb, MAX_DELIVER, maxDeliver);
1✔
192
        JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending);
1✔
193
        JsonUtils.addField(sb, REPLAY_POLICY, GetOrDefault(replayPolicy).toString());
1✔
194
        JsonUtils.addField(sb, SAMPLE_FREQ, sampleFrequency);
1✔
195
        JsonUtils.addFieldWhenGtZero(sb, RATE_LIMIT_BPS, rateLimit);
1✔
196
        JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);
1✔
197
        JsonUtils.addFldWhenTrue(sb, FLOW_CONTROL, flowControl);
1✔
198
        JsonUtils.addField(sb, ApiConstants.MAX_WAITING, maxPullWaiting);
1✔
199
        JsonUtils.addFldWhenTrue(sb, HEADERS_ONLY, headersOnly);
1✔
200
        JsonUtils.addField(sb, MAX_BATCH, maxBatch);
1✔
201
        JsonUtils.addField(sb, MAX_BYTES, maxBytes);
1✔
202
        JsonUtils.addFieldAsNanos(sb, MAX_EXPIRES, maxExpires);
1✔
203
        JsonUtils.addFieldAsNanos(sb, INACTIVE_THRESHOLD, inactiveThreshold);
1✔
204
        JsonUtils.addDurations(sb, BACKOFF, backoff);
1✔
205
        JsonUtils.addField(sb, NUM_REPLICAS, numReplicas);
1✔
206
        JsonUtils.addField(sb, PAUSE_UNTIL, pauseUntil);
1✔
207
        JsonUtils.addField(sb, MEM_STORAGE, memStorage);
1✔
208
        JsonUtils.addField(sb, METADATA, metadata);
1✔
209
        if (filterSubjects != null) {
1✔
210
            if (filterSubjects.size() > 1) {
1✔
211
                JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects);
1✔
212
            }
213
            else if (filterSubjects.size() == 1) {
1✔
214
                JsonUtils.addField(sb, FILTER_SUBJECT, filterSubjects.get(0));
1✔
215
            }
216
        }
217
        JsonUtils.addStrings(sb, PRIORITY_GROUPS, priorityGroups);
1✔
218
        if (priorityPolicy != null && priorityPolicy != DEFAULT_PRIORITY_POLICY) {
1✔
219
            JsonUtils.addField(sb, PRIORITY_POLICY, priorityPolicy.toString());
1✔
220
        }
221
        JsonUtils.addFieldAsNanos(sb, PRIORITY_TIMEOUT, priorityTimeout);
1✔
222

223
        return endJson(sb).toString();
1✔
224
    }
225

226
    /**
227
     * Gets the description of this consumer configuration.
228
     * @return the description.
229
     */
230
    @Nullable
231
    public String getDescription() {
232
        return description;
1✔
233
    }
234

235
    /**
236
     * Gets the durable name for this consumer configuration.
237
     * @return name of the durable.
238
     */
239
    @Nullable
240
    public String getDurable() {
241
        return durable;
1✔
242
    }
243

244
    /**
245
     * Gets the consumer name for this consumer configuration.
246
     * @return name of the consumer.
247
     */
248
    @Nullable
249
    public String getName() {
250
        return name;
1✔
251
    }
252

253
    /**
254
     * Gets the deliver subject of this consumer configuration.
255
     * @return the deliver subject.
256
     */
257
    @Nullable
258
    public String getDeliverSubject() {
259
        return deliverSubject;
1✔
260
    }
261

262
    /**
263
     * Gets the deliver group of this consumer configuration.
264
     * @return the deliver group.
265
     */
266
    @Nullable
267
    public String getDeliverGroup() {
268
        return deliverGroup;
1✔
269
    }
270

271
    /**
272
     * Gets the deliver policy of this consumer configuration.
273
     * @return the deliver policy.
274
     */
275
    @NonNull
276
    public DeliverPolicy getDeliverPolicy() {
277
        return GetOrDefault(deliverPolicy);
1✔
278
    }
279

280
    /**
281
     * Gets the start sequence of this consumer configuration.
282
     * @return the start sequence.
283
     */
284
    public long getStartSequence() {
285
        return getOrUnsetUlong(startSeq);
1✔
286
    }
287

288
    /**
289
     * Gets the start time of this consumer configuration.
290
     * @return the start time.
291
     */
292
    @Nullable
293
    public ZonedDateTime getStartTime() {
294
        return startTime;
1✔
295
    }
296

297
    /**
298
     * Gets the acknowledgment policy of this consumer configuration.
299
     * @return the acknowledgment policy.
300
     */
301
    @NonNull
302
    public AckPolicy getAckPolicy() {
303
        return GetOrDefault(ackPolicy);
1✔
304
    }
305

306
    /**
307
     * Gets the acknowledgment wait of this consumer configuration.
308
     * @return the acknowledgment wait duration.
309
     */
310
    @Nullable
311
    public Duration getAckWait() {
312
        return ackWait;
1✔
313
    }
314

315
    /**
316
     * Gets the max delivery amount of this consumer configuration.
317
     * @return the max delivery amount.
318
     */
319
    public long getMaxDeliver() {
320
        return getOrUnset(maxDeliver);
1✔
321
    }
322

323
    /**
324
     * Gets the filter subject of this consumer configuration.
325
     * With the introduction of multiple filter subjects, this method will
326
     * return null unless there is exactly one filter subject.
327
     * @return the first filter subject.
328
     */
329
    @Nullable
330
    public String getFilterSubject() {
331
        return filterSubjects == null || filterSubjects.size() != 1 ? null : filterSubjects.get(0);
1✔
332
    }
333

334
    /**
335
     * Gets the filter subjects as a list. May be null, otherwise won't be empty
336
     * @return the list
337
     */
338
    @Nullable
339
    public List<String> getFilterSubjects() {
340
        return filterSubjects;
1✔
341
    }
342

343
    /**
344
     * Gets the priority groups as a list. May be null, otherwise won't be empty
345
     * @return the list
346
     */
347
    @Nullable
348
    public List<String> getPriorityGroups() {
349
        return priorityGroups;
1✔
350
    }
351

352
    /**
353
     * Whether there are multiple filter subjects for this consumer configuration.
354
     * @return true if there are multiple filter subjects
355
     */
356
    public boolean hasMultipleFilterSubjects() {
357
        return filterSubjects != null && filterSubjects.size() > 1;
1✔
358
    }
359

360
    /**
361
     * Gets the replay policy of this consumer configuration.
362
     * @return the replay policy.
363
     */
364
    @NonNull
365
    public ReplayPolicy getReplayPolicy() {
366
        return GetOrDefault(replayPolicy);
1✔
367
    }
368

369
    /**
370
     * Gets the rate limit for this consumer configuration.
371
     * @return the rate limit in bits per second
372
     */
373
    public long getRateLimit() {
374
        return getOrUnsetUlong(rateLimit);
1✔
375
    }
376

377
    /**
378
     * Gets the maximum ack pending configuration.
379
     * @return maximum ack pending.
380
     */
381
    public long getMaxAckPending() {
382
        return getOrUnset(maxAckPending);
1✔
383
    }
384

385
    /**
386
     * Gets the sample frequency.
387
     * @return sampleFrequency.
388
     */
389
    @Nullable
390
    public String getSampleFrequency() {
391
        return sampleFrequency;
1✔
392
    }
393

394
    /**
395
     * Gets the idle heart beat wait time
396
     * @return the idle heart beat wait duration.
397
     */
398
    @Nullable
399
    public Duration getIdleHeartbeat() {
400
        return idleHeartbeat;
1✔
401
    }
402

403
    /**
404
     * Get the flow control flag indicating whether it's on or off
405
     * @return the flow control mode
406
     */
407
    public boolean isFlowControl() {
408
        // The way the builder and json reading works it's never false if it's not null
409
        // but this way I can make code coverage happy and not assume.
410
        return Boolean.TRUE.equals(flowControl);
1✔
411
    }
412

413
    /**
414
     * Get the number of pulls that can be outstanding on a pull consumer
415
     * @return the max pull waiting
416
     */
417
    public long getMaxPullWaiting() {
418
        return getOrUnset(maxPullWaiting);
1✔
419
    }
420

421
    /**
422
     * Get the header only flag indicating whether it's on or off
423
     * @return the headers only mode
424
     */
425
    public boolean isHeadersOnly() {
426
        return headersOnly != null && headersOnly;
1✔
427
    }
428

429
    /**
430
     * Get the mem storage flag whether it's on or off.
431
     * @return the mem storage mode
432
     */
433
    public boolean isMemStorage() {
434
        return memStorage != null && memStorage;
1✔
435
    }
436

437
    /**
438
     * Get the max batch size for the server to allow on pull requests.
439
     * @return the max batch size
440
     */
441
    public long getMaxBatch() {
442
        return getOrUnset(maxBatch);
1✔
443
    }
444

445
    /**
446
     * Get the max bytes size for the server to allow on pull requests.
447
     * @return the max byte size
448
     */
449
    public long getMaxBytes() {
450
        return getOrUnset(maxBytes);
1✔
451
    }
452

453
    /**
454
     * Get the max amount of expire time for the server to allow on pull requests.
455
     * @return the max expire
456
     */
457
    @Nullable
458
    public Duration getMaxExpires() {
459
        return maxExpires;
1✔
460
    }
461

462
    /**
463
     * Get the amount of time before the consumer is deemed inactive.
464
     * @return the inactive threshold
465
     */
466
    @Nullable
467
    public Duration getInactiveThreshold() {
468
        return inactiveThreshold;
1✔
469
    }
470

471
    /**
472
     * Get the backoff list; may be empty, will never be null.
473
     * @return the list
474
     */
475
    @NonNull
476
    public List<Duration> getBackoff() {
477
        return backoff == null ? Collections.emptyList() : backoff;
1✔
478
    }
479

480
    /**
481
     * Metadata for the consumer; may be empty, will never be null.
482
     * @return the metadata map
483
     */
484
    @NonNull
485
    public Map<String, String> getMetadata() {
486
        return metadata == null ? Collections.emptyMap() : metadata;
1✔
487
    }
488

489
    /**
490
     * Get the number of consumer replicas.
491
     * @return the replicas count
492
     */
493
    public int getNumReplicas() { return getOrUnset(numReplicas); }
1✔
494

495
    /**
496
     * Get the time until the consumer is paused.
497
     * @return paused until time
498
     */
499
    @Nullable
500
    public ZonedDateTime getPauseUntil() {
501
        return pauseUntil;
1✔
502
    }
503

504
    /**
505
     * Gets the priority policy of this consumer configuration. Defaults to PriorityPolicy.None
506
     * @return the priority policy.
507
     */
508
    @NonNull
509
    public PriorityPolicy getPriorityPolicy() {
510
        return GetOrDefault(priorityPolicy);
1✔
511
    }
512

513
    /**
514
     * For pinned_client priority policy how long before the client times out
515
     * @return the duration
516
     */
517
    @Nullable
518
    public Duration getPriorityTimeout() {
519
        return priorityTimeout;
1✔
520
    }
521

522
    /**
523
     * Gets whether deliver policy of this consumer configuration was set or left unset
524
     * @return true if the policy was set, false if the policy was not set
525
     */
526
    public boolean deliverPolicyWasSet() {
527
        return deliverPolicy != null;
1✔
528
    }
529

530
    /**
531
     * Gets whether ack policy for this consumer configuration was set or left unset
532
     * @return true if the policy was set, false if the policy was not set
533
     */
534
    public boolean ackPolicyWasSet() {
535
        return ackPolicy != null;
1✔
536
    }
537

538
    /**
539
     * Gets whether replay policy for this consumer configuration was set or left unset
540
     * @return true if the policy was set, false if the policy was not set
541
     */
542
    public boolean replayPolicyWasSet() {
543
        return replayPolicy != null;
1✔
544
    }
545

546
    /**
547
     * Gets whether start sequence for this consumer configuration was set or left unset
548
     * @return true if the start sequence was set by the user
549
     */
550
    public boolean startSeqWasSet() {
551
        return startSeq != null;
1✔
552
    }
553

554
    /**
555
     * Gets whether max deliver for this consumer configuration was set or left unset
556
     * @return true if max deliver was set by the user
557
     */
558
    public boolean maxDeliverWasSet() {
559
        return maxDeliver != null;
1✔
560
    }
561

562
    /**
563
     * Gets whether rate limit for this consumer configuration was set or left unset
564
     * @return true if rate limit was set by the user
565
     */
566
    public boolean rateLimitWasSet() {
567
        return rateLimit != null;
1✔
568
    }
569

570
    /**
571
     * Gets whether max ack pending for this consumer configuration was set or left unset
572
     * @return true if max ack pending was set by the user
573
     */
574
    public boolean maxAckPendingWasSet() {
575
        return maxAckPending != null;
1✔
576
    }
577

578
    /**
579
     * Gets whether max pull waiting for this consumer configuration was set or left unset
580
     * @return true if max pull waiting was set by the user
581
     */
582
    public boolean maxPullWaitingWasSet() {
583
        return maxPullWaiting != null;
1✔
584
    }
585

586
    /**
587
     * Gets whether max batch for this consumer configuration was set or left unset
588
     * @return true if max batch was set by the user
589
     */
590
    public boolean maxBatchWasSet() {
591
        return maxBatch != null;
1✔
592
    }
593

594
    /**
595
     * Gets whether max bytes for this consumer configuration was set or left unset
596
     * @return true if max bytes was set by the user
597
     */
598
    public boolean maxBytesWasSet() {
599
        return maxBytes != null;
1✔
600
    }
601

602
    /**
603
     * Gets whether flow control for this consumer configuration was set or left unset
604
     * @return true if flow control was set, false if it was not set
605
     */
606
    public boolean flowControlWasSet() {
607
        return flowControl != null;
1✔
608
    }
609

610
    /**
611
     * Gets whether headers only for this consumer configuration was set or left unset
612
     * @return true if headers only was set, false if it was not set
613
     */
614
    public boolean headersOnlyWasSet() {
615
        return headersOnly != null;
1✔
616
    }
617

618
    /**
619
     * Gets whether mem storage for this consumer configuration was set or left unset
620
     * @return true if mem storage was set, false if it was not set
621
     */
622
    public boolean memStorageWasSet() {
623
        return memStorage != null;
1✔
624
    }
625

626
    /**
627
     * Gets whether num replicas for this consumer configuration was set or left unset
628
     * @return true if num replicas was set by the user
629
     */
630
    public boolean numReplicasWasSet() {
631
        return numReplicas != null;
1✔
632
    }
633

634
    /**
635
     * Gets whether backoff for this consumer configuration was set or left unset
636
     * @return true if backoff was set by the user
637
     */
638
    public boolean backoffWasSet() {
639
        return backoff != null;
1✔
640
    }
641

642
    /**
643
     * Gets whether metadata for this consumer configuration was set or left unset
644
     * @return true if metadata was set by the user
645
     */
646
    public boolean metadataWasSet() {
647
        return metadata != null;
1✔
648
    }
649

650
    /**
651
     * Gets whether priority policy for this consumer configuration was set or left unset
652
     * @return true if the policy was set, false if the policy was not set
653
     */
654
    public boolean priorityPolicyWasSet() {
655
        return priorityPolicy != null;
1✔
656
    }
657

658
    /**
659
     * Gets whether priority timeout for this consumer configuration was set or left unset
660
     * @return true if the timeout was set, false if the timeout was not set
661
     */
662
    public boolean priorityTimeoutWasSet() {
NEW
663
        return priorityTimeout != null;
×
664
    }
665

666
    /**
667
     * Creates a builder for the options.
668
     * @return a consumer configuration builder
669
     */
670
    public static Builder builder() {
671
        return new Builder();
1✔
672
    }
673

674
    /**
675
     * Creates a builder for the options.
676
     * @param cc the consumer configuration
677
     * @return a consumer configuration builder
678
     */
679
    public static Builder builder(ConsumerConfiguration cc) {
680
        return cc == null ? new Builder() : new Builder(cc);
1✔
681
    }
682

683
    /**
684
     * ConsumerConfiguration is created using a Builder. The builder supports chaining and will
685
     * create a default set of options if no methods are called.
686
     *
687
     * <p>{@code new ConsumerConfiguration.Builder().build()} will create a default ConsumerConfiguration.
688
     *
689
     */
690
    public static class Builder {
691
        private DeliverPolicy deliverPolicy;
692
        private AckPolicy ackPolicy;
693
        private ReplayPolicy replayPolicy;
694

695
        private String description;
696
        private String durable;
697
        private String name;
698
        private String deliverSubject;
699
        private String deliverGroup;
700
        private String sampleFrequency;
701

702
        private ZonedDateTime startTime;
703
        private Duration ackWait;
704
        private Duration idleHeartbeat;
705
        private Duration maxExpires;
706
        private Duration inactiveThreshold;
707

708
        private Long startSeq;
709
        private Integer maxDeliver;
710
        private Long rateLimit;
711
        private Integer maxAckPending;
712
        private Integer maxPullWaiting;
713
        private Integer maxBatch;
714
        private Integer maxBytes;
715
        private Integer numReplicas;
716
        private ZonedDateTime pauseUntil;
717

718
        private Boolean flowControl;
719
        private Boolean headersOnly;
720
        private Boolean memStorage;
721

722
        private List<Duration> backoff;
723
        private Map<String, String> metadata;
724
        private List<String> filterSubjects;
725

726
        private List<String> priorityGroups;
727
        private PriorityPolicy priorityPolicy;
728
        private Duration priorityTimeout;
729

730
        /**
731
         * Construct the builder
732
         */
733
        public Builder() {}
1✔
734

735
        /**
736
         * Construct the builder and initialize values with the existing ConsumerConfiguration
737
         * @param cc the consumer configuration to clone
738
         */
739
        public Builder(ConsumerConfiguration cc) {
1✔
740
            if (cc != null) {
1✔
741
                this.deliverPolicy = cc.deliverPolicy;
1✔
742
                this.ackPolicy = cc.ackPolicy;
1✔
743
                this.replayPolicy = cc.replayPolicy;
1✔
744

745
                this.description = cc.description;
1✔
746
                this.durable = cc.durable;
1✔
747
                this.name = cc.name;
1✔
748
                this.deliverSubject = cc.deliverSubject;
1✔
749
                this.deliverGroup = cc.deliverGroup;
1✔
750
                this.sampleFrequency = cc.sampleFrequency;
1✔
751

752
                this.startTime = cc.startTime;
1✔
753
                this.ackWait = cc.ackWait;
1✔
754
                this.idleHeartbeat = cc.idleHeartbeat;
1✔
755
                this.maxExpires = cc.maxExpires;
1✔
756
                this.inactiveThreshold = cc.inactiveThreshold;
1✔
757

758
                this.startSeq = cc.startSeq;
1✔
759
                this.maxDeliver = cc.maxDeliver;
1✔
760
                this.rateLimit = cc.rateLimit;
1✔
761
                this.maxAckPending = cc.maxAckPending;
1✔
762
                this.maxPullWaiting = cc.maxPullWaiting;
1✔
763
                this.maxBatch = cc.maxBatch;
1✔
764
                this.maxBytes = cc.maxBytes;
1✔
765
                this.numReplicas = cc.numReplicas;
1✔
766
                this.pauseUntil = cc.pauseUntil;
1✔
767

768
                this.flowControl = cc.flowControl;
1✔
769
                this.headersOnly = cc.headersOnly;
1✔
770
                this.memStorage = cc.memStorage;
1✔
771

772
                if (cc.backoff != null) {
1✔
773
                    this.backoff = new ArrayList<>(cc.backoff);
×
774
                }
775
                if (cc.metadata != null) {
1✔
776
                    this.metadata = new HashMap<>(cc.metadata);
1✔
777
                }
778
                if (cc.filterSubjects != null) {
1✔
779
                    this.filterSubjects = new ArrayList<>(cc.filterSubjects);
1✔
780
                }
781

782
                if (cc.priorityGroups != null) {
1✔
783
                    this.priorityGroups = new ArrayList<>(cc.priorityGroups);
×
784
                }
785
                this.priorityPolicy = cc.priorityPolicy;
1✔
786
                this.priorityTimeout = cc.priorityTimeout;
1✔
787
            }
788
        }
1✔
789

790
        /**
791
         * Initialize values from the json string.
792
         * @param json the json string to parse
793
         * @return the builder
794
         * @throws JsonParseException if the json is invalid
795
         */
796
        public Builder json(String json) throws JsonParseException {
797
            return jsonValue(JsonParser.parse(json));
1✔
798
        }
799

800
        /**
801
         * Initialize values from the JsonValue object.
802
         * @param jsonValue the json value object
803
         * @return the builder
804
         */
805
        public Builder jsonValue(JsonValue jsonValue) {
806
            deliverPolicy(DeliverPolicy.get(readString(jsonValue, DELIVER_POLICY)));
1✔
807
            ackPolicy(AckPolicy.get(readString(jsonValue, ACK_POLICY)));
1✔
808

809
            replayPolicy(ReplayPolicy.get(readString(jsonValue, REPLAY_POLICY)));
1✔
810

811
            description(readString(jsonValue, DESCRIPTION));
1✔
812
            durable(readString(jsonValue, DURABLE_NAME));
1✔
813
            name(readString(jsonValue, NAME));
1✔
814
            deliverSubject(readString(jsonValue, DELIVER_SUBJECT));
1✔
815
            deliverGroup(readString(jsonValue, DELIVER_GROUP));
1✔
816
            sampleFrequency(readString(jsonValue, SAMPLE_FREQ));
1✔
817
            startTime(readDate(jsonValue, OPT_START_TIME));
1✔
818
            ackWait(readNanos(jsonValue, ACK_WAIT));
1✔
819
            maxExpires(readNanos(jsonValue, MAX_EXPIRES));
1✔
820
            inactiveThreshold(readNanos(jsonValue, INACTIVE_THRESHOLD));
1✔
821

822
            startSequence(readLong(jsonValue, OPT_START_SEQ));
1✔
823
            maxDeliver(readLong(jsonValue, MAX_DELIVER, INTEGER_UNSET));
1✔
824
            rateLimit(readLong(jsonValue, RATE_LIMIT_BPS));
1✔
825
            maxAckPending(readLong(jsonValue, MAX_ACK_PENDING));
1✔
826
            maxPullWaiting(readLong(jsonValue, MAX_WAITING));
1✔
827
            maxBatch(readLong(jsonValue, MAX_BATCH));
1✔
828
            maxBytes(readLong(jsonValue, MAX_BYTES));
1✔
829

830
            Integer r = readInteger(jsonValue, NUM_REPLICAS);
1✔
831
            if (r != null) {
1✔
832
                if (r == 0) {
1✔
833
                    numReplicas = 0;
1✔
834
                }
835
                else {
836
                    numReplicas(r);
1✔
837
                }
838
            }
839

840
            pauseUntil(readDate(jsonValue, PAUSE_UNTIL));
1✔
841

842
            Duration idleHeartbeat = readNanos(jsonValue, IDLE_HEARTBEAT);
1✔
843
            if (idleHeartbeat != null) {
1✔
844
                if (readBoolean(jsonValue, FLOW_CONTROL, false)) {
1✔
845
                    flowControl(idleHeartbeat);
1✔
846
                }
847
                else {
848
                    idleHeartbeat(idleHeartbeat);
1✔
849
                }
850
            }
851

852
            headersOnly(readBoolean(jsonValue, HEADERS_ONLY, null));
1✔
853
            memStorage(readBoolean(jsonValue, MEM_STORAGE, null));
1✔
854

855
            //noinspection DataFlowIssue readNanosList with false ensures not null;
856
            backoff(readNanosList(jsonValue, BACKOFF, false).toArray(new Duration[0]));
1✔
857

858
            metadata(readStringStringMap(jsonValue, METADATA));
1✔
859

860
            String fs = emptyAsNull(readString(jsonValue, FILTER_SUBJECT));
1✔
861
            if (fs == null) {
1✔
862
                filterSubjects(readOptionalStringList(jsonValue, FILTER_SUBJECTS));
1✔
863
            }
864
            else {
865
                filterSubject(fs);
1✔
866
            }
867

868
            priorityGroups(readOptionalStringList(jsonValue, PRIORITY_GROUPS));
1✔
869
            priorityPolicy(PriorityPolicy.get(readString(jsonValue, PRIORITY_POLICY)));
1✔
870
            priorityTimeout(readNanos(jsonValue, PRIORITY_TIMEOUT));
1✔
871

872
            return this;
1✔
873
        }
874

875
        /**
876
         * Sets the description
877
         * @param description the description
878
         * @return the builder
879
         */
880
        public Builder description(String description) {
881
            this.description = emptyAsNull(description);
1✔
882
            return this;
1✔
883
        }
884

885
        /**
886
         * Sets the name of the durable consumer.
887
         * Null or empty clears the field.
888
         * @param durable name of the durable consumer.
889
         * @return the builder
890
         */
891
        public Builder durable(String durable) {
892
            this.durable = validateDurable(durable, false);
1✔
893
            return this;
1✔
894
        }
895

896
        /**
897
         * Sets the name of the consumer.
898
         * Null or empty clears the field.
899
         * @param name name of the consumer.
900
         * @return the builder
901
         */
902
        public Builder name(String name) {
903
            this.name = validateConsumerName(name, false);
1✔
904
            return this;
1✔
905
        }
906

907
        /**
908
         * Sets the delivery policy of the ConsumerConfiguration.
909
         * @param policy the delivery policy.
910
         * @return Builder
911
         */
912
        public Builder deliverPolicy(DeliverPolicy policy) {
913
            this.deliverPolicy = policy;
1✔
914
            return this;
1✔
915
        }
916

917
        /**
918
         * Sets the subject to deliver messages to.
919
         * <p> By setting the deliverySubject this configuration will create a <b>push consumer</b>. When left empty or set to NULL a pull consumer will be created.
920
         * @param subject the subject.
921
         * @return the builder
922
         */
923
        public Builder deliverSubject(String subject) {
924
            this.deliverSubject = emptyAsNull(subject);
1✔
925
            return this;
1✔
926
        }
927

928
        /**
929
         * Sets the group to deliver messages to.
930
         * @param group the delivery group.
931
         * @return the builder
932
         */
933
        public Builder deliverGroup(String group) {
934
            this.deliverGroup = emptyAsNull(group);
1✔
935
            return this;
1✔
936
        }
937

938
        /**
939
         * Sets the start sequence of the ConsumerConfiguration or null to unset / clear.
940
         * @param sequence the start sequence
941
         * @return Builder
942
         */
943
        public Builder startSequence(Long sequence) {
944
            this.startSeq = normalizeUlong(sequence);
1✔
945
            return this;
1✔
946
        }
947

948
        /**
949
         * Sets the start sequence of the ConsumerConfiguration.
950
         * @param sequence the start sequence
951
         * @return Builder
952
         */
953
        public Builder startSequence(long sequence) {
954
            this.startSeq = normalizeUlong(sequence);
1✔
955
            return this;
1✔
956
        }
957

958
        /**
959
         * Sets the start time of the ConsumerConfiguration.
960
         * @param startTime the start time
961
         * @return Builder
962
         */
963
        public Builder startTime(ZonedDateTime startTime) {
964
            this.startTime = startTime;
1✔
965
            return this;
1✔
966
        }
967

968
        /**
969
         * Sets the acknowledgement policy of the ConsumerConfiguration.
970
         * @param policy the acknowledgement policy.
971
         * @return Builder
972
         */
973
        public Builder ackPolicy(AckPolicy policy) {
974
            this.ackPolicy = policy;
1✔
975
            return this;
1✔
976
        }
977

978
        /**
979
         * Sets the acknowledgement wait duration of the ConsumerConfiguration.
980
         * @param timeout the wait timeout
981
         * @return Builder
982
         */
983
        public Builder ackWait(Duration timeout) {
984
            this.ackWait = normalize(timeout);
1✔
985
            return this;
1✔
986
        }
987

988
        /**
989
         * Sets the acknowledgement wait duration of the ConsumerConfiguration.
990
         * @param timeoutMillis the wait timeout in milliseconds
991
         * @return Builder
992
         */
993
        public Builder ackWait(long timeoutMillis) {
994
            this.ackWait = normalizeDuration(timeoutMillis);
1✔
995
            return this;
1✔
996
        }
997

998
        /**
999
         * Sets the maximum delivery amount of the ConsumerConfiguration or null to unset / clear.
1000
         * @param maxDeliver the maximum delivery amount
1001
         * @return Builder
1002
         */
1003
        public Builder maxDeliver(Long maxDeliver) {
1004
            this.maxDeliver = normalize(maxDeliver, MAX_DELIVER_MIN);
1✔
1005
            return this;
1✔
1006
        }
1007

1008
        /**
1009
         * Sets the maximum delivery amount of the ConsumerConfiguration.
1010
         * @param maxDeliver the maximum delivery amount
1011
         * @return Builder
1012
         */
1013
        public Builder maxDeliver(long maxDeliver) {
1014
            this.maxDeliver = normalize(maxDeliver, MAX_DELIVER_MIN);
1✔
1015
            return this;
1✔
1016
        }
1017

1018
        /**
1019
         * Sets the filter subject of the ConsumerConfiguration.
1020
         * Replaces any other filter subjects set in the builder
1021
         * @param filterSubject the filter subject
1022
         * @return Builder
1023
         */
1024
        public Builder filterSubject(String filterSubject) {
1025
            if (nullOrEmpty(filterSubject)) {
1✔
1026
                this.filterSubjects = null;
1✔
1027
            }
1028
            else {
1029
                this.filterSubjects = Collections.singletonList(filterSubject);
1✔
1030
            }
1031
            return this;
1✔
1032
        }
1033

1034
        /**
1035
         * Sets the filter subjects of the ConsumerConfiguration.
1036
         * Replaces any other filter subjects set in the builder
1037
         * @param filterSubjects one or more filter subjects
1038
         * @return Builder
1039
         */
1040
        public Builder filterSubjects(String... filterSubjects) {
1041
            return filterSubjects(Arrays.asList(filterSubjects));
1✔
1042
        }
1043

1044
        /**
1045
         * Sets the filter subjects of the ConsumerConfiguration.
1046
         * Replaces any other filter subjects set in the builder
1047
         * @param filterSubjects the list of filter subjects
1048
         * @return Builder
1049
         */
1050
        public Builder filterSubjects(List<String> filterSubjects) {
1051
            this.filterSubjects = new ArrayList<>();
1✔
1052
            if (filterSubjects != null) {
1✔
1053
                for (String fs : filterSubjects) {
1✔
1054
                    if (!nullOrEmpty(fs)) {
1✔
1055
                        this.filterSubjects.add(fs);
1✔
1056
                    }
1057
                }
1✔
1058
            }
1059
            if (this.filterSubjects.isEmpty()) {
1✔
1060
                this.filterSubjects = null;
1✔
1061
            }
1062
            return this;
1✔
1063
        }
1064

1065
        /**
1066
         * Sets the replay policy of the ConsumerConfiguration.
1067
         * @param policy the replay policy.
1068
         * @return Builder
1069
         */
1070
        public Builder replayPolicy(ReplayPolicy policy) {
1071
            this.replayPolicy = policy;
1✔
1072
            return this;
1✔
1073
        }
1074

1075
        /**
1076
         * Sets the sample frequency of the ConsumerConfiguration.
1077
         * @param frequency the frequency
1078
         * @return Builder
1079
         */
1080
        public Builder sampleFrequency(String frequency) {
1081
            this.sampleFrequency = emptyAsNull(frequency);
1✔
1082
            return this;
1✔
1083
        }
1084

1085
        /**
1086
         * Set the rate limit of the ConsumerConfiguration or null to unset / clear.
1087
         * @param bitsPerSecond bits per second to deliver
1088
         * @return Builder
1089
         */
1090
        public Builder rateLimit(Long bitsPerSecond) {
1091
            this.rateLimit = normalizeUlong(bitsPerSecond);
1✔
1092
            return this;
1✔
1093
        }
1094

1095
        /**
1096
         * Set the rate limit of the ConsumerConfiguration.
1097
         * @param bitsPerSecond bits per second to deliver
1098
         * @return Builder
1099
         */
1100
        public Builder rateLimit(long bitsPerSecond) {
1101
            this.rateLimit = normalizeUlong(bitsPerSecond);
1✔
1102
            return this;
1✔
1103
        }
1104

1105
        /**
1106
         * Sets the maximum ack pending or null to unset / clear.
1107
         * @param maxAckPending maximum pending acknowledgements.
1108
         * @return Builder
1109
         */
1110
        public Builder maxAckPending(Long maxAckPending) {
1111
            this.maxAckPending = normalize(maxAckPending, STANDARD_MIN);
1✔
1112
            return this;
1✔
1113
        }
1114

1115
        /**
1116
         * Sets the maximum ack pending.
1117
         * @param maxAckPending maximum pending acknowledgements.
1118
         * @return Builder
1119
         */
1120
        public Builder maxAckPending(long maxAckPending) {
1121
            this.maxAckPending = normalize(maxAckPending, STANDARD_MIN);
1✔
1122
            return this;
1✔
1123
        }
1124

1125
        /**
1126
         * sets the idle heart beat wait time
1127
         * @param idleHeartbeat the idle heart beat duration
1128
         * @return Builder
1129
         */
1130
        public Builder idleHeartbeat(Duration idleHeartbeat) {
1131
            if (idleHeartbeat == null) {
1✔
1132
                this.idleHeartbeat = null;
1✔
1133
            }
1134
            else {
1135
                long nanos = idleHeartbeat.toNanos();
1✔
1136
                if (nanos <= DURATION_UNSET_LONG) {
1✔
1137
                    this.idleHeartbeat = DURATION_UNSET;
1✔
1138
                }
1139
                else if (nanos < MIN_IDLE_HEARTBEAT_NANOS) {
1✔
1140
                    throw new IllegalArgumentException("Duration must be greater than or equal to " + MIN_IDLE_HEARTBEAT_NANOS + " nanos.");
1✔
1141
                }
1142
                else {
1143
                    this.idleHeartbeat = idleHeartbeat;
1✔
1144
                }
1145
            }
1146
            return this;
1✔
1147
        }
1148

1149
        /**
1150
         * sets the idle heart beat wait time
1151
         * @param idleHeartbeatMillis the idle heart beat duration in milliseconds
1152
         * @return Builder
1153
         */
1154
        public Builder idleHeartbeat(long idleHeartbeatMillis) {
1155
            if (idleHeartbeatMillis <= DURATION_UNSET_LONG) {
1✔
1156
                this.idleHeartbeat = DURATION_UNSET;
1✔
1157
            }
1158
            else if (idleHeartbeatMillis < MIN_IDLE_HEARTBEAT_MILLIS) {
1✔
1159
                throw new IllegalArgumentException("Duration must be greater than or equal to " + MIN_IDLE_HEARTBEAT_MILLIS + " milliseconds.");
1✔
1160
            }
1161
            else {
1162
                this.idleHeartbeat = Duration.ofMillis(idleHeartbeatMillis);
1✔
1163
            }
1164
            return this;
1✔
1165
        }
1166

1167
        /**
1168
         * set the flow control on and set the idle heartbeat
1169
         * @param idleHeartbeat the idle heart beat duration
1170
         * @return Builder
1171
         */
1172
        public Builder flowControl(Duration idleHeartbeat) {
1173
            this.flowControl = true;
1✔
1174
            return idleHeartbeat(idleHeartbeat);
1✔
1175
        }
1176

1177
        /**
1178
         * set the flow control on and set the idle heartbeat
1179
         * @param idleHeartbeatMillis the idle heart beat duration in milliseconds
1180
         * @return Builder
1181
         */
1182
        public Builder flowControl(long idleHeartbeatMillis) {
1183
            this.flowControl = true;
1✔
1184
            return idleHeartbeat(idleHeartbeatMillis);
1✔
1185
        }
1186

1187
        /**
1188
         * sets the max amount of expire time for the server to allow on pull requests.
1189
         * @param maxExpires the max expire duration
1190
         * @return Builder
1191
         */
1192
        public Builder maxExpires(Duration maxExpires) {
1193
            this.maxExpires = normalize(maxExpires);
1✔
1194
            return this;
1✔
1195
        }
1196

1197
        /**
1198
         * sets the max amount of expire time for the server to allow on pull requests.
1199
         * @param maxExpires the max expire duration in milliseconds
1200
         * @return Builder
1201
         */
1202
        public Builder maxExpires(long maxExpires) {
1203
            this.maxExpires = normalizeDuration(maxExpires);
1✔
1204
            return this;
1✔
1205
        }
1206

1207
        /**
1208
         * sets the amount of time before the consumer is deemed inactive.
1209
         * @param inactiveThreshold the threshold duration
1210
         * @return Builder
1211
         */
1212
        public Builder inactiveThreshold(Duration inactiveThreshold) {
1213
            this.inactiveThreshold = normalize(inactiveThreshold);
1✔
1214
            return this;
1✔
1215
        }
1216

1217
        /**
1218
         * sets the amount of time before the consumer is deemed inactive.
1219
         * @param inactiveThreshold the threshold duration in milliseconds
1220
         * @return Builder
1221
         */
1222
        public Builder inactiveThreshold(long inactiveThreshold) {
1223
            this.inactiveThreshold = normalizeDuration(inactiveThreshold);
1✔
1224
            return this;
1✔
1225
        }
1226

1227
        /**
1228
         * sets the max pull waiting, the number of pulls that can be outstanding on a pull consumer, pulls received after this is reached are ignored.
1229
         * Use null to unset / clear.
1230
         * @param maxPullWaiting the max pull waiting
1231
         * @return Builder
1232
         */
1233
        public Builder maxPullWaiting(Long maxPullWaiting) {
1234
            this.maxPullWaiting = normalize(maxPullWaiting, STANDARD_MIN);
1✔
1235
            return this;
1✔
1236
        }
1237

1238
        /**
1239
         * sets the max pull waiting, the number of pulls that can be outstanding on a pull consumer, pulls received after this is reached are ignored.
1240
         * @param maxPullWaiting the max pull waiting
1241
         * @return Builder
1242
         */
1243
        public Builder maxPullWaiting(long maxPullWaiting) {
1244
            this.maxPullWaiting = normalize(maxPullWaiting, STANDARD_MIN);
1✔
1245
            return this;
1✔
1246
        }
1247

1248
        /**
1249
         * sets the max batch size for the server to allow on pull requests.
1250
         * @param maxBatch the max batch size
1251
         * @return Builder
1252
         */
1253
        public Builder maxBatch(Long maxBatch) {
1254
            this.maxBatch = normalize(maxBatch, STANDARD_MIN);
1✔
1255
            return this;
1✔
1256
        }
1257

1258
        /**
1259
         * sets the max batch size for the server to allow on pull requests.
1260
         * @param maxBatch the max batch size
1261
         * @return Builder
1262
         */
1263
        public Builder maxBatch(long maxBatch) {
1264
            this.maxBatch = normalize(maxBatch, STANDARD_MIN);
1✔
1265
            return this;
1✔
1266
        }
1267

1268
        /**
1269
         * sets the max bytes size for the server to allow on pull requests.
1270
         * @param maxBytes the max bytes size
1271
         * @return Builder
1272
         */
1273
        public Builder maxBytes(Long maxBytes) {
1274
            this.maxBytes = normalize(maxBytes, STANDARD_MIN);
1✔
1275
            return this;
1✔
1276
        }
1277

1278
        /**
1279
         * sets the max bytes size for the server to allow on pull requests.
1280
         * @param maxBytes the max bytes size
1281
         * @return Builder
1282
         */
1283
        public Builder maxBytes(long maxBytes) {
1284
            this.maxBytes = normalize(maxBytes, STANDARD_MIN);
1✔
1285
            return this;
1✔
1286
        }
1287

1288
        /**
1289
         * set the number of replicas for the consumer. When set do not inherit the
1290
         * replica count from the stream but specifically set it to this amount.
1291
         * @param numReplicas number of replicas for the consumer
1292
         * @return Builder
1293
         */
1294
        public Builder numReplicas(Integer numReplicas) {
1295
            this.numReplicas = numReplicas == null ? null : validateNumberOfReplicas(numReplicas);
1✔
1296
            return this;
1✔
1297
        }
1298

1299
        /**
1300
         * Sets the time to pause the consumer until.
1301
         * @param pauseUntil the time to pause
1302
         * @return Builder
1303
         */
1304
        public Builder pauseUntil(ZonedDateTime pauseUntil) {
1305
            this.pauseUntil = pauseUntil;
1✔
1306
            return this;
1✔
1307
        }
1308

1309
        /**
1310
         * set the headers only flag saying to deliver only the headers of
1311
         * messages in the stream and not the bodies
1312
         * @param headersOnly the flag
1313
         * @return Builder
1314
         */
1315
        public Builder headersOnly(Boolean headersOnly) {
1316
            this.headersOnly = headersOnly;
1✔
1317
            return this;
1✔
1318
        }
1319

1320
        /**
1321
         * set the mem storage flag to force the consumer state to be kept
1322
         * in memory rather than inherit the setting from the stream
1323
         * @param memStorage the flag
1324
         * @return Builder
1325
         */
1326
        public Builder memStorage(Boolean memStorage) {
1327
            this.memStorage = memStorage;
1✔
1328
            return this;
1✔
1329
        }
1330

1331
        /**
1332
         * Set the list of backoff. Will override ackwait setting.
1333
         * @see <a href="https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#delivery-reliability">Delivery Reliability</a>
1334
         * @param backoffs zero or more backoff durations or an array of backoffs
1335
         * @return Builder
1336
         */
1337
        public Builder backoff(Duration... backoffs) {
1338
            if (backoffs == null || (backoffs.length == 1 && backoffs[0] == null))
1✔
1339
            {
1340
                backoff = null;
1✔
1341
            }
1342
            else
1343
            {
1344
                backoff = new ArrayList<>();
1✔
1345
                for (Duration d : backoffs)
1✔
1346
                {
1347
                    if (d != null)
1✔
1348
                    {
1349
                        if (d.toNanos() < DURATION_MIN_LONG)
1✔
1350
                        {
1351
                            throw new IllegalArgumentException("Backoff cannot be less than " + DURATION_MIN_LONG);
1✔
1352
                        }
1353
                        backoff.add(d);
1✔
1354
                    }
1355
                }
1356
            }
1357
            return this;
1✔
1358
        }
1359

1360
        /**
1361
         * Set the list of backoff. Will override ackwait setting.
1362
         * @see <a href="https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#delivery-reliability">Delivery Reliability</a>
1363
         * @param backoffsMillis zero or more backoff in millis or an array of backoffsMillis
1364
         * @return Builder
1365
         */
1366
        public Builder backoff(long... backoffsMillis) {
1367
            if (backoffsMillis == null) {
1✔
1368
                backoff = null;
1✔
1369
            }
1370
            else {
1371
                backoff = new ArrayList<>();
1✔
1372
                for (long ms : backoffsMillis) {
1✔
1373
                    if (ms < DURATION_MIN_LONG) {
1✔
1374
                        throw new IllegalArgumentException("Backoff cannot be less than " + DURATION_MIN_LONG);
1✔
1375
                    }
1376
                    this.backoff.add(Duration.ofMillis(ms));
1✔
1377
                }
1378
            }
1379
            return this;
1✔
1380
        }
1381

1382
        /**
1383
         * Sets the metadata for the configuration
1384
         * @param metadata the metadata map
1385
         * @return Builder
1386
         */
1387
        public Builder metadata(Map<String, String> metadata) {
1388
            this.metadata = metadata == null || metadata.isEmpty() ? null : new HashMap<>(metadata);
1✔
1389
            return this;
1✔
1390
        }
1391

1392
        /**
1393
         * Sets the priority groups of the ConsumerConfiguration.
1394
         * Replaces any other priority groups set in the builder
1395
         * @param priorityGroups one or more priority groups
1396
         * @return Builder
1397
         */
1398
        public Builder priorityGroups(String... priorityGroups) {
1399
            return priorityGroups(Arrays.asList(priorityGroups));
1✔
1400
        }
1401

1402
        /**
1403
         * Sets the priority groups of the ConsumerConfiguration.
1404
         * Replaces any other priority groups set in the builder
1405
         * @param priorityGroups the list of priority groups
1406
         * @return Builder
1407
         */
1408
        public Builder priorityGroups(List<String> priorityGroups) {
1409
            this.priorityGroups = new ArrayList<>();
1✔
1410
            if (priorityGroups != null) {
1✔
1411
                for (String pg : priorityGroups) {
1✔
1412
                    if (!nullOrEmpty(pg)) {
1✔
1413
                        this.priorityGroups.add(pg);
1✔
1414
                    }
1415
                }
1✔
1416
            }
1417
            if (this.priorityGroups.isEmpty()) {
1✔
1418
                this.priorityGroups = null;
1✔
1419
            }
1420
            return this;
1✔
1421
        }
1422

1423
        /**
1424
         * Sets the priority policy of the ConsumerConfiguration.
1425
         * @param policy the priority policy.
1426
         * @return Builder
1427
         */
1428
        public Builder priorityPolicy(PriorityPolicy policy) {
1429
            this.priorityPolicy = policy;
1✔
1430
            return this;
1✔
1431
        }
1432

1433
        /**
1434
         * Sets the priority policy timeout
1435
         * @param priorityTimeout the timeout
1436
         * @return Builder
1437
         */
1438
        public Builder priorityTimeout(Duration priorityTimeout) {
1439
            this.priorityTimeout = normalize(priorityTimeout);
1✔
1440
            return this;
1✔
1441
        }
1442

1443
        /**
1444
         * Sets the priority policy timeout
1445
         * @param priorityTimeoutMillis the timeout in milliseconds
1446
         * @return Builder
1447
         */
1448
        public Builder priorityTimeout(long priorityTimeoutMillis) {
NEW
1449
            this.priorityTimeout = normalizeDuration(priorityTimeoutMillis);
×
NEW
1450
            return this;
×
1451
        }
1452

1453
        /**
1454
         * Builds the ConsumerConfiguration
1455
         * @return The consumer configuration.
1456
         */
1457
        public ConsumerConfiguration build() {
1458
            validateMustMatchIfBothSupplied(name, durable, JsConsumerNameDurableMismatch);
1✔
1459
            return new ConsumerConfiguration(this);
1✔
1460
        }
1461

1462
        /**
1463
         * Builds the PushSubscribeOptions with this configuration
1464
         * @return The PushSubscribeOptions.
1465
         */
1466
        public PushSubscribeOptions buildPushSubscribeOptions() {
1467
            return PushSubscribeOptions.builder().configuration(build()).build();
1✔
1468
        }
1469

1470
        /**
1471
         * Builds the PushSubscribeOptions with this configuration.
1472
         * Providing the stream is a hint for the subscription process that
1473
         * saves a call to the server. Assumes the stream is the correct stream
1474
         * for the subject filter, otherwise the server will return an error
1475
         * which the subscription call will raise to the user.
1476
         * @param stream the stream for this consumer
1477
         * @return The PushSubscribeOptions.
1478
         */
1479
        public PushSubscribeOptions buildPushSubscribeOptions(String stream) {
1480
            return PushSubscribeOptions.builder().configuration(build()).stream(stream).build();
×
1481
        }
1482

1483
        /**
1484
         * Builds the PullSubscribeOptions with this configuration
1485
         * @return The PullSubscribeOptions.
1486
         */
1487
        public PullSubscribeOptions buildPullSubscribeOptions() {
1488
            return PullSubscribeOptions.builder().configuration(build()).build();
1✔
1489
        }
1490

1491
        /**
1492
         * Builds the PullSubscribeOptions with this configuration
1493
         * Providing the stream is a hint for the subscription process that
1494
         * saves a call to the server. Assumes the stream is the correct stream
1495
         * for the subject filter, otherwise the server will return an error
1496
         * which the subscription call will raise to the user.
1497
         * @param stream the stream for this consumer
1498
         * @return The PullSubscribeOptions.
1499
         */
1500
        public PullSubscribeOptions buildPullSubscribeOptions(String stream) {
1501
            return PullSubscribeOptions.builder().configuration(build()).stream(stream).build();
×
1502
        }
1503
    }
1504

1505
    @Override
1506
    public String toString() {
1507
        return "ConsumerConfiguration " + toJson();
1✔
1508
    }
1509

1510
    protected static int getOrUnset(Integer val)
1511
    {
1512
        return val == null ? INTEGER_UNSET : val;
1✔
1513
    }
1514

1515
    protected static long getOrUnsetUlong(Long val)
1516
    {
1517
        return val == null || val < 0 ? ULONG_UNSET : val;
1✔
1518
    }
1519

1520
    protected static Duration getOrUnset(Duration val)
1521
    {
1522
        return val == null ? DURATION_UNSET : val;
1✔
1523
    }
1524

1525
    protected static Integer normalize(Long l, int min) {
1526
        if (l == null) {
1✔
1527
            return null;
1✔
1528
        }
1529

1530
        if (l < min) {
1✔
1531
            return INTEGER_UNSET;
1✔
1532
        }
1533

1534
        if (l > Integer.MAX_VALUE) {
1✔
1535
            return Integer.MAX_VALUE;
1✔
1536
        }
1537

1538
        return l.intValue();
1✔
1539
    }
1540

1541
    protected static Long normalizeUlong(Long u)
1542
    {
1543
        return u == null ? null : u <= ULONG_UNSET ? ULONG_UNSET : u;
1✔
1544
    }
1545

1546
    protected static Duration normalize(Duration d)
1547
    {
1548
        return d == null ? null : d.toNanos() <= DURATION_UNSET_LONG ? DURATION_UNSET : d;
1✔
1549
    }
1550

1551
    protected static Duration normalizeDuration(long millis)
1552
    {
1553
        return millis <= DURATION_UNSET_LONG ? DURATION_UNSET : Duration.ofMillis(millis);
1✔
1554
    }
1555

1556
    /** Returns the given deliver policy, or {@code DEFAULT_DELIVER_POLICY} when it is null. */
    protected static DeliverPolicy GetOrDefault(DeliverPolicy p) {
        if (p == null) {
            return DEFAULT_DELIVER_POLICY;
        }
        return p;
    }
1✔
1557
    /** Returns the given ack policy, or {@code DEFAULT_ACK_POLICY} when it is null. */
    protected static AckPolicy GetOrDefault(AckPolicy p) {
        if (p == null) {
            return DEFAULT_ACK_POLICY;
        }
        return p;
    }
1✔
1558
    /** Returns the given replay policy, or {@code DEFAULT_REPLAY_POLICY} when it is null. */
    protected static ReplayPolicy GetOrDefault(ReplayPolicy p) {
        if (p == null) {
            return DEFAULT_REPLAY_POLICY;
        }
        return p;
    }
1✔
1559
    /** Returns the given priority policy, or {@code DEFAULT_PRIORITY_POLICY} when it is null. */
    protected static PriorityPolicy GetOrDefault(PriorityPolicy p) {
        if (p == null) {
            return DEFAULT_PRIORITY_POLICY;
        }
        return p;
    }
1✔
1560
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc