• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18822

07 Sep 2023 08:15PM UTC coverage: 88.321% (+0.02%) from 88.302%
#18822

push

github-actions

ejona86
netty: Touch ByteBuf when message framing has been decoded

When a memory leak occurs, it is really helpful to have access records
to understand where the buffer was being held when it leaked. The
retain() call made when we create the NettyReadableBuffer already creates
an access record for the ByteBuf, so here we track when the ByteBuf is
passed to another thread.

See #8330

30349 of 34362 relevant lines covered (88.32%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.04
/../core/src/main/java/io/grpc/internal/MessageDeframer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import io.grpc.Codec;
25
import io.grpc.Decompressor;
26
import io.grpc.Status;
27
import java.io.Closeable;
28
import java.io.FilterInputStream;
29
import java.io.IOException;
30
import java.io.InputStream;
31
import java.util.Locale;
32
import java.util.zip.DataFormatException;
33
import javax.annotation.Nullable;
34
import javax.annotation.concurrent.NotThreadSafe;
35

36
/**
37
 * Deframer for GRPC frames.
38
 *
39
 * <p>This class is not thread-safe. Unless otherwise stated, all calls to public methods should be
40
 * made in the deframing thread.
41
 */
42
@NotThreadSafe
43
public class MessageDeframer implements Closeable, Deframer {
44
  private static final int HEADER_LENGTH = 5;
45
  private static final int COMPRESSED_FLAG_MASK = 1;
46
  private static final int RESERVED_MASK = 0xFE;
47
  private static final int MAX_BUFFER_SIZE = 1024 * 1024 * 2;
48

49
  /**
50
   * A listener of deframing events. These methods will be invoked from the deframing thread.
51
   */
52
  public interface Listener {
53

54
    /**
55
     * Called when the given number of bytes has been read from the input source of the deframer.
56
     * This is typically used to indicate to the underlying transport that more data can be
57
     * accepted.
58
     *
59
     * @param numBytes the number of bytes read from the deframer's input source.
60
     */
61
    void bytesRead(int numBytes);
62

63
    /**
64
     * Called to deliver the next complete message.
65
     *
66
     * @param producer single message producer wrapping the message.
67
     */
68
    void messagesAvailable(StreamListener.MessageProducer producer);
69

70
    /**
71
     * Called when the deframer closes.
72
     *
73
     * @param hasPartialMessage whether the deframer contained an incomplete message at closing.
74
     */
75
    void deframerClosed(boolean hasPartialMessage);
76

77
    /**
78
     * Called when a {@link #deframe(ReadableBuffer)} operation failed.
79
     *
80
     * @param cause the actual failure
81
     */
82
    void deframeFailed(Throwable cause);
83
  }
84

85
  private enum State {
1✔
86
    HEADER, BODY
1✔
87
  }
88

89
  private Listener listener;
90
  private int maxInboundMessageSize;
91
  private final StatsTraceContext statsTraceCtx;
92
  private final TransportTracer transportTracer;
93
  private Decompressor decompressor;
94
  private GzipInflatingBuffer fullStreamDecompressor;
95
  private byte[] inflatedBuffer;
96
  private int inflatedIndex;
97
  private State state = State.HEADER;
1✔
98
  private int requiredLength = HEADER_LENGTH;
1✔
99
  private boolean compressedFlag;
100
  private CompositeReadableBuffer nextFrame;
101
  private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer();
1✔
102
  private long pendingDeliveries;
103
  private boolean inDelivery = false;
1✔
104
  private int currentMessageSeqNo = -1;
1✔
105
  private int inboundBodyWireSize;
106

107
  private boolean closeWhenComplete = false;
1✔
108
  private volatile boolean stopDelivery = false;
1✔
109

110
  /**
   * Create a deframer.
   *
   * @param listener listener for deframer events; must not be null.
   * @param decompressor the compression used if a compressed frame is encountered, with
   *                     {@code NONE} meaning unsupported; must not be null.
   * @param maxMessageSize the maximum allowed size for received messages.
   * @param statsTraceCtx receives the inbound message and size statistics reported while
   *                      deframing; must not be null.
   * @param transportTracer notified each time a message header has been received; must not be
   *                        null.
   */
  public MessageDeframer(
      Listener listener,
      Decompressor decompressor,
      int maxMessageSize,
      StatsTraceContext statsTraceCtx,
      TransportTracer transportTracer) {
    // Each label matches its parameter name so a NullPointerException identifies the offending
    // argument directly.
    this.listener = checkNotNull(listener, "listener");
    this.decompressor = checkNotNull(decompressor, "decompressor");
    this.maxInboundMessageSize = maxMessageSize;
    this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
  }
1✔
130

131
  void setListener(Listener listener) {
132
    this.listener = listener;
1✔
133
  }
1✔
134

135
  @Override
136
  public void setMaxInboundMessageSize(int messageSize) {
137
    maxInboundMessageSize = messageSize;
1✔
138
  }
1✔
139

140
  @Override
141
  public void setDecompressor(Decompressor decompressor) {
142
    checkState(fullStreamDecompressor == null, "Already set full stream decompressor");
1✔
143
    this.decompressor = checkNotNull(decompressor, "Can't pass an empty decompressor");
1✔
144
  }
1✔
145

146
  @Override
147
  public void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
148
    checkState(decompressor == Codec.Identity.NONE, "per-message decompressor already set");
1✔
149
    checkState(this.fullStreamDecompressor == null, "full stream decompressor already set");
1✔
150
    this.fullStreamDecompressor =
1✔
151
        checkNotNull(fullStreamDecompressor, "Can't pass a null full stream decompressor");
1✔
152
    unprocessed = null;
1✔
153
  }
1✔
154

155
  @Override
156
  public void request(int numMessages) {
157
    checkArgument(numMessages > 0, "numMessages must be > 0");
1✔
158
    if (isClosed()) {
1✔
159
      return;
1✔
160
    }
161
    pendingDeliveries += numMessages;
1✔
162
    deliver();
1✔
163
  }
1✔
164

165
  @Override
166
  public void deframe(ReadableBuffer data) {
167
    checkNotNull(data, "data");
1✔
168
    boolean needToCloseData = true;
1✔
169
    try {
170
      if (!isClosedOrScheduledToClose()) {
1✔
171
        if (fullStreamDecompressor != null) {
1✔
172
          fullStreamDecompressor.addGzippedBytes(data);
1✔
173
        } else {
174
          unprocessed.addBuffer(data);
1✔
175
        }
176
        needToCloseData = false;
1✔
177

178
        deliver();
1✔
179
      }
180
    } finally {
181
      if (needToCloseData) {
1✔
182
        data.close();
1✔
183
      }
184
    }
185
  }
1✔
186

187
  @Override
188
  public void closeWhenComplete() {
189
    if (isClosed()) {
1✔
190
      return;
1✔
191
    } else if (isStalled()) {
1✔
192
      close();
1✔
193
    } else {
194
      closeWhenComplete = true;
1✔
195
    }
196
  }
1✔
197

198
  /**
199
   * Sets a flag to interrupt delivery of any currently queued messages. This may be invoked outside
200
   * of the deframing thread, and must be followed by a call to {@link #close()} in the deframing
201
   * thread. Without a subsequent call to {@link #close()}, the deframer may hang waiting for
202
   * additional messages before noticing that the {@code stopDelivery} flag has been set.
203
   */
204
  void stopDelivery() {
205
    stopDelivery = true;
×
206
  }
×
207

208
  boolean hasPendingDeliveries() {
209
    return pendingDeliveries != 0;
×
210
  }
211

212
  @Override
213
  public void close() {
214
    if (isClosed()) {
1✔
215
      return;
×
216
    }
217
    boolean hasPartialMessage = nextFrame != null && nextFrame.readableBytes() > 0;
1✔
218
    try {
219
      if (fullStreamDecompressor != null) {
1✔
220
        hasPartialMessage = hasPartialMessage || fullStreamDecompressor.hasPartialData();
1✔
221
        fullStreamDecompressor.close();
1✔
222
      }
223
      if (unprocessed != null) {
1✔
224
        unprocessed.close();
1✔
225
      }
226
      if (nextFrame != null) {
1✔
227
        nextFrame.close();
1✔
228
      }
229
    } finally {
230
      fullStreamDecompressor = null;
1✔
231
      unprocessed = null;
1✔
232
      nextFrame = null;
1✔
233
    }
234
    listener.deframerClosed(hasPartialMessage);
1✔
235
  }
1✔
236

237
  /**
238
   * Indicates whether or not this deframer has been closed.
239
   */
240
  public boolean isClosed() {
241
    return unprocessed == null && fullStreamDecompressor == null;
1✔
242
  }
243

244
  /** Returns true if this deframer has already been closed or scheduled to close. */
245
  private boolean isClosedOrScheduledToClose() {
246
    return isClosed() || closeWhenComplete;
1✔
247
  }
248

249
  private boolean isStalled() {
250
    if (fullStreamDecompressor != null) {
1✔
251
      return fullStreamDecompressor.isStalled();
1✔
252
    } else {
253
      return unprocessed.readableBytes() == 0;
1✔
254
    }
255
  }
256

257
  /**
258
   * Reads and delivers as many messages to the listener as possible.
259
   */
260
  private void deliver() {
261
    // We can have reentrancy here when using a direct executor, triggered by calls to
262
    // request more messages. This is safe as we simply loop until pendingDelivers = 0
263
    if (inDelivery) {
1✔
264
      return;
1✔
265
    }
266
    inDelivery = true;
1✔
267
    try {
268
      // Process the uncompressed bytes.
269
      while (!stopDelivery && pendingDeliveries > 0 && readRequiredBytes()) {
1✔
270
        switch (state) {
1✔
271
          case HEADER:
272
            processHeader();
1✔
273
            break;
1✔
274
          case BODY:
275
            // Read the body and deliver the message.
276
            processBody();
1✔
277

278
            // Since we've delivered a message, decrement the number of pending
279
            // deliveries remaining.
280
            pendingDeliveries--;
1✔
281
            break;
1✔
282
          default:
283
            throw new AssertionError("Invalid state: " + state);
×
284
        }
285
      }
286

287
      if (stopDelivery) {
1✔
288
        close();
×
289
        return;
×
290
      }
291

292
      /*
293
       * We are stalled when there are no more bytes to process. This allows delivering errors as
294
       * soon as the buffered input has been consumed, independent of whether the application
295
       * has requested another message.  At this point in the function, either all frames have been
296
       * delivered, or unprocessed is empty.  If there is a partial message, it will be inside next
297
       * frame and not in unprocessed.  If there is extra data but no pending deliveries, it will
298
       * be in unprocessed.
299
       */
300
      if (closeWhenComplete && isStalled()) {
1✔
301
        close();
1✔
302
      }
303
    } finally {
304
      inDelivery = false;
1✔
305
    }
306
  }
1✔
307

308
  /**
309
   * Attempts to read the required bytes into nextFrame.
310
   *
311
   * @return {@code true} if all of the required bytes have been read.
312
   */
313
  private boolean readRequiredBytes() {
314
    int totalBytesRead = 0;
1✔
315
    int deflatedBytesRead = 0;
1✔
316
    try {
317
      if (nextFrame == null) {
1✔
318
        nextFrame = new CompositeReadableBuffer();
1✔
319
      }
320

321
      // Read until the buffer contains all the required bytes.
322
      int missingBytes;
323
      while ((missingBytes = requiredLength - nextFrame.readableBytes()) > 0) {
1✔
324
        if (fullStreamDecompressor != null) {
1✔
325
          try {
326
            if (inflatedBuffer == null || inflatedIndex == inflatedBuffer.length) {
1✔
327
              inflatedBuffer = new byte[Math.min(missingBytes, MAX_BUFFER_SIZE)];
1✔
328
              inflatedIndex = 0;
1✔
329
            }
330
            int bytesToRead = Math.min(missingBytes, inflatedBuffer.length - inflatedIndex);
1✔
331
            int n = fullStreamDecompressor.inflateBytes(inflatedBuffer, inflatedIndex, bytesToRead);
1✔
332
            totalBytesRead += fullStreamDecompressor.getAndResetBytesConsumed();
1✔
333
            deflatedBytesRead += fullStreamDecompressor.getAndResetDeflatedBytesConsumed();
1✔
334
            if (n == 0) {
1✔
335
              // No more inflated data is available.
336
              return false;
1✔
337
            }
338
            nextFrame.addBuffer(ReadableBuffers.wrap(inflatedBuffer, inflatedIndex, n));
1✔
339
            inflatedIndex += n;
1✔
340
          } catch (IOException e) {
×
341
            throw new RuntimeException(e);
×
342
          } catch (DataFormatException e) {
×
343
            throw new RuntimeException(e);
×
344
          }
1✔
345
        } else {
346
          if (unprocessed.readableBytes() == 0) {
1✔
347
            // No more data is available.
348
            return false;
1✔
349
          }
350
          int toRead = Math.min(missingBytes, unprocessed.readableBytes());
1✔
351
          totalBytesRead += toRead;
1✔
352
          nextFrame.addBuffer(unprocessed.readBytes(toRead));
1✔
353
        }
1✔
354
      }
355
      return true;
1✔
356
    } finally {
357
      if (totalBytesRead > 0) {
1✔
358
        listener.bytesRead(totalBytesRead);
1✔
359
        if (state == State.BODY) {
1✔
360
          if (fullStreamDecompressor != null) {
1✔
361
            // With compressed streams, totalBytesRead can include gzip header and trailer metadata
362
            statsTraceCtx.inboundWireSize(deflatedBytesRead);
1✔
363
            inboundBodyWireSize += deflatedBytesRead;
1✔
364
          } else {
365
            statsTraceCtx.inboundWireSize(totalBytesRead);
1✔
366
            inboundBodyWireSize += totalBytesRead;
1✔
367
          }
368
        }
369
      }
370
    }
371
  }
372

373
  /**
374
   * Processes the GRPC compression header which is composed of the compression flag and the outer
375
   * frame length.
376
   */
377
  private void processHeader() {
378
    int type = nextFrame.readUnsignedByte();
1✔
379
    if ((type & RESERVED_MASK) != 0) {
1✔
380
      throw Status.INTERNAL.withDescription(
×
381
          "gRPC frame header malformed: reserved bits not zero")
382
          .asRuntimeException();
×
383
    }
384
    compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0;
1✔
385

386
    // Update the required length to include the length of the frame.
387
    requiredLength = nextFrame.readInt();
1✔
388
    if (requiredLength < 0 || requiredLength > maxInboundMessageSize) {
1✔
389
      throw Status.RESOURCE_EXHAUSTED.withDescription(
1✔
390
          String.format(Locale.US, "gRPC message exceeds maximum size %d: %d",
1✔
391
              maxInboundMessageSize, requiredLength))
1✔
392
          .asRuntimeException();
1✔
393
    }
394

395
    currentMessageSeqNo++;
1✔
396
    statsTraceCtx.inboundMessage(currentMessageSeqNo);
1✔
397
    transportTracer.reportMessageReceived();
1✔
398
    // Continue reading the frame body.
399
    state = State.BODY;
1✔
400
  }
1✔
401

402
  /**
403
   * Processes the GRPC message body, which depending on frame header flags may be compressed.
404
   */
405
  private void processBody() {
406
    // There is no reliable way to get the uncompressed size per message when it's compressed,
407
    // because the uncompressed bytes are provided through an InputStream whose total size is
408
    // unknown until all bytes are read, and we don't know when it happens.
409
    statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize, -1);
1✔
410
    inboundBodyWireSize = 0;
1✔
411
    InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
1✔
412
    nextFrame.touch();
1✔
413
    nextFrame = null;
1✔
414
    listener.messagesAvailable(new SingleMessageProducer(stream));
1✔
415

416
    // Done with this frame, begin processing the next header.
417
    state = State.HEADER;
1✔
418
    requiredLength = HEADER_LENGTH;
1✔
419
  }
1✔
420

421
  private InputStream getUncompressedBody() {
422
    statsTraceCtx.inboundUncompressedSize(nextFrame.readableBytes());
1✔
423
    return ReadableBuffers.openStream(nextFrame, true);
1✔
424
  }
425

426
  private InputStream getCompressedBody() {
427
    if (decompressor == Codec.Identity.NONE) {
1✔
428
      throw Status.INTERNAL.withDescription(
×
429
          "Can't decode compressed gRPC message as compression not configured")
430
          .asRuntimeException();
×
431
    }
432

433
    try {
434
      // Enforce the maxMessageSize limit on the returned stream.
435
      InputStream unlimitedStream =
1✔
436
          decompressor.decompress(ReadableBuffers.openStream(nextFrame, true));
1✔
437
      return new SizeEnforcingInputStream(
1✔
438
          unlimitedStream, maxInboundMessageSize, statsTraceCtx);
439
    } catch (IOException e) {
×
440
      throw new RuntimeException(e);
×
441
    }
442
  }
443

444
  /**
445
   * An {@link InputStream} that enforces the {@link #maxMessageSize} limit for compressed frames.
446
   */
447
  @VisibleForTesting
448
  static final class SizeEnforcingInputStream extends FilterInputStream {
449
    private final int maxMessageSize;
450
    private final StatsTraceContext statsTraceCtx;
451
    private long maxCount;
452
    private long count;
453
    private long mark = -1;
1✔
454

455
    SizeEnforcingInputStream(InputStream in, int maxMessageSize, StatsTraceContext statsTraceCtx) {
456
      super(in);
1✔
457
      this.maxMessageSize = maxMessageSize;
1✔
458
      this.statsTraceCtx = statsTraceCtx;
1✔
459
    }
1✔
460

461
    @Override
462
    public int read() throws IOException {
463
      int result = in.read();
1✔
464
      if (result != -1) {
1✔
465
        count++;
1✔
466
      }
467
      verifySize();
1✔
468
      reportCount();
1✔
469
      return result;
1✔
470
    }
471

472
    @Override
473
    public int read(byte[] b, int off, int len) throws IOException {
474
      int result = in.read(b, off, len);
1✔
475
      if (result != -1) {
1✔
476
        count += result;
1✔
477
      }
478
      verifySize();
1✔
479
      reportCount();
1✔
480
      return result;
1✔
481
    }
482

483
    @Override
484
    public long skip(long n) throws IOException {
485
      long result = in.skip(n);
1✔
486
      count += result;
1✔
487
      verifySize();
1✔
488
      reportCount();
1✔
489
      return result;
1✔
490
    }
491

492
    @Override
493
    public synchronized void mark(int readlimit) {
494
      in.mark(readlimit);
1✔
495
      mark = count;
1✔
496
      // it's okay to mark even if mark isn't supported, as reset won't work
497
    }
1✔
498

499
    @Override
500
    public synchronized void reset() throws IOException {
501
      if (!in.markSupported()) {
1✔
502
        throw new IOException("Mark not supported");
×
503
      }
504
      if (mark == -1) {
1✔
505
        throw new IOException("Mark not set");
×
506
      }
507

508
      in.reset();
1✔
509
      count = mark;
1✔
510
    }
1✔
511

512
    private void reportCount() {
513
      if (count > maxCount) {
1✔
514
        statsTraceCtx.inboundUncompressedSize(count - maxCount);
1✔
515
        maxCount = count;
1✔
516
      }
517
    }
1✔
518

519
    private void verifySize() {
520
      if (count > maxMessageSize) {
1✔
521
        throw Status.RESOURCE_EXHAUSTED
1✔
522
            .withDescription("Decompressed gRPC message exceeds maximum size " + maxMessageSize)
1✔
523
            .asRuntimeException();
1✔
524
      }
525
    }
1✔
526
  }
527

528
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
529
    private InputStream message;
530

531
    private SingleMessageProducer(InputStream message) {
1✔
532
      this.message = message;
1✔
533
    }
1✔
534

535
    @Nullable
536
    @Override
537
    public InputStream next() {
538
      InputStream messageToReturn = message;
1✔
539
      message = null;
1✔
540
      return messageToReturn;
1✔
541
    }
542
  }
543
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc