• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1888

24 Feb 2025 10:04PM UTC coverage: 95.729% (+0.007%) from 95.722%
#1888

push

github

web-flow
Tuning common code used for watches and key lookup (#1281)

3 of 4 new or added lines in 1 file covered. (75.0%)

6 existing lines in 2 files now uncovered.

11453 of 11964 relevant lines covered (95.73%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.02
/src/main/java/io/nats/client/impl/NatsFeatureBase.java
1
// Copyright 2022 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.*;
17
import io.nats.client.api.AckPolicy;
18
import io.nats.client.api.ConsumerConfiguration;
19
import io.nats.client.api.DeliverPolicy;
20
import io.nats.client.api.MessageInfo;
21

22
import java.io.IOException;
23
import java.time.Duration;
24
import java.util.Collections;
25
import java.util.List;
26

27
import static io.nats.client.support.NatsJetStreamConstants.JS_NO_MESSAGE_FOUND_ERR;
28

29
public class NatsFeatureBase {
30

31
    protected final NatsJetStream js;
32
    protected final NatsJetStreamManagement jsm;
33
    protected String streamName;
34

35
    NatsFeatureBase(NatsConnection connection, FeatureOptions fo) throws IOException {
1✔
36
        if (fo == null) {
1✔
37
            js = new NatsJetStream(connection, null);
1✔
38
            jsm = new NatsJetStreamManagement(connection, null);
1✔
39
        }
40
        else {
41
            js = new NatsJetStream(connection, fo.getJetStreamOptions());
1✔
42
            jsm = new NatsJetStreamManagement(connection, fo.getJetStreamOptions());
1✔
43
        }
44
    }
1✔
45

46
    String getStreamName() {
47
        return streamName;
1✔
48
    }
49

50
    protected MessageInfo _getLast(String subject) throws IOException, JetStreamApiException {
51
        try {
52
            return jsm.getLastMessage(streamName, subject);
1✔
53
        }
54
        catch (JetStreamApiException jsae) {
1✔
55
            if (jsae.getApiErrorCode() == JS_NO_MESSAGE_FOUND_ERR) {
1✔
56
                return null;
1✔
57
            }
58
            throw jsae;
×
59
        }
60
    }
61

62
    protected MessageInfo _getBySeq(long seq) throws IOException, JetStreamApiException {
63
        try {
64
            return jsm.getMessage(streamName, seq);
1✔
65
        }
66
        catch (JetStreamApiException jsae) {
1✔
67
            if (jsae.getApiErrorCode() == JS_NO_MESSAGE_FOUND_ERR) {
1✔
68
                return null;
1✔
69
            }
70
            throw jsae;
×
71
        }
72
    }
73

74
    protected void visitSubject(String subject, DeliverPolicy deliverPolicy, boolean headersOnly, boolean ordered, MessageHandler handler) throws IOException, JetStreamApiException, InterruptedException {
75
        visitSubject(Collections.singletonList(subject), deliverPolicy, headersOnly, ordered, handler);
1✔
76
    }
1✔
77

78
    protected void visitSubject(List<String> subjects, DeliverPolicy deliverPolicy, boolean headersOnly, boolean ordered, MessageHandler handler) throws IOException, JetStreamApiException, InterruptedException {
79
        ConsumerConfiguration.Builder ccb = ConsumerConfiguration.builder()
1✔
80
            .ackPolicy(AckPolicy.None)
1✔
81
            .deliverPolicy(deliverPolicy)
1✔
82
            .headersOnly(headersOnly)
1✔
83
            .filterSubjects(subjects);
1✔
84

85
        PushSubscribeOptions pso = PushSubscribeOptions.builder()
1✔
86
            .stream(streamName)
1✔
87
            .ordered(ordered)
1✔
88
            .configuration(ccb.build())
1✔
89
            .build();
1✔
90

91
        Duration timeout = js.getTimeout();
1✔
92
        JetStreamSubscription sub = js.subscribe(null, pso);
1✔
93
        try {
94
            long pending = sub.getConsumerInfo().getCalculatedPending();
1✔
95
            while (pending > 0) { // no need to loop if nothing pending
1✔
96
                Message m = sub.nextMessage(timeout);
1✔
97
                if (m == null) {
1✔
NEW
98
                    return; // if there are no messages by the timeout, we are done.
×
99
                }
100
                handler.onMessage(m);
1✔
101
                if (--pending == 0) {
1✔
102
                    return;
1✔
103
                }
104
            }
1✔
105
        }
106
        finally {
107
            sub.unsubscribe();
1✔
108
        }
109
    }
1✔
110
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc