• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openmrs / openmrs-core / 14617411652

23 Apr 2025 11:50AM UTC coverage: 63.801% (+0.1%) from 63.69%
14617411652

push

github

rkorytkowski
TRUNK-6300 Add Storage Service

(cherry picked from commit 8a14c70a3)

137 of 227 new or added lines in 6 files covered. (60.35%)

1 existing line in 1 file now uncovered.

22160 of 34733 relevant lines covered (63.8%)

0.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.7
/api/src/main/java/org/openmrs/api/stream/StreamDataService.java
1
/**
2
 * This Source Code Form is subject to the terms of the Mozilla Public License,
3
 * v. 2.0. If a copy of the MPL was not distributed with this file, You can
4
 * obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under
5
 * the terms of the Healthcare Disclaimer located at http://openmrs.org/license.
6
 *
7
 * Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS
8
 * graphic logo is a trademark of OpenMRS Inc.
9
 */
10
package org.openmrs.api.stream;
11

12
import java.io.ByteArrayInputStream;
13
import java.io.ByteArrayOutputStream;
14
import java.io.IOException;
15
import java.io.InputStream;
16
import java.io.InterruptedIOException;
17
import java.io.OutputStream;
18
import java.io.UncheckedIOException;
19
import java.time.Duration;
20
import java.util.Objects;
21
import java.util.concurrent.BlockingQueue;
22
import java.util.concurrent.LinkedBlockingQueue;
23
import java.util.concurrent.TimeUnit;
24

25
import org.slf4j.Logger;
26
import org.slf4j.LoggerFactory;
27
import org.springframework.beans.factory.annotation.Autowired;
28
import org.springframework.core.task.TaskExecutor;
29
import org.springframework.stereotype.Service;
30
import org.springframework.util.unit.DataSize;
31

32
/**
33
 * The service can be used to convert data from OutputStream to InputStream without copying all data in memory.
34
 * <p>
35
 * The {@link #streamData(StreamDataWriter, Long)} method may run {@link StreamDataWriter#write(OutputStream)} in a 
36
 * separate thread using {@link TaskExecutor}.
37
 * <p>
38
 * It's providing the {@link java.io.PipedInputStream}/{@link java.io.PipedOutputStream} mechanism in a thread safe way 
39
 * with the use of {@link BlockingQueue}.
40
 * 
41
 * @since 2.8.0, 2.7.4, 2.6.16, 2.5.15
42
 */
43
@Service
44
public class StreamDataService {
45
        public static final int BUFFER_SIZE = (int) DataSize.ofKilobytes(128).toBytes();
1✔
46
        
47
        private static final Logger log = LoggerFactory.getLogger(StreamDataService.class);
1✔
48
        private final TaskExecutor taskExecutor;
49
        
50
        public StreamDataService(@Autowired TaskExecutor taskExecutor) {
1✔
51
                this.taskExecutor = taskExecutor;
1✔
52
        }
1✔
53
        
54
        private static class QueueInputStream extends InputStream {
55
                private final BlockingQueue<Integer> blockingQueue;
56
                private final long timeoutNanos;
57
                private volatile IOException streamException;
58

59
                public QueueInputStream() {
1✔
60
                        this.blockingQueue = new LinkedBlockingQueue<>(BUFFER_SIZE);
1✔
61
                        this.timeoutNanos = Duration.ofSeconds(30).toNanos();
1✔
62
                }
1✔
63

64
                public QueueOutputStream newQueueOutputStream() {
65
                        return new QueueOutputStream(blockingQueue, timeoutNanos);
1✔
66
                }
67

68
                @Override
69
                public int read() throws IOException {
70
                        try {
71
                                if (streamException != null) {
1✔
72
                                        // Rethrow exception if writer failed.
73
                                        throw streamException;
1✔
74
                                }
75
                                
76
                                Integer peek = this.blockingQueue.peek();
1✔
77
                                if (Integer.valueOf(-1).equals(peek)) {
1✔
78
                                        return -1;
1✔
79
                                }
80

81
                                Integer value = this.blockingQueue.poll(this.timeoutNanos, TimeUnit.NANOSECONDS);
1✔
82
                                if (value == null) {
1✔
83
                                        // Timeout
NEW
84
                                        return -1;
×
85
                                } else if (value == -1) {
1✔
86
                                        // End of stream. Put the end of stream back in the queue for consistency.
NEW
87
                                        this.blockingQueue.clear();
×
NEW
88
                                        if (!this.blockingQueue.offer(-1, timeoutNanos, TimeUnit.NANOSECONDS)) {
×
NEW
89
                                                throw new IOException("Failed to write to full queue");
×
90
                                        }
NEW
91
                                        return -1;
×
92
                                } else {
93
                                        return 255 & value;
1✔
94
                                }
NEW
95
                        } catch (InterruptedException e) {
×
NEW
96
                                Thread.currentThread().interrupt();
×
NEW
97
                                throw new IllegalStateException(e);
×
98
                        }
99
                }
100

101
                /**
102
                 * Propagate exception from a writing thread to a reading thread so that processing is stopped.
103
                 * 
104
                 * @param streamException exception
105
                 * @throws UncheckedIOException rethrows e
106
                 */
107
                public void propagateStreamException(IOException streamException) {
108
                        this.streamException = streamException;
1✔
109
                        throw new UncheckedIOException(this.streamException);
1✔
110
                }
111
        }
112
        
113
        private static class QueueOutputStream extends OutputStream {
114
                private final BlockingQueue<Integer> blockingQueue;
115
                private final long timeoutNanos;
116
                
117
                public QueueOutputStream(BlockingQueue<Integer> blockingQueue, long timeoutNanos) {
1✔
118
                        this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
1✔
119
                        this.timeoutNanos = timeoutNanos;
1✔
120
                }
1✔
121

122
                /**
123
                 * @param b   the <code>byte</code>.
124
                 * @throws IOException when queue full or interrupted
125
                 */
126
                @Override
127
                public void write(int b) throws IOException {
128
                        try {
129
                                if (!this.blockingQueue.offer(255 & b, timeoutNanos, TimeUnit.NANOSECONDS)) {
1✔
NEW
130
                                        throw new IOException("Failed to write to full queue");
×
131
                                }
NEW
132
                        } catch (InterruptedException e) {
×
NEW
133
                                Thread.currentThread().interrupt();
×
NEW
134
                                InterruptedIOException interruptedIoException = new InterruptedIOException();
×
NEW
135
                                interruptedIoException.initCause(e);
×
NEW
136
                                throw interruptedIoException;
×
137
                        }
1✔
138
                }
1✔
139

140
                /**
141
                 * Closing the stream doesn't fail any following writes, but effectively only data up to closing the stream
142
                 * is read.
143
                 * 
144
                 * @throws IOException when queue full or interrupted
145
                 */
146
                @Override
147
                public void close() throws IOException {
148
                        try {
149
                                // Indicate the end of stream
150
                                if (!this.blockingQueue.offer(-1, timeoutNanos, TimeUnit.NANOSECONDS)) {
1✔
NEW
151
                                        throw new IOException("Failed to write to full queue");
×
152
                                }
NEW
153
                        } catch (InterruptedException e) {
×
NEW
154
                                Thread.currentThread().interrupt();
×
NEW
155
                                InterruptedIOException interruptedIoException = new InterruptedIOException();
×
NEW
156
                                interruptedIoException.initCause(e);
×
NEW
157
                                throw interruptedIoException;
×
158
                        }
1✔
159
                }
1✔
160
        }
161

162
        /**
163
         * Runs {@link StreamDataWriter#write(OutputStream)} in a separate thread using {@link TaskExecutor} or copies 
164
         * in-memory if the length is smaller than {@link #BUFFER_SIZE}.
165
         * <p>
166
         * The returned InputStream doesn't need to be closed and the close operation takes no effect.
167
         * 
168
         * @param writer the write method
169
         * @param length the number of bytes if known or null
170
         * @return InputStream
171
         * 
172
         * @throws IOException when failing to stream data
173
         */
174
        public InputStream streamData(StreamDataWriter writer, Long length) throws IOException {
175
                if (length != null && length < BUFFER_SIZE) {
1✔
NEW
176
                        ByteArrayOutputStream out = new ByteArrayOutputStream(length.intValue());
×
177
                        try {
NEW
178
                                writer.write(out);
×
NEW
179
                        } catch (Exception e) {
×
NEW
180
                                throw new IOException("Failed to write data to byte array", e);
×
NEW
181
                        }
×
NEW
182
                        return new ByteArrayInputStream(out.toByteArray());
×
183
                } else {
184
                        QueueInputStream in = new QueueInputStream();
1✔
185

186
                        taskExecutor.execute(() -> {
1✔
187
                                try (QueueOutputStream out = in.newQueueOutputStream()) {
1✔
188
                                        writer.write(out);
1✔
189
                                } catch (Exception e) {
1✔
NEW
190
                                        in.propagateStreamException(new IOException("Failed to write data in parallel", e));
×
191
                                }
1✔
192
                        });
1✔
193

194
                        return in;
1✔
195
                }
196
        }
197
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc