• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20244

17 Apr 2026 02:24PM UTC coverage: 88.793% (-0.02%) from 88.811%
#20244

push

github

web-flow
core: Reduce per-stream idle memory by 20%

Metadata was accidentally being retained after the start of the call.
That can be an overwhelming percentage of memory for an idle RPC; don't
do that. The other changes are considerably smaller, but I happened to
notice them and the changes are straightforward without magic numbers
(e.g., there are many arrays that could be tuned).

The regular interop server uses 4600 bytes per full-duplex stream while
idle, but much of that is Census-recorded events hanging around. Keeping
the Census integration but removing the Census impl (so a noop is used)
drops that to 3000 bytes. This change brings that down to ~2450 bytes
(which still includes stuff from TestServiceImpl). But there's very
little Metadata in the interop tests, so absolute real-life savings
would be much higher (but relative real-life savings may be lower,
because the application will often have more state).

The measurements were captured using a modified
timeout_on_sleeping_server client that had 100,000 concurrent full
duplex calls on one connection.

36017 of 40563 relevant lines covered (88.79%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.2
/../core/src/main/java/io/grpc/internal/MessageDeframer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import io.grpc.Codec;
25
import io.grpc.Decompressor;
26
import io.grpc.Status;
27
import java.io.Closeable;
28
import java.io.FilterInputStream;
29
import java.io.IOException;
30
import java.io.InputStream;
31
import java.util.Locale;
32
import java.util.zip.DataFormatException;
33
import javax.annotation.Nullable;
34
import javax.annotation.concurrent.NotThreadSafe;
35

36
/**
37
 * Deframer for GRPC frames.
38
 *
39
 * <p>This class is not thread-safe. Unless otherwise stated, all calls to public methods should be
40
 * made in the deframing thread.
41
 */
42
@NotThreadSafe
43
public class MessageDeframer implements Closeable, Deframer {
44
  private static final int HEADER_LENGTH = 5;
45
  private static final int COMPRESSED_FLAG_MASK = 1;
46
  private static final int RESERVED_MASK = 0xFE;
47
  private static final int MAX_BUFFER_SIZE = 1024 * 1024 * 2;
48

49
  /**
50
   * A listener of deframing events. These methods will be invoked from the deframing thread.
51
   */
52
  public interface Listener {
53

54
    /**
55
     * Called when the given number of bytes has been read from the input source of the deframer.
56
     * This is typically used to indicate to the underlying transport that more data can be
57
     * accepted.
58
     *
59
     * @param numBytes the number of bytes read from the deframer's input source.
60
     */
61
    void bytesRead(int numBytes);
62

63
    /**
64
     * Called to deliver the next complete message.
65
     *
66
     * @param producer single message producer wrapping the message.
67
     */
68
    void messagesAvailable(StreamListener.MessageProducer producer);
69

70
    /**
71
     * Called when the deframer closes.
72
     *
73
     * @param hasPartialMessage whether the deframer contained an incomplete message at closing.
74
     */
75
    void deframerClosed(boolean hasPartialMessage);
76

77
    /**
78
     * Called when a {@link #deframe(ReadableBuffer)} operation failed.
79
     *
80
     * @param cause the actual failure
81
     */
82
    void deframeFailed(Throwable cause);
83
  }
84

85
  private enum State {
1✔
86
    HEADER, BODY
1✔
87
  }
88

89
  private Listener listener;
90
  private int maxInboundMessageSize;
91
  private final StatsTraceContext statsTraceCtx;
92
  private final TransportTracer transportTracer;
93
  private Decompressor decompressor;
94
  private GzipInflatingBuffer fullStreamDecompressor;
95
  private byte[] inflatedBuffer;
96
  private int inflatedIndex;
97
  private State state = State.HEADER;
1✔
98
  private int requiredLength = HEADER_LENGTH;
1✔
99
  private boolean compressedFlag;
100
  private CompositeReadableBuffer nextFrame;
101
  private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer();
1✔
102
  private long pendingDeliveries;
103
  private boolean inDelivery = false;
1✔
104
  private int currentMessageSeqNo = -1;
1✔
105
  private int inboundBodyWireSize;
106

107
  private boolean closeWhenComplete = false;
1✔
108
  private volatile boolean stopDelivery = false;
1✔
109

110
  /**
   * Constructs a deframer that reports its events to {@code listener}.
   *
   * @param listener listener for deframer events; must not be null.
   * @param decompressor the compression used if a compressed frame is encountered, with
   *     {@code NONE} meaning unsupported; must not be null.
   * @param maxMessageSize the maximum allowed size for received messages.
   * @param statsTraceCtx context that is notified about inbound messages and their sizes; must
   *     not be null.
   * @param transportTracer tracer that is notified whenever a message header has been received;
   *     must not be null.
   */
  public MessageDeframer(
      Listener listener,
      Decompressor decompressor,
      int maxMessageSize,
      StatsTraceContext statsTraceCtx,
      TransportTracer transportTracer) {
    // Validate in parameter order so that the first null argument is the one that is reported.
    checkNotNull(listener, "sink");
    checkNotNull(decompressor, "decompressor");
    checkNotNull(statsTraceCtx, "statsTraceCtx");
    checkNotNull(transportTracer, "transportTracer");

    this.listener = listener;
    this.decompressor = decompressor;
    this.maxInboundMessageSize = maxMessageSize;
    this.statsTraceCtx = statsTraceCtx;
    this.transportTracer = transportTracer;
  }
1✔
130

131
  void setListener(Listener listener) {
132
    this.listener = listener;
1✔
133
  }
1✔
134

135
  @Override
136
  public void setMaxInboundMessageSize(int messageSize) {
137
    maxInboundMessageSize = messageSize;
1✔
138
  }
1✔
139

140
  @Override
141
  public void setDecompressor(Decompressor decompressor) {
142
    checkState(fullStreamDecompressor == null, "Already set full stream decompressor");
1✔
143
    this.decompressor = checkNotNull(decompressor, "Can't pass an empty decompressor");
1✔
144
  }
1✔
145

146
  @Override
147
  public void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
148
    checkState(decompressor == Codec.Identity.NONE, "per-message decompressor already set");
1✔
149
    checkState(this.fullStreamDecompressor == null, "full stream decompressor already set");
1✔
150
    this.fullStreamDecompressor =
1✔
151
        checkNotNull(fullStreamDecompressor, "Can't pass a null full stream decompressor");
1✔
152
    unprocessed = null;
1✔
153
  }
1✔
154

155
  @Override
156
  public void request(int numMessages) {
157
    checkArgument(numMessages > 0, "numMessages must be > 0");
1✔
158
    if (isClosed()) {
1✔
159
      return;
1✔
160
    }
161
    pendingDeliveries += numMessages;
1✔
162
    deliver();
1✔
163
  }
1✔
164

165
  @Override
166
  public void deframe(ReadableBuffer data) {
167
    checkNotNull(data, "data");
1✔
168
    boolean needToCloseData = true;
1✔
169
    try {
170
      if (!isClosedOrScheduledToClose()) {
1✔
171
        if (fullStreamDecompressor != null) {
1✔
172
          fullStreamDecompressor.addGzippedBytes(data);
1✔
173
        } else {
174
          unprocessed.addBuffer(data);
1✔
175
        }
176
        needToCloseData = false;
1✔
177

178
        deliver();
1✔
179
      }
180
    } finally {
181
      if (needToCloseData) {
1✔
182
        data.close();
1✔
183
      }
184
    }
185
  }
1✔
186

187
  @Override
188
  public void closeWhenComplete() {
189
    if (isClosed()) {
1✔
190
      return;
1✔
191
    } else if (isStalled()) {
1✔
192
      close();
1✔
193
    } else {
194
      closeWhenComplete = true;
1✔
195
    }
196
  }
1✔
197

198
  /**
199
   * Sets a flag to interrupt delivery of any currently queued messages. This may be invoked outside
200
   * of the deframing thread, and must be followed by a call to {@link #close()} in the deframing
201
   * thread. Without a subsequent call to {@link #close()}, the deframer may hang waiting for
202
   * additional messages before noticing that the {@code stopDelivery} flag has been set.
203
   */
204
  void stopDelivery() {
205
    stopDelivery = true;
×
206
  }
×
207

208
  boolean hasPendingDeliveries() {
209
    return pendingDeliveries != 0;
×
210
  }
211

212
  @Override
213
  public void close() {
214
    if (isClosed()) {
1✔
215
      return;
×
216
    }
217
    boolean hasPartialMessage = nextFrame != null && nextFrame.readableBytes() > 0;
1✔
218
    try {
219
      if (fullStreamDecompressor != null) {
1✔
220
        hasPartialMessage = hasPartialMessage || fullStreamDecompressor.hasPartialData();
1✔
221
        fullStreamDecompressor.close();
1✔
222
      }
223
      if (unprocessed != null) {
1✔
224
        unprocessed.close();
1✔
225
      }
226
      if (nextFrame != null) {
1✔
227
        nextFrame.close();
1✔
228
      }
229
    } finally {
230
      fullStreamDecompressor = null;
1✔
231
      unprocessed = null;
1✔
232
      nextFrame = null;
1✔
233
    }
234
    listener.deframerClosed(hasPartialMessage);
1✔
235
  }
1✔
236

237
  /**
238
   * Indicates whether or not this deframer has been closed.
239
   */
240
  public boolean isClosed() {
241
    return unprocessed == null && fullStreamDecompressor == null;
1✔
242
  }
243

244
  /** Returns true if this deframer has already been closed or scheduled to close. */
245
  private boolean isClosedOrScheduledToClose() {
246
    return isClosed() || closeWhenComplete;
1✔
247
  }
248

249
  private boolean isStalled() {
250
    if (fullStreamDecompressor != null) {
1✔
251
      return fullStreamDecompressor.isStalled();
1✔
252
    } else {
253
      return unprocessed.readableBytes() == 0;
1✔
254
    }
255
  }
256

257
  /**
258
   * Reads and delivers as many messages to the listener as possible.
259
   */
260
  private void deliver() {
261
    // We can have reentrancy here when using a direct executor, triggered by calls to
262
    // request more messages. This is safe as we simply loop until pendingDelivers = 0
263
    if (inDelivery) {
1✔
264
      return;
1✔
265
    }
266
    inDelivery = true;
1✔
267
    try {
268
      // Process the uncompressed bytes.
269
      while (!stopDelivery && pendingDeliveries > 0 && readRequiredBytes()) {
1✔
270
        switch (state) {
1✔
271
          case HEADER:
272
            processHeader();
1✔
273
            break;
1✔
274
          case BODY:
275
            // Read the body and deliver the message.
276
            processBody();
1✔
277

278
            // Since we've delivered a message, decrement the number of pending
279
            // deliveries remaining.
280
            pendingDeliveries--;
1✔
281
            break;
1✔
282
          default:
283
            throw new AssertionError("Invalid state: " + state);
×
284
        }
285
      }
286

287
      if (stopDelivery) {
1✔
288
        close();
×
289
        return;
×
290
      }
291

292
      /*
293
       * We are stalled when there are no more bytes to process. This allows delivering errors as
294
       * soon as the buffered input has been consumed, independent of whether the application
295
       * has requested another message.  At this point in the function, either all frames have been
296
       * delivered, or unprocessed is empty.  If there is a partial message, it will be inside next
297
       * frame and not in unprocessed.  If there is extra data but no pending deliveries, it will
298
       * be in unprocessed.
299
       */
300
      if (closeWhenComplete && isStalled()) {
1✔
301
        close();
1✔
302
      }
303
    } finally {
304
      inDelivery = false;
1✔
305
    }
306
  }
1✔
307

308
  /**
309
   * Attempts to read the required bytes into nextFrame.
310
   *
311
   * @return {@code true} if all of the required bytes have been read.
312
   */
313
  private boolean readRequiredBytes() {
314
    int totalBytesRead = 0;
1✔
315
    int deflatedBytesRead = 0;
1✔
316
    try {
317
      // Avoid allocating nextFrame when idle
318
      if (requiredLength > 0 && fullStreamDecompressor == null
1✔
319
          && unprocessed.readableBytes() == 0) {
1✔
320
        return false;
1✔
321
      }
322

323
      if (nextFrame == null) {
1✔
324
        nextFrame = new CompositeReadableBuffer();
1✔
325
      }
326

327
      // Read until the buffer contains all the required bytes.
328
      int missingBytes;
329
      while ((missingBytes = requiredLength - nextFrame.readableBytes()) > 0) {
1✔
330
        if (fullStreamDecompressor != null) {
1✔
331
          try {
332
            if (inflatedBuffer == null || inflatedIndex == inflatedBuffer.length) {
1✔
333
              inflatedBuffer = new byte[Math.min(missingBytes, MAX_BUFFER_SIZE)];
1✔
334
              inflatedIndex = 0;
1✔
335
            }
336
            int bytesToRead = Math.min(missingBytes, inflatedBuffer.length - inflatedIndex);
1✔
337
            int n = fullStreamDecompressor.inflateBytes(inflatedBuffer, inflatedIndex, bytesToRead);
1✔
338
            totalBytesRead += fullStreamDecompressor.getAndResetBytesConsumed();
1✔
339
            deflatedBytesRead += fullStreamDecompressor.getAndResetDeflatedBytesConsumed();
1✔
340
            if (n == 0) {
1✔
341
              // No more inflated data is available.
342
              return false;
1✔
343
            }
344
            nextFrame.addBuffer(ReadableBuffers.wrap(inflatedBuffer, inflatedIndex, n));
1✔
345
            inflatedIndex += n;
1✔
346
          } catch (IOException e) {
×
347
            throw new RuntimeException(e);
×
348
          } catch (DataFormatException e) {
×
349
            throw new RuntimeException(e);
×
350
          }
1✔
351
        } else {
352
          if (unprocessed.readableBytes() == 0) {
1✔
353
            // No more data is available.
354
            return false;
1✔
355
          }
356
          int toRead = Math.min(missingBytes, unprocessed.readableBytes());
1✔
357
          totalBytesRead += toRead;
1✔
358
          nextFrame.addBuffer(unprocessed.readBytes(toRead));
1✔
359
        }
1✔
360
      }
361
      return true;
1✔
362
    } finally {
363
      if (totalBytesRead > 0) {
1✔
364
        listener.bytesRead(totalBytesRead);
1✔
365
        if (state == State.BODY) {
1✔
366
          if (fullStreamDecompressor != null) {
1✔
367
            // With compressed streams, totalBytesRead can include gzip header and trailer metadata
368
            statsTraceCtx.inboundWireSize(deflatedBytesRead);
1✔
369
            inboundBodyWireSize += deflatedBytesRead;
1✔
370
          } else {
371
            statsTraceCtx.inboundWireSize(totalBytesRead);
1✔
372
            inboundBodyWireSize += totalBytesRead;
1✔
373
          }
374
        }
375
      }
376
    }
377
  }
378

379
  /**
380
   * Processes the GRPC compression header which is composed of the compression flag and the outer
381
   * frame length.
382
   */
383
  private void processHeader() {
384
    int type = nextFrame.readUnsignedByte();
1✔
385
    if ((type & RESERVED_MASK) != 0) {
1✔
386
      throw Status.INTERNAL.withDescription(
×
387
          "gRPC frame header malformed: reserved bits not zero")
388
          .asRuntimeException();
×
389
    }
390
    compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0;
1✔
391

392
    // Update the required length to include the length of the frame.
393
    requiredLength = nextFrame.readInt();
1✔
394
    if (requiredLength < 0 || requiredLength > maxInboundMessageSize) {
1✔
395
      throw Status.RESOURCE_EXHAUSTED.withDescription(
1✔
396
          String.format(Locale.US, "gRPC message exceeds maximum size %d: %d",
1✔
397
              maxInboundMessageSize, requiredLength))
1✔
398
          .asRuntimeException();
1✔
399
    }
400

401
    currentMessageSeqNo++;
1✔
402
    statsTraceCtx.inboundMessage(currentMessageSeqNo);
1✔
403
    transportTracer.reportMessageReceived();
1✔
404
    // Continue reading the frame body.
405
    state = State.BODY;
1✔
406
  }
1✔
407

408
  /**
409
   * Processes the GRPC message body, which depending on frame header flags may be compressed.
410
   */
411
  private void processBody() {
412
    // There is no reliable way to get the uncompressed size per message when it's compressed,
413
    // because the uncompressed bytes are provided through an InputStream whose total size is
414
    // unknown until all bytes are read, and we don't know when it happens.
415
    statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize,
1✔
416
        (compressedFlag || fullStreamDecompressor != null) ? -1 : inboundBodyWireSize);
1✔
417
    inboundBodyWireSize = 0;
1✔
418
    InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
1✔
419
    nextFrame.touch();
1✔
420
    nextFrame = null;
1✔
421
    listener.messagesAvailable(new SingleMessageProducer(stream));
1✔
422

423
    // Done with this frame, begin processing the next header.
424
    state = State.HEADER;
1✔
425
    requiredLength = HEADER_LENGTH;
1✔
426
  }
1✔
427

428
  private InputStream getUncompressedBody() {
429
    statsTraceCtx.inboundUncompressedSize(nextFrame.readableBytes());
1✔
430
    return ReadableBuffers.openStream(nextFrame, true);
1✔
431
  }
432

433
  private InputStream getCompressedBody() {
434
    if (decompressor == Codec.Identity.NONE) {
1✔
435
      throw Status.INTERNAL.withDescription(
×
436
          "Can't decode compressed gRPC message as compression not configured")
437
          .asRuntimeException();
×
438
    }
439

440
    try {
441
      // Enforce the maxMessageSize limit on the returned stream.
442
      InputStream unlimitedStream =
1✔
443
          decompressor.decompress(ReadableBuffers.openStream(nextFrame, true));
1✔
444
      return new SizeEnforcingInputStream(
1✔
445
          unlimitedStream, maxInboundMessageSize, statsTraceCtx);
446
    } catch (IOException e) {
×
447
      throw new RuntimeException(e);
×
448
    }
449
  }
450

451
  /**
452
   * An {@link InputStream} that enforces the {@link #maxMessageSize} limit for compressed frames.
453
   */
454
  @VisibleForTesting
455
  static final class SizeEnforcingInputStream extends FilterInputStream {
456
    private final int maxMessageSize;
457
    private final StatsTraceContext statsTraceCtx;
458
    private long maxCount;
459
    private long count;
460
    private long mark = -1;
1✔
461

462
    SizeEnforcingInputStream(InputStream in, int maxMessageSize, StatsTraceContext statsTraceCtx) {
463
      super(in);
1✔
464
      this.maxMessageSize = maxMessageSize;
1✔
465
      this.statsTraceCtx = statsTraceCtx;
1✔
466
    }
1✔
467

468
    @Override
469
    public int read() throws IOException {
470
      int result = in.read();
1✔
471
      if (result != -1) {
1✔
472
        count++;
1✔
473
      }
474
      verifySize();
1✔
475
      reportCount();
1✔
476
      return result;
1✔
477
    }
478

479
    @Override
480
    public int read(byte[] b, int off, int len) throws IOException {
481
      int result = in.read(b, off, len);
1✔
482
      if (result != -1) {
1✔
483
        count += result;
1✔
484
      }
485
      verifySize();
1✔
486
      reportCount();
1✔
487
      return result;
1✔
488
    }
489

490
    @Override
491
    public long skip(long n) throws IOException {
492
      long result = in.skip(n);
1✔
493
      count += result;
1✔
494
      verifySize();
1✔
495
      reportCount();
1✔
496
      return result;
1✔
497
    }
498

499
    @Override
500
    public synchronized void mark(int readlimit) {
501
      in.mark(readlimit);
1✔
502
      mark = count;
1✔
503
      // it's okay to mark even if mark isn't supported, as reset won't work
504
    }
1✔
505

506
    @Override
507
    public synchronized void reset() throws IOException {
508
      if (!in.markSupported()) {
1✔
509
        throw new IOException("Mark not supported");
×
510
      }
511
      if (mark == -1) {
1✔
512
        throw new IOException("Mark not set");
×
513
      }
514

515
      in.reset();
1✔
516
      count = mark;
1✔
517
    }
1✔
518

519
    private void reportCount() {
520
      if (count > maxCount) {
1✔
521
        statsTraceCtx.inboundUncompressedSize(count - maxCount);
1✔
522
        maxCount = count;
1✔
523
      }
524
    }
1✔
525

526
    private void verifySize() {
527
      if (count > maxMessageSize) {
1✔
528
        throw Status.RESOURCE_EXHAUSTED
1✔
529
            .withDescription("Decompressed gRPC message exceeds maximum size " + maxMessageSize)
1✔
530
            .asRuntimeException();
1✔
531
      }
532
    }
1✔
533
  }
534

535
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
536
    private InputStream message;
537

538
    private SingleMessageProducer(InputStream message) {
1✔
539
      this.message = message;
1✔
540
    }
1✔
541

542
    @Nullable
543
    @Override
544
    public InputStream next() {
545
      InputStream messageToReturn = message;
1✔
546
      message = null;
1✔
547
      return messageToReturn;
1✔
548
    }
549
  }
550
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc