sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%
push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code
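
The empty-report symptom described in this commit can be checked mechanically. The sketch below is not part of the rqueue build: `JacocoReportCheck` and its method names are hypothetical, and the sample strings used with it are assumed to omit the DOCTYPE that a real JaCoCo report declares, so parsing an actual `jacocoTestReport.xml` this way would also need the parser configured to skip the external DTD. It only counts `<package>` elements, which is the difference the commit message reports between the CI artifact and the local build.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

// Hypothetical helper: detects a JaCoCo-style XML report that carries no coverage data.
final class JacocoReportCheck {

  /** Counts the <package> elements in a report given as an XML string. */
  static int countPackages(String xml) {
    try {
      Document doc = DocumentBuilderFactory.newInstance()
          .newDocumentBuilder()
          .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
      return doc.getElementsByTagName("package").getLength();
    } catch (Exception e) {
      // Wrap checked parser exceptions so callers can use this in simple assertions.
      throw new IllegalStateException("could not parse report XML", e);
    }
  }

  /** True when the report has no <package> element, e.g. only <sessioninfo> entries. */
  static boolean hasNoCoverageData(String xml) {
    return countPackages(xml) == 0;
  }
}
```

A CI step could fail the job when `hasNoCoverageData` returns true, instead of letting the upload step succeed without sending anything.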

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code
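
The pause and soft-delete semantics described in this commit can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the `NatsRqueueUtilityService` code: `SoftDeleteSketch` and all of its members are hypothetical stand-ins (a list for the stream, a set for the per-message metadata flag, a boolean for the `paused` flag), and no NATS client API is used.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of "soft delete" and "pause"; not the rqueue implementation.
final class SoftDeleteSketch {
  private final List<String> stream = new ArrayList<>();   // stands in for the stream's messages
  private final Set<String> deletedIds = new HashSet<>();  // stands in for the metadata "deleted" flag
  private boolean paused;                                  // stands in for the persisted paused flag

  void publish(String id) {
    stream.add(id);
  }

  /** Soft delete: only the metadata flag changes, the stored message is untouched. */
  void softDelete(String id) {
    deletedIds.add(id);
  }

  void setPaused(boolean paused) {
    this.paused = paused;
  }

  /** Number of messages still stored, including soft-deleted ones. */
  int storedCount() {
    return stream.size();
  }

  /** What a dashboard would list: stored messages minus those flagged as deleted. */
  List<String> visibleIds() {
    List<String> visible = new ArrayList<>();
    for (String id : stream) {
      if (!deletedIds.contains(id)) {
        visible.add(id);
      }
    }
    return visible;
  }

  /** A poller would consult this before dispatching the next message. */
  boolean shouldDispatch() {
    return !paused;
  }
}
```

The point of the model is that deleting changes what is shown, not what is stored, and that pausing is a flag checked by the dispatcher rather than an operation on the stored messages.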

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line
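
The figures above are consistent with an aggregate that pools lines and branches: (7715 + 2566) / (8921 + 3407) = 10281 / 12328, which rounds to the 83.396% shown in the build header. The sketch below reproduces that arithmetic; `AggregateCoverage` is a hypothetical helper, and the pooling formula is an inference from the numbers on this page rather than a quoted Coveralls definition.

```java
// Hypothetical helper reproducing the coverage percentages reported on this page.
final class AggregateCoverage {

  /** Plain percentage of covered items out of a total. */
  static double percent(long covered, long total) {
    return 100.0 * covered / total;
  }

  /** Aggregate percentage with covered lines and covered branches pooled together. */
  static double aggregatePercent(
      long coveredLines, long relevantLines, long coveredBranches, long totalBranches) {
    return 100.0 * (coveredLines + coveredBranches) / (relevantLines + totalBranches);
  }
}
```

With the page's inputs, `percent(7715, 8921)` gives the line figure, `percent(2566, 3407)` the branch figure, and `aggregatePercent(7715, 8921, 2566, 3407)` the headline figure.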

Source File
83.0
/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/service/RqueueQDetailServiceImpl.java
/*
 * Copyright (c) 2020-2026 Sonu Kumar
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * You may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and limitations under the License.
 *
 */

package com.github.sonus21.rqueue.web.service;

import static com.github.sonus21.rqueue.utils.StringUtils.clean;
import static com.google.common.collect.Lists.newArrayList;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.spi.MessageBroker;
import com.github.sonus21.rqueue.core.spi.SubscriberView;
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.db.DeadLetterQueue;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.models.enums.ActionType;
import com.github.sonus21.rqueue.models.enums.DataType;
import com.github.sonus21.rqueue.models.enums.NavTab;
import com.github.sonus21.rqueue.models.enums.TableColumnType;
import com.github.sonus21.rqueue.models.registry.RqueueWorkerPollerView;
import com.github.sonus21.rqueue.models.response.Action;
import com.github.sonus21.rqueue.models.response.DataViewResponse;
import com.github.sonus21.rqueue.models.response.RedisDataDetail;
import com.github.sonus21.rqueue.models.response.RowColumnMeta;
import com.github.sonus21.rqueue.models.response.RowColumnMetaType;
import com.github.sonus21.rqueue.models.response.SubscriberRow;
import com.github.sonus21.rqueue.models.response.TableColumn;
import com.github.sonus21.rqueue.models.response.TableRow;
import com.github.sonus21.rqueue.models.response.TerminalStorageRow;
import com.github.sonus21.rqueue.repository.MessageBrowsingRepository;
import com.github.sonus21.rqueue.service.RqueueMessageMetadataService;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.DateTimeUtils;
import com.github.sonus21.rqueue.utils.StringUtils;
import com.github.sonus21.rqueue.web.RqueueQDetailService;
import com.github.sonus21.rqueue.web.RqueueSystemManagerService;
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;

@Service
public class RqueueQDetailServiceImpl implements RqueueQDetailService {

  private final MessageBrowsingRepository messageBrowsingRepository;
  private final RqueueMessageTemplate rqueueMessageTemplate;
  private final RqueueSystemManagerService rqueueSystemManagerService;
  private final RqueueMessageMetadataService rqueueMessageMetadataService;
  private final RqueueConfig rqueueConfig;
  private final RqueueWorkerRegistry rqueueWorkerRegistry;

  /**
   * Optional broker SPI. When set (non-Redis backend), the dashboard prefers
   * {@link MessageBroker#size(QueueDetail)} and
   * {@link MessageBroker#peek(QueueDetail, long, long)} for read paths instead of
   * the Redis DAOs. {@code @Autowired(required = false)} keeps the Redis-only path
   * unchanged when no broker is configured.
   */
  private MessageBroker messageBroker;

  @Autowired
  public RqueueQDetailServiceImpl(
      MessageBrowsingRepository messageBrowsingRepository,
      RqueueMessageTemplate rqueueMessageTemplate,
      RqueueSystemManagerService rqueueSystemManagerService,
      RqueueMessageMetadataService rqueueMessageMetadataService,
      RqueueConfig rqueueConfig,
      RqueueWorkerRegistry rqueueWorkerRegistry) {
    this.messageBrowsingRepository = messageBrowsingRepository;
    this.rqueueMessageTemplate = rqueueMessageTemplate;
    this.rqueueSystemManagerService = rqueueSystemManagerService;
    this.rqueueMessageMetadataService = rqueueMessageMetadataService;
    this.rqueueConfig = rqueueConfig;
    this.rqueueWorkerRegistry = rqueueWorkerRegistry;
  }

  @Autowired(required = false)
  public void setMessageBroker(MessageBroker messageBroker) {
    this.messageBroker = messageBroker;
  }

  /**
   * Visible for tests and pluggable backends.
   */
  public MessageBroker getMessageBroker() {
    return messageBroker;
  }

  /**
   * Resolves a {@link QueueDetail} for the given queue name. Returns {@code null} when no
   * detail is registered (e.g. shutdown / late init); callers should fall back to the Redis
   * path in that case.
   */
  private QueueDetail lookupQueueDetail(String queueName) {
    try {
      return EndpointRegistry.get(queueName);
    } catch (Exception e) {
      return null;
    }
  }

  private boolean brokerHidesScheduled() {
    return messageBroker != null && !messageBroker.capabilities().supportsScheduledIntrospection();
  }

  private boolean brokerHidesCron() {
    return messageBroker != null && !messageBroker.capabilities().supportsCronJobs();
  }

  /**
   * Returns true when the broker manages its own in-flight tracking and does not use the Redis
   * processing ZSET. Brokers that return {@code usesPrimaryHandlerDispatch() == false} (e.g. NATS)
   * have no separate "running" queue to inspect, so the RUNNING tab/row must be suppressed.
   */
  private boolean brokerHidesRunning() {
    return messageBroker != null && !messageBroker.capabilities().usesPrimaryHandlerDispatch();
  }

  @Override
  public String storageKicker() {
    return messageBroker != null ? messageBroker.storageKicker() : "Redis";
  }

  @Override
  public String storageDescription() {
    return messageBroker != null
        ? messageBroker.storageDescription()
        : "Underlying Redis structures for the queues visible on this page.";
  }

  @Override
  public Map<String, List<Entry<NavTab, RedisDataDetail>>> getQueueDataStructureDetails(
      List<QueueConfig> queueConfig) {
    return queueConfig.parallelStream()
        .collect(Collectors.toMap(QueueConfig::getName, this::getQueueDataStructureDetail));
  }

  @Override
  public List<Entry<NavTab, RedisDataDetail>> getQueueDataStructureDetail(QueueConfig queueConfig) {
    if (queueConfig == null) {
      return Collections.emptyList();
    }
    // Route size lookup through the broker SPI when configured (non-Redis backend).
    QueueDetail brokerQueueDetail =
        messageBroker != null ? lookupQueueDetail(queueConfig.getName()) : null;
    Long pending;
    if (brokerQueueDetail != null) {
      pending = messageBroker.size(brokerQueueDetail);
    } else {
      pending = messageBrowsingRepository.getDataSize(queueConfig.getQueueName(), DataType.LIST);
    }
    // When a non-Redis broker is configured, use its storage display names instead of Redis keys.
    String pendingDisplayName =
        brokerQueueDetail != null && messageBroker.storageDisplayName(brokerQueueDetail) != null
            ? messageBroker.storageDisplayName(brokerQueueDetail)
            : queueConfig.getQueueName();
    List<Entry<NavTab, RedisDataDetail>> queueRedisDataDetails = newArrayList();
    // Per-consumer pending breakdown for brokers that expose it (e.g. NATS Limits-retention
    // streams where each durable consumer has its own offset). When present, render one row
    // per consumer with an exact pending count instead of a single aggregated "~ N" row.
    Map<String, Long> perConsumer =
        brokerQueueDetail != null ? messageBroker.consumerPendingSizes(brokerQueueDetail) : null;
    if (perConsumer != null && !perConsumer.isEmpty()) {
      String label = brokerLabel(NavTab.PENDING, DataType.LIST);
      for (Map.Entry<String, Long> entry : perConsumer.entrySet()) {
        Long size = entry.getValue();
        RedisDataDetail consumerDetail =
            new RedisDataDetail(pendingDisplayName, DataType.LIST, size == null ? 0 : size);
        consumerDetail.setTypeLabel(label);
        consumerDetail.setConsumerName(entry.getKey());
        // Per-consumer counts are exact (numPending or position math for that subscriber);
        // the approximation flag only applies to the aggregated single-row view.
        queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.PENDING, consumerDetail));
      }
    } else {
      RedisDataDetail pendingDetail =
          new RedisDataDetail(pendingDisplayName, DataType.LIST, pending == null ? 0 : pending);
      pendingDetail.setTypeLabel(brokerLabel(NavTab.PENDING, DataType.LIST));
      if (brokerQueueDetail != null) {
        pendingDetail.setApproximate(messageBroker.isSizeApproximate(brokerQueueDetail));
      }
      queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.PENDING, pendingDetail));
    }
    // Brokers that manage their own in-flight tracking (e.g. NATS JetStream) have no separate
    // processing ZSET, so omit the RUNNING entry to avoid a 501 when the explorer opens it.
    if (!brokerHidesRunning()) {
      String processingQueueName = queueConfig.getProcessingQueueName();
      Long running = messageBrowsingRepository.getDataSize(processingQueueName, DataType.ZSET);
      RedisDataDetail runningDetail =
          new RedisDataDetail(processingQueueName, DataType.ZSET, running == null ? 0 : running);
      runningDetail.setTypeLabel(brokerLabel(NavTab.RUNNING, DataType.ZSET));
      queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.RUNNING, runningDetail));
    }
    String scheduledQueueName = queueConfig.getScheduledQueueName();
    // When the broker doesn't support scheduled introspection (e.g. JetStream), suppress
    // the SCHEDULED nav tab entry entirely so the dashboard doesn't query an absent ZSET.
    if (!brokerHidesScheduled()) {
      Long scheduled = messageBrowsingRepository.getDataSize(scheduledQueueName, DataType.ZSET);
      RedisDataDetail scheduledDetail =
          new RedisDataDetail(scheduledQueueName, DataType.ZSET, scheduled == null ? 0 : scheduled);
      scheduledDetail.setTypeLabel(brokerLabel(NavTab.SCHEDULED, DataType.ZSET));
      queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.SCHEDULED, scheduledDetail));
    }
    if (!CollectionUtils.isEmpty(queueConfig.getDeadLetterQueues())) {
      for (DeadLetterQueue dlq : queueConfig.getDeadLetterQueues()) {
        String dlqDisplayName = brokerQueueDetail != null
                && messageBroker.dlqStorageDisplayName(brokerQueueDetail) != null
            ? messageBroker.dlqStorageDisplayName(brokerQueueDetail)
            : dlq.getName();
        RedisDataDetail dlqDetail;
        if (!dlq.isConsumerEnabled()) {
          Long dlqSize = messageBrowsingRepository.getDataSize(dlq.getName(), DataType.LIST);
          dlqDetail =
              new RedisDataDetail(dlqDisplayName, DataType.LIST, dlqSize == null ? 0 : dlqSize);
        } else {
          // TODO should we redirect to the queue page?
          dlqDetail = new RedisDataDetail(dlqDisplayName, DataType.LIST, -1);
        }
        dlqDetail.setTypeLabel(brokerLabel(NavTab.DEAD, DataType.LIST));
        queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.DEAD, dlqDetail));
      }
    }
    if (rqueueConfig.messageInTerminalStateShouldBeStored()
        && !StringUtils.isEmpty(queueConfig.getCompletedQueueName())) {
      Long completed =
          messageBrowsingRepository.getDataSize(queueConfig.getCompletedQueueName(), DataType.ZSET);
      String completedDisplayName =
          brokerQueueDetail != null && messageBroker.storageDisplayName(brokerQueueDetail) != null
              ? messageBroker.storageDisplayName(brokerQueueDetail)
              : queueConfig.getCompletedQueueName();
      RedisDataDetail completedDetail = new RedisDataDetail(
          completedDisplayName, DataType.ZSET, completed == null ? 0 : completed);
      completedDetail.setTypeLabel(brokerLabel(NavTab.COMPLETED, DataType.ZSET));
      queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.COMPLETED, completedDetail));
    }
    return queueRedisDataDetails;
  }

  /**
   * Resolve the broker-specific human-readable label for the given (NavTab, DataType) pair.
   * Returns {@code null} on the legacy Redis path so the template falls back to
   * {@code DataType.name()} (LIST/ZSET).
   */
  private String brokerLabel(NavTab tab, DataType type) {
    return messageBroker != null ? messageBroker.dataTypeLabel(tab, type) : null;
  }

  @Override
  public List<NavTab> getNavTabs(QueueConfig queueConfig) {
    List<NavTab> navTabs = new ArrayList<>();
    if (queueConfig != null) {
      navTabs.add(NavTab.PENDING);
      // Hide SCHEDULED tab for brokers without scheduled-queue introspection support.
      if (!brokerHidesScheduled()) {
        navTabs.add(NavTab.SCHEDULED);
      }
      // Hide RUNNING tab for brokers that manage in-flight tracking internally (e.g. NATS).
      if (!brokerHidesRunning()) {
        navTabs.add(NavTab.RUNNING);
      }
      if (queueConfig.hasDeadLetterQueue()) {
        navTabs.add(NavTab.DEAD);
      }
    }
    return navTabs;
  }

  private List<TypedTuple<RqueueMessage>> readFromZset(
      String name, int pageNumber, int itemPerPage) {
    requireScheduledIntrospection("readFromZset");
    long start = pageNumber * (long) itemPerPage;
    long end = start + itemPerPage - 1;

    return rqueueMessageTemplate.readFromZset(name, start, end).stream()
        .map(e -> new DefaultTypedTuple<>(e, null))
        .collect(Collectors.toList());
  }

  private List<TypedTuple<RqueueMessage>> readFromList(
      String name, int pageNumber, int itemPerPage) {
    long start = pageNumber * (long) itemPerPage;
    long end = start + itemPerPage - 1;
    return rqueueMessageTemplate.readFromList(name, start, end).stream()
        .map(e -> new DefaultTypedTuple<>(e, null))
        .collect(Collectors.toList());
  }

  private List<TypedTuple<RqueueMessage>> readFromZetWithScore(
      String name, int pageNumber, int itemPerPage) {
    requireScheduledIntrospection("readFromZsetWithScore");
    long start = pageNumber * (long) itemPerPage;
    long end = start + itemPerPage - 1;
    return rqueueMessageTemplate.readFromZsetWithScore(name, start, end);
  }

  /**
   * Guard for ZSET-shaped lookups that the Redis backend services natively but no other backend
   * does. Backends that report {@code !supportsScheduledIntrospection()} surface a structured 501
   * via {@code BackendCapabilityException} instead of NPE-ing through a Redis-shaped template
   * with no Redis connection.
   */
  private void requireScheduledIntrospection(String op) {
    if (messageBroker != null && !messageBroker.capabilities().supportsScheduledIntrospection()) {
      throw new com.github.sonus21.rqueue.exception.BackendCapabilityException(
          messageBroker.getClass().getSimpleName(),
          op,
          "broker does not expose scheduled / completion ZSET introspection");
    }
  }

  private List<TableRow> buildRows(
      List<TypedTuple<RqueueMessage>> rqueueMessages, RowBuilder rowBuilder) {
    if (CollectionUtils.isEmpty(rqueueMessages)) {
      return Collections.emptyList();
    }
    Map<String, String> messageMetaIdToId = new HashMap<>();
    for (TypedTuple<RqueueMessage> tuple : rqueueMessages) {
      RqueueMessage rqueueMessage = tuple.getValue();
      assert rqueueMessage != null;
      String messageMetaId =
          RqueueMessageUtils.getMessageMetaId(rqueueMessage.getQueueName(), rqueueMessage.getId());
      messageMetaIdToId.put(messageMetaId, rqueueMessage.getId());
    }
    List<MessageMetadata> vals = rqueueMessageMetadataService.findAll(messageMetaIdToId.keySet());
    Map<String, Boolean> msgIdToDeleted = new HashMap<>();
    for (MessageMetadata messageMetadata : vals) {
      String messageMetaId = messageMetadata.getId();
      String id = messageMetaIdToId.get(messageMetaId);
      msgIdToDeleted.put(id, messageMetadata.isDeleted());
    }
    return rqueueMessages.stream()
        .map(e -> rowBuilder.row(
            e.getValue(), msgIdToDeleted.getOrDefault(e.getValue().getId(), false), e.getScore()))
        .collect(Collectors.toList());
  }

  private void addActionsIfRequired(
      String src,
      String name,
      DataType type,
      boolean scheduledQueue,
      boolean deadLetterQueue,
      boolean completionQueue,
      DataViewResponse response) {
    if (deadLetterQueue) {
      response.addAction(
          new Action(ActionType.DELETE, String.format("dead letter queue '%s'", name)));
    } else if (completionQueue) {
      response.addAction(
          new Action(ActionType.DELETE, String.format("Completed messages for queue '%s'", name)));
    } else if (type == DataType.LIST) {
      response.addAction(
          new Action(ActionType.DELETE, String.format("pending messages for queue '%s'", src)));
    } else if (scheduledQueue) {
      response.addAction(
          new Action(ActionType.DELETE, String.format("scheduled messages for queue '%s'", src)));
    }
  }

  @Override
  public DataViewResponse getExplorePageData(
      String src,
      String name,
      DataType type,
      String consumerName,
      int pageNumber,
      int itemPerPage) {
    QueueConfig queueConfig = rqueueSystemManagerService.getQueueConfig(src);
    DataViewResponse response = new DataViewResponse();
    if (queueConfig == null) {
      response.set(1, "Queue '" + src + "' does not exist");
      return response;
    }
    boolean deadLetterQueue = queueConfig.isDeadLetterQueue(name);
    boolean scheduledQueue = queueConfig.getScheduledQueueName().equals(name);
    boolean completionQueue = name.equals(queueConfig.getCompletedQueueName());
    // Surface broker capability hints so the UI can hide the corresponding panels.
    response.setHideScheduledPanel(brokerHidesScheduled());
    response.setHideCronJobs(brokerHidesCron());
    // When the broker does not support scheduled-queue introspection, return an empty
    // result set for the scheduled tab. The hideScheduledPanel flag (above) tells the
    // frontend to grey out / hide the panel.
    if (scheduledQueue && brokerHidesScheduled()) {
      response.setRows(Collections.emptyList());
      return response;
    }
    setHeadersIfRequired(deadLetterQueue, completionQueue, type, response, pageNumber);
    addActionsIfRequired(
        src, name, type, scheduledQueue, deadLetterQueue, completionQueue, response);
    // Prefer broker.peek() for the ready (LIST) queue when a non-Redis broker is configured.
    if (type == DataType.LIST && !deadLetterQueue && messageBroker != null) {
      QueueDetail qd = lookupQueueDetail(queueConfig.getName());
      if (qd != null) {
        long offset = (long) pageNumber * itemPerPage;
        List<RqueueMessage> peeked = messageBroker.peek(qd, consumerName, offset, itemPerPage);
        List<TypedTuple<RqueueMessage>> tuples = peeked.stream()
            .map(m -> (TypedTuple<RqueueMessage>) new DefaultTypedTuple<>(m, null))
            .collect(Collectors.toList());
        response.setRows(buildRows(tuples, new ListRowBuilder(false)));
        return response;
      }
    }
    switch (type) {
      case ZSET:
        if (scheduledQueue) {
          response.setRows(buildRows(
              readFromZset(name, pageNumber, itemPerPage), new ZsetRowBuilder(true, false)));
        } else if (completionQueue) {
          response.setRows(buildRows(
              readFromMessageMetadataStore(name, pageNumber, itemPerPage),
              new ZsetRowBuilder(false, true)));
        } else {
          response.setRows(buildRows(
              readFromZetWithScore(name, pageNumber, itemPerPage),
              new ZsetRowBuilder(false, false)));
        }
        break;
      case LIST:
        response.setRows(buildRows(
            readFromList(name, pageNumber, itemPerPage), new ListRowBuilder(deadLetterQueue)));
        break;
      default:
        throw new UnknownSwitchCase(type.name());
    }
    return response;
  }

  private List<TypedTuple<RqueueMessage>> readFromMessageMetadataStore(
      String name, int pageNumber, int itemPerPage) {
    long start = pageNumber * (long) itemPerPage;
    long end = start + itemPerPage - 1;
    List<TypedTuple<MessageMetadata>> mes =
        rqueueMessageMetadataService.readMessageMetadataForQueue(name, start, end);
    if (CollectionUtils.isEmpty(mes)) {
      return Collections.emptyList();
    }
    return mes.stream()
        .map(e -> new DefaultTypedTuple<>(
            Objects.requireNonNull(e.getValue()).getRqueueMessage(),
            (double) e.getValue().getUpdatedOn()))
        .collect(Collectors.toList());
  }

  @Override
  public DataViewResponse viewData(
      String name, DataType type, String key, int pageNumber, int itemPerPage) {
    if (StringUtils.isEmpty(name)) {
      return DataViewResponse.createErrorMessage("Data name cannot be empty.");
    }
    if (DataType.isUnknown(type)) {
      return DataViewResponse.createErrorMessage("Data type is not provided.");
    }
    // Delegate the per-type dispatch to the storage layer. Backends without arbitrary keyed
    // reads (NATS) throw BackendCapabilityException → 501; the web advice surfaces it.
    return messageBrowsingRepository.viewData(
        clean(name), type, key == null ? null : clean(key), pageNumber, itemPerPage);
  }

  private void setHeadersIfRequired(
      boolean deadLetterQueue,
      boolean completedQueue,
      DataType type,
      DataViewResponse response,
      int pageNumber) {
    if (pageNumber != 0) {
      return;
    }
    List<String> headers = newArrayList("Id", "Message", "Type");
    if (DataType.ZSET == type && !completedQueue) {
      headers.add("Time Left");
    }
    if (deadLetterQueue) {
      headers.add("Added On");
    } else if (completedQueue) {
      headers.add("Completed On");
    } else {
      headers.add("Action");
    }
    response.setHeaders(headers);
  }

  @Override
  public List<List<Object>> getRunningTasks() {
    // Brokers that manage in-flight tracking internally (e.g. NATS JetStream durable consumers)
    // have no separate processing ZSET to report on. Surface an empty table with just the header
    // row so the home dashboard shows the section but doesn't render a column of zeros.
    if (brokerHidesRunning()) {
      return emptyTable("Processing");
    }
    return bulkSizeTable(
        rqueueSystemManagerService.getSortedQueueConfigs(),
        QueueConfig::getProcessingQueueName,
        DataType.ZSET,
        "Processing [ZSET]");
  }

  @Override
  public List<List<Object>> getWaitingTasks() {
    return bulkSizeTable(
        rqueueSystemManagerService.getSortedQueueConfigs(),
        QueueConfig::getQueueName,
        DataType.LIST,
        "Queue [LIST]");
  }

  @Override
  public List<List<Object>> getScheduledTasks() {
    // Brokers without scheduled-queue introspection (e.g. NATS JetStream) have no scheduled ZSET.
    // Return an empty table so the home dashboard doesn't query an absent data structure.
    if (brokerHidesScheduled()) {
      return emptyTable("Scheduled");
    }
    return bulkSizeTable(
        rqueueSystemManagerService.getSortedQueueConfigs(),
        QueueConfig::getScheduledQueueName,
        DataType.ZSET,
        "Scheduled [ZSET]");
  }

  /**
   * Header-only table used when a broker capability suppresses an entire section (e.g.
   * NATS hiding the running / scheduled rows). The frontend renders the column header and
   * no body rows.
   */
  private List<List<Object>> emptyTable(String section) {
    List<List<Object>> rows = new ArrayList<>();
    rows.add(Arrays.asList("Queue", section, "Number of Messages"));
    return rows;
  }

  /**
   * Render the home-dashboard "queue / data-name / count" 3-column table for a per-queue data
   * structure. The repository's {@link MessageBrowsingRepository#getDataSizes(List, List)} is
   * expected to pipeline on Redis; NATS returns zeros.
   */
  private List<List<Object>> bulkSizeTable(
      List<QueueConfig> queueConfigs,
      java.util.function.Function<QueueConfig, String> nameExtractor,
      DataType dataType,
      String columnLabel) {
    List<List<Object>> rows = new ArrayList<>();
    rows.add(Arrays.asList("Queue", columnLabel, "Number of Messages"));
    if (CollectionUtils.isEmpty(queueConfigs)) {
      return rows;
    }
    List<String> names = new ArrayList<>(queueConfigs.size());
    List<DataType> types = new ArrayList<>(queueConfigs.size());
    for (QueueConfig queueConfig : queueConfigs) {
      names.add(nameExtractor.apply(queueConfig));
      types.add(dataType);
    }
    List<Long> sizes = messageBrowsingRepository.getDataSizes(names, types);
    for (int i = 0; i < queueConfigs.size(); i++) {
      rows.add(Arrays.asList(queueConfigs.get(i).getName(), names.get(i), sizes.get(i)));
    }
    return rows;
  }

  private void addRows(
      List<Long> result,
      List<List<Object>> rows,
      List<Entry<QueueConfig, String>> queueConfigAndDlq) {
    for (int i = 0, j = 0; i < queueConfigAndDlq.size(); i++) {
      Entry<QueueConfig, String> entry = queueConfigAndDlq.get(i);
      QueueConfig queueConfig = entry.getKey();
      if (entry.getValue().isEmpty()) {
        rows.add(Arrays.asList(queueConfig.getName(), Constants.BLANK, Constants.BLANK));
      } else {
        String name = Constants.BLANK;
        if (i == 0
            || !queueConfig
                .getQueueName()
                .equals(queueConfigAndDlq.get(i - 1).getKey().getQueueName())) {
          name = queueConfig.getName();
        }
        rows.add(Arrays.asList(name, entry.getValue(), result.get(j++)));
      }
    }
  }

  @Override
  public List<List<Object>> getDeadLetterTasks() {
    List<QueueConfig> queueConfigs = rqueueSystemManagerService.getSortedQueueConfigs();
    List<Entry<QueueConfig, String>> queueConfigAndDlq = new ArrayList<>();
    for (QueueConfig queueConfig : queueConfigs) {
      if (queueConfig.hasDeadLetterQueue()) {
        for (DeadLetterQueue dlq : queueConfig.getDeadLetterQueues()) {
          queueConfigAndDlq.add(new HashMap.SimpleEntry<>(queueConfig, dlq.getName()));
        }
      } else {
        queueConfigAndDlq.add(new HashMap.SimpleEntry<>(queueConfig, Constants.BLANK));
      }
    }
    List<List<Object>> rows = new ArrayList<>();
    List<Long> result = new ArrayList<>();
    if (!CollectionUtils.isEmpty(queueConfigAndDlq)) {
      List<String> dlqNames = new ArrayList<>();
      List<DataType> dlqTypes = new ArrayList<>();
      for (Entry<QueueConfig, String> entry : queueConfigAndDlq) {
        if (!entry.getValue().isEmpty()) {
          dlqNames.add(entry.getValue());
          dlqTypes.add(DataType.LIST);
        }
      }
      result = messageBrowsingRepository.getDataSizes(dlqNames, dlqTypes);
    }
    rows.add(Arrays.asList("Queue", "Dead Letter Queues [LIST]", "Number of Messages"));
    addRows(result, rows, queueConfigAndDlq);
    return rows;
  }

  @Override
  public List<RqueueWorkerPollerView> getQueueWorkers(String queueName) {
    return rqueueWorkerRegistry.getQueueWorkers(queueName);
  }

  // -------------------------------------------------------------------------
  // Subscriber + Terminal Storage rows (new queue-detail UI)
  // -------------------------------------------------------------------------

  /**
   * Build the per-subscriber rows that drive the new "Subscribers" section. Joins broker SPI
   * data ({@link MessageBroker#subscribers}) with last-active info from the worker registry,
   * keyed on {@code consumerName}. Falls back to a single anonymous row when the queue has
   * no registered handlers (e.g. a producer-only deployment).
   */
  @Override
  public List<SubscriberRow> getSubscriberRows(QueueConfig queueConfig) {
    if (queueConfig == null) {
      return Collections.emptyList();
    }
    QueueDetail brokerQueueDetail =
        messageBroker != null ? lookupQueueDetail(queueConfig.getName()) : null;
    List<SubscriberView> views = brokerSubscribers(queueConfig, brokerQueueDetail);
    if (views.isEmpty()) {
      return Collections.emptyList();
    }
    String storageName =
        brokerQueueDetail != null && messageBroker.storageDisplayName(brokerQueueDetail) != null
            ? messageBroker.storageDisplayName(brokerQueueDetail)
            : queueConfig.getQueueName();
    String label = brokerLabel(NavTab.PENDING, DataType.LIST);
    Map<String, RqueueWorkerPollerView> workersByConsumer =
        indexWorkersByConsumer(queueConfig.getName());
    Map<String, Integer> workerCountByConsumer = countWorkersByConsumer(queueConfig.getName());
    List<SubscriberRow> rows = new ArrayList<>(views.size());
    long now = System.currentTimeMillis();
    for (SubscriberView v : views) {
      SubscriberRow.SubscriberRowBuilder builder = SubscriberRow.builder()
          .consumerName(v.consumerName())
          .typeLabel(label)
          .storageName(storageName)
          .dataType(DataType.LIST)
          .pending(v.pending())
          .pendingShared(v.pendingShared())
          .inFlight(v.inFlight())
          .workerCount(workerCountByConsumer.getOrDefault(v.consumerName(), 0));
      RqueueWorkerPollerView w = workersByConsumer.get(v.consumerName());
      if (w != null) {
        builder
            .status(w.getStatus())
            .host(w.getHost())
            .pid(w.getPid())
            .lastPollAt(w.getLastPollAt());
        if (w.getLastPollAt() > 0) {
          builder.lastPollAge(DateTimeUtils.milliToHumanRepresentation(now - w.getLastPollAt()));
        }
      }
      rows.add(builder.build());
    }
    return rows;
  }

  /**
   * Count the live worker threads bucketed by {@code consumerName}. Mirrors the bucketing that
   * {@link #indexWorkersByConsumer(String)} does but keeps the count instead of collapsing to
   * the most-recently polling worker — surfaces the {@code @RqueueListener.concurrency} fanout
   * separately from the row's representative worker (host / pid / lastPollAt).
   */
  private Map<String, Integer> countWorkersByConsumer(String queueName) {
    List<RqueueWorkerPollerView> workers = rqueueWorkerRegistry.getQueueWorkers(queueName);
    if (CollectionUtils.isEmpty(workers)) {
      return Collections.emptyMap();
    }
    Map<String, Integer> out = new HashMap<>();
    for (RqueueWorkerPollerView w : workers) {
      String key = w.getConsumerName();
      if (key == null || key.isEmpty()) {
        continue;
      }
      out.merge(key, 1, Integer::sum);
    }
    return out;
  }

  private List<SubscriberView> brokerSubscribers(
      QueueConfig queueConfig, QueueDetail brokerQueueDetail) {
    if (brokerQueueDetail != null && messageBroker != null) {
      try {
        List<SubscriberView> views = messageBroker.subscribers(brokerQueueDetail);
        if (views != null && !views.isEmpty()) {
          return views;
        }
      } catch (RuntimeException ignored) {
        // fall through to producer-only path
      }
    }
    // No active QueueDetail registered (producer-only or shutdown). Surface a single row so
    // the operator at least sees the queue's pending count from the repository fallback.
    Long pending = messageBrowsingRepository.getDataSize(queueConfig.getQueueName(), DataType.LIST);
    if (pending == null || pending <= 0) {
      return Collections.emptyList();
    }
    return Collections.singletonList(new SubscriberView(queueConfig.getName(), pending, 0L, true));
  }

  private Map<String, RqueueWorkerPollerView> indexWorkersByConsumer(String queueName) {
    List<RqueueWorkerPollerView> workers = rqueueWorkerRegistry.getQueueWorkers(queueName);
    if (CollectionUtils.isEmpty(workers)) {
      return Collections.emptyMap();
    }
    Map<String, RqueueWorkerPollerView> out = new HashMap<>(workers.size());
    for (RqueueWorkerPollerView w : workers) {
      String key = w.getConsumerName();
      if (key == null || key.isEmpty()) {
        continue;
      }
      RqueueWorkerPollerView existing = out.get(key);
      if (existing == null || w.getLastPollAt() > existing.getLastPollAt()) {
        out.put(key, w);
      }
    }
    return out;
  }

  /**
   * Terminal-storage rows: COMPLETED set + each DLQ. These are shared across subscribers, so
   * they live in their own table rather than being repeated on every subscriber row.
   */
  @Override
  public List<TerminalStorageRow> getTerminalRows(QueueConfig queueConfig) {
    if (queueConfig == null) {
      return Collections.emptyList();
    }
    List<TerminalStorageRow> out = new ArrayList<>();
    QueueDetail brokerQueueDetail =
        messageBroker != null ? lookupQueueDetail(queueConfig.getName()) : null;
    if (rqueueConfig.messageInTerminalStateShouldBeStored()
        && !StringUtils.isEmpty(queueConfig.getCompletedQueueName())) {
      Long completed =
          messageBrowsingRepository.getDataSize(queueConfig.getCompletedQueueName(), DataType.ZSET);
      String completedDisplayName =
          brokerQueueDetail != null && messageBroker.storageDisplayName(brokerQueueDetail) != null
              ? messageBroker.storageDisplayName(brokerQueueDetail)
              : queueConfig.getCompletedQueueName();
      out.add(TerminalStorageRow.builder()
          .tab(NavTab.COMPLETED)
          .typeLabel(brokerLabel(NavTab.COMPLETED, DataType.ZSET))
          .storageName(completedDisplayName)
          .dataType(DataType.ZSET)
          .size(completed == null ? 0L : completed)
          .build());
    }
    if (!CollectionUtils.isEmpty(queueConfig.getDeadLetterQueues())) {
      for (DeadLetterQueue dlq : queueConfig.getDeadLetterQueues()) {
        String dlqDisplayName = brokerQueueDetail != null
                && messageBroker.dlqStorageDisplayName(brokerQueueDetail) != null
            ? messageBroker.dlqStorageDisplayName(brokerQueueDetail)
            : dlq.getName();
        long size;
        if (dlq.isConsumerEnabled()) {
          size = -1L;
        } else {
          Long dlqSize = messageBrowsingRepository.getDataSize(dlq.getName(), DataType.LIST);
          size = dlqSize == null ? 0L : dlqSize;
        }
        out.add(TerminalStorageRow.builder()
            .tab(NavTab.DEAD)
            .typeLabel(brokerLabel(NavTab.DEAD, DataType.LIST))
            .storageName(dlqDisplayName)
            .dataType(DataType.LIST)
            .size(size)
            .build());
      }
    }
    return out;
  }

  @Override
  public Mono<DataViewResponse> getReactiveExplorePageData(
      String src,
      String name,
      DataType type,
      String consumerName,
      int pageNumber,
      int itemPerPage) {
    return Mono.just(getExplorePageData(src, name, type, consumerName, pageNumber, itemPerPage));
  }

  @Override
  public Mono<DataViewResponse> viewReactiveData(
      String name, DataType type, String key, int pageNumber, int itemPerPage) {
    return Mono.just(viewData(name, type, key, pageNumber, itemPerPage));
  }

  private interface RowBuilder {

    default TableRow getRow(RqueueMessage rqueueMessage) {
      TableRow row = new TableRow(new TableColumn(rqueueMessage.getId()));
      TableColumn column = new TableColumn(rqueueMessage.toString());
      column.setMeta(Collections.singletonList(
          new RowColumnMeta(RowColumnMetaType.JOBS_BUTTON, rqueueMessage.getId())));
      row.addColumn(column);
      if (rqueueMessage.isPeriodic()) {
        row.addColumn(new TableColumn("Periodic(" + rqueueMessage.getPeriod() + ")Ms"));
      } else {
        row.addColumn(new TableColumn("Simple"));
      }
      return row;
    }

    TableRow row(RqueueMessage rqueueMessage, boolean deleted, Double score);
  }

  private static class ListRowBuilder implements RowBuilder {

    private final boolean deadLetterQueue;

    ListRowBuilder(boolean deadLetterQueue) {
      this.deadLetterQueue = deadLetterQueue;
    }

    @Override
    public TableRow row(RqueueMessage rqueueMessage, boolean deleted, Double score) {
      TableRow tableRow = getRow(rqueueMessage);
      if (!deadLetterQueue) {
        if (deleted) {
          tableRow.addColumn(new TableColumn(Constants.BLANK));
        } else {
          tableRow.addColumn(new TableColumn(TableColumnType.ACTION, ActionType.DELETE));
        }
      } else {
        tableRow.addColumn(
            new TableColumn(DateTimeUtils.formatMilliToString(rqueueMessage.getReEnqueuedAt())));
      }
      return tableRow;
    }
  }

  private static class ZsetRowBuilder implements RowBuilder {

    private final long currentTime;
    private final boolean scheduledQueue;
    private final boolean completionQueue;

    ZsetRowBuilder(boolean scheduledQueue, boolean completionQueue) {
      this.scheduledQueue = scheduledQueue;
      this.completionQueue = completionQueue;
      this.currentTime = System.currentTimeMillis();
    }

    @Override
    public TableRow row(RqueueMessage rqueueMessage, boolean deleted, Double score) {
      TableRow row = getRow(rqueueMessage);
      if (scheduledQueue) {
        row.addColumn(new TableColumn(
            DateTimeUtils.milliToHumanRepresentation(rqueueMessage.getProcessAt() - currentTime)));
      } else if (completionQueue) {
        row.addColumn(new TableColumn(DateTimeUtils.milliToHumanRepresentation(
            System.currentTimeMillis() - score.longValue())));
      } else {
        row.addColumn(new TableColumn(
            DateTimeUtils.milliToHumanRepresentation(score.longValue() - currentTime)));
      }
      if (!completionQueue) {
        if (!deleted) {
          row.addColumn(new TableColumn(
              TableColumnType.ACTION,
              scheduledQueue && !rqueueMessage.isPeriodic()
                  ? ActionType.ENQUEUE
                  : ActionType.DELETE));
        } else {
          row.addColumn(new TableColumn(Constants.BLANK));
        }
      }
      return row;
    }
  }
}
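
The two worker-bucketing helpers in this file (`indexWorkersByConsumer` and `countWorkersByConsumer`) can be hard to follow without the surrounding types. The standalone sketch below reproduces the same idea under simplifying assumptions: `WorkerIndexSketch` and `WorkerView` are hypothetical stand-ins (not part of rqueue) that carry only a consumer name and a last-poll timestamp. One pass keeps the most recently polling worker per consumer; the other keeps the per-consumer worker count. Workers with a null or empty consumer name are skipped in both, as in the methods above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WorkerIndexSketch {

  /** Hypothetical stand-in for RqueueWorkerPollerView; only the fields used here. */
  static final class WorkerView {
    final String consumerName;
    final long lastPollAt;

    WorkerView(String consumerName, long lastPollAt) {
      this.consumerName = consumerName;
      this.lastPollAt = lastPollAt;
    }
  }

  /** Keep, per consumer name, the worker with the largest lastPollAt. */
  static Map<String, WorkerView> indexByConsumer(List<WorkerView> workers) {
    Map<String, WorkerView> out = new HashMap<>();
    for (WorkerView w : workers) {
      if (w.consumerName == null || w.consumerName.isEmpty()) {
        continue; // unnamed workers cannot be joined to a subscriber row
      }
      // Replace the existing entry only when the new worker polled strictly later.
      out.merge(w.consumerName, w, (old, cur) -> cur.lastPollAt > old.lastPollAt ? cur : old);
    }
    return out;
  }

  /** Count workers per consumer name, skipping unnamed workers. */
  static Map<String, Integer> countByConsumer(List<WorkerView> workers) {
    Map<String, Integer> out = new HashMap<>();
    for (WorkerView w : workers) {
      if (w.consumerName == null || w.consumerName.isEmpty()) {
        continue;
      }
      out.merge(w.consumerName, 1, Integer::sum);
    }
    return out;
  }

  public static void main(String[] args) {
    List<WorkerView> workers = Arrays.asList(
        new WorkerView("email", 100L),
        new WorkerView("email", 250L),
        new WorkerView("sms", 50L),
        new WorkerView("", 999L));
    Map<String, WorkerView> index = indexByConsumer(workers);
    Map<String, Integer> counts = countByConsumer(workers);
    for (String consumer : index.keySet()) {
      System.out.println(consumer
          + " lastPollAt=" + index.get(consumer).lastPollAt
          + " workers=" + counts.get(consumer));
    }
  }
}
```

Keeping the two maps separate lets a dashboard row show one representative worker (host, pid, last poll) while still reporting how many workers serve that consumer.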