• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2224

25 Sep 2025 03:10PM UTC coverage: 95.538% (+0.02%) from 95.522%
#2224

push

github

web-flow
Merge pull request #1434 from nats-io/start-2-23-0

Start 2.23.0

12139 of 12706 relevant lines covered (95.54%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/src/main/java/io/nats/client/impl/NatsConsumer.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Consumer;
17
import io.nats.client.NatsSystemClock;
18

19
import java.time.Duration;
20
import java.util.concurrent.CompletableFuture;
21
import java.util.concurrent.TimeoutException;
22
import java.util.concurrent.atomic.AtomicBoolean;
23
import java.util.concurrent.atomic.AtomicLong;
24
import java.util.concurrent.atomic.AtomicReference;
25

26
abstract class NatsConsumer implements Consumer {
27

28
    NatsConnection connection;
29
    private final AtomicLong maxMessages;
30
    private final AtomicLong maxBytes;
31
    private final AtomicLong droppedMessages;
32
    private final AtomicLong messagesDelivered;
33
    private final AtomicBoolean slow;
34
    private final AtomicReference<CompletableFuture<Boolean>> drainingFuture;
35

36
    NatsConsumer(NatsConnection conn) {
1✔
37
        this.connection = conn;
1✔
38
        this.maxMessages = new AtomicLong(Consumer.DEFAULT_MAX_MESSAGES);
1✔
39
        this.maxBytes = new AtomicLong(Consumer.DEFAULT_MAX_BYTES);
1✔
40
        this.droppedMessages = new AtomicLong();
1✔
41
        this.messagesDelivered = new AtomicLong(0);
1✔
42
        this.slow = new AtomicBoolean(false);
1✔
43
        this.drainingFuture = new AtomicReference<>();
1✔
44
    }
1✔
45

46
    /**
47
     * Set limits on the maximum number of messages, or maximum size of messages
48
     * this consumer will hold before it starts to drop new messages waiting.
49
     * <p>
50
     * Messages are dropped as they encounter a full queue, which is to say, new
51
     * messages are dropped rather than old messages. If a queue is 10 deep and
52
     * fills up, the 11th message is dropped.
53
     * <p>
54
     * Any value less than or equal to zero means unlimited and will be stored as 0.
55
     * @param maxMessages the maximum message count to hold, defaults to
56
     *                    {@value #DEFAULT_MAX_MESSAGES}.
57
     * @param maxBytes    the maximum bytes to hold, defaults to
58
     *                    {@value #DEFAULT_MAX_BYTES}.
59
     */
60
    public void setPendingLimits(long maxMessages, long maxBytes) {
61
        this.maxMessages.set(maxMessages <= 0 ? 0 : maxMessages);
1✔
62
        this.maxBytes.set(maxBytes <= 0 ? 0 : maxBytes);
1✔
63
    }
1✔
64

65
    /**
66
     * @return the pending message limit set by {@link #setPendingLimits(long, long)
67
     *         setPendingLimits}.
68
     */
69
    public long getPendingMessageLimit() {
70
        return this.maxMessages.get();
1✔
71
    }
72

73
    /**
74
     * @return the pending byte limit set by {@link #setPendingLimits(long, long)
75
     *         setPendingLimits}.
76
     */
77
    public long getPendingByteLimit() {
78
        return this.maxBytes.get();
1✔
79
    }
80

81
    /**
82
     * @return the number of messages waiting to be delivered/popped,
83
     *         {@link #setPendingLimits(long, long) setPendingLimits}.
84
     */
85
    public long getPendingMessageCount() {
86
        return this.getMessageQueue() != null ? this.getMessageQueue().length() : 0;
1✔
87
    }
88

89
    /**
90
     * @return the cumulative size of the messages waiting to be delivered/popped,
91
     *         {@link #setPendingLimits(long, long) setPendingLimits}.
92
     */
93
    public long getPendingByteCount() {
94
        return this.getMessageQueue() != null ? this.getMessageQueue().sizeInBytes() : 0;
1✔
95
    }
96

97
    /**
98
     * @return the total number of messages delivered to this consumer, for all
99
     *         time.
100
     */
101
    public long getDeliveredCount() {
102
        return this.messagesDelivered.get();
1✔
103
    }
104

105
    void incrementDeliveredCount() {
106
        this.messagesDelivered.incrementAndGet();
1✔
107
    }
1✔
108

109
    void incrementDroppedCount() {
110
        this.droppedMessages.incrementAndGet();
1✔
111
    }
1✔
112

113
    /**
114
     * @return the number of messages dropped from this consumer, since the last
115
     *         call to {@link #clearDroppedCount}.
116
     */
117
    public long getDroppedCount() {
118
        return this.droppedMessages.get();
1✔
119
    }
120

121
    /**
122
     * Reset the drop count to 0.
123
     */
124
    public void clearDroppedCount() {
125
        this.droppedMessages.set(0);
1✔
126
    }
1✔
127

128
    void markSlow() {
129
        this.slow.set(true);
1✔
130
    }
1✔
131

132
    void markNotSlow() {
133
        this.slow.set(false);
1✔
134
    }
1✔
135

136
    boolean isMarkedSlow() {
137
        return this.slow.get();
1✔
138
    }
139

140
    boolean hasReachedPendingLimits() {
141
        long ml = maxMessages.get();
1✔
142
        if (ml > 0 && getPendingMessageCount() >= ml) {
1✔
143
            return true;
1✔
144
        }
145
        long bl = maxBytes.get();
1✔
146
        return bl > 0 && getPendingByteCount() >= bl;
1✔
147
    }
148

149
    void markDraining(CompletableFuture<Boolean> future) {
150
        this.drainingFuture.set(future);
1✔
151
    }
1✔
152

153
    void markUnsubedForDrain() {
154
        if (this.getMessageQueue() != null) {
1✔
155
            this.getMessageQueue().drain();
1✔
156
        }
157
    }
1✔
158

159
    CompletableFuture<Boolean> getDrainingFuture() {
160
        return this.drainingFuture.get();
1✔
161
    }
162

163
    boolean isDraining() {
164
        return this.drainingFuture.get() != null;
1✔
165
    }
166

167
    boolean isDrained() {
168
        return isDraining() && this.getPendingMessageCount() == 0;
1✔
169
    }
170

171
    /**
172
    * Drain tells the consumer to process in flight, or cached messages, but stop receiving new ones. The library will
173
    * flush the unsubscribe call(s) insuring that any publish calls made by this client are included. When all messages
174
    * are processed the consumer effectively becomes unsubscribed.
175
    * 
176
    * @param timeout The time to wait for the drain to succeed, pass 0 to wait
177
    *                    forever. Drain involves moving messages to and from the server
178
    *                    so a very short timeout is not recommended.
179
    * @return A future that can be used to check if the drain has completed
180
    * @throws InterruptedException if the thread is interrupted
181
    */
182
   public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedException {
183
       if (!this.isActive() || this.connection==null) {
1✔
184
           throw new IllegalStateException("Consumer is closed");
×
185
       }
186

187
       if (isDraining()) {
1✔
188
           return this.getDrainingFuture();
1✔
189
       }
190

191
       final CompletableFuture<Boolean> tracker = new CompletableFuture<>();
1✔
192
       this.markDraining(tracker);
1✔
193
       this.sendUnsubForDrain();
1✔
194

195
       try {
196
            this.connection.flush(timeout); // Flush and wait up to the timeout
1✔
197
       } catch (TimeoutException e) {
×
198
           this.connection.processException(e);
×
199
       }
1✔
200

201
       this.markUnsubedForDrain();
1✔
202

203
        // Wait for the timeout or consumer is drained
204
        // Skipped if conn is draining
205
        connection.getExecutor().submit(() -> {
1✔
206
            try {
207
                long timeoutNanos = (timeout == null || timeout.toNanos() <= 0)
1✔
208
                    ? Long.MAX_VALUE : timeout.toNanos();
1✔
209
                long startTime = System.nanoTime();
1✔
210
                while (NatsSystemClock.nanoTime() - startTime < timeoutNanos && !Thread.interrupted()) {
1✔
211
                    if (this.isDrained()) {
1✔
212
                        break;
1✔
213
                    }
214
                    //noinspection BusyWait
215
                    Thread.sleep(1); // Sleep 1 milli
1✔
216
                }
217

218
                this.cleanUpAfterDrain();
1✔
219
            } catch (InterruptedException e) {
×
220
                this.connection.processException(e);
×
221
                Thread.currentThread().interrupt();
×
222
            } finally {
223
                tracker.complete(this.isDrained());
1✔
224
            }
225
       });
1✔
226

227
       return getDrainingFuture();
1✔
228
   }
229

230
    /**
231
     * @return whether this consumer is still processing messages. For a
232
     *         subscription the answer is false after unsubscribe. For a dispatcher,
233
     *         false after stop.
234
     */
235
    public abstract boolean isActive();
236

237
    abstract MessageQueue getMessageQueue();
238

239
    /**
240
     * Called during drain to tell the consumer to send appropriate unsub requests
241
     * to the connection.
242
     * 
243
     * A subscription will unsub itself, while a dispatcher will unsub all of its
244
     * subscriptions.
245
     */
246
    abstract void sendUnsubForDrain();
247

248
    /**
249
     * Abstract method, called by the connection when the drain is complete.
250
     */
251
    abstract void cleanUpAfterDrain();
252
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc