sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%

push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code
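
The empty-report symptom described in the ci commit above (only <sessioninfo> entries, no <package> elements) can be checked mechanically. The following is a minimal sketch, not part of rqueue or JaCoCo: the class name `JacocoReportCheck` and the inline sample documents are hypothetical, and the DTD feature flag assumes the JDK's built-in XML parser.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

// Hypothetical helper for illustration only.
class JacocoReportCheck {

  // Returns how many <package> elements the given report XML contains.
  static int countPackages(String xml) {
    try {
      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
      // A real report may carry a DOCTYPE referencing report.dtd; do not try to load it.
      factory.setFeature(
          "http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
      Document doc = factory.newDocumentBuilder()
          .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
      return doc.getElementsByTagName("package").getLength();
    } catch (Exception e) {
      throw new IllegalStateException("Could not parse report", e);
    }
  }

  public static void main(String[] args) {
    // Made-up sample documents standing in for the CI and local reports.
    String empty =
        "<report name=\"r\"><sessioninfo id=\"a\" start=\"1\" dump=\"2\"/></report>";
    String full =
        "<report name=\"r\"><sessioninfo id=\"a\" start=\"1\" dump=\"2\"/>"
            + "<package name=\"com/example\"/></report>";
    System.out.println(countPackages(empty)); // prints 0
    System.out.println(countPackages(full)); // prints 1
  }
}
```

A count of zero on the CI artifact and a non-zero count on a local build would match the difference the commit message reports.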

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line
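
The figures above are mutually consistent if the aggregate percentage is taken as (covered lines + covered branches) / (relevant lines + total branches). That formula is an inference from the numbers on this page rather than something the page states; the sketch below reproduces the arithmetic under that assumption, with `CoverageMath` as a hypothetical class name.

```java
import java.util.Locale;

// Hypothetical helper that recomputes the percentages shown on this page.
class CoverageMath {

  // Percentage of covered items; returns 0 when there is nothing to cover.
  static double percent(long covered, long total) {
    return total == 0 ? 0.0 : 100.0 * covered / total;
  }

  public static void main(String[] args) {
    long coveredLines = 7715, relevantLines = 8921;
    long coveredBranches = 2566, totalBranches = 3407;

    System.out.printf(Locale.ROOT, "lines:     %.2f%n",
        percent(coveredLines, relevantLines)); // prints 86.48
    System.out.printf(Locale.ROOT, "branches:  %.2f%n",
        percent(coveredBranches, totalBranches)); // prints 75.32
    // Assumed aggregate: lines and branches pooled into one ratio.
    System.out.printf(Locale.ROOT, "aggregate: %.3f%n",
        percent(coveredLines + coveredBranches,
            relevantLines + totalBranches)); // prints 83.396
    System.out.printf(Locale.ROOT, "new lines: %.2f%n",
        percent(795, 1072)); // prints 74.16
  }
}
```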

Source File

75.64
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueInternalPubSubChannel.java
/*
 * Copyright (c) 2021-2026 Sonu Kumar
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * You may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and limitations under the License.
 *
 */

package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.converter.RqueueRedisSerializer;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.models.enums.PubSubType;
import com.github.sonus21.rqueue.models.event.RqueuePubSubEvent;
import com.github.sonus21.rqueue.models.request.PauseUnpauseQueueRequest;
import com.github.sonus21.rqueue.serdes.RqueueSerDes;
import com.github.sonus21.rqueue.serdes.SerializationUtils;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.StringUtils;
import java.time.Duration;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;

@Slf4j
public class RqueueInternalPubSubChannel implements InitializingBean {

  private final RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
  private final RqueueMessageListenerContainer rqueueMessageListenerContainer;
  private final RqueueConfig rqueueConfig;
  private final RqueueRedisTemplate<String> stringRqueueRedisTemplate;
  private final RqueueRedisSerializer rqueueRedisSerializer;
  private final RqueueBeanProvider rqueueBeanProvider;
  private final RqueueSerDes serDes = SerializationUtils.getSerDes();

  public RqueueInternalPubSubChannel(
      RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
      RqueueMessageListenerContainer rqueueMessageListenerContainer,
      RqueueConfig rqueueConfig,
      RqueueRedisTemplate<String> stringRqueueRedisTemplate,
      RqueueBeanProvider rqueueBeanProvider) {
    this.rqueueRedisListenerContainerFactory = rqueueRedisListenerContainerFactory;
    this.rqueueMessageListenerContainer = rqueueMessageListenerContainer;
    this.rqueueConfig = rqueueConfig;
    this.stringRqueueRedisTemplate = stringRqueueRedisTemplate;
    this.rqueueBeanProvider = rqueueBeanProvider;
    this.rqueueRedisSerializer = new RqueueRedisSerializer();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    String channel = rqueueConfig.getInternalCommChannelName();
    rqueueRedisListenerContainerFactory.addMessageListener(
        new InternalMessageListener(), new ChannelTopic(channel));
  }

  public void emitPauseUnpauseQueueEvent(PauseUnpauseQueueRequest pauseUnpauseQueueRequest) {
    publish(PubSubType.PAUSE_QUEUE, pauseUnpauseQueueRequest);
  }

  private void publish(PubSubType type, Object message) {
    byte[] data = rqueueRedisSerializer.serialize(message);
    RqueuePubSubEvent event =
        new RqueuePubSubEvent(type, RqueueConfig.getBrokerId(), new String(data));
    stringRqueueRedisTemplate
        .getRedisTemplate()
        .convertAndSend(rqueueConfig.getInternalCommChannelName(), event);
  }

  public void emitQueueConfigUpdateEvent(PauseUnpauseQueueRequest request) {
    publish(PubSubType.QUEUE_CRUD, request.getName());
  }

  class InternalMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
      byte[] body = message.getBody();
      if (SerializationUtils.isEmpty(body)) { // partial branch
        log.error( // uncovered
            "Empty message received on channel: {}, pattern: {}",
            new String(message.getChannel()), // uncovered
            new String(pattern));
        return; // uncovered
      }
      processEvent(body);
    }

    private void processEvent(byte[] body) {
      log.debug("Message on internal channel {}", new String(body));
      RqueuePubSubEvent rqueuePubSubEvent;
      try {
        rqueuePubSubEvent = serDes.deserialize(body, RqueuePubSubEvent.class);
      } catch (Exception e) { // uncovered
        log.error("Invalid message on pub-sub channel {}", new String(body), e); // uncovered
        return; // uncovered
      }
      if (rqueuePubSubEvent == null) { // partial branch
        log.error("Invalid message on pub-sub channel {}", new String(body)); // uncovered
        return; // uncovered
      }
      try {
        switch (rqueuePubSubEvent.getType()) { // partial branch
          case PAUSE_QUEUE:
            handlePauseEvent(rqueuePubSubEvent.messageAs(serDes, PauseUnpauseQueueRequest.class));
            break;
          case QUEUE_CRUD:
            rqueueBeanProvider
                .getRqueueSystemConfigDao()
                .clearCacheByName(rqueuePubSubEvent.messageAs(serDes, String.class));
            break;
          default:
            log.error("Unknown event type {}", rqueuePubSubEvent); // uncovered
        }
      } catch (Exception e) { // uncovered
        log.error("Failed to process pub-sub event {}", rqueuePubSubEvent, e); // uncovered
      }
    }

    private void handlePauseEvent(PauseUnpauseQueueRequest request) {
      if (request == null || StringUtils.isEmpty(request.getName())) { // partial branch
        log.error("Invalid message payload {}", request); // uncovered
        return; // uncovered
      }
      String lockKey = Constants.getQueueCrudLockKey(rqueueConfig, request.getName());
      String lockValue = UUID.randomUUID().toString();
      try {
        boolean acquired = rqueueBeanProvider
            .getRqueueLockManager()
            .acquireLock(lockKey, lockValue, Duration.ofMillis(100));
        if (acquired) { // partial branch
          rqueueMessageListenerContainer.pauseUnpauseQueue(request.getName(), request.isPause());
        }
      } finally {
        rqueueBeanProvider.getRqueueLockManager().releaseLock(lockKey, lockValue);
      }
    }
  }
}
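
In `handlePauseEvent`, the lock is released in `finally` whether or not it was acquired. A minimal sketch of why that pattern can be safe is given below, using an in-memory stand-in for the lock manager. Everything in it is hypothetical (`PauseLockSketch`, the `lock::` key prefix, the two maps), and it assumes that release only removes a lock whose stored value matches the caller's value; whether rqueue's `RqueueLockManager` behaves that way is not shown in this file.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the acquire / act / release flow.
class PauseLockSketch {
  static final Map<String, String> locks = new ConcurrentHashMap<>();
  static final Map<String, Boolean> paused = new ConcurrentHashMap<>();

  // Succeeds only when nobody holds the key.
  static boolean acquireLock(String key, String value) {
    return locks.putIfAbsent(key, value) == null;
  }

  // Assumed compare-and-delete: removes the lock only if the value matches.
  static boolean releaseLock(String key, String value) {
    return locks.remove(key, value);
  }

  // Applies the pause flag only when the lock was acquired.
  static boolean handlePause(String queue, boolean pause) {
    String lockKey = "lock::" + queue;
    String lockValue = UUID.randomUUID().toString();
    try {
      boolean acquired = acquireLock(lockKey, lockValue);
      if (acquired) {
        paused.put(queue, pause);
      }
      return acquired;
    } finally {
      // Harmless when not acquired: the random value cannot match another holder's.
      releaseLock(lockKey, lockValue);
    }
  }

  public static void main(String[] args) {
    System.out.println(handlePause("orders", true)); // prints true
    locks.put("lock::orders", "held-by-another-node");
    System.out.println(handlePause("orders", false)); // prints false
    System.out.println(paused.get("orders")); // prints true
    System.out.println(locks.get("lock::orders")); // prints held-by-another-node
  }
}
```

Under that assumption, a node that loses the race neither changes the pause state nor removes the other holder's lock.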