• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JaidenAshmore / java-dynamic-sqs-listener / #2000

pending completion
#2000

push

github-actions

web-flow
#refs 395: upgrade to Java 17, Spring Boot 3 and other dependencies (#397)

236 of 236 new or added lines in 21 files covered. (100.0%)

2180 of 2263 relevant lines covered (96.33%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.13
/core/src/main/java/com/jashmore/sqs/decorator/AutoVisibilityExtenderMessageProcessingDecorator.java
1
package com.jashmore.sqs.decorator;
2

3
import com.jashmore.documentation.annotations.ThreadSafe;
4
import com.jashmore.sqs.QueueProperties;
5
import com.jashmore.sqs.aws.AwsConstants;
6
import com.jashmore.sqs.util.collections.CollectionUtils;
7
import com.jashmore.sqs.util.thread.ThreadUtils;
8
import java.time.Duration;
9
import java.time.Instant;
10
import java.time.temporal.ChronoUnit;
11
import java.util.ArrayList;
12
import java.util.HashMap;
13
import java.util.List;
14
import java.util.Map;
15
import java.util.Optional;
16
import java.util.concurrent.CompletableFuture;
17
import java.util.concurrent.Executors;
18
import java.util.stream.Collectors;
19
import org.immutables.value.Value;
20
import org.slf4j.Logger;
21
import org.slf4j.LoggerFactory;
22
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
23
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
24
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
25
import software.amazon.awssdk.services.sqs.model.Message;
26

27
/**
28
 * {@link MessageProcessingDecorator} that will continually extend the visibility of the message while it is being processed.
29
 *
30
 * <p>No effort is made to guarantee that a message is successfully extended and therefore if the request fails or partially fails (some messages are not
31
 * extended) it will not re-attempt to extend them and just assume that they passed. Therefore if you desire a higher certainty that the visibility
32
 * extension will succeed you can configure the {@link AutoVisibilityExtenderMessageProcessingDecoratorProperties#bufferDuration()} to be a higher value.
33
 * For example, you could have the {@link AutoVisibilityExtenderMessageProcessingDecoratorProperties#visibilityTimeout()} to be 30 seconds but the
34
 * {@link AutoVisibilityExtenderMessageProcessingDecoratorProperties#bufferDuration()} to be 20 seconds and therefore you will have 3 attempts to successfully
35
 * extend the message.
36
 *
37
 * <p>Note that this only works with synchronous implementations of the message listener, e.g. functions that are executed using the
38
 * {@link com.jashmore.sqs.processor.LambdaMessageProcessor} or the {@link com.jashmore.sqs.processor.CoreMessageProcessor} where the function does not
39
 * return a {@link CompletableFuture}. This is because it is not easy to interrupt the processing of a message if it has been placed onto a different
40
 * thread to process.
41
 *
42
 * <p>This {@link MessageProcessingDecorator} is thread safe and will work safely when multiple messages are all being processed at once.
43
 *
44
 * @see AutoVisibilityExtenderMessageProcessingDecoratorProperties for configuration options
45
 */
46
@ThreadSafe
47
public class AutoVisibilityExtenderMessageProcessingDecorator implements MessageProcessingDecorator {
48

49
    private static final Logger log = LoggerFactory.getLogger(AutoVisibilityExtenderMessageProcessingDecorator.class);
1✔
50

51
    private final SqsAsyncClient sqsAsyncClient;
52
    private final QueueProperties queueProperties;
53
    private final AutoVisibilityExtenderMessageProcessingDecoratorProperties decoratorProperties;
54
    private final Map<Message, MessageProcessingState> currentMessagesProcessing;
55
    private final Object waitingLock = new Object();
1✔
56

57
    /**
     * Creates a decorator that extends the visibility of messages on the given queue while they are being processed.
     *
     * @param sqsAsyncClient the client used to send the change message visibility requests
     * @param queueProperties the properties of the queue, used to obtain the queue URL for those requests
     * @param decoratorProperties the configuration of this decorator, e.g. the visibility timeout, maximum duration and buffer duration
     */
    public AutoVisibilityExtenderMessageProcessingDecorator(
        final SqsAsyncClient sqsAsyncClient,
        final QueueProperties queueProperties,
        final AutoVisibilityExtenderMessageProcessingDecoratorProperties decoratorProperties
    ) {
        // No message is being processed yet, so the tracking map starts out empty
        this.currentMessagesProcessing = new HashMap<>();

        this.decoratorProperties = decoratorProperties;
        this.queueProperties = queueProperties;
        this.sqsAsyncClient = sqsAsyncClient;
    }
1✔
68

69
    @Override
70
    public void onPreMessageProcessing(final MessageProcessingContext context, final Message message) {
71
        synchronized (waitingLock) {
1✔
72
            final Instant timeNow = Instant.now();
1✔
73
            log.debug("Registering message {} with visibility auto extender", message.messageId());
1✔
74
            currentMessagesProcessing.put(
1✔
75
                message,
76
                ImmutableMessageProcessingState
77
                    .builder()
1✔
78
                    .thread(Thread.currentThread())
1✔
79
                    .startTime(timeNow)
1✔
80
                    .nextVisibilityExtensionTime(nextExtensionTime(timeNow, message, decoratorProperties.bufferDuration()))
1✔
81
                    .build()
1✔
82
            );
83

84
            if (currentMessagesProcessing.size() == 1) {
1✔
85
                CompletableFuture
1✔
86
                    .runAsync(
1✔
87
                        this::performBackgroundThread,
88
                        Executors.newSingleThreadExecutor(
1✔
89
                            ThreadUtils.singleNamedThreadFactory(context.getListenerIdentifier() + "-auto-visibility-extender")
1✔
90
                        )
91
                    )
92
                    .whenComplete((ignored, throwable) -> {
1✔
93
                        if (throwable != null) {
1✔
94
                            log.error("Unexpected error with visibility timeout extender", throwable);
×
95
                        }
96
                    });
1✔
97
            }
98

99
            // We need to notify the background thread to recalculate the updated time in case it has configured this message to have a smaller visibility
100
            // timeout then the current wait time
101
            waitingLock.notify();
1✔
102
        }
1✔
103
    }
1✔
104

105
    @Override
106
    public void onMessageProcessingThreadComplete(final MessageProcessingContext context, final Message message) {
107
        removeMessageFromAutoVisibilityExtender(message);
1✔
108
    }
1✔
109

110
    @Override
111
    public void onMessageResolve(MessageProcessingContext context, Message message) {
112
        // Needed in case the message listener is manually acknowledging the message
113
        removeMessageFromAutoVisibilityExtender(message);
1✔
114
    }
1✔
115

116
    private void removeMessageFromAutoVisibilityExtender(final Message message) {
117
        synchronized (waitingLock) {
1✔
118
            final MessageProcessingState valueStored = currentMessagesProcessing.remove(message);
1✔
119
            // Makes sure we only do this once for the message
120
            if (valueStored != null) {
1✔
121
                decoratorProperties.messageDoneProcessing(message);
1✔
122
                waitingLock.notify();
1✔
123
            }
124
        }
1✔
125
    }
1✔
126

127
    private void performBackgroundThread() {
128
        log.debug("Starting background thread for auto visibility extender");
1✔
129
        synchronized (waitingLock) {
1✔
130
            while (!currentMessagesProcessing.isEmpty()) {
1✔
131
                final Instant timeNow = Instant.now();
1✔
132
                final Duration maxDuration = decoratorProperties.maxDuration();
1✔
133
                final Duration bufferDuration = decoratorProperties.bufferDuration();
1✔
134

135
                interruptLongRunningThreads(timeNow, maxDuration);
1✔
136

137
                extendThreadsWithMoreTime(timeNow, bufferDuration);
1✔
138

139
                try {
140
                    waitUntilNextIteration(maxDuration);
1✔
141
                } catch (final InterruptedException interruptedException) {
×
142
                    break;
×
143
                }
1✔
144
            }
1✔
145
        }
1✔
146
        log.debug("Finished background thread for auto visibility extender");
1✔
147
    }
1✔
148

149
    private void interruptLongRunningThreads(final Instant timeNow, final Duration maxDuration) {
150
        final Map<Message, MessageProcessingState> messagesToInterrupt = currentMessagesProcessing
1✔
151
            .entrySet()
1✔
152
            .stream()
1✔
153
            .filter(messageStateEntry -> timeNow.compareTo(messageStateEntry.getValue().startTime().plus(maxDuration)) >= 0)
1✔
154
            .collect(CollectionUtils.pairsToMap());
1✔
155

156
        messagesToInterrupt.forEach((message, state) -> {
1✔
157
            log.info("Interrupting message processing thread due to exceeded time for message {}", message.messageId());
1✔
158
            state.thread().interrupt();
1✔
159
            currentMessagesProcessing.remove(message);
1✔
160
        });
1✔
161
    }
1✔
162

163
    /**
164
     * For each message that has hit the visibility timeout extension time, attempt to extend the visibility.
165
     *
166
     * <p>This method does not wait for the response from the visibility timeout extension and just assumes that it works.
167
     *
168
     * @param timeNow the time that this iteration started at
169
     * @param bufferDuration the amount of buffer time for the next visibility timeout extension
170
     */
171
    private void extendThreadsWithMoreTime(final Instant timeNow, final Duration bufferDuration) {
172
        final Map<Message, MessageProcessingState> messagesToExtend = currentMessagesProcessing
1✔
173
            .entrySet()
1✔
174
            .stream()
1✔
175
            .filter(messageStateEntry -> timeNow.compareTo(messageStateEntry.getValue().nextVisibilityExtensionTime()) >= 0)
1✔
176
            .collect(CollectionUtils.pairsToMap());
1✔
177

178
        List<Message> messageBatch = new ArrayList<>(AwsConstants.MAX_NUMBER_OF_MESSAGES_IN_BATCH);
1✔
179
        for (final Map.Entry<Message, MessageProcessingState> stateEntry : messagesToExtend.entrySet()) {
1✔
180
            final Message message = stateEntry.getKey();
1✔
181
            final MessageProcessingState state = stateEntry.getValue();
1✔
182
            log.info("Automatically extending visibility timeout of message {}", message.messageId());
1✔
183
            messageBatch.add(message);
1✔
184
            if (messageBatch.size() == AwsConstants.MAX_NUMBER_OF_MESSAGES_IN_BATCH) {
1✔
185
                extendMessageBatch(messageBatch);
×
186
                messageBatch.clear();
×
187
            }
188
            currentMessagesProcessing.put(
1✔
189
                message,
190
                ImmutableMessageProcessingState
191
                    .builder()
1✔
192
                    .from(state)
1✔
193
                    .nextVisibilityExtensionTime(timeNow.plus(decoratorProperties.visibilityTimeout(message).minus(bufferDuration)))
1✔
194
                    .build()
1✔
195
            );
196
        }
1✔
197

198
        if (!messageBatch.isEmpty()) {
1✔
199
            extendMessageBatch(messageBatch);
1✔
200
        }
201
    }
1✔
202

203
    private void extendMessageBatch(final List<Message> messageBatch) {
204
        sqsAsyncClient
1✔
205
            .changeMessageVisibilityBatch(builder ->
1✔
206
                builder
1✔
207
                    .queueUrl(queueProperties.getQueueUrl())
1✔
208
                    .entries(
1✔
209
                        messageBatch
210
                            .stream()
1✔
211
                            .map(message ->
1✔
212
                                ChangeMessageVisibilityBatchRequestEntry
213
                                    .builder()
1✔
214
                                    .id(message.messageId())
1✔
215
                                    .receiptHandle(message.receiptHandle())
1✔
216
                                    .visibilityTimeout((int) decoratorProperties.visibilityTimeout(message).getSeconds())
1✔
217
                                    .build()
1✔
218
                            )
219
                            .collect(Collectors.toList())
1✔
220
                    )
221
            )
222
            .whenComplete((ignoredResponse, throwable) -> {
1✔
223
                if (throwable != null) {
1✔
224
                    log.error(
×
225
                        "Error changing visibility timeout for message. The following messages were not extended: " +
226
                        messageBatch.stream().map(Message::messageId).collect(Collectors.toList()),
×
227
                        throwable
228
                    );
229
                }
230

231
                if (ignoredResponse.hasFailed()) {
1✔
232
                    log.error(
×
233
                        "Some messages failed to be have their visibility timeout changed: {}",
234
                        ignoredResponse.failed().stream().map(BatchResultErrorEntry::id).collect(Collectors.toList())
×
235
                    );
236
                }
237
            });
1✔
238
    }
1✔
239

240
    /**
241
     * If there are more messages that are currently processing, determine the next time that a message needs to be interrupted or extended and wait until
242
     * that.
243
     *
244
     * @param maxDuration the maximum amount of time to wait for a message
245
     * @throws InterruptedException if the thread was interrupted while waiting
246
     */
247
    private void waitUntilNextIteration(final Duration maxDuration) throws InterruptedException {
248
        final Optional<Instant> optionalEarliestNextUpdateTime = currentMessagesProcessing
1✔
249
            .values()
1✔
250
            .stream()
1✔
251
            .map(state -> determineEarliestTrigger(state, maxDuration))
1✔
252
            .min(Instant::compareTo);
1✔
253

254
        if (!optionalEarliestNextUpdateTime.isPresent()) {
1✔
255
            return;
1✔
256
        }
257

258
        final long nextTime = Instant.now().until(optionalEarliestNextUpdateTime.get(), ChronoUnit.MILLIS);
1✔
259
        if (nextTime <= 0) {
1✔
260
            return;
1✔
261
        }
262

263
        log.debug("Waiting {}ms to change visibility timeout", nextTime);
1✔
264
        waitingLock.wait(nextTime);
1✔
265
    }
1✔
266

267
    /**
268
     * Determines the next time that the message needs to be extended to stop its visibility from expiring.
269
     *
270
     * @param timeNow the time that this iteration started at
271
     * @param message the message to determine the visibility timeout for
272
     * @param bufferDuration the buffer to change the visibility timeout before it actually expires
273
     * @return the time to extend the message's visibility
274
     */
275
    private Instant nextExtensionTime(final Instant timeNow, final Message message, final Duration bufferDuration) {
276
        return timeNow.plus(decoratorProperties.visibilityTimeout(message)).minus(bufferDuration);
1✔
277
    }
278

279
    /**
280
     * Determines whether the earliest time for this message should be when it should be interrupted or the next visibility extension time.
281
     *
282
     * @param state the state of this message
283
     * @param maxDuration the maximum time the message should process
284
     * @return the next time that the message should be extended or interrupted
285
     */
286
    private static Instant determineEarliestTrigger(final MessageProcessingState state, final Duration maxDuration) {
287
        final Instant maxTime = state.startTime().plus(maxDuration);
1✔
288
        final Instant nextVisibilityExtensionTime = state.nextVisibilityExtensionTime();
1✔
289
        if (maxTime.isBefore(nextVisibilityExtensionTime)) {
1✔
290
            return maxTime;
1✔
291
        } else {
292
            return nextVisibilityExtensionTime;
1✔
293
        }
294
    }
295

296
    @Value.Immutable
297
    interface MessageProcessingState {
298
        /**
299
         * The thread that is processing this message.
300
         *
301
         * <p> This is used to interrupt the processing if has run too long.
302
         *
303
         * @return the thread processing the message
304
         */
305
        Thread thread();
306

307
        /**
308
         * The time that the message began processing.
309
         *
310
         * @return the start time for the message
311
         */
312
        Instant startTime();
313

314
        /**
315
         * The next time that the visibility of the message will need to be extended.
316
         *
317
         * <p> This includes the buffer time and therefore will occur before the message's timeout actually expires.
318
         *
319
         * @return the next visibility extension time
320
         */
321
        Instant nextVisibilityExtensionTime();
322
    }
323
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc