• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 2794

03 Jul 2024 04:24AM UTC coverage: 91.353% (-0.07%) from 91.424%
2794

Pull #231

circleci

Sonu Kumar
javadoc
Pull Request #231: Do not retry

39 of 48 new or added lines in 10 files covered. (81.25%)

4 existing lines in 3 files now uncovered.

5388 of 5898 relevant lines covered (91.35%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.7
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/MessageSweeper.java
1
/*
2
 * Copyright (c) 2021-2023 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core.impl;
18

19
import com.github.sonus21.rqueue.config.RqueueConfig;
20
import com.github.sonus21.rqueue.core.RqueueMessage;
21
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
22
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
23
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
24
import com.github.sonus21.rqueue.listener.QueueDetail;
25
import com.github.sonus21.rqueue.utils.RetryableRunnable;
26
import com.github.sonus21.rqueue.utils.StringUtils;
27
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
28
import java.util.ArrayList;
29
import java.util.Arrays;
30
import java.util.LinkedList;
31
import java.util.List;
32
import java.util.concurrent.ExecutorService;
33
import java.util.concurrent.Executors;
34
import java.util.stream.Collectors;
35
import lombok.AllArgsConstructor;
36
import lombok.Builder;
37
import lombok.ToString;
38
import lombok.extern.slf4j.Slf4j;
39
import org.apache.commons.collections4.ListUtils;
40
import org.springframework.data.redis.connection.DataType;
41
import org.springframework.util.CollectionUtils;
42

43
@Slf4j
1✔
44
public class MessageSweeper {
45

46
  private static MessageSweeper messageSweeper;
47

48
  private final ExecutorService executorService;
49
  private final RqueueMessageTemplate messageTemplate;
50
  private final RqueueMessageMetadataService rqueueMessageMetadataService;
51
  private final RqueueConfig rqueueConfig;
52

53
  private MessageSweeper(
54
      RqueueConfig rqueueConfig,
55
      RqueueMessageTemplate messageTemplate,
56
      RqueueMessageMetadataService rqueueMessageMetadataService) {
1✔
57
    this.rqueueMessageMetadataService = rqueueMessageMetadataService;
1✔
58
    this.executorService = Executors.newSingleThreadExecutor();
1✔
59
    this.messageTemplate = messageTemplate;
1✔
60
    this.rqueueConfig = rqueueConfig;
1✔
61
  }
1✔
62

63
  public static MessageSweeper getInstance(
64
      RqueueConfig rqueueConfig,
65
      RqueueMessageTemplate messageTemplate,
66
      RqueueMessageMetadataService rqueueMessageMetadataDao) {
67
    if (MessageSweeper.messageSweeper == null) {
1✔
68
      synchronized (MessageSweeper.class) {
1✔
69
        if (MessageSweeper.messageSweeper == null) {
1✔
70
          MessageSweeper.messageSweeper =
1✔
71
              new MessageSweeper(rqueueConfig, messageTemplate, rqueueMessageMetadataDao);
72
          return MessageSweeper.messageSweeper;
1✔
73
        }
74
        return MessageSweeper.messageSweeper;
×
75
      }
76
    }
77
    return MessageSweeper.messageSweeper;
1✔
78
  }
79

80
  public boolean deleteAllMessages(MessageDeleteRequest request) {
81
    log.debug("MessageDeleteRequest {}", request);
1✔
82
    if (!request.isValid()) {
1✔
83
      throw new IllegalArgumentException("Message request is not valid");
1✔
84
    }
85
    List<DeleteJobData> deleteJobData = new ArrayList<>();
1✔
86
    QueueDetail detail = request.queueDetail;
1✔
87
    if (detail != null) {
1✔
88
      String newQueueName = rqueueConfig.getDelDataName(detail.getQueueName());
1✔
89
      String newScheduledZsetName = rqueueConfig.getDelDataName(detail.getQueueName());
1✔
90
      String newProcessingZsetName = rqueueConfig.getDelDataName(detail.getQueueName());
1✔
91
      messageTemplate.renameCollections(
1✔
92
          Arrays.asList(
1✔
93
              detail.getQueueName(),
1✔
94
              detail.getScheduledQueueName(),
1✔
95
              detail.getProcessingQueueName()),
1✔
96
          Arrays.asList(newQueueName, newScheduledZsetName, newProcessingZsetName));
1✔
97
      deleteJobData.add(new DeleteJobData(newQueueName, DataType.LIST));
1✔
98
      deleteJobData.add(new DeleteJobData(newScheduledZsetName, DataType.ZSET));
1✔
99
      deleteJobData.add(new DeleteJobData(newProcessingZsetName, DataType.ZSET));
1✔
100
    } else {
1✔
101
      switch (request.dataType) {
1✔
102
        case LIST:
103
          DeleteJobData data =
1✔
104
              new DeleteJobData(rqueueConfig.getDelDataName(request.dataName), request.dataType);
1✔
105
          messageTemplate.renameCollection(request.dataName, data.name);
1✔
106
          deleteJobData.add(data);
1✔
107
          break;
1✔
108
        case ZSET:
109
          data = new DeleteJobData(rqueueConfig.getDelDataName(request.dataName), request.dataType);
1✔
110
          messageTemplate.renameCollection(request.dataName, data.name);
1✔
111
          deleteJobData.add(data);
1✔
112
          break;
1✔
113
        default:
114
          throw new UnknownSwitchCase(request.dataType.code());
×
115
      }
116
    }
117
    if (!CollectionUtils.isEmpty(deleteJobData)) {
1✔
118
      if (detail != null) {
1✔
119
        executorService.submit(new MessageDeleteJob(deleteJobData, detail.getName()));
1✔
120
      } else {
121
        executorService.submit(new MessageDeleteJob(deleteJobData, request.queueName));
1✔
122
      }
123
    }
124
    return true;
1✔
125
  }
126

127
  @AllArgsConstructor
128
  private static class DeleteJobData {
129

130
    private final String name;
131
    private final DataType type;
132
  }
133

134
  @Builder
135
  @ToString
136
  public static class MessageDeleteRequest {
137

138
    private final QueueDetail queueDetail;
139
    private final String dataName;
140
    private final String queueName;
141
    private final DataType dataType;
142

143
    private boolean isValid() {
144
      if (queueDetail != null) {
1✔
145
        return true;
1✔
146
      }
147
      return !StringUtils.isEmpty(dataName)
1✔
148
          && !StringUtils.isEmpty(queueName)
1✔
149
          && Arrays.asList(DataType.LIST, DataType.ZSET).contains(dataType);
1✔
150
    }
151
  }
152

153
  private class MessageDeleteJob extends RetryableRunnable<DeleteJobData> {
154

155
    private static final int batchSize = 1000;
156
    private final String queueName;
157

158
    MessageDeleteJob(List<DeleteJobData> jobData, String queueName) {
1✔
159
      super(log, null, jobData.iterator());
1✔
160
      this.queueName = queueName;
1✔
161
    }
1✔
162

163
    private List<String> getMessageIdFromList(String queueName) {
164
      long offset = 0;
1✔
165
      List<String> ids = new LinkedList<>();
1✔
166
      while (true) {
167
        List<RqueueMessage> rqueueMessageList =
1✔
168
            messageTemplate.readFromList(queueName, offset, batchSize);
1✔
169
        if (!CollectionUtils.isEmpty(rqueueMessageList)) {
1✔
170
          for (RqueueMessage rqueueMessage : rqueueMessageList) {
1✔
171
            ids.add(rqueueMessage.getId());
1✔
172
          }
1✔
173
        }
174
        if (CollectionUtils.isEmpty(rqueueMessageList) || rqueueMessageList.size() < batchSize) {
1✔
175
          break;
1✔
176
        }
UNCOV
177
        offset += batchSize;
×
UNCOV
178
      }
×
179
      return ids;
1✔
180
    }
181

182
    private List<String> getMessageIdFromZset(String zsetName) {
183
      List<String> ids = new LinkedList<>();
1✔
184
      List<RqueueMessage> rqueueMessageList = messageTemplate.readFromZset(zsetName, 0, -1);
1✔
185
      if (!CollectionUtils.isEmpty(rqueueMessageList)) {
1✔
186
        for (RqueueMessage rqueueMessage : rqueueMessageList) {
1✔
187
          ids.add(rqueueMessage.getId());
1✔
188
        }
1✔
189
      }
190
      return ids;
1✔
191
    }
192

193
    private List<String> getMessageIds(DeleteJobData data) {
194
      if (data.type == DataType.LIST) {
1✔
195
        return getMessageIdFromList(data.name);
1✔
196
      }
197
      return getMessageIdFromZset(data.name);
1✔
198
    }
199

200
    public void delete(DeleteJobData data) {
201
      for (List<String> subIds : ListUtils.partition(getMessageIds(data), batchSize)) {
1✔
202
        List<String> messageMetaIds =
1✔
203
            subIds.stream()
1✔
204
                .map(e -> RqueueMessageUtils.getMessageMetaId(queueName, e))
1✔
205
                .collect(Collectors.toList());
1✔
206
        rqueueMessageMetadataService.deleteAll(messageMetaIds);
1✔
207
        log.debug("Deleted {} messages meta", messageMetaIds.size());
1✔
208
      }
1✔
209
      messageTemplate.deleteCollection(data.name);
1✔
210
    }
1✔
211

212
    @Override
213
    public void consume(DeleteJobData data) {
214
      delete(data);
1✔
215
    }
1✔
216
  }
217
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc