• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25621442036

10 May 2026 06:06AM UTC coverage: 83.337% (-0.06%) from 83.396%
25621442036

Pull #297

github

web-flow
Merge 151ec0fb3 into a868dcde0
Pull Request #297: Nats scheduling fix

2625 of 3487 branches covered (75.28%)

Branch coverage included in aggregate %.

129 of 180 new or added lines in 12 files covered. (71.67%)

11 existing lines in 3 files now uncovered.

7833 of 9062 relevant lines covered (86.44%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.06
/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java
1
/*
2
 * Copyright (c) 2019-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.metrics;
18

19
import com.github.sonus21.rqueue.config.MetricsProperties;
20
import com.github.sonus21.rqueue.core.EndpointRegistry;
21
import com.github.sonus21.rqueue.listener.QueueDetail;
22
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
23
import io.micrometer.core.instrument.Gauge;
24
import io.micrometer.core.instrument.Gauge.Builder;
25
import io.micrometer.core.instrument.MeterRegistry;
26
import io.micrometer.core.instrument.Tags;
27
import org.springframework.beans.factory.annotation.Autowired;
28
import org.springframework.scheduling.annotation.Async;
29

30
/**
31
 * RqueueMetrics registers metrics related to queues. A queue can have up to 4 types of metrics:
32
 * queue.size, processing.queue.size, scheduled.queue.size and dead.letter.queue.size. The last
33
 * one is registered only if a dead letter queue is configured for the queue.
34
 */
35
public class RqueueMetrics implements RqueueMetricsRegistry {
36

37
  static final String QUEUE_KEY = "key";
38
  /**
39
   * Tag added when a {@link QueueDetail} declares a {@code consumerName} override. Without this
40
   * tag, two {@code @RqueueListener} methods on the same queue with different consumer names
41
   * register gauges with identical (name, tag-set) pairs and Micrometer silently keeps only the
42
   * first — losing the second consumer's metrics entirely.
43
   */
44
  static final String CONSUMER_KEY = "consumer";
45
  private static final String QUEUE_SIZE = "queue.size";
46
  private static final String SCHEDULED_QUEUE_SIZE = "scheduled.queue.size";
47
  private static final String PROCESSING_QUEUE_SIZE = "processing.queue.size";
48
  private static final String DEAD_LETTER_QUEUE_SIZE = "dead.letter.queue.size";
49
  private final QueueCounter queueCounter;
50

51
  @Autowired
52
  private MetricsProperties metricsProperties;
53

54
  @Autowired
55
  private MeterRegistry meterRegistry;
56

57
  @Autowired
58
  private RqueueQueueMetricsProvider queueMetricsProvider;
59

60
  public RqueueMetrics(QueueCounter queueCounter) {
1✔
61
    this.queueCounter = queueCounter;
1✔
62
  }
1✔
63

64
  private void monitor() {
65
    for (QueueDetail queueDetail : EndpointRegistry.getActiveQueueDetails()) {
1✔
66
      Tags queueTags =
1✔
67
          Tags.concat(metricsProperties.getMetricTags(), "queue", queueDetail.getName());
1✔
68
      // When a queue carries multiple consumers (multiple @RqueueListener with distinct
69
      // consumerName overrides), each gets its own QueueDetail. Without a `consumer` tag the
70
      // gauges would share the same (name, tags) and Micrometer would drop all but the first.
71
      String consumerName = queueDetail.getConsumerName();
1✔
72
      boolean hasConsumerOverride = consumerName != null && !consumerName.isEmpty();
1!
73
      if (hasConsumerOverride) {
1✔
74
        queueTags = queueTags.and(CONSUMER_KEY, consumerName);
1✔
75
      }
76
      Gauge.builder(
1✔
77
              metricsProperties.getMetricName(QUEUE_SIZE),
1✔
78
              queueDetail,
79
              c -> hasConsumerOverride
1!
NEW
80
                  ? queueMetricsProvider.getPendingMessageCountByConsumer(
×
NEW
81
                      queueDetail.getName(), consumerName)
×
82
                  : queueMetricsProvider.getPendingMessageCount(queueDetail.getName()))
1✔
83
          .tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
1✔
84
          .description("The number of entries in this queue")
1✔
85
          .register(meterRegistry);
1✔
86
      Gauge.builder(
1✔
87
              metricsProperties.getMetricName(PROCESSING_QUEUE_SIZE),
1✔
88
              queueDetail,
89
              c -> hasConsumerOverride
1!
NEW
90
                  ? queueMetricsProvider.getProcessingMessageCountByConsumer(
×
NEW
91
                      queueDetail.getName(), consumerName)
×
92
                  : queueMetricsProvider.getProcessingMessageCount(queueDetail.getName()))
1✔
93
          .tags(queueTags.and(QUEUE_KEY, queueDetail.getProcessingQueueName()))
1✔
94
          .description("The number of entries in the processing queue")
1✔
95
          .register(meterRegistry);
1✔
96
      Gauge.builder(
1✔
97
              metricsProperties.getMetricName(SCHEDULED_QUEUE_SIZE),
1✔
98
              queueDetail,
99
              c -> queueMetricsProvider.getScheduledMessageCount(queueDetail.getName()))
1✔
100
          .tags(queueTags.and(QUEUE_KEY, queueDetail.getScheduledQueueName()))
1✔
101
          .description("The number of entries waiting in the scheduled queue")
1✔
102
          .register(meterRegistry);
1✔
103
      if (queueDetail.isDlqSet()) {
1✔
104
        Builder<QueueDetail> builder = Gauge.builder(
1✔
105
            metricsProperties.getMetricName(DEAD_LETTER_QUEUE_SIZE),
1✔
106
            queueDetail,
107
            c -> queueMetricsProvider.getDeadLetterMessageCount(queueDetail.getName()));
1✔
108
        builder.tags(queueTags);
1✔
109
        builder.description("The number of entries in the dead letter queue");
1✔
110
        builder.register(meterRegistry);
1✔
111
      }
112
      queueCounter.registerQueue(metricsProperties, queueTags, meterRegistry, queueDetail);
1✔
113
    }
1✔
114
  }
1✔
115

116
  @Override
117
  @Async
118
  public void onApplicationEvent(RqueueBootstrapEvent event) {
119
    if (event.isStartup()) {
1✔
120
      monitor();
1✔
121
    }
122
  }
1✔
123

124
  @Override
125
  public QueueCounter getQueueCounter() {
126
    return this.queueCounter;
1✔
127
  }
128
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc