• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%
25600722838

push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java
1
/*
2
 * Copyright (c) 2020-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core.impl;
18

19
import static com.github.sonus21.rqueue.core.RedisScriptFactory.getScript;
20

21
import com.github.sonus21.rqueue.common.ReactiveRqueueRedisTemplate;
22
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
23
import com.github.sonus21.rqueue.core.RedisScriptFactory.ScriptType;
24
import com.github.sonus21.rqueue.core.RqueueMessage;
25
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
26
import com.github.sonus21.rqueue.models.MessageMoveResult;
27
import com.github.sonus21.rqueue.utils.Constants;
28
import com.github.sonus21.rqueue.utils.RedisUtils;
29
import java.util.ArrayList;
30
import java.util.Arrays;
31
import java.util.Collections;
32
import java.util.List;
33
import java.util.Optional;
34
import java.util.Set;
35
import lombok.extern.slf4j.Slf4j;
36
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
37
import org.springframework.data.redis.connection.RedisConnectionFactory;
38
import org.springframework.data.redis.core.RedisTemplate;
39
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
40
import org.springframework.data.redis.core.script.DefaultReactiveScriptExecutor;
41
import org.springframework.data.redis.core.script.DefaultScriptExecutor;
42
import org.springframework.data.redis.core.script.ReactiveScriptExecutor;
43
import org.springframework.data.redis.core.script.RedisScript;
44
import org.springframework.util.CollectionUtils;
45
import reactor.core.publisher.Flux;
46
import reactor.core.publisher.Mono;
47

48
/**
49
 * RqueueMessageTemplate is the core of the Rqueue, this deals with Redis calls.
50
 *
51
 * <p>It communicates with the Redis using Lua script and direct calls.
52
 */
53
@Slf4j
1✔
54
public class RqueueMessageTemplateImpl extends RqueueRedisTemplate<RqueueMessage>
55
    implements RqueueMessageTemplate {
56

57
  private final DefaultScriptExecutor<String> scriptExecutor;
58
  private final ReactiveScriptExecutor<String> reactiveScriptExecutor;
59
  private final ReactiveRqueueRedisTemplate<RqueueMessage> reactiveRedisTemplate;
60

61
  public RqueueMessageTemplateImpl(
62
      RedisConnectionFactory redisConnectionFactory,
63
      ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
64
    super(redisConnectionFactory);
1✔
65
    // On the NATS backend path the connection factory is null and the Redis-script executors
66
    // are never invoked; leaving them null fails fast if anyone does try to use them.
67
    this.scriptExecutor = redisTemplate == null ? null : new DefaultScriptExecutor<>(redisTemplate);
1✔
68
    if (reactiveRedisConnectionFactory != null) {
1✔
69
      this.reactiveRedisTemplate =
1✔
70
          new ReactiveRqueueRedisTemplate<>(reactiveRedisConnectionFactory);
71
      this.reactiveScriptExecutor = new DefaultReactiveScriptExecutor<>(
1✔
72
          reactiveRedisConnectionFactory,
73
          RedisUtils.redisSerializationContextProvider.getSerializationContext());
1✔
74
    } else {
75
      this.reactiveScriptExecutor = null;
1✔
76
      this.reactiveRedisTemplate = null;
1✔
77
    }
78
  }
1✔
79

80
  @Override
81
  public List<RqueueMessage> pop(
82
      String queueName,
83
      String processingQueueName,
84
      String processingChannelName,
85
      long visibilityTimeout,
86
      int count) {
87
    if (count < Constants.MIN_BATCH_SIZE) {
1!
UNCOV
88
      throw new IllegalArgumentException(
×
89
          "Count must be greater than or equal to " + Constants.MIN_BATCH_SIZE);
90
    }
91
    long currentTime = System.currentTimeMillis();
1✔
92
    RedisScript<List<RqueueMessage>> script = getScript(ScriptType.DEQUEUE_MESSAGE);
1✔
93
    List<RqueueMessage> messages = scriptExecutor.execute(
1✔
94
        script,
95
        Arrays.asList(queueName, processingQueueName, processingChannelName),
1✔
96
        currentTime,
1✔
97
        currentTime + visibilityTimeout,
1✔
98
        count);
1✔
99
    log.debug("Pop Queue: {}, N: {}, Messages: {}", queueName, count, messages);
1✔
100
    return messages;
1✔
101
  }
102

103
  @Override
104
  public Long addMessageWithDelay(
105
      String delayQueueName, String delayQueueChannelName, RqueueMessage rqueueMessage) {
106
    log.debug("AddMessageWithDelay Queue: {}, Message: {}", delayQueueName, rqueueMessage);
1✔
107
    RedisScript<Long> script = getScript(ScriptType.ENQUEUE_MESSAGE);
1✔
108
    return scriptExecutor.execute(
1✔
109
        script,
110
        Arrays.asList(delayQueueName, delayQueueChannelName),
1✔
111
        rqueueMessage,
112
        rqueueMessage.getProcessAt(),
1✔
113
        System.currentTimeMillis());
1✔
114
  }
115

116
  @Override
117
  public Flux<Long> addReactiveMessageWithDelay(
118
      String scheduledQueueName, String scheduledQueueChannelName, RqueueMessage rqueueMessage) {
119
    log.debug(
1✔
120
        "AddReactiveMessageWithDelay Queue: {}, Message: {}", scheduledQueueName, rqueueMessage);
121
    RedisScript<Long> script = getScript(ScriptType.ENQUEUE_MESSAGE);
1✔
122
    return reactiveScriptExecutor.execute(
1✔
123
        script,
124
        Arrays.asList(scheduledQueueName, scheduledQueueChannelName),
1✔
125
        Arrays.asList(rqueueMessage, rqueueMessage.getProcessAt(), System.currentTimeMillis()));
1✔
126
  }
127

128
  @Override
129
  public Long addMessage(String listName, RqueueMessage rqueueMessage) {
130
    log.debug("AddMessage Queue: {}, Message: {}", listName, rqueueMessage);
1✔
131
    return rpush(listName, rqueueMessage);
1✔
132
  }
133

134
  @Override
135
  public Long addMessageAtFront(String listName, RqueueMessage rqueueMessage) {
136
    log.debug("AddMessageAtFront Queue: {}, Message: {}", listName, rqueueMessage);
1✔
137
    return lpush(listName, rqueueMessage);
1✔
138
  }
139

140
  @Override
141
  public Mono<Long> addReactiveMessage(String listName, RqueueMessage rqueueMessage) {
142
    log.debug("AddReactiveMessage Queue: {}, Message: {}", listName, rqueueMessage);
1✔
143
    return reactiveRedisTemplate.template().opsForList().rightPush(listName, rqueueMessage);
1✔
144
  }
145

146
  @Override
147
  public Boolean addToZset(String zsetName, RqueueMessage rqueueMessage, long score) {
148
    log.debug("AddToZset Queue: {}, Message: {}", zsetName, rqueueMessage);
1✔
149
    return zadd(zsetName, rqueueMessage, score);
1✔
150
  }
151

152
  @Override
153
  public void moveMessageWithDelay(
154
      String srcZsetName, String tgtZsetName, RqueueMessage src, RqueueMessage tgt, long delay) {
155
    log.debug(
1✔
156
        "MoveMessageWithDelay Src:[Q={},M={}], Dst:[Q={},M={}]",
157
        srcZsetName,
158
        src,
159
        tgtZsetName,
160
        tgt);
161
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_TO_ZSET);
1✔
162
    Long response = scriptExecutor.execute(
1✔
163
        script,
164
        Arrays.asList(srcZsetName, tgtZsetName),
1✔
165
        src,
166
        tgt,
167
        System.currentTimeMillis() + delay);
1✔
168
    if (response == null) {
1!
UNCOV
169
      log.error("Duplicate processing for the message {}", src);
×
170
    }
171
  }
1✔
172

173
  @Override
174
  public void moveMessage(
175
      String srcZsetName, String tgtListName, RqueueMessage src, RqueueMessage tgt) {
176
    log.debug("MoveMessage Src:[Q={},M={}], Dst:[Q={},M={}]", srcZsetName, src, tgtListName, tgt);
×
UNCOV
177
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_TO_LIST);
×
178
    Long response =
×
UNCOV
179
        scriptExecutor.execute(script, Arrays.asList(srcZsetName, tgtListName), src, tgt);
×
UNCOV
180
    if (response == null) {
×
UNCOV
181
      log.error("Duplicate processing for the message {}", src);
×
182
    }
UNCOV
183
  }
×
184

185
  @Override
186
  public List<RqueueMessage> getAllMessages(
187
      String queueName, String processingQueueName, String delayQueueName) {
188
    List<RqueueMessage> messages = lrange(queueName, 0, -1);
1✔
189
    if (CollectionUtils.isEmpty(messages)) {
1✔
190
      messages = new ArrayList<>();
1✔
191
    }
192
    Set<RqueueMessage> messagesInProcessingQueue = zrange(processingQueueName, 0, -1);
1✔
193
    if (!CollectionUtils.isEmpty(messagesInProcessingQueue)) {
1✔
194
      messages.addAll(messagesInProcessingQueue);
1✔
195
    }
196
    if (delayQueueName != null) {
1!
197
      Set<RqueueMessage> messagesFromZset = zrange(delayQueueName, 0, -1);
1✔
198
      if (!CollectionUtils.isEmpty(messagesFromZset)) {
1✔
199
        messages.addAll(messagesFromZset);
1✔
200
      }
201
    }
202
    return messages;
1✔
203
  }
204

205
  @Override
206
  public Long getScore(String zsetName, RqueueMessage message) {
207
    Double score = redisTemplate.opsForZSet().score(zsetName, message);
1✔
208
    if (score == null) {
1✔
209
      return null;
1✔
210
    }
211
    return score.longValue();
1✔
212
  }
213

214
  @Override
215
  public boolean addScore(String zsetName, RqueueMessage message, long delta) {
216
    return scriptExecutor.execute(
1✔
217
        getScript(ScriptType.SCORE_UPDATER), Collections.singletonList(zsetName), message, delta);
1✔
218
  }
219

220
  private MessageMoveResult moveMessageToList(
221
      String src, String dst, int maxMessage, ScriptType scriptType) {
222
    RedisScript<Long> script = getScript(scriptType);
1✔
223
    long messagesInSrc = maxMessage;
1✔
224
    int remainingMessages = maxMessage;
1✔
225
    while (messagesInSrc > 0 && remainingMessages > 0) {
1✔
226
      long messageCount = Math.min(remainingMessages, Constants.MAX_MESSAGES);
1✔
227
      messagesInSrc = scriptExecutor.execute(script, Arrays.asList(src, dst), messageCount);
1✔
228
      remainingMessages -= messageCount;
1✔
229
    }
1✔
230
    return new MessageMoveResult(maxMessage - remainingMessages, true);
1✔
231
  }
232

233
  @Override
234
  public MessageMoveResult moveMessageListToList(
235
      String srcQueueName, String dstQueueName, int numberOfMessage) {
236
    return moveMessageToList(
1✔
237
        srcQueueName, dstQueueName, numberOfMessage, ScriptType.MOVE_MESSAGE_LIST_TO_LIST);
238
  }
239

240
  @Override
241
  public MessageMoveResult moveMessageZsetToList(
242
      String sourceZset, String destinationList, int maxMessage) {
243
    return moveMessageToList(
1✔
244
        sourceZset, destinationList, maxMessage, ScriptType.MOVE_MESSAGE_ZSET_TO_LIST);
245
  }
246

247
  @Override
248
  public MessageMoveResult moveMessageListToZset(
249
      String sourceList, String destinationZset, int maxMessage, long score) {
250
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_LIST_TO_ZSET);
1✔
251
    long messagesInList = maxMessage;
1✔
252
    int remainingMessages = maxMessage;
1✔
253
    while (messagesInList > 0 && remainingMessages > 0) {
1!
254
      long messageCount = Math.min(remainingMessages, Constants.MAX_MESSAGES);
1✔
255
      messagesInList = scriptExecutor.execute(
1✔
256
          script, Arrays.asList(sourceList, destinationZset), messageCount, score);
1✔
257
      remainingMessages -= messageCount;
1✔
258
    }
1✔
259
    return new MessageMoveResult(maxMessage - remainingMessages, true);
1✔
260
  }
261

262
  @Override
263
  public MessageMoveResult moveMessageZsetToZset(
264
      String sourceZset,
265
      String destinationZset,
266
      int maxMessage,
267
      long newScore,
268
      boolean fixedScore) {
269
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_ZSET_TO_ZSET);
1✔
270
    long messageInZset = maxMessage;
1✔
271
    int remainingMessages = maxMessage;
1✔
272
    while (messageInZset > 0 && remainingMessages > 0) {
1✔
273
      long messageCount = Math.min(remainingMessages, Constants.MAX_MESSAGES);
1✔
274
      messageInZset = scriptExecutor.execute(
1✔
275
          script, Arrays.asList(sourceZset, destinationZset), messageCount, newScore, fixedScore);
1✔
276
      remainingMessages -= messageCount;
1✔
277
    }
1✔
278
    return new MessageMoveResult(maxMessage - remainingMessages, true);
1✔
279
  }
280

281
  @Override
282
  public List<RqueueMessage> readFromZset(String name, long start, long end) {
283
    Set<RqueueMessage> messages = zrange(name, start, end);
1✔
284
    if (messages == null) {
1!
UNCOV
285
      return new ArrayList<>();
×
286
    }
287
    return new ArrayList<>(messages);
1✔
288
  }
289

290
  @Override
291
  public List<TypedTuple<RqueueMessage>> readFromZsetWithScore(String name, long start, long end) {
292
    Set<TypedTuple<RqueueMessage>> messages = zrangeWithScore(name, start, end);
1✔
293
    if (messages == null) {
1!
UNCOV
294
      return new ArrayList<>();
×
295
    }
296
    return new ArrayList<>(messages);
1✔
297
  }
298

299
  @Override
300
  public Long scheduleMessage(
301
      String zsetName, String messageId, RqueueMessage rqueueMessage, Long expiryInSeconds) {
302
    RedisScript<Long> script = getScript(ScriptType.SCHEDULE_MESSAGE);
1✔
303
    return scriptExecutor.execute(
1✔
304
        script,
305
        Arrays.asList(messageId, zsetName),
1✔
306
        expiryInSeconds,
307
        rqueueMessage,
308
        rqueueMessage.getProcessAt());
1✔
309
  }
310

311
  @Override
312
  public boolean renameCollection(String srcName, String tgtName) {
313
    rename(srcName, tgtName);
1✔
314
    return true;
1✔
315
  }
316

317
  @Override
318
  public boolean renameCollections(List<String> srcNames, List<String> tgtNames) {
319
    rename(srcNames, tgtNames);
1✔
320
    return true;
1✔
321
  }
322

323
  @Override
324
  public void deleteCollection(String name) {
325
    redisTemplate.delete(name);
1✔
326
  }
1✔
327

328
  @Override
329
  public List<RqueueMessage> readFromList(String name, long start, long end) {
330
    List<RqueueMessage> messages = lrange(name, start, end);
1✔
331
    if (messages == null) {
1!
UNCOV
332
      return new ArrayList<>();
×
333
    }
334
    return messages;
1✔
335
  }
336

337
  @Override
338
  public RedisTemplate<String, RqueueMessage> getTemplate() {
339
    return super.redisTemplate;
1✔
340
  }
341

342
  @Override
343
  public Long removeElementFromZset(String zsetName, RqueueMessage rqueueMessage) {
344
    return super.removeFromZset(zsetName, rqueueMessage);
1✔
345
  }
346

347
  @Override
348
  public Optional<RqueueMessage> findFirstElementFromList(String name) {
UNCOV
349
    return readFromList(name, 0, 0).stream().findFirst();
×
350
  }
351

352
  @Override
353
  public Optional<RqueueMessage> findFirstElementFromZset(String name) {
UNCOV
354
    return readFromZset(name, 0, 0).stream().findFirst();
×
355
  }
356

357
  @Override
358
  public Optional<TypedTuple<RqueueMessage>> findFirstElementFromZsetWithScore(String name) {
UNCOV
359
    return readFromZsetWithScore(name, 0, 0).stream().findFirst();
×
360
  }
361
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc