Coveralls logo
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

OpenWiseSolutions / openhub-framework / 1198

4 Feb 2021 - 14:46 coverage decreased (-0.5%) to 70.196%
1198

Pull #129

travis-ci

9181eb84f9c35729a3bad740fb7f9d93?size=18&default=identiconweb-flow
Merge b150e80af into 382999a38
Pull Request #129: [OHFJIRA-85]: Upgrade to Spring Boot 2.0.9.RELEASE

23 of 66 new or added lines in 12 files covered. (34.85%)

28 existing lines in 4 files now uncovered.

4329 of 6167 relevant lines covered (70.2%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.18
/core/src/main/java/org/openhubframework/openhub/core/common/asynch/queue/MessagePollExecutor.java
UNCOV
1
/*
!
2
 * Copyright 2014-2020 the original author or authors.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package org.openhubframework.openhub.core.common.asynch.queue;
18

19
import static org.openhubframework.openhub.api.configuration.CoreProps.ASYNCH_POSTPONED_INTERVAL_WHEN_FAILED_SEC;
20

21
import java.time.Instant;
22
import java.util.List;
23

24
import org.apache.camel.Exchange;
25
import org.apache.camel.ExchangePattern;
26
import org.apache.camel.Processor;
27
import org.apache.camel.ProducerTemplate;
28
import org.apache.camel.builder.ExchangeBuilder;
29
import org.slf4j.Logger;
30
import org.slf4j.LoggerFactory;
31
import org.springframework.beans.factory.annotation.Autowired;
32
import org.springframework.stereotype.Service;
33
import org.springframework.util.Assert;
34

35
import org.openhubframework.openhub.api.asynch.AsynchConstants;
36
import org.openhubframework.openhub.api.configuration.ConfigurableValue;
37
import org.openhubframework.openhub.api.configuration.ConfigurationItem;
38
import org.openhubframework.openhub.api.entity.Message;
39
import org.openhubframework.openhub.api.exception.IntegrationException;
40
import org.openhubframework.openhub.api.exception.InternalErrorEnum;
41
import org.openhubframework.openhub.api.exception.LockFailureException;
42
import org.openhubframework.openhub.common.time.Seconds;
43
import org.openhubframework.openhub.core.common.asynch.AsynchMessageRoute;
44
import org.openhubframework.openhub.core.common.asynch.LogContextHelper;
45
import org.openhubframework.openhub.core.common.event.AsynchEventHelper;
46
import org.openhubframework.openhub.spi.msg.MessageService;
47

48

49
/**
50
 * Reads messages from DB and sends them for next processing.
51
 * Execution will stop when there is no further message for processing.
52
 * <p>
53
 * This executor is invoked by {@link JobStarterForMessagePooling}.
54
 *
55
 * @author Petr Juza
56
 */
57
@Service
58
public class MessagePollExecutor implements Runnable {
1×
59

60
    private static final Logger LOG = LoggerFactory.getLogger(MessagePollExecutor.class);
1×
61

62
    private static final int LOCK_FAILURE_LIMIT = 5;
63

64
    private static final long GUARANTEED_ORDER_MESSAGES_LIMIT = 2L;
1×
65

66
    @Autowired
67
    private MessagesPool messagesPool;
68

69
    @Autowired
70
    private ProducerTemplate producerTemplate;
71

72
    @Autowired
73
    private MessageService messageService;
74

75
    /**
76
     * Interval (in seconds) after that postponed messages will fail.
77
     */
78
    @ConfigurableValue(key = ASYNCH_POSTPONED_INTERVAL_WHEN_FAILED_SEC)
79
    private ConfigurationItem<Seconds> postponedIntervalWhenFailed;
80

81
    // note: this is because of setting different target URI for tests
82
    private String targetURI = AsynchMessageRoute.URI_ASYNC_PROCESSING_MSG;
1×
83

84
    @Override
85
    public void run() {
86
        LOG.debug("Message pooling starts ...");
1×
87

88
        // is there message for processing?
89
        Message msg = null;
1×
90
        int lockFailureCount = 0;
1×
91
        while (true) {
!
92
            try {
93
                msg = messagesPool.getNextMessage();
1×
94

95
                if (msg != null) {
1×
96
                    LogContextHelper.setLogContextParams(msg, null);
1×
97

98
                    startMessageProcessing(msg);
1×
99
                } else {
100
                    //there is no new message for processing
101
                    //  => finish this executor and try it again after some time
102
                    break;
103
                }
104
            } catch (LockFailureException ex) {
1×
105
                // try again to acquire next message with lock
106
                lockFailureCount++;
1×
107

108
                if (lockFailureCount > LOCK_FAILURE_LIMIT) {
1×
109
                    LOG.warn("Probably problem with locking messages - count of lock failures exceeds limit ("
!
110
                            + LOCK_FAILURE_LIMIT + ").");
111
                    break;
!
112
                }
113
            } catch (Exception ex) {
!
114
                LOG.error("Error occurred during getting message "
!
115
                        + (msg != null ? msg.toHumanString() : ""), ex);
!
116
            }
117
        }
118

119
        LOG.debug("Message pooling finished.");
1×
120
    }
1×
121

122
    void startMessageProcessing(Message msg) {
123
        Assert.notNull(msg, "the msg must not be null");
1×
124

125
        if (isMsgInGuaranteedOrder(msg)) {
1×
126
            // sends message for next processing
127
            producerTemplate.sendBodyAndHeader(targetURI, ExchangePattern.InOnly, msg,
1×
128
                    AsynchConstants.MSG_QUEUE_INSERT_HEADER, System.currentTimeMillis());
1×
129

130
        } else {
1×
UNCOV
131
            Instant failedDate = Instant.now().minusSeconds(postponedIntervalWhenFailed.getValue().getSeconds());
!
132

UNCOV
133
            final Message paramMsg = msg;
!
134

UNCOV
135
            if (msg.getReceiveTimestamp().isBefore(failedDate)) {
!
136
                // change to failed message => redirect to "FAILED" route
UNCOV
137
                producerTemplate.send(AsynchConstants.URI_ERROR_FATAL, ExchangePattern.InOnly,
!
UNCOV
138
                        new Processor() {
!
139
                            @Override
140
                            public void process(Exchange exchange) throws Exception {
UNCOV
141
                                IntegrationException ex = new IntegrationException(InternalErrorEnum.E121,
!
UNCOV
142
                                        "Message " + paramMsg.toHumanString() + " exceeded interval for starting "
!
UNCOV
143
                                                + "processing => changed to FAILED state");
!
144

UNCOV
145
                                exchange.setProperty(Exchange.EXCEPTION_CAUGHT, ex);
!
146

UNCOV
147
                                exchange.getIn().setHeader(AsynchConstants.MSG_HEADER, paramMsg);
!
UNCOV
148
                            }
!
149
                        });
150

UNCOV
151
            } else {
!
152
                // postpone message
UNCOV
153
                messageService.setStatePostponed(msg);
!
154

155
                // create Exchange for event only
UNCOV
156
                ExchangeBuilder exchangeBuilder = ExchangeBuilder.anExchange(producerTemplate.getCamelContext());
!
UNCOV
157
                Exchange exchange = exchangeBuilder.build();
!
158

UNCOV
159
                exchange.getIn().setHeader(AsynchConstants.MSG_HEADER, paramMsg);
!
160

UNCOV
161
                AsynchEventHelper.notifyMsgPostponed(exchange);
!
162
            }
163
        }
164
    }
1×
165

166
    /**
167
     * Checks if specified message should be processed in guaranteed order and if yes
168
     * then checks if the message is in the right order.
169
     *
170
     * @param msg the asynchronous message
171
     * @return {@code true} if message's order is ok otherwise {@code false}
172
     */
173
    private boolean isMsgInGuaranteedOrder(Message msg) {
174
        if (!msg.isGuaranteedOrder()) {
1×
175
            // no guaranteed order => continue
176
            return true;
1×
177
        } else {
178
            // guaranteed order => is the message in the right order?
179
            List<Message> messages = messageService.getMessagesForGuaranteedOrderForRoute(msg.getFunnelValue(),
1×
180
                    msg.isExcludeFailedState(), GUARANTEED_ORDER_MESSAGES_LIMIT);
1×
181

182
            if (messages.size() == 1) {
1×
183
                LOG.debug("There is only one processing message with funnel value: " + msg.getFunnelValue()
1×
184
                        + " => continue");
1×
185

186
                return true;
1×
187

188
            // is specified message first one for processing?
UNCOV
189
            } else if (messages.get(0).equals(msg)) {
!
190
                LOG.debug("Processing message (msg_id = {}, funnel value = '{}') is the first one"
!
191
                        + " => continue", msg.getMsgId(), msg.getFunnelValue());
!
192

193
                return true;
!
194

195
            } else {
UNCOV
196
                LOG.debug("There is at least one processing message with funnel value '{}'"
!
197
                                + " before current message (msg_id = {}); message {} will be postponed.",
UNCOV
198
                        msg.getFunnelValue(), msg.getMsgId(), msg.toHumanString());
!
199

UNCOV
200
                return false;
!
201
            }
202
        }
203
    }
204
}
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2021 Coveralls, Inc