• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2010

26 Jun 2025 08:30PM UTC coverage: 95.677% (+0.02%) from 95.653%
#2010

push

github

web-flow
Merge pull request #1338 from nats-io/kv-limit-marker-test

Fixed KV Limit Marker Test to only run against 2.11.2 or later

11753 of 12284 relevant lines covered (95.68%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.41
/src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java
1
// Copyright 2020 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.api.ConsumerInfo;
18
import io.nats.client.support.NatsJetStreamConstants;
19

20
import java.io.IOException;
21
import java.time.Duration;
22
import java.util.Iterator;
23
import java.util.List;
24

25
import static io.nats.client.support.NatsConstants.NANOS_PER_MILLI;
26

27
/**
28
 * This is a JetStream specific subscription.
29
 */
30
public class NatsJetStreamSubscription extends NatsSubscription implements JetStreamSubscription, NatsJetStreamConstants {
31

32
    public static final String SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL = "Subscription type does not support pull.";
33
    public static final long EXPIRE_ADJUSTMENT = 10;
34
    public static final long MIN_EXPIRE_MILLIS = 20;
35

36
    protected final NatsJetStream js;
37

38
    protected String stream;
39
    protected String consumerName;
40

41
    protected MessageManager manager;
42

43
    NatsJetStreamSubscription(String sid, String subject, String queueName,
44
                              NatsConnection connection, NatsDispatcher dispatcher,
45
                              NatsJetStream js,
46
                              String stream, String consumer,
47
                              MessageManager manager)
48
    {
49
        super(sid, subject, queueName, connection, dispatcher);
1✔
50

51
        this.js = js;
1✔
52
        this.stream = stream;
1✔
53
        this.consumerName = consumer; // might be null, someone will call setConsumerName
1✔
54

55
        this.manager = manager;
1✔
56
        manager.startup(this);
1✔
57
    }
1✔
58

59
    void setConsumerName(String consumerName) {
60
        this.consumerName = consumerName;
1✔
61
    }
1✔
62

63
    @Override
64
    public String getConsumerName() {
65
        return consumerName;
1✔
66
    }
67

68
    String getStream() {
69
        return stream;
1✔
70
    }
71

72
    boolean isPullMode() {
73
        return false;
1✔
74
    }
75

76
    MessageManager getManager() { return manager; } // internal, for testing
1✔
77

78
    @Override
79
    void invalidate() {
80
        manager.shutdown();
1✔
81
        super.invalidate();
1✔
82
    }
1✔
83

84
    @Override
85
    public Message nextMessage(Duration timeout) throws InterruptedException, IllegalStateException {
86
        if (timeout == null) {
1✔
87
            return _nextUnmanagedNoWait(null);
1✔
88
        }
89
        long timeoutNanos = timeout.toNanos();
1✔
90
        if (timeoutNanos <= 0) {
1✔
91
            return _nextUnmanagedWaitForever(null);
1✔
92
        }
93
        return _nextUnmanaged(timeoutNanos, null);
1✔
94
    }
95

96
    @Override
97
    public Message nextMessage(long timeoutMillis) throws InterruptedException, IllegalStateException {
98
        if (timeoutMillis <= 0) {
1✔
99
            return _nextUnmanagedWaitForever(null);
1✔
100
        }
101
        return _nextUnmanaged(timeoutMillis * NANOS_PER_MILLI, null);
1✔
102
    }
103

104
    protected Message _nextUnmanagedWaitForever(String expectedPullSubject) throws InterruptedException {
105
        while (true) {
106
            Message msg = nextMessageInternal(Duration.ZERO);
1✔
107
            if (msg != null) { // null shouldn't happen, so just a code guard b/c nextMessageInternal can return null
1✔
108
                switch (manager.manage(msg)) {
1✔
109
                    case MESSAGE:
110
                        return msg;
1✔
111
                    case STATUS_ERROR:
112
                        // if the status applies throw exception, otherwise it's ignored, fall through
113
                        if (expectedPullSubject == null || expectedPullSubject.equals(msg.getSubject())) {
×
114
                            throw new JetStreamStatusException(msg.getStatus(), this);
×
115
                        }
116
                        break;
117
                }
118
                // Check again since waiting forever when:
119
                // 1. Any STATUS_HANDLED or STATUS_TERMINUS
120
                // 2. STATUS_ERRORS that aren't for expected pullSubject
121
            }
122
        }
×
123
    }
124

125
    protected Message _nextUnmanagedNoWait(String expectedPullSubject) throws InterruptedException {
126
        while (true) {
127
            Message msg = nextMessageInternal(null);
1✔
128
            if (msg == null) {
1✔
129
                return null;
1✔
130
            }
131
            switch (manager.manage(msg)) {
1✔
132
                case MESSAGE:
133
                    return msg;
1✔
134
                case STATUS_TERMINUS:
135
                    // if the status applies, return null, otherwise it's ignored, fall through
136
                    if (expectedPullSubject == null || expectedPullSubject.equals(msg.getSubject())) {
1✔
137
                        return null;
1✔
138
                    }
139
                    break;
140
                case STATUS_ERROR:
141
                    // if the status applies, throw exception, otherwise it's ignored, fall through
142
                    if (expectedPullSubject == null || expectedPullSubject.equals(msg.getSubject())) {
×
143
                        throw new JetStreamStatusException(msg.getStatus(), this);
×
144
                    }
145
                    break;
146
            }
147
            // These statuses don't apply to the message that came in,
148
            // so we just loop and move on to the next message.
149
            // 1. Any STATUS_HANDLED
150
            // 2. STATUS_TERMINUS or STATUS_ERRORS that aren't for expected pullSubject
151
        }
×
152
    }
153

154
    protected Message _nextUnmanaged(long timeoutNanos, String expectedPullSubject) throws InterruptedException {
155
        long timeLeftNanos = timeoutNanos;
1✔
156
        long start = NatsSystemClock.nanoTime();
1✔
157
        while (timeLeftNanos > 0) {
1✔
158
            Message msg = nextMessageInternal( Duration.ofNanos(timeLeftNanos) );
1✔
159
            if (msg == null) {
1✔
160
                return null; // normal timeout
1✔
161
            }
162
            switch (manager.manage(msg)) {
1✔
163
                case MESSAGE:
164
                    return msg;
1✔
165
                case STATUS_TERMINUS:
166
                    // if the status applies return null, otherwise it's ignored, fall through
167
                    if (expectedPullSubject == null || expectedPullSubject.equals(msg.getSubject())) {
1✔
168
                        return null;
1✔
169
                    }
170
                    break;
171
                case STATUS_ERROR:
172
                    // if the status applies throw exception, otherwise it's ignored, fall through
173
                    if (expectedPullSubject == null || expectedPullSubject.equals(msg.getSubject())) {
1✔
174
                        throw new JetStreamStatusException(msg.getStatus(), this);
1✔
175
                    }
176
                    break;
177
            }
178
            // anything else, try again while we have time
179
            timeLeftNanos = timeoutNanos - (NatsSystemClock.nanoTime() - start);
1✔
180
        }
1✔
181
        return null;
×
182
    }
183

184
    /**
185
     * {@inheritDoc}
186
     */
187
    @Override
188
    public void pull(int batchSize) {
189
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
190
    }
191

192
    /**
193
     * {@inheritDoc}
194
     */
195
    @Override
196
    public void pull(PullRequestOptions pullRequestOptions) {
197
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
198
    }
199

200
    /**
201
     * {@inheritDoc}
202
     */
203
    @Override
204
    public void pullNoWait(int batchSize) {
205
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
206
    }
207

208
    /**
209
     * {@inheritDoc}
210
     */
211
    @Override
212
    public void pullNoWait(int batchSize, Duration expiresIn) {
213
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
214
    }
215

216
    /**
217
     * {@inheritDoc}
218
     */
219
    @Override
220
    public void pullNoWait(int batchSize, long expiresInMillis) {
221
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
222
    }
223

224
    /**
225
     * {@inheritDoc}
226
     */
227
    @Override
228
    public void pullExpiresIn(int batchSize, Duration expiresIn) {
229
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
230
    }
231

232
    /**
233
     * {@inheritDoc}
234
     */
235
    @Override
236
    public void pullExpiresIn(int batchSize, long expiresInMillis) {
237
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
238
    }
239

240
    /**
241
     * {@inheritDoc}
242
     */
243
    @Override
244
    public List<Message> fetch(int batchSize, long maxWaitMillis) {
245
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
246
    }
247

248
    /**
249
     * {@inheritDoc}
250
     */
251
    @Override
252
    public List<Message> fetch(int batchSize, Duration maxWait) {
253
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
254
    }
255

256
    /**
257
     * {@inheritDoc}
258
     */
259
    @Override
260
    public Iterator<Message> iterate(int batchSize, Duration maxWait) {
261
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
262
    }
263

264
    /**
265
     * {@inheritDoc}
266
     */
267
    @Override
268
    public Iterator<Message> iterate(final int batchSize, long maxWaitMillis) {
269
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
270
    }
271

272
    /**
273
     * {@inheritDoc}
274
     */
275
    @Override
276
    public JetStreamReader reader(int batchSize, int repullAt) {
277
        throw new IllegalStateException(SUBSCRIPTION_TYPE_DOES_NOT_SUPPORT_PULL);
1✔
278
    }
279

280
    /**
281
     * {@inheritDoc}
282
     */
283
    @Override
284
    public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException {
285
        return js.lookupConsumerInfo(stream, consumerName);
1✔
286
    }
287

288
    @Override
289
    public String toString() {
290
        return "NatsJetStreamSubscription{" +
1✔
291
                "consumer='" + consumerName + '\'' +
292
                ", stream='" + stream + '\'' +
293
                ", deliver='" + getSubject() + '\'' +
1✔
294
                ", isPullMode=" + isPullMode() +
1✔
295
                '}';
296
    }
297
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc