• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 00429fde-922b-4d6e-8663-1c6d645932ac

13 Mar 2026 03:20PM UTC coverage: 88.627% (-0.7%) from 89.324%
00429fde-922b-4d6e-8663-1c6d645932ac

Pull #279

circleci

igorjava2025
add fixes after review
Pull Request #279: Add hard strict priority mode

1657 of 2001 branches covered (82.81%)

Branch coverage included in aggregate %.

65 of 120 new or added lines in 6 files covered. (54.17%)

4 existing lines in 2 files now uncovered.

5458 of 6027 relevant lines covered (90.56%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.96
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java
1
/*
2
 * Copyright (c) 2020-2023 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core.impl;
18

19
import static com.github.sonus21.rqueue.core.RedisScriptFactory.getScript;
20

21
import com.github.sonus21.rqueue.common.ReactiveRqueueRedisTemplate;
22
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
23
import com.github.sonus21.rqueue.core.RedisScriptFactory.ScriptType;
24
import com.github.sonus21.rqueue.core.RqueueMessage;
25
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
26
import com.github.sonus21.rqueue.models.MessageMoveResult;
27
import com.github.sonus21.rqueue.utils.Constants;
28
import com.github.sonus21.rqueue.utils.RedisUtils;
29
import java.util.ArrayList;
30
import java.util.Arrays;
31
import java.util.Collections;
32
import java.util.List;
33
import java.util.Optional;
34
import java.util.Set;
35
import lombok.extern.slf4j.Slf4j;
36
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
37
import org.springframework.data.redis.connection.RedisConnectionFactory;
38
import org.springframework.data.redis.core.RedisTemplate;
39
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
40
import org.springframework.data.redis.core.script.DefaultReactiveScriptExecutor;
41
import org.springframework.data.redis.core.script.DefaultScriptExecutor;
42
import org.springframework.data.redis.core.script.ReactiveScriptExecutor;
43
import org.springframework.data.redis.core.script.RedisScript;
44
import org.springframework.util.CollectionUtils;
45
import reactor.core.publisher.Flux;
46
import reactor.core.publisher.Mono;
47

48
/**
49
 * RqueueMessageTemplate is the core of the Rqueue, this deals with Redis calls.
50
 *
51
 * <p>It communicates with the Redis using Lua script and direct calls.
52
 */
53
@Slf4j
1✔
54
public class RqueueMessageTemplateImpl extends RqueueRedisTemplate<RqueueMessage>
55
    implements RqueueMessageTemplate {
56

57
  private final DefaultScriptExecutor<String> scriptExecutor;
58
  private final ReactiveScriptExecutor<String> reactiveScriptExecutor;
59
  private final ReactiveRqueueRedisTemplate<RqueueMessage> reactiveRedisTemplate;
60

61
  public RqueueMessageTemplateImpl(
62
      RedisConnectionFactory redisConnectionFactory,
63
      ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
64
    super(redisConnectionFactory);
1✔
65
    this.scriptExecutor = new DefaultScriptExecutor<>(redisTemplate);
1✔
66
    if (reactiveRedisConnectionFactory != null) {
1✔
67
      this.reactiveRedisTemplate =
1✔
68
          new ReactiveRqueueRedisTemplate<>(reactiveRedisConnectionFactory);
69
      this.reactiveScriptExecutor =
1✔
70
          new DefaultReactiveScriptExecutor<>(
71
              reactiveRedisConnectionFactory,
72
              RedisUtils.redisSerializationContextProvider.getSerializationContext());
1✔
73
    } else {
74
      this.reactiveScriptExecutor = null;
1✔
75
      this.reactiveRedisTemplate = null;
1✔
76
    }
77
  }
1✔
78

79
  @Override
80
  public List<RqueueMessage> pop(
81
      String queueName,
82
      String processingQueueName,
83
      String processingChannelName,
84
      long visibilityTimeout,
85
      int count) {
86
    if (count < Constants.MIN_BATCH_SIZE) {
1!
87
      throw new IllegalArgumentException(
×
88
          "Count must be greater than or equal to " + Constants.MIN_BATCH_SIZE);
89
    }
90
    long currentTime = System.currentTimeMillis();
1✔
91
    RedisScript<List<RqueueMessage>> script = getScript(ScriptType.DEQUEUE_MESSAGE);
1✔
92
    List<RqueueMessage> messages =
1✔
93
        scriptExecutor.execute(
1✔
94
            script,
95
            Arrays.asList(queueName, processingQueueName, processingChannelName),
1✔
96
            currentTime,
1✔
97
            currentTime + visibilityTimeout,
1✔
98
            count);
1✔
99
    log.debug("Pop Queue: {}, N: {}, Messages: {}", queueName, count, messages);
1✔
100
    return messages;
1✔
101
  }
102

103
  @Override
104
  public Long addMessageWithDelay(
105
      String delayQueueName, String delayQueueChannelName, RqueueMessage rqueueMessage) {
106
    log.debug("AddMessageWithDelay Queue: {}, Message: {}", delayQueueName, rqueueMessage);
1✔
107
    RedisScript<Long> script = getScript(ScriptType.ENQUEUE_MESSAGE);
1✔
108
    return scriptExecutor.execute(
1✔
109
        script,
110
        Arrays.asList(delayQueueName, delayQueueChannelName),
1✔
111
        rqueueMessage,
112
        rqueueMessage.getProcessAt(),
1✔
113
        System.currentTimeMillis());
1✔
114
  }
115

116
  @Override
117
  public Flux<Long> addReactiveMessageWithDelay(
118
      String scheduledQueueName, String scheduledQueueChannelName, RqueueMessage rqueueMessage) {
119
    log.debug(
1✔
120
        "AddReactiveMessageWithDelay Queue: {}, Message: {}", scheduledQueueName, rqueueMessage);
121
    RedisScript<Long> script = getScript(ScriptType.ENQUEUE_MESSAGE);
1✔
122
    return reactiveScriptExecutor.execute(
1✔
123
        script,
124
        Arrays.asList(scheduledQueueName, scheduledQueueChannelName),
1✔
125
        Arrays.asList(rqueueMessage, rqueueMessage.getProcessAt(), System.currentTimeMillis()));
1✔
126
  }
127

128
  @Override
129
  public Long addMessage(String listName, RqueueMessage rqueueMessage) {
130
    log.debug("AddMessage Queue: {}, Message: {}", listName, rqueueMessage);
1✔
131
    return rpush(listName, rqueueMessage);
1✔
132
  }
133

134
  @Override
135
  public Mono<Long> addReactiveMessage(String listName, RqueueMessage rqueueMessage) {
136
    log.debug("AddReactiveMessage Queue: {}, Message: {}", listName, rqueueMessage);
1✔
137
    return reactiveRedisTemplate.template().opsForList().rightPush(listName, rqueueMessage);
1✔
138
  }
139

140
  @Override
141
  public Boolean addToZset(String zsetName, RqueueMessage rqueueMessage, long score) {
142
    log.debug("AddToZset Queue: {}, Message: {}", zsetName, rqueueMessage);
1✔
143
    return zadd(zsetName, rqueueMessage, score);
1✔
144
  }
145

146
  @Override
147
  public void moveMessageWithDelay(
148
      String srcZsetName, String tgtZsetName, RqueueMessage src, RqueueMessage tgt, long delay) {
149
    log.debug(
1✔
150
        "MoveMessageWithDelay Src:[Q={},M={}], Dst:[Q={},M={}]",
151
        srcZsetName,
152
        src,
153
        tgtZsetName,
154
        tgt);
155
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_TO_ZSET);
1✔
156
    Long response =
1✔
157
        scriptExecutor.execute(
1✔
158
            script,
159
            Arrays.asList(srcZsetName, tgtZsetName),
1✔
160
            src,
161
            tgt,
162
            System.currentTimeMillis() + delay);
1✔
163
    if (response == null) {
1!
164
      log.error("Duplicate processing for the message {}", src);
×
165
    }
166
  }
1✔
167

168
  @Override
169
  public void moveMessage(
170
      String srcZsetName, String tgtListName, RqueueMessage src, RqueueMessage tgt) {
171
    log.debug("MoveMessage Src:[Q={},M={}], Dst:[Q={},M={}]", srcZsetName, src, tgtListName, tgt);
×
172
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_TO_LIST);
×
173
    Long response =
×
174
        scriptExecutor.execute(script, Arrays.asList(srcZsetName, tgtListName), src, tgt);
×
175
    if (response == null) {
×
176
      log.error("Duplicate processing for the message {}", src);
×
177
    }
178
  }
×
179

180
  @Override
181
  public List<RqueueMessage> getAllMessages(
182
      String queueName, String processingQueueName, String delayQueueName) {
183
    List<RqueueMessage> messages = lrange(queueName, 0, -1);
1✔
184
    if (CollectionUtils.isEmpty(messages)) {
1✔
185
      messages = new ArrayList<>();
1✔
186
    }
187
    Set<RqueueMessage> messagesInProcessingQueue = zrange(processingQueueName, 0, -1);
1✔
188
    if (!CollectionUtils.isEmpty(messagesInProcessingQueue)) {
1✔
189
      messages.addAll(messagesInProcessingQueue);
1✔
190
    }
191
    if (delayQueueName != null) {
1!
192
      Set<RqueueMessage> messagesFromZset = zrange(delayQueueName, 0, -1);
1✔
193
      if (!CollectionUtils.isEmpty(messagesFromZset)) {
1✔
194
        messages.addAll(messagesFromZset);
1✔
195
      }
196
    }
197
    return messages;
1✔
198
  }
199

200
  @Override
201
  public Long getScore(String zsetName, RqueueMessage message) {
202
    Double score = redisTemplate.opsForZSet().score(zsetName, message);
1✔
203
    if (score == null) {
1✔
204
      return null;
1✔
205
    }
206
    return score.longValue();
1✔
207
  }
208

209
  @Override
210
  public boolean addScore(String zsetName, RqueueMessage message, long delta) {
211
    return scriptExecutor.execute(
1✔
212
        getScript(ScriptType.SCORE_UPDATER), Collections.singletonList(zsetName), message, delta);
1✔
213
  }
214

215
  private MessageMoveResult moveMessageToList(
216
      String src, String dst, int maxMessage, ScriptType scriptType) {
217
    RedisScript<Long> script = getScript(scriptType);
1✔
218
    long messagesInSrc = maxMessage;
1✔
219
    int remainingMessages = maxMessage;
1✔
220
    while (messagesInSrc > 0 && remainingMessages > 0) {
1✔
221
      long messageCount = Math.min(remainingMessages, Constants.MAX_MESSAGES);
1✔
222
      messagesInSrc = scriptExecutor.execute(script, Arrays.asList(src, dst), messageCount);
1✔
223
      remainingMessages -= messageCount;
1✔
224
    }
1✔
225
    return new MessageMoveResult(maxMessage - remainingMessages, true);
1✔
226
  }
227

228
  @Override
229
  public MessageMoveResult moveMessageListToList(
230
      String srcQueueName, String dstQueueName, int numberOfMessage) {
231
    return moveMessageToList(
1✔
232
        srcQueueName, dstQueueName, numberOfMessage, ScriptType.MOVE_MESSAGE_LIST_TO_LIST);
233
  }
234

235
  @Override
236
  public MessageMoveResult moveMessageZsetToList(
237
      String sourceZset, String destinationList, int maxMessage) {
238
    return moveMessageToList(
1✔
239
        sourceZset, destinationList, maxMessage, ScriptType.MOVE_MESSAGE_ZSET_TO_LIST);
240
  }
241

242
  @Override
243
  public MessageMoveResult moveMessageListToZset(
244
      String sourceList, String destinationZset, int maxMessage, long score) {
245
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_LIST_TO_ZSET);
1✔
246
    long messagesInList = maxMessage;
1✔
247
    int remainingMessages = maxMessage;
1✔
248
    while (messagesInList > 0 && remainingMessages > 0) {
1!
249
      long messageCount = Math.min(remainingMessages, Constants.MAX_MESSAGES);
1✔
250
      messagesInList =
1✔
251
          scriptExecutor.execute(
1✔
252
              script, Arrays.asList(sourceList, destinationZset), messageCount, score);
1✔
253
      remainingMessages -= messageCount;
1✔
254
    }
1✔
255
    return new MessageMoveResult(maxMessage - remainingMessages, true);
1✔
256
  }
257

258
  @Override
259
  public MessageMoveResult moveMessageZsetToZset(
260
      String sourceZset,
261
      String destinationZset,
262
      int maxMessage,
263
      long newScore,
264
      boolean fixedScore) {
265
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_ZSET_TO_ZSET);
1✔
266
    long messageInZset = maxMessage;
1✔
267
    int remainingMessages = maxMessage;
1✔
268
    while (messageInZset > 0 && remainingMessages > 0) {
1✔
269
      long messageCount = Math.min(remainingMessages, Constants.MAX_MESSAGES);
1✔
270
      messageInZset =
1✔
271
          scriptExecutor.execute(
1✔
272
              script,
273
              Arrays.asList(sourceZset, destinationZset),
1✔
274
              messageCount,
1✔
275
              newScore,
1✔
276
              fixedScore);
1✔
277
      remainingMessages -= messageCount;
1✔
278
    }
1✔
279
    return new MessageMoveResult(maxMessage - remainingMessages, true);
1✔
280
  }
281

282
  @Override
283
  public List<RqueueMessage> readFromZset(String name, long start, long end) {
284
    Set<RqueueMessage> messages = zrange(name, start, end);
1✔
285
    if (messages == null) {
1!
286
      return new ArrayList<>();
×
287
    }
288
    return new ArrayList<>(messages);
1✔
289
  }
290

291
  @Override
292
  public List<TypedTuple<RqueueMessage>> readFromZsetWithScore(String name, long start, long end) {
293
    Set<TypedTuple<RqueueMessage>> messages = zrangeWithScore(name, start, end);
1✔
294
    if (messages == null) {
1!
295
      return new ArrayList<>();
×
296
    }
297
    return new ArrayList<>(messages);
1✔
298
  }
299

300
  @Override
301
  public Long scheduleMessage(
302
      String zsetName, String messageId, RqueueMessage rqueueMessage, Long expiryInSeconds) {
303
    RedisScript<Long> script = getScript(ScriptType.SCHEDULE_MESSAGE);
1✔
304
    return scriptExecutor.execute(
1✔
305
        script,
306
        Arrays.asList(messageId, zsetName),
1✔
307
        expiryInSeconds,
308
        rqueueMessage,
309
        rqueueMessage.getProcessAt());
1✔
310
  }
311

312
  @Override
313
  public boolean renameCollection(String srcName, String tgtName) {
314
    rename(srcName, tgtName);
1✔
315
    return true;
1✔
316
  }
317

318
  @Override
319
  public boolean renameCollections(List<String> srcNames, List<String> tgtNames) {
320
    rename(srcNames, tgtNames);
1✔
321
    return true;
1✔
322
  }
323

324
  @Override
325
  public void deleteCollection(String name) {
326
    redisTemplate.delete(name);
1✔
327
  }
1✔
328

329
  @Override
330
  public List<RqueueMessage> readFromList(String name, long start, long end) {
331
    List<RqueueMessage> messages = lrange(name, start, end);
1✔
332
    if (messages == null) {
1!
333
      return new ArrayList<>();
×
334
    }
335
    return messages;
1✔
336
  }
337

338
  @Override
339
  public RedisTemplate<String, RqueueMessage> getTemplate() {
340
    return super.redisTemplate;
1✔
341
  }
342

343
  @Override
344
  public Long removeElementFromZset(String zsetName, RqueueMessage rqueueMessage) {
345
    return super.removeFromZset(zsetName, rqueueMessage);
1✔
346
  }
347

348
  @Override
349
  public Optional<RqueueMessage> findFirstElementFromList(String name) {
NEW
350
    return readFromList(name, 0, 0).stream().findFirst();
×
351
  }
352

353
  @Override
354
  public Optional<RqueueMessage> findFirstElementFromZset(String name) {
NEW
355
    return readFromZset(name, 0, 0).stream().findFirst();
×
356
  }
357

358
  @Override
359
  public Optional<TypedTuple<RqueueMessage>> findFirstElementFromZsetWithScore(String name) {
NEW
360
    return readFromZsetWithScore(name, 0, 0).stream().findFirst();
×
361
  }
362
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc