• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25600409404

09 May 2026 11:49AM UTC coverage: 83.347%. First build
25600409404

Pull #295

github

web-flow
Merge ace5e3c2a into 9757517ae
Pull Request #295: Nats v2 web

2564 of 3407 branches covered (75.26%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

7711 of 8921 relevant lines covered (86.44%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.79
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBroker.java
1
/*
2
 * Copyright (c) 2020-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core.spi.redis;
18

19
import com.github.sonus21.rqueue.core.RqueueMessage;
20
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
21
import com.github.sonus21.rqueue.core.spi.Capabilities;
22
import com.github.sonus21.rqueue.core.spi.MessageBroker;
23
import com.github.sonus21.rqueue.listener.QueueDetail;
24
import com.github.sonus21.rqueue.models.MessageMoveResult;
25
import com.github.sonus21.rqueue.utils.RedisUtils;
26
import java.time.Duration;
27
import java.util.List;
28
import java.util.function.Consumer;
29
import org.springframework.data.redis.connection.Message;
30
import org.springframework.data.redis.connection.MessageListener;
31
import org.springframework.data.redis.core.RedisTemplate;
32
import org.springframework.data.redis.listener.ChannelTopic;
33
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
34
import reactor.core.publisher.Mono;
35

36
/**
37
 * Default {@link MessageBroker} implementation that delegates to the existing Redis-backed
38
 * code path via {@link RqueueMessageTemplate}.
39
 *
40
 * <p>This is a thin Phase 1 wrapper: every method routes to the same call site that the existing
41
 * public API uses. No Lua scripts, DAO impls, or message flows are duplicated here. The intent is
42
 * to introduce the SPI seam without changing observable Redis behavior.
43
 */
44
public class RedisMessageBroker implements MessageBroker {
45

46
  private final RqueueMessageTemplate template;
47
  private final RedisMessageListenerContainer pubSubContainer;
48

49
  public RedisMessageBroker(RqueueMessageTemplate template) {
50
    this(template, null);
1✔
51
  }
1✔
52

53
  public RedisMessageBroker(
54
      RqueueMessageTemplate template, RedisMessageListenerContainer pubSubContainer) {
1✔
55
    if (template == null) {
1!
56
      throw new IllegalArgumentException("template cannot be null");
×
57
    }
58
    this.template = template;
1✔
59
    this.pubSubContainer = pubSubContainer;
1✔
60
  }
1✔
61

62
  public RqueueMessageTemplate getTemplate() {
63
    return template;
×
64
  }
65

66
  @Override
67
  public void enqueue(QueueDetail q, RqueueMessage m) {
68
    template.addMessage(q.getQueueName(), m);
1✔
69
  }
1✔
70

71
  @Override
72
  public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) {
73
    // Delegate to existing scheduled-queue add path; processAt is encoded on the message.
74
    template.addMessageWithDelay(q.getScheduledQueueName(), q.getScheduledQueueChannelName(), m);
1✔
75
  }
1✔
76

77
  /**
78
   * Override the SPI default (which wraps the blocking call in {@code Mono.fromRunnable}) so
79
   * reactive callers stay on the reactive Redis driver and never block a thread.
80
   */
81
  @Override
82
  public Mono<Void> enqueueReactive(QueueDetail q, RqueueMessage m) {
83
    return template.addReactiveMessage(q.getQueueName(), m).then();
1✔
84
  }
85

86
  /** Reactive scheduled-queue equivalent of {@link #enqueueWithDelay}. */
87
  @Override
88
  public Mono<Void> enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long delayMs) {
89
    return template
1✔
90
        .addReactiveMessageWithDelay(q.getScheduledQueueName(), q.getScheduledQueueChannelName(), m)
1✔
91
        .then();
1✔
92
  }
93

94
  @Override
95
  public List<RqueueMessage> pop(QueueDetail q, String consumerName, int batch, Duration wait) {
96
    return template.pop(
1✔
97
        q.getQueueName(),
1✔
98
        q.getProcessingQueueName(),
1✔
99
        q.getProcessingQueueChannelName(),
1✔
100
        q.getVisibilityTimeout(),
1✔
101
        batch);
102
  }
103

104
  @Override
105
  public boolean ack(QueueDetail q, RqueueMessage m) {
106
    Long removed = template.removeElementFromZset(q.getProcessingQueueName(), m);
1✔
107
    return removed != null && removed > 0;
1!
108
  }
109

110
  @Override
111
  public boolean nack(QueueDetail q, RqueueMessage m, long retryDelayMs) {
112
    if (retryDelayMs <= 0) {
1✔
113
      template.moveMessage(q.getProcessingQueueName(), q.getQueueName(), m, m);
1✔
114
    } else {
115
      template.moveMessageWithDelay(
1✔
116
          q.getProcessingQueueName(), q.getScheduledQueueName(), m, m, retryDelayMs);
1✔
117
    }
118
    return true;
1✔
119
  }
120

121
  @Override
122
  public long moveExpired(QueueDetail q, long now, int batch) {
123
    MessageMoveResult result =
1✔
124
        template.moveMessageZsetToList(q.getScheduledQueueName(), q.getQueueName(), batch);
1✔
125
    return result == null ? 0L : result.getNumberOfMessages();
1!
126
  }
127

128
  @Override
129
  public List<RqueueMessage> peek(QueueDetail q, long offset, long count) {
130
    long end = (count <= 0) ? -1L : offset + count - 1;
1!
131
    return template.readFromList(q.getQueueName(), offset, end);
1✔
132
  }
133

134
  @Override
135
  public long size(QueueDetail q) {
136
    RedisTemplate<String, RqueueMessage> rt = template.getTemplate();
1✔
137
    Long size = rt.opsForList().size(q.getQueueName());
1✔
138
    return size == null ? 0L : size;
1!
139
  }
140

141
  /**
142
   * Per-subscriber rows for the Redis backend. Walks the {@link
143
   * com.github.sonus21.rqueue.core.EndpointRegistry} for every {@code @RqueueListener}
144
   * registered against this queue, then reports the shared list size and processing-ZSET
145
   * size on each row. {@code pendingShared = true} on every row because Redis listeners
146
   * compete on the same backing list — the figure is identical across rows but surfacing
147
   * each handler still tells the operator which methods are subscribed and (joined with
148
   * the worker registry by the dashboard) when each was last active.
149
   */
150
  @Override
151
  public java.util.List<com.github.sonus21.rqueue.core.spi.SubscriberView> subscribers(
152
      QueueDetail q) {
153
    long sharedPending;
154
    try {
155
      sharedPending = size(q);
1✔
NEW
156
    } catch (RuntimeException e) {
×
NEW
157
      sharedPending = 0L;
×
158
    }
1✔
159
    long sharedInFlight;
160
    try {
161
      RedisTemplate<String, RqueueMessage> rt = template.getTemplate();
1✔
162
      Long zsetSize = rt.opsForZSet().size(q.getProcessingQueueName());
1✔
163
      sharedInFlight = zsetSize == null ? 0L : zsetSize;
1!
NEW
164
    } catch (RuntimeException e) {
×
NEW
165
      sharedInFlight = 0L;
×
166
    }
1✔
167
    java.util.List<QueueDetail> registered =
1✔
168
        com.github.sonus21.rqueue.core.EndpointRegistry.getAllForQueue(q.getName());
1✔
169
    if (registered.isEmpty()) {
1!
170
      // Queue is registered as primary only (no secondary handlers) — fall through to a
171
      // single row using the queue's own consumer name so the table still renders.
NEW
172
      return java.util.Collections.singletonList(
×
173
          new com.github.sonus21.rqueue.core.spi.SubscriberView(
NEW
174
              q.resolvedConsumerName(), sharedPending, sharedInFlight, true));
×
175
    }
176
    java.util.List<com.github.sonus21.rqueue.core.spi.SubscriberView> out =
1✔
177
        new java.util.ArrayList<>(registered.size());
1✔
178
    for (QueueDetail qd : registered) {
1✔
179
      out.add(new com.github.sonus21.rqueue.core.spi.SubscriberView(
1✔
180
          qd.resolvedConsumerName(), sharedPending, sharedInFlight, true));
1✔
181
    }
1✔
182
    return out;
1✔
183
  }
184

185
  @Override
186
  public AutoCloseable subscribe(String channel, Consumer<String> handler) {
187
    if (pubSubContainer == null) {
1!
188
      throw new IllegalStateException(
×
189
          "RedisMessageListenerContainer not configured for RedisMessageBroker; subscribe is"
190
              + " unavailable");
191
    }
192
    final ChannelTopic topic = new ChannelTopic(channel);
1✔
193
    final MessageListener listener = new MessageListener() {
1✔
194
      @Override
195
      public void onMessage(Message message, byte[] pattern) {
196
        byte[] body = message.getBody();
1✔
197
        if (body == null) {
1!
198
          return;
×
199
        }
200
        handler.accept(new String(body));
1✔
201
      }
1✔
202
    };
203
    pubSubContainer.addMessageListener(listener, topic);
1✔
204
    return () -> pubSubContainer.removeMessageListener(listener, topic);
1✔
205
  }
206

207
  @Override
208
  public void publish(String channel, String payload) {
209
    template.getTemplate().convertAndSend(channel, payload);
1✔
210
  }
1✔
211

212
  @Override
213
  public void parkForRetry(QueueDetail q, RqueueMessage old, RqueueMessage updated, long delayMs) {
214
    if (delayMs <= 0) {
1!
215
      template.moveMessage(q.getProcessingQueueName(), q.getQueueName(), old, updated);
×
216
    } else {
217
      template.moveMessageWithDelay(
1✔
218
          q.getProcessingQueueName(), q.getScheduledQueueName(), old, updated, delayMs);
1✔
219
    }
220
  }
1✔
221

222
  @Override
223
  public void moveToDlq(
224
      QueueDetail source,
225
      String targetQueue,
226
      RqueueMessage old,
227
      RqueueMessage updated,
228
      long delayMs) {
229
    RedisUtils.executePipeLine(
1✔
230
        template.getTemplate(), (connection, keySerializer, valueSerializer) -> {
1✔
231
          byte[] updatedBytes = valueSerializer.serialize(updated);
1✔
232
          byte[] oldBytes = valueSerializer.serialize(old);
1✔
233
          byte[] processingQueueBytes = keySerializer.serialize(source.getProcessingQueueName());
1✔
234
          byte[] targetQueueBytes = keySerializer.serialize(targetQueue);
1✔
235
          if (delayMs > 0) {
1✔
236
            connection.zAdd(targetQueueBytes, delayMs, updatedBytes);
1✔
237
          } else {
238
            connection.lPush(targetQueueBytes, updatedBytes);
1✔
239
          }
240
          connection.zRem(processingQueueBytes, oldBytes);
1✔
241
        });
1✔
242
  }
1✔
243

244
  @Override
245
  public void scheduleNext(
246
      QueueDetail q, String messageKey, RqueueMessage message, long expirySeconds) {
247
    template.scheduleMessage(q.getScheduledQueueName(), messageKey, message, expirySeconds);
1✔
248
  }
1✔
249

250
  @Override
251
  public Long getVisibilityTimeoutScore(QueueDetail q, RqueueMessage m) {
252
    return template.getScore(q.getProcessingQueueName(), m);
1✔
253
  }
254

255
  @Override
256
  public boolean extendVisibilityTimeout(QueueDetail q, RqueueMessage m, long deltaMs) {
257
    return template.addScore(q.getProcessingQueueName(), m, deltaMs);
1✔
258
  }
259

260
  @Override
261
  public Capabilities capabilities() {
262
    return Capabilities.REDIS_DEFAULTS;
1✔
263
  }
264
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc