• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

evolvedbinary / elemental / 982

29 Apr 2025 08:34PM UTC coverage: 56.409% (+0.007%) from 56.402%
982

push

circleci

adamretter
[feature] Improve README.md badges

28451 of 55847 branches covered (50.94%)

Branch coverage included in aggregate %.

77468 of 131924 relevant lines covered (58.72%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.14
/exist-core/src/main/java/org/exist/util/io/CachingFilterInputStream.java
1
/*
2
 * Elemental
3
 * Copyright (C) 2024, Evolved Binary Ltd
4
 *
5
 * admin@evolvedbinary.com
6
 * https://www.evolvedbinary.com | https://www.elemental.xyz
7
 *
8
 * Use of this software is governed by the Business Source License 1.1
9
 * included in the LICENSE file and at www.mariadb.com/bsl11.
10
 *
11
 * Change Date: 2028-04-27
12
 *
13
 * On the date above, in accordance with the Business Source License, use
14
 * of this software will be governed by the Apache License, Version 2.0.
15
 *
16
 * Additional Use Grant: Production use of the Licensed Work for a permitted
17
 * purpose. A Permitted Purpose is any purpose other than a Competing Use.
18
 * A Competing Use means making the Software available to others in a commercial
19
 * product or service that: substitutes for the Software; substitutes for any
20
 * other product or service we offer using the Software that exists as of the
21
 * date we make the Software available; or offers the same or substantially
22
 * similar functionality as the Software.
23
 */
24
package org.exist.util.io;
25

26
import net.jcip.annotations.NotThreadSafe;
27

28
import java.io.FilterInputStream;
29
import java.io.IOException;
30
import java.io.InputStream;
31

32
/**
33
 * Implementation of an Input Stream Filter that extends any InputStream with
34
 * mark() and reset() capabilities by caching the read data for later
35
 * re-reading.
36
 *
37
 * NOTE - Only supports reading data up to 2GB as the cache index uses an 'int'
38
 * index
39
 *
40
 * @version 1.1
41
 *
42
 * @author <a href="mailto:adam.retter@googlemail.com">Adam Retter</a>
43
 * @author <a href="tobi.krebsATgmail.com">Tobi Krebs</a>
44
 */
45
@NotThreadSafe
46
public class CachingFilterInputStream extends FilterInputStream {
47

48
    //TODO what about if the underlying stream supports marking
49
    //then we could just use its capabilities?
50
    private final FilterInputStreamCache cache;
51

52
    private int srcOffset = 0;
1✔
53
    private int mark = 0;
1✔
54

55
    /**
56
     * Constructor which uses an existing Cache from a CachingFilterInputStream,
57
     * if inputStream is a CachingFilterInputStream.
58
     *
59
     * @param inputStream the input stream
60
     *
61
     * @throws InstantiationException if the construction fails
62
     */
63
    public CachingFilterInputStream(final InputStream inputStream) throws InstantiationException {
64
        super(null);
1✔
65

66
        if (inputStream instanceof CachingFilterInputStream) {
1!
67
            this.cache = ((CachingFilterInputStream) inputStream).shareCache();     // must be #shareCache not #getCache() to increment references
1✔
68
        } else {
1✔
69
            throw new InstantiationException("Only CachingFilterInputStream are supported as InputStream");
×
70
        }
71
    }
1✔
72

73
    public CachingFilterInputStream(final FilterInputStreamCache cache) {
74
        super(null);
1✔
75
        this.cache = cache;
1✔
76
    }
1✔
77

78
    /**
79
     * Gets the cache implementation directly.
80
     */
81
    FilterInputStreamCache getCache() {
82
        return cache;
1✔
83
    }
84

85
    /**
86
     * Gets the cache implementation for
87
     * sharing with another source. This is done
88
     * by incrementing its shared reference count.
89
     *
90
     * @return the cache implementation
91
     */
92
    FilterInputStreamCache shareCache() {
93
        cache.incrementSharedReferences();
1✔
94
        return cache;
1✔
95
    }
96

97
    @Override
98
    public int available() throws IOException {
99
        return getCache().available() - srcOffset;
1✔
100
    }
101

102
    @Override
103
    public synchronized void mark(final int readLimit) {
104
        mark = srcOffset;
1✔
105
    }
1✔
106

107
    @Override
108
    public boolean markSupported() {
109
        return true;
×
110
    }
111

112
    @Override
113
    public synchronized void reset() throws IOException {
114
        srcOffset = mark;
1✔
115
    }
1✔
116

117
    @Override
118
    public int read() throws IOException {
119

120
        if (getCache().isSrcClosed()) {
1✔
121
            throw new IOException(FilterInputStreamCache.INPUTSTREAM_CLOSED);
1✔
122
        }
123

124
        //Read from cache
125
        if (useCache()) {
1✔
126
            final int data = getCache().get(srcOffset++);
1✔
127
            return data;
1✔
128
        } else {
129
            final int data = getCache().read();
1✔
130
            
131
            if(data == FileFilterInputStreamCache.END_OF_STREAM) {
1✔
132
                return FilterInputStreamCache.END_OF_STREAM;
1✔
133
            }
134
            
135
            srcOffset++;
1✔
136
            return data;
1✔
137
        }
138
    }
139

140
    @Override
141
    public int read(final byte[] b) throws IOException {
142
        return read(b, 0, b.length);
1✔
143
    }
144

145
    @Override
146
    public int read(final byte[] b, final int off, final int len) throws IOException {
147

148
        if (getCache().isSrcClosed()) {
1✔
149
            throw new IOException(FilterInputStreamCache.INPUTSTREAM_CLOSED);
1✔
150
        }
151

152
        if (useCache()) {
1✔
153

154
            //copy data from the cache
155
            int actualLen = (len > getCache().getLength() - this.srcOffset ? getCache().getLength() - this.srcOffset : len);
1✔
156
            getCache().copyTo(this.srcOffset, b, off, actualLen);
1✔
157
            this.srcOffset += actualLen;
1✔
158

159
            //if the requested bytes were more than what is present in the cache, then also read from the src
160
            if (actualLen < len) {
1✔
161
                int srcLen = getCache().read(b, off + actualLen, len - actualLen);
1✔
162

163
                //have we reached the end of the stream?
164
                if (srcLen == FilterInputStreamCache.END_OF_STREAM) {
1✔
165
                    return actualLen;
1✔
166
                }
167

168
                //increase srcOffset due to the read opertaion above
169
                srcOffset += srcLen;
1✔
170

171
                actualLen += srcLen;
1✔
172
            }
173

174
            return actualLen;
1✔
175

176
        } else {
177
            int actualLen = getCache().read(b, off, len);
1✔
178

179
            //have we reached the end of the stream?
180
            if (actualLen == FilterInputStreamCache.END_OF_STREAM) {
1✔
181
                return actualLen;
1✔
182
            }
183

184
            //increase srcOffset due to read operation above
185
            srcOffset += actualLen;
1✔
186

187
            return actualLen;
1✔
188
        }
189
    }
190

191
    public boolean isClosed() {
192
        return getCache().isSrcClosed();
1✔
193
    }
194

195
    /**
196
     * Closes the src InputStream and empties the cache
197
     */
198
    @Override
199
    public void close() throws IOException {
200
        if(!getCache().isSrcClosed()) {
1✔
201
            getCache().close();
1✔
202
        }    
203
    }
1✔
204

205
    /**
206
     * Determine the current offset
207
     *
208
     * @return The current offset of this stream
209
     */
210
    public int offset() {
211
        return srcOffset;
1✔
212
    }
213

214
    /**
215
     * Similar to {@link #skip(long)} but travels backwards
216
     *
217
     * @param len The number of bytes to skip backwards
218
     *
219
     * @return The actual number of bytes skipped backwards
220
     */
221
    public long skipBackwards(final long len) {
222
        if(len == 0) {
×
223
            return 0;
×
224
        }
225

226
        // can only skip back to zero
227
        final long actualLen = Math.min(srcOffset, len);
×
228

229
        srcOffset = srcOffset - (int)actualLen;
×
230

231
        return actualLen;
×
232
    }
233

234
    /**
235
     * We cant actually skip as we need to read so that we can cache the data,
236
     * however apart from the potentially increased I/O and Memory, the end
237
     * result is the same
238
     */
239
    @Override
240
    public long skip(final long len) throws IOException {
241

242
        if (getCache().isSrcClosed()) {
1✔
243
            throw new IOException(FilterInputStreamCache.INPUTSTREAM_CLOSED);
1✔
244
        } else if (len < 1) {
1✔
245
            return 0;
1✔
246
        }
247

248
        if (useCache()) {
1✔
249

250
            //skip data from the cache
251
            long actualLen = (len > getCache().getLength() - this.srcOffset ? getCache().getLength() - this.srcOffset : len);
1!
252

253
            //if the requested bytes were more than what is present in the cache, then also read from the src
254
            if (actualLen < len) {
1!
255

256
                // we can't skip directly on the src otherwise it will never be read into the cache, so we read over the amount of bytes we want to skip instead
257

258
                final int toReadFromSrc = (int) (len - actualLen);
×
259
                final byte[] skipped = new byte[toReadFromSrc];
×
260

261
                //read some data from the source (and into the cache)
262
                int toRead = toReadFromSrc;
×
263
                while(toRead > 0) {
×
264
                    final int read = getCache().read(skipped, 0, toRead);
×
265

266
                    //have we reached the end of the stream?
267
                    if(read == FilterInputStreamCache.END_OF_STREAM) {
×
268
                        break;
×
269
                    }
270

271
                    toRead -= read;
×
272
                    actualLen += read;
×
273
                }
274
            }
275

276
            //increase srcOffset due to the read operation above
277
            srcOffset += (int)actualLen;
1✔
278

279
            return actualLen;
1✔
280

281
        } else {
282

283
            final byte[] skipped = new byte[(int) len];  //TODO could overflow
1✔
284
            int toRead = (int)len;
1✔
285

286
            int totalRead = 0;
1✔
287
            while(toRead > 0) {
1✔
288
                final int read = getCache().read(skipped, 0, toRead);
1✔
289

290
                //have we reached the end of the stream?
291
                if(read == FilterInputStreamCache.END_OF_STREAM) {
1!
292
                    break;
×
293
                }
294

295
                toRead -= read;
1✔
296
                totalRead += read;
1✔
297
            }
298

299
            //increase srcOffset due to read operation above
300
            srcOffset += totalRead;
1✔
301

302
            return totalRead;
1✔
303
        }
304
    }
305

306
    private boolean useCache() {
307
        //If cache hasRead and srcOffset is still in cache useCache
308
        return getCache().getSrcOffset() > 0 && getCache().getLength() > srcOffset;
1✔
309
    }
310

311
    /**
312
     * Increments the number of shared references to the cache.
313
     */
314
    public void incrementSharedReferences() {
315
        getCache().incrementSharedReferences();
1✔
316
    }
1✔
317

318
    /**
319
     * Decrements the number of shared references to the cache.
320
     */
321
    public void decrementSharedReferences() {
322
        getCache().decrementSharedReferences();
×
323
    }
×
324
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc