• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19943

13 Aug 2025 09:23PM UTC coverage: 88.545% (-0.03%) from 88.573%
#19943

push

github

ejona86
Upgrade to Netty 4.1.124.Final

This implicitly disables NettyAdaptiveCumulator (#11284), which can have a
performance impact. We delayed upgrading Netty to give time to rework
the optimization, but we've gone too long already without upgrading
which causes problems for vulnerability tracking.

34659 of 39143 relevant lines covered (88.54%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.53
/../netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.base.Preconditions;
21
import io.netty.buffer.ByteBuf;
22
import io.netty.buffer.ByteBufAllocator;
23
import io.netty.buffer.CompositeByteBuf;
24
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;
25

26

27
/**
28
 * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
29
 * compose strategies.
30
 */
31

32
class NettyAdaptiveCumulator implements Cumulator {
33
  private final int composeMinSize;
34

35
  /**
36
   * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
37
   * compose strategies.
38
   *
39
   * @param composeMinSize Determines the minimal size of the buffer that should be composed (added
40
   *                       as a new component of the {@link CompositeByteBuf}). If the total size
41
   *                       of the last component (tail) and the incoming buffer is below this value,
42
   *                       the incoming buffer is appended to the tail, and the new component is not
43
   *                       added.
44
   */
45
  NettyAdaptiveCumulator(int composeMinSize) {
1✔
46
    Preconditions.checkArgument(composeMinSize >= 0, "composeMinSize must be non-negative");
1✔
47
    this.composeMinSize = composeMinSize;
1✔
48
  }
1✔
49

50
  /**
51
   * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
52
   * compose strategies.
53
   *
54
   * <p>This cumulator applies a heuristic to make a decision whether to track a reference to the
55
   * buffer with bytes received from the network stack in an array ("zero-copy"), or to merge into
56
   * the last component (the tail) by performing a memory copy.
57
   *
58
   * <p>It is necessary as a protection from a potential attack on the {@link
59
   * io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR}. Consider a pathological case
60
   * when an attacker sends TCP packages containing a single byte of data, and forcing the cumulator
61
   * to track each one in a separate buffer. The cost is memory overhead for each buffer, and extra
62
   * compute to read the cumulation.
63
   *
64
   * <p>Implemented heuristic establishes a minimal threshold for the total size of the tail and
65
   * incoming buffer, below which they are merged. The sum of the tail and the incoming buffer is
66
   * used to avoid a case where attacker alternates the size of data packets to trick the cumulator
67
   * into always selecting compose strategy.
68
   *
69
   * <p>Merging strategy attempts to minimize unnecessary memory writes. When possible, it expands
70
   * the tail capacity and only copies the incoming buffer into available memory. Otherwise, when
71
   * both tail and the buffer must be copied, the tail is reallocated (or fully replaced) with a new
72
   * buffer of exponentially increasing capacity (bounded to {@link #composeMinSize}) to ensure
73
   * runtime {@code O(n^2)} is amortized to {@code O(n)}.
74
   */
75
  @Override
76
  @SuppressWarnings("ReferenceEquality")
77
  public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
78
    if (!cumulation.isReadable()) {
1✔
79
      cumulation.release();
1✔
80
      return in;
1✔
81
    }
82
    CompositeByteBuf composite = null;
1✔
83
    try {
84
      if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
1✔
85
        composite = (CompositeByteBuf) cumulation;
1✔
86
        // Writer index must equal capacity if we are going to "write"
87
        // new components to the end
88
        if (composite.writerIndex() != composite.capacity()) {
1✔
89
          composite.capacity(composite.writerIndex());
×
90
        }
91
      } else {
92
        composite = alloc.compositeBuffer(Integer.MAX_VALUE)
1✔
93
            .addFlattenedComponents(true, cumulation);
1✔
94
      }
95
      addInput(alloc, composite, in);
1✔
96
      in = null;
1✔
97
      return composite;
1✔
98
    } finally {
99
      if (in != null) {
1✔
100
        // We must release if the ownership was not transferred as otherwise it may produce a leak
101
        in.release();
1✔
102
        // Also release any new buffer allocated if we're not returning it
103
        if (composite != null && composite != cumulation) {
1✔
104
          composite.release();
1✔
105
        }
106
      }
107
    }
108
  }
109

110
  @VisibleForTesting
111
  void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
112
    if (shouldCompose(composite, in, composeMinSize)) {
×
113
      composite.addFlattenedComponents(true, in);
×
114
    } else {
115
      // The total size of the new data and the last component are below the threshold. Merge them.
116
      mergeWithCompositeTail(alloc, composite, in);
×
117
    }
118
  }
×
119

120
  @VisibleForTesting
121
  static boolean shouldCompose(CompositeByteBuf composite, ByteBuf in, int composeMinSize) {
122
    int componentCount = composite.numComponents();
1✔
123
    if (composite.numComponents() == 0) {
1✔
124
      return true;
1✔
125
    }
126
    int inputSize = in.readableBytes();
1✔
127
    int tailStart = composite.toByteIndex(componentCount - 1);
1✔
128
    int tailSize = composite.writerIndex() - tailStart;
1✔
129
    return tailSize + inputSize >= composeMinSize;
1✔
130
  }
131

132
  /**
133
   * Append the given {@link ByteBuf} {@code in} to {@link CompositeByteBuf} {@code composite} by
134
   * expanding or replacing the tail component of the {@link CompositeByteBuf}.
135
   *
136
   * <p>The goal is to prevent {@code O(n^2)} runtime in a pathological case, that forces copying
137
   * the tail component into a new buffer, for each incoming single-byte buffer. We append the new
138
   * bytes to the tail, when a write (or a fast write) is possible.
139
   *
140
   * <p>Otherwise, the tail is replaced with a new buffer, with the capacity increased enough to
141
   * achieve runtime amortization.
142
   *
143
   * <p>We assume that implementations of {@link ByteBufAllocator#calculateNewCapacity(int, int)},
144
   * are similar to {@link io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity(int, int)},
145
   * which doubles buffer capacity by normalizing it to the closest power of two. This assumption
146
   * is verified in unit tests for this method.
147
   */
148
  @VisibleForTesting
149
  static void mergeWithCompositeTail(
150
      ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
151
    int inputSize = in.readableBytes();
1✔
152
    int tailComponentIndex = composite.numComponents() - 1;
1✔
153
    int tailStart = composite.toByteIndex(tailComponentIndex);
1✔
154
    int tailSize = composite.writerIndex() - tailStart;
1✔
155
    int newTailSize = inputSize + tailSize;
1✔
156
    ByteBuf tail = composite.component(tailComponentIndex);
1✔
157
    ByteBuf newTail = null;
1✔
158
    try {
159
      if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) {
1✔
160
        // Ideal case: the tail isn't shared, and can be expanded to the required capacity.
161

162
        // Take ownership of the tail.
163
        newTail = tail.retain();
1✔
164

165
        // TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with
166
        //   the issue fixed.
167
        // In certain cases, removing the CompositeByteBuf component, and then adding it back
168
        // isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844.
169
        // This happens because the buffer returned by composite.component() has out-of-sync
170
        // indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying
171
        // buffer, but doesn't set the indexes.
172
        //
173
        // To get the right indexes we use the fact that composite.internalComponent() returns
174
        // the slice() into the readable portion of the underlying buffer.
175
        // We use this implementation detail (internalComponent() returning a *SlicedByteBuf),
176
        // and combine it with the fact that SlicedByteBuf duplicates have their indexes
177
        // adjusted so they correspond to the to the readable portion of the slice.
178
        //
179
        // Hence composite.internalComponent().duplicate() returns a buffer with the
180
        // indexes that should've been on the composite.component() in the first place.
181
        // Until the issue is fixed, we manually adjust the indexes of the removed component.
182
        ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate();
1✔
183
        newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex());
1✔
184

185
        /*
186
         * The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
187
         *
188
         * - ensureWritable() performs a fast resize when possible (f.e. PooledByteBuf simply
189
         *   updates its boundary to the end of consecutive memory run assigned to this buffer)
190
         * - when the required size doesn't fit into writableBytes(), a new buffer is
191
         *   allocated, and the capacity calculated with alloc.calculateNewCapacity()
192
         * - note that maxFastWritableBytes() would normally allow a fast expansion of PooledByteBuf
193
         *   is not called because CompositeByteBuf.component() returns a duplicate, wrapped buffer.
194
         *   Unwrapping buffers is unsafe, and potential benefit of fast writes may not be
195
         *   as pronounced because the capacity is doubled with each reallocation.
196
         */
197
        newTail.writeBytes(in);
1✔
198

199
      } else {
1✔
200
        // The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
201
        newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
1✔
202
        newTail.setBytes(0, composite, tailStart, tailSize)
1✔
203
            .setBytes(tailSize, in, in.readerIndex(), inputSize)
1✔
204
            .writerIndex(newTailSize);
1✔
205
        in.readerIndex(in.writerIndex());
1✔
206
      }
207

208
      // Store readerIndex to avoid out of bounds writerIndex during component replacement.
209
      int prevReader = composite.readerIndex();
1✔
210
      // Remove the old tail, reset writer index.
211
      composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
1✔
212
      // Add back the new tail.
213
      composite.addFlattenedComponents(true, newTail);
1✔
214
      // New tail's ownership transferred to the composite buf.
215
      newTail = null;
1✔
216
      composite.readerIndex(prevReader);
1✔
217
      // Input buffer was successfully merged with the tail.
218
      // Must be the last line in the try because we release it only on success.
219
      in.release();
1✔
220
    } finally {
221
      // If new tail's ownership isn't transferred to the composite buf.
222
      // Release it to prevent a leak.
223
      if (newTail != null) {
1✔
224
        newTail.release();
1✔
225
      }
226
    }
227
  }
1✔
228
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc