• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openmrs / openmrs-core / 27824292656

19 Jun 2026 11:57AM UTC coverage: 63.653% (-0.08%) from 63.728%
27824292656

push

github

web-flow
TRUNK-6429: Addressing post commit review (#6190)

33 of 69 new or added lines in 10 files covered. (47.83%)

43 existing lines in 5 files now uncovered.

23926 of 37588 relevant lines covered (63.65%)

0.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.04
/api/src/main/java/org/openmrs/event/outbox/tasks/OutboxPollingTaskHandler.java
1
/**
2
 * This Source Code Form is subject to the terms of the Mozilla Public License,
3
 * v. 2.0. If a copy of the MPL was not distributed with this file, You can
4
 * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under
5
 * the terms of the Healthcare Disclaimer located at http://openmrs.org/license.
6
 *
7
 * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS
8
 * graphic logo is a trademark of OpenMRS Inc.
9
 */
10
package org.openmrs.event.outbox.tasks;
11

12
import com.fasterxml.jackson.databind.ObjectMapper;
13
import org.apache.commons.lang3.exception.ExceptionUtils;
14
import org.openmrs.api.context.Context;
15
import org.openmrs.event.EventPublisher;
16
import org.openmrs.event.outbox.OutboxEvent;
17
import org.openmrs.event.outbox.OutboxEventPayload;
18
import org.openmrs.event.outbox.OutboxEventRegistry;
19
import org.openmrs.event.outbox.OutboxEventService;
20
import org.openmrs.event.outbox.OutboxException;
21
import org.openmrs.event.outbox.OutboxExceptionEvent;
22
import org.openmrs.scheduler.TaskContext;
23
import org.openmrs.scheduler.TaskHandler;
24
import org.slf4j.Logger;
25
import org.slf4j.LoggerFactory;
26
import org.springframework.beans.factory.annotation.Value;
27
import org.springframework.stereotype.Component;
28

29
import java.util.Arrays;
30
import java.util.LinkedHashSet;
31
import java.util.List;
32
import java.util.Set;
33

34
/**
35
 * @since 2.9.0
36
 */
37
@Component
38
public class OutboxPollingTaskHandler implements TaskHandler<OutboxPollingTaskData> {
39

40
        private static final Logger log = LoggerFactory.getLogger(OutboxPollingTaskHandler.class);
1✔
41
        private final OutboxEventRegistry registry;
42
        private final ObjectMapper objectMapper;
43
        private final EventPublisher eventPublisher;
44
        private final OutboxEventService outboxEventService;
45
        private final int retryLimit;
46

47
        public OutboxPollingTaskHandler(OutboxEventRegistry registry, ObjectMapper objectMapper, 
48
                                                                        EventPublisher eventPublisher, OutboxEventService outboxEventService,
49
                                                                        @Value("${outboxevent.retry.limit:16}") int retryLimit) {
1✔
50
                this.registry = registry;
1✔
51
                this.objectMapper = objectMapper;
1✔
52
                this.eventPublisher = eventPublisher;
1✔
53
                this.outboxEventService = outboxEventService;
1✔
54
                this.retryLimit = retryLimit;
1✔
55
        }
1✔
56

57
        /**
58
         * Processes pending outbox events. Only one thread is processing events at a time to guarantee strict 
59
         * chronological ordering.
60
         * <p>
61
         * In case an event listener does not complete in {@code outboxevent.listener.timeout} (120s by default)
62
         * it is considered stale and the event can be picked up by another thread for processing. It is signaled
63
         * by throwing an OutboxException, which can be viewed in scheduler UI.
64
         * <p>
65
         * This method does not complete until outbox is empty, thus it can run indefinitely regardless of the schedule.
66
         * 
67
         * @param taskData task data
68
         * @param taskContext task context
69
         * @throws Exception
70
         */
71
        @Override
72
        public void execute(OutboxPollingTaskData taskData, TaskContext taskContext) throws Exception {
73
                outboxEventService.resetStuckEvent();
×
74
                
75
                while (true) {
76
                        List<OutboxEvent> pendingItems = outboxEventService.getProcessingAndPendingEvents();
×
77

78
                        if (pendingItems.isEmpty()) {
×
79
                                return;
×
80
                        }
81

82
                        outboxEventService.warnOnTooManyPendingEvents();
×
83

84
                        log.debug("Found {} pending/processing outbox items to process", pendingItems.size());
×
85

86
                        for (OutboxEvent item : pendingItems) {
×
87
                                if (OutboxEvent.Status.PROCESSING.equals(item.getStatus())) {
×
88
                                        // An event is currently being processed by another handler.
89
                                        // Stop here to guarantee strict chronological ordering.
90
                                        return;
×
91
                                }
92
                                
93
                                if (!outboxEventService.lockEventForProcessing(item)) {
×
94
                                        // Another scheduled task handler just grabbed it and is processing it right now.
95
                                        // Stop processing subsequent events to maintain strict ordering.
96
                                        return;
×
97
                                }
98

99
                                Set<String> completedListeners = new LinkedHashSet<>();
×
100
                                try {
101
                                        Class<?> eventClass = Class.forName(item.getEventType());
×
102
                                        Object event;
103
                                        if (OutboxEventPayload.class.isAssignableFrom(eventClass)) {
×
104
                                                event = eventClass.getDeclaredConstructor().newInstance();
×
105
                                                ((OutboxEventPayload) event).fromPayload(item.getPayload());
×
106
                                        } else {
107
                                                event = objectMapper.readValue(item.getPayload(), eventClass);
×
108
                                        }
109
                                        
110
                                        if (item.getCompletedListeners() != null && !item.getCompletedListeners().isEmpty()) {
×
111
                                                completedListeners.addAll(Arrays.asList(item.getCompletedListeners().split(",")));
×
112
                                        }
113

114
                                        // Dispatch ONLY to registered outbox listeners, bypassing the ApplicationEventPublisher
115
                                        // If one of listeners fails, the event will be re-tried only for the failing listener and continue
116
                                        // with the remaining listeners.
117
                                        registry.dispatchOutboxEvent(event, completedListeners, () -> {
×
118
                                                // Update processing so that it is not considered stuck after each listener completes
119
                                                item.setCompletedListeners(String.join(",", completedListeners));
×
120
                                                log.debug("Successfully dispatched outbox event {} to {}", event.getClass(),  item.getCompletedListeners());
×
121
                                                outboxEventService.saveOutboxEvent(item);
×
122
                                        });
×
123
                                        
124
                                        item.setStatus(OutboxEvent.Status.COMPLETED);
×
125
                                        log.debug("Completed dispatching outbox event {} to {}", event.getClass(), item.getCompletedListeners());
×
126
                                        outboxEventService.saveOutboxEvent(item);
×
127
                                } catch (Exception e) {
×
128
                                        int errorCount = item.getErrorCount() == null ? 1 : item.getErrorCount() + 1;
×
129

130
                                        // Extract stacktrace
131
                                        String errorMessage = ExceptionUtils.getStackTrace(e);
×
132
                                        if (errorMessage.length() > 1024) {
×
133
                                                errorMessage = errorMessage.substring(0, 1024);
×
134
                                        }
UNCOV
135
                                        item.setErrorCount(errorCount);
×
UNCOV
136
                                        item.setErrorMessage(errorMessage);
×
137

NEW
138
                                        OutboxException outboxException = new OutboxException("Failed " + item.getEventType() + " with UUID: " + item.getUuid(), e);
×
NEW
139
                                        OutboxExceptionEvent outboxExceptionEvent = new OutboxExceptionEvent(outboxException, item.getUuid());
×
NEW
140
                                        outboxExceptionEvent.setRetryCount(errorCount);
×
141
                                        
NEW
142
                                        if (errorCount < retryLimit) {
×
143
                                                // Revert to PENDING to retry
NEW
144
                                                item.setStatus(OutboxEvent.Status.PENDING);
×
NEW
145
                                                outboxExceptionEvent.setPendingRetry(true);
×
146
                                        } else {
147
                                                // Stop retries to process the next item
NEW
148
                                                item.setStatus(OutboxEvent.Status.FAILED);
×
NEW
149
                                                outboxExceptionEvent.setPendingRetry(false);
×
150
                                        }
151
                                        
NEW
152
                                        outboxEventService.saveOutboxEvent(item);
×
153
                                        
NEW
154
                                        eventPublisher.publishEvent(outboxExceptionEvent);
×
155
                                        
156
                                        // Stop processing subsequent events to ensure strict ordering
157
                                        throw outboxException;
×
158
                                }
×
159
                        }
×
160
                        Context.clearSession(); // free up memory
×
161
                }
×
162
        }
163
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc