sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%
push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line
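
The headline 83.396% is lower than the 86.48% line coverage because branches are counted in the aggregate. The sketch below shows one formula that reproduces the figures on this page, assuming the aggregate is (covered lines + covered branches) / (relevant lines + total branches). The class and method names are hypothetical and are not part of Coveralls or rqueue.

```java
import java.util.Locale;

// Hypothetical helper: recomputes the percentages shown on this page.
final class AggregateCoverage {

  // Percentage of covered items; returns 0 when there is nothing to cover.
  static double percent(long covered, long total) {
    return total == 0 ? 0.0 : 100.0 * covered / total;
  }

  public static void main(String[] args) {
    long coveredLines = 7715, relevantLines = 8921;     // "7715 of 8921 relevant lines"
    long coveredBranches = 2566, totalBranches = 3407;  // "2566 of 3407 branches"

    System.out.printf(Locale.ROOT, "line:      %.2f%%%n",
        percent(coveredLines, relevantLines));
    System.out.printf(Locale.ROOT, "branch:    %.2f%%%n",
        percent(coveredBranches, totalBranches));
    // Assumed aggregate: lines and branches pooled into one ratio.
    System.out.printf(Locale.ROOT, "aggregate: %.3f%%%n",
        percent(coveredLines + coveredBranches, relevantLines + totalBranches));
  }
}
```

Under that assumption the pooled ratio is 10281 / 12328, which rounds to the 83.396% reported at the top of the build.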

Source File

/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java (90.12% covered)
Lines not covered (×): the `throw e;` rethrow in moveMessage; the bodies of makeEmptyReactive, enqueueReactiveMessage, getReactiveDataType, moveReactiveMessage, and reactiveAggregateDataCounter; and the body of the `i >= stepAfter` branch in getDailyDateCounter.
Partially covered branches (!): the empty-argument check in deleteMessage; the metadata null check in enqueueMessage; the loop condition, the `mi != null && mi.getData() != null` check, the JS_NO_MESSAGE_FOUND check, and `moved > 0` in moveMessage; the ternaries in toStream and toSubject; the request null check in pauseUnpauseQueue; `i >= stepAfter` in getDailyDateCounter; and the singular/plural suffix ternaries in getWeeklyDateCounter and getMonthlyDateCounter.
All other executable lines were hit once.

/*
 * Copyright (c) 2026 Sonu Kumar
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * You may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 */

package com.github.sonus21.rqueue.nats.service;

import com.github.sonus21.rqueue.config.NatsBackendCondition;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.models.Pair;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.models.enums.AggregationType;
import com.github.sonus21.rqueue.models.request.MessageMoveRequest;
import com.github.sonus21.rqueue.models.request.PauseUnpauseQueueRequest;
import com.github.sonus21.rqueue.models.response.BaseResponse;
import com.github.sonus21.rqueue.models.response.BooleanResponse;
import com.github.sonus21.rqueue.models.response.DataSelectorResponse;
import com.github.sonus21.rqueue.models.response.MessageMoveResponse;
import com.github.sonus21.rqueue.models.response.StringResponse;
import com.github.sonus21.rqueue.nats.RqueueNatsConfig;
import com.github.sonus21.rqueue.serdes.RqueueSerDes;
import com.github.sonus21.rqueue.service.RqueueMessageMetadataService;
import com.github.sonus21.rqueue.service.RqueueUtilityService;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.StringUtils;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.StreamInfo;
import io.nats.client.impl.Headers;
import java.io.IOException;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Conditional;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

/**
 * NATS-backend implementation of {@link RqueueUtilityService}.
 *
 * <p>The implementation supports operations that map cleanly onto JetStream's model:
 * <ul>
 *   <li>{@link #pauseUnpauseQueue(PauseUnpauseQueueRequest)} — flips the {@code paused} flag on
 *       {@link QueueConfig} in the queue-config KV bucket and propagates the change to the local
 *       {@link RqueueMessageListenerContainer} so the poller stops polling. Multi-instance fan-out
 *       is a follow-up (NATS pub/sub bridge).
 *   <li>{@link #deleteMessage(String, String)} — soft delete: marks the metadata record in the
 *       message-metadata KV bucket. The stream message persists; the dashboard hides it via the
 *       {@code deleted} flag, matching the Redis impl's semantics.
 *   <li>{@link #aggregateDataCounter(AggregationType)} — pure date-selector logic, no backend
 *       dependency.
 *   <li>{@link #getDataType(String)} — reports {@code "STREAM"} since JetStream subjects map to
 *       stream messages, not Redis-shaped data structures.
 * </ul>
 *
 * <p>{@link #moveMessage(MessageMoveRequest)} reads up to {@code maxMessages} from the source
 * JetStream stream, republishes each to the destination stream, and hard-deletes the source copy
 * via {@link JetStreamManagement#deleteMessage}. {@link #enqueueMessage(String, String, String)}
 * looks up the message in the metadata store and republishes it immediately (without a
 * {@code Nats-Next-Deliver-Time} header) so the worker picks it up on its next poll.
 *
 * <p>{@link #makeEmpty(String, String)} still returns "not supported" — purging a stream is a
 * destructive admin operation best performed via {@code nats stream purge}.
 */
@Service
@Conditional(NatsBackendCondition.class)
@Slf4j
public class NatsRqueueUtilityService implements RqueueUtilityService {

  private static final String NOT_SUPPORTED_SUFFIX =
      " is not supported with rqueue.backend=nats in v1";

  /** Error code returned by JetStream when a sequence does not exist in the stream. */
  private static final int JS_NO_MESSAGE_FOUND = 10037;

  private final RqueueWebConfig rqueueWebConfig;
  private final RqueueSystemConfigDao systemConfigDao;
  private final RqueueMessageMetadataService messageMetadataService;
  private final RqueueMessageListenerContainer rqueueMessageListenerContainer;
  private final JetStreamManagement jsm;
  private final JetStream js;
  private final RqueueSerDes serdes;
  private final String streamPrefix;
  private final String subjectPrefix;

  @Autowired
  public NatsRqueueUtilityService(
      RqueueWebConfig rqueueWebConfig,
      RqueueSystemConfigDao systemConfigDao,
      RqueueMessageMetadataService messageMetadataService,
      RqueueMessageListenerContainer rqueueMessageListenerContainer,
      JetStreamManagement jsm,
      JetStream js,
      RqueueSerDes serdes,
      RqueueNatsConfig natsConfig) {
    this.rqueueWebConfig = rqueueWebConfig;
    this.systemConfigDao = systemConfigDao;
    this.messageMetadataService = messageMetadataService;
    this.rqueueMessageListenerContainer = rqueueMessageListenerContainer;
    this.jsm = jsm;
    this.js = js;
    this.serdes = serdes;
    this.streamPrefix = natsConfig.getStreamPrefix();
    this.subjectPrefix = natsConfig.getSubjectPrefix();
  }

  private static <T extends BaseResponse> T notSupported(T response, String op) {
    response.setCode(1);
    response.setMessage(op + NOT_SUPPORTED_SUFFIX);
    return response;
  }

  /**
   * Soft-delete: marks the message metadata as deleted. The underlying stream message persists
   * (JetStream streams are immutable), but the dashboard and consumers honor the deleted flag.
   */
  @Override
  public BooleanResponse deleteMessage(String queueName, String id) {
    BooleanResponse response = new BooleanResponse();
    if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(id)) {
      response.setCode(1);
      response.setMessage("queueName and id are required");
      return response;
    }
    try {
      boolean ok = messageMetadataService.deleteMessage(
          queueName, id, Duration.ofDays(Constants.DAYS_IN_A_MONTH));
      if (!ok) {
        response.setCode(1);
        response.setMessage("Message metadata not found for queue=" + queueName + " id=" + id);
        return response;
      }
      response.setValue(true);
      return response;
    } catch (Exception e) {
      log.warn("deleteMessage failed for queue={} id={}", queueName, id, e);
      response.setCode(1);
      response.setMessage("deleteMessage failed: " + e.getMessage());
      return response;
    }
  }

  /**
   * Re-enqueue a message for immediate delivery. Looks up the {@link RqueueMessage} from the
   * metadata store by {@code queueName + id}, then republishes the raw bytes to the queue's
   * JetStream stream without a {@code Nats-Next-Deliver-Time} header so the poller picks it up
   * on its next fetch. A fresh {@code Nats-Msg-Id} ({@code id-requeue-<millis>}) prevents
   * JetStream from deduplicating against the original scheduled publish. The {@code position}
   * hint (FRONT / BACK) is ignored — JetStream pull consumers deliver in stream-sequence order.
   */
  @Override
  public BooleanResponse enqueueMessage(String queueName, String id, String position) {
    BooleanResponse r = new BooleanResponse();
    if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(id)) {
      r.setCode(1);
      r.setMessage("queueName and id are required");
      return r;
    }
    MessageMetadata meta = messageMetadataService.getByMessageId(queueName, id);
    if (meta == null || meta.getRqueueMessage() == null) {
      r.setCode(1);
      r.setMessage("Message not found for queue=" + queueName + " id=" + id);
      return r;
    }
    RqueueMessage message = meta.getRqueueMessage();
    String subject = toSubject(queueName);
    try {
      byte[] payload = serdes.serialize(message);
      Headers headers = new Headers();
      // fresh dedup key so JetStream doesn't drop this as a duplicate of the original publish
      headers.add("Nats-Msg-Id", id + "-requeue-" + System.currentTimeMillis());
      js.publish(subject, headers, payload);
      r.setValue(true);
      return r;
    } catch (Exception e) {
      log.warn("enqueueMessage failed queue={} id={}", queueName, id, e);
      r.setCode(1);
      r.setMessage("enqueueMessage failed: " + e.getMessage());
      return r;
    }
  }

  /**
   * Move up to {@code maxMessages} messages from the source stream to the destination stream.
   * For each message: re-publishes the raw bytes (with original headers minus {@code Nats-Msg-Id})
   * to the destination subject, then hard-deletes the source sequence via
   * {@link JetStreamManagement#deleteMessage}. Both {@code src} and {@code dst} can be either
   * bare queue names (e.g. {@code "orders"}) or fully-prefixed stream names
   * (e.g. {@code "rqueue-js-orders"}) — the prefix is added if absent.
   */
  @Override
  public MessageMoveResponse moveMessage(MessageMoveRequest request) {
    String error = request.validationMessage();
    if (error != null) {
      MessageMoveResponse r = new MessageMoveResponse();
      r.setCode(1);
      r.setMessage(error);
      return r;
    }
    int maxCount = request.getMessageCount(rqueueWebConfig);
    String srcStream = toStream(request.getSrc());
    String dstStream = toStream(request.getDst());
    String dstSubject = toSubject(request.getDst());
    try {
      StreamInfo info = jsm.getStreamInfo(srcStream);
      long seq = info.getStreamState().getFirstSequence();
      long last = info.getStreamState().getLastSequence();
      int moved = 0;
      while (seq <= last && moved < maxCount) {
        try {
          MessageInfo mi = jsm.getMessage(srcStream, seq);
          if (mi != null && mi.getData() != null) {
            Headers h = new Headers();
            if (mi.getHeaders() != null) {
              mi.getHeaders().forEach((k, v) -> {
                if (!"Nats-Msg-Id".equals(k)) { // avoid dedup collision on destination
                  h.add(k, v);
                }
              });
            }
            js.publish(dstSubject, h, mi.getData());
            jsm.deleteMessage(srcStream, seq, false);
            moved++;
          }
        } catch (JetStreamApiException e) {
          if (e.getApiErrorCode() == JS_NO_MESSAGE_FOUND) {
            // already consumed or deleted by another process — skip
          } else {
            throw e;
          }
        }
        seq++;
      }
      MessageMoveResponse r = new MessageMoveResponse(moved);
      r.setValue(moved > 0);
      return r;
    } catch (IOException | JetStreamApiException e) {
      log.warn("moveMessage failed src={} dst={}", srcStream, dstStream, e);
      MessageMoveResponse r = new MessageMoveResponse();
      r.setCode(1);
      r.setMessage("moveMessage failed: " + e.getMessage());
      return r;
    }
  }

  private String toStream(String name) {
    return name.startsWith(streamPrefix) ? name : streamPrefix + name;
  }

  private String toSubject(String name) {
    return name.startsWith(subjectPrefix) ? name : subjectPrefix + name;
  }

  /**
   * NATS would require destructive stream re-creation to empty a queue. Out-of-band admin op
   * (e.g. {@code nats stream purge}) is the recommended path; surfaces "not supported" for now.
   */
  @Override
  public BooleanResponse makeEmpty(String queueName, String dataName) {
    return notSupported(new BooleanResponse(), "makeEmpty");
  }

  @Override
  public Pair<String, String> getLatestVersion() {
    return new Pair<>("", "");
  }

  /**
   * NATS-backed queues are always JetStream streams; report a fixed type rather than probing the
   * KV / stream layer per call.
   */
  @Override
  public StringResponse getDataType(String name) {
    StringResponse response = new StringResponse();
    response.setVal("STREAM");
    return response;
  }

  @Override
  public Mono<BooleanResponse> makeEmptyReactive(String queueName, String datasetName) {
    return Mono.just(makeEmpty(queueName, datasetName));
  }

  @Override
  public Mono<BooleanResponse> deleteReactiveMessage(String queueName, String messageId) {
    return Mono.just(deleteMessage(queueName, messageId));
  }

  @Override
  public Mono<BooleanResponse> enqueueReactiveMessage(
      String queueName, String messageId, String position) {
    return Mono.just(enqueueMessage(queueName, messageId, position));
  }

  @Override
  public Mono<StringResponse> getReactiveDataType(String name) {
    return Mono.just(getDataType(name));
  }

  @Override
  public Mono<MessageMoveResponse> moveReactiveMessage(MessageMoveRequest request) {
    return Mono.just(moveMessage(request));
  }

  @Override
  public Mono<BaseResponse> reactivePauseUnpauseQueue(PauseUnpauseQueueRequest request) {
    return Mono.just(pauseUnpauseQueue(request));
  }

  /**
   * Toggle the {@code paused} flag on the queue's {@link QueueConfig} (persisted in the
   * {@code rqueue-queue-config} KV bucket) and propagate the change to the local listener
   * container so the poller stops dispatching new work.
   *
   * <p>Multi-instance fan-out (i.e. propagating the pause across worker JVMs) is a follow-up;
   * single-instance deployments are fully covered by this path.
   */
  @Override
  public BaseResponse pauseUnpauseQueue(PauseUnpauseQueueRequest request) {
    log.info("Queue PauseUnpause request {}", request);
    BaseResponse response = new BaseResponse();
    if (request == null || StringUtils.isEmpty(request.getName())) {
      response.set(400, "Queue name is required");
      return response;
    }
    QueueConfig queueConfig = systemConfigDao.getConfigByName(request.getName(), true);
    if (queueConfig == null) {
      response.set(404, "Queue does not exist");
      return response;
    }
    boolean targetState = request.isPause();
    if (queueConfig.isPaused() == targetState) {
      // No-op: state already matches; respond OK and skip the listener call to avoid the
      // "duplicate pause" / "not paused but unpause" warnings in QueueStateMgr.
      return response;
    }
    queueConfig.setPaused(targetState);
    systemConfigDao.saveQConfig(queueConfig);
    try {
      rqueueMessageListenerContainer.pauseUnpauseQueue(request.getName(), targetState);
    } catch (Exception e) {
      // QueueConfig is already persisted; surface the pause-propagation failure to the caller
      // but do not roll back — the next listener restart will pick up the persisted flag.
      log.warn("pauseUnpauseQueue listener notification failed for queue={}", request.getName(), e);
      response.set(500, "Persisted but listener notification failed: " + e.getMessage());
    }
    return response;
  }

  @Override
  public Mono<DataSelectorResponse> reactiveAggregateDataCounter(AggregationType type) {
    return Mono.just(aggregateDataCounter(type));
  }

  @Override
  public DataSelectorResponse aggregateDataCounter(AggregationType type) {
    String title;
    List<Pair<String, String>> data;
    if (type == AggregationType.DAILY) {
      data = getDailyDateCounter();
      title = "Select Number of Days";
    } else if (type == AggregationType.WEEKLY) {
      data = getWeeklyDateCounter();
      title = "Select Number of Weeks";
    } else {
      data = getMonthlyDateCounter();
      title = "Select Number of Months";
    }
    return new DataSelectorResponse(title, data);
  }

  private List<Pair<String, String>> getDailyDateCounter() {
    List<Pair<String, String>> dateSelector = new LinkedList<>();
    int[] dates = new int[] {1, 2, 3, 4, 6, 7};
    int step = 15;
    int stepAfter = 15;
    int i = 1;
    dateSelector.add(new Pair<>("0", "Select"));
    while (i <= rqueueWebConfig.getHistoryDay()) {
      if (i >= stepAfter) {
        if (i <= rqueueWebConfig.getHistoryDay()) {
          dateSelector.add(new Pair<>(String.valueOf(i), String.format("Last %d days", i)));
        }
        i += step;
      } else {
        for (int date : dates) {
          if (date == i) {
            String suffix = i == 1 ? "day" : "days";
            dateSelector.add(
                new Pair<>(String.valueOf(date), String.format("Last %d %s", date, suffix)));
            break;
          }
        }
        i += 1;
      }
    }
    return dateSelector;
  }

  private List<Pair<String, String>> getWeeklyDateCounter() {
    List<Pair<String, String>> dateSelector = new LinkedList<>();
    dateSelector.add(new Pair<>("0", "Select"));
    int nWeek =
        (int) Math.ceil(rqueueWebConfig.getHistoryDay() / (double) Constants.DAYS_IN_A_WEEK);
    for (int week = 1; week <= nWeek; week++) {
      String suffix = week == 1 ? "week" : "weeks";
      dateSelector.add(new Pair<>(String.valueOf(week), String.format("Last %d %s", week, suffix)));
    }
    return dateSelector;
  }

  private List<Pair<String, String>> getMonthlyDateCounter() {
    List<Pair<String, String>> dateSelector = new LinkedList<>();
    dateSelector.add(new Pair<>("0", "Select"));
    int nMonths =
        (int) Math.ceil(rqueueWebConfig.getHistoryDay() / (double) Constants.DAYS_IN_A_MONTH);
    for (int month = 1; month <= nMonths; month++) {
      String suffix = month == 1 ? "month" : "months";
      dateSelector.add(
          new Pair<>(String.valueOf(month), String.format("Last %d %s", month, suffix)));
    }
    return dateSelector;
  }
}
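
The uncovered part of getDailyDateCounter is the `i >= stepAfter` branch, which can only run when the configured history window is at least 15 days. The standalone sketch below mirrors that loop to show what the branch would emit. It is an illustration under stated assumptions and not the project's code: `historyDay` is passed in as a parameter in place of `rqueueWebConfig.getHistoryDay()`, only the labels are returned in place of `Pair` values, the inner `i <= historyDay` check is dropped because the loop condition already guarantees it, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the daily selector loop in the listing above.
final class DailySelectorSketch {

  static List<String> dailyLabels(int historyDay) {
    int[] dates = {1, 2, 3, 4, 6, 7}; // same values as the source; 5 is absent there too
    int step = 15;
    int stepAfter = 15;
    List<String> labels = new ArrayList<>();
    labels.add("Select");
    int i = 1;
    while (i <= historyDay) {
      if (i >= stepAfter) {
        // Branch reported as uncovered: reached only when historyDay >= 15.
        labels.add(String.format("Last %d days", i));
        i += step;
      } else {
        for (int date : dates) {
          if (date == i) {
            labels.add(String.format("Last %d %s", date, i == 1 ? "day" : "days"));
            break;
          }
        }
        i += 1;
      }
    }
    return labels;
  }

  public static void main(String[] args) {
    System.out.println(dailyLabels(7));  // stays entirely in the else branch
    System.out.println(dailyLabels(30)); // also enters the i >= stepAfter branch
  }
}
```

With a window of 7 or 14 days the sketch yields the same seven entries, so a test would need a history window of 15 or more to exercise the branch. For 30 days the list additionally ends with "Last 15 days" and "Last 30 days".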

© 2026 Coveralls, Inc