• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20123

22 Dec 2025 07:27PM UTC coverage: 88.719% (-0.03%) from 88.747%
#20123

push

github

web-flow
core: Delete ReadableBuffer.readBytes(ByteBuffer) (#12580)

At the very least it isn't used now. The method is as old as
ReadableBuffer itself (05a2b252b), but it appears to have never actually
been used.

35445 of 39952 relevant lines covered (88.72%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.4
/../core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import java.io.IOException;
20
import java.io.OutputStream;
21
import java.nio.ByteBuffer;
22
import java.nio.InvalidMarkException;
23
import java.util.ArrayDeque;
24
import java.util.Deque;
25
import javax.annotation.Nullable;
26

27
/**
28
 * A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a
29
 * facade that allows multiple buffers to be treated as one.
30
 *
31
 * <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once
32
 * the composite has read past the end of a given buffer, that buffer is automatically closed and
33
 * removed from the composite.
34
 */
35
public class CompositeReadableBuffer extends AbstractReadableBuffer {
36

37
  private final Deque<ReadableBuffer> readableBuffers;
38
  private Deque<ReadableBuffer> rewindableBuffers;
39
  private int readableBytes;
40
  private boolean marked;
41

42
  public CompositeReadableBuffer(int initialCapacity) {
1✔
43
    readableBuffers = new ArrayDeque<>(initialCapacity);
1✔
44
  }
1✔
45

46
  public CompositeReadableBuffer() {
1✔
47
    readableBuffers = new ArrayDeque<>();
1✔
48
  }
1✔
49

50
  /**
51
   * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is
52
   * expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the
53
   * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
54
   * this {@code CompositeBuffer}.
55
   */
56
  public void addBuffer(ReadableBuffer buffer) {
57
    boolean markHead = marked && readableBuffers.isEmpty();
1✔
58
    enqueueBuffer(buffer);
1✔
59
    if (markHead) {
1✔
60
      readableBuffers.peek().mark();
1✔
61
    }
62
  }
1✔
63

64
  private void enqueueBuffer(ReadableBuffer buffer) {
65
    if (!(buffer instanceof CompositeReadableBuffer)) {
1✔
66
      readableBuffers.add(buffer);
1✔
67
      readableBytes += buffer.readableBytes();
1✔
68
      return;
1✔
69
    }
70

71
    CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
1✔
72
    while (!compositeBuffer.readableBuffers.isEmpty()) {
1✔
73
      ReadableBuffer subBuffer = compositeBuffer.readableBuffers.remove();
1✔
74
      readableBuffers.add(subBuffer);
1✔
75
    }
1✔
76
    readableBytes += compositeBuffer.readableBytes;
1✔
77
    compositeBuffer.readableBytes = 0;
1✔
78
    compositeBuffer.close();
1✔
79
  }
1✔
80

81
  @Override
82
  public int readableBytes() {
83
    return readableBytes;
1✔
84
  }
85

86
  private static final NoThrowReadOperation<Void> UBYTE_OP =
1✔
87
      new NoThrowReadOperation<Void>() {
1✔
88
        @Override
89
        public int read(ReadableBuffer buffer, int length, Void unused, int value) {
90
          return buffer.readUnsignedByte();
1✔
91
        }
92
      };
93

94
  @Override
95
  public int readUnsignedByte() {
96
    return executeNoThrow(UBYTE_OP, 1, null, 0);
1✔
97
  }
98

99
  private static final NoThrowReadOperation<Void> SKIP_OP =
1✔
100
      new NoThrowReadOperation<Void>() {
1✔
101
        @Override
102
        public int read(ReadableBuffer buffer, int length, Void unused, int unused2) {
103
          buffer.skipBytes(length);
1✔
104
          return 0;
1✔
105
        }
106
      };
107

108
  @Override
109
  public void skipBytes(int length) {
110
    executeNoThrow(SKIP_OP, length, null, 0);
1✔
111
  }
1✔
112

113
  private static final NoThrowReadOperation<byte[]> BYTE_ARRAY_OP =
1✔
114
      new NoThrowReadOperation<byte[]>() {
1✔
115
        @Override
116
        public int read(ReadableBuffer buffer, int length, byte[] dest, int offset) {
117
          buffer.readBytes(dest, offset, length);
1✔
118
          return offset + length;
1✔
119
        }
120
      };
121

122
  private static final ReadOperation<OutputStream> STREAM_OP =
1✔
123
      new ReadOperation<OutputStream>() {
1✔
124
        @Override
125
        public int read(ReadableBuffer buffer, int length, OutputStream dest, int unused)
126
            throws IOException {
127
          buffer.readBytes(dest, length);
1✔
128
          return 0;
1✔
129
        }
130
      };
131

132
  @Override
133
  public void readBytes(byte[] dest, int destOffset, int length) {
134
    executeNoThrow(BYTE_ARRAY_OP, length, dest, destOffset);
1✔
135
  }
1✔
136

137
  @Override
138
  public void readBytes(OutputStream dest, int length) throws IOException {
139
    execute(STREAM_OP, length, dest, 0);
1✔
140
  }
1✔
141

142
  @Override
143
  public ReadableBuffer readBytes(int length) {
144
    if (length <= 0) {
1✔
145
      return ReadableBuffers.empty();
×
146
    }
147
    checkReadable(length);
1✔
148
    readableBytes -= length;
1✔
149

150
    ReadableBuffer newBuffer = null;
1✔
151
    CompositeReadableBuffer newComposite = null;
1✔
152
    do {
153
      ReadableBuffer buffer = readableBuffers.peek();
1✔
154
      int readable = buffer.readableBytes();
1✔
155
      ReadableBuffer readBuffer;
156
      if (readable > length) {
1✔
157
        readBuffer = buffer.readBytes(length);
1✔
158
        length = 0;
1✔
159
      } else {
160
        if (marked) {
1✔
161
          readBuffer = buffer.readBytes(readable);
1✔
162
          advanceBuffer();
1✔
163
        } else {
164
          readBuffer = readableBuffers.poll();
1✔
165
        }
166
        length -= readable;
1✔
167
      }
168
      if (newBuffer == null) {
1✔
169
        newBuffer = readBuffer;
1✔
170
      } else {
171
        if (newComposite == null) {
1✔
172
          newComposite = new CompositeReadableBuffer(
1✔
173
              length == 0 ? 2 : Math.min(readableBuffers.size() + 2, 16));
1✔
174
          newComposite.addBuffer(newBuffer);
1✔
175
          newBuffer = newComposite;
1✔
176
        }
177
        newComposite.addBuffer(readBuffer);
1✔
178
      }
179
    } while (length > 0);
1✔
180
    return newBuffer;
1✔
181
  }
182

183
  @Override
184
  public boolean markSupported() {
185
    for (ReadableBuffer buffer : readableBuffers) {
1✔
186
      if (!buffer.markSupported()) {
1✔
187
        return false;
1✔
188
      }
189
    }
1✔
190
    return true;
1✔
191
  }
192

193
  @Override
194
  public void mark() {
195
    if (rewindableBuffers == null) {
1✔
196
      rewindableBuffers = new ArrayDeque<>(Math.min(readableBuffers.size(), 16));
1✔
197
    }
198
    while (!rewindableBuffers.isEmpty()) {
1✔
199
      rewindableBuffers.remove().close();
1✔
200
    }
201
    marked = true;
1✔
202
    ReadableBuffer buffer = readableBuffers.peek();
1✔
203
    if (buffer != null) {
1✔
204
      buffer.mark();
1✔
205
    }
206
  }
1✔
207

208
  @Override
209
  public void reset() {
210
    if (!marked) {
1✔
211
      throw new InvalidMarkException();
1✔
212
    }
213
    ReadableBuffer buffer;
214
    if ((buffer = readableBuffers.peek()) != null) {
1✔
215
      int currentRemain = buffer.readableBytes();
1✔
216
      buffer.reset();
1✔
217
      readableBytes += (buffer.readableBytes() - currentRemain);
1✔
218
    }
219
    while ((buffer = rewindableBuffers.pollLast()) != null) {
1✔
220
      buffer.reset();
1✔
221
      readableBuffers.addFirst(buffer);
1✔
222
      readableBytes += buffer.readableBytes();
1✔
223
    }
224
  }
1✔
225

226
  @Override
227
  public boolean byteBufferSupported() {
228
    for (ReadableBuffer buffer : readableBuffers) {
1✔
229
      if (!buffer.byteBufferSupported()) {
1✔
230
        return false;
1✔
231
      }
232
    }
1✔
233
    return true;
1✔
234
  }
235

236
  @Nullable
237
  @Override
238
  public ByteBuffer getByteBuffer() {
239
    if (readableBuffers.isEmpty()) {
1✔
240
      return null;
×
241
    }
242
    return readableBuffers.peek().getByteBuffer();
1✔
243
  }
244

245
  @Override
246
  public void close() {
247
    while (!readableBuffers.isEmpty()) {
1✔
248
      readableBuffers.remove().close();
1✔
249
    }
250
    if (rewindableBuffers != null) {
1✔
251
      while (!rewindableBuffers.isEmpty()) {
1✔
252
        rewindableBuffers.remove().close();
1✔
253
      }
254
    }
255
  }
1✔
256

257
  /**
258
   * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to
259
   * satisfy the requested {@code length}.
260
   */
261
  private <T> int execute(ReadOperation<T> op, int length, T dest, int value) throws IOException {
262
    checkReadable(length);
1✔
263

264
    if (!readableBuffers.isEmpty()) {
1✔
265
      advanceBufferIfNecessary();
1✔
266
    }
267

268
    for (; length > 0 && !readableBuffers.isEmpty(); advanceBufferIfNecessary()) {
1✔
269
      ReadableBuffer buffer = readableBuffers.peek();
1✔
270
      int lengthToCopy = Math.min(length, buffer.readableBytes());
1✔
271

272
      // Perform the read operation for this buffer.
273
      value = op.read(buffer, lengthToCopy, dest, value);
1✔
274

275
      length -= lengthToCopy;
1✔
276
      readableBytes -= lengthToCopy;
1✔
277
    }
278

279
    if (length > 0) {
1✔
280
      // Should never get here.
281
      throw new AssertionError("Failed executing read operation");
×
282
    }
283

284
    return value;
1✔
285
  }
286

287
  private <T> int executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, int value) {
288
    try {
289
      return execute(op, length, dest, value);
1✔
290
    } catch (IOException e) {
×
291
      throw new AssertionError(e); // shouldn't happen
×
292
    }
293
  }
294

295
  /**
296
   * If the current buffer is exhausted, removes and closes it.
297
   */
298
  private void advanceBufferIfNecessary() {
299
    ReadableBuffer buffer = readableBuffers.peek();
1✔
300
    if (buffer.readableBytes() == 0) {
1✔
301
      advanceBuffer();
1✔
302
    }
303
  }
1✔
304

305
  /**
306
   * Removes one buffer from the front and closes it.
307
   */
308
  private void advanceBuffer() {
309
    if (marked) {
1✔
310
      rewindableBuffers.add(readableBuffers.remove());
1✔
311
      ReadableBuffer next = readableBuffers.peek();
1✔
312
      if (next != null) {
1✔
313
        next.mark();
1✔
314
      }
315
    } else {
1✔
316
      readableBuffers.remove().close();
1✔
317
    }
318
  }
1✔
319

320
  /**
321
   * A simple read operation to perform on a single {@link ReadableBuffer}.
322
   * All state management for the buffers is done by
323
   * {@link CompositeReadableBuffer#execute(ReadOperation, int, Object, int)}.
324
   */
325
  private interface ReadOperation<T> {
326
    /**
327
     * This method can also be used to simultaneously perform operation-specific int-valued
328
     * aggregation over the sequence of buffers in a {@link CompositeReadableBuffer}.
329
     * {@code value} is the return value from the prior buffer, or the "initial" value passed
330
     * to {@code execute()} in the case of the first buffer. {@code execute()} returns the value
331
     * returned by the operation called on the last buffer.
332
     */
333
    int read(ReadableBuffer buffer, int length, T dest, int value) throws IOException;
334
  }
335

336
  private interface NoThrowReadOperation<T> extends ReadOperation<T> {
337
    @Override
338
    int read(ReadableBuffer buffer, int length, T dest, int value);
339
  }
340
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc