• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19690

14 Feb 2025 01:59PM CUT coverage: 88.626% (+0.04%) from 88.585%
#19690

push

github

web-flow
optimize number of buffer allocations (#11879)

Currently this improves two flows:

1. Known-length messages whose length is greater than 1 MB. Previously the
first buffer was 1 MB, followed by many buffers of 4096 bytes (from
CodedOutputStream); now subsequent buffers are also up to 1 MB.

2. With compression, the first write is always a 10-byte buffer
(the gzip header), but it is worth allocating more space up front.

34262 of 38659 relevant lines covered (88.63%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.32
/../core/src/main/java/io/grpc/internal/MessageFramer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static java.lang.Math.min;
23

24
import com.google.common.io.ByteStreams;
25
import io.grpc.Codec;
26
import io.grpc.Compressor;
27
import io.grpc.Drainable;
28
import io.grpc.KnownLength;
29
import io.grpc.Status;
30
import io.grpc.StatusRuntimeException;
31
import java.io.ByteArrayInputStream;
32
import java.io.IOException;
33
import java.io.InputStream;
34
import java.io.OutputStream;
35
import java.nio.ByteBuffer;
36
import java.util.ArrayList;
37
import java.util.List;
38
import java.util.Locale;
39
import javax.annotation.Nullable;
40

41
/**
42
 * Encodes gRPC messages to be delivered via the transport layer which implements {@link
43
 * MessageFramer.Sink}.
44
 */
45
public class MessageFramer implements Framer {
46

47
  // Sentinel meaning "no limit configured": the size checks in this class only apply while
  // maxOutboundMessageSize is >= 0, and setMaxOutboundMessageSize() uses it to detect re-setting.
  private static final int NO_MAX_OUTBOUND_MESSAGE_SIZE = -1;
48

49
  /**
   * Sink implemented by the transport layer to receive frames and forward them to their
   * destination.
   */
53
  public interface Sink {
54
    /**
     * Delivers a frame via the transport.
     *
     * @param frame a non-empty buffer to deliver or {@code null} if the framer is being
     *              closed and there is no data to deliver.
     * @param endOfStream whether the frame is the last one for the gRPC stream
     * @param flush {@code true} if more data may not be arriving soon
     * @param numMessages the number of messages that this series of frames represents; the
     *              framer passes {@code 0} for the body buffers of a message whose header
     *              frame has already been delivered
     */
63
    void deliverFrame(
64
        @Nullable WritableBuffer frame,
65
        boolean endOfStream,
66
        boolean flush,
67
        int numMessages);
68
  }
69

70
  // Each message is prefixed with a 5-byte header: a 1-byte compression flag followed by a
  // 4-byte payload length (written via headerScratch.put(flag).putInt(length)).
  private static final int HEADER_LENGTH = 5;
71
  private static final byte UNCOMPRESSED = 0;
72
  private static final byte COMPRESSED = 1;
73

74
  private final Sink sink;
75
  // Effectively final: setMaxOutboundMessageSize() rejects a second assignment.
76
  private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE;
1✔
77
  // Buffer currently being filled; reset to null whenever it is committed to the sink or released.
  private WritableBuffer buffer;
78
  /**
   * If > 0, the number of bytes still to be requested from the allocator for the current
   * known-length message.
   */
81
  private int knownLengthPendingAllocation;
82
  private Compressor compressor = Codec.Identity.NONE;
1✔
83
  private boolean messageCompression = true;
1✔
84
  // Adapter that lets a message stream write directly into the current buffer via writeRaw().
  private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
1✔
85
  // Reusable scratch space for building the 5-byte message header.
  private final ByteBuffer headerScratch = ByteBuffer.allocate(HEADER_LENGTH);
1✔
86
  private final WritableBufferAllocator bufferAllocator;
87
  private final StatsTraceContext statsTraceCtx;
88
  // NOTE(review): the comment that used to sit here described a nullable transportTracer, but no
  // such field is declared in this class; it appears to be stale.
89
  // Set by close() and dispose(); once true, writePayload() throws IllegalStateException.
  private boolean closed;
90

91
  // Tracing and stats-related states
92
  // Messages written since the sink was last given a message count (see commitToSink()).
  private int messagesBuffered;
93
  // Starts at -1 because writePayload() increments it before reporting, so the first message is 0.
  private int currentMessageSeqNo = -1;
1✔
94
  private long currentMessageWireSize;
95

96
  /**
   * Creates a {@code MessageFramer}.
   *
   * @param sink receives the frames produced by this framer; must not be {@code null}
   * @param bufferAllocator supplies the buffers that are filled and handed to {@code sink};
   *     must not be {@code null}
   * @param statsTraceCtx notified about each outbound message and its sizes; must not be
   *     {@code null}
   */
  public MessageFramer(
      Sink sink, WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) {
    // Validate in declaration order so the first missing argument is the one reported.
    checkNotNull(sink, "sink");
    checkNotNull(bufferAllocator, "bufferAllocator");
    checkNotNull(statsTraceCtx, "statsTraceCtx");
    this.sink = sink;
    this.bufferAllocator = bufferAllocator;
    this.statsTraceCtx = statsTraceCtx;
  }
1✔
108

109
  @Override
110
  public MessageFramer setCompressor(Compressor compressor) {
111
    this.compressor = checkNotNull(compressor, "Can't pass an empty compressor");
1✔
112
    return this;
1✔
113
  }
114

115
  @Override
116
  public MessageFramer setMessageCompression(boolean enable) {
117
    messageCompression = enable;
1✔
118
    return this;
1✔
119
  }
120

121
  @Override
122
  public void setMaxOutboundMessageSize(int maxSize) {
123
    checkState(maxOutboundMessageSize == NO_MAX_OUTBOUND_MESSAGE_SIZE, "max size already set");
1✔
124
    maxOutboundMessageSize = maxSize;
1✔
125
  }
1✔
126

127
  /**
128
   * Writes out a payload message.
129
   *
130
   * @param message contains the message to be written out. It will be completely consumed.
131
   */
132
  @Override
133
  public void writePayload(InputStream message) {
134
    verifyNotClosed();
1✔
135
    messagesBuffered++;
1✔
136
    currentMessageSeqNo++;
1✔
137
    currentMessageWireSize = 0;
1✔
138
    statsTraceCtx.outboundMessage(currentMessageSeqNo);
1✔
139
    boolean compressed = messageCompression && compressor != Codec.Identity.NONE;
1✔
140
    int written = -1;
1✔
141
    int messageLength = -2;
1✔
142
    try {
143
      messageLength = getKnownLength(message);
1✔
144
      if (messageLength != 0 && compressed) {
1✔
145
        written = writeCompressed(message, messageLength);
1✔
146
      } else {
147
        written = writeUncompressed(message, messageLength);
1✔
148
      }
149
    } catch (IOException e) {
×
150
      // This should not be possible, since sink#deliverFrame doesn't throw.
151
      throw Status.INTERNAL
×
152
          .withDescription("Failed to frame message")
×
153
          .withCause(e)
×
154
          .asRuntimeException();
×
155
    } catch (StatusRuntimeException e) {
1✔
156
      throw e;
1✔
157
    } catch (RuntimeException e) {
×
158
      throw Status.INTERNAL
×
159
          .withDescription("Failed to frame message")
×
160
          .withCause(e)
×
161
          .asRuntimeException();
×
162
    }
1✔
163

164
    if (messageLength != -1 && written != messageLength) {
1✔
165
      String err = String.format("Message length inaccurate %s != %s", written, messageLength);
×
166
      throw Status.INTERNAL.withDescription(err).asRuntimeException();
×
167
    }
168
    statsTraceCtx.outboundUncompressedSize(written);
1✔
169
    statsTraceCtx.outboundWireSize(currentMessageWireSize);
1✔
170
    statsTraceCtx.outboundMessageSent(currentMessageSeqNo, currentMessageWireSize, written);
1✔
171
  }
1✔
172

173
  private int writeUncompressed(InputStream message, int messageLength) throws IOException {
174
    if (messageLength != -1) {
1✔
175
      currentMessageWireSize = messageLength;
1✔
176
      return writeKnownLengthUncompressed(message, messageLength);
1✔
177
    }
178
    BufferChainOutputStream bufferChain = new BufferChainOutputStream();
1✔
179
    int written = writeToOutputStream(message, bufferChain);
1✔
180
    writeBufferChain(bufferChain, false);
1✔
181
    return written;
1✔
182
  }
183

184
  private int writeCompressed(InputStream message, int unusedMessageLength) throws IOException {
185
    BufferChainOutputStream bufferChain = new BufferChainOutputStream();
1✔
186

187
    OutputStream compressingStream = compressor.compress(bufferChain);
1✔
188
    int written;
189
    try {
190
      written = writeToOutputStream(message, compressingStream);
1✔
191
    } finally {
192
      compressingStream.close();
1✔
193
    }
194
    if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
1✔
195
      throw Status.RESOURCE_EXHAUSTED
×
196
          .withDescription(
×
197
              String.format(
×
198
                  Locale.US, "message too large %d > %d", written , maxOutboundMessageSize))
×
199
          .asRuntimeException();
×
200
    }
201

202
    writeBufferChain(bufferChain, true);
1✔
203
    return written;
1✔
204
  }
205

206
  private int getKnownLength(InputStream inputStream) throws IOException {
207
    if (inputStream instanceof KnownLength || inputStream instanceof ByteArrayInputStream) {
1✔
208
      return inputStream.available();
1✔
209
    }
210
    return -1;
1✔
211
  }
212

213
  /**
214
   * Write an unserialized message with a known length, uncompressed.
215
   */
216
  private int writeKnownLengthUncompressed(InputStream message, int messageLength)
217
      throws IOException {
218
    if (maxOutboundMessageSize >= 0 && messageLength > maxOutboundMessageSize) {
1✔
219
      throw Status.RESOURCE_EXHAUSTED
1✔
220
          .withDescription(
1✔
221
              String.format(
1✔
222
                  Locale.US, "message too large %d > %d", messageLength , maxOutboundMessageSize))
1✔
223
          .asRuntimeException();
1✔
224
    }
225
    headerScratch.clear();
1✔
226
    headerScratch.put(UNCOMPRESSED).putInt(messageLength);
1✔
227
    // Allocate the initial buffer chunk based on frame header + payload length.
228
    // Note that the allocator may allocate a buffer larger or smaller than this length
229
    knownLengthPendingAllocation = HEADER_LENGTH + messageLength;
1✔
230
    writeRaw(headerScratch.array(), 0, headerScratch.position());
1✔
231
    return writeToOutputStream(message, outputStreamAdapter);
1✔
232
  }
233

234
  /**
235
   * Write a message that has been serialized to a sequence of buffers.
236
   */
237
  private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed) {
238
    int messageLength = bufferChain.readableBytes();
1✔
239
    if (maxOutboundMessageSize >= 0 && messageLength > maxOutboundMessageSize) {
1✔
240
      throw Status.RESOURCE_EXHAUSTED
×
241
          .withDescription(
×
242
              String.format(
×
243
                  Locale.US, "message too large %d > %d", messageLength , maxOutboundMessageSize))
×
244
          .asRuntimeException();
×
245
    }
246
    headerScratch.clear();
1✔
247
    headerScratch.put(compressed ? COMPRESSED : UNCOMPRESSED).putInt(messageLength);
1✔
248
    WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH);
1✔
249
    writeableHeader.write(headerScratch.array(), 0, headerScratch.position());
1✔
250
    if (messageLength == 0) {
1✔
251
      // the payload had 0 length so make the header the current buffer.
252
      buffer = writeableHeader;
1✔
253
      return;
1✔
254
    }
255
    // Note that we are always delivering a small message to the transport here which
256
    // may incur transport framing overhead as it may be sent separately to the contents
257
    // of the GRPC frame.
258
    // The final message may not be completely written because we do not flush the last buffer.
259
    // Do not report the last message as sent.
260
    sink.deliverFrame(writeableHeader, false, false, messagesBuffered - 1);
1✔
261
    messagesBuffered = 1;
1✔
262
    // Commit all except the last buffer to the sink
263
    List<WritableBuffer> bufferList = bufferChain.bufferList;
1✔
264
    for (int i = 0; i < bufferList.size() - 1; i++) {
1✔
265
      sink.deliverFrame(bufferList.get(i), false, false, 0);
1✔
266
    }
267
    // Assign the current buffer to the last in the chain so it can be used
268
    // for future writes or written with end-of-stream=true on close.
269
    buffer = bufferList.get(bufferList.size() - 1);
1✔
270
    currentMessageWireSize = messageLength;
1✔
271
  }
1✔
272

273
  private static int writeToOutputStream(InputStream message, OutputStream outputStream)
274
      throws IOException {
275
    if (message instanceof Drainable) {
1✔
276
      return ((Drainable) message).drainTo(outputStream);
1✔
277
    } else {
278
      // This makes an unnecessary copy of the bytes when bytebuf supports array(). However, we
279
      // expect performance-critical code to support drainTo().
280
      @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
281
      long written = ByteStreams.copy(message, outputStream);
1✔
282
      checkArgument(written <= Integer.MAX_VALUE, "Message size overflow: %s", written);
1✔
283
      return (int) written;
1✔
284
    }
285
  }
286

287
  private void writeRaw(byte[] b, int off, int len) {
288
    while (len > 0) {
1✔
289
      if (buffer != null && buffer.writableBytes() == 0) {
1✔
290
        commitToSink(false, false);
1✔
291
      }
292
      if (buffer == null) {
1✔
293
        checkState(knownLengthPendingAllocation > 0, "knownLengthPendingAllocation reached 0");
1✔
294
        buffer = bufferAllocator.allocate(knownLengthPendingAllocation);
1✔
295
        knownLengthPendingAllocation -= min(knownLengthPendingAllocation, buffer.writableBytes());
1✔
296
      }
297
      int toWrite = min(len, buffer.writableBytes());
1✔
298
      buffer.write(b, off, toWrite);
1✔
299
      off += toWrite;
1✔
300
      len -= toWrite;
1✔
301
    }
1✔
302
  }
1✔
303

304
  /**
305
   * Flushes any buffered data in the framer to the sink.
306
   */
307
  @Override
308
  public void flush() {
309
    if (buffer != null && buffer.readableBytes() > 0) {
1✔
310
      commitToSink(false, true);
1✔
311
    }
312
  }
1✔
313

314
  /**
315
   * Indicates whether or not this framer has been closed via a call to either
316
   * {@link #close()} or {@link #dispose()}.
317
   */
318
  @Override
319
  public boolean isClosed() {
320
    return closed;
1✔
321
  }
322

323
  /**
324
   * Flushes and closes the framer and releases any buffers. After the framer is closed or
325
   * disposed, additional calls to this method will have no affect.
326
   */
327
  @Override
328
  public void close() {
329
    if (!isClosed()) {
1✔
330
      closed = true;
1✔
331
      // With the current code we don't expect readableBytes > 0 to be possible here, added
332
      // defensively to prevent buffer leak issues if the framer code changes later.
333
      if (buffer != null && buffer.readableBytes() == 0) {
1✔
334
        releaseBuffer();
×
335
      }
336
      commitToSink(true, true);
1✔
337
    }
338
  }
1✔
339

340
  /**
341
   * Closes the framer and releases any buffers, but does not flush. After the framer is
342
   * closed or disposed, additional calls to this method will have no affect.
343
   */
344
  @Override
345
  public void dispose() {
346
    closed = true;
×
347
    releaseBuffer();
×
348
  }
×
349

350
  private void releaseBuffer() {
351
    if (buffer != null) {
×
352
      buffer.release();
×
353
      buffer = null;
×
354
    }
355
  }
×
356

357
  private void commitToSink(boolean endOfStream, boolean flush) {
358
    WritableBuffer buf = buffer;
1✔
359
    buffer = null;
1✔
360
    sink.deliverFrame(buf, endOfStream, flush, messagesBuffered);
1✔
361
    messagesBuffered = 0;
1✔
362
  }
1✔
363

364
  private void verifyNotClosed() {
365
    if (isClosed()) {
1✔
366
      throw new IllegalStateException("Framer already closed");
×
367
    }
368
  }
1✔
369

370
  /** OutputStream whose write()s are passed to the framer. */
371
  private class OutputStreamAdapter extends OutputStream {
1✔
372
    /**
373
     * This is slow, don't call it.  If you care about write overhead, use a BufferedOutputStream.
374
     * Better yet, you can use your own single byte buffer and call
375
     * {@link #write(byte[], int, int)}.
376
     */
377
    @Override
378
    public void write(int b) {
379
      byte[] singleByte = new byte[]{(byte)b};
×
380
      write(singleByte, 0, 1);
×
381
    }
×
382

383
    @Override
384
    public void write(byte[] b, int off, int len) {
385
      writeRaw(b, off, len);
1✔
386
    }
1✔
387
  }
388

389
  /**
390
   * Produce a collection of {@link WritableBuffer} instances from the data written to an
391
   * {@link OutputStream}.
392
   */
393
  private final class BufferChainOutputStream extends OutputStream {
1✔
394
    private static final int FIRST_BUFFER_SIZE = 4096;
395

396
    private final List<WritableBuffer> bufferList = new ArrayList<>();
1✔
397
    private WritableBuffer current;
398

399
    /**
400
     * This is slow, don't call it.  If you care about write overhead, use a BufferedOutputStream.
401
     * Better yet, you can use your own single byte buffer and call
402
     * {@link #write(byte[], int, int)}.
403
     */
404
    @Override
405
    public void write(int b) {
406
      if (current != null && current.writableBytes() > 0) {
1✔
407
        current.write((byte)b);
1✔
408
        return;
1✔
409
      }
410
      byte[] singleByte = new byte[]{(byte)b};
1✔
411
      write(singleByte, 0, 1);
1✔
412
    }
1✔
413

414
    @Override
415
    public void write(byte[] b, int off, int len) {
416
      if (current == null) {
1✔
417
        // Request len bytes initially from the allocator, it may give us more.
418
        current = bufferAllocator.allocate(Math.max(FIRST_BUFFER_SIZE, len));
1✔
419
        bufferList.add(current);
1✔
420
      }
421
      while (len > 0) {
1✔
422
        int canWrite = Math.min(len, current.writableBytes());
1✔
423
        if (canWrite == 0) {
1✔
424
          // Assume message is twice as large as previous assumption if were still not done,
425
          // the allocator may allocate more or less than this amount.
426
          int needed = Math.max(len, current.readableBytes() * 2);
1✔
427
          current = bufferAllocator.allocate(needed);
1✔
428
          bufferList.add(current);
1✔
429
        } else {
1✔
430
          current.write(b, off, canWrite);
1✔
431
          off += canWrite;
1✔
432
          len -= canWrite;
1✔
433
        }
434
      }
1✔
435
    }
1✔
436

437
    private int readableBytes() {
438
      int readable = 0;
1✔
439
      for (WritableBuffer writableBuffer : bufferList) {
1✔
440
        readable += writableBuffer.readableBytes();
1✔
441
      }
1✔
442
      return readable;
1✔
443
    }
444
  }
445
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc