• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 2794

03 Jul 2024 04:24AM UTC coverage: 91.353% (-0.07%) from 91.424%
2794

Pull #231

circleci

Sonu Kumar
javadoc
Pull Request #231: Do not retry

39 of 48 new or added lines in 10 files covered. (81.25%)

4 existing lines in 3 files now uncovered.

5388 of 5898 relevant lines covered (91.35%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.45
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java
1
/*
2
 * Copyright (c) 2020-2023 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core.impl;
18

19
import static com.github.sonus21.rqueue.core.RedisScriptFactory.getScript;
20

21
import com.github.sonus21.rqueue.common.ReactiveRqueueRedisTemplate;
22
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
23
import com.github.sonus21.rqueue.core.RedisScriptFactory.ScriptType;
24
import com.github.sonus21.rqueue.core.RqueueMessage;
25
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
26
import com.github.sonus21.rqueue.models.MessageMoveResult;
27
import com.github.sonus21.rqueue.utils.Constants;
28
import com.github.sonus21.rqueue.utils.RedisUtils;
29
import java.util.ArrayList;
30
import java.util.Arrays;
31
import java.util.Collections;
32
import java.util.List;
33
import java.util.Set;
34
import lombok.extern.slf4j.Slf4j;
35
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
36
import org.springframework.data.redis.connection.RedisConnectionFactory;
37
import org.springframework.data.redis.core.RedisTemplate;
38
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
39
import org.springframework.data.redis.core.script.DefaultReactiveScriptExecutor;
40
import org.springframework.data.redis.core.script.DefaultScriptExecutor;
41
import org.springframework.data.redis.core.script.ReactiveScriptExecutor;
42
import org.springframework.data.redis.core.script.RedisScript;
43
import org.springframework.util.CollectionUtils;
44
import reactor.core.publisher.Flux;
45
import reactor.core.publisher.Mono;
46

47
/**
48
 * RqueueMessageTemplate is the core of the Rqueue, this deals with Redis calls.
49
 *
50
 * <p>It communicates with the Redis using Lua script and direct calls.
51
 */
52
@Slf4j
1✔
53
public class RqueueMessageTemplateImpl extends RqueueRedisTemplate<RqueueMessage>
54
    implements RqueueMessageTemplate {
55

56
  private final DefaultScriptExecutor<String> scriptExecutor;
57
  private final ReactiveScriptExecutor<String> reactiveScriptExecutor;
58
  private final ReactiveRqueueRedisTemplate<RqueueMessage> reactiveRedisTemplate;
59

60
  public RqueueMessageTemplateImpl(
61
      RedisConnectionFactory redisConnectionFactory,
62
      ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
63
    super(redisConnectionFactory);
1✔
64
    this.scriptExecutor = new DefaultScriptExecutor<>(redisTemplate);
1✔
65
    if (reactiveRedisConnectionFactory != null) {
1✔
66
      this.reactiveRedisTemplate =
1✔
67
          new ReactiveRqueueRedisTemplate<>(reactiveRedisConnectionFactory);
68
      this.reactiveScriptExecutor =
1✔
69
          new DefaultReactiveScriptExecutor<>(
70
              reactiveRedisConnectionFactory,
71
              RedisUtils.redisSerializationContextProvider.getSerializationContext());
1✔
72
    } else {
73
      this.reactiveScriptExecutor = null;
1✔
74
      this.reactiveRedisTemplate = null;
1✔
75
    }
76
  }
1✔
77

78
  @Override
79
  public List<RqueueMessage> pop(
80
      String queueName,
81
      String processingQueueName,
82
      String processingChannelName,
83
      long visibilityTimeout,
84
      int count) {
85
    if (count < Constants.MIN_BATCH_SIZE) {
1✔
86
      throw new IllegalArgumentException(
×
87
          "Count must be greater than or equal to " + Constants.MIN_BATCH_SIZE);
88
    }
89
    long currentTime = System.currentTimeMillis();
1✔
90
    RedisScript<List<RqueueMessage>> script = getScript(ScriptType.DEQUEUE_MESSAGE);
1✔
91
    List<RqueueMessage> messages =
1✔
92
        scriptExecutor.execute(
1✔
93
            script,
94
            Arrays.asList(queueName, processingQueueName, processingChannelName),
1✔
95
            currentTime,
1✔
96
            currentTime + visibilityTimeout,
1✔
97
            count);
1✔
98
    log.debug("Pop Queue: {}, N: {}, Messages: {}", queueName, count, messages);
1✔
99
    return messages;
1✔
100
  }
101

102
  @Override
103
  public Long addMessageWithDelay(
104
      String delayQueueName, String delayQueueChannelName, RqueueMessage rqueueMessage) {
105
    log.debug("AddMessageWithDelay Queue: {}, Message: {}", delayQueueName, rqueueMessage);
1✔
106
    RedisScript<Long> script = getScript(ScriptType.ENQUEUE_MESSAGE);
1✔
107
    return scriptExecutor.execute(
1✔
108
        script,
109
        Arrays.asList(delayQueueName, delayQueueChannelName),
1✔
110
        rqueueMessage,
111
        rqueueMessage.getProcessAt(),
1✔
112
        System.currentTimeMillis());
1✔
113
  }
114

115
  @Override
116
  public Flux<Long> addReactiveMessageWithDelay(
117
      String scheduledQueueName, String scheduledQueueChannelName, RqueueMessage rqueueMessage) {
118
    log.debug(
1✔
119
        "AddReactiveMessageWithDelay Queue: {}, Message: {}", scheduledQueueName, rqueueMessage);
120
    RedisScript<Long> script = getScript(ScriptType.ENQUEUE_MESSAGE);
1✔
121
    return reactiveScriptExecutor.execute(
1✔
122
        script,
123
        Arrays.asList(scheduledQueueName, scheduledQueueChannelName),
1✔
124
        Arrays.asList(rqueueMessage, rqueueMessage.getProcessAt(), System.currentTimeMillis()));
1✔
125
  }
126

127
  @Override
128
  public Long addMessage(String listName, RqueueMessage rqueueMessage) {
129
    log.debug("AddMessage Queue: {}, Message: {}", listName, rqueueMessage);
1✔
130
    return rpush(listName, rqueueMessage);
1✔
131
  }
132

133
  @Override
134
  public Mono<Long> addReactiveMessage(String listName, RqueueMessage rqueueMessage) {
135
    log.debug("AddReactiveMessage Queue: {}, Message: {}", listName, rqueueMessage);
1✔
136
    return reactiveRedisTemplate.template().opsForList().rightPush(listName, rqueueMessage);
1✔
137
  }
138

139
  @Override
140
  public Boolean addToZset(String zsetName, RqueueMessage rqueueMessage, long score) {
141
    log.debug("AddToZset Queue: {}, Message: {}", zsetName, rqueueMessage);
1✔
142
    return zadd(zsetName, rqueueMessage, score);
1✔
143
  }
144

145
  @Override
146
  public void moveMessageWithDelay(
147
      String srcZsetName, String tgtZsetName, RqueueMessage src, RqueueMessage tgt, long delay) {
148
    log.debug(
1✔
149
        "MoveMessageWithDelay Src:[Q={},M={}], Dst:[Q={},M={}]",
150
        srcZsetName,
151
        src,
152
        tgtZsetName,
153
        tgt);
154
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_TO_ZSET);
1✔
155
    Long response =
1✔
156
        scriptExecutor.execute(
1✔
157
            script,
158
            Arrays.asList(srcZsetName, tgtZsetName),
1✔
159
            src,
160
            tgt,
161
            System.currentTimeMillis() + delay);
1✔
162
    if (response == null) {
1✔
UNCOV
163
      log.error("Duplicate processing for the message {}", src);
×
164
    }
165
  }
1✔
166

167
  @Override
168
  public void moveMessage(
169
      String srcZsetName, String tgtListName, RqueueMessage src, RqueueMessage tgt) {
170
    log.debug("MoveMessage Src:[Q={},M={}], Dst:[Q={},M={}]", srcZsetName, src, tgtListName, tgt);
1✔
171
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_TO_LIST);
1✔
172
    Long response =
1✔
173
        scriptExecutor.execute(script, Arrays.asList(srcZsetName, tgtListName), src, tgt);
1✔
174
    if (response == null) {
1✔
175
      log.error("Duplicate processing for the message {}", src);
×
176
    }
177
  }
1✔
178

179
  @Override
180
  public List<RqueueMessage> getAllMessages(
181
      String queueName, String processingQueueName, String delayQueueName) {
182
    List<RqueueMessage> messages = lrange(queueName, 0, -1);
1✔
183
    if (CollectionUtils.isEmpty(messages)) {
1✔
184
      messages = new ArrayList<>();
1✔
185
    }
186
    Set<RqueueMessage> messagesInProcessingQueue = zrange(processingQueueName, 0, -1);
1✔
187
    if (!CollectionUtils.isEmpty(messagesInProcessingQueue)) {
1✔
188
      messages.addAll(messagesInProcessingQueue);
1✔
189
    }
190
    if (delayQueueName != null) {
1✔
191
      Set<RqueueMessage> messagesFromZset = zrange(delayQueueName, 0, -1);
1✔
192
      if (!CollectionUtils.isEmpty(messagesFromZset)) {
1✔
193
        messages.addAll(messagesFromZset);
1✔
194
      }
195
    }
196
    return messages;
1✔
197
  }
198

199
  @Override
200
  public Long getScore(String zsetName, RqueueMessage message) {
201
    Double score = redisTemplate.opsForZSet().score(zsetName, message);
1✔
202
    if (score == null) {
1✔
203
      return null;
1✔
204
    }
205
    return score.longValue();
1✔
206
  }
207

208
  @Override
209
  public boolean addScore(String zsetName, RqueueMessage message, long delta) {
210
    return scriptExecutor.execute(
1✔
211
        getScript(ScriptType.SCORE_UPDATER), Collections.singletonList(zsetName), message, delta);
1✔
212
  }
213

214
  private MessageMoveResult moveMessageToList(
215
      String src, String dst, int maxMessage, ScriptType scriptType) {
216
    RedisScript<Long> script = getScript(scriptType);
1✔
217
    long messagesInSrc = maxMessage;
1✔
218
    int remainingMessages = maxMessage;
1✔
219
    while (messagesInSrc > 0 && remainingMessages > 0) {
1✔
220
      long messageCount = Math.min(remainingMessages, Constants.MAX_MESSAGES);
1✔
221
      messagesInSrc = scriptExecutor.execute(script, Arrays.asList(src, dst), messageCount);
1✔
222
      remainingMessages -= messageCount;
1✔
223
    }
1✔
224
    return new MessageMoveResult(maxMessage - remainingMessages, true);
1✔
225
  }
226

227
  @Override
228
  public MessageMoveResult moveMessageListToList(
229
      String srcQueueName, String dstQueueName, int numberOfMessage) {
230
    return moveMessageToList(
1✔
231
        srcQueueName, dstQueueName, numberOfMessage, ScriptType.MOVE_MESSAGE_LIST_TO_LIST);
232
  }
233

234
  @Override
235
  public MessageMoveResult moveMessageZsetToList(
236
      String sourceZset, String destinationList, int maxMessage) {
237
    return moveMessageToList(
1✔
238
        sourceZset, destinationList, maxMessage, ScriptType.MOVE_MESSAGE_ZSET_TO_LIST);
239
  }
240

241
  @Override
242
  public MessageMoveResult moveMessageListToZset(
243
      String sourceList, String destinationZset, int maxMessage, long score) {
244
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_LIST_TO_ZSET);
1✔
245
    long messagesInList = maxMessage;
1✔
246
    int remainingMessages = maxMessage;
1✔
247
    while (messagesInList > 0 && remainingMessages > 0) {
1✔
248
      long messageCount = Math.min(remainingMessages, Constants.MAX_MESSAGES);
1✔
249
      messagesInList =
1✔
250
          scriptExecutor.execute(
1✔
251
              script, Arrays.asList(sourceList, destinationZset), messageCount, score);
1✔
252
      remainingMessages -= messageCount;
1✔
253
    }
1✔
254
    return new MessageMoveResult(maxMessage - remainingMessages, true);
1✔
255
  }
256

257
  @Override
258
  public MessageMoveResult moveMessageZsetToZset(
259
      String sourceZset,
260
      String destinationZset,
261
      int maxMessage,
262
      long newScore,
263
      boolean fixedScore) {
264
    RedisScript<Long> script = getScript(ScriptType.MOVE_MESSAGE_ZSET_TO_ZSET);
1✔
265
    long messageInZset = maxMessage;
1✔
266
    int remainingMessages = maxMessage;
1✔
267
    while (messageInZset > 0 && remainingMessages > 0) {
1✔
268
      long messageCount = Math.min(remainingMessages, Constants.MAX_MESSAGES);
1✔
269
      messageInZset =
1✔
270
          scriptExecutor.execute(
1✔
271
              script,
272
              Arrays.asList(sourceZset, destinationZset),
1✔
273
              messageCount,
1✔
274
              newScore,
1✔
275
              fixedScore);
1✔
276
      remainingMessages -= messageCount;
1✔
277
    }
1✔
278
    return new MessageMoveResult(maxMessage - remainingMessages, true);
1✔
279
  }
280

281
  @Override
282
  public List<RqueueMessage> readFromZset(String name, long start, long end) {
283
    Set<RqueueMessage> messages = zrange(name, start, end);
1✔
284
    if (messages == null) {
1✔
285
      return new ArrayList<>();
×
286
    }
287
    return new ArrayList<>(messages);
1✔
288
  }
289

290
  @Override
291
  public List<TypedTuple<RqueueMessage>> readFromZsetWithScore(String name, long start, long end) {
292
    Set<TypedTuple<RqueueMessage>> messages = zrangeWithScore(name, start, end);
1✔
293
    if (messages == null) {
1✔
294
      return new ArrayList<>();
×
295
    }
296
    return new ArrayList<>(messages);
1✔
297
  }
298

299
  @Override
300
  public Long scheduleMessage(
301
      String zsetName, String messageId, RqueueMessage rqueueMessage, Long expiryInSeconds) {
302
    RedisScript<Long> script = getScript(ScriptType.SCHEDULE_MESSAGE);
1✔
303
    return scriptExecutor.execute(
1✔
304
        script,
305
        Arrays.asList(messageId, zsetName),
1✔
306
        expiryInSeconds,
307
        rqueueMessage,
308
        rqueueMessage.getProcessAt());
1✔
309
  }
310

311
  @Override
312
  public boolean renameCollection(String srcName, String tgtName) {
313
    rename(srcName, tgtName);
1✔
314
    return true;
1✔
315
  }
316

317
  @Override
318
  public boolean renameCollections(List<String> srcNames, List<String> tgtNames) {
319
    rename(srcNames, tgtNames);
1✔
320
    return true;
1✔
321
  }
322

323
  @Override
324
  public void deleteCollection(String name) {
325
    redisTemplate.delete(name);
1✔
326
  }
1✔
327

328
  @Override
329
  public List<RqueueMessage> readFromList(String name, long start, long end) {
330
    List<RqueueMessage> messages = lrange(name, start, end);
1✔
331
    if (messages == null) {
1✔
332
      return new ArrayList<>();
×
333
    }
334
    return messages;
1✔
335
  }
336

337
  @Override
338
  public RedisTemplate<String, RqueueMessage> getTemplate() {
339
    return super.redisTemplate;
1✔
340
  }
341

342
  @Override
343
  public Long removeElementFromZset(String zsetName, RqueueMessage rqueueMessage) {
344
    return super.removeFromZset(zsetName, rqueueMessage);
1✔
345
  }
346
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc