• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1959

02 May 2025 03:26PM UTC coverage: 95.638% (-0.02%) from 95.654%
#1959

push

github

web-flow
Merge pull request #1311 from nats-io/kv-ttl-more

KV LimitMarker add missing getter, additional docs and tests

14 of 15 new or added lines in 5 files covered. (93.33%)

2 existing lines in 2 files now uncovered.

11687 of 12220 relevant lines covered (95.64%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.9
/src/main/java/io/nats/client/impl/MessageQueue.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Message;
17

18
import java.time.Duration;
19
import java.util.ArrayList;
20
import java.util.concurrent.LinkedBlockingQueue;
21
import java.util.concurrent.TimeUnit;
22
import java.util.concurrent.atomic.AtomicInteger;
23
import java.util.concurrent.atomic.AtomicLong;
24
import java.util.concurrent.locks.Lock;
25
import java.util.concurrent.locks.ReentrantLock;
26
import java.util.function.Predicate;
27

28
import static io.nats.client.support.NatsConstants.EMPTY_BODY;
29
import static io.nats.client.support.NatsConstants.OUTPUT_QUEUE_IS_FULL;
30

31
class MessageQueue {
32
    protected static final int STOPPED = 0;
33
    protected static final int RUNNING = 1;
34
    protected static final int DRAINING = 2;
35
    protected static final String POISON = "_poison";
36

37
    protected final AtomicLong length;
38
    protected final AtomicLong sizeInBytes;
39
    protected final AtomicInteger running;
40
    protected final boolean singleReaderMode;
41
    protected final LinkedBlockingQueue<NatsMessage> queue;
42
    protected final Lock editLock;
43
    protected final int maxMessagesInOutgoingQueue;
44
    protected final boolean discardWhenFull;
45
    protected final long offerLockMillis;
46
    protected final long offerTimeoutMillis;
47
    protected final Duration requestCleanupInterval;
48

49
    // "Poison pill" is a graphic but common term for an item that breaks loops or stops something.
50
    // In this class the poison pill is used to break out of timed waits on the blocking queue.
51
    // A simple == is used to check if any message in the queue is this message.
52
    // /\ /\ /\ /\ which is why it is now a static. It's just a marker anyway.
53
    protected static final NatsMessage POISON_PILL = new NatsMessage(POISON, null, EMPTY_BODY);
1✔
54

55
    /**
     * Creates an unbounded queue that never discards messages and is not seeded from another queue.
     * Delegates with a max-messages value of -1, discardWhenFull = false and no source queue.
     *
     * @param singleReaderMode allows the use of "accumulate"
     * @param requestCleanupInterval is used to figure the offer lock wait and the offer timeout
     */
    MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval) {
        this(singleReaderMode,
            -1,      // no limit on the number of queued messages
            false,   // never discard
            requestCleanupInterval,
            null);   // nothing to drain from
    }
1✔
58

59
    /**
     * Creates an unbounded, non-discarding queue and, when a source is given,
     * moves the content of the source queue into the new queue.
     *
     * @param singleReaderMode allows the use of "accumulate"
     * @param requestCleanupInterval is used to figure the offer lock wait and the offer timeout
     * @param source queue whose messages are drained into this one, may be null
     */
    MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval, MessageQueue source) {
        this(singleReaderMode,
            -1,      // no limit on the number of queued messages
            false,   // never discard
            requestCleanupInterval,
            source);
    }
1✔
62

63
    /**
     * Creates a queue that is not seeded from another queue.
     * <p>
     * If maxMessagesInOutgoingQueue is zero or negative the underlying queue is created without
     * an explicit capacity, so it can grow until the max size of a linked blocking queue.
     * If it is set to a positive number of messages, a push waits (up to a timeout) for room,
     * which provides backpressure on a publisher if the writer is slow to push things onto the network.
     * Publishers use the value of Options.getMaxMessagesInOutgoingQueue().
     *
     * @param singleReaderMode allows the use of "accumulate"
     * @param maxMessagesInOutgoingQueue sets a limit on the size of the underlying queue when greater than zero
     * @param discardWhenFull allows to discard messages when the underlying queue is full
     * @param requestCleanupInterval is used to figure the offer lock wait and the offer timeout
     */
    MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval) {
        this(singleReaderMode,
            maxMessagesInOutgoingQueue,
            discardWhenFull,
            requestCleanupInterval,
            null); // nothing to drain from
    }
1✔
76

77
    MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) {
1✔
78
        this.maxMessagesInOutgoingQueue = maxMessagesInOutgoingQueue;
1✔
79
        this.queue = maxMessagesInOutgoingQueue > 0 ? new LinkedBlockingQueue<>(maxMessagesInOutgoingQueue) : new LinkedBlockingQueue<>();
1✔
80
        this.discardWhenFull = discardWhenFull;
1✔
81
        this.running = new AtomicInteger(RUNNING);
1✔
82
        this.sizeInBytes = new AtomicLong(0);
1✔
83
        this.length = new AtomicLong(0);
1✔
84
        this.offerLockMillis = requestCleanupInterval.toMillis();
1✔
85
        this.offerTimeoutMillis = Math.max(1, requestCleanupInterval.toMillis() * 95 / 100);
1✔
86

87
        editLock = new ReentrantLock();
1✔
88
        
89
        this.singleReaderMode = singleReaderMode;
1✔
90
        this.requestCleanupInterval = requestCleanupInterval;
1✔
91

92
        if (source != null) {
1✔
93
            source.drainTo(this);
1✔
94
        }
95
    }
1✔
96

97
    void drainTo(MessageQueue target) {
98
        editLock.lock();
1✔
99
        try {
100
            queue.drainTo(target.queue);
1✔
101
            target.length.set(queue.size());
1✔
102
        } finally {
103
            editLock.unlock();
1✔
104
        }
105
    }
1✔
106

107
    boolean isSingleReaderMode() {
108
        return singleReaderMode;
1✔
109
    }
110

111
    boolean isRunning() {
112
        return this.running.get() != STOPPED;
1✔
113
    }
114

115
    boolean isDraining() {
116
        return this.running.get() == DRAINING;
1✔
117
    }
118

119
    void pause() {
120
        this.running.set(STOPPED);
1✔
121
        this.poisonTheQueue();
1✔
122
    }
1✔
123

124
    void resume() {
125
        this.running.set(RUNNING);
1✔
126
    }
1✔
127

128
    void drain() {
129
        this.running.set(DRAINING);
1✔
130
        this.poisonTheQueue();
1✔
131
    }
1✔
132

133
    boolean isDrained() {
134
        // poison pill is not included in the length count, or the size
135
        return this.running.get() == DRAINING && this.length() == 0;
1✔
136
    }
137

138
    boolean push(NatsMessage msg) {
139
        return push(msg, false);
1✔
140
    }
141

142
    boolean push(NatsMessage msg, boolean internal) {
143
        long start = System.currentTimeMillis();
1✔
144
        boolean lockWasSuccessful = false;
1✔
145
        try {
146
            /*
147
                This was essentially a Head-Of-Line blocking problem.
148

149
                So the crux of the problem was that many threads were waiting to push a message to the queue.
150
                They all waited for the lock and once they had the lock they waited 5 seconds (4750 millis actually)
151
                only to find out the queue was full. They released the lock, so then another thread acquired the lock,
152
                and waited 5 seconds. So instead of being parallel, all these threads had to wait in line
153
                200 * 4750 = 15.8 minutes
154

155
                So what I did was try to acquire the lock but only wait 5 seconds.
156
                If I could not acquire the lock, then I assumed that this means that we are in this exact situation,
157
                another thread can't add b/c the queue is full, and so there is no point in even trying, so just throw the queue full exception.
158

159
                If I did acquire the lock, I deducted the time spent waiting for the lock from the time allowed to try to add.
160
                I took the max of that or 100 millis to try to add to the queue.
161
                This ensures that the max total time each thread can take is 5100 millis in parallel.
162

163
                Notes: The 5 seconds and the 4750 seconds is derived from the Options requestCleanupInterval, which defaults to 5 seconds and can be modified.
164
                The 4750 is 95% of that time. The 100 ms minimum is arbitrary.
165
             */
166
            if (!editLock.tryLock(offerLockMillis, TimeUnit.MILLISECONDS)) {
1✔
UNCOV
167
                throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
×
168
            }
169

170
            lockWasSuccessful = true;
1✔
171

172
            if (!internal && this.discardWhenFull) {
1✔
173
                return this.queue.offer(msg);
1✔
174
            }
175

176
            long timeoutLeft = Math.max(100, offerTimeoutMillis - (System.currentTimeMillis() - start));
1✔
177

178
            if (!this.queue.offer(msg, timeoutLeft, TimeUnit.MILLISECONDS)) {
1✔
179
                throw new IllegalStateException(OUTPUT_QUEUE_IS_FULL + queue.size());
1✔
180
            }
181
            this.sizeInBytes.getAndAdd(msg.getSizeInBytes());
1✔
182
            this.length.incrementAndGet();
1✔
183
            return true;
1✔
184
        }
185
        catch (InterruptedException e) {
×
186
            Thread.currentThread().interrupt();
×
187
            return false;
×
188
        }
189
        finally {
190
            if (lockWasSuccessful) {
1✔
191
                editLock.unlock();
1✔
192
            }
193
        }
194
    }
195

196
    /**
197
     * poisoning the queue puts the known poison pill into the queue, forcing any waiting code to stop
198
     * waiting and return.
199
     */
200
    void poisonTheQueue() {
201
        try {
202
            this.queue.add(POISON_PILL);
1✔
203
        } catch (IllegalStateException ie) { // queue was full, so we don't really need poison pill
1✔
204
            // ok to ignore this
205
        }
1✔
206
    }
1✔
207

208
    NatsMessage poll(Duration timeout) throws InterruptedException {
209
        NatsMessage msg = null;
1✔
210

211
        if (timeout == null || this.isDraining()) { // try immediately
1✔
212
            msg = this.queue.poll();
1✔
213
        } else {
214
            long nanos = timeout.toNanos();
1✔
215

216
            if (nanos != 0) {
1✔
217
                msg = this.queue.poll(nanos, TimeUnit.NANOSECONDS);
1✔
218
            } else {
219
                // A value of 0 means wait forever
220
                // We will loop and wait for a LONG time
221
                // if told to suspend/drain the poison pill will break this loop
222
                while (this.isRunning()) {
1✔
223
                    msg = this.queue.poll(100, TimeUnit.DAYS);
1✔
224
                    if (msg != null) break;
1✔
225
                }
226
            }
227
        }
228

229
        return msg == null || isPoison(msg) ? null : msg;
1✔
230
    }
231

232
    private boolean isPoison(Message msg) {
233
        return msg == POISON_PILL;
1✔
234
    }
235

236
    NatsMessage pop(Duration timeout) throws InterruptedException {
237
        if (!this.isRunning()) {
1✔
238
            return null;
1✔
239
        }
240

241
        NatsMessage msg = this.poll(timeout);
1✔
242

243
        if (msg == null) {
1✔
244
            return null;
1✔
245
        }
246

247
        this.sizeInBytes.getAndAdd(-msg.getSizeInBytes());
1✔
248
        this.length.decrementAndGet();
1✔
249

250
        return msg;
1✔
251
    }
252

253
    // Waits up to the timeout to try to accumulate multiple messages
254
    // Use the next field to read the entire set accumulated.
255
    // maxSize and maxMessages are both checked and if either is exceeded
256
    // the method returns.
257
    //
258
    // A timeout of 0 will wait forever (or until the queue is stopped/drained)
259
    //
260
    // Only works in single reader mode, because we want to maintain order.
261
    // accumulate reads off the concurrent queue one at a time, so if multiple
262
    // readers are present, you could get out of order message delivery.
263
    NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate, Duration timeout)
264
        throws InterruptedException {
265

266
        if (!this.singleReaderMode) {
1✔
267
            throw new IllegalStateException("Accumulate is only supported in single reader mode.");
1✔
268
        }
269

270
        if (!this.isRunning()) {
1✔
271
            return null;
1✔
272
        }
273

274
        NatsMessage msg = this.poll(timeout);
1✔
275

276
        if (msg == null) {
1✔
277
            return null;
1✔
278
        }
279

280
        long size = msg.getSizeInBytes();
1✔
281

282
        if (maxMessagesToAccumulate <= 1 || size >= maxBytesToAccumulate) {
1✔
283
            this.sizeInBytes.addAndGet(-size);
1✔
284
            this.length.decrementAndGet();
1✔
285
            return msg;
1✔
286
        }
287

288
        long count = 1;
1✔
289
        NatsMessage cursor = msg;
1✔
290

291
        while (true) {
292
            NatsMessage next = this.queue.peek();
1✔
293
            if (next != null && !isPoison(next)) {
1✔
294
                long s = next.getSizeInBytes();
1✔
295
                if (maxBytesToAccumulate < 0 || (size + s) < maxBytesToAccumulate) { // keep going
1✔
296
                    size += s;
1✔
297
                    count++;
1✔
298

299
                    this.queue.poll(); // we need to get the message out of the queue b/c we only peeked
1✔
300
                    cursor.next = next;
1✔
301
                    if (next.flushImmediatelyAfterPublish) {
1✔
302
                        // if we are going to flush, then don't accumulate more
303
                        break;
1✔
304
                    }
305
                    if (count == maxMessagesToAccumulate) {
1✔
306
                        break;
1✔
307
                    }
308
                    cursor = cursor.next;
1✔
309
                } else { // One more is too far
310
                    break;
311
                }
312
            } else { // Didn't meet max condition
313
                break;
314
            }
315
        }
1✔
316

317
        this.sizeInBytes.addAndGet(-size);
1✔
318
        this.length.addAndGet(-count);
1✔
319

320
        return msg;
1✔
321
    }
322

323
    // Returns a message or null
324
    NatsMessage popNow() throws InterruptedException {
325
        return pop(null);
1✔
326
    }
327

328
    // Just for testing
329
    long length() {
330
        return this.length.get();
1✔
331
    }
332

333
    long sizeInBytes() {
334
        return this.sizeInBytes.get();
1✔
335
    }
336

337
    void filter(Predicate<NatsMessage> p) {
338
        editLock.lock();
1✔
339
        try {
340
            if (this.isRunning()) {
1✔
341
                throw new IllegalStateException("Filter is only supported when the queue is paused");
1✔
342
            }
343
            ArrayList<NatsMessage> newQueue = new ArrayList<>();
1✔
344
            NatsMessage cursor = this.queue.poll();
1✔
345
            while (cursor != null) {
1✔
346
                if (!p.test(cursor)) {
1✔
347
                    newQueue.add(cursor);
1✔
348
                } else {
349
                    this.sizeInBytes.addAndGet(-cursor.getSizeInBytes());
1✔
350
                    this.length.decrementAndGet();
1✔
351
                }
352
                cursor = this.queue.poll();
1✔
353
            }
354
            this.queue.addAll(newQueue);
1✔
355
        } finally {
356
            editLock.unlock();
1✔
357
        }
358
    }
1✔
359
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc