sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%

push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)
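The first commit note above describes a merged JaCoCo XML report that contained only <sessioninfo> entries and no <package> elements. A minimal sketch of how such a report could be detected before upload is shown below. The class name `JacocoReportCheck`, the method `countPackages`, and the sample XML strings are hypothetical and only illustrate the idea; they are not part of the rqueue build.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical helper: not part of rqueue, JaCoCo, or the Coveralls plugin.
final class JacocoReportCheck {

  // Returns the number of <package> elements found in a JaCoCo XML report.
  static int countPackages(String reportXml) {
    try {
      DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
      // Real reports declare an external DTD; resolve it to an empty source
      // so that parsing does not depend on the DTD file being reachable.
      builder.setEntityResolver((publicId, systemId) -> new InputSource(new StringReader("")));
      Document doc = builder.parse(new InputSource(new StringReader(reportXml)));
      return doc.getElementsByTagName("package").getLength();
    } catch (Exception e) {
      throw new IllegalStateException("Could not parse JaCoCo report", e);
    }
  }

  public static void main(String[] args) {
    // Made-up sample documents, shaped like the two cases described in the commit note.
    String sessionsOnly =
        "<report name=\"demo\"><sessioninfo id=\"ci-1\" start=\"0\" dump=\"1\"/></report>";
    String withPackage =
        "<report name=\"demo\"><sessioninfo id=\"ci-1\" start=\"0\" dump=\"1\"/>"
            + "<package name=\"com/example\"/></report>";

    System.out.println("sessions only: " + countPackages(sessionsOnly) + " package(s)");
    System.out.println("with package:  " + countPackages(withPackage) + " package(s)");
  }
}
```

A CI step could fail the job when the count is zero, rather than letting the upload step report success with nothing to send.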

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line
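
The figures above are consistent with an aggregate that pools lines and branches into a single ratio. The sketch below works through that arithmetic under the assumption that the aggregate is (covered lines + covered branches) / (relevant lines + total branches); the class and method names are hypothetical, and the formula is inferred from the numbers on this page rather than taken from Coveralls documentation.

```java
import java.util.Locale;

// Hypothetical helper used only to reproduce the percentages on this page.
final class AggregateCoverage {

  // Percentage of covered items (lines or branches); 0 when there is nothing to cover.
  static double percent(long covered, long total) {
    return total == 0 ? 0.0 : 100.0 * covered / total;
  }

  // Assumed aggregate: lines and branches pooled into one ratio.
  static double aggregatePercent(
      long coveredLines, long relevantLines, long coveredBranches, long totalBranches) {
    return percent(coveredLines + coveredBranches, relevantLines + totalBranches);
  }

  static String format(double value, int decimals) {
    return String.format(Locale.ROOT, "%." + decimals + "f", value);
  }

  public static void main(String[] args) {
    System.out.println("lines:     " + format(percent(7715, 8921), 2) + "%");
    System.out.println("branches:  " + format(percent(2566, 3407), 2) + "%");
    System.out.println("new lines: " + format(percent(795, 1072), 2) + "%");
    // 10281 of 12328 pooled items, which matches the 83.396% headline figure.
    System.out.println("aggregate: " + format(aggregatePercent(7715, 8921, 2566, 3407), 3) + "%");
  }
}
```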

Source File

87.95
/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java
/*
 * Copyright (c) 2019-2026 Sonu Kumar
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * You may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and limitations under the License.
 *
 */

package com.github.sonus21.rqueue.config;

import com.github.sonus21.rqueue.models.enums.RqueueMode;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.StringUtils;
import com.google.common.io.CharStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Proxy;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;

@Getter
@Setter
@RequiredArgsConstructor
@Configuration
public class RqueueConfig {

  @Getter
  private static final String brokerId = generateBrokerId();

  private static String generateBrokerId() {
    String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    java.util.Random random = new java.util.Random();
    StringBuilder sb = new StringBuilder(8);
    for (int i = 0; i < 8; i++) {
      sb.append(chars.charAt(random.nextInt(chars.length())));
    }
    return sb.toString();
  }

  private static final AtomicLong counter = new AtomicLong(1);
  private final RedisConnectionFactory connectionFactory;
  private final ReactiveRedisConnectionFactory reactiveRedisConnectionFactory;
  private final boolean sharedConnection;
  private final int dbVersion;

  /**
   * Active rqueue backend. Defaults to {@link Backend#REDIS} so every existing call site that
   * constructs {@code RqueueConfig} via the Lombok-generated constructor keeps the same
   * behavior. The {@code rqueueConfig} bean factory in {@link RqueueListenerBaseConfig} reads the
   * {@code rqueue.backend} property and overrides it. Beans that need to know the active backend
   * should call {@link #getBackend()} instead of probing the classpath.
   */
  private Backend backend = Backend.REDIS;

  @Value("${rqueue.reactive.enabled:false}")
  private boolean reactiveEnabled;

  private String version = "";

  @Value("${rqueue.latest.version.check.enabled:true}")
  private boolean latestVersionCheckEnabled;

  @Value("${rqueue.key.prefix:__rq::}")
  private String prefix;

  @Value("${rqueue.del.prefix:del::}")
  private String delPrefix;

  @Value("${rqueue.job.enabled:true}")
  private boolean jobEnabled;

  // 30 minutes
  @Value("${rqueue.job.durability.in-terminal-state:1800}")
  private long jobDurabilityInTerminalStateInSecond;

  @Value("${rqueue.job.key.prefix:job::}")
  private String jobKeyPrefix;

  @Value("${rqueue.jobs.collection.name.prefix:jobs::}")
  private String jobsCollectionNamePrefix;

  @Value("${rqueue.cluster.mode:true}")
  private boolean clusterMode;

  @Value("${rqueue.simple.queue.prefix:}")
  private String simpleQueuePrefix;

  @Value("${rqueue.scheduled.queue.prefix:}")
  private String scheduledQueuePrefix;

  @Value("${rqueue.completed.queue.prefix:}")
  private String completedQueuePrefix;

  @Value("${rqueue.scheduled.queue.channel.prefix:}")
  private String scheduledQueueChannelPrefix;

  @Value("${rqueue.processing.queue.name.prefix:}")
  private String processingQueuePrefix;

  @Value("${rqueue.processing.queue.channel.prefix:}")
  private String processingQueueChannelPrefix;

  @Value("${rqueue.queues.key.suffix:queues}")
  private String queuesKeySuffix;

  @Value("${rqueue.lock.key.prefix:lock::}")
  private String lockKeyPrefix;

  @Value("${rqueue.queue.stat.key.prefix:q-stat::}")
  private String queueStatKeyPrefix;

  @Value("${rqueue.queue.config.key.prefix:q-config::}")
  private String queueConfigKeyPrefix;

  @Value("${rqueue.retry.per.poll:1}")
  private int retryPerPoll;

  @Value("${rqueue.net.proxy.host:}")
  private String proxyHost;

  @Value("${rqueue.net.proxy.port:8000}")
  private Integer proxyPort;

  @Value("${rqueue.net.proxy.type:HTTP}")
  private Proxy.Type proxyType;

  // 7 days
  @Value("${rqueue.message.durability:10080}")
  private long messageDurabilityInMinute;

  // 30 minutes
  @Value("${rqueue.message.durability.in-terminal-state:1800}")
  private long messageDurabilityInTerminalStateInSecond;

  @Value("${rqueue.system.mode:BOTH}")
  private RqueueMode mode;

  @Value("${rqueue.internal.communication.channel.name.prefix:i-channel}")
  private String internalChannelNamePrefix;

  @Value("${rqueue.completed.job.cleanup.interval:30000}")
  private long completedJobCleanupIntervalInMs;

  @Value("${rqueue.worker.registry.enabled:true}")
  private boolean workerRegistryEnabled;

  @Value("${rqueue.worker.registry.worker.ttl:300}")
  private long workerRegistryWorkerTtlInSeconds;

  @Value("${rqueue.worker.registry.worker.heartbeat.interval:60}")
  private long workerRegistryWorkerHeartbeatIntervalInSeconds;

  @Value("${rqueue.worker.registry.queue.ttl:3600}")
  private long workerRegistryQueueTtlInSeconds;

  @Value("${rqueue.worker.registry.queue.heartbeat.interval:15}")
  private long workerRegistryQueueHeartbeatIntervalInSeconds;

  @Value("${rqueue.worker.registry.key.prefix:worker::}")
  private String workerRegistryKeyPrefix;

  @Value("${rqueue.worker.registry.queue.key.prefix:q-pollers::}")
  private String workerRegistryQueueKeyPrefix;

  public boolean messageInTerminalStateShouldBeStored() {
    return getMessageDurabilityInTerminalStateInSecond() > 0;
  }

  public long messageDurabilityInTerminalStateInMillisecond() {
    return getMessageDurabilityInTerminalStateInSecond() * Constants.ONE_MILLI;
  }

  public String getInternalCommChannelName() {
    return prefix + internalChannelNamePrefix;
  }

  public String getQueuesKey() {
    return prefix + queuesKeySuffix; // uncovered
  }

  private String getSimpleQueueSuffix() {
    if (!StringUtils.isEmpty(simpleQueuePrefix)) {
      return simpleQueuePrefix;
    }
    if (dbVersion == 2) { // branch partially covered
      return "queue::";
    }
    return "queue-v2::"; // uncovered
  }

  private String getScheduledQueueSuffix() {
    if (!StringUtils.isEmpty(scheduledQueuePrefix)) {
      return scheduledQueuePrefix;
    }
    if (dbVersion == 2) { // branch partially covered
      return "d-queue::";
    }
    return "d-queue-v2::"; // uncovered
  }

  private String getCompletedQueueSuffix() {
    if (!StringUtils.isEmpty(completedQueuePrefix)) { // branch partially covered
      return completedQueuePrefix; // uncovered
    }
    return "c-queue::";
  }

  private String getScheduledQueueChannelSuffix() {
    if (!StringUtils.isEmpty(scheduledQueueChannelPrefix)) {
      return scheduledQueueChannelPrefix;
    }
    if (dbVersion == 2) { // branch partially covered
      return "d-channel::";
    }
    return "d-channel-v2::"; // uncovered
  }

  private String getProcessingQueueSuffix() {
    if (!StringUtils.isEmpty(processingQueuePrefix)) {
      return processingQueuePrefix;
    }
    if (dbVersion == 2) { // branch partially covered
      return "p-queue::";
    }
    return "p-queue-v2::"; // uncovered
  }

  private String getProcessingQueueChannelSuffix() {
    if (!StringUtils.isEmpty(processingQueueChannelPrefix)) {
      return processingQueueChannelPrefix;
    }
    if (dbVersion == 2) { // branch partially covered
      return "p-channel::";
    }
    return "p-channel-v2::"; // uncovered
  }

  public String getQueueName(String queueName) {
    if (dbVersion == 1) {
      return queueName;
    }
    return prefix + getSimpleQueueSuffix() + getTaggedName(queueName);
  }

  public String getCompletedQueueName(String queueName) {
    return prefix + getCompletedQueueSuffix() + getTaggedName(queueName);
  }

  public String getScheduledQueueName(String queueName) {
    if (dbVersion == 1) {
      return "rqueue-delay::" + queueName;
    }
    return prefix + getScheduledQueueSuffix() + getTaggedName(queueName);
  }

  public String getScheduledQueueChannelName(String queueName) {
    if (dbVersion == 1) {
      return "rqueue-channel::" + queueName;
    }
    return prefix + getScheduledQueueChannelSuffix() + getTaggedName(queueName);
  }

  public String getProcessingQueueName(String queueName) {
    if (dbVersion == 1) {
      return "rqueue-processing::" + queueName;
    }
    return prefix + getProcessingQueueSuffix() + getTaggedName(queueName);
  }

  public String getProcessingQueueChannelName(String queueName) {
    if (dbVersion == 1) {
      return "rqueue-processing-channel::" + queueName;
    }
    return prefix + getProcessingQueueChannelSuffix() + getTaggedName(queueName);
  }

  public String getLockKey(String key) {
    return prefix + lockKeyPrefix + key;
  }

  public String getQueueStatisticsKey(String name) {
    return prefix + queueStatKeyPrefix + name;
  }

  public String getQueueConfigKey(String name) {
    return prefix + queueConfigKeyPrefix + name;
  }

  private String getTaggedName(String queueName) {
    if (!clusterMode) {
      return queueName;
    }
    boolean left = false;
    boolean right = false;
    for (Character c : queueName.toCharArray()) {
      if (c == '{') {
        left = true;
      } else if (c == '}') {
        right = true;
      }
    }
    if (left && right) { // branch partially covered
      return queueName;
    }
    return "{" + queueName + "}";
  }

  public String getJobId() {
    return prefix + jobKeyPrefix + UUID.randomUUID().toString();
  }

  public String getJobsKey(String messageId) {
    return prefix + jobsCollectionNamePrefix + messageId;
  }

  public String getWorkerRegistryKey(String workerId) {
    return prefix + workerRegistryKeyPrefix + workerId;
  }

  public String getWorkerRegistryQueueKey(String queueName) {
    return prefix + workerRegistryQueueKeyPrefix + getTaggedName(queueName);
  }

  public String getDelDataName(String queueName) {
    return prefix
        + delPrefix
        + brokerId
        + Constants.REDIS_KEY_SEPARATOR
        + getTaggedName(queueName)
        + counter.incrementAndGet();
  }

  public Duration getJobDurabilityInTerminalState() {
    return Duration.ofSeconds(jobDurabilityInTerminalStateInSecond);
  }

  public Duration getWorkerRegistryWorkerTtl() {
    return Duration.ofSeconds(workerRegistryWorkerTtlInSeconds);
  }

  public Duration getWorkerRegistryWorkerHeartbeatInterval() {
    return Duration.ofSeconds(workerRegistryWorkerHeartbeatIntervalInSeconds);
  }

  public Duration getWorkerRegistryQueueTtl() {
    return Duration.ofSeconds(workerRegistryQueueTtlInSeconds);
  }

  public Duration getWorkerRegistryQueueHeartbeatInterval() {
    return Duration.ofSeconds(workerRegistryQueueHeartbeatIntervalInSeconds);
  }

  public String getLibVersion() {
    if (StringUtils.isEmpty(version)) {
      ClassPathResource resource =
          new ClassPathResource("META-INF/RQUEUE.MF", this.getClass().getClassLoader());
      try {
        InputStream inputStream = resource.getInputStream();
        String result =
            CharStreams.toString(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        for (String line : result.split("\n")) {
          String[] words = line.trim().split(":");
          if (2 == words.length && words[0].equals("Version")) { // branch partially covered
            version = words[1].split("-")[0];
          }
        }
      } catch (IOException e) { // uncovered
        e.printStackTrace(); // uncovered
      }
    }
    return version;
  }

  public boolean isProducer() {
    return RqueueMode.PRODUCER.equals(getMode());
  }

  public Duration getMessageDurability(Long messageLife) {
    if (messageLife == null || messageLife.intValue() == 0) {
      return Duration.ofMinutes(messageDurabilityInMinute);
    }
    Duration duration = Duration.ofMillis(2 * messageLife);
    if (duration.toMinutes() > messageDurabilityInMinute) { // branch partially covered
      return duration; // uncovered
    }
    return Duration.ofMinutes(messageDurabilityInMinute);
  }
}
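
The listing builds v2 queue keys as prefix + suffix + tagged name, where the tagged name is wrapped in braces when cluster mode is on (presumably so that related keys share a Redis Cluster hash tag; that intent is an assumption, not something stated in the source). The standalone sketch below restates that logic outside Spring to show what the resulting keys look like. The class `KeyNamingSketch` and its method names are hypothetical; the constants are the defaults visible in the listing (`__rq::` and, for `dbVersion == 2`, `queue::`).

```java
// Hypothetical, self-contained restatement of the key-naming logic in the listing.
final class KeyNamingSketch {

  // Defaults copied from the listing: rqueue.key.prefix and the dbVersion == 2 suffix.
  static final String PREFIX = "__rq::";
  static final String SIMPLE_QUEUE_SUFFIX = "queue::";

  // Wraps the name in braces unless cluster mode is off or the name
  // already contains both '{' and '}' (in any order, as in the listing).
  static String taggedName(String queueName, boolean clusterMode) {
    if (!clusterMode) {
      return queueName;
    }
    boolean left = queueName.indexOf('{') >= 0;
    boolean right = queueName.indexOf('}') >= 0;
    return (left && right) ? queueName : "{" + queueName + "}";
  }

  // Mirrors getQueueName for the non-legacy (dbVersion != 1) path.
  static String queueKey(String queueName, boolean clusterMode) {
    return PREFIX + SIMPLE_QUEUE_SUFFIX + taggedName(queueName, clusterMode);
  }

  public static void main(String[] args) {
    System.out.println(queueKey("orders", true));          // __rq::queue::{orders}
    System.out.println(queueKey("{tenant}.orders", true)); // __rq::queue::{tenant}.orders
    System.out.println(queueKey("orders", false));         // __rq::queue::orders
  }
}
```

Because the check only looks for the presence of both brace characters, a name such as `}x{` would also be passed through unchanged; the sketch keeps that behavior to stay faithful to the listing.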