• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openmrs / openmrs-core / 19727982349

27 Nov 2025 07:04AM UTC coverage: 64.902% (-0.01%) from 64.915%
19727982349

push

github

web-flow
TRUNK-6480: Resolve Errors When Mocking Context (#5525) (#5531)

(cherry picked from commit 463afcefe)

3 of 3 new or added lines in 1 file covered. (100.0%)

7 existing lines in 3 files now uncovered.

23449 of 36130 relevant lines covered (64.9%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.0
/api/src/main/java/org/openmrs/api/stream/StreamDataService.java
1
/**
2
 * This Source Code Form is subject to the terms of the Mozilla Public License,
3
 * v. 2.0. If a copy of the MPL was not distributed with this file, You can
4
 * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under
5
 * the terms of the Healthcare Disclaimer located at http://openmrs.org/license.
6
 *
7
 * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS
8
 * graphic logo is a trademark of OpenMRS Inc.
9
 */
10
package org.openmrs.api.stream;
11

12
import java.io.ByteArrayInputStream;
13
import java.io.ByteArrayOutputStream;
14
import java.io.IOException;
15
import java.io.InputStream;
16
import java.io.InterruptedIOException;
17
import java.io.OutputStream;
18
import java.io.UncheckedIOException;
19
import java.time.Duration;
20
import java.util.Objects;
21
import java.util.concurrent.BlockingQueue;
22
import java.util.concurrent.LinkedBlockingQueue;
23
import java.util.concurrent.TimeUnit;
24

25
import org.apache.commons.io.IOUtils;
26
import org.slf4j.Logger;
27
import org.slf4j.LoggerFactory;
28
import org.springframework.beans.factory.annotation.Autowired;
29
import org.springframework.core.task.TaskExecutor;
30
import org.springframework.stereotype.Service;
31
import org.springframework.util.unit.DataSize;
32

33
/**
34
 * The service can be used to convert data from OutputStream to InputStream without copying all data in memory.
35
 * <p>
36
 * The {@link #streamData(StreamDataWriter, Long)} method may run {@link StreamDataWriter#write(OutputStream)} in a 
37
 * separate thread using {@link TaskExecutor}.
38
 * <p>
39
 * It's providing the {@link java.io.PipedInputStream}/{@link java.io.PipedOutputStream} mechanism in a thread safe way 
40
 * with the use of {@link BlockingQueue}.
41
 * 
42
 * @since 2.8.0, 2.7.5, 2.6.16, 2.5.15
43
 */
44
@Service
45
public class StreamDataService {
46
        public static final int BUFFER_SIZE = (int) DataSize.ofKilobytes(128).toBytes();
1✔
47
        
48
        private static final Logger log = LoggerFactory.getLogger(StreamDataService.class);
1✔
49
        private final TaskExecutor taskExecutor;
50
        
51
        /**
         * Creates the service.
         *
         * @param taskExecutor executor used by {@link #streamData(StreamDataWriter, Long)} to run writers that are
         *                     not copied in memory
         */
        public StreamDataService(@Autowired TaskExecutor taskExecutor) {
                this.taskExecutor = taskExecutor;
        }
1✔
54
        
55
        private static class QueueInputStream extends InputStream {
56
                private final BlockingQueue<Integer> blockingQueue;
57
                private final long timeoutNanos;
58
                private volatile IOException streamException;
59

60
                public QueueInputStream() {
1✔
61
                        this.blockingQueue = new LinkedBlockingQueue<>(BUFFER_SIZE);
1✔
62
                        this.timeoutNanos = Duration.ofSeconds(30).toNanos();
1✔
63
                }
1✔
64

65
                public QueueOutputStream newQueueOutputStream() {
66
                        return new QueueOutputStream(this);
1✔
67
                }
68

69
                @Override
70
                public int read() throws IOException {
71
                        try {
72
                                checkStreamException();
1✔
73
                                
74
                                int result;
75
                                Integer peek = this.blockingQueue.peek();
1✔
76
                                if (Integer.valueOf(-1).equals(peek)) {
1✔
77
                                        result = -1;
1✔
78
                                } else {
79
                                        Integer value = this.blockingQueue.poll(this.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
80
                                        if (value == null) {
1✔
81
                                                // Timeout
82
                                                result = -1;
1✔
83
                                        } else if (value == -1) {
1✔
84
                                                // End of stream. Put the end of stream back in the queue for consistency.
UNCOV
85
                                                this.blockingQueue.clear();
×
UNCOV
86
                                                if (!this.blockingQueue.offer(-1, timeoutNanos, TimeUnit.NANOSECONDS)) {
×
87
                                                        throw new IOException("Failed to write to full queue");
×
88
                                                }
UNCOV
89
                                                result = -1;
×
90
                                        } else {
91
                                                result = 255 & value;
1✔
92
                                        }
93
                                }
94

95
                                checkStreamException();
1✔
96
                                return result;
1✔
97
                        } catch (InterruptedException e) {
×
98
                                Thread.currentThread().interrupt();
×
99
                                InterruptedIOException interruptedIoException = new InterruptedIOException();
×
100
                                interruptedIoException.initCause(e);
×
101
                                throw interruptedIoException;
×
102
                        }
103
                }
104

105
                @Override
106
                public void close() throws IOException {
107
                        checkStreamException();
×
108
                        super.close();
×
109
                }
×
110

111
                /**
112
                 * Propagate exception from a writing thread to a reading thread so that processing is stopped.
113
                 * 
114
                 * @param streamException exception
115
                 * @throws UncheckedIOException rethrows e
116
                 */
117
                public void propagateStreamException(IOException streamException) {
118
                        this.streamException = streamException;
1✔
119
                }
1✔
120
                
121
                public void checkStreamException() throws IOException {
122
                        if (streamException != null) {
1✔
123
                                throw streamException;
1✔
124
                        }
125
                }
1✔
126
        }
127
        
128
        private static class QueueOutputStream extends OutputStream {
129
                private final QueueInputStream queueInputStream;
130
                
131
                public QueueOutputStream(QueueInputStream queueInputStream) {
1✔
132
                        this.queueInputStream = queueInputStream;
1✔
133
                }
1✔
134

135
                /**
136
                 * @param b   the <code>byte</code>.
137
                 * @throws IOException when queue full or interrupted
138
                 */
139
                @Override
140
                public void write(int b) throws IOException {
141
                        try {
142
                                queueInputStream.checkStreamException();
1✔
143
                                
144
                                if (!queueInputStream.blockingQueue.offer(255 & b, queueInputStream.timeoutNanos, TimeUnit.NANOSECONDS)) {
1✔
145
                                        IOException streamException = new IOException("Failed to write to full queue");
×
146
                                        queueInputStream.propagateStreamException(streamException);
×
147
                                }
148
                        } catch (InterruptedException e) {
×
149
                                Thread.currentThread().interrupt();
×
150
                                InterruptedIOException interruptedIoException = new InterruptedIOException();
×
151
                                interruptedIoException.initCause(e);
×
152
                                throw interruptedIoException;
×
153
                        }
1✔
154
                }
1✔
155

156
                /**
157
                 * Closing the stream doesn't fail any following writes, but effectively only data up to closing the stream
158
                 * is read.
159
                 * 
160
                 * @throws IOException when queue full or interrupted
161
                 */
162
                @Override
163
                public void close() throws IOException {
164
                        try {
165
                                queueInputStream.checkStreamException();
1✔
166
                                
167
                                // Indicate the end of stream
168
                                if (!this.queueInputStream.blockingQueue.offer(-1, queueInputStream.timeoutNanos, TimeUnit.NANOSECONDS)) {
1✔
169
                                        IOException streamException = new IOException("Failed to write to full queue");
×
170
                                        queueInputStream.propagateStreamException(streamException);
×
171
                                }
172
                        } catch (InterruptedException e) {
×
173
                                Thread.currentThread().interrupt();
×
174
                                InterruptedIOException interruptedIoException = new InterruptedIOException();
×
175
                                interruptedIoException.initCause(e);
×
176
                                throw interruptedIoException;
×
177
                        }
1✔
178
                }
1✔
179
        }
180

181
        /**
182
         * Runs {@link StreamDataWriter#write(OutputStream)} in a separate thread using {@link TaskExecutor} or copies 
183
         * in-memory if the length is smaller than {@link #BUFFER_SIZE}.
184
         * <p>
185
         * The returned InputStream doesn't need to be closed and the close operation takes no effect.
186
         * 
187
         * @param writer the write method
188
         * @param length the number of bytes if known or null
189
         * @return InputStream
190
         * 
191
         * @throws IOException when failing to stream data
192
         */
193
        public InputStream streamData(StreamDataWriter writer, Long length) throws IOException {
194
                if (length != null && length < BUFFER_SIZE) {
1✔
195
                        ByteArrayOutputStream out = new ByteArrayOutputStream(length.intValue());
×
196
                        try {
197
                                writer.write(out);
×
198
                        } catch (Exception e) {
×
199
                                throw new IOException("Failed to write data to byte array", e);
×
200
                        }
×
201
                        return new ByteArrayInputStream(out.toByteArray());
×
202
                } else {
203
                        QueueInputStream in = new QueueInputStream();
1✔
204

205
                        taskExecutor.execute(() -> {
1✔
206
                                QueueOutputStream out = in.newQueueOutputStream();
1✔
207
                                try {
208
                                        writer.write(out);
1✔
209
                                } catch (Exception e) {
1✔
210
                                        log.error("Failed to write data in parallel", e);
1✔
211
                                        in.propagateStreamException(new IOException("Failed to write data in parallel", e));
1✔
212
                                } finally {
213
                                        // Closing quietly as any exceptions in QueueOutputStream.close() are propagated
214
                                        IOUtils.closeQuietly(out);
1✔
215
                                }
216
                        });
1✔
217

218
                        return in;
1✔
219
                }
220
        }
221
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc