• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openmrs / openmrs-core / 17208175973

25 Aug 2025 12:00PM UTC coverage: 63.742% (+0.07%) from 63.671%
17208175973

push

github

ibacher
TRUNK-6395: Saner scheme for copying properties from the installation script (#5260)

0 of 2 new or added lines in 1 file covered. (0.0%)

697 existing lines in 13 files now uncovered.

22147 of 34745 relevant lines covered (63.74%)

0.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.0
/api/src/main/java/org/openmrs/api/stream/StreamDataService.java
1
/**
2
 * This Source Code Form is subject to the terms of the Mozilla Public License,
3
 * v. 2.0. If a copy of the MPL was not distributed with this file, You can
4
 * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under
5
 * the terms of the Healthcare Disclaimer located at http://openmrs.org/license.
6
 *
7
 * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS
8
 * graphic logo is a trademark of OpenMRS Inc.
9
 */
10
package org.openmrs.api.stream;
11

12
import java.io.ByteArrayInputStream;
13
import java.io.ByteArrayOutputStream;
14
import java.io.IOException;
15
import java.io.InputStream;
16
import java.io.InterruptedIOException;
17
import java.io.OutputStream;
18
import java.io.UncheckedIOException;
19
import java.time.Duration;
20
import java.util.concurrent.BlockingQueue;
21
import java.util.concurrent.LinkedBlockingQueue;
22
import java.util.concurrent.TimeUnit;
23

24
import org.apache.commons.io.IOUtils;
25
import org.slf4j.Logger;
26
import org.slf4j.LoggerFactory;
27
import org.springframework.beans.factory.annotation.Autowired;
28
import org.springframework.core.task.TaskExecutor;
29
import org.springframework.stereotype.Service;
30
import org.springframework.util.unit.DataSize;
31

32
/**
33
 * The service can be used to convert data from OutputStream to InputStream without copying all data in memory.
34
 * <p>
35
 * The {@link #streamData(StreamDataWriter, Long)} method may run {@link StreamDataWriter#write(OutputStream)} in a 
36
 * separate thread using {@link TaskExecutor}.
37
 * <p>
38
 * It's providing the {@link java.io.PipedInputStream}/{@link java.io.PipedOutputStream} mechanism in a thread safe way 
39
 * with the use of {@link BlockingQueue}.
40
 * 
41
 * @since 2.8.0, 2.7.5, 2.6.16, 2.5.15
42
 */
43
@Service
44
public class StreamDataService {
45
        public static final int BUFFER_SIZE = (int) DataSize.ofKilobytes(128).toBytes();
1✔
46
        
47
        private static final Logger log = LoggerFactory.getLogger(StreamDataService.class);
1✔
48
        private final TaskExecutor taskExecutor;
49
        
50
        public StreamDataService(@Autowired TaskExecutor taskExecutor) {
1✔
51
                this.taskExecutor = taskExecutor;
1✔
52
        }
1✔
53
        
54
        private static class QueueInputStream extends InputStream {
55
                private final BlockingQueue<Integer> blockingQueue;
56
                private final long timeoutNanos;
57
                private volatile IOException streamException;
58

59
                public QueueInputStream() {
1✔
60
                        this.blockingQueue = new LinkedBlockingQueue<>(BUFFER_SIZE);
1✔
61
                        this.timeoutNanos = Duration.ofSeconds(30).toNanos();
1✔
62
                }
1✔
63

64
                public QueueOutputStream newQueueOutputStream() {
65
                        return new QueueOutputStream(this);
1✔
66
                }
67

68
                @Override
69
                public int read() throws IOException {
70
                        try {
71
                                checkStreamException();
1✔
72
                                
73
                                int result;
74
                                Integer peek = this.blockingQueue.peek();
1✔
75
                                if (Integer.valueOf(-1).equals(peek)) {
1✔
76
                                        result = -1;
1✔
77
                                } else {
78
                                        Integer value = this.blockingQueue.poll(this.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
79
                                        if (value == null) {
1✔
80
                                                // Timeout
81
                                                result = -1;
1✔
82
                                        } else if (value == -1) {
1✔
83
                                                // End of stream. Put the end of stream back in the queue for consistency.
84
                                                this.blockingQueue.clear();
×
UNCOV
85
                                                if (!this.blockingQueue.offer(-1, timeoutNanos, TimeUnit.NANOSECONDS)) {
×
UNCOV
86
                                                        throw new IOException("Failed to write to full queue");
×
87
                                                }
88
                                                result = -1;
×
89
                                        } else {
90
                                                result = 255 & value;
1✔
91
                                        }
92
                                }
93

94
                                checkStreamException();
1✔
95
                                return result;
1✔
96
                        } catch (InterruptedException e) {
×
97
                                Thread.currentThread().interrupt();
×
UNCOV
98
                                InterruptedIOException interruptedIoException = new InterruptedIOException();
×
UNCOV
99
                                interruptedIoException.initCause(e);
×
UNCOV
100
                                throw interruptedIoException;
×
101
                        }
102
                }
103

104
                @Override
105
                public void close() throws IOException {
UNCOV
106
                        checkStreamException();
×
UNCOV
107
                        super.close();
×
UNCOV
108
                }
×
109

110
                /**
111
                 * Propagate exception from a writing thread to a reading thread so that processing is stopped.
112
                 * 
113
                 * @param streamException exception
114
                 * @throws UncheckedIOException rethrows e
115
                 */
116
                public void propagateStreamException(IOException streamException) {
117
                        this.streamException = streamException;
1✔
118
                }
1✔
119
                
120
                public void checkStreamException() throws IOException {
121
                        if (streamException != null) {
1✔
122
                                throw streamException;
1✔
123
                        }
124
                }
1✔
125
        }
126
        
127
        private static class QueueOutputStream extends OutputStream {
128
                private final QueueInputStream queueInputStream;
129
                
130
                public QueueOutputStream(QueueInputStream queueInputStream) {
1✔
131
                        this.queueInputStream = queueInputStream;
1✔
132
                }
1✔
133

134
                /**
135
                 * @param b   the <code>byte</code>.
136
                 * @throws IOException when queue full or interrupted
137
                 */
138
                @Override
139
                public void write(int b) throws IOException {
140
                        try {
141
                                queueInputStream.checkStreamException();
1✔
142
                                
143
                                if (!queueInputStream.blockingQueue.offer(255 & b, queueInputStream.timeoutNanos, TimeUnit.NANOSECONDS)) {
1✔
UNCOV
144
                                        IOException streamException = new IOException("Failed to write to full queue");
×
UNCOV
145
                                        queueInputStream.propagateStreamException(streamException);
×
146
                                }
UNCOV
147
                        } catch (InterruptedException e) {
×
UNCOV
148
                                Thread.currentThread().interrupt();
×
UNCOV
149
                                InterruptedIOException interruptedIoException = new InterruptedIOException();
×
150
                                interruptedIoException.initCause(e);
×
UNCOV
151
                                throw interruptedIoException;
×
152
                        }
1✔
153
                }
1✔
154

155
                /**
156
                 * Closing the stream doesn't fail any following writes, but effectively only data up to closing the stream
157
                 * is read.
158
                 * 
159
                 * @throws IOException when queue full or interrupted
160
                 */
161
                @Override
162
                public void close() throws IOException {
163
                        try {
164
                                queueInputStream.checkStreamException();
1✔
165
                                
166
                                // Indicate the end of stream
167
                                if (!this.queueInputStream.blockingQueue.offer(-1, queueInputStream.timeoutNanos, TimeUnit.NANOSECONDS)) {
1✔
UNCOV
168
                                        IOException streamException = new IOException("Failed to write to full queue");
×
UNCOV
169
                                        queueInputStream.propagateStreamException(streamException);
×
170
                                }
UNCOV
171
                        } catch (InterruptedException e) {
×
UNCOV
172
                                Thread.currentThread().interrupt();
×
UNCOV
173
                                InterruptedIOException interruptedIoException = new InterruptedIOException();
×
UNCOV
174
                                interruptedIoException.initCause(e);
×
175
                                throw interruptedIoException;
×
176
                        }
1✔
177
                }
1✔
178
        }
179

180
        /**
181
         * Runs {@link StreamDataWriter#write(OutputStream)} in a separate thread using {@link TaskExecutor} or copies 
182
         * in-memory if the length is smaller than {@link #BUFFER_SIZE}.
183
         * <p>
184
         * The returned InputStream doesn't need to be closed and the close operation takes no effect.
185
         * 
186
         * @param writer the write method
187
         * @param length the number of bytes if known or null
188
         * @return InputStream
189
         * 
190
         * @throws IOException when failing to stream data
191
         */
192
        public InputStream streamData(StreamDataWriter writer, Long length) throws IOException {
193
                if (length != null && length < BUFFER_SIZE) {
1✔
UNCOV
194
                        ByteArrayOutputStream out = new ByteArrayOutputStream(length.intValue());
×
195
                        try {
UNCOV
196
                                writer.write(out);
×
UNCOV
197
                        } catch (Exception e) {
×
UNCOV
198
                                throw new IOException("Failed to write data to byte array", e);
×
UNCOV
199
                        }
×
UNCOV
200
                        return new ByteArrayInputStream(out.toByteArray());
×
201
                } else {
202
                        QueueInputStream in = new QueueInputStream();
1✔
203

204
                        taskExecutor.execute(() -> {
1✔
205
                                QueueOutputStream out = in.newQueueOutputStream();
1✔
206
                                try {
207
                                        writer.write(out);
1✔
208
                                } catch (Exception e) {
1✔
209
                                        log.error("Failed to write data in parallel", e);
1✔
210
                                        in.propagateStreamException(new IOException("Failed to write data in parallel", e));
1✔
211
                                } finally {
212
                                        // Closing quietly as any exceptions in QueueOutputStream.close() are propagated
213
                                        IOUtils.closeQuietly(out);
1✔
214
                                }
215
                        });
1✔
216

217
                        return in;
1✔
218
                }
219
        }
220
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc