• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20164

05 Feb 2026 07:38AM UTC coverage: 88.688% (-0.01%) from 88.698%
#20164

push

github

web-flow
Update README etc to reference 1.79.0 (#12643)

35406 of 39922 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.32
/../core/src/main/java/io/grpc/internal/MessageFramer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static java.lang.Math.min;
23

24
import com.google.common.io.ByteStreams;
25
import io.grpc.Codec;
26
import io.grpc.Compressor;
27
import io.grpc.Drainable;
28
import io.grpc.KnownLength;
29
import io.grpc.Status;
30
import io.grpc.StatusRuntimeException;
31
import java.io.ByteArrayInputStream;
32
import java.io.IOException;
33
import java.io.InputStream;
34
import java.io.OutputStream;
35
import java.nio.ByteBuffer;
36
import java.util.ArrayList;
37
import java.util.List;
38
import java.util.Locale;
39
import javax.annotation.Nullable;
40

41
/**
42
 * Encodes gRPC messages to be delivered via the transport layer which implements {@link
43
 * MessageFramer.Sink}.
44
 */
45
public class MessageFramer implements Framer {
46

47
  private static final int NO_MAX_OUTBOUND_MESSAGE_SIZE = -1;
48

49
  /**
50
   * Sink implemented by the transport layer to receive frames and forward them to their
51
   * destination.
52
   */
53
  public interface Sink {
54
    /**
55
     * Delivers a frame via the transport.
56
     *
57
     * @param frame a non-empty buffer to deliver or {@code null} if the framer is being
58
     *              closed and there is no data to deliver.
59
     * @param endOfStream whether the frame is the last one for the GRPC stream
60
     * @param flush {@code true} if more data may not be arriving soon
61
     * @param numMessages the number of messages that this series of frames represents
62
     */
63
    void deliverFrame(
64
        @Nullable WritableBuffer frame,
65
        boolean endOfStream,
66
        boolean flush,
67
        int numMessages);
68
  }
69

70
  private static final int HEADER_LENGTH = 5;
71
  private static final byte UNCOMPRESSED = 0;
72
  private static final byte COMPRESSED = 1;
73

74
  private final Sink sink;
75
  // effectively final.  Can only be set once.
76
  private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE;
1✔
77
  private WritableBuffer buffer;
78
  /**
79
   * if > 0 - the number of bytes to allocate for the current known-length message.
80
   */
81
  private int knownLengthPendingAllocation;
82
  private Compressor compressor = Codec.Identity.NONE;
1✔
83
  private boolean messageCompression = true;
1✔
84
  private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
1✔
85
  private final ByteBuffer headerScratch = ByteBuffer.allocate(HEADER_LENGTH);
1✔
86
  private final WritableBufferAllocator bufferAllocator;
87
  private final StatsTraceContext statsTraceCtx;
88
  // Set to true by close() and dispose(); once set, writePayload() throws.
89
  private boolean closed;
90

91
  // Tracing and stats-related states
92
  private int messagesBuffered;
93
  private int currentMessageSeqNo = -1;
1✔
94
  private long currentMessageWireSize;
95

96
  /**
   * Constructs a framer that writes its frames to the given sink.
   *
   * @param sink the sink used to deliver frames to the transport; must not be null
   * @param bufferAllocator allocates buffers that the transport can commit to the wire; must not
   *     be null
   * @param statsTraceCtx receives the outbound message events and sizes reported by this framer;
   *     must not be null
   */
  public MessageFramer(
      Sink sink, WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) {
    // Validate everything up front (same order as the assignments below), then assign.
    checkNotNull(sink, "sink");
    checkNotNull(bufferAllocator, "bufferAllocator");
    checkNotNull(statsTraceCtx, "statsTraceCtx");
    this.sink = sink;
    this.bufferAllocator = bufferAllocator;
    this.statsTraceCtx = statsTraceCtx;
  }
1✔
108

109
  @Override
110
  public MessageFramer setCompressor(Compressor compressor) {
111
    this.compressor = checkNotNull(compressor, "Can't pass an empty compressor");
1✔
112
    return this;
1✔
113
  }
114

115
  @Override
116
  public MessageFramer setMessageCompression(boolean enable) {
117
    messageCompression = enable;
1✔
118
    return this;
1✔
119
  }
120

121
  @Override
122
  public void setMaxOutboundMessageSize(int maxSize) {
123
    checkState(maxOutboundMessageSize == NO_MAX_OUTBOUND_MESSAGE_SIZE, "max size already set");
1✔
124
    maxOutboundMessageSize = maxSize;
1✔
125
  }
1✔
126

127
  /**
128
   * Writes out a payload message.
129
   *
130
   * @param message contains the message to be written out. It will be completely consumed.
131
   */
132
  @Override
133
  public void writePayload(InputStream message) {
134
    verifyNotClosed();
1✔
135
    messagesBuffered++;
1✔
136
    currentMessageSeqNo++;
1✔
137
    currentMessageWireSize = 0;
1✔
138
    statsTraceCtx.outboundMessage(currentMessageSeqNo);
1✔
139
    boolean compressed = messageCompression && compressor != Codec.Identity.NONE;
1✔
140
    int written = -1;
1✔
141
    int messageLength = -2;
1✔
142
    try {
143
      messageLength = getKnownLength(message);
1✔
144
      if (messageLength != 0 && compressed) {
1✔
145
        written = writeCompressed(message, messageLength);
1✔
146
      } else {
147
        written = writeUncompressed(message, messageLength);
1✔
148
      }
149
    } catch (IOException e) {
×
150
      // This should not be possible, since sink#deliverFrame doesn't throw.
151
      throw Status.INTERNAL
×
152
          .withDescription("Failed to frame message")
×
153
          .withCause(e)
×
154
          .asRuntimeException();
×
155
    } catch (StatusRuntimeException e) {
1✔
156
      throw e;
1✔
157
    } catch (RuntimeException e) {
×
158
      throw Status.INTERNAL
×
159
          .withDescription("Failed to frame message")
×
160
          .withCause(e)
×
161
          .asRuntimeException();
×
162
    }
1✔
163

164
    if (messageLength != -1 && written != messageLength) {
1✔
165
      String err = String.format("Message length inaccurate %s != %s", written, messageLength);
×
166
      throw Status.INTERNAL.withDescription(err).asRuntimeException();
×
167
    }
168
    statsTraceCtx.outboundUncompressedSize(written);
1✔
169
    statsTraceCtx.outboundWireSize(currentMessageWireSize);
1✔
170
    statsTraceCtx.outboundMessageSent(currentMessageSeqNo, currentMessageWireSize, written);
1✔
171
  }
1✔
172

173
  private int writeUncompressed(InputStream message, int messageLength) throws IOException {
174
    if (messageLength != -1) {
1✔
175
      currentMessageWireSize = messageLength;
1✔
176
      return writeKnownLengthUncompressed(message, messageLength);
1✔
177
    }
178
    BufferChainOutputStream bufferChain = new BufferChainOutputStream();
1✔
179
    int written = writeToOutputStream(message, bufferChain);
1✔
180
    writeBufferChain(bufferChain, false);
1✔
181
    return written;
1✔
182
  }
183

184
  private int writeCompressed(InputStream message, int unusedMessageLength) throws IOException {
185
    BufferChainOutputStream bufferChain = new BufferChainOutputStream();
1✔
186

187
    OutputStream compressingStream = compressor.compress(bufferChain);
1✔
188
    int written;
189
    try {
190
      written = writeToOutputStream(message, compressingStream);
1✔
191
    } finally {
192
      compressingStream.close();
1✔
193
    }
194
    if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
1✔
195
      throw Status.RESOURCE_EXHAUSTED
×
196
          .withDescription(
×
197
              String.format(
×
198
                  Locale.US, "message too large %d > %d", written , maxOutboundMessageSize))
×
199
          .asRuntimeException();
×
200
    }
201

202
    writeBufferChain(bufferChain, true);
1✔
203
    return written;
1✔
204
  }
205

206
  private int getKnownLength(InputStream inputStream) throws IOException {
207
    if (inputStream instanceof KnownLength || inputStream instanceof ByteArrayInputStream) {
1✔
208
      return inputStream.available();
1✔
209
    }
210
    return -1;
1✔
211
  }
212

213
  /**
214
   * Write an unserialized message with a known length, uncompressed.
215
   */
216
  private int writeKnownLengthUncompressed(InputStream message, int messageLength)
217
      throws IOException {
218
    if (maxOutboundMessageSize >= 0 && messageLength > maxOutboundMessageSize) {
1✔
219
      throw Status.RESOURCE_EXHAUSTED
1✔
220
          .withDescription(
1✔
221
              String.format(
1✔
222
                  Locale.US, "message too large %d > %d", messageLength , maxOutboundMessageSize))
1✔
223
          .asRuntimeException();
1✔
224
    }
225
    headerScratch.clear();
1✔
226
    headerScratch.put(UNCOMPRESSED).putInt(messageLength);
1✔
227
    // Allocate the initial buffer chunk based on frame header + payload length.
228
    // Note that the allocator may allocate a buffer larger or smaller than this length
229
    knownLengthPendingAllocation = HEADER_LENGTH + messageLength;
1✔
230
    writeRaw(headerScratch.array(), 0, headerScratch.position());
1✔
231
    return writeToOutputStream(message, outputStreamAdapter);
1✔
232
  }
233

234
  /**
235
   * Write a message that has been serialized to a sequence of buffers.
236
   */
237
  private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed) {
238
    int messageLength = bufferChain.readableBytes();
1✔
239
    if (maxOutboundMessageSize >= 0 && messageLength > maxOutboundMessageSize) {
1✔
240
      throw Status.RESOURCE_EXHAUSTED
×
241
          .withDescription(
×
242
              String.format(
×
243
                  Locale.US, "message too large %d > %d", messageLength , maxOutboundMessageSize))
×
244
          .asRuntimeException();
×
245
    }
246
    headerScratch.clear();
1✔
247
    headerScratch.put(compressed ? COMPRESSED : UNCOMPRESSED).putInt(messageLength);
1✔
248
    WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH);
1✔
249
    writeableHeader.write(headerScratch.array(), 0, headerScratch.position());
1✔
250
    if (messageLength == 0) {
1✔
251
      // the payload had 0 length so make the header the current buffer.
252
      buffer = writeableHeader;
1✔
253
      return;
1✔
254
    }
255
    // Note that we are always delivering a small message to the transport here which
256
    // may incur transport framing overhead as it may be sent separately to the contents
257
    // of the GRPC frame.
258
    // The final message may not be completely written because we do not flush the last buffer.
259
    // Do not report the last message as sent.
260
    sink.deliverFrame(writeableHeader, false, false, messagesBuffered - 1);
1✔
261
    messagesBuffered = 1;
1✔
262
    // Commit all except the last buffer to the sink
263
    List<WritableBuffer> bufferList = bufferChain.bufferList;
1✔
264
    for (int i = 0; i < bufferList.size() - 1; i++) {
1✔
265
      sink.deliverFrame(bufferList.get(i), false, false, 0);
1✔
266
    }
267
    // Assign the current buffer to the last in the chain so it can be used
268
    // for future writes or written with end-of-stream=true on close.
269
    buffer = bufferList.get(bufferList.size() - 1);
1✔
270
    currentMessageWireSize = messageLength;
1✔
271
  }
1✔
272

273
  private static int writeToOutputStream(InputStream message, OutputStream outputStream)
274
      throws IOException {
275
    if (message instanceof Drainable) {
1✔
276
      return ((Drainable) message).drainTo(outputStream);
1✔
277
    } else {
278
      // This makes an unnecessary copy of the bytes when bytebuf supports array(). However, we
279
      // expect performance-critical code to support drainTo().
280
      @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
281
      long written = ByteStreams.copy(message, outputStream);
1✔
282
      checkArgument(written <= Integer.MAX_VALUE, "Message size overflow: %s", written);
1✔
283
      return (int) written;
1✔
284
    }
285
  }
286

287
  private void writeRaw(byte[] b, int off, int len) {
288
    while (len > 0) {
1✔
289
      if (buffer != null && buffer.writableBytes() == 0) {
1✔
290
        commitToSink(false, false);
1✔
291
      }
292
      if (buffer == null) {
1✔
293
        checkState(knownLengthPendingAllocation > 0, "knownLengthPendingAllocation reached 0");
1✔
294
        buffer = bufferAllocator.allocate(knownLengthPendingAllocation);
1✔
295
        knownLengthPendingAllocation -= min(knownLengthPendingAllocation, buffer.writableBytes());
1✔
296
      }
297
      int toWrite = min(len, buffer.writableBytes());
1✔
298
      buffer.write(b, off, toWrite);
1✔
299
      off += toWrite;
1✔
300
      len -= toWrite;
1✔
301
    }
1✔
302
  }
1✔
303

304
  /**
305
   * Flushes any buffered data in the framer to the sink.
306
   */
307
  @Override
308
  public void flush() {
309
    if (buffer != null && buffer.readableBytes() > 0) {
1✔
310
      commitToSink(false, true);
1✔
311
    }
312
  }
1✔
313

314
  /**
315
   * Indicates whether or not this framer has been closed via a call to either
316
   * {@link #close()} or {@link #dispose()}.
317
   */
318
  @Override
319
  public boolean isClosed() {
320
    return closed;
1✔
321
  }
322

323
  /**
324
   * Flushes and closes the framer and releases any buffers. After the framer is closed or
325
   * disposed, additional calls to this method will have no affect.
326
   */
327
  @Override
328
  public void close() {
329
    if (!isClosed()) {
1✔
330
      closed = true;
1✔
331
      // With the current code we don't expect readableBytes > 0 to be possible here, added
332
      // defensively to prevent buffer leak issues if the framer code changes later.
333
      if (buffer != null && buffer.readableBytes() == 0) {
1✔
334
        releaseBuffer();
×
335
      }
336
      commitToSink(true, true);
1✔
337
    }
338
  }
1✔
339

340
  /**
341
   * Closes the framer and releases any buffers, but does not flush. After the framer is
342
   * closed or disposed, additional calls to this method will have no affect.
343
   */
344
  @Override
345
  public void dispose() {
346
    closed = true;
×
347
    releaseBuffer();
×
348
  }
×
349

350
  private void releaseBuffer() {
351
    if (buffer != null) {
×
352
      buffer.release();
×
353
      buffer = null;
×
354
    }
355
  }
×
356

357
  private void commitToSink(boolean endOfStream, boolean flush) {
358
    WritableBuffer buf = buffer;
1✔
359
    buffer = null;
1✔
360
    sink.deliverFrame(buf, endOfStream, flush, messagesBuffered);
1✔
361
    messagesBuffered = 0;
1✔
362
  }
1✔
363

364
  private void verifyNotClosed() {
365
    if (isClosed()) {
1✔
366
      throw new IllegalStateException("Framer already closed");
×
367
    }
368
  }
1✔
369

370
  /** OutputStream whose write()s are passed to the framer. */
371
  private class OutputStreamAdapter extends OutputStream {
1✔
372
    /**
373
     * This is slow, don't call it.  If you care about write overhead, use a BufferedOutputStream.
374
     * Better yet, you can use your own single byte buffer and call
375
     * {@link #write(byte[], int, int)}.
376
     */
377
    @Override
378
    public void write(int b) {
379
      byte[] singleByte = new byte[]{(byte)b};
×
380
      write(singleByte, 0, 1);
×
381
    }
×
382

383
    @Override
384
    public void write(byte[] b, int off, int len) {
385
      writeRaw(b, off, len);
1✔
386
    }
1✔
387
  }
388

389
  /**
390
   * Produce a collection of {@link WritableBuffer} instances from the data written to an
391
   * {@link OutputStream}.
392
   */
393
  private final class BufferChainOutputStream extends OutputStream {
1✔
394
    private static final int FIRST_BUFFER_SIZE = 4096;
395

396
    private final List<WritableBuffer> bufferList = new ArrayList<>();
1✔
397
    private WritableBuffer current;
398

399
    /**
400
     * This is slow, don't call it.  If you care about write overhead, use a BufferedOutputStream.
401
     * Better yet, you can use your own single byte buffer and call
402
     * {@link #write(byte[], int, int)}.
403
     */
404
    @Override
405
    public void write(int b) {
406
      if (current != null && current.writableBytes() > 0) {
1✔
407
        current.write((byte)b);
1✔
408
        return;
1✔
409
      }
410
      byte[] singleByte = new byte[]{(byte)b};
1✔
411
      write(singleByte, 0, 1);
1✔
412
    }
1✔
413

414
    @Override
415
    public void write(byte[] b, int off, int len) {
416
      if (current == null) {
1✔
417
        // Request len bytes initially from the allocator, it may give us more.
418
        current = bufferAllocator.allocate(Math.max(FIRST_BUFFER_SIZE, len));
1✔
419
        bufferList.add(current);
1✔
420
      }
421
      while (len > 0) {
1✔
422
        int canWrite = Math.min(len, current.writableBytes());
1✔
423
        if (canWrite == 0) {
1✔
424
          // Assume message is twice as large as previous assumption if were still not done,
425
          // the allocator may allocate more or less than this amount.
426
          int needed = Math.max(len, current.readableBytes() * 2);
1✔
427
          current = bufferAllocator.allocate(needed);
1✔
428
          bufferList.add(current);
1✔
429
        } else {
1✔
430
          current.write(b, off, canWrite);
1✔
431
          off += canWrite;
1✔
432
          len -= canWrite;
1✔
433
        }
434
      }
1✔
435
    }
1✔
436

437
    private int readableBytes() {
438
      int readable = 0;
1✔
439
      for (WritableBuffer writableBuffer : bufferList) {
1✔
440
        readable += writableBuffer.readableBytes();
1✔
441
      }
1✔
442
      return readable;
1✔
443
    }
444
  }
445
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc