sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%
push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code
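
The verification step described above (comparing the XML structure of the CI artifact against a local report) can be approximated with a small check. The following is a minimal sketch, not the script used in that commit: the class name `JacocoReportCheck` and the inline sample reports are hypothetical, and it assumes the JDK's default XML parser, which accepts the Xerces `load-external-dtd` feature.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

/** Hypothetical helper; not part of rqueue, JaCoCo, or the Coveralls plugin. */
public class JacocoReportCheck {

  /** Counts the package elements found anywhere in a JaCoCo XML report. */
  public static int countPackages(String xml) throws Exception {
    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    // Real JaCoCo reports declare a DOCTYPE; do not try to fetch the external DTD.
    // Assumption: the JDK's built-in (Xerces-based) parser is in use.
    factory.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
    DocumentBuilder builder = factory.newDocumentBuilder();
    Document doc =
        builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
    return doc.getElementsByTagName("package").getLength();
  }

  public static void main(String[] args) throws Exception {
    // Shape of the broken CI report: session info only, no packages.
    String sessionsOnly =
        "<report name=\"r\"><sessioninfo id=\"s\" start=\"1\" dump=\"2\"/></report>";
    // Shape of a healthy report: at least one package element.
    String withPackage =
        "<report name=\"r\"><sessioninfo id=\"s\" start=\"1\" dump=\"2\"/>"
            + "<package name=\"com/example\"/></report>";
    System.out.println(countPackages(sessionsOnly)); // prints 0
    System.out.println(countPackages(withPackage)); // prints 1
  }
}
```

A count of zero on the uploaded artifact would match the symptom in the commit message: a report with no packages gives the uploader nothing to send.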

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code
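
The soft delete described above (the stream message persists while the dashboard hides it through a metadata flag) can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `SoftDeleteSketch` and all of its methods are hypothetical and do not mirror `MessageMetadataService` or any NATS client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; none of these names come from rqueue or the NATS client. */
public class SoftDeleteSketch {
  // Stands in for the stream: entries are appended and never removed by softDelete.
  private final List<String> streamMessageIds = new ArrayList<>();
  // Stands in for per-message metadata: message id -> deleted flag.
  private final Map<String, Boolean> deletedFlag = new LinkedHashMap<>();

  public void publish(String id) {
    streamMessageIds.add(id);
    deletedFlag.put(id, false);
  }

  /** Sets the deleted flag; returns false when the id is unknown or already deleted. */
  public boolean softDelete(String id) {
    Boolean current = deletedFlag.get(id);
    if (current == null || current) {
      return false;
    }
    deletedFlag.put(id, true);
    return true;
  }

  /** What a dashboard would list: stored messages whose deleted flag is not set. */
  public List<String> visibleMessages() {
    List<String> out = new ArrayList<>();
    for (String id : streamMessageIds) {
      if (!deletedFlag.get(id)) {
        out.add(id);
      }
    }
    return out;
  }

  /** Number of messages still held in the stand-in stream, deleted or not. */
  public int storedCount() {
    return streamMessageIds.size();
  }

  public static void main(String[] args) {
    SoftDeleteSketch sketch = new SoftDeleteSketch();
    sketch.publish("a");
    sketch.publish("b");
    sketch.softDelete("a");
    System.out.println(sketch.visibleMessages()); // prints [b]
    System.out.println(sketch.storedCount()); // prints 2
  }
}
```

The point of the model is the asymmetry: the visible list shrinks while the stored count does not, which is why the operation is reported as a soft delete rather than a removal.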

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line

Source File
53.04
/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java
/*
 * Copyright (c) 2026 Sonu Kumar
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * You may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 */
package com.github.sonus21.rqueue.nats.js;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.spi.Capabilities;
import com.github.sonus21.rqueue.core.spi.MessageBroker;
import com.github.sonus21.rqueue.enums.QueueType;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.nats.RqueueNatsConfig;
import com.github.sonus21.rqueue.nats.RqueueNatsException;
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
import com.github.sonus21.rqueue.serdes.RqJacksonSerDes;
import com.github.sonus21.rqueue.serdes.RqueueSerDes;
import com.github.sonus21.rqueue.serdes.SerializationUtils;
import com.github.sonus21.rqueue.utils.PriorityUtils;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.impl.Headers;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import reactor.core.publisher.Mono;

/**
 * JetStream-backed implementation of {@link MessageBroker}.
 *
 * <p>This class keeps a per-instance in-memory map ({@code inFlight}) of NATS messages popped via
 * {@link #pop} so that {@link #ack} / {@link #nack} can locate the underlying NATS message handle.
 * The map is intentionally local: a process restart loses any pending entries, which is consistent
 * with the v1 capability set declaring no scheduled introspection. NATS itself will redeliver
 * unacked messages after {@code ackWait}.
 *
 * <p>Delayed enqueue relies on NATS message scheduling (ADR-51, NATS >= 2.12) and throws
 * {@link RqueueNatsException} when the connected server lacks it; cron jobs are not supported.
 * {@code moveExpired} is a no-op returning 0; redelivery is handled by JetStream's ack-wait timer.
 */
public class JetStreamMessageBroker implements MessageBroker, AutoCloseable {

  private static final Logger log = Logger.getLogger(JetStreamMessageBroker.class.getName());

  /**
   * JetStream publish header used by NATS >= 2.12 to hold a message until a UTC delivery time
   * (ADR-51). Value must be RFC 3339 UTC, e.g. {@code 2026-05-09T12:30:00Z}.
   */
  static final String HDR_NEXT_DELIVER_TIME = "Nats-Next-Deliver-Time";

  /**
   * Enrichment header: epoch-ms at which this message was scheduled to be processed.
   * Written at scheduling publish time; read back in {@code enrichFromDelivery} so that
   * {@link RqueueMessage#getProcessAt()} can be populated without deserializing the payload.
   */
  static final String HDR_PROCESS_AT = "Rqueue-Process-At";

  /**
   * Enrichment header: period in milliseconds for periodic messages.
   * Written at scheduling publish time; read back in {@code enrichFromDelivery} to restore
   * {@link RqueueMessage#getPeriod()} for payloads that have it as zero.
   */
  static final String HDR_PERIOD = "Rqueue-Period";

  /**
   * Lower bound for fetch wait when the caller passes a non-positive duration. JetStream rejects
   * zero on a pull fetch, so any zero/negative wait is rounded up to this minimum. Callers that
   * want long-poll semantics should pass the desired wait explicitly (e.g. the listener
   * container's {@code pollingInterval}); this constant only guards against accidental zero waits
   * from non-listener callers.
   */
  private static final Duration MIN_FETCH_WAIT = Duration.ofMillis(50);

  /** RFC 3339 UTC formatter for the {@code Nats-Next-Deliver-Time} header value. */
  private static final DateTimeFormatter RFC3339_UTC =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZone(ZoneOffset.UTC);

  private final Connection connection;
  private final JetStream js;
  private final JetStreamManagement jsm;
  private final RqueueNatsConfig config;
  private final RqueueSerDes serdes;
  private final NatsProvisioner provisioner;
  private final boolean schedulingSupported;
  private final Capabilities caps;

  /**
   * In-flight messages keyed by {@code "<consumerName>::<RqueueMessage.id>"}; the value is the
   * underlying NATS Message for ack/nak. The consumer prefix is required for Limits-retention
   * streams where multiple durable consumers each receive their own copy of every message:
   * keying on just the message id would let one consumer's {@code put} overwrite another's, and
   * the subsequent ack would target the wrong NATS Message handle (leaving the original delivery
   * stuck in {@code numAckPending} until {@code AckWait} expires).
   */
  private final ConcurrentHashMap<String, Message> inFlight = new ConcurrentHashMap<>();

  private static String inFlightKey(String consumerName, String messageId) {
    return (consumerName == null ? "" : consumerName) + "::" + messageId;
  }

  /**
   * Cached pull subscriptions keyed by stream + consumerName so we don't re-bind on every pop.
   */
  private final ConcurrentHashMap<String, JetStreamSubscription> subscriptionCache =
      new ConcurrentHashMap<>();

  // Public so tests in sibling packages (e.g. JetStreamMessageBrokerDelayThrowsTest) can build a
  // broker directly without going through builder(), which keeps the regression caught by that
  // test pinned to the constructor signature itself.
  public JetStreamMessageBroker(
      Connection connection,
      JetStream js,
      JetStreamManagement jsm,
      RqueueNatsConfig config,
      RqueueSerDes serdes,
      NatsProvisioner provisioner) {
    this.connection = connection;
    this.js = js;
    this.jsm = jsm;
    this.config = config;
    this.serdes = serdes;
    this.provisioner = provisioner;
    this.schedulingSupported = provisioner != null && provisioner.isMessageSchedulingSupported();
    this.caps = new Capabilities(
        schedulingSupported, // supportsDelayedEnqueue: requires NATS >= 2.12
        false, // supportsScheduledIntrospection: no inspectable scheduled-zset
        false, // supportsCronJobs: no server-side cron
        false, // usesPrimaryHandlerDispatch: no Redis processing-ZSET
        true, // supportsViewData: peek() reads from JetStream stream
        true); // supportsMoveMessage: NatsRqueueUtilityService.moveMessage()
  }

  public static Builder builder() {
    return new Builder();
  }

  // ---- subject / stream naming -------------------------------------------

  private String subjectFor(QueueDetail q) {
    return config.getSubjectPrefix() + q.getName();
  }

  private String streamFor(QueueDetail q) {
    return config.getStreamPrefix() + q.getName();
  }

  /**
   * Resolve the priority-specific subject. Uses the same {@code "_priority"} suffix as
   * {@link com.github.sonus21.rqueue.utils.PriorityUtils#getSuffix(String)} so the subject
   * matches the expanded {@link QueueDetail#getName()} used by the poller (e.g. {@code "pq_high"}).
   */
  private String subjectFor(QueueDetail q, String priority) {
    if (priority == null || priority.isEmpty()) {
      return subjectFor(q);
    }
    return config.getSubjectPrefix() + q.getName() + PriorityUtils.getSuffix(priority);
  }

  /**
   * Resolve the priority-specific stream. Uses the same {@code "_priority"} suffix as
   * {@link com.github.sonus21.rqueue.utils.PriorityUtils#getSuffix(String)} so the stream name
   * matches what the poller derives from the expanded {@link QueueDetail#getName()}.
   */
  private String streamFor(QueueDetail q, String priority) {
    if (priority == null || priority.isEmpty()) {
      return streamFor(q);
    }
    return config.getStreamPrefix() + q.getName() + PriorityUtils.getSuffix(priority);
  }

  private String dlqStreamFor(QueueDetail q) {
    return streamFor(q) + config.getDlqStreamSuffix();
  }

  private String dlqSubjectFor(QueueDetail q) {
    return subjectFor(q) + config.getDlqSubjectSuffix();
  }

  /** Stream description shown in {@code nats stream info} so operators can map back to rqueue. */
  private static String streamDescription(QueueDetail q) {
    return "rqueue queue: " + q.getName();
  }

  /** Stream description for the priority sub-stream. */
  private static String streamDescription(QueueDetail q, String priority) {
    return priority == null || priority.isEmpty()
        ? streamDescription(q)
        : "rqueue queue: " + q.getName() + " (priority=" + priority + ")";
  }

  private static String dlqStreamDescription(QueueDetail q) {
    return "rqueue DLQ for queue: " + q.getName();
  }

  // ---- MessageBroker -----------------------------------------------------

  @Override
  public void enqueue(QueueDetail q, RqueueMessage m) {
    String subject = subjectFor(q);
    String stream = streamFor(q);
    provisioner.ensureStream(stream, List.of(subject), q.getType(), streamDescription(q));
    Headers headers = new Headers();
    if (m.getId() != null) {
      headers.add("Nats-Msg-Id", m.getId());
    }
    try {
      byte[] payload = serdes.serialize(m);
      js.publish(subject, headers, payload);
    } catch (IOException | JetStreamApiException e) {
      throw new RqueueNatsException(
          "Failed to enqueue message id="
              + m.getId()
              + " queue="
              + q.getName()
              + " subject="
              + subject,
          e);
    } catch (RuntimeException e) {
      throw new RqueueNatsException(
          "Failed to serialize/enqueue message id="
              + m.getId()
              + " queue="
              + q.getName()
              + " subject="
              + subject,
          e);
    }
  }

  @Override
  public void enqueue(QueueDetail q, String priority, RqueueMessage m) {
    String subject = subjectFor(q, priority);
    String stream = streamFor(q, priority);
    provisioner.ensureStream(stream, List.of(subject), q.getType(), streamDescription(q, priority));
    Headers headers = new Headers();
    if (m.getId() != null) {
      headers.add("Nats-Msg-Id", m.getId());
    }
    try {
      byte[] payload = serdes.serialize(m);
      js.publish(subject, headers, payload);
    } catch (IOException | JetStreamApiException e) {
      throw new RqueueNatsException(
          "Failed to enqueue message id="
              + m.getId()
              + " queue="
              + q.getName()
              + " priority="
              + priority
              + " subject="
              + subject,
          e);
    } catch (RuntimeException e) {
      throw new RqueueNatsException(
          "Failed to serialize/enqueue message id="
              + m.getId()
              + " queue="
              + q.getName()
              + " priority="
              + priority
              + " subject="
              + subject,
          e);
    }
  }

  @Override
  public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) {
    if (!schedulingSupported) {
      throw new RqueueNatsException(
          "NATS message scheduling (ADR-51) is not available: the connected server is older than "
              + NatsProvisioner.SCHEDULING_MIN_VERSION
              + ". Upgrade NATS to "
              + NatsProvisioner.SCHEDULING_MIN_VERSION
              + "+ or use the Redis backend for delayed messages.");
    }
    String subject = subjectFor(q);
    String stream = streamFor(q);
    provisioner.ensureStream(stream, List.of(subject), q.getType(), streamDescription(q));
    Headers headers = buildSchedulingHeaders(m, delayMs);
    try {
      byte[] payload = serdes.serialize(m);
      js.publish(subject, headers, payload);
    } catch (IOException | JetStreamApiException e) {
      throw new RqueueNatsException(
          "Failed to enqueue scheduled message id="
              + m.getId()
              + " queue="
              + q.getName()
              + " subject="
              + subject,
          e);
    } catch (RuntimeException e) {
      throw new RqueueNatsException(
          "Failed to serialize/enqueue scheduled message id="
              + m.getId()
              + " queue="
              + q.getName()
              + " subject="
              + subject,
          e);
    }
  }

  @Override
  public Mono<Void> enqueueReactive(QueueDetail q, RqueueMessage m) {
    String subject = subjectFor(q);
    String stream = streamFor(q);
    try {
      provisioner.ensureStream(stream, List.of(subject), q.getType(), streamDescription(q));
    } catch (Exception e) {
      return Mono.error(new RqueueNatsException(
          "Failed to provision stream for reactive enqueue id="
              + m.getId()
              + " queue="
              + q.getName(),
          e));
    }
    Headers headers = new Headers();
    if (m.getId() != null) {
      headers.add("Nats-Msg-Id", m.getId());
    }
    byte[] payload;
    try {
      payload = serdes.serialize(m);
    } catch (RuntimeException | IOException e) {
      return Mono.error(new RqueueNatsException(
          "Failed to serialize message id="
              + m.getId()
              + " queue="
              + q.getName()
              + " subject="
              + subject,
          e));
    }
    return Mono.fromFuture(() -> js.publishAsync(subject, headers, payload))
        .onErrorMap(e -> e instanceof RqueueNatsException
            ? e
            : new RqueueNatsException(
                "Failed to enqueue message id="
                    + m.getId()
                    + " queue="
                    + q.getName()
                    + " subject="
                    + subject,
                e))
        .then();
  }

  @Override
  public Mono<Void> enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long delayMs) {
    if (!schedulingSupported) {
      return Mono.error(new RqueueNatsException(
          "NATS message scheduling (ADR-51) is not available: the connected server is older than "
              + NatsProvisioner.SCHEDULING_MIN_VERSION
              + ". Upgrade NATS to "
              + NatsProvisioner.SCHEDULING_MIN_VERSION
              + "+ or use the Redis backend for delayed messages."));
    }
    String subject = subjectFor(q);
    String stream = streamFor(q);
    try {
      provisioner.ensureStream(stream, List.of(subject), q.getType(), streamDescription(q));
    } catch (Exception e) {
      return Mono.error(new RqueueNatsException(
          "Failed to provision stream for reactive scheduled enqueue id="
              + m.getId()
              + " queue="
              + q.getName(),
          e));
    }
    Headers headers = buildSchedulingHeaders(m, delayMs);
    byte[] payload;
    try {
      payload = serdes.serialize(m);
    } catch (RuntimeException | IOException e) {
      return Mono.error(new RqueueNatsException(
          "Failed to serialize scheduled message id="
              + m.getId()
              + " queue="
              + q.getName()
              + " subject="
              + subject,
          e));
    }
    return Mono.fromFuture(() -> js.publishAsync(subject, headers, payload))
        .onErrorMap(e -> e instanceof RqueueNatsException
            ? e
            : new RqueueNatsException(
                "Failed to enqueue scheduled message id="
                    + m.getId()
                    + " queue="
                    + q.getName()
                    + " subject="
                    + subject,
                e))
        .then();
  }

  @Override
  public List<RqueueMessage> pop(QueueDetail q, String consumerName, int batch, Duration wait) {
    return popInternal(
        streamFor(q),
        subjectFor(q),
        resolveConsumerName(q.getName(), consumerName),
        batch,
        wait,
        resolveAckWait(q, config),
        resolveMaxDeliver(q, config));
  }

  @Override
  public List<RqueueMessage> pop(
      QueueDetail q, String priority, String consumerName, int batch, Duration wait) {
    return popInternal(
        streamFor(q, priority),
        subjectFor(q, priority),
        resolveConsumerName(q.getName(), consumerName),
        batch,
        wait,
        resolveAckWait(q, config),
        resolveMaxDeliver(q, config));
  }

  private static String resolveConsumerName(String queueName, String consumerName) {
    return (consumerName != null && !consumerName.isEmpty()) ? consumerName : "rqueue-" + queueName;
  }

  /**
   * Resolve the JetStream {@code ackWait} for this queue's pull consumer: per-queue
   * {@link QueueDetail#getVisibilityTimeout()} (when positive), else the global
   * {@code RqueueNatsConfig.ConsumerDefaults.getAckWait()}. Honouring visibilityTimeout makes
   * the NATS backend match the contract every other rqueue backend exposes: a message stays
   * invisible to other consumers for that window and is redelivered if not acked in time.
   */
  public static Duration resolveAckWait(QueueDetail q, RqueueNatsConfig config) {
    long vt = q.getVisibilityTimeout();
    if (vt > 0) {
      return Duration.ofMillis(vt);
    }
    return config.getConsumerDefaults().getAckWait();
  }

  /**
   * Resolve the JetStream {@code maxDeliver} from per-queue {@link QueueDetail#getNumRetry()}
   * (counted as initial delivery + N retries = numRetry + 1). The {@link Integer#MAX_VALUE}
   * "retry forever" sentinel maps to JetStream's unlimited value ({@code -1}); non-positive
   * numRetry falls back to {@code RqueueNatsConfig.ConsumerDefaults.getMaxDeliver()}.
   */
  public static long resolveMaxDeliver(QueueDetail q, RqueueNatsConfig config) {
    int numRetry = q.getNumRetry();
    if (numRetry == Integer.MAX_VALUE) {
      return -1L;
    }
    if (numRetry > 0) {
      return numRetry + 1L;
    }
    return config.getConsumerDefaults().getMaxDeliver();
  }

  private List<RqueueMessage> popInternal(
      String stream,
      String subject,
      String consumerName,
      int batch,
      Duration wait,
      Duration ackWait,
      long maxDeliver) {
    // Honour the caller-supplied wait: this is the listener container's pollingInterval for
    // RqueueMessagePoller, and lets JetStream long-poll instead of the broker firing a steady
    // stream of $JS.API.CONSUMER.MSG.NEXT requests. Only fall back when the caller didn't
    // express a preference; zero/negative waits are rounded up to the JetStream minimum.
    Duration fetchWait;
    if (wait == null) {
      fetchWait = config.getDefaultFetchWait();
    } else if (wait.isZero() || wait.isNegative()) {
      fetchWait = MIN_FETCH_WAIT;
    } else {
      fetchWait = wait;
    }
    String key = stream + "/" + consumerName;
    JetStreamSubscription sub = subscriptionCache.computeIfAbsent(key, k -> {
      // NatsStreamValidator provisions the stream and consumer at bootstrap (RqueueBootstrapEvent).
      // NatsProvisioner caches both, so ensureConsumer here is a map lookup with no backend call.
      try {
        String actualConsumerName = provisioner.ensureConsumer(
            stream,
            consumerName,
            ackWait,
            maxDeliver,
            config.getConsumerDefaults().getMaxAckPending());
        PullSubscribeOptions opts = PullSubscribeOptions.bind(stream, actualConsumerName);
        // Consumer has no filter subject; pass null so the NATS client doesn't validate
        // the subject against a (nonexistent) filter (SUB-90011 otherwise).
        return js.subscribe(null, opts);
      } catch (IOException | JetStreamApiException e) {
        throw new RqueueNatsException(
            "Failed to bind pull subscription stream=" + stream + " consumer=" + consumerName, e);
      }
    });

    List<Message> msgs = sub.fetch(batch, fetchWait);
    List<RqueueMessage> out = new ArrayList<>(msgs.size());
    for (Message nm : msgs) {
      try {
        RqueueMessage rm = serdes.deserialize(nm.getData(), RqueueMessage.class);
        enrichFromDelivery(rm, nm);
        if (rm.getId() != null) {
          inFlight.put(inFlightKey(consumerName, rm.getId()), nm);
        }
        out.add(rm);
      } catch (RuntimeException | IOException e) {
        log.log(
            Level.WARNING,
            "Failed to deserialize JetStream payload on subject "
                + subject
                + "; nak'ing for redelivery",
            e);
        try {
          nm.nak();
        } catch (RuntimeException ignored) {
          // best-effort
        }
      }
    }
    return out;
  }

  @Override
  public boolean ack(QueueDetail q, RqueueMessage m) {
    if (m.getId() == null) {
      return false;
    }
    Message nm = inFlight.remove(inFlightKey(q.resolvedConsumerName(), m.getId()));
    if (nm == null) {
      return false;
    }
    nm.ack();
    return true;
  }

  /**
   * Extend the visibility timeout for a message that is still being processed. Sends a NATS
   * WIP (work-in-progress) signal to the server, which resets the consumer's {@code ackWait}
   * timer back to its configured value. Call this periodically from a long-running handler to
   * prevent JetStream from redelivering the message to another consumer while work is in flight.
   *
   * <p>The {@code deltaMs} hint from the caller is ignored: NATS always resets to the consumer's
   * fixed {@code ackWait}; there is no per-message extension API in JetStream.
   *
   * @return {@code true} if the WIP signal was sent; {@code false} if the message is no longer
   *         tracked (already acked, nacked, or the process restarted).
   */
  @Override
  public boolean extendVisibilityTimeout(QueueDetail q, RqueueMessage m, long deltaMs) {
    if (m.getId() == null) {
      return false;
    }
    Message nm = inFlight.get(inFlightKey(q.resolvedConsumerName(), m.getId()));
    if (nm == null) {
      return false;
    }
    try {
      nm.inProgress();
      return true;
    } catch (RuntimeException e) {
      log.log(
          Level.WARNING,
          "inProgress failed for message id=" + m.getId() + " queue=" + q.getName(),
          e);
      return false;
    }
  }

  @Override
  public boolean nack(QueueDetail q, RqueueMessage m, long retryDelayMs) {
    if (m.getId() == null) {
      return false;
    }
    Message nm = inFlight.remove(inFlightKey(q.resolvedConsumerName(), m.getId()));
    if (nm == null) {
      return false;
    }
    nm.nakWithDelay(Duration.ofMillis(Math.max(0L, retryDelayMs)));
    return true;
  }

  @Override
  public void moveToDlq(
      QueueDetail source,
      String targetQueue,
      RqueueMessage old,
      RqueueMessage updated,
      long delayMs) {
    // Ack the original NATS message so it is removed from the source stream.
    if (old.getId() != null) {
      Message nm = inFlight.remove(inFlightKey(source.resolvedConsumerName(), old.getId()));
      if (nm != null) {
        nm.ack();
      }
    }
    // targetQueue is the configured deadLetterQueue name (e.g. "job-morgue"). Map it to a NATS
    // stream and subject using the same prefix convention as any other queue.
    // delayMs is ignored: this publish sets no scheduling headers, so the DLQ copy is not delayed.
    String dlqStream = config.getStreamPrefix() + targetQueue;
    String dlqSubject = config.getSubjectPrefix() + targetQueue;
    Headers headers = new Headers();
    if (updated.getId() != null) {
      headers.add("Nats-Msg-Id", updated.getId() + "-dlq");
    }
    try {
      provisioner.ensureStream(
          dlqStream, List.of(dlqSubject), QueueType.QUEUE, "rqueue DLQ for queue: " + targetQueue);
      byte[] payload = serdes.serialize(updated);
      js.publish(dlqSubject, headers, payload);
    } catch (IOException | JetStreamApiException e) {
      throw new RqueueNatsException(
          "Failed to move message id=" + old.getId() + " to DLQ stream=" + dlqStream, e);
    } catch (RuntimeException e) {
      throw new RqueueNatsException(
          "Failed to serialize/publish message id=" + old.getId() + " to DLQ stream=" + dlqStream,
          e);
    }
  }

  @Override
  public long moveExpired(QueueDetail q, long now, int batch) {
    // No-op: JetStream's ack-wait + maxDeliver + DLQ advisory bridge handle redelivery and
    // dead-lettering. v1 capabilities advertise no scheduled introspection.
    return 0L;
  }

654
  @Override
655
  public List<RqueueMessage> peek(QueueDetail q, long offset, long count) {
656
    return peek(q, null, offset, count);
1✔
657
  }
658

  @Override
  public List<RqueueMessage> peek(QueueDetail q, String consumerName, long offset, long count) {
    String stream = streamFor(q);
    if (count <= 0) {
      return Collections.emptyList();
    }
    try {
      // Read messages directly from the stream by sequence number via the JetStream
      // Management API. This avoids creating any consumer, which sidesteps two NATS 2.12+
      // restrictions on WorkQueue-retention streams:
      //   1. Pull consumers require AckPolicy.Explicit (error 10084).
      //   2. Multiple consumers on a WorkQueue stream must be mutually exclusive via
      //      filter subjects (error 10100) — incompatible with the always-on durable
      //      consumer that the listener container uses.
      // Reading by sequence is purely non-destructive and works regardless of retention
      // policy or what other consumers exist on the stream.
      io.nats.client.api.StreamInfo info = jsm.getStreamInfo(stream);
      long firstSeq = info.getStreamState().getFirstSequence();
      long lastSeq = info.getStreamState().getLastSequence();
      if (lastSeq < firstSeq) {
        return Collections.emptyList();
      }
      // Consumer-aware base sequence for Limits-retention streams: when a consumerName is
      // provided, start from that consumer's lowest unacked sequence (ackFloor + 1) so the
      // dashboard shows everything this subscriber still has work to do on — both messages
      // already delivered but not yet acked (in-flight) and messages still to be delivered
      // (pending). Using delivered.streamSeq + 1 would hide the in-flight window, which
      // surprises operators who see "in-flight = 15" but get an empty explorer.
      // WorkQueue streams have a single shared consumer (msgs are removed on ack) so the
      // stream's firstSeq is already the right base — skip the lookup.
      long base = firstSeq;
      if (consumerName != null
          && !consumerName.isEmpty()
          && info.getConfiguration() != null
          && info.getConfiguration().getRetentionPolicy()
              == io.nats.client.api.RetentionPolicy.Limits) {
        try {
          io.nats.client.api.ConsumerInfo ci = jsm.getConsumerInfo(stream, consumerName);
          if (ci != null && ci.getAckFloor() != null) {
            base = Math.max(firstSeq, ci.getAckFloor().getStreamSequence() + 1);
          }
        } catch (JetStreamApiException ignore) {
          // consumer may have disappeared mid-walk; fall back to stream firstSeq
        }
      }
      long startSeq = base + Math.max(0L, offset);
      long endSeq = Math.min(lastSeq, startSeq + count - 1);
      List<RqueueMessage> out = new ArrayList<>();
      for (long seq = startSeq; seq <= endSeq && out.size() < count; seq++) {
        try {
          io.nats.client.api.MessageInfo mi = jsm.getMessage(stream, seq);
          if (mi == null || mi.getData() == null) {
            continue;
          }
          out.add(serdes.deserialize(mi.getData(), RqueueMessage.class));
        } catch (JetStreamApiException notFound) {
          // Sequence may have been purged or skipped (e.g. WorkQueue acks); keep walking.
          log.log(
              Level.FINE, "peek: skipping missing seq=" + seq + " on stream=" + stream, notFound);
        } catch (Exception deserErr) {
          log.log(Level.WARNING, "peek: skipping undeserializable seq=" + seq, deserErr);
        }
      }
      return out;
    } catch (IOException | JetStreamApiException e) {
      throw new RqueueNatsException(
          "Failed to peek queue=" + q.getName() + " offset=" + offset + " count=" + count, e);
    }
  }

  @Override
  public long size(QueueDetail q) {
    String stream = streamFor(q);
    try {
      io.nats.client.api.StreamInfo info = jsm.getStreamInfo(stream);
      io.nats.client.api.RetentionPolicy retention =
          info.getConfiguration() != null ? info.getConfiguration().getRetentionPolicy() : null;
      // WorkQueue retention removes messages on ack, so streamState.msgCount is the exact
      // count of outstanding work — the natural "pending size" for queue mode. For Limits
      // retention msgCount is the total retained messages regardless of consumer progress,
      // so we compute the worst-case outstanding work from stream position math:
      //   outstanding ≈ lastSeq - min(consumer.ackFloor.streamSeq)
      // which is the messages the slowest durable consumer has not yet acked. This matches
      // the per-consumer pending semantic in subscribers() (numPending + numAckPending) so
      // the queue-level "size" and the per-row pending counts agree on what "outstanding"
      // means.
      if (retention == io.nats.client.api.RetentionPolicy.Limits) {
        return approximateLimitsPending(stream, info);
      }
      return info.getStreamState().getMsgCount();
    } catch (IOException | JetStreamApiException e) {
      throw new RqueueNatsException("Failed to read stream size for queue=" + q.getName(), e);
    }
  }

  /**
   * Position-based outstanding-work estimate for a Limits-retention stream:
   * {@code lastSeq - min(consumer.ackFloor.streamSeq)} across all durable consumers — i.e. the
   * size of the unacked window for the slowest consumer (counts both yet-to-deliver and
   * delivered-but-unacked messages). Returns {@code msgCount} as a fallback when no consumers
   * exist or the enumeration fails, so the dashboard never misses a non-zero queue.
   */
  private long approximateLimitsPending(String stream, io.nats.client.api.StreamInfo info) {
    long lastSeq = info.getStreamState().getLastSequence();
    if (lastSeq <= 0) {
      return 0L;
    }
    try {
      List<String> consumers = jsm.getConsumerNames(stream);
      if (consumers == null || consumers.isEmpty()) {
        // No consumers attached: the entire retained range is outstanding from the perspective
        // of any future consumer. Stream's msgCount is the right approximation.
        return info.getStreamState().getMsgCount();
      }
      long minAckFloor = Long.MAX_VALUE;
      for (String consumer : consumers) {
        try {
          io.nats.client.api.ConsumerInfo ci = jsm.getConsumerInfo(stream, consumer);
          if (ci == null || ci.getAckFloor() == null) {
            continue;
          }
          long ackFloor = ci.getAckFloor().getStreamSequence();
          if (ackFloor < minAckFloor) {
            minAckFloor = ackFloor;
          }
        } catch (IOException | JetStreamApiException ignore) {
          // best-effort; skip consumers that disappear mid-walk
        }
      }
      if (minAckFloor == Long.MAX_VALUE) {
        return info.getStreamState().getMsgCount();
      }
      return Math.max(0L, lastSeq - minAckFloor);
    } catch (IOException | JetStreamApiException ignore) {
      return info.getStreamState().getMsgCount();
    }
  }

  /**
   * Reports whether {@link #size(QueueDetail)} is an approximation. True for Limits-retention
   * streams (per-consumer position math) and false for WorkQueue streams (msgCount is exact).
   */
  public boolean isSizeApproximate(QueueDetail q) {
    String stream = streamFor(q);
    try {
      io.nats.client.api.StreamInfo info = jsm.getStreamInfo(stream);
      io.nats.client.api.RetentionPolicy retention =
          info.getConfiguration() != null ? info.getConfiguration().getRetentionPolicy() : null;
      return retention == io.nats.client.api.RetentionPolicy.Limits;
    } catch (IOException | JetStreamApiException e) {
      return false;
    }
  }

  /**
   * Per-consumer subscriber view used by the queue-detail dashboard. Walks all durable
   * consumers on the queue's stream and reports each one's pending + in-flight counts as
   * separate columns — same split as the Redis backend's processing ZSET vs ready LIST.
   *
   * <p><b>Pending semantics.</b> {@code pending} is yet-to-deliver work for this consumer.
   * For WorkQueue retention this is the stream's shared {@code msgCount} (every row shows the
   * same number, marked {@code pendingShared = true}); for Limits retention it is the
   * consumer's exact {@code numPending}. {@code inFlight} is always the consumer's exclusive
   * {@code numAckPending}: messages delivered but not yet acked. The two are disjoint —
   * {@code pending} excludes anything currently in flight — so an operator reading the row
   * sees the work split between "still to dispatch" and "currently being processed". Total
   * outstanding work for the consumer is the sum of the two, which is what the explorer
   * surfaces when the operator clicks the consumer link.
   *
   * <p>If consumer enumeration fails or the stream is unprovisioned, falls back to the
   * default single-row implementation so the dashboard still renders something useful.
   */
  @Override
  public java.util.List<com.github.sonus21.rqueue.core.spi.SubscriberView> subscribers(
      QueueDetail q) {
    String stream = streamFor(q);
    try {
      io.nats.client.api.StreamInfo info = jsm.getStreamInfo(stream);
      io.nats.client.api.RetentionPolicy retention =
          info.getConfiguration() != null ? info.getConfiguration().getRetentionPolicy() : null;
      boolean pendingIsShared = retention != io.nats.client.api.RetentionPolicy.Limits;
      long sharedPending = info.getStreamState().getMsgCount();
      List<String> consumers = jsm.getConsumerNames(stream);
      if (consumers == null || consumers.isEmpty()) {
        return com.github.sonus21.rqueue.core.spi.MessageBroker.super.subscribers(q);
      }
      java.util.List<com.github.sonus21.rqueue.core.spi.SubscriberView> out =
          new java.util.ArrayList<>(consumers.size());
      for (String consumer : consumers) {
        try {
          io.nats.client.api.ConsumerInfo ci = jsm.getConsumerInfo(stream, consumer);
          if (ci == null) {
            continue;
          }
          long pending = pendingIsShared ? sharedPending : ci.getNumPending();
          long inFlight = ci.getNumAckPending();
          out.add(new com.github.sonus21.rqueue.core.spi.SubscriberView(
              consumer, pending, inFlight, pendingIsShared));
        } catch (IOException | JetStreamApiException ignore) {
          // best-effort; skip consumers that disappear mid-walk
        }
      }
      if (out.isEmpty()) {
        return com.github.sonus21.rqueue.core.spi.MessageBroker.super.subscribers(q);
      }
      return out;
    } catch (IOException | JetStreamApiException e) {
      log.log(Level.WARNING, "subscribers() failed for stream=" + stream, e);
      return com.github.sonus21.rqueue.core.spi.MessageBroker.super.subscribers(q);
    }
  }

  /**
   * For Limits-retention streams, returns a per-consumer pending count: the server-computed
   * {@code numPending}, falling back to position math ({@code lastSeq - delivered.streamSeq})
   * when {@code numPending} is zero. For WorkQueue streams, and when the stream lookup fails,
   * returns {@code null} so the dashboard falls back to the single {@link #size(QueueDetail)}
   * row. WorkQueue messages are shared across consumers, so a per-consumer split is
   * meaningless.
   *
   * <p>The map iteration order matches {@link io.nats.client.JetStreamManagement#getConsumerNames}
   * (insertion order), giving the dashboard a stable rendering.
   */
  @Override
  @Deprecated
  public java.util.Map<String, Long> consumerPendingSizes(QueueDetail q) {
    String stream = streamFor(q);
    try {
      io.nats.client.api.StreamInfo info = jsm.getStreamInfo(stream);
      io.nats.client.api.RetentionPolicy retention =
          info.getConfiguration() != null ? info.getConfiguration().getRetentionPolicy() : null;
      if (retention != io.nats.client.api.RetentionPolicy.Limits) {
        // WorkQueue (and any future single-pool retention) doesn't have per-consumer pending.
        return null;
      }
      long lastSeq = info.getStreamState().getLastSequence();
      List<String> consumers = jsm.getConsumerNames(stream);
      if (consumers == null || consumers.isEmpty()) {
        return java.util.Collections.emptyMap();
      }
      java.util.Map<String, Long> out = new java.util.LinkedHashMap<>();
      for (String consumer : consumers) {
        try {
          io.nats.client.api.ConsumerInfo ci = jsm.getConsumerInfo(stream, consumer);
          if (ci == null) {
            continue;
          }
          // Prefer numPending when available (server-computed); fall back to position math.
          long pending = ci.getNumPending();
          if (pending == 0 && ci.getDelivered() != null) {
            long delivered = ci.getDelivered().getStreamSequence();
            pending = Math.max(0L, lastSeq - delivered);
          }
          out.put(consumer, pending);
        } catch (IOException | JetStreamApiException ignore) {
          // best-effort; skip consumers that disappear mid-walk
        }
      }
      return out;
    } catch (IOException | JetStreamApiException e) {
      log.log(Level.WARNING, "consumerPendingSizes failed for stream=" + stream, e);
      return null;
    }
  }

  @Override
  public AutoCloseable subscribe(String channel, Consumer<String> handler) {
    Dispatcher d =
        connection.createDispatcher(msg -> handler.accept(new String(msg.getData(), UTF_8)));
    d.subscribe(channel);
    return () -> {
      try {
        connection.closeDispatcher(d);
      } catch (RuntimeException e) {
        // best-effort close
      }
    };
  }

  @Override
  public void publish(String channel, String payload) {
    connection.publish(channel, payload.getBytes(UTF_8));
  }

  @Override
  public void onQueueRegistered(QueueDetail q) {
    String stream = streamFor(q);
    String subject = subjectFor(q);
    provisioner.ensureStream(stream, List.of(subject), q.getType(), streamDescription(q));
  }

  /**
   * NATS subjects use {@code .} as a hierarchy separator and stream / consumer names disallow it
   * outright. A queue name like {@code "orders.us"} would (a) silently turn the publish subject
   * into a two-level token tree and (b) make {@code rqueue-js-orders.us} an invalid stream name —
   * the JetStream API would reject it with an opaque error at first publish. Reject the name at
   * registration so the failure is loud and local.
   *
   * <p>{@code *} and {@code >} are also illegal in subject tokens (they're NATS wildcards), and
   * whitespace is rejected by the server; check those too while we're here.
   */
  @Override
  public void validateQueueName(String queueName) {
    if (queueName == null || queueName.isEmpty()) {
      return;
    }
    for (int i = 0; i < queueName.length(); i++) {
      char c = queueName.charAt(i);
      if (c == '.' || c == '*' || c == '>' || Character.isWhitespace(c)) {
        throw new IllegalArgumentException("Queue name '"
            + queueName
            + "' contains illegal character '"
            + c
            + "' for the NATS backend. Subject hierarchy ('.'), wildcards ('*', '>') and"
            + " whitespace are not allowed in queue names — use '-' or '_' instead.");
      }
    }
  }

  @Override
  public Capabilities capabilities() {
    return caps;
  }

  /**
   * Headers for a scheduled (or periodic) JetStream publish.
   *
   * <p><b>Dedup key strategy — {@code Nats-Msg-Id}:</b>
   * <ul>
   *   <li>Scheduled messages ({@code processAt > 0}) — key is {@code id-at-processAt}.
   *       Each period of a recurring message has a unique {@code processAt} (advances by
   *       {@code period} each time), so consecutive periods never share a key and are never
   *       suppressed. Retries of the same period reuse the identical {@code processAt}, so
   *       the duplicate {@code scheduleNext} publish is correctly deduplicated by JetStream —
   *       preventing double-execution of a period when the handler fails and is redelivered.
   *   <li>Non-scheduled messages ({@code processAt == 0}) — plain {@code m.getId()}; each
   *       enqueue generates a fresh UUID so there is no collision risk.
   * </ul>
   *
   * <p><b>Enrichment headers</b> — written at publish time so they can be read back on
   * {@link #popInternal} without deserializing the payload:
   * <ul>
   *   <li>{@code Rqueue-Process-At} — epoch-ms at which the message should be processed;
   *       used to set {@link RqueueMessage#setProcessAt} if the deserialized payload lacks it.
   *   <li>{@code Rqueue-Period} — period in ms for periodic messages; used to set
   *       {@link RqueueMessage#setPeriod} if the deserialized payload lacks it.
   * </ul>
   */
  private Headers buildSchedulingHeaders(RqueueMessage m, long delayMs) {
    Headers headers = new Headers();
    if (m.getId() != null) {
      // Dedup key: id-at-processAt for scheduled messages (processAt > 0).
      //   - Each period of a recurring message has a unique processAt → unique key → no
      //     cross-period suppression.
      //   - If scheduleNext is called again for the same period on redelivery after a handler
      //     failure, processAt is identical → same key → JetStream deduplicates the second
      //     publish and the period executes exactly once.
      // For non-scheduled messages (processAt == 0) the plain id is used; each enqueue
      // generates a fresh UUID so there is no collision risk.
      String dedupKey = m.getProcessAt() > 0 ? m.getId() + "-at-" + m.getProcessAt() : m.getId();
      headers.add("Nats-Msg-Id", dedupKey);
    }
    long deliverAtMs =
        m.getProcessAt() > 0 ? m.getProcessAt() : System.currentTimeMillis() + delayMs;
    String deliverAt = RFC3339_UTC.format(Instant.ofEpochMilli(deliverAtMs));
    headers.add(HDR_NEXT_DELIVER_TIME, deliverAt);
    // Enrichment headers — readable on pop without deserializing the payload
    headers.add(HDR_PROCESS_AT, String.valueOf(deliverAtMs));
    if (m.isPeriodic()) {
      headers.add(HDR_PERIOD, String.valueOf(m.getPeriod()));
    }
    return headers;
  }

  /**
   * Enrich a deserialized {@link RqueueMessage} with delivery metadata from the NATS message.
   *
   * <p><b>Failure count</b> — derived from {@code metaData().deliveredCount() - 1} (JetStream
   * tracks redeliveries in the reply-to subject). This is the authoritative source; we never
   * re-publish on retry, so a static header set at publish time would always show 0.
   *
   * <p><b>Scheduling fields</b> — {@code processAt} and {@code period} are read from the
   * enrichment headers ({@code Rqueue-Process-At}, {@code Rqueue-Period}) when the deserialized
   * message has them as zero. This handles payloads published by older broker versions that
   * predate per-field population, or any case where the JSON payload was trimmed.
   */
  private static void enrichFromDelivery(RqueueMessage rm, Message nm) {
    // Failure count from JetStream redelivery metadata (the authoritative retry counter).
    try {
      long deliveredCount = nm.metaData().deliveredCount();
      rm.setFailureCount((int) Math.max(0, deliveredCount - 1));
    } catch (Exception ignored) {
      // defensive: metadata absent on non-JetStream or synthetic messages
    }
    // Scheduling fields from publish-time enrichment headers.
    if (rm.getProcessAt() <= 0) {
      String processAtHdr =
          nm.getHeaders() == null ? null : nm.getHeaders().getFirst(HDR_PROCESS_AT);
      if (processAtHdr != null) {
        try {
          rm.setProcessAt(Long.parseLong(processAtHdr));
        } catch (NumberFormatException ignored) {
          // malformed header; leave processAt as-is
        }
      }
    }
    if (!rm.isPeriodic() && nm.getHeaders() != null) {
      String periodHdr = nm.getHeaders().getFirst(HDR_PERIOD);
      if (periodHdr != null) {
        try {
          long period = Long.parseLong(periodHdr);
          if (period > 0) {
            rm.setPeriod(period);
          }
        } catch (NumberFormatException ignored) {
          // malformed header; leave period as-is
        }
      }
    }
  }

  @Override
  public String storageKicker() {
    return "NATS";
  }

  @Override
  public String storageDescription() {
    return "Underlying NATS JetStream streams for the queues visible on this page.";
  }

  @Override
  public String storageDisplayName(QueueDetail q) {
    return streamFor(q);
  }

  @Override
  public String dlqStorageDisplayName(QueueDetail q) {
    return dlqStreamFor(q);
  }

  /**
   * Map the dashboard's Redis-shaped data-type tokens onto NATS terminology. Each per-queue
   * stream uses the JetStream {@code WorkQueue} retention policy by default, so the pending
   * row is labelled "Queue (Stream)". Completed messages are tracked in a KV bucket and DLQs
   * are independent streams.
   */
  @Override
  public String dataTypeLabel(
      com.github.sonus21.rqueue.models.enums.NavTab tab,
      com.github.sonus21.rqueue.models.enums.DataType type) {
    if (tab == null) {
      return type == null ? null : "Stream";
    }
    switch (tab) {
      case PENDING:
        return "Queue (Stream)";
      case DEAD:
        return "Dead Letter (Stream)";
      case COMPLETED:
        return "Completed (KV)";
      default:
        // Running / Scheduled / Cron tabs are hidden on NATS via Capabilities; fall through.
        return type == null ? null : "Stream";
    }
  }

  @Override
  public void close() {
    for (JetStreamSubscription s : subscriptionCache.values()) {
      try {
        s.unsubscribe();
      } catch (RuntimeException ignored) {
        // ignore
      }
    }
    subscriptionCache.clear();
    inFlight.clear();
  }

  /**
   * Provision a DLQ stream for the given queue. Caller wires up an advisory listener (subscribed to
   * {@code $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.>}) that republishes the exhausted message
   * onto {@link #dlqSubjectFor(QueueDetail)}. v1 leaves the bridge wiring opt-in via
   * {@link #installDeadLetterBridge(QueueDetail, String)}.
   */
  public void provisionDlq(QueueDetail q) {
    // Explicit call — always provision, bypassing the autoCreateDlqStream flag.
    // That flag gates automatic provisioning at bootstrap; here the caller is explicitly opting in.
    provisioner.ensureStream(
        dlqStreamFor(q), List.of(dlqSubjectFor(q)), QueueType.QUEUE, dlqStreamDescription(q));
  }

  /**
   * Install a background dispatcher that watches max-deliveries advisories on the queue's stream
   * and republishes the offending payload onto the DLQ subject. Returns an {@link AutoCloseable}
   * that tears the dispatcher down. Tests rely on this; production code in Phase 3 will call it
   * during container start.
   */
  public AutoCloseable installDeadLetterBridge(QueueDetail q, String consumerName) {
    provisionDlq(q);
    String advisorySubject =
        "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES." + streamFor(q) + "." + consumerName;
    String dlqSubject = dlqSubjectFor(q);
    String stream = streamFor(q);
    Dispatcher d = connection.createDispatcher(advisoryMsg -> {
      try {
        @SuppressWarnings("unchecked")
        Map<String, Object> adv = serdes.deserialize(advisoryMsg.getData(), Map.class);
        Object seqVal = adv.get("stream_seq");
        long streamSeq = seqVal instanceof Number ? ((Number) seqVal).longValue() : -1L;
        if (streamSeq <= 0) {
          return;
        }
        io.nats.client.api.MessageInfo mi = jsm.getMessage(stream, streamSeq);
        Headers h = new Headers();
        if (mi.getHeaders() != null) {
          mi.getHeaders().forEach((k, v) -> h.add(k, v));
        }
        js.publish(dlqSubject, h, mi.getData());
      } catch (Exception e) {
        log.log(
            Level.WARNING, "Failed to bridge max-delivery advisory to DLQ for stream=" + stream, e);
      }
    });
    d.subscribe(advisorySubject);
    return () -> {
      try {
        connection.closeDispatcher(d);
      } catch (RuntimeException ignored) {
        // best-effort
      }
    };
  }

  // ---- builder -----------------------------------------------------------

  public static class Builder {

    private Connection connection;
    private JetStream jetStream;
    private JetStreamManagement management;
    private RqueueNatsConfig config;
    private RqueueSerDes serdes;
    private NatsProvisioner provisioner;

    public Builder connection(Connection connection) {
      this.connection = connection;
      return this;
    }

    public Builder jetStream(JetStream jetStream) {
      this.jetStream = jetStream;
      return this;
    }

    public Builder management(JetStreamManagement management) {
      this.management = management;
      return this;
    }

    public Builder config(RqueueNatsConfig config) {
      this.config = config;
      return this;
    }

    public Builder serDes(RqueueSerDes serdes) {
      this.serdes = serdes;
      return this;
    }

    public Builder provisioner(NatsProvisioner provisioner) {
      this.provisioner = provisioner;
      return this;
    }

    public JetStreamMessageBroker build() {
      if (connection == null) {
        throw new IllegalStateException("connection is required");
      }
      try {
        if (jetStream == null) {
          jetStream = connection.jetStream();
        }
        if (management == null) {
          management = connection.jetStreamManagement();
        }
        if (provisioner == null) {
          provisioner = new NatsProvisioner(
              connection, management, config != null ? config : RqueueNatsConfig.defaults());
        }
      } catch (IOException e) {
        throw new RqueueNatsException("Failed to derive JetStream context from connection", e);
      }
      if (config == null) {
        config = RqueueNatsConfig.defaults();
      }
      if (serdes == null) {
        serdes = new RqJacksonSerDes(SerializationUtils.getObjectMapper());
      }
      return new JetStreamMessageBroker(
          connection, jetStream, management, config, serdes, provisioner);
    }

    /**
     * Create a broker for the factory. The {@link Map} argument is currently ignored; this
     * method simply delegates to {@link #build()}.
     */
    public JetStreamMessageBroker buildFromConfig(Map<String, String> ignored) {
      return build();
    }
  }
}
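
The sequence arithmetic in `peek` and the `Nats-Msg-Id` rule in `buildSchedulingHeaders` can be tried without a NATS server. The sketch below is a hypothetical standalone class: `SequenceWindowSketch`, `window`, and `dedupKey` are illustrative names that do not exist in rqueue, and a negative `ackFloor` is an assumed convention meaning "no consumer lookup was done". It only mirrors the arithmetic shown in the listing above and makes no JetStream calls.

```java
/**
 * Hypothetical, standalone sketch; not part of rqueue. Mirrors the sequence-window
 * arithmetic of peek(...) and the Nats-Msg-Id rule of buildSchedulingHeaders(...).
 */
final class SequenceWindowSketch {

  /**
   * Returns {startSeq, endSeq} (inclusive) for a peek, or an empty array when there is
   * nothing to read. A negative ackFloor (assumed convention) skips the consumer-aware base.
   */
  static long[] window(long firstSeq, long lastSeq, long ackFloor, long offset, long count) {
    if (count <= 0 || lastSeq < firstSeq) {
      return new long[0];
    }
    // Start at the lowest unacked sequence when an ack floor is known, else at firstSeq.
    long base = ackFloor < 0 ? firstSeq : Math.max(firstSeq, ackFloor + 1);
    long startSeq = base + Math.max(0L, offset);
    long endSeq = Math.min(lastSeq, startSeq + count - 1);
    return startSeq > endSeq ? new long[0] : new long[] {startSeq, endSeq};
  }

  /** Scheduled messages get id-at-processAt; non-scheduled messages keep the plain id. */
  static String dedupKey(String id, long processAt) {
    return processAt > 0 ? id + "-at-" + processAt : id;
  }

  public static void main(String[] args) {
    long[] w = window(1, 100, 40, 5, 10);
    System.out.println(w[0] + ".." + w[1]); // prints 46..55
    System.out.println(dedupKey("abc", 1700000000000L)); // prints abc-at-1700000000000
    System.out.println(dedupKey("abc", 0L)); // prints abc
  }
}
```

With an ack floor of 40, the base is sequence 41, so an offset of 5 and a count of 10 select sequences 46 through 55. A retry of the same scheduled period reuses the same `processAt` and therefore the same key, which is what lets JetStream drop the duplicate publish.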