• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%
25600722838

push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.66
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/MessageSweeper.java
1
/*
2
 * Copyright (c) 2021-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core.impl;
18

19
import com.github.sonus21.rqueue.config.RqueueConfig;
20
import com.github.sonus21.rqueue.core.RqueueMessage;
21
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
22
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
23
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
24
import com.github.sonus21.rqueue.listener.QueueDetail;
25
import com.github.sonus21.rqueue.service.RqueueMessageMetadataService;
26
import com.github.sonus21.rqueue.utils.RetryableRunnable;
27
import com.github.sonus21.rqueue.utils.StringUtils;
28
import java.util.ArrayList;
29
import java.util.Arrays;
30
import java.util.LinkedList;
31
import java.util.List;
32
import java.util.concurrent.ExecutorService;
33
import java.util.concurrent.Executors;
34
import java.util.stream.Collectors;
35
import lombok.AllArgsConstructor;
36
import lombok.Builder;
37
import lombok.ToString;
38
import lombok.extern.slf4j.Slf4j;
39
import org.apache.commons.collections4.ListUtils;
40
import org.springframework.data.redis.connection.DataType;
41
import org.springframework.util.CollectionUtils;
42

43
@Slf4j
1✔
44
public class MessageSweeper {
45
  private static MessageSweeper messageSweeper;
46
  private final ExecutorService executorService;
47
  private final RqueueMessageTemplate messageTemplate;
48
  private final RqueueMessageMetadataService rqueueMessageMetadataService;
49
  private final RqueueConfig rqueueConfig;
50

51
  private MessageSweeper(
52
      RqueueConfig rqueueConfig,
53
      RqueueMessageTemplate messageTemplate,
54
      RqueueMessageMetadataService rqueueMessageMetadataService) {
1✔
55
    this.rqueueMessageMetadataService = rqueueMessageMetadataService;
1✔
56
    this.executorService = Executors.newSingleThreadExecutor();
1✔
57
    this.messageTemplate = messageTemplate;
1✔
58
    this.rqueueConfig = rqueueConfig;
1✔
59
  }
1✔
60

61
  public static MessageSweeper getInstance(
62
      RqueueConfig rqueueConfig,
63
      RqueueMessageTemplate messageTemplate,
64
      RqueueMessageMetadataService rqueueMessageMetadataDao) {
65
    if (MessageSweeper.messageSweeper == null) {
1✔
66
      synchronized (MessageSweeper.class) {
1✔
67
        if (MessageSweeper.messageSweeper == null) {
1!
68
          MessageSweeper.messageSweeper =
1✔
69
              new MessageSweeper(rqueueConfig, messageTemplate, rqueueMessageMetadataDao);
70
          return MessageSweeper.messageSweeper;
1✔
71
        }
UNCOV
72
        return MessageSweeper.messageSweeper;
×
73
      }
74
    }
75
    return MessageSweeper.messageSweeper;
1✔
76
  }
77

78
  public boolean deleteAllMessages(MessageDeleteRequest request) {
79
    log.debug("MessageDeleteRequest {}", request);
1✔
80
    if (!request.isValid()) {
1✔
81
      throw new IllegalArgumentException("Message request is not valid");
1✔
82
    }
83
    List<DeleteJobData> deleteJobData = new ArrayList<>();
1✔
84
    QueueDetail detail = request.queueDetail;
1✔
85
    if (detail != null) {
1✔
86
      String newQueueName = rqueueConfig.getDelDataName(detail.getQueueName());
1✔
87
      String newScheduledZsetName = rqueueConfig.getDelDataName(detail.getQueueName());
1✔
88
      String newProcessingZsetName = rqueueConfig.getDelDataName(detail.getQueueName());
1✔
89
      messageTemplate.renameCollections(
1✔
90
          Arrays.asList(
1✔
91
              detail.getQueueName(),
1✔
92
              detail.getScheduledQueueName(),
1✔
93
              detail.getProcessingQueueName()),
1✔
94
          Arrays.asList(newQueueName, newScheduledZsetName, newProcessingZsetName));
1✔
95
      deleteJobData.add(new DeleteJobData(newQueueName, DataType.LIST));
1✔
96
      deleteJobData.add(new DeleteJobData(newScheduledZsetName, DataType.ZSET));
1✔
97
      deleteJobData.add(new DeleteJobData(newProcessingZsetName, DataType.ZSET));
1✔
98
    } else {
1✔
99
      switch (request.dataType) {
1!
100
        case LIST:
101
          DeleteJobData data =
1✔
102
              new DeleteJobData(rqueueConfig.getDelDataName(request.dataName), request.dataType);
1✔
103
          messageTemplate.renameCollection(request.dataName, data.name);
1✔
104
          deleteJobData.add(data);
1✔
105
          break;
1✔
106
        case ZSET:
107
          data = new DeleteJobData(rqueueConfig.getDelDataName(request.dataName), request.dataType);
1✔
108
          messageTemplate.renameCollection(request.dataName, data.name);
1✔
109
          deleteJobData.add(data);
1✔
110
          break;
1✔
111
        default:
UNCOV
112
          throw new UnknownSwitchCase(request.dataType.code());
×
113
      }
114
    }
115
    if (!CollectionUtils.isEmpty(deleteJobData)) {
1!
116
      if (detail != null) {
1✔
117
        executorService.submit(new MessageDeleteJob(deleteJobData, detail.getName()));
1✔
118
      } else {
119
        executorService.submit(new MessageDeleteJob(deleteJobData, request.queueName));
1✔
120
      }
121
    }
122
    return true;
1✔
123
  }
124

125
  @AllArgsConstructor
126
  private static class DeleteJobData {
127

128
    private final String name;
129
    private final DataType type;
130
  }
131

132
  @Builder
133
  @ToString
134
  public static class MessageDeleteRequest {
135

136
    private final QueueDetail queueDetail;
137
    private final String dataName;
138
    private final String queueName;
139
    private final DataType dataType;
140

141
    private boolean isValid() {
142
      if (queueDetail != null) {
1✔
143
        return true;
1✔
144
      }
145
      return !StringUtils.isEmpty(dataName)
1!
146
          && !StringUtils.isEmpty(queueName)
1!
147
          && Arrays.asList(DataType.LIST, DataType.ZSET).contains(dataType);
1✔
148
    }
149
  }
150

151
  private class MessageDeleteJob extends RetryableRunnable<DeleteJobData> {
152

153
    private static final int batchSize = 1000;
154
    private final String queueName;
155

156
    MessageDeleteJob(List<DeleteJobData> jobData, String queueName) {
1✔
157
      super(log, null, jobData.iterator());
1✔
158
      this.queueName = queueName;
1✔
159
    }
1✔
160

161
    private List<String> getMessageIdFromList(String queueName) {
162
      long offset = 0;
1✔
163
      List<String> ids = new LinkedList<>();
1✔
164
      while (true) {
165
        List<RqueueMessage> rqueueMessageList =
1✔
166
            messageTemplate.readFromList(queueName, offset, batchSize);
1✔
167
        if (!CollectionUtils.isEmpty(rqueueMessageList)) {
1✔
168
          for (RqueueMessage rqueueMessage : rqueueMessageList) {
1✔
169
            ids.add(rqueueMessage.getId());
1✔
170
          }
1✔
171
        }
172
        if (CollectionUtils.isEmpty(rqueueMessageList) || rqueueMessageList.size() < batchSize) {
1✔
173
          break;
1✔
174
        }
175
        offset += batchSize;
1✔
176
      }
1✔
177
      return ids;
1✔
178
    }
179

180
    private List<String> getMessageIdFromZset(String zsetName) {
181
      List<String> ids = new LinkedList<>();
1✔
182
      List<RqueueMessage> rqueueMessageList = messageTemplate.readFromZset(zsetName, 0, -1);
1✔
183
      if (!CollectionUtils.isEmpty(rqueueMessageList)) {
1✔
184
        for (RqueueMessage rqueueMessage : rqueueMessageList) {
1✔
185
          ids.add(rqueueMessage.getId());
1✔
186
        }
1✔
187
      }
188
      return ids;
1✔
189
    }
190

191
    private List<String> getMessageIds(DeleteJobData data) {
192
      if (data.type == DataType.LIST) {
1✔
193
        return getMessageIdFromList(data.name);
1✔
194
      }
195
      return getMessageIdFromZset(data.name);
1✔
196
    }
197

198
    public void delete(DeleteJobData data) {
199
      for (List<String> subIds : ListUtils.partition(getMessageIds(data), batchSize)) {
1✔
200
        List<String> messageMetaIds = subIds.stream()
1✔
201
            .map(e -> RqueueMessageUtils.getMessageMetaId(queueName, e))
1✔
202
            .collect(Collectors.toList());
1✔
203
        rqueueMessageMetadataService.deleteAll(messageMetaIds);
1✔
204
        log.debug("Deleted {} messages meta", messageMetaIds.size());
1✔
205
      }
1✔
206
      messageTemplate.deleteCollection(data.name);
1✔
207
    }
1✔
208

209
    @Override
210
    public void consume(DeleteJobData data) {
211
      delete(data);
1✔
212
    }
1✔
213
  }
214
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc