• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1976

22 May 2025 08:03PM UTC coverage: 95.679% (-0.008%) from 95.687%
#1976

push

github

web-flow
Merge pull request #1318 from nats-io/doc-cleanup

Update repository info and cleanup readme.

11692 of 12220 relevant lines covered (95.68%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/src/main/java/io/nats/client/impl/NatsConsumer.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Consumer;
17

18
import java.time.Duration;
19
import java.time.Instant;
20
import java.util.concurrent.CompletableFuture;
21
import java.util.concurrent.TimeoutException;
22
import java.util.concurrent.atomic.AtomicBoolean;
23
import java.util.concurrent.atomic.AtomicLong;
24
import java.util.concurrent.atomic.AtomicReference;
25

26
abstract class NatsConsumer implements Consumer {
27

28
    NatsConnection connection;
29
    private final AtomicLong maxMessages;
30
    private final AtomicLong maxBytes;
31
    private final AtomicLong droppedMessages;
32
    private final AtomicLong messagesDelivered;
33
    private final AtomicBoolean slow;
34
    private final AtomicReference<CompletableFuture<Boolean>> drainingFuture;
35

36
    NatsConsumer(NatsConnection conn) {
1✔
37
        this.connection = conn;
1✔
38
        this.maxMessages = new AtomicLong(Consumer.DEFAULT_MAX_MESSAGES);
1✔
39
        this.maxBytes = new AtomicLong(Consumer.DEFAULT_MAX_BYTES);
1✔
40
        this.droppedMessages = new AtomicLong();
1✔
41
        this.messagesDelivered = new AtomicLong(0);
1✔
42
        this.slow = new AtomicBoolean(false);
1✔
43
        this.drainingFuture = new AtomicReference<>();
1✔
44
    }
1✔
45

46
    /**
47
     * Set limits on the maximum number of messages, or maximum size of messages
48
     * this consumer will hold before it starts to drop new messages waiting.
49
     * <p>
50
     * Messages are dropped as they encounter a full queue, which is to say, new
51
     * messages are dropped rather than old messages. If a queue is 10 deep and
52
     * fills up, the 11th message is dropped.
53
     * <p>
54
     * Any value less than or equal to zero means unlimited and will be stored as 0.
55
     * @param maxMessages the maximum message count to hold, defaults to
56
     *                    {@value #DEFAULT_MAX_MESSAGES}.
57
     * @param maxBytes    the maximum bytes to hold, defaults to
58
     *                    {@value #DEFAULT_MAX_BYTES}.
59
     */
60
    public void setPendingLimits(long maxMessages, long maxBytes) {
61
        this.maxMessages.set(maxMessages <= 0 ? 0 : maxMessages);
1✔
62
        this.maxBytes.set(maxBytes <= 0 ? 0 : maxBytes);
1✔
63
    }
1✔
64

65
    /**
66
     * @return the pending message limit set by {@link #setPendingLimits(long, long)
67
     *         setPendingLimits}.
68
     */
69
    public long getPendingMessageLimit() {
70
        return this.maxMessages.get();
1✔
71
    }
72

73
    /**
74
     * @return the pending byte limit set by {@link #setPendingLimits(long, long)
75
     *         setPendingLimits}.
76
     */
77
    public long getPendingByteLimit() {
78
        return this.maxBytes.get();
1✔
79
    }
80

81
    /**
82
     * @return the number of messages waiting to be delivered/popped,
83
     *         {@link #setPendingLimits(long, long) setPendingLimits}.
84
     */
85
    public long getPendingMessageCount() {
86
        return this.getMessageQueue() != null ? this.getMessageQueue().length() : 0;
1✔
87
    }
88

89
    /**
90
     * @return the cumulative size of the messages waiting to be delivered/popped,
91
     *         {@link #setPendingLimits(long, long) setPendingLimits}.
92
     */
93
    public long getPendingByteCount() {
94
        return this.getMessageQueue() != null ? this.getMessageQueue().sizeInBytes() : 0;
1✔
95
    }
96

97
    /**
98
     * @return the total number of messages delivered to this consumer, for all
99
     *         time.
100
     */
101
    public long getDeliveredCount() {
102
        return this.messagesDelivered.get();
1✔
103
    }
104

105
    void incrementDeliveredCount() {
106
        this.messagesDelivered.incrementAndGet();
1✔
107
    }
1✔
108

109
    void incrementDroppedCount() {
110
        this.droppedMessages.incrementAndGet();
1✔
111
    }
1✔
112

113
    /**
114
     * @return the number of messages dropped from this consumer, since the last
115
     *         call to {@link #clearDroppedCount}.
116
     */
117
    public long getDroppedCount() {
118
        return this.droppedMessages.get();
1✔
119
    }
120

121
    /**
122
     * Reset the drop count to 0.
123
     */
124
    public void clearDroppedCount() {
125
        this.droppedMessages.set(0);
1✔
126
    }
1✔
127

128
    void markSlow() {
129
        this.slow.set(true);
1✔
130
    }
1✔
131

132
    void markNotSlow() {
133
        this.slow.set(false);
1✔
134
    }
1✔
135

136
    boolean isMarkedSlow() {
137
        return this.slow.get();
1✔
138
    }
139

140
    boolean hasReachedPendingLimits() {
141
        long ml = maxMessages.get();
1✔
142
        if (ml > 0 && getPendingMessageCount() >= ml) {
1✔
143
            return true;
1✔
144
        }
145
        long bl = maxBytes.get();
1✔
146
        return bl > 0 && getPendingByteCount() >= bl;
1✔
147
    }
148

149
    void markDraining(CompletableFuture<Boolean> future) {
150
        this.drainingFuture.set(future);
1✔
151
    }
1✔
152

153
    void markUnsubedForDrain() {
154
        if (this.getMessageQueue() != null) {
1✔
155
            this.getMessageQueue().drain();
1✔
156
        }
157
    }
1✔
158

159
    CompletableFuture<Boolean> getDrainingFuture() {
160
        return this.drainingFuture.get();
1✔
161
    }
162

163
    boolean isDraining() {
164
        return this.drainingFuture.get() != null;
1✔
165
    }
166

167
    boolean isDrained() {
168
        return isDraining() && this.getPendingMessageCount() == 0;
1✔
169
    }
170

171
    /**
172
    * Drain tells the consumer to process in flight, or cached messages, but stop receiving new ones. The library will
173
    * flush the unsubscribe call(s) insuring that any publish calls made by this client are included. When all messages
174
    * are processed the consumer effectively becomes unsubscribed.
175
    * 
176
    * @param timeout The time to wait for the drain to succeed, pass 0 to wait
177
    *                    forever. Drain involves moving messages to and from the server
178
    *                    so a very short timeout is not recommended.
179
    * @return A future that can be used to check if the drain has completed
180
    * @throws InterruptedException if the thread is interrupted
181
    */
182
   public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedException {
183
       if (!this.isActive() || this.connection==null) {
1✔
184
           throw new IllegalStateException("Consumer is closed");
×
185
       }
186

187
       if (isDraining()) {
1✔
188
           return this.getDrainingFuture();
1✔
189
       }
190

191
       Instant start = Instant.now();
1✔
192
       final CompletableFuture<Boolean> tracker = new CompletableFuture<>();
1✔
193
       this.markDraining(tracker);
1✔
194
       this.sendUnsubForDrain();
1✔
195

196
       try {
197
            this.connection.flush(timeout); // Flush and wait up to the timeout
1✔
198
       } catch (TimeoutException e) {
×
199
           this.connection.processException(e);
×
200
       }
1✔
201

202
       this.markUnsubedForDrain();
1✔
203

204
        // Wait for the timeout or the pending count to go to 0, skipped if conn is
205
        // draining
206
        connection.getExecutor().submit(() -> {
1✔
207
            try {
208
                long stop = (timeout == null || timeout.equals(Duration.ZERO))
1✔
209
                    ? Long.MAX_VALUE
210
                    : System.nanoTime() + timeout.toNanos();
1✔
211
                while (System.nanoTime() < stop && !Thread.interrupted()) {
1✔
212
                    if (this.isDrained()) {
1✔
213
                        break;
1✔
214
                    }
215
                    //noinspection BusyWait
216
                    Thread.sleep(1); // Sleep 1 milli
1✔
217
                }
218

219
                this.cleanUpAfterDrain();
1✔
220
            } catch (InterruptedException e) {
×
221
                this.connection.processException(e);
×
222
                Thread.currentThread().interrupt();
×
223
            } finally {
224
                tracker.complete(this.isDrained());
1✔
225
            }
226
       });
1✔
227

228
       return getDrainingFuture();
1✔
229
   }
230

231
    /**
232
     * @return whether this consumer is still processing messages. For a
233
     *         subscription the answer is false after unsubscribe. For a dispatcher,
234
     *         false after stop.
235
     */
236
    public abstract boolean isActive();
237

238
    abstract MessageQueue getMessageQueue();
239

240
    /**
241
     * Called during drain to tell the consumer to send appropriate unsub requests
242
     * to the connection.
243
     * 
244
     * A subscription will unsub itself, while a dispatcher will unsub all of its
245
     * subscriptions.
246
     */
247
    abstract void sendUnsubForDrain();
248

249
    /**
250
     * Abstract method, called by the connection when the drain is complete.
251
     */
252
    abstract void cleanUpAfterDrain();
253
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc