• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19348

10 Jul 2024 10:03PM CUT coverage: 88.448% (+0.02%) from 88.432%
#19348

push

github

web-flow
Restore old behavior of NettyAdaptiveCumulator, but avoid using that class if Netty is on version 4.1.111 or later. (#11367) (#11373)

32072 of 36261 relevant lines covered (88.45%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.31
/../netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.base.Preconditions;
21
import io.netty.buffer.ByteBuf;
22
import io.netty.buffer.ByteBufAllocator;
23
import io.netty.buffer.CompositeByteBuf;
24
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;
25

26

27
/**
28
 * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
29
 * compose strategies.
30
 */
31

32
class NettyAdaptiveCumulator implements Cumulator {
33
  private final int composeMinSize;
34

35
  /**
36
   * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
37
   * compose strategies.
38
   *
39
   * @param composeMinSize Determines the minimal size of the buffer that should be composed (added
40
   *                       as a new component of the {@link CompositeByteBuf}). If the total size
41
   *                       of the last component (tail) and the incoming buffer is below this value,
42
   *                       the incoming buffer is appended to the tail, and the new component is not
43
   *                       added.
44
   */
45
  NettyAdaptiveCumulator(int composeMinSize) {
1✔
46
    Preconditions.checkArgument(composeMinSize >= 0, "composeMinSize must be non-negative");
1✔
47
    this.composeMinSize = composeMinSize;
1✔
48
  }
1✔
49

50
  /**
51
   * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
52
   * compose strategies.
53
   *
54
   * <p>This cumulator applies a heuristic to make a decision whether to track a reference to the
55
   * buffer with bytes received from the network stack in an array ("zero-copy"), or to merge into
56
   * the last component (the tail) by performing a memory copy.
57
   *
58
   * <p>It is necessary as a protection from a potential attack on the {@link
59
   * io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR}. Consider a pathological case
60
   * when an attacker sends TCP packages containing a single byte of data, and forcing the cumulator
61
   * to track each one in a separate buffer. The cost is memory overhead for each buffer, and extra
62
   * compute to read the cumulation.
63
   *
64
   * <p>Implemented heuristic establishes a minimal threshold for the total size of the tail and
65
   * incoming buffer, below which they are merged. The sum of the tail and the incoming buffer is
66
   * used to avoid a case where attacker alternates the size of data packets to trick the cumulator
67
   * into always selecting compose strategy.
68
   *
69
   * <p>Merging strategy attempts to minimize unnecessary memory writes. When possible, it expands
70
   * the tail capacity and only copies the incoming buffer into available memory. Otherwise, when
71
   * both tail and the buffer must be copied, the tail is reallocated (or fully replaced) with a new
72
   * buffer of exponentially increasing capacity (bounded to {@link #composeMinSize}) to ensure
73
   * runtime {@code O(n^2)} is amortized to {@code O(n)}.
74
   */
75
  @Override
76
  @SuppressWarnings("ReferenceEquality")
77
  public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
78
    if (!cumulation.isReadable()) {
1✔
79
      cumulation.release();
1✔
80
      return in;
1✔
81
    }
82
    CompositeByteBuf composite = null;
1✔
83
    try {
84
      if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
1✔
85
        composite = (CompositeByteBuf) cumulation;
1✔
86
        // Writer index must equal capacity if we are going to "write"
87
        // new components to the end
88
        if (composite.writerIndex() != composite.capacity()) {
1✔
89
          composite.capacity(composite.writerIndex());
×
90
        }
91
      } else {
92
        composite = alloc.compositeBuffer(Integer.MAX_VALUE)
1✔
93
            .addFlattenedComponents(true, cumulation);
1✔
94
      }
95
      addInput(alloc, composite, in);
1✔
96
      in = null;
1✔
97
      return composite;
1✔
98
    } finally {
99
      if (in != null) {
1✔
100
        // We must release if the ownership was not transferred as otherwise it may produce a leak
101
        in.release();
1✔
102
        // Also release any new buffer allocated if we're not returning it
103
        if (composite != null && composite != cumulation) {
1✔
104
          composite.release();
1✔
105
        }
106
      }
107
    }
108
  }
109

110
  @VisibleForTesting
111
  void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
112
    if (shouldCompose(composite, in, composeMinSize)) {
1✔
113
      composite.addFlattenedComponents(true, in);
1✔
114
    } else {
115
      // The total size of the new data and the last component are below the threshold. Merge them.
116
      mergeWithCompositeTail(alloc, composite, in);
1✔
117
    }
118
  }
1✔
119

120
  @VisibleForTesting
121
  static boolean shouldCompose(CompositeByteBuf composite, ByteBuf in, int composeMinSize) {
122
    int componentCount = composite.numComponents();
1✔
123
    if (composite.numComponents() == 0) {
1✔
124
      return true;
1✔
125
    }
126
    int inputSize = in.readableBytes();
1✔
127
    int tailStart = composite.toByteIndex(componentCount - 1);
1✔
128
    int tailSize = composite.writerIndex() - tailStart;
1✔
129
    return tailSize + inputSize >= composeMinSize;
1✔
130
  }
131

132
  /**
133
   * Append the given {@link ByteBuf} {@code in} to {@link CompositeByteBuf} {@code composite} by
134
   * expanding or replacing the tail component of the {@link CompositeByteBuf}.
135
   *
136
   * <p>The goal is to prevent {@code O(n^2)} runtime in a pathological case, that forces copying
137
   * the tail component into a new buffer, for each incoming single-byte buffer. We append the new
138
   * bytes to the tail, when a write (or a fast write) is possible.
139
   *
140
   * <p>Otherwise, the tail is replaced with a new buffer, with the capacity increased enough to
141
   * achieve runtime amortization.
142
   *
143
   * <p>We assume that implementations of {@link ByteBufAllocator#calculateNewCapacity(int, int)},
144
   * are similar to {@link io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity(int, int)},
145
   * which doubles buffer capacity by normalizing it to the closest power of two. This assumption
146
   * is verified in unit tests for this method.
147
   */
148
  @VisibleForTesting
149
  static void mergeWithCompositeTail(
150
      ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
151
    int inputSize = in.readableBytes();
1✔
152
    int tailComponentIndex = composite.numComponents() - 1;
1✔
153
    int tailStart = composite.toByteIndex(tailComponentIndex);
1✔
154
    int tailSize = composite.writerIndex() - tailStart;
1✔
155
    int newTailSize = inputSize + tailSize;
1✔
156
    ByteBuf tail = composite.component(tailComponentIndex);
1✔
157
    ByteBuf newTail = null;
1✔
158
    try {
159
      if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) {
1✔
160
        // Ideal case: the tail isn't shared, and can be expanded to the required capacity.
161

162
        // Take ownership of the tail.
163
        newTail = tail.retain();
1✔
164

165
        // TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with
166
        //   the issue fixed.
167
        // In certain cases, removing the CompositeByteBuf component, and then adding it back
168
        // isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844.
169
        // This happens because the buffer returned by composite.component() has out-of-sync
170
        // indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying
171
        // buffer, but doesn't set the indexes.
172
        //
173
        // To get the right indexes we use the fact that composite.internalComponent() returns
174
        // the slice() into the readable portion of the underlying buffer.
175
        // We use this implementation detail (internalComponent() returning a *SlicedByteBuf),
176
        // and combine it with the fact that SlicedByteBuf duplicates have their indexes
177
        // adjusted so they correspond to the to the readable portion of the slice.
178
        //
179
        // Hence composite.internalComponent().duplicate() returns a buffer with the
180
        // indexes that should've been on the composite.component() in the first place.
181
        // Until the issue is fixed, we manually adjust the indexes of the removed component.
182
        ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate();
1✔
183
        newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex());
1✔
184

185
        /*
186
         * The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
187
         *
188
         * - ensureWritable() performs a fast resize when possible (f.e. PooledByteBuf simply
189
         *   updates its boundary to the end of consecutive memory run assigned to this buffer)
190
         * - when the required size doesn't fit into writableBytes(), a new buffer is
191
         *   allocated, and the capacity calculated with alloc.calculateNewCapacity()
192
         * - note that maxFastWritableBytes() would normally allow a fast expansion of PooledByteBuf
193
         *   is not called because CompositeByteBuf.component() returns a duplicate, wrapped buffer.
194
         *   Unwrapping buffers is unsafe, and potential benefit of fast writes may not be
195
         *   as pronounced because the capacity is doubled with each reallocation.
196
         */
197
        newTail.writeBytes(in);
1✔
198

199
      } else {
1✔
200
        // The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
201
        newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
1✔
202
        newTail.setBytes(0, composite, tailStart, tailSize)
1✔
203
            .setBytes(tailSize, in, in.readerIndex(), inputSize)
1✔
204
            .writerIndex(newTailSize);
1✔
205
        in.readerIndex(in.writerIndex());
1✔
206
      }
207

208
      // Store readerIndex to avoid out of bounds writerIndex during component replacement.
209
      int prevReader = composite.readerIndex();
1✔
210
      // Remove the old tail, reset writer index.
211
      composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
1✔
212
      // Add back the new tail.
213
      composite.addFlattenedComponents(true, newTail);
1✔
214
      // New tail's ownership transferred to the composite buf.
215
      newTail = null;
1✔
216
      composite.readerIndex(prevReader);
1✔
217
      // Input buffer was successfully merged with the tail.
218
      // Must be the last line in the try because we release it only on success.
219
      in.release();
1✔
220
    } finally {
221
      // If new tail's ownership isn't transferred to the composite buf.
222
      // Release it to prevent a leak.
223
      if (newTail != null) {
1✔
224
        newTail.release();
1✔
225
      }
226
    }
227
  }
1✔
228
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc