• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25621809822

10 May 2026 06:27AM UTC coverage: 83.425% (+0.03%) from 83.396%
25621809822

push

github

web-flow
Nats scheduling fix (#297)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipp... (continued)

2628 of 3487 branches covered (75.37%)

Branch coverage included in aggregate %.

128 of 179 new or added lines in 12 files covered. (71.51%)

1 existing line in 1 file now uncovered.

7841 of 9062 relevant lines covered (86.53%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.06
/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java
1
/*
2
 * Copyright (c) 2019-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.metrics;
18

19
import com.github.sonus21.rqueue.config.MetricsProperties;
20
import com.github.sonus21.rqueue.core.EndpointRegistry;
21
import com.github.sonus21.rqueue.listener.QueueDetail;
22
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
23
import io.micrometer.core.instrument.Gauge;
24
import io.micrometer.core.instrument.Gauge.Builder;
25
import io.micrometer.core.instrument.MeterRegistry;
26
import io.micrometer.core.instrument.Tags;
27
import org.springframework.beans.factory.annotation.Autowired;
28
import org.springframework.scheduling.annotation.Async;
29

30
/**
31
 * RqueueMetrics register metrics related to queue. A queue can have 4 types of metrics like
32
 * queue.size, processing.queue.size and scheduled.queue.size. Some messages can be in dead letter
33
 * queue if dead letter queue is configured.
34
 */
35
public class RqueueMetrics implements RqueueMetricsRegistry {
36

37
  static final String QUEUE_KEY = "key";
38
  /**
39
   * Tag added when a {@link QueueDetail} declares a {@code consumerName} override. Without this
40
   * tag, two {@code @RqueueListener} methods on the same queue with different consumer names
41
   * register gauges with identical (name, tag-set) pairs and Micrometer silently keeps only the
42
   * first — losing the second consumer's metrics entirely.
43
   */
44
  static final String CONSUMER_KEY = "consumer";
45

46
  private static final String QUEUE_SIZE = "queue.size";
47
  private static final String SCHEDULED_QUEUE_SIZE = "scheduled.queue.size";
48
  private static final String PROCESSING_QUEUE_SIZE = "processing.queue.size";
49
  private static final String DEAD_LETTER_QUEUE_SIZE = "dead.letter.queue.size";
50
  private final QueueCounter queueCounter;
51

52
  @Autowired
53
  private MetricsProperties metricsProperties;
54

55
  @Autowired
56
  private MeterRegistry meterRegistry;
57

58
  @Autowired
59
  private RqueueQueueMetricsProvider queueMetricsProvider;
60

61
  public RqueueMetrics(QueueCounter queueCounter) {
1✔
62
    this.queueCounter = queueCounter;
1✔
63
  }
1✔
64

65
  private void monitor() {
66
    for (QueueDetail queueDetail : EndpointRegistry.getActiveQueueDetails()) {
1✔
67
      Tags queueTags =
1✔
68
          Tags.concat(metricsProperties.getMetricTags(), "queue", queueDetail.getName());
1✔
69
      // When a queue carries multiple consumers (multiple @RqueueListener with distinct
70
      // consumerName overrides), each gets its own QueueDetail. Without a `consumer` tag the
71
      // gauges would share the same (name, tags) and Micrometer would drop all but the first.
72
      String consumerName = queueDetail.getConsumerName();
1✔
73
      boolean hasConsumerOverride = consumerName != null && !consumerName.isEmpty();
1!
74
      if (hasConsumerOverride) {
1✔
75
        queueTags = queueTags.and(CONSUMER_KEY, consumerName);
1✔
76
      }
77
      Gauge.builder(
1✔
78
              metricsProperties.getMetricName(QUEUE_SIZE),
1✔
79
              queueDetail,
80
              c -> hasConsumerOverride
1!
NEW
81
                  ? queueMetricsProvider.getPendingMessageCountByConsumer(
×
NEW
82
                      queueDetail.getName(), consumerName)
×
83
                  : queueMetricsProvider.getPendingMessageCount(queueDetail.getName()))
1✔
84
          .tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
1✔
85
          .description("The number of entries in this queue")
1✔
86
          .register(meterRegistry);
1✔
87
      Gauge.builder(
1✔
88
              metricsProperties.getMetricName(PROCESSING_QUEUE_SIZE),
1✔
89
              queueDetail,
90
              c -> hasConsumerOverride
1!
NEW
91
                  ? queueMetricsProvider.getProcessingMessageCountByConsumer(
×
NEW
92
                      queueDetail.getName(), consumerName)
×
93
                  : queueMetricsProvider.getProcessingMessageCount(queueDetail.getName()))
1✔
94
          .tags(queueTags.and(QUEUE_KEY, queueDetail.getProcessingQueueName()))
1✔
95
          .description("The number of entries in the processing queue")
1✔
96
          .register(meterRegistry);
1✔
97
      Gauge.builder(
1✔
98
              metricsProperties.getMetricName(SCHEDULED_QUEUE_SIZE),
1✔
99
              queueDetail,
100
              c -> queueMetricsProvider.getScheduledMessageCount(queueDetail.getName()))
1✔
101
          .tags(queueTags.and(QUEUE_KEY, queueDetail.getScheduledQueueName()))
1✔
102
          .description("The number of entries waiting in the scheduled queue")
1✔
103
          .register(meterRegistry);
1✔
104
      if (queueDetail.isDlqSet()) {
1✔
105
        Builder<QueueDetail> builder = Gauge.builder(
1✔
106
            metricsProperties.getMetricName(DEAD_LETTER_QUEUE_SIZE),
1✔
107
            queueDetail,
108
            c -> queueMetricsProvider.getDeadLetterMessageCount(queueDetail.getName()));
1✔
109
        builder.tags(queueTags);
1✔
110
        builder.description("The number of entries in the dead letter queue");
1✔
111
        builder.register(meterRegistry);
1✔
112
      }
113
      queueCounter.registerQueue(metricsProperties, queueTags, meterRegistry, queueDetail);
1✔
114
    }
1✔
115
  }
1✔
116

117
  @Override
118
  @Async
119
  public void onApplicationEvent(RqueueBootstrapEvent event) {
120
    if (event.isStartup()) {
1✔
121
      monitor();
1✔
122
    }
123
  }
1✔
124

125
  @Override
126
  public QueueCounter getQueueCounter() {
127
    return this.queueCounter;
1✔
128
  }
129
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc