• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19731

11 Mar 2025 05:09PM UTC coverage: 88.545% (+0.02%) from 88.527%
#19731

push

github

web-flow
netty: Swap to UniformStreamByteDistributor (#11954)

34584 of 39058 relevant lines covered (88.55%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.99
/../netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Preconditions;
24
import com.google.common.base.Ticker;
25
import io.grpc.ChannelLogger;
26
import io.netty.channel.ChannelHandlerContext;
27
import io.netty.channel.ChannelPromise;
28
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
29
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
30
import io.netty.handler.codec.http2.Http2Exception;
31
import io.netty.handler.codec.http2.Http2LocalFlowController;
32
import io.netty.handler.codec.http2.Http2Settings;
33
import io.netty.handler.codec.http2.Http2Stream;
34
import java.util.concurrent.TimeUnit;
35

36
/**
37
 * Base class for all Netty gRPC handlers. This class standardizes exception handling (always
38
 * shutdown the connection) as well as sending the initial connection window at startup.
39
 */
40
abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
41
  private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1;
42

43
  private final int initialConnectionWindow;
44
  private final FlowControlPinger flowControlPing;
45
  protected final int maxHeaderListSize;
46
  protected final int softLimitHeaderListSize;
47
  private boolean autoTuneFlowControlOn;
48
  private ChannelHandlerContext ctx;
49
  private boolean initialWindowSent = false;
1✔
50
  private final Ticker ticker;
51

52
  private static final long BDP_MEASUREMENT_PING = 1234;
53
  protected static final int MIN_ALLOCATED_CHUNK = 16 * 1024;
54

55
  AbstractNettyHandler(
56
      ChannelPromise channelUnused,
57
      Http2ConnectionDecoder decoder,
58
      Http2ConnectionEncoder encoder,
59
      Http2Settings initialSettings,
60
      ChannelLogger negotiationLogger,
61
      boolean autoFlowControl,
62
      PingLimiter pingLimiter,
63
      Ticker ticker,
64
      int maxHeaderListSize,
65
      int softLimitHeaderListSize) {
66
    super(channelUnused, decoder, encoder, initialSettings, negotiationLogger);
1✔
67

68
    // During a graceful shutdown, wait until all streams are closed.
69
    gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT);
1✔
70

71
    // Extract the connection window from the settings if it was set.
72
    this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
1✔
73
        initialSettings.initialWindowSize();
1✔
74
    this.autoTuneFlowControlOn = autoFlowControl;
1✔
75
    if (pingLimiter == null) {
1✔
76
      pingLimiter = new AllowPingLimiter();
1✔
77
    }
78
    this.flowControlPing = new FlowControlPinger(pingLimiter);
1✔
79
    this.ticker = checkNotNull(ticker, "ticker");
1✔
80
    this.maxHeaderListSize = maxHeaderListSize;
1✔
81
    this.softLimitHeaderListSize = softLimitHeaderListSize;
1✔
82
  }
1✔
83

84
  @Override
85
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
86
    this.ctx = ctx;
1✔
87
    // Sends the connection preface if we haven't already.
88
    super.handlerAdded(ctx);
1✔
89
    sendInitialConnectionWindow();
1✔
90
  }
1✔
91

92
  @Override
93
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
94
    // Sends connection preface if we haven't already.
95
    super.channelActive(ctx);
1✔
96
    sendInitialConnectionWindow();
1✔
97
  }
1✔
98

99
  @Override
100
  public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
101
    Http2Exception embedded = getEmbeddedHttp2Exception(cause);
1✔
102
    if (embedded == null) {
1✔
103
      // There was no embedded Http2Exception, assume it's a connection error. Subclasses are
104
      // responsible for storing the appropriate status and shutting down the connection.
105
      onError(ctx, /* outbound= */ false, cause);
1✔
106
    } else {
107
      super.exceptionCaught(ctx, cause);
×
108
    }
109
  }
1✔
110

111
  protected final ChannelHandlerContext ctx() {
112
    return ctx;
1✔
113
  }
114

115
  /**
116
   * Sends initial connection window to the remote endpoint if necessary.
117
   */
118
  private void sendInitialConnectionWindow() throws Http2Exception {
119
    if (!initialWindowSent && ctx.channel().isActive()) {
1✔
120
      Http2Stream connectionStream = connection().connectionStream();
1✔
121
      int currentSize = connection().local().flowController().windowSize(connectionStream);
1✔
122
      int delta = initialConnectionWindow - currentSize;
1✔
123
      decoder().flowController().incrementWindowSize(connectionStream, delta);
1✔
124
      initialWindowSent = true;
1✔
125
      ctx.flush();
1✔
126
    }
127
  }
1✔
128

129
  @VisibleForTesting
130
  FlowControlPinger flowControlPing() {
131
    return flowControlPing;
1✔
132
  }
133

134
  @VisibleForTesting
135
  void setAutoTuneFlowControl(boolean isOn) {
136
    autoTuneFlowControlOn = isOn;
1✔
137
  }
1✔
138

139
  /**
140
   * Class for handling flow control pinging and flow control window updates as necessary.
141
   */
142
  final class FlowControlPinger {
143

144
    private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024;
145
    public static final int MAX_BACKOFF = 10;
146

147
    private final PingLimiter pingLimiter;
148
    private int pingCount;
149
    private int pingReturn;
150
    private boolean pinging;
151
    private int dataSizeSincePing;
152
    private long lastBandwidth; // bytes per nanosecond
153
    private long lastPingTime;
154
    private int lastTargetWindow;
155
    private int pingFrequencyMultiplier;
156

157
    public FlowControlPinger(PingLimiter pingLimiter) {
1✔
158
      Preconditions.checkNotNull(pingLimiter, "pingLimiter");
1✔
159
      this.pingLimiter = pingLimiter;
1✔
160
    }
1✔
161

162
    public long payload() {
163
      return BDP_MEASUREMENT_PING;
1✔
164
    }
165

166
    public int maxWindow() {
167
      return MAX_WINDOW_SIZE;
1✔
168
    }
169

170
    public void onDataRead(int dataLength, int paddingLength) {
171
      if (!autoTuneFlowControlOn) {
1✔
172
        return;
1✔
173
      }
174

175
      // Note that we are double counting around the ping initiation as the current data will be
176
      // added at the end of this method, so will be available in the next check.  This at worst
177
      // causes us to send a ping one data packet earlier, but makes startup faster if there are
178
      // small packets before big ones.
179
      int dataForCheck = getDataSincePing() + dataLength + paddingLength;
1✔
180
      // Need to double the data here to account for targetWindow being set to twice the data below
181
      if (!isPinging() && pingLimiter.isPingAllowed()
1✔
182
          && dataForCheck * 2 >= lastTargetWindow * pingFrequencyMultiplier) {
183
        setPinging(true);
1✔
184
        sendPing(ctx());
1✔
185
      }
186

187
      if (lastTargetWindow == 0) {
1✔
188
        lastTargetWindow =
1✔
189
            decoder().flowController().initialWindowSize(connection().connectionStream());
1✔
190
      }
191

192
      incrementDataSincePing(dataLength + paddingLength);
1✔
193
    }
1✔
194

195
    public void updateWindow() throws Http2Exception {
196
      if (!autoTuneFlowControlOn) {
1✔
197
        return;
1✔
198
      }
199
      pingReturn++;
1✔
200
      setPinging(false);
1✔
201

202
      long elapsedTime = (ticker.read() - lastPingTime);
1✔
203
      if (elapsedTime == 0) {
1✔
204
        elapsedTime = 1;
1✔
205
      }
206

207
      long bandwidth = (getDataSincePing() * TimeUnit.SECONDS.toNanos(1)) / elapsedTime;
1✔
208
      // Calculate new window size by doubling the observed BDP, but cap at max window
209
      int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE);
1✔
210
      Http2LocalFlowController fc = decoder().flowController();
1✔
211
      int currentWindow = fc.initialWindowSize(connection().connectionStream());
1✔
212
      if (bandwidth <= lastBandwidth || targetWindow <= currentWindow) {
1✔
213
        pingFrequencyMultiplier = Math.min(pingFrequencyMultiplier + 1, MAX_BACKOFF);
1✔
214
        return;
1✔
215
      }
216

217
      pingFrequencyMultiplier = 0; // react quickly when size is changing
1✔
218
      lastBandwidth = bandwidth;
1✔
219
      lastTargetWindow = targetWindow;
1✔
220
      int increase = targetWindow - currentWindow;
1✔
221
      fc.incrementWindowSize(connection().connectionStream(), increase);
1✔
222
      fc.initialWindowSize(targetWindow);
1✔
223
      Http2Settings settings = new Http2Settings();
1✔
224
      settings.initialWindowSize(targetWindow);
1✔
225
      frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
1✔
226
    }
1✔
227

228
    private boolean isPinging() {
229
      return pinging;
1✔
230
    }
231

232
    private void setPinging(boolean pingOut) {
233
      pinging = pingOut;
1✔
234
    }
1✔
235

236
    private void sendPing(ChannelHandlerContext ctx) {
237
      setDataSizeSincePing(0);
1✔
238
      lastPingTime = ticker.read();
1✔
239
      encoder().writePing(ctx, false, BDP_MEASUREMENT_PING, ctx.newPromise());
1✔
240
      pingCount++;
1✔
241
    }
1✔
242

243
    private void incrementDataSincePing(int increase) {
244
      int currentSize = getDataSincePing();
1✔
245
      setDataSizeSincePing(currentSize + increase);
1✔
246
    }
1✔
247

248
    @VisibleForTesting
249
    int getPingCount() {
250
      return pingCount;
1✔
251
    }
252

253
    @VisibleForTesting
254
    int getPingReturn() {
255
      return pingReturn;
1✔
256
    }
257

258
    @VisibleForTesting
259
    int getDataSincePing() {
260
      return dataSizeSincePing;
1✔
261
    }
262

263
    private void setDataSizeSincePing(int dataSize) {
264
      dataSizeSincePing = dataSize;
1✔
265
    }
1✔
266

267
    // Only used in testing
268
    @VisibleForTesting
269
    void setDataSizeAndSincePing(int dataSize) {
270
      setDataSizeSincePing(dataSize);
1✔
271
      pingFrequencyMultiplier = 1;
1✔
272
      lastPingTime = ticker.read() ;
1✔
273
    }
1✔
274
  }
275

276
  /** Controls whether PINGs like those for BDP are permitted to be sent at the current time. */
277
  public interface PingLimiter {
278
    boolean isPingAllowed();
279
  }
280

281
  private static final class AllowPingLimiter implements PingLimiter {
282
    @Override public boolean isPingAllowed() {
283
      return true;
1✔
284
    }
285
  }
286
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc