• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19498

07 Oct 2024 05:44PM UTC coverage: 84.654% (-0.002%) from 84.656%
#19498

push

github

web-flow
report uncompressed message size when it does not need compression (#11598)

33771 of 39893 relevant lines covered (84.65%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.08
/../core/src/main/java/io/grpc/internal/MessageDeframer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import io.grpc.Codec;
25
import io.grpc.Decompressor;
26
import io.grpc.Status;
27
import java.io.Closeable;
28
import java.io.FilterInputStream;
29
import java.io.IOException;
30
import java.io.InputStream;
31
import java.util.Locale;
32
import java.util.zip.DataFormatException;
33
import javax.annotation.Nullable;
34
import javax.annotation.concurrent.NotThreadSafe;
35

36
/**
37
 * Deframer for GRPC frames.
38
 *
39
 * <p>This class is not thread-safe. Unless otherwise stated, all calls to public methods should be
40
 * made in the deframing thread.
41
 */
42
@NotThreadSafe
43
public class MessageDeframer implements Closeable, Deframer {
44
  private static final int HEADER_LENGTH = 5;
45
  private static final int COMPRESSED_FLAG_MASK = 1;
46
  private static final int RESERVED_MASK = 0xFE;
47
  private static final int MAX_BUFFER_SIZE = 1024 * 1024 * 2;
48

49
  /**
50
   * A listener of deframing events. These methods will be invoked from the deframing thread.
51
   */
52
  public interface Listener {
53

54
    /**
55
     * Called when the given number of bytes has been read from the input source of the deframer.
56
     * This is typically used to indicate to the underlying transport that more data can be
57
     * accepted.
58
     *
59
     * @param numBytes the number of bytes read from the deframer's input source.
60
     */
61
    void bytesRead(int numBytes);
62

63
    /**
64
     * Called to deliver the next complete message.
65
     *
66
     * @param producer single message producer wrapping the message.
67
     */
68
    void messagesAvailable(StreamListener.MessageProducer producer);
69

70
    /**
71
     * Called when the deframer closes.
72
     *
73
     * @param hasPartialMessage whether the deframer contained an incomplete message at closing.
74
     */
75
    void deframerClosed(boolean hasPartialMessage);
76

77
    /**
78
     * Called when a {@link #deframe(ReadableBuffer)} operation failed.
79
     *
80
     * @param cause the actual failure
81
     */
82
    void deframeFailed(Throwable cause);
83
  }
84

85
  private enum State {
1✔
86
    HEADER, BODY
1✔
87
  }
88

89
  private Listener listener;
90
  private int maxInboundMessageSize;
91
  private final StatsTraceContext statsTraceCtx;
92
  private final TransportTracer transportTracer;
93
  private Decompressor decompressor;
94
  private GzipInflatingBuffer fullStreamDecompressor;
95
  private byte[] inflatedBuffer;
96
  private int inflatedIndex;
97
  private State state = State.HEADER;
1✔
98
  private int requiredLength = HEADER_LENGTH;
1✔
99
  private boolean compressedFlag;
100
  private CompositeReadableBuffer nextFrame;
101
  private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer();
1✔
102
  private long pendingDeliveries;
103
  private boolean inDelivery = false;
1✔
104
  private int currentMessageSeqNo = -1;
1✔
105
  private int inboundBodyWireSize;
106

107
  private boolean closeWhenComplete = false;
1✔
108
  private volatile boolean stopDelivery = false;
1✔
109

110
  /**
   * Create a deframer.
   *
   * @param listener listener for deframer events; must not be null.
   * @param decompressor the compression used if a compressed frame is encountered, with
   *                     {@code NONE} meaning unsupported; must not be null.
   * @param maxMessageSize the maximum allowed size for received messages.
   * @param statsTraceCtx receives the per-message and wire-size stats callbacks; must not be null.
   * @param transportTracer notified each time a message header is parsed; must not be null.
   */
  public MessageDeframer(
      Listener listener,
      Decompressor decompressor,
      int maxMessageSize,
      StatsTraceContext statsTraceCtx,
      TransportTracer transportTracer) {
    // Label each null check with the real parameter name so a failure points at the right
    // argument (the listener check used to be labelled "sink").
    this.listener = checkNotNull(listener, "listener");
    this.decompressor = checkNotNull(decompressor, "decompressor");
    this.maxInboundMessageSize = maxMessageSize;
    this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
  }
1✔
130

131
  void setListener(Listener listener) {
132
    this.listener = listener;
1✔
133
  }
1✔
134

135
  @Override
136
  public void setMaxInboundMessageSize(int messageSize) {
137
    maxInboundMessageSize = messageSize;
1✔
138
  }
1✔
139

140
  @Override
141
  public void setDecompressor(Decompressor decompressor) {
142
    checkState(fullStreamDecompressor == null, "Already set full stream decompressor");
1✔
143
    this.decompressor = checkNotNull(decompressor, "Can't pass an empty decompressor");
1✔
144
  }
1✔
145

146
  @Override
147
  public void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
148
    checkState(decompressor == Codec.Identity.NONE, "per-message decompressor already set");
1✔
149
    checkState(this.fullStreamDecompressor == null, "full stream decompressor already set");
1✔
150
    this.fullStreamDecompressor =
1✔
151
        checkNotNull(fullStreamDecompressor, "Can't pass a null full stream decompressor");
1✔
152
    unprocessed = null;
1✔
153
  }
1✔
154

155
  @Override
156
  public void request(int numMessages) {
157
    checkArgument(numMessages > 0, "numMessages must be > 0");
1✔
158
    if (isClosed()) {
1✔
159
      return;
1✔
160
    }
161
    pendingDeliveries += numMessages;
1✔
162
    deliver();
1✔
163
  }
1✔
164

165
  @Override
166
  public void deframe(ReadableBuffer data) {
167
    checkNotNull(data, "data");
1✔
168
    boolean needToCloseData = true;
1✔
169
    try {
170
      if (!isClosedOrScheduledToClose()) {
1✔
171
        if (fullStreamDecompressor != null) {
1✔
172
          fullStreamDecompressor.addGzippedBytes(data);
1✔
173
        } else {
174
          unprocessed.addBuffer(data);
1✔
175
        }
176
        needToCloseData = false;
1✔
177

178
        deliver();
1✔
179
      }
180
    } finally {
181
      if (needToCloseData) {
1✔
182
        data.close();
1✔
183
      }
184
    }
185
  }
1✔
186

187
  @Override
188
  public void closeWhenComplete() {
189
    if (isClosed()) {
1✔
190
      return;
1✔
191
    } else if (isStalled()) {
1✔
192
      close();
1✔
193
    } else {
194
      closeWhenComplete = true;
1✔
195
    }
196
  }
1✔
197

198
  /**
199
   * Sets a flag to interrupt delivery of any currently queued messages. This may be invoked outside
200
   * of the deframing thread, and must be followed by a call to {@link #close()} in the deframing
201
   * thread. Without a subsequent call to {@link #close()}, the deframer may hang waiting for
202
   * additional messages before noticing that the {@code stopDelivery} flag has been set.
203
   */
204
  void stopDelivery() {
205
    stopDelivery = true;
×
206
  }
×
207

208
  boolean hasPendingDeliveries() {
209
    return pendingDeliveries != 0;
×
210
  }
211

212
  @Override
213
  public void close() {
214
    if (isClosed()) {
1✔
215
      return;
×
216
    }
217
    boolean hasPartialMessage = nextFrame != null && nextFrame.readableBytes() > 0;
1✔
218
    try {
219
      if (fullStreamDecompressor != null) {
1✔
220
        hasPartialMessage = hasPartialMessage || fullStreamDecompressor.hasPartialData();
1✔
221
        fullStreamDecompressor.close();
1✔
222
      }
223
      if (unprocessed != null) {
1✔
224
        unprocessed.close();
1✔
225
      }
226
      if (nextFrame != null) {
1✔
227
        nextFrame.close();
1✔
228
      }
229
    } finally {
230
      fullStreamDecompressor = null;
1✔
231
      unprocessed = null;
1✔
232
      nextFrame = null;
1✔
233
    }
234
    listener.deframerClosed(hasPartialMessage);
1✔
235
  }
1✔
236

237
  /**
238
   * Indicates whether or not this deframer has been closed.
239
   */
240
  public boolean isClosed() {
241
    return unprocessed == null && fullStreamDecompressor == null;
1✔
242
  }
243

244
  /** Returns true if this deframer has already been closed or scheduled to close. */
245
  private boolean isClosedOrScheduledToClose() {
246
    return isClosed() || closeWhenComplete;
1✔
247
  }
248

249
  private boolean isStalled() {
250
    if (fullStreamDecompressor != null) {
1✔
251
      return fullStreamDecompressor.isStalled();
1✔
252
    } else {
253
      return unprocessed.readableBytes() == 0;
1✔
254
    }
255
  }
256

257
  /**
258
   * Reads and delivers as many messages to the listener as possible.
259
   */
260
  private void deliver() {
261
    // We can have reentrancy here when using a direct executor, triggered by calls to
262
    // request more messages. This is safe as we simply loop until pendingDelivers = 0
263
    if (inDelivery) {
1✔
264
      return;
1✔
265
    }
266
    inDelivery = true;
1✔
267
    try {
268
      // Process the uncompressed bytes.
269
      while (!stopDelivery && pendingDeliveries > 0 && readRequiredBytes()) {
1✔
270
        switch (state) {
1✔
271
          case HEADER:
272
            processHeader();
1✔
273
            break;
1✔
274
          case BODY:
275
            // Read the body and deliver the message.
276
            processBody();
1✔
277

278
            // Since we've delivered a message, decrement the number of pending
279
            // deliveries remaining.
280
            pendingDeliveries--;
1✔
281
            break;
1✔
282
          default:
283
            throw new AssertionError("Invalid state: " + state);
×
284
        }
285
      }
286

287
      if (stopDelivery) {
1✔
288
        close();
×
289
        return;
×
290
      }
291

292
      /*
293
       * We are stalled when there are no more bytes to process. This allows delivering errors as
294
       * soon as the buffered input has been consumed, independent of whether the application
295
       * has requested another message.  At this point in the function, either all frames have been
296
       * delivered, or unprocessed is empty.  If there is a partial message, it will be inside next
297
       * frame and not in unprocessed.  If there is extra data but no pending deliveries, it will
298
       * be in unprocessed.
299
       */
300
      if (closeWhenComplete && isStalled()) {
1✔
301
        close();
1✔
302
      }
303
    } finally {
304
      inDelivery = false;
1✔
305
    }
306
  }
1✔
307

308
  /**
309
   * Attempts to read the required bytes into nextFrame.
310
   *
311
   * @return {@code true} if all of the required bytes have been read.
312
   */
313
  private boolean readRequiredBytes() {
314
    int totalBytesRead = 0;
1✔
315
    int deflatedBytesRead = 0;
1✔
316
    try {
317
      if (nextFrame == null) {
1✔
318
        nextFrame = new CompositeReadableBuffer();
1✔
319
      }
320

321
      // Read until the buffer contains all the required bytes.
322
      int missingBytes;
323
      while ((missingBytes = requiredLength - nextFrame.readableBytes()) > 0) {
1✔
324
        if (fullStreamDecompressor != null) {
1✔
325
          try {
326
            if (inflatedBuffer == null || inflatedIndex == inflatedBuffer.length) {
1✔
327
              inflatedBuffer = new byte[Math.min(missingBytes, MAX_BUFFER_SIZE)];
1✔
328
              inflatedIndex = 0;
1✔
329
            }
330
            int bytesToRead = Math.min(missingBytes, inflatedBuffer.length - inflatedIndex);
1✔
331
            int n = fullStreamDecompressor.inflateBytes(inflatedBuffer, inflatedIndex, bytesToRead);
1✔
332
            totalBytesRead += fullStreamDecompressor.getAndResetBytesConsumed();
1✔
333
            deflatedBytesRead += fullStreamDecompressor.getAndResetDeflatedBytesConsumed();
1✔
334
            if (n == 0) {
1✔
335
              // No more inflated data is available.
336
              return false;
1✔
337
            }
338
            nextFrame.addBuffer(ReadableBuffers.wrap(inflatedBuffer, inflatedIndex, n));
1✔
339
            inflatedIndex += n;
1✔
340
          } catch (IOException e) {
×
341
            throw new RuntimeException(e);
×
342
          } catch (DataFormatException e) {
×
343
            throw new RuntimeException(e);
×
344
          }
1✔
345
        } else {
346
          if (unprocessed.readableBytes() == 0) {
1✔
347
            // No more data is available.
348
            return false;
1✔
349
          }
350
          int toRead = Math.min(missingBytes, unprocessed.readableBytes());
1✔
351
          totalBytesRead += toRead;
1✔
352
          nextFrame.addBuffer(unprocessed.readBytes(toRead));
1✔
353
        }
1✔
354
      }
355
      return true;
1✔
356
    } finally {
357
      if (totalBytesRead > 0) {
1✔
358
        listener.bytesRead(totalBytesRead);
1✔
359
        if (state == State.BODY) {
1✔
360
          if (fullStreamDecompressor != null) {
1✔
361
            // With compressed streams, totalBytesRead can include gzip header and trailer metadata
362
            statsTraceCtx.inboundWireSize(deflatedBytesRead);
1✔
363
            inboundBodyWireSize += deflatedBytesRead;
1✔
364
          } else {
365
            statsTraceCtx.inboundWireSize(totalBytesRead);
1✔
366
            inboundBodyWireSize += totalBytesRead;
1✔
367
          }
368
        }
369
      }
370
    }
371
  }
372

373
  /**
374
   * Processes the GRPC compression header which is composed of the compression flag and the outer
375
   * frame length.
376
   */
377
  private void processHeader() {
378
    int type = nextFrame.readUnsignedByte();
1✔
379
    if ((type & RESERVED_MASK) != 0) {
1✔
380
      throw Status.INTERNAL.withDescription(
×
381
          "gRPC frame header malformed: reserved bits not zero")
382
          .asRuntimeException();
×
383
    }
384
    compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0;
1✔
385

386
    // Update the required length to include the length of the frame.
387
    requiredLength = nextFrame.readInt();
1✔
388
    if (requiredLength < 0 || requiredLength > maxInboundMessageSize) {
1✔
389
      throw Status.RESOURCE_EXHAUSTED.withDescription(
1✔
390
          String.format(Locale.US, "gRPC message exceeds maximum size %d: %d",
1✔
391
              maxInboundMessageSize, requiredLength))
1✔
392
          .asRuntimeException();
1✔
393
    }
394

395
    currentMessageSeqNo++;
1✔
396
    statsTraceCtx.inboundMessage(currentMessageSeqNo);
1✔
397
    transportTracer.reportMessageReceived();
1✔
398
    // Continue reading the frame body.
399
    state = State.BODY;
1✔
400
  }
1✔
401

402
  /**
403
   * Processes the GRPC message body, which depending on frame header flags may be compressed.
404
   */
405
  private void processBody() {
406
    // There is no reliable way to get the uncompressed size per message when it's compressed,
407
    // because the uncompressed bytes are provided through an InputStream whose total size is
408
    // unknown until all bytes are read, and we don't know when it happens.
409
    statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize,
1✔
410
        (compressedFlag || fullStreamDecompressor != null) ? -1 : inboundBodyWireSize);
1✔
411
    inboundBodyWireSize = 0;
1✔
412
    InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
1✔
413
    nextFrame.touch();
1✔
414
    nextFrame = null;
1✔
415
    listener.messagesAvailable(new SingleMessageProducer(stream));
1✔
416

417
    // Done with this frame, begin processing the next header.
418
    state = State.HEADER;
1✔
419
    requiredLength = HEADER_LENGTH;
1✔
420
  }
1✔
421

422
  private InputStream getUncompressedBody() {
423
    statsTraceCtx.inboundUncompressedSize(nextFrame.readableBytes());
1✔
424
    return ReadableBuffers.openStream(nextFrame, true);
1✔
425
  }
426

427
  private InputStream getCompressedBody() {
428
    if (decompressor == Codec.Identity.NONE) {
1✔
429
      throw Status.INTERNAL.withDescription(
×
430
          "Can't decode compressed gRPC message as compression not configured")
431
          .asRuntimeException();
×
432
    }
433

434
    try {
435
      // Enforce the maxMessageSize limit on the returned stream.
436
      InputStream unlimitedStream =
1✔
437
          decompressor.decompress(ReadableBuffers.openStream(nextFrame, true));
1✔
438
      return new SizeEnforcingInputStream(
1✔
439
          unlimitedStream, maxInboundMessageSize, statsTraceCtx);
440
    } catch (IOException e) {
×
441
      throw new RuntimeException(e);
×
442
    }
443
  }
444

445
  /**
446
   * An {@link InputStream} that enforces the {@link #maxMessageSize} limit for compressed frames.
447
   */
448
  @VisibleForTesting
449
  static final class SizeEnforcingInputStream extends FilterInputStream {
450
    private final int maxMessageSize;
451
    private final StatsTraceContext statsTraceCtx;
452
    private long maxCount;
453
    private long count;
454
    private long mark = -1;
1✔
455

456
    SizeEnforcingInputStream(InputStream in, int maxMessageSize, StatsTraceContext statsTraceCtx) {
457
      super(in);
1✔
458
      this.maxMessageSize = maxMessageSize;
1✔
459
      this.statsTraceCtx = statsTraceCtx;
1✔
460
    }
1✔
461

462
    @Override
463
    public int read() throws IOException {
464
      int result = in.read();
1✔
465
      if (result != -1) {
1✔
466
        count++;
1✔
467
      }
468
      verifySize();
1✔
469
      reportCount();
1✔
470
      return result;
1✔
471
    }
472

473
    @Override
474
    public int read(byte[] b, int off, int len) throws IOException {
475
      int result = in.read(b, off, len);
1✔
476
      if (result != -1) {
1✔
477
        count += result;
1✔
478
      }
479
      verifySize();
1✔
480
      reportCount();
1✔
481
      return result;
1✔
482
    }
483

484
    @Override
485
    public long skip(long n) throws IOException {
486
      long result = in.skip(n);
1✔
487
      count += result;
1✔
488
      verifySize();
1✔
489
      reportCount();
1✔
490
      return result;
1✔
491
    }
492

493
    @Override
494
    public synchronized void mark(int readlimit) {
495
      in.mark(readlimit);
1✔
496
      mark = count;
1✔
497
      // it's okay to mark even if mark isn't supported, as reset won't work
498
    }
1✔
499

500
    @Override
501
    public synchronized void reset() throws IOException {
502
      if (!in.markSupported()) {
1✔
503
        throw new IOException("Mark not supported");
×
504
      }
505
      if (mark == -1) {
1✔
506
        throw new IOException("Mark not set");
×
507
      }
508

509
      in.reset();
1✔
510
      count = mark;
1✔
511
    }
1✔
512

513
    private void reportCount() {
514
      if (count > maxCount) {
1✔
515
        statsTraceCtx.inboundUncompressedSize(count - maxCount);
1✔
516
        maxCount = count;
1✔
517
      }
518
    }
1✔
519

520
    private void verifySize() {
521
      if (count > maxMessageSize) {
1✔
522
        throw Status.RESOURCE_EXHAUSTED
1✔
523
            .withDescription("Decompressed gRPC message exceeds maximum size " + maxMessageSize)
1✔
524
            .asRuntimeException();
1✔
525
      }
526
    }
1✔
527
  }
528

529
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
530
    private InputStream message;
531

532
    private SingleMessageProducer(InputStream message) {
1✔
533
      this.message = message;
1✔
534
    }
1✔
535

536
    @Nullable
537
    @Override
538
    public InputStream next() {
539
      InputStream messageToReturn = message;
1✔
540
      message = null;
1✔
541
      return messageToReturn;
1✔
542
    }
543
  }
544
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc