• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%
25600722838

push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.55
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueEndpointManagerImpl.java
1
/*
2
 * Copyright (c) 2020-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core.impl;
18

19
import com.github.sonus21.rqueue.core.EndpointRegistry;
20
import com.github.sonus21.rqueue.core.RqueueEndpointManager;
21
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
22
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
23
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
24
import com.github.sonus21.rqueue.enums.QueueType;
25
import com.github.sonus21.rqueue.exception.QueueDoesNotExist;
26
import com.github.sonus21.rqueue.listener.QueueDetail;
27
import com.github.sonus21.rqueue.models.db.QueueConfig;
28
import com.github.sonus21.rqueue.models.request.PauseUnpauseQueueRequest;
29
import com.github.sonus21.rqueue.models.response.BaseResponse;
30
import com.github.sonus21.rqueue.service.RqueueUtilityService;
31
import com.github.sonus21.rqueue.utils.Constants;
32
import com.github.sonus21.rqueue.utils.PriorityUtils;
33
import com.github.sonus21.rqueue.utils.Validator;
34
import java.util.ArrayList;
35
import java.util.Collections;
36
import java.util.HashMap;
37
import java.util.List;
38
import java.util.Map;
39
import org.springframework.beans.factory.annotation.Autowired;
40
import org.springframework.messaging.MessageHeaders;
41
import org.springframework.messaging.converter.MessageConverter;
42
import org.springframework.util.CollectionUtils;
43

44
public class RqueueEndpointManagerImpl extends BaseMessageSender implements RqueueEndpointManager {
45

46
  @Autowired
47
  private RqueueUtilityService rqueueUtilityService;
48

49
  @Autowired
50
  private RqueueSystemConfigDao rqueueSystemConfigDao;
51

52
  public RqueueEndpointManagerImpl(
53
      RqueueMessageTemplate messageTemplate,
54
      com.github.sonus21.rqueue.core.spi.MessageBroker messageBroker,
55
      MessageConverter messageConverter,
56
      MessageHeaders messageHeaders) {
57
    this(
1✔
58
        messageTemplate,
59
        messageBroker,
60
        messageConverter,
61
        messageHeaders,
62
        new UuidV4RqueueMessageIdGenerator());
63
  }
1✔
64

65
  public RqueueEndpointManagerImpl(
66
      RqueueMessageTemplate messageTemplate,
67
      com.github.sonus21.rqueue.core.spi.MessageBroker messageBroker,
68
      MessageConverter messageConverter,
69
      MessageHeaders messageHeaders,
70
      RqueueMessageIdGenerator messageIdGenerator) {
71
    super(messageTemplate, messageBroker, messageConverter, messageHeaders, messageIdGenerator);
1✔
72
  }
1✔
73

74
  @Override
75
  public void registerQueue(String name, QueueType type, String... priorities) {
76
    registerQueueInternal(name, type, priorities);
1✔
77
  }
1✔
78

79
  @Override
80
  public boolean isQueueRegistered(String queueName) {
81
    try {
82
      EndpointRegistry.get(queueName);
1✔
83
      return true;
1✔
84
    } catch (QueueDoesNotExist e) {
1✔
85
      return false;
1✔
86
    }
87
  }
88

89
  @Override
90
  public List<QueueDetail> getQueueConfig(String queueName) {
91
    QueueDetail queueDetail = EndpointRegistry.get(queueName);
1✔
92
    Map<String, Integer> priorityMap = queueDetail.getPriority();
1✔
93
    if (CollectionUtils.isEmpty(priorityMap)) {
1!
UNCOV
94
      return Collections.singletonList(queueDetail);
×
95
    }
96
    Map<String, Integer> localPriorityMap = new HashMap<>(priorityMap);
1✔
97
    localPriorityMap.remove(Constants.DEFAULT_PRIORITY_KEY);
1✔
98
    List<QueueDetail> queueDetails = new ArrayList<>();
1✔
99
    queueDetails.add(queueDetail);
1✔
100
    for (String priority : localPriorityMap.keySet()) {
1✔
101
      queueDetails.add(
1✔
102
          EndpointRegistry.get(PriorityUtils.getQueueNameForPriority(queueName, priority)));
1✔
103
    }
1✔
104
    return queueDetails;
1✔
105
  }
106

107
  @Override
108
  public boolean pauseUnpauseQueue(String queueName, boolean pause) {
109
    Validator.validateQueue(queueName);
1✔
110
    PauseUnpauseQueueRequest request = new PauseUnpauseQueueRequest(pause);
1✔
111
    request.setName(queueName);
1✔
112
    BaseResponse response = rqueueUtilityService.pauseUnpauseQueue(request);
1✔
113
    return response.getCode() == 0;
1✔
114
  }
115

116
  @Override
117
  public boolean pauseUnpauseQueue(String queueName, String priority, boolean pause) {
118
    Validator.validateQueue(queueName);
1✔
119
    Validator.validatePriority(priority);
1✔
120
    PauseUnpauseQueueRequest request = new PauseUnpauseQueueRequest(pause);
1✔
121
    request.setName(PriorityUtils.getQueueNameForPriority(queueName, priority));
1✔
122
    BaseResponse response = rqueueUtilityService.pauseUnpauseQueue(request);
1✔
123
    return response.getCode() == 0;
1✔
124
  }
125

126
  @Override
127
  public boolean isQueuePaused(String queueName) {
128
    Validator.validateQueue(queueName);
1✔
129
    QueueConfig queueConfig = rqueueSystemConfigDao.getConfigByName(queueName, false);
1✔
130
    if (queueConfig == null) {
1✔
131
      throw new IllegalStateException("QueueConfig does not exist, is this new queue?");
1✔
132
    }
133
    return queueConfig.isPaused();
1✔
134
  }
135

136
  @Override
137
  public boolean isQueuePaused(String queueName, String priority) {
138
    Validator.validateQueue(queueName);
1✔
139
    Validator.validatePriority(priority);
1✔
140
    String name = PriorityUtils.getQueueNameForPriority(queueName, priority);
1✔
141
    QueueConfig queueConfig = rqueueSystemConfigDao.getConfigByName(name, false);
1✔
142
    if (queueConfig == null) {
1✔
143
      throw new IllegalStateException("QueueConfig does not exist, is this new queue?");
1✔
144
    }
145
    return queueConfig.isPaused();
1✔
146
  }
147
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc