• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 2714

19 Mar 2024 10:20AM UTC coverage: 91.492% (+0.3%) from 91.237%
2714

Pull #221

circleci

web-flow
Update Pebble version
Pull Request #221: Update Pebble to fix CVE-2022-37767

5377 of 5877 relevant lines covered (91.49%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.34
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageManagerImpl.java
1
/*
2
 * Copyright (c) 2020-2023 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core.impl;
18

19
import static org.springframework.util.Assert.isTrue;
20
import static org.springframework.util.Assert.notNull;
21

22
import com.github.sonus21.rqueue.common.RqueueLockManager;
23
import com.github.sonus21.rqueue.core.EndpointRegistry;
24
import com.github.sonus21.rqueue.core.RqueueMessage;
25
import com.github.sonus21.rqueue.core.RqueueMessageManager;
26
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
27
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
28
import com.github.sonus21.rqueue.exception.LockCanNotBeAcquired;
29
import com.github.sonus21.rqueue.listener.QueueDetail;
30
import com.github.sonus21.rqueue.listener.RqueueMessageHeaders;
31
import com.github.sonus21.rqueue.models.MessageMoveResult;
32
import com.github.sonus21.rqueue.models.db.MessageMetadata;
33
import com.github.sonus21.rqueue.utils.Constants;
34
import java.time.Duration;
35
import java.util.ArrayList;
36
import java.util.List;
37
import java.util.UUID;
38
import lombok.extern.slf4j.Slf4j;
39
import org.springframework.beans.factory.annotation.Autowired;
40
import org.springframework.messaging.Message;
41
import org.springframework.messaging.MessageHeaders;
42
import org.springframework.messaging.converter.MessageConverter;
43
import org.springframework.messaging.support.MessageBuilder;
44

45
@Slf4j
1✔
46
public class RqueueMessageManagerImpl extends BaseMessageSender implements RqueueMessageManager {
47

48
  @Autowired
49
  private RqueueLockManager rqueueLockManager;
50

51
  public RqueueMessageManagerImpl(
52
      RqueueMessageTemplate messageTemplate,
53
      MessageConverter messageConverter,
54
      MessageHeaders messageHeaders) {
55
    super(messageTemplate, messageConverter, messageHeaders);
1✔
56
  }
1✔
57

58
  @Override
59
  public boolean deleteAllMessages(String queueName) {
60
    QueueDetail queueDetail = EndpointRegistry.get(queueName);
1✔
61
    try {
62
      deleteAllMessages(queueDetail);
1✔
63
      return true;
1✔
64
    } catch (Exception e) {
×
65
      log.error("Delete all message failed", e);
×
66
      return false;
×
67
    }
68
  }
69

70
  @Override
71
  public List<Object> getAllMessages(String queueName) {
72
    List<Object> messages = new ArrayList<>();
1✔
73
    for (RqueueMessage message : getAllRqueueMessage(queueName)) {
1✔
74
      messages.add(RqueueMessageUtils.convertMessageToObject(message, messageConverter));
1✔
75
    }
1✔
76
    return messages;
1✔
77
  }
78

79
  @Override
80
  public Object getMessage(String queueName, String id) {
81
    RqueueMessage rqueueMessage = getRqueueMessage(queueName, id);
1✔
82
    if (rqueueMessage == null) {
1✔
83
      return null;
1✔
84
    }
85
    Message<String> message =
1✔
86
        MessageBuilder.createMessage(
1✔
87
            rqueueMessage.getMessage(), RqueueMessageHeaders.emptyMessageHeaders());
1✔
88
    return messageConverter.fromMessage(message, null);
1✔
89
  }
90

91
  @Override
92
  public RqueueMessage getRqueueMessage(String queueName, String id) {
93
    MessageMetadata messageMetadata = rqueueMessageMetadataService.getByMessageId(queueName, id);
1✔
94
    if (messageMetadata == null) {
1✔
95
      return null;
1✔
96
    }
97
    return messageMetadata.getRqueueMessage();
1✔
98
  }
99

100
  @Override
101
  public List<RqueueMessage> getAllRqueueMessage(String queueName) {
102
    QueueDetail queueDetail = EndpointRegistry.get(queueName);
1✔
103
    return messageTemplate.getAllMessages(
1✔
104
        queueDetail.getQueueName(),
1✔
105
        queueDetail.getProcessingQueueName(),
1✔
106
        queueDetail.getScheduledQueueName());
1✔
107
  }
108

109
  @Override
110
  public boolean exist(String queueName, String id) {
111
    String lockValue = UUID.randomUUID().toString();
1✔
112
    if (rqueueLockManager.acquireLock(queueName, lockValue, Duration.ofSeconds(1))) {
1✔
113
      boolean exist = getMessage(queueName, id) != null;
1✔
114
      rqueueLockManager.releaseLock(queueName, lockValue);
1✔
115
      return exist;
1✔
116
    }
117
    throw new LockCanNotBeAcquired(queueName);
1✔
118
  }
119

120
  @Override
121
  public boolean deleteMessage(String queueName, String id) {
122
    RqueueMessage rqueueMessage = getRqueueMessage(queueName, id);
1✔
123
    if (rqueueMessage == null) {
1✔
124
      return false;
1✔
125
    }
126
    Duration duration = rqueueConfig.getMessageDurability(rqueueMessage.getPeriod());
1✔
127
    return rqueueMessageMetadataService.deleteMessage(queueName, id, duration);
1✔
128
  }
129

130
  @Override
131
  public MessageConverter getMessageConverter() {
132
    return messageConverter;
1✔
133
  }
134

135
  @Override
136
  public boolean moveMessageFromDeadLetterToQueue(
137
      String deadLetterQueueName, String queueName, Integer maxMessages) {
138
    return moveMessageListToList(deadLetterQueueName, queueName, maxMessages).isSuccess();
1✔
139
  }
140

141
  @Override
142
  public boolean moveMessageFromDeadLetterToQueue(String deadLetterQueueName, String queueName) {
143
    return moveMessageListToList(deadLetterQueueName, queueName, null).isSuccess();
1✔
144
  }
145

146
  private MessageMoveResult moveMessageListToList(
147
      String sourceQueue, String destinationQueue, Integer maxMessage) {
148
    notNull(sourceQueue, "sourceQueue must not be null");
1✔
149
    notNull(destinationQueue, "destinationQueue must not be null");
1✔
150
    isTrue(
1✔
151
        !sourceQueue.equals(destinationQueue),
1✔
152
        "sourceQueue and destinationQueue must be different");
153
    Integer messageCount = maxMessage;
1✔
154
    if (messageCount == null) {
1✔
155
      messageCount = Constants.MAX_MESSAGES;
1✔
156
    }
157
    isTrue(messageCount > 0, "maxMessage must be greater than zero");
1✔
158
    return messageTemplate.moveMessageListToList(sourceQueue, destinationQueue, messageCount);
1✔
159
  }
160
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc