• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20097

26 Nov 2025 09:31PM UTC coverage: 88.614% (+0.02%) from 88.595%
#20097

push

github

ejona86
core: remove unused method from CompositeReadableBuffer

35117 of 39629 relevant lines covered (88.61%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.62
/../core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import java.io.IOException;
20
import java.io.OutputStream;
21
import java.nio.Buffer;
22
import java.nio.ByteBuffer;
23
import java.nio.InvalidMarkException;
24
import java.util.ArrayDeque;
25
import java.util.Deque;
26
import javax.annotation.Nullable;
27

28
/**
29
 * A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a
30
 * facade that allows multiple buffers to be treated as one.
31
 *
32
 * <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once
33
 * the composite has read past the end of a given buffer, that buffer is automatically closed and
34
 * removed from the composite.
35
 */
36
public class CompositeReadableBuffer extends AbstractReadableBuffer {
37

38
  private final Deque<ReadableBuffer> readableBuffers;
39
  private Deque<ReadableBuffer> rewindableBuffers;
40
  private int readableBytes;
41
  private boolean marked;
42

43
  public CompositeReadableBuffer(int initialCapacity) {
1✔
44
    readableBuffers = new ArrayDeque<>(initialCapacity);
1✔
45
  }
1✔
46

47
  public CompositeReadableBuffer() {
1✔
48
    readableBuffers = new ArrayDeque<>();
1✔
49
  }
1✔
50

51
  /**
52
   * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is
53
   * expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the
54
   * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
55
   * this {@code CompositeBuffer}.
56
   */
57
  public void addBuffer(ReadableBuffer buffer) {
58
    boolean markHead = marked && readableBuffers.isEmpty();
1✔
59
    enqueueBuffer(buffer);
1✔
60
    if (markHead) {
1✔
61
      readableBuffers.peek().mark();
1✔
62
    }
63
  }
1✔
64

65
  private void enqueueBuffer(ReadableBuffer buffer) {
66
    if (!(buffer instanceof CompositeReadableBuffer)) {
1✔
67
      readableBuffers.add(buffer);
1✔
68
      readableBytes += buffer.readableBytes();
1✔
69
      return;
1✔
70
    }
71

72
    CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
1✔
73
    while (!compositeBuffer.readableBuffers.isEmpty()) {
1✔
74
      ReadableBuffer subBuffer = compositeBuffer.readableBuffers.remove();
1✔
75
      readableBuffers.add(subBuffer);
1✔
76
    }
1✔
77
    readableBytes += compositeBuffer.readableBytes;
1✔
78
    compositeBuffer.readableBytes = 0;
1✔
79
    compositeBuffer.close();
1✔
80
  }
1✔
81

82
  @Override
83
  public int readableBytes() {
84
    return readableBytes;
1✔
85
  }
86

87
  private static final NoThrowReadOperation<Void> UBYTE_OP =
1✔
88
      new NoThrowReadOperation<Void>() {
1✔
89
        @Override
90
        public int read(ReadableBuffer buffer, int length, Void unused, int value) {
91
          return buffer.readUnsignedByte();
1✔
92
        }
93
      };
94

95
  @Override
96
  public int readUnsignedByte() {
97
    return executeNoThrow(UBYTE_OP, 1, null, 0);
1✔
98
  }
99

100
  private static final NoThrowReadOperation<Void> SKIP_OP =
1✔
101
      new NoThrowReadOperation<Void>() {
1✔
102
        @Override
103
        public int read(ReadableBuffer buffer, int length, Void unused, int unused2) {
104
          buffer.skipBytes(length);
1✔
105
          return 0;
1✔
106
        }
107
      };
108

109
  @Override
110
  public void skipBytes(int length) {
111
    executeNoThrow(SKIP_OP, length, null, 0);
1✔
112
  }
1✔
113

114
  private static final NoThrowReadOperation<byte[]> BYTE_ARRAY_OP =
1✔
115
      new NoThrowReadOperation<byte[]>() {
1✔
116
        @Override
117
        public int read(ReadableBuffer buffer, int length, byte[] dest, int offset) {
118
          buffer.readBytes(dest, offset, length);
1✔
119
          return offset + length;
1✔
120
        }
121
      };
122

123
  private static final NoThrowReadOperation<ByteBuffer> BYTE_BUF_OP =
1✔
124
      new NoThrowReadOperation<ByteBuffer>() {
1✔
125
        @Override
126
        public int read(ReadableBuffer buffer, int length, ByteBuffer dest, int unused) {
127
          // Change the limit so that only lengthToCopy bytes are available.
128
          int prevLimit = dest.limit();
1✔
129
          ((Buffer) dest).limit(dest.position() + length);
1✔
130
          // Write the bytes and restore the original limit.
131
          buffer.readBytes(dest);
1✔
132
          ((Buffer) dest).limit(prevLimit);
1✔
133
          return 0;
1✔
134
        }
135
      };
136

137
  private static final ReadOperation<OutputStream> STREAM_OP =
1✔
138
      new ReadOperation<OutputStream>() {
1✔
139
        @Override
140
        public int read(ReadableBuffer buffer, int length, OutputStream dest, int unused)
141
            throws IOException {
142
          buffer.readBytes(dest, length);
1✔
143
          return 0;
1✔
144
        }
145
      };
146

147
  @Override
148
  public void readBytes(byte[] dest, int destOffset, int length) {
149
    executeNoThrow(BYTE_ARRAY_OP, length, dest, destOffset);
1✔
150
  }
1✔
151

152
  @Override
153
  public void readBytes(ByteBuffer dest) {
154
    executeNoThrow(BYTE_BUF_OP, dest.remaining(), dest, 0);
1✔
155
  }
1✔
156

157
  @Override
158
  public void readBytes(OutputStream dest, int length) throws IOException {
159
    execute(STREAM_OP, length, dest, 0);
1✔
160
  }
1✔
161

162
  @Override
163
  public ReadableBuffer readBytes(int length) {
164
    if (length <= 0) {
1✔
165
      return ReadableBuffers.empty();
×
166
    }
167
    checkReadable(length);
1✔
168
    readableBytes -= length;
1✔
169

170
    ReadableBuffer newBuffer = null;
1✔
171
    CompositeReadableBuffer newComposite = null;
1✔
172
    do {
173
      ReadableBuffer buffer = readableBuffers.peek();
1✔
174
      int readable = buffer.readableBytes();
1✔
175
      ReadableBuffer readBuffer;
176
      if (readable > length) {
1✔
177
        readBuffer = buffer.readBytes(length);
1✔
178
        length = 0;
1✔
179
      } else {
180
        if (marked) {
1✔
181
          readBuffer = buffer.readBytes(readable);
1✔
182
          advanceBuffer();
1✔
183
        } else {
184
          readBuffer = readableBuffers.poll();
1✔
185
        }
186
        length -= readable;
1✔
187
      }
188
      if (newBuffer == null) {
1✔
189
        newBuffer = readBuffer;
1✔
190
      } else {
191
        if (newComposite == null) {
1✔
192
          newComposite = new CompositeReadableBuffer(
1✔
193
              length == 0 ? 2 : Math.min(readableBuffers.size() + 2, 16));
1✔
194
          newComposite.addBuffer(newBuffer);
1✔
195
          newBuffer = newComposite;
1✔
196
        }
197
        newComposite.addBuffer(readBuffer);
1✔
198
      }
199
    } while (length > 0);
1✔
200
    return newBuffer;
1✔
201
  }
202

203
  @Override
204
  public boolean markSupported() {
205
    for (ReadableBuffer buffer : readableBuffers) {
1✔
206
      if (!buffer.markSupported()) {
1✔
207
        return false;
1✔
208
      }
209
    }
1✔
210
    return true;
1✔
211
  }
212

213
  @Override
214
  public void mark() {
215
    if (rewindableBuffers == null) {
1✔
216
      rewindableBuffers = new ArrayDeque<>(Math.min(readableBuffers.size(), 16));
1✔
217
    }
218
    while (!rewindableBuffers.isEmpty()) {
1✔
219
      rewindableBuffers.remove().close();
1✔
220
    }
221
    marked = true;
1✔
222
    ReadableBuffer buffer = readableBuffers.peek();
1✔
223
    if (buffer != null) {
1✔
224
      buffer.mark();
1✔
225
    }
226
  }
1✔
227

228
  @Override
229
  public void reset() {
230
    if (!marked) {
1✔
231
      throw new InvalidMarkException();
1✔
232
    }
233
    ReadableBuffer buffer;
234
    if ((buffer = readableBuffers.peek()) != null) {
1✔
235
      int currentRemain = buffer.readableBytes();
1✔
236
      buffer.reset();
1✔
237
      readableBytes += (buffer.readableBytes() - currentRemain);
1✔
238
    }
239
    while ((buffer = rewindableBuffers.pollLast()) != null) {
1✔
240
      buffer.reset();
1✔
241
      readableBuffers.addFirst(buffer);
1✔
242
      readableBytes += buffer.readableBytes();
1✔
243
    }
244
  }
1✔
245

246
  @Override
247
  public boolean byteBufferSupported() {
248
    for (ReadableBuffer buffer : readableBuffers) {
1✔
249
      if (!buffer.byteBufferSupported()) {
1✔
250
        return false;
1✔
251
      }
252
    }
1✔
253
    return true;
1✔
254
  }
255

256
  @Nullable
257
  @Override
258
  public ByteBuffer getByteBuffer() {
259
    if (readableBuffers.isEmpty()) {
1✔
260
      return null;
×
261
    }
262
    return readableBuffers.peek().getByteBuffer();
1✔
263
  }
264

265
  @Override
266
  public void close() {
267
    while (!readableBuffers.isEmpty()) {
1✔
268
      readableBuffers.remove().close();
1✔
269
    }
270
    if (rewindableBuffers != null) {
1✔
271
      while (!rewindableBuffers.isEmpty()) {
1✔
272
        rewindableBuffers.remove().close();
1✔
273
      }
274
    }
275
  }
1✔
276

277
  /**
278
   * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to
279
   * satisfy the requested {@code length}.
280
   */
281
  private <T> int execute(ReadOperation<T> op, int length, T dest, int value) throws IOException {
282
    checkReadable(length);
1✔
283

284
    if (!readableBuffers.isEmpty()) {
1✔
285
      advanceBufferIfNecessary();
1✔
286
    }
287

288
    for (; length > 0 && !readableBuffers.isEmpty(); advanceBufferIfNecessary()) {
1✔
289
      ReadableBuffer buffer = readableBuffers.peek();
1✔
290
      int lengthToCopy = Math.min(length, buffer.readableBytes());
1✔
291

292
      // Perform the read operation for this buffer.
293
      value = op.read(buffer, lengthToCopy, dest, value);
1✔
294

295
      length -= lengthToCopy;
1✔
296
      readableBytes -= lengthToCopy;
1✔
297
    }
298

299
    if (length > 0) {
1✔
300
      // Should never get here.
301
      throw new AssertionError("Failed executing read operation");
×
302
    }
303

304
    return value;
1✔
305
  }
306

307
  private <T> int executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, int value) {
308
    try {
309
      return execute(op, length, dest, value);
1✔
310
    } catch (IOException e) {
×
311
      throw new AssertionError(e); // shouldn't happen
×
312
    }
313
  }
314

315
  /**
316
   * If the current buffer is exhausted, removes and closes it.
317
   */
318
  private void advanceBufferIfNecessary() {
319
    ReadableBuffer buffer = readableBuffers.peek();
1✔
320
    if (buffer.readableBytes() == 0) {
1✔
321
      advanceBuffer();
1✔
322
    }
323
  }
1✔
324

325
  /**
326
   * Removes one buffer from the front and closes it.
327
   */
328
  private void advanceBuffer() {
329
    if (marked) {
1✔
330
      rewindableBuffers.add(readableBuffers.remove());
1✔
331
      ReadableBuffer next = readableBuffers.peek();
1✔
332
      if (next != null) {
1✔
333
        next.mark();
1✔
334
      }
335
    } else {
1✔
336
      readableBuffers.remove().close();
1✔
337
    }
338
  }
1✔
339

340
  /**
341
   * A simple read operation to perform on a single {@link ReadableBuffer}.
342
   * All state management for the buffers is done by
343
   * {@link CompositeReadableBuffer#execute(ReadOperation, int, Object, int)}.
344
   */
345
  private interface ReadOperation<T> {
346
    /**
347
     * This method can also be used to simultaneously perform operation-specific int-valued
348
     * aggregation over the sequence of buffers in a {@link CompositeReadableBuffer}.
349
     * {@code value} is the return value from the prior buffer, or the "initial" value passed
350
     * to {@code execute()} in the case of the first buffer. {@code execute()} returns the value
351
     * returned by the operation called on the last buffer.
352
     */
353
    int read(ReadableBuffer buffer, int length, T dest, int value) throws IOException;
354
  }
355

356
  private interface NoThrowReadOperation<T> extends ReadOperation<T> {
357
    @Override
358
    int read(ReadableBuffer buffer, int length, T dest, int value);
359
  }
360
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc