• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25600409404

09 May 2026 11:49AM UTC coverage: 83.347%. First build
25600409404

Pull #295

github

web-flow
Merge ace5e3c2a into 9757517ae
Pull Request #295: Nats v2 web

2564 of 3407 branches covered (75.26%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

7711 of 8921 relevant lines covered (86.44%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.18
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java
1
/*
2
 * Copyright (c) 2020-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core.spi;
18

19
import com.github.sonus21.rqueue.core.RqueueMessage;
20
import com.github.sonus21.rqueue.listener.QueueDetail;
21
import java.time.Duration;
22
import java.util.List;
23
import java.util.function.Consumer;
24
import reactor.core.publisher.Mono;
25

26
/**
27
 * Internal SPI. Subject to change. Application code must not depend on this directly.
28
 */
29
public interface MessageBroker {
30
  void enqueue(QueueDetail q, RqueueMessage m);
31

32
  /**
33
   * Priority-aware enqueue overload. Implementations that route to a per-priority destination
34
   * (e.g. a NATS subject suffixed with the priority name) override this. The default delegates
35
   * to {@link #enqueue(QueueDetail, RqueueMessage)} so backends without per-priority routing
36
   * (Redis already encodes priority in the queue name) keep their existing behavior.
37
   *
38
   * @param q queue detail (already priority-suffixed for backends that key off queue name)
39
   * @param priority priority name as declared on {@code @RqueueListener.priority}; may be
40
   *     {@code null} or empty for the default priority bucket
41
   * @param m message to publish
42
   */
43
  default void enqueue(QueueDetail q, String priority, RqueueMessage m) {
44
    enqueue(q, m);
1✔
45
  }
1✔
46

47
  void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs);
48

49
  /**
50
   * Called by {@code RqueueEndpointManager.registerQueue} after a queue is added to the registry.
51
   * Backends that need to provision resources (e.g. JetStream streams) at registration time should
52
   * override this. The default is a no-op so Redis and other backends are unaffected.
53
   */
54
  default void onQueueRegistered(QueueDetail q) {}
1✔
55

56
  /**
57
   * Validate the queue name against backend-specific rules. Called from every queue-registration
58
   * path ({@code RqueueEndpointManager.registerQueue} and the {@code @RqueueListener} bootstrap)
59
   * before the queue is added to the registry, so an illegal name fails fast with a clear error
60
   * instead of surfacing later as an opaque NATS / driver-side rejection.
61
   *
62
   * <p>Default is a no-op — backends like Redis accept any non-empty name.
63
   *
64
   * @throws IllegalArgumentException if {@code queueName} is not legal for this backend
65
   */
66
  default void validateQueueName(String queueName) {}
1✔
67

68
  /**
69
   * Reactive variant of {@link #enqueue(QueueDetail, RqueueMessage)}. The default falls back to the
70
   * blocking implementation wrapped in {@code Mono.fromRunnable}; backends with native async
71
   * publish APIs (e.g. JetStream) should override this to avoid blocking the calling thread.
72
   */
73
  default Mono<Void> enqueueReactive(QueueDetail q, RqueueMessage m) {
74
    return Mono.fromRunnable(() -> enqueue(q, m));
×
75
  }
76

77
  /**
78
   * Reactive variant of {@link #enqueueWithDelay(QueueDetail, RqueueMessage, long)}. The default
79
   * falls back to the blocking implementation. Backends that do not support delayed enqueue should
80
   * override this to return {@code Mono.error(new UnsupportedOperationException(...))}.
81
   */
82
  default Mono<Void> enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long delayMs) {
83
    return Mono.fromRunnable(() -> enqueueWithDelay(q, m, delayMs));
×
84
  }
85

86
  List<RqueueMessage> pop(QueueDetail q, String consumerName, int batch, Duration wait);
87

88
  /**
89
   * Priority-aware pop overload. Implementations that route to a per-priority stream/consumer
90
   * override this; the default delegates to
91
   * {@link #pop(QueueDetail, String, int, Duration)}.
92
   *
93
   * @param q queue detail
94
   * @param priority priority name; {@code null} or empty for the default bucket
95
   * @param consumerName durable consumer name (already priority-suffixed by the caller for
96
   *     backends that key off the consumer name)
97
   * @param batch maximum messages to fetch
98
   * @param wait fetch wait duration
99
   */
100
  default List<RqueueMessage> pop(
101
      QueueDetail q, String priority, String consumerName, int batch, Duration wait) {
102
    return pop(q, consumerName, batch, wait);
×
103
  }
104

105
  boolean ack(QueueDetail q, RqueueMessage m);
106

107
  boolean nack(QueueDetail q, RqueueMessage m, long retryDelayMs);
108

109
  long moveExpired(QueueDetail q, long now, int batch);
110

111
  List<RqueueMessage> peek(QueueDetail q, long offset, long count);
112

113
  /**
114
   * Consumer-aware peek overload. When {@code consumerName} is non-null and the backend has
115
   * per-consumer offsets (e.g. NATS Limits-retention streams), the implementation starts
116
   * pagination from that consumer's next undelivered sequence so the dashboard shows messages
117
   * still pending for that specific subscriber instead of the entire retained window. The
118
   * default delegates to {@link #peek(QueueDetail, long, long)} for backends with a single
119
   * shared pool.
120
   */
121
  default List<RqueueMessage> peek(QueueDetail q, String consumerName, long offset, long count) {
NEW
122
    return peek(q, offset, count);
×
123
  }
124

125
  /**
126
   * Remove {@code old} from the processing store and re-enqueue {@code updated} for retry.
127
   * {@code delayMs <= 0} means immediate; {@code delayMs > 0} means schedule after that delay.
128
   * Backends without a processing store (e.g. NATS) default to a plain nack.
129
   */
130
  default void parkForRetry(QueueDetail q, RqueueMessage old, RqueueMessage updated, long delayMs) {
131
    nack(q, updated, delayMs);
×
132
  }
×
133

134
  /**
135
   * Remove {@code old} from the processing store and enqueue {@code updated} to {@code targetQueue}.
136
   * {@code delayMs <= 0} means immediate (list push); {@code delayMs > 0} means schedule (sorted-set).
137
   * Backends without a processing store default to a plain enqueue to the DLQ.
138
   */
139
  default void moveToDlq(
140
      QueueDetail source,
141
      String targetQueue,
142
      RqueueMessage old,
143
      RqueueMessage updated,
144
      long delayMs) {
145
    if (delayMs > 0) {
×
146
      enqueueWithDelay(source, updated, delayMs);
×
147
    } else {
148
      enqueue(source, updated);
×
149
    }
150
  }
×
151

152
  /**
153
   * Schedule the next execution of a periodic message.
154
   * {@code messageKey} is the deduplication key; {@code expirySeconds} is the TTL for that key.
155
   * Backends that don't support server-side scheduling default to a delayed enqueue.
156
   */
157
  default void scheduleNext(
158
      QueueDetail q, String messageKey, RqueueMessage message, long expirySeconds) {
159
    long delayMs = Math.max(0, message.getProcessAt() - System.currentTimeMillis());
1✔
160
    enqueueWithDelay(q, message, delayMs);
1✔
161
  }
1✔
162

163
  /**
164
   * Returns the score (epoch-ms deadline) of {@code m} in the processing store, or {@code null}
165
   * if the backend does not track per-message visibility (e.g. NATS uses consumer-level AckWait).
166
   */
167
  default Long getVisibilityTimeoutScore(QueueDetail q, RqueueMessage m) {
168
    return null;
×
169
  }
170

171
  /**
172
   * Adds {@code deltaMs} to the visibility timeout of {@code m} in the processing store.
173
   * Returns {@code false} if the backend does not support per-message lease extension.
174
   */
175
  default boolean extendVisibilityTimeout(QueueDetail q, RqueueMessage m, long deltaMs) {
176
    return false;
×
177
  }
178

179
  long size(QueueDetail q);
180

181
  /**
182
   * Short label for the storage backend shown in the dashboard "Queue Storage Footprint" section
183
   * header (e.g. "Redis", "NATS"). Defaults to "Redis".
184
   */
185
  default String storageKicker() {
186
    return "Redis";
1✔
187
  }
188

189
  /**
190
   * One-line description for the storage backend shown below the footprint section heading.
191
   * Defaults to the Redis description.
192
   */
193
  default String storageDescription() {
194
    return "Underlying Redis structures for the queues visible on this page.";
1✔
195
  }
196

197
  /**
198
   * Display name for the primary storage unit backing the given queue's messages (pending,
199
   * in-flight, and completed). Returns {@code null} to fall back to the Redis key name.
200
   */
201
  default String storageDisplayName(QueueDetail q) {
202
    return null;
1✔
203
  }
204

205
  /**
206
   * Display name for the dead-letter storage unit of the given queue. Returns {@code null} to
207
   * fall back to the DLQ key name stored in {@code DeadLetterQueue}.
208
   */
209
  default String dlqStorageDisplayName(QueueDetail q) {
210
    return null;
1✔
211
  }
212

213
  /**
214
   * Indicates whether {@link #size(QueueDetail)} returns an exact count or an approximation
215
   * for the given queue. Brokers that compute pending from per-consumer position math (e.g.
216
   * NATS JetStream Limits-retention streams) return {@code true} so the dashboard renders
217
   * the figure with a "~" prefix instead of presenting it as authoritative. Defaults to
218
   * {@code false} (the historical Redis behavior — exact list / sorted-set lengths).
219
   */
220
  default boolean isSizeApproximate(QueueDetail q) {
221
    return false;
1✔
222
  }
223

224
  /**
225
   * Per-consumer pending breakdown for queues whose backend has multiple independent
226
   * subscribers — e.g. JetStream Limits-retention streams where each durable consumer
227
   * progresses at its own pace. Returns an ordered map of {@code consumerName -> pending}
228
   * so the dashboard can render one row per consumer instead of a single aggregate.
229
   *
230
   * <p>The default returns {@code null}, signalling that the queue has a single shared pool
231
   * (Redis lists, NATS WorkQueue streams) and the caller should fall back to
232
   * {@link #size(QueueDetail)}. Empty / null also means "no consumers attached".
233
   *
234
   * @deprecated superseded by {@link #subscribers(QueueDetail)} which returns a richer view
235
   *     (consumer + pending + in-flight + shared flag). Retained for one release so
236
   *     downstream callers keep compiling.
237
   */
238
  @Deprecated
239
  default java.util.Map<String, Long> consumerPendingSizes(QueueDetail q) {
240
    return null;
1✔
241
  }
242

243
  /**
244
   * Per-subscriber breakdown for the queue-detail dashboard. Each entry represents one
245
   * logical handler attached to the queue:
246
   *
247
   * <ul>
248
   *   <li><b>Redis</b> — one entry per {@code @RqueueListener} method that registered for
249
   *       the queue. {@code pending} is the shared list size on every row
250
   *       ({@code pendingShared = true}); {@code inFlight} is the shared processing-ZSET
251
   *       size.
252
   *   <li><b>NATS JetStream</b> — one entry per durable consumer. For WorkQueue retention
253
   *       {@code pending} is the shared stream {@code msgCount} ({@code pendingShared = true});
254
   *       for Limits retention it is the exact per-consumer {@code numPending}
255
   *       ({@code pendingShared = false}). {@code inFlight} is the consumer's
256
   *       {@code numAckPending} in both cases.
257
   * </ul>
258
   *
259
   * <p>The default returns a single anonymous row backed by {@link #size(QueueDetail)}, so
260
   * brokers that don't track named subscribers still render a working table.
261
   */
262
  default java.util.List<SubscriberView> subscribers(QueueDetail q) {
263
    long pending;
264
    try {
NEW
265
      pending = size(q);
×
NEW
266
    } catch (RuntimeException e) {
×
NEW
267
      pending = 0L;
×
NEW
268
    }
×
NEW
269
    return java.util.Collections.singletonList(
×
NEW
270
        new SubscriberView(q.resolvedConsumerName(), pending, 0L, true));
×
271
  }
272

273
  /**
274
   * Backend-aware human-readable label for the given Redis-shaped {@code DataType} on the given
275
   * dashboard tab. Surfaces in the queue-detail page's "Data Type" column so NATS deployments
276
   * can show "Queue (Stream)" instead of "LIST".
277
   *
278
   * <p>The default returns {@code null}, which the dashboard interprets as "fall back to
279
   * {@code DataType.name()}" (the historical Redis behavior).
280
   *
281
   * @param tab the dashboard nav tab the row corresponds to (PENDING, RUNNING, SCHEDULED, DEAD,
282
   *     COMPLETED, etc.). May be {@code null} when called in a context without a tab.
283
   * @param type Redis-shaped data type used by the dashboard's table rendering.
284
   * @return display label, or {@code null} to fall through to the default rendering.
285
   */
286
  default String dataTypeLabel(
287
      com.github.sonus21.rqueue.models.enums.NavTab tab,
288
      com.github.sonus21.rqueue.models.enums.DataType type) {
289
    return null;
1✔
290
  }
291

292
  AutoCloseable subscribe(String channel, Consumer<String> handler);
293

294
  void publish(String channel, String payload);
295

296
  Capabilities capabilities();
297
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc