• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2213

24 Sep 2025 04:04PM UTC coverage: 95.586% (-0.06%) from 95.642%
#2213

push

github

web-flow
Merge pull request #1431 from nats-io/ErrorListener

Exposed StreamName

0 of 1 new or added line in 1 file covered. (0.0%)

9 existing lines in 4 files now uncovered.

12085 of 12643 relevant lines covered (95.59%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.9
/src/main/java/io/nats/client/impl/MessageQueue.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Message;
17
import io.nats.client.NatsSystemClock;
18

19
import java.time.Duration;
20
import java.util.ArrayList;
21
import java.util.concurrent.LinkedBlockingQueue;
22
import java.util.concurrent.TimeUnit;
23
import java.util.concurrent.atomic.AtomicInteger;
24
import java.util.concurrent.atomic.AtomicLong;
25
import java.util.concurrent.locks.Lock;
26
import java.util.concurrent.locks.ReentrantLock;
27
import java.util.function.Predicate;
28

29
import static io.nats.client.support.NatsConstants.*;
30

31
class MessageQueue {
32
    protected static final int STOPPED = 0;
33
    protected static final int RUNNING = 1;
34
    protected static final int DRAINING = 2;
35
    protected static final String POISON = "_poison";
36
    protected static final long MIN_OFFER_TIMEOUT_NANOS = 100 * NANOS_PER_MILLI;
37

38
    protected final AtomicLong length;
39
    protected final AtomicLong sizeInBytes;
40
    protected final AtomicInteger running;
41
    protected final boolean singleReaderMode;
42
    protected final LinkedBlockingQueue<NatsMessage> queue;
43
    protected final Lock editLock;
44
    protected final int maxMessagesInOutgoingQueue;
45
    protected final boolean discardWhenFull;
46
    protected final long offerLockNanos;
47
    protected final long offerTimeoutNanos;
48
    protected final Duration requestCleanupInterval;
49

50
    // Poison pill is a graphic, but common term for an item that breaks loops or stop something.
51
    // In this class the poison pill is used to break out of timed waits on the blocking queue.
52
    // A simple == is used to check if any message in the queue is this message.
53
    // /\ /\ /\ /\ which is why it is now a static. It's just a marker anyway.
54
    protected static final NatsMessage POISON_PILL = new NatsMessage(POISON, null, EMPTY_BODY);
1✔
55

56
    MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval) {
57
        this(singleReaderMode, -1, false, requestCleanupInterval, null);
1✔
58
    }
1✔
59

60
    MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval, MessageQueue source) {
61
        this(singleReaderMode, -1, false, requestCleanupInterval, source);
1✔
62
    }
1✔
63

64
    /**
65
     * If publishHighwaterMark is set to 0 the underlying queue can grow forever (or until the max size of a linked blocking queue that is).
66
     * A value of 0 is used by readers to prevent the read thread from blocking.
67
     * If set to a number of messages, the publish command will block, which provides
68
     * backpressure on a publisher if the writer is slow to push things onto the network. Publishers use the value of Options.getMaxMessagesInOutgoingQueue().
69
     * @param singleReaderMode allows the use of "accumulate"
70
     * @param maxMessagesInOutgoingQueue sets a limit on the size of the underlying queue
71
     * @param discardWhenFull allows to discard messages when the underlying queue is full
72
     * @param requestCleanupInterval is used to figure the offer timeout
73
     */
74
    MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval) {
75
        this(singleReaderMode, maxMessagesInOutgoingQueue, discardWhenFull, requestCleanupInterval, null);
1✔
76
    }
1✔
77

78
    MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) {
1✔
79
        this.maxMessagesInOutgoingQueue = maxMessagesInOutgoingQueue;
1✔
80
        this.queue = maxMessagesInOutgoingQueue > 0 ? new LinkedBlockingQueue<>(maxMessagesInOutgoingQueue) : new LinkedBlockingQueue<>();
1✔
81
        this.discardWhenFull = discardWhenFull;
1✔
82
        this.running = new AtomicInteger(RUNNING);
1✔
83
        this.sizeInBytes = new AtomicLong(0);
1✔
84
        this.length = new AtomicLong(0);
1✔
85
        this.offerLockNanos = requestCleanupInterval.toNanos();
1✔
86
        this.offerTimeoutNanos = Math.max(MIN_OFFER_TIMEOUT_NANOS, requestCleanupInterval.toMillis() * NANOS_PER_MILLI * 95 / 100) ;
1✔
87

88
        editLock = new ReentrantLock();
1✔
89
        
90
        this.singleReaderMode = singleReaderMode;
1✔
91
        this.requestCleanupInterval = requestCleanupInterval;
1✔
92

93
        if (source != null) {
1✔
94
            source.drainTo(this);
1✔
95
        }
96
    }
1✔
97

98
    void drainTo(MessageQueue target) {
99
        editLock.lock();
1✔
100
        try {
101
            queue.drainTo(target.queue);
1✔
102
            target.length.set(queue.size());
1✔
103
        } finally {
104
            editLock.unlock();
1✔
105
        }
106
    }
1✔
107

108
    boolean isSingleReaderMode() {
109
        return singleReaderMode;
1✔
110
    }
111

112
    boolean isRunning() {
113
        return this.running.get() != STOPPED;
1✔
114
    }
115

116
    boolean isDraining() {
117
        return this.running.get() == DRAINING;
1✔
118
    }
119

120
    void pause() {
121
        this.running.set(STOPPED);
1✔
122
        this.poisonTheQueue();
1✔
123
    }
1✔
124

125
    void resume() {
126
        this.running.set(RUNNING);
1✔
127
    }
1✔
128

129
    void drain() {
130
        this.running.set(DRAINING);
1✔
131
        this.poisonTheQueue();
1✔
132
    }
1✔
133

134
    boolean isDrained() {
135
        // poison pill is not included in the length count, or the size
136
        return this.running.get() == DRAINING && this.length.get() == 0;
1✔
137
    }
138

139
    boolean push(NatsMessage msg) {
140
        return push(msg, false);
1✔
141
    }
142

143
    boolean push(NatsMessage msg, boolean internal) {
144
        boolean lockWasSuccessful = false;
1✔
145
        try {
146
            long startNanos = NatsSystemClock.nanoTime();
1✔
147
            /*
148
                This was essentially a Head-Of-Line blocking problem.
149

150
                So the crux of the problem was that many threads were waiting to push a message to the queue.
151
                They all waited for the lock and once they had the lock they waited 5 seconds (4750 millis actually)
152
                only to find out the queue was full. They released the lock, so then another thread acquired the lock,
153
                and waited 5 seconds. So instead of being parallel, all these threads had to wait in line
154
                200 * 4750 = 15.8 minutes
155

156
                So what I did was try to acquire the lock but only wait 5 seconds.
157
                If I could not acquire the lock, then I assumed that this means that we are in this exact situation,
158
                another thread can't add b/c the queue is full, and so there is no point in even trying, so just throw the queue full exception.
159

160
                If I did acquire the lock, I deducted the time spent waiting for the lock from the time allowed to try to add.
161
                I took the max of that or 100 millis to try to add to the queue.
162
                This ensures that the max total time each thread can take is 5100 millis in parallel.
163

164
                Notes: The 5 seconds and the 4750 seconds is derived from the Options requestCleanupInterval, which defaults to 5 seconds and can be modified.
165
                The 4750 is 95% of that time. The MIN_OFFER_TIMEOUT_NANOS 100 ms minimum is arbitrary.
166
             */
167
            if (!editLock.tryLock(offerLockNanos, TimeUnit.NANOSECONDS)) {
1✔
UNCOV
168
                throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
×
169
            }
170

171
            lockWasSuccessful = true;
1✔
172

173
            if (!internal && this.discardWhenFull) {
1✔
174
                return this.queue.offer(msg);
1✔
175
            }
176

177
            long timeoutNanosLeft = Math.max(MIN_OFFER_TIMEOUT_NANOS, offerTimeoutNanos - (NatsSystemClock.nanoTime() - startNanos));
1✔
178

179
            if (!this.queue.offer(msg, timeoutNanosLeft, TimeUnit.NANOSECONDS)) {
1✔
180
                throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
1✔
181
            }
182
            this.sizeInBytes.getAndAdd(msg.getSizeInBytes());
1✔
183
            this.length.incrementAndGet();
1✔
184
            return true;
1✔
185
        }
186
        catch (InterruptedException e) {
×
187
            Thread.currentThread().interrupt();
×
188
            return false;
×
189
        }
190
        finally {
191
            if (lockWasSuccessful) {
1✔
192
                editLock.unlock();
1✔
193
            }
194
        }
195
    }
196

197
    /**
198
     * poisoning the queue puts the known poison pill into the queue, forcing any waiting code to stop
199
     * waiting and return.
200
     */
201
    void poisonTheQueue() {
202
        try {
203
            this.queue.add(POISON_PILL);
1✔
204
        } catch (IllegalStateException ie) { // queue was full, so we don't really need poison pill
1✔
205
            // ok to ignore this
206
        }
1✔
207
    }
1✔
208

209
    NatsMessage poll(Duration timeout) throws InterruptedException {
210
        NatsMessage msg = null;
1✔
211

212
        if (timeout == null || this.isDraining()) { // try immediately
1✔
213
            msg = this.queue.poll();
1✔
214
        } else {
215
            long nanos = timeout.toNanos();
1✔
216

217
            if (nanos != 0) {
1✔
218
                msg = this.queue.poll(nanos, TimeUnit.NANOSECONDS);
1✔
219
            } else {
220
                // A value of 0 means wait forever
221
                // We will loop and wait for a LONG time
222
                // if told to suspend/drain the poison pill will break this loop
223
                while (this.isRunning()) {
1✔
224
                    msg = this.queue.poll(100, TimeUnit.DAYS);
1✔
225
                    if (msg != null) break;
1✔
226
                }
227
            }
228
        }
229

230
        return msg == null || isPoison(msg) ? null : msg;
1✔
231
    }
232

233
    private boolean isPoison(Message msg) {
234
        return msg == POISON_PILL;
1✔
235
    }
236

237
    NatsMessage pop(Duration timeout) throws InterruptedException {
238
        if (!this.isRunning()) {
1✔
239
            return null;
1✔
240
        }
241

242
        NatsMessage msg = this.poll(timeout);
1✔
243

244
        if (msg == null) {
1✔
245
            return null;
1✔
246
        }
247

248
        this.sizeInBytes.getAndAdd(-msg.getSizeInBytes());
1✔
249
        this.length.decrementAndGet();
1✔
250

251
        return msg;
1✔
252
    }
253

254
    // Waits up to the timeout to try to accumulate multiple messages
255
    // Use the next field to read the entire set accumulated.
256
    // maxSize and maxMessages are both checked and if either is exceeded
257
    // the method returns.
258
    //
259
    // A timeout of 0 will wait forever (or until the queue is stopped/drained)
260
    //
261
    // Only works in single reader mode, because we want to maintain order.
262
    // accumulate reads off the concurrent queue one at a time, so if multiple
263
    // readers are present, you could get out of order message delivery.
264
    NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate, Duration timeout)
265
        throws InterruptedException {
266

267
        if (!this.singleReaderMode) {
1✔
268
            throw new IllegalStateException("Accumulate is only supported in single reader mode.");
1✔
269
        }
270

271
        if (!this.isRunning()) {
1✔
272
            return null;
1✔
273
        }
274

275
        NatsMessage msg = this.poll(timeout);
1✔
276

277
        if (msg == null) {
1✔
278
            return null;
1✔
279
        }
280

281
        long size = msg.getSizeInBytes();
1✔
282

283
        if (maxMessagesToAccumulate <= 1 || size >= maxBytesToAccumulate) {
1✔
284
            this.sizeInBytes.addAndGet(-size);
1✔
285
            this.length.decrementAndGet();
1✔
286
            return msg;
1✔
287
        }
288

289
        long count = 1;
1✔
290
        NatsMessage cursor = msg;
1✔
291

292
        while (true) {
293
            NatsMessage next = this.queue.peek();
1✔
294
            if (next != null && !isPoison(next)) {
1✔
295
                long s = next.getSizeInBytes();
1✔
296
                if (maxBytesToAccumulate < 0 || (size + s) < maxBytesToAccumulate) { // keep going
1✔
297
                    size += s;
1✔
298
                    count++;
1✔
299

300
                    this.queue.poll(); // we need to get the message out of the queue b/c we only peeked
1✔
301
                    cursor.next = next;
1✔
302
                    if (next.flushImmediatelyAfterPublish) {
1✔
303
                        // if we are going to flush, then don't accumulate more
304
                        break;
1✔
305
                    }
306
                    if (count == maxMessagesToAccumulate) {
1✔
307
                        break;
1✔
308
                    }
309
                    cursor = cursor.next;
1✔
310
                } else { // One more is too far
311
                    break;
312
                }
313
            } else { // Didn't meet max condition
314
                break;
315
            }
316
        }
1✔
317

318
        this.sizeInBytes.addAndGet(-size);
1✔
319
        this.length.addAndGet(-count);
1✔
320

321
        return msg;
1✔
322
    }
323

324
    // Returns a message or null
325
    NatsMessage popNow() throws InterruptedException {
326
        return pop(null);
1✔
327
    }
328

329
    long length() {
330
        return this.length.get();
1✔
331
    }
332

333
    long sizeInBytes() {
334
        return this.sizeInBytes.get();
1✔
335
    }
336

337
    void filter(Predicate<NatsMessage> p) {
338
        editLock.lock();
1✔
339
        try {
340
            if (this.isRunning()) {
1✔
341
                throw new IllegalStateException("Filter is only supported when the queue is paused");
1✔
342
            }
343
            ArrayList<NatsMessage> newQueue = new ArrayList<>();
1✔
344
            NatsMessage cursor = this.queue.poll();
1✔
345
            while (cursor != null) {
1✔
346
                if (!p.test(cursor)) {
1✔
347
                    newQueue.add(cursor);
1✔
348
                } else {
349
                    this.sizeInBytes.addAndGet(-cursor.getSizeInBytes());
1✔
350
                    this.length.decrementAndGet();
1✔
351
                }
352
                cursor = this.queue.poll();
1✔
353
            }
354
            this.queue.addAll(newQueue);
1✔
355
        } finally {
356
            editLock.unlock();
1✔
357
        }
358
    }
1✔
359
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc