• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2222

25 Sep 2025 02:30PM UTC coverage: 95.522% (-0.05%) from 95.571%
#2222

push

github

web-flow
Merge pull request #1433 from nats-io/prioritized

(2.12) Prioritized Consumer Support

36 of 41 new or added lines in 10 files covered. (87.8%)

4 existing lines in 2 files now uncovered.

12137 of 12706 relevant lines covered (95.52%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.22
/src/main/java/io/nats/client/PullRequestOptions.java
1
// Copyright 2022 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client;
15

16
import io.nats.client.support.JsonSerializable;
17
import io.nats.client.support.JsonUtils;
18
import org.jspecify.annotations.NonNull;
19

20
import java.time.Duration;
21

22
import static io.nats.client.support.ApiConstants.*;
23
import static io.nats.client.support.Validator.validateGtZero;
24

25
/**
26
 * The PullRequestOptions class specifies the options for pull requests
27
 */
28
public class PullRequestOptions implements JsonSerializable {
29

30
    private final int batchSize;
31
    private final long maxBytes;
32
    private final boolean noWait;
33
    private final Duration expiresIn;
34
    private final Duration idleHeartbeat;
35
    private final String group;
36
    private final int priority;
37
    private final long minPending;
38
    private final long minAckPending;
39

40
    public PullRequestOptions(Builder b) {
1✔
41
        this.batchSize = b.batchSize;
1✔
42
        this.maxBytes = b.maxBytes;
1✔
43
        this.noWait = b.noWait;
1✔
44
        this.expiresIn = b.expiresIn;
1✔
45
        this.idleHeartbeat = b.idleHeartbeat;
1✔
46
        this.group = b.group;
1✔
47
        this.priority = b.priority;
1✔
48
        this.minPending = b.minPending < 0 ? -1 : b.minPending;
1✔
49
        this.minAckPending = b.minAckPending < 0 ? -1 : b.minAckPending;
1✔
50
    }
1✔
51

52
    @Override
53
    @NonNull
54
    public String toJson() {
55
        StringBuilder sb = JsonUtils.beginJson();
1✔
56
        JsonUtils.addField(sb, BATCH, batchSize);
1✔
57
        JsonUtils.addField(sb, MAX_BYTES, maxBytes);
1✔
58
        JsonUtils.addFldWhenTrue(sb, NO_WAIT, noWait);
1✔
59
        JsonUtils.addFieldAsNanos(sb, EXPIRES, expiresIn);
1✔
60
        JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);
1✔
61
        JsonUtils.addField(sb, GROUP, group);
1✔
62
        JsonUtils.addFieldWhenGtZero(sb, PRIORITY, priority);
1✔
63
// TODO - PINNED CONSUMER SUPPORT
64
//        JsonUtils.addField(sb, ID, getPinId());
65
        JsonUtils.addField(sb, MIN_PENDING, minPending);
1✔
66
        JsonUtils.addField(sb, MIN_ACK_PENDING, minAckPending);
1✔
67
        return JsonUtils.endJson(sb).toString();
1✔
68
    }
69

70
// TODO - PINNED CONSUMER SUPPORT
71
//    protected String getPinId() {
72
//        return null;
73
//    }
74

75
    /**
76
     * Get the batch size option value
77
     * @return the batch size
78
     */
79
    public int getBatchSize() {
80
        return batchSize;
1✔
81
    }
82

83
    /**
84
     * Get the max bytes size option value
85
     * @return the max bytes size
86
     */
87
    public long getMaxBytes() {
88
        return maxBytes;
1✔
89
    }
90

91
    /**
92
     * Get the no wait flag value
93
     * @return the flag
94
     */
95
    public boolean isNoWait() {
96
        return noWait;
1✔
97
    }
98

99
    /**
100
     * Get the expires in option value
101
     * @return the expires in duration
102
     */
103
    public Duration getExpiresIn() {
104
        return expiresIn;
1✔
105
    }
106

107
    /**
108
     * Get the idle heartbeat option value
109
     * @return the idle heartbeat duration
110
     */
111
    public Duration getIdleHeartbeat() {
112
        return idleHeartbeat;
1✔
113
    }
114

115
    public String getGroup() {
116
        return group;
1✔
117
    }
118

NEW
119
    public int getPriority() { return priority; }
×
120

121
    public long getMinPending() {
122
        return minPending;
1✔
123
    }
124

125
    public long getMinAckPending() {
126
        return minAckPending;
1✔
127
    }
128

129
    /**
130
     * Creates a builder for the pull options, with batch size since it's always required
131
     * @param batchSize the size of the batch. Must be greater than 0
132
     * @return a pull options builder
133
     */
134
    public static Builder builder(int batchSize) {
135
        return new Builder().batchSize(batchSize);
1✔
136
    }
137

138
    /**
139
     * Creates a builder for the pull options, setting no wait to true and accepting batch size
140
     * @param batchSize the size of the batch. Must be greater than 0
141
     * @return a pull options builder
142
     */
143
    public static Builder noWait(int batchSize) {
144
        return new Builder().batchSize(batchSize).noWait();
1✔
145
    }
146

147
    public static class Builder {
1✔
148
        private int batchSize;
149
        private long maxBytes;
150
        private boolean noWait;
151
        private Duration expiresIn;
152
        private Duration idleHeartbeat;
153
        private String group;
154
        private int priority;
155
        private long minPending = -1;
1✔
156
        private long minAckPending = -1;
1✔
157

158
        /**
159
         * Set the batch size for the pull
160
         * @param batchSize the size of the batch. Must be greater than 0
161
         * @return the builder
162
         */
163
        public Builder batchSize(int batchSize) {
164
            this.batchSize = batchSize;
1✔
165
            return this;
1✔
166
        }
167

168
        /**
169
         * The maximum bytes for the pull
170
         * @param maxBytes the maximum bytes
171
         * @return the builder
172
         */
173
        public Builder maxBytes(long maxBytes) {
174
            this.maxBytes = maxBytes;
1✔
175
            return this;
1✔
176
        }
177

178
        /**
179
         * Set no wait to true
180
         * @return the builder
181
         */
182
        public Builder noWait() {
183
            this.noWait = true;
1✔
184
            return this;
1✔
185
        }
186

187
        /**
188
         * Set the no wait flag
189
         * @param noWait the flag
190
         * @return the builder
191
         */
192
        public Builder noWait(boolean noWait) {
193
            this.noWait = noWait;
1✔
194
            return this;
1✔
195
        }
196

197
        /**
198
         * Set the expires time in millis
199
         * @param expiresInMillis the millis
200
         * @return the builder
201
         */
202
        public Builder expiresIn(long expiresInMillis) {
203
            this.expiresIn = Duration.ofMillis(expiresInMillis);
1✔
204
            return this;
1✔
205
        }
206

207
        /**
208
         * Set the expires duration
209
         * @param expiresIn the duration
210
         * @return the builder
211
         */
212
        public Builder expiresIn(Duration expiresIn) {
213
            this.expiresIn = expiresIn;
1✔
214
            return this;
1✔
215
        }
216

217
        /**
218
         * Set the idle heartbeat time in millis
219
         * @param idleHeartbeatMillis the millis
220
         * @return the builder
221
         */
222
        public Builder idleHeartbeat(long idleHeartbeatMillis) {
223
            this.idleHeartbeat = Duration.ofMillis(idleHeartbeatMillis);
1✔
224
            return this;
1✔
225
        }
226

227
        /**
228
         * Set the idle heartbeat duration
229
         * @param idleHeartbeat the duration
230
         * @return the builder
231
         */
232
        public Builder idleHeartbeat(Duration idleHeartbeat) {
233
            this.idleHeartbeat = idleHeartbeat;
1✔
234
            return this;
1✔
235
        }
236

237
        /**
238
         * Sets the group
239
         * Replaces any other groups set in the builder
240
         * @param group the priority group for this pull
241
         * @return Builder
242
         */
243
        public Builder group(String group) {
244
            this.group = group;
1✔
245
            return this;
1✔
246
        }
247

248
        /**
249
         * Sets the priority within the group. Priority must be between 0 and 9 inclusive.
250
         * @param priority the priority
251
         * @return Builder
252
         */
253
        public Builder priority(int priority) {
254
            this.priority = priority;
1✔
255
            return this;
1✔
256
        }
257

258
        /**
259
         * When specified, the pull request will only receive messages when the consumer has at least this many pending messages.
260
         * @param minPending the min pending
261
         * @return the builder
262
         */
263
        public Builder minPending(long minPending) {
264
            this.minPending = minPending < 1 ? -1 : minPending;
1✔
265
            return this;
1✔
266
        }
267

268
        /**
269
         * When specified, this Pull request will only receive messages when the consumer has at least this many ack pending messages.
270
         * @param minAckPending the min ack pending
271
         * @return the builder
272
         */
273
        public Builder minAckPending(long minAckPending) {
274
            this.minAckPending = minAckPending < 1 ? -1 : minAckPending;
1✔
275
            return this;
1✔
276
        }
277

278
        /**
279
         * Build the PullRequestOptions.
280
         * <p>Validates that the batch size is greater than 0</p>
281
         * <p>If supplied, validates that the idle heartbeat is valid for the expiration</p>
282
         * @return the built PullRequestOptions
283
         */
284
        public PullRequestOptions build() {
285
            validateGtZero(batchSize, "Pull batch size");
1✔
286
            if (priority < 0 || priority > 9) {
1✔
NEW
287
                throw new IllegalArgumentException("Priority must be between 0 and 9 inclusive.");
×
288
            }
289
            if (idleHeartbeat != null) {
1✔
290
                long idleNanosTemp = idleHeartbeat.toNanos() * 2;
1✔
291
                if (idleNanosTemp > 0) {
1✔
292
                    if (expiresIn == null) {
1✔
293
                        throw new IllegalArgumentException("Idle Heartbeat not allowed without expiration.");
1✔
294
                    }
295
                    long expiresNanos = expiresIn.toNanos();
1✔
296
                    if (idleNanosTemp > expiresNanos) {
1✔
297
                        throw new IllegalArgumentException("Idle Heartbeat cannot be more than half the expiration.");
1✔
298
                    }
299
                }
300
            }
301
            return new PullRequestOptions(this);
1✔
302
        }
303
    }
304
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc