
sonus21 / rqueue / build 25621809822

10 May 2026 06:27AM UTC coverage: 83.425% (+0.03%) from 83.396%

Triggered by a push event, via GitHub (committer: web-flow).
Nats scheduling fix (#297)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code
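The verification step above (comparing the downloaded XML's structure against a local build's) can be automated. A minimal sketch, not part of the rqueue build — `hasPackages` is a hypothetical helper — that distinguishes an effectively empty merged report (only `<sessioninfo>` entries) from a real one (`<package>` elements present):

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

public class JacocoReportCheck {

  /** Returns true when the JaCoCo XML contains at least one <package> element. */
  static boolean hasPackages(String reportXml) throws Exception {
    DocumentBuilderFactory f = DocumentBuilderFactory.newInstance();
    // Real JaCoCo reports declare a DTD; skip fetching it so the check works offline.
    f.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
    Document doc = f.newDocumentBuilder()
        .parse(new ByteArrayInputStream(reportXml.getBytes(StandardCharsets.UTF_8)));
    return doc.getElementsByTagName("package").getLength() > 0;
  }

  public static void main(String[] args) throws Exception {
    // Shape of the broken report: session info only, no resolved packages.
    String empty = "<report name=\"m\"><sessioninfo id=\"a\" start=\"1\" dump=\"2\"/></report>";
    // Shape of a healthy report: at least one <package> element.
    String full = "<report name=\"m\"><package name=\"com/example\"/></report>";
    System.out.println(hasPackages(empty)); // false
    System.out.println(hasPackages(full));  // true
  }
}
```

A check like this could run right after the report-merge step in CI, failing fast instead of letting an empty XML reach the uploader.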

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code
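The soft delete described above amounts to flipping a metadata flag rather than touching the stream message. A toy sketch of that idea — class and field names here are illustrative, not rqueue's actual `MessageMetadataService` API:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class SoftDeleteSketch {

  /** Illustrative stand-in for per-message metadata kept in a KV bucket. */
  record MessageMetadata(String messageId, String payload, boolean deleted) {}

  private final Map<String, MessageMetadata> store = new ConcurrentHashMap<>();

  void save(MessageMetadata m) {
    store.put(m.messageId(), m);
  }

  /** Soft delete: the underlying stream message persists; only the flag flips. */
  void deleteMessage(String messageId) {
    store.computeIfPresent(messageId, (id, m) -> new MessageMetadata(id, m.payload(), true));
  }

  /** Dashboard view: hide messages whose metadata marks them deleted. */
  List<MessageMetadata> visibleMessages() {
    return store.values().stream().filter(m -> !m.deleted()).collect(Collectors.toList());
  }
}
```

The design choice matches the commit: JetStream has no per-message delete primitive that fits here, so visibility is controlled one layer up, in metadata.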

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipp... (continued)

2628 of 3487 branches covered (75.37%)

Branch coverage included in aggregate %.

128 of 179 new or added lines in 12 files covered. (71.51%)

1 existing line in 1 file now uncovered.

7841 of 9062 relevant lines covered (86.53%)

0.87 hits per line

Source File

/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java (83.39% covered)
/*
 * Copyright (c) 2026 Sonu Kumar
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * You may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 */
package com.github.sonus21.rqueue.nats.internal;

import com.github.sonus21.rqueue.enums.QueueType;
import com.github.sonus21.rqueue.nats.RqueueNatsConfig;
import com.github.sonus21.rqueue.nats.RqueueNatsException;
import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.KeyValue;
import io.nats.client.KeyValueManagement;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.CompressionOption;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.KeyValueConfiguration;
import io.nats.client.api.KeyValueStatus;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.ServerInfo;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Idempotent stream/consumer/KV provisioning helpers. All methods are safe to call repeatedly;
 * if the target object already exists with a compatible config, this class leaves it alone. If it
 * exists but with diverging config (e.g. ackWait, maxDeliver), this class logs a WARN and does
 * NOT auto-mutate so user customizations are preserved.
 */
public class NatsProvisioner {

  private static final Logger log = Logger.getLogger(NatsProvisioner.class.getName());

  private final Connection connection;
  private final KeyValueManagement kvm;
  private final JetStreamManagement jsm;
  private final RqueueNatsConfig config;

  private final ConcurrentHashMap<String, KeyValue> kvCache = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Object> kvLocks = new ConcurrentHashMap<>();

  // stream name → schedulingEnabled (true if the stream was created/updated with
  // allowMessageSchedules)
  private final ConcurrentHashMap<String, Boolean> streamsDone = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Object> streamLocks = new ConcurrentHashMap<>();

  // "streamName/requestedConsumerName" → actual consumer name (may differ for stale-rebind)
  private final ConcurrentHashMap<String, String> consumerCache = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Object> consumerLocks = new ConcurrentHashMap<>();

  /**
   * Minimum NATS server version that supports server-side message scheduling via the
   * {@code Nats-Schedule} JetStream publish header (ADR-51).
   */
  public static final String SCHEDULING_MIN_VERSION = "2.12.0";

  private final boolean schedulingSupported;

  public NatsProvisioner(Connection connection, JetStreamManagement jsm, RqueueNatsConfig config)
      throws IOException {
    this.connection = connection;
    this.kvm = connection.keyValueManagement();
    this.jsm = jsm;
    this.config = config;
    ServerInfo serverInfo = connection.getServerInfo();
    this.schedulingSupported =
        serverInfo != null && serverInfo.isSameOrNewerThanVersion(SCHEDULING_MIN_VERSION);
    log.log(
        Level.INFO,
        "NATS server version={0}; message scheduling (ADR-51) supported={1}",
        new Object[] {
          serverInfo != null ? serverInfo.getVersion() : "unknown", schedulingSupported
        });
  }

  /** Returns {@code true} when the connected NATS server supports message scheduling (>= 2.12). */
  public boolean isMessageSchedulingSupported() {
    return schedulingSupported;
  }

  // ---- KV provisioning --------------------------------------------------

  /**
   * Returns a {@link KeyValue} handle for {@code bucketName}, creating the bucket on first call.
   * All buckets are created with S2 compression. A bucket-level TTL is applied at creation time
   * only; existing buckets are reused as-is.
   *
   * @param bucketName NATS KV bucket name (e.g. {@code "rqueue-jobs"})
   * @param ttl        bucket-level max-age; {@code null} or non-positive = no TTL
   */
  public KeyValue ensureKv(String bucketName, Duration ttl)
      throws IOException, JetStreamApiException {
    KeyValue cached = kvCache.get(bucketName);
    if (cached != null) {
      return cached;
    }
    Object lock = kvLocks.computeIfAbsent(bucketName, k -> new Object());
    synchronized (lock) {
      cached = kvCache.get(bucketName);
      if (cached != null) {
        return cached;
      }
      try {
        KeyValueStatus status = kvm.getStatus(bucketName);
        if (status != null) {
          KeyValue kv = connection.keyValue(bucketName);
          kvCache.put(bucketName, kv);
          return kv;
        }
      } catch (JetStreamApiException missing) {
        // bucket absent — fall through to create
      }
      RqueueNatsConfig.StreamDefaults sd = config.getStreamDefaults();
      KeyValueConfiguration.Builder cfg = KeyValueConfiguration.builder()
          .name(bucketName)
          .compression(true)
          .replicas(sd.getReplicas())
          .storageType(sd.getStorage());
      if (ttl != null && !ttl.isZero() && !ttl.isNegative()) {
        cfg.ttl(ttl);
      }
      kvm.create(cfg.build());
      KeyValue kv = connection.keyValue(bucketName);
      kvCache.put(bucketName, kv);
      return kv;
    }
  }

  // ---- Stream provisioning ----------------------------------------------

  /**
   * Ensure a JetStream stream exists with the given subjects, using {@link QueueType#QUEUE}
   * (WorkQueue retention) as the default. Callers that have a {@link QueueType} available should
   * use {@link #ensureStream(String, List, QueueType)} instead.
   */
  public void ensureStream(String streamName, List<String> subjects) {
    ensureStream(streamName, subjects, QueueType.QUEUE, null, false);
  }

  /** See {@link #ensureStream(String, List, QueueType, String, boolean)}. */
  public void ensureStream(String streamName, List<String> subjects, QueueType queueType) {
    ensureStream(streamName, subjects, queueType, null, false);
  }

  /** See {@link #ensureStream(String, List, QueueType, String, boolean)}. */
  public void ensureStream(
      String streamName, List<String> subjects, QueueType queueType, String description) {
    ensureStream(streamName, subjects, queueType, description, false);
  }

  /**
   * Ensure a JetStream stream exists with the given subjects and retention policy derived from
   * {@code queueType}:
   * <ul>
   *   <li>{@link QueueType#QUEUE} — {@link io.nats.client.api.RetentionPolicy#WorkQueue}: each
   *       message is delivered to exactly one consumer; competing-consumer semantics.
   *   <li>{@link QueueType#STREAM} — {@link io.nats.client.api.RetentionPolicy#Limits}: every
   *       independent durable consumer group receives all messages; stream/fan-out semantics.
   * </ul>
   *
   * <p>{@code description} is forwarded to JetStream as the stream's description (visible via
   * {@code nats stream info}). Callers should pass the rqueue queue name so operators can map a
   * stream back to the queue that created it; pass {@code null} to skip.
   *
   * <p>{@code allowSchedules} must be {@code true} when the stream will receive messages published
   * with the {@code Nats-Schedule} header (ADR-51). Only callers that perform delayed
   * enqueue should pass {@code true}; regular enqueue callers should pass {@code false} (or use the
   * shorter overloads). Equivalent to the CLI flag {@code --allow-schedules}.
   *
   * <p>Hits the NATS backend at most once per stream name per process lifetime; subsequent calls
   * return immediately from the in-process cache. If {@code allowSchedules=true} is later requested
   * for a stream that was previously created without that flag, the stream is updated in place via
   * {@link JetStreamManagement#updateStream}. If the stream already exists with a different
   * retention policy, a WARNING is logged and the existing config is left untouched.
   */
  public void ensureStream(
      String streamName,
      List<String> subjects,
      QueueType queueType,
      String description,
      boolean allowSchedules) {
    // Fast-path: already provisioned with at least as many capabilities as requested.
    Boolean cached = streamsDone.get(streamName);
    if (cached != null && (cached || !allowSchedules)) {
      return;
    }
    Object lock = streamLocks.computeIfAbsent(streamName, k -> new Object());
    synchronized (lock) {
      cached = streamsDone.get(streamName);
      if (cached != null && (cached || !allowSchedules)) {
        return;
      }
      boolean enableSchedules = allowSchedules && schedulingSupported;
      try {
        StreamInfo existing = safeGetStreamInfo(streamName);
        RetentionPolicy desired =
            queueType == QueueType.STREAM ? RetentionPolicy.Limits : RetentionPolicy.WorkQueue;
        if (existing == null) {
          if (!config.isAutoCreateStreams()) {
            throw new RqueueNatsException(
                "Stream '" + streamName + "' does not exist and autoCreateStreams=false");
          }
          RqueueNatsConfig.StreamDefaults sd = config.getStreamDefaults();
          StreamConfiguration.Builder b = StreamConfiguration.builder()
              .name(streamName)
              .subjects(subjects)
              .replicas(sd.getReplicas())
              .storageType(sd.getStorage())
              .retentionPolicy(desired)
              .compressionOption(CompressionOption.S2);
          if (enableSchedules) {
            // Enable server-side message scheduling (ADR-51 / Nats-Schedule header).
            // Equivalent to: nats stream add MY_STREAM --allow-schedules
            b.allowMessageSchedules(true);
          }
          if (description != null && !description.isEmpty()) {
            b.description(description);
          }
          if (sd.getMaxMsgs() > 0) {
            b.maxMessages(sd.getMaxMsgs());
          }
          if (sd.getMaxBytes() > 0) {
            b.maxBytes(sd.getMaxBytes());
          }
          if (sd.getMaxAge() != null
              && !sd.getMaxAge().isZero()
              && !sd.getMaxAge().isNegative()) {
            b.maxAge(sd.getMaxAge());
          }
          jsm.addStream(b.build());
        } else {
          RetentionPolicy actual = existing.getConfiguration().getRetentionPolicy();
          if (actual != desired) {
            log.log(
                Level.WARNING,
                "Stream ''{0}'' exists with retention={1} but queueMode requires retention={2}"
                    + " — leaving existing config in place.",
                new Object[] {streamName, actual, desired});
          }
          // Check whether new subjects need to be merged in (e.g. the sched wildcard added by
          // enqueueWithDelay after the stream was originally created by a plain enqueue call).
          List<String> existingSubjects = existing.getConfiguration().getSubjects();
          Set<String> existingSet = existingSubjects != null
              ? new HashSet<>(existingSubjects)
              : new HashSet<>();
          boolean needsSubjectUpdate = subjects.stream().anyMatch(s -> !existingSet.contains(s));
          boolean needsFlagUpdate =
              enableSchedules && !existing.getConfiguration().getAllowMsgSchedules();

          if (needsFlagUpdate || needsSubjectUpdate) {
            // Merge: keep all existing subjects and append new ones (never remove).
            LinkedHashSet<String> merged = new LinkedHashSet<>(existingSet);
            merged.addAll(subjects);
            StreamConfiguration.Builder upd = StreamConfiguration.builder(
                    existing.getConfiguration())
                .subjects(new ArrayList<>(merged));
            if (needsFlagUpdate) {
              upd.allowMessageSchedules(true);
            }
            jsm.updateStream(upd.build());
            if (needsFlagUpdate) {
              log.log(
                  Level.INFO,
                  "Stream ''{0}'' updated to enable message scheduling (ADR-51).",
                  streamName);
            }
            if (needsSubjectUpdate) {
              log.log(
                  Level.INFO,
                  "Stream ''{0}'' updated with additional subjects: {1}.",
                  new Object[] {streamName, subjects});
            }
          }
        }
      } catch (IOException | JetStreamApiException e) {
        throw new RqueueNatsException(
            "Failed to ensure stream '" + streamName + "' for subjects " + subjects, e);
      }
      streamsDone.put(streamName, enableSchedules);
    }
  }

  /**
   * Ensure a durable pull consumer exists, returning the consumer name.
   * Hits the NATS backend at most once per (stream, consumer) pair per process lifetime.
   *
   * <p>Overload without a filter subject: used when the stream has only the work subject so the
   * filter would be redundant, or when multiple independent consumer groups (fan-out) must coexist
   * on a Limits-retention stream (NATS rejects two consumers with the same filter subject, error
   * 10100). For WorkQueue streams that also carry scheduler subjects ({@code .sched.*}) a filter
   * subject MUST be supplied via
   * {@link #ensureConsumer(String, String, String, Duration, long, long)} so the consumer only
   * receives work-subject messages and does not accidentally pick up scheduler entries.
   */
  public String ensureConsumer(
      String streamName,
      String consumerName,
      Duration ackWait,
      long maxDeliver,
      long maxAckPending) {
    return ensureConsumer(streamName, consumerName, null, ackWait, maxDeliver, maxAckPending);
  }

  /**
   * Ensure a durable pull consumer exists with an optional subject filter, returning the consumer
   * name. Hits the NATS backend at most once per (stream, consumer) pair per process lifetime.
   *
   * <p>{@code filterSubject} — when non-null, sets the consumer's filter subject so it only
   * receives messages published to that subject. Required when the stream carries both work subjects
   * and scheduler subjects ({@code .sched.*}): without a filter the consumer reads scheduler
   * entries before the scheduled time, delivering the message early. Pass {@code null} for streams
   * that do not use NATS scheduling, or where fan-out across multiple consumers is needed (Limits
   * retention).
   */
  public String ensureConsumer(
      String streamName,
      String consumerName,
      String filterSubject,
      Duration ackWait,
      long maxDeliver,
      long maxAckPending) {
    String cacheKey = streamName + "/" + consumerName;
    String cached = consumerCache.get(cacheKey);
    if (cached != null) {
      return cached;
    }
    Object lock = consumerLocks.computeIfAbsent(cacheKey, k -> new Object());
    synchronized (lock) {
      cached = consumerCache.get(cacheKey);
      if (cached != null) {
        return cached;
      }
      String actual = doEnsureConsumer(
          streamName, consumerName, filterSubject, ackWait, maxDeliver, maxAckPending);
      consumerCache.put(cacheKey, actual);
      return actual;
    }
  }

  private String doEnsureConsumer(
      String streamName,
      String consumerName,
      String filterSubject,
      Duration ackWait,
      long maxDeliver,
      long maxAckPending) {
    try {
      ConsumerInfo info = safeGetConsumerInfo(streamName, consumerName);
      if (info != null) {
        ConsumerConfiguration cc = info.getConsumerConfiguration();
        if (cc.getAckWait() != null && !cc.getAckWait().equals(ackWait)) {
          log.log(
              Level.WARNING,
              "Consumer " + streamName + "/" + consumerName
                  + " ackWait differs (existing=" + cc.getAckWait()
                  + ", desired=" + ackWait + ") - leaving existing config in place.");
        }
        if (cc.getMaxDeliver() != maxDeliver) {
          log.log(
              Level.WARNING,
              "Consumer " + streamName + "/" + consumerName
                  + " maxDeliver differs (existing=" + cc.getMaxDeliver()
                  + ", desired=" + maxDeliver + ") - leaving existing config in place.");
        }
        return consumerName;
      }
      if (!config.isAutoCreateConsumers()) {
        throw new RqueueNatsException("Consumer '" + consumerName + "' on stream '" + streamName
            + "' does not exist and autoCreateConsumers=false");
      }
      ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder()
          .durable(consumerName)
          .ackPolicy(AckPolicy.Explicit)
          .deliverPolicy(DeliverPolicy.All)
          .ackWait(ackWait)
          .maxDeliver(maxDeliver)
          .maxAckPending(maxAckPending);
      if (filterSubject != null && !filterSubject.isEmpty()) {
        // Filter to the work subject only so that scheduler entries (published to
        // <workSubject>.sched.*) are not delivered to this consumer before the scheduled time.
        // The NATS scheduler fires the triggered message to workSubject when the time arrives.
        ccBuilder.filterSubject(filterSubject);
      }
      jsm.addOrUpdateConsumer(streamName, ccBuilder.build());
      return consumerName;
    } catch (IOException | JetStreamApiException e) {
      throw new RqueueNatsException(
          "Failed to ensure consumer '" + consumerName + "' on stream '" + streamName + "'", e);
    }
  }

  /** Ensure a DLQ stream exists capturing dead-letter subjects (e.g. "rqueue.js.*.dlq"). */
  public void ensureDlqStream(String dlqStreamName, List<String> dlqSubjects) {
    if (!config.isAutoCreateDlqStream()) {
      return;
    }
    ensureStream(dlqStreamName, dlqSubjects);
  }

  // ---- private helpers --------------------------------------------------

  private StreamInfo safeGetStreamInfo(String streamName)
      throws IOException, JetStreamApiException {
    try {
      return jsm.getStreamInfo(streamName);
    } catch (JetStreamApiException e) {
      // 10059 = stream not found
      if (e.getApiErrorCode() == 10059 || e.getErrorCode() == 404) {
        return null;
      }
      throw e;
    }
  }

  private ConsumerInfo safeGetConsumerInfo(String streamName, String consumerName)
      throws IOException, JetStreamApiException {
    try {
      return jsm.getConsumerInfo(streamName, consumerName);
    } catch (JetStreamApiException e) {
      // 10014 = consumer not found, 10059 = stream not found
      if (e.getApiErrorCode() == 10014 || e.getApiErrorCode() == 10059 || e.getErrorCode() == 404) {
        return null;
      }
      throw e;
    }
  }
}
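The provisioner above repeats one concurrency pattern three times (KV buckets, streams, consumers): a lock-free cache read, then a per-key lock, then a second cache check before touching the NATS backend. A standalone sketch of that pattern with generic names (this is an illustration, not a class from rqueue):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class PerKeyOnce<K, V> {

  private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<K, Object> locks = new ConcurrentHashMap<>();

  /**
   * Runs {@code expensive} at most once per key, even under concurrent callers.
   * Unlike a single computeIfAbsent on the cache, the expensive call happens
   * outside the map's internal locking, so slow provisioning of one key never
   * stalls the fast-path reads for keys that are already cached.
   */
  public V ensure(K key, Function<K, V> expensive) {
    V cached = cache.get(key);          // fast path: no lock at all
    if (cached != null) {
      return cached;
    }
    Object lock = locks.computeIfAbsent(key, k -> new Object());
    synchronized (lock) {
      cached = cache.get(key);          // re-check: another thread may have won the race
      if (cached != null) {
        return cached;
      }
      V created = expensive.apply(key); // e.g. a jsm.addStream(...) call in the provisioner
      cache.put(key, created);
      return created;
    }
  }
}
```

Usage: `ensure("my-stream", name -> provisionStream(name))` from any number of threads results in exactly one `provisionStream` call per name.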