• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1922

19 Mar 2025 09:29PM UTC coverage: 95.95% (+0.2%) from 95.799%
#1922

push

github

web-flow
Merge pull request #1293 from nats-io/tune-simple-fetch-nowait

Improve FetchConsumeOptions construction and add test

10 of 13 new or added lines in 3 files covered. (76.92%)

2 existing lines in 2 files now uncovered.

11491 of 11976 relevant lines covered (95.95%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.21
/src/main/java/io/nats/client/BaseConsumeOptions.java
1
// Copyright 2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client;
15

16
import io.nats.client.api.ConsumerConfiguration;
17
import io.nats.client.support.JsonParseException;
18
import io.nats.client.support.JsonParser;
19
import io.nats.client.support.JsonSerializable;
20
import io.nats.client.support.JsonValue;
21

22
import static io.nats.client.support.ApiConstants.*;
23
import static io.nats.client.support.JsonUtils.*;
24
import static io.nats.client.support.JsonValueUtils.readInteger;
25
import static io.nats.client.support.JsonValueUtils.readLong;
26

27
/**
28
 * Base Consume Options are provided to customize the way the consume and
29
 * fetch operate. It is the base class for ConsumeOptions and FetchConsumeOptions.
30
 */
31
public class BaseConsumeOptions implements JsonSerializable {
32
    public static final int DEFAULT_MESSAGE_COUNT = 500;
33
    public static final int DEFAULT_MESSAGE_COUNT_WHEN_BYTES = 1_000_000;
34
    public static final int DEFAULT_THRESHOLD_PERCENT = 25;
35
    public static final long DEFAULT_EXPIRES_IN_MILLIS = 30000;
36
    public static final long MIN_EXPIRES_MILLS = 1000;
37
    public static final long MAX_HEARTBEAT_MILLIS = 30000;
38
    public static final int MAX_IDLE_HEARTBEAT_PERCENT = 50;
39

40
    protected final int messages;
41
    protected final long bytes;
42
    protected final long expiresIn;
43
    protected final long idleHeartbeat;
44
    protected final int thresholdPercent;
45
    protected final boolean noWait;
46
    protected final boolean raiseStatusWarnings;
47

48
    @SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
49
    protected BaseConsumeOptions(Builder b) {
1✔
50
        bytes = b.bytes;
1✔
51
        if (bytes > 0) {
1✔
52
            messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
1✔
53
        }
54
        else {
55
            messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT : b.messages;
1✔
56
        }
57

58
        // validation handled in builder
59
        thresholdPercent = b.thresholdPercent;
1✔
60
        noWait = b.noWait;
1✔
61
        raiseStatusWarnings = b.raiseStatusWarnings;
1✔
62

63
        // if it's not noWait, it must have an expiresIn
64
        // we can't check this in the builder because we can't guarantee order
65
        // so we always default to LONG_UNSET in the builder and check it here.
66
        if (b.expiresIn == ConsumerConfiguration.LONG_UNSET && !noWait) {
1✔
UNCOV
67
            expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
×
68
        }
69
        else {
70
            expiresIn = b.expiresIn;
1✔
71
        }
72

73
        // calculated
74
        idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100);
1✔
75
    }
1✔
76

77
    @Override
78
    public String toJson() {
79
        StringBuilder sb = beginJson();
1✔
80
        addField(sb, MESSAGES, messages);
1✔
81
        addField(sb, BYTES, bytes);
1✔
82
        addField(sb, EXPIRES_IN, expiresIn);
1✔
83
        addField(sb, IDLE_HEARTBEAT, idleHeartbeat);
1✔
84
        addField(sb, THRESHOLD_PERCENT, thresholdPercent);
1✔
85
        addFldWhenTrue(sb, RAISE_STATUS_WARNINGS, raiseStatusWarnings);
1✔
86
        addFldWhenTrue(sb, NO_WAIT, noWait);
1✔
87
        return endJson(sb).toString();
1✔
88
    }
89

90
    public long getExpiresInMillis() {
91
        return expiresIn;
1✔
92
    }
93

94
    public long getIdleHeartbeat() {
95
        return idleHeartbeat;
1✔
96
    }
97

98
    public int getThresholdPercent() {
99
        return thresholdPercent;
1✔
100
    }
101

102
    public boolean isNoWait() {
103
        return noWait;
1✔
104
    }
105

106
    public boolean raiseStatusWarnings() {
107
        return raiseStatusWarnings;
1✔
108
    }
109

110
    protected static abstract class Builder<B, CO> {
1✔
111
        protected int messages = -1;
1✔
112
        protected long bytes = 0;
1✔
113
        protected int thresholdPercent = DEFAULT_THRESHOLD_PERCENT;
1✔
114
        protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
1✔
115
        protected boolean noWait = false;
1✔
116
        protected boolean raiseStatusWarnings = false;
1✔
117

118
        protected abstract B getThis();
119

120
        /**
121
         * Initialize values from the json string.
122
         * @param json the json string to parse
123
         * @return the builder
124
         * @throws JsonParseException if the json is invalid
125
         */
126
        public B json(String json) throws JsonParseException {
127
            return jsonValue(JsonParser.parse(json));
1✔
128
        }
129

130
        /**
131
         * Initialize values from the JsonValue object.
132
         * @param jsonValue the json value object
133
         * @return the builder
134
         */
135
        public B jsonValue(JsonValue jsonValue) {
136
            messages(readInteger(jsonValue, MESSAGES, -1));
1✔
137
            bytes(readLong(jsonValue, BYTES, -1));
1✔
138
            expiresIn(readLong(jsonValue, EXPIRES_IN, MIN_EXPIRES_MILLS));
1✔
139
            thresholdPercent(readInteger(jsonValue, THRESHOLD_PERCENT, -1));
1✔
140
            return getThis();
1✔
141
        }
142

143
        protected B messages(int messages) {
144
            this.messages = messages < 1 ? -1 : messages;
1✔
145
            return getThis();
1✔
146
        }
147

148
        protected B bytes(long bytes) {
149
            this.bytes = bytes < 1 ? 0 : bytes;
1✔
150
            return getThis();
1✔
151
        }
152

153
        /**
154
         * In Fetch, sets the maximum amount of time to wait to reach the batch size or max byte.
155
         * In Consume, sets the maximum amount of time for an individual pull to be open
156
         * before issuing a replacement pull.
157
         * <p>Zero or less will default to {@value BaseConsumeOptions#DEFAULT_EXPIRES_IN_MILLIS},
158
         * otherwise, cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS}</p>
159
         * @param expiresInMillis the expiration time in milliseconds
160
         * @return the builder
161
         */
162
        public B expiresIn(long expiresInMillis) {
163
            if (expiresInMillis < 1) {
1✔
164
                expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
1✔
165
            }
166
            else if (expiresInMillis < MIN_EXPIRES_MILLS) {
1✔
167
                throw new IllegalArgumentException("Expires must be greater than or equal to " + MIN_EXPIRES_MILLS);
1✔
168
            }
169
            else {
170
                expiresIn = expiresInMillis;
1✔
171
            }
172
            return getThis();
1✔
173
        }
174

175
        /**
176
         * Set the threshold percent of max bytes (if max bytes is specified) or messages
177
         * that will trigger issuing pull requests to keep messages flowing.
178
         * <p>Only applies to endless consumes.</p>
179
         * <p>For instance if the batch size is 100 and the re-pull percent is 25,
180
         * the first pull will be for 100, and then when 25 messages have been received
181
         * another 75 will be requested, keeping the number of messages in transit always at 100.</p>
182
         * <p>Must be between 1 and 100 inclusive.
183
         * Less than 1 will assume the default of {@value BaseConsumeOptions#DEFAULT_THRESHOLD_PERCENT}.
184
         * Greater than 100 will assume 100. </p>
185
         * @param thresholdPercent the threshold percent
186
         * @return the builder
187
         */
188
        public B thresholdPercent(int thresholdPercent) {
189
            this.thresholdPercent = thresholdPercent < 1 ? DEFAULT_THRESHOLD_PERCENT : Math.min(100, thresholdPercent);
1✔
190
            return getThis();
1✔
191
        }
192

193
        /**
194
         * Raise status warning turns on sending status messages to the error listener.
195
         * The default of to not raise status warning
196
         * @return the builder
197
         */
198
        public B raiseStatusWarnings() {
199
            this.raiseStatusWarnings = true;
1✔
200
            return getThis();
1✔
201
        }
202

203
        /**
204
         * Turn on or off raise status warning turns. When on, status messages are sent to the error listener.
205
         * The default of to not raise status warning
206
         * @return the builder
207
         */
208
        public B raiseStatusWarnings(boolean raiseStatusWarnings) {
209
            this.raiseStatusWarnings = raiseStatusWarnings;
1✔
210
            return getThis();
1✔
211
        }
212

213
        /**
214
         * Build the options.
215
         * @return the built options
216
         */
217
        public abstract CO build();
218
    }
219
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc