• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2170

09 Sep 2025 12:23PM UTC coverage: 95.143% (-0.3%) from 95.433%
#2170

push

github

web-flow
Merge pull request #1415 from nats-io/stats-options-review

Statistics classes improvements

3 of 40 new or added lines in 3 files covered. (7.5%)

4 existing lines in 2 files now uncovered.

11970 of 12581 relevant lines covered (95.14%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.64
/src/main/java/io/nats/client/impl/NatsConnectionWriter.java
1

2
// Copyright 2015-2018 The NATS Authors
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at:
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package io.nats.client.impl;
16

17
import io.nats.client.Options;
18
import io.nats.client.StatisticsCollector;
19
import io.nats.client.support.ByteArrayBuilder;
20

21
import java.io.IOException;
22
import java.nio.BufferOverflowException;
23
import java.time.Duration;
24
import java.util.concurrent.CancellationException;
25
import java.util.concurrent.CompletableFuture;
26
import java.util.concurrent.ExecutionException;
27
import java.util.concurrent.Future;
28
import java.util.concurrent.atomic.AtomicBoolean;
29
import java.util.concurrent.atomic.AtomicInteger;
30
import java.util.concurrent.locks.ReentrantLock;
31

32
import static io.nats.client.support.BuilderBase.bufferAllocSize;
33
import static io.nats.client.support.NatsConstants.CR;
34
import static io.nats.client.support.NatsConstants.LF;
35

36
class NatsConnectionWriter implements Runnable {
37
    private static final int BUFFER_BLOCK_SIZE = 256;
38

39
    private final NatsConnection connection;
40

41
    private final ReentrantLock writerLock;
42
    private Future<Boolean> stopped;
43
    private Future<DataPort> dataPortFuture;
44
    private DataPort dataPort;
45
    private final AtomicBoolean running;
46
    private final AtomicBoolean reconnectMode;
47
    private final ReentrantLock startStopLock;
48

49
    private byte[] sendBuffer;
50
    private final AtomicInteger sendBufferLength;
51

52
    private final MessageQueue outgoing;
53
    private final MessageQueue reconnectOutgoing;
54
    private final long reconnectBufferSize;
55

56
    NatsConnectionWriter(NatsConnection connection, NatsConnectionWriter sourceWriter) {
1✔
57
        this.connection = connection;
1✔
58
        writerLock = new ReentrantLock();
1✔
59

60
        this.running = new AtomicBoolean(false);
1✔
61
        this.reconnectMode = new AtomicBoolean(sourceWriter != null);
1✔
62
        this.startStopLock = new ReentrantLock();
1✔
63
        this.stopped = new CompletableFuture<>();
1✔
64
        ((CompletableFuture<Boolean>)this.stopped).complete(Boolean.TRUE); // we are stopped on creation
1✔
65

66
        Options options = connection.getOptions();
1✔
67
        int sbl = bufferAllocSize(options.getBufferSize(), BUFFER_BLOCK_SIZE);
1✔
68
        sendBufferLength = new AtomicInteger(sbl);
1✔
69
        sendBuffer = new byte[sbl];
1✔
70

71
        outgoing = new MessageQueue(true,
1✔
72
            options.getMaxMessagesInOutgoingQueue(),
1✔
73
            options.isDiscardMessagesWhenOutgoingQueueFull(),
1✔
74
            options.getRequestCleanupInterval(),
1✔
75
            sourceWriter == null ? null : sourceWriter.outgoing);
76

77
        // The "reconnect" buffer contains internal messages, and we will keep it unlimited in size
78
        reconnectOutgoing = new MessageQueue(true, options.getRequestCleanupInterval(),
1✔
79
            sourceWriter == null ? null : sourceWriter.reconnectOutgoing);
80
        reconnectBufferSize = options.getReconnectBufferSize();
1✔
81
    }
1✔
82

83
    // Should only be called if the current thread has exited.
84
    // Use the Future from stop() to determine if it is ok to call this.
85
    // This method resets that future so mistiming can result in badness.
86
    void start(Future<DataPort> dataPortFuture) {
87
        this.startStopLock.lock();
1✔
88
        try {
89
            this.dataPortFuture = dataPortFuture;
1✔
90
            this.running.set(true);
1✔
91
            this.outgoing.resume();
1✔
92
            this.reconnectOutgoing.resume();
1✔
93
            this.stopped = connection.getExecutor().submit(this, Boolean.TRUE);
1✔
94
        } finally {
95
            this.startStopLock.unlock();
1✔
96
        }
97
    }
1✔
98

99
    // May be called several times on an error.
100
    // Returns a future that is completed when the thread completes, not when this
101
    // method does.
102
    Future<Boolean> stop() {
103
        if (running.get()) {
1✔
104
            running.set(false);
1✔
105
            startStopLock.lock();
1✔
106
            try {
107
                this.outgoing.pause();
1✔
108
                this.reconnectOutgoing.pause();
1✔
109
                this.outgoing.filter(NatsMessage::isProtocolFilterOnStop);
1✔
110
            }
111
            finally {
112
                this.startStopLock.unlock();
1✔
113
            }
114
        }
115
        return this.stopped;
1✔
116
    }
117

118
    boolean isRunning() {
119
        return running.get();
1✔
120
    }
121

122
    void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector stats) throws IOException {
123
        writerLock.lock();
1✔
124
        try {
125
            int sendPosition = 0;
1✔
126
            int sbl = sendBufferLength.get();
1✔
127

128
            while (msg != null) {
1✔
129
                long size = msg.getSizeInBytes();
1✔
130

131
                if (sendPosition + size > sbl) {
1✔
132
                    if (sendPosition > 0) {
1✔
133
                        dataPort.write(sendBuffer, sendPosition);
×
NEW
134
                        stats.registerWrite(sendPosition);
×
135
                        sendPosition = 0;
×
136
                    }
137
                    if (size > sbl) { // have to resize b/c can't fit 1 message
1✔
138
                        sbl = bufferAllocSize((int) size, BUFFER_BLOCK_SIZE);
1✔
139
                        sendBufferLength.set(sbl);
1✔
140
                        sendBuffer = new byte[sbl];
1✔
141
                    }
142
                }
143

144
                ByteArrayBuilder bab = msg.getProtocolBab();
1✔
145
                int babLen = bab.length();
1✔
146
                System.arraycopy(bab.internalArray(), 0, sendBuffer, sendPosition, babLen);
1✔
147
                sendPosition += babLen;
1✔
148

149
                sendBuffer[sendPosition++] = CR;
1✔
150
                sendBuffer[sendPosition++] = LF;
1✔
151

152
                if (!msg.isProtocol()) { // because a protocol message does not have headers or data
1✔
153
                    sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer);
1✔
154

155
                    byte[] bytes = msg.getData(); // guaranteed to not be null
1✔
156
                    if (bytes.length > 0) {
1✔
157
                        System.arraycopy(bytes, 0, sendBuffer, sendPosition, bytes.length);
1✔
158
                        sendPosition += bytes.length;
1✔
159
                    }
160

161
                    sendBuffer[sendPosition++] = CR;
1✔
162
                    sendBuffer[sendPosition++] = LF;
1✔
163
                }
164

165
                stats.incrementOutMsgs();
1✔
166
                stats.incrementOutBytes(size);
1✔
167

168
                if (msg.flushImmediatelyAfterPublish) {
1✔
169
                    dataPort.flush();
1✔
170
                }
171
                msg = msg.next;
1✔
172
            }
1✔
173

174
            // no need to write if there are no bytes
175
            if (sendPosition > 0) {
1✔
176
                dataPort.write(sendBuffer, sendPosition);
1✔
177
                stats.registerWrite(sendPosition);
1✔
178
            }
179
        }
180
        finally {
181
            writerLock.unlock();
1✔
182
        }
183
    }
1✔
184

185
    @Override
186
    public void run() {
187
        Duration outgoingTimeout = Duration.ofMinutes(2); // This can be long since no one is sending
1✔
188
        Duration reconnectTimeout = Duration.ofMillis(1); // This should be short, since we are trying to get the reconnect through
1✔
189

190
        try {
191
            dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
1✔
192
            StatisticsCollector stats = this.connection.getStatisticsCollector();
1✔
193

194
            while (running.get() && !Thread.interrupted()) {
1✔
195
                NatsMessage msg;
196
                if (this.reconnectMode.get()) {
1✔
197
                    msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, reconnectTimeout);
1✔
198
                }
199
                else {
200
                    msg = this.outgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, outgoingTimeout);
1✔
201
                }
202
                if (msg != null) {
1✔
203
                    sendMessageBatch(msg, dataPort, stats);
1✔
204
                }
205
            }
1✔
206
        } catch (IOException | BufferOverflowException io) {
1✔
207
            // if already not running, an IOE is not unreasonable in a transition state
208
            if (running.get()) {
1✔
209
                this.connection.handleCommunicationIssue(io);
1✔
210
            }
211
        } catch (CancellationException | ExecutionException ex) {
×
212
            // Exit
213
        } catch (InterruptedException ex) {
1✔
214
            // Exit
215
            Thread.currentThread().interrupt();
1✔
216
        } finally {
217
            this.running.set(false);
1✔
218
        }
219
    }
1✔
220

221
    void setReconnectMode(boolean tf) {
222
        reconnectMode.set(tf);
1✔
223
    }
1✔
224

225
    boolean canQueueDuringReconnect(NatsMessage msg) {
226
        // don't over fill the "send" buffer while waiting to reconnect
227
        return (reconnectBufferSize < 0 || (outgoing.sizeInBytes() + msg.getSizeInBytes()) < reconnectBufferSize);
1✔
228
    }
229

230
    boolean queue(NatsMessage msg) {
231
        return this.outgoing.push(msg);
1✔
232
    }
233

234
    void queueInternalMessage(NatsMessage msg) {
235
        if (this.reconnectMode.get()) {
1✔
236
            this.reconnectOutgoing.push(msg);
1✔
237
        } else {
238
            this.outgoing.push(msg, true);
1✔
239
        }
240
    }
1✔
241

242
    void flushBuffer() {
243
        // Since there is no connection level locking, we rely on synchronization
244
        // of the APIs here.
245
        writerLock.lock();
1✔
246
        try {
247
            if (this.running.get()) {
1✔
248
                dataPort.flush();
1✔
249
            }
250
        } catch (Exception e) {
×
251
            // NOOP;
252
        }
253
        finally {
254
            writerLock.unlock();
1✔
255
        }
256
    }
1✔
257

258
    long outgoingPendingMessageCount() {
259
        return outgoing.length();
×
260
    }
261

262
    long outgoingPendingBytes() {
263
        return outgoing.sizeInBytes();
×
264
    }
265
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc