• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JaidenAshmore / java-dynamic-sqs-listener / #2013

pending completion
#2013

push

github-actions

web-flow
Update brave monorepo to v5.15.1

2180 of 2263 relevant lines covered (96.33%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.39
/core/src/main/java/com/jashmore/sqs/broker/grouping/GroupingMessageBroker.java
1
package com.jashmore.sqs.broker.grouping;
2

3
import com.jashmore.documentation.annotations.Nullable;
4
import com.jashmore.documentation.annotations.Positive;
5
import com.jashmore.documentation.annotations.PositiveOrZero;
6
import com.jashmore.sqs.broker.MessageBroker;
7
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
8
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
9
import com.jashmore.sqs.util.properties.PropertyUtils;
10
import java.time.Duration;
11
import java.util.Deque;
12
import java.util.HashMap;
13
import java.util.HashSet;
14
import java.util.LinkedHashMap;
15
import java.util.LinkedList;
16
import java.util.Map;
17
import java.util.Optional;
18
import java.util.Queue;
19
import java.util.Set;
20
import java.util.concurrent.CompletableFuture;
21
import java.util.concurrent.CompletionException;
22
import java.util.concurrent.ExecutorService;
23
import java.util.concurrent.locks.ReentrantLock;
24
import java.util.function.BooleanSupplier;
25
import java.util.function.Function;
26
import java.util.function.Supplier;
27
import lombok.extern.slf4j.Slf4j;
28
import software.amazon.awssdk.services.sqs.model.Message;
29
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
30

31
/**
32
 * {@link MessageBroker} that concurrently processes messages that are grouped by a certain criteria, like the
33
 * {@link MessageSystemAttributeName#MESSAGE_GROUP_ID} of a FIFO queue.
34
 *
35
 * <p>A {@link GroupingMessageBroker} has the following properties:
36
 * <ul>
37
 *     <li>no two messages in the same message group can be processed at the same time</li>
38
 *     <li>each message in the message group must be processed in order that they are received from SQS</li>
39
 *     <li>there is a limit for the number of messages to process at the same time but can change during processing</li>
40
 *     <li>will strive to have as many messages processing as possible</li>
41
 *     <li>attempts to process a new message as soon as another has finished to maintain concurrency</li>
42
 *     <li>when a message is retrieved it should request another one straight away</li>
43
 *     <li>on message processing failure the rest of the messages in the message group can be removed when configured to</li>
44
 * </ul>
45
 *
46
 * <p>The concurrency rate will be recalculated when more messages are obtained or a message has finished processing. For example, if the current concurrency
47
 * rate is 5 and when it recalculates the concurrency it is now 6 it will allow another thread to process messages. However,
48
 * if the rate of concurrency decreases it will wait until a certain number of messages finish processing before requesting more messages. Due to this it
49
 * may take a slight delay for the concurrency rate to change.
50
 *
51
 * @see GroupingMessageBrokerProperties for how to configure this broker
52
 */
53
@Slf4j
1✔
54
public class GroupingMessageBroker implements MessageBroker {
55

56
    private final GroupingMessageBrokerProperties properties;
57
    private final ConcurrentMessageBroker concurrentMessageBroker;
58
    private final ReentrantLock reentrantLock = new ReentrantLock();
1✔
59

60
    /**
61
     * Contains all of the messages that have been received but cannot be processed because there is not a thread to take it or there is currently already
62
     * a message in the same message group being processed.
63
     */
64
    private final Map<String, Queue<Message>> internalMessageCache = new LinkedHashMap<>();
1✔
65

66
    /**
67
     * All of the requests for messages.
68
     *
69
     * <p>These need to be stored so that they can be cancelled when this broker is being stopped.
70
     */
71
    private final Set<CompletableFuture<Message>> actualMessageRequests = new HashSet<>();
1✔
72

73
    /**
74
     * Requests for messages by the {@link #concurrentMessageBroker} which indicates that another thread can begin processing a new message.
75
     */
76
    private final Deque<CompletableFuture<Message>> delegateBrokerRequestsForMessages = new LinkedList<>();
1✔
77

78
    /**
79
     * Stores all of the message groups that are currently running so that two messages with the same group are not executed at the same time.
80
     */
81
    private final Set<String> messageGroupsCurrentlyProcessing = new HashSet<>();
1✔
82

83
    /**
84
     * Maps the message group ID of a message that failed to process to the time, in epoch milliseconds, at which that failure was recorded.
85
     */
86
    private final Map<String, Long> failingMessages = new HashMap<>();
1✔
87

88
    /**
     * Creates a broker configured by the provided properties.
     *
     * <p>The concurrency level, concurrency polling rate and error backoff time are forwarded to an internal
     * {@link ConcurrentMessageBroker}; each value is read from the provided properties every time the delegate asks for it.
     *
     * @param properties the configuration for this broker
     */
    public GroupingMessageBroker(final GroupingMessageBrokerProperties properties) {
        this.properties = properties;
        this.concurrentMessageBroker = new ConcurrentMessageBroker(toConcurrentBrokerProperties(properties));
    }

    /**
     * Adapts the grouping broker's properties to the properties type consumed by the delegate {@link ConcurrentMessageBroker}.
     *
     * @param groupingProperties the properties to read the values from
     * @return a view that delegates every getter to the grouping properties
     */
    private static ConcurrentMessageBrokerProperties toConcurrentBrokerProperties(final GroupingMessageBrokerProperties groupingProperties) {
        return new ConcurrentMessageBrokerProperties() {
            @Override
            public @PositiveOrZero int getConcurrencyLevel() {
                return groupingProperties.getConcurrencyLevel();
            }

            @Override
            public @Nullable @Positive Duration getConcurrencyPollingRate() {
                return groupingProperties.getConcurrencyPollingRate();
            }

            @Override
            public @Nullable @PositiveOrZero Duration getErrorBackoffTime() {
                return groupingProperties.getErrorBackoffTime();
            }
        };
    }
1✔
110

111
    @Override
112
    public void processMessages(
113
        final ExecutorService messageProcessingExecutorService,
114
        final BooleanSupplier keepProcessingMessages,
115
        final Supplier<CompletableFuture<Message>> messageSupplier,
116
        final Function<Message, CompletableFuture<?>> messageProcessor
117
    ) throws InterruptedException {
118
        log.debug("Beginning processing of messages");
1✔
119
        normalProcessingOfMessage(messageProcessingExecutorService, keepProcessingMessages, messageSupplier, messageProcessor);
1✔
120
        cancelAllRequestsForMessages();
1✔
121
        log.debug("Ending processing of messages");
1✔
122

123
        if (properties.processCachedMessagesOnShutdown()) {
1✔
124
            log.debug("Beginning processing of internally cached messages");
1✔
125
            processInternallyCachedMessages(messageProcessingExecutorService, messageSupplier, messageProcessor);
1✔
126
            log.debug("Ending processing of internally cached messages");
1✔
127
        }
128
    }
1✔
129

130
    private void normalProcessingOfMessage(
131
        final ExecutorService messageProcessingExecutorService,
132
        final BooleanSupplier keepProcessingMessages,
133
        final Supplier<CompletableFuture<Message>> messageSupplier,
134
        final Function<Message, CompletableFuture<?>> messageProcessor
135
    ) {
136
        try {
137
            concurrentMessageBroker.processMessages(
1✔
138
                messageProcessingExecutorService,
139
                keepProcessingMessages,
140
                wrapMessageSupplier(messageSupplier, true),
1✔
141
                wrapMessageProcessor(messageProcessor)
1✔
142
            );
143
        } catch (InterruptedException interruptedException) {
1✔
144
            // do nothing
145
        }
×
146
    }
1✔
147

148
    private void processInternallyCachedMessages(
149
        final ExecutorService messageProcessingExecutorService,
150
        final Supplier<CompletableFuture<Message>> messageSupplier,
151
        final Function<Message, CompletableFuture<?>> messageProcessor
152
    ) throws InterruptedException {
153
        concurrentMessageBroker.processMessages(
1✔
154
            messageProcessingExecutorService,
155
            () -> !internalMessageCache.isEmpty(),
1✔
156
            wrapMessageSupplier(messageSupplier, false),
1✔
157
            wrapMessageProcessor(messageProcessor)
1✔
158
        );
159
    }
1✔
160

161
    private void cancelAllRequestsForMessages() {
162
        reentrantLock.lock();
1✔
163
        try {
164
            actualMessageRequests.forEach(future -> future.cancel(true));
1✔
165
        } finally {
166
            reentrantLock.unlock();
1✔
167
        }
168
    }
1✔
169

170
    /**
171
     * Wraps the actual message processor with logic to handle message groups that are current processing as well as handling any failing messages.
172
     *
173
     * <p>It will attempt to run more message processing if possible.
174
     *
175
     * @param messageProcessor the actual message processor to process the message
176
     * @return a wrapped message processor
177
     */
178
    private Function<Message, CompletableFuture<?>> wrapMessageProcessor(final Function<Message, CompletableFuture<?>> messageProcessor) {
179
        return message -> {
1✔
180
            final String messageGroupKey = properties.messageGroupingFunction().apply(message);
1✔
181
            return messageProcessor
1✔
182
                .apply(message)
1✔
183
                .handle((ignored, throwable) -> {
1✔
184
                    reentrantLock.lock();
1✔
185
                    try {
186
                        if (throwable != null) {
1✔
187
                            final Throwable actualThrowable;
188
                            if (throwable instanceof CompletionException) {
1✔
189
                                actualThrowable = throwable.getCause();
1✔
190
                            } else {
191
                                actualThrowable = throwable;
1✔
192
                            }
193
                            log.error("Error processing message", actualThrowable);
1✔
194
                            if (properties.purgeExtraMessagesInGroupOnError()) {
1✔
195
                                failingMessages.put(messageGroupKey, System.currentTimeMillis());
1✔
196
                                internalMessageCache.remove(messageGroupKey);
1✔
197
                            }
198
                        }
199
                        messageGroupsCurrentlyProcessing.remove(properties.messageGroupingFunction().apply(message));
1✔
200

201
                        tryProcessAnotherMessage();
1✔
202
                    } finally {
203
                        reentrantLock.unlock();
1✔
204
                    }
205
                    return null;
1✔
206
                });
207
        };
208
    }
209

210
    /**
211
     * Wraps the message supplier with logic to internally cache messages.
212
     *
213
     * @param messageSupplier the actual message supplier
214
     * @param withMessageRetrieval whether requesting a message should actually call out to the delegate for the message
215
     * @return the wrapped message supplier
216
     */
217
    private Supplier<CompletableFuture<Message>> wrapMessageSupplier(
218
        final Supplier<CompletableFuture<Message>> messageSupplier,
219
        final boolean withMessageRetrieval
220
    ) {
221
        return () -> {
1✔
222
            reentrantLock.lock();
1✔
223
            try {
224
                if (delegateBrokerRequestsForMessages.isEmpty()) {
1✔
225
                    final Optional<Message> optionalCachedMessage = getInternalCachedMessageAvailableForProcessing();
1✔
226
                    if (optionalCachedMessage.isPresent()) {
1✔
227
                        final Message message = optionalCachedMessage.get();
1✔
228
                        final String messageGroupKey = properties.messageGroupingFunction().apply(message);
1✔
229
                        messageGroupsCurrentlyProcessing.add(messageGroupKey);
1✔
230
                        return CompletableFuture.completedFuture(message);
1✔
231
                    }
232
                }
233

234
                final CompletableFuture<Message> messageRequest = new CompletableFuture<>();
1✔
235

236
                delegateBrokerRequestsForMessages.addLast(messageRequest);
1✔
237

238
                if (withMessageRetrieval) {
1✔
239
                    performMessageRetrieval(messageSupplier);
1✔
240
                }
241

242
                return messageRequest;
1✔
243
            } finally {
244
                reentrantLock.unlock();
1✔
245
            }
246
        };
247
    }
248

249
    /**
250
     * Actually attempt to get a new message depending on whether too many messages will be cached if this is successfully retrieved.
251
     *
252
     * @param messageSupplier the delegate message supplier
253
     */
254
    private void performMessageRetrieval(final Supplier<CompletableFuture<Message>> messageSupplier) {
255
        reentrantLock.lock();
1✔
256
        try {
257
            if (internalMessageCache.size() + actualMessageRequests.size() < getMaximumNumberOfCachedMessageGroups()) {
1✔
258
                final CompletableFuture<Message> messageRetrievalFuture = messageSupplier.get();
1✔
259
                actualMessageRequests.add(messageRetrievalFuture);
1✔
260

261
                messageRetrievalFuture.thenAccept(message -> {
1✔
262
                    final String messageGroupKey = properties.messageGroupingFunction().apply(message);
1✔
263
                    reentrantLock.lock();
1✔
264
                    try {
265
                        final boolean messageWithSameGroupFailedInShortPeriod = failingMessages
1✔
266
                            .entrySet()
1✔
267
                            .stream()
1✔
268
                            .filter(entry -> System.currentTimeMillis() - entry.getValue() < Duration.ofSeconds(1).toMillis())
1✔
269
                            .anyMatch(entry -> entry.getKey().equals(messageGroupKey));
1✔
270

271
                        actualMessageRequests.remove(messageRetrievalFuture);
1✔
272
                        if (!messageWithSameGroupFailedInShortPeriod) {
1✔
273
                            failingMessages.remove(messageGroupKey);
1✔
274
                            internalMessageCache.putIfAbsent(messageGroupKey, new LinkedList<>());
1✔
275
                            internalMessageCache.get(messageGroupKey).add(message);
1✔
276
                        }
277

278
                        tryProcessAnotherMessage();
1✔
279

280
                        performMessageRetrieval(messageSupplier);
1✔
281
                    } finally {
282
                        reentrantLock.unlock();
1✔
283
                    }
284
                });
1✔
285
            }
286
        } finally {
287
            reentrantLock.unlock();
1✔
288
        }
289
    }
1✔
290

291
    /**
292
     * Determine if there is an internally cached message that can begin to be processed.
293
     *
294
     * @return the optional message to process
295
     */
296
    private Optional<Message> getInternalCachedMessageAvailableForProcessing() {
297
        final Optional<Map.Entry<String, Queue<Message>>> optionalMessageGroupValue = internalMessageCache
1✔
298
            .entrySet()
1✔
299
            .stream()
1✔
300
            .filter(entry -> !messageGroupsCurrentlyProcessing.contains(entry.getKey()))
1✔
301
            .findFirst();
1✔
302

303
        if (!optionalMessageGroupValue.isPresent()) {
1✔
304
            return Optional.empty();
1✔
305
        }
306

307
        final Map.Entry<String, Queue<Message>> optionalMessageGroup = optionalMessageGroupValue.get();
1✔
308

309
        final String messageGroupKey = optionalMessageGroup.getKey();
1✔
310
        final Queue<Message> messageQueueCache = optionalMessageGroup.getValue();
1✔
311
        final Message message = messageQueueCache.remove();
1✔
312
        if (messageQueueCache.isEmpty()) {
1✔
313
            internalMessageCache.remove(messageGroupKey);
1✔
314
        }
315

316
        return Optional.of(message);
1✔
317
    }
318

319
    /**
320
     * Request another message to be processed if possible.
321
     */
322
    private void tryProcessAnotherMessage() {
323
        reentrantLock.lock();
1✔
324
        try {
325
            if (delegateBrokerRequestsForMessages.isEmpty()) {
1✔
326
                return;
1✔
327
            }
328

329
            getInternalCachedMessageAvailableForProcessing()
1✔
330
                .ifPresent(messageToProcess -> {
1✔
331
                    final String messageToProcessGroupKey = properties.messageGroupingFunction().apply(messageToProcess);
1✔
332
                    log.trace("Processing message for group: {}", messageToProcessGroupKey);
1✔
333
                    messageGroupsCurrentlyProcessing.add(messageToProcessGroupKey);
1✔
334
                    final CompletableFuture<Message> future = delegateBrokerRequestsForMessages.removeFirst();
1✔
335
                    future.complete(messageToProcess);
1✔
336
                });
1✔
337
        } finally {
338
            reentrantLock.unlock();
1✔
339
        }
340
    }
1✔
341

342
    private int getMaximumNumberOfCachedMessageGroups() {
343
        return PropertyUtils.safelyGetPositiveIntegerValue(
1✔
344
            "maximumNumberOfCachedMessageGroups",
345
            properties::getMaximumNumberOfCachedMessageGroups,
1✔
346
            1
347
        );
348
    }
349
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc