grpc / grpc-java / #19298

21 Jun 2024 11:31PM UTC · coverage: 88.456% (-0.002%) from 88.458%

Event: push · Service: github · Committer: web-flow

netty: Fix Netty composite buffer merging to be compatible with Netty 4.1.111 (#11294) (#11303)

* Use addComponent instead of addFlattenedComponent and do not append to components that are composites.

32060 of 36244 relevant lines covered (88.46%)
0.88 hits per line

Source File

/../netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java: 96.67% covered
/*
 * Copyright 2020 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.netty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;


/**
 * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
 * compose strategies.
 * <br><br>
 *
 * <p><b><font color="red">Avoid using</font></b>
 * {@link CompositeByteBuf#addFlattenedComponents(boolean, ByteBuf)} as it can lead
 * to corruption, where the components' readable area are not equal to the Composite's capacity
 * (see https://github.com/netty/netty/issues/12844).
 */

class NettyAdaptiveCumulator implements Cumulator {
  private final int composeMinSize;

  /**
   * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
   * compose strategies.
   *
   * @param composeMinSize Determines the minimal size of the buffer that should be composed (added
   *                       as a new component of the {@link CompositeByteBuf}). If the total size
   *                       of the last component (tail) and the incoming buffer is below this value,
   *                       the incoming buffer is appended to the tail, and the new component is not
   *                       added.
   */
  NettyAdaptiveCumulator(int composeMinSize) {
    Preconditions.checkArgument(composeMinSize >= 0, "composeMinSize must be non-negative");
    this.composeMinSize = composeMinSize;
  }

  /**
   * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
   * compose strategies.
   *
   * <p>This cumulator applies a heuristic to make a decision whether to track a reference to the
   * buffer with bytes received from the network stack in an array ("zero-copy"), or to merge into
   * the last component (the tail) by performing a memory copy.
   *
   * <p>It is necessary as a protection from a potential attack on the {@link
   * io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR}. Consider a pathological case
   * when an attacker sends TCP packages containing a single byte of data, and forcing the cumulator
   * to track each one in a separate buffer. The cost is memory overhead for each buffer, and extra
   * compute to read the cumulation.
   *
   * <p>Implemented heuristic establishes a minimal threshold for the total size of the tail and
   * incoming buffer, below which they are merged. The sum of the tail and the incoming buffer is
   * used to avoid a case where attacker alternates the size of data packets to trick the cumulator
   * into always selecting compose strategy.
   *
   * <p>Merging strategy attempts to minimize unnecessary memory writes. When possible, it expands
   * the tail capacity and only copies the incoming buffer into available memory. Otherwise, when
   * both tail and the buffer must be copied, the tail is reallocated (or fully replaced) with a new
   * buffer of exponentially increasing capacity (bounded to {@link #composeMinSize}) to ensure
   * runtime {@code O(n^2)} is amortized to {@code O(n)}.
   */
  @Override
  @SuppressWarnings("ReferenceEquality")
  public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
    if (!cumulation.isReadable()) {
      cumulation.release();
      return in;
    }
    CompositeByteBuf composite = null;
    try {
      if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
        composite = (CompositeByteBuf) cumulation;
        // Writer index must equal capacity if we are going to "write"
        // new components to the end
        if (composite.writerIndex() != composite.capacity()) {
          composite.capacity(composite.writerIndex()); // (coverage: line not hit)
        }
      } else {
        composite = alloc.compositeBuffer(Integer.MAX_VALUE).addComponent(true, cumulation);
      }
      addInput(alloc, composite, in);
      in = null;
      return composite;
    } finally {
      if (in != null) {
        // We must release if the ownership was not transferred as otherwise it may produce a leak
        in.release();
        // Also release any new buffer allocated if we're not returning it
        if (composite != null && composite != cumulation) {
          composite.release();
        }
      }
    }
  }

  @VisibleForTesting
  void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
    if (shouldCompose(composite, in, composeMinSize)) {
      composite.addComponent(true, in);
    } else {
      // The total size of the new data and the last component are below the threshold. Merge them.
      mergeWithCompositeTail(alloc, composite, in);
    }
  }

  @VisibleForTesting
  static boolean shouldCompose(CompositeByteBuf composite, ByteBuf in, int composeMinSize) {
    int componentCount = composite.numComponents();
    if (composite.numComponents() == 0) {
      return true; // (coverage: line not hit)
    }
    int inputSize = in.readableBytes();
    int tailStart = composite.toByteIndex(componentCount - 1);
    int tailSize = composite.writerIndex() - tailStart;
    return tailSize + inputSize >= composeMinSize;
  }

  /**
   * Append the given {@link ByteBuf} {@code in} to {@link CompositeByteBuf} {@code composite} by
   * expanding or replacing the tail component of the {@link CompositeByteBuf}.
   *
   * <p>The goal is to prevent {@code O(n^2)} runtime in a pathological case, that forces copying
   * the tail component into a new buffer, for each incoming single-byte buffer. We append the new
   * bytes to the tail, when a write (or a fast write) is possible.
   *
   * <p>Otherwise, the tail is replaced with a new buffer, with the capacity increased enough to
   * achieve runtime amortization.
   *
   * <p>We assume that implementations of {@link ByteBufAllocator#calculateNewCapacity(int, int)},
   * are similar to {@link io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity(int, int)},
   * which doubles buffer capacity by normalizing it to the closest power of two. This assumption
   * is verified in unit tests for this method.
   */
  @VisibleForTesting
  static void mergeWithCompositeTail(
      ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
    int inputSize = in.readableBytes();
    int tailComponentIndex = composite.numComponents() - 1;
    int tailStart = composite.toByteIndex(tailComponentIndex);
    int tailSize = composite.writerIndex() - tailStart;
    int newTailSize = inputSize + tailSize;
    ByteBuf tail = composite.component(tailComponentIndex);
    ByteBuf newTail = null;
    try {
      if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()
          && !isCompositeOrWrappedComposite(tail)) {
        // Ideal case: the tail isn't shared, and can be expanded to the required capacity.

        // Take ownership of the tail.
        newTail = tail.retain();

        /*
         * The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
         *
         * - ensureWritable() performs a fast resize when possible (f.e. PooledByteBuf simply
         *   updates its boundary to the end of consecutive memory run assigned to this buffer)
         * - when the required size doesn't fit into writableBytes(), a new buffer is
         *   allocated, and the capacity calculated with alloc.calculateNewCapacity()
         * - note that maxFastWritableBytes() would normally allow a fast expansion of PooledByteBuf
         *   is not called because CompositeByteBuf.component() returns a duplicate, wrapped buffer.
         *   Unwrapping buffers is unsafe, and potential benefit of fast writes may not be
         *   as pronounced because the capacity is doubled with each reallocation.
         */
        newTail.writeBytes(in);

      } else {
        // The tail satisfies one or more criteria:
        // - Shared
        // - Not expandable
        // - Composite
        // - Wrapped Composite
        newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
        newTail.setBytes(0, composite, tailStart, tailSize)
            .setBytes(tailSize, in, in.readerIndex(), inputSize)
            .writerIndex(newTailSize);
        in.readerIndex(in.writerIndex());
      }

      // Store readerIndex to avoid out of bounds writerIndex during component replacement.
      int prevReader = composite.readerIndex();
      // Remove the old tail, reset writer index.
      composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
      // Add back the new tail.
      composite.addComponent(true, newTail);
      // New tail's ownership transferred to the composite buf.
      newTail = null;
      composite.readerIndex(prevReader);
      // Input buffer was successfully merged with the tail.
      // Must be the last line in the try because we release it only on success.
      in.release();
    } finally {
      // If new tail's ownership isn't transferred to the composite buf.
      // Release it to prevent a leak.
      if (newTail != null) {
        newTail.release();
      }
    }
  }

  private static boolean isCompositeOrWrappedComposite(ByteBuf tail) {
    ByteBuf cur = tail;
    while (cur.unwrap() != null) {
      cur = cur.unwrap();
    }
    return cur instanceof CompositeByteBuf;
  }
}
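
For context, here is a minimal sketch that exercises cumulate() directly to show the two strategies the class-level javadoc describes: an input that keeps the tail below the threshold is merged by copying, while a larger input is tracked as a separate component. The snippet is illustrative only: the package declaration is needed because the class is package-private, the AdaptiveCumulatorExample class name, buffer sizes, and the 1024-byte threshold are made up for this example, and it otherwise relies only on Netty's public ByteBuf/Unpooled API.

package io.grpc.netty;  // only so the package-private class is visible to this sketch

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

// Hypothetical demo class, not part of gRPC.
public final class AdaptiveCumulatorExample {
  public static void main(String[] args) {
    ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
    NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024);

    // Initial cumulation: a small readable buffer.
    ByteBuf cumulation = Unpooled.wrappedBuffer(new byte[16]);

    // Tail (16) + input (16) < 1024, so the input is merged into the tail by copying.
    ByteBuf small = Unpooled.wrappedBuffer(new byte[16]);
    cumulation = cumulator.cumulate(alloc, cumulation, small);

    // Tail (32) + input (2048) >= 1024, so the input is added as a new component ("compose").
    ByteBuf large = Unpooled.wrappedBuffer(new byte[2048]);
    cumulation = cumulator.cumulate(alloc, cumulation, large);

    // The cumulation is a CompositeByteBuf holding the merged tail plus the large input.
    System.out.println(((CompositeByteBuf) cumulation).numComponents());  // expected: 2
    cumulation.release();
  }
}

In Netty, a Cumulator is normally installed on a ByteToMessageDecoder via its setCumulator() method; that is the context in which the heuristic above guards against the single-byte-per-packet case that would bloat a purely composing cumulator.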