sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%

push · github · web-flow

Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)
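
The first commit above describes a merged JaCoCo XML that contained only `<sessioninfo>` entries and no `<package>` elements. A minimal sketch of a guard that would catch such a report before upload is shown here. The class name `JacocoReportCheck` and the method `countPackages` are hypothetical and are not part of rqueue, JaCoCo, or the Coveralls plugin; the sketch only uses the JDK's StAX parser and assumes the report is available as a string.

```java
import java.io.StringReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

/** Hypothetical helper: detects a JaCoCo XML report that has no package data. */
public final class JacocoReportCheck {

  private JacocoReportCheck() {}

  /** Returns the number of <package> elements found anywhere in the report. */
  public static int countPackages(String reportXml) {
    XMLInputFactory factory = XMLInputFactory.newFactory();
    // Do not process the DOCTYPE or external entities; only element names matter here.
    factory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
    factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
    int packages = 0;
    try {
      XMLStreamReader reader = factory.createXMLStreamReader(new StringReader(reportXml));
      try {
        while (reader.hasNext()) {
          if (reader.next() == XMLStreamConstants.START_ELEMENT
              && "package".equals(reader.getLocalName())) {
            packages++;
          }
        }
      } finally {
        reader.close();
      }
    } catch (XMLStreamException e) {
      throw new IllegalStateException("Could not parse coverage report", e);
    }
    return packages;
  }

  public static void main(String[] args) {
    // Shaped like the broken report from the commit message: session info only.
    String sessionOnly =
        "<report name=\"demo\"><sessioninfo id=\"a\" start=\"1\" dump=\"2\"/></report>";
    if (countPackages(sessionOnly) == 0) {
      System.out.println("report has no <package> elements; were the classes compiled?");
    }
  }
}
```

A CI step could fail the job when the count is zero instead of letting the upload step skip silently.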

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line
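
The headline figure can be reproduced from the counts above if, as the "Branch coverage included in aggregate %" note suggests, covered lines and covered branches are pooled over relevant lines plus total branches. That formula is an inference from the numbers on this page, not a documented Coveralls definition; the class `AggregateCoverage` is a hypothetical name used only for this sketch.

```java
import java.util.Locale;

/** Hypothetical helper: pools line and branch counts into one percentage. */
public final class AggregateCoverage {

  private AggregateCoverage() {}

  /** Assumed formula: (covered lines + covered branches) / (relevant lines + branches). */
  public static double aggregatePercent(
      long coveredLines, long relevantLines, long coveredBranches, long totalBranches) {
    long covered = coveredLines + coveredBranches;
    long total = relevantLines + totalBranches;
    return 100.0 * covered / total;
  }

  public static void main(String[] args) {
    // Counts taken from this build: 7715/8921 lines, 2566/3407 branches.
    double pct = aggregatePercent(7715, 8921, 2566, 3407);
    System.out.println(String.format(Locale.ROOT, "%.3f%%", pct)); // prints "83.396%"
  }
}
```

With these inputs the pooled ratio is 10281 / 12328, which matches the 83.396% reported for the build.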

Source File

/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/ThreadUtils.java (87.5% covered)
/*
 * Copyright (c) 2020-2026 Sonu Kumar
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * You may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and limitations under the License.
 *
 */

package com.github.sonus21.rqueue.utils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public final class ThreadUtils {

  private ThreadUtils() {}

  public static ThreadPoolTaskScheduler createTaskScheduler(
      int poolSize, String threadPrefix, int terminationTime) {
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); // 1✔
    scheduler.setBeanName(threadPrefix.substring(0, threadPrefix.length() - 1)); // 1✔
    scheduler.setPoolSize(poolSize); // 1✔
    scheduler.setThreadNamePrefix(threadPrefix); // 1✔
    scheduler.setAwaitTerminationSeconds(terminationTime); // 1✔
    scheduler.setRemoveOnCancelPolicy(true); // 1✔
    scheduler.afterPropertiesSet(); // 1✔
    return scheduler; // 1✔
  }

  public static ThreadPoolTaskExecutor createTaskExecutor(
      String beanName, String threadPrefix, int corePoolSize, int maxPoolSize, int queueCapacity) {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); // 1✔
    threadPoolTaskExecutor.setThreadNamePrefix(threadPrefix); // 1✔
    threadPoolTaskExecutor.setBeanName(beanName); // 1✔
    if (corePoolSize > 0) { // 1✔
      threadPoolTaskExecutor.setCorePoolSize(corePoolSize); // 1✔
      threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true); // 1✔
      threadPoolTaskExecutor.setMaxPoolSize(Math.max(corePoolSize, maxPoolSize)); // 1✔
      threadPoolTaskExecutor.setQueueCapacity(queueCapacity); // 1✔
    }
    threadPoolTaskExecutor.afterPropertiesSet(); // 1✔
    return threadPoolTaskExecutor; // 1✔
  }

  private static void waitForShutdown(
      Logger log, Future<?> future, long waitTimeInMillis, String msg, Object... msgParams) {
    boolean completedOrCancelled = future.isCancelled() || future.isDone(); // 1!
    if (completedOrCancelled) { // 1!
      return; // ×
    }
    try {
      future.get(waitTimeInMillis, TimeUnit.MILLISECONDS); // 1✔
    } catch (ExecutionException | TimeoutException | CancellationException e) { // ×
      log.debug(msg, msgParams, e); // ×
    } catch (InterruptedException e) { // ×
      Thread.currentThread().interrupt(); // ×
    } // 1✔
  } // 1✔

  public static void waitForTermination(
      Logger log, Future<?> future, long waitTimeInMillis, String msg, Object... msgParams) {
    if (future == null || future.isCancelled() || future.isDone()) { // 1!
      return; // 1✔
    }
    if (future instanceof ScheduledFuture) { // 1✔
      ScheduledFuture<?> f = (ScheduledFuture<?>) future; // 1✔
      if (f.getDelay(TimeUnit.MILLISECONDS) > Constants.MIN_DELAY) { // 1✔
        f.cancel(false); // 1✔
        return; // 1✔
      }
    }
    waitForShutdown(log, future, waitTimeInMillis, msg, msgParams); // 1✔
  } // 1✔

  public static boolean waitForWorkerTermination(
      final Collection<QueueThreadPool> queueThreadPools, long waitTime) {
    long endTime = System.currentTimeMillis() + waitTime; // 1✔
    List<QueueThreadPool> remaining = new ArrayList<>(queueThreadPools); // 1✔
    while (System.currentTimeMillis() < endTime && !remaining.isEmpty()) { // 1!
      List<QueueThreadPool> newRemaining = new ArrayList<>(); // 1✔
      for (QueueThreadPool queueThreadPool : remaining) { // 1✔
        if (!queueThreadPool.allTasksCompleted()) { // 1✔
          newRemaining.add(queueThreadPool); // 1✔
        }
      } // 1✔
      if (!newRemaining.isEmpty()) { // 1✔
        TimeoutUtils.sleep(10); // 1✔
      }
      remaining = newRemaining; // 1✔
    } // 1✔
    return remaining.isEmpty(); // 1✔
  }

  public static String getWorkerName(String name) {
    String camelCase = StringUtils.getBeanName(name); // 1✔
    return camelCase + "Listener"; // 1✔
  }
}
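
`waitForWorkerTermination` above polls a set of pools until all report completion or a deadline passes. The same pattern can be sketched standalone, since `QueueThreadPool` and `TimeoutUtils` are not shown on this page. Everything in this block is hypothetical: `DeadlinePollingSketch`, `awaitAll`, and the use of `BooleanSupplier` in place of `allTasksCompleted()` are stand-ins. It also differs from the original in two ways: it uses `System.nanoTime()` for the deadline, and it always performs at least one completion check, even with a zero wait.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of deadline-bounded polling; not rqueue code. */
public final class DeadlinePollingSketch {

  private DeadlinePollingSketch() {}

  /**
   * Polls each check until all return true or the wait time elapses.
   * Returns true only if every check reported completion.
   */
  public static boolean awaitAll(
      Collection<BooleanSupplier> completionChecks, long waitMillis, long pollMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
    List<BooleanSupplier> remaining = new ArrayList<>(completionChecks);
    while (true) {
      // Drop every check that now reports completion.
      remaining.removeIf(BooleanSupplier::getAsBoolean);
      if (remaining.isEmpty()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        // Preserve the interrupt for the caller and stop waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) {
    List<BooleanSupplier> checks = new ArrayList<>();
    checks.add(() -> true);
    System.out.println(awaitAll(checks, 100, 10));
  }
}
```

Tracking only the checks that are still pending, as the original does with `newRemaining`, avoids re-querying pools that have already finished.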

© 2026 Coveralls, Inc