• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2202

19 Sep 2025 03:44PM UTC coverage: 95.043% (-0.03%) from 95.075%
#2202

push

github

web-flow
Merge pull request #1429 from nats-io/write-timeout-tuning

Ensure write timeout is not less than 100 nanoseconds

7 of 8 new or added lines in 3 files covered. (87.5%)

7 existing lines in 2 files now uncovered.

11964 of 12588 relevant lines covered (95.04%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.43
/src/main/java/io/nats/client/impl/NatsConsumer.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Consumer;
17
import io.nats.client.NatsSystemClock;
18

19
import java.time.Duration;
20
import java.time.Instant;
21
import java.util.concurrent.CompletableFuture;
22
import java.util.concurrent.TimeoutException;
23
import java.util.concurrent.atomic.AtomicBoolean;
24
import java.util.concurrent.atomic.AtomicLong;
25
import java.util.concurrent.atomic.AtomicReference;
26

27
abstract class NatsConsumer implements Consumer {
28

29
    NatsConnection connection;
30
    private final AtomicLong maxMessages;
31
    private final AtomicLong maxBytes;
32
    private final AtomicLong droppedMessages;
33
    private final AtomicLong messagesDelivered;
34
    private final AtomicBoolean slow;
35
    private final AtomicReference<CompletableFuture<Boolean>> drainingFuture;
36

37
    NatsConsumer(NatsConnection conn) {
1✔
38
        this.connection = conn;
1✔
39
        this.maxMessages = new AtomicLong(Consumer.DEFAULT_MAX_MESSAGES);
1✔
40
        this.maxBytes = new AtomicLong(Consumer.DEFAULT_MAX_BYTES);
1✔
41
        this.droppedMessages = new AtomicLong();
1✔
42
        this.messagesDelivered = new AtomicLong(0);
1✔
43
        this.slow = new AtomicBoolean(false);
1✔
44
        this.drainingFuture = new AtomicReference<>();
1✔
45
    }
1✔
46

47
    /**
48
     * Set limits on the maximum number of messages, or maximum size of messages
49
     * this consumer will hold before it starts to drop new messages waiting.
50
     * <p>
51
     * Messages are dropped as they encounter a full queue, which is to say, new
52
     * messages are dropped rather than old messages. If a queue is 10 deep and
53
     * fills up, the 11th message is dropped.
54
     * <p>
55
     * Any value less than or equal to zero means unlimited and will be stored as 0.
56
     * @param maxMessages the maximum message count to hold, defaults to
57
     *                    {@value #DEFAULT_MAX_MESSAGES}.
58
     * @param maxBytes    the maximum bytes to hold, defaults to
59
     *                    {@value #DEFAULT_MAX_BYTES}.
60
     */
61
    public void setPendingLimits(long maxMessages, long maxBytes) {
62
        this.maxMessages.set(maxMessages <= 0 ? 0 : maxMessages);
1✔
63
        this.maxBytes.set(maxBytes <= 0 ? 0 : maxBytes);
1✔
64
    }
1✔
65

66
    /**
67
     * @return the pending message limit set by {@link #setPendingLimits(long, long)
68
     *         setPendingLimits}.
69
     */
70
    public long getPendingMessageLimit() {
71
        return this.maxMessages.get();
1✔
72
    }
73

74
    /**
75
     * @return the pending byte limit set by {@link #setPendingLimits(long, long)
76
     *         setPendingLimits}.
77
     */
78
    public long getPendingByteLimit() {
79
        return this.maxBytes.get();
1✔
80
    }
81

82
    /**
83
     * @return the number of messages waiting to be delivered/popped,
84
     *         {@link #setPendingLimits(long, long) setPendingLimits}.
85
     */
86
    public long getPendingMessageCount() {
87
        return this.getMessageQueue() != null ? this.getMessageQueue().length() : 0;
1✔
88
    }
89

90
    /**
91
     * @return the cumulative size of the messages waiting to be delivered/popped,
92
     *         {@link #setPendingLimits(long, long) setPendingLimits}.
93
     */
94
    public long getPendingByteCount() {
95
        return this.getMessageQueue() != null ? this.getMessageQueue().sizeInBytes() : 0;
1✔
96
    }
97

98
    /**
99
     * @return the total number of messages delivered to this consumer, for all
100
     *         time.
101
     */
102
    public long getDeliveredCount() {
103
        return this.messagesDelivered.get();
1✔
104
    }
105

106
    void incrementDeliveredCount() {
107
        this.messagesDelivered.incrementAndGet();
1✔
108
    }
1✔
109

110
    void incrementDroppedCount() {
111
        this.droppedMessages.incrementAndGet();
1✔
112
    }
1✔
113

114
    /**
115
     * @return the number of messages dropped from this consumer, since the last
116
     *         call to {@link #clearDroppedCount}.
117
     */
118
    public long getDroppedCount() {
119
        return this.droppedMessages.get();
1✔
120
    }
121

122
    /**
123
     * Reset the drop count to 0.
124
     */
125
    public void clearDroppedCount() {
126
        this.droppedMessages.set(0);
1✔
127
    }
1✔
128

129
    void markSlow() {
130
        this.slow.set(true);
1✔
131
    }
1✔
132

133
    void markNotSlow() {
134
        this.slow.set(false);
1✔
135
    }
1✔
136

137
    boolean isMarkedSlow() {
138
        return this.slow.get();
1✔
139
    }
140

141
    boolean hasReachedPendingLimits() {
142
        long ml = maxMessages.get();
1✔
143
        if (ml > 0 && getPendingMessageCount() >= ml) {
1✔
144
            return true;
1✔
145
        }
146
        long bl = maxBytes.get();
1✔
147
        return bl > 0 && getPendingByteCount() >= bl;
1✔
148
    }
149

150
    void markDraining(CompletableFuture<Boolean> future) {
151
        this.drainingFuture.set(future);
1✔
152
    }
1✔
153

154
    void markUnsubedForDrain() {
155
        if (this.getMessageQueue() != null) {
1✔
156
            this.getMessageQueue().drain();
1✔
157
        }
158
    }
1✔
159

160
    CompletableFuture<Boolean> getDrainingFuture() {
161
        return this.drainingFuture.get();
1✔
162
    }
163

164
    boolean isDraining() {
165
        return this.drainingFuture.get() != null;
1✔
166
    }
167

168
    boolean isDrained() {
169
        return isDraining() && this.getPendingMessageCount() == 0;
1✔
170
    }
171

172
    /**
173
    * Drain tells the consumer to process in flight, or cached messages, but stop receiving new ones. The library will
174
    * flush the unsubscribe call(s) insuring that any publish calls made by this client are included. When all messages
175
    * are processed the consumer effectively becomes unsubscribed.
176
    * 
177
    * @param timeout The time to wait for the drain to succeed, pass 0 to wait
178
    *                    forever. Drain involves moving messages to and from the server
179
    *                    so a very short timeout is not recommended.
180
    * @return A future that can be used to check if the drain has completed
181
    * @throws InterruptedException if the thread is interrupted
182
    */
183
   public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedException {
184
       if (!this.isActive() || this.connection==null) {
1✔
UNCOV
185
           throw new IllegalStateException("Consumer is closed");
×
186
       }
187

188
       if (isDraining()) {
1✔
189
           return this.getDrainingFuture();
1✔
190
       }
191

192
       Instant start = Instant.now();
1✔
193
       final CompletableFuture<Boolean> tracker = new CompletableFuture<>();
1✔
194
       this.markDraining(tracker);
1✔
195
       this.sendUnsubForDrain();
1✔
196

197
       try {
198
            this.connection.flush(timeout); // Flush and wait up to the timeout
1✔
199
       } catch (TimeoutException e) {
×
200
           this.connection.processException(e);
×
201
       }
1✔
202

203
       this.markUnsubedForDrain();
1✔
204

205
        // Wait for the timeout or consumer is drained
206
        // Skipped if conn is draining
207
        connection.getExecutor().submit(() -> {
1✔
208
            try {
209
                long timeoutNanos = (timeout == null || timeout.toNanos() <= 0)
1✔
210
                    ? Long.MAX_VALUE : timeout.toNanos();
1✔
211
                long startTime = System.nanoTime();
1✔
212
                while (NatsSystemClock.nanoTime() - startTime < timeoutNanos && !Thread.interrupted()) {
1✔
213
                    if (this.isDrained()) {
1✔
214
                        break;
1✔
215
                    }
216
                    //noinspection BusyWait
217
                    Thread.sleep(1); // Sleep 1 milli
1✔
218
                }
219

220
                this.cleanUpAfterDrain();
1✔
221
            } catch (InterruptedException e) {
×
222
                this.connection.processException(e);
×
223
                Thread.currentThread().interrupt();
×
224
            } finally {
225
                tracker.complete(this.isDrained());
1✔
226
            }
227
       });
1✔
228

229
       return getDrainingFuture();
1✔
230
   }
231

232
    /**
233
     * @return whether this consumer is still processing messages. For a
234
     *         subscription the answer is false after unsubscribe. For a dispatcher,
235
     *         false after stop.
236
     */
237
    public abstract boolean isActive();
238

239
    abstract MessageQueue getMessageQueue();
240

241
    /**
242
     * Called during drain to tell the consumer to send appropriate unsub requests
243
     * to the connection.
244
     * 
245
     * A subscription will unsub itself, while a dispatcher will unsub all of its
246
     * subscriptions.
247
     */
248
    abstract void sendUnsubForDrain();
249

250
    /**
251
     * Abstract method, called by the connection when the drain is complete.
252
     */
253
    abstract void cleanUpAfterDrain();
254
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc