• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25600409404

09 May 2026 11:49AM UTC coverage: 83.347%. First build
25600409404

Pull #295

github

web-flow
Merge ace5e3c2a into 9757517ae
Pull Request #295: Nats v2 web

2564 of 3407 branches covered (75.26%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

7711 of 8921 relevant lines covered (86.44%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.57
/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java
1
/*
2
 * Copyright (c) 2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16
package com.github.sonus21.rqueue.nats.js;
17

18
import com.github.sonus21.rqueue.config.RqueueConfig;
19
import com.github.sonus21.rqueue.core.EndpointRegistry;
20
import com.github.sonus21.rqueue.listener.QueueDetail;
21
import com.github.sonus21.rqueue.nats.RqueueNatsConfig;
22
import com.github.sonus21.rqueue.nats.RqueueNatsException;
23
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
24
import com.github.sonus21.rqueue.utils.Constants;
25
import com.github.sonus21.rqueue.utils.PriorityUtils;
26
import java.time.Duration;
27
import java.util.ArrayList;
28
import java.util.List;
29
import java.util.logging.Level;
30
import java.util.logging.Logger;
31
import org.springframework.beans.factory.SmartInitializingSingleton;
32

33
/**
34
 * Boot-time JetStream stream / DLQ existence guard. Mirrors the role
35
 * {@code NatsKvBucketValidator} plays for KV buckets — moves stream existence checks off the
36
 * publish / pop hot path (where they cost a {@code getStreamInfo} round-trip per message) onto
37
 * the bootstrap path so a running broker never has to ask "does this stream exist?" again.
38
 *
39
 * <p><b>When this runs.</b> Implements {@link SmartInitializingSingleton} so provisioning fires
40
 * after every singleton bean — including {@code RqueueMessageListenerContainer} — has finished
41
 * its {@code afterPropertiesSet}, which is when {@link EndpointRegistry} is populated, and
42
 * <em>before</em> {@code SmartLifecycle.start()} spawns the message pollers. Listening on
43
 * {@code RqueueBootstrapEvent} would race against the pollers because that event fires
44
 * <em>after</em> {@code doStart()} has already submitted them, and a poll on a not-yet-created
45
 * stream surfaces as {@code stream not found [10059]}. {@code InitializingBean} would be too
46
 * early — the registry is still empty when this bean's own {@code afterPropertiesSet} would run.
47
 *
48
 * <p><b>What it walks.</b> For every queue in {@link EndpointRegistry#getActiveQueueDetails()}:
49
 * <ul>
50
 *   <li>the main stream {@code <streamPrefix><queue>},
51
 *   <li>one stream per declared priority sub-queue ({@code <streamPrefix><queue>-<priority>}),
52
 *   <li>when the listener declared a Rqueue-level DLQ ({@link QueueDetail#isDlqSet()}): the
53
 *       target DLQ queue's stream ({@code <streamPrefix><deadLetterQueueName>}), so that
54
 *       {@code PostProcessingHandler} can publish there after retry exhaustion.
55
 *   <li>when no Rqueue DLQ is declared and {@code RqueueNatsConfig.isAutoCreateDlqStream()} is
56
 *       true: the NATS-native DLQ stream ({@code <streamPrefix><queue><dlqStreamSuffix>}) as a
57
 *       safety net for messages that exhaust JetStream {@code maxDeliver}.
58
 * </ul>
59
 *
60
 * <p><b>Behaviour by flag.</b> All work is delegated to
61
 * {@link NatsProvisioner#ensureStream(String, java.util.List)} /
62
 * {@link NatsProvisioner#ensureDlqStream(String, java.util.List)}, so the validator inherits the
63
 * existing flag semantics without re-implementing them:
64
 * <ul>
65
 *   <li>{@code autoCreateStreams=true} (default) — any missing stream is created using
66
 *       {@link RqueueNatsConfig.StreamDefaults}.
67
 *   <li>{@code autoCreateStreams=false} — every missing stream surfaces an
68
 *       {@link RqueueNatsException}; the validator collects all of them and raises one
69
 *       {@link IllegalStateException} listing every missing stream so operators can run a
70
 *       single batch of {@code nats stream add} commands rather than chase failures one queue
71
 *       at a time.
72
 * </ul>
73
 */
74
public class NatsStreamValidator implements SmartInitializingSingleton {
75

76
  private static final Logger log = Logger.getLogger(NatsStreamValidator.class.getName());
1✔
77

78
  private final NatsProvisioner provisioner;
79
  private final RqueueNatsConfig config;
80
  private final RqueueConfig rqueueConfig;
81

82
  public NatsStreamValidator(NatsProvisioner provisioner, RqueueNatsConfig config) {
83
    this(provisioner, config, null);
1✔
84
  }
1✔
85

86
  public NatsStreamValidator(
87
      NatsProvisioner provisioner, RqueueNatsConfig config, RqueueConfig rqueueConfig) {
1✔
88
    this.provisioner = provisioner;
1✔
89
    this.config = config;
1✔
90
    this.rqueueConfig = rqueueConfig;
1✔
91
  }
1✔
92

93
  @Override
94
  public void afterSingletonsInstantiated() {
95
    boolean schedulingSupported = provisioner.isMessageSchedulingSupported();
1✔
96
    if (schedulingSupported) {
1!
NEW
97
      log.log(
×
98
          Level.INFO,
99
          "NatsStreamValidator: NATS message scheduling (ADR-51) is AVAILABLE — "
100
              + "enqueueWithDelay will use the Nats-Next-Deliver-Time header.");
101
    } else {
102
      log.log(
1✔
103
          Level.WARNING,
104
          "NatsStreamValidator: NATS message scheduling (ADR-51) is NOT AVAILABLE — "
105
              + "the connected server is older than {0}. "
106
              + "Calls to enqueueWithDelay will throw RqueueNatsException at runtime. "
107
              + "Upgrade NATS to {0}+ to enable delayed message delivery.",
108
          new Object[] {NatsProvisioner.SCHEDULING_MIN_VERSION});
109
    }
110

111
    List<QueueDetail> queues = EndpointRegistry.getActiveQueueDetails();
1✔
112
    if (queues.isEmpty()) {
1✔
113
      log.log(Level.FINE, "NatsStreamValidator: no active queues registered; nothing to do");
1✔
114
      return;
1✔
115
    }
116
    RqueueNatsConfig.ConsumerDefaults cd = config.getConsumerDefaults();
1✔
117
    boolean producerOnly = rqueueConfig != null && rqueueConfig.isProducer();
1✔
118
    List<String> failures = new ArrayList<>();
1✔
119
    int total = 0;
1✔
120
    for (QueueDetail q : queues) {
1✔
121
      String mainStream = config.getStreamPrefix() + q.getName();
1✔
122
      String mainSubject = config.getSubjectPrefix() + q.getName();
1✔
123
      total += tryEnsure(failures, mainStream, mainSubject, q);
1✔
124
      if (!producerOnly) {
1✔
125
        tryEnsureConsumer(failures, mainStream, q.resolvedConsumerName(), q, cd);
1✔
126
      }
127

128
      if (q.getPriority() != null) {
1!
129
        for (String priority : q.getPriority().keySet()) {
1✔
130
          if (Constants.DEFAULT_PRIORITY_KEY.equals(priority)) {
1✔
131
            continue; // DEFAULT entry is the queue itself; already handled above
1✔
132
          }
133
          String pStream =
1✔
134
              config.getStreamPrefix() + q.getName() + PriorityUtils.getSuffix(priority);
1✔
135
          String pSubject =
1✔
136
              config.getSubjectPrefix() + q.getName() + PriorityUtils.getSuffix(priority);
1✔
137
          total += tryEnsure(failures, pStream, pSubject, q);
1✔
138
          // Consumer is NOT created here: each priority sub-queue has its own QueueDetail
139
          // in the registry and is processed as its own mainStream entry above, so exactly
140
          // one consumer is created per stream. Adding a second one here would fail on
141
          // WorkQueue streams (error 10099).
142
        }
1✔
143
      }
144

145
      if (q.isDlqSet()) {
1✔
146
        // User declared a Rqueue-level DLQ: ensure the target queue's JetStream stream exists
147
        // so that PostProcessingHandler can publish to it after retry exhaustion. The
148
        // NATS-native "job-queue-dlq" stream is unrelated and must not be created here —
149
        // Rqueue routes the message explicitly, not via advisory bridging.
150
        String dlqQueueStream = config.getStreamPrefix() + q.getDeadLetterQueueName();
1✔
151
        String dlqQueueSubject = config.getSubjectPrefix() + q.getDeadLetterQueueName();
1✔
152
        total += tryEnsure(failures, dlqQueueStream, dlqQueueSubject, q);
1✔
153
        // No consumer needed for the DLQ stream here — the DLQ queue registers its own listener.
154
      }
155
    }
1✔
156
    if (!failures.isEmpty()) {
1!
157
      String hint = config.isAutoCreateStreams()
×
158
          ? "Stream creation failed — verify NATS is running with JetStream enabled"
×
159
              + " (start the server with `nats-server -js`) and that the account has"
160
              + " `add_stream` permission."
161
          : "With rqueue.nats.auto-create-streams=false every required stream must exist"
×
162
              + " before the application starts. Run `nats stream add` for each missing"
163
              + " stream or set rqueue.nats.auto-create-streams=true to let rqueue create"
164
              + " them automatically.";
165
      throw new IllegalStateException("NATS JetStream provisioning failed for "
×
166
          + failures.size()
×
167
          + " of "
168
          + total
169
          + " stream(s) at startup. "
170
          + hint
171
          + " Failed streams:\n"
172
          + "  - "
173
          + String.join("\n  - ", failures));
×
174
    }
175
    log.log(
1✔
176
        Level.INFO,
177
        "NatsStreamValidator: ensured {0} JetStream stream(s) across {1} queue(s)",
178
        new Object[] {total, queues.size()});
1✔
179
  }
1✔
180

181
  private void tryEnsureConsumer(
182
      List<String> failures,
183
      String streamName,
184
      String consumerName,
185
      QueueDetail q,
186
      RqueueNatsConfig.ConsumerDefaults cd) {
187
    Duration ackWait = JetStreamMessageBroker.resolveAckWait(q, config);
1✔
188
    long maxDeliver = JetStreamMessageBroker.resolveMaxDeliver(q, config);
1✔
189
    try {
190
      provisioner.ensureConsumer(
1✔
191
          streamName, consumerName, ackWait, maxDeliver, cd.getMaxAckPending());
1✔
192
    } catch (RqueueNatsException e) {
×
193
      failures.add("consumer " + consumerName + " on " + streamName + ": " + rootCause(e));
×
194
    }
1✔
195
  }
1✔
196

197
  private int tryEnsure(List<String> failures, String streamName, String subject, QueueDetail q) {
198
    try {
199
      provisioner.ensureStream(
1✔
200
          streamName, List.of(subject), q.getType(), "rqueue queue: " + q.getName());
1✔
201
      return 1;
1✔
202
    } catch (RqueueNatsException e) {
×
203
      failures.add(streamName + " (subject " + subject + "): " + rootCause(e));
×
204
      return 1;
×
205
    }
206
  }
207

208
  private int tryEnsureDlq(List<String> failures, String dlqStream, String dlqSubject) {
209
    try {
210
      provisioner.ensureDlqStream(dlqStream, List.of(dlqSubject));
×
211
      return 1;
×
212
    } catch (RqueueNatsException e) {
×
213
      failures.add(dlqStream + " (DLQ subject " + dlqSubject + "): " + rootCause(e));
×
214
      return 1;
×
215
    }
216
  }
217

218
  /** Returns the deepest non-null message in the cause chain for diagnostics. */
219
  private static String rootCause(Throwable t) {
220
    Throwable cause = t;
×
221
    while (cause.getCause() != null) {
×
222
      cause = cause.getCause();
×
223
    }
224
    String msg = cause.getMessage();
×
225
    return (msg != null && !msg.isEmpty()) ? msg : cause.getClass().getSimpleName();
×
226
  }
227
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc