sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%
25600722838

push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code
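
The failure mode described in this commit (a merged report holding only <sessioninfo> entries and no <package> elements) can be detected mechanically. The following is a minimal sketch, not part of the rqueue build: the class name `JacocoReportCheck` and its methods are hypothetical, and it uses only the JDK's built-in DOM parser. A CI step could fail the job when the count is zero instead of letting the upload step no-op.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

/** Hypothetical helper: counts <package> elements in a JaCoCo XML report. */
final class JacocoReportCheck {

  static int countPackages(InputStream xml) throws Exception {
    DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
    // JaCoCo reports declare a DOCTYPE; resolve it to an empty source so the
    // parser never tries to fetch report.dtd from disk or the network.
    builder.setEntityResolver((publicId, systemId) -> new InputSource(new StringReader("")));
    Document document = builder.parse(xml);
    return document.getElementsByTagName("package").getLength();
  }

  /** Convenience overload for in-memory XML; wraps checked exceptions. */
  static int countPackages(String xml) {
    try {
      return countPackages(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
    } catch (Exception e) {
      throw new IllegalStateException("could not parse report", e);
    }
  }

  public static void main(String[] args) {
    String emptyReport =
        "<report name=\"demo\"><sessioninfo id=\"s1\" start=\"1\" dump=\"2\"/></report>";
    String realReport =
        "<report name=\"demo\"><sessioninfo id=\"s1\" start=\"1\" dump=\"2\"/>"
            + "<package name=\"com/example\"/></report>";
    System.out.println(countPackages(emptyReport)); // 0: only session info
    System.out.println(countPackages(realReport)); // 1: one package element
  }
}
```

A zero result here corresponds to the "source file set empty, skipping" situation: the report parses fine but carries nothing to upload.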

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.
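
The pause and soft-delete semantics described above can be modelled in a few lines. This is an illustrative sketch only: `PauseAndSoftDeleteSketch` and every member in it are hypothetical, the maps stand in for the queue-config KV bucket and the message metadata store, and the listener-container notification is not modelled. It is not the `NatsRqueueUtilityService` implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of a paused flag and a soft delete. */
final class PauseAndSoftDeleteSketch {
  // Stand-in for the queue-config KV bucket: queue name -> paused flag.
  private final Map<String, Boolean> pausedByQueue = new HashMap<>();
  // Stand-in for the stream: messages are appended and never removed here.
  private final List<String> stream = new ArrayList<>();
  // Stand-in for the metadata flag that marks a message as deleted.
  private final Set<String> deletedIds = new HashSet<>();

  void publish(String messageId) {
    stream.add(messageId);
  }

  void pauseUnpauseQueue(String queue, boolean pause) {
    pausedByQueue.put(queue, pause);
  }

  /** A poller would consult this before dispatching. */
  boolean shouldDispatch(String queue) {
    return !pausedByQueue.getOrDefault(queue, false);
  }

  /** Soft delete: flags the message; the stored entry is left untouched. */
  boolean deleteMessage(String messageId) {
    if (!stream.contains(messageId)) {
      return false;
    }
    deletedIds.add(messageId);
    return true;
  }

  /** What a dashboard would list: stored messages minus flagged ones. */
  List<String> visibleMessages() {
    List<String> visible = new ArrayList<>();
    for (String id : stream) {
      if (!deletedIds.contains(id)) {
        visible.add(id);
      }
    }
    return visible;
  }

  int storedCount() {
    return stream.size();
  }
}
```

After a soft delete, `storedCount()` is unchanged while `visibleMessages()` shrinks, which is the "stream message persists, dashboard hides" behaviour in miniature.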

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.
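
One way to picture a header-only table is a row list whose first entry is always the header row, with data rows appended only when the broker supports the section. The sketch below is an assumption-laden illustration: the class, the column names, and the boolean capability flag are hypothetical and do not reproduce `RqueueQDetailServiceImpl`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a capability-aware table builder. */
final class HeaderOnlyTableSketch {
  // Placeholder column names; the real dashboard columns may differ.
  static final List<String> HEADERS = Arrays.asList("Id", "Message", "Type");

  /**
   * Returns the header row alone when the section is suppressed, and the
   * header row followed by the data rows otherwise.
   */
  static List<List<String>> buildTable(boolean sectionSupported, List<List<String>> rows) {
    List<List<String>> table = new ArrayList<>();
    table.add(HEADERS);
    if (sectionSupported) {
      table.addAll(rows);
    }
    return table;
  }

  public static void main(String[] args) {
    List<List<String>> rows = new ArrayList<>();
    rows.add(Arrays.asList("1", "payload", "STREAM"));
    System.out.println(buildTable(false, rows).size()); // 1: header only
    System.out.println(buildTable(true, rows).size()); // 2: header plus one row
  }
}
```

A caller always receives a renderable table, so the page does not need a separate error path for sections the broker cannot provide.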

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line
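
The percentages above can be reproduced from the raw counts. The sketch below recomputes them; the pooled formula in `aggregate` (covered lines plus covered branches over relevant lines plus total branches) is an assumption inferred from the note that branch coverage is included in the aggregate, and it does reproduce the reported 83.396%. The class and method names are hypothetical.

```java
import java.util.Locale;

/** Hypothetical helper that recomputes the coverage figures from raw counts. */
final class CoverageMath {

  /** Percentage of covered items, in the range 0 to 100. */
  static double percent(long covered, long total) {
    if (total <= 0) {
      throw new IllegalArgumentException("total must be positive");
    }
    return 100.0 * covered / total;
  }

  /** Assumed aggregate: lines and branches pooled into a single ratio. */
  static double aggregate(long linesCovered, long lines, long branchesCovered, long branches) {
    return percent(linesCovered + branchesCovered, lines + branches);
  }

  public static void main(String[] args) {
    // Locale.ROOT keeps the decimal separator stable across environments.
    System.out.println(String.format(Locale.ROOT, "lines     %.2f", percent(7715, 8921)));
    System.out.println(String.format(Locale.ROOT, "branches  %.2f", percent(2566, 3407)));
    System.out.println(String.format(Locale.ROOT, "new lines %.2f", percent(795, 1072)));
    System.out.println(
        String.format(Locale.ROOT, "aggregate %.3f", aggregate(7715, 8921, 2566, 3407)));
  }
}
```

Under this assumption the aggregate is (7715 + 2566) / (8921 + 3407) = 10281 / 12328, which is why it sits between the line figure and the branch figure.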

Source File

/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java (91.15% covered)
/*
 * Copyright (c) 2020-2026 Sonu Kumar
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * You may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and limitations under the License.
 *
 */

package com.github.sonus21.rqueue.core.impl;

import static com.github.sonus21.rqueue.core.support.RqueueMessageUtils.buildMessage;
import static com.github.sonus21.rqueue.core.support.RqueueMessageUtils.buildPeriodicMessage;
import static com.github.sonus21.rqueue.utils.Constants.DEFAULT_PRIORITY_KEY;
import static com.github.sonus21.rqueue.utils.Constants.MIN_DELAY;
import static com.github.sonus21.rqueue.utils.Validator.validateQueue;
import static org.springframework.util.Assert.notNull;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.impl.MessageSweeper.MessageDeleteRequest;
import com.github.sonus21.rqueue.enums.QueueType;
import com.github.sonus21.rqueue.exception.DuplicateMessageException;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.service.RqueueMessageMetadataService;
import com.github.sonus21.rqueue.utils.PriorityUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;

@Slf4j
@SuppressWarnings("WeakerAccess")
abstract class BaseMessageSender {

  protected final MessageHeaders messageHeaders;
  protected final MessageConverter messageConverter;
  protected final RqueueMessageTemplate messageTemplate;
  protected final RqueueMessageIdGenerator messageIdGenerator;
  protected final com.github.sonus21.rqueue.core.spi.MessageBroker messageBroker;

  @Autowired
  protected RqueueConfig rqueueConfig;

  @Autowired
  protected RqueueMessageMetadataService rqueueMessageMetadataService;

  BaseMessageSender(
      RqueueMessageTemplate messageTemplate,
      com.github.sonus21.rqueue.core.spi.MessageBroker messageBroker,
      MessageConverter messageConverter,
      MessageHeaders messageHeaders,
      RqueueMessageIdGenerator messageIdGenerator) {
    notNull(messageTemplate, "messageTemplate cannot be null");
    notNull(messageBroker, "messageBroker cannot be null");
    notNull(messageConverter, "messageConverter cannot be null");
    notNull(messageIdGenerator, "messageIdGenerator cannot be null");
    this.messageTemplate = messageTemplate;
    this.messageBroker = messageBroker;
    this.messageConverter = messageConverter;
    this.messageHeaders = messageHeaders;
    this.messageIdGenerator = messageIdGenerator;
  }

  protected Object storeMessageMetadata(
      RqueueMessage rqueueMessage, Long delayInMillis, boolean reactive, boolean isUnique) {
    boolean skipMetadata = !messageBroker.capabilities().usesPrimaryHandlerDispatch();
    if (skipMetadata) {
      return reactive ? reactor.core.publisher.Mono.just(true) : null;
    }
    MessageMetadata messageMetadata = new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED);
    Duration duration = rqueueConfig.getMessageDurability(delayInMillis);
    if (reactive) {
      return rqueueMessageMetadataService.saveReactive(messageMetadata, duration, isUnique);
    } else {
      rqueueMessageMetadataService.save(messageMetadata, duration, isUnique);
    }
    return null;
  }

  protected Object enqueue(
      QueueDetail queueDetail,
      RqueueMessage rqueueMessage,
      Long delayInMilliSecs,
      boolean reactive) {
    return enqueue(queueDetail, null, rqueueMessage, delayInMilliSecs, reactive);
  }

  /**
   * Priority-aware enqueue. Always routes through {@link
   * com.github.sonus21.rqueue.core.spi.MessageBroker} — the Redis-vs-NATS dispatch lives inside
   * each broker implementation. Backends that key off the queue name (Redis) ignore {@code
   * priority}; backends that publish to a per-priority destination (NATS) use it to pick the
   * subject. Reactive enqueues route through {@code enqueueReactive} so backends with native
   * async APIs do not block a thread.
   */
  protected Object enqueue(
      QueueDetail queueDetail,
      String priority,
      RqueueMessage rqueueMessage,
      Long delayInMilliSecs,
      boolean reactive) {
    if (delayInMilliSecs == null || delayInMilliSecs <= MIN_DELAY) {
      if (reactive) { // branch partially covered
        return messageBroker.enqueueReactive(queueDetail, rqueueMessage); // UNCOV
      }
      messageBroker.enqueue(queueDetail, priority, rqueueMessage);
    } else {
      if (reactive) { // branch partially covered
        return messageBroker.enqueueWithDelayReactive(queueDetail, rqueueMessage, delayInMilliSecs); // UNCOV
      }
      messageBroker.enqueueWithDelay(queueDetail, rqueueMessage, delayInMilliSecs);
    }
    return null;
  }

  protected String pushMessage(
      String queueName,
      String messageId,
      Object message,
      Integer retryCount,
      Long delayInMilliSecs,
      boolean isUnique) {
    return pushMessage(queueName, null, messageId, message, retryCount, delayInMilliSecs, isUnique);
  }

  protected String pushMessage(
      String queueName,
      String priority,
      String messageId,
      Object message,
      Integer retryCount,
      Long delayInMilliSecs,
      boolean isUnique) {
    QueueDetail queueDetail = EndpointRegistry.get(queueName);
    RqueueMessage rqueueMessage = buildMessage(
        messageIdGenerator,
        messageConverter,
        queueName,
        messageId,
        message,
        retryCount,
        delayInMilliSecs,
        messageHeaders);
    try {
      storeMessageMetadata(rqueueMessage, delayInMilliSecs, false, isUnique);
      enqueue(queueDetail, priority, rqueueMessage, delayInMilliSecs, false);
    } catch (DuplicateMessageException e) {
      log.warn(
          "Duplicate message enqueue attempted queue: {}, messageId: {}",
          queueName,
          rqueueMessage.getId());
      return null;
    } catch (Exception e) { // UNCOV
      log.error("Queue: {} Message {} could not be pushed", queueName, rqueueMessage.getId(), e); // UNCOV
      return null; // UNCOV
    }
    return rqueueMessage.getId();
  }

  protected String pushPeriodicMessage(
      String queueName, String messageId, Object message, long periodInMilliSeconds) {
    QueueDetail queueDetail = EndpointRegistry.get(queueName);
    RqueueMessage rqueueMessage = buildPeriodicMessage(
        messageIdGenerator,
        messageConverter,
        queueName,
        messageId,
        message,
        null,
        periodInMilliSeconds,
        messageHeaders);
    try {
      storeMessageMetadata(rqueueMessage, periodInMilliSeconds, false, false);
      enqueue(queueDetail, rqueueMessage, periodInMilliSeconds, false);
      return rqueueMessage.getId();
    } catch (Exception e) { // UNCOV
      log.error("Queue: {} Message {} could not be pushed", queueName, rqueueMessage, e); // UNCOV
      return null; // UNCOV
    }
  }

  protected Object deleteAllMessages(QueueDetail queueDetail) {
    return MessageSweeper.getInstance(rqueueConfig, messageTemplate, rqueueMessageMetadataService)
        .deleteAllMessages(
            MessageDeleteRequest.builder().queueDetail(queueDetail).build());
  }

  protected void registerQueueInternal(String queueName, QueueType type, String... priorities) {
    validateQueue(queueName);
    messageBroker.validateQueueName(queueName);
    notNull(priorities, "priorities cannot be null");
    Map<String, Integer> priorityMap = new HashMap<>();
    priorityMap.put(DEFAULT_PRIORITY_KEY, 1);
    for (String priority : priorities) {
      priorityMap.put(priority, 1);
    }

    QueueDetail queueDetail = QueueDetail.builder()
        .name(queueName)
        .active(false)
        .queueName(rqueueConfig.getQueueName(queueName))
        .scheduledQueueName(rqueueConfig.getScheduledQueueName(queueName))
        .scheduledQueueChannelName(rqueueConfig.getScheduledQueueChannelName(queueName))
        .processingQueueName(rqueueConfig.getProcessingQueueName(queueName))
        .processingQueueChannelName(rqueueConfig.getProcessingQueueChannelName(queueName))
        .priority(priorityMap)
        .type(type)
        .build();
    EndpointRegistry.register(queueDetail);
    notifyBrokerQueueRegistered(queueDetail);
    for (String priority : priorities) {
      String suffix = PriorityUtils.getSuffix(priority);
      queueDetail = QueueDetail.builder()
          .name(queueName + suffix)
          .active(false)
          .queueName(rqueueConfig.getQueueName(queueName) + suffix)
          .scheduledQueueName(rqueueConfig.getScheduledQueueName(queueName) + suffix)
          .scheduledQueueChannelName(rqueueConfig.getScheduledQueueChannelName(queueName) + suffix)
          .processingQueueName(rqueueConfig.getProcessingQueueName(queueName) + suffix)
          .processingQueueChannelName(
              rqueueConfig.getProcessingQueueChannelName(queueName) + suffix)
          .priority(Collections.singletonMap(DEFAULT_PRIORITY_KEY, 1))
          .build();
      EndpointRegistry.register(queueDetail);
      notifyBrokerQueueRegistered(queueDetail);
    }
  }

  private void notifyBrokerQueueRegistered(QueueDetail queueDetail) {
    messageBroker.onQueueRegistered(queueDetail);
  }
}
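
The report marks both reactive returns in `enqueue` as uncovered. The sketch below mirrors that four-way dispatch against a recording stand-in so each branch can be exercised in isolation. Everything here is hypothetical scaffolding: `RecordingBroker` is not the real `MessageBroker` SPI, plain strings replace `QueueDetail` and `RqueueMessage`, and the `MIN_DELAY` value of 100 ms is an assumed placeholder rather than the library's constant.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the broker SPI: it only records which method was chosen. */
final class RecordingBroker {
  final List<String> calls = new ArrayList<>();

  void enqueue(String queue, String priority, String message) {
    calls.add("enqueue");
  }

  void enqueueWithDelay(String queue, String message, long delayMillis) {
    calls.add("enqueueWithDelay");
  }

  Object enqueueReactive(String queue, String message) {
    calls.add("enqueueReactive");
    return "reactive-result"; // placeholder for a reactive publisher
  }

  Object enqueueWithDelayReactive(String queue, String message, long delayMillis) {
    calls.add("enqueueWithDelayReactive");
    return "reactive-result"; // placeholder for a reactive publisher
  }
}

final class EnqueueDispatchSketch {
  static final long MIN_DELAY = 100L; // assumed value, for illustration only

  /** Same branch structure as the listing: delay threshold first, then reactive flag. */
  static Object enqueue(
      RecordingBroker broker,
      String queue,
      String priority,
      String message,
      Long delayMillis,
      boolean reactive) {
    if (delayMillis == null || delayMillis <= MIN_DELAY) {
      if (reactive) {
        return broker.enqueueReactive(queue, message);
      }
      broker.enqueue(queue, priority, message);
    } else {
      if (reactive) {
        return broker.enqueueWithDelayReactive(queue, message, delayMillis);
      }
      broker.enqueueWithDelay(queue, message, delayMillis);
    }
    return null;
  }

  public static void main(String[] args) {
    RecordingBroker broker = new RecordingBroker();
    enqueue(broker, "jobs", null, "m1", null, false);
    enqueue(broker, "jobs", null, "m2", 5_000L, false);
    enqueue(broker, "jobs", null, "m3", null, true);
    enqueue(broker, "jobs", null, "m4", 5_000L, true);
    // prints [enqueue, enqueueWithDelay, enqueueReactive, enqueueWithDelayReactive]
    System.out.println(broker.calls);
  }
}
```

In the listing, only the immediate synchronous call receives `priority`; the delayed and reactive calls are made without it. A test along these lines, written against a mocked broker, would cover the two reactive returns that the report currently flags.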

© 2026 Coveralls, Inc