• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Mercateo / sqs-utils / 153

pending completion
153

push

travis-ci-com

web-flow
Actually shutdown thread pool executor (#13)

4 of 4 new or added lines in 1 file covered. (100.0%)

0 of 257 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java
1
/**
2
 * Copyright © 2017 Mercateo AG (http://www.mercateo.com)
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.mercateo.sqs.utils.message.handling;
17

18
import com.mercateo.sqs.utils.queue.Queue;
19
import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtender;
20
import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory;
21

22
import java.time.Duration;
23
import java.util.concurrent.ScheduledExecutorService;
24
import java.util.concurrent.ScheduledFuture;
25
import java.util.concurrent.TimeUnit;
26

27
import lombok.NonNull;
28
import lombok.SneakyThrows;
29
import lombok.extern.slf4j.Slf4j;
30

31
import org.springframework.messaging.Message;
32
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
33

34
@Slf4j
×
35
public class LongRunningMessageHandler<I, O> {
36

37
    private final ThreadPoolTaskExecutor messageProcessingExecutor;
38

39
    private final MessageHandlingRunnableFactory messageHandlingRunnableFactory;
40

41
    private final VisibilityTimeoutExtenderFactory timeoutExtenderFactory;
42

43
    private final MessageWorkerWithHeaders<I, O> worker;
44

45
    private final Queue queue;
46

47
    private final FinishedMessageCallback<I, O> finishedMessageCallback;
48

49
    private final SetWithUpperBound<String> messagesInProcessing;
50

51
    private final Duration timeUntilVisibilityTimeoutExtension;
52

53
    private final ScheduledExecutorService timeoutExtensionExecutor;
54
    
55
    private final ErrorHandlingStrategy<I> errorHandlingStrategy;
56

57
    private final Duration awaitShutDown;
58

59
    LongRunningMessageHandler(@NonNull ScheduledExecutorService timeoutExtensionExecutor,
×
60
            int maxNumberOfMessages, int numberOfThreads,
61
            @NonNull MessageHandlingRunnableFactory messageHandlingRunnableFactory,
×
62
            @NonNull VisibilityTimeoutExtenderFactory timeoutExtenderFactory,
×
63
            @NonNull MessageWorkerWithHeaders<I, O> worker, @NonNull Queue queue,
×
64
            @NonNull FinishedMessageCallback<I, O> finishedMessageCallback,
×
65
            @NonNull Duration timeUntilVisibilityTimeoutExtension,
×
66
            @NonNull Duration awaitShutDown,
×
67
            @NonNull ErrorHandlingStrategy<I> errorHandlingStrategy) {
×
68
        if (timeUntilVisibilityTimeoutExtension.isZero() || timeUntilVisibilityTimeoutExtension
×
69
                .isNegative()) {
×
70
            throw new IllegalArgumentException("the timeout has to be > 0");
×
71
        }
72
        this.timeoutExtensionExecutor = timeoutExtensionExecutor;
×
73
        this.messageHandlingRunnableFactory = messageHandlingRunnableFactory;
×
74
        this.timeoutExtenderFactory = timeoutExtenderFactory;
×
75
        this.worker = worker;
×
76
        this.queue = queue;
×
77
        this.finishedMessageCallback = finishedMessageCallback;
×
78
        this.timeUntilVisibilityTimeoutExtension = timeUntilVisibilityTimeoutExtension;
×
79
        this.awaitShutDown = awaitShutDown;
×
80
        this.errorHandlingStrategy = errorHandlingStrategy;
×
81

82
        messageProcessingExecutor = new ThreadPoolTaskExecutor();
×
83
        messageProcessingExecutor.setMaxPoolSize(numberOfThreads);
×
84
        messageProcessingExecutor.setCorePoolSize(numberOfThreads);
×
85
        messageProcessingExecutor.setThreadNamePrefix(getClass().getSimpleName()+"-"+queue.getName().getId()+"-");
×
86
        /*
87
         * Since we only accept new messages if one slot in the messagesInProcessing-Set
88
         * / executor is free we can schedule at least one message for instant execution
89
         * while (maxNumberOfMessages - 1) will be put into the queue
90
         */
91
        messageProcessingExecutor.setQueueCapacity(maxNumberOfMessages - 1);
×
92
        messageProcessingExecutor.afterPropertiesSet();
×
93

94
        messagesInProcessing = new SetWithUpperBound<>(numberOfThreads);
×
95

96
        if (queue.getDefaultVisibilityTimeout().minusSeconds(5).compareTo(
×
97
                timeUntilVisibilityTimeoutExtension) < 0) {
98
            throw new IllegalStateException("The extension interval of "
×
99
                    + timeUntilVisibilityTimeoutExtension.getSeconds()
×
100
                    + " is too close to the VisibilityTimeout of " + queue
101
                            .getDefaultVisibilityTimeout().getSeconds()
×
102
                    + " seconds of the queue, has to be at least 5 seconds less.");
103
        }
104
    }
×
105

106
    /**
107
     * Submits a task for the processing of the message into the internal executor.
108
     * Schedules a timeoutExtender that takes care of extending the visibility
109
     * timeout until and during message processing.
110
     *
111
     * <p>
112
     * Returns iff there is at least one free slot in the internal executor i.e.
113
     * that new messages can be consumed. That way we guarantee that we can handle
114
     * an incoming maxNumberOfMessages on the next iteration. Returning from this
115
     * method does <b>not</b> mean the message has already been processed, it simply
116
     * means that it is in processing.
117
     *
118
     * <p>
119
     * This method should only be called from a single thread, from a single
120
     * SqsListener and only once per message.
121
     *
122
     * <p>
123
     * The SimpleMessageListenerContainer dispatches one task per incoming message
124
     * to an internal ThreadPoolExecutor and waits for all the tasks to finish
125
     * before polling from SQS again. That means we can block each task / thread
126
     * from returning until a free worker is available without interfering with the
127
     * dispatching of other message tasks.
128
     *
129
     * @param message
130
     *            the message to be processed
131
     */
132
    public void handleMessage(@NonNull Message<I> message) {
×
133
        String messageId = message.getHeaders().get("MessageId", String.class);
×
134
        if (messagesInProcessing.contains(messageId)) {
×
135
            return;
×
136
        }
137
        messagesInProcessing.add(messageId);
×
138

139
        ScheduledFuture<?> timeoutExtender;
140
        try {
141
            timeoutExtender = scheduleNewVisibilityTimeoutExtender(message);
×
142
        } catch (RuntimeException rex) {
×
143
            messagesInProcessing.remove(messageId);
×
144
            log.error("error while trying to schedule timeout extender", rex);
×
145
            throw new RuntimeException(rex);
×
146
        }
×
147

148
        try {
149
            scheduleNewMessageTask(message, timeoutExtender);
×
150
        } catch (RuntimeException rex) {
×
151
            messagesInProcessing.remove(messageId);
×
152
            timeoutExtender.cancel(true);
×
153
            log.error("error while trying to submit message processing task", rex);
×
154
            throw new RuntimeException(rex);
×
155
        }
×
156

157
        messagesInProcessing.waitUntilAtLeastOneFree();
×
158
    }
×
159

160
    private void scheduleNewMessageTask(@NonNull Message<I> message,
×
161
            ScheduledFuture<?> visibilityTimeoutExtender) {
162
        MessageHandlingRunnable<I, O> messageTask = messageHandlingRunnableFactory.get(worker,
×
163
                message, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy);
164

165
        messageProcessingExecutor.submit(messageTask);
×
166
    }
×
167

168
    private ScheduledFuture<?> scheduleNewVisibilityTimeoutExtender(@NonNull Message<I> message) {
×
169
        VisibilityTimeoutExtender timeoutExtender = timeoutExtenderFactory.get(message, queue, errorHandlingStrategy);
×
170
        return timeoutExtensionExecutor.scheduleAtFixedRate(timeoutExtender,
×
171
                timeUntilVisibilityTimeoutExtension.toMillis(), timeUntilVisibilityTimeoutExtension
×
172
                        .toMillis(), TimeUnit.MILLISECONDS);
×
173
    }
174

175
    /**
176
     * Visible for Testing
177
     *
178
     * @return Set containing messageIds in processing
179
     */
180
    SetWithUpperBound<String> getMessagesInProcessing() {
181
        return messagesInProcessing;
×
182
    }
183

184
    @SneakyThrows
×
185
    public void shutdown() {
186
        messageProcessingExecutor.getThreadPoolExecutor().shutdown();
×
187
        boolean successfullyTerminated = messageProcessingExecutor.getThreadPoolExecutor().awaitTermination(awaitShutDown.getSeconds(), TimeUnit.SECONDS);
×
188
        if (!successfullyTerminated) {
×
189
            messageProcessingExecutor.getThreadPoolExecutor().shutdownNow();
×
190
            messageProcessingExecutor.getThreadPoolExecutor().awaitTermination(10, TimeUnit.SECONDS);
×
191
        }
192
    }
×
193

194
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc