• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%
25600722838

push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.06
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/support/RqueueMessageUtils.java
1
/*
2
 * Copyright (c) 2019-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core.support;
18

19
import static com.github.sonus21.rqueue.utils.Constants.REDIS_KEY_SEPARATOR;
20

21
import com.github.sonus21.rqueue.core.RqueueMessage;
22
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
23
import java.util.ArrayList;
24
import java.util.HashMap;
25
import java.util.List;
26
import org.springframework.messaging.Message;
27
import org.springframework.messaging.MessageHeaders;
28
import org.springframework.messaging.converter.MessageConversionException;
29
import org.springframework.messaging.converter.MessageConverter;
30
import org.springframework.messaging.support.GenericMessage;
31

32
public final class RqueueMessageUtils {
33

34
  private static final String META_DATA_KEY_PREFIX = "__rq::m-mdata::";
35

36
  private RqueueMessageUtils() {}
37

38
  public static String getMessageMetaId(String queueName, String messageId) {
39
    return META_DATA_KEY_PREFIX + queueName + REDIS_KEY_SEPARATOR + messageId;
1✔
40
  }
41

42
  public static Object convertMessageToObject(
43
      RqueueMessage message, MessageConverter messageConverter) {
44
    return convertMessageToObject(new GenericMessage<>(message.getMessage()), messageConverter);
1✔
45
  }
46

47
  public static Object convertMessageToObject(
48
      Message<String> message, MessageConverter messageConverter) {
49
    return messageConverter.fromMessage(message, null);
1✔
50
  }
51

52
  public static RqueueMessage buildPeriodicMessage(
53
      RqueueMessageIdGenerator messageIdGenerator,
54
      MessageConverter converter,
55
      String queueName,
56
      String messageId,
57
      Object message,
58
      Integer retryCount,
59
      long period,
60
      MessageHeaders messageHeaders) {
61
    Message<?> msg = converter.toMessage(message, messageHeaders);
1✔
62
    if (msg == null) {
1✔
63
      throw new MessageConversionException("Message could not be build (null)");
1✔
64
    }
65
    Object payload = msg.getPayload();
1✔
66
    long processAt = System.currentTimeMillis() + period;
1✔
67
    String strMessage;
68
    if (payload instanceof String) {
1✔
69
      strMessage = (String) payload;
1✔
70
    } else if (payload instanceof byte[]) {
1!
71
      strMessage = new String((byte[]) payload);
×
72
    } else {
73
      throw new MessageConversionException("Message payload is neither String nor byte[]");
1✔
74
    }
75
    RqueueMessage rqueueMessage = RqueueMessage.builder()
1✔
76
        .id(messageIdGenerator.generate())
1✔
77
        .queueName(queueName)
1✔
78
        .message(strMessage)
1✔
79
        .processAt(processAt)
1✔
80
        .retryCount(retryCount)
1✔
81
        .period(period)
1✔
82
        .build();
1✔
83
    if (messageId != null) {
1✔
84
      rqueueMessage.setId(messageId);
1✔
85
    }
86
    return rqueueMessage;
1✔
87
  }
88

89
  public static RqueueMessage buildMessage(
90
      RqueueMessageIdGenerator messageIdGenerator,
91
      MessageConverter converter,
92
      String queueName,
93
      String messageId,
94
      Object message,
95
      Integer retryCount,
96
      Long delay,
97
      MessageHeaders messageHeaders) {
98
    Message<?> msg = converter.toMessage(message, messageHeaders);
1✔
99
    if (msg == null) {
1✔
100
      throw new MessageConversionException("Message could not be build (null)");
1✔
101
    }
102
    long queuedTime = System.nanoTime();
1✔
103
    long processAt = System.currentTimeMillis();
1✔
104
    if (delay != null) {
1✔
105
      processAt += delay;
1✔
106
    }
107
    Object payload = msg.getPayload();
1✔
108
    String strMessage;
109
    if (payload instanceof String) {
1✔
110
      strMessage = (String) payload;
1✔
111
    } else if (payload instanceof byte[]) {
1!
UNCOV
112
      strMessage = new String((byte[]) payload);
×
113
    } else {
114
      throw new MessageConversionException("Message payload is neither String nor byte[]");
1✔
115
    }
116
    RqueueMessage rqueueMessage = RqueueMessage.builder()
1✔
117
        .retryCount(retryCount)
1✔
118
        .queuedTime(queuedTime)
1✔
119
        .id(messageIdGenerator.generate())
1✔
120
        .queueName(queueName)
1✔
121
        .message(strMessage)
1✔
122
        .processAt(processAt)
1✔
123
        .build();
1✔
124
    if (messageId != null) {
1✔
125
      rqueueMessage.setId(messageId);
1✔
126
    }
127
    return rqueueMessage;
1✔
128
  }
129

130
  public static List<RqueueMessage> generateMessages(
131
      RqueueMessageIdGenerator messageIdGenerator,
132
      MessageConverter converter,
133
      String queueName,
134
      int count) {
135
    return generateMessages(
1✔
136
        messageIdGenerator, converter, messageIdGenerator.generate(), queueName, null, null, count);
1✔
137
  }
138

139
  public static RqueueMessage generateMessage(
140
      RqueueMessageIdGenerator messageIdGenerator, MessageConverter converter, String queueName) {
141
    return generateMessages(
1✔
142
            messageIdGenerator, converter, messageIdGenerator.generate(), queueName, null, null, 1)
1✔
143
        .get(0);
1✔
144
  }
145

146
  public static List<RqueueMessage> generateMessages(
147
      RqueueMessageIdGenerator messageIdGenerator,
148
      MessageConverter converter,
149
      String queueName,
150
      long delay,
151
      int count) {
152
    return generateMessages(
1✔
153
        messageIdGenerator,
154
        converter,
155
        messageIdGenerator.generate(),
1✔
156
        queueName,
157
        null,
158
        delay,
1✔
159
        count);
160
  }
161

162
  public static List<RqueueMessage> generateMessages(
163
      RqueueMessageIdGenerator messageIdGenerator,
164
      MessageConverter converter,
165
      Object object,
166
      String queueName,
167
      Integer retryCount,
168
      Long delay,
169
      int count) {
170
    List<RqueueMessage> messages = new ArrayList<>();
1✔
171
    for (int i = 0; i < count; i++) {
1✔
172
      messages.add(buildMessage(
1✔
173
          messageIdGenerator, converter, queueName, null, object, retryCount, delay, null));
174
    }
175
    return messages;
1✔
176
  }
177

178
  public static Message<?> cloneMessage(Message<?> message) {
179
    return new GenericMessage<>(message.getPayload(), new HashMap<>(message.getHeaders()));
1✔
180
  }
181
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc