• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1922

19 Mar 2025 09:29PM UTC coverage: 95.95% (+0.2%) from 95.799%
#1922

push

github

web-flow
Merge pull request #1293 from nats-io/tune-simple-fetch-nowait

Improve FetchConsumeOptions construction and add test

10 of 13 new or added lines in 3 files covered. (76.92%)

2 existing lines in 2 files now uncovered.

11491 of 11976 relevant lines covered (95.95%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/src/main/java/io/nats/client/FetchConsumeOptions.java
1
// Copyright 2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client;
15

16
import io.nats.client.api.ConsumerConfiguration;
17
import io.nats.client.support.JsonValue;
18

19
import static io.nats.client.support.ApiConstants.EXPIRES_IN;
20
import static io.nats.client.support.ApiConstants.NO_WAIT;
21
import static io.nats.client.support.JsonValueUtils.readBoolean;
22
import static io.nats.client.support.JsonValueUtils.readLong;
23

24
/**
25
 * Fetch Consume Options are provided to customize the fetch operation.
26
 */
27
public class FetchConsumeOptions extends BaseConsumeOptions {
28
    public static FetchConsumeOptions DEFAULT_FETCH_OPTIONS = FetchConsumeOptions.builder().build();
1✔
29

30
    private FetchConsumeOptions(Builder b) {
31
        super(b);
1✔
32
    }
1✔
33

34
    /**
35
     * The maximum number of messages to fetch.
36
     * @return the maximum number of messages to fetch
37
     */
38
    public int getMaxMessages() {
39
        return messages;
1✔
40
    }
41

42
    /**
43
     * The maximum number of bytes to fetch.
44
     * @return the maximum number of bytes to fetch
45
     */
46
    public long getMaxBytes() {
47
        return bytes;
1✔
48
    }
49

50
    public static Builder builder() {
51
        return new Builder();
1✔
52
    }
53

54
    public static class Builder
1✔
55
        extends BaseConsumeOptions.Builder<Builder, FetchConsumeOptions> {
56

57
        protected Builder getThis() { return this; }
1✔
58

59
        @Override
60
        public Builder jsonValue(JsonValue jsonValue) {
61
            super.jsonValue(jsonValue);
1✔
62
            if (readBoolean(jsonValue, NO_WAIT, false)) {
1✔
NEW
63
                noWaitExpiresIn(readLong(jsonValue, EXPIRES_IN, ConsumerConfiguration.LONG_UNSET));
×
64
            }
65
            return this;
1✔
66
        }
67

68
        /**
69
         * Set the maximum number of messages to fetch and remove any previously set {@link #maxBytes(long)} constraint.
70
         * The number of messages fetched will also be constrained by the expiration time.
71
         * <p>Less than 1 means default of {@value BaseConsumeOptions#DEFAULT_MESSAGE_COUNT}.</p>
72
         * @param maxMessages the number of messages.
73
         * @return the builder
74
         */
75
        public Builder maxMessages(int maxMessages) {
76
            messages(maxMessages);
1✔
77
            return bytes(-1);
1✔
78
        }
79

80
        /**
81
         * Set maximum number of bytes to fetch and remove any previously set {@link #maxMessages constraint}
82
         * The number of bytes fetched will also be constrained by the expiration time.
83
         * <p>Less than 1 removes any previously set max bytes constraint.</p>
84
         * <p>It is important to set the byte size greater than your largest message payload, plus some amount
85
         * to account for overhead, otherwise the consume process will stall if there are no messages that fit the criteria.</p>
86
         * @see Message#consumeByteCount()
87
         * @param maxBytes the maximum bytes
88
         * @return the builder
89
         */
90
        public Builder maxBytes(long maxBytes) {
91
            return super.bytes(maxBytes);
1✔
92
        }
93

94
        /**
95
         * Set maximum number of bytes or messages to fetch.
96
         * The number of messages/bytes fetched will also be constrained by
97
         * whichever constraint is reached first, as well as the expiration time.
98
         * <p>Less than 1 max bytes removes any previously set max bytes constraint.</p>
99
         * <p>Less than 1 max messages removes any previously set max messages constraint.</p>
100
         * <p>It is important to set the byte size greater than your largest message payload, plus some amount
101
         * to account for overhead, otherwise the consume process will stall if there are no messages that fit the criteria.</p>
102
         * @see Message#consumeByteCount()
103
         * @param maxBytes the maximum bytes
104
         * @param maxMessages the maximum number of messages
105
         * @return the builder
106
         */
107
        public Builder max(int maxBytes, int maxMessages) {
108
            messages(maxMessages);
1✔
109
            return bytes(maxBytes);
1✔
110
        }
111

112
        @Override
113
        public Builder expiresIn(long expiresInMillis) {
114
            if (noWait && expiresInMillis < 1) {
1✔
NEW
115
                expiresIn = ConsumerConfiguration.LONG_UNSET;
×
NEW
116
                return this;
×
117
            }
118
            return super.expiresIn(expiresInMillis);
1✔
119
        }
120

121
        /**
122
         * Set no wait to true
123
         * When no wait is true, the fetch will return immediately with as many messages as are available. Between zero and the maximum configured.
124
         * @return the builder
125
         */
126
        public Builder noWait() {
127
            this.noWait = true;
1✔
128
            expiresIn = ConsumerConfiguration.LONG_UNSET;
1✔
129
            return this;
1✔
130
        }
131

132
        /**
133
         * Set no wait to true with an expiration. This is the common configuration to receive messages as soon as they arrive in the stream without excessive pulling.
134
         * When no wait is true with expire, the fetch will return immediately with as many messages as are available, but at least one message. Between one and the maximum configured.
135
         * When no message is available it will wait for new messages to arrive till it expires.
136
         * @param expiresInMillis the expiration time in milliseconds
137
         * @return the builder
138
         */
139
        public Builder noWaitExpiresIn(long expiresInMillis) {
140
            this.noWait = true;
1✔
141
            return expiresIn(expiresInMillis);
1✔
142
        }
143

144
        /**
145
         * Build the FetchConsumeOptions.
146
         * @return a FetchConsumeOptions instance
147
         */
148
        public FetchConsumeOptions build() {
149
            return new FetchConsumeOptions(this);
1✔
150
        }
151
    }
152
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc