• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openmrs / openmrs-core / 26773559161

01 Jun 2026 06:21PM UTC coverage: 63.376% (-0.01%) from 63.389%
26773559161

push

github

web-flow
TRUNK-6429: Create application events for service method calls and entity changes (#6084)

272 of 504 new or added lines in 27 files covered. (53.97%)

5 existing lines in 2 files now uncovered.

23598 of 37235 relevant lines covered (63.38%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.0
/api/src/main/java/org/openmrs/event/outbox/tasks/OutboxPollingTaskHandler.java
1
/**
2
 * This Source Code Form is subject to the terms of the Mozilla Public License,
3
 * v. 2.0. If a copy of the MPL was not distributed with this file, You can
4
 * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under
5
 * the terms of the Healthcare Disclaimer located at http://openmrs.org/license.
6
 *
7
 * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS
8
 * graphic logo is a trademark of OpenMRS Inc.
9
 */
10
package org.openmrs.event.outbox.tasks;
11

12
import com.fasterxml.jackson.databind.ObjectMapper;
13
import org.apache.commons.lang3.exception.ExceptionUtils;
14
import org.openmrs.api.context.Context;
15
import org.openmrs.event.EventPublisher;
16
import org.openmrs.event.outbox.OutboxEvent;
17
import org.openmrs.event.outbox.OutboxEventPayload;
18
import org.openmrs.event.outbox.OutboxEventRegistry;
19
import org.openmrs.event.outbox.OutboxEventService;
20
import org.openmrs.event.outbox.OutboxException;
21
import org.openmrs.event.outbox.OutboxExceptionEvent;
22
import org.openmrs.scheduler.TaskContext;
23
import org.openmrs.scheduler.TaskHandler;
24
import org.slf4j.Logger;
25
import org.slf4j.LoggerFactory;
26
import org.springframework.stereotype.Component;
27

28
import java.util.Arrays;
29
import java.util.LinkedHashSet;
30
import java.util.List;
31
import java.util.Set;
32

33
/**
34
 * @since 2.9.x
35
 */
36
@Component
37
public class OutboxPollingTaskHandler implements TaskHandler<OutboxPollingTaskData> {
38

39
        private static final Logger log = LoggerFactory.getLogger(OutboxPollingTaskHandler.class);
1✔
40
        private final OutboxEventRegistry registry;
41
        private final ObjectMapper objectMapper;
42
        private final EventPublisher eventPublisher;
43
        private final OutboxEventService outboxEventService;
44

45
        public OutboxPollingTaskHandler(OutboxEventRegistry registry, ObjectMapper objectMapper, 
46
                                                                        EventPublisher eventPublisher, OutboxEventService outboxEventService) {
1✔
47
                this.registry = registry;
1✔
48
                this.objectMapper = objectMapper;
1✔
49
                this.eventPublisher = eventPublisher;
1✔
50
                this.outboxEventService = outboxEventService;
1✔
51
        }
1✔
52

53
        /**
54
         * Processes pending outbox events. Only one thread is processing events at a time to guarantee strict 
55
         * chronological ordering.
56
         * <p>
57
         * In case an event listener does not complete in {@code outboxevent.listener.timeout} (120s by default)
58
         * it is considered stale and the event can be picked up by another thread for processing. It is signaled
59
         * by throwing an OutboxException, which can be viewed in scheduler UI.
60
         * <p>
61
         * This method does not complete until outbox is empty, thus it can run indefinitely regardless of the schedule.
62
         * 
63
         * @param taskData task data
64
         * @param taskContext task context
65
         * @throws Exception
66
         */
67
        @Override
68
        public void execute(OutboxPollingTaskData taskData, TaskContext taskContext) throws Exception {
NEW
69
                outboxEventService.resetStuckEvent();
×
70

71
                while (true) {
NEW
72
                        List<OutboxEvent> pendingItems = outboxEventService.getProcessingAndPendingEvents();
×
73

NEW
74
                        if (pendingItems.isEmpty()) {
×
NEW
75
                                return;
×
76
                        }
77

NEW
78
                        outboxEventService.warnOnTooManyPendingEvents();
×
79

NEW
80
                        log.debug("Found {} pending/processing outbox items to process", pendingItems.size());
×
81

NEW
82
                        for (OutboxEvent item : pendingItems) {
×
NEW
83
                                if (OutboxEvent.Status.PROCESSING.equals(item.getStatus())) {
×
84
                                        // An event is currently being processed by another handler.
85
                                        // Stop here to guarantee strict chronological ordering.
NEW
86
                                        return;
×
87
                                }
88
                                
NEW
89
                                if (!outboxEventService.lockEventForProcessing(item)) {
×
90
                                        // Another scheduled task handler just grabbed it and is processing it right now.
91
                                        // Stop processing subsequent events to maintain strict ordering.
NEW
92
                                        return;
×
93
                                }
94

NEW
95
                                Set<String> completedListeners = new LinkedHashSet<>();
×
96
                                try {
NEW
97
                                        Class<?> eventClass = Class.forName(item.getEventType());
×
98
                                        Object event;
NEW
99
                                        if (OutboxEventPayload.class.isAssignableFrom(eventClass)) {
×
NEW
100
                                                event = eventClass.getDeclaredConstructor().newInstance();
×
NEW
101
                                                ((OutboxEventPayload) event).fromPayload(item.getPayload());
×
102
                                        } else {
NEW
103
                                                event = objectMapper.readValue(item.getPayload(), eventClass);
×
104
                                        }
105
                                        
NEW
106
                                        if (item.getCompletedListeners() != null && !item.getCompletedListeners().isEmpty()) {
×
NEW
107
                                                completedListeners.addAll(Arrays.asList(item.getCompletedListeners().split(",")));
×
108
                                        }
109

110
                                        // Dispatch ONLY to registered outbox listeners, bypassing the ApplicationEventPublisher
111
                                        // If one of listeners fails, the event will be re-tried only for the failing listener and continue
112
                                        // with the remaining listeners.
NEW
113
                                        registry.dispatchOutboxEvent(event, completedListeners, () -> {
×
114
                                                // Update processing so that it is not considered stuck after each listener completes
NEW
115
                                                item.setCompletedListeners(String.join(",", completedListeners));
×
NEW
116
                                                log.debug("Successfully dispatched outbox event {} to {}", event.getClass(),  item.getCompletedListeners());
×
NEW
117
                                                outboxEventService.saveOutboxEvent(item);
×
NEW
118
                                        });
×
119
                                        
NEW
120
                                        item.setStatus(OutboxEvent.Status.COMPLETED);
×
NEW
121
                                        log.debug("Completed dispatching outbox event {} to {}", event.getClass(), item.getCompletedListeners());
×
NEW
122
                                        outboxEventService.saveOutboxEvent(item);
×
NEW
123
                                } catch (Exception e) {
×
NEW
124
                                        OutboxException outboxException = new OutboxException("Failed to process outbox item ID: " + item.getId(), e);
×
NEW
125
                                        eventPublisher.publishEvent(new OutboxExceptionEvent(outboxException));
×
126
                                        
NEW
127
                                        int errorCount = item.getErrorCount() == null ? 1 : item.getErrorCount() + 1;
×
128

129
                                        // Extract stacktrace
NEW
130
                                        String errorMessage = ExceptionUtils.getStackTrace(e);
×
NEW
131
                                        if (errorMessage.length() > 1024) {
×
NEW
132
                                                errorMessage = errorMessage.substring(0, 1024);
×
133
                                        }
134

135
                                        // Revert to PENDING (to retry)
NEW
136
                                        item.setStatus(OutboxEvent.Status.PENDING);
×
NEW
137
                                        item.setErrorCount(errorCount);
×
NEW
138
                                        item.setErrorMessage(errorMessage);
×
NEW
139
                                        outboxEventService.saveOutboxEvent(item);
×
140

141
                                        // Stop processing subsequent events to ensure strict ordering
NEW
142
                                        throw outboxException;
×
NEW
143
                                }
×
NEW
144
                        }
×
NEW
145
                        Context.clearSession(); // free up memory
×
NEW
146
                }
×
147
        }
148
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc