• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JaidenAshmore / java-dynamic-sqs-listener / #2121

05 Oct 2024 07:45AM UTC coverage: 96.244% (-0.09%) from 96.332%
#2121

push

github

web-flow
refs #408: upgrade dependencies (#409)

3 of 3 new or added lines in 3 files covered. (100.0%)

3 existing lines in 1 file now uncovered.

2178 of 2263 relevant lines covered (96.24%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.74
/core/src/main/java/com/jashmore/sqs/resolver/batching/BatchingMessageResolver.java
1
package com.jashmore.sqs.resolver.batching;
2

3
import static com.jashmore.sqs.aws.AwsConstants.MAX_NUMBER_OF_MESSAGES_IN_BATCH;
4

5
import com.jashmore.documentation.annotations.ThreadSafe;
6
import com.jashmore.sqs.QueueProperties;
7
import com.jashmore.sqs.resolver.MessageResolver;
8
import com.jashmore.sqs.util.collections.QueueUtils;
9
import com.jashmore.sqs.util.thread.ThreadUtils;
10
import java.time.Duration;
11
import java.util.AbstractMap;
12
import java.util.ArrayList;
13
import java.util.LinkedList;
14
import java.util.List;
15
import java.util.Map;
16
import java.util.concurrent.BlockingQueue;
17
import java.util.concurrent.CompletableFuture;
18
import java.util.concurrent.ExecutionException;
19
import java.util.concurrent.ExecutorService;
20
import java.util.concurrent.Executors;
21
import java.util.concurrent.LinkedBlockingQueue;
22
import java.util.stream.Collectors;
23
import lombok.AllArgsConstructor;
24
import lombok.Value;
25
import lombok.extern.slf4j.Slf4j;
26
import software.amazon.awssdk.core.exception.SdkInterruptedException;
27
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
28
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
29
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
30
import software.amazon.awssdk.services.sqs.model.Message;
31

32
/**
33
 * {@link MessageResolver} that will batch the deletions of messages into a group to reduce the amount of messages that are being sent to SQS queue.
34
 *
35
 * <p>This uses a {@link BlockingQueue} to store all of the messages that need to be resolved and once the timeout provided by
36
 * {@link BatchingMessageResolverProperties#getBufferingTime()} is reached or the number of messages goes above
37
 * {@link BatchingMessageResolverProperties#getBufferingSizeLimit()}, the messages are sent out to be deleted.
38
 */
39
@Slf4j
1✔
40
@ThreadSafe
41
public class BatchingMessageResolver implements MessageResolver {
42

43
    private final QueueProperties queueProperties;
44
    private final SqsAsyncClient sqsAsyncClient;
45
    private final BatchingMessageResolverProperties properties;
46

47
    private final BlockingQueue<MessageResolutionBean> messagesToBeResolved;
48

49
    /**
50
     * Builds a {@link BatchingMessageResolver} that will perform a deletion of a message every time a single message is received.
51
     *
52
     * @param queueProperties details about the queue that the arguments will be resolved for
53
     * @param sqsAsyncClient  the client for connecting to the SQS queue
54
     */
55
    public BatchingMessageResolver(final QueueProperties queueProperties, final SqsAsyncClient sqsAsyncClient) {
56
        this(
1✔
57
            queueProperties,
58
            sqsAsyncClient,
59
            StaticBatchingMessageResolverProperties.builder().bufferingSizeLimit(1).bufferingTime(Duration.ofHours(1)).build()
1✔
60
        );
61
    }
1✔
62

63
    /**
64
     * Builds a {@link BatchingMessageResolver} with the provided properties.
65
     *
66
     * @param queueProperties details about the queue that the arguments will be resolved for
67
     * @param sqsAsyncClient  the client for connecting to the SQS queue
68
     * @param properties      configuration properties for this resolver
69
     */
70
    public BatchingMessageResolver(
71
        final QueueProperties queueProperties,
72
        final SqsAsyncClient sqsAsyncClient,
73
        final BatchingMessageResolverProperties properties
74
    ) {
1✔
75
        this.queueProperties = queueProperties;
1✔
76
        this.sqsAsyncClient = sqsAsyncClient;
1✔
77
        this.properties = properties;
1✔
78

79
        this.messagesToBeResolved = new LinkedBlockingQueue<>();
1✔
80
    }
1✔
81

82
    @Override
83
    public CompletableFuture<?> resolveMessage(final Message message) {
84
        final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
1✔
85
        messagesToBeResolved.add(new MessageResolutionBean(message, completableFuture));
1✔
86
        return completableFuture;
1✔
87
    }
88

89
    @Override
90
    public void run() {
91
        log.info("Started MessageResolver background thread");
1✔
92
        boolean continueProcessing = true;
1✔
93
        final ExecutorService executorService = buildExecutorServiceForSendingBatchDeletion();
1✔
94
        // all of the batches currently being sent so that they can be waited on during shutdown
95
        final List<CompletableFuture<?>> batchesBeingPublished = new ArrayList<>();
1✔
96
        while (!Thread.currentThread().isInterrupted() && continueProcessing) {
1✔
97
            final List<MessageResolutionBean> batchOfMessagesToResolve = new LinkedList<>();
1✔
98
            try {
99
                final int batchSize = getBatchSize();
1✔
100
                final Duration bufferingTime = properties.getBufferingTime();
1✔
101
                log.trace("Waiting {}ms for {} messages to be submitted for deletion", bufferingTime.toMillis(), batchSize);
1✔
102
                QueueUtils.drain(messagesToBeResolved, batchOfMessagesToResolve, batchSize, bufferingTime);
1✔
103
            } catch (final InterruptedException interruptedException) {
1✔
104
                log.info("Shutting down MessageResolver");
1✔
105
                // Do nothing, we still want to send the current batch of messages
106
                continueProcessing = false;
1✔
107
            }
1✔
108

109
            if (!batchOfMessagesToResolve.isEmpty()) {
1✔
110
                log.debug("Sending batch deletion for {} messages", batchOfMessagesToResolve.size());
1✔
111
                final CompletableFuture<?> completableFuture = submitMessageDeletionBatch(batchOfMessagesToResolve, executorService);
1✔
112
                batchesBeingPublished.add(completableFuture);
1✔
113
                completableFuture.whenComplete((response, throwable) -> batchesBeingPublished.remove(completableFuture));
1✔
114
            }
115
        }
1✔
116
        try {
117
            log.debug("Waiting for {} batches to complete", batchesBeingPublished.size());
1✔
118
            CompletableFuture.allOf(batchesBeingPublished.toArray(new CompletableFuture<?>[0])).get();
1✔
119
            executorService.shutdownNow();
1✔
120
            log.info("MessageResolver has been successfully stopped");
1✔
UNCOV
121
        } catch (final InterruptedException interruptedException) {
×
UNCOV
122
            log.warn("Thread interrupted while waiting for message batches to be completed");
×
UNCOV
123
            Thread.currentThread().interrupt();
×
124
        } catch (final ExecutionException executionException) {
1✔
125
            log.error("Error waiting for all message batches to be published", executionException.getCause());
1✔
126
        }
1✔
127
    }
1✔
128

129
    /**
130
     * Build the {@link ExecutorService} to send the batch message delete messages.
131
     *
132
     * <p>This is needed because when a thread is interrupted while using the {@link SqsAsyncClient} a {@link SdkInterruptedException} is thrown which is
133
     * ultimately not what we want. We instead want to know that this has been done and wait for the delete requests to eventually finish. Therefore,
134
     * running it on extra threads provides this extra safety.
135
     *
136
     * <p>The extra service also allows for multiple batches to be sent concurrently.
137
     *
138
     * @return the service for running message deletion on a separate thread
139
     */
140
    private ExecutorService buildExecutorServiceForSendingBatchDeletion() {
141
        return Executors.newCachedThreadPool(ThreadUtils.multiNamedThreadFactory(Thread.currentThread().getName() + "-batch-delete"));
1✔
142
    }
143

144
    /**
145
     * Safely get the batch size for the number of messages to resolve as AWS has a limit for how many messages can be sent at once.
146
     *
147
     * @return the number of messages that should be resolved in a single batch
148
     */
149
    private int getBatchSize() {
150
        final int bufferingSizeLimit = properties.getBufferingSizeLimit();
1✔
151
        if (bufferingSizeLimit < 1) {
1✔
152
            return 1;
×
153
        }
154
        return Math.min(bufferingSizeLimit, MAX_NUMBER_OF_MESSAGES_IN_BATCH);
1✔
155
    }
156

157
    /**
158
     * Submit the batch of messages to be resolved asynchronously.
159
     *
160
     * <p>When the batch is completed successfully (or unsuccessfully), the futures for each message will be completed.
161
     *
162
     * @param batchOfMessagesToResolve the messages to resolve
163
     */
164
    private CompletableFuture<?> submitMessageDeletionBatch(
165
        final List<MessageResolutionBean> batchOfMessagesToResolve,
166
        final ExecutorService executorService
167
    ) {
168
        final Map<String, CompletableFuture<Object>> messageCompletableFutures = batchOfMessagesToResolve
1✔
169
            .stream()
1✔
170
            .map(bean -> new AbstractMap.SimpleImmutableEntry<>(bean.getMessage().messageId(), bean.getCompletableFuture()))
1✔
171
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
1✔
172

173
        return CompletableFuture
1✔
174
            .supplyAsync(() -> buildBatchDeleteMessageRequest(batchOfMessagesToResolve))
1✔
175
            .thenComposeAsync(sqsAsyncClient::deleteMessageBatch, executorService)
1✔
176
            .whenComplete((response, exception) -> {
1✔
177
                if (exception != null) {
1✔
178
                    log.error("Error deleting messages", exception);
1✔
179

180
                    messageCompletableFutures.values().forEach(completableFuture -> completableFuture.completeExceptionally(exception));
1✔
181
                    return;
1✔
182
                }
183

184
                log.debug("{} messages successfully deleted, {} failed", response.successful().size(), response.failed().size());
1✔
185

186
                response
1✔
187
                    .successful()
1✔
188
                    .stream()
1✔
189
                    .map(entry -> messageCompletableFutures.remove(entry.id()))
1✔
190
                    .forEach(completableFuture -> completableFuture.complete("completed"));
1✔
191

192
                response
1✔
193
                    .failed()
1✔
194
                    .forEach(entry -> {
1✔
195
                        final CompletableFuture<?> completableFuture = messageCompletableFutures.remove(entry.id());
1✔
196
                        completableFuture.completeExceptionally(new RuntimeException(entry.message()));
1✔
197
                    });
1✔
198

199
                if (!messageCompletableFutures.isEmpty()) {
1✔
200
                    log.error(
1✔
201
                        "{} messages were not handled in the deletion. This could be a bug in the AWS SDK",
202
                        messageCompletableFutures.size()
1✔
203
                    );
204
                    messageCompletableFutures
1✔
205
                        .values()
1✔
206
                        .forEach(completableFuture ->
1✔
207
                            completableFuture.completeExceptionally(
1✔
208
                                new RuntimeException("Message not handled by batch delete. This should not happen")
209
                            )
210
                        );
211
                }
212
            });
1✔
213
    }
214

215
    private DeleteMessageBatchRequest buildBatchDeleteMessageRequest(final List<MessageResolutionBean> batchOfMessagesToResolve) {
216
        return DeleteMessageBatchRequest
1✔
217
            .builder()
1✔
218
            .queueUrl(queueProperties.getQueueUrl())
1✔
219
            .entries(
1✔
220
                batchOfMessagesToResolve
221
                    .stream()
1✔
222
                    .map(MessageResolutionBean::getMessage)
1✔
223
                    .map(messageToDelete ->
1✔
224
                        DeleteMessageBatchRequestEntry
225
                            .builder()
1✔
226
                            .id(messageToDelete.messageId())
1✔
227
                            .receiptHandle(messageToDelete.receiptHandle())
1✔
228
                            .build()
1✔
229
                    )
230
                    .collect(Collectors.toSet())
1✔
231
            )
232
            .build();
1✔
233
    }
234

235
    /**
236
     * Internal bean used for storing the message to be resolved in the internal queue.
237
     */
238
    @Value
239
    @AllArgsConstructor
240
    private static class MessageResolutionBean {
241

242
        /**
243
         * The message to be resolved.
244
         */
245
        Message message;
246
        /**
247
         * The future that should be resolved when the message is successfully or unsuccessfully deleted.
248
         */
249
        CompletableFuture<Object> completableFuture;
250
    }
251
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc